Task Decomposition
The Moment an Agent Hits a Wallā
Picture this: your agent receives a message at 9:02 AM.
"Build me a REST API for a todo app. It needs JWT authentication, full CRUD for todos, PostgreSQL storage, unit tests with at least 80% coverage, a Dockerfile, and deployment instructions. Follow PEP 8 and make sure the tests run in CI."
A single LLM call returns a plausible-looking wall of code. Some of it is correct. Some of it is hallucinated. The test file references functions that do not exist. The Dockerfile points to a Python version that is not installed. The JWT implementation has a subtle flaw that will cause tokens to never expire.
This is not a failure of intelligence. It is a failure of architecture. A single-shot generation cannot reliably produce a working multi-component system because:
- Context pressure: generating 2,000 lines of code in one shot leaves no room for reasoning
- Error accumulation: a mistake in the data model propagates through every layer
- No verification: the model cannot test its own output mid-generation
- Flat reasoning: the model cannot revisit earlier decisions as it learns new constraints
The fix is not a better prompt. The fix is task decomposition - breaking the goal into a dependency-ordered graph of manageable subtasks that can be executed, verified, and revised independently.
:::tip š® Interactive Playground Visualize this concept: Try the Agent Planning demo on the EngineersOfAI Playground - no code required. :::
Why Single-Shot LLM Calls Fail for Complex Tasksā
Let's be precise about the failure modes.
Context Window Pressureā
Modern LLMs have large context windows (128k to 1M tokens), but there is a subtler problem: reasoning quality degrades as context fills. Empirical studies show that LLMs lose precision on tasks when the relevant information is buried more than ~32k tokens from the end of the context.
When you ask for a complex system in a single call:
- The model must simultaneously reason about all constraints
- Earlier decisions cannot be verified against later requirements
- There is no room for intermediate working state
Error Accumulationā
In a flat, single-shot generation:
Step 1: Design the data model (may have subtle errors)
Step 2: Write endpoints based on that model (inherits errors)
Step 3: Write tests based on those endpoints (tests wrong behavior)
Step 4: Write Dockerfile based on that stack (wrong dependencies)
Each step inherits and compounds the errors of previous steps. By step 4, you may be testing a Docker container that packages the wrong code.
With decomposition, you can verify step 1 before proceeding to step 2.
Reasoning Depth Limitsā
LLMs perform better on tasks that fit in a "reasoning window" - roughly 5ā10 logical steps where they can maintain coherence. Beyond that, plans become inconsistent. Breaking a task into subtasks, each of which fits comfortably in the reasoning window, is how you get reliable behavior.
Hierarchical Decompositionā
The core idea: a complex task is a tree. The root is the goal. The leaves are atomic actions that a tool can execute.
Note that this is not just a list - it is a tree with implied dependencies. You cannot write auth endpoint tests (E2) before you have written the auth endpoints (C3). The order matters.
Decomposition Depth: How Deep Is Deep Enough?ā
A practical rule: decompose until each leaf task can be completed with 1ā3 tool calls. When a task requires a sequence of more than 3 tool calls, it should be decomposed further.
Too shallow: tasks are too complex to execute reliably Too deep: overhead of managing the graph exceeds the benefit Right: each leaf is one "unit of verifiable work"
Decomposition Strategiesā
Different task structures call for different decomposition patterns.
Sequential Decompositionā
Tasks with strict ordering: A must complete before B can start.
Task A ā Task B ā Task C ā Result
Use when: data flows from one task to the next, or tasks have hard prerequisites.
Parallel Decompositionā
Independent tasks that can run simultaneously.
āāā Task A āāā
Goal ā āāā¼āā Task B āāā¼āā Merge ā Result
āāā Task C āāā
Use when: tasks are genuinely independent. A common optimization opportunity - many agents execute tasks sequentially that could be parallelized.
Conditional Decompositionā
Branch based on the result of an earlier task.
Task A ā if success ā Task B
ā if failure ā Task C (recovery)
Use when: the path forward depends on what you discover during execution.
Iterative Decompositionā
Repeat a task until a condition is met.
Task A ā evaluate ā condition met? ā yes ā done
ā no ā Task A (again)
Use when: quality improvement, retry with different parameters, or convergence-based tasks.
Dependency Graphs: DAG Representationā
A task decomposition is a Directed Acyclic Graph (DAG):
- Nodes are tasks
- Edges represent "must complete before" relationships
- No cycles (you cannot have A depends on B depends on A)
A key insight: the DAG defines the minimum number of sequential steps needed. Tasks without dependencies on each other can run in parallel, reducing total execution time.
The critical path of the DAG - the longest path from start to finish - determines the minimum total execution time regardless of parallelization.
Task Representationā
Every task in the decomposition needs a structured representation:
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
import uuid
class TaskStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
class Task(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8])
title: str
description: str
status: TaskStatus = TaskStatus.PENDING
dependencies: list[str] = Field(default_factory=list) # task IDs
result: Optional[str] = None
error: Optional[str] = None
subtasks: list["Task"] = Field(default_factory=list)
metadata: dict = Field(default_factory=dict)
def is_ready(self, completed_ids: set[str]) -> bool:
"""True if all dependencies are completed."""
return all(dep in completed_ids for dep in self.dependencies)
def is_leaf(self) -> bool:
"""True if this task has no subtasks (is executable)."""
return len(self.subtasks) == 0
Full Implementation: Task Decomposerā
Here is a complete task decomposer that takes a natural language goal and produces an executable dependency graph:
"""
task_decomposer.py
Complete hierarchical task decomposition system.
Requirements:
pip install openai pydantic
"""
from __future__ import annotations
import json
import textwrap
from typing import Optional
from collections import deque
from enum import Enum
from pydantic import BaseModel, Field
from openai import OpenAI
import uuid
client = OpenAI() # uses OPENAI_API_KEY from environment
# āāā Data Models āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
class TaskStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
BLOCKED = "blocked"
class TaskNode(BaseModel):
"""A single node in the task dependency graph."""
id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8])
title: str
description: str
tool_hint: Optional[str] = None # which tool should execute this
estimated_steps: int = 1 # expected number of tool calls
status: TaskStatus = TaskStatus.PENDING
dependencies: list[str] = Field(default_factory=list)
result: Optional[str] = None
error: Optional[str] = None
metadata: dict = Field(default_factory=dict)
def is_ready(self, completed_ids: set[str]) -> bool:
return all(dep in completed_ids for dep in self.dependencies)
def mark_complete(self, result: str) -> None:
self.status = TaskStatus.COMPLETED
self.result = result
def mark_failed(self, error: str) -> None:
self.status = TaskStatus.FAILED
self.error = error
class TaskGraph(BaseModel):
"""DAG of tasks with dependency tracking."""
goal: str
tasks: dict[str, TaskNode] = Field(default_factory=dict)
root_task_ids: list[str] = Field(default_factory=list)
def add_task(self, task: TaskNode) -> None:
self.tasks[task.id] = task
def get_ready_tasks(self) -> list[TaskNode]:
"""Return all tasks that can be executed now (dependencies met)."""
completed = {
tid for tid, t in self.tasks.items()
if t.status == TaskStatus.COMPLETED
}
return [
t for t in self.tasks.values()
if t.status == TaskStatus.PENDING and t.is_ready(completed)
]
def get_critical_path(self) -> list[str]:
"""Longest path through the DAG (topological order)."""
# Simple longest-path via dynamic programming
dist: dict[str, int] = {}
pred: dict[str, Optional[str]] = {}
# Topological sort
in_degree = {tid: 0 for tid in self.tasks}
children: dict[str, list[str]] = {tid: [] for tid in self.tasks}
for tid, task in self.tasks.items():
for dep in task.dependencies:
in_degree[tid] += 1
children[dep].append(tid)
queue = deque([tid for tid, d in in_degree.items() if d == 0])
topo: list[str] = []
while queue:
tid = queue.popleft()
topo.append(tid)
for child in children[tid]:
in_degree[child] -= 1
if in_degree[child] == 0:
queue.append(child)
for tid in topo:
task = self.tasks[tid]
if not task.dependencies:
dist[tid] = task.estimated_steps
pred[tid] = None
else:
best_dep = max(task.dependencies, key=lambda d: dist.get(d, 0))
dist[tid] = dist.get(best_dep, 0) + task.estimated_steps
pred[tid] = best_dep
# Trace back from the node with max distance
if not dist:
return []
end = max(dist, key=dist.__getitem__)
path = []
current: Optional[str] = end
while current is not None:
path.append(current)
current = pred.get(current)
path.reverse()
return path
def is_complete(self) -> bool:
return all(
t.status in (TaskStatus.COMPLETED, TaskStatus.FAILED)
for t in self.tasks.values()
)
def summary(self) -> str:
lines = [f"Task Graph: {self.goal}", "ā" * 50]
completed = sum(1 for t in self.tasks.values() if t.status == TaskStatus.COMPLETED)
failed = sum(1 for t in self.tasks.values() if t.status == TaskStatus.FAILED)
pending = sum(1 for t in self.tasks.values() if t.status == TaskStatus.PENDING)
lines.append(f"Total: {len(self.tasks)} | Done: {completed} | Failed: {failed} | Pending: {pending}")
lines.append("")
for task in self.tasks.values():
icon = {"pending": "ā³", "running": "š", "completed": "ā
", "failed": "ā", "blocked": "š«"}[task.status]
dep_str = f" (needs: {', '.join(task.dependencies)})" if task.dependencies else ""
lines.append(f" {icon} [{task.id}] {task.title}{dep_str}")
return "\n".join(lines)
# āāā LLM-Based Decomposer āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
DECOMPOSITION_SYSTEM_PROMPT = textwrap.dedent("""
You are an expert task decomposition system. Your job is to take a complex goal
and break it into a structured, dependency-ordered list of executable tasks.
Rules:
1. Each task should require 1ā3 tool calls to complete
2. Identify genuine dependencies - don't force sequential ordering if tasks are independent
3. Be specific: each task title should be actionable and clear
4. Assign a tool_hint from: [write_file, run_command, search_web, call_api, analyze_text, ask_human]
5. Estimate steps: how many tool calls will this task need? (1ā5)
6. Parallel opportunities: tasks without dependencies can run simultaneously
Output JSON in this exact schema:
{
"tasks": [
{
"id": "t1",
"title": "Short actionable title",
"description": "Detailed description of what to do and why",
"tool_hint": "write_file",
"estimated_steps": 2,
"dependencies": []
},
{
"id": "t2",
"title": "Next task title",
"description": "...",
"tool_hint": "run_command",
"estimated_steps": 1,
"dependencies": ["t1"]
}
]
}
""").strip()
class TaskDecomposer:
"""
Decomposes a natural language goal into an executable dependency graph.
Supports hierarchical decomposition and dynamic replanning.
"""
def __init__(self, model: str = "gpt-4o"):
self.model = model
self.client = OpenAI()
def decompose(self, goal: str, context: str = "") -> TaskGraph:
"""
Main entry point. Takes a goal string, returns a TaskGraph.
"""
print(f"\nšÆ Decomposing goal: {goal[:80]}...")
user_message = f"Goal: {goal}"
if context:
user_message += f"\n\nAdditional context:\n{context}"
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": DECOMPOSITION_SYSTEM_PROMPT},
{"role": "user", "content": user_message},
],
response_format={"type": "json_object"},
temperature=0.2,
)
raw = response.choices[0].message.content
data = json.loads(raw)
graph = TaskGraph(goal=goal)
for task_data in data["tasks"]:
task = TaskNode(**task_data)
graph.add_task(task)
# root tasks = those with no dependencies
graph.root_task_ids = [
tid for tid, t in graph.tasks.items()
if not t.dependencies
]
return graph
def decompose_hierarchically(
self,
goal: str,
max_depth: int = 2,
estimated_steps_threshold: int = 3,
) -> TaskGraph:
"""
Decompose, then recursively decompose tasks that are too complex.
Tasks with estimated_steps > threshold get broken down further.
"""
top_level_graph = self.decompose(goal)
for task_id, task in list(top_level_graph.tasks.items()):
if task.estimated_steps > estimated_steps_threshold and max_depth > 0:
print(f" ā Task '{task.title}' is complex ({task.estimated_steps} steps), decomposing further...")
sub_graph = self.decompose(
goal=task.title,
context=f"Parent goal: {goal}\nThis is a subtask of: {task.title}\n{task.description}",
)
# Prefix subtask IDs to avoid collision
for sub_id, sub_task in list(sub_graph.tasks.items()):
new_id = f"{task_id}_{sub_id}"
sub_task.id = new_id
# Remap dependencies within this sub-graph
sub_task.dependencies = [
f"{task_id}_{dep}" if dep in sub_graph.tasks else dep
for dep in sub_task.dependencies
]
# Sub-tasks inherit the parent's dependencies at the root level
if not sub_task.dependencies or all(
dep not in sub_graph.tasks for dep in sub_task.dependencies
):
sub_task.dependencies = list(task.dependencies)
top_level_graph.add_task(sub_task)
# Remove the original coarse task
del top_level_graph.tasks[task_id]
return top_level_graph
def replan(
self,
original_graph: TaskGraph,
failed_task: TaskNode,
error_message: str,
) -> TaskGraph:
"""
Generate a revised plan when a task has failed.
Returns a new graph for the remaining work.
"""
completed = [
t for t in original_graph.tasks.values()
if t.status == TaskStatus.COMPLETED
]
completed_summary = "\n".join(
f"- {t.title}: {t.result[:100] if t.result else 'done'}"
for t in completed
)
replan_goal = f"""
Original goal: {original_graph.goal}
Completed steps:
{completed_summary}
FAILED step: {failed_task.title}
Error: {error_message}
Generate a revised plan for the remaining work, starting from after the failure.
Include any recovery steps needed to handle the failure before proceeding.
"""
return self.decompose(replan_goal)
# āāā Execution Engine (simplified) āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
class MockExecutor:
"""
Simulates executing tasks. Replace with real tool calls in production.
"""
def execute(self, task: TaskNode) -> str:
"""Execute a task and return the result."""
print(f"\n Executing: [{task.id}] {task.title}")
print(f" Tool: {task.tool_hint} | Est. steps: {task.estimated_steps}")
# In production: call real tools based on task.tool_hint
return f"Completed: {task.title}"
class TaskOrchestrator:
"""
Orchestrates execution of a task graph, handling parallelism and failures.
"""
def __init__(self, executor: MockExecutor, decomposer: TaskDecomposer):
self.executor = executor
self.decomposer = decomposer
def run(self, graph: TaskGraph, max_retries: int = 2) -> TaskGraph:
"""
Execute tasks in dependency order.
Retries failed tasks and replans if needed.
"""
print(f"\n{'='*60}")
print(f"Starting execution: {graph.goal}")
print(f"Total tasks: {len(graph.tasks)}")
print(f"{'='*60}")
retry_counts: dict[str, int] = {}
while not graph.is_complete():
ready = graph.get_ready_tasks()
if not ready:
# Check if we're deadlocked (remaining tasks blocked by failures)
pending = [t for t in graph.tasks.values() if t.status == TaskStatus.PENDING]
if pending:
print("\nā ļø Deadlock detected - pending tasks have failed dependencies")
break
break
# In production, ready tasks could be executed in parallel (asyncio.gather)
for task in ready:
task.status = TaskStatus.RUNNING
try:
result = self.executor.execute(task)
task.mark_complete(result)
print(f" ā
{task.title}")
except Exception as e:
error = str(e)
retry_counts[task.id] = retry_counts.get(task.id, 0) + 1
if retry_counts[task.id] <= max_retries:
print(f" ā ļø Failed ({retry_counts[task.id]}/{max_retries}), retrying: {task.title}")
task.status = TaskStatus.PENDING
else:
print(f" ā Permanently failed: {task.title} - {error}")
task.mark_failed(error)
# Attempt replanning
print("\nš Attempting to replan around failure...")
new_graph = self.decomposer.replan(graph, task, error)
# Merge new graph into current (simplified)
for new_task in new_graph.tasks.values():
if new_task.id not in graph.tasks:
graph.add_task(new_task)
print(f"\n{graph.summary()}")
return graph
# āāā Demo āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
def demo_decomposition():
"""
Demonstrates task decomposition on a realistic engineering goal.
"""
decomposer = TaskDecomposer(model="gpt-4o")
executor = MockExecutor()
orchestrator = TaskOrchestrator(executor, decomposer)
goal = (
"Build a REST API for a todo application with JWT authentication, "
"PostgreSQL storage, full CRUD operations, unit tests with 80% coverage, "
"a Dockerfile, and a README with setup instructions."
)
# Step 1: Top-level decomposition
graph = decomposer.decompose(goal)
print("\nš Initial decomposition:")
print(graph.summary())
# Step 2: Show critical path
critical = graph.get_critical_path()
print(f"\nš“ Critical path ({len(critical)} tasks):")
for tid in critical:
task = graph.tasks.get(tid)
if task:
print(f" ā {task.title}")
# Step 3: Show parallel opportunities
ready = graph.get_ready_tasks()
print(f"\nā” Tasks ready to run in parallel now: {len(ready)}")
for t in ready:
print(f" ⢠{t.title}")
# Step 4: Execute
result_graph = orchestrator.run(graph)
return result_graph
def demo_hierarchical():
"""Shows hierarchical (multi-level) decomposition."""
decomposer = TaskDecomposer(model="gpt-4o")
goal = (
"Set up a complete MLOps pipeline: model training with hyperparameter "
"tuning, experiment tracking with MLflow, model registry, automated "
"testing, Docker packaging, and a FastAPI serving endpoint."
)
print("\nš Hierarchical decomposition:")
graph = decomposer.decompose_hierarchically(goal, max_depth=1, estimated_steps_threshold=3)
print(graph.summary())
if __name__ == "__main__":
demo_decomposition()
# demo_hierarchical() # Uncomment for the MLOps example
Dynamic Decomposition: Replanning as You Learnā
A plan made at the start is always incomplete. Real tasks reveal surprises:
- "The database schema needs an extra column we didn't anticipate"
- "The API we're calling has different rate limits than documented"
- "The test environment is missing a dependency"
A good decomposer handles this by replanning when new information changes the task structure.
Plan-Then-Execute vs. Interleaved Planningā
Two architectural choices for when planning happens:
Plan-then-execute: Generate the full task graph upfront, then execute it. Pros: the planner sees the whole picture; tasks can be optimally ordered. Cons: the plan may be wrong before you start; you only discover this during execution.
Interleaved planning: Decide the next 1ā3 tasks at each step, using accumulated results to inform the next planning call. Pros: each planning call has more context; adapts to surprises. Cons: more LLM calls; the planner may not see far enough ahead to optimize ordering.
Recommendation: start with plan-then-execute for well-understood task types; use interleaved planning for exploratory tasks where requirements are unclear.
Production Notesā
:::warning Decomposition Quality Depends on Goal Clarity Ambiguous goals produce bad decompositions. "Make it better" decomposes into vague tasks. Before decomposing, run an ambiguity check (see Lesson 04) and ensure the goal is specific, measurable, and bounded. :::
:::danger Circular Dependencies A DAG must not have cycles. If your LLM-generated decomposition contains a cycle (Task A depends on Task B which depends on Task A), execution will deadlock. Always validate the graph for cycles before execution. Use a topological sort - if it fails, there's a cycle. :::
Parallelism caution: parallel execution is faster but increases complexity. If parallel tasks share resources (same file, same database table), you need locking. Start sequential and add parallelism only when you have profiled and found it necessary.
Decomposition caching: the same goal decomposed twice should produce the same graph. Cache decompositions keyed on a hash of the goal + context. This saves LLM calls and ensures reproducible behavior.
Interview Questions and Answersā
Q: Why is task decomposition better than a single long LLM generation for complex tasks?
A: Three reasons. First, single generations suffer from context pressure - the model cannot hold all constraints in focus over thousands of tokens, leading to inconsistencies. Second, errors accumulate: a mistake in step 3 propagates through every downstream step with no opportunity to catch it. Third, long generations have no verification points - you can only check the final output, not intermediate states. Decomposition creates checkpoints where you can verify each step before proceeding, dramatically reducing compounded errors.
Q: How do you represent task dependencies in code?
A: As a DAG - directed acyclic graph. Each task has a list of dependency task IDs. A task is "ready" when all its dependencies are in the COMPLETED state. The full graph can be serialized as a dictionary of task objects. I use Pydantic for validation and topological sort (Kahn's algorithm) for execution ordering. I check for cycles at construction time by verifying the topological sort completes without stuck nodes.
Q: What's the difference between sequential, parallel, and conditional decomposition?
A: Sequential is when task B cannot start until A completes because it needs A's output. Parallel is when tasks are genuinely independent - they can run simultaneously. Conditional is when the path through the graph depends on runtime results (if the API returns X, do task A; if it returns Y, do task B). In practice most task graphs mix all three. Sequential is the safest default; parallel requires careful handling of shared resources; conditional requires the planner to anticipate branching outcomes upfront or to defer those decisions to the executor.
Q: How do you handle the case where your initial decomposition is wrong?
A: With replanning. When a task fails or returns unexpected results, I trigger a replanning call that provides the LLM with the completed work so far, the failure details, and asks for a revised plan for the remaining work. The key is to preserve what has already been completed and only replan forward from the point of failure. I track "completed task summaries" as compact context that does not inflate the replanning prompt.
Q: What is the critical path of a task graph and why does it matter?
A: The critical path is the longest dependency chain through the DAG - the sequence of tasks that determines the minimum possible execution time regardless of how much you parallelize everything else. It matters because: (1) optimizing tasks on the critical path has the highest leverage on total execution time; (2) any failure on the critical path blocks the entire remaining execution; (3) tasks off the critical path can be parallelized without affecting the deadline. I compute it with dynamic programming on the topological sort of the DAG.
Q: How do you decide when to stop decomposing?
A: I use the "1ā3 tool calls" heuristic: if a task can be completed with at most 3 tool calls, it's a leaf. If it needs more, decompose it further. More formally, I estimate the number of steps (estimated_steps field) and set a threshold - any task above the threshold gets recursively decomposed. The other signal is coherence: if a task requires coordinating multiple concerns simultaneously (like "implement auth and write its tests"), split it so each concern is separate and can be verified independently.
