迁移实战
问题
需要将数据从一个数据库迁移到另一个数据库(如 MySQL → MySQL 分库分表、MySQL → PostgreSQL),如何做到不停机迁移?
答案
一、迁移全流程
二、各阶段详解
阶段 1:评估与规划
| 评估项 | 内容 |
|---|---|
| 数据量 | 总行数、总大小、增长速度 |
| 表结构差异 | 字段类型、索引、约束差异 |
| 业务依赖 | 哪些服务读写这些表 |
| 预计耗时 | 历史数据迁移时间估算 |
| 回滚方案 | 每个阶段如何回滚 |
阶段 2:双写
// 双写层:同时写入新旧库
class DualWriteRepository {
constructor(
private oldDB: Database,
private newDB: Database,
) {}
async create(data: Order): Promise<void> {
// 旧库为主,写入失败则整体失败
await this.oldDB.insert('orders', data);
// 新库为辅,写入失败只告警不阻塞
try {
await this.newDB.insert('orders', data);
} catch (error) {
// 记录差异日志,后续补偿
await this.logDifference('insert', data, error);
}
}
async update(id: string, data: Partial<Order>): Promise<void> {
await this.oldDB.update('orders', id, data);
try {
await this.newDB.update('orders', id, data);
} catch (error) {
await this.logDifference('update', { id, ...data }, error);
}
}
}
双写注意事项
- 旧库为主:新库写入失败不影响业务
- 差异日志:记录所有新库写入失败的数据
- 补偿机制:定时消费差异日志重试写入
- 幂等设计:补偿写入需要幂等
阶段 3:历史数据迁移
// 分批迁移历史数据
async function migrateHistory(batchSize = 5000) {
let lastId = 0;
while (true) {
// 按 ID 范围分批读取
const rows = await oldDB.query(
'SELECT * FROM orders WHERE id > ? ORDER BY id LIMIT ?',
[lastId, batchSize]
);
if (rows.length === 0) break;
// 批量写入新库(使用 UPSERT 保证幂等)
await newDB.batchUpsert('orders', rows);
lastId = rows[rows.length - 1].id;
// 打印进度
console.log(`Migrated up to id=${lastId}`);
// 控制速度,避免压垮数据库
await sleep(100);
}
}
阶段 4:数据校验
// 三级校验
async function verifyData() {
// 1. 行数校验
const oldCount = await oldDB.query('SELECT COUNT(*) FROM orders');
const newCount = await newDB.query('SELECT COUNT(*) FROM orders');
assert(oldCount === newCount, `行数不一致: old=${oldCount}, new=${newCount}`);
// 2. 聚合校验
const oldSum = await oldDB.query('SELECT SUM(amount) FROM orders');
const newSum = await newDB.query('SELECT SUM(amount) FROM orders');
assert(oldSum === newSum, `金额不一致`);
// 3. 抽样逐行校验
const sampleIds = await oldDB.query(
'SELECT id FROM orders ORDER BY RAND() LIMIT 1000'
);
for (const { id } of sampleIds) {
const oldRow = await oldDB.query('SELECT * FROM orders WHERE id = ?', [id]);
const newRow = await newDB.query('SELECT * FROM orders WHERE id = ?', [id]);
assert(deepEqual(oldRow, newRow), `数据不一致: id=${id}`);
}
}
阶段 5:灰度切读
// 灰度切读:按比例将读流量切到新库
class ReadRouter {
private newDBRatio: number; // 0 ~ 100
async read(sql: string, params: any[]) {
if (Math.random() * 100 < this.newDBRatio) {
// 读新库
const result = await this.newDB.query(sql, params);
// 可选:对比旧库结果(shadow read)
return result;
}
return this.oldDB.query(sql, params);
}
// 逐步调大比例:1% → 10% → 50% → 100%
setRatio(ratio: number) {
this.newDBRatio = ratio;
}
}
三、异构数据库迁移(MySQL → PostgreSQL)
| 差异点 | MySQL | PostgreSQL | 处理 |
|---|---|---|---|
| 自增 | AUTO_INCREMENT | SERIAL / GENERATED | DDL 转换 |
| JSON | JSON | JSONB | 注意语法差异 |
| 时间 | DATETIME | TIMESTAMP | 注意时区 |
| 引号 | 反引号 ` | 双引号 " | SQL 改写 |
| 分页 | LIMIT x, y | LIMIT y OFFSET x | SQL 改写 |
四、回滚方案
| 阶段 | 回滚方式 |
|---|---|
| 双写阶段 | 关闭新库写入即可 |
| 历史迁移 | 清空新库重新迁移 |
| 灰度切读 | 将读比例调回 0% |
| 全量切读 | 切回旧库读 |
| 停旧写 | 不可逆点,需格外确认 |
常见面试问题
Q1: 双写期间如何处理新旧库数据不一致?
答案:
- 差异日志:新库写入失败时记录完整操作日志
- 定时补偿:消费差异日志重试写入新库
- 数据校验:周期性运行校验脚本
- 以旧库为准:迁移完成前,旧库是数据真实来源
Q2: 大表(亿级)历史数据迁移需要多久?
答案:
估算公式:迁移时间 = 总行数 / 每秒迁移行数
- 分批 INSERT(5000/批):约 5 万~10 万行/秒
- 1 亿行 ÷ 5 万/秒 ≈ 2000 秒 ≈ 30 分钟
- 但需要限速避免影响线上:实际 2~8 小时
加速手段:
- 多线程并发迁移(按 ID 范围分段)
- 临时关闭新库索引,迁移完再建
- 使用专用工具(mysqldump、DTS)
Q3: 如何保证迁移过程中数据零丢失?
答案:
- 双写覆盖增量:迁移期间的新写入通过双写保证
- 历史数据 UPSERT:重复迁移不会覆盖新数据
- binlog 兜底:历史迁移完成后,用 CDC 追平差量
- 多轮校验:迁移后运行校验脚本,差异数据重新同步