跳到主要内容

Airflow

问题

什么是 Apache Airflow?它如何管理数据管道?

答案

什么是 Airflow

Apache Airflow 是一个工作流调度和编排平台,用 Python 代码定义任务依赖关系(DAG),自动按顺序执行并监控。

核心概念

概念说明
DAG有向无环图,定义任务间的依赖关系
Task一个具体的执行单元
OperatorTask 的类型(BashOperator、PythonOperator、SparkSubmitOperator)
Schedule执行计划(cron 表达式)
XCom任务间传递小量数据
Connection外部系统连接配置

DAG 示例

数仓每日 ETL DAG
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
}

with DAG(
'daily_etl',
default_args=default_args,
schedule_interval='0 2 * * *', # 每天凌晨 2 点
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:

# ODS 层数据同步
ods_sync = BashOperator(
task_id='ods_sync',
bash_command='spark-submit ods_sync.py --date {{ ds }}',
)

# DWD 层清洗
dwd_clean = BashOperator(
task_id='dwd_clean',
bash_command='spark-submit dwd_clean.py --date {{ ds }}',
)

# DWS 层聚合
dws_agg = BashOperator(
task_id='dws_aggregate',
bash_command='spark-submit dws_agg.py --date {{ ds }}',
)

# 定义依赖
ods_sync >> dwd_clean >> dws_agg

Airflow 架构

最佳实践

实践说明
幂等设计重跑不产生重复数据(用 INSERT OVERWRITE)
分区执行{{ ds }} 模板变量传日期
小任务一个 Task 做一件事,便于重跑
SLA 告警设置任务超时告警
依赖清晰DAG 不要太复杂,必要时拆分

常见面试问题

Q1: Airflow 的调度是怎么工作的?

答案

  • Scheduler 定期扫描 DAG 文件,检查是否到达 schedule_interval
  • 到点后创建 DAG Run,为每个 Task 创建 Task Instance
  • Executor 把 Task 分发给 Worker 执行
  • Task 执行完成后更新状态到 Metadata DB

Q2: Airflow 任务失败了怎么处理?

答案

  1. 自动重试retries=2 配置自动重试
  2. 告警通知email_on_failure 或 Slack 通知
  3. 手动重跑:在 Web UI 中 Clear 失败任务,从该节点重跑
  4. 排查原因:查看 Task 日志,定位是数据问题还是系统问题

相关链接

  • ETL vs ELT - ETL 流程概述
  • dbt - 与 Airflow 配合的转换工具