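05_Async Files and Databases

File and database operations spend much of their time waiting on I/O, which makes them another major use case for asynchronous programming. This lesson covers asynchronous file operations with aiofiles and asynchronous database operations with aiosqlite. In the scenario used below, processing 100 files takes about 10 seconds synchronously and roughly 1 second asynchronously. Through hands-on projects (a log analysis system, batch CSV processing, and bulk data import), you will learn to handle files and databases efficiently and to stream large files without exhausting memory.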


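📖 Course Objectives

  • Learn to use aiofiles for asynchronous file operations
  • Master asynchronous database operations (SQLite)
  • Understand the advantages of asynchronous I/O
  • Complete a hands-on project: a log processing system
  • Complete a hands-on project: bulk import of user data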

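🎯 Why Asynchronous File Operations?

Scenario comparison

Suppose you need to process 100 log files, each taking 0.1 seconds:

Approach       Time             Notes
Synchronous    10 seconds       Files are processed one at a time 😴
Asynchronous   about 1 second   Files are processed concurrently ⚡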
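
The arithmetic behind this comparison can be checked with a small simulation. The sketch below stands in for file I/O with `asyncio.sleep`, so the function names and the 0.01-second delay are illustrative assumptions rather than measurements of real disk access.

```python
import asyncio
import time
from typing import Tuple


async def fake_process(index: int, delay: float = 0.01) -> int:
    """Stand-in for processing one file; the sleep simulates I/O wait."""
    await asyncio.sleep(delay)
    return index


async def run_sequential(count: int) -> float:
    """Await each job in turn and return the elapsed seconds."""
    start = time.perf_counter()
    for i in range(count):
        await fake_process(i)
    return time.perf_counter() - start


async def run_concurrent(count: int) -> float:
    """Start all jobs together with gather and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_process(i) for i in range(count)))
    return time.perf_counter() - start


async def compare(count: int = 20) -> Tuple[float, float]:
    """Return (sequential_seconds, concurrent_seconds) for `count` jobs."""
    sequential = await run_sequential(count)
    concurrent = await run_concurrent(count)
    return sequential, concurrent


if __name__ == "__main__":
    seq, con = asyncio.run(compare())
    print(f"sequential: {seq:.2f}s, concurrent: {con:.2f}s")
```

The sequential time grows with the number of jobs, while the concurrent time stays close to the duration of a single job. Real file I/O adds overhead that this simulation leaves out.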

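Typical use cases

  • 📝 Log processing: reading and writing log files in bulk
  • 📊 Data import and export: batch processing of CSV and JSON files
  • 🗄️ Database operations: batch queries and inserts
  • 📁 File conversion: converting file formats in bulk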

📦 Installing Dependencies

# Activate the virtual environment
source ../.venv/bin/activate

# Install the async file library
uv pip install aiofiles

# Install the async database library
uv pip install aiosqlite

📄 Asynchronous File Operations

1. Reading a file

import aiofiles
import asyncio

async def read_file(file_path: str) -> str:
    """Read a file asynchronously."""
    async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
        content = await f.read()
    return content

# Usage
async def main():
    content = await read_file("data.txt")
    print(content)

asyncio.run(main())

2. Writing a file

async def write_file(file_path: str, content: str) -> None:
    """Write a file asynchronously."""
    async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
        await f.write(content)

3. Reading line by line

from typing import List

async def read_lines(file_path: str) -> List[str]:
    """Read a file asynchronously, one line at a time."""
    lines_list = []
    async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
        async for line in f:
            lines_list.append(line.strip())
    return lines_list

4. Appending content

async def append_content(file_path: str, content: str) -> None:
    """Append content to a file asynchronously."""
    async with aiofiles.open(file_path, 'a', encoding='utf-8') as f:
        await f.write(content + '\n')

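💡 Case Study 1: Batch Log Processing

Requirements

Process multiple log files:

  1. Read all log files
  2. Extract the error messages
  3. Generate a summary report

Implementation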

import aiofiles
import asyncio
from typing import Any, Dict, List

async def process_log_file(file_path: str) -> Dict[str, Any]:
    """Process a single log file."""
    try:
        error_list = []
        total_lines = 0

        async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
            async for line in f:
                total_lines += 1
                # '错误' is the Chinese word for "error"
                if 'ERROR' in line or '错误' in line:
                    error_list.append(line.strip())

        return {
            "file": file_path,
            "total_lines": total_lines,
            "error_count": len(error_list),
            "error_list": error_list[:5],  # keep only the first 5
            "success": True
        }

    except Exception as e:
        return {
            "file": file_path,
            "error": str(e),
            "success": False
        }

async def batch_process_logs(file_list: List[str]) -> None:
    """Process log files in bulk."""
    print(f"📝 Processing {len(file_list)} log files...")

    # Process all files concurrently
    task_list = [process_log_file(file) for file in file_list]
    result_list = await asyncio.gather(*task_list)

    # Generate the summary report
    total_error_count = sum(r["error_count"] for r in result_list if r["success"])

    print("\n📊 Report:")
    print(f"  Total files: {len(result_list)}")
    print(f"  Total errors: {total_error_count}")

    for result in result_list:
        if result["success"]:
            print(f"  ✅ {result['file']}: {result['error_count']} errors")
        else:
            # Failed results carry the message under the "error" key
            print(f"  ❌ {result['file']}: {result['error']}")

The full code is in 05_examples.py!


🗄️ Asynchronous Database Operations

1. Connecting to the database

import aiosqlite
import asyncio

async def connect_database():
    """Connect to the SQLite database and create the users table."""
    async with aiosqlite.connect('database.db') as db:
        # Run statements
        await db.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT,
                email TEXT
            )
        ''')
        await db.commit()

2. Inserting data

async def insert_user(name: str, email: str) -> None:
    """Insert one user record."""
    async with aiosqlite.connect('database.db') as db:
        await db.execute(
            'INSERT INTO users (name, email) VALUES (?, ?)',
            (name, email)
        )
        await db.commit()

3. Querying data

from typing import List

async def query_all_users() -> List[tuple]:
    """Fetch all users."""
    async with aiosqlite.connect('database.db') as db:
        async with db.execute('SELECT * FROM users') as cursor:
            result = await cursor.fetchall()
    return result

4. Batch insert

from typing import List

async def batch_insert_users(user_list: List[tuple]) -> None:
    """Insert many users in one call."""
    async with aiosqlite.connect('database.db') as db:
        await db.executemany(
            'INSERT INTO users (name, email) VALUES (?, ?)',
            user_list
        )
        await db.commit()
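
When the list is very large, it may help to commit in fixed-size batches rather than in one huge `executemany` call. The sketch below uses the standard-library `sqlite3` module with an in-memory database so that it runs without extra dependencies. `chunked` and `insert_in_batches` are hypothetical helper names, and the batch size of 2 is chosen only to keep the demonstration small. The same loop shape should carry over to an aiosqlite connection by awaiting `executemany` and `commit`.

```python
import sqlite3
from typing import Iterator, List, Tuple

User = Tuple[str, str]


def chunked(items: List[User], size: int) -> Iterator[List[User]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def insert_in_batches(conn: sqlite3.Connection, users: List[User], size: int = 2) -> int:
    """Insert users batch by batch, committing after each batch."""
    inserted = 0
    for batch in chunked(users, size):
        conn.executemany("INSERT INTO users (name, email) VALUES (?, ?)", batch)
        conn.commit()
        inserted += len(batch)
    return inserted


def demo() -> int:
    """Insert five sample users and return the row count in the table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
    users = [(f"user{i}", f"user{i}@example.com") for i in range(5)]
    insert_in_batches(conn, users)
    count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    conn.close()
    return count
```

Committing per batch bounds how much work is lost if a later batch fails, at the cost of the import no longer being a single all-or-nothing transaction.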

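💡 Case Study 2: Bulk Import of User Data

Requirements

Import user data from multiple CSV files into a database:

  1. Read multiple CSV files
  2. Parse the user data
  3. Insert the records into the database in bulk
  4. Generate an import report

Implementation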

import aiofiles
import aiosqlite
import asyncio
from typing import List, Dict

async def read_csv_file(file_path: str) -> List[Dict]:
    """Read a CSV file."""
    user_list = []

    async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
        # Skip the header line
        await f.readline()

        # Read the data lines
        async for line in f:
            fields = line.strip().split(',')
            if len(fields) >= 2:
                user_list.append({
                    "name": fields[0],
                    "email": fields[1]
                })

    return user_list

async def import_users_to_database(
    db: aiosqlite.Connection,
    user_list: List[Dict]
) -> int:
    """Import users into the database and return how many were inserted."""
    data = [(u["name"], u["email"]) for u in user_list]

    await db.executemany(
        'INSERT INTO users (name, email) VALUES (?, ?)',
        data
    )
    await db.commit()

    return len(data)

async def batch_import_system(file_list: List[str]) -> None:
    """Bulk import system."""
    print(f"📥 Importing {len(file_list)} files...")

    # 1. Read all files concurrently
    read_tasks = [read_csv_file(file) for file in file_list]
    user_list_groups = await asyncio.gather(*read_tasks)

    # 2. Merge all users
    all_users = []
    for user_list in user_list_groups:
        all_users.extend(user_list)

    # 3. Insert into the database in bulk
    async with aiosqlite.connect('users.db') as db:
        # Create the table
        await db.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                email TEXT
            )
        ''')

        # Batch insert
        imported_count = await import_users_to_database(db, all_users)

    print(f"✅ Import finished! {imported_count} users imported")
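
Splitting on commas breaks as soon as a field contains a quoted comma. One possible refinement, sketched here under the assumption that each file fits in memory, is to read the text asynchronously and hand it to the standard-library `csv` module. `parse_users` is a hypothetical helper, not part of the course code.

```python
import csv
import io
from typing import Dict, List


def parse_users(csv_text: str) -> List[Dict[str, str]]:
    """Parse CSV text with a header row into name/email dictionaries."""
    reader = csv.DictReader(io.StringIO(csv_text))
    users = []
    for row in reader:
        # Skip rows that lack either of the two required columns.
        if row.get("name") and row.get("email"):
            users.append({"name": row["name"], "email": row["email"]})
    return users


sample = 'name,email\n"Doe, Jane",jane@example.com\nBob,bob@example.com\n'
users = parse_users(sample)
print(users[0]["name"])  # Doe, Jane
```

In `read_csv_file`, this would replace the manual loop: read the whole file with `await f.read()` and pass the result to `parse_users`.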

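📊 Performance Comparison

Scenario: processing 50 files (approximate figures)

Operation         Synchronous   Asynchronous   Speedup
Read files        5 s           0.5 s          10x
Write files       5 s           0.5 s          10x
Database query    10 s          1 s            10x
Database insert   8 s           1 s            8x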

🎯 Advanced Tips

Tip 1: Streaming large files

async def stream_large_file(input_file: str, output_file: str):
    """Process a file as a stream to avoid running out of memory."""
    async with aiofiles.open(input_file, 'r') as f_in:
        async with aiofiles.open(output_file, 'w') as f_out:
            async for line in f_in:
                # Process each line
                processed_line = line.upper()
                await f_out.write(processed_line)
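
Line iteration assumes the file has line breaks at reasonable intervals. For binary data or files with very long lines, reading fixed-size chunks is an alternative. The sketch below avoids third-party dependencies by running blocking reads and writes in a worker thread with `asyncio.to_thread` (Python 3.9+). `copy_in_chunks` and the 64 KiB chunk size are illustrative assumptions. With aiofiles, the analogous loop would presumably await `f.read(chunk_size)` on the async file object instead.

```python
import asyncio
import os
import tempfile


async def copy_in_chunks(src: str, dst: str, chunk_size: int = 64 * 1024) -> int:
    """Copy src to dst chunk by chunk and return the number of bytes copied."""
    copied = 0
    with open(src, "rb") as f_in, open(dst, "wb") as f_out:
        while True:
            # The blocking read runs in a worker thread so the event loop stays free.
            chunk = await asyncio.to_thread(f_in.read, chunk_size)
            if not chunk:
                break
            await asyncio.to_thread(f_out.write, chunk)
            copied += len(chunk)
    return copied


async def demo() -> int:
    """Create a 200,000-byte temporary file, copy it, and return the byte count."""
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "in.bin")
        dst = os.path.join(tmp, "out.bin")
        with open(src, "wb") as f:
            f.write(b"x" * 200_000)
        return await copy_in_chunks(src, dst)
```

Only one chunk is held in memory at a time, so peak memory use is bounded by `chunk_size` regardless of the file size.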

Tip 2: Transactions

import aiosqlite

async def transaction_processing():
    """Use a transaction to keep the data consistent."""
    async with aiosqlite.connect('database.db') as db:
        try:
            # Begin the transaction
            await db.execute('BEGIN')

            # Run several statements
            await db.execute('INSERT INTO users ...')
            await db.execute('UPDATE accounts ...')

            # Commit the transaction
            await db.commit()

        except Exception:
            # Roll back the transaction, then re-raise with the original traceback
            await db.rollback()
            raise
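
To see what the rollback buys, the sketch below reproduces the same try/rollback shape with the standard-library `sqlite3` module and an in-memory database, so it runs without aiosqlite. The table layout, the function names, and the simulated failure are assumptions made for the demonstration.

```python
import sqlite3


def insert_then_fail(conn: sqlite3.Connection) -> None:
    """Insert one row, then fail before commit so the insert is rolled back."""
    try:
        conn.execute("INSERT INTO users (name) VALUES ('alice')")
        raise RuntimeError("simulated failure in the second step")
    except Exception:
        conn.rollback()
        raise


def demo() -> int:
    """Return the number of rows left in the table after the failed attempt."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    conn.commit()
    try:
        insert_then_fail(conn)
    except RuntimeError:
        pass
    count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    conn.close()
    return count
```

`demo()` returns 0: the insert ran, but the rollback discarded it, so no half-finished change reaches the table.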

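Tip 3: Connection pool

import asyncio
import aiosqlite

class DatabaseConnectionPool:
    """A simple stand-in for a database connection pool.

    Each query still opens its own connection; the semaphore only caps
    how many connections are open at the same time.
    """

    def __init__(self, db_path: str, max_connections: int = 5):
        self.db_path = db_path
        self.semaphore = asyncio.Semaphore(max_connections)

    async def execute_query(self, sql: str):
        """Run a query while holding one of the limited connection slots."""
        async with self.semaphore:
            async with aiosqlite.connect(self.db_path) as db:
                async with db.execute(sql) as cursor:
                    return await cursor.fetchall()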

📝 Lesson Summary

Key points

# 1. Read a file asynchronously
async with aiofiles.open(file_path, 'r') as f:
    content = await f.read()

# 2. Write a file asynchronously
async with aiofiles.open(file_path, 'w') as f:
    await f.write(content)

# 3. Query a database asynchronously
async with aiosqlite.connect('db.db') as db:
    async with db.execute(sql) as cursor:
        result = await cursor.fetchall()

# 4. Batch operations
task_list = [process_file(f) for f in file_list]
result = await asyncio.gather(*task_list)

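Best practices

  1. ✅ Manage resources with async with
  2. ✅ Batch operations for efficiency
  3. ✅ Stream large files
  4. ✅ Use transactions to ensure consistency
  5. ✅ Limit the number of concurrent operations

Common pitfalls

  1. ❌ Not closing file handles (leaks file descriptors and memory)
  2. ❌ Reading a huge file in one go (runs out of memory)
  3. ❌ Forgetting to commit a transaction (data is lost)
  4. ❌ Too much concurrency (exhausts resources)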
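
Limiting concurrency is usually done with `asyncio.Semaphore`. The sketch below is one minimal way to wrap `asyncio.gather` so that at most `limit` jobs run at once. `run_limited` is a hypothetical helper, and the jobs only sleep to simulate I/O.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_limited(jobs: List[Callable[[], Awaitable[int]]], limit: int) -> List[int]:
    """Run all jobs, allowing at most `limit` of them to execute at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(job: Callable[[], Awaitable[int]]) -> int:
        async with semaphore:
            return await job()

    return await asyncio.gather(*(guarded(job) for job in jobs))


async def demo(total: int = 10, limit: int = 3) -> int:
    """Return the highest number of jobs observed running simultaneously."""
    running = 0
    peak = 0

    async def job() -> int:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # simulated I/O wait
        running -= 1
        return 1

    await run_limited([job] * total, limit)
    return peak
```

With 10 jobs and a limit of 3, `demo()` never observes more than 3 jobs in flight, which is the property that keeps file descriptors and database connections from being exhausted.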

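🎯 Next Steps

  1. Run 05_examples.py to see the complete examples
  2. Try processing real files and databases
  3. Complete the exercises
  4. Get ready for Lesson 6: Common Pitfalls and Best Practices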

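💪 Exercises

Exercise 1: Log analyzer

Write a program that analyzes multiple log files and counts the entries at each log level.

async def analyze_log(file_path: str) -> Dict[str, int]:
    # Count the INFO, WARNING, and ERROR entries
    # Your code here
    pass

Exercise 2: CSV to JSON

Convert CSV files to JSON files in bulk.

async def csv_to_json(csv_file: str, json_file: str):
    # Your code here
    pass

Exercise 3: Database backup

Implement an asynchronous database backup tool.

async def backup_database(source_database: str, target_database: str):
    # Your code here
    pass

The answers are in 练习答案.py (exercise answers)! 😊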

05_examples.py

"""
Lesson 5 example code: asynchronous file and database operations

Note: this example creates temporary files in the current directory
and cleans them up automatically when it finishes.

Usage:
    python 05_examples.py
"""

import asyncio
import time
import os
from typing import List, Dict


# ============================================
# Mock aiofiles (for demonstration; install aiofiles for real use)
# ============================================

class AsyncFile:
    """Mock asynchronous file object."""

    def __init__(self, file_path: str, mode: str, encoding: str = 'utf-8'):
        self.file_path = file_path
        self.mode = mode
        self.encoding = encoding
        self._file = None

    async def __aenter__(self):
        await asyncio.sleep(0.05)  # simulated I/O latency
        self._file = open(self.file_path, self.mode, encoding=self.encoding)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._file:
            self._file.close()

    async def read(self) -> str:
        await asyncio.sleep(0.1)  # simulated I/O latency
        return self._file.read()

    async def write(self, content: str) -> None:
        await asyncio.sleep(0.1)  # simulated I/O latency
        self._file.write(content)

    async def readline(self) -> str:
        await asyncio.sleep(0.02)  # simulated I/O latency
        return self._file.readline()

    def __aiter__(self):
        return self

    async def __anext__(self):
        await asyncio.sleep(0.02)  # simulated I/O latency
        line = self._file.readline()
        if line:
            return line
        raise StopAsyncIteration


class aiofiles:
    """Mock of the aiofiles module."""

    @staticmethod
    def open(file_path: str, mode: str = 'r', encoding: str = 'utf-8'):
        return AsyncFile(file_path, mode, encoding)


# ============================================
# Example 1: basic file operations
# ============================================

async def read_file(file_path: str) -> str:
    """Read a file asynchronously."""
    async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
        content = await f.read()
    return content


async def write_file(file_path: str, content: str) -> None:
    """Write a file asynchronously."""
    async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
        await f.write(content)


async def example1_basic_file_operations() -> None:
    """Example 1: basic asynchronous file operations."""
    print("\n" + "=" * 50)
    print("📚 Example 1: basic asynchronous file operations")
    print("=" * 50)

    # Create a temporary file
    temp_file = "temp_test.txt"

    # Write the file
    print("📝 Writing file...")
    content = "This is a test file\nIt contains several lines\nIt demonstrates asynchronous file operations"
    await write_file(temp_file, content)
    print("✅ Write finished")

    # Read the file
    print("\n📖 Reading file...")
    read_content = await read_file(temp_file)
    print(f"✅ Read finished:\n{read_content}")

    # Clean up
    if os.path.exists(temp_file):
        os.remove(temp_file)

    print("\n💡 Key points:")
    print("  1. Use async with to manage files")
    print("  2. Use await to wait for I/O operations")
    print("  3. File handles are closed automatically")


# ============================================
# Example 2: batch file processing - synchronous vs asynchronous
# ============================================

async def process_single_file(file_path: str, file_index: int) -> Dict[str, object]:
    """Process a single file."""
    try:
        # Read the file
        async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
            content = await f.read()

        # Process the content (convert to upper case)
        processed_content = content.upper()

        # Write the new file
        output_file = f"temp_output_{file_index}.txt"
        async with aiofiles.open(output_file, 'w', encoding='utf-8') as f:
            await f.write(processed_content)

        return {
            "input_file": file_path,
            "output_file": output_file,
            "char_count": len(content),
            "success": True
        }

    except Exception as e:
        return {
            "input_file": file_path,
            "error": str(e),
            "success": False
        }


async def batch_process_sync(file_list: List[str]) -> None:
    """Synchronous style: each file finishes before the next one starts."""
    print("\n" + "=" * 50)
    print("📚 Example 2A: batch file processing - synchronous style")
    print("=" * 50)

    start_time = time.time()
    result_list = []

    # Process the files one at a time
    for i, file in enumerate(file_list, 1):
        print(f"  [{i}/{len(file_list)}] Processing: {file}")
        result = await process_single_file(file, i)
        result_list.append(result)

    total_time = time.time() - start_time

    print(f"\n✅ Done! Total time: {total_time:.2f} s")
    print(f"Succeeded: {sum(1 for r in result_list if r['success'])}/{len(result_list)}")


async def batch_process_async(file_list: List[str]) -> None:
    """Asynchronous style: process all files concurrently."""
    print("\n" + "=" * 50)
    print("📚 Example 2B: batch file processing - asynchronous style")
    print("=" * 50)

    start_time = time.time()

    # Process all files concurrently
    task_list = [
        process_single_file(file, i + 1)
        for i, file in enumerate(file_list)
    ]

    print(f"🚀 Processing {len(task_list)} files concurrently...")
    result_list = await asyncio.gather(*task_list)

    total_time = time.time() - start_time

    print(f"\n✅ Done! Total time: {total_time:.2f} s")
    print(f"Succeeded: {sum(1 for r in result_list if r['success'])}/{len(result_list)}")
    print(f"💡 Speedup: up to about {len(file_list)}x in this simulation")


# ============================================
# Example 3: log file analysis
# ============================================

async def analyze_log_file(file_path: str) -> Dict[str, object]:
    """Analyze a single log file."""
    try:
        stats = {
            "INFO": 0,
            "WARNING": 0,
            "ERROR": 0,
            "total_lines": 0
        }

        error_list = []

        async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
            async for line in f:
                stats["total_lines"] += 1

                if "INFO" in line:
                    stats["INFO"] += 1
                elif "WARNING" in line:
                    stats["WARNING"] += 1
                elif "ERROR" in line:
                    stats["ERROR"] += 1
                    error_list.append(line.strip())

        return {
            "file": file_path,
            "stats": stats,
            "error_list": error_list[:3],  # keep only the first 3
            "success": True
        }

    except Exception as e:
        return {
            "file": file_path,
            "error": str(e),
            "success": False
        }


async def example3_log_analysis() -> None:
    """Example 3: batch log file analysis."""
    print("\n" + "=" * 50)
    print("📚 Example 3: batch log file analysis")
    print("=" * 50)

    # Create simulated log files
    log_file_list = []
    for i in range(1, 6):
        filename = f"temp_log_{i}.txt"
        log_content = f"""2024-01-{i:02d} 10:00:00 INFO System started
2024-01-{i:02d} 10:01:00 INFO Configuration loaded
2024-01-{i:02d} 10:02:00 WARNING High memory usage
2024-01-{i:02d} 10:03:00 ERROR Database connection failed
2024-01-{i:02d} 10:04:00 INFO Retrying connection
2024-01-{i:02d} 10:05:00 ERROR Connection timed out
"""
        await write_file(filename, log_content)
        log_file_list.append(filename)

    print(f"📝 Analyzing {len(log_file_list)} log files...")
    start_time = time.time()

    # Analyze all logs concurrently
    task_list = [analyze_log_file(file) for file in log_file_list]
    result_list = await asyncio.gather(*task_list)

    total_time = time.time() - start_time

    # Aggregate the statistics
    total_stats = {"INFO": 0, "WARNING": 0, "ERROR": 0, "total_lines": 0}
    for result in result_list:
        if result["success"]:
            for key, value in result["stats"].items():
                total_stats[key] += value

    print(f"\n📊 Analysis report (took {total_time:.2f} s):")
    print("-" * 50)
    print(f"Total lines: {total_stats['total_lines']}")
    print(f"INFO: {total_stats['INFO']}")
    print(f"WARNING: {total_stats['WARNING']}")
    print(f"ERROR: {total_stats['ERROR']}")
    print("-" * 50)

    # Clean up temporary files
    for file in log_file_list:
        if os.path.exists(file):
            os.remove(file)


# ============================================
# Example 4: CSV data processing
# ============================================

async def read_csv(file_path: str) -> List[Dict[str, str]]:
    """Read a CSV file."""
    data_list = []

    async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
        # Read the header line
        title_line = await f.readline()
        field_names = title_line.strip().split(',')

        # Read the data lines
        async for line in f:
            field_values = line.strip().split(',')
            if len(field_values) == len(field_names):
                data = dict(zip(field_names, field_values))
                data_list.append(data)

    return data_list


async def write_csv(file_path: str, data_list: List[Dict[str, str]]) -> None:
    """Write a CSV file."""
    if not data_list:
        return

    async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
        # Write the header line
        field_names = list(data_list[0].keys())
        await f.write(','.join(field_names) + '\n')

        # Write the data lines
        for data in data_list:
            field_values = [str(data.get(field, '')) for field in field_names]
            await f.write(','.join(field_values) + '\n')


async def example4_csv_processing() -> None:
    """Example 4: batch CSV data processing."""
    print("\n" + "=" * 50)
    print("📚 Example 4: batch CSV data processing")
    print("=" * 50)

    # Create simulated CSV files
    csv_file_list = []
    for i in range(1, 4):
        filename = f"temp_users_{i}.csv"
        csv_content = f"""name,email,age
User{i}1,user{i}1@example.com,25
User{i}2,user{i}2@example.com,30
User{i}3,user{i}3@example.com,28
"""
        await write_file(filename, csv_content)
        csv_file_list.append(filename)

    print(f"📥 Reading {len(csv_file_list)} CSV files...")
    start_time = time.time()

    # Read all CSV files concurrently
    task_list = [read_csv(file) for file in csv_file_list]
    data_list_groups = await asyncio.gather(*task_list)

    # Merge all data
    all_data = []
    for data_list in data_list_groups:
        all_data.extend(data_list)

    total_time = time.time() - start_time

    print(f"✅ Read finished (took {total_time:.2f} s)")
    print(f"Read {len(all_data)} records in total")

    # Write the merged file
    print("\n📤 Writing merged file...")
    merged_file = "temp_merged_users.csv"
    await write_csv(merged_file, all_data)
    print(f"✅ Write finished: {merged_file}")

    # Clean up temporary files
    for file in csv_file_list + [merged_file]:
        if os.path.exists(file):
            os.remove(file)


# ============================================
# Example 5: streaming a large file
# ============================================

async def stream_large_file(input_file: str, output_file: str) -> int:
    """Process a large file as a stream and return the number of lines handled."""
    processed_line_count = 0

    async with aiofiles.open(input_file, 'r', encoding='utf-8') as f_in:
        async with aiofiles.open(output_file, 'w', encoding='utf-8') as f_out:
            async for line in f_in:
                # Process each line (here, simply convert to upper case)
                processed_line = line.upper()
                await f_out.write(processed_line)
                processed_line_count += 1

    return processed_line_count


async def example5_stream_processing() -> None:
    """Example 5: streaming a large file."""
    print("\n" + "=" * 50)
    print("📚 Example 5: streaming a large file")
    print("=" * 50)

    # Create a simulated large file
    input_file = "temp_large_file.txt"
    output_file = "temp_processed_file.txt"

    print("📝 Creating test file...")
    # Build 100 numbered lines (the original string lacked the f prefix)
    content = "".join(f"This is line {i} of the data\n" for i in range(1, 101))
    await write_file(input_file, content)

    print("🔄 Streaming the file...")
    start_time = time.time()

    processed_line_count = await stream_large_file(input_file, output_file)

    total_time = time.time() - start_time

    print(f"✅ Processing finished (took {total_time:.2f} s)")
    print(f"Processed {processed_line_count} lines in total")

    print("\n💡 Key points:")
    print("  1. Process line by line instead of loading everything at once")
    print("  2. Suitable for files in the gigabyte range")
    print("  3. Low memory footprint")

    # Clean up
    for file in [input_file, output_file]:
        if os.path.exists(file):
            os.remove(file)


# ============================================
# Example 6: monitoring and processing a file
# ============================================

async def monitor_and_process_file(file_path: str, check_interval: float = 1.0) -> None:
    """Watch a file for growth and process it when it changes."""
    last_size = 0

    for _ in range(3):  # check 3 times
        if os.path.exists(file_path):
            current_size = os.path.getsize(file_path)

            if current_size > last_size:
                print(f"  📊 File grew: {last_size} -> {current_size} bytes")

                # Re-read the whole file (a real monitor would read only the new part)
                async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
                    content = await f.read()
                    print(f"  📖 Current line count: {len(content.splitlines())}")

                last_size = current_size

        await asyncio.sleep(check_interval)


async def example6_file_monitoring() -> None:
    """Example 6: monitoring and processing a file."""
    print("\n" + "=" * 50)
    print("📚 Example 6: monitoring and processing a file")
    print("=" * 50)

    monitor_file = "temp_monitor.txt"

    # Create the initial file
    await write_file(monitor_file, "initial content\n")

    # Start the monitoring task
    monitor_task = asyncio.create_task(monitor_and_process_file(monitor_file, 0.5))

    # Simulate changes to the file
    await asyncio.sleep(0.6)
    async with aiofiles.open(monitor_file, 'a', encoding='utf-8') as f:
        await f.write("added content 1\n")

    await asyncio.sleep(0.6)
    async with aiofiles.open(monitor_file, 'a', encoding='utf-8') as f:
        await f.write("added content 2\n")

    # Wait for monitoring to finish
    await monitor_task

    print("\n💡 Key points:")
    print("  1. Use create_task to start a background monitoring task")
    print("  2. Check the file for changes at regular intervals")
    print("  3. Useful for log monitoring, data collection, and similar scenarios")

    # Clean up
    if os.path.exists(monitor_file):
        os.remove(monitor_file)


# ============================================
# Main program
# ============================================

async def main() -> None:
    """Main program: run all examples."""
    print("🎓 Lesson 5: asynchronous file and database operations")
    print("=" * 50)

    # Run all examples
    await example1_basic_file_operations()

    # Create test files
    test_file_list = []
    for i in range(1, 6):
        filename = f"temp_input_{i}.txt"
        await write_file(filename, f"test file{i}\ncontains some content\n")
        test_file_list.append(filename)

    await batch_process_sync(test_file_list)
    await batch_process_async(test_file_list)

    # Clean up test files
    for file in test_file_list:
        if os.path.exists(file):
            os.remove(file)

    # Clean up output files
    for i in range(1, len(test_file_list) + 1):
        output_file = f"temp_output_{i}.txt"
        if os.path.exists(output_file):
            os.remove(output_file)

    await example3_log_analysis()
    await example4_csv_processing()
    await example5_stream_processing()
    await example6_file_monitoring()

    # Summary
    print("\n" + "=" * 50)
    print("🎉 Lesson 5 complete!")
    print("=" * 50)
    print("""
📚 What did you learn?
1. Using aiofiles for asynchronous file operations
2. Processing files in bulk, several times faster
3. Log file analysis and CSV data processing
4. Streaming large files
5. File monitoring and real-time processing

🎯 Core code:
# Asynchronous read
async with aiofiles.open(file, 'r') as f:
    content = await f.read()

# Asynchronous write
async with aiofiles.open(file, 'w') as f:
    await f.write(content)

# Batch processing
task_list = [process_file(f) for f in file_list]
result = await asyncio.gather(*task_list)

💡 Best practices:
1. Manage resources with async with
2. Batch operations for efficiency
3. Stream large files
4. Clean up temporary files promptly
5. Add error handling

⚠️ Things to watch:
1. Do not read a huge file in one go
2. Remember to close file handles
3. Handle encoding issues
4. Limit the number of concurrent operations

💪 Hands-on practice:
1. Install the real libraries: uv pip install aiofiles aiosqlite
2. Process real log files
3. Implement batch CSV conversion
4. Complete the exercises

🎯 Next step:
Learn about common pitfalls and best practices (Lesson 6)
""")


if __name__ == "__main__":
    # Run the main program
    asyncio.run(main())