Cost Management and Budget Alerts
The $94,000 Surprise
The CFO's email arrived at 8:47 AM on a Thursday. Subject line: "LLM Invoice Explanation?" The body was three sentences and a PDF attachment. The PDF was the Anthropic invoice for the previous month: $94,283.17.
The engineering team's approved monthly budget for LLM infrastructure was $20,000 "to cover growth." The invoice was 4.7x the approved budget. The CFO wanted an explanation before the all-hands meeting at 10 AM.
The root cause analysis took three days. A developer had shipped an internal "summarization helper" as a quick proof of concept - a feature that summarized any content a user viewed, in real time, using Claude Opus, at a cost of about $480 per day. That is $14,400 per month from a single undocumented internal tool that nobody had sanctioned and no budget had been allocated for.
There was no per-feature spend tracking. There were no budget alerts configured. There were no hard limits on any dimension. There was no anomaly detection on spend acceleration. Nobody saw the cost building until the invoice arrived. The fix required adding cost tracking retroactively - after $94,000 had already been spent.
This is the problem that real-time cost management at the gateway layer exists to prevent. Not post-hoc analysis, not monthly billing surprises, but per-request cost visibility with enforcement that stops overspending before it becomes a CFO conversation.
Why LLM Costs Are Different From Every Other Infrastructure Cost
LLM costs behave differently from every other infrastructure cost category in ways that make standard monitoring insufficient.
Server costs are bounded by instance count: you cannot accidentally spend 5x your budget without provisioning 5x the servers, which requires a deliberate action. Storage costs scale with data volume: there are natural limits from your data ingestion pipeline. Network costs scale with traffic: extraordinary traffic is visible in your load balancer metrics before the invoice. LLM costs are unbounded by default - a single feature, a single batch job, or even a single misconfigured API call can generate thousands of dollars in a matter of hours with no natural ceiling, no obvious monitoring signal, and no automatic brake.
The cost-per-call is also invisible at the point of consumption. An engineer writing a new feature sees a function call - client.messages.create() - not a dollar sign. The economic consequence only surfaces in the monthly invoice, by which point the damage has been accumulating for weeks. Even sophisticated engineering teams have been surprised by $50,000+ invoices from features they thought were low-usage.
Cost management at the gateway layer solves this by making cost visible at the moment of consumption, enforceable at request time, and auditable at any dimension you care about.
The Cost Calculation Formula
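Every LLM API response includes exact token counts in the usage field of the response. Cost is deterministic from those counts and the provider's published pricing:

$$
\text{cost} = \frac{T_{\text{in}}}{10^6} \cdot P_{\text{in}} + \frac{T_{\text{out}}}{10^6} \cdot P_{\text{out}}
$$

Where $T_{\text{in}}$ and $T_{\text{out}}$ are the input and output token counts, and $P_{\text{in}}$ and $P_{\text{out}}$ are the prices per million tokens for the model in use. For models that support prompt caching (like Claude Sonnet), additional terms account for cache read and cache write tokens at their own rates: in the pricing table used here, cache reads are discounted and cache writes cost slightly more than regular input tokens.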
```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ModelPricing:
    """Pricing for one model. All prices in USD per 1M tokens."""
    input_per_million: float
    output_per_million: float
    cache_read_per_million: Optional[float] = None   # Anthropic prompt caching discount
    cache_write_per_million: Optional[float] = None  # Anthropic prompt caching write cost


# Approximate pricing as of March 2026.
# ALWAYS verify against current provider documentation before billing decisions.
# Store this as configurable YAML/JSON in production - not as hardcoded constants.
PRICING_TABLE: dict[str, ModelPricing] = {
    # Anthropic
    "claude-opus-4-6": ModelPricing(15.00, 75.00),
    "claude-sonnet-4-6": ModelPricing(3.00, 15.00,
                                      cache_read_per_million=0.30,
                                      cache_write_per_million=3.75),
    "claude-haiku-4-5-20251001": ModelPricing(0.80, 4.00),
    # OpenAI
    "gpt-4o": ModelPricing(2.50, 10.00),
    "gpt-4o-mini": ModelPricing(0.15, 0.60),
    "o1": ModelPricing(15.00, 60.00),
    # Google
    "gemini-1.5-pro": ModelPricing(3.50, 10.50),
    "gemini-1.5-flash": ModelPricing(0.075, 0.30),
    "gemini-2.0-flash": ModelPricing(0.10, 0.40),
}


def calculate_cost(
    model: str,
    input_tokens: int,
    output_tokens: int,
    cache_read_tokens: int = 0,
    cache_write_tokens: int = 0,
) -> float:
    """
    Calculate the exact dollar cost of one LLM request.

    For models with prompt caching (e.g., claude-sonnet-4-6), pass
    cache_read_tokens and cache_write_tokens for accurate cost calculation.

    Returns 0.0 and logs a warning for unknown models - unknown models
    are a cost blind spot and should trigger an alert.
    """
    pricing = PRICING_TABLE.get(model)
    if pricing is None:
        print(f"WARNING: No pricing entry for model '{model}'. Cost set to $0. "
              f"Add this model to PRICING_TABLE to enable cost tracking.")
        return 0.0

    cost = (
        input_tokens * pricing.input_per_million
        + output_tokens * pricing.output_per_million
    ) / 1_000_000

    if cache_read_tokens > 0 and pricing.cache_read_per_million is not None:
        cost += cache_read_tokens * pricing.cache_read_per_million / 1_000_000
    if cache_write_tokens > 0 and pricing.cache_write_per_million is not None:
        cost += cache_write_tokens * pricing.cache_write_per_million / 1_000_000

    return cost


# ─── Illustrative cost examples ───────────────────────────────────────────────

# Short FAQ query on the cheapest model
haiku_cost = calculate_cost("claude-haiku-4-5-20251001", 100, 50)
print(f"Haiku FAQ (100in+50out): ${haiku_cost:.6f} → ${haiku_cost * 1000:.4f} per 1k calls")

# Document analysis on the standard model
sonnet_cost = calculate_cost("claude-sonnet-4-6", 5000, 500)
print(f"Sonnet doc analysis (5k+500): ${sonnet_cost:.4f}")

# Complex reasoning on the premium model
opus_cost = calculate_cost("claude-opus-4-6", 10000, 2000)
print(f"Opus reasoning (10k+2k): ${opus_cost:.4f} → ${opus_cost * 1000:.2f} per 1k calls")
```
The CostTracker: Real-Time Per-Dimension Accumulation
The CostTracker accumulates cost atomically in Redis using INCRBYFLOAT - a single atomic operation that safely handles concurrent requests from multiple gateway processes. It tracks cost across four dimensions simultaneously: per-user, per-team, per-feature, and global. After each request, it checks budget thresholds and fires alerts when they are crossed.
```python
import anthropic
import json
import time
import redis
import httpx
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class BudgetConfig:
    """Budget limits and alert thresholds for one budget dimension."""
    hard_limit: Optional[float] = None  # USD - reject requests above this threshold
    soft_limit: Optional[float] = None  # USD - alert but allow requests through
    alert_at_pct: float = 0.80          # Alert when spend / hard_limit exceeds this fraction


@dataclass
class CostEvent:
    """A single cost event emitted after every LLM request."""
    timestamp: float
    request_id: str
    user_id: str
    team_id: str
    feature: str
    model: str
    provider: str
    input_tokens: int
    output_tokens: int
    cost_usd: float
    cache_hit: bool = False
    latency_ms: float = 0.0
```
```python
class CostTracker:
    """
    Real-time LLM cost tracker with multi-dimension accumulation.

    Architecture:
    - Redis INCRBYFLOAT for atomic accumulation (safe for concurrent gateway requests)
    - Per-period keys: cost:{dimension}:{value}:{YYYY-MM} (monthly) or daily/weekly
    - Time-series event log (last 10k events, for anomaly detection and debugging)
    - Slack webhook alerts on threshold crossing with cooldown to prevent spam
    - Pre-flight budget check before LLM calls (3 Redis GETs, ~1-3ms overhead)

    For complex analytics (top N users by spend, trend graphs, month-over-month),
    write events to PostgreSQL in addition to Redis accumulation.
    """

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        budgets: Optional[dict[str, BudgetConfig]] = None,
        slack_webhook_url: Optional[str] = None,
        period: str = "monthly",           # "daily", "weekly", or "monthly"
        alert_cooldown_s: float = 3600.0,  # Minimum seconds between same-alert firings
    ):
        self.redis = redis.from_url(redis_url)
        self.budgets = budgets or {}
        self.slack_webhook = slack_webhook_url
        self.period = period
        self.alert_cooldown_s = alert_cooldown_s
        # In-memory cache of sent alerts: {alert_key: sent_timestamp}
        # Prevents alert spam when thresholds are crossed repeatedly
        self._alert_cache: dict[str, float] = {}

    def _period_suffix(self) -> str:
        """Current billing window suffix for Redis key construction."""
        t = time.gmtime()
        if self.period == "daily":
            return f"{t.tm_year}-{t.tm_mon:02d}-{t.tm_mday:02d}"
        elif self.period == "weekly":
            week = int(time.strftime("%W", t))
            return f"{t.tm_year}-W{week:02d}"
        else:  # monthly
            return f"{t.tm_year}-{t.tm_mon:02d}"

    def _redis_key(self, dimension: str, value: str) -> str:
        return f"cost:{dimension}:{value}:{self._period_suffix()}"

    def _increment(self, dimension: str, value: str, amount: float) -> float:
        """Atomic increment - safe for concurrent gateway requests across processes."""
        key = self._redis_key(dimension, value)
        new_total = float(self.redis.incrbyfloat(key, amount))
        self.redis.expire(key, 90 * 86400)  # 90-day retention window
        return new_total

    def _get_current(self, dimension: str, value: str) -> float:
        """Get current period total for a dimension/value pair."""
        raw = self.redis.get(self._redis_key(dimension, value))
        return float(raw) if raw else 0.0

    def _maybe_alert(self, alert_key: str, message: str) -> None:
        """Send an alert if the cooldown period has elapsed since the last send."""
        now = time.time()
        last_sent = self._alert_cache.get(alert_key, 0.0)
        if now - last_sent < self.alert_cooldown_s:
            return  # Alert cooldown active - suppress this firing

        self._alert_cache[alert_key] = now
        if self.slack_webhook:
            try:
                httpx.post(
                    self.slack_webhook,
                    json={"text": message},
                    timeout=5.0,
                )
            except Exception as e:
                print(f"[CostTracker] Slack alert failed: {e}. Message: {message}")
        else:
            print(f"[CostTracker ALERT] {message}")

    def _check_budget(self, dimension: str, value: str, new_total: float) -> bool:
        """
        Check budget limits and send alerts after an increment.

        Returns True if the request should be BLOCKED (hard limit exceeded).
        This is called AFTER recording cost - for pre-flight blocking, use check_pre_flight().
        """
        budget_key = f"{dimension}:{value}"
        config = self.budgets.get(budget_key)
        if config is None:
            return False  # No budget configured for this dimension - allow everything

        # Hard limit check: block ALL subsequent requests
        if config.hard_limit and new_total >= config.hard_limit:
            alert_key = f"hard:{budget_key}:{self._period_suffix()}"
            self._maybe_alert(
                alert_key,
                f":rotating_light: *LLM Hard Budget Exceeded*\n"
                f"> `{dimension}:{value}` spent *${new_total:.2f}* "
                f"(limit: ${config.hard_limit:.2f})\n"
                f"> Requests are being BLOCKED until the period resets or the limit is raised."
            )
            return True  # Signal: block future requests

        # Percentage-based threshold alert (e.g., 80% of hard limit)
        if config.hard_limit and config.alert_at_pct:
            threshold_usd = config.hard_limit * config.alert_at_pct
            if new_total >= threshold_usd:
                pct = int(new_total / config.hard_limit * 100)
                # Key on the configured threshold, not the live percentage:
                # a key that changes at 81%, 82%, ... would bypass the cooldown.
                threshold_pct = int(config.alert_at_pct * 100)
                alert_key = f"pct{threshold_pct}:{budget_key}:{self._period_suffix()}"
                self._maybe_alert(
                    alert_key,
                    f":warning: *LLM Budget Alert ({pct}% of limit)*\n"
                    f"> `{dimension}:{value}` has spent *${new_total:.2f}* "
                    f"of ${config.hard_limit:.2f} budget"
                )

        # Soft limit alert: warn but continue serving
        if config.soft_limit and new_total >= config.soft_limit:
            # Bucket alerts by $10 increments to avoid per-dollar spam
            bucket = int(new_total / 10) * 10
            alert_key = f"soft:{budget_key}:{bucket}"
            self._maybe_alert(
                alert_key,
                f":warning: *LLM Soft Limit Reached*\n"
                f"> `{dimension}:{value}` has reached soft limit: "
                f"${new_total:.2f} of ${config.soft_limit:.2f}"
            )

        return False  # Not at hard limit - allow request

    def record(self, event: CostEvent) -> dict[str, bool]:
        """
        Record a cost event and check all budgets.

        Returns {dimension: is_blocked} for each tracked dimension.
        True means the NEXT request on this dimension would be blocked.
        """
        block_status: dict[str, bool] = {"user": False, "team": False, "feature": False}

        # Cache hits incur zero LLM cost: skip accumulation, but still append the
        # event to the log below so hit rate and cost avoidance can be reported.
        if not event.cache_hit:
            for dim, val in [
                ("user", event.user_id),
                ("team", event.team_id),
                ("feature", event.feature),
                ("global", "all"),
            ]:
                new_total = self._increment(dim, val, event.cost_usd)
                if dim != "global":
                    block_status[dim] = self._check_budget(dim, val, new_total)

        # Append to time-series event log (capped at 10k events)
        event_json = json.dumps({
            "ts": event.timestamp,
            "req_id": event.request_id,
            "user_id": event.user_id,
            "team_id": event.team_id,
            "feature": event.feature,
            "model": event.model,
            "cost_usd": round(event.cost_usd, 8),
            "input_tokens": event.input_tokens,
            "output_tokens": event.output_tokens,
            "latency_ms": round(event.latency_ms, 1),
            "cache_hit": event.cache_hit,
        })
        self.redis.lpush("cost:events", event_json)
        self.redis.ltrim("cost:events", 0, 9_999)

        return block_status

    def check_pre_flight(
        self, user_id: str, team_id: str, feature: str
    ) -> tuple[bool, str]:
        """
        Pre-flight budget check BEFORE making the LLM call.

        Returns (is_allowed, reason_if_blocked).
        This prevents spending LLM tokens on requests that would be blocked anyway.
        Overhead: ~3 Redis GETs (~1-3ms). Negligible vs LLM latency (500-5000ms).
        """
        for dim, val in [("user", user_id), ("team", team_id), ("feature", feature)]:
            current = self._get_current(dim, val)
            budget_key = f"{dim}:{val}"
            cfg = self.budgets.get(budget_key)
            if cfg and cfg.hard_limit and current >= cfg.hard_limit:
                return False, (
                    f"LLM budget exceeded for {dim} '{val}': "
                    f"${current:.4f} of ${cfg.hard_limit:.2f} spent this period."
                )
        return True, ""

    def spend_summary(self) -> dict:
        """Aggregate spend for the current period across all tracked dimensions."""
        period = self._period_suffix()
        pattern = f"cost:*:*:{period}"
        summary: dict[str, dict[str, float]] = {}
        # SCAN iterates incrementally; KEYS would block Redis on a large keyspace
        for raw_key in self.redis.scan_iter(match=pattern):
            key_str = raw_key.decode() if isinstance(raw_key, bytes) else raw_key
            parts = key_str.split(":")
            if len(parts) < 4:
                continue
            dim, val = parts[1], parts[2]
            amount = float(self.redis.get(raw_key) or 0)
            summary.setdefault(dim, {})[val] = round(amount, 4)
        return summary

    def recent_events(self, n: int = 50) -> list[dict]:
        """Return the N most recent cost events from the event log."""
        raw = self.redis.lrange("cost:events", 0, n - 1)
        return [json.loads(e) for e in raw]

    def detect_anomaly(
        self,
        feature: str,
        window_days: int = 7,
        spike_multiplier: float = 2.0,
    ) -> Optional[dict]:
        """
        Compare today's feature spend to the rolling N-day average.

        Returns an anomaly report if today's rate exceeds multiplier * baseline.
        Best used for batch reporting (hourly cron) rather than per-request checks.
        """
        events = self.recent_events(n=5000)

        # Group spend by day for the target feature
        daily_totals: dict[str, float] = {}
        for evt in events:
            if evt.get("feature") != feature:
                continue
            day = time.strftime("%Y-%m-%d", time.gmtime(evt["ts"]))
            daily_totals[day] = daily_totals.get(day, 0.0) + evt.get("cost_usd", 0.0)

        if len(daily_totals) < 3:
            return None  # Not enough data for a meaningful baseline

        today = time.strftime("%Y-%m-%d", time.gmtime())
        recent_days = sorted(daily_totals.keys())
        baseline_days = [d for d in recent_days if d != today][-window_days:]
        if not baseline_days:
            return None

        baseline_avg = sum(daily_totals[d] for d in baseline_days) / len(baseline_days)
        today_spend = daily_totals.get(today, 0.0)

        if baseline_avg > 0 and today_spend > baseline_avg * spike_multiplier:
            return {
                "feature": feature,
                "today_spend": round(today_spend, 4),
                "baseline_avg_daily": round(baseline_avg, 4),
                "spike_ratio": round(today_spend / baseline_avg, 2),
                "alert": (
                    f"Feature '{feature}' spend today (${today_spend:.2f}) is "
                    f"{today_spend / baseline_avg:.1f}x the {window_days}-day "
                    f"average (${baseline_avg:.2f}/day)"
                ),
            }
        return None
```
Cost-Aware Anthropic Client
The CostAwareAnthropicClient wraps the tracker with pre-flight checks and post-request recording integrated directly into the call path. Applications use this client instead of the raw Anthropic SDK.
```python
class CostAwareAnthropicClient:
    """
    Anthropic client with integrated cost tracking and budget enforcement.

    Pre-flight: checks all budget dimensions before the LLM call
    Post-flight: records the actual cost event with full metadata
    Hard limit: raises ValueError before the LLM call (no tokens consumed)
    Soft limit: allows the call but warns that the next request may be blocked
    """

    def __init__(self, tracker: CostTracker):
        self.tracker = tracker
        self.client = anthropic.Anthropic()

    def complete(
        self,
        messages: list[dict],
        user_id: str,
        team_id: str,
        feature: str,
        model: str = "claude-sonnet-4-6",
        max_tokens: int = 1024,
        system: Optional[str] = None,
    ) -> dict:
        """
        Complete a request with cost tracking and budget enforcement.

        Raises ValueError if the pre-flight budget check fails - this is a
        deliberate design: budget blocks are not LLM errors, they are policy
        decisions. The caller should handle ValueError separately from
        anthropic.APIError to show appropriate user-facing messages.
        """
        # Step 1: Pre-flight budget check - block before consuming any tokens
        allowed, reason = self.tracker.check_pre_flight(user_id, team_id, feature)
        if not allowed:
            raise ValueError(f"Request blocked by cost policy: {reason}")

        # Step 2: Execute the LLM call
        start = time.time()
        kwargs: dict = {
            "model": model,
            "max_tokens": max_tokens,
            "messages": messages,
        }
        if system:
            kwargs["system"] = system

        response = self.client.messages.create(**kwargs)
        latency_ms = (time.time() - start) * 1000

        input_tokens = response.usage.input_tokens
        output_tokens = response.usage.output_tokens
        # Cache token counts are absent or None when prompt caching is not in use
        cache_read = getattr(response.usage, "cache_read_input_tokens", 0) or 0
        cache_write = getattr(response.usage, "cache_creation_input_tokens", 0) or 0
        cost = calculate_cost(model, input_tokens, output_tokens, cache_read, cache_write)

        # Step 3: Record cost event with full attribution metadata
        event = CostEvent(
            timestamp=time.time(),
            request_id=str(uuid.uuid4())[:8],
            user_id=user_id,
            team_id=team_id,
            feature=feature,
            model=model,
            provider="anthropic",
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost_usd=cost,
            latency_ms=latency_ms,
        )
        block_status = self.tracker.record(event)

        # Step 4: Surface post-request budget status (warning only - not blocking)
        for dim, blocked in block_status.items():
            if blocked:
                print(f"[CostTracker] WARNING: {dim} budget now exceeded - "
                      f"next request from this {dim} will be blocked")

        return {
            "response": response.content[0].text,
            "model": model,
            "cost_usd": round(cost, 8),
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "latency_ms": round(latency_ms, 1),
            "request_id": event.request_id,
        }
```
```python
def demo_cost_tracking() -> None:
    """Full demo: cost tracking with budget alerts and anomaly detection."""
    tracker = CostTracker(
        redis_url="redis://localhost:6379",
        period="monthly",
        alert_cooldown_s=3600.0,
        budgets={
            # Per-user monthly limits
            "user:user_8821": BudgetConfig(
                hard_limit=10.00,
                soft_limit=8.00,
                alert_at_pct=0.80,
            ),
            # Per-team monthly limits
            "team:platform-team": BudgetConfig(
                hard_limit=500.00,
                soft_limit=400.00,
                alert_at_pct=0.80,
            ),
            # Per-feature monthly limits
            "feature:documentation-assistant": BudgetConfig(
                hard_limit=200.00,
                soft_limit=160.00,
                alert_at_pct=0.80,
            ),
        },
        slack_webhook_url="https://hooks.slack.com/services/T.../B.../...",
    )

    client = CostAwareAnthropicClient(tracker)

    queries = [
        "What is the difference between TCP and UDP?",
        "Explain the Raft consensus algorithm in simple terms.",
        "What is a distributed hash table and how does it work?",
    ]

    print("=== Running queries with cost tracking ===\n")
    for query in queries:
        try:
            result = client.complete(
                messages=[{"role": "user", "content": query}],
                user_id="user_8821",
                team_id="platform-team",
                feature="documentation-assistant",
                model="claude-sonnet-4-6",
                max_tokens=200,
            )
            print(f"Q: {query[:60]}")
            print(f" Cost: ${result['cost_usd']:.6f} | "
                  f"Tokens: {result['input_tokens']}+{result['output_tokens']} | "
                  f"Latency: {result['latency_ms']:.0f}ms | "
                  f"Req: {result['request_id']}")
            print()
        except ValueError as e:
            print(f"BLOCKED: {e}\n")

    print("=== Spend Summary (current period) ===")
    summary = tracker.spend_summary()
    for dim, values in sorted(summary.items()):
        for val, amount in sorted(values.items(), key=lambda x: -x[1]):
            print(f" [{dim}] {val}: ${amount:.4f}")

    print("\n=== Recent Events (last 5) ===")
    for evt in tracker.recent_events(n=5):
        print(f" {evt['feature']} | {evt['model']} | "
              f"${evt['cost_usd']:.6f} | req={evt['req_id']}")

    print("\n=== Anomaly Detection ===")
    anomaly = tracker.detect_anomaly("documentation-assistant", window_days=7)
    if anomaly:
        print(f" ANOMALY DETECTED: {anomaly['alert']}")
        print(f" Today: ${anomaly['today_spend']:.2f} | "
              f"Baseline: ${anomaly['baseline_avg_daily']:.2f}/day | "
              f"Ratio: {anomaly['spike_ratio']:.1f}x")
    else:
        print(" No anomaly detected (need 3+ days of data for baseline)")


if __name__ == "__main__":
    demo_cost_tracking()
```
Monthly Spend Forecasting
Knowing current spend is necessary but not sufficient. Finance and engineering leadership need forward-looking projections: given current burn rate, will we stay within budget? This is especially important in the second half of the billing period, when there is still time to make corrections.
```python
from typing import Optional


def forecast_monthly_spend(
    daily_costs: list[float],  # last N days of daily totals, chronological
    budget_limit: Optional[float] = None,
) -> dict:
    """
    Project end-of-month total spend based on recent daily trend.

    Two scenarios:
    - Flat projection: assumes daily average stays constant (best case)
    - Trend projection: extrapolates the recent 3-day trend (catches acceleration)

    Always present both - if the trend projection significantly exceeds the flat
    projection, spending is accelerating and requires investigation.
    """
    if len(daily_costs) < 2:
        return {"error": "Need at least 2 days of data to forecast"}

    avg_daily = sum(daily_costs) / len(daily_costs)
    days_in_month = 30  # Simplification: every month is treated as 30 days
    days_elapsed = len(daily_costs)
    days_remaining = max(0, days_in_month - days_elapsed)
    spent_so_far = sum(daily_costs)

    # Average of the most recent days (up to 3); divide by the actual count
    # so a 2-day history is not understated by dividing by 3.
    recent = daily_costs[-3:]
    recent_avg = sum(recent) / len(recent)

    # Flat projection: current average rate
    flat_projection = spent_so_far + avg_daily * days_remaining

    # Trend projection: recent 3-day acceleration
    if len(daily_costs) >= 3:
        trend_projection = spent_so_far + recent_avg * days_remaining
    else:
        trend_projection = flat_projection

    result = {
        "spent_so_far": round(spent_so_far, 2),
        "days_elapsed": days_elapsed,
        "days_remaining": days_remaining,
        "avg_daily_usd": round(avg_daily, 2),
        "recent_3day_avg_usd": round(recent_avg, 2),
        "flat_projection_usd": round(flat_projection, 2),
        "trend_projection_usd": round(trend_projection, 2),
        "acceleration": trend_projection > flat_projection * 1.1,  # 10% faster than flat
    }

    if budget_limit:
        result["budget_limit"] = budget_limit
        result["flat_pct_of_budget"] = round(flat_projection / budget_limit * 100, 1)
        result["trend_pct_of_budget"] = round(trend_projection / budget_limit * 100, 1)
        result["on_track"] = trend_projection <= budget_limit
        # Clamp at zero so an already-exceeded budget does not report negative days
        result["days_until_budget_exhausted"] = (
            round(max(0.0, budget_limit - spent_so_far) / recent_avg, 1)
            if recent_avg > 0 else None
        )

    return result


# Example: 10 days into the month, spending is accelerating
daily_costs_example = [
    45.23, 48.12, 52.44, 51.90, 60.11,  # Days 1-5: relatively stable
    62.33, 71.20, 68.90, 75.44, 89.23,  # Days 6-10: clearly accelerating
]

forecast = forecast_monthly_spend(daily_costs_example, budget_limit=1500.00)
print(f"Spent so far (10 days): ${forecast['spent_so_far']:.2f}")
print(f"Daily average: ${forecast['avg_daily_usd']:.2f}")
print(f"Recent 3-day average: ${forecast['recent_3day_avg_usd']:.2f}")
print(f"Flat projection: ${forecast['flat_projection_usd']:.2f} "
      f"({forecast.get('flat_pct_of_budget', 'N/A')}% of budget)")
print(f"Trend projection: ${forecast['trend_projection_usd']:.2f} "
      f"({forecast.get('trend_pct_of_budget', 'N/A')}% of budget)")
print(f"On track: {forecast.get('on_track', 'N/A')}")
print(f"Spending accelerating: {forecast['acceleration']}")
if forecast.get('days_until_budget_exhausted'):
    print(f"Budget exhausted in: {forecast['days_until_budget_exhausted']} days "
          f"(at current 3-day rate)")
```
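The forecast treats every month as 30 days, which skews the projection slightly in 28-day and 31-day months. A minimal sketch of deriving the real values from the calendar, assuming the billing period is the calendar month; `month_progress` is a hypothetical helper, not part of the function above.

```python
import calendar
import datetime


def month_progress(today: datetime.date) -> tuple[int, int]:
    """Return (days_in_month, days_remaining_after_today) for today's calendar month."""
    # monthrange returns (weekday_of_first_day, number_of_days_in_month)
    days_in_month = calendar.monthrange(today.year, today.month)[1]
    return days_in_month, days_in_month - today.day


days_in_month, days_remaining = month_progress(datetime.date(2024, 2, 10))
print(days_in_month, days_remaining)
```

These two values could replace the hardcoded `days_in_month` and the derived `days_remaining` if the caller passes in the current date.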
Cost Attribution Architecture
Key Cost Metrics Dashboard
These metrics should be visible in your cost management dashboard at all times:
| Metric | Granularity | Alert Condition |
|---|---|---|
| Current period spend by feature | Hourly | Exceeds 80% of feature budget |
| Current period spend by team | Hourly | Exceeds 80% of team budget |
| Daily spend vs 7-day rolling average | Daily | Today > 2x baseline (anomaly) |
| Cost per request by model | Hourly | Unexpected spike in P95 cost |
| Token count distribution by feature | Hourly | P99 > 10x median (outlier requests) |
| Cache hit rate and cost savings | Hourly | Hit rate below 20% for FAQ features |
| Unknown models in spend log | Real-time | Any unknown model = alert immediately |
| Forecast vs budget | Daily | Trend projection > 90% of budget |
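The outlier condition in the token-count row can be checked with the standard library alone. A minimal sketch, assuming the per-request token counts for one feature are already available as a list; `token_outlier_check` and its `ratio_limit` parameter are illustrative names, not part of the tracker.

```python
import statistics


def token_outlier_check(token_counts: list[int], ratio_limit: float = 10.0) -> dict:
    """Flag a feature whose P99 token count exceeds ratio_limit x its median."""
    if len(token_counts) < 2:
        return {"median": None, "p99": None, "outlier": False}
    median = statistics.median(token_counts)
    # quantiles(n=100) returns 99 cut points; the last one is the 99th percentile
    p99 = statistics.quantiles(token_counts, n=100, method="inclusive")[98]
    outlier = bool(median > 0 and p99 > ratio_limit * median)
    return {"median": median, "p99": p99, "outlier": outlier}


# Illustrative data: 95 ordinary requests plus 5 very large ones
print(token_outlier_check([100] * 95 + [50_000] * 5))
```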
Production Engineering Notes
:::tip Store cost events in PostgreSQL for complex analytics
Redis is ideal for real-time accumulation and pre-flight checks - O(1) atomic increments, sub-millisecond latency. But for complex queries (top N features by spend trend, cost per request by user cohort, month-over-month comparisons, per-team P95 cost), PostgreSQL is essential. Write every CostEvent to a cost_events table with indexed columns for user_id, team_id, feature, model, and timestamp. Run aggregations in SQL, not in application code iterating over Redis keys.
:::
:::warning Budget pre-flight checks add 1-3ms per request
The pre-flight check (3 Redis GETs) adds roughly 1–3ms per request depending on Redis latency. For user-facing features with 500–5000ms total LLM latency, this is negligible (well under 1% overhead). For high-throughput batch pipelines doing thousands of requests per second with Redis on a network boundary, consider caching the budget state locally with a short TTL (5 seconds) to reduce Redis load.
:::
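The local caching idea could look roughly like the sketch below. `LocalBudgetCache` and its `fetch_blocked` callback are hypothetical: the callback stands in for whatever function performs the Redis GET and compares the result to the hard limit. The clock is injectable only so the behavior can be exercised without sleeping.

```python
import time
from typing import Callable


class LocalBudgetCache:
    """Caches 'is this budget key blocked?' answers in process memory for a short TTL."""

    def __init__(
        self,
        fetch_blocked: Callable[[str], bool],   # assumed: does the real Redis lookup
        ttl_s: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch_blocked
        self._ttl_s = ttl_s
        self._clock = clock
        self._entries: dict[str, tuple[float, bool]] = {}  # key -> (expires_at, blocked)

    def is_blocked(self, budget_key: str) -> bool:
        now = self._clock()
        entry = self._entries.get(budget_key)
        if entry is not None and entry[0] > now:
            return entry[1]                      # still fresh: no Redis round-trip
        blocked = self._fetch(budget_key)        # expired or missing: refresh
        self._entries[budget_key] = (now + self._ttl_s, blocked)
        return blocked
```

The trade-off is that a budget can be overshot by up to one TTL window of traffic per gateway process, so the TTL should stay short relative to how fast spend can accumulate.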
:::danger Pricing tables go stale - make them configurable, not hardcoded
LLM providers change pricing with little notice. If your pricing table is hardcoded in application code, updating it requires a deployment. Store pricing in a YAML or JSON configuration file loaded at startup, or in a database table queryable at runtime. Add a monitoring rule: if a model appears in usage logs that has no entry in the pricing table, fire an alert immediately - you have a cost blind spot. Track "unknown model" events as a critical alert, not a warning.
:::
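One way to move pricing out of code is sketched below, using JSON and the standard library only. The file layout (`{model: {price fields}}`), `LoadedPricing`, `load_pricing`, and `find_unpriced_models` are all assumptions made for illustration; the field names mirror the pricing dataclass used earlier.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class LoadedPricing:
    input_per_million: float
    output_per_million: float
    cache_read_per_million: Optional[float] = None
    cache_write_per_million: Optional[float] = None


def load_pricing(json_text: str) -> dict[str, LoadedPricing]:
    """Parse a {model: {price fields}} JSON document into pricing entries."""
    table: dict[str, LoadedPricing] = {}
    for model, fields in json.loads(json_text).items():
        entry = LoadedPricing(**fields)  # an unexpected field name raises TypeError
        if entry.input_per_million < 0 or entry.output_per_million < 0:
            raise ValueError(f"Negative price for model '{model}'")
        table[model] = entry
    return table


def find_unpriced_models(seen_models: set[str], table: dict[str, LoadedPricing]) -> set[str]:
    """Models present in usage logs but missing from the table (cost blind spots)."""
    return seen_models - set(table)
```

Running `find_unpriced_models` against the set of model names seen in recent cost events gives the input for the "unknown model" alert.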
:::info Never expose raw budget error messages to end users
When a hard budget limit is exceeded and a request is blocked, the user-facing error should be generic and professional: "Service temporarily unavailable due to usage limits. Please try again later or contact support." Do not expose the specific limit amount, the dimension that was exceeded, the current spend amount, or any internal identifiers. This information is useful for internal debugging (log it server-side) but reveals system implementation details to users.
:::
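A minimal sketch of that separation, assuming a dedicated exception type instead of a bare `ValueError`. `BudgetExceededError`, `to_user_response`, and the 429 status code are illustrative choices, not something the gateway code above defines.

```python
import logging

logger = logging.getLogger("cost_policy")

GENERIC_LIMIT_MESSAGE = (
    "Service temporarily unavailable due to usage limits. "
    "Please try again later or contact support."
)


class BudgetExceededError(Exception):
    """Hypothetical policy exception; its details are meant for server logs only."""

    def __init__(self, dimension: str, value: str, spent: float, limit: float):
        super().__init__(f"{dimension}:{value} spent {spent:.4f} of {limit:.2f}")
        self.dimension = dimension
        self.value = value
        self.spent = spent
        self.limit = limit


def to_user_response(err: BudgetExceededError) -> dict:
    """Log the full detail server-side; return only a generic message to the caller."""
    logger.warning("Budget block: %s", err)
    return {"status": 429, "error": GENERIC_LIMIT_MESSAGE}
```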
Common Mistakes
Mistake 1: Setting per-user limits without accounting for multi-session usage. A user working across a mobile app, web app, and API simultaneously generates requests from three contexts. All three share the same budget bucket (keyed on user_id), which is correct - the limit should apply to the user's total spend regardless of how many sessions they have. But the per-session UI should reflect the correct remaining budget, not the full limit. Fetch the current spend from Redis and subtract from the limit to show the remaining allowance.
Mistake 2: Not configuring per-feature limits before a feature goes to production. Setting limits after launch means the "undocumented internal tool" scenario can still happen for the weeks between launch and limit configuration. Make per-feature budget limits a required step in your feature review process - no production launch without a budget config entry.
Mistake 3: Alerting on every threshold crossing without cooldown. If a threshold is crossed at $500, and 1,000 requests arrive in the next 10 minutes each pushing the total up, you will send 1,000 Slack alerts. The on-call engineer will mute the alert and ignore it. The cooldown (e.g., 1 alert per hour per threshold per dimension) prevents alert fatigue while ensuring the first crossing always fires.
Mistake 4: Treating $0 cost events from cache hits as anomalies. If you track all requests in the cost event log (including cache hits with cost_usd=0.0), your anomaly detection must exclude zero-cost events from the spend rate calculation. A spike in cache hit rate will drive your daily cost down, not up - this is a positive signal, not an anomaly. Separate the count of cache hits from the count of actual LLM calls in your dashboard.
Interview Q&A
Q: How do you calculate the exact cost of an LLM request in real time?
Every LLM API response includes exact token counts in the usage field. The cost formula is: (input_tokens / 1_000_000) * input_price_per_million + (output_tokens / 1_000_000) * output_price_per_million. Provider pricing tables list prices per million tokens per model. The gateway records both token counts from the response, applies the formula at request time, and emits a cost event with the calculated amount. For models with prompt caching (like Claude Sonnet), there is a third term for cache read tokens at the discounted cache read price. The total cost is stored in Redis using INCRBYFLOAT - an atomic increment that prevents race conditions when multiple concurrent requests update the same counter simultaneously.
Q: What is the difference between a hard budget limit and a soft budget limit?
A hard limit is an enforcement mechanism: when accumulated spend reaches the threshold, the gateway rejects subsequent requests with a budget exceeded error. No more LLM calls are made until the period resets or the limit is raised. A soft limit is an alerting mechanism: when spend reaches the soft threshold, an alert fires (Slack, PagerDuty, email), but requests continue to be served. The recommended pattern is: soft limit at 80% of hard limit. This gives engineers 20% of budget headroom to investigate after the warning fires before the hard enforcement stops the service. A typical configuration: $500/month hard limit, $400/month soft limit, percentage alert at 80% ($400).
Q: How do you prevent budget alert spam when a threshold is crossed repeatedly?
Track which alerts have been sent using an in-memory dictionary keyed by alert_type:dimension:value:period. On the first threshold crossing, send the alert and record the alert key with the current timestamp. On subsequent crossings within the cooldown period (e.g., 1 hour), check whether the cooldown has elapsed since the last alert. If not, skip. This fires at most one alert per cooldown period per threshold, regardless of how many requests cross the threshold. Reset the alert cache at the start of each billing period so that the first crossing of the new period always fires.
Q: How would you implement cost anomaly detection without predefined static budget thresholds?
Use a rolling baseline. Store daily spend totals in Redis (one key per day per feature). Compute the 7-day moving average as the baseline. At the end of each day (or every few hours projecting the day's rate), compare the current day's spend to the baseline. If today's spend exceeds baseline_average * spike_multiplier (e.g., 2x), fire an anomaly alert. This approach adapts to each feature's normal spending pattern automatically: a low-spend feature and one running near $1,000/day are each judged against their own baseline rather than a shared fixed threshold. The anomaly detector doesn't require you to predict the correct spend level in advance - it learns from recent history.
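The "every few hours projecting the day's rate" variant can be sketched as a simple linear extrapolation. This assumes spend is spread roughly evenly across the day, which is often untrue for features with strong daily traffic patterns; `projected_spike` is a hypothetical helper.

```python
from typing import Optional


def projected_spike(
    today_spend_so_far: float,
    seconds_elapsed_today: float,
    baseline_daily_avg: float,
    spike_multiplier: float = 2.0,
) -> Optional[float]:
    """Return the projected/baseline ratio if it exceeds the multiplier, else None."""
    if seconds_elapsed_today <= 0 or baseline_daily_avg <= 0:
        return None  # nothing to project from, or no usable baseline yet
    # Scale the partial-day spend up to a full 24 hours
    projected_full_day = today_spend_so_far * 86_400 / seconds_elapsed_today
    ratio = projected_full_day / baseline_daily_avg
    return round(ratio, 2) if ratio > spike_multiplier else None
```

For example, $30 spent in the first six hours projects to $120 for the day; against a $50/day baseline that is a 2.4x ratio and would be reported.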
Q: A new feature is deployed and LLM costs spike 10x within the first two hours. How does your system detect and respond?
With anomaly detection in place: the per-feature spend rate for the new feature exceeds the rolling baseline by more than 2x within the first hour, triggering a Slack alert. With a feature-level hard limit configured: the feature stops being served once it hits the limit, containing the total damage. The on-call engineer receives the anomaly alert, queries recent cost events filtered by feature name, sees the volume and per-call cost, and traces it to a specific endpoint or configuration. Without feature-level limits (the failure case): the anomaly alert fires but requests continue until the monthly budget is exhausted. The lesson: always configure per-feature hard limits before production launch, not after the invoice arrives.
Q: How do you handle cost tracking when some requests are served from semantic cache?
Record all requests in the cost event log, including cache hits - but with cost_usd = 0.0 and a cache_hit = True flag. This enables two important calculations. First, budget tracking counts only the real LLM costs, not hypothetical costs - cache hits do not consume budget. Second, cost savings reporting computes the theoretical cost that would have been incurred without caching: the number of cache hits multiplied by the average cost-per-miss. This "cost avoidance" metric is what justifies the investment in semantic caching infrastructure. Finance and leadership understand "we avoided $11,000 in LLM costs last month through caching" much better than "our cache hit rate was 38%."
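The two calculations can be illustrated with a small sketch over logged events. `CostEvent` here is a hypothetical, pared-down record containing only the two fields the calculation needs, and `summarize_cache_savings` is an illustrative name.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


@dataclass(frozen=True)
class CostEvent:
    cost_usd: float   # 0.0 for cache hits
    cache_hit: bool


def summarize_cache_savings(events: Iterable[CostEvent]) -> Dict[str, float]:
    """Compute real spend and the estimated cost avoided through caching."""
    all_events = list(events)
    misses = [e for e in all_events if not e.cache_hit]
    hits = len(all_events) - len(misses)

    # Budget tracking: only requests that reached the LLM cost money.
    actual_spend = sum(e.cost_usd for e in misses)

    # Cost avoidance: cache hits priced at the average cost of a miss.
    avg_cost_per_miss = actual_spend / len(misses) if misses else 0.0
    return {
        "actual_spend_usd": actual_spend,
        "cache_hit_rate": hits / len(all_events) if all_events else 0.0,
        "cost_avoided_usd": hits * avg_cost_per_miss,
    }


events = [
    CostEvent(cost_usd=0.02, cache_hit=False),
    CostEvent(cost_usd=0.04, cache_hit=False),
    CostEvent(cost_usd=0.03, cache_hit=False),
    CostEvent(cost_usd=0.0, cache_hit=True),
    CostEvent(cost_usd=0.0, cache_hit=True),
]
print(summarize_cache_savings(events))
```

Pricing hits at the average miss cost is an estimate: if cached requests are systematically shorter or longer than uncached ones, the avoided-cost figure will be biased accordingly.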
Q: How would you build a cost dashboard that shows spend by team and feature simultaneously?
Use a two-level Redis key structure: cost:team:{team_id}:{period} and cost:feature:{feature}:{period}. After each request, increment both keys atomically (in a Redis pipeline to minimize round-trips). For the dashboard query, scan all keys matching cost:team:*:{current_period} and cost:feature:*:{current_period} to get current totals. For cross-dimensional analysis (team X's spend on feature Y), you need PostgreSQL: the cost event table with (team_id, feature, cost_usd, timestamp) supports GROUP BY queries for any combination. The Redis layer handles real-time enforcement; the PostgreSQL layer handles retrospective analysis.
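The cross-dimensional query can be sketched as follows. To keep the example self-contained it uses Python's built-in `sqlite3` as a stand-in for PostgreSQL, with a reduced version of the cost event table; the team and feature names are made up for illustration.

```python
import sqlite3

# In-memory SQLite database standing in for the PostgreSQL cost event store.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE cost_events (
        team_id   TEXT NOT NULL,
        feature   TEXT NOT NULL,
        cost_usd  REAL NOT NULL,
        timestamp TEXT NOT NULL
    )
    """
)
conn.executemany(
    "INSERT INTO cost_events VALUES (?, ?, ?, ?)",
    [
        ("search", "summarizer", 0.50, "2025-01-10T09:00:00Z"),
        ("search", "summarizer", 0.25, "2025-01-10T09:05:00Z"),
        ("search", "autocomplete", 0.10, "2025-01-10T09:06:00Z"),
        ("support", "summarizer", 0.40, "2025-01-10T09:07:00Z"),
    ],
)


def spend_by_team_and_feature(connection: sqlite3.Connection) -> list:
    """Total spend and request count for every (team, feature) pair."""
    cursor = connection.execute(
        """
        SELECT team_id, feature,
               ROUND(SUM(cost_usd), 4) AS total_usd,
               COUNT(*) AS requests
        FROM cost_events
        GROUP BY team_id, feature
        ORDER BY total_usd DESC
        """
    )
    return cursor.fetchall()


for row in spend_by_team_and_feature(conn):
    print(row)
```

Per-dimension Redis counters cannot answer this question because each counter has already summed away the other dimension; the event table keeps both columns on every row.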
Per-Request Cost Visibility: What to Include in API Responses
Every LLM response from your gateway should include cost metadata in the response body. This makes cost visible to calling teams without requiring them to query a separate analytics endpoint, and creates a feedback loop where engineers can see the cost impact of their implementation choices during development.
# Recommended response envelope from the gateway
example_response = {
    # LLM response content
    "response": "The answer to your question...",
    "model": "claude-sonnet-4-6",
    # Token attribution
    "input_tokens": 1247,
    "output_tokens": 312,
    # Cost attribution (always include - makes cost visible at the API layer)
    "cost_usd": 0.000843,           # This specific request
    "period_spend_usd": 14.27,      # Caller's total spend this period (from tracker)
    "period_budget_usd": 200.00,    # Caller's budget for the period
    "budget_remaining_usd": 185.73,
    # Request tracing
    "request_id": "a7f3b2c1",       # For correlating with cost event logs
    "latency_ms": 847,
    "cache_hit": False,
    # Rate limit status (also set as HTTP headers)
    "rate_limit_remaining_tokens": 142_500,
    "rate_limit_reset_seconds": 12.4,
}
Include X-Cost-USD, X-Budget-Remaining-USD, and X-RateLimit-Remaining-Tokens as HTTP response headers as well. This allows API consumers to display budget status in their UIs without a separate API call, and allows CLI tools and scripts to surface cost information inline.
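Deriving those headers from the response envelope could look like the sketch below. `cost_headers` is a hypothetical helper, and the decimal precision chosen for each header (six places for cost, two for remaining budget) is an assumption rather than anything the gateway prescribes.

```python
from typing import Any, Dict, Mapping


def cost_headers(envelope: Mapping[str, Any]) -> Dict[str, str]:
    """Build HTTP response headers from the gateway's response envelope."""
    return {
        # Per-request cost is tiny, so keep six decimal places.
        "X-Cost-USD": f"{envelope['cost_usd']:.6f}",
        "X-Budget-Remaining-USD": f"{envelope['budget_remaining_usd']:.2f}",
        "X-RateLimit-Remaining-Tokens": str(
            envelope["rate_limit_remaining_tokens"]
        ),
    }


headers = cost_headers(
    {
        "cost_usd": 0.000843,
        "budget_remaining_usd": 185.73,
        "rate_limit_remaining_tokens": 142_500,
    }
)
for name, value in headers.items():
    print(f"{name}: {value}")
```

Header values are always strings, so formatting them explicitly avoids scientific notation for very small per-request costs.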
Model Cost Comparison: Choosing the Right Model for Each Use Case
A significant lever on LLM cost is model selection. Different models have dramatically different price points for similar quality on specific task types.
| Use Case | Recommended Model | Cost per 1k calls (USD, est.) | Notes |
|---|---|---|---|
| Simple FAQ, classification | claude-haiku-4-5-20251001 | $0.20 | 100 in + 50 out per call |
| Document summarization (1–5k tokens) | claude-sonnet-4-6 | $9.00 | 5k in + 500 out |
| Complex reasoning, analysis | claude-sonnet-4-6 | $18.00 | 10k in + 1k out |
| Highest quality, legal/medical | claude-opus-4-6 | $90 | Use sparingly |
| Code generation (short) | claude-haiku-4-5-20251001 | $0.80 | Often quality-sufficient |
| Batch classification at scale | claude-haiku-4-5-20251001 | $0.16 | Cache system prompt |
The most impactful cost optimization is routing to the cheapest model that meets quality requirements - not paying for premium model capacity on tasks where the budget model is indistinguishable.
def model_cost_comparison(
    input_tokens: int,
    output_tokens: int,
    daily_calls: int,
) -> None:
    """Print a cost comparison across models for a given request profile."""
    models_to_compare = [
        "claude-haiku-4-5-20251001",
        "claude-sonnet-4-6",
        "claude-opus-4-6",
        "gpt-4o-mini",
        "gpt-4o",
    ]
    print(f"Cost comparison: {input_tokens} input + {output_tokens} output tokens")
    print(f"Daily calls: {daily_calls:,}")
    print(f"{'Model':<30} {'Per Call':>10} {'Daily':>12} {'Monthly':>12}")
    print("-" * 70)
    for model in models_to_compare:
        cost_per_call = calculate_cost(model, input_tokens, output_tokens)
        if cost_per_call == 0.0:
            # Skip models with no pricing entry
            continue
        daily_cost = cost_per_call * daily_calls
        monthly_cost = daily_cost * 30  # approximate a month as 30 days
        print(f"{model:<30} ${cost_per_call:>8.5f} ${daily_cost:>10.2f} ${monthly_cost:>10.2f}")


# Example: what it costs to run 10,000 document analyses per day
model_cost_comparison(
    input_tokens=5_000,
    output_tokens=500,
    daily_calls=10_000,
)
PostgreSQL Schema for Cost Event Storage
For complex analytics beyond what Redis key scanning supports, write every cost event to PostgreSQL. This enables SQL-based aggregations across any dimension combination.
-- Cost events table: one row per LLM request
CREATE TABLE cost_events (
    id            BIGSERIAL PRIMARY KEY,
    timestamp     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    request_id    TEXT NOT NULL,
    user_id       TEXT NOT NULL,
    team_id       TEXT NOT NULL,
    feature       TEXT NOT NULL,
    model         TEXT NOT NULL,
    provider      TEXT NOT NULL,
    input_tokens  INT NOT NULL,
    output_tokens INT NOT NULL,
    cost_usd      NUMERIC(12, 8) NOT NULL,
    latency_ms    NUMERIC(10, 2),
    cache_hit     BOOLEAN NOT NULL DEFAULT FALSE
);
-- Indexes for common query patterns
CREATE INDEX idx_cost_events_timestamp ON cost_events (timestamp DESC);
CREATE INDEX idx_cost_events_user_id ON cost_events (user_id, timestamp DESC);
CREATE INDEX idx_cost_events_team_id ON cost_events (team_id, timestamp DESC);
CREATE INDEX idx_cost_events_feature ON cost_events (feature, timestamp DESC);
CREATE INDEX idx_cost_events_model ON cost_events (model, timestamp DESC);
-- Example analytics queries:
-- Top 10 features by spend this month
-- SELECT feature, SUM(cost_usd) AS total_spend, COUNT(*) AS requests,
-- AVG(cost_usd) AS avg_per_request
-- FROM cost_events
-- WHERE timestamp >= DATE_TRUNC('month', NOW())
-- AND NOT cache_hit
-- GROUP BY feature
-- ORDER BY total_spend DESC
-- LIMIT 10;
-- Daily spend trend for one feature (last 30 days)
-- SELECT DATE(timestamp) AS day, SUM(cost_usd) AS daily_spend, COUNT(*) AS requests
-- FROM cost_events
-- WHERE feature = 'documentation-assistant'
-- AND timestamp >= NOW() - INTERVAL '30 days'
-- AND NOT cache_hit
-- GROUP BY DATE(timestamp)
-- ORDER BY day;
-- Cost per model, comparing input vs output token ratios
-- SELECT model, SUM(cost_usd) AS total, SUM(input_tokens) AS total_input,
-- SUM(output_tokens) AS total_output,
-- ROUND(AVG(cost_usd)::numeric, 6) AS avg_cost_per_req
-- FROM cost_events
-- WHERE timestamp >= DATE_TRUNC('month', NOW())
-- GROUP BY model
-- ORDER BY total DESC;
