0. 系列闭环(不公开源码也能跟读)

端到端链路:Vue 前端 → api/routes/chat.py → Guide 多轮 SSE → run_analysis_pipeline(解析→分析→匹配→报告)→ tools/pdf_exporter PDF。
本篇:第 16/17 篇 · 网关环 · 中间件

阶段 用户可见 代码入口 对应篇
建会话 欢迎语 POST /api/sessions 09
多轮对话 SSE 流式 chat/stream → run_guide_single_turn 06, 14
信息充分 开始分析 _run_analysis_background 05, 07
履历解析 进度 30% run_resume_parser 12
画像/RIASEC 进度 50% run_profile_analyzer 03, 13
职业匹配 进度 70% run_career_matcher 02
报告 进度 90% run_reporter 11
下载 PDF 文件 GET …/report/pdf 11, 15
说明
读本篇前 第 01 篇 main.py
读完本篇 说出 CORS 与 RateLimit 注册顺序及 /health 白名单
下一环 第 17 篇:配置项来源(第 17 篇)

全系列闭环索引:SERIES-LOOP.md

一、要解决什么问题

iCan 的 /chat/stream、报告生成、文件上传都会触发 LLM 或长耗时任务。公开演示或内测时,若不对 IP 限流,单客户端刷接口会导致:

  • LLM 配额被快速耗尽(成本与速率双杀);
  • Uvicorn worker 被占满,正常用户的 SSE 流卡顿;
  • 日志被同一 IP 淹没,难以排查真实故障。

项目在 api/middleware.py 自研 滑动窗口 IP 限流(不引入 slowapi/Redis,符合 MVP 体量),在 main.pycreate_app() 里与 CORSMiddleware 一起注册。

二、实现位置

组件 文件 说明
限流 api/middleware.py RateLimitMiddleware
CORS + 注册顺序 main.py create_app() 内两次 add_middleware
受影响路由 api/routes/chat.py 所有 HTTP 请求经中间件链

FastAPI 请求处理链

三、Starlette 中间件顺序(洋葱模型)

FastAPI/Starlette 后添加的中间件先接触请求main.py 实际注册顺序:

1
2
3
4
5
6
7
8
9
10
11
# main.py — create_app()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

from ican.api.middleware import RateLimitMiddleware
app.add_middleware(RateLimitMiddleware, max_requests=60, window_seconds=60)

因此请求路径是:

1
Client → RateLimitMiddleware → CORSMiddleware → 路由处理 → CORSMiddleware → RateLimitMiddleware → Client

RateLimit 在最外层:超限直接在 dispatch 里返回 429,不必再跑 CORS 逻辑和路由,适合挡恶意刷接口。若把顺序写反,会先算 CORS 再限流,浪费 CPU。

四、RateLimitMiddleware 实现

完整逻辑在 api/middleware.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class RateLimitMiddleware(BaseHTTPMiddleware):
def __init__(self, app, max_requests: int = 60, window_seconds: int = 60):
super().__init__(app)
self.max_requests = max_requests
self.window_seconds = window_seconds
self._requests = defaultdict(list)

async def dispatch(self, request: Request, call_next):
if request.url.path == "/health":
return await call_next(request)

client_ip = request.client.host if request.client else "unknown"
now = time.time()
self._requests[client_ip] = [
t for t in self._requests[client_ip]
if now - t < self.window_seconds
]

if len(self._requests[client_ip]) >= self.max_requests:
return JSONResponse(
status_code=429,
content={
"error": "请求过于频繁",
"detail": f"每{self.window_seconds}秒最多{self.max_requests}次请求",
},
)

self._requests[client_ip].append(now)
return await call_next(request)

设计选择:

说明
滑动窗口 按时间戳过滤过期记录,比「整分钟重置」更平滑
内存 defaultdict(list) 实现简单;进程重启计数清零;多副本互不共享
/health 豁免 Docker/K8s healthcheck 不被 429
429 JSON body 前端可统一 toast,字段 error + detail

默认 60 次 / 60 秒 / IP,在 main.py 写死传入;尚未接到 config.py 的 settings,改配额要改代码或后续抽到配置(第 17 篇)。

五、CORSMiddleware 配置

MVP 阶段允许所有来源,方便本地 Vue dev server 与线上域混测:

1
2
3
4
5
6
7
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

注意 allow_origins=["*"]allow_credentials=True 的组合:浏览器规范下,带 Cookie 的跨域请求不能使用 Access-Control-Allow-Origin: *。开发环境若未带凭证可能无感;生产若启用登录 Cookie,必须把 allow_origins 改成明确域名列表(如 https://app.example.com)。

iCan 当前 JWT/会话若走 Header 而非 Cookie,影响较小,但上线前仍应复核前端实际凭证方式。

六、与业务路由的配合

  • POST /chat/streamapi/routes/chat.py:一次用户发消息通常计 1 次限流;前端若错误地频繁重连 SSE,会快速触达 429。
  • WebSocket(api/routes/ws.py:握手是 HTTP 升级,同样经过 RateLimitMiddleware,计入同一 IP 窗口。
  • POST 上传简历:大 body 仍占一次计数;当前未做「上传单独更低配额」或「按 user_id 限流」。
  • **/health**:不限流,保证编排系统探活稳定。

扩展思路(未实现):在 dispatch 里按 request.url.path 分支,例如 chat 30/min、upload 10/min,参数从 config.py 读取。

七、429 响应与 CORS 头

超限返回的是裸 JSONResponse(429)不经过 CORSMiddleware 为成功响应追加头的那条路径时,部分浏览器跨域场景下前端只能看到 network error,读不到 JSON body。

当前顺序下 RateLimit 在外层,429 由 RateLimit 直接返回,可能缺少 CORS 头。内网同源部署通常无问题;若前端与 API 不同域且需解析 429 body,可在 429 分支手动加 Access-Control-Allow-Origin,或调整中间件顺序并实测浏览器行为。

八、生产演进路径

阶段 方案
MVP(当前) 单进程内存滑动窗口,60/min/IP
多 Uvicorn worker / 多 Pod 各副本计数独立,Effective 限额 ≈ N × 60;需 Redis INCR+TTL 或网关限流
登录用户 user_id 限流,减轻 NAT 下多用户共 IP 误伤
边缘 静态资源 CDN,API 走 WAF/云厂商速率限制

九、踩坑与边界

  1. request.client 为 None 或反代 IP 不对
    代码用 request.client.host;Nginx 未传 X-Forwarded-For 时,反代后可能全是 127.0.0.1"unknown"所有用户共用一个桶。生产应在反向代理配置真实 IP,并在中间件读 X-Forwarded-For 首段(当前源码未做,属已知缺口)。

  2. /health 路径硬编码
    若健康检查改成 /api/health 或带 prefix,必须同步改豁免条件,否则探活被 429 导致容器反复重启。

  3. 内存泄漏边界
    _requests 只清理当前 IP 的时间戳列表;长期运行若 IP 种类极多,dict 会变大。MVP 可接受;大规模需 TTL 清理整个 IP 键或换 Redis。

  4. 与 LLM 超时的关系
    限流挡的是「请求次数」,不挡「单次 LLM 90s」;用户 60 秒内发 60 条消息仍可能把 worker 拖慢——需产品层防抖 + 第 7 篇的超时策略配合。

  5. CORS * + credentials
    见第五节;上线前改为显式 origin 列表,并删掉与规范冲突的组合。

十、小结

  • 限流在 api/middleware.py,CORS 与注册顺序在 main.py后注册的 RateLimit 在外层,先挡刷接口。
  • 滑动窗口 + /health 豁免 + 429 JSON,满足 MVP;多实例前必须换分布式计数。
  • 反代 IP、CORS 与 429、路径豁免是上线前三项必查。
  • 下一篇(第 17 篇):pydantic-settings 配置管理与 API Key 脱敏。

附录:关键源码(逐行注释)

以下代码摘自 iCan 实现,每行上方均有中文注释,不公开仓库也可跟读。
生成命令:python3 bin/build-ican-annotated-snippets.py

RateLimitMiddleware(节选)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# ========== RateLimitMiddleware(节选) ==========
# 源文件: api/middleware.py 行 1-60

# L2: 【文档】文件说明:API 中间件
# L3: 【文档】业务说明:提供请求级别的通用中间件,如 IP 速率限制
# L4: 【文档】数据流向:HTTP 请求 -> 中间件拦截 -> 路由处理 -> HTTP 响应
# (L1-5 为函数/模块文档字符串,已转为注释便于阅读)

# L7: 导入依赖模块
import time
# L8: 导入依赖模块
from collections import defaultdict

# L10: 导入依赖模块
from starlette.middleware.base import BaseHTTPMiddleware
# L11: 导入依赖模块
from starlette.requests import Request
# L12: 导入依赖模块
from starlette.responses import JSONResponse


# L15: 定义类(配置或 ORM 模型)
class RateLimitMiddleware(BaseHTTPMiddleware):
# L17: 【文档】IP 速率限制中间件
# L19: 【文档】功能描述:
# L20: 【文档】基于客户端 IP 地址对请求进行速率限制,防止接口被恶意频繁调用。
# L21: 【文档】支持配置时间窗口内最大请求数,健康检查端点不受限制。
# L23: 【文档】入参:
# L24: 【文档】app: ASGI 应用实例
# L25: 【文档】max_requests (int): 时间窗口内最大请求数,默认 60
# L26: 【文档】window_seconds (int): 时间窗口(秒),默认 60
# L28: 【文档】出参:
# L29: 【文档】正常请求透传到下游处理
# L30: 【文档】超限请求返回 429 状态码
# (L16-31 为函数/模块文档字符串,已转为注释便于阅读)

# L33: 同步函数 __init__:路由决策或工厂方法
def __init__(self, app, max_requests: int = 60, window_seconds: int = 60):
# L34: 执行该语句(细节见上文业务描述)
super().__init__(app)
# L35: 赋值:更新局部变量或 state 字段
self.max_requests = max_requests
# L36: 赋值:更新局部变量或 state 字段
self.window_seconds = window_seconds
# L37: 赋值:更新局部变量或 state 字段
self._requests = defaultdict(list)

# L39: 异步函数 dispatch:可被 await,适合 IO 型 LLM/DB 调用
async def dispatch(self, request: Request, call_next):
# L40: 健康检查不限流
# L41: 条件分支
if request.url.path == "/health":
# L42: 返回本节点要合并进 state 的字段(LangGraph 会 merge)
return await call_next(request)

# L44: 赋值:更新局部变量或 state 字段
client_ip = request.client.host if request.client else "unknown"
# L45: 赋值:更新局部变量或 state 字段
now = time.time()

# L47: 清理过期记录
# L48: 赋值:更新局部变量或 state 字段
self._requests[client_ip] = [
# L49: 执行该语句(细节见上文业务描述)
t for t in self._requests[client_ip]
# L50: 条件分支
if now - t < self.window_seconds
# L51: 执行该语句(细节见上文业务描述)
]

# L53: 条件分支
if len(self._requests[client_ip]) >= self.max_requests:
# L54: 返回本节点要合并进 state 的字段(LangGraph 会 merge)
return JSONResponse(
# L55: 赋值:更新局部变量或 state 字段
status_code=429,
# L56: 赋值:更新局部变量或 state 字段
content={"error": "请求过于频繁", "detail": f"每{self.window_seconds}秒最多{self.max_requests}次请求"}
# L57: 执行该语句(细节见上文业务描述)
)

# L59: 执行该语句(细节见上文业务描述)
self._requests[client_ip].append(now)
# L60: 返回本节点要合并进 state 的字段(LangGraph 会 merge)
return await call_next(request)

create_app CORS + 中间件注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# ========== create_app CORS + 中间件注册 ==========
# 源文件: main.py 行 64-115

# L64: 同步函数 create_app:路由决策或工厂方法
def create_app() -> FastAPI:
# L66: 【文档】创建 FastAPI 应用实例
# L68: 【文档】功能描述:
# L69: 【文档】创建并配置 FastAPI 应用实例,包括:
# L70: 【文档】1. 设置应用标题、描述和版本信息
# L71: 【文档】2. 配置 CORS 中间件,允许前端跨域访问
# L72: 【文档】3. 注册所有 API 路由模块(chat、report、upload)
# L73: 【文档】4. 绑定生命周期管理器
# L75: 【文档】入参:
# L76: 【文档】无
# L78: 【文档】出参:
# L79: 【文档】FastAPI: 配置好的 FastAPI 应用实例,可直接用于 uvicorn 启动
# (L65-80 为函数/模块文档字符串,已转为注释便于阅读)
# L81: 开始 try 块,后续 except 负责兜底
try:
# L82: 记录日志,便于线上排查节点入参/出参
logger.info("[create_app] 开始创建 FastAPI 应用实例")

# L84: 赋值:更新局部变量或 state 字段
app = FastAPI(
# L85: 赋值:更新局部变量或 state 字段
title=settings.APP_NAME,
# L86: 赋值:更新局部变量或 state 字段
description="iCan - 智能职业规划 AI Agent 系统",
# L87: 赋值:更新局部变量或 state 字段
version=settings.APP_VERSION,
# L88: 赋值:更新局部变量或 state 字段
lifespan=lifespan,
# L89: 执行该语句(细节见上文业务描述)
)

# L91: 配置 CORS 中间件
# L92: 执行该语句(细节见上文业务描述)
app.add_middleware(
# L93: 执行该语句(细节见上文业务描述)
CORSMiddleware,
# L94: 赋值:更新局部变量或 state 字段
allow_origins=["*"], # MVP 阶段允许所有来源,生产环境需限制
# L95: 赋值:更新局部变量或 state 字段
allow_credentials=True,
# L96: 赋值:更新局部变量或 state 字段
allow_methods=["*"],
# L97: 赋值:更新局部变量或 state 字段
allow_headers=["*"],
# L98: 执行该语句(细节见上文业务描述)
)
# L99: 记录日志,便于线上排查节点入参/出参
logger.info("[create_app] CORS 中间件配置完成")

# L101: 配置速率限制中间件
# L102: 导入依赖模块
from ican.api.middleware import RateLimitMiddleware
# L103: 赋值:更新局部变量或 state 字段
app.add_middleware(RateLimitMiddleware, max_requests=60, window_seconds=60)
# L104: 记录日志,便于线上排查节点入参/出参
logger.info("[create_app] 速率限制中间件配置完成")

# L106: 根路由重定向
# L107: 装饰器
@app.get("/", include_in_schema=False)
# L108: 异步函数 root:可被 await,适合 IO 型 LLM/DB 调用
async def root():
# L109: 返回本节点要合并进 state 的字段(LangGraph 会 merge)
return RedirectResponse(url="/static/index.html")

# L111: 注册路由
# L112: 执行该语句(细节见上文业务描述)
app.include_router(auth.router)
# L113: 执行该语句(细节见上文业务描述)
app.include_router(ws.router)
# L114: 执行该语句(细节见上文业务描述)
app.include_router(chat.router)
# L115: 执行该语句(细节见上文业务描述)
app.include_router(report.router)

系列导航

主题
1 系统全景
2 五 Agent 协作
3 霍兰德 RIASEC
4–7 状态 · 路由 · 嵌套 · 容错
8–11 LLM 层 · SSE/WS · DB 迁移 · PDF
12–14 JSON Prompt · RIASEC Prompt · Guide Prompt
15–17 Docker · 16 中间件(本篇) · 配置

← 返回 iCan 专题