:::tip 🎮 Interactive Playground Visualize this concept: Try the Data Quality Checks demo on the EngineersOfAI Playground - no code required. :::
Data SLAs and Incident Response
The Recommendation System That Went Silent
It was 3:47am on a Wednesday when the partition failed. Nobody was watching. The Airflow DAG that populated the recommendation feature store ran on an hourly schedule, and at 3am it had silently succeeded - the task turned green, the DAG marked itself complete, and the monitoring dashboard showed a healthy run.
Except no new data had been written. The downstream task had executed its SQL query successfully. The query returned zero rows - the upstream partition had been corrupted during a compaction job. Zero rows is a valid result in Airflow. The task didn't know to check whether zero rows was the expected outcome. It wrote zero rows to the feature store, marked itself complete, and went to sleep.
By 4am, the recommendation system was serving 6-hour-old features. By 5am, user sessions were showing stale content - not dramatically wrong, just subtly off, the kind of off that a user feels but can't articulate. By 7am, the engagement drop was large enough to show up in the hourly metrics. By 8am, someone noticed. The incident had been running for four hours before any human was aware.
The post-mortem revealed the full scope of what was missing: no SLA defined for feature freshness. No alerting on row count against expected counts. No runbook for the team member who eventually got paged. No backfill playbook. No stakeholder communication template. The pipeline had been treated as infrastructure - "if the DAG is green, we're fine." The DAG was green. They were not fine.
This is what happens when you deploy data pipelines without data SLAs. This lesson is about building the systems that would have caught the 3am failure before users noticed.
Why This Exists: The Invisible Infrastructure Problem
Software engineers have decades of practice with service SLAs. Web services have uptime monitors, latency percentile alerts, error rate dashboards, and on-call rotations. If a service returns 500 errors, PagerDuty fires within minutes.
Data pipelines have none of this by default. A pipeline that runs successfully but produces no output looks identical to a pipeline that runs successfully and produces correct output. A feature that is 6 hours stale looks identical to a feature that is fresh - both are present, both are within range, both pass null checks. The stale feature simply contains yesterday's truth in today's request.
This invisibility is dangerous precisely because the symptoms are downstream and delayed. A stale feature doesn't cause an immediate error - it causes slightly degraded model performance, slightly off recommendations, slightly wrong ranking. The degradation is gradual and the cause is not obvious. By the time the business impact is undeniable, the incident has been running for hours.
Data SLAs are the mechanism by which you make invisible infrastructure problems visible. They define what "correct operation" means in measurable terms, create automated detection of deviation, and establish the processes for response when things go wrong.
Historical Context: From Tribal Knowledge to Formal SLAs
The first generation of data infrastructure - Hadoop clusters, MapReduce jobs, hand-rolled ETL scripts - was managed primarily by tribal knowledge. The data engineer who wrote the pipeline knew what "normal" looked like. When something was wrong, they knew because they had context. This scaled to teams of five. It did not scale to teams of fifty.
The second generation - Airflow, dbt, Spark - automated execution but not observability. You could see whether a pipeline ran. You could not easily see whether the pipeline produced the right amount of correct data. Monitoring was bolted on after the fact, usually after a production incident revealed the need.
The third generation - modern data observability platforms like Monte Carlo (founded 2019), Acceldata (2018), and Atlan (2020) - emerged specifically to fill this gap. They coined the term "data observability" by analogy to software observability (metrics, logs, traces). For data systems, observability means: freshness, volume, schema, distribution, and lineage.
But even the best observability platform requires a foundation of formalized SLAs. The platform monitors; the SLA defines what "good" looks like. Without SLAs, you have dashboards full of metrics with no thresholds, no alerts, and no defined response. The monitoring is theater.
What a Data SLA Is
A data SLA is a formal, measurable commitment about the behavior of a data pipeline or dataset - with defined consequences for violations.
The four dimensions of a data SLA:
Freshness: The data must be no older than X. "The recommendation feature store must be refreshed within 90 minutes of the hour boundary." Freshness SLAs are violated when the most recent data point is older than the threshold.
Completeness: At least Y% of expected entities or rows must be present. "At least 95% of active users must have a feature record in the store." Completeness SLAs catch the silent-zero-rows failure mode - the pipeline ran, but the data isn't there.
Availability: The pipeline must succeed at least Z% of the time over a rolling window. "The feature pipeline must have a weekly success rate of 99%." Availability SLAs catch chronic flakiness that individual run monitoring misses.
Latency: Data must be available by time T for downstream consumers. "Transaction features must be available in the online store by 8am UTC for the morning risk batch." Latency SLAs are absolute deadlines, not relative ones.
The consequence clause is what separates an SLA from a metric. "If the freshness SLA is violated, the data engineering on-call is paged within 5 minutes and stakeholders are notified within 15 minutes." Without consequences, you have a target, not a commitment.
:::note SLA vs SLO vs SLI Borrowing from site reliability engineering: an SLI (Service Level Indicator) is the metric you measure. An SLO (Service Level Objective) is the target for that metric. An SLA (Service Level Agreement) is the SLO plus consequences - what happens when the objective is missed. In practice, data teams often use "SLA" to mean SLO. The distinction matters when the consequences involve legal or contractual obligations (common in data vendor relationships). :::
Implementing SLA Monitoring: Python + Airflow
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Callable, Dict, List, Optional
import requests
import sqlalchemy as sa
logger = logging.getLogger(__name__)
class SLAStatus(Enum):
OK = "ok"
WARNING = "warning" # Approaching SLA, not yet violated
VIOLATED = "violated" # SLA has been breached
@dataclass
class SLADefinition:
name: str
pipeline_id: str
# Freshness
max_age_minutes: Optional[int] = None
warning_age_minutes: Optional[int] = None
# Completeness
min_row_count: Optional[int] = None
min_entity_coverage_pct: Optional[float] = None
# Availability (rolling window)
min_weekly_success_rate: Optional[float] = None
# Latency (absolute deadline)
daily_deadline_utc: Optional[str] = None # e.g. "08:00"
# Notification
slack_channel: Optional[str] = None
pagerduty_service_key: Optional[str] = None
stakeholder_emails: Optional[List[str]] = None
@dataclass
class SLACheckResult:
sla_name: str
dimension: str
status: SLAStatus
current_value: float
threshold: float
message: str
checked_at: datetime
class DataSLAMonitor:
"""
Monitors data pipeline SLAs across freshness, completeness,
availability, and latency dimensions.
Usage:
monitor = DataSLAMonitor(engine=db_engine, slack_token=token)
sla = SLADefinition(
name="recommendation_features",
pipeline_id="rec_feature_pipeline",
max_age_minutes=90,
warning_age_minutes=60,
min_entity_coverage_pct=0.95,
slack_channel="#data-oncall",
)
results = monitor.check_all(sla, table="rec_features")
monitor.handle_results(results, sla)
"""
def __init__(
self,
engine: sa.engine.Engine,
slack_token: Optional[str] = None,
pagerduty_api_key: Optional[str] = None,
):
self.engine = engine
self.slack_token = slack_token
self.pagerduty_api_key = pagerduty_api_key
def check_freshness(
self,
sla: SLADefinition,
table: str,
timestamp_col: str = "updated_at",
) -> SLACheckResult:
"""Check whether the most recent record is within the SLA freshness window."""
query = sa.text(
f"SELECT MAX({timestamp_col}) as latest FROM {table}"
)
with self.engine.connect() as conn:
result = conn.execute(query).fetchone()
latest_ts = result["latest"] if result and result["latest"] else None
if latest_ts is None:
return SLACheckResult(
sla_name=sla.name,
dimension="freshness",
status=SLAStatus.VIOLATED,
current_value=float("inf"),
threshold=float(sla.max_age_minutes or 0),
message=f"No data found in {table} - table may be empty",
checked_at=datetime.utcnow(),
)
age_minutes = (datetime.utcnow() - latest_ts).total_seconds() / 60
if sla.max_age_minutes and age_minutes > sla.max_age_minutes:
status = SLAStatus.VIOLATED
threshold = float(sla.max_age_minutes)
elif sla.warning_age_minutes and age_minutes > sla.warning_age_minutes:
status = SLAStatus.WARNING
threshold = float(sla.warning_age_minutes)
else:
status = SLAStatus.OK
threshold = float(sla.max_age_minutes or age_minutes)
return SLACheckResult(
sla_name=sla.name,
dimension="freshness",
status=status,
current_value=age_minutes,
threshold=threshold,
message=(
f"Data age: {age_minutes:.1f} minutes "
f"(SLA: {sla.max_age_minutes} minutes)"
),
checked_at=datetime.utcnow(),
)
def check_completeness(
self,
sla: SLADefinition,
table: str,
entity_table: str,
entity_col: str = "user_id",
) -> SLACheckResult:
"""
Check what % of expected entities have a record in the feature table.
Catches the silent-zero-rows failure and partial population failures.
"""
query = sa.text(f"""
SELECT
COUNT(DISTINCT e.{entity_col}) as total_entities,
COUNT(DISTINCT f.{entity_col}) as covered_entities
FROM {entity_table} e
LEFT JOIN {table} f ON e.{entity_col} = f.{entity_col}
WHERE e.is_active = true
""")
with self.engine.connect() as conn:
result = conn.execute(query).fetchone()
total = result["total_entities"] or 0
covered = result["covered_entities"] or 0
coverage_pct = covered / total if total > 0 else 0.0
threshold = sla.min_entity_coverage_pct or 0.95
status = SLAStatus.VIOLATED if coverage_pct < threshold else SLAStatus.OK
return SLACheckResult(
sla_name=sla.name,
dimension="completeness",
status=status,
current_value=coverage_pct * 100,
threshold=threshold * 100,
message=(
f"Entity coverage: {coverage_pct*100:.1f}% "
f"({covered}/{total} entities) "
f"- SLA requires {threshold*100:.0f}%"
),
checked_at=datetime.utcnow(),
)
def check_availability(
self,
sla: SLADefinition,
pipeline_runs_table: str,
window_days: int = 7,
) -> SLACheckResult:
"""
Compute rolling success rate over the past N days.
Catches chronic pipeline flakiness that per-run monitoring misses.
"""
cutoff = datetime.utcnow() - timedelta(days=window_days)
query = sa.text(f"""
SELECT
COUNT(*) as total_runs,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful_runs
FROM {pipeline_runs_table}
WHERE pipeline_id = :pipeline_id
AND run_time >= :cutoff
""")
with self.engine.connect() as conn:
result = conn.execute(
query,
{"pipeline_id": sla.pipeline_id, "cutoff": cutoff}
).fetchone()
total = result["total_runs"] or 0
successful = result["successful_runs"] or 0
success_rate = successful / total if total > 0 else 0.0
threshold = sla.min_weekly_success_rate or 0.99
status = SLAStatus.VIOLATED if success_rate < threshold else SLAStatus.OK
return SLACheckResult(
sla_name=sla.name,
dimension="availability",
status=status,
current_value=success_rate * 100,
threshold=threshold * 100,
message=(
f"7-day success rate: {success_rate*100:.1f}% "
f"({successful}/{total} runs) "
f"- SLA requires {threshold*100:.0f}%"
),
checked_at=datetime.utcnow(),
)
def check_all(
self,
sla: SLADefinition,
table: str,
**kwargs,
) -> List[SLACheckResult]:
"""Run all configured SLA checks and return results."""
results = []
if sla.max_age_minutes is not None:
results.append(
self.check_freshness(sla, table, **kwargs)
)
if sla.min_entity_coverage_pct is not None and "entity_table" in kwargs:
results.append(
self.check_completeness(
sla, table,
entity_table=kwargs["entity_table"],
entity_col=kwargs.get("entity_col", "user_id"),
)
)
return results
def handle_results(
self, results: List[SLACheckResult], sla: SLADefinition
) -> None:
"""Route SLA check results to appropriate notification channels."""
violations = [r for r in results if r.status == SLAStatus.VIOLATED]
warnings = [r for r in results if r.status == SLAStatus.WARNING]
if violations:
self._handle_violation(violations, sla)
elif warnings:
self._handle_warning(warnings, sla)
else:
logger.info("All SLA checks passed for %s", sla.name)
def _handle_violation(
self, violations: List[SLACheckResult], sla: SLADefinition
) -> None:
"""Page on-call and notify stakeholders for SLA violations."""
message = self._format_violation_message(violations, sla)
logger.error("SLA VIOLATED: %s\n%s", sla.name, message)
if sla.slack_channel and self.slack_token:
self._send_slack(sla.slack_channel, message, severity="critical")
if sla.pagerduty_service_key and self.pagerduty_api_key:
self._page_oncall(sla, violations)
def _handle_warning(
self, warnings: List[SLACheckResult], sla: SLADefinition
) -> None:
"""Send warning-level notification before SLA is fully violated."""
message = self._format_warning_message(warnings, sla)
logger.warning("SLA WARNING: %s\n%s", sla.name, message)
if sla.slack_channel and self.slack_token:
self._send_slack(sla.slack_channel, message, severity="warning")
def _format_violation_message(
self, violations: List[SLACheckResult], sla: SLADefinition
) -> str:
lines = [
f":red_circle: *SLA VIOLATION: {sla.name}*",
f"Pipeline: `{sla.pipeline_id}`",
f"Checked at: {datetime.utcnow().isoformat()}Z",
"",
]
for v in violations:
lines.append(
f"• *{v.dimension.upper()}*: {v.message}"
)
lines += [
"",
"Action required: See runbook at <link>",
"On-call: /oncall data-engineering",
]
return "\n".join(lines)
def _format_warning_message(
self, warnings: List[SLACheckResult], sla: SLADefinition
) -> str:
lines = [
f":warning: *SLA Warning: {sla.name}*",
f"Pipeline: `{sla.pipeline_id}`",
"",
]
for w in warnings:
lines.append(f"• *{w.dimension}*: {w.message}")
lines.append("No action required yet, but monitor closely.")
return "\n".join(lines)
def _send_slack(self, channel: str, message: str, severity: str) -> None:
color = "#dc2626" if severity == "critical" else "#ea580c"
payload = {
"channel": channel,
"attachments": [{
"color": color,
"text": message,
"footer": "DataSLAMonitor",
"ts": int(datetime.utcnow().timestamp()),
}]
}
response = requests.post(
"https://slack.com/api/chat.postMessage",
json=payload,
headers={"Authorization": f"Bearer {self.slack_token}"},
timeout=5,
)
if not response.ok:
logger.error("Failed to send Slack alert: %s", response.text)
def _page_oncall(
self, sla: SLADefinition, violations: List[SLACheckResult]
) -> None:
payload = {
"service_key": sla.pagerduty_service_key,
"event_type": "trigger",
"description": f"Data SLA violated: {sla.name}",
"details": {
"violations": [
{"dimension": v.dimension, "message": v.message}
for v in violations
]
},
}
response = requests.post(
"https://events.pagerduty.com/generic/2010-04-15/create_event.json",
json=payload,
timeout=5,
)
if not response.ok:
logger.error("Failed to page on-call: %s", response.text)
Airflow SLA Miss Callbacks
Airflow has a native SLA mechanism: the sla parameter on each task definition. When a task does not complete within the specified duration, Airflow calls the sla_miss_callback function. This is coarser than our custom SLA monitor (it only covers task runtime, not data quality), but it is the first line of defense.
from datetime import timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import requests
def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
"""
Called by Airflow when any task in the DAG misses its SLA.
Sends a Slack alert and pages on-call.
"""
missed_tasks = [str(sla.task_id) for sla in slas]
message = (
f":rotating_light: *SLA Miss* in DAG `{dag.dag_id}`\n"
f"Tasks that missed SLA: {', '.join(missed_tasks)}\n"
f"These tasks are blocking: {', '.join(str(t) for t in blocking_task_list)}\n"
f"Check Airflow UI for details."
)
# Post to Slack
requests.post(
os.environ["SLACK_WEBHOOK_URL"],
json={"text": message},
timeout=5,
)
with DAG(
dag_id="recommendation_feature_pipeline",
schedule_interval="@hourly",
start_date=days_ago(1),
sla_miss_callback=sla_miss_callback, # Fires if DAG takes > 45 min total
default_args={
"retries": 2,
"retry_delay": timedelta(minutes=5),
},
) as dag:
compute_features = PythonOperator(
task_id="compute_features",
python_callable=run_feature_computation,
sla=timedelta(minutes=30), # Task-level SLA: 30 minutes max
)
validate_output = PythonOperator(
task_id="validate_output",
python_callable=run_output_validation,
sla=timedelta(minutes=5),
)
compute_features >> validate_output
:::tip Add a Row Count Validation Task Add an explicit validation task at the end of every pipeline that checks: (1) the output table has at least N rows, (2) the maximum timestamp is within the expected freshness window, (3) no downstream tables have been accidentally truncated. This task should fail loudly - raising an exception that Airflow marks as a failed run - rather than logging a warning and continuing. :::
Incident Response for Data Quality
Data incidents require a different response than software incidents. A 500 error is binary - the service either works or doesn't. A data incident is graded - the data is partially wrong, slightly stale, or sparsely populated. Triage requires understanding the business impact, not just the technical symptom.
Severity Classification
| Severity | Description | Response Time | Example |
|---|---|---|---|
| P0 | ML model producing materially wrong outputs due to bad data | 15 minutes | Fraud model approving 40% more transactions because feature store is empty |
| P1 | Core business metric data is incorrect or missing | 30 minutes | Revenue dashboard showing $0 for the current day |
| P2 | Non-critical data pipeline failure affecting downstream teams | 2 hours | Recommendation features are 3 hours stale |
| P3 | Cosmetic data issue, no business impact | Next business day | Historical report has formatting errors |
The P0/P1 line is crossed when incorrect data drives incorrect automated decisions. A stale recommendation is P2. A fraud model approving transactions it should block is P0.
The Data Incident Response Playbook
STEP 1 - DETECT (0-5 minutes)
- SLA monitor fires alert
- Stakeholder reports incorrect data
- Engagement metrics show anomaly
STEP 2 - TRIAGE (5-15 minutes)
- Assess severity: which downstream systems are affected?
- Is the data wrong, stale, or missing?
- Is this isolated to one pipeline or systemic?
- Assign an incident commander
STEP 3 - COMMUNICATE (15 minutes after detection)
- Notify stakeholders: what is affected, what we know, ETA for resolution
- If P0/P1: page engineering leadership
- Open an incident channel in Slack: #incident-YYYY-MM-DD-pipeline-name
STEP 4 - MITIGATE (parallel with communication)
- If data is stale: trigger a manual pipeline run
- If ML model is affected: roll back to last known good model or disable feature
- If data is wrong: quarantine the bad data, flag affected records
STEP 5 - FIX (mitigation complete, root cause addressed)
- Identify root cause
- Deploy fix to pipeline
- Run validation before marking resolved
STEP 6 - BACKFILL (after fix)
- Backfill missing or incorrect data
- Validate backfill completeness
- Notify stakeholders that data is restored
STEP 7 - POST-MORTEM (within 48 hours)
- Document root cause
- 5-whys analysis
- Action items to prevent recurrence
- Update runbook
Stakeholder Communication Templates
Having templates written before an incident reduces response time and ensures consistent communication under pressure.
INITIAL NOTIFICATION (send within 15 minutes of detection):
Subject: [DATA INCIDENT] {pipeline_name} - {severity} - Investigation in progress
We are currently investigating a data issue affecting {affected_systems}.
What we know:
- {symptom}: e.g., "Recommendation features in the feature store are approximately 4 hours stale"
- Detected at: {detection_time} UTC
- Estimated impact: {business_impact}
What we are doing:
- {mitigation_step}: e.g., "Triggering a manual pipeline run and investigating root cause"
ETA for resolution: {eta or "unknown - will update in 30 minutes"}
Incident channel: #incident-{date}-{pipeline}
Incident commander: {name}
---
UPDATE (send every 30 minutes until resolved):
Update #{n} - {current_time} UTC
Current status: {Investigating | Mitigating | Resolved}
What we have learned:
- {finding}
Current action:
- {action}
ETA: {eta or "updating in 30 minutes"}
---
RESOLUTION (send when resolved):
Subject: [RESOLVED] {pipeline_name} data incident - {date}
The data incident affecting {systems} has been resolved.
Resolution time: {resolution_time} UTC
Total duration: {duration}
Root cause: {one-sentence root cause}
Data status: {e.g., "Features backfilled through current time. No action required by downstream teams."}
Post-mortem will be shared within 48 hours.
Runbook Design
Every data pipeline must have a runbook. The runbook is written before the incident, not during it. A runbook written at 3am by an on-call engineer who is sleep-deprived and has never worked on this pipeline is worse than no runbook.
A complete runbook contains:
# Runbook: recommendation_feature_pipeline
## What This Pipeline Does
Computes user-level recommendation features hourly from the events table
and writes them to the `rec_features` table in the feature store.
Downstream consumers: recommendation API, A/B testing system, user analytics.
## SLA
- Freshness: features must be refreshed within 90 minutes of each hour boundary
- Completeness: at least 95% of active users must have a feature record
## Schedule
Runs at minute 0 of every hour. Expected runtime: 15-25 minutes.
Airflow DAG: `recommendation_feature_pipeline`
## Dependencies
- Upstream: `events` table in prod_db (written by event ingestion pipeline)
- Upstream: `users` table in prod_db (updated daily)
- Downstream: recommendation API reads features via Feast SDK
- Downstream: A/B testing reads features for experiment assignment
## Common Failure Modes
### 1. Upstream `events` Table Empty or Stale
Symptom: Task `compute_features` runs in <1 second, writes 0 rows.
Check: `SELECT COUNT(*) FROM events WHERE event_time > NOW() - INTERVAL '2 hours'`
Fix: Check the event ingestion pipeline DAG (`event_ingestion_pipeline`) - likely
upstream failure. Do NOT manually insert fake data. Wait for upstream to recover
or trigger backfill.
### 2. Memory Error in Spark Job
Symptom: Task fails with `java.lang.OutOfMemoryError`
Check: Airflow logs → Spark UI (http://spark-master:4040)
Fix: Increase executor memory in `spark_submit_options` in DAG config.
Escalate to data infra team if config change insufficient.
### 3. Feature Store Write Fails
Symptom: `write_to_feast` task fails with connection error.
Check: Feast online store status (Redis): `redis-cli -h {host} ping`
Fix: Restart Feast online store if unresponsive. Page data infra on-call.
## Manual Backfill Instructions
airflow dags backfill
--start-date 2024-01-15
--end-date 2024-01-15
recommendation_feature_pipeline
After backfill: validate row counts and maximum feature timestamp before
notifying stakeholders that data is restored.
## Who to Page
- Data Engineering on-call: PagerDuty service `data-pipelines`
- If Feast/Redis issue: Data Infrastructure on-call
- If business impact > $10k estimated: Engineering Director
Data Observability: Commercial Platforms
Custom monitoring handles specific pipelines you design from scratch. Commercial data observability platforms handle the long tail - hundreds of tables, dozens of pipelines, with automatic anomaly detection that doesn't require manual threshold configuration.
Monte Carlo: The market leader as of 2024. Connects to your data warehouse (Snowflake, BigQuery, Databricks) via a read-only connection, automatically learns the normal distribution of row counts, schema shapes, and null rates for every table, and alerts on anomalies. Provides data lineage graphs to show which tables feed which downstream assets. Strong at warehouse-level monitoring; weaker at streaming and real-time feature stores.
Acceldata: Broader platform - covers pipeline performance (Spark, Airflow), data quality, and cost management in a single product. Better suited for Hadoop/Spark-heavy architectures. More complex to set up than Monte Carlo.
Atlan: Focuses on data catalog and governance, with quality scoring built on top. Better for data discovery and lineage documentation than real-time alerting. Often used alongside Monte Carlo rather than instead of it.
Great Expectations (open source): Not technically an observability platform, but the most widely used open-source data quality framework. Defines expectations explicitly (not learned from history), integrates with dbt, Airflow, and Spark. More control than commercial platforms, more setup required.
| Platform | Best For | Weakness |
|---|---|---|
| Monte Carlo | Automatic anomaly detection at warehouse scale | Limited streaming/real-time coverage |
| Acceldata | Spark-heavy architectures, cost monitoring | Complex setup, steeper learning curve |
| Atlan | Data catalog, lineage, governance | Weaker real-time alerting |
| Great Expectations | Full control, open source, CI/CD integration | Manual threshold definition, no auto-learning |
SLAs for ML Systems
Feature freshness SLAs are particularly consequential for ML systems because stale features degrade prediction quality in ways that are invisible to the model itself.
from dataclasses import dataclass
from datetime import datetime
@dataclass
class MLFeatureSLA:
"""
SLA definition for an ML feature, tracking both freshness and
prediction quality impact of staleness.
"""
feature_name: str
pipeline_id: str
max_age_minutes: int
quality_degradation_per_hour: float # AUC points lost per hour of staleness
model_id: str
def estimate_model_quality_from_freshness(
sla: MLFeatureSLA,
current_age_minutes: float,
baseline_auc: float = 0.94,
) -> dict:
"""
Estimate expected model quality given current feature staleness.
This gives business stakeholders a concrete impact estimate for
SLA violation reporting, not just an abstract "data is stale."
"""
age_hours = current_age_minutes / 60
estimated_auc = baseline_auc - (
sla.quality_degradation_per_hour * age_hours
)
estimated_auc = max(0.5, estimated_auc) # Floor at random chance
return {
"feature": sla.feature_name,
"current_age_hours": round(age_hours, 2),
"baseline_auc": baseline_auc,
"estimated_current_auc": round(estimated_auc, 4),
"auc_degradation": round(baseline_auc - estimated_auc, 4),
"sla_violated": current_age_minutes > sla.max_age_minutes,
}
Post-Mortem: The 5-Whys Analysis
The 5-whys technique traces a failure to its root cause by asking "why" repeatedly until you reach a systemic issue rather than a proximate cause. Applied to the 3am incident:
Symptom: Recommendation features were 6 hours stale for 4 hours.
Why 1: Why were features stale?
The compute_features task wrote zero rows to the feature store.
Why 2: Why did it write zero rows?
The upstream events partition for the 3am hour was empty due to a compaction job failure.
Why 3: Why didn't the pipeline detect the empty partition? The validation task only checked that the SQL query succeeded, not that the result had an expected number of rows.
Why 4: Why wasn't there a row count check? No formal SLA existed for this pipeline. Without an SLA, there was no defined "expected row count" to check against.
Why 5: Why was there no SLA? The pipeline was built as a "quick prototype" and never formally operationalized. No process existed to require SLA definition before a pipeline is promoted to production.
Root cause: No formal pipeline operationalization process. Pipelines reach production without SLA definitions, runbooks, or validation checks.
Action items:
- Add row count validation to all pipeline output tasks (owner: pipeline author, due: 1 week)
- Define SLAs for all production data pipelines (owner: data engineering lead, due: 2 weeks)
- Create a pipeline promotion checklist that requires SLA + runbook + monitoring before production promotion (owner: data engineering lead, due: 1 month)
- Implement alerting on zero-row writes in the feature store layer (owner: data infra, due: 2 weeks)
Incident Response Flow
Quantifying the Cost of Data Downtime
SLA violations need business impact estimates - not to assign blame, but to prioritize remediation and justify investment in prevention.
def calculate_data_downtime_cost(
incident_duration_hours: float,
affected_system: str,
hourly_revenue_impact: float,
engineering_hours_spent: float,
engineer_hourly_cost: float = 150.0,
) -> dict:
"""
Framework for estimating total cost of a data downtime incident.
Components:
1. Direct revenue impact (degraded model → worse decisions)
2. Engineering time to detect, investigate, and fix
3. Stakeholder time (meetings, communication)
4. Reputational/trust cost (harder to quantify)
"""
direct_revenue_loss = incident_duration_hours * hourly_revenue_impact
engineering_cost = engineering_hours_spent * engineer_hourly_cost
# Stakeholder time: assume 2x engineering time in meetings and communication
stakeholder_cost = engineering_cost * 0.5
total_cost = direct_revenue_loss + engineering_cost + stakeholder_cost
return {
"incident_duration_hours": incident_duration_hours,
"direct_revenue_impact": round(direct_revenue_loss, 2),
"engineering_cost": round(engineering_cost, 2),
"stakeholder_cost": round(stakeholder_cost, 2),
"total_estimated_cost": round(total_cost, 2),
"cost_per_hour": round(total_cost / incident_duration_hours, 2),
}
# Example: the 3am recommendation incident
cost = calculate_data_downtime_cost(
incident_duration_hours=4.0,
affected_system="recommendation_system",
hourly_revenue_impact=2500.0, # 5% engagement drop * $50k/hr revenue
engineering_hours_spent=8.0, # 2 engineers * 4 hours each
engineer_hourly_cost=150.0,
)
# Result: {total_estimated_cost: 12200.0, cost_per_hour: 3050.0}
The framing that gets SLA investment approved: the 3am incident cost an estimated 10,000 investment in data observability infrastructure pays back in the first prevented incident.
Production Engineering Notes
The Zero-Row Write Problem
The most common source of silent data incidents is a pipeline that successfully writes zero rows. The pipeline completed, the task is green, the table exists - but it is empty. Standard monitoring doesn't catch this because it monitors task success, not output.
Fix: add an explicit assertion at the end of every pipeline task:
def validate_pipeline_output(
table: str,
min_rows: int,
engine: sa.engine.Engine,
) -> None:
"""Fail loudly if pipeline output doesn't meet expectations."""
with engine.connect() as conn:
count = conn.execute(
sa.text(f"SELECT COUNT(*) FROM {table}")
).scalar()
if count < min_rows:
raise ValueError(
f"Pipeline output validation failed: {table} has {count} rows, "
f"expected at least {min_rows}. "
f"The pipeline will be marked as FAILED to prevent silent data loss."
)
logger.info("Output validation passed: %s has %d rows", table, count)
SLA Breach Classification Table
| Check | Warning Threshold | Violation Threshold | Action |
|---|---|---|---|
| Freshness | 75% of max_age | 100% of max_age | Warning: Slack. Violation: Page. |
| Completeness | 97% coverage | 95% coverage | Warning: Slack. Violation: Page. |
| Availability | 99.5% weekly | 99% weekly | Warning: ticket. Violation: Page. |
| Row count | -10% from expected | -20% from expected | Warning: Slack. Violation: Page. |
Common Mistakes
:::danger The Silent Success Trap
The most dangerous failure mode in data pipelines: the task completes successfully, all logs look normal, but the output is wrong. Causes include: the query returning 0 rows (empty partition), a join producing fewer rows than expected, a filter condition being too aggressive, or a write failing silently and the task not checking the write result.
Defense: every pipeline task that writes data must check what it wrote. The final task in every DAG should validate row counts, schema, and freshness of the output. Fail loudly. A failed Airflow task is visible. A succeeded Airflow task that wrote wrong data is invisible. :::
:::danger The Manual Fix Without Backfill Trap
During an incident, an engineer manually reruns the failed pipeline to restore data. The immediate incident is resolved - the feature store is fresh again. But the 4-hour gap in historical feature data is never backfilled. Downstream systems that needed historical features for model evaluation, A/B testing analysis, or audit logs now have a gap.
Always backfill before declaring an incident resolved. The backfill must cover the full gap from when the data went bad to when it was restored. Validate the backfill completeness before notifying stakeholders. :::
:::danger The SLA Without Consequence Trap
Defining an SLA and never alerting on violations is worse than having no SLA - it creates false confidence. The team believes they have monitoring. They don't.
An SLA is only real if: (1) a monitor checks it on every pipeline run, (2) a violation automatically triggers an alert, (3) the alert reaches a human who is responsible for responding, and (4) there is a runbook defining how to respond. Missing any one of these four elements means the SLA is documentation, not infrastructure. :::
:::warning The Aggregate Metric Trap
Monitoring average data freshness across all pipelines hides individual failures. If 99 pipelines are fresh and 1 is 24 hours stale, the average freshness might look acceptable. Monitor each pipeline individually. Aggregate metrics are useful for trend analysis; per-pipeline metrics are required for incident detection. :::
:::warning The Dashboard Without Alerting Trap
A beautiful data quality dashboard that requires a human to check it every hour is not monitoring - it is manual inspection with extra steps. Monitoring only has value if it proactively notifies when something is wrong. Build the alert before building the dashboard. The alert catches the 3am incident. The dashboard helps you understand it. :::
Interview Q&A
Q1: Design a data SLA monitoring system for a feature store serving 20 ML models.
The system has five components.
First, an SLA registry: a database table where each feature store table has a defined SLA - freshness threshold, completeness threshold, expected row count range, and the list of models that depend on it. SLAs are version-controlled as code, reviewed before pipeline promotion.
Second, a monitoring job: runs every 5 minutes, checks freshness (max timestamp of each table), completeness (row count against expected), and schema (column count and types match registered schema). Results are written to a time-series table with the check timestamp, table name, dimension, current value, and threshold.
Third, alerting: a separate process reads the monitoring results and fires alerts on violations. Uses severity classification: features consumed by P0 models (fraud, payments) page on-call immediately. Features consumed by P2 models post to Slack. Critical here: the alert system knows the model-to-feature dependency map, so when a feature has a quality issue, it immediately surfaces which models are at risk.
Fourth, a model health dashboard: shows each model's feature health status in real time. A model with all green features is healthy. A model with one yellow feature is at risk. A model with one red feature is in incident state. This gives model owners visibility without requiring them to monitor the data infrastructure directly.
Fifth, a post-incident analysis tool: after any incident, generates a report of the timeline (when the SLA was first violated, when it was detected, when it was resolved) and identifies whether the incident would have been caught by existing monitoring or whether a monitoring gap exists.
Q2: What's the difference between data freshness SLA and data latency SLA? When would you use each?
Freshness SLA is relative: "data must be no older than X minutes." It's checked against the current time. If it's 4pm and the freshness SLA is 90 minutes, any data older than 2:30pm is in violation. Freshness SLAs are appropriate for pipelines that run on a schedule and need to stay continuously current.
Latency SLA is absolute: "data must be available by time T." A batch pipeline that feeds a morning risk calculation must have data ready by 8am UTC, regardless of when the pipeline started. If it finishes at 7:45am, it passes. If it finishes at 8:05am, it violates the SLA even if it ran for the same duration as yesterday.
Use freshness SLAs for streaming and near-real-time pipelines where the consumer needs data to be continuously fresh. Use latency SLAs for batch pipelines that feed scheduled downstream processes (morning risk runs, daily model retraining, weekly reporting). In practice, most pipelines need both: they have a latency SLA ("be ready by 8am") and a freshness SLA ("don't be older than 90 minutes at any time during the day").
Q3: How do you handle an incident where a data pipeline wrote incorrect data to the feature store, and models have already served predictions based on that incorrect data?
Four immediate steps, in order.
First, mitigate the ongoing impact: disable the affected feature in the feature store, or switch the model to a fallback that doesn't use that feature, or roll the model back to a checkpoint that predates the bad data. The goal is to stop serving incorrect predictions immediately.
Second, quarantine the bad data: flag the affected time range in the feature store as unreliable. Downstream systems that read historical features for evaluation or retraining must skip this period.
Third, assess the impact of predictions already served: which decisions were made based on the bad features? For a fraud model, this means reviewing transactions approved or denied during the incident window. For a recommendation model, this means understanding whether users were shown materially different content. High-stakes decisions (credit, fraud, healthcare) may require manual review and remediation.
Fourth, backfill and validate: after fixing the root cause, recompute the correct features for the affected time range and write them to the feature store. Validate the backfill against expected distributions. Trigger reprocessing of any downstream jobs that consumed the bad data.
Q4: A data pipeline's Airflow DAG shows all green tasks but downstream ML models are degrading. How do you investigate?
This is the silent success trap. All green tasks means all tasks completed without raising exceptions. It says nothing about the correctness or completeness of the data they produced.
Investigation steps:
One: check row counts. Query the output table: SELECT COUNT(*), MAX(updated_at) FROM feature_table WHERE updated_at > NOW() - INTERVAL '2 hours'. Compare against the expected count from previous runs. If count is dramatically lower (or zero), the pipeline ran but wrote wrong data.
Two: check the feature distribution. Pull descriptive statistics for the top features by model importance. Compare to the previous hour, previous day, and the training baseline. Use PSI to detect distribution shifts that standard stats might miss.
Three: check the upstream dependencies. Are all upstream tables populated for the relevant time period? A pipeline that joins against an empty partition will produce fewer rows without failing.
Four: add output validation to the pipeline. After identifying the root cause, add an explicit assertion at the end of the pipeline that validates row counts, schema, and freshness. This converts future silent failures into loud failures that Airflow marks as failed runs.
Q5: How would you define and enforce data SLAs across a team of 20 data engineers working on different pipelines?
The key challenge is making SLA definition a default behavior, not an optional extra.
Structurally: create a pipeline promotion checklist that requires an SLA definition before a pipeline is marked "production." The checklist includes: freshness threshold, completeness threshold, expected row count range, runbook link, stakeholder list, and on-call assignment. No pipeline goes to production without completing the checklist. This is enforced via a code review requirement - PRs that add a new production DAG must include the SLA definition as a data file (YAML or JSON checked into the repo).
Technically: the SLA registry is a database table populated from the YAML files in the repo. The monitoring job reads from the registry. When a new pipeline is merged, its SLA is automatically registered. When a pipeline is deprecated, its SLA is automatically deregistered.
Culturally: publish a weekly SLA health report to the engineering team - which pipelines violated their SLAs, which are approaching violation, and which have no SLA defined yet (the no-SLA list should shrink each week). Make the data quality posture of the team visible. This creates natural peer pressure without requiring management enforcement.
Q6: What is the difference between data observability and data monitoring? When is each appropriate?
Data monitoring is the practice of checking specific, pre-defined metrics against specific, pre-defined thresholds. You know what to watch; you configure the check; you get alerted when the check fails. Monitoring is precise but requires you to anticipate what will go wrong. It works well for known failure modes (empty tables, schema changes, freshness violations) but misses unknown failure modes.
Data observability extends monitoring with machine learning: instead of defining "row count must be between 1M and 1.2M," an observability platform learns the normal distribution of row counts from historical data and alerts when the current value is anomalous relative to that history. This catches failures you didn't anticipate - a row count that is 15% lower than the day-of-week average, a null rate that has been creeping up for three weeks, a value distribution that changed shape without changing mean.
The appropriate choice: monitoring is sufficient for stable, well-understood pipelines where the failure modes are known. Observability is necessary for large table estates (50+ tables) where defining and maintaining explicit thresholds for every table is impractical, or for catching subtle quality degradation that doesn't violate explicit thresholds.
In practice: use monitoring for your highest-stakes pipelines (explicit thresholds, explicit runbooks) and observability for the long tail. The combination catches both the known unknowns (monitoring) and the unknown unknowns (observability).
tags: data-engineering, data-quality, ml-systems, ai-infrastructure
