前面学习了asyncio的基础知识,现在是时候在真实Web框架中应用了!FastAPI是现代Python Web框架,天然支持异步,性能极高。本课将教你如何在FastAPI中使用异步编程,包括异步路由、数据库操作、外部API调用、文件处理等。我们将构建一个完整的异步API服务,学习请求并发处理、连接池管理、性能优化等生产环境必备技能。学完后你将能够开发高性能的异步Web应用。
📖 课程目标 掌握FastAPI异步路由的编写 学会异步数据库操作(SQLAlchemy + asyncpg) 实现异步外部API调用和数据聚合 掌握异步文件上传和处理 学习连接池和并发控制 完成实战项目:异步API服务 🎯 为什么选择FastAPI? FastAPI的优势 特性 说明 优势 天然异步 基于Starlette,完全支持async/await 高性能 自动文档 自动生成OpenAPI文档 易于测试 类型提示 基于Pydantic,强类型验证 代码健壮 高性能 性能接近Node.js和Go 快速响应 易用性 语法简洁,学习曲线平缓 快速开发 
性能对比 框架 请求/秒 延迟 Django(同步) ~1,000 高 Flask(同步) ~2,000 高 FastAPI(异步) ~30,000 低 Node.js ~35,000 低 
🚀 环境准备 安装依赖 1 2 3 4 5 6 7 8 9 10 11 cd  demo_13source  ../.venv/bin/activateuv pip install fastapi uvicorn[standard] uv pip install aiohttp aiofiles uv pip install sqlalchemy asyncpg aiosqlite uv pip install httpx   
项目结构 1 2 3 4 5 6 7 8 9 10 11 12 13 07_fastapi_project/ ├── main.py                 # 主应用 ├── models.py              # 数据模型 ├── database.py            # 数据库配置 ├── routers/               # 路由模块 │   ├── __init__.py │   ├── users.py          # 用户相关 │   ├── posts.py          # 文章相关 │   └── external.py       # 外部API ├── services/              # 业务逻辑 │   ├── __init__.py │   └── external_api.py   # 外部API服务 └── requirements.txt       # 依赖列表 
📝 基础示例 1. 创建第一个异步API 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from  fastapi import  FastAPIimport  asyncioapp = FastAPI(title="异步API示例" ) @app.get("/"  ) async  def  root ():    """根路由"""      return  {"message" : "Hello FastAPI Async!" } @app.get("/slow"  ) async  def  slow_endpoint ():    """模拟慢接口"""      await  asyncio.sleep(2 )       return  {"message" : "任务完成" } @app.get("/fast"  ) async  def  fast_endpoint ():    """快速接口"""      return  {"message" : "立即响应" } 
运行方式 :
1 uvicorn main:app --reload 
访问 http://localhost:8000/docs  查看自动生成的API文档!
💡 实战1:异步数据库操作 配置异步数据库 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 from  sqlalchemy.ext.asyncio import  create_async_engine, AsyncSessionfrom  sqlalchemy.ext.asyncio import  async_sessionmakerfrom  sqlalchemy.orm import  declarative_baseDATABASE_URL = "sqlite+aiosqlite:///./test.db"  engine = create_async_engine(     DATABASE_URL,     echo=True ,   ) AsyncSessionLocal = async_sessionmaker(     engine,     class_=AsyncSession,     expire_on_commit=False  ) Base = declarative_base() async  def  get_db ():    """获取数据库会话"""      async  with  AsyncSessionLocal() as  session:         yield  session async  def  init_db ():    """初始化数据库"""      async  with  engine.begin() as  conn:         await  conn.run_sync(Base.metadata.create_all) 
定义模型 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from  sqlalchemy import  Column, Integer, String, Text, DateTimefrom  datetime import  datetimefrom  database import  Baseclass  User (Base ):    """用户模型"""      __tablename__ = "users"           id  = Column(Integer, primary_key=True , index=True )     username = Column(String(50 ), unique=True , index=True )     email = Column(String(100 ), unique=True )     created_at = Column(DateTime, default=datetime.utcnow) class  Post (Base ):    """文章模型"""      __tablename__ = "posts"           id  = Column(Integer, primary_key=True , index=True )     title = Column(String(200 ))     content = Column(Text)     user_id = Column(Integer)     created_at = Column(DateTime, default=datetime.utcnow) 
异步CRUD操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 from  fastapi import  APIRouter, Depends, HTTPExceptionfrom  sqlalchemy.ext.asyncio import  AsyncSessionfrom  sqlalchemy import  selectfrom  database import  get_dbfrom  models import  Userfrom  pydantic import  BaseModelrouter = APIRouter(prefix="/users" , tags=["用户" ]) class  UserCreate (BaseModel ):    """用户创建模型"""      username: str      email: str  class  UserResponse (BaseModel ):    """用户响应模型"""      id : int      username: str      email: str           class  Config :         from_attributes = True  @router.post("/" , response_model=UserResponse ) async  def  create_user (    user: UserCreate,     db: AsyncSession = Depends(get_db )  ):    """创建用户(异步)"""           db_user = User(username=user.username, email=user.email)               db.add(db_user)     await  db.commit()     await  db.refresh(db_user)          return  db_user @router.get("/" , response_model=list [UserResponse] ) async  def  get_users (    skip: int  = 0 ,     limit: int  = 10 ,     db: AsyncSession = Depends(get_db )  ):    """获取用户列表(异步)"""      result = await  db.execute(         select(User).offset(skip).limit(limit)     )     users = result.scalars().all ()     return  users @router.get("/{user_id}" , response_model=UserResponse ) async  def  get_user (    user_id: int ,     db: AsyncSession = Depends(get_db )  ):    """获取单个用户(异步)"""      result = await  db.execute(         select(User).where(User.id  == user_id)     )     user = result.scalar_one_or_none()          if  not  user:         raise  HTTPException(status_code=404 , detail="用户不存在" )          return  user 
💡 实战2:异步外部API调用 场景:聚合多个API数据 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 import  aiohttpimport  asynciofrom  typing import  List , Dict class  ExternalAPIService :    """外部API服务"""           def  __init__ (self ):         self.timeout = aiohttp.ClientTimeout(total=10 )          async  def  fetch_weather (self, city: str  ) -> Dict :         """获取天气信息"""                   await  asyncio.sleep(0.5 )           return  {             "city" : city,             "temperature" : 25 ,             "weather" : "晴天"          }          async  def  fetch_news (self, category: str  ) -> List [Dict ]:         """获取新闻"""          await  asyncio.sleep(0.8 )           return  [             {"title" : f"{category} 新闻1" , "source" : "新闻网A" },             {"title" : f"{category} 新闻2" , "source" : "新闻网B" }         ]          async  def  fetch_stock (self, symbol: str  ) -> Dict :         """获取股票信息"""          await  asyncio.sleep(0.3 )           return  {             "symbol" : symbol,             "price" : 150.5 ,             "change" : "+2.5%"          }          async  def  aggregate_data (self, city: str  ) -> Dict :         """聚合多个API数据(并发)"""                   weather, news, stock = await  asyncio.gather(             self.fetch_weather(city),             self.fetch_news("科技" ),             self.fetch_stock("AAPL" ),             return_exceptions=True            )                  return  {             "weather" : weather,             "news" : news,             "stock" : stock         } external_api_service = ExternalAPIService() 
路由实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from  fastapi import  APIRouterfrom  services.external_api import  external_api_servicerouter = APIRouter(prefix="/external" , tags=["外部API" ]) @router.get("/aggregate/{city}"  ) async  def  get_aggregate_data (city: str  ):    """      聚合数据接口          同时调用天气、新闻、股票API,大幅提升响应速度     """     data = await  external_api_service.aggregate_data(city)     return  data @router.get("/weather/{city}"  ) async  def  get_weather (city: str  ):    """获取天气"""      return  await  external_api_service.fetch_weather(city) 
💡 实战3:异步文件上传和处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 from  fastapi import  APIRouter, UploadFile, Fileimport  aiofilesimport  asynciofrom  pathlib import  Pathrouter = APIRouter(prefix="/files" , tags=["文件" ]) UPLOAD_DIR = Path("uploads" ) UPLOAD_DIR.mkdir(exist_ok=True ) @router.post("/upload"  ) async  def  upload_file (file: UploadFile = File(... ) ):    """异步文件上传"""      file_path = UPLOAD_DIR / file.filename               async  with  aiofiles.open (file_path, 'wb' ) as  f:         content = await  file.read()         await  f.write(content)          return  {         "filename" : file.filename,         "size" : len (content),         "path" : str (file_path)     } @router.post("/upload-multiple"  ) async  def  upload_multiple_files (files: List [UploadFile] = File(... ) ):    """批量异步上传"""           async  def  save_file (file: UploadFile ):         file_path = UPLOAD_DIR / file.filename         async  with  aiofiles.open (file_path, 'wb' ) as  f:             content = await  file.read()             await  f.write(content)         return  {             "filename" : file.filename,             "size" : len (content)         }               results = await  asyncio.gather(         *[save_file(file) for  file in  files]     )          return  {"files" : results, "total" : len (results)} @router.get("/process/{filename}"  ) async  def  process_file (filename: str  ):    """异步处理文件"""      file_path = UPLOAD_DIR / filename          if  not  file_path.exists():         raise  HTTPException(status_code=404 , detail="文件不存在" )               async  with  aiofiles.open (file_path, 'r' , encoding='utf-8' ) as  f:         content = await  f.read()                  processed = content.upper()          return  {         "filename" : filename,         "original_length" : len (content),         "processed_length" : len (processed)     } 
💡 实战4:WebSocket实时通信 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from  fastapi import  WebSocket, WebSocketDisconnectfrom  typing import  List class  ConnectionManager :    """WebSocket连接管理器"""           def  __init__ (self ):         self.active_connections: List [WebSocket] = []          async  def  connect (self, websocket: WebSocket ):         """接受连接"""          await  websocket.accept()         self.active_connections.append(websocket)          def  disconnect (self, websocket: WebSocket ):         """断开连接"""          self.active_connections.remove(websocket)          async  def  broadcast (self, message: str  ):         """广播消息"""          for  connection in  self.active_connections:             await  connection.send_text(message) manager = ConnectionManager() @app.websocket("/ws/{client_id}"  ) async  def  websocket_endpoint (websocket: WebSocket, client_id: str  ):    """WebSocket端点"""      await  manager.connect(websocket)          try :         while  True :                          data = await  websocket.receive_text()                                       await  manager.broadcast(f"客户端 {client_id} : {data} " )          except  WebSocketDisconnect:         manager.disconnect(websocket)         await  manager.broadcast(f"客户端 {client_id}  断开连接" ) 
🎯 性能优化技巧 1. 连接池配置 1 2 3 4 5 6 7 8 9 engine = create_async_engine(     DATABASE_URL,     echo=False ,     pool_size=20 ,             max_overflow=10 ,          pool_timeout=30 ,          pool_recycle=3600 ,    ) 
2. 限制并发 1 2 3 4 5 6 7 8 9 10 11 12 from  asyncio import  Semaphorerate_limiter = Semaphore(10 )   @router.get("/limited"  ) async  def  limited_endpoint ():    """限流接口"""      async  with  rate_limiter:                  await  asyncio.sleep(1 )         return  {"message" : "完成" } 
3. 缓存优化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from  functools import  lru_cacheimport  asyncio_cache = {} async  def  get_cached_data (key: str  ):    """带缓存的数据获取"""      if  key in  _cache:         return  _cache[key]               await  asyncio.sleep(1 )     data = f"数据_{key} "           _cache[key] = data     return  data @router.get("/cached/{key}"  ) async  def  cached_endpoint (key: str  ):    """缓存接口"""      data = await  get_cached_data(key)     return  {"data" : data, "cached" : key in  _cache} 
4. 后台任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from  fastapi import  BackgroundTasksasync  def  send_email (email: str , message: str  ):    """发送邮件(后台任务)"""      await  asyncio.sleep(2 )       print (f"邮件已发送到 {email} : {message} " ) @router.post("/send-notification"  ) async  def  send_notification (    email: str ,     message: str ,     background_tasks: BackgroundTasks  ):    """发送通知(立即返回)"""           background_tasks.add_task(send_email, email, message)          return  {"message" : "通知已加入队列" } 
📊 性能测试 使用httpx进行压力测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import  asyncioimport  httpximport  timeasync  def  test_concurrent_requests ():    """测试并发性能"""      url = "http://localhost:8000/external/weather/北京"           async  with  httpx.AsyncClient() as  client:                  start_time = time.time()                  tasks = [client.get(url) for  _ in  range (100 )]         responses = await  asyncio.gather(*tasks)                  elapsed = time.time() - start_time                  print (f"完成100个请求,耗时:{elapsed:.2 f} 秒" )         print (f"QPS:{100 /elapsed:.0 f} " )         print (f"成功:{sum (1  for  r in  responses if  r.status_code == 200 )} " ) if  __name__ == "__main__" :    asyncio.run(test_concurrent_requests()) 
📝 完整项目示例 查看 07_fastapi_project/ 目录中的完整示例项目!
源码仓库:https://gitee.com/uyynot/asynciofastapiproject.git 
运行项目 1 2 3 4 5 6 7 8 9 10 11 cd  07_fastapi_projectuvicorn main:app --reload --port 8000 python test_performance.py 
🎯 最佳实践 1. 依赖注入 1 2 3 4 5 6 7 8 9 10 11 12 from  fastapi import  Dependsasync  def  get_current_user (token: str  = Header(... ) ):    """获取当前用户(依赖)"""           return  {"user_id" : 1 , "username" : "admin" } @router.get("/me"  ) async  def  read_users_me (current_user = Depends(get_current_user ) ):    """获取当前用户信息"""      return  current_user 
2. 错误处理 1 2 3 4 5 6 7 8 9 10 from  fastapi import  HTTPExceptionfrom  fastapi.responses import  JSONResponse@app.exception_handler(Exception ) async  def  global_exception_handler (request, exc ):    """全局异常处理"""      return  JSONResponse(         status_code=500 ,         content={"message" : "服务器内部错误" , "detail" : str (exc)}     ) 
3. 中间件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from  fastapi.middleware.cors import  CORSMiddlewareimport  time@app.middleware("http"  ) async  def  add_process_time_header (request, call_next ):    """添加响应时间中间件"""      start_time = time.time()     response = await  call_next(request)     process_time = time.time() - start_time     response.headers["X-Process-Time" ] = str (process_time)     return  response app.add_middleware(     CORSMiddleware,     allow_origins=["*" ],     allow_credentials=True ,     allow_methods=["*" ],     allow_headers=["*" ], ) 
📝 本课小结 核心知识点 FastAPI异步路由  - 使用 async def 定义路由异步数据库  - SQLAlchemy + asyncpg/aiosqlite外部API调用  - 并发调用多个API文件处理  - 异步上传和批量处理WebSocket  - 实时通信性能优化  - 连接池、限流、缓存FastAPI vs Django 特性 FastAPI Django 异步支持 ✅ 原生支持 ⚠️ 部分支持 性能 ⚡ 极高 🐢 一般 学习曲线 📈 平缓 📈 陡峭 文档 ✅ 自动生成 ❌ 需手动 适用场景 API服务 全栈应用 
何时使用FastAPI异步 ✅ 高并发API服务 ✅ 大量外部API调用 ✅ 实时数据处理 ✅ WebSocket应用 ✅ 微服务架构 🎯 下一步 运行完整示例项目 尝试添加新的功能 进行性能测试和优化 部署到生产环境 💪 课后练习 练习1:实现用户认证 使用JWT实现异步用户认证系统。
练习2:数据聚合API 创建一个API,并发调用3个外部服务并聚合数据。
练习3:文件批量处理 实现批量图片压缩API。
答案在 07_fastapi_project/ 目录中!  😊
🎉 恭喜! 你已经掌握了FastAPI异步开发,可以开发高性能的Web应用了!
继续加油!  🚀