07_FastAPI异步实战开发

前面学习了asyncio的基础知识,现在是时候在真实Web框架中应用了!FastAPI是现代Python Web框架,天然支持异步,性能极高。本课将教你如何在FastAPI中使用异步编程,包括异步路由、数据库操作、外部API调用、文件处理等。我们将构建一个完整的异步API服务,学习请求并发处理、连接池管理、性能优化等生产环境必备技能。学完后你将能够开发高性能的异步Web应用。


📖 课程目标

  • 掌握FastAPI异步路由的编写
  • 学会异步数据库操作(SQLAlchemy + asyncpg)
  • 实现异步外部API调用和数据聚合
  • 掌握异步文件上传和处理
  • 学习连接池和并发控制
  • 完成实战项目:异步API服务

🎯 为什么选择FastAPI?

FastAPI的优势

特性说明优势
天然异步基于Starlette,完全支持async/await高性能
自动文档自动生成OpenAPI文档易于测试
类型提示基于Pydantic,强类型验证代码健壮
高性能性能接近Node.js和Go快速响应
易用性语法简洁,学习曲线平缓快速开发

性能对比

框架请求/秒延迟
Django(同步)~1,000
Flask(同步)~2,000
FastAPI(异步)~30,000
Node.js~35,000

🚀 环境准备

安装依赖

1
2
3
4
5
6
7
8
9
10
11
# 进入项目目录
cd demo_13

# 激活虚拟环境
source ../.venv/bin/activate

# 安装FastAPI和相关库
uv pip install fastapi uvicorn[standard]
uv pip install aiohttp aiofiles
uv pip install sqlalchemy asyncpg aiosqlite
uv pip install httpx # 异步HTTP客户端

项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
07_fastapi_project/
├── main.py # 主应用
├── models.py # 数据模型
├── database.py # 数据库配置
├── routers/ # 路由模块
│ ├── __init__.py
│ ├── users.py # 用户相关
│ ├── posts.py # 文章相关
│ └── external.py # 外部API
├── services/ # 业务逻辑
│ ├── __init__.py
│ └── external_api.py # 外部API服务
└── requirements.txt # 依赖列表

📝 基础示例

1. 创建第一个异步API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from fastapi import FastAPI
import asyncio

app = FastAPI(title="异步API示例")

@app.get("/")
async def root():
"""根路由"""
return {"message": "Hello FastAPI Async!"}

@app.get("/slow")
async def slow_endpoint():
"""模拟慢接口"""
await asyncio.sleep(2) # 模拟耗时操作
return {"message": "任务完成"}

@app.get("/fast")
async def fast_endpoint():
"""快速接口"""
return {"message": "立即响应"}

运行方式

1
uvicorn main:app --reload

访问 http://localhost:8000/docs 查看自动生成的API文档!


💡 实战1:异步数据库操作

配置异步数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import declarative_base

# 数据库URL(SQLite异步)
DATABASE_URL = "sqlite+aiosqlite:///./test.db"

# 创建异步引擎
engine = create_async_engine(
DATABASE_URL,
echo=True, # 打印SQL语句
)

# 创建异步会话工厂
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)

Base = declarative_base()

async def get_db():
"""获取数据库会话"""
async with AsyncSessionLocal() as session:
yield session

async def init_db():
"""初始化数据库"""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)

定义模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# models.py
from sqlalchemy import Column, Integer, String, Text, DateTime
from datetime import datetime
from database import Base

class User(Base):
"""用户模型"""
__tablename__ = "users"

id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True)
email = Column(String(100), unique=True)
created_at = Column(DateTime, default=datetime.utcnow)

class Post(Base):
"""文章模型"""
__tablename__ = "posts"

id = Column(Integer, primary_key=True, index=True)
title = Column(String(200))
content = Column(Text)
user_id = Column(Integer)
created_at = Column(DateTime, default=datetime.utcnow)

异步CRUD操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# routers/users.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from database import get_db
from models import User
from pydantic import BaseModel

router = APIRouter(prefix="/users", tags=["用户"])

class UserCreate(BaseModel):
"""用户创建模型"""
username: str
email: str

class UserResponse(BaseModel):
"""用户响应模型"""
id: int
username: str
email: str

class Config:
from_attributes = True

@router.post("/", response_model=UserResponse)
async def create_user(
user: UserCreate,
db: AsyncSession = Depends(get_db)
):
"""创建用户(异步)"""
# 创建用户对象
db_user = User(username=user.username, email=user.email)

# 添加到数据库
db.add(db_user)
await db.commit()
await db.refresh(db_user)

return db_user

@router.get("/", response_model=list[UserResponse])
async def get_users(
skip: int = 0,
limit: int = 10,
db: AsyncSession = Depends(get_db)
):
"""获取用户列表(异步)"""
result = await db.execute(
select(User).offset(skip).limit(limit)
)
users = result.scalars().all()
return users

@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int,
db: AsyncSession = Depends(get_db)
):
"""获取单个用户(异步)"""
result = await db.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()

if not user:
raise HTTPException(status_code=404, detail="用户不存在")

return user

💡 实战2:异步外部API调用

场景:聚合多个API数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# services/external_api.py
import aiohttp
import asyncio
from typing import List, Dict

class ExternalAPIService:
"""外部API服务"""

def __init__(self):
self.timeout = aiohttp.ClientTimeout(total=10)

async def fetch_weather(self, city: str) -> Dict:
"""获取天气信息"""
# 实际项目中替换为真实API
await asyncio.sleep(0.5) # 模拟API延迟
return {
"city": city,
"temperature": 25,
"weather": "晴天"
}

async def fetch_news(self, category: str) -> List[Dict]:
"""获取新闻"""
await asyncio.sleep(0.8) # 模拟API延迟
return [
{"title": f"{category}新闻1", "source": "新闻网A"},
{"title": f"{category}新闻2", "source": "新闻网B"}
]

async def fetch_stock(self, symbol: str) -> Dict:
"""获取股票信息"""
await asyncio.sleep(0.3) # 模拟API延迟
return {
"symbol": symbol,
"price": 150.5,
"change": "+2.5%"
}

async def aggregate_data(self, city: str) -> Dict:
"""聚合多个API数据(并发)"""
# 同时调用多个API
weather, news, stock = await asyncio.gather(
self.fetch_weather(city),
self.fetch_news("科技"),
self.fetch_stock("AAPL"),
return_exceptions=True # 某个失败不影响其他
)

return {
"weather": weather,
"news": news,
"stock": stock
}

# 创建全局服务实例
external_api_service = ExternalAPIService()

路由实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# routers/external.py
from fastapi import APIRouter
from services.external_api import external_api_service

router = APIRouter(prefix="/external", tags=["外部API"])

@router.get("/aggregate/{city}")
async def get_aggregate_data(city: str):
"""
聚合数据接口

同时调用天气、新闻、股票API,大幅提升响应速度
"""
data = await external_api_service.aggregate_data(city)
return data

@router.get("/weather/{city}")
async def get_weather(city: str):
"""获取天气"""
return await external_api_service.fetch_weather(city)

💡 实战3:异步文件上传和处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from fastapi import APIRouter, UploadFile, File
import aiofiles
import asyncio
from pathlib import Path

router = APIRouter(prefix="/files", tags=["文件"])

UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

@router.post("/upload")
async def upload_file(file: UploadFile = File(...)):
"""异步文件上传"""
file_path = UPLOAD_DIR / file.filename

# 异步写入文件
async with aiofiles.open(file_path, 'wb') as f:
content = await file.read()
await f.write(content)

return {
"filename": file.filename,
"size": len(content),
"path": str(file_path)
}

@router.post("/upload-multiple")
async def upload_multiple_files(files: List[UploadFile] = File(...)):
"""批量异步上传"""

async def save_file(file: UploadFile):
file_path = UPLOAD_DIR / file.filename
async with aiofiles.open(file_path, 'wb') as f:
content = await file.read()
await f.write(content)
return {
"filename": file.filename,
"size": len(content)
}

# 并发保存所有文件
results = await asyncio.gather(
*[save_file(file) for file in files]
)

return {"files": results, "total": len(results)}

@router.get("/process/{filename}")
async def process_file(filename: str):
"""异步处理文件"""
file_path = UPLOAD_DIR / filename

if not file_path.exists():
raise HTTPException(status_code=404, detail="文件不存在")

# 异步读取和处理
async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
content = await f.read()
# 模拟处理
processed = content.upper()

return {
"filename": filename,
"original_length": len(content),
"processed_length": len(processed)
}

💡 实战4:WebSocket实时通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from fastapi import WebSocket, WebSocketDisconnect
from typing import List

class ConnectionManager:
"""WebSocket连接管理器"""

def __init__(self):
self.active_connections: List[WebSocket] = []

async def connect(self, websocket: WebSocket):
"""接受连接"""
await websocket.accept()
self.active_connections.append(websocket)

def disconnect(self, websocket: WebSocket):
"""断开连接"""
self.active_connections.remove(websocket)

async def broadcast(self, message: str):
"""广播消息"""
for connection in self.active_connections:
await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
"""WebSocket端点"""
await manager.connect(websocket)

try:
while True:
# 接收消息
data = await websocket.receive_text()

# 广播给所有客户端
await manager.broadcast(f"客户端 {client_id}: {data}")

except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"客户端 {client_id} 断开连接")

🎯 性能优化技巧

1. 连接池配置

1
2
3
4
5
6
7
8
9
# 配置数据库连接池
engine = create_async_engine(
DATABASE_URL,
echo=False,
pool_size=20, # 连接池大小
max_overflow=10, # 最大溢出连接数
pool_timeout=30, # 获取连接超时
pool_recycle=3600, # 连接回收时间
)

2. 限制并发

1
2
3
4
5
6
7
8
9
10
11
12
from asyncio import Semaphore

# 全局限流器
rate_limiter = Semaphore(10) # 最多10个并发

@router.get("/limited")
async def limited_endpoint():
"""限流接口"""
async with rate_limiter:
# 执行耗时操作
await asyncio.sleep(1)
return {"message": "完成"}

3. 缓存优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from functools import lru_cache
import asyncio

# 简单的异步缓存
_cache = {}

async def get_cached_data(key: str):
"""带缓存的数据获取"""
if key in _cache:
return _cache[key]

# 模拟耗时查询
await asyncio.sleep(1)
data = f"数据_{key}"

_cache[key] = data
return data

@router.get("/cached/{key}")
async def cached_endpoint(key: str):
"""缓存接口"""
data = await get_cached_data(key)
return {"data": data, "cached": key in _cache}

4. 后台任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from fastapi import BackgroundTasks

async def send_email(email: str, message: str):
"""发送邮件(后台任务)"""
await asyncio.sleep(2) # 模拟发送
print(f"邮件已发送到 {email}: {message}")

@router.post("/send-notification")
async def send_notification(
email: str,
message: str,
background_tasks: BackgroundTasks
):
"""发送通知(立即返回)"""
# 添加后台任务
background_tasks.add_task(send_email, email, message)

return {"message": "通知已加入队列"}

📊 性能测试

使用httpx进行压力测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# test_performance.py
import asyncio
import httpx
import time

async def test_concurrent_requests():
"""测试并发性能"""
url = "http://localhost:8000/external/weather/北京"

async with httpx.AsyncClient() as client:
# 同时发送100个请求
start_time = time.time()

tasks = [client.get(url) for _ in range(100)]
responses = await asyncio.gather(*tasks)

elapsed = time.time() - start_time

print(f"完成100个请求,耗时:{elapsed:.2f}秒")
print(f"QPS:{100/elapsed:.0f}")
print(f"成功:{sum(1 for r in responses if r.status_code == 200)}")

if __name__ == "__main__":
asyncio.run(test_concurrent_requests())

📝 完整项目示例

查看 07_fastapi_project/ 目录中的完整示例项目!

源码仓库:https://gitee.com/uyynot/asynciofastapiproject.git

运行项目

1
2
3
4
5
6
7
8
9
10
11
# 进入项目目录
cd 07_fastapi_project

# 运行FastAPI服务
uvicorn main:app --reload --port 8000

# 访问API文档
# http://localhost:8000/docs

# 运行性能测试
python test_performance.py

🎯 最佳实践

1. 依赖注入

1
2
3
4
5
6
7
8
9
10
11
12
# 使用FastAPI的依赖注入
from fastapi import Depends

async def get_current_user(token: str = Header(...)):
"""获取当前用户(依赖)"""
# 验证token
return {"user_id": 1, "username": "admin"}

@router.get("/me")
async def read_users_me(current_user = Depends(get_current_user)):
"""获取当前用户信息"""
return current_user

2. 错误处理

1
2
3
4
5
6
7
8
9
10
from fastapi import HTTPException
from fastapi.responses import JSONResponse

@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
"""全局异常处理"""
return JSONResponse(
status_code=500,
content={"message": "服务器内部错误", "detail": str(exc)}
)

3. 中间件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from fastapi.middleware.cors import CORSMiddleware
import time

@app.middleware("http")
async def add_process_time_header(request, call_next):
"""添加响应时间中间件"""
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response

# CORS中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

📝 本课小结

核心知识点

  1. FastAPI异步路由 - 使用 async def 定义路由
  2. 异步数据库 - SQLAlchemy + asyncpg/aiosqlite
  3. 外部API调用 - 并发调用多个API
  4. 文件处理 - 异步上传和批量处理
  5. WebSocket - 实时通信
  6. 性能优化 - 连接池、限流、缓存

FastAPI vs Django

特性FastAPIDjango
异步支持✅ 原生支持⚠️ 部分支持
性能⚡ 极高🐢 一般
学习曲线📈 平缓📈 陡峭
文档✅ 自动生成❌ 需手动
适用场景API服务全栈应用

何时使用FastAPI异步

  • ✅ 高并发API服务
  • ✅ 大量外部API调用
  • ✅ 实时数据处理
  • ✅ WebSocket应用
  • ✅ 微服务架构

🎯 下一步

  1. 运行完整示例项目
  2. 尝试添加新的功能
  3. 进行性能测试和优化
  4. 部署到生产环境

💪 课后练习

练习1:实现用户认证

使用JWT实现异步用户认证系统。

练习2:数据聚合API

创建一个API,并发调用3个外部服务并聚合数据。

练习3:文件批量处理

实现批量图片压缩API。

答案在 07_fastapi_project/ 目录中! 😊


🎉 恭喜!

你已经掌握了FastAPI异步开发,可以开发高性能的Web应用了!

继续加油! 🚀