跳到主要内容

数据同步与迁移实战

问题

如何实现跨库迁移和异构数据同步?双写方案如何保障数据一致性?

答案

一、数据迁移场景

场景说明难度
同构迁移MySQL → MySQL(分库分表或升级版本)
异构迁移MySQL → PostgreSQL
数据库拆分单库 → 微服务独立库
云迁移自建 → 云数据库
NoSQL 迁移MySQL → MongoDB / ES

二、停机迁移

最简单但影响最大的方案:

# MySQL 全量导出
mysqldump -h source_host -u root -p mydb > backup.sql

# 导入目标库
mysql -h target_host -u root -p mydb < backup.sql

适用场景:数据量小、可接受短暂停服(如凌晨维护窗口)。

三、双写迁移(不停机)

Phase 1:双写

应用同时写入新旧两个数据库

class UserService {
async createUser(data: CreateUserDTO) {
// 先写旧库(主)
const user = await this.oldDb.user.create(data);

// 异步写新库(不影响主流程)
try {
await this.newDb.user.create(data);
} catch (error) {
// 记录失败,后续补偿
await this.syncQueue.add('user:create', { data, userId: user.id });
this.logger.error('双写新库失败', error);
}

return user;
}
}

Phase 2:历史数据迁移 + 数据校验

1. 全量迁移历史数据到新库
2. 对比新旧库数据:
- 总行数是否一致
- 抽样对比记录内容
- 校验关键字段(金额、状态等)
3. 修复不一致数据

Phase 3:切读

class UserService {
// 灰度切换读流量
async getUser(id: number) {
if (this.featureFlag.isEnabled('read-from-new-db', id)) {
return this.newDb.user.findById(id);
}
return this.oldDb.user.findById(id);
}
}

Phase 4:停旧写 + 观察

确认新库读取正常后,停止写入旧库,所有读写都切到新库。

Phase 5:清理旧库引用

双写的风险
  1. 一致性问题:新库写入失败导致数据不一致,需要补偿队列
  2. 代码复杂度:双写逻辑侵入业务代码
  3. 性能影响:每次写入 RT 增加
  4. 事务边界:跨库事务难以保证

四、CDC 迁移(变更数据捕获)

通过监听数据库变更日志实现同步,对应用代码零侵入

Canal(阿里巴巴)

// Canal 消费示例
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", ""
);
connector.connect();
connector.subscribe("mydb\\.users"); // 订阅 users 表

while (true) {
Message message = connector.getWithoutAck(100);
for (Entry entry : message.getEntries()) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == EventType.INSERT) {
// 写入目标库
syncToTarget(rowData.getAfterColumnsList());
}
}
}
connector.ack(message.getId());
}

Debezium

debezium-connector-config.json
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "source-mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "1",
"database.server.name": "myserver",
"database.include.list": "mydb",
"table.include.list": "mydb.users,mydb.posts",
"topic.prefix": "dbserver1"
}
}

Canal vs Debezium

维度CanalDebezium
数据源MySQL 为主MySQL、PG、MongoDB 等
输出自定义协议Kafka Connect
部署独立部署Kafka Connect 集群
生态国内流行国际主流
社区阿里开源Red Hat 维护

五、云数据库迁移服务

各云厂商提供的 DTS(数据传输服务):

云厂商服务特点
阿里云DTS全量 + 增量,支持异构
AWSDMSDatabase Migration Service
腾讯云DTS数据订阅 + 迁移
华为云DRS数据复制服务

六、数据校验

迁移后必须进行数据校验:

-- 1. 行数对比
SELECT COUNT(*) FROM source_db.users;
SELECT COUNT(*) FROM target_db.users;

-- 2. 抽样校验(随机抽取 1000 条对比)
SELECT id, name, email, age FROM source_db.users
WHERE id IN (SELECT id FROM source_db.users ORDER BY RAND() LIMIT 1000);

-- 3. 聚合校验
SELECT SUM(balance), COUNT(*) FROM source_db.accounts;
SELECT SUM(balance), COUNT(*) FROM target_db.accounts;

-- 4. 校验工具
-- pt-table-checksum(Percona)
pt-table-checksum --host=source --databases=mydb

常见面试问题

Q1: 双写方案中,新库写入失败怎么办?

答案

  1. 异步重试:将失败操作放入消息队列,异步重试
  2. 补偿任务:定时扫描旧库与新库差异,补偿缺失数据
  3. 告警监控:失败率超过阈值则告警,人工介入
  4. 降级策略:新库连续失败时暂停双写,避免队列积压

关键原则:旧库写入是主路径,新库写入失败不影响业务。

Q2: CDC 方案如何保证顺序性?

答案

  1. 单表顺序:binlog 本身是有序的,同一表的变更按顺序消费
  2. Kafka 保序:同一主键的变更路由到同一 Partition
  3. 消费者串行:单 Partition 单消费者保证顺序处理
binlog: INSERT user#1 → UPDATE user#1 → DELETE user#1
Kafka: Partition 0 (key=user#1) → 保序消费

Q3: 如何迁移亿级大表?

答案

  1. 全量阶段
    • 使用 mysqldump --single-transaction(InnoDB 一致性快照)
    • 或者按 ID 范围分片并行导出
    • 使用 LOAD DATA INFILE 高速导入
  2. 增量阶段
    • 全量导出前记录 binlog 位点
    • 全量导入完成后,从记录的位点开始 CDC 增量同步
  3. 追平后切换:增量延迟追到秒级后,择机切换

Q4: 异构数据库迁移(MySQL → PostgreSQL)有哪些坑?

答案

差异MySQLPostgreSQL
自增AUTO_INCREMENTSERIAL / IDENTITY
布尔TINYINT(1)BOOLEAN
日期零值0000-00-00不允许
大小写默认不敏感默认敏感
反引号` 包裹标识符" 包裹标识符
LIMITLIMIT N, MLIMIT M OFFSET N
GROUP BY允许非聚合列严格模式不允许

处理方式:使用 pgloader 等工具自动转换,但仍需人工审查不兼容的部分。

相关链接