Streaming UX for LLMs
The Interface That Changed Everything
In the fall of 2022, a mid-sized startup had built what the team privately believed was the best AI coding assistant in the market. The model was fine-tuned on proprietary codebases, retrieval-augmented with internal documentation, and benchmarked at 73% accuracy on their domain - five points above GPT-3.5 on comparable tasks. The CEO had demoed it to three enterprise prospects, and all three had signed letters of intent. Then they launched the private beta to 500 engineers at a Fortune 100 client. The feedback after week one was brutal: "It feels slow and broken." The average response time was 14 seconds. Users submitted a query, watched a spinner, and waited - and waited. One by one, they stopped using it. The beta had a 22% retention rate at day 7. The team was devastated. The model was right. The product felt broken.
What happened next was a single engineering sprint. Not a model improvement. Not new training data. Not prompt engineering. The team implemented streaming: the response began appearing on screen within 400 milliseconds of submission, a token at a time, as the model generated it. Total generation time was unchanged - still 14 seconds for long responses. But now users were reading the first sentence before the model had finished generating the third. Perceived wait time dropped from "14 seconds of nothing" to "less than a second to the first word." Day-7 retention jumped from 22% to 64% in the next beta cohort. The model hadn't changed. The product had transformed. This lesson teaches you to build that experience: the protocol, the backend implementation, the frontend patterns, and the subtle UX details that separate a streaming interface that works from one that delights.
Why Streaming Exists: The Token Generation Gap
To understand streaming, you need to understand how language models generate text. An LLM does not "think" about the full response and then write it all at once. It generates tokens one at a time, left to right, in an autoregressive loop: each new token is conditioned on all previous tokens. The first token is ready in hundreds of milliseconds - after the model has processed the input and run the first forward pass. But a non-streaming interface holds all tokens until the last one is generated, then sends the entire batch at once.
For a 500-word response (roughly 650 tokens, assuming about 1.3 tokens per English word) at typical generation speeds of 40-60 tokens/second, the user waits roughly 11-16 seconds before seeing anything. The model has been producing output since the 400ms mark, but the interface throws all of that away and makes the user wait anyway. Streaming fixes this mismatch by forwarding tokens to the client as soon as they're generated: token arrives from model → immediately forwarded to browser → browser renders it.
The key metric streaming improves is TTFT (Time-to-First-Token): the delay from request submission to the moment the user sees the first character. With streaming, TTFT can be under 300ms. Without streaming, the effective TTFT from the user's perspective is the total generation time - often 5-25 seconds.
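The TTFT gap can be sketched with a simple constant-rate model. The first token appears after a fixed prefill delay, and the remaining tokens decode at a steady rate. Real servers only approximate this. The inputs in the usage line below (650 tokens, 50 tokens/second, 0.4 s prefill) are illustrative assumptions, not measurements.

```python
def first_visible_text_delay(
    output_tokens: int, tokens_per_second: float, prefill_seconds: float
) -> dict[str, float]:
    """Seconds until the user sees any text, with and without streaming,
    under a simple model: fixed prefill time, then a constant decode rate."""
    total = prefill_seconds + output_tokens / tokens_per_second
    return {
        "streaming": prefill_seconds,  # first token is shown as soon as it exists
        "non_streaming": total,        # nothing is shown until the last token
    }


delays = first_visible_text_delay(650, 50, 0.4)
```

Under these assumptions, the streaming interface shows text after 0.4 seconds. The non-streaming interface shows nothing for about 13.4 seconds, and the total generation time is the same in both cases.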
There is a secondary benefit beyond raw latency: streaming makes AI feel more alive. The typing-like animation conveys the AI working in real time, which can increase engagement and perceived quality even when total generation time is identical. This matches a long-standing finding in UX research: users who see progress signals perceive a wait as shorter than users staring at a static spinner, even when the actual wait is the same.
The Protocol: Server-Sent Events (SSE)
Streaming LLM APIs typically use Server-Sent Events (SSE), a standard HTTP-based format for one-way server-to-client event streaming. Unlike WebSockets, SSE is:
- Unidirectional: the server pushes to the client only, which is exactly what AI chat needs
- HTTP-native: it works through existing load balancers and proxies without special handshakes or upgrade headers
- Auto-reconnecting: the browser's built-in `EventSource` API reconnects transparently. Note that `EventSource` only issues GET requests; a `fetch`-based reader, which POST endpoints like the one below require, has to handle reconnection itself
- Simple: a plain-text wire format, trivial to implement and to debug with `curl`
- Firewall-friendly: it is a normal HTTP/HTTPS connection, with no special firewall rules or proxy configuration
SSE wire format:
```text
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

data: {"type": "stream_start", "model": "claude-opus-4-6"}

data: {"type": "text", "text": "The"}

data: {"type": "text", "text": " answer"}

data: {"type": "text", "text": " is"}

data: {"type": "stream_end", "usage": {"input_tokens": 12, "output_tokens": 3}, "metrics": {"ttft_ms": 287}}

data: [DONE]

```
Each event is a `data:` line carrying a JSON payload, and a blank line terminates the event. The `[DONE]` sentinel is an application-level convention, not part of the SSE specification, that tells the client the stream is complete. Lines starting with `:` are SSE comments. They are sent over the wire but ignored by parsers, which makes them useful as keepalive heartbeats that stop proxies from timing out an idle connection.
| Transport Option | Direction | Use Case | Pros | Cons |
|---|---|---|---|---|
| SSE (text/event-stream) | Server → Client | AI response streaming | Simple, HTTP-native, auto-reconnect | One-way only |
| WebSockets | Bidirectional | Real-time chat, collaborative editing | Full duplex | Complex, proxy issues |
| Long polling | Server → Client | Legacy environments | Works everywhere | High latency, inefficient |
| HTTP/2 server push | Server → Client | Asset preloading | Multiplexed | Deprecated; removed from Chrome |
| gRPC streaming | Bidirectional | Service-to-service | Type-safe, efficient | Heavy client-side setup |
For AI chat interfaces, SSE is almost always the right choice. It is simpler than WebSockets, sufficient for the unidirectional stream, and works reliably in the browser without additional libraries.
Backend: Streaming with FastAPI and AsyncAnthropic
```python
# backend/streaming_endpoint.py
import json
import logging
import time
from typing import AsyncIterator

import anthropic
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse

logger = logging.getLogger(__name__)
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # demo only - restrict to your frontend's origin in production
    allow_methods=["*"],
    allow_headers=["*"],
)

# Use AsyncAnthropic for async streaming - critical for FastAPI's async event loop.
# The synchronous anthropic.Anthropic() would block the event loop on each stream read.
client = anthropic.AsyncAnthropic()


def sse(payload: dict) -> str:
    """Format one SSE event: a data line followed by the blank line that ends it."""
    return f"data: {json.dumps(payload)}\n\n"


async def generate_sse_stream(
    messages: list[dict],
    system: str,
    model: str = "claude-opus-4-6",
    max_tokens: int = 2048,
) -> AsyncIterator[str]:
    """
    Generate an SSE-formatted token stream from Claude.
    Each yield is a complete SSE message: data line + blank line.

    Event protocol:
    - stream_start: signals stream beginning, UI transitions to streaming state
    - text: contains a token or token chunk to append to the rendered output
    - stream_end: final event with usage data and latency metrics
    - error: structured error for client-side error display
    - [DONE]: sentinel to close the SSE reader on the client

    Critical implementation notes:
    1. Always use AsyncAnthropic with FastAPI - the sync client blocks the event loop
    2. Always yield the [DONE] event, even after errors - the client needs a clean close signal
    3. Track TTFT - it is the primary UX quality metric for streaming
    4. Handle each Anthropic error type separately with user-friendly messages
    """
    request_start = time.monotonic()
    first_token_time: float | None = None
    input_tokens = 0
    output_tokens = 0
    total_chars = 0
    try:
        yield sse({"type": "stream_start", "model": model})
        async with client.messages.stream(
            model=model,
            max_tokens=max_tokens,
            system=system,
            messages=messages,
        ) as stream:
            async for event in stream:
                if event.type == "message_start":
                    input_tokens = event.message.usage.input_tokens
                elif event.type == "content_block_delta" and event.delta.type == "text_delta":
                    token_text = event.delta.text
                    # Track TTFT on the first text token
                    if first_token_time is None:
                        first_token_time = time.monotonic()
                        ttft_ms = (first_token_time - request_start) * 1000
                        logger.info("TTFT=%.0fms model=%s", ttft_ms, model)
                    total_chars += len(token_text)
                    yield sse({"type": "text", "text": token_text})
                elif event.type == "message_delta":
                    # Cumulative output token count for the message
                    output_tokens = event.usage.output_tokens

        # stream_end with full metrics
        total_ms = (time.monotonic() - request_start) * 1000
        ttft = round((first_token_time - request_start) * 1000) if first_token_time else None
        yield sse({
            "type": "stream_end",
            "usage": {
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "total_chars": total_chars,
            },
            "metrics": {"total_ms": round(total_ms), "ttft_ms": ttft},
        })
        yield "data: [DONE]\n\n"
    except anthropic.RateLimitError:
        # Subclass of APIStatusError, so it must be caught first
        yield sse({"type": "error", "code": 429,
                   "message": "Rate limit reached. Please wait a moment and try again.",
                   "retryable": True, "retry_after_seconds": 10})
        yield "data: [DONE]\n\n"
    except anthropic.APIStatusError as e:
        code = e.status_code
        message = ("AI service temporarily unavailable. Please try again." if code >= 500
                   else "The AI service rejected the request.")
        yield sse({"type": "error", "code": code, "message": message, "retryable": code >= 500})
        yield "data: [DONE]\n\n"
    except anthropic.APIConnectionError:
        yield sse({"type": "error", "code": 503,
                   "message": "Connection to AI service failed. Check your connection and retry.",
                   "retryable": True})
        yield "data: [DONE]\n\n"
    except Exception:
        logger.exception("Unexpected error in streaming")
        yield sse({"type": "error", "code": 500,
                   "message": "An unexpected error occurred.", "retryable": True})
        yield "data: [DONE]\n\n"


@app.post("/v1/chat/stream")
async def stream_chat(request: Request):
    """
    Streaming chat endpoint - returns an SSE stream.
    Validation happens before streaming starts, so bad input still gets a real 400.
    Critical response headers:
    - Cache-Control: no-cache - prevent any caching of stream chunks
    - X-Accel-Buffering: no - disable Nginx buffering (most common streaming bug)
    - Connection: keep-alive - maintain the long-lived HTTP/1.1 connection
    """
    body = await request.json()
    messages = body.get("messages", [])
    system = body.get("system", "You are a helpful assistant.")
    # Accepting any client-supplied model is convenient for a demo; allowlist it in production
    model = body.get("model", "claude-opus-4-6")
    if not messages:
        raise HTTPException(status_code=400, detail="messages required")
    for msg in messages:
        if msg.get("role") not in ("user", "assistant"):
            raise HTTPException(status_code=400, detail=f"Invalid role: {msg.get('role')}")
    return StreamingResponse(
        generate_sse_stream(messages, system, model),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # CRITICAL - disables Nginx response buffering
            "X-Content-Type-Options": "nosniff",
        },
    )


async def buffered_stream(
    messages: list[dict],
    system: str,
    buffer_size: int = 8,
) -> AsyncIterator[str]:
    """
    Token buffering: accumulate small chunks, flush on word/sentence boundaries.
    Why buffer at all?
    - Streamed deltas can be very small (e.g. "H", "e", "l", "l", "o")
    - Each text event triggers a React setState() and a DOM update
    - Batching to word boundaries cuts DOM update frequency when deltas are tiny
    - Little TTFT cost: the first chunk is held only until the first boundary
      character arrives or buffer_size chunks have accumulated
    - Particularly valuable for code blocks, where tiny-fragment rendering looks jittery
    When NOT to buffer:
    - When the user expects maximum possible speed
    - When measuring TTFT directly (buffering can delay the first chunk slightly)
    - When deltas already end on word boundaries (most English text): nearly every
      chunk contains a space, so the buffer flushes on almost every chunk anyway
    Error handling is omitted for brevity; wrap it as in generate_sse_stream.
    """
    yield sse({"type": "stream_start"})
    buffer: list[str] = []
    async with client.messages.stream(
        model="claude-opus-4-6",
        max_tokens=2048,
        system=system,
        messages=messages,
    ) as stream:
        async for text in stream.text_stream:
            buffer.append(text)
            is_boundary = any(c in text for c in ("\n", ".", "!", "?", " ", ",", ";", ":"))
            if len(buffer) >= buffer_size or is_boundary:
                yield sse({"type": "text", "text": "".join(buffer)})
                buffer.clear()
    if buffer:
        yield sse({"type": "text", "text": "".join(buffer)})  # flush the tail
    yield sse({"type": "stream_end"})
    yield "data: [DONE]\n\n"
```
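The flush rule in `buffered_stream` does not depend on the API call. It can therefore be pulled into a pure function and tested against simulated character-by-character output. `rebuffer` is a hypothetical helper that mirrors that rule; it is not part of the Anthropic SDK.

```python
from typing import Iterable, Iterator

# Same boundary characters as buffered_stream
BOUNDARY_CHARS = ("\n", ".", "!", "?", " ", ",", ";", ":")


def rebuffer(chunks: Iterable[str], buffer_size: int = 8) -> Iterator[str]:
    """Flush when a chunk contains a boundary character or when
    buffer_size chunks have accumulated; flush any remainder at the end."""
    buffer: list[str] = []
    for text in chunks:
        buffer.append(text)
        if len(buffer) >= buffer_size or any(c in text for c in BOUNDARY_CHARS):
            yield "".join(buffer)
            buffer.clear()
    if buffer:
        yield "".join(buffer)


# Eleven single-character deltas become two UI updates
print(list(rebuffer(list("Hello world"))))  # ['Hello ', 'world']
```

Concatenating the flushed chunks always reproduces the input, so buffering changes only how often the UI updates, never what it shows.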
:::warning Nginx Buffering Will Silently Break Streaming
If you deploy behind Nginx, response buffering (`proxy_buffering`) is on by default, and many other reverse proxies buffer as well. Nginx collects the upstream response in its proxy buffers before forwarding it. Tokens therefore reach the client in large bursts, and a response of a few kilobytes often arrives all at once, which completely defeats streaming. The symptom: the stream looks fine when you `curl -N` the backend directly, but through the proxy the browser receives the full response all at once after a long wait. Fix: send `X-Accel-Buffering: no` on every streaming response, or set `proxy_buffering off;` in the Nginx `location` block for the streaming endpoint. This is one of the most common streaming deployment bugs.
:::
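Buffering can be diagnosed by recording when each chunk arrives. `generate_sse_stream` sends `stream_start` before it even calls the model, so on an unbuffered path the first chunk should arrive almost immediately. The sketch below makes two assumptions. The 0.8 threshold in `looks_buffered` is a heuristic, not a standard check, so tune it for your traffic. `measure_chunk_arrivals` uses the third-party `httpx` client, which must be installed separately. Both function names are hypothetical.

```python
import time


def looks_buffered(
    arrival_offsets: list[float], threshold: float = 0.8, min_duration: float = 1.0
) -> bool:
    """Heuristic: True if the first chunk arrived only after `threshold` of the
    total response time had passed. Offsets are seconds since the request was
    sent; responses shorter than `min_duration` are too fast to judge."""
    if not arrival_offsets or arrival_offsets[-1] < min_duration:
        return False
    return arrival_offsets[0] / arrival_offsets[-1] >= threshold


async def measure_chunk_arrivals(url: str, payload: dict) -> list[float]:
    """POST to a streaming endpoint and record when each raw chunk arrives."""
    import httpx  # third-party; imported here so looks_buffered has no dependencies

    start = time.monotonic()
    offsets: list[float] = []
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, json=payload) as response:
            response.raise_for_status()
            async for _chunk in response.aiter_bytes():
                offsets.append(time.monotonic() - start)
    return offsets
```

Run `asyncio.run(measure_chunk_arrivals(url, {"messages": [...]}))` twice: once against the backend's own port and once through the proxy. If only the proxied run `looks_buffered`, the proxy is the culprit.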
Frontend: The Streaming Chat Hook
```ts
// hooks/useStreamingChat.ts
import { useState, useCallback, useRef } from "react";

export interface Message {
  id: string;
  role: "user" | "assistant";
  content: string;
  status: "sent" | "pending" | "streaming" | "complete" | "error";
  metadata?: {
    inputTokens?: number;
    outputTokens?: number;
    ttftMs?: number;
    totalMs?: number;
  };
}

interface StreamState {
  messages: Message[];
  isStreaming: boolean;
  error: string | null;
}

export function useStreamingChat(apiUrl: string) {
  const [state, setState] = useState<StreamState>({
    messages: [],
    isStreaming: false,
    error: null,
  });

  // The SSE reader loop is one long-lived async closure, so any `state` it reads
  // is frozen at the moment sendMessage was called. Accumulating tokens in a ref
  // gives the loop an always-current buffer; each text event then copies a
  // snapshot into React state, which is what triggers the re-render.
  const streamingContentRef = useRef<string>("");
  const abortControllerRef = useRef<AbortController | null>(null);

  const sendMessage = useCallback(
    // `history` overrides state.messages when the caller has just changed them (see retryLast)
    async (userContent: string, history?: Message[]) => {
      if (state.isStreaming) return;
      const userMessageId = crypto.randomUUID();
      const assistantMessageId = crypto.randomUUID();

      // Optimistically add user message + placeholder
      setState((prev) => ({
        ...prev,
        messages: [
          ...prev.messages,
          { id: userMessageId, role: "user", content: userContent, status: "sent" },
          { id: assistantMessageId, role: "assistant", content: "", status: "pending" },
        ],
        isStreaming: true,
        error: null,
      }));
      streamingContentRef.current = "";
      const controller = new AbortController();
      abortControllerRef.current = controller;

      // Build API message list - only completed, non-empty messages
      // (an empty assistant turn, e.g. from a response stopped before its first token, is rejected)
      const apiMessages = [
        ...(history ?? state.messages)
          .filter((m) => (m.status === "complete" || m.status === "sent") && m.content !== "")
          .map((m) => ({ role: m.role, content: m.content })),
        { role: "user" as const, content: userContent },
      ];

      try {
        const response = await fetch(apiUrl, {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({ messages: apiMessages, system: "You are a helpful AI assistant." }),
          signal: controller.signal,
        });
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        if (!response.body) {
          throw new Error("Response body is null - streaming not supported");
        }

        // Transition placeholder from pending to streaming
        setState((prev) => ({
          ...prev,
          messages: prev.messages.map((m) =>
            m.id === assistantMessageId ? { ...m, status: "streaming" } : m
          ),
        }));

        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let buffer = "";
        let sawDone = false;

        // SSE reader loop
        while (!sawDone) {
          const { done, value } = await reader.read();
          if (done) break;
          // stream: true keeps multi-byte UTF-8 characters split across chunks intact
          buffer += decoder.decode(value, { stream: true });
          // Process only complete SSE events (terminated by a blank line);
          // the last piece may be partial, so it stays in the buffer
          const parts = buffer.split("\n\n");
          buffer = parts.pop() ?? "";
          for (const block of parts) {
            for (const line of block.split("\n")) {
              // Skips comment lines such as ": heartbeat" and non-data fields
              if (!line.startsWith("data:")) continue;
              const data = line.slice(5).trim();
              if (data === "[DONE]") {
                sawDone = true; // stop the outer read loop, not just this line loop
                break;
              }
              try {
                const event = JSON.parse(data);
                if (event.type === "text") {
                  streamingContentRef.current += event.text;
                  const snapshot = streamingContentRef.current;
                  setState((prev) => ({
                    ...prev,
                    messages: prev.messages.map((m) =>
                      m.id === assistantMessageId
                        ? { ...m, content: snapshot, status: "streaming" }
                        : m
                    ),
                  }));
                } else if (event.type === "stream_end") {
                  setState((prev) => ({
                    ...prev,
                    messages: prev.messages.map((m) =>
                      m.id === assistantMessageId
                        ? {
                            ...m,
                            status: "complete",
                            metadata: {
                              inputTokens: event.usage?.input_tokens,
                              outputTokens: event.usage?.output_tokens,
                              ttftMs: event.metrics?.ttft_ms,
                              totalMs: event.metrics?.total_ms,
                            },
                          }
                        : m
                    ),
                  }));
                } else if (event.type === "error") {
                  setState((prev) => ({
                    ...prev,
                    error: event.message,
                    messages: prev.messages.map((m) =>
                      m.id === assistantMessageId ? { ...m, status: "error" } : m
                    ),
                  }));
                }
              } catch {
                // Skip a malformed payload (a server bug) rather than abandoning the stream;
                // partial events never reach here because they stay in `buffer`
              }
            }
            if (sawDone) break;
          }
        }
        // Release the connection if we stopped at [DONE] before the body ended
        if (sawDone) await reader.cancel();

        // Ensure final state is complete (handles edge case where stream_end wasn't emitted)
        setState((prev) => ({
          ...prev,
          isStreaming: false,
          messages: prev.messages.map((m) =>
            m.id === assistantMessageId && m.status === "streaming"
              ? { ...m, status: "complete" }
              : m
          ),
        }));
      } catch (err: unknown) {
        if (err instanceof Error && err.name === "AbortError") {
          // User clicked "Stop" - keep partial content, mark as complete
          setState((prev) => ({
            ...prev,
            isStreaming: false,
            messages: prev.messages.map((m) =>
              m.id === assistantMessageId && (m.status === "streaming" || m.status === "pending")
                ? { ...m, status: "complete" }
                : m
            ),
          }));
          return;
        }
        const errorMsg = err instanceof Error ? err.message : "Failed to get response";
        setState((prev) => ({
          ...prev,
          isStreaming: false,
          error: errorMsg,
          messages: prev.messages.map((m) =>
            m.id === assistantMessageId ? { ...m, status: "error" } : m
          ),
        }));
      } finally {
        abortControllerRef.current = null;
      }
    },
    [state.messages, state.isStreaming, apiUrl]
  );

  const stopStreaming = useCallback(() => {
    abortControllerRef.current?.abort();
  }, []);

  const clearHistory = useCallback(() => {
    setState({ messages: [], isStreaming: false, error: null });
  }, []);

  const retryLast = useCallback(() => {
    if (state.isStreaming) return;
    const lastUserIndex = state.messages.map((m) => m.role).lastIndexOf("user");
    if (lastUserIndex === -1) return;
    const lastUser = state.messages[lastUserIndex];
    // Drop the last user message and everything after it; sendMessage re-adds the
    // user message, so the UI and the API history don't contain it twice
    const history = state.messages.slice(0, lastUserIndex);
    setState((prev) => ({ ...prev, messages: history }));
    sendMessage(lastUser.content, history);
  }, [state.messages, state.isStreaming, sendMessage]);

  return { ...state, sendMessage, stopStreaming, clearHistory, retryLast };
}
```
:::info Why Use a Ref for Streaming Content?
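The content accumulation (`streamingContentRef.current += event.text`) uses a ref because the SSE reader loop is a single long-lived async closure. Any value it reads from `state` is frozen at the moment `sendMessage` was called, so appending to `state.messages` from inside the loop would keep appending to a stale copy. The ref is a mutable, always-current buffer that does not trigger a render by itself. After each token, the loop copies a snapshot into React state with a functional `setState`, and that update is what re-renders the message. A functional updater that appends `event.text` to the previous message content would also be correct; the ref pattern simply keeps the accumulated text readable outside React's render cycle. Either way, React still re-renders once per `text` event, which is why batching tokens (as in `buffered_stream`) can help.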
:::
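The two subtle steps in the reader loop are incremental decoding and keeping the unfinished tail in a buffer. Both can be checked outside the browser. The sketch below is a Python analogue of the hook's loop, not code from the lesson. `codecs.getincrementaldecoder` plays the role of `TextDecoder` with `{ stream: true }`, and `iter_sse_payloads` is a hypothetical name. The backend's `json.dumps` escapes non-ASCII characters by default (`ensure_ascii=True`), so its wire format is pure ASCII and can never split a character. The decoder matters once a server sends raw UTF-8, and the test below forces that case with `ensure_ascii=False`.

```python
import codecs
import json
from typing import Iterable, Iterator


def iter_sse_payloads(byte_chunks: Iterable[bytes]) -> Iterator[str]:
    """Decode bytes incrementally, hold any incomplete event in a buffer, and
    yield each data payload only after its terminating blank line arrives."""
    decoder = codecs.getincrementaldecoder("utf-8")()  # keeps split multi-byte chars
    buffer = ""
    for chunk in byte_chunks:
        buffer += decoder.decode(chunk)
        *complete, buffer = buffer.split("\n\n")  # last piece may be partial
        for block in complete:
            for line in block.split("\n"):
                if line.startswith("data:"):
                    yield line[len("data:"):].removeprefix(" ")


wire = "".join(
    f"data: {json.dumps({'type': 'text', 'text': t}, ensure_ascii=False)}\n\n"
    for t in ["Caf", "é ", "☕"]
) + "data: [DONE]\n\n"
raw = wire.encode("utf-8")
```

Splitting `raw` into two network chunks at every possible byte offset still reconstructs `"Café ☕"`, including the cuts that fall inside `é` or `☕`.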
The Streaming Chat UI Component
```tsx
// components/StreamingChat.tsx
import React, { useRef, useEffect, useState, useCallback } from "react";
import { useStreamingChat, Message } from "../hooks/useStreamingChat";

function MessageBubble({
  message,
  isLast,
  isStreaming,
}: {
  message: Message;
  isLast: boolean;
  isStreaming: boolean;
}) {
  const showCursor = isLast && isStreaming && message.role === "assistant";
  const isPending = message.status === "pending";
  const isError = message.status === "error";

  return (
    <div
      className={`flex gap-3 mb-4 ${
        message.role === "user" ? "justify-end" : "justify-start"
      }`}
    >
      {message.role === "assistant" && (
        <div className="w-8 h-8 rounded-full bg-blue-600 flex items-center justify-center text-white text-xs font-bold flex-shrink-0 mt-1">
          AI
        </div>
      )}
      <div
        className={`max-w-[80%] rounded-2xl px-4 py-3 ${
          message.role === "user"
            ? "bg-blue-600 text-white"
            : isError
            ? "bg-red-50 border border-red-200 text-red-700"
            : "bg-gray-100 text-gray-800"
        }`}
      >
        {isPending ? (
          /* Skeleton loader during the TTFT gap - prevents blank flash */
          <div className="space-y-2 py-1">
            <div className="h-3 bg-gray-300 rounded animate-pulse w-48" />
            <div className="h-3 bg-gray-300 rounded animate-pulse w-36" />
            <div className="h-3 bg-gray-300 rounded animate-pulse w-44" />
          </div>
        ) : (
          <div className="text-sm leading-relaxed whitespace-pre-wrap">
            {message.content}
            {isError && (
              <span className="block mt-1 text-xs text-red-500">
                Error generating response. Try again.
              </span>
            )}
            {/* Blinking cursor - only shown on the active streaming message */}
            {showCursor && (
              <span
                className="inline-block w-0.5 h-4 bg-gray-600 ml-0.5 align-text-bottom"
                style={{ animation: "blink 1s step-end infinite" }}
              />
            )}
          </div>
        )}
        {/* Token usage - shown for completed assistant messages */}
        {message.role === "assistant" &&
          message.status === "complete" &&
          message.metadata?.outputTokens && (
            <div className="text-xs text-gray-400 mt-2 pt-1.5 border-t border-gray-200 flex gap-3">
              <span>{message.metadata.outputTokens} tokens</span>
              {message.metadata.ttftMs && (
                <span>TTFT: {message.metadata.ttftMs}ms</span>
              )}
              {message.metadata.totalMs && (
                <span>{(message.metadata.totalMs / 1000).toFixed(1)}s total</span>
              )}
            </div>
          )}
      </div>
      {message.role === "user" && (
        <div className="w-8 h-8 rounded-full bg-gray-300 flex items-center justify-center text-gray-600 text-xs font-bold flex-shrink-0 mt-1">
          You
        </div>
      )}
    </div>
  );
}

export function StreamingChat() {
  const [input, setInput] = useState("");
  const messagesEndRef = useRef<HTMLDivElement>(null);
  const containerRef = useRef<HTMLDivElement>(null);
  const userScrolledRef = useRef(false);
  const { messages, isStreaming, error, sendMessage, stopStreaming, clearHistory, retryLast } =
    useStreamingChat("/v1/chat/stream");

  // Smart auto-scroll: follow the stream unless the user scrolled up intentionally
  const handleScroll = useCallback(() => {
    const container = containerRef.current;
    if (!container) return;
    const nearBottom =
      container.scrollHeight - container.scrollTop - container.clientHeight < 80;
    userScrolledRef.current = !nearBottom;
  }, []);

  useEffect(() => {
    if (userScrolledRef.current) return;
    // Jump instantly while tokens stream in: a smooth-scroll animation emits
    // intermediate scroll events that handleScroll could mistake for the user scrolling up
    messagesEndRef.current?.scrollIntoView({ behavior: isStreaming ? "auto" : "smooth" });
  }, [messages, isStreaming]);

  useEffect(() => {
    if (!isStreaming) userScrolledRef.current = false;
  }, [isStreaming]);

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    const content = input.trim();
    if (!content || isStreaming) return;
    setInput("");
    await sendMessage(content);
  };

  const handleKeyDown = (e: React.KeyboardEvent<HTMLTextAreaElement>) => {
    if (e.key === "Enter" && !e.shiftKey) {
      e.preventDefault();
      handleSubmit(e as unknown as React.FormEvent);
    }
  };

  return (
    <>
      {/* Cursor blink animation - add to global CSS or inject here */}
      <style>{`
        @keyframes blink {
          0%, 100% { opacity: 1; }
          50% { opacity: 0; }
        }
      `}</style>
      <div className="flex flex-col h-screen max-w-3xl mx-auto bg-white">
        {/* Header */}
        <div className="flex items-center justify-between px-4 py-3 border-b">
          <h1 className="text-lg font-semibold text-gray-800">AI Assistant</h1>
          <div className="flex gap-2">
            <button
              onClick={retryLast}
              disabled={isStreaming || messages.length === 0}
              className="text-xs text-gray-400 hover:text-gray-600 px-2 py-1 rounded disabled:opacity-30"
            >
              Retry
            </button>
            <button
              onClick={clearHistory}
              disabled={isStreaming}
              className="text-xs text-gray-400 hover:text-gray-600 px-2 py-1 rounded disabled:opacity-30"
            >
              Clear
            </button>
          </div>
        </div>

        {/* Messages */}
        <div
          ref={containerRef}
          onScroll={handleScroll}
          className="flex-1 overflow-y-auto px-4 py-4"
        >
          {messages.length === 0 && (
            <div className="text-center text-gray-400 mt-20">
              <p className="text-2xl mb-2">How can I help?</p>
              <p className="text-sm">Start a conversation below</p>
            </div>
          )}
          {messages.map((msg, i) => (
            <MessageBubble
              key={msg.id}
              message={msg}
              isLast={i === messages.length - 1}
              isStreaming={isStreaming}
            />
          ))}
          {error && !isStreaming && (
            <div className="bg-red-50 border border-red-200 text-red-700 text-sm rounded-lg px-4 py-3 mb-4">
              {error}
            </div>
          )}
          <div ref={messagesEndRef} />
        </div>

        {/* Input area */}
        <form
          onSubmit={handleSubmit}
          className="border-t px-4 py-3 flex gap-2 items-end bg-white"
        >
          <textarea
            value={input}
            onChange={(e) => setInput(e.target.value)}
            onKeyDown={handleKeyDown}
            placeholder={
              isStreaming
                ? "AI is responding..."
                : "Type a message... (Enter to send, Shift+Enter for newline)"
            }
            disabled={isStreaming}
            rows={1}
            className="flex-1 resize-none rounded-xl border border-gray-300 px-3 py-2 text-sm focus:outline-none focus:ring-2 focus:ring-blue-500 disabled:bg-gray-50 disabled:cursor-not-allowed"
            style={{ maxHeight: "120px" }}
            onInput={(e) => {
              const el = e.currentTarget;
              el.style.height = "auto";
              el.style.height = `${Math.min(el.scrollHeight, 120)}px`;
            }}
          />
          {isStreaming ? (
            <button
              type="button"
              onClick={stopStreaming}
              className="px-4 py-2 rounded-xl bg-red-500 text-white text-sm font-medium hover:bg-red-600 flex-shrink-0"
            >
              Stop
            </button>
          ) : (
            <button
              type="submit"
              disabled={!input.trim()}
              className="px-4 py-2 rounded-xl bg-blue-600 text-white text-sm font-medium hover:bg-blue-700 disabled:opacity-50 disabled:cursor-not-allowed flex-shrink-0"
            >
              Send
            </button>
          )}
        </form>
      </div>
    </>
  );
}
```
Streaming Architecture: End to End
Streaming Markdown: The Rendering Challenge
Streaming raw text works, but AI responses typically contain Markdown: headers, bold, code blocks, lists. Rendering Markdown while streaming is tricky because the Markdown structure is incomplete mid-stream. A `**bold**` span won't render correctly if the stream stops after `**bo`, and a code block opened with a `` ``` `` fence won't render correctly until the closing fence arrives.
The naive approach - re-rendering the full response through a Markdown parser on every token - produces constant visual glitches: text flickering between raw syntax and rendered HTML, code blocks half-opening and half-closing, headers appearing and disappearing. This is jarring and makes the streaming experience feel broken, even though the stream itself is working perfectly.
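The flicker happens because successive prefixes of a Markdown string keep changing structure. The sketch below reports which constructs a prefix leaves open. It is deliberately simplified: it only knows about `**` bold markers and triple-backtick fences, with no escaping, inline code or nesting rules. `unclosed_constructs` is a hypothetical helper, not part of react-markdown.

```python
def unclosed_constructs(prefix: str) -> set[str]:
    """Return the Markdown constructs a streaming prefix leaves open
    (simplified: fenced code blocks and ** bold markers only)."""
    in_fence = False
    bold_markers = 0
    for line in prefix.split("\n"):
        if line.lstrip().startswith("```"):
            in_fence = not in_fence  # an opening or closing fence line
            continue
        if not in_fence:
            bold_markers += line.count("**")  # bold syntax inside code is literal
    open_items: set[str] = set()
    if in_fence:
        open_items.add("code_fence")
    if bold_markers % 2:
        open_items.add("bold")
    return open_items


sample = "A **bold** word"
states = [unclosed_constructs(sample[:i]) for i in range(len(sample) + 1)]
```

Walking through `states` shows the structure flipping as tokens arrive. Prefixes up to `"A *"` are balanced, from `"A **"` through `"A **bold*"` bold is open, and the closing `**` balances it again. A full re-parse on every token renders each of those flips, which the reader sees as flicker.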
```ts
// hooks/useStreamingMarkdown.ts
import { useMemo } from "react";

export interface StreamingMarkdownParts {
  renderableMarkdown: string;
  pendingText: string;
  isInCodeBlock: boolean;
}

/**
 * Progressive Markdown rendering for streaming AI responses.
 *
 * Core insight: Markdown syntax is line-oriented. Most Markdown constructs
 * (paragraphs, headers, list items, code blocks) are delimited by
 * blank lines or line boundaries. We can safely render everything
 * up to the last structural boundary, and show the in-progress
 * text after it as plain text.
 *
 * Solution:
 * - Scan complete lines, tracking whether we are inside a ``` fence
 * - A boundary is a blank line outside a fence, or the line that closes a fence
 *   (a blank line inside a code block is NOT a boundary)
 * - Render everything up to the last boundary with react-markdown (closed syntax)
 * - Render the remainder as plain text + cursor
 * - When streaming ends: render everything as Markdown
 *
 * Performance:
 * - The split is derived with useMemo, so it is computed once per content change
 *   without the extra render a useState + useEffect pair would cause
 * - renderableMarkdown only changes when a boundary is crossed, so memoizing the
 *   rendered Markdown on it (see usage below) leaves only the tail re-rendering per token
 */
export function splitStreamingMarkdown(content: string): StreamingMarkdownParts {
  const lines = content.split("\n");
  const tail = lines.pop() ?? ""; // the last line may still be growing
  let inFence = false;
  let safeEnd = 0; // offset where the safely renderable prefix ends
  let offset = 0;
  for (const line of lines) {
    offset += line.length + 1; // +1 for the "\n" that ends this line
    if (line.trimStart().startsWith("```")) {
      inFence = !inFence;
      if (!inFence) safeEnd = offset; // a code block just closed
    } else if (!inFence && line.trim() === "") {
      safeEnd = offset; // blank line outside a fence = paragraph boundary
    }
  }
  return {
    renderableMarkdown: content.slice(0, safeEnd),
    pendingText: content.slice(safeEnd),
    // A growing tail line that starts with ``` is opening (or closing) a fence
    isInCodeBlock: inFence !== tail.trimStart().startsWith("```"),
  };
}

export function useStreamingMarkdown(
  content: string,
  isStreaming: boolean
): StreamingMarkdownParts {
  return useMemo(
    () =>
      isStreaming
        ? splitStreamingMarkdown(content)
        : // Stream complete - render everything as Markdown
          { renderableMarkdown: content, pendingText: "", isInCodeBlock: false },
    [content, isStreaming]
  );
}

/**
 * Usage example with react-markdown:
 *
 * import ReactMarkdown from "react-markdown";
 * import remarkGfm from "remark-gfm";
 * import { useMemo } from "react";
 *
 * function StreamingMessage({ content, isStreaming }) {
 *   const { renderableMarkdown, pendingText } = useStreamingMarkdown(content, isStreaming);
 *   const rendered = useMemo(
 *     () => <ReactMarkdown remarkPlugins={[remarkGfm]}>{renderableMarkdown}</ReactMarkdown>,
 *     [renderableMarkdown] // Only re-render when complete blocks change
 *   );
 *   return (
 *     <div>
 *       {rendered}
 *       <span className="text-gray-800 whitespace-pre-wrap">{pendingText}</span>
 *       {isStreaming && <span className="cursor-blink">▌</span>}
 *     </div>
 *   );
 * }
 */
```
Production Engineering: Heartbeats and Timeouts
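Heartbeats do not depend on the model call, so the technique can be sketched as a wrapper around any SSE frame generator, such as `generate_sse_stream` from earlier. `with_heartbeats` is a hypothetical helper. It forwards frames from a background task through a queue and emits an SSE comment whenever `interval` seconds pass with nothing to send.

```python
import asyncio
from typing import AsyncIterator


async def with_heartbeats(source: AsyncIterator[str], interval: float) -> AsyncIterator[str]:
    """Forward SSE frames from `source`, inserting ": heartbeat" comments
    during any silence longer than `interval` seconds."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking the end of the source

    async def pump() -> None:
        try:
            async for frame in source:
                await queue.put(frame)
        finally:
            await queue.put(done)

    task = asyncio.create_task(pump())
    try:
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), timeout=interval)
            except asyncio.TimeoutError:
                yield ": heartbeat\n\n"  # SSE comment: resets proxy idle timers
                continue
            if item is done:
                break
            yield item
        await task  # re-raise any error from the source
    finally:
        task.cancel()  # stop reading upstream if the client disconnects early


async def slow_source() -> AsyncIterator[str]:
    await asyncio.sleep(0.25)  # simulated long silence before the first token
    yield "data: a\n\n"
    yield "data: b\n\n"


async def collect() -> list[str]:
    return [frame async for frame in with_heartbeats(slow_source(), interval=0.05)]


frames = asyncio.run(collect())
```

Wrapping the generator keeps the heartbeat logic separate from the model call. The cost is one queue hop per frame.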
# backend/streaming_production.py
import anthropic
import asyncio
import json
import time
from typing import AsyncIterator

client = anthropic.AsyncAnthropic()


async def stream_with_heartbeat(
    messages: list[dict],
    system: str = "You are a helpful assistant.",
    heartbeat_interval: float = 15.0,
) -> AsyncIterator[str]:
    """
    Production streaming with keepalive heartbeats.
    Problem: Proxies and load balancers treat a silent connection as dead.
    A response can go quiet for many seconds (e.g. while a long prompt is processed
    before the first token), and an idle timeout closes any connection that stays
    silent longer than its limit, even though the model is still working.
    Solution: SSE comment lines (": heartbeat") - sent over the wire but ignored by parsers.
    The browser's EventSource API skips comment lines, and so does the fetch-based
    reader in useStreamingChat (it only handles "data:" lines).
    Heartbeats reset idle timers without affecting the response content.
    Schedule:
    - Send a heartbeat after every 15 seconds of token silence
    - Common idle-timeout defaults are 60 seconds (AWS ALB, Nginx proxy_read_timeout),
      so 15s is safe with margin
    - AWS ALB: consider also raising idle_timeout (e.g. to 300s) for streaming endpoints
    """
    yield f"data: {json.dumps({'type': 'stream_start'})}\n\n"
    token_queue: asyncio.Queue = asyncio.Queue()

    async def model_to_queue() -> None:
        """Read from the Anthropic API and push (kind, value) items onto the queue."""
        try:
            async with client.messages.stream(
                model="claude-opus-4-6",
                max_tokens=2048,
                system=system,
                messages=messages,
            ) as stream:
                async for text in stream.text_stream:
                    await token_queue.put(("token", text))
        except Exception as e:
            await token_queue.put(("error", str(e)))
        finally:
            await token_queue.put(("done", None))

    # Read from the model in the background so this loop can wake up on a timer
    model_task = asyncio.create_task(model_to_queue())
    last_sent = time.monotonic()
    failed = False
    try:
        while True:
            try:
                # Wake at least once per second to check whether a heartbeat is due
                kind, value = await asyncio.wait_for(token_queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                if time.monotonic() - last_sent >= heartbeat_interval:
                    yield ": heartbeat\n\n"
                    last_sent = time.monotonic()
                continue
            if kind == "token":
                yield f"data: {json.dumps({'type': 'text', 'text': value})}\n\n"
                last_sent = time.monotonic()
            elif kind == "error":
                # Raw exception text; consider a generic message for end users
                yield f"data: {json.dumps({'type': 'error', 'message': value})}\n\n"
                failed = True
                break
            elif kind == "done":
                break
        await model_task
    finally:
        # Runs on normal exit and when the client disconnects mid-stream:
        # cancel the upstream read instead of letting it run unobserved
        model_task.cancel()
    if not failed:
        yield f"data: {json.dumps({'type': 'stream_end'})}\n\n"
    yield "data: [DONE]\n\n"


async def stream_with_prompt_cache(
messages: list[dict],
system_prompt: str,
cached_context: str = "",
) -> AsyncIterator[str]:
"""
Streaming with Anthropic prompt caching for TTFT optimization.
Prompt caching works by marking specific content blocks as cacheable.
On subsequent requests with the same cached blocks, Anthropic skips
re-processing those blocks - saving 90%+ of input token processing time.
This directly reduces TTFT: the model starts generating output faster
because it doesn't need to re-process the cached system prompt.
Best candidates for caching:
- Long system prompts (> 1K tokens) - instructions, persona, policies
- Static context documents - knowledge bases, reference docs
- Few-shot examples - if the same examples are used across requests
Caching is ephemeral: cached blocks expire after ~5 minutes of inactivity.
For production, keep at least one request per 4 minutes to maintain the cache.
"""
yield f"data: {json.dumps({'type': 'stream_start', 'cache_enabled': bool(cached_context)})}\n\n"
# Build system with cache control on the expensive part
system_blocks = [
{
"type": "text",
"text": system_prompt,
"cache_control": {"type": "ephemeral"}, # Cache the system prompt
}
]
if cached_context:
system_blocks.append({
"type": "text",
"text": cached_context,
"cache_control": {"type": "ephemeral"}, # Also cache the context
})
async with client.messages.stream(
model="claude-opus-4-6",
max_tokens=2048,
system=system_blocks,
messages=messages,
extra_headers={"anthropic-beta": "prompt-caching-2024-07-31"},
) as stream:
async for text in stream.text_stream:
yield f"data: {json.dumps({'type': 'text', 'text': text})}\n\n"
yield f"data: {json.dumps({'type': 'stream_end'})}\n\n"
yield "data: [DONE]\n\n"
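stream_with_heartbeat interleaves heartbeats by running the model in a background task and feeding a queue. Another option, sketched below, wraps any async iterator of SSE frames and needs no queue. The names with_heartbeats and _next_item are hypothetical helpers, not part of the Anthropic SDK. The design hinges on one asyncio detail: asyncio.wait returns on timeout without cancelling the pending task, whereas asyncio.wait_for cancels it, and cancellation would interrupt the source mid-await.

```python
import asyncio
from typing import AsyncIterator


async def _next_item(iterator: AsyncIterator[str]) -> str:
    # Wrap __anext__ in a coroutine so it can run as an asyncio task.
    return await iterator.__anext__()


async def with_heartbeats(
    source: AsyncIterator[str], interval: float = 15.0
) -> AsyncIterator[str]:
    """Re-yield SSE frames from `source`, inserting a ': heartbeat' comment
    after every `interval` seconds without a frame (hypothetical helper)."""
    iterator = source.__aiter__()
    pending = asyncio.create_task(_next_item(iterator))
    try:
        while True:
            # asyncio.wait does not cancel `pending` on timeout, so a slow
            # source keeps running while heartbeats go out.
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                yield ": heartbeat\n\n"
                continue
            try:
                frame = pending.result()
            except StopAsyncIteration:
                return  # source exhausted
            yield frame
            pending = asyncio.create_task(_next_item(iterator))
    finally:
        # If the consumer stops early (client disconnect), stop the source too.
        pending.cancel()
```

With an upstream generator that yields the `data:` frames, the endpoint would return something like `with_heartbeats(frames(), interval=15.0)`.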
:::tip TTFT Optimization Checklist - In Order of Impact
The five highest-impact TTFT optimizations, roughly in order. The figures are rough, so measure on your own workload. (1) Enable Anthropic prompt caching for any system prompt over 1K tokens. This is usually the single biggest win: on cache hits the cached prefix is not re-processed, which can reduce TTFT by roughly 50-70% for large system prompts. (2) Parallelize all pre-LLM steps: run RAG retrieval, auth validation, and context assembly concurrently with asyncio.gather(). (3) Use claude-haiku-4-5-20251001 for simple queries. Haiku's TTFT is typically 2-3x lower than Opus's. (4) Reuse warm connections to the Anthropic API. AsyncAnthropic pools connections through httpx, so create one long-lived client per process instead of one per request, and requests can skip TCP and TLS setup. (5) Minimize system prompt length: every 1K uncached tokens adds approximately 50-100ms to TTFT.
:::
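Item (2) is easy to get wrong by awaiting each step in sequence. The sketch below shows the concurrent version. The three step functions are hypothetical stand-ins, and their asyncio.sleep calls simulate network round trips.

```python
import asyncio


# Hypothetical pre-LLM steps; each sleep stands in for a network round trip.
async def validate_auth(user_id: str) -> bool:
    await asyncio.sleep(0.1)
    return user_id != ""


async def retrieve_documents(query: str) -> list[str]:
    await asyncio.sleep(0.1)
    return [f"(retrieved passage about {query!r})"]


async def load_history(user_id: str) -> list[dict]:
    await asyncio.sleep(0.1)
    return []


async def prepare_request(user_id: str, query: str) -> dict:
    # gather() runs all three concurrently, so wall time is roughly the
    # slowest step (~0.1 s here) rather than the sum of all three (~0.3 s).
    authorized, docs, history = await asyncio.gather(
        validate_auth(user_id),
        retrieve_documents(query),
        load_history(user_id),
    )
    if not authorized:
        raise PermissionError("request not authorized")
    return {
        "system": "Use this context:\n" + "\n".join(docs),
        "messages": history + [{"role": "user", "content": query}],
    }
```

One trade-off: retrieval also runs for requests that then fail auth. If retrieval is expensive, or auth failures are common, run auth first and parallelize only the rest.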
:::danger Never Use response.text() or response.json() for Streaming
These methods buffer the entire response body before returning - using them with a streaming endpoint delivers the complete response after the stream ends, exactly like non-streaming behavior. The entire point of streaming is defeated. Always consume streaming responses with response.body.getReader(). If your stream appears to "arrive all at once" after a long wait, the first thing to check is whether you accidentally called await response.text() or await response.json() anywhere in your fetch handler.
:::
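Python clients can make the same mistake by awaiting response.aread() or reading response.text on an httpx response. The streaming-safe pattern reads chunks as they arrive, for example from response.aiter_bytes() inside httpx's client.stream(...), and decodes them incrementally. Chunks do not line up with events. A chunk can end mid-event, or even in the middle of a multi-byte UTF-8 character. The SSEDecoder class below is a hypothetical sketch. It assumes LF line endings and JSON payloads, as emitted by the backend code in this lesson.

```python
import codecs
import json


class SSEDecoder:
    """Incrementally decode a text/event-stream byte stream into JSON events.

    Sketch only: assumes LF line endings and JSON data payloads.
    """

    def __init__(self) -> None:
        # The incremental decoder holds back a partial UTF-8 sequence.
        self._utf8 = codecs.getincrementaldecoder("utf-8")()
        self._buffer = ""
        self.done = False

    def feed(self, chunk: bytes) -> list[dict]:
        """Add one network chunk and return the events it completes."""
        self._buffer += self._utf8.decode(chunk)
        events: list[dict] = []
        # A blank line terminates each event.
        while "\n\n" in self._buffer:
            raw_event, self._buffer = self._buffer.split("\n\n", 1)
            data_lines = []
            for line in raw_event.split("\n"):
                if line.startswith(":"):
                    continue  # comment line, such as ": heartbeat"
                if line.startswith("data:"):
                    value = line[len("data:"):]
                    # The SSE format strips exactly one leading space.
                    data_lines.append(value[1:] if value.startswith(" ") else value)
            if not data_lines:
                continue
            data = "\n".join(data_lines)
            if data == "[DONE]":
                self.done = True
            else:
                events.append(json.loads(data))
        return events
```

A reader loop calls feed() once per chunk and appends every returned event's text to the message. It stops once done is true.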
Common Production Issues and Fixes
| Issue | Symptom | Root Cause | Fix |
|---|---|---|---|
| Nginx buffering | Full response arrives all at once | proxy_buffering on (default) | proxy_buffering off or X-Accel-Buffering: no header |
| ALB timeout | Connection drops mid-stream | No bytes sent within the 60s default idle timeout | Set ALB idle timeout to 300s+ for streaming endpoints and send heartbeats |
| CORS failure | Works with curl or same-origin pages, fails cross-origin in the browser | Missing CORS headers on the streaming response | Add CORS middleware to FastAPI; it applies to all responses, including streams |
| Gzip breaking streaming | Full response arrives all at once at the end | Compression middleware buffers the body before compressing | Disable gzip for the text/event-stream content type |
| Proxy keepalive timeout | Connection killed during model pause | Proxy idle timeout shorter than model pause | Add SSE heartbeat comments every 15s |
| React re-render storm | CPU spike, UI lag during streaming | setState on every single token | Accumulate tokens in a ref; call setState at most once per animation frame |
| Markdown flash | Text flickers between raw and rendered | Re-rendering full Markdown on every token | Split on paragraph boundaries, render only complete paragraphs |
| Mobile battery drain | High CPU on mobile during long streams | Too many DOM updates per second | Buffer tokens, batch updates at 60fps with requestAnimationFrame |
| Stop button race condition | Partial content lost when stopping | AbortError handling discards content | On AbortError, keep partial content, mark complete not error |
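The fix shared by the re-render storm and battery drain rows is to coalesce tokens and flush them at a fixed cadence. In the browser that cadence comes from requestAnimationFrame. The sketch below shows the same coalescing logic in Python, for example for a Python client or a server that wants fewer, larger SSE events. batch_tokens is a hypothetical helper. The injectable clock exists only so the behavior can be tested deterministically.

```python
import time
from typing import Callable, Iterable, Iterator


def batch_tokens(
    tokens: Iterable[str],
    min_interval: float = 1 / 60,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[str]:
    """Coalesce tokens so the consumer sees at most one update per interval."""
    buffer: list[str] = []
    last_flush = clock()
    for token in tokens:
        buffer.append(token)
        now = clock()
        if now - last_flush >= min_interval:
            yield "".join(buffer)
            buffer.clear()
            last_flush = now
    if buffer:
        yield "".join(buffer)  # flush the tail so no text is lost
```

This pull-based version flushes only when a new token arrives, so buffered text can wait during a pause. A requestAnimationFrame loop (or a timer) avoids that by flushing on its own schedule.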
Interview Q&A
Q1: Explain Server-Sent Events and how they enable streaming AI interfaces. When would you choose WebSockets instead?
SSE is a standard streaming format over plain HTTP, defined in the WHATWG HTML Living Standard. The server keeps an HTTP response open and pushes data to the client as text events. The Content-Type is text/event-stream. Each event is formatted as data: {payload}\n\n, a plain-text format that can be debugged with curl -N. The browser's built-in EventSource API supports SSE natively. Production AI chat, however, typically uses the Fetch API with response.body.getReader() instead, because EventSource only issues GET requests, cannot set custom headers such as Authorization, and has no AbortController integration for cancellation. SSE is the right choice for AI chat for three reasons. It is unidirectional: the server pushes tokens, and the client submits messages via separate POST requests. It is HTTP-native, so it works through standard proxies and load balancers once response buffering is disabled. And it is simpler than WebSockets. Choose WebSockets when you need bidirectional real-time communication where both sides push events simultaneously, such as collaborative editing, real-time multiplayer, or voice. For AI response streaming, SSE is almost always sufficient and significantly simpler to operate.
Q2: What is TTFT, how do you measure it, and what are the highest-impact ways to reduce it?
TTFT (Time-to-First-Token) is the latency from the moment the user submits a request to the moment the first character of the response appears on screen. It is the primary user-perceived latency metric for streaming AI interfaces, because once the first token appears, users start reading and the wait stops feeling like dead time. To measure it on the backend, record a timestamp when the request is received and another at the first content_block_delta event, then send the difference in the stream_end event for client-side logging. That figure excludes network and rendering time, so also measure on the client, from submit to the first rendered token.
To reduce TTFT, roughly in order of impact: First, enable Anthropic prompt caching for system prompts over 1K tokens. This skips re-processing the cached portion, which can cut TTFT by 50-70% on repeat requests with large prompts. Second, parallelize pre-LLM steps with asyncio.gather(): RAG retrieval, auth validation, and context assembly should all run concurrently. Third, use claude-haiku-4-5-20251001 for queries that don't require Opus-level reasoning; Haiku's TTFT is typically 2-3x lower. Fourth, reuse warm connections to the Anthropic API through one persistent async client, so requests skip TCP and TLS setup. Fifth, minimize system prompt token count: every 1K uncached tokens adds roughly 50-100ms.
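The backend measurement described above fits in a small wrapper. In the sketch below, measure_ttft is a hypothetical helper. With the Anthropic SDK, `tokens` would be stream.text_stream, whose first item arrives with the first text delta. request_received is a time.monotonic() timestamp that the handler is assumed to take as soon as the request arrives, before retrieval or prompt assembly.

```python
import json
import time
from typing import AsyncIterator


async def measure_ttft(
    tokens: AsyncIterator[str], request_received: float
) -> AsyncIterator[str]:
    """Re-emit tokens as SSE frames and report server-side TTFT at the end."""
    first_token_at: float | None = None
    async for text in tokens:
        if first_token_at is None:
            first_token_at = time.monotonic()  # first token is ready to send
        yield f"data: {json.dumps({'type': 'text', 'text': text})}\n\n"
    ttft_ms = (
        round((first_token_at - request_received) * 1000)
        if first_token_at is not None
        else None  # the stream produced no text at all
    )
    yield f"data: {json.dumps({'type': 'stream_end', 'ttft_ms': ttft_ms})}\n\n"
    yield "data: [DONE]\n\n"
```

Use a monotonic clock for both timestamps. Wall-clock time can jump during an NTP adjustment and produce negative or inflated latencies.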
Q3: How do you handle errors that occur mid-stream, after the HTTP 200 status code has already been sent?
This is one of the trickier aspects of streaming: once you've sent the HTTP 200 status and begun streaming, you can no longer change the status code. Mid-stream errors must be communicated in-band as a structured error event in the SSE stream. The pattern is: emit data: {"type": "error", "code": 429, "message": "...", "retryable": true}\n\n followed by data: [DONE]\n\n. The client reads the error event, shows the error message inline after any partial content already rendered, and offers a retry option. Do not discard partial content already shown to the user - show the error below it. The client-side reader loop should handle three event types explicitly: text (append to content), stream_end (finalize), and error (show error, enable retry). Always emit [DONE] even after an error so the client can cleanly close the reader.
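A minimal sketch of the in-band error pattern follows. It assumes `tokens` is any async iterator of text, such as stream.text_stream. The retryable classification shown is a hypothetical placeholder. Real code would map the SDK's own exception types, such as rate-limit errors, onto codes and retry hints.

```python
import json
from typing import AsyncIterator


async def stream_with_inband_errors(tokens: AsyncIterator[str]) -> AsyncIterator[str]:
    """Forward tokens; turn a mid-stream exception into an SSE error event."""
    try:
        async for text in tokens:
            yield f"data: {json.dumps({'type': 'text', 'text': text})}\n\n"
    except Exception as exc:
        # Status 200 is already on the wire, so the error must travel in-band.
        error = {
            "type": "error",
            "message": str(exc),
            # Crude, hypothetical classification; map real SDK errors here.
            "retryable": isinstance(exc, (ConnectionError, TimeoutError)),
        }
        yield f"data: {json.dumps(error)}\n\n"
    else:
        yield f"data: {json.dumps({'type': 'stream_end'})}\n\n"
    # Always terminate, so the client can close its reader cleanly.
    yield "data: [DONE]\n\n"
```

Catching Exception rather than BaseException matters here. asyncio.CancelledError derives from BaseException, so a client disconnect still propagates and is not reported as an error event.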
Q4: How do you implement the "Stop Generating" button correctly?
Frontend: maintain an AbortController reference and pass its signal to the fetch() call. When the user clicks stop, call abortController.abort(). The pending reader.read() call in the SSE reader loop then rejects with an AbortError. Handle AbortError separately from other errors: keep the partial content already streamed (do not discard it), mark the message status as complete (not error), and take the UI out of the streaming state. The key design decision is that stopping is a user choice, not a failure; the partial response is valid and useful content. Backend: when the client disconnects, the server stops consuming FastAPI's StreamingResponse generator. The generator is either cancelled (asyncio.CancelledError at its current await) or closed (GeneratorExit at its current yield). If the upstream call runs inside the generator, the async with client.messages.stream() context manager exits and closes the upstream API connection, and no further cleanup is required. If the model is read in a background task instead, as stream_with_heartbeat does, cancel that task in a finally block. Otherwise it keeps generating tokens that nobody will read.
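The same "stop is not an error" rule applies to any consumer, not only browser code. Below is a sketch of a Python consumer (for example a CLI client or a test harness), where setting an asyncio.Event plays the role of abortController.abort(). consume_until_stopped is a hypothetical name. The returned dict shape mirrors the message status described above.

```python
import asyncio
from typing import AsyncIterator


async def consume_until_stopped(
    tokens: AsyncIterator[str], stop: asyncio.Event
) -> dict:
    """Accumulate tokens until the stream ends or `stop` is set.

    Stopping is a user choice, not a failure: partial text is kept and the
    message is marked complete either way.
    """
    parts: list[str] = []

    async def read_all() -> None:
        async for text in tokens:
            parts.append(text)

    reader = asyncio.create_task(read_all())
    stopper = asyncio.create_task(stop.wait())
    done, _ = await asyncio.wait({reader, stopper}, return_when=asyncio.FIRST_COMPLETED)
    stopper.cancel()
    if reader in done:
        reader.result()  # re-raise a genuine stream error, if there was one
        return {"content": "".join(parts), "status": "complete", "stopped": False}
    # The user stopped: cancel the in-flight read but keep what arrived.
    reader.cancel()
    await asyncio.wait({reader})  # wait for the cancellation without re-raising it
    return {"content": "".join(parts), "status": "complete", "stopped": True}
```

Waiting on the cancelled task with asyncio.wait, instead of awaiting it inside try/except CancelledError, means a cancellation of the caller itself is never swallowed by accident.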
Q5: How do you render streaming Markdown correctly without visual glitches?
The problem: Markdown syntax is incomplete mid-stream. `**bold**` won't render correctly if the stream stops after `**bo`, and a code block opened with a triple-backtick fence won't render until the closing fence arrives. Naively re-rendering the full document on every token causes constant flickering. A common production solution is to split the accumulated content on paragraph boundaries (\n\n). All complete paragraphs (everything before the last \n\n) can be safely rendered through a Markdown parser, because their syntax is closed. The last, in-progress paragraph is shown as plain text with a cursor. When the stream ends, re-render everything as Markdown. Code blocks need one extra check, because they can contain blank lines. Count the fences before the cut point: if the count is odd, a code block is still open, so move the cut back to the last boundary outside the open fence. For performance, wrap the rendered complete-paragraphs output in useMemo so React doesn't re-render unchanged Markdown nodes on every new token. Only the in-progress plain text re-renders on each token.
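The splitting rule is independent of the UI framework, so here it is as plain Python. split_for_render is a hypothetical name. The React side would pass the first element to a memoized Markdown renderer and show the second as plain text. Counting fences this way is a simplification: it treats every run of three backticks as a fence, including one that appears inline in prose.

```python
FENCE = "`" * 3  # a Markdown code fence (three backticks)


def split_for_render(content: str) -> tuple[str, str]:
    """Split streamed Markdown into (safe_to_render, in_progress).

    Only text before a paragraph break (a blank line) counts as complete,
    and a cut point inside an open code fence is never used.
    """
    cut = content.rfind("\n\n")
    while cut != -1:
        head = content[:cut]
        # An even number of fences means every code block in head is closed.
        if head.count(FENCE) % 2 == 0:
            return head, content[cut + 2:]
        # The cut falls inside an open fence: try the previous boundary.
        cut = content.rfind("\n\n", 0, cut)
    return "", content
```

When the stream ends, the whole content is rendered as Markdown and this split is no longer needed.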
Q6: What infrastructure issues most commonly break streaming in production deployments?
In rough order of frequency: (1) Nginx proxy buffering, by far the most common. Nginx buffers proxied responses by default, so tokens are held until a buffer fills or the response ends. Fix: proxy_buffering off; in the Nginx location block, or an X-Accel-Buffering: no response header. (2) Load balancer idle timeout. AWS ALB closes a connection after 60 seconds with no data by default, so a stream that goes silent for that long (a slow first token on a huge prompt, or a long pause mid-generation) is killed. Fix: set the idle timeout to 300s for the streaming endpoint and send SSE heartbeat comments every 15 seconds. (3) Missing or incorrect CORS headers. If the frontend is on a different origin, the streaming response needs proper CORS headers; CORS middleware in FastAPI applies to all responses, including streaming ones. (4) Response compression middleware buffering the stream. Gzip/brotli middleware can accumulate the response before compressing it. Fix: disable compression for text/event-stream responses. (5) Application-layer logging middleware that reads the full response body to log request/response pairs, which buffers the whole stream. Fix: skip body logging for SSE endpoints.
Q7: How would you architect a streaming chat system that needs to support 10,000 concurrent users?
Key architectural decisions: First, streaming connections are long-lived: a 15-second response means 15 seconds of open connection, so 10K users streaming at once means up to 10K concurrent open connections. This drives the choice of async frameworks (FastAPI, not Flask; Gunicorn with uvicorn workers, not sync workers) and OS limits on open file descriptors (for example, ulimit -n 65535 on Linux). Second, scale horizontally: each backend instance handles N concurrent streams (limited by CPU, memory, and network), and instances sit behind a load balancer. Third, the load balancer must support long-lived connections: ALB or Nginx with properly configured idle timeouts. Fourth, use prompt caching aggressively. With 10K users hitting similar system prompts, cached prompts reduce both cost and TTFT. Fifth, route short queries (intent classification, simple Q&A) to claude-haiku-4-5-20251001 and longer reasoning to claude-opus-4-6. Depending on the traffic split, this model mix can reduce average per-request cost by 5-10x. Sixth, implement queue-based load shedding: when Anthropic API rate limits are hit, queue requests and show an estimated wait time in the UI rather than returning hard errors.
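The sixth point can start as a per-process concurrency gate. The sketch below assumes one asyncio event loop per worker process. StreamGate and its wait estimate are hypothetical. A fleet-wide limit would need shared state, such as a Redis-backed counter, which is out of scope here. Requests beyond the limit are queued rather than rejected, and estimated_wait() gives the UI a number to show.

```python
import asyncio
import contextlib
from typing import AsyncIterator


class StreamGate:
    """Cap concurrent upstream streams in this process; queue the overflow."""

    def __init__(self, limit: int, avg_stream_seconds: float) -> None:
        self._slots = asyncio.Semaphore(limit)
        self._limit = limit
        self._avg_stream_seconds = avg_stream_seconds
        self._waiting = 0

    def estimated_wait(self) -> float:
        """Rough seconds until a new request would get a slot."""
        if not self._slots.locked():
            return 0.0
        # Queued requests drain `limit` at a time, one average stream each.
        return (self._waiting // self._limit + 1) * self._avg_stream_seconds

    @contextlib.asynccontextmanager
    async def slot(self) -> AsyncIterator[None]:
        """Hold one upstream slot for the duration of a stream."""
        self._waiting += 1
        try:
            await self._slots.acquire()
        finally:
            self._waiting -= 1  # no longer queued, whether acquired or cancelled
        try:
            yield
        finally:
            self._slots.release()
```

A handler would call gate.estimated_wait() before entering `async with gate.slot():`. If the estimate is non-zero, it can send an SSE event carrying the estimate so the UI shows the queue position while the request waits.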
