Tracing LLM Applications
The Four-Day Investigation
A production AI system. Tuesday afternoon. A senior support engineer opens a ticket from an enterprise customer: "Your AI summary feature gave our legal team wrong information during a client meeting. The summary said the contract expires in 2027. It actually expires in 2025." The engineer escalates to the AI team. The AI team opens their dashboards: Datadog shows HTTP 200, latency 1.2 seconds, no errors. Everything green.
They ask for the customer's session ID. They query the logs. They find the HTTP request with a 200 response body - but the body is the API response envelope, not the raw LLM output. The actual prompt, the raw model output, the exact model version used, the token counts - none of it is in the logs. They can see that a request was made and a response was returned. They cannot see what was asked or what was said.
The investigation takes four days. They eventually reconstruct what happened by cross-referencing timestamps with the customer's screen recording. The model was called with an incomplete document chunk - a retrieval bug - and hallucinated the year. The fix takes two hours. The investigation takes four days. The entire four days could have been eliminated with one thing: a trace record that captured the exact system prompt, the exact user message, the exact model version, the exact output, and the document chunk that was retrieved.
This is the core value proposition of LLM tracing. Not performance monitoring - you already have that. Understanding what your AI said, to whom, and why.
Why Traditional APM Fails for AI
Traditional Application Performance Monitoring tools - Datadog, New Relic, Dynatrace, Sentry - were designed for deterministic service architectures. They answer: is the service up? What is the latency? Are errors spiking? What is the call graph?
These questions matter for LLM applications. But they are insufficient because LLMs introduce a fundamentally new failure mode: semantic incorrectness. An LLM API call can be:
- HTTP 200 ✓
- Latency 900ms (normal) ✓
- Zero exceptions ✓
- No error codes ✓
- And completely, confidently wrong ✗
APM tools have no concept of "the response was factually incorrect." They cannot measure whether the output was helpful. They cannot record the exact prompt that was sent. They cannot tell you which version of your system prompt was active when the bad output occurred. They cannot calculate how much a specific conversation cost, or flag that the model stopped generating because it hit max_tokens rather than naturally finishing.
LLM-specific tracing adds three capabilities that traditional APM does not:
- Semantic recording: capture the exact prompt text (system + user), exact output text, and model version - not just request/response envelopes
- LLM-specific metadata: token counts, cost per call, model temperature, finish reason (`end_turn` vs `max_tokens` matters enormously)
- Quality scoring: attach a quality score to each trace from an LLM judge or user feedback, making quality as queryable as latency
Trace Structure for LLM Applications
A trace in the LLM context is a hierarchical record of one user request, from entry point to final response. It is composed of spans - individual units of work within that request.
Every good trace answers eight diagnostic questions:
- Who made the request? - `user_id`, `session_id`
- What was asked? - exact user message, system prompt version
- What model was called? - exact model name and version string
- What was the response? - verbatim output (stored, not just logged)
- How much did it cost? - token counts, computed cost in USD
- How long did it take? - latency in ms at each pipeline step
- Was it good? - quality score from LLM judge or user feedback
- What was retrieved? - for RAG: which documents, their scores, the query
If you cannot answer all eight questions by querying your trace store, your tracing is incomplete.
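A minimal sketch of that completeness check, assuming each trace can be flattened into a single record. The `TraceRecord` class, its field names, and the question keys are hypothetical and only illustrate the idea; map them to whatever your trace store actually returns.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TraceRecord:
    """Hypothetical flat view of one trace (field names are illustrative)."""
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    user_message: Optional[str] = None
    prompt_version: Optional[str] = None
    model: Optional[str] = None
    output_text: Optional[str] = None
    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None
    cost_usd: Optional[float] = None
    latency_ms: Optional[float] = None
    quality_score: Optional[float] = None
    retrieved_docs: Optional[list] = None  # [] means "retrieval ran, found nothing"


# Each diagnostic question maps to the fields needed to answer it.
QUESTIONS = {
    "who": ("user_id", "session_id"),
    "what_was_asked": ("user_message", "prompt_version"),
    "which_model": ("model",),
    "what_was_the_response": ("output_text",),
    "cost": ("input_tokens", "output_tokens", "cost_usd"),
    "latency": ("latency_ms",),
    "quality": ("quality_score",),
    "retrieval": ("retrieved_docs",),
}


def unanswered_questions(record: TraceRecord) -> list[str]:
    """Return the questions whose required fields are missing (None)."""
    return [
        question
        for question, fields in QUESTIONS.items()
        if any(getattr(record, name) is None for name in fields)
    ]


record = TraceRecord(
    user_id="user-42", session_id="sess-789",
    user_message="When does the contract expire?", prompt_version="v1.2.0",
    model="claude-opus-4-6", output_text="March 14, 2025.",
    input_tokens=1247, output_tokens=183, cost_usd=0.03243, latency_ms=1413.0,
)
print(unanswered_questions(record))  # quality and retrieval are still missing
```

Running a check like this over a sample of stored traces is a cheap way to find instrumentation gaps before an incident does.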
Key Span Attributes for LLM Calls
| Attribute | Description | Example |
|---|---|---|
| `gen_ai.system` | The AI provider | `anthropic` |
| `gen_ai.request.model` | The exact model requested | `claude-opus-4-6` |
| `gen_ai.response.model` | Actual model used (may differ) | `claude-opus-4-6` |
| `gen_ai.request.max_tokens` | Token limit requested | `1024` |
| `gen_ai.request.temperature` | Sampling temperature | `0.3` |
| `gen_ai.usage.input_tokens` | Tokens consumed for input | `1247` |
| `gen_ai.usage.output_tokens` | Tokens in response | `183` |
| `gen_ai.finish_reason` | Why generation stopped | `end_turn`, `max_tokens` |
| `gen_ai.request.system` | System prompt (truncated) | `You are a helpful...` |
| `llm.cost_usd` | Computed cost in USD | `0.032430` |
| `llm.latency_ms` | Time to complete | `1413` |
| `llm.quality_score` | LLM judge score 0-1 | `0.88` |
| `user.id` | User identifier | `user-42` |
| `session.id` | Session identifier | `sess-789` |
| `app.feature` | Which product feature | `document-summary` |
| `app.prompt.version` | Prompt version used | `v1.2.0` |
The OpenTelemetry GenAI Semantic Conventions
OpenTelemetry (OTel) is the CNCF standard for distributed tracing, metrics, and logging. In 2024, the OTel community established GenAI semantic conventions - a standard set of span attributes for LLM calls. This is important because it creates vendor-neutral instrumentation: you instrument once using OTel conventions, and export to any backend (Jaeger, Grafana Tempo, LangSmith, Langfuse, Arize Phoenix, Datadog).
Without standards, every team creates their own attribute names: llm_prompt, gpt_input, claude_request_text. When you switch backends, you have to remap everything. With OTel GenAI conventions, the attribute names are fixed and every compliant backend understands them natively.
The GenAI conventions define:
- `gen_ai.system`: `anthropic` | `openai` | `google_vertexai` | `aws_bedrock` | `cohere`
- `gen_ai.operation.name`: `chat` | `text_completion` | `embeddings`
- `gen_ai.request.*`: model, temperature, max_tokens, top_p, stop_sequences
- `gen_ai.usage.*`: input_tokens, output_tokens, total_tokens
- `gen_ai.response.*`: finish_reasons, model (what the server reports)
:::info Why finish_reason matters in production
When `gen_ai.finish_reason` is `max_tokens`, the model's response was cut off before it finished, often mid-sentence. Truncated responses are incomplete and can lead to confusing or incorrect outputs. Track this rate separately: a high `max_tokens` rate usually means the output budget is too small for the prompt. It looks healthy in operational metrics (HTTP 200, normal latency) but is a quality failure.
:::
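One way to track this rate, sketched under the assumption that you can pull the finish reason of each call out of your trace store as a list of strings. The function names and the 5% alert threshold are illustrative choices, not part of any library.

```python
from collections import Counter


def finish_reason_rates(finish_reasons: list[str]) -> dict[str, float]:
    """Return the fraction of calls that ended with each finish reason."""
    if not finish_reasons:
        return {}
    counts = Counter(finish_reasons)
    total = len(finish_reasons)
    return {reason: count / total for reason, count in counts.items()}


def truncation_alert(finish_reasons: list[str], threshold: float = 0.05) -> bool:
    """True when the share of max_tokens stops exceeds the (assumed) threshold."""
    return finish_reason_rates(finish_reasons).get("max_tokens", 0.0) > threshold


# Ten calls, two of which were cut off by the token limit.
reasons = ["end_turn"] * 8 + ["max_tokens"] * 2
rates = finish_reason_rates(reasons)
print(rates["max_tokens"], truncation_alert(reasons))
```

Grouping the same calculation by feature or prompt version usually points straight at the prompt whose output budget is too small.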
Complete Production Tracer Implementation
Here is a production-grade tracing setup that wraps the Anthropic client and records full traces to any OTLP-compatible backend.
# observability/tracer.py
import anthropic
import time
import os
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Optional, Any
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.trace import Status, StatusCode
# ── Token Cost Table ($/1M tokens) ────────────────────────────────────────────
COST_PER_MTK: dict[str, dict[str, float]] = {
"claude-opus-4-6": {"input": 15.0, "output": 75.0},
"claude-sonnet-4-6": {"input": 3.0, "output": 15.0},
"claude-haiku-4-5-20251001": {"input": 0.80, "output": 4.0},
# Fallback for unknown models
"_default": {"input": 3.0, "output": 15.0},
}
def compute_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""Compute the USD cost of an LLM call given model and token counts."""
pricing = COST_PER_MTK.get(model, COST_PER_MTK["_default"])
return (
input_tokens * pricing["input"] +
output_tokens * pricing["output"]
) / 1_000_000
# ── OTel Setup ─────────────────────────────────────────────────────────────────
def setup_tracer(
service_name: str = "llm-application",
otlp_endpoint: str = "http://localhost:4317",
export_to_console: bool = False,
) -> trace.Tracer:
"""
Set up an OpenTelemetry tracer with OTLP export.
Sends traces to any OTel-compatible backend:
- Jaeger: http://jaeger:4317
- Grafana Tempo: http://tempo:4317
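- Langfuse: https://cloud.langfuse.com/api/public/otel (OTLP over HTTP; use the HTTP exporter instead of this gRPC one)
- Phoenix: http://phoenix:6006/v1/traces (OTLP over HTTP path; use the HTTP exporter)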
- Datadog: http://otel-collector:4317 (via collector)
"""
resource = Resource.create({
"service.name": service_name,
"service.version": os.getenv("APP_VERSION", "0.0.0"),
"deployment.environment": os.getenv("ENVIRONMENT", "development"),
})
provider = TracerProvider(resource=resource)
if export_to_console:
provider.add_span_processor(
BatchSpanProcessor(ConsoleSpanExporter())
)
else:
otlp_exporter = OTLPSpanExporter(
endpoint=otlp_endpoint,
insecure=True, # set False and add TLS certs for production
)
provider.add_span_processor(
BatchSpanProcessor(
otlp_exporter,
max_queue_size=2048, # buffer up to 2048 spans
max_export_batch_size=512, # export in batches of 512
export_timeout_millis=30_000, # 30s export timeout
)
)
trace.set_tracer_provider(provider)
return trace.get_tracer(service_name)
# ── Instrumented Anthropic Client ──────────────────────────────────────────────
class TracedAnthropicClient:
"""
A thin wrapper around the Anthropic client that records every call
as an OpenTelemetry span with full GenAI semantic convention attributes.
Design goals:
- Negligible latency on the critical path (span export is batched in the background)
- Full content capture by default, with a PII-safe mode
- Cost tracking per call
- Quality scoring via LLM judge (optional, async-safe)
"""
def __init__(
self,
tracer: trace.Tracer,
enable_content_capture: bool = True,
content_max_chars: int = 4000,
):
self._client = anthropic.Anthropic()
self._tracer = tracer
self._capture_content = enable_content_capture
self._content_max_chars = content_max_chars
def messages_create(
self,
model: str,
messages: list[dict],
max_tokens: int,
system: Optional[str] = None,
temperature: float = 0.0,
# Application-level trace attributes
operation_name: str = "chat",
span_name: Optional[str] = None,
user_id: Optional[str] = None,
session_id: Optional[str] = None,
feature_name: Optional[str] = None,
prompt_version: Optional[str] = None,
extra_attributes: Optional[dict] = None,
) -> anthropic.types.Message:
"""
Call the Anthropic Messages API with full OTel tracing.
Records all GenAI semantic convention attributes plus application context.
Args:
model: The Anthropic model to call (e.g., "claude-opus-4-6")
messages: The messages array (list of {"role": ..., "content": ...} dicts)
max_tokens: Maximum tokens to generate
system: System prompt text
temperature: Sampling temperature (0.0 = deterministic)
operation_name: OTel GenAI operation name ("chat", "text_completion")
span_name: Override span name (defaults to "anthropic {operation_name}")
user_id: User identifier for tracing
session_id: Session identifier for tracing
feature_name: Product feature name for cost attribution
prompt_version: Version of the prompt template being used
extra_attributes: Any additional key-value attributes
Returns:
anthropic.types.Message - the full API response
"""
effective_span_name = span_name or f"anthropic {operation_name}"
with self._tracer.start_as_current_span(effective_span_name) as span:
# ── GenAI Request Attributes (OTel GenAI semantic conventions) ────
span.set_attribute("gen_ai.system", "anthropic")
span.set_attribute("gen_ai.operation.name", operation_name)
span.set_attribute("gen_ai.request.model", model)
span.set_attribute("gen_ai.request.max_tokens", max_tokens)
span.set_attribute("gen_ai.request.temperature", temperature)
# ── Content capture (skip for PII-sensitive features) ─────────────
if self._capture_content:
if system:
span.set_attribute(
"gen_ai.request.system",
system[:self._content_max_chars]
)
user_msgs = [
m["content"] for m in messages if m.get("role") == "user"
]
if user_msgs:
last_user = str(user_msgs[-1])
span.set_attribute(
"gen_ai.prompt.user",
last_user[:self._content_max_chars]
)
# ── Application-level attributes ───────────────────────────────────
if user_id:
span.set_attribute("user.id", user_id)
if session_id:
span.set_attribute("session.id", session_id)
if feature_name:
span.set_attribute("app.feature", feature_name)
if prompt_version:
span.set_attribute("app.prompt.version", prompt_version)
span.set_attribute(
"app.timestamp",
datetime.now(timezone.utc).isoformat()
)
if extra_attributes:
for key, value in extra_attributes.items():
span.set_attribute(key, str(value))
start_ns = time.monotonic_ns()
try:
# ── The actual API call ────────────────────────────────────────
kwargs: dict[str, Any] = {
"model": model,
"max_tokens": max_tokens,
"messages": messages,
"temperature": temperature,
}
if system:
kwargs["system"] = system
response = self._client.messages.create(**kwargs)
duration_ms = (time.monotonic_ns() - start_ns) / 1_000_000
# ── GenAI Response Attributes ──────────────────────────────────
span.set_attribute("gen_ai.response.model", response.model)
span.set_attribute(
"gen_ai.usage.input_tokens",
response.usage.input_tokens
)
span.set_attribute(
"gen_ai.usage.output_tokens",
response.usage.output_tokens
)
span.set_attribute(
"gen_ai.usage.total_tokens",
response.usage.input_tokens + response.usage.output_tokens,
)
span.set_attribute(
"gen_ai.finish_reason",
str(response.stop_reason)
)
if self._capture_content and response.content:
span.set_attribute(
"gen_ai.response.text",
response.content[0].text[:self._content_max_chars],
)
# ── Cost tracking ──────────────────────────────────────────────
cost = compute_cost(
model,
response.usage.input_tokens,
response.usage.output_tokens,
)
span.set_attribute("llm.cost_usd", round(cost, 8))
span.set_attribute("llm.latency_ms", round(duration_ms, 2))
# Alert on max_tokens truncation
if response.stop_reason == "max_tokens":
span.set_attribute("llm.truncated", True)
span.add_event(
"response_truncated",
{"reason": "hit max_tokens limit", "limit": max_tokens}
)
span.set_status(Status(StatusCode.OK))
return response
except anthropic.APIStatusError as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
span.set_attribute("error.type", type(e).__name__)
span.set_attribute("error.http_status", str(e.status_code))
span.set_attribute("error.message", str(e.message))
raise
except anthropic.APIConnectionError as e:
span.set_status(Status(StatusCode.ERROR, "Connection failed"))
span.record_exception(e)
span.set_attribute("error.type", "APIConnectionError")
raise
def score_response_quality(
self,
span: trace.Span,
user_input: str,
output: str,
criteria: str = "Be accurate, helpful, and concise.",
) -> float:
"""
Attach a quality score to an existing span via LLM-as-judge.
Uses claude-haiku-4-5-20251001 for cost efficiency.
"""
prompt = f"""Rate this AI response from 0.0 to 1.0.
User question: {user_input[:500]}
Quality criteria: {criteria}
AI response: {output[:1000]}
Respond with ONLY a decimal number between 0.0 and 1.0.
Examples: 0.93, 0.41, 0.78"""
try:
judge_response = self._client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=10,
temperature=0.0,
messages=[{"role": "user", "content": prompt}],
)
score = float(judge_response.content[0].text.strip())
score = max(0.0, min(1.0, score)) # clamp to [0, 1]
except Exception:  # float() parse failure, empty content, or API error
score = 0.5 # fallback on parse error
span.set_attribute("llm.quality_score", score)
return score
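As a worked example of the cost arithmetic in `compute_cost`, the sketch below prices one call with 1,247 input and 183 output tokens, then rolls costs up by feature. The prices are copied from the cost table in the tracer; the `calls` records and the `cost_by_feature` helper are hypothetical.

```python
# USD per 1M tokens, copied from the COST_PER_MTK table in the tracer.
PRICES = {
    "claude-opus-4-6": {"input": 15.0, "output": 75.0},
    "claude-sonnet-4-6": {"input": 3.0, "output": 15.0},
}


def cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Same formula as compute_cost: tokens times price, divided by one million."""
    price = PRICES[model]
    return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000


def cost_by_feature(calls: list[dict]) -> dict[str, float]:
    """Sum per-call cost for each product feature (hypothetical record shape)."""
    totals: dict[str, float] = {}
    for call in calls:
        cost = cost_usd(call["model"], call["input_tokens"], call["output_tokens"])
        totals[call["feature"]] = totals.get(call["feature"], 0.0) + cost
    return totals


# Opus:   (1247 * 15 + 183 * 75) / 1e6 = (18705 + 13725) / 1e6 = 0.03243 USD
opus_cost = cost_usd("claude-opus-4-6", 1247, 183)
# Sonnet: (1247 * 3 + 183 * 15) / 1e6 = (3741 + 2745) / 1e6 = 0.006486 USD
sonnet_cost = cost_usd("claude-sonnet-4-6", 1247, 183)

calls = [
    {"feature": "document-summary", "model": "claude-opus-4-6",
     "input_tokens": 1247, "output_tokens": 183},
    {"feature": "rag-qa", "model": "claude-sonnet-4-6",
     "input_tokens": 1247, "output_tokens": 183},
    {"feature": "rag-qa", "model": "claude-sonnet-4-6",
     "input_tokens": 1000, "output_tokens": 100},
]
totals = cost_by_feature(calls)
```

With these prices the same call costs five times more on Opus than on Sonnet, which is why the model name belongs on every span: cost attribution is meaningless without it.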
Tracing a Complete RAG Pipeline
The real power of tracing emerges in multi-step pipelines. Here is a complete, production-grade RAG pipeline with properly nested parent/child spans.
# observability/rag_tracer.py
import anthropic
import time
import uuid
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from .tracer import TracedAnthropicClient, compute_cost, setup_tracer
class RAGPipelineTracer:
"""
Traces a complete RAG pipeline with end-to-end span nesting.
Span hierarchy:
rag-pipeline (root)
├── embed-query
├── vector-retrieval
├── anthropic chat (auto-nested via context propagation)
└── quality-scoring
"""
def __init__(self, tracer: trace.Tracer, vector_store=None):
self.tracer = tracer
self.vector_store = vector_store # your vector DB client
self.llm = TracedAnthropicClient(
tracer=tracer,
enable_content_capture=True,
)
def run(
self,
user_query: str,
user_id: str,
session_id: str,
feature: str = "rag-qa",
) -> dict:
"""
Run a full RAG pipeline with end-to-end tracing.
Returns: dict with response, sources, trace_id, and quality_score.
"""
request_id = str(uuid.uuid4())
with self.tracer.start_as_current_span("rag-pipeline") as root_span:
root_span.set_attribute("user.id", user_id)
root_span.set_attribute("session.id", session_id)
root_span.set_attribute("app.feature", feature)
root_span.set_attribute("pipeline.request_id", request_id)
root_span.set_attribute("pipeline.query", user_query[:500])
try:
# Step 1: Embed query
query_embedding = self._embed_query(user_query)
# Step 2: Retrieve context
retrieved_docs = self._retrieve(user_query, query_embedding)
# Step 3: Generate answer (LLM call is traced automatically)
answer, usage = self._generate(
user_query, retrieved_docs, user_id, session_id
)
# Step 4: Score quality
quality_score = self._score_quality(
user_query, answer, retrieved_docs
)
# Aggregate root span attributes
root_span.set_attribute("pipeline.docs_retrieved", len(retrieved_docs))
root_span.set_attribute("pipeline.response_length", len(answer))
root_span.set_attribute("pipeline.quality_score", quality_score)
root_span.set_attribute("pipeline.total_cost_usd", usage.get("cost", 0))
root_span.set_status(Status(StatusCode.OK))
trace_id = format(
root_span.get_span_context().trace_id, "032x"
)
return {
"response": answer,
"sources": retrieved_docs,
"quality_score": quality_score,
"trace_id": trace_id,
"request_id": request_id,
"usage": usage,
}
except Exception as e:
root_span.set_status(Status(StatusCode.ERROR, str(e)))
root_span.record_exception(e)
raise
def _embed_query(self, query: str) -> list[float]:
"""Embed the user query. Traced as a child span."""
with self.tracer.start_as_current_span("embed-query") as span:
span.set_attribute("gen_ai.operation.name", "embeddings")
span.set_attribute("gen_ai.system", "voyage")
span.set_attribute("embedding.model", "voyage-3")
span.set_attribute("embedding.query_length", len(query))
start = time.monotonic_ns()
# In production: call your embedding API
# embedding = voyage_client.embed(query, model="voyage-3")
time.sleep(0.05) # Simulated latency
embedding = [0.1] * 1024 # Placeholder
duration_ms = (time.monotonic_ns() - start) / 1_000_000
span.set_attribute("embedding.dimensions", len(embedding))
span.set_attribute("embedding.latency_ms", round(duration_ms, 2))
span.set_status(Status(StatusCode.OK))
return embedding
def _retrieve(self, query: str, embedding: list[float]) -> list[str]:
"""Retrieve relevant documents. Traced as a child span."""
with self.tracer.start_as_current_span("vector-retrieval") as span:
span.set_attribute("retrieval.query", query[:500])
span.set_attribute("retrieval.k", 5)
span.set_attribute("retrieval.index", "knowledge-base-v3")
span.set_attribute("db.system", "pinecone")
start = time.monotonic_ns()
# In production: query your vector store
# results = self.vector_store.query(embedding, top_k=5)
time.sleep(0.08) # Simulated latency
docs = [
"Document 1: The contract was executed on March 14, 2023, with a two-year term expiring March 14, 2025.",
"Document 2: Payment terms require NET-30 invoicing from the contract date.",
"Document 3: The renewal clause allows 60-day notice before expiry.",
"Document 4: Governing law is the State of California.",
"Document 5: Termination for convenience requires 30-day notice from either party.",
]
duration_ms = (time.monotonic_ns() - start) / 1_000_000
span.set_attribute("retrieval.docs_found", len(docs))
span.set_attribute("retrieval.latency_ms", round(duration_ms, 2))
span.set_attribute("retrieval.avg_similarity", 0.87)
# Log first doc for debugging (truncated)
if docs:
span.set_attribute("retrieval.first_doc_preview", docs[0][:200])
span.set_status(Status(StatusCode.OK))
return docs
def _generate(
self,
query: str,
docs: list[str],
user_id: str,
session_id: str,
) -> tuple[str, dict]:
"""Generate the answer. The LLM call is traced by TracedAnthropicClient."""
context = "\n\n---\n\n".join(docs)
messages = [{
"role": "user",
"content": (
f"Based only on the following context, answer the question accurately.\n\n"
f"Context:\n{context}\n\n"
f"Question: {query}"
),
}]
response = self.llm.messages_create(
model="claude-opus-4-6",
messages=messages,
max_tokens=1024,
temperature=0.0,
system=(
"You are a precise document analyst. "
"Answer based only on the provided context. "
"If the context doesn't contain the answer, say so explicitly."
),
operation_name="chat",  # standard GenAI operation name; the span is named "anthropic chat"
user_id=user_id,
session_id=session_id,
feature_name="rag-qa",
)
answer = response.content[0].text
usage = {
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
"cost": compute_cost(
"claude-opus-4-6",
response.usage.input_tokens,
response.usage.output_tokens,
),
}
return answer, usage
def _score_quality(
self,
query: str,
answer: str,
context_docs: list[str],
) -> float:
"""Score the answer quality. Traced as a child span."""
with self.tracer.start_as_current_span("quality-scoring") as span:
span.set_attribute("quality.method", "llm-judge")
span.set_attribute("quality.judge_model", "claude-haiku-4-5-20251001")
context_snippet = "\n".join(doc[:200] for doc in context_docs[:3])
judge_prompt = f"""Rate this RAG response from 0.0 to 1.0.
Question: {query}
Context used: {context_snippet}
Response: {answer[:500]}
Rate on:
- Faithfulness (does it stick to the context?)
- Completeness (does it answer the question?)
- Conciseness (no unnecessary padding?)
Output only a decimal number."""
try:
client = anthropic.Anthropic()
judge_resp = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=10,
temperature=0.0,
messages=[{"role": "user", "content": judge_prompt}],
)
score = float(judge_resp.content[0].text.strip())
score = max(0.0, min(1.0, score))
except Exception:  # float() parse failure, empty content, or API error
score = 0.5
span.set_attribute("quality.score", score)
return score
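Both judge calls above parse the score with a bare `float(...)` and fall back to 0.5 on any failure. A possible alternative, sketched here as an assumption rather than as part of the pipeline, is a tolerant parser that returns `None` when no usable score is found, so parse failures can be counted separately instead of blending into the average. The name `parse_judge_score` is hypothetical.

```python
import re
from typing import Optional

# First integer or decimal number appearing anywhere in the judge output.
_NUMBER_RE = re.compile(r"\d+(?:\.\d+)?")


def parse_judge_score(raw: str) -> Optional[float]:
    """Extract a score in [0, 1] from judge output, or None if there is none.

    Limitation of this sketch: it looks only at the first number, so an
    answer like "8/10" is rejected rather than rescaled.
    """
    match = _NUMBER_RE.search(raw)
    if match is None:
        return None
    score = float(match.group())
    return score if 0.0 <= score <= 1.0 else None


print(parse_judge_score("0.93"))
print(parse_judge_score("Score: 0.8."))
print(parse_judge_score("I cannot rate this response."))
```

If you adopt something like this, record the `None` case as its own span attribute (for example a boolean flag) so that a judge that stops following the output format shows up as a metric, not as a cluster of 0.5 scores.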
Async Tracing and Context Propagation
OTel context propagates automatically within asyncio via Python's contextvars mechanism. This is critical for async FastAPI applications.
# observability/async_tracer.py
import asyncio
import anthropic
from opentelemetry import trace, context as otel_context
from opentelemetry.trace import Status, StatusCode
from fastapi import FastAPI, Request
tracer = trace.get_tracer("async-ai-service")
client = anthropic.AsyncAnthropic()
async def traced_parallel_pipeline(
user_query: str,
user_id: str,
) -> dict:
"""
Run query embedding and a user-metadata prefetch concurrently.
OTel context propagates correctly through asyncio.gather().
"""
with tracer.start_as_current_span("parallel-rag") as root:
root.set_attribute("user.id", user_id)
root.set_attribute("input.query", user_query)
async def step_embed():
with tracer.start_as_current_span("embed-query") as s:
await asyncio.sleep(0.05) # simulate embedding API
s.set_attribute("embedding.model", "voyage-3")
s.set_attribute("embedding.dims", 1024)
return [0.1] * 1024
async def step_prefetch_metadata():
with tracer.start_as_current_span("prefetch-user-metadata") as s:
await asyncio.sleep(0.02) # simulate DB lookup
s.set_attribute("db.system", "postgresql")
return {"tier": "enterprise", "locale": "en-US"}
# Both steps run concurrently - spans nest correctly in the trace
embedding, metadata = await asyncio.gather(
step_embed(),
step_prefetch_metadata(),
)
with tracer.start_as_current_span("llm-generation") as gen_span:
gen_span.set_attribute("gen_ai.system", "anthropic")
gen_span.set_attribute("gen_ai.request.model", "claude-opus-4-6")
response = await client.messages.create(
model="claude-opus-4-6",
max_tokens=512,
messages=[{"role": "user", "content": user_query}],
)
answer = response.content[0].text
gen_span.set_attribute("gen_ai.usage.input_tokens", response.usage.input_tokens)
gen_span.set_attribute("gen_ai.usage.output_tokens", response.usage.output_tokens)
gen_span.set_attribute("gen_ai.finish_reason", str(response.stop_reason))
root.set_attribute("pipeline.complete", True)
return {"answer": answer, "user_metadata": metadata}
# ── FastAPI ASGI Middleware ──────────────────────────────────────────────────
class LLMTracingMiddleware:
"""
ASGI middleware that creates a root trace span for each HTTP request
and propagates context into nested LLM calls.
"""
def __init__(self, app, tracer: trace.Tracer):
self.app = app
self.tracer = tracer
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
path = scope.get("path", "")
method = scope.get("method", "GET")
headers = dict(scope.get("headers", []))
with self.tracer.start_as_current_span(f"{method} {path}") as span:
span.set_attribute("http.method", method)
span.set_attribute("http.path", path)
# Extract user context from request headers
user_id = headers.get(b"x-user-id", b"").decode("utf-8", errors="replace")
if user_id:
span.set_attribute("user.id", user_id)
tier = headers.get(b"x-user-tier", b"free").decode("utf-8", errors="replace")
span.set_attribute("user.tier", tier)
await self.app(scope, receive, send)
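The `contextvars` behavior that the async tracing above relies on can be seen with the standard library alone. In this sketch a plain `ContextVar` stands in for OTel's current-span slot (an illustration, not OTel's actual internals): each coroutine passed to `asyncio.gather()` runs as a task with its own copy of the caller's context, so children inherit the parent value and their own changes do not leak back.

```python
import asyncio
import contextvars

# Stand-in for the "current span" slot that a tracer keeps in a ContextVar.
current_span = contextvars.ContextVar("current_span", default="none")


async def child(name: str) -> tuple[str, str]:
    parent = current_span.get()  # inherited from the context that created the task
    current_span.set(name)       # visible only inside this task's context copy
    await asyncio.sleep(0)       # yield so the sibling task interleaves
    return name, parent


async def main() -> tuple[list, str]:
    current_span.set("parallel-rag")
    results = await asyncio.gather(
        child("embed-query"),
        child("prefetch-user-metadata"),
    )
    # The children's set() calls did not overwrite the parent's value.
    return results, current_span.get()


results, after = asyncio.run(main())
print(results, after)
```

Both children report `parallel-rag` as their parent, which is the property that lets concurrent spans attach to the right root without any explicit context passing.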
Streaming Response Tracing
Streaming is more complex to trace because the full response isn't available until the stream ends. Here is the correct pattern:
import anthropic
import time
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
tracer = trace.get_tracer("streaming-tracer")
client = anthropic.Anthropic()
def traced_streaming_call(
user_query: str,
user_id: str,
) -> str:
"""
Trace a streaming LLM response correctly.
Key insight: the span starts BEFORE the stream and ends AFTER
the last token. time_to_first_token is tracked separately as
a span attribute because it is the user-perceived latency.
"""
with tracer.start_as_current_span("anthropic chat (streaming)") as span:
span.set_attribute("gen_ai.system", "anthropic")
span.set_attribute("gen_ai.request.model", "claude-opus-4-6")
span.set_attribute("user.id", user_id)
span.set_attribute("streaming", True)
accumulated_text = []
input_tokens = 0
output_tokens = 0
start_ns = time.monotonic_ns()
time_to_first_token_ms: float | None = None
try:
with client.messages.stream(
model="claude-opus-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": user_query}],
) as stream:
for text in stream.text_stream:
if time_to_first_token_ms is None:
time_to_first_token_ms = (
(time.monotonic_ns() - start_ns) / 1_000_000
)
span.add_event(
"first_token_received",
{"latency_ms": time_to_first_token_ms}
)
accumulated_text.append(text)
# Stream complete - access final message for usage
final_message = stream.get_final_message()
input_tokens = final_message.usage.input_tokens
output_tokens = final_message.usage.output_tokens
total_ms = (time.monotonic_ns() - start_ns) / 1_000_000
full_response = "".join(accumulated_text)
# Record all attributes after stream completes
span.set_attribute("gen_ai.usage.input_tokens", input_tokens)
span.set_attribute("gen_ai.usage.output_tokens", output_tokens)
span.set_attribute("gen_ai.finish_reason", str(final_message.stop_reason))
span.set_attribute("llm.latency_ms", round(total_ms, 2))
span.set_attribute("llm.time_to_first_token_ms", round(time_to_first_token_ms or 0, 2))
span.set_attribute("gen_ai.response.text", full_response[:4000])
span.set_status(Status(StatusCode.OK))
return full_response
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
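The timing logic can be exercised without an API key. In the sketch below, `fake_stream` is a hypothetical stand-in for `stream.text_stream`, and `consume_with_timing` isolates the two measurements the traced call records: time to first token and total duration. The delays are arbitrary.

```python
import time
from typing import Iterable, Iterator, Optional


def fake_stream(chunks: list[str], first_delay_s: float = 0.02,
                chunk_delay_s: float = 0.005) -> Iterator[str]:
    """Simulated token stream: a slower first chunk, then steady chunks."""
    for index, chunk in enumerate(chunks):
        time.sleep(first_delay_s if index == 0 else chunk_delay_s)
        yield chunk


def consume_with_timing(stream: Iterable[str]) -> dict:
    """Drain a stream, recording time to first chunk and total time in ms."""
    start_ns = time.monotonic_ns()
    first_token_ms: Optional[float] = None
    parts: list[str] = []
    for chunk in stream:
        if first_token_ms is None:
            first_token_ms = (time.monotonic_ns() - start_ns) / 1_000_000
        parts.append(chunk)
    total_ms = (time.monotonic_ns() - start_ns) / 1_000_000
    return {
        "text": "".join(parts),
        "time_to_first_token_ms": first_token_ms,  # None if the stream was empty
        "total_ms": total_ms,
    }


result = consume_with_timing(fake_stream(["The contract ", "expires ", "in 2025."]))
print(result["text"])
```

Keeping `time_to_first_token_ms` as `None` for an empty stream, rather than coercing it to 0, avoids recording a misleadingly perfect latency for calls that produced no output.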
Production Engineering Notes
Content Capture and PII
Capturing the full prompt and response in traces creates a PII risk: if users send personal information (names, emails, health data), that information is now stored in your trace store. Mitigations:
- Hash or redact: run a PII detector (Microsoft Presidio, AWS Comprehend) before the tracer captures input
- Restrict access: trace stores contain every prompt your users have sent - treat them with the same access controls as your production database
- Retention policies: auto-delete traces older than 30-90 days
- PII-safe mode: set `enable_content_capture=False` for sensitive features and rely on aggregate metrics only
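To make the "hash or redact" mitigation concrete, here is a deliberately naive sketch that masks emails and phone-like numbers before text reaches a span. The regular expressions and the `redact` function are illustrative assumptions; they will miss names, addresses, and health data, which is why a dedicated detector such as the ones named above is the realistic choice in production.

```python
import re

# Naive patterns for illustration only; real PII detection needs a proper tool.
_EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
_PHONE_RE = re.compile(r"\+?\d[\d\s().-]{7,}\d")


def redact(text: str) -> str:
    """Replace emails and phone-like digit runs with placeholder tokens."""
    text = _EMAIL_RE.sub("[EMAIL]", text)
    return _PHONE_RE.sub("[PHONE]", text)


message = "Contact jane.doe@example.com or +1 415-555-0100 about the contract."
print(redact(message))
```

Redaction has to run before `span.set_attribute(...)` is called with the content; once the span is exported, the raw text is already in the trace store.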
:::warning Truncate, Never Omit
If you must limit the size of prompt content stored in traces for cost or privacy, truncate it - do not omit it entirely. A trace with gen_ai.request.system: "You are a helpful assistant for Acme... [truncated]" is still useful for debugging. A trace with no system prompt field is useless.
:::
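A small sketch of truncation that leaves a visible marker, so a reader of the trace can tell the stored text is incomplete. The helper name and the marker string are assumptions for illustration; the tracer in this article slices with `[:max_chars]` and adds no marker.

```python
def truncate_for_trace(text: str, max_chars: int = 4000,
                       marker: str = "... [truncated]") -> str:
    """Return text unchanged if it fits, else a prefix ending with a marker.

    The result is at most max_chars long as long as max_chars is not
    smaller than the marker itself.
    """
    if len(text) <= max_chars:
        return text
    keep = max(0, max_chars - len(marker))
    return text[:keep] + marker


system_prompt = "You are a helpful assistant for Acme. " * 10
stored = truncate_for_trace(system_prompt, max_chars=60)
print(stored)
```

The marker costs a few characters of budget but removes any ambiguity about whether the model saw a short prompt or the trace stored a shortened one.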
:::danger Trace Stores Contain Sensitive Data
Your trace store contains every prompt your users have sent, verbatim. This is a high-value target for data breaches and an auditor's concern if not properly controlled. Treat the trace store with the same access control, encryption at rest, and retention policies as your production database. Rotate access credentials. Log all queries from engineers. This is not optional for enterprise deployments.
:::
Sampling in High-Volume Systems
At high QPS, recording every trace in full is expensive (storage, network, and in some backends, per-trace fees). Use tail-based sampling:
- Record 100% of traces that had errors, LLM exceptions, or `stop_reason == max_tokens`
- Record 100% of traces for enterprise or premium users
- Record 5-10% of all other traces for baseline quality monitoring
- Never sample below the granularity of individual LLM calls within a trace - either record the whole trace or none of it
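from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import (
    Decision,
    Sampler,
    SamplingResult,
    TraceIdRatioBased,
)

class QualityAwareSampler(Sampler):
    """
    Head-based sampler: always keep enterprise users, keep base_rate of the rest.

    should_sample runs when a span STARTS, so it only sees attributes passed
    at creation time, e.g. start_as_current_span(name, attributes={...}).
    Errors and stop_reason are not known yet; keeping 100% of those traces
    requires tail-based sampling, for example in an OpenTelemetry Collector.
    """

    def __init__(self, base_rate: float = 0.10):
        self.base_rate = base_rate
        # Decides from the trace_id, so every span in a trace gets the same answer
        self._ratio = TraceIdRatioBased(base_rate)

    def should_sample(
        self, parent_context, trace_id, name,
        kind=None, attributes=None, links=None, trace_state=None,
    ):
        attrs = attributes or {}
        # Spans flagged as errors at creation time (by attribute or by name)
        if attrs.get("error") or "error" in name.lower():
            # Pass attributes through so they are kept on the span
            return SamplingResult(Decision.RECORD_AND_SAMPLE, attributes)
        # Always sample enterprise users
        if attrs.get("user.tier") == "enterprise":
            return SamplingResult(Decision.RECORD_AND_SAMPLE, attributes)
        # Sample at base rate for everything else
        return self._ratio.should_sample(
            parent_context, trace_id, name, kind, attributes, links, trace_state
        )

    def get_description(self) -> str:
        return f"QualityAwareSampler(base_rate={self.base_rate})"

provider = TracerProvider(sampler=QualityAwareSampler(base_rate=0.10))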
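The rules listed above depend on information that exists only after a trace has finished (errors, `stop_reason`), which is what makes them tail-based. The sketch below shows that decision as a plain function over completed spans. `FinishedSpan` and `keep_trace` are hypothetical names, and in practice this logic usually lives in a collector or backend rather than in application code.

```python
from dataclasses import dataclass
from typing import Optional

TRACE_ID_LIMIT = (1 << 64) - 1  # compare on the low 64 bits of the trace id


@dataclass
class FinishedSpan:
    """Hypothetical summary of one completed span."""
    name: str
    is_error: bool = False
    finish_reason: Optional[str] = None
    user_tier: Optional[str] = None


def keep_trace(trace_id: int, spans: list[FinishedSpan],
               base_rate: float = 0.10) -> bool:
    """Decide, after the trace is complete, whether to keep ALL of its spans."""
    if any(span.is_error for span in spans):
        return True
    if any(span.finish_reason == "max_tokens" for span in spans):
        return True
    if any(span.user_tier in ("enterprise", "premium") for span in spans):
        return True
    # Deterministic baseline: the same trace id always gets the same answer.
    bound = round(base_rate * (TRACE_ID_LIMIT + 1))
    return (trace_id & TRACE_ID_LIMIT) < bound


truncated = [FinishedSpan("rag-pipeline"),
             FinishedSpan("anthropic chat", finish_reason="max_tokens")]
normal = [FinishedSpan("rag-pipeline"),
          FinishedSpan("anthropic chat", finish_reason="end_turn")]
print(keep_trace(TRACE_ID_LIMIT, truncated), keep_trace(TRACE_ID_LIMIT, normal))
```

Because the decision covers the whole span list at once, it naturally satisfies the last rule above: a trace is either stored in full or dropped in full.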
Async and Background Workers
OTel trace context propagates correctly within asyncio through the contextvars mechanism. For background workers (Celery, ARQ, RQ), you must explicitly extract and inject the trace context:
from celery import Celery
from opentelemetry import context as otel_context, trace
from opentelemetry.propagate import extract, inject

# Assumption: app name is a placeholder; configure broker/backend for your deployment
celery_app = Celery("llm-tasks")
tracer = trace.get_tracer("background-worker")

@celery_app.task
def background_llm_task(payload: dict, otel_headers: dict):
    """Celery task that continues the trace from the originating request."""
    # Restore the OTel context carried in the task arguments
    ctx = extract(otel_headers)
    token = otel_context.attach(ctx)
    try:
        with tracer.start_as_current_span("background-llm-task") as span:
            span.set_attribute("task.type", "background")
            # ... do the LLM work
    finally:
        otel_context.detach(token)  # always detach

def dispatch_background_task(payload: dict):
    """Dispatch a Celery task, propagating the current OTel trace context."""
    headers: dict[str, str] = {}
    inject(headers)  # serializes traceparent + tracestate into headers dict
    background_llm_task.delay(payload=payload, otel_headers=headers)
Comparison: Tracing Backends
| Backend | Type | LLM-Specific Features | Self-Hosting | Best For |
|---|---|---|---|---|
| LangSmith | Managed SaaS | Yes (native) | Enterprise plan only | LangChain teams, evals |
| Langfuse | Open-source | Yes (native) | Yes (free) | Data residency, cost control |
| Phoenix | Open-source | Yes (native, with UMAP embedding views) | Yes (free) | RAG debugging, dev-time |
| Jaeger | Open-source | Via OTel | Yes (free) | General distributed tracing |
| Grafana Tempo | Open-source | Via OTel | Yes (free) | Existing Grafana stack |
| Datadog | Managed SaaS | Via OTel | No | Enterprise APM integration |
| Honeycomb | Managed SaaS | Via OTel | No | High-cardinality queries |
Interview Q&A
Q1: What makes LLM tracing fundamentally different from traditional distributed tracing?
Traditional distributed tracing (Jaeger, Zipkin, Datadog APM) captures request graphs - which service called which, with what latency and what HTTP status code. This is a structural view of system behavior. LLM tracing adds a semantic layer: the exact content of what was asked and answered, in natural language.
This content layer is critical because LLM failures are semantic, not structural. A model can produce HTTP 200, low latency, zero exceptions, and a completely wrong answer. Traditional tracing cannot detect this because it has no concept of semantic correctness.
LLM tracing also captures four categories of metadata that traditional tracing ignores entirely: (1) Token economics - input and output token counts, computed cost per call, and rolling cost attribution by user and feature. (2) Model governance - the exact model version string used (not just the endpoint URL), so you can correlate quality changes with model version changes. (3) Generation parameters - temperature, max_tokens, and stop sequences, because the same prompt at temperature 0.9 versus 0.0 can produce very different outputs. (4) Finish reason - why the model stopped: in Anthropic's API, end_turn means it completed naturally and max_tokens means it was cut off. If 30% of calls finish with max_tokens, either the limit is too low or the prompt invites overly long answers; treat that as a prompt design bug.
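As a rough illustration of the token-economics and finish-reason points, the sketch below computes a per-call cost and the share of calls cut off by max_tokens from recorded trace data. The model name, the prices in `PRICE_PER_MTOK`, and the `LLMCallRecord` type are hypothetical placeholders; substitute your provider's actual rates and your own trace schema.

```python
from collections import Counter
from dataclasses import dataclass

# Hypothetical prices in USD per million tokens; replace with real provider rates.
PRICE_PER_MTOK = {"example-model-v1": {"input": 3.00, "output": 15.00}}


@dataclass
class LLMCallRecord:
    """Minimal per-call metadata a trace would need to capture (illustrative)."""
    model: str
    input_tokens: int
    output_tokens: int
    finish_reason: str  # e.g. "end_turn" or "max_tokens"


def call_cost_usd(record: LLMCallRecord) -> float:
    """Cost of one call: tokens times the per-million price for that model."""
    price = PRICE_PER_MTOK[record.model]
    total = record.input_tokens * price["input"] + record.output_tokens * price["output"]
    return total / 1_000_000


def truncation_rate(records: list[LLMCallRecord]) -> float:
    """Fraction of calls that stopped because they hit the max_tokens limit."""
    if not records:
        return 0.0
    counts = Counter(r.finish_reason for r in records)
    return counts["max_tokens"] / len(records)
```

Grouping the same records by user or feature before summing `call_cost_usd` gives the rolling cost attribution described above.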
Q2: How do you propagate trace context through a multi-step AI pipeline across service boundaries?
OTel uses the W3C TraceContext standard, propagated via HTTP headers (traceparent, tracestate). Within a single Python process, OTel context propagates automatically through synchronous and asyncio code via contextvars - no explicit effort needed. Cross-service propagation requires instrumented HTTP clients.
For HTTP calls between microservices: install opentelemetry-instrumentation-httpx or opentelemetry-instrumentation-requests and call HTTPXClientInstrumentor().instrument() or RequestsInstrumentor().instrument(), respectively. The instrumented client automatically injects traceparent into outgoing requests, and the receiving service's OTel middleware extracts it, creating spans as children of the originating request.
For message queues (Kafka, SQS, Celery) which don't use HTTP headers: manually serialize the context with inject(headers) when producing the message, and deserialize with extract(headers) when consuming. This is a manual step the developer must implement - it's the most common gap in distributed LLM pipeline tracing.
The result: one unified trace tree from the frontend HTTP request through retrieval service → LLM gateway → synthesis service → response, even if those services are separate processes on separate machines.
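To make the propagation step concrete, here is a minimal stdlib-only sketch of what travels in the traceparent header and how a consumer could read it back. It follows the W3C TraceContext layout (version, 32-hex trace ID, 16-hex parent span ID, flags) but is a simplified illustration, not a replacement for OTel's `inject` and `extract`; `build_traceparent` and `parse_traceparent` are hypothetical helper names.

```python
import re

# version - trace-id (32 hex) - parent span id (16 hex) - flags (2 hex)
_TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


def build_traceparent(trace_id: int, span_id: int, sampled: bool = True) -> str:
    """Serialize trace identity into a traceparent header value (version 00)."""
    flags = 0x01 if sampled else 0x00
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"


def parse_traceparent(header: str) -> dict:
    """Parse a traceparent header value; raise ValueError if it is malformed."""
    match = _TRACEPARENT_RE.match(header.strip())
    if match is None:
        raise ValueError(f"malformed traceparent: {header!r}")
    version, trace_id, span_id, flags = match.groups()
    # All-zero IDs are invalid in the W3C format.
    if int(trace_id, 16) == 0 or int(span_id, 16) == 0:
        raise ValueError("traceparent contains an all-zero ID")
    return {
        "version": version,
        "trace_id": int(trace_id, 16),
        "span_id": int(span_id, 16),
        "sampled": bool(int(flags, 16) & 0x01),
    }


# Producer side: attach the header to a queue message (message shape is illustrative).
message = {
    "payload": {"doc_id": "example"},
    "headers": {"traceparent": build_traceparent(0xABC, 0x123)},
}

# Consumer side: recover the trace identity so new spans join the same trace.
parent = parse_traceparent(message["headers"]["traceparent"])
```

Whatever the transport, the consumer needs only this one string to attach its spans to the originating trace.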
Q3: Should you store full prompt content in traces? What are the trade-offs?
Full content storage enables the most valuable debugging capability: "exactly what prompt was sent when this user got a bad response?" Without it, you have aggregate metrics but cannot diagnose individual incidents.
The trade-offs are: (1) Storage cost - prompts can be large (1,000-8,000 tokens), and at millions of traces the storage bill becomes significant. Mitigate with truncation (keep the first 4,000 characters) and data lifecycle policies (delete after 30-90 days). (2) PII risk - users' personal content is stored verbatim in your trace store. In regulated industries, run a PII scrubber before the tracer captures inputs. (3) Compliance - some regulations (GDPR, HIPAA) restrict how long conversational data may be retained. Set your trace retention policy to match your privacy policy. (4) Access control - the trace store contains every conversation. Restrict read access to engineers with a legitimate debugging need, log all queries, and never expose it to third parties.
For most B2B applications with non-sensitive data: store full content, truncate at 4,000 chars, retain for 30 days. For healthcare, legal, or financial applications: implement PII scrubbing at the trace boundary, shorten retention to 7 days, and consider self-hosting your trace backend for full data control.
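A minimal sketch of scrubbing and truncating at the trace boundary might look like the following. The two regular expressions are deliberately simple assumptions that catch only obvious email addresses and phone numbers; a production scrubber would need a dedicated PII detection library and rules for your data. `prepare_for_trace` and the placeholder tokens are hypothetical names.

```python
import re

MAX_TRACE_CHARS = 4_000  # truncation limit suggested above

# Simplistic patterns for illustration only; real PII detection needs far more coverage.
_EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
_PHONE_RE = re.compile(r"\+?\d[\d\s().-]{7,}\d")


def scrub_pii(text: str) -> str:
    """Replace obvious emails and phone numbers with placeholder tokens."""
    text = _EMAIL_RE.sub("[EMAIL]", text)
    return _PHONE_RE.sub("[PHONE]", text)


def prepare_for_trace(text: str, max_chars: int = MAX_TRACE_CHARS) -> str:
    """Scrub first, then truncate, so no partial PII survives at the cut point."""
    scrubbed = scrub_pii(text)
    if len(scrubbed) <= max_chars:
        return scrubbed
    return scrubbed[:max_chars] + "...[truncated]"
```

Calling `prepare_for_trace` on every prompt and completion before it is set as a span attribute keeps raw content out of the trace store entirely.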
Q4: What are the OpenTelemetry GenAI semantic conventions, and why do they matter for the ecosystem?
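The OTel GenAI semantic conventions, introduced in 2024 and still marked as in development at the time of writing, are a standardized set of span attribute names for AI/LLM calls. Key attributes: gen_ai.system (the provider, e.g., anthropic), gen_ai.request.model (the exact model name), gen_ai.usage.input_tokens, gen_ai.usage.output_tokens, and gen_ai.response.finish_reasons. Because the conventions are not yet stable, attribute names can change between releases, so pin the version you instrument against.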
They matter for three reasons: (1) Vendor neutrality - instrument once with standard attribute names, and any OTel-compatible backend (Jaeger, Grafana Tempo, Datadog, Langfuse, Phoenix) understands your LLM spans. Without a standard, switching backends requires remapping all your custom attribute names. (2) Shared tooling - when all tools agree on attribute names, pre-built dashboards, alerts, and queries from the community work out of the box for your application. (3) Cross-vendor correlation - you can run OTel spans from your Anthropic calls and your vector DB calls in the same trace with a common schema, enabling end-to-end latency attribution that spans both systems.
The conventions are now proposed for inclusion in the core OTel specification (under gen_ai.*), which gives Datadog, Honeycomb, and other major backends a strong incentive to build native UI support for these attributes and positions them as the de facto standard for AI observability.
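The vendor-neutrality argument can be sketched as two small helpers: one that builds span attributes with the convention names, and one that remaps older custom names onto them. The gen_ai.* keys follow the published conventions as understood here (the finish reason is recorded as a list under gen_ai.response.finish_reasons); verify them against the version your backend supports. The legacy names in `LEGACY_TO_GENAI` are hypothetical examples of custom attributes.

```python
def genai_span_attributes(
    provider: str,
    model: str,
    input_tokens: int,
    output_tokens: int,
    finish_reason: str,
) -> dict:
    """Build a span attribute dict using GenAI semantic convention names."""
    return {
        "gen_ai.system": provider,
        "gen_ai.request.model": model,
        "gen_ai.usage.input_tokens": input_tokens,
        "gen_ai.usage.output_tokens": output_tokens,
        # The conventions model finish reasons as a list (one per choice).
        "gen_ai.response.finish_reasons": [finish_reason],
    }


# Hypothetical custom names an application might have used before adopting the conventions.
LEGACY_TO_GENAI = {
    "llm.model": "gen_ai.request.model",
    "llm.tokens.in": "gen_ai.usage.input_tokens",
    "llm.tokens.out": "gen_ai.usage.output_tokens",
}


def rename_to_genai(attributes: dict) -> dict:
    """Map legacy attribute names to convention names; leave other keys untouched."""
    return {LEGACY_TO_GENAI.get(key, key): value for key, value in attributes.items()}
```

In an instrumented application, each key-value pair would be passed to `span.set_attribute` on the span wrapping the LLM call.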
Q5: How do you correctly trace a streaming LLM response?
Streaming sends tokens incrementally, so the full response is not available at span-start time. The correct pattern: start the span when the stream begins, accumulate tokens in a buffer, record the time of the first token as a separate attribute (llm.time_to_first_token_ms), and close the span only after the stream fully completes.
The span duration covers the full stream time (time to open the stream + time to last token). This is important: for streaming UIs, time_to_first_token_ms is the user-perceived latency (when the UI starts displaying text), while total latency is when the response is complete. Track both separately.
The get_final_message() method (on Anthropic's streaming context manager) gives you the complete message with token counts after the stream ends - use this to record gen_ai.usage.input_tokens and gen_ai.usage.output_tokens in the span. Never estimate token counts during streaming; wait for the authoritative values from the API.
One common mistake: closing the span before the stream finishes (e.g., returning a generator to the caller and the span closing when the generator leaves scope). The span must outlive the stream - structure your code so the span's context manager wraps the entire stream consumption, not just the stream creation.
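The pattern can be sketched without any tracing library by wrapping the stream in a generator that owns the span's lifetime. Here a plain dict stands in for the span, and `traced_stream`, `llm.total_duration_ms`, `llm.output_text`, and `span.ended` are hypothetical names; in real code the dict writes would become `span.set_attribute` calls and the `finally` block would end the span.

```python
import time
from typing import Callable, Iterable, Iterator


def traced_stream(
    chunks: Iterable[str],
    span_attributes: dict,
    clock: Callable[[], float] = time.perf_counter,
) -> Iterator[str]:
    """Yield chunks to the caller while recording timing on a span stand-in."""
    start = clock()
    first_token_at = None
    buffer = []
    try:
        for chunk in chunks:
            if first_token_at is None:
                # User-perceived latency: when the first text could be displayed.
                first_token_at = clock()
                span_attributes["llm.time_to_first_token_ms"] = (first_token_at - start) * 1000
            buffer.append(chunk)
            yield chunk
    finally:
        # Runs when the stream is exhausted, raises, or the consumer stops early,
        # so the "span" closes only after stream consumption is over.
        span_attributes["llm.total_duration_ms"] = (clock() - start) * 1000
        span_attributes["llm.output_text"] = "".join(buffer)
        span_attributes["span.ended"] = True


# Usage: the caller iterates the wrapper instead of the raw stream.
attributes: dict = {}
for piece in traced_stream(iter(["The contract ", "expires in 2025."]), attributes):
    pass  # forward each piece to the client here
```

Because the generator body does not run until the caller starts iterating, nothing is recorded at creation time, and the span stand-in is closed only in the `finally` block, which is the property the paragraph above asks for.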
