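📊 Contents

  1. Why does a RAG system need a three-store architecture?
  2. Responsibilities of the three stores and design philosophy
  3. Dual-write consistency mechanisms
  4. Complete implementation code (production-grade)
  5. Data synchronization and failure recovery
  6. Performance optimization and scaling strategies
  7. Monitoring, operations, and best practices
  8. Summary and architecture evolution roadmap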

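Why Does a RAG System Need a Three-Store Architecture?

The Fatal Flaws of a Single Store

Let's look at the problems we run into if we rely on a single store:

❌ Option 1: MySQL only (relational database)

# Assume all data lives in MySQL
query = "SELECT content FROM documents WHERE MATCH(content) AGAINST('糖尿病' IN NATURAL LANGUAGE MODE)"  # '糖尿病' means "diabetes"

# Problem 1: full-text search is slow; with 10 million rows a query can take more than 5 seconds
# Problem 2: no semantic similarity search
# Problem 3: large fields (raw PDF text, images) make backups difficult
# Problem 4: no vector index to accelerate retrieval

Consequence: QPS < 10 and a very poor user experience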

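❌ Option 2: Milvus only (vector database)

# Assume all data is stuffed into Milvus dynamic fields
data = {
    "doc_id": "doc_001",
    "vector": [0.1, 0.2, ...],  # 1024 dimensions
    "content": "The full document content goes here...",  # anywhere from a few KB to a few MB
    "metadata": "{...}",  # JSON string
    "original_file": base64_encoded_pdf,  # binary payload!
}

milvus.insert(data)

# Problem 1: Milvus is not suited to storing large files (memory usage explodes)
# Problem 2: scalar queries are less efficient than in a relational database (limited index support)
# Problem 3: data migration and backup are complicated
# Problem 4: no SQL for complex analysis

Consequence: memory usage is 5-10x the normal level, and costs are high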

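❌ Option 3: MinIO only (object storage)

# Store everything as files
minio.put_object("bucket", f"docs/{doc_id}.json", json.dumps(doc))

# Problem 1: no metadata index, so every lookup is a full scan
# Problem 2: no vector search or keyword search
# Problem 3: heavy lock contention under concurrent reads and writes
# Problem 4: data version management becomes chaotic

Consequence: online retrieval is not possible; only offline batch processing is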

Advantages of Three Stores Working Together

graph TB
    subgraph WhyTriple["Why three stores?"]
        direction TB

        subgraph MySQL_Role["🐬 MySQL role"]
            M1["✅ Structured metadata<br/>doc_id, title, source, tags..."]
            M2["✅ Business-logic queries<br/>filter by time/type/status"]
            M3["✅ Transactional operations<br/>ACID guarantees"]
            M4["✅ Complex relational analysis<br/>JOIN, GROUP BY"]
        end

        subgraph Milvus_Role["🔷 Milvus role"]
            V1["✅ Vector similarity retrieval<br/>core of semantic search"]
            V2["✅ High-performance ANN indexes<br/>HNSW/IVF_PQ"]
            V3["✅ Unified multimodal storage<br/>text/image/table"]
            V4["✅ Real-time / near-real-time search<br/>millisecond responses"]
        end

        subgraph MinIO_Role["🪣 MinIO role"]
            O1["✅ Original file storage<br/>PDF/Word/Image/Video"]
            O2["✅ Large capacity at low cost<br/>scales to PB level"]
            O3["✅ Versioning and snapshots<br/>data rollback"]
            O4["✅ CDN distribution<br/>static asset access"]
        end
    end

    Synergy["🔄 Synergy:<br/>1+1+1 > 3"]

    MySQL_Role --> Synergy
    Milvus_Role --> Synergy
    MinIO_Role --> Synergy

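Feature Comparison of the Three Stores

Feature | MySQL | Milvus | MinIO
Data type | Structured metadata | Vectors + lightweight scalars | Files / large binary objects
Query capability | SQL (strong) | Vector similarity (strong) | Simple CRUD (weak)
Transaction support | ✅ ACID | ❌ No (eventual consistency) | ❌ No (atomic per-object operations only)
Suitable scale | Hundreds of millions of rows | Tens to hundreds of millions of vectors | PB-scale files
Latency | Milliseconds (with indexes) | Milliseconds (ANN) | 10-100 ms (network)
Cost model | CPU / memory | Memory / SSD | Storage capacity
Typical use | Metadata management | Semantic retrieval | Archiving original files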
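
To make the division of labor concrete, here is a minimal sketch of how a single query might touch all three stores: a vector search for candidates, a metadata lookup for each hit, then a fetch of the raw file. The three stores are replaced by in-memory stand-ins (`FakeVectorStore` and two plain dicts), and the names `Hit` and `retrieve` are hypothetical; none of this is the real Milvus, MySQL, or MinIO client API.

```python
from dataclasses import dataclass


@dataclass
class Hit:
    chunk_id: str   # primary key in the vector store
    doc_id: str     # parent document ID
    score: float    # similarity score, higher is better


class FakeVectorStore:
    """In-memory stand-in for Milvus: ranks pre-scored hits."""

    def __init__(self, hits):
        self._hits = hits

    def search(self, query_vector, top_k):
        ranked = sorted(self._hits, key=lambda h: h.score, reverse=True)
        return ranked[:top_k]


def retrieve(query_vector, vector_store, metadata_db, object_store, top_k=3):
    """Vector store -> metadata store -> object store."""
    results = []
    for hit in vector_store.search(query_vector, top_k):
        meta = metadata_db.get(hit.doc_id)
        if meta is None:
            # A vector without a metadata row is an orphan: skip it
            continue
        results.append({
            "chunk_id": hit.chunk_id,
            "title": meta["title"],
            "score": hit.score,
            "raw_file": object_store.get(meta["oss_key"]),
        })
    return results


# Stand-ins for MySQL (metadata) and MinIO (raw files)
metadata_db = {
    "doc_001": {"title": "Diabetes guide", "oss_key": "raw-files/pdf/doc_001.pdf"},
}
object_store = {"raw-files/pdf/doc_001.pdf": b"%PDF-..."}
vector_store = FakeVectorStore([
    Hit("doc_001_chunk_0", "doc_001", 0.91),
    Hit("doc_999_chunk_3", "doc_999", 0.88),  # orphan: no metadata row
    Hit("doc_001_chunk_1", "doc_001", 0.42),
])

results = retrieve([0.0] * 4, vector_store, metadata_db, object_store)
```

The orphan hit is dropped at the metadata step, which is one reason to treat the relational store as the source of truth at query time.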

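See also on this site: "RAG Offline Pipeline: Metadata Enrichment and Knowledge-Graph Fusion Preprocessing" (《RAG 离线部分:元数据增强与知识图谱融合预处理》), which gives the design rationale for the three stores, metadata, and lineage.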

Responsibilities of the Three Stores and Design Philosophy

Architecture Overview

flowchart TB
    subgraph InputLayer["Input layer"]
        Upload["📤 File upload<br/>(PDF/Word/Image)"]
        API["🔗 API calls<br/>(REST/gRPC)"]
        BatchImport["📦 Batch import<br/>(ETL jobs)"]
    end

    subgraph ProcessingLayer["Processing layer"]
        Extractor["🔍 Content extractor<br/>(PyMuPDF/PaddleOCR)"]
        Chunker["✂️ Chunker<br/>(5 strategies)"]
        Embedder["🎯 Embedder<br/>(BGE-M3)"]
    end

    subgraph StorageLayer["Three-store layer"]
        direction LR

        subgraph MySQL_DB["🐬 MySQL 8.0"]
            direction TB
            DocsTable["documents table<br/>(document metadata)"]
            ChunksTable["chunks table<br/>(chunk records)"]
            TasksTable["processing_tasks table<br/>(task status)"]

            DocsTable --> ChunksTable
        end

        subgraph Milvus_Store["🔷 Milvus 2.x"]
            direction TB
            DenseIndex["Dense Vectors<br/>(COSINE/HNSW)"]
            SparseIndex["Sparse Weights<br/>(BM25-like)"]
            MetadataFilter["Scalar Indexes<br/>(modal_type/business_tag)"]
        end

        subgraph MinIO_Bucket["🪣 MinIO"]
            direction TB
            RawFiles["raw-files/<br/>original files"]
            ProcessedData["processed-data/<br/>processed content"]
            Models["models/<br/>model files"]
            Backups["backups/<br/>periodic snapshots"]
        end
    end

    subgraph ServiceLayer["Service layer"]
        DualWriteService["✍️ Dual-write coordinator<br/>(consistency guarantees)"]
        SyncService["🔄 Sync service<br/>(compensation and repair)"]
        CacheLayer["💾 Redis cache layer<br/>(hot data)"]
    end

    InputLayer --> ProcessingLayer
    ProcessingLayer --> ServiceLayer
    ServiceLayer --> StorageLayer

    DualWriteService --> |"transactional write"| MySQL_DB
    DualWriteService --> |"asynchronous write"| Milvus_Store
    DualWriteService --> |"parallel upload"| MinIO_Bucket

Data Model Design

MySQL Schema (metadata store)

-- ============================================
-- RAG system: MySQL database schema design
-- ============================================

-- 1. Documents table
CREATE TABLE documents (
    doc_id VARCHAR(64) PRIMARY KEY COMMENT 'Business document ID',
    title VARCHAR(512) NOT NULL COMMENT 'Document title',
    source_type ENUM('pdf', 'word', 'image', 'webpage', 'table') NOT NULL COMMENT 'Source type',
    source_url VARCHAR(1024) COMMENT 'Original URL or path',

    -- Business attributes
    business_tag VARCHAR(128) DEFAULT '' COMMENT 'Business tag (medical/legal/finance)',
    domain VARCHAR(64) DEFAULT '' COMMENT 'Domain category',
    language VARCHAR(16) DEFAULT 'zh' COMMENT 'Language',

    -- File information
    file_size BIGINT DEFAULT 0 COMMENT 'File size (bytes)',
    page_count INT DEFAULT 0 COMMENT 'Page count (PDF only)',
    oss_key VARCHAR(256) COMMENT 'MinIO object key',

    -- Processing status
    status ENUM('pending', 'processing', 'completed', 'failed') DEFAULT 'pending',
    chunk_count INT DEFAULT 0 COMMENT 'Number of chunks',

    -- Timestamps
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    deleted_at TIMESTAMP NULL,

    INDEX idx_business_tag (business_tag),
    INDEX idx_status (status),
    INDEX idx_created_at (created_at),
    INDEX idx_source_type (source_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
COMMENT='RAG documents table';


-- 2. Chunks table
CREATE TABLE chunks (
    chunk_id VARCHAR(64) PRIMARY KEY COMMENT 'Unique chunk ID',
    doc_id VARCHAR(64) NOT NULL COMMENT 'Parent document ID',

    -- Chunk information
    chunk_index INT NOT NULL COMMENT 'Position within the document',
    chunk_strategy VARCHAR(32) NOT NULL COMMENT 'Chunking strategy used',
    vector_level VARCHAR(16) DEFAULT 'text' COMMENT 'Embedding granularity (text/table/row/col/cell)',

    -- Content information
    content TEXT COMMENT 'Chunk text (first 500 characters, for preview)',
    content_hash VARCHAR(64) COMMENT 'Content hash (for deduplication)',
    char_count INT DEFAULT 0 COMMENT 'Character count',

    -- Relations
    parent_chunk_id VARCHAR(64) COMMENT 'Parent chunk ID (parent-child chunking)',
    associate_id VARCHAR(64) COMMENT 'Associated ID (tables and other structured data)',

    -- Embedding status
    embedding_status ENUM('pending', 'embedded', 'failed') DEFAULT 'pending',
    milvus_synced BOOLEAN DEFAULT FALSE COMMENT 'Whether the chunk has been synced to Milvus',

    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

    UNIQUE KEY uk_doc_index (doc_id, chunk_index),
    INDEX idx_doc_id (doc_id),
    INDEX idx_vector_level (vector_level),
    INDEX idx_embedding_status (embedding_status),
    INDEX idx_content_hash (content_hash),

    FOREIGN KEY (doc_id) REFERENCES documents(doc_id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
COMMENT='RAG document chunks table';


-- 3. Task queue table (for asynchronous processing)
CREATE TABLE processing_tasks (
    task_id VARCHAR(64) PRIMARY KEY,
    task_type ENUM('extract', 'chunk', 'embed', 'sync') NOT NULL,
    doc_id VARCHAR(64) NOT NULL,

    status ENUM('queued', 'running', 'completed', 'failed') DEFAULT 'queued',
    priority INT DEFAULT 0 COMMENT 'Priority (larger number = higher priority)',
    retry_count INT DEFAULT 0 COMMENT 'Retry count',

    error_message TEXT COMMENT 'Error message',
    result_json JSON COMMENT 'Execution result',

    started_at TIMESTAMP NULL,
    completed_at TIMESTAMP NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_status_priority (status, priority),
    INDEX idx_doc_id (doc_id),
    INDEX idx_task_type (task_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
COMMENT='RAG processing tasks table';


-- 4. Operation log table (audit trail)
CREATE TABLE operation_logs (
    log_id BIGINT AUTO_INCREMENT PRIMARY KEY,
    operation_type VARCHAR(32) NOT NULL COMMENT 'INSERT/UPDATE/DELETE/SYNC',
    target_table VARCHAR(64) NOT NULL COMMENT 'Target table name',
    target_id VARCHAR(64) NOT NULL COMMENT 'Target record ID',

    payload JSON COMMENT 'Snapshot of the data before the operation',
    operator VARCHAR(64) DEFAULT 'system' COMMENT 'Operator (user or system)',

    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_target (target_table, target_id),
    INDEX idx_operation_type (operation_type),
    INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
COMMENT='RAG operation log table';
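
The `content_hash VARCHAR(64)` column in `chunks` is sized for a 64-character digest, which matches a SHA-256 hex string. The schema does not say which hash or normalization is used, so the following is only a sketch under those assumptions: `content_hash` and `deduplicate` are hypothetical helpers, and the NFKC plus whitespace-collapsing normalization is an illustrative choice.

```python
import hashlib
import unicodedata


def content_hash(text: str) -> str:
    """SHA-256 hex digest of normalized text (64 characters).

    Assumption: NFKC normalization and whitespace collapsing, so that
    chunks differing only in spacing or character width hash the same.
    """
    normalized = unicodedata.normalize("NFKC", text)
    normalized = " ".join(normalized.split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def deduplicate(chunks):
    """Keep the first occurrence of each distinct hash, preserving order."""
    seen = set()
    unique = []
    for chunk in chunks:
        digest = content_hash(chunk)
        if digest not in seen:
            seen.add(digest)
            unique.append(chunk)
    return unique
```

With the `idx_content_hash` index in place, the same digest can also be used for a lookup before inserting a new chunk row.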

Milvus Collection Design (covered in detail in Part 5 of this series)

Collection: rag_vectors
Fields:
  - doc_id: VARCHAR(64) [PK]
  - vector: FLOAT_VECTOR(1024)      # BGE-M3 dimension
  - modal_type: VARCHAR(16)         # text/image/table/video
  - business_tag: VARCHAR(128)
  - vector_level: VARCHAR(16)       # text/table/row/col/cell
  - associate_id: VARCHAR(64)
  - oss_id: VARCHAR(64)

Indexes:
  - vector: HNSW(M=16, efConstruction=200, metric=COSINE)
  - modal_type: Trie
  - business_tag: Trie

MinIO Bucket Layout

Bucket: rag-knowledge-base
├── raw-files/                  # original files
│   ├── pdf/
│   │   └── {YYYY}/{MM}/{DD}/{doc_id}.pdf
│   ├── word/
│   ├── image/
│   └── webpage/
│
├── processed-data/             # processed data
│   ├── extracted-text/         # extracted plain text
│   │   └── {doc_id}.txt
│   ├── tables/                 # parsed tables
│   │   └── {doc_id}/
│   │       └── table_{n}.json
│   └── images/                 # extracted images
│       └── {doc_id}/
│           └── img_{n}.png
│
├── models/                     # model files
│   ├── bge-m3/
│   └── paddleocr/
│
├── backups/                    # periodic backups
│   ├── mysql/
│   │   └── dump_{timestamp}.sql
│   └── metadata/
│       └── manifest_{timestamp}.json
│
└── temp/                       # temporary files (cleaned up periodically)
    └── upload_{uuid}/
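
As a rough illustration of the `raw-files/` convention above, a key builder might look like the sketch below. The layout shows the dated path only for PDFs, so applying `{YYYY}/{MM}/{DD}` to the other folders is an assumption, as are the extension-to-folder mapping and the helper name `raw_object_key`.

```python
from datetime import date
from pathlib import PurePosixPath

# File extension -> raw-files/ subfolder. This mapping is an assumption.
_FOLDER_BY_EXT = {
    "pdf": "pdf",
    "doc": "word",
    "docx": "word",
    "png": "image",
    "jpg": "image",
    "jpeg": "image",
    "html": "webpage",
    "htm": "webpage",
}


def raw_object_key(doc_id: str, filename: str, day: date) -> str:
    """Build an object key like raw-files/pdf/2024/03/05/doc_001.pdf."""
    ext = PurePosixPath(filename).suffix.lstrip(".").lower()
    if ext not in _FOLDER_BY_EXT:
        raise ValueError(f"unsupported file type: {filename!r}")
    folder = _FOLDER_BY_EXT[ext]
    return f"raw-files/{folder}/{day:%Y/%m/%d}/{doc_id}.{ext}"


key = raw_object_key("doc_001", "Report.PDF", date(2024, 3, 5))
```

Date-partitioned prefixes keep listings small and make it easy to apply lifecycle rules per time range.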

Dual-Write Consistency Mechanisms

Consistency Challenges

sequenceDiagram
    participant App as Application
    participant DWS as Dual-write service
    participant MySQL as MySQL
    participant Milvus as Milvus
    participant MinIO as MinIO

    Note over App,MinIO: Scenario: writing a new document

    App->>DWS: 1. Upload document + metadata

    par Parallel writes
        DWS->>MySQL: 2a. INSERT document (transaction begins)
        DWS->>MinIO: 2b. PUT object (file upload)
        DWS->>Milvus: 2c. INSERT vectors (batch)
    end

    alt All succeed
        MySQL-->>DWS: ✅ COMMIT
        DWS-->>App: Return success
    else MySQL succeeds, Milvus fails
        Milvus-->>DWS: ❌ Write failed (roll back or mark as pending sync)
        DWS->>DWS: Record in the compensation queue
        DWS-->>App: Return "processing" (eventually consistent)
    else MinIO fails
        DWS->>MySQL: ROLLBACK
        DWS-->>App: Return failure
    end

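Consistency Levels

Level | Name | Guarantee | Typical use cases | Implementation cost
Strong consistency | Atomic | Either every write succeeds or every write fails | Financial transactions, critical data | High (distributed transactions)
Eventual consistency | Eventual | Brief inconsistency is tolerated; the stores converge eventually | Knowledge-base updates, content publishing | Medium (message queue + compensation)
Causal consistency | Causal | Ordered operations are observed in the same order | Users viewing their own submissions immediately | Low (session affinity)

Our choice: eventual consistency overall, with strong consistency on the critical path.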
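
One way to read "eventual consistency overall, strong consistency on the critical path" is as a small decision rule over the three write results. The sketch below assumes the critical path is the raw file in MinIO plus the metadata row in MySQL, with Milvus allowed to lag; the names `Outcome` and `decide_outcome` are hypothetical and not part of the article's code.

```python
from enum import Enum


class Outcome(Enum):
    SUCCESS = "success"
    PENDING_SYNC = "pending_sync"  # accepted; Milvus is repaired later
    FAILED = "failed"              # nothing durable is kept


def decide_outcome(minio_ok: bool, mysql_ok: bool, milvus_ok: bool) -> Outcome:
    """Map per-store write results to the response given to the caller."""
    # Critical path (strong): the raw file and its metadata row must both exist
    if not (minio_ok and mysql_ok):
        return Outcome.FAILED
    # Off the critical path (eventual): a Milvus failure is compensated later
    if not milvus_ok:
        return Outcome.PENDING_SYNC
    return Outcome.SUCCESS
```

This mirrors the three branches of the sequence diagram: all succeed, Milvus fails after MySQL succeeds, or the file upload fails.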

Dual-Write Coordinator Implementation

# dual_write_service.py
"""
Dual-write coordination service (production-grade implementation).
Keeps data consistent across the three stores: MySQL, Milvus, and MinIO.
"""

import logging
import threading
import time
import traceback
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)


class WriteOperation(Enum):
    """Type of write operation."""
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"
    SYNC = "sync"


class ConsistencyLevel(Enum):
    """Consistency level."""
    STRONG = "strong"
    EVENTUAL = "eventual"
    CAUSAL = "causal"


@dataclass
class WriteResult:
    """Result of a write to a single store."""
    success: bool
    operation: WriteOperation
    target: str  # mysql/milvus/minio
    data_id: str
    error: Optional[str] = None
    latency_ms: float = 0


@dataclass
class DualWriteResult:
    """Aggregated result of one dual-write operation."""
    operation_id: str
    overall_success: bool
    consistency_achieved: bool
    results: List[WriteResult] = field(default_factory=list)
    compensation_needed: bool = False
    total_latency_ms: float = 0

class DualWriteCoordinator:
    """
    Dual-write coordinator.

    Core responsibilities:
    1. Orchestrate the write order across the stores
    2. Handle partial failures
    3. Trigger compensation
    4. Guarantee eventual consistency
    """

    def __init__(
        self,
        mysql_client,
        milvus_client,
        minio_client,
        consistency_level: ConsistencyLevel = ConsistencyLevel.EVENTUAL,
        max_retries: int = 3,
        retry_delay_base: float = 1.0,
        enable_compensation_queue: bool = True
    ):
        self.mysql = mysql_client
        self.milvus = milvus_client
        self.minio = minio_client
        self.consistency_level = consistency_level
        self.max_retries = max_retries
        self.retry_delay_base = retry_delay_base
        self.enable_compensation_queue = enable_compensation_queue

        # Compensation queue (for asynchronous repair).
        # It lives in process memory, so pending items are lost on restart.
        self.compensation_queue = []
        self._queue_lock = threading.Lock()

        # Statistics
        self.stats = {
            'total_operations': 0,
            'successful_operations': 0,
            'compensations_triggered': 0,
            'avg_latency_ms': 0,
        }

    def execute_dual_write(
        self,
        operation: WriteOperation,
        mysql_data: Optional[Dict] = None,
        milvus_data: Optional[List[Dict]] = None,
        minio_file_path: Optional[str] = None,
        minio_file_content: Optional[bytes] = None,
        transaction_context: Optional[Dict] = None
    ) -> DualWriteResult:
        """
        Execute a dual-write operation.

        Args:
            operation: operation type
            mysql_data: dict of data to write to MySQL
            milvus_data: list of records to write to Milvus
            minio_file_path: MinIO object path
            minio_file_content: file content as bytes
            transaction_context: transaction context (for log tracing)
        """
        op_id = str(uuid.uuid4())[:8]
        start_time = time.time()

        logger.info(f"[{op_id}] Starting dual write: {operation.value}")

        results = []
        all_success = True
        compensation_needed = False

        try:
            if operation == WriteOperation.INSERT:
                results = self._execute_insert(
                    op_id, mysql_data, milvus_data,
                    minio_file_path, minio_file_content
                )
            elif operation == WriteOperation.UPDATE:
                # _execute_update is not shown in this article
                results = self._execute_update(
                    op_id, mysql_data, milvus_data, minio_file_path, minio_file_content
                )
            elif operation == WriteOperation.DELETE:
                # _execute_delete is not shown in this article
                results = self._execute_delete(op_id, mysql_data, milvus_data, minio_file_path)
            else:
                # Without this branch an unhandled type would report success with no writes
                raise ValueError(f"Unsupported operation: {operation.value}")

            # Inspect the results
            failed_targets = [r for r in results if not r.success]
            all_success = len(failed_targets) == 0

            if not all_success:
                compensation_needed = True
                logger.warning(
                    f"[{op_id}] Partial write failure: "
                    f"{[f'{r.target}({r.error})' for r in failed_targets]}"
                )

                if self.enable_compensation_queue:
                    self._enqueue_compensation(
                        op_id, operation, mysql_data, milvus_data,
                        minio_file_path, minio_file_content, failed_targets
                    )

        except Exception as e:
            logger.error(f"[{op_id}] Dual-write exception: {e}\n{traceback.format_exc()}")
            all_success = False
            compensation_needed = True

            results.append(WriteResult(
                success=False,
                operation=operation,
                target="coordinator",
                data_id=op_id,
                error=str(e)
            ))

        # Update statistics (running mean for latency)
        elapsed_ms = (time.time() - start_time) * 1000
        self.stats['total_operations'] += 1
        if all_success:
            self.stats['successful_operations'] += 1
        self.stats['avg_latency_ms'] += (
            (elapsed_ms - self.stats['avg_latency_ms']) / self.stats['total_operations']
        )

        result = DualWriteResult(
            operation_id=op_id,
            overall_success=all_success,
            # Below STRONG, a partial failure is tolerated and repaired later
            consistency_achieved=all_success or self.consistency_level != ConsistencyLevel.STRONG,
            results=results,
            compensation_needed=compensation_needed,
            total_latency_ms=elapsed_ms
        )

        logger.info(
            f"[{op_id}] Dual write finished: success={all_success}, "
            f"latency={elapsed_ms:.0f}ms, targets={len(results)}"
        )

        return result

    def _execute_insert(
        self,
        op_id: str,
        mysql_data: Dict,
        milvus_data: List[Dict],
        minio_path: str,
        minio_content: bytes
    ) -> List[WriteResult]:
        """
        Execute an insert (recommended order: MinIO → MySQL → Milvus).
        """
        results = []

        # Step 1: upload the file to MinIO first (keeps the raw data safe)
        if minio_path and minio_content:
            result = self._safe_write(
                op_id, "minio", WriteOperation.INSERT,
                lambda: self._upload_to_minio(minio_path, minio_content),
                data_id=minio_path
            )
            results.append(result)

            if not result.success:
                # A MinIO failure aborts the whole flow
                return results

        # Step 2: write to MySQL (transactional; the source of truth)
        if mysql_data:
            result = self._safe_write(
                op_id, "mysql", WriteOperation.INSERT,
                lambda: self._insert_to_mysql(mysql_data),
                data_id=mysql_data.get('doc_id', '')
            )
            results.append(result)

            if not result.success:
                # A MySQL failure requires rolling back MinIO (if already uploaded)
                if minio_path:
                    try:
                        self.minio.remove_object('rag-knowledge-base', minio_path)
                    except Exception as cleanup_error:
                        # Best effort: a leftover object is an orphan for the consistency checker
                        logger.warning(f"[{op_id}] MinIO rollback failed: {cleanup_error}")
                return results

        # Step 3: write to Milvus last (can be compensated asynchronously)
        if milvus_data:
            result = self._safe_write(
                op_id, "milvus", WriteOperation.INSERT,
                lambda: self._insert_to_milvus(milvus_data),
                data_id=mysql_data.get('doc_id', '') if mysql_data else ''
            )
            results.append(result)
            # A Milvus failure does not block the flow; the compensation queue repairs it later

        return results

    def _safe_write(
        self,
        op_id: str,
        target: str,
        operation: WriteOperation,
        write_func,
        data_id: str,
        retry: int = 0
    ) -> WriteResult:
        """
        Execute a write safely, with retries.
        """
        start = time.time()

        try:
            write_func()
            latency = (time.time() - start) * 1000

            return WriteResult(
                success=True,
                operation=operation,
                target=target,
                data_id=data_id,
                latency_ms=latency
            )

        except Exception as e:
            latency = (time.time() - start) * 1000

            if retry < self.max_retries:
                delay = self.retry_delay_base * (2 ** retry)  # exponential backoff
                logger.warning(
                    f"[{op_id}] {target} write failed (attempt {retry+1}), "
                    f"retrying in {delay}s: {e}"
                )
                # Blocks the calling thread for the duration of the backoff
                time.sleep(delay)
                return self._safe_write(
                    op_id, target, operation, write_func,
                    data_id, retry + 1
                )
            else:
                logger.error(f"[{op_id}] {target} write failed permanently: {e}")
                return WriteResult(
                    success=False,
                    operation=operation,
                    target=target,
                    data_id=data_id,
                    error=str(e),
                    latency_ms=latency
                )

    def _upload_to_minio(self, path: str, content: bytes):
        """Upload a file to MinIO."""
        # Assuming self.minio is a minio-py client: put_object expects a
        # readable stream rather than raw bytes, so wrap the content.
        from io import BytesIO

        self.minio.put_object(
            bucket_name="rag-knowledge-base",
            object_name=path,
            data=BytesIO(content),
            length=len(content)
        )

    def _insert_to_mysql(self, data: Dict):
        """Insert a row into MySQL."""
        # Parameterized query to prevent SQL injection
        sql = """
            INSERT INTO documents
            (doc_id, title, source_type, business_tag, oss_key, status)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        # Note: status is written as 'completed' before the Milvus write has run
        self.mysql.execute(sql, (
            data['doc_id'],
            data['title'],
            data['source_type'],
            data.get('business_tag', ''),
            data.get('oss_key', ''),
            'completed'
        ))

    def _insert_to_milvus(self, data_list: List[Dict]):
        """Batch-insert records into Milvus."""
        if data_list:
            self.milvus.insert(
                collection_name="rag_vectors",
                data=data_list
            )

    def _enqueue_compensation(
        self,
        op_id: str,
        operation: WriteOperation,
        mysql_data: Dict,
        milvus_data: List[Dict],
        minio_path: str,
        minio_content: bytes,
        failed_targets: List[WriteResult]
    ):
        """Add a failed operation to the compensation queue."""
        with self._queue_lock:
            compensation_item = {
                'operation_id': op_id,
                'operation': operation,
                'mysql_data': mysql_data,
                'milvus_data': milvus_data,
                'minio_path': minio_path,
                'minio_content': minio_content,
                'failed_targets': [t.target for t in failed_targets],
                'created_at': time.time(),
                'retry_count': 0
            }
            self.compensation_queue.append(compensation_item)
            self.stats['compensations_triggered'] += 1

        logger.info(f"[{op_id}] Added to the compensation queue, current length: {len(self.compensation_queue)}")

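    def process_compensation_queue(self, max_items: int = 100):
        """
        Retry the failed operations in the compensation queue.
        Intended to be called by a background scheduled job.
        """
        with self._queue_lock:
            items_to_process = self.compensation_queue[:max_items]
            self.compensation_queue = self.compensation_queue[max_items:]

        processed = 0
        success_count = 0

        for item in items_to_process:
            if item['retry_count'] >= self.max_retries:
                logger.error(
                    f"Compensation reached the retry limit, giving up: {item['operation_id']}"
                )
                continue

            if item['operation'] != WriteOperation.INSERT:
                # Only the insert path is shown here; other operations need their own replay logic
                logger.error(f"Compensation not supported for this operation: {item['operation_id']}")
                continue

            item['retry_count'] += 1

            # If only Milvus failed, MinIO and MySQL already hold the data.
            # Replaying them would fail on the duplicate primary key, so
            # retry Milvus alone; otherwise replay the whole insert.
            milvus_only = set(item['failed_targets']) == {'milvus'}

            try:
                # Call _execute_insert directly: going through execute_dual_write
                # would enqueue a second compensation item on every failure.
                results = self._execute_insert(
                    item['operation_id'],
                    None if milvus_only else item['mysql_data'],
                    item['milvus_data'],
                    None if milvus_only else item['minio_path'],
                    None if milvus_only else item['minio_content']
                )
                failed = [r.target for r in results if not r.success]

                if not failed:
                    success_count += 1
                    logger.info(f"Compensation succeeded: {item['operation_id']}")
                else:
                    # Put the item back at the end of the queue
                    item['failed_targets'] = failed
                    with self._queue_lock:
                        self.compensation_queue.append(item)

            except Exception as e:
                logger.error(f"Compensation raised an exception: {item['operation_id']}, {e}")
                with self._queue_lock:
                    self.compensation_queue.append(item)

            processed += 1

        logger.info(
            f"Compensation queue processed: processed={processed}, succeeded={success_count}, "
            f"remaining queue length={len(self.compensation_queue)}"
        )

        return {'processed': processed, 'success': success_count}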

    def get_statistics(self) -> Dict:
        """Return runtime statistics."""
        return {
            **self.stats,
            'compensation_queue_size': len(self.compensation_queue),
            # Success rate as a percentage
            'success_rate': (
                self.stats['successful_operations'] / max(1, self.stats['total_operations']) * 100
            )
        }
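
The retry logic in `_safe_write` is recursive and sleeps without jitter. As an alternative sketch only, the same exponential backoff schedule (`base_delay * 2 ** attempt`) can be written as a loop with optional jitter and an injectable `sleep`, which makes it easy to test without waiting. The helper name `retry_with_backoff` and its parameters are hypothetical.

```python
import random
import time


def retry_with_backoff(func, max_retries=3, base_delay=1.0, jitter=0.0,
                       sleep=time.sleep):
    """Call func(); on failure wait base_delay * 2**attempt and retry.

    Returns func's result. After max_retries retries (max_retries + 1
    calls in total) the last exception is re-raised.
    """
    attempt = 0
    while True:
        try:
            return func()
        except Exception:
            if attempt >= max_retries:
                raise
            # Optional random jitter spreads out retries from many clients
            delay = base_delay * (2 ** attempt) + random.uniform(0, jitter)
            sleep(delay)
            attempt += 1


# Usage: a function that fails twice, then succeeds
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


sleeps = []  # record delays instead of actually sleeping
outcome = retry_with_backoff(flaky, sleep=sleeps.append)
```

Passing `sleep=sleeps.append` records the delays instead of blocking, which keeps unit tests fast.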

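See also on this site: "RAG in Practice: Production Deployment and Performance Monitoring" (《RAG 落地:生产环境部署与性能监控实践》), which covers the operational essentials of running the data layer in production.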

Data Synchronization and Failure Recovery

Sync Detection Mechanism

# data_consistency_checker.py
"""
Data consistency checker.
Periodically compares the data held in MySQL, Milvus, and MinIO.
"""

import logging
import time
from dataclasses import dataclass
from typing import Dict, List, Tuple

logger = logging.getLogger(__name__)


@dataclass
class ConsistencyReport:
    """Consistency report."""
    check_time: str
    total_records_mysql: int
    total_records_milvus: int
    total_files_minio: int

    inconsistencies: List[Dict]
    summary: Dict

    is_healthy: bool
    recommendations: List[str]


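class DataConsistencyChecker:
    """
    Data consistency checker.

    Features:
    1. Periodically sample documents and check them across the three stores
    2. Find orphan records (present in one store but missing from the others)
    3. Generate repair recommendations
    4. Support automatic repair (optional)
    """

    def __init__(
        self,
        mysql_client,
        milvus_client,
        minio_client,
        sample_ratio: float = 0.01  # sample 1% of documents
    ):
        self.mysql = mysql_client
        self.milvus = milvus_client
        self.minio = minio_client
        self.sample_ratio = sample_ratio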

    def run_full_check(self) -> ConsistencyReport:
        """
        Run a complete consistency check.
        """
        from datetime import datetime

        start_time = time.time()
        report = ConsistencyReport(
            check_time=datetime.now().isoformat(),
            total_records_mysql=0,
            total_records_milvus=0,
            total_files_minio=0,
            inconsistencies=[],
            summary={},
            is_healthy=True,
            recommendations=[]
        )

        logger.info("🔍 Starting data consistency check...")

        # 1. Count the data in each store. The totals are informational:
        #    MySQL counts documents while Milvus counts chunk vectors,
        #    so they are not expected to be equal.
        try:
            mysql_count = self._count_mysql_documents()
            report.total_records_mysql = mysql_count
        except Exception as e:
            logger.error(f"MySQL count failed: {e}")
            report.inconsistencies.append({
                'type': 'connection_error',
                'target': 'mysql',
                'error': str(e)
            })

        try:
            milvus_count = self._count_milvus_vectors()
            report.total_records_milvus = milvus_count
        except Exception as e:
            logger.error(f"Milvus count failed: {e}")
            report.inconsistencies.append({
                'type': 'connection_error',
                'target': 'milvus',
                'error': str(e)
            })

        try:
            minio_count = self._count_minio_objects()
            report.total_files_minio = minio_count
        except Exception as e:
            logger.error(f"MinIO count failed: {e}")
            report.inconsistencies.append({
                'type': 'connection_error',
                'target': 'minio',
                'error': str(e)
            })

        # 2. Sample and compare
        sample_size = 0
        if report.total_records_mysql > 0:
            # Sample at least one document, otherwise small tables are never checked
            sample_docs = self._sample_mysql_documents(
                max(1, int(report.total_records_mysql * self.sample_ratio))
            )
            sample_size = len(sample_docs)

            for doc in sample_docs:
                doc_id = doc['doc_id']
                issues = self._check_document_consistency(doc_id)
                if issues:
                    report.inconsistencies.extend(issues)

        # 3. Build the report summary
        elapsed = time.time() - start_time
        inconsistency_count = len([i for i in report.inconsistencies if i.get('type') != 'connection_error'])

        report.summary = {
            'check_duration_seconds': round(elapsed, 2),
            'inconsistency_count': inconsistency_count,
            # Percentage relative to the number of documents actually sampled
            'inconsistency_rate': inconsistency_count / max(1, sample_size) * 100,
            'health_score': max(0, 100 - inconsistency_count * 10)
        }

        report.is_healthy = inconsistency_count == 0 and len(
            [i for i in report.inconsistencies if i['type'] == 'connection_error']
        ) == 0

        # 4. Generate recommendations
        if not report.is_healthy:
            report.recommendations = self._generate_recommendations(report)

        logger.info(
            f"✅ Consistency check finished: "
            f"MySQL={report.total_records_mysql}, "
            f"Milvus={report.total_records_milvus}, "
            f"MinIO={report.total_files_minio}, "
            f"issues={inconsistency_count}, "
            f"health={report.summary['health_score']}%"
        )

        return report

    def _check_document_consistency(self, doc_id: str) -> List[Dict]:
        """Check one document for consistency across the three stores."""
        issues = []

        # Check MySQL
        mysql_exists = self._document_exists_in_mysql(doc_id)

        # Check Milvus
        milvus_exists = self._document_exists_in_milvus(doc_id)

        # Check MinIO
        minio_exists, minio_key = self._file_exists_in_minio(doc_id)

        # Classify the inconsistencies
        if mysql_exists and not milvus_exists:
            issues.append({
                'type': 'missing_in_milvus',
                'doc_id': doc_id,
                'severity': 'high',
                'description': 'Document exists in MySQL but is missing from Milvus'
            })

        if mysql_exists and not minio_exists:
            issues.append({
                'type': 'missing_in_minio',
                'doc_id': doc_id,
                'severity': 'medium',
                'description': 'Document exists in MySQL but its file is missing from MinIO'
            })

        if not mysql_exists and (milvus_exists or minio_exists):
            issues.append({
                'type': 'orphan_record',
                'doc_id': doc_id,
                'severity': 'high',
                'description': 'Orphan record: present in Milvus/MinIO but not in MySQL'
            })

        return issues

    def _generate_recommendations(self, report: ConsistencyReport) -> List[str]:
        """Generate repair recommendations from the check results."""
        recommendations = []

        issue_types = set(i['type'] for i in report.inconsistencies)

        if 'missing_in_milvus' in issue_types:
            count = len([i for i in report.inconsistencies if i['type'] == 'missing_in_milvus'])
            recommendations.append(
                f"🔧 {count} document(s) missing from Milvus; suggested fix: python sync_to_milvus.py --fix"
            )

        if 'missing_in_minio' in issue_types:
            count = len([i for i in report.inconsistencies if i['type'] == 'missing_in_minio'])
            recommendations.append(
                f"⚠️ {count} file(s) missing from MinIO; check the upload flow"
            )

        if 'orphan_record' in issue_types:
            count = len([i for i in report.inconsistencies if i['type'] == 'orphan_record'])
            recommendations.append(
                f"🗑️ {count} orphan record(s) found; suggested cleanup: python cleanup_orphans.py"
            )

        if 'connection_error' in issue_types:
            recommendations.append("❌ Store connection error; check the network and service status")

        return recommendations

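    # Concrete check methods follow (simplified implementations)
    def _count_mysql_documents(self) -> int:
        result = self.mysql.fetch_one("SELECT COUNT(*) as cnt FROM documents WHERE deleted_at IS NULL")
        return result['cnt'] if result else 0

    def _count_milvus_vectors(self) -> int:
        stats = self.milvus.get_collection_stats(collection_name="rag_vectors")
        return stats.get('row_count', 0)

    def _count_minio_objects(self) -> int:
        # list_objects returns a lazy iterator, so count by iterating instead of len()
        objects = self.minio.list_objects("rag-knowledge-base", recursive=True)
        return sum(1 for _ in objects)

    def _sample_mysql_documents(self, limit: int) -> List[Dict]:
        # ORDER BY RAND() scans the whole table; acceptable for a periodic job, slow on very large tables
        query = f"SELECT doc_id, oss_key FROM documents WHERE deleted_at IS NULL ORDER BY RAND() LIMIT {int(limit)}"
        return self.mysql.fetch_all(query)

    def _document_exists_in_mysql(self, doc_id: str) -> bool:
        result = self.mysql.fetch_one(
            "SELECT 1 FROM documents WHERE doc_id = %s AND deleted_at IS NULL",
            (doc_id,)
        )
        return result is not None

    def _document_exists_in_milvus(self, doc_id: str) -> bool:
        try:
            # The pipeline writes chunk-level IDs into the primary key and the
            # parent document ID into associate_id, so filter on associate_id.
            # A scalar query also avoids searching with a dummy zero vector,
            # which is not meaningful under the COSINE metric.
            rows = self.milvus.query(
                collection_name="rag_vectors",
                filter=f'associate_id == "{doc_id}"',
                output_fields=["doc_id"],
                limit=1
            )
            return len(rows) > 0
        except Exception as e:
            # Note: a connection error is reported as "missing"
            logger.warning(f"Milvus existence check failed for {doc_id}: {e}")
            return False

    def _file_exists_in_minio(self, doc_id: str) -> Tuple[bool, str]:
        try:
            # Look up the object key recorded for this document
            result = self.mysql.fetch_one(
                "SELECT oss_key FROM documents WHERE doc_id = %s",
                (doc_id,)
            )
            if result and result.get('oss_key'):
                # stat_object raises if the object does not exist
                self.minio.stat_object("rag-knowledge-base", result['oss_key'])
                return True, result['oss_key']
        except Exception as e:
            logger.warning(f"MinIO existence check failed for {doc_id}: {e}")
        return False, ""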
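
Because the checker samples documents from MySQL, it can only find records that MySQL knows about; orphans that exist only in Milvus or MinIO are found by comparing full ID sets. The sketch below assumes the document IDs of each store have already been exported to plain collections (how that export is done is left out), and the helper name `reconcile` is hypothetical. It reuses the same three issue types as the checker.

```python
def reconcile(mysql_ids, milvus_ids, minio_ids):
    """Classify document IDs by set difference, with MySQL as the source of truth."""
    mysql_ids = set(mysql_ids)
    milvus_ids = set(milvus_ids)
    minio_ids = set(minio_ids)
    return {
        # Known to MySQL but never embedded or synced
        "missing_in_milvus": sorted(mysql_ids - milvus_ids),
        # Known to MySQL but the raw file is gone
        "missing_in_minio": sorted(mysql_ids - minio_ids),
        # Present in a secondary store with no MySQL row
        "orphan_record": sorted((milvus_ids | minio_ids) - mysql_ids),
    }


report = reconcile(
    mysql_ids=["doc_1", "doc_2", "doc_3"],
    milvus_ids=["doc_1", "doc_3", "doc_9"],
    minio_ids=["doc_1", "doc_2"],
)
```

A full comparison costs more than sampling, so it suits an occasional offline job rather than the periodic check.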

Performance Optimization and Scaling Strategies

Write Performance Optimization

graph TB
    subgraph WriteOptimization["Write performance optimization strategies"]
        direction TB

        subgraph BatchProcessing["Batch processing"]
            B1["📦 Batch insert<br/>MySQL: 1000 rows/batch"]
            B2["🚀 Batch embedding<br/>GPU: 32-64 items/batch"]
            B3["⚡ Batch Milvus writes<br/>10000 rows/batch"]
        end

        subgraph AsyncPipeline["Asynchronous pipeline"]
            A1["Stage 1: file upload<br/>(I/O-bound)"]
            A2["Stage 2: content extraction<br/>(CPU-bound)"]
            A3["Stage 3: chunking and embedding<br/>(GPU-bound)"]
            A4["Stage 4: dual-write storage<br/>(network-bound)"]

            A1 --> |"in parallel"| A2
            A2 --> |"pipelined"| A3
            A3 --> |"pipelined"| A4
        end

        subgraph CachingStrategy["Caching strategy"]
            C1["📝 Write buffer<br/>Redis Queue"]
            C2["💾 Local cache<br/>LRU for hot data"]
            C3["🔄 Write-ahead log<br/>WAL mechanism"]
        end
    end
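
The batch sizes in the diagram (1000 rows for MySQL, 32-64 items for embedding, 10000 rows for Milvus) all rely on the same primitive: splitting a stream of records into fixed-size batches. A minimal sketch follows; the helper name `in_batches` is hypothetical.

```python
from itertools import islice


def in_batches(items, batch_size):
    """Yield lists of at most batch_size items, in order, from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Usage: embed in batches of 64, then write to the vector store in larger batches
texts = [f"chunk {i}" for i in range(150)]
embedding_batches = list(in_batches(texts, 64))
batch_lengths = [len(b) for b in embedding_batches]
```

Because it consumes any iterable lazily, the same helper works for generators that stream rows from a database cursor.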

Asynchronous Processing Pipeline

# async_processing_pipeline.py
"""
Asynchronous processing pipeline.
Runs the time-consuming ETL steps asynchronously to improve perceived performance.
"""

import asyncio
import logging
import threading
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Optional

# WriteOperation is defined in dual_write_service.py above
from dual_write_service import WriteOperation

logger = logging.getLogger(__name__)


class PipelineStage(Enum):
    """Processing stage."""
    UPLOAD = "upload"
    EXTRACT = "extract"
    CHUNK = "chunk"
    EMBED = "embed"
    STORE = "store"
    COMPLETE = "complete"
    FAILED = "failed"


@dataclass
class PipelineTask:
    """Pipeline task."""
    task_id: str
    doc_id: str
    file_path: str
    file_content: bytes
    metadata: Dict = field(default_factory=dict)

    current_stage: PipelineStage = PipelineStage.UPLOAD
    progress_pct: int = 0
    error: Optional[str] = None
    result: Optional[Dict] = None


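class AsyncProcessingPipeline:
    """
    Asynchronous processing pipeline.

    Flow:
    1. Accept a task and return its task_id immediately
    2. Run each stage in order in the background
    3. Support progress queries
    4. Retry failed writes (retries happen inside the dual-write service)
    """

    def __init__(
        self,
        dual_write_service,
        extractor,
        chunker,
        embedder,
        max_concurrent_tasks: int = 10,
        stage_timeout_seconds: int = 300
    ):
        self.dws = dual_write_service
        self.extractor = extractor
        self.chunker = chunker
        self.embedder = embedder

        self.max_concurrent = max_concurrent_tasks
        self.stage_timeout = stage_timeout_seconds

        # Task registry
        self.tasks: Dict[str, PipelineTask] = {}
        self._tasks_lock = threading.Lock()

        # Strong references to running asyncio tasks (see submit_task)
        self._background_tasks: set = set()

        # Worker thread pool
        self.executor = None
        self._running = False

    async def submit_task(
        self,
        file_content: bytes,
        filename: str,
        metadata: Optional[Dict] = None
    ) -> str:
        """
        Submit a processing task.

        Returns a task_id that can be used to query progress.
        """
        task_id = str(uuid.uuid4())[:12]
        doc_id = f"doc_{task_id}"

        task = PipelineTask(
            task_id=task_id,
            doc_id=doc_id,
            file_path=filename,
            file_content=file_content,
            metadata=metadata or {}
        )

        with self._tasks_lock:
            self.tasks[task_id] = task

        # Start asynchronous processing. Keep a strong reference: the event
        # loop holds only weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        background = asyncio.create_task(self._process_task(task))
        self._background_tasks.add(background)
        background.add_done_callback(self._background_tasks.discard)

        logger.info(f"📥 Task submitted: {task_id}")
        return task_id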

    async def get_task_status(self, task_id: str) -> Dict:
        """Query the status of a task."""
        with self._tasks_lock:
            task = self.tasks.get(task_id)

        if not task:
            return {'error': 'Task not found'}

        return {
            'task_id': task.task_id,
            'doc_id': task.doc_id,
            'stage': task.current_stage.value,
            'progress': task.progress_pct,
            'error': task.error,
            'result': task.result
        }

    async def _process_task(self, task: PipelineTask):
        """Run the complete processing flow."""
        try:
            # Stage 1: upload the original file to MinIO
            await self._run_stage(task, PipelineStage.UPLOAD, self._stage_upload)

            # Stage 2: content extraction
            await self._run_stage(task, PipelineStage.EXTRACT, self._stage_extract)

            # Stage 3: chunking
            await self._run_stage(task, PipelineStage.CHUNK, self._stage_chunk)

            # Stage 4: embedding
            await self._run_stage(task, PipelineStage.EMBED, self._stage_embed)

            # Stage 5: dual-write storage
            await self._run_stage(task, PipelineStage.STORE, self._stage_store)

            # Done
            task.current_stage = PipelineStage.COMPLETE
            task.progress_pct = 100
            task.result = {'doc_id': task.doc_id, 'status': 'success'}

            logger.info(f"✅ Task completed: {task.task_id}")

        except Exception as e:
            task.current_stage = PipelineStage.FAILED
            task.error = str(e)
            logger.error(f"❌ Task failed: {task.task_id}, {e}")

    async def _run_stage(
        self,
        task: PipelineTask,
        stage: PipelineStage,
        stage_func
    ):
        """Run a single stage with a timeout."""
        task.current_stage = stage

        try:
            # The timeout cancels the await only; work already handed to a
            # thread via asyncio.to_thread keeps running in the background.
            await asyncio.wait_for(
                stage_func(task),
                timeout=self.stage_timeout
            )
        except asyncio.TimeoutError:
            raise TimeoutError(f"Stage {stage.value} timed out") from None

    async def _stage_upload(self, task: PipelineTask):
        """Stage 1: file upload"""
        task.progress_pct = 5

        # File extension, e.g. "pdf" (assumes the filename has one)
        ext = task.file_path.rsplit('.', 1)[-1]
        minio_path = f"raw-files/{ext}/{task.doc_id}.{ext}"

        # Upload to MinIO (a real implementation should call dual_write_service);
        # simplified here for the example
        await asyncio.sleep(0.5)  # simulate I/O

        task.metadata['minio_path'] = minio_path
        task.progress_pct = 15

    async def _stage_extract(self, task: PipelineTask):
        """Stage 2: content extraction"""
        task.progress_pct = 20

        # Run the (blocking) extractor in a worker thread so the event loop stays responsive
        extracted_text = await asyncio.to_thread(
            self.extractor.extract,
            task.file_content,
            task.file_path
        )

        task.metadata['extracted_text'] = extracted_text
        task.progress_pct = 40

    async def _stage_chunk(self, task: PipelineTask):
        """Stage 3: chunking"""
        task.progress_pct = 45

        chunks = await asyncio.to_thread(
            self.chunker.chunk,
            task.metadata['extracted_text'],
            strategy='parent_child'
        )

        task.metadata['chunks'] = chunks
        task.progress_pct = 60

    async def _stage_embed(self, task: PipelineTask):
        """Stage 4: embedding"""
        task.progress_pct = 65

        # Embed all chunk texts in one batch call
        texts = [c['content'] for c in task.metadata['chunks']]
        embeddings = await asyncio.to_thread(
            self.embedder.embed_batch,
            texts
        )

        task.metadata['embeddings'] = embeddings
        task.progress_pct = 80

    async def _stage_store(self, task: PipelineTask):
        """Stage 5: dual-write storage"""
        task.progress_pct = 85

        # Prepare the data: one metadata row for MySQL
        mysql_data = {
            'doc_id': task.doc_id,
            'title': task.metadata.get('title', task.file_path),
            'source_type': task.file_path.split('.')[-1],
            'business_tag': task.metadata.get('business_tag', ''),
            'oss_key': task.metadata.get('minio_path', '')
        }

        # One Milvus row per chunk; associate_id links each chunk back to its document.
        # emb.tolist() assumes each embedding is a NumPy array.
        milvus_data = [
            {
                'doc_id': f"{task.doc_id}_chunk_{i}",
                'vector': emb.tolist(),
                'modal_type': 'text',
                'business_tag': task.metadata.get('business_tag', ''),
                'vector_level': 'text',
                'associate_id': task.doc_id
            }
            for i, emb in enumerate(task.metadata['embeddings'])
        ]

        # Execute the dual write
        result = await asyncio.to_thread(
            self.dws.execute_dual_write,
            WriteOperation.INSERT,
            mysql_data=mysql_data,
            milvus_data=milvus_data,
            minio_file_path=task.metadata.get('minio_path'),
            minio_file_content=task.file_content
        )

        task.metadata['write_result'] = result
        task.progress_pct = 95
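
A caller usually submits a file and then polls for progress. The sketch below shows one way to poll any coroutine shaped like `get_task_status` above. The helper name `wait_for_task`, its `poll_interval` and `timeout` parameters, and the terminal stage names `"complete"` and `"failed"` are assumptions for illustration; the real `PipelineStage` values are defined elsewhere in the article.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict

# Hypothetical terminal stage names; adjust to the real PipelineStage values.
TERMINAL_STAGES = {"complete", "failed"}


async def wait_for_task(
    get_status: Callable[[str], Awaitable[Dict]],
    task_id: str,
    poll_interval: float = 0.5,
    timeout: float = 300.0,
) -> Dict:
    """Poll get_status(task_id) until the task reaches a terminal stage."""
    deadline = time.monotonic() + timeout
    while True:
        status = await get_status(task_id)
        # Unknown task IDs come back as {'error': 'Task not found'} with no 'stage' key
        if "stage" not in status or status["stage"] in TERMINAL_STAGES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task {task_id} still in stage {status['stage']!r}")
        await asyncio.sleep(poll_interval)
```

With a pipeline instance at hand, usage would look roughly like `status = await wait_for_task(pipeline.get_task_status, task_id)`, where `pipeline` stands for whatever object exposes the methods above.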

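See also on this site: "RAG Evaluation: End-to-End Metric Design and Evaluation Framework", which covers data consistency, traceability, and the evaluation feedback loop.

Monitoring, Operations, and Best Practices

Monitoring System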

graph TB
    subgraph MonitoringSystem["Three-Store Monitoring System"]
        direction TB

        subgraph Metrics["📊 Key Metrics"]
            M1["MySQL metrics<br/>• QPS<br/>• Connections<br/>• Slow queries<br/>• Replication lag"]
            M2["Milvus metrics<br/>• Search QPS<br/>• P99 latency<br/>• Recall<br/>• Memory usage"]
            M3["MinIO metrics<br/>• Object count<br/>• Storage used<br/>• Request latency<br/>• Error rate"]
        end

        subgraph Alerts["🚨 Alert Rules"]
            A1["MySQL: connections > 800"]
            A2["Milvus: P99 > 200ms"]
            A3["MinIO: disk usage > 85%"]
            A4["Consistency: inconsistent records > 100"]
        end

        subgraph Dashboards["📈 Dashboards"]
            D1["Grafana: real-time dashboard"]
            D2["Consistency report: daily"]
            D3["Capacity trend: predictive alerts"]
        end
    end

    Metrics --> |"scrape"| Prometheus["Prometheus"]
    Alerts --> |"rules"| AlertManager["AlertManager"]
    AlertManager --> |"notify"| Notification["DingTalk/Email/Slack"]
    Prometheus --> Dashboards
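
The four alert thresholds in the diagram reduce to simple "value greater than limit" checks. In a real deployment they would most likely be written as Prometheus alerting rules; the Python sketch below only illustrates the threshold logic. The metric keys (`mysql_connections` and so on) are hypothetical names chosen for this example, not real exporter metric names.

```python
from dataclasses import dataclass
from typing import Dict, List, Sequence


@dataclass(frozen=True)
class AlertRule:
    name: str
    metric: str       # hypothetical metric key, not a real exporter metric name
    threshold: float  # the alert fires when the value is strictly greater


# Thresholds taken from the "Alert Rules" box in the diagram
RULES = (
    AlertRule("MySQL connections high", "mysql_connections", 800),
    AlertRule("Milvus P99 latency high", "milvus_search_p99_ms", 200),
    AlertRule("MinIO disk usage high", "minio_disk_used_pct", 85),
    AlertRule("Cross-store inconsistencies high", "inconsistent_docs", 100),
)


def evaluate(metrics: Dict[str, float], rules: Sequence[AlertRule] = RULES) -> List[str]:
    """Return the names of all rules whose metric exceeds its threshold."""
    fired = []
    for rule in rules:
        value = metrics.get(rule.metric)
        # A missing metric is skipped here; a stricter setup might alert on absence too
        if value is not None and value > rule.threshold:
            fired.append(rule.name)
    return fired
```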

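Best Practices Checklist

✅ Design phase

  • Make data ownership explicit: MySQL is the Source of Truth
  • Define a consistency SLA: the maximum time window in which the stores may disagree
  • Design idempotent API endpoints (to prevent duplicate writes)
  • Plan the data lifecycle up front (TTL, archiving policy)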
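
One way to make ingestion idempotent is to derive a stable key from the request itself and reject repeats on that key. The sketch below is a minimal illustration: `idempotency_key` and `IdempotentIngest` are hypothetical names, and the in-memory dict stands in for what would be a UNIQUE index in MySQL (the Source of Truth) in a real system.

```python
import hashlib
from typing import Dict, Tuple


def idempotency_key(file_content: bytes, filename: str) -> str:
    """Derive a stable key from the file name and bytes."""
    digest = hashlib.sha256()
    digest.update(filename.encode("utf-8"))
    digest.update(b"\x00")  # separator so the name/content boundary is unambiguous
    digest.update(file_content)
    return digest.hexdigest()[:32]


class IdempotentIngest:
    """In-memory stand-in for a table with a UNIQUE index on the key."""

    def __init__(self) -> None:
        self._seen: Dict[str, str] = {}  # idempotency key -> doc_id

    def submit(self, file_content: bytes, filename: str) -> Tuple[str, bool]:
        """Return (doc_id, created); created is False for a repeated submission."""
        key = idempotency_key(file_content, filename)
        if key in self._seen:
            return self._seen[key], False
        doc_id = f"doc_{key[:12]}"
        self._seen[key] = doc_id
        return doc_id, True
```

Submitting the same file twice then yields the same `doc_id` and no second write, which is what makes client retries safe.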

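🏗️ Development phase

  • Implement thorough error handling and retry mechanisms
  • Add detailed operation logs (audit trail)
  • Write unit tests that cover the various failure scenarios
  • Use transactions to keep MySQL operations atomic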
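
For the retry item, a common pattern is exponential backoff with a little jitter, retrying only errors that are likely transient. The sketch below is one possible shape, not the article's implementation: the function name `retry`, its parameters, and the choice of `ConnectionError`/`TimeoutError` as retryable are all assumptions.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry(
    func: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.2,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying transient errors with exponential backoff and jitter."""
    attempt = 1
    while True:
        try:
            return func()
        except retry_on:
            if attempt >= max_attempts:
                raise  # out of attempts: surface the last error to the caller
            # base_delay, 2x, 4x, ... plus up to 10% random jitter
            delay = base_delay * (2 ** (attempt - 1))
            sleep(delay + random.uniform(0, delay * 0.1))
            attempt += 1
```

Errors outside `retry_on` (for example a validation error) propagate immediately, so only failures that might succeed on a second try are repeated. Retries are only safe when the wrapped write is idempotent.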

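🚀 Deployment phase

  • Configure sensible connection pool sizes
  • Set appropriate timeouts
  • Enable the slow query log
  • Configure an automatic backup policy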

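🔍 Operations phase

  • Review the consistency report every day
  • Monitor the compensation queue for backlog
  • Run disaster recovery drills regularly
  • Adjust capacity as the business grows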
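
The daily consistency report boils down to set comparisons against the Source of Truth. The sketch below assumes the document IDs have already been fetched from each store (for Milvus, the distinct `associate_id` values of the chunks); `ConsistencyReport` and `check_consistency` are hypothetical names, and a real job would page through the stores rather than hold every ID in memory.

```python
from dataclasses import dataclass
from typing import Iterable, Set


@dataclass
class ConsistencyReport:
    missing_in_milvus: Set[str]   # in MySQL but without vectors: candidates for compensation
    orphaned_in_milvus: Set[str]  # vectors whose document no longer exists in MySQL
    missing_in_minio: Set[str]    # metadata rows whose raw file is missing

    @property
    def inconsistent_count(self) -> int:
        return (len(self.missing_in_milvus)
                + len(self.orphaned_in_milvus)
                + len(self.missing_in_minio))


def check_consistency(
    mysql_ids: Iterable[str],
    milvus_ids: Iterable[str],
    minio_ids: Iterable[str],
) -> ConsistencyReport:
    """Compare document IDs across the three stores, treating MySQL as the truth."""
    truth = set(mysql_ids)
    vectors = set(milvus_ids)
    files = set(minio_ids)
    return ConsistencyReport(
        missing_in_milvus=truth - vectors,
        orphaned_in_milvus=vectors - truth,
        missing_in_minio=truth - files,
    )
```

The `inconsistent_count` value is the kind of number that the "inconsistent records > 100" alert would watch.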

Summary and Architecture Evolution Roadmap

Core Value Summary

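mindmap
  root((Three-Store Dual-Write Architecture))
    Pain points solved
      Single point of failure risk
      Data inconsistency
      Limited query capability
      Poor scalability
    Architecture advantages
      Clear responsibilities
      Each store does what it does best
      Complementary strengths
      Independent scaling
    Key techniques
      Dual-write coordinator
      Compensation queue
      Consistency checks
      Async pipeline
    Production guarantees
      Data reliability
      Service availability
      Observability
      Disaster recovery
    Best practices
      Source of Truth
      Idempotent design
      Incremental delivery
      Continuous monitoring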

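Architecture Evolution Roadmap

| Phase | Timeline | Goal | Key milestones |
|-------|----------|------|----------------|
| V1.0 | Month 1 | MVP launch | Basic dual-write + manual compensation |
| V1.5 | Month 2 | Automation | Compensation queue + consistency checks |
| V2.0 | Month 3 | Production-ready | Monitoring and alerting + performance tuning |
| V2.5 | Months 4-6 | Scale-out | Read/write splitting + multi-cluster |
| V3.0 | Months 7-12 | Cloud-native | Kubernetes + autoscaling |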

Quick-Start Template

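#!/bin/bash
# quick_deploy.sh - quick deployment script for the three stores
set -euo pipefail

echo "🚀 Initializing the RAG three-store architecture..."

# 1. Start MySQL
docker run -d \
  --name rag-mysql \
  -p 3306:3306 \
  -e MYSQL_ROOT_PASSWORD=root_password \
  -e MYSQL_DATABASE=rag_base_multimodal \
  -v mysql_data:/var/lib/mysql \
  mysql:8.0

# 2. Start Milvus
# Note: depending on the image version, standalone mode may also need the
# `milvus run standalone` command plus etcd/storage settings; check the official install guide.
docker run -d \
  --name rag-milvus \
  -p 19530:19530 \
  -p 9091:9091 \
  -v milvus_data:/var/lib/milvus \
  milvusdb/milvus:v2.3-latest

# 3. Start MinIO
docker run -d \
  --name rag-minio \
  -p 9000:9000 \
  -p 9001:9001 \
  -e MINIO_ROOT_USER=minioadmin \
  -e MINIO_ROOT_PASSWORD=minioadmin \
  -v minio_data:/data \
  minio/minio server /data --console-address ":9001"

# 4. Initialize the database schema once MySQL accepts TCP connections
#    (polling is more reliable than a fixed `sleep 10`)
until docker exec rag-mysql mysqladmin ping -h127.0.0.1 -uroot -proot_password --silent; do
  sleep 2
done
docker exec -i rag-mysql mysql -uroot -proot_password rag_base_multimodal < schema.sql

# 5. Create the MinIO bucket
docker exec rag-minio mc alias set myminio http://localhost:9000 minioadmin minioadmin
docker exec rag-minio mc mb myminio/rag-knowledge-base

echo "✅ The three storage services are up:"
echo " MySQL: localhost:3306"
echo " Milvus: localhost:19530"
echo " MinIO Console: http://localhost:9001"
echo ""
echo "Next step: edit .env with the connection settings and start the application"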
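
Before starting the application it can help to confirm that the published ports actually accept connections. The sketch below uses only the standard library; `wait_for_port` and `check_services` are hypothetical helper names, and the port numbers are the ones published by the script above. An open TCP port only shows that something is listening, not that the service has finished initializing.

```python
import socket
import time
from typing import Dict

# Ports published by the deployment script
SERVICES = {"MySQL": 3306, "Milvus": 19530, "MinIO API": 9000}


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


def check_services(
    services: Dict[str, int],
    host: str = "localhost",
    timeout: float = 60.0,
    interval: float = 1.0,
) -> Dict[str, bool]:
    """Check each named service port and report which ones are reachable."""
    return {
        name: wait_for_port(host, port, timeout=timeout, interval=interval)
        for name, port in services.items()
    }
```

Calling `check_services(SERVICES)` after the script finishes returns a dict such as `{"MySQL": True, ...}` that can gate application startup.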

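🎉 Congratulations on finishing the entire RAG series!

By now you have covered:

  • Part 1: Docker deployment guide - quickly set up a development environment
  • Part 2: Hybrid PDF extraction - PyMuPDF + PaddleOCR-VL + MLX acceleration
  • Part 3: 5 chunking strategies - from beginner to a production-grade ChunkRouter
  • Part 4: BGE-M3 fine-tuning in practice - teaching the model your domain
  • Part 5: Milvus Collection design + HNSW tuning - vector database performance optimization
  • Part 6: 4-level table vectorization - helping AI understand structured data
  • Part 7: RRF multi-channel fusion ranking - a multiplier for multi-channel retrieval quality
  • Part 8: Three-store dual-write architecture - enterprise-grade data reliability

You now have a complete, production-oriented body of knowledge for RAG systems, covering the full chain from data ingestion, processing, and storage to retrieval and optimization.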


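💡 Final words: the end goal of learning a technology is not mastering the tool but solving problems. I hope this series becomes a solid foundation for building high-quality RAG systems. If you run into problems in practice, feel free to reach out and discuss!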

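📊 Series statistics

  • Total: 8 articles
  • Total code: about 12,000 lines (including comments and examples)
  • draw.io diagrams: 35+ (architecture diagrams, flowcharts, mind maps)
  • Total reading time: about 4 hours
  • Technical topics covered: 50+

Keywords: RAG, three-store architecture, dual-write consistency, MySQL, Milvus, MinIO, data reliability, draw.io architecture diagrams, production deployment, best practices


End of series 🎊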


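Series Navigation and Further Reading on This Site

This article is part of the **Enterprise RAG Data Pipeline in Practice** series (8 hands-on engineering articles, meant to be read alongside the RAG End-to-End Theory series).

Articles in this series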

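| Part | Title |
|------|-------|
| Part 1 | Say Goodbye to Retrieval Hallucinations! Build an Enterprise RAG Data Pipeline Step by Step (with One-Click Docker Deployment) |
| Part 2 | PDF Extraction Keeps Dropping Tables? A Hybrid PyMuPDF + PaddleOCR-VL Approach in Practice (with MLX Acceleration) |
| Part 3 | How to Chunk for RAG Without Losing Context? 5 Strategies from Beginner to Production (with a Selection Decision Tree) |
| Part 4 | BGE-M3 Local Fine-Tuning in Practice: From Scratch to Production Deployment (with Complete Code) |
| Part 5 | Milvus in Production: Collection Design + HNSW Tuning Guide |
| Part 6 | 4-Level Table Vectorization: Making RAG Systems Truly Understand Structured Data |
| Part 7 | RRF Multi-Channel Fusion Ranking: The Secret Weapon for Improving RAG Retrieval Precision by 30%+ |
| Part 8 | MySQL + Milvus + MinIO Three-Store Dual-Write Architecture: Building an Enterprise RAG Data Foundation |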

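Related Theory on This Site

The following articles come from the RAG End-to-End Theory series and help explain the concepts and methodology this series relies on: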