Token Cost Monitoring
The $40,000 Surprise
The week started normally. The LLM application team had shipped a new feature the previous Friday: a "deep analysis" mode that allowed users to ask the AI assistant to perform comprehensive analysis of uploaded documents. The feature worked beautifully in staging. Users loved it in the preview.
On Thursday, the engineering manager received an email from the CFO. "Can you explain why our OpenAI API spend this week is 12,000."
The engineering team pulled the billing logs. The deep analysis feature was sending the full contents of large uploaded documents to GPT-4 as part of the prompt - sometimes 50,000 tokens per request. A subset of power users had discovered the feature and were running it dozens of times per day. One user had triggered 847 deep analysis calls in four days, generating $2,200 in API costs alone.
The feature was correct - it did exactly what it was designed to do. The cost model had never been designed at all. Nobody had calculated the cost per request. Nobody had set per-user rate limits. Nobody had set a monthly budget alert. Nobody had built a dashboard to show real-time cost trends. The first signal that something was wrong was a CFO email four days after the problem started.
By Thursday, $38,000 had been spent. A cost anomaly detection system would have caught this pattern within 4 hours of the first day.
LLM Cost Is Different from Traditional Serving Cost
In traditional ML serving, cost is roughly fixed per request: each inference call uses a predictable amount of GPU time regardless of input content. A classification model that classifies 100 images costs 100x the cost of classifying 1 image. Simple math.
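LLM API cost scales with token count: cost = (input tokens × input price per token) + (output tokens × output price per token). Input and output tokens are usually priced at different rates.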
The problem: token count is highly variable and user-controlled:
- A user who uploads a 100-page PDF as context generates 50,000 input tokens
- A user who asks a simple question generates 200 input tokens
- That is a 250x difference in input tokens, and therefore in input cost, for the same feature, depending on user behavior
This variability means you cannot reason about cost from request counts alone. You need token-level visibility.
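To make the spread concrete, here is a small sketch that prices the two requests above. It assumes the GPT-4 Turbo rates used in this chapter's pricing table ($0.01 per 1K input tokens, $0.03 per 1K output tokens) and a hypothetical 500-token response for both requests; `request_cost` is an illustrative helper, not a library function.

```python
# Illustrative prices in USD per token (assumed; check your provider's current rates).
INPUT_PRICE = 0.01 / 1000
OUTPUT_PRICE = 0.03 / 1000


def request_cost(input_tokens: int, output_tokens: int) -> float:
    """Cost of one call: input and output tokens are billed at different rates."""
    return input_tokens * INPUT_PRICE + output_tokens * OUTPUT_PRICE


# Same feature, same (assumed) 500-token answer, very different inputs.
pdf_cost = request_cost(input_tokens=50_000, output_tokens=500)
question_cost = request_cost(input_tokens=200, output_tokens=500)

print(f"100-page PDF:    ${pdf_cost:.4f}")
print(f"Simple question: ${question_cost:.4f}")
print(f"Ratio:           {pdf_cost / question_cost:.0f}x")
```

Under these assumptions the total cost gap is about 30x rather than 250x, because both requests pay for the same response. The input side alone still differs by 250x, which is why request counts say little about spend.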
What to Log for Cost Monitoring
Every LLM API call must be logged with enough detail to compute cost, attribute it to a feature and user, and detect anomalies.
```python
import hashlib
import time
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import List, Optional

import openai

# Token pricing table in USD per token (update when providers change pricing)
TOKEN_PRICING = {
    "gpt-4-turbo": {
        "input": 0.01 / 1000,   # $0.01 per 1K input tokens
        "output": 0.03 / 1000,  # $0.03 per 1K output tokens
    },
    "gpt-4o": {
        "input": 0.005 / 1000,
        "output": 0.015 / 1000,
    },
    "gpt-3.5-turbo": {
        "input": 0.0005 / 1000,
        "output": 0.0015 / 1000,
    },
    "claude-3-5-sonnet": {
        "input": 0.003 / 1000,
        "output": 0.015 / 1000,
    },
    "claude-3-haiku": {
        "input": 0.00025 / 1000,
        "output": 0.00125 / 1000,
    },
}


@dataclass
class LLMCallRecord:
    """
    Complete record of a single LLM API call.
    Stored for cost tracking, debugging, and quality monitoring.
    """
    call_id: str
    timestamp: str
    model: str
    feature_name: str  # "document_analysis", "support_chat", "code_review"
    user_id: Optional[str]
    session_id: Optional[str]

    # Token counts
    input_tokens: int
    output_tokens: int
    total_tokens: int

    # Computed cost
    input_cost_usd: float
    output_cost_usd: float
    total_cost_usd: float

    # Performance
    latency_ms: float
    tokens_per_second: float

    # Request metadata
    temperature: float
    max_tokens: int
    prompt_hash: str    # hash of system prompt for prompt versioning
    finish_reason: str  # "stop", "length", "content_filter"

    # Quality signals
    cache_hit: bool = False  # was this a cache hit (cost saved)?
    error: Optional[str] = None

    def to_dict(self) -> dict:
        return asdict(self)


class LLMCostTracker:
    """
    Wraps the OpenAI client to automatically log every call with cost attribution.
    Drop-in replacement for direct OpenAI client usage.
    """

    def __init__(
        self,
        openai_client,
        event_sink,  # database, message queue, or logging system exposing .log(dict)
        feature_name: str,
        user_id: Optional[str] = None,
    ):
        self.client = openai_client
        self.sink = event_sink
        self.feature_name = feature_name
        self.user_id = user_id

    def chat_complete(
        self,
        messages: List[dict],
        model: str = "gpt-4-turbo",
        temperature: float = 0.7,
        max_tokens: int = 1024,
        session_id: Optional[str] = None,
        **kwargs,
    ) -> openai.types.chat.ChatCompletion:
        """
        Wrapper around OpenAI chat completions that logs cost and performance.
        """
        # Short identifier for the call (not a security-sensitive use of MD5)
        call_id = hashlib.md5(
            f"{self.user_id}:{session_id}:{time.time()}".encode()
        ).hexdigest()[:12]

        # Compute prompt hash for tracking prompt versions
        system_prompt = next(
            (m["content"] for m in messages if m["role"] == "system"), ""
        )
        prompt_hash = hashlib.sha256(system_prompt.encode()).hexdigest()[:12]

        start_time = time.perf_counter()
        error = None
        response = None
        try:
            response = self.client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                **kwargs,
            )
        except Exception as e:
            error = str(e)
            raise
        finally:
            latency_ms = (time.perf_counter() - start_time) * 1000
            # Only successful calls are logged: a failed call has no usage data to price.
            if response is not None:
                usage = response.usage
                # Unknown models fall back to $0, so keep TOKEN_PRICING in sync
                # with every model you call or the cost will be under-reported.
                pricing = TOKEN_PRICING.get(model, {"input": 0, "output": 0})
                input_cost = usage.prompt_tokens * pricing["input"]
                output_cost = usage.completion_tokens * pricing["output"]
                total_cost = input_cost + output_cost
                tps = usage.completion_tokens / max(latency_ms / 1000, 0.001)

                record = LLMCallRecord(
                    call_id=call_id,
                    timestamp=datetime.now().isoformat(),
                    model=model,
                    feature_name=self.feature_name,
                    user_id=self.user_id,
                    session_id=session_id,
                    input_tokens=usage.prompt_tokens,
                    output_tokens=usage.completion_tokens,
                    total_tokens=usage.total_tokens,
                    input_cost_usd=input_cost,
                    output_cost_usd=output_cost,
                    total_cost_usd=total_cost,
                    latency_ms=latency_ms,
                    tokens_per_second=tps,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    prompt_hash=prompt_hash,
                    finish_reason=response.choices[0].finish_reason,
                    error=error,
                )
                self.sink.log(record.to_dict())

        return response


# Example: instrumented feature function
def analyze_document_with_tracking(
    document_text: str,
    user_id: str,
    session_id: str,
    openai_client,
) -> str:
    """
    Document analysis with full cost tracking.
    """
    tracker = LLMCostTracker(
        openai_client=openai_client,
        event_sink=get_event_sink(),  # placeholder: returns your logging backend
        feature_name="document_analysis",
        user_id=user_id,
    )

    messages = [
        {"role": "system", "content": "You are a document analysis assistant. Provide a comprehensive analysis."},
        {"role": "user", "content": f"Analyze this document:\n\n{document_text}"},
    ]

    response = tracker.chat_complete(
        messages=messages,
        model="gpt-4-turbo",
        temperature=0.3,
        max_tokens=2048,
        session_id=session_id,
    )
    return response.choices[0].message.content
```
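`LLMCostTracker` only requires that the sink expose a `log(record)` method taking a dict. As a minimal sketch for local development, assuming a JSON Lines file is an acceptable backend, a sink could look like the following. `JsonlEventSink` and the file path are hypothetical names for illustration; a production sink would more likely publish to a queue or a warehouse.

```python
import json
import os
import tempfile
import threading


class JsonlEventSink:
    """Hypothetical sink: appends one JSON object per line to a local file."""

    def __init__(self, path: str):
        self.path = path
        self._lock = threading.Lock()  # serialize writes from multiple threads

    def log(self, record: dict) -> None:
        line = json.dumps(record, default=str)  # default=str covers non-JSON types
        with self._lock, open(self.path, "a", encoding="utf-8") as f:
            f.write(line + "\n")


# Usage with a made-up record shaped like LLMCallRecord.to_dict()
demo_path = os.path.join(tempfile.mkdtemp(), "llm_calls.jsonl")
sink = JsonlEventSink(demo_path)
sink.log({"call_id": "abc123", "feature_name": "document_analysis", "total_cost_usd": 0.515})

with open(demo_path, encoding="utf-8") as f:
    logged = [json.loads(line) for line in f]
```

Writing synchronously on the request path adds latency, so a real sink would probably buffer records or hand them to a background worker.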
Cost Dashboards and Anomaly Detection
Raw logs need to become actionable visibility. Build dashboards and automated alerts.
```python
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import List


class CostAnalyzer:
    """
    Analyzes LLM cost logs to produce dashboards and detect anomalies.
    In production: integrate with Grafana, Datadog, or a custom dashboard.
    """

    def __init__(self, cost_records: pd.DataFrame):
        """
        cost_records: DataFrame with columns matching LLMCallRecord fields.
        Expected index: timestamp (datetime)
        """
        self.df = cost_records.copy()  # avoid mutating the caller's DataFrame
        if "timestamp" in self.df.columns:
            self.df["timestamp"] = pd.to_datetime(self.df["timestamp"])
            self.df = self.df.set_index("timestamp").sort_index()

    def cost_by_feature(self, window_days: int = 7) -> pd.DataFrame:
        """
        Cost breakdown by feature over the past N days.
        Answers: which features are driving cost?
        """
        cutoff = datetime.now() - timedelta(days=window_days)
        recent = self.df[self.df.index >= cutoff]
        return recent.groupby("feature_name").agg(
            total_cost=("total_cost_usd", "sum"),
            n_calls=("call_id", "count"),
            mean_cost_per_call=("total_cost_usd", "mean"),
            p95_cost_per_call=("total_cost_usd", lambda x: x.quantile(0.95)),
            mean_input_tokens=("input_tokens", "mean"),
            mean_output_tokens=("output_tokens", "mean"),
        ).sort_values("total_cost", ascending=False)

    def cost_by_user(self, window_days: int = 7, top_n: int = 20) -> pd.DataFrame:
        """
        Cost breakdown by user. Identifies high-spend users that may need rate limiting.
        """
        cutoff = datetime.now() - timedelta(days=window_days)
        recent = self.df[self.df.index >= cutoff]
        return recent.groupby("user_id").agg(
            total_cost=("total_cost_usd", "sum"),
            n_calls=("call_id", "count"),
            mean_cost_per_call=("total_cost_usd", "mean"),
            max_single_call_cost=("total_cost_usd", "max"),
        ).sort_values("total_cost", ascending=False).head(top_n)

    def hourly_cost_trend(self, days: int = 7) -> pd.Series:
        """Hourly cost time series for trend visualization."""
        cutoff = datetime.now() - timedelta(days=days)
        # Lowercase "h": the uppercase "H" alias is deprecated in recent pandas
        return self.df[self.df.index >= cutoff].resample("h")["total_cost_usd"].sum()

    def cost_efficiency_metrics(self) -> dict:
        """
        Compute token efficiency metrics that reveal optimization opportunities.
        """
        # Last 7 days relative to the newest record (DataFrame.last() is deprecated)
        recent = self.df[self.df.index >= self.df.index.max() - pd.Timedelta(days=7)]

        # Output token ratio: high ratio means models are generating long responses
        output_ratio = recent["output_tokens"] / recent["input_tokens"].replace(0, np.nan)

        # Max tokens utilization: if finish_reason is always "length", max_tokens is too low
        # If it's always "stop", max_tokens may be wastefully high
        finish_reasons = recent["finish_reason"].value_counts(normalize=True)

        return {
            "mean_input_tokens": recent["input_tokens"].mean(),
            "mean_output_tokens": recent["output_tokens"].mean(),
            "p95_input_tokens": recent["input_tokens"].quantile(0.95),
            "p95_output_tokens": recent["output_tokens"].quantile(0.95),
            "mean_output_input_ratio": output_ratio.mean(),
            "finish_reason_distribution": finish_reasons.to_dict(),
            "truncation_rate": finish_reasons.get("length", 0),  # responses cut off by max_tokens
        }

    def detect_cost_anomalies(
        self,
        lookback_days: int = 14,
        sensitivity: float = 3.0,  # standard deviations
    ) -> List[dict]:
        """
        Detect unusual cost spikes using z-score on hourly cost.
        Returns list of anomalous time windows.
        """
        hourly = self.hourly_cost_trend(days=lookback_days)
        if len(hourly) < 24:
            return []  # not enough history for a meaningful baseline

        mean_cost = hourly.mean()
        std_cost = hourly.std()
        if std_cost == 0:
            return []

        z_scores = (hourly - mean_cost) / std_cost
        anomalies = z_scores[z_scores > sensitivity]

        results = []
        for timestamp, z_score in anomalies.items():
            # Find top users in this anomalous hour
            hour_data = self.df[
                (self.df.index >= timestamp) &
                (self.df.index < timestamp + timedelta(hours=1))
            ]
            top_users = hour_data.groupby("user_id")["total_cost_usd"].sum().nlargest(3)
            results.append({
                "timestamp": str(timestamp),
                "hourly_cost": float(hourly[timestamp]),
                "z_score": float(z_score),
                "normal_hourly_cost": float(mean_cost),
                "top_cost_users": top_users.to_dict(),
                "top_features": hour_data.groupby("feature_name")["total_cost_usd"].sum().to_dict(),
            })
        return results


def generate_cost_report(analyzer: CostAnalyzer, budget_monthly_usd: float) -> str:
    """Generate a human-readable cost report for weekly review."""
    feature_costs = analyzer.cost_by_feature(window_days=7)
    user_costs = analyzer.cost_by_user(window_days=7, top_n=5)
    efficiency = analyzer.cost_efficiency_metrics()
    anomalies = analyzer.detect_cost_anomalies()

    total_7d = feature_costs["total_cost"].sum()
    projected_monthly = total_7d * (30 / 7)
    budget_utilization = projected_monthly / budget_monthly_usd

    report = f"""=== LLM Cost Report - {datetime.now().strftime('%Y-%m-%d')} ===

SUMMARY
  Last 7 days:        ${total_7d:.2f}
  Projected monthly:  ${projected_monthly:.2f}
  Monthly budget:     ${budget_monthly_usd:.2f}
  Budget utilization: {budget_utilization:.0%} {'⚠️ OVER BUDGET' if budget_utilization > 1.0 else '✓'}

COST BY FEATURE (7 days)
"""
    # iterrows() upcasts mixed int/float rows to float, so cast counts back to int
    for feature, row in feature_costs.iterrows():
        report += f"  {feature:30s}: ${row['total_cost']:8.2f} ({int(row['n_calls']):,} calls, "
        report += f"${row['mean_cost_per_call']:.4f}/call)\n"

    report += """
TOP COST USERS (7 days)
"""
    for user_id, row in user_costs.iterrows():
        report += f"  {str(user_id)[:20]:20s}: ${row['total_cost']:8.2f} ({int(row['n_calls']):,} calls)\n"

    report += f"""
EFFICIENCY METRICS
  Mean input tokens:  {efficiency['mean_input_tokens']:.0f}
  Mean output tokens: {efficiency['mean_output_tokens']:.0f}
  P95 input tokens:   {efficiency['p95_input_tokens']:.0f}
  Truncation rate:    {efficiency['truncation_rate']:.1%}

ANOMALIES DETECTED (past 14 days): {len(anomalies)}
"""
    for anomaly in anomalies[:3]:
        report += f"  {anomaly['timestamp']}: ${anomaly['hourly_cost']:.2f}/hr "
        report += f"({anomaly['z_score']:.1f} std devs above normal)\n"

    return report
```
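One limitation of a global z-score is that the spike itself inflates the mean and standard deviation it is compared against. A possible variant, sketched here with only the standard library, scores each hour against a trailing window of earlier hours. The function name, the 24-hour window, and the synthetic data are assumptions for illustration, not part of the analyzer above.

```python
from statistics import mean, stdev
from typing import List, Tuple


def rolling_zscore_anomalies(
    hourly_costs: List[float],
    window: int = 24,
    sensitivity: float = 3.0,
) -> List[Tuple[int, float]]:
    """Return (hour_index, z_score) for hours far above their trailing baseline."""
    anomalies = []
    for i in range(window, len(hourly_costs)):
        baseline = hourly_costs[i - window:i]  # only hours before hour i
        mu, sigma = mean(baseline), stdev(baseline)
        if sigma == 0:
            continue  # flat baseline: z-score undefined, skip this hour
        z = (hourly_costs[i] - mu) / sigma
        if z > sensitivity:
            anomalies.append((i, z))
    return anomalies


# Synthetic series: 48 hours alternating $4 and $6, then one $60 hour.
costs = [4.0, 6.0] * 24 + [60.0]
spikes = rolling_zscore_anomalies(costs)
print(spikes)
```

Once a spike has happened it enters the baseline of the following hours, so a sustained increase stops looking anomalous after roughly one window. That is one reason to pair spike detection with trend and budget alerts.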
Cost Controls: Rate Limiting and Budget Enforcement
Detecting cost anomalies is not enough - you need controls that prevent runaway spending.
```python
from datetime import datetime, timedelta
from typing import Optional

import redis


class UserCostRateLimiter:
    """
    Per-user cost budget enforcement using Redis for distributed state.

    Limits:
    - Per-request token count (prevents one massive call)
    - Per-hour spending (rate limiting)
    - Per-day spending (daily budget)
    - Per-month spending (monthly cap)
    """

    def __init__(self, redis_client: redis.Redis, limits: Optional[dict] = None):
        self.redis = redis_client
        self.limits = limits or {
            "max_input_tokens_per_request": 32_000,
            "max_output_tokens_per_request": 4_096,
            "max_cost_per_hour_usd": 1.00,
            "max_cost_per_day_usd": 5.00,
            "max_cost_per_month_usd": 50.00,
        }

    def check_request_allowed(self, user_id: str, estimated_input_tokens: int) -> dict:
        """
        Check if a request is allowed before sending to the LLM.
        Returns: {"allowed": bool, "reason": str, "retry_after_seconds": int}
        """
        now = datetime.now()  # one timestamp so all windows agree

        # Check per-request token limit
        if estimated_input_tokens > self.limits["max_input_tokens_per_request"]:
            return {
                "allowed": False,
                "reason": f"Input too large: {estimated_input_tokens} tokens "
                          f"(limit: {self.limits['max_input_tokens_per_request']})",
                "retry_after_seconds": 0,
            }

        # Check hourly cost limit
        hourly_key = f"user_cost_hour:{user_id}:{now.strftime('%Y%m%d%H')}"
        hourly_spend = float(self.redis.get(hourly_key) or 0)
        if hourly_spend >= self.limits["max_cost_per_hour_usd"]:
            seconds_until_next_hour = 3600 - now.minute * 60 - now.second
            return {
                "allowed": False,
                "reason": f"Hourly budget exceeded: ${hourly_spend:.2f} "
                          f"(limit: ${self.limits['max_cost_per_hour_usd']})",
                "retry_after_seconds": seconds_until_next_hour,
            }

        # Check daily cost limit
        daily_key = f"user_cost_day:{user_id}:{now.strftime('%Y%m%d')}"
        daily_spend = float(self.redis.get(daily_key) or 0)
        if daily_spend >= self.limits["max_cost_per_day_usd"]:
            midnight = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
            return {
                "allowed": False,
                "reason": f"Daily budget exceeded: ${daily_spend:.2f} "
                          f"(limit: ${self.limits['max_cost_per_day_usd']})",
                "retry_after_seconds": int((midnight - now).total_seconds()),
            }

        # Check monthly cost limit
        monthly_key = f"user_cost_month:{user_id}:{now.strftime('%Y%m')}"
        monthly_spend = float(self.redis.get(monthly_key) or 0)
        if monthly_spend >= self.limits["max_cost_per_month_usd"]:
            # First instant of next month: jump past month end, then snap to day 1
            next_month = (now.replace(day=1) + timedelta(days=32)).replace(
                day=1, hour=0, minute=0, second=0, microsecond=0
            )
            return {
                "allowed": False,
                "reason": f"Monthly budget exceeded: ${monthly_spend:.2f} "
                          f"(limit: ${self.limits['max_cost_per_month_usd']})",
                "retry_after_seconds": int((next_month - now).total_seconds()),
            }

        return {"allowed": True, "reason": "ok", "retry_after_seconds": 0}

    def record_actual_cost(self, user_id: str, actual_cost_usd: float):
        """
        Record actual cost after a successful API call.
        Updates all budget windows atomically.
        """
        now = datetime.now()
        pipe = self.redis.pipeline()

        # Hourly window (expires after 2 hours)
        hourly_key = f"user_cost_hour:{user_id}:{now.strftime('%Y%m%d%H')}"
        pipe.incrbyfloat(hourly_key, actual_cost_usd)
        pipe.expire(hourly_key, 7200)

        # Daily window (expires after 2 days)
        daily_key = f"user_cost_day:{user_id}:{now.strftime('%Y%m%d')}"
        pipe.incrbyfloat(daily_key, actual_cost_usd)
        pipe.expire(daily_key, 172800)

        # Monthly window (expires after 35 days)
        monthly_key = f"user_cost_month:{user_id}:{now.strftime('%Y%m')}"
        pipe.incrbyfloat(monthly_key, actual_cost_usd)
        pipe.expire(monthly_key, 3024000)

        pipe.execute()


class GlobalBudgetGuard:
    """
    Team/org-level budget tracking.
    Raises alerts as spend approaches or exceeds the monthly budget; the caller
    decides whether to shut down or downgrade expensive features in response.
    """

    def __init__(self, redis_client: redis.Redis, monthly_budget_usd: float, alert_threshold: float = 0.80):
        self.redis = redis_client
        self.monthly_budget = monthly_budget_usd
        self.alert_threshold = alert_threshold

    def record_cost(self, cost_usd: float, feature: str) -> float:
        """Record global cost and trigger alerts if approaching budget."""
        month_key = f"global_cost:{datetime.now().strftime('%Y%m')}"
        new_total = float(self.redis.incrbyfloat(month_key, cost_usd))
        self.redis.expire(month_key, 3024000)

        # Note: this fires on every call past the threshold; deduplicate in production
        utilization = new_total / self.monthly_budget
        if utilization >= 1.0:
            self._trigger_alert("BUDGET_EXCEEDED", new_total, feature)
        elif utilization >= self.alert_threshold:
            self._trigger_alert("BUDGET_WARNING", new_total, feature)

        return new_total

    def _trigger_alert(self, alert_type: str, current_spend: float, feature: str):
        """Send alert via PagerDuty, Slack, email, etc."""
        message = (
            f"[{alert_type}] LLM API spend: ${current_spend:.2f} / "
            f"${self.monthly_budget:.2f} ({current_spend/self.monthly_budget:.0%}). "
            f"Last call: feature={feature}"
        )
        print(f"ALERT: {message}")
        # In production: send to PagerDuty, Slack webhook, etc.
```
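`check_request_allowed` needs a token estimate before the call is made. If a real tokenizer is not available at that point, a rough character-based heuristic can serve as a pre-check. The sketch below assumes about four characters per token for English text, which is only an approximation; `estimate_input_tokens` and the per-message overhead are hypothetical, and the provider's reported usage remains the source of truth for billing.

```python
import math
from typing import Dict, List

CHARS_PER_TOKEN = 4        # assumed average for English text; varies by language and model
PER_MESSAGE_OVERHEAD = 4   # assumed allowance for role/formatting tokens per message


def estimate_input_tokens(messages: List[Dict[str, str]]) -> int:
    """Rough estimate of prompt tokens, intended for pre-flight limit checks only."""
    total = 0
    for message in messages:
        total += math.ceil(len(message["content"]) / CHARS_PER_TOKEN)
        total += PER_MESSAGE_OVERHEAD
    return total


messages = [
    {"role": "system", "content": "You are a document analysis assistant."},
    {"role": "user", "content": "x" * 200_000},  # stand-in for a large uploaded document
]
estimated = estimate_input_tokens(messages)
too_large = estimated > 32_000  # the default per-request input limit used above
print(estimated, too_large)
```

Because the estimate can be off in either direction, it is safer to treat it as a coarse gate and still record the actual cost from the API response afterwards.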
Observability Tools: LangSmith and Langfuse
LangSmith: Official LangChain tracing and evaluation platform. Best if you are using LangChain. Provides full trace visibility (which node in the chain ran what), cost per trace, automatic token counting, and integration with LangChain's evaluation framework. Requires sending data to LangChain's cloud.
Langfuse (open-source): Self-hostable alternative to LangSmith. Supports any LLM framework (not just LangChain). Provides traces, cost tracking, quality scores, and A/B testing for prompts. Integrates with the OpenAI SDK via a drop-in replacement client. Best choice for teams with data residency requirements or who need full control.
```python
# Langfuse integration (minimal sketch, written against the v2 Python SDK;
# import paths differ in other SDK versions, so check the docs for yours)
#
# Credentials are read from the environment:
#   LANGFUSE_PUBLIC_KEY="pk-lf-..."
#   LANGFUSE_SECRET_KEY="sk-lf-..."
#   LANGFUSE_HOST="https://cloud.langfuse.com"  # or your self-hosted URL
from langfuse.decorators import langfuse_context, observe
from langfuse.openai import OpenAI  # drop-in OpenAI client that auto-logs calls

client = OpenAI()


@observe()  # Auto-traces this function with input/output/cost
def analyze_document(document: str, user_id: str) -> str:
    # Attach the user to the current trace so cost can be attributed per user
    langfuse_context.update_current_trace(user_id=user_id)

    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[
            {"role": "system", "content": "Analyze the provided document."},
            {"role": "user", "content": document},
        ],
    )
    return response.choices[0].message.content
```
Token Optimization Techniques
The best cost control is using fewer tokens without degrading quality.
```python
from typing import List, Optional

import numpy as np


class TokenOptimizer:
    """
    Techniques for reducing token consumption without hurting quality.
    """

    @staticmethod
    def truncate_input_to_budget(
        messages: List[dict],
        max_input_tokens: int,
        tokenizer,
        priority_order: Optional[List[str]] = None,
    ) -> List[dict]:
        """
        Truncate message history to fit within a token budget.
        Preserves system prompt and most recent messages by default.
        """
        # Note: priority_order is accepted but not yet used by the logic below
        priority_order = priority_order or ["system", "recent_user", "recent_assistant", "history"]

        # Separate message types
        system_msgs = [m for m in messages if m["role"] == "system"]
        non_system = [m for m in messages if m["role"] != "system"]

        # Always keep system prompt and last user message
        must_keep = system_msgs + (non_system[-1:] if non_system else [])
        optional = non_system[:-1]

        # Count tokens in must-keep messages
        must_keep_tokens = sum(len(tokenizer.encode(m["content"])) for m in must_keep)
        available_for_history = max_input_tokens - must_keep_tokens

        # Add history from most recent backward until budget is exhausted
        history_to_include = []
        for msg in reversed(optional):
            msg_tokens = len(tokenizer.encode(msg["content"]))
            if available_for_history - msg_tokens >= 0:
                history_to_include.insert(0, msg)
                available_for_history -= msg_tokens
            else:
                break

        return system_msgs + history_to_include + (non_system[-1:] if non_system else [])

    @staticmethod
    def compress_context(
        context_chunks: List[str],
        query: str,
        llm_client,
        max_context_tokens: int = 2000,
    ) -> str:
        """
        Compress multiple retrieved context chunks into a concise context.
        Reduces input tokens by summarizing redundant information.
        """
        combined = "\n\n".join(context_chunks)

        # Only compress if we are over budget
        estimated_tokens = len(combined.split()) * 1.3  # rough estimate
        if estimated_tokens <= max_context_tokens:
            return combined

        compress_prompt = f"""Compress the following context into the most relevant information for answering: "{query}"
Be concise. Preserve specific facts, numbers, and names. Remove repetition.

Context:
{combined}

Compressed context:"""

        response = llm_client.chat.completions.create(
            model="gpt-3.5-turbo",  # Use cheap model for compression
            messages=[{"role": "user", "content": compress_prompt}],
            temperature=0.0,
            max_tokens=max_context_tokens,
        )
        return response.choices[0].message.content

    @staticmethod
    def cache_common_responses(
        query_embedding: List[float],
        all_embeddings: List[List[float]],
        all_responses: List[str],
        similarity_threshold: float = 0.95,
    ) -> Optional[str]:
        """
        Semantic cache: find cached response for semantically similar query.
        Returns cached response if similarity exceeds threshold, else None.
        """
        if not all_embeddings:
            return None

        query_arr = np.array(query_embedding)
        cache_arr = np.array(all_embeddings)

        # Cosine similarities (assumes no zero-length embedding vectors)
        query_norm = query_arr / np.linalg.norm(query_arr)
        cache_norms = cache_arr / np.linalg.norm(cache_arr, axis=1, keepdims=True)
        similarities = cache_norms @ query_norm

        best_match_idx = int(np.argmax(similarities))
        best_similarity = similarities[best_match_idx]

        if best_similarity >= similarity_threshold:
            return all_responses[best_match_idx]
        return None
```
Production Engineering Notes
Real-time vs batch cost tracking: Log every API call in real-time (per-call records to a message queue or event stream). Aggregate into cost dashboards using batch queries (hourly or daily rollups in BigQuery/Redshift). Real-time records are for anomaly detection; aggregated dashboards are for budgeting and analysis.
Model downgrade paths: Build a model selection layer that can automatically route to cheaper models when budget thresholds approach. "If daily spend exceeds $X, route non-critical features to GPT-3.5-turbo instead of GPT-4." This requires that your prompts work acceptably on cheaper models - test this during development.
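A model selection layer of this kind can be very small. The sketch below is one possible shape, assuming a single daily spend threshold and a fixed set of critical features; `select_model`, `CRITICAL_FEATURES`, and the model choices are hypothetical and would come from your own configuration.

```python
# Hypothetical configuration: which model is primary and which is the fallback.
PRIMARY_MODEL = "gpt-4-turbo"
FALLBACK_MODEL = "gpt-3.5-turbo"
CRITICAL_FEATURES = {"document_analysis"}  # assumed to always need the primary model


def select_model(feature_name: str, daily_spend_usd: float, daily_threshold_usd: float) -> str:
    """Route non-critical features to the cheaper model once spend crosses the threshold."""
    over_threshold = daily_spend_usd >= daily_threshold_usd
    if over_threshold and feature_name not in CRITICAL_FEATURES:
        return FALLBACK_MODEL
    return PRIMARY_MODEL


# Under the threshold everything uses the primary model; over it, only critical features do.
print(select_model("support_chat", daily_spend_usd=120.0, daily_threshold_usd=300.0))
print(select_model("support_chat", daily_spend_usd=320.0, daily_threshold_usd=300.0))
print(select_model("document_analysis", daily_spend_usd=320.0, daily_threshold_usd=300.0))
```

Keeping the routing decision in one function makes it straightforward to log which model was chosen and why, so downgraded responses can be compared against quality metrics later.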
Cache hit rate: Track your semantic cache hit rate as a cost efficiency metric. A 30% cache hit rate means roughly 30% of API calls, and their cost, are avoided. A low hit rate means either that your queries are highly variable (normal for certain use cases) or that your cache similarity threshold is set too high to match near-duplicate queries.
Cost per user segment: Different user tiers may have different cost profiles. Free users should have strict limits; enterprise customers may have higher limits but also higher revenue. Your cost monitoring must support cost attribution by user tier to validate that the unit economics of each tier are sustainable.
Common Mistakes
:::danger No Per-User Spend Limits
Without per-user limits, a single user running automated workflows can consume your entire monthly budget in hours. Implement per-user hourly, daily, and monthly cost caps before launching any LLM feature to end users. This is non-negotiable. The cap levels should be derived from your unit economics: if a user pays $20/month, that user's monthly cost cap must stay below that figure.
:::
:::danger Logging Token Counts But Not Costs
Token counts are not costs. A call with 1,000 input and 1,000 output tokens costs roughly $0.04 on GPT-4 Turbo and roughly $0.002 on GPT-3.5 Turbo at the rates in the pricing table above - a 20x difference. When you have multiple models in your system, you must compute and log dollar costs per call (not just token counts) to aggregate meaningfully. Always multiply token counts by model-specific pricing in your logging layer.
:::
:::warning Setting max_tokens Too High
If you set max_tokens=4096 on every request but average outputs are 200 tokens, you are not wasting money (you only pay for tokens generated), but you are reserving GPU capacity for long responses that rarely materialize. More importantly, a max_tokens that is much larger than typical outputs signals a design smell: you have not thought about appropriate response length. Set max_tokens to 2x your expected output length, with a hard cap appropriate for the use case.
:::
:::warning Not Monitoring Cost Trends, Only Point-in-Time Values
"Current spend is $1,200 this month" is less useful than "spend is growing at 15% per week and will exceed budget in 12 days." Build trend monitoring: compute week-over-week growth rates by feature, detect sustained upward trends (not just anomalous spikes), and project when budget will be exhausted at the current growth rate. Alert on trends, not just thresholds.
:::
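Trend projection of this kind needs very little code. The following is a minimal sketch, assuming weekly spend totals are already available and that growth compounds smoothly from day to day; both function names and the sample figures are made up for illustration.

```python
from typing import List, Optional


def weekly_growth_rate(weekly_costs: List[float]) -> float:
    """Average week-over-week growth (geometric mean) across the series."""
    if len(weekly_costs) < 2 or weekly_costs[0] <= 0:
        raise ValueError("need at least two weeks with positive starting spend")
    periods = len(weekly_costs) - 1
    return (weekly_costs[-1] / weekly_costs[0]) ** (1 / periods) - 1


def days_until_budget_exhausted(
    spent_so_far: float,
    current_daily_spend: float,
    weekly_growth: float,
    budget: float,
) -> Optional[int]:
    """Simulate day by day with daily compounding; None if not reached within a year."""
    daily_growth = (1 + weekly_growth) ** (1 / 7)
    total, daily = spent_so_far, current_daily_spend
    for day in range(1, 366):
        total += daily
        if total >= budget:
            return day
        daily *= daily_growth
    return None


growth = weekly_growth_rate([1000.0, 1150.0, 1322.5])  # three weeks of spend
flat_days = days_until_budget_exhausted(1200.0, 100.0, 0.0, 2000.0)
growing_days = days_until_budget_exhausted(1200.0, 100.0, growth, 2000.0)
print(f"growth={growth:.1%}, flat={flat_days} days, growing={growing_days} days")
```

Comparing the flat and growing projections shows how much of the remaining runway depends on the trend continuing, which is the number worth alerting on.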
Interview Q&A
Q: How would you build a cost monitoring system for an LLM application with a $10,000/month API budget?
A: Four layers. First, per-call logging: instrument every LLM API call to log model, token counts, computed cost (tokens × model-specific price), user ID, feature name, latency, and finish reason. Store in a time-series database or data warehouse. Second, real-time anomaly detection: compute hourly cost totals and compare against a rolling baseline. Alert via PagerDuty when hourly cost is more than 3 standard deviations above the baseline, or when a single user generates more than 1% of daily budget in one hour. Third, per-user rate limits: enforce per-user hourly, daily ($5), and monthly cost caps. Fourth, global budget enforcement: against the $10K budget, set alert thresholds, including one at $9K (90%), and enforce a feature shutdown or model downgrade at $9.5K to prevent overage.
Q: What is semantic caching for LLMs and when is it effective?
A: Semantic caching stores LLM responses keyed by the semantic meaning of the query (using an embedding), not the exact text. When a new query arrives, you embed it and check cosine similarity against cached query embeddings. If a cached query is sufficiently similar (typically cosine similarity above 0.92–0.95), return the cached response instead of making a new API call. This is effective when: users ask semantically similar questions repeatedly (customer support, FAQ-style applications), the correct answer is stable (not time-sensitive), and you have enough query volume for cache hits to be frequent. It is ineffective when: queries are highly unique (creative writing, personalized analysis), responses need to reflect real-time data, or the application has very low query volume. Measure cache hit rate as a key metric - a well-tuned semantic cache can reduce API costs by 20–40% in customer support applications. Tuning matters in both directions: with the threshold too low the cache returns answers to questions that only look similar, and with the threshold too high it almost never hits.
Q: How do you attribute LLM costs to different features in a multi-feature application?
A: Feature attribution requires tagging every API call with the feature that triggered it. Implementation: create a context object at the API boundary that captures the feature name, propagate it through your call stack (using Python's contextvars for async code), and include it in every logged API call. The tagging must happen at the feature level, not just at the model level - if the same LLM model serves both the chatbot and the document analyzer, you need to distinguish which feature generated each call. In the cost analysis layer, group costs by feature name to produce per-feature cost breakdowns. This attribution enables: (1) identifying which features have unsustainable unit economics (cost per call exceeds revenue per call), (2) prioritizing optimization work (optimize the most expensive features first), (3) setting appropriate per-feature rate limits based on feature revenue. For multi-tenant products, also track cost by customer tier so you can validate that enterprise customers' API costs are below their contract value.
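The propagation step can be sketched with the standard library alone. This example assumes a single `ContextVar` holding the feature name and uses a list as a stand-in for the real event sink; `feature_scope`, `call_llm`, and the handler names are hypothetical.

```python
import asyncio
import contextvars
from contextlib import contextmanager

# Holds the feature responsible for the current request; "unknown" flags missing tags.
current_feature = contextvars.ContextVar("current_feature", default="unknown")


@contextmanager
def feature_scope(name: str):
    """Tag everything executed inside the block with the given feature name."""
    token = current_feature.set(name)
    try:
        yield
    finally:
        current_feature.reset(token)


call_log = []  # stand-in for the real event sink


async def call_llm(prompt: str) -> None:
    """Deep in the call stack: reads the feature from context, not from arguments."""
    await asyncio.sleep(0)  # simulate I/O so the two tasks interleave
    call_log.append({"feature_name": current_feature.get(), "prompt": prompt})


async def handle_chat() -> None:
    with feature_scope("support_chat"):
        await call_llm("hello")


async def handle_analysis() -> None:
    with feature_scope("document_analysis"):
        await call_llm("analyze this")


async def main() -> None:
    # gather() runs each coroutine as its own task with its own copy of the context
    await asyncio.gather(handle_chat(), handle_analysis())


asyncio.run(main())
print(call_log)
```

Because each task gets its own context copy, concurrent requests do not overwrite each other's feature tag, which is the failure mode a plain module-level variable would have.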
Q: Describe the token efficiency metrics you would track for an LLM system.
A: Key metrics: mean and P95 input tokens per request (high P95 indicates a small fraction of requests consuming disproportionate cost - investigate those users/queries), mean and P95 output tokens per request (very high output tokens may indicate prompts that encourage verbose responses), truncation rate (fraction of calls where finish_reason is "length" - these calls hit max_tokens and produced incomplete outputs, which is a quality issue), output-to-input token ratio (a ratio above 1 is unusual and may indicate the model is being prompted to produce very long outputs unnecessarily), and cache hit rate (fraction of requests served from semantic cache - measures cost avoidance). Optimization based on these metrics: if P95 input tokens are 10x the mean, implement input token limits or truncation. If truncation rate exceeds 5%, increase max_tokens or investigate why outputs are so long. If output tokens are consistently high, tighten the prompt instruction to be more concise.
