Batch Processing with LLMs

The Million-Document Problem

When the legal team asked for contract analysis across their entire document archive, the request seemed simple enough: analyze each contract, extract key clauses, flag non-standard terms, and produce a structured report. The archive contained 1.2 million documents.

At the standard API rate for Claude Haiku (claude-haiku-4-5-20251001), and with an average of 3,000 tokens per contract, the cost estimate came in at $54,000 using real-time API calls. Using async concurrency patterns, the team could process the entire archive in 12-15 hours.

But their engineer Kenji noticed something in the API documentation: the Anthropic Message Batches API offered a 50% cost reduction for requests that could wait up to 24 hours for results. The same analysis would cost $27,000 instead of $54,000 - a $27,000 saving purely from being willing to wait overnight. No code quality tradeoff. No output quality difference. Just a different delivery timing.

This is the fundamental economics of batch processing: latency tolerance trades directly for cost. Applications that do not need real-time responses can use batch APIs and save significantly. Understanding when to use batch processing - and how to use it correctly - is a core skill for production AI engineering.

When to Use Batch vs Real-Time

The decision is driven by two axes: latency tolerance and cost sensitivity.

| Use Case | Recommended Mode | Reason |
| --- | --- | --- |
| Chat response | Real-time streaming | User is waiting, TTFT matters |
| Code completion / autocomplete | Real-time streaming | User is waiting at the cursor |
| Nightly report generation | Batch API | 24h window acceptable, large scale |
| Training data labeling | Batch API | Overnight run, massive scale |
| Contract / document archival analysis | Batch API | Non-urgent, cost-sensitive |
| Real-time content moderation | Real-time | Must block before publishing |
| A/B test evaluation | Batch API | Done after the fact, not on critical path |
| User-triggered "analyze my document" | Real-time (async) | User expects result in minutes |
| Scheduled data enrichment (ETL) | Batch API | Nightly job, no user waiting |
| Embedding generation for search index | Custom batch (async concurrent) | Need progress visibility |
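
The two axes in the table can be folded into a small routing helper. This is a minimal sketch rather than a rule from the Anthropic documentation: the function name `choose_processing_mode`, its parameters, and the mode labels are hypothetical, and the only number taken from this lesson is the 24-hour batch window.

```python
BATCH_WINDOW_HOURS = 24.0  # Batch API results may take up to 24 hours


def choose_processing_mode(
    max_wait_hours: float,
    user_is_waiting: bool,
    needs_progress_visibility: bool = False,
) -> str:
    """Pick a processing mode from latency tolerance (hypothetical helper)."""
    if user_is_waiting:
        # Someone is watching the screen: latency dominates cost.
        return "real-time"
    if needs_progress_visibility or max_wait_hours < BATCH_WINDOW_HOURS:
        # Offline work that cannot absorb the full batch window,
        # or that needs partial results along the way.
        return "async-concurrent"
    # Nobody is waiting and the full window is acceptable: take the discount.
    return "batch-api"


print(choose_processing_mode(max_wait_hours=24, user_is_waiting=False))
```

Treat the result as a starting point for discussion with whoever owns the SLA, not as a hard rule.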

The Anthropic Message Batches API

The Anthropic Batches API accepts up to 100,000 requests per batch, processes them asynchronously, and returns results within 24 hours at 50% of standard pricing. Each request in a batch is independent - failures in individual requests do not affect others.

```python
import anthropic
import json
import time
import logging
from dataclasses import dataclass
from typing import Optional, Iterator

client = anthropic.Anthropic()
logger = logging.getLogger(__name__)


@dataclass
class BatchRequest:
    """A single request within a batch."""
    custom_id: str  # Your identifier - used to correlate results
    messages: list[dict]
    system: str = ""
    model: str = "claude-haiku-4-5-20251001"
    max_tokens: int = 500


def build_batch_requests(
    items: list[dict],
    prompt_template: str,
    model: str = "claude-haiku-4-5-20251001",
    max_tokens: int = 500,
    system: str = "",
) -> list[dict]:
    """
    Build batch request objects from items and a prompt template.

    Args:
        items: List of dicts, each with "id" key and template variables
        prompt_template: str.format-style template with {variable} placeholders
        model: Model to use for all requests
        max_tokens: Max tokens per response
        system: Optional system prompt

    Returns:
        List of request dicts ready for the Batches API
    """
    if len(items) > 100_000:
        raise ValueError(
            f"Batch too large: {len(items)} items. "
            f"Anthropic batch limit is 100,000 per submission. "
            f"Split into multiple batches."
        )

    requests = []
    for item in items:
        try:
            prompt = prompt_template.format(**item)
        except KeyError as e:
            # Fail fast: a template bug should stop the job before submission
            raise ValueError(
                f"Prompt template missing key {e} for item {item.get('id', '?')}"
            ) from e

        msg_params: dict = {
            "model": model,
            "max_tokens": max_tokens,
            "messages": [{"role": "user", "content": prompt}],
        }
        if system:
            msg_params["system"] = system

        requests.append({
            "custom_id": str(item["id"]),
            "params": msg_params,
        })

    return requests


def submit_batch(
    items: list[dict],
    prompt_template: str,
    model: str = "claude-haiku-4-5-20251001",
    max_tokens: int = 500,
    system: str = "",
) -> str:
    """
    Submit a batch job to the Anthropic Batches API.

    Returns the batch_id - save this for monitoring and result retrieval.
    The batch processes asynchronously; you poll for status separately.
    """
    requests = build_batch_requests(items, prompt_template, model, max_tokens, system)

    batch = client.messages.batches.create(requests=requests)

    logger.info(
        f"Batch submitted: id={batch.id} | "
        f"requests={batch.request_counts.processing} | "
        f"status={batch.processing_status}"
    )

    return batch.id


def check_batch_status(batch_id: str) -> dict:
    """Check the current status of a submitted batch."""
    batch = client.messages.batches.retrieve(batch_id)

    return {
        "batch_id": batch.id,
        "status": batch.processing_status,  # "in_progress" | "ended" | "canceling" | "canceled"
        "counts": {
            "processing": batch.request_counts.processing,
            "succeeded": batch.request_counts.succeeded,
            "errored": batch.request_counts.errored,
            "canceled": batch.request_counts.canceled,
            "expired": batch.request_counts.expired,
        },
        "created_at": str(batch.created_at),
        "expires_at": str(batch.expires_at),
    }


def wait_for_batch(
    batch_id: str,
    poll_interval_seconds: float = 60.0,
    timeout_hours: float = 25.0,
) -> dict:
    """
    Poll until the batch completes or times out.

    Batches can take up to 24 hours. Set timeout > 24h to handle edge cases.
    Returns final status dict with counts.
    """
    deadline = time.time() + timeout_hours * 3600
    last_log = time.time()

    while time.time() < deadline:
        batch = client.messages.batches.retrieve(batch_id)

        if batch.processing_status == "ended":
            counts = batch.request_counts
            logger.info(
                f"Batch {batch_id[:8]} completed: "
                f"{counts.succeeded} succeeded, {counts.errored} errored"
            )
            return {
                "status": "ended",
                "succeeded": counts.succeeded,
                "errored": counts.errored,
            }

        if batch.processing_status in ("canceling", "canceled"):
            logger.warning(f"Batch {batch_id[:8]} was canceled")
            return {"status": batch.processing_status}

        # Log progress every 5 minutes
        if time.time() - last_log >= 300:
            counts = batch.request_counts
            total = counts.processing + counts.succeeded + counts.errored
            progress_pct = (
                (counts.succeeded + counts.errored) / max(total, 1) * 100
            )
            logger.info(
                f"Batch {batch_id[:8]} progress: {progress_pct:.1f}% | "
                f"{counts.succeeded} done, {counts.errored} errors, "
                f"{counts.processing} processing"
            )
            last_log = time.time()

        time.sleep(poll_interval_seconds)

    return {"status": "timeout"}


def retrieve_batch_results(batch_id: str) -> Iterator[dict]:
    """
    Stream results from a completed batch.
    Uses iteration to avoid loading all results into memory.

    Yields dicts with:
    - id: the custom_id you provided
    - content: the model's response text (or None on error)
    - input_tokens, output_tokens: usage stats
    - error: error type string (or None on success)
    - status: "succeeded" | "errored" | "canceled" | "expired"
    """
    batch = client.messages.batches.retrieve(batch_id)

    if batch.processing_status != "ended":
        raise ValueError(
            f"Batch not complete. Status: {batch.processing_status}. "
            f"Call wait_for_batch() first."
        )

    for result in client.messages.batches.results(batch_id):
        if result.result.type == "succeeded":
            msg = result.result.message
            yield {
                "id": result.custom_id,
                "content": msg.content[0].text,
                "input_tokens": msg.usage.input_tokens,
                "output_tokens": msg.usage.output_tokens,
                "error": None,
                "status": "succeeded",
            }
        elif result.result.type == "errored":
            yield {
                "id": result.custom_id,
                "content": None,
                "input_tokens": 0,
                "output_tokens": 0,
                "error": result.result.error.type,
                "status": "errored",
            }
        else:
            # "canceled" or "expired": the request never produced a message
            yield {
                "id": result.custom_id,
                "content": None,
                "input_tokens": 0,
                "output_tokens": 0,
                "error": result.result.type,
                "status": result.result.type,
            }
```
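
Because each result carries the `custom_id` you supplied, it is safest to join results back to the source items by ID rather than by position. The sketch below assumes result dicts shaped like the ones yielded by `retrieve_batch_results` above; the function name `join_results_to_items`, the `analysis` key, and the sample data are hypothetical.

```python
from typing import Iterable


def join_results_to_items(items: list[dict], results: Iterable[dict]) -> dict:
    """Join batch results to their source items by custom_id (hypothetical helper)."""
    items_by_id = {str(item["id"]): item for item in items}
    seen: set[str] = set()
    succeeded, failed = [], []

    for result in results:
        item = items_by_id.get(result["id"])
        if item is None:
            continue  # Result for an ID that was not in this item list
        seen.add(result["id"])
        if result["status"] == "succeeded":
            succeeded.append({**item, "analysis": result["content"]})
        else:
            failed.append({**item, "error": result["error"]})

    # Items that were submitted but have no result record at all
    missing = [item for item_id, item in items_by_id.items() if item_id not in seen]
    return {"succeeded": succeeded, "failed": failed, "missing": missing}


# Illustrative data only: results deliberately arrive out of order
sample_items = [{"id": 1, "text": "a"}, {"id": 2, "text": "b"}, {"id": 3, "text": "c"}]
sample_results = [
    {"id": "2", "status": "errored", "content": None, "error": "invalid_request_error"},
    {"id": "1", "status": "succeeded", "content": "ok", "error": None},
]
joined = join_results_to_items(sample_items, sample_results)
print(len(joined["succeeded"]), len(joined["failed"]), len(joined["missing"]))
```

The `missing` list is worth checking after every job: an item with no result record usually means it was dropped before submission.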

Complete Production Batch Pipeline

A production batch pipeline needs more than just API calls. It needs split logic for large datasets, persistent state for crash recovery, progress tracking, and cost accounting:

```python
import anthropic
import json
import time
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

client = anthropic.Anthropic()
logger = logging.getLogger(__name__)


@dataclass
class BatchJobState:
    """Persistent state for a batch processing job."""
    job_name: str
    batch_ids: list[str] = field(default_factory=list)
    total_items: int = 0
    submitted_at: float = field(default_factory=time.time)
    completed_at: Optional[float] = None
    status: str = "pending"  # pending | submitted | completed | failed
    succeeded: int = 0
    errored: int = 0
    total_input_tokens: int = 0
    total_output_tokens: int = 0


class BatchProcessingPipeline:
    """
    Production batch processing pipeline for LLM tasks.

    Handles:
    - Large datasets by splitting into 100K-item batches
    - Persistent state tracking across process restarts
    - Progress monitoring with configurable polling
    - Result streaming to disk (no memory explosion)
    - Cost calculation and reporting
    - Error categorization and retry recommendations

    Usage:
        pipeline = BatchProcessingPipeline("./batch_state")
        results_file = pipeline.run(
            items=documents,
            prompt_template="Summarize: {text}",
            job_name="contract_analysis_q1",
            output_file="results.jsonl"
        )
    """

    MAX_BATCH_SIZE = 100_000
    POLL_INTERVAL_SECONDS = 60

    # Pricing per million tokens (check Anthropic pricing page for current rates)
    BATCH_PRICING = {
        "claude-haiku-4-5-20251001": {"input": 0.40, "output": 2.00},
        "claude-sonnet-4-6": {"input": 1.50, "output": 7.50},
        "claude-opus-4-6": {"input": 7.50, "output": 37.50},
    }

    def __init__(self, state_dir: str = "./batch_jobs"):
        self.state_dir = Path(state_dir)
        self.state_dir.mkdir(parents=True, exist_ok=True)

    def _save_state(self, state: BatchJobState) -> None:
        """Persist job state for crash recovery."""
        state_path = self.state_dir / f"{state.job_name}.json"
        state_path.write_text(json.dumps(state.__dict__, indent=2))

    def _load_state(self, job_name: str) -> Optional[BatchJobState]:
        """Load saved job state (for resuming after crash)."""
        state_path = self.state_dir / f"{job_name}.json"
        if state_path.exists():
            data = json.loads(state_path.read_text())
            state = BatchJobState(job_name=job_name)
            for k, v in data.items():
                if hasattr(state, k):
                    setattr(state, k, v)
            return state
        return None

    def submit_large_batch(
        self,
        items: list[dict],
        prompt_template: str,
        job_name: str,
        model: str = "claude-haiku-4-5-20251001",
        max_tokens: int = 500,
        system: str = "",
    ) -> BatchJobState:
        """
        Submit a large dataset as multiple API batches if needed.
        Automatically splits when len(items) > 100,000.
        Saves state after each batch submission for crash recovery.
        """
        state = self._load_state(job_name)

        # Already fully submitted (or finished): never submit the same job twice
        if state and state.status in ("submitted", "completed"):
            logger.info(f"Resuming existing job {job_name} with {len(state.batch_ids)} batches")
            return state

        if state is None:
            state = BatchJobState(job_name=job_name, total_items=len(items))

        # Split into MAX_BATCH_SIZE chunks
        chunks = [
            items[start:start + self.MAX_BATCH_SIZE]
            for start in range(0, len(items), self.MAX_BATCH_SIZE)
        ]

        # After a crash mid-submission, the first len(batch_ids) chunks already
        # have batches. This assumes the same items are passed in the same order.
        already_submitted = len(state.batch_ids)

        logger.info(f"Splitting {len(items)} items into {len(chunks)} batches")

        for chunk_idx, chunk in enumerate(chunks):
            if chunk_idx < already_submitted:
                continue

            # Build request objects for this chunk
            requests = []
            for item in chunk:
                try:
                    prompt = prompt_template.format(**item)
                except KeyError as e:
                    logger.error(f"Template error for item {item.get('id')}: {e}")
                    continue

                msg_params: dict = {
                    "model": model,
                    "max_tokens": max_tokens,
                    "messages": [{"role": "user", "content": prompt}],
                }
                if system:
                    msg_params["system"] = system

                requests.append({
                    "custom_id": str(item["id"]),
                    "params": msg_params,
                })

            # Submit this chunk
            batch = client.messages.batches.create(requests=requests)
            state.batch_ids.append(batch.id)

            logger.info(
                f"Submitted chunk {chunk_idx + 1}/{len(chunks)}: "
                f"batch_id={batch.id} ({len(requests)} items)"
            )

            # Save after each submission for crash recovery
            self._save_state(state)

            # Brief delay between batch submissions to avoid overwhelming the API
            if chunk_idx < len(chunks) - 1:
                time.sleep(1)

        state.status = "submitted"
        self._save_state(state)

        logger.info(f"All {len(state.batch_ids)} batches submitted for job {job_name}")
        return state

    def wait_for_all_batches(
        self,
        state: BatchJobState,
        timeout_hours: float = 25.0,
    ) -> BatchJobState:
        """Poll until all batches in this job have completed."""
        deadline = time.time() + timeout_hours * 3600
        pending_ids = set(state.batch_ids)

        while pending_ids and time.time() < deadline:
            for batch_id in list(pending_ids):
                try:
                    batch = client.messages.batches.retrieve(batch_id)
                except Exception as e:
                    logger.error(f"Failed to retrieve batch {batch_id}: {e}")
                    continue

                if batch.processing_status == "ended":
                    pending_ids.discard(batch_id)
                    logger.info(
                        f"Batch {batch_id[:8]} completed: "
                        f"succeeded={batch.request_counts.succeeded}, "
                        f"errored={batch.request_counts.errored}"
                    )

            if pending_ids:
                logger.info(f"{len(pending_ids)} batches still processing...")
                time.sleep(self.POLL_INTERVAL_SECONDS)

        if pending_ids:
            logger.warning(
                f"Timeout: {len(pending_ids)} batches did not complete within {timeout_hours}h"
            )

        # Only mark the job completed if every batch actually ended
        state.status = "failed" if pending_ids else "completed"
        state.completed_at = time.time()
        self._save_state(state)

        return state

    def collect_results(
        self,
        state: BatchJobState,
        output_file: str,
        model: str = "claude-haiku-4-5-20251001",
    ) -> dict:
        """
        Stream results from all completed batches to a JSONL output file.
        Streams to disk - never loads all results into memory.

        JSONL format (one JSON object per line):
            {"id": "doc_1", "content": "Summary...", "status": "succeeded"}
            {"id": "doc_2", "content": null, "status": "errored", "error": "..."}
        """
        output_path = Path(output_file)
        total_succeeded = 0
        total_errored = 0
        total_input_tokens = 0
        total_output_tokens = 0

        with open(output_path, "w") as f:
            for batch_id in state.batch_ids:
                try:
                    batch = client.messages.batches.retrieve(batch_id)
                except Exception as e:
                    logger.error(f"Cannot retrieve batch {batch_id}: {e}")
                    continue

                if batch.processing_status != "ended":
                    logger.warning(f"Batch {batch_id[:8]} not ended - skipping")
                    continue

                for result in client.messages.batches.results(batch_id):
                    if result.result.type == "succeeded":
                        msg = result.result.message
                        entry = {
                            "id": result.custom_id,
                            "content": msg.content[0].text,
                            "input_tokens": msg.usage.input_tokens,
                            "output_tokens": msg.usage.output_tokens,
                            "status": "succeeded",
                        }
                        total_succeeded += 1
                        total_input_tokens += msg.usage.input_tokens
                        total_output_tokens += msg.usage.output_tokens
                    else:
                        # errored, canceled, or expired
                        entry = {
                            "id": result.custom_id,
                            "content": None,
                            "input_tokens": 0,
                            "output_tokens": 0,
                            "status": result.result.type,
                            "error": getattr(
                                getattr(result.result, "error", None), "type", "unknown"
                            ),
                        }
                        total_errored += 1

                    f.write(json.dumps(entry) + "\n")

        # Calculate cost at batch pricing
        pricing = self.BATCH_PRICING.get(model, {"input": 0.0, "output": 0.0})
        estimated_cost = (
            total_input_tokens / 1_000_000 * pricing["input"]
            + total_output_tokens / 1_000_000 * pricing["output"]
        )

        # Compare to real-time pricing (batch pricing is 50% of real-time)
        realtime_cost = estimated_cost * 2

        summary = {
            "total_items": state.total_items,
            "succeeded": total_succeeded,
            "errored": total_errored,
            "success_rate": total_succeeded / max(state.total_items, 1),
            "total_input_tokens": total_input_tokens,
            "total_output_tokens": total_output_tokens,
            "batch_cost_usd": round(estimated_cost, 2),
            "realtime_cost_counterfactual_usd": round(realtime_cost, 2),
            "savings_usd": round(realtime_cost - estimated_cost, 2),
            "output_file": str(output_path),
            "batches_submitted": len(state.batch_ids),
        }

        logger.info(
            f"Results collected: {total_succeeded} succeeded, {total_errored} errored. "
            f"Cost: ${estimated_cost:.2f} (saved ${realtime_cost - estimated_cost:.2f} vs real-time)"
        )

        return summary

    def run(
        self,
        items: list[dict],
        prompt_template: str,
        job_name: str,
        output_file: str,
        model: str = "claude-haiku-4-5-20251001",
        max_tokens: int = 500,
        system: str = "",
        timeout_hours: float = 25.0,
    ) -> dict:
        """
        Full end-to-end batch processing pipeline.
        Idempotent - safe to re-run if the process crashes.

        Args:
            items: List of dicts with "id" key and template variables
            prompt_template: Prompt with {variable} placeholders
            job_name: Unique name for this job (used for state persistence)
            output_file: JSONL file to write results to
            model: Model to use
            max_tokens: Max tokens per response
            system: System prompt
            timeout_hours: Max hours to wait for completion

        Returns:
            Summary dict with cost, success rates, output file path
        """
        logger.info(f"Starting batch job: {job_name} ({len(items)} items)")

        # Submit (idempotent - skips if already submitted)
        state = self.submit_large_batch(
            items=items,
            prompt_template=prompt_template,
            job_name=job_name,
            model=model,
            max_tokens=max_tokens,
            system=system,
        )

        # Wait for completion
        logger.info(f"Waiting for {len(state.batch_ids)} batches to complete...")
        state = self.wait_for_all_batches(state, timeout_hours=timeout_hours)

        # Collect and write results
        return self.collect_results(state, output_file=output_file, model=model)
```
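
Crash recovery depends on the state file itself surviving a crash. Writing JSON directly to the final path can leave a half-written file if the process dies mid-write. One common mitigation, sketched below, is to write to a temporary file in the same directory and then swap it into place with `os.replace`. The function name `save_state_atomically` is hypothetical, and this is one possible hardening rather than something the pipeline above requires.

```python
import json
import os
import tempfile
from pathlib import Path


def save_state_atomically(state: dict, state_path: Path) -> None:
    """Write JSON state so readers see either the old file or the new one."""
    state_path.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives in the target directory so the final rename
    # stays on the same filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=state_path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump(state, tmp, indent=2)
            tmp.flush()
            os.fsync(tmp.fileno())  # Push the bytes to disk before renaming
        os.replace(tmp_name, state_path)  # Swap the new file into place
    except BaseException:
        # Clean up the temp file if anything failed before the swap
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

The same idea applies to any small state file that must stay readable across restarts, such as a list of submitted batch IDs.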

Custom Batch Processor with Checkpointing

When the official Batch API is not the right fit (e.g., you need streaming progress, custom retry logic, or real-time monitoring), build your own batch processor with checkpoint-based resumability:

```python
import asyncio
import anthropic
import json
import logging
import time
from pathlib import Path

async_client = anthropic.AsyncAnthropic()
logger = logging.getLogger(__name__)


async def resumable_batch_processor(
    items: list[dict],
    process_fn,
    output_file: str,
    max_concurrency: int = 20,
    checkpoint_every: int = 100,
) -> dict:
    """
    Custom async batch processor with checkpointing.

    Key features:
    - RESUME: Skip already-processed items on restart
    - INCREMENTAL: Write results as they complete (no memory explosion)
    - PROGRESS: Log progress every checkpoint_every items
    - IDEMPOTENT: Same item processed twice = same result (overwrite by ID)

    Args:
        items: Items to process, each must have "id" key
        process_fn: async function(item) -> dict with "content" key
        output_file: JSONL output file (appended to on resume)
        max_concurrency: Max concurrent API calls
        checkpoint_every: Write interval and progress log frequency

    Returns:
        Summary statistics dict
    """
    output_path = Path(output_file)

    # --- Resume Support: load already-processed IDs ---
    processed_ids: set[str] = set()
    if output_path.exists():
        with open(output_path) as f:
            for line in f:
                try:
                    result = json.loads(line.strip())
                    # Only successes count as done; errors are retried on resume
                    if result.get("status") == "success":
                        processed_ids.add(str(result["id"]))
                except json.JSONDecodeError:
                    continue
        logger.info(f"Resuming: {len(processed_ids)} items already processed")

    # Filter to remaining items
    remaining = [item for item in items if str(item["id"]) not in processed_ids]
    logger.info(f"Processing {len(remaining)} remaining items ({len(processed_ids)} skipped)")

    if not remaining:
        return {"total": len(items), "skipped": len(processed_ids), "new": 0}

    # --- Processing ---
    semaphore = asyncio.Semaphore(max_concurrency)
    results_buffer: list[dict] = []
    buffer_lock = asyncio.Lock()
    processed_count = 0
    error_count = 0
    start_time = time.time()

    async def process_one(item: dict) -> None:
        nonlocal processed_count, error_count

        async with semaphore:
            item_start = time.perf_counter()
            try:
                result = await process_fn(item)
                entry = {
                    "id": str(item["id"]),
                    "status": "success",
                    "content": result.get("content", ""),
                    "input_tokens": result.get("input_tokens", 0),
                    "output_tokens": result.get("output_tokens", 0),
                    "duration_ms": (time.perf_counter() - item_start) * 1000,
                }
            except Exception as e:
                entry = {
                    "id": str(item["id"]),
                    "status": "error",
                    "error": str(e),
                    "content": None,
                }
                async with buffer_lock:
                    error_count += 1

            async with buffer_lock:
                results_buffer.append(entry)
                processed_count += 1

                # Checkpoint: flush buffer to disk
                if processed_count % checkpoint_every == 0:
                    with open(output_path, "a") as f:
                        for r in results_buffer:
                            f.write(json.dumps(r) + "\n")
                    results_buffer.clear()

                    elapsed = time.time() - start_time
                    total_done = len(processed_ids) + processed_count
                    rate = processed_count / max(elapsed, 0.001)
                    eta = (len(remaining) - processed_count) / max(rate, 0.001)
                    logger.info(
                        f"Progress: {total_done}/{len(items)} "
                        f"({total_done/len(items)*100:.1f}%) | "
                        f"{rate:.1f} items/s | "
                        f"ETA: {eta/60:.1f} min | "
                        f"Errors: {error_count}"
                    )

    # Process all remaining items
    await asyncio.gather(*[process_one(item) for item in remaining])

    # Flush any remaining buffer
    if results_buffer:
        with open(output_path, "a") as f:
            for r in results_buffer:
                f.write(json.dumps(r) + "\n")

    # Count final results. The file is append-only, so an item that failed in an
    # earlier run and succeeded on retry appears twice - the last record wins.
    final_status: dict[str, str] = {}
    with open(output_path) as f:
        for line in f:
            try:
                r = json.loads(line)
                final_status[str(r["id"])] = r["status"]
            except (json.JSONDecodeError, KeyError):
                continue
    total_success = sum(1 for s in final_status.values() if s == "success")
    total_error = len(final_status) - total_success

    elapsed = time.time() - start_time
    return {
        "total_items": len(items),
        "succeeded": total_success,
        "errored": total_error,
        "elapsed_seconds": elapsed,
        "throughput": len(remaining) / max(elapsed, 0.001),
        "output_file": str(output_path),
    }
```
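
Custom retry logic is one of the reasons given above for building your own processor. A minimal sketch of one way to add it: wrap any `process_fn` in exponential backoff with jitter before handing it to the processor. The name `with_retries` and its parameters are hypothetical, and retrying on every `Exception` is a simplifying assumption; production code would normally retry only transient failures such as rate limits or timeouts.

```python
import asyncio
import random
from typing import Awaitable, Callable


def with_retries(
    process_fn: Callable[[dict], Awaitable[dict]],
    max_attempts: int = 4,
    base_delay_seconds: float = 1.0,
) -> Callable[[dict], Awaitable[dict]]:
    """Wrap an async process_fn with exponential backoff (hypothetical helper)."""

    async def wrapped(item: dict) -> dict:
        for attempt in range(1, max_attempts + 1):
            try:
                return await process_fn(item)
            except Exception:
                if attempt == max_attempts:
                    raise  # Out of attempts: let the caller record the error
                # Delay doubles each attempt (1x, 2x, 4x the base), with
                # +/-50% jitter so concurrent workers do not retry in lockstep
                delay = base_delay_seconds * 2 ** (attempt - 1)
                await asyncio.sleep(delay * random.uniform(0.5, 1.5))
        raise AssertionError("unreachable")

    return wrapped
```

Used with the processor above, this would look like `process_fn=with_retries(analyze_contract)`, where `analyze_contract` stands in for your own async function.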

Cost Estimation and Mode Comparison

Before running any large batch job, estimate the cost and compare modes:

```python
def estimate_batch_economics(
    n_items: int,
    avg_input_tokens: int,
    avg_output_tokens: int,
    model: str = "claude-haiku-4-5-20251001",
) -> dict:
    """
    Compare real-time vs batch API economics for a given workload.
    Run this BEFORE submitting a large batch to validate your cost assumptions.

    Always run a 100-item pilot first to measure actual token counts -
    the averages you estimate may be off by 2-3x.
    """
    # Current pricing (USD per million tokens)
    realtime_pricing = {
        "claude-haiku-4-5-20251001": {"input": 0.80, "output": 4.00},
        "claude-sonnet-4-6": {"input": 3.00, "output": 15.00},
        "claude-opus-4-6": {"input": 15.00, "output": 75.00},
    }
    # Batch pricing is 50% of real-time
    batch_pricing = {
        name: {"input": p["input"] * 0.5, "output": p["output"] * 0.5}
        for name, p in realtime_pricing.items()
    }

    if model not in realtime_pricing:
        raise ValueError(f"Unknown model: {model}")

    total_input_tokens = n_items * avg_input_tokens
    total_output_tokens = n_items * avg_output_tokens

    rt_p = realtime_pricing[model]
    batch_p = batch_pricing[model]

    realtime_cost = (
        total_input_tokens / 1_000_000 * rt_p["input"]
        + total_output_tokens / 1_000_000 * rt_p["output"]
    )
    batch_cost = (
        total_input_tokens / 1_000_000 * batch_p["input"]
        + total_output_tokens / 1_000_000 * batch_p["output"]
    )

    # Time estimate for real-time processing:
    # 20 concurrent calls at ~2s each = 20 * 0.5 = 10 items per second
    realtime_time_hours = n_items / (20 * 0.5) / 3600

    return {
        "model": model,
        "n_items": n_items,
        "total_input_tokens": total_input_tokens,
        "total_output_tokens": total_output_tokens,
        "realtime_cost_usd": round(realtime_cost, 2),
        "batch_cost_usd": round(batch_cost, 2),
        "savings_usd": round(realtime_cost - batch_cost, 2),
        "savings_pct": 50.0,  # Always 50% for Anthropic batch
        "realtime_time_estimate_hours": round(realtime_time_hours, 1),
        "batch_time_max_hours": 24,
        # Batch wins when real-time would be slow anyway or the savings are material
        "recommendation": (
            "Use Batch API" if realtime_time_hours > 2 or (realtime_cost - batch_cost) > 100
            else "Consider real-time for faster turnaround"
        ),
    }


# Example output
def print_economics_comparison(n_items: int, avg_input: int, avg_output: int, model: str):
    e = estimate_batch_economics(n_items, avg_input, avg_output, model)
    print(f"\n=== Batch Economics: {n_items:,} items, {model} ===")
    print(f"Real-time: ${e['realtime_cost_usd']:,.2f} | Time: ~{e['realtime_time_estimate_hours']:.1f}h")
    print(f"Batch API: ${e['batch_cost_usd']:,.2f} | Time: up to 24h")
    print(f"Savings: ${e['savings_usd']:,.2f} (50%)")
    print(f"Recommendation: {e['recommendation']}")


# 1.2M document legal analysis example
print_economics_comparison(1_200_000, 3_000, 200, "claude-haiku-4-5-20251001")
# Output:
# === Batch Economics: 1,200,000 items, claude-haiku-4-5-20251001 ===
# Real-time: $3,840.00 | Time: ~33.3h
# Batch API: $1,920.00 | Time: up to 24h
# Savings: $1,920.00 (50%)
# Recommendation: Use Batch API
```

Batch Job Observability and Monitoring

Large batch jobs running over 12-24 hours need active observability. A job that silently fails at hour 3 and goes unnoticed until hour 23 wastes an entire day. Build monitoring in from the start:

```python
import anthropic
import json
import time
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

client = anthropic.Anthropic()
logger = logging.getLogger(__name__)


@dataclass
class BatchJobMonitor:
    """
    Real-time monitoring for batch processing jobs.

    Tracks: progress, throughput, error rate, estimated completion time,
    and cost accrued. Call update() after each batch status check.
    """
    job_name: str
    total_items: int
    start_time: float = field(default_factory=time.time)
    _completed: int = 0
    _errored: int = 0
    _last_log_time: float = field(default_factory=time.time)
    _total_input_tokens: int = 0
    _total_output_tokens: int = 0
    _model: str = "claude-haiku-4-5-20251001"

    def update(
        self,
        completed: int,
        errored: int,
        input_tokens: int = 0,
        output_tokens: int = 0,
    ) -> None:
        """Update progress counters and log if enough time has passed."""
        # completed/errored are running totals; token counts are increments
        self._completed = completed
        self._errored = errored
        self._total_input_tokens += input_tokens
        self._total_output_tokens += output_tokens

        # Log every 5 minutes
        if time.time() - self._last_log_time >= 300:
            self._log_progress()
            self._last_log_time = time.time()

    def _log_progress(self) -> None:
        elapsed = time.time() - self.start_time
        done = self._completed + self._errored
        remaining = self.total_items - done

        if done > 0 and elapsed > 0:
            rate = done / elapsed  # items per second
            eta_seconds = remaining / rate
            eta_minutes = eta_seconds / 60
        else:
            rate = 0
            eta_minutes = float("inf")

        error_rate = self._errored / max(done, 1) * 100

        # Cost accrued (batch pricing, USD per million tokens)
        batch_pricing = {
            "claude-haiku-4-5-20251001": (0.40, 2.00),
            "claude-sonnet-4-6": (1.50, 7.50),
        }
        inp_price, out_price = batch_pricing.get(self._model, (0.40, 2.00))
        cost_so_far = (
            self._total_input_tokens / 1_000_000 * inp_price
            + self._total_output_tokens / 1_000_000 * out_price
        )

        logger.info(
            f"[{self.job_name}] "
            f"{done}/{self.total_items} ({done/max(self.total_items, 1)*100:.1f}%) | "
            f"Rate: {rate:.1f}/s | "
            f"ETA: {eta_minutes:.0f} min | "
            f"Errors: {self._errored} ({error_rate:.1f}%) | "
            f"Cost so far: ${cost_so_far:.2f}"
        )

        # Alert on high error rate
        if error_rate > 10 and done > 50:
            logger.error(
                f"[{self.job_name}] HIGH ERROR RATE: {error_rate:.1f}%. "
                f"Investigate before continuing. "
                f"Check prompt template and input data quality."
            )

    def get_summary(self) -> dict:
        elapsed = time.time() - self.start_time
        done = self._completed + self._errored
        return {
            "job_name": self.job_name,
            "total": self.total_items,
            "completed": self._completed,
            "errored": self._errored,
            "error_rate_pct": round(self._errored / max(done, 1) * 100, 1),
            "elapsed_hours": round(elapsed / 3600, 2),
            "throughput_per_second": round(done / max(elapsed, 1), 2),
        }


def monitored_batch_poll(
    batch_ids: list[str],
    monitor: BatchJobMonitor,
    poll_interval_seconds: float = 60.0,
    timeout_hours: float = 25.0,
) -> dict[str, str]:
    """
    Poll multiple batches with active monitoring.
    Logs progress every 5 minutes via the monitor.
    Returns dict of batch_id → final_status.
    """
    deadline = time.time() + timeout_hours * 3600
    pending = set(batch_ids)
    completed: dict[str, str] = {}
    total_succeeded = 0
    total_errored = 0

    while pending and time.time() < deadline:
        for batch_id in list(pending):
            try:
                batch = client.messages.batches.retrieve(batch_id)
            except Exception as e:
                logger.warning(f"Failed to retrieve batch {batch_id[:8]}: {e}")
                continue

            counts = batch.request_counts

            if batch.processing_status == "ended":
                pending.discard(batch_id)
                completed[batch_id] = "ended"
                total_succeeded += counts.succeeded
                total_errored += counts.errored

        # Update monitor with current totals across all ended batches
        monitor.update(total_succeeded, total_errored)

        if pending:
            time.sleep(poll_interval_seconds)

    # Mark timeouts
    for batch_id in pending:
        completed[batch_id] = "timeout"
        logger.warning(f"Batch {batch_id[:8]} timed out after {timeout_hours}h")

    return completed
```

Common Mistakes

:::danger No Checkpointing on Long-Running Batches
Large batch jobs take 12-24 hours. Process crashes happen. Without checkpointing, you restart from scratch and pay twice. Always write results incrementally using JSONL format, track processed IDs, and filter them out on restart. The resumable_batch_processor above demonstrates this pattern - copy it.
:::

:::danger Validating Prompt Templates After Submitting 100K Requests
A KeyError in a prompt template causes 100,000 failed requests and a wasted day. Always run a 100-item pilot first: submit_batch(items[:100], ...) and check results before scaling up. Validate that output format is as expected and token usage matches your estimate.
:::

:::danger Using Batch API When Users Are Waiting
The Batch API can take up to 24 hours. If your business process needs results in minutes, batch API is the wrong choice. Use async concurrent calls (see previous lesson). Match your API choice to your SLA - "user clicked analyze" means seconds, not hours.
:::

:::warning Not Accounting for Errors in Budget
Typically 1-5% of batch items fail (rate limits, content policy, malformed prompts). Budget for re-running errors. In your cost estimate, add 5% buffer. Build your pipeline to output errors separately so you can re-run only the failed items.
:::

:::tip Run a Pilot First - Always
Before submitting your 1M item batch, run 100 items through the full pipeline. Verify: the prompt template generates valid requests, the output format matches what you expect, token counts match your estimate (they often don't), and error rate is within acceptable bounds. A pilot takes 5 minutes. Discovering a bug after submitting 1M items takes 24 hours.
:::
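
Some template bugs can be caught locally before even the pilot is submitted. The sketch below uses the standard library's `string.Formatter().parse` to list the placeholders in a template and report items that lack any of them. The helper names `template_fields` and `find_template_problems` are hypothetical, and the check only covers missing keys: it says nothing about output format or token usage, which still need a real pilot.

```python
from string import Formatter


def template_fields(prompt_template: str) -> set[str]:
    """Return the top-level placeholder names used by a str.format template."""
    fields = set()
    for _, field_name, _, _ in Formatter().parse(prompt_template):
        if field_name:
            # "{doc.title}" or "{doc[0]}" both depend on the key "doc"
            fields.add(field_name.split(".")[0].split("[")[0])
    return fields


def find_template_problems(items: list[dict], prompt_template: str) -> dict[str, list[str]]:
    """Map item ID -> missing placeholder names (empty dict means all items are OK)."""
    required = template_fields(prompt_template)
    problems = {}
    for item in items:
        missing = sorted(required - set(item))
        if missing:
            problems[str(item.get("id", "?"))] = missing
    return problems


# Illustrative data: the second item has no "text" key
sample = [{"id": 1, "text": "Contract A ..."}, {"id": 2}]
print(find_template_problems(sample, "Summarize: {text}"))  # {'2': ['text']}
```

Running this over the full item list takes seconds and costs nothing, so it works as a gate in front of both the pilot and the full submission.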

Interview Q&A

Q1: When would you use the Anthropic Batch API versus async concurrent calls?

The decision is about latency tolerance versus cost. The Batch API costs 50% less but takes up to 24 hours. Async concurrent calls cost standard rates but complete in hours. Use the Batch API for: nightly ETL pipelines (data available next morning), archival document processing (no urgency), training data labeling (offline, large scale), A/B test evaluation (done after users have acted), and any batch exceeding 100,000 items where cost matters. Use async concurrent calls for: same-day reporting requirements, user-triggered "analyze this" features with an SLA under a few hours, and any batch where you need streaming progress or partial results. The savings are real - $27,000 versus $54,000 on a 1.2M document job - but only matter if the 24h delay is acceptable. Never use batch API in user-facing flows.

Q2: How do you design a batch processing system resilient to failures?

Three essential properties. First, idempotency: each item must be processable multiple times without incorrect results. Use a unique ID per item and deduplicate by ID in your result store. Second, checkpointing: write results incrementally, not all at once at the end. Use JSONL format appended to a file or a database with atomic writes per item. On restart, load the set of already-processed IDs and skip them - this is the most important pattern. Third, dead letter queue: items that fail repeatedly go to a separate file or table for manual investigation. Track error rates and alert if above 5%. For the Anthropic Batch API specifically: save the batch_id immediately after submission (before the process might crash), because you need it to retrieve results. Without the batch_id, you cannot retrieve results even if the batch succeeded.

Q3: How do you estimate the cost of an LLM batch processing job before running it?

Start with a formula: cost = (input_tokens × input_price + output_tokens × output_price) × n_items. Then: (1) Run a 100-item pilot and log actual response.usage.input_tokens and response.usage.output_tokens from the API response. Do not estimate - token counts often differ from character-based estimates by 30-50%. (2) Calculate mean and P95 from the pilot sample. Use P95 for budgeting (there will be outliers). (3) Apply batch pricing (50% of standard). (4) Add 20% buffer for variance and error retries. The estimate_batch_economics function above encapsulates this - run it before every large batch job. Always compare batch cost to real-time cost to validate the trade-off is worth the wait.
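
The four steps can be sketched as a small budgeting function. This is an illustration under stated assumptions: `budget_from_pilot` and `percentile` are hypothetical helpers, prices are passed in as USD per million tokens (take them from the current pricing page), P95 uses the nearest-rank method, and the pilot data in the usage example is made up.

```python
import math


def percentile(values: list[int], pct: float) -> int:
    """Nearest-rank percentile: smallest value with at least pct% of the data at or below it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def budget_from_pilot(
    pilot_usage: list[tuple[int, int]],  # (input_tokens, output_tokens) per pilot item
    n_items: int,
    input_price_per_mtok: float,
    output_price_per_mtok: float,
    batch_discount: float = 0.5,  # Step 3: batch pricing is 50% of standard
    buffer: float = 0.20,         # Step 4: headroom for variance and retries
) -> dict:
    # Step 2: budget on P95 rather than the mean, so outliers are covered
    p95_in = percentile([u[0] for u in pilot_usage], 95)
    p95_out = percentile([u[1] for u in pilot_usage], 95)

    per_item_usd = (
        p95_in * input_price_per_mtok + p95_out * output_price_per_mtok
    ) / 1_000_000
    batch_cost = per_item_usd * n_items * batch_discount

    return {
        "p95_input_tokens": p95_in,
        "p95_output_tokens": p95_out,
        "batch_cost_usd": round(batch_cost, 2),
        "budget_usd": round(batch_cost * (1 + buffer), 2),
    }


# Made-up pilot: 100 items that all used 3,000 input and 200 output tokens
pilot = [(3000, 200)] * 100
budget = budget_from_pilot(pilot, 1_200_000, input_price_per_mtok=0.80, output_price_per_mtok=4.00)
print(budget)
```

A real pilot will show a spread of token counts, which is exactly why the P95 figure tends to be noticeably higher than the mean.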

Q4: What is JSONL format and why is it the right choice for batch output?

JSONL (JSON Lines) is a format where each line is a complete, valid JSON object. It is the right choice for batch output for four reasons: (1) Streamable - you can write results line by line as they complete, with no need to hold everything in memory; (2) Appendable - you can safely append to an existing file when resuming, then deduplicate by ID; (3) Crash-safe - a crash mid-write leaves a partially written file; JSONL files can be read up to the last complete line, losing at most one record; (4) Toolable - standard Unix tools (jq, grep, wc) work on JSONL, and pandas can load it with pd.read_json(..., lines=True). Never write batch results as a single large JSON array - it requires loading everything into memory before writing and is not resumable.
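
The appendable and crash-safe properties can be seen in a short reader. This is a minimal sketch: `load_latest_records` is a hypothetical helper, and the file contents are invented to show a duplicate ID and a line truncated by a simulated crash.

```python
import json
import tempfile
from pathlib import Path


def load_latest_records(path: Path) -> dict[str, dict]:
    """Read a JSONL results file, keeping the last record seen for each ID."""
    latest: dict[str, dict] = {}
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                # Most likely a line cut short by a crash mid-write; skip it
                continue
            latest[str(record["id"])] = record  # Later lines override earlier ones
    return latest


with tempfile.TemporaryDirectory() as tmp_dir:
    results_path = Path(tmp_dir) / "results.jsonl"
    results_path.write_text(
        '{"id": "doc_1", "status": "error", "content": null}\n'
        '{"id": "doc_2", "status": "success", "content": "Summary B"}\n'
        '{"id": "doc_1", "status": "success", "content": "Summary A"}\n'
        '{"id": "doc_3", "status": "succ'  # Truncated final line
    )
    records = load_latest_records(results_path)

print(sorted(records))  # ['doc_1', 'doc_2']
```

Only the truncated `doc_3` record is lost, and the retried `doc_1` resolves to its latest (successful) record.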

Q5: How do you handle the case where 5% of items fail in a batch job?

Design your output format to include both successes and failures, differentiated by a status field. After the main batch completes: (1) Filter the output file for status=error records; (2) Extract their IDs; (3) Look up the original items by ID; (4) Decide whether to retry (some errors are transient, like rate limits) or investigate (content policy violations need prompt fixes); (5) For retryable errors, re-run just those items in a new batch; (6) For systematic errors (same error type on many items), examine the error pattern - it likely indicates a prompt issue. Target: no more than 2-3% error rate after retry. Alert on your monitoring system if errors exceed 10% - it indicates a prompt or data quality problem that needs investigation before spending money retrying.
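
Steps 1 to 6 can be sketched as a triage function over the result records. Everything named here is illustrative: `triage_failures` is a hypothetical helper, the records follow the `id` / `status` / `error` shape used in this lesson, and the set of error types treated as retryable is an assumption you should adjust to what your own jobs actually return.

```python
from collections import Counter

# Assumption: these error types are transient and worth retrying as-is
RETRYABLE_ERRORS = {"rate_limit_error", "overloaded_error", "api_error", "expired"}


def triage_failures(results: list[dict], items: list[dict]) -> dict:
    """Split failed results into items to re-run and error types to investigate."""
    items_by_id = {str(item["id"]): item for item in items}
    retry_items = []
    investigate = Counter()

    for record in results:
        if record["status"] == "succeeded":
            continue
        error_type = record.get("error") or "unknown"
        if error_type in RETRYABLE_ERRORS and record["id"] in items_by_id:
            retry_items.append(items_by_id[record["id"]])
        else:
            # Repeated non-retryable errors usually point at the prompt or data
            investigate[error_type] += 1

    failed = len(retry_items) + sum(investigate.values())
    return {
        "retry_items": retry_items,
        "investigate": dict(investigate),
        "error_rate_pct": round(failed / max(len(results), 1) * 100, 1),
    }


# Illustrative data only
sample_items = [{"id": i, "text": f"doc {i}"} for i in range(1, 5)]
sample_results = [
    {"id": "1", "status": "succeeded", "error": None},
    {"id": "2", "status": "errored", "error": "overloaded_error"},
    {"id": "3", "status": "errored", "error": "invalid_request_error"},
    {"id": "4", "status": "expired", "error": "expired"},
]
triage = triage_failures(sample_results, sample_items)
print([item["id"] for item in triage["retry_items"]], triage["investigate"])
```

The `retry_items` list can go straight into a new batch, while the `investigate` counts show which error types are systematic.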

Q6: How do you architect a batch pipeline that processes 100 million documents per month cost-effectively?

At 100 million documents per month, you need a tiered architecture. First, pre-filter aggressively: use cheap heuristics (word count, language detection, duplicate hash checks) to eliminate low-value documents before they hit the LLM. A well-designed pre-filter can reduce LLM calls by 30-50%. Second, use model routing at scale: classify document complexity with a lightweight local model or simple heuristics, routing 70% to Haiku and 30% to Sonnet. Third, maximize Anthropic Batch API usage: any work that can be batched in 24-hour windows gets 50% off. With 100M documents, that savings compounds enormously. Fourth, implement aggressive prompt caching: if documents share structure (e.g., all are legal contracts), keep the system prompt and schema instructions stable across all calls to maximize KV cache hits. Fifth, parallelize across multiple API keys or organizations if your volume requires it - coordinate through a distributed queue (Kafka or SQS). Finally, monitor cost per document as your primary KPI: if it rises above your target ($0.001/doc for simple classification), investigate immediately - something in the pipeline is consuming tokens unexpectedly.
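
The pre-filter stage is the easiest of these tiers to show in code. The sketch below drops very short documents and exact duplicates (after whitespace and case normalization) before anything reaches the LLM. The function name `prefilter_documents`, the `text` key, and the 20-word threshold are assumptions for illustration; language detection and near-duplicate detection are left out.

```python
import hashlib


def prefilter_documents(documents: list[dict], min_words: int = 20) -> list[dict]:
    """Drop short documents and exact duplicates before sending work to the LLM."""
    seen_hashes: set[str] = set()
    kept = []
    for doc in documents:
        words = doc["text"].split()
        if len(words) < min_words:
            continue  # Too short to be worth an LLM call
        # Normalize whitespace and case so trivial variants hash identically
        normalized = " ".join(words).lower()
        digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
        if digest in seen_hashes:
            continue  # Duplicate of a document already kept
        seen_hashes.add(digest)
        kept.append(doc)
    return kept


# Illustrative data: one short document and one duplicate with different spacing/case
long_text = "This agreement is made between the parties " * 5
docs = [
    {"id": "a", "text": long_text},
    {"id": "b", "text": "Too short."},
    {"id": "c", "text": long_text.upper().replace(" ", "  ")},
]
print([d["id"] for d in prefilter_documents(docs)])  # ['a']
```

Measuring how many documents this stage removes is the first input to the cost-per-document KPI described above.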

© 2026 EngineersOfAI. All rights reserved.