前面学习了asyncio的基础知识,现在是时候在真实Web框架中应用了!FastAPI是现代Python Web框架,天然支持异步,性能极高。本课将教你如何在FastAPI中使用异步编程,包括异步路由、数据库操作、外部API调用、文件处理等。我们将构建一个完整的异步API服务,学习请求并发处理、连接池管理、性能优化等生产环境必备技能。学完后你将能够开发高性能的异步Web应用。
📖 课程目标 掌握FastAPI异步路由的编写 学会异步数据库操作(SQLAlchemy + asyncpg) 实现异步外部API调用和数据聚合 掌握异步文件上传和处理 学习连接池和并发控制 完成实战项目:异步API服务 🎯 为什么选择FastAPI? FastAPI的优势 特性 说明 优势 天然异步 基于Starlette,完全支持async/await 高性能 自动文档 自动生成OpenAPI文档 易于测试 类型提示 基于Pydantic,强类型验证 代码健壮 高性能 性能接近Node.js和Go 快速响应 易用性 语法简洁,学习曲线平缓 快速开发
性能对比 框架 请求/秒 延迟 Django(同步) ~1,000 高 Flask(同步) ~2,000 高 FastAPI(异步) ~30,000 低 Node.js ~35,000 低
🚀 环境准备 安装依赖 1 2 3 4 5 6 7 8 9 10 11 cd demo_13source ../.venv/bin/activateuv pip install fastapi uvicorn[standard] uv pip install aiohttp aiofiles uv pip install sqlalchemy asyncpg aiosqlite uv pip install httpx
项目结构 1 2 3 4 5 6 7 8 9 10 11 12 13 07_fastapi_project/ ├── main.py # 主应用 ├── models.py # 数据模型 ├── database.py # 数据库配置 ├── routers/ # 路由模块 │ ├── __init__.py │ ├── users.py # 用户相关 │ ├── posts.py # 文章相关 │ └── external.py # 外部API ├── services/ # 业务逻辑 │ ├── __init__.py │ └── external_api.py # 外部API服务 └── requirements.txt # 依赖列表
📝 基础示例 1. 创建第一个异步API 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from fastapi import FastAPIimport asyncioapp = FastAPI(title="异步API示例" ) @app.get("/" ) async def root (): """根路由""" return {"message" : "Hello FastAPI Async!" } @app.get("/slow" ) async def slow_endpoint (): """模拟慢接口""" await asyncio.sleep(2 ) return {"message" : "任务完成" } @app.get("/fast" ) async def fast_endpoint (): """快速接口""" return {"message" : "立即响应" }
运行方式 :
1 uvicorn main:app --reload
访问 http://localhost:8000/docs 查看自动生成的API文档!
💡 实战1:异步数据库操作 配置异步数据库 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 from sqlalchemy.ext.asyncio import create_async_engine, AsyncSessionfrom sqlalchemy.ext.asyncio import async_sessionmakerfrom sqlalchemy.orm import declarative_baseDATABASE_URL = "sqlite+aiosqlite:///./test.db" engine = create_async_engine( DATABASE_URL, echo=True , ) AsyncSessionLocal = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False ) Base = declarative_base() async def get_db (): """获取数据库会话""" async with AsyncSessionLocal() as session: yield session async def init_db (): """初始化数据库""" async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all)
定义模型 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from sqlalchemy import Column, Integer, String, Text, DateTimefrom datetime import datetimefrom database import Baseclass User (Base ): """用户模型""" __tablename__ = "users" id = Column(Integer, primary_key=True , index=True ) username = Column(String(50 ), unique=True , index=True ) email = Column(String(100 ), unique=True ) created_at = Column(DateTime, default=datetime.utcnow) class Post (Base ): """文章模型""" __tablename__ = "posts" id = Column(Integer, primary_key=True , index=True ) title = Column(String(200 )) content = Column(Text) user_id = Column(Integer) created_at = Column(DateTime, default=datetime.utcnow)
异步CRUD操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 from fastapi import APIRouter, Depends, HTTPExceptionfrom sqlalchemy.ext.asyncio import AsyncSessionfrom sqlalchemy import selectfrom database import get_dbfrom models import Userfrom pydantic import BaseModelrouter = APIRouter(prefix="/users" , tags=["用户" ]) class UserCreate (BaseModel ): """用户创建模型""" username: str email: str class UserResponse (BaseModel ): """用户响应模型""" id : int username: str email: str class Config : from_attributes = True @router.post("/" , response_model=UserResponse ) async def create_user ( user: UserCreate, db: AsyncSession = Depends(get_db ) ): """创建用户(异步)""" db_user = User(username=user.username, email=user.email) db.add(db_user) await db.commit() await db.refresh(db_user) return db_user @router.get("/" , response_model=list [UserResponse] ) async def get_users ( skip: int = 0 , limit: int = 10 , db: AsyncSession = Depends(get_db ) ): """获取用户列表(异步)""" result = await db.execute( select(User).offset(skip).limit(limit) ) users = result.scalars().all () return users @router.get("/{user_id}" , response_model=UserResponse ) async def get_user ( user_id: int , db: AsyncSession = Depends(get_db ) ): """获取单个用户(异步)""" result = await db.execute( select(User).where(User.id == user_id) ) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404 , detail="用户不存在" ) return user
💡 实战2:异步外部API调用 场景:聚合多个API数据 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 import aiohttpimport asynciofrom typing import List , Dict class ExternalAPIService : """外部API服务""" def __init__ (self ): self.timeout = aiohttp.ClientTimeout(total=10 ) async def fetch_weather (self, city: str ) -> Dict : """获取天气信息""" await asyncio.sleep(0.5 ) return { "city" : city, "temperature" : 25 , "weather" : "晴天" } async def fetch_news (self, category: str ) -> List [Dict ]: """获取新闻""" await asyncio.sleep(0.8 ) return [ {"title" : f"{category} 新闻1" , "source" : "新闻网A" }, {"title" : f"{category} 新闻2" , "source" : "新闻网B" } ] async def fetch_stock (self, symbol: str ) -> Dict : """获取股票信息""" await asyncio.sleep(0.3 ) return { "symbol" : symbol, "price" : 150.5 , "change" : "+2.5%" } async def aggregate_data (self, city: str ) -> Dict : """聚合多个API数据(并发)""" weather, news, stock = await asyncio.gather( self.fetch_weather(city), self.fetch_news("科技" ), self.fetch_stock("AAPL" ), return_exceptions=True ) return { "weather" : weather, "news" : news, "stock" : stock } external_api_service = ExternalAPIService()
路由实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from fastapi import APIRouterfrom services.external_api import external_api_servicerouter = APIRouter(prefix="/external" , tags=["外部API" ]) @router.get("/aggregate/{city}" ) async def get_aggregate_data (city: str ): """ 聚合数据接口 同时调用天气、新闻、股票API,大幅提升响应速度 """ data = await external_api_service.aggregate_data(city) return data @router.get("/weather/{city}" ) async def get_weather (city: str ): """获取天气""" return await external_api_service.fetch_weather(city)
💡 实战3:异步文件上传和处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 from fastapi import APIRouter, UploadFile, Fileimport aiofilesimport asynciofrom pathlib import Pathrouter = APIRouter(prefix="/files" , tags=["文件" ]) UPLOAD_DIR = Path("uploads" ) UPLOAD_DIR.mkdir(exist_ok=True ) @router.post("/upload" ) async def upload_file (file: UploadFile = File(... ) ): """异步文件上传""" file_path = UPLOAD_DIR / file.filename async with aiofiles.open (file_path, 'wb' ) as f: content = await file.read() await f.write(content) return { "filename" : file.filename, "size" : len (content), "path" : str (file_path) } @router.post("/upload-multiple" ) async def upload_multiple_files (files: List [UploadFile] = File(... ) ): """批量异步上传""" async def save_file (file: UploadFile ): file_path = UPLOAD_DIR / file.filename async with aiofiles.open (file_path, 'wb' ) as f: content = await file.read() await f.write(content) return { "filename" : file.filename, "size" : len (content) } results = await asyncio.gather( *[save_file(file) for file in files] ) return {"files" : results, "total" : len (results)} @router.get("/process/{filename}" ) async def process_file (filename: str ): """异步处理文件""" file_path = UPLOAD_DIR / filename if not file_path.exists(): raise HTTPException(status_code=404 , detail="文件不存在" ) async with aiofiles.open (file_path, 'r' , encoding='utf-8' ) as f: content = await f.read() processed = content.upper() return { "filename" : filename, "original_length" : len (content), "processed_length" : len (processed) }
💡 实战4:WebSocket实时通信 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from fastapi import WebSocket, WebSocketDisconnectfrom typing import List class ConnectionManager : """WebSocket连接管理器""" def __init__ (self ): self.active_connections: List [WebSocket] = [] async def connect (self, websocket: WebSocket ): """接受连接""" await websocket.accept() self.active_connections.append(websocket) def disconnect (self, websocket: WebSocket ): """断开连接""" self.active_connections.remove(websocket) async def broadcast (self, message: str ): """广播消息""" for connection in self.active_connections: await connection.send_text(message) manager = ConnectionManager() @app.websocket("/ws/{client_id}" ) async def websocket_endpoint (websocket: WebSocket, client_id: str ): """WebSocket端点""" await manager.connect(websocket) try : while True : data = await websocket.receive_text() await manager.broadcast(f"客户端 {client_id} : {data} " ) except WebSocketDisconnect: manager.disconnect(websocket) await manager.broadcast(f"客户端 {client_id} 断开连接" )
🎯 性能优化技巧 1. 连接池配置 1 2 3 4 5 6 7 8 9 engine = create_async_engine( DATABASE_URL, echo=False , pool_size=20 , max_overflow=10 , pool_timeout=30 , pool_recycle=3600 , )
2. 限制并发 1 2 3 4 5 6 7 8 9 10 11 12 from asyncio import Semaphorerate_limiter = Semaphore(10 ) @router.get("/limited" ) async def limited_endpoint (): """限流接口""" async with rate_limiter: await asyncio.sleep(1 ) return {"message" : "完成" }
3. 缓存优化 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from functools import lru_cacheimport asyncio_cache = {} async def get_cached_data (key: str ): """带缓存的数据获取""" if key in _cache: return _cache[key] await asyncio.sleep(1 ) data = f"数据_{key} " _cache[key] = data return data @router.get("/cached/{key}" ) async def cached_endpoint (key: str ): """缓存接口""" data = await get_cached_data(key) return {"data" : data, "cached" : key in _cache}
4. 后台任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from fastapi import BackgroundTasksasync def send_email (email: str , message: str ): """发送邮件(后台任务)""" await asyncio.sleep(2 ) print (f"邮件已发送到 {email} : {message} " ) @router.post("/send-notification" ) async def send_notification ( email: str , message: str , background_tasks: BackgroundTasks ): """发送通知(立即返回)""" background_tasks.add_task(send_email, email, message) return {"message" : "通知已加入队列" }
📊 性能测试 使用httpx进行压力测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import asyncioimport httpximport timeasync def test_concurrent_requests (): """测试并发性能""" url = "http://localhost:8000/external/weather/北京" async with httpx.AsyncClient() as client: start_time = time.time() tasks = [client.get(url) for _ in range (100 )] responses = await asyncio.gather(*tasks) elapsed = time.time() - start_time print (f"完成100个请求,耗时:{elapsed:.2 f} 秒" ) print (f"QPS:{100 /elapsed:.0 f} " ) print (f"成功:{sum (1 for r in responses if r.status_code == 200 )} " ) if __name__ == "__main__" : asyncio.run(test_concurrent_requests())
📝 完整项目示例 查看 07_fastapi_project/ 目录中的完整示例项目!
源码仓库:https://gitee.com/uyynot/asynciofastapiproject.git
运行项目 1 2 3 4 5 6 7 8 9 10 11 cd 07_fastapi_projectuvicorn main:app --reload --port 8000 python test_performance.py
🎯 最佳实践 1. 依赖注入 1 2 3 4 5 6 7 8 9 10 11 12 from fastapi import Dependsasync def get_current_user (token: str = Header(... ) ): """获取当前用户(依赖)""" return {"user_id" : 1 , "username" : "admin" } @router.get("/me" ) async def read_users_me (current_user = Depends(get_current_user ) ): """获取当前用户信息""" return current_user
2. 错误处理 1 2 3 4 5 6 7 8 9 10 from fastapi import HTTPExceptionfrom fastapi.responses import JSONResponse@app.exception_handler(Exception ) async def global_exception_handler (request, exc ): """全局异常处理""" return JSONResponse( status_code=500 , content={"message" : "服务器内部错误" , "detail" : str (exc)} )
3. 中间件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from fastapi.middleware.cors import CORSMiddlewareimport time@app.middleware("http" ) async def add_process_time_header (request, call_next ): """添加响应时间中间件""" start_time = time.time() response = await call_next(request) process_time = time.time() - start_time response.headers["X-Process-Time" ] = str (process_time) return response app.add_middleware( CORSMiddleware, allow_origins=["*" ], allow_credentials=True , allow_methods=["*" ], allow_headers=["*" ], )
📝 本课小结 核心知识点 FastAPI异步路由 - 使用 async def 定义路由异步数据库 - SQLAlchemy + asyncpg/aiosqlite外部API调用 - 并发调用多个API文件处理 - 异步上传和批量处理WebSocket - 实时通信性能优化 - 连接池、限流、缓存FastAPI vs Django 特性 FastAPI Django 异步支持 ✅ 原生支持 ⚠️ 部分支持 性能 ⚡ 极高 🐢 一般 学习曲线 📈 平缓 📈 陡峭 文档 ✅ 自动生成 ❌ 需手动 适用场景 API服务 全栈应用
何时使用FastAPI异步 ✅ 高并发API服务 ✅ 大量外部API调用 ✅ 实时数据处理 ✅ WebSocket应用 ✅ 微服务架构 🎯 下一步 运行完整示例项目 尝试添加新的功能 进行性能测试和优化 部署到生产环境 💪 课后练习 练习1:实现用户认证 使用JWT实现异步用户认证系统。
练习2:数据聚合API 创建一个API,并发调用3个外部服务并聚合数据。
练习3:文件批量处理 实现批量图片压缩API。
答案在 07_fastapi_project/ 目录中! 😊
🎉 恭喜! 你已经掌握了FastAPI异步开发,可以开发高性能的Web应用了!
继续加油! 🚀