Batch Orchestration Patterns for ML Pipelines
Reading time: ~40 min | Production relevance: Critical | Roles: Data Engineer, ML Engineer, MLOps Engineer
The 2 AM Silent Failure
It is 9:47 AM on a Thursday. A senior data scientist at a logistics company named Meridian Freight walks into the weekly model review meeting and drops a chart on the projector. Delivery-time prediction accuracy has fallen from 91% to 78% over the past seven days. The business team is escalating. A distribution center manager called at 6 AM because drivers are now scheduled wrong for the afternoon rush.
The data engineering team spends the next three hours tracing the issue. They find it eventually: a nightly cron job that ingests GPS telemetry data from 40,000 fleet vehicles silently failed six nights ago at 2:03 AM. There was no alert. There was no retry. The job just stopped, left no log anyone checked, and the downstream feature pipeline happily ran against the previous day's GPS snapshot - seven times in a row. The ML model trained on features computed from week-old location data. Nobody noticed because the model's retraining job also runs on a cron schedule, it completed successfully (it always does), and the deployment pipeline considered a successful training run a green light.
What makes this infuriating is that the job failure had a simple cause: a transient network timeout when connecting to the telemetry vendor's SFTP server. A single retry with a 30-second delay would have succeeded. But cron does not retry. Cron does not know what "retry" means.
There is a deeper problem here. The cron-based system at Meridian Freight consists of 200 scheduled jobs - ingestion, transformation, feature computation, model training, evaluation, deployment. These jobs have implicit dependencies. Feature computation must happen after ingestion. Training must happen after feature computation. Evaluation must happen after training. But cron does not know any of this. Each job is a standalone scheduled command. When one fails, the others run anyway, operating on whatever state was left behind. The system is a distributed state machine with no state tracking, no dependency graph, and no alerting. It is held together by the assumption that everything always works.
Proper ML pipeline orchestration solves all of these problems simultaneously. This lesson covers how to build orchestration that is observable, reliable, and expressive enough to capture real production complexity.
Why Orchestration: What Cron Cannot Do
Cron is a Unix job scheduler from 1975. It does one thing: run a command at a specified time. This is fine for simple isolated tasks. It is completely inadequate for interdependent ML pipelines.
The five fundamental failures of cron for ML:
- No dependency management. Cron cannot express "run job B only after job A succeeds." You fake it with fixed time buffers - schedule B 30 minutes after A. When A takes 45 minutes, B runs on stale data. When A finishes in 10 minutes, B waits 20 minutes unnecessarily.
- No retry semantics. A failed job is gone. There is no retry, no backoff, no escalation. A transient error (network blip, temporary lock) looks identical to a fatal error. The only remediation is manual re-run.
- No alerting. Cron has no native alerting mechanism. A failed job writes to system logs that nobody reads. You find out about failures when downstream consumers complain.
- No visibility. You cannot look at a cron system and understand what ran, what is running, what failed, and what is waiting. You have log files scattered across servers. That is it.
- No parameterization. Cron runs the same job the same way every time. You cannot easily backfill a date range, change the processing window, or run the pipeline for a specific partition without manual intervention.
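To make the "no retry semantics" point concrete, here is a minimal sketch of the retry-with-backoff behavior an orchestrator gives you per task and cron does not. The function name `run_with_retries` and its parameters are illustrative assumptions, not part of any orchestrator's API; the 30-second base delay echoes the Meridian Freight story above.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    job: Callable[[], T],
    retries: int = 2,
    base_delay_s: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run job, retrying on any exception with exponentially growing delays.

    Hypothetical helper for illustration only. `sleep` is injectable so the
    behavior can be tested without actually waiting.
    """
    attempt = 0
    while True:
        try:
            return job()
        except Exception:
            if attempt >= retries:
                # Retries exhausted: re-raise so the failure is visible to alerting.
                raise
            # Delay doubles on each retry: base, 2x base, 4x base, ...
            sleep(base_delay_s * (2 ** attempt))
            attempt += 1
```

A transient SFTP timeout on the first attempt would be absorbed by the first retry, while a job that keeps failing still surfaces its exception after the last attempt instead of disappearing silently.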
Modern orchestrators - Airflow, Dagster, Prefect - solve all five problems simultaneously, and add:
- Visual DAG representations of pipeline dependencies
- Task-level retry configuration with exponential backoff
- SLA monitoring and alerting
- Historical run logs and task duration analytics
- Programmatic backfill for missed or failed runs
- Dynamic task generation for parameterized pipelines
Historical Context
Apache Airflow was created at Airbnb in 2014 by Maxime Beauchemin, the same engineer who created Apache Superset. Airbnb was running a data platform with hundreds of interdependent ETL jobs and had exactly the problems described above at massive scale. Beauchemin's insight was to model pipelines as Directed Acyclic Graphs (DAGs) in Python - a format expressive enough to capture real dependencies and general enough to run any type of task.
Airflow was open-sourced in 2015 and donated to the Apache Software Foundation in 2016. It became the de facto standard for data pipeline orchestration. By 2020, virtually every large data-driven company was running Airflow in some form.
The critiques of Airflow were also well-known: its scheduler had scaling problems at very high task volumes, the Python-based DAG definition mixed scheduling concerns with business logic, and debugging failures required navigating multiple UI screens. These critiques birthed a new generation of orchestrators: Prefect (2018), Dagster (2019), and Mage (2022). Each takes a different approach to solving the core orchestration problem.
Apache Airflow Architecture
Understanding Airflow's architecture tells you both what it can do and where its limits are.
Components:
- Scheduler: The brain. Continuously parses DAG files, determines which tasks are ready to run (all upstream dependencies succeeded), and submits them to the executor. The scheduler does not run tasks itself - it orchestrates.
- Executor: The dispatcher. Receives task execution requests from the scheduler and decides how to run them. Three main executors:
  - LocalExecutor: Runs tasks as subprocesses on the scheduler machine. Fine for development and small pipelines (single-machine limit).
  - CeleryExecutor: Distributes tasks to a pool of worker machines via a Redis or RabbitMQ message queue. Horizontally scalable - add more workers to handle more parallelism. Requires managing the worker fleet.
  - KubernetesExecutor: Launches each task as an isolated Kubernetes Pod. Maximum isolation, dynamic scaling, no persistent worker fleet to manage. Higher per-task overhead (pod startup time ~15-30 seconds). Best for heterogeneous task requirements (different CPU/memory/GPU needs per task).
- Webserver: A Flask application serving the Airflow UI. Shows DAG topology, task states, historical runs, logs. Read-only relative to pipeline execution - it monitors, not controls, except for manual triggers.
- Metadata DB: PostgreSQL (recommended) or MySQL. Stores all state: DAG definitions, DAG run records, task instance states, XCom data, connection configurations. This is the single source of truth for all pipeline state.
- Workers: The actual machines that execute task code. In CeleryExecutor mode, these are long-running processes polling the queue. In KubernetesExecutor mode, these are ephemeral pods.
For ML pipelines at medium scale (hundreds of tasks/day): CeleryExecutor with 5-10 workers is the common choice. It provides horizontal scaling without Kubernetes complexity. For ML platforms at large scale (thousands of tasks/day) or when tasks need GPU isolation: KubernetesExecutor. LocalExecutor is fine for teams just starting out - everything runs on one machine.
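The scheduler's core decision, "which tasks are ready because all upstream dependencies succeeded", can be sketched in a few lines. This is a simplified model and not Airflow's actual scheduling code; the function `ready_tasks` and the state strings are assumptions made for illustration.

```python
from typing import Dict, List, Set


def ready_tasks(upstream: Dict[str, Set[str]], state: Dict[str, str]) -> List[str]:
    """Return tasks that have not started and whose upstream tasks all succeeded.

    upstream maps each task to the set of tasks it depends on.
    state maps task -> "success" / "failed" / "running"; a missing key means
    the task has not been scheduled yet.
    """
    ready = []
    for task, deps in upstream.items():
        if task in state:
            continue  # already queued, running, or finished
        if all(state.get(dep) == "success" for dep in deps):
            ready.append(task)
    return sorted(ready)


# Hypothetical three-step pipeline: ingest -> compute_features -> validate
PIPELINE = {
    "ingest": set(),
    "compute_features": {"ingest"},
    "validate": {"compute_features"},
}
```

With this model, a failed `ingest` leaves `compute_features` permanently not ready, which is exactly the guarantee the cron setup at Meridian Freight lacked.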
DAG Fundamentals
A DAG in Airflow is a Python object that defines the structure and behavior of a pipeline.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
# ingest_telemetry_data, compute_feature_table, and validate_feature_quality
# are the task callables, assumed to be defined or imported elsewhere.
# Default arguments applied to all tasks unless overridden
default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,  # Each run independent by default
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,  # Delay grows with each retry, starting near 5m
    "email_on_failure": True,
}
with DAG(
    dag_id="nightly_ml_feature_pipeline",
    description="Ingest telemetry, compute features, trigger training",
    schedule="0 2 * * *",  # 2 AM daily (cron expression)
    start_date=datetime(2024, 1, 1),
    catchup=False,  # Don't backfill missed runs
    max_active_runs=1,  # Prevent concurrent runs (important for ML)
    tags=["ml", "features", "nightly"],
    default_args=default_args,
) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")
    ingest = PythonOperator(
        task_id="ingest_telemetry",
        python_callable=ingest_telemetry_data,
        op_kwargs={"date": "{{ ds }}"},  # Jinja: logical date as YYYY-MM-DD
    )
    compute_features = PythonOperator(
        task_id="compute_features",
        python_callable=compute_feature_table,
    )
    validate = PythonOperator(
        task_id="validate_features",
        python_callable=validate_feature_quality,
    )
    # Dependency chain: start → ingest → compute → validate → end
    start >> ingest >> compute_features >> validate >> end
Key concepts:
- schedule: A cron expression or timedelta. Unlike cron, Airflow schedules relative to a data_interval - the period of data the run covers. With a daily cron schedule in Airflow 2.x, the run that starts at 2024-01-15 02:00 covers the preceding 24 hours, and {{ ds }} renders as 2024-01-14.
- catchup=False: Critical for ML pipelines. If your DAG was paused for two weeks, you do not want 14 consecutive runs firing immediately. Disable catchup unless you specifically need historical backfill.
- max_active_runs=1: Prevents a slow run from overlapping with the next scheduled run. For ML feature pipelines that write to feature tables, concurrent runs can corrupt or duplicate data.
- {{ ds }}: Jinja templating for the logical (execution) date. Airflow makes the logical date available in task parameters, enabling data-interval-aware processing.
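The relationship between when a run fires and which data it covers trips up most newcomers, so a small worked example may help. The sketch below assumes Airflow 2.x's interval-based behavior for a daily cron schedule (the run fires at the end of the interval it covers); `daily_data_interval` is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime, timedelta
from typing import Tuple


def daily_data_interval(run_time: datetime) -> Tuple[datetime, datetime]:
    """Return (interval_start, interval_end) for a daily run firing at run_time.

    Assumption: the run fires once its interval has closed, so the interval
    is the 24 hours immediately before run_time.
    """
    return run_time - timedelta(days=1), run_time


# The run that fires at 2 AM on January 15 ...
interval_start, interval_end = daily_data_interval(datetime(2024, 1, 15, 2, 0))

# ... has a logical date equal to the interval start, so under this
# assumption {{ ds }} would render as the previous calendar day.
ds = interval_start.strftime("%Y-%m-%d")
```

This is why a task templated with `{{ ds }}` in the 2 AM run reads the partition for the day that just ended rather than the day the run starts on.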
XCom: Inter-Task Communication
XCom (cross-communication) lets tasks share small values - typically metadata, not data.
def ingest_telemetry_data(**context):
    # Simulate ingest: returns record count
    records_ingested = 1_423_891
    # Push to XCom under an explicit key
    context["ti"].xcom_push(key="records_ingested", value=records_ingested)
    return records_ingested  # return value also gets pushed as 'return_value'
def validate_feature_quality(**context):
    # Pull from upstream task's XCom
    ti = context["ti"]
    records = ti.xcom_pull(
        task_ids="ingest_telemetry",
        key="records_ingested"
    )
    if records < 1_000_000:
        raise ValueError(
            f"Expected 1M+ records, got {records:,}. "
            "Upstream ingest may have failed partially."
        )
    print(f"Validated: {records:,} records ingested successfully")
XCom values are stored in the Airflow metadata DB. Do NOT push large objects (DataFrames, model weights, large arrays). The default XCom backend serializes each value (JSON by default in Airflow 2.x) into a database column - a 100MB object will bloat your metadata DB, can exceed the column size limit on some databases, and will cause the scheduler to crawl. XCom is for metadata: counts, paths, timestamps, flags. For actual data between tasks, use S3/GCS paths and push the path string to XCom.
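One way to enforce the "paths, not data" rule is a small guard that tasks call before pushing. The sketch below is an assumption-laden illustration: `xcom_safe` and the 48 KB budget are hypothetical, and the JSON check mirrors Airflow 2.x's default serialization rather than calling any Airflow API.

```python
import json

# Hypothetical per-value budget; pick a limit that suits your metadata DB.
MAX_XCOM_BYTES = 48 * 1024


def xcom_safe(value, max_bytes: int = MAX_XCOM_BYTES):
    """Return value unchanged if it is small and JSON-serializable, else raise.

    json.dumps raises TypeError for objects such as DataFrames or arbitrary
    class instances, which is the early failure we want here.
    """
    payload = json.dumps(value)
    size = len(payload.encode("utf-8"))
    if size > max_bytes:
        raise ValueError(
            f"XCom payload is {size:,} bytes (limit {max_bytes:,}); "
            "write the data to object storage and push the path instead"
        )
    return value
```

A task would then call something like `ti.xcom_push(key="feature_path", value=xcom_safe(path))`, so an accidental large push fails in the task itself instead of degrading the shared metadata DB.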
Operators for ML Pipelines
Operators are the task types in Airflow. Each operator knows how to run a specific category of work.
SparkSubmitOperator
Submits a Spark application to a cluster:
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
compute_gps_features = SparkSubmitOperator(
    task_id="spark_compute_gps_features",
    application="/opt/spark-jobs/compute_gps_features.py",
    name="gps-feature-computation-{{ ds }}",
    conn_id="spark_default",  # Airflow connection with Spark master URL
    executor_cores=4,
    executor_memory="8g",
    num_executors=20,
    application_args=[
        "--date", "{{ ds }}",
        "--input-path", "s3://data/telemetry/{{ ds }}/",
        "--output-path", "s3://features/gps/{{ ds }}/",
    ],
    conf={
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.shuffle.partitions": "400",
        "spark.dynamicAllocation.enabled": "true",
    },
    # Retry independently of DAG-level defaults
    retries=3,
    retry_delay=timedelta(minutes=10),
)
BranchPythonOperator: Conditional Flow
The evaluation gate pattern - only deploy if metrics are above threshold:
from airflow.operators.python import BranchPythonOperator
def check_model_metrics(**context):
    """Read evaluation results and decide: deploy or hold."""
    ti = context["ti"]
    metrics = ti.xcom_pull(task_ids="evaluate_model", key="metrics")
    mae = metrics["mean_absolute_error"]
    baseline_mae = metrics["baseline_mae"]
    # Gate: model must beat baseline by at least 5%
    if mae < baseline_mae * 0.95:
        return "deploy_model"  # Task ID to run next
    else:
        return "flag_regression"  # Alternative branch
evaluation_gate = BranchPythonOperator(
    task_id="evaluation_gate",
    python_callable=check_model_metrics,
)
deploy = PythonOperator(
    task_id="deploy_model",
    python_callable=trigger_model_deployment,
)
flag_regression = PythonOperator(
    task_id="flag_regression",
    python_callable=create_regression_ticket,
)
# Both branches converge at end. The end task needs
# trigger_rule="none_failed_min_one_success"; with the default rule it is
# skipped along with the branch that was not chosen.
evaluation_gate >> [deploy, flag_regression] >> end
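Because the gate decides what reaches production, it helps to keep the threshold logic in a plain function that can be unit-tested without an Airflow context. The sketch below restates the 5% rule from the callable above; `choose_branch` and its `min_improvement` parameter are illustrative names, not part of Airflow.

```python
def choose_branch(mae: float, baseline_mae: float, min_improvement: float = 0.05) -> str:
    """Return the task_id to follow, given the new and baseline MAE.

    The candidate must beat the baseline by at least min_improvement
    (a fraction of the baseline; lower MAE is better).
    """
    threshold = baseline_mae * (1.0 - min_improvement)
    return "deploy_model" if mae < threshold else "flag_regression"
```

The branch callable then reduces to pulling the metrics from XCom and returning `choose_branch(mae, baseline_mae)`, and the decision rule itself can be covered by ordinary tests.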
KubernetesPodOperator: Containerized Workloads
Run any containerized task without managing a worker fleet:
# Note: newer releases of the cncf.kubernetes provider expose this class
# from airflow.providers.cncf.kubernetes.operators.pod instead.
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s
run_spark_job = KubernetesPodOperator(
    task_id="spark_feature_computation",
    name="spark-feature-{{ ds }}",
    namespace="airflow",
    image="company-registry/spark-feature-pipeline:v2.4.1",
    cmds=["python", "/app/compute_features.py"],
    arguments=["--date", "{{ ds }}", "--partitions", "200"],
    env_vars={
        "S3_BUCKET": "s3://ml-features",
        "AWS_REGION": "us-east-1",
    },
    # Resource requests and limits
    container_resources=k8s.V1ResourceRequirements(
        requests={"cpu": "4", "memory": "16Gi"},
        limits={"cpu": "8", "memory": "32Gi"},
    ),
    # Image pull policy
    image_pull_policy="Always",
    # Clean up pod after completion
    is_delete_operator_pod=True,
    # Get logs from pod
    get_logs=True,
    # Run under a dedicated service account with narrowly scoped permissions
    service_account_name="spark-sa",
)
Sensors: Waiting for Upstream Data
Sensors block execution until a condition is met - essential for cross-system dependencies:
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.sensors.external_task import ExternalTaskSensor
# Wait for upstream data to land in S3
wait_for_telemetry_data = S3KeySensor(
    task_id="wait_for_telemetry",
    bucket_name="data-lake",
    bucket_key="telemetry/{{ ds }}/_SUCCESS",  # Sentinel file
    aws_conn_id="aws_default",
    poke_interval=300,  # Check every 5 minutes
    timeout=7200,  # Fail after 2 hours
    mode="reschedule",  # Release worker slot while waiting (important!)
)
# Wait for another DAG to complete
wait_for_raw_ingestion = ExternalTaskSensor(
    task_id="wait_for_raw_ingestion_dag",
    external_dag_id="raw_data_ingestion",
    external_task_id="ingest_complete",
    allowed_states=["success"],
    execution_delta=timedelta(hours=1),  # That DAG runs 1hr before this one
    timeout=3600,
    mode="reschedule",
)
Use reschedule mode: By default, sensors use mode="poke", which holds a worker slot for the entire duration of the wait. With 10 worker slots, 10 poke-mode sensors that each wait up to 2 hours starve all other tasks. Use mode="reschedule" - it releases the worker slot between checks, freeing capacity. With KubernetesExecutor, always use reschedule, since each poke-mode sensor would otherwise keep its own pod running for the entire wait.
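A back-of-envelope model shows how large the difference between the two modes is. This is a rough sketch under stated assumptions, not a measurement: `slot_seconds_held` is a hypothetical helper, and the one-second cost per check is an assumed figure.

```python
import math


def slot_seconds_held(
    mode: str, wait_s: float, poke_interval_s: float, check_s: float = 1.0
) -> float:
    """Estimate how long a sensor occupies a worker slot while waiting wait_s seconds.

    poke: the slot is held for the whole wait.
    reschedule: the slot is held only while each check runs (check_s is an
    assumed cost per check); the task is rescheduled between checks.
    """
    if mode == "poke":
        return wait_s
    if mode == "reschedule":
        checks = math.ceil(wait_s / poke_interval_s) + 1  # first check plus one per interval
        return checks * check_s
    raise ValueError(f"unknown sensor mode: {mode!r}")
```

For the sensor configured above (a 2-hour wait checked every 5 minutes), the model gives 7,200 slot-seconds in poke mode versus about 25 in reschedule mode, which is why poke-mode sensors can exhaust a small worker pool.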
Complete Nightly Feature Pipeline DAG
Here is a production-grade DAG for Meridian Freight's nightly ML feature pipeline:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.utils.task_group import TaskGroup
import requests
# Task callables (run_dbt_models, run_great_expectations_checkpoint,
# submit_training_job, alert_validation_failure) are assumed to be imported
# from project modules.
# ── Callbacks ─────────────────────────────────────────────────────────────────
def slack_failure_callback(context):
    """Send Slack alert with task details on failure."""
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    execution_date = context["execution_date"].isoformat()
    log_url = context["task_instance"].log_url
    exception = context.get("exception", "Unknown error")
    message = {
        "text": (
            f":red_circle: *Pipeline Failure*\n"
            f"*DAG*: `{dag_id}`\n"
            f"*Task*: `{task_id}`\n"
            f"*Execution*: `{execution_date}`\n"
            f"*Error*: `{str(exception)[:200]}`\n"
            f"*Logs*: <{log_url}|View Logs>"
        )
    }
    requests.post(
        # Jinja templates are not rendered inside plain Python callbacks,
        # so read the Airflow Variable directly.
        url=Variable.get("slack_webhook_url"),
        json=message,
        timeout=10,
    )
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Alert when tasks miss their SLA."""
    task_names = [s.task_id for s in slas]
    requests.post(
        url=Variable.get("slack_webhook_url"),
        json={"text": f":warning: SLA missed for tasks: {task_names} in DAG {dag.dag_id}"},
        timeout=10,
    )
# ── Default Args ───────────────────────────────────────────────────────────────
default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "on_failure_callback": slack_failure_callback,
    "execution_timeout": timedelta(hours=2),
}
# ── DAG Definition ─────────────────────────────────────────────────────────────
with DAG(
    dag_id="meridian_nightly_feature_pipeline",
    description="Nightly GPS telemetry ingestion → Spark features → dbt → training",
    schedule="0 1 * * *",  # 1 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    sla_miss_callback=sla_miss_callback,
    tags=["ml", "features", "nightly", "logistics"],
    default_args=default_args,
) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end", trigger_rule="none_failed_min_one_success")
    # ── Stage 1: Wait for source data ─────────────────────────────────────────
    wait_for_gps = S3KeySensor(
        task_id="wait_for_gps_data",
        bucket_name="meridian-datalake",
        bucket_key="raw/gps/{{ ds }}/_SUCCESS",
        poke_interval=300,
        timeout=7200,
        mode="reschedule",
        sla=timedelta(hours=1),
    )
    # ── Stage 2: Spark feature computation ────────────────────────────────────
    with TaskGroup("spark_features") as spark_features_group:
        compute_gps_features = SparkSubmitOperator(
            task_id="compute_gps_features",
            application="/jobs/gps_features.py",
            application_args=["--date", "{{ ds }}"],
            executor_memory="8g",
            num_executors=20,
            sla=timedelta(hours=1, minutes=30),
        )
        compute_route_features = SparkSubmitOperator(
            task_id="compute_route_features",
            application="/jobs/route_features.py",
            application_args=["--date", "{{ ds }}"],
            executor_memory="4g",
            num_executors=10,
        )
        compute_weather_features = SparkSubmitOperator(
            task_id="compute_weather_features",
            application="/jobs/weather_features.py",
            application_args=["--date", "{{ ds }}"],
            executor_memory="2g",
            num_executors=5,
        )
    # ── Stage 3: dbt transformation ───────────────────────────────────────────
    dbt_run = PythonOperator(
        task_id="dbt_run_feature_models",
        python_callable=run_dbt_models,
        op_kwargs={
            "models": "tag:ml_features",
            "vars": {"run_date": "{{ ds }}"},
        },
        sla=timedelta(minutes=30),
    )
    # ── Stage 4: Feature validation ───────────────────────────────────────────
    validate_features = PythonOperator(
        task_id="validate_feature_quality",
        python_callable=run_great_expectations_checkpoint,
        op_kwargs={"checkpoint_name": "ml_features_nightly"},
    )
    # ── Stage 5: Training gate ────────────────────────────────────────────────
    def decide_training(**context):
        ti = context["ti"]
        validation_passed = ti.xcom_pull(
            task_ids="validate_feature_quality",
            key="validation_passed"
        )
        return "trigger_training" if validation_passed else "skip_training_alert"
    training_gate = BranchPythonOperator(
        task_id="training_gate",
        python_callable=decide_training,
    )
    trigger_training = PythonOperator(
        task_id="trigger_training",
        python_callable=submit_training_job,
        op_kwargs={"feature_date": "{{ ds }}"},
    )
    skip_training_alert = PythonOperator(
        task_id="skip_training_alert",
        python_callable=alert_validation_failure,
    )
    # ── Dependencies ──────────────────────────────────────────────────────────
    start >> wait_for_gps >> spark_features_group >> dbt_run
    dbt_run >> validate_features >> training_gate
    training_gate >> [trigger_training, skip_training_alert] >> end
Dynamic DAGs: One Task Per Feature Group
When you have many similar tasks, generate them programmatically:
# Feature groups defined externally - could come from a config file or DB
FEATURE_GROUPS = [
    {"name": "gps", "script": "gps_features.py", "executors": 20, "memory": "8g"},
    {"name": "route", "script": "route_features.py", "executors": 10, "memory": "4g"},
    {"name": "weather", "script": "weather_features.py", "executors": 5, "memory": "2g"},
    {"name": "traffic", "script": "traffic_features.py", "executors": 15, "memory": "6g"},
    {"name": "customer", "script": "customer_features.py", "executors": 8, "memory": "4g"},
]
with DAG(
    dag_id="dynamic_feature_computation",
    schedule="0 1 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    start = EmptyOperator(task_id="start")
    # Fan-in task - aggregates all feature groups
    aggregate = PythonOperator(
        task_id="aggregate_feature_store",
        python_callable=merge_feature_groups,
    )
    # Dynamically generate one Spark task per feature group
    feature_tasks = []
    for group in FEATURE_GROUPS:
        task = SparkSubmitOperator(
            task_id=f"compute_{group['name']}_features",
            application=f"/jobs/{group['script']}",
            application_args=["--date", "{{ ds }}", "--group", group["name"]],
            num_executors=group["executors"],
            executor_memory=group["memory"],
            retries=2,
        )
        feature_tasks.append(task)
    # Fan-out: start → all feature tasks (parallel)
    # Fan-in: all feature tasks → aggregate
    start >> feature_tasks >> aggregate
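When task definitions come from external configuration, a malformed entry can break DAG parsing for the whole file. A small validation step run before the loop keeps failures readable. The sketch below is illustrative: `task_ids_for` is a hypothetical helper, and the character pattern reflects the assumption that task IDs are limited to alphanumerics, dashes, dots, and underscores.

```python
import re
from typing import Dict, List

# Assumed rule for valid task IDs: alphanumerics, dashes, dots, underscores.
TASK_ID_PATTERN = re.compile(r"^[A-Za-z0-9_.-]+$")


def task_ids_for(groups: List[Dict]) -> List[str]:
    """Derive one task_id per feature group, rejecting invalid or duplicate names."""
    seen = set()
    task_ids = []
    for group in groups:
        task_id = f"compute_{group['name']}_features"
        if not TASK_ID_PATTERN.match(task_id):
            raise ValueError(f"invalid task_id derived from config: {task_id!r}")
        if task_id in seen:
            raise ValueError(f"duplicate feature group name: {group['name']!r}")
        seen.add(task_id)
        task_ids.append(task_id)
    return task_ids
```

Calling `task_ids_for(FEATURE_GROUPS)` at the top of the DAG file, or better, in a unit test over the config, turns a silent duplicate or a typo into an explicit error message.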
[Figure: Pipeline Orchestration DAG (visual)]
Dagster: Software-Defined Assets
Dagster takes a fundamentally different approach to orchestration. Instead of defining tasks and scheduling logic, you define assets - data objects (tables, files, models) that your code produces. Dagster figures out the execution order from asset dependencies.
from dagster import (
    asset, AssetIn, Output, MetadataValue,
    define_asset_job, ScheduleDefinition,
    AssetCheckResult, AssetCheckSeverity, asset_check,
)
import pandas as pd
# ingest_from_sftp, spark_session, and compute_gps_feature_transforms are
# assumed to come from project modules.
# ── Assets ─────────────────────────────────────────────────────────────────────
@asset(
    group_name="raw_ingestion",
    description="Raw GPS telemetry records ingested from vendor SFTP",
    compute_kind="python",
)
def raw_gps_telemetry(context) -> Output[pd.DataFrame]:
    """Pull GPS telemetry from vendor SFTP for the run's date partition."""
    date = context.partition_key  # Only available if the asset defines date partitions
    df = ingest_from_sftp(date=date)
    context.log.info(f"Ingested {len(df):,} records for {date}")
    return Output(
        value=df,
        metadata={
            "record_count": MetadataValue.int(len(df)),
            "date": MetadataValue.text(date),
            "columns": MetadataValue.int(len(df.columns)),
        }
    )
@asset(
    ins={"raw_gps": AssetIn("raw_gps_telemetry")},
    group_name="features",
    description="GPS-derived features: avg speed, stop frequency, route deviation",
    compute_kind="spark",
)
def gps_features(context, raw_gps: pd.DataFrame) -> pd.DataFrame:
    """Compute GPS features using Spark."""
    spark_df = spark_session.createDataFrame(raw_gps)
    features = compute_gps_feature_transforms(spark_df)
    return features.toPandas()
# route_features and weather_features are assets defined elsewhere
@asset(
    ins={
        "gps": AssetIn("gps_features"),
        "route": AssetIn("route_features"),
        "weather": AssetIn("weather_features"),
    },
    group_name="ml_ready",
    description="Final merged feature table ready for model training",
    compute_kind="dbt",
)
def ml_feature_table(context, gps, route, weather) -> pd.DataFrame:
    """Merge all feature groups into the ML training feature table."""
    merged = (
        gps
        .merge(route, on=["vehicle_id", "date"], how="inner")
        .merge(weather, on=["region", "date"], how="left")
    )
    return merged
# ── Asset Check ────────────────────────────────────────────────────────────────
@asset_check(asset=ml_feature_table, description="Validate feature table quality")
def ml_feature_table_quality_check(ml_feature_table: pd.DataFrame) -> AssetCheckResult:
    """Assert feature table meets quality thresholds."""
    # Convert to a plain float so `passed` is a Python bool, not a NumPy scalar
    null_rate = float(ml_feature_table.isnull().mean().max())
    row_count = len(ml_feature_table)
    passed = (null_rate < 0.01) and (row_count > 500_000)
    return AssetCheckResult(
        passed=passed,
        metadata={
            "null_rate": MetadataValue.float(null_rate),
            "row_count": MetadataValue.int(row_count),
        },
        severity=AssetCheckSeverity.ERROR if not passed else AssetCheckSeverity.WARN,
    )
# ── Job and Schedule ───────────────────────────────────────────────────────────
nightly_feature_job = define_asset_job(
    name="nightly_feature_job",
    selection=["raw_gps_telemetry", "gps_features", "ml_feature_table"],
)
nightly_schedule = ScheduleDefinition(
    job=nightly_feature_job,
    cron_schedule="0 1 * * *",
)
Dagster's key advantages over Airflow:
- Asset-centric thinking: The DAG represents the data lineage, not just execution order. You can see exactly which table depends on which upstream source.
- Asset checks: Built-in framework for data quality assertions that run after materialization.
- Partitions: First-class support for date partitions - backfill a specific date range with one command.
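- Fewer moving parts for small deployments: A webserver and a daemon process are enough to get started, with no dedicated worker fleet to operate unless you opt into one. This is often simpler than Airflow's multi-component architecture.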
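The idea that execution order falls out of asset dependencies can be illustrated with the standard library alone. This is a conceptual sketch, not Dagster's implementation: the dependency map mirrors the assets in the example above, and treating `route_features` and `weather_features` as having no upstream assets is an assumption made for illustration.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Map each asset to the set of assets it depends on.
ASSET_DEPS = {
    "raw_gps_telemetry": set(),
    "gps_features": {"raw_gps_telemetry"},
    "route_features": set(),    # upstream unknown here; assumed to have none
    "weather_features": set(),  # upstream unknown here; assumed to have none
    "ml_feature_table": {"gps_features", "route_features", "weather_features"},
}

# static_order() yields every asset after all of its dependencies.
order = list(TopologicalSorter(ASSET_DEPS).static_order())
```

Nothing in this map says "run A, then B"; the ordering is derived entirely from which asset consumes which, and that is the shift in mindset the asset-centric model asks for.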
Prefect: Flows and Deployments
Prefect takes the most developer-friendly approach - normal Python functions become tasks and flows with a decorator:
from prefect import flow, task
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
import pendulum
# ingest_from_sftp, write_to_s3, submit_spark_job, and run_ge_checkpoint are
# assumed to come from project modules.
@task(retries=3, retry_delay_seconds=60, name="Ingest GPS Telemetry")
def ingest_gps(date: str) -> int:
    """Return record count."""
    records = ingest_from_sftp(date=date)
    write_to_s3(records, f"s3://data/gps/{date}/")
    return len(records)
@task(retries=2, name="Compute GPS Features")
def compute_gps_features(date: str, record_count: int) -> str:
    """Run Spark job, return output path."""
    if record_count < 1_000_000:
        raise ValueError(f"Too few records: {record_count:,}")
    submit_spark_job(date=date)
    return f"s3://features/gps/{date}/"
@task(name="Validate Features")
def validate_features(feature_path: str) -> bool:
    """Run Great Expectations checkpoint."""
    return run_ge_checkpoint(feature_path)
@flow(name="Nightly Feature Pipeline", log_prints=True)
def nightly_feature_pipeline(date: str = None):
    if date is None:
        date = pendulum.now().subtract(days=1).date().isoformat()
    # Calling a task directly runs it and returns its result, so these steps
    # execute in order. Use .submit() to get futures and run tasks concurrently.
    record_count = ingest_gps(date)
    feature_path = compute_gps_features(date, record_count)
    is_valid = validate_features(feature_path)
    if not is_valid:
        raise RuntimeError(f"Feature validation failed for {date}")
    print(f"Pipeline complete for {date}: features at {feature_path}")
# Deploy to Prefect Cloud / Server (Prefect 2.x deployment API)
deployment = Deployment.build_from_flow(
    flow=nightly_feature_pipeline,
    name="nightly-production",
    schedule=CronSchedule(cron="0 1 * * *"),
    work_pool_name="kubernetes-pool",
    parameters={"date": None},
)
deployment.apply()  # Register the deployment with the Prefect API
SLA Monitoring and Alerting
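SLA (Service Level Agreement) monitoring alerts you when tasks have not completed within their expected time windows. In Airflow 2.x, a task's sla is measured from the scheduled start of the DAG run, not from the moment the task itself starts: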
from airflow.utils.email import send_email
# send_pager_alert is assumed to be a project-specific paging helper.
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Called by Airflow scheduler when any task misses its SLA."""
    missed_tasks = [sla.task_id for sla in slas]
    blocking_tasks = [ti.task_id for ti in blocking_tis]
    # Send pager alert for critical tasks
    critical_tasks = {"compute_gps_features", "dbt_run_feature_models"}
    if any(t in critical_tasks for t in missed_tasks):
        send_pager_alert(
            f"CRITICAL: SLA missed for {missed_tasks} - training window at risk"
        )
    # Always send email
    send_email(
        subject=f"[SLA Miss] DAG: {dag.dag_id} | Tasks: {missed_tasks}",
        html_content=f"""
        <h3>SLA Miss Detected</h3>
        <p>Tasks that missed SLA: {missed_tasks}</p>
        <p>Tasks blocking execution: {blocking_tasks}</p>
        <p>Training model deployment may be delayed.</p>
        """,
    )
# Per-task SLA configuration
compute_gps_features = SparkSubmitOperator(
    task_id="compute_gps_features",
    application="/jobs/gps_features.py",
    sla=timedelta(hours=1, minutes=30),  # Deadline: 90 min after the run's scheduled start
    # ... (remaining arguments elided)
)
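The deadline arithmetic behind an SLA check is simple enough to sketch directly. The version below assumes the deadline is measured from the DAG run's scheduled start, which is how Airflow 2.x documents its `sla` parameter; `sla_missed` is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime, timedelta
from typing import Optional


def sla_missed(
    run_start: datetime,
    sla: timedelta,
    now: datetime,
    finished_at: Optional[datetime] = None,
) -> bool:
    """Return True if the task missed its SLA.

    The deadline is run_start + sla. A finished task missed the SLA if it
    finished after the deadline; an unfinished task has missed it once the
    current time passes the deadline.
    """
    deadline = run_start + sla
    if finished_at is not None:
        return finished_at > deadline
    return now > deadline
```

Note the consequence of measuring from the run start: a task with a 90-minute SLA that waits an hour behind a slow sensor has only 30 minutes of its own runtime left before the callback fires.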
Production Engineering Notes
Airflow metadata DB maintenance: The metadata DB grows continuously with DAG run history, task instance records, and XCom data. Run airflow db clean monthly, passing --clean-before-timestamp a cutoff about 90 days in the past, to prune old rows. At 1000+ tasks/day, unmanaged metadata DB growth causes scheduler slowdowns.
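Since the cleanup command expects an actual timestamp, a scheduled maintenance script has to compute the cutoff itself. The sketch below only builds the command; `build_db_clean_command` is a hypothetical helper, and the flags shown (`--clean-before-timestamp`, `--dry-run`, `--yes`) should be checked against `airflow db clean --help` for your Airflow version before relying on them.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def build_db_clean_command(
    now: datetime, retention_days: int = 90, dry_run: bool = True
) -> List[str]:
    """Build an `airflow db clean` invocation that keeps retention_days of history."""
    cutoff = (now - timedelta(days=retention_days)).isoformat()
    command = ["airflow", "db", "clean", "--clean-before-timestamp", cutoff]
    # Default to a dry run so the first execution only reports what would be deleted.
    command.append("--dry-run" if dry_run else "--yes")
    return command


# Example: command for a cleanup evaluated on 2024-04-10 (UTC)
example = build_db_clean_command(datetime(2024, 4, 10, tzinfo=timezone.utc))
```

The resulting list can be passed to `subprocess.run` from a maintenance job; starting with the dry run is a cautious default, because the deletion cannot be undone without a backup.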
DAG parsing performance: Airflow's scheduler parses all DAG files on every cycle. Complex DAGs with heavy imports (loading ML libraries, making API calls) slow scheduler performance. Keep DAG files lightweight - move all business logic to imported modules, not directly in the DAG file.
Concurrency tuning: The [core] parallelism setting controls maximum concurrent tasks across all DAGs. The [core] max_active_tasks_per_dag setting (named dag_concurrency before Airflow 2.2) controls per-DAG parallelism. Tune these based on your executor capacity, not arbitrarily.
Idempotent tasks: Every task in an ML pipeline must be idempotent - running it twice produces the same result. Write output to a deterministic path (include date partition), and overwrite rather than append. This makes retry safe and backfill correct.
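A minimal sketch of the deterministic-path-plus-overwrite pattern, using the local filesystem as a stand-in for object storage. The function `write_partition`, the `date=` directory layout, and the JSON format are all illustrative assumptions.

```python
import json
import os
import tempfile
from pathlib import Path


def write_partition(root: Path, ds: str, rows: list) -> Path:
    """Write rows for the date partition ds, replacing any previous output.

    The target path depends only on ds, so a retry or backfill for the same
    date lands in the same place instead of appending a second copy.
    """
    target = root / f"date={ds}" / "features.json"
    target.parent.mkdir(parents=True, exist_ok=True)
    # Write to a temporary file first, then swap it in, so readers never
    # observe a half-written partition.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    with os.fdopen(fd, "w") as handle:
        json.dump(rows, handle)
    os.replace(tmp_name, target)  # atomic when source and target share a filesystem
    return target
```

Running the task twice for the same date leaves exactly one file with the same content, which is the property that makes automatic retries safe.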
Avoid task-level computation: Operators should submit work (to Spark, dbt, etc.), not do the work themselves in the Airflow process. A PythonOperator that processes 10GB of data in the worker will kill your Airflow workers and starve other tasks.
Common Mistakes
# WRONG - brittle cron-based timing
# Feature job runs at 3 AM, hoping ingestion (2 AM) finishes
"0 3 * * *"  # What if ingestion takes 2 hours? Features run on stale data.
# RIGHT - sensor-based dependency
wait_for_ingestion = S3KeySensor(
    task_id="wait_for_ingestion_complete",
    bucket_name="meridian-datalake",  # needed because bucket_key is a relative key
    bucket_key="raw/{{ ds }}/_SUCCESS",
    mode="reschedule",
)
# WRONG - pushes a 500MB DataFrame through XCom
def compute_features(**context):
    df = spark_compute()
    context["ti"].xcom_push(key="features", value=df.toPandas().to_dict())  # Kills metadata DB!
# RIGHT - push only the path
def compute_features(**context):
    df = spark_compute()
    # Deterministic, date-partitioned path keeps retries and backfills idempotent
    path = f"s3://features/{context['ds']}/features.parquet"
    df.write.mode("overwrite").parquet(path)
    context["ti"].xcom_push(key="feature_path", value=path)
catchup=True on ML pipelines: Enabling catchup on a DAG that was paused for a month causes 30 consecutive runs to fire immediately. For ML training pipelines, this floods your training infrastructure and produces 30 model versions nobody needs. Always set catchup=False for ML pipelines. Handle backfill explicitly with airflow dags backfill when needed.
max_active_runs=1: Without this, if a run takes longer than its schedule interval, the next scheduled run fires while the previous is still running. Two concurrent feature pipelines writing to the same partitioned table will produce corrupted or duplicated features. Always set max_active_runs=1 for pipelines that write to shared storage.
Interview Q&A
Q: What is the difference between CeleryExecutor and KubernetesExecutor?
A: CeleryExecutor distributes tasks to a pool of long-running worker processes via a message queue (Redis or RabbitMQ). Workers stay running between tasks - lower per-task overhead (milliseconds to start a task). You must manage the worker fleet: provision machines, handle failures, scale up/down manually or via autoscaling. Good for high-frequency tasks where startup cost matters.
KubernetesExecutor launches each task as a fresh Kubernetes pod. No persistent workers - Kubernetes creates and destroys pods per task. Higher per-task overhead (15-30 seconds for pod startup). Advantages: perfect task isolation (no shared state between tasks), natural resource heterogeneity (each task can request different CPU/memory/GPU), automatic cleanup, and no worker fleet to manage. Best for ML workloads where tasks need different environments and isolation matters more than startup time.
Q: How do you handle dependencies between DAGs in Airflow?
A: Three approaches, depending on coupling preference:
- ExternalTaskSensor: The downstream DAG polls for upstream task completion. Polling interval configurable. Requires both DAGs to run on the same Airflow instance. Tight coupling - upstream DAG ID and task ID are hardcoded.
- Dataset-based triggers (Airflow 2.4+): The upstream DAG declares it produces a dataset; the downstream DAG declares it consumes it. Airflow triggers the consumer automatically when the producer completes. Looser coupling via dataset URI.
- S3/GCS sentinel file: The upstream DAG writes a _SUCCESS file; the downstream DAG uses S3KeySensor. Fully decoupled - works across Airflow instances or even different orchestrators.
For production ML pipelines, I prefer the sentinel file approach because it works across system boundaries and is testable independently of Airflow.
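The sentinel-file handshake can be sketched without Airflow at all, which is part of its appeal. The example below uses a local temporary directory as a stand-in for an S3 or GCS prefix. The helper names publish and wait_for_success are hypothetical, and the polling loop only imitates what a sensor does.

```python
import tempfile
import time
from pathlib import Path


def publish(output_dir: Path, rows: list[str]) -> None:
    """Write the data first, then the marker, so the marker implies complete data."""
    output_dir.mkdir(parents=True, exist_ok=True)
    (output_dir / "part-0000.csv").write_text("\n".join(rows))
    (output_dir / "_SUCCESS").touch()  # written last, on purpose


def wait_for_success(output_dir: Path, timeout_s: float = 5.0, poke_s: float = 0.1) -> bool:
    """Poll for the marker until it appears or the timeout elapses."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if (output_dir / "_SUCCESS").exists():
            return True
        time.sleep(poke_s)
    return False


with tempfile.TemporaryDirectory() as tmp:
    partition = Path(tmp) / "features" / "2024-01-15"
    # Upstream has not published yet, so the short wait times out.
    print(wait_for_success(partition, timeout_s=0.3))
    publish(partition, ["vehicle_id,avg_speed", "42,61.5"])
    print(wait_for_success(partition))
```

The ordering inside publish is the important design choice: because the marker is written only after the data, a consumer that sees _SUCCESS never reads a half-written partition.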
Q: What is XCom and what are its limitations for ML pipelines?
A: XCom (cross-communication) is Airflow's mechanism for passing small values between tasks within the same DAG run. Values are stored as serialized blobs in the Airflow metadata DB. Tasks push values with ti.xcom_push(key, value) and pull with ti.xcom_pull(task_ids, key).
The critical limitation: XCom stores data in the metadata DB. This is fine for metadata (file paths, record counts, timestamps, flags). It is catastrophic for actual data. A task pushing a 100MB DataFrame bloats the DB, slows the scheduler, and can cause failures across all DAGs sharing that DB. The rule: XCom carries paths, not data. Write the actual data to object storage and push the path.
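One way to enforce the "paths, not data" rule is a size guard that every task calls before pushing. This is a sketch only: checked_xcom_value is a hypothetical helper, and the 48 KB limit is an assumed team convention, not an Airflow setting.

```python
import json

MAX_XCOM_BYTES = 48 * 1024  # hypothetical team limit, not an Airflow setting


def checked_xcom_value(value, max_bytes: int = MAX_XCOM_BYTES):
    """Return value unchanged if its JSON form is small enough, else raise."""
    size = len(json.dumps(value).encode("utf-8"))
    if size > max_bytes:
        raise ValueError(
            f"XCom payload is {size} bytes (limit {max_bytes}); "
            "write it to object storage and push the path instead"
        )
    return value


# Metadata such as a path and a row count passes the check.
print(checked_xcom_value(
    {"feature_path": "s3://features/2024-01-15/features.parquet", "rows": 40000}
))

# A data-sized payload is rejected before it reaches the metadata DB.
try:
    checked_xcom_value({"speeds": list(range(100_000))})
except ValueError as err:
    print(err)
```

In a task, the guarded value would then be passed to ti.xcom_push, so an oversized payload fails loudly in the offending task rather than silently degrading the scheduler for every DAG.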
Q: How do you implement an evaluation gate that prevents model deployment on bad metrics?
A: Use BranchPythonOperator. The evaluation task writes metrics to XCom. The gate task reads those metrics, applies threshold logic, and returns the task_id of either "deploy" or "flag_regression." Airflow skips all tasks not on the chosen branch. The final task uses trigger_rule="none_failed_min_one_success" so it runs regardless of which branch was chosen.
For stricter gates, I also add a human approval step before models go to production (for example, a @task.sensor that waits for a sign-off, or an external webhook). Automated metrics gates catch regressions; human review catches anything the metrics miss.
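The threshold logic of the gate is worth keeping as a plain function so it can be unit-tested without a scheduler. Below is a minimal sketch, assuming the metrics arrive as dictionaries with an "accuracy" key. The thresholds and the name choose_branch are hypothetical; "deploy" and "flag_regression" are the task_ids from the answer above.

```python
MIN_ACCURACY = 0.90    # hypothetical absolute floor
MAX_REGRESSION = 0.01  # hypothetical allowed drop versus the production model


def choose_branch(candidate: dict, production: dict) -> str:
    """Return the task_id to follow: "deploy" or "flag_regression"."""
    accuracy = candidate["accuracy"]
    if accuracy < MIN_ACCURACY:
        return "flag_regression"
    if production["accuracy"] - accuracy > MAX_REGRESSION:
        return "flag_regression"
    return "deploy"


print(choose_branch({"accuracy": 0.93}, {"accuracy": 0.91}))   # better than production
print(choose_branch({"accuracy": 0.78}, {"accuracy": 0.91}))   # below the floor
print(choose_branch({"accuracy": 0.905}, {"accuracy": 0.93}))  # regressed too far
```

Inside a DAG, the callable given to BranchPythonOperator would pull both metric dictionaries from XCom and return the result of this function. Checking against the production model as well as an absolute floor catches a candidate that clears the floor but is still worse than what is already deployed.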
Q: What are the advantages of Dagster's software-defined assets over Airflow DAGs?
A: Three key advantages:
First, data lineage is explicit and first-class. In Airflow, you infer what data a task produces by reading the code. In Dagster, assets are named objects - you can see the full lineage graph from raw source to ML-ready feature table in the UI, and understand the impact of upstream changes immediately.
Second, asset checks provide a native data quality framework integrated with the orchestration graph. Checks run after their asset materializes. If a check fails, downstream assets that depend on it are blocked. Airflow requires external tools (Great Expectations) and custom operator logic to achieve the same.
Third, partitions are a first-class concept. Running a backfill in Dagster for a specific date range is a UI or CLI operation with built-in support. In Airflow, backfilling a parameterized range requires using airflow dags backfill with careful start/end date management.
Q: How do you scale Airflow for 1000+ tasks per day?
A: Multiple dimensions of scaling:
Scheduler: Use Airflow 2.0+, which supports multiple schedulers in HA mode. Pre-2.0 Airflow ran a single scheduler, which was both a single point of failure and the first throughput bottleneck as task counts grew.
Executor: Switch from LocalExecutor to CeleryExecutor or KubernetesExecutor. Configure [core] parallelism to match worker capacity.
DAG parsing: Keep DAG files lightweight. Heavy work at import time (importing scikit-learn, making API calls) slows the scheduler's parse cycle. Move heavy imports inside the task functions so they load only when the task runs.
Metadata DB: Use PostgreSQL (not SQLite). Run connection pooling via PgBouncer. Archive old run data regularly.
Worker capacity: For CeleryExecutor, autoscale workers using AWS ASG or Kubernetes HPA. Set [celery] worker_concurrency based on task memory requirements.
Monitoring: At this scale, instrument Airflow with Prometheus/StatsD metrics. Alert on scheduler heartbeat lag, task queue depth, and executor utilization. Scheduler lag is the first sign of a scaling problem.
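The DAG-parsing advice is the cheapest of these to apply, so here is what it looks like in practice. This sketch uses the standard-library statistics module as a stand-in for a heavy dependency such as scikit-learn, and train_model is a hypothetical task callable.

```python
# Top level of a DAG file: keep this cheap, because the scheduler re-executes
# it on every parse cycle. No heavy imports, no network or database calls.


def train_model(values: list[float]) -> float:
    """Task callable: the heavy import happens only when the task runs."""
    import statistics  # stand-in for a heavy library such as scikit-learn

    return statistics.mean(values)


print(train_model([1.0, 2.0, 3.0]))
```

The import cost is now paid once per task execution on a worker, instead of on every scheduler parse of the file.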
Next lesson: Testing Data Pipelines - because a pipeline nobody trusts is a pipeline nobody uses.
