数据同步与迁移实战
问题
如何实现跨库迁移和异构数据同步?双写方案如何保障数据一致性?
答案
一、数据迁移场景
| 场景 | 说明 | 难度 |
|---|---|---|
| 同构迁移 | MySQL → MySQL(分库分表或升级版本) | 中 |
| 异构迁移 | MySQL → PostgreSQL | 高 |
| 数据库拆分 | 单库 → 微服务独立库 | 高 |
| 云迁移 | 自建 → 云数据库 | 中 |
| NoSQL 迁移 | MySQL → MongoDB / ES | 高 |
二、停机迁移
最简单但影响最大的方案:
# MySQL 全量导出
mysqldump -h source_host -u root -p mydb > backup.sql
# 导入目标库
mysql -h target_host -u root -p mydb < backup.sql
适用场景:数据量小、可接受短暂停服(如凌晨维护窗口)。
三、双写迁移(不停机)
Phase 1:双写
应用同时写入新旧两个数据库:
class UserService {
async createUser(data: CreateUserDTO) {
// 先写旧库(主)
const user = await this.oldDb.user.create(data);
// 异步写新库(不影响主流程)
try {
await this.newDb.user.create(data);
} catch (error) {
// 记录失败,后续补偿
await this.syncQueue.add('user:create', { data, userId: user.id });
this.logger.error('双写新库失败', error);
}
return user;
}
}
Phase 2:历史数据迁移 + 数据校验
1. 全量迁移历史数据到新库
2. 对比新旧库数据:
- 总行数是否一致
- 抽样对比记录内容
- 校验关键字段(金额、状态等)
3. 修复不一致数据
Phase 3:切读
class UserService {
// 灰度切换读流量
async getUser(id: number) {
if (this.featureFlag.isEnabled('read-from-new-db', id)) {
return this.newDb.user.findById(id);
}
return this.oldDb.user.findById(id);
}
}
Phase 4:停旧写 + 观察
确认新库读取正常后,停止写入旧库,所有读写都切到新库。
Phase 5:清理旧库引用
双写的风险
- 一致性问题:新库写入失败导致数据不一致,需要补偿队列
- 代码复杂度:双写逻辑侵入业务代码
- 性能影响:每次写入 RT 增加
- 事务边界:跨库事务难以保证
四、CDC 迁移(变更数据捕获)
通过监听数据库变更日志实现同步,对应用代码零侵入。
Canal(阿里巴巴)
// Canal 消费示例
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", ""
);
connector.connect();
connector.subscribe("mydb\\.users"); // 订阅 users 表
while (true) {
Message message = connector.getWithoutAck(100);
for (Entry entry : message.getEntries()) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == EventType.INSERT) {
// 写入目标库
syncToTarget(rowData.getAfterColumnsList());
}
}
}
connector.ack(message.getId());
}
Debezium
debezium-connector-config.json
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "source-mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "1",
"database.server.name": "myserver",
"database.include.list": "mydb",
"table.include.list": "mydb.users,mydb.posts",
"topic.prefix": "dbserver1"
}
}
Canal vs Debezium
| 维度 | Canal | Debezium |
|---|---|---|
| 数据源 | MySQL 为主 | MySQL、PG、MongoDB 等 |
| 输出 | 自定义协议 | Kafka Connect |
| 部署 | 独立部署 | Kafka Connect 集群 |
| 生态 | 国内流行 | 国际主流 |
| 社区 | 阿里开源 | Red Hat 维护 |
五、云数据库迁移服务
各云厂商提供的 DTS(数据传输服务):
| 云厂商 | 服务 | 特点 |
|---|---|---|
| 阿里云 | DTS | 全量 + 增量,支持异构 |
| AWS | DMS | Database Migration Service |
| 腾讯云 | DTS | 数据订阅 + 迁移 |
| 华为云 | DRS | 数据复制服务 |
六、数据校验
迁移后必须进行数据校验:
-- 1. 行数对比
SELECT COUNT(*) FROM source_db.users;
SELECT COUNT(*) FROM target_db.users;
-- 2. 抽样校验(随机抽取 1000 条对比)
SELECT id, name, email, age FROM source_db.users
WHERE id IN (SELECT id FROM source_db.users ORDER BY RAND() LIMIT 1000);
-- 3. 聚合校验
SELECT SUM(balance), COUNT(*) FROM source_db.accounts;
SELECT SUM(balance), COUNT(*) FROM target_db.accounts;
-- 4. 校验工具
-- pt-table-checksum(Percona)
pt-table-checksum --host=source --databases=mydb
常见面试问题
Q1: 双写方案中,新库写入失败怎么办?
答案:
- 异步重试:将失败操作放入消息队列,异步重试
- 补偿任务:定时扫描旧库与新库差异,补偿缺失数据
- 告警监控:失败率超过阈值则告警,人工介入
- 降级策略:新库连续失败时暂停双写,避免队列积压
关键原则:旧库写入是主路径,新库写入失败不影响业务。
Q2: CDC 方案如何保证顺序性?
答案:
- 单表顺序:binlog 本身是有序的,同一表的变更按顺序消费
- Kafka 保序:同一主键的变更路由到同一 Partition
- 消费者串行:单 Partition 单消费者保证顺序处理
binlog: INSERT user#1 → UPDATE user#1 → DELETE user#1
Kafka: Partition 0 (key=user#1) → 保序消费
Q3: 如何迁移亿级大表?
答案:
- 全量阶段:
- 使用
mysqldump --single-transaction(InnoDB 一致性快照) - 或者按 ID 范围分片并行导出
- 使用
LOAD DATA INFILE高速导入
- 使用
- 增量阶段:
- 全量导出前记录 binlog 位点
- 全量导入完成后,从记录的位点开始 CDC 增量同步
- 追平后切换:增量延迟追到秒级后,择机切换
Q4: 异构数据库迁移(MySQL → PostgreSQL)有哪些坑?
答案:
| 差异 | MySQL | PostgreSQL |
|---|---|---|
| 自增 | AUTO_INCREMENT | SERIAL / IDENTITY |
| 布尔 | TINYINT(1) | BOOLEAN |
| 日期零值 | 0000-00-00 | 不允许 |
| 大小写 | 默认不敏感 | 默认敏感 |
| 反引号 | ` 包裹标识符 | " 包裹标识符 |
| LIMIT | LIMIT N, M | LIMIT M OFFSET N |
| GROUP BY | 允许非聚合列 | 严格模式不允许 |
处理方式:使用 pgloader 等工具自动转换,但仍需人工审查不兼容的部分。