

Async LLM Calls

The Sequential Processing Trap

When Luis first built the document analysis pipeline, he wrote it the way most engineers write LLM pipelines: process one document, wait for the response, process the next document, repeat. It was simple, obvious, and - when they tried to process 10,000 documents - catastrophically slow.

At 2 seconds per document, 10,000 documents would take 5.5 hours. That was unacceptable for a pipeline that needed to run nightly before the 7 AM business review. The documents did not depend on each other. The LLM had no reason to process them sequentially. But the code was fundamentally sequential: each API call blocked until complete before the next could start.

The fix was async concurrency. By keeping 50 API calls in flight at once, the pipeline runtime dropped from 5.5 hours to 7 minutes, a 47x speedup. The per-call latency was unchanged; what changed was how much of the API's available capacity they actually used. Documents that used to queue up behind each other now overlapped, each request in flight while others were being sent.

Understanding async LLM call patterns is one of the highest-leverage technical skills for AI engineers. The performance difference between well-structured async code and naive sequential code is frequently measured in orders of magnitude, not percentages.

Why LLM Calls Are I/O Bound

Before writing any async code, understand why async works here. LLM API calls are I/O bound, not CPU bound. The bottleneck is network latency and server-side inference time - not computation in your Python process.

A typical LLM API call: your process sends a request, then waits 1-3 seconds while the remote server runs inference. During that wait, your Python process is idle. Async I/O lets you fill that idle time with other I/O operations. The CPU is not the bottleneck - the network is. Async concurrency exploits this by having one process manage many simultaneous network waits.

This is why asyncio.gather with 50 LLM calls completes in roughly the time of the slowest single call, not 50 times as long: the 50 calls all wait on the network at the same time instead of one after another (as long as your rate limits allow 50 requests in flight).
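You can see the I/O-bound effect without an API key by simulating each call with asyncio.sleep, which, like a network wait, hands control back to the event loop. This is a sketch under that assumption: fake_llm_call and its 50 ms latency are hypothetical stand-ins, not measurements of a real API.

```python
import asyncio
import time


async def fake_llm_call(i: int, latency: float = 0.05) -> str:
    """Hypothetical stand-in for an LLM request: waits without using the CPU."""
    await asyncio.sleep(latency)
    return f"answer {i}"


async def run_sequential(n: int) -> float:
    start = time.perf_counter()
    for i in range(n):
        await fake_llm_call(i)  # Each wait finishes before the next one starts
    return time.perf_counter() - start


async def run_concurrent(n: int) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fake_llm_call(i) for i in range(n)))  # All waits overlap
    return time.perf_counter() - start


sequential_s = asyncio.run(run_sequential(10))
concurrent_s = asyncio.run(run_concurrent(10))
print(f"sequential: {sequential_s:.2f}s, concurrent: {concurrent_s:.2f}s")
```

The sequential version takes about ten latencies; the concurrent one takes about one, because the waits overlap rather than add up.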

Foundation: The AsyncAnthropic Client

```python
import asyncio

import anthropic

# Sync client - fine for scripts and CLI tools
sync_client = anthropic.Anthropic()

# Async client - use this in FastAPI, async workers, and any concurrent code
# AsyncAnthropic maintains an httpx.AsyncClient connection pool internally
async_client = anthropic.AsyncAnthropic()


async def call_claude_async(
    messages: list[dict],
    system: str = "",
    model: str = "claude-haiku-4-5-20251001",
    max_tokens: int = 500,
) -> str:
    """Single async Claude call. Await this to get the result."""
    response = await async_client.messages.create(
        model=model,
        max_tokens=max_tokens,
        system=system,
        messages=messages,
    )
    return response.content[0].text


async def main():
    # Single call
    result = await call_claude_async(
        messages=[{"role": "user", "content": "What is 2 + 2?"}]
    )
    print(result)

    # Multiple calls - WRONG way (sequential despite being async):
    # each await finishes before the next call even starts
    results_sequential = []
    for i in range(5):
        r = await call_claude_async(
            messages=[{"role": "user", "content": f"What is {i} + 1?"}]
        )
        results_sequential.append(r)

    # Multiple calls - RIGHT way (truly concurrent)
    tasks = [
        call_claude_async(messages=[{"role": "user", "content": f"What is {i} + 1?"}])
        for i in range(5)
    ]
    results_concurrent = await asyncio.gather(*tasks)
    # All 5 calls are in flight at once - total time ≈ the slowest single call
    print(results_concurrent)


asyncio.run(main())
```

:::danger Using Sync Client in Async Code
anthropic.Anthropic() is synchronous. Calling client.messages.create() from a sync client inside an async function blocks the entire event loop, freezing all other concurrent operations. Always use anthropic.AsyncAnthropic() in async contexts. The sync client is appropriate only for scripts, tests, and synchronous frameworks.
:::
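The freeze is easy to observe. In this sketch, time.sleep stands in for a blocking sync-client call and asyncio.sleep for an awaited async call (both are simulations, not real API calls); a background ticker counts how often the event loop lets it run while each one is in progress.

```python
import asyncio
import time


async def ticker(stop: asyncio.Event) -> int:
    """Counts how often the event loop gets to run this coroutine."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def measure_ticks(blocking: bool, duration: float = 0.2) -> int:
    stop = asyncio.Event()
    tick_task = asyncio.create_task(ticker(stop))
    await asyncio.sleep(0)  # Let the ticker start
    if blocking:
        time.sleep(duration)  # Like a sync client call: the whole loop freezes
    else:
        await asyncio.sleep(duration)  # Like an awaited async call: the loop keeps running
    stop.set()
    return await tick_task


blocked_ticks = asyncio.run(measure_ticks(blocking=True))
async_ticks = asyncio.run(measure_ticks(blocking=False))
print(f"ticks while blocked: {blocked_ticks}, ticks while awaiting: {async_ticks}")
```

While time.sleep runs, the ticker gets no chance to run at all; while asyncio.sleep runs, it keeps ticking roughly every 10 ms.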

Pattern 1: Parallel Fan-Out with asyncio.gather

The simplest concurrent pattern. Collect all coroutines, run them simultaneously, wait for all to finish:

```python
import asyncio
import time
from dataclasses import dataclass

import anthropic

async_client = anthropic.AsyncAnthropic()


@dataclass
class DocumentResult:
    doc_id: str
    summary: str | None
    error: str | None
    duration_ms: float
    input_tokens: int = 0
    output_tokens: int = 0


async def summarize_document(doc: dict) -> DocumentResult:
    """Summarize a single document. Designed to run concurrently."""
    start = time.perf_counter()

    try:
        response = await async_client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=200,
            messages=[{
                "role": "user",
                "content": (
                    f"Summarize this document in 2 sentences:\n\n"
                    f"{doc['text'][:2000]}"  # Truncate for cost control
                ),
            }],
        )
        return DocumentResult(
            doc_id=doc["id"],
            summary=response.content[0].text,
            error=None,
            duration_ms=(time.perf_counter() - start) * 1000,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
        )
    except Exception as e:
        # Turn failures into results so one bad document can't sink the batch
        return DocumentResult(
            doc_id=doc["id"],
            summary=None,
            error=str(e),
            duration_ms=(time.perf_counter() - start) * 1000,
        )


async def analyze_documents_parallel(documents: list[dict]) -> list[DocumentResult]:
    """
    Analyze all documents in parallel using asyncio.gather.

    IMPORTANT: return_exceptions=True returns exceptions as values instead of
    raising them. Without it, the first exception propagates out of gather and
    every other result is lost (the other tasks keep running, but their results
    are never returned). summarize_document already catches Exception, so this
    is a second line of defense.

    WARNING: No rate limiting here. Use only for small batches (< 20 documents).
    For larger batches, add a semaphore (Pattern 2).
    """
    tasks = [summarize_document(doc) for doc in documents]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # gather returns results in input order, so they line up with documents
    processed: list[DocumentResult] = []
    for doc, result in zip(documents, results):
        if isinstance(result, BaseException):
            processed.append(DocumentResult(
                doc_id=doc["id"],
                summary=None,
                error=f"Uncaught exception: {result}",
                duration_ms=0,
            ))
        else:
            processed.append(result)

    return processed


async def demo():
    documents = [
        {"id": f"doc_{i}", "text": f"This is document {i} about various topics..."}
        for i in range(10)
    ]

    start = time.perf_counter()
    results = await analyze_documents_parallel(documents)
    elapsed = time.perf_counter() - start

    successful = [r for r in results if r.error is None]
    print(f"Processed {len(successful)}/{len(documents)} documents in {elapsed:.1f}s")
    print(f"Total tokens: {sum(r.input_tokens + r.output_tokens for r in successful)}")


if __name__ == "__main__":
    asyncio.run(demo())
```

:::warning Unbounded Concurrency Is Dangerous
asyncio.gather(*[call(item) for item in huge_list]) with 10,000 items will try to start 10,000 API calls at once. This will: (1) trigger rate limits with 429 errors; (2) exhaust the HTTP client's connection pool, so excess requests wait for a free connection and can time out; (3) hold every request and response in memory at the same time, which for large payloads can run your process out of memory. Always bound concurrency with a semaphore.
:::
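A semaphore's bound is easy to check empirically. This minimal sketch replaces the API call with a 10 ms asyncio.sleep (fake_llm_call and InFlightCounter are hypothetical helpers) and records the peak number of calls in flight.

```python
import asyncio


class InFlightCounter:
    """Tracks how many simulated calls are running at once."""

    def __init__(self) -> None:
        self.current = 0
        self.peak = 0


async def fake_llm_call(i: int, sem: asyncio.Semaphore, counter: InFlightCounter) -> int:
    """Hypothetical stand-in for an API call: a 10 ms simulated network wait."""
    async with sem:  # At most `limit` coroutines get past this line at once
        counter.current += 1
        counter.peak = max(counter.peak, counter.current)
        await asyncio.sleep(0.01)
        counter.current -= 1
    return i


async def run_bounded(n_items: int, limit: int, counter: InFlightCounter) -> list[int]:
    sem = asyncio.Semaphore(limit)
    return await asyncio.gather(*(fake_llm_call(i, sem, counter) for i in range(n_items)))


counter = InFlightCounter()
results = asyncio.run(run_bounded(n_items=200, limit=10, counter=counter))
print(f"{len(results)} calls, peak in flight: {counter.peak}")
```

All 200 coroutines exist from the start, but the peak never exceeds the semaphore's limit, and gather still returns results in input order.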

Pattern 2: Semaphore-Controlled Concurrency

The most important production pattern. A semaphore caps the number of operations in flight at once, which keeps request volume predictable and makes rate limit errors far less likely:

```python
import asyncio
import time
from collections.abc import Awaitable, Callable
from dataclasses import dataclass

import anthropic

async_client = anthropic.AsyncAnthropic()


@dataclass
class ProcessingResult:
    item_id: str
    result: str | None
    error: str | None
    duration_ms: float
    retry_count: int = 0


async def process_with_semaphore(
    items: list[dict],
    process_fn: Callable[[dict], Awaitable[str]],
    max_concurrency: int = 20,
) -> list[ProcessingResult]:
    """
    Process items with bounded concurrency using asyncio.Semaphore.

    The semaphore ensures at most max_concurrency tasks run simultaneously.
    When a slot becomes available (a task finishes), the next waiting task starts.

    Choosing max_concurrency:
    - Start at 10, increase until you see 429 rate limit errors
    - Anthropic rate limits depend on your usage tier and model (check the
      Console); the numbers below assume a 4,000 RPM limit
    - 4,000 RPM / 60 seconds ≈ 66 requests/second
    - At 2s average call time: 66 × 2 ≈ 133 concurrent calls theoretically
    - In practice, use 20-50 to leave headroom (token-per-minute limits
      can bind before the request limit does)

    Args:
        items: List of items to process
        process_fn: Async function(item) -> str
        max_concurrency: Maximum simultaneous API calls

    Returns:
        One ProcessingResult per item, in the same order as items.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def process_one(item: dict) -> ProcessingResult:
        async with semaphore:  # Waits until a slot is available
            start = time.perf_counter()  # Time the call, not the wait for a slot
            try:
                result = await process_fn(item)
                return ProcessingResult(
                    item_id=str(item.get("id", "")),
                    result=result,
                    error=None,
                    duration_ms=(time.perf_counter() - start) * 1000,
                )
            except Exception as e:
                return ProcessingResult(
                    item_id=str(item.get("id", "")),
                    result=None,
                    error=str(e),
                    duration_ms=(time.perf_counter() - start) * 1000,
                )

    # All coroutines are created upfront but only max_concurrency run at once.
    # gather returns results in input order, so no shared list or lock is needed.
    return await asyncio.gather(*[process_one(item) for item in items])


# Real-world: analyze 1000 customer support tickets
async def analyze_support_tickets(tickets: list[dict]) -> list[dict]:
    """Analyze customer support tickets with controlled concurrency."""

    async def classify_ticket(ticket: dict) -> str:
        response = await async_client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=50,
            messages=[{
                "role": "user",
                "content": (
                    f"Classify this support ticket as one of: "
                    f"billing / technical / account / general\n\n"
                    f"Ticket: {ticket['text'][:500]}\n\n"
                    f"Category:"
                ),
            }],
        )
        return response.content[0].text.strip().lower()

    results = await process_with_semaphore(
        items=tickets,
        process_fn=classify_ticket,
        max_concurrency=20,
    )

    return [
        {
            "ticket_id": r.item_id,
            "category": r.result,
            "error": r.error,
            "processing_ms": r.duration_ms,
        }
        for r in results
    ]
```
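The sizing arithmetic in the docstring above is Little's law: requests in flight = request rate × time each request spends in flight. A small helper makes it reusable; max_useful_concurrency is a hypothetical name, and the default headroom of 0.5 is an assumed safety margin, not an Anthropic recommendation. Treat the result as a ceiling and still tune empirically.

```python
import math


def max_useful_concurrency(rpm_limit: float, avg_call_seconds: float, headroom: float = 0.5) -> int:
    """
    Little's law: in-flight requests = request rate × average time in flight.
    The request rate is capped at rpm_limit / 60 per second, so concurrency
    above rate × latency only produces 429s. headroom scales the theoretical
    value down to leave room for bursts and token-per-minute limits.
    """
    requests_per_second = rpm_limit / 60
    theoretical = requests_per_second * avg_call_seconds
    return max(1, math.floor(theoretical * headroom))


print(max_useful_concurrency(4_000, 2.0, headroom=1.0))  # Theoretical ceiling
print(max_useful_concurrency(4_000, 2.0))                # With the assumed headroom
```

Longer calls raise the ceiling (more requests can be waiting at once without exceeding the rate), which is why the right max_concurrency depends on model choice and output length, not just on the RPM limit.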

Pattern 3: Async Producer-Consumer Queue

For very large datasets, use a queue to decouple ingestion from processing and provide fine-grained backpressure control:

```python
import asyncio
import json
import time
from asyncio import Queue
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from pathlib import Path

import anthropic

async_client = anthropic.AsyncAnthropic()  # For use inside your process_fn


@dataclass
class QueuedItem:
    item_id: str
    payload: dict
    submitted_at: float = field(default_factory=time.monotonic)


async def producer_consumer_pipeline(
    items: list[dict],
    process_fn: Callable[[dict], Awaitable[str]],
    n_workers: int = 10,
    queue_size: int = 200,
    output_file: str | None = None,
) -> dict:
    """
    High-throughput producer-consumer pipeline for LLM processing.

    Architecture:
    - 1 producer: feeds items into bounded queue
    - N workers: pull from queue, call LLM, write results
    - Bounded queue: limits memory usage and provides backpressure

    Advantages over gather+semaphore:
    - Better memory control (at most queue_size items wait in the queue and
      n_workers calls are in flight, instead of one coroutine per item)
    - Results written incrementally (no memory explosion for 1M items)
    - Worker count is an explicit knob; the design extends to adding or
      removing workers at runtime
    - Queue depth is a natural throughput signal

    Args:
        items: Items to process
        process_fn: Async function(dict) -> str
        n_workers: Number of concurrent worker coroutines
        queue_size: Max items to buffer in queue (controls memory)
        output_file: JSONL file to append results to (for checkpointing).
            If None, results are discarded and only counts are returned.
    """
    work_queue: Queue[QueuedItem | None] = Queue(maxsize=queue_size)
    total_success = 0
    total_error = 0
    output_path = Path(output_file) if output_file else None

    async def producer() -> None:
        """Feed items into the queue."""
        for i, item in enumerate(items):
            queued = QueuedItem(
                item_id=str(item.get("id", f"item_{i}")),
                payload=item,
            )
            await work_queue.put(queued)  # Blocks if queue is full (backpressure)

        # Send one sentinel per worker so every worker stops
        for _ in range(n_workers):
            await work_queue.put(None)

    async def worker(worker_id: int) -> None:
        """Process items from the queue until a sentinel arrives."""
        nonlocal total_success, total_error

        while True:
            queued = await work_queue.get()

            if queued is None:
                # Sentinel: this worker should stop
                work_queue.task_done()
                break

            # Time the item spent waiting in the queue, measured before processing
            queue_wait_ms = (time.monotonic() - queued.submitted_at) * 1000
            start = time.perf_counter()

            try:
                result = await process_fn(queued.payload)
                result_entry = {
                    "id": queued.item_id,
                    "result": result,
                    "status": "success",
                    "duration_ms": (time.perf_counter() - start) * 1000,
                    "worker": worker_id,
                    "queue_wait_ms": queue_wait_ms,
                }
                # No lock needed: there is no await between reading and writing
                total_success += 1

            except Exception as e:
                result_entry = {
                    "id": queued.item_id,
                    "result": None,
                    "status": "error",
                    "error": str(e),
                    "duration_ms": (time.perf_counter() - start) * 1000,
                    "worker": worker_id,
                }
                total_error += 1

            finally:
                work_queue.task_done()

            # Write result incrementally (no memory accumulation). The write has
            # no await, so lines from different workers cannot interleave.
            if output_path:
                with open(output_path, "a") as f:
                    f.write(json.dumps(result_entry) + "\n")

    # Start all workers
    worker_tasks = [
        asyncio.create_task(worker(i))
        for i in range(n_workers)
    ]

    # Run producer and all workers until the sentinels drain the queue
    start_time = time.monotonic()
    await asyncio.gather(producer(), *worker_tasks)
    elapsed = time.monotonic() - start_time

    return {
        "total_items": len(items),
        "successful": total_success,
        "errored": total_error,
        "elapsed_seconds": elapsed,
        "throughput_per_second": len(items) / elapsed if elapsed > 0 else 0.0,
        "output_file": str(output_path) if output_path else None,
    }
```

Pattern 4: Token Bucket Rate Limiter

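Anthropic rate limits are specified in requests per minute (RPM) and in tokens per minute, with separate limits for input tokens (ITPM) and output tokens (OTPM); the values depend on your usage tier and the model. A token bucket rate limiter enforces a sustained request rate while allowing short bursts: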

```python
import asyncio
import time
from dataclasses import dataclass, field

import anthropic


@dataclass
class TokenBucketRateLimiter:
    """
    Token bucket rate limiter for API calls.

    Allows bursts of up to bucket_size calls, then enforces
    a sustained rate of tokens_per_second.

    Example: 60 RPS with a bucket of 20:
    - Sustained rate: 60 calls/second
    - When the bucket is full, up to 20 calls can start back-to-back with no wait
    - Bucket refills at 60 tokens/second

    Anthropic limits depend on usage tier and model (check current docs);
    for example, a 4,000 RPM limit is about 66 RPS.
    """
    tokens_per_second: float
    bucket_size: float
    _tokens: float = field(init=False)
    _last_refill: float = field(init=False)
    _lock: asyncio.Lock = field(init=False)

    def __post_init__(self):
        self._tokens = self.bucket_size
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: float = 1.0) -> float:
        """
        Acquire tokens, waiting if necessary.
        Returns the time spent waiting in seconds (close to 0 if no wait was
        needed), including time queued behind other callers.
        """
        if tokens > self.bucket_size:
            # The bucket can never hold this many tokens; we would wait forever
            raise ValueError(f"Requested {tokens} tokens but bucket_size is {self.bucket_size}")

        start = time.monotonic()
        # Sleeping while holding the lock serves callers in arrival order
        # (asyncio.Lock is fair)
        async with self._lock:
            while True:
                now = time.monotonic()
                elapsed = now - self._last_refill
                self._tokens = min(
                    self.bucket_size,
                    self._tokens + elapsed * self.tokens_per_second,
                )
                self._last_refill = now

                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return time.monotonic() - start

                # Sleep just long enough for the missing tokens to refill
                wait_time = (tokens - self._tokens) / self.tokens_per_second
                await asyncio.sleep(wait_time)


async_client = anthropic.AsyncAnthropic()

# Conservative limits: 50 RPS (3,000 RPM) with burst of 20
rate_limiter = TokenBucketRateLimiter(
    tokens_per_second=50.0,
    bucket_size=20.0,
)


async def rate_limited_llm_call(
    messages: list[dict],
    system: str = "",
    model: str = "claude-haiku-4-5-20251001",
    max_tokens: int = 500,
) -> str:
    """LLM call with rate limiting applied before the API call."""
    await rate_limiter.acquire()

    response = await async_client.messages.create(
        model=model,
        max_tokens=max_tokens,
        system=system,
        messages=messages,
    )
    return response.content[0].text


async def high_throughput_processor(items: list[dict]) -> list[str]:
    """
    Process items with both rate limiting and concurrency control.
    Layer both protections: semaphore limits concurrency,
    rate limiter enforces sustained RPS.
    """
    semaphore = asyncio.Semaphore(30)  # Max concurrent calls

    async def process_one(item: dict) -> str:
        async with semaphore:
            return await rate_limited_llm_call(
                messages=[{"role": "user", "content": str(item)}]
            )

    return await asyncio.gather(*[process_one(item) for item in items])
```
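Because the limiter depends on wall-clock time, its parameters are easier to reason about on a simulated clock. This deterministic sketch replays the same refill rule, assuming every request arrives at t=0 and costs one token; simulate_token_bucket is a hypothetical helper for reasoning about settings, not part of any library.

```python
def simulate_token_bucket(n_requests: int, rate: float, burst: float) -> list[float]:
    """
    Start time (seconds) of each request when all n_requests arrive at t=0.
    Tokens refill at `rate` per second; the bucket starts full with `burst`
    tokens (assumed >= 1), and each request consumes one token.
    """
    tokens = burst
    now = 0.0
    start_times = []
    for _ in range(n_requests):
        if tokens < 1.0:
            wait = (1.0 - tokens) / rate  # Time until one whole token refills
            now += wait
            tokens += wait * rate         # Refill accrued during the wait
        tokens -= 1.0
        start_times.append(now)
    return start_times


print(simulate_token_bucket(5, rate=2.0, burst=3.0))
last = simulate_token_bucket(100, rate=50.0, burst=20.0)[-1]
print(f"100 requests at 50 RPS with burst 20: the last one starts at {last:.2f}s")
```

The first burst requests start immediately and the rest are spaced 1/rate apart, so a batch of N requests finishes starting after about (N - burst) / rate seconds.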

Pattern 5: Exponential Backoff with Jitter

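Rate limit errors (429) and transient failures (5xx responses, network timeouts) require retry logic. Without jitter, all retrying workers retry at the same moments and spike load simultaneously. Note that the Anthropic Python SDK already retries connection errors, 408, 409, 429, and 5xx responses 2 times by default with a short exponential backoff; if you add your own retry loop, set max_retries=0 on the client (or lower it) so the two layers don't multiply attempts.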

```python
import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import TypeVar

import anthropic

T = TypeVar("T")

# The SDK retries transient errors on its own (2 retries by default);
# max_retries=0 leaves all retrying to with_exponential_backoff below
async_client = anthropic.AsyncAnthropic(max_retries=0)

# Errors that should trigger retry
RETRYABLE_ERRORS = (
    anthropic.RateLimitError,       # 429: retry after backoff
    anthropic.APIConnectionError,   # Network issues
    anthropic.APITimeoutError,      # Request timeout
    anthropic.InternalServerError,  # 5xx: server-side issue
)

# Errors that should NOT be retried
NON_RETRYABLE_ERRORS = (
    anthropic.AuthenticationError,    # 401: fix your API key
    anthropic.PermissionDeniedError,  # 403: check permissions
    anthropic.NotFoundError,          # 404: bad endpoint or model name
    anthropic.BadRequestError,        # 400: fix your request
)


async def with_exponential_backoff(
    fn: Callable[[], Awaitable[T]],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_multiplier: float = 2.0,
    jitter_type: str = "full",  # "full", "equal", "decorrelated"
) -> T:
    """
    Retry an async function with exponential backoff and jitter.
    Makes up to max_retries + 1 attempts in total.

    Jitter types (from the AWS Architecture Blog post "Exponential Backoff And Jitter"):
    - "full": delay = uniform(0, exp_delay)
        Widest spread - best at breaking up a thundering herd
    - "equal": delay = exp_delay/2 + uniform(0, exp_delay/2)
        Guarantees a minimum delay - good when you need predictability
    - "decorrelated": delay = min(max_delay, uniform(base, prev_delay * 3))
        Each delay grows from the previous delay instead of the attempt number

    The "thundering herd" problem without jitter:
    1000 workers all hit rate limit → all back off for exactly 4s → all retry at t+4s
    → spike of 1000 simultaneous retries → rate limit again → cycle

    With full jitter, the 1000 workers spread their retries over [0, 4s] → ~250/s
    instead of 1000 in the same instant.
    """
    last_exception: Exception | None = None
    prev_delay = base_delay

    for attempt in range(max_retries + 1):
        try:
            return await fn()

        except NON_RETRYABLE_ERRORS:
            # These will not succeed on retry - fail immediately
            raise

        except RETRYABLE_ERRORS as e:
            last_exception = e

            if attempt == max_retries:
                break

            # Compute exponential base delay
            exp_delay = min(base_delay * (backoff_multiplier ** attempt), max_delay)

            # Apply jitter
            if jitter_type == "full":
                delay = random.uniform(0, exp_delay)
            elif jitter_type == "equal":
                delay = exp_delay / 2 + random.uniform(0, exp_delay / 2)
            elif jitter_type == "decorrelated":
                delay = min(max_delay, random.uniform(base_delay, prev_delay * 3))
                prev_delay = delay
            else:
                delay = exp_delay

            # Respect the Retry-After response header on 429s (when provided)
            if isinstance(e, anthropic.RateLimitError):
                retry_after = e.response.headers.get("retry-after")
                if retry_after:
                    try:
                        delay = max(delay, float(retry_after))
                    except ValueError:
                        pass  # Not a number of seconds; keep the computed delay
                    print(f"Rate limited. Retry-After: {retry_after}s")

            print(
                f"Attempt {attempt + 1}/{max_retries + 1} failed: "
                f"{type(e).__name__}. Retry in {delay:.2f}s"
            )
            await asyncio.sleep(delay)

    raise last_exception


async def robust_llm_call(
    messages: list[dict],
    system: str = "",
    model: str = "claude-haiku-4-5-20251001",
    max_tokens: int = 500,
) -> str:
    """LLM call with automatic retry on transient errors."""
    async def call():
        response = await async_client.messages.create(
            model=model,
            max_tokens=max_tokens,
            system=system,
            messages=messages,
        )
        return response.content[0].text

    return await with_exponential_backoff(call)
```
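The thundering-herd arithmetic in the docstring can be checked with a seeded simulation. This sketch assumes 1,000 clients all hit a rate limit at t=0 and back off by a 4-second exponential delay, then counts retries per 1-second window with and without full jitter; retry_histogram is a hypothetical helper.

```python
import random
from collections import Counter


def retry_histogram(n_clients: int, exp_delay: float, jitter: bool, seed: int = 0) -> dict[int, int]:
    """
    Count how many clients retry in each 1-second window when all of them
    back off at t=0 by exp_delay, using "full" jitter when enabled.
    """
    rng = random.Random(seed)  # Seeded so the histogram is reproducible
    windows: Counter[int] = Counter()
    for _ in range(n_clients):
        delay = rng.uniform(0, exp_delay) if jitter else exp_delay
        windows[int(delay)] += 1  # Window 0 is [0, 1), window 1 is [1, 2), ...
    return dict(sorted(windows.items()))


no_jitter = retry_histogram(1000, exp_delay=4.0, jitter=False)
full_jitter = retry_histogram(1000, exp_delay=4.0, jitter=True)
print("no jitter:  ", no_jitter)
print("full jitter:", full_jitter)
```

Without jitter every retry lands in the same window; with full jitter each of the four windows gets roughly a quarter of them.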

Pattern 6: Circuit Breaker

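When the LLM API is degraded, retrying just wastes time and piles up errors. A circuit breaker stops sending requests once failures cross a threshold, fails fast while the service is down, and periodically lets test calls through to detect recovery: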

```python
import asyncio
import time
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class CircuitState(Enum):
    CLOSED = "closed"        # Normal: requests pass through
    OPEN = "open"            # Degraded: requests fail immediately
    HALF_OPEN = "half_open"  # Recovery: test calls allowed


@dataclass
class CircuitBreaker:
    """
    Circuit breaker for LLM API calls.

    State transitions:
        CLOSED → OPEN: when failure_threshold failures occur in window_seconds
        OPEN → HALF_OPEN: after timeout_seconds have elapsed
        HALF_OPEN → CLOSED: after success_threshold successes
        HALF_OPEN → OPEN: on any failure during testing

    Production benefit: prevents cascading failures.
    Without circuit breaker: 1000 queued calls all timeout (30s each)
        = 30,000 seconds of accumulated wait time.
    With circuit breaker: calls fail fast in microseconds once circuit opens.

    Simplifications: every exception counts as a failure (including 4xx
    client errors), and HALF_OPEN admits all concurrent calls rather than
    a single probe.
    """
    failure_threshold: int = 5      # Open after N failures in window
    success_threshold: int = 2      # Close after N successes in half-open
    timeout_seconds: float = 60.0   # Wait before trying half-open
    window_seconds: float = 60.0    # Failure counting window

    _state: CircuitState = field(default=CircuitState.CLOSED, init=False)
    _failure_times: list[float] = field(default_factory=list, init=False)
    _last_opened_at: float = field(default=0.0, init=False)
    _half_open_successes: int = field(default=0, init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)
    _total_requests: int = field(default=0, init=False)
    _total_blocked: int = field(default=0, init=False)

    @property
    def state(self) -> CircuitState:
        return self._state

    def _prune_old_failures(self) -> None:
        """Remove failures outside the counting window."""
        cutoff = time.monotonic() - self.window_seconds
        self._failure_times = [t for t in self._failure_times if t > cutoff]

    async def call(self, fn: Callable[[], Awaitable[Any]]) -> Any:
        """Execute a function through the circuit breaker."""
        async with self._lock:
            self._total_requests += 1

            # Check state transitions
            if self._state == CircuitState.OPEN:
                time_since_opened = time.monotonic() - self._last_opened_at
                if time_since_opened >= self.timeout_seconds:
                    # Transition to half-open: test if service recovered
                    self._state = CircuitState.HALF_OPEN
                    self._half_open_successes = 0
                    print("[Circuit] OPEN → HALF_OPEN (testing recovery)")
                else:
                    # Still open: fail fast
                    self._total_blocked += 1
                    remaining = self.timeout_seconds - time_since_opened
                    raise RuntimeError(
                        f"Circuit breaker OPEN. Retry in {remaining:.0f}s. "
                        f"Blocked {self._total_blocked} requests."
                    )

        # Execute the function outside the lock so calls can run concurrently
        try:
            result = await fn()
        except Exception:
            async with self._lock:
                self._failure_times.append(time.monotonic())
                self._prune_old_failures()

                if self._state == CircuitState.HALF_OPEN:
                    # Failed during recovery test - back to open
                    self._state = CircuitState.OPEN
                    self._last_opened_at = time.monotonic()
                    print("[Circuit] HALF_OPEN → OPEN (recovery test failed)")

                elif (
                    self._state == CircuitState.CLOSED
                    and len(self._failure_times) >= self.failure_threshold
                ):
                    # Too many failures in window - open the circuit
                    self._state = CircuitState.OPEN
                    self._last_opened_at = time.monotonic()
                    print(
                        f"[Circuit] CLOSED → OPEN "
                        f"({len(self._failure_times)} failures in {self.window_seconds}s)"
                    )
            raise

        async with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._half_open_successes += 1
                if self._half_open_successes >= self.success_threshold:
                    self._state = CircuitState.CLOSED
                    self._failure_times.clear()
                    print("[Circuit] HALF_OPEN → CLOSED (recovered)")

        return result

    def get_stats(self) -> dict:
        self._prune_old_failures()
        return {
            "state": self._state.value,
            "recent_failures": len(self._failure_times),
            "total_requests": self._total_requests,
            "total_blocked": self._total_blocked,
        }
```

Pattern 7: Multi-Step Async Pipeline with Dependencies

For workflows where Step 2 depends on Step 1's output, but multiple Step 1 calls can run in parallel:

```python
import asyncio
from collections import Counter

import anthropic

async_client = anthropic.AsyncAnthropic()


async def multi_step_parallel_pipeline(
    documents: list[dict],
    max_concurrency: int = 20,
) -> dict:
    """
    Multi-step pipeline with parallel execution at each step.

    Step 1: Extract entities from all documents (fully parallel)
    Step 2: Classify each entity set (fully parallel; starts once ALL of step 1 is done)
    Step 3: Generate summary report (a single call that uses all step 2 results)

    This pattern is common in:
    - RAG preprocessing pipelines
    - Batch classification with enrichment
    - Document processing with multiple analysis passes

    No per-call error handling: one failed call makes its gather raise.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(coro):
        """Wrap a coroutine with semaphore protection (it starts once a slot is free)."""
        async with semaphore:
            return await coro

    # --- Step 1: Extract entities (all documents in parallel) ---
    async def extract_entities(doc: dict) -> dict:
        response = await async_client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=150,
            messages=[{
                "role": "user",
                "content": (
                    f"List the 3-5 main topics in this document as "
                    f"a comma-separated list. Be concise.\n\n"
                    f"Document: {doc['text'][:500]}"
                ),
            }],
        )
        return {
            "doc_id": doc["id"],
            "entities": response.content[0].text.strip(),
            "original": doc,
        }

    step1_results = await asyncio.gather(
        *[guarded(extract_entities(doc)) for doc in documents]
    )

    # --- Step 2: Classify each result (parallel, depends on step 1) ---
    async def classify_result(extracted: dict) -> dict:
        response = await async_client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=50,
            messages=[{
                "role": "user",
                "content": (
                    f"Classify these topics as: technical / business / research / other\n"
                    f"Topics: {extracted['entities']}\n"
                    f"One word answer:"
                ),
            }],
        )
        return {
            **extracted,
            "category": response.content[0].text.strip().lower(),
        }

    step2_results = await asyncio.gather(
        *[guarded(classify_result(r)) for r in step1_results]
    )

    # --- Step 3: Generate summary report (one call, uses all results) ---
    classification_summary = "\n".join(
        f"- {r['doc_id']}: {r['entities']} [{r['category']}]"
        for r in step2_results
    )

    report_response = await async_client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=250,
        messages=[{
            "role": "user",
            "content": (
                f"Based on these document classifications, "
                f"write a 2-paragraph summary of the document corpus:\n\n"
                f"{classification_summary}"
            ),
        }],
    )

    # Count by category
    category_counts = Counter(r["category"] for r in step2_results)

    return {
        "documents_processed": len(documents),
        "category_distribution": dict(category_counts),
        "classifications": step2_results,
        "corpus_summary": report_response.content[0].text,
    }
```
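The pipeline above puts a barrier between steps: no Step 2 call starts until every Step 1 call has finished, so one slow document delays all the others. When only Step 3 truly needs every result, chaining Steps 1 and 2 per document removes that barrier. This sketch compares both shapes using asyncio.sleep with hypothetical per-document latencies in place of the two API calls; in real code each call would still go through the semaphore.

```python
import asyncio
import time

# Hypothetical (step 1, step 2) latencies in seconds for each document
LATENCIES = {"doc_a": (0.30, 0.05), "doc_b": (0.05, 0.30)}


async def step1(doc_id: str) -> str:
    await asyncio.sleep(LATENCIES[doc_id][0])  # Simulated extraction call
    return f"{doc_id}:entities"


async def step2(entities: str) -> str:
    doc_id = entities.split(":")[0]
    await asyncio.sleep(LATENCIES[doc_id][1])  # Simulated classification call
    return f"{doc_id}:category"


async def with_barrier(doc_ids: list[str]) -> list[str]:
    stage1 = await asyncio.gather(*(step1(d) for d in doc_ids))  # Wait for ALL of step 1
    return await asyncio.gather(*(step2(e) for e in stage1))


async def pipelined(doc_ids: list[str]) -> list[str]:
    async def one(doc_id: str) -> str:
        return await step2(await step1(doc_id))  # Each doc moves on as soon as it can

    return await asyncio.gather(*(one(d) for d in doc_ids))


async def compare() -> tuple[list[str], list[str], float, float]:
    ids = list(LATENCIES)
    t0 = time.perf_counter()
    r_barrier = await with_barrier(ids)
    t1 = time.perf_counter()
    r_pipelined = await pipelined(ids)
    t2 = time.perf_counter()
    return r_barrier, r_pipelined, t1 - t0, t2 - t1


barrier_result, pipelined_result, barrier_s, pipelined_s = asyncio.run(compare())
print(f"barrier: {barrier_s:.2f}s, pipelined: {pipelined_s:.2f}s")
```

Both shapes return the same results in the same order; with these latencies the barrier version costs about 0.6 s (slowest step 1 plus slowest step 2), while the pipelined version costs about 0.35 s (the slowest single document).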

Production Async LLM Service

Combining all patterns into a production-ready service class:

```python
import asyncio
import time
from dataclasses import dataclass

import anthropic

# Assumes TokenBucketRateLimiter (Pattern 4), with_exponential_backoff (Pattern 5)
# and CircuitBreaker (Pattern 6) are defined in, or imported into, this module.

# Retries are handled by with_exponential_backoff, so disable the SDK's own
async_client = anthropic.AsyncAnthropic(max_retries=0)


@dataclass
class CallMetrics:
    model: str
    input_tokens: int
    output_tokens: int
    duration_ms: float
    success: bool
    cached: bool = False
    error: str | None = None


class AsyncLLMService:
    """
    Production async LLM service with all reliability patterns combined:
    - Semaphore: bounds concurrency
    - Rate limiter: respects API RPM limits
    - Retry with backoff: handles transient failures
    - Circuit breaker: prevents cascading failures
    - Metrics collection: observability
    - Model fallback: graceful degradation
    """

    def __init__(
        self,
        max_concurrency: int = 20,
        requests_per_second: float = 50.0,
        circuit_failure_threshold: int = 10,
    ):
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.rate_limiter = TokenBucketRateLimiter(requests_per_second, requests_per_second)
        self.circuit_breaker = CircuitBreaker(failure_threshold=circuit_failure_threshold)
        # Grows without bound; aggregate or trim it in long-running services
        self._metrics: list[CallMetrics] = []

    async def call(
        self,
        messages: list[dict],
        system: str = "",
        model: str = "claude-haiku-4-5-20251001",
        max_tokens: int = 500,
        fallback_model: str | None = None,
    ) -> str:
        """
        Make an LLM call with full production protections.
        If the primary call fails for any reason (including an open circuit)
        and fallback_model names a different model, the request is retried on
        fallback_model, bypassing the circuit breaker (which is shared by all
        models in this class).
        """
        await self.rate_limiter.acquire()

        async with self.semaphore:
            start = time.perf_counter()

            async def _call():
                return await async_client.messages.create(
                    model=model,
                    max_tokens=max_tokens,
                    system=system,
                    messages=messages,
                )

            try:
                response = await self.circuit_breaker.call(
                    lambda: with_exponential_backoff(_call)
                )
                duration_ms = (time.perf_counter() - start) * 1000

                self._metrics.append(CallMetrics(
                    model=model,
                    input_tokens=response.usage.input_tokens,
                    output_tokens=response.usage.output_tokens,
                    duration_ms=duration_ms,
                    success=True,
                ))

                return response.content[0].text

            except Exception as e:
                duration_ms = (time.perf_counter() - start) * 1000
                self._metrics.append(CallMetrics(
                    model=model,
                    input_tokens=0,
                    output_tokens=0,
                    duration_ms=duration_ms,
                    success=False,
                    error=str(e),
                ))

                # Try the fallback model, with its own retries
                if fallback_model and fallback_model != model:
                    print(f"Primary model failed, trying fallback: {fallback_model}")

                    async def _fallback_call():
                        return await async_client.messages.create(
                            model=fallback_model,
                            max_tokens=max_tokens,
                            system=system,
                            messages=messages,
                        )

                    fb_response = await with_exponential_backoff(_fallback_call)
                    return fb_response.content[0].text

                raise

    def get_metrics_summary(self) -> dict:
        if not self._metrics:
            return {}
        successful = [m for m in self._metrics if m.success]
        failed = [m for m in self._metrics if not m.success]
        return {
            "total_calls": len(self._metrics),
            "success_rate": len(successful) / len(self._metrics),
            "avg_duration_ms": sum(m.duration_ms for m in successful) / max(len(successful), 1),
            "total_input_tokens": sum(m.input_tokens for m in successful),
            "total_output_tokens": sum(m.output_tokens for m in successful),
            "error_count": len(failed),
            "circuit_stats": self.circuit_breaker.get_stats(),
        }
```

Common Mistakes

:::danger Using Sync Client in Async Code
anthropic.Anthropic() (sync) used inside an async function blocks the entire event loop. Every other coroutine - including all other streaming connections - freezes until your sync call returns. Always use anthropic.AsyncAnthropic() in async contexts.
:::

:::danger asyncio.gather Without return_exceptions
By default, asyncio.gather(*tasks) raises the first exception from any task. The other tasks are not cancelled, but their results are lost, because gather never returns its result list. For batch processing, use return_exceptions=True to collect a result or an exception for every task, then check each result individually. Without this, one bad document costs you the results of all the other documents.
:::

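:::warning Blocking the Event Loop with CPU Work
Long-running CPU operations inside an async function - even simple things like parsing a large JSON string or running a local embedding model - block the event loop and stall all concurrent requests and streams. Offload CPU work with loop.run_in_executor(None, cpu_fn, arg), where loop = asyncio.get_running_loop(), or with asyncio.to_thread(cpu_fn, arg). A thread only keeps the loop responsive if the work releases the GIL (as hashlib and many NumPy operations do); for pure-Python CPU work, pass a concurrent.futures.ProcessPoolExecutor to run_in_executor instead.
:::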
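A minimal sketch of both offloading APIs. SHA-256 fingerprinting is used as the CPU-heavy stand-in because hashlib releases the GIL while hashing large buffers, so the worker threads genuinely run alongside the event loop; fingerprint and fingerprint_all are hypothetical names.

```python
import asyncio
import hashlib


def fingerprint(blob: bytes) -> str:
    """CPU-heavy stand-in: hashlib releases the GIL on large buffers."""
    return hashlib.sha256(blob).hexdigest()


async def fingerprint_all(blobs: list[bytes]) -> list[str]:
    loop = asyncio.get_running_loop()
    # None = the event loop's default ThreadPoolExecutor
    futures = [loop.run_in_executor(None, fingerprint, blob) for blob in blobs]
    return await asyncio.gather(*futures)


async def fingerprint_one(blob: bytes) -> str:
    # Python 3.9+ shorthand for the same thread-pool offload
    return await asyncio.to_thread(fingerprint, blob)


blobs = [bytes([i]) * 5_000_000 for i in range(4)]  # Four 5 MB payloads
digests = asyncio.run(fingerprint_all(blobs))
single = asyncio.run(fingerprint_one(blobs[0]))
print(digests[0][:16], single[:16])
```

While the hashes run in worker threads, the event loop stays free to service in-flight API calls.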

:::tip Tune Concurrency Empirically
Start at max_concurrency=10. Run your workload. Check for 429 errors. If none, increase to 20. Repeat until you see occasional 429s, then back off by 10-20%. Your optimal concurrency depends on your API tier, model choice, and average token count per request.
:::

Concurrency Pattern Comparison

| Pattern | Best For | Memory | Throughput | Complexity |
|---|---|---|---|---|
| asyncio.gather | Small fixed batches (<100) | High (all results in memory) | Very high | Low |
| Semaphore + gather | Medium batches (100-10K) | High | Very high | Low |
| Producer-consumer queue | Large batches (10K+) | Controlled | High | Medium |
| Rate limiter + semaphore | API rate limit constrained | Medium | Controlled | Medium |
| Circuit breaker | Unreliable external services | Low | Adaptive | High |

Interview Q&A

Q1: Why are LLM API calls a good fit for async concurrency?

LLM API calls are I/O bound, not CPU bound. The bottleneck is network latency and server inference time - not Python computation. A typical call spends nearly all of its wall-clock time waiting for the remote server, and during that wait the CPU is idle. Async I/O exploits this by starting other requests during each call's wait time. The result: 50 concurrent async calls complete in roughly the time of the slowest one (provided your rate limits allow 50 in flight), while 50 sequential calls take about 50x as long. This is fundamentally different from CPU-bound tasks (where parallelism requires multiprocessing, not asyncio) and explains why asyncio provides dramatic speedups for LLM pipelines specifically.

Q2: What is the difference between asyncio.gather and a producer-consumer queue for LLM batch processing?

asyncio.gather creates all coroutines upfront and runs them concurrently. Simple and efficient for small, bounded datasets where you know all items in advance and memory is not a constraint. Producer-consumer queues are better for: large datasets where you cannot hold all items in memory simultaneously; streaming data sources where items arrive over time; when you want results written incrementally (not accumulated in memory); and when you need fine-grained control over worker lifecycle. Rule of thumb: use gather+semaphore for under 10K items; use producer-consumer queue for 10K+ or when memory is a concern.

Q3: How do you implement a circuit breaker for LLM API calls and what problem does it solve?

A circuit breaker monitors failure rates and stops requests to a degraded service before they pile up. Three states: CLOSED (normal), OPEN (fail fast), HALF_OPEN (testing recovery). Implementation: count errors in a sliding time window; open the circuit when the error count reaches a threshold; after a timeout, allow test calls through; close on success, reopen on failure. The problem it solves: without a circuit breaker, if requests to a degraded LLM API hang until a 30-second timeout, 1,000 queued calls each wait 30 seconds before failing, adding up to 30,000 seconds of accumulated wait time and held resources. With a circuit breaker, after the first 5 timeouts, subsequent calls fail in microseconds, preserving resources and allowing fast failover to a fallback model.

Q4: What is the "thundering herd" problem in LLM retry logic and how does jitter solve it?

When many concurrent clients hit rate limits at the same time, they all back off for the same duration and retry at the same moment - creating a synchronized spike that immediately hits the rate limit again. This cycle repeats, preventing recovery. Without jitter, 1,000 clients might all retry at exactly t+4s, t+8s, t+16s. With full jitter, each client waits a random duration between 0 and the current exponential delay, so 1,000 retries spread across 4 seconds (about 250 per second) instead of landing in the same instant, and later retry waves spread over even wider windows. The AWS Architecture Blog's analysis of backoff strategies also describes "decorrelated jitter", where each delay is drawn relative to the previous delay rather than the attempt number. Always add jitter to any retry loop.

Q5: How do you set the right max_concurrency value for a semaphore in an LLM pipeline?

Empirically. Start conservative (max_concurrency=10), run your workload, and monitor for 429 (rate limit) errors. If none appear, increase to 20 and repeat. Continue increasing until you see occasional 429 errors, then back off by 10-20%. The right value depends on: your API tier's RPM limit, average call duration, and token counts. Mathematical upper bound: RPM_limit / 60 * avg_call_seconds. Example: 4,000 RPM, 2s calls → theoretical max = 133 concurrent. In practice, use 50-70% of theoretical max to leave headroom for bursts. Also consider: memory (each in-flight call holds its messages in memory), cost predictability, and whether your downstream systems can handle the throughput.

© 2026 EngineersOfAI. All rights reserved.