数据倾斜排查
场景描述
Spark / Hive 任务运行了 3 小时还没结束,Spark UI 显示 999 个 Task 1 分钟完成,1 个 Task 跑了 3 小时。
数据倾斜的本质
排查步骤
1. 定位倾斜 Key
-- 查看 Key 分布,找出数据量最大的 Key
SELECT join_key, COUNT(*) AS cnt
FROM table_a
GROUP BY join_key
ORDER BY cnt DESC
LIMIT 20;
-- 常见倾斜 Key:NULL、空字符串、默认值(如 0、-1、unknown)
2. 查看 Spark UI
- Stage 详情:某个 Stage 的 Task 执行时间方差极大
- Shuffle Read:某个 Task 的 Shuffle Read Size 远大于其他(如 10GB vs 10MB)
常见解决方案
| 方案 | 适用场景 | 说明 |
|---|---|---|
| 过滤倾斜 Key | NULL / 无效值导致 | 提前过滤,单独处理 |
| Salting Key | JOIN 倾斜 | 给倾斜 Key 加随机后缀打散 |
| Map-side Join | 小表 JOIN 大表 | 广播小表避免 Shuffle |
| 两阶段聚合 | GROUP BY 倾斜 | 先局部聚合再全局聚合 |
| 调整并行度 | Partition 数不合理 | 增加 Task 数 |
Salting Key 示例
-- 大表加随机后缀
SELECT /*+ REPARTITION(100) */
CONCAT(user_id, '_', FLOOR(RAND() * 10)) AS salted_key,
amount
FROM order_fact
WHERE dt = '2024-01-15';
-- 小表膨胀 10 倍
SELECT user_id,
CONCAT(user_id, '_', idx) AS salted_key,
user_name
FROM user_dim
LATERAL VIEW EXPLODE(ARRAY(0,1,2,3,4,5,6,7,8,9)) t AS idx;
-- Salted Key JOIN
SELECT b.user_name, SUM(a.amount) AS total_amount
FROM salted_order a
JOIN expanded_user b ON a.salted_key = b.salted_key
GROUP BY b.user_name;
两阶段聚合
-- 第一阶段:加随机前缀,局部聚合
SELECT CONCAT(FLOOR(RAND() * 10), '_', city) AS prefix_city,
SUM(amount) AS partial_sum
FROM order_fact
GROUP BY CONCAT(FLOOR(RAND() * 10), '_', city);
-- 第二阶段:去掉前缀,全局聚合
SELECT SUBSTR(prefix_city, INSTR(prefix_city, '_') + 1) AS city,
SUM(partial_sum) AS total_amount
FROM partial_result
GROUP BY SUBSTR(prefix_city, INSTR(prefix_city, '_') + 1);
常见面试问题
Q1: 如何预防数据倾斜?
答案:
- 数据清洗:上游处理好 NULL 和异常值
- 分区设计:避免用高倾斜字段做分区键
- AQE(Adaptive Query Execution):Spark 3.0+ 开启自动倾斜优化
- 指标监控:监控 Task 执行时间方差,及时告警
Q2: Spark AQE 如何处理数据倾斜?
答案:
Spark 3.0+ 的 AQE(spark.sql.adaptive.enabled = true)能自动:
- 合并小分区:将过小的 Shuffle 分区合并
- 拆分倾斜分区:将过大的分区自动拆分为多个 Task 并行处理
- 配置
spark.sql.adaptive.skewJoin.enabled = true