Skip to main content

:::tip 🎮 Interactive Playground Visualize this concept: Try the LLM Product Architecture demo on the EngineersOfAI Playground - no code required. :::

AI Product Architecture

From Prototype to Production

You built a chatbot prototype in a weekend. It worked brilliantly in demos - fast, accurate, impressive. Your CTO green-lit it for production. Two months later, you're fielding 3 AM pages about timeouts, users complaining about lost conversations, your team discovering that the "architecture" is a single 800-line app.py with global state, no retry logic, and LLM calls happening synchronously in request handlers.

The prototype became a product without ever becoming an architecture.

This is the most common failure mode in AI product development. Teams move so fast on the demo that they never step back to ask: what does this need to look like at 1,000 concurrent users? What happens when the LLM provider has a 5-minute outage? How do we debug why a specific user got a bad response six days ago? How do we update the system prompt without redeploying the entire service?

The answer is that AI products need the same architectural rigor as any distributed system - plus several AI-specific concerns that traditional architecture patterns don't address: model version management, prompt versioning, context window management, streaming response handling, token cost tracking, and evaluation infrastructure.

This lesson maps the complete architecture of a production AI product, from the API gateway that receives user requests to the evaluation pipeline that catches regressions before they reach users. Every component exists for a reason - we'll examine what breaks without it.


Why This Exists

Early AI products (2022-2023) were mostly monoliths: one service, one database, one LLM call per request, synchronous from end to end. This worked when traffic was low and the LLM provider was reliable. At scale, three structural problems emerged.

LLM latency is unpredictable. A synchronous LLM call can take anywhere from 500ms to 30 seconds depending on output length, model load, and provider infrastructure. Synchronous architectures mean your request thread is blocked for this entire duration. At 1,000 concurrent users, you need 1,000 threads - plus the LLM calls create backpressure that can cascade into timeouts and 503s.

The system prompt is business logic. Changing a system prompt in a monolith means redeploying the application. Teams discovered they needed to iterate on prompts daily, sometimes hourly. System prompts needed versioning, rollback capability, A/B testing, and environment-specific overrides - the same workflow as database migrations.

You can't debug what you don't log. When a user reports "the AI gave me wrong information last Tuesday," you need to be able to reconstruct exactly what prompt was sent, what documents were retrieved, which model version responded, and what the exact response was. Without structured observability at every layer, this is impossible.

The architecture described in this lesson solves all three problems.


The Complete Stack


Layer 1: The API Gateway

The gateway is the first line of defense. Its job is to authenticate requests, enforce rate limits, route to the right service version, and provide a stable external API even as the internal architecture evolves.

# gateway/middleware.py
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional
import time
import uuid


app = FastAPI(title="AI Product Gateway")

app.add_middleware(
CORSMiddleware,
allow_origins=["https://app.yourdomain.com"],
allow_methods=["POST", "GET"],
allow_headers=["Authorization", "Content-Type"],
)


async def get_tenant_context(request: Request) -> dict:
"""Extract and validate tenant context from request."""
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Invalid authorization")

api_key = auth_header.removeprefix("Bearer ")
tenant = await validate_api_key(api_key) # DB lookup + cache
if not tenant:
raise HTTPException(status_code=401, detail="Invalid API key")

return {
"tenant_id": tenant["id"],
"tier": tenant["tier"],
"request_id": str(uuid.uuid4()),
"request_start": time.time(),
}


@app.middleware("http")
async def add_request_id(request: Request, call_next):
"""Inject request ID for distributed tracing."""
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
request.state.request_id = request_id

response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response


@app.post("/v1/chat/completions")
async def chat_completion(
request: Request,
body: dict,
ctx: dict = Depends(get_tenant_context),
):
"""
Main chat endpoint. Stable API contract - internal architecture can change.
Modeled after OpenAI API for compatibility.
"""
# Validate request
if not body.get("messages"):
raise HTTPException(status_code=400, detail="messages required")

# Route to AI service
result = await ai_service.handle_chat(
tenant_id=ctx["tenant_id"],
request_id=ctx["request_id"],
messages=body["messages"],
model=body.get("model", "default"),
stream=body.get("stream", False),
)

return result


async def validate_api_key(api_key: str) -> Optional[dict]:
"""Validate API key with caching."""
# Check Redis cache first (5 minute TTL)
# cache_key = f"apikey:{hashlib.sha256(api_key.encode()).hexdigest()}"
# cached = await redis.get(cache_key)
# if cached: return json.loads(cached)

# DB lookup
# tenant = await db.fetchone("SELECT * FROM tenants WHERE key_hash = ?", hash(api_key))
# await redis.setex(cache_key, 300, json.dumps(tenant))
# return tenant
return {"id": "tenant_123", "tier": "professional"} # mock

Layer 2: Prompt Management

System prompts are business logic. They must be versioned, tested, and deployable without code changes. The Prompt Registry is a service (or at minimum a database table) that stores prompt templates, tracks versions, and enables A/B testing.

# services/prompt_manager.py
from dataclasses import dataclass
from typing import Optional
import anthropic
import hashlib
import json


@dataclass
class PromptTemplate:
template_id: str
version: str
content: str
variables: list[str] # e.g., ["company_name", "current_date"]
created_at: float
created_by: str
description: str
tags: list[str]
is_active: bool = True


@dataclass
class PromptVersion:
template_id: str
version: str
content_hash: str


class PromptRegistry:
"""
Versioned prompt storage with A/B testing support.
Prompts are first-class artifacts - versioned like code.
"""

def __init__(self, db, redis_client):
self.db = db
self.cache = redis_client
self._local_cache: dict[str, PromptTemplate] = {}

async def get_prompt(
self,
template_id: str,
version: Optional[str] = None,
variables: Optional[dict] = None,
) -> str:
"""
Retrieve and render a prompt template.
If version is None, uses the active (latest) version.
"""
cache_key = f"prompt:{template_id}:{version or 'active'}"

# Check local cache first (hot path)
if cache_key in self._local_cache and not version:
template = self._local_cache[cache_key]
else:
# In production: fetch from DB
# template = await self.db.fetchone(
# "SELECT * FROM prompt_templates WHERE id=? AND version=?",
# (template_id, version or 'active')
# )
template = self._mock_template(template_id)
self._local_cache[cache_key] = template

rendered = self._render(template.content, variables or {})
return rendered

def _render(self, template: str, variables: dict) -> str:
"""Render template with variable substitution."""
result = template
for key, value in variables.items():
result = result.replace(f"{{{{{key}}}}}", str(value))
return result

def _mock_template(self, template_id: str) -> PromptTemplate:
"""Mock template for demo purposes."""
return PromptTemplate(
template_id=template_id,
version="v1.2.3",
content=(
"You are a helpful AI assistant for {{company_name}}.\n"
"Today's date is {{current_date}}.\n"
"Always be concise, accurate, and cite sources when available."
),
variables=["company_name", "current_date"],
created_at=1700000000.0,
created_by="system",
description="Base assistant system prompt",
tags=["base", "assistant"],
)

async def create_version(
self,
template_id: str,
content: str,
created_by: str,
description: str,
) -> PromptVersion:
"""
Create a new prompt version.
Computes content hash for deduplication.
"""
content_hash = hashlib.sha256(content.encode()).hexdigest()[:16]

# Check for duplicate content
# existing = await self.db.fetchone(
# "SELECT version FROM prompt_templates WHERE content_hash=?", content_hash
# )
# if existing: raise ValueError(f"Identical content exists as {existing['version']}")

# Increment semantic version
# current = await self.get_current_version(template_id)
# new_version = increment_minor(current)

version = PromptVersion(
template_id=template_id,
version="v1.3.0",
content_hash=content_hash,
)

# await self.db.execute("INSERT INTO prompt_templates ...", ...)
# Invalidate cache
self._local_cache.pop(f"prompt:{template_id}:active", None)

print(f"Created prompt version {version.version} for {template_id}")
return version

async def ab_test_prompt(
self,
template_id: str,
tenant_id: str,
variants: dict[str, float], # version → traffic fraction
) -> str:
"""
Assign tenant to an A/B test variant deterministically.
Same tenant always gets the same variant (consistent experience).
"""
import hashlib

# Deterministic assignment based on tenant_id
hash_val = int(hashlib.md5(f"{template_id}:{tenant_id}".encode()).hexdigest(), 16)
bucket = (hash_val % 100) / 100.0

cumulative = 0.0
for version, fraction in variants.items():
cumulative += fraction
if bucket <= cumulative:
return await self.get_prompt(template_id, version=version)

# Fallback to active version
return await self.get_prompt(template_id)

Layer 3: Context Management

The context manager enforces token budgets, manages conversation history, and ensures that every LLM call gets exactly the right amount of context - no more (cost), no less (quality).

# services/context_manager.py
from dataclasses import dataclass, field
from typing import Optional
import json
import time


@dataclass
class Message:
role: str # "user" | "assistant" | "system"
content: str
timestamp: float = field(default_factory=time.time)
token_count: Optional[int] = None
metadata: dict = field(default_factory=dict)


@dataclass
class ConversationContext:
conversation_id: str
tenant_id: str
user_id: str
messages: list[Message] = field(default_factory=list)
created_at: float = field(default_factory=time.time)
last_activity: float = field(default_factory=time.time)
total_tokens_used: int = 0
metadata: dict = field(default_factory=dict)


class ContextManager:
"""
Manages conversation context with intelligent windowing.
Ensures LLM calls always fit within token budgets.
"""

def __init__(self, db, max_history_messages: int = 20):
self.db = db
self.max_history_messages = max_history_messages

async def get_context(
self,
conversation_id: str,
tenant_id: str,
) -> Optional[ConversationContext]:
"""Load conversation context from storage."""
# In production: DB lookup with tenant scope
# row = await self.db.fetchone(
# "SELECT * FROM conversations WHERE id=? AND tenant_id=?",
# (conversation_id, tenant_id)
# )
return None # New conversation

async def save_turn(
self,
conversation_id: str,
tenant_id: str,
user_message: str,
assistant_response: str,
tokens_used: int,
) -> None:
"""Append a conversation turn to storage."""
# INSERT INTO messages (conversation_id, role, content, tokens, timestamp)
# VALUES (?, 'user', ?, 0, NOW()), (?, 'assistant', ?, ?, NOW())
pass

def build_messages_for_llm(
self,
context: Optional[ConversationContext],
new_user_message: str,
max_context_tokens: int,
system_prompt: str = "",
) -> tuple[list[dict], int]:
"""
Build the messages array for the LLM call.
Trims history to fit within token budget.
Returns (messages, estimated_token_count).
"""
# Estimate system prompt tokens
system_tokens = len(system_prompt) // 4
response_reserve = 2048 # reserve for assistant response
available = max_context_tokens - system_tokens - response_reserve

messages = []
token_count = len(new_user_message) // 4

if context and context.messages:
# Take most recent messages that fit within budget
history = context.messages[-self.max_history_messages:]

# Walk backwards to fit as much history as possible
kept = []
for msg in reversed(history):
msg_tokens = (msg.token_count or len(msg.content) // 4)
if token_count + msg_tokens > available:
break
kept.append(msg)
token_count += msg_tokens

# Add in chronological order
for msg in reversed(kept):
messages.append({"role": msg.role, "content": msg.content})

messages.append({"role": "user", "content": new_user_message})
return messages, token_count

def summarize_history_if_needed(
self,
messages: list[dict],
current_tokens: int,
max_tokens: int,
) -> list[dict]:
"""
If conversation is too long, summarize older messages.
Keeps recent messages verbatim, summarizes the rest.
"""
if current_tokens <= max_tokens * 0.8:
return messages # No summarization needed

# Keep last N messages verbatim
keep_recent = 6
recent = messages[-keep_recent:]
older = messages[:-keep_recent]

if not older:
return messages

# In production: use a fast model to summarize older messages
summary = f"[Earlier conversation summary: {len(older)} messages covering previous context]"

return [{"role": "system", "content": summary}] + recent

Layer 4: The LLM Service

The LLM service is the only component that actually calls the API. Everything else is preparation and post-processing. It handles streaming, retries, fallback models, and usage recording.

# services/llm_service.py
import anthropic
import asyncio
import time
from typing import AsyncIterator, Optional
from dataclasses import dataclass


@dataclass
class LLMResponse:
content: str
model: str
input_tokens: int
output_tokens: int
latency_ms: int
stop_reason: str
request_id: str


class LLMService:
"""
Production LLM service with retry, fallback, and observability.
"""

# Model fallback chain - try in order on failure
FALLBACK_CHAIN = {
"claude-opus-4-6": ["claude-sonnet-4-6", "claude-haiku-4-5-20251001"],
"claude-sonnet-4-6": ["claude-haiku-4-5-20251001"],
"claude-haiku-4-5-20251001": [],
}

def __init__(self, tracer, metrics):
self.client = anthropic.AsyncAnthropic()
self.tracer = tracer
self.metrics = metrics

async def complete(
self,
request_id: str,
model: str,
system: str,
messages: list[dict],
max_tokens: int = 2048,
temperature: float = 1.0,
stream: bool = False,
) -> LLMResponse:
"""
Non-streaming completion with retry and fallback.
"""
start = time.time()
last_error = None

# Try primary model, then fallbacks
models_to_try = [model] + self.FALLBACK_CHAIN.get(model, [])

for attempt_model in models_to_try:
for attempt in range(3): # 3 retries per model
try:
response = await self.client.messages.create(
model=attempt_model,
max_tokens=max_tokens,
system=system,
messages=messages,
temperature=temperature,
)

latency_ms = int((time.time() - start) * 1000)

# Record metrics
self.metrics.record_llm_call(
model=attempt_model,
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
latency_ms=latency_ms,
success=True,
)

return LLMResponse(
content=response.content[0].text,
model=attempt_model,
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
latency_ms=latency_ms,
stop_reason=response.stop_reason,
request_id=request_id,
)

except anthropic.RateLimitError:
wait = (2 ** attempt) + (0.1 * attempt)
await asyncio.sleep(wait)
last_error = "rate_limit"

except anthropic.APIStatusError as e:
if e.status_code >= 500: # Server error - retry
wait = (2 ** attempt)
await asyncio.sleep(wait)
last_error = f"server_error_{e.status_code}"
else: # 4xx - don't retry, try fallback
break

except anthropic.APIConnectionError:
wait = (2 ** attempt)
await asyncio.sleep(wait)
last_error = "connection_error"

# Log fallback
if attempt_model != model:
print(f"[{request_id}] Fell back from {model} to {attempt_model}")

raise Exception(f"All LLM attempts failed. Last error: {last_error}")

async def stream(
self,
request_id: str,
model: str,
system: str,
messages: list[dict],
max_tokens: int = 2048,
) -> AsyncIterator[str]:
"""
Streaming completion - yields text chunks.
"""
start = time.time()
total_output_tokens = 0

async with self.client.messages.stream(
model=model,
max_tokens=max_tokens,
system=system,
messages=messages,
) as stream:
async for text in stream.text_stream:
yield text
total_output_tokens += len(text) // 4 # rough estimate

# Get final usage from stream
final = await stream.get_final_message()
latency_ms = int((time.time() - start) * 1000)

self.metrics.record_llm_call(
model=model,
input_tokens=final.usage.input_tokens,
output_tokens=final.usage.output_tokens,
latency_ms=latency_ms,
success=True,
)

Layer 5: The RAG Engine

# services/rag_engine.py
from dataclasses import dataclass
from typing import Optional
import asyncio


@dataclass
class RetrievedChunk:
content: str
source: str
doc_id: str
similarity_score: float
metadata: dict


class RAGEngine:
"""
Retrieval-Augmented Generation engine.
Handles embedding, search, reranking, and context assembly.
"""

def __init__(
self,
embedding_service,
vector_store,
reranker=None,
):
self.embedding_service = embedding_service
self.vector_store = vector_store
self.reranker = reranker # Optional cross-encoder reranker

async def retrieve(
self,
query: str,
tenant_id: str,
top_k: int = 10,
rerank_top_n: int = 5,
filters: Optional[dict] = None,
) -> list[RetrievedChunk]:
"""
Full RAG retrieval pipeline:
1. Embed query
2. Vector search (retrieve top_k candidates)
3. Rerank (select top_n final results)
"""
# 1. Embed query
query_embedding = await self.embedding_service.embed(query)

# 2. Vector search within tenant namespace
raw_results = await self.vector_store.search(
tenant_id=tenant_id,
query_embedding=query_embedding,
top_k=top_k,
filters=filters,
)

chunks = [
RetrievedChunk(
content=r.get("content", ""),
source=r.get("source", ""),
doc_id=r.get("doc_id", ""),
similarity_score=r.get("score", 0.0),
metadata=r,
)
for r in raw_results
]

# 3. Rerank if available (significantly improves precision)
if self.reranker and len(chunks) > rerank_top_n:
chunks = await self._rerank(query, chunks, rerank_top_n)
else:
chunks = chunks[:rerank_top_n]

return chunks

async def _rerank(
self,
query: str,
chunks: list[RetrievedChunk],
top_n: int,
) -> list[RetrievedChunk]:
"""
Cross-encoder reranking: more accurate than bi-encoder similarity.
Uses a separate model (e.g., Cohere Rerank, BGE Reranker).
"""
# In production: call reranker API
# scores = await self.reranker.rank(query, [c.content for c in chunks])
# reranked = sorted(zip(scores, chunks), key=lambda x: x[0], reverse=True)
# return [chunk for _, chunk in reranked[:top_n]]

# Mock: just return top N by similarity score
return sorted(chunks, key=lambda c: c.similarity_score, reverse=True)[:top_n]

def build_context_string(
self,
chunks: list[RetrievedChunk],
max_tokens: int = 3000,
) -> str:
"""Assemble retrieved chunks into a context string."""
if not chunks:
return ""

parts = ["## Context from Knowledge Base\n"]
token_budget = max_tokens
included = 0

for chunk in chunks:
chunk_text = (
f"### Source: {chunk.source}\n"
f"{chunk.content}\n\n"
)
chunk_tokens = len(chunk_text) // 4
if token_budget - chunk_tokens < 0:
break
parts.append(chunk_text)
token_budget -= chunk_tokens
included += 1

if included < len(chunks):
parts.append(f"*({len(chunks) - included} additional sources omitted)*\n")

return "".join(parts)

Layer 6: Observability

# observability/tracing.py
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional, Any
from contextlib import asynccontextmanager


@dataclass
class Span:
trace_id: str
span_id: str
parent_span_id: Optional[str]
operation: str
start_time: float = field(default_factory=time.time)
end_time: Optional[float] = None
attributes: dict = field(default_factory=dict)
status: str = "ok"
error: Optional[str] = None

def finish(self, error: Optional[str] = None):
self.end_time = time.time()
self.status = "error" if error else "ok"
self.error = error

@property
def duration_ms(self) -> Optional[float]:
if self.end_time:
return (self.end_time - self.start_time) * 1000
return None


class AITracer:
"""
Distributed tracer for AI request pipelines.
Captures the full call graph: gateway → context → RAG → LLM.
"""

def __init__(self, export_fn=None):
self._active_spans: dict[str, Span] = {}
self.export_fn = export_fn or self._print_span

@asynccontextmanager
async def start_span(
self,
operation: str,
trace_id: Optional[str] = None,
parent_span_id: Optional[str] = None,
attributes: Optional[dict] = None,
):
span = Span(
trace_id=trace_id or str(uuid.uuid4()),
span_id=str(uuid.uuid4())[:8],
parent_span_id=parent_span_id,
operation=operation,
attributes=attributes or {},
)
self._active_spans[span.span_id] = span

try:
yield span
span.finish()
except Exception as e:
span.finish(error=str(e))
raise
finally:
self._active_spans.pop(span.span_id, None)
self.export_fn(span)

def _print_span(self, span: Span):
print(
f"[TRACE] {span.trace_id[:8]} | {span.operation} | "
f"{span.duration_ms:.0f}ms | {span.status}"
)


class AIMetrics:
"""
Key metrics for AI product monitoring.
"""

def __init__(self):
self._data: dict[str, list] = {}

def record_llm_call(
self,
model: str,
input_tokens: int,
output_tokens: int,
latency_ms: int,
success: bool,
) -> None:
"""Record LLM call metrics."""
key = f"llm.{model}"
if key not in self._data:
self._data[key] = []
self._data[key].append({
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"latency_ms": latency_ms,
"success": success,
"timestamp": time.time(),
})

def record_rag_retrieval(
self,
tenant_id: str,
chunks_retrieved: int,
retrieval_latency_ms: int,
) -> None:
"""Record RAG retrieval metrics."""
print(
f"[METRICS] RAG | tenant={tenant_id} | "
f"chunks={chunks_retrieved} | latency={retrieval_latency_ms}ms"
)

def get_p99_latency(self, model: str) -> Optional[float]:
"""Get p99 latency for a model."""
data = self._data.get(f"llm.{model}", [])
if not data:
return None
latencies = sorted(d["latency_ms"] for d in data)
p99_idx = int(len(latencies) * 0.99)
return latencies[p99_idx]

Layer 7: Evaluation Pipeline

The eval pipeline is what separates teams that catch regressions before production from those that discover them in user complaints.

# eval/pipeline.py
import anthropic
from dataclasses import dataclass
from typing import Optional
import json


@dataclass
class EvalCase:
case_id: str
input_messages: list[dict]
expected_contains: list[str] # response must contain these
expected_not_contains: list[str] # response must NOT contain these
expected_tone: Optional[str] # "helpful", "professional", etc.
tags: list[str] # "safety", "accuracy", "format", etc.


@dataclass
class EvalResult:
case_id: str
passed: bool
score: float # 0.0 to 1.0
failures: list[str]
response: str
model: str
latency_ms: int


class EvalPipeline:
"""
Automated evaluation pipeline.
Runs on every prompt version change and model update.
"""

def __init__(self, system_prompt: str):
self.client = anthropic.Anthropic()
self.system_prompt = system_prompt

def run_case(self, case: EvalCase, model: str) -> EvalResult:
"""Run a single evaluation case."""
import time
start = time.time()

response = self.client.messages.create(
model=model,
max_tokens=1024,
system=self.system_prompt,
messages=case.input_messages,
)

latency_ms = int((time.time() - start) * 1000)
content = response.content[0].text

failures = []
score = 1.0

# Check expected content
for expected in case.expected_contains:
if expected.lower() not in content.lower():
failures.append(f"Missing expected content: '{expected}'")
score -= 0.2

# Check forbidden content
for forbidden in case.expected_not_contains:
if forbidden.lower() in content.lower():
failures.append(f"Contains forbidden content: '{forbidden}'")
score -= 0.3

# LLM-as-judge for qualitative checks
if case.expected_tone:
tone_pass = self._check_tone(content, case.expected_tone)
if not tone_pass:
failures.append(f"Tone check failed: expected '{case.expected_tone}'")
score -= 0.2

score = max(0.0, score)
return EvalResult(
case_id=case.case_id,
passed=len(failures) == 0,
score=score,
failures=failures,
response=content,
model=model,
latency_ms=latency_ms,
)

def _check_tone(self, response: str, expected_tone: str) -> bool:
"""Use LLM-as-judge to evaluate response tone."""
judge_prompt = (
f"Evaluate if this response has a '{expected_tone}' tone. "
f"Respond with only 'yes' or 'no'.\n\nResponse: {response[:500]}"
)
result = self.client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=10,
messages=[{"role": "user", "content": judge_prompt}],
)
return "yes" in result.content[0].text.lower()

def run_suite(
self,
cases: list[EvalCase],
model: str,
pass_threshold: float = 0.95,
) -> dict:
"""
Run full evaluation suite. Returns pass/fail with metrics.
Gate prompt/model deployments on this result.
"""
results = [self.run_case(case, model) for case in cases]

passed = sum(1 for r in results if r.passed)
pass_rate = passed / len(results) if results else 0.0
avg_score = sum(r.score for r in results) / len(results) if results else 0.0
avg_latency = sum(r.latency_ms for r in results) / len(results) if results else 0.0

failures_by_tag: dict[str, int] = {}
for case, result in zip(cases, results):
if not result.passed:
for tag in case.tags:
failures_by_tag[tag] = failures_by_tag.get(tag, 0) + 1

return {
"model": model,
"total_cases": len(cases),
"passed": passed,
"failed": len(cases) - passed,
"pass_rate": round(pass_rate, 3),
"avg_score": round(avg_score, 3),
"avg_latency_ms": round(avg_latency),
"deployment_approved": pass_rate >= pass_threshold,
"failures_by_tag": failures_by_tag,
"details": [
{
"case_id": r.case_id,
"passed": r.passed,
"score": r.score,
"failures": r.failures,
"latency_ms": r.latency_ms,
}
for r in results
],
}

Putting It All Together

# main.py - the complete request handler
import anthropic
import time
import uuid
from typing import Optional


class AIProduct:
"""
Complete AI product orchestrator.
Coordinates all layers for a production request.
"""

def __init__(
self,
prompt_registry: PromptRegistry,
context_manager: ContextManager,
rag_engine: RAGEngine,
llm_service: LLMService,
cost_tracker,
tracer: AITracer,
metrics: AIMetrics,
):
self.prompts = prompt_registry
self.context = context_manager
self.rag = rag_engine
self.llm = llm_service
self.costs = cost_tracker
self.tracer = tracer
self.metrics = metrics

async def handle_chat(
self,
tenant_id: str,
user_id: str,
request_id: str,
conversation_id: Optional[str],
user_message: str,
model: str = "claude-sonnet-4-6",
stream: bool = False,
) -> dict:
"""
Full request handler with all production concerns addressed.
"""
trace_id = request_id

async with self.tracer.start_span("chat_request", trace_id=trace_id):

# 1. Load conversation context
async with self.tracer.start_span("load_context", trace_id=trace_id):
ctx = await self.context.get_context(
conversation_id or str(uuid.uuid4()),
tenant_id,
)

# 2. RAG retrieval (parallel with context load)
async with self.tracer.start_span("rag_retrieval", trace_id=trace_id):
chunks = await self.rag.retrieve(
query=user_message,
tenant_id=tenant_id,
top_k=10,
rerank_top_n=5,
)
rag_context = self.rag.build_context_string(chunks)

# 3. Build system prompt from registry
async with self.tracer.start_span("build_prompt", trace_id=trace_id):
import datetime
system_prompt = await self.prompts.get_prompt(
"base_assistant",
variables={
"company_name": "Acme Corp",
"current_date": datetime.date.today().isoformat(),
},
)
if rag_context:
system_prompt += f"\n\n{rag_context}"

# 4. Build message history within token budget
messages, est_tokens = self.context.build_messages_for_llm(
context=ctx,
new_user_message=user_message,
max_context_tokens=32000,
system_prompt=system_prompt,
)

# 5. LLM call
async with self.tracer.start_span(
"llm_call",
trace_id=trace_id,
attributes={"model": model, "estimated_tokens": est_tokens},
):
response = await self.llm.complete(
request_id=request_id,
model=model,
system=system_prompt,
messages=messages,
max_tokens=2048,
)

# 6. Record cost
self.costs.record_usage(
tenant_id=tenant_id,
model=response.model,
input_tokens=response.input_tokens,
output_tokens=response.output_tokens,
)

# 7. Save conversation turn
await self.context.save_turn(
conversation_id=conversation_id or "new",
tenant_id=tenant_id,
user_message=user_message,
assistant_response=response.content,
tokens_used=response.input_tokens + response.output_tokens,
)

return {
"response": response.content,
"conversation_id": conversation_id,
"model": response.model,
"usage": {
"input_tokens": response.input_tokens,
"output_tokens": response.output_tokens,
},
"sources": [
{"source": c.source, "score": round(c.similarity_score, 3)}
for c in chunks
],
"latency_ms": response.latency_ms,
}

Deployment Architecture


Production Engineering Notes

Prompt versioning is non-negotiable. You will change system prompts. Treat them like database migrations: every change gets a version, changes are reviewed, rollback is one command. Store prompt versions in your database, not in code.

Scale LLM calls with async, not more servers. LLM calls are I/O-bound (waiting on network). Use AsyncAnthropic with async frameworks (FastAPI + asyncio). A single Python process can handle thousands of concurrent LLM calls this way - you don't need to horizontally scale until you hit CPU/memory limits.

Context assembly is your reliability lever. The most impactful reliability improvement in most AI products is better context management: trimming stale history, summarizing long conversations, deduplicating retrieved chunks. Better context → better responses without touching the model.

Separate the read and write paths. User-facing requests (read: get response) should never wait on write operations (save conversation, update analytics). Write operations belong in async background tasks or fire-and-forget queues.

Health checks for LLM services. Your kubernetes readiness probe should make a real (but cheap) LLM call. A service that can't reach the Anthropic API is not ready to serve traffic, even if the process is running.

:::tip Canary Deployments for Prompt Changes Before rolling out a new prompt version to all traffic, run it on 5% of requests. Compare: response length distribution, user satisfaction signals (thumbs up/down), refusal rates, hallucination detection scores. Roll forward only if all metrics are equal or better. :::

:::warning Avoid Synchronous LLM Calls in Request Handlers If your web framework's request handler makes a synchronous requests.post() call to the LLM API, you're blocking a thread per request. At 100 concurrent users with 10-second LLM latency, you need 1,000 blocked threads. Use AsyncAnthropic + async request handlers. :::

:::danger Model Version Pinning Always pin to a specific model version (e.g., claude-sonnet-4-6) in production, not a floating alias. Model providers update aliases, which means your system's behavior changes without any code change on your side. This has broken production AI products. Pin versions, test new versions in staging first. :::


Interview Questions

Q: Design the architecture for a production AI chat product serving 100K users/day. What are the critical components?

The critical components: (1) API gateway with auth and rate limiting, (2) prompt registry for versioned system prompts, (3) context manager for conversation history with token budget enforcement, (4) RAG engine for knowledge retrieval, (5) async LLM service with retry/fallback, (6) per-tenant cost tracker, (7) distributed tracing for observability, and (8) automated eval pipeline to catch regressions. The non-obvious one is the eval pipeline - most teams skip it and discover prompt regressions from angry users.

Q: How do you version and deploy system prompt changes safely?

Treat prompts like database migrations. Store all versions in a prompt_templates table with version strings (semantic versioning). Deploy changes through a pipeline: write new version → run eval suite against it → canary at 5% traffic → monitor metrics (satisfaction rate, refusal rate, response length) → full rollout or rollback. Never deploy a prompt change without first running your eval suite. A/B testing at the prompt level is critical for data-driven iteration.

Q: How do you debug "the AI gave a wrong answer" for a specific user request from last week?

This requires structured audit logging on every request. Each LLM call must log: request_id, tenant_id, user_id, timestamp, system_prompt_version, model, messages sent, response received, tokens used, and latency. With this log, you can replay the exact request to reproduce the issue, compare against the current prompt version to identify regressions, and trace the retrieved RAG chunks to identify a knowledge base error. Without this log, debugging past AI behavior is nearly impossible.

Q: How do you handle LLM provider outages in a production AI product?

Multiple layers: (1) Exponential backoff with jitter for transient errors. (2) Fallback model chain: if claude-opus-4-6 fails, try claude-sonnet-4-6, then claude-haiku-4-5-20251001. (3) Circuit breaker: after N consecutive failures, open the circuit and return cached responses or a graceful degradation message. (4) If budget allows: multi-provider failover (Anthropic primary, OpenAI secondary). (5) For async workloads: queue messages and process when the provider recovers. Always have a graceful degradation strategy - returning a meaningful error message is better than a timeout.

Q: What's the difference between p50 and p99 latency, and why does it matter for AI products?

p50 (median) is the typical user experience. p99 is what 1 in 100 users experiences. For AI products with 100K daily users, p99 = 1,000 users getting that experience daily. LLM latency distributions are highly skewed - p50 might be 2 seconds but p99 might be 25 seconds. This happens because: long outputs take longer, model retries add latency, and RAG retrieval can be slow for cold cache. Monitor p99 separately from p50. SLAs should be defined on p99. Common fix: implement streaming (users see tokens immediately, even if total completion takes 15 seconds) and set aggressive timeout thresholds.

Q: How do you scale an AI service horizontally?

LLM services are I/O-bound - the bottleneck is waiting on the LLM API, not CPU. Start with async within a single process: AsyncAnthropic + FastAPI + asyncio can handle hundreds of concurrent calls. Scale horizontally when CPU or memory becomes the bottleneck (usually from context assembly, embedding computation, or reranking). Use stateless AI service instances behind a load balancer - conversation state lives in the database, not in-process. Scale the vector store independently from the AI service. The embedding service and reranker can be separate pods that auto-scale on CPU.

© 2026 EngineersOfAI. All rights reserved.