

Model Monitoring Platform

The Silent Degradation

The recommendation model had been deployed in November. By January, user engagement had dropped 8%. The product team assumed it was a seasonal effect - post-holiday lull. By February, engagement was still depressed. A data scientist ran a manual analysis and found the problem: the model's prediction distribution had shifted significantly. It was recommending products from a narrow category cluster that had been heavily represented in a November promotional event. Two months of drift, accumulating silently.

The root cause was concept drift: user purchase behavior had changed after the holiday season, but the model, trained on pre-holiday data, hadn't adapted. Without monitoring, the only signal was the slowly worsening business metric - and that signal was masked by seasonal variation, making it ambiguous.

With proper monitoring, the drift would have been caught within days rather than months. A PSI check on the recommendation distribution would have alerted by day two, and automated retraining could then have been triggered. The two months of degradation would have been two days.

This lesson teaches you to build the monitoring infrastructure that catches this class of failure.


The Three Drift Types

Understanding the failure modes is a prerequisite to monitoring them:

Data drift: The distribution of input features changes. Example: a "user age" feature that was normally distributed with mean 35 starts skewing toward younger users after a marketing campaign. The model's predictions will change - possibly for the worse - because it's operating in a region of input space it wasn't optimized for.

Prediction drift: The distribution of model outputs changes. Example: a recommendation model that previously spread predictions across 100 products starts concentrating on 20 products. This may indicate data drift upstream or a bug in the serving pipeline.

Concept drift: The relationship between inputs and outcomes changes. Example: a fraud detection model trained before a new fraud pattern emerges. Inputs look "normal" by the model's definition, but they're actually fraudulent. This is the hardest to detect - inputs look fine, predictions look fine, but accuracy is degrading.

The monitoring strategy differs for each type.
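
To make the last case concrete, here is a minimal synthetic sketch of concept drift. Everything in it is invented for illustration: the frozen `fixed_model`, the one-dimensional feature, and the decision boundary that moves from 0 to 0.5. It only shows that input and prediction summaries can stay flat while accuracy falls.

```python
import numpy as np

rng = np.random.default_rng(0)


def fixed_model(x: np.ndarray) -> np.ndarray:
    """A frozen 'model' that learned the original rule: positive when x > 0."""
    return (x > 0).astype(int)


# Reference period: labels follow the rule the model learned.
x_ref = rng.normal(0.0, 1.0, 5000)
y_ref = (x_ref > 0).astype(int)

# Concept drift: inputs come from the same distribution, but the true
# decision boundary has moved from 0 to 0.5 (an assumed, made-up change).
x_new = rng.normal(0.0, 1.0, 5000)
y_new = (x_new > 0.5).astype(int)

acc_ref = float(np.mean(fixed_model(x_ref) == y_ref))
acc_new = float(np.mean(fixed_model(x_new) == y_new))

# Input and prediction summaries barely move between the two periods.
input_mean_shift = abs(float(x_new.mean() - x_ref.mean()))
positive_rate_shift = abs(
    float(fixed_model(x_new).mean() - fixed_model(x_ref).mean())
)

print(f"accuracy: {acc_ref:.3f} -> {acc_new:.3f}")
print(f"input mean shift: {input_mean_shift:.3f}")
print(f"predicted-positive rate shift: {positive_rate_shift:.3f}")
```

Only the accuracy line changes materially, and computing it requires labels. That is why concept drift needs a labeled feedback loop rather than distribution checks alone.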


Statistical Tests for Drift Detection

Population Stability Index (PSI)

PSI is one of the most widely used statistics for monitoring feature distributions in production ML, especially in finance and e-commerce:

$$
\text{PSI} = \sum_{i=1}^{N} (A_i - E_i) \ln\left(\frac{A_i}{E_i}\right)
$$

Where $A_i$ = fraction in bucket $i$ (actual, current), $E_i$ = fraction in bucket $i$ (expected, reference/training).

Interpretation:

  • PSI < 0.10: No significant shift
  • 0.10 ≤ PSI < 0.25: Moderate shift - monitor closely
  • PSI ≥ 0.25: Major shift - consider retraining

import numpy as np


def compute_psi(
    reference: np.ndarray,
    current: np.ndarray,
    bins: int = 10,
    eps: float = 1e-8,
) -> float:
    """
    Compute Population Stability Index between reference and current distributions.

    Args:
        reference: Reference distribution (e.g., training data feature values)
        current: Current distribution (e.g., last 7 days of serving data)
        bins: Number of bins for discretization
        eps: Small value to prevent log(0)

    Returns:
        PSI score (higher = more drift)
    """
    # Use reference distribution to define bins
    bin_edges = np.percentile(reference, np.linspace(0, 100, bins + 1))
    # Open the outer bins so current values outside the reference range are
    # still counted; np.histogram silently drops values beyond the outer edges
    bin_edges[0] = -np.inf
    bin_edges[-1] = np.inf

    # Count observations in each bin
    ref_counts, _ = np.histogram(reference, bins=bin_edges)
    cur_counts, _ = np.histogram(current, bins=bin_edges)

    # Convert to proportions, adding eps to avoid division by zero
    ref_pct = (ref_counts + eps) / (len(reference) + bins * eps)
    cur_pct = (cur_counts + eps) / (len(current) + bins * eps)

    # PSI formula
    psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
    return float(psi)


# Example
reference_ages = np.random.normal(35, 10, 10000)  # training data distribution
current_ages = np.random.normal(28, 12, 1000)  # current serving distribution

psi = compute_psi(reference_ages, current_ages)
print(f"PSI: {psi:.4f}")  # Expect roughly 0.4 for this shift, i.e. a major shift
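
PSI is not limited to numeric features. The sketch below applies the same formula to a categorical feature by treating each distinct value as its own bucket. The function name `categorical_psi`, the `eps` floor, and the device data are assumptions made for illustration, not part of the lesson's code.

```python
import numpy as np
from collections import Counter


def categorical_psi(reference, current, eps: float = 1e-6) -> float:
    """PSI over category frequencies; each distinct value is its own bucket."""
    categories = sorted(set(reference) | set(current))
    ref_counts = Counter(reference)
    cur_counts = Counter(current)
    ref_pct = np.array([ref_counts[c] for c in categories], dtype=float) / len(reference)
    cur_pct = np.array([cur_counts[c] for c in categories], dtype=float) / len(current)
    # Floor empty buckets at eps so the log stays finite. This is an assumed
    # convention: the floor value affects the score when a category appears
    # or disappears entirely.
    ref_pct = np.clip(ref_pct, eps, None)
    cur_pct = np.clip(cur_pct, eps, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))


# Made-up device mix: mobile share grows from 60% to 80%.
reference_devices = ["mobile"] * 600 + ["desktop"] * 350 + ["tablet"] * 50
current_devices = ["mobile"] * 800 + ["desktop"] * 150 + ["tablet"] * 50

psi_same = categorical_psi(reference_devices, reference_devices)
psi_shifted = categorical_psi(reference_devices, current_devices)
print(f"identical: {psi_same:.4f}, shifted: {psi_shifted:.4f}")
```

By hand, the shifted case is 0.2 × ln(0.8/0.6) + (−0.2) × ln(0.15/0.35) ≈ 0.227, which lands in the moderate band of the interpretation table above.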

Kolmogorov-Smirnov Test

The KS test checks whether two samples come from the same continuous distribution. It is more statistically rigorous than PSI but less interpretable, and with large samples even tiny, harmless shifts become statistically significant:

import numpy as np
from scipy import stats


def ks_drift_test(
    reference: np.ndarray,
    current: np.ndarray,
    significance_level: float = 0.05,
) -> dict:
    """
    Kolmogorov-Smirnov test for distribution drift.

    Returns whether drift is statistically significant.
    """
    ks_stat, p_value = stats.ks_2samp(reference, current)
    # Cast NumPy scalars to built-in types so the result serializes cleanly
    drift_detected = bool(p_value < significance_level)

    return {
        "ks_statistic": float(ks_stat),
        "p_value": float(p_value),
        "drift_detected": drift_detected,
        "interpretation": (
            f"Distributions significantly different (p={p_value:.4f})"
            if drift_detected
            else f"No significant drift detected (p={p_value:.4f})"
        ),
    }
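
One practical caveat with p-value-based alerts: at production sample sizes, almost any difference is "significant". The sketch below illustrates this with a deliberately tiny synthetic shift and pairs the p-value with an effect-size floor on the KS statistic. The `MIN_KS_STATISTIC` name and its 0.05 value are assumptions to tune per feature, not an established standard.

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(42)

# Two large samples that differ by a shift of 0.03 standard deviations.
reference = rng.normal(0.0, 1.0, 200_000)
current = rng.normal(0.03, 1.0, 200_000)

result = stats.ks_2samp(reference, current)
statistically_significant = bool(result.pvalue < 0.05)

# Hypothetical effect-size floor: only alert when the largest gap between
# the two empirical CDFs is also large enough to matter in practice.
MIN_KS_STATISTIC = 0.05
should_alert = statistically_significant and bool(
    result.statistic >= MIN_KS_STATISTIC
)

print(f"KS statistic: {result.statistic:.4f}, p-value: {result.pvalue:.2e}")
print(f"significant: {statistically_significant}, alert: {should_alert}")
```

Here the p-value falls far below 0.05 even though the largest CDF gap is only on the order of 0.01, so gating on the statistic itself keeps the alert quiet.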

Maximum Mean Discrepancy (MMD)

For high-dimensional feature spaces (embedding vectors), PSI and KS are impractical because both compare one feature at a time. MMD measures the distance between distributions in a kernel-mapped feature space:

$$
\text{MMD}^2 = \mathbb{E}_{x,x'}[k(x,x')] - 2\mathbb{E}_{x,y}[k(x,y)] + \mathbb{E}_{y,y'}[k(y,y')]
$$

import numpy as np
from sklearn.metrics.pairwise import polynomial_kernel, rbf_kernel


def compute_mmd(
    reference: np.ndarray,  # shape: (n_ref, d)
    current: np.ndarray,  # shape: (n_cur, d)
    kernel: str = "rbf",
    gamma: float = 1.0,
) -> float:
    """
    Maximum Mean Discrepancy for detecting drift in embedding spaces.
    Useful for monitoring text embeddings, image embeddings, etc.

    Uses the biased estimator (diagonal terms included) and builds full
    kernel matrices, so memory grows quadratically with sample size.
    """
    if kernel == "rbf":
        def K(X, Y):
            return rbf_kernel(X, Y, gamma=gamma)
    elif kernel == "polynomial":
        def K(X, Y):
            return polynomial_kernel(X, Y)
    else:
        raise ValueError(f"Unsupported kernel: {kernel!r}")

    n_ref = len(reference)
    n_cur = len(current)

    mmd_squared = (
        np.sum(K(reference, reference)) / (n_ref * n_ref)
        - 2 * np.sum(K(reference, current)) / (n_ref * n_cur)
        + np.sum(K(current, current)) / (n_cur * n_cur)
    )

    # Clamp tiny negative values caused by floating-point error
    return float(max(0, mmd_squared) ** 0.5)  # Return MMD (not squared)
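
Unlike PSI, an MMD value has no conventional threshold. One common way to decide whether a score is unusual is a permutation test: pool both samples, reshuffle the split many times, and see how often a random split scores as high as the observed one. The sketch below is a NumPy-only version under stated assumptions: the helper names `rbf_mmd2` and `mmd_permutation_pvalue`, the choice `gamma = 1 / d`, and the synthetic embeddings are all illustrative.

```python
import numpy as np


def rbf_mmd2(x: np.ndarray, y: np.ndarray, gamma: float) -> float:
    """Biased estimate of squared MMD with kernel exp(-gamma * ||a - b||^2)."""
    z = np.vstack([x, y])
    sq_norms = np.sum(z * z, axis=1)
    sq_dists = sq_norms[:, None] + sq_norms[None, :] - 2.0 * (z @ z.T)
    k = np.exp(-gamma * np.maximum(sq_dists, 0.0))
    n = len(x)
    return float(k[:n, :n].mean() - 2.0 * k[:n, n:].mean() + k[n:, n:].mean())


def mmd_permutation_pvalue(x, y, gamma, n_permutations=200, seed=0):
    """Return (observed MMD^2, share of random splits scoring at least as high)."""
    rng = np.random.default_rng(seed)
    observed = rbf_mmd2(x, y, gamma)
    pooled = np.vstack([x, y])
    n = len(x)
    exceed = 0
    for _ in range(n_permutations):
        perm = rng.permutation(len(pooled))
        if rbf_mmd2(pooled[perm[:n]], pooled[perm[n:]], gamma) >= observed:
            exceed += 1
    # Add-one correction keeps the p-value strictly positive.
    return observed, (exceed + 1) / (n_permutations + 1)


rng = np.random.default_rng(7)
dim = 8
reference_emb = rng.normal(0.0, 1.0, (200, dim))
same_emb = rng.normal(0.0, 1.0, (200, dim))      # no drift
shifted_emb = rng.normal(0.5, 1.0, (200, dim))   # every dimension shifted

mmd2_same, p_same = mmd_permutation_pvalue(reference_emb, same_emb, gamma=1.0 / dim)
mmd2_shift, p_shift = mmd_permutation_pvalue(reference_emb, shifted_emb, gamma=1.0 / dim)
print(f"no drift: MMD^2={mmd2_same:.4f}, p={p_same:.3f}")
print(f"shifted:  MMD^2={mmd2_shift:.4f}, p={p_shift:.3f}")
```

The permutation loop recomputes the kernel each time for readability. For larger samples, computing the pooled kernel matrix once and permuting its indices would be cheaper.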

The Monitoring Architecture


Building the Drift Detector

import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Optional

# Relies on compute_psi and ks_drift_test defined in the sections above.


@dataclass
class DriftAlert:
    model_name: str
    feature_name: str
    drift_type: str  # "data", "prediction", "performance"
    metric: str  # "psi", "ks", "accuracy"
    score: float
    threshold: float
    severity: str  # "warning", "critical"


class ModelDriftDetector:
    """
    Compute and report drift metrics for production models.
    Runs as a batch job every hour or continuously on stream.
    """

    # Default thresholds
    PSI_WARNING = 0.10
    PSI_CRITICAL = 0.25
    KS_SIGNIFICANCE = 0.05
    PREDICTION_DRIFT_WARNING = 0.10
    PREDICTION_DRIFT_CRITICAL = 0.25

    def __init__(
        self,
        model_name: str,
        reference_data_path: str,
        reference_predictions_path: str,
    ):
        self.model_name = model_name

        # Load reference distributions from training data
        self.reference_features = pd.read_parquet(reference_data_path)
        self.reference_predictions = np.load(reference_predictions_path)

    def detect_feature_drift(
        self,
        current_features: pd.DataFrame,
    ) -> list[DriftAlert]:
        """Compute PSI for each numeric feature."""
        alerts = []

        for col in current_features.select_dtypes(include=[np.number]).columns:
            if col not in self.reference_features.columns:
                continue

            psi = compute_psi(
                self.reference_features[col].dropna().values,
                current_features[col].dropna().values,
            )

            if psi >= self.PSI_CRITICAL:
                alerts.append(DriftAlert(
                    model_name=self.model_name,
                    feature_name=col,
                    drift_type="data",
                    metric="psi",
                    score=psi,
                    threshold=self.PSI_CRITICAL,
                    severity="critical",
                ))
            elif psi >= self.PSI_WARNING:
                alerts.append(DriftAlert(
                    model_name=self.model_name,
                    feature_name=col,
                    drift_type="data",
                    metric="psi",
                    score=psi,
                    threshold=self.PSI_WARNING,
                    severity="warning",
                ))

        return alerts

    def detect_prediction_drift(
        self,
        current_predictions: np.ndarray,
    ) -> list[DriftAlert]:
        """Compute prediction distribution drift."""
        alerts = []

        psi = compute_psi(self.reference_predictions, current_predictions)

        if psi >= self.PREDICTION_DRIFT_CRITICAL:
            alerts.append(DriftAlert(
                model_name=self.model_name,
                feature_name="__predictions__",
                drift_type="prediction",
                metric="psi",
                score=psi,
                threshold=self.PREDICTION_DRIFT_CRITICAL,
                severity="critical",
            ))

        # KS test for additional confirmation
        ks_result = ks_drift_test(self.reference_predictions, current_predictions)
        if ks_result["drift_detected"] and psi >= self.PREDICTION_DRIFT_WARNING:
            # Note: score is the KS statistic, while threshold is the
            # significance level that the p-value was compared against
            alerts.append(DriftAlert(
                model_name=self.model_name,
                feature_name="__predictions__",
                drift_type="prediction",
                metric="ks",
                score=ks_result["ks_statistic"],
                threshold=self.KS_SIGNIFICANCE,
                severity="warning",
            ))

        return alerts

    def detect_performance_drift(
        self,
        labels_with_predictions: pd.DataFrame,  # columns: label, prediction, timestamp
    ) -> Optional[DriftAlert]:
        """
        Detect accuracy degradation using labeled feedback.
        Only possible when ground truth is available (delayed labels).
        """
        from sklearn.metrics import accuracy_score

        # Compute accuracy on recent labeled examples
        if len(labels_with_predictions) < 100:
            return None  # not enough labeled data

        recent_accuracy = accuracy_score(
            labels_with_predictions["label"],
            labels_with_predictions["prediction"].round(),
        )

        # Compare to reference accuracy (from model registry)
        reference_accuracy = self._get_reference_accuracy()
        if reference_accuracy is None:
            return None  # no baseline to compare against

        accuracy_drop = reference_accuracy - recent_accuracy
        if accuracy_drop > 0.05:
            return DriftAlert(
                model_name=self.model_name,
                feature_name="__accuracy__",
                drift_type="performance",
                metric="accuracy",
                score=recent_accuracy,
                threshold=reference_accuracy - 0.05,
                severity="critical" if accuracy_drop > 0.10 else "warning",
            )

        return None

    def _get_reference_accuracy(self) -> Optional[float]:
        """Fetch reference accuracy from MLflow model registry."""
        import mlflow

        client = mlflow.MlflowClient()
        # Registry stages are deprecated in newer MLflow releases in favor of
        # aliases; adapt this lookup if your registry no longer uses stages
        versions = client.get_latest_versions(self.model_name, stages=["Production"])
        if not versions:
            return None
        run = client.get_run(versions[0].run_id)
        return run.data.metrics.get("val_accuracy")

Alert Routing and Root Cause Analysis

Not all alerts deserve the same response. A data drift alert on a low-importance feature is different from performance degradation on the primary metric:

from enum import Enum


# Not yet used by AlertRouter, which matches on the string severity
# carried by DriftAlert
class AlertSeverity(Enum):
    INFO = 1
    WARNING = 2
    CRITICAL = 3


class AlertRouter:
    """
    Route drift alerts to appropriate responders with context.
    """

    def route(self, alert: DriftAlert) -> dict:
        """Determine where to send alert and what to include."""

        if alert.drift_type == "performance" and alert.severity == "critical":
            return {
                "channel": "pagerduty",
                "urgency": "high",
                "message": self._build_critical_message(alert),
                "runbook": "https://wiki.internal/ml-runbooks/model-degradation",
            }
        elif alert.severity == "critical":
            return {
                "channel": "slack:#ml-alerts",
                "urgency": "medium",
                "message": self._build_drift_message(alert),
            }
        else:
            return {
                "channel": "slack:#ml-monitoring",
                "urgency": "low",
                "message": self._build_warning_message(alert),
            }

    def _build_drift_message(self, alert: DriftAlert) -> str:
        # Label the score with the metric that produced it (psi, ks, ...)
        return (
            f":warning: *Model Drift Detected*\n"
            f"Model: `{alert.model_name}`\n"
            f"Feature: `{alert.feature_name}`\n"
            f"Drift Type: {alert.drift_type}\n"
            f"{alert.metric.upper()} Score: {alert.score:.3f} (threshold: {alert.threshold:.3f})\n"
            f"Action: Review input data pipeline for upstream changes\n"
            f"Dashboard: <https://grafana.internal/d/ml-monitoring|Model Monitoring>"
        )

    def _build_warning_message(self, alert: DriftAlert) -> str:
        # route() calls this for every non-critical alert
        return (
            f"Drift warning for `{alert.model_name}` / `{alert.feature_name}`\n"
            f"Drift Type: {alert.drift_type}\n"
            f"{alert.metric.upper()} Score: {alert.score:.3f} (threshold: {alert.threshold:.3f})\n"
            f"Action: No immediate action required; watch the trend"
        )

    def _build_critical_message(self, alert: DriftAlert) -> str:
        return (
            f"CRITICAL: Model accuracy degraded below threshold\n"
            f"Model: {alert.model_name}\n"
            f"Current accuracy: {alert.score:.3f}\n"
            f"Threshold: {alert.threshold:.3f}\n"
            f"Action required: Investigate and consider rollback or emergency retrain"
        )

Monitoring Dashboard Design

A good ML monitoring dashboard answers five questions at a glance:

  1. Is any model in trouble right now? - Health status for all production models
  2. What changed recently? - Feature drift trends over last 7 days
  3. Is the issue data or model? - Separate data drift vs prediction drift views
  4. How does it compare to history? - Baseline comparison with training distribution
  5. What is the business impact? - Model metrics alongside business metrics

# Prometheus metrics for ML monitoring
from prometheus_client import Gauge, Histogram


class MLMonitoringMetrics:
    """
    Prometheus metrics for production model monitoring.

    Create one instance per process: registering the same metric names
    twice in the default registry raises an error.
    """

    def __init__(self, model_name: str):
        self.model_name = model_name
        prefix = "ml_model"

        # Feature drift
        self.feature_psi = Gauge(
            f"{prefix}_feature_psi",
            "PSI drift score per feature",
            ["model", "feature"],
        )

        # Prediction drift
        self.prediction_psi = Gauge(
            f"{prefix}_prediction_psi",
            "PSI drift score for model predictions",
            ["model"],
        )

        # Performance
        self.accuracy = Gauge(
            f"{prefix}_accuracy",
            "Model accuracy on recent labeled examples",
            ["model"],
        )

        # Serving health. A histogram records the full latency distribution;
        # percentiles such as p50 are derived from the buckets at query time
        self.latency_ms = Histogram(
            f"{prefix}_latency_ms",
            "Inference latency in milliseconds",
            ["model"],
            buckets=[5, 10, 25, 50, 100, 200, 500, 1000],
        )
        self.error_rate = Gauge(
            f"{prefix}_error_rate",
            "Fraction of failed inference requests",
            ["model"],
        )

    def record_drift_check(self, feature: str, psi: float):
        self.feature_psi.labels(model=self.model_name, feature=feature).set(psi)

Common Mistakes

:::danger Monitoring only technical metrics (latency, error rate), not model quality
A model can have perfect latency and zero errors while returning completely wrong predictions. Technical health monitoring (is the endpoint up? is it fast?) is necessary but not sufficient. Always monitor the prediction distribution and, when labels are available, actual model accuracy. The silent degradation scenario - wrong predictions with no technical errors - is the most damaging and most common failure mode.
:::

:::warning Running drift checks too infrequently
Daily drift checks can miss acute failures. A training data bug deployed at 9 AM can degrade predictions for up to 24 hours before being detected. Run drift checks at least hourly for production models, and continuously for critical models. The compute cost is minimal - PSI computation on 10,000 samples takes milliseconds.
:::

:::danger Setting thresholds without understanding your baseline variance
A PSI of 0.15 might be normal weekly variance for a highly seasonal model. Applying the same threshold to a stable model and a seasonal model generates too many false positives on one and misses real drift on the other. Calibrate thresholds using historical data: for example, set the alert threshold at the 99th percentile of drift scores from historical periods with no known degradation.
:::
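
The calibration rule above can be sketched in a few lines. The function name `calibrate_threshold` and both score histories are made up for illustration. In practice the inputs would be drift scores logged during periods you have verified as healthy.

```python
import numpy as np


def calibrate_threshold(historical_scores, percentile: float = 99.0) -> float:
    """Alert threshold = chosen percentile of drift scores from healthy periods."""
    scores = np.asarray(historical_scores, dtype=float)
    if scores.size == 0:
        raise ValueError("need at least one historical drift score")
    return float(np.percentile(scores, percentile))


# Hypothetical hourly PSI histories from periods with no known degradation.
stable_model_scores = np.linspace(0.01, 0.04, 100)    # low baseline variance
seasonal_model_scores = np.linspace(0.05, 0.20, 100)  # high baseline variance

stable_threshold = calibrate_threshold(stable_model_scores)
seasonal_threshold = calibrate_threshold(seasonal_model_scores)
print(f"stable model threshold:   {stable_threshold:.4f}")
print(f"seasonal model threshold: {seasonal_threshold:.4f}")
```

With these invented histories, the stable model would alert at a PSI just under 0.04 and the seasonal one just under 0.20, so a single fixed cutoff of 0.10 would be too loose for the first and too tight for the second.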


Interview Q&A

Q: What is the difference between data drift, prediction drift, and concept drift?

A: Three distinct phenomena with different causes and monitoring approaches. Data drift (also called covariate shift): the input feature distribution changes - for example, your user age distribution shifts younger after a marketing campaign. The model wasn't trained on this region of input space and may perform worse. Monitored via PSI or KS test on feature distributions. Prediction drift: the distribution of model outputs changes - the model starts recommending different products or making different fraud decisions - which may be a symptom of data drift or a serving bug. Monitored via PSI on prediction distributions. Concept drift: the relationship between inputs and outcomes changes - fraud patterns evolve, user preferences shift - so the model's learned mapping is no longer accurate even when inputs look normal. The hardest to detect because it requires ground truth labels, which arrive with delay.

Q: How do you compute PSI and what does it tell you?

A: PSI (Population Stability Index) measures how much a distribution has changed between a reference period (typically training data) and a current period (recent serving data). Compute it by: (1) discretize both distributions into the same bins defined by the reference data percentiles; (2) compute the fraction of observations in each bin for both distributions; (3) for each bin, compute (current_pct - reference_pct) × ln(current_pct / reference_pct); (4) sum across bins. Interpretation: PSI below 0.10 means no significant change, 0.10–0.25 means moderate change worth monitoring, above 0.25 means major change that warrants investigation and likely retraining. PSI is unit-free, applies to any feature type after discretization, and has well-understood thresholds from decades of use in financial modeling - which is why it's the standard in production ML monitoring.

Q: How would you build a system that detects model degradation within 24 hours?

A: Four components. First, prediction logging: every inference request logs the input features and prediction to a stream (Kafka/Kinesis) - this is the data source for all monitoring. Second, drift computation job: runs hourly, computes PSI for each feature comparing the last 24 hours of serving data to the training reference distribution. Third, alert system: when PSI exceeds threshold, triggers a Slack/PagerDuty alert with the feature name, drift score, and a link to the monitoring dashboard. Fourth, labeled feedback loop: for use cases where ground truth arrives with delay (fraud outcomes, user purchases), compute accuracy on recently labeled examples and alert when accuracy drops below training performance minus a threshold. The hourly cadence ensures detection within 1–2 hours of a real drift event. For critical models, run continuously.

Q: How do you handle model monitoring for LLM-based applications?

A: LLMs add complexity because the output is text, not a simple prediction. Three monitoring approaches. First, distribution monitoring on structured metadata: even if you can't compute PSI on text outputs, you can monitor output length distribution, refusal rate (how often the model refuses to answer), response latency, and token count distribution. Second, LLM-as-judge quality monitoring: sample 1–5% of production outputs, send them to a judge LLM (GPT-4 or a fine-tuned classifier) that scores them on a rubric. Track the average quality score over time. Third, task-specific metrics: for RAG systems, track retrieval relevance scores. For classification-framed LLM tasks, track the distribution of output categories. The core principle is the same as for traditional models: define what "good behavior" looks like quantitatively, measure it continuously, and alert on degradation.
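
As a rough sketch of the first approach, the snippet below derives two structured signals from raw text outputs (word count and a refusal flag) and compares a reference batch with a current batch. The `REFUSAL_MARKERS` phrases, the helper names, and the sample outputs are all assumptions. A real deployment would need a refusal detector tuned to its own model.

```python
import numpy as np
from scipy import stats

# Hypothetical phrases that mark a refusal; tune these for your own model.
REFUSAL_MARKERS = ("i can't help", "i cannot help", "i'm unable to")


def is_refusal(text: str) -> bool:
    lowered = text.lower()
    return any(marker in lowered for marker in REFUSAL_MARKERS)


def summarize_outputs(outputs: list[str]) -> dict:
    """Reduce free-text outputs to structured signals that can be monitored."""
    return {
        "lengths": np.array([len(o.split()) for o in outputs]),
        "refusal_rate": float(np.mean([is_refusal(o) for o in outputs])),
    }


def compare_batches(reference: list[str], current: list[str]) -> dict:
    ref, cur = summarize_outputs(reference), summarize_outputs(current)
    ks = stats.ks_2samp(ref["lengths"], cur["lengths"])
    return {
        "refusal_rate_delta": cur["refusal_rate"] - ref["refusal_rate"],
        "length_ks_statistic": float(ks.statistic),
        "length_ks_pvalue": float(ks.pvalue),
    }


# Made-up batches: the current one contains refusals the reference lacks.
reference_outputs = ["The answer is " + "word " * n for n in range(20, 60)]
current_outputs = ["I can't help with that request."] * 10 + [
    "The answer is " + "word " * n for n in range(20, 50)
]

report = compare_batches(reference_outputs, current_outputs)
print(report)
```

The same pattern extends to token counts or latency: reduce each response to a number or a flag, then reuse the distribution checks from earlier in the lesson.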

Q: What is your incident response process when a model drift alert fires?

A: Four-step process. First, assess severity: is this data drift (PSI spike on input features), prediction drift (model outputs shifting), or performance degradation (accuracy dropping)? Pull up the monitoring dashboard to characterize the drift. Second, identify the change: check the deployment timeline - was there a model update, a data pipeline change, or an upstream schema change in the last 24 hours? Check feature pipeline logs for any errors or schema changes. Third, decide on immediate action: if accuracy is critically degraded and there's a recent deployment, roll back the model via the registry. If drift is moderate, increase monitoring frequency and trigger manual investigation. Fourth, root cause and fix: once the acute incident is resolved, trace back to the root cause - which input features drifted, which upstream source changed, and why. Fix the upstream issue, retrain the model on recent data, and validate before redeploying.

© 2026 EngineersOfAI. All rights reserved.