LangGraph
The Control Problem
Every multi-agent framework makes a tradeoff. AutoGen gives you natural conversation but limited control over exact execution flow. CrewAI gives you clean role abstractions but limited control over conditional logic and state.
What happens when you need both? When the workflow has complex conditional branches, when you need to checkpoint state and resume from the middle, when human approval is required at specific steps, when you need to debug exactly which path was taken and why?
LangGraph is the answer. It models agent workflows as directed graphs: nodes are computational steps (LLM calls, tool executions, data transformations), edges are transitions between steps. Conditional edges implement branching logic. The graph state is a typed Python object that accumulates information as the workflow progresses.
The result is one of the most expressive agent frameworks available, and also one of the most complex. LangGraph is what teams reach for when the simpler frameworks cannot express the control flow they need.
As of early 2025, LangGraph powers production agent systems at Replit, LinkedIn, Uber, and dozens of other enterprises. It is the framework behind most of LangChain's enterprise use cases.
Why Graph-Based Matters
The key insight behind LangGraph: explicit state transitions, not implicit conversation flow.
In AutoGen and CrewAI, the execution path is somewhat implicit. It emerges from conversation dynamics (AutoGen) or task ordering (CrewAI). This is fine for simple workflows but creates problems for complex ones:
- Debugging: "Which path did the agent take and why?" is hard to answer
- Human approval: Pausing for human input mid-workflow requires awkward workarounds
- Retry logic: "If step 3 fails, go back to step 2" is not expressible cleanly
- Parallel execution: Running multiple branches simultaneously requires custom code
- State inspection: Examining intermediate state requires parsing conversation history
Graph-based execution solves all of these:
- Every node execution and edge transition is an explicit, named step that can be traced and checkpointed
- Human approval is a node in the graph - the graph pauses until input arrives
- Retry logic is a conditional edge that routes back to an earlier node
- Parallel branches are nodes with fan-out edges
- State is a typed Python object, inspectable at any point
Core Concepts
StateGraph
The primary abstraction. A StateGraph defines:
- The state schema: a TypedDict or Pydantic model defining all data the graph tracks
- Nodes: functions that take state and return updated state
- Edges: connections between nodes
- Conditional edges: functions that look at state and decide which node to go to next
from typing import Annotated, TypedDict  # the built-in list needs no import
from langgraph.graph import StateGraph, END
import operator

# State schema: all data the graph accumulates
class ResearchState(TypedDict):
    # Basic fields
    topic: str
    research_notes: str
    draft: str
    critique: str
    revision_count: int
    final_output: str
    approved: bool
    # Lists that grow with each addition (using operator.add reducer)
    messages: Annotated[list[str], operator.add]
    sources: Annotated[list[str], operator.add]
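To make the reducer idea concrete, here is a minimal standard-library sketch of how a partial update might be merged into state. `DemoState` and `apply_update` are hypothetical names used only for illustration; this mimics the merge semantics described above and is not LangGraph's actual implementation.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class DemoState(TypedDict):
    topic: str
    messages: Annotated[list[str], operator.add]


def apply_update(schema, state: dict, update: dict) -> dict:
    """Merge a node's partial update into state (illustration only)."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated[...] stores the reducer as its first metadata entry
        metadata = getattr(hints[key], "__metadata__", ())
        reducer = metadata[0] if metadata else None
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)  # e.g. list + list
        else:
            merged[key] = value  # plain fields are overwritten
    return merged


state = {"topic": "LLMs", "messages": ["start"]}
state = apply_update(DemoState, state, {"messages": ["research done"]})
state = apply_update(DemoState, state, {"topic": "LLM agents"})
print(state)
```

Fields without a reducer are replaced by the latest value, while fields annotated with `operator.add` accumulate.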
Nodes
Nodes are Python functions (or other callables) that take state and return updates:
import anthropic
client = anthropic.Anthropic()
def researcher_node(state: ResearchState) -> dict:
"""Research node: gather information about the topic."""
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1000,
system=(
"You are a research analyst. Provide structured research notes "
"with specific facts, data points, and sources. "
"End your notes with a numbered list of sources."
),
messages=[{
"role": "user",
"content": f"Research the following topic: {state['topic']}"
}],
)
notes = response.content[0].text
# Return only the fields to update (not the full state)
return {
"research_notes": notes,
"messages": [f"[Researcher]: Research completed ({len(notes)} chars)"],
}
Edges
Edges connect nodes. Direct edges always go to a specific node. Conditional edges call a function to decide:
def should_approve(state: ResearchState) -> str:
"""
Conditional edge function: returns the name of the next node.
"""
if state.get("approved", False):
return "finalize"
elif state.get("revision_count", 0) >= 3:
return "force_finalize" # Max revisions reached
else:
return "revise" # Need more revisions
# Build the graph
graph = StateGraph(ResearchState)
graph.add_node("research", researcher_node)
graph.add_node("write", writer_node)
graph.add_node("critique", critique_node)
graph.add_node("revise", revise_node)
graph.add_node("finalize", finalize_node)
graph.set_entry_point("research")
graph.add_edge("research", "write")
graph.add_edge("write", "critique")
graph.add_conditional_edges(
"critique",
should_approve, # Function that returns next node name
{
"finalize": "finalize", # If returns "finalize" → go to finalize
"force_finalize": "finalize", # Both map to same node
"revise": "revise", # If returns "revise" → go to revise
}
)
graph.add_edge("revise", "write") # After revision, write again
graph.add_edge("finalize", END)
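Because a conditional edge function is plain Python, its routing can be exercised without building a graph. The sketch below walks the critique/revise loop by hand, assuming a hypothetical `revise_stub` that increments `revision_count` (the real `revise_node` is not shown in this section).

```python
def should_approve(state: dict) -> str:
    """Same routing rule as above: approve, cap at 3 revisions, else revise."""
    if state.get("approved", False):
        return "finalize"
    if state.get("revision_count", 0) >= 3:
        return "force_finalize"
    return "revise"


def revise_stub(state: dict) -> dict:
    """Hypothetical stand-in for revise_node: only bumps the counter."""
    return {"revision_count": state.get("revision_count", 0) + 1}


state = {"approved": False, "revision_count": 0}
path = []
while True:
    decision = should_approve(state)
    path.append(decision)
    if decision != "revise":
        break
    state.update(revise_stub(state))

print(path)  # ['revise', 'revise', 'revise', 'force_finalize']
```

The loop only terminates because some node increments `revision_count`; if no node does, the revision cap never triggers.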
The LangGraph Execution Model
Checkpointing: Resumable Workflows
Checkpointing is one of LangGraph's most powerful features. After each step of the graph (one round of node executions), the state is saved through the configured checkpointer. If the workflow fails, you can resume from the last successful checkpoint.
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver      # pip install langgraph-checkpoint-sqlite
from langgraph.checkpoint.postgres import PostgresSaver  # pip install langgraph-checkpoint-postgres
from psycopg import Connection
from psycopg.rows import dict_row

# ── SQLite (development, single-process) ──────────────────────
def build_graph_with_sqlite_checkpointing():
    conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
    checkpointer = SqliteSaver(conn)
    graph = StateGraph(ResearchState)
    # ... add nodes and edges ...
    compiled = graph.compile(checkpointer=checkpointer)
    return compiled

# ── PostgreSQL (production, multi-process, multi-agent) ───────
def build_graph_with_postgres_checkpointing(connection_string: str):
    # In recent releases PostgresSaver.from_conn_string() is a context manager,
    # so a graph that outlives this function needs a connection it owns.
    conn = Connection.connect(
        connection_string, autocommit=True, prepare_threshold=0, row_factory=dict_row
    )
    checkpointer = PostgresSaver(conn)
    checkpointer.setup()  # Creates checkpoint tables if they don't exist
    graph = StateGraph(ResearchState)
    # ... add nodes and edges ...
    compiled = graph.compile(checkpointer=checkpointer)
    return compiled
# ── Using checkpoints ─────────────────────────────────────────
def run_with_checkpointing():
compiled_graph = build_graph_with_sqlite_checkpointing()
# Each run needs a unique thread_id for isolation
config = {"configurable": {"thread_id": "research-task-001"}}
# First run - may be interrupted
initial_state = {
"topic": "The impact of LLMs on software development",
"revision_count": 0,
"approved": False,
"messages": [],
"sources": [],
"research_notes": "",
"draft": "",
"critique": "",
"final_output": "",
}
# Run until completion or interrupt
for event in compiled_graph.stream(initial_state, config=config):
print(f"Node completed: {list(event.keys())}")
# If the run was interrupted, you can inspect state at any point
state = compiled_graph.get_state(config)
    print(f"Next node(s): {state.next}")  # state.next lists what would run next; empty when finished
print(f"Revision count: {state.values['revision_count']}")
# Resume from where it left off (same thread_id)
for event in compiled_graph.stream(None, config=config):
print(f"Resumed node: {list(event.keys())}")
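The resume behaviour can be modelled with a toy in-memory checkpoint store. This is a sketch of the idea only, not how LangGraph's checkpointers are implemented; `CHECKPOINTS`, `run`, and the three step functions are hypothetical names.

```python
import copy

CHECKPOINTS = {}  # thread_id -> list of {"next": step index, "values": state}
CALLS = []        # records which steps actually executed


def research(values):
    CALLS.append("research")
    return {"notes": "some notes"}


def write(values):
    CALLS.append("write")
    return {"draft": "a draft"}


def publish(values):
    CALLS.append("publish")
    return {"published": True}


def run(thread_id, steps, initial=None, fail_at=None):
    """Run steps in order, saving a checkpoint after each one."""
    history = CHECKPOINTS.setdefault(thread_id, [])
    if history:  # resume after the last saved step
        start = history[-1]["next"]
        values = copy.deepcopy(history[-1]["values"])
    else:        # fresh run
        start, values = 0, dict(initial or {})
    for index in range(start, len(steps)):
        if index == fail_at:
            raise RuntimeError(f"simulated crash before step {index}")
        values = {**values, **steps[index](values)}
        history.append({"next": index + 1, "values": copy.deepcopy(values)})
    return values


steps = [research, write, publish]
try:
    run("task-001", steps, {"topic": "LLMs"}, fail_at=2)
except RuntimeError as exc:
    print(f"Interrupted: {exc}")

final = run("task-001", steps)  # same thread id: resumes at publish
print(CALLS)  # ['research', 'write', 'publish']
```

The second call does not repeat `research` or `write`, which is the property that makes checkpointing worthwhile for expensive LLM steps.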
Human-in-the-Loop
LangGraph's interrupt mechanism pauses execution and waits for human input. This is invaluable for workflows requiring approval, clarification, or human judgment.
from langgraph.graph import StateGraph, END  # interrupt_before is a compile() argument, not an import
# Method 1: interrupt_before - pause BEFORE a specific node runs
graph = StateGraph(ResearchState)
graph.add_node("research", researcher_node)
graph.add_node("write", writer_node)
graph.add_node("publish", publish_node) # Requires human approval
graph.set_entry_point("research")
graph.add_edge("research", "write")
graph.add_edge("write", "publish")
graph.add_edge("publish", END)
# Compile with interrupt before publish - always pauses for human approval
compiled = graph.compile(
checkpointer=SqliteSaver(conn),
interrupt_before=["publish"], # Pause before this node
)
# ─────────────────────────────────────────────────────────────
# Method 2: interrupt() inside a node - dynamic interrupt based on content
from langgraph.types import interrupt
def approval_node(state: ResearchState) -> dict:
"""Node that requires human review of draft content."""
draft_preview = state["draft"][:500]
# This call pauses execution and stores state in checkpoint
human_response = interrupt({
"question": "Please review this draft and approve or reject.",
"draft_preview": draft_preview,
"full_draft_length": len(state["draft"]),
})
# human_response is set when the graph is resumed
if human_response["decision"] == "approve":
return {"approved": True}
else:
return {
"approved": False,
"critique": human_response.get("feedback", "Rejected without specific feedback"),
}
# ─────────────────────────────────────────────────────────────
# Resuming after human input
def human_review_workflow():
conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)
graph = StateGraph(ResearchState)
graph.add_node("research", researcher_node)
graph.add_node("write", writer_node)
graph.add_node("human_approval", approval_node)
graph.add_node("publish", publish_node)
graph.set_entry_point("research")
graph.add_edge("research", "write")
graph.add_edge("write", "human_approval")
graph.add_conditional_edges(
"human_approval",
lambda state: "publish" if state["approved"] else "write",
)
graph.add_edge("publish", END)
compiled = graph.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "review-task-001"}}
# Phase 1: Run until human approval is needed
print("Phase 1: Running until human approval needed...")
for event in compiled.stream(
{"topic": "AI safety regulations", "revision_count": 0, "approved": False,
"messages": [], "sources": [], "research_notes": "", "draft": "",
"critique": "", "final_output": ""},
config=config,
):
print(f" Completed: {list(event.keys())}")
# Check where we paused
state = compiled.get_state(config)
if state.next:
print(f"\nPaused at: {state.next}")
print("Waiting for human review...")
# Simulate human reviewing and providing input
print("\n[Human reviews draft and approves]")
    # Resume with the human's decision. The value passed to Command(resume=...)
    # becomes the return value of interrupt() inside approval_node.
    from langgraph.types import Command
    print("\nPhase 2: Resuming after approval...")
    for event in compiled.stream(
        Command(resume={"decision": "approve"}),
        config=config,
    ):
        print(f"  Completed: {list(event.keys())}")
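One way to keep the approval logic testable is to separate the decision handling from the pause itself. The helper below is a hypothetical refactoring (`review_to_update` is not part of LangGraph): it turns a reviewer's payload into a state update, and treats a missing or unknown decision as a rejection, which is an assumption rather than something the original node does.

```python
def review_to_update(human_response: dict) -> dict:
    """Convert a reviewer payload into a partial state update."""
    decision = str(human_response.get("decision", "")).strip().lower()
    if decision == "approve":
        return {"approved": True}
    # Anything other than an explicit approval counts as a rejection
    feedback = human_response.get("feedback") or "Rejected without specific feedback"
    return {"approved": False, "critique": feedback}


print(review_to_update({"decision": "approve"}))
print(review_to_update({"decision": "reject", "feedback": "Cite sources"}))
print(review_to_update({}))
```

Inside `approval_node`, the value returned by `interrupt(...)` could be passed straight to such a helper, so the branching can be unit tested without running a graph.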
The Supervisor Pattern: Multi-Agent in LangGraph
The supervisor pattern implements multi-agent coordination in LangGraph. A supervisor agent routes tasks to specialist sub-agents, collects their outputs, and decides when the overall task is complete.
"""
Multi-agent research system using the LangGraph supervisor pattern.
Architecture:
supervisor → [web_researcher | data_analyst | report_writer]
↘ finalize (when supervisor decides task is complete)
Install: pip install langgraph langgraph-checkpoint-sqlite langchain-anthropic anthropic
"""
from __future__ import annotations
import json
import operator
from typing import TypedDict, Annotated, Sequence, Literal
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_anthropic import ChatAnthropic
from langgraph.graph import StateGraph, END
from langgraph.types import Command
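import sqlite3  # used by build_supervisor_graph below
import anthropic
from langgraph.checkpoint.sqlite import SqliteSaver  # used by build_supervisor_graph below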
# ── State ─────────────────────────────────────────────────────
class AgentState(TypedDict):
messages: Annotated[list[BaseMessage], operator.add]
next_agent: str # Which agent to call next
task_complete: bool
research_output: str
analysis_output: str
report_draft: str
final_report: str
iteration_count: int
# ── Specialist Nodes ───────────────────────────────────────────
def web_researcher_node(state: AgentState) -> dict:
"""Research specialist: gathers information."""
client = anthropic.Anthropic()
# Get the last human message as the research task
task = next(
(m.content for m in reversed(state["messages"]) if isinstance(m, HumanMessage)),
"Research the given topic"
)
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=800,
system=(
"You are a web research specialist. You have extensive knowledge of current events, "
"industry trends, and technical topics. Provide specific, data-backed research notes "
"with clear structure. Note what you are confident about vs uncertain about."
),
messages=[{
"role": "user",
"content": f"Research task: {task}\n\nProvide comprehensive research notes."
}],
)
research = response.content[0].text
return {
"research_output": research,
"messages": [AIMessage(content=f"[Web Researcher]: {research[:200]}...")],
"next_agent": "supervisor", # Return control to supervisor
}
def data_analyst_node(state: AgentState) -> dict:
"""Analysis specialist: interprets and analyzes research."""
client = anthropic.Anthropic()
research = state.get("research_output", "No research available")
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=800,
system=(
"You are a strategic data analyst. Analyze research findings to identify "
"key patterns, implications, and strategic insights. Structure analysis as: "
"1. Key findings (3-5 bullets), 2. Trends, 3. Strategic implications."
),
messages=[{
"role": "user",
"content": f"Analyze this research:\n\n{research}"
}],
)
analysis = response.content[0].text
return {
"analysis_output": analysis,
"messages": [AIMessage(content=f"[Data Analyst]: {analysis[:200]}...")],
"next_agent": "supervisor",
}
def report_writer_node(state: AgentState) -> dict:
"""Writing specialist: produces the final report."""
client = anthropic.Anthropic()
research = state.get("research_output", "")
analysis = state.get("analysis_output", "")
original_task = next(
(m.content for m in state["messages"] if isinstance(m, HumanMessage)),
""
)
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1200,
system=(
"You are an expert report writer. Create polished, professional reports "
"that synthesize research and analysis into actionable intelligence. "
"Write for a senior executive audience: concise, specific, actionable."
),
messages=[{
"role": "user",
"content": (
f"Original task: {original_task}\n\n"
f"Research:\n{research}\n\n"
f"Analysis:\n{analysis}\n\n"
f"Write a comprehensive report (400-600 words)."
)
}],
)
report = response.content[0].text
return {
"report_draft": report,
"messages": [AIMessage(content=f"[Report Writer]: Report drafted ({len(report)} chars)")],
"next_agent": "supervisor",
}
# ── Supervisor Node ────────────────────────────────────────────
SUPERVISOR_SYSTEM = """You are an orchestrator managing a team of specialists:
- web_researcher: gathers information and research
- data_analyst: analyzes research and identifies patterns
- report_writer: produces polished final reports
- FINISH: use this when the report is complete and ready
Given the current state, decide which specialist should act next.
Consider:
1. Has research been gathered? If not → web_researcher
2. Has analysis been done? If not → data_analyst
3. Has a report been written? If not → report_writer
4. Is the report complete and high quality? → FINISH
Respond with ONLY the next action as JSON: {"next": "web_researcher"|"data_analyst"|"report_writer"|"FINISH"}"""
def supervisor_node(state: AgentState) -> dict:
"""Supervisor: decides which specialist to call next."""
client = anthropic.Anthropic()
# Build context for supervisor
context_parts = [f"Iteration: {state.get('iteration_count', 0) + 1}"]
if state.get("research_output"):
context_parts.append(f"Research: Available ({len(state['research_output'])} chars)")
else:
context_parts.append("Research: Not yet gathered")
if state.get("analysis_output"):
context_parts.append(f"Analysis: Available ({len(state['analysis_output'])} chars)")
else:
context_parts.append("Analysis: Not yet done")
if state.get("report_draft"):
context_parts.append(f"Report: Drafted ({len(state['report_draft'])} chars)")
else:
context_parts.append("Report: Not yet written")
context = "\n".join(context_parts)
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=100,
system=SUPERVISOR_SYSTEM,
messages=[{
"role": "user",
"content": f"Current state:\n{context}\n\nWhat is the next step?"
}],
)
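    try:
        decision = json.loads(response.content[0].text)
        next_action = decision.get("next", "FINISH")
    except (json.JSONDecodeError, AttributeError):
        # Malformed or non-object JSON: stop rather than guess
        next_action = "FINISH"
    # An unknown agent name would have no matching edge, so treat it as FINISH
    if next_action not in {"web_researcher", "data_analyst", "report_writer", "FINISH"}:
        next_action = "FINISH"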
print(f"[Supervisor] Next action: {next_action}")
if next_action == "FINISH":
return {
"next_agent": "FINISH",
"task_complete": True,
"final_report": state.get("report_draft", "No report generated"),
}
else:
return {
"next_agent": next_action,
"iteration_count": state.get("iteration_count", 0) + 1,
}
def route_to_specialist(state: AgentState) -> str:
"""Router: determines which node to go to based on supervisor's decision."""
next_agent = state.get("next_agent", "FINISH")
if next_agent == "FINISH":
return END
return next_agent
# ── Build the Supervisor Graph ─────────────────────────────────
def build_supervisor_graph(
use_checkpointing: bool = True,
) -> tuple:
"""Build and compile the supervisor multi-agent graph."""
graph = StateGraph(AgentState)
# Add nodes
graph.add_node("supervisor", supervisor_node)
graph.add_node("web_researcher", web_researcher_node)
graph.add_node("data_analyst", data_analyst_node)
graph.add_node("report_writer", report_writer_node)
# Entry point
graph.set_entry_point("supervisor")
# Supervisor routes to specialists or END
graph.add_conditional_edges(
"supervisor",
route_to_specialist,
{
"web_researcher": "web_researcher",
"data_analyst": "data_analyst",
"report_writer": "report_writer",
END: END,
}
)
# All specialists return to supervisor
graph.add_edge("web_researcher", "supervisor")
graph.add_edge("data_analyst", "supervisor")
graph.add_edge("report_writer", "supervisor")
# Compile with optional checkpointing
if use_checkpointing:
conn = sqlite3.connect(":memory:", check_same_thread=False)
checkpointer = SqliteSaver(conn)
compiled = graph.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "supervisor-001"}}
else:
compiled = graph.compile()
config = {}
return compiled, config
# ── Run the System ─────────────────────────────────────────────
def run_research_system(topic: str) -> str:
"""Run the complete multi-agent research system."""
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver
compiled, config = build_supervisor_graph(use_checkpointing=True)
initial_state: AgentState = {
"messages": [HumanMessage(content=f"Research and analyze: {topic}")],
"next_agent": "supervisor",
"task_complete": False,
"research_output": "",
"analysis_output": "",
"report_draft": "",
"final_report": "",
"iteration_count": 0,
}
print(f"Starting multi-agent research system for: {topic}\n")
# Stream execution with progress updates
for event in compiled.stream(initial_state, config=config):
for node_name, node_output in event.items():
if node_name != "__end__":
print(f"[{node_name}] completed → next: {node_output.get('next_agent', '?')}")
# Get final state
final_state = compiled.get_state(config)
return final_state.values.get("final_report", "No report generated")
def demo():
report = run_research_system(
"The current state of open-source LLM development and its impact on enterprise AI adoption"
)
print("\n" + "=" * 60)
print("FINAL RESEARCH REPORT")
print("=" * 60)
print(report)
if __name__ == "__main__":
demo()
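The supervisor's routing depends on an LLM returning clean JSON, and `iteration_count` is tracked above but never used to stop the loop. A defensive parser could cover both gaps. This is a sketch under assumptions: `parse_decision` and `max_iterations` are hypothetical names, and the cap of 8 is arbitrary.

```python
import json
import re

ALLOWED = {"web_researcher", "data_analyst", "report_writer", "FINISH"}


def parse_decision(raw_text: str, iteration_count: int, max_iterations: int = 8) -> str:
    """Extract a valid next action from model output, defaulting to FINISH."""
    if iteration_count >= max_iterations:
        return "FINISH"  # hard stop so the hub-and-spoke loop cannot run forever
    # Tolerate code fences or prose around the JSON object
    match = re.search(r"\{.*\}", raw_text, re.DOTALL)
    if not match:
        return "FINISH"
    try:
        decision = json.loads(match.group(0))
    except json.JSONDecodeError:
        return "FINISH"
    next_action = decision.get("next") if isinstance(decision, dict) else None
    if isinstance(next_action, str) and next_action in ALLOWED:
        return next_action
    return "FINISH"


print(parse_decision('{"next": "data_analyst"}', 0))
print(parse_decision('```json\n{"next": "report_writer"}\n```', 1))
print(parse_decision('{"next": "unknown_agent"}', 2))
```

Falling back to FINISH is a conservative choice; a production system might instead retry the supervisor call or route to an error-handling node.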
Streaming in LangGraph
LangGraph supports several streaming modes. The three patterns below cover most production user-experience needs:
# Mode 1: Stream node outputs (know when each node completes)
for event in compiled.stream(initial_state, config=config):
for node_name, output in event.items():
print(f"Node '{node_name}' produced: {list(output.keys())}")
# Mode 2: Stream state updates (see how state changes)
for state in compiled.stream(
initial_state,
config=config,
stream_mode="values", # Stream full state after each node
):
print(f"Current state keys: {list(state.keys())}")
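# Mode 3: Stream LLM tokens (real-time output, requires astream_events)
# Token events come from LangChain chat models such as ChatAnthropic called inside a node;
# calls made with the raw anthropic client do not emit on_chat_model_stream events.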
async def stream_tokens():
async for event in compiled.astream_events(
initial_state,
config=config,
version="v2",
):
if event["event"] == "on_chat_model_stream":
chunk = event["data"]["chunk"]
if hasattr(chunk, "content") and chunk.content:
print(chunk.content, end="", flush=True)
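The difference between the first two modes is the shape of each streamed item. The toy generator below imitates that difference with plain Python. `toy_stream` is a hypothetical stand-in, not LangGraph's streaming implementation, and it ignores reducers for brevity.

```python
def toy_stream(nodes, state, stream_mode="updates"):
    """Yield per-node updates or full state snapshots (illustration only)."""
    state = dict(state)
    for name, fn in nodes:
        update = fn(state)
        state = {**state, **update}
        if stream_mode == "updates":
            yield {name: update}  # keyed by node name, only changed fields
        else:
            yield dict(state)     # full state after the node ran


nodes = [
    ("research", lambda s: {"research_notes": "notes"}),
    ("write", lambda s: {"draft": "draft text"}),
]

updates = list(toy_stream(nodes, {"topic": "LLMs"}, "updates"))
values = list(toy_stream(nodes, {"topic": "LLMs"}, "values"))
print(updates)
print(values)
```

Per-node updates suit progress indicators, while full snapshots are convenient when the UI renders the whole state after every step.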
Subgraphs: Composing Complex Systems
Large LangGraph systems are built from composable subgraphs. Each subgraph is a complete graph that can be used as a node in a larger graph.
"""Compose a validation subgraph into the main research graph."""
from typing import TypedDict
from langgraph.graph import StateGraph, END
# Subgraph: validation pipeline
class ValidationState(TypedDict):
content: str
quality_score: int
issues: list[str]
passed: bool
def quality_scorer(state: ValidationState) -> dict:
"""Score content quality."""
word_count = len(state["content"].split())
score = min(100, word_count * 2)
return {"quality_score": score}
def issue_detector(state: ValidationState) -> dict:
"""Detect specific quality issues."""
issues = []
if len(state["content"]) < 200:
issues.append("Content too short")
if "TODO" in state["content"]:
issues.append("Contains unresolved TODOs")
return {"issues": issues}
def pass_fail_decision(state: ValidationState) -> dict:
    """Set `passed` from the score and detected issues.

    This must be a node: a conditional-edge function can only choose a route,
    so `passed` would otherwise stay at its initial value.
    """
    passed = state["quality_score"] >= 60 and not state["issues"]
    return {"passed": passed}

# Build validation subgraph
validation_builder = StateGraph(ValidationState)
validation_builder.add_node("score", quality_scorer)
validation_builder.add_node("detect", issue_detector)
validation_builder.add_node("decide", pass_fail_decision)
validation_builder.set_entry_point("score")
validation_builder.add_edge("score", "detect")
validation_builder.add_edge("detect", "decide")
validation_builder.add_edge("decide", END)
validation_subgraph = validation_builder.compile()
# Use validation subgraph as a node in the main graph
def run_validation(state: ResearchState) -> dict:
"""Node that delegates to validation subgraph."""
validation_result = validation_subgraph.invoke({
"content": state["draft"],
"quality_score": 0,
"issues": [],
"passed": False,
})
return {
"approved": validation_result["passed"],
"critique": "; ".join(validation_result["issues"]) if validation_result["issues"] else "Approved",
}
LangGraph vs CrewAI vs Swarm vs AutoGen
| Dimension | LangGraph | CrewAI | AutoGen | OpenAI Swarm |
|---|---|---|---|---|
| Control granularity | Maximum | Medium | Low | Medium |
| Setup complexity | High | Low | Medium | Low |
| State management | Typed, explicit | Implicit | Conversation history | Implicit |
| Checkpointing | Built-in | No | No | No |
| Human-in-the-loop | Native | Workaround | Workaround | No |
| Debugging | Excellent (graph viz) | Good | Moderate | Limited |
| Streaming | Token + node + state | Limited | Limited | Limited |
| Best for | Complex stateful workflows | Role-based pipelines | Conversational tasks | Simple routing |
| Production maturity | High | High | High | Low |
:::danger State Mutation Bugs
The most common LangGraph bug: mutating state directly inside a node instead of returning updates. If you do state["messages"].append(msg) and return state, you create undefined behavior - LangGraph cannot track what changed. Always return a dict containing only the keys you want to update, using the correct reducer semantics. For list fields annotated with operator.add, return the new items only (not the full list) - LangGraph will append them automatically.
:::
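The effect of returning the full list for a field reduced with `operator.add` can be shown with plain Python. This is a sketch of the reducer arithmetic only; `reducer_merge` is a hypothetical stand-in for what happens when an update is merged.

```python
import operator


def reducer_merge(existing: list, returned: list) -> list:
    """What an operator.add reducer does with a node's returned value."""
    return operator.add(existing, returned)


existing = ["[Researcher]: done"]

# Wrong: the node returns the whole list, so old items are appended again
wrong = reducer_merge(existing, existing + ["[Writer]: done"])

# Right: the node returns only the new items
right = reducer_merge(existing, ["[Writer]: done"])

print(wrong)  # ['[Researcher]: done', '[Researcher]: done', '[Writer]: done']
print(right)  # ['[Researcher]: done', '[Writer]: done']
```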
:::warning Checkpointing Serialization
All state values must be serializable by the checkpointer. LangGraph's default serializer handles primitives, lists, dicts, and many common types (for example LangChain messages, datetime objects, and Pydantic models), but values such as open connections, API clients, locks, or lambdas will cause checkpoint failures. Design your state schema with serialization in mind: prefer primitive types (str, int, float, bool, list, dict) and plain data models, and keep live resources out of state. Test checkpointing early in development - discovering serialization issues after building a complex graph is expensive to fix.
:::
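A quick pre-flight check can surface problem fields before a checkpointer does. The sketch below uses plain `json.dumps`, which is a deliberately conservative test and may flag values that a richer serializer would accept. `find_unserializable_keys` is a hypothetical helper.

```python
import json
from datetime import datetime


def find_unserializable_keys(state: dict) -> list:
    """Return the state keys whose values plain JSON cannot encode."""
    bad = []
    for key, value in state.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            bad.append(key)
    return bad


sample_state = {
    "topic": "LLMs",
    "revision_count": 0,
    "messages": ["started"],
    "created_at": datetime(2025, 1, 1),  # not plain-JSON encodable
    "callback": print,                   # live callables do not belong in state
}

print(find_unserializable_keys(sample_state))  # ['created_at', 'callback']
```

Running a check like this in a unit test against a representative state is cheaper than discovering the failure mid-workflow.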
Interview Questions and Answers
Q: What is the fundamental difference between LangGraph's graph-based approach and the conversational approach of AutoGen?
A: AutoGen uses conversations as the coordination mechanism - agents exchange messages and coordination emerges from the dialogue. The execution path is implicit and emerges from what agents say to each other. LangGraph uses explicit graph transitions - every state change and routing decision is declared in the graph structure. The execution path is explicit and determined by the combination of conditional edge functions and current state. This has concrete engineering consequences: with LangGraph you can look at the graph definition and know every possible path the workflow can take; with AutoGen the possible paths depend on what agents generate, which is not statically deterministic. LangGraph is better when you need deterministic, auditable workflows with explicit control flow. AutoGen is better when you want natural coordination to emerge from agent dialogue without pre-specifying the conversation structure.
Q: How does LangGraph checkpointing work and what problems does it solve?
A: After each node execution, LangGraph serializes the full graph state (a TypedDict) and saves it to a configured persistence backend (SQLite for development, PostgreSQL for production, Redis for high-throughput). Each checkpoint is indexed by thread_id (which workflow instance) and checkpoint_id (which step). This solves three problems: (1) Resume after failure - if a node crashes or the process restarts, you can resume from the last successful checkpoint rather than restarting from scratch. This is critical for long-running workflows where restarting is expensive. (2) Human-in-the-loop - when you need a human to review and approve before continuing, you interrupt execution (the state is checkpointed at the interrupt), the human reviews asynchronously, and then the workflow resumes from the checkpoint with the human's input. Without checkpointing, the entire conversation state would need to be reconstructed. (3) Debugging and replay - you can inspect the state at any checkpoint and replay from any point, making debugging complex multi-step workflows much easier.
Q: How would you implement the supervisor pattern in LangGraph and when would you use it over a simple sequential graph?
A: The supervisor pattern has a central supervisor node that decides which specialist agent to route to at each step. Implementation: the supervisor node calls an LLM with a decision prompt and the current state, and returns the name of the next agent (or END). A conditional edge reads this field and routes accordingly. All specialist nodes, after completing their work, route back to the supervisor. This creates a hub-and-spoke graph where the supervisor is the hub. Use the supervisor pattern when: the optimal ordering of specialist tasks depends on intermediate results and cannot be determined upfront; different specializations are needed for different aspects of a complex task; or you want a "manager" with global visibility to coordinate specialists without pre-specifying the coordination sequence. A simple sequential graph is better when the task workflow is predictable and always follows the same path - sequential is faster (no routing overhead), cheaper (no LLM call for routing), and more debuggable.
Q: How do you handle state that grows unboundedly in a LangGraph workflow, like a messages list?
A: Several approaches. (1) Message reducers with size limits: instead of Annotated[list[BaseMessage], operator.add] (which appends indefinitely), write a custom reducer that keeps only the last N messages. Example: Annotated[list, lambda existing, new: (existing + new)[-50:]] keeps at most 50 messages. (2) Summarization node: insert a "compress_history" node that runs periodically (every N iterations) and replaces old messages with a summary. The node calls an LLM to summarize the existing messages and replaces them with the summary. (3) Separate working memory from persistent facts: use short-lived fields for conversation history (discarded or summarized regularly) and separate long-lived fields for important findings (never truncated). (4) PostgreSQL checkpointing with cleanup: if using PostgreSQL for checkpoints, run a periodic cleanup job to delete old checkpoint versions for completed threads. Checkpoints themselves can accumulate significant storage over time for long-running workflows.
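The size-limited reducer from approach (1) can be written as a named function, which is easier to test than a lambda. A minimal sketch, assuming a cap of 5 for readability (`MAX_MESSAGES`, `keep_last`, and `BoundedState` are illustrative names):

```python
from typing import Annotated, TypedDict

MAX_MESSAGES = 5  # illustrative cap; the answer above uses 50


def keep_last(existing: list, new: list) -> list:
    """Reducer: append new items, then keep only the most recent ones."""
    return (existing + new)[-MAX_MESSAGES:]


class BoundedState(TypedDict):
    messages: Annotated[list[str], keep_last]


# Simulate eight successive node updates being merged
messages = []
for i in range(8):
    messages = keep_last(messages, [f"msg-{i}"])

print(messages)  # ['msg-3', 'msg-4', 'msg-5', 'msg-6', 'msg-7']
```

Truncation discards the oldest items silently, so anything that must survive (the original task, key findings) belongs in a separate field.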
Q: How would you implement parallel agent execution in LangGraph for tasks that can be done concurrently?
A: LangGraph supports parallel execution via fan-out/fan-in patterns. Fan-out: add edges from one node to several nodes; LangGraph schedules those nodes in the same step and runs them concurrently. Fan-in: add edges from every parallel node to a single join node, which runs only after all branches have finished. Implementation: (1) Define nodes for each parallel task. (2) Add edges from a "dispatch" node to all parallel task nodes. (3) Add a "join" node that all parallel tasks route to. Give each branch its own state key, or have branches write to a shared key declared with a reducer (for example Annotated[list, operator.add]); two branches writing the same plain key in one step raises an error. (4) For I/O-bound work, define the nodes as async functions and run with compiled.astream(...) or compiled.ainvoke(...) so the branches overlap on the event loop. For example, a research workflow that needs to gather data from three different sources simultaneously: dispatch → [search_academic, search_news, search_social] → aggregate. Each search runs concurrently; aggregate waits for all three to complete before synthesizing. This can reduce total latency from the sum of all search times to the maximum search time - a significant improvement for I/O-bound operations.
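The latency claim (maximum rather than sum) can be checked with a standard-library sketch of the same fan-out/fan-in shape. It uses `asyncio.gather` directly rather than LangGraph, and the `search` coroutine with its fixed delays is a hypothetical stand-in for real I/O.

```python
import asyncio
import time


async def search(source: str, delay: float) -> str:
    """Stand-in for an I/O-bound search call."""
    await asyncio.sleep(delay)
    return f"{source}: results"


async def dispatch_and_aggregate() -> list:
    # Fan-out: start all three searches; fan-in: wait for every result
    return await asyncio.gather(
        search("academic", 0.2),
        search("news", 0.2),
        search("social", 0.2),
    )


start = time.perf_counter()
results = asyncio.run(dispatch_and_aggregate())
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f}s (sequential would take about 0.6s)")
```

`asyncio.gather` returns results in the order the coroutines were passed, regardless of which finishes first.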
