
Orchestrator-Subagent Pattern

Reading time: ~40 minutes | Relevance: Critical for any production multi-agent system | Target roles: AI Engineer, ML Engineer, Systems Architect


The Scenario

Your engineering team is building an automated content generation system. The task: take a research topic, produce a researched, fact-checked, well-written article that's ready to publish.

You could build one agent that does everything. You've tried. The agent loses track of whether it's researching or writing or editing. Its research bleeds into its writing style. Its editing loses sight of factual accuracy. The output is mediocre - not because the model is incapable, but because a single context is juggling three distinct jobs at once.

The orchestrator-subagent pattern solves this cleanly: one orchestrator decides what to do and who should do it. Subagents just execute their specialized task and return results. The orchestrator has the plan. Subagents have the skills.



Why This Exists

The orchestrator-subagent pattern maps to how organizations actually work. No one person in a company knows everything and does everything. There's a project manager (orchestrator) who coordinates specialists (subagents): designer, developer, QA, technical writer.

This insight transfers directly to multi-agent AI: separate the reasoning-about-what-to-do from the execution-of-what-to-do. The orchestrator reasons. Subagents execute.

This separation has practical benefits:

  • Subagents can be optimized independently: change the researcher without touching the writer
  • Failures are isolated: a subagent failure doesn't take down the whole system
  • Testing becomes possible: test each subagent on unit tasks before integrating
  • Cost control is explicit: the orchestrator decides how many subagent calls to make

Orchestrator Responsibilities

The orchestrator performs four jobs, iteratively across several LLM calls rather than all in one:

1. Task Decomposition

The orchestrator takes a complex task and breaks it into atomic subtasks that individual subagents can execute. Good decomposition means:

  • Each subtask has a clear, verifiable completion criterion
  • Dependencies between subtasks are explicit
  • Subtasks don't overlap (reducing redundant work)

# Orchestrator decomposing a content creation task
task = "Write a technical deep-dive on LLM quantization for ML engineers"

# Good decomposition:
subtasks = [
    {"agent": "researcher", "task": "Find and summarize key quantization techniques and formats: INT8, INT4, GPTQ, AWQ, GGUF"},
    {"agent": "technical_writer", "task": "Explain each technique with code examples and trade-offs"},
    {"agent": "critic", "task": "Review for technical accuracy, completeness, and clarity"},
    {"agent": "publisher", "task": "Format as Markdown with proper headers, code blocks, and metadata"}
]
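The list above leaves dependencies implicit in its ordering. One way to make them explicit is sketched below. The `id` and `depends_on` fields are hypothetical additions for illustration, and the ordering uses the standard library's `graphlib.TopologicalSorter`.

```python
from graphlib import TopologicalSorter

# Hypothetical subtask records: "id" and "depends_on" are illustrative fields,
# not part of the decomposition shown earlier.
subtasks = [
    {"id": "research", "agent": "researcher", "depends_on": []},
    {"id": "draft", "agent": "technical_writer", "depends_on": ["research"]},
    {"id": "review", "agent": "critic", "depends_on": ["draft"]},
    {"id": "format", "agent": "publisher", "depends_on": ["review"]},
]


def execution_order(subtasks: list[dict]) -> list[str]:
    """Return subtask ids in an order that respects every dependency."""
    # TopologicalSorter expects a mapping of node -> set of predecessors.
    graph = {s["id"]: set(s["depends_on"]) for s in subtasks}
    # static_order() raises graphlib.CycleError on circular dependencies.
    return list(TopologicalSorter(graph).static_order())


order = execution_order(subtasks)
```

Subtasks with no dependency between them could be dispatched in parallel; this sketch only produces one valid sequential order.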

2. Agent Selection

The orchestrator decides which agent to call based on:

  • Capability match: which agent is best suited for this subtask
  • Availability: in parallel systems, which agents are idle
  • Prior performance: if agents have track records, route to the better performer
  • Load balancing: distribute work evenly among identical agents

3. Result Aggregation

Subagents return results. The orchestrator must combine them into a coherent whole:

  • Sequential aggregation: pass result of agent A into context for agent B
  • Parallel aggregation: collect all results, then synthesize
  • Selective aggregation: keep only the best result from multiple attempts
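The three aggregation modes can be sketched as plain functions. This is a minimal illustration, assuming agents and scoring functions are ordinary callables; the function names are hypothetical.

```python
from typing import Callable


def aggregate_sequential(steps: list[Callable[[str], str]], initial: str) -> str:
    """Sequential: feed each step's output into the next step."""
    result = initial
    for step in steps:
        result = step(result)
    return result


def aggregate_parallel(results: list[str], synthesize: Callable[[list[str]], str]) -> str:
    """Parallel: collect all results first, then synthesize once."""
    return synthesize(results)


def aggregate_selective(candidates: list[str], score: Callable[[str], float]) -> str:
    """Selective: keep only the highest-scoring candidate."""
    if not candidates:
        raise ValueError("no candidates to select from")
    return max(candidates, key=score)
```

In a real pipeline `synthesize` and `score` would likely be LLM calls themselves; here they are left as parameters so the control flow stays visible.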

4. Replanning

When a subagent fails or its result is insufficient, the orchestrator decides what to do next:

  • Retry the same subtask with a different prompt
  • Use a different agent for the same subtask
  • Decompose the subtask further into smaller pieces
  • Skip the subtask if it's optional
  • Abort the pipeline if the subtask is critical
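The options above can be expressed as a small decision function. The priority order used here (retry, then reroute, then decompose, then skip, then abort) is an assumption for illustration, as are the `FailedSubtask` fields; a real orchestrator might delegate this choice to the LLM.

```python
from dataclasses import dataclass
from enum import Enum


class Action(Enum):
    RETRY = "retry"
    REROUTE = "reroute"
    DECOMPOSE = "decompose"
    SKIP = "skip"
    ABORT = "abort"


@dataclass
class FailedSubtask:
    """Hypothetical description of a failed subtask."""
    attempts: int
    has_alternative_agent: bool
    can_split: bool
    optional: bool


def replan(failed: FailedSubtask, max_attempts: int = 2) -> Action:
    """Pick the next action for a failed subtask, cheapest option first."""
    if failed.attempts < max_attempts:
        return Action.RETRY
    if failed.has_alternative_agent:
        return Action.REROUTE
    if failed.can_split:
        return Action.DECOMPOSE
    if failed.optional:
        return Action.SKIP
    return Action.ABORT
```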

Subagent Design Principles

A well-designed subagent has three properties:

Single responsibility: One subagent does one thing well. A "research and writing" agent is two agents badly combined. Separate them.

Clear input/output contract: The subagent should specify exactly what it expects as input and what it will return. This is a schema, not a preference:

from dataclasses import dataclass, field

@dataclass
class ResearchInput:
    topic: str
    max_sources: int = 5
    focus_areas: list[str] = field(default_factory=list)

@dataclass
class ResearchOutput:
    topic: str
    findings: list[str]
    sources: list[str]
    confidence: float  # 0.0 to 1.0
    gaps: list[str]  # what the agent couldn't find

No knowledge of orchestrator: Subagents should be completely unaware of the system they're part of. They receive a task, they complete it, they return a result. This makes subagents reusable across different orchestrators and systems.
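A contract only helps if it is enforced at the boundary. The sketch below shows one way to check a subagent's raw output against the `ResearchOutput` schema. The `parse_research_output` helper and the specific checks are assumptions for illustration, and `gaps` is given a default here so that it can be omitted.

```python
from dataclasses import dataclass, field


@dataclass
class ResearchOutput:
    topic: str
    findings: list[str]
    sources: list[str]
    confidence: float  # 0.0 to 1.0
    gaps: list[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        # Enforce the documented range instead of trusting the agent.
        if not self.topic.strip():
            raise ValueError("topic must be non-empty")
        if not 0.0 <= self.confidence <= 1.0:
            raise ValueError(f"confidence out of range: {self.confidence}")


def parse_research_output(raw: dict) -> ResearchOutput:
    """Build a ResearchOutput from a plain dict, rejecting missing required keys."""
    required = {"topic", "findings", "sources", "confidence"}
    missing = required - raw.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return ResearchOutput(**raw)
```

A library such as Pydantic could replace the hand-written checks; the standard-library version is shown only to keep the example dependency-free.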


Task Assignment Strategies

How does the orchestrator decide which subagent gets which task?

Static Assignment

Fixed mapping from task type to agent. Simple, predictable.

# Assumes the agent callables and the AgentResult type are defined elsewhere.
AGENT_ROUTING = {
    "research": research_agent,
    "analysis": analysis_agent,
    "writing": writing_agent,
    "critique": critic_agent,
    "formatting": publisher_agent
}

def assign_task(task_type: str, task_content: str) -> AgentResult:
    agent_fn = AGENT_ROUTING.get(task_type)
    if not agent_fn:
        raise ValueError(f"No agent for task type: {task_type}")
    return agent_fn(task_content)

Dynamic Assignment

The orchestrator LLM decides which agent to call based on the task content. More flexible, but requires trust in the orchestrator's routing decisions:

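# Assumes `client` (an anthropic.Anthropic instance) and MODEL are defined elsewhere.
def dynamic_assign(task: str, available_agents: dict) -> str:
    """Let the orchestrator LLM decide which agent to use."""
    agent_descriptions = "\n".join(
        f"- {name}: {desc}" for name, desc in available_agents.items()
    )
    response = client.messages.create(
        model=MODEL,
        max_tokens=50,
        system="You are routing tasks to specialist agents. Respond with just the agent name.",
        messages=[{
            "role": "user",
            "content": f"Task: {task}\n\nAvailable agents:\n{agent_descriptions}\n\nWhich agent should handle this?"
        }]
    )
    agent_name = response.content[0].text.strip()
    # The model can return a name that is not in the roster; fail loudly.
    if agent_name not in available_agents:
        raise ValueError(f"Router returned unknown agent: {agent_name!r}")
    return agent_name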

Capability-Based Assignment

Agents declare their capabilities. The orchestrator matches task requirements to agent capabilities:

class AgentCapabilities:
    def __init__(self, name: str, capabilities: list[str], max_input_tokens: int):
        self.name = name
        self.capabilities = capabilities
        self.max_input_tokens = max_input_tokens

AGENTS = [
    AgentCapabilities("research_agent", ["web_search", "fact_verification", "summarization"], 4096),
    AgentCapabilities("code_agent", ["python", "javascript", "sql", "debugging"], 8192),
    AgentCapabilities("analysis_agent", ["statistical_analysis", "trend_identification", "comparison"], 4096),
]

def find_best_agent(required_capability: str) -> AgentCapabilities:
    matches = [a for a in AGENTS if required_capability in a.capabilities]
    if not matches:
        raise ValueError(f"No agent with capability: {required_capability}")
    return matches[0]  # Could rank by relevance, load, cost, etc.
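When several agents share a capability, returning the first match ignores load and cost. A minimal ranking sketch follows. The `active_tasks` and `cost_per_call` metrics are hypothetical fields introduced for illustration, and sorting by load first and cost second is one possible policy among many.

```python
from dataclasses import dataclass


@dataclass
class AgentInfo:
    name: str
    capabilities: list[str]
    active_tasks: int      # hypothetical load metric
    cost_per_call: float   # hypothetical cost metric


def rank_agents(agents: list[AgentInfo], required_capability: str) -> list[AgentInfo]:
    """Return capable agents, least loaded first, cheaper agents breaking ties."""
    matches = [a for a in agents if required_capability in a.capabilities]
    return sorted(matches, key=lambda a: (a.active_tasks, a.cost_per_call))


agents = [
    AgentInfo("search_a", ["web_search"], active_tasks=3, cost_per_call=0.01),
    AgentInfo("search_b", ["web_search"], active_tasks=0, cost_per_call=0.05),
    AgentInfo("search_c", ["web_search"], active_tasks=0, cost_per_call=0.02),
    AgentInfo("coder", ["python"], active_tasks=0, cost_per_call=0.01),
]
ranked = [a.name for a in rank_agents(agents, "web_search")]
```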

Error Handling: Subagent Failure → Orchestrator Fallback

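The orchestrator must handle subagent failures gracefully: retry the failed call, fall back to an alternative agent, or degrade to partial results rather than crash the pipeline.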
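One possible shape for that fallback path is an ordered chain of agents, each tried a fixed number of times before moving on. The sketch below is an illustration under assumptions: `run_with_fallback` and its signature are hypothetical, and agents are modeled as callables that raise on failure.

```python
from typing import Callable, Optional

AgentFn = Callable[[str], str]


def run_with_fallback(
    task: str,
    agents: list[tuple[str, AgentFn]],
    retries_per_agent: int = 2,
) -> tuple[Optional[str], list[str]]:
    """Try each agent in order; return (result, errors). Result is None if all fail."""
    errors: list[str] = []
    for name, fn in agents:
        for attempt in range(1, retries_per_agent + 1):
            try:
                return fn(task), errors
            except Exception as exc:  # broad on purpose: any failure triggers fallback
                errors.append(f"{name} attempt {attempt}: {exc}")
    return None, errors
```

Returning the error list alongside the result lets the orchestrator report partial failures even when a fallback agent eventually succeeds.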


Orchestrator Prompt Design

The orchestrator's system prompt is the most important prompt in the system. It determines how the orchestrator:

  • Reads and understands the original task
  • Decides on decomposition
  • Selects agents
  • Responds to failures

A production orchestrator system prompt:

You are an orchestrator managing a multi-agent content creation pipeline.

Your job:
1. Decompose the user's request into subtasks
2. Assign each subtask to the appropriate specialist agent
3. Track results and handle failures
4. Aggregate outputs into a coherent final result

Available agents:
- researcher: Gathers facts, statistics, and sources on any topic
- writer: Drafts clear, engaging content from research inputs
- critic: Reviews content for accuracy, clarity, and completeness
- publisher: Formats content as structured Markdown

Rules:
- Always start with the researcher before the writer
- Always run the critic after the writer, before the publisher
- If the critic finds major issues, loop back to the writer with the critique
- Maximum 2 writer-critic cycles before accepting the result
- If any required agent fails twice, return partial results with clear explanation

Output format: JSON with keys: subtasks, agent_calls, aggregation_plan
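Because the prompt asks for JSON, the orchestrator's reply should be parsed and checked before anything is dispatched. The sketch below validates the three top-level keys named in the prompt. The internal shape of `agent_calls` is not specified by the prompt, so treating each entry as an object with an `agent` field is an assumption.

```python
import json

REQUIRED_KEYS = {"subtasks", "agent_calls", "aggregation_plan"}
KNOWN_AGENTS = {"researcher", "writer", "critic", "publisher"}


def parse_plan(raw: str) -> dict:
    """Parse the orchestrator's JSON plan and reject malformed or unroutable plans."""
    try:
        plan = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"plan is not valid JSON: {exc}") from exc
    if not isinstance(plan, dict):
        raise ValueError("plan must be a JSON object")
    missing = REQUIRED_KEYS - plan.keys()
    if missing:
        raise ValueError(f"plan is missing keys: {sorted(missing)}")
    # Assumption: each agent_calls entry is an object with an "agent" field.
    named = {
        call.get("agent") if isinstance(call, dict) else None
        for call in plan["agent_calls"]
    }
    unknown = named - KNOWN_AGENTS
    if unknown:
        raise ValueError(f"plan references unknown agents: {sorted(map(str, unknown))}")
    return plan
```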

State Passing: What Each Subagent Needs

Each subagent needs exactly the context required to do its job - no more, no less:

from dataclasses import dataclass, field
from typing import Optional

@dataclass
class PipelineState:
    """Full state of an orchestrator-subagent pipeline."""
    original_task: str
    research_output: Optional[str] = None
    draft_output: Optional[str] = None
    critique_output: Optional[str] = None
    final_output: Optional[str] = None
    iteration: int = 0
    errors: list[str] = field(default_factory=list)

def build_writer_context(state: PipelineState) -> str:
    """Build exactly what the writer needs: task, research, and any prior critique."""
    return (
        f"Task: {state.original_task}\n\n"
        f"Research findings:\n{state.research_output}\n\n"
        + (f"Previous critique (please address):\n{state.critique_output}\n\n"
           if state.critique_output else "")
        + "Write a complete draft based on the above."
    )

def build_critic_context(state: PipelineState) -> str:
    """Build exactly what the critic needs - task + draft, nothing else."""
    return (
        f"Original task: {state.original_task}\n\n"
        f"Draft to review:\n{state.draft_output}\n\n"
        "Review this draft for accuracy, completeness, and clarity. "
        "Output a structured critique with specific issues and suggestions."
    )

Full Python Code: Complete Orchestrator Pipeline

"""
orchestrator_pipeline.py

Production-grade orchestrator + 4 subagents for content generation.

Agents: Researcher → Writer → Critic → Publisher
Orchestrator: manages pipeline, handles failures, aggregates results
"""

import json
import time
from dataclasses import dataclass, field
from typing import Optional, Callable
import anthropic

client = anthropic.Anthropic()
MODEL = "claude-opus-4-5"
MAX_RETRIES = 2
MAX_CRITIC_CYCLES = 2


# ─── State ────────────────────────────────────────────────────────────────────

@dataclass
class SubagentCall:
    agent_name: str
    input_summary: str
    output: str
    success: bool
    duration_ms: int
    error: Optional[str] = None


@dataclass
class PipelineState:
    task: str
    research: Optional[str] = None
    draft: Optional[str] = None
    critique: Optional[str] = None
    final: Optional[str] = None
    calls: list[SubagentCall] = field(default_factory=list)
    critic_cycles: int = 0
    errors: list[str] = field(default_factory=list)


# ─── Subagents ────────────────────────────────────────────────────────────────

def call_agent(
    agent_name: str,
    system_prompt: str,
    user_message: str,
    max_tokens: int = 1500
) -> tuple[str, bool, Optional[str]]:
    """
    Core agent call with error handling.
    Returns (output, success, error_message).
    """
    try:
        response = client.messages.create(
            model=MODEL,
            max_tokens=max_tokens,
            system=system_prompt,
            messages=[{"role": "user", "content": user_message}]
        )
        return response.content[0].text, True, None
    except Exception as e:
        return "", False, str(e)


def researcher_agent(topic: str) -> tuple[str, bool, Optional[str]]:
    return call_agent(
        "ResearcherAgent",
        system_prompt=(
            "You are a research specialist. Given a topic, produce structured research findings: "
            "key facts, statistics, definitions, major perspectives, and potential gaps. "
            "Organize findings clearly. Note what you're uncertain about."
        ),
        user_message=f"Research this topic for a technical article: {topic}",
        max_tokens=1200
    )


def writer_agent(task: str, research: str, prior_critique: Optional[str] = None) -> tuple[str, bool, Optional[str]]:
    critique_context = f"\nAddressing previous critique:\n{prior_critique}\n" if prior_critique else ""
    return call_agent(
        "WriterAgent",
        system_prompt=(
            "You are a technical writer. Write clear, engaging, accurate content "
            "for engineers. Use concrete examples. Avoid filler. Be specific."
        ),
        user_message=(
            f"Task: {task}\n\n"
            f"Research:\n{research}\n"
            f"{critique_context}\n"
            "Write a complete, polished draft."
        ),
        max_tokens=2000
    )


def critic_agent(task: str, draft: str) -> tuple[str, bool, Optional[str]]:
    return call_agent(
        "CriticAgent",
        system_prompt=(
            "You are a technical editor and critic. Review drafts for: "
            "(1) factual accuracy, (2) technical completeness, (3) clarity, "
            "(4) logical flow, (5) missing important points. "
            "Be specific. Output: VERDICT (APPROVE/REVISE) + specific issues list."
        ),
        user_message=(
            f"Original task: {task}\n\nDraft:\n{draft}\n\n"
            "Review and output: VERDICT: APPROVE or VERDICT: REVISE, then issues."
        ),
        max_tokens=800
    )


def publisher_agent(task: str, final_draft: str) -> tuple[str, bool, Optional[str]]:
    return call_agent(
        "PublisherAgent",
        system_prompt=(
            "You are a content publisher. Format content as clean, structured Markdown: "
            "proper headers (##, ###), code blocks with language tags, "
            "bullet points, bold for key terms. Add a 1-sentence TL;DR at the top."
        ),
        user_message=(
            f"Format this content for publication:\n\nTask: {task}\n\nContent:\n{final_draft}"
        ),
        max_tokens=2500
    )


# ─── Orchestrator ─────────────────────────────────────────────────────────────

class ContentOrchestrator:
    """
    Orchestrates a content pipeline: Researcher → Writer ↔ Critic → Publisher.
    Handles retries, critic cycles, and partial failures.
    """

    def __init__(self, task: str):
        self.state = PipelineState(task=task)

    def _run_with_retry(
        self,
        agent_name: str,
        agent_fn: Callable,
        *args,
        **kwargs
    ) -> Optional[str]:
        """Run an agent with retry logic. Returns output or None on failure."""
        for attempt in range(MAX_RETRIES):
            start = time.time()
            output, success, error = agent_fn(*args, **kwargs)
            duration_ms = int((time.time() - start) * 1000)

            call = SubagentCall(
                agent_name=agent_name,
                # Guard against keyword-only calls, where args is empty.
                input_summary=str(args[0])[:100] if args else "",
                output=output[:200] if output else "",
                success=success,
                duration_ms=duration_ms,
                error=error
            )
            self.state.calls.append(call)

            if success:
                print(f" [{agent_name}] OK in {duration_ms}ms")
                return output

            print(f" [{agent_name}] Attempt {attempt+1} failed: {error}")
            if attempt < MAX_RETRIES - 1:
                time.sleep(2 ** attempt)  # exponential backoff

        self.state.errors.append(f"{agent_name} failed after {MAX_RETRIES} attempts")
        return None

    def run(self) -> dict:
        task = self.state.task
        print(f"\n[Orchestrator] Starting pipeline: {task[:80]}...")

        # ── Step 1: Research ──────────────────────────────────────────
        print("\n[Orchestrator] Step 1/4: Research")
        research = self._run_with_retry("ResearcherAgent", researcher_agent, task)
        if not research:
            return self._fail("Research failed - cannot continue")
        self.state.research = research

        # ── Step 2: Write (with critic loop) ─────────────────────────
        prior_critique = None
        approved = False

        while self.state.critic_cycles < MAX_CRITIC_CYCLES and not approved:
            print(f"\n[Orchestrator] Step 2/4: Write (cycle {self.state.critic_cycles + 1})")
            draft = self._run_with_retry(
                "WriterAgent", writer_agent,
                task, research, prior_critique
            )
            if not draft:
                return self._fail("Writing failed - cannot continue")
            self.state.draft = draft

            # ── Step 3: Critique ──────────────────────────────────────
            print(f"\n[Orchestrator] Step 3/4: Critique (cycle {self.state.critic_cycles + 1})")
            critique = self._run_with_retry("CriticAgent", critic_agent, task, draft)
            if not critique:
                print(" [Orchestrator] Critique failed - accepting draft as-is")
                approved = True
                break

            self.state.critique = critique
            self.state.critic_cycles += 1

            # Accept the exact verdict string, or a loosely formatted APPROVE near
            # the top, but never when REVISE appears alongside it.
            head = critique[:50]
            if "VERDICT: APPROVE" in critique or ("APPROVE" in head and "REVISE" not in head):
                print(" [Orchestrator] Critic approved draft")
                approved = True
            else:
                print(f" [Orchestrator] Critic requested revisions (cycle {self.state.critic_cycles})")
                prior_critique = critique

        # ── Step 4: Publish ───────────────────────────────────────────
        print("\n[Orchestrator] Step 4/4: Publish")
        final = self._run_with_retry("PublisherAgent", publisher_agent, task, self.state.draft)
        if not final:
            print(" [Orchestrator] Publishing failed - returning raw draft")
            self.state.final = self.state.draft
        else:
            self.state.final = final

        return self._success()

    def _fail(self, reason: str) -> dict:
        print(f"\n[Orchestrator] PIPELINE FAILED: {reason}")
        return {
            "success": False,
            "reason": reason,
            "partial_state": {
                "research": bool(self.state.research),
                "draft": bool(self.state.draft),
            },
            "errors": self.state.errors,
            "calls_made": len(self.state.calls)
        }

    def _success(self) -> dict:
        total_duration = sum(c.duration_ms for c in self.state.calls)
        print(f"\n[Orchestrator] Pipeline complete in {total_duration}ms")
        return {
            "success": True,
            "final_output": self.state.final,
            "critic_cycles": self.state.critic_cycles,
            "calls_made": len(self.state.calls),
            "total_duration_ms": total_duration,
            "errors": self.state.errors
        }

    def get_trace(self) -> list[dict]:
        return [
            {
                "agent": c.agent_name,
                "input": c.input_summary,
                "output_preview": c.output,
                "success": c.success,
                "duration_ms": c.duration_ms,
                "error": c.error
            }
            for c in self.state.calls
        ]


# ─── Usage ────────────────────────────────────────────────────────────────────

def main():
    task = "Write a technical explainer on vector quantization for ML engineers building RAG systems"

    orchestrator = ContentOrchestrator(task)
    result = orchestrator.run()

    if result["success"]:
        print("\n" + "="*60)
        print("FINAL PUBLISHED OUTPUT:")
        print("="*60)
        print(result["final_output"])
        print(f"\nStats: {result['calls_made']} agent calls, "
              f"{result['critic_cycles']} critic cycles, "
              f"{result['total_duration_ms']}ms total")
    else:
        print(f"\nPipeline failed: {result['reason']}")
        print(f"Errors: {result['errors']}")

    print("\n[Execution Trace]")
    for call in orchestrator.get_trace():
        status = "OK" if call["success"] else f"FAIL: {call['error']}"
        print(f" {call['agent']}: {call['duration_ms']}ms - {status}")


if __name__ == "__main__":
    main()

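Orchestrator-Subagent Interaction Diagram

[Diagram: the orchestrator calls each subagent in turn (Researcher → Writer ↔ Critic → Publisher) and receives each result back before deciding the next step.]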

Dynamic Orchestration: Adding/Removing Agents at Runtime

Production orchestrators often need to adapt their agent roster based on task needs. A document processing system might need an OCR agent for image-heavy documents but not for plain text. A research pipeline might spawn additional specialized agents when it encounters an unexpected domain.

from typing import Callable, Optional

class DynamicOrchestrator:
    """Orchestrator that can register/unregister agents at runtime."""

    def __init__(self):
        # Maps agent name -> {"fn": callable, "capabilities": list of str}
        self._registry: dict[str, dict] = {}

    def register(self, name: str, fn: Callable, capabilities: list[str]):
        self._registry[name] = {"fn": fn, "capabilities": capabilities}
        print(f"[Registry] Registered: {name} ({capabilities})")

    def unregister(self, name: str):
        self._registry.pop(name, None)
        print(f"[Registry] Unregistered: {name}")

    def find_agent(self, capability: str) -> Optional[str]:
        for name, meta in self._registry.items():
            if capability in meta["capabilities"]:
                return name
        return None

    def call(self, capability: str, *args, **kwargs) -> Optional[str]:
        agent_name = self.find_agent(capability)
        if not agent_name:
            raise ValueError(f"No agent with capability: {capability}")
        fn = self._registry[agent_name]["fn"]
        return fn(*args, **kwargs)

Production Notes

Idempotency: Design each subagent call to be safe to retry. If the writer is called twice with the same input, the second call should produce an equivalent result. Avoid side effects in subagents.
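LLM outputs are rarely byte-identical across calls, so one pragmatic way to approximate idempotency is to cache results by a hash of the input. The wrapper below is a minimal in-memory sketch; `IdempotentAgent` is a hypothetical name, and a production version would likely need persistent storage and an eviction policy.

```python
import hashlib
from typing import Callable


class IdempotentAgent:
    """Wrap an agent so repeated calls with the same input reuse the first result."""

    def __init__(self, fn: Callable[[str], str]):
        self._fn = fn
        self._cache: dict[str, str] = {}
        self.calls = 0  # number of real (uncached) invocations

    def __call__(self, task: str) -> str:
        key = hashlib.sha256(task.encode("utf-8")).hexdigest()
        if key not in self._cache:
            self.calls += 1
            # Only successful results are cached; an exception propagates
            # and leaves the cache untouched, so a retry calls the agent again.
            self._cache[key] = self._fn(task)
        return self._cache[key]
```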

Result validation: Don't trust subagent output blindly. Validate that the output meets the expected format and minimum length before passing it downstream. A critic agent that returns "OK" when a 500-word critique was expected is a silent failure.
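A lightweight check for the critic's output might look like the sketch below. It relies on the `VERDICT: APPROVE` / `VERDICT: REVISE` convention from the critic prompt earlier; the 50-word minimum is an arbitrary threshold chosen for illustration.

```python
def validate_critique(text: str, min_words: int = 50) -> list[str]:
    """Return a list of problems; an empty list means the critique looks usable."""
    problems: list[str] = []
    if "VERDICT: APPROVE" not in text and "VERDICT: REVISE" not in text:
        problems.append("missing verdict")
    word_count = len(text.split())
    if word_count < min_words:
        problems.append(f"too short: {word_count} words, expected at least {min_words}")
    return problems
```

The orchestrator can treat a non-empty list as a failed call and route it through the same retry path as an API error.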

Orchestrator context management: The orchestrator's context grows as it accumulates subagent results. On a long pipeline, the orchestrator may hit its own context limit. Summarize intermediate results rather than passing full text forward when possible.
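A simple way to keep accumulated results bounded is to give each one an equal share of a fixed budget and shrink anything over its share. The sketch below uses character count as a rough stand-in for tokens, which is an assumption, and defaults to plain truncation; in practice `summarize` would more likely be an LLM summarization call.

```python
from typing import Callable


def truncate(text: str, limit: int) -> str:
    """Cut text to at most `limit` characters, marking the cut with an ellipsis."""
    if len(text) <= limit:
        return text
    if limit <= 3:
        return text[:limit]
    return text[: limit - 3] + "..."


def fit_to_budget(
    results: dict[str, str],
    max_chars: int,
    summarize: Callable[[str, int], str] = truncate,
) -> dict[str, str]:
    """Shrink any result that exceeds its equal share of the character budget."""
    if not results:
        return {}
    per_item = max_chars // len(results)
    return {
        name: text if len(text) <= per_item else summarize(text, per_item)
        for name, text in results.items()
    }
```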

Observability: Log every orchestrator decision (why it chose agent X over Y, why it decided to retry) not just the calls themselves. The orchestrator's reasoning is the hardest thing to debug after the fact.
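Recording decisions as structured entries, rather than free-text log lines, makes them searchable afterwards. The following is a minimal sketch; the `Decision` fields and the JSON Lines export are assumptions about what would be useful, not a prescribed schema.

```python
import json
import time
from dataclasses import asdict, dataclass, field


@dataclass
class Decision:
    step: str                 # e.g. "route", "retry", "abort"
    chosen: str               # the option the orchestrator picked
    alternatives: list[str]   # options it considered and rejected
    reason: str               # the orchestrator's stated rationale
    timestamp: float = field(default_factory=time.time)


class DecisionLog:
    """Append-only record of orchestrator decisions."""

    def __init__(self) -> None:
        self.entries: list[Decision] = []

    def record(self, step: str, chosen: str, alternatives: list[str], reason: str) -> Decision:
        decision = Decision(step, chosen, alternatives, reason)
        self.entries.append(decision)
        return decision

    def to_jsonl(self) -> str:
        """Serialize all decisions as JSON Lines, one decision per line."""
        return "\n".join(json.dumps(asdict(d)) for d in self.entries)
```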


:::warning Orchestrator Prompt Fragility The orchestrator's behavior is entirely determined by its system prompt. Small changes to the orchestrator prompt can dramatically change routing decisions, retry behavior, and result aggregation. Treat the orchestrator prompt as critical production code: version it, test it, and review changes carefully. :::

:::danger Single Point of Failure The orchestrator is a single point of failure for the entire pipeline. If the orchestrator LLM call fails, the entire pipeline fails. In production, run the orchestrator with aggressive retry logic, circuit breakers, and fallback to simpler (non-orchestrated) approaches when orchestrator reliability drops. :::
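A circuit breaker with a fallback path can be sketched in a few lines. This is an illustrative implementation under assumptions: the thresholds are arbitrary, `primary` and `fallback` are hypothetical callables standing in for the orchestrated and non-orchestrated pipelines, and the clock is injectable so the behavior can be tested without waiting.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Route calls to a fallback after repeated primary failures."""

    def __init__(
        self,
        failure_threshold: int = 3,
        reset_after_s: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.reset_after_s = reset_after_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def is_open(self) -> bool:
        if self._opened_at is None:
            return False
        if self._clock() - self._opened_at >= self.reset_after_s:
            # Cool-down elapsed: close the breaker and allow a trial call.
            self._opened_at = None
            self._failures = 0
            return False
        return True

    def call(self, primary: Callable[[str], str], fallback: Callable[[str], str], task: str) -> str:
        if self.is_open():
            return fallback(task)  # skip the primary entirely while open
        try:
            result = primary(task)
        except Exception:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            return fallback(task)
        self._failures = 0  # a success resets the consecutive-failure count
        return result
```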


Interview Q&A

Q: What are the orchestrator's core responsibilities?

A: Task decomposition (breaking complex tasks into atomic subtasks), agent selection (routing subtasks to appropriate specialists), result aggregation (combining subagent outputs into a coherent whole), and replanning (handling failures by retrying, rerouting, or gracefully degrading). The orchestrator is essentially a dynamic planning system that coordinates execution without executing tasks itself.

Q: How do you design subagents for maximum reusability?

A: Subagents should have single responsibility, clear input/output contracts (typed schemas, not just free text), and zero knowledge of the system they're part of. A researcher agent that knows it's being orchestrated by a content pipeline is less reusable than a researcher agent that just takes a topic and returns structured findings - which can be used in content pipelines, analysis pipelines, or standalone.

Q: How do you handle the case where a subagent consistently returns low-quality output?

A: First, validate output quality programmatically (length checks, format validation, required fields). If output fails validation, retry with a modified prompt. If quality is consistently low, add a validation step before accepting output - either a lightweight heuristic check or a separate validator agent. Track per-agent quality metrics over time to catch degrading agents early.

Q: What's the right way to pass state between subagents?

A: Pass exactly what each subagent needs - no more. A writer agent doesn't need the raw research; it needs the distilled insights. A critic doesn't need the original research; it needs the task specification and the draft. Over-sharing context wastes tokens and can confuse agents with irrelevant information. Under-sharing causes agents to work without necessary context. Map out the dependency graph explicitly.

Q: When does the orchestrator-subagent pattern fail?

A: It fails when tasks are genuinely non-decomposable (some tasks require continuous awareness of everything, not sequential processing), when the coordination overhead exceeds the value of specialization (simple tasks with complex orchestration overhead), when orchestrator context grows so large it becomes incoherent (long pipelines with large intermediate results), and when error propagation cascades - a bad research result makes the writer produce a bad draft, which makes the critic produce a confused critique.

© 2026 EngineersOfAI. All rights reserved.