0. 系列闭环(不公开源码也能跟读)
端到端链路:Vue 前端 → api/routes/chat.py → Guide 多轮 SSE → run_analysis_pipeline(解析→分析→匹配→报告)→ tools/pdf_exporter PDF。
本篇:第 10/17 篇 · 持久环 · DB 与迁移
| 阶段 |
用户可见 |
代码入口 |
对应篇 |
| 建会话 |
欢迎语 |
POST /api/sessions |
09 |
| 多轮对话 |
SSE 流式 |
chat/stream → run_guide_single_turn |
06, 14 |
| 信息充分 |
开始分析 |
_run_analysis_background |
05, 07 |
| 履历解析 |
进度 30% |
run_resume_parser |
12 |
| 画像/RIASEC |
进度 50% |
run_profile_analyzer |
03, 13 |
| 职业匹配 |
进度 70% |
run_career_matcher |
02 |
| 报告 |
进度 90% |
run_reporter |
11 |
| 下载 PDF |
文件 |
GET …/report/pdf |
11, 15 |
|
说明 |
| 读本篇前 |
第 09 篇会话保存 |
| 读完本篇 |
解释 init_db + _auto_migrate 启动顺序 |
| 下一环 |
第 11 篇:从 final_report 到 PDF(第 11 篇) |
全系列闭环索引:SERIES-LOOP.md
1. 要解决什么问题
iCan 用 SQLite(默认 sqlite:///./ican.db)存会话与报告。迭代中给 Session 表加了 workflow_data(JSON,存对话历史、中间分析结果)。
若只用 create_all():新表能建,已有表的缺列不会自动补上,本地升级后常见「列不存在」报错。
MVP 阶段不想上 Alembic 全套配置,因此在 db/session.py 里加了 _auto_migrate:**启动时对比 ORM 与真实表结构,缺列则 ALTER TABLE ADD COLUMN**。
2. 调用链:lifespan → init_db → _auto_migrate
入口在 main.py 的 FastAPI lifespan:
1 2 3 4 5
| @asynccontextmanager async def lifespan(app: FastAPI): init_db() yield dispose_engine()
|
init_db()(db/session.py)顺序固定:
Base.metadata.create_all(bind=engine, checkfirst=True) — 创建不存在的表;
_auto_migrate(engine) — 对已存在的表补列。
每次容器/进程启动都会跑一遍,无需手工 migration 命令。

3. _auto_migrate 实现(与源码一致)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| def _auto_migrate(engine): from ican.db.models import Base from sqlalchemy import inspect, text
inspector = inspect(engine) for table_name, table_obj in Base.metadata.tables.items(): if not inspector.has_table(table_name): continue existing_columns = {col["name"] for col in inspector.get_columns(table_name)} for column in table_obj.columns: if column.name not in existing_columns: col_type = column.type.compile(engine.dialect) nullable = "NULL" if column.nullable else "NOT NULL" default = "" if column.default is not None: default = f" DEFAULT {column.default.arg}" elif column.server_default is not None: default = f" DEFAULT {column.server_default.arg}" sql = f"ALTER TABLE {table_name} ADD COLUMN {column.name} {col_type} {nullable}{default}" with engine.connect() as conn: conn.execute(text(sql)) conn.commit()
|
逻辑:只增列,不删列、不改类型、不重命名。
4. 典型场景:sessions.workflow_data
db/models.py 中 Session 定义(节选):
1 2 3 4 5 6 7 8 9
| class Session(UUIDPrimaryKeyMixin, TimestampMixin, Base): __tablename__ = "sessions" user_id: Mapped[str] = mapped_column(String(36), nullable=False, default="anonymous") status: Mapped[str] = mapped_column(String(20), nullable=False, default="active") current_stage: Mapped[str] = mapped_column(String(50), nullable=False, default="guide") workflow_data: Mapped[dict | None] = mapped_column( JSON, nullable=True, default=None, comment="工作流完整数据: conversation_history、collected_info 等", )
|
老库若只有前三列,启动日志会出现类似:
1
| [_auto_migrate] 自动迁移: ALTER TABLE sessions ADD COLUMN workflow_data JSON NULL
|
workflow.run_analysis_pipeline 在 Ollama 不可用时会把快速报告写入 SessionRepository.save_session(..., workflow_data={...}),依赖此列存在。
同文件还有 messages、user_profiles、career_reports 等表,新增字段同样走这套逻辑。
5. 引擎与会话:和迁移的关系
_get_engine() 懒加载单例,读 settings.DB_URL:
- SQLite:
check_same_thread=False;
- PostgreSQL 等:
pool_size=5, pool_recycle=3600。
迁移在 engine 创建后、业务请求前 执行一次;与 get_db_session() 上下文管理器(commit/rollback)无关。
6. 能力边界(必须写清)
| 操作 |
_auto_migrate |
Alembic |
| 新表 |
✅ create_all |
✅ |
| 加列 |
✅ |
✅ |
| 删列 |
❌ |
✅ |
| 改列类型 |
❌ |
✅ |
| 回滚 |
❌ |
✅ |
| 多环境版本链 |
❌ |
✅ |
新增 NOT NULL 且无 DEFAULT 的列:对已有数据的表,ALTER ADD 可能失败或留空行违反约束——iCan 当前新增列多为 nullable=True 或带 default,是有意规避。
SQLite 与 JSON:SQLAlchemy JSON 在 SQLite 上存文本;跨 dialect 换 PostgreSQL 时要重新验证类型编译结果。
7. 踩坑
① create_all 不能代替迁移
只跑 create_all 而不跑 _auto_migrate,老用户数据库会一直缺列。确保 lifespan 调的是 init_db() 而不是单独 create_all。
② 生产多实例同时启动
多个 worker 同时 ALTER TABLE 同一列,第二个可能报「duplicate column」——当前代码未 catch;生产应用 Alembic 或启动前单实例迁移。
③ 改类型等于手工活
例如把 String(20) 改成 String(50),_auto_migrate 不会处理,需要手写 SQL 或 Alembic revision。
④ workflow_data 体积
整段对话 + 分析结果塞进 JSON,会话长了单行膨胀;这是产品层问题,不是迁移层,但 DBA 视角要知晓。
8. 小结
- 迁移入口:
main.lifespan → init_db() → create_all + _auto_migrate。
- 实现文件:
db/session.py;模型定义:db/models.py。
- 适合 MVP 快速加列;删改列、回滚、团队协作 应切换 Alembic。
- 与业务强相关列:
sessions.workflow_data 支撑分析流水线持久化与 Ollama 降级报告。
下一篇:tools/pdf_exporter.py 如何把 Reporter 的 Markdown 变成中文 PDF。
附录:关键源码(逐行注释)
以下代码摘自 iCan 实现,每行上方均有中文注释,不公开仓库也可跟读。
生成命令:python3 bin/build-ican-annotated-snippets.py
lifespan 调 init_db
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
@asynccontextmanager
async def lifespan(app: FastAPI):
try:
logger.info("[lifespan] 应用启动,开始初始化资源")
init_db()
logger.info("[lifespan] 数据库初始化完成,应用已就绪")
|
init_db
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
def init_db() -> None:
try:
logger.info("[init_db] 开始执行,入参: 无")
from ican.db.models import Base
engine = _get_engine()
logger.info("[init_db] 开始创建数据库表...")
Base.metadata.create_all(bind=engine, checkfirst=True)
_auto_migrate(engine)
logger.info("[init_db] 执行完成,数据库表初始化成功")
except Exception as e:
logger.error(f"[init_db] 初始化数据库异常: {e}")
raise
|
_auto_migrate
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
|
def _auto_migrate(engine):
from ican.db.models import Base
from sqlalchemy import inspect, text
inspector = inspect(engine)
for table_name, table_obj in Base.metadata.tables.items():
if not inspector.has_table(table_name):
continue
existing_columns = {col["name"] for col in inspector.get_columns(table_name)}
for column in table_obj.columns:
if column.name not in existing_columns:
col_type = column.type.compile(engine.dialect)
nullable = "NULL" if column.nullable else "NOT NULL"
default = ""
if column.default is not None:
default = f" DEFAULT {column.default.arg}"
elif column.server_default is not None:
default = f" DEFAULT {column.server_default.arg}"
sql = f"ALTER TABLE {table_name} ADD COLUMN {column.name} {col_type} {nullable}{default}"
with engine.connect() as conn:
conn.execute(text(sql))
conn.commit()
logger.info(f"[_auto_migrate] 自动迁移: {sql}")
|
Session.workflow_data
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
|
class Session(UUIDPrimaryKeyMixin, TimestampMixin, Base):
__tablename__ = "sessions"
user_id: Mapped[str] = mapped_column(
String(36),
nullable=False,
default="anonymous",
comment="用户标识",
)
status: Mapped[str] = mapped_column(
String(20),
nullable=False,
default="active",
comment="会话状态: active, completed, abandoned",
)
current_stage: Mapped[str] = mapped_column(
String(50),
nullable=False,
default="guide",
comment="当前阶段: guide, parse, analyze, match, plan, report",
)
workflow_data: Mapped[dict | None] = mapped_column(
JSON,
nullable=True,
default=None,
comment="工作流完整数据 (JSON): 包含 conversation_history、collected_info 等全部中间结果",
)
|
系列导航
← 返回 iCan 专题