0. 系列闭环(不公开源码也能跟读)
端到端链路:Vue 前端 → api/routes/chat.py → Guide 多轮 SSE → run_analysis_pipeline(解析→分析→匹配→报告)→ tools/pdf_exporter PDF。
本篇:第 9/17 篇 · 接入环 · SSE / 进度
| 阶段 |
用户可见 |
代码入口 |
对应篇 |
| 建会话 |
欢迎语 |
POST /api/sessions |
09 |
| 多轮对话 |
SSE 流式 |
chat/stream → run_guide_single_turn |
06, 14 |
| 信息充分 |
开始分析 |
_run_analysis_background |
05, 07 |
| 履历解析 |
进度 30% |
run_resume_parser |
12 |
| 画像/RIASEC |
进度 50% |
run_profile_analyzer |
03, 13 |
| 职业匹配 |
进度 70% |
run_career_matcher |
02 |
| 报告 |
进度 90% |
run_reporter |
11 |
| 下载 PDF |
文件 |
GET …/report/pdf |
11, 15 |
|
说明 |
| 读本篇前 |
第 07 篇 run_analysis_pipeline |
| 读完本篇 |
跟读 chat/stream 与 _run_analysis_background 时序 |
| 下一环 |
第 10 篇:结果写入 workflow_data(第 10 篇) |
全系列闭环索引:SERIES-LOOP.md
1. 要解决什么问题
iCan 前端有两类实时需求:
- Guide 阶段:用户发消息后希望逐字看到 AI 回复(打字机效果);
- 分析阶段:信息充足后后台跑
run_analysis_pipeline(parse → analyze → match → report),前端需要知道「进行到哪一步」。
如果把 LLM 流式和分析进度混在同一条 HTTP 长连接里,协议和超时策略会纠缠。当前实现是 SSE 负责 LLM 输出,内存字典 + 轮询负责分析进度,WebSocket 基础设施已搭好但进度推送尚未接入主路径。
2. 实现位置
| 模块 |
职责 |
api/routes/chat.py |
对话 CRUD、SSE 流式、run_guide_chat、后台分析任务 |
workflow.py |
run_guide_chat(单轮引导)、run_analysis_pipeline(四段分析) |
api/ws_manager.py |
ConnectionManager:connect / send_progress / send_completed / send_error |
api/routes/ws.py |
WebSocket 端点 /ws/{session_id} |
路由前缀:chat.py 的 router 为 prefix="/api/sessions"。

3. 非流式对话:run_guide_chat
普通对话走两个 POST 端点,内部都调 workflow.run_guide_chat → agents/guide.run_guide_single_turn(单轮 LLM,不走 Guide 内层 5 节点子图,见第 6 篇):
| 端点 |
场景 |
POST /api/sessions |
创建会话 + 首条消息 |
POST /api/sessions/{session_id}/chat |
后续多轮 |
当 is_info_sufficient=True 时,chat.py 调用 _set_status(session_id, "processing", "parse") 并 asyncio.create_task(_run_analysis_background(...)),HTTP 响应立即返回 stage="processing",分析在后台跑。
超时:asyncio.wait_for(..., timeout=90) 包住 run_guide_chat,超时返回固定提示而不抛 500。
4. SSE 流式:POST /api/sessions/{session_id}/chat/stream
这是 Guide 阶段打字机效果的唯一 SSE 入口。返回 StreamingResponse(..., media_type="text/event-stream")。
4.1 Guide 模式(报告尚未生成)
流程:生产者-消费者 + asyncio.Queue(不用线程 queue.Queue)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| async def _stream_worker(): async for chunk in model.astream(processed): if chunk.content: await chunk_queue.put(chunk.content)
task = asyncio.create_task(_stream_worker())
while True: chunk = await asyncio.wait_for(chunk_queue.get(), timeout=120) if chunk is None: break yield f"data: {json.dumps({'type': 'chunk', 'content': chunk})}\n\n"
if not task.done(): task.cancel()
|
消息体:GUIDE_SYSTEM_PROMPT + 已有 conversation_history + 当前用户消息;经 _inject_no_think 处理(Ollama Qwen3,见第 8 篇)。
SSE 事件类型:
| type |
含义 |
chunk |
LLM 增量文本,content 字段 |
done |
流结束,stage 为 guide 或 processing,is_sufficient 布尔 |
error |
超时或异常 |
流结束后在服务端做 关键词充分性判断(与 run_guide_single_turn 相同规则:found_keywords >= 6 或 >=4 且文本>=50),写库 conversation_history / collected_info。若充分,_set_status(..., "parse") 并 create_task(_run_analysis_background),done 事件里 stage=processing。
4.2 QA 模式(final_report 已存在)
若 workflow_data.final_report 非空,chat/stream 切换为报告答疑:system prompt 注入 personal_profile 与 career_matches JSON 摘要,同样 astream + Queue,但 done 的 stage=qa,并写入 qa_history。
4.3 并发保护
_workflow_status[session_id].status == "processing" 时,chat/stream 返回 409「正在处理中」;已有报告时 Guide 流式不可用,走 QA 分支。
5. 分析进度:内存 + 定时器 + 轮询(非 WebSocket 主路径)
进度状态在 chat.py 模块级字典 _workflow_status 维护,并通过 _save_status_to_db 异步写入 workflow_data.status:
1 2 3 4 5 6 7 8 9 10 11 12 13
| STAGE_PROGRESS = {"guide": 0.1, "parse": 0.3, "analyze": 0.5, "match": 0.7, "report": 0.9, "completed": 1.0}
def _set_status(session_id, status, stage, reply=None): data = { "status": status, "stage": stage, "progress": STAGE_PROGRESS.get(stage, 0.0), "message": STAGE_LABELS.get(stage, stage), "reply": reply, } _workflow_status[session_id] = data
|
后台任务 _run_analysis_background 调用 run_analysis_pipeline,同时启动 _update_progress_periodically:
1 2 3 4 5 6
| async def _update_progress_periodically(session_id: str): for stage in ["parse", "analyze", "match", "report"]: await asyncio.sleep(20) if _workflow_status.get(session_id, {}).get("status") in ("completed", "failed"): return _set_status(session_id, "processing", stage)
|
前端轮询 **GET /api/sessions/{session_id}/status**:优先读内存 _workflow_status,miss 时从 DB workflow_data.status 恢复。
api/routes/report_gen.py 独立上传简历生成报告时,复用同一套 _set_status / _update_progress_periodically(从 chat.py import)。
6. WebSocket:ws_manager 现状
6.1 连接端点
1 2 3 4 5 6 7 8 9 10
| @router.websocket("/ws/{session_id}") async def websocket_endpoint(ws: WebSocket, session_id: str): await ws.accept() await ws_manager.connect(session_id, ws) try: while True: await ws.receive_text() except WebSocketDisconnect: ws_manager.disconnect(session_id, ws)
|
api/ws_manager.py 的 ConnectionManager 按 session_id 维护连接列表,提供:
1 2 3 4 5 6 7 8 9 10
| async def send_progress(self, session_id, stage, progress, message): data = json.dumps({"type": "progress", "stage": stage, "progress": progress, "message": message})
async def send_completed(self, session_id, reply=""): data = json.dumps({"type": "completed", "reply": reply})
async def send_error(self, session_id, error: str): data = json.dumps({"type": "error", "error": error})
|
6.2 实际调用点
grep 全仓库:**send_progress / send_completed 未被 chat.py 或 _run_analysis_background 调用**。
唯一生产路径调用是 workflow.run_analysis_pipeline 顶层 except 里的 await ws_manager.send_error(session_id, str(e))。
结论:WebSocket 管理器已就绪,当前前端进度应依赖 GET .../status 轮询;若要用 WS 推进度,需在 _set_status 或 _run_analysis_background 各 stage 处补 await ws_manager.send_progress(...),完成时补 send_completed。
7. 数据流总览
1 2 3 4 5 6 7 8 9 10 11 12 13
| 用户消息 ├─ POST /chat ────────── run_guide_chat ──► 一次性 JSON 回复 └─ POST /chat/stream ─── model.astream ──► SSE chunk/done │ └─ is_sufficient ──► create_task(_run_analysis_background) │ ├─ run_analysis_pipeline (workflow.py) ├─ _update_progress_periodically (定时改 stage) └─ _set_status → 内存 + DB ▲ 前端 GET /status ─────────────────────────────┘
WebSocket /ws/{id} ── 已连接,进度 push 未接线(仅 pipeline 异常 send_error)
|
分析完成后 _run_analysis_background 合并 final_report、personal_profile 等到 workflow_data,_set_status(..., "completed", "completed", reply="报告生成完成!")。
8. 踩坑
① WS 进度方法与业务未连接
send_progress / send_completed 在 ws_manager.py 已实现,但 _run_analysis_background 只用 _set_status。文档或前端若假设「连 WS 就能收 progress 事件」,会对不上源码。
② 进度是定时模拟,不是 Agent 真实回调
_update_progress_periodically 每 20 秒换一个 stage 名称,与 run_resume_parser 等是否完成无关。长耗时步骤可能 stage 已显示 report 但 Reporter 仍在跑。
③ SSE 与 REST 各有一套充分性逻辑
chat/stream 在流结束后本地算关键词;POST /chat 走 run_guide_single_turn。两者规则一致,但与内层 check_sufficiency(LLM 判 sufficient)不同——勿与第 6 篇子图逻辑混为一谈。
④ 生产者必须用 asyncio.create_task
_stream_worker 若直接 await 会阻塞 event_generator 无法 yield;Queue 必须用 asyncio.Queue。消费者退出时要 task.cancel(),否则 worker 可能泄漏。
⑤ connect 是 async,disconnect 是 sync
ws_manager.connect 为 async 函数(虽内部无异步 IO);disconnect 为普通 def。扩展时注意接口一致性。
9. 小结
- SSE 端点:
POST /api/sessions/{session_id}/chat/stream;事件 chunk / done / error;报告生成后同一端点切 QA 模式。
- 非流式:
POST /api/sessions 与 POST .../chat,经 workflow.run_guide_chat。
- 分析进度:
_workflow_status + _set_status + 定时 _update_progress_periodically;前端 GET /api/sessions/{session_id}/status 轮询。
- WebSocket:
/ws/{session_id} 可连;ws_manager 进度/完成方法 尚未接入 主流程,仅 pipeline 异常 send_error。
- 扩展实时能力时,优先在
_set_status 旁补 WS 推送,或改为 Agent 节点内显式回调,避免继续依赖纯定时 stage。
下一篇:SQLAlchemy 自动迁移(db/session.py)。
附录:关键源码(逐行注释)
以下代码摘自 iCan 实现,每行上方均有中文注释,不公开仓库也可跟读。
生成命令:python3 bin/build-ican-annotated-snippets.py
SSE 端点入口 chat_stream
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
@router.post("/{session_id}/chat/stream")
async def chat_stream(session_id: str, request: ChatRequest, user_id: str = Depends(get_current_user)):
try:
session_data = repository.get_session(session_id)
if not session_data:
raise HTTPException(status_code=404, detail=f"会话不存在: {session_id}")
if user_id and session_data.get("user_id") != user_id:
raise HTTPException(status_code=403, detail="无权访问此会话")
current_status = _workflow_status.get(session_id, {}).get("status")
if current_status == "processing":
raise HTTPException(status_code=409, detail="正在处理中,请稍后")
workflow_data = session_data.get("workflow_data") or {}
final_report = workflow_data.get("final_report", "")
personal_profile = workflow_data.get("personal_profile") or workflow_data.get("structured_profile") or {}
career_matches = workflow_data.get("career_matches") or []
|
_run_analysis_background
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
|
async def _run_analysis_background(session_id: str, conversation_history: list, collected_info: dict, user_id: str):
try:
_set_status(session_id, "processing", "parse")
progress_task = asyncio.create_task(_update_progress_periodically(session_id))
from ican.workflow import run_analysis_pipeline
workflow_result = await run_analysis_pipeline(
conversation_history=conversation_history,
collected_info=collected_info,
session_id=session_id,
user_id=user_id,
)
progress_task.cancel()
try:
await progress_task
except asyncio.CancelledError:
pass
final_report = workflow_result.get("final_report", "")
try:
repository.save_report(
session_id=session_id,
report_type="comprehensive",
report_content=final_report,
user_id=user_id,
)
except Exception as db_err:
logger.warning("[_run_analysis_background] 保存报告到数据库异常: %s", db_err)
session_data = repository.get_session(session_id) or {}
old_workflow = session_data.get("workflow_data", {}) if isinstance(session_data, dict) else {}
merged_workflow = dict(old_workflow)
merged_workflow.update({
"final_report": final_report,
"structured_profile": workflow_result.get("structured_profile", {}),
"personal_profile": workflow_result.get("personal_profile", {}),
"career_matches": workflow_result.get("career_matches", []),
})
repository.save_session(
session_id=session_id,
user_id=user_id,
status="completed",
current_stage="completed",
workflow_data=merged_workflow,
)
if final_report:
repository.save_career_report(
session_id=session_id,
report_data={"report_type": "full", "full_report": {"content": final_report}},
)
personal_profile = workflow_result.get("personal_profile", {})
if personal_profile:
repository.save_user_profile(session_id=session_id, profile_data=personal_profile)
_set_status(session_id, "completed", "completed", reply="报告生成完成!")
logger.info("[_run_analysis_background] 分析完成: session_id=%s, 报告长度=%d", session_id, len(final_report))
except Exception as e:
logger.error("[_run_analysis_background] 分析异常: session_id=%s, error=%s", session_id, e, exc_info=True)
_set_status(session_id, "failed", "failed", reply=f"报告生成失败: {str(e)}")
repository.update_session_status(session_id, "failed", "failed")
|
run_guide_chat
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
|
async def run_guide_chat(conversation_history: list, user_message: str) -> dict:
try:
logger.info("[run_guide_chat] 开始单轮对话,历史长度=%d, 消息长度=%d", len(conversation_history), len(user_message))
from ican.agents.guide import run_guide_single_turn
result = await run_guide_single_turn(conversation_history, user_message)
updated_history = list(conversation_history)
updated_history.append({"role": "user", "content": user_message})
if result.get("reply"):
updated_history.append({"role": "assistant", "content": result["reply"]})
return {
"reply": result.get("reply", ""),
"is_info_sufficient": result.get("is_info_sufficient", False),
"collected_info": result.get("collected_info", {}),
"conversation_history": updated_history,
}
except Exception as e:
logger.error("[run_guide_chat] 单轮对话异常: %s", e, exc_info=True)
return {
"reply": "抱歉,处理出了点问题,能再说一次吗?",
"is_info_sufficient": False,
"collected_info": {},
"conversation_history": conversation_history + [
{"role": "user", "content": user_message},
],
}
|
系列导航
← 返回 iCan 专题