Alerting on LLM Quality Degradation
The Six-Hour Blind Spot
It was a Tuesday in October. The AI-powered search feature had been degrading since 10:14 AM. By 10:17, every response was quoting confidently from the wrong section of the product documentation. By noon, users were getting pricing information that was 18 months out of date. By 2 PM, two enterprise trial users had made internal purchasing recommendations based on the AI's hallucinated outputs.
The on-call engineer's dashboard showed: latency 1.8 seconds (normal), error rate 0.06% (normal), P99 2.4 seconds (normal), availability 99.99% (normal). Everything was green. The engineer was working on an unrelated feature when a Slack message arrived from customer success: "Getting a lot of tickets about wrong pricing. The AI seems confident but the numbers are wrong."
Investigation revealed the root cause: a "harmless" refactoring of the context assembly pipeline, deployed at 10:12 AM, had introduced an off-by-one error in the document chunk selection logic. The system was now truncating the first 512 tokens of each retrieved document - including the section headers that told the model which document it was reading. The model, reading headerless context, filled in the gaps from parametric memory. The parametric memory held the old pricing.
The fix was a two-line change. It took ten minutes to deploy once discovered. Discovery took six hours.
This is the failure mode that AI quality alerting is designed to prevent. Operational metrics - latency, errors, availability - tell you whether the infrastructure is working. They tell you nothing about whether the model outputs are correct. Quality degradation is silent from an infrastructure perspective. It only surfaces when users or customers notice - and by then, the damage compounds with every response.
Quality alerting closes this gap. The goal is to detect statistical shifts in AI output quality metrics - faithfulness, relevance, hallucination rate, toxicity - and notify engineers before users do. In the above incident, a properly configured faithfulness alert would have fired at approximately 10:25 AM, less than fifteen minutes after the regression began.
This lesson covers the full alerting stack: architecture, threshold-based alerts, statistical process control, ML anomaly detection, notification routing, Prometheus/Grafana integration, runbook design, and deployment correlation.
The Quality Alerting Stack
Alert Severity Matrix
| Severity | Metric Condition | Sustain Duration | Notification | On-Call SLA |
|---|---|---|---|---|
| P0 | Toxicity rate > 1% OR faithfulness < 0.40 | 5 min | PagerDuty page | Wake immediately |
| P1 | Faithfulness < 0.65 OR hallucination rate > 25% | 15 min | PagerDuty page | 15 min response |
| P2 | Faithfulness < 0.75 OR relevance < 0.60 | 30 min | Slack team channel | Next business hour |
| P3 | Any metric trending down (SPC Rule 4) | 45 min | Slack digest | Daily review |
| Watch | Metric approaching threshold (within 10%) | N/A | Dashboard only | No action required |
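The matrix can be read as an ordered set of checks, evaluated from most severe to least. Here is a minimal sketch of that ordering, assuming a hypothetical `classify_severity` helper and a plain dict of rolling metric values (neither is part of this lesson's alerting code). Sustain durations, the SPC-based P3 row, and the Watch row are left out for brevity.

```python
from typing import Optional


def classify_severity(metrics: dict[str, float]) -> Optional[str]:
    """Return the most severe matching level from the matrix, or None."""
    # Missing metrics default to "healthy" values so they never trigger a rule.
    faithfulness = metrics.get("faithfulness", 1.0)
    hallucination = metrics.get("hallucination_rate", 0.0)
    toxicity = metrics.get("toxicity_rate", 0.0)
    relevance = metrics.get("answer_relevance", 1.0)

    # Check the most severe conditions first so a P0 is never reported as P2.
    if toxicity > 0.01 or faithfulness < 0.40:
        return "P0"
    if faithfulness < 0.65 or hallucination > 0.25:
        return "P1"
    if faithfulness < 0.75 or relevance < 0.60:
        return "P2"
    return None


print(classify_severity({"faithfulness": 0.72}))                              # P2
print(classify_severity({"faithfulness": 0.90, "hallucination_rate": 0.30}))  # P1
print(classify_severity({"faithfulness": 0.90, "toxicity_rate": 0.02}))       # P0
```

Ordering matters here: a faithfulness of 0.35 satisfies the P0, P1, and P2 conditions at once, and only the first match should page.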
Threshold-Based Alerting
Threshold-based alerting is the simplest and most interpretable approach. Define fixed thresholds on rolling-average quality metrics, require a minimum sample count to avoid noise, and require a sustained duration to avoid alerting on transient spikes:
# alerting/threshold_engine.py
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from collections import deque
from enum import Enum
import statistics
from typing import Optional
class Comparison(str, Enum):
LT = "lt" # alert when metric < threshold
GT = "gt" # alert when metric > threshold
@dataclass
class QualityAlertRule:
"""Defines a single threshold-based alert rule."""
rule_id: str
metric: str
threshold: float
comparison: Comparison
window: timedelta # rolling window for averaging
min_samples: int # minimum samples required (prevents false alerts during low traffic)
sustain_duration: timedelta # must violate threshold for this long before alerting
severity: str # P0, P1, P2, P3
title: str
description: str
runbook_url: str
feature: Optional[str] = None # None = all features, str = specific feature
@dataclass
class AlertFiring:
"""Tracks an in-progress alert condition."""
rule_id: str
first_violated_at: datetime
last_value: float
sample_count: int
# Standard alert rules for LLM quality monitoring
STANDARD_ALERT_RULES = [
QualityAlertRule(
rule_id="toxicity_critical",
metric="toxicity_rate",
threshold=0.01,
comparison=Comparison.GT,
window=timedelta(minutes=30),
        min_samples=20,  # P0/P1 rules require at least 20 samples to avoid low-traffic false pages
sustain_duration=timedelta(minutes=5),
severity="P0",
title="CRITICAL: Toxicity rate above 1%",
description="Users are receiving toxic or harmful content. This requires immediate investigation.",
runbook_url="https://wiki.internal/runbooks/llm-toxicity"
),
QualityAlertRule(
rule_id="faithfulness_critical",
metric="faithfulness",
threshold=0.40,
comparison=Comparison.LT,
window=timedelta(hours=1),
min_samples=20,
sustain_duration=timedelta(minutes=10),
severity="P0",
title="CRITICAL: Faithfulness collapsed (< 0.40)",
description="Model is heavily hallucinating. Widespread incorrect information being served.",
runbook_url="https://wiki.internal/runbooks/llm-faithfulness"
),
QualityAlertRule(
rule_id="faithfulness_degraded",
metric="faithfulness",
threshold=0.65,
comparison=Comparison.LT,
window=timedelta(hours=1),
min_samples=20,
sustain_duration=timedelta(minutes=15),
severity="P1",
title="Faithfulness degraded (< 0.65)",
description="Significant hallucination increase. Check for recent deployments or RAG changes.",
runbook_url="https://wiki.internal/runbooks/llm-faithfulness"
),
QualityAlertRule(
rule_id="faithfulness_warning",
metric="faithfulness",
threshold=0.75,
comparison=Comparison.LT,
window=timedelta(hours=1),
min_samples=30,
sustain_duration=timedelta(minutes=30),
severity="P2",
title="Faithfulness below warning threshold (< 0.75)",
description="Quality degradation in progress. Monitor closely.",
runbook_url="https://wiki.internal/runbooks/llm-faithfulness"
),
QualityAlertRule(
rule_id="hallucination_rate_high",
metric="hallucination_rate",
threshold=0.25,
comparison=Comparison.GT,
window=timedelta(hours=1),
min_samples=20,
sustain_duration=timedelta(minutes=15),
severity="P1",
title="Hallucination rate > 25%",
description="Over 1 in 4 responses contains at least one hallucinated claim.",
runbook_url="https://wiki.internal/runbooks/llm-hallucination"
),
QualityAlertRule(
rule_id="relevance_degraded",
metric="answer_relevance",
threshold=0.60,
comparison=Comparison.LT,
window=timedelta(hours=1),
min_samples=30,
sustain_duration=timedelta(minutes=30),
severity="P2",
title="Answer relevance degraded (< 0.60)",
description="Responses are not addressing user questions. Possible prompt or retrieval issue.",
runbook_url="https://wiki.internal/runbooks/llm-relevance"
),
QualityAlertRule(
rule_id="user_satisfaction_drop",
metric="user_satisfaction_rate",
threshold=0.50,
comparison=Comparison.LT,
window=timedelta(hours=4),
min_samples=50,
sustain_duration=timedelta(hours=1),
severity="P2",
title="User satisfaction (thumbs-up rate) below 50%",
description="Explicit user feedback indicates quality below acceptable threshold.",
runbook_url="https://wiki.internal/runbooks/llm-user-satisfaction"
),
]
class ThresholdAlertEngine:
"""
Evaluates threshold-based alert rules on streaming quality metrics.
Supports per-feature rules and global rules.
"""
def __init__(
self,
rules: list[QualityAlertRule],
notifier,
deployment_correlator=None
):
self.rules = rules
self.notifier = notifier
self.deployment_correlator = deployment_correlator
# Per-metric, per-feature ring buffers of (timestamp, value)
self._windows: dict[str, deque] = {}
# Track when a violation was first detected, per rule
self._firing: dict[str, AlertFiring] = {}
# Track fired alerts to prevent re-firing until resolved
self._alerted: set[str] = set()
def record(
self,
metric: str,
value: float,
feature: str = "default",
timestamp: Optional[datetime] = None
) -> None:
"""Record a new quality metric observation."""
ts = timestamp or datetime.now(timezone.utc)
key = f"{metric}::{feature}"
if key not in self._windows:
self._windows[key] = deque()
self._windows[key].append((ts, value))
# Also record in the global (all-features) window
global_key = f"{metric}::*"
if global_key not in self._windows:
self._windows[global_key] = deque()
self._windows[global_key].append((ts, value))
async def evaluate(self) -> list[dict]:
"""
Evaluate all alert rules. Call this on a fixed schedule (every 30-60 seconds).
Returns list of newly fired alerts.
"""
now = datetime.now(timezone.utc)
fired_alerts = []
for rule in self.rules:
feature_scope = rule.feature or "*"
key = f"{rule.metric}::{feature_scope}"
window_data = self._windows.get(key, deque())
# Filter to the rule's time window
cutoff = now - rule.window
recent = [(ts, v) for ts, v in window_data if ts > cutoff]
# Enforce minimum sample count
if len(recent) < rule.min_samples:
continue
values = [v for _, v in recent]
mean_value = statistics.mean(values)
# Check threshold violation
violated = (
(rule.comparison == Comparison.LT and mean_value < rule.threshold) or
(rule.comparison == Comparison.GT and mean_value > rule.threshold)
)
if violated:
if rule.rule_id not in self._firing:
# First violation - start the sustain timer
self._firing[rule.rule_id] = AlertFiring(
rule_id=rule.rule_id,
first_violated_at=now,
last_value=mean_value,
sample_count=len(recent)
)
else:
# Update the firing record
self._firing[rule.rule_id].last_value = mean_value
self._firing[rule.rule_id].sample_count = len(recent)
firing = self._firing[rule.rule_id]
# Check if it has been sustained long enough
sustained = (now - firing.first_violated_at) >= rule.sustain_duration
if sustained and rule.rule_id not in self._alerted:
# Get deployment correlation context
deployment_context = None
if self.deployment_correlator:
deployment_context = await self.deployment_correlator.check(
metric=rule.metric,
degradation_start=firing.first_violated_at
)
# Fire the alert
alert = await self.notifier.send_alert(
rule=rule,
current_value=mean_value,
sample_count=len(recent),
sustained_since=firing.first_violated_at,
deployment_context=deployment_context
)
self._alerted.add(rule.rule_id)
fired_alerts.append(alert)
else:
# Condition resolved - clear the firing state
if rule.rule_id in self._firing:
del self._firing[rule.rule_id]
if rule.rule_id in self._alerted:
self._alerted.discard(rule.rule_id)
# Send resolution notification
await self.notifier.send_resolution(rule, mean_value)
return fired_alerts
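The sustain timer is the part of `evaluate` that is easiest to get wrong, so it helps to look at it in isolation. The sketch below is a simplified stand-in, not the engine itself: `first_fire_time` is a hypothetical helper that takes a list of (evaluation time, violated?) pairs and returns the moment an alert would fire. The date is arbitrary.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def first_fire_time(
    evaluations: list[tuple[datetime, bool]],
    sustain: timedelta,
) -> Optional[datetime]:
    """Return the evaluation time at which a sustained violation would alert."""
    violated_since: Optional[datetime] = None
    for ts, violated in evaluations:
        if not violated:
            violated_since = None  # condition cleared: reset the timer
            continue
        if violated_since is None:
            violated_since = ts  # first violating evaluation starts the timer
        if ts - violated_since >= sustain:
            return ts
    return None


start = datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc)  # arbitrary date
ticks = [start + timedelta(minutes=m) for m in range(0, 30, 5)]
# A violation at 10:05 clears at 10:10, then persists from 10:15 onward.
flags = [False, True, False, True, True, True]
fired = first_fire_time(list(zip(ticks, flags)), sustain=timedelta(minutes=10))
print(fired)  # fires at 10:25, ten minutes after the violation that persisted
```

The transient violation at 10:05 never alerts because the timer resets as soon as the condition clears. That is the trade-off of a sustain window: it filters out noise at the cost of a fixed delay before every real alert.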
:::tip The min_samples Guard Prevents the Most Common False Positive
If your evaluation pipeline has a hiccup and produces 2 samples in a 1-hour window, those 2 samples might produce a mean faithfulness of 0.2 - triggering a P0 alert at 3 AM. Always require min_samples >= 20 for P0/P1 alerts. The min_samples guard is not optional.
:::
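The arithmetic behind that warning is easy to reproduce. This is a minimal sketch using a hypothetical `guarded_mean` helper and made-up sample values:

```python
import statistics
from typing import Optional


def guarded_mean(values: list[float], min_samples: int) -> Optional[float]:
    """Return the window mean, or None when there are too few samples to trust it."""
    if len(values) < min_samples:
        return None
    return statistics.mean(values)


# Two samples from a stalled evaluation pipeline (illustrative values).
sparse_window = [0.15, 0.25]

unguarded = statistics.mean(sparse_window)  # roughly 0.2, below the 0.40 P0 floor
guarded = guarded_mean(sparse_window, min_samples=20)

print(unguarded < 0.40)  # True: without the guard this would page
print(guarded)           # None: no verdict, so no page
```

Returning `None` rather than a default score keeps "not enough data" distinct from "quality is fine", which matters if a separate alert watches for the evaluation pipeline itself going quiet.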
Statistical Process Control (SPC)
Fixed thresholds have a fundamental flaw: they don't adapt to your system's baseline. If your faithfulness baseline is 0.88, an alert threshold of 0.70 means you need an 18-point drop before being notified. A drop from 0.88 to 0.77 - 11 points, a meaningful regression - would never trigger an alert.
Statistical Process Control solves this by treating your historical metric distribution as the definition of "normal" and alerting on statistically unusual deviations, regardless of the absolute value:
# alerting/spc_engine.py
from dataclasses import dataclass, field
from collections import deque
import statistics
from typing import Optional
@dataclass
class ControlLimits:
mean: float
std: float
ucl_1sigma: float # mean + 1σ
ucl_2sigma: float # mean + 2σ
ucl_3sigma: float # mean + 3σ (Upper Control Limit)
lcl_1sigma: float # mean - 1σ
lcl_2sigma: float # mean - 2σ
lcl_3sigma: float # mean - 3σ (Lower Control Limit)
@dataclass
class SPCViolation:
rule_name: str
severity: str
description: str
values_involved: list[float]
class SPCAlertEngine:
"""
Statistical Process Control for LLM quality metrics.
Uses the Western Electric rules to detect out-of-control conditions.
These rules detect unusual PATTERNS, not just individual outliers -
making them far more sensitive to gradual drift than threshold alerts.
For AI quality metrics, we care about the LOWER tail (quality going down),
so we apply rules specifically to detect downward deviations.
"""
MIN_TRAINING_SAMPLES = 50 # need enough history to establish reliable limits
TRAINING_WINDOW = 1000 # use last N observations for control limits
    def __init__(self):
        self.history: deque = deque(maxlen=self.TRAINING_WINDOW)
        self.limits: Optional[ControlLimits] = None
        self._recent_values: list[float] = []  # last 8 values for rule evaluation
        self._since_recompute = 0  # observations since limits were last computed
    def add_observation(self, value: float) -> list[SPCViolation]:
        """
        Add a new observation and check for SPC violations.
        Returns any violations detected in the current observation context.
        """
        self.history.append(value)
        self._since_recompute += 1
        # Maintain a recent values buffer for pattern detection
        self._recent_values.append(value)
        if len(self._recent_values) > 8:
            self._recent_values.pop(0)
        # Recompute control limits every 50 observations. A counter is used
        # because len(self.history) stays pinned at TRAINING_WINDOW once the
        # deque is full, which would make a modulo check true on every call.
        if len(self.history) >= self.MIN_TRAINING_SAMPLES:
            if self._since_recompute >= 50 or self.limits is None:
                self._compute_control_limits()
                self._since_recompute = 0
if self.limits is None:
return [] # Not enough data yet
return self._check_western_electric_rules(value)
def _compute_control_limits(self) -> None:
"""Compute control limits from historical data."""
values = list(self.history)
mean = statistics.mean(values)
std = statistics.stdev(values)
self.limits = ControlLimits(
mean=mean, std=std,
ucl_1sigma=mean + std,
ucl_2sigma=mean + 2 * std,
ucl_3sigma=mean + 3 * std,
lcl_1sigma=mean - std,
lcl_2sigma=mean - 2 * std,
lcl_3sigma=mean - 3 * std
)
def _check_western_electric_rules(self, latest_value: float) -> list[SPCViolation]:
"""
Apply Western Electric rules to detect out-of-control conditions.
These rules are designed to detect patterns that indicate the process
has shifted from its historical norm, with sensitivity calibrated to
minimize false positives while catching real regressions quickly.
"""
violations = []
L = self.limits
recent = self._recent_values
# --- RULE 1: Single point beyond 3σ ---
# The most extreme case: an outlier so far from normal it's almost certainly real.
if latest_value < L.lcl_3sigma:
violations.append(SPCViolation(
rule_name="Rule 1: Point beyond 3σ",
severity="P1",
description=f"Latest value {latest_value:.3f} is beyond LCL "
f"({L.lcl_3sigma:.3f}). Extreme outlier.",
values_involved=[latest_value]
))
# --- RULE 2: 2 of 3 consecutive points beyond 2σ on same side ---
# Early warning: the process is trending toward a control limit.
if len(recent) >= 3:
last_3 = recent[-3:]
count_below_2sigma = sum(1 for v in last_3 if v < L.lcl_2sigma)
if count_below_2sigma >= 2:
violations.append(SPCViolation(
rule_name="Rule 2: 2 of 3 points below 2σ",
severity="P2",
description=f"2 of last 3 observations below 2σ limit "
f"({L.lcl_2sigma:.3f}). Quality trending down.",
values_involved=last_3
))
# --- RULE 3: 4 of 5 consecutive points beyond 1σ on same side ---
# Detects a shift in the process mean - sustained mild degradation.
if len(recent) >= 5:
last_5 = recent[-5:]
count_below_1sigma = sum(1 for v in last_5 if v < L.lcl_1sigma)
if count_below_1sigma >= 4:
violations.append(SPCViolation(
rule_name="Rule 3: 4 of 5 points below 1σ",
severity="P2",
description=f"4 of last 5 observations below 1σ limit "
f"({L.lcl_1sigma:.3f}). Probable mean shift.",
values_involved=last_5
))
# --- RULE 4: 8 consecutive points on same side of mean ---
# Sustained degradation: the mean has permanently shifted. Very high confidence.
if len(recent) >= 8:
last_8 = recent[-8:]
all_below_mean = all(v < L.mean for v in last_8)
if all_below_mean:
violations.append(SPCViolation(
rule_name="Rule 4: 8 consecutive points below mean",
severity="P1",
description=f"8 consecutive observations below historical mean "
f"({L.mean:.3f}). Confirmed sustained degradation.",
values_involved=last_8
))
return violations
def get_control_chart_data(self) -> dict:
"""Returns current control limits for dashboard visualization."""
if not self.limits:
return {}
L = self.limits
return {
"mean": round(L.mean, 4),
"ucl": round(L.ucl_3sigma, 4),
"lcl": round(L.lcl_3sigma, 4),
"warning_ucl": round(L.ucl_2sigma, 4),
"warning_lcl": round(L.lcl_2sigma, 4),
"sample_size": len(self.history)
}
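To make the contrast with fixed thresholds concrete, here is a small worked example of the 0.88 to 0.77 drop discussed at the start of this section. The baseline values are hypothetical, chosen to hover around 0.88 with little spread. A noisier baseline would widen the control limits and could leave 0.77 inside them.

```python
import statistics

# Hypothetical baseline: rolling faithfulness scores hovering around 0.88.
baseline = [0.88, 0.89, 0.87, 0.88, 0.90, 0.86, 0.88, 0.89, 0.87, 0.88]

mean = statistics.mean(baseline)
std = statistics.stdev(baseline)   # sample standard deviation
lcl_3sigma = mean - 3 * std        # lower control limit, as in Rule 1

observation = 0.77
FIXED_THRESHOLD = 0.70             # the illustrative fixed threshold from the text

fixed_alert = observation < FIXED_THRESHOLD
spc_alert = observation < lcl_3sigma

print(f"mean={mean:.3f} std={std:.3f} LCL={lcl_3sigma:.3f}")
print(f"fixed threshold fires: {fixed_alert}, SPC Rule 1 fires: {spc_alert}")
```

With this baseline the lower control limit sits at about 0.845, so the drop to 0.77 violates Rule 1 on the first observation while the fixed threshold stays silent.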
Comparing Alert Strategies
| Strategy | Sensitivity | False Positive Risk | Setup Effort | Best For |
|---|---|---|---|---|
| Fixed threshold | Low (only catches large drops) | Low if baseline is stable | Low | Safety floors (toxicity, P0 thresholds) |
| SPC (Western Electric) | High (catches trends and small shifts) | Medium (requires stable baseline) | Medium | Ongoing quality monitoring |
| ML anomaly detection | Very high (catches complex patterns) | Medium-High (needs tuning) | High | High-volume systems with seasonal patterns |
| Deployment correlation | N/A (not an alert type) | N/A | Medium | Root cause identification |
ML-Based Anomaly Detection
For production systems with complex temporal patterns (traffic spikes at certain hours, day-of-week variation, seasonal effects), ML-based anomaly detection captures multi-dimensional anomalies that simple threshold and SPC approaches miss:
# alerting/anomaly_detector.py
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Optional
@dataclass
class AnomalyResult:
is_anomalous: bool
anomaly_score: float # more negative = more anomalous (IsolationForest convention)
anomaly_percentile: float # percentile of anomaly_score in training distribution
contributing_features: list[str]
explanation: str
class QualityAnomalyDetector:
"""
ML-based anomaly detection using Isolation Forest.
Features: time-of-day, day-of-week, traffic volume, plus quality metrics.
The model learns what "normal quality at this time of day with this traffic
level" looks like, and flags deviations from that baseline.
    This separates two cases a fixed threshold conflates: quality that dips
    below the absolute threshold but is normal for that time period, and quality
    that clears the absolute threshold but is anomalously low for its context.
"""
def __init__(self, contamination: float = 0.05):
"""
contamination: expected fraction of anomalies in the training data.
0.05 = expect about 5% of historical points to be anomalies.
"""
self.model = IsolationForest(
contamination=contamination,
n_estimators=200,
max_samples="auto",
random_state=42
)
self.scaler = StandardScaler()
self.trained = False
self._training_scores: Optional[np.ndarray] = None
self._feature_names = [
"hour_sin", "hour_cos", # time-of-day (cyclical encoding)
"weekday_sin", "weekday_cos", # day-of-week (cyclical encoding)
"log_request_count", # traffic volume (log scale)
"faithfulness",
"answer_relevance",
"context_utilization",
"hallucination_rate",
"p99_latency_ms"
]
def train(self, historical_windows: list[dict]) -> None:
"""
Train on historical quality metric windows.
Each window: {
"timestamp": datetime,
"request_count": int,
"faithfulness": float,
"answer_relevance": float,
"context_utilization": float,
"hallucination_rate": float,
"p99_latency_ms": float
}
"""
        if len(historical_windows) < 200:
            print("Warning: < 200 training windows. Skipping training; detector stays untrained.")
            return
features = self._extract_features(historical_windows)
features_scaled = self.scaler.fit_transform(features)
self.model.fit(features_scaled)
# Store training scores to compute percentiles for new observations
self._training_scores = self.model.score_samples(features_scaled)
self.trained = True
print(f"Trained anomaly detector on {len(historical_windows)} windows")
print(f"Training score range: [{self._training_scores.min():.3f}, "
f"{self._training_scores.max():.3f}]")
def detect(self, current_window: dict) -> AnomalyResult:
"""Detect if the current metrics window is anomalous."""
if not self.trained:
return AnomalyResult(
is_anomalous=False,
anomaly_score=0.0,
anomaly_percentile=0.5,
contributing_features=[],
explanation="Model not yet trained - insufficient historical data."
)
features = self._extract_features([current_window])
features_scaled = self.scaler.transform(features)
score = float(self.model.score_samples(features_scaled)[0])
prediction = int(self.model.predict(features_scaled)[0]) # -1 = anomaly
is_anomalous = prediction == -1
        # Fraction of training windows scoring at least as normal as this one
        # (values near 1.0 mean "more anomalous than almost all training data")
if self._training_scores is not None:
percentile = float(np.mean(self._training_scores >= score))
else:
percentile = 0.5
# Identify which features contribute most to the anomaly
contributing = self._identify_contributing_features(
current_window, features_scaled
)
explanation = self._generate_explanation(
is_anomalous, score, percentile, contributing, current_window
)
return AnomalyResult(
is_anomalous=is_anomalous,
anomaly_score=round(score, 4),
anomaly_percentile=round(percentile, 3),
contributing_features=contributing,
explanation=explanation
)
def _extract_features(self, windows: list[dict]) -> np.ndarray:
rows = []
for w in windows:
ts = w.get("timestamp", datetime.now(timezone.utc))
hour = ts.hour
weekday = ts.weekday()
# Cyclical encoding to avoid artificial discontinuities at midnight/Sunday
hour_sin = np.sin(2 * np.pi * hour / 24)
hour_cos = np.cos(2 * np.pi * hour / 24)
weekday_sin = np.sin(2 * np.pi * weekday / 7)
weekday_cos = np.cos(2 * np.pi * weekday / 7)
request_count = max(1, w.get("request_count", 1))
log_requests = np.log10(request_count)
rows.append([
hour_sin, hour_cos,
weekday_sin, weekday_cos,
log_requests,
w.get("faithfulness", 0.8),
w.get("answer_relevance", 0.75),
w.get("context_utilization", 0.7),
w.get("hallucination_rate", 0.1),
w.get("p99_latency_ms", 2000) / 10000.0, # normalize
])
return np.array(rows)
def _identify_contributing_features(
self,
window: dict,
features_scaled: np.ndarray
) -> list[str]:
"""
Identify which features deviate most from their expected range.
A simplified approximation - production systems use SHAP values.
"""
feature_values = features_scaled[0]
# Flag features > 2 standard deviations from zero (scaled mean)
contributing = [
self._feature_names[i]
for i, v in enumerate(feature_values)
if abs(v) > 2.0 and i >= 4 # skip time features
]
return contributing
def _generate_explanation(
self,
is_anomalous: bool,
score: float,
percentile: float,
contributing: list[str],
window: dict
) -> str:
if not is_anomalous:
return "Metrics within normal range for this time period and traffic level."
feature_desc = ", ".join(contributing) if contributing else "multiple metrics"
        return (
            f"Anomaly detected at {percentile*100:.0f}th percentile of training distribution "
            f"(score={score:.3f}). Contributing features: {feature_desc}. "
            # Default to NaN, not 'N/A': a string default would raise ValueError under :.3f
            f"Faithfulness={window.get('faithfulness', float('nan')):.3f}, "
            f"Hallucination rate={window.get('hallucination_rate', float('nan')):.3f}."
        )
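`_extract_features` encodes hour and weekday as sine/cosine pairs instead of raw integers. The sketch below shows why, using a hypothetical `encode_hour` helper that mirrors the hour encoding: on the raw scale 23:00 and 00:00 look 23 units apart, but on the unit circle they are neighbours.

```python
import math


def encode_hour(hour: int) -> tuple[float, float]:
    """Map an hour (0-23) onto the unit circle as (sin, cos)."""
    angle = 2 * math.pi * hour / 24
    return (math.sin(angle), math.cos(angle))


late, midnight, noon = encode_hour(23), encode_hour(0), encode_hour(12)

# Raw integer distance versus distance in the encoded space.
print(abs(23 - 0), round(math.dist(late, midnight), 3))  # far apart raw, close when encoded
print(abs(12 - 0), round(math.dist(noon, midnight), 3))  # noon is the farthest point from midnight
```

Distance-sensitive preprocessing and tree splits both benefit from this: a model fed raw hours would treat late-night and early-morning traffic as opposite extremes.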
Notification Routing
# alerting/notifier.py
import httpx
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional
@dataclass
class AlertContext:
rule_id: str
severity: str
title: str
description: str
metric: str
current_value: float
threshold: float
sample_count: int
sustained_since: datetime
runbook_url: str
deployment_context: Optional[dict] = None
class AlertNotifier:
"""
Routes quality alerts to appropriate channels based on severity.
P0/P1 -> PagerDuty (wake someone up)
P2/P3 -> Slack (team awareness, no wakeup)
All levels -> Prometheus metric update
"""
def __init__(
self,
pagerduty_routing_key: Optional[str] = None,
slack_webhook_url: Optional[str] = None,
prometheus_push_gateway: Optional[str] = None
):
self.pagerduty_key = pagerduty_routing_key
self.slack_webhook = slack_webhook_url
self.prometheus_gateway = prometheus_push_gateway
async def send_alert(
self,
rule,
current_value: float,
sample_count: int,
sustained_since: datetime,
deployment_context: Optional[dict] = None
) -> dict:
"""Send alert to appropriate channels based on severity."""
context = AlertContext(
rule_id=rule.rule_id,
severity=rule.severity,
title=rule.title,
description=rule.description,
metric=rule.metric,
current_value=current_value,
threshold=rule.threshold,
sample_count=sample_count,
sustained_since=sustained_since,
runbook_url=rule.runbook_url,
deployment_context=deployment_context
)
if rule.severity in ("P0", "P1"):
await self._page_pagerduty(context)
if rule.severity in ("P0", "P1", "P2", "P3"):
await self._post_slack(context)
return {
"rule_id": rule.rule_id,
"severity": rule.severity,
"fired_at": datetime.now(timezone.utc).isoformat(),
"current_value": current_value
}
async def send_resolution(self, rule, resolved_value: float) -> None:
"""Notify that an alert has resolved."""
if self.slack_webhook:
payload = {
"attachments": [{
"color": "#16a34a",
"title": f"RESOLVED: {rule.title}",
"text": (
f"Alert `{rule.rule_id}` has resolved.\n"
f"Current value: `{resolved_value:.4f}` "
f"(threshold: `{rule.threshold}`)"
),
"footer": "AI Quality Monitoring",
"ts": int(datetime.now(timezone.utc).timestamp())
}]
}
async with httpx.AsyncClient() as client:
await client.post(self.slack_webhook, json=payload, timeout=5.0)
async def _page_pagerduty(self, ctx: AlertContext) -> None:
if not self.pagerduty_key:
return
# Build deployment context summary for the alert
deploy_note = ""
if ctx.deployment_context and ctx.deployment_context.get("correlated_deployment"):
d = ctx.deployment_context
deploy_note = (
f"\n\nCorrelated deployment: `{d['correlated_deployment']}` "
f"deployed at {d.get('deployed_at', 'unknown')} "
f"by {d.get('deployed_by', 'unknown')}. "
f"Metric delta: {d.get('metric_delta', 0):+.3f}"
)
payload = {
"routing_key": self.pagerduty_key,
"event_action": "trigger",
"dedup_key": ctx.rule_id, # prevent duplicate incidents for same alert
"payload": {
"summary": f"[AI Quality {ctx.severity}] {ctx.title}",
"severity": "critical" if ctx.severity == "P0" else "error",
"source": "ai-quality-monitoring",
"timestamp": datetime.now(timezone.utc).isoformat(),
"custom_details": {
"metric": ctx.metric,
"current_value": round(ctx.current_value, 4),
"threshold": ctx.threshold,
"sample_count": ctx.sample_count,
"sustained_since": ctx.sustained_since.isoformat(),
"description": ctx.description + deploy_note,
"runbook": ctx.runbook_url
}
},
"links": [{"href": ctx.runbook_url, "text": "Runbook"}]
}
async with httpx.AsyncClient() as client:
response = await client.post(
"https://events.pagerduty.com/v2/enqueue",
json=payload,
timeout=5.0
)
if response.status_code != 202:
print(f"PagerDuty error: {response.status_code} {response.text}")
async def _post_slack(self, ctx: AlertContext) -> None:
if not self.slack_webhook:
return
severity_colors = {
"P0": "#dc2626", # red
"P1": "#ea580c", # orange
"P2": "#d97706", # amber
"P3": "#ca8a04", # yellow
}
color = severity_colors.get(ctx.severity, "#6b7280")
# Build deployment hint field
fields = [
{"title": "Metric", "value": ctx.metric, "short": True},
{"title": "Severity", "value": ctx.severity, "short": True},
{"title": "Current Value", "value": f"`{ctx.current_value:.4f}`", "short": True},
{"title": "Threshold", "value": f"`{ctx.threshold}`", "short": True},
{"title": "Sample Count", "value": str(ctx.sample_count), "short": True},
{"title": "Sustained Since",
"value": ctx.sustained_since.strftime("%H:%M UTC"), "short": True},
]
if ctx.deployment_context and ctx.deployment_context.get("correlated_deployment"):
d = ctx.deployment_context
fields.append({
"title": "Correlated Deployment",
"value": (
f"`{d['correlated_deployment'][:12]}` by {d.get('deployed_by', '?')} "
f"at {d.get('deployed_at', '?')}"
),
"short": False
})
payload = {
"attachments": [{
"color": color,
"title": f"[{ctx.severity}] {ctx.title}",
"text": ctx.description,
"fields": fields,
"footer": f"AI Quality Monitoring | <{ctx.runbook_url}|Runbook>",
"ts": int(datetime.now(timezone.utc).timestamp())
}]
}
async with httpx.AsyncClient() as client:
await client.post(self.slack_webhook, json=payload, timeout=5.0)
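Note that `send_resolution` only posts to Slack, so an incident opened by `_page_pagerduty` stays open until someone closes it by hand. One way to close the loop is sketched below. It assumes PagerDuty Events API v2 semantics, where a `resolve` event carrying the same `dedup_key` as the trigger closes the matching incident. `build_resolve_event` is a hypothetical helper, not part of the notifier above, and the routing key is a placeholder.

```python
def build_resolve_event(routing_key: str, rule_id: str) -> dict:
    """Build an Events API v2 body that resolves the incident opened for rule_id."""
    return {
        "routing_key": routing_key,
        "event_action": "resolve",
        # Must match the dedup_key sent with the original trigger event,
        # which the notifier sets to the rule_id.
        "dedup_key": rule_id,
    }


event = build_resolve_event("YOUR_ROUTING_KEY", "faithfulness_critical")
print(event["event_action"], event["dedup_key"])
```

The resulting body would be posted to the same enqueue endpoint used in `_page_pagerduty`. Because the trigger uses `rule_id` as its `dedup_key`, no extra state is needed to find the incident to resolve.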
Prometheus and Grafana Integration
Export quality metrics to Prometheus for visualization and additional alerting via Grafana:
# alerting/prometheus_exporter.py
from prometheus_client import Gauge, Histogram, Counter, start_http_server
from datetime import datetime
# Quality metric gauges (labeled by feature and model version)
faithfulness_gauge = Gauge(
"llm_quality_faithfulness",
"Rolling mean faithfulness score over the last hour",
labelnames=["feature", "model_version"]
)
hallucination_rate_gauge = Gauge(
"llm_quality_hallucination_rate",
"Fraction of responses with at least one hallucinated claim",
labelnames=["feature"]
)
answer_relevance_gauge = Gauge(
"llm_quality_answer_relevance",
"Rolling mean answer relevance score",
labelnames=["feature", "model_version"]
)
toxicity_rate_gauge = Gauge(
"llm_quality_toxicity_rate",
"Fraction of responses with toxicity score above threshold",
labelnames=["feature"]
)
context_utilization_gauge = Gauge(
"llm_quality_context_utilization",
"Rolling mean context utilization score",
labelnames=["feature"]
)
user_satisfaction_gauge = Gauge(
"llm_quality_user_satisfaction",
"Fraction of rated responses with thumbs-up (rolling 4h)",
labelnames=["feature"]
)
eval_latency_histogram = Histogram(
"llm_eval_duration_seconds",
"Time to run a quality evaluation",
labelnames=["eval_type"],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)
alerts_fired_counter = Counter(
"llm_quality_alerts_total",
"Total number of quality alerts fired",
labelnames=["severity", "metric"]
)
def export_quality_metrics(
metrics: dict,
feature: str,
model_version: str
) -> None:
"""
Update Prometheus gauges with the latest quality metrics.
Call this after each evaluation window completes.
metrics: {
"faithfulness_mean": float,
"hallucination_rate": float,
"answer_relevance_mean": float,
"toxicity_rate": float,
"context_utilization_mean": float,
"user_satisfaction_rate": float
}
"""
if "faithfulness_mean" in metrics:
faithfulness_gauge.labels(
feature=feature, model_version=model_version
).set(metrics["faithfulness_mean"])
if "hallucination_rate" in metrics:
hallucination_rate_gauge.labels(feature=feature).set(
metrics["hallucination_rate"]
)
if "answer_relevance_mean" in metrics:
answer_relevance_gauge.labels(
feature=feature, model_version=model_version
).set(metrics["answer_relevance_mean"])
if "toxicity_rate" in metrics:
toxicity_rate_gauge.labels(feature=feature).set(
metrics["toxicity_rate"]
)
if "context_utilization_mean" in metrics:
context_utilization_gauge.labels(feature=feature).set(
metrics["context_utilization_mean"]
)
if "user_satisfaction_rate" in metrics:
user_satisfaction_gauge.labels(feature=feature).set(
metrics["user_satisfaction_rate"]
)
# grafana/alerting/llm_quality_alerts.yaml
# Alerting rules in Prometheus rule-file format, supplementary to the Python alert engine.
# They evaluate the exported Prometheus metrics and can back Grafana dashboard alerts.
groups:
  - name: llm_quality_critical
    interval: 1m
    rules:
      - alert: LLMToxicityCritical
        expr: llm_quality_toxicity_rate > 0.01
        for: 5m
        labels:
          severity: critical
          team: ai-platform
        annotations:
          summary: "CRITICAL: LLM toxicity rate above 1%"
          description: >
            Feature {{ $labels.feature }} toxicity rate is
            {{ $value | humanizePercentage }} (threshold: 1%).
            Users are receiving potentially harmful content.
          runbook_url: "https://wiki.internal/runbooks/llm-toxicity"
          dashboard_url: "https://grafana.internal/d/llm-quality"
      - alert: LLMFaithfulnessCritical
        expr: llm_quality_faithfulness < 0.40
        for: 10m
        labels:
          severity: critical
          team: ai-platform
        annotations:
          summary: "CRITICAL: LLM faithfulness collapsed"
          description: >
            Feature {{ $labels.feature }} faithfulness is {{ $value | humanize }}
            (threshold: 0.40). Widespread hallucinations likely.
          runbook_url: "https://wiki.internal/runbooks/llm-faithfulness"
  - name: llm_quality_warning
    interval: 2m
    rules:
      - alert: LLMFaithfulnessDegraded
        expr: llm_quality_faithfulness < 0.70
        for: 15m
        labels:
          severity: warning
          team: ai-platform
        annotations:
          summary: "LLM faithfulness below warning threshold"
          description: >
            Feature {{ $labels.feature }} (model {{ $labels.model_version }})
            faithfulness is {{ $value | humanize }}.
            Check recent deployments and RAG index state.
          runbook_url: "https://wiki.internal/runbooks/llm-faithfulness"
      - alert: LLMAnswerRelevanceDegraded
        expr: llm_quality_answer_relevance < 0.60
        for: 30m
        labels:
          severity: warning
          team: ai-platform
        annotations:
          summary: "LLM answer relevance degraded"
          description: >
            Answer relevance for {{ $labels.feature }} is {{ $value | humanize }}.
            Responses may not be addressing user questions.
      - alert: LLMUserSatisfactionDrop
        expr: llm_quality_user_satisfaction < 0.50
        for: 1h
        labels:
          severity: warning
          team: ai-platform
        annotations:
          summary: "LLM user satisfaction rate below 50%"
          description: >
            Thumbs-up rate for {{ $labels.feature }} is
            {{ $value | humanizePercentage }} and has stayed below 50%
            for at least 1 hour.
Deployment Correlation
The most actionable piece of information in an alert message is "this started right after deployment X." Automatically correlating quality degradation with deployments gives the on-call engineer the most likely root cause before they even open the runbook:
# alerting/deployment_correlator.py
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

import asyncpg


@dataclass
class DeploymentRecord:
    deployment_id: str
    deployed_at: datetime
    deployed_by: str
    components: list[str]  # e.g., ["retrieval-service", "prompt-templates"]
    commit_hash: str
    commit_message: str
    environment: str


@dataclass
class CorrelationResult:
    correlated_deployment: Optional[str]
    deployed_at: Optional[datetime]
    deployed_by: Optional[str]
    components: list[str]
    metric_delta: float  # post-deploy mean minus pre-deploy mean (negative = degradation)
    confidence: str  # "strong", "moderate", "weak", "none"
    explanation: str


class DeploymentCorrelator:
    """
    Automatically checks if quality degradation correlates with a recent deployment.
    Adds a deployment context hint to every P0/P1 alert message.
    """

    def __init__(self, db: asyncpg.Connection):
        self.db = db

    async def check(
        self,
        metric: str,
        degradation_start: datetime,
        look_back_hours: int = 3
    ) -> CorrelationResult:
        """
        Find the deployment most likely correlated with quality degradation.
        Strategy:
        1. Find deployments from (degradation_start - look_back_hours) up to
           30 minutes after degradation_start
        2. For each deployment, compare metric mean before vs. after
        3. Return the deployment with the largest negative delta
        """
        window_start = degradation_start - timedelta(hours=look_back_hours)
        deployments = await self.db.fetch("""
            SELECT deployment_id, deployed_at, deployed_by,
                   components, commit_hash, commit_message, environment
            FROM deployments
            WHERE deployed_at BETWEEN $1 AND $2
              AND environment = 'production'
            ORDER BY deployed_at DESC
        """, window_start, degradation_start + timedelta(minutes=30))
        if not deployments:
            return self._no_correlation(
                "No deployments found in the correlation window."
            )

        best_correlation = None
        best_delta = 0.0
        for deploy in deployments:
            deployed_at = deploy["deployed_at"]
            # Get metric average in 1 hour before this deployment
            pre_avg = await self._get_metric_average(
                metric=metric,
                start=deployed_at - timedelta(hours=1),
                end=deployed_at
            )
            # Get metric average in 1 hour after this deployment
            post_avg = await self._get_metric_average(
                metric=metric,
                start=deployed_at,
                end=deployed_at + timedelta(hours=1)
            )
            # Skip deployments without enough samples on either side
            if pre_avg is None or post_avg is None:
                continue
            delta = post_avg - pre_avg  # negative = degradation
            # We want the deployment with the largest negative delta
            if delta < best_delta:
                best_delta = delta
                best_correlation = {
                    "deployment": deploy,
                    "delta": delta,
                    "pre_avg": pre_avg,
                    "post_avg": post_avg
                }

        if best_correlation is None:
            return self._no_correlation(
                "No significant metric change at any deployment boundary."
            )

        deploy = best_correlation["deployment"]
        delta = best_correlation["delta"]
        # Classify confidence by the size of the drop
        if delta < -0.15:
            confidence = "strong"
        elif delta < -0.08:
            confidence = "moderate"
        else:
            confidence = "weak"
        # Correlation is a hint, not proof, so the message avoids claiming causation
        explanation = (
            f"Deployment {deploy['deployment_id'][:8]} at "
            f"{deploy['deployed_at'].strftime('%H:%M UTC')} by "
            f"{deploy['deployed_by']} coincided with {metric} changing "
            f"from {best_correlation['pre_avg']:.3f} to "
            f"{best_correlation['post_avg']:.3f} ({delta:+.3f}). "
            f"Components: {', '.join(deploy['components'] or [])}."
        )
        return CorrelationResult(
            correlated_deployment=deploy["deployment_id"],
            deployed_at=deploy["deployed_at"],
            deployed_by=deploy["deployed_by"],
            components=deploy["components"] or [],
            metric_delta=round(delta, 4),
            confidence=confidence,
            explanation=explanation
        )

    @staticmethod
    def _no_correlation(explanation: str) -> CorrelationResult:
        return CorrelationResult(
            correlated_deployment=None,
            deployed_at=None,
            deployed_by=None,
            components=[],
            metric_delta=0.0,
            confidence="none",
            explanation=explanation
        )

    async def _get_metric_average(
        self,
        metric: str,
        start: datetime,
        end: datetime
    ) -> Optional[float]:
        # Half-open interval [start, end) so a sample recorded exactly at the
        # deployment timestamp is not counted in both the pre and post windows
        row = await self.db.fetchrow("""
            SELECT AVG(value) as avg_value, COUNT(*) as count
            FROM quality_metrics
            WHERE metric_name = $1
              AND recorded_at >= $2
              AND recorded_at < $3
        """, metric, start, end)
        # Require at least 5 samples so a near-empty window cannot drive the delta
        if row and row["count"] >= 5:
            return float(row["avg_value"])
        return None
Incident Runbook Template
A runbook is the structured guide an on-call engineer follows when an alert fires. It must be written before the incident, linked directly from the alert message, and kept current with system changes:
## Runbook: LLM Faithfulness Degradation
**Alert**: `LLMFaithfulnessDegraded` - Mean faithfulness < 0.70 for 15 minutes
**Severity**: P1 - Page on-call, 15-minute response SLA
**Last updated**: 2026-01-15 by @platform-team
---
### Step 1: Assess Actual Severity (3 minutes)
1. Open the [LLM Quality Dashboard](https://grafana.internal/d/llm-quality)
2. Check current faithfulness (last 30 min rolling, by feature)
3. Is this affecting all features or just one?
- All features → systemic issue (model, shared infrastructure)
- One feature → feature-specific change (prompt, retrieval config)
4. Check traffic volume - low-traffic periods amplify metric noise
5. **If faithfulness < 0.50: immediately escalate to P0**
---
### Step 2: Check Recent Deployments (2 minutes)
The alert message includes a "Correlated Deployment" field if auto-correlation found one.
1. If a correlated deployment is listed: **that is your primary suspect**
2. Check deployment log: https://deploys.internal/prod
3. Look specifically for changes to:
- Prompt templates (any change to system or user prompts)
- Context assembly logic (chunking, truncation, ordering)
- RAG retrieval config (embedding model, top-k, reranking)
- Model version (any rollout of a different Claude version)
---
### Step 3: Trace-Level Investigation (5 minutes)
1. Open LangSmith/Langfuse → filter to last 30 min + faithfulness < 0.60
2. Click through 5-10 failing traces. For each, ask:
- Is the retrieved context relevant to the query?
- Is the model's response grounded in the retrieved context?
- Are there unfilled template variables in the prompt?
- Is the context visibly truncated at an unusual point?
3. Sample some passing traces (faithfulness > 0.85). What's different?
---
### Step 4: Mitigation Decision Tree
Is the issue in the retrieved context?
├─ YES (context is irrelevant or truncated)
│  ├─ Check chunk selection logic for off-by-one errors
│  ├─ Verify embedding model is returning correct results
│  └─ Temp mitigation: increase context window limit (config flag)
│
├─ NO (context is fine, model ignores it)
│  ├─ Check if prompt instructs model to use context
│  ├─ Check if model version changed
│  └─ Temp mitigation: roll back to last known good prompt version
│
└─ PARTIAL (some queries affected, others not)
   ├─ Likely a query-type specific regression
   ├─ Identify common pattern in failing queries
   └─ Add a specific handling rule for that pattern
---
### Step 5: Apply Mitigation
- **Prompt rollback**: `promptctl rollback --env prod --feature <feature_name>`
- **Model version rollback**: Contact model-team Slack channel with trace examples
- **Config flag change**: `featureflags set --env prod llm.context_window_tokens 4096`
---
### Step 6: Verify Resolution (10 minutes monitoring)
After applying mitigation:
1. Monitor faithfulness in Grafana for 10 minutes
2. Target: faithfulness back above 0.82 (pre-incident baseline)
3. If no improvement after 10 minutes: escalate to P0
---
### Step 7: Post-Incident (within 24 hours)
1. File post-mortem in Notion: [Template](https://notion.internal/postmortem-template)
2. Add 3-5 failing examples to the golden evaluation dataset
3. Create an eval gate that would have caught this regression
4. If deployment-correlated: add the change type to the deployment checklist
Common Mistakes
:::danger Alert on Sustained Rolling Averages, Not Individual Responses
Individual LLM responses vary naturally - a single faithfulness score of 0.4 is not an incident, it's a hard query. Alert only on the rolling mean over a minimum sample count. A good rule: mean(last 20+ samples, over 1 hour) < threshold for 15+ minutes. This eliminates ~90% of false positives from natural quality variance.
:::
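The rule above (rolling mean, minimum sample count, sustain duration) can be sketched as a small stateful check. This is a minimal illustration, not the alert engine described elsewhere in this lesson: the class name `SustainedMeanRule` and its fields are hypothetical, and the defaults simply mirror the numbers quoted above (20 samples, 1-hour window, 15-minute sustain).

```python
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class SustainedMeanRule:
    """Hypothetical rule: fire only when the rolling mean stays below threshold."""
    threshold: float
    window: timedelta = timedelta(hours=1)
    min_samples: int = 20
    sustain: timedelta = timedelta(minutes=15)
    _scores: deque = field(default_factory=deque)  # (timestamp, score) pairs
    _breach_since: Optional[datetime] = None

    def observe(self, ts: datetime, score: float) -> bool:
        """Record one evaluated response; return True if the alert should fire."""
        self._scores.append((ts, score))
        # Drop samples that have aged out of the rolling window.
        while self._scores and self._scores[0][0] < ts - self.window:
            self._scores.popleft()
        # Too few samples: the mean is noise, so reset and stay quiet.
        if len(self._scores) < self.min_samples:
            self._breach_since = None
            return False
        rolling_mean = sum(s for _, s in self._scores) / len(self._scores)
        if rolling_mean >= self.threshold:
            self._breach_since = None
            return False
        # Below threshold: start (or continue) the sustain timer.
        if self._breach_since is None:
            self._breach_since = ts
        return ts - self._breach_since >= self.sustain
```

With one sample per minute at 0.5 against a 0.70 threshold, this sketch stays silent until 20 samples have accumulated and then for a further 15 minutes, so a single hard query never pages anyone.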
:::warning Set Thresholds Based on Your Actual Baseline, Not Intuition
"Faithfulness < 0.80 seems like a good threshold" is how you end up with constant false positive P2 alerts if your baseline is 0.78. Measure your baseline for 2-4 weeks before setting any thresholds. Set P1 thresholds at approximately the 2nd-5th percentile of your historical distribution. Use SPC to detect relative regressions without needing to know the baseline in advance.
:::
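As a rough illustration of deriving a threshold from measured history rather than intuition, the sketch below computes a low percentile of past window means using linear interpolation. The function name `baseline_threshold` is hypothetical, and the input is assumed to be a list of per-window metric means collected during the 2-4 week baseline period.

```python
def baseline_threshold(history: list[float], pct: float = 5.0) -> float:
    """Return the pct-th percentile of historical window means.

    Uses linear interpolation between the two nearest ranks.
    `history` is assumed to hold one metric mean per evaluation window.
    """
    if not history:
        raise ValueError("need baseline history before setting a threshold")
    if not 0.0 <= pct <= 100.0:
        raise ValueError("pct must be between 0 and 100")
    ordered = sorted(history)
    rank = (pct / 100.0) * (len(ordered) - 1)
    lower = int(rank)                       # index of the rank just below
    upper = min(lower + 1, len(ordered) - 1)
    fraction = rank - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * fraction
```

A threshold picked this way sits below almost all normal windows by construction, which is what keeps a 0.78 baseline from tripping a hand-picked 0.80 limit all day.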
:::danger Missing the min_samples Guard Creates 3 AM False Positives
If your evaluation pipeline has an outage and produces 3 samples in the alert window, those 3 samples might have a mean of 0.2 - triggering a P0 alert. Without a min_samples >= 20 guard, low-traffic windows and pipeline outages will generate constant false alerts. The minimum sample requirement is not optional for P0/P1 severity rules.
:::
:::warning Never Alert on Multiple Correlated Metrics Without Deduplication
If faithfulness drops, hallucination rate typically rises, answer relevance drops, and user satisfaction may follow. Without deduplication, a single root cause fires 4 separate alerts, 3 PagerDuty incidents, and 2 Slack threads simultaneously. The on-call engineer is immediately overwhelmed. Group correlated alerts into a single incident with one PagerDuty dedup_key. Use the most severe metric as the primary alert and list the others as context.
:::
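One possible shape for that grouping is sketched below: correlated alerts for a feature are folded into a single PagerDuty Events API v2 event body, keyed on the most severe alert. The function name `build_incident_event`, the alert dictionary layout, the P-level to PagerDuty severity mapping, and the `llm-quality:<feature>:<metric>` key format are all assumptions made for illustration. The sketch only builds the payload and does not send it.

```python
# Lower number = more severe. Mapping P-levels to PagerDuty severities is an assumption.
PRIORITY_RANK = {"P0": 0, "P1": 1, "P2": 2, "P3": 3}
PAGERDUTY_SEVERITY = {"P0": "critical", "P1": "error", "P2": "warning", "P3": "info"}


def build_incident_event(feature: str, alerts: list[dict], routing_key: str) -> dict:
    """Collapse correlated alerts into one Events API v2 trigger event.

    Each alert is assumed to look like:
        {"metric": "faithfulness", "severity": "P1", "value": 0.61}
    """
    if not alerts:
        raise ValueError("at least one alert is required")
    # The most severe alert becomes the primary; the rest are attached as context.
    primary = min(alerts, key=lambda a: PRIORITY_RANK[a["severity"]])
    related = [a for a in alerts if a is not primary]
    return {
        "routing_key": routing_key,
        "event_action": "trigger",
        # Same key for every alert in the group, so PagerDuty opens one incident.
        "dedup_key": f"llm-quality:{feature}:{primary['metric']}",
        "payload": {
            "summary": (
                f"{primary['severity']} {primary['metric']} degraded on {feature} "
                f"(value {primary['value']:.2f}, {len(related)} related alerts)"
            ),
            "source": feature,
            "severity": PAGERDUTY_SEVERITY[primary["severity"]],
            "custom_details": {"related_alerts": related},
        },
    }
```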
Interview Q&A
Q1: How do you design a quality alerting system for a production LLM application?
Answer: Quality alerting for LLMs requires five interconnected components:
Evaluation pipeline: An async worker pool that samples 5-15% of production traffic, runs faithfulness, relevance, toxicity, and hallucination evaluators (using Claude Haiku as judge), and writes metric values to a time-series database. The evaluation must be asynchronous - it cannot be in the hot path of production requests. For enterprise-tier users or error responses, sample 100%.
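A sampling decision along these lines could look like the sketch below. It assumes each request carries a stable `request_id` and a `user_tier` string; those names, the `should_evaluate` function, and the 10% default are illustrative choices rather than part of any specific framework. Hashing the request ID keeps the decision deterministic, so retries of the same request are sampled consistently.

```python
import hashlib


def should_evaluate(
    request_id: str,
    user_tier: str,
    is_error: bool,
    sample_rate: float = 0.10,
) -> bool:
    """Decide whether a production response is sent to the async evaluators."""
    # Enterprise traffic and error responses are always evaluated.
    if user_tier == "enterprise" or is_error:
        return True
    # Map the request ID to a stable value in [0, 1) and compare to the rate.
    digest = hashlib.sha256(request_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return bucket < sample_rate
```

The caller would enqueue the response for evaluation when this returns `True` and return to the user immediately, keeping evaluation off the request hot path.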
Alert engine: Three complementary approaches run simultaneously. (1) Threshold-based rules with minimum sample count guards and sustain duration requirements prevent false positives. (2) SPC rules (Western Electric) detect relative regressions against the metric's own historical baseline - catches gradual drift that fixed thresholds miss. (3) ML anomaly detection (Isolation Forest) catches complex multivariate anomalies - quality that is contextually anomalous given time-of-day and traffic level.
Severity routing: P0/P1 pages the on-call engineer via PagerDuty. P2/P3 posts to the team Slack channel. All levels update Prometheus gauges for Grafana. Use dedup_key in PagerDuty to group correlated alerts (one root cause should produce one incident, not five).
Deployment correlation: Every quality alert automatically checks if a deployment happened in the preceding 3 hours. If a deployment correlates with the quality drop (the metric mean falls by more than 0.08 across the deployment boundary), the deployment details are included directly in the alert message. This typically reduces mean time to root cause by 60-70%.
Runbooks: Every P0/P1 alert rule has a corresponding runbook linked directly from the alert. Runbooks contain: severity assessment (is it really that bad?), recent deployment checklist, trace-level investigation steps, a mitigation decision tree, verification criteria, and post-incident requirements. The runbook must be written before the incident.
The hardest part: calibrating thresholds without a baseline. Instrument first, measure for 2-4 weeks, then set thresholds at the 2nd-5th percentile of historical values. Start with loose thresholds (high tolerance for false negatives) and tighten as you understand your system's normal range.
Q2: Explain Statistical Process Control for AI quality monitoring and why it's better than fixed thresholds.
Answer: Statistical Process Control (SPC) is a quality control technique from manufacturing, first developed by Walter Shewhart at Bell Labs in the 1920s. The core insight: instead of comparing a metric against a fixed threshold, compare it against the metric's own statistical distribution. A metric is "out of control" when it behaves in a statistically unusual way relative to its own history.
How it works for AI metrics: Maintain a rolling history of quality metric values (last 1,000 observations). Compute the historical mean (μ) and standard deviation (σ). Control limits: UCL = μ + 3σ, LCL = μ - 3σ. Apply the Western Electric rules to detect unusual patterns:
- Rule 1: Single point beyond 3σ - an extreme outlier, very likely real
- Rule 2: 2 of 3 consecutive points beyond 2σ on the same side - trending toward a limit
- Rule 3: 4 of 5 consecutive points beyond 1σ on the same side - mean shift beginning
- Rule 4: 8 consecutive points on the same side of mean - confirmed sustained shift
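The four rules above can be sketched as a single check that runs each time a new observation arrives. This is a minimal illustration under stated assumptions: the function name `western_electric_violations` and the returned rule labels are hypothetical, `history` is the stable baseline used to estimate μ and σ, and `recent` holds the latest observations in chronological order.

```python
from statistics import mean, stdev


def _same_side_count(z_scores: list[float], limit: float) -> int:
    """Largest count of points beyond +limit or beyond -limit (same side only)."""
    above = sum(1 for z in z_scores if z > limit)
    below = sum(1 for z in z_scores if z < -limit)
    return max(above, below)


def western_electric_violations(history: list[float], recent: list[float]) -> list[str]:
    """Return the Western Electric rules tripped by the latest points in `recent`."""
    if len(history) < 2 or not recent:
        return []
    mu, sigma = mean(history), stdev(history)
    if sigma == 0:
        return []  # no variation in the baseline, control limits are undefined
    z = [(x - mu) / sigma for x in recent]
    violations = []
    # Rule 1: the newest point is beyond 3 sigma.
    if abs(z[-1]) > 3:
        violations.append("rule_1")
    # Rule 2: 2 of the last 3 points beyond 2 sigma on the same side.
    if len(z) >= 3 and _same_side_count(z[-3:], 2) >= 2:
        violations.append("rule_2")
    # Rule 3: 4 of the last 5 points beyond 1 sigma on the same side.
    if len(z) >= 5 and _same_side_count(z[-5:], 1) >= 4:
        violations.append("rule_3")
    # Rule 4: 8 consecutive points on the same side of the mean.
    if len(z) >= 8 and _same_side_count(z[-8:], 0) >= 8:
        violations.append("rule_4")
    return violations
```

Note that eight points only slightly below the mean trip Rule 4 without ever approaching the 3σ limit, which is the gradual-drift case fixed thresholds tend to miss.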
Why it's better than fixed thresholds: Fixed thresholds suffer from two failure modes. (1) False negatives: if your baseline faithfulness is 0.90 and you set a threshold at 0.70, a regression from 0.90 to 0.75 - a major quality drop - never fires an alert. (2) False positives: if your baseline varies between 0.72 and 0.95 depending on time of day and query difficulty, a threshold at 0.75 fires constantly during normal operations.
SPC adapts to your system's actual distribution. A faithfulness of 0.75 is an anomaly if your mean is 0.92: with σ ≈ 0.077 it sits about 2.2σ below the mean, so two such points out of three consecutive observations trip Rule 2 (with a tighter σ, a single point would already trip Rule 1). The same value of 0.75 is unremarkable if your mean is 0.77. SPC also detects trends through the pattern rules - 8 consecutive points below the mean is a confirmed shift even if no single point is extreme enough to trigger Rule 1.
Limitation: SPC requires a stable baseline to establish reliable control limits (minimum 50-100 observations). If your system is undergoing rapid A/B experimentation or model upgrades, the baseline itself shifts frequently and SPC control limits lag behind. In these scenarios, supplement SPC with deployment-correlated analysis.
Q3: How do you prevent alert fatigue in AI quality monitoring?
Answer: Alert fatigue is when engineers receive so many alerts that they start ignoring all of them - including real incidents. It's one of the most dangerous failure modes in monitoring systems. The causes and their solutions:
Too-sensitive thresholds: The most common cause. Set without a measured baseline, the threshold fires constantly during normal operations. Solution: measure baseline first (2-4 weeks), set thresholds at the historical 2nd-5th percentile, require 20+ samples and 15+ minute sustain duration before firing.
No minimum sample count: Evaluation pipeline outage → 2 samples in the alert window → extreme mean → false P0 at 3 AM. Fix: always require min_samples >= 20 for any P0/P1 rule.
Duplicate alerts for the same root cause: Faithfulness drops → hallucination rate rises → relevance drops → 3 simultaneous P1 alerts for one root cause. Fix: group correlated alerts with a single PagerDuty dedup_key. The dedup key should be the root metric (faithfulness), not the rule ID - so all severity levels for the same metric collapse into one incident.
Alerting on unactionable metrics: "P99 token generation latency increased 5%" - what is the on-call engineer supposed to do? Only alert on metrics where there is a clear investigation path and mitigation option. If you don't have a runbook for it, don't alert on it.
No feedback loop on false positives: Track the "no-action rate" - percentage of alerts that resolved without engineer action. If > 15% of alerts are no-action, investigate and tune thresholds. Consider a weekly false positive review meeting for the first 3 months of a new monitoring system.
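The no-action rate is simple to compute once alert outcomes are recorded. The sketch below assumes each alert record carries two boolean fields, `resolved` and `action_taken`; both field names and the `no_action_rate` function are hypothetical.

```python
def no_action_rate(alerts: list[dict]) -> float:
    """Fraction of resolved alerts that closed without any engineer action."""
    resolved = [a for a in alerts if a.get("resolved")]
    if not resolved:
        return 0.0
    no_action = sum(1 for a in resolved if not a.get("action_taken"))
    return no_action / len(resolved)


def needs_threshold_review(alerts: list[dict], limit: float = 0.15) -> bool:
    """True when more than `limit` of resolved alerts needed no action."""
    return no_action_rate(alerts) > limit
```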
Severity mis-calibration: Crying wolf with P1 for P3 situations. Reserve P0 for existential quality failures (toxicity, complete hallucination) that justify waking someone at 3 AM. P2 and P3 should be team notifications during business hours only.
Q4: How do you correlate LLM quality degradation with code deployments?
Answer: Deployment correlation is the diagnostic tool that most dramatically reduces time-to-root-cause for AI quality incidents. The approach: at every quality alert, automatically check if a deployment happened in the preceding 3 hours, and if so, whether the quality metric shows a step-change at the deployment timestamp.
Implementation: (1) Store a deployment event record at every production deployment, including: deployment ID, timestamp, deployer, list of components changed, commit hash, commit message. (2) When a quality alert fires, query for deployments in the past 3 hours. (3) For each candidate deployment, retrieve the quality metric's average value in the 1 hour before vs. 1 hour after. (4) Compute the delta. A large negative delta (e.g., faithfulness drops 0.12 in the 1 hour after deployment vs. 1 hour before) is a strong correlation signal. (5) Include the correlated deployment details - deployer, timestamp, components, commit hash, delta - directly in the alert message.
Confidence classification, using the size of the drop (degradation is a negative change in the metric): a drop greater than 0.15 = strong correlation (deployment is almost certainly the cause). A drop of 0.08-0.15 = moderate correlation (likely but not certain). A drop below 0.08 = weak correlation (may be coincidental).
Edge cases: (1) Multiple deployments close together makes attribution difficult. Mitigate with deployment freeze windows around recent quality incidents. (2) Quality changes that lag deployments - some regressions only appear after the cache warms or specific traffic patterns hit the changed code. Extend the correlation window to 4-6 hours in these cases. (3) Gradual degradation (not a step-change) may not correlate with any single deployment - look for deployments that changed configuration that could cause cumulative drift.
The gold standard: every deployment automatically triggers a quality eval run against your golden evaluation dataset as a pre-production gate. Failures block the deployment. This catches most quality regressions at the CI layer, before they ever reach production alerting.
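A pre-production gate of this kind might be sketched as follows. The function name `quality_gate`, the 0.05 maximum regression, and the 20-example minimum are illustrative assumptions. The scores are assumed to come from running the candidate build against the golden evaluation dataset.

```python
from statistics import mean


def quality_gate(
    candidate_scores: list[float],
    baseline_mean: float,
    max_regression: float = 0.05,
    min_examples: int = 20,
) -> tuple[bool, str]:
    """Return (passed, reason) for a candidate build's golden-set scores."""
    # Refuse to pass on too little evidence rather than trusting a noisy mean.
    if len(candidate_scores) < min_examples:
        return False, f"only {len(candidate_scores)} examples, need {min_examples}"
    candidate_mean = mean(candidate_scores)
    drop = baseline_mean - candidate_mean
    if drop > max_regression:
        return False, (
            f"mean fell from {baseline_mean:.3f} to {candidate_mean:.3f} "
            f"(drop {drop:.3f} exceeds {max_regression:.3f})"
        )
    return True, f"mean {candidate_mean:.3f} is within tolerance of {baseline_mean:.3f}"
```

In a CI job, the caller would exit with a non-zero status when `passed` is `False` so the deployment is blocked.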
Q5: Walk through your complete incident response process when a toxicity alert fires at 3 AM.
Answer: A toxicity alert at 3 AM is a P0 - wake-someone-up-immediately severity. Here is the complete process:
T+0 (alert fires): PagerDuty pages the on-call engineer. The alert message includes: current toxicity rate, feature affected, sample count, alert duration, deployment correlation hint (if any), and a direct link to the runbook.
T+3 minutes - Assess severity: Open the quality dashboard. What is the actual toxicity rate? 1.1% (just over threshold) is very different from 8% (widespread problem). Is it affecting one feature or all features? One feature suggests a feature-specific change; all features suggests a shared component (model routing, prompt prefix, output filter).
T+5 minutes - Attempt immediate mitigation: Two parallel paths depending on the alert context.
Path A (deployment correlation found): Roll back the correlated deployment. kubectl rollout undo deployment/ai-service --to-revision=N. This is the fastest mitigation if the deployment is the cause.
Path B (no deployment correlation or rollback not possible): Enable the emergency content filter - a rule-based post-processor that checks outputs for known harmful patterns. This is a config flag, not a deployment. It degrades response quality slightly and blocks the known toxic patterns while the root cause is investigated. Also consider: reduce temperature toward 0 (lower-variance sampling may reduce sporadic toxic generations, though this is not guaranteed), or switch to a backup model version.
T+8 minutes - Trace investigation: Open LangSmith. Filter to toxicity_score > 0.7, last 30 minutes. Click through 3-5 traces. Key questions: Is the toxicity in direct model generation or from tool output? Are the inputs adversarial (jailbreak patterns)? Is there a specific query type or user segment generating most of the toxic outputs?
T+12 minutes - Communication: Post in #incidents: "P0 LLM toxicity - [feature name]. Toxicity rate at X%. Root cause under investigation. Mitigation [applied/in progress]. ETA to resolution: 15 minutes." If enterprise users are affected, flag to customer success team.
T+25 minutes - Verify resolution: After mitigation, toxicity rate should return below 0.5% within 10 minutes. If it doesn't, escalate by involving the full AI platform team.
Next day - Post-incident: (1) Write a post-mortem with timeline, root cause, contributing factors, and corrective actions. (2) Add the adversarial inputs to a red-team evaluation dataset. (3) Review and strengthen input filtering rules for similar patterns. (4) Add an automated eval check for this pattern to the CI quality gate. The goal: the same incident class should never reach production again without being caught pre-deployment.
