

LLM-Powered Product Architecture

The Production Reality Check

A team ships an LLM-powered customer support assistant to beta. It works beautifully in demos: the model understands customer questions, retrieves relevant documentation, and generates clear, helpful responses. Beta launch is a success. The decision is made to scale to 100,000 daily active users.

The engineering team runs the numbers. At 100,000 DAU with an average of 5 messages per session, that is 500,000 messages per day. Each message involves an API call to GPT-4 with a 2,000-token prompt and a 500-token response. At $30 per million input tokens and $60 per million output tokens: input cost is $30 × (500,000 × 2,000 / 1,000,000) = $30,000/day. Output cost is $60 × (500,000 × 500 / 1,000,000) = $15,000/day. Total: $45,000/day. That is $1.35 million per month.

The current product revenue is $200,000 per month. The LLM cost alone is 6.75x the revenue.
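The arithmetic above is easy to re-run for other traffic or pricing assumptions. The following is a minimal sketch; the function name `daily_llm_cost` is hypothetical, and the traffic, token counts, and prices are the ones quoted in this scenario.

```python
def daily_llm_cost(messages_per_day, input_tokens, output_tokens,
                   input_price_per_m, output_price_per_m):
    """Return (input_cost, output_cost) in USD per day.

    Prices are expressed in USD per million tokens.
    """
    input_cost = messages_per_day * input_tokens / 1_000_000 * input_price_per_m
    output_cost = messages_per_day * output_tokens / 1_000_000 * output_price_per_m
    return input_cost, output_cost


# Scenario from the text: 100,000 DAU x 5 messages per session
messages_per_day = 100_000 * 5
input_cost, output_cost = daily_llm_cost(
    messages_per_day, input_tokens=2_000, output_tokens=500,
    input_price_per_m=30.0, output_price_per_m=60.0,
)
daily_total = input_cost + output_cost
monthly_total = daily_total * 30           # 30-day month
revenue_ratio = monthly_total / 200_000    # monthly revenue from the text

print(f"input ${input_cost:,.0f}/day, output ${output_cost:,.0f}/day")
print(f"total ${daily_total:,.0f}/day, ${monthly_total:,.0f}/month")
print(f"LLM cost is {revenue_ratio:.2f}x monthly revenue")
```

Changing any single input (for example, halving the prompt length) shows immediately how sensitive the bill is to prompt size versus response size.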

This is not a hypothetical. This is the situation that most LLM product teams face when they move from prototype to production at scale. The question is not "how do we get LLMs to work?" - they work in demos. The question is "how do we build a production system around LLMs that is reliable, observable, and economically viable at scale?"

This lesson answers that question. It covers the full production LLM stack: the serving infrastructure, prompt management, RAG for knowledge grounding, multi-LLM routing for cost control, streaming for user experience, and the observability stack that lets you actually understand what is happening at scale.


Requirements

Functional requirements:

  • LLM-powered customer support: answer product questions, troubleshoot issues, escalate complex cases to humans
  • Ground responses in company knowledge base (documentation, FAQ, tickets)
  • Support conversation context (multi-turn conversations)
  • Escalate to human agents when confidence is low or topic is out of scope

Non-functional requirements:

  • Latency: time to first token under 1 second; full response under 10 seconds
  • Throughput: 1,000 concurrent conversations
  • Cost: under $0.10 per customer conversation (to be economically viable)
  • Reliability: 99.9% uptime; graceful degradation when LLM provider is unavailable
  • Observability: log every LLM interaction with sufficient detail to debug quality issues
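To make the cost target concrete, the sketch below encodes the non-functional requirements as a small config object and checks the prototype against the $0.10 per-conversation budget. The `Requirements` class and `conversation_cost_usd` helper are hypothetical names, and the check assumes one session (5 messages at GPT-4 prices, as in the opening scenario) counts as one conversation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Requirements:
    """Non-functional targets from the list above."""
    max_ttft_seconds: float = 1.0
    max_full_response_seconds: float = 10.0
    concurrent_conversations: int = 1_000
    max_cost_per_conversation_usd: float = 0.10
    uptime_target: float = 0.999


def conversation_cost_usd(messages, input_tokens, output_tokens,
                          input_price_per_m, output_price_per_m):
    """Cost of one conversation; prices are USD per million tokens."""
    per_message = (
        input_tokens * input_price_per_m + output_tokens * output_price_per_m
    ) / 1_000_000
    return messages * per_message


reqs = Requirements()
# Assumption: 5 messages, 2,000 input and 500 output tokens each, $30/$60 per million
baseline = conversation_cost_usd(5, 2_000, 500, 30.0, 60.0)
over_budget_factor = baseline / reqs.max_cost_per_conversation_usd

print(f"baseline cost per conversation: ${baseline:.2f}")
print(f"over budget by {over_budget_factor:.1f}x")
```

Under these assumptions the unoptimized prototype costs several times the target per conversation, which is the gap that routing, caching, and prompt trimming have to close.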

The LLM Serving Stack


RAG Architecture: Knowledge-Grounded Responses

RAG (Retrieval-Augmented Generation) grounds LLM responses in your actual knowledge base, which reduces hallucination and keeps responses accurate and current.

import hashlib
from collections import defaultdict
from typing import Optional

import faiss
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from sentence_transformers import SentenceTransformer


class ProductionRAGPipeline:
    """
    Production RAG pipeline with hybrid retrieval:
    - Dense semantic search (sentence-transformers + FAISS)
    - Sparse keyword search (BM25 via Elasticsearch)
    - Fusion via Reciprocal Rank Fusion

    Cross-encoder reranking is a recommended final step for precision,
    but it is not implemented in this class.
    """

    def __init__(
        self,
        embedding_model: str = "BAAI/bge-base-en-v1.5",
        es_host: str = "http://localhost:9200",  # recent clients require the scheme
        index_name: str = "knowledge_base",
        top_k_dense: int = 20,
        top_k_sparse: int = 20,
        top_k_final: int = 5,
    ):
        self.encoder = SentenceTransformer(embedding_model)
        self.es = Elasticsearch([es_host])
        self.index_name = index_name
        self.top_k_dense = top_k_dense
        self.top_k_sparse = top_k_sparse
        self.top_k_final = top_k_final
        self.faiss_index = None
        self.chunk_store: dict = {}

    def index_documents(self, documents: list) -> None:
        """
        Index documents into both the vector index and Elasticsearch.
        documents: [{"id": ..., "content": ..., "metadata": {...}}]
        """
        # Chunk documents into smaller segments
        chunks = self._chunk_documents(documents)

        # Encode and build the FAISS index. Embeddings are L2-normalized,
        # so inner product (IndexFlatIP) equals cosine similarity.
        texts = [c["content"] for c in chunks]
        embeddings = self.encoder.encode(
            texts, batch_size=64, show_progress_bar=True,
            normalize_embeddings=True,
        ).astype("float32")

        self.faiss_index = faiss.IndexFlatIP(embeddings.shape[1])
        self.faiss_index.add(embeddings)

        # Store chunks for retrieval: FAISS row i maps to chunk_store[i].
        # The store is rebuilt together with the index so the two stay aligned.
        self.chunk_store = dict(enumerate(chunks))

        # Index into Elasticsearch for sparse retrieval
        actions = [
            {
                "_index": self.index_name,
                "_id": chunk["chunk_id"],
                "_source": {
                    "content": chunk["content"],
                    "doc_id": chunk["doc_id"],
                    "chunk_idx": i,
                    "metadata": chunk["metadata"],
                },
            }
            for i, chunk in enumerate(chunks)
        ]
        bulk(self.es, actions)
        print(f"[RAG] Indexed {len(chunks)} chunks")

    def retrieve(
        self,
        query: str,
        filters: Optional[dict] = None,
    ) -> list:
        """
        Hybrid retrieval: dense + sparse, fused with RRF.
        Returns top_k_final chunks sorted by relevance.

        Note: `filters` is accepted for future metadata filtering but is
        not applied yet.
        """
        if self.faiss_index is None:
            raise RuntimeError("Call index_documents() before retrieve()")

        # Dense retrieval
        query_emb = self.encoder.encode(
            [query], normalize_embeddings=True
        ).astype("float32")
        dense_scores, dense_indices = self.faiss_index.search(
            query_emb, self.top_k_dense
        )
        dense_results = [
            (int(idx), float(score))
            for idx, score in zip(dense_indices[0], dense_scores[0])
            if idx >= 0  # FAISS pads with -1 when fewer than k vectors exist
        ]

        # Sparse retrieval (keyword arguments instead of the deprecated `body`)
        es_response = self.es.search(
            index=self.index_name,
            size=self.top_k_sparse,
            query={
                "multi_match": {
                    "query": query,
                    "fields": ["content^2", "metadata.title"],
                }
            },
        )
        sparse_results = [
            (int(hit["_source"]["chunk_idx"]), float(hit["_score"]))
            for hit in es_response["hits"]["hits"]
        ]

        # Reciprocal Rank Fusion
        fused = self._reciprocal_rank_fusion(dense_results, sparse_results)

        # Return top chunks with content
        top_chunks = []
        for chunk_idx, rrf_score in fused[:self.top_k_final]:
            chunk = self.chunk_store.get(chunk_idx, {})
            if chunk:
                top_chunks.append({
                    "content": chunk["content"],
                    "score": rrf_score,
                    "source": chunk.get("metadata", {}).get("source", "unknown"),
                })

        return top_chunks

    def build_context(self, retrieved_chunks: list) -> str:
        """Format retrieved chunks as context for the LLM prompt."""
        if not retrieved_chunks:
            return ""
        context_parts = []
        for i, chunk in enumerate(retrieved_chunks, 1):
            context_parts.append(
                f"[Source {i}: {chunk['source']}]\n{chunk['content']}"
            )
        return "\n\n---\n\n".join(context_parts)

    def _chunk_documents(self, documents: list, chunk_size: int = 512) -> list:
        """Split documents into overlapping chunks (sizes are in words)."""
        chunks = []
        overlap = 50  # 50-word overlap between consecutive chunks
        step = chunk_size - overlap
        for doc in documents:
            words = doc["content"].split()

            for i in range(0, max(1, len(words) - overlap), step):
                chunk_text = " ".join(words[i:i + chunk_size])
                # Deterministic ID so re-indexing overwrites rather than duplicates
                chunk_id = hashlib.md5(
                    f"{doc['id']}_{i}".encode()
                ).hexdigest()
                chunks.append({
                    "chunk_id": chunk_id,
                    "doc_id": doc["id"],
                    "content": chunk_text,
                    "metadata": doc.get("metadata", {}),
                })
        return chunks

    def _reciprocal_rank_fusion(
        self,
        dense_results: list,
        sparse_results: list,
        k: int = 60,
    ) -> list:
        """Fuse two ranked lists: score(d) = sum over lists of 1 / (k + rank)."""
        scores: dict = defaultdict(float)
        # enumerate() is 0-based, so rank + 1 is the 1-based rank
        for rank, (idx, _) in enumerate(dense_results):
            scores[idx] += 1.0 / (k + rank + 1)
        for rank, (idx, _) in enumerate(sparse_results):
            scores[idx] += 1.0 / (k + rank + 1)
        return sorted(scores.items(), key=lambda x: -x[1])
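The pipeline above stops at rank fusion. A reranking stage could be added along the following lines. This is a sketch, not part of the original class: `rerank` and `toy_overlap_score` are hypothetical names, and the scorer is injected so the example runs without downloading a model. In practice `score_fn` would typically be the `predict` method of a sentence-transformers `CrossEncoder`, which scores a list of (query, passage) pairs.

```python
from typing import Callable, Sequence


def rerank(
    query: str,
    chunks: list,
    score_fn: Callable[[Sequence[tuple]], Sequence[float]],
    top_k: int = 5,
) -> list:
    """Re-score fused candidates against the query and keep the best top_k."""
    if not chunks:
        return []
    pairs = [(query, chunk["content"]) for chunk in chunks]
    scores = score_fn(pairs)
    ranked = sorted(zip(chunks, scores), key=lambda cs: cs[1], reverse=True)
    return [
        {**chunk, "rerank_score": float(score)}
        for chunk, score in ranked[:top_k]
    ]


def toy_overlap_score(pairs):
    """Stand-in scorer for the demo: fraction of query words found in the passage."""
    scores = []
    for query, passage in pairs:
        query_words = set(query.lower().split())
        passage_words = set(passage.lower().split())
        scores.append(len(query_words & passage_words) / max(len(query_words), 1))
    return scores


candidates = [
    {"content": "Shipping takes three to five business days.", "source": "faq"},
    {"content": "To reset your password, open Settings and choose Reset password.",
     "source": "docs"},
    {"content": "Our office is closed on public holidays.", "source": "faq"},
]
top = rerank("how do i reset my password", candidates, toy_overlap_score, top_k=2)
for chunk in top:
    print(chunk["source"], round(chunk["rerank_score"], 3))
```

Retrieving a wider candidate set (for example 20 fused chunks) and reranking down to 5 keeps the expensive pairwise scoring bounded while improving the precision of what reaches the prompt.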

Prompt Management

Prompts are code. They must be versioned, tested, and deployed with the same rigor as model artifacts.

from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Optional
import json


@dataclass
class PromptVersion:
    """A versioned prompt template."""
    prompt_id: str
    version: str        # semantic versioning: "1.2.3"
    template: str       # the prompt with {placeholders}
    variables: list     # list of required template variables
    model: str          # intended model: "gpt-4o", "claude-3-5-sonnet"
    max_tokens: int
    temperature: float
    created_by: str
    created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    metadata: dict = field(default_factory=dict)

    def render(self, **kwargs) -> str:
        """Render the template with provided variables."""
        missing = [v for v in self.variables if v not in kwargs]
        if missing:
            raise ValueError(f"Missing required variables: {missing}")
        return self.template.format(**kwargs)


class PromptRegistry:
    """
    Central registry for versioned prompts.
    Supports A/B testing between prompt versions.
    """

    CUSTOMER_SUPPORT_PROMPT = """You are a helpful customer support assistant for {company_name}.

## Your Role
Answer customer questions accurately based on the provided knowledge base context.
If the answer is not in the context, say "I don't have information on that" and offer to connect them with a human agent.

## Knowledge Base Context
{context}

## Conversation History
{conversation_history}

## Customer Question
{user_message}

## Instructions
- Use ONLY information from the Knowledge Base Context above
- If the context doesn't contain the answer, say so clearly
- Keep responses concise and actionable (under 200 words)
- If the customer seems frustrated or the issue is complex, offer to escalate
- Do not make up product features, prices, or policies

Response:"""

    def __init__(self, storage_client):
        # storage_client: any key-value store exposing get(key) and set(key, value)
        self.storage = storage_client
        self._cache: dict = {}

    def register(self, prompt: PromptVersion) -> None:
        """Register a new prompt version and mark it as the latest."""
        payload = json.dumps(asdict(prompt))
        key = f"{prompt.prompt_id}/{prompt.version}"
        latest_key = f"{prompt.prompt_id}/latest"
        self.storage.set(key, payload)
        # get() defaults to version="latest", so keep that alias up to date
        self.storage.set(latest_key, payload)
        self._cache.pop(latest_key, None)
        print(f"[PromptRegistry] Registered {key}")

    def get(
        self,
        prompt_id: str,
        version: str = "latest",
    ) -> Optional[PromptVersion]:
        """Retrieve a prompt version, or None if it does not exist."""
        cache_key = f"{prompt_id}/{version}"
        if cache_key in self._cache:
            return self._cache[cache_key]

        data = self.storage.get(cache_key)
        if not data:
            return None

        prompt = PromptVersion(**json.loads(data))
        self._cache[cache_key] = prompt
        return prompt
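The registry docstring mentions A/B testing between prompt versions. One common way to split traffic is deterministic hash bucketing, sketched below. The function `assign_prompt_version`, the experiment ID, and the version strings are hypothetical; the point is that a given user always lands in the same arm without any stored assignment.

```python
import hashlib


def assign_prompt_version(
    user_id: str,
    experiment_id: str,
    control: str = "1.0.0",
    treatment: str = "1.1.0",
    treatment_pct: float = 5.0,
) -> str:
    """Deterministically assign a user to the control or treatment prompt version."""
    digest = hashlib.sha256(f"{experiment_id}:{user_id}".encode()).hexdigest()
    bucket = int(digest[:8], 16) % 10_000   # 0..9999, i.e. 0.01% granularity
    return treatment if bucket < treatment_pct * 100 else control


# The same user always gets the same version within an experiment
first = assign_prompt_version("user-42", "support-prompt-exp")
second = assign_prompt_version("user-42", "support-prompt-exp")
print(first, second)

# Observed treatment share across many users at a 50/50 split
users = [f"user-{i}" for i in range(10_000)]
share = sum(
    assign_prompt_version(u, "support-prompt-exp", treatment_pct=50.0) == "1.1.0"
    for u in users
) / len(users)
print(f"treatment share: {share:.3f}")
```

Including the experiment ID in the hash means a user's bucket in one experiment is independent of their bucket in the next one.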

Multi-LLM Routing: Cost-Effective Model Selection

Not every conversation turn needs GPT-4. Simple factual questions can be answered by a cheaper, faster model. Complex reasoning, ambiguous queries, and emotional support conversations need the best model.

import re
from enum import Enum
from dataclasses import dataclass


class ModelTier(Enum):
    # Per-token prices for each tier live in LLMRouter.MODEL_CONFIG
    CHEAP = "cheap"        # e.g. GPT-4o-mini, Claude Haiku
    STANDARD = "standard"  # e.g. GPT-4o, Claude Sonnet
    PREMIUM = "premium"    # e.g. o1, Claude Opus


@dataclass
class RoutingDecision:
    model: str
    tier: ModelTier
    reason: str
    estimated_cost_per_1k_tokens: float  # input-token price only


class LLMRouter:
    """
    Routes requests to the appropriate LLM based on complexity,
    cost targets, and fallback logic.
    """

    # Approximate prices in USD per 1,000 tokens; check current provider pricing
    MODEL_CONFIG = {
        ModelTier.CHEAP: {
            "model": "gpt-4o-mini",
            "cost_per_1k_input": 0.00015,
            "cost_per_1k_output": 0.0006,
        },
        ModelTier.STANDARD: {
            "model": "gpt-4o",
            "cost_per_1k_input": 0.0025,
            "cost_per_1k_output": 0.01,
        },
        ModelTier.PREMIUM: {
            "model": "o1",
            "cost_per_1k_input": 0.015,
            "cost_per_1k_output": 0.06,
        },
    }

    # Keywords that suggest complexity requiring a better model
    COMPLEX_KEYWORDS = [
        "why", "explain", "how does", "compare", "analyze",
        "help me understand", "not working", "broken", "error",
        "urgent", "cancel", "refund", "frustrated", "angry",
    ]

    # Indicators of simple queries that the cheap model can handle
    SIMPLE_PATTERNS = [
        r"what is your (hours|price|address|email|phone)",
        r"how do i (reset|change|update) (my )?password",
        r"what are (your )?shipping (times|costs|options)",
        r"(what|where) is my order (status|number)",
    ]

    def route(
        self,
        user_message: str,
        conversation_turn: int,
        user_tier: str = "standard",  # "free", "standard", "enterprise"
    ) -> RoutingDecision:
        """
        Determine which LLM tier to use for this message.
        Rules are checked in order; the first match wins.
        """
        message_lower = user_message.lower()

        # Enterprise users always get standard or better
        if user_tier == "enterprise":
            return self._decide(ModelTier.STANDARD, "enterprise_user")

        # Check for simple pattern match
        for pattern in self.SIMPLE_PATTERNS:
            if re.search(pattern, message_lower):
                return self._decide(ModelTier.CHEAP, "simple_query_pattern")

        # Check for complexity signals (substring match, at least two hits)
        complex_signals = sum(
            1 for kw in self.COMPLEX_KEYWORDS if kw in message_lower
        )
        if complex_signals >= 2:
            return self._decide(ModelTier.STANDARD, "complex_query")

        # Long messages tend to be more complex
        if len(user_message.split()) > 100:
            return self._decide(ModelTier.STANDARD, "long_message")

        # Later turns in conversation often need context synthesis
        if conversation_turn > 5:
            return self._decide(ModelTier.STANDARD, "deep_conversation")

        # Default to cheap model for everything else
        return self._decide(ModelTier.CHEAP, "default_cheap")

    def _decide(self, tier: ModelTier, reason: str) -> RoutingDecision:
        config = self.MODEL_CONFIG[tier]
        return RoutingDecision(
            model=config["model"],
            tier=tier,
            reason=reason,
            estimated_cost_per_1k_tokens=config["cost_per_1k_input"],
        )

Streaming Responses: Token-by-Token UX

Users tolerate LLM latency better when they see tokens stream in immediately, rather than waiting for the full response. Implement Server-Sent Events (SSE) for streaming.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import json


app = FastAPI()
openai_client = AsyncOpenAI()
anthropic_client = AsyncAnthropic()


class LLMStreamingService:
    """Handles streaming responses from multiple LLM providers."""

    async def stream_openai(
        self,
        model: str,
        messages: list,
        max_tokens: int = 1000,
    ):
        """Stream text deltas from the OpenAI Chat Completions API."""
        stream = await openai_client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=max_tokens,
            temperature=0.3,
            stream=True,
        )
        async for chunk in stream:
            # Some chunks carry no choices or an empty delta; skip those
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    async def stream_anthropic(
        self,
        model: str,
        system_prompt: str,
        messages: list,
        max_tokens: int = 1000,
    ):
        """Stream text deltas from the Anthropic Messages API."""
        async with anthropic_client.messages.stream(
            model=model,
            max_tokens=max_tokens,
            system=system_prompt,
            messages=messages,
            temperature=0.3,
        ) as stream:
            async for text in stream.text_stream:
                yield text


streaming_service = LLMStreamingService()
router = LLMRouter()
# Build the pipeline once at startup and call rag.index_documents(...) before
# serving. Constructing it per request would reload the embedding model and
# leave the FAISS index empty.
rag = ProductionRAGPipeline()


@app.post("/chat/stream")
async def chat_stream(request: dict):
    """
    SSE endpoint for streaming chat responses.
    Client connects and receives tokens as they are generated.
    """
    user_message = request["message"]
    conversation_id = request["conversation_id"]  # used for tracing and logging

    # 1. Router decides which model to use
    routing = router.route(user_message, request.get("turn", 1))

    # 2. RAG retrieval
    chunks = rag.retrieve(user_message)
    context = rag.build_context(chunks)

    # 3. Build prompt
    system_prompt = f"""You are a customer support assistant.
Use the following context to answer questions:

{context}

If the answer is not in the context, say so clearly."""

    messages = request.get("history", []) + [
        {"role": "user", "content": user_message}
    ]

    async def generate():
        """Generate SSE events with tokens."""
        full_response = ""  # kept so the complete answer can be traced afterwards
        token_count = 0     # counts streamed chunks, which approximates tokens

        try:
            # OpenAI model families go to the OpenAI client, everything else to Anthropic
            if routing.model.startswith(("gpt-", "o1")):
                token_stream = streaming_service.stream_openai(
                    model=routing.model,
                    messages=[{"role": "system", "content": system_prompt}] + messages,
                )
            else:
                token_stream = streaming_service.stream_anthropic(
                    model=routing.model,
                    system_prompt=system_prompt,
                    messages=messages,
                )

            async for token in token_stream:
                full_response += token
                token_count += 1

                # Send each token as an SSE event
                event_data = json.dumps({"token": token, "done": False})
                yield f"data: {event_data}\n\n"

            # Send completion event
            done_data = json.dumps({
                "token": "",
                "done": True,
                "model_used": routing.model,
                "token_count": token_count,
            })
            yield f"data: {done_data}\n\n"

        except Exception as e:
            # Surface the failure as a final event so the client can stop waiting
            error_data = json.dumps({
                "error": str(e),
                "done": True,
            })
            yield f"data: {error_data}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable nginx buffering
        },
    )
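On the receiving side, a client has to split the stream into `data:` events and stitch the tokens back together. The following sketch shows that parsing for the event shape emitted by the endpoint above (`token`, `done`, and optionally `error`). The helper names `parse_sse_events` and `assemble_response` are hypothetical, and the sample payload stands in for a real HTTP response body.

```python
import json


def parse_sse_events(raw: str) -> list:
    """Parse a raw SSE body into a list of decoded JSON payloads."""
    events = []
    for block in raw.split("\n\n"):          # events are separated by a blank line
        for line in block.splitlines():
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events


def assemble_response(events: list) -> dict:
    """Concatenate streamed tokens and report completion or error state."""
    text = "".join(e.get("token", "") for e in events if "error" not in e)
    done = any(e.get("done") for e in events)
    error = next((e["error"] for e in events if "error" in e), None)
    return {"text": text, "done": done, "error": error}


sample = (
    'data: {"token": "Hel", "done": false}\n\n'
    'data: {"token": "lo", "done": false}\n\n'
    'data: {"token": "", "done": true, "model_used": "gpt-4o-mini", "token_count": 2}\n\n'
)
result = assemble_response(parse_sse_events(sample))
print(result)
```

A real client would process events incrementally as bytes arrive rather than parsing the whole body at once; the event format is the same either way.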

Cost Management

from datetime import datetime, timezone

from prometheus_client import Counter, Histogram


# LLM cost tracking
LLM_INPUT_TOKENS = Counter(
    "llm_input_tokens_total",
    "Total input tokens consumed",
    labelnames=["model", "user_tier", "feature"],
)

LLM_OUTPUT_TOKENS = Counter(
    "llm_output_tokens_total",
    "Total output tokens generated",
    labelnames=["model", "user_tier", "feature"],
)

LLM_COST_USD = Counter(
    "llm_cost_usd_total",
    "Total LLM cost in USD",
    labelnames=["model", "user_tier", "feature"],
)

LLM_LATENCY = Histogram(
    "llm_response_latency_seconds",
    "Time to first token from LLM",
    labelnames=["model"],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0],
)


class CostTracker:
    """Track per-user LLM spend and enforce daily cost limits."""

    # Cost in USD per 1,000 tokens (approximate)
    COST_MAP = {
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "gpt-4o": {"input": 0.0025, "output": 0.01},
        "claude-3-haiku-20240307": {"input": 0.00025, "output": 0.00125},
        "claude-3-5-sonnet-20241022": {"input": 0.003, "output": 0.015},
    }

    # Only the daily limit is enforced by check_limit() below
    LIMITS = {
        "free": {"daily_cost_usd": 0.10, "monthly_cost_usd": 1.0},
        "standard": {"daily_cost_usd": 1.0, "monthly_cost_usd": 10.0},
        "enterprise": {"daily_cost_usd": 100.0, "monthly_cost_usd": 1000.0},
    }

    def __init__(self, redis_client):
        self.redis = redis_client

    @staticmethod
    def _daily_key(user_id: str) -> str:
        # Use UTC so the daily window does not depend on server timezone
        today = datetime.now(timezone.utc).date().isoformat()
        return f"llm_cost:user:{user_id}:{today}"

    def compute_cost(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
    ) -> float:
        # Unknown models fall back to a conservative default price
        model_costs = self.COST_MAP.get(model, {"input": 0.001, "output": 0.002})
        cost = (
            input_tokens / 1000 * model_costs["input"]
            + output_tokens / 1000 * model_costs["output"]
        )
        return cost

    def record_usage(
        self,
        user_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        user_tier: str = "standard",
        feature: str = "chat",
    ) -> float:
        cost = self.compute_cost(model, input_tokens, output_tokens)

        # Prometheus metrics
        LLM_INPUT_TOKENS.labels(model=model, user_tier=user_tier, feature=feature).inc(input_tokens)
        LLM_OUTPUT_TOKENS.labels(model=model, user_tier=user_tier, feature=feature).inc(output_tokens)
        LLM_COST_USD.labels(model=model, user_tier=user_tier, feature=feature).inc(cost)

        # Per-user daily counter in Redis
        daily_key = self._daily_key(user_id)
        self.redis.incrbyfloat(daily_key, cost)
        self.redis.expire(daily_key, 86400 * 2)  # keep 2 days

        return cost

    def check_limit(self, user_id: str, user_tier: str = "standard") -> bool:
        """Returns True if user is within their daily cost limit."""
        daily_cost = float(self.redis.get(self._daily_key(user_id)) or 0.0)
        limit = self.LIMITS.get(user_tier, self.LIMITS["standard"])
        return daily_cost < limit["daily_cost_usd"]

LLM Observability

You cannot improve what you cannot observe. LLM systems need more than uptime monitoring - you need to track response quality, token usage patterns, model selection distribution, and user satisfaction signals.

import hashlib
import json
import uuid
from datetime import datetime, timezone


class LLMTracer:
    """
    Full-fidelity tracing for LLM interactions.
    Stores every interaction with enough detail to:
    - Debug quality issues
    - Compute cost attribution
    - Build training data for fine-tuning
    - Monitor for policy violations
    """

    def __init__(self, storage_backend):
        self.storage = storage_backend

    def trace_interaction(
        self,
        conversation_id: str,
        user_id: str,
        user_message: str,
        system_prompt: str,
        retrieved_chunks: list,
        model: str,
        model_response: str,
        input_tokens: int,
        output_tokens: int,
        latency_seconds: float,
        routing_reason: str,
        user_tier: str,
    ) -> str:
        """
        Record a complete LLM interaction.
        Returns trace_id for later retrieval.
        """
        trace_id = str(uuid.uuid4())

        trace = {
            "trace_id": trace_id,
            "conversation_id": conversation_id,
            "user_id": user_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),

            # Prompt context: the user message plus a hash and length of the
            # system prompt, which identifies the prompt without storing it twice
            "user_message": user_message,
            "system_prompt_hash": hashlib.md5(system_prompt.encode()).hexdigest(),
            "system_prompt_length": len(system_prompt),

            # RAG context
            "retrieved_chunk_count": len(retrieved_chunks),
            "retrieved_sources": [c.get("source") for c in retrieved_chunks],

            # Model selection
            "model": model,
            "routing_reason": routing_reason,
            "user_tier": user_tier,

            # Response
            "response": model_response,
            "response_length": len(model_response),

            # Performance and cost (compute_cost does not touch Redis)
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "latency_seconds": latency_seconds,
            "estimated_cost_usd": CostTracker(redis_client=None).compute_cost(
                model, input_tokens, output_tokens
            ),
        }

        self.storage.write(trace_id, json.dumps(trace))
        return trace_id

    def add_feedback(
        self,
        trace_id: str,
        feedback_type: str,  # "thumbs_up", "thumbs_down", "escalated"
        feedback_text: str = "",
    ) -> None:
        """Attach user feedback to a trace. Used for quality monitoring."""
        feedback = {
            "trace_id": trace_id,
            "feedback_type": feedback_type,
            "feedback_text": feedback_text,
            "recorded_at": datetime.now(timezone.utc).isoformat(),
        }
        self.storage.append_feedback(trace_id, json.dumps(feedback))


class LLMQualityMonitor:
    """
    Automated quality monitoring for LLM responses.
    Flags refusals, very short responses, and possible hallucination
    (low lexical overlap between the response and the retrieved context).
    """

    def analyze_response_quality(
        self,
        user_message: str,
        model_response: str,
        retrieved_chunks: list,
    ) -> dict:
        """
        Quick automated quality checks.
        Results feed into monitoring dashboards and alert on degradation.
        """
        response_lower = model_response.lower()

        # Detect refusals (model refuses to answer)
        refusal_phrases = [
            "i cannot", "i'm unable to", "i don't have information",
            "i apologize", "as an ai", "i'm sorry, but",
        ]
        is_refusal = any(phrase in response_lower for phrase in refusal_phrases)

        # Detect very short responses (likely low quality)
        is_too_short = len(model_response.split()) < 20

        # Compare response vocabulary with the retrieved context
        source_count = len(retrieved_chunks)
        response_word_set = set(response_lower.split())
        context_words = set()
        for chunk in retrieved_chunks:
            context_words.update(chunk["content"].lower().split())

        # Overlap between response and context (low overlap = potential hallucination).
        # This is a cheap lexical heuristic, not a semantic faithfulness check.
        response_context_overlap = (
            len(response_word_set & context_words) / max(len(response_word_set), 1)
        )

        return {
            "is_refusal": is_refusal,
            "is_too_short": is_too_short,
            "response_context_overlap": response_context_overlap,
            "potential_hallucination": response_context_overlap < 0.15 and source_count > 0,
            "response_word_count": len(model_response.split()),
        }

Input and Output Guardrails

import re


class InputGuardrails:
    """Detect and handle problematic inputs before they reach the LLM."""

    PROMPT_INJECTION_PATTERNS = [
        r"ignore (all )?previous instructions",
        r"disregard (your )?system prompt",
        r"you are now (a )?different (ai|assistant|bot)",
        r"jailbreak",
        r"pretend (you are|to be)",
        r"act as (if )?you (are|were) (not|no longer)",
    ]

    # Shared PII patterns so detection and masking stay consistent
    CC_PATTERN = r"\b(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})[\s-]?(\d{4})\b"
    SSN_PATTERN = r"\b(\d{3})[-]?(\d{2})[-]?(\d{4})\b"
    # Note: "|" inside a character class is a literal pipe, so use [A-Za-z]
    EMAIL_PATTERN = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"

    def check_prompt_injection(self, text: str) -> bool:
        """Returns True if prompt injection is detected."""
        text_lower = text.lower()
        return any(
            re.search(pattern, text_lower)
            for pattern in self.PROMPT_INJECTION_PATTERNS
        )

    def detect_pii(self, text: str) -> dict:
        """Detect PII in user input for logging and compliance."""
        pii_found = {}

        # Credit card numbers
        if re.search(self.CC_PATTERN, text):
            pii_found["credit_card"] = True

        # SSN (also matches any bare 9-digit number, so expect false positives)
        if re.search(self.SSN_PATTERN, text):
            pii_found["ssn"] = True

        # Email
        if re.search(self.EMAIL_PATTERN, text):
            pii_found["email"] = True

        return pii_found

    def sanitize(self, text: str) -> str:
        """Mask PII before logging."""
        # Mask credit card numbers, keeping the first and last four digits
        text = re.sub(self.CC_PATTERN, r"\1-XXXX-XXXX-\4", text)
        # Mask SSN, keeping only the first three digits
        text = re.sub(self.SSN_PATTERN, r"\1-XX-XXXX", text)
        # Mask email addresses
        text = re.sub(self.EMAIL_PATTERN, "[EMAIL]", text)
        return text

:::danger Hallucination in RAG Systems

RAG reduces but does not eliminate hallucination. A common failure: the retrieved chunks do not contain the answer, but the LLM generates a plausible-sounding but incorrect answer anyway, as if the context supported it. Users treat LLM responses as authoritative, so a confident hallucination is worse than an honest "I don't know."

Solution: implement a faithfulness check. After generating a response, use a smaller LLM to verify that every claim in the response is supported by the retrieved context. Alternatively, use a citation-style prompt that requires the model to cite the specific source for each claim - claims without citations are flagged. When the faithfulness score is low, override the response with "I don't have reliable information on that" and offer human escalation. :::
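The citation-style variant of this check can be sketched with plain string handling. The example below assumes the prompt asks the model to tag each claim with a marker like `[Source 2]`, mirroring the source numbering in the context; that marker format and the function name `check_citations` are assumptions for illustration. It flags sentences with no citation and sentences citing a source number that was never provided.

```python
import re

CITATION = re.compile(r"\[Source (\d+)\]")


def check_citations(response: str, num_sources: int) -> dict:
    """Flag sentences that cite nothing or cite a source that was not retrieved."""
    sentences = [
        s.strip() for s in re.split(r"(?<=[.!?])\s+", response) if s.strip()
    ]
    uncited, invalid = [], []
    for sentence in sentences:
        cited = [int(n) for n in CITATION.findall(sentence)]
        if not cited:
            uncited.append(sentence)
        elif any(n < 1 or n > num_sources for n in cited):
            invalid.append(sentence)
    supported = len(sentences) - len(uncited) - len(invalid)
    score = supported / len(sentences) if sentences else 0.0
    return {"score": score, "uncited": uncited, "invalid": invalid}


answer = (
    "Refunds are issued within 5 business days [Source 1]. "
    "You can also get store credit instantly. "
    "Shipping is free worldwide [Source 7]."
)
report = check_citations(answer, num_sources=2)
print(report)

FALLBACK = "I don't have reliable information on that."
final_answer = answer if report["score"] >= 0.8 else FALLBACK  # threshold is an assumption
```

This only verifies that citations exist and point at real sources. Checking that the cited chunk actually supports the claim still needs a model-based faithfulness check.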

:::warning LLM Provider Outages

LLM API providers (OpenAI, Anthropic) have outages. Building a product that depends on a single provider with no fallback is a reliability risk. In October 2023, OpenAI experienced multiple service disruptions affecting thousands of businesses.

Solution: implement a provider fallback chain. If the primary provider (e.g., OpenAI) returns an error or exceeds your latency threshold, automatically retry with the secondary provider (e.g., Anthropic). For critical paths, maintain a self-hosted fallback model (Llama 3, Mistral) that degrades quality but maintains availability. Use circuit breakers: if OpenAI returns 5 errors in 10 seconds, open the circuit and route 100% to Anthropic until OpenAI recovers. :::
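A minimal sketch of the circuit breaker described above, using the "5 errors in 10 seconds" rule. The class name, the cooldown value, and the injectable `clock` parameter are assumptions made so the behaviour can be demonstrated without waiting in real time.

```python
import time
from collections import deque


class CircuitBreaker:
    """Opens after max_errors within window_seconds; retries after a cooldown."""

    def __init__(self, max_errors=5, window_seconds=10.0,
                 cooldown_seconds=30.0, clock=time.monotonic):
        self.max_errors = max_errors
        self.window_seconds = window_seconds
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self._errors = deque()    # timestamps of recent errors
        self._opened_at = None    # None means the circuit is closed

    def record_error(self) -> None:
        now = self.clock()
        self._errors.append(now)
        # Drop errors that fell out of the sliding window
        while self._errors and now - self._errors[0] > self.window_seconds:
            self._errors.popleft()
        if len(self._errors) >= self.max_errors:
            self._opened_at = now

    def record_success(self) -> None:
        # A successful call closes the circuit and clears the error history
        self._opened_at = None
        self._errors.clear()

    def allow_request(self) -> bool:
        if self._opened_at is None:
            return True
        # After the cooldown, let a trial request through (half-open state)
        return self.clock() - self._opened_at >= self.cooldown_seconds


# Demo with a fake clock so no real waiting is needed
fake_now = [0.0]
breaker = CircuitBreaker(clock=lambda: fake_now[0])
for _ in range(5):
    breaker.record_error()
    fake_now[0] += 1.0
print("allowed right after 5 errors:", breaker.allow_request())
fake_now[0] += 30.0
print("allowed after cooldown:", breaker.allow_request())
```

One breaker instance per provider is enough: the request path asks `allow_request()` before calling a provider and reports the outcome with `record_error()` or `record_success()`.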


Interview Q&A

Q1: How do you design a RAG system to minimize hallucination in an LLM product?

RAG reduces hallucination by grounding the LLM's response in retrieved documents. But the quality of the retrieval determines the quality of the grounding. Weak retrieval that returns irrelevant chunks can actually make hallucination worse - the model has context that does not answer the question, so it ignores the context and generates from its parameters.

The key design decisions: (1) Chunk size - chunks that are too large dilute relevance; chunks that are too small lose context. 256-512 tokens with 10-20% overlap is the starting point; tune based on your document type. (2) Hybrid retrieval - BM25 for exact keyword matches (model numbers, product names), dense retrieval for semantic similarity; fuse with RRF. (3) Reranking - use a cross-encoder on the top 20 retrieved chunks to select the top 5 for the prompt; this significantly improves precision. (4) Prompt design - explicitly instruct the model to cite its source and to say "I don't know" if the context does not support the answer. (5) Post-hoc faithfulness check - use a smaller model to verify each claim in the response against the retrieved context.


Q2: How do you implement cost-effective LLM routing in production?

LLM routing assigns incoming requests to different model tiers based on estimated complexity. The routing logic typically combines: message length (longer = more complex), keyword signals (explanatory keywords suggest reasoning required), conversation depth (turn 10 is more context-dependent than turn 1), and user tier (paying users get better models).

Start with simple heuristics and measure the quality difference. If GPT-4o-mini handles 70% of requests with quality indistinguishable from GPT-4o (as judged by thumbs up/down feedback), route 70% to mini and 30% to full. The cost reduction is immediate. Over time, you can train a classifier on historical (message, model_needed) pairs to make routing more accurate.

The numbers: GPT-4o-mini is approximately 16x cheaper than GPT-4o. If you can route 80% of traffic to mini with acceptable quality, your average cost per token drops to about 25% of the all-GPT-4o price (0.8 / 16 + 0.2 = 0.25), roughly a 4x reduction. On a $45,000/day LLM bill, this is about $33,750/day in savings.
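A quick way to sanity-check routing savings for any traffic split is to compute the blended cost directly. This sketch assumes the whole bill would otherwise be spent on the expensive tier and that both tiers see similar token counts per request; `blended_cost_fraction` is a hypothetical helper name.

```python
def blended_cost_fraction(cheap_share: float, price_ratio: float) -> float:
    """Average cost as a fraction of sending all traffic to the expensive model.

    cheap_share: fraction of traffic routed to the cheap model (0..1)
    price_ratio: how many times cheaper the cheap model is
    """
    return cheap_share / price_ratio + (1.0 - cheap_share)


fraction = blended_cost_fraction(cheap_share=0.80, price_ratio=16.0)
daily_bill = 45_000
daily_savings = daily_bill * (1.0 - fraction)

print(f"blended cost: {fraction:.2%} of baseline ({1 / fraction:.1f}x cheaper)")
print(f"savings on a ${daily_bill:,}/day bill: ${daily_savings:,.0f}/day")
```

The expensive 20% dominates the blended cost, so pushing the cheap share from 80% to 90% saves more than making the cheap model itself any cheaper.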


Q3: What observability do you need for an LLM-powered product in production?

LLM observability requires three categories of metrics that traditional software monitoring does not cover.

Quality metrics: thumbs up/down rates per model per feature, escalation rates (user asks for human agent - implies dissatisfaction), refusal rates (model refuses to answer - may indicate prompt issues), response length distribution (very short = low quality, very long = likely verbose), and faithfulness scores from automated checks.

Cost and efficiency metrics: token usage per model per user tier, cost per conversation, routing distribution (what fraction goes to each model tier), cache hit rates if you have semantic caching.

Operational metrics: time to first token (TTFT), total generation latency, provider error rates, circuit breaker state, streaming throughput (tokens per second).

Tools: LangSmith (LangChain's native tracing), Langfuse (open source), Helicone (provider-agnostic), or a custom Prometheus + Grafana stack. At minimum, log every interaction with: user_id, conversation_id, model used, token counts, latency, user feedback. This is your debugging and fine-tuning dataset.


Q4: How do you handle LLM provider outages gracefully?

Build a provider fallback chain with circuit breakers. Configure a primary provider (e.g., Anthropic Claude for quality), a secondary (e.g., OpenAI GPT-4o), and a tertiary self-hosted fallback (e.g., Llama 3.1 70B on your own GPU cluster for guaranteed availability).

The circuit breaker monitors each provider: if error rate exceeds 5% in a 60-second window, open the circuit for that provider and route 100% to the next provider in the chain. Test your fallback chain monthly - failover that is never tested will fail when you need it.

For the self-hosted fallback: vLLM on 2 A100 GPUs can serve a 70B model at 30-50 tokens/second. This is slower than cloud APIs, but its availability is under your control rather than a provider's. Responses will be slower and quality may be lower, so show a user-facing message such as "We are experiencing high demand; responses may be slower than usual." This preserves availability while setting expectations honestly.
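The fallback chain itself can be sketched as a loop over providers in priority order. In this illustration, `call_with_fallback` is a hypothetical helper, each provider is just a `(name, callable)` pair, and `is_available` stands in for whatever circuit-breaker check you use.

```python
def call_with_fallback(providers, prompt, is_available=lambda name: True):
    """Try providers in order; skip open circuits; return the first success.

    providers: list of (name, call) pairs, where call(prompt) returns text
    is_available: callable(name) -> bool, e.g. a circuit-breaker check
    """
    errors = {}
    for name, call in providers:
        if not is_available(name):
            errors[name] = "circuit open"
            continue
        try:
            return {"provider": name, "text": call(prompt), "errors": errors}
        except Exception as exc:  # any provider failure triggers the next fallback
            errors[name] = str(exc)
    raise RuntimeError(f"All providers failed: {errors}")


# Stand-in provider functions for the demo
def flaky_primary(prompt):
    raise TimeoutError("primary timed out")


def healthy_secondary(prompt):
    return f"answer to: {prompt}"


result = call_with_fallback(
    [("primary", flaky_primary), ("secondary", healthy_secondary)],
    "Where is my order?",
)
print(result["provider"], "|", result["text"], "|", result["errors"])
```

Returning the collected errors alongside the successful response makes it easy to log which providers were skipped or failed on each request.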


Q5: How do you build a prompt management system for a production LLM product?

Prompts are code. Treat them with the same engineering rigor: versioning, testing, staged rollout, and rollback capability.

The prompt management stack: (1) A prompt registry that stores versioned prompt templates with their associated metadata (intended model, temperature, max_tokens, created_by, created_at); (2) A/B testing capability - route X% of traffic to prompt_v1 and (100-X)% to prompt_v2, measure quality metrics, promote the winner; (3) Evaluation harness - a set of (input, expected_output) pairs that you run against every new prompt version before deployment. Automated metrics: ROUGE for factual questions with expected answers, embedding similarity for paraphrase tasks, Claude-as-judge for open-ended quality.

The deployment flow: a data scientist edits the prompt template, runs the evaluation harness against 100 test cases, reviews the report, and submits a PR to the prompt registry. A reviewer approves. The new version is deployed to 5% of traffic via A/B test. After 24 hours, if quality metrics are stable, the version is promoted to 100%. If quality degrades, the previous version is automatically restored.
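The evaluation and promotion steps of this flow can be sketched as two small functions. Everything here is illustrative: `run_eval`, `rollout_decision`, the exact-match scorer, and the 0.02 regression tolerance are assumptions, and the dictionary lookups stand in for real model calls with each prompt version.

```python
def exact_match(output: str, expected: str) -> float:
    """Simplest possible scorer: 1.0 on a case-insensitive exact match."""
    return 1.0 if output.strip().lower() == expected.strip().lower() else 0.0


def run_eval(generate, cases, scorer=exact_match) -> float:
    """Run every test case through generate() and return the mean score."""
    if not cases:
        raise ValueError("evaluation set is empty")
    scores = [scorer(generate(case["input"]), case["expected"]) for case in cases]
    return sum(scores) / len(scores)


def rollout_decision(baseline: float, candidate: float,
                     max_regression: float = 0.02) -> str:
    """Promote unless the candidate is worse than baseline by more than the tolerance."""
    return "promote" if candidate >= baseline - max_regression else "rollback"


cases = [
    {"input": "2+2", "expected": "4"},
    {"input": "capital of France", "expected": "Paris"},
]
# Canned outputs standing in for model responses under each prompt version
answers_v1 = {"2+2": "4", "capital of France": "paris"}
answers_v2 = {"2+2": "5", "capital of France": "Paris"}

baseline_score = run_eval(answers_v1.get, cases)
candidate_score = run_eval(answers_v2.get, cases)
decision = rollout_decision(baseline_score, candidate_score)
print(baseline_score, candidate_score, decision)
```

The same `rollout_decision` gate can be reused after the 5% A/B stage by feeding it live quality metrics instead of offline scores.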


Summary

An LLM-powered product architecture requires more than calling an API. The serving stack wraps the LLM with input guardrails (prompt injection, PII detection), RAG retrieval (hybrid BM25 + dense, cross-encoder reranking), LLM routing (cheap model for simple queries, expensive for complex), streaming SSE responses, output guardrails (faithfulness checks, PII scrubbing), and a full observability stack. Cost management - routing, caching, and prompt compression - is not optional; at scale, LLM costs consume company economics without deliberate management. Prompt management requires versioning, evaluation harnesses, and staged rollout. Provider fallback with circuit breakers ensures availability when any single provider experiences outages. The difference between an LLM demo and an LLM product is this surrounding infrastructure.

© 2026 EngineersOfAI. All rights reserved.