:::tip 🎮 Interactive Playground Visualize this concept: Try the Data Quality Checks demo on the EngineersOfAI Playground - no code required. :::
Testing and Monitoring Pipelines
The Bug That Ran Silently for Three Weeks
A pipeline produced subtly wrong output for three weeks. The output schema was correct. Row counts looked right. No errors were raised. No alerts fired. The model trained on this data performed slightly worse each week, but the degradation was gradual enough that nobody attributed it to the data pipeline until a senior engineer did a manual spot check.
The bug was a timezone off-by-one: the pipeline ran in UTC but the business logic treated timestamps as Pacific Time. Every daily aggregate was shifted by one day. Monday's aggregates contained Sunday's data. The model was learning from features that were systematically misaligned with their labels.
The fix took 20 minutes. Discovering the root cause took two weeks of investigation across multiple teams. And the most painful part: a single output validation check - "does today's output contain today's date?" - would have caught it on day one. The failure was not in the code. It was in the testing and monitoring strategy. There were no checks on whether the output was semantically correct, only implicit checks that it existed.
This is the gap that pipeline testing and monitoring fills. Syntactic correctness - the code ran, the output schema matched - is the easy part. Semantic correctness - the output contains what we expect, the values are in valid ranges, the temporal alignment is correct - requires deliberate validation. Building that validation into the pipeline as an automated check is what separates data platforms that catch bugs in minutes from those that discover them after three weeks of silent data corruption.
Why This Exists - The Unique Challenges of Pipeline Testing
Software testing for application code is a mature discipline. You write unit tests, run them in CI, and deploy with confidence. Pipeline testing is harder for three reasons:
Pipelines produce data, not behavior. When you test a web API, you assert that GET /users/123 returns {"id": 123, "name": "Alice"}. When you test a pipeline, you assert that a table contains the right data - but "right" requires domain knowledge about what the data should look like, not just technical knowledge about the code.
The failure modes are subtle. A misconfigured join produces the correct schema and plausible row counts. A timezone bug produces dates that look valid. A float precision error rounds values to within acceptable margins for individual rows but accumulates into significant errors in aggregates. Application code tends to fail loudly. Data pipelines tend to fail quietly.
Infrastructure is expensive to test. Your pipeline reads from a data warehouse, writes to S3, and calls an external API. Setting up a test environment that faithfully simulates all three without running up cloud costs or requiring a production connection is genuinely difficult.
The testing strategy that works addresses all three challenges: unit tests for transformation logic (no infrastructure needed), integration tests for the full pipeline (lightweight test infrastructure), and output validation checks that run as part of the pipeline itself.
Historical Context - From Manual Spot Checks to Automated Validation
Early 2000s: Manual ETL QA. ETL pipelines were tested by running them and comparing output to expected values by hand. A QA analyst would query the output table, sanity-check the counts, and sign off. This scaled to perhaps 5-10 pipelines. Modern data platforms run hundreds.
2010s: Basic Count Checks. The first generation of automated pipeline monitoring was simple: run a query after the pipeline, check that the row count was above a threshold. Better than nothing, but caught only catastrophic failures.
2017: Great Expectations. Abe Gong and colleagues open-sourced Great Expectations, the first framework purpose-built for data validation. The key insight: define expected properties of your data as code - schemas, value ranges, distributions - and run those expectations automatically as part of your pipeline.
2019-2021: dbt Tests. dbt popularized the concept of writing tests alongside SQL models. not_null, unique, accepted_values, relationships tests became standard practice for the transformation layer.
2022-present: Data Observability Platforms. Monte Carlo, Bigeye, and Acyl built ML-based anomaly detection for data: automatically learning what "normal" looks like for each column and alerting when values drift outside normal ranges. These complement hand-written tests for the long tail of data quality issues.
The Three Levels of Pipeline Testing
Level 1 - Unit Tests (Transformation Logic)
Unit tests test the transformation functions themselves, completely independent of any infrastructure. They are fast (milliseconds per test), require no database or S3 access, and should run in CI on every commit.
# my_pipeline/transforms.py
import pandas as pd
from datetime import date
def compute_daily_aggregates(events: pd.DataFrame, report_date: date) -> pd.DataFrame:
"""
Compute daily aggregate metrics per user.
Args:
events: Raw events DataFrame with columns: user_id, event_type,
event_date, amount, session_id
report_date: The date this aggregate is for (used for partition key)
Returns:
DataFrame with one row per user, columns: user_id, agg_date,
event_count, session_count, total_spend, avg_spend_per_event
"""
day_events = events[events["event_date"] == str(report_date)].copy()
agg = (
day_events
.groupby("user_id")
.agg(
event_count=("event_id", "count"),
session_count=("session_id", "nunique"),
total_spend=("amount", "sum"),
)
.reset_index()
)
agg["agg_date"] = str(report_date)
agg["avg_spend_per_event"] = agg["total_spend"] / agg["event_count"]
return agg
# tests/test_transforms.py
import pytest
import pandas as pd
from datetime import date
from my_pipeline.transforms import compute_daily_aggregates
def make_test_events() -> pd.DataFrame:
return pd.DataFrame({
"event_id": range(10),
"user_id": ["alice", "alice", "alice", "bob", "bob",
"carol", "carol", "carol", "carol", "alice"],
"session_id": ["s1", "s1", "s2", "s3", "s3",
"s4", "s4", "s5", "s5", "s2"],
"event_type": ["click"] * 10,
"event_date": ["2024-01-15"] * 10,
"amount": [10.0, 5.0, 20.0, 15.0, 10.0, 8.0, 12.0, 6.0, 14.0, 30.0],
})
def test_row_count_equals_unique_users():
events = make_test_events()
result = compute_daily_aggregates(events, date(2024, 1, 15))
assert len(result) == 3 # alice, bob, carol
def test_event_counts_are_correct():
events = make_test_events()
result = compute_daily_aggregates(events, date(2024, 1, 15))
alice = result[result["user_id"] == "alice"].iloc[0]
assert alice["event_count"] == 4 # alice has 4 events
def test_session_count_counts_unique_sessions():
events = make_test_events()
result = compute_daily_aggregates(events, date(2024, 1, 15))
alice = result[result["user_id"] == "alice"].iloc[0]
assert alice["session_count"] == 2 # s1 and s2 (not s2 counted twice)
def test_total_spend_is_sum_of_amounts():
events = make_test_events()
result = compute_daily_aggregates(events, date(2024, 1, 15))
carol = result[result["user_id"] == "carol"].iloc[0]
assert carol["total_spend"] == pytest.approx(40.0) # 8+12+6+14
def test_avg_spend_is_total_divided_by_event_count():
events = make_test_events()
result = compute_daily_aggregates(events, date(2024, 1, 15))
carol = result[result["user_id"] == "carol"].iloc[0]
assert carol["avg_spend_per_event"] == pytest.approx(10.0) # 40 / 4
def test_agg_date_is_set_correctly():
events = make_test_events()
result = compute_daily_aggregates(events, date(2024, 1, 15))
assert (result["agg_date"] == "2024-01-15").all()
def test_filters_to_report_date_only():
"""Aggregates should only include events on report_date, not other dates."""
events = make_test_events()
# Add events for a different date
extra = events.copy()
extra["event_date"] = "2024-01-16"
all_events = pd.concat([events, extra], ignore_index=True)
result = compute_daily_aggregates(all_events, date(2024, 1, 15))
# Row counts should be the same as without the extra events
assert len(result) == 3
def test_empty_input_returns_empty_output():
empty = pd.DataFrame(columns=["event_id", "user_id", "session_id",
"event_type", "event_date", "amount"])
result = compute_daily_aggregates(empty, date(2024, 1, 15))
assert len(result) == 0
Level 2 - Integration Tests (Full Pipeline)
Integration tests run the complete pipeline against a lightweight test database with synthetic data and verify the output is correct end-to-end.
# tests/conftest.py
import pytest
import sqlalchemy as sa
import subprocess
import time
@pytest.fixture(scope="session")
def test_postgres():
"""
Spin up a PostgreSQL container for testing.
Scope is 'session' so the container is shared across all tests.
"""
container_id = subprocess.check_output([
"docker", "run", "-d",
"--name", "test-postgres",
"-e", "POSTGRES_PASSWORD=testpass",
"-e", "POSTGRES_DB=testdb",
"-p", "5434:5432",
"postgres:16-alpine"
]).decode().strip()
time.sleep(3) # Wait for PostgreSQL to be ready
engine = sa.create_engine(
"postgresql://postgres:testpass@localhost:5434/testdb"
)
yield engine
subprocess.run(["docker", "stop", "test-postgres"])
subprocess.run(["docker", "rm", "test-postgres"])
@pytest.fixture
def seeded_database(test_postgres):
"""Seed the test database with synthetic data for each test."""
import pandas as pd
import numpy as np
from datetime import date, timedelta
np.random.seed(42)
n_users = 50
n_days = 7
events_per_user_day = 5
rows = []
for user_idx in range(n_users):
user_id = f"user_{user_idx:04d}"
for day_offset in range(n_days):
event_date = date(2024, 1, 15) - timedelta(days=day_offset)
for ev_idx in range(events_per_user_day):
rows.append({
"event_id": f"ev_{user_idx}_{day_offset}_{ev_idx}",
"user_id": user_id,
"session_id": f"sess_{user_idx}_{day_offset}",
"event_type": np.random.choice(["click", "view", "purchase"]),
"event_date": str(event_date),
"amount": round(np.random.uniform(1, 100), 2),
})
events_df = pd.DataFrame(rows)
events_df.to_sql("raw_events", test_postgres, if_exists="replace", index=False)
yield test_postgres
# Teardown: drop all tables created during the test
with test_postgres.begin() as conn:
conn.execute(sa.text("DROP TABLE IF EXISTS raw_events, daily_aggregates"))
# tests/test_pipeline_integration.py
import pytest
import sqlalchemy as sa
from datetime import date
from my_pipeline.pipeline import run_daily_pipeline
def test_pipeline_creates_one_row_per_user(seeded_database):
run_daily_pipeline(seeded_database, execution_date=date(2024, 1, 15))
with seeded_database.connect() as conn:
row_count = conn.execute(sa.text(
"SELECT COUNT(*) FROM daily_aggregates WHERE agg_date = '2024-01-15'"
)).scalar()
assert row_count == 50 # 50 users in synthetic data
def test_pipeline_is_idempotent(seeded_database):
run_daily_pipeline(seeded_database, execution_date=date(2024, 1, 15))
count_after_first = seeded_database.execute(sa.text(
"SELECT COUNT(*) FROM daily_aggregates WHERE agg_date = '2024-01-15'"
)).scalar()
run_daily_pipeline(seeded_database, execution_date=date(2024, 1, 15))
count_after_second = seeded_database.execute(sa.text(
"SELECT COUNT(*) FROM daily_aggregates WHERE agg_date = '2024-01-15'"
)).scalar()
assert count_after_first == count_after_second, (
f"Not idempotent: {count_after_first} vs {count_after_second} rows"
)
def test_event_counts_sum_to_total_events(seeded_database):
run_daily_pipeline(seeded_database, execution_date=date(2024, 1, 15))
with seeded_database.connect() as conn:
aggregate_total = conn.execute(sa.text(
"SELECT SUM(event_count) FROM daily_aggregates WHERE agg_date = '2024-01-15'"
)).scalar()
raw_total = conn.execute(sa.text(
"SELECT COUNT(*) FROM raw_events WHERE event_date = '2024-01-15'"
)).scalar()
assert aggregate_total == raw_total, (
f"Aggregate event_count sum ({aggregate_total}) doesn't match "
f"raw event count ({raw_total})"
)
Output Validation With Great Expectations
Great Expectations (GE) is the standard framework for defining and enforcing expectations on pipeline output. A checkpoint runs a suite of expectations against a dataset after a pipeline task completes, and fails the pipeline if any expectation is violated.
Installing and setting up a basic validation suite:
# my_pipeline/validation.py
import great_expectations as gx
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.checkpoint import SimpleCheckpoint
import pandas as pd
def validate_daily_aggregates(df: pd.DataFrame, execution_date: str) -> bool:
"""
Validate the daily_aggregates output against a pre-defined expectation suite.
Returns True if all expectations pass, raises an exception if any fail.
"""
context = gx.get_context()
# Create a batch from the DataFrame
batch_request = RuntimeBatchRequest(
datasource_name="pandas_datasource",
data_connector_name="runtime_data_connector",
data_asset_name="daily_aggregates",
runtime_parameters={"batch_data": df},
batch_identifiers={"execution_date": execution_date},
)
results = context.run_checkpoint(
checkpoint_name="daily_aggregates_checkpoint",
validations=[
{
"batch_request": batch_request,
"expectation_suite_name": "daily_aggregates.warning",
}
],
)
if not results.success:
failed = [
r for r in results.run_results.values()
if not r["validation_result"]["success"]
]
raise ValueError(
f"Data validation failed with {len(failed)} failed expectations. "
f"Check the GE Data Docs for details."
)
return True
Defining the expectation suite programmatically:
import great_expectations as gx
def create_daily_aggregates_suite():
context = gx.get_context()
suite = context.add_or_update_expectation_suite("daily_aggregates.warning")
validator = context.get_validator(
batch_request=...,
expectation_suite_name="daily_aggregates.warning",
)
# Schema checks
validator.expect_column_to_exist("user_id")
validator.expect_column_to_exist("agg_date")
validator.expect_column_to_exist("event_count")
validator.expect_column_to_exist("total_spend")
# Type checks
validator.expect_column_values_to_be_of_type("event_count", "int64")
validator.expect_column_values_to_be_of_type("total_spend", "float64")
# Null checks
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_not_be_null("agg_date")
# Value range checks
validator.expect_column_values_to_be_between("event_count", min_value=1)
validator.expect_column_values_to_be_between("total_spend", min_value=0.0)
validator.expect_column_values_to_be_between("avg_spend_per_event", min_value=0.0)
# Business logic checks
validator.expect_column_values_to_match_regex(
"agg_date", r"^\d{4}-\d{2}-\d{2}$"
)
validator.expect_column_values_to_be_unique("user_id") # One row per user per day
# Completeness check - expect at least 80% of yesterday's user count
# (Dynamic threshold - set in run config, not hardcoded here)
validator.expect_table_row_count_to_be_between(
min_value={"$PARAMETER": "min_expected_row_count"},
max_value={"$PARAMETER": "max_expected_row_count"},
)
validator.save_expectation_suite(discard_failed_expectations=False)
Integrating GE validation into an Airflow task:
from airflow.operators.python import PythonOperator
def validate_features_task(**context):
import pandas as pd
import sqlalchemy as sa
execution_date = context["ds"]
engine = sa.create_engine("postgresql://prod-db/warehouse")
df = pd.read_sql(
f"SELECT * FROM daily_aggregates WHERE agg_date = '{execution_date}'",
engine
)
# Dynamic thresholds based on yesterday's count
yesterday = pd.Timestamp(execution_date) - pd.Timedelta(days=1)
yesterday_count = pd.read_sql(
f"SELECT COUNT(*) FROM daily_aggregates WHERE agg_date = '{yesterday.date()}'",
engine
).iloc[0, 0]
validate_daily_aggregates(
df,
execution_date=execution_date,
run_parameters={
"min_expected_row_count": int(yesterday_count * 0.8),
"max_expected_row_count": int(yesterday_count * 1.2),
}
)
validate_task = PythonOperator(
task_id="validate_daily_aggregates",
python_callable=validate_features_task,
provide_context=True,
)
Pipeline Monitoring - Detecting Anomalies Before They Become Incidents
Task Duration Trending
A task that suddenly takes 3 hours instead of 45 minutes is a signal of a data volume change, a slow query, or infrastructure degradation. Alerting on duration anomalies catches these before they cause SLA misses.
import sqlalchemy as sa
import pandas as pd
from datetime import datetime, timedelta
def check_task_duration_anomaly(
engine,
dag_id: str,
task_id: str,
lookback_days: int = 30,
threshold_multiplier: float = 2.0,
) -> bool:
"""
Alert if today's task duration is more than threshold_multiplier
times the p95 historical duration.
"""
historical = pd.read_sql(
"""
SELECT duration_seconds
FROM airflow_task_metrics
WHERE dag_id = %(dag_id)s
AND task_id = %(task_id)s
AND run_date >= %(cutoff)s
ORDER BY run_date DESC
""",
engine,
params={
"dag_id": dag_id,
"task_id": task_id,
"cutoff": (datetime.now() - timedelta(days=lookback_days)).date(),
}
)
if len(historical) < 7:
return False # Not enough history to make a meaningful comparison
p95_duration = historical["duration_seconds"].quantile(0.95)
today_duration = get_today_task_duration(engine, dag_id, task_id)
if today_duration > p95_duration * threshold_multiplier:
send_slack_alert(
f":clock1: *Duration Anomaly* in `{dag_id}.{task_id}`\n"
f"Today: {today_duration/60:.1f} min | "
f"P95 historical: {p95_duration/60:.1f} min | "
f"Ratio: {today_duration/p95_duration:.1f}x"
)
return True
return False
Row Count Monitoring
Row count drops are the most reliable early signal of data pipeline failures. A drop of 20% or more relative to yesterday almost always indicates an upstream data issue or a broken transformation.
def check_row_count_anomaly(
engine,
table: str,
date_column: str,
report_date: str,
lookback_days: int = 14,
drop_threshold: float = 0.20,
) -> dict:
"""
Compare today's row count to the N-day average.
Alert if the drop exceeds the threshold.
"""
today_count = engine.execute(sa.text(f"""
SELECT COUNT(*) FROM {table}
WHERE {date_column} = :report_date
"""), {"report_date": report_date}).scalar()
historical_avg = engine.execute(sa.text(f"""
SELECT AVG(daily_count) FROM (
SELECT {date_column}, COUNT(*) as daily_count
FROM {table}
WHERE {date_column} >= CURRENT_DATE - INTERVAL '{lookback_days} days'
AND {date_column} < :report_date
GROUP BY {date_column}
) daily_counts
"""), {"report_date": report_date}).scalar()
if historical_avg and historical_avg > 0:
drop_rate = (historical_avg - today_count) / historical_avg
is_anomaly = drop_rate > drop_threshold
return {
"table": table,
"report_date": report_date,
"today_count": today_count,
"historical_avg": historical_avg,
"drop_rate": drop_rate,
"is_anomaly": is_anomaly,
}
return {"is_anomaly": False}
Statistical Checks - Value Distribution Monitoring
Column-level statistical monitoring catches subtle data quality issues that row count checks miss.
import pandas as pd
import numpy as np
from scipy import stats
def check_column_distribution_drift(
current_df: pd.DataFrame,
reference_df: pd.DataFrame,
numeric_columns: list[str],
p_value_threshold: float = 0.01,
) -> list[dict]:
"""
Use the Kolmogorov-Smirnov test to detect distribution drift
in numeric columns between the current and reference datasets.
The KS test is distribution-free: it makes no assumptions about
the underlying distribution shape. The null hypothesis is that the
two samples come from the same distribution.
"""
alerts = []
for col in numeric_columns:
if col not in current_df.columns or col not in reference_df.columns:
continue
current_vals = current_df[col].dropna().values
reference_vals = reference_df[col].dropna().values
if len(current_vals) < 30 or len(reference_vals) < 30:
continue # Not enough data for a meaningful test
statistic, p_value = stats.ks_2samp(current_vals, reference_vals)
if p_value < p_value_threshold:
alerts.append({
"column": col,
"ks_statistic": round(statistic, 4),
"p_value": round(p_value, 6),
"current_mean": round(current_vals.mean(), 4),
"reference_mean": round(reference_vals.mean(), 4),
"current_std": round(current_vals.std(), 4),
"reference_std": round(reference_vals.std(), 4),
})
return alerts
Alerting Stack - Callbacks, Slack, and PagerDuty
Airflow Callbacks
Airflow supports three callback hooks on every task: on_failure_callback, on_retry_callback, and on_success_callback.
import requests
from airflow.models import TaskInstance
def send_slack_failure_alert(context: dict) -> None:
"""Send a Slack alert when a task fails."""
ti: TaskInstance = context["task_instance"]
dag_id = context["dag"].dag_id
task_id = ti.task_id
execution_date = context["execution_date"]
log_url = ti.log_url
message = {
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": ":red_circle: Pipeline Task Failed",
}
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": f"*DAG:*\n`{dag_id}`"},
{"type": "mrkdwn", "text": f"*Task:*\n`{task_id}`"},
{"type": "mrkdwn", "text": f"*Execution Date:*\n{execution_date.date()}"},
{"type": "mrkdwn", "text": f"*Log:*\n<{log_url}|View Log>"},
]
},
]
}
requests.post(
url="https://hooks.slack.com/services/T00000/B00000/xxxxx",
json=message,
)
def send_pagerduty_alert(context: dict) -> None:
"""Trigger a PagerDuty incident for P1 pipeline failures."""
ti: TaskInstance = context["task_instance"]
dag_id = context["dag"].dag_id
# Only page for critical pipelines
critical_dags = {"feature_engineering", "model_training", "revenue_reporting"}
if dag_id not in critical_dags:
return
requests.post(
url="https://events.pagerduty.com/v2/enqueue",
headers={"Authorization": f"Token token={PAGERDUTY_ROUTING_KEY}"},
json={
"routing_key": PAGERDUTY_ROUTING_KEY,
"event_action": "trigger",
"payload": {
"summary": f"CRITICAL: {dag_id}.{ti.task_id} failed",
"severity": "critical",
"source": "airflow",
"custom_details": {
"dag_id": dag_id,
"task_id": ti.task_id,
"execution_date": str(context["execution_date"]),
"log_url": ti.log_url,
}
}
}
)
# Apply callbacks to all critical tasks
default_args = {
"on_failure_callback": send_slack_failure_alert,
"on_retry_callback": send_slack_failure_alert,
}
with DAG("feature_engineering", default_args=default_args, ...) as dag:
critical_task = PythonOperator(
task_id="compute_features",
python_callable=compute_features,
on_failure_callback=send_pagerduty_alert, # Override for this specific task
)
Slack Rich Alert with Context
def send_rich_failure_alert(context: dict) -> None:
"""Send a detailed Slack alert with enough context to begin debugging immediately."""
ti: TaskInstance = context["task_instance"]
exception = context.get("exception")
error_message = str(exception)[:500] if exception else "Unknown error"
payload = {
"text": f":x: Pipeline failure: `{context['dag'].dag_id}`",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": (
f":x: *{context['dag'].dag_id}* failed\n"
f"Task: `{ti.task_id}` | "
f"Attempt: {ti.try_number}/{ti.max_tries + 1}\n"
f"<{ti.log_url}|View Log>"
)
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"```{error_message}```"
}
},
]
}
requests.post(
url=SLACK_WEBHOOK_URL,
json=payload,
)
Dead Letter Queues - Quarantine Bad Records
A single malformed record should not block an entire pipeline run. Route bad records to a quarantine table for investigation rather than letting them fail the task.
import pandas as pd
import sqlalchemy as sa
from datetime import datetime
class DeadLetterQueue:
"""
Quarantine records that fail validation without blocking the pipeline.
Dead-letter records can be investigated and reprocessed manually.
"""
def __init__(self, engine, dlq_table: str = "dead_letter_queue"):
self.engine = engine
self.dlq_table = dlq_table
self._ensure_table_exists()
def _ensure_table_exists(self):
with self.engine.begin() as conn:
conn.execute(sa.text(f"""
CREATE TABLE IF NOT EXISTS {self.dlq_table} (
id SERIAL PRIMARY KEY,
pipeline_name TEXT NOT NULL,
execution_date DATE NOT NULL,
record_json JSONB NOT NULL,
error_message TEXT NOT NULL,
quarantined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""))
def quarantine(
self,
records: pd.DataFrame,
pipeline_name: str,
execution_date: str,
error_message: str,
) -> int:
"""
Route bad records to the DLQ. Returns the number of quarantined records.
"""
if records.empty:
return 0
dlq_rows = pd.DataFrame({
"pipeline_name": pipeline_name,
"execution_date": execution_date,
"record_json": [row.to_json() for _, row in records.iterrows()],
"error_message": error_message,
})
dlq_rows.to_sql(
self.dlq_table, self.engine,
if_exists="append", index=False
)
return len(records)
def process_events_with_dlq(events: pd.DataFrame, engine, execution_date: str):
"""
Process events, routing invalid records to the DLQ rather than failing.
"""
dlq = DeadLetterQueue(engine)
# Validate input records
valid_mask = (
events["user_id"].notna() &
events["amount"].between(0, 100_000) &
events["event_date"].notna()
)
invalid_records = events[~valid_mask]
valid_records = events[valid_mask]
if len(invalid_records) > 0:
quarantined = dlq.quarantine(
records=invalid_records,
pipeline_name="feature_engineering",
execution_date=execution_date,
error_message="Failed null/range validation on user_id, amount, or event_date",
)
print(f"Quarantined {quarantined} invalid records. Processing {len(valid_records)} valid records.")
# Process only valid records
return compute_features(valid_records)
Observability With OpenTelemetry
Instrument pipeline tasks to trace the full execution path, identify bottlenecks, and correlate pipeline health with model degradation.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
import functools
import time
# Configure the tracer
resource = Resource(attributes={SERVICE_NAME: "data-pipeline"})
provider = TracerProvider(resource=resource)
exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("pipeline.tracer")
def trace_pipeline_task(task_name: str):
"""Decorator that wraps a pipeline task in an OpenTelemetry span."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
with tracer.start_as_current_span(task_name) as span:
span.set_attribute("pipeline.task_name", task_name)
span.set_attribute("pipeline.execution_date", kwargs.get("execution_date", ""))
start = time.monotonic()
try:
result = func(*args, **kwargs)
duration = time.monotonic() - start
span.set_attribute("pipeline.duration_seconds", round(duration, 2))
span.set_attribute("pipeline.status", "success")
return result
except Exception as e:
duration = time.monotonic() - start
span.set_attribute("pipeline.duration_seconds", round(duration, 2))
span.set_attribute("pipeline.status", "failure")
span.set_attribute("pipeline.error", str(e))
span.record_exception(e)
raise
return wrapper
return decorator
@trace_pipeline_task("compute_user_features")
def compute_user_features_traced(events: pd.DataFrame, execution_date: str) -> pd.DataFrame:
current_span = trace.get_current_span()
features = compute_daily_aggregates(events, execution_date)
# Emit custom metrics as span attributes
current_span.set_attribute("pipeline.input_row_count", len(events))
current_span.set_attribute("pipeline.output_row_count", len(features))
current_span.set_attribute("pipeline.unique_users", features["user_id"].nunique())
return features
Full Production Monitoring Setup
# my_pipeline/monitoring.py
"""
Complete production monitoring setup.
Import this module in your DAG definitions.
"""
import os
import requests
import json
from datetime import datetime
from airflow.models import TaskInstance, DAG
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL")
PAGERDUTY_ROUTING_KEY = os.environ.get("PAGERDUTY_ROUTING_KEY")
CRITICAL_DAGS = {"feature_engineering", "model_training", "revenue_reporting"}
def on_task_failure(context: dict) -> None:
ti: TaskInstance = context["task_instance"]
dag: DAG = context["dag"]
exception = context.get("exception")
# Always send Slack
_send_slack(
level=":red_circle:",
title=f"Task Failed: {dag.dag_id}.{ti.task_id}",
details={
"DAG": dag.dag_id,
"Task": ti.task_id,
"Date": str(context["execution_date"].date()),
"Attempt": f"{ti.try_number}/{ti.max_tries + 1}",
"Error": str(exception)[:200] if exception else "Unknown",
"Log": ti.log_url,
}
)
# Page on-call for critical pipelines on final attempt
if dag.dag_id in CRITICAL_DAGS and ti.try_number >= ti.max_tries:
_send_pagerduty(
summary=f"CRITICAL: {dag.dag_id}.{ti.task_id} failed all retries",
dag_id=dag.dag_id,
task_id=ti.task_id,
execution_date=str(context["execution_date"].date()),
)
def on_task_retry(context: dict) -> None:
ti: TaskInstance = context["task_instance"]
dag: DAG = context["dag"]
_send_slack(
level=":warning:",
title=f"Task Retry: {dag.dag_id}.{ti.task_id}",
details={
"DAG": dag.dag_id,
"Task": ti.task_id,
"Attempt": f"{ti.try_number}/{ti.max_tries + 1}",
"Log": ti.log_url,
}
)
def on_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis) -> None:
_send_slack(
level=":clock10:",
title=f"SLA Miss: {dag.dag_id}",
details={
"DAG": dag.dag_id,
"Missed Tasks": ", ".join([sla.task_id for sla in slas]),
"Expected By": str((slas[0].execution_date + slas[0].timedelta).strftime("%H:%M UTC")),
}
)
def _send_slack(level: str, title: str, details: dict) -> None:
if not SLACK_WEBHOOK_URL:
return
field_text = "\n".join(f"*{k}:* {v}" for k, v in details.items())
requests.post(
SLACK_WEBHOOK_URL,
json={"text": f"{level} {title}\n{field_text}"},
timeout=5,
)
def _send_pagerduty(summary: str, dag_id: str, task_id: str, execution_date: str) -> None:
if not PAGERDUTY_ROUTING_KEY:
return
requests.post(
"https://events.pagerduty.com/v2/enqueue",
json={
"routing_key": PAGERDUTY_ROUTING_KEY,
"event_action": "trigger",
"payload": {
"summary": summary,
"severity": "critical",
"source": "airflow",
"custom_details": {
"dag_id": dag_id,
"task_id": task_id,
"execution_date": execution_date,
}
}
},
timeout=5,
)
Mermaid Diagram - Pipeline Monitoring Stack
Common Mistakes
:::danger No Output Validation - Only Checking That the Task Ran The most common pipeline monitoring mistake: checking that a task completed successfully without checking what it produced. A task that writes empty output, or output with a schema mismatch, or output with a systematic off-by-one error can succeed with exit code 0. Always add output validation as a separate task that runs immediately after the write task. :::
:::danger Alerting on Every Retry Sending a Slack alert for every retry floods the channel and causes alert fatigue. Teams start ignoring the alerts. Reserve Slack alerts for final failures and SLA misses. Use a separate low-noise channel or dashboard for retry events. :::
:::warning Row Count Checks Without Historical Baseline "Row count must be greater than 0" is not a meaningful check. A table with 1 row instead of 1,000,000 passes this check. Compare today's row count to the N-day average and alert on significant deviations - 20% drop is a reasonable default threshold for most pipelines. :::
:::warning Testing Only the Happy Path Unit tests that only test correct input miss the bugs that actually happen in production: null values where you expect non-null, negative amounts, future dates, unicode characters in ID fields, duplicate rows from the source system. Write explicit tests for the edge cases that your data will eventually surface. :::
Interview Questions and Answers
Q1: What are the three levels of pipeline testing, and what does each level test?
Level 1 is unit testing: test the transformation functions themselves in isolation, with no database or external infrastructure. These tests run in milliseconds and should run on every commit. Level 2 is integration testing: run the complete pipeline against a lightweight test database seeded with synthetic data. These tests verify that the full pipeline produces the correct end-to-end output, including correct joins, correct aggregations, and correct data persistence. Level 3 is end-to-end validation: output correctness checks that run as part of the production pipeline itself - schema checks, value range checks, row count comparisons against historical baselines, and business logic assertions. Level 3 catches the class of bugs that unit and integration tests miss because they require real production data to surface.
Q2: What is Great Expectations and how does it integrate with Airflow?
Great Expectations is a data validation framework that lets you define "expectations" - formal assertions about the properties your data should have - and run those expectations automatically as a validation step in your pipeline. You define an expectation suite (a collection of expectations like "column X must not be null", "column Y values must be between 0 and 1", "row count must be within 20% of yesterday's count"), configure a checkpoint that loads a batch of data and runs the suite against it, and call the checkpoint from an Airflow PythonOperator task. If any expectation fails, the operator raises an exception, failing the task and triggering your normal failure alerting path.
Q3: How do you design a row count monitoring system that catches real issues without generating false positives?
Compare today's row count to the rolling N-day average rather than a static threshold. The rolling average adapts to legitimate seasonal patterns - Monday might have 2x the row count of Sunday, and a static threshold would either miss Sunday issues or false-alarm on Mondays. Use a dynamic threshold: alert if today's count drops more than 20% below the N-day average (14 days is typical). Exclude outlier historical days from the baseline calculation to prevent a single anomalous day from skewing the expected range. Add a minimum absolute threshold (e.g., at least 100 rows) to catch complete failures even when the percentage-based threshold is not breached.
Q4: What is a dead letter queue pattern in data pipelines, and when should you use it?
A dead letter queue (DLQ) is a quarantine location for records that fail validation. Instead of failing the entire pipeline task when a subset of records is malformed, you route the invalid records to the DLQ and continue processing the valid records. The DLQ records include the original data, the error message, the pipeline name, and the execution date - enough information to investigate and potentially reprocess the records manually. Use DLQs when: the input data comes from an external source that occasionally sends malformed records, you cannot block pipeline execution on a small percentage of bad data, and the business impact of quarantining individual records is lower than the impact of failing the entire pipeline batch.
Q5: How do you instrument a pipeline task with OpenTelemetry, and why is this useful beyond what Airflow already provides?
OpenTelemetry spans wrap pipeline task execution and emit structured telemetry (duration, input/output row counts, custom business metrics) to a distributed tracing backend like Honeycomb or Jaeger. Airflow's built-in logging captures task success/failure and stdout, but it does not correlate pipeline execution with downstream system behavior. OpenTelemetry traces allow you to correlate: "the compute_user_features task ran 3x slower than usual on January 15th" with "the model serving p95 latency increased by 40% starting January 15th." Distributed traces create a causal link between pipeline health and downstream ML system health that is impossible to establish from logs alone.
Q6: What is the difference between on_failure_callback and on_retry_callback in Airflow, and how should you use them differently?
on_retry_callback fires on each failed attempt that will be retried. on_failure_callback fires on the final failure - when all retries are exhausted. Use on_retry_callback to send a low-priority notification (a Slack message to a data-alerts channel) to surface awareness that a task is retrying. Use on_failure_callback for the high-priority response: a Slack alert to the #data-incidents channel, and for critical pipelines, a PagerDuty page to the on-call engineer. This escalation pattern prevents alert fatigue from transient retries while ensuring final failures receive immediate human attention.
Q7: How would you build a shadow mode pipeline validation, and what does it prove?
Shadow mode runs two versions of the pipeline simultaneously - the current production version and the candidate new version - writing to separate output tables. A comparison job then joins the two outputs on the natural key and reports discrepancies: rows present in one but not the other, and rows with different values for key columns. Shadow mode proves behavioral equivalence between pipeline versions: not just that the new code runs without errors, but that it produces the same data as the current production code. This is valuable when the pipeline change involves a significant refactor, a new computation engine, or a change in data source, where unit tests cannot verify end-to-end equivalence with real production data.
