Handling LLM Latency
The Spinner That Killed the Product
A B2B SaaS startup had built an excellent AI feature - a proposal generator that could draft a sales proposal in about 12 seconds. The model quality was genuinely impressive. Beta users loved the output. The startup was confident they had something, and they launched to their full user base on a Tuesday morning with a press release and a Slack announcement to every customer success manager in the company.
The feature was disabled within three weeks. Not because the proposals were bad. Because users could not psychologically endure the wait. Twelve seconds of a blue spinner - no feedback, no progress, no signal that anything was happening - felt like a frozen page. Users clicked "Generate" multiple times, creating duplicate requests. Some refreshed the browser mid-generation, canceling the request and starting over from scratch. Support tickets poured in: "The AI proposal generator is broken." It was not broken. It was slow, and slow without feedback is indistinguishable from broken. The team had optimized model quality and ignored perceived latency entirely. They paid for it with the feature.
When they rebuilt it six weeks later, they added streaming (first words in 600ms), a skeleton screen during the time-to-first-token (TTFT) gap, a status indicator cycling through "Analyzing your inputs... Drafting introduction... Writing value proposition...", and an estimated time display ("~12 seconds"). They showed a section counter: "3 of 5 sections complete." They added a cancel button. Adoption went from near-zero to 73% of eligible users. Same model. Same actual latency. Radically different product. The lesson embedded itself in the founding team's institutional memory: latency is as much a design problem as it is an infrastructure problem.
This lesson covers what they learned - and what the field has learned about the psychology and engineering of LLM latency. LLM latency differs from conventional API latency in kind, not just degree: responses take seconds rather than milliseconds and arrive incrementally, so handling it requires different techniques.
The Psychology of Waiting
Humans are poor judges of elapsed time. We are terrible at estimating how long we've waited, and our subjective experience of a wait is heavily influenced by feedback. Jakob Nielsen's response-time guidelines (1993, still widely cited) set three thresholds, at 0.1, 1, and 10 seconds, that remain standard reference points for UI design; the last bullet below describes what happens beyond the third:
- 0.1 seconds: feels instantaneous - no feedback indicator needed
- 1 second: noticeable delay, but user maintains focus and flow
- 10 seconds: user's attention wanders; requires constant progress feedback
- >10 seconds: user likely abandons the task or assumes failure
LLM calls land squarely in the "10 seconds" zone, often exceeding it. The good news: perceived duration drops sharply when the interface gives feedback. A 10-second wait with meaningful progress indicators can feel shorter than a 5-second wait behind a blank spinner. This is not a placebo - it is how human time perception works.
The key insight is that perceived latency and actual latency are different engineering targets. You cannot change the actual latency without changing the model or infrastructure - a project that may take weeks or months. You can always change the perceived latency through design - a project that takes hours.
Latency Anatomy: Where Time Actually Goes
Before applying techniques, you need to know where time is actually spent. LLM request latency has multiple distinct phases, each with different optimization levers:
| Phase | Typical Duration | Description | Optimization Lever |
|---|---|---|---|
| Auth + validation | 5–50ms | JWT verification, rate limit check | Cache tokens, optimize middleware |
| Context load | 10–200ms | Load conversation history from DB | Redis caching, pagination |
| RAG retrieval | 50–500ms | Embedding + vector search + rerank | Parallel fetch, index optimization |
| Prompt build | 1–10ms | Assemble system + messages | No-op unless very complex |
| Anthropic TTFT | 200–3000ms | Input token processing time | Prompt caching, shorter prompts |
| Token generation | 3–30s | Model generates output tokens | Model selection, streaming |
| Total | ~3.3–34s | Sum of all phases when run sequentially | All of the above |
The most important insight from this table: RAG retrieval and Anthropic TTFT are often larger than expected, and both can be attacked in parallel. The second most important insight: phases that appear sequential are often parallelizable. Context load and RAG retrieval are independent of each other, and both can start while auth is still validating, as long as nothing is returned to the client before auth succeeds.
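To make the arithmetic concrete, here is a minimal sketch that sums the ranges from the table two ways: fully sequential, and with the pre-LLM steps overlapped. The grouping in `PARALLEL_GROUP` is an assumption for illustration (it treats auth, context load, and RAG retrieval as independent); the function names are hypothetical.

```python
# Phase ranges in milliseconds, copied from the table above: (low, high).
PHASES_MS = {
    "auth": (5, 50),
    "context_load": (10, 200),
    "rag_retrieval": (50, 500),
    "prompt_build": (1, 10),
    "ttft": (200, 3000),
    "token_generation": (3000, 30000),
}

# Assumption for illustration: these steps do not depend on each other.
PARALLEL_GROUP = ("auth", "context_load", "rag_retrieval")

LOW, HIGH = 0, 1


def sequential_total(bound: int) -> int:
    """Sum every phase, as if each one waited for the previous one."""
    return sum(rng[bound] for rng in PHASES_MS.values())


def parallel_total(bound: int) -> int:
    """Overlap the independent pre-LLM steps; only the slowest one counts."""
    overlapped = max(PHASES_MS[name][bound] for name in PARALLEL_GROUP)
    rest = sum(
        rng[bound] for name, rng in PHASES_MS.items() if name not in PARALLEL_GROUP
    )
    return overlapped + rest


def time_to_first_token(bound: int, parallel: bool) -> int:
    """Latency until the user sees the first word (excludes token generation)."""
    total = parallel_total(bound) if parallel else sequential_total(bound)
    return total - PHASES_MS["token_generation"][bound]
```

With the table's upper bounds, the sequential path reaches the first token after 3,760ms and the overlapped path after 3,510ms. Token generation dominates the end-to-end total either way, which is why streaming matters more than any single pre-LLM optimization.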
Technique 1: Optimistic UI Updates
Show the user's own message instantly - before the network request even completes. This makes the interface feel responsive even when the AI takes 12 seconds to respond. The psychological effect is significant: users who see their message appear immediately feel like "the system is working, just processing my request" rather than "the system might be broken."
// hooks/useOptimisticChat.ts
import { useState, useCallback } from "react";
interface Message {
id: string;
role: "user" | "assistant";
content: string;
status: "sent" | "pending" | "streaming" | "complete" | "error";
createdAt: number;
}
export function useOptimisticChat() {
const [messages, setMessages] = useState<Message[]>([]);
const [isProcessing, setIsProcessing] = useState(false);
const sendMessage = useCallback(async (
content: string,
streamFn: (
content: string,
history: Message[],
onToken: (t: string) => void,
onComplete: () => void,
) => Promise<void>
) => {
if (isProcessing) return;
const userId = crypto.randomUUID();
const assistantId = crypto.randomUUID();
const now = Date.now();
// INSTANT: Add user message before any network call
const userMsg: Message = {
id: userId,
role: "user",
content,
status: "sent",
createdAt: now,
};
// INSTANT: Add assistant placeholder - shows skeleton
const assistantPlaceholder: Message = {
id: assistantId,
role: "assistant",
content: "",
status: "pending",
createdAt: now,
};
setMessages((prev) => [...prev, userMsg, assistantPlaceholder]);
setIsProcessing(true);
// Now make the actual network call
try {
await streamFn(
content,
messages,
(token) => {
setMessages((prev) =>
prev.map((m) =>
m.id === assistantId
? { ...m, content: m.content + token, status: "streaming" }
: m
)
);
},
() => {
setMessages((prev) =>
prev.map((m) =>
m.id === assistantId ? { ...m, status: "complete" } : m
)
);
}
);
} catch {
setMessages((prev) =>
prev.map((m) =>
m.id === assistantId
? { ...m, status: "error", content: "" }
: m
)
);
} finally {
setIsProcessing(false);
}
}, [messages, isProcessing]);
return { messages, isProcessing, sendMessage };
}
:::tip Disable the Send Button Correctly While processing, disable the send button but keep the input field visible and focused. Users with fast fingers habitually hit Enter again - if the input clears and focus disappears, it feels like the app crashed. Ignore repeat submissions and show a clear visual indicator that a response is in progress; avoid setting the disabled attribute on the field itself, because a disabled input cannot hold focus. Re-enable the send button as soon as the response completes. :::
Technique 2: Skeleton Screens vs Spinners
Skeleton screens - grey placeholder shapes that mimic the structure of the expected content - generally outperform generic spinners on perceived speed. A/B tests commonly report improvements on the order of 25-35% in perceived speed when skeleton screens replace spinners, though results vary by product. The reason: skeleton screens communicate "content is loading in this specific shape" rather than "something is happening, unknown outcome."
// components/SkeletonLoader.tsx
export function MessageSkeleton() {
return (
<div className="flex gap-3 mb-4 animate-pulse" aria-label="Loading response">
{/* Avatar placeholder */}
<div className="w-8 h-8 rounded-full bg-gray-200 flex-shrink-0" />
{/* Content placeholder - mimics actual message structure */}
<div className="flex-1 space-y-2 pt-1">
<div className="h-3 bg-gray-200 rounded-full w-4/5" />
<div className="h-3 bg-gray-200 rounded-full w-3/4" />
<div className="h-3 bg-gray-200 rounded-full w-5/6" />
<div className="h-3 bg-gray-200 rounded-full w-2/3" />
</div>
</div>
);
}
// Alternative: shimmer animation (more polished)
export function ShimmerSkeleton() {
return (
<div className="flex gap-3 mb-4">
<div className="w-8 h-8 rounded-full skeleton-shimmer flex-shrink-0" />
<div className="flex-1 space-y-2 pt-1">
{[80, 70, 90, 50].map((width, i) => (
<div
key={i}
className="h-3 rounded-full skeleton-shimmer"
style={{ width: `${width}%` }}
/>
))}
</div>
</div>
);
}
// The shimmer CSS (add to global styles):
// .skeleton-shimmer {
// background: linear-gradient(90deg, #f0f0f0 25%, #e0e0e0 50%, #f0f0f0 75%);
// background-size: 200% 100%;
// animation: shimmer 1.5s infinite;
// }
// @keyframes shimmer {
// 0% { background-position: 200% 0; }
// 100% { background-position: -200% 0; }
// }
// Context-aware message state renderer
export function AssistantMessage({
content,
status,
}: {
content: string;
status: "pending" | "streaming" | "complete" | "error";
}) {
if (status === "pending") {
// Show skeleton during TTFT gap
return <MessageSkeleton />;
}
if (status === "error") {
return (
<div className="flex gap-3 mb-4">
<div className="w-8 h-8 rounded-full bg-red-100 flex items-center justify-center text-red-500 text-xs flex-shrink-0">!</div>
<div className="bg-red-50 rounded-xl px-4 py-3 text-red-700 text-sm">
Failed to generate response. Please try again.
</div>
</div>
);
}
return (
<div className="flex gap-3 mb-4">
<div className="w-8 h-8 rounded-full bg-blue-600 flex items-center justify-center text-white text-xs font-bold flex-shrink-0">
AI
</div>
<div className="bg-gray-100 rounded-xl px-4 py-3 text-sm text-gray-800 leading-relaxed">
{content}
{status === "streaming" && (
<span className="inline-block w-0.5 h-4 bg-gray-600 ml-0.5 animate-pulse">▌</span>
)}
</div>
</div>
);
}
Technique 3: Thinking Phase Indicators
Show contextual status messages during the TTFT gap that communicate what the AI is doing. This reduces anxiety (users know something is happening) and sets timing expectations. The key is making the messages domain-relevant, not generic. "Analyzing your request..." is better than "Loading...". "Searching your documents..." is far better than "Please wait."
// components/ThinkingIndicator.tsx
import { useState, useEffect } from "react";
type Phase =
| "analyzing"
| "searching"
| "reading"
| "drafting"
| "done";
const PHASE_MESSAGES: Record<Phase, string> = {
analyzing: "Analyzing your request...",
searching: "Searching knowledge base...",
reading: "Reading relevant context...",
drafting: "Drafting response...",
done: "",
};
// Phase duration estimates (ms) - advance through stages during TTFT
const PHASE_TIMINGS: Partial<Record<Phase, number>> = {
analyzing: 400,
searching: 800,
reading: 1200,
drafting: Infinity, // Stay here until stream starts
};
export function ThinkingIndicator({
isActive,
hasRAG = false,
hasDocuments = false,
}: {
isActive: boolean;
hasRAG?: boolean;
hasDocuments?: boolean;
}) {
const [phase, setPhase] = useState<Phase>("analyzing");
useEffect(() => {
if (!isActive) {
setPhase("analyzing"); // Reset for next time
return;
}
const sequence: Phase[] = hasRAG || hasDocuments
? ["analyzing", "searching", "reading", "drafting"]
: ["analyzing", "drafting"];
const timers: ReturnType<typeof setTimeout>[] = [];
for (let i = 1; i < sequence.length; i++) {
const prevPhase = sequence[i - 1];
const delay = PHASE_TIMINGS[prevPhase] ?? 500;
if (delay === Infinity) break;
const cumulativeDelay = sequence.slice(0, i).reduce(
(acc, p) => acc + (PHASE_TIMINGS[p] ?? 500),
0
);
const t = setTimeout(() => {
setPhase(sequence[i]);
}, cumulativeDelay);
timers.push(t);
}
return () => timers.forEach(clearTimeout);
}, [isActive, hasRAG, hasDocuments]);
if (!isActive || phase === "done") return null;
return (
<div className="flex items-center gap-2 text-sm text-gray-500 mb-3 ml-11">
<div className="flex gap-1">
{[0, 1, 2].map((i) => (
<div
key={i}
className="w-1.5 h-1.5 rounded-full bg-blue-400"
style={{
animation: `bounce 1.2s ease-in-out ${i * 0.2}s infinite`,
}}
/>
))}
</div>
<span className="transition-all duration-300">{PHASE_MESSAGES[phase]}</span>
</div>
);
}
:::info Tailor Phase Messages to Your Domain A customer support bot should say "Looking up your account..." not "Searching knowledge base." A coding assistant should say "Analyzing your code..." A legal document tool should say "Reviewing the clause..." Domain-specific phase messages make the AI feel capable and intentional - generic ones make it feel like a spinner with words attached. :::
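The cumulative timing used by the component above is easy to get wrong, so it can help to check the schedule in isolation. The following is a small Python mirror of that logic, not part of the component itself; `phase_schedule` and `phase_at` are hypothetical helper names, and the 500ms default matches the fallback used in the TypeScript.

```python
import math

# Same durations as PHASE_TIMINGS in the component (milliseconds).
PHASE_TIMINGS_MS = {
    "analyzing": 400,
    "searching": 800,
    "reading": 1200,
    "drafting": math.inf,  # stay here until the stream starts
}


def phase_schedule(sequence: list[str]) -> list[tuple[float, str]]:
    """Return (start_ms, phase) pairs: when each status message first appears."""
    schedule = []
    elapsed = 0.0
    for phase in sequence:
        schedule.append((elapsed, phase))
        duration = PHASE_TIMINGS_MS.get(phase, 500)
        if math.isinf(duration):
            break  # nothing can follow an open-ended phase
        elapsed += duration
    return schedule


def phase_at(elapsed_ms: float, sequence: list[str]) -> str:
    """Which message is showing after elapsed_ms of waiting."""
    current = sequence[0]
    for start_ms, phase in phase_schedule(sequence):
        if elapsed_ms >= start_ms:
            current = phase
    return current
```

For the RAG sequence, the messages change at 0, 400, 1200, and 2400ms. If your measured TTFT is usually under a second, the later phases will rarely be seen and the earlier durations should be shortened.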
Technique 4: Latency Budget Tracking
Build observability into your latency - track each phase separately so you know where time is actually going before you optimize. Engineers who skip this step consistently optimize the wrong thing. A team that spends two weeks reducing model TTFT by 200ms may have had a 400ms RAG bottleneck they never noticed.
# latency/tracker.py
import anthropic
import asyncio
import time
from dataclasses import dataclass, field
from typing import Optional
import logging
logger = logging.getLogger(__name__)
@dataclass
class LatencySpan:
name: str
start_ms: float
end_ms: Optional[float] = None
@property
def duration_ms(self) -> float:
if self.end_ms is None:
return 0.0
return self.end_ms - self.start_ms
@dataclass
class RequestLatencyBreakdown:
request_id: str
spans: list[LatencySpan] = field(default_factory=list)
ttft_ms: Optional[float] = None
total_ms: Optional[float] = None
def start_span(self, name: str) -> LatencySpan:
span = LatencySpan(name=name, start_ms=time.monotonic() * 1000)
self.spans.append(span)
return span
def end_span(self, span: LatencySpan) -> None:
span.end_ms = time.monotonic() * 1000
def to_dict(self) -> dict:
return {
"request_id": self.request_id,
"ttft_ms": self.ttft_ms,
"total_ms": self.total_ms,
"spans": {s.name: round(s.duration_ms) for s in self.spans},
}
async def handle_request_with_tracking(
    user_id: str,
    tenant_id: str,
    messages: list[dict],
    system: str,
) -> tuple[str, RequestLatencyBreakdown]:
    """
    Complete request handler with per-phase latency tracking.
    Use the breakdown to identify bottlenecks and guide optimizations.
    """
    import uuid
    bd = RequestLatencyBreakdown(request_id=str(uuid.uuid4()))
    request_start = time.monotonic() * 1000
    # Phase 1: Auth
    auth_span = bd.start_span("auth")
    # await validate_user_and_tenant(user_id, tenant_id)
    await asyncio.sleep(0.005)  # Simulate ~5ms auth
    bd.end_span(auth_span)
    # Phase 2: Context load and RAG retrieval, run in parallel.
    # CRITICAL OPTIMIZATION: Parallelize independent pre-LLM steps.
    # Each coroutine opens and closes its own span; timing both around the
    # gather() call would record the slower step's duration for both.
    async def load_context():
        span = bd.start_span("context_load")
        await asyncio.sleep(0.05)  # Simulate 50ms
        bd.end_span(span)
        return {"conversation_id": "conv_123", "history_length": 5}
    async def rag_retrieve():
        span = bd.start_span("rag_retrieval")
        await asyncio.sleep(0.15)  # Simulate 150ms vector search
        bd.end_span(span)
        return ["doc_chunk_1", "doc_chunk_2"]
    context_result, rag_results = await asyncio.gather(
        load_context(),
        rag_retrieve(),
    )
    # Phase 3: Prompt build
    prompt_span = bd.start_span("prompt_build")
    rag_context = "\n".join(rag_results)
    enriched_system = f"{system}\n\nRelevant context:\n{rag_context}"
    bd.end_span(prompt_span)
    # Phase 4: LLM call - track TTFT separately.
    # TTFT here is measured from the start of the LLM call; add the pre-LLM
    # spans to get the TTFT the user actually experiences.
    llm_span = bd.start_span("llm_call")
    client = anthropic.AsyncAnthropic()
    first_token_received = False
    response_text = ""
    llm_start_ms = time.monotonic() * 1000
    async with client.messages.stream(
        model="claude-opus-4-6",
        max_tokens=2048,
        system=enriched_system,
        messages=messages,
    ) as stream:
        async for text in stream.text_stream:
            if not first_token_received:
                first_token_received = True
                ttft_ms = time.monotonic() * 1000 - llm_start_ms
                bd.ttft_ms = round(ttft_ms)
                # Spans that have already finished (everything before the LLM call)
                finished = {s.name: round(s.duration_ms) for s in bd.spans if s.end_ms is not None}
                logger.info("TTFT=%.0fms pre_llm_spans=%s", ttft_ms, finished)
            response_text += text
    bd.end_span(llm_span)
    bd.total_ms = round(time.monotonic() * 1000 - request_start)
    # Log full breakdown for monitoring
    logger.info(f"Request complete: {bd.to_dict()}")
    return response_text, bd
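When steps run concurrently, each one needs its own timer, and the wall-clock time of the group is a separate measurement. A minimal sketch of that idea follows; `timed`, `run_pre_llm_steps`, and the simulated delays are assumptions for illustration, not part of the tracker above.

```python
import asyncio
import time


async def timed(name: str, coro, durations_ms: dict[str, float]):
    """Await coro and record how long this one task took, even if it raises."""
    start = time.monotonic()
    try:
        return await coro
    finally:
        durations_ms[name] = (time.monotonic() - start) * 1000


async def load_context() -> dict:
    await asyncio.sleep(0.02)  # simulated 20ms history lookup
    return {"history_length": 5}


async def rag_retrieve() -> list[str]:
    await asyncio.sleep(0.08)  # simulated 80ms vector search
    return ["doc_chunk_1", "doc_chunk_2"]


async def run_pre_llm_steps():
    durations_ms: dict[str, float] = {}
    wall_start = time.monotonic()
    context, chunks = await asyncio.gather(
        timed("context_load", load_context(), durations_ms),
        timed("rag_retrieval", rag_retrieve(), durations_ms),
    )
    # Wall-clock cost of the whole group: roughly the slowest task, not the sum.
    durations_ms["pre_llm_wall"] = (time.monotonic() - wall_start) * 1000
    return context, chunks, durations_ms
```

Recording both the per-task durations and the group's wall time tells you which step sits on the critical path: speeding up the faster task changes nothing for the user.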
Technique 5: Model Selection by Latency Budget
Different queries have different latency requirements. Route to the appropriate model based on query complexity and the user's acceptable wait time. This is not about always using the cheapest model - it is about using the right tool for the job.
| Model | TTFT p50 | TTFT p99 | Best Use Case | Relative Cost |
|---|---|---|---|---|
| claude-haiku-4-5-20251001 | ~300ms | ~800ms | Simple Q&A, classification, short summaries | 1x |
| claude-sonnet-4-6 | ~700ms | ~2000ms | Most tasks - good quality, reasonable speed | 5x |
| claude-opus-4-6 | ~1500ms | ~4000ms | Complex reasoning, long documents, highest stakes | 15x |
# latency/model_router.py
import anthropic
from typing import Optional
import re
# Latency characteristics (approximate TTFT for a typical prompt; TTFT depends on input length, not output length)
MODEL_LATENCY_PROFILE = {
"claude-haiku-4-5-20251001": {"ttft_ms_p50": 300, "ttft_ms_p99": 800},
"claude-sonnet-4-6": {"ttft_ms_p50": 700, "ttft_ms_p99": 2000},
"claude-opus-4-6": {"ttft_ms_p50": 1500, "ttft_ms_p99": 4000},
}
def classify_query_complexity(query: str) -> str:
    """
    Classify query complexity to select appropriate model.
    Returns: "simple", "moderate", or "complex"
    In production: supplement with a lightweight Haiku classifier that
    returns a complexity score. The heuristic below handles 80% of cases.
    """
    normalized = query.lower().strip()
    word_count = len(normalized.split())
    # Simple heuristics: greetings and acknowledgements
    simple_patterns = [
        r"^(hi|hello|hey|thanks|thank you)[\s!.]*$",
        r"^what('s| is) (the )?time",
        r"^(yes|no|okay|sure|ok)[\s.]*$",
    ]
    for pattern in simple_patterns:
        if re.match(pattern, normalized):
            return "simple"
    # Complex signals - these warrant Opus. Checked before the length rule
    # so that short but demanding queries ("compare X and Y") are not
    # misclassified as simple.
    complex_signals = [
        "analyze", "compare", "evaluate", "research", "explain in detail",
        "write a comprehensive", "step by step", "pros and cons",
        "legal", "medical", "financial advice", "strategy",
    ]
    if word_count > 50 or any(sig in normalized for sig in complex_signals):
        return "complex"
    # Short queries with no complex signal
    if word_count < 10:
        return "simple"
    return "moderate"
def select_model_for_latency(
    query: str,
    max_ttft_ms: Optional[int] = None,
    quality_priority: bool = True,
) -> str:
    """
    Select model based on query complexity and latency constraint.
    Args:
        query: The user's query
        max_ttft_ms: Maximum acceptable TTFT in ms (None = no constraint)
        quality_priority: If True, prefer higher quality within latency budget
    Returns:
        Model string to use for this request
    """
    complexity = classify_query_complexity(query)
    if quality_priority:
        # Quality-priority selection based on complexity
        model_map = {
            "simple": "claude-haiku-4-5-20251001",
            "moderate": "claude-sonnet-4-6",
            "complex": "claude-opus-4-6",
        }
    else:
        # Speed priority
        model_map = {
            "simple": "claude-haiku-4-5-20251001",
            "moderate": "claude-haiku-4-5-20251001",
            "complex": "claude-opus-4-6",
        }
    preferred = model_map[complexity]
    if max_ttft_ms is None:
        return preferred
    # Latency-constrained selection: start from the preferred model and step
    # down toward faster models until one fits the p99 TTFT budget.
    # Relies on MODEL_LATENCY_PROFILE being ordered fastest -> slowest.
    ordered = list(MODEL_LATENCY_PROFILE)
    for model in reversed(ordered[: ordered.index(preferred) + 1]):
        if MODEL_LATENCY_PROFILE[model]["ttft_ms_p99"] <= max_ttft_ms:
            return model
    # Nothing fits - use fastest and accept the latency
    return ordered[0]
async def adaptive_model_request(
query: str,
messages: list[dict],
max_ttft_ms: Optional[int] = None,
) -> tuple[str, str]:
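"""
Route to appropriate model and return (response, model_used).
Falls back to the fastest model if the primary call exceeds a fixed
30-second total timeout. This non-streaming call cannot observe TTFT,
so max_ttft_ms only influences the initial model choice.
"""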
import asyncio
client = anthropic.AsyncAnthropic()
selected_model = select_model_for_latency(query, max_ttft_ms)
try:
response = await asyncio.wait_for(
client.messages.create(
model=selected_model,
max_tokens=2048,
messages=messages,
),
timeout=30.0,
)
return response.content[0].text, selected_model
except asyncio.TimeoutError:
# Model timed out - try faster fallback
fallback = "claude-haiku-4-5-20251001"
if selected_model == fallback:
raise # Already using fastest model
print(f"Timeout on {selected_model}, falling back to {fallback}")
response = await client.messages.create(
model=fallback,
max_tokens=2048,
messages=messages,
)
return response.content[0].text, fallback
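Enforcing a TTFT budget, as opposed to a total timeout, requires a streaming call: wait for the first token only, and switch to a faster model if it does not arrive in time. The sketch below shows that pattern with simulated streams; `fake_stream` and `stream_with_ttft_budget` are hypothetical names, and in a real handler the two factories would open streaming requests against the primary and fallback models.

```python
import asyncio
from typing import AsyncIterator, Callable


async def fake_stream(ttft_s: float, tokens: list[str]) -> AsyncIterator[str]:
    """Stand-in for a model stream: waits ttft_s, then yields tokens."""
    await asyncio.sleep(ttft_s)
    for token in tokens:
        yield token


async def stream_with_ttft_budget(
    primary: Callable[[], AsyncIterator[str]],
    fallback: Callable[[], AsyncIterator[str]],
    max_ttft_s: float,
) -> tuple[str, str]:
    """Return (text, source) where source is "primary" or "fallback"."""
    stream = primary()
    source = "primary"
    try:
        # Only the first token is subject to the budget.
        first = await asyncio.wait_for(stream.__anext__(), timeout=max_ttft_s)
    except asyncio.TimeoutError:
        await stream.aclose()  # release the slow stream before switching
        stream = fallback()
        source = "fallback"
        first = await stream.__anext__()
    parts = [first]
    async for token in stream:
        parts.append(token)
    return "".join(parts), source
```

The trade-off is that a timed-out primary request has still consumed input tokens, so the budget should sit comfortably above the primary model's typical TTFT and only catch the tail.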
Technique 6: Prompt Caching for TTFT Reduction
Anthropic's prompt caching is one of the highest-impact TTFT optimizations available. When you have a long system prompt (instructions, persona, knowledge base, RAG context), caching it means the model skips re-processing those tokens on every request. For a 10,000-token system prompt, this saves roughly 900–1200ms of TTFT on every cached request.
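How it works: The first request with a cache_control block of type ephemeral writes the cache and is billed at a premium over the base input price for those tokens (1.25x for the default 5-minute cache at the time of writing). Subsequent requests within 5 minutes that share the same prefix skip that block's token processing and pay 10% of the input cost for cached tokens; each cache hit also refreshes the 5-minute window. The TTFT reduction grows with the share of input tokens that are cached.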
# latency/prompt_caching.py
import anthropic
from typing import Optional
def build_cached_request(
user_messages: list[dict],
static_system_content: str,
dynamic_context: Optional[str] = None,
) -> dict:
"""
Build an Anthropic API request with prompt caching.
Caching strategy:
- Static content (base instructions, persona): cache_control ephemeral
- Dynamic content (RAG results, conversation context): NOT cached
- User messages: NOT cached (change every request)
Cost: First request pays the cache-write price (a premium over the base input price).
Subsequent requests pay 10% for cached tokens.
TTFT: Subsequent requests skip cached token processing → faster TTFT.
Cache TTL: 5 minutes by default, refreshed each time the cached prefix is read.
When to use:
- System prompt above the model's minimum cacheable length (1024 tokens or more, depending on the model): big win (saves 90% input cost on cache hits)
- High request volume (>10 req/min per user): cache hit rate justifies setup
- System prompt is stable across requests: perfect for caching
"""
system_content = [
{
"type": "text",
"text": static_system_content,
# Mark this block for caching
"cache_control": {"type": "ephemeral"},
}
]
if dynamic_context:
# Dynamic context is NOT cached - it changes per request
system_content.append({
"type": "text",
"text": f"\n\nCurrent context:\n{dynamic_context}",
# No cache_control here - will be freshly processed each time
})
return {
"system": system_content,
"messages": user_messages,
}
async def request_with_caching(
user_messages: list[dict],
static_instructions: str,
rag_context: str,
model: str = "claude-opus-4-6",
) -> tuple[str, dict]:
"""
Make a cached request and return (response, usage_with_cache_stats).
Monitor cache_read_input_tokens to confirm caching is working.
"""
client = anthropic.AsyncAnthropic()
request_params = build_cached_request(
user_messages=user_messages,
static_system_content=static_instructions,
dynamic_context=rag_context,
)
response = await client.messages.create(
model=model,
max_tokens=2048,
**request_params,
)
usage = {
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
"cache_creation_input_tokens": getattr(response.usage, "cache_creation_input_tokens", 0),
"cache_read_input_tokens": getattr(response.usage, "cache_read_input_tokens", 0),
}
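# Log cache effectiveness. input_tokens counts only uncached input, so the
# denominator must also include tokens written to and read from the cache.
cache_read = usage["cache_read_input_tokens"] or 0
cache_write = usage["cache_creation_input_tokens"] or 0
total_input = usage["input_tokens"] + cache_write + cache_read
cache_hit_rate = cache_read / max(total_input, 1)
print(f"Cache hit rate: {cache_hit_rate:.1%} | Cache read tokens: {cache_read}")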
return response.content[0].text, usage
:::warning Prompt Caching Has a 5-Minute TTL Anthropic's ephemeral cache expires after 5 minutes of inactivity. If your users send one message, wait 6 minutes, then send another - the second request pays full processing cost. For background batch jobs with long pauses between requests, caching provides no benefit. Caching is most valuable for conversational flows where requests arrive within minutes of each other, and for high-volume APIs serving many users against a shared system prompt. :::
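Whether caching pays off depends on how many requests land inside one TTL window. A rough cost model makes the break-even visible. The multipliers below are assumptions taken from Anthropic's published pricing for the default 5-minute cache at the time of writing (writes at 1.25x, reads at 0.10x of the base input price); check current pricing before relying on them. The function name and token counts are hypothetical.

```python
# Assumed multipliers relative to the base input-token price.
CACHE_WRITE_MULTIPLIER = 1.25  # first request writes the cache
CACHE_READ_MULTIPLIER = 0.10   # later requests in the window read it


def input_cost_units(
    static_tokens: int,
    dynamic_tokens: int,
    requests: int,
    cached: bool,
) -> float:
    """Input cost, in base-priced token units, for a burst inside one TTL window."""
    if not cached or requests == 0:
        return float(requests * (static_tokens + dynamic_tokens))
    first = static_tokens * CACHE_WRITE_MULTIPLIER + dynamic_tokens
    later = (requests - 1) * (static_tokens * CACHE_READ_MULTIPLIER + dynamic_tokens)
    return first + later


def savings_fraction(static_tokens: int, dynamic_tokens: int, requests: int) -> float:
    """Fraction of input cost saved by caching (negative means caching cost more)."""
    uncached = input_cost_units(static_tokens, dynamic_tokens, requests, cached=False)
    cached = input_cost_units(static_tokens, dynamic_tokens, requests, cached=True)
    return 1 - cached / uncached
```

For a 10,000-token static prompt with 500 dynamic tokens, a single isolated request costs about 24% more with caching enabled, two requests already come out ahead, and a burst of 20 saves roughly 80% of input cost under these assumptions.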
Technique 7: Progressive Summary for Long Responses
For queries that require long responses, show a quick summary first, then stream the full response. Users see something valuable in under 2 seconds, satisfying the "is this working?" check, while the full response streams in behind it.
# latency/progressive_response.py
import anthropic
import asyncio
import json
from typing import AsyncIterator
async def progressive_response(
messages: list[dict],
system: str,
use_summary: bool = True,
) -> AsyncIterator[str]:
"""
Generate a quick summary first (using Haiku), then stream the full response.
Users see value in ~1-2 seconds without waiting for the full generation.
Event types emitted:
- summary: 1-2 sentence TL;DR (arrives fast via Haiku)
- full_start: signals beginning of full response
- text: full response tokens (streaming from Opus)
- stream_end: completion
"""
client = anthropic.AsyncAnthropic()
if use_summary:
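# Phase 1: Quick TL;DR using Haiku (fast + cheap)
# Note: the Opus stream below starts only after this call returns, so the
# Haiku latency is added to the full response's TTFT. To overlap the two,
# start the Opus stream in a background task before awaiting the summary.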
summary_response = await client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=80,
system=(
"Respond with a 1-2 sentence summary of what a comprehensive answer "
"to this question would cover. Start with 'In short:'. Be direct."
),
messages=messages,
)
summary = summary_response.content[0].text
yield f"data: {json.dumps({'type': 'summary', 'text': summary})}\n\n"
# Phase 2: Full response streaming with Opus (higher quality)
yield f"data: {json.dumps({'type': 'full_start'})}\n\n"
async with client.messages.stream(
model="claude-opus-4-6",
max_tokens=4096,
system=system,
messages=messages,
) as stream:
async for text in stream.text_stream:
yield f"data: {json.dumps({'type': 'text', 'text': text})}\n\n"
yield f"data: {json.dumps({'type': 'stream_end'})}\n\n"
yield "data: [DONE]\n\n"
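One way to overlap the summary and the full response is to start the slow stream in a background task that buffers tokens into a queue, then emit the summary as soon as it is ready. The sketch below uses simulated model calls; `fake_summary`, `fake_full_stream`, and `progressive_concurrent` are hypothetical names standing in for the Haiku and Opus requests.

```python
import asyncio
from typing import AsyncIterator


async def fake_summary() -> str:
    await asyncio.sleep(0.10)  # simulated fast-model latency
    return "In short: ..."


async def fake_full_stream() -> AsyncIterator[str]:
    await asyncio.sleep(0.15)  # simulated slow-model TTFT
    for token in ["Full ", "answer"]:
        yield token


async def progressive_concurrent() -> AsyncIterator[tuple[str, str]]:
    """Yield ("summary", text) first, then ("text", token) for each token."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking the end of the full stream

    async def pump() -> None:
        try:
            async for token in fake_full_stream():
                queue.put_nowait(token)
        finally:
            queue.put_nowait(done)

    # Start the slow stream before waiting on the summary.
    pump_task = asyncio.create_task(pump())
    try:
        yield ("summary", await fake_summary())
        while (item := await queue.get()) is not done:
            yield ("text", item)
    finally:
        pump_task.cancel()  # no-op if the pump already finished
```

With these delays the first full-response token is ready about 150ms after the request starts, instead of 250ms when the two calls run back to back. The cost is that the slow request is always issued, even if the user leaves after reading the summary.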
Technique 8: SSE Streaming with FastAPI
Server-Sent Events (SSE) is the standard transport for streaming LLM responses. It is simpler than WebSockets for unidirectional server-to-client streaming, and it runs over plain HTTP, so it passes through most proxies and load balancers - provided response buffering is disabled on the streaming route.
# api/stream_endpoint.py
import anthropic
import asyncio
import json
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
app = FastAPI()
client = anthropic.AsyncAnthropic()
class ChatRequest(BaseModel):
message: str
conversation_id: str
system: str = "You are a helpful assistant."
async def stream_claude_response(
messages: list[dict],
system: str,
model: str = "claude-opus-4-6",
):
"""
Async generator that yields SSE-formatted chunks.
Handles connection cleanup if client disconnects mid-stream.
"""
try:
# Send initial heartbeat so client knows we're alive
yield f"data: {json.dumps({'type': 'start'})}\n\n"
async with client.messages.stream(
model=model,
max_tokens=4096,
system=system,
messages=messages,
) as stream:
async for text in stream.text_stream:
yield f"data: {json.dumps({'type': 'text', 'text': text})}\n\n"
yield f"data: {json.dumps({'type': 'done'})}\n\n"
yield "data: [DONE]\n\n"
except asyncio.CancelledError:
# Client disconnected - nothing can be delivered, so do not yield here.
# Re-raise so the cancellation propagates and the upstream stream is closed.
raise
except anthropic.APIStatusError as e:
error_payload = {
"type": "error",
"code": e.status_code,
"message": "The AI service encountered an error. Please try again.",
}
yield f"data: {json.dumps(error_payload)}\n\n"
yield "data: [DONE]\n\n"
@app.post("/api/chat/stream")
async def chat_stream(request: Request, body: ChatRequest):
"""
Streaming chat endpoint using Server-Sent Events.
Because this is a POST route, browser clients should read it with
fetch and a ReadableStream; EventSource only issues GET requests.
"""
# Load conversation history (from Redis/DB in production)
messages = [{"role": "user", "content": body.message}]
return StreamingResponse(
stream_claude_response(messages, body.system),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
},
)
:::danger Nginx Buffering Kills Streaming
By default, Nginx buffers responses before forwarding to clients - which breaks SSE completely. The response arrives all at once after the LLM finishes, defeating the entire purpose of streaming. Always set proxy_buffering off in your Nginx config for streaming endpoints, and include the X-Accel-Buffering: no header in the response. The header approach works even when you don't control the Nginx config directly.
:::
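On the consuming side, the wire format emitted by the endpoint above is simple enough to parse by hand: each event is one or more `data:` lines followed by a blank line, and the literal `[DONE]` marks the end. The following is a minimal parser sketch for tests or a Python client; `parse_sse` and `collect_text` are hypothetical helpers, and the parser ignores SSE fields other than `data:`.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Yield decoded JSON payloads from SSE lines until the [DONE] sentinel."""
    data_parts: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith("data:"):
            value = line[5:]
            if value.startswith(" "):
                value = value[1:]  # one optional space after the colon
            data_parts.append(value)
        elif line == "" and data_parts:
            # A blank line terminates the event.
            payload = "\n".join(data_parts)
            data_parts = []
            if payload == "[DONE]":
                return
            yield json.loads(payload)
        # Other fields (event:, id:, comment lines starting with ':') are ignored.


def collect_text(lines: Iterable[str]) -> str:
    """Concatenate the text of every 'text' event in the stream."""
    return "".join(
        event["text"] for event in parse_sse(lines) if event.get("type") == "text"
    )
```

Feeding the parser lines as they arrive (rather than a completed body) is what lets the client render tokens incrementally; if everything shows up at once, suspect a buffering proxy.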
Production Monitoring for Latency
Latency optimization without measurement is guesswork. Build a rolling-window monitor into your application that tracks TTFT and total latency at p50, p95, and p99. Alert on p95 - when it crosses your threshold, 1 in 20 requests is already slower than the limit you set.
# latency/monitoring.py
import time
from dataclasses import dataclass
from typing import Optional
from collections import deque
@dataclass
class LatencySample:
timestamp: float
ttft_ms: float
total_ms: float
model: str
tokens_output: int
class LatencyMonitor:
"""
Rolling window latency monitor.
Alert when p95 TTFT or total latency exceeds SLA thresholds.
In production: emit these metrics to Prometheus/Datadog/Grafana.
Set up alerts on p95_ttft_ms > 3000 and p95_total_ms > 30000.
"""
def __init__(
self,
window_size: int = 1000,
ttft_sla_ms: float = 3000, # p95 TTFT SLA
total_sla_ms: float = 30000, # p95 total SLA
):
self.window_size = window_size
self.ttft_sla_ms = ttft_sla_ms
self.total_sla_ms = total_sla_ms
self._samples: deque[LatencySample] = deque(maxlen=window_size)
def record(
self,
ttft_ms: float,
total_ms: float,
model: str,
tokens_output: int,
) -> None:
sample = LatencySample(
timestamp=time.time(),
ttft_ms=ttft_ms,
total_ms=total_ms,
model=model,
tokens_output=tokens_output,
)
self._samples.append(sample)
def get_stats(self) -> dict:
if not self._samples:
return {"error": "no data"}
samples = list(self._samples)
ttfts = [s.ttft_ms for s in samples]
totals = [s.total_ms for s in samples]
ttft_p95 = self._percentile(ttfts, 95)
total_p95 = self._percentile(totals, 95)
return {
"sample_count": len(samples),
"ttft": {
"p50_ms": round(self._percentile(ttfts, 50)),
"p95_ms": round(ttft_p95),
"p99_ms": round(self._percentile(ttfts, 99)),
"sla_violation": ttft_p95 > self.ttft_sla_ms,
},
"total": {
"p50_ms": round(self._percentile(totals, 50)),
"p95_ms": round(total_p95),
"p99_ms": round(self._percentile(totals, 99)),
"sla_violation": total_p95 > self.total_sla_ms,
},
"throughput_tps": round(
sum(s.tokens_output for s in samples)
/ max(1, (samples[-1].timestamp - samples[0].timestamp)),
1,
),
}
@staticmethod
def _percentile(data: list[float], pct: int) -> float:
if not data:
return 0.0
sorted_data = sorted(data)
idx = max(0, -(-len(sorted_data) * pct // 100) - 1)  # nearest-rank: ceil(n * pct / 100) - 1
return sorted_data[min(idx, len(sorted_data) - 1)]
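A quick way to see why the choice of percentile matters is to run a nearest-rank calculation on a long-tailed sample. The data below is hypothetical (94 requests at 3 seconds, 6 at 25 seconds), and `nearest_rank_percentile` is a standalone sketch, not the monitor's own method.

```python
import math


def nearest_rank_percentile(data: list[float], pct: float) -> float:
    """Smallest value such that at least pct percent of the data is <= it."""
    if not data:
        raise ValueError("no data")
    ordered = sorted(data)
    rank = max(1, math.ceil(len(ordered) * pct / 100))
    return ordered[rank - 1]


# Hypothetical window of total latencies in milliseconds.
samples_ms = [3000.0] * 94 + [25000.0] * 6

p50 = nearest_rank_percentile(samples_ms, 50)
p95 = nearest_rank_percentile(samples_ms, 95)
mean = sum(samples_ms) / len(samples_ms)
```

Here the median is 3,000ms and the mean is 4,320ms, both of which look acceptable, while p95 is 25,000ms. An alert keyed to the median or the mean would stay silent while 6% of requests take 25 seconds.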
The Latency Optimization Decision Tree
When you're facing a latency problem, use this decision tree to diagnose before you act:
Common Mistakes
:::danger Measuring p50 and Ignoring p99 LLM latency distributions are heavily right-skewed - a cluster of fast responses and a long tail of slow ones. p50 latency might be 3 seconds while p99 is 25 seconds. Your SLA should be defined on p95 or p99. A 3-second p50 sounds acceptable; a 25-second p99 means 1 in 100 requests leaves a user staring at a spinner for 25 seconds and actively considering churning. Always monitor and alert on p95/p99, not averages or medians. :::
:::danger Running Pre-LLM Steps Sequentially
The most common latency optimization mistake: running auth → context load → RAG retrieval sequentially when they are largely independent. Auth can run while context loads. RAG retrieval can run while auth validates. Only prompt build has to wait, because it consumes the outputs of the other steps. Running the independent steps in parallel with asyncio.gather() can reduce pre-LLM latency by 50–70%. Always examine your request handler for sequential operations that could be parallelized. Time each phase separately first so you know which ones are actually expensive.
:::
:::warning Using Synchronous Drivers in AsyncIO Applications
In a FastAPI + asyncio application, any synchronous blocking operation (a database call through a sync driver, CPU-heavy processing, time.sleep()) blocks the entire event loop, stalling every other request for the duration of that operation. Use async drivers everywhere: asyncpg for PostgreSQL, motor for MongoDB, redis.asyncio for Redis (the standalone aioredis package was merged into redis-py). For blocking calls you cannot replace, use asyncio.to_thread(fn) or asyncio.get_running_loop().run_in_executor(None, fn) to offload them to a thread pool without blocking the loop. For CPU-heavy pure-Python work, prefer a process pool, because threads still contend for the GIL.
:::
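The sketch below illustrates the offloading pattern with `asyncio.to_thread` (Python 3.9+). `blocking_lookup` is a hypothetical stand-in for a call through a synchronous driver, and the heartbeat task exists only to show whether the event loop stayed responsive.

```python
import asyncio
import time


def blocking_lookup(key: str) -> str:
    # Hypothetical stand-in for a call through a synchronous driver.
    time.sleep(0.2)
    return f"value-for-{key}"


async def heartbeat(ticks: list[float]) -> None:
    # Records a timestamp every 50 ms; a large gap reveals a blocked loop.
    for _ in range(4):
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.05)


async def main() -> tuple[str, float]:
    ticks: list[float] = []
    hb = asyncio.create_task(heartbeat(ticks))
    # to_thread runs the blocking function in the default thread pool,
    # so the heartbeat keeps ticking while the lookup is in progress.
    value = await asyncio.to_thread(blocking_lookup, "tenant-42")
    await hb
    largest_gap = max(b - a for a, b in zip(ticks, ticks[1:]))
    return value, largest_gap


value, largest_gap = asyncio.run(main())
print(value, f"largest heartbeat gap: {largest_gap * 1000:.0f} ms")
```

Calling `blocking_lookup` directly inside `main` would instead freeze the heartbeat for the full 200 ms, which is what every other in-flight request experiences when a sync driver is used on the event loop.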
:::warning Caching Dynamic Content
A common mistake with prompt caching is marking RAG-retrieved content or conversation-specific data with cache_control. These change on every request, so the cached prefix is never reused: every request is a cache miss, and you pay the cache-write overhead and use up a cache breakpoint for nothing. Only mark stable content for caching: base system instructions, persona definitions, static knowledge base entries, tool schemas. Because caching matches on the prompt prefix, place dynamic content (RAG results, user-specific context, current date/time) after the last cached block and never mark it for caching.
:::
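As a rough illustration, the payload below puts a single cache breakpoint on the stable system block and keeps per-request content in the user message. It builds a plain dictionary in the shape of an Anthropic Messages API request; `build_request` and the `MODEL_NAME` placeholder are illustrative assumptions, and no network call is made.

```python
def build_request(stable_instructions: str, rag_chunks: list[str], question: str) -> dict:
    """Build a request payload with one cache breakpoint after the stable prefix."""
    dynamic_context = "\n\n".join(rag_chunks)
    return {
        "model": "MODEL_NAME",  # placeholder, substitute the model you actually use
        "max_tokens": 1024,
        "system": [
            {
                "type": "text",
                "text": stable_instructions,
                # Stable across requests, so this prefix is worth caching.
                "cache_control": {"type": "ephemeral"},
            },
        ],
        "messages": [
            {
                "role": "user",
                # Changes on every request, so it carries no cache_control
                # and sits after the cached prefix.
                "content": f"Context:\n{dynamic_context}\n\nQuestion: {question}",
            },
        ],
    }


payload = build_request(
    stable_instructions="You are a proposal-writing assistant.",
    rag_chunks=["Acme Corp renewed in March.", "Their plan is Enterprise."],
    question="Draft an upsell paragraph.",
)
print(payload["system"][0]["cache_control"])
```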
:::tip The Three Highest-Value Latency Improvements
For most AI products: (1) Stream responses - moves from "15-second wait" to "300ms TTFT, reading while generating." Single biggest perceived latency improvement, and achievable in a few hours. (2) Enable Anthropic prompt caching for system prompts over 1K tokens - the model skips reprocessing the cached prefix, which directly reduces TTFT and cuts the cost of those input tokens by up to 90%, with near-zero engineering cost. (3) Parallelize pre-LLM steps with asyncio.gather() - typically saves 100–400ms on every request, and the code change is usually under 20 lines. Together, these three changes can reduce perceived latency by as much as 80% without touching the model.
:::
Latency Budget: A Worked Example
Here is a worked example of a typical latency breakdown before and after applying the techniques in this lesson:
| Phase | Before | After | Technique Applied |
|---|---|---|---|
| Auth + rate limit | 40ms | 12ms | Token caching in Redis |
| Context load | 180ms | 60ms | Redis cache + pagination |
| RAG retrieval | 350ms | 350ms | Unchanged, but now runs in parallel with auth and context load; as the slowest step it sets the pre-LLM critical path |
| Pre-LLM total | 570ms sequential | 370ms parallel | asyncio.gather() |
| LLM TTFT | 1800ms | 700ms | Prompt caching (10K token system) |
| Perceived TTFT | 2370ms | 1070ms | Skeleton screen masks 1070ms |
| Token generation | 12s | 12s | Unchanged |
| Perceived wait | 12s (spinner) | ~3s (reading first words) | Streaming |
The actual latency went from 14.4s to 13.1s - a 9% improvement. The perceived latency went from "broken app" to "fast AI." Engineering effort: roughly one sprint. The lesson: perceived latency engineering almost always has higher ROI than infrastructure latency engineering, especially in the early product lifecycle.
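The arithmetic behind those figures can be checked with a few lines of Python. This is only a sketch of the calculation: the `budget` helper is illustrative, and the pre-LLM totals are taken directly from the table rather than derived.

```python
GENERATION_MS = 12_000  # token generation time, unchanged before and after


def budget(pre_llm_ms: int, llm_ttft_ms: int, generation_ms: int = GENERATION_MS) -> dict:
    # Perceived TTFT is everything the user waits through before the first token.
    perceived_ttft_ms = pre_llm_ms + llm_ttft_ms
    return {
        "perceived_ttft_ms": perceived_ttft_ms,
        "total_ms": perceived_ttft_ms + generation_ms,
    }


before = budget(pre_llm_ms=40 + 180 + 350, llm_ttft_ms=1800)  # sequential steps
after = budget(pre_llm_ms=370, llm_ttft_ms=700)               # parallel total from the table

total_gain_pct = 100 * (before["total_ms"] - after["total_ms"]) / before["total_ms"]
ttft_gain_pct = (
    100
    * (before["perceived_ttft_ms"] - after["perceived_ttft_ms"])
    / before["perceived_ttft_ms"]
)

print(before)  # {'perceived_ttft_ms': 2370, 'total_ms': 14370}
print(after)   # {'perceived_ttft_ms': 1070, 'total_ms': 13070}
print(f"total: {total_gain_pct:.0f}% faster, TTFT: {ttft_gain_pct:.0f}% faster")
```

The split makes the point concrete: total latency improves by about 9%, while time to first token improves by about 55%, and that second number is the one users feel.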
Interview Q&A
Q1: What is TTFT and why is it the most important latency metric for LLM products?
TTFT (Time-to-First-Token) is the delay from sending a request to receiving the first token of the response. It is the most important perceived latency metric for streaming AI interfaces because it determines when the user gets their first signal that the AI is working. A 15-second total response time with a 400ms TTFT feels dramatically faster than a 5-second total response time with a 4-second TTFT - users start reading as soon as the first sentence arrives, so the rest of the generation time overlaps with reading instead of waiting. Optimizations in rough order of impact: (1) Anthropic prompt caching - cache the system prompt to avoid re-processing it every request; for a 10K-token system prompt, this saves ~1 second per cached request. (2) Shorter prompts - every 1K tokens of input adds ~50–100ms to TTFT. (3) Parallel pre-LLM steps - run RAG retrieval, context loading, and auth with asyncio.gather(). (4) Model selection - Haiku typically has 2–3x lower TTFT than Opus for identical input length. (5) Connection pooling - avoid TCP/TLS handshake overhead.
Q2: How do you make a 10-second LLM response feel fast?
Streaming is the primary tool - users see the first word within 300–600ms instead of waiting 10 seconds for the complete response. Layer on top of streaming: (1) Optimistic UI - show the user's message instantly before the network request even returns. This eliminates the worst feeling: "did my message send?" (2) Skeleton screens - replace the spinner with content-shaped placeholders during TTFT. Studies show 25–35% improvement in perceived speed versus a spinner. (3) Thinking indicators - contextual status messages like "Searching knowledge base..." signal activity and set timing expectations. (4) Progressive summary - show a 1–2 sentence TL;DR generated by Haiku in ~1s before the full Opus response streams in. (5) Smart auto-scroll - keep the viewport at the bottom of the stream so users are always reading the freshest content without manual scrolling.
Q3: How do you measure and monitor LLM latency in production?
Track four metrics per request: TTFT, total generation time, tokens per second (throughput), and end-to-end latency. Log these per model, per tenant, and per query type. Set up percentile dashboards (p50, p95, p99) in your monitoring system. Never use averages for latency, because they mask the long tail. Alert on p95 TTFT exceeding 3 seconds and p99 total exceeding 30 seconds. Break down latency by phase: auth, RAG retrieval, prompt build, LLM TTFT, LLM generation. This reveals bottlenecks, and RAG retrieval is often the surprise. Correlate latency with output token count to separate model speed from response length effects. In distributed systems, trace request IDs across services to connect frontend perceived latency to backend phase measurements.
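One way to capture the per-request numbers is to wrap the token stream and timestamp the first token. The sketch below assumes a generic async iterator of tokens; `fake_token_stream`, `StreamTiming`, and `timed_consume` are hypothetical names, and the fake stream stands in for a real streaming client.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class StreamTiming:
    ttft_ms: float
    total_ms: float
    tokens_output: int

    @property
    def tokens_per_second(self) -> float:
        # Throughput over the generation phase only (after the first token).
        generation_s = (self.total_ms - self.ttft_ms) / 1000
        return self.tokens_output / generation_s if generation_s > 0 else 0.0


async def fake_token_stream() -> AsyncIterator[str]:
    # Hypothetical stand-in for a streaming LLM client.
    await asyncio.sleep(0.1)  # simulated time to first token
    for word in ["Latency", " is", " a", " design", " problem"]:
        yield word
        await asyncio.sleep(0.02)  # simulated inter-token delay


async def timed_consume(stream: AsyncIterator[str]) -> tuple[str, StreamTiming]:
    start = time.perf_counter()
    first_token_at = None
    parts: list[str] = []
    async for token in stream:
        if first_token_at is None:
            first_token_at = time.perf_counter()
        parts.append(token)
    end = time.perf_counter()
    # If the stream was empty, report TTFT as the full duration.
    ttft_ms = ((first_token_at or end) - start) * 1000
    return "".join(parts), StreamTiming(ttft_ms, (end - start) * 1000, len(parts))


text, timing = asyncio.run(timed_consume(fake_token_stream()))
print(f"TTFT {timing.ttft_ms:.0f} ms, total {timing.total_ms:.0f} ms, "
      f"{timing.tokens_per_second:.0f} tokens/s")
```

In production the same wrapper would sit in front of the forwarding loop, so the timing is recorded without delaying tokens on their way to the client.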
Q4: How do you handle a latency SLA violation gracefully?
Set timeouts at multiple layers. Application-level: asyncio.wait_for(call, timeout=30). SSE keepalive: send heartbeat events every 15s to prevent proxy timeouts during long generations. User-facing: if TTFT exceeds 5 seconds, show a message "Taking longer than usual..." with a visible cancel button. For the actual violation: if the primary model times out, fall back to a faster model (Haiku). For non-streaming use cases: return a "processing" response with a job ID, then deliver the result asynchronously via webhook or polling when ready. Never show a raw timeout error to users - always show a human-readable message with a recovery action. The goal is that users always know what happened and what to do next, even when everything is slow.
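The timeout-then-fallback flow might look like the following sketch. `call_model` is a hypothetical stand-in for an LLM call, and the timeouts are shortened so the example runs quickly; real values would be closer to the 30 seconds mentioned above.

```python
import asyncio
from typing import Awaitable, Callable

ModelCall = Callable[[], Awaitable[str]]


async def call_model(name: str, delay_s: float) -> str:
    # Hypothetical stand-in for an LLM call that takes delay_s seconds.
    await asyncio.sleep(delay_s)
    return f"answer from {name}"


async def generate_with_fallback(
    primary: ModelCall,
    fallback: ModelCall,
    primary_timeout_s: float,
    fallback_timeout_s: float,
) -> dict:
    # Try the primary model first, then the faster fallback model.
    for label, call, timeout_s in (
        ("primary", primary, primary_timeout_s),
        ("fallback", fallback, fallback_timeout_s),
    ):
        try:
            text = await asyncio.wait_for(call(), timeout=timeout_s)
            return {"ok": True, "model": label, "text": text}
        except asyncio.TimeoutError:
            continue  # wait_for cancelled the slow call; try the next option
    # Both attempts timed out: return a human-readable message, not a raw error.
    return {
        "ok": False,
        "model": None,
        "text": "This is taking longer than usual. Please try again in a moment.",
    }


result = asyncio.run(
    generate_with_fallback(
        primary=lambda: call_model("primary", delay_s=0.5),
        fallback=lambda: call_model("fallback", delay_s=0.01),
        primary_timeout_s=0.05,
        fallback_timeout_s=0.5,
    )
)
print(result)
```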
Q5: Why is RAG retrieval often a hidden latency bottleneck?
Before the LLM call, RAG requires: embedding the user's query (~100–300ms for a dedicated embedding model API call), searching a vector index (~50–500ms depending on index size, replica count, and ANN algorithm parameters), and optionally reranking results (~100–500ms with a cross-encoder reranker). This pre-LLM latency adds directly to TTFT - if the LLM processes the input in 800ms but RAG takes 400ms before the LLM call even starts, TTFT is 1200ms, not 800ms. Solutions: (1) Parallelize RAG with other pre-LLM steps using asyncio.gather() - if auth takes 50ms and RAG takes 400ms, running them in parallel takes 400ms instead of 450ms, saving 50ms off TTFT. (2) Cache query embeddings - for frequently-asked questions, store the embedding and reuse it. (3) Cache retrieval results - for semantically near-identical queries (cosine similarity > 0.95), return the cached chunks directly. (4) Warm vector DB connections - keep pooled connections open, because a cold connection adds TCP handshake overhead to the request that opens it. (5) Use ANN indexes - approximate nearest neighbor search is 10–100x faster than exact search at the cost of minor recall reduction, typically acceptable for RAG.
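Caching retrieval results for near-identical queries can be sketched as below. `RetrievalCache` is a hypothetical in-memory class that uses a linear scan, which is only reasonable for small caches; the 0.95 threshold is the one quoted above, and the two-dimensional vectors are toy embeddings for illustration.

```python
import math
from typing import Optional


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class RetrievalCache:
    """Returns cached chunks when a new query embedding is close to a stored one."""

    def __init__(self, threshold: float = 0.95) -> None:
        self.threshold = threshold
        self._entries: list[tuple[list[float], list[str]]] = []

    def put(self, query_embedding: list[float], chunks: list[str]) -> None:
        self._entries.append((query_embedding, chunks))

    def get(self, query_embedding: list[float]) -> Optional[list[str]]:
        best_chunks, best_score = None, self.threshold
        for stored_embedding, chunks in self._entries:
            score = cosine_similarity(query_embedding, stored_embedding)
            # Only scores strictly above the threshold count as a hit.
            if score > best_score:
                best_chunks, best_score = chunks, score
        return best_chunks


cache = RetrievalCache(threshold=0.95)
cache.put([1.0, 0.0], ["chunk about pricing"])

hit = cache.get([0.99, 0.05])   # nearly the same direction as the stored query
miss = cache.get([0.0, 1.0])    # orthogonal, so retrieval must run normally
print(hit, miss)
```

On a miss the caller runs the normal vector search and stores the result with `put`. A real deployment would also need eviction and invalidation when the underlying documents change.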
Q6: What is the difference between actual latency and perceived latency, and which should you optimize first?
Actual latency is the wall-clock time from request to complete response. Perceived latency is how long the user feels they waited - a subjective, psychological measure heavily influenced by feedback, progress signals, and anticipation. They are not the same number. A 12-second response with streaming and skeleton screens is consistently perceived as faster than an 8-second response with a blank spinner. You should optimize perceived latency first because: (1) it is faster to implement - design changes take hours, infrastructure improvements take weeks; (2) it has higher return on investment - a user who is reading the first paragraph at second 1 will tolerate 12 more seconds of generation without complaint; (3) it does not require coordination with infrastructure teams, billing changes, or model fine-tuning. Once perceived latency is optimized, invest in actual latency - particularly prompt caching, parallel pre-LLM steps, and model routing - which also improve cost and system throughput.
