

AI Error Handling and Fallbacks

When the AI Breaks and Nobody Knows What to Do

A mid-sized fintech company deployed an AI document analysis tool for loan officers. It processed mortgage applications - extracting income figures, flagging discrepancies, and generating plain-English summaries that let an officer review a 200-page application package in 15 minutes instead of 2 hours. The tool was fast, accurate, and beloved. Loan officers were processing 40% more applications per day than before. Leadership called it the most successful internal product in company history. They expanded it to three additional regional offices. They tied quarterly bonus targets to adoption rates.

Then, one Tuesday afternoon, the Anthropic API had a partial service degradation. Certain API endpoints returned 529 (overloaded) errors intermittently - not on every request, but on roughly 1 in 5. The AI tool surfaced this to users as a raw JavaScript error: TypeError: Cannot read properties of undefined (reading 'text'). No guidance. No context. No recovery path.

The loan officers had no idea whether their documents had been processed. Were they in a retry loop? Had the analysis been submitted? Should they process it manually and risk creating a duplicate entry? Three loan officers submitted the same application twice - creating duplicate records that triggered automated fraud flags and took the compliance team four days to reconcile. Two others waited 40 minutes staring at the same cryptic error before calling IT. One opened a brand-new identical ticket in the loan management system, which triggered a separate application for the same property with a different loan officer assigned.

The root cause of every one of these downstream problems was not the Anthropic service degradation. It was the absence of error handling. The team had designed the "AI works" experience with exquisite care and completely failed to design the "AI is temporarily broken" experience. Two hours of error-handling work would have prevented that Tuesday afternoon. The work was never done, and it cost the team two weeks of trust repair and a mandatory post-mortem with the CTO.

This is what error handling is for: not preventing the error itself, but designing the user's path through it. Well-designed error handling communicates three things every time - what happened, whether the user needs to act, and what they should do next. That is the entire requirement. Everything in this lesson is a specific mechanism for meeting it.


The LLM Error Taxonomy

LLM errors fall into distinct categories, each requiring a completely different response. The most important first step is correct classification - retrying a permanent error wastes money and time; not retrying a transient error loses user requests that could have succeeded with one more attempt.

| Error Category | HTTP Status | Retry? | User Message Theme |
| --- | --- | --- | --- |
| Transient (overloaded) | 529, 503 | Yes - short backoff | "Temporarily busy, retrying automatically..." |
| Rate limited | 429 | Yes - longer wait | "High demand right now, waiting before retry" |
| Timeout | N/A | Yes - faster model | "Taking too long, trying a faster approach" |
| Content policy | 400, 422 | No | "Request couldn't be processed, try rephrasing" |
| Auth / billing | 401, 403 | No | "Service issue, contact support" |
| Permanent bad request | 400 | No | "Request format not supported" |

:::info The 529 Status Code HTTP 529 is Anthropic's custom status code for "service overloaded" - distinct from 503 (service unavailable). It means the model servers are at capacity right now but will recover shortly. It is always transient, always retriable, and never indicates a problem with your request. Short exponential backoff (1–5 seconds) is the correct response. :::


The Core Error Classification System

Never handle raw exception types inline throughout your codebase. Build a single classification function and route everything through it. This keeps user messages consistent, retry logic centralized, and monitoring accurate.

# error_handling/classifier.py
import anthropic
import asyncio
from dataclasses import dataclass
from typing import Optional
from enum import Enum


class ErrorCategory(str, Enum):
    TRANSIENT = "transient"            # Retry with short backoff
    RATE_LIMITED = "rate_limited"      # Retry with longer delay
    TIMEOUT = "timeout"                # Retry with faster model
    CONTENT_POLICY = "content_policy"  # Do not retry - modify request
    AUTH_ERROR = "auth_error"          # Do not retry - fix config
    PERMANENT = "permanent"            # Do not retry - fix code
    UNKNOWN = "unknown"                # Treat as transient, log for investigation


@dataclass
class ClassifiedError:
    original: Exception
    category: ErrorCategory
    user_message: str                  # Human-readable - safe to show in UI
    should_retry: bool
    retry_after_seconds: Optional[float]
    http_status: Optional[int]
    internal_detail: str               # For logging only - NEVER shown to user


def classify_anthropic_error(error: Exception) -> ClassifiedError:
    """
    Classify an Anthropic API error into actionable categories.
    Each category maps to a different retry strategy and user message.

    Rules:
    - Never expose raw error messages, SDK types, or API details to users.
    - Always produce a user_message that ends with a clear action.
    - The internal_detail should contain enough context to debug without
      needing to reproduce the error.
    """
    # RateLimitError is a subclass of APIStatusError, so it must be checked first.
    if isinstance(error, anthropic.RateLimitError):
        return ClassifiedError(
            original=error,
            category=ErrorCategory.RATE_LIMITED,
            user_message=(
                "The AI is experiencing high demand right now. "
                "Please wait a moment - we'll retry automatically."
            ),
            should_retry=True,
            retry_after_seconds=30.0,  # Check Retry-After header in production
            http_status=429,
            internal_detail=f"Rate limit hit: {str(error)[:200]}",
        )

    elif isinstance(error, anthropic.APIStatusError):
        status = error.status_code

        if status == 529:
            return ClassifiedError(
                original=error,
                category=ErrorCategory.TRANSIENT,
                user_message="The AI service is temporarily at capacity. Retrying automatically...",
                should_retry=True,
                retry_after_seconds=5.0,
                http_status=529,
                internal_detail=f"API overloaded (529): {str(error)[:200]}",
            )
        elif status >= 500:
            return ClassifiedError(
                original=error,
                category=ErrorCategory.TRANSIENT,
                user_message="The AI service is temporarily unavailable. Retrying...",
                should_retry=True,
                retry_after_seconds=3.0,
                http_status=status,
                internal_detail=f"Server error ({status}): {str(error)[:200]}",
            )
        elif status == 400:
            # Could be content policy OR malformed request - inspect error body.
            # The body is not guaranteed to be a dict, so read it defensively.
            body = getattr(error, "body", None)
            error_info = body.get("error") if isinstance(body, dict) else None
            error_type = error_info.get("type", "") if isinstance(error_info, dict) else ""
            error_type = str(error_type).lower()

            if "content" in error_type or "policy" in error_type:
                return ClassifiedError(
                    original=error,
                    category=ErrorCategory.CONTENT_POLICY,
                    user_message="Your message couldn't be processed as written. Try rephrasing it.",
                    should_retry=False,
                    retry_after_seconds=None,
                    http_status=400,
                    internal_detail=f"Content policy (400): {str(error)[:200]}",
                )
            return ClassifiedError(
                original=error,
                category=ErrorCategory.PERMANENT,
                user_message="This request format isn't supported. Please try a different approach.",
                should_retry=False,
                retry_after_seconds=None,
                http_status=400,
                internal_detail=f"Bad request (400): {str(error)[:200]}",
            )
        elif status in (401, 403):
            return ClassifiedError(
                original=error,
                category=ErrorCategory.AUTH_ERROR,
                user_message=(
                    "There's a service configuration issue. "
                    "Please contact support if this continues."
                ),
                should_retry=False,
                retry_after_seconds=None,
                http_status=status,
                internal_detail=f"Auth error ({status}): check API key and billing status",
            )
        elif status == 422:
            return ClassifiedError(
                original=error,
                category=ErrorCategory.CONTENT_POLICY,
                user_message="Your message couldn't be processed due to content guidelines. Please rephrase.",
                should_retry=False,
                retry_after_seconds=None,
                http_status=422,
                internal_detail=f"Content violation (422): {str(error)[:200]}",
            )
        elif 400 <= status < 500 and status not in (408, 409):
            # Remaining client errors (404, 413, ...) will not succeed on retry.
            # 408 and 409 are left to the UNKNOWN fallback below, which retries.
            return ClassifiedError(
                original=error,
                category=ErrorCategory.PERMANENT,
                user_message="This type of request couldn't be processed. Please try a different approach.",
                should_retry=False,
                retry_after_seconds=None,
                http_status=status,
                internal_detail=f"Client error ({status}): {str(error)[:200]}",
            )

    # APITimeoutError is a subclass of APIConnectionError, so check it before
    # the generic connection branch.
    elif isinstance(error, (anthropic.APITimeoutError, asyncio.TimeoutError)):
        return ClassifiedError(
            original=error,
            category=ErrorCategory.TIMEOUT,
            user_message="The AI took longer than expected. Trying a faster approach...",
            should_retry=True,
            retry_after_seconds=0.0,
            http_status=None,
            internal_detail="Request timed out - model may be under load",
        )

    elif isinstance(error, anthropic.APIConnectionError):
        return ClassifiedError(
            original=error,
            category=ErrorCategory.TRANSIENT,
            user_message="Connection to the AI service failed. Retrying...",
            should_retry=True,
            retry_after_seconds=2.0,
            http_status=None,
            internal_detail=f"Connection error: {str(error)[:200]}",
        )

    # Unknown error type - log for investigation, treat as transient
    return ClassifiedError(
        original=error,
        category=ErrorCategory.UNKNOWN,
        user_message="An unexpected error occurred. Please try again.",
        should_retry=True,
        retry_after_seconds=2.0,
        http_status=None,
        internal_detail=f"Unknown error type {type(error).__name__}: {str(error)[:200]}",
    )
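
The classifier hardcodes a 30-second wait for rate limits and leaves a note to check the Retry-After header in production. A minimal sketch of that header parsing follows. `parse_retry_after` is a hypothetical helper, not part of the SDK; it assumes you can obtain the response headers as a mapping (for example from the HTTP response attached to the error, if your SDK version exposes one). The header may carry either a number of seconds or an HTTP date, and the cap is an arbitrary safety limit.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Mapping, Optional


def parse_retry_after(
    headers: Mapping[str, str],
    default_seconds: float = 30.0,
    max_seconds: float = 120.0,
    now: Optional[datetime] = None,
) -> float:
    """Return how long to wait, based on a Retry-After header if present."""
    raw = None
    for name, value in headers.items():
        if name.lower() == "retry-after":  # header names are case-insensitive
            raw = value.strip()
            break
    if not raw:
        return default_seconds

    try:
        seconds = float(raw)  # form 1: delay in seconds
    except ValueError:
        try:
            target = parsedate_to_datetime(raw)  # form 2: HTTP date
        except (TypeError, ValueError):
            return default_seconds
        if target.tzinfo is None:
            target = target.replace(tzinfo=timezone.utc)
        now = now or datetime.now(timezone.utc)
        seconds = (target - now).total_seconds()

    # Clamp: never negative, never longer than the caller is willing to wait.
    return min(max(seconds, 0.0), max_seconds)
```

The result can be passed as `retry_after_seconds` when building the `ClassifiedError` for a 429, so the wait reflects what the provider asked for rather than a fixed guess.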

Exponential Backoff with Jitter

Naive retry logic is dangerous. Retrying immediately after a failure just adds load to an already-stressed service. Retrying at fixed intervals causes a thundering herd - all your application instances retry at the same moment after an outage, flooding the service as it tries to recover.

The solution is exponential backoff with jitter: delay grows exponentially (1s, 2s, 4s, 8s) and a random factor desynchronizes retries across instances.

# error_handling/retry.py
import anthropic
import asyncio
import random
import logging
from typing import Awaitable, Callable, Optional, TypeVar

# Assumes the classifier module above lives in the same package.
from error_handling.classifier import ClassifiedError, classify_anthropic_error

logger = logging.getLogger(__name__)
T = TypeVar("T")


async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_attempts: int = 4,
    base_delay_seconds: float = 1.0,
    max_delay_seconds: float = 60.0,
    jitter: bool = True,
    on_retry: Optional[Callable[[int, ClassifiedError], None]] = None,
) -> T:
    """
    Retry a function with exponential backoff and jitter.

    Only retries on transient errors - permanent errors propagate immediately.
    Jitter prevents thundering herd: without it, multiple instances recovering
    simultaneously flood the service with synchronized retry waves.

    Backoff sequence (no jitter): 1s, 2s, 4s, 8s
    With jitter (±25%): ~0.75–1.25s, ~1.5–2.5s, ~3–5s, ~6–10s
    The classifier's retry_after_seconds acts as a floor: when it is larger
    than the computed backoff, it is used instead.

    Args:
        fn: Async callable to retry (no arguments - use a closure or lambda)
        max_attempts: Total attempts including the first
        base_delay_seconds: Starting delay; doubles each retry
        max_delay_seconds: Cap on delay to prevent excessive waits
        jitter: Add random ±25% factor to desynchronize concurrent retries
        on_retry: Callback invoked before each retry (for UI updates)
    """
    last_error: Optional[Exception] = None

    for attempt in range(max_attempts):
        try:
            return await fn()

        except Exception as e:
            classified = classify_anthropic_error(e)
            last_error = e

            logger.warning(
                f"Attempt {attempt + 1}/{max_attempts} failed | "
                f"category={classified.category.value} | "
                f"detail={classified.internal_detail}"
            )

            # Non-retriable errors propagate immediately
            if not classified.should_retry:
                logger.info(f"Not retrying: {classified.category.value}")
                raise

            if attempt == max_attempts - 1:
                # Exhausted all attempts
                logger.error(f"All {max_attempts} attempts failed | last_error={classified.category.value}")
                break

            # Exponential backoff, capped
            delay = min(base_delay_seconds * (2 ** attempt), max_delay_seconds)

            # The classifier's hint is a minimum wait, not a replacement for backoff
            if classified.retry_after_seconds and classified.retry_after_seconds > delay:
                delay = classified.retry_after_seconds

            if jitter:
                # Add ±25% random jitter to desynchronize retries
                jitter_factor = 0.75 + random.random() * 0.5
                delay *= jitter_factor

            if on_retry:
                on_retry(attempt + 1, classified)

            logger.info(f"Retrying in {delay:.1f}s (attempt {attempt + 1}/{max_attempts})")
            await asyncio.sleep(delay)

    if last_error is None:
        raise ValueError("max_attempts must be at least 1")
    # Re-raise the original error so callers can still classify it
    raise last_error


async def call_claude_resilient(
    messages: list[dict],
    system: str = "You are a helpful assistant.",
    model: str = "claude-opus-4-6",
    max_tokens: int = 2048,
    max_attempts: int = 4,
) -> str:
    """
    Call Claude with automatic retry on transient errors.
    Safe to use as the default wrapper for all non-streaming calls.
    """
    client = anthropic.AsyncAnthropic()

    async def _attempt() -> str:
        response = await client.messages.create(
            model=model,
            max_tokens=max_tokens,
            system=system,
            messages=messages,
        )
        return response.content[0].text

    def log_retry(attempt: int, error: ClassifiedError) -> None:
        logger.info(f"[Retry {attempt}] {error.user_message}")

    return await retry_with_backoff(
        _attempt,
        max_attempts=max_attempts,
        on_retry=log_retry,
    )
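
To make the delay arithmetic concrete, here is a small standalone sketch of the schedule itself, separate from any API call. `backoff_delays` is a hypothetical helper written for illustration; it lists the waits between attempts, so four attempts produce three delays. Passing a seeded `random.Random` applies the same ±25% jitter described above in a reproducible way.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_attempts: int = 4,
    base: float = 1.0,
    cap: float = 60.0,
    jitter_fraction: float = 0.25,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Delays (in seconds) slept between attempts; no jitter when rng is None."""
    delays = []
    for attempt in range(max_attempts - 1):
        # Double the delay on every retry, but never exceed the cap.
        delay = min(base * (2 ** attempt), cap)
        if rng is not None:
            # Scale by a random factor in [1 - jitter, 1 + jitter].
            delay *= 1.0 + rng.uniform(-jitter_fraction, jitter_fraction)
        delays.append(delay)
    return delays


print(backoff_delays())                       # schedule without jitter
print(backoff_delays(rng=random.Random(42)))  # same schedule, jittered
```

Two instances that fail at the same moment compute the same un-jittered schedule, which is exactly the synchronization that jitter is meant to break.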

The Fallback Model Chain

When a model is unavailable or times out, fall through to a faster, cheaper alternative. Quality degrades gracefully rather than failing completely. The key design decision: which errors trigger fallback vs which propagate.

# error_handling/fallback_chain.py
import anthropic
import asyncio
import logging
from typing import Optional

# Assumes the classifier module above lives in the same package.
from error_handling.classifier import ErrorCategory, classify_anthropic_error

logger = logging.getLogger(__name__)


class ModelFallbackChain:
    """
    Try models in priority order, falling back on transient failure.

    Design principles:
    - Primary model: highest quality (Opus)
    - Secondary: balance of quality and speed (Sonnet)
    - Tertiary: fastest and cheapest (Haiku) - last resort
    - Timeouts trigger fallback to faster model (reduce timeout proportionally)
    - Content policy violations do NOT trigger fallback (same result everywhere)
    - Auth errors do NOT trigger fallback (all models share the same API key)

    Track which model served each response for A/B quality analysis.
    Alert when fallback model usage exceeds 5% - primary model reliability issue.
    """

    FALLBACK_ORDER = [
        "claude-opus-4-6",
        "claude-sonnet-4-6",
        "claude-haiku-4-5-20251001",
    ]

    def __init__(self, timeout_seconds: float = 30.0):
        self.client = anthropic.AsyncAnthropic()
        self.initial_timeout = timeout_seconds

    def _next_model(self, current: str) -> Optional[str]:
        try:
            idx = self.FALLBACK_ORDER.index(current)
            if idx < len(self.FALLBACK_ORDER) - 1:
                return self.FALLBACK_ORDER[idx + 1]
        except ValueError:
            pass
        return None

    async def complete(
        self,
        messages: list[dict],
        system: str,
        preferred_model: str = "claude-opus-4-6",
        max_tokens: int = 2048,
    ) -> tuple[str, str]:
        """
        Complete a request using the fallback chain.
        Returns (response_text, model_used).
        """
        current_model: Optional[str] = preferred_model
        current_timeout = self.initial_timeout
        attempt = 0
        max_hops = len(self.FALLBACK_ORDER)

        while current_model and attempt < max_hops:
            try:
                response = await asyncio.wait_for(
                    self.client.messages.create(
                        model=current_model,
                        max_tokens=max_tokens,
                        system=system,
                        messages=messages,
                    ),
                    timeout=current_timeout,
                )
                if attempt > 0:
                    logger.info(
                        f"Fallback successful: served by {current_model} "
                        f"after {attempt} hop(s)"
                    )
                return response.content[0].text, current_model

            except asyncio.TimeoutError:
                next_model = self._next_model(current_model)
                if not next_model:
                    # Re-raise the timeout itself so callers classify it as TIMEOUT
                    logger.error(f"All models timed out. Last tried: {current_model}")
                    raise
                logger.warning(
                    f"Timeout on {current_model} (>{current_timeout}s) "
                    f"→ trying {next_model}"
                )
                current_model = next_model
                # Scale timeout down for faster models
                current_timeout = max(10.0, current_timeout * 0.5)
                attempt += 1

            except anthropic.APIStatusError as e:
                classified = classify_anthropic_error(e)

                if classified.category not in (
                    ErrorCategory.TRANSIENT,
                    ErrorCategory.RATE_LIMITED,
                ):
                    # Content policy, auth, and permanent errors will be identical
                    # on any model - don't waste hops. Unclassified status errors
                    # propagate too; otherwise this loop would spin forever on
                    # the same model without advancing.
                    raise

                next_model = self._next_model(current_model)
                if not next_model:
                    raise
                logger.warning(
                    f"Service error on {current_model} "
                    f"({classified.category.value}) → trying {next_model}"
                )
                current_model = next_model
                attempt += 1
                if classified.retry_after_seconds and classified.retry_after_seconds > 0:
                    await asyncio.sleep(min(classified.retry_after_seconds, 5.0))

            except anthropic.APIConnectionError:
                next_model = self._next_model(current_model)
                if not next_model:
                    raise
                logger.warning(f"Connection error on {current_model} → trying {next_model}")
                current_model = next_model
                attempt += 1

        raise RuntimeError("Fallback chain exhausted - all models failed")

The Circuit Breaker

The circuit breaker is the most important resilience pattern for AI services that your team is probably not using. Without it, a partial service degradation becomes a self-amplifying incident.

The failure scenario without a circuit breaker: Anthropic has a 20-minute service issue. 500 concurrent users hit your app. Each request times out after 30 seconds. Your thread pool exhausts. Other features start failing. Your database connection pool starves because threads are stuck waiting. What was a 20-minute AI degradation becomes a 40-minute full application outage - for a feature most users aren't even using.

With a circuit breaker: After 5 consecutive failures, the circuit opens. All subsequent AI requests return immediately (sub-millisecond) with a clear user message. The rest of the application remains healthy. After 60 seconds, the circuit half-opens and lets test requests through; with the defaults below it closes automatically after two successes, and any failure during testing reopens it.

# error_handling/circuit_breaker.py
import asyncio
import time
from enum import Enum
from typing import Optional, Callable, Any
import logging

# Assumes the classifier module above lives in the same package.
from error_handling.classifier import ErrorCategory, classify_anthropic_error

logger = logging.getLogger(__name__)


class CircuitState(str, Enum):
    CLOSED = "closed"        # Normal - requests flow through
    OPEN = "open"            # Service down - fail fast, no requests sent
    HALF_OPEN = "half_open"  # Testing recovery - requests pass through as probes


class CircuitOpenError(Exception):
    """Raised instead of calling the AI service while the circuit is OPEN."""


class AICircuitBreaker:
    """
    Fail fast when the AI service is clearly down.

    State transitions:
        CLOSED → OPEN: failure_count >= failure_threshold
        OPEN → HALF_OPEN: recovery_timeout has elapsed
        HALF_OPEN → CLOSED: success_threshold successes received
        HALF_OPEN → OPEN: any failure during testing

    For distributed deployments: store state in Redis so all instances
    share the same circuit state. An in-process circuit means each
    pod has its own independent state - defeating the pattern's purpose.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout_seconds: float = 60.0,
        success_threshold: int = 2,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout_seconds
        self.success_threshold = success_threshold

        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        # Monotonic clock: unaffected by wall-clock adjustments
        self._opened_at: Optional[float] = None

    def _seconds_until_recovery(self) -> float:
        if self._opened_at is None:
            return 0.0
        return max(0.0, self.recovery_timeout - (time.monotonic() - self._opened_at))

    @property
    def state(self) -> CircuitState:
        # Auto-transition from OPEN to HALF_OPEN after timeout
        if (
            self._state == CircuitState.OPEN
            and self._opened_at is not None
            and time.monotonic() - self._opened_at >= self.recovery_timeout
        ):
            self._state = CircuitState.HALF_OPEN
            self._success_count = 0
            logger.info("Circuit HALF-OPEN: testing recovery")
        return self._state

    def _record_success(self) -> None:
        self._failure_count = 0

        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self.success_threshold:
                self._state = CircuitState.CLOSED
                logger.info("Circuit CLOSED: service recovered")

    def _record_failure(self) -> None:
        self._failure_count += 1

        if self._state == CircuitState.HALF_OPEN:
            self._state = CircuitState.OPEN
            self._opened_at = time.monotonic()
            logger.warning("Circuit OPEN: recovery test failed")
        elif self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN
            self._opened_at = time.monotonic()
            logger.error(
                f"Circuit OPEN: {self._failure_count} failures reached "
                f"threshold {self.failure_threshold}"
            )

    async def call(self, fn: Callable[[], Any]) -> Any:
        """
        Execute a function through the circuit breaker.
        Raises immediately if circuit is OPEN (fail fast).
        """
        current_state = self.state  # Triggers auto-transition check

        if current_state == CircuitState.OPEN:
            raise CircuitOpenError(
                f"AI service temporarily unavailable. "
                f"Automatically retrying in {self._seconds_until_recovery():.0f}s."
            )

        try:
            result = await fn()
            self._record_success()
            return result
        except Exception as e:
            classified = classify_anthropic_error(e)
            # Only count transient errors toward the threshold
            # Content policy violations should NOT open the circuit
            if classified.category in (
                ErrorCategory.TRANSIENT,
                ErrorCategory.RATE_LIMITED,
                ErrorCategory.TIMEOUT,
                ErrorCategory.UNKNOWN,
            ):
                self._record_failure()
            raise

    @property
    def status(self) -> dict:
        state = self.state  # Triggers auto-transition check
        return {
            "state": state.value,
            "failure_count": self._failure_count,
            "recovery_in_seconds": (
                self._seconds_until_recovery() if state == CircuitState.OPEN else 0
            ),
        }

:::warning Circuit Breaker State Must Be Shared Across Instances If you run multiple application instances and store circuit breaker state in-process (as above), each instance has its own independent circuit. Instance A might be in OPEN state while Instance B is still sending 30-second timeout requests. In production with multiple instances, store state in Redis: use INCR with a 60-second TTL to count failures, and a separate key for the open/half-open state. A naive Redis implementation works well for this pattern - the circuit breaker does not need distributed transactions. :::
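
The shared-state idea in the warning above can be sketched as follows. This is a simplified illustration under stated assumptions, not a production implementation: `SharedCircuit` and `InMemoryStore` are hypothetical names, the store is an in-memory stand-in that mimics the Redis operations mentioned (INCR, EXPIRE, SET with a TTL, EXISTS, DEL) so the example runs without a server, and the HALF_OPEN probe state is omitted. Once the "open" key expires, traffic simply resumes. In a real deployment the store would be replaced by a Redis client offering equivalent operations.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class InMemoryStore:
    """Stand-in for Redis: counters and flags with optional expiry."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._data: Dict[str, Tuple[int, Optional[float]]] = {}

    def exists(self, key: str) -> bool:
        entry = self._data.get(key)
        if entry is None:
            return False
        if entry[1] is not None and self._clock() >= entry[1]:
            del self._data[key]  # lazily drop expired keys
            return False
        return True

    def incr(self, key: str) -> int:
        value, expires_at = self._data[key] if self.exists(key) else (0, None)
        self._data[key] = (value + 1, expires_at)
        return value + 1

    def expire(self, key: str, ttl_seconds: float) -> None:
        if self.exists(key):
            self._data[key] = (self._data[key][0], self._clock() + ttl_seconds)

    def set(self, key: str, value: int, ttl_seconds: float) -> None:
        self._data[key] = (value, self._clock() + ttl_seconds)

    def delete(self, key: str) -> None:
        self._data.pop(key, None)


class SharedCircuit:
    """Every application instance consults the same store."""

    FAILURES_KEY = "ai:circuit:failures"  # hypothetical key names
    OPEN_KEY = "ai:circuit:open"

    def __init__(self, store, failure_threshold: int = 5,
                 window_seconds: float = 60.0, open_seconds: float = 60.0):
        self.store = store
        self.failure_threshold = failure_threshold
        self.window_seconds = window_seconds
        self.open_seconds = open_seconds

    def allow_request(self) -> bool:
        return not self.store.exists(self.OPEN_KEY)

    def record_failure(self) -> None:
        count = self.store.incr(self.FAILURES_KEY)
        if count == 1:
            # First failure in this window starts the TTL on the counter.
            self.store.expire(self.FAILURES_KEY, self.window_seconds)
        if count >= self.failure_threshold:
            # Open for every instance; the key's expiry closes it again.
            self.store.set(self.OPEN_KEY, 1, self.open_seconds)
            self.store.delete(self.FAILURES_KEY)

    def record_success(self) -> None:
        self.store.delete(self.FAILURES_KEY)
```

Because failures from all instances land on one counter, the circuit opens after five failures fleet-wide rather than five per pod, which is the behavior the in-process version cannot provide.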


Graceful Degradation with Response Cache

When all LLM calls fail, show something useful rather than a blank error. A multi-level degradation ladder ensures users always get a response, even if it's from cache or a rule-based system.

# error_handling/graceful_degradation.py
import logging
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum

# Assumes the modules shown earlier live in the same package.
from error_handling.classifier import ErrorCategory, classify_anthropic_error
from error_handling.fallback_chain import ModelFallbackChain
from error_handling.retry import retry_with_backoff

logger = logging.getLogger(__name__)


class DegradationLevel(str, Enum):
    FULL = "full"        # AI working normally
    CACHED = "cached"    # Serving from semantic cache
    RULE_BASED = "rule"  # Rule-based fallback responses
    MINIMAL = "minimal"  # Informative "AI unavailable" message


@dataclass
class DegradedResponse:
    level: DegradationLevel
    content: str
    is_cached: bool = False
    cache_age_seconds: Optional[float] = None
    suggested_actions: list[str] = field(default_factory=list)
    retry_in_seconds: Optional[int] = None


class GracefulDegradationService:
    """
    Multi-level degradation when AI is unavailable.

    Degradation ladder (try in order):
    1. Full AI response (primary path)
    2. Semantic cache hit (similar query answered recently)
    3. Rule-based response (FAQ, knowledge base lookups)
    4. Informative failure message (links to alternatives, retry ETA)

    The goal: never show a blank screen or raw error.
    Every level must include a suggested next action for the user.
    """

    def __init__(
        self,
        fallback_chain: ModelFallbackChain,
        response_cache,    # SemanticCache instance
        rule_engine=None,  # Optional rule-based fallback
    ):
        self.chain = fallback_chain
        self.cache = response_cache
        self.rules = rule_engine

    async def get_response(
        self,
        user_message: str,
        conversation_history: list[dict],
        system: str,
    ) -> DegradedResponse:
        messages = conversation_history + [{"role": "user", "content": user_message}]

        # Level 1: Full AI response
        try:
            text, _model = await retry_with_backoff(
                lambda: self.chain.complete(messages, system),
                max_attempts=3,
            )
            return DegradedResponse(
                level=DegradationLevel.FULL,
                content=text,
            )
        except Exception as e:
            classified = classify_anthropic_error(e)
            # Content policy and auth: no degradation path helps
            if classified.category in (ErrorCategory.CONTENT_POLICY, ErrorCategory.AUTH_ERROR):
                raise
            logger.warning(f"AI unavailable after retries: {classified.category.value}")

        # Level 2: Semantic cache
        try:
            cached = await self.cache.find_similar(user_message, threshold=0.85)
            if cached:
                return DegradedResponse(
                    level=DegradationLevel.CACHED,
                    content=f"*[Cached response - AI temporarily unavailable]*\n\n{cached.response}",
                    is_cached=True,
                    cache_age_seconds=cached.age_seconds,
                    suggested_actions=["Try again in a few minutes for a fresh response"],
                    retry_in_seconds=120,
                )
        except Exception:
            logger.warning("Cache lookup failed")

        # Level 3: Rule-based response
        if self.rules:
            try:
                rule_response = await self.rules.match(user_message)
                if rule_response:
                    return DegradedResponse(
                        level=DegradationLevel.RULE_BASED,
                        content=rule_response,
                        suggested_actions=["For complex questions, try again when AI is available"],
                        retry_in_seconds=300,
                    )
            except Exception:
                logger.warning("Rule engine failed")

        # Level 4: Informative failure message
        return DegradedResponse(
            level=DegradationLevel.MINIMAL,
            content=(
                "The AI assistant is temporarily unavailable. "
                "Our team is aware and working to restore service.\n\n"
                "**In the meantime:**\n"
                "- Check the [Help Center](https://help.example.com) for answers\n"
                "- Browse [Previous Conversations](/history) for similar questions\n"
                "- Try again in a few minutes\n\n"
                "We apologize for the interruption."
            ),
            suggested_actions=[
                "Check Help Center",
                "Try again in 5 minutes",
                "Contact support if urgent",
            ],
            retry_in_seconds=300,
        )

User-Facing Error Messages

Error messages are product decisions, not engineering afterthoughts. Every error message must answer three questions: what happened, is it the user's fault, and what should they do now? If any of those three is missing, rewrite it.

# error_handling/user_messages.py
from typing import Optional
from dataclasses import dataclass

# Assumes the classifier module above lives in the same package.
from error_handling.classifier import ErrorCategory


@dataclass
class UserErrorDisplay:
    title: str
    body: str
    primary_action_label: str
    primary_action_type: str  # "retry", "edit_input", "contact_support", "wait"
    retry_countdown_seconds: Optional[int]
    show_status_page_link: bool
    severity: str  # "info", "warning", "error"


ERROR_DISPLAY_CONFIG: dict[ErrorCategory, UserErrorDisplay] = {
    ErrorCategory.TRANSIENT: UserErrorDisplay(
        title="Temporarily unavailable",
        body="The AI service is temporarily unavailable. We're automatically retrying.",
        primary_action_label="Retrying...",
        primary_action_type="retry",
        retry_countdown_seconds=5,
        show_status_page_link=True,
        severity="warning",
    ),
    ErrorCategory.RATE_LIMITED: UserErrorDisplay(
        title="High demand right now",
        body="You've reached the usage limit for this moment. We'll retry automatically in 30 seconds.",
        primary_action_label="Retry in 30s",
        primary_action_type="wait",
        retry_countdown_seconds=30,
        show_status_page_link=False,
        severity="info",
    ),
    ErrorCategory.TIMEOUT: UserErrorDisplay(
        title="Response took too long",
        body="The AI took longer than expected. Trying a faster approach automatically.",
        primary_action_label="Retrying...",
        primary_action_type="retry",
        retry_countdown_seconds=0,
        show_status_page_link=False,
        severity="info",
    ),
    ErrorCategory.CONTENT_POLICY: UserErrorDisplay(
        title="Message couldn't be processed",
        body="Your request couldn't be processed as written. Try rephrasing it or adjusting the content.",
        primary_action_label="Edit message",
        primary_action_type="edit_input",
        retry_countdown_seconds=None,
        show_status_page_link=False,
        severity="info",
    ),
    ErrorCategory.AUTH_ERROR: UserErrorDisplay(
        title="Service configuration error",
        body="There's a configuration issue with this service. Please contact support if this continues.",
        primary_action_label="Contact support",
        primary_action_type="contact_support",
        retry_countdown_seconds=None,
        show_status_page_link=False,
        severity="error",
    ),
    ErrorCategory.PERMANENT: UserErrorDisplay(
        title="Request not supported",
        body="This type of request couldn't be processed. Please try a different approach.",
        primary_action_label="Edit message",
        primary_action_type="edit_input",
        retry_countdown_seconds=None,
        show_status_page_link=False,
        severity="warning",
    ),
}


def get_user_error_display(error_category: ErrorCategory) -> UserErrorDisplay:
    """Get the user-facing error display config for an error category."""
    # UNKNOWN has no entry of its own and falls back to the transient display.
    return ERROR_DISPLAY_CONFIG.get(
        error_category,
        ERROR_DISPLAY_CONFIG[ErrorCategory.TRANSIENT],  # Default: treat as transient
    )

The Complete Resilient AI Call

Putting it all together - circuit breaker wrapping retry wrapping fallback chain, with structured error output for the frontend:

# error_handling/resilient_call.py
import logging

# Assumes the modules shown earlier live in the same package.
from error_handling.circuit_breaker import AICircuitBreaker
from error_handling.classifier import classify_anthropic_error
from error_handling.fallback_chain import ModelFallbackChain
from error_handling.retry import retry_with_backoff
from error_handling.user_messages import get_user_error_display

logger = logging.getLogger(__name__)

# Singletons - initialize once at app startup, shared across requests
_circuit_breaker = AICircuitBreaker(failure_threshold=5, recovery_timeout_seconds=60.0)
_fallback_chain = ModelFallbackChain(timeout_seconds=30.0)


async def resilient_ai_call(
    user_message: str,
    conversation_history: list[dict],
    system: str = "You are a helpful AI assistant.",
    preferred_model: str = "claude-opus-4-6",
    max_tokens: int = 2048,
) -> dict:
    """
    Complete resilient AI call stack:
    1. Circuit breaker (fail fast when service is clearly down)
    2. Retry with exponential backoff (transient errors)
    3. Fallback model chain (Opus → Sonnet → Haiku)
    4. User-friendly structured error on total failure

    Returns:
        success: {"status": "success", "response": str, "model": str}
        error: {"status": "error", "error": dict, "retryable": bool}
    """
    messages = conversation_history + [{"role": "user", "content": user_message}]

    try:
        async def _attempt_with_retry():
            return await retry_with_backoff(
                lambda: _fallback_chain.complete(
                    messages=messages,
                    system=system,
                    preferred_model=preferred_model,
                    max_tokens=max_tokens,
                ),
                max_attempts=3,
            )

        text, model_used = await _circuit_breaker.call(_attempt_with_retry)

        return {
            "status": "success",
            "response": text,
            "model": model_used,
            "circuit_state": _circuit_breaker.status["state"],
        }

    except Exception as e:
        classified = classify_anthropic_error(e)
        display = get_user_error_display(classified.category)

        # Internal logging with full detail
        logger.error(
            f"AI call failed | category={classified.category.value} | "
            f"detail={classified.internal_detail} | "
            f"circuit={_circuit_breaker.status['state']}"
        )

        # User-facing response: structured, never exposing internals
        return {
            "status": "error",
            "error": {
                "title": display.title,
                "body": display.body,
                "primary_action_label": display.primary_action_label,
                "primary_action_type": display.primary_action_type,
                "retry_countdown_seconds": display.retry_countdown_seconds,
                "severity": display.severity,
            },
            "retryable": classified.should_retry,
            "error_category": classified.category.value,
            "circuit_state": _circuit_breaker.status["state"],
        }


async def ai_health_check() -> dict:
    """
    Health check endpoint - expose circuit breaker state for monitoring.
    Wire this to /api/health/ai and alert when circuit is OPEN.
    """
    return {
        "circuit_breaker": _circuit_breaker.status,
        "service": "ai-chat",
    }

Handling Partial Stream Failures

Streaming failures are a special case: the error occurs mid-stream, after some tokens have already been sent to the client. You cannot change the HTTP status code after streaming starts. The solution is to send a structured error event in the SSE stream itself.

:::danger Never Discard Partial Streaming Responses When a stream fails after 50 tokens, those 50 tokens represent real AI output. Blanking the screen discards content the user may have already started reading. Instead: keep the partial content visible, append an inline error notice ("Response was cut short - click Retry to continue"), and offer a retry button that re-sends the original request. Users find partial responses useful as a starting point. :::

# error_handling/stream_error_handling.py
import anthropic
import asyncio
import json
from fastapi.responses import StreamingResponse

# classify_anthropic_error and ErrorCategory are the classifier helpers defined
# earlier in this article; import them from wherever that module lives in your project.


async def resilient_stream(
    messages: list[dict],
    system: str,
    model: str = "claude-opus-4-6",
):
    """
    Streaming generator with inline error handling.
    Errors mid-stream are sent as structured events, not HTTP errors.

    Frontend contract:
    - Listen for event type "error" to show inline error UI
    - Listen for event type "stream_interrupted" to show retry button
    - Keep all content received before the error - do not blank the screen
    """
    client = anthropic.AsyncAnthropic()
    # Counts text chunks yielded by the SDK, which approximates (but does not
    # exactly equal) the number of tokens generated.
    tokens_sent = 0

    try:
        yield f"data: {json.dumps({'type': 'start'})}\n\n"

        async with client.messages.stream(
            model=model,
            max_tokens=4096,
            system=system,
            messages=messages,
        ) as stream:
            async for text in stream.text_stream:
                tokens_sent += 1
                yield f"data: {json.dumps({'type': 'text', 'text': text})}\n\n"

        yield f"data: {json.dumps({'type': 'done', 'tokens': tokens_sent})}\n\n"
        yield "data: [DONE]\n\n"

    except asyncio.CancelledError:
        # Client disconnected - stop generating. Re-raise so the cancellation
        # propagates to the server instead of being silently swallowed.
        raise

    except anthropic.APIStatusError as e:
        # The API returned an HTTP error status after the stream had started.
        classified = classify_anthropic_error(e)
        event = {
            "type": "stream_interrupted",
            "tokens_before_error": tokens_sent,
            "user_message": classified.user_message,
            "retryable": classified.should_retry,
            "action": "edit_input" if classified.category == ErrorCategory.CONTENT_POLICY else "retry",
        }
        yield f"data: {json.dumps(event)}\n\n"
        yield "data: [DONE]\n\n"

    except Exception as e:
        # Anything else (connection drops, unexpected failures) is reported
        # as a generic retryable error event.
        classified = classify_anthropic_error(e)
        event = {
            "type": "error",
            "tokens_before_error": tokens_sent,
            "user_message": classified.user_message,
            "retryable": True,
        }
        yield f"data: {json.dumps(event)}\n\n"
        yield "data: [DONE]\n\n"
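The consumer side of this contract can be sketched as well. The example below is a minimal illustration in Python (a real frontend would most likely do the same thing in JavaScript). `StreamState` and `apply_sse_line` are hypothetical names introduced here, not part of any library. The key behavior is that an error event sets the error fields but never clears the text already received.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamState:
    """What the UI should render at any moment (hypothetical shape)."""
    text: str = ""                       # partial content, never cleared on error
    finished: bool = False
    error_message: Optional[str] = None
    show_retry: bool = False


def apply_sse_line(state: StreamState, line: str) -> StreamState:
    """Fold one raw SSE line into the UI state."""
    if not line.startswith("data: "):
        return state                     # blank separators and other fields are ignored
    payload = line[len("data: "):].strip()
    if payload == "[DONE]":
        state.finished = True
        return state
    event = json.loads(payload)
    kind = event.get("type")
    if kind == "text":
        state.text += event["text"]
    elif kind in ("stream_interrupted", "error"):
        # Deliberately leave state.text untouched: the partial answer stays visible.
        state.error_message = event.get("user_message")
        state.show_retry = bool(event.get("retryable"))
    return state
```

Feeding the lines of an interrupted stream through `apply_sse_line` leaves the partial text in place with `show_retry` set, which is the state an inline "Response was cut short" notice would render from.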

Production Engineering Checklist

Error monitoring is distinct from application logging. Every classified error should emit a metric tagged by category. Track: error counts by category (rate_limit, transient, timeout, content_policy), circuit breaker state transitions, fallback model usage frequency. Alert when:

  • rate_limit errors exceed 5% of requests → billing tier issue
  • transient errors exceed 10% → service instability, page on-call
  • Circuit breaker opens → immediate alert, check status page
  • Fallback model usage exceeds 5% → primary model reliability issue
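The thresholds above can be expressed as a small evaluation function. This is a sketch only: `WindowMetrics` and `evaluate_alerts` are hypothetical names, and in practice these rules would usually live in your monitoring system's alert configuration rather than in application code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WindowMetrics:
    """Counters for one monitoring window (hypothetical shape)."""
    total_requests: int
    rate_limit_errors: int
    transient_errors: int
    fallback_responses: int
    circuit_open: bool


def evaluate_alerts(m: WindowMetrics) -> list[str]:
    """Return the names of the alerts that should fire for this window."""
    alerts = []
    if m.circuit_open:
        alerts.append("circuit_open")        # immediate alert, check status page
    if m.total_requests == 0:
        return alerts                        # no traffic, no rates to compute
    if m.rate_limit_errors / m.total_requests > 0.05:
        alerts.append("rate_limit_high")     # likely a billing tier issue
    if m.transient_errors / m.total_requests > 0.10:
        alerts.append("transient_high")      # service instability, page on-call
    if m.fallback_responses / m.total_requests > 0.05:
        alerts.append("fallback_high")       # primary model reliability issue
    return alerts
```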

Distinguish user-caused from system-caused errors. Content policy violations (user sent a problematic message) should not page on-call. 5xx server errors and timeouts should. Mixing these categories dilutes your alert signal and causes alert fatigue within weeks.

Cache successful responses for degraded fallback. When an AI call succeeds, cache the (query_embedding, response) pair with a 24-hour TTL. When the service is down, serve cached responses for semantically similar queries (cosine similarity > 0.85). This provides meaningful degraded experience for common questions rather than a blank error screen.
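A minimal in-memory sketch of that cache follows. It assumes embeddings are computed elsewhere and passed in as plain lists of floats. `SemanticResponseCache` is a hypothetical class, and the linear scan is only reasonable for small caches (a production version would more likely use a vector index or a shared store).

```python
import math
import time
from typing import Optional


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class SemanticResponseCache:
    """Stores (embedding, response) pairs and serves the closest fresh match."""

    def __init__(self, ttl_seconds: float = 24 * 3600, threshold: float = 0.85):
        self.ttl_seconds = ttl_seconds
        self.threshold = threshold
        self._entries = []  # list of (embedding, response, stored_at)

    def put(self, embedding: list[float], response: str, now: Optional[float] = None) -> None:
        stored_at = time.time() if now is None else now
        self._entries.append((embedding, response, stored_at))

    def get(self, embedding: list[float], now: Optional[float] = None) -> Optional[str]:
        current = time.time() if now is None else now
        # Drop entries older than the TTL before searching.
        self._entries = [e for e in self._entries if current - e[2] <= self.ttl_seconds]
        best_response, best_score = None, self.threshold
        for cached_embedding, response, _ in self._entries:
            score = cosine_similarity(embedding, cached_embedding)
            if score > best_score:           # must beat the threshold and any earlier match
                best_response, best_score = response, score
        return best_response
```

The `now` parameter exists only to make expiry testable without waiting; callers in real code would omit it.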


Common Mistakes

:::danger Never Retry Content Policy Violations If the API returns a 400 or 422 with a content policy reason, the same request will fail on every retry. Retrying wastes API credits, burns retry budget that could be used for real transient errors, and creates a confusing delay for users. The correct path for content policy violations is immediate: classify the error, show the "rephrase your message" UX, and stop. Do not retry. Do not fall back to a different model - the content is the problem, not the model. :::

:::danger Exposing Internal Error Details to Users Never send raw exception messages, stack traces, HTTP status codes, model names, SDK error types, or API provider names directly to users. The user does not need to know it was a 529 from Anthropic. They need to know what to do next. Internal details belong in structured logs tagged with request IDs for debugging. User-facing messages should be written by product designers, not copy-pasted from exception handlers. :::

:::warning Not Testing Failure Paths Before Launch Every error handling path should be exercised in staging before the feature ships. Teams that skip this discover their failure paths for the first time during production incidents - the worst possible time. Test each category: use a mock client that raises specific exceptions, use a proxy that returns controlled HTTP status codes, use feature flags that simulate specific error types. Verify that each path: (1) shows the correct user message, (2) retries or does not retry correctly, (3) falls back to the right degradation level, (4) emits the right metrics. :::

:::tip Test Fallback Paths in Staging Deliberately trigger each failure mode in staging before launch: kill the network to test connection errors, hit rate limits by sending concurrent requests, set artificially short timeouts to test timeout handling, send requests designed to trigger content policy to test the non-retry path. Production failures always happen at the worst times - know your failure paths work before users discover them. :::


Interview Q&A

Q1: How do you distinguish transient from permanent LLM API errors?

By HTTP status code and SDK exception type. From the Anthropic SDK: 5xx status codes (500, 529/overloaded) and APIConnectionError are transient - retry with exponential backoff. RateLimitError (429) is transient but requires a longer wait - check the Retry-After header if present, otherwise default to 30–60 seconds. 400 is split: inspect the error body's type field. If it contains "content" or "policy" it is a content policy violation - do not retry, ask user to rephrase. If it is a generic invalid_request_error it is a permanent bad request - fix the code. 401 and 403 are auth/billing errors - retrying will never help. The Anthropic Python SDK raises distinct exception types (RateLimitError, APIStatusError, APIConnectionError) that make classification direct. Build a single classify_anthropic_error() function and route everything through it so classification logic is never duplicated.
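The wait-time rules in that answer can be sketched as one function. This is an illustration under stated assumptions: `retry_delay_seconds` is a hypothetical helper, header keys are assumed to be lower-cased already, only the numeric form of Retry-After is handled, and the backoff cap of 30 seconds is an arbitrary choice.

```python
import random
from typing import Mapping, Optional


def retry_delay_seconds(
    status_code: int,
    headers: Mapping[str, str],
    attempt: int,
) -> Optional[float]:
    """Return how long to wait before retrying, or None if retrying cannot help."""
    if status_code == 429:
        retry_after = headers.get("retry-after")
        if retry_after is not None:
            try:
                return max(0.0, float(retry_after))
            except ValueError:
                pass                          # HTTP-date form is not handled in this sketch
        return random.uniform(30.0, 60.0)     # default wait when no header is present
    if status_code >= 500:
        # Exponential backoff with full jitter: attempt 0 -> up to 1s, 1 -> up to 2s, ...
        ceiling = min(2.0 ** attempt, 30.0)
        return random.uniform(0.0, ceiling)
    return None                               # 400/401/403 etc.: permanent, do not retry
```

Jitter matters here because many clients tend to fail at the same moment during a degradation, and identical backoff schedules would make them all retry at the same moment too.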

Q2: Describe the circuit breaker pattern and why it matters for AI services.

A circuit breaker monitors failures in a sliding window, and after exceeding a threshold "opens" - rejecting all subsequent requests immediately without calling the external service. After a recovery timeout, it "half-opens" and allows one test request. If that succeeds, it "closes" and normal operation resumes; if it fails, the circuit opens again and the recovery timer restarts. For AI products the stakes are particularly high: without a circuit breaker, a 20-minute Anthropic degradation can become a 40-minute full application outage because every user request holds a worker for a full 30-second timeout. Your thread pool exhausts. Your connection pools starve. Features that have nothing to do with AI start failing because they share the same process. A circuit breaker contains the blast radius: after opening, failed AI requests return in under 1ms with a clear user message, the rest of the application stays healthy, and the circuit automatically tests and restores service when Anthropic recovers. In distributed systems with multiple instances, store circuit state in Redis so that all pods share one circuit instead of each tracking failures independently.

Q3: How do you write user-facing error messages for AI failures?

Three principles: (1) Never expose internal details - no stack traces, API error codes, SDK error types, or model names. Users don't need to know it was a 529 from Anthropic. (2) Every message ends with a clear action - retry, edit input, contact support, or wait with a countdown. If the user doesn't know what to do next, the message failed its purpose. (3) Match the message to the error category - a rate limit message should show a countdown timer ("retry in 30s"); a transient error should say "retrying automatically" and show the retry happening; a content policy error should ask the user to rephrase without implying blame. Test every error message by reading it aloud to a non-technical colleague. If they don't immediately know what to do, rewrite it. Error messages are product copy, not engineering output.

Q4: How do you implement a fallback model chain?

Define a priority order: Opus → Sonnet → Haiku. On each attempt, use asyncio.wait_for() to enforce a timeout. On success, return the response and which model served it - log this for monitoring. On timeout, try the next model in the chain with a progressively shorter timeout (if Opus had 30s, Sonnet gets 15s, Haiku gets 10s). On transient errors (5xx, connection error), try the next model, sleeping first if a retry_after was specified. On content policy or auth errors, propagate immediately - the same error will occur on every model, so fallback hops are wasted. Log every fallback event and set up an alert when fallback usage exceeds 5% of production traffic - this is a primary model reliability signal. Return the model_used field in every response so you can correlate quality metrics with which model actually served each response.
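A compact sketch of that chain is shown below. To stay self-contained it takes the model call as an injected coroutine rather than using a real SDK client. `TransientError`, `NonRetryableError`, `call_with_fallback`, and the short model labels are all hypothetical stand-ins for illustration.

```python
import asyncio
from typing import Awaitable, Callable


class TransientError(Exception):
    """Stand-in for a 5xx or connection failure."""


class NonRetryableError(Exception):
    """Stand-in for content policy or auth failures, identical on every model."""


# (model label, timeout in seconds); the labels are placeholders, not real model IDs.
FALLBACK_CHAIN = [("opus", 30.0), ("sonnet", 15.0), ("haiku", 10.0)]


async def call_with_fallback(
    call_model: Callable[[str], Awaitable[str]],
    chain=FALLBACK_CHAIN,
) -> dict:
    """Try each model in order and report which one served the response."""
    last_error = None
    for model, timeout in chain:
        try:
            text = await asyncio.wait_for(call_model(model), timeout=timeout)
            return {"text": text, "model_used": model}
        except NonRetryableError:
            raise                              # falling back would hit the same error
        except (asyncio.TimeoutError, TransientError) as exc:
            last_error = exc                   # log the fallback event here, then move on
    raise RuntimeError("all models in the fallback chain failed") from last_error
```

Injecting `call_model` also makes the chain easy to unit test: a fake coroutine can fail for one label and succeed for the next.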

Q5: How do you handle partial response failures in streaming?

Streaming failures can occur mid-stream after some tokens are already sent to the client. You cannot change the HTTP status code after streaming starts. The approach: (1) Send errors as structured SSE events - data: {"type": "stream_interrupted", "user_message": "...", "retryable": true}. (2) Include a tokens_before_error field so the frontend knows how much content was received. (3) Never blank the screen - keep all partial content visible. (4) Show an inline error notice below the partial content: "Response was cut short. Click Retry to continue." (5) The retry button re-sends the original request from the beginning. In a future enhancement you can implement resumable generation by summarizing the partial response and sending it as context to the retry request, but the simple re-send covers 95% of cases adequately.

Q6: How do you test error handling paths without causing real production outages?

Several techniques: (1) Dependency injection - pass a mock client to your AI handler that raises specific exceptions. Write unit tests for each error category: transient, rate limit, timeout, content policy, auth. Verify user messages, retry decisions, and metrics are correct for each. (2) Staging with fault injection - configure a local proxy (e.g., toxiproxy) that sits in front of the Anthropic endpoint and returns controlled error responses on demand. (3) Feature flags with error simulation - a hidden request header such as X-Simulate-Error: timeout that makes the handler raise that error type, exercisable without code changes. (4) Load testing for thundering herd - send 500 concurrent requests to staging and verify the retry logic and circuit breaker handle synchronized retries correctly. (5) Chaos engineering - deliberately kill the AI service connection in staging during a load test and verify the circuit breaker opens, the fallback chain activates, and the degradation service serves from cache before falling to the minimal message.
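The dependency injection technique might look like the following. Everything here is a simplified stand-in: `FailingClient`, `FakeOverloadedError`, and `handle_request` are hypothetical, and a real test would raise the SDK's own exception types against your actual handler.

```python
import asyncio


class FakeOverloadedError(Exception):
    """Stand-in for a provider 529 response."""


class FailingClient:
    """Mock AI client that fails a fixed number of times, then succeeds."""

    def __init__(self, failures: int):
        self.failures = failures
        self.calls = 0

    async def complete(self, prompt: str) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise FakeOverloadedError("overloaded")
        return f"summary of: {prompt}"


async def handle_request(client, prompt: str, max_attempts: int = 3) -> dict:
    """Handler under test: retries transient failures and never leaks raw errors."""
    for attempt in range(1, max_attempts + 1):
        try:
            text = await client.complete(prompt)
            return {"ok": True, "text": text, "attempts": attempt}
        except FakeOverloadedError:
            continue                           # a real handler would back off here
    return {
        "ok": False,
        "user_message": "The assistant is busy. Please try again in a moment.",
        "attempts": max_attempts,
    }


# Two failures then success: the handler should recover on the third attempt.
recovered = asyncio.run(handle_request(FailingClient(failures=2), "loan file"))
assert recovered["ok"] and recovered["attempts"] == 3

# Persistent failure: the handler gives up and returns a user-safe message.
exhausted = asyncio.run(handle_request(FailingClient(failures=5), "loan file"))
assert not exhausted["ok"] and "overloaded" not in exhausted["user_message"]
```

Because the client is passed in, the same handler runs unchanged in production with the real client and in tests with the mock.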

© 2026 EngineersOfAI. All rights reserved.