从本地跑通到上线生产,这中间隔着一整套工程体系。
本章将带你从项目结构、测试、CI/CD、容器化、监控到部署策略,完整梳理 AI Agent 应用的生产化路径。
## 10.1 生产级项目结构

一个成熟的 AI Agent 项目不是"一个 Python 文件搞定",而是分层、分模块的工程化结构:
```text
agentic-ai-app/
├── src/
│   ├── agents/                  # Agent 角色定义
│   │   ├── __init__.py
│   │   ├── planner.py           # 规划 Agent
│   │   ├── executor.py          # 执行 Agent
│   │   └── validator.py         # 校验 Agent
│   ├── tools/                   # 外部工具接口
│   │   ├── __init__.py
│   │   ├── database.py          # 数据库查询工具
│   │   ├── search.py            # 搜索工具
│   │   └── api_client.py        # 外部 API 工具
│   ├── workflows/               # LangGraph 工作流定义
│   │   ├── __init__.py
│   │   ├── graph.py             # 主图构建
│   │   ├── state.py             # 状态 TypedDict
│   │   └── nodes/               # 节点实现
│   │       ├── classify.py
│   │       ├── execute.py
│   │       └── validate.py
│   ├── memory/                  # 持久化后端
│   │   ├── __init__.py
│   │   ├── redis_store.py
│   │   └── chroma_store.py
│   ├── infra/                   # 横切关注点
│   │   ├── __init__.py
│   │   ├── logging.py           # 结构化日志
│   │   ├── retries.py           # 重试/退避逻辑
│   │   ├── auth.py              # 认证中间件
│   │   ├── config.py            # 配置管理
│   │   ├── security.py          # 输入/输出安全
│   │   └── metrics.py           # 指标采集
│   └── api/                     # FastAPI 接口
│       ├── __init__.py
│       ├── main.py              # FastAPI 应用
│       ├── router.py            # 路由定义
│       └── dependencies.py      # 共享依赖(checkpointer 等)
├── tests/
│   ├── conftest.py              # Pytest fixtures
│   ├── unit_tests/              # 单元测试
│   └── integration_tests/       # 集成测试
├── infra/                       # 部署基础设施
│   ├── Dockerfile
│   ├── docker-compose.yaml
│   ├── k8s/
│   │   ├── deployment.yaml
│   │   ├── service.yaml
│   │   └── scaledobject.yaml
│   └── otel-collector-config.yaml
├── .github/
│   └── workflows/
│       ├── unit-tests.yml
│       ├── integration-tests.yml
│       └── deploy.yml
├── .env.template                # 环境变量模板(不提交真实值)
├── pyproject.toml
├── requirements.txt
└── README.md
```
### 配置管理

用 Pydantic 管理所有配置,统一从环境变量读取:
```python
from pydantic_settings import BaseSettings
from typing import Optional


class AgentConfig(BaseSettings):
    # LLM API Keys
    openai_api_key: str
    anthropic_api_key: Optional[str] = None

    # 模型选择
    orchestrator_model: str = "gpt-4o"
    fast_classifier_model: str = "gpt-4o-mini"
    reasoning_model: str = "claude-sonnet-4-20250514"

    # 存储
    redis_url: str = "redis://localhost:6379"
    database_url: Optional[str] = None

    # 可观测性
    langsmith_api_key: Optional[str] = None
    langsmith_tracing: bool = False
    langsmith_project: str = "agent-prod"

    # 运行时限制
    max_iterations: int = 50
    max_tokens_per_task: int = 100_000
    default_temperature: float = 0.0
    rate_limit_calls: int = 100
    rate_limit_window_seconds: int = 60

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


_config: Optional[AgentConfig] = None


def get_config() -> AgentConfig:
    """全局单例,避免重复解析环境变量"""
    global _config
    if _config is None:
        _config = AgentConfig()
    return _config
```
### 环境变量模板

`.env.template` 提交到 Git,真实值绝不提交:
```bash
# .env.template — 提交此文件,绝不提交 .env
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
REDIS_URL=redis://localhost:6379
DATABASE_URL=postgresql://...
LANGSMITH_API_KEY=lsv2_...
LANGSMITH_TRACING=true
LANGSMITH_PROJECT=agent-prod
ORCHESTRATOR_MODEL=gpt-4o
FAST_CLASSIFIER_MODEL=gpt-4o-mini
```
## 10.2 三层测试策略

AI Agent 的测试和传统 Web 应用不同:LLM 调用不确定、Token 消耗大、延迟高。因此需要分层测试:
```mermaid
flowchart LR
    subgraph L1["🔴 单元测试层"]
        direction TB
        U1[节点逻辑测试]
        U2[Mock LLM 调用]
        U3[配置校验]
        U4[部分图执行]
    end
    subgraph L2["🟡 集成测试层"]
        direction TB
        I1[端到端图执行]
        I2[真实 LLM API]
        I3[工具链路验证]
    end
    subgraph L3["🟢 评估测试层"]
        direction TB
        E1[质量评分]
        E2[幻觉检测]
        E3[准确率基准]
    end
    L1 -->|每日多次| L2
    L2 -->|每日/手动| L3
    style L1 fill:#ffebee
    style L2 fill:#fff9c4
    style L3 fill:#e8f5e9
```
| 层级 | 目的 | 速度 | 外部调用 | 触发时机 |
|------|------|------|----------|----------|
| 单元测试 | 节点逻辑、配置校验 | 毫秒级 | 无(Mock LLM) | 每次提交/PR |
| 集成测试 | 端到端图执行 | 秒~分钟 | 真实 LLM API | 每日 + 手动 |
| 评估测试 | 质量、幻觉检测 | 分钟级 | 真实 LLM API | 每周 + 发版 |
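三层的触发时机除了像本项目这样按目录划分(`tests/unit_tests`、`tests/integration_tests`),也可以用 pytest 标记来表达。下面是一个示意配置(标记名为假设):

```ini
# pytest.ini(示意)
[pytest]
markers =
    integration: 需要真实 LLM API,每日定时运行
    evaluation: 质量评估基准,每周/发版前运行
```

CI 中用 `pytest -m "not integration and not evaluation"` 即可只跑单元层,两种划分方式可以并存。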
### 单元测试:节点隔离

```python
import pytest
from langgraph.checkpoint.memory import MemorySaver
from src.workflows.graph import create_graph


def test_classify_intent_node():
    """测试单个节点 — 不需要 LLM 调用"""
    graph = create_graph()
    compiled = graph.compile(checkpointer=MemorySaver())
    result = compiled.nodes["classify_intent"].invoke(
        {"messages": [], "current_intent": None, "user_message": "今天天气如何?"}
    )
    assert result["current_intent"] == "weather_query"


def test_should_validate_or_retry_exceeds_retries():
    """测试条件边的逻辑"""
    from src.workflows.graph import should_validate_or_retry

    state = {"error_count": 4, "tool_results": {"status": "error"}}
    assert should_validate_or_retry(state) == "error"

    state = {"error_count": 1, "tool_results": {"status": "success"}}
    assert should_validate_or_retry(state) == "validate"
```
### 单元测试:Mock LLM 调用

```python
import pytest
from unittest.mock import AsyncMock, patch
from langchain_core.messages import AIMessage


@pytest.fixture
def mock_llm():
    """Mock LLM,返回确定性结果"""
    with patch("src.agents.planner.ChatOpenAI") as mock:
        instance = mock.return_value
        instance.ainvoke = AsyncMock(return_value=AIMessage(
            content='{"tasks": [{"tool": "search", "params": {"query": "test"}}]}'
        ))
        yield instance


async def test_planner_with_mock_llm(mock_llm):
    """使用 Mock LLM 测试规划器 — 零 API 调用"""
    from src.agents.planner import plan_task

    result = await plan_task("查找 Python 相关信息")
    assert "tasks" in result
    assert len(result["tasks"]) > 0
```
### 部分图执行测试

LangGraph 独有的能力:模拟某节点已执行,从中间恢复:
```python
def test_partial_execution_from_node2():
    """测试图的子部分 — 不运行整个流水线"""
    checkpointer = MemorySaver()
    graph = create_graph()
    compiled = graph.compile(checkpointer=checkpointer)

    # 伪造 classify_intent 已执行后的状态
    compiled.update_state(
        config={"configurable": {"thread_id": "test-1"}},
        values={"messages": [], "tool_results": {"status": "pending"}},
        as_node="classify_intent",
    )

    # 从该状态继续执行,到 execute_tools 后中断
    result = compiled.invoke(
        None,
        config={"configurable": {"thread_id": "test-1"}},
        interrupt_after="execute_tools",
    )
    assert result["tool_results"]["status"] == "success"
```
### 集成测试:真实 LLM 调用

```python
import pytest

pytestmark = pytest.mark.anyio


@pytest.fixture(scope="session")
def anyio_backend():
    return "asyncio"


@pytest.mark.langsmith
async def test_agent_simple_passthrough():
    """端到端测试 — 需要 API Key"""
    from src.workflows.graph import create_graph
    from langgraph.checkpoint.memory import MemorySaver

    graph = create_graph()
    compiled = graph.compile(checkpointer=MemorySaver())
    result = await compiled.ainvoke(
        {"messages": [{"role": "user", "content": "你好"}]},
        config={"configurable": {"thread_id": "integration-test-1"}},
    )
    assert result is not None
    assert len(result["messages"]) > 0
```
## 10.3 CI/CD 流水线
```mermaid
flowchart TD
    A["开发者 Push / PR"] --> B{触发类型?}
    B -->|每次 Push| C["单元测试流水线"]
    C --> C1["Lint 检查 ruff"]
    C1 --> C2["类型检查 mypy"]
    C2 --> C3["单元测试 pytest"]
    C3 --> C4{通过?}
    C4 -->|是| C5["✅ 允许合并"]
    C4 -->|否| C6["❌ 阻止合并"]
    B -->|每日定时| D["集成测试流水线"]
    D --> D1["端到端测试"]
    D1 --> D2["真实 LLM API 调用"]
    D2 --> D3["LangSmith 追踪"]
    B -->|main 分支 Push| E["部署流水线"]
    E --> E1["构建 + 单元测试"]
    E1 --> E2["构建 Docker 镜像"]
    E2 --> E3["推送到 Registry"]
    E3 --> E4["部署到 K8s"]
    E4 --> E5["滚动更新验证"]
    style C fill:#ffebee
    style D fill:#fff9c4
    style E fill:#e8f5e9
```
### 单元测试流水线(每次 Push/PR 触发)

```yaml
name: Unit Tests

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

concurrency:
  group: unit-tests-${{ github.ref }}
  cancel-in-progress: true

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.11", "3.12"]
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          pip install -U pip
          pip install -r requirements.txt
          pip install pytest ruff mypy
      - name: Lint with ruff
        run: ruff check .
      - name: Type check with mypy
        run: mypy --strict src/
      - name: Run unit tests
        run: pytest tests/unit_tests -v --tb=short
```
### 集成测试流水线(每日 + 手动触发)

```yaml
name: Integration Tests

on:
  schedule:
    - cron: "37 14 * * *"
  workflow_dispatch:

jobs:
  integration-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v4
        with:
          python-version: "3.11"
      - run: pip install -r requirements.txt pytest pytest-asyncio anyio
      - name: Run integration tests
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          LANGSMITH_API_KEY: ${{ secrets.LANGSMITH_API_KEY }}
          LANGSMITH_TRACING: true
        run: pytest tests/integration_tests -v
```
### 部署流水线

```yaml
name: Deploy Agent Service

on:
  push:
    branches: [main]

jobs:
  build-and-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v4
        with:
          python-version: "3.11"
      - run: pip install -r requirements.txt
      - run: ruff check .
      - run: pytest tests/unit_tests

  build-docker:
    needs: build-and-test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Build Docker Image
        run: docker build -t agent-service:${{ github.sha }} .
      - name: Push to Registry
        run: |
          # 同时推送 SHA 标签(供 K8s 精确部署)和 latest 标签
          docker tag agent-service:${{ github.sha }} ghcr.io/org/agent-service:${{ github.sha }}
          docker tag agent-service:${{ github.sha }} ghcr.io/org/agent-service:latest
          docker push ghcr.io/org/agent-service:${{ github.sha }}
          docker push ghcr.io/org/agent-service:latest

  deploy:
    needs: build-docker
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Kubernetes
        run: |
          kubectl set image deployment/agent-service \
            agent=ghcr.io/org/agent-service:${{ github.sha }} \
            --namespace production
          kubectl rollout status deployment/agent-service --namespace production
```
## 10.4 Docker 容器化

### 生产级 Dockerfile

```dockerfile
FROM python:3.11-slim AS base

# 非 root 用户运行
RUN groupadd -r appuser && useradd -r -g appuser appuser

WORKDIR /app

# 先复制依赖清单,利用层缓存
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt

COPY src/ src/
COPY pyproject.toml .

ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PYTHONPATH=/app

HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/healthz')" || exit 1

USER appuser
EXPOSE 8000

CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
```
### Docker Compose 多服务编排

```yaml
version: "3.9"

services:
  agent-api:
    build: .
    ports:
      - "8000:8000"
    env_file: .env
    depends_on:
      redis:
        condition: service_healthy
      vector-db:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/healthz"]
      interval: 30s
      timeout: 10s
      retries: 3
    deploy:
      replicas: 2
      resources:
        limits:
          memory: 2G
          cpus: "2.0"

  worker-agent:
    build: .
    command: ["python", "-m", "src.worker"]
    env_file: .env
    depends_on:
      redis:
        condition: service_healthy
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 4G

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s

  vector-db:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
    volumes:
      - qdrant-data:/qdrant/storage
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:6333/healthz"]
      interval: 10s

  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    volumes:
      - ./infra/otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml
    ports:
      - "4317:4317"
      - "4318:4318"

volumes:
  redis-data:
  qdrant-data:
```
### 健康检查端点

```python
from fastapi import FastAPI
from datetime import datetime

app = FastAPI()


@app.get("/healthz")
async def liveness():
    """存活探针 — 进程是否在运行"""
    return {"status": "ok"}


@app.get("/readiness")
async def readiness():
    """就绪探针 — 是否能接受流量"""
    from src.infra.config import get_config

    config = get_config()
    checks = {}

    # Redis 连通性
    try:
        import redis
        r = redis.from_url(config.redis_url)
        r.ping()
        checks["redis"] = "ok"
    except Exception as e:
        checks["redis"] = f"error: {e}"

    # LLM API 可用性(最小 Token 探测)
    try:
        from langchain_openai import ChatOpenAI
        llm = ChatOpenAI(model=config.fast_classifier_model, max_tokens=1)
        llm.invoke("ping")
        checks["llm_api"] = "ok"
    except Exception as e:
        checks["llm_api"] = f"error: {e}"

    all_ok = all(v == "ok" for v in checks.values())
    return {
        "status": "ok" if all_ok else "degraded",
        "checks": checks,
        "timestamp": datetime.utcnow().isoformat(),
    }
```
## 10.5 监控与可观测性
```mermaid
flowchart TD
    subgraph App["Agent 应用"]
        A1[LLM 调用]
        A2[工具执行]
        A3[状态转换]
    end
    subgraph LS["LangSmith 零代码追踪"]
        L1[自动追踪每次调用]
        L2[决策过程可视化]
        L3[Token 消耗统计]
    end
    subgraph OTel["OpenTelemetry 分布式追踪"]
        O1[跨服务追踪]
        O2[自定义 Span]
        O3["导出到 LangSmith / Jaeger"]
    end
    subgraph Log["structlog 结构化日志"]
        G1[JSON 格式日志]
        G2[上下文绑定]
        G3[ELK 采集]
    end
    App -->|环境变量即可| LS
    App -->|代码埋点| OTel
    App -->|structlog| Log
    style App fill:#e3f2fd
    style LS fill:#e8f5e9
    style OTel fill:#fff9c4
    style Log fill:#f3e5f5
```
### LangSmith 集成(零代码追踪)

```bash
LANGSMITH_TRACING=true
LANGSMITH_API_KEY=lsv2_pt_...
LANGSMITH_PROJECT=agent-prod
LANGSMITH_OTEL_ENABLED=true
```
不需要改任何代码,LangSmith 会自动追踪每一次 LLM 调用、工具执行和状态转换。
### OpenTelemetry 分布式追踪

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

from src.infra.config import get_config


def setup_telemetry(service_name: str = "agent-service"):
    """配置 OpenTelemetry 分布式追踪,导出到 LangSmith 的 OTLP 端点"""
    provider = TracerProvider()
    exporter = OTLPSpanExporter(
        endpoint="https://api.smith.langchain.com/otel/v1/traces",
        headers={
            "x-api-key": get_config().langsmith_api_key,
            "Langsmith-Project": get_config().langsmith_project,
        },
    )
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)


tracer = setup_telemetry()


@tracer.start_as_current_span("agent_execution")
async def instrumented_agent_call(query: str):
    span = trace.get_current_span()
    span.set_attribute("query_length", len(query))
    try:
        result = await agent.ainvoke(query)
        span.set_attribute("success", True)
        span.set_attribute("tool_calls", len(result.get("tool_calls", [])))
        span.set_attribute("input_tokens", result.get("usage", {}).get("input_tokens", 0))
        return result
    except Exception as e:
        span.set_attribute("success", False)
        span.set_attribute("error", str(e))
        raise
```
### 结构化日志

```python
import logging
import structlog


def setup_logging():
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )


logger = structlog.get_logger()


async def process_query(query: str, session_id: str):
    # 绑定上下文字段,之后每条日志自动携带
    log = logger.bind(query_length=len(query), session_id=session_id)
    log.info("agent_invocation_started")
    try:
        result = await agent.ainvoke(query)
        log.info(
            "agent_invocation_completed",
            tokens_used=result.get("usage", {}),
            tool_calls=len(result.get("tool_calls", [])),
        )
        return result
    except Exception as e:
        log.error("agent_invocation_failed", error=str(e), error_type=type(e).__name__)
        raise
```
### 关键指标与目标

| 指标 | 建议目标 | 说明 |
|------|----------|------|
| P95 延迟 | < 5s(单轮) | 包含 LLM + 工具执行时间 |
| 工具调用错误率 | < 3% | 超过说明工具不稳定 |
| 循环遏制率 | > 99% | Agent 不应陷入无限循环 |
| 缓存命中率 | > 30% | 语义缓存命中率 |
| Token 消耗 | 按预算监控 | 设置月度上限和告警 |
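这些指标的采集可以先从进程内做起。下面是一个纯 Python 的最小示意(`AgentMetrics` 这个类名和实现均为假设,生产中可替换为 prometheus_client 等标准方案,对应结构中的 `src/infra/metrics.py`):

```python
from collections import defaultdict


class AgentMetrics:
    """进程内指标采集的最小示意"""

    def __init__(self):
        self.latencies: list[float] = []    # 单轮请求耗时(秒)
        self.tool_calls = defaultdict(int)  # 各工具调用次数
        self.tool_errors = defaultdict(int) # 各工具错误次数

    def record_latency(self, seconds: float) -> None:
        self.latencies.append(seconds)

    def record_tool_call(self, tool: str, ok: bool) -> None:
        self.tool_calls[tool] += 1
        if not ok:
            self.tool_errors[tool] += 1

    def p95_latency(self) -> float:
        """P95 延迟:排序后取 95 分位位置的值"""
        if not self.latencies:
            return 0.0
        ordered = sorted(self.latencies)
        idx = min(len(ordered) - 1, int(len(ordered) * 0.95))
        return ordered[idx]

    def tool_error_rate(self, tool: str) -> float:
        calls = self.tool_calls[tool]
        return self.tool_errors[tool] / calls if calls else 0.0
```

把 `p95_latency()`、`tool_error_rate()` 的返回值与上表目标对比并触发告警,就是最朴素的指标闭环。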
## 10.6 部署模式
```mermaid
flowchart TD
    A[AI Agent 部署] --> B[模式一: REST API]
    A --> C[模式二: SSE 流式]
    A --> D[模式三: LangGraph Cloud]
    A --> E[模式四: Kubernetes]
    B --> B1["FastAPI + LangGraph<br/>同步请求场景<br/>最简单"]
    C --> C1["StreamingResponse<br/>实时输出中间过程<br/>用户体验好"]
    D --> D1["全托管服务<br/>开箱即用<br/>零运维"]
    E --> E1["KEDA 弹性伸缩<br/>队列驱动扩缩<br/>企业级"]
    style B fill:#e3f2fd
    style C fill:#fff9c4
    style D fill:#e8f5e9
    style E fill:#f3e5f5
```
### 模式一:REST API(FastAPI + LangGraph)

最简单的部署方式,适合同步请求场景:
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langgraph.checkpoint.memory import MemorySaver
from src.workflows.graph import create_graph

app = FastAPI(title="AI Agent Service", version="1.0.0")

checkpointer = MemorySaver()
graph = create_graph()
compiled_graph = graph.compile(checkpointer=checkpointer)


class ChatRequest(BaseModel):
    message: str
    thread_id: str = "default"


class ChatResponse(BaseModel):
    response: str
    thread_id: str


@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    config = {"configurable": {"thread_id": request.thread_id}}
    try:
        result = await compiled_graph.ainvoke(
            {"messages": [{"role": "user", "content": request.message}]},
            config=config,
        )
        last_message = result["messages"][-1].content
        return ChatResponse(response=last_message, thread_id=request.thread_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
```
### 模式二:SSE 流式响应

实时输出 Agent 的中间过程,用户体验更好:
```python
import json

from fastapi import APIRouter, Form
from fastapi.responses import StreamingResponse

router = APIRouter(prefix="/api")


@router.post("/stream")
async def stream_agent(
    user_message: str = Form(...),
    session_id: str = Form(...),
):
    async def event_generator():
        try:
            async for event in compiled_graph.astream(
                {"messages": [{"role": "user", "content": user_message}]},
                config={"configurable": {"thread_id": session_id}},
                stream_mode=["custom", "updates", "messages"],
                subgraphs=True,
            ):
                # subgraphs=True + 多 stream_mode 时,事件可能是
                # (namespace, mode, payload) 三元组或 (mode, payload) 二元组
                if isinstance(event, tuple) and len(event) == 3:
                    _ns, mode, payload = event
                elif isinstance(event, tuple) and len(event) == 2:
                    mode, payload = event
                else:
                    mode, payload = "unknown", event
                # default=str 兜底不可序列化的消息对象
                yield f"data: {json.dumps({'mode': mode, 'payload': payload}, default=str)}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```
### 模式三:LangGraph Cloud 托管部署

LangGraph 提供全托管服务,开箱即用:
```bash
pip install langgraph-cli

# 构建部署镜像
langgraph build

# 部署
langgraph deploy
```
LangGraph Cloud 提供:自动 API 生成、内置 Checkpointer 持久化、流式端点(SSE + WebSocket)、LangSmith 可观测性集成、水平扩展、认证与授权。
### 模式四:Kubernetes 部署

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent-service
  template:
    metadata:
      labels:
        app: agent-service
    spec:
      containers:
        - name: agent
          image: ghcr.io/org/agent-service:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: openai-api-key
            - name: REDIS_URL
              value: "redis://redis-service:6379"
            - name: LANGSMITH_TRACING
              value: "true"
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /healthz
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /readiness
              port: 8000
            initialDelaySeconds: 15
            periodSeconds: 10
      terminationGracePeriodSeconds: 120
```
### 部署策略选择

| 策略 | 风险 | 成本 | 适用场景 |
|------|------|------|----------|
| 滚动更新 | 中 | 低 | 小版本更新、Bug 修复 |
| 蓝绿部署 | 低 | 高 | 大版本变更、Breaking Change |
| 金丝雀发布 | 很低 | 中 | Prompt 更新、模型切换 |
| 影子部署 | 无 | 很高 | 安全关键变更 |
**实操建议**:基础设施用滚动更新,Agent 逻辑(Prompt/工具/模型)用金丝雀发布,高风险变更用影子部署。
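影子部署的核心思路用几行 asyncio 即可勾勒出来(`invoke_with_shadow` 为假设的示意函数,`stable_agent` / `shadow_agent` 代表两个已编译的图):新版本接收同样的流量,但其结果只用于离线对比,绝不返回给用户。

```python
import asyncio


async def invoke_with_shadow(stable_agent, shadow_agent, payload, log):
    """稳定版结果返回给用户;影子版在后台跑同样的输入,仅记录结果"""

    async def run_shadow():
        try:
            shadow_result = await shadow_agent.ainvoke(payload)
            log("shadow_result", result=shadow_result)  # 仅记录,供离线对比
        except Exception as e:
            log("shadow_error", error=str(e))  # 影子失败不影响主链路

    asyncio.create_task(run_shadow())  # 不等待影子完成
    return await stable_agent.ainvoke(payload)  # 用户只看到稳定版结果
```

对比日志里稳定版与影子版的差异(质量、延迟、Token 消耗),达标后再把影子版提升为金丝雀。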
## 10.7 安全策略
```mermaid
flowchart TD
    A["用户输入"] --> B["输入清洗 sanitize_input"]
    B --> C{"敏感信息检测"}
    C -->|检测到| D["脱敏处理<br/>SSN → SSN_REDACTED<br/>信用卡 → CC_REDACTED"]
    C -->|安全| E["注入攻击检测"]
    D --> E
    E -->|检测到| F["过滤攻击指令<br/>ignore previous → FILTERED"]
    E -->|安全| G["进入 Agent"]
    F --> G
    G --> H{"工具调用风控"}
    H -->|LOW 风险| I["自动执行"]
    H -->|MEDIUM/HIGH 风险| J["需要审核"]
    H -->|CRITICAL 风险| K["需要明确审批"]
    J --> L["限流检查<br/>RateLimiter"]
    K --> L
    L -->|通过| M["执行工具"]
    L -->|超限| N["拒绝请求"]
    style B fill:#e3f2fd
    style C fill:#fff9c4
    style E fill:#fff9c4
    style H fill:#f3e5f5
    style L fill:#ffebee
```
### API Key 三层管理

| 环境 | 方式 |
|------|------|
| 本地开发 | `.env` 文件(gitignored) |
| CI/CD | GitHub Secrets / GitLab CI Variables |
| 生产 | AWS Secrets Manager / HashiCorp Vault |
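三层读取可以收敛到一个函数(示意,`get_secret` 这个名字、`APP_ENV` 变量和 boto3 回退均为假设):环境变量优先(本地 `.env` 或 CI Secrets 注入),生产环境再回退到 Secrets Manager。

```python
import os
from typing import Optional


def get_secret(name: str, default: Optional[str] = None) -> Optional[str]:
    """分层读取密钥:环境变量优先,生产环境回退到 AWS Secrets Manager"""
    value = os.environ.get(name)
    if value:
        return value
    if os.environ.get("APP_ENV") == "production":
        try:
            import boto3  # 仅生产镜像需要安装
            client = boto3.client("secretsmanager")
            return client.get_secret_value(SecretId=name)["SecretString"]
        except Exception:
            pass  # 回退到默认值,由调用方决定是否报错
    return default
```

这样应用代码只依赖 `get_secret("OPENAI_API_KEY")` 一个入口,三个环境的注入方式对业务逻辑透明。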
### 输入安全(OWASP 对齐)

```python
import re

MAX_INPUT_LENGTH = 10000

# 敏感信息脱敏规则
SENSITIVE_PATTERNS = [
    (r"\b\d{3}-\d{2}-\d{4}\b", "[SSN_REDACTED]"),
    (r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b", "[CC_REDACTED]"),
    (r"(?i)(api[_-]?key|password|secret|token)\s*[:=]\s*\S+", "[CREDENTIAL_REDACTED]"),
]

# Prompt 注入特征
INJECTION_PATTERNS = [
    r"(?i)ignore\s+(all\s+)?previous\s+instructions",
    r"(?i)forget\s+(all\s+)?previous",
    r"(?i)system\s*:",
    r"(?i)you\s+are\s+now",
]


def sanitize_input(text: str) -> str:
    """用户输入进入 Agent 前的清洗"""
    if len(text) > MAX_INPUT_LENGTH:
        text = text[:MAX_INPUT_LENGTH]
    for pattern, replacement in SENSITIVE_PATTERNS:
        text = re.sub(pattern, replacement, text)
    return text


def sanitize_for_memory(text: str) -> str:
    """持久化到记忆前的清洗"""
    text = sanitize_input(text)
    for pattern in INJECTION_PATTERNS:
        text = re.sub(pattern, "[FILTERED]", text)
    return text
```
### 工具调用风控

```python
from enum import IntEnum


class RiskLevel(IntEnum):
    # 用整数定义等级,保证可以直接比较大小(字符串比较会出错)
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4


TOOL_RISK_LEVELS = {
    "search_documents": RiskLevel.LOW,
    "read_file": RiskLevel.LOW,
    "write_file": RiskLevel.MEDIUM,
    "send_email": RiskLevel.HIGH,
    "execute_code": RiskLevel.HIGH,
    "database_delete": RiskLevel.CRITICAL,
    "transfer_funds": RiskLevel.CRITICAL,
}

AUTO_APPROVE_THRESHOLD = RiskLevel.LOW


def requires_human_approval(tool_name: str) -> bool:
    # 未登记的工具默认按 HIGH 处理
    risk = TOOL_RISK_LEVELS.get(tool_name, RiskLevel.HIGH)
    return risk > AUTO_APPROVE_THRESHOLD
```
### 限流

```python
from datetime import datetime, timedelta
from typing import List


class RateLimiter:
    """滑动窗口限流器"""

    def __init__(self, max_calls: int = 100, window_seconds: int = 60):
        self.max_calls = max_calls
        self.window = timedelta(seconds=window_seconds)
        self.calls: List[datetime] = []

    def check(self) -> bool:
        now = datetime.utcnow()
        # 只保留窗口内的调用记录
        self.calls = [t for t in self.calls if now - t < self.window]
        if len(self.calls) >= self.max_calls:
            return False
        self.calls.append(now)
        return True
```
## 10.8 扩缩容策略

**关键洞察**:传统基于 CPU 的 HPA 对 Agent 无效!Agent 大部分时间在等 LLM API 响应,CPU 利用率可能只有 5%,但系统已经饱和。
### 基于队列深度的弹性伸缩(推荐)

使用消息队列 + KEDA 实现事件驱动扩缩:
```mermaid
flowchart LR
    U[用户请求] --> GW[API Gateway]
    GW --> Q["任务队列<br/>Redis List"]
    Q --> W1[Agent Worker 1]
    Q --> W2[Agent Worker 2]
    Q --> W3[Agent Worker N]
    W1 --> R[处理 & 回调]
    W2 --> R
    W3 --> R
    KEDA["KEDA 监控<br/>队列深度"] -->|队列 > 5/Worker| AUTO[自动扩容]
    KEDA -->|队列 = 0| SHRK[自动缩容]
    style Q fill:#ffebee
    style KEDA fill:#e3f2fd
    style AUTO fill:#e8f5e9
```
```yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: agent-worker-scaler
spec:
  scaleTargetRef:
    name: agent-worker
  minReplicaCount: 2
  maxReplicaCount: 50
  triggers:
    - type: redis-lists
      metadata:
        listName: agent-tasks
        listLength: "5"
        address: redis-service:6379
```
### 异步并发控制

```python
import asyncio
from typing import Any, Callable
from uuid import uuid4


class AsyncTaskProcessor:
    """用信号量限制同时在跑的 Agent 任务数"""

    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def process(self, task: Callable, *args, **kwargs) -> Any:
        async with self.semaphore:
            return await task(*args, **kwargs)


processor = AsyncTaskProcessor(max_concurrent=10)


@app.post("/chat/async")
async def chat_async(request: ChatRequest):
    task_id = str(uuid4())
    # 后台执行,立即返回任务 ID
    asyncio.create_task(
        processor.process(agent.ainvoke, request.message)
    )
    return {"task_id": task_id, "status": "processing"}
```
### 扩缩容决策矩阵

| 场景 | 策略 |
|------|------|
| 无状态 Agent Worker | 基于队列深度的 HPA(KEDA) |
| 有状态 Agent(会话型) | StatefulSet + 外部状态存储 |
| 批处理 | K8s Jobs/CronJobs + Provider Batch API |
| GPU 工作负载(本地模型) | NVIDIA Device Plugin + vLLM |
## 10.9 版本管理

### Prompt 版本化

```python
from pathlib import Path
import yaml


class PromptRegistry:
    def __init__(self, prompts_dir: str = "src/workflows/prompts"):
        self.prompts_dir = Path(prompts_dir)

    def get(self, prompt_name: str, version: str = "latest") -> str:
        """从 YAML 文件加载版本化的 Prompt"""
        prompt_file = self.prompts_dir / f"{prompt_name}.yaml"
        with open(prompt_file) as f:
            prompts = yaml.safe_load(f)
        return prompts["versions"][version]["template"]
```
Prompt YAML 示例:
```yaml
versions:
  v1:
    template: "你是一个有用的助手..."
    model: gpt-4o
    temperature: 0
  v2:
    template: "你是一个 AI 编排 Agent..."
    model: gpt-4o
    temperature: 0
    changelog: "添加工具感知和结构化输出"
```
### 模型版本 + 降级链

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ModelConfig:
    name: str
    version: str
    provider: str
    fallback: Optional["ModelConfig"] = None


MODELS = {
    "orchestrator": ModelConfig(
        name="gpt-4o", version="2024-11-20", provider="openai",
        fallback=ModelConfig(name="gpt-4o-mini", version="latest", provider="openai"),
    ),
    "reasoner": ModelConfig(
        name="claude-sonnet-4-20250514", version="2025-05-14", provider="anthropic",
        fallback=ModelConfig(name="gpt-4o", version="2024-11-20", provider="openai"),
    ),
}
```
### A/B 测试

```python
import hashlib


class ABTestRouter:
    def __init__(self, experiments: dict):
        self.experiments = experiments

    def get_variant(self, user_id: str, experiment: str) -> str:
        """确定性分配用户到实验组:同一用户在同一实验中始终命中同一组"""
        config = self.experiments[experiment]
        hash_input = f"{user_id}:{experiment}"
        hash_val = int(hashlib.md5(hash_input.encode()).hexdigest(), 16) % 100
        cumulative = 0
        for variant, percentage in config["variants"].items():
            cumulative += percentage
            if hash_val < cumulative:
                return variant
        return list(config["variants"].keys())[-1]


experiments = {
    "prompt_v2_rollout": {
        "variants": {
            "control": 80,
            "treatment": 20,
        }
    }
}
```
## 10.10 成本优化

### 组合优化栈
```mermaid
flowchart TD
    A[请求进入] --> B{1. 语义缓存命中?}
    B -->|命中| R1["返回缓存结果<br/>节省 ~30% 请求"]
    B -->|未命中| C{2. 任务复杂度?}
    C -->|简单| R2["路由到便宜模型<br/>节省 30-70%"]
    C -->|复杂| D{3. 前缀缓存可用?}
    D -->|可用| R3["复用 KV Cache<br/>节省 ~90%"]
    D -->|不可用| E{4. 可异步处理?}
    E -->|可以| R4["使用 Batch API<br/>节省 ~50%"]
    E -->|实时| F[5. 高端模型执行]
    F --> G{6. 超出预算?}
    G -->|是| R5["返回部分结果<br/>预算治理"]
    G -->|否| R6[完整结果返回]
    style B fill:#e3f2fd
    style C fill:#fff9c4
    style D fill:#f3e5f5
    style E fill:#e8f5e9
    style G fill:#ffebee
```
### 语义缓存

```python
from gptcache import Cache
from gptcache.embedding import OpenAI as OpenAIEmbedding
from gptcache.adapter import openai as gptcache_openai
from gptcache.similarity_evaluation import Cosine

cache = Cache()
embedding_func = OpenAIEmbedding(model="text-embedding-3-small")
cache.init(
    embedding_func=embedding_func.embedding,
    similarity_evaluation=Cosine(),
    similarity_threshold=0.85,
)

# 语义相似的问题直接命中缓存,不再调用 LLM
response = gptcache_openai.ChatCompletion.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "总结 Q4 财报"}],
)
```
### 动态模型路由

```python
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic


class ModelRouter:
    def __init__(self):
        self.cheap = ChatOpenAI(model="gpt-4o-mini", temperature=0)
        self.mid = ChatOpenAI(model="gpt-4o", temperature=0)
        self.premium = ChatAnthropic(model="claude-sonnet-4-20250514", temperature=0.3)

    async def route(self, task: str, complexity_score: float) -> str:
        if complexity_score < 0.3:
            return "cheap"
        elif complexity_score < 0.7:
            return "mid"
        else:
            return "premium"

    async def execute(self, query: str, complexity_score: float):
        route = await self.route(query, complexity_score)
        model_map = {"cheap": self.cheap, "mid": self.mid, "premium": self.premium}
        return await model_map[route].ainvoke([{"role": "user", "content": query}])
```
### 预算治理

```python
from dataclasses import dataclass


@dataclass
class TokenBudget:
    max_tokens_per_task: int = 100_000
    max_iterations_per_task: int = 50
    monthly_budget_usd: float = 10_000
    alert_thresholds: list = None


class BudgetExhausted(Exception):
    pass


class BudgetGovernor:
    def __init__(self, budget: TokenBudget):
        self.budget = budget
        self.total_spend = 0.0

    def check_iteration_limit(self, iteration_count: int):
        if iteration_count >= self.budget.max_iterations_per_task:
            raise BudgetExhausted(
                f"Agent 超过 {self.budget.max_iterations_per_task} 次迭代,可能陷入循环"
            )

    def check_token_budget(self, tokens_used: int):
        if tokens_used >= self.budget.max_tokens_per_task:
            raise BudgetExhausted(
                f"Token 预算耗尽({tokens_used}/{self.budget.max_tokens_per_task})"
            )

    def record_spend(self, cost_usd: float, feature: str):
        self.total_spend += cost_usd
        for threshold in (self.budget.alert_thresholds or [0.5, 0.8, 1.0]):
            if self.total_spend >= self.budget.monthly_budget_usd * threshold:
                action = {0.5: "监控", 0.8: "审查", 1.0: "熔断"}[threshold]
                print(f"[告警 {action}] 支出已达预算的 {threshold*100:.0f}%")
```
### 成本优化总结

| 策略 | 节省 | 实现难度 |
|------|------|----------|
| 语义缓存 | ~30% 请求拦截 | 中 |
| 模型路由 | 30-70% 被路由请求 | 低 |
| 前缀/KV 缓存 | 90% 长系统提示词 | 低 |
| Batch API | 50% 异步负载 | 低 |
| Prompt 压缩 | 最高 20x Token 压缩 | 中 |
| 预算治理 | 防止费用失控 | 低 |
| 组合优化 | 60-80% 总成本降低 | 中 |
## 本章小结

| 环节 | 关键行动 |
|------|----------|
| 项目结构 | 分层、Pydantic 配置、.env.template |
| 测试 | 三层:单元(Mock LLM)→ 集成(真实 API)→ 评估(质量) |
| CI/CD | 每次 Push 跑单元测试 + Lint,每天跑集成测试,通过后自动部署 |
| 容器化 | 非 root 用户、层缓存、健康检查、Docker Compose 编排 |
| 监控 | LangSmith 零代码追踪 + OpenTelemetry 分布式追踪 + 结构化日志 |
| 部署 | REST API / SSE 流式 / LangGraph Cloud / Kubernetes |
| 安全 | 输入脱敏 + 输出护栏 + 风控分级 + 限流 |
| 扩缩容 | 基于队列深度(不是 CPU!),KEDA 事件驱动 |
| 版本管理 | Prompt YAML 版本化 + 模型降级链 + A/B 测试 |
| 成本优化 | 语义缓存 + 模型路由 + 批处理 + 预算治理 = 60-80% 降低 |
**生产化核心原则**:Agent 系统的大部分问题不是模型不行,而是 Harness 不行。先做好监控和安全,再追求性能优化。