
Observability for LLM Apps

The Bug That Took Three Weeks to Find

A startup's AI-powered writing assistant had been in production for four months. The product team noticed a slow degradation in user satisfaction scores - nothing dramatic, just a steady 2–3% decline per week for the past three weeks. The NPS was down. Support tickets about "the AI feeling worse" were up. But the engineering dashboard showed nothing unusual: p99 latency was stable, error rate was near zero, token usage was flat.

The team spent two weeks looking in the wrong places. They checked for model provider changes (none announced). They reviewed recent deployments (three, all minor). They ran A/B tests on new prompt variations (no significant difference). They were debugging blindly because their observability stack was built for traditional software - metrics, logs, traces - and none of those tools could tell them why the LLM's outputs had gotten worse.

The breakthrough came from a product manager who pulled 200 random conversation logs manually and compared them to a sample from eight weeks earlier. The answer was visible immediately: the system prompt had drifted. Three weeks before the degradation started, a developer had added a new feature that conditionally appended context to the system prompt. In a specific but common user scenario (accounts created via SSO), the appended context was subtly contradicting the main system prompt, causing the LLM to produce shorter, less helpful responses.

The fix took twenty minutes. But finding it took three weeks - because the team had no way to track which prompt version had generated each response, no way to compare output quality over time, and no way to correlate a system change with a quality degradation.

Traditional application performance monitoring (APM) measures whether your service is up, fast, and error-free. LLM observability measures something harder: whether your service is producing good outputs. These are different problems, and they require different tools.

Why Traditional APM Falls Short

In a traditional web service, a successful response is a successful response. HTTP 200 means the service worked. You can measure what matters - latency, error rate, throughput - and get a complete picture of health.

In an LLM application, a successful response (HTTP 200, valid JSON, no exceptions) can still be a bad response. The LLM returned an answer - but is it correct? Is it helpful? Is it grounded in the context? Is it consistent with what it returned yesterday for the same question? Did the prompt change between yesterday and today?

None of these questions are answerable by traditional APM metrics. You need a new observability layer.

The four questions LLM observability must answer:

  1. What happened? - full request/response trace with prompt, context, and output
  2. How much did it cost? - token usage, provider costs, per-feature attribution
  3. Is the quality changing? - automated quality metrics, human feedback correlation
  4. Why did it change? - prompt version tracking, model version tracking, input distribution monitoring

Four Layers of LLM Observability

Distributed Tracing for LLM Applications

A single LLM application request may involve many sub-steps: retrieval, reranking, multiple LLM calls, tool calls, response synthesis. Each step must be observable individually and as a composed trace.

The tracing vocabulary maps onto LLM concepts:

  • Trace: a single end-to-end user request
  • Span: one step within a trace (an LLM call, a retrieval, a tool call)
  • Span attributes: model, temperature, token counts, prompt version, input hash
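
To make the vocabulary concrete, here is a minimal in-memory sketch of traces and spans. The `Span` and `Tracer` names are hypothetical and are not part of any of the tools discussed below; a real tracer would also export spans to a backend.

```python
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Iterator, Optional


@dataclass
class Span:
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]
    name: str
    attributes: dict = field(default_factory=dict)
    duration_ms: float = 0.0


class Tracer:
    """Collects the spans of one end-to-end request (one trace)."""

    def __init__(self) -> None:
        self.trace_id = str(uuid.uuid4())
        self.spans: list[Span] = []
        self._stack: list[str] = []  # span_ids of currently open spans

    @contextmanager
    def span(self, name: str, **attributes) -> Iterator[Span]:
        # The innermost open span (if any) becomes the parent
        parent = self._stack[-1] if self._stack else None
        current = Span(self.trace_id, str(uuid.uuid4()), parent, name, dict(attributes))
        self._stack.append(current.span_id)
        start = time.monotonic()
        try:
            yield current
        finally:
            current.duration_ms = (time.monotonic() - start) * 1000
            self._stack.pop()
            self.spans.append(current)  # spans are stored in completion order


tracer = Tracer()
with tracer.span("rag_pipeline") as root:
    with tracer.span("retrieval", top_k=5):
        pass  # vector search would run here
    with tracer.span("llm_call", model="gpt-4o", temperature=0.3, prompt_version="v2.3"):
        pass  # model call would run here
```

After the outer block exits, `tracer.spans` holds three spans that share one `trace_id`, and the two inner spans point at the root through `parent_span_id`.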

Tool Comparison: LangSmith, Langfuse, Helicone, Arize Phoenix

| Feature | LangSmith | Langfuse | Helicone | Arize Phoenix |
|---|---|---|---|---|
| Tracing | Yes | Yes | Yes (proxy) | Yes |
| Evaluation | Yes (first-class) | Yes | Basic | Yes |
| Prompt versioning | Yes | Yes | Limited | No |
| Dataset management | Yes | Yes | No | Limited |
| Self-hostable | No | Yes | No | Yes |
| SDK integration | LangChain native | Python/JS SDK | Proxy only | Python SDK |
| LLM-as-judge | Yes | Yes | No | Yes |
| Pricing | Per trace | Self-host free | Per request | OSS free |
| Best for | LangChain teams | Self-host compliance | No-SDK-change teams | ML platform teams |

LangSmith is the most mature option if you use LangChain. Tracing, evaluation datasets, and prompt management are tightly integrated. The main downside is that the standard offering is managed SaaS - trace data goes to LangChain's servers, not your own infrastructure.

Langfuse is the OSS alternative. Self-hostable (Docker or managed), comparable features, and increasingly capable evaluation workflows. Good for teams with data residency requirements.

Helicone is the zero-integration option: route all your OpenAI requests through the Helicone proxy endpoint. No SDK changes, instant observability. Limited to what's observable at the HTTP layer - you cannot trace internal steps like retrieval.

Arize Phoenix comes from the ML observability world. Strong on drift detection, embedding visualization, and dataset management. Less mature on the LLM-specific features but catching up fast.

LangSmith Integration

import os
from langsmith import Client, traceable
from langsmith.evaluation import evaluate, LangChainStringEvaluator
from openai import OpenAI

# Set environment variables for automatic tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "production-assistant"


client = OpenAI()
ls_client = Client()


@traceable(name="rag_pipeline", run_type="chain")
def rag_pipeline(user_question: str, user_id: str) -> str:
"""
Decorator-based tracing. Every call is automatically logged to LangSmith.
All sub-functions decorated with @traceable appear as child spans.
"""
docs = retrieve_documents(user_question)
context = "\n\n".join(docs)
response = generate_response(user_question, context)
return response


@traceable(name="retrieve_documents", run_type="retriever")
def retrieve_documents(query: str) -> list[str]:
"""Retrieval span - LangSmith knows this is a retriever step."""
# Your vector search here
return ["Document 1...", "Document 2..."]


@traceable(name="generate_response", run_type="llm")
def generate_response(question: str, context: str) -> str:
"""LLM span - automatically captures model, tokens, latency."""
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "Answer using only the provided context.",
},
{
"role": "user",
"content": f"Context:\n{context}\n\nQuestion: {question}",
},
],
max_tokens=1024,
)
return response.choices[0].message.content


# Add user feedback to a trace
def record_user_feedback(run_id: str, score: float, comment: str = None):
"""
Attach user feedback (thumbs up/down, rating) to the trace.
LangSmith uses this for evaluation dataset construction.
"""
ls_client.create_feedback(
run_id=run_id,
key="user_rating",
score=score, # 0.0 to 1.0
comment=comment,
)

LangSmith Evaluation

from langsmith.evaluation import evaluate, LangChainStringEvaluator
from langsmith import Client


def run_evaluation_suite(dataset_name: str):
"""
Run automated evaluation on a labeled dataset.
Compares current pipeline against ground truth answers.
"""
evaluators = [
# Built-in evaluators
LangChainStringEvaluator("qa"), # correctness vs ground truth
LangChainStringEvaluator("coherence"), # is the response coherent?
LangChainStringEvaluator("conciseness"), # is it concise?
]

results = evaluate(
rag_pipeline,
data=dataset_name,
evaluators=evaluators,
experiment_prefix="rag-v2", # version your experiments
metadata={"model": "gpt-4o", "retrieval": "hybrid-v3"},
)

print(f"Experiment: {results.experiment_name}")
print(f"Mean correctness: {results.get_aggregate_feedback()['qa']:.3f}")

Langfuse Integration (Self-Hosted)

from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from openai import OpenAI

langfuse = Langfuse(
public_key="your-langfuse-public-key",
secret_key="your-langfuse-secret-key",
host="https://your-langfuse.internal.company.com", # self-hosted
)


@observe() # Langfuse equivalent of @traceable
def rag_pipeline_langfuse(user_question: str) -> str:
# Update trace metadata
langfuse_context.update_current_trace(
name="RAG Pipeline",
tags=["production", "v2"],
metadata={"user_segment": "enterprise"},
)

docs = retrieve_with_langfuse(user_question)
return generate_with_langfuse(user_question, docs)


@observe(name="retrieval")
def retrieve_with_langfuse(query: str) -> list[str]:
langfuse_context.update_current_observation(
input={"query": query},
metadata={"index": "production-v3", "top_k": 5},
)
docs = ["Doc 1...", "Doc 2..."]
langfuse_context.update_current_observation(
output={"doc_count": len(docs)},
)
return docs


@observe(name="llm_call", as_type="generation")
def generate_with_langfuse(question: str, docs: list[str]) -> str:
    client = OpenAI()
    context = "\n".join(docs)
    # Define the temperature once so the value sent to the API
    # and the value logged to Langfuse cannot diverge
    temperature = 0.7

    response = client.chat.completions.create(
        model="gpt-4o",
        temperature=temperature,
        messages=[
            {"role": "system", "content": "Answer using only context."},
            {"role": "user", "content": f"Context: {context}\n\nQ: {question}"},
        ],
    )

    content = response.choices[0].message.content

    # Manually log token usage for cost tracking
    langfuse_context.update_current_observation(
        usage={
            "input": response.usage.prompt_tokens,
            "output": response.usage.completion_tokens,
            "unit": "TOKENS",
        },
        model="gpt-4o",
        model_parameters={"temperature": temperature},
    )

    return content

Custom Observability Callbacks

For teams not using LangChain or a specific framework, custom callbacks provide full control:

import time
import structlog
import uuid
from dataclasses import dataclass, field
from typing import Optional, Callable
from openai import OpenAI

log = structlog.get_logger()


@dataclass
class LLMCallRecord:
trace_id: str
span_id: str
parent_span_id: Optional[str]
span_name: str
model: str
prompt_version: str
input_tokens: int
output_tokens: int
latency_ms: float
cost_usd: float
success: bool
error: Optional[str]
metadata: dict = field(default_factory=dict)


class ObservableOpenAI:
"""
Wrapper around OpenAI client that emits structured traces.
Drop-in replacement for OpenAI() with full observability.
"""

# Cost table (USD per 1K tokens)
INPUT_COSTS = {
"gpt-4o": 0.0025,
"gpt-4o-mini": 0.00015,
}
OUTPUT_COSTS = {
"gpt-4o": 0.010,
"gpt-4o-mini": 0.00060,
}

def __init__(
self,
api_key: str,
emit_fn: Callable[[LLMCallRecord], None],
prompt_version: str = "unknown",
):
self.client = OpenAI(api_key=api_key)
self.emit = emit_fn
self.prompt_version = prompt_version

def chat_complete(
self,
messages: list[dict],
model: str = "gpt-4o",
trace_id: Optional[str] = None,
parent_span_id: Optional[str] = None,
span_name: str = "llm_call",
metadata: dict = None,
**kwargs,
) -> str:
trace_id = trace_id or str(uuid.uuid4())
span_id = str(uuid.uuid4())
start = time.monotonic()
error = None
response = None

try:
response = self.client.chat.completions.create(
model=model,
messages=messages,
**kwargs,
)
return response.choices[0].message.content

except Exception as e:
error = str(e)
raise

finally:
latency_ms = (time.monotonic() - start) * 1000

if response and response.usage:
input_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens
cost = (
self.INPUT_COSTS.get(model, 0.01) * input_tokens / 1000
+ self.OUTPUT_COSTS.get(model, 0.03) * output_tokens / 1000
)
else:
input_tokens = output_tokens = 0
cost = 0.0

record = LLMCallRecord(
trace_id=trace_id,
span_id=span_id,
parent_span_id=parent_span_id,
span_name=span_name,
model=model,
prompt_version=self.prompt_version,
input_tokens=input_tokens,
output_tokens=output_tokens,
latency_ms=round(latency_ms, 1),
cost_usd=round(cost, 6),
success=error is None,
error=error,
metadata=metadata or {},
)
self.emit(record)


def emit_to_structlog(record: LLMCallRecord):
log.info(
"llm_call",
trace_id=record.trace_id,
span_id=record.span_id,
span_name=record.span_name,
model=record.model,
prompt_version=record.prompt_version,
input_tokens=record.input_tokens,
output_tokens=record.output_tokens,
latency_ms=record.latency_ms,
cost_usd=record.cost_usd,
success=record.success,
error=record.error,
**record.metadata,
)

Prompt Versioning

The three-week debugging story at the top of this lesson was caused by prompt drift. Prompt versioning solves this by associating every LLM response with the exact prompt version that generated it.

What to version:

  • System prompt text (hash + human-readable label like v2.3-tone-update)
  • Few-shot examples
  • Temperature and other generation parameters
  • Retrieval configuration (index version, chunk size, top-k)
import hashlib
from dataclasses import dataclass


@dataclass
class PromptVersion:
label: str # e.g., "v2.3-tone-update"
system_prompt: str
temperature: float
max_tokens: int
model: str
created_at: str # ISO 8601

@property
def hash(self) -> str:
content = f"{self.system_prompt}{self.temperature}{self.max_tokens}{self.model}"
return hashlib.sha256(content.encode()).hexdigest()[:12]

def as_metadata(self) -> dict:
return {
"prompt_version_label": self.label,
"prompt_version_hash": self.hash,
"model": self.model,
"temperature": self.temperature,
}


# Centralized prompt registry - source of truth for all prompt versions
PROMPT_REGISTRY: dict[str, PromptVersion] = {
"support-v2.3": PromptVersion(
label="support-v2.3",
system_prompt="""You are a helpful customer support assistant for Acme SaaS.
You help with account management, billing, and product features.
Always be concise and friendly. If you cannot help, say so clearly.""",
temperature=0.3,
max_tokens=512,
model="gpt-4o",
created_at="2026-02-15T09:00:00Z",
),
}

ACTIVE_PROMPT = "support-v2.3"


def get_active_prompt() -> PromptVersion:
return PROMPT_REGISTRY[ACTIVE_PROMPT]

When you change the system prompt, bump the version label. Every LLM call logs prompt_version_label and prompt_version_hash. If quality drops, you can filter the trace dashboard by prompt version to identify exactly when the change took effect.
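
As a rough illustration of that filtering step, the sketch below groups logged records by `prompt_version_hash` and averages a quality score per version. The record layout and the `quality` field are assumptions made for the example; a real dashboard would query your trace store instead of a list.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical log records; in practice these come from your trace store
records = [
    {"prompt_version_hash": "a1b2c3", "quality": 0.8},
    {"prompt_version_hash": "a1b2c3", "quality": 0.9},
    {"prompt_version_hash": "d4e5f6", "quality": 0.6},
    {"prompt_version_hash": "d4e5f6", "quality": 0.5},
]


def quality_by_prompt_version(rows: list[dict]) -> dict[str, float]:
    """Average the quality score of all responses generated by each prompt version."""
    grouped: dict[str, list[float]] = defaultdict(list)
    for row in rows:
        grouped[row["prompt_version_hash"]].append(row["quality"])
    return {version: round(mean(scores), 3) for version, scores in grouped.items()}


summary = quality_by_prompt_version(records)
```

A gap between two versions in `summary` points at the prompt change as a candidate cause, which is exactly the correlation the team in the opening story could not make.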

Quality Metrics and LLM-as-Judge

LLM applications need quality metrics beyond latency and error rate. The most scalable approach is LLM-as-judge: use a separate LLM call to evaluate the quality of the primary LLM's response.

from openai import OpenAI
from dataclasses import dataclass


@dataclass
class QualityScore:
relevance: float # 0-1: does the response answer the question?
groundedness: float # 0-1: is the response grounded in context?
coherence: float # 0-1: is the response logically coherent?
conciseness: float # 0-1: is the response appropriately concise?
overall: float # 0-1: weighted average


QUALITY_EVAL_PROMPT = """You are evaluating an AI assistant's response for quality.

User question: {question}
Retrieved context: {context}
AI response: {response}

Rate each dimension from 1-5:
- Relevance: Does the response directly answer the user's question?
- Groundedness: Is the response supported by the provided context?
- Coherence: Is the response logically coherent and well-structured?
- Conciseness: Is the response appropriately concise (not too verbose, not too brief)?

Respond in JSON format:
{{"relevance": N, "groundedness": N, "coherence": N, "conciseness": N, "reasoning": "brief explanation"}}"""


class LLMJudge:
    def __init__(self, client: OpenAI):
        self.client = client

    def evaluate(
        self,
        question: str,
        context: str,
        response: str,
        context_limit: int = 2000,
    ) -> QualityScore:
        import json

        # Truncate long contexts to keep the judge call cheap
        context_truncated = context[:context_limit]

        eval_response = self.client.chat.completions.create(
            # Cheap judge model. It shares a model family with the gpt-4o
            # generator used in this lesson; prefer a different family in production.
            model="gpt-4o-mini",
            max_tokens=300,
            temperature=0,
            response_format={"type": "json_object"},
            messages=[
                {
                    "role": "user",
                    "content": QUALITY_EVAL_PROMPT.format(
                        question=question,
                        context=context_truncated,
                        response=response,
                    ),
                }
            ],
        )

        raw = json.loads(eval_response.choices[0].message.content)

        # Coerce once to float so string scores such as "4" do not break the
        # weighted sum; a missing dimension falls back to the scale midpoint (3)
        dims = ("relevance", "groundedness", "coherence", "conciseness")
        scores = {d: float(raw.get(d, 3)) for d in dims}

        # Normalize 1-5 scale to 0-1
        def norm(x: float) -> float:
            return (x - 1) / 4

        weighted = (
            scores["relevance"] * 0.4
            + scores["groundedness"] * 0.3
            + scores["coherence"] * 0.2
            + scores["conciseness"] * 0.1
        )

        return QualityScore(
            relevance=norm(scores["relevance"]),
            groundedness=norm(scores["groundedness"]),
            coherence=norm(scores["coherence"]),
            conciseness=norm(scores["conciseness"]),
            overall=norm(weighted),
        )

:::note LLM-as-judge bias and calibration

LLM judges have well-documented biases: they prefer longer responses, they prefer responses that match their own style, and they show positional bias (preferring options listed first). Mitigate these by: (1) using a different model family than the one being evaluated - if generating with Claude, evaluate with GPT-4o and vice versa, (2) running evaluations multiple times and averaging, (3) calibrating your judge against human ratings on 100+ examples before trusting it in production.

:::
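
Mitigations (2) and (3) from the note can be sketched with plain Python: average repeated judge runs per example, then measure how well those averages track human ratings. The helper names and the 0.7 cutoff are illustrative assumptions; pick a threshold that fits your task.

```python
from math import sqrt


def averaged_judge_score(runs: list[float]) -> float:
    """Average several judge runs on the same example to reduce run-to-run noise."""
    return sum(runs) / len(runs)


def pearson(xs: list[float], ys: list[float]) -> float:
    """Pearson correlation between judge scores and human ratings."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two equally sized samples with at least 2 points")
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("correlation is undefined for a constant sample")
    return cov / sqrt(var_x * var_y)


def judge_is_calibrated(judge: list[float], human: list[float], min_corr: float = 0.7) -> bool:
    # min_corr is an assumed cutoff, not a universal standard
    return pearson(judge, human) >= min_corr


# Hypothetical calibration set: three judge runs per example, one human rating each
judge_runs = [[0.9, 0.8, 1.0], [0.4, 0.5, 0.3], [0.7, 0.6, 0.8]]
human_ratings = [1.0, 0.25, 0.75]
judge_scores = [averaged_judge_score(runs) for runs in judge_runs]
calibrated = judge_is_calibrated(judge_scores, human_ratings)
```

Pearson correlation is one reasonable agreement measure; a rank correlation is a common alternative when the two rating scales differ in shape.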

Feedback Loop: User Signal to Dataset

User feedback is the most valuable signal for quality. Thumbs up/down, star ratings, and explicit corrections are ground truth that automated evaluators approximate.

import json
import time
import redis.asyncio as aioredis
from dataclasses import dataclass
from typing import Optional


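@dataclass
class UserFeedback:
    trace_id: str
    user_id: str
    # Normalized to 0-1 before storage: thumbs down/up map to 0/1, and star
    # ratings are divided by the scale maximum (the collector treats >= 0.7 as positive)
    rating: float
    feedback_text: Optional[str]
    created_at: float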


class FeedbackCollector:
def __init__(self, redis_client, dataset_threshold: int = 50):
self.redis = redis_client
self.dataset_threshold = dataset_threshold

async def record(
self,
trace_id: str,
user_id: str,
rating: float,
feedback_text: Optional[str] = None,
):
feedback = UserFeedback(
trace_id=trace_id,
user_id=user_id,
rating=rating,
feedback_text=feedback_text,
created_at=time.time(),
)

# Store in Redis stream for real-time processing
await self.redis.xadd(
"feedback_stream",
{
"trace_id": trace_id,
"user_id": user_id,
"rating": str(rating),
"feedback_text": feedback_text or "",
"created_at": str(feedback.created_at),
},
)

# Track aggregate ratings per prompt version
prompt_key = f"quality:{await self._get_prompt_version(trace_id)}"
pipe = self.redis.pipeline()
pipe.incr(f"{prompt_key}:total")
if rating >= 0.7:
pipe.incr(f"{prompt_key}:positive")
await pipe.execute()

async def _get_prompt_version(self, trace_id: str) -> str:
"""Look up the prompt version used for this trace."""
data = await self.redis.get(f"trace_meta:{trace_id}")
if data:
meta = json.loads(data)
return meta.get("prompt_version", "unknown")
return "unknown"

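    async def get_quality_by_prompt_version(self) -> dict[str, float]:
        """Return positive rate per prompt version for dashboard."""
        prefix, suffix = "quality:", ":total"
        results = {}
        # SCAN iterates incrementally; KEYS would block Redis on a large keyspace
        async for key in self.redis.scan_iter(match=f"{prefix}*{suffix}"):
            # Keys arrive as bytes unless the client was created with decode_responses=True
            key = key.decode() if isinstance(key, bytes) else key
            # Slice instead of str.replace so versions containing these substrings survive
            version = key[len(prefix):-len(suffix)]
            total = int(await self.redis.get(key) or 0)
            positive = int(await self.redis.get(f"quality:{version}:positive") or 0)
            if total > 10:  # minimum sample size
                results[version] = positive / total
        return results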

Drift Detection

Model outputs drift when the input distribution changes, the model provider updates the underlying model, or your prompt changes. Detecting drift early prevents silent quality degradation.

Input distribution drift: track the embedding distribution of user queries over time. If the centroid or variance shifts significantly, your input distribution has changed - your RAG index or prompt may need to be updated.
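
A minimal sketch of the centroid comparison, assuming query embeddings are already available as NumPy arrays of shape `(n_queries, dim)`. The function name is hypothetical, and the sketch covers only the centroid shift, not the variance check.

```python
import numpy as np


def centroid_cosine_distance(reference: np.ndarray, current: np.ndarray) -> float:
    """Cosine distance between the mean embedding of two query samples.

    0.0 means the centroids point the same way; larger values mean a bigger shift.
    """
    ref_centroid = reference.mean(axis=0)
    cur_centroid = current.mean(axis=0)
    cosine = np.dot(ref_centroid, cur_centroid) / (
        np.linalg.norm(ref_centroid) * np.linalg.norm(cur_centroid)
    )
    return float(1.0 - cosine)


# Toy 2-dimensional embeddings; real embeddings have hundreds of dimensions
reference = np.array([[1.0, 0.0], [0.9, 0.1]])
same_topic = np.array([[1.0, 0.05], [0.95, 0.0]])
new_topic = np.array([[0.0, 1.0], [0.1, 0.9]])

small_shift = centroid_cosine_distance(reference, same_topic)
large_shift = centroid_cosine_distance(reference, new_topic)
```

What counts as a significant shift depends on the embedding model, so the alert threshold should be set from the distances observed between known-stable periods.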

Output distribution drift: track statistics of generated responses over time: average length, vocabulary richness, sentiment score. A sudden change in any of these signals an output distribution shift.
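
The per-response statistics can be as simple as the sketch below. It uses whitespace tokenization and a type-token ratio as a stand-in for vocabulary richness; both are simplifying assumptions, and sentiment is left out because it needs a separate model.

```python
def response_stats(text: str) -> dict[str, float]:
    """Cheap per-response statistics suitable for tracking over time."""
    tokens = text.lower().split()  # whitespace tokenization, an approximation
    n_tokens = len(tokens)
    return {
        "length_chars": len(text),
        "length_tokens": n_tokens,
        # Unique tokens divided by total tokens: a rough vocabulary-richness signal
        "type_token_ratio": len(set(tokens)) / n_tokens if n_tokens else 0.0,
    }


stats = response_stats("the cat the dog")
```

Each of these values is a scalar, so it can be fed into a two-sample test against a reference window.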

import numpy as np
import structlog
from scipy.stats import ks_2samp
from collections import deque

log = structlog.get_logger()  # used by record_response_metrics below


class DriftDetector:
"""
Detects distribution drift in LLM inputs and outputs.
Uses a sliding window comparison against a reference baseline.
"""

def __init__(self, window_size: int = 1000, p_value_threshold: float = 0.05):
self.window_size = window_size
self.p_value_threshold = p_value_threshold
self.reference_samples: list[float] = [] # baseline metrics
self.current_window: deque = deque(maxlen=window_size)

def add_reference_sample(self, metric_value: float):
"""Add a metric from the reference period (e.g., first 2 weeks in production)."""
self.reference_samples.append(metric_value)

def add_current_sample(self, metric_value: float):
"""Add a metric from the current period."""
self.current_window.append(metric_value)

def detect_drift(self) -> tuple[bool, float]:
"""
Run KS test between reference and current distributions.
Returns (drift_detected, p_value).
"""
if len(self.reference_samples) < 100 or len(self.current_window) < 50:
return False, 1.0

stat, p_value = ks_2samp(
self.reference_samples,
list(self.current_window),
)
drift_detected = p_value < self.p_value_threshold
return drift_detected, p_value


# Track multiple drift signals
response_length_drift = DriftDetector()
quality_score_drift = DriftDetector()


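CHECK_EVERY = 100  # illustrative interval; tune to your traffic volume
_samples_seen = 0


def record_response_metrics(response: str, quality_score: float):
    """Record one response. Both detectors must already hold reference samples
    (see add_reference_sample), otherwise detect_drift never fires."""
    global _samples_seen
    response_length_drift.add_current_sample(len(response))
    quality_score_drift.add_current_sample(quality_score)

    # Run the KS tests only every CHECK_EVERY samples: testing on every call
    # is wasteful and inflates false positives through repeated testing
    _samples_seen += 1
    if _samples_seen % CHECK_EVERY != 0:
        return

    length_drift, length_p = response_length_drift.detect_drift()
    quality_drift, quality_p = quality_score_drift.detect_drift()

    if length_drift:
        log.warning(
            "response_length_drift_detected",
            p_value=round(length_p, 4),
        )

    if quality_drift:
        log.error(
            "quality_score_drift_detected",
            p_value=round(quality_p, 4),
        )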

Cost Attribution

Understanding where your LLM costs come from is essential for optimization and capacity planning.

from dataclasses import dataclass, field
from collections import defaultdict
import time


@dataclass
class CostSummary:
total_usd: float
by_feature: dict[str, float]
by_model: dict[str, float]
by_user_tier: dict[str, float]
period_start: float
period_end: float


class CostAttributor:
"""
Tracks LLM cost by feature, model, and user tier.
Designed to feed a cost dashboard.
"""

def __init__(self, redis_client):
self.redis = redis_client

async def record(
self,
cost_usd: float,
model: str,
feature: str,
user_tier: str,
):
now = time.time()
day_key = int(now // 86400) * 86400 # round to day start

pipe = self.redis.pipeline()
pipe.incrbyfloat(f"cost:total:{day_key}", cost_usd)
pipe.incrbyfloat(f"cost:feature:{feature}:{day_key}", cost_usd)
pipe.incrbyfloat(f"cost:model:{model}:{day_key}", cost_usd)
pipe.incrbyfloat(f"cost:tier:{user_tier}:{day_key}", cost_usd)
# Expire after 90 days
for key in [
f"cost:total:{day_key}",
f"cost:feature:{feature}:{day_key}",
f"cost:model:{model}:{day_key}",
f"cost:tier:{user_tier}:{day_key}",
]:
pipe.expire(key, 90 * 86400)
await pipe.execute()

    async def _sum_by_dimension(self, dimension: str, day_start: int) -> dict[str, float]:
        """Collect cost:{dimension}:{name}:{day_start} counters into {name: cost}."""
        prefix = f"cost:{dimension}:"
        suffix = f":{day_start}"
        totals = {}
        # SCAN iterates incrementally; KEYS would block Redis on a large keyspace
        async for key in self.redis.scan_iter(match=f"{prefix}*{suffix}"):
            # Keys arrive as bytes unless the client was created with decode_responses=True
            key = key.decode() if isinstance(key, bytes) else key
            # Slice rather than split(":") so names containing ":" are preserved
            name = key[len(prefix):-len(suffix)]
            totals[name] = float(await self.redis.get(key) or 0)
        return totals

    async def get_daily_summary(self, day_start: int = None) -> CostSummary:
        if day_start is None:
            day_start = int(time.time() // 86400) * 86400

        total = float(await self.redis.get(f"cost:total:{day_start}") or 0)

        return CostSummary(
            total_usd=total,
            by_feature=await self._sum_by_dimension("feature", day_start),
            by_model=await self._sum_by_dimension("model", day_start),
            by_user_tier=await self._sum_by_dimension("tier", day_start),
            period_start=day_start,
            period_end=day_start + 86400,
        )

Alerting: What to Page On

Not everything that goes wrong with an LLM application warrants an immediate page. Define alert severity based on user impact.

| Signal | Condition | Severity | Action |
|---|---|---|---|
| Error rate | above 5% for 5 minutes | Critical | Page on-call |
| p99 latency | above 15s for 5 minutes | High | Page on-call |
| Quality score | drops 10%+ vs 7-day average | High | Page on-call |
| Cost per hour | 3x above 7-day average | High | Page on-call |
| Guardrail block rate | 10x above baseline | Medium | Slack alert |
| Cache hit rate | drops below 5% | Low | Slack alert |
| Prompt version mismatch | unexpected version in production | Critical | Page on-call |

import asyncio
from typing import Callable


class LLMAlertManager:
def __init__(self, pager_fn: Callable, slack_fn: Callable):
self.page = pager_fn
self.slack = slack_fn

async def check_all(self, metrics: dict):
checks = [
self._check_error_rate(metrics),
self._check_latency(metrics),
self._check_quality(metrics),
self._check_cost(metrics),
]
await asyncio.gather(*checks)

async def _check_error_rate(self, metrics: dict):
error_rate = metrics.get("error_rate_5m", 0)
if error_rate > 0.05:
await self.page(
title="LLM Error Rate Critical",
message=f"Error rate {error_rate:.1%} exceeds 5% threshold",
severity="critical",
)

async def _check_quality(self, metrics: dict):
current = metrics.get("quality_score_1h", 1.0)
baseline = metrics.get("quality_score_7d_avg", 1.0)
drop = (baseline - current) / baseline if baseline > 0 else 0
if drop > 0.10:
await self.page(
title="LLM Quality Degradation",
message=(
f"Quality score dropped {drop:.1%} "
f"(current: {current:.3f}, baseline: {baseline:.3f})"
),
severity="high",
)

async def _check_latency(self, metrics: dict):
p99 = metrics.get("latency_p99_5m_ms", 0)
if p99 > 15000:
await self.page(
title="LLM Latency High",
message=f"p99 latency {p99:.0f}ms exceeds 15s threshold",
severity="high",
)

async def _check_cost(self, metrics: dict):
current_hourly = metrics.get("cost_usd_1h", 0)
baseline_hourly = metrics.get("cost_usd_7d_hourly_avg", 0)
if baseline_hourly > 0 and current_hourly > baseline_hourly * 3:
await self.page(
title="LLM Cost Anomaly",
message=(
f"Hourly cost ${current_hourly:.2f} is "
f"{current_hourly / baseline_hourly:.1f}x above baseline"
),
severity="high",
)

Common Mistakes

:::danger Not logging the prompt version with every response

If you cannot answer "which prompt version generated this response?", you cannot debug quality regressions, roll back prompt changes, or understand the effect of A/B tests. Treat prompt version like software version - it belongs in every trace record.

:::

:::danger Using latency as a proxy for quality

A fast response is not a good response. A slow response is not a bad response. LLM quality is orthogonal to latency. Teams that optimize for latency without measuring quality often accidentally optimize for short, low-effort responses - which are also lower quality. Track both independently.

:::

:::warning Running LLM-as-judge evaluations with the same model family you are evaluating

If you use GPT-4o to evaluate GPT-4o responses, the judge has a systematic preference for its own outputs. Use a different model family for evaluation - Claude evaluating GPT-4o responses, or GPT-4o evaluating Claude responses. The cross-model evaluation is more calibrated.

:::

:::danger No baseline for quality metrics

A quality score of 0.72 is meaningless without a baseline. Is 0.72 good or bad for your task? Establish a baseline during the first two weeks in production (before any prompt changes) and track relative changes. Set alerts on drops below baseline, not on absolute threshold values.

:::

:::warning Sampling all traffic for quality evaluation

Running LLM-as-judge on 100% of traffic adds a second LLM call to every request and can roughly double your LLM costs if the judge is priced like the generator. Use stratified sampling: evaluate 5–10% of traffic by default, oversample low-rated responses (thumbs down, low satisfaction), and evaluate 100% of responses that triggered a guardrail. This gives maximum signal per evaluation dollar.

:::
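
The sampling policy described above could look like the following sketch. The function name, the 50% rate for low-rated responses, and the 0.5 rating cutoff are assumptions for illustration; only the 5–10% default and the 100% guardrail rule come from the text.

```python
import hashlib
from typing import Optional


def should_evaluate(
    trace_id: str,
    user_rating: Optional[float] = None,   # normalized 0-1, None if no feedback
    guardrail_triggered: bool = False,
    base_rate: float = 0.05,               # default sample of ordinary traffic
    low_rated_rate: float = 0.5,           # assumed oversampling rate
) -> bool:
    """Decide whether to run the LLM judge on this response."""
    if guardrail_triggered:
        return True  # always evaluate guardrail hits

    low_rated = user_rating is not None and user_rating < 0.5
    rate = low_rated_rate if low_rated else base_rate

    # Hash the trace ID into [0, 1) so the decision is deterministic per trace
    digest = hashlib.sha256(trace_id.encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return bucket < rate


sampled = sum(should_evaluate(f"trace-{i}") for i in range(10_000))
```

Hashing the trace ID instead of calling a random number generator means a retry or a replayed log line gets the same decision, which keeps evaluation counts reproducible.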

Interview Questions

Q: Why does traditional APM not work for LLM applications, and what do you add?

A: Traditional APM monitors whether your service is available, fast, and error-free. HTTP 200 with no exceptions means success. LLM applications have a fourth dimension: whether the response is good. A 200 response with a hallucinated answer, an off-topic response, or a response that repeats yesterday's cached wrong answer - these are all traditional APM successes and product failures. Traditional APM also has no concept of prompt identity - it cannot correlate a quality change with a specific prompt modification. You extend APM with four additional layers: LLM-specific metrics (token counts, model version, cache hits), quality metrics (groundedness, relevance, user ratings), safety metrics (guardrail block rates, toxicity detection), and prompt provenance (which prompt version generated this response). LangSmith, Langfuse, and Helicone provide the infrastructure for these additional layers.


Q: How do you build a feedback loop from user ratings to automated evaluations?

A: Five-step pipeline. First, collect signal: every LLM response gets a thumbs up/down or star rating in the UI. Store the rating with the trace ID so you can look up the exact prompt, context, and response that generated it. Second, build the dataset: every week, sample 200 conversations stratified by rating (100 low-rated, 100 high-rated). Label them with the actual quality dimensions you care about: relevance, groundedness, tone. Third, run automated evaluation against the dataset: on every deployment, run your LLM-as-judge evaluator against the labeled dataset. Track the correlation between the judge's scores and human ratings - if correlation drops below 0.7, your judge needs recalibration. Fourth, gate deployments on evaluation results: if a new prompt version scores worse on the dataset than the current version, require human approval before deploying. Fifth, grow the dataset over time: add new labeled examples weekly, especially edge cases and failures. The evaluation dataset becomes a regression test suite for LLM quality.
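
The dataset-building step can be sketched as a stratified draw. The conversation layout (a dict with a normalized `rating`) and the 0.5 cutoff between low and high ratings are assumptions for the example.

```python
import random


def stratified_sample(
    conversations: list[dict],
    per_stratum: int = 100,
    threshold: float = 0.5,   # assumed cutoff between low- and high-rated
    seed: int = 0,
) -> list[dict]:
    """Draw up to per_stratum low-rated and per_stratum high-rated conversations."""
    rng = random.Random(seed)  # seeded so the weekly sample is reproducible
    low = [c for c in conversations if c["rating"] < threshold]
    high = [c for c in conversations if c["rating"] >= threshold]
    return (
        rng.sample(low, min(per_stratum, len(low)))
        + rng.sample(high, min(per_stratum, len(high)))
    )


# Hypothetical week of traffic: 300 thumbs-down and 500 thumbs-up conversations
week = [{"trace_id": f"t{i}", "rating": 0.0} for i in range(300)]
week += [{"trace_id": f"t{i}", "rating": 1.0} for i in range(300, 800)]
dataset = stratified_sample(week)
```

Stratifying keeps failures from being drowned out: a uniform sample of mostly satisfied traffic would contain few of the low-rated cases that matter most for regression tests.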


Q: How do you detect that the quality of an LLM application is degrading in production?

A: Multiple overlapping signals. Quantitative: track daily user satisfaction metrics (thumbs up rate, CSAT score) and set an alert if it drops 5% or more versus the 7-day moving average. Track automated LLM-as-judge quality scores on a sample of production traffic and alert on drift. Track response length distribution - sudden shortening of responses often signals a model or prompt change. Qualitative: read 20 random conversations per week. This is cheap and catches patterns that metrics miss. Operational: monitor when prompt versions change, when the LLM provider announces model updates, and when input query distribution shifts. Correlate these events with quality metrics. In the opening story, the three-week debugging could have been resolved in hours with: (1) prompt version logged per response, (2) quality metric dashboard, (3) correlation of quality drop with the deployment three weeks earlier. The deployment was the cause - but without prompt version tracking, there was no way to correlate them.
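
The satisfaction alert from this answer, as a small sketch. It reads "drops 5% or more" as a relative drop against the moving average, which is an interpretation; the function name is hypothetical.

```python
def satisfaction_alert(
    daily_rates: list[float],      # thumbs-up rate per day, oldest first
    today: float,
    max_relative_drop: float = 0.05,
    window: int = 7,
) -> bool:
    """Return True when today's rate falls 5%+ below the trailing moving average."""
    recent = daily_rates[-window:]
    if len(recent) < window:
        return False  # not enough history to form a baseline
    baseline = sum(recent) / len(recent)
    if baseline <= 0:
        return False
    return (baseline - today) / baseline >= max_relative_drop


history = [0.80] * 7
fires = satisfaction_alert(history, today=0.75)   # about a 6% relative drop
quiet = satisfaction_alert(history, today=0.79)   # about a 1% relative drop
```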


Q: Compare LangSmith, Langfuse, and Helicone. When would you choose each?

A: LangSmith if your team uses LangChain heavily and data residency is not a concern. Tracing, evaluation, and prompt management are deeply integrated and work out of the box with minimal configuration. The managed SaaS model means no infrastructure overhead. Langfuse if you need self-hosting - compliance, data residency, or cost. Langfuse is open-source, can be deployed in your own infrastructure, and has comparable features to LangSmith (tracing, evaluation datasets, prompt management). Slightly more setup effort but full control. Helicone if you cannot or do not want to change your application code. Helicone works as a proxy - you change your OpenAI client's base URL to route through Helicone, and you get request logging with zero SDK integration. The limitation is that you can only observe what is visible at the HTTP boundary - you cannot trace internal steps like retrieval or reranking. Arize Phoenix if you already have an ML observability platform built on Arize and want to extend it to LLMs. Best for teams that care about embedding drift, input distribution monitoring, and connecting LLM observability to broader ML model monitoring.


Q: How would you attribute LLM costs to specific features or experiments for a product with ten different LLM-powered features?

A: Three-layer attribution. First, feature tagging: every LLM call includes a feature_id metadata field (e.g., "search", "summarizer", "support-chat"). This is a convention enforced by your LLM gateway or SDK wrapper. Second, experiment tagging: every A/B test assigns an experiment_id to requests in the treatment group. The cost difference between control and treatment groups is the marginal cost of the experiment. Third, user tier tagging: tag requests with the user's tier (free, starter, professional). This reveals which tier is most expensive to serve - often free-tier users consume disproportionate resources because they are exploring, while paid users have more focused usage. Implementation: Redis counters keyed by cost:{feature}:{day}, cost:{experiment}:{day}, cost:{tier}:{day}. Increment on every LLM call completion. Aggregate into a daily cost breakdown dashboard. Alert when any single feature exceeds its allocated budget for the day - this catches runaway feature usage before it becomes a billing incident.
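
The per-feature budget alert can be sketched in memory as follows. The class name and the budget figures are hypothetical, and the sketch tracks a single day; a production version would use the Redis counters described above and reset daily.

```python
from collections import defaultdict


class FeatureBudgetTracker:
    """Accumulates spend per feature_id and flags features that exceed their daily budget."""

    def __init__(self, daily_budgets_usd: dict[str, float]) -> None:
        self.budgets = dict(daily_budgets_usd)
        self.spend: dict[str, float] = defaultdict(float)

    def record(self, feature_id: str, cost_usd: float) -> bool:
        """Add one call's cost; return True if the feature is now over budget."""
        self.spend[feature_id] += cost_usd
        budget = self.budgets.get(feature_id)
        # Features without a configured budget are tracked but never alert
        return budget is not None and self.spend[feature_id] > budget


tracker = FeatureBudgetTracker({"search": 1.00, "support-chat": 5.00})
first = tracker.record("search", 0.60)    # 0.60 of 1.00 spent
second = tracker.record("search", 0.60)   # 1.20 of 1.00 spent
```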


Q: Design a system to detect and alert on prompt regressions automatically.

A: Regression detection pipeline with four components. First, baseline capture: when a prompt version is deployed, run it on a held-out evaluation dataset of 200 labeled examples and store the evaluation scores. This is the baseline for that prompt version. Second, deployment gate: before deploying a new prompt version to production, run it on the same evaluation dataset and compare scores against the current production version. If any quality dimension drops by more than 5%, block the deployment and send a Slack alert with the comparison. Third, production monitoring: even after deployment, run LLM-as-judge on a 10% sample of production traffic daily. Compare the rolling 7-day average against the baseline captured at deployment time. Alert if quality drops beyond a threshold. Fourth, prompt version pinning for rollback: every production response is tagged with its prompt version hash. If a quality regression is detected, the rollback is a one-line configuration change - switch the active prompt version back to the previous hash. The fix is instant and auditable.
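
The deployment gate reduces to a per-dimension comparison. This sketch treats the 5% limit as a relative drop and treats a dimension missing from the candidate as a failure; both are assumptions, and the function name is hypothetical.

```python
def find_regressions(
    baseline: dict[str, float],
    candidate: dict[str, float],
    max_relative_drop: float = 0.05,
) -> dict[str, float]:
    """Return {dimension: relative_drop} for every dimension that regressed too far."""
    failed: dict[str, float] = {}
    for dimension, base_score in baseline.items():
        new_score = candidate.get(dimension)
        if new_score is None:
            failed[dimension] = 1.0  # a dimension that was not evaluated blocks the deploy
            continue
        if base_score <= 0:
            continue  # no meaningful relative drop from a zero baseline
        drop = (base_score - new_score) / base_score
        if drop > max_relative_drop:
            failed[dimension] = round(drop, 4)
    return failed


production = {"relevance": 0.80, "groundedness": 0.90}
new_prompt = {"relevance": 0.70, "groundedness": 0.89}
blocked_on = find_regressions(production, new_prompt)
deploy_allowed = not blocked_on
```

Returning the failing dimensions, not just a boolean, gives the Slack alert something concrete to show in the comparison.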


© 2026 EngineersOfAI. All rights reserved.