Airflow
问题
什么是 Apache Airflow?它如何管理数据管道?
答案
什么是 Airflow
Apache Airflow 是一个工作流调度和编排平台,用 Python 代码定义任务依赖关系(DAG),自动按顺序执行并监控。
核心概念
| 概念 | 说明 |
|---|---|
| DAG | 有向无环图,定义任务间的依赖关系 |
| Task | 一个具体的执行单元 |
| Operator | Task 的类型(BashOperator、PythonOperator、SparkSubmitOperator) |
| Schedule | 执行计划(cron 表达式) |
| XCom | 任务间传递小量数据 |
| Connection | 外部系统连接配置 |
DAG 示例
数仓每日 ETL DAG
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
}
with DAG(
'daily_etl',
default_args=default_args,
schedule_interval='0 2 * * *', # 每天凌晨 2 点
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# ODS 层数据同步
ods_sync = BashOperator(
task_id='ods_sync',
bash_command='spark-submit ods_sync.py --date {{ ds }}',
)
# DWD 层清洗
dwd_clean = BashOperator(
task_id='dwd_clean',
bash_command='spark-submit dwd_clean.py --date {{ ds }}',
)
# DWS 层聚合
dws_agg = BashOperator(
task_id='dws_aggregate',
bash_command='spark-submit dws_agg.py --date {{ ds }}',
)
# 定义依赖
ods_sync >> dwd_clean >> dws_agg
Airflow 架构
最佳实践
| 实践 | 说明 |
|---|---|
| 幂等设计 | 重跑不产生重复数据(用 INSERT OVERWRITE) |
| 分区执行 | 用 {{ ds }} 模板变量传日期 |
| 小任务 | 一个 Task 做一件事,便于重跑 |
| SLA 告警 | 设置任务超时告警 |
| 依赖清晰 | DAG 不要太复杂,必要时拆分 |
常见面试问题
Q1: Airflow 的调度是怎么工作的?
答案:
- Scheduler 定期扫描 DAG 文件,检查是否到达 schedule_interval
- 到点后创建 DAG Run,为每个 Task 创建 Task Instance
- Executor 把 Task 分发给 Worker 执行
- Task 执行完成后更新状态到 Metadata DB
Q2: Airflow 任务失败了怎么处理?
答案:
- 自动重试:
retries=2配置自动重试 - 告警通知:
email_on_failure或 Slack 通知 - 手动重跑:在 Web UI 中 Clear 失败任务,从该节点重跑
- 排查原因:查看 Task 日志,定位是数据问题还是系统问题
相关链接
- ETL vs ELT - ETL 流程概述
- dbt - 与 Airflow 配合的转换工具