
Production Agent Monitoring

After the Benchmark

Your agent scored 48% on SWE-bench. Your LLM judge scored it 4.1/5. Your human evaluators gave it a thumbs up. You deployed it.

Now what?

Production monitoring answers the questions benchmarks cannot. Benchmarks tell you how the agent performs on a curated, static evaluation set. Production tells you how it performs on the actual, evolving distribution of real user queries - including the ones you never anticipated, the adversarial ones, the edge cases, and the routine ones that happen a thousand times a day.

The gap between benchmark performance and production performance is usually larger than you expect. Users interact with agents differently than researchers design benchmark tasks. Production codebases are messier than benchmark repositories. Real web searches return noisy, contradictory results. Real users ask ambiguous questions and correct the agent mid-session in unexpected ways.

Production monitoring is how you detect this gap, understand it, and close it.



Why Production Monitoring Differs From Benchmark Evaluation

| Dimension | Benchmark Evaluation | Production Monitoring |
| --- | --- | --- |
| Data | Fixed, curated | Continuous, evolving |
| Scale | Hundreds of tasks | Millions of interactions |
| Distribution | Carefully designed | Real user behavior |
| Latency | Not time-sensitive | Real-time |
| Failure detection | Pre-deployment | Post-deployment |
| Cost | Explicit budget | Must minimize overhead |
| Signal type | Quality scores | Behavioral metrics |
| Response | Fix before ship | Fix fast while live |

Benchmarks are gatekeepers. Production monitoring is the ongoing safety net.


Distribution Shift in Production

The most insidious production problem is distribution shift - when users start asking questions your agent was not optimized for. Three common causes:

Organic shift: Users discover what the agent is good at, route other tasks to humans, and over time the production distribution skews toward the easy cases. Your metrics look great. Your agent is only handling the easy problems.

Seasonal shift: An agent that handles financial queries sees very different traffic in January (tax season) versus July. A task-tracking agent sees different queries on Monday (planning) versus Friday (reporting). Metrics computed over the whole year hide within-period failures.

Adversarial shift: Once users learn the agent's limitations, some probe them deliberately. Jailbreaking attempts, out-of-scope queries, and deliberate edge cases create a distribution the agent was never designed for.

Monitoring must detect these shifts, not just track aggregate metrics.
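One way to make shift detection concrete is to compare the mix of task types in a reference window against the current window. The sketch below uses the Population Stability Index (PSI) for that comparison. It assumes every session already carries a task-type label, and the labels, window contents, and function names are hypothetical. A PSI above roughly 0.25 is a common rule of thumb for a significant shift, but treat that cutoff as a starting point rather than a standard.

```python
import math
from collections import Counter


def category_shares(labels, categories, eps=1e-6):
    """Share of each category, floored at eps so the log stays finite."""
    counts = Counter(labels)
    total = max(len(labels), 1)
    return {c: max(counts[c] / total, eps) for c in categories}


def population_stability_index(baseline_labels, current_labels):
    """PSI = sum over categories of (cur - base) * ln(cur / base)."""
    categories = set(baseline_labels) | set(current_labels)
    base = category_shares(baseline_labels, categories)
    cur = category_shares(current_labels, categories)
    return sum((cur[c] - base[c]) * math.log(cur[c] / base[c]) for c in categories)


# Hypothetical task-type labels for a reference window and the current window.
# The current window skews toward the easy "lookup" tasks (organic shift).
baseline_window = ["lookup"] * 60 + ["debug"] * 30 + ["refactor"] * 10
current_window = ["lookup"] * 85 + ["debug"] * 10 + ["refactor"] * 5

psi = population_stability_index(baseline_window, current_window)
print(f"PSI = {psi:.3f}")  # larger values mean the traffic mix moved further
```

Computing this per week (or per day for seasonal traffic) alongside the aggregate metrics shows whether a "great" completion rate is coming from a different, easier mix of tasks.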


Core Production Metrics

1. Task Completion Rate

Definition: Fraction of sessions where the agent successfully accomplished the user's goal.

"Accomplished" is the critical definition. It must be defined precisely for your use case:

  • Did the agent produce a final response? (minimal - almost always true)
  • Did the user take a follow-up action that suggests satisfaction? (implicit signal)
  • Did the agent produce an output that passes a quality gate? (automated check)
  • Did the user explicitly rate it as helpful? (explicit signal)

Each definition captures something different. Use multiple:

# Strong completion: no follow-up correction, no escalation
strong_completion = (
    not session.had_user_correction and
    not session.escalated_to_human and
    session.final_output is not None
)

# Weak completion: any final output produced
weak_completion = session.final_output is not None

Track both. The gap between weak and strong completion reveals the quality of "technically done" sessions.
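To track the gap rather than the two flags, the same definitions can be aggregated over a batch of sessions. This is a minimal sketch: `SessionRecord` is a hypothetical stand-in for whatever session object your system stores, with field names borrowed from the snippet above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SessionRecord:
    """Hypothetical session record; field names mirror the snippet above."""
    final_output: Optional[str]
    had_user_correction: bool = False
    escalated_to_human: bool = False


def completion_rates(sessions):
    """Return weak rate, strong rate, and the gap between them."""
    n = len(sessions)
    if n == 0:
        return {"weak": 0.0, "strong": 0.0, "gap": 0.0}
    weak = sum(s.final_output is not None for s in sessions)
    strong = sum(
        s.final_output is not None
        and not s.had_user_correction
        and not s.escalated_to_human
        for s in sessions
    )
    return {"weak": weak / n, "strong": strong / n, "gap": (weak - strong) / n}


# Ten illustrative sessions: 9 produced output, 2 were corrected, 1 escalated.
sessions = (
    [SessionRecord("ok")] * 6
    + [SessionRecord("ok", had_user_correction=True)] * 2
    + [SessionRecord("ok", escalated_to_human=True)]
    + [SessionRecord(None)]
)
rates = completion_rates(sessions)
print(rates)
```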

2. User Satisfaction

Two signals, both imperfect:

Explicit: Thumbs up/down, star ratings, "Was this helpful?" Yes/No. Problems: low response rate (1–5% of sessions), responders are not representative (extreme experiences dominate), and users often click quickly without careful thought.

Implicit: Much richer signal, no user action required.

  • User accepted the output without correction → positive signal
  • User immediately issued a correction → negative signal
  • User abandoned the session before completion → negative signal
  • User asked a follow-up question elaborating on the response → positive signal (engaged)
  • User copied and used the output → strong positive signal
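The implicit signals above can be folded into a single per-session score. The sketch below is one possible heuristic, not a validated model: the event names and weights are assumptions chosen only to reflect the ordering in the list (copying the output is the strongest positive, abandonment and immediate correction are negative).

```python
# Assumed weights; calibrate against explicit ratings before relying on them.
IMPLICIT_SIGNAL_WEIGHTS = {
    "accepted_without_correction": 1.0,
    "follow_up_question": 0.5,
    "copied_output": 2.0,
    "immediate_correction": -1.5,
    "abandoned": -2.0,
}


def implicit_satisfaction(events):
    """Score a session from its implicit events; each event type counts once."""
    score = sum((IMPLICIT_SIGNAL_WEIGHTS.get(e, 0.0) for e in set(events)), 0.0)
    if score > 0:
        label = "positive"
    elif score < 0:
        label = "negative"
    else:
        label = "neutral"
    return score, label


print(implicit_satisfaction(["accepted_without_correction", "copied_output"]))
print(implicit_satisfaction(["immediate_correction", "follow_up_question"]))
```

Because explicit ratings are sparse, a reasonable check is to compare this score against the thumbs up/down ratings you do receive and adjust the weights until they agree.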

3. Cost Per Successful Task

$$
\text{cost\_per\_success} = \frac{\text{total\_tokens\_cost} + \text{tool\_api\_costs} + \text{compute\_cost}}{\text{successful\_tasks}}
$$

Track this weekly. An upward trend usually means one of three things: the agent is taking more steps per task (an efficiency regression), token prices changed, or the task distribution shifted toward harder tasks. Distinguish these by also tracking:

  • Average steps per session
  • Average tokens per session
  • Average tool calls per session
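As a worked example of the formula and the supporting averages, the sketch below compares two weeks. All figures are invented for illustration, and the dictionary keys are hypothetical names rather than fields from a real billing export.

```python
def cost_per_success(token_cost, tool_api_cost, compute_cost, successful_tasks):
    """Total spend divided by successful tasks; infinite if nothing succeeded."""
    if successful_tasks == 0:
        return float("inf")
    return (token_cost + tool_api_cost + compute_cost) / successful_tasks


def weekly_summary(week):
    """Cost per success plus the averages that help explain a change in it."""
    return {
        "cost_per_success": cost_per_success(
            week["token_cost"], week["tool_api_cost"],
            week["compute_cost"], week["successful_tasks"],
        ),
        "steps_per_session": week["steps"] / week["sessions"],
        "tokens_per_session": week["tokens"] / week["sessions"],
    }


# Invented numbers: same traffic and success count, but more steps and tokens.
last_week = {"token_cost": 420.0, "tool_api_cost": 60.0, "compute_cost": 20.0,
             "successful_tasks": 1000, "sessions": 1250,
             "steps": 7500, "tokens": 50_000_000}
this_week = {"token_cost": 540.0, "tool_api_cost": 70.0, "compute_cost": 20.0,
             "successful_tasks": 1000, "sessions": 1250,
             "steps": 10000, "tokens": 64_000_000}

before, after = weekly_summary(last_week), weekly_summary(this_week)
print(before)
print(after)
```

Here cost per success rises from 0.50 to 0.63 while steps per session rise from 6 to 8 with unchanged traffic, which points at an efficiency regression rather than a price change.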

4. Latency

Track at percentile level, not mean:

  • p50 (median): The typical user experience
  • p95: The latency that 95% of sessions finish within; the slowest 5% are worse
  • p99: The tail - often 5-10× the median for agents due to long-tail trajectories

For agents, latency is a product decision: p95 of 30 seconds might be fine for a research assistant, catastrophic for a customer support agent.
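To see why the mean misleads, here is a small sketch using the standard library's `statistics.quantiles`. The latency values are made up to mimic a long-tailed agent workload, and `latency_percentiles` is a hypothetical helper name.

```python
from statistics import mean, quantiles


def latency_percentiles(latencies_ms):
    """Return p50/p95/p99 using 99 cut points over the observed data."""
    cuts = quantiles(latencies_ms, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


# Made-up sample: most sessions are fast, a few long trajectories are very slow.
latencies_ms = [1000] * 90 + [5000] * 8 + [30000] * 2

p = latency_percentiles(latencies_ms)
print(f"mean={mean(latencies_ms):.0f}ms  p50={p['p50']:.0f}ms  "
      f"p95={p['p95']:.0f}ms  p99={p['p99']:.0f}ms")
```

In this sample the mean (1900 ms) matches neither the typical session (1000 ms) nor the tail (30000 ms), which is why the percentiles are reported separately.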

5. Error Rate by Type

Distinguish error types - they have different causes and different fixes:

| Error Type | Definition | Typical Cause |
| --- | --- | --- |
| Tool failure | Tool call returned error | API issues, bad input generation |
| LLM refusal | Model declined to proceed | Safety filter, prompt issue |
| Timeout | Session exceeded time limit | Infinite loop, slow tools |
| User abandonment | User left before completion | Poor experience, off-topic |
| Hallucination caught | Output verified as incorrect | Knowledge gap, context overflow |
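A sketch of how this taxonomy might be applied to session records follows. The session field names (`timed_out`, `tool_errors`, `refused`, `failed_verification`, `abandoned`) and the precedence order are assumptions for illustration; a session that hits several conditions is counted under the first one that matches.

```python
from collections import Counter
from enum import Enum
from typing import Optional


class ErrorType(Enum):
    TOOL_FAILURE = "tool_failure"
    LLM_REFUSAL = "llm_refusal"
    TIMEOUT = "timeout"
    USER_ABANDONMENT = "user_abandonment"
    HALLUCINATION_CAUGHT = "hallucination_caught"


def classify_session(session: dict) -> Optional[ErrorType]:
    """Map a session record to one error type, or None if it has no error."""
    if session.get("timed_out"):
        return ErrorType.TIMEOUT
    if session.get("tool_errors", 0) > 0:
        return ErrorType.TOOL_FAILURE
    if session.get("refused"):
        return ErrorType.LLM_REFUSAL
    if session.get("failed_verification"):
        return ErrorType.HALLUCINATION_CAUGHT
    if session.get("abandoned"):
        return ErrorType.USER_ABANDONMENT
    return None


# Hypothetical session records.
sessions = [
    {"tool_errors": 2},
    {"timed_out": True, "tool_errors": 1},  # counted as a timeout
    {"refused": True},
    {"abandoned": True},
    {},                                     # healthy session
]
labels = [classify_session(s) for s in sessions]
error_counts = Counter(label.value for label in labels if label is not None)
error_rate = sum(error_counts.values()) / len(sessions)
print(dict(error_counts), f"error_rate={error_rate:.0%}")
```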

Distributed Tracing

Every production agent interaction should be recorded as a trace - the complete record of what happened, in what order, with what outcomes. Traces are the raw material from which all production insights flow.

Sampling Strategy

You cannot store and review every trace - it is too expensive. Instead:

  • 100% of failures: Every session that errored, timed out, or was explicitly rated negative
  • 100% of safety-flagged sessions: Any session that triggered a safety classifier
  • 10% of successes: Random sample for continuous quality monitoring
  • 100% of first sessions per new user: Understand onboarding quality
  • Stratified sample by task type: Ensure all task categories are represented

This gives you comprehensive coverage of failures and representative coverage of successes at roughly 10-20% of full-trace storage cost. The exact figure depends on your failure rate, because every failure is stored.
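The tiers above can be expressed as a single storage decision. The sketch below is one possible shape, not a reference implementation: the trace field names and the `success_rates` override are hypothetical. It hashes the trace ID with `zlib.crc32` so the decision is the same in every process, and it lets rare task types use a higher sampling rate as a simple form of stratification.

```python
import zlib
from typing import Optional


def keep_fraction(key: str, fraction: float) -> bool:
    """Deterministically keep roughly `fraction` of keys (stable across runs)."""
    bucket = zlib.crc32(key.encode("utf-8")) % 10_000
    return bucket < fraction * 10_000


def should_store(trace: dict, default_rate: float = 0.10,
                 success_rates: Optional[dict] = None) -> tuple[bool, str]:
    """Return (store?, reason). Field names are assumptions for this sketch."""
    if trace.get("errored") or trace.get("timed_out") or trace.get("rated_negative"):
        return True, "failure"
    if trace.get("safety_flagged"):
        return True, "safety"
    if trace.get("is_first_session"):
        return True, "first_session"
    # Successes: sample per task type so rare categories can be kept more often.
    rate = (success_rates or {}).get(trace.get("task_type"), default_rate)
    if keep_fraction(trace["trace_id"], rate):
        return True, "success_sample"
    return False, "dropped"


print(should_store({"trace_id": "t-1", "errored": True}))
print(should_store({"trace_id": "t-2", "task_type": "rare_task"},
                   success_rates={"rare_task": 1.0}))
```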


Full Python: Production Monitoring System

"""
Production monitoring system for agents.
Includes: lightweight OpenTelemetry-style tracing, metric aggregation,
statistical anomaly detection, and alerting.
"""

import json
import os
import time
import uuid
import zlib
from collections import defaultdict, deque
from dataclasses import dataclass, field
from statistics import mean, stdev
from typing import Any, Optional

import anthropic


# ── Trace data models ──────────────────────────────────────────────────────────

@dataclass
class Span:
    """One step in a production trace."""
    span_id: str
    trace_id: str
    parent_span_id: Optional[str]
    operation_name: str
    start_time: float
    end_time: Optional[float] = None

    # Span attributes
    attributes: dict = field(default_factory=dict)
    events: list[dict] = field(default_factory=list)
    status: str = "OK"  # "OK", "ERROR"
    error_message: Optional[str] = None

    @property
    def duration_ms(self) -> float:
        if self.end_time is not None:
            return (self.end_time - self.start_time) * 1000
        return 0.0

    def set_attribute(self, key: str, value: Any):
        self.attributes[key] = value

    def add_event(self, name: str, attributes: Optional[dict] = None):
        self.events.append({
            "name": name,
            "timestamp": time.time(),
            "attributes": attributes or {},
        })

    def end(self, status: str = "OK", error: Optional[str] = None):
        self.end_time = time.time()
        self.status = status
        if error:
            self.error_message = error
            self.status = "ERROR"


@dataclass
class Trace:
    """Complete record of one production agent session."""
    trace_id: str
    session_id: str
    user_id: Optional[str]
    start_time: float
    spans: list[Span] = field(default_factory=list)
    end_time: Optional[float] = None

    # Outcome
    final_output: Optional[str] = None
    completed: bool = False
    user_rated_positive: Optional[bool] = None  # Explicit thumbs up/down
    user_corrected: bool = False  # User issued correction
    escalated_to_human: bool = False

    # Tags for filtering
    task_type: Optional[str] = None
    user_segment: Optional[str] = None
    model_version: str = "unknown"
    agent_version: str = "unknown"

    def add_span(self, span: Span):
        self.spans.append(span)

    def complete(self, final_output: str):
        self.final_output = final_output
        self.completed = True
        self.end_time = time.time()

    @property
    def duration_ms(self) -> float:
        if self.end_time is not None:
            return (self.end_time - self.start_time) * 1000
        return 0.0

    @property
    def total_input_tokens(self) -> int:
        return sum(
            s.attributes.get("input_tokens", 0)
            for s in self.spans
        )

    @property
    def total_output_tokens(self) -> int:
        return sum(
            s.attributes.get("output_tokens", 0)
            for s in self.spans
        )

    @property
    def tool_call_count(self) -> int:
        return sum(1 for s in self.spans if s.operation_name == "tool_call")

    @property
    def error_count(self) -> int:
        return sum(1 for s in self.spans if s.status == "ERROR")

    @property
    def estimated_cost_usd(self) -> float:
        # Illustrative per-million-token prices (USD). Replace these with the
        # actual rates for the model you deploy.
        input_cost = (self.total_input_tokens / 1_000_000) * 3.0
        output_cost = (self.total_output_tokens / 1_000_000) * 15.0
        return input_cost + output_cost

    def is_strong_completion(self) -> bool:
        return (
            self.completed and
            not self.user_corrected and
            not self.escalated_to_human and
            self.user_rated_positive is not False
        )

    def should_sample(self) -> bool:
        """Determine if this trace should be stored for review."""
        if not self.completed:
            return True  # All failures
        if self.error_count > 0:
            return True  # Any errors
        if self.user_rated_positive is False:
            return True  # Explicitly negative rating
        if self.escalated_to_human:
            return True  # Escalations
        # ~10% sample of successes. zlib.crc32 is stable across processes,
        # unlike the built-in hash(), which is salted per interpreter run
        # for strings and would sample inconsistently between workers.
        return zlib.crc32(self.trace_id.encode("utf-8")) % 10 == 0


# ── Tracer ─────────────────────────────────────────────────────────────────────

class AgentTracer:
    """
    Lightweight tracer for production agent instrumentation.
    Wraps your agent with trace recording.
    """

    def __init__(
        self,
        agent_version: str,
        model_version: str,
        exporter=None,
    ):
        self.agent_version = agent_version
        self.model_version = model_version
        self.exporter = exporter or InMemoryExporter()
        self._current_trace: Optional[Trace] = None

    def start_trace(
        self,
        session_id: str,
        user_id: Optional[str] = None,
        task_type: Optional[str] = None,
        user_segment: Optional[str] = None,
    ) -> Trace:
        trace = Trace(
            trace_id=str(uuid.uuid4()),
            session_id=session_id,
            user_id=user_id,
            start_time=time.time(),
            task_type=task_type,
            user_segment=user_segment,
            model_version=self.model_version,
            agent_version=self.agent_version,
        )
        self._current_trace = trace
        return trace

    def start_span(
        self,
        operation_name: str,
        parent_span_id: Optional[str] = None,
        attributes: Optional[dict] = None,
    ) -> Span:
        span = Span(
            span_id=str(uuid.uuid4())[:8],
            trace_id=self._current_trace.trace_id if self._current_trace else "unknown",
            parent_span_id=parent_span_id,
            operation_name=operation_name,
            start_time=time.time(),
            attributes=attributes or {},
        )
        if self._current_trace:
            self._current_trace.add_span(span)
        return span

    def end_trace(self, trace: Trace, final_output: Optional[str]):
        if final_output:
            trace.complete(final_output)
        else:
            # No usable output: record the end time but leave completed=False
            trace.end_time = time.time()

        if trace.should_sample():
            self.exporter.export(trace)

    def instrument_agent(
        self, query: str, session_id: Optional[str] = None
    ) -> tuple[Optional[str], Trace]:
        """
        Run the Claude agent with full tracing instrumentation.
        Returns (final_output, trace).
        """
        if not session_id:
            session_id = str(uuid.uuid4())[:8]

        trace = self.start_trace(session_id=session_id)
        client = anthropic.Anthropic()

        tools = [
            {
                "name": "web_search",
                "description": "Search the web.",
                "input_schema": {
                    "type": "object",
                    "properties": {"query": {"type": "string"}},
                    "required": ["query"],
                },
            }
        ]

        messages = [{"role": "user", "content": query}]
        system = "You are a helpful assistant."
        final_output = None

        # Cap the loop at 15 model calls so a stuck agent cannot run forever
        for _ in range(15):
            # LLM span
            llm_span = self.start_span("llm_call")

            try:
                response = client.messages.create(
                    model="claude-opus-4-6",
                    max_tokens=2048,
                    system=system,
                    tools=tools,
                    messages=messages,
                )
                llm_span.set_attribute("input_tokens", response.usage.input_tokens)
                llm_span.set_attribute("output_tokens", response.usage.output_tokens)
                llm_span.set_attribute("stop_reason", response.stop_reason)
                llm_span.set_attribute("model", response.model)
                llm_span.end()
            except Exception as e:
                llm_span.end(status="ERROR", error=str(e))
                break

            text = next((b.text for b in response.content if hasattr(b, "text")), "")

            if response.stop_reason == "end_turn":
                final_output = text
                break

            if response.stop_reason == "tool_use":
                messages.append({"role": "assistant", "content": response.content})
                tool_results = []

                for block in response.content:
                    if block.type == "tool_use":
                        tool_span = self.start_span(
                            "tool_call",
                            attributes={
                                "tool_name": block.name,
                                "tool_input": json.dumps(block.input)[:500],
                            }
                        )

                        try:
                            # Mock execution: swap in the real tool call here
                            result = f"[Mock result for {block.name}]"
                            tool_span.set_attribute("tool_output_length", len(result))
                            tool_span.end()
                        except Exception as e:
                            tool_span.end(status="ERROR", error=str(e))
                            result = f"Error: {e}"

                        tool_results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": result,
                        })

                messages.append({"role": "user", "content": tool_results})
            else:
                # Any other stop reason (e.g. "max_tokens") would otherwise
                # resend the identical request on every remaining iteration.
                break

        self.end_trace(trace, final_output)
        return final_output, trace


# ── Metric aggregation ─────────────────────────────────────────────────────────

@dataclass
class MetricWindow:
    """Rolling window over the most recent `window_size` values."""
    window_size: int  # number of data points
    values: deque = field(default_factory=deque)

    def add(self, value: float):
        self.values.append(value)
        if len(self.values) > self.window_size:
            self.values.popleft()

    def mean(self) -> float:
        # Inside the method body, `mean` resolves to statistics.mean
        return mean(self.values) if self.values else 0.0

    def std(self) -> float:
        return stdev(self.values) if len(self.values) > 1 else 0.0

    def percentile(self, p: float) -> float:
        """Approximate nearest-rank percentile; p is 0-100."""
        if not self.values:
            return 0.0
        sorted_vals = sorted(self.values)
        idx = int(len(sorted_vals) * p / 100)
        return sorted_vals[min(idx, len(sorted_vals) - 1)]


class MetricsAggregator:
    """
    Aggregates production trace data into metrics windows.
    Maintains rolling windows for anomaly detection.
    """

    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self.completion_window = MetricWindow(window_size)
        self.strong_completion_window = MetricWindow(window_size)
        self.latency_window = MetricWindow(window_size)
        self.cost_window = MetricWindow(window_size)
        self.error_rate_window = MetricWindow(window_size)
        self.tool_calls_window = MetricWindow(window_size)

        # Slice tracking: by task_type, model_version, user_segment
        self.slice_metrics: dict[str, dict[str, MetricWindow]] = defaultdict(
            lambda: {
                "completion": MetricWindow(200),
                "latency": MetricWindow(200),
                "cost": MetricWindow(200),
            }
        )

        self.total_traces = 0

    def record_trace(self, trace: Trace):
        """Record a completed trace into all metric windows."""
        self.total_traces += 1

        self.completion_window.add(1.0 if trace.completed else 0.0)
        self.strong_completion_window.add(1.0 if trace.is_strong_completion() else 0.0)
        self.latency_window.add(trace.duration_ms)
        self.cost_window.add(trace.estimated_cost_usd)
        self.tool_calls_window.add(trace.tool_call_count)

        # Error rate: fraction of tool calls that errored. Count only
        # tool_call spans; trace.error_count also includes failed LLM calls,
        # which would push this ratio above 1.
        if trace.tool_call_count > 0:
            tool_errors = sum(
                1 for s in trace.spans
                if s.operation_name == "tool_call" and s.status == "ERROR"
            )
            self.error_rate_window.add(tool_errors / trace.tool_call_count)

        # Slice metrics. Keys are prefixed with the dimension name so equal
        # values in different dimensions (e.g. "unknown") do not collide.
        slices = {
            "task_type": trace.task_type,
            "model_version": trace.model_version,
            "user_segment": trace.user_segment,
        }
        for dimension, value in slices.items():
            if value:
                slice_key = f"{dimension}={value}"
                self.slice_metrics[slice_key]["completion"].add(
                    1.0 if trace.completed else 0.0
                )
                self.slice_metrics[slice_key]["latency"].add(trace.duration_ms)
                self.slice_metrics[slice_key]["cost"].add(trace.estimated_cost_usd)

    def current_metrics(self) -> dict:
        return {
            "total_traces": self.total_traces,
            "completion_rate": round(self.completion_window.mean(), 3),
            "strong_completion_rate": round(self.strong_completion_window.mean(), 3),
            "latency_p50_ms": round(self.latency_window.percentile(50), 1),
            "latency_p95_ms": round(self.latency_window.percentile(95), 1),
            "latency_p99_ms": round(self.latency_window.percentile(99), 1),
            "avg_cost_usd": round(self.cost_window.mean(), 5),
            "error_rate": round(self.error_rate_window.mean(), 3),
            "avg_tool_calls": round(self.tool_calls_window.mean(), 1),
        }

    def slice_report(self) -> dict:
        report = {}
        for slice_key, metrics in self.slice_metrics.items():
            report[slice_key] = {
                "completion_rate": round(metrics["completion"].mean(), 3),
                "latency_p50_ms": round(metrics["latency"].percentile(50), 1),
                "avg_cost_usd": round(metrics["cost"].mean(), 5),
                "sample_count": len(metrics["completion"].values),
            }
        return report


# ── Statistical anomaly detection ──────────────────────────────────────────────

@dataclass
class Alert:
    alert_id: str
    severity: str  # "critical", "warning"
    metric: str
    message: str
    current_value: float
    baseline_value: float
    threshold: float
    timestamp: float


class AnomalyDetector:
    """
    Simplified statistical process control for agent behavior metrics.
    Smooths each metric with an EWMA (Exponentially Weighted Moving Average)
    and compares the smoothed value against a fixed baseline and thresholds.
    """

    EWMA_LAMBDA = 0.2  # Smoothing factor. Higher = more weight to recent data.

    # Convention: "drop" thresholds are absolute drops from baseline.
    # "increase" thresholds above 1 are multiples of baseline; values of 1 or
    # less are absolute levels. List critical before warning for each metric.
    ALERT_THRESHOLDS = {
        "completion_rate": {
            "critical_drop": 0.10,  # Alert if drops by more than 10 percentage points
            "warning_drop": 0.05,   # Warn if drops by more than 5 percentage points
        },
        "strong_completion_rate": {
            "critical_drop": 0.10,
            "warning_drop": 0.05,
        },
        "latency_p95_ms": {
            "critical_increase": 2.0,  # Alert if 2× baseline
            "warning_increase": 1.5,   # Warn if 1.5× baseline
        },
        "error_rate": {
            "critical_increase": 0.10,  # Alert if error rate > 10%
            "warning_increase": 0.05,   # Warn if > 5%
        },
        "avg_cost_usd": {
            "critical_increase": 2.0,  # Alert if 2× baseline cost
            "warning_increase": 1.5,
        },
    }

    def __init__(self):
        self._ewma: dict[str, float] = {}
        self._baseline: dict[str, float] = {}
        self._initialized: dict[str, bool] = {}
        self._alert_history: list[Alert] = []

    def update(self, metrics: dict) -> list[Alert]:
        """Update EWMA values and check for anomalies."""
        new_alerts = []

        for metric, value in metrics.items():
            if not isinstance(value, (int, float)):
                continue

            # Initialize EWMA: the first observation becomes the baseline
            if not self._initialized.get(metric):
                self._ewma[metric] = value
                self._baseline[metric] = value
                self._initialized[metric] = True
                continue

            # Update EWMA
            prev_ewma = self._ewma[metric]
            self._ewma[metric] = (
                self.EWMA_LAMBDA * value +
                (1 - self.EWMA_LAMBDA) * prev_ewma
            )

            # Check thresholds. A zero baseline is not skipped here: an error
            # rate that starts at 0 must still be able to trip its absolute
            # threshold. Ratio checks guard against it in _check_threshold.
            thresholds = self.ALERT_THRESHOLDS.get(metric, {})
            if not thresholds:
                continue

            baseline = self._baseline[metric]
            current = self._ewma[metric]

            for threshold_key, threshold_value in thresholds.items():
                alert = self._check_threshold(
                    metric, current, baseline, threshold_key, threshold_value
                )
                if alert:
                    new_alerts.append(alert)
                    self._alert_history.append(alert)
                    # Critical is listed first; stop so a critical breach
                    # does not also emit a redundant warning.
                    break

        return new_alerts

    def _make_alert(
        self, severity: str, metric: str, message: str,
        current: float, baseline: float, threshold: float
    ) -> Alert:
        return Alert(
            alert_id=str(uuid.uuid4())[:8],
            severity=severity,
            metric=metric,
            message=message,
            current_value=current,
            baseline_value=baseline,
            threshold=threshold,
            timestamp=time.time(),
        )

    def _check_threshold(
        self, metric: str, current: float, baseline: float,
        threshold_key: str, threshold_value: float
    ) -> Optional[Alert]:
        """Check a single threshold and create an alert if violated."""
        severity = "critical" if "critical" in threshold_key else "warning"

        if "drop" in threshold_key:
            # Lower is worse (completion rate, etc.)
            drop = baseline - current
            if drop >= threshold_value:
                return self._make_alert(
                    severity, metric,
                    f"{metric} dropped by {drop:.3f} (baseline: {baseline:.3f}, current: {current:.3f})",
                    current, baseline, threshold_value,
                )

        elif "increase" in threshold_key:
            # Higher is worse (latency, error rate, cost)
            if threshold_value > 1:
                # Multiplicative threshold (e.g., 2× baseline)
                if baseline <= 0:
                    return None  # Ratio is undefined without a positive baseline
                ratio = current / baseline
                if ratio >= threshold_value:
                    return self._make_alert(
                        severity, metric,
                        f"{metric} is {ratio:.1f}× baseline ({current:.3f} vs {baseline:.3f})",
                        current, baseline, threshold_value,
                    )
            else:
                # Absolute threshold (e.g., error rate > 0.10)
                if current >= threshold_value:
                    return self._make_alert(
                        severity, metric,
                        f"{metric} exceeded threshold ({current:.3f} >= {threshold_value:.3f})",
                        current, baseline, threshold_value,
                    )

        return None

    def update_baseline(self, metrics: dict):
        """Update baseline values (e.g., after intentional change)."""
        for metric, value in metrics.items():
            if isinstance(value, (int, float)):
                self._baseline[metric] = value
                self._ewma[metric] = value
                self._initialized[metric] = True


# ── Alerting ───────────────────────────────────────────────────────────────────

class SlackAlerter:
    """Sends alerts to Slack. Replace with your alerting system."""

    def __init__(self, webhook_url: Optional[str] = None, channel: str = "#agent-alerts"):
        self.webhook_url = webhook_url or os.getenv("SLACK_WEBHOOK_URL")
        self.channel = channel
        self._alert_cooldowns: dict[str, float] = {}
        self.COOLDOWN_SECONDS = 300  # Don't repeat same alert within 5 minutes

    def send_alert(self, alert: Alert) -> bool:
        """Send an alert if not in cooldown. Returns False if silenced or if Slack delivery failed."""
        cooldown_key = f"{alert.metric}_{alert.severity}"
        now = time.time()

        if (cooldown_key in self._alert_cooldowns and
                now - self._alert_cooldowns[cooldown_key] < self.COOLDOWN_SECONDS):
            return False  # Silenced by cooldown

        self._alert_cooldowns[cooldown_key] = now

        emoji = ":rotating_light:" if alert.severity == "critical" else ":warning:"
        message = (
            f"{emoji} *{alert.severity.upper()} - Agent Monitor*\n"
            f"Metric: `{alert.metric}`\n"
            f"Message: {alert.message}\n"
            f"Alert ID: `{alert.alert_id}`"
        )

        # Always log locally so alerts are visible without a webhook configured
        print(f"[ALERT] {message}")

        if self.webhook_url:
            try:
                import urllib.request
                payload = json.dumps({"text": message, "channel": self.channel})
                req = urllib.request.Request(
                    self.webhook_url,
                    data=payload.encode(),
                    headers={"Content-Type": "application/json"},
                )
                urllib.request.urlopen(req, timeout=5)
            except Exception as e:
                print(f"Slack alert failed: {e}")
                return False

        return True

    def send_alerts(self, alerts: list[Alert]):
        for alert in alerts:
            self.send_alert(alert)


# ── Storage ────────────────────────────────────────────────────────────────────

class InMemoryExporter:
    """In-memory trace storage. Replace with persistent DB in production."""

    def __init__(self, max_traces: int = 10_000):
        self.max_traces = max_traces
        # A bounded deque evicts the oldest trace automatically when full
        self._traces: deque = deque(maxlen=max_traces)

    def export(self, trace: Trace):
        self._traces.append(trace)

    def get_recent(self, n: int = 100) -> list[Trace]:
        return list(self._traces)[-n:]

    def get_failures(self) -> list[Trace]:
        return [t for t in self._traces if not t.completed or t.error_count > 0]


# ── Production monitor (orchestrator) ─────────────────────────────────────────

class ProductionMonitor:
    """
    Orchestrates tracing, metric aggregation, anomaly detection, and alerting.
    """

    def __init__(
        self,
        agent_version: str,
        model_version: str,
        alert_fn=None,
    ):
        self.exporter = InMemoryExporter()
        self.tracer = AgentTracer(
            agent_version=agent_version,
            model_version=model_version,
            exporter=self.exporter,
        )
        self.aggregator = MetricsAggregator(window_size=1000)
        self.detector = AnomalyDetector()
        self.alerter = SlackAlerter()
        self._trace_count = 0

    def run_session(
        self,
        query: str,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> tuple[Optional[str], Trace]:
        """Run an agent session with full monitoring instrumentation."""
        result, trace = self.tracer.instrument_agent(
            query=query,
            session_id=session_id or str(uuid.uuid4())[:8],
        )
        # instrument_agent does not take a user_id, so attach it here.
        # The exporter holds a reference to the same Trace object.
        trace.user_id = user_id

        # Record metrics
        self.aggregator.record_trace(trace)
        self._trace_count += 1

        # Check for anomalies every 50 traces
        if self._trace_count % 50 == 0:
            metrics = self.aggregator.current_metrics()
            alerts = self.detector.update(metrics)
            if alerts:
                self.alerter.send_alerts(alerts)

        return result, trace

    def dashboard(self) -> dict:
        """Current monitoring dashboard data."""
        metrics = self.aggregator.current_metrics()
        return {
            "metrics": metrics,
            "slices": self.aggregator.slice_report(),
            "recent_failures": len(self.exporter.get_failures()),
        }

    def print_dashboard(self):
        data = self.dashboard()
        print("\n── Production Monitor Dashboard ─────────────────────")
        print(f"Total traces processed: {data['metrics']['total_traces']}")
        print("\nKey Metrics:")
        m = data["metrics"]
        print(f" Completion rate: {m['completion_rate']:.1%}")
        print(f" Strong completion rate: {m['strong_completion_rate']:.1%}")
        print(f" Latency p50: {m['latency_p50_ms']:,.0f}ms")
        print(f" Latency p95: {m['latency_p95_ms']:,.0f}ms")
        print(f" Error rate: {m['error_rate']:.1%}")
        print(f" Avg cost: ${m['avg_cost_usd']:.5f}")
        print(f" Avg tool calls: {m['avg_tool_calls']:.1f}")

        if data.get("slices"):
            print("\nSlice Analysis:")
            for slice_key, slice_data in data["slices"].items():
                print(f" {slice_key}: "
                      f"completion={slice_data['completion_rate']:.1%} | "
                      f"p50={slice_data['latency_p50_ms']:,.0f}ms | "
                      f"n={slice_data['sample_count']}")


# ── The production improvement flywheel ───────────────────────────────────────

class ImprovementFlywheel:
    """
    Converts production monitoring insights into eval dataset updates.
    Implements the feedback flywheel: production → eval → improvement → production.
    """

    def __init__(self, monitor: ProductionMonitor, eval_dataset_path: str):
        self.monitor = monitor
        self.eval_dataset_path = eval_dataset_path

    def extract_eval_candidates(
        self,
        n_failures: int = 20,
        n_successes: int = 5,
    ) -> list[dict]:
        """
        Extract production traces as candidates for the eval dataset.
        Failures are most valuable for regression prevention.
        """
        failures = self.monitor.exporter.get_failures()[:n_failures]
        failure_ids = {t.trace_id for t in failures}
        successes = [
            t for t in self.monitor.exporter.get_recent(200)
            if t.completed and t.error_count == 0
        ][:n_successes]

        candidates = []
        for trace in failures + successes:
            # Failures are kept even when they produced no final output;
            # skipping them would drop most incomplete sessions.
            candidates.append({
                "source": "production",
                "trace_id": trace.trace_id,
                "task_type": trace.task_type,
                "duration_ms": trace.duration_ms,
                "error_count": trace.error_count,
                "completed": trace.completed,
                "is_failure": trace.trace_id in failure_ids,
                "estimated_cost": trace.estimated_cost_usd,
            })

        return candidates

    def weekly_report(self) -> dict:
        """Generate weekly summary for team review."""
        metrics = self.monitor.aggregator.current_metrics()
        failures = self.monitor.exporter.get_failures()

        # Count errored spans by operation (e.g. "llm_call", "tool_call")
        failure_types = defaultdict(int)
        for trace in failures:
            for span in trace.spans:
                if span.status == "ERROR":
                    op = span.operation_name
                    failure_types[op] += 1

        return {
            "metrics_summary": metrics,
            "total_failures": len(failures),
            "failure_breakdown": dict(failure_types),
            "eval_candidates_available": len(self.extract_eval_candidates()),
            "recommended_actions": self._recommend_actions(metrics, failure_types),
        }

    def _recommend_actions(self, metrics: dict, failure_types: dict) -> list[str]:
        actions = []

        if metrics.get("completion_rate", 1.0) < 0.80:
            actions.append("CRITICAL: Completion rate below 80%. Investigate immediately.")

        if metrics.get("error_rate", 0) > 0.10:
            actions.append("High tool error rate. Check API integrations and input validation.")

        if metrics.get("latency_p99_ms", 0) > 120_000:
            actions.append("p99 latency > 2 minutes. Check for infinite loops or slow tools.")

        if failure_types.get("tool_call", 0) > 50:
            actions.append("High tool call failures. Review tool reliability and error handling.")

        return actions


# ── Demo ───────────────────────────────────────────────────────────────────────

def demo():
    # Requires the anthropic package and an ANTHROPIC_API_KEY in the environment
    monitor = ProductionMonitor(
        agent_version="v2.1.0",
        model_version="claude-opus-4-6",
    )

    print("Simulating production traffic...\n")

    sample_queries = [
        "What is the capital of France?",
        "Explain quantum entanglement in simple terms.",
        "What is 15% of 847?",
        "Summarize the key points of transformer architecture.",
        "What are the main differences between RAG and fine-tuning?",
    ]

    for i, query in enumerate(sample_queries):
        print(f"Session {i+1}: {query[:50]}...")
        result, trace = monitor.run_session(
            query=query,
            session_id=f"session_{i+1}",
            user_id=f"user_{i % 3}",
        )
        print(f" Completed: {trace.completed} | "
              f"Tokens: {trace.total_input_tokens + trace.total_output_tokens:,} | "
              f"Cost: ${trace.estimated_cost_usd:.5f}")

    monitor.print_dashboard()

    # Flywheel demo
    flywheel = ImprovementFlywheel(monitor, eval_dataset_path="/tmp/eval_candidates.json")
    report = flywheel.weekly_report()
    print("\n── Weekly Improvement Report ────────────────────────")
    print(f"Total failures: {report['total_failures']}")
    print(f"Eval candidates available: {report['eval_candidates_available']}")
    if report["recommended_actions"]:
        print("Recommended actions:")
        for action in report["recommended_actions"]:
            print(f" - {action}")
    else:
        print("No actions needed - all metrics healthy.")


if __name__ == "__main__":
    demo()
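The `AnomalyDetector` in the listing compares an EWMA against fixed thresholds. A textbook EWMA control chart instead derives its limits from the metric's own variability. The sketch below shows that variant for comparison; it is not part of the listing. The baseline values, the smoothing factor `lam = 0.2`, and the limit width `L = 3` are assumptions chosen for illustration.

```python
import math
from statistics import mean, stdev


def ewma_control_chart(values, baseline, lam=0.2, L=3.0):
    """Return (t, ewma, lower, upper, out_of_control) for each new value.

    Limits follow the standard time-varying form:
    mu +/- L * sigma * sqrt(lam / (2 - lam) * (1 - (1 - lam) ** (2 * t)))
    """
    mu, sigma = mean(baseline), stdev(baseline)
    z = mu  # the EWMA starts at the baseline mean
    rows = []
    for t, x in enumerate(values, start=1):
        z = lam * x + (1 - lam) * z
        width = L * sigma * math.sqrt(lam / (2 - lam) * (1 - (1 - lam) ** (2 * t)))
        rows.append((t, z, mu - width, mu + width, abs(z - mu) > width))
    return rows


# Hypothetical completion rates: a stable reference period, then a slow decline.
baseline_rates = [0.90, 0.91, 0.89, 0.90, 0.92, 0.88, 0.90, 0.91, 0.89, 0.90]
new_rates = [0.90, 0.89, 0.85, 0.84, 0.83]

chart = ewma_control_chart(new_rates, baseline_rates)
for t, z, lo, hi, flagged in chart:
    print(f"t={t} ewma={z:.4f} limits=({lo:.4f}, {hi:.4f}) flagged={flagged}")
first_signal = next((t for t, *_, flagged in chart if flagged), None)
```

On this data the chart signals at the third observation, when the smoothed rate is still within about one percentage point of baseline. A fixed 5-point drop threshold would stay silent at that point, which is the trade-off: variance-based limits react sooner but need a trustworthy baseline period.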

Slice Analysis: Beyond Aggregate Metrics

Aggregate metrics hide the most important failures. A 90% completion rate sounds excellent - until you discover that:

  • Task type "code_debugging" has a 60% completion rate
  • Users on mobile have a 95% abandonment rate
  • Sessions between 2am–5am have a 40% error rate (your tool APIs have SLA gaps)
  • New users (first 5 sessions) have 30% lower completion than experienced users

Slice your metrics by: task type, user segment, device/platform, model version, time of day, session number (first session vs returning user), and query length. Review slice-level metrics weekly. Aggregate metrics tell you something changed. Slices tell you what.
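A minimal sketch of slice analysis over session records follows. The record fields (`task_type`, `completed`) and the `min_samples` cutoff are assumptions; the cutoff exists so that tiny slices with noisy rates are not reported.

```python
from collections import defaultdict


def slice_completion(sessions, dimension, min_samples=30):
    """Completion rate per slice of `dimension`, worst slices first."""
    groups = defaultdict(list)
    for s in sessions:
        groups[s.get(dimension, "unknown")].append(1.0 if s["completed"] else 0.0)

    overall = sum(sum(v) for v in groups.values()) / max(len(sessions), 1)
    report = []
    for key, outcomes in groups.items():
        if len(outcomes) < min_samples:
            continue  # too few samples for a stable rate
        rate = sum(outcomes) / len(outcomes)
        report.append({
            "slice": f"{dimension}={key}",
            "n": len(outcomes),
            "completion_rate": rate,
            "delta_vs_overall": rate - overall,
        })
    return overall, sorted(report, key=lambda r: r["delta_vs_overall"])


# Invented traffic: a healthy majority slice hides a weak minority slice.
sessions = (
    [{"task_type": "qa", "completed": True}] * 855
    + [{"task_type": "qa", "completed": False}] * 45
    + [{"task_type": "code_debugging", "completed": True}] * 60
    + [{"task_type": "code_debugging", "completed": False}] * 40
)
overall, report = slice_completion(sessions, "task_type")
print(f"overall={overall:.1%}")
for row in report:
    print(row)
```

The aggregate here is 91.5%, while the `code_debugging` slice sits at 60%, about 31 points below it.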


The Production Improvement Flywheel

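The flywheel is a loop: production → eval → improvement → production. Failures observed in production become eval cases, the eval set drives fixes, and the improved agent ships back to production. Each revolution of the flywheel makes your agent better and your eval set more comprehensive. After 6 months of running this loop, a realistic outcome looks like this: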

  • Your eval set contains 200+ production failure cases that your agent now handles correctly
  • Your anomaly detector has learned the expected baseline behavior and alerts quickly on deviations
  • Your slice analysis has identified and addressed 3–5 previously unknown failure modes
  • Your cost per successful task has decreased as efficiency regressions were caught early

This is how production agents improve over time - not through one-time releases, but through continuous measurement, learning, and iteration.


:::danger Incomplete Completion Definitions Create False Confidence
The most common production monitoring mistake is defining "task completion" as "agent produced a final response." This is almost always true - agents almost always produce some output. The metric looks great (95%+ completion) but hides poor quality. Always measure strong completion: no user correction, no follow-up that contradicts the response, no human escalation. The gap between weak and strong completion is where your quality problems hide.
:::

:::warning Alerting Fatigue Kills Monitoring Programs
If your alerting system sends 50 alerts per day, engineers will mute it within a week. Design alerting to be surgical: page on truly critical issues (completion rate drops 10%+ in 10 minutes), warn on concerning trends (steady 2% weekly decline), and batch daily digests for informational metrics. The goal is "every alert gets investigated" - not "every metric deviation generates an alert." Tune thresholds aggressively. Better to miss a minor issue than to alert-fatigue your team into ignoring all alerts.
:::
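The three tiers in the warning above (page, warn, daily digest) can be sketched as a small routing function. The `MetricChange` fields are hypothetical, and the cutoffs are taken directly from the examples in the text rather than from any tuned system.

```python
from dataclasses import dataclass


@dataclass
class MetricChange:
    """Hypothetical summary of how one metric moved."""
    metric: str
    drop_last_10min: float  # absolute drop over the last 10 minutes (0.10 = 10 points)
    weekly_trend: float     # change per week (negative = declining)


def route_alert(change: MetricChange) -> str:
    """Return 'page', 'warn', or 'digest' for a metric change."""
    if change.drop_last_10min >= 0.10:
        return "page"    # sudden, large drop: wake someone up
    if change.weekly_trend <= -0.02:
        return "warn"    # steady decline: investigate during working hours
    return "digest"      # everything else goes in the daily summary


changes = [
    MetricChange("completion_rate", drop_last_10min=0.12, weekly_trend=-0.01),
    MetricChange("strong_completion_rate", drop_last_10min=0.01, weekly_trend=-0.02),
    MetricChange("avg_tool_calls", drop_last_10min=0.0, weekly_trend=0.005),
]
routes = {c.metric: route_alert(c) for c in changes}
print(routes)
```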


Interview Q&A

Q: What is the difference between production monitoring and benchmark evaluation for agents?

A: Benchmarks evaluate on a fixed, curated dataset before deployment - they are a gate. Production monitoring measures actual behavior on real user traffic continuously after deployment. The fundamental difference is data: benchmark data is controlled, representative by design, and static. Production data is uncontrolled, evolves with user behavior, and reveals failure modes benchmarks never anticipated. You need both: benchmarks to gate releases, production monitoring to catch what benchmarks miss. Specifically, production catches: distribution shift (users ask things the benchmark didn't cover), adversarial inputs, long-tail edge cases, and degradation over time as context or dependencies change.


Q: Define "task completion rate" and explain why the definition matters.

A: Task completion rate sounds simple - the fraction of agent sessions where the task was completed. But "completed" can mean very different things: (a) the agent produced any final output (almost always near 100%), (b) the agent produced an output that passed an automated quality check, (c) the user did not issue a correction or escalation, or (d) the user explicitly rated it as helpful. Each definition captures different quality signals. The gap between definition (a) and definition (c) - "weak completion" vs "strong completion" - reveals the fraction of sessions where the agent technically finished but failed to satisfy the user. In practice, weak completion is 95%+ for most deployed agents; strong completion is often 60–80%. Always measure both, and track the gap.
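A minimal sketch of measuring both definitions side by side, assuming each session record carries a few boolean signals. The `Session` fields and function names are hypothetical; how you detect a correction or a contradicting follow-up is left out.

```python
from dataclasses import dataclass

@dataclass
class Session:
    produced_final_response: bool
    user_corrected: bool = False
    contradicting_followup: bool = False
    escalated_to_human: bool = False

def is_weak_complete(s: Session) -> bool:
    # Definition (a): the agent produced some final output.
    return s.produced_final_response

def is_strong_complete(s: Session) -> bool:
    # Definition (c): output produced and no correction, contradiction, or escalation.
    return s.produced_final_response and not (
        s.user_corrected or s.contradicting_followup or s.escalated_to_human
    )

def completion_rates(sessions):
    n = len(sessions)
    if n == 0:
        return {"weak": 0.0, "strong": 0.0, "gap": 0.0}
    weak = sum(is_weak_complete(s) for s in sessions) / n
    strong = sum(is_strong_complete(s) for s in sessions) / n
    return {"weak": weak, "strong": strong, "gap": weak - strong}

rates = completion_rates([
    Session(produced_final_response=True),
    Session(produced_final_response=True, user_corrected=True),
    Session(produced_final_response=True, escalated_to_human=True),
    Session(produced_final_response=False),
])
```

Reporting the gap as its own number makes it easy to chart over time alongside the two rates.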


Q: How would you design a sampling strategy for production trace storage?

A: Not all traces deserve equal storage and review. I would use a tiered sampling strategy: store 100% of failures (any session that errored, timed out, or was explicitly rated negative), 100% of sessions that triggered safety classifiers, and 100% of first sessions per new user (to understand onboarding). For successes, store a 10% stratified random sample - stratified by task type and user segment to ensure all slices are represented. This gives comprehensive failure coverage and representative success coverage at roughly 15–20% of full-trace storage cost, depending on how often the always-store tiers fire. Weekly, a human reviews a sample of the stored traces (10–20 per week) for qualitative insights that automated metrics miss.
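The tiering can be sketched as a single storage decision per trace. The trace fields and the `should_store` helper below are hypothetical. Note one simplification: hashing the session ID applies the same 10% rate inside every (task type, user segment) stratum, which only approximates stratified sampling. Guaranteeing coverage of small strata would need per-stratum quotas or counters.

```python
import hashlib
from typing import Tuple

SUCCESS_SAMPLE_RATE = 0.10  # rate for successful sessions, from the strategy above

def _stable_fraction(key: str) -> float:
    """Map a key to a deterministic value in [0, 1) so decisions are reproducible."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") / 2**64

def should_store(trace: dict) -> Tuple[bool, str]:
    """Return (store?, reason) for one trace, checking the 100% tiers first."""
    if trace["errored"] or trace["timed_out"] or trace["rated_negative"]:
        return True, "failure"
    if trace["safety_flagged"]:
        return True, "safety"
    if trace["is_first_session"]:
        return True, "first_session"
    key = f'{trace["task_type"]}|{trace["user_segment"]}|{trace["session_id"]}'
    if _stable_fraction(key) < SUCCESS_SAMPLE_RATE:
        return True, "success_sample"
    return False, "dropped"
```

Using a hash instead of a random number means the same session always gets the same decision, which helps when traces are processed more than once.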


Q: What is EWMA-based anomaly detection and why is it better than threshold-based alerts?

A: EWMA (Exponentially Weighted Moving Average) builds a smoothed running average of each metric that gives more weight to recent observations than older ones, controlled by a smoothing factor lambda. Rather than alerting when a metric crosses a fixed threshold (e.g., "alert when error rate > 5%"), EWMA-based detection alerts when the metric deviates significantly from its recent baseline. This is better for three reasons: it adapts to each metric's own baseline (a fixed "alert below 75%" rule never fires when a completion rate that is normally 95% drops to 85%, yet fires constantly for an agent that normally sits at 76%), it filters out transient spikes when alerts are based on the smoothed value (a single bad batch does not page on-call), and it detects gradual drift (a slow decline from 90% to 80% over two weeks is caught before it becomes critical), provided the baseline is updated more slowly than the drift itself.
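One possible shape for such a detector is sketched below. It is a simplified variant, not a classical EWMA control chart: it keeps an exponentially weighted mean and variance as the baseline and flags a raw observation that falls more than `threshold_sigmas` standard deviations away. The class name, defaults, warm-up period, and the choice to freeze the baseline on anomalies are all assumptions made for illustration.

```python
import math

class EwmaDetector:
    def __init__(self, lam=0.2, threshold_sigmas=3.0, warmup=5):
        self.lam = lam              # smoothing factor lambda (higher = faster adaptation)
        self.k = threshold_sigmas   # alert when |deviation| > k * std
        self.warmup = warmup        # observations to see before alerting
        self.mean = None
        self.var = 0.0
        self.n = 0

    def update(self, x: float) -> bool:
        """Score x against the current baseline, then fold it in. Returns True on anomaly."""
        self.n += 1
        if self.mean is None:
            self.mean = x
            return False
        deviation = x - self.mean
        std = math.sqrt(self.var)
        is_anomaly = self.n > self.warmup and std > 0 and abs(deviation) > self.k * std
        if not is_anomaly:
            # Incremental exponentially weighted mean and variance update.
            self.mean += self.lam * deviation
            self.var = (1 - self.lam) * (self.var + self.lam * deviation ** 2)
        return is_anomaly

# Hypothetical completion-rate readings: a stable stretch, then a sharp drop.
series = [0.78, 0.79, 0.77, 0.78, 0.80, 0.78, 0.77, 0.79, 0.60]
detector = EwmaDetector()
flags = [detector.update(x) for x in series]
```

Skipping the baseline update on anomalous points keeps one bad reading from dragging the baseline down. The trade-off is that a genuine, permanent level change will keep alerting until the baseline is reset.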


Q: Describe the production improvement flywheel. How does production monitoring feed back into agent improvement?

A: The flywheel has five stages that repeat continuously. First, trace collection: every production session is traced, with 100% of failures and a 10% sample of successes stored. Second, anomaly detection: EWMA-based detectors alert on metric deviations within minutes of degradation. Third, weekly human review: engineers review a sample of stored traces, identify qualitative failure patterns, and extract high-value failure cases. Fourth, eval dataset update: production failure cases are added to the evaluation dataset, ensuring the eval set evolves with real user behavior. Fifth, agent improvement: insights from trace review guide prompt changes, tool updates, or model changes. The new version is deployed, the baseline is updated, and the cycle repeats. After 6 months, the result is an eval dataset full of real production failures that the agent now handles correctly - a compounding quality advantage.
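The fourth stage, folding reviewed failures back into the eval set, might look like the following sketch. The trace fields, the `promote_failures` helper, and the deduplication by normalized query text are hypothetical choices for illustration. Real eval cases usually need more context than a query string.

```python
import hashlib

def case_id(query: str) -> str:
    """Stable ID from the query with case and whitespace normalized."""
    normalized = " ".join(query.lower().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:16]

def promote_failures(eval_set: dict, reviewed_traces: list) -> int:
    """Add reviewed failure traces to eval_set in place; return how many were added."""
    added = 0
    for trace in reviewed_traces:
        # Only failures with a reviewer-written expected behavior are usable as eval cases.
        if not trace["is_failure"] or not trace.get("expected_behavior"):
            continue
        cid = case_id(trace["query"])
        if cid in eval_set:
            continue  # an equivalent case is already covered
        eval_set[cid] = {
            "query": trace["query"],
            "expected_behavior": trace["expected_behavior"],
            "source": "production",
        }
        added += 1
    return added
```

Tagging each case with its source lets you report benchmark-derived and production-derived eval results separately.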

© 2026 EngineersOfAI. All rights reserved.