Orchestration Patterns for End-to-End ML Pipelines
The Pipeline That Turned a 3-Line Fix Into a 4-Hour Incident
A pipeline failed at step 7 of 12. The fix was a 3-line code change. Straightforward. But re-running from scratch took 4 hours because steps 1 through 6 were not idempotent. They wrote to the same output location every time they ran - no partitioning by run ID, no write-then-rename, just direct overwrites. When the pipeline failed at step 7 and was restarted, steps 1 through 6 re-ran from scratch and overwrote the data that step 8 was already reading from a parallel batch job.
The team ended up with a partially written table, a failed downstream model retraining, and a 4-hour backfill to recover the correct state. With idempotent writes and proper checkpoint patterns, re-running from step 7 would have been a 20-minute operation. The 3-line fix would have been deployed and production would have recovered in the time it took to file the incident ticket.
This is not a story about a bad engineer. It is a story about what happens when pipeline design ignores the properties that make pipelines reliable. Idempotency, checkpointing, sensible retry logic, and proper backfill strategies are not advanced techniques - they are the baseline. Understanding these patterns is the difference between a pipeline that recovers gracefully from failures and one that turns every bug fix into a multi-hour fire drill.
This lesson covers the orchestration patterns that matter most in production ML and data engineering pipelines: idempotency, checkpointing, dependency management, backfill strategies, fan-out/fan-in, retry logic, SLA management, pipeline testing, and deployment patterns.
Why These Patterns Exist - The Production Reality
A local development pipeline runs once, on your laptop, with your data, with you watching it. Production pipelines run unattended, on shared infrastructure, on data that changes character over time, with upstream failures that nobody controls. The gap between "it works on my machine" and "it works reliably in production at 2 AM with no one watching" is almost entirely explained by whether the pipeline was designed with these patterns in mind.
The specific failure modes that motivated each pattern:
- Idempotency - motivated by restarts. Every pipeline will be restarted. The question is whether a restart leaves data in a consistent state.
- Checkpointing - motivated by partial failures. A task that writes 80% of its output before crashing leaves the downstream consumer in a broken state. Atomic writes prevent this.
- Dataset-triggered scheduling - motivated by coupling. Hard-coded time-based schedules create invisible coupling: if upstream is late, downstream silently reads stale data.
- Backfill patterns - motivated by data gaps. Historical data needs to be re-processed. The question is whether the pipeline can do this efficiently without reprocessing everything every time.
- Fan-out/fan-in - motivated by throughput. Sequential processing of independent work is slower than necessary and makes failure recovery more expensive.
- Retry strategies - motivated by transient failures. Network timeouts, API rate limits, and resource contention are inevitable. Naive retries make them worse.
Idempotency - The Single Most Important Property
A function is idempotent if running it multiple times produces the same result as running it once. For a pipeline task, this means: if this task runs 10 times, the final state of the world is the same as if it ran once.
This sounds simple. In practice, it is violated constantly.
The formal definition from mathematics: a function $f$ is idempotent if $f(f(x)) = f(x)$ for every $x$ in its domain. For pipeline tasks, we want: running the task with the same inputs always produces the same outputs and the same side effects on persistent state.
The Append Anti-Pattern
# DANGEROUS - not idempotent
def load_daily_events(date: str, engine):
"""Load today's events into the events table."""
df = fetch_events_from_source(date)
# This appends on every run - re-runs create duplicate rows
df.to_sql("daily_events", engine, if_exists="append", index=False)
If this task runs twice - due to a retry, a manual re-run, or an orchestrator bug - you get duplicate rows. Your row counts double. Aggregates become wrong. Downstream models train on a corrupted dataset. The bug may not be detected for days.
The Idempotent Delete + Insert Pattern
# CORRECT - idempotent via delete+insert
def load_daily_events_idempotent(date: str, engine):
"""Load today's events idempotently via delete+insert within a transaction."""
df = fetch_events_from_source(date)
with engine.begin() as conn:
# Delete the partition for this date first
conn.execute(
sa.text("DELETE FROM daily_events WHERE event_date = :date"),
{"date": date}
)
# Then insert the fresh data
df.to_sql("daily_events", conn, if_exists="append", index=False)
# If either operation fails, the transaction rolls back.
# The table is never left in a partial state.
Now this task can run 10 times and the result is always: exactly one copy of today's events in the table.
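The difference between the two loaders can be reproduced without a warehouse. The following is a minimal sketch, assuming an in-memory SQLite database as a stand-in for the real engine; the helper names (`make_db`, `load_append`, `load_delete_insert`, `count_rows`) and the two-column schema are hypothetical and exist only for this illustration.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create an in-memory database with a simplified daily_events table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE daily_events (event_date TEXT, user_id TEXT)")
    return conn


def load_append(conn: sqlite3.Connection, event_date: str, user_ids: list) -> None:
    """Anti-pattern: every run adds another copy of the partition."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.executemany(
            "INSERT INTO daily_events (event_date, user_id) VALUES (?, ?)",
            [(event_date, user_id) for user_id in user_ids],
        )


def load_delete_insert(conn: sqlite3.Connection, event_date: str, user_ids: list) -> None:
    """Idempotent: replace the whole date partition inside one transaction."""
    with conn:
        conn.execute("DELETE FROM daily_events WHERE event_date = ?", (event_date,))
        conn.executemany(
            "INSERT INTO daily_events (event_date, user_id) VALUES (?, ?)",
            [(event_date, user_id) for user_id in user_ids],
        )


def count_rows(conn: sqlite3.Connection, event_date: str) -> int:
    """Count the rows stored for one date partition."""
    cursor = conn.execute(
        "SELECT COUNT(*) FROM daily_events WHERE event_date = ?", (event_date,)
    )
    return cursor.fetchone()[0]
```

Running `load_append` three times for the same date with two users leaves six rows, while three runs of `load_delete_insert` leave two. The row count after a re-run is the cheapest signal that a loader is not idempotent.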
The Upsert Pattern (for keyed tables)
When rows have a natural primary key and you want to update existing rows:
def upsert_user_features(features_df: pd.DataFrame, engine):
"""Upsert user features - update if exists, insert if new."""
with engine.begin() as conn:
# PostgreSQL upsert using ON CONFLICT
for _, row in features_df.iterrows():
conn.execute(sa.text("""
INSERT INTO user_features (user_id, event_count, last_active, feature_date)
VALUES (:user_id, :event_count, :last_active, :feature_date)
ON CONFLICT (user_id, feature_date)
DO UPDATE SET
event_count = EXCLUDED.event_count,
last_active = EXCLUDED.last_active
"""), row.to_dict())
For bulk operations, use INSERT ... ON CONFLICT with a staging table approach:
def bulk_upsert_features(features_df: pd.DataFrame, engine, target_table: str):
"""Efficient bulk upsert via staging table."""
staging_table = f"{target_table}_staging_{int(time.time())}"
with engine.begin() as conn:
# Write to a temp staging table
features_df.to_sql(staging_table, conn, if_exists="replace", index=False)
# Upsert from staging into target
conn.execute(sa.text(f"""
INSERT INTO {target_table}
SELECT * FROM {staging_table}
ON CONFLICT (user_id, feature_date)
DO UPDATE SET
event_count = EXCLUDED.event_count,
last_active = EXCLUDED.last_active
"""))
# Drop staging
conn.execute(sa.text(f"DROP TABLE {staging_table}"))
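One detail the upsert examples rely on: `ON CONFLICT (user_id, feature_date)` only works when the target table has a unique constraint or primary key on exactly those columns. The sketch below illustrates this with SQLite (which supports the same clause from version 3.24) in place of PostgreSQL; the table layout and the helper names `make_feature_db` and `upsert_features` are assumptions made for the example.

```python
import sqlite3


def make_feature_db() -> sqlite3.Connection:
    """In-memory table whose primary key matches the ON CONFLICT target."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE user_features (
            user_id      TEXT NOT NULL,
            feature_date TEXT NOT NULL,
            event_count  INTEGER NOT NULL,
            PRIMARY KEY (user_id, feature_date)
        )
        """
    )
    return conn


def upsert_features(conn: sqlite3.Connection, rows: list) -> None:
    """Insert new keys and overwrite existing ones in a single transaction."""
    with conn:
        conn.executemany(
            """
            INSERT INTO user_features (user_id, feature_date, event_count)
            VALUES (?, ?, ?)
            ON CONFLICT (user_id, feature_date)
            DO UPDATE SET event_count = excluded.event_count
            """,
            rows,
        )


def fetch_features(conn: sqlite3.Connection) -> list:
    """Return all rows in a stable order for inspection."""
    cursor = conn.execute(
        "SELECT user_id, feature_date, event_count FROM user_features "
        "ORDER BY user_id, feature_date"
    )
    return cursor.fetchall()
```

Calling `upsert_features` twice with the same rows leaves the table unchanged after the second call, and calling it with a changed `event_count` for an existing key updates that row instead of adding a new one.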
Testing Idempotency
Idempotency should be tested explicitly:
def test_load_daily_events_is_idempotent(test_engine, test_date):
    """Running the task twice should produce the same result as running it once."""
    # Bound parameter instead of string formatting; Engine.execute() no longer exists in SQLAlchemy 2.0
    count_query = sa.text(
        "SELECT COUNT(*) FROM daily_events WHERE event_date = :date"
    )

    load_daily_events_idempotent(test_date, test_engine)
    with test_engine.connect() as conn:
        count_after_first_run = conn.execute(count_query, {"date": test_date}).scalar()

    load_daily_events_idempotent(test_date, test_engine)
    with test_engine.connect() as conn:
        count_after_second_run = conn.execute(count_query, {"date": test_date}).scalar()

    assert count_after_first_run == count_after_second_run, (
        f"Task is not idempotent: {count_after_first_run} rows after first run, "
        f"{count_after_second_run} after second run"
    )
The Checkpoint Pattern - Atomic Writes
Even an idempotent task can cause problems if it fails mid-write and a downstream consumer reads the partial output. The checkpoint pattern prevents this by separating the write into two phases: write to a staging location, then atomically rename/move to the final location.
A downstream consumer reading s3://bucket/features/today/part-0.parquet will either see the complete output or see that the file doesn't exist yet. It will never see a partially written file.
import boto3
import io
import pandas as pd
def write_features_atomically(features_df: pd.DataFrame, date: str):
"""
Write features to S3 atomically using staging + rename pattern.
Downstream consumers never see partial writes.
"""
s3 = boto3.client("s3")
bucket = "my-company-data"
# Write to staging path
staging_key = f"features/_staging/{date}/part-0.parquet"
final_key = f"features/{date}/part-0.parquet"
buffer = io.BytesIO()
features_df.to_parquet(buffer, index=False)
buffer.seek(0)
s3.put_object(
Bucket=bucket,
Key=staging_key,
Body=buffer.getvalue()
)
# Publish step: S3 has no rename, so copy the staged object to its final key, then delete the staged copy.
# The copy shows up at the final key as a complete object; readers never see a half-written one.
s3.copy_object(
CopySource={"Bucket": bucket, "Key": staging_key},
Bucket=bucket,
Key=final_key
)
s3.delete_object(Bucket=bucket, Key=staging_key)
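On a local or network filesystem the same two-phase idea maps onto write-then-rename. The sketch below is one possible version, assuming a POSIX-style filesystem where `os.replace` is atomic as long as the staging file and the final file live on the same filesystem; the function name `write_atomically` is hypothetical.

```python
import os
import tempfile


def write_atomically(final_path: str, payload: bytes) -> None:
    """Write payload to a staging file, then rename it over final_path."""
    directory = os.path.dirname(os.path.abspath(final_path))
    os.makedirs(directory, exist_ok=True)
    # Stage in the same directory so the rename never crosses filesystems
    fd, staging_path = tempfile.mkstemp(dir=directory, prefix=".staging-")
    try:
        with os.fdopen(fd, "wb") as staging_file:
            staging_file.write(payload)
            staging_file.flush()
            os.fsync(staging_file.fileno())  # push the bytes to disk before publishing
        # Readers see either the previous file or the complete new one
        os.replace(staging_path, final_path)
    except BaseException:
        # Clean up the staging file if anything failed before the rename
        if os.path.exists(staging_path):
            os.remove(staging_path)
        raise
```

A crash before `os.replace` leaves at most a stray `.staging-*` file, which readers ignore because they only look at the final path.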
For database writes, use transactions:
def write_aggregates_atomically(agg_df: pd.DataFrame, date: str, engine):
"""
Use a transaction to ensure the delete+insert is atomic.
Readers never see the table with zero rows between delete and insert.
"""
with engine.begin() as conn:
conn.execute(
sa.text("DELETE FROM daily_aggregates WHERE agg_date = :date"),
{"date": date}
)
agg_df.to_sql("daily_aggregates", conn, if_exists="append", index=False)
# Transaction commits here. If anything above fails, it rolls back.
Dependency Management Patterns
ExternalTaskSensor - Waiting for Another DAG
When DAG A produces data that DAG B needs, you can use an ExternalTaskSensor to make DAG B wait until a specific task in DAG A succeeds:
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
with DAG(
dag_id="model_training",
schedule_interval="0 6 * * *",
start_date=datetime(2024, 1, 1),
) as dag:
wait_for_features = ExternalTaskSensor(
task_id="wait_for_feature_engineering",
external_dag_id="feature_engineering",
external_task_id="compute_user_features",
execution_delta=timedelta(hours=4), # upstream logical date is 4 hours earlier (assumes feature_engineering is scheduled at 02:00)
mode="reschedule", # don't block a worker slot while waiting
timeout=3600, # fail if features not ready within 1 hour
poke_interval=60, # check every minute
)
train_model = PythonOperator(
task_id="train_model",
python_callable=run_model_training,
)
wait_for_features >> train_model
Dataset-Triggered Scheduling (Airflow 2.4+)
A more modern approach: declare datasets as outputs of DAGs, and trigger downstream DAGs when datasets are updated. This replaces time-based coupling with data-based coupling.
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator
user_features_dataset = Dataset("s3://my-bucket/features/user_features/")
# Upstream DAG: declares that it produces the user_features dataset
with DAG(
dag_id="feature_engineering",
schedule_interval="0 2 * * *",
) as upstream_dag:
compute_features = PythonOperator(
task_id="compute_user_features",
python_callable=compute_user_features,
outlets=[user_features_dataset], # declares this task updates the dataset
)
# Downstream DAG: triggers whenever user_features_dataset is updated
with DAG(
dag_id="model_training",
schedule=[user_features_dataset], # dataset-based schedule
) as downstream_dag:
train = PythonOperator(
task_id="train_model",
python_callable=train_model,
)
Now model training triggers automatically whenever feature engineering completes - not on a fixed clock schedule. If feature engineering runs late, model training waits. If feature engineering runs early, model training starts early.
Event-Driven Triggering via REST API
For triggering from external systems - S3 events, Kafka messages, database CDC events:
import boto3
import json
import requests
def trigger_dag_on_s3_event(event: dict, context):
"""
Lambda function that triggers an Airflow DAG when a new file lands in S3.
Deploy this as an S3 event notification target.
"""
s3_key = event["Records"][0]["s3"]["object"]["key"]
bucket = event["Records"][0]["s3"]["bucket"]["name"]
airflow_url = "https://airflow.internal/api/v1/dags/feature_engineering/dagRuns"
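# Placeholder credentials for illustration only: load real ones from a secrets manager or environment variables
airflow_auth = ("admin", "secure_password")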
response = requests.post(
airflow_url,
auth=airflow_auth,
json={
"conf": {
"source_bucket": bucket,
"source_key": s3_key,
"triggered_by": "s3_event",
}
},
timeout=10, # without a timeout, requests can wait indefinitely and stall the Lambda
)
if response.status_code not in (200, 409): # 409 = a run with this run ID or logical date already exists, acceptable
raise RuntimeError(
f"Failed to trigger DAG: {response.status_code} {response.text}"
)
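S3 event notifications can be delivered more than once, so the trigger itself benefits from being idempotent. One way to get there, sketched below, is to derive the run ID from the bucket and key and send it as `dag_run_id` in the request body, so a duplicate event is rejected with the 409 that the handler already tolerates. The helper name `build_trigger_payload` and the `s3_event__` prefix are hypothetical, and the sketch assumes that a second upload to the same key should not start a second run.

```python
import hashlib


def build_trigger_payload(bucket: str, key: str) -> dict:
    """Build a DAG-run request body whose run ID is stable for a given object."""
    # Same bucket and key always hash to the same run ID
    digest = hashlib.sha256(f"{bucket}/{key}".encode("utf-8")).hexdigest()[:16]
    return {
        "dag_run_id": f"s3_event__{digest}",
        "conf": {
            "source_bucket": bucket,
            "source_key": key,
            "triggered_by": "s3_event",
        },
    }
```

If overwriting an object should trigger reprocessing, the object's version ID or ETag could be added to the hashed string so each new version gets its own run ID.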
Backfill Patterns - Processing Historical Data
Backfills are necessary when you fix a bug (re-process affected dates), add a new feature (compute it for all historical dates), or onboard a new data source (populate it historically). Three patterns cover most needs.
Full Backfill - Re-Process Everything
The simplest approach. Re-run the pipeline for every date from the start date to today.
from datetime import date, timedelta
from airflow.models import DAG
from airflow.operators.python import PythonOperator
def run_full_backfill(start_date: date, end_date: date):
"""
Trigger backfill runs for every date in the range.
Use Airflow's backfill CLI for orchestrated re-runs:
airflow dags backfill feature_engineering
--start-date 2024-01-01
--end-date 2024-03-01
--reset-dagruns
"""
current = start_date
while current <= end_date:
trigger_dag_run(dag_id="feature_engineering", execution_date=current)
current += timedelta(days=1)
When to use: bug affected all dates, or data volume is small enough that full re-processing is cheap.
When NOT to use: pipeline processes petabytes of data, re-processing would take weeks and exceed your SLA.
Incremental Backfill - Watermark-Based
Track the last successfully processed position and re-process only what's new or changed.
import sqlalchemy as sa
from datetime import date, timedelta
class WatermarkBackfill:
"""
Track the last processed date in a watermark table.
Incremental backfills re-process only dates after the watermark.
"""
def __init__(self, engine, pipeline_name: str):
self.engine = engine
self.pipeline_name = pipeline_name
def get_watermark(self) -> date:
with self.engine.connect() as conn:
result = conn.execute(sa.text("""
SELECT MAX(processed_date)
FROM pipeline_watermarks
WHERE pipeline_name = :name
"""), {"name": self.pipeline_name}).scalar()
return result or date(2020, 1, 1)
def update_watermark(self, processed_date: date):
with self.engine.begin() as conn:
conn.execute(sa.text("""
INSERT INTO pipeline_watermarks (pipeline_name, processed_date)
VALUES (:name, :date)
ON CONFLICT (pipeline_name)
DO UPDATE SET processed_date = EXCLUDED.processed_date
"""), {"name": self.pipeline_name, "date": processed_date})
def get_dates_to_process(self, end_date: date) -> list[date]:
watermark = self.get_watermark()
dates = []
current = watermark + timedelta(days=1)
while current <= end_date:
dates.append(current)
current += timedelta(days=1)
return dates
When to use: large historical datasets, re-processing only new data is sufficient, no retroactive corrections needed.
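The ordering of "process" and "advance the watermark" is what makes this pattern safe to restart. The sketch below keeps the state in memory to show the rule: the watermark moves only after a date has succeeded. The function names `dates_after` and `run_incremental`, and the `process_date` and `save_watermark` callbacks, are hypothetical stand-ins for the pipeline step and for `update_watermark`.

```python
from datetime import date, timedelta
from typing import Callable


def dates_after(watermark: date, end_date: date) -> list:
    """Every date strictly after the watermark, up to and including end_date."""
    days = (end_date - watermark).days
    return [watermark + timedelta(days=offset) for offset in range(1, days + 1)]


def run_incremental(
    watermark: date,
    end_date: date,
    process_date: Callable[[date], None],
    save_watermark: Callable[[date], None],
) -> None:
    """Process pending dates in order, advancing the watermark after each success."""
    for day in dates_after(watermark, end_date):
        process_date(day)    # raises on failure, leaving the watermark untouched
        save_watermark(day)  # reached only when the date was processed successfully
```

If processing fails on the third of five pending dates, the saved watermark stays at the second date, so the next run resumes from the failed date rather than skipping it.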
Selective Backfill - Specific Date Ranges
When a bug affected only a known range of dates, re-process only those dates.
import time
from airflow.api.client.local_client import Client
def selective_backfill(
dag_id: str,
affected_dates: list[str],
reason: str,
) -> None:
"""
Trigger re-runs for specific affected dates.
Useful when a bug has a known impact window.
Example:
selective_backfill(
dag_id="feature_engineering",
affected_dates=["2024-02-14", "2024-02-15", "2024-02-16"],
reason="TZ bug caused off-by-one in daily aggregates"
)
"""
client = Client(None, None)
for date_str in affected_dates:
print(f"Triggering backfill for {date_str}: {reason}")
client.trigger_dag(
dag_id=dag_id,
run_id=f"selective_backfill_{date_str}_{int(time.time())}",
conf={
"backfill_date": date_str,
"backfill_reason": reason,
},
execution_date=date_str,
)
time.sleep(2) # Avoid overwhelming the scheduler
Fan-Out / Fan-In - Parallel Processing
Independent work should run in parallel. When a pipeline trains 10 models for 10 user segments, training them one after another takes roughly 10 times the wall-clock time of training them in parallel, assuming similar training times and enough workers to run all 10 at once.
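Stripped of any orchestrator, fan-out/fan-in is a map over independent inputs followed by a reduce. The sketch below shows the shape with the standard library, assuming a placeholder `train_segment` function that stands in for real training. Threads are used only to keep the example small; CPU-bound training would more likely run in separate processes or on separate workers.

```python
from concurrent.futures import ThreadPoolExecutor


def train_segment(segment: str) -> dict:
    """Hypothetical stand-in for training one segment's model."""
    return {"segment": segment, "model_path": f"models/{segment}/latest.pkl"}


def fan_out_fan_in(segments: list, max_workers: int = 4) -> dict:
    """Run train_segment for every segment concurrently, then collect the results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Fan-out: one call per segment; map() returns results in input order
        results = list(pool.map(train_segment, segments))
    # Fan-in: aggregate the per-segment results into one structure
    return {result["segment"]: result["model_path"] for result in results}
```

If any call raises, `pool.map` re-raises that exception while the results are being collected, so a failed segment fails the fan-in step instead of being silently dropped.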
Dynamic Task Mapping in Airflow
Airflow 2.3+ supports dynamic task mapping - create task instances at runtime based on a list of inputs.
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule_interval="0 6 * * *", start_date=datetime(2024, 1, 1))
def parallel_model_training():
@task
def get_segments() -> list[dict]:
"""Return the list of segments to train models for."""
return [
{"segment": "high_value", "feature_table": "features_high_value"},
{"segment": "mid_value", "feature_table": "features_mid_value"},
{"segment": "new_users", "feature_table": "features_new_users"},
{"segment": "churned", "feature_table": "features_churned"},
{"segment": "dormant", "feature_table": "features_dormant"},
]
@task
def train_segment_model(segment_config: dict) -> dict:
"""Train a model for a single segment. Runs in parallel across all segments."""
segment = segment_config["segment"]
features_table = segment_config["feature_table"]
# Load features for this segment
features = load_features(features_table)
model, metrics = train_model(features)
# Save model artifact
model_path = f"s3://models/{segment}/latest.pkl"
save_model(model, model_path)
return {
"segment": segment,
"model_path": model_path,
"accuracy": metrics["accuracy"],
"auc": metrics["auc"],
}
@task
def aggregate_results(training_results: list[dict]) -> None:
"""Collect results from all parallel training runs."""
import pandas as pd
results_df = pd.DataFrame(training_results)
print(f"Trained {len(results_df)} segment models")
print(results_df[["segment", "accuracy", "auc"]].to_string())
# Update model registry
register_all_models(results_df)
segments = get_segments()
# .expand() creates one task instance per segment - all run in parallel
training_results = train_segment_model.expand(segment_config=segments)
aggregate_results(training_results)
dag = parallel_model_training()
Parallel Tasks in Prefect
from prefect import flow, task
from prefect.futures import PrefectFuture
@task
def train_segment_model(segment: str, feature_table: str) -> dict:
features = load_features(feature_table)
model, metrics = train_model(features)
return {"segment": segment, "metrics": metrics}
@flow
def parallel_training_flow(segments: list[dict]):
# Submit all tasks - they run concurrently
futures: list[PrefectFuture] = [
train_segment_model.submit(
segment=seg["segment"],
feature_table=seg["feature_table"]
)
for seg in segments
]
# Collect results - waits for all futures to complete
results = [future.result() for future in futures]
aggregate_and_register(results)
Retry Strategies - Handling Transient Failures
Retrying blindly on all failures is as dangerous as not retrying at all. A logic error that generates a corrupt output should not be retried 10 times, writing corrupt data 10 times. Retry strategy must be targeted and bounded.
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from datetime import timedelta
import time
import random
def exponential_backoff_retry(func, max_retries: int = 5, base_delay: float = 1.0):
"""
Retry with exponential backoff and jitter.
Retry only on transient errors (network, rate limits, resource contention).
"""
retryable_exceptions = (
ConnectionError,
TimeoutError,
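# IOError is deliberately left out: it is an alias of OSError, which also covers FileNotFoundError and PermissionError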
# Add specific API exceptions here
)
for attempt in range(max_retries):
try:
return func()
except retryable_exceptions as e:
if attempt == max_retries - 1:
raise # Final attempt - re-raise the exception
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
time.sleep(delay)
except Exception as e:
# Non-retryable error - fail immediately, don't retry
print(f"Non-retryable error: {type(e).__name__}: {e}")
raise
# In Airflow, configure retry at the task level
with DAG("my_pipeline", ...) as dag:
load_data = PythonOperator(
task_id="load_data",
python_callable=load_data_function,
retries=3,
retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True, # Doubles the delay on each retry
max_retry_delay=timedelta(minutes=30),
on_retry_callback=send_slack_retry_alert,
on_failure_callback=send_pagerduty_alert,
)
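Retry logic is easier to trust when it can be tested without real waiting. The sketch below is one way to do that, assuming a hypothetical `TransientError` marker class and an injectable `sleep` function; jitter is left out so the delay schedule is deterministic, and a cap keeps the delay bounded.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 30.0) -> float:
    """Delay before the next attempt: base * 2^attempt, capped at max_delay."""
    return min(max_delay, base_delay * 2 ** attempt)


def retry_transient(
    func: Callable[[], T],
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry func on TransientError only; any other exception propagates at once."""
    for attempt in range(max_attempts):
        try:
            return func()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last transient error
            sleep(backoff_delay(attempt))
    raise ValueError("max_attempts must be at least 1")
```

Passing `sleep=recorded.append` in a test captures the delays instead of waiting: a call that succeeds on its fourth attempt records `[1.0, 2.0, 4.0]`.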
Distinguish retryable from non-retryable failures:
| Error Type | Retry? | Strategy |
|---|---|---|
| Network timeout | Yes | Exponential backoff, up to 5 retries |
| API rate limit (429) | Yes | Exponential backoff, respect Retry-After header |
| Resource temporarily unavailable | Yes | Short delay, up to 3 retries |
| Authentication failure (401) | No | Fail immediately, alert |
| Schema mismatch | No | Fail immediately, alert |
| Data validation failure | No | Route to dead letter, alert |
| Disk full | No | Fail immediately, page on-call |
import time
import requests

def smart_retry(func, context: dict):
    """
    Retry with error-type-aware logic. Each retryable error gets one more attempt.
    """
    try:
        return func()
    except requests.exceptions.HTTPError as e:
        status = e.response.status_code
        if status == 429:
            # Rate limited - wait for Retry-After when it is given in seconds, else 60s
            retry_after = e.response.headers.get("Retry-After", "")
            time.sleep(int(retry_after) if retry_after.isdigit() else 60)
            return func()
        if status == 401:
            raise RuntimeError(
                "Authentication failed - check API credentials. Not retrying."
            ) from e
        raise
    except (requests.exceptions.ConnectionError, requests.exceptions.Timeout,
            ConnectionError, TimeoutError):
        # requests raises its own exception classes, not the built-in ones
        time.sleep(30)
        return func()  # One delayed retry for transient network failures
SLA Management - Defining and Enforcing Pipeline SLAs
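An SLA is a commitment about when a pipeline will complete. "The daily features will be available by 6 AM" is an SLA. Airflow 2.x supports SLA monitoring natively through the per-task sla argument and a DAG-level sla_miss_callback; the feature was removed in Airflow 3, so the example below applies to 2.x deployments.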
from airflow import DAG
from airflow.models import TaskInstance
from datetime import timedelta, datetime
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
"""Called when a task misses its SLA. Send a Slack alert."""
import requests
sla_tasks = [sla.task_id for sla in slas]
message = (
f":warning: *SLA Miss* in DAG `{dag.dag_id}`\n"
f"Tasks that missed SLA: {', '.join(sla_tasks)}\n"
f"Logical date of the missed run: {slas[0].execution_date}"
)
requests.post(
url="https://hooks.slack.com/services/T00000/B00000/xxxxx",
json={"text": message},
)
with DAG(
dag_id="feature_engineering",
sla_miss_callback=sla_miss_callback,
schedule_interval="0 2 * * *",
) as dag:
compute_features = PythonOperator(
task_id="compute_user_features",
python_callable=compute_features_fn,
sla=timedelta(hours=3), # Must complete within 3 hours of scheduled start
)
upload_features = PythonOperator(
task_id="upload_to_feature_store",
python_callable=upload_features_fn,
sla=timedelta(hours=4), # Total pipeline must finish within 4 hours
)
SLA compliance dashboard - track SLA compliance over time and surface degradation trends before they become incidents:
import sqlalchemy as sa
from datetime import date, timedelta
def compute_sla_compliance(
    engine,
    dag_id: str,
    lookback_days: int = 30
) -> dict:
    """
    Compute SLA compliance rate for a DAG over the last N days.
    """
    with engine.connect() as conn:
        result = conn.execute(sa.text("""
            SELECT
                COUNT(*) AS total_runs,
                SUM(CASE WHEN duration_seconds <= sla_seconds THEN 1 ELSE 0 END) AS on_time_runs,
                AVG(duration_seconds) AS avg_duration_seconds,
                PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration_seconds) AS p95_duration
            FROM pipeline_run_metrics
            WHERE dag_id = :dag_id
              -- Bind the day count as a value; a parameter inside a quoted literal is not reliably bound
              AND run_date >= CURRENT_DATE - CAST(:days AS integer) * INTERVAL '1 day'
        """), {"dag_id": dag_id, "days": lookback_days}).fetchone()
    if not result.total_runs:
        # No runs in the window: the aggregates are NULL, so avoid dividing by zero
        return {"total_runs": 0, "on_time_runs": 0, "compliance_rate": None,
                "avg_duration_minutes": None, "p95_duration_minutes": None}
    return {
        "total_runs": result.total_runs,
        "on_time_runs": result.on_time_runs,
        "compliance_rate": result.on_time_runs / result.total_runs,
        "avg_duration_minutes": result.avg_duration_seconds / 60,
        "p95_duration_minutes": result.p95_duration / 60,
    }
Pipeline Testing - Three Levels
Level 1: Unit Tests - test transformation logic in isolation, no infrastructure:
import pytest
import pandas as pd
from my_pipeline.transforms import compute_user_features
def test_compute_user_features_aggregates_correctly():
events = pd.DataFrame({
"user_id": ["u1", "u1", "u2", "u2", "u2"],
"event_id": range(5),
"amount": [10, 20, 5, 15, 25],
"timestamp": pd.date_range("2024-01-01", periods=5, freq="H"),
})
features = compute_user_features(events)
u1_features = features[features["user_id"] == "u1"].iloc[0]
assert u1_features["event_count"] == 2
assert u1_features["total_spend"] == 30.0
u2_features = features[features["user_id"] == "u2"].iloc[0]
assert u2_features["event_count"] == 3
assert u2_features["total_spend"] == 45.0
Level 2: Integration Tests - test the full pipeline with synthetic data and real infrastructure (test database, test S3 bucket):
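import subprocess
import time
import sqlalchemy as sa

@pytest.fixture
def test_database():
    """Spin up a PostgreSQL test database via Docker."""
    container_id = subprocess.check_output([
        "docker", "run", "-d",
        "-e", "POSTGRES_PASSWORD=test",
        "-p", "5433:5432",
        "postgres:16"
    ]).decode().strip()
    engine = sa.create_engine("postgresql://postgres:test@localhost:5433/postgres")
    try:
        # Poll until the server accepts connections instead of sleeping a fixed time
        for _ in range(30):
            try:
                with engine.connect():
                    break
            except sa.exc.OperationalError:
                time.sleep(1)
        else:
            raise RuntimeError("PostgreSQL container did not become ready in 30s")
        yield engine
    finally:
        # Runs even if startup or the test fails, so containers are not leaked
        engine.dispose()
        subprocess.run(["docker", "rm", "-f", container_id])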
def test_full_pipeline_integration(test_database):
"""Run the full pipeline with synthetic data. Verify output correctness."""
# Seed input data
seed_test_events(test_database, n_users=100, n_events_per_user=20)
# Run the pipeline
run_pipeline(test_database, execution_date="2024-01-15")
# Verify output
with test_database.connect() as conn:
result = conn.execute(
sa.text("SELECT COUNT(*) FROM user_features WHERE feature_date = '2024-01-15'")
).scalar()
assert result == 100, f"Expected 100 user feature rows, got {result}"
Level 3: Shadow Mode - run the new pipeline in parallel with the old one and compare outputs:
def run_shadow_comparison(date: str, engine):
"""
Run old and new pipeline in parallel.
Log any output differences for investigation.
Don't use new pipeline output until comparison passes.
"""
# Run both pipelines
old_output = run_old_pipeline(date, engine)
new_output = run_new_pipeline(date, engine)
# Compare outputs
merged = old_output.merge(
new_output,
on="user_id",
suffixes=("_old", "_new"),
how="outer"
)
# .ne() treats a missing value as a mismatch, so rows present in only one output are flagged too
discrepancies = merged[
merged["event_count_old"].ne(merged["event_count_new"])
]
if len(discrepancies) > 0:
discrepancy_rate = len(discrepancies) / len(merged)
log.warning(
f"Shadow comparison: {len(discrepancies)} discrepancies "
f"({discrepancy_rate:.1%}) for date {date}"
)
# Write discrepancies to investigation table
discrepancies.to_sql(
"pipeline_shadow_discrepancies", engine,
if_exists="append", index=False
)
else:
log.info(f"Shadow comparison: no discrepancies for date {date}. New pipeline validated.")
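A shadow comparison has to treat "row missing from one side" as a discrepancy, not only "values differ". The sketch below shows that rule on plain dictionaries keyed by `user_id`; the function name `compare_outputs` and the dictionary representation are assumptions made to keep the example free of external dependencies.

```python
def compare_outputs(old: dict, new: dict) -> dict:
    """Return {user_id: (old_value, new_value)} for every user that differs."""
    discrepancies = {}
    # Union of keys, so users present in only one output are checked too
    for user_id in sorted(old.keys() | new.keys()):
        old_value = old.get(user_id)  # None when the old pipeline has no row
        new_value = new.get(user_id)  # None when the new pipeline has no row
        if old_value != new_value:
            discrepancies[user_id] = (old_value, new_value)
    return discrepancies
```

With `old = {"u1": 2, "u2": 3, "u3": 1}` and `new = {"u1": 2, "u2": 4, "u4": 7}`, the result flags `u2` (changed value), `u3` (dropped by the new pipeline), and `u4` (only in the new pipeline).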
Blue/Green Pipeline Deployment
Run the old and new pipeline simultaneously, routing production traffic to the old pipeline until the new one is validated.
from enum import Enum
class PipelineVersion(Enum):
BLUE = "blue" # Current production
GREEN = "green" # Candidate
def get_active_version() -> PipelineVersion:
"""Read the active pipeline version from a feature flag store."""
import redis
r = redis.Redis(host="flags.internal", port=6379)
version = r.get("active_pipeline_version")
return PipelineVersion(version.decode() if version else "blue")
def run_pipeline_for_date(date: str, engine):
"""Route to the active pipeline version."""
version = get_active_version()
if version == PipelineVersion.BLUE:
run_blue_pipeline(date, engine)
else:
run_green_pipeline(date, engine)
def promote_to_production():
"""Cut over to the green pipeline after validation."""
import redis
r = redis.Redis(host="flags.internal", port=6379)
r.set("active_pipeline_version", "green")
log.info("Promoted green pipeline to production")
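Because the cut-over is a single flag write, rollback is the same operation in reverse. The sketch below illustrates this with an in-memory stand-in for the flag store; `InMemoryFlagStore`, `active_version`, `promote`, `rollback`, and `run_for_date` are hypothetical names, and a real deployment would back the store with Redis or a feature-flag service.

```python
from typing import Callable, Optional

FLAG_KEY = "active_pipeline_version"


class InMemoryFlagStore:
    """Hypothetical stand-in for the flag store, used only for illustration."""

    def __init__(self) -> None:
        self._values = {}

    def get(self, key: str) -> Optional[str]:
        return self._values.get(key)

    def set(self, key: str, value: str) -> None:
        self._values[key] = value


def active_version(store: InMemoryFlagStore) -> str:
    """Default to blue when the flag has never been set."""
    return store.get(FLAG_KEY) or "blue"


def promote(store: InMemoryFlagStore) -> None:
    """Route production to the green pipeline."""
    store.set(FLAG_KEY, "green")


def rollback(store: InMemoryFlagStore) -> None:
    """Route production back to the blue pipeline."""
    store.set(FLAG_KEY, "blue")


def run_for_date(store: InMemoryFlagStore, run_date: str,
                 pipelines: dict) -> str:
    """Dispatch to whichever pipeline the flag currently selects."""
    pipeline: Callable[[str], str] = pipelines[active_version(store)]
    return pipeline(run_date)
```

Neither `promote` nor `rollback` touches pipeline code or data, which is why a rollback can take effect on the very next run.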
Common Mistakes
:::danger Append-Only Writes Without a Delete-First Step
Using if_exists="append" in pandas or an INSERT without a prior DELETE creates duplicate rows on every re-run. Duplicate rows in feature tables propagate into model training and degrade model quality in ways that are hard to debug. Always use delete+insert within a transaction, or use upsert with ON CONFLICT.
:::
:::danger Retrying on Non-Retryable Errors
A data validation failure should not be retried - the data is wrong, and retrying won't fix it. Retrying blindly on all exceptions masks root causes and can write corrupt output multiple times. Classify exceptions explicitly and only retry transient failures.
:::
:::warning Hard-Coding Execution Dates as datetime.now()
Using datetime.now() inside a pipeline task breaks backfills. When you re-run a task for 2024-01-15, datetime.now() returns today's date, not 2024-01-15. Always pass the execution date as a parameter from the orchestrator: context["ds"] in Airflow, context.partition_key in Dagster.
:::
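The difference is easiest to see in how an output path is built. The sketch below contrasts a clock-based path with one derived from the logical date passed in by the orchestrator; both function names are hypothetical, and the path layout follows the `features/{date}/part-0.parquet` convention used earlier in this lesson.

```python
from datetime import date


def partition_path_from_clock() -> str:
    """Anti-pattern: the path depends on the day the code happens to run."""
    return f"features/{date.today().isoformat()}/part-0.parquet"


def partition_path(logical_date: date) -> str:
    """Backfill-safe: the path depends only on the date being processed."""
    return f"features/{logical_date.isoformat()}/part-0.parquet"
```

Re-running the task for 2024-01-15 with `partition_path(date(2024, 1, 15))` writes to the 2024-01-15 partition no matter when the re-run happens, whereas the clock-based version would overwrite the partition for the day of the re-run.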
:::warning Missing Idempotency Tests
Idempotency is not something you can check by code review alone. Explicitly test it: run the task twice with the same inputs and assert the output is identical. Add this test to CI so regressions are caught before they reach production.
:::
Interview Questions and Answers
Q1: What does idempotency mean for a pipeline task, and how do you implement it for a database write?
Idempotency means running a task multiple times with the same inputs produces the same final state as running it once. For a database write, the standard implementation is delete-then-insert within a single transaction, partitioned by the logical date. You first delete all rows for the date partition being processed, then insert the fresh data. Both operations happen in the same transaction, so if either fails, the table rolls back to its pre-run state. The alternative is an upsert (INSERT ... ON CONFLICT DO UPDATE), which is appropriate when rows have natural primary keys and you want to update existing rows rather than replace entire partitions.
Q2: What is the checkpoint pattern, and why is it necessary even if your task is idempotent?
The checkpoint pattern separates writes into two phases: write to a staging location, then atomically rename/move to the final location. It is necessary because idempotency protects against duplicate writes but not against partial writes. If a task crashes after writing 80% of its output (for example, 8 of 10 part files, or most of a file on a shared filesystem), a downstream consumer polling the final path may read incomplete data. The rename/copy step is atomic per file from the reader's perspective - they either see the complete file or see nothing. Without checkpointing, you must coordinate at the application level to handle partial reads, which is much more complex.
Q3: What is the difference between ExternalTaskSensor and Dataset-triggered scheduling in Airflow?
ExternalTaskSensor polls for the completion of a specific task in another DAG. It matches runs by logical date: it waits for the upstream task instance with a specific execution_date, so the two schedules must line up or be reconciled with execution_delta. Dataset-triggered scheduling (Airflow 2.4+) triggers a DAG when a declared Dataset is updated, regardless of the time. The key difference: with ExternalTaskSensor the downstream DAG still runs on its own clock schedule - if upstream finishes early, downstream waits for its scheduled start anyway, and if upstream is late, the sensor polls until the task succeeds or the timeout expires. Dataset-triggered scheduling creates data coupling: downstream triggers as soon as the data is ready. Dataset-triggered scheduling is usually the better fit for data pipelines because it reflects actual data availability rather than scheduled run time.
Q4: Explain the fan-out/fan-in pattern and when you would use dynamic task mapping instead of static task definition.
Fan-out/fan-in parallelizes independent work: split a problem into N independent subproblems (fan-out), process all N in parallel, then collect and aggregate the results (fan-in). Static task definition - manually writing one task per subproblem - works when the number of subproblems is known at DAG authoring time and rarely changes. Dynamic task mapping (Airflow 2.3+) is necessary when the number of subproblems is determined at runtime: the number of user segments, the list of files in S3, or the output of a previous task. task.expand(inputs=upstream_list) creates one task instance per element of upstream_list at the time the upstream task completes, without any DAG redefinition.
Q5: How should you handle retries differently for a network timeout versus a data validation failure?
Network timeouts are transient - the root cause (network congestion, brief API unavailability) resolves on its own. Exponential backoff retry is appropriate: wait 1s, then 2s, then 4s, up to 5 retries. Data validation failures are deterministic - if the input data doesn't match the schema today, it won't match tomorrow either without a code or data fix. Retrying a data validation failure wastes compute and potentially writes corrupt output multiple times. The correct response is: fail immediately, route the bad records to a dead letter location for investigation, and alert the on-call engineer. The key implementation pattern: catch exception types explicitly and only wrap retryable exceptions in the retry logic.
Q6: What is blue/green pipeline deployment, and when is it worth the operational complexity?
Blue/green deployment runs two versions of a pipeline simultaneously: the current production version (blue) and the candidate new version (green). Both write to separate output locations. A feature flag controls which version's output is consumed by downstream systems. When the green pipeline's output passes validation against the blue pipeline's output (shadow mode comparison), you flip the feature flag to route production consumers to green. The operational complexity - maintaining two pipelines, a shadow comparison job, and feature flag infrastructure - is justified when: the pipeline change has significant business risk (model retraining, revenue reporting), the change cannot be validated by unit tests alone, or rollback must be instantaneous (flipping a flag versus re-deploying code and running a backfill).
Q7: How do you design a backfill strategy that minimizes cost when a bug affects a known 3-day window?
Use selective backfill: trigger pipeline re-runs only for the three affected dates. This is the cheapest approach because it processes only the affected data rather than the entire history. Prerequisites for this to work: (1) the pipeline must be idempotent so re-running the affected dates overwrites the corrupt output cleanly, (2) downstream assets must be re-materialized after the backfill (or be materialized as a downstream step in the backfill job), and (3) you must know the complete impact window - if the bug might have affected dates outside your known window, selective backfill is dangerous and you should run a broader incremental backfill from before the known window to the present.
