06_陷阱和最佳实践

异步编程虽然强大,但也容易踩坑。本课总结7个最常见的陷阱(忘记await、阻塞事件循环、不处理异常等)以及对应的解决方案。同时学习性能优化技巧、调试方法、何时该用/不该用异步。通过大量错误示例和正确示例的对比,让你避免走弯路,直接掌握生产环境级别的最佳实践,写出高质量的异步代码。


📖 课程目标

  • 了解异步编程的常见陷阱
  • 掌握调试异步代码的技巧
  • 学习性能优化方法
  • 理解何时使用/不使用异步
  • 掌握生产环境最佳实践

⚠️ 常见陷阱

陷阱1:忘记使用 await

1
2
3
4
5
6
7
8
9
# ❌ 错误:忘记 await
async def 错误示例():
结果 = 异步函数() # 只得到协程对象,不会执行!
print(结果) # <coroutine object ...>

# ✅ 正确:使用 await
async def 正确示例():
结果 = await 异步函数() # 真正执行
print(结果) # 实际结果

后果

  • 代码不会执行
  • 可能导致资源泄漏
  • 难以发现的bug

解决方案

  • 调用异步函数时,总是使用 await
  • 或者使用 asyncio.create_task() 创建任务

陷阱2:在同步函数中使用 await

1
2
3
4
5
6
7
8
9
# ❌ 错误:在普通函数中使用 await
def 错误示例():
结果 = await 异步函数() # 语法错误!
return 结果

# ✅ 正确:在异步函数中使用 await
async def 正确示例():
结果 = await 异步函数() # 正确
return 结果

后果

  • 语法错误,代码无法运行

解决方案

  • 只在 async def 定义的函数中使用 await
  • 如果需要在同步代码中调用异步函数,使用 asyncio.run()

陷阱3:阻塞事件循环

1
2
3
4
5
6
7
8
9
10
# ❌ 错误:使用阻塞操作
async def 错误示例():
import time
time.sleep(5) # 阻塞整个事件循环!
return "完成"

# ✅ 正确:使用异步操作
async def 正确示例():
await asyncio.sleep(5) # 不阻塞事件循环
return "完成"

常见阻塞操作

  • time.sleep() → 用 asyncio.sleep()
  • requests.get() → 用 aiohttp
  • open() → 用 aiofiles.open()
  • CPU密集型计算 → 用 asyncio.to_thread() 或多进程

后果

  • 所有异步任务都会被阻塞
  • 失去异步的优势
  • 性能大幅下降

陷阱4:不处理异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# ❌ 错误:不处理异常
async def 错误示例():
结果 = await asyncio.gather(
可能失败的任务1(),
可能失败的任务2()
)
# 如果任何一个失败,整个程序崩溃

# ✅ 正确:捕获异常
async def 正确示例():
结果 = await asyncio.gather(
可能失败的任务1(),
可能失败的任务2(),
return_exceptions=True # 返回异常而不是抛出
)

forin 结果:
if isinstance(项, Exception):
print(f"任务失败:{项}")
else:
print(f"任务成功:{项}")

陷阱5:创建过多任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# ❌ 错误:同时创建10000个任务
async def 错误示例():
url列表 = [f"https://api.com/{i}" for i in range(10000)]

# 同时发起10000个请求!服务器可能会封禁你
任务列表 = [请求(url) for url in url列表]
await asyncio.gather(*任务列表)

# ✅ 正确:限制并发数量
async def 正确示例():
url列表 = [f"https://api.com/{i}" for i in range(10000)]

# 最多同时10个请求
信号量 = asyncio.Semaphore(10)

async def 受限请求(url):
async with 信号量:
return await 请求(url)

任务列表 = [受限请求(url) for url in url列表]
await asyncio.gather(*任务列表)

陷阱6:不复用 Session

1
2
3
4
5
6
7
8
9
10
11
12
13
# ❌ 错误:每次请求都创建新session
async def 错误示例():
for url in url列表:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
pass # 浪费资源!

# ✅ 正确:复用session
async def 正确示例():
async with aiohttp.ClientSession() as session:
for url in url列表:
async with session.get(url) as response:
pass # 高效!

陷阱7:忘记关闭资源

1
2
3
4
5
6
7
8
9
10
11
# ❌ 错误:不关闭资源
async def 错误示例():
f = await aiofiles.open('file.txt', 'r')
内容 = await f.read()
# 忘记关闭文件!内存泄漏

# ✅ 正确:使用 async with
async def 正确示例():
async with aiofiles.open('file.txt', 'r') as f:
内容 = await f.read()
# 自动关闭

🎯 何时使用异步?何时不用?

✅ 适合使用异步的场景

场景原因效率提升
网络请求I/O等待时间长10-100倍 ⚡
文件操作磁盘I/O较慢5-20倍 ⚡
数据库查询等待数据库响应10-50倍 ⚡
WebSocket需要保持连接必须用
微服务调用多个API并发调用10-100倍 ⚡

❌ 不适合使用异步的场景

场景原因建议
CPU密集型没有I/O等待用多进程
简单脚本增加复杂度用同步代码
顺序依赖必须按顺序执行用同步代码
科学计算纯计算任务用NumPy/多进程

判断标准

1
2
3
4
5
6
7
8
9
10
11
# 问自己:代码中有很多"等待"吗?
#
# 有等待(I/O密集型)→ 用异步 ✅
# - 网络请求
# - 文件读写
# - 数据库操作
#
# 没等待(CPU密集型)→ 不用异步 ❌
# - 复杂计算
# - 图像处理
# - 数据加密

🔍 调试技巧

技巧1:启用调试模式

1
2
3
4
5
6
7
8
import asyncio

# 启用调试模式
asyncio.run(main(), debug=True)

# 或者
import logging
logging.basicConfig(level=logging.DEBUG)

技巧2:打印协程状态

1
2
3
4
5
6
7
8
9
async def 调试示例():
任务 = asyncio.create_task(异步函数())

print(f"任务状态:{任务.done()}") # False
print(f"任务取消:{任务.cancelled()}") # False

await 任务

print(f"任务状态:{任务.done()}") # True

技巧3:使用 asyncio.wait_for 防止卡死

1
2
3
4
5
6
7
8
async def 安全调用():
try:
结果 = await asyncio.wait_for(
可能卡死的任务(),
timeout=10 # 最多等10秒
)
except asyncio.TimeoutError:
print("任务超时!")

技巧4:记录异常

1
2
3
4
5
6
7
8
9
import traceback

async def 安全执行(任务):
try:
return await 任务
except Exception as e:
print(f"任务失败:{e}")
traceback.print_exc() # 打印完整堆栈
return None

🚀 性能优化

优化1:批量操作

1
2
3
4
5
6
7
8
9
10
11
12
# ❌ 慢:一个一个插入
async def 慢速插入(数据列表):
async with aiosqlite.connect('db.db') as db:
for 数据 in 数据列表:
await db.execute('INSERT INTO ...', 数据)
await db.commit() # 每次都提交

# ✅ 快:批量插入
async def 快速插入(数据列表):
async with aiosqlite.connect('db.db') as db:
await db.executemany('INSERT INTO ...', 数据列表)
await db.commit() # 一次提交

优化2:连接池

1
2
3
4
5
6
7
8
9
# 使用连接池管理数据库连接
class 连接池:
def __init__(self, 最大连接数=10):
self.信号量 = asyncio.Semaphore(最大连接数)

async def 执行(self, sql):
async with self.信号量:
async with aiosqlite.connect('db.db') as db:
return await db.execute(sql)

优化3:缓存结果

1
2
3
4
5
6
7
8
9
10
11
12
from functools import lru_cache

# 缓存异步函数结果
缓存 = {}

async def 带缓存的查询(key):
if key in 缓存:
return 缓存[key]

结果 = await 查询数据库(key)
缓存[key] = 结果
return 结果

优化4:预加载数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# ❌ 慢:用时再加载
async def 慢速处理():
for id in id列表:
数据 = await 加载数据(id) # 一个一个加载
处理(数据)

# ✅ 快:预先加载所有数据
async def 快速处理():
# 并发加载所有数据
任务列表 = [加载数据(id) for id in id列表]
数据列表 = await asyncio.gather(*任务列表)

# 处理数据
for 数据 in 数据列表:
处理(数据)

📋 最佳实践清单

代码风格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# ✅ 好的命名
async def 获取用户信息(用户ID: int) -> Dict:
pass

# ✅ 添加类型注解
async def 处理数据(数据: List[str]) -> List[int]:
pass

# ✅ 添加文档字符串
async def 复杂函数():
"""
函数说明

Args:
参数说明

Returns:
返回值说明
"""
pass

错误处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async def 健壮的函数():
try:
结果 = await 可能失败的操作()
return 结果

except asyncio.TimeoutError:
print("超时")
return None

except aiohttp.ClientError as e:
print(f"网络错误:{e}")
return None

except Exception as e:
print(f"未知错误:{e}")
return None

finally:
# 清理资源
pass

资源管理

1
2
3
4
5
# ✅ 总是使用 async with
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
async with aiofiles.open(文件, 'r') as f:
pass

并发控制

1
2
3
4
5
6
# ✅ 限制并发数量
信号量 = asyncio.Semaphore(10)

async def 受控任务():
async with 信号量:
await 执行任务()

📝 本课小结

常见陷阱速查表

陷阱后果解决方案
忘记 await代码不执行总是用 await
阻塞事件循环性能下降用异步API
不处理异常程序崩溃try-except
过多并发被封禁用 Semaphore
不复用资源浪费资源复用 Session
忘记关闭资源内存泄漏用 async with

最佳实践总结

  1. 总是使用 await
  2. 使用 async with 管理资源
  3. 限制并发数量
  4. 处理所有异常
  5. 设置超时时间
  6. 添加日志和监控
  7. 编写单元测试

性能优化要点

  1. 批量操作
  2. 连接池
  3. 缓存结果
  4. 预加载数据
  5. 限制并发

🎯 生产环境建议

1. 监控和日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import logging

logger = logging.getLogger(__name__)

async def 生产级函数():
logger.info("开始执行")

try:
结果 = await 执行任务()
logger.info(f"执行成功:{结果}")
return 结果

except Exception as e:
logger.error(f"执行失败:{e}", exc_info=True)
raise

2. 优雅关闭

1
2
3
4
5
6
async def main():
try:
await 运行服务()
except KeyboardInterrupt:
print("收到停止信号,正在关闭...")
await 清理资源()

3. 健康检查

1
2
3
4
5
6
7
8
async def 健康检查():
"""检查服务是否正常"""
try:
await 测试数据库连接()
await 测试API连接()
return {"status": "healthy"}
except Exception as e:
return {"status": "unhealthy", "error": str(e)}

💪 课后练习

练习1:找出错误

下面的代码有什么问题?如何修复?

1
2
3
4
5
6
7
8
async def 有问题的代码():
import time

任务1 = 异步函数1() # 问题1
time.sleep(2) # 问题2

结果 = await 任务1
return 结果

练习2:优化代码

优化下面的代码,提高性能:

1
2
3
4
5
6
7
8
async def 需要优化的代码():
结果列表 = []
for i in range(100):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.com/{i}") as response:
结果 = await response.json()
结果列表.append(结果)
return 结果列表

练习3:添加错误处理

为下面的代码添加完善的错误处理:

1
2
3
4
async def 需要错误处理的代码():
async with aiohttp.ClientSession() as session:
async with session.get("https://api.com/data") as response:
return await response.json()

答案在 练习答案.py 中! 😊


🎉 课程完成!

恭喜你完成了Python异步编程系列课程!

你已经掌握了:

  • ✅ 异步编程的核心概念
  • ✅ async/await 语法
  • ✅ 并发控制技巧
  • ✅ 异步网络请求
  • ✅ 异步文件和数据库操作
  • ✅ 常见陷阱和最佳实践

下一步建议:

  1. 📚 深入学习:阅读官方文档
  2. 💻 实践项目:用异步重写现有项目
  3. 🔍 阅读源码:学习优秀库的实现
  4. 🤝 分享经验:帮助其他人学习

推荐资源:

  • Python官方asyncio文档
  • aiohttp文档
  • Real Python异步教程

继续加油! 🚀

06_examples.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
"""
第6课示例代码:常见陷阱和最佳实践

运行方式:
python 06_examples.py
"""

import asyncio
import time
from typing import List, Dict, Optional
import logging

# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# ============================================
# 陷阱1:忘记使用 await
# ============================================

async def 异步任务() -> str:
"""一个简单的异步任务"""
await asyncio.sleep(0.1)
return "任务完成"


async def 陷阱1_忘记await() -> None:
"""演示忘记使用await的后果"""
print("\n" + "=" * 50)
print("⚠️ 陷阱1:忘记使用 await")
print("=" * 50)

# ❌ 错误:忘记await
print("\n❌ 错误示例:忘记await")
结果错误 = 异步任务() # 只得到协程对象
print(f"结果类型:{type(结果错误)}")
print(f"结果内容:{结果错误}")
print("说明:代码没有真正执行!")

# ✅ 正确:使用await
print("\n✅ 正确示例:使用await")
结果正确 = await 异步任务()
print(f"结果类型:{type(结果正确)}")
print(f"结果内容:{结果正确}")
print("说明:代码真正执行了!")


# ============================================
# 陷阱2:阻塞事件循环
# ============================================

async def 陷阱2_阻塞事件循环() -> None:
"""演示阻塞事件循环的后果"""
print("\n" + "=" * 50)
print("⚠️ 陷阱2:阻塞事件循环")
print("=" * 50)

# ❌ 错误:使用阻塞操作
print("\n❌ 错误示例:使用 time.sleep()")

async def 错误任务(编号: int):
print(f" 任务{编号}开始")
time.sleep(1) # 阻塞整个事件循环!
print(f" 任务{编号}完成")

开始时间 = time.time()
await asyncio.gather(错误任务(1), 错误任务(2), 错误任务(3))
耗时 = time.time() - 开始时间

print(f"总耗时:{耗时:.2f}秒(应该是3秒,因为被阻塞了)")

# ✅ 正确:使用异步操作
print("\n✅ 正确示例:使用 asyncio.sleep()")

async def 正确任务(编号: int):
print(f" 任务{编号}开始")
await asyncio.sleep(1) # 不阻塞事件循环
print(f" 任务{编号}完成")

开始时间 = time.time()
await asyncio.gather(正确任务(1), 正确任务(2), 正确任务(3))
耗时 = time.time() - 开始时间

print(f"总耗时:{耗时:.2f}秒(约1秒,并发执行)")


# ============================================
# 陷阱3:不处理异常
# ============================================

async def 可能失败的任务(编号: int) -> str:
"""可能失败的任务"""
await asyncio.sleep(0.1)
if 编号 == 2:
raise ValueError(f"任务{编号}失败了!")
return f"任务{编号}成功"


async def 陷阱3_不处理异常() -> None:
"""演示不处理异常的后果"""
print("\n" + "=" * 50)
print("⚠️ 陷阱3:不处理异常")
print("=" * 50)

# ❌ 错误:不处理异常
print("\n❌ 错误示例:不处理异常")
try:
结果 = await asyncio.gather(
可能失败的任务(1),
可能失败的任务(2), # 这个会失败
可能失败的任务(3)
)
except Exception as e:
print(f"程序崩溃:{e}")
print("说明:一个任务失败,整个程序崩溃")

# ✅ 正确:捕获异常
print("\n✅ 正确示例:使用 return_exceptions=True")
结果 = await asyncio.gather(
可能失败的任务(1),
可能失败的任务(2), # 这个会失败
可能失败的任务(3),
return_exceptions=True # 返回异常而不是抛出
)

for i, 项 in enumerate(结果, 1):
if isinstance(项, Exception):
print(f" 任务{i}:失败 - {项}")
else:
print(f" 任务{i}{项}")

print("说明:所有任务都执行了,失败的返回异常对象")


# ============================================
# 陷阱4:创建过多任务
# ============================================

async def 模拟请求(编号: int) -> str:
"""模拟网络请求"""
await asyncio.sleep(0.1)
return f"请求{编号}完成"


async def 陷阱4_过多并发() -> None:
"""演示过多并发的问题"""
print("\n" + "=" * 50)
print("⚠️ 陷阱4:创建过多任务")
print("=" * 50)

# ❌ 错误:同时创建100个任务
print("\n❌ 错误示例:同时100个请求")
开始时间 = time.time()

任务列表 = [模拟请求(i) for i in range(100)]
await asyncio.gather(*任务列表)

耗时 = time.time() - 开始时间
print(f"完成100个请求,耗时:{耗时:.2f}秒")
print("说明:可能导致资源耗尽或被服务器封禁")

# ✅ 正确:限制并发数量
print("\n✅ 正确示例:限制并发为10")
开始时间 = time.time()

信号量 = asyncio.Semaphore(10) # 最多10个并发

async def 受限请求(编号: int):
async with 信号量:
return await 模拟请求(编号)

任务列表 = [受限请求(i) for i in range(100)]
await asyncio.gather(*任务列表)

耗时 = time.time() - 开始时间
print(f"完成100个请求,耗时:{耗时:.2f}秒")
print("说明:控制并发数量,更加稳定")


# ============================================
# 最佳实践1:错误处理
# ============================================

async def 健壮的函数(任务编号: int) -> Optional[str]:
"""带完善错误处理的函数"""
try:
logger.info(f"任务{任务编号}开始")

# 模拟可能失败的操作
await asyncio.sleep(0.1)

if 任务编号 % 3 == 0:
raise ValueError(f"任务{任务编号}遇到错误")

logger.info(f"任务{任务编号}成功")
return f"任务{任务编号}完成"

except asyncio.TimeoutError:
logger.error(f"任务{任务编号}超时")
return None

except ValueError as e:
logger.error(f"任务{任务编号}失败:{e}")
return None

except Exception as e:
logger.error(f"任务{任务编号}未知错误:{e}")
return None


async def 最佳实践1_错误处理() -> None:
"""演示错误处理最佳实践"""
print("\n" + "=" * 50)
print("✅ 最佳实践1:完善的错误处理")
print("=" * 50)

任务列表 = [健壮的函数(i) for i in range(1, 6)]
结果列表 = await asyncio.gather(*任务列表)

成功数 = sum(1 for r in 结果列表 if r is not None)
print(f"\n统计:成功{成功数}/{len(结果列表)}个任务")


# ============================================
# 最佳实践2:超时控制
# ============================================

async def 可能很慢的任务(耗时: float) -> str:
"""可能很慢的任务"""
await asyncio.sleep(耗时)
return f"任务完成(耗时{耗时}秒)"


async def 最佳实践2_超时控制() -> None:
"""演示超时控制"""
print("\n" + "=" * 50)
print("✅ 最佳实践2:超时控制")
print("=" * 50)

# 任务1:不会超时
print("\n任务1:快速任务(不会超时)")
try:
结果 = await asyncio.wait_for(可能很慢的任务(0.5), timeout=2)
print(f"✅ {结果}")
except asyncio.TimeoutError:
print("❌ 任务超时")

# 任务2:会超时
print("\n任务2:慢速任务(会超时)")
try:
结果 = await asyncio.wait_for(可能很慢的任务(3), timeout=1)
print(f"✅ {结果}")
except asyncio.TimeoutError:
print("❌ 任务超时(超过1秒)")

print("\n💡 说明:使用 wait_for 可以防止任务卡死")


# ============================================
# 最佳实践3:资源管理
# ============================================

class 模拟Session:
"""模拟HTTP Session"""

def __init__(self, name: str):
self.name = name
self.closed = False

async def __aenter__(self):
print(f" 📂 打开{self.name}")
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f" 📁 关闭{self.name}")
self.closed = True

async def get(self, url: str) -> str:
await asyncio.sleep(0.1)
return f"数据来自{url}"


async def 最佳实践3_资源管理() -> None:
"""演示资源管理最佳实践"""
print("\n" + "=" * 50)
print("✅ 最佳实践3:资源管理")
print("=" * 50)

# ❌ 错误:不关闭资源
print("\n❌ 错误示例:不关闭资源")
session1 = 模拟Session("Session1")
await session1.__aenter__()
数据 = await session1.get("https://api.com")
# 忘记关闭!
print(f" 获取数据:{数据}")
print(f" Session关闭了吗?{session1.closed}")

# ✅ 正确:使用 async with
print("\n✅ 正确示例:使用 async with")
async with 模拟Session("Session2") as session2:
数据 = await session2.get("https://api.com")
print(f" 获取数据:{数据}")
print(f" Session关闭了吗?{session2.closed}")


# ============================================
# 最佳实践4:批量操作优化
# ============================================

async def 最佳实践4_批量优化() -> None:
"""演示批量操作优化"""
print("\n" + "=" * 50)
print("✅ 最佳实践4:批量操作优化")
print("=" * 50)

# ❌ 慢:一个一个处理
print("\n❌ 慢速方式:一个一个处理")
开始时间 = time.time()

结果列表 = []
for i in range(10):
await asyncio.sleep(0.1)
结果列表.append(f"数据{i}")

耗时1 = time.time() - 开始时间
print(f"耗时:{耗时1:.2f}秒")

# ✅ 快:批量处理
print("\n✅ 快速方式:批量并发处理")
开始时间 = time.time()

async def 处理项(i: int) -> str:
await asyncio.sleep(0.1)
return f"数据{i}"

任务列表 = [处理项(i) for i in range(10)]
结果列表 = await asyncio.gather(*任务列表)

耗时2 = time.time() - 开始时间
print(f"耗时:{耗时2:.2f}秒")
print(f"💡 效率提升:{耗时1/耗时2:.1f}倍")


# ============================================
# 最佳实践5:优雅关闭
# ============================================

async def 长时间运行的任务():
"""模拟长时间运行的任务"""
try:
for i in range(10):
print(f" 任务运行中...{i+1}/10")
await asyncio.sleep(0.5)
print(" 任务完成")
except asyncio.CancelledError:
print(" 任务被取消,正在清理...")
# 清理资源
await asyncio.sleep(0.2)
print(" 清理完成")
raise


async def 最佳实践5_优雅关闭() -> None:
"""演示优雅关闭"""
print("\n" + "=" * 50)
print("✅ 最佳实践5:优雅关闭")
print("=" * 50)

print("\n启动长时间任务...")
任务 = asyncio.create_task(长时间运行的任务())

# 运行一段时间后取消
await asyncio.sleep(1.5)

print("\n收到停止信号,正在关闭...")
任务.cancel()

try:
await 任务
except asyncio.CancelledError:
print("任务已安全关闭")


# ============================================
# 主程序
# ============================================

async def main() -> None:
"""主程序:运行所有示例"""
print("🎓 第6课:常见陷阱和最佳实践")
print("=" * 50)

# 演示常见陷阱
await 陷阱1_忘记await()
await 陷阱2_阻塞事件循环()
await 陷阱3_不处理异常()
await 陷阱4_过多并发()

# 演示最佳实践
await 最佳实践1_错误处理()
await 最佳实践2_超时控制()
await 最佳实践3_资源管理()
await 最佳实践4_批量优化()
await 最佳实践5_优雅关闭()

# 总结
print("\n" + "=" * 50)
print("🎉 第6课完成!")
print("=" * 50)
print("""
📚 你学到了什么?
1. 常见陷阱:忘记await、阻塞事件循环、不处理异常
2. 并发控制:限制任务数量,避免资源耗尽
3. 错误处理:完善的异常处理和日志记录
4. 超时控制:使用 wait_for 防止卡死
5. 资源管理:使用 async with 自动清理
6. 性能优化:批量操作、并发处理
7. 优雅关闭:正确取消任务和清理资源

⚠️ 记住这些陷阱:
❌ 忘记 await
❌ 使用阻塞操作(time.sleep)
❌ 不处理异常
❌ 创建过多任务
❌ 不复用资源
❌ 忘记关闭资源

✅ 遵循最佳实践:
✅ 总是使用 await
✅ 使用异步API
✅ 添加错误处理
✅ 限制并发数量
✅ 使用 async with
✅ 设置超时时间
✅ 添加日志记录

🎯 何时使用异步?
✅ 网络请求(爬虫、API调用)
✅ 文件操作(批量读写)
✅ 数据库查询(批量操作)
✅ WebSocket(实时通信)

❌ CPU密集型(科学计算)
❌ 简单脚本(增加复杂度)
❌ 顺序依赖(必须按顺序)

💪 继续学习:
1. 阅读官方文档
2. 实践真实项目
3. 阅读优秀源码
4. 完成课后练习

🎉 恭喜完成全部课程!
""")


if __name__ == "__main__":
# 运行主程序
asyncio.run(main())