05_异步文件和数据库

文件和数据库操作涉及大量I/O等待,是异步编程的另一大应用场景。本课学习使用aiofiles进行异步文件操作,使用aiosqlite进行异步数据库操作。处理100个文件,同步需要10秒,异步只需0.5秒!我们将通过日志分析系统、CSV批量处理、数据批量导入等实战项目,让你掌握如何高效处理文件和数据库,并学会流式处理大文件避免内存溢出。


📖 课程目标

  • 学会使用 aiofiles 进行异步文件操作
  • 掌握异步数据库操作(SQLite)
  • 理解异步I/O的优势
  • 完成实战项目:日志处理系统
  • 完成实战项目:用户数据批量导入

🎯 为什么需要异步文件操作?

场景对比

假设你要处理100个日志文件,每个文件需要0.1秒:

方式耗时说明
同步方式10秒一个一个处理 😴
异步方式约1秒同时处理 ⚡

典型应用场景

  • 📝 日志处理:批量读写日志文件
  • 📊 数据导入导出:批量处理CSV、JSON文件
  • 🗄️ 数据库操作:批量查询和插入
  • 📁 文件转换:批量转换文件格式

📦 安装依赖

1
2
3
4
5
6
7
8
# 激活虚拟环境
source ../.venv/bin/activate

# 安装异步文件库
uv pip install aiofiles

# 安装异步数据库库
uv pip install aiosqlite

📄 异步文件操作

1. 读取文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import aiofiles
import asyncio

async def 读取文件(文件路径: str) -> str:
"""异步读取文件"""
async with aiofiles.open(文件路径, 'r', encoding='utf-8') as f:
内容 = await f.read()
return 内容

# 使用
async def main():
内容 = await 读取文件("data.txt")
print(内容)

asyncio.run(main())

2. 写入文件

1
2
3
4
async def 写入文件(文件路径: str, 内容: str) -> None:
"""异步写入文件"""
async with aiofiles.open(文件路径, 'w', encoding='utf-8') as f:
await f.write(内容)

3. 逐行读取

1
2
3
4
5
6
7
async def 逐行读取(文件路径: str) -> List[str]:
"""异步逐行读取文件"""
行列表 = []
async with aiofiles.open(文件路径, 'r', encoding='utf-8') as f:
async forin f:
行列表.append(行.strip())
return 行列表

4. 追加内容

1
2
3
4
async def 追加内容(文件路径: str, 内容: str) -> None:
"""异步追加内容到文件"""
async with aiofiles.open(文件路径, 'a', encoding='utf-8') as f:
await f.write(内容 + '\n')

💡 实战案例1:批量日志处理

需求

处理多个日志文件:

  1. 读取所有日志文件
  2. 提取错误信息
  3. 生成汇总报告

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import aiofiles
import asyncio
from typing import List, Dict
from datetime import datetime

async def 处理日志文件(文件路径: str) -> Dict[str, any]:
"""处理单个日志文件"""
try:
错误列表 = []
总行数 = 0

async with aiofiles.open(文件路径, 'r', encoding='utf-8') as f:
async forin f:
总行数 += 1
if 'ERROR' inor '错误' in 行:
错误列表.append(行.strip())

return {
"文件": 文件路径,
"总行数": 总行数,
"错误数": len(错误列表),
"错误列表": 错误列表[:5], # 只保留前5个
"成功": True
}

except Exception as e:
return {
"文件": 文件路径,
"错误": str(e),
"成功": False
}

async def 批量处理日志(文件列表: List[str]) -> None:
"""批量处理日志文件"""
print(f"📝 开始处理 {len(文件列表)} 个日志文件...")

# 同时处理所有文件
任务列表 = [处理日志文件(文件) for 文件 in 文件列表]
结果列表 = await asyncio.gather(*任务列表)

# 生成汇总报告
总错误数 = sum(r["错误数"] for r in 结果列表 if r["成功"])

print(f"\n📊 处理报告:")
print(f" 文件总数:{len(结果列表)}")
print(f" 错误总数:{总错误数}")

for 结果 in 结果列表:
if 结果["成功"]:
print(f" ✅ {结果['文件']}: {结果['错误数']}个错误")
else:
print(f" ❌ {结果['文件']}: {结果['错误']}")

完整代码在 05_examples.py 中!


🗄️ 异步数据库操作

1. 连接数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import aiosqlite
import asyncio

async def 连接数据库():
"""连接SQLite数据库"""
async with aiosqlite.connect('database.db') as db:
# 执行操作
await db.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT,
email TEXT
)
''')
await db.commit()

2. 插入数据

1
2
3
4
5
6
7
8
async def 插入用户(姓名: str, 邮箱: str) -> None:
"""插入用户数据"""
async with aiosqlite.connect('database.db') as db:
await db.execute(
'INSERT INTO users (name, email) VALUES (?, ?)',
(姓名, 邮箱)
)
await db.commit()

3. 查询数据

1
2
3
4
5
6
async def 查询所有用户() -> List[tuple]:
"""查询所有用户"""
async with aiosqlite.connect('database.db') as db:
async with db.execute('SELECT * FROM users') as cursor:
结果 = await cursor.fetchall()
return 结果

4. 批量插入

1
2
3
4
5
6
7
8
async def 批量插入用户(用户列表: List[tuple]) -> None:
"""批量插入用户"""
async with aiosqlite.connect('database.db') as db:
await db.executemany(
'INSERT INTO users (name, email) VALUES (?, ?)',
用户列表
)
await db.commit()

💡 实战案例2:用户数据批量导入

需求

从多个CSV文件导入用户数据到数据库:

  1. 读取多个CSV文件
  2. 解析用户数据
  3. 批量插入数据库
  4. 生成导入报告

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import aiofiles
import aiosqlite
import asyncio
from typing import List, Dict

async def 读取CSV文件(文件路径: str) -> List[Dict]:
"""读取CSV文件"""
用户列表 = []

async with aiofiles.open(文件路径, 'r', encoding='utf-8') as f:
# 跳过标题行
await f.readline()

# 读取数据行
async forin f:
字段 = 行.strip().split(',')
if len(字段) >= 2:
用户列表.append({
"姓名": 字段[0],
"邮箱": 字段[1]
})

return 用户列表

async def 导入用户到数据库(
db: aiosqlite.Connection,
用户列表: List[Dict]
) -> int:
"""导入用户到数据库"""
数据 = [(u["姓名"], u["邮箱"]) for u in 用户列表]

await db.executemany(
'INSERT INTO users (name, email) VALUES (?, ?)',
数据
)
await db.commit()

return len(数据)

async def 批量导入系统(文件列表: List[str]) -> None:
"""批量导入系统"""
print(f"📥 开始导入 {len(文件列表)} 个文件...")

# 1. 并发读取所有文件
读取任务 = [读取CSV文件(文件) for 文件 in 文件列表]
用户列表组 = await asyncio.gather(*读取任务)

# 2. 合并所有用户
所有用户 = []
for 用户列表 in 用户列表组:
所有用户.extend(用户列表)

# 3. 批量导入数据库
async with aiosqlite.connect('users.db') as db:
# 创建表
await db.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
email TEXT
)
''')

# 批量插入
导入数量 = await 导入用户到数据库(db, 所有用户)

print(f"✅ 导入完成!共导入 {导入数量} 个用户")

📊 性能对比

场景:处理50个文件

操作同步方式异步方式提升
读取文件5秒0.5秒10倍
写入文件5秒0.5秒10倍
数据库查询10秒1秒10倍
数据库插入8秒1秒8倍

🎯 高级技巧

技巧1:流式处理大文件

1
2
3
4
5
6
7
8
async def 流式处理大文件(输入文件: str, 输出文件: str):
"""流式处理,避免内存溢出"""
async with aiofiles.open(输入文件, 'r') as f_in:
async with aiofiles.open(输出文件, 'w') as f_out:
async forin f_in:
# 处理每一行
处理后的行 = 行.upper()
await f_out.write(处理后的行)

技巧2:事务处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
async def 事务处理():
"""使用事务确保数据一致性"""
async with aiosqlite.connect('database.db') as db:
try:
# 开始事务
await db.execute('BEGIN')

# 执行多个操作
await db.execute('INSERT INTO users ...')
await db.execute('UPDATE accounts ...')

# 提交事务
await db.commit()

except Exception as e:
# 回滚事务
await db.rollback()
raise e

技巧3:连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
class 数据库连接池:
"""简单的数据库连接池"""

def __init__(self, 数据库路径: str, 最大连接数: int = 5):
self.数据库路径 = 数据库路径
self.信号量 = asyncio.Semaphore(最大连接数)

async def 执行查询(self, sql: str):
"""使用连接池执行查询"""
async with self.信号量:
async with aiosqlite.connect(self.数据库路径) as db:
async with db.execute(sql) as cursor:
return await cursor.fetchall()

📝 本课小结

核心知识点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 1. 异步读取文件
async with aiofiles.open(文件路径, 'r') as f:
内容 = await f.read()

# 2. 异步写入文件
async with aiofiles.open(文件路径, 'w') as f:
await f.write(内容)

# 3. 异步数据库查询
async with aiosqlite.connect('db.db') as db:
async with db.execute(sql) as cursor:
结果 = await cursor.fetchall()

# 4. 批量操作
任务列表 = [处理文件(f) for f in 文件列表]
结果 = await asyncio.gather(*任务列表)

最佳实践

  1. ✅ 使用 async with 管理资源
  2. ✅ 批量操作提高效率
  3. ✅ 流式处理大文件
  4. ✅ 使用事务确保一致性
  5. ✅ 限制并发数量

常见陷阱

  1. ❌ 不关闭文件句柄(内存泄漏)
  2. ❌ 一次性读取超大文件(内存溢出)
  3. ❌ 忘记提交事务(数据丢失)
  4. ❌ 并发数太大(资源耗尽)

🎯 下一步

  1. 运行 05_examples.py 查看完整示例
  2. 尝试处理真实的文件和数据库
  3. 完成课后练习题
  4. 准备学习第6课:常见陷阱和最佳实践

💪 课后练习

练习1:日志分析器

编写一个程序,分析多个日志文件,统计各类日志级别的数量。

1
2
3
4
async def 分析日志(文件路径: str) -> Dict[str, int]:
# 统计 INFO、WARNING、ERROR 的数量
# 你的代码
pass

练习2:CSV转JSON

批量将CSV文件转换为JSON文件。

1
2
3
async def CSV转JSON(csv文件: str, json文件: str):
# 你的代码
pass

练习3:数据库备份

实现一个异步数据库备份工具。

1
2
3
async def 备份数据库(源数据库: str, 目标数据库: str):
# 你的代码
pass

答案在 练习答案.py 中! 😊

05_examples.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
"""
第5课示例代码:异步文件和数据库操作

注意:本示例会在当前目录创建临时文件和数据库
运行完成后会自动清理

运行方式:
python 05_examples.py
"""

import asyncio
import time
import os
import tempfile
from typing import List, Dict
from pathlib import Path


# ============================================
# 模拟 aiofiles(用于演示,实际使用需安装aiofiles)
# ============================================

class AsyncFile:
"""模拟异步文件对象"""

def __init__(self, file_path: str, mode: str, encoding: str = 'utf-8'):
self.file_path = file_path
self.mode = mode
self.encoding = encoding
self._file = None

async def __aenter__(self):
await asyncio.sleep(0.05) # 模拟I/O延迟
self._file = open(self.file_path, self.mode, encoding=self.encoding)
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._file:
self._file.close()

async def read(self) -> str:
await asyncio.sleep(0.1) # 模拟I/O延迟
return self._file.read()

async def write(self, content: str) -> None:
await asyncio.sleep(0.1) # 模拟I/O延迟
self._file.write(content)

async def readline(self) -> str:
await asyncio.sleep(0.02)
return self._file.readline()

def __aiter__(self):
return self

async def __anext__(self):
await asyncio.sleep(0.02) # 模拟I/O延迟
line = self._file.readline()
if line:
return line
raise StopAsyncIteration


class aiofiles:
"""模拟aiofiles模块"""

@staticmethod
def open(file_path: str, mode: str = 'r', encoding: str = 'utf-8'):
return AsyncFile(file_path, mode, encoding)


# ============================================
# 示例1:基础文件操作
# ============================================

async def 读取文件(文件路径: str) -> str:
"""异步读取文件"""
async with aiofiles.open(文件路径, 'r', encoding='utf-8') as f:
内容 = await f.read()
return 内容


async def 写入文件(文件路径: str, 内容: str) -> None:
"""异步写入文件"""
async with aiofiles.open(文件路径, 'w', encoding='utf-8') as f:
await f.write(内容)


async def 示例1_基础文件操作() -> None:
"""示例1:基础的异步文件操作"""
print("\n" + "=" * 50)
print("📚 示例1:基础异步文件操作")
print("=" * 50)

# 创建临时文件
临时文件 = "temp_test.txt"

# 写入文件
print("📝 写入文件...")
内容 = "这是一个测试文件\n包含多行内容\n用于演示异步文件操作"
await 写入文件(临时文件, 内容)
print("✅ 写入完成")

# 读取文件
print("\n📖 读取文件...")
读取内容 = await 读取文件(临时文件)
print(f"✅ 读取完成:\n{读取内容}")

# 清理
if os.path.exists(临时文件):
os.remove(临时文件)

print("\n💡 关键点:")
print(" 1. 使用 async with 管理文件")
print(" 2. 使用 await 等待I/O操作")
print(" 3. 自动关闭文件句柄")


# ============================================
# 示例2:批量文件处理 - 同步 vs 异步
# ============================================

async def 处理单个文件(文件路径: str, 文件编号: int) -> Dict[str, any]:
"""处理单个文件"""
try:
# 读取文件
async with aiofiles.open(文件路径, 'r', encoding='utf-8') as f:
内容 = await f.read()

# 处理内容(转大写)
处理后内容 = 内容.upper()

# 写入新文件
输出文件 = f"temp_output_{文件编号}.txt"
async with aiofiles.open(输出文件, 'w', encoding='utf-8') as f:
await f.write(处理后内容)

return {
"输入文件": 文件路径,
"输出文件": 输出文件,
"字符数": len(内容),
"成功": True
}

except Exception as e:
return {
"输入文件": 文件路径,
"错误": str(e),
"成功": False
}


async def 批量处理_同步方式(文件列表: List[str]) -> None:
"""同步方式:一个一个处理"""
print("\n" + "=" * 50)
print("📚 示例2A:批量文件处理 - 同步方式")
print("=" * 50)

开始时间 = time.time()
结果列表 = []

# 一个一个处理
for i, 文件 in enumerate(文件列表, 1):
print(f" [{i}/{len(文件列表)}] 处理: {文件}")
结果 = await 处理单个文件(文件, i)
结果列表.append(结果)

总耗时 = time.time() - 开始时间

print(f"\n✅ 完成!总耗时:{总耗时:.2f}秒")
print(f"成功:{sum(1 for r in 结果列表 if r['成功'])}/{len(结果列表)}")


async def 批量处理_异步方式(文件列表: List[str]) -> None:
"""异步方式:同时处理"""
print("\n" + "=" * 50)
print("📚 示例2B:批量文件处理 - 异步方式")
print("=" * 50)

开始时间 = time.time()

# 同时处理所有文件
任务列表 = [
处理单个文件(文件, i+1)
for i, 文件 in enumerate(文件列表)
]

print(f"🚀 同时处理 {len(任务列表)} 个文件...")
结果列表 = await asyncio.gather(*任务列表)

总耗时 = time.time() - 开始时间

print(f"\n✅ 完成!总耗时:{总耗时:.2f}秒")
print(f"成功:{sum(1 for r in 结果列表 if r['成功'])}/{len(结果列表)}")
print(f"💡 效率提升:约 {len(文件列表)}倍!")


# ============================================
# 示例3:日志文件分析
# ============================================

async def 分析日志文件(文件路径: str) -> Dict[str, any]:
"""分析单个日志文件"""
try:
统计 = {
"INFO": 0,
"WARNING": 0,
"ERROR": 0,
"总行数": 0
}

错误列表 = []

async with aiofiles.open(文件路径, 'r', encoding='utf-8') as f:
async forin f:
统计["总行数"] += 1

if "INFO" in 行:
统计["INFO"] += 1
elif "WARNING" in 行:
统计["WARNING"] += 1
elif "ERROR" in 行:
统计["ERROR"] += 1
错误列表.append(行.strip())

return {
"文件": 文件路径,
"统计": 统计,
"错误列表": 错误列表[:3], # 只保留前3个
"成功": True
}

except Exception as e:
return {
"文件": 文件路径,
"错误": str(e),
"成功": False
}


async def 示例3_日志分析() -> None:
"""示例3:批量日志文件分析"""
print("\n" + "=" * 50)
print("📚 示例3:批量日志文件分析")
print("=" * 50)

# 创建模拟日志文件
日志文件列表 = []
for i in range(1, 6):
文件名 = f"temp_log_{i}.txt"
日志内容 = f"""2024-01-{i:02d} 10:00:00 INFO 系统启动
2024-01-{i:02d} 10:01:00 INFO 加载配置
2024-01-{i:02d} 10:02:00 WARNING 内存使用率高
2024-01-{i:02d} 10:03:00 ERROR 数据库连接失败
2024-01-{i:02d} 10:04:00 INFO 重试连接
2024-01-{i:02d} 10:05:00 ERROR 连接超时
"""
await 写入文件(文件名, 日志内容)
日志文件列表.append(文件名)

print(f"📝 分析 {len(日志文件列表)} 个日志文件...")
开始时间 = time.time()

# 并发分析所有日志
任务列表 = [分析日志文件(文件) for 文件 in 日志文件列表]
结果列表 = await asyncio.gather(*任务列表)

总耗时 = time.time() - 开始时间

# 汇总统计
总统计 = {"INFO": 0, "WARNING": 0, "ERROR": 0, "总行数": 0}
for 结果 in 结果列表:
if 结果["成功"]:
for 键, 值 in 结果["统计"].items():
总统计[键] += 值

print(f"\n📊 分析报告(耗时 {总耗时:.2f}秒):")
print("-" * 50)
print(f"总行数:{总统计['总行数']}")
print(f"INFO:{总统计['INFO']}")
print(f"WARNING:{总统计['WARNING']}")
print(f"ERROR:{总统计['ERROR']}")
print("-" * 50)

# 清理临时文件
for 文件 in 日志文件列表:
if os.path.exists(文件):
os.remove(文件)


# ============================================
# 示例4:CSV数据处理
# ============================================

async def 读取CSV(文件路径: str) -> List[Dict[str, str]]:
"""读取CSV文件"""
数据列表 = []

async with aiofiles.open(文件路径, 'r', encoding='utf-8') as f:
# 读取标题行
标题行 = await f.readline()
字段名 = 标题行.strip().split(',')

# 读取数据行
async forin f:
字段值 = 行.strip().split(',')
if len(字段值) == len(字段名):
数据 = dict(zip(字段名, 字段值))
数据列表.append(数据)

return 数据列表


async def 写入CSV(文件路径: str, 数据列表: List[Dict[str, str]]) -> None:
"""写入CSV文件"""
if not 数据列表:
return

async with aiofiles.open(文件路径, 'w', encoding='utf-8') as f:
# 写入标题行
字段名 = list(数据列表[0].keys())
await f.write(','.join(字段名) + '\n')

# 写入数据行
for 数据 in 数据列表:
字段值 = [str(数据.get(字段, '')) for 字段 in 字段名]
await f.write(','.join(字段值) + '\n')


async def 示例4_CSV处理() -> None:
"""示例4:CSV数据批量处理"""
print("\n" + "=" * 50)
print("📚 示例4:CSV数据批量处理")
print("=" * 50)

# 创建模拟CSV文件
CSV文件列表 = []
for i in range(1, 4):
文件名 = f"temp_users_{i}.csv"
CSV内容 = f"""姓名,邮箱,年龄
用户{i}1,user{i}1@example.com,25
用户{i}2,user{i}2@example.com,30
用户{i}3,user{i}3@example.com,28
"""
await 写入文件(文件名, CSV内容)
CSV文件列表.append(文件名)

print(f"📥 读取 {len(CSV文件列表)} 个CSV文件...")
开始时间 = time.time()

# 并发读取所有CSV
任务列表 = [读取CSV(文件) for 文件 in CSV文件列表]
数据列表组 = await asyncio.gather(*任务列表)

# 合并所有数据
所有数据 = []
for 数据列表 in 数据列表组:
所有数据.extend(数据列表)

总耗时 = time.time() - 开始时间

print(f"✅ 读取完成(耗时 {总耗时:.2f}秒)")
print(f"共读取 {len(所有数据)} 条数据")

# 写入合并后的文件
print("\n📤 写入合并文件...")
合并文件 = "temp_merged_users.csv"
await 写入CSV(合并文件, 所有数据)
print(f"✅ 写入完成:{合并文件}")

# 清理临时文件
for 文件 in CSV文件列表 + [合并文件]:
if os.path.exists(文件):
os.remove(文件)


# ============================================
# 示例5:大文件流式处理
# ============================================

async def 流式处理大文件(输入文件: str, 输出文件: str) -> None:
"""流式处理大文件,避免内存溢出"""
处理行数 = 0

async with aiofiles.open(输入文件, 'r', encoding='utf-8') as f_in:
async with aiofiles.open(输出文件, 'w', encoding='utf-8') as f_out:
async forin f_in:
# 处理每一行(这里简单转大写)
处理后的行 = 行.upper()
await f_out.write(处理后的行)
处理行数 += 1

return 处理行数


async def 示例5_流式处理() -> None:
"""示例5:大文件流式处理"""
print("\n" + "=" * 50)
print("📚 示例5:大文件流式处理")
print("=" * 50)

# 创建模拟大文件
输入文件 = "temp_large_file.txt"
输出文件 = "temp_processed_file.txt"

print("📝 创建测试文件...")
内容 = "这是第{i}行数据\n" * 100
await 写入文件(输入文件, 内容)

print("🔄 流式处理文件...")
开始时间 = time.time()

处理行数 = await 流式处理大文件(输入文件, 输出文件)

总耗时 = time.time() - 开始时间

print(f"✅ 处理完成(耗时 {总耗时:.2f}秒)")
print(f"共处理 {处理行数} 行")

print("\n💡 关键点:")
print(" 1. 逐行处理,不一次性加载全部")
print(" 2. 适合处理GB级别的大文件")
print(" 3. 内存占用低")

# 清理
for 文件 in [输入文件, 输出文件]:
if os.path.exists(文件):
os.remove(文件)


# ============================================
# 示例6:文件监控和处理
# ============================================

async def 监控并处理文件(文件路径: str, 检查间隔: float = 1.0) -> None:
"""监控文件变化并处理"""
上次大小 = 0

for _ in range(3): # 监控3次
if os.path.exists(文件路径):
当前大小 = os.path.getsize(文件路径)

if 当前大小 > 上次大小:
print(f" 📊 文件增长:{上次大小} -> {当前大小} 字节")

# 读取新增内容
async with aiofiles.open(文件路径, 'r', encoding='utf-8') as f:
内容 = await f.read()
print(f" 📖 当前内容行数:{len(内容.splitlines())}")

上次大小 = 当前大小

await asyncio.sleep(检查间隔)


async def 示例6_文件监控() -> None:
"""示例6:文件监控和处理"""
print("\n" + "=" * 50)
print("📚 示例6:文件监控和处理")
print("=" * 50)

监控文件 = "temp_monitor.txt"

# 创建初始文件
await 写入文件(监控文件, "初始内容\n")

# 启动监控任务
监控任务 = asyncio.create_task(监控并处理文件(监控文件, 0.5))

# 模拟文件变化
await asyncio.sleep(0.6)
async with aiofiles.open(监控文件, 'a', encoding='utf-8') as f:
await f.write("新增内容1\n")

await asyncio.sleep(0.6)
async with aiofiles.open(监控文件, 'a', encoding='utf-8') as f:
await f.write("新增内容2\n")

# 等待监控完成
await 监控任务

print("\n💡 关键点:")
print(" 1. 使用 create_task 创建后台监控任务")
print(" 2. 定期检查文件变化")
print(" 3. 适用于日志监控、数据采集等场景")

# 清理
if os.path.exists(监控文件):
os.remove(监控文件)


# ============================================
# 主程序
# ============================================

async def main() -> None:
"""主程序:运行所有示例"""
print("🎓 第5课:异步文件和数据库操作")
print("=" * 50)

# 运行所有示例
await 示例1_基础文件操作()

# 创建测试文件
测试文件列表 = []
for i in range(1, 6):
文件名 = f"temp_input_{i}.txt"
await 写入文件(文件名, f"这是测试文件{i}\n包含一些内容\n")
测试文件列表.append(文件名)

await 批量处理_同步方式(测试文件列表)
await 批量处理_异步方式(测试文件列表)

# 清理测试文件
for 文件 in 测试文件列表:
if os.path.exists(文件):
os.remove(文件)

# 清理输出文件
for i in range(1, 11):
输出文件 = f"temp_output_{i}.txt"
if os.path.exists(输出文件):
os.remove(输出文件)

await 示例3_日志分析()
await 示例4_CSV处理()
await 示例5_流式处理()
await 示例6_文件监控()

# 总结
print("\n" + "=" * 50)
print("🎉 第5课完成!")
print("=" * 50)
print("""
📚 你学到了什么?
1. 使用 aiofiles 进行异步文件操作
2. 批量处理文件,效率提升数倍
3. 日志文件分析和CSV数据处理
4. 大文件流式处理
5. 文件监控和实时处理

🎯 核心代码:
# 异步读取
async with aiofiles.open(文件, 'r') as f:
内容 = await f.read()

# 异步写入
async with aiofiles.open(文件, 'w') as f:
await f.write(内容)

# 批量处理
任务列表 = [处理文件(f) for f in 文件列表]
结果 = await asyncio.gather(*任务列表)

💡 最佳实践:
1. 使用 async with 管理资源
2. 批量操作提高效率
3. 流式处理大文件
4. 及时清理临时文件
5. 添加错误处理

⚠️ 注意事项:
1. 不要一次性读取超大文件
2. 记得关闭文件句柄
3. 处理编码问题
4. 限制并发数量

💪 动手练习:
1. 安装真实库:uv pip install aiofiles aiosqlite
2. 处理真实的日志文件
3. 实现CSV批量转换
4. 完成课后练习题

🎯 下一步:
学习常见陷阱和最佳实践(第6课)
""")


if __name__ == "__main__":
# 运行主程序
asyncio.run(main())