04_图的流转控制

掌握LangGraph的流程控制技巧!本节课深入讲解条件边、循环、并行执行等高级特性。学会如何根据业务逻辑设计复杂的流程分支,实现智能路由、自动重试等实用功能。通过实战案例,构建灵活可控的AI工作流。


🎯 学习目标

学完本节课,你将能够:

  1. 使用条件边实现动态路由
  2. 构建循环流程(重试、迭代优化)
  3. 实现并行节点提高效率
  4. 组合多种控制流构建复杂工作流
  5. 掌握流程控制的最佳实践

1. 基础:三种边类型

1.1 普通边(固定流转)

特点: 无条件执行,A→B固定连接

1
2
graph.add_edge("node_a", "node_b")
# 执行完node_a,一定会执行node_b

适用场景:

  • 固定的处理流程
  • 数据预处理 → 模型推理 → 结果后处理

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from langgraph.graph import StateGraph, END
from typing import TypedDict

class State(TypedDict):
text: str
processed: str
result: str

def preprocess(state: State) -> State:
state["processed"] = state["text"].strip().lower()
return state

def analyze(state: State) -> State:
state["result"] = f"分析了: {state['processed']}"
return state

graph = StateGraph(State)
graph.add_node("preprocess", preprocess)
graph.add_node("analyze", analyze)

graph.set_entry_point("preprocess")
graph.add_edge("preprocess", "analyze") # 固定流转
graph.add_edge("analyze", END)

1.2 条件边(动态路由)

特点: 根据状态决定下一步,A→B或C或D

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def router(state: State) -> str:
"""返回下一个节点的名称"""
if state["score"] > 80:
return "excellent"
elif state["score"] > 60:
return "good"
else:
return "poor"

graph.add_conditional_edges(
"check", # 源节点
router, # 路由函数
{
"excellent": "reward",
"good": "encourage",
"poor": "retry"
}
)

适用场景:

  • 意图识别后的路由
  • 根据结果判断是否重试
  • 多分支业务逻辑

1.3 入口和出口

1
2
3
4
5
6
7
8
9
10
11
12
# 设置入口:从哪个节点开始
graph.set_entry_point("start_node")

# 设置出口:连接到END表示结束
graph.add_edge("final_node", END)

# 或者在条件边中返回END
def router(state: State) -> str:
if state["done"]:
return END # 直接结束
else:
return "continue"

2. 条件边详解

2.1 基本用法

路由函数的要求:

  1. 接收state作为参数
  2. 返回字符串(节点名称)或END
  3. 返回值必须在映射字典的key中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def my_router(state: State) -> str:
"""
路由函数:根据状态决定下一步

返回值必须是已定义的节点名称或END
"""
if state["condition"]:
return "path_a" # 必须是已添加的节点名
else:
return "path_b"

graph.add_conditional_edges(
"decision_node",
my_router,
{
"path_a": "node_a", # 路由返回"path_a"时,执行node_a
"path_b": "node_b" # 路由返回"path_b"时,执行node_b
}
)

2.2 实战:智能客服路由

创建 01_customer_service_router.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""
智能客服路由系统
学习目标:使用条件边实现意图路由
"""
from typing import TypedDict
from typing_extensions import NotRequired
from langgraph.graph import StateGraph, END


# 定义状态
class State(TypedDict):
user_message: str
intent: NotRequired[str]
response: NotRequired[str]


# 节点1:识别意图(简化版,实际应该用LLM)
def detect_intent(state: State) -> State:
"""识别用户意图"""
message = state["user_message"].lower()

if "退款" in message or "退货" in message:
intent = "refund"
elif "投诉" in message or "不满意" in message:
intent = "complaint"
elif "查询" in message or "订单" in message:
intent = "query"
elif "咨询" in message or "问题" in message:
intent = "consult"
else:
intent = "unknown"

state["intent"] = intent
print(f"[意图识别] {intent}")
return state


# 路由函数:根据意图决定下一步
def route_by_intent(state: State) -> str:
"""
根据意图路由到不同的处理节点

返回值必须在条件边的映射中定义
"""
intent_map = {
"refund": "handle_refund",
"complaint": "handle_complaint",
"query": "handle_query",
"consult": "handle_consult",
"unknown": "handle_unknown"
}
return intent_map[state["intent"]]


# 处理节点们
def handle_refund(state: State) -> State:
"""处理退款"""
state["response"] = "退款申请已提交,预计3-5个工作日到账。"
print(f"[退款处理] {state['response']}")
return state


def handle_complaint(state: State) -> State:
"""处理投诉"""
state["response"] = "非常抱歉给您带来不便,已为您转接人工客服。"
print(f"[投诉处理] {state['response']}")
return state


def handle_query(state: State) -> State:
"""处理查询"""
state["response"] = "您的订单正在配送中,预计明天送达。"
print(f"[查询处理] {state['response']}")
return state


def handle_consult(state: State) -> State:
"""处理咨询"""
state["response"] = "请问有什么可以帮您?我们的客服在线时间是9:00-21:00。"
print(f"[咨询处理] {state['response']}")
return state


def handle_unknown(state: State) -> State:
"""处理未知意图"""
state["response"] = "抱歉,我没有理解您的意思,请换一种方式描述。"
print(f"[未知处理] {state['response']}")
return state


# 构建图
def create_graph() -> StateGraph:
"""创建客服路由图"""
graph = StateGraph(State)

# 添加节点
graph.add_node("detect", detect_intent)
graph.add_node("handle_refund", handle_refund)
graph.add_node("handle_complaint", handle_complaint)
graph.add_node("handle_query", handle_query)
graph.add_node("handle_consult", handle_consult)
graph.add_node("handle_unknown", handle_unknown)

# 设置入口
graph.set_entry_point("detect")

# 添加条件边:根据意图路由
graph.add_conditional_edges(
"detect",
route_by_intent,
{
"handle_refund": "handle_refund",
"handle_complaint": "handle_complaint",
"handle_query": "handle_query",
"handle_consult": "handle_consult",
"handle_unknown": "handle_unknown"
}
)

# 所有处理节点都结束
graph.add_edge("handle_refund", END)
graph.add_edge("handle_complaint", END)
graph.add_edge("handle_query", END)
graph.add_edge("handle_consult", END)
graph.add_edge("handle_unknown", END)

return graph


def main() -> None:
"""测试不同意图的路由"""
app = create_graph().compile()

test_cases = [
"我要退款",
"投诉你们的服务态度",
"查询订单状态",
"咨询一下产品信息",
"balabala" # 未知意图
]

print("="*60)
print("智能客服路由系统")
print("="*60)

for i, message in enumerate(test_cases, 1):
print(f"\n{'='*60}")
print(f"测试 {i}: {message}")
print(f"{'='*60}")

result = app.invoke({"user_message": message})
print(f"最终回复: {result['response']}")


if __name__ == "__main__":
main()

运行效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
==================================================
智能客服路由系统
==================================================

==================================================
测试 1: 我要退款
==================================================
[意图识别] refund
[退款处理] 退款申请已提交,预计3-5个工作日到账。
最终回复: 退款申请已提交,预计3-5个工作日到账。

==================================================
测试 2: 投诉你们的服务态度
==================================================
[意图识别] complaint
[投诉处理] 非常抱歉给您带来不便,已为您转接人工客服。
最终回复: 非常抱歉给您带来不便,已为您转接人工客服。

3. 循环流程

3.1 实现循环的方法

方式1:节点指向自己

1
graph.add_edge("retry_node", "retry_node")  # 无限循环,慎用!

方式2:条件循环

1
2
3
4
5
6
7
8
9
10
11
def should_retry(state: State) -> str:
if state["retry_count"] < 3 and not state["success"]:
return "retry" # 回到处理节点
else:
return "done" # 结束

graph.add_conditional_edges("check", should_retry, {
"retry": "process", # 回到process节点
"done": END
})
graph.add_edge("process", "check") # process → check形成循环

3.2 实战:带重试的任务处理

创建 02_retry_mechanism.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""
带重试机制的任务处理
学习目标:使用循环实现自动重试
"""
import random
from typing import TypedDict
from typing_extensions import NotRequired
from langgraph.graph import StateGraph, END


# 定义状态
class State(TypedDict):
task: str
retry_count: int
max_retries: int
success: NotRequired[bool]
result: NotRequired[str]
error: NotRequired[str]


# 节点1:执行任务(模拟可能失败)
def execute_task(state: State) -> State:
"""执行任务,可能成功或失败"""
task = state["task"]
retry_count = state["retry_count"]

print(f"\n[执行任务] 第{retry_count + 1}次尝试: {task}")

# 模拟30%的成功率
if random.random() < 0.3:
state["success"] = True
state["result"] = f"任务'{task}'执行成功!"
print(f"✅ 成功!")
else:
state["success"] = False
state["error"] = "网络超时"
print(f"❌ 失败:{state['error']}")

return state


# 节点2:检查是否需要重试
def check_retry(state: State) -> State:
"""检查任务状态,决定是否重试"""
if not state.get("success", False):
state["retry_count"] += 1
print(f"[检查] 准备重试,已重试{state['retry_count']}次")
else:
print(f"[检查] 任务成功,无需重试")

return state


# 路由函数:决定是继续重试还是结束
def should_retry(state: State) -> str:
"""
根据成功状态和重试次数决定是否重试

返回:
"retry": 继续重试
"success": 成功结束
"failed": 超过最大重试次数,失败结束
"""
if state.get("success", False):
return "success"
elif state["retry_count"] < state["max_retries"]:
return "retry"
else:
return "failed"


# 节点3:成功处理
def handle_success(state: State) -> State:
"""处理成功情况"""
print(f"\n{'='*60}")
print(f"✅ 最终结果:{state['result']}")
print(f" 总共尝试:{state['retry_count'] + 1}次")
print(f"{'='*60}")
return state


# 节点4:失败处理
def handle_failure(state: State) -> State:
"""处理失败情况"""
print(f"\n{'='*60}")
print(f"❌ 最终结果:任务失败")
print(f" 错误信息:{state.get('error', '未知错误')}")
print(f" 已重试:{state['retry_count']}次(达到上限)")
print(f"{'='*60}")
state["result"] = "任务最终失败"
return state


# 构建图
def create_graph() -> StateGraph:
"""创建带重试机制的任务处理图"""
graph = StateGraph(State)

# 添加节点
graph.add_node("execute", execute_task)
graph.add_node("check", check_retry)
graph.add_node("success", handle_success)
graph.add_node("failed", handle_failure)

# 设置流程
graph.set_entry_point("execute")

# execute → check
graph.add_edge("execute", "check")

# check → 条件路由(形成循环)
graph.add_conditional_edges(
"check",
should_retry,
{
"retry": "execute", # 回到execute,形成循环
"success": "success", # 成功 → 成功处理
"failed": "failed" # 失败 → 失败处理
}
)

# 结束节点
graph.add_edge("success", END)
graph.add_edge("failed", END)

return graph


def main() -> None:
"""测试重试机制"""
app = create_graph().compile()

print("="*60)
print("带重试机制的任务处理")
print("="*60)

# 运行任务
result = app.invoke({
"task": "下载文件",
"retry_count": 0,
"max_retries": 5
})

print(f"\n最终状态: success={result.get('success', False)}")


if __name__ == "__main__":
main()

运行效果(随机):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
==================================================
带重试机制的任务处理
==================================================

[执行任务] 第1次尝试: 下载文件
❌ 失败:网络超时
[检查] 准备重试,已重试1次

[执行任务] 第2次尝试: 下载文件
❌ 失败:网络超时
[检查] 准备重试,已重试2次

[执行任务] 第3次尝试: 下载文件
✅ 成功!
[检查] 任务成功,无需重试

==================================================
✅ 最终结果:任务'下载文件'执行成功!
总共尝试:3次
==================================================

4. 并行执行

4.1 为什么需要并行?

场景: 查询多个数据源

1
2
3
4
5
6
7
串行(慢):
数据库查询(2s) → 文件读取(1s) → API调用(3s) = 总共6秒

并行(快):
数据库查询(2s) ┐
文件读取(1s) ├→ 汇总 = 总共3秒
API调用(3s) ┘

4.2 LangGraph中的并行

方法: 使用 add_edge 让多个节点指向同一个汇总节点

1
2
3
4
5
6
7
8
9
# 三个数据源节点并行执行
graph.add_edge("start", "fetch_db")
graph.add_edge("start", "fetch_file")
graph.add_edge("start", "fetch_api")

# 都完成后,汇总
graph.add_edge("fetch_db", "aggregate")
graph.add_edge("fetch_file", "aggregate")
graph.add_edge("fetch_api", "aggregate")

注意: LangGraph会等待所有前置节点完成后,才执行汇总节点。

4.3 实战:并行数据查询

创建 03_parallel_fetch.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""
并行数据查询
学习目标:使用并行节点提高效率
"""
import time
from typing import TypedDict, List, Dict
from typing_extensions import NotRequired
from langgraph.graph import StateGraph, END


# 定义状态
class State(TypedDict):
query: str
db_result: NotRequired[Dict]
file_result: NotRequired[Dict]
api_result: NotRequired[Dict]
final_result: NotRequired[List[Dict]]


# 节点1:从数据库查询(模拟2秒)
def fetch_from_db(state: State) -> State:
"""从数据库查询"""
print("[数据库] 开始查询...")
time.sleep(2)
state["db_result"] = {
"source": "数据库",
"data": ["记录1", "记录2"],
"time": "2秒"
}
print("[数据库] ✅ 查询完成")
return state


# 节点2:从文件读取(模拟1秒)
def fetch_from_file(state: State) -> State:
"""从文件读取"""
print("[文件] 开始读取...")
time.sleep(1)
state["file_result"] = {
"source": "文件",
"data": ["行1", "行2", "行3"],
"time": "1秒"
}
print("[文件] ✅ 读取完成")
return state


# 节点3:从API调用(模拟3秒)
def fetch_from_api(state: State) -> State:
"""从API调用"""
print("[API] 开始调用...")
time.sleep(3)
state["api_result"] = {
"source": "API",
"data": ["数据A", "数据B"],
"time": "3秒"
}
print("[API] ✅ 调用完成")
return state


# 节点4:汇总结果
def aggregate_results(state: State) -> State:
"""汇总所有数据源的结果"""
print("\n[汇总] 开始汇总所有结果...")

results = []
if "db_result" in state:
results.append(state["db_result"])
if "file_result" in state:
results.append(state["file_result"])
if "api_result" in state:
results.append(state["api_result"])

state["final_result"] = results

print(f"[汇总] ✅ 汇总完成,共{len(results)}个数据源")
return state


# 构建图
def create_graph() -> StateGraph:
"""创建并行查询图"""
graph = StateGraph(State)

# 添加节点
graph.add_node("fetch_db", fetch_from_db)
graph.add_node("fetch_file", fetch_from_file)
graph.add_node("fetch_api", fetch_from_api)
graph.add_node("aggregate", aggregate_results)

# 三个数据源并行执行
graph.set_entry_point("fetch_db")
graph.set_entry_point("fetch_file")
graph.set_entry_point("fetch_api")

# 都完成后汇总
graph.add_edge("fetch_db", "aggregate")
graph.add_edge("fetch_file", "aggregate")
graph.add_edge("fetch_api", "aggregate")

# 汇总后结束
graph.add_edge("aggregate", END)

return graph


def main() -> None:
"""测试并行查询"""
app = create_graph().compile()

print("="*60)
print("并行数据查询演示")
print("="*60)
print("\n提示:三个数据源将并行执行")
print("预计总耗时:约3秒(最慢的API调用时间)\n")

start_time = time.time()

result = app.invoke({"query": "查询用户数据"})

end_time = time.time()
elapsed = end_time - start_time

print(f"\n{'='*60}")
print(f"执行结果")
print(f"{'='*60}")
print(f"实际耗时: {elapsed:.2f}秒")
print(f"数据源数量: {len(result['final_result'])}")

for item in result["final_result"]:
print(f"\n- {item['source']}: {len(item['data'])}条数据")


if __name__ == "__main__":
main()

5. 复杂流程组合

5.1 实战:内容审核系统

需求:

  1. 内容输入
  2. 并行执行:敏感词检测 + AI安全审核
  3. 汇总审核结果
  4. 根据结果决定:通过/人工复审/拒绝

创建 04_content_moderation.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""
内容审核系统
学习目标:组合条件边、并行、循环构建复杂流程
"""
from typing import TypedDict, List
from typing_extensions import NotRequired
from langgraph.graph import StateGraph, END


# 定义状态
class State(TypedDict):
content: str
keyword_check: NotRequired[str] # pass/warning/fail
ai_check: NotRequired[str] # pass/warning/fail
final_decision: NotRequired[str] # approved/review/rejected
reason: NotRequired[str]


# 敏感词库(简化版)
SENSITIVE_KEYWORDS = ["违禁词", "敏感内容", "不当言论"]


# 节点1:敏感词检测
def keyword_check(state: State) -> State:
"""检测敏感词"""
print("[敏感词检测] 开始...")
content = state["content"]

found = [kw for kw in SENSITIVE_KEYWORDS if kw in content]

if found:
if len(found) >= 2:
state["keyword_check"] = "fail"
state["reason"] = f"包含{len(found)}个敏感词"
else:
state["keyword_check"] = "warning"
state["reason"] = f"包含敏感词: {found[0]}"
else:
state["keyword_check"] = "pass"

print(f"[敏感词检测] ✅ 结果: {state['keyword_check']}")
return state


# 节点2:AI安全审核(简化版,实际应该调用LLM)
def ai_safety_check(state: State) -> State:
"""AI安全审核"""
print("[AI审核] 开始...")
content = state["content"]

# 简化的规则判断
if len(content) > 100:
state["ai_check"] = "warning"
state["reason"] = "内容过长,建议人工复审"
elif "测试" in content:
state["ai_check"] = "warning"
state["reason"] = "疑似测试内容"
else:
state["ai_check"] = "pass"

print(f"[AI审核] ✅ 结果: {state['ai_check']}")
return state


# 节点3:汇总审核结果
def aggregate_checks(state: State) -> State:
"""汇总审核结果,做出最终决策"""
print("\n[汇总] 分析审核结果...")

keyword = state.get("keyword_check", "pass")
ai = state.get("ai_check", "pass")

# 决策逻辑
if keyword == "fail" or ai == "fail":
decision = "rejected"
reason = "审核不通过"
elif keyword == "warning" or ai == "warning":
decision = "review"
reason = "需要人工复审"
else:
decision = "approved"
reason = "审核通过"

state["final_decision"] = decision
if "reason" not in state or state["reason"] == "":
state["reason"] = reason

print(f"[汇总] ✅ 最终决策: {decision}")
return state


# 节点4:通过处理
def handle_approved(state: State) -> State:
"""处理审核通过的内容"""
print(f"\n✅ 内容已发布")
return state


# 节点5:人工复审
def handle_review(state: State) -> State:
"""转人工复审"""
print(f"\n⚠️ 已转人工复审")
print(f" 原因: {state['reason']}")
return state


# 节点6:拒绝处理
def handle_rejected(state: State) -> State:
"""处理被拒绝的内容"""
print(f"\n❌ 内容已拒绝")
print(f" 原因: {state['reason']}")
return state


# 路由函数:根据决策结果路由
def route_decision(state: State) -> str:
"""根据最终决策路由"""
return state["final_decision"]


# 构建图
def create_graph() -> StateGraph:
"""创建内容审核图"""
graph = StateGraph(State)

# 添加节点
graph.add_node("keyword", keyword_check)
graph.add_node("ai", ai_safety_check)
graph.add_node("aggregate", aggregate_checks)
graph.add_node("approved", handle_approved)
graph.add_node("review", handle_review)
graph.add_node("rejected", handle_rejected)

# 并行执行敏感词检测和AI审核
graph.set_entry_point("keyword")
graph.set_entry_point("ai")

# 都完成后汇总
graph.add_edge("keyword", "aggregate")
graph.add_edge("ai", "aggregate")

# 汇总后根据决策路由
graph.add_conditional_edges(
"aggregate",
route_decision,
{
"approved": "approved",
"review": "review",
"rejected": "rejected"
}
)

# 所有结果都结束
graph.add_edge("approved", END)
graph.add_edge("review", END)
graph.add_edge("rejected", END)

return graph


def main() -> None:
"""测试不同内容的审核"""
app = create_graph().compile()

test_cases = [
"这是一段正常的内容",
"这是一段包含敏感内容的文字",
"这是测试内容,用于测试系统功能",
"这段内容包含违禁词和不当言论,应该被拒绝",
]

print("="*60)
print("内容审核系统")
print("="*60)

for i, content in enumerate(test_cases, 1):
print(f"\n{'='*60}")
print(f"测试 {i}: {content}")
print(f"{'='*60}")

result = app.invoke({"content": content})


if __name__ == "__main__":
main()

6. 流程控制最佳实践

6.1 原则1:保持流程简单

过度复杂:

1
2
3
4
5
A → B → C → D
↓ ↑ ↓
E ← F → G
↓ ↑
H ← I ← J

清晰简洁:

1
2
3
A → B → C → END

D → END

6.2 原则2:明确的退出条件

1
2
3
4
5
6
7
def should_continue(state: State) -> str:
# ✅ 明确的退出条件
if state["count"] >= MAX_ITERATIONS:
return END
if state["success"]:
return END
return "retry"

6.3 原则3:限制循环次数

1
2
3
4
5
# ✅ 总是设置最大循环次数
def check_loop(state: State) -> str:
if state["loop_count"] >= 10: # 防止无限循环
return "exit"
return "continue"

6.4 原则4:并行节点独立

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# ❌ 错误:并行节点间有依赖
def node_a(state: State) -> State:
state["shared_var"] = "A"
return state

def node_b(state: State) -> State:
# 依赖node_a的结果,但它们是并行的!
print(state["shared_var"]) # 可能出错
return state

# ✅ 正确:并行节点互不依赖
def node_a(state: State) -> State:
state["result_a"] = "A"
return state

def node_b(state: State) -> State:
state["result_b"] = "B" # 不依赖node_a
return state

6.5 原则5:路由函数要健壮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# ❌ 可能返回未定义的值
def bad_router(state: State) -> str:
if state.get("type") == "A":
return "node_a"
elif state.get("type") == "B":
return "node_b"
# 如果都不满足呢?会报错!

# ✅ 总是有默认分支
def good_router(state: State) -> str:
type_map = {
"A": "node_a",
"B": "node_b"
}
return type_map.get(state.get("type"), "default_node")

7. 小结

核心要点

三种边类型:

  • 普通边:固定流转
  • 条件边:动态路由
  • 入口/出口:开始和结束

条件边: 根据状态动态决定下一步

1
graph.add_conditional_edges(source, router_func, mapping)

循环: 通过条件边让流程回到之前的节点

  • 必须有退出条件
  • 限制最大循环次数

并行: 多个节点同时执行,提高效率

  • 并行节点应该互相独立
  • LangGraph会等待所有节点完成

最佳实践:

  1. 保持流程简单
  2. 明确的退出条件
  3. 限制循环次数
  4. 并行节点独立
  5. 路由函数健壮

📚 下一步

现在你已经掌握了复杂的流程控制!但AI应用往往需要调用外部工具和API,比如查询数据库、调用第三方服务等。

下一课: Lesson 05: 工具调用和外部集成

我们将学习:

  • 什么是工具调用(Function Calling)
  • 如何定义和注册工具
  • LLM如何智能选择工具
  • 实战:构建能查天气、计算、搜索的AI助手

📖 参考资料

  • LangGraph条件边文档
  • 流程控制最佳实践