06_陷阱和最佳实践

异步编程虽然强大,但也容易踩坑。本课总结7个最常见的陷阱(忘记await、阻塞事件循环、不处理异常等)以及对应的解决方案。同时学习性能优化技巧、调试方法、何时该用/不该用异步。通过大量错误示例和正确示例的对比,让你避免走弯路,直接掌握生产环境级别的最佳实践,写出高质量的异步代码。


📖 课程目标

  • 了解异步编程的常见陷阱
  • 掌握调试异步代码的技巧
  • 学习性能优化方法
  • 理解何时使用/不使用异步
  • 掌握生产环境最佳实践

⚠️ 常见陷阱

陷阱1:忘记使用 await

1
2
3
4
5
6
7
8
9
# ❌ error:忘记 await
async def error_example():
result = async_function() # 只得到协程对象,不会执行!
print(result) # <coroutine object ...>

# ✅ 正确:使用 await
async def correct_example():
result = await async_function() # 真正执行
print(result) # 实际结果

后果

  • 代码不会执行
  • 可能导致资源泄漏
  • 难以发现的bug

解决方案

  • 调用异步函数时,总是使用 await
  • 或者使用 asyncio.create_task() 创建任务

陷阱2:在同步函数中使用 await

1
2
3
4
5
6
7
8
9
# ❌ error:在普通函数中使用 await
def error_example():
result = await async_function() # 语法错误!
return result

# ✅ 正确:在异步函数中使用 await
async def correct_example():
result = await async_function() # 正确
return result

后果

  • 语法错误,代码无法运行

解决方案

  • 只在 async def 定义的函数中使用 await
  • 如果需要在同步代码中调用异步函数,使用 asyncio.run()

陷阱3:阻塞事件循环

1
2
3
4
5
6
7
8
9
10
# ❌ error:使用阻塞操作
async def error_example():
import time
time.sleep(5) # 阻塞整个事件循环!
return "completed"

# ✅ 正确:使用异步操作
async def correct_example():
await asyncio.sleep(5) # 不阻塞事件循环
return "completed"

常见阻塞操作

  • time.sleep() → 用 asyncio.sleep()
  • requests.get() → 用 aiohttp
  • open() → 用 aiofiles.open()
  • CPU密集型计算 → 用 asyncio.to_thread() 或多进程

后果

  • 所有异步任务都会被阻塞
  • 失去异步的优势
  • 性能大幅下降

陷阱4:不处理异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# ❌ error:不处理异常
async def error_example():
result = await asyncio.gather(
potentially_failing_task1(),
potentially_failing_task2()
)
# 如果任何一个失败,整个程序崩溃

# ✅ 正确:捕获异常
async def correct_example():
result = await asyncio.gather(
potentially_failing_task1(),
potentially_failing_task2(),
return_exceptions=True # 返回异常而不是抛出
)

for item in result:
if isinstance(item, Exception):
print(f":{item}")
else:
print(f":{item}")

陷阱5:创建过多任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# ❌ error:同时创建10000个任务
async def error_example():
url_list = [f"https://api.com/{i}" for i in range(10000)]

# 同时发起10000个请求!服务器可能会封禁你
task_list = [request(url) for url in url_list]
await asyncio.gather(*task_list)

# ✅ 正确:限制并发数量
async def correct_example():
url_list = [f"https://api.com/{i}" for i in range(10000)]

# 最多同时10个请求
semaphore = asyncio.Semaphore(10)

async def limited_request(url):
async with semaphore:
return await request(url)

task_list = [limited_request(url) for url in url_list]
await asyncio.gather(*task_list)

陷阱6:不复用 Session

1
2
3
4
5
6
7
8
9
10
11
12
13
# ❌ error:每次请求都创建新session
async def error_example():
for url in url_list:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
pass # 浪费资源!

# ✅ 正确:
async def correct_example():
async with aiohttp.ClientSession() as session:
for url in url_list:
async with session.get(url) as response:
pass # 高效!

陷阱7:忘记关闭资源

1
2
3
4
5
6
7
8
9
10
11
# ❌ error:不关闭资源
async def error_example():
f = await aiofiles.open('file.txt', 'r')
content = await f.read()
# 忘记关闭文件!内存泄漏

# ✅ 正确:使用 async with
async def correct_example():
async with aiofiles.open('file.txt', 'r') as f:
content = await f.read()
# 自动关闭

🎯 何时使用异步?何时不用?

✅ 适合使用异步的场景

场景原因效率提升
网络请求I/O等待时间长10-100倍 ⚡
文件操作磁盘I/O较慢5-20倍 ⚡
data库查询等待data库响应10-50倍 ⚡
WebSocket需要保持连接必须用
微服务调用多个API并发调用10-100倍 ⚡

❌ 不适合使用异步的场景

场景原因建议
CPU密集型没有I/O等待用多进程
简单脚本增加复杂度用同步代码
顺序依赖必须按顺序执行用同步代码
科学计算纯计算任务用NumPy/多进程

判断标准

1
2
3
4
5
6
7
8
9
10
11
# 问自己:代码中有很多"等待"吗?
#
# 有等待(I/io_intensive)→ 用异步 ✅
# - 网络请求
# -
# - database_operation
#
# 没等待(cpu_intensive)→ 不用异步 ❌
# - 复杂计算
# - 图像处理
# -

🔍 调试技巧

技巧1:启用调试模式

1
2
3
4
5
6
7
8
import asyncio

# 启用调试模式
asyncio.run(main(), debug=True)

# 或者
import logging
logging.basicConfig(level=logging.DEBUG)

技巧2:打印协程状态

1
2
3
4
5
6
7
8
9
async def debug_example():
task = asyncio.create_task(async_function())

print(f":{task.done()}") # False
print(f":{task.cancelled()}") # False

await task

print(f":{task.done()}") # True

技巧3:使用 asyncio.wait_for 防止卡死

1
2
3
4
5
6
7
8
async def safe_call():
try:
result = await asyncio.wait_for(
potentially_blocking_task(),
timeout=10 # 最多等10秒
)
except asyncio.TimeoutError:
print("任务超时!")

技巧4:记录异常

1
2
3
4
5
6
7
8
9
import traceback

async def safe_execute(task):
try:
return await task
except Exception as e:
print(f":{e}")
traceback.print_exc() # 打印完整堆栈
return None

🚀 性能优化

优化1:批量操作

1
2
3
4
5
6
7
8
9
10
11
12
# ❌ 慢:一个一个插入
async def slow_insert(data_list):
async with aiosqlite.connect('db.db') as db:
for data in data_list:
await db.execute('INSERT INTO ...', data)
await db.commit() # 每次都提交

# ✅ 快:
async def fast_insert(data_list):
async with aiosqlite.connect('db.db') as db:
await db.executemany('INSERT INTO ...', data_list)
await db.commit() # 一次提交

优化2:连接池

1
2
3
4
5
6
7
8
9
# 使用连接池管理data库连接
class ConnectionPool:
def __init__(self, max_connections=10):
self.semaphore = asyncio.Semaphore(max_connections)

async def execute(self, sql):
async with self.semaphore:
async with aiosqlite.connect('db.db') as db:
return await db.execute(sql)

优化3:缓存结果

1
2
3
4
5
6
7
8
9
10
11
12
from functools import lru_cache

#
cache = {}

async def cached_query(key):
if key in cache:
return cache[key]

result = await query_database(key)
cache[key] = result
return result

优化4:预加载data

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# ❌ 慢:用时再加载
async def slow_processing():
for id in id_list:
data = await load_data(id) # 一个一个加载
process(data)

# ✅ 快:预先加载所有data
async def fast_processing():
# 并发加载所有data
task_list = [load_data(id) for id in id_list]
data_list = await asyncio.gather(*task_list)

#
for data in data_list:
process(data)

📋 最佳实践清单

代码风格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# ✅ 好的命名
async def get_user_info(user_id: int) -> Dict:
pass

# ✅ 添加类型注解
async def process_data(data: List[str]) -> List[int]:
pass

# ✅ 添加文档字符串
async def complex_function():
"""
Function description

Args:
Parameter description

Returns:
Return value description
"""
pass

错误处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async def robust_function():
try:
result = await potentially_failing_operation()
return result

except asyncio.TimeoutError:
print("超时")
return None

except aiohttp.ClientError as e:
print(f":{e}")
return None

except Exception as e:
print(f"未知错误:{e}")
return None

finally:
# cleanup_resources
pass

资源管理

1
2
3
4
5
# ✅ 总是使用 async with
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
async with aiofiles.open(, 'r') as f:
pass

并发控制

1
2
3
4
5
6
# ✅ 限制并发数量
semaphore = asyncio.Semaphore(10)

async def controlled_task():
async with semaphore:
await execute_task()

📝 本课小结

常见陷阱速查表

陷阱后果解决方案
忘记 await代码不执行总是用 await
阻塞事件循环性能下降用异步API
不处理异常程序崩溃try-except
过多并发被封禁用 Semaphore
不复用资源浪费资源复用 Session
忘记关闭资源内存泄漏用 async with

最佳实践总结

  1. 总是使用 await
  2. 使用 async with 管理资源
  3. 限制并发数量
  4. 处理所有异常
  5. 设置超时时间
  6. 添加日志和监控
  7. 编写单元测试

性能优化要点

  1. 批量操作
  2. 连接池
  3. 缓存结果
  4. 预加载data
  5. 限制并发

🎯 生产环境建议

1. 监控和日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import logging

logger = logging.getLogger(__name__)

async def production_ready_function():
logger.info("开始执行")

try:
result = await execute_task()
logger.info(f"execution successful:{result}")
return result

except Exception as e:
logger.error(f"execution failed:{e}", exc_info=True)
raise

2. 优雅关闭

1
2
3
4
5
6
async def main():
try:
await run_service()
except KeyboardInterrupt:
print("收到停止信号,正在关闭...")
await cleanup_resources()

3. 健康检查

1
2
3
4
5
6
7
8
async def health_check():
"""检查服务是否正常"""
try:
await test_database_connection()
await test_api_connection()
return {"status": "healthy"}
except Exception as e:
return {"status": "unhealthy", "error": str(e)}

💪 课后练习

练习1:找出错误

下面的代码有什么问题?如何修复?

1
2
3
4
5
6
7
8
async def problematic_code():
import time

task1 = async_function1() #
time.sleep(2) #

result = await task1
return result

练习2:优化代码

优化下面的代码,提高性能:

1
2
3
4
5
6
7
8
async def code_needing_optimization():
result_list = []
for i in range(100):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.com/{i}") as response:
result = await response.json()
.append(result)
return result_list

练习3:添加错误处理

为下面的代码添加完善的错误处理:

1
2
3
4
async def code_needing_error_handling():
async with aiohttp.ClientSession() as session:
async with session.get("https://api.com/data") as response:
return await response.json()

答案在 练习答案.py 中! 😊


🎉 课程完成!

恭喜你完成了Python异步编程系列课程!

你已经掌握了:

  • ✅ 异步编程的核心概念
  • ✅ async/await 语法
  • ✅ 并发控制技巧
  • ✅ 异步网络请求
  • ✅ 异步文件和data库操作
  • ✅ 常见陷阱和最佳实践

下一步建议:

  1. 📚 深入学习:阅读官方文档
  2. 💻 实践item目:用异步重写现有item目
  3. 🔍 阅读源码:学习优秀库的实现
  4. 🤝 分享经验:帮助其他人学习

推荐资源:

  • Python官方asyncio文档
  • aiohttp文档
  • Real Python异步教程

继续加油! 🚀

06_examples.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
"""
第6课示例代码:常见陷阱和最佳实践

运行方式:
python 06_examples.py
"""

import asyncio
import time
from typing import List, Dict, Optional
import logging

# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# ============================================
# 陷阱1:忘记使用 await
# ============================================

async def async_task() -> str:
""""""
await asyncio.sleep(0.1)
return ""


async def trap1_forget_await() -> None:
"""演示忘记使用await的后果"""
print("\n" + "=" * 50)
print("⚠️ 陷阱1:忘记使用 await")
print("=" * 50)

# ❌ error:忘记await
print("\n❌ 错误示例:忘记await")
result_error = async_task() # 只得到协程对象
print(f"结果类型:{type(result_error)}")
print(f"结果内容:{result_error}")
print("说明:代码没有真正执行!")

# ✅ 正确:使用await
print("\n✅ 正确示例:使用await")
result_correct = await async_task()
print(f"结果类型:{type(result_correct)}")
print(f"结果内容:{result_correct}")
print("说明:代码真正执行了!")


# ============================================
# 陷阱2:阻塞事件循环
# ============================================

async def trap2_block_event_loop() -> None:
"""演示阻塞事件循环的后果"""
print("\n" + "=" * 50)
print("⚠️ 陷阱2:阻塞事件循环")
print("=" * 50)

# ❌ error:使用阻塞操作
print("\n❌ 错误示例:使用 time.sleep()")

async def error_task(number: int):
print(f" task{}开始")
time.sleep(1) # 阻塞整个事件循环!
print(f" task{}completed")

start_time = time.time()
await asyncio.gather(error_task(1), error_task(2), error_task(3))
duration = time.time() -

print(f":{duration:.2f}秒(应该是3秒,因为被阻塞了)")

# ✅ 正确:使用异步操作
print("\n✅ 正确示例:使用 asyncio.sleep()")

async def correct_task(: int):
print(f" task{}开始")
await asyncio.sleep(1) # 不阻塞事件循环
print(f" task{}completed")

start_time = time.time()
await asyncio.gather(correct_task(1), correct_task(2), correct_task(3))
duration = time.time() -

print(f":{duration:.2f}秒(约1秒,并发执行)")


# ============================================
# 陷阱3:不处理异常
# ============================================

async def potentially_failing_task(: int) -> str:
""""""
await asyncio.sleep(0.1)
if number == 2:
raise ValueError(f"task{}失败了!")
return f"task{}success"


async def trap3_no_exception_handling() -> None:
"""演示不处理异常的后果"""
print("\n" + "=" * 50)
print("⚠️ 陷阱3:不处理异常")
print("=" * 50)

# ❌ error:不处理异常
print("\n❌ 错误示例:不处理异常")
try:
result = await asyncio.gather(
potentially_failing_task(1),
potentially_failing_task(2), # 这个会失败
potentially_failing_task(3)
)
except Exception as e:
print(f"程序崩溃:{e}")
print("说明:一个任务失败,整个程序崩溃")

# ✅ 正确:捕获异常
print("\n✅ 正确示例:使用 return_exceptions=True")
result = await asyncio.gather(
potentially_failing_task(1),
potentially_failing_task(2), # 这个会失败
potentially_failing_task(3),
return_exceptions=True # 返回异常而不是抛出
)

for i, item in enumerate(result, 1):
if isinstance(item, Exception):
print(f" task{i}:failed - {item}")
else:
print(f" task{i}{item}")

print("说明:所有任务都执行了,失败的返回异常对象")


# ============================================
# 陷阱4:创建过多任务
# ============================================

async def simulate_request(: int) -> str:
""""""
await asyncio.sleep(0.1)
return f"请求{}completed"


async def trap4_too_much_concurrency() -> None:
"""演示过多并发的问题"""
print("\n" + "=" * 50)
print("⚠️ 陷阱4:创建过多任务")
print("=" * 50)

# ❌ error:同时创建100个任务
print("\n❌ 错误示例:同时100个请求")
start_time = time.time()

task_list = [simulate_request(i) for i in range(100)]
await asyncio.gather(*task_list)

duration = time.time() -
print(f"完成100个请求,:{duration:.2f}秒")
print("说明:可能导致资源耗尽或被服务器封禁")

# ✅ 正确:限制并发数量
print("\n✅ 正确示例:限制并发为10")
start_time = time.time()

semaphore = asyncio.Semaphore(10) # 最多10个并发

async def limited_request(: int):
async with semaphore:
return await simulate_request()

task_list = [limited_request(i) for i in range(100)]
await asyncio.gather(*task_list)

duration = time.time() -
print(f"完成100个请求,:{duration:.2f}秒")
print("说明:控制并发数量,更加稳定")


# ============================================
# 最佳实践1:错误处理
# ============================================

async def robust_function(task_number: int) -> Optional[str]:
""""""
try:
logger.info(f"task{}开始")

#
await asyncio.sleep(0.1)

if % 3 == 0:
raise ValueError(f"task{}遇到错误")

logger.info(f"task{}success")
return f"task{}completed"

except asyncio.TimeoutError:
logger.error(f"task{}超时")
return None

except ValueError as e:
logger.error(f"task{}failed:{e}")
return None

except Exception as e:
logger.error(f"task{}未知错误:{e}")
return None


async def best_practice1_error_handling() -> None:
"""演示错误处理最佳实践"""
print("\n" + "=" * 50)
print("✅ 最佳实践1:完善的错误处理")
print("=" * 50)

task_list = [robust_function(i) for i in range(1, 6)]
result_list = await asyncio.gather(*task_list)

success_count = sum(1 for r in if r is not None)
print(f"\n统计:success{}/{len()}个任务")


# ============================================
# 最佳实践2:超时控制
# ============================================

async def potentially_slow_task(duration: float) -> str:
""""""
await asyncio.sleep()
return f"(duration{}秒)"


async def best_practice2_timeout_control() -> None:
"""演示超时控制"""
print("\n" + "=" * 50)
print("✅ 最佳实践2:超时控制")
print("=" * 50)

# :不会超时
print("\n任务1:(不会超时)")
try:
result = await asyncio.wait_for(potentially_slow_task(0.5), timeout=2)
print(f"✅ {result}")
except asyncio.TimeoutError:
print("❌ 任务超时")

# :会超时
print("\n任务2:(会超时)")
try:
result = await asyncio.wait_for(potentially_slow_task(3), timeout=1)
print(f"✅ {result}")
except asyncio.TimeoutError:
print("❌ 任务超时(超过1秒)")

print("\n💡 说明:使用 wait_for 可以防止任务卡死")


# ============================================
# 最佳实践3:资源管理
# ============================================

class :
""" Session"""

def __init__(self, name: str):
self.name = name
self.closed = False

async def __aenter__(self):
print(f" 📂 打开{self.name}")
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f" 📁 关闭{self.name}")
self.closed = True

async def get(self, url: str) -> str:
await asyncio.sleep(0.1)
return f"{url}"


async def best_practice3_resource_management() -> None:
"""演示资源管理最佳实践"""
print("\n" + "=" * 50)
print("✅ 最佳实践3:资源管理")
print("=" * 50)

# ❌ error:不关闭资源
print("\n❌ 错误示例:不关闭资源")
session1 = simulate_session("Session1")
await session1.__aenter__()
data = await session1.get("https://api.com")
# 忘记关闭!
print(f" :{data}")
print(f" ?{session1.closed}")

# ✅ 正确:使用 async with
print("\n✅ 正确示例:使用 async with")
async with simulate_session("Session2") as session2:
data = await session2.get("https://api.com")
print(f" :{data}")
print(f" ?{session2.closed}")


# ============================================
# 最佳实践4:批量操作优化
# ============================================

async def best_practice4_batch_optimization() -> None:
"""演示批量操作优化"""
print("\n" + "=" * 50)
print("✅ 最佳实践4:批量操作优化")
print("=" * 50)

# ❌ 慢:一个一个处理
print("\n❌ 慢速方式:一个一个处理")
start_time = time.time()

result_list = []
for i in range(10):
await asyncio.sleep(0.1)
.append(f"data{i}")

duration1 = time.time() -
print(f":{:.2f}秒")

# ✅ 快:
print("\n✅ 快速方式:批量并发处理")
start_time = time.time()

async def process_item(i: int) -> str:
await asyncio.sleep(0.1)
return f"data{i}"

task_list = [process_item(i) for i in range(10)]
result_list = await asyncio.gather(*task_list)

duration2 = time.time() -
print(f":{:.2f}秒")
print(f"💡 效率提升:{/:.1f}倍")


# ============================================
# 最佳实践5:优雅关闭
# ============================================

async def long_running_task():
""""""
try:
for i in range(10):
print(f" 任务运行中...{i+1}/10")
await asyncio.sleep(0.5)
print(" ")
except asyncio.CancelledError:
print(" 任务被取消,正在清理...")
# cleanup_resources
await asyncio.sleep(0.2)
print(" 清理完成")
raise


async def best_practice5_graceful_shutdown() -> None:
"""演示优雅关闭"""
print("\n" + "=" * 50)
print("✅ 最佳实践5:优雅关闭")
print("=" * 50)

print("\n启动长时间任务...")
task = asyncio.create_task(long_running_task())

# 运行一段时间后取消
await asyncio.sleep(1.5)

print("\n收到停止信号,正在关闭...")
task.cancel()

try:
await task
except asyncio.CancelledError:
print("任务已安全关闭")


# ============================================
#
# ============================================

async def main() -> None:
""":"""
print("🎓 第6课:常见陷阱和最佳实践")
print("=" * 50)

# 演示常见陷阱
await trap1_forget_await()
await trap2_blocking_event_loop()
await trap3_no_exception_handling()
await trap4_too_many_concurrent()

# 演示最佳实践
await best_practice1_error_handling()
await best_practice2_timeout_control()
await best_practice3_resource_management()
await best_practice4_batch_optimization()
await best_practice5_graceful_shutdown()

# 总结
print("\n" + "=" * 50)
print("🎉 第6课完成!")
print("=" * 50)
print("""
📚 你学到了什么?
1. 常见陷阱:忘记await、阻塞事件循环、不处理异常
2. 并发控制:限制任务数量,避免资源耗尽
3. 错误处理:完善的异常处理和日志记录
4. 超时控制:使用 wait_for 防止卡死
5. 资源管理:使用 async with 自动清理
6. 性能优化:、并发处理
7. 优雅关闭:正确取消任务和cleanup_resources

⚠️ 记住这些陷阱:
❌ 忘记 await
❌ 使用阻塞操作(time.sleep)
❌ 不处理异常
❌ 创建过多任务
❌ 不复用资源
❌ 忘记关闭资源

✅ 遵循最佳实践:
✅ 总是使用 await
✅ 使用异步API
✅ 添加错误处理
✅ 限制并发数量
✅ 使用 async with
✅ 设置超时时间
✅ 添加日志记录

🎯 何时使用异步?
✅ 网络请求(爬虫、api_call)
✅ ()
✅ database_query()
✅ WebSocket(实时通信)

❌ cpu_intensive(科学计算)
❌ 简单脚本(增加复杂度)
❌ 顺序依赖(必须按顺序)

💪 继续学习:
1. 阅读官方文档
2. 实践真实item目
3. 阅读优秀源码
4. 完成课后练习

🎉 恭喜完成全部课程!
""")


if __name__ == "__main__":
#
asyncio.run(main())