0. 系列闭环(不公开源码也能跟读)
端到端链路:Vue 前端 → api/routes/chat.py → Guide 多轮 SSE → run_analysis_pipeline(解析→分析→匹配→报告)→ tools/pdf_exporter PDF。
本篇:第 6/17 篇 · Guide 环 · 内外双层图
| 阶段 |
用户可见 |
代码入口 |
对应篇 |
| 建会话 |
欢迎语 |
POST /api/sessions |
09 |
| 多轮对话 |
SSE 流式 |
chat/stream → run_guide_single_turn |
06, 14 |
| 信息充分 |
开始分析 |
_run_analysis_background |
05, 07 |
| 履历解析 |
进度 30% |
run_resume_parser |
12 |
| 画像/RIASEC |
进度 50% |
run_profile_analyzer |
03, 13 |
| 职业匹配 |
进度 70% |
run_career_matcher |
02 |
| 报告 |
进度 90% |
run_reporter |
11 |
| 下载 PDF |
文件 |
GET …/report/pdf |
11, 15 |
|
说明 |
| 读本篇前 |
第 05 篇外层 guide_node |
| 读完本篇 |
画出内层 5 节点子图,并说明 API 走 run_guide_single_turn |
| 下一环 |
第 14 篇:各阶段 Prompt(第 7 篇) |
全系列闭环索引:SERIES-LOOP.md
1. 要解决什么问题
iCan 顶层工作流有 5 个 Agent 节点(Guide → ResumeParser → ProfileAnalyzer → CareerMatcher → Reporter)。如果把 Guide 内部的 5 个对话阶段(欢迎、需求评估、基础采集、深度挖掘、充分性检查)也平铺进同一个 StateGraph,会出现:
- 状态字段膨胀:Guide 的
collected_info、current_stage 与顶层的 structured_profile、final_report 混在同一 TypedDict;
- 改 Guide 牵连全局:调整
check_sufficiency 的路由逻辑,可能误触顶层 route_after_guide;
- 测试成本高:验证「信息不足时回到 dig_deeper」必须跑完四段分析才能隔离。
实际做法是 外层 5 节点 + 内层 Guide 子图:外层 workflow.py 的 guide_node 只做状态映射,内层 agents/guide.py 的 create_guide_graph() 封装 5 个函数节点与条件边。
2. 实现位置:两层状态 + 两层图
| 层级 |
文件 |
状态类型 |
入口 |
| 外层 |
workflow.py |
iCanWorkflowState |
create_workflow() → guide_node |
| 内层 |
agents/guide.py |
GuideState |
create_guide_graph() → run_guide_agent() |
core/state.py 里两套 TypedDict 职责分离:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| class GuideState(TypedDict, total=False): conversation_history: list[dict[str, str]] collected_info: dict[str, Any] is_info_sufficient: bool messages: Annotated[list[str], operator.add] current_stage: str
class iCanWorkflowState(TypedDict, total=False): session_id: str conversation_history: list[dict[str, str]] needs_more_info: bool structured_profile: dict[str, Any] final_report: str
|
外层只关心 needs_more_info、conversation_history、raw_input;内层才持有 current_stage、missing_fields、emotion_state。

3. 外层 guide_node:门面,不是 Guide 类
workflow.py 里没有 GuideAgent 类,只有 async 函数 guide_node。它负责 提取 → 调用 → 写回:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| async def guide_node(state: iCanWorkflowState) -> dict: conversation_history = list(state.get("conversation_history", [])) raw_input = state.get("raw_input", "") if raw_input: conversation_history.append({"role": "user", "content": raw_input})
guide_state: GuideState = create_initial_guide_state() guide_state["conversation_history"] = conversation_history
guide_result = await run_guide_agent(guide_state)
guide_messages = guide_result.get("messages", []) latest_reply = guide_messages[-1] if guide_messages else "" updated_history = list(conversation_history) if latest_reply: updated_history.append({"role": "assistant", "content": latest_reply})
is_sufficient = guide_result.get("is_info_sufficient", False) result = { "conversation_history": updated_history, "current_agent": "guide", "needs_more_info": not is_sufficient, } collected_info = guide_result.get("collected_info", {}) if collected_info.get("collected_raw"): result["raw_input"] = collected_info["collected_raw"] return result
|
外层不知道内层有 welcome、dig_deeper 等节点名,只读 is_info_sufficient 和 messages[-1]。
外层循环由 route_after_guide 控制:
1 2 3 4 5 6 7 8 9
| def route_after_guide(state: iCanWorkflowState) -> str: if not state.get("needs_more_info", True): return "resume_parser_node" user_msg_count = len([m for m in state.get("conversation_history", []) if m.get("role") == "user"]) if user_msg_count >= 3: return "resume_parser_node" return "guide_node"
|
4. 内层 create_guide_graph():五节点 + 条件循环
内层图在 agents/guide.py 构建,节点全是 async 函数,不是类方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| def create_guide_graph() -> StateGraph: graph = StateGraph(GuideState) graph.add_node("welcome", welcome) graph.add_node("assess_need", assess_need) graph.add_node("collect_basic_info", collect_basic_info) graph.add_node("dig_deeper", dig_deeper) graph.add_node("check_sufficiency", check_sufficiency)
graph.set_entry_point("welcome") graph.add_edge("welcome", "assess_need") graph.add_edge("assess_need", "collect_basic_info") graph.add_edge("collect_basic_info", "dig_deeper") graph.add_edge("dig_deeper", "check_sufficiency") graph.add_conditional_edges( "check_sufficiency", should_continue, {"dig_deeper": "dig_deeper", "handoff": END}, ) return graph.compile()
|
run_guide_agent 每次调用都会 create_guide_graph() 再 ainvoke,并设 recursion_limit=15:
1 2 3 4
| async def run_guide_agent(state: GuideState) -> dict: graph = create_guide_graph() result = await graph.ainvoke(state, config={"recursion_limit": 15}) return result
|
内层循环由 should_continue 决定:is_info_sufficient=True → handoff(END);否则回到 dig_deeper。另有 loop_count >= 8 时强制 handoff(用 messages 列表长度估算循环次数)。
各内层节点均通过 get_chat_model() + invoke_llm() 调 LLM(见第 8 篇),异常时返回固定话术而不重试模型。
5. 与 API 路径的差异:子图并非所有入口都走
这是理解嵌套架构的关键:HTTP 对话 API 默认不走内层 5 节点图。
| 入口 |
调用链 |
是否走 create_guide_graph |
顶层 run_workflow() |
guide_node → run_guide_agent |
是 |
POST /api/sessions/.../chat |
run_guide_chat → run_guide_single_turn |
否(单轮 LLM) |
POST .../chat/stream |
直接 model.astream + 关键词充分性判断 |
否 |
workflow.py 的 run_guide_chat 明确走单轮模式:
1 2 3 4
| async def run_guide_chat(conversation_history: list, user_message: str) -> dict: from ican.agents.guide import run_guide_single_turn result = await run_guide_single_turn(conversation_history, user_message)
|
因此:嵌套子图服务于批处理式顶层 workflow;线上逐轮聊天用的是 run_guide_single_turn 或 SSE 流式,逻辑与内层 check_sufficiency(LLM 判 sufficient/insufficient)并不相同。
6. 在流水线中的位置
完整顶层边(create_workflow):
1 2 3 4
| guide_node → route_after_guide ├─ needs_more_info → guide_node(循环) └─ 充分 / 强制退出 → resume_parser_node → profile_analyzer_node → career_matcher_node → reporter_node → END
|
内层一次 ainvoke 会顺序跑完 welcome → … → check_sufficiency,必要时在 dig_deeper ↔ check_sufficiency 间循环。外层每次 guide_node 被调度时,都会 create_initial_guide_state() 并从 welcome 重新开始——这在「无用户实时输入、一次性跑完 workflow」场景下会重复生成欢迎语,属于设计取舍而非 LangGraph 框架限制。
其余四个 Agent(resume_parser、profile_analyzer 等)同样是 外层节点函数 + 内层 run_* 子图/流水线,模式与 Guide 一致,但内层节点数不同;顶层文件只有 workflow.py 一处 create_workflow()。
7. 踩坑
① 注释写「最多循环 2 次」,代码不是 2
should_continue 注释写「最多循环2次」,实际用 loop_count >= 8;外层 route_after_guide 用 user_msg_count >= 3 强制进入分析。写文档或改需求时以 grep 为准,不要抄 docstring。
② 每次 run_guide_agent 重新 compile 图
create_guide_graph() 在每次 run_guide_agent 调用时执行 graph.compile(),没有模块级缓存。Guide 调用频繁时可以考虑缓存 compiled graph,当前 MVP 未做。
③ 外层 guide_node 每次从 welcome 起跑
create_initial_guide_state() 固定 current_stage="greeting",内层入口始终是 welcome。若外层 route_after_guide 多次回到 guide_node,会重复走欢迎节点——批跑 workflow 时要注意;线上 API 因走 run_guide_single_turn 不受此影响。
④ messages reducer 与 conversation_history 双轨
内层 AI 回复进 GuideState.messages(Annotated add);外层持久化用 conversation_history(role/content 字典列表)。guide_node 只把 messages[-1] 映射进 history,中间节点产生的多条 message 不会全部进入外层。
8. 小结
- 嵌套结构:**外层
iCanWorkflowState + guide_node,内层 GuideState + create_guide_graph()**,实现为函数节点而非 Agent 类。
- 外层门面只做字段映射;内层 5 节点 +
should_continue 负责对话阶段与 dig_deeper 循环。
- API 聊天走
run_guide_single_turn,不经过内层子图;子图主要用于 run_workflow / guide_node 路径。
- 双层各有一套退出条件(内层
loop_count/LLM 充分性,外层 needs_more_info/用户轮数),调试时要分清是哪一层在循环。
- 改 Guide 行为先确认改的是子图节点还是单轮/API 流式路径。
下一篇:LangGraph 错误处理与容错(workflow.py 各节点 except、run_analysis_pipeline 降级)。
附录:关键源码(逐行注释)
以下代码摘自 iCan 实现,每行上方均有中文注释,不公开仓库也可跟读。
生成命令:python3 bin/build-ican-annotated-snippets.py
create_guide_graph
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
|
def create_guide_graph() -> StateGraph:
try:
logger.info("[create_guide_graph] 开始创建对话引导 Agent 的 StateGraph")
graph = StateGraph(GuideState)
graph.add_node("welcome", welcome)
graph.add_node("assess_need", assess_need)
graph.add_node("collect_basic_info", collect_basic_info)
graph.add_node("dig_deeper", dig_deeper)
graph.add_node("check_sufficiency", check_sufficiency)
graph.set_entry_point("welcome")
graph.add_edge("welcome", "assess_need")
graph.add_edge("assess_need", "collect_basic_info")
graph.add_edge("collect_basic_info", "dig_deeper")
graph.add_edge("dig_deeper", "check_sufficiency")
graph.add_conditional_edges(
"check_sufficiency",
should_continue,
{
"dig_deeper": "dig_deeper",
"handoff": END,
},
)
compiled_graph = graph.compile()
logger.info("[create_guide_graph] StateGraph 创建并编译完成")
return compiled_graph
except Exception as e:
logger.error("[create_guide_graph] 创建 StateGraph 异常: %s", e, exc_info=True)
raise
|
外层 guide_node 门面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
|
async def guide_node(state: iCanWorkflowState) -> dict:
try:
logger.info(
"[guide_node] 开始执行,入参: session_id=%s, conversation_history长度=%d, raw_input长度=%d",
state.get("session_id"),
len(state.get("conversation_history", [])),
len(state.get("raw_input", "")),
)
conversation_history = list(state.get("conversation_history", []))
raw_input = state.get("raw_input", "")
if raw_input:
conversation_history.append({"role": "user", "content": raw_input})
guide_state: GuideState = create_initial_guide_state()
guide_state["conversation_history"] = conversation_history
guide_result = await run_guide_agent(guide_state)
guide_messages = guide_result.get("messages", [])
latest_reply = guide_messages[-1] if guide_messages else ""
updated_history = list(conversation_history)
if latest_reply:
updated_history.append({"role": "assistant", "content": latest_reply})
is_sufficient = guide_result.get("is_info_sufficient", False)
collected_info = guide_result.get("collected_info", {})
result = {
"conversation_history": updated_history,
"current_agent": "guide",
"needs_more_info": not is_sufficient,
}
if collected_info:
raw_collected = collected_info.get("collected_raw", "")
if raw_collected:
result["raw_input"] = raw_collected
logger.info(
"[guide_node] 执行完成,出参: is_sufficient=%s, needs_more_info=%s, conversation_history长度=%d",
is_sufficient,
not is_sufficient,
len(updated_history),
)
return result
|
run_guide_single_turn(API 实际路径)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
async def run_guide_single_turn(conversation_history: list, user_message: str) -> dict:
try:
logger.info("[run_guide_single_turn] 开始执行,用户消息长度=%d, 历史长度=%d", len(user_message), len(conversation_history))
messages = [
{"role": "system", "content": GUIDE_SYSTEM_PROMPT},
]
for msg in conversation_history:
messages.append(msg)
messages.append({"role": "user", "content": user_message})
model = get_chat_model()
reply = await invoke_llm(model, messages)
all_user_text = user_message
for msg in conversation_history:
if msg.get("role") == "user":
all_user_text += " " + msg.get("content", "")
optional_keywords = ["年", "行业", "岗位", "职位", "技能", "经验", "公司", "专业", "学历", "方向", "期望", "困惑",
"工作", "开发", "工程师", "经理", "运营", "产品", "设计", "数据", "架构", "管理",
"经历", "项目", "负责", "参与", "大学", "本科", "硕士", "博士"]
found_keywords = [kw for kw in optional_keywords if kw in all_user_text]
is_sufficient = (
(len(found_keywords) >= 6) or
(len(found_keywords) >= 4 and len(all_user_text) >= 50)
)
collected_info = {"collected_raw": all_user_text}
logger.info("[run_guide_single_turn] 执行完成,is_sufficient=%s, found_keywords=%s", is_sufficient, found_keywords)
return {
"reply": reply or "",
"is_info_sufficient": is_sufficient,
"collected_info": collected_info,
}
except Exception as e:
logger.error("[run_guide_single_turn] 单轮对话异常: %s", e, exc_info=True)
return {
|
系列导航
← 返回 iCan 专题