

Idempotency and Retries

The Duplicate Email Incident

The email notification system had been running fine for two weeks when a user reported receiving the same AI-generated personalized newsletter three times. Then four more reports. By end of day: 847 users had received duplicates, some as many as five copies.

The post-mortem revealed a subtle cause. The pipeline used a queue-based architecture: a worker pulled a user record, called the LLM to generate the personalized email, and sent it via the email API. But calls to the email API were timing out intermittently because of a degraded third-party service. The worker, never receiving a 200 OK confirmation, would retry the entire pipeline - generate and send - again. And again. In fact the email had been delivered on the first attempt; the worker simply never saw the confirmation because the email API was too slow to respond, so every retry sent another copy.

The fix was conceptually simple: before retrying, check whether the operation had already succeeded. Before generating and sending an email, check whether this specific user has already received their newsletter this week. Make the "send email" operation idempotent - able to be called multiple times with the same effect as calling it once.

This is the core challenge of distributed LLM pipelines: every call can fail at any point, including after the work was actually done. Retrying without idempotency creates duplicate side effects. Understanding how to make LLM workflows both resilient and safe is fundamental to production reliability.

The Failure Taxonomy

LLM pipeline failures fall into four categories, each with a different recovery strategy:

| Error Type | Examples | Strategy | Safe to Retry? |
|---|---|---|---|
| Transient | 429, 503 | Retry with backoff | Yes, unconditionally |
| Permanent | 400, 401, 403, 404 | Fail immediately, alert | No |
| Ambiguous | Network timeout, connection reset, 504 | Check idempotency key first | Only after checking |
| Infrastructure | Process crash, OOM | Resume from checkpoint | After loading state |

The ambiguous case is the hardest. When a request times out, you do not know whether the server completed the work. A request can time out because the server received it and is still processing, because the server completed it but the response was lost on the network, or because the server never received it at all. Retrying without checking could mean executing the operation twice. This is why idempotency keys exist.
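As a rough sketch of how the categories might be encoded, the helper below maps a failure to a recovery category. Following the ambiguous-case reasoning above, it treats any timeout or connection reset as ambiguous. `classify_failure`, its flags, and the fallback rules for status codes not in the table (other 5xx treated as transient, everything else as permanent) are assumptions for illustration, not part of any library.

```python
from enum import Enum
from typing import Optional


class FailureKind(Enum):
    TRANSIENT = "transient"            # retry with backoff
    PERMANENT = "permanent"            # fail immediately, alert
    AMBIGUOUS = "ambiguous"            # check the idempotency key before retrying
    INFRASTRUCTURE = "infrastructure"  # resume from checkpoint


TRANSIENT_STATUSES = {429, 503}
PERMANENT_STATUSES = {400, 401, 403, 404}
AMBIGUOUS_STATUSES = {504}


def classify_failure(
    status: Optional[int] = None,
    timed_out: bool = False,
    connection_reset: bool = False,
    process_crashed: bool = False,
) -> FailureKind:
    """Map a failure to a recovery category (hypothetical helper)."""
    if process_crashed:
        return FailureKind.INFRASTRUCTURE
    if timed_out or connection_reset:
        # The server may or may not have done the work
        return FailureKind.AMBIGUOUS
    if status in TRANSIENT_STATUSES:
        return FailureKind.TRANSIENT
    if status in AMBIGUOUS_STATUSES:
        return FailureKind.AMBIGUOUS
    if status in PERMANENT_STATUSES:
        return FailureKind.PERMANENT
    # Assumption: other 5xx are transient, anything else is permanent
    if status is not None and status >= 500:
        return FailureKind.TRANSIENT
    return FailureKind.PERMANENT


def safe_to_retry_blindly(kind: FailureKind) -> bool:
    """Only transient failures may be retried without checking first."""
    return kind is FailureKind.TRANSIENT
```

The point of an explicit enum is that every call site has to decide what to do with the ambiguous case instead of silently lumping it in with "retry".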

Idempotency Keys: The Foundation

An idempotency key is a unique identifier for a specific operation. Before executing, check if this key was already processed. If yes, return the cached result. If no, execute and store the result with the key.

```python
import anthropic
import hashlib
import json
import time
from dataclasses import dataclass
from typing import Optional

client = anthropic.Anthropic()


@dataclass
class IdempotentResult:
    key: str
    content: str
    input_tokens: int
    output_tokens: int
    created_at: float
    was_cached: bool = False


# In-memory store for single-process use
# Replace with Redis or PostgreSQL for distributed systems
_idempotency_store: dict[str, IdempotentResult] = {}


def generate_idempotency_key(
    messages: list[dict],
    system: str = "",
    model: str = "",
    extra_context: dict | None = None,
) -> str:
    """
    Generate a deterministic idempotency key from request parameters.
    Same inputs → same key → same (cached) result returned.

    The key is a hash - it does not reveal the content of the request.
    Include extra_context for business-level keys (e.g., user_id + week_number).
    """
    payload = {
        "messages": messages,
        "system": system,
        "model": model,
        **(extra_context or {}),
    }
    # sort_keys=True ensures consistent ordering regardless of dict creation order
    content = json.dumps(payload, sort_keys=True, ensure_ascii=True)
    return hashlib.sha256(content.encode()).hexdigest()


def generate_business_key(user_id: str, operation: str, period: str) -> str:
    """
    Generate a business-level idempotency key.
    Derived from business identifiers rather than prompt text, so the same
    business action maps to the same key even if the prompt wording changes.
    For debugging, log the unhashed "user:operation:period" string alongside it.

    Examples:
    - generate_business_key("user_123", "weekly_email", "2025-W12")
    - generate_business_key("doc_456", "contract_analysis", "v2")
    """
    content = f"{user_id}:{operation}:{period}"
    return hashlib.sha256(content.encode()).hexdigest()


def call_with_idempotency(
    messages: list[dict],
    system: str = "",
    model: str = "claude-haiku-4-5-20251001",
    max_tokens: int = 500,
    idempotency_key: Optional[str] = None,
    cache_ttl_seconds: float = 3600,  # 1 hour default
) -> IdempotentResult:
    """
    Make an LLM call with idempotency guarantee.

    If this exact call has been made before (identified by idempotency_key),
    return the cached result without calling the API again.

    Args:
        messages: Conversation messages
        system: System prompt
        model: Model to use
        max_tokens: Max tokens in response
        idempotency_key: Custom key (auto-generated from inputs if not provided)
        cache_ttl_seconds: How long to cache results (0 = never expire)

    Returns:
        IdempotentResult with was_cached=True if result came from cache
    """
    key = idempotency_key or generate_idempotency_key(messages, system, model)

    # Check cache first
    if key in _idempotency_store:
        cached = _idempotency_store[key]
        age = time.time() - cached.created_at

        if cache_ttl_seconds == 0 or age < cache_ttl_seconds:
            return IdempotentResult(
                key=cached.key,
                content=cached.content,
                input_tokens=cached.input_tokens,
                output_tokens=cached.output_tokens,
                created_at=cached.created_at,
                was_cached=True,
            )

    # Cache miss - make the actual API call
    response = client.messages.create(
        model=model,
        max_tokens=max_tokens,
        system=system,
        messages=messages,
    )

    result = IdempotentResult(
        key=key,
        content=response.content[0].text,
        input_tokens=response.usage.input_tokens,
        output_tokens=response.usage.output_tokens,
        created_at=time.time(),
        was_cached=False,
    )

    # Store result before returning (in case next step fails)
    _idempotency_store[key] = result
    return result
```
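Why `sort_keys=True` matters: two dicts with the same content but different insertion order serialize to different JSON strings unless keys are sorted, and would therefore hash to different keys. A quick stdlib-only check, where `stable_key` is a hypothetical stand-in for `generate_idempotency_key` (the compact `separators` argument is an extra choice, not in the code above):

```python
import hashlib
import json


def stable_key(payload: dict) -> str:
    # sort_keys=True canonicalizes key order at every nesting level;
    # separators drops optional whitespace so formatting cannot vary
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=True)
    return hashlib.sha256(canonical.encode()).hexdigest()


a = {"model": "m", "messages": [{"role": "user", "content": "hi"}]}
b = {"messages": [{"content": "hi", "role": "user"}], "model": "m"}  # same content, other order
c = {"model": "m", "messages": [{"role": "user", "content": "hi!"}]}  # content changed

print(stable_key(a) == stable_key(b))  # True
print(stable_key(a) == stable_key(c))  # False
```

List order is still significant, which is what you want: reordering conversation turns is a different request.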

Production Idempotency with Redis

For distributed systems running multiple app instances, the idempotency store must be shared across all instances. An in-memory dict only works for single-process deployments:

```python
import anthropic
import hashlib
import json
import time
from typing import Optional
import redis as redis_lib

client = anthropic.Anthropic()


class RedisIdempotencyStore:
    """
    Redis-backed idempotency store for distributed systems.

    Key properties:
    - Shared across all service instances (unlike in-memory dict)
    - Survives process restarts (persisted in Redis)
    - TTL-based expiry (automatic cleanup)
    - Distributed lock support (prevents duplicate concurrent execution)

    Architecture:
    - Result key: "llm:idem:{hash}" → JSON result with TTL
    - Lock key: "llm:idem:lock:{hash}" → "locked" with 2min TTL
    """

    def __init__(self, redis_client: redis_lib.Redis, key_prefix: str = "llm:idem:"):
        self.redis = redis_client
        self.key_prefix = key_prefix

    def get(self, idempotency_key: str) -> Optional[dict]:
        """Get a cached result. Returns None if not found or expired."""
        redis_key = f"{self.key_prefix}{idempotency_key}"
        data = self.redis.get(redis_key)
        if data is None:
            return None
        return json.loads(data)

    def set(
        self,
        idempotency_key: str,
        result: dict,
        ttl_seconds: int = 3600,
    ) -> None:
        """Store a result with automatic expiry."""
        redis_key = f"{self.key_prefix}{idempotency_key}"
        self.redis.setex(redis_key, ttl_seconds, json.dumps(result))

    def acquire_lock(
        self,
        idempotency_key: str,
        lock_ttl_seconds: int = 120,
    ) -> bool:
        """
        Acquire a distributed lock for this idempotency key.
        Returns True if we acquired the lock, False if already locked.

        Uses Redis SET NX (set if not exists) - atomic operation.
        The lock expires automatically after lock_ttl_seconds to prevent
        deadlocks if the holding process crashes.

        This prevents two workers from simultaneously processing the same item
        when they both check the cache and find a miss at the same time.
        """
        lock_key = f"{self.key_prefix}lock:{idempotency_key}"
        # redis-py returns True when the key was set, None when it already existed
        acquired = self.redis.set(lock_key, "locked", nx=True, ex=lock_ttl_seconds)
        return bool(acquired)

    def release_lock(self, idempotency_key: str) -> None:
        """
        Release the lock for an idempotency key.

        Caveat: this deletes unconditionally. If our lock already expired and
        another worker acquired it, this would delete *their* lock. Stricter
        implementations store a unique token and delete only if it still
        matches, atomically (e.g. via a Lua script).
        """
        lock_key = f"{self.key_prefix}lock:{idempotency_key}"
        self.redis.delete(lock_key)

    def is_locked(self, idempotency_key: str) -> bool:
        """Check if a lock is held (without acquiring it)."""
        lock_key = f"{self.key_prefix}lock:{idempotency_key}"
        return self.redis.exists(lock_key) > 0


def distributed_idempotent_call(
    messages: list[dict],
    system: str,
    model: str,
    max_tokens: int,
    store: RedisIdempotencyStore,
    idempotency_key: Optional[str] = None,
    cache_ttl_seconds: int = 3600,
    lock_wait_seconds: float = 10.0,
) -> dict:
    """
    Idempotent LLM call with distributed Redis-backed deduplication.

    Deduplicates concurrent calls from multiple workers in the common case.
    This is not a strict exactly-once guarantee: if the lock holder takes
    longer than lock_wait_seconds or crashes, a second worker may run the
    same call (the worst case is two workers computing the same result).

    Flow:
    1. Check result cache → return if found
    2. Acquire distributed lock → prevents concurrent duplicate execution
       (if another worker holds it, wait for that worker's result)
    3. Re-check result cache (in case another worker just finished)
    4. Execute LLM call
    5. Store result in cache
    6. Release lock (only if we acquired it)
    """
    key = idempotency_key or hashlib.sha256(
        json.dumps(
            {"messages": messages, "system": system, "model": model},
            sort_keys=True,
        ).encode()
    ).hexdigest()

    # Step 1: Check result cache (fast path - O(1) Redis GET)
    cached = store.get(key)
    if cached is not None:
        return {**cached, "cached": True}

    # Step 2: Acquire distributed lock
    lock_acquired = store.acquire_lock(key, lock_ttl_seconds=120)

    if not lock_acquired:
        # Another worker is processing this key - poll for its result
        print(f"Lock held by another worker for key {key[:8]}. Waiting...")
        deadline = time.monotonic() + lock_wait_seconds
        while time.monotonic() < deadline:
            cached = store.get(key)
            if cached is not None:
                return {**cached, "cached": True}
            if not store.is_locked(key):
                break  # Holder released the lock (or it expired) without a result
            time.sleep(0.5)

        # Still no result: try to take the lock ourselves. If that fails too,
        # proceed without it (worst case: two workers compute the same result).
        lock_acquired = store.acquire_lock(key, lock_ttl_seconds=120)

    try:
        # Step 3: Double-check cache after acquiring lock (prevents TOCTOU race)
        # Another worker may have completed between steps 1 and 2
        cached = store.get(key)
        if cached is not None:
            return {**cached, "cached": True}

        # Step 4: Execute the LLM call
        response = client.messages.create(
            model=model,
            max_tokens=max_tokens,
            system=system,
            messages=messages,
        )

        result = {
            "content": response.content[0].text,
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
            "completed_at": time.time(),
            "cached": False,
        }

        # Step 5: Store result BEFORE returning (so other workers can use it)
        store.set(key, result, ttl_seconds=cache_ttl_seconds)
        return result

    finally:
        # Step 6: Release the lock even if the call failed - but only if we hold it;
        # deleting a lock we never acquired would free another worker's lock
        if lock_acquired:
            store.release_lock(key)
```
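The unconditional `DEL` in `release_lock` can free another worker's lock if ours expired first. The Redis documentation describes the usual remedy: set the lock to a random value with `SET ... NX EX`, and release it only if the value still matches, using a Lua script so the check and the delete are atomic. The sketch below simulates those semantics in-process so they can be run without a Redis server; `FakeLockStore` and its simulated clock are hypothetical, illustration-only stand-ins, not the redis-py API.

```python
import uuid
from typing import Optional


class FakeLockStore:
    """In-process stand-in for Redis SET NX EX plus compare-and-delete (illustration only)."""

    def __init__(self) -> None:
        self._locks: dict[str, tuple[str, float]] = {}  # key -> (token, expires_at)
        self.now = 0.0  # simulated clock, advanced manually

    def _live(self, key: str) -> Optional[tuple[str, float]]:
        entry = self._locks.get(key)
        if entry is not None and entry[1] <= self.now:
            del self._locks[key]  # expired, like a Redis TTL
            return None
        return entry

    def acquire(self, key: str, ttl: float) -> Optional[str]:
        """Like SET key <token> NX EX ttl: returns a token on success, None if held."""
        if self._live(key) is not None:
            return None
        token = uuid.uuid4().hex
        self._locks[key] = (token, self.now + ttl)
        return token

    def release(self, key: str, token: str) -> bool:
        """Delete only if we still own the lock. In Redis this must be atomic (Lua script)."""
        entry = self._live(key)
        if entry is None or entry[0] != token:
            return False
        del self._locks[key]
        return True


store = FakeLockStore()
token_a = store.acquire("job-1", ttl=120)  # worker A takes the lock
store.now += 121                           # A stalls past the TTL; the lock expires
token_b = store.acquire("job-1", ttl=120)  # worker B takes over
print(store.release("job-1", token_a))     # False: A cannot delete B's lock
print(store.release("job-1", token_b))     # True
```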

Retry Strategies in Depth

Strategy 1: Exponential Backoff with Jitter

```python
import asyncio
import anthropic
import random
from typing import TypeVar, Callable, Awaitable
from dataclasses import dataclass

T = TypeVar("T")
# The SDK retries some errors on its own (max_retries defaults to 2); disable that
# so with_retry is the only retry layer and attempt counts are accurate
async_client = anthropic.AsyncAnthropic(max_retries=0)

# Classify errors by retry behavior
RETRYABLE_ERRORS = (
    anthropic.RateLimitError,       # 429: too many requests - definitely retry
    anthropic.APIConnectionError,   # Network issue - retry
    anthropic.APITimeoutError,      # Timeout - ambiguous, but safe for a side-effect-free LLM call
    anthropic.InternalServerError,  # 5xx (e.g. 500, 503) - server-side issue, retry
)
NON_RETRYABLE_ERRORS = (
    anthropic.AuthenticationError,    # 401: fix the API key
    anthropic.PermissionDeniedError,  # 403: check permissions
    anthropic.NotFoundError,          # 404: wrong endpoint or model name
    anthropic.BadRequestError,        # 400: fix the request (context too long, etc.)
)


@dataclass
class RetryConfig:
    """Configuration for retry behavior."""
    max_retries: int = 5
    base_delay: float = 1.0    # Initial delay in seconds
    max_delay: float = 60.0    # Cap on delay
    backoff_multiplier: float = 2.0
    jitter_type: str = "full"  # "full", "equal", "decorrelated"

    # Delays at each attempt (base_delay=1, multiplier=2):
    # Attempt 1 fail: wait ~1s (full jitter: 0-1s)
    # Attempt 2 fail: wait ~2s (full jitter: 0-2s)
    # Attempt 3 fail: wait ~4s (full jitter: 0-4s)
    # Attempt 4 fail: wait ~8s (full jitter: 0-8s)
    # Attempt 5 fail: wait ~16s (full jitter: 0-16s)
    # → gives up after 5 retries (6 attempts in total)


async def with_retry(
    fn: Callable[[], Awaitable[T]],
    config: RetryConfig = RetryConfig(),
) -> T:
    """
    Execute an async function with configurable retry logic.

    Jitter prevents the "thundering herd" problem where all retrying
    clients hit the API simultaneously after a service disruption.

    Rate limit headers: If the API returns a Retry-After header,
    we respect it (wait at least that long before retrying).
    """
    last_exception = None
    prev_delay = config.base_delay

    for attempt in range(config.max_retries + 1):
        try:
            return await fn()

        except NON_RETRYABLE_ERRORS:
            # Never retry permanent errors - fail immediately
            raise

        except RETRYABLE_ERRORS as e:
            last_exception = e

            if attempt == config.max_retries:
                print(f"All {config.max_retries} retries exhausted. Giving up.")
                break

            # Exponential base delay
            exp_delay = min(
                config.base_delay * (config.backoff_multiplier ** attempt),
                config.max_delay,
            )

            # Apply jitter strategy
            if config.jitter_type == "full":
                # Fully randomized: spreads retries maximally
                delay = random.uniform(0, exp_delay)
            elif config.jitter_type == "equal":
                # Guaranteed minimum: predictable lower bound
                delay = exp_delay / 2 + random.uniform(0, exp_delay / 2)
            elif config.jitter_type == "decorrelated":
                # From AWS's backoff analysis: next delay is drawn relative to the previous one
                delay = min(config.max_delay, random.uniform(config.base_delay, prev_delay * 3))
                prev_delay = delay
            else:
                delay = exp_delay

            # Respect the Retry-After header on rate limit errors.
            # The SDK exception has no retry_after attribute; read the raw HTTP response.
            if isinstance(e, anthropic.RateLimitError):
                retry_after = e.response.headers.get("retry-after")
                if retry_after:
                    try:
                        delay = max(delay, float(retry_after))
                        print(f"Rate limited. Respecting Retry-After: {retry_after}s")
                    except ValueError:
                        pass  # HTTP-date form: keep the computed delay

            print(
                f"Attempt {attempt + 1}/{config.max_retries + 1} failed "
                f"({type(e).__name__}). Retry in {delay:.2f}s..."
            )
            await asyncio.sleep(delay)

    raise last_exception


# Usage with default config (5 retries, exponential backoff, full jitter)
async def robust_call(messages: list[dict], system: str = "") -> str:
    async def call():
        response = await async_client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=500,
            system=system,
            messages=messages,
        )
        return response.content[0].text

    return await with_retry(call)


# Aggressive retry for critical operations
aggressive_config = RetryConfig(
    max_retries=8,
    base_delay=0.5,
    max_delay=120.0,
    jitter_type="decorrelated",
)
```
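To make the jitter formulas in `with_retry` concrete, the sketch below factors them into a pure function with a seeded RNG, so the schedules can be inspected and tested without calling an API. `backoff_delay` is a hypothetical helper, not part of the code above; the `"none"` option exists only to show the un-jittered schedule.

```python
import random


def backoff_delay(
    attempt: int,
    jitter: str,
    rng: random.Random,
    base: float = 1.0,
    cap: float = 60.0,
    multiplier: float = 2.0,
    prev: float = 1.0,
) -> float:
    """Delay before the retry following 0-based `attempt`, using with_retry's formulas."""
    exp_delay = min(base * multiplier ** attempt, cap)
    if jitter == "none":
        return exp_delay
    if jitter == "full":
        return rng.uniform(0, exp_delay)
    if jitter == "equal":
        return exp_delay / 2 + rng.uniform(0, exp_delay / 2)
    if jitter == "decorrelated":
        # Depends on the previous delay, not on the attempt number
        return min(cap, rng.uniform(base, prev * 3))
    raise ValueError(f"unknown jitter type: {jitter}")


rng = random.Random(42)
print([backoff_delay(a, "none", rng) for a in range(5)])  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

Full jitter can return anything from 0 up to the exponential value, equal jitter keeps at least half of it, and decorrelated jitter wanders between `base` and three times the previous delay, capped at `cap`.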

Strategy 2: Circuit Breaker

When the LLM API is genuinely degraded (not just rate-limited), retrying just accumulates timeouts. The circuit breaker prevents this:

```python
import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Awaitable


class CircuitState(Enum):
    CLOSED = "closed"        # Normal: calls pass through
    OPEN = "open"            # Degraded: fail fast without calling
    HALF_OPEN = "half_open"  # Recovering: trial calls pass through (concurrency not limited here)


@dataclass
class CircuitBreaker:
    """
    Circuit breaker for external API calls.

    Without circuit breaker: 1000 queued calls each wait 30s timeout
    = 30,000s of accumulated wait, exhausted thread pool, cascading failures.

    With circuit breaker: after 5 timeouts in 60s, remaining 995 calls
    fail in microseconds (fast fail), allowing the system to recover
    or route to a fallback.

    The pattern was popularized by Michael Nygard in "Release It!" (2007).
    """
    failure_threshold: int = 5     # Open after N failures within the window
    success_threshold: int = 2     # Close after N successes in half-open
    timeout_seconds: float = 60.0  # How long to stay OPEN before trying half-open
    window_seconds: float = 60.0   # Sliding window for counting failures

    _state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    _failure_times: list[float] = field(default_factory=list, init=False)
    _last_opened_at: float = field(default=0.0, init=False)
    _half_open_successes: int = field(default=0, init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)
    _calls_blocked: int = field(default=0, init=False)
    _calls_total: int = field(default=0, init=False)

    def _prune_failures(self) -> None:
        """Drop failures that fall outside the sliding window."""
        cutoff = time.monotonic() - self.window_seconds
        self._failure_times = [t for t in self._failure_times if t > cutoff]

    async def call(self, fn: Callable[[], Awaitable[Any]]) -> Any:
        """Execute fn through the circuit breaker."""
        async with self._lock:
            self._calls_total += 1

            if self._state == CircuitState.OPEN:
                elapsed = time.monotonic() - self._last_opened_at
                if elapsed >= self.timeout_seconds:
                    self._state = CircuitState.HALF_OPEN
                    self._half_open_successes = 0
                    print(f"[Circuit] OPEN → HALF_OPEN after {elapsed:.0f}s")
                else:
                    self._calls_blocked += 1
                    remaining = self.timeout_seconds - elapsed
                    raise RuntimeError(
                        f"Circuit OPEN: failing fast. "
                        f"Retry in {remaining:.0f}s. "
                        f"({self._calls_blocked} calls blocked)"
                    )

        # fn runs outside the lock so concurrent calls are not serialized
        try:
            result = await fn()

            async with self._lock:
                if self._state == CircuitState.HALF_OPEN:
                    self._half_open_successes += 1
                    if self._half_open_successes >= self.success_threshold:
                        self._state = CircuitState.CLOSED
                        self._failure_times.clear()
                        print(f"[Circuit] HALF_OPEN → CLOSED (recovered after {self._half_open_successes} successes)")

            return result

        except Exception:
            async with self._lock:
                self._failure_times.append(time.monotonic())
                self._prune_failures()

                if self._state == CircuitState.HALF_OPEN:
                    self._state = CircuitState.OPEN
                    self._last_opened_at = time.monotonic()
                    print("[Circuit] HALF_OPEN → OPEN (recovery failed)")

                elif len(self._failure_times) >= self.failure_threshold:
                    self._state = CircuitState.OPEN
                    self._last_opened_at = time.monotonic()
                    print(
                        f"[Circuit] CLOSED → OPEN "
                        f"({len(self._failure_times)} failures in {self.window_seconds}s)"
                    )
            raise

    @property
    def state(self) -> CircuitState:
        return self._state

    def stats(self) -> dict:
        self._prune_failures()
        return {
            "state": self._state.value,
            "recent_failures": len(self._failure_times),
            "calls_total": self._calls_total,
            "calls_blocked": self._calls_blocked,
            "block_rate": self._calls_blocked / max(self._calls_total, 1),
        }
```
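The three states are easier to see with the async plumbing stripped away. Below is a simplified synchronous sketch of the same state machine. Unlike `CircuitBreaker` above, it counts consecutive failures instead of using a sliding window, and it takes an injectable `clock` (an assumption added here) so the transitions can be exercised without sleeping. `SimpleBreaker` is hypothetical, not a library class.

```python
import time
from typing import Callable


class SimpleBreaker:
    """Synchronous circuit-breaker state machine with an injectable clock (sketch)."""

    def __init__(self, failure_threshold: int = 3, open_seconds: float = 30.0,
                 success_threshold: int = 2, clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.open_seconds = open_seconds
        self.success_threshold = success_threshold
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def allow(self) -> bool:
        """True if a call may proceed now; moves OPEN → HALF_OPEN after the cool-down."""
        if self.state == "open":
            if self.clock() - self.opened_at < self.open_seconds:
                return False  # fail fast
            self.state, self.successes = "half_open", 0
        return True

    def record_success(self) -> None:
        if self.state == "half_open":
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state, self.failures = "closed", 0
        else:
            self.failures = 0  # only consecutive failures count

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state, self.opened_at = "open", self.clock()


now = [0.0]
breaker = SimpleBreaker(clock=lambda: now[0])
for _ in range(3):
    assert breaker.allow()
    breaker.record_failure()
print(breaker.state, breaker.allow())  # open False
now[0] += 31
print(breaker.allow(), breaker.state)  # True half_open
breaker.record_success()
breaker.record_success()
print(breaker.state)                   # closed
```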

Workflow Idempotency: Multi-Step Pipelines

For pipelines with multiple dependent steps, track state at each step individually. A crash mid-pipeline resumes from the last completed step rather than restarting from scratch:

```python
import anthropic
import json
import time
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Callable

client = anthropic.Anthropic()


class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class StepRecord:
    """Record of a single step's execution."""
    name: str
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: str | None = None
    started_at: float | None = None
    completed_at: float | None = None
    attempt: int = 0


class IdempotentWorkflow:
    """
    Multi-step workflow with step-level idempotency.

    Key behaviors:
    - Completed steps are skipped on re-run (return cached result)
    - Failed steps are re-attempted on the next run
    - State is persisted after each step (crash-safe)
    - Steps can depend on previous steps' results
    - Step results must be JSON-serializable (they are saved to disk)

    Use for any multi-step LLM pipeline that might fail mid-way:
    - Document analysis pipelines (extract → classify → summarize → report)
    - Data enrichment (fetch → analyze → store → notify)
    - Report generation (gather data → generate sections → compile → publish)
    """

    def __init__(self, workflow_id: str, state_dir: str = "./workflows"):
        self.workflow_id = workflow_id
        self.state_dir = Path(state_dir)
        self.state_dir.mkdir(parents=True, exist_ok=True)
        self.state_file = self.state_dir / f"{workflow_id}.json"
        self.steps: list[StepRecord] = []
        self._load()

    def _load(self) -> None:
        """Load existing state if it exists."""
        if self.state_file.exists():
            try:
                data = json.loads(self.state_file.read_text())
                self.steps = [
                    StepRecord(
                        name=s["name"],
                        status=StepStatus(s["status"]),
                        result=s.get("result"),
                        error=s.get("error"),
                        started_at=s.get("started_at"),
                        completed_at=s.get("completed_at"),
                        attempt=s.get("attempt", 0),
                    )
                    for s in data.get("steps", [])
                ]
                print(f"[Workflow {self.workflow_id}] Loaded existing state: {len(self.steps)} steps")
            except Exception as e:
                print(f"[Workflow {self.workflow_id}] Failed to load state: {e}. Starting fresh.")

    def _save(self) -> None:
        """Persist current state atomically."""
        data = {
            "workflow_id": self.workflow_id,
            "steps": [
                {
                    "name": s.name,
                    "status": s.status.value,
                    "result": s.result,
                    "error": s.error,
                    "started_at": s.started_at,
                    "completed_at": s.completed_at,
                    "attempt": s.attempt,
                }
                for s in self.steps
            ],
            "saved_at": time.time(),
        }
        # Atomic write: write to a temp file, then swap it into place in one step.
        # Path.replace overwrites on all platforms; Path.rename raises on Windows
        # if the target already exists.
        tmp_path = self.state_file.with_suffix(".tmp")
        tmp_path.write_text(json.dumps(data, indent=2))
        tmp_path.replace(self.state_file)

    def _get_step(self, name: str) -> StepRecord | None:
        return next((s for s in self.steps if s.name == name), None)

    def run_step(
        self,
        name: str,
        fn: Callable[..., Any],
        *args,
        max_attempts: int = 3,
        **kwargs,
    ) -> Any:
        """
        Execute a workflow step with idempotency.

        If the step previously completed successfully, return the cached result.
        If the step failed (or was left RUNNING by a crashed process), re-attempt it.
        This method does not loop: a failure is raised, and the step is retried
        the next time the workflow runs, up to max_attempts attempts in total.

        Args:
            name: Unique name for this step
            fn: Synchronous function to execute (an async function would return
                an un-awaited coroutine, which cannot be saved as a result)
            max_attempts: Maximum total attempts across runs (not retries)
            *args, **kwargs: Arguments to pass to fn

        Returns:
            The result of fn(*args, **kwargs)
        """
        step = self._get_step(name)

        # Idempotency check: skip completed steps
        if step and step.status == StepStatus.COMPLETED:
            print(f"[Step '{name}'] Already completed - using cached result")
            return step.result

        # Check if we've exceeded max attempts
        if step and step.attempt >= max_attempts:
            raise RuntimeError(
                f"Step '{name}' failed {step.attempt} times. "
                f"Max attempts ({max_attempts}) exceeded. Manual intervention required. "
                f"Last error: {step.error}"
            )

        # Create or update step record
        if not step:
            step = StepRecord(name=name)
            self.steps.append(step)

        step.status = StepStatus.RUNNING
        step.started_at = time.time()
        step.attempt += 1
        self._save()

        print(f"[Step '{name}'] Executing (attempt {step.attempt}/{max_attempts})")

        try:
            result = fn(*args, **kwargs)

            step.status = StepStatus.COMPLETED
            step.result = result
            step.completed_at = time.time()
            step.error = None
            self._save()

            duration = step.completed_at - step.started_at
            print(f"[Step '{name}'] Completed in {duration:.1f}s")
            return result

        except Exception as e:
            step.status = StepStatus.FAILED
            step.error = str(e)
            step.completed_at = time.time()
            self._save()

            print(f"[Step '{name}'] Failed (attempt {step.attempt}): {e}")
            raise

    def get_summary(self) -> dict:
        """Get workflow execution summary."""
        return {
            "workflow_id": self.workflow_id,
            "total_steps": len(self.steps),
            "completed": sum(1 for s in self.steps if s.status == StepStatus.COMPLETED),
            "failed": sum(1 for s in self.steps if s.status == StepStatus.FAILED),
            "pending": sum(1 for s in self.steps if s.status == StepStatus.PENDING),
            "steps": [
                {
                    "name": s.name,
                    "status": s.status.value,
                    "attempts": s.attempt,
                    "error": s.error,
                }
                for s in self.steps
            ],
        }


# Real-world usage: document analysis pipeline
def run_document_analysis_pipeline(document_id: str, document_text: str) -> dict:
    """
    Three-step analysis pipeline. Safe to run multiple times.
    If it crashes at step 2, re-running will skip step 1 and resume at step 2.
    """
    workflow = IdempotentWorkflow(f"doc_analysis_{document_id}")

    # Step 1: Extract entities
    entities = workflow.run_step(
        "extract_entities",
        fn=lambda: client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=200,
            messages=[{
                "role": "user",
                "content": f"List the 5 main entities in this document:\n{document_text[:1000]}",
            }],
        ).content[0].text,
    )

    # Step 2: Classify document type (uses step 1 output)
    classification = workflow.run_step(
        "classify_document",
        fn=lambda: client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=50,
            messages=[{
                "role": "user",
                "content": (
                    f"Classify this document type based on these entities: {entities}\n"
                    f"One word classification:"
                ),
            }],
        ).content[0].text.strip(),
    )

    # Step 3: Generate report (uses both previous outputs)
    report = workflow.run_step(
        "generate_report",
        fn=lambda: client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=400,
            messages=[{
                "role": "user",
                "content": (
                    f"Write a 2-paragraph analysis report.\n"
                    f"Document type: {classification}\n"
                    f"Key entities: {entities}"
                ),
            }],
        ).content[0].text,
    )

    return {
        "document_id": document_id,
        "entities": entities,
        "classification": classification,
        "report": report,
        "workflow_summary": workflow.get_summary(),
    }
```
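`IdempotentWorkflow._save` relies on write-then-rename. If the checkpoint also has to survive a power loss (not just a process crash), a slightly more defensive sketch flushes and fsyncs the temp file before swapping it in, and creates the temp file in the same directory, because `os.replace` is only atomic within one filesystem. `atomic_write_json` is a hypothetical helper; fully durable renames on POSIX also require fsyncing the containing directory, which is omitted here.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path: Path, data: dict) -> None:
    """Readers see either the old file or the complete new one, never a partial write."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target, so the final rename stays on one filesystem
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before the rename
        os.replace(tmp_name, path)  # atomic swap; overwrites an existing target
    except BaseException:
        # Remove the temp file if anything failed before the rename
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise


target = Path(tempfile.mkdtemp()) / "doc_analysis_42.json"
atomic_write_json(target, {"workflow_id": "doc_analysis_42", "steps": []})
print(json.loads(target.read_text())["workflow_id"])  # doc_analysis_42
```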

Dead Letter Queue Pattern

When items fail after all retries, do not lose them - route to a DLQ for later investigation:

```python
import json
from collections import Counter
from dataclasses import asdict, dataclass
from pathlib import Path


@dataclass
class DeadLetterItem:
    """An item that exhausted all retry attempts."""
    item_id: str
    original_payload: dict
    failure_reason: str
    failure_count: int
    first_failed_at: float
    last_failed_at: float
    error_type: str


class DeadLetterQueue:
    """
    Routes permanently-failed items to a DLQ file for investigation.
    Items in the DLQ can be:
    1. Investigated manually to understand why they failed
    2. Re-processed after fixing the underlying issue
    3. Discarded if the failure is expected (e.g., content policy)
    """

    def __init__(self, dlq_file: str):
        self.dlq_path = Path(dlq_file)

    def add(self, item: DeadLetterItem) -> None:
        """Append an item to the DLQ (JSONL format: one JSON object per line)."""
        with open(self.dlq_path, "a") as f:
            f.write(json.dumps(asdict(item)) + "\n")

    def get_all(self) -> list[DeadLetterItem]:
        """Load all DLQ items for analysis."""
        if not self.dlq_path.exists():
            return []
        items = []
        with open(self.dlq_path) as f:
            for line in f:
                try:
                    data = json.loads(line)
                    items.append(DeadLetterItem(**data))
                except (json.JSONDecodeError, TypeError):
                    continue  # Skip corrupt or truncated lines
        return items

    def get_error_distribution(self) -> dict:
        """Analyze error types across all DLQ items."""
        items = self.get_all()
        return {
            "total": len(items),
            "by_error_type": dict(Counter(i.error_type for i in items)),
        }
```
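What feeds the DLQ is a retry loop that, on its final failure, records the item instead of dropping it. A minimal synchronous sketch, assuming a plain callable for the work and no delay between attempts (in practice you would combine this with `with_retry` or a backoff). `process_with_dlq` is hypothetical; it writes records with the same fields as `DeadLetterItem`, so `DeadLetterQueue.get_all` can read them back.

```python
import json
import tempfile
import time
from pathlib import Path
from typing import Any, Callable, Optional


def process_with_dlq(
    item_id: str,
    payload: dict,
    fn: Callable[[dict], Any],
    dlq_path: Path,
    max_attempts: int = 3,
) -> Optional[Any]:
    """Try fn(payload) up to max_attempts times; on final failure append a JSONL DLQ record."""
    first_failed_at: Optional[float] = None
    for attempt in range(1, max_attempts + 1):
        try:
            return fn(payload)
        except Exception as e:
            now = time.time()
            if first_failed_at is None:
                first_failed_at = now
            if attempt == max_attempts:
                record = {
                    "item_id": item_id,
                    "original_payload": payload,
                    "failure_reason": str(e),
                    "failure_count": attempt,
                    "first_failed_at": first_failed_at,
                    "last_failed_at": now,
                    "error_type": type(e).__name__,
                }
                with open(dlq_path, "a") as f:
                    f.write(json.dumps(record) + "\n")
    return None  # the item was dead-lettered, not lost


def always_fails(payload: dict) -> str:
    raise ValueError("content policy refusal")


dlq_file = Path(tempfile.mkdtemp()) / "dlq.jsonl"
result = process_with_dlq("item-7", {"text": "hello"}, always_fails, dlq_file)
print(result)                                             # None
print(json.loads(dlq_file.read_text())["failure_count"])  # 3
```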

Common Mistakes

:::danger Retrying Non-Idempotent Operations Without Checking
If an operation has side effects (send email, charge a payment card, create a database record), retrying without first checking whether it already succeeded creates duplicates. Always check "did this already happen?" before retrying any side-effecting operation. The idempotency key pattern above is the solution.
:::

:::danger No Jitter on Exponential Backoff
Without jitter, all retrying workers back off to the same intervals and retry simultaneously - the "thundering herd" problem. This spikes load exactly when the service is trying to recover, creating a retry loop. Always add jitter. Use "full" jitter for maximum spread, or "decorrelated" jitter, one of the variants compared in AWS's analysis of backoff and jitter.
:::

:::danger Retrying Permanent Errors
BadRequestError (400), AuthenticationError (401), and PermissionDeniedError (403) will never succeed on retry. Retrying them burns API quota and wastes time. Explicitly categorize your error types and skip retry for permanent failures. An alert on permanent errors is more valuable than a retry loop.
:::

:::warning Using In-Memory State for Distributed Systems
An idempotency store in a Python dict only works for single-process deployments. With multiple app instances (load balancer, multiple workers), each instance has its own dict - no sharing. In production, idempotency state must be in Redis or a database accessible to all instances.
:::

:::tip Idempotency Keys for All Critical Operations
If an LLM call drives a significant business action - sends a notification, creates a user account, charges a customer, writes a record - attach an idempotency key derived from the business entity ID and the operation. This makes retries safe everywhere and eliminates entire categories of distributed systems bugs.
:::

Interview Q&A

Q1: What is idempotency and why is it important for LLM pipelines?

Idempotency means that executing an operation multiple times produces the same result as executing it once. It is critical for LLM pipelines because failures in distributed systems are normal - network timeouts, rate limits, process crashes - and recovery requires retrying. Without idempotency, retries cause duplicate execution: emails sent twice, records created twice, charges made twice. With idempotency, retries are safe because the system detects "this was already done" and returns the cached result. The implementation is an idempotency key - a unique identifier for each operation - stored with the result in Redis or a database. All subsequent calls with the same key return the cached result without re-executing. The 847 duplicate emails in the incident above could have been prevented with a single idempotency check before sending.

Q2: What is the "thundering herd" problem and how does jitter solve it?

When many clients simultaneously experience failures (a service restart causes 1,000 clients to get connection errors at the same moment), they all back off and retry at the same intervals if using deterministic exponential backoff. With a 2x multiplier: all 1,000 clients retry at t+1s simultaneously, creating a spike that overwhelms the recovering service. The service fails again. All clients retry at t+2s. Another spike. This cycle can prevent recovery indefinitely. Jitter breaks the synchrony by randomizing the wait time. Instead of all 1,000 clients waiting exactly 4 seconds for their third retry, they wait uniformly between 0 and 4 seconds - spreading the load over 4 seconds (~250 clients per second) rather than all arriving at once. AWS's analysis of backoff strategies also describes "decorrelated jitter", in which each wait is drawn relative to the previous wait rather than the attempt number, so a client's successive retries are not locked to a fixed schedule.
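The ~250-per-second figure can be checked with a small simulation: 1,000 clients fail together at t=0, and we count how many retry in each 1-second bucket with and without full jitter. `retry_histogram` is a hypothetical helper; the seed only makes the run reproducible.

```python
import random
from collections import Counter


def retry_histogram(n_clients: int, wait: float, jitter: bool, seed: int = 0) -> Counter:
    """Count retries per 1-second bucket after a shared failure at t=0."""
    rng = random.Random(seed)
    times = [rng.uniform(0, wait) if jitter else wait for _ in range(n_clients)]
    return Counter(int(t) for t in times)


no_jitter = retry_histogram(1000, wait=4.0, jitter=False)
full_jitter = retry_histogram(1000, wait=4.0, jitter=True)
print(dict(no_jitter))            # {4: 1000}: everyone retries at the same instant
print(max(full_jitter.values()))  # far below 1000: the load is spread over 0-4s
```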

Q3: How does a circuit breaker help LLM applications and when should you use one?

A circuit breaker monitors failure rates and stops making calls to a degraded service, allowing it to recover. Three states: CLOSED (normal), OPEN (fail fast), HALF_OPEN (testing recovery). When failures exceed a threshold, the circuit opens. Subsequent calls fail immediately without hitting the API - protecting the service from overload and protecting your system from accumulated timeouts. After a cool-down period, the circuit half-opens and lets trial calls through; enough successes close it again, and a failure reopens it. Use circuit breakers when: LLM API calls are in the critical path of user requests (a 30s cascade of timeouts blocks users), you have fallback logic (route to a different model when the primary is down), or you need fast failure detection for monitoring and alerting. Do not use circuit breakers for batch processing where you want to retry exhaustively rather than fail fast.

Q4: How do you design an LLM pipeline that is resilient to process crashes mid-execution?

Use step-level idempotency with persistent state. The IdempotentWorkflow pattern above demonstrates this: (1) Before each step, check if it previously completed and return the cached result if so; (2) Save workflow state to disk after each step completes (use atomic rename to prevent corruption); (3) Track step status: PENDING, RUNNING, COMPLETED, FAILED; (4) On restart, load the saved state and skip COMPLETED steps. The key insight: "RUNNING" status in saved state means the process crashed during that step - it should be re-attempted, not assumed complete. Set a reasonable max_attempts to prevent infinite retry loops on systematically failing steps.

Q5: What is the difference between at-least-once and exactly-once delivery, and when does each matter?

At-least-once: the operation is guaranteed to execute at least once, but may execute multiple times due to retries. This is the default for most distributed systems - it is simpler to implement. Acceptable when: the operation is idempotent (reading, querying, overwriting) or when duplicates are harmless (generating a report that will be reviewed before use). Exactly-once: the operation takes effect exactly once even in the presence of failures. Strictly speaking, exactly-once delivery is not achievable over an unreliable network; in practice it is built from at-least-once delivery plus idempotent processing (sometimes called "effectively once"), which requires idempotency keys, distributed locks, and careful state management. Required when: the operation has side effects that must not be duplicated (charging a payment card, sending an email, creating a unique record). In LLM pipelines: the LLM call itself is at-least-once (calling it twice generates two responses, but if they are semantically equivalent it may be acceptable). The downstream action triggered by the LLM response (send email, charge card, create record) must be exactly-once. Separate the concerns: retry the LLM call freely, but gate the downstream action with an idempotency check.
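A minimal sketch of "retry the LLM call freely, gate the downstream action", using the newsletter incident as the example. `SentLog` is a hypothetical in-memory stand-in for a durable table of completed side effects (in production, something like an insert guarded by a unique constraint), and `generate`/`send` are placeholders for the LLM call and the email API. The key mirrors `generate_business_key`'s `user:operation:period` format. Because the key is claimed before sending, an ambiguous send failure results in a skipped email rather than a duplicate; which way to fail is a business decision.

```python
import hashlib
from typing import Callable


class SentLog:
    """Hypothetical stand-in for a durable record of completed side effects."""

    def __init__(self) -> None:
        self._done: set[str] = set()

    def is_done(self, key: str) -> bool:
        return key in self._done

    def claim(self, key: str) -> bool:
        """Record key; False if it was already recorded. Must be atomic in a real store."""
        if key in self._done:
            return False
        self._done.add(key)
        return True


def send_newsletter_once(
    user_id: str,
    iso_week: str,
    generate: Callable[[], str],
    send: Callable[[str, str], None],
    log: SentLog,
) -> bool:
    """Generation may repeat; the send is gated by a business idempotency key."""
    key = hashlib.sha256(f"{user_id}:weekly_email:{iso_week}".encode()).hexdigest()
    if log.is_done(key):
        return False  # already sent: skip even the LLM call
    body = generate()  # at-least-once: a repeat costs tokens, not a duplicate email
    if not log.claim(key):
        return False  # another worker claimed it while we were generating
    send(user_id, body)
    return True


sent: list[str] = []


def fake_generate() -> str:
    return "Hello!"


def fake_send(user_id: str, body: str) -> None:
    sent.append(user_id)


log = SentLog()
print(send_newsletter_once("user_123", "2025-W12", fake_generate, fake_send, log))  # True
print(send_newsletter_once("user_123", "2025-W12", fake_generate, fake_send, log))  # False
print(len(sent))  # 1
```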

Q6: How do you test that your LLM pipeline's idempotency implementation actually works?

Testing idempotency requires deliberately inducing failures and verifying the recovery behavior. Four test scenarios: (1) Successful re-run: process an item, record it as complete, run the pipeline again on the same item - verify the result is returned from cache with zero additional API calls. Assert was_cached=True in the result. (2) Mid-pipeline crash simulation: run a multi-step workflow, inject an exception at step 2 (by mocking the LLM client to raise on the second call), verify the workflow state file shows step 1 COMPLETED and step 2 FAILED. Re-run the pipeline and verify step 1 is skipped and step 2 is retried. (3) Concurrent execution: spin up two workers processing the same item simultaneously. Verify that only one API call is made when the first worker finishes before the second stops waiting for the lock (the distributed lock prevents duplicate execution), and that both workers return the same result. (4) Expired cache: create a cached result with a 1-second TTL, wait 2 seconds, call again - verify a fresh API call is made. Run these tests in CI before every deployment. Idempotency bugs are subtle and usually only appear in production under specific failure conditions.
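Scenarios (1) and (4) are easy to automate if both the clock and the LLM client are injectable. The pytest-style sketch below assumes exactly that: `FakeLLM` counts calls instead of hitting an API, and `TTLCache` is a hypothetical minimal cache, not `call_with_idempotency` itself. Advancing a fake clock replaces the real 2-second wait, so the test runs instantly.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class FakeLLM:
    """Counts calls instead of hitting an API."""
    calls: int = 0

    def __call__(self, prompt: str) -> str:
        self.calls += 1
        return f"response to {prompt}"


@dataclass
class TTLCache:
    """Minimal idempotency cache with an injectable clock (sketch)."""
    ttl: float
    clock: Callable[[], float]
    _store: dict = field(default_factory=dict)

    def call(self, key: str, fn: Callable[[], str]) -> tuple[str, bool]:
        """Return (result, was_cached)."""
        hit = self._store.get(key)
        if hit is not None and self.clock() - hit[1] < self.ttl:
            return hit[0], True
        result = fn()
        self._store[key] = (result, self.clock())
        return result, False


def test_rerun_and_expiry() -> None:
    now = [0.0]
    llm = FakeLLM()
    cache = TTLCache(ttl=1.0, clock=lambda: now[0])

    # Scenario 1: re-running the same item makes zero additional API calls
    first, cached1 = cache.call("item-1", lambda: llm("item-1"))
    second, cached2 = cache.call("item-1", lambda: llm("item-1"))
    assert (cached1, cached2) == (False, True)
    assert first == second and llm.calls == 1

    # Scenario 4: once the TTL has passed, a fresh call is made
    now[0] += 2.0
    _, cached3 = cache.call("item-1", lambda: llm("item-1"))
    assert cached3 is False and llm.calls == 2


test_rerun_and_expiry()
print("idempotency tests passed")
```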

© 2026 EngineersOfAI. All rights reserved.