:::tip 🎮 Interactive Playground Visualize this concept: Try the Data Drift Detection demo on the EngineersOfAI Playground - no code required. :::
Anomaly Detection in Pipelines: Catching What Static Checks Miss
The 18% Revenue Drop
The Monday morning business review started normally. Slide four stopped everyone cold: revenue for the previous Tuesday was down 18% versus the same day the prior week. Not a seasonal dip - a clean vertical drop on a single day, followed by an immediate return to baseline.
The product team ran through the playbook. No deploys on Monday night. No pricing changes. No campaigns that launched or expired. The app was up. Payment processing was green. Customer support tickets from Tuesday were normal volume, normal topics. Nothing in the product had changed.
The data engineering team spent two days tracing backward through the pipeline before they found it. An upstream vendor had sent a file on Tuesday morning. The ETL job picked it up, validated the file schema (all columns present, all types correct), and loaded it. What nobody checked: the event_timestamp column. Every row in that file had timestamps from 72 hours earlier. The file was a delayed resend of a file that had already been processed - same data, different delivery time.
The pipeline loaded 1.2 million rows of 72-hour-old data. The revenue attribution system processed it and credited those events to Tuesday. Actual Tuesday revenue was fine. The attribution for Monday through Wednesday was corrupted. The dashboard showed an 18% dip on Tuesday because Tuesday's events were attributed to Saturday, where they blended invisibly with weekend traffic.
The timestamp column existed in the schema. The ETL job read it correctly. Nobody was checking its distribution. A simple check - "is the max timestamp in this file within the last four hours?" - would have caught this in the first minute of the pipeline run. The fix, once identified, took twenty minutes. The investigation took two days.
This is the gap that anomaly detection fills: the space between "the file arrived" and "the file is correct."
Why Static Threshold Checks Fail
The first instinct after an incident like the one above is to add a threshold check: "if max(event_timestamp) is older than 6 hours, fail the pipeline." This is reasonable. It would catch the exact incident that triggered it. It will also fail in three months when the vendor's timezone configuration changes and all timestamps shift by an hour, or when a bank holiday reduces event volume and the max timestamp is legitimately 8 hours old, or when the engineering team runs a historical backfill and every timestamp is weeks old by design.
Static thresholds fail because they are written at one point in time, based on one understanding of the data, and the data changes. The thresholds don't.
Consider what static threshold maintenance looks like at scale:
- A table that grows 15% month-over-month needs its row count threshold updated monthly
- A feature with weekly seasonality needs different thresholds for weekdays versus weekends
- A metric that is legitimately zero on holidays needs holiday-aware thresholds
- Every new metric added to the pipeline needs manually calibrated thresholds from someone who understands the domain
At ten tables, this is manageable. At a hundred tables with a dozen metrics each, it is a full-time job. And despite the effort, thresholds still miss anomalies that weren't anticipated when they were set.
Statistical anomaly detection inverts this. Instead of setting a threshold and checking against it, you build a model of what normal looks like from historical data, then flag deviations from that model. The model adapts as the data changes (with controlled retraining). It handles seasonality automatically. It doesn't require per-metric calibration. And it catches anomalies that no human would have thought to write a threshold for.
Statistical Approaches: Building the Toolbox
Z-Score Detection: The Foundation
The z-score measures how many standard deviations a current observation is from the historical mean. For data pipeline metrics (row counts, null rates, aggregate values), z-score detection is often sufficient and always interpretable.
where is the current metric value, is the historical mean, and is the historical standard deviation.
An observation with is approximately 0.3% likely under a normal distribution - a reasonable anomaly threshold for most pipeline metrics.
import numpy as np
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class MetricBaseline:
"""Historical statistics for a single metric."""
metric_name: str
mean: float
std: float
min_observed: float
max_observed: float
sample_count: int
@dataclass
class AnomalyResult:
"""Result of an anomaly check for a single metric."""
metric_name: str
current_value: float
z_score: float
is_anomaly: bool
severity: str # "normal", "warning", "critical"
explanation: str
class ZScoreDetector:
"""
Z-score based anomaly detector for pipeline metrics.
Fits on historical data, detects anomalies on new observations.
"""
def __init__(
self,
warning_threshold: float = 2.0,
critical_threshold: float = 3.0,
min_std: float = 0.001, # prevent division by near-zero std
):
self.warning_threshold = warning_threshold
self.critical_threshold = critical_threshold
self.min_std = min_std
self.baselines: dict[str, MetricBaseline] = {}
def fit(self, historical_metrics: dict[str, list[float]]) -> None:
"""
Build baselines from historical metric values.
Args:
historical_metrics: dict of metric_name -> list of historical values
"""
for metric_name, values in historical_metrics.items():
arr = np.array(values, dtype=float)
arr = arr[~np.isnan(arr)] # remove NaN
if len(arr) < 7:
raise ValueError(
f"Need at least 7 observations to build baseline for {metric_name}, "
f"got {len(arr)}"
)
self.baselines[metric_name] = MetricBaseline(
metric_name=metric_name,
mean=float(np.mean(arr)),
std=float(max(np.std(arr), self.min_std)),
min_observed=float(np.min(arr)),
max_observed=float(np.max(arr)),
sample_count=len(arr),
)
def detect(self, current_metrics: dict[str, float]) -> list[AnomalyResult]:
"""
Detect anomalies in current metric values.
Args:
current_metrics: dict of metric_name -> current value
Returns:
List of AnomalyResult for each metric
"""
results = []
for metric_name, current_value in current_metrics.items():
if metric_name not in self.baselines:
# Unknown metric - treat as warning
results.append(AnomalyResult(
metric_name=metric_name,
current_value=current_value,
z_score=float("inf"),
is_anomaly=True,
severity="warning",
explanation=f"No baseline exists for metric '{metric_name}'"
))
continue
baseline = self.baselines[metric_name]
z_score = (current_value - baseline.mean) / baseline.std
if abs(z_score) >= self.critical_threshold:
severity = "critical"
is_anomaly = True
direction = "above" if z_score > 0 else "below"
explanation = (
f"{metric_name} is {abs(z_score):.1f} standard deviations {direction} "
f"baseline. Current: {current_value:.2f}, "
f"Expected: {baseline.mean:.2f} ± {baseline.std:.2f}"
)
elif abs(z_score) >= self.warning_threshold:
severity = "warning"
is_anomaly = True
direction = "above" if z_score > 0 else "below"
explanation = (
f"{metric_name} is {abs(z_score):.1f} standard deviations {direction} "
f"baseline. Current: {current_value:.2f}, "
f"Expected: {baseline.mean:.2f} ± {baseline.std:.2f}"
)
else:
severity = "normal"
is_anomaly = False
explanation = (
f"{metric_name} is within normal range. "
f"z-score: {z_score:.2f}"
)
results.append(AnomalyResult(
metric_name=metric_name,
current_value=current_value,
z_score=z_score,
is_anomaly=is_anomaly,
severity=severity,
explanation=explanation,
))
return results
IQR-Based Detection: Handling Skewed Distributions
Row counts and file sizes are often right-skewed (occasional large spikes). Z-scores assume approximate normality. For skewed distributions, IQR-based detection is more robust:
where and is typically 1.5 (mild outlier) or 3.0 (extreme outlier).
class IQRDetector:
"""IQR-based outlier detection. More robust than z-score for skewed distributions."""
def __init__(self, k_warning: float = 1.5, k_critical: float = 3.0):
self.k_warning = k_warning
self.k_critical = k_critical
self.baselines = {}
def fit(self, historical_metrics: dict[str, list[float]]) -> None:
for metric_name, values in historical_metrics.items():
arr = np.array(values, dtype=float)
arr = arr[~np.isnan(arr)]
q1 = np.percentile(arr, 25)
q3 = np.percentile(arr, 75)
iqr = q3 - q1
self.baselines[metric_name] = {
"q1": q1,
"q3": q3,
"iqr": iqr,
"median": np.median(arr),
"warn_lower": q1 - self.k_warning * iqr,
"warn_upper": q3 + self.k_warning * iqr,
"crit_lower": q1 - self.k_critical * iqr,
"crit_upper": q3 + self.k_critical * iqr,
}
def detect(self, current_metrics: dict[str, float]) -> list[AnomalyResult]:
results = []
for metric_name, current_value in current_metrics.items():
if metric_name not in self.baselines:
continue
b = self.baselines[metric_name]
if current_value < b["crit_lower"] or current_value > b["crit_upper"]:
severity, is_anomaly = "critical", True
explanation = (
f"{metric_name} value {current_value:.2f} is outside critical IQR bounds "
f"[{b['crit_lower']:.2f}, {b['crit_upper']:.2f}]"
)
elif current_value < b["warn_lower"] or current_value > b["warn_upper"]:
severity, is_anomaly = "warning", True
explanation = (
f"{metric_name} value {current_value:.2f} is outside warning IQR bounds "
f"[{b['warn_lower']:.2f}, {b['warn_upper']:.2f}]"
)
else:
severity, is_anomaly = "normal", False
explanation = f"{metric_name} is within normal IQR range"
results.append(AnomalyResult(
metric_name=metric_name,
current_value=current_value,
z_score=(current_value - b["median"]) / max(b["iqr"], 1e-6),
is_anomaly=is_anomaly,
severity=severity,
explanation=explanation,
))
return results
Rolling Window Comparison: Handling Seasonality
Day-over-day comparisons fail on weekly seasonality - Monday traffic is different from Sunday traffic. Rolling window comparisons normalize for weekly patterns:
import pandas as pd
class RollingWindowDetector:
"""
Compare current value to same period in previous weeks.
Handles weekly seasonality automatically.
"""
def __init__(
self,
lookback_weeks: int = 4,
warning_pct: float = 0.20, # 20% deviation
critical_pct: float = 0.40, # 40% deviation
):
self.lookback_weeks = lookback_weeks
self.warning_pct = warning_pct
self.critical_pct = critical_pct
def detect(
self,
metric_series: pd.Series, # DatetimeIndex, values are the metric
current_date: pd.Timestamp,
metric_name: str,
) -> AnomalyResult:
"""
Compare value on current_date to same weekday in previous N weeks.
"""
current_value = metric_series.get(current_date)
if current_value is None:
raise ValueError(f"No value for {current_date}")
# Collect same weekday in previous weeks
reference_values = []
for weeks_back in range(1, self.lookback_weeks + 1):
ref_date = current_date - pd.Timedelta(weeks=weeks_back)
ref_value = metric_series.get(ref_date)
if ref_value is not None and not np.isnan(ref_value):
reference_values.append(ref_value)
if len(reference_values) < 2:
return AnomalyResult(
metric_name=metric_name,
current_value=current_value,
z_score=0.0,
is_anomaly=False,
severity="normal",
explanation="Insufficient historical data for comparison"
)
reference_mean = np.mean(reference_values)
pct_deviation = abs(current_value - reference_mean) / max(abs(reference_mean), 1e-6)
if pct_deviation >= self.critical_pct:
severity, is_anomaly = "critical", True
explanation = (
f"{metric_name}: {pct_deviation*100:.1f}% deviation from "
f"same-weekday average ({reference_mean:.2f}) over last {len(reference_values)} weeks. "
f"Current: {current_value:.2f}"
)
elif pct_deviation >= self.warning_pct:
severity, is_anomaly = "warning", True
explanation = (
f"{metric_name}: {pct_deviation*100:.1f}% deviation from "
f"same-weekday average ({reference_mean:.2f}). "
f"Current: {current_value:.2f}"
)
else:
severity, is_anomaly = "normal", False
explanation = f"{metric_name}: within {pct_deviation*100:.1f}% of expected"
z_score = (current_value - reference_mean) / max(np.std(reference_values), 1e-6)
return AnomalyResult(
metric_name=metric_name,
current_value=current_value,
z_score=z_score,
is_anomaly=is_anomaly,
severity=severity,
explanation=explanation,
)
CUSUM: Detecting Slow Drift
CUSUM (Cumulative Sum) detects sustained drift - a metric that shifts gradually over many periods rather than spiking once. Z-scores miss this because each individual reading looks normal; it is the accumulation that signals a problem.
where is the expected mean, is the allowance parameter (typically ), and an alert triggers when (typically to ).
class CUSUMDetector:
"""
CUSUM control chart for detecting sustained drift in metrics.
Use when you expect gradual shifts, not sudden spikes.
"""
def __init__(
self,
k_factor: float = 0.5, # allowance: k * sigma
h_factor: float = 4.0, # decision interval: h * sigma
):
self.k_factor = k_factor
self.h_factor = h_factor
def run(
self,
values: list[float],
target_mean: float,
sigma: float,
) -> list[dict]:
"""
Run CUSUM on a sequence of values.
Args:
values: time-ordered metric observations
target_mean: expected (in-control) mean
sigma: expected standard deviation
Returns:
List of dicts with CUSUM state at each step
"""
k = self.k_factor * sigma
h = self.h_factor * sigma
results = []
cusum_high = 0.0 # cumulative sum for upward drift
cusum_low = 0.0 # cumulative sum for downward drift
for i, x in enumerate(values):
deviation = x - target_mean
cusum_high = max(0, cusum_high + deviation - k)
cusum_low = max(0, cusum_low - deviation - k)
alert_high = cusum_high > h
alert_low = cusum_low > h
results.append({
"index": i,
"value": x,
"cusum_high": cusum_high,
"cusum_low": cusum_low,
"alert_upward_drift": alert_high,
"alert_downward_drift": alert_low,
"explanation": (
f"Upward drift detected (CUSUM={cusum_high:.2f} > threshold={h:.2f})"
if alert_high else
f"Downward drift detected (CUSUM={cusum_low:.2f} > threshold={h:.2f})"
if alert_low else
"No drift detected"
)
})
return results
The Complete Pipeline Anomaly Detector
Combining the above detectors into a production class:
import json
import logging
from datetime import datetime
from typing import Any
import requests
logger = logging.getLogger(__name__)
class PipelineAnomalyDetector:
"""
Production-grade anomaly detector for data pipeline metrics.
Composes multiple detection strategies with severity routing.
"""
def __init__(
self,
pipeline_name: str,
slack_webhook_url: Optional[str] = None,
pagerduty_routing_key: Optional[str] = None,
):
self.pipeline_name = pipeline_name
self.slack_webhook_url = slack_webhook_url
self.pagerduty_routing_key = pagerduty_routing_key
self.z_detector = ZScoreDetector(warning_threshold=2.0, critical_threshold=3.0)
self.iqr_detector = IQRDetector(k_warning=1.5, k_critical=3.0)
self._fitted = False
def fit(self, historical_metrics: dict[str, list[float]]) -> None:
"""Fit all detectors on historical data."""
self.z_detector.fit(historical_metrics)
self.iqr_detector.fit(historical_metrics)
self._fitted = True
logger.info(
f"Fitted anomaly detector on {len(historical_metrics)} metrics "
f"with {next(iter(historical_metrics.values())).__len__()} historical observations"
)
def detect(self, current_metrics: dict[str, float]) -> list[AnomalyResult]:
"""
Run all detectors and merge results (take the highest severity).
"""
if not self._fitted:
raise RuntimeError("Call fit() before detect()")
z_results = {r.metric_name: r for r in self.z_detector.detect(current_metrics)}
iqr_results = {r.metric_name: r for r in self.iqr_detector.detect(current_metrics)}
merged = []
severity_rank = {"normal": 0, "warning": 1, "critical": 2}
for metric_name in current_metrics:
z_result = z_results.get(metric_name)
iqr_result = iqr_results.get(metric_name)
if z_result and iqr_result:
# Take the more severe result
if severity_rank[z_result.severity] >= severity_rank[iqr_result.severity]:
winner = z_result
else:
winner = iqr_result
elif z_result:
winner = z_result
elif iqr_result:
winner = iqr_result
else:
continue
merged.append(winner)
return merged
def alert(self, anomalies: list[AnomalyResult]) -> None:
"""Route anomalies to appropriate alerting channels."""
critical = [a for a in anomalies if a.severity == "critical"]
warnings = [a for a in anomalies if a.severity == "warning"]
if critical and self.pagerduty_routing_key:
self._page_oncall(critical)
if (critical or warnings) and self.slack_webhook_url:
self._notify_slack(critical, warnings)
def _notify_slack(
self,
critical: list[AnomalyResult],
warnings: list[AnomalyResult],
) -> None:
blocks = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"Data Anomaly: {self.pipeline_name}",
}
}
]
if critical:
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Critical Anomalies:*\n" + "\n".join(
f":red_circle: {a.explanation}" for a in critical
)
}
})
if warnings:
blocks.append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Warnings:*\n" + "\n".join(
f":warning: {a.explanation}" for a in warnings
)
}
})
payload = {
"blocks": blocks,
"text": f"Pipeline anomaly detected: {self.pipeline_name}",
}
try:
resp = requests.post(
self.slack_webhook_url,
json=payload,
timeout=5,
)
resp.raise_for_status()
except Exception as e:
logger.error(f"Failed to send Slack alert: {e}")
def _page_oncall(self, critical: list[AnomalyResult]) -> None:
"""Trigger PagerDuty alert for critical anomalies."""
payload = {
"routing_key": self.pagerduty_routing_key,
"event_action": "trigger",
"payload": {
"summary": (
f"[{self.pipeline_name}] Critical data anomaly: "
+ "; ".join(a.metric_name for a in critical)
),
"severity": "critical",
"timestamp": datetime.utcnow().isoformat(),
"custom_details": {
a.metric_name: a.explanation for a in critical
},
}
}
try:
resp = requests.post(
"https://events.pagerduty.com/v2/enqueue",
json=payload,
timeout=5,
)
resp.raise_for_status()
except Exception as e:
logger.error(f"Failed to send PagerDuty alert: {e}")
What to Monitor: A Metric Taxonomy
Effective anomaly detection requires knowing which metrics to collect. Five categories cover most pipeline failure modes:
Volume Metrics
def collect_volume_metrics(table: str, engine) -> dict[str, float]:
"""Row count, partition count, estimated data size."""
with engine.connect() as conn:
row_count = conn.execute(
f"SELECT COUNT(*) FROM {table}"
).scalar()
today_count = conn.execute(
f"SELECT COUNT(*) FROM {table} WHERE partition_date = CURRENT_DATE"
).scalar()
distinct_partitions = conn.execute(
f"SELECT COUNT(DISTINCT partition_date) FROM {table} "
f"WHERE partition_date >= CURRENT_DATE - 30"
).scalar()
return {
f"{table}.row_count": row_count,
f"{table}.today_row_count": today_count,
f"{table}.partition_count_30d": distinct_partitions,
}
Freshness Metrics
def collect_freshness_metrics(table: str, timestamp_col: str, engine) -> dict[str, float]:
"""How recent is the data?"""
with engine.connect() as conn:
result = conn.execute(f"""
SELECT
DATEDIFF(MINUTE, MAX({timestamp_col}), CURRENT_TIMESTAMP) as lag_minutes,
MAX({timestamp_col}) as max_ts,
MIN({timestamp_col}) as min_ts,
DATEDIFF(HOUR, MIN({timestamp_col}), MAX({timestamp_col})) as span_hours
FROM {table}
WHERE partition_date = CURRENT_DATE
""").fetchone()
return {
f"{table}.freshness_lag_minutes": result[0] or 9999,
f"{table}.timestamp_span_hours": result[3] or 0,
}
Distribution Metrics
def collect_distribution_metrics(
table: str,
columns: list[str],
engine
) -> dict[str, float]:
"""Null rates, cardinality, mean, stddev for key columns."""
metrics = {}
for col in columns:
with engine.connect() as conn:
result = conn.execute(f"""
SELECT
COUNT(*) as total,
COUNT({col}) as non_null,
COUNT(DISTINCT {col}) as cardinality,
AVG(CAST({col} AS FLOAT)) as mean_val,
STDDEV(CAST({col} AS FLOAT)) as std_val
FROM {table}
WHERE partition_date = CURRENT_DATE
""").fetchone()
total = result[0] or 1
non_null = result[1] or 0
metrics[f"{table}.{col}.null_rate"] = 1.0 - (non_null / total)
metrics[f"{table}.{col}.cardinality"] = result[2] or 0
if result[3] is not None:
metrics[f"{table}.{col}.mean"] = result[3]
if result[4] is not None:
metrics[f"{table}.{col}.stddev"] = result[4]
return metrics
Business Metrics
def collect_business_metrics(engine) -> dict[str, float]:
"""Domain-specific aggregate metrics - distinct users, orders, revenue."""
with engine.connect() as conn:
result = conn.execute("""
SELECT
COUNT(DISTINCT user_id) as active_users,
COUNT(DISTINCT order_id) as order_count,
SUM(revenue_usd) as total_revenue,
AVG(revenue_usd) as avg_order_value
FROM orders
WHERE order_date = CURRENT_DATE
""").fetchone()
return {
"orders.active_users_today": result[0] or 0,
"orders.order_count_today": result[1] or 0,
"orders.total_revenue_today": result[2] or 0.0,
"orders.avg_order_value_today": result[3] or 0.0,
}
ML-Based Anomaly Detection
For complex, multivariate anomalies - where no single metric is anomalous but the combination is unusual - machine learning detectors outperform threshold-based approaches.
Isolation Forest for Multivariate Metrics
from sklearn.ensemble import IsolationForest
import numpy as np
import pandas as pd
class IsolationForestDetector:
"""
Isolation Forest for multivariate pipeline metric anomaly detection.
Detects anomalies in the joint distribution of multiple metrics.
"""
def __init__(
self,
contamination: float = 0.01, # expected fraction of anomalies
n_estimators: int = 100,
random_state: int = 42,
):
self.model = IsolationForest(
contamination=contamination,
n_estimators=n_estimators,
random_state=random_state,
)
self.feature_names = None
self.scaler = None
def fit(self, historical_df: pd.DataFrame) -> None:
"""
Fit on a DataFrame where each row is a day's metrics
and each column is a metric.
"""
from sklearn.preprocessing import StandardScaler
self.feature_names = list(historical_df.columns)
self.scaler = StandardScaler()
X = self.scaler.fit_transform(historical_df.values)
self.model.fit(X)
def detect(self, current_metrics: dict[str, float]) -> dict:
"""
Return anomaly score and prediction for today's metrics.
Isolation Forest score: -1 = anomaly, 1 = normal.
Anomaly score: lower (more negative) = more anomalous.
"""
if self.feature_names is None:
raise RuntimeError("Call fit() first")
# Align features in the same order as training
feature_vector = np.array([
current_metrics.get(f, np.nan) for f in self.feature_names
]).reshape(1, -1)
scaled = self.scaler.transform(feature_vector)
prediction = self.model.predict(scaled)[0] # -1 or 1
score = self.model.score_samples(scaled)[0] # anomaly score
return {
"is_anomaly": prediction == -1,
"anomaly_score": score,
"severity": "critical" if score < -0.7 else "warning" if score < -0.5 else "normal",
}
Prophet for Time-Series Metric Forecasting
from prophet import Prophet
import pandas as pd
class ProphetMetricMonitor:
"""
Uses Prophet to forecast metric values and flag observations
that fall outside the prediction interval.
"""
def __init__(
self,
interval_width: float = 0.95, # 95% prediction interval
uncertainty_threshold: float = 1.5, # alert if N× outside interval
):
self.interval_width = interval_width
self.uncertainty_threshold = uncertainty_threshold
self.models: dict[str, Prophet] = {}
self.forecasts: dict[str, pd.DataFrame] = {}
def fit(self, metric_name: str, historical_df: pd.DataFrame) -> None:
"""
Fit Prophet on a metric.
Args:
metric_name: name of the metric
historical_df: DataFrame with columns 'ds' (datetime) and 'y' (value)
"""
model = Prophet(
interval_width=self.interval_width,
daily_seasonality=True,
weekly_seasonality=True,
yearly_seasonality=False, # usually too little data
changepoint_prior_scale=0.05, # conservative - avoid overfitting
)
model.fit(historical_df)
self.models[metric_name] = model
def detect(
self,
metric_name: str,
current_date: pd.Timestamp,
current_value: float,
) -> AnomalyResult:
"""
Compare current_value to Prophet's prediction for current_date.
"""
if metric_name not in self.models:
raise ValueError(f"No model fitted for {metric_name}")
model = self.models[metric_name]
# Make a one-step forecast for the current date
future = pd.DataFrame({"ds": [current_date]})
forecast = model.predict(future)
yhat = forecast["yhat"].iloc[0]
yhat_lower = forecast["yhat_lower"].iloc[0]
yhat_upper = forecast["yhat_upper"].iloc[0]
interval_width = yhat_upper - yhat_lower
# How far outside the prediction interval is the current value?
if current_value < yhat_lower:
overshoot = (yhat_lower - current_value) / max(interval_width, 1e-6)
elif current_value > yhat_upper:
overshoot = (current_value - yhat_upper) / max(interval_width, 1e-6)
else:
overshoot = 0.0
is_anomaly = overshoot > self.uncertainty_threshold
severity = (
"critical" if overshoot > self.uncertainty_threshold * 2
else "warning" if is_anomaly
else "normal"
)
return AnomalyResult(
metric_name=metric_name,
current_value=current_value,
z_score=overshoot, # using overshoot as the z-score analog
is_anomaly=is_anomaly,
severity=severity,
explanation=(
f"{metric_name}: {current_value:.2f} is {overshoot:.1f}× outside "
f"Prophet forecast interval [{yhat_lower:.2f}, {yhat_upper:.2f}]. "
f"Forecast: {yhat:.2f}"
),
)
Feature Drift Detection for ML Systems
When features in production drift from the distribution seen during training, model performance degrades before any downstream signal (like accuracy) is measurable. Detecting drift early, at the data layer, enables retraining before customers feel the impact.
Population Stability Index (PSI)
PSI measures how much a distribution has shifted from a reference distribution. Originally developed for credit risk, it is widely used in ML monitoring:
Interpretation:
- : no significant shift
- : moderate shift - monitor closely
- : significant shift - investigate, likely retrain
import numpy as np
from typing import Union
def compute_psi(
expected: np.ndarray,
actual: np.ndarray,
n_bins: int = 10,
epsilon: float = 1e-4,
) -> float:
"""
Compute Population Stability Index between expected and actual distributions.
Args:
expected: training distribution values (1D array)
actual: current production values (1D array)
n_bins: number of histogram bins
epsilon: small value to avoid log(0)
Returns:
PSI score
"""
# Use quantiles from the expected distribution to define bins
# This ensures bins are meaningful for the reference distribution
breakpoints = np.percentile(expected, np.linspace(0, 100, n_bins + 1))
breakpoints = np.unique(breakpoints) # remove duplicates from constant ranges
if len(breakpoints) < 2:
return 0.0 # degenerate case - constant feature
# Compute proportions in each bin
expected_props = np.histogram(expected, bins=breakpoints)[0] / len(expected)
actual_props = np.histogram(actual, bins=breakpoints)[0] / len(actual)
# Apply epsilon to avoid division by zero and log(0)
expected_props = np.clip(expected_props, epsilon, None)
actual_props = np.clip(actual_props, epsilon, None)
# Renormalize after clipping
expected_props = expected_props / expected_props.sum()
actual_props = actual_props / actual_props.sum()
psi = np.sum(
(actual_props - expected_props) * np.log(actual_props / expected_props)
)
return float(psi)
def compute_kl_divergence(
expected: np.ndarray,
actual: np.ndarray,
n_bins: int = 20,
epsilon: float = 1e-4,
) -> float:
"""
KL divergence from expected to actual distribution.
Non-symmetric: KL(actual || expected).
Lower is better - 0 means identical distributions.
"""
breakpoints = np.percentile(expected, np.linspace(0, 100, n_bins + 1))
breakpoints = np.unique(breakpoints)
if len(breakpoints) < 2:
return 0.0
p = np.histogram(expected, bins=breakpoints)[0] / len(expected)
q = np.histogram(actual, bins=breakpoints)[0] / len(actual)
p = np.clip(p, epsilon, None)
q = np.clip(q, epsilon, None)
p = p / p.sum()
q = q / q.sum()
return float(np.sum(q * np.log(q / p)))
class FeatureDriftMonitor:
"""
Monitor feature drift between training distribution and current serving distribution.
"""
def __init__(
self,
psi_warn_threshold: float = 0.1,
psi_critical_threshold: float = 0.2,
):
self.psi_warn = psi_warn_threshold
self.psi_critical = psi_critical_threshold
self.training_distributions: dict[str, np.ndarray] = {}
def register_training_distribution(
self,
feature_name: str,
training_values: np.ndarray
) -> None:
"""Store the training distribution for a feature."""
self.training_distributions[feature_name] = training_values
def check_drift(
self,
feature_name: str,
serving_values: np.ndarray,
) -> dict:
"""Check how much a feature has drifted from training."""
if feature_name not in self.training_distributions:
raise ValueError(f"No training distribution registered for {feature_name}")
training = self.training_distributions[feature_name]
psi = compute_psi(training, serving_values)
kl = compute_kl_divergence(training, serving_values)
severity = (
"critical" if psi >= self.psi_critical
else "warning" if psi >= self.psi_warn
else "normal"
)
return {
"feature": feature_name,
"psi": psi,
"kl_divergence": kl,
"severity": severity,
"interpretation": (
f"PSI={psi:.3f}: "
+ ("significant distribution shift - consider retraining"
if psi >= self.psi_critical
else "moderate shift - monitor closely"
if psi >= self.psi_warn
else "stable distribution")
),
"training_mean": float(np.mean(training)),
"serving_mean": float(np.mean(serving_values)),
"mean_shift_pct": abs(np.mean(serving_values) - np.mean(training))
/ max(abs(np.mean(training)), 1e-6) * 100,
}
The Anomaly Detection Pipeline Architecture
Tools: The Commercial vs. Custom Landscape
Commercial Data Observability Tools
Monte Carlo Data: The category-defining platform. Monitors data freshness, volume, schema, distribution, and lineage automatically. ML-powered anomaly detection without configuration. Connects to all major warehouses. Best for teams that want observability without building it themselves. Pricing is per-connector and scales to enterprise.
Bigeye: Column-level metric monitoring with customizable thresholds and ML-powered baselines. Strong dbt integration. Provides "auto-thresholds" that learn from historical data - similar to what you'd build with the IQR/z-score approach, but with a managed service around it.
Anomalo: Focuses on root-cause analysis in addition to detection. When an anomaly fires, it tries to explain which segment (user cohort, region, date range) is driving it. Particularly useful for business metric monitoring.
dbt-profiler: Open-source dbt package that profiles each model and generates summary statistics (column-level null rates, cardinality, distributions). Not anomaly detection per se, but provides the raw metrics that feed anomaly detection systems.
When to Build Custom
Custom makes sense when:
- Your data has domain-specific semantics that commercial tools don't understand
- You need anomaly detection embedded in the pipeline execution itself (not a parallel monitoring layer)
- You want feature drift detection tightly coupled to your ML training infrastructure
- Your data volumes are extreme and commercial tool pricing becomes prohibitive
- You need to detect anomalies that require joining across multiple tables
The stack described in this lesson - Python detectors + a metrics store (InfluxDB, BigQuery, or even a simple Postgres table) + Slack/PagerDuty routing - covers 80% of the use cases at a fraction of the cost.
Common Mistakes
:::danger Alert Fatigue: The Self-Defeating Detector
An anomaly detection system that fires too often is worse than no system at all. Engineers learn to ignore the alerts. When a real anomaly fires, it gets triaged as "probably another false positive" and is ignored.
Signs of alert fatigue:
- Alerts firing multiple times per day
- Engineers dismissing alerts without investigation
- "Known issues" list in Slack that never gets resolved
Fix: use a two-stage threshold (warning → Slack, critical → PagerDuty), calibrate thresholds so critical fires less than once per week per pipeline, and implement alert deduplication (don't re-alert if the same metric is anomalous for five consecutive runs). Quality over quantity: 10 high-signal alerts are infinitely more valuable than 500 noisy ones. :::
:::danger Baseline Contamination: Teaching the Model Bad Data Is Normal
If your historical metrics include periods of bad data, your anomaly detector learns that bad data is normal. The most common scenario: you deploy anomaly detection after an incident, but the baseline training window includes the incident itself.
Example: row counts dropped to 30% of normal for two weeks during a bug. You then train your detector on 90 days of history that includes those two weeks. The detector's baseline mean is now artificially low. Future healthy runs get flagged as "anomalously high" and real incidents look normal.
Fix:
- Before fitting the baseline, exclude known bad periods (maintain an exclusion list)
- Inspect baseline distributions for multimodality (bimodal distribution = two regimes, one probably bad)
- Use median-based statistics (resistant to extreme values) rather than mean-based
- Re-fit the baseline on a rolling window that excludes recently-flagged anomalies :::
:::warning Monitoring Only at the Final Layer
Monitoring only the output of your pipeline - the feature table or serving table - means anomalies propagate through multiple stages before detection. By the time an anomaly in the raw events table surfaces as a problem in the feature store, it may have corrupted days of output.
Monitor at every layer: raw ingestion (did the file arrive? correct schema? row count in range?), intermediate transformations (join key null rate, deduplication counts), and final output (feature distributions, business metric aggregates). Earlier detection means smaller blast radius. :::
Interview Q&A
Q1: Why do static threshold checks fail for pipeline monitoring, and what is the alternative?
Static thresholds are brittle because they encode one moment's understanding of the data into a permanent rule. They fail when data grows (row count threshold becomes too low), when patterns shift seasonally (a Monday threshold doesn't work on Sundays), when the business changes (a new product line changes the expected revenue distribution), or when someone runs a legitimate backfill (timestamps are all in the past by design). Every threshold requires manual recalibration when the data changes, which nobody does proactively.
The alternative is statistical anomaly detection: fit a model of "normal" from historical data (computing mean, standard deviation, IQR from 60-90 days of metric history), and flag observations that deviate significantly from that model. The model adapts with controlled retraining. It handles seasonality with rolling window comparisons (compare to same weekday, not just yesterday). It catches anomalies in combinations of metrics (via Isolation Forest) that no individual threshold would catch. The cost is slightly more complexity; the benefit is vastly better signal-to-noise ratio.
Q2: What is PSI, when do you use it, and how do you interpret it?
PSI (Population Stability Index) measures the shift between two distributions - typically a training distribution and a current serving distribution. You discretize both distributions into the same bins (using the training distribution's quantiles to define bins), compute the proportion of values in each bin for both distributions, and calculate:
Interpretation: PSI below 0.1 means stable (no significant shift), 0.1-0.2 is moderate shift (monitor), above 0.2 is significant shift (investigate, likely need to retrain). PSI is symmetric in the sense that it captures both shifts in individual bins and the direction doesn't matter - it measures total distributional distance. Use PSI for monitoring continuous features. For categorical features, use chi-squared tests or Jensen-Shannon divergence. Run PSI nightly on all features that feed into production models, and trigger retraining workflows when PSI exceeds the threshold.
Q3: How do you avoid alert fatigue in a pipeline anomaly detection system?
Three-part answer: threshold calibration, severity routing, and deduplication. For calibration: set warning thresholds at z-score 2.0 (fires occasionally) and critical at 3.0 (fires rarely). Back-test your thresholds against 90 days of history - if your warning threshold would have fired more than once per week per pipeline historically, it is too sensitive. For severity routing: warnings go to a Slack channel with low urgency, criticals go to PagerDuty and the on-call engineer. Never send warnings to PagerDuty - that instantly trains engineers to ignore PagerDuty. For deduplication: if the same metric is anomalous for three consecutive runs, send one alert not three. Implement a "cooldown" period after an alert fires - don't re-alert until the metric returns to normal and then goes anomalous again. Also: measure your false positive rate and set a team target (below 10% of alerts should be false positives). When you exceed the target, tune the thresholds.
Q4: You're building anomaly detection for a pipeline that has strong weekly seasonality. How do you handle this?
Don't use simple day-over-day comparisons or a global baseline - both will fire constantly on legitimate seasonal variation. Use rolling window comparison: compare today's value to the same weekday in the previous N weeks (typically 4 weeks). This normalizes out weekly patterns. Implement it as: collect Monday values for the last 4 weeks, compute their mean and standard deviation, use that as the baseline for the current Monday. For strong hourly patterns (peak traffic at 11am, low traffic at 3am), add hour-of-day segmentation as well. If you have strong yearly seasonality (holiday spikes, end-of-quarter patterns), Prophet is more appropriate - it models multiple seasonality components simultaneously and produces a prediction interval that naturally widens during high-uncertainty periods (like holidays).
Q5: How do you detect slow feature drift that doesn't trigger any single-day anomaly?
Z-score and IQR detectors catch sudden spikes but miss gradual drift. For slow drift, use CUSUM (Cumulative Sum control chart). CUSUM accumulates deviations from the expected value over time. A single deviation of 1.5 standard deviations might not trigger a z-score alert, but if the same 1.5σ deviation occurs for ten consecutive days, CUSUM accumulates it and triggers an alert on day 4 or 5. This is the correct behavior - sustained moderate drift is more dangerous than an isolated spike because it silently corrupts model performance over weeks. Also, monitor the rolling 7-day mean of PSI scores for each feature. If PSI has been slowly increasing for two weeks - still below the threshold each day but trending upward - that trend is itself a signal worth alerting on.
Q6: What is the baseline contamination problem and how do you prevent it?
Baseline contamination occurs when the historical data used to fit your anomaly detector's baseline includes anomalous periods. If your baseline training window contains two weeks of a pipeline bug that halved row counts, the detector learns that low row counts are normal. Future healthy runs trigger false positives ("anomalously high") and real incidents look normal.
Prevention has three components. First, before training the baseline, audit the historical metric time series visually or statistically - look for multimodality (bimodal distribution suggests two regimes) or known incident periods in runbooks. Exclude identified bad periods from the baseline using an exclusion list. Second, use robust statistics for the baseline: median instead of mean, IQR instead of standard deviation. These are resistant to outliers and reduce the impact of contaminated periods. Third, implement contamination-aware baseline updates: when the detector flags an anomaly that is confirmed (by the on-call engineer or by automated incident resolution), retroactively exclude that period from the rolling baseline window. This prevents confirmed anomalies from corrupting future baselines.
