Production Monitoring for LLMs
The Silent Degradation
A fintech company runs an LLM-powered document analysis tool. For the first four months, everything works well. User satisfaction is high. The model summarizes financial documents accurately and extracts key terms reliably.
Month five: the LLM provider quietly updates their model. The model version string does not change. The response format is the same. Latency looks identical. No alerts fire. But something subtle has changed: the model's approach to extracting dates from European-format documents (DD/MM/YYYY) has shifted. It now occasionally transposes month and day.
The bug is invisible in aggregate metrics. Average latency: fine. Cost per request: fine. Error rate: zero (the model always returns a response). The failures only appear in the content - and content monitoring requires knowing what to look for.
Three months later, a compliance audit finds that 340 documents were processed with incorrect date extraction. The company spends $180,000 on remediation. The root cause: a silent model update that no monitoring system caught.
Production monitoring for LLMs is fundamentally different from traditional ML monitoring or even traditional software monitoring. The failure modes are subtle, semantic, and often invisible to infrastructure metrics. You need a monitoring stack designed for language model behavior, not just API call statistics.
Why Production LLM Monitoring Is Different
Traditional ML monitoring: "Is the model's prediction distribution shifting?" Traditional software monitoring: "Is the service up? Is latency acceptable?" LLM monitoring: "Are the outputs still correct, safe, and useful? Are costs staying controlled? Is anything in the content changing?"
The differences that matter:
Outputs are free-form text: A bug in a classifier changes a label. A bug in an LLM changes the meaning of hundreds of words. You cannot check correctness by comparing integers.
The model can change without your control: Model providers update models behind stable API versions. Even with a fine-tuned model you host yourself, output quality can vary with where the relevant information sits in the context window. Prompt changes have unpredictable cascading effects.
Costs are consumption-based and variable: A single user who discovers a verbose prompt pattern can spend 50x the expected budget in minutes.
Safety violations are rare but catastrophic: A 0.001% harmful output rate looks negligible in aggregate, but at 100 million requests it still means 1,000 incidents.
Latency has two components: Time to first token (TTFT) affects perceived responsiveness; time per output token affects total request time. Both matter differently depending on use case.
The Four Monitoring Layers
Layer 1: Infrastructure Monitoring
Latency: TTFT vs TBT vs Total
For streaming LLM responses, there are three distinct latency measurements:
TTFT (Time to First Token): The delay from when the request is sent until the first token is received. This determines perceived responsiveness. For chat interfaces, TTFT above 2–3 seconds feels slow. Causes of high TTFT: model queue depth, cold start, long system prompts that must be processed before generation begins.
TBT (Time Between Tokens): Once streaming starts, how long between each token? TBT determines how smooth the streaming feels. A consistent TBT of 20–50 ms (roughly 50 down to 20 tokens/second) is comfortable to read. TBT spikes cause jarring pauses mid-response.
Total latency: approximately TTFT + (output tokens × mean TBT). Important for batch processing but less critical for interactive use where streaming hides total time.
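To make the decomposition concrete, here is a minimal arithmetic sketch. The function name `estimate_total_latency` is hypothetical, and it assumes a constant mean TBT. It counts one gap fewer than the number of tokens, since the first token arrives at TTFT, which differs only negligibly from the formula in the text.

```python
def estimate_total_latency(ttft_s: float, mean_tbt_s: float, output_tokens: int) -> float:
    """Approximate end-to-end latency of one streamed response, in seconds."""
    if output_tokens <= 0:
        return ttft_s
    # The first token arrives at TTFT; every later token adds one TBT gap.
    return ttft_s + (output_tokens - 1) * mean_tbt_s


# Two illustrative requests: one starts fast and streams for a while,
# the other waits a long time and then streams quickly.
fast_start = estimate_total_latency(ttft_s=0.5, mean_tbt_s=0.05, output_tokens=151)
slow_start = estimate_total_latency(ttft_s=5.0, mean_tbt_s=0.01, output_tokens=101)

print(f"fast start: {fast_start:.1f}s total, first token after 0.5s")
print(f"slow start: {slow_start:.1f}s total, first token after 5.0s")
```

The second request finishes sooner overall, yet the user stares at an empty screen ten times longer, which is why TTFT and total latency deserve separate thresholds.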
```python
import statistics
import time
from dataclasses import dataclass, field
from typing import Optional

import anthropic


@dataclass
class LatencyMeasurement:
    request_id: str
    # Defaults to 0.0 so a measurement can be created before the first token arrives
    ttft_seconds: float = 0.0
    tbt_values: list = field(default_factory=list)
    total_seconds: float = 0.0
    input_tokens: int = 0
    output_tokens: int = 0
    error: Optional[str] = None

    @property
    def mean_tbt(self) -> float:
        return statistics.mean(self.tbt_values) if self.tbt_values else 0.0

    @property
    def p95_tbt(self) -> float:
        if not self.tbt_values:
            return 0.0
        sorted_tbts = sorted(self.tbt_values)
        idx = int(0.95 * len(sorted_tbts))
        return sorted_tbts[idx]

    @property
    def tokens_per_second(self) -> float:
        if self.total_seconds > 0 and self.output_tokens > 0:
            return self.output_tokens / self.total_seconds
        return 0.0


def measure_streaming_latency(
    client: anthropic.Anthropic,
    prompt: str,
    model: str = "claude-3-5-sonnet-20241022",
    request_id: str = "req_001",
) -> LatencyMeasurement:
    """
    Measure TTFT, TBT, and total latency for a streaming LLM request.
    """
    measurement = LatencyMeasurement(request_id=request_id)
    first_token_received = False
    last_token_time = None
    request_start = time.perf_counter()
    try:
        with client.messages.stream(
            model=model,
            max_tokens=512,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            # text_stream yields text chunks, which may contain more than one token,
            # so the gaps recorded below approximate TBT
            for text in stream.text_stream:
                current_time = time.perf_counter()
                if not first_token_received:
                    # First token - record TTFT
                    measurement.ttft_seconds = current_time - request_start
                    first_token_received = True
                else:
                    # Subsequent tokens - record TBT
                    measurement.tbt_values.append(current_time - last_token_time)
                last_token_time = current_time
            # Get final usage stats
            final_message = stream.get_final_message()
            measurement.input_tokens = final_message.usage.input_tokens
            measurement.output_tokens = final_message.usage.output_tokens
    except Exception as e:
        measurement.error = str(e)
    measurement.total_seconds = time.perf_counter() - request_start
    return measurement


class LatencyMonitor:
    """
    Aggregates latency measurements and triggers alerts.
    """

    def __init__(
        self,
        ttft_p95_threshold_s: float = 3.0,
        tbt_p95_threshold_s: float = 0.5,  # compared against p95_tbt below
    ):
        self.measurements: list[LatencyMeasurement] = []
        self.ttft_threshold = ttft_p95_threshold_s
        self.tbt_threshold = tbt_p95_threshold_s

    def record(self, measurement: LatencyMeasurement) -> None:
        self.measurements.append(measurement)
        self._check_alerts(measurement)

    def _check_alerts(self, m: LatencyMeasurement) -> None:
        if m.ttft_seconds > self.ttft_threshold:
            print(f"ALERT: High TTFT {m.ttft_seconds:.2f}s (threshold: {self.ttft_threshold}s) "
                  f"for request {m.request_id}")
        if m.p95_tbt > self.tbt_threshold:
            print(f"ALERT: High P95 TBT {m.p95_tbt*1000:.0f}ms "
                  f"(threshold: {self.tbt_threshold*1000:.0f}ms)")

    def report(self) -> dict:
        if not self.measurements:
            return {}
        # Failed requests are excluded from latency percentiles but counted in error_rate
        ttfts = [m.ttft_seconds for m in self.measurements if not m.error]
        tps = [m.tokens_per_second for m in self.measurements
               if not m.error and m.tokens_per_second > 0]
        return {
            "n_requests": len(self.measurements),
            "error_rate": sum(1 for m in self.measurements if m.error) / len(self.measurements),
            "ttft_p50_s": round(statistics.median(ttfts), 3) if ttfts else None,
            "ttft_p95_s": round(sorted(ttfts)[int(0.95 * len(ttfts))], 3) if ttfts else None,
            "ttft_p99_s": round(sorted(ttfts)[int(0.99 * len(ttfts))], 3) if ttfts else None,
            "throughput_p50_tps": round(statistics.median(tps), 1) if tps else None,
        }
```
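The report above reads percentiles by indexing into a sorted list. Percentile conventions differ slightly between tools, so it can help to pin one down explicitly. The sketch below uses the nearest-rank convention; the helper name `nearest_rank_percentile` and the sample values are illustrative assumptions, not part of the monitor above.

```python
import math


def nearest_rank_percentile(values: list[float], pct: float) -> float:
    """Return the smallest value such that at least pct% of the data is <= it."""
    if not values:
        raise ValueError("values must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    rank = math.ceil(pct / 100 * len(ordered))  # 1-based rank
    return ordered[rank - 1]


# Illustrative TTFT samples in seconds: four normal requests and one outlier.
ttfts = [0.4, 0.5, 0.6, 0.7, 3.2]

print("p50:", nearest_rank_percentile(ttfts, 50))
print("p95:", nearest_rank_percentile(ttfts, 95))
```

With only a handful of samples, a high percentile is simply the worst request, so tail alerts become meaningful only once each window holds enough data.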
Cost Monitoring
Cost is the most predictable of all LLM monitoring dimensions, yet teams consistently over-spend because they do not set up per-user or per-feature cost tracking before launch.
```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

# Current pricing (update regularly - these change)
MODEL_PRICING = {
    "claude-3-5-sonnet-20241022": {"input": 3.0, "output": 15.0},  # per 1M tokens
    "claude-3-5-haiku-20241022": {"input": 0.8, "output": 4.0},
    "claude-3-opus-20240229": {"input": 15.0, "output": 75.0},
    "gpt-4o": {"input": 2.5, "output": 10.0},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4-turbo": {"input": 10.0, "output": 30.0},
}


@dataclass
class CostRecord:
    request_id: str
    model: str
    input_tokens: int
    output_tokens: int
    user_id: Optional[str]
    feature: Optional[str]
    timestamp: str  # ISO 8601 in UTC, e.g. "2024-05-01T14:03:22"

    @property
    def cost_usd(self) -> float:
        # Note: unknown models are priced at zero, so keep MODEL_PRICING complete
        pricing = MODEL_PRICING.get(self.model, {"input": 0, "output": 0})
        return (
            self.input_tokens * pricing["input"] / 1_000_000 +
            self.output_tokens * pricing["output"] / 1_000_000
        )


class CostMonitor:
    """
    Tracks and alerts on LLM costs with per-user and per-feature breakdown.
    """

    def __init__(
        self,
        daily_budget_usd: float = 100.0,
        per_user_hourly_limit_usd: float = 1.0,
    ):
        self.daily_budget = daily_budget_usd
        self.per_user_limit = per_user_hourly_limit_usd
        self.records: list[CostRecord] = []
        # Keyed by (user_id, hour bucket) so spend starts from zero each hour
        self.user_hourly_spend: dict[tuple[str, str], float] = {}

    def record(self, record: CostRecord) -> None:
        self.records.append(record)
        self._check_per_user_limit(record)

    def _check_per_user_limit(self, record: CostRecord) -> None:
        if not record.user_id:
            return
        # "2024-05-01T14:03:22" -> hour bucket "2024-05-01T14"
        bucket = (record.user_id, record.timestamp[:13])
        self.user_hourly_spend[bucket] = (
            self.user_hourly_spend.get(bucket, 0.0) + record.cost_usd
        )
        if self.user_hourly_spend[bucket] > self.per_user_limit:
            print(f"ALERT: User {record.user_id} has spent ${self.user_hourly_spend[bucket]:.4f} "
                  f"this hour (limit: ${self.per_user_limit})")

    def daily_report(self) -> dict:
        today = datetime.now(timezone.utc).date().isoformat()
        today_records = [
            r for r in self.records
            if r.timestamp.startswith(today)
        ]
        total_cost = sum(r.cost_usd for r in today_records)
        by_model = {}
        by_feature = {}
        by_user = {}
        for r in today_records:
            by_model[r.model] = by_model.get(r.model, 0) + r.cost_usd
            if r.feature:
                by_feature[r.feature] = by_feature.get(r.feature, 0) + r.cost_usd
            if r.user_id:
                by_user[r.user_id] = by_user.get(r.user_id, 0) + r.cost_usd
        # Top 10 users by cost
        top_users = sorted(by_user.items(), key=lambda x: x[1], reverse=True)[:10]
        alerts = []
        if total_cost > self.daily_budget * 0.8:
            alerts.append(f"Cost at {total_cost/self.daily_budget:.0%} of daily budget")
        if total_cost > self.daily_budget:
            alerts.append(f"CRITICAL: Daily budget exceeded "
                          f"(${total_cost:.2f} > ${self.daily_budget:.2f})")
        return {
            "date": today,
            "total_cost_usd": round(total_cost, 4),
            "budget_used_pct": round(total_cost / self.daily_budget * 100, 1),
            "by_model": {k: round(v, 4) for k, v in by_model.items()},
            "by_feature": {k: round(v, 4) for k, v in by_feature.items()},
            "top_users": [(u, round(c, 4)) for u, c in top_users],
            "n_requests": len(today_records),
            "alerts": alerts,
        }
```
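A quick worked example shows why per-user limits matter. The sketch below applies the same per-million-token formula to one typical and one verbose request. The token counts, the $1.00 hourly limit, and the helper name `request_cost_usd` are illustrative assumptions; the rates mirror the $3 / $15 entry in the pricing table above.

```python
# Assumed USD rates per 1M tokens, same shape as one MODEL_PRICING entry.
PRICE_PER_MILLION = {"input": 3.0, "output": 15.0}


def request_cost_usd(input_tokens: int, output_tokens: int,
                     pricing: dict = PRICE_PER_MILLION) -> float:
    """Cost of a single request under per-million-token pricing."""
    return (input_tokens * pricing["input"]
            + output_tokens * pricing["output"]) / 1_000_000


typical = request_cost_usd(input_tokens=2_000, output_tokens=500)
verbose = request_cost_usd(input_tokens=2_000, output_tokens=4_000)

hourly_limit_usd = 1.00
requests_within_limit = int(hourly_limit_usd / typical)

print(f"typical request: ${typical:.4f}")
print(f"verbose request: ${verbose:.4f} ({verbose / typical:.1f}x typical)")
print(f"typical requests that fit in ${hourly_limit_usd:.2f}/hour: {requests_within_limit}")
```

Because output tokens are priced several times higher than input tokens here, response length is usually the first thing to inspect when a user's spend climbs.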
Layer 2: Output Quality Monitoring
Embedding-Based Drift Detection
When you cannot check correctness directly (no ground truth for production queries), embedding-based methods can detect when output distributions shift.
```python
from typing import List, Optional

import numpy as np
import openai
from sklearn.preprocessing import normalize


class OutputDriftDetector:
    """
    Detects quality drift in LLM outputs using embedding-based methods.
    Concept: if the distribution of output embeddings shifts significantly,
    the model's behavior has changed.
    """

    def __init__(
        self,
        embedding_model: str = "text-embedding-3-small",
        window_size: int = 500,
        drift_threshold: float = 0.1,
    ):
        self.embedding_model = embedding_model
        self.window_size = window_size
        self.drift_threshold = drift_threshold
        self.baseline_embeddings: Optional[np.ndarray] = None
        self.recent_embeddings: List[np.ndarray] = []
        self.client = openai.OpenAI()  # reads OPENAI_API_KEY from the environment

    def embed_text(self, text: str) -> np.ndarray:
        """Get embedding for a text string."""
        response = self.client.embeddings.create(
            model=self.embedding_model,
            input=text[:8000],  # Truncate (by characters) for safety
        )
        return np.array(response.data[0].embedding)

    def set_baseline(self, outputs: List[str]) -> None:
        """
        Establish baseline distribution from known-good outputs.
        Call this immediately after launch when quality is verified.
        """
        print(f"Computing baseline from {len(outputs)} samples...")
        embeddings = [self.embed_text(out) for out in outputs]
        self.baseline_embeddings = np.array(embeddings)
        print(f"Baseline established. Mean embedding norm: "
              f"{np.linalg.norm(self.baseline_embeddings, axis=1).mean():.3f}")

    def record_output(self, output: str) -> Optional[dict]:
        """
        Record a new output and check for drift.
        Returns drift alert if detected, None otherwise.
        """
        embedding = self.embed_text(output)
        self.recent_embeddings.append(embedding)
        # Keep only the sliding window so memory stays bounded
        self.recent_embeddings = self.recent_embeddings[-self.window_size:]
        # Only check drift when we have enough samples
        if len(self.recent_embeddings) < 50:
            return None
        window = np.array(self.recent_embeddings)
        return self._check_drift(window)

    def _check_drift(self, recent: np.ndarray) -> Optional[dict]:
        """
        Compare recent output distribution to baseline.
        Uses Maximum Mean Discrepancy (MMD) as distribution distance.
        """
        if self.baseline_embeddings is None:
            return None
        mmd = self._compute_mmd(self.baseline_embeddings, recent)
        if mmd > self.drift_threshold:
            return {
                "drift_detected": True,
                "mmd_score": round(mmd, 4),
                "threshold": self.drift_threshold,
                "n_recent_samples": len(recent),
                "recommendation": "Review recent outputs for quality changes. "
                                  "Check if model or prompt changed recently.",
            }
        return None

    def _compute_mmd(self, X: np.ndarray, Y: np.ndarray) -> float:
        """
        Estimate Maximum Mean Discrepancy between two embedding sets.
        Uses RBF kernel.
        """
        # Sample for efficiency
        n_samples = min(100, len(X), len(Y))
        X_sample = X[np.random.choice(len(X), n_samples, replace=False)]
        Y_sample = Y[np.random.choice(len(Y), n_samples, replace=False)]
        # Normalize each embedding to unit length
        X_norm = normalize(X_sample)
        Y_norm = normalize(Y_sample)

        # RBF kernel on pairwise squared Euclidean distances
        def rbf_kernel(A: np.ndarray, B: np.ndarray, gamma: float = 1.0) -> np.ndarray:
            dists = np.sum(A**2, axis=1)[:, None] + np.sum(B**2, axis=1)[None, :] - 2 * A @ B.T
            return np.exp(-gamma * dists)

        K_XX = rbf_kernel(X_norm, X_norm)
        K_YY = rbf_kernel(Y_norm, Y_norm)
        K_XY = rbf_kernel(X_norm, Y_norm)
        # Biased estimate (diagonal terms included); clamp small negatives to zero
        mmd = K_XX.mean() + K_YY.mean() - 2 * K_XY.mean()
        return float(max(0, mmd))
```
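As a cheaper first-pass signal alongside MMD, one option is to compare only the mean embedding of each window. The sketch below is a swapped-in simplification, not the MMD method above: it uses plain Python lists and tiny hand-made 3-dimensional vectors as stand-ins for real embeddings, and the function names are hypothetical. It can miss shifts that change the spread of outputs without moving the mean.

```python
import math


def centroid(vectors: list[list[float]]) -> list[float]:
    """Component-wise mean of a non-empty list of equal-length vectors."""
    n = len(vectors)
    return [sum(column) / n for column in zip(*vectors)]


def cosine_distance(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine distance is undefined for a zero vector")
    return 1.0 - dot / (norm_a * norm_b)


def centroid_drift(baseline: list[list[float]], recent: list[list[float]]) -> float:
    """Cosine distance between the mean baseline and mean recent embedding."""
    return cosine_distance(centroid(baseline), centroid(recent))


# Toy vectors standing in for embeddings (real ones have hundreds of dimensions).
baseline = [[1.0, 0.0, 0.1], [0.9, 0.1, 0.0], [1.0, 0.1, 0.1]]
similar = [[0.95, 0.05, 0.05], [1.0, 0.0, 0.1]]
shifted = [[0.1, 1.0, 0.0], [0.0, 0.9, 0.1]]

print(f"similar window drift: {centroid_drift(baseline, similar):.4f}")
print(f"shifted window drift: {centroid_drift(baseline, shifted):.4f}")
```

Any alert threshold for this score would need calibrating on your own data, for example by measuring the drift between two halves of a known-good baseline.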
User Feedback Integration
```python
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class FeedbackType(Enum):
    THUMBS_UP = "thumbs_up"
    THUMBS_DOWN = "thumbs_down"
    CORRECTION = "correction"
    REPORT = "report"


class UserFeedbackCollector:
    """
    Integrates user feedback signals into quality monitoring.
    Tracks satisfaction rate, correction patterns, and issue types.
    """

    def __init__(self):
        self.feedback_records = []
        self.running_satisfaction = {"positive": 0, "negative": 0}

    def record_feedback(
        self,
        request_id: str,
        feedback_type: FeedbackType,
        user_id: Optional[str] = None,
        correction_text: Optional[str] = None,
        issue_category: Optional[str] = None,
    ) -> None:
        """Record a user feedback event."""
        record = {
            "request_id": request_id,
            "feedback_type": feedback_type.value,
            "user_id": user_id,
            "correction": correction_text,
            "issue_category": issue_category,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        self.feedback_records.append(record)
        if feedback_type == FeedbackType.THUMBS_UP:
            self.running_satisfaction["positive"] += 1
        elif feedback_type in [FeedbackType.THUMBS_DOWN, FeedbackType.REPORT]:
            self.running_satisfaction["negative"] += 1

    def get_satisfaction_rate(self, window_size: int = 1000) -> dict:
        """Compute satisfaction rate from recent feedback."""
        recent = self.feedback_records[-window_size:]
        pos = sum(1 for r in recent if r["feedback_type"] == "thumbs_up")
        neg = sum(1 for r in recent if r["feedback_type"] in ["thumbs_down", "report"])
        total_rated = pos + neg
        rate = pos / total_rated if total_rated > 0 else None
        return {
            # "is not None" so a genuine 0% satisfaction rate is reported as 0.0
            "satisfaction_rate": round(rate, 4) if rate is not None else None,
            "n_positive": pos,
            "n_negative": neg,
            "n_total_rated": total_rated,
            # Share of recent feedback events that carry a rating
            # (not the share of all requests that received feedback)
            "feedback_rate": total_rated / len(recent) if recent else 0,
        }

    def get_issue_categories(self) -> dict:
        """Break down negative feedback by issue category."""
        negative_feedback = [
            r for r in self.feedback_records
            if r["feedback_type"] in ["thumbs_down", "report"]
            and r.get("issue_category")
        ]
        categories = {}
        for r in negative_feedback:
            cat = r["issue_category"]
            categories[cat] = categories.get(cat, 0) + 1
        return dict(sorted(categories.items(), key=lambda x: x[1], reverse=True))
```
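Feedback is sparse, so a raw satisfaction rate can swing wildly on a few votes. One common way to express that uncertainty is the Wilson score interval, sketched below. The function name and the choice of a 95% interval (z = 1.96) are assumptions for illustration, not part of the collector above.

```python
import math


def wilson_interval(positives: int, total: int, z: float = 1.96) -> tuple[float, float]:
    """Wilson score interval for a proportion (z=1.96 gives roughly 95% coverage)."""
    if total <= 0:
        raise ValueError("total must be positive")
    if not 0 <= positives <= total:
        raise ValueError("positives must be between 0 and total")
    p = positives / total
    denom = 1 + z * z / total
    center = (p + z * z / (2 * total)) / denom
    half_width = z * math.sqrt(p * (1 - p) / total + z * z / (4 * total * total)) / denom
    return max(0.0, center - half_width), min(1.0, center + half_width)


# Same 90% raw satisfaction rate, very different certainty.
for positives, total in [(9, 10), (900, 1000)]:
    low, high = wilson_interval(positives, total)
    print(f"{positives}/{total}: between {low:.3f} and {high:.3f}")
```

Alerting on the lower bound rather than the raw rate avoids paging someone because two users in a row clicked thumbs down.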
Layer 3: Safety Monitoring
```python
import random
import re
from datetime import datetime, timezone
from typing import Callable


class SafetyMonitor:
    """
    Real-time safety monitoring for production LLM outputs.
    Uses a fast classifier for online filtering and a slower LLM judge for sampling.
    """

    # Fast keyword-based pre-filter (catches obvious cases with near-zero latency)
    HARD_BLOCK_PATTERNS = [
        r'\b(instructions? for making|how to make|synthesize|synthesizing)\b.{0,50}\b(explosives?|bomb|poison|nerve agent)',
        r'\b(kill yourself|commit suicide|end your life)\b',
        r'child.*sexual|sexual.*child|CSAM',
    ]

    def __init__(
        self,
        llm_safety_judge: Callable[[str, str], dict],
        sampling_rate: float = 0.05,
        alert_threshold: float = 0.001,
    ):
        self.safety_judge = llm_safety_judge
        self.sampling_rate = sampling_rate
        self.alert_threshold = alert_threshold
        self.stats = {
            "total": 0,
            "hard_blocked": 0,
            "llm_flagged": 0,
            "human_review_queue": [],
        }
        # Compile patterns once at construction time
        self.patterns = [re.compile(p, re.IGNORECASE) for p in self.HARD_BLOCK_PATTERNS]

    def check_output(
        self,
        prompt: str,
        response: str,
        request_id: str,
    ) -> dict:
        """
        Check output safety. Returns whether to serve or block.
        Fast path: pattern matching (< 1ms)
        Slow path: LLM judge (sampled, async)
        """
        self.stats["total"] += 1
        # Fast path: pattern matching
        for pattern in self.patterns:
            if pattern.search(response):
                self.stats["hard_blocked"] += 1
                self._queue_for_human_review(request_id, prompt, response, "pattern_match")
                return {
                    "serve": False,
                    "reason": "hard_block_pattern",
                    "message": "This content is not available.",
                }
        # Sampling path: LLM judge (async - don't block the response)
        if random.random() < self.sampling_rate:
            self._async_llm_safety_check(request_id, prompt, response)
        return {"serve": True, "reason": "passed"}

    def _async_llm_safety_check(
        self,
        request_id: str,
        prompt: str,
        response: str,
    ) -> None:
        """
        Queue for async LLM-based safety evaluation.
        Does not block the response - runs in background.
        """
        # In production: send to a queue (SQS, Redis, etc.)
        # Here: simplified synchronous call
        result = self.safety_judge(prompt, response)
        if result.get("is_harmful"):
            self.stats["llm_flagged"] += 1
            self._queue_for_human_review(
                request_id, prompt, response,
                f"llm_judge: {result.get('harm_category', 'unknown')}"
            )

    def _queue_for_human_review(
        self,
        request_id: str,
        prompt: str,
        response: str,
        reason: str,
    ) -> None:
        """Add to human review queue."""
        self.stats["human_review_queue"].append({
            "request_id": request_id,
            "reason": reason,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })
        print(f"SAFETY: Request {request_id} queued for human review (reason: {reason})")

    def get_safety_report(self) -> dict:
        total = self.stats["total"]
        if total == 0:
            return {}
        hard_block_rate = self.stats["hard_blocked"] / total
        # Only include alerts that actually fired (no None placeholders)
        alerts = []
        if hard_block_rate > self.alert_threshold:
            alerts.append(f"Hard block rate elevated: {hard_block_rate:.5%}")
        return {
            "total_requests": total,
            "hard_block_rate": round(hard_block_rate, 6),
            "llm_flag_rate": round(self.stats["llm_flagged"] / total, 6),
            "review_queue_size": len(self.stats["human_review_queue"]),
            "alerts": alerts,
        }
```
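Sampling only a fraction of traffic for the LLM judge raises a practical question: how much traffic passes before the judge sees even one harmful output? The sketch below works through that arithmetic. It assumes harmful outputs occur independently at a fixed rate and that the judge never misses one it is shown, both simplifications, and the function names are hypothetical.

```python
def expected_incidents(requests: int, harmful_rate: float) -> float:
    """Expected number of harmful outputs across all requests."""
    return requests * harmful_rate


def detection_probability(requests: int, harmful_rate: float, sampling_rate: float) -> float:
    """Chance that the sampled judge sees at least one harmful output."""
    per_request = harmful_rate * sampling_rate  # harmful AND sampled
    return 1.0 - (1.0 - per_request) ** requests


harmful_rate = 0.00001   # 0.001%, the rate used earlier in this article
sampling_rate = 0.05     # matches the SafetyMonitor default

for n in (1_000_000, 10_000_000):
    incidents = expected_incidents(n, harmful_rate)
    p_detect = detection_probability(n, harmful_rate, sampling_rate)
    print(f"{n:>10,} requests: ~{incidents:.0f} incidents, "
          f"{p_detect:.0%} chance the judge sees at least one")
```

If the detection probability at your traffic volume is too low, raising the sampling rate for high-risk features is usually cheaper than raising it everywhere.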
Layer 4: Business Metrics
```python
from datetime import datetime, timezone
from typing import Optional


class BusinessMetricsTracker:
    """
    Tracks business-level success metrics for LLM features.
    Goes beyond technical quality to measure actual user value.
    """

    def __init__(self):
        self.sessions = {}
        self.task_completions = []

    def record_session_event(
        self,
        session_id: str,
        event_type: str,  # "start", "follow_up", "task_complete", "abandon"
        metadata: Optional[dict] = None,
    ) -> None:
        """Track multi-turn session progression."""
        now = datetime.now(timezone.utc).isoformat()
        if session_id not in self.sessions:
            self.sessions[session_id] = {
                "events": [],
                "start_time": now,
                "task_completed": False,
                "abandoned": False,
            }
        event = {
            "event_type": event_type,
            "timestamp": now,
            "metadata": metadata or {},
        }
        self.sessions[session_id]["events"].append(event)
        if event_type == "task_complete":
            self.sessions[session_id]["task_completed"] = True
        elif event_type == "abandon":
            self.sessions[session_id]["abandoned"] = True

    def compute_task_completion_rate(self) -> dict:
        """
        Task completion rate: fraction of sessions that ended in success.
        High completion rate = model is actually helping users accomplish goals.
        """
        n_sessions = len(self.sessions)
        if n_sessions == 0:
            return {}
        completed = sum(1 for s in self.sessions.values() if s["task_completed"])
        abandoned = sum(1 for s in self.sessions.values() if s["abandoned"])
        in_progress = n_sessions - completed - abandoned
        # Counts every recorded event, including "start" and "task_complete"
        avg_turns = sum(
            len(s["events"]) for s in self.sessions.values()
        ) / n_sessions
        return {
            "task_completion_rate": round(completed / n_sessions, 4),
            "abandonment_rate": round(abandoned / n_sessions, 4),
            "avg_turns_per_session": round(avg_turns, 2),
            "n_sessions": n_sessions,
            "n_in_progress": in_progress,
        }
```
Observability Platforms Compared
| Platform | Strengths | Weaknesses | Best For |
|---|---|---|---|
| LangSmith | Deep LangChain integration, traces, datasets, evaluators | Tightly coupled to LangChain ecosystem | LangChain-based applications |
| Langfuse | Open-source, self-hostable, multi-provider | Less mature ecosystem | Privacy-sensitive deployments |
| Arize Phoenix | Strong ML monitoring background, embedding drift, LLM evals | Setup complexity | Teams with MLOps experience |
| Helicone | Easy setup, cost tracking, lightweight | Limited evaluation capabilities | Quick cost + latency monitoring |
| Weights & Biases | Excellent experiment tracking, strong visualization | Not LLM-specific | Research and fine-tuning workflows |
LangSmith Integration
```python
from typing import Optional

from anthropic import Anthropic
from langsmith import traceable
from langsmith.evaluation import evaluate as ls_evaluate
from langsmith.schemas import Example, Run

# Set environment variables:
# LANGCHAIN_TRACING_V2=true
# LANGCHAIN_API_KEY=your_key
# LANGCHAIN_PROJECT=your_project_name


@traceable(name="rag_answer_generation", tags=["production", "rag"])
def generate_rag_answer(
    question: str,
    retrieved_contexts: list,
    user_id: Optional[str] = None,
) -> dict:
    """
    LangSmith-traced RAG answer generation.
    All inputs, outputs, and metadata are automatically logged.
    """
    client = Anthropic()
    context_text = "\n\n".join(retrieved_contexts)
    message = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        system=(
            "Answer the question based only on the provided context. "
            "If the context does not contain enough information, say so clearly."
        ),
        messages=[{
            "role": "user",
            "content": f"Context:\n{context_text}\n\nQuestion: {question}"
        }],
    )
    answer = message.content[0].text
    # LangSmith automatically logs inputs and outputs
    # Additional metadata for filtering and debugging:
    return {
        "answer": answer,
        "input_tokens": message.usage.input_tokens,
        "output_tokens": message.usage.output_tokens,
        "user_id": user_id,
    }


def run_langsmith_evaluation(
    dataset_name: str,
    rag_function,
):
    """
    Run evaluation on a LangSmith dataset.
    Evaluators are applied automatically and results logged to LangSmith.
    """

    # Custom evaluator
    def faithfulness_evaluator(run: Run, example: Example) -> dict:
        """Check if answer is grounded in retrieved context."""
        answer = (run.outputs or {}).get("answer", "")
        context = example.inputs.get("context", "")
        # Simplified: in production, use full RAGAS faithfulness
        context_words = set(context.lower().split())
        answer_words = answer.lower().split()
        # Fraction of answer words that also appear in the context
        overlap = sum(1 for w in answer_words if w in context_words) / max(len(answer_words), 1)
        return {
            "key": "faithfulness",
            "score": round(overlap, 4),
            "comment": f"Word overlap with context: {overlap:.2%}",
        }

    results = ls_evaluate(
        rag_function,
        data=dataset_name,
        evaluators=[faithfulness_evaluator],
        experiment_prefix="production-eval",
        metadata={"eval_type": "rag_faithfulness", "version": "1.0"},
    )
    return results
```
Runbook: LLM Quality Incident Response
When quality metrics degrade in production, follow this structured response:
INCIDENT RESPONSE RUNBOOK: LLM Quality Degradation
DETECTION
---------
Trigger: [Metric] dropped below threshold
Example: Golden dataset faithfulness dropped from 0.85 to 0.67 in daily eval
STEP 1: Isolate the scope (0-10 minutes)
- [ ] Check when the degradation started (hour-level granularity)
- [ ] Is it across all features or specific to one?
- [ ] Is it specific to certain query types / user segments?
- [ ] Check error rates and latency (separate from quality)
STEP 2: Check for recent changes (10-20 minutes)
- [ ] Model API: any changelog updates in the last 48h?
- [ ] Code deploys: any prompt changes, retrieval config changes?
- [ ] Data: did the knowledge base update? (for RAG systems)
- [ ] Sampling: is the golden dataset still representative?
STEP 3: Component isolation (20-40 minutes)
- [ ] Run component ablation: retrieval metrics, generation metrics separately
- [ ] Compare sampled production outputs from before and after degradation
- [ ] Check if faithfulness or answer relevancy is the primary driver
STEP 4: Mitigation
Option A (Model changed): Pin to previous model version if available
Option B (Prompt changed): Revert prompt to last known good version
Option C (Data changed): Identify which new documents are causing issues
Option D (Unclear): Route to human review queue, increase sampling rate
STEP 5: Post-mortem
- [ ] Document root cause
- [ ] Add detection for this failure mode
- [ ] Add regression test to golden dataset
- [ ] Update monitoring thresholds if needed
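Step 1 of the runbook asks when the degradation started at hour-level granularity. A minimal sketch of one way to answer that from hourly metric values follows. The function name, the threshold, and the rule of requiring three consecutive bad hours (to ignore one-off dips) are all illustrative assumptions.

```python
from typing import Optional


def find_degradation_start(
    hourly_scores: list[tuple[str, float]],
    threshold: float,
    min_consecutive: int = 3,
) -> Optional[str]:
    """Return the first hour of the first sustained run below threshold, else None."""
    run_start = None
    run_length = 0
    for hour, score in hourly_scores:
        if score < threshold:
            if run_length == 0:
                run_start = hour
            run_length += 1
            if run_length >= min_consecutive:
                return run_start
        else:
            # Score recovered, so the previous dip was not sustained.
            run_start = None
            run_length = 0
    return None


# Made-up hourly faithfulness scores: one isolated dip, then a sustained drop.
scores = [
    ("2024-05-01T09:00", 0.86),
    ("2024-05-01T10:00", 0.84),
    ("2024-05-01T11:00", 0.66),
    ("2024-05-01T12:00", 0.85),
    ("2024-05-01T13:00", 0.68),
    ("2024-05-01T14:00", 0.67),
    ("2024-05-01T15:00", 0.66),
]

print(find_degradation_start(scores, threshold=0.75))
```

The returned hour is the window to line up against deploy logs and provider changelogs in Step 2.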
:::tip Log Everything in the First Month
When first launching an LLM feature, log 100% of requests and responses (with appropriate privacy handling). You do not yet know which patterns are normal and which are anomalous. After 4–6 weeks of baseline data, you can implement intelligent sampling and set meaningful alert thresholds.
:::
:::warning Latency Baselines Vary by Model and Task
Do not apply a single latency threshold across all your LLM endpoints. A chat endpoint has a TTFT threshold of 1–2 seconds. A batch summarization endpoint tolerates 10+ seconds total latency. A coding assistant with long outputs tolerates high total latency but still needs low TTFT. Set per-endpoint thresholds.
:::
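Per-endpoint thresholds can be as simple as a lookup table. The sketch below shows one possible shape. The endpoint names and every numeric limit are hypothetical placeholders meant to be replaced with values derived from your own baseline data.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LatencyThresholds:
    ttft_s: float
    total_s: float


# Hypothetical endpoint names and limits; tune them from your own baselines.
ENDPOINT_THRESHOLDS = {
    "chat": LatencyThresholds(ttft_s=2.0, total_s=30.0),
    "batch_summarization": LatencyThresholds(ttft_s=10.0, total_s=60.0),
    "coding_assistant": LatencyThresholds(ttft_s=2.0, total_s=120.0),
}


def latency_violations(endpoint: str, ttft_s: float, total_s: float) -> list[str]:
    """Compare one request's latency against the limits for its endpoint."""
    limits = ENDPOINT_THRESHOLDS[endpoint]  # KeyError for an unconfigured endpoint
    violations = []
    if ttft_s > limits.ttft_s:
        violations.append(f"{endpoint}: TTFT {ttft_s:.2f}s exceeds {limits.ttft_s:.2f}s")
    if total_s > limits.total_s:
        violations.append(f"{endpoint}: total {total_s:.2f}s exceeds {limits.total_s:.2f}s")
    return violations


# The same measurements violate the chat limits but not the batch limits.
print(latency_violations("chat", ttft_s=3.1, total_s=9.0))
print(latency_violations("batch_summarization", ttft_s=3.1, total_s=9.0))
```

Failing loudly on an unconfigured endpoint is a deliberate choice here: a silent default threshold is how new endpoints end up unmonitored.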
Common Mistakes
:::danger Monitoring Infrastructure Instead of Content
Many teams set up latency and error rate monitoring and call it "LLM monitoring." These metrics catch complete failures (model API down, timeouts) but miss content quality issues. The model can be fast, cheap, and returning 200 OK while silently producing wrong answers. Always monitor output content, not just infrastructure.
:::
:::warning Not Distinguishing TTFT from Total Latency
For streaming applications, users do not care about total latency - they care about TTFT and the smoothness of TBT. A model that takes 8 seconds total but streams the first token in 0.5 seconds with smooth 50ms TBT feels fast. A model that takes 6 seconds total but streams the first token in 5 seconds feels agonizingly slow. Optimize and alert on the right metric for your UX.
:::
:::danger Ignoring the Feedback Loop
User feedback (thumbs up/down, follow-up corrections, abandonment) is the highest-quality signal you have in production. Most teams implement the collection mechanism and then never actually read or act on the data. Build a weekly review process for negative feedback. It will find failure patterns that no automated system detected.
:::
Interview Q&A
Q1: What are the four layers of LLM monitoring and why does each matter?
(1) Infrastructure: latency (TTFT, TBT), error rates, costs - catches complete failures and cost overruns, but misses quality issues. (2) Model quality: output quality drift, embedding distribution changes, user feedback - catches silent degradation in what the model says without infrastructure failures. (3) Safety: harmful content, policy violations, jailbreak patterns - critical for preventing user harm; errors here are rare but high-severity. (4) Business: task completion rates, user satisfaction, goal achievement - the ultimate measure of whether the LLM is providing value. Each layer catches failures the others miss. Infrastructure is necessary but not sufficient. Business metrics are the ground truth but lag behind technical issues.
Q2: What is TTFT and why is it different from total latency for LLM applications?
TTFT (Time to First Token) is the delay from sending a request until the first token of the response streams back. For streaming LLM applications, TTFT is the primary latency that affects user experience because the user sees content start appearing after TTFT, not after the full response is generated. Total latency (TTFT + generation time) matters for batch processing and non-streaming applications. In chat interfaces, a TTFT of 500ms with smooth streaming at 50 tokens/second feels fast even if the total response takes 8 seconds. A TTFT of 5 seconds feels broken even if the response then streams quickly. Set separate alerting thresholds for each.
Q3: How would you detect silent quality degradation in a production RAG system when you have no ground truth for production queries?
Several complementary approaches: (1) Continuous evaluation on a static golden dataset - even though production queries have no ground truth, your golden dataset does; run daily evaluation and alert on score drops. (2) Embedding-based drift detection - embed outputs and track the distribution; significant shifts in the embedding space indicate behavior changes, even without labels. (3) User feedback signals - thumbs down rate, correction rate, and abandonment rate are implicit quality signals; sustained increases indicate degradation. (4) Output statistics - track response length distributions, refusal rates, citation patterns (for RAG), and other structural features; changes may indicate model behavior shifts. (5) Canary queries - maintain 10–20 queries with known expected outputs; run these hourly and alert if the expected output format or key facts change.
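The canary-query idea in (5) can be sketched in a few lines. Everything here is illustrative: `Canary`, `run_canaries`, and the stub model are hypothetical names, and the model is passed in as a plain callable so the check runs without any provider SDK. The example canary targets the DD/MM/YYYY transposition from the opening story.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Canary:
    name: str
    prompt: str
    required_substrings: list[str]  # key facts the output must contain


def run_canaries(canaries: list[Canary], call_model: Callable[[str], str]) -> list[str]:
    """Run each canary prompt and describe any missing expected facts."""
    failures = []
    for canary in canaries:
        output = call_model(canary.prompt)
        missing = [s for s in canary.required_substrings if s not in output]
        if missing:
            failures.append(f"{canary.name}: missing {missing}")
    return failures


canaries = [
    Canary(
        name="eu_date_format",
        prompt="The invoice is dated 03/04/2024 (DD/MM/YYYY). Return the date as YYYY-MM-DD.",
        required_substrings=["2024-04-03"],
    ),
]


def transposing_model(prompt: str) -> str:
    """Stub standing in for a model that swaps day and month."""
    return "2024-03-04"


print(run_canaries(canaries, transposing_model))
```

Substring checks are deliberately strict and cheap; they suit canaries with one unambiguous expected fact, while free-form answers need a looser comparison.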
Q4: A cost spike occurs - API spend is 10x normal for the past hour. How do you investigate?
Start with the per-user and per-feature cost breakdown. Most cost spikes are caused by: (1) One user discovering a prompt pattern that generates very long responses - check for any user with dramatically above-average token count. (2) A bug in a new feature that sends the same request in a loop - check for request patterns with identical prompts from the same source. (3) A large batch job that was not budget-estimated correctly - check if any scheduled job triggered. (4) An input that creates an extremely long context - some inputs (e.g., large documents) dramatically increase input tokens. Check the top 10 most expensive requests in the hour and look for the pattern. Implement per-user and per-feature hourly spend limits before launch, not after the first spike.
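The first pass of that investigation can be automated. The sketch below assumes each logged request is a dict with `user_id`, `prompt_hash`, and `cost_usd` fields. Those field names, the function name, and the repeat cutoff are hypothetical and would need mapping onto your own logging schema.

```python
from collections import Counter, defaultdict


def summarize_spike(requests: list[dict], top_n: int = 3, min_repeats: int = 3) -> dict:
    """Surface the usual suspects in a cost spike from one hour of request logs."""
    spend_by_user = defaultdict(float)
    prompt_repeats = Counter()
    for r in requests:
        spend_by_user[r["user_id"]] += r["cost_usd"]
        prompt_repeats[(r["user_id"], r["prompt_hash"])] += 1

    top_users = sorted(spend_by_user.items(), key=lambda kv: kv[1], reverse=True)[:top_n]
    return {
        # Who spent the most in this window
        "top_users": [(user, round(cost, 4)) for user, cost in top_users],
        # Identical prompts from the same user hint at a retry loop
        "repeated_prompts": [
            (key, count) for key, count in prompt_repeats.most_common(top_n)
            if count >= min_repeats
        ],
        # Single very expensive requests hint at oversized inputs or outputs
        "most_expensive_requests": sorted(
            requests, key=lambda r: r["cost_usd"], reverse=True
        )[:top_n],
    }


# Made-up log sample: u2 is stuck in a loop, u3 sent one very large request.
hour_of_requests = (
    [{"user_id": "u1", "prompt_hash": "a1", "cost_usd": 0.01}]
    + [{"user_id": "u2", "prompt_hash": "loop", "cost_usd": 0.40} for _ in range(5)]
    + [{"user_id": "u3", "prompt_hash": "big", "cost_usd": 0.90}]
)

summary = summarize_spike(hour_of_requests)
print(summary["top_users"])
print(summary["repeated_prompts"])
```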
Q5: Compare LangSmith and Langfuse as LLM observability platforms. When would you use each?
LangSmith is tightly integrated with the LangChain ecosystem - it automatically traces LangChain chains, agents, and retrieval calls with minimal setup. It has a strong evaluation framework with built-in evaluators and dataset management. Best for teams already using LangChain who want deep integration with minimal setup. Weakness: tightly coupled to LangChain; using it without LangChain requires more manual instrumentation.
Langfuse is open-source and can be self-hosted, which matters for privacy-sensitive deployments (healthcare, legal, finance). It supports all major LLM providers and frameworks through an SDK that works with any Python code. It has strong cost tracking and a prompt management system. Best for teams that need data sovereignty (self-hosted), use multiple frameworks, or have strict compliance requirements. Weakness: less mature ecosystem, more setup required, evaluation features are less comprehensive than LangSmith.
For most product teams starting with LangChain: LangSmith. For enterprise deployments with data residency requirements: Langfuse self-hosted. For teams with strong MLOps backgrounds: consider Arize Phoenix for its broader ML monitoring capabilities.
