Apache Airflow Architecture
The 11-Day Silent Failure
A cron job ran a Python script every night at 2am. The script pulled data from an API, transformed it, and wrote results to a PostgreSQL table. The downstream dashboard read from that table and served numbers to the business team every morning.
The script had no retry logic. If the API returned a 500, the script exited with a non-zero code, cron logged the failure to a file nobody read, and that was the end of it. No alert was sent. No human was woken up. If the next night's run succeeded, the gap from the failed night simply stayed missing.
For 11 consecutive nights, the script failed silently. A dependency library had been upgraded on the server, breaking an import. The dashboard continued to render - it just showed stale data. The business team saw numbers that were 11 days old but had no reason to suspect anything was wrong because the UI looked fine.
When someone finally noticed that the numbers hadn't moved in almost two weeks and raised a ticket, the investigation was painful. There was no execution history. The cron log file had been rotated. The only evidence was the gap in the database table. Rebuilding the 11 days of missing data required manually running the script for each date, hoping the source API still had the historical records.
The team replaced 40 cron jobs with a single Apache Airflow deployment. Every pipeline had a visible execution history, retry logic, alerting on failure, and clear dependency graphs. When something broke, they knew within minutes - not 11 days.
Why Orchestration Exists
The problems with cron and ad-hoc scripts are predictable and universal:
No dependency management. Cron has no concept of "Task B should only run after Task A succeeds." You cobble together sleep commands or polling loops. They break in non-obvious ways.
No retry logic. A transient API failure or a momentary network blip permanently breaks the pipeline until a human intervenes. Most failures at 3am go unnoticed until morning.
No execution history. Cron logs to syslog or a file. There is no structured record of which runs succeeded, which failed, how long each step took, or what the error was.
No parallelism control. If you have 20 tables to process, you either run them sequentially (slow) or fire 20 concurrent scripts with no coordination (overloads the database).
No backfill. If you need to re-process the last 30 days of data, you write a loop script by hand, hope it is idempotent, and run it while praying nothing else is using the cluster.
Orchestration frameworks solve all of these problems. They give pipelines the properties that make them production-grade: observability (you can see what ran and what failed), reliability (automatic retries with exponential backoff), recoverability (resume from the task that failed, not from the beginning), and schedulability (cron-style, interval-based, event-triggered, or data-driven).
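To make the reliability property concrete, here is a minimal sketch of retrying with exponential backoff in plain Python. It is not Airflow's implementation: `run_with_retries`, `flaky_fetch`, and the delay values are hypothetical names and numbers chosen for this illustration.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], retries: int = 3, base_delay: float = 0.01) -> T:
    """Call fn, retrying up to `retries` times with a delay that doubles each attempt."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # retries exhausted: surface the failure instead of hiding it
            time.sleep(base_delay * (2 ** attempt))


calls = {"count": 0}


def flaky_fetch() -> str:
    """Hypothetical API call that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = run_with_retries(flaky_fetch)
```

The cron script from the opening story had none of this, so a single transient failure cost a full night of data. An orchestrator applies the same idea to every task through configuration instead of hand-written loops.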
Historical Context
Airflow was created by Maxime Beauchemin at Airbnb in 2014. The problem he was solving was exactly the one described above: Airbnb had dozens of cron jobs with implicit dependencies, no monitoring, and no easy way to run historical backfills. His insight was to represent pipelines as Python code rather than static configuration files (Oozie, for example, defines workflows in XML), making them version-controllable, testable, and easy to generate dynamically.
Airbnb open-sourced Airflow in 2015. It entered the Apache Incubator in 2016 and became a top-level Apache project in 2019. By 2020 it had become the de-facto standard for data pipeline orchestration, used at thousands of companies from startups to hyperscalers.
Airflow 2.0 (December 2020) was a major architectural overhaul: the Scheduler was redesigned so that multiple Scheduler instances can run side by side for high availability, the TaskFlow API was introduced (decorators instead of explicit Operator instantiation), and scheduling performance improved substantially. The examples in this article target Airflow 2.x.
The DAG - Directed Acyclic Graph
The core unit in Airflow is the DAG: a Python object that describes a set of tasks and their dependency relationships. "Directed" means dependencies flow in one direction - upstream to downstream. "Acyclic" means there are no cycles - Task B cannot depend on Task C which depends on Task B.
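The "acyclic" requirement can be illustrated without Airflow at all. The sketch below uses Python's standard-library graphlib (Python 3.9+) to derive a run order from a dependency map and to show that a cycle leaves no valid order. The task names are illustrative, and this is not how Airflow itself resolves dependencies.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
pipeline = {"extract": set(), "transform": {"extract"}, "load": {"transform"}}
order = list(TopologicalSorter(pipeline).static_order())

# Making "extract" depend on "load" closes a loop, so no valid run order exists.
cyclic = {**pipeline, "extract": {"load"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
```

Here `order` lists upstream tasks before their downstream tasks, and `has_cycle` ends up true for the second graph. A scheduler needs the first property to know what to run next, which is why cyclic definitions are rejected.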
A minimal DAG looks like this:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "data-engineering",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
}

with DAG(
    dag_id="daily_user_metrics",
    description="Compute daily user engagement metrics",
    schedule="0 3 * * *",  # 3am UTC daily
    start_date=datetime(2024, 1, 1),
    catchup=False,  # don't backfill historical runs on deploy
    default_args=default_args,
    tags=["metrics", "users"],
) as dag:

    def extract():
        # fetch from source
        pass

    def transform():
        pass

    def load():
        pass

    t_extract = PythonOperator(task_id="extract", python_callable=extract)
    t_transform = PythonOperator(task_id="transform", python_callable=transform)
    t_load = PythonOperator(task_id="load", python_callable=load)

    # declare dependencies - >> means "upstream of"
    t_extract >> t_transform >> t_load
The schedule parameter accepts cron expressions, timedelta objects, or - in Airflow 2.4+ - Dataset objects for data-aware scheduling. The start_date defines when the DAG's first logical run should begin. The catchup=False setting prevents Airflow from scheduling all the historical runs between start_date and now when you first deploy - almost always what you want.
TaskFlow API
Airflow 2.x introduced the @task decorator (TaskFlow API), which reduces boilerplate significantly and makes data passing between tasks feel natural:
from airflow.decorators import dag, task
from datetime import datetime, timedelta  # timedelta is needed for retry_delay below

@dag(
    dag_id="user_metrics_taskflow",
    schedule="0 3 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["metrics"],
)
def user_metrics_pipeline():

    @task(retries=3, retry_delay=timedelta(minutes=5))
    def extract_users() -> dict:
        """Pull user events from the source API."""
        import requests
        resp = requests.get("https://api.internal/events/daily")
        resp.raise_for_status()
        return {"record_count": len(resp.json()), "s3_path": "s3://data/events/raw/today.parquet"}

    @task
    def validate_data(metadata: dict) -> dict:
        if metadata["record_count"] < 1000:
            raise ValueError(f"Too few records: {metadata['record_count']}. Expected 1000+.")
        return metadata

    @task
    def compute_metrics(metadata: dict) -> None:
        print(f"Computing metrics for {metadata['record_count']} records from {metadata['s3_path']}")

    # wire the pipeline
    raw = extract_users()
    validated = validate_data(raw)
    compute_metrics(validated)

user_metrics_pipeline()
The @task decorator automatically wraps each function in a PythonOperator-style task and serializes return values as XCom entries. The dependency graph is inferred from how each task's return value is passed into downstream task calls - a significant ergonomic improvement over the >> operator pattern.
Airflow Architecture
Airflow has five main components. Understanding what each does and how they communicate is essential for production operations.
The Scheduler
The Scheduler is the brain of Airflow. It continuously:
- Parses DAG files from the dags/ directory (every 30 seconds by default)
- Checks which DAG runs should be triggered based on their schedule and start_date
- Creates DagRun records in the metadata database
- Checks which tasks within those runs have their upstream dependencies satisfied
- Queues those task instances for execution
In Airflow 2.x, the Scheduler can run in high-availability mode: multiple Scheduler instances are active at the same time and coordinate through row-level locks in the metadata database, so the failure of one Scheduler does not stop pipeline execution.
:::warning DAG file parsing cost
The Scheduler parses every Python file in the dags/ folder on a loop. Placing heavy imports, database connections, or API calls at module level (outside functions) will slow down the parse loop and can cause the Scheduler to fall behind. Keep DAG files lean - put all real work inside task functions.
:::
The Metadata Database
Nearly all of Airflow's state lives in the metadata database. DAG definitions are read from Python files, but DAG runs, task instances, XComs, connections, variables, pools, and SLA misses are all stored here. Task logs are the main exception: they are written to local disk or to remote storage such as S3. The metadata database is a PostgreSQL or MySQL instance (SQLite for development only).
This makes Airflow's state persistent and inspectable - and it also makes the metadata database a single point of failure unless you run it with replication.
Workers and the Executor
Workers are processes that actually execute task code. The Executor is the Airflow abstraction that decides how tasks are dispatched to workers:
| Executor | How It Works | Best For |
|---|---|---|
| SequentialExecutor | One task at a time, in-process | Local development only |
| LocalExecutor | Subprocess per task, single machine | Small teams, single server |
| CeleryExecutor | Tasks queued to Redis/RabbitMQ, consumed by worker processes | Medium-to-large deployments |
| KubernetesExecutor | One Kubernetes Pod per task | ML workloads, isolation requirements |
| CeleryKubernetesExecutor | Celery for lightweight tasks, K8s for heavy tasks | Hybrid workloads |
KubernetesExecutor is particularly valuable for ML pipelines because each task runs in its own container. Training tasks can request GPU resources. Library dependencies are isolated per task image. A memory leak in one task cannot crash other tasks. The tradeoff is cold-start latency - pod scheduling adds 20-60 seconds per task.
Operators - The Building Blocks
An Operator defines what a task does. Airflow ships with a large library of operators and the provider packages add more.
from airflow.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
# PythonOperator - run a Python function
t1 = PythonOperator(
task_id="compute_features",
python_callable=compute_features_fn,
op_kwargs={"date": "{{ ds }}"}, # Jinja templating with Airflow macros
)
# BashOperator - run a shell command
t2 = BashOperator(
task_id="run_dbt",
bash_command="dbt run --select tag:daily --target prod",
)
# SparkSubmitOperator - submit a Spark job
t3 = SparkSubmitOperator(
task_id="spark_feature_engineering",
application="s3://jobs/feature_engineering.py",
application_args=["--date", "{{ ds }}"],
conf={"spark.executor.memory": "8g", "spark.executor.cores": "4"},
)
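# KubernetesPodOperator - run a container
# Recent versions of the Kubernetes provider take resources as a V1ResourceRequirements
# object (container_resources) instead of the older resources dict.
from kubernetes.client import models as k8s
t4 = KubernetesPodOperator(
    task_id="train_model",
    name="model-training",
    namespace="ml-pipelines",
    image="company.registry/ml-trainer:v2.1",
    env_vars={"TRAIN_DATE": "{{ ds }}", "LEARNING_RATE": "0.001"},
    container_resources=k8s.V1ResourceRequirements(
        requests={"memory": "16Gi", "cpu": "4"},
        limits={"nvidia.com/gpu": "1"},
    ),
    is_delete_operator_pod=True,
)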
# BranchPythonOperator - conditional branching
def choose_branch(**context):
    auc = context["ti"].xcom_pull(task_ids="evaluate_model", key="auc")
    return "deploy_model" if auc > 0.85 else "skip_deployment"
t5 = BranchPythonOperator(
task_id="deployment_decision",
python_callable=choose_branch,
)
When to Prefer KubernetesPodOperator
Use KubernetesPodOperator when:
- The task has heavy dependencies that differ from other tasks (e.g., PyTorch vs. Pandas)
- The task needs GPU resources
- Task isolation is important (memory leaks, native library crashes)
- You want per-task Docker image versioning
Use provider Operators (BigQueryInsertJobOperator, SparkSubmitOperator) when:
- The task is a thin API call or SQL submission with no heavy local computation
- You want to avoid container cold-start overhead
- The dependency footprint is small
XCom - Passing Data Between Tasks
XCom (Cross-Communication) is Airflow's mechanism for passing small pieces of data between tasks. A task can push a value to XCom using ti.xcom_push() and another task can pull it using ti.xcom_pull().
def extract(**context):
    ti = context["ti"]
    result = {"record_count": 50000, "s3_path": "s3://data/raw/2024-01-15.parquet"}
    ti.xcom_push(key="extract_metadata", value=result)

def transform(**context):
    ti = context["ti"]
    metadata = ti.xcom_pull(task_ids="extract", key="extract_metadata")
    print(f"Transforming {metadata['record_count']} records from {metadata['s3_path']}")
With the TaskFlow API, XCom push/pull happens automatically through return values and function arguments - the code in the "TaskFlow API" section above uses this pattern.
:::danger The XCom-as-DataFrame anti-pattern
XCom values are serialized and stored in the metadata database. The metadata database is a state store, not a data store. Pushing a 500MB DataFrame through XCom will bloat the database, slow down every query against it, and eventually degrade the performance of the whole Airflow deployment.
The correct pattern: tasks that produce large data should write it to S3, GCS, or a database, and then push only the path or identifier through XCom. The downstream task reads the data from that path.
@task
def extract() -> str:
    df = fetch_large_dataset()  # placeholder for your own extraction logic
    path = "s3://data/intermediate/extract_output.parquet"
    df.to_parquet(path)
    return path  # push the path, not the DataFrame

@task
def transform(input_path: str) -> str:
    import pandas as pd
    df = pd.read_parquet(input_path)  # read from S3, not XCom
    # ... transform ...
    output_path = "s3://data/intermediate/transform_output.parquet"
    df.to_parquet(output_path)
    return output_path
:::
Connections and Variables
Airflow Connections store credentials for external systems (databases, cloud providers, APIs). They are stored encrypted in the metadata database and accessed by operators via a conn_id.
from airflow.hooks.base import BaseHook
import psycopg2
def connect_to_warehouse(**context):
    conn = BaseHook.get_connection("warehouse_postgres")
    pg_conn = psycopg2.connect(
        host=conn.host,
        port=conn.port,
        dbname=conn.schema,
        user=conn.login,
        password=conn.password,
    )
    return pg_conn
Airflow Variables store arbitrary key-value configuration. In production, prefer environment variables or a secrets backend (Hashicorp Vault, AWS Secrets Manager) over Airflow Variables for sensitive values, because Variables are retrieved from the metadata database on every access - high-frequency reads can create load.
from airflow.models import Variable
# Reading a variable - each call hits the metadata DB
model_threshold = Variable.get("champion_model_threshold", default_var=0.85, deserialize_json=True)
:::tip Use a secrets backend for secrets
Set AIRFLOW__SECRETS__BACKEND=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend to have Airflow resolve connections and variables from AWS Secrets Manager before falling back to the metadata database. This keeps credentials (even encrypted ones) out of your database and centralizes secret rotation.
:::
A Complete ML Pipeline DAG
Here is a realistic production DAG that orchestrates a full ML training workflow with data validation, branching deployment logic, and SLA monitoring:
from airflow.decorators import dag, task
from airflow.operators.python import BranchPythonOperator, ShortCircuitOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
import json
K8S_NAMESPACE = "ml-pipelines"
REGISTRY = "registry.company.com"
TRAIN_DATE = "{{ ds }}"
default_args = {
"owner": "ml-platform",
"retries": 2,
"retry_delay": timedelta(minutes=10),
"email_on_failure": True,
"sla": timedelta(hours=6),
}
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Notify PagerDuty when the pipeline misses its SLA."""
    import requests
    from airflow.models import Variable

    requests.post(
        "https://events.pagerduty.com/v2/enqueue",
        json={
            # Jinja templates are not rendered inside callbacks, so read the Variable directly
            "routing_key": Variable.get("pagerduty_key"),
            "event_action": "trigger",
            "payload": {
                "summary": f"SLA MISS: {dag.dag_id} - tasks: {[t.task_id for t in blocking_tis]}",
                "source": "airflow",
                "severity": "warning",
            },
        },
        timeout=10,
    )
@dag(
dag_id="daily_churn_model_training",
description="Daily churn model retraining with champion/challenger evaluation",
schedule="0 2 * * *",
start_date=datetime(2024, 1, 1),
catchup=False,
default_args=default_args,
sla_miss_callback=sla_miss_callback,
tags=["ml", "churn", "production"],
)
def churn_model_pipeline():
    @task(retries=3)
    def validate_training_data(ds=None) -> dict:
        """Check that training data meets quality thresholds."""
        import pandas as pd

        # Jinja is not rendered inside a Python function body, so "{{ ds }}" would stay
        # a literal string here. Airflow injects the logical date as the `ds` argument.
        features_prefix = f"s3://data/features/churn/{ds}/"
        df = pd.read_parquet(f"{features_prefix}features.parquet")
        row_count = len(df)
        null_rate = df.isnull().mean().max()
        if row_count < 10_000:
            raise ValueError(f"Insufficient training data: {row_count} rows (need 10,000+)")
        if null_rate > 0.05:
            raise ValueError(f"High null rate in features: {null_rate:.1%} (threshold: 5%)")
        return {"row_count": row_count, "null_rate": float(null_rate), "s3_path": features_prefix}
    # KubernetesPodOperator for training - GPU isolated container
    from kubernetes.client import models as k8s  # Kubernetes resource model classes

    train_model = KubernetesPodOperator(
        task_id="train_churn_model",
        name="churn-model-training",
        namespace=K8S_NAMESPACE,
        image=f"{REGISTRY}/churn-trainer:v3.2",
        env_vars={
            "TRAIN_DATE": TRAIN_DATE,  # env_vars is a templated field, so {{ ds }} is rendered here
            "MODEL_BUCKET": "s3://models/churn",
            "LEARNING_RATE": "0.001",
            "MAX_EPOCHS": "50",
        },
        container_resources=k8s.V1ResourceRequirements(
            requests={"memory": "32Gi", "cpu": "8"},
            limits={"nvidia.com/gpu": "1"},
        ),
        # With do_xcom_push=True, the container must write its result as JSON
        # to /airflow/xcom/return.json; a sidecar container reads that file.
        do_xcom_push=True,
        is_delete_operator_pod=True,
    )
    @task
    def evaluate_model(train_metadata: dict) -> dict:
        """Load new model and champion, compare AUC. Return evaluation results."""
        import mlflow

        client = mlflow.tracking.MlflowClient()
        new_run_id = train_metadata.get("mlflow_run_id")
        new_auc = client.get_metric_history(new_run_id, "val_auc")[-1].value
        # fetch champion model metrics; "Production" is a registry stage, not a version number
        champions = client.get_latest_versions("churn_model", stages=["Production"])
        champion_auc = float(champions[0].tags.get("val_auc", 0.0)) if champions else 0.0
        improvement = new_auc - champion_auc
        return {
            "new_auc": new_auc,
            "champion_auc": champion_auc,
            "improvement": improvement,
            "should_deploy": improvement >= 0.005,  # require 0.5pp improvement
            "new_run_id": new_run_id,
        }
    @task.branch(task_id="deployment_decision")
    def deployment_branch(evaluation: dict) -> str:
        """Choose deployment or skip branch based on evaluation results."""
        if evaluation["should_deploy"]:
            return "deploy_model"
        return "notify_no_deployment"

    @task(task_id="deploy_model")
    def deploy_model(evaluation: dict) -> None:
        import mlflow

        client = mlflow.tracking.MlflowClient()
        # transition_model_version_stage expects a registry version, not a run ID.
        # This assumes the training job registered the model from the new run.
        versions = client.search_model_versions(f"run_id = '{evaluation['new_run_id']}'")
        client.transition_model_version_stage(
            name="churn_model",
            version=versions[0].version,
            stage="Production",
        )
        print(f"Deployed model with AUC {evaluation['new_auc']:.4f} (improvement: +{evaluation['improvement']:.4f})")

    @task(task_id="notify_no_deployment")
    def notify_no_deployment(evaluation: dict) -> None:
        print(f"Skipped deployment. New AUC {evaluation['new_auc']:.4f} vs champion {evaluation['champion_auc']:.4f} - improvement {evaluation['improvement']:.4f} below threshold 0.005")

    # wire the DAG
    validated = validate_training_data()
    validated >> train_model
    evaluation = evaluate_model(train_model.output)
    # the branch task skips whichever of the two downstream tasks it does not return
    deployment_branch(evaluation) >> [deploy_model(evaluation), notify_no_deployment(evaluation)]
churn_model_pipeline()
Backfill and Idempotency
Backfill is the process of running a DAG for historical dates - useful when you are deploying a new pipeline, fixing a bug in existing logic, or reprocessing after a data issue upstream.
# Re-run the DAG for every day between Jan 1 and Jan 31
airflow dags backfill \
--start-date 2024-01-01 \
--end-date 2024-01-31 \
daily_churn_model_training
:::danger Idempotency is non-negotiable for backfill
For backfill to work correctly, tasks must be idempotent: running a task for the same logical date a second time must produce the same result and not create duplicate data. If your load task does INSERT INTO metrics SELECT ... without a deduplication step, a backfill will double-count every record.
Use INSERT ... ON CONFLICT DO UPDATE (upsert), MERGE in BigQuery/Snowflake, or write to a partitioned path that overwrites the partition (e.g., s3://data/metrics/date=2024-01-15/ - writing to the same path replaces the data).
@task
def load_metrics(date: str) -> None:
    # GOOD: upsert - running twice is safe
    conn.execute("""
        INSERT INTO daily_metrics (date, user_count, revenue)
        VALUES (%(date)s, %(user_count)s, %(revenue)s)
        ON CONFLICT (date) DO UPDATE SET
            user_count = EXCLUDED.user_count,
            revenue = EXCLUDED.revenue
    """, {"date": date, "user_count": user_count, "revenue": revenue})
:::
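The effect of an upsert on re-runs can be checked locally. The sketch below uses the standard-library sqlite3 module with an in-memory database as a stand-in for the warehouse. The table layout mirrors the example above, while the function signature and the values are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE daily_metrics (date TEXT PRIMARY KEY, user_count INTEGER, revenue REAL)"
)


def load_metrics(date: str, user_count: int, revenue: float) -> None:
    """Upsert one row per logical date, so re-running a date overwrites instead of duplicating."""
    conn.execute(
        """
        INSERT INTO daily_metrics (date, user_count, revenue)
        VALUES (?, ?, ?)
        ON CONFLICT (date) DO UPDATE SET
            user_count = excluded.user_count,
            revenue = excluded.revenue
        """,
        (date, user_count, revenue),
    )


load_metrics("2024-01-15", 100, 2500.0)  # original run
load_metrics("2024-01-15", 120, 2600.0)  # backfill re-run with corrected numbers
rows = conn.execute("SELECT date, user_count, revenue FROM daily_metrics").fetchall()
```

After both calls the table still holds a single row for 2024-01-15, carrying the values from the second run. A plain INSERT would have failed on the primary key here, or double-counted if the table had no key.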
Airflow 2.x: Dynamic Task Mapping
Dynamic task mapping (Airflow 2.3+) allows you to create task instances at runtime - for example, to process multiple files discovered in S3 without hardcoding the list:
@task
def list_files() -> list[str]:
    import boto3
    s3 = boto3.client("s3")
    response = s3.list_objects_v2(Bucket="data", Prefix="raw/2024-01-15/")
    return [obj["Key"] for obj in response.get("Contents", [])]

@task
def process_file(file_key: str) -> dict:
    # each file gets its own task instance
    print(f"Processing {file_key}")
    return {"file": file_key, "status": "done"}

files = list_files()
results = process_file.expand(file_key=files)  # one task per file, all in parallel
This replaces the older pattern of dynamically generating Operators inside a DAG file - which was fragile because the list had to be available at DAG parse time.
Data-Aware Scheduling (Datasets)
Airflow 2.4+ introduced Datasets - a way to trigger DAGs based on data availability rather than a fixed schedule. A DAG that produces data declares its output as a Dataset. A downstream DAG declares a dependency on that Dataset and Airflow triggers it automatically when the upstream completes.
from airflow import Dataset

# Upstream DAG - produces a dataset
FEATURES_DATASET = Dataset("s3://data/features/daily/")

@dag(schedule="0 1 * * *", ...)
def feature_pipeline():
    @task(outlets=[FEATURES_DATASET])  # declare this task produces the dataset
    def compute_features():
        # write to s3://data/features/daily/
        pass
    compute_features()

# Downstream DAG - triggered by dataset, not by clock
@dag(schedule=[FEATURES_DATASET], ...)  # triggered when FEATURES_DATASET is updated
def training_pipeline():
    @task
    def train():
        # guaranteed features are fresh
        pass
    train()
This eliminates a whole category of race conditions where a downstream pipeline starts before the upstream has finished - a common failure mode with clock-based scheduling.
Production Engineering Notes
Pools: Use Airflow pools to limit concurrency against specific resources. If your database can only handle 10 concurrent connections, create a pool with 10 slots and assign database tasks to that pool. Without pools, a large backfill can spawn hundreds of tasks that all try to connect to the database simultaneously.
t = PythonOperator(
task_id="query_warehouse",
pool="warehouse_connections", # limit to pool size
pool_slots=1,
)
Task Groups: For DAGs with many tasks, use TaskGroup to logically group related tasks in the UI without affecting execution semantics.
Priority Weights: Assign higher priority weights to tasks on the critical path. Airflow's Scheduler uses these weights to decide which tasks to schedule first when the worker pool is full.
DAG versioning: When you change a DAG that has active runs, Airflow may show inconsistencies between the old and new task structures. Use DAG versioning (Airflow 3.0+) or pause the DAG, let active runs finish, and then update.
:::tip Run the Scheduler in HA mode in production
In Airflow 2.x, you can run multiple Scheduler instances against the same metadata database. The instances are all active at once and coordinate through row-level locks in that database. If one Scheduler crashes, the others keep scheduling. This is essential for production deployments - a single Scheduler is a single point of failure.
# Both instances watch the same database - Airflow handles coordination.
# In production, run each instance on a separate host or pod.
airflow scheduler &
airflow scheduler &
:::
Interview Q&A
Q: What is the difference between a DAG run and a task instance in Airflow?
A DAG run is a single execution of the entire DAG for a specific logical date (logical_date, called execution_date before Airflow 2.2, which matches data_interval_start for scheduled runs). Within a DAG run, each task in the DAG has exactly one task instance - representing that specific task's execution within that run. If a task fails and is retried, the retries are recorded as attempts on the same task instance. The task instance state machine goes through: scheduled → queued → running → (success | failed | up_for_retry). A task instance can also end as upstream_failed or skipped without ever running, when an upstream task fails or a branch skips it. You can view all task instances across all DAG runs from the Airflow UI's Grid view.
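One way to reason about task instance states is as a small transition table. The sketch below is a deliberate simplification for illustration: real Airflow has more states and transitions, and `TRANSITIONS` and `is_valid_path` are hypothetical names, not Airflow APIs.

```python
# Simplified transition table; real Airflow defines additional states.
TRANSITIONS = {
    "scheduled": {"queued"},
    "queued": {"running"},
    "running": {"success", "failed", "up_for_retry"},
    "up_for_retry": {"scheduled"},
}


def is_valid_path(states: list[str]) -> bool:
    """Return True if every consecutive pair of states is an allowed transition."""
    return all(nxt in TRANSITIONS.get(cur, set()) for cur, nxt in zip(states, states[1:]))


# One task instance that failed once, was retried, and then succeeded.
retried_run = [
    "scheduled", "queued", "running", "up_for_retry",
    "scheduled", "queued", "running", "success",
]
retried_ok = is_valid_path(retried_run)
skipped_queue_ok = is_valid_path(["scheduled", "running"])
```

The retried path is valid, while jumping from scheduled straight to running is not, because a task must be queued to an executor before a worker can pick it up.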
Q: What happens if the Airflow Scheduler goes down?
In Airflow 1.x, the Scheduler was a single process and its failure meant no new tasks would be scheduled until it restarted - running tasks continued (workers were not affected), but no new task instances would be queued. In Airflow 2.x, the Scheduler can run in high-availability mode with multiple active instances. If one instance fails, the remaining instances keep scheduling, so there is no failover step to wait for. In production, you should always run at least two Scheduler instances and alert when the scheduler_heartbeat metric goes stale.
Q: When would you choose KubernetesExecutor over CeleryExecutor?
CeleryExecutor has persistent worker processes that are always running - good for high task frequency where you cannot afford Kubernetes pod startup latency (20-60 seconds). KubernetesExecutor starts a fresh pod for each task - ideal for ML workloads with variable resource requirements (some tasks need GPU, others need just 1 CPU), strong isolation needs (a training crash cannot affect other tasks), and heterogeneous dependency sets (different tasks need different Python environments or library versions). Many large deployments use CeleryKubernetesExecutor to route lightweight tasks to Celery workers and heavy tasks to Kubernetes.
Q: What is XCom and what should you never put in it?
XCom (Cross-Communication) is Airflow's mechanism for passing data between tasks within the same DAG run. Values are serialized and stored in the metadata database. You should put small, structured metadata in XCom: file paths, record counts, model metric values, run IDs, configuration dictionaries. You should never put large objects in XCom: DataFrames, model weights, large strings, or anything measured in megabytes. The metadata database is not designed as a data store - bloating it with large XCom values degrades the performance of every Airflow component that reads from the metadata database, including the Scheduler and the Webserver.
Q: What is catchup and when should you set it to False?
catchup=True (the default) tells the Scheduler that when a DAG is first deployed - or when start_date is in the past - it should schedule all DAG runs between start_date and now. If you deploy a DAG with start_date=datetime(2023, 1, 1) and schedule="@daily" on January 1, 2025, catchup=True will immediately schedule approximately 730 historical runs. This can overwhelm your workers.
catchup=False tells the Scheduler to only schedule the next upcoming run (or one historical run from the most recent interval). Set catchup=False for most production pipelines where historical backfill is not desired. When you do want to backfill, use airflow dags backfill as an explicit, controlled operation rather than relying on catchup.
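The size of the catchup backlog in that example is plain date arithmetic. The sketch below assumes one run per completed day, which is the usual behavior for a daily schedule.

```python
from datetime import date, timedelta

start_date = date(2023, 1, 1)
deploy_date = date(2025, 1, 1)

# A daily run covers one full day and is scheduled once that day has ended,
# so every completed day between start_date and deploy_date gets one run.
completed_days = (deploy_date - start_date).days

# The first few logical dates the Scheduler would create runs for.
first_three = [start_date + timedelta(days=i) for i in range(3)]
```

Because 2024 is a leap year, `completed_days` comes out at 731, in line with the "approximately 730" figure above. All of those runs become eligible at once, which is why catchup on a long history can saturate workers.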
Q: How does Airflow handle task retries and what patterns should you follow?
Each task has a retries parameter (number of retry attempts) and retry_delay (time between attempts). Failed tasks transition to up_for_retry state, wait the delay, and then transition back to scheduled. You can also use retry_exponential_backoff=True to double the delay with each retry - useful for tasks that hit rate-limited external APIs.
For production robustness: set retries=3 as the default in default_args, use exponential backoff for API-calling tasks, use retry_delay=timedelta(minutes=5) as a reasonable default, and set email_on_retry=False to avoid alert fatigue (you usually only care about the final failure). For tasks that interact with external systems and are genuinely non-idempotent (e.g., charging a payment), set retries=0 - retrying a charge is worse than failing.
Q: How does dynamic task mapping differ from generating tasks dynamically at parse time?
The older pattern - looping at DAG file parse time to create Operator instances - required the list of items to be known when the Scheduler parsed the DAG file. This meant hitting databases or APIs during parsing (which slows the Scheduler) or hardcoding lists (which is fragile). Dynamic task mapping (task.expand(param=iterable)) defers the fan-out to runtime: a first task produces the list, and then Airflow creates one downstream task instance per item in the list. This is cleaner, faster at parse time, and works correctly with backfill - because the list is re-generated for each historical run.
Airflow Monitoring and Alerting
A production Airflow deployment should be monitored at two levels: the infrastructure level (is Airflow itself healthy?) and the pipeline level (are individual DAGs and tasks succeeding?).
Infrastructure Metrics
Airflow can emit StatsD metrics, which a StatsD exporter can translate for Prometheus and Grafana. Key metrics to alert on:
| Metric | Alert Condition | What It Means |
|---|---|---|
| scheduler_heartbeat | No heartbeat for 60s | Scheduler process has died |
| dagbag_size | Sudden drop | DAG files were deleted or have import errors |
| dag_processing_manager_last_run | Stale for 5+ minutes | Scheduler is falling behind on DAG parsing |
| executor_open_slots | Near zero for 10+ minutes | Worker pool is saturated - queued tasks are waiting |
| task_instance_created_* | Zero over 30+ minutes | Scheduler stopped creating task instances |
Configure StatsD in airflow.cfg:
[metrics]
statsd_on = True
statsd_host = prometheus-statsd-exporter.monitoring.svc
statsd_port = 9125
statsd_prefix = airflow
Pipeline-Level Alerting
For task-level failures, use email_on_failure and on_failure_callback:
def task_failure_callback(context: dict) -> None:
    """Send a rich Slack alert with task context on failure."""
    import requests, os
    task_instance = context["task_instance"]
    dag_run = context["dag_run"]
    exception = context.get("exception", "Unknown error")
    message = (
f":red_circle: *Task Failure*\n"
f"*DAG:* `{task_instance.dag_id}`\n"
f"*Task:* `{task_instance.task_id}`\n"
f"*Run:* `{dag_run.run_id}`\n"
f"*Date:* `{task_instance.execution_date}`\n"
f"*Error:* ```{str(exception)[:500]}```\n"
f"*Logs:* {task_instance.log_url}"
    )
    requests.post(os.environ["SLACK_WEBHOOK_URL"], json={"text": message})
default_args = {
"on_failure_callback": task_failure_callback,
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
For DAG-level failures (triggered after all retries are exhausted), use on_failure_callback on the DAG itself:
def dag_failure_callback(context: dict) -> None:
    # This fires when the DAG run enters 'failed' state
    import os
    import requests
    requests.post(os.environ["PAGERDUTY_WEBHOOK_URL"], json={
        "summary": f"DAG FAILED: {context['dag'].dag_id} - {context['dag_run'].run_id}",
        "severity": "critical",
    })

with DAG(
    "critical_revenue_pipeline",
    on_failure_callback=dag_failure_callback,
    ...
) as dag:
    pass
Pools, Priorities, and Concurrency Control
Without concurrency control, a large backfill can spawn hundreds of tasks simultaneously, overloading downstream systems. Airflow provides three mechanisms to manage this.
Pools
Pools limit the number of simultaneously running tasks that share a resource. Create a pool in the Airflow UI (Admin → Pools) or via CLI:
airflow pools set warehouse_connections 10 "Max 10 concurrent warehouse queries"
airflow pools set gpu_training 4 "Max 4 concurrent GPU training tasks"
Assign tasks to a pool:
query_task = PythonOperator(
    task_id="query_warehouse",
    python_callable=run_query,  # hypothetical callable defined elsewhere
    pool="warehouse_connections",
    pool_slots=1,  # this task uses 1 of the 10 available slots
)
big_query_task = PythonOperator(
    task_id="large_analytical_query",
    python_callable=run_large_query,  # hypothetical callable defined elsewhere
    pool="warehouse_connections",
    pool_slots=3,  # claims 3 slots - counts as 3 against the pool
)
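The slot accounting can be sketched with a toy model. This is an illustration of the bookkeeping only, not Airflow's scheduler code; `ToyPool` and its methods are hypothetical names invented for this example.

```python
from dataclasses import dataclass, field


@dataclass
class ToyPool:
    """Toy model of a pool: a fixed slot budget shared by running tasks."""

    name: str
    total_slots: int
    running: dict = field(default_factory=dict)  # task_id -> slots held

    @property
    def open_slots(self) -> int:
        return self.total_slots - sum(self.running.values())

    def try_start(self, task_id: str, pool_slots: int = 1) -> bool:
        """Start the task only if enough slots are free; otherwise it stays queued."""
        if pool_slots > self.open_slots:
            return False
        self.running[task_id] = pool_slots
        return True

    def finish(self, task_id: str) -> None:
        """Release the slots held by a finished task."""
        self.running.pop(task_id, None)


pool = ToyPool(name="warehouse_connections", total_slots=10)

# Eight single-slot queries occupy 8 of the 10 slots.
for i in range(8):
    pool.try_start(f"query_{i}", pool_slots=1)

# A 3-slot task does not fit in the remaining 2 slots, so it waits.
started = pool.try_start("large_analytical_query", pool_slots=3)
print(started, pool.open_slots)

# Once one query finishes, 3 slots are free and the large task can start.
pool.finish("query_0")
started_after = pool.try_start("large_analytical_query", pool_slots=3)
print(started_after, pool.open_slots)
```

The point of the model is that a task with a large `pool_slots` value can wait even when the pool is not completely full, because it needs all of its slots at once.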
DAG-Level Concurrency
with DAG(
    dag_id="bulk_processing",
    max_active_runs=3,  # at most 3 DAG runs executing simultaneously
    max_active_tasks=10,  # at most 10 tasks running across all active runs of this DAG
    # Note: `concurrency` is the deprecated pre-2.2 name for max_active_tasks; do not set both
    # ... remaining DAG arguments
) as dag:
    pass
Priority Weights
When the worker pool is full, Airflow queues tasks and prioritizes them by priority_weight. Tasks on the critical path should have higher weights:
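extract = PythonOperator(
    task_id="extract",
    python_callable=extract_data,  # hypothetical callable defined elsewhere
    priority_weight=10,  # high priority - runs first when slots free up
)
transform = PythonOperator(
    task_id="transform",
    python_callable=transform_data,  # hypothetical callable defined elsewhere
    priority_weight=5,
)
archive = PythonOperator(
    task_id="archive_old_data",
    python_callable=archive_data,  # hypothetical callable defined elsewhere
    priority_weight=1,  # low priority - runs when nothing else is waiting
)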
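One detail worth knowing: under Airflow's default weight_rule ("downstream"), the scheduler ranks a task by its own priority_weight plus the weights of every task downstream of it, so upstream tasks naturally rank higher. Setting weight_rule="absolute" on a task uses the raw value instead. The sketch below reproduces that arithmetic in plain Python. The dependency chain extract >> transform >> archive_old_data is an assumption made for illustration, and `effective_weights` is a hypothetical helper, not an Airflow function.

```python
def effective_weights(own_weights: dict, downstream: dict) -> dict:
    """Sum each task's own weight with the weights of every task downstream of it."""

    def descendants(task_id: str) -> set:
        # Walk the graph iteratively to collect all transitive downstream tasks.
        seen = set()
        stack = list(downstream.get(task_id, []))
        while stack:
            current = stack.pop()
            if current not in seen:
                seen.add(current)
                stack.extend(downstream.get(current, []))
        return seen

    return {
        task_id: weight + sum(own_weights[d] for d in descendants(task_id))
        for task_id, weight in own_weights.items()
    }


own_weights = {"extract": 10, "transform": 5, "archive_old_data": 1}
# Assumed dependency chain: extract >> transform >> archive_old_data
downstream = {"extract": ["transform"], "transform": ["archive_old_data"]}

weights = effective_weights(own_weights, downstream)
print(weights)
```

With this chain the effective weights come out to 16, 6, and 1, so the ordering matches the declared weights but the gaps between tasks are wider than the raw numbers suggest.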
Jinja Templating and Airflow Macros
Airflow supports Jinja2 templating in operator arguments. This lets you inject runtime context, such as the run's execution date, into task parameters without hard-coding values or computing them yourself in Python.
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Jinja variables and filters are available in an operator's templated fields
bash_task = BashOperator(
    task_id="process_date_partition",
    # ds = execution date as YYYY-MM-DD
    # ds_nodash = execution date as YYYYMMDD
    # data_interval_start / data_interval_end = run's data interval boundaries
    # "| ts" is a filter that formats a datetime as an ISO 8601 timestamp
    bash_command="""
    dbt run \
        --select tag:daily \
        --vars '{"run_date": "{{ ds }}", "run_ts": "{{ data_interval_start | ts }}"}'
    """,
)


def process_partition(run_date: str, **kwargs) -> None:
    print(f"Processing partition: {run_date}")


python_task = PythonOperator(
    task_id="process_partition",
    python_callable=process_partition,
    op_kwargs={"run_date": "{{ ds }}"},  # Jinja resolved at task runtime
)
Key Airflow macros:
| Macro | Value | Example |
|---|---|---|
| {{ ds }} | Execution date string | 2024-01-15 |
| {{ ds_nodash }} | Execution date, no dashes | 20240115 |
| {{ ts }} | ISO 8601 timestamp | 2024-01-15T02:00:00+00:00 |
| {{ dag.dag_id }} | DAG ID | daily_user_metrics |
| {{ run_id }} | Run ID | scheduled__2024-01-15T02:00:00+00:00 |
| {{ var.value.KEY }} | Airflow Variable | s3://data/bucket |
| {{ conn.CONN_ID.host }} | Connection host | warehouse.db.company.com |
:::note Jinja templating only works in "templated fields"
Not all operator parameters support Jinja templating - only those listed in the template_fields attribute of the Operator class. For PythonOperator, op_args, op_kwargs, and templates_dict are templated (python_callable is not). For BashOperator, bash_command and env are templated. Check the operator documentation or source for the template_fields attribute.
:::
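The effect of template_fields can be shown with a toy model. To keep the sketch runnable with only the standard library, it uses a small regular expression as a stand-in for Jinja; `ToyOperator` and `render_template_fields` are hypothetical names and do not reflect how Airflow implements rendering internally.

```python
import re


class ToyOperator:
    """Stand-in for an operator: only attributes named in template_fields are rendered."""

    template_fields = ("command",)

    def __init__(self, command: str, label: str):
        self.command = command  # listed in template_fields, so it is rendered
        self.label = label      # not listed, so it is left untouched


def render_template_fields(op: ToyOperator, context: dict) -> None:
    """Replace {{ name }} placeholders, but only in the operator's template_fields."""
    pattern = re.compile(r"\{\{\s*(\w+)\s*\}\}")
    for field_name in op.template_fields:
        raw = getattr(op, field_name)
        rendered = pattern.sub(lambda m: str(context[m.group(1)]), raw)
        setattr(op, field_name, rendered)


op = ToyOperator(command="load --date {{ ds }}", label="run for {{ ds }}")
render_template_fields(op, {"ds": "2024-01-15"})

print(op.command)  # load --date 2024-01-15
print(op.label)    # run for {{ ds }}
```

The untouched `label` is the failure mode to watch for in real DAGs: a template placed in a non-templated parameter is passed through as a literal string, with no error raised.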
Hooks - Reusable Connection Logic
Hooks are the connection layer in Airflow. An Operator uses a Hook to communicate with an external system. Hooks abstract credential management (via Connections) and provide a reusable interface that can be shared across multiple operators.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook


def load_to_warehouse(**context) -> None:
    """Load processed data from S3 into Postgres using hooks."""
    # S3Hook reads credentials from the 'aws_prod' connection
    s3 = S3Hook(aws_conn_id="aws_prod")
    s3_client = s3.get_conn()  # underlying boto3 S3 client
    obj = s3_client.get_object(Bucket="data", Key=f"processed/{context['ds']}/output.csv")
    csv_data = obj["Body"].read().decode("utf-8")
    # Parsing csv_data is omitted here; the value 42 below is a placeholder

    # PostgresHook reads credentials from the 'warehouse_prod' connection
    pg = PostgresHook(postgres_conn_id="warehouse_prod")
    pg.run(
        sql="""
            INSERT INTO daily_metrics (date, value)
            VALUES (%(date)s, %(value)s)
            ON CONFLICT (date) DO UPDATE SET value = EXCLUDED.value
        """,
        parameters={"date": context["ds"], "value": 42},
    )
# Writing a custom Hook for an internal API
import requests
from airflow.hooks.base import BaseHook


class InternalAPIHook(BaseHook):
    conn_name_attr = "internal_api_conn_id"
    default_conn_name = "internal_api_default"
    conn_type = "http"

    def __init__(self, internal_api_conn_id: str = default_conn_name):
        super().__init__()
        # Attribute name matches conn_name_attr
        self.internal_api_conn_id = internal_api_conn_id
        self._session = None
        self._base_url = None

    def get_conn(self):
        # Build the session once and reuse it on later calls
        if self._session is None:
            conn = self.get_connection(self.internal_api_conn_id)
            self._session = requests.Session()
            self._session.headers.update({"Authorization": f"Bearer {conn.password}"})
            self._base_url = f"https://{conn.host}"
        return self._session, self._base_url

    def get_events(self, date: str) -> list:
        session, base_url = self.get_conn()
        resp = session.get(f"{base_url}/events", params={"date": date}, timeout=30)
        resp.raise_for_status()
        return resp.json()
Custom Hooks make your operator code portable and testable - you can mock the Hook in unit tests without touching the Airflow Connection system.
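A minimal sketch of that testing pattern, assuming the task logic receives the hook as an argument: `count_events` is a hypothetical function, and the mock only needs to expose the `get_events` method that the logic calls.

```python
from unittest.mock import Mock


def count_events(hook, date: str) -> int:
    """Task logic that depends only on the hook's get_events interface."""
    events = hook.get_events(date)
    return len(events)


# In a unit test, a Mock stands in for the real hook: no Connection lookup,
# no HTTP request. spec restricts the mock to the one method we expect.
fake_hook = Mock(spec=["get_events"])
fake_hook.get_events.return_value = [{"id": 1}, {"id": 2}, {"id": 3}]

result = count_events(fake_hook, "2024-01-15")
print(result)

# Verify the logic asked the hook for the right date, exactly once.
fake_hook.get_events.assert_called_once_with("2024-01-15")
```

Passing the hook in, rather than constructing it inside the function, is the design choice that makes this work: production code passes the real hook, tests pass the mock.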
