Cost Optimization Patterns
The $40,000/Month Surprise
Arjun's team had been running their AI-powered code review tool for four months when the finance team scheduled an emergency meeting. The monthly LLM API bill had reached $40,000, up from $8,000 three months earlier: five-fold growth in cost against two-fold growth in users. Something was clearly wrong with the unit economics.
The culprit was not just usage growth. A forensic analysis of their API logs revealed a pattern: 34% of API calls were for identical requests, the same code snippet being reviewed multiple times within the same day. Their CI/CD system triggered code review on every push, and developers pushed frequently. At about $0.12 per review call and 10,000 pushes per day, the tool was spending roughly $1,200 per day, and about a third of that (around $400 per day) went to unnecessary duplicate calls.
They implemented response caching (an exact-match cache, the simplest form of the semantic caching covered below): before calling the LLM, hash the code snippet and check whether a review for this exact content had been generated in the last 24 hours. The cache hit rate was 34% in the first week. Monthly API spend dropped from $40,000 to $26,400.
Over the next month, they added three more optimizations: model routing (simple files reviewed by Haiku, complex architecture reviews by Sonnet), prompt compression (removing redundant context from their system prompt), and output length limits matched to review complexity. Total monthly bill: $14,800 - down 63% from peak with the same user base and no quality degradation.
Cost optimization is one of the highest-ROI engineering activities in production AI systems. The patterns are straightforward, yet they are often implemented only after a budget crisis rather than proactively. Do not wait for the emergency meeting.
The LLM Cost Model
Before optimizing costs, understand the structure of what you're paying for:
def estimate_call_cost(
input_tokens: int,
output_tokens: int,
model: str = "claude-haiku-4-5-20251001",
use_batch: bool = False,
cache_read_tokens: int = 0,
cache_creation_tokens: int = 0,
) -> dict:
"""
Estimate cost of a single LLM call with all pricing components.
Pricing components:
1. Input tokens: charged per million tokens
2. Output tokens: charged per million tokens (higher rate than input)
3. Cache creation: slightly higher than input (one-time cost to populate cache)
4. Cache reads: ~90% cheaper than input (major savings opportunity)
5. Batch discount: 50% off all components
Check Anthropic pricing page for current rates - these change.
"""
# Approximate pricing (USD per million tokens) - verify current rates
pricing = {
"claude-haiku-4-5-20251001": {
"input": 0.80, "output": 4.00,
"cache_creation": 1.00, # 25% premium over input
"cache_read": 0.08, # 90% off input
},
"claude-sonnet-4-6": {
"input": 3.00, "output": 15.00,
"cache_creation": 3.75,
"cache_read": 0.30,
},
"claude-opus-4-6": {
"input": 15.00, "output": 75.00,
"cache_creation": 18.75,
"cache_read": 1.50,
},
}
if model not in pricing:
raise ValueError(f"Unknown model: {model}")
p = pricing[model]
batch_multiplier = 0.5 if use_batch else 1.0
# Tokens that are not cached or cache creation
regular_input = max(0, input_tokens - cache_read_tokens - cache_creation_tokens)
cost_regular_input = regular_input * p["input"] / 1_000_000 * batch_multiplier
cost_cache_creation = cache_creation_tokens * p["cache_creation"] / 1_000_000 * batch_multiplier
cost_cache_read = cache_read_tokens * p["cache_read"] / 1_000_000 * batch_multiplier
cost_output = output_tokens * p["output"] / 1_000_000 * batch_multiplier
total = cost_regular_input + cost_cache_creation + cost_cache_read + cost_output
# What the cost would be without caching
cost_without_cache = (
input_tokens * p["input"] / 1_000_000 * batch_multiplier
+ output_tokens * p["output"] / 1_000_000 * batch_multiplier
)
return {
"total_usd": round(total, 6),
"cost_regular_input": round(cost_regular_input, 6),
"cost_cache_creation": round(cost_cache_creation, 6),
"cost_cache_read": round(cost_cache_read, 6),
"cost_output": round(cost_output, 6),
"without_cache_usd": round(cost_without_cache, 6),
"cache_savings_usd": round(cost_without_cache - total, 6),
}
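As a quick illustration of the cost model above, the sketch below prices one hypothetical call (10,000 input tokens, 500 output tokens) on each tier, using the same approximate per-million-token rates as this section. `APPROX_PRICES` and `simple_call_cost` are names introduced only for this example, and the rates should be checked against the current pricing page before use.

```python
# Approximate USD per million tokens (input, output), copied from the table above.
# Illustrative only: verify current rates before relying on these numbers.
APPROX_PRICES = {
    "claude-haiku-4-5-20251001": (0.80, 4.00),
    "claude-sonnet-4-6": (3.00, 15.00),
    "claude-opus-4-6": (15.00, 75.00),
}


def simple_call_cost(model: str, input_tokens: int, output_tokens: int,
                     use_batch: bool = False) -> float:
    """Cost of one uncached call in USD (hypothetical helper)."""
    input_price, output_price = APPROX_PRICES[model]
    cost = (input_tokens * input_price + output_tokens * output_price) / 1_000_000
    return cost * (0.5 if use_batch else 1.0)


for name in APPROX_PRICES:
    standard = simple_call_cost(name, 10_000, 500)
    batched = simple_call_cost(name, 10_000, 500, use_batch=True)
    print(f"{name}: ${standard:.4f} standard, ${batched:.4f} batched")
```

Even with 20 times more input than output tokens, the output side still accounts for a fifth of the bill in this example, which is why both sides of the call are worth measuring.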
Cost Leverage Points: What to Optimize First
Optimization 1: Semantic Caching
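Response caching is the highest-impact optimization for applications with repeated or similar requests: cache LLM responses and serve cache hits without calling the API at all. A semantic cache matches requests by embedding similarity; the implementation below is the simpler exact-match variant, keyed on a hash of the full request: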
import hashlib
import json
import time
import anthropic
from dataclasses import dataclass, field
from typing import Optional
client = anthropic.Anthropic()
@dataclass
class CacheEntry:
"""A single cached LLM response."""
request_hash: str
response: str
model: str
input_tokens: int
output_tokens: int
created_at: float
hit_count: int = 0
last_accessed: float = field(default_factory=time.time)
class ExactMatchCache:
"""
Exact-match LLM response cache.
Use this when you know requests will repeat exactly:
- CI/CD code review on unchanged files
- FAQ systems with repeated common questions
- Classification tasks run multiple times
Exact match is faster and more reliable than semantic similarity.
For paraphrase matching, use a similarity-based (semantic) cache keyed on embeddings instead.
"""
def __init__(
self,
max_entries: int = 50_000,
ttl_seconds: float = 86_400, # 24 hours default
):
self.max_entries = max_entries
self.ttl_seconds = ttl_seconds
self._cache: dict[str, CacheEntry] = {}
self._hits = 0
self._misses = 0
def _request_hash(self, messages: list[dict], system: str, model: str) -> str:
"""Generate a deterministic hash for a request."""
payload = json.dumps(
{"messages": messages, "system": system, "model": model},
sort_keys=True,
ensure_ascii=True,
)
return hashlib.sha256(payload.encode()).hexdigest()
def get(
self,
messages: list[dict],
system: str,
model: str,
) -> Optional[CacheEntry]:
"""Look up a cached response. Returns None if not found or expired."""
key = self._request_hash(messages, system, model)
entry = self._cache.get(key)
if entry is None:
self._misses += 1
return None
# Check expiry
if time.time() - entry.created_at > self.ttl_seconds:
del self._cache[key]
self._misses += 1
return None
entry.hit_count += 1
entry.last_accessed = time.time()
self._hits += 1
return entry
def set(
self,
messages: list[dict],
system: str,
model: str,
response: str,
input_tokens: int,
output_tokens: int,
) -> None:
"""Store a response in the cache."""
# Evict expired entries if at capacity
if len(self._cache) >= self.max_entries:
self._evict()
key = self._request_hash(messages, system, model)
self._cache[key] = CacheEntry(
request_hash=key,
response=response,
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
created_at=time.time(),
)
def _evict(self) -> None:
"""Remove expired entries, then LRU entries if still over capacity."""
now = time.time()
# Remove expired
expired = [k for k, e in self._cache.items() if now - e.created_at > self.ttl_seconds]
for k in expired:
del self._cache[k]
# Remove LRU if still over capacity
if len(self._cache) >= self.max_entries:
sorted_by_lru = sorted(self._cache.items(), key=lambda kv: kv[1].last_accessed)
to_remove = len(self._cache) - int(self.max_entries * 0.75)
for k, _ in sorted_by_lru[:to_remove]:
del self._cache[k]
def hit_rate(self) -> float:
total = self._hits + self._misses
return self._hits / max(total, 1)
def cost_saved_usd(self, model: str = "claude-haiku-4-5-20251001") -> float:
"""Estimate cost saved by cache hits on entries still in the cache (evicted entries are not counted)."""
pricing = {"claude-haiku-4-5-20251001": (0.80, 4.00), "claude-sonnet-4-6": (3.00, 15.00), "claude-opus-4-6": (15.00, 75.00)}
# Price each entry by the model that produced it; `model` is only the fallback
fallback = pricing.get(model, (0.80, 4.00))
saved = 0.0
for entry in self._cache.values():
inp, out = pricing.get(entry.model, fallback)
saved += entry.hit_count * (entry.input_tokens * inp + entry.output_tokens * out) / 1_000_000
return round(saved, 2)
def stats(self) -> dict:
return {
"entries": len(self._cache),
"hits": self._hits,
"misses": self._misses,
"hit_rate": round(self.hit_rate(), 3),
}
# Production usage with the cache
_cache = ExactMatchCache(ttl_seconds=86_400) # 24h TTL
def cached_llm_call(
messages: list[dict],
system: str = "",
model: str = "claude-haiku-4-5-20251001",
max_tokens: int = 500,
) -> dict:
"""
LLM call with exact-match caching.
Returns cache hit immediately (no API call, no cost).
"""
cached = _cache.get(messages, system, model)
if cached:
return {
"content": cached.response,
"cached": True,
"input_tokens": 0, # No tokens consumed from cache
"output_tokens": 0,
}
# Cache miss - call API
response = client.messages.create(
model=model,
max_tokens=max_tokens,
system=system,
messages=messages,
)
content = response.content[0].text
_cache.set(
messages, system, model, content,
response.usage.input_tokens,
response.usage.output_tokens,
)
return {
"content": content,
"cached": False,
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
}
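The exact-match cache above misses requests that are worded differently but mean the same thing. A similarity-based cache can be sketched as follows. This is a minimal illustration, not a production design: `SimpleSemanticCache` is a hypothetical class, `toy_embed` is a bag-of-words stand-in for a real embedding model, the 0.8 threshold is an arbitrary assumption, and the linear scan would be replaced by a vector index at scale.

```python
import math
from collections import Counter
from typing import Callable, Optional


def toy_embed(text: str) -> Counter:
    """Stand-in for a real embedding model: a bag-of-words count vector."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[token] * b[token] for token in a if token in b)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


class SimpleSemanticCache:
    """Hypothetical similarity-based cache: linear scan, no TTL or eviction."""

    def __init__(self, embed: Callable[[str], Counter] = toy_embed, threshold: float = 0.8):
        self.embed = embed
        self.threshold = threshold
        self._entries: list = []  # (vector, response) pairs

    def get(self, prompt: str) -> Optional[str]:
        """Return the response of the most similar stored prompt, if similar enough."""
        query = self.embed(prompt)
        best_score, best_response = 0.0, None
        for vector, response in self._entries:
            score = cosine(query, vector)
            if score > best_score:
                best_score, best_response = score, response
        return best_response if best_score >= self.threshold else None

    def set(self, prompt: str, response: str) -> None:
        self._entries.append((self.embed(prompt), response))


cache = SimpleSemanticCache()
cache.set("how do I reset my password", "Use the reset link on the login page.")
print(cache.get("how do I reset my password please"))  # near-duplicate wording
print(cache.get("what is your refund policy"))         # unrelated question
```

The threshold is the main design choice: set too low, the cache returns answers to questions that were never asked. For workloads such as code review, where a one-character difference can matter, exact matching is the safer default.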
Optimization 2: Model Routing
Route requests to the cheapest model that can handle the task. Haiku is 18x cheaper than Opus - using it for simple tasks while reserving Opus for genuinely complex tasks dramatically reduces average cost per request:
import anthropic
from enum import Enum
client = anthropic.Anthropic()
class ModelTier(Enum):
FAST = "claude-haiku-4-5-20251001" # Cheapest, fastest, 200K context
BALANCED = "claude-sonnet-4-6" # Balanced quality and cost
POWERFUL = "claude-opus-4-6" # Highest quality, most expensive
# Cost ratios relative to Haiku (input tokens)
COST_MULTIPLIER = {
ModelTier.FAST: 1.0,
ModelTier.BALANCED: 3.75, # 3.75x more expensive per token than Haiku
ModelTier.POWERFUL: 18.75, # 18.75x more expensive per token than Haiku
}
def classify_task_complexity(
user_message: str,
context_length: int,
task_type: str | None = None,
) -> ModelTier:
"""
Classify task complexity to determine appropriate model tier.
This is a heuristic classifier - tune it for your specific use cases.
Consider A/B testing model tier decisions to validate quality-cost tradeoffs.
Args:
user_message: The user's request text
context_length: Total input context length in characters
task_type: Explicit task type tag (overrides heuristic if provided)
"""
# Explicit task type override (highest priority)
if task_type:
fast_types = {
"classification", "labeling", "extraction", "formatting",
"yes_no", "sentiment", "translation_simple", "summarize_short",
}
powerful_types = {
"creative_writing", "complex_reasoning", "architecture_review",
"research_synthesis", "strategic_planning", "nuanced_analysis",
}
if task_type in fast_types:
return ModelTier.FAST
if task_type in powerful_types:
return ModelTier.POWERFUL
message_lower = user_message.lower()
word_count = len(user_message.split())
# --- Fast tier signals ---
fast_keywords = [
"classify", "categorize", "label", "tag", "extract", "parse",
"format as", "yes or no", "true or false", "summarize briefly",
"in one word", "one sentence", "sort", "validate",
]
if any(kw in message_lower for kw in fast_keywords) and context_length < 3000:
return ModelTier.FAST
# Short, simple requests
if word_count < 10 and context_length < 1000:
return ModelTier.FAST
# --- Powerful tier signals ---
powerful_keywords = [
"deeply analyze", "reason about", "multi-step", "design system",
"evaluate tradeoffs", "compare architectures", "creative story",
"strategic", "nuanced", "long-form", "research",
]
if any(kw in message_lower for kw in powerful_keywords):
return ModelTier.POWERFUL
# Very long context (complex documents)
if context_length > 20_000:
return ModelTier.POWERFUL
# Default: balanced tier
return ModelTier.BALANCED
def routed_llm_call(
messages: list[dict],
system: str = "",
task_type: str | None = None,
force_tier: ModelTier | None = None,
max_tokens: int = 500,
) -> dict:
"""
Make an LLM call routed to the appropriate model tier.
Returns the result with cost metrics for monitoring.
"""
context_length = sum(len(str(m.get("content", ""))) for m in messages)
# str() keeps this safe when content is a list of content blocks rather than a string
user_message = " ".join(
str(m.get("content", "")) for m in messages if m.get("role") == "user"
)
if force_tier:
tier = force_tier
else:
tier = classify_task_complexity(user_message, context_length, task_type)
model = tier.value
response = client.messages.create(
model=model,
max_tokens=max_tokens,
system=system,
messages=messages,
)
actual_cost = estimate_call_cost(
response.usage.input_tokens,
response.usage.output_tokens,
model,
)["total_usd"]
# What would this have cost on Opus?
opus_cost = estimate_call_cost(
response.usage.input_tokens,
response.usage.output_tokens,
"claude-opus-4-6",
)["total_usd"]
return {
"content": response.content[0].text,
"model_used": model,
"tier": tier.name,
"input_tokens": response.usage.input_tokens,
"output_tokens": response.usage.output_tokens,
"actual_cost_usd": actual_cost,
"opus_cost_counterfactual_usd": opus_cost,
"cost_savings_usd": opus_cost - actual_cost,
"cost_multiplier_vs_haiku": COST_MULTIPLIER[tier],
}
# Convenience functions for common task types
def classify_text(text: str, categories: list[str]) -> str:
"""Always uses Haiku - classification is always a simple task."""
response = client.messages.create(
model=ModelTier.FAST.value,
max_tokens=20, # Categories are short
messages=[{
"role": "user",
"content": (
f"Classify the following text as one of: "
f"{', '.join(categories)}\n\n"
f"Text: {text[:1000]}\n\n"
f"Category (one word only):"
),
}],
)
return response.content[0].text.strip()
def analyze_deeply(document: str, question: str) -> str:
"""Uses Opus for deep analysis - quality is worth the cost."""
response = client.messages.create(
model=ModelTier.POWERFUL.value,
max_tokens=2000,
messages=[
{"role": "user", "content": document},
{"role": "assistant", "content": "I've read the document."},
{"role": "user", "content": question},
],
)
return response.content[0].text
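To see how much routing can move the average price, the sketch below computes a blended input price for a traffic mix. The 60/30/10 split is a hypothetical distribution, `blended_input_price` is a name introduced for this example, and the calculation assumes each tier receives requests of similar token volume, which real traffic may not satisfy.

```python
# Approximate input prices (USD per million tokens) from the cost model above.
INPUT_PRICE = {"haiku": 0.80, "sonnet": 3.00, "opus": 15.00}


def blended_input_price(traffic_mix: dict) -> float:
    """Average input price per million tokens for a given routing mix.

    traffic_mix maps tier name -> share of tokens routed to it (must sum to 1).
    """
    if abs(sum(traffic_mix.values()) - 1.0) > 1e-9:
        raise ValueError("traffic shares must sum to 1")
    return sum(INPUT_PRICE[tier] * share for tier, share in traffic_mix.items())


# Hypothetical mix: 60% simple, 30% standard, 10% complex requests.
routed = blended_input_price({"haiku": 0.6, "sonnet": 0.3, "opus": 0.1})
all_opus = blended_input_price({"opus": 1.0})
print(f"routed: ${routed:.2f}/M, all-Opus: ${all_opus:.2f}/M, "
      f"savings: {1 - routed / all_opus:.0%}")
```

Under these assumptions most of the remaining spend comes from the 10% of traffic still sent to Opus, so misrouting even a small share of simple requests upward is expensive.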
Optimization 3: Anthropic Prompt Caching
Anthropic's prompt caching processes static content once and reuses the cached KV states on subsequent requests. This delivers 90% cost reduction on the cached portion and 40-60% TTFT improvement:
import anthropic
client = anthropic.Anthropic()
def call_with_prompt_cache(
static_knowledge: str,
user_message: str,
system_instruction: str = "You are a helpful assistant.",
model: str = "claude-sonnet-4-6",
max_tokens: int = 1000,
) -> dict:
"""
Use Anthropic prompt caching to reduce cost for repeated calls
with the same static context.
How prompt caching works:
1. First call: content is processed and KV states are cached (5-minute TTL)
2. Subsequent calls within 5 min: cache_read_tokens instead of reprocessing
3. Cache TTL resets on each use - active use maintains the cache
Cost breakdown (claude-sonnet-4-6):
- Standard input: $3.00/M tokens
- Cache creation: $3.75/M tokens (25% surcharge, one-time)
- Cache reads: $0.30/M tokens (90% savings)
Break-even: the cache write costs 0.25x extra and each cache read saves 0.9x,
so the surcharge is recovered on the first cache hit (0.25 / 0.9 ≈ 0.28 hits).
Best use cases:
- Large product catalog included in every support query
- Full API documentation for a code assistant
- Long legal document for Q&A
- System prompt > 1024 tokens shared across many requests
"""
response = client.messages.create(
model=model,
max_tokens=max_tokens,
system=[
{
"type": "text",
"text": system_instruction,
# Note: system instructions not marked for caching
# (typically short - not worth the overhead)
},
{
"type": "text",
"text": static_knowledge,
"cache_control": {"type": "ephemeral"}, # Cache this block
# "ephemeral" = 5-minute TTL, refreshes on each access
},
],
messages=[{"role": "user", "content": user_message}],
)
usage = response.usage
# These fields may be missing or None on some responses, so default to 0
cache_read = getattr(usage, "cache_read_input_tokens", 0) or 0
cache_creation = getattr(usage, "cache_creation_input_tokens", 0) or 0
# usage.input_tokens counts only uncached input; add the cached portions
# back to get the total that estimate_call_cost expects
total_input = usage.input_tokens + cache_read + cache_creation
# Calculate actual vs counterfactual cost
cost_breakdown = estimate_call_cost(
input_tokens=total_input,
output_tokens=usage.output_tokens,
model=model,
cache_read_tokens=cache_read,
cache_creation_tokens=cache_creation,
)
return {
"content": response.content[0].text,
"cache_hit": cache_read > 0,
"cache_read_tokens": cache_read,
"cache_creation_tokens": cache_creation,
"actual_cost_usd": cost_breakdown["total_usd"],
"without_cache_cost_usd": cost_breakdown["without_cache_usd"],
"savings_usd": cost_breakdown["cache_savings_usd"],
}
# Measuring cache effectiveness over a session
class CacheEffectivenessMeasurer:
"""Track prompt caching effectiveness across a session."""
def __init__(self):
self.calls: list[dict] = []
def record(self, result: dict) -> None:
self.calls.append(result)
def report(self) -> dict:
if not self.calls:
return {}
total_calls = len(self.calls)
cache_hits = sum(1 for c in self.calls if c.get("cache_hit"))
total_saved = sum(c.get("savings_usd", 0) for c in self.calls)
total_spent = sum(c.get("actual_cost_usd", 0) for c in self.calls)
return {
"total_calls": total_calls,
"cache_hits": cache_hits,
"hit_rate": cache_hits / total_calls,
"total_savings_usd": round(total_saved, 4),
"total_spent_usd": round(total_spent, 4),
"would_have_spent_usd": round(total_spent + total_saved, 4),
"savings_percentage": round(total_saved / max(total_spent + total_saved, 0.001) * 100, 1),
}
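The break-even point for prompt caching follows directly from the two multipliers, and a small calculation makes it concrete. The sketch below assumes a 25% write surcharge, a 90% read discount, and that every call after the first arrives before the cache expires; `cumulative_input_cost` is a hypothetical helper and the 50,000-token context is an example figure.

```python
def cumulative_input_cost(calls: int, cached_tokens: int, input_price: float,
                          use_cache: bool) -> float:
    """Input cost in USD for `calls` requests sharing one static prefix.

    input_price is USD per million tokens. With caching, the first call
    pays the write surcharge and every later call pays the read rate.
    """
    per_call = cached_tokens * input_price / 1_000_000
    if not use_cache or calls == 0:
        return calls * per_call
    write = per_call * 1.25                 # first call populates the cache
    reads = (calls - 1) * per_call * 0.10   # later calls read from it
    return write + reads


# 50,000-token static context at the approximate Sonnet input price ($3.00/M).
for n in (1, 2, 10):
    plain = cumulative_input_cost(n, 50_000, 3.00, use_cache=False)
    cached = cumulative_input_cost(n, 50_000, 3.00, use_cache=True)
    print(f"{n:>2} calls: ${plain:.4f} uncached vs ${cached:.4f} cached")
```

A single isolated call is 25% more expensive with caching enabled; from the second call onward the cached path is cheaper, and the gap widens with every additional hit.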
Optimization 4: Output Length Control
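Output tokens cost 5x more per token than input tokens at the approximate prices above. You are billed for the tokens the model actually generates, so max_tokens caps worst-case spend rather than discounting a normal response; pair a task-sized cap with format instructions that keep answers short: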
import anthropic
client = anthropic.Anthropic()
# Task-to-max_tokens mapping
# These are conservative upper bounds - actual outputs are usually shorter
MAX_TOKENS_BY_TASK = {
"yes_no": 5, # "Yes" or "No"
"binary_classification": 10, # "positive" or "negative"
"single_label": 20, # One category from a fixed list
"short_answer": 50, # A sentence or short phrase
"brief_summary": 150, # 2-3 sentences
"paragraph_summary": 300, # One paragraph
"analysis": 800, # Detailed analysis
"structured_extraction": 400, # JSON extraction
"code_snippet": 600, # Short code generation
"full_response": 2048, # No meaningful limit
"document": 4096, # Long document generation
}
# Cost exposure from an oversized max_tokens
# Setting max_tokens=2048 vs max_tokens=5 for a yes/no task:
# Worst case (the model runs to the 2048-token cap) → ~400x the output cost
# You are billed only for tokens actually generated, so the cap bounds the worst case;
# the format hints below are what keep typical outputs short
def task_optimized_call(
task_type: str,
messages: list[dict],
system: str = "",
model: str = "claude-haiku-4-5-20251001",
output_format_hint: str | None = None,
) -> dict:
"""
Make an LLM call with max_tokens tuned to task type.
Also adds output format hints to the system prompt when appropriate -
this encourages the model to be concise, reducing actual output tokens
even below the max_tokens limit.
"""
max_tokens = MAX_TOKENS_BY_TASK.get(task_type, 500)
# Add format hints to encourage concise output
format_hints = {
"yes_no": "Answer with only 'Yes' or 'No'.",
"binary_classification": "Answer with only the classification label.",
"single_label": "Answer with only the category name, nothing else.",
"short_answer": "Be brief. One sentence.",
"brief_summary": "2-3 sentences maximum.",
"structured_extraction": "Output only valid JSON.",
}
enhanced_system = system
if task_type in format_hints:
hint = format_hints[task_type]
enhanced_system = f"{system}\n\n{hint}".strip() if system else hint
response = client.messages.create(
model=model,
max_tokens=max_tokens,
system=enhanced_system,
messages=messages,
)
cost = estimate_call_cost(
response.usage.input_tokens,
response.usage.output_tokens,
model,
)["total_usd"]
# What would it have cost with max_tokens=2048?
# Note: this is worst-case - actual output may be the same length
worst_case_cost = estimate_call_cost(
response.usage.input_tokens,
2048, # Worst case output
model,
)["total_usd"]
return {
"content": response.content[0].text,
"output_tokens": response.usage.output_tokens,
"max_tokens_allowed": max_tokens,
"actual_cost_usd": cost,
"worst_case_max_tokens_2048_usd": worst_case_cost,
"potential_savings_usd": worst_case_cost - cost,
}
# Batch classification - the most cost-sensitive use case
def batch_classify_items(
items: list[str],
categories: list[str],
model: str = "claude-haiku-4-5-20251001",
) -> list[dict]:
"""
Classify a batch of items at minimum cost.
Uses Haiku + max_tokens=20 (the single_label cap) + format hint = lowest possible cost per item.
"""
results = []
category_list = ", ".join(categories)
for item in items:
result = task_optimized_call(
task_type="single_label",
messages=[{
"role": "user",
"content": (
f"Classify: {item[:200]}\n"
f"Categories: {category_list}"
),
}],
model=model,
)
results.append({
"item": item,
"category": result["content"].strip(),
"cost_usd": result["actual_cost_usd"],
})
return results
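A cap that is too tight silently truncates answers, so it helps to check why generation stopped. The Messages API reports a `stop_reason` on each response, with `"max_tokens"` indicating the cap was hit. The sketch below shows one possible retry policy; `next_max_tokens`, the doubling rule, and the 4,096 ceiling are all assumptions for illustration.

```python
from typing import Optional


def next_max_tokens(stop_reason: str, current_cap: int, ceiling: int = 4096) -> Optional[int]:
    """Decide whether to retry a truncated response with a larger cap.

    Returns the new cap to retry with, or None when no retry is needed
    (the model stopped on its own) or possible (already at the ceiling).
    """
    if stop_reason != "max_tokens":
        return None               # e.g. "end_turn": the answer is complete
    if current_cap >= ceiling:
        return None               # give up rather than grow without bound
    return min(current_cap * 2, ceiling)


print(next_max_tokens("end_turn", 20))      # complete answer, no retry
print(next_max_tokens("max_tokens", 20))    # truncated, retry with a larger cap
print(next_max_tokens("max_tokens", 4096))  # truncated at the ceiling, stop
```

Tracking how often a task type hits its cap is also a useful signal: a high truncation rate suggests the entry in MAX_TOKENS_BY_TASK is too low for that task.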
Optimization 5: Prompt Compression
Shorter input prompts reduce input token costs on every request, so small per-prompt savings add up at high call volume:
import anthropic
import re
from typing import Optional
client = anthropic.Anthropic()
def compress_system_prompt(system_prompt: str) -> tuple[str, dict]:
"""
Remove unnecessary tokens from system prompts.
Conservative compression (always safe):
- Normalize whitespace
- Remove common filler phrases
- Deduplicate blank lines
Returns (compressed_prompt, stats)
"""
original_chars = len(system_prompt)
# Normalize whitespace
compressed = re.sub(r"[ \t]+", " ", system_prompt) # Multiple spaces → single
compressed = re.sub(r"\n{3,}", "\n\n", compressed) # 3+ newlines → double
# Remove filler phrases that add tokens without meaning
filler_phrases = [
r"Please note that ",
r"It is important to remember that ",
r"You should always ",
r"You must always ",
r"Remember to always ",
r"Always keep in mind that ",
r"Be sure to ",
r"Make sure to ",
r"As an AI (?:language model|assistant), ",
r"As a helpful AI, ",
r"I would like to remind you that ",
]
for phrase in filler_phrases:
compressed = re.sub(phrase, "", compressed, flags=re.IGNORECASE)
# Normalize repeated punctuation
compressed = re.sub(r"\.{2,}", ".", compressed)
compressed = re.sub(r"!{2,}", "!", compressed)
compressed = compressed.strip()
compressed_chars = len(compressed)
char_reduction = original_chars - compressed_chars
token_reduction_est = char_reduction // 4 # Rough estimate
return compressed, {
"original_chars": original_chars,
"compressed_chars": compressed_chars,
"char_reduction": char_reduction,
"token_reduction_estimate": token_reduction_est,
"reduction_pct": round(char_reduction / max(original_chars, 1) * 100, 1),
}
def truncate_rag_context(
documents: list[dict],
max_total_tokens: int = 30_000,
min_tokens_per_doc: int = 200,
) -> list[dict]:
"""
Truncate RAG documents to fit within a token budget.
Strategy: equal budget allocation per document, with minimum per doc.
In practice, use scored retrieval to include the most relevant docs
and truncate the least relevant ones first.
Args:
documents: List of dicts with "content" and optionally "score" keys
max_total_tokens: Total token budget for all documents
min_tokens_per_doc: Minimum tokens to allocate per document
Returns:
Documents with content truncated to fit budget
"""
if not documents:
return []
# Sort by relevance score if available (highest first)
scored = sorted(
documents,
key=lambda d: d.get("score", 0.5),
reverse=True,
)
tokens_per_doc = max(min_tokens_per_doc, max_total_tokens // len(scored))
chars_per_doc = tokens_per_doc * 4 # Rough approximation
result = []
remaining_budget = max_total_tokens * 4 # In chars
for doc in scored:
if remaining_budget <= 0:
break
content = doc.get("content", "")
if len(content) > chars_per_doc:
# Truncate this document
truncated = content[:chars_per_doc]
# Truncate at a sentence boundary if possible
last_period = truncated.rfind(". ")
if last_period > chars_per_doc // 2:
truncated = truncated[:last_period + 1]
content = truncated + "\n[... document truncated ...]"
result.append({**doc, "content": content})
remaining_budget -= len(content)
return result
def compress_conversation_history(
messages: list[dict],
max_tokens: int = 4_000,
compression_model: str = "claude-haiku-4-5-20251001",
) -> list[dict]:
"""
Compress conversation history by summarizing older turns.
Always keeps the most recent turns verbatim (most contextually relevant).
Summarizes older turns using the cheapest model.
Cost note: the compression call itself costs money - only use this
when the savings from fewer history tokens exceed the compression call cost.
"""
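KEEP_LAST_N = 6 # Always keep last 3 exchanges verbatim
if len(messages) <= KEEP_LAST_N:
return messages
# Skip compression while the history still fits the budget (~4 chars per token)
estimated_tokens = sum(len(str(m.get("content", ""))) for m in messages) // 4
if estimated_tokens <= max_tokens:
return messages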
recent = messages[-KEEP_LAST_N:]
older = messages[:-KEEP_LAST_N]
if not older:
return recent
# Format older turns for summarization
older_text = "\n".join([
f"{m['role'].upper()}: {str(m.get('content', ''))[:300]}"
for m in older[-20:] # Summarize at most 20 older turns
])
# Use cheapest model for compression
summary_response = client.messages.create(
model=compression_model,
max_tokens=200,
messages=[{
"role": "user",
"content": (
f"Summarize this conversation in 2-3 sentences, "
f"preserving key facts and decisions:\n\n{older_text}"
),
}],
)
summary = summary_response.content[0].text
return [
{"role": "user", "content": f"[Earlier conversation summary: {summary}]"},
{"role": "assistant", "content": "Understood."},
] + recent
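Whether history compression pays off depends on how many more turns will carry the shortened history. The sketch below estimates the net saving under a simple assumption: without compression, the older turns would be resent unchanged on every remaining request. `compression_net_savings` is a hypothetical helper, and the token counts in the example are made up.

```python
def compression_net_savings(
    older_tokens: int,
    summary_tokens: int,
    remaining_turns: int,
    main_input_price: float,
    compress_input_price: float,
    compress_output_price: float,
) -> float:
    """Net USD saved by summarizing older history (negative means a loss).

    All prices are USD per million tokens.
    """
    # One-time cost: the summarizer reads the old turns and writes the summary.
    one_time = (older_tokens * compress_input_price
                + summary_tokens * compress_output_price) / 1_000_000
    # Recurring saving: each later request carries the summary instead.
    per_turn = (older_tokens - summary_tokens) * main_input_price / 1_000_000
    return remaining_turns * per_turn - one_time


# 6,000 tokens of old history -> 150-token summary; Sonnet chat, Haiku summarizer.
for turns in (0, 1, 10):
    net = compression_net_savings(6_000, 150, turns, 3.00, 0.80, 4.00)
    print(f"{turns:>2} remaining turns: net ${net:+.4f}")
```

If the conversation ends right after compression, the summarization call is pure overhead; with a cheaper summarizer than the main model, a single further turn is enough to recover it in this example.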
Cost Monitoring and Budget Alerts
Track costs in real-time to catch overruns before they become crises:
import time
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Optional
@dataclass
class CostTracker:
"""
Real-time LLM cost tracker with budget alerts.
Tracks cost by model, task type, user, and time window.
Triggers alerts when approaching or exceeding budget.
"""
daily_budget_usd: float = 500.0
alert_threshold_pct: float = 0.8 # Alert at 80% of budget
_records: list[dict] = field(default_factory=list)
def record(
self,
model: str,
input_tokens: int,
output_tokens: int,
task_type: str = "unknown",
user_id: str | None = None,
cached: bool = False,
cache_read_tokens: int = 0,
cache_creation_tokens: int = 0,
) -> float:
"""Record an API call. Returns cost in USD."""
cost = 0.0 if cached else estimate_call_cost(
input_tokens, output_tokens, model,
cache_read_tokens=cache_read_tokens,
cache_creation_tokens=cache_creation_tokens,
)["total_usd"]
self._records.append({
"timestamp": time.time(),
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"task_type": task_type,
"user_id": user_id,
"cached": cached,
"cost_usd": cost,
})
# Check budget
daily = self.daily_spend()
if daily >= self.daily_budget_usd:
self._trigger_alert("CRITICAL", daily, "Daily budget exceeded")
elif daily >= self.daily_budget_usd * self.alert_threshold_pct:
self._trigger_alert("WARNING", daily, "Approaching daily budget")
return cost
def _trigger_alert(self, level: str, spend: float, message: str) -> None:
"""In production: send to PagerDuty, Slack, or monitoring system."""
pct = spend / self.daily_budget_usd * 100
print(f"[Cost Alert] {level}: ${spend:.2f} / ${self.daily_budget_usd:.2f} ({pct:.0f}%) - {message}")
def daily_spend(self, days_ago: int = 0) -> float:
"""Calculate spend for a specific day (0 = today)."""
now = time.time()
day_start = now - (now % 86400) - days_ago * 86400
day_end = day_start + 86400
return sum(
r["cost_usd"] for r in self._records
if day_start <= r["timestamp"] < day_end
)
def hourly_burn_rate(self) -> float:
"""Current hourly spend rate."""
hour_start = time.time() - 3600
return sum(r["cost_usd"] for r in self._records if r["timestamp"] >= hour_start)
def projected_daily_cost(self) -> float:
"""Project today's total cost based on current hourly rate."""
return self.hourly_burn_rate() * 24
def cost_by_model(self, hours: int = 24) -> dict:
"""Break down cost by model for the last N hours."""
cutoff = time.time() - hours * 3600
by_model: dict[str, float] = defaultdict(float)
for r in self._records:
if r["timestamp"] >= cutoff:
by_model[r["model"]] += r["cost_usd"]
return dict(by_model)
def cost_by_task_type(self, hours: int = 24) -> dict:
"""Break down cost by task type for the last N hours."""
cutoff = time.time() - hours * 3600
by_task: dict[str, float] = defaultdict(float)
for r in self._records:
if r["timestamp"] >= cutoff:
by_task[r["task_type"]] += r["cost_usd"]
return dict(by_task)
def cache_metrics(self, hours: int = 24) -> dict:
"""Cache hit rate and savings for the last N hours."""
cutoff = time.time() - hours * 3600
recent = [r for r in self._records if r["timestamp"] >= cutoff]
if not recent:
return {}
cached = [r for r in recent if r["cached"]]
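return {
"total_calls": len(recent),
"cache_hits": len(cached),
"hit_rate": len(cached) / len(recent),
# Cached records cost 0, so estimate what each hit would have cost instead.
# Requires callers to record the original request's token counts on cache hits.
"cache_savings_usd": sum(
estimate_call_cost(r["input_tokens"], r["output_tokens"], r["model"])["total_usd"]
for r in cached
),
}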
def get_dashboard(self) -> dict:
"""Full cost dashboard for monitoring."""
daily = self.daily_spend()
hourly_rate = self.hourly_burn_rate()
return {
"today_spend_usd": round(daily, 4),
"daily_budget_usd": self.daily_budget_usd,
"budget_used_pct": round(daily / self.daily_budget_usd * 100, 1),
"over_budget": daily > self.daily_budget_usd,
"hourly_rate_usd": round(hourly_rate, 4),
"projected_daily_usd": round(self.projected_daily_cost(), 2),
"by_model_24h": self.cost_by_model(24),
"by_task_24h": self.cost_by_task_type(24),
"cache_metrics_24h": self.cache_metrics(24),
}
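The tracker stores a user_id on every record, and the same grouping used for models and task types extends to any field. The sketch below is one way to list top spenders; `cost_by` is a hypothetical standalone helper, and the sample records are invented but follow the record shape used above.

```python
from collections import defaultdict


def cost_by(records: list, key: str, top_n: int = 3) -> list:
    """Sum cost_usd per value of `key`, highest spend first."""
    totals = defaultdict(float)
    for record in records:
        # Records with a missing or None value are grouped under "unknown".
        totals[str(record.get(key) or "unknown")] += record["cost_usd"]
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:top_n]


# Hypothetical records in the same shape CostTracker stores.
sample = [
    {"user_id": "alice", "task_type": "review", "cost_usd": 0.12},
    {"user_id": "bob", "task_type": "classify", "cost_usd": 0.01},
    {"user_id": "alice", "task_type": "review", "cost_usd": 0.10},
    {"user_id": None, "task_type": "classify", "cost_usd": 0.02},
]
print(cost_by(sample, "user_id"))
print(cost_by(sample, "task_type"))
```

Per-user totals are often where runaway automation shows up first, for example a CI bot that retries on every failure.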
Common Mistakes
:::danger Using Opus for Everything
Claude Opus is 18x more expensive per token than Haiku. Using it for simple classification, extraction, or formatting tasks is like using a supercomputer to run a calculator. Audit your actual request distribution - in most production systems, 60-70% of requests are simple enough for Haiku. Model routing pays for itself immediately.
:::
:::danger No max_tokens Limit or Default Too High
Many applications leave max_tokens at a high default (2048+). For a yes/no classification task, that exposes you to paying for up to 2048 output tokens when you need 5. You are billed only for tokens actually generated, but a single runaway response can cost hundreds of times more than intended. Always match max_tokens to your task type. Use the MAX_TOKENS_BY_TASK mapping above as a starting point.
:::
:::warning Caching Without TTL or Invalidation
LLM responses cached without expiry can serve stale content indefinitely. A cached product description stays served even after the product changes. A cached legal analysis may be outdated after new regulations. Always set TTL based on how quickly your underlying data changes. For static product data: 24 hours. For live financial data: minutes or no caching.
:::
:::warning Not Measuring Before Optimizing
Different applications have different cost drivers. RAG-heavy applications spend most budget on input tokens (large retrieved documents). Creative writing applications spend mostly on output tokens. Chatbots may spend on conversation history. Profile your actual token usage breakdown before choosing optimizations - the highest-impact optimization depends on where your costs actually are.
:::
:::tip Compound Optimizations
The optimizations in this lesson compound. Start with measurement, then apply in this order: (1) semantic caching - eliminates entire API calls; (2) model routing - reduces cost per call; (3) prompt caching - reduces cost of input tokens on cache hits; (4) output length control - reduces output tokens. Each subsequent optimization builds on the savings from the previous ones.
:::
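Compounding means each optimization acts on what is left after the previous one, so the percentages multiply rather than add. The sketch below illustrates this with the $40,000 baseline and 34% caching figure from the opening example; the other three fractions are hypothetical, and `apply_reductions` is a name introduced for this example.

```python
def apply_reductions(baseline_usd: float, reductions: list) -> float:
    """Apply fractional reductions in sequence; each acts on what remains."""
    remaining = baseline_usd
    for name, fraction in reductions:
        saved = remaining * fraction
        remaining -= saved
        print(f"{name}: -${saved:,.0f} -> ${remaining:,.0f}")
    return remaining


# Only the 34% caching figure comes from the example; the rest are illustrative.
final = apply_reductions(40_000, [
    ("response caching", 0.34),
    ("model routing", 0.30),
    ("prompt caching", 0.15),
    ("output limits", 0.05),
])
print(f"total reduction: {1 - final / 40_000:.0%}")
```

Adding the four fractions naively would suggest an 84% reduction; applied in sequence they yield about 63%, because later optimizations have a smaller bill to work on.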
Optimization Impact Comparison
| Optimization | Typical Impact | Implementation Effort | Best For |
|---|---|---|---|
| Semantic caching | 20-50% total cost reduction | Medium | High-repetition workloads |
| Model routing | 40-70% on eligible requests | Low | Mixed simple/complex tasks |
| Anthropic prompt caching | 80-90% on cached portion | Low | Large shared static context |
| Output length control | 10-50% output cost reduction | Very Low | Classification, extraction |
| Prompt compression | 10-20% input cost reduction | Low | Verbose system prompts |
| Batch API (50% off) | 50% total cost reduction | Medium | Overnight, non-urgent work |
| Conversation history compression | 10-30% input cost reduction | Medium | Long chat sessions |
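The Batch API row has no example elsewhere in this lesson, so here is a minimal sketch of preparing batch requests for non-urgent classification. It only builds the request payloads, each with a `custom_id` and a `params` object mirroring a normal Messages call, and does not submit them. `build_batch_requests` is a hypothetical helper, and the submission call mentioned in the final comment should be verified against the current SDK documentation.

```python
def build_batch_requests(items: list, categories: list,
                         model: str = "claude-haiku-4-5-20251001") -> list:
    """Build one batch entry per item; custom_id lets results be matched back."""
    category_list = ", ".join(categories)
    return [
        {
            "custom_id": f"item-{index}",
            "params": {
                "model": model,
                "max_tokens": 20,
                "messages": [{
                    "role": "user",
                    "content": f"Classify: {item[:200]}\nCategories: {category_list}",
                }],
            },
        }
        for index, item in enumerate(items)
    ]


requests = build_batch_requests(["Great product!", "Arrived broken."],
                                ["positive", "negative"])
print(len(requests), requests[0]["custom_id"])
# Submission is left out so the sketch runs offline. With the Anthropic SDK it
# would look roughly like: client.messages.batches.create(requests=requests)
```

Batch results arrive asynchronously and not necessarily in submission order, which is why each entry carries its own identifier.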
Interview Q&A
Q1: What are the most impactful LLM cost optimization techniques and how do you choose between them?
Start with measurement: know your actual cost distribution (input vs output token ratio), which models you are using for which tasks, and what your cache hit potential looks like. Then: (1) Semantic caching - if 20%+ of requests are similar or identical, caching has high ROI. Implementation is simple; impact is immediate. (2) Model routing - if you have a mix of simple tasks (classification, extraction) and complex tasks, routing simple tasks to Haiku while using Opus only for genuinely complex tasks typically saves 50-80% on the simple task subset. (3) Anthropic prompt caching - high impact if you have long static contexts (product catalogs, documentation) that appear in many requests. The 25% creation surcharge is recovered on the first cache hit, since each read saves 90% of the standard input price. (4) Output length control - high impact if output tokens are a large share of your cost and you have classification or extraction tasks. Match max_tokens to task requirements. Each optimization compounds - apply them in sequence to maximize total savings.
Q2: How does Anthropic's prompt caching work and what are the eligibility requirements?
Prompt caching processes static content once and caches the computed KV attention states for 5 minutes by default. Subsequent requests that include the same cached prefix reuse those states, skipping the re-computation. Pricing: cache creation costs 25% more than standard input (one-time write cost), but cache reads cost 90% less than standard input. Requirements: (1) content must be marked with cache_control: {type: "ephemeral"}; (2) content must appear at the beginning of the prompt (as a prefix, not in the middle or at the end); (3) the cached prefix must meet a model-dependent minimum length (1,024 tokens on many models, higher on some); (4) the default cache TTL is 5 minutes, refreshed on each cache hit (so active conversations maintain the cache). Best applications: a large FAQ document queried many times, a long system prompt shared across many users, a codebase file loaded for every code review question.
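The requirements and pricing above can be sketched in a few lines. The helper names `build_cached_request` and `relative_input_cost` are hypothetical, the model ID is a placeholder, and the cost function assumes every request after the first arrives within the TTL and hits the cache.

```python
def build_cached_request(static_context: str, question: str, model: str) -> dict:
    """Build keyword arguments for a Messages API call with a cached system prefix."""
    return {
        "model": model,
        "max_tokens": 512,
        "system": [
            {
                "type": "text",
                "text": static_context,
                # Marks the end of the static prefix that should be cached
                "cache_control": {"type": "ephemeral"},
            }
        ],
        # The per-request part comes after the cached prefix
        "messages": [{"role": "user", "content": question}],
    }


def relative_input_cost(num_requests: int, cached: bool) -> float:
    """Cost of sending the static prefix num_requests times.

    Units: one uncached send of the prefix = 1.0.
    Assumption: with caching, the first request pays the 1.25x write price
    and every later request pays the 0.10x read price.
    """
    if num_requests <= 0:
        return 0.0
    if not cached:
        return float(num_requests)
    return 1.25 + 0.10 * (num_requests - 1)


request = build_cached_request(
    static_context="(long FAQ document goes here)",
    question="What is the refund policy?",
    model="your-model-id",  # placeholder, not a real model ID
)

for n in (1, 2, 10):
    print(n, relative_input_cost(n, cached=False), relative_input_cost(n, cached=True))
```

The returned dictionary is meant to be unpacked into the SDK call, for example `client.messages.create(**request)` with the `anthropic` Python client. The cost loop shows that caching loses only when the prefix is used exactly once; from the second request on, it is cheaper.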
Q3: How do you implement model routing in production without sacrificing quality?
Two approaches, used in combination. First, rule-based routing: classify task type from the request (task_type parameter or heuristic classifier on request content) and map to model tiers. Classification/extraction → Haiku; standard generation → Sonnet; complex reasoning/creative → Opus. Simple, fast, deterministic. Second, quality validation: run a 100-item A/B test comparing your routing decision against always-using-Opus. Measure output quality (human evaluation or a quality LLM judge). If quality is equivalent, the routing is correct. If quality degrades for a task type, move it up a tier. Track routing decisions and periodically review the distribution - as product evolves, the right routing may change. Never trust "this task is simple" without validating against a quality baseline.
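A minimal sketch of the rule-based half, with a hook for the "move it up a tier" step. The task-type labels, the `Tier` enum, and the `route`/`promote` helpers are illustrative assumptions; sending unknown task types to the top tier is a design choice made here so that quality is never reduced silently.

```python
from enum import Enum
from typing import Dict, Optional


class Tier(str, Enum):
    HAIKU = "haiku"
    SONNET = "sonnet"
    OPUS = "opus"


# Hypothetical task-type labels mapped to model tiers
ROUTES: Dict[str, Tier] = {
    "classification": Tier.HAIKU,
    "extraction": Tier.HAIKU,
    "generation": Tier.SONNET,
    "complex_reasoning": Tier.OPUS,
    "creative": Tier.OPUS,
}

_ORDER = [Tier.HAIKU, Tier.SONNET, Tier.OPUS]


def route(task_type: str, overrides: Optional[Dict[str, Tier]] = None) -> Tier:
    """Pick a tier: validated overrides first, then the default table, else Opus."""
    if overrides and task_type in overrides:
        return overrides[task_type]
    return ROUTES.get(task_type, Tier.OPUS)


def promote(task_type: str, overrides: Dict[str, Tier]) -> Tier:
    """Move a task type up one tier after validation shows a quality drop."""
    current = route(task_type, overrides)
    next_index = min(_ORDER.index(current) + 1, len(_ORDER) - 1)
    overrides[task_type] = _ORDER[next_index]
    return overrides[task_type]


overrides: Dict[str, Tier] = {}
print(route("extraction", overrides))    # default routing
promote("extraction", overrides)         # validation found degraded quality
print(route("extraction", overrides))    # now one tier higher
```

Keeping overrides separate from the default table makes it easy to log which routes were changed by validation and to revisit them as the product evolves.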
Q4: A new product uses LLMs for every user action, and the bill is growing faster than users. What is your diagnosis and action plan?
Start with measurement: instrument every API call with model, input_tokens, output_tokens, task_type, and whether it was a cache hit. Build a cost dashboard showing spend by model and task type. Then diagnose: (1) Check model distribution - if most calls go to Opus, implement routing; (2) Check cache hit rate - if zero, add semantic caching; (3) Check average output tokens - if high for simple tasks, add max_tokens limits; (4) Check input token growth - if conversation history is growing, add compression. In the code review case study that opened this lesson, the monthly bill went from $40,000 to $14,800 by applying caching (a 34% reduction), then routing, compression, and output limits. This 63% reduction was achieved without any quality degradation - the savings came entirely from efficiency, not cutting corners.
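The first three diagnostic checks can be sketched directly over the instrumented call log. The record fields mirror the ones named above; the `diagnose` helper, the set of "simple" task labels, and both thresholds (more than half of calls on Opus, more than 200 average output tokens on simple tasks) are assumptions chosen for illustration. The fourth check needs per-conversation history and is left out.

```python
from typing import Dict, List

# Assumed labels for tasks that should produce short outputs
SIMPLE_TASKS = {"classification", "extraction"}


def diagnose(calls: List[Dict]) -> List[str]:
    """Return the optimizations suggested by a list of logged API calls."""
    findings: List[str] = []
    if not calls:
        return findings

    total = len(calls)

    # (1) Model distribution: are most calls going to the top tier?
    opus_share = sum(1 for c in calls if "opus" in c["model"]) / total
    if opus_share > 0.5:
        findings.append("routing")

    # (2) Cache hit rate: zero means no caching is in place
    hit_rate = sum(1 for c in calls if c["cache_hit"]) / total
    if hit_rate == 0:
        findings.append("semantic_caching")

    # (3) Average output tokens on simple tasks
    simple_outputs = [c["output_tokens"] for c in calls if c["task_type"] in SIMPLE_TASKS]
    if simple_outputs and sum(simple_outputs) / len(simple_outputs) > 200:
        findings.append("max_tokens")

    return findings


sample_calls = [
    {"model": "opus", "input_tokens": 1200, "output_tokens": 500,
     "task_type": "classification", "cache_hit": False},
    {"model": "opus", "input_tokens": 900, "output_tokens": 450,
     "task_type": "extraction", "cache_hit": False},
    {"model": "haiku", "input_tokens": 300, "output_tokens": 40,
     "task_type": "generation", "cache_hit": False},
]
print(diagnose(sample_calls))
```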
Q5: How do you build a cost monitoring system for LLM APIs?
Three components. First, instrumentation: log every API call with model, input_tokens, output_tokens (from response.usage), task_type, user_id, and timestamp. Never estimate costs from character counts - always use actual API-returned token counts. Second, dashboards: track daily spend vs budget, hourly burn rate (for anomaly detection), cost breakdown by model and task type, cache hit rate and savings, and projected daily cost based on current rate. Third, alerts: alert at 80% of daily budget (WARNING), 100% (CRITICAL), and on anomalous burn rate (if hourly rate spikes 3x vs the previous hour). Integrate alerts with your on-call system (PagerDuty, Slack). Review cost attribution weekly - costs should grow proportionally to valuable user activity, not faster.
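The alert rules in the third component are simple enough to sketch as a pure function. The name `check_alerts` and the returned labels are hypothetical; the 80%, 100%, and 3x thresholds come from the answer above. Delivery to PagerDuty or Slack is deliberately left out.

```python
from typing import List


def check_alerts(
    spend_today: float,
    daily_budget: float,
    current_hour_spend: float,
    previous_hour_spend: float,
) -> List[str]:
    """Return the alert labels triggered by the current spend figures."""
    if daily_budget <= 0:
        raise ValueError("daily_budget must be positive")

    alerts: List[str] = []

    # Budget thresholds: CRITICAL supersedes WARNING
    used = spend_today / daily_budget
    if used >= 1.0:
        alerts.append("CRITICAL")
    elif used >= 0.8:
        alerts.append("WARNING")

    # Burn-rate anomaly: this hour is at least 3x the previous hour.
    # Skipped when the previous hour had no spend, to avoid a meaningless ratio.
    if previous_hour_spend > 0 and current_hour_spend >= 3 * previous_hour_spend:
        alerts.append("ANOMALY")

    return alerts


print(check_alerts(85.0, 100.0, 5.0, 4.0))
print(check_alerts(40.0, 100.0, 12.0, 3.0))
```

Keeping the rules in a side-effect-free function makes them easy to unit test and lets the same logic feed both dashboards and the paging integration.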
