跳到主要内容

迁移实战

问题

需要将数据从一个数据库迁移到另一个数据库(如 MySQL → MySQL 分库分表、MySQL → PostgreSQL),如何做到不停机迁移?

答案

一、迁移全流程

二、各阶段详解

阶段 1:评估与规划

评估项内容
数据量总行数、总大小、增长速度
表结构差异字段类型、索引、约束差异
业务依赖哪些服务读写这些表
预计耗时历史数据迁移时间估算
回滚方案每个阶段如何回滚

阶段 2:双写

// 双写层:同时写入新旧库
class DualWriteRepository {
constructor(
private oldDB: Database,
private newDB: Database,
) {}

async create(data: Order): Promise<void> {
// 旧库为主,写入失败则整体失败
await this.oldDB.insert('orders', data);

// 新库为辅,写入失败只告警不阻塞
try {
await this.newDB.insert('orders', data);
} catch (error) {
// 记录差异日志,后续补偿
await this.logDifference('insert', data, error);
}
}

async update(id: string, data: Partial<Order>): Promise<void> {
await this.oldDB.update('orders', id, data);
try {
await this.newDB.update('orders', id, data);
} catch (error) {
await this.logDifference('update', { id, ...data }, error);
}
}
}
双写注意事项
  1. 旧库为主:新库写入失败不影响业务
  2. 差异日志:记录所有新库写入失败的数据
  3. 补偿机制:定时消费差异日志重试写入
  4. 幂等设计:补偿写入需要幂等

阶段 3:历史数据迁移

// 分批迁移历史数据
async function migrateHistory(batchSize = 5000) {
let lastId = 0;

while (true) {
// 按 ID 范围分批读取
const rows = await oldDB.query(
'SELECT * FROM orders WHERE id > ? ORDER BY id LIMIT ?',
[lastId, batchSize]
);

if (rows.length === 0) break;

// 批量写入新库(使用 UPSERT 保证幂等)
await newDB.batchUpsert('orders', rows);

lastId = rows[rows.length - 1].id;

// 打印进度
console.log(`Migrated up to id=${lastId}`);

// 控制速度,避免压垮数据库
await sleep(100);
}
}

阶段 4:数据校验

// 三级校验
async function verifyData() {
// 1. 行数校验
const oldCount = await oldDB.query('SELECT COUNT(*) FROM orders');
const newCount = await newDB.query('SELECT COUNT(*) FROM orders');
assert(oldCount === newCount, `行数不一致: old=${oldCount}, new=${newCount}`);

// 2. 聚合校验
const oldSum = await oldDB.query('SELECT SUM(amount) FROM orders');
const newSum = await newDB.query('SELECT SUM(amount) FROM orders');
assert(oldSum === newSum, `金额不一致`);

// 3. 抽样逐行校验
const sampleIds = await oldDB.query(
'SELECT id FROM orders ORDER BY RAND() LIMIT 1000'
);
for (const { id } of sampleIds) {
const oldRow = await oldDB.query('SELECT * FROM orders WHERE id = ?', [id]);
const newRow = await newDB.query('SELECT * FROM orders WHERE id = ?', [id]);
assert(deepEqual(oldRow, newRow), `数据不一致: id=${id}`);
}
}

阶段 5:灰度切读

// 灰度切读:按比例将读流量切到新库
class ReadRouter {
private newDBRatio: number; // 0 ~ 100

async read(sql: string, params: any[]) {
if (Math.random() * 100 < this.newDBRatio) {
// 读新库
const result = await this.newDB.query(sql, params);
// 可选:对比旧库结果(shadow read)
return result;
}
return this.oldDB.query(sql, params);
}

// 逐步调大比例:1% → 10% → 50% → 100%
setRatio(ratio: number) {
this.newDBRatio = ratio;
}
}

三、异构数据库迁移(MySQL → PostgreSQL)

差异点MySQLPostgreSQL处理
自增AUTO_INCREMENTSERIAL / GENERATEDDDL 转换
JSONJSONJSONB注意语法差异
时间DATETIMETIMESTAMP注意时区
引号反引号 `双引号 "SQL 改写
分页LIMIT x, yLIMIT y OFFSET xSQL 改写

四、回滚方案

阶段回滚方式
双写阶段关闭新库写入即可
历史迁移清空新库重新迁移
灰度切读将读比例调回 0%
全量切读切回旧库读
停旧写不可逆点,需格外确认

常见面试问题

Q1: 双写期间如何处理新旧库数据不一致?

答案

  1. 差异日志:新库写入失败时记录完整操作日志
  2. 定时补偿:消费差异日志重试写入新库
  3. 数据校验:周期性运行校验脚本
  4. 以旧库为准:迁移完成前,旧库是数据真实来源

Q2: 大表(亿级)历史数据迁移需要多久?

答案

估算公式:迁移时间 = 总行数 / 每秒迁移行数

  • 分批 INSERT(5000/批):约 5 万~10 万行/秒
  • 1 亿行 ÷ 5 万/秒 ≈ 2000 秒 ≈ 30 分钟
  • 但需要限速避免影响线上:实际 2~8 小时

加速手段:

  • 多线程并发迁移(按 ID 范围分段)
  • 临时关闭新库索引,迁移完再建
  • 使用专用工具(mysqldump、DTS)

Q3: 如何保证迁移过程中数据零丢失?

答案

  1. 双写覆盖增量:迁移期间的新写入通过双写保证
  2. 历史数据 UPSERT:重复迁移不会覆盖新数据
  3. binlog 兜底:历史迁移完成后,用 CDC 追平差量
  4. 多轮校验:迁移后运行校验脚本,差异数据重新同步

相关链接