
| """ 第3课示例代码:并发执行多个任务
运行方式: python 03_examples.py """
import asyncio import time from typing import List, Dict, Set
async def 任务A() -> str: """任务A:需要2秒""" print("🅰️ 任务A开始") await asyncio.sleep(2) print("🅰️ 任务A完成") return "任务A的结果"
async def 任务B() -> str: """任务B:需要3秒""" print("🅱️ 任务B开始") await asyncio.sleep(3) print("🅱️ 任务B完成") return "任务B的结果"
async def 任务C() -> str: """任务C:需要1秒""" print("🅲 任务C开始") await asyncio.sleep(1) print("🅲 任务C完成") return "任务C的结果"
async def 示例1_gather基础() -> None: """示例1:使用 gather 同时运行多个任务""" print("\n" + "=" * 50) print("📚 示例1:asyncio.gather() 基础用法") print("=" * 50)
开始时间 = time.time()
结果列表 = await asyncio.gather( 任务A(), 任务B(), 任务C() )
总耗时 = time.time() - 开始时间
print(f"\n✅ 所有任务完成!") print(f"结果列表:{结果列表}") print(f"⏱️ 总耗时:{总耗时:.1f}秒(最长的任务是3秒)")
print("\n💡 关键点:") print(" 1. gather 会等待所有任务完成") print(" 2. 返回结果是列表,顺序与传入顺序一致") print(" 3. 总耗时 = 最长任务的时间(而不是总和)")
async def 后台任务(任务名: str, 耗时: int) -> str: """一个后台任务""" print(f"🔧 {任务名}开始执行...") await asyncio.sleep(耗时) print(f"✅ {任务名}执行完成!") return f"{任务名}的结果"
async def 示例2_create_task() -> None: """示例2:使用 create_task 创建后台任务""" print("\n" + "=" * 50) print("📚 示例2:asyncio.create_task() 用法") print("=" * 50)
print("主程序:开始执行")
任务1 = asyncio.create_task(后台任务("后台任务1", 3)) 任务2 = asyncio.create_task(后台任务("后台任务2", 2))
print("主程序:任务已创建,继续做其他事情...")
for i in range(3): await asyncio.sleep(1) print(f"主程序:正在做第{i+1}件事...")
print("主程序:现在等待后台任务完成...")
结果1 = await 任务1 结果2 = await 任务2
print(f"\n✅ 所有任务完成!") print(f"结果1:{结果1}") print(f"结果2:{结果2}")
print("\n💡 关键点:") print(" 1. create_task 创建后立即开始执行") print(" 2. 不会阻塞当前代码") print(" 3. 可以稍后用 await 等待结果")
async def 竞速任务(选手: str, 速度: int) -> str: """竞速任务""" print(f"🏃 {选手}开始跑步...") await asyncio.sleep(速度) print(f"🏁 {选手}到达终点!") return f"{选手}完成"
async def 示例3_wait控制() -> None: """示例3:使用 wait 灵活控制任务""" print("\n" + "=" * 50) print("📚 示例3:asyncio.wait() 灵活控制") print("=" * 50)
任务列表 = [ asyncio.create_task(竞速任务("选手A", 3)), asyncio.create_task(竞速任务("选手B", 2)), asyncio.create_task(竞速任务("选手C", 4)), ]
print("🏁 比赛开始!等待第一个完成...")
完成, 未完成 = await asyncio.wait( 任务列表, return_when=asyncio.FIRST_COMPLETED )
print(f"\n🎉 第一名产生了!") for 任务 in 完成: 结果 = await 任务 print(f"冠军:{结果}")
print(f"\n还有 {len(未完成)} 个选手在跑...")
if 未完成: await asyncio.wait(未完成) print("所有选手都完成了!")
print("\n💡 关键点:") print(" 1. wait 可以灵活控制等待策略") print(" 2. FIRST_COMPLETED:第一个完成就返回") print(" 3. 返回两个集合:完成的和未完成的")
async def 下载文件(文件名: str, 大小MB: int) -> Dict[str, any]: """模拟下载文件""" print(f"📥 开始下载:{文件名} ({大小}MB)")
开始时间 = time.time()
下载时间 = 大小 * 0.3
步骤数 = 5 for 步骤 in range(1, 步骤数 + 1): await asyncio.sleep(下载时间 / 步骤数) 进度 = (步骤 / 步骤数) * 100 print(f" {文件名}: {进度:.0f}%")
耗时 = time.time() - 开始时间
print(f"✅ 下载完成:{文件名} (耗时{耗时:.1f}秒)")
return { "文件名": 文件名, "大小": 大小, "耗时": 耗时 }
async def 批量下载_同步方式() -> None: """同步方式:一个一个下载""" print("\n" + "=" * 50) print("📚 示例4A:批量下载 - 同步方式") print("=" * 50)
文件列表 = [ ("Python教程.pdf", 5), ("数据分析.xlsx", 3), ("项目代码.zip", 7), ("演示视频.mp4", 10), ]
总开始时间 = time.time() 结果列表 = []
for 文件名, 大小 in 文件列表: 结果 = await 下载文件(文件名, 大小) 结果列表.append(结果)
总耗时 = time.time() - 总开始时间
print("\n📊 下载统计") 总大小 = sum(r["大小"] for r in 结果列表) print(f"文件数量:{len(结果列表)}个") print(f"总大小:{总大小}MB") print(f"总耗时:{总耗时:.1f}秒")
async def 批量下载_并发方式() -> None: """并发方式:同时下载""" print("\n" + "=" * 50) print("📚 示例4B:批量下载 - 并发方式") print("=" * 50)
文件列表 = [ ("Python教程.pdf", 5), ("数据分析.xlsx", 3), ("项目代码.zip", 7), ("演示视频.mp4", 10), ]
总开始时间 = time.time()
任务列表 = [下载文件(文件名, 大小) for 文件名, 大小 in 文件列表] 结果列表 = await asyncio.gather(*任务列表)
总耗时 = time.time() - 总开始时间
print("\n📊 下载统计") 总大小 = sum(r["大小"] for r in 结果列表) print(f"文件数量:{len(结果列表)}个") print(f"总大小:{总大小}MB") print(f"总耗时:{总耗时:.1f}秒") print(f"💡 效率提升:{7.5/总耗时:.1f}倍!")
async def 受限下载(文件名: str, 大小: int, 信号量: asyncio.Semaphore) -> str: """使用信号量限制并发的下载""" async with 信号量: print(f"📥 开始下载:{文件名}") await asyncio.sleep(大小 * 0.5) print(f"✅ 完成下载:{文件名}") return 文件名
async def 示例5_限制并发() -> None: """示例5:使用 Semaphore 限制并发数量""" print("\n" + "=" * 50) print("📚 示例5:限制并发数量(Semaphore)") print("=" * 50)
信号量 = asyncio.Semaphore(2)
文件列表 = [ ("文件1", 2), ("文件2", 2), ("文件3", 2), ("文件4", 2), ("文件5", 2), ]
print("🚀 开始下载(最多同时2个)...") 开始时间 = time.time()
任务列表 = [受限下载(名称, 大小, 信号量) for 名称, 大小 in 文件列表] 结果 = await asyncio.gather(*任务列表)
总耗时 = time.time() - 开始时间
print(f"\n✅ 全部完成!总耗时:{总耗时:.1f}秒") print(f"💡 观察:每次最多2个文件在下载")
async def 可能很慢的任务(任务名: str, 耗时: int) -> str: """一个可能很慢的任务""" print(f"🐌 {任务名}开始(需要{耗时}秒)...") await asyncio.sleep(耗时) print(f"✅ {任务名}完成!") return f"{任务名}的结果"
async def 示例6_超时控制() -> None: """示例6:使用 wait_for 控制超时""" print("\n" + "=" * 50) print("📚 示例6:超时控制(wait_for)") print("=" * 50)
print("\n测试1:任务会超时") try: 结果 = await asyncio.wait_for( 可能很慢的任务("慢任务", 5), timeout=2 ) print(f"结果:{结果}") except asyncio.TimeoutError: print("❌ 任务超时了!")
print("\n测试2:任务不会超时") try: 结果 = await asyncio.wait_for( 可能很慢的任务("快任务", 1), timeout=3 ) print(f"结果:{结果}") except asyncio.TimeoutError: print("❌ 任务超时了!")
print("\n💡 关键点:") print(" 1. wait_for 可以设置超时时间") print(" 2. 超时会抛出 TimeoutError 异常") print(" 3. 可以用 try-except 捕获")
async def 长时间任务() -> None: """一个长时间运行的任务""" try: print("🔧 长任务开始...") for i in range(10): await asyncio.sleep(1) print(f" 进度:{(i+1)*10}%") print("✅ 长任务完成!") except asyncio.CancelledError: print("❌ 任务被取消了!") raise
async def 示例7_任务取消() -> None: """示例7:取消任务""" print("\n" + "=" * 50) print("📚 示例7:任务取消") print("=" * 50)
任务 = asyncio.create_task(长时间任务())
await asyncio.sleep(3)
print("\n⚠️ 主程序决定取消任务...") 任务.cancel()
try: await 任务 except asyncio.CancelledError: print("✅ 确认任务已取消")
print("\n💡 关键点:") print(" 1. 使用 task.cancel() 取消任务") print(" 2. 任务会收到 CancelledError 异常") print(" 3. 可以在任务中捕获并清理资源")
async def main() -> None: """主程序:运行所有示例""" print("🎓 第3课:并发执行多个任务") print("=" * 50)
await 示例1_gather基础() await 示例2_create_task() await 示例3_wait控制() await 批量下载_同步方式() await 批量下载_并发方式() await 示例5_限制并发() await 示例6_超时控制() await 示例7_任务取消()
print("\n" + "=" * 50) print("🎉 第3课完成!") print("=" * 50) print(""" 📚 你学到了什么? 1. asyncio.gather() - 同时运行多个任务 2. asyncio.create_task() - 创建后台任务 3. asyncio.wait() - 灵活控制等待策略 4. asyncio.Semaphore() - 限制并发数量 5. asyncio.wait_for() - 超时控制 6. task.cancel() - 取消任务 🎯 使用场景: - gather:批量下载、批量API调用 - create_task:后台任务、不阻塞主流程 - wait:竞速场景、部分完成即可 - Semaphore:API限流、资源限制 - wait_for:防止任务卡死 - cancel:用户取消、超时处理 💪 动手练习: 1. 修改并发数量,观察效果 2. 尝试不同的等待策略 3. 实现带重试的下载器 4. 完成课后练习题 🎯 下一步: 学习异步网络请求实战(第4课) """)
if __name__ == "__main__": asyncio.run(main())
|