Context Management at Scale
The 3 AM Pager
It is 3:12 AM. Your phone lights up. The on-call alert reads: [P0] Customer Support Bot - 500 errors spiking. You open the runbook, SSH into the app server, and scroll through the logs. Every error is the same:
anthropic.BadRequestError: 400 {"type":"error","error":{"type":"invalid_request_error",
"message":"prompt is too long: 210,432 tokens > 200,000 token limit"}}
Your chatbot has been deployed for two weeks. Customers love it. Some of them love it so much they have been chatting for hours - exploring edge cases, asking follow-ups, iterating on documents together. Nobody thought about what happens when a conversation grows beyond the model's context window. Nobody implemented a limit. Nobody implemented a strategy.
You now have 847 active sessions where users are mid-conversation. If you reset their context, they lose everything. If you do nothing, every new message they send will throw a 500. You have to fix this live, in production, without destroying user sessions.
This is the moment that separates production AI engineers from demo builders. The demo never runs out of context. Production always does - eventually. The difference is whether you planned for it before 3 AM.
Why Context Management Is a First-Class Engineering Problem
Language models are stateless. Every API call is completely independent. The model has no memory of previous turns unless you explicitly include them in the current prompt. This means building a multi-turn conversation requires sending the entire conversation history on every single request.
Early chatbot demos ignored this. The conversation history grew indefinitely. For a 2-minute demo, this works fine - maybe 10 turns, a few thousand tokens. But in production with real users who engage for an hour? You hit the context limit fast. And when you hit it, without a plan, you get P0 alerts at 3 AM.
The naive fix is to truncate: just drop the oldest messages when the conversation gets too long. This works until context-critical information from 20 messages ago gets dropped and the bot starts contradicting itself. For example, the bot gives a wrong answer about an order because it can no longer see the order number the user provided earlier. That is a trust-destroying moment that no apologetic response can fully recover.
The right answer is a family of techniques - each with different tradeoffs in fidelity, latency, and cost - that together form a context management strategy. Every production LLM application needs one before launch, not after the first 3 AM incident.
The Context Window Mental Model
Think of the context window as a fixed-size RAM buffer. The model can only "see" what is in that buffer during a single forward pass. Unlike a database (which can query arbitrary history), the context window is the model's entire working memory for that call.
Every component competes for the same finite space. The system prompt is fixed. The current user message is fixed. RAG documents are variable but controllable. The conversation history is the component that grows without bound - and it is the one that causes 3 AM pages.
Understanding this competition for space is the foundation of all context management strategy. Every technique in this lesson is a way to control the history and RAG components so they do not crowd out everything else.
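The competition for space can be made concrete with a little arithmetic. The sketch below is illustrative only: the function name and every number in the example are assumptions, not measurements from a real deployment.

```python
def remaining_history_budget(
    context_limit: int,
    system_tokens: int,
    user_message_tokens: int,
    rag_tokens: int,
    response_reserve: int,
) -> int:
    """Tokens left for conversation history after the other components are paid for."""
    used = system_tokens + user_message_tokens + rag_tokens + response_reserve
    # Never report a negative budget; zero means history must be dropped entirely.
    return max(context_limit - used, 0)


# Hypothetical numbers for a 200K-token window
budget = remaining_history_budget(
    context_limit=200_000,
    system_tokens=2_000,
    user_message_tokens=500,
    rag_tokens=20_000,
    response_reserve=4_000,
)
print(budget)  # 173500
```

Whatever the real numbers are in your application, history only ever gets what is left over after the other components are accounted for.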
Technique 1: Sliding Window
The simplest strategy. Keep only the most recent N messages. When the conversation exceeds N messages, discard the oldest ones. Simple, predictable, and sufficient for most customer support use cases where recent context matters most.
from anthropic import Anthropic
from typing import Optional

client = Anthropic()


class SlidingWindowConversation:
    """
    Maintains a conversation with a fixed-size sliding window.
    Older messages are discarded when the window fills up.

    Best for: customer support, FAQ bots, short-session assistants.
    Failure mode: amnesia - early-conversation facts are lost.
    """

    def __init__(
        self,
        max_messages: int = 20,
        system_prompt: str = "You are a helpful assistant.",
        model: str = "claude-opus-4-6",
    ):
        self.max_messages = max_messages
        self.system_prompt = system_prompt
        self.model = model
        self.messages: list[dict] = []
        self.total_messages_ever: int = 0  # Track how much was dropped

    def add_user_message(self, content: str) -> None:
        self.messages.append({"role": "user", "content": content})
        self.total_messages_ever += 1
        self._trim_window()

    def add_assistant_message(self, content: str) -> None:
        self.messages.append({"role": "assistant", "content": content})
        # Count assistant turns too; otherwise "dropped" can go negative
        self.total_messages_ever += 1

    def _trim_window(self) -> None:
        """
        Remove oldest messages to stay within the window.
        Always remove in pairs (user + assistant) to maintain role alternation.

        The Anthropic API requires alternating user/assistant roles.
        Removing just one message would create consecutive same-role messages
        and trigger a 400 error.
        """
        while len(self.messages) > self.max_messages:
            # Remove the oldest message
            self.messages.pop(0)
            # Remove its paired assistant response (if present and correct role)
            if self.messages and self.messages[0]["role"] == "assistant":
                self.messages.pop(0)

    def chat(self, user_message: str) -> str:
        self.add_user_message(user_message)
        response = client.messages.create(
            model=self.model,
            max_tokens=1024,
            system=self.system_prompt,
            messages=self.messages,
        )
        assistant_reply = response.content[0].text
        self.add_assistant_message(assistant_reply)
        # Track token usage for monitoring
        self._log_usage(response.usage)
        return assistant_reply

    def _log_usage(self, usage) -> None:
        """Track token usage for capacity planning."""
        dropped_messages = self.total_messages_ever - len(self.messages)
        if dropped_messages > 0:
            print(
                f"[SlidingWindow] Input tokens: {usage.input_tokens} | "
                f"Dropped messages: {dropped_messages} | "
                f"Active window: {len(self.messages)}"
            )

    @property
    def message_count(self) -> int:
        return len(self.messages)

    def get_stats(self) -> dict:
        return {
            "active_messages": len(self.messages),
            "total_ever": self.total_messages_ever,
            "dropped": self.total_messages_ever - len(self.messages),
            "window_utilization": len(self.messages) / self.max_messages,
        }
The sliding window has a critical failure mode: amnesia. If a user says their name in message 1, and you are now at message 25, the model no longer knows their name. For many applications this is acceptable. For others it is a product-breaking bug. Before choosing sliding window, ask: does any information from the beginning of the conversation matter at the end?
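The amnesia failure is easy to reproduce without calling any API. The sketch below uses a standalone helper (`trim_to_window`, a hypothetical name) that applies the same pair-wise trimming to a plain list, and the conversation content is made up for illustration.

```python
def trim_to_window(messages: list[dict], max_messages: int) -> list[dict]:
    """Return the newest messages, dropping the oldest user/assistant pairs first."""
    window = list(messages)
    while len(window) > max_messages:
        window.pop(0)
        if window and window[0]["role"] == "assistant":
            window.pop(0)
    return window


# The user introduces themselves in the very first turn...
history = [
    {"role": "user", "content": "Hi, my name is Dana."},
    {"role": "assistant", "content": "Nice to meet you, Dana."},
]
# ...then has a long exchange about other things.
for i in range(12):
    history.append({"role": "user", "content": f"Question {i}"})
    history.append({"role": "assistant", "content": f"Answer {i}"})
history.append({"role": "user", "content": "What is my name?"})

window = trim_to_window(history, max_messages=20)
name_still_visible = any("Dana" in m["content"] for m in window)
print(len(history), len(window), name_still_visible)  # 27 19 False
```

By the time the user asks for their name, the only messages that contained it have already left the window.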
:::warning Role Alternation Requirement
The Anthropic API requires strictly alternating user/assistant roles. If you slice a message list in the middle of a pair, you will get a "consecutive messages from same role" error. Always remove messages in pairs (a user message plus the assistant message that follows it). This is one of the most common bugs in sliding window implementations.
:::
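A cheap guard is to validate role ordering before every request. The checker below is a minimal sketch: `find_alternation_error` is a hypothetical helper, and it encodes only the two rules discussed in this lesson (the list starts with a user turn, and roles alternate).

```python
from typing import Optional


def find_alternation_error(messages: list[dict]) -> Optional[str]:
    """Return a description of the first role-ordering problem, or None if the list is valid."""
    if not messages:
        return "message list is empty"
    if messages[0]["role"] != "user":
        return "first message must have role 'user'"
    for i in range(1, len(messages)):
        if messages[i]["role"] == messages[i - 1]["role"]:
            return f"messages {i - 1} and {i} both have role '{messages[i]['role']}'"
    return None


good = [
    {"role": "user", "content": "a"},
    {"role": "assistant", "content": "b"},
    {"role": "user", "content": "c"},
]
# Slicing off one message breaks the pairing: the list now starts with an assistant turn.
print(find_alternation_error(good))      # None
print(find_alternation_error(good[1:]))  # first message must have role 'user'
```

Running a check like this in tests for your trimming code catches mid-pair slices before they reach production.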
Technique 2: Summarization Compression
Instead of discarding old messages, compress them into a running summary. When the context grows too large, summarize the oldest N messages into a compact representation, then discard the originals. The summary stays in context; the raw messages are gone. Key facts survive the compression; verbatim text does not.
import anthropic
from typing import Optional

client = anthropic.Anthropic()


def count_tokens_estimate(messages: list[dict]) -> int:
    """
    Estimate token count for a message list.
    Rule of thumb: ~4 characters per token for English text.
    Use actual API-returned token counts for monitoring - estimates for pre-call checks.
    """
    total_chars = sum(len(str(m.get("content", ""))) for m in messages)
    return total_chars // 4
def summarize_messages(
    messages: list[dict],
    existing_summary: Optional[str] = None,
    model: str = "claude-haiku-4-5-20251001",  # Use cheap model for compression
) -> str:
    """
    Ask the model to compress a list of messages into a compact summary.
    Uses Haiku (not Opus) to minimize cost of the compression call itself.

    Args:
        messages: Raw conversation messages to compress
        existing_summary: Any prior summary to incorporate
        model: Model to use for summarization (use a cheap, fast model)

    Returns:
        Compact summary preserving key facts and decisions
    """
    formatted = "\n".join(
        f"{m['role'].upper()}: {m['content']}" for m in messages
    )
    prefix = ""
    if existing_summary:
        prefix = f"Previous conversation summary:\n{existing_summary}\n\nNew turns to incorporate:\n"

    # The prompt body stays at column 0 so no indentation leaks into the prompt text.
    summary_prompt = f"""{prefix}Compress the following conversation excerpt into a compact summary.

Preserve exactly:
- Key facts (names, numbers, account IDs, decisions)
- The user's stated goals and current situation
- Any commitments or action items made
- Context that would affect future responses

Omit:
- Pleasantries and filler
- Repeated information
- Details that don't affect future responses

Conversation:
{formatted}

Write a compact summary (3-5 sentences) starting with "Previously in this conversation:".
"""
    response = client.messages.create(
        model=model,
        max_tokens=400,
        messages=[{"role": "user", "content": summary_prompt}],
    )
    return response.content[0].text
class SummarizingConversation:
    """
    Conversation manager that compresses old messages into summaries
    rather than discarding them. Preserves facts while controlling token usage.

    Best for: long-form assistants, document analysis sessions,
    anywhere early-conversation facts matter at turn 50.
    Tradeoff: compression adds ~500ms latency when triggered.
    """

    def __init__(
        self,
        token_budget: int = 80_000,  # Trigger compression at this threshold
        summarize_oldest_n: int = 10,  # Compress this many messages at once
        system_prompt: str = "You are a helpful assistant.",
        model: str = "claude-opus-4-6",
        compression_model: str = "claude-haiku-4-5-20251001",  # Cheaper model for summaries
    ):
        self.token_budget = token_budget
        self.summarize_oldest_n = summarize_oldest_n
        self.system_prompt = system_prompt
        self.model = model
        self.compression_model = compression_model
        self.messages: list[dict] = []
        self.summary: Optional[str] = None
        self.compression_count: int = 0

    def _build_context(self) -> list[dict]:
        """
        Build the actual messages list sent to the API.
        Prepends the running summary as a synthetic exchange if it exists.
        The synthetic user/assistant pair ensures role alternation is maintained.
        """
        if self.summary:
            summary_msg = {
                "role": "user",
                "content": (
                    f"[CONTEXT FROM EARLIER IN THIS CONVERSATION]\n{self.summary}"
                ),
            }
            summary_ack = {
                "role": "assistant",
                "content": "Understood. I have that context from our earlier conversation.",
            }
            return [summary_msg, summary_ack] + self.messages
        return self.messages

    def _maybe_compress(self) -> None:
        """
        Check token usage and compress if approaching the budget.
        Triggers at 70-80% to leave headroom for the compression call itself.
        Never compress when you're already at 100% - there won't be room
        to run the compression call.
        """
        context = self._build_context()
        estimated_tokens = count_tokens_estimate(context)
        if (
            estimated_tokens > self.token_budget
            and len(self.messages) > self.summarize_oldest_n
        ):
            # Take the oldest N messages for summarization
            to_compress = self.messages[: self.summarize_oldest_n]
            self.messages = self.messages[self.summarize_oldest_n :]
            # Generate a new summary incorporating the previous one
            new_summary = summarize_messages(
                to_compress,
                existing_summary=self.summary,
                model=self.compression_model,
            )
            self.summary = new_summary
            self.compression_count += 1
            print(
                f"[Compression #{self.compression_count}] "
                f"Compressed {self.summarize_oldest_n} messages. "
                f"Summary length: {len(new_summary)} chars."
            )

    def chat(self, user_message: str) -> str:
        self.messages.append({"role": "user", "content": user_message})
        self._maybe_compress()
        response = client.messages.create(
            model=self.model,
            max_tokens=1024,
            system=self.system_prompt,
            messages=self._build_context(),
        )
        reply = response.content[0].text
        self.messages.append({"role": "assistant", "content": reply})
        return reply
:::tip Compression Timing
Compress proactively, not reactively. Trigger compression at 70-80% of the context limit, not at 99%. Compression itself uses tokens and time - you do not want that latency added to a user-facing response that is already slow because the context is enormous. Set your trigger conservatively.
:::
:::tip Use Haiku for Summarization
The compression call itself costs money and adds latency. Use claude-haiku-4-5-20251001 for summarization, not Opus or Sonnet. The quality difference for a "summarize these 10 messages" task is negligible, but the cost difference is significant. Save expensive models for tasks where quality is critical.
:::
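The "compress at 70-80%" rule from the timing tip can be reduced to a one-line check. This is a sketch under assumptions: `should_compress` and the default `trigger_ratio` of 0.75 are illustrative choices, not values prescribed by any API.

```python
def should_compress(
    current_tokens: int,
    context_limit: int,
    trigger_ratio: float = 0.75,
) -> bool:
    """Return True once usage reaches the proactive trigger point."""
    if not 0 < trigger_ratio < 1:
        raise ValueError("trigger_ratio must be between 0 and 1 (exclusive)")
    return current_tokens >= context_limit * trigger_ratio


# With a 200K window and a 75% trigger, compression starts at 150K tokens.
print(should_compress(149_999, 200_000))  # False
print(should_compress(150_000, 200_000))  # True
```

Keeping the ratio as a parameter makes it easy to tune per application once you have real usage data.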
Technique 3: Hierarchical Context
Some applications have a natural two-level structure: a long-lived "project" context and a short-lived "session" context. A coding assistant knows about your entire codebase (project) and the current file you are editing (session). A customer service bot knows the customer's full account history (project) and the current issue they called about (session).
Hierarchical context allocates the token budget deliberately across these levels:
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ContextTier:
    name: str
    max_tokens: int
    priority: int = 0  # lower number = higher priority (dropped last)
    content: str = ""

    @property
    def estimated_tokens(self) -> int:
        # Rough estimate: ~4 characters per token
        return len(self.content) // 4

    @property
    def utilization(self) -> float:
        return self.estimated_tokens / max(self.max_tokens, 1)
class HierarchicalContextManager:
    """
    Manages multiple tiers of context with explicit token budgets.
    Higher-priority tiers are preserved when the total budget is exceeded.

    Use this for applications with natural context layers:
    - Coding assistants: project overview + relevant files + current file
    - Customer service: account history + current session
    - Research assistants: paper corpus + current paper + query context
    """

    TOTAL_CONTEXT_LIMIT = 180_000  # Leave 20K headroom for response

    def __init__(self):
        self.tiers: list[ContextTier] = []

    def add_tier(self, tier: ContextTier) -> None:
        self.tiers.append(tier)
        self.tiers.sort(key=lambda t: t.priority)

    def set_tier_content(self, name: str, content: str) -> None:
        for tier in self.tiers:
            if tier.name == name:
                # Hard truncate to the tier's max_tokens budget
                max_chars = tier.max_tokens * 4
                if len(content) > max_chars:
                    tier.content = (
                        content[:max_chars]
                        + f"\n\n[... truncated at {tier.max_tokens} token budget ...]"
                    )
                else:
                    tier.content = content
                return
        raise ValueError(f"Tier '{name}' not found. Add it first with add_tier().")

    def build_system_prompt(self) -> str:
        """
        Assemble all tiers into a single system prompt, respecting budgets.
        Tiers with no content are skipped.
        """
        sections = []
        for tier in self.tiers:
            if tier.content:
                header = f"## {tier.name.upper().replace('_', ' ')}"
                sections.append(f"{header}\n{tier.content}")
        return "\n\n---\n\n".join(sections)

    def get_budget_report(self) -> dict[str, Any]:
        """Show how each tier is using its allocation."""
        report = {}
        total_used = 0
        for tier in self.tiers:
            used = tier.estimated_tokens
            total_used += used
            report[tier.name] = {
                "max_tokens": tier.max_tokens,
                "estimated_used": used,
                "utilization_pct": round(tier.utilization * 100, 1),
                "within_budget": used <= tier.max_tokens,
            }
        report["_total"] = {
            "estimated_used": total_used,
            "limit": self.TOTAL_CONTEXT_LIMIT,
            "utilization_pct": round(total_used / self.TOTAL_CONTEXT_LIMIT * 100, 1),
        }
        return report

    def is_over_budget(self) -> bool:
        total = sum(tier.estimated_tokens for tier in self.tiers)
        return total > self.TOTAL_CONTEXT_LIMIT
# Example: coding assistant with hierarchical context
def build_coding_assistant_context(
    project_overview: str,
    relevant_files: dict[str, str],  # filename -> content
    current_file_content: str,
    current_file_path: str,
) -> HierarchicalContextManager:
    """Build a hierarchical context for a code assistant."""
    manager = HierarchicalContextManager()
    manager.add_tier(ContextTier(
        name="project_overview",
        max_tokens=5_000,
        priority=0,  # Never drop - too important
    ))
    manager.add_tier(ContextTier(
        name="relevant_files",
        max_tokens=40_000,
        priority=1,
    ))
    manager.add_tier(ContextTier(
        name="current_file",
        max_tokens=20_000,
        priority=2,
    ))
    manager.set_tier_content("project_overview", project_overview)
    manager.set_tier_content(
        "relevant_files",
        "\n\n".join(f"# {name}\n```\n{content}\n```" for name, content in relevant_files.items())
    )
    manager.set_tier_content(
        "current_file",
        f"# {current_file_path}\n```\n{current_file_content}\n```"
    )
    return manager
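The manager above reports when the total is over budget but leaves the decision about what to drop to the caller. One possible policy, sketched below with a simplified stand-in `Tier` type and a hypothetical `fit_tiers` helper, is to admit tiers in priority order and skip any tier that no longer fits.

```python
from dataclasses import dataclass


@dataclass
class Tier:
    name: str
    priority: int  # lower number = more important
    tokens: int    # estimated size of this tier's content


def fit_tiers(tiers: list[Tier], total_limit: int) -> list[Tier]:
    """Keep tiers in priority order, skipping whole tiers that would exceed the limit."""
    kept: list[Tier] = []
    used = 0
    for tier in sorted(tiers, key=lambda t: t.priority):
        if used + tier.tokens <= total_limit:
            kept.append(tier)
            used += tier.tokens
    return kept


tiers = [
    Tier("current_file", priority=2, tokens=20_000),
    Tier("project_overview", priority=0, tokens=5_000),
    Tier("relevant_files", priority=1, tokens=40_000),
]
# With only 30K available, the 40K tier cannot fit, but the smaller tiers still can.
kept_names = [t.name for t in fit_tiers(tiers, total_limit=30_000)]
print(kept_names)  # ['project_overview', 'current_file']
```

Dropping whole tiers is the bluntest option; truncating or summarizing the tier that does not fit is a reasonable alternative when partial content is still useful.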
Technique 4: Token Budget Enforcement
Track tokens per request using the API's usage field, not estimates. Build a token budget enforcer that prevents any single component from consuming too much of the context window.
import anthropic
from anthropic.types import Usage
from typing import Optional

client = anthropic.Anthropic()


class TokenBudgetEnforcer:
    """
    Tracks real token usage from API responses and enforces per-component budgets.
    Uses actual API-returned counts (not estimates) for monitoring.

    The budgets ensure no single component can crowd out others.
    When a component exceeds its budget, it is hard-truncated before the call.
    """

    def __init__(
        self,
        total_budget: int = 150_000,
        system_budget: int = 5_000,
        history_budget: int = 80_000,
        rag_budget: int = 40_000,
        response_budget: int = 8_096,
    ):
        self.budgets = {
            "system": system_budget,
            "history": history_budget,
            "rag": rag_budget,
            "response": response_budget,
        }
        self.total_budget = total_budget
        self.usage_history: list[Usage] = []
        self._component_overflows: dict[str, int] = {}

    def record_usage(self, usage: Usage) -> None:
        """Record API-returned usage. Use this for real monitoring."""
        self.usage_history.append(usage)

    def get_usage_stats(self) -> dict:
        """Compute statistics from actual API-returned token counts."""
        if not self.usage_history:
            return {}
        input_counts = [u.input_tokens for u in self.usage_history]
        output_counts = [u.output_tokens for u in self.usage_history]
        return {
            "samples": len(self.usage_history),
            "avg_input_tokens": sum(input_counts) / len(input_counts),
            "p95_input_tokens": sorted(input_counts)[int(len(input_counts) * 0.95)],
            "avg_output_tokens": sum(output_counts) / len(output_counts),
            "max_input_tokens": max(input_counts),
            "total_input_tokens": sum(input_counts),
            "total_output_tokens": sum(output_counts),
        }
    def truncate_to_budget(self, text: str, budget_name: str) -> tuple[str, bool]:
        """
        Truncate text to fit within a named budget.
        Returns (truncated_text, was_truncated).
        """
        max_tokens = self.budgets[budget_name]
        max_chars = max_tokens * 4  # rough estimate
        if len(text) > max_chars:
            self._component_overflows[budget_name] = (
                self._component_overflows.get(budget_name, 0) + 1
            )
            truncated = text[:max_chars]
            return truncated + f"\n\n[... truncated to fit {budget_name} budget ({max_tokens} tokens) ...]", True
        return text, False

    def truncate_messages_to_budget(
        self,
        messages: list[dict],
        budget_name: str = "history",
    ) -> list[dict]:
        """
        Truncate a message list from the oldest end to fit within token budget.
        Preserves message pairs (user/assistant) for API compatibility.
        Always keeps the most recent messages (they have the most context relevance).

        Note: this mutates the list it is given; pass a copy to keep the original.
        """
        max_tokens = self.budgets[budget_name]
        max_chars = max_tokens * 4
        total_chars = sum(len(str(m.get("content", ""))) for m in messages)
        # Remove oldest pairs until within budget
        while total_chars > max_chars and len(messages) >= 2:
            # Always remove from oldest end, in pairs
            removed = messages.pop(0)
            total_chars -= len(str(removed.get("content", "")))
            if messages and messages[0]["role"] == "assistant":
                removed = messages.pop(0)
                total_chars -= len(str(removed.get("content", "")))
        return messages

    def enforce_all_budgets(
        self,
        system: str,
        messages: list[dict],
        rag_context: Optional[str] = None,
    ) -> tuple[str, list[dict], Optional[str]]:
        """
        Apply each component's budget independently and return the trimmed components.
        History is trimmed oldest-first; system and RAG text are hard-truncated.
        total_budget is not checked here; the per-component budgets must sum to less than it.
        """
        system_out, _ = self.truncate_to_budget(system, "system")
        messages_out = self.truncate_messages_to_budget(list(messages), "history")
        rag_out = None
        if rag_context:
            rag_out, _ = self.truncate_to_budget(rag_context, "rag")
        return system_out, messages_out, rag_out
Technique 5: KV Cache Awareness
Modern LLM APIs (including Anthropic's) support prompt caching. When you send the same prefix repeatedly and mark it as cacheable, the provider reuses the key-value cache it computed for that prefix rather than recomputing it. On long prompts this can reduce latency by up to roughly 85% and cut costs by up to 90% on the cached portion.
import anthropic

client = anthropic.Anthropic()


# WRONG: Cache-unfriendly pattern
# The system prompt changes every request → no cache hits
def chat_no_cache(user_message: str, document: str, conversation: list[dict]) -> str:
    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=1024,
        system=f"You are a helpful assistant. Document: {document}",  # Changes position!
        messages=conversation + [{"role": "user", "content": user_message}],
    )
    return response.content[0].text


# RIGHT: Cache-friendly pattern
# Stable prefix → high cache hit rate
SYSTEM_PROMPT = "You are a helpful assistant that answers questions about documents."


def load_document_into_context(document_text: str) -> list[dict]:
    """
    Create a synthetic exchange that 'loads' the document into the
    conversation at a stable position - maximizing cache hits.

    The document goes into the conversation body (not the system prompt)
    so we can use cache_control on it explicitly.
    """
    return [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "Please read and remember this document for our conversation:",
                },
                {
                    "type": "text",
                    "text": document_text,
                    "cache_control": {"type": "ephemeral"},  # Mark for caching
                },
            ],
        },
        {
            "role": "assistant",
            "content": "I have read and will reference this document in my responses.",
        },
    ]
def chat_with_cache(
    user_message: str,
    document_prefix: list[dict],
    conversation: list[dict],
) -> dict:
    """
    Chat with a document using prompt caching.
    The document is cached after the first call - subsequent calls that hit
    the cache pay only about 10% of the document token cost.
    """
    messages = document_prefix + conversation + [
        {"role": "user", "content": user_message}
    ]
    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=1024,
        system=SYSTEM_PROMPT,
        messages=messages,
    )
    # Monitor cache performance
    usage = response.usage
    cache_read = getattr(usage, "cache_read_input_tokens", 0) or 0
    cache_created = getattr(usage, "cache_creation_input_tokens", 0) or 0
    # usage.input_tokens counts only the uncached part of the prompt, so the
    # full prompt size is the sum of all three fields.
    total_input = usage.input_tokens + cache_read + cache_created
    cache_hit_rate = cache_read / max(total_input, 1)
    return {
        "response": response.content[0].text,
        "cache_hit_rate": cache_hit_rate,
        "cache_read_tokens": cache_read,
        "cache_created_tokens": cache_created,
        "total_input_tokens": total_input,
        "output_tokens": usage.output_tokens,
    }
:::note KV Cache Stability
The cache is keyed on the exact bytes of the prefix. Even a single character change busts the cache. Your system prompt must be 100% stable across requests. No dynamic timestamps. No per-request IDs. No user-specific interpolation. Put all dynamic content in the messages array, after the stable cached prefix.
:::
| Pattern | Cache Behavior | Impact |
|---|---|---|
| Static system prompt | Cache hit every call | 90% cost reduction |
| System prompt with timestamp | Cache miss every call | No benefit |
| Document in messages body with cache_control | Cache hit after first call | 90% on document tokens |
| RAG documents (different each call) | Cache miss each call | No benefit |
| Conversation history (grows) | Partial hit (prefix matches) | Moderate benefit |
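To see where the savings in the table come from, here is a back-of-the-envelope cost model. It is a sketch under stated assumptions: cache writes are billed at 1.25x and cache reads at 0.1x the base input price, every call after the first hits the cache, and the price of 5.0 per million tokens is a made-up number. Check current pricing before relying on any of these.

```python
def input_cost(
    prefix_tokens: int,
    fresh_tokens_per_call: int,
    calls: int,
    price_per_mtok: float,
    cached: bool,
    write_multiplier: float = 1.25,  # assumed premium for writing the cache
    read_multiplier: float = 0.10,   # assumed discount for reading the cache
) -> float:
    """Total input cost for `calls` requests that share the same prefix."""
    per_token = price_per_mtok / 1_000_000
    fresh = fresh_tokens_per_call * calls * per_token
    if not cached:
        return prefix_tokens * calls * per_token + fresh
    first_call = prefix_tokens * write_multiplier * per_token
    later_calls = prefix_tokens * read_multiplier * per_token * (calls - 1)
    return first_call + later_calls + fresh


# Hypothetical workload: a 50K-token document, 20 questions, 500 new tokens each.
uncached = input_cost(50_000, 500, 20, price_per_mtok=5.0, cached=False)
cached = input_cost(50_000, 500, 20, price_per_mtok=5.0, cached=True)
savings = 1 - cached / uncached
print(f"uncached={uncached:.4f} cached={cached:.4f} savings={savings:.0%}")
```

Under these assumptions the overall saving lands somewhat below 90%, because the first call pays the write premium and the uncached tokens are billed at the full rate.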
Technique 6: Multi-Turn State Storage
For long-lived applications (days or weeks), you cannot keep conversation history in memory. It must be persisted to a database and loaded on demand. In-memory storage means losing all context on process restart, pod rotation, or load balancer re-routing.
import json
import uuid
from datetime import datetime
from typing import Optional

import asyncpg  # PostgreSQL async driver


class ConversationStore:
    """
    Persists conversation history to PostgreSQL.
    Designed for multi-user, multi-session production deployments.

    Key design decisions:
    - Token counts stored with each message (use API-returned values)
    - Load with token budget to avoid fetching too much
    - Index on (session_id, created_at DESC) for fast recent-message queries
    - Session stats support monitoring and compression decisions
    """

    CREATE_TABLE_SQL = """
        CREATE TABLE IF NOT EXISTS conversation_messages (
            id BIGSERIAL PRIMARY KEY,
            session_id UUID NOT NULL,
            user_id TEXT NOT NULL,
            role TEXT NOT NULL CHECK (role IN ('user', 'assistant')),
            content TEXT NOT NULL,
            token_count INTEGER,
            created_at TIMESTAMPTZ DEFAULT NOW()
        );
        CREATE INDEX IF NOT EXISTS idx_session_created
            ON conversation_messages (session_id, created_at DESC);
        CREATE TABLE IF NOT EXISTS conversation_summaries (
            session_id UUID PRIMARY KEY,
            summary TEXT NOT NULL,
            messages_compressed INTEGER NOT NULL,
            created_at TIMESTAMPTZ DEFAULT NOW(),
            updated_at TIMESTAMPTZ DEFAULT NOW()
        );
    """

    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def create_session(self, user_id: str) -> str:
        """Create a new conversation session and return its ID."""
        session_id = str(uuid.uuid4())
        # Could store session metadata here (user_id, created_at, etc.)
        return session_id
    async def append_message(
        self,
        session_id: str,
        user_id: str,
        role: str,
        content: str,
        token_count: Optional[int] = None,
    ) -> None:
        """Append a message to the session. Called after each API response."""
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO conversation_messages
                    (session_id, user_id, role, content, token_count)
                VALUES ($1, $2, $3, $4, $5)
                """,
                uuid.UUID(session_id), user_id, role, content, token_count,
            )

    async def load_recent_messages(
        self,
        session_id: str,
        limit: int = 40,
        max_tokens: int = 80_000,
    ) -> list[dict]:
        """
        Load the most recent messages, respecting a token budget.
        Returns messages in chronological order (oldest first in the result).

        Fetches up to `limit` rows, then keeps the newest ones that fit the
        token budget. The result always starts with a user message.
        """
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT role, content, token_count, created_at
                FROM conversation_messages
                WHERE session_id = $1
                ORDER BY created_at DESC, id DESC
                LIMIT $2
                """,
                uuid.UUID(session_id), limit,
            )
        # Build message list within token budget, working backwards from most recent
        messages = []
        total_tokens = 0
        for row in rows:  # Already DESC order (newest first)
            estimated = row["token_count"] or len(row["content"]) // 4
            if total_tokens + estimated > max_tokens:
                break
            messages.append({"role": row["role"], "content": row["content"]})
            total_tokens += estimated
        # Reverse to chronological order for the API
        messages.reverse()
        # The cut can land mid-pair; drop a leading assistant message so the
        # list starts with a user turn.
        while messages and messages[0]["role"] != "user":
            messages.pop(0)
        return messages
    async def load_session_summary(self, session_id: str) -> Optional[str]:
        """Load the compression summary for a session, if it exists."""
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT summary FROM conversation_summaries WHERE session_id = $1",
                uuid.UUID(session_id),
            )
        return row["summary"] if row else None

    async def save_session_summary(
        self,
        session_id: str,
        summary: str,
        messages_compressed: int,
    ) -> None:
        """Upsert the compression summary for a session."""
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO conversation_summaries (session_id, summary, messages_compressed)
                VALUES ($1, $2, $3)
                ON CONFLICT (session_id) DO UPDATE
                SET summary = $2,
                    messages_compressed = $3,
                    updated_at = NOW()
                """,
                uuid.UUID(session_id), summary, messages_compressed,
            )

    async def get_session_stats(self, session_id: str) -> dict:
        """Get stats for monitoring and capacity planning."""
        async with self.pool.acquire() as conn:
            result = await conn.fetchrow(
                """
                SELECT
                    COUNT(*) as message_count,
                    COALESCE(SUM(token_count), 0) as total_tokens,
                    MIN(created_at) as started_at,
                    MAX(created_at) as last_message_at
                FROM conversation_messages
                WHERE session_id = $1
                """,
                uuid.UUID(session_id),
            )
        return dict(result)
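The budget-filling step is easier to test when it is pulled out of the database code. The sketch below is a hypothetical pure-Python extraction (`select_within_budget` is not part of the store above) that runs on plain dicts shaped like the query rows, newest first.

```python
def select_within_budget(rows_newest_first: list[dict], max_tokens: int) -> list[dict]:
    """Pick the newest rows that fit the budget and return them oldest first."""
    selected: list[dict] = []
    total = 0
    for row in rows_newest_first:
        # Fall back to a ~4 chars/token estimate when no stored count exists.
        cost = row["token_count"] or len(row["content"]) // 4
        if total + cost > max_tokens:
            break
        selected.append({"role": row["role"], "content": row["content"]})
        total += cost
    selected.reverse()
    # If the cut landed mid-pair, drop the orphaned assistant message.
    while selected and selected[0]["role"] != "user":
        selected.pop(0)
    return selected


rows = [  # newest first, as an ORDER BY ... DESC query would return them
    {"role": "user", "content": "And the refund?", "token_count": 5},
    {"role": "assistant", "content": "Order 1234 shipped Monday.", "token_count": 8},
    {"role": "user", "content": "Where is order 1234?", "token_count": 7},
    {"role": "assistant", "content": "Hello! How can I help?", "token_count": None},
]
print([m["content"] for m in select_within_budget(rows, max_tokens=20)])
print([m["content"] for m in select_within_budget(rows, max_tokens=15)])
```

With a budget of 15 the cut falls between a user message and its reply, so only the final user message survives, which keeps the request valid at the cost of a very thin context.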
Context Overflow Handling: Graceful Degradation
When context overflow is detected (before it causes an API error), you need a graceful degradation strategy. The goal is to always return a response - even a degraded one - rather than failing with a 500.
import anthropic
from anthropic import BadRequestError
from typing import Any

client = anthropic.Anthropic()


class ContextOverflowHandler:
    """
    Wraps an LLM call with automatic context overflow recovery.
    Tries progressively more aggressive compression strategies.

    This is your last line of defense - before this should come proper
    context management that prevents overflow proactively.

    Strategy order (least to most aggressive):
    1. Trim RAG context (re-retrievable)
    2. Remove 50% of history (keep recent)
    3. Remove 75% of history (keep only recent)
    4. Emergency: keep only current message
    """

    def __init__(self, model: str = "claude-opus-4-6"):
        self.model = model
        self.overflow_count = 0

    @staticmethod
    def _start_with_user(messages: list[dict]) -> list[dict]:
        """Drop leading assistant messages so the list starts with a user turn."""
        while len(messages) > 1 and messages[0]["role"] != "user":
            messages = messages[1:]
        return messages

    def _create(self, kwargs: dict) -> Any:
        """Send the request, stripping internal tags such as _is_rag_context."""
        clean = [
            {k: v for k, v in m.items() if not k.startswith("_")}
            for m in kwargs.get("messages", [])
        ]
        return client.messages.create(**{**kwargs, "messages": clean})

    def _strategy_trim_rag(self, kwargs: dict) -> dict:
        """Remove RAG context first - it can be re-retrieved."""
        messages = list(kwargs.get("messages", []))
        # Remove any messages tagged as RAG context injections
        filtered = [m for m in messages if not m.get("_is_rag_context")]
        if len(filtered) < len(messages):
            print(f"[Overflow] Strategy 1: Removed {len(messages) - len(filtered)} RAG messages")
        return {**kwargs, "messages": filtered}

    def _strategy_trim_history_50pct(self, kwargs: dict) -> dict:
        """Remove the oldest 50% of conversation history."""
        messages = list(kwargs.get("messages", []))
        trimmed = self._start_with_user(messages[len(messages) // 2:])
        print(f"[Overflow] Strategy 2: Kept last {len(trimmed)}/{len(messages)} messages")
        return {**kwargs, "messages": trimmed}

    def _strategy_trim_history_75pct(self, kwargs: dict) -> dict:
        """Keep only the last 25% of conversation history."""
        messages = list(kwargs.get("messages", []))
        keep = max(2, len(messages) // 4)
        trimmed = self._start_with_user(messages[-keep:])
        print(f"[Overflow] Strategy 3: Emergency trim to last {len(trimmed)} messages")
        return {**kwargs, "messages": trimmed}

    def _strategy_emergency_reset(self, kwargs: dict) -> dict:
        """Last resort: keep only the current user message."""
        messages = list(kwargs.get("messages", []))
        last_user = next(
            (m for m in reversed(messages) if m["role"] == "user"), None
        )
        if last_user:
            print("[Overflow] Strategy 4: EMERGENCY - keeping only current message")
            return {**kwargs, "messages": [last_user]}
        return kwargs

    def call_with_overflow_protection(self, **kwargs) -> Any:
        """
        Attempt the LLM call. On context overflow (400), apply
        increasingly aggressive trimming strategies and retry.

        This should be used as a safety net. If you're hitting overflow
        regularly, fix your context management strategy upstream.
        """
        strategies = [
            self._strategy_trim_rag,
            self._strategy_trim_history_50pct,
            self._strategy_trim_history_75pct,
            self._strategy_emergency_reset,
        ]
        # Each pass tries the call first; a strategy is applied only after a failure
        for strategy in strategies:
            try:
                return self._create(kwargs)
            except BadRequestError as e:
                # Match only the overflow message; other 400s are real bugs
                if "too long" not in str(e).lower():
                    raise  # Re-raise non-overflow errors
                self.overflow_count += 1
                print(f"[Overflow #{self.overflow_count}] Applying strategy: {strategy.__name__}")
                kwargs = strategy(kwargs)
        # Final attempt after all strategies applied
        return self._create(kwargs)
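The retry ladder can be exercised offline with a stub in place of the real client. Everything in this sketch is a stand-in: `PromptTooLong` and `fake_llm_call` are hypothetical, and the stub measures size in characters rather than tokens.

```python
class PromptTooLong(Exception):
    """Stand-in for the provider's context-overflow error (hypothetical)."""


def fake_llm_call(messages: list[dict], limit_chars: int = 60) -> str:
    """Stub model call that rejects prompts over a tiny character limit."""
    size = sum(len(m["content"]) for m in messages)
    if size > limit_chars:
        raise PromptTooLong(f"prompt is too long: {size} > {limit_chars}")
    return f"ok ({len(messages)} messages)"


def keep_newest_fraction(messages: list[dict], fraction: float) -> list[dict]:
    """Keep the newest share of the history, starting on a user turn."""
    keep = max(1, int(len(messages) * fraction))
    trimmed = messages[-keep:]
    while len(trimmed) > 1 and trimmed[0]["role"] != "user":
        trimmed = trimmed[1:]
    return trimmed


def call_with_ladder(messages: list[dict], call=fake_llm_call) -> tuple[str, int]:
    """Try the full history, then 50%, then 25%, then only the last user message."""
    for fraction in (1.0, 0.5, 0.25):
        candidate = keep_newest_fraction(messages, fraction)
        try:
            return call(candidate), len(candidate)
        except PromptTooLong:
            continue
    last_user = next(m for m in reversed(messages) if m["role"] == "user")
    return call([last_user]), 1


# Nine 10-character messages (90 chars) against a 60-character limit.
history = [
    {"role": "user" if i % 2 == 0 else "assistant", "content": "x" * 10}
    for i in range(9)
]
result = call_with_ladder(history)
print(result)  # ('ok (3 messages)', 3)
```

The full history fails, the 50% step keeps four messages, and the leading assistant turn is dropped so the request goes out with three.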
Architecture: The Full Context Pipeline
Production Engineering Notes
Measure before optimizing. Add token counting to every LLM response. Log the input token count, output token count, and cache hit/miss status. You cannot manage what you do not measure. Build a dashboard showing context window utilization per session over time before you build any management strategy.
Separate RAG context from conversation history. RAG documents should be injected fresh each turn (not stored in conversation history) because the relevant documents change. Store only the user/assistant turns in the conversation store.
The system prompt is sacred. Never put dynamic content in your system prompt. Any change to the system prompt invalidates the cached prefix from that point on, so every request after the change pays full price. Put dynamic content in the messages array, after the stable prefix, instead.
Token counting libraries lie. Different tokenizers produce different counts. The only accurate count is the one the API returns in the usage field of the response. Use API-returned counts for monitoring; use estimates only for pre-call budget checks.
Compression adds latency. When you trigger summarization mid-conversation, you are adding a full extra LLM call (500-1000ms) before the actual response. Do this asynchronously in the background, not synchronously in the request path, whenever possible.
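To make the "compress in the background" note concrete, here is a minimal asyncio sketch. The `Session` class, the `summarize` coroutine, and the `threshold`/`keep` parameters are hypothetical stand-ins, not part of any library; in a real system `summarize` would be the LLM summarization call.

```python
import asyncio


async def summarize(messages: list) -> str:
    """Hypothetical stand-in for the LLM summarization call."""
    await asyncio.sleep(0)  # pretend to wait on the network
    return f"summary of {len(messages)} messages"


class Session:
    def __init__(self) -> None:
        self.messages: list = []
        self.summary: str = ""
        self._task = None  # background compression task, if any

    def maybe_compress(self, threshold: int = 20, keep: int = 10) -> None:
        """Schedule compression without awaiting it in the request path."""
        already_running = self._task is not None and not self._task.done()
        if len(self.messages) <= threshold or already_running:
            return
        old = self.messages[:-keep]  # snapshot of the messages to summarize
        self._task = asyncio.create_task(self._compress(old))

    async def _compress(self, old: list) -> None:
        self.summary = await summarize(old)
        # Drop only what was summarized; newer messages may have arrived.
        self.messages = self.messages[len(old):]
```

The request handler would call `maybe_compress()` after sending the response, so the user never waits on the summary call. The snapshot-then-slice step is one way to avoid losing messages that arrive while the summary is being generated.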
:::danger Never Silently Drop Context
If you truncate context without telling the user, the model may confidently give wrong answers (it no longer has the facts). Always inject a note: "Note: Earlier parts of this conversation were compressed for length." This sets the right expectation and prevents trust-destroying contradictions.
:::
:::warning Rolling Windows Break Role Alternation
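The Anthropic API is built around alternating user/assistant turns, starting with a user message. If you slice a message list in the middle of a pair, the window can start with an orphaned assistant reply or contain back-to-back messages from the same role, and the request may be rejected with a role-ordering error. Always remove messages in pairs (user + following assistant).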
:::
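A minimal sketch of pair-aware trimming, assuming messages are plain dicts with a `role` key. The helper name `trim_oldest_pairs` is hypothetical.

```python
def trim_oldest_pairs(messages: list[dict], max_messages: int) -> list[dict]:
    """Drop the oldest turns until at most max_messages remain,
    then make sure the window opens with a user message."""
    trimmed = list(messages)
    while len(trimmed) > max_messages:
        # Remove a user message together with the assistant reply after it.
        has_reply = len(trimmed) >= 2 and trimmed[1]["role"] == "assistant"
        trimmed = trimmed[2:] if has_reply else trimmed[1:]
    # Never start the window with an orphaned assistant reply.
    while trimmed and trimmed[0]["role"] != "user":
        trimmed = trimmed[1:]
    return trimmed
```

For a history of `user, assistant, user, assistant, user` and `max_messages=3`, this drops the first pair and returns the last three messages, still starting on a user turn.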
:::danger Infinite Context Growth in Long-Running Agents
Never allow an agent or automation to run without a context limit. A loop that calls an LLM 100 times, accumulating history each time, will eventually fail with a 400 error. Set an explicit max_iterations or max_tokens_budget at agent initialization.
:::
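One way to enforce such limits is a small budget object that the agent loop checks before every call. This is a sketch under assumptions: `AgentBudget`, `BudgetExceeded`, and `run_agent` are hypothetical names, and `step` stands in for whatever function performs one LLM call and reports its token usage.

```python
from dataclasses import dataclass
from typing import Callable, Tuple


class BudgetExceeded(RuntimeError):
    """Raised when the agent hits its iteration or token cap."""


@dataclass
class AgentBudget:
    max_iterations: int = 20
    max_tokens_budget: int = 150_000
    iterations: int = 0
    tokens_used: int = 0

    def check(self) -> None:
        """Call before each LLM request; raises once a cap is reached."""
        if self.iterations >= self.max_iterations:
            raise BudgetExceeded(f"iteration cap of {self.max_iterations} reached")
        if self.tokens_used >= self.max_tokens_budget:
            raise BudgetExceeded(f"token cap of {self.max_tokens_budget} reached")

    def record(self, input_tokens: int, output_tokens: int) -> None:
        """Call after each LLM request with the usage the API reported."""
        self.iterations += 1
        self.tokens_used += input_tokens + output_tokens


def run_agent(step: Callable[[], Tuple[bool, int, int]], budget: AgentBudget) -> AgentBudget:
    """step() is a hypothetical callable returning (done, input_tokens, output_tokens)."""
    while True:
        budget.check()
        done, input_tokens, output_tokens = step()
        budget.record(input_tokens, output_tokens)
        if done:
            return budget
```

Checking before the call rather than after means a runaway loop fails with a clear `BudgetExceeded` instead of a 400 from the API.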
:::danger Caching Dynamic Content
Putting timestamps, request IDs, or user-specific data in the system prompt is one of the most common performance mistakes. It destroys cache hits entirely. Put all dynamic content in the messages array, after the stable cached prefix.
:::
Technique Comparison
| Technique | Fidelity | Latency Impact | Cost Impact | Best For |
|---|---|---|---|---|
| Sliding Window | Low (amnesia) | None | None | Short sessions, FAQ bots |
| Summarization | Medium | +500-1000ms when triggered | +cost of summary call | Long sessions with important early facts |
| Hierarchical | High | None | None | Multi-tier structured applications |
| Token Budget Enforcement | High | None | None | All applications (always use this) |
| KV Cache | High | 60-80% lower TTFT | Up to 90% lower on cached tokens | Any repeated system prompt or document |
| DB Storage | High | +DB latency (~5ms) | +storage cost | Multi-session, stateless servers |
Interview Q&A
Q1: A customer says your chatbot "forgot" what they told it 10 messages ago. What went wrong and how do you fix it?
This is a sliding window amnesia bug. The simplest fix is to increase the window size, but the more robust fix is to use summarization compression. Instead of discarding old messages, compress them into a running summary using the LLM itself. The summary is injected as a "previously in this conversation" prefix, so critical facts (the user's account number, their stated goal, previous decisions) are preserved even as the raw message list is compressed. For critical identifiers (like an order number the user provided), consider also extracting and storing them in structured form (not just the conversation history) so they can be reliably injected back on each turn regardless of compression. Use a cheap model like Haiku for compression to keep costs low.
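As a rough illustration of re-injecting a summary and structured facts on each turn, the sketch below prepends both to the first message of the recent window. The function name `inject_memory` and the prefix layout are assumptions made for the example.

```python
def inject_memory(pinned_facts: dict, summary: str, recent: list[dict]) -> list[dict]:
    """Prefix the first user message of the recent window with the
    running summary and the structured facts extracted earlier."""
    if not recent or recent[0]["role"] != "user":
        raise ValueError("recent window must start with a user message")
    facts = "\n".join(f"- {key}: {value}" for key, value in pinned_facts.items())
    prefix = (
        "Note: Earlier parts of this conversation were compressed for length.\n"
        f"Known facts:\n{facts}\n"
        f"Summary so far:\n{summary}\n\n"
    )
    # Build a new message so the stored history is not mutated.
    first = {**recent[0], "content": prefix + recent[0]["content"]}
    return [first, *recent[1:]]
```

Putting the prefix inside an existing user message, rather than adding a separate message, keeps the role sequence unchanged and leaves the system prompt static.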
Q2: How does KV caching work in LLM APIs and how do you engineer for maximum cache hits?
KV caching works by storing the computed attention key-value pairs for a prompt prefix. When the same prefix is sent again, the model reuses those cached KV states instead of recomputing them, reducing both latency (by 50-80%) and cost (by up to 90% on the cached portion). To maximize cache hits: (1) keep the system prompt 100% static - no timestamps, no per-user interpolation; (2) place stable content (long documents, large tool schemas) at the beginning of the context, before the conversation history; (3) set Anthropic's cache_control field on the content blocks you want cached (it is a field in the request body, not an HTTP header); (4) monitor cache hit rates via usage.cache_read_input_tokens in every response; (5) never embed dynamic values (user IDs, request timestamps) anywhere in the cached prefix.
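The sketch below shows one possible request layout and a hit-rate calculation. It only builds the request dict and does not call the API. The helper names and the model placeholder are hypothetical, and `cache_hit_rate` assumes the usage object has been converted to a plain dict and that `input_tokens` counts only the uncached portion of the prompt.

```python
def build_cached_request(static_system_prompt: str, history: list[dict],
                         user_message: str, model: str = "MODEL_NAME_PLACEHOLDER") -> dict:
    """Stable prefix first (marked for caching), dynamic content last."""
    return {
        "model": model,  # placeholder; substitute a real model name
        "max_tokens": 1024,
        "system": [
            {
                "type": "text",
                "text": static_system_prompt,  # must be byte-identical across calls
                "cache_control": {"type": "ephemeral"},
            }
        ],
        "messages": [*history, {"role": "user", "content": user_message}],
    }


def cache_hit_rate(usage: dict) -> float:
    """Fraction of prompt tokens that were served from the cache."""
    read = usage.get("cache_read_input_tokens") or 0
    written = usage.get("cache_creation_input_tokens") or 0
    uncached = usage.get("input_tokens") or 0
    total = read + written + uncached
    return read / total if total else 0.0
```

A hit rate that stays near zero on a long static system prompt usually means something dynamic has leaked into the prefix.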
Q3: How do you handle a production system where you need to manage context for 10,000 concurrent conversations?
You need three things: persistent storage, token-aware loading, and compression pipelines. Store all conversation messages in a database (PostgreSQL with a session_id index). On each request, load only the last N messages that fit within a token budget - not the entire history. Run a background compression worker that monitors sessions approaching the context limit and summarizes their oldest messages asynchronously. Never keep conversations in application memory across requests - stateless app servers load from the DB each time. Use Redis to cache recently-loaded conversation windows (10-minute TTL) to avoid hitting the database on rapid multi-turn exchanges. Track context window utilization as a metric and alert when average sessions exceed 70% utilization.
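Token-aware loading can be sketched as a backwards walk over the stored messages. The 4-characters-per-token estimate is an assumption for pre-call budgeting only, and both function names are hypothetical.

```python
def estimate_tokens(text: str) -> int:
    """Rough estimate (about 4 characters per token), not a real tokenizer."""
    return max(1, len(text) // 4)


def load_recent_within_budget(messages: list[dict], token_budget: int) -> list[dict]:
    """Walk from newest to oldest, keep messages while they fit the budget,
    and return them in chronological order."""
    kept: list[dict] = []
    used = 0
    for message in reversed(messages):
        cost = estimate_tokens(message["content"])
        if used + cost > token_budget:
            break  # everything older is left in the database
        kept.append(message)
        used += cost
    kept.reverse()
    return kept
```

In a real deployment the same walk would typically run against a database cursor ordered by creation time descending, so that old rows are never read at all.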
Q4: What is the difference between context length and context window? How do you manage each?
Context length refers to the number of tokens in a specific request. Context window refers to the maximum number of tokens the model can process in a single call. Context window is fixed per model (e.g., 200K for Claude). Context length varies per request and is what you control. You manage context length through the techniques in this lesson: sliding window, summarization, hierarchical allocation, and token budget enforcement. You monitor context window utilization by tracking input_tokens / max_context_window as a ratio and alerting when it exceeds 80%. A context length near the context window limit is a signal that your management strategy is failing for that session.
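The utilization ratio is simple enough to show directly. The defaults below (a 200K window and an 80% threshold) follow the figures in the text; the function names are hypothetical.

```python
def context_utilization(input_tokens: int, max_context_window: int = 200_000) -> float:
    """Context length of one request as a fraction of the model's context window."""
    return input_tokens / max_context_window


def should_alert(input_tokens: int, max_context_window: int = 200_000,
                 threshold: float = 0.80) -> bool:
    """True when a single request has used more than `threshold` of the window."""
    return context_utilization(input_tokens, max_context_window) > threshold
```

Feed `input_tokens` from the API response's usage field rather than from a local estimate, so the metric reflects what the model actually received.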
Q5: An LLM-powered document editor lets users work on 100K-word documents. The document exceeds the context window. How do you architect this?
This requires a chunked and hierarchical approach. The full document is stored in your database. For each user interaction, you retrieve only the relevant sections: (1) the section currently being edited (full content, ~10K tokens); (2) adjacent sections for context (summaries, ~5K tokens); (3) a document-level summary covering structure and key points (~2K tokens). When the user makes edits, you update the database and regenerate the affected section summaries. For queries about the entire document ("find all mentions of X"), you use vector search over pre-chunked and embedded sections rather than putting the whole document in context. Never put more in context than the task actually needs - this is the golden rule.
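A minimal sketch of assembling that three-part context, assuming sections and their summaries have already been loaded from storage. The `Section` dataclass and `build_editor_context` are hypothetical names for illustration.

```python
from dataclasses import dataclass


@dataclass
class Section:
    title: str
    text: str      # full content, used only for the section being edited
    summary: str   # short summary, used when the section is merely adjacent


def build_editor_context(doc_summary: str, sections: list[Section], active_index: int) -> str:
    """Document summary, then neighbor summaries, then the active section in full."""
    parts = [f"Document summary:\n{doc_summary}"]
    for i in (active_index - 1, active_index + 1):
        if 0 <= i < len(sections):
            neighbor = sections[i]
            parts.append(f"Adjacent section summary ({neighbor.title}):\n{neighbor.summary}")
    active = sections[active_index]
    parts.append(f"Section being edited ({active.title}):\n{active.text}")
    return "\n\n".join(parts)
```

The full text of non-active sections never enters the prompt, which is what keeps the context size roughly constant no matter how long the document grows.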
Q6: How do you implement context budget allocation in a multi-feature AI assistant?
Define a token budget hierarchy and enforce it explicitly. For example, with a 150K total budget: system prompt gets 3K (fixed), conversation history gets 60K, RAG context gets 40K, tool results get 20K, response reservation is 8K, and 19K is kept as overflow headroom. Each component is responsible for truncating itself to its own budget before being handed to the assembler. The assembler builds the final context and does a final token count check. If the total still exceeds the limit, it drops components in priority order: RAG first, older history second, tool results third. The system prompt and current user message are never dropped. This declarative budget system means overflows are handled gracefully and predictably rather than causing 3 AM pages.
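The budget numbers above can be expressed as a small assembler. This sketch works on token counts only and treats "dropping" as trimming a component by the amount of the overage; the names `BUDGET`, `DROP_ORDER`, and `assemble` are hypothetical.

```python
TOTAL_BUDGET = 150_000
RESPONSE_RESERVE = 8_000
# Per-component caps from the example allocation (headroom is what remains).
BUDGET = {"system": 3_000, "history": 60_000, "rag": 40_000, "tools": 20_000}
# Trim order when the total is still too large; "system" is never trimmed.
DROP_ORDER = ["rag", "history", "tools"]


def assemble(counts: dict, user_tokens: int, limit: int = TOTAL_BUDGET) -> dict:
    """Clamp each component to its cap, then trim in priority order
    until the request (plus the response reserve) fits the limit."""
    final = {name: min(counts.get(name, 0), cap) for name, cap in BUDGET.items()}

    def total() -> int:
        return sum(final.values()) + user_tokens + RESPONSE_RESERVE

    for name in DROP_ORDER:
        overage = total() - limit
        if overage <= 0:
            break
        final[name] -= min(final[name], overage)
    return final
```

With the default limit, a fully loaded request totals 132K including the reserve and a 1K user message, so nothing is trimmed. Lowering the limit trims RAG first, then history, then tool results, and never touches the system prompt or the current user message.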
