0. 系列闭环(不公开源码也能跟读)

端到端链路:Vue 前端 → api/routes/chat.py → Guide 多轮 SSE → run_analysis_pipeline(解析→分析→匹配→报告)→ tools/pdf_exporter PDF。
本篇:第 4/17 篇 · 数据环 · 状态 TypedDict

阶段 用户可见 代码入口 对应篇
建会话 欢迎语 POST /api/sessions 09
多轮对话 SSE 流式 chat/stream → run_guide_single_turn 06, 14
信息充分 开始分析 _run_analysis_background 05, 07
履历解析 进度 30% run_resume_parser 12
画像/RIASEC 进度 50% run_profile_analyzer 03, 13
职业匹配 进度 70% run_career_matcher 02
报告 进度 90% run_reporter 11
下载 PDF 文件 GET …/report/pdf 11, 15
说明
读本篇前 第 02 篇各 node 返回值
读完本篇 区分外层 iCanWorkflowState 与内层 GuideState
下一环 第 05 篇:用 needs_more_info 做路由(第 5 篇)

全系列闭环索引:SERIES-LOOP.md

一、LangGraph 的状态传递机制

LangGraph 的核心理念是状态驱动。每个节点接收 core/state.py 里定义的 TypedDict,处理完后返回部分字段更新,LangGraph 自动合并到全局状态。

1
2
3
4
5
节点 A 接收 state → 处理 → 返回 {"field_a": "value_a"}

LangGraph 自动合并到全局 state

节点 B 接收更新后的 state → 处理 → 返回 {"field_b": "value_b"}

这个机制的关键问题是:如何合并?

外层与内层 State 分层

二、TypedDict 定义 Agent 状态

实现位置:core/state.py。LangGraph 使用 Python 的 TypedDict 定义状态结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# core/state.py — iCanWorkflowState 节选
from typing import Annotated, Any
from typing_extensions import TypedDict
import operator

class iCanWorkflowState(TypedDict, total=False):
session_id: str
user_id: str
conversation_history: list[dict[str, str]]
raw_input: str
structured_profile: dict[str, Any]
personal_profile: dict[str, Any]
career_matches: list[dict[str, Any]]
final_report: str
current_agent: str
needs_more_info: bool
workflow_messages: Annotated[list[str], operator.add]

total=False 表示所有字段都是可选的(节点可以只返回需要更新的字段)。

三、Annotated[list, operator.add] — Reducer 详解

默认行为:覆盖

如果没有 Reducer,LangGraph 的默认行为是新值覆盖旧值

1
2
3
4
5
6
7
8
# 节点 A 返回
{"current_agent": "guide"}

# 节点 B 返回
{"current_agent": "resume_parser"}

# 最终 state
{"current_agent": "resume_parser"} # ← B 的值覆盖了 A 的值

这对于 current_agentneeds_more_info 这类”只有一个最新值”的字段是正确的。

Reducer 行为:累积

1
2
3
4
5
# core/state.py — 内层 GuideState
messages: Annotated[list[str], operator.add]

# core/state.py — 外层 iCanWorkflowState
workflow_messages: Annotated[list[str], operator.add]

Annotated[list[str], operator.add] 告诉 LangGraph:这个字段用 operator.add(列表 +)合并。

1
2
3
4
5
6
7
8
# 节点 A 返回
{"messages": ["你好!"]}

# 节点 B 返回
{"messages": ["能说说你的困惑吗?"]}

# 最终 state
{"messages": ["你好!", "能说说你的困惑吗?"]} # ← 累积合并

为什么 Guide 内层 messages 必须用 Reducer

agents/guide.py 的多轮子图里,welcome / assess_need / collect_basic_info / dig_deeper 每个节点都会 return {"messages": [reply]}。如果不用 Reducer:

1
2
3
4
5
welcome 输出:   messages = ["你好!我是小C"]
assess_need 输出: messages = ["能具体说说你的困惑吗?"]

不用 Reducer → 最终只有 ["能具体说说你的困惑吗?"] ← 第一条丢了!
用 Reducer → 最终是 ["你好!我是小C", "能具体说说你的困惑吗?"] ✅ 都在

四、如何选择覆盖 vs 累积

选择原则

字段特征 使用方式 例子
只有一个最新值 直接赋值(覆盖) current_agent, needs_more_info
需要历史记录 Annotated + operator.add GuideState.messages, workflow_messages
逐步填充的字典 直接赋值(覆盖整个 dict) structured_profile, personal_profile
累积的列表 Annotated + operator.add messages(内层), workflow_messages(外层)

常见错误

错误 1:conversation_history 用了 Reducer 但它是 dict 列表

1
2
3
4
5
# ❌ 错误写法:conversation_history 不需要 Reducer
conversation_history: Annotated[list[dict], operator.add]

# ✅ 正确写法:由节点手动管理整个列表
conversation_history: list[dict[str, str]]

原因:conversation_history 包含 user 和 assistant 的消息,需要手动控制追加顺序(先 user 后 assistant),不能让 LangGraph 自动合并。

错误 2:dict 类型用了 Reducer

1
2
3
4
5
# ❌ 错误写法:dict 不能用 operator.add
collected_info: Annotated[dict, operator.add]

# ✅ 正确写法:直接覆盖
collected_info: dict[str, Any]

五、分层状态设计

iCan 项目在 core/state.py 采用外层 + 内层分层:顶层 iCanWorkflowState 与各 Agent 的 GuideState / ProfileAnalysisState / CareerMatchState / ReporterState 等。

注意PlannerState 也在 core/state.py 中定义,但 workflow.py 尚未接入 Planner 节点,勿在状态流转图里画第六段 Agent。

外层状态(core/state.pyiCanWorkflowState

1
2
3
4
5
6
7
8
class iCanWorkflowState(TypedDict, total=False):
session_id: str
conversation_history: list[dict] # 完整对话历史(节点手动 append,无 Reducer)
structured_profile: dict # resume_parser_node 输出
personal_profile: dict # profile_analyzer_node 输出
career_matches: list[dict] # career_matcher_node 输出
needs_more_info: bool # route_after_guide 路由标志
workflow_messages: Annotated[list[str], operator.add]

内层状态(core/state.pyGuideState

1
2
3
4
5
6
7
class GuideState(TypedDict, total=False):
conversation_history: list[dict]
collected_info: dict
messages: Annotated[list, operator.add] # AI 回复累积
current_stage: str
is_info_sufficient: bool
emotion_state: str # 仅内层,不泄漏到外层

为什么分层

  1. 职责隔离:Guide 的内部字段(如 emotion_statemissing_fields)定义在 GuideState,不会进入 iCanWorkflowState
  2. 可独立测试run_guide_agent(guide_state) / run_profile_analyzer(analyzer_state) 可脱离顶层图单测
  3. 数据转换workflow.py*_node 手动做内外层映射

数据转换示例(对照 workflow.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# workflow.py — guide_node
async def guide_node(state: iCanWorkflowState) -> dict:
conversation_history = list(state.get("conversation_history", []))
if state.get("raw_input"):
conversation_history.append({"role": "user", "content": state["raw_input"]})

guide_state = create_initial_guide_state() # core/state.py
guide_state["conversation_history"] = conversation_history
guide_result = await run_guide_agent(guide_state) # agents/guide.py

updated_history = list(conversation_history)
if guide_result.get("messages"):
updated_history.append({"role": "assistant", "content": guide_result["messages"][-1]})

return {
"conversation_history": updated_history,
"needs_more_info": not guide_result.get("is_info_sufficient", False),
"current_agent": "guide",
}

profile_analyzer_node 同理:从外层取 structured_profile,构造 ProfileAnalysisState,调 run_profile_analyzer(),再把结果组装进 personal_profile

六、其他 Reducer 的用法

除了 operator.add,还可以用其他 Reducer:

1
2
3
4
5
6
7
8
9
10
11
from typing import Annotated
import operator

# 集合合并(去重)
tags: Annotated[set[str], operator.or_]

# 总是取最新值(等价于默认行为)
latest_value: Annotated[str, lambda old, new: new]

# 自定义合并逻辑(保留最大值)
max_score: Annotated[float, lambda old, new: max(old or 0, new)]

七、踩坑记录

  1. **外层误用 messages**:只有 GuideState.messages 带 Reducer;顶层是 workflow_messages。审计脚本与日志排查都应以 core/state.py 为准。
  2. conversation_history 不要加 Reducer:user/assistant 顺序由 workflow.pyguide_node 手动 append,自动合并会破坏对话结构。
  3. **ProfileAnalysisState.analysis_messages**:内层分析过程消息可累积,但外层只取结构化字段(ability_modelriasec_scores 等),不要整包 return guide_result
  4. **初始 needs_more_info**:create_initial_workflow_state() 默认为 False,首次进入 guide_node 后才会被设为 True/False;写测试时注意初始值。

八、状态污染的防范

问题场景

如果 Guide Agent 的内部状态(如 emotion_state)意外出现在外层状态中,下游节点可能会错误地读取它。

防范措施

  1. 类型校验:TypedDict 严格定义每个状态的字段,未定义的字段不会出现
  2. 手动映射:节点函数只返回需要更新的字段,不透传无关字段
  3. 独立状态类型:每个 Agent 有自己的 TypedDict,编译时检查字段

九、小结

LangGraph 状态管理在 iCan 中的落地要点:

  • 所有 TypedDict 集中在 core/state.py
  • 默认覆盖,适用于 current_agentneeds_more_info 等单值字段
  • Annotated + Reducer 用于 GuideState.messagesiCanWorkflowState.workflow_messages
  • 分层 + 手动映射workflow.py*_node 是内外层的唯一转换层
  • PlannerState 已定义未接入,扩展时勿与现有五段流水线混淆

下一篇:workflow.py 的条件路由 — route_after_guide 与 Guide 内层 should_continue 如何配合。


← 返回 iCan 专题