Chapter 10: Production-Grade Development and Deployment

Getting an agent to run locally and running it in production are separated by an entire engineering stack.

This chapter walks the full productionization path for an AI Agent application: project structure, testing, CI/CD, containerization, monitoring, and deployment strategy.


10.1 Production-Grade Project Structure

A mature AI Agent project is not "a single Python file that does everything" but a layered, modular engineering structure:

agentic-ai-app/
├── src/
│   ├── agents/                  # Agent role definitions
│   │   ├── __init__.py
│   │   ├── planner.py           # Planning agent
│   │   ├── executor.py          # Execution agent
│   │   └── validator.py         # Validation agent
│   ├── tools/                   # External tool interfaces
│   │   ├── __init__.py
│   │   ├── database.py          # Database query tool
│   │   ├── search.py            # Search tool
│   │   └── api_client.py        # External API tool
│   ├── workflows/               # LangGraph workflow definitions
│   │   ├── __init__.py
│   │   ├── graph.py             # Main graph construction
│   │   ├── state.py             # State TypedDict
│   │   └── nodes/               # Node implementations
│   │       ├── classify.py
│   │       ├── execute.py
│   │       └── validate.py
│   ├── memory/                  # Persistence backends
│   │   ├── __init__.py
│   │   ├── redis_store.py
│   │   └── chroma_store.py
│   ├── infra/                   # Cross-cutting concerns
│   │   ├── __init__.py
│   │   ├── logging.py           # Structured logging
│   │   ├── retries.py           # Retry/backoff logic
│   │   ├── auth.py              # Auth middleware
│   │   ├── config.py            # Configuration management
│   │   ├── security.py          # Input/output security
│   │   └── metrics.py           # Metrics collection
│   └── api/                     # FastAPI interface
│       ├── __init__.py
│       ├── main.py              # FastAPI app
│       ├── router.py            # Route definitions
│       └── dependencies.py      # Shared dependencies (checkpointer, etc.)
├── tests/
│   ├── conftest.py              # Pytest fixtures
│   ├── unit_tests/              # Unit tests
│   └── integration_tests/       # Integration tests
├── infra/                       # Deployment infrastructure
│   ├── Dockerfile
│   ├── docker-compose.yaml
│   ├── k8s/
│   │   ├── deployment.yaml
│   │   ├── service.yaml
│   │   └── scaledobject.yaml
│   └── otel-collector-config.yaml
├── .github/
│   └── workflows/
│       ├── unit-tests.yml
│       ├── integration-tests.yml
│       └── deploy.yml
├── .env.template                # Env var template (never commit real values)
├── pyproject.toml
├── requirements.txt
└── README.md

Configuration Management

Manage all configuration with Pydantic and read it uniformly from environment variables:

# src/infra/config.py
from pydantic_settings import BaseSettings
from typing import Optional

class AgentConfig(BaseSettings):
    # LLM configuration
    openai_api_key: str
    anthropic_api_key: Optional[str] = None

    # Model selection
    orchestrator_model: str = "gpt-4o"
    fast_classifier_model: str = "gpt-4o-mini"
    reasoning_model: str = "claude-sonnet-4-20250514"

    # Infrastructure
    redis_url: str = "redis://localhost:6379"
    database_url: Optional[str] = None

    # LangSmith observability
    langsmith_api_key: Optional[str] = None
    langsmith_tracing: bool = False
    langsmith_project: str = "agent-prod"

    # Agent parameters
    max_iterations: int = 50
    max_tokens_per_task: int = 100_000
    default_temperature: float = 0.0

    # Rate limiting
    rate_limit_calls: int = 100
    rate_limit_window_seconds: int = 60

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"

# Singleton
_config: Optional[AgentConfig] = None

def get_config() -> AgentConfig:
    global _config
    if _config is None:
        _config = AgentConfig()
    return _config

Environment variable template (.env.template is committed to Git; the real values never are):

# .env.template: commit this file, never commit .env
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
REDIS_URL=redis://localhost:6379
DATABASE_URL=postgresql://...

LANGSMITH_API_KEY=lsv2_...
LANGSMITH_TRACING=true
LANGSMITH_PROJECT=agent-prod

ORCHESTRATOR_MODEL=gpt-4o
FAST_CLASSIFIER_MODEL=gpt-4o-mini

10.2 A Three-Layer Testing Strategy

Testing an AI Agent differs from testing a traditional web app: LLM calls are nondeterministic, tokens are expensive, and latency is high. So the tests are layered:

flowchart LR
subgraph L1["🔴 Unit test layer"]
    direction TB
    U1[Node logic tests]
    U2[Mocked LLM calls]
    U3[Config validation]
    U4[Partial graph execution]
end

subgraph L2["🟡 Integration test layer"]
    direction TB
    I1[End-to-end graph execution]
    I2[Real LLM API]
    I3[Tool chain verification]
end

subgraph L3["🟢 Evaluation test layer"]
    direction TB
    E1[Quality scoring]
    E2[Hallucination detection]
    E3[Accuracy benchmarks]
end

L1 -->|many times per day| L2
L2 -->|daily / manual| L3

style L1 fill:#ffebee
style L2 fill:#fff9c4
style L3 fill:#e8f5e9
| Layer | Purpose | Speed | External calls | When it runs |
|---|---|---|---|---|
| Unit tests | Node logic, config validation | Milliseconds | None (mocked LLM) | Every push/PR |
| Integration tests | End-to-end graph execution | Seconds to minutes | Real LLM API | Daily + manual |
| Evaluation tests | Quality, hallucination detection | Minutes | Real LLM API | Weekly + releases |

Unit Tests: Node Isolation

# tests/unit_tests/test_nodes.py
from langgraph.checkpoint.memory import MemorySaver
from src.workflows.graph import create_graph

def test_classify_intent_node():
    """Test a single node: no LLM call needed"""
    graph = create_graph()
    compiled = graph.compile(checkpointer=MemorySaver())

    # Invoke the node directly, bypassing the checkpointer
    result = compiled.nodes["classify_intent"].invoke(
        {"messages": [], "current_intent": None, "user_message": "What's the weather today?"}
    )
    assert result["current_intent"] == "weather_query"

def test_should_validate_or_retry_exceeds_retries():
    """Test the conditional-edge logic"""
    from src.workflows.graph import should_validate_or_retry

    state = {"error_count": 4, "tool_results": {"status": "error"}}
    assert should_validate_or_retry(state) == "error"

    state = {"error_count": 1, "tool_results": {"status": "success"}}
    assert should_validate_or_retry(state) == "validate"

Unit Tests: Mocking LLM Calls

# tests/unit_tests/test_agent_with_mock.py
import pytest
from unittest.mock import AsyncMock, patch
from langchain_core.messages import AIMessage

@pytest.fixture
def mock_llm():
    """Mocked LLM that returns deterministic results"""
    with patch("src.agents.planner.ChatOpenAI") as mock:
        instance = mock.return_value
        instance.ainvoke = AsyncMock(return_value=AIMessage(
            content='{"tasks": [{"tool": "search", "params": {"query": "test"}}]}'
        ))
        yield instance

@pytest.mark.asyncio  # requires pytest-asyncio
async def test_planner_with_mock_llm(mock_llm):
    """Test the planner against a mocked LLM: zero API calls"""
    from src.agents.planner import plan_task

    result = await plan_task("Find information about Python")
    assert "tasks" in result
    assert len(result["tasks"]) > 0

Partial Graph Execution Tests

A capability specific to LangGraph: pretend a node has already executed and resume from the middle of the graph:

# tests/unit_tests/test_partial_graph.py
from langgraph.checkpoint.memory import MemorySaver
from src.workflows.graph import create_graph

def test_partial_execution_from_node2():
    """Test a sub-section of the graph without running the whole pipeline"""
    checkpointer = MemorySaver()
    graph = create_graph()
    compiled = graph.compile(checkpointer=checkpointer)

    # Pretend the classify_intent node has already executed
    compiled.update_state(
        config={"configurable": {"thread_id": "test-1"}},
        values={"messages": [], "tool_results": {"status": "pending"}},
        as_node="classify_intent",
    )

    # Resume from that state and pause after execute_tools
    result = compiled.invoke(
        None,
        config={"configurable": {"thread_id": "test-1"}},
        interrupt_after=["execute_tools"],
    )
    assert result["tool_results"]["status"] == "success"

Integration Tests: Real LLM Calls

# tests/integration_tests/test_graph.py
import pytest

pytestmark = pytest.mark.anyio  # every test in this module is async

@pytest.fixture(scope="session")
def anyio_backend():
    return "asyncio"

@pytest.mark.langsmith  # enable LangSmith tracing
async def test_agent_simple_passthrough():
    """End-to-end test; requires API keys"""
    from src.workflows.graph import create_graph
    from langgraph.checkpoint.memory import MemorySaver

    graph = create_graph()
    compiled = graph.compile(checkpointer=MemorySaver())

    result = await compiled.ainvoke(
        {"messages": [{"role": "user", "content": "Hello"}]},
        config={"configurable": {"thread_id": "integration-test-1"}}
    )
    assert result is not None
    assert len(result["messages"]) > 0

10.3 The CI/CD Pipeline

flowchart TD
A["Developer push / PR"] --> B{Trigger type?}

B -->|every push| C["Unit test pipeline"]
C --> C1["Lint: ruff"]
C1 --> C2["Type check: mypy"]
C2 --> C3["Unit tests: pytest"]
C3 --> C4{Pass?}
C4 -->|yes| C5["✅ Merge allowed"]
C4 -->|no| C6["❌ Merge blocked"]

B -->|daily schedule| D["Integration test pipeline"]
D --> D1["End-to-end tests"]
D1 --> D2["Real LLM API calls"]
D2 --> D3["LangSmith tracing"]

B -->|push to main| E["Deploy pipeline"]
E --> E1["Build + unit tests"]
E1 --> E2["Build Docker image"]
E2 --> E3["Push to registry"]
E3 --> E4["Deploy to K8s"]
E4 --> E5["Verify rolling update"]

style C fill:#ffebee
style D fill:#fff9c4
style E fill:#e8f5e9

Unit test pipeline (runs on every push/PR)

# .github/workflows/unit-tests.yml
name: Unit Tests
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

concurrency:
  group: unit-tests-${{ github.ref }}
  cancel-in-progress: true

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.11", "3.12"]

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: |
          pip install -U pip
          pip install -r requirements.txt
          pip install pytest pytest-asyncio ruff mypy

      - name: Lint with ruff
        run: ruff check .

      - name: Type check with mypy
        run: mypy --strict src/

      - name: Run unit tests
        run: pytest tests/unit_tests -v --tb=short

Integration test pipeline (daily + manual trigger)

# .github/workflows/integration-tests.yml
name: Integration Tests
on:
  schedule:
    - cron: "37 14 * * *"  # daily at 14:37 UTC
  workflow_dispatch:       # manual trigger

jobs:
  integration-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v4
        with:
          python-version: "3.11"
      - run: pip install -r requirements.txt pytest pytest-asyncio anyio
      - name: Run integration tests
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          LANGSMITH_API_KEY: ${{ secrets.LANGSMITH_API_KEY }}
          LANGSMITH_TRACING: "true"
        run: pytest tests/integration_tests -v

Deploy pipeline

# .github/workflows/deploy.yml
name: Deploy Agent Service
on:
  push:
    branches: [main]

jobs:
  build-and-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v4
        with:
          python-version: "3.11"
      - run: pip install -r requirements.txt
      - run: ruff check .
      - run: pytest tests/unit_tests

  build-docker:
    needs: build-and-test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Build Docker image
        run: docker build -t agent-service:${{ github.sha }} .
      - name: Push to registry  # assumes a prior docker login to ghcr.io
        run: |
          docker tag agent-service:${{ github.sha }} ghcr.io/org/agent-service:${{ github.sha }}
          docker tag agent-service:${{ github.sha }} ghcr.io/org/agent-service:latest
          docker push ghcr.io/org/agent-service:${{ github.sha }}
          docker push ghcr.io/org/agent-service:latest

  deploy:
    needs: build-docker
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Kubernetes  # assumes kubectl is configured with cluster credentials
        run: |
          kubectl set image deployment/agent-service \
            agent=ghcr.io/org/agent-service:${{ github.sha }} \
            --namespace production
          kubectl rollout status deployment/agent-service --namespace production

10.4 Docker Containerization

A Production-Grade Dockerfile

# Dockerfile
FROM python:3.11-slim AS base

# Security: create a non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser

WORKDIR /app

# Install dependencies first (exploits layer caching)
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt

# Then copy the code
COPY src/ src/
COPY pyproject.toml .

# Production environment variables
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PYTHONPATH=/app

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/healthz')" || exit 1

# Run as the non-root user
USER appuser

EXPOSE 8000

CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

Docker Compose Multi-Service Orchestration

# docker-compose.yaml
version: "3.9"

services:
  agent-api:
    build: .
    ports:
      - "8000:8000"
    env_file: .env
    depends_on:
      redis:
        condition: service_healthy
      vector-db:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/healthz"]
      interval: 30s
      timeout: 10s
      retries: 3
    deploy:
      replicas: 2
      resources:
        limits:
          memory: 2G
          cpus: "2.0"

  worker-agent:
    build: .
    command: ["python", "-m", "src.worker"]
    env_file: .env
    depends_on:
      redis:
        condition: service_healthy
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 4G

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s

  vector-db:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
    volumes:
      - qdrant-data:/qdrant/storage
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:6333/healthz"]
      interval: 10s

  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    volumes:
      - ./infra/otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml
    ports:
      - "4317:4317"  # gRPC
      - "4318:4318"  # HTTP

volumes:
  redis-data:
  qdrant-data:

Health Check Endpoints

# src/api/main.py
from fastapi import FastAPI
from datetime import datetime, timezone

app = FastAPI()

@app.get("/healthz")
async def liveness():
    """Liveness probe: is the process running?"""
    return {"status": "ok"}

@app.get("/readiness")
async def readiness():
    """Readiness probe: can we accept traffic?"""
    from src.infra.config import get_config
    config = get_config()

    checks = {}
    # Check Redis
    try:
        import redis
        r = redis.from_url(config.redis_url)
        r.ping()
        checks["redis"] = "ok"
    except Exception as e:
        checks["redis"] = f"error: {e}"

    # Check LLM API connectivity (costs a token per probe, so keep periodSeconds modest)
    try:
        from langchain_openai import ChatOpenAI
        llm = ChatOpenAI(model=config.fast_classifier_model, max_tokens=1)
        llm.invoke("ping")
        checks["llm_api"] = "ok"
    except Exception as e:
        checks["llm_api"] = f"error: {e}"

    all_ok = all(v == "ok" for v in checks.values())
    return {
        "status": "ok" if all_ok else "degraded",
        "checks": checks,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

10.5 Monitoring and Observability

flowchart TD
subgraph App["Agent application"]
    A1[LLM calls]
    A2[Tool execution]
    A3[State transitions]
end

subgraph LS["LangSmith zero-code tracing"]
    L1[Every call traced automatically]
    L2[Decision process visualized]
    L3[Token usage statistics]
end

subgraph OTel["OpenTelemetry distributed tracing"]
    O1[Cross-service traces]
    O2[Custom spans]
    O3["Export to LangSmith / Jaeger"]
end

subgraph Log["structlog structured logging"]
    G1[JSON-formatted logs]
    G2[Context binding]
    G3[ELK ingestion]
end

App -->|env vars only| LS
App -->|code instrumentation| OTel
App -->|structlog| Log

style App fill:#e3f2fd
style LS fill:#e8f5e9
style OTel fill:#fff9c4
style Log fill:#f3e5f5

LangSmith Integration (Zero-Code Tracing)

# Just set environment variables; every LangChain/LangGraph call is traced automatically
LANGSMITH_TRACING=true
LANGSMITH_API_KEY=lsv2_pt_...
LANGSMITH_PROJECT=agent-prod
LANGSMITH_OTEL_ENABLED=true

No code changes are needed: LangSmith automatically traces every LLM call, tool execution, and state transition.
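
For custom code outside LangChain/LangGraph, you can opt individual functions into the same trace tree with the `traceable` decorator. A minimal sketch (`score_complexity` is a hypothetical helper, not part of this project):

from langsmith import traceable

@traceable(run_type="chain", name="score_complexity")
def score_complexity(query: str) -> float:
    # Shows up as its own run inside the LangSmith trace tree
    return min(len(query) / 1000, 1.0)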

OpenTelemetry Distributed Tracing

# src/infra/telemetry.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

from src.infra.config import get_config

def setup_telemetry(service_name: str = "agent-service"):
    """Configure OpenTelemetry distributed tracing"""
    provider = TracerProvider()

    # Export to LangSmith (via OTel)
    exporter = OTLPSpanExporter(
        endpoint="https://api.smith.langchain.com/otel/v1/traces",
        headers={
            "x-api-key": get_config().langsmith_api_key,
            "Langsmith-Project": get_config().langsmith_project,
        },
    )
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)

# Instrumentation example (`agent` is the compiled graph from src.workflows)
tracer = setup_telemetry()

@tracer.start_as_current_span("agent_execution")
async def instrumented_agent_call(query: str):
    span = trace.get_current_span()
    span.set_attribute("query_length", len(query))

    try:
        result = await agent.ainvoke(query)
        span.set_attribute("success", True)
        span.set_attribute("tool_calls", len(result.get("tool_calls", [])))
        span.set_attribute("input_tokens", result.get("usage", {}).get("input_tokens", 0))
        return result
    except Exception as e:
        span.set_attribute("success", False)
        span.set_attribute("error", str(e))
        raise

Structured Logging

# src/infra/logging.py
import structlog
import logging

def setup_logging():
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),  # JSON output for easy ELK ingestion
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

# Usage (`agent` is the compiled graph from src.workflows)
logger = structlog.get_logger()

async def process_query(query: str, session_id: str):
    log = logger.bind(query_length=len(query), session_id=session_id)
    log.info("agent_invocation_started")

    try:
        result = await agent.ainvoke(query)
        log.info("agent_invocation_completed",
                 tokens_used=result.get("usage", {}),
                 tool_calls=len(result.get("tool_calls", [])))
        return result
    except Exception as e:
        log.error("agent_invocation_failed", error=str(e), error_type=type(e).__name__)
        raise

Key Metrics and Targets

| Metric | Suggested target | Notes |
|---|---|---|
| P95 latency | < 5s (single turn) | Includes LLM + tool execution time |
| Tool call error rate | < 3% | Higher means unstable tools |
| Loop containment rate | > 99% | Agents must not get stuck in infinite loops |
| Cache hit rate | > 30% | Semantic cache hit rate |
| Token spend | Monitor against budget | Set monthly caps and alerts |
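
These targets only mean something if the numbers are being collected. A minimal instrumentation sketch with `prometheus_client` (the metric names are illustrative, not part of the project above):

# src/infra/metrics.py (sketch)
from prometheus_client import Counter, Histogram

AGENT_LATENCY = Histogram(
    "agent_turn_latency_seconds", "End-to-end latency of one agent turn",
    buckets=(0.5, 1, 2, 5, 10, 30, 60),  # feeds the P95 panel
)
TOOL_ERRORS = Counter("agent_tool_errors_total", "Tool call failures", ["tool_name"])
TOKENS_USED = Counter("agent_tokens_total", "Tokens consumed", ["model", "direction"])

async def observed_invoke(graph, payload, config):
    with AGENT_LATENCY.time():  # wall-clock timing around the whole turn
        return await graph.ainvoke(payload, config=config)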

10.6 Deployment Patterns

flowchart TD
A[AI Agent deployment] --> B[Pattern 1: REST API]
A --> C[Pattern 2: SSE streaming]
A --> D[Pattern 3: LangGraph Cloud]
A --> E[Pattern 4: Kubernetes]

B --> B1["FastAPI + LangGraph<br/>synchronous requests<br/>simplest"]
C --> C1["StreamingResponse<br/>streams intermediate steps<br/>best UX"]
D --> D1["Fully managed<br/>works out of the box<br/>zero ops"]
E --> E1["KEDA autoscaling<br/>queue-driven scaling<br/>enterprise-grade"]

style B fill:#e3f2fd
style C fill:#fff9c4
style D fill:#e8f5e9
style E fill:#f3e5f5

Pattern 1: REST API (FastAPI + LangGraph)

The simplest deployment, suited to synchronous request/response:

# src/api/main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langgraph.checkpoint.memory import MemorySaver
from src.workflows.graph import create_graph

app = FastAPI(title="AI Agent Service", version="1.0.0")

# Shared checkpointer (use Redis/Postgres in production)
checkpointer = MemorySaver()
graph = create_graph()
compiled_graph = graph.compile(checkpointer=checkpointer)

class ChatRequest(BaseModel):
    message: str
    thread_id: str = "default"

class ChatResponse(BaseModel):
    response: str
    thread_id: str

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    config = {"configurable": {"thread_id": request.thread_id}}
    try:
        result = await compiled_graph.ainvoke(
            {"messages": [{"role": "user", "content": request.message}]},
            config=config,
        )
        last_message = result["messages"][-1].content
        return ChatResponse(response=last_message, thread_id=request.thread_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Pattern 2: SSE Streaming

Streams the agent's intermediate steps in real time for a better user experience:

# src/api/router.py
from fastapi import APIRouter, Form
from fastapi.responses import StreamingResponse
import json

from src.api.dependencies import compiled_graph  # shared compiled graph

router = APIRouter(prefix="/api")

@router.post("/stream")
async def stream_agent(
    user_message: str = Form(...),
    session_id: str = Form(...),
):
    async def event_generator():
        try:
            async for event in compiled_graph.astream(
                {"messages": [{"role": "user", "content": user_message}]},
                config={"configurable": {"thread_id": session_id}},
                stream_mode=["custom", "updates", "messages"],
                subgraphs=True,
            ):
                if isinstance(event, tuple) and len(event) == 3:
                    _ns, mode, payload = event  # subgraphs=True prefixes a namespace
                elif isinstance(event, tuple) and len(event) == 2:
                    mode, payload = event
                else:
                    mode, payload = "unknown", event
                yield f"data: {json.dumps({'mode': mode, 'payload': payload}, default=str)}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # required behind Nginx
        },
    )

Pattern 3: LangGraph Cloud (Managed)

LangGraph offers a fully managed service that works out of the box:

# Install the CLI
pip install langgraph-cli

# Deploy to LangGraph Cloud
langgraph deploy

# Or build a standalone container
langgraph build

LangGraph Cloud provides automatic API generation, built-in checkpointer persistence, streaming endpoints (SSE + WebSocket), LangSmith observability integration, horizontal scaling, and authentication/authorization.
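
If you deploy this way, the CLI reads a `langgraph.json` at the repo root that points at your graph. A sketch matching this chapter's project layout (check the current CLI docs for the exact schema):

{
  "dependencies": ["."],
  "graphs": {
    "agent": "./src/workflows/graph.py:create_graph"
  },
  "env": ".env"
}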

Pattern 4: Kubernetes

# infra/k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent-service
  template:
    metadata:
      labels:
        app: agent-service
    spec:
      containers:
        - name: agent
          image: ghcr.io/org/agent-service:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: openai-api-key
            - name: REDIS_URL
              value: "redis://redis-service:6379"
            - name: LANGSMITH_TRACING
              value: "true"
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /healthz
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /readiness
              port: 8000
            initialDelaySeconds: 15
            periodSeconds: 10
      terminationGracePeriodSeconds: 120  # agents need time to finish the current task

Choosing a Deployment Strategy

| Strategy | Risk | Cost | Best for |
|---|---|---|---|
| Rolling update | Medium | Low | Minor releases, bug fixes |
| Blue-green | Low | High | Major releases, breaking changes |
| Canary | Very low | Medium | Prompt updates, model switches |
| Shadow | Very low | Very high | Safety-critical changes |

Practical advice: use rolling updates for infrastructure, canary releases for agent logic (prompts/tools/models), and shadow deployments for high-risk changes. A sketch of a prompt-version canary follows below.
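
A canary for prompt versions can be as small as a deterministic hash bucket on the session ID, mirroring the A/B router in 10.9. A hedged sketch (the percentage and version names are illustrative):

import hashlib

CANARY_PERCENT = 5  # 5% of sessions get the new prompt version

def pick_prompt_version(session_id: str) -> str:
    bucket = int(hashlib.sha256(session_id.encode()).hexdigest(), 16) % 100
    return "v2" if bucket < CANARY_PERCENT else "v1"

Because the bucket is derived from the session ID, a session stays on the same version across turns, and rolling back is a one-line percentage change.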


10.7 Security Strategy

flowchart TD
A["User input"] --> B["Input sanitization: sanitize_input"]
B --> C{"Sensitive data detected?"}
C -->|detected| D["Redaction<br/>SSN → SSN_REDACTED<br/>credit card → CC_REDACTED"]
C -->|clean| E["Injection detection"]
D --> E
E -->|detected| F["Filter attack instructions<br/>ignore previous → FILTERED"]
E -->|clean| G["Enter the agent"]
F --> G

G --> H{"Tool-call risk control"}
H -->|LOW risk| I["Auto-execute"]
H -->|MEDIUM/HIGH risk| J["Requires review"]
H -->|CRITICAL risk| K["Requires explicit approval"]

J --> L["Rate-limit check<br/>RateLimiter"]
K --> L
L -->|allowed| M["Execute tool"]
L -->|over limit| N["Reject request"]

style B fill:#e3f2fd
style C fill:#fff9c4
style E fill:#fff9c4
style H fill:#f3e5f5
style L fill:#ffebee

Three-Tier API Key Management

| Environment | Method |
|---|---|
| Local development | .env file (gitignored) |
| CI/CD | GitHub Secrets / GitLab CI variables |
| Production | AWS Secrets Manager / HashiCorp Vault |
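
For the production tier, load keys from the secret manager at startup instead of from a file. A minimal sketch with boto3 and AWS Secrets Manager (the secret name is illustrative):

import json
import boto3

def load_llm_secrets(secret_id: str = "prod/agent-service/llm-keys") -> dict:
    client = boto3.client("secretsmanager")
    payload = client.get_secret_value(SecretId=secret_id)["SecretString"]
    return json.loads(payload)  # e.g. {"OPENAI_API_KEY": "..."}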

Input Security (OWASP-Aligned)

# src/infra/security.py
import re

MAX_INPUT_LENGTH = 10000

# Sensitive-data redaction
SENSITIVE_PATTERNS = [
    (r"\b\d{3}-\d{2}-\d{4}\b", "[SSN_REDACTED]"),  # social security numbers
    (r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b", "[CC_REDACTED]"),  # credit cards
    (r"(?i)(api[_-]?key|password|secret|token)\s*[:=]\s*\S+", "[CREDENTIAL_REDACTED]"),
]

# Prompt-injection detection
INJECTION_PATTERNS = [
    r"(?i)ignore\s+(all\s+)?previous\s+instructions",
    r"(?i)forget\s+(all\s+)?previous",
    r"(?i)system\s*:",
    r"(?i)you\s+are\s+now",
]

def sanitize_input(text: str) -> str:
    """Clean user input before it enters the agent"""
    if len(text) > MAX_INPUT_LENGTH:
        text = text[:MAX_INPUT_LENGTH]
    for pattern, replacement in SENSITIVE_PATTERNS:
        text = re.sub(pattern, replacement, text)
    return text

def sanitize_for_memory(text: str) -> str:
    """Clean text before persisting it to memory"""
    text = sanitize_input(text)
    for pattern in INJECTION_PATTERNS:
        text = re.sub(pattern, "[FILTERED]", text)
    return text
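
What this looks like in practice, given the patterns above:

>>> sanitize_input("card 4111 1111 1111 1111, api_key=sk-abc123")
'card [CC_REDACTED], [CREDENTIAL_REDACTED]'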

Tool-Call Risk Control

# src/infra/guardrails.py
from enum import IntEnum

class RiskLevel(IntEnum):
    # IntEnum so levels compare ordinally (a plain Enum with string values
    # would compare alphabetically and get the ordering wrong)
    LOW = 1       # auto-execute
    MEDIUM = 2    # requires review
    HIGH = 3      # requires review
    CRITICAL = 4  # requires explicit approval

TOOL_RISK_LEVELS = {
    "search_documents": RiskLevel.LOW,
    "read_file": RiskLevel.LOW,
    "write_file": RiskLevel.MEDIUM,
    "send_email": RiskLevel.HIGH,
    "execute_code": RiskLevel.HIGH,
    "database_delete": RiskLevel.CRITICAL,
    "transfer_funds": RiskLevel.CRITICAL,
}

AUTO_APPROVE_THRESHOLD = RiskLevel.LOW

def requires_human_approval(tool_name: str) -> bool:
    # Unknown tools default to HIGH: fail closed, not open
    risk = TOOL_RISK_LEVELS.get(tool_name, RiskLevel.HIGH)
    return risk > AUTO_APPROVE_THRESHOLD

Rate Limiting

# src/infra/guardrails.py (continued)
from datetime import datetime, timedelta, timezone
from typing import List

class RateLimiter:
    def __init__(self, max_calls: int = 100, window_seconds: int = 60):
        self.max_calls = max_calls
        self.window = timedelta(seconds=window_seconds)
        self.calls: List[datetime] = []

    def check(self) -> bool:
        now = datetime.now(timezone.utc)
        # Drop calls that have aged out of the sliding window
        self.calls = [t for t in self.calls if now - t < self.window]
        if len(self.calls) >= self.max_calls:
            return False
        self.calls.append(now)
        return True
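
The limiter above only guards a single process. With multiple replicas you need shared state; a minimal fixed-window sketch on Redis (the key naming is illustrative):

import redis

class RedisRateLimiter:
    def __init__(self, url: str, max_calls: int = 100, window_seconds: int = 60):
        self.r = redis.from_url(url)
        self.max_calls = max_calls
        self.window = window_seconds

    def check(self, user_id: str) -> bool:
        key = f"ratelimit:{user_id}"
        count = self.r.incr(key)  # atomic across all replicas
        if count == 1:
            self.r.expire(key, self.window)  # first call opens the window
        return count <= self.max_calls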

10.8 Scaling Strategy

Key insight: traditional CPU-based HPA does not work for agents. An agent spends most of its time waiting on LLM API responses, so CPU utilization can sit at 5% while the system is already saturated.

Queue-Depth-Based Autoscaling (Recommended)

Use a message queue plus KEDA for event-driven scaling:

flowchart LR
U[User request] --> GW[API gateway]
GW --> Q["Task queue<br/>Redis list"]
Q --> W1[Agent worker 1]
Q --> W2[Agent worker 2]
Q --> W3[Agent worker N]

W1 --> R[Process & callback]
W2 --> R
W3 --> R

KEDA["KEDA monitors<br/>queue depth"] -->|queue > 5 per worker| AUTO[Scale out]
KEDA -->|queue = 0| SHRK[Scale in]

style Q fill:#ffebee
style KEDA fill:#e3f2fd
style AUTO fill:#e8f5e9
# infra/k8s/scaledobject.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: agent-worker-scaler
spec:
  scaleTargetRef:
    name: agent-worker
  minReplicaCount: 2
  maxReplicaCount: 50
  triggers:
    - type: redis-lists
      metadata:
        listName: agent-tasks
        listLength: "5"  # scale out once the queue holds 5 tasks per worker
        address: redis-service:6379
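
The worker this ScaledObject scales is just a loop that pops from the list KEDA watches. A sketch (the task schema is an assumption; "agent-tasks" must match listName above):

# src/worker.py (sketch)
import json
import redis
from src.workflows.graph import create_graph

r = redis.from_url("redis://redis-service:6379")
compiled = create_graph().compile()

def run_worker():
    while True:
        _queue, raw = r.blpop("agent-tasks")  # blocks until a task arrives
        task = json.loads(raw)
        result = compiled.invoke({"messages": task["messages"]})
        r.lpush(f"agent-results:{task['task_id']}", json.dumps(result, default=str))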

Async Concurrency Control

# src/api/async_handler.py
import asyncio
from typing import Any, Callable
from uuid import uuid4

class AsyncTaskProcessor:
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def process(self, task: Callable, *args, **kwargs) -> Any:
        async with self.semaphore:
            return await task(*args, **kwargs)

processor = AsyncTaskProcessor(max_concurrent=10)

@app.post("/chat/async")
async def chat_async(request: ChatRequest):
    # Fire-and-forget: the caller polls for the result elsewhere
    # (e.g. a /tasks/{task_id} endpoint backed by Redis)
    task_id = str(uuid4())
    asyncio.create_task(
        processor.process(agent.ainvoke, request.message)
    )
    return {"task_id": task_id, "status": "processing"}

Scaling Decision Matrix

| Scenario | Strategy |
|---|---|
| Stateless agent workers | Queue-depth HPA (KEDA) |
| Stateful agents (conversational) | StatefulSet + external state store |
| Batch processing | K8s Jobs/CronJobs + provider Batch APIs |
| GPU workloads (local models) | NVIDIA Device Plugin + vLLM |

10.9 Version Management

Prompt Versioning

# src/workflows/prompts/__init__.py
from pathlib import Path
import yaml

class PromptRegistry:
    def __init__(self, prompts_dir: str = "src/workflows/prompts"):
        self.prompts_dir = Path(prompts_dir)

    def get(self, prompt_name: str, version: str = "latest") -> str:
        """Load a versioned prompt from its YAML file"""
        prompt_file = self.prompts_dir / f"{prompt_name}.yaml"
        with open(prompt_file) as f:
            prompts = yaml.safe_load(f)
        versions = prompts["versions"]
        if version == "latest":
            version = sorted(versions)[-1]  # highest version key, e.g. "v2"
        return versions[version]["template"]

Example prompt YAML:

# src/workflows/prompts/orchestrator.yaml
versions:
  v1:
    template: "You are a helpful assistant..."
    model: gpt-4o
    temperature: 0
  v2:
    template: "You are an AI orchestration agent..."
    model: gpt-4o
    temperature: 0
    changelog: "Added tool awareness and structured output"
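
Usage is then a one-liner, and rolling a prompt forward or back is a version-string change:

registry = PromptRegistry()
prompt = registry.get("orchestrator", version="v2")  # or version="latest"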

Model Versions + Fallback Chains

# src/infra/model_registry.py
from dataclasses import dataclass
from typing import Optional

@dataclass
class ModelConfig:
    name: str
    version: str
    provider: str
    fallback: Optional["ModelConfig"] = None

MODELS = {
    "orchestrator": ModelConfig(
        name="gpt-4o", version="2024-11-20", provider="openai",
        fallback=ModelConfig(name="gpt-4o-mini", version="latest", provider="openai"),
    ),
    "reasoner": ModelConfig(
        name="claude-sonnet-4-20250514", version="2025-05-14", provider="anthropic",
        fallback=ModelConfig(name="gpt-4o", version="2024-11-20", provider="openai"),
    ),
}
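
Walking the chain at call time means trying the primary model and dropping to its fallback on provider errors. A hedged sketch using LangChain's provider-agnostic `init_chat_model` factory:

from langchain.chat_models import init_chat_model

async def invoke_with_fallback(role: str, messages: list):
    config = MODELS[role]
    while config is not None:
        try:
            llm = init_chat_model(config.name, model_provider=config.provider)
            return await llm.ainvoke(messages)
        except Exception:  # in production, catch provider-specific errors
            config = config.fallback  # walk down the chain
    raise RuntimeError(f"All models in the '{role}' fallback chain failed")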

A/B Testing

# src/infra/ab_testing.py
import hashlib

class ABTestRouter:
    def __init__(self, experiments: dict):
        self.experiments = experiments

    def get_variant(self, user_id: str, experiment: str) -> str:
        """Deterministically assign a user to an experiment arm"""
        config = self.experiments[experiment]
        hash_input = f"{user_id}:{experiment}"
        hash_val = int(hashlib.md5(hash_input.encode()).hexdigest(), 16) % 100

        cumulative = 0
        for variant, percentage in config["variants"].items():
            cumulative += percentage
            if hash_val < cumulative:
                return variant
        return list(config["variants"].keys())[-1]

# Usage
experiments = {
    "prompt_v2_rollout": {
        "variants": {
            "control": 80,    # 80% get the v1 prompt
            "treatment": 20,  # 20% get the v2 prompt
        }
    }
}
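
Routing a request through the experiment then reduces to:

router = ABTestRouter(experiments)
variant = router.get_variant(user_id="user-42", experiment="prompt_v2_rollout")
prompt_version = "v2" if variant == "treatment" else "v1"  # feed into PromptRegistry.get()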

10.10 Cost Optimization

The Combined Optimization Stack

flowchart TD
A[Request arrives] --> B{1. Semantic cache hit?}
B -->|hit| R1["Return cached result<br/>saves ~30% of requests"]
B -->|miss| C{2. Task complexity?}
C -->|simple| R2["Route to a cheap model<br/>saves 30-70%"]
C -->|complex| D{3. Prefix cache available?}
D -->|available| R3["Reuse KV cache<br/>saves ~90%"]
D -->|unavailable| E{4. Can it run async?}
E -->|yes| R4["Use the Batch API<br/>saves ~50%"]
E -->|realtime| F[5. Run on a premium model]
F --> G{6. Over budget?}
G -->|yes| R5["Return partial result<br/>budget governance"]
G -->|no| R6[Return full result]

style B fill:#e3f2fd
style C fill:#fff9c4
style D fill:#f3e5f5
style E fill:#e8f5e9
style G fill:#ffebee

Semantic Caching

# Semantic cache with GPTCache: semantically equivalent questions hit the
# cache instead of the LLM. (Sketch against the GPTCache 0.1.x API; the
# original snippet's Cosine/similarity_threshold arguments do not match the
# library, so this uses the documented manager_factory/Config form.)
from gptcache import cache, Config
from gptcache.adapter import openai as gptcache_openai
from gptcache.embedding import OpenAI as OpenAIEmbedding
from gptcache.manager import manager_factory
from gptcache.similarity_evaluation.distance import SearchDistanceEvaluation

embedding = OpenAIEmbedding(model="text-embedding-3-small")
cache.init(
    embedding_func=embedding.to_embeddings,
    data_manager=manager_factory("sqlite,faiss", vector_params={"dimension": embedding.dimension}),
    similarity_evaluation=SearchDistanceEvaluation(),
    config=Config(similarity_threshold=0.85),
)

# A semantically similar question hits the cache with no LLM call
response = gptcache_openai.ChatCompletion.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Summarize the Q4 earnings report"}],
)

Dynamic Model Routing

# src/infra/model_router.py
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

class ModelRouter:
    def __init__(self):
        self.cheap = ChatOpenAI(model="gpt-4o-mini", temperature=0)                      # ~$0.15/M tokens
        self.mid = ChatOpenAI(model="gpt-4o", temperature=0)                             # ~$2.50/M tokens
        self.premium = ChatAnthropic(model="claude-sonnet-4-20250514", temperature=0.3)  # ~$3.00/M tokens

    async def route(self, task: str, complexity_score: float) -> str:
        if complexity_score < 0.3:
            return "cheap"
        elif complexity_score < 0.7:
            return "mid"
        return "premium"

    async def execute(self, query: str, complexity_score: float):
        route = await self.route(query, complexity_score)
        model_map = {"cheap": self.cheap, "mid": self.mid, "premium": self.premium}
        return await model_map[route].ainvoke([{"role": "user", "content": query}])
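
The router needs a complexity_score from somewhere. One cheap option is to let the low-cost model grade the task before routing; a sketch (the grading prompt and the 0-1 scale are illustrative assumptions):

from langchain_openai import ChatOpenAI

async def score_complexity(task: str, grader=None) -> float:
    grader = grader or ChatOpenAI(model="gpt-4o-mini", temperature=0)
    resp = await grader.ainvoke([{
        "role": "user",
        "content": "Rate the complexity of this task from 0.0 to 1.0. "
                   f"Reply with only the number.\nTask: {task}",
    }])
    try:
        return max(0.0, min(1.0, float(resp.content.strip())))
    except ValueError:
        return 0.5  # unparseable reply: assume medium complexity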

Budget Governance

# src/infra/budget.py
from dataclasses import dataclass

@dataclass
class TokenBudget:
    max_tokens_per_task: int = 100_000
    max_iterations_per_task: int = 50
    monthly_budget_usd: float = 10_000
    alert_thresholds: list = None  # e.g. [0.5, 0.8, 1.0]

class BudgetExhausted(Exception):
    pass

class BudgetGovernor:
    def __init__(self, budget: TokenBudget):
        self.budget = budget
        self.total_spend = 0.0

    def check_iteration_limit(self, iteration_count: int):
        if iteration_count >= self.budget.max_iterations_per_task:
            raise BudgetExhausted(
                f"Agent exceeded {self.budget.max_iterations_per_task} iterations; it may be stuck in a loop"
            )

    def check_token_budget(self, tokens_used: int):
        if tokens_used >= self.budget.max_tokens_per_task:
            raise BudgetExhausted(
                f"Token budget exhausted ({tokens_used}/{self.budget.max_tokens_per_task})"
            )

    def record_spend(self, cost_usd: float, feature: str):
        self.total_spend += cost_usd
        for threshold in (self.budget.alert_thresholds or [0.5, 0.8, 1.0]):
            if self.total_spend >= self.budget.monthly_budget_usd * threshold:
                action = {0.5: "monitor", 0.8: "review", 1.0: "circuit-break"}[threshold]
                print(f"[ALERT: {action}] spend has reached {threshold*100:.0f}% of budget")
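
Wired into the agent loop, the governor turns runaway spend into a controlled partial result (state["iteration_count"] and state["tokens_used"] are assumed counters maintained by your loop):

governor = BudgetGovernor(TokenBudget())

async def run_step(state: dict):
    try:
        governor.check_iteration_limit(state["iteration_count"])
        governor.check_token_budget(state["tokens_used"])
    except BudgetExhausted as exc:
        # Return what we have instead of letting the agent keep spending
        return {"status": "budget_exceeded", "detail": str(exc)}
    ...  # otherwise proceed with the next LLM/tool call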

Cost Optimization Summary

| Strategy | Savings |
|---|---|
| Semantic caching | ~30% of requests intercepted |
| Model routing | 30-70% on routed requests |
| Prefix/KV caching | 90% on long system prompts |
| Batch API | 50% on async workloads |
| Prompt compression | Up to 20x token compression |
| Budget governance | Prevents runaway spend |
| Combined | 60-80% total cost reduction |

Chapter Summary

| Area | Key actions |
|---|---|
| Project structure | Layered layout, Pydantic config, .env.template |
| Testing | Three layers: unit (mocked LLM) → integration (real API) → evaluation (quality) |
| CI/CD | Unit tests + lint on every push, integration tests daily, auto-deploy on green |
| Containerization | Non-root user, layer caching, health checks, Docker Compose orchestration |
| Monitoring | LangSmith zero-code tracing + OpenTelemetry distributed tracing + structured logs |
| Deployment | REST API / SSE streaming / LangGraph Cloud / Kubernetes |
| Security | Input redaction + output guardrails + tiered risk control + rate limiting |
| Scaling | Queue depth (not CPU!), KEDA event-driven |
| Version management | Prompt YAML versioning + model fallback chains + A/B testing |
| Cost optimization | Semantic caching + model routing + batching + budget governance = 60-80% reduction |

The core principle of productionization: most problems in agent systems are not the model failing but the harness failing. Get monitoring and security right first, then chase performance.