0. 系列闭环(不公开源码也能跟读)

端到端链路:Vue 前端 → api/routes/chat.py → Guide 多轮 SSE → run_analysis_pipeline(解析→分析→匹配→报告)→ tools/pdf_exporter PDF。
本篇:第 9/17 篇 · 接入环 · SSE / 进度

阶段 用户可见 代码入口 对应篇
建会话 欢迎语 POST /api/sessions 09
多轮对话 SSE 流式 chat/stream → run_guide_single_turn 06, 14
信息充分 开始分析 _run_analysis_background 05, 07
履历解析 进度 30% run_resume_parser 12
画像/RIASEC 进度 50% run_profile_analyzer 03, 13
职业匹配 进度 70% run_career_matcher 02
报告 进度 90% run_reporter 11
下载 PDF 文件 GET …/report/pdf 11, 15
说明
读本篇前 第 07 篇 run_analysis_pipeline
读完本篇 跟读 chat/stream 与 _run_analysis_background 时序
下一环 第 10 篇:结果写入 workflow_data(第 10 篇)

全系列闭环索引:SERIES-LOOP.md

1. 要解决什么问题

iCan 前端有两类实时需求:

  1. Guide 阶段:用户发消息后希望逐字看到 AI 回复(打字机效果);
  2. 分析阶段:信息充足后后台跑 run_analysis_pipeline(parse → analyze → match → report),前端需要知道「进行到哪一步」。

如果把 LLM 流式和分析进度混在同一条 HTTP 长连接里,协议和超时策略会纠缠。当前实现是 SSE 负责 LLM 输出,内存字典 + 轮询负责分析进度,WebSocket 基础设施已搭好但进度推送尚未接入主路径


2. 实现位置

模块 职责
api/routes/chat.py 对话 CRUD、SSE 流式、run_guide_chat、后台分析任务
workflow.py run_guide_chat(单轮引导)、run_analysis_pipeline(四段分析)
api/ws_manager.py ConnectionManager:connect / send_progress / send_completed / send_error
api/routes/ws.py WebSocket 端点 /ws/{session_id}

路由前缀:chat.py 的 router 为 prefix="/api/sessions"


SSE 流式与 WebSocket 进度


3. 非流式对话:run_guide_chat

普通对话走两个 POST 端点,内部都调 workflow.run_guide_chatagents/guide.run_guide_single_turn(单轮 LLM,不走 Guide 内层 5 节点子图,见第 6 篇):

端点 场景
POST /api/sessions 创建会话 + 首条消息
POST /api/sessions/{session_id}/chat 后续多轮

is_info_sufficient=True 时,chat.py 调用 _set_status(session_id, "processing", "parse")asyncio.create_task(_run_analysis_background(...)),HTTP 响应立即返回 stage="processing",分析在后台跑。

超时:asyncio.wait_for(..., timeout=90) 包住 run_guide_chat,超时返回固定提示而不抛 500。


4. SSE 流式:POST /api/sessions/{session_id}/chat/stream

这是 Guide 阶段打字机效果的唯一 SSE 入口。返回 StreamingResponse(..., media_type="text/event-stream")

4.1 Guide 模式(报告尚未生成)

流程:生产者-消费者 + asyncio.Queue(不用线程 queue.Queue)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# api/routes/chat.py — event_generator 核心(节选)
async def _stream_worker():
async for chunk in model.astream(processed):
if chunk.content:
await chunk_queue.put(chunk.content)

task = asyncio.create_task(_stream_worker())

while True:
chunk = await asyncio.wait_for(chunk_queue.get(), timeout=120)
if chunk is None:
break
yield f"data: {json.dumps({'type': 'chunk', 'content': chunk})}\n\n"

# 流结束后 cancel worker
if not task.done():
task.cancel()

消息体:GUIDE_SYSTEM_PROMPT + 已有 conversation_history + 当前用户消息;经 _inject_no_think 处理(Ollama Qwen3,见第 8 篇)。

SSE 事件类型:

type 含义
chunk LLM 增量文本,content 字段
done 流结束,stageguideprocessingis_sufficient 布尔
error 超时或异常

流结束后在服务端做 关键词充分性判断(与 run_guide_single_turn 相同规则:found_keywords >= 6>=4 且文本>=50),写库 conversation_history / collected_info。若充分,_set_status(..., "parse")create_task(_run_analysis_background)done 事件里 stage=processing

4.2 QA 模式(final_report 已存在)

workflow_data.final_report 非空,chat/stream 切换为报告答疑:system prompt 注入 personal_profilecareer_matches JSON 摘要,同样 astream + Queue,但 donestage=qa,并写入 qa_history

4.3 并发保护

_workflow_status[session_id].status == "processing" 时,chat/stream 返回 409「正在处理中」;已有报告时 Guide 流式不可用,走 QA 分支。


5. 分析进度:内存 + 定时器 + 轮询(非 WebSocket 主路径)

进度状态在 chat.py 模块级字典 _workflow_status 维护,并通过 _save_status_to_db 异步写入 workflow_data.status

1
2
3
4
5
6
7
8
9
10
11
12
13
STAGE_PROGRESS = {"guide": 0.1, "parse": 0.3, "analyze": 0.5,
"match": 0.7, "report": 0.9, "completed": 1.0}

def _set_status(session_id, status, stage, reply=None):
data = {
"status": status,
"stage": stage,
"progress": STAGE_PROGRESS.get(stage, 0.0),
"message": STAGE_LABELS.get(stage, stage),
"reply": reply,
}
_workflow_status[session_id] = data
# ensure_future → _save_status_to_db

后台任务 _run_analysis_background 调用 run_analysis_pipeline,同时启动 _update_progress_periodically

1
2
3
4
5
6
async def _update_progress_periodically(session_id: str):
for stage in ["parse", "analyze", "match", "report"]:
await asyncio.sleep(20) # 每 20 秒切一个 stage
if _workflow_status.get(session_id, {}).get("status") in ("completed", "failed"):
return
_set_status(session_id, "processing", stage)

前端轮询 **GET /api/sessions/{session_id}/status**:优先读内存 _workflow_status,miss 时从 DB workflow_data.status 恢复。

api/routes/report_gen.py 独立上传简历生成报告时,复用同一套 _set_status / _update_progress_periodically(从 chat.py import)。


6. WebSocket:ws_manager 现状

6.1 连接端点

1
2
3
4
5
6
7
8
9
10
# api/routes/ws.py
@router.websocket("/ws/{session_id}")
async def websocket_endpoint(ws: WebSocket, session_id: str):
await ws.accept()
await ws_manager.connect(session_id, ws)
try:
while True:
await ws.receive_text() # 保持连接,客户端可发心跳
except WebSocketDisconnect:
ws_manager.disconnect(session_id, ws)

api/ws_manager.pyConnectionManagersession_id 维护连接列表,提供:

1
2
3
4
5
6
7
8
9
10
async def send_progress(self, session_id, stage, progress, message):
data = json.dumps({"type": "progress", "stage": stage,
"progress": progress, "message": message})
# 遍历 active[session_id],失败只 warning

async def send_completed(self, session_id, reply=""):
data = json.dumps({"type": "completed", "reply": reply})

async def send_error(self, session_id, error: str):
data = json.dumps({"type": "error", "error": error})

6.2 实际调用点

grep 全仓库:**send_progress / send_completed 未被 chat.py_run_analysis_background 调用**。
唯一生产路径调用是 workflow.run_analysis_pipeline 顶层 except 里的 await ws_manager.send_error(session_id, str(e))

结论:WebSocket 管理器已就绪,当前前端进度应依赖 GET .../status 轮询;若要用 WS 推进度,需在 _set_status_run_analysis_background 各 stage 处补 await ws_manager.send_progress(...),完成时补 send_completed


7. 数据流总览

1
2
3
4
5
6
7
8
9
10
11
12
13
用户消息
├─ POST /chat ────────── run_guide_chat ──► 一次性 JSON 回复
└─ POST /chat/stream ─── model.astream ──► SSE chunk/done

└─ is_sufficient ──► create_task(_run_analysis_background)

├─ run_analysis_pipeline (workflow.py)
├─ _update_progress_periodically (定时改 stage)
└─ _set_status → 内存 + DB

前端 GET /status ─────────────────────────────┘

WebSocket /ws/{id} ── 已连接,进度 push 未接线(仅 pipeline 异常 send_error)

分析完成后 _run_analysis_background 合并 final_reportpersonal_profile 等到 workflow_data_set_status(..., "completed", "completed", reply="报告生成完成!")


8. 踩坑

① WS 进度方法与业务未连接
send_progress / send_completedws_manager.py 已实现,但 _run_analysis_background 只用 _set_status。文档或前端若假设「连 WS 就能收 progress 事件」,会对不上源码。

② 进度是定时模拟,不是 Agent 真实回调
_update_progress_periodically 每 20 秒换一个 stage 名称,与 run_resume_parser 等是否完成无关。长耗时步骤可能 stage 已显示 report 但 Reporter 仍在跑。

③ SSE 与 REST 各有一套充分性逻辑
chat/stream 在流结束后本地算关键词;POST /chatrun_guide_single_turn。两者规则一致,但与内层 check_sufficiency(LLM 判 sufficient)不同——勿与第 6 篇子图逻辑混为一谈。

④ 生产者必须用 asyncio.create_task
_stream_worker 若直接 await 会阻塞 event_generator 无法 yield;Queue 必须用 asyncio.Queue。消费者退出时要 task.cancel(),否则 worker 可能泄漏。

connect 是 async,disconnect 是 sync
ws_manager.connect 为 async 函数(虽内部无异步 IO);disconnect 为普通 def。扩展时注意接口一致性。


9. 小结

  • SSE 端点POST /api/sessions/{session_id}/chat/stream;事件 chunk / done / error;报告生成后同一端点切 QA 模式。
  • 非流式POST /api/sessionsPOST .../chat,经 workflow.run_guide_chat
  • 分析进度_workflow_status + _set_status + 定时 _update_progress_periodically;前端 GET /api/sessions/{session_id}/status 轮询。
  • WebSocket/ws/{session_id} 可连;ws_manager 进度/完成方法 尚未接入 主流程,仅 pipeline 异常 send_error
  • 扩展实时能力时,优先在 _set_status 旁补 WS 推送,或改为 Agent 节点内显式回调,避免继续依赖纯定时 stage。

下一篇:SQLAlchemy 自动迁移(db/session.py)。


附录:关键源码(逐行注释)

以下代码摘自 iCan 实现,每行上方均有中文注释,不公开仓库也可跟读。
生成命令:python3 bin/build-ican-annotated-snippets.py

SSE 端点入口 chat_stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# ========== SSE 端点入口 chat_stream ==========
# 源文件: api/routes/chat.py 行 257-275

# L257: 装饰器
@router.post("/{session_id}/chat/stream")
# L258: 异步函数 chat_stream:可被 await,适合 IO 型 LLM/DB 调用
async def chat_stream(session_id: str, request: ChatRequest, user_id: str = Depends(get_current_user)):
# L259: 开始 try 块,后续 except 负责兜底
try:
# L260: 赋值:更新局部变量或 state 字段
session_data = repository.get_session(session_id)
# L261: 条件分支
if not session_data:
# L262: 向上抛出异常,由调用方或 LangGraph 处理
raise HTTPException(status_code=404, detail=f"会话不存在: {session_id}")

# L264: 条件分支
if user_id and session_data.get("user_id") != user_id:
# L265: 向上抛出异常,由调用方或 LangGraph 处理
raise HTTPException(status_code=403, detail="无权访问此会话")

# L267: 赋值:更新局部变量或 state 字段
current_status = _workflow_status.get(session_id, {}).get("status")
# L268: 条件分支
if current_status == "processing":
# L269: 向上抛出异常,由调用方或 LangGraph 处理
raise HTTPException(status_code=409, detail="正在处理中,请稍后")

# L271: JSON 字段:存对话历史、中间结果、final_report 等
workflow_data = session_data.get("workflow_data") or {}
# L272: JSON 字段:存对话历史、中间结果、final_report 等
final_report = workflow_data.get("final_report", "")
# L273: JSON 字段:存对话历史、中间结果、final_report 等
personal_profile = workflow_data.get("personal_profile") or workflow_data.get("structured_profile") or {}
# L274: JSON 字段:存对话历史、中间结果、final_report 等
career_matches = workflow_data.get("career_matches") or []

_run_analysis_background

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# ========== _run_analysis_background ==========
# 源文件: api/routes/chat.py 行 495-561

# L495: 异步函数 _run_analysis_background:可被 await,适合 IO 型 LLM/DB 调用
async def _run_analysis_background(session_id: str, conversation_history: list, collected_info: dict, user_id: str):
# L496: 开始 try 块,后续 except 负责兜底
try:
# L497: 执行该语句(细节见上文业务描述)
_set_status(session_id, "processing", "parse")

# L499: 赋值:更新局部变量或 state 字段
progress_task = asyncio.create_task(_update_progress_periodically(session_id))

# L501: 导入依赖模块
from ican.workflow import run_analysis_pipeline
# L502: HTTP 主分析链:parse→analyze→match→report,跳过顶层 guide 环
workflow_result = await run_analysis_pipeline(
# L503: 多轮对话列表,元素为 {role, content}
conversation_history=conversation_history,
# L504: 赋值:更新局部变量或 state 字段
collected_info=collected_info,
# L505: 赋值:更新局部变量或 state 字段
session_id=session_id,
# L506: 赋值:更新局部变量或 state 字段
user_id=user_id,
# L507: 执行该语句(细节见上文业务描述)
)

# L509: 执行该语句(细节见上文业务描述)
progress_task.cancel()
# L510: 开始 try 块,后续 except 负责兜底
try:
# L511: 执行该语句(细节见上文业务描述)
await progress_task
# L512: 捕获异常,避免整图/整请求崩溃
except asyncio.CancelledError:
# L513: 执行该语句(细节见上文业务描述)
pass

# L515: 赋值:更新局部变量或 state 字段
final_report = workflow_result.get("final_report", "")

# L517: 开始 try 块,后续 except 负责兜底
try:
# L518: 持久化会话/报告到 SQLite(经 repository 层)
repository.save_report(
# L519: 赋值:更新局部变量或 state 字段
session_id=session_id,
# L520: 赋值:更新局部变量或 state 字段
report_type="comprehensive",
# L521: 赋值:更新局部变量或 state 字段
report_content=final_report,
# L522: 赋值:更新局部变量或 state 字段
user_id=user_id,
# L523: 执行该语句(细节见上文业务描述)
)
# L524: 捕获异常,避免整图/整请求崩溃
except Exception as db_err:
# L525: 记录日志,便于线上排查节点入参/出参
logger.warning("[_run_analysis_background] 保存报告到数据库异常: %s", db_err)

# L527: 赋值:更新局部变量或 state 字段
session_data = repository.get_session(session_id) or {}
# L528: JSON 字段:存对话历史、中间结果、final_report 等
old_workflow = session_data.get("workflow_data", {}) if isinstance(session_data, dict) else {}
# L529: 赋值:更新局部变量或 state 字段
merged_workflow = dict(old_workflow)
# L530: 执行该语句(细节见上文业务描述)
merged_workflow.update({
# L531: 执行该语句(细节见上文业务描述)
"final_report": final_report,
# L532: 执行该语句(细节见上文业务描述)
"structured_profile": workflow_result.get("structured_profile", {}),
# L533: 执行该语句(细节见上文业务描述)
"personal_profile": workflow_result.get("personal_profile", {}),
# L534: 执行该语句(细节见上文业务描述)
"career_matches": workflow_result.get("career_matches", []),
# L535: 执行该语句(细节见上文业务描述)
})

# L537: 持久化会话/报告到 SQLite(经 repository 层)
repository.save_session(
# L538: 赋值:更新局部变量或 state 字段
session_id=session_id,
# L539: 赋值:更新局部变量或 state 字段
user_id=user_id,
# L540: 赋值:更新局部变量或 state 字段
status="completed",
# L541: 赋值:更新局部变量或 state 字段
current_stage="completed",
# L542: JSON 字段:存对话历史、中间结果、final_report 等
workflow_data=merged_workflow,
# L543: 执行该语句(细节见上文业务描述)
)

# L545: 条件分支
if final_report:
# L546: 执行该语句(细节见上文业务描述)
repository.save_career_report(
# L547: 赋值:更新局部变量或 state 字段
session_id=session_id,
# L548: 赋值:更新局部变量或 state 字段
report_data={"report_type": "full", "full_report": {"content": final_report}},
# L549: 执行该语句(细节见上文业务描述)
)

# L551: 赋值:更新局部变量或 state 字段
personal_profile = workflow_result.get("personal_profile", {})
# L552: 条件分支
if personal_profile:
# L553: 赋值:更新局部变量或 state 字段
repository.save_user_profile(session_id=session_id, profile_data=personal_profile)

# L555: 赋值:更新局部变量或 state 字段
_set_status(session_id, "completed", "completed", reply="报告生成完成!")
# L556: 记录日志,便于线上排查节点入参/出参
logger.info("[_run_analysis_background] 分析完成: session_id=%s, 报告长度=%d", session_id, len(final_report))

# L558: 捕获异常,避免整图/整请求崩溃
except Exception as e:
# L559: 记录日志,便于线上排查节点入参/出参
logger.error("[_run_analysis_background] 分析异常: session_id=%s, error=%s", session_id, e, exc_info=True)
# L560: 赋值:更新局部变量或 state 字段
_set_status(session_id, "failed", "failed", reply=f"报告生成失败: {str(e)}")
# L561: 执行该语句(细节见上文业务描述)
repository.update_session_status(session_id, "failed", "failed")

run_guide_chat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# ========== run_guide_chat ==========
# 源文件: workflow.py 行 558-600

# L558: 异步函数 run_guide_chat:可被 await,适合 IO 型 LLM/DB 调用
async def run_guide_chat(conversation_history: list, user_message: str) -> dict:
# L560: 【文档】运行单轮对话引导(不触发分析流程)
# L562: 【文档】功能描述:
# L563: 【文档】调用 Guide Agent 的单轮对话模式,快速返回 AI 回复。
# L564: 【文档】根据对话历史和用户新消息,判断信息是否充分。
# L566: 【文档】入参:
# L567: 【文档】conversation_history (list): 已有对话历史
# L568: 【文档】user_message (str): 用户最新消息
# L570: 【文档】出参:
# L571: 【文档】dict: 包含 reply、is_info_sufficient、conversation_history(更新后)
# (L559-572 为函数/模块文档字符串,已转为注释便于阅读)
# L573: 开始 try 块,后续 except 负责兜底
try:
# L574: 多轮对话列表,元素为 {role, content}
logger.info("[run_guide_chat] 开始单轮对话,历史长度=%d, 消息长度=%d", len(conversation_history), len(user_message))

# L576: 导入依赖模块
from ican.agents.guide import run_guide_single_turn
# L577: API 单轮 Guide:不跑内层 5 节点子图,一次 LLM 回复
result = await run_guide_single_turn(conversation_history, user_message)

# L579: 多轮对话列表,元素为 {role, content}
updated_history = list(conversation_history)
# L580: 执行该语句(细节见上文业务描述)
updated_history.append({"role": "user", "content": user_message})
# L581: 条件分支
if result.get("reply"):
# L582: 执行该语句(细节见上文业务描述)
updated_history.append({"role": "assistant", "content": result["reply"]})

# L584: 返回本节点要合并进 state 的字段(LangGraph 会 merge)
return {
# L585: 执行该语句(细节见上文业务描述)
"reply": result.get("reply", ""),
# L586: Guide 判定用户信息是否足够进入分析阶段
"is_info_sufficient": result.get("is_info_sufficient", False),
# L587: 执行该语句(细节见上文业务描述)
"collected_info": result.get("collected_info", {}),
# L588: 多轮对话列表,元素为 {role, content}
"conversation_history": updated_history,
# L589: 执行该语句(细节见上文业务描述)
}

# L591: 捕获异常,避免整图/整请求崩溃
except Exception as e:
# L592: 记录日志,便于线上排查节点入参/出参
logger.error("[run_guide_chat] 单轮对话异常: %s", e, exc_info=True)
# L593: 返回本节点要合并进 state 的字段(LangGraph 会 merge)
return {
# L594: 执行该语句(细节见上文业务描述)
"reply": "抱歉,处理出了点问题,能再说一次吗?",
# L595: Guide 判定用户信息是否足够进入分析阶段
"is_info_sufficient": False,
# L596: 执行该语句(细节见上文业务描述)
"collected_info": {},
# L597: 多轮对话列表,元素为 {role, content}
"conversation_history": conversation_history + [
# L598: 执行该语句(细节见上文业务描述)
{"role": "user", "content": user_message},
# L599: 执行该语句(细节见上文业务描述)
],
# L600: 执行该语句(细节见上文业务描述)
}

系列导航

主题
1 系统全景
2 五 Agent 协作
3 霍兰德 RIASEC
4–7 状态 · 路由 · 嵌套 · 容错
8–11 LLM 层 · SSE/WS · DB 迁移 · PDF
12–14 JSON Prompt · RIASEC Prompt · Guide Prompt
15–17 Docker · 中间件 · 配置

← 返回 iCan 专题