Parallel Agent Execution
Reading time: ~38 minutes | Relevance: High for performance-critical agent systems | Target roles: AI Engineer, ML Engineer, Backend Engineer
The Scenario
You're building a competitive intelligence system. It needs to research 12 companies, analyze each company's recent moves, and produce a comparative summary.
Sequential approach: 12 research calls + 12 analysis calls = 24 sequential LLM calls. At 3 seconds each, that's 72 seconds minimum.
Parallel approach: 12 research calls simultaneously, 12 analysis calls simultaneously = 2 rounds of parallel calls. At 3 seconds per round, that's 6 seconds.
Same quality. 12x faster. This is why parallel execution matters.
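The arithmetic above can be checked in a few lines. This is a back-of-the-envelope sketch, not a benchmark: the function names are illustrative, and it assumes every call takes exactly 3 seconds and that no rate limit forces calls within a round to queue.

```python
def sequential_latency(num_calls: int, seconds_per_call: float) -> float:
    """Every call waits for the previous one to finish."""
    return num_calls * seconds_per_call


def staged_latency(num_stages: int, seconds_per_call: float) -> float:
    """Calls within a stage run concurrently; stages run one after another."""
    return num_stages * seconds_per_call


sequential = sequential_latency(num_calls=24, seconds_per_call=3.0)  # 72.0
parallel = staged_latency(num_stages=2, seconds_per_call=3.0)        # 6.0
speedup = sequential / parallel                                      # 12.0
print(f"{sequential:.0f}s sequential, {parallel:.0f}s parallel, {speedup:.0f}x faster")
```

Note that the parallel figure depends on the number of stages, not the number of companies: researching 50 companies would still take two rounds under these assumptions.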
Why This Exists
The LLM API world is parallel by nature. When you make a request to Anthropic's API, it is served by a large cluster of accelerators on the provider's side. Within your account's rate limits, there's no reason you can't have 20 requests in flight at once - each handled by a different part of the cluster.
Most early agentic systems ignored this. They ran everything sequentially because it was simpler to code. As these systems moved to production and needed to process 100+ items, the sequential approach became untenable.
Python's asyncio library, combined with async HTTP clients, makes parallel agent execution straightforward. The anthropic Python SDK ships an async client (AsyncAnthropic) alongside the synchronous one. You already have everything you need.
When Parallelism Applies
Parallelism requires independence: tasks that can run simultaneously must not depend on each other's outputs.
✅ Independent - can parallelize:
research("Apple")
research("Google")
research("Microsoft")
(none depends on the others)
❌ Dependent - cannot parallelize:
research_result = research("AI trends")
analysis = analyze(research_result) # needs research first
report = write(analysis) # needs analysis first
The key question: does task B need task A's output to start? If yes, they're sequential. If no, they can run in parallel.
Python Async for Agent Parallelism
The Anthropic SDK supports async natively via AsyncAnthropic. This enables proper non-blocking parallel execution:
import asyncio
import anthropic

# Use AsyncAnthropic for parallel calls
async_client = anthropic.AsyncAnthropic()


async def research_async(topic: str) -> str:
    """Async research agent - non-blocking."""
    response = await async_client.messages.create(
        model="claude-opus-4-5",
        max_tokens=800,
        system="You are a research agent. Return structured research findings.",
        messages=[{"role": "user", "content": f"Research: {topic}"}]
    )
    return response.content[0].text


async def parallel_research(topics: list[str]) -> list[str]:
    """Research all topics simultaneously."""
    # asyncio.gather runs all coroutines concurrently
    results = await asyncio.gather(*[
        research_async(topic) for topic in topics
    ])
    return list(results)


# Usage
topics = ["RAG systems", "Agent memory", "Tool use", "Multi-agent coordination"]
results = asyncio.run(parallel_research(topics))
asyncio.gather is the core primitive. It schedules all coroutines on the event loop so they run concurrently, waits for all of them to complete, and returns results in the same order as the inputs, regardless of which one finished first.
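The ordering guarantee is easy to confirm without spending tokens. In this minimal sketch, fake_agent is a hypothetical stand-in for an LLM call and asyncio.sleep simulates its latency:

```python
import asyncio


async def fake_agent(name: str, delay: float) -> str:
    """Stand-in for an LLM call; asyncio.sleep simulates network latency."""
    await asyncio.sleep(delay)
    return f"{name} done"


async def main() -> list[str]:
    # "slow" is listed first but finishes last; gather still returns it first.
    return await asyncio.gather(
        fake_agent("slow", 0.03),
        fake_agent("fast", 0.01),
    )


print(asyncio.run(main()))  # ['slow done', 'fast done']
```

Because results line up with inputs, you can zip them back onto the original topic list without any extra bookkeeping.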
asyncio.create_task() for More Control
When you need to start tasks and handle them as they complete (not wait for all):
async def parallel_with_progress(topics: list[str]) -> list[str]:
    """Research topics in parallel, showing progress as each completes."""
    tasks = [asyncio.create_task(research_async(topic)) for topic in topics]
    results = []
    for next_done in asyncio.as_completed(tasks):
        # Yields awaitables in completion order, not input order
        result = await next_done
        # Note: as_completed does not say which topic finished; to track that,
        # have each task return its topic together with its result
        print(f"[Parallel] One research completed ({len(result)} chars)")
        results.append(result)
    return results
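asyncio.as_completed() yields awaitables that resolve in completion order - useful for showing progress or processing results immediately when available. It does not tell you which input produced each result, so carry that information in the result itself.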
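One way to know which topic just finished is to have each coroutine return its own label. The sketch below assumes a hypothetical fake_research function with simulated delays; the labelled wrapper is illustrative, not part of asyncio:

```python
import asyncio


async def fake_research(topic: str, delay: float) -> str:
    """Stand-in for a research call; the delay simulates latency."""
    await asyncio.sleep(delay)
    return f"findings on {topic}"


async def labelled(topic: str, delay: float) -> tuple[str, str]:
    """Return the topic alongside its result so the caller can tell them apart."""
    return topic, await fake_research(topic, delay)


async def research_as_completed(delays: dict[str, float]) -> list[str]:
    """Run all topics concurrently and return topic names in completion order."""
    finished_order = []
    coroutines = [labelled(topic, delay) for topic, delay in delays.items()]
    for next_done in asyncio.as_completed(coroutines):
        topic, result = await next_done
        print(f"[Parallel] {topic} completed ({len(result)} chars)")
        finished_order.append(topic)
    return finished_order


order = asyncio.run(research_as_completed({"a": 0.10, "b": 0.02, "c": 0.06}))
```

Here the fastest topic is reported first, even though it was submitted second.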
Thread-Based Parallelism
When agent tools are synchronous and blocking (filesystem operations, subprocess calls, synchronous HTTP clients), asyncio alone won't help. Use ThreadPoolExecutor:
import asyncio
from concurrent.futures import ThreadPoolExecutor
import anthropic

# Sync client for thread-based execution
sync_client = anthropic.Anthropic()


def blocking_agent_call(topic: str) -> str:
    """A blocking (sync) agent call."""
    response = sync_client.messages.create(
        model="claude-opus-4-5",
        max_tokens=800,
        system="Research agent",
        messages=[{"role": "user", "content": f"Research: {topic}"}]
    )
    return response.content[0].text


async def parallel_with_threads(topics: list[str], max_workers: int = 5) -> list[str]:
    """Run blocking agents in a thread pool."""
    # Inside a coroutine, get_running_loop() is the supported way to get the loop
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # run_in_executor wraps blocking calls as awaitables
        tasks = [
            loop.run_in_executor(executor, blocking_agent_call, topic)
            for topic in topics
        ]
        results = await asyncio.gather(*tasks)
    return list(results)
Rule of thumb: Use asyncio + async SDK for pure LLM calls. Use ThreadPoolExecutor when agents use blocking I/O (file reads, sync HTTP). Use ProcessPoolExecutor only for CPU-bound work (rare in LLM agents).
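For occasional blocking calls, Python 3.9+ also offers asyncio.to_thread, which runs a function in the default thread pool without managing an executor yourself. A minimal sketch, where blocking_tool is a hypothetical synchronous tool and time.sleep stands in for blocking I/O:

```python
import asyncio
import time


def blocking_tool(path: str) -> str:
    """Stand-in for a synchronous tool; time.sleep blocks the calling thread."""
    time.sleep(0.05)
    return f"read {path}"


async def run_blocking_tools(paths: list[str]) -> list[str]:
    # Each call runs in the default thread pool, so the event loop stays free.
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_tool, path) for path in paths)
    )


outputs = asyncio.run(run_blocking_tools(["a.txt", "b.txt", "c.txt"]))
print(outputs)
```

An explicit ThreadPoolExecutor remains the better fit when you need to cap the number of worker threads for a specific workload.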
Worker Pool Pattern
For large-scale parallel execution where you want to limit concurrency (protecting API rate limits, controlling costs):
import asyncio


class AgentWorkerPool:
    """
    Caps the number of agent calls running at once.
    Excess tasks wait on the semaphore until a slot is free.
    """

    def __init__(self, max_concurrent: int, agent_fn):
        self.max_concurrent = max_concurrent
        self.agent_fn = agent_fn  # async callable: content -> result
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._completed = 0
        self._failed = 0

    async def run_task(self, task: dict) -> dict:
        """Run a single task, respecting the concurrency limit."""
        async with self._semaphore:
            try:
                result = await self.agent_fn(task["content"])
                self._completed += 1
                return {
                    "task_id": task["id"],
                    "success": True,
                    "result": result
                }
            except Exception as e:
                self._failed += 1
                return {
                    "task_id": task["id"],
                    "success": False,
                    "error": str(e)
                }

    async def run_all(self, tasks: list[dict]) -> list[dict]:
        """Run all tasks with bounded concurrency."""
        print(f"[WorkerPool] Running {len(tasks)} tasks, max_concurrent={self.max_concurrent}")
        coroutines = [self.run_task(task) for task in tasks]
        results = await asyncio.gather(*coroutines)
        print(f"[WorkerPool] Done: {self._completed} succeeded, {self._failed} failed")
        return list(results)
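To see that a semaphore really bounds concurrency, you can count how many calls are in flight at once. This is a small sketch with a simulated agent (asyncio.sleep in place of an API call); run_bounded and its counters are illustrative names:

```python
import asyncio


async def run_bounded(num_tasks: int, max_concurrent: int) -> int:
    """Run num_tasks fake agents and return the peak number in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_agent() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulated API latency
            in_flight -= 1

    await asyncio.gather(*(fake_agent() for _ in range(num_tasks)))
    return peak


peak_in_flight = asyncio.run(run_bounded(num_tasks=20, max_concurrent=5))
print(f"peak concurrency: {peak_in_flight}")
```

All 20 coroutines are created up front, but only 5 are ever past the semaphore at the same time; the rest wait their turn.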
Fan-Out / Fan-In Pattern
The most common parallel pattern: distribute N tasks to N agents, aggregate results.
import asyncio
import json
import re

import anthropic


async def fan_out_fan_in(
    main_task: str,
    async_client: anthropic.AsyncAnthropic
) -> str:
    """
    Fan-out: decompose task into subtasks.
    Execute subtasks in parallel.
    Fan-in: synthesize results.
    """
    # Phase 1: Decompose (sequential - the fan-out needs the subtask list first)
    decomp_response = await async_client.messages.create(
        model="claude-opus-4-5",
        max_tokens=400,
        system=(
            "Decompose the task into exactly 4 independent subtasks. "
            "Output JSON: {\"subtasks\": [\"...\", \"...\", \"...\", \"...\"]}"
        ),
        messages=[{"role": "user", "content": f"Task: {main_task}"}]
    )
    raw = decomp_response.content[0].text
    # Pull the JSON object out of the reply; fall back to the undivided task
    # if the model returned no JSON or malformed JSON
    match = re.search(r'\{.*\}', raw, re.DOTALL)
    try:
        subtasks = json.loads(match.group())["subtasks"] if match else [main_task]
    except (json.JSONDecodeError, KeyError):
        subtasks = [main_task]
    print(f"[FanOut] Decomposed into {len(subtasks)} subtasks")

    # Phase 2: Fan-out (parallel execution)
    async def execute_subtask(subtask: str) -> str:
        response = await async_client.messages.create(
            model="claude-opus-4-5",
            max_tokens=600,
            system="Execute this specific subtask thoroughly and concisely.",
            messages=[{"role": "user", "content": subtask}]
        )
        return response.content[0].text

    subtask_results = await asyncio.gather(*[
        execute_subtask(st) for st in subtasks
    ])
    print(f"[FanIn] Received {len(subtask_results)} subtask results")

    # Phase 3: Fan-in (synthesis - sequential, needs all results)
    synthesis_context = "\n\n".join([
        f"Subtask {i+1}: {st}\nResult: {res}"
        for i, (st, res) in enumerate(zip(subtasks, subtask_results))
    ])
    synthesis_response = await async_client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1000,
        system="Synthesize the subtask results into a coherent, comprehensive final answer.",
        messages=[{"role": "user", "content": f"Original task: {main_task}\n\n{synthesis_context}"}]
    )
    return synthesis_response.content[0].text
Dependency-Aware Parallelism: DAG Execution
Some tasks have partial dependencies - some subtasks are independent, some depend on others. A DAG (Directed Acyclic Graph) scheduler handles this:
import asyncio
from typing import Optional

import anthropic


class DAGTask:
    def __init__(self, name: str, fn, depends_on: Optional[list[str]] = None):
        self.name = name
        self.fn = fn  # async callable: dict of dependency results -> str
        self.depends_on = depends_on or []
        self.result: Optional[str] = None
        self.completed = asyncio.Event()


class DAGExecutor:
    """
    Executes tasks in dependency order.
    Tasks with no pending dependencies run in parallel.
    """

    def __init__(self):
        self._tasks: dict[str, DAGTask] = {}

    def add_task(self, task: DAGTask):
        self._tasks[task.name] = task

    async def execute(self) -> dict[str, str]:
        """Execute all tasks respecting dependencies, maximizing parallelism."""

        async def run_task(task: DAGTask):
            # Wait for all dependencies to complete
            for dep_name in task.depends_on:
                dep_task = self._tasks[dep_name]
                await dep_task.completed.wait()
            # Gather dependency results for context
            dep_results = {
                dep: self._tasks[dep].result
                for dep in task.depends_on
            }
            print(f"[DAG] Running: {task.name}")
            task.result = await task.fn(dep_results)
            task.completed.set()
            print(f"[DAG] Completed: {task.name}")

        # Start all tasks at once - each one waits on its dependencies' Events,
        # which is what enforces the ordering (asyncio itself knows nothing about it)
        await asyncio.gather(*[run_task(t) for t in self._tasks.values()])
        return {name: task.result for name, task in self._tasks.items()}
# Example: Research pipeline with partial dependencies
async def demo_dag():
    async_client = anthropic.AsyncAnthropic()

    async def research_markets(deps: dict) -> str:
        r = await async_client.messages.create(
            model="claude-opus-4-5", max_tokens=400,
            messages=[{"role": "user", "content": "Research AI market size 2025"}]
        )
        return r.content[0].text

    async def research_competitors(deps: dict) -> str:
        r = await async_client.messages.create(
            model="claude-opus-4-5", max_tokens=400,
            messages=[{"role": "user", "content": "Research top AI companies 2025"}]
        )
        return r.content[0].text

    async def research_trends(deps: dict) -> str:
        r = await async_client.messages.create(
            model="claude-opus-4-5", max_tokens=400,
            messages=[{"role": "user", "content": "Research AI trends 2025"}]
        )
        return r.content[0].text

    async def synthesis(deps: dict) -> str:
        context = "\n\n".join([f"{k}:\n{v}" for k, v in deps.items()])
        r = await async_client.messages.create(
            model="claude-opus-4-5", max_tokens=600,
            messages=[{"role": "user", "content": f"Synthesize into analysis:\n{context}"}]
        )
        return r.content[0].text

    executor = DAGExecutor()
    # These 3 run in parallel (no dependencies)
    executor.add_task(DAGTask("markets", research_markets))
    executor.add_task(DAGTask("competitors", research_competitors))
    executor.add_task(DAGTask("trends", research_trends))
    # This depends on all 3 above - runs after they complete
    executor.add_task(DAGTask(
        "synthesis", synthesis,
        depends_on=["markets", "competitors", "trends"]
    ))
    results = await executor.execute()
    print(f"\n[DAG] Synthesis:\n{results['synthesis']}")
    return results
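An Event-based executor like the one above will wait forever if the dependency graph contains a cycle, because no task in the cycle ever gets to set its Event. A hedged sketch of a pre-flight check using the standard library's graphlib (Python 3.9+): plan_waves is an illustrative helper, not part of the executor, and it takes a plain mapping of task name to the names it depends on.

```python
from graphlib import CycleError, TopologicalSorter


def plan_waves(depends_on: dict[str, list[str]]) -> list[list[str]]:
    """Group task names into waves; every task in a wave can run concurrently."""
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()  # raises CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = plan_waves({
    "markets": [],
    "competitors": [],
    "trends": [],
    "synthesis": ["markets", "competitors", "trends"],
})
print(waves)  # [['competitors', 'markets', 'trends'], ['synthesis']]

try:
    plan_waves({"a": ["b"], "b": ["a"]})
except CycleError:
    print("cycle detected - refusing to schedule")
```

Running this check before execution turns a silent hang into an immediate, explainable error, and the wave list doubles as a readable execution plan.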
Full Python Code: Complete Parallel Agent Executor
"""
parallel_agent_executor.py
Production parallel agent execution with:
- asyncio.gather for simple fan-out
- Semaphore-based concurrency control
- DAG-based dependency scheduling
- Rate limiting
- Partial failure handling
- Execution statistics
"""
import asyncio
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional, Callable, Any
import anthropic
async_client = anthropic.AsyncAnthropic()
MODEL = "claude-opus-4-5"
@dataclass
class TaskResult:
    task_id: str
    task_name: str
    success: bool
    output: Optional[str]
    error: Optional[str]
    duration_ms: int
    tokens_used: int = 0


@dataclass
class ExecutionStats:
    total_tasks: int = 0
    succeeded: int = 0
    failed: int = 0
    total_duration_ms: int = 0
    total_tokens: int = 0
    wall_clock_ms: int = 0

    @property
    def speedup(self) -> float:
        """How much faster than sequential execution."""
        if self.wall_clock_ms == 0:
            return 0.0
        # Sum of per-task durations approximates the sequential running time
        sequential_estimate = self.total_duration_ms
        return sequential_estimate / self.wall_clock_ms
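class RateLimiter:
    """Minimum-interval rate limiter: spaces calls at least 60/rpm seconds apart.

    Unlike a token bucket, it never allows bursts.
    """

    def __init__(self, requests_per_minute: int):
        self._rpm = requests_per_minute
        self._min_interval = 60.0 / requests_per_minute
        self._last_call = float("-inf")  # so the first call never waits
        self._lock = asyncio.Lock()

    async def acquire(self):
        # Sleeping while holding the lock is deliberate: waiters are released
        # one at a time, one interval apart
        async with self._lock:
            now = time.monotonic()  # monotonic clock is immune to wall-clock jumps
            elapsed = now - self._last_call
            if elapsed < self._min_interval:
                await asyncio.sleep(self._min_interval - elapsed)
            self._last_call = time.monotonic()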
class ParallelAgentExecutor:
    """
    Executes multiple agent tasks in parallel with:
    - Configurable concurrency limit
    - Rate limiting
    - Partial failure handling
    - Full execution statistics
    """

    def __init__(
        self,
        max_concurrent: int = 10,
        requests_per_minute: int = 60,
        system_prompt: str = "You are a helpful AI assistant."
    ):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._rate_limiter = RateLimiter(requests_per_minute)
        self._system_prompt = system_prompt
        self._results: list[TaskResult] = []
    async def _execute_single(self, task_id: str, task_name: str, content: str) -> TaskResult:
        """Execute a single task with rate limiting and concurrency control."""
        async with self._semaphore:
            await self._rate_limiter.acquire()
            # Start the clock after rate limiting so duration_ms measures the API call only
            start = time.time()
            try:
                response = await async_client.messages.create(
                    model=MODEL,
                    max_tokens=600,
                    system=self._system_prompt,
                    messages=[{"role": "user", "content": content}]
                )
                duration_ms = int((time.time() - start) * 1000)
                return TaskResult(
                    task_id=task_id,
                    task_name=task_name,
                    success=True,
                    output=response.content[0].text,
                    error=None,
                    duration_ms=duration_ms,
                    tokens_used=response.usage.input_tokens + response.usage.output_tokens
                )
            except Exception as e:
                # Failures are reported as data, never raised to the caller
                duration_ms = int((time.time() - start) * 1000)
                return TaskResult(
                    task_id=task_id,
                    task_name=task_name,
                    success=False,
                    output=None,
                    error=str(e),
                    duration_ms=duration_ms
                )
    async def execute_all(
        self,
        tasks: list[dict],  # [{"name": str, "content": str}, ...]
        fail_fast: bool = False
    ) -> tuple[list[TaskResult], ExecutionStats]:
        """
        Execute all tasks in parallel.
        fail_fast=True: cancel the remaining tasks and raise on the first failure
        fail_fast=False: collect all results, partial failures OK
        """
        wall_start = time.time()
        tagged_tasks = [
            (str(uuid.uuid4())[:8], t["name"], t["content"])
            for t in tasks
        ]
        print(f"[Executor] Starting {len(tasks)} tasks in parallel (max_concurrent={self._semaphore._value})")
        # _execute_single never raises; it reports failures as TaskResult(success=False),
        # so fail_fast has to inspect results rather than rely on gather raising
        running = [
            asyncio.create_task(self._execute_single(tid, name, content))
            for tid, name, content in tagged_tasks
        ]
        if fail_fast:
            for next_done in asyncio.as_completed(running):
                result = await next_done
                if not result.success:
                    for task in running:
                        task.cancel()  # no-op for tasks that already finished
                    raise RuntimeError(f"Task {result.task_name} failed: {result.error}")
        # Already-finished tasks return immediately; result order matches input order
        results = await asyncio.gather(*running)
        wall_clock_ms = int((time.time() - wall_start) * 1000)
        results = list(results)
        stats = ExecutionStats(
            total_tasks=len(results),
            succeeded=sum(1 for r in results if r.success),
            failed=sum(1 for r in results if not r.success),
            total_duration_ms=sum(r.duration_ms for r in results),
            total_tokens=sum(r.tokens_used for r in results),
            wall_clock_ms=wall_clock_ms
        )
        print(f"[Executor] Complete: {stats.succeeded}/{stats.total_tasks} succeeded")
        print(f"[Executor] Wall clock: {wall_clock_ms}ms | Sum of durations: {stats.total_duration_ms}ms")
        print(f"[Executor] Speedup vs sequential: {stats.speedup:.1f}x")
        print(f"[Executor] Total tokens: {stats.total_tokens}")
        return results, stats
    async def execute_pipeline(self, stages: list[list[dict]]) -> list[list[TaskResult]]:
        """
        Execute a multi-stage pipeline where each stage runs in parallel
        but stages are sequential (stage 2 starts after stage 1 completes).
        """
        all_stage_results = []
        for i, stage in enumerate(stages):
            print(f"\n[Pipeline] Stage {i+1}/{len(stages)}: {len(stage)} tasks")
            results, stats = await self.execute_all(stage)
            all_stage_results.append(results)
            failed = [r for r in results if not r.success]
            if failed:
                print(f"[Pipeline] Stage {i+1} had {len(failed)} failures")
                for f in failed:
                    print(f"  FAILED: {f.task_name} - {f.error}")
        return all_stage_results
# ─── Usage ────────────────────────────────────────────────────────────────────
async def demo_parallel_research():
    """Demo: research 8 companies in parallel, then analyze all in parallel."""
    companies = [
        "Anthropic", "OpenAI", "Google DeepMind", "Meta AI",
        "Mistral AI", "Cohere", "AI21 Labs", "Stability AI"
    ]
    executor = ParallelAgentExecutor(
        max_concurrent=5,  # max 5 simultaneous API calls
        requests_per_minute=50
    )
    # Stage 1: Research all companies in parallel
    research_tasks = [
        {"name": f"research_{co}", "content": f"Research {co}'s AI products and recent news in 2 sentences."}
        for co in companies
    ]
    # Stage 2: Analysis (starts only after the research stage completes)
    # In this demo the analysis prompts do not include the research output,
    # and Stage 2 tasks are independent of each other
    analysis_tasks = [
        {"name": f"analyze_{co}", "content": f"What makes {co} unique vs other AI labs? 2 sentences."}
        for co in companies
    ]
    # Multi-stage pipeline
    all_results = await executor.execute_pipeline([research_tasks, analysis_tasks])
    print("\n--- STAGE 1: RESEARCH ---")
    for r in all_results[0]:
        status = "OK" if r.success else f"FAIL: {r.error}"
        print(f"  {r.task_name}: {status} ({r.duration_ms}ms)")
    print("\n--- STAGE 2: ANALYSIS ---")
    for r in all_results[1]:
        status = "OK" if r.success else f"FAIL: {r.error}"
        print(f"  {r.task_name}: {status}")
    # Sample output from one research result
    successful = [r for r in all_results[0] if r.success]
    if successful:
        print(f"\n--- SAMPLE OUTPUT ({successful[0].task_name}) ---")
        print(successful[0].output)


if __name__ == "__main__":
    asyncio.run(demo_parallel_research())
Error Handling in Parallel Execution
Parallel systems have unique failure modes:
# Partial failure - some tasks fail, others succeed
async def parallel_with_partial_failure(tasks: list[str]) -> dict:
    results = await asyncio.gather(
        *[research_async(t) for t in tasks],
        return_exceptions=True  # don't raise, return exceptions as results
    )
    successes = [r for r in results if isinstance(r, str)]
    # BaseException also covers CancelledError, which is not an Exception subclass
    failures = [str(r) for r in results if isinstance(r, BaseException)]
    return {
        "successes": successes,
        "failures": failures,
        # Guard against division by zero when the task list is empty
        "success_rate": len(successes) / len(results) if results else 0.0
    }


# Timeout handling
async def with_timeout(coro, timeout_seconds: float = 30.0):
    try:
        # wait_for cancels the wrapped coroutine when the timeout expires
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return None  # or raise, depending on whether task is required
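When some tasks fail, it usually matters which ones. Because gather preserves input order, outcomes can be zipped back onto their inputs. The following is a sketch under stated assumptions: flaky_agent is a hypothetical agent that fails for one specific topic, and asyncio.sleep stands in for the API call.

```python
import asyncio


async def flaky_agent(topic: str) -> str:
    """Hypothetical agent that fails for one topic, to exercise the error path."""
    await asyncio.sleep(0.01)
    if topic == "bad":
        raise RuntimeError("simulated API error")
    return f"findings on {topic}"


async def run_with_failures(topics: list[str], timeout: float = 1.0) -> dict:
    """Run every topic with a per-task timeout and report outcomes by topic."""
    results = await asyncio.gather(
        *(asyncio.wait_for(flaky_agent(t), timeout=timeout) for t in topics),
        return_exceptions=True,
    )
    # gather preserves input order, so zip lines each outcome up with its topic.
    succeeded = {t: r for t, r in zip(topics, results) if not isinstance(r, BaseException)}
    failed = {t: repr(r) for t, r in zip(topics, results) if isinstance(r, BaseException)}
    return {"succeeded": succeeded, "failed": failed}


report = asyncio.run(run_with_failures(["a", "bad", "c"]))
print(report["failed"])
```

Keeping failures keyed by input makes it straightforward to retry only the failed topics or to surface them explicitly in the final output.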
Parallel Execution Patterns
Cost vs Speed Tradeoff
Parallelism is not free:
| Factor | Sequential | Parallel |
|---|---|---|
| Wall clock time | N × t | t (+ overhead) |
| Total token cost | Same | Same |
| API costs | Same | Same |
| Rate limit risk | Low | High |
| Debugging ease | High | Lower |
| Error propagation | Simple | Complex |
Parallelism reduces latency, not cost. You pay the same tokens either way. If you're optimizing for cost, parallelism doesn't help. If you're optimizing for speed, parallelism is transformative.
Production Notes
Semaphore sizing: Start with max_concurrent=5. Monitor API rate limit errors. Increase if you're under your rate limit, decrease if you're hitting it. For Anthropic's Claude, check your tier's requests-per-minute limit.
Timeout every parallel task: A single hanging task can block the fan-in phase indefinitely. Use asyncio.wait_for(coro, timeout=30) for every parallel call.
Test partial failure scenarios: What does your system do when 3 of 10 parallel tasks fail? Test this explicitly. The answer should not be "silently return 7 results and pretend everything worked."
Measure the actual speedup: Instrument parallel vs sequential execution for your specific workload. Theoretical 10x speedup often becomes 3-4x in practice due to rate limiting, cold starts, and network variance.
:::warning Rate Limit Explosions
Running 50 agents in parallel can exhaust your API rate limit in seconds, causing cascading failures as all 50 calls fail simultaneously. Always use a semaphore or rate limiter. Never run unbounded parallel agent calls in production.
:::
:::danger asyncio vs threading for Blocking Calls
Mixing synchronous blocking calls (like the sync Anthropic client) inside asyncio coroutines blocks the entire event loop. If you must use sync code inside async context, use loop.run_in_executor(). Never call sync_client.messages.create() directly inside an async def function - it will freeze all other coroutines.
:::
Interview Q&A
Q: How does asyncio.gather work for parallel agent execution?
A: asyncio.gather takes multiple coroutines, wraps them in tasks, and runs them concurrently on the asyncio event loop. It returns a list of results in the same order as the inputs. By default (return_exceptions=False), the first exception is propagated immediately to the caller awaiting gather, but the other coroutines are not cancelled and keep running in the background. With return_exceptions=True, exceptions are returned in the results list alongside successful values, enabling partial failure handling.
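What happens to sibling coroutines when one fails is worth verifying directly: per the asyncio documentation, with the default return_exceptions=False they are not cancelled. A minimal sketch with simulated tasks (failing_agent and slow_agent are illustrative names):

```python
import asyncio


async def failing_agent() -> None:
    await asyncio.sleep(0.01)
    raise ValueError("simulated failure")


async def slow_agent(log: list[str]) -> None:
    await asyncio.sleep(0.05)
    log.append("slow finished")


async def main() -> list[str]:
    log: list[str] = []
    try:
        await asyncio.gather(failing_agent(), slow_agent(log))
    except ValueError:
        log.append("gather raised")
    # Give the surviving sibling time to finish in the background.
    await asyncio.sleep(0.1)
    return log


print(asyncio.run(main()))  # ['gather raised', 'slow finished']
```

If you want siblings stopped on the first failure, you have to cancel them yourself, for example by holding on to the tasks and calling cancel() on each.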
Q: What's the difference between asyncio.gather and asyncio.as_completed?
A: gather waits for all tasks to complete and returns results in input order. as_completed yields results as they finish in completion order (fastest first). Use gather when you need all results before proceeding. Use as_completed when you want to process results incrementally as they arrive - useful for showing progress or starting dependent work early.
Q: How do you prevent parallel agents from overwhelming API rate limits?
A: Two mechanisms: a semaphore limits concurrency (maximum N simultaneous calls at any time), and a rate limiter enforces a requests-per-minute cap. The semaphore prevents burst parallelism; the rate limiter prevents sustained overdraft. In production, set both: semaphore to your practical concurrency needs, rate limiter to 80% of your API tier's limit to leave headroom.
Q: When is parallelism not helpful for agents?
A: When tasks are sequentially dependent (each task needs the previous task's output). When the task is so fast that coordination overhead dominates (simple lookups). When API rate limits are the bottleneck regardless of concurrency. When debugging is critical - parallel execution is significantly harder to trace and debug than sequential.
Q: What is DAG-based task scheduling and why does it matter?
A: A DAG (Directed Acyclic Graph) represents tasks and their dependencies. DAG scheduling runs each task as soon as its dependencies are complete - no earlier (correctness), no later (maximum parallelism). For pipelines with mixed dependencies, it avoids the two naive alternatives: serializing everything (safe but slow) or parallelizing everything (fast but incorrect when dependencies exist).
