跳到主要内容

数据倾斜排查

场景描述

Spark / Hive 任务运行了 3 小时还没结束,Spark UI 显示 999 个 Task 1 分钟完成,1 个 Task 跑了 3 小时。

数据倾斜的本质

排查步骤

1. 定位倾斜 Key

-- 查看 Key 分布,找出数据量最大的 Key
SELECT join_key, COUNT(*) AS cnt
FROM table_a
GROUP BY join_key
ORDER BY cnt DESC
LIMIT 20;

-- 常见倾斜 Key:NULL、空字符串、默认值(如 0、-1、unknown)

2. 查看 Spark UI

  • Stage 详情:某个 Stage 的 Task 执行时间方差极大
  • Shuffle Read:某个 Task 的 Shuffle Read Size 远大于其他(如 10GB vs 10MB)

常见解决方案

方案适用场景说明
过滤倾斜 KeyNULL / 无效值导致提前过滤,单独处理
Salting KeyJOIN 倾斜给倾斜 Key 加随机后缀打散
Map-side Join小表 JOIN 大表广播小表避免 Shuffle
两阶段聚合GROUP BY 倾斜先局部聚合再全局聚合
调整并行度Partition 数不合理增加 Task 数

Salting Key 示例

-- 大表加随机后缀
SELECT /*+ REPARTITION(100) */
CONCAT(user_id, '_', FLOOR(RAND() * 10)) AS salted_key,
amount
FROM order_fact
WHERE dt = '2024-01-15';

-- 小表膨胀 10 倍
SELECT user_id,
CONCAT(user_id, '_', idx) AS salted_key,
user_name
FROM user_dim
LATERAL VIEW EXPLODE(ARRAY(0,1,2,3,4,5,6,7,8,9)) t AS idx;

-- Salted Key JOIN
SELECT b.user_name, SUM(a.amount) AS total_amount
FROM salted_order a
JOIN expanded_user b ON a.salted_key = b.salted_key
GROUP BY b.user_name;

两阶段聚合

-- 第一阶段:加随机前缀,局部聚合
SELECT CONCAT(FLOOR(RAND() * 10), '_', city) AS prefix_city,
SUM(amount) AS partial_sum
FROM order_fact
GROUP BY CONCAT(FLOOR(RAND() * 10), '_', city);

-- 第二阶段:去掉前缀,全局聚合
SELECT SUBSTR(prefix_city, INSTR(prefix_city, '_') + 1) AS city,
SUM(partial_sum) AS total_amount
FROM partial_result
GROUP BY SUBSTR(prefix_city, INSTR(prefix_city, '_') + 1);

常见面试问题

Q1: 如何预防数据倾斜?

答案

  1. 数据清洗:上游处理好 NULL 和异常值
  2. 分区设计:避免用高倾斜字段做分区键
  3. AQE(Adaptive Query Execution):Spark 3.0+ 开启自动倾斜优化
  4. 指标监控:监控 Task 执行时间方差,及时告警

Q2: Spark AQE 如何处理数据倾斜?

答案

Spark 3.0+ 的 AQE(spark.sql.adaptive.enabled = true)能自动:

  • 合并小分区:将过小的 Shuffle 分区合并
  • 拆分倾斜分区:将过大的分区自动拆分为多个 Task 并行处理
  • 配置 spark.sql.adaptive.skewJoin.enabled = true

相关链接