0. 系列闭环(不公开源码也能跟读)

端到端链路:Vue 前端 → api/routes/chat.py → Guide 多轮 SSE → run_analysis_pipeline(解析→分析→匹配→报告)→ tools/pdf_exporter PDF。
本篇:第 10/17 篇 · 持久环 · DB 与迁移

阶段 用户可见 代码入口 对应篇
建会话 欢迎语 POST /api/sessions 09
多轮对话 SSE 流式 chat/stream → run_guide_single_turn 06, 14
信息充分 开始分析 _run_analysis_background 05, 07
履历解析 进度 30% run_resume_parser 12
画像/RIASEC 进度 50% run_profile_analyzer 03, 13
职业匹配 进度 70% run_career_matcher 02
报告 进度 90% run_reporter 11
下载 PDF 文件 GET …/report/pdf 11, 15
说明
读本篇前 第 09 篇会话保存
读完本篇 解释 init_db + _auto_migrate 启动顺序
下一环 第 11 篇:从 final_report 到 PDF(第 11 篇)

全系列闭环索引:SERIES-LOOP.md

1. 要解决什么问题

iCan 用 SQLite(默认 sqlite:///./ican.db)存会话与报告。迭代中给 Session 表加了 workflow_data(JSON,存对话历史、中间分析结果)。
若只用 create_all()新表能建,已有表的缺列不会自动补上,本地升级后常见「列不存在」报错。

MVP 阶段不想上 Alembic 全套配置,因此在 db/session.py 里加了 _auto_migrate:**启动时对比 ORM 与真实表结构,缺列则 ALTER TABLE ADD COLUMN**。


2. 调用链:lifespan → init_db → _auto_migrate

入口在 main.py 的 FastAPI lifespan:

1
2
3
4
5
@asynccontextmanager
async def lifespan(app: FastAPI):
init_db() # create_all + _auto_migrate
yield
dispose_engine()

init_db()db/session.py)顺序固定:

  1. Base.metadata.create_all(bind=engine, checkfirst=True) — 创建不存在的表;
  2. _auto_migrate(engine) — 对已存在的表补列。

每次容器/进程启动都会跑一遍,无需手工 migration 命令。


SQLAlchemy 自动迁移


3. _auto_migrate 实现(与源码一致)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def _auto_migrate(engine):
from ican.db.models import Base
from sqlalchemy import inspect, text

inspector = inspect(engine)
for table_name, table_obj in Base.metadata.tables.items():
if not inspector.has_table(table_name):
continue
existing_columns = {col["name"] for col in inspector.get_columns(table_name)}
for column in table_obj.columns:
if column.name not in existing_columns:
col_type = column.type.compile(engine.dialect)
nullable = "NULL" if column.nullable else "NOT NULL"
default = ""
if column.default is not None:
default = f" DEFAULT {column.default.arg}"
elif column.server_default is not None:
default = f" DEFAULT {column.server_default.arg}"
sql = f"ALTER TABLE {table_name} ADD COLUMN {column.name} {col_type} {nullable}{default}"
with engine.connect() as conn:
conn.execute(text(sql))
conn.commit()

逻辑:只增列,不删列、不改类型、不重命名


4. 典型场景:sessions.workflow_data

db/models.pySession 定义(节选):

1
2
3
4
5
6
7
8
9
class Session(UUIDPrimaryKeyMixin, TimestampMixin, Base):
__tablename__ = "sessions"
user_id: Mapped[str] = mapped_column(String(36), nullable=False, default="anonymous")
status: Mapped[str] = mapped_column(String(20), nullable=False, default="active")
current_stage: Mapped[str] = mapped_column(String(50), nullable=False, default="guide")
workflow_data: Mapped[dict | None] = mapped_column(
JSON, nullable=True, default=None,
comment="工作流完整数据: conversation_history、collected_info 等",
)

老库若只有前三列,启动日志会出现类似:

1
[_auto_migrate] 自动迁移: ALTER TABLE sessions ADD COLUMN workflow_data JSON NULL

workflow.run_analysis_pipeline 在 Ollama 不可用时会把快速报告写入 SessionRepository.save_session(..., workflow_data={...})依赖此列存在

同文件还有 messagesuser_profilescareer_reports 等表,新增字段同样走这套逻辑。


5. 引擎与会话:和迁移的关系

_get_engine() 懒加载单例,读 settings.DB_URL

  • SQLite:check_same_thread=False
  • PostgreSQL 等:pool_size=5, pool_recycle=3600

迁移在 engine 创建后、业务请求前 执行一次;与 get_db_session() 上下文管理器(commit/rollback)无关。


6. 能力边界(必须写清)

操作 _auto_migrate Alembic
新表 ✅ create_all
加列
删列
改列类型
回滚
多环境版本链

新增 NOT NULL 且无 DEFAULT 的列:对已有数据的表,ALTER ADD 可能失败或留空行违反约束——iCan 当前新增列多为 nullable=True 或带 default,是有意规避。

SQLite 与 JSON:SQLAlchemy JSON 在 SQLite 上存文本;跨 dialect 换 PostgreSQL 时要重新验证类型编译结果。


7. 踩坑

① create_all 不能代替迁移
只跑 create_all 而不跑 _auto_migrate,老用户数据库会一直缺列。确保 lifespan 调的是 init_db() 而不是单独 create_all

② 生产多实例同时启动
多个 worker 同时 ALTER TABLE 同一列,第二个可能报「duplicate column」——当前代码未 catch;生产应用 Alembic 或启动前单实例迁移。

③ 改类型等于手工活
例如把 String(20) 改成 String(50)_auto_migrate 不会处理,需要手写 SQL 或 Alembic revision。

④ workflow_data 体积
整段对话 + 分析结果塞进 JSON,会话长了单行膨胀;这是产品层问题,不是迁移层,但 DBA 视角要知晓。


8. 小结

  • 迁移入口:main.lifespaninit_db()create_all + _auto_migrate
  • 实现文件:db/session.py;模型定义:db/models.py
  • 适合 MVP 快速加列;删改列、回滚、团队协作 应切换 Alembic。
  • 与业务强相关列:sessions.workflow_data 支撑分析流水线持久化与 Ollama 降级报告。

下一篇:tools/pdf_exporter.py 如何把 Reporter 的 Markdown 变成中文 PDF。


附录:关键源码(逐行注释)

以下代码摘自 iCan 实现,每行上方均有中文注释,不公开仓库也可跟读。
生成命令:python3 bin/build-ican-annotated-snippets.py

lifespan 调 init_db

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# ========== lifespan 调 init_db ==========
# 源文件: main.py 行 24-49

# L24: 装饰器
@asynccontextmanager
# L25: 异步函数 lifespan:可被 await,适合 IO 型 LLM/DB 调用
async def lifespan(app: FastAPI):
# L27: 【文档】应用生命周期管理:启动时初始化数据库,关闭时清理资源
# L29: 【文档】功能描述:
# L30: 【文档】在 FastAPI 应用启动时执行数据库初始化(创建所有数据表),
# L31: 【文档】在应用关闭时释放数据库引擎资源。使用 asynccontextmanager
# L32: 【文档】确保资源的正确获取和释放。
# L34: 【文档】入参:
# L35: 【文档】app (FastAPI): FastAPI 应用实例
# L37: 【文档】出参:
# L38: 【文档】无(上下文管理器,yield 控制权给应用运行期间)
# L40: 【文档】说明:
# L41: 【文档】- 启动阶段:调用 init_db() 初始化数据库表结构
# L42: 【文档】- 运行阶段:应用正常处理请求
# L43: 【文档】- 关闭阶段:调用 dispose_engine() 释放数据库连接池
# (L26-44 为函数/模块文档字符串,已转为注释便于阅读)
# L45: 开始 try 块,后续 except 负责兜底
try:
# L46: 记录日志,便于线上排查节点入参/出参
logger.info("[lifespan] 应用启动,开始初始化资源")
# L47: 启动阶段:初始化数据库
# L48: 建表 + 自动迁移缺失列
init_db()
# L49: 记录日志,便于线上排查节点入参/出参
logger.info("[lifespan] 数据库初始化完成,应用已就绪")

init_db

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# ========== init_db ==========
# 源文件: db/session.py 行 197-228

# L197: 同步函数 init_db:路由决策或工厂方法
def init_db() -> None:
# L199: 【文档】初始化数据库,创建所有数据表。
# L201: 【文档】功能描述:
# L202: 【文档】根据 ORM 模型定义,在数据库中创建所有不存在的数据表。
# L203: 【文档】使用 checkfirst=True 避免重复创建已存在的表。
# L205: 【文档】入参说明:
# L206: 【文档】无
# L208: 【文档】出参说明:
# L209: 【文档】None
# (L198-210 为函数/模块文档字符串,已转为注释便于阅读)
# L211: 开始 try 块,后续 except 负责兜底
try:
# L212: 记录日志,便于线上排查节点入参/出参
logger.info("[init_db] 开始执行,入参: 无")

# L214: 导入所有模型以确保它们被注册到 Base.metadata
# L215: 导入依赖模块
from ican.db.models import Base # noqa: F401

# L217: 赋值:更新局部变量或 state 字段
engine = _get_engine()

# L219: 记录日志,便于线上排查节点入参/出参
logger.info("[init_db] 开始创建数据库表...")
# L220: 赋值:更新局部变量或 state 字段
Base.metadata.create_all(bind=engine, checkfirst=True)

# L222: 对比 ORM 与数据库,ALTER TABLE 补缺失列
_auto_migrate(engine)

# L224: 记录日志,便于线上排查节点入参/出参
logger.info("[init_db] 执行完成,数据库表初始化成功")

# L226: 捕获异常,避免整图/整请求崩溃
except Exception as e:
# L227: 记录日志,便于线上排查节点入参/出参
logger.error(f"[init_db] 初始化数据库异常: {e}")
# L228: 向上抛出异常,由调用方或 LangGraph 处理
raise

_auto_migrate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# ========== _auto_migrate ==========
# 源文件: db/session.py 行 158-194

# L158: 同步函数 _auto_migrate:路由决策或工厂方法
def _auto_migrate(engine):
# L160: 【文档】自动检查并补充数据库表中缺失的列。
# L162: 【文档】功能描述:
# L163: 【文档】对比 ORM 模型定义与实际数据库表结构,
# L164: 【文档】自动 ALTER TABLE 添加缺失的列。
# L165: 【文档】仅处理新增列,不处理列类型变更或列删除。
# L167: 【文档】入参说明:
# L168: 【文档】engine: SQLAlchemy 数据库引擎
# L170: 【文档】出参说明:
# L171: 【文档】None
# (L159-172 为函数/模块文档字符串,已转为注释便于阅读)
# L173: 导入依赖模块
from ican.db.models import Base
# L174: 导入依赖模块
from sqlalchemy import inspect, text

# L176: 赋值:更新局部变量或 state 字段
inspector = inspect(engine)
# L177: 循环
for table_name, table_obj in Base.metadata.tables.items():
# L178: 条件分支
if not inspector.has_table(table_name):
# L179: 执行该语句(细节见上文业务描述)
continue
# L180: 赋值:更新局部变量或 state 字段
existing_columns = {col["name"] for col in inspector.get_columns(table_name)}
# L181: 循环
for column in table_obj.columns:
# L182: 条件分支
if column.name not in existing_columns:
# L183: 赋值:更新局部变量或 state 字段
col_type = column.type.compile(engine.dialect)
# L184: 赋值:更新局部变量或 state 字段
nullable = "NULL" if column.nullable else "NOT NULL"
# L185: 赋值:更新局部变量或 state 字段
default = ""
# L186: 条件分支
if column.default is not None:
# L187: 赋值:更新局部变量或 state 字段
default = f" DEFAULT {column.default.arg}"
# L188: 条件分支
elif column.server_default is not None:
# L189: 赋值:更新局部变量或 state 字段
default = f" DEFAULT {column.server_default.arg}"
# L190: 赋值:更新局部变量或 state 字段
sql = f"ALTER TABLE {table_name} ADD COLUMN {column.name} {col_type} {nullable}{default}"
# L191: 执行该语句(细节见上文业务描述)
with engine.connect() as conn:
# L192: 执行该语句(细节见上文业务描述)
conn.execute(text(sql))
# L193: 执行该语句(细节见上文业务描述)
conn.commit()
# L194: 对比 ORM 与数据库,ALTER TABLE 补缺失列
logger.info(f"[_auto_migrate] 自动迁移: {sql}")

Session.workflow_data

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# ========== Session.workflow_data ==========
# 源文件: db/models.py 行 170-215

# L170: 定义类(配置或 ORM 模型)
class Session(UUIDPrimaryKeyMixin, TimestampMixin, Base):
# L172: 【文档】会话表模型。
# L174: 【文档】功能描述:
# L175: 【文档】记录用户的每次咨询会话信息,包括会话状态、当前阶段等。
# L176: 【文档】一个会话对应用户的一次完整职业规划咨询流程。
# L178: 【文档】入参说明:
# L179: 【文档】user_id (str): 用户标识
# L180: 【文档】status (str): 会话状态
# L181: 【文档】current_stage (str): 当前阶段
# L183: 【文档】出参说明:
# L184: 【文档】Session ORM 实例
# (L171-185 为函数/模块文档字符串,已转为注释便于阅读)

# L187: 赋值:更新局部变量或 state 字段
__tablename__ = "sessions"

# L189: 赋值:更新局部变量或 state 字段
user_id: Mapped[str] = mapped_column(
# L190: 执行该语句(细节见上文业务描述)
String(36),
# L191: 赋值:更新局部变量或 state 字段
nullable=False,
# L192: 赋值:更新局部变量或 state 字段
default="anonymous",
# L193: 赋值:更新局部变量或 state 字段
comment="用户标识",
# L194: 执行该语句(细节见上文业务描述)
)

# L196: 赋值:更新局部变量或 state 字段
status: Mapped[str] = mapped_column(
# L197: 执行该语句(细节见上文业务描述)
String(20),
# L198: 赋值:更新局部变量或 state 字段
nullable=False,
# L199: 赋值:更新局部变量或 state 字段
default="active",
# L200: 赋值:更新局部变量或 state 字段
comment="会话状态: active, completed, abandoned",
# L201: 执行该语句(细节见上文业务描述)
)

# L203: 赋值:更新局部变量或 state 字段
current_stage: Mapped[str] = mapped_column(
# L204: 执行该语句(细节见上文业务描述)
String(50),
# L205: 赋值:更新局部变量或 state 字段
nullable=False,
# L206: 赋值:更新局部变量或 state 字段
default="guide",
# L207: 赋值:更新局部变量或 state 字段
comment="当前阶段: guide, parse, analyze, match, plan, report",
# L208: 执行该语句(细节见上文业务描述)
)

# L210: JSON 字段:存对话历史、中间结果、final_report 等
workflow_data: Mapped[dict | None] = mapped_column(
# L211: 执行该语句(细节见上文业务描述)
JSON,
# L212: 赋值:更新局部变量或 state 字段
nullable=True,
# L213: 赋值:更新局部变量或 state 字段
default=None,
# L214: 多轮对话列表,元素为 {role, content}
comment="工作流完整数据 (JSON): 包含 conversation_history、collected_info 等全部中间结果",
# L215: 执行该语句(细节见上文业务描述)
)

系列导航

主题
1 系统全景
2 五 Agent 协作
3 霍兰德 RIASEC
4–7 状态 · 路由 · 嵌套 · 容错
8–11 LLM 层 · SSE/WS · DB 迁移 · PDF
12–14 JSON Prompt · RIASEC Prompt · Guide Prompt
15–17 Docker · 中间件 · 配置

← 返回 iCan 专题