
LangGraph for Stateful Agents

The Agent That Could Not Remember Where It Was

The incident lands in the on-call queue at 11:47 PM. Your company's data analysis agent - three months in production, processing hundreds of research requests per day - has stalled on a complex multi-step analysis. The task required fetching ten financial reports, extracting key metrics, running comparative analysis, drafting a summary, and routing to a human reviewer if confidence was below threshold.

Forty-three minutes into the task, the container running the agent hit an out-of-memory error and crashed. The task was half-complete: six reports fetched, four metrics extracted, zero analysis done. The user received nothing. When the container restarted and the retry logic fired, the agent started from the beginning - refetching the same six reports, burning tokens, burning time, eventually completing 80 minutes later than necessary. Two users with similar tasks in flight at the same moment lost their work entirely.

Your team spends the next two days debating solutions. One engineer suggests writing state to Redis at each step. Another suggests a task queue with idempotent tasks. A third pulls up the LangGraph documentation and points out that LangGraph's built-in checkpointing saves the agent state after every step of the graph and can persist it to PostgreSQL. A crash mid-task means the agent resumes from the last successful checkpoint - not from the beginning. The Redis idea and the task queue idea are both implementations of the same pattern, and LangGraph has already built it, tested it, and documented it.

The migration takes two days. The agent's logic does not change - just the scaffolding around it. After deployment, crashes during long-running tasks are a minor operational inconvenience rather than a support escalation. Token costs drop by 30% as redundant work disappears. The on-call queue goes quiet on this class of incident.

LangGraph solves a specific problem: stateful, complex agent workflows where the implicit message-list approach of a basic agentic loop is insufficient. It makes the problem structure explicit - typed state, directed graph of nodes, explicit routing - in a way that enables checkpointing, testing, and debugging that are hard to achieve with implicit state. This lesson explains how it works and how to use it.



Why This Exists

The Problem with Linear Agents

The basic agentic loop - while tools: call model, execute tool, append result - is fundamentally linear. There is one thread of execution, one growing message list, one model decision at a time. For simple agents this is fine. For complex agents it breaks down in three ways.

First: parallelism. A research agent that needs to fetch ten papers sequentially waits through ten API calls when it could make all ten simultaneously. A linear loop cannot parallelize. You can add async code, but then you are managing concurrent futures manually, outside the loop, in code that is hard to reason about.

Second: complex routing. A code review agent needs to route to a security reviewer when vulnerabilities are found, to a performance reviewer when profiling shows regressions, and to approval when all checks pass. Implementing this in a linear loop means deeply nested if-statements in your routing code, mixed with the model's reasoning, making both harder to read and test.

Third: resumption. When a long-running agent crashes, a linear loop has no notion of "where it was." You restart from the beginning. The loop variable was in memory; the completed steps are gone.

LangGraph's graph model accommodates all three naturally. Parallelism is multiple nodes with shared fan-in. Routing is conditional edges with explicit routing functions. Resumption is checkpointing - after every node, the state is saved.

The Problem with Implicit State

The original LangChain AgentExecutor kept all state in the message history: a growing list of human messages, assistant messages, and tool results. This worked for simple agents. But state is almost always richer than message history.

You want to track: which steps have completed, what quality scores have been assigned, whether a human has approved, how many retries have occurred, what external IDs correspond to what records. None of these belong in the message history. Stuffing them there as JSON in assistant messages is a hack that makes messages hard to read and state hard to access.

LangGraph's state is an explicit schema, most commonly a TypedDict. You define every field your agent needs. Nodes read from it and return updates to it. The framework merges those updates. The state is the first-class citizen, not a byproduct.


Historical Context

LangGraph was announced by Harrison Chase and the LangChain team in January 2024. It was inspired by Pregel, the distributed graph processing framework published by Google in 2010. Pregel's core abstraction: computation as messages flowing through a graph of vertices (nodes), where each vertex processes its incoming messages and sends outputs to its neighbors. In LangGraph's adaptation, the "messages" are state updates, the "vertices" are processing functions, and the topology defines the agent's possible execution paths.

LangGraph v0.1 shipped in January 2024 with basic StateGraph and conditional edge support. Over the course of 2024, it added the interrupt() primitive for human-in-the-loop, Postgres and Redis checkpointers for production persistence, and the Command primitive for dynamic graph routing. LangGraph Cloud launched as a managed deployment platform. By the end of 2024, LangGraph had become the recommended approach for complex agent development in the LangChain ecosystem, with the team explicitly advising users to use LangGraph rather than AgentExecutor for new development.

The design philosophy: make control flow explicit. Where AgentExecutor hid the routing logic in framework code, LangGraph requires you to define it. This adds verbosity but removes magic. When something goes wrong, you can see exactly which node failed, what the state was before the failure, and which routing decision led there.


Core Architecture

State: The TypedDict

Every LangGraph application starts with a state definition:

from typing import TypedDict, Annotated
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
import operator

class ResearchState(TypedDict):
    # add_messages reducer: new messages are appended, not replaced
    messages: Annotated[list[BaseMessage], add_messages]

    # Plain fields: last write wins (each update overwrites)
    topic: str
    status: str
    draft: str
    quality_score: float
    subtopics: list[str]  # written by planner_node
    retry_count: int      # read by quality_router

    # operator.add reducer: lists from parallel nodes are concatenated
    research_results: Annotated[list[str], operator.add]

    # Custom reducer: sum the error counts reported by parallel nodes
    error_count: Annotated[int, lambda a, b: a + b]

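The reducer is critical. It tells LangGraph how to merge a node's update into the value already in the state. With operator.add, results from parallel nodes accumulate. With add_messages, message history grows correctly. Without any annotation, each write replaces the previous value, and in current LangGraph releases two nodes that write the same unannotated field in the same step raise an InvalidUpdateError instead of being merged.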
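The merge rule can be illustrated without the framework. The following is a minimal, framework-free sketch, not LangGraph's implementation: `REDUCERS` and `apply_update` are hypothetical names, and the updates are applied one after another to show how a reducer field accumulates while a plain field is overwritten.

```python
import operator
from typing import Any, Callable

# Hypothetical reducer table: field name -> function(old, new) -> merged value
REDUCERS: dict[str, Callable[[Any, Any], Any]] = {
    "research_results": operator.add,   # concatenate lists
    "error_count": lambda a, b: a + b,  # sum integers
}

def apply_update(state: dict, update: dict) -> dict:
    """Return a new state with one node's partial update merged in."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)
        else:
            merged[key] = value  # no reducer: the new value replaces the old one
    return merged

state = {"research_results": [], "error_count": 0, "status": "new"}
for update in (
    {"research_results": ["paper A"], "status": "fetched A"},
    {"research_results": ["paper B"], "error_count": 1, "status": "fetched B"},
):
    state = apply_update(state, update)

print(state)
```

The list field ends up holding both entries, while `status` holds only the value from the last update.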

Nodes

A node is any Python callable that takes the current state and returns a dict of state updates:

from langchain_anthropic import ChatAnthropic
from langchain_core.messages import SystemMessage, HumanMessage

model = ChatAnthropic(model="claude-opus-4-6", temperature=0)

def planner_node(state: ResearchState) -> dict:
    """Decompose the research topic into subtopics."""
    response = model.invoke([
        SystemMessage(content="""You are a research planner.
Decompose the topic into 3 focused subtopics. Return as a numbered list."""),
        HumanMessage(content=f"Topic: {state['topic']}")
    ])

    # Parse the numbered list
    subtopics = []
    for line in response.content.split('\n'):
        line = line.strip()
        if line and line[0].isdigit():
            subtopics.append(line.split('.', 1)[-1].strip())

    return {
        "messages": [response],      # add_messages appends this
        "subtopics": subtopics[:3],  # Overwrite subtopics
        "status": "planned"
    }

The return dict only needs to include the fields being updated. Fields not in the return dict are left unchanged in the state.
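Because a node is a plain function, the parsing step inside it can be pulled out and checked without calling a model. Below is a small sketch of that idea; `parse_numbered_list` is a hypothetical helper that mirrors the loop in planner_node, and the sample text is made up for illustration.

```python
def parse_numbered_list(text: str, limit: int = 3) -> list[str]:
    """Extract items from lines that start with a digit, e.g. '1. Foo'."""
    items = []
    for line in text.split("\n"):
        line = line.strip()
        if line and line[0].isdigit():
            # Drop everything up to the first period and keep the rest
            items.append(line.split(".", 1)[-1].strip())
    return items[:limit]

sample = (
    "Here are the subtopics:\n"
    "1. Model architectures\n"
    "2. Training data\n"
    "3. Evaluation\n"
    "4. Extra item"
)
subtopics = parse_numbered_list(sample)
print(subtopics)
```

One limitation worth knowing: a line such as "1) Foo" contains no period, so this parser keeps the whole line, numbering included.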

Edges

from langgraph.graph import StateGraph, END, START

builder = StateGraph(ResearchState)

# Add nodes
builder.add_node("planner", planner_node)
builder.add_node("researcher", researcher_node)
builder.add_node("critic", critic_node)
builder.add_node("writer", writer_node)

# Simple edges (always flow this direction)
builder.add_edge(START, "planner")
builder.add_edge("planner", "researcher")

Conditional Edges

Conditional edges route to different nodes based on state:

def quality_router(state: ResearchState) -> str:
    """Route based on research quality score."""
    score = state.get("quality_score", 0.0)
    retries = state.get("retry_count", 0)

    if score >= 0.75:
        return "write"  # Quality is good enough, proceed to writing
    elif retries < 2:
        return "retry"  # Try researching again
    else:
        return "write"  # Give up retrying, write with what we have

builder.add_conditional_edges(
    "critic",        # From this node
    quality_router,  # Using this routing function
    {
        "write": "writer",      # If "write", go to writer node
        "retry": "researcher",  # If "retry", go back to researcher
    }
)

builder.add_edge("writer", END)
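A routing function is pure: it only reads the state and returns a label, so it can be exercised with plain dictionaries before any graph is built. The sketch below repeats quality_router with a plain `dict` type so that it runs on its own; the test cases are illustrative values, not taken from a real run.

```python
def quality_router(state: dict) -> str:
    """Same logic as the router above, typed against a plain dict."""
    score = state.get("quality_score", 0.0)
    retries = state.get("retry_count", 0)
    if score >= 0.75:
        return "write"
    elif retries < 2:
        return "retry"
    return "write"

# (state, expected label) pairs covering each branch
cases = [
    ({"quality_score": 0.9, "retry_count": 0}, "write"),  # good enough
    ({"quality_score": 0.4, "retry_count": 0}, "retry"),  # low score, retries left
    ({"quality_score": 0.4, "retry_count": 2}, "write"),  # retries exhausted
    ({}, "retry"),                                        # defaults: score 0.0, no retries
]
for state, expected in cases:
    assert quality_router(state) == expected, (state, expected)
print("all routing cases pass")
```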

Full Working Example: Research Agent with Human Approval

import operator
from typing import TypedDict, Annotated
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langchain_anthropic import ChatAnthropic
from langgraph.graph import StateGraph, END, START
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt

model = ChatAnthropic(model="claude-opus-4-6", temperature=0)

# ─── State ────────────────────────────────────────────────────────────────────

class ResearchState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    topic: str
    research_results: Annotated[list[str], operator.add]
    draft: str
    quality_score: float
    human_feedback: str
    approved: bool
    iteration: int

# ─── Nodes ────────────────────────────────────────────────────────────────────

def researcher(state: ResearchState) -> dict:
    """Conduct research on the topic."""
    iteration = state.get("iteration", 0)
    prior_feedback = state.get("human_feedback", "")

    prompt = f"Research this topic thoroughly: {state['topic']}"
    if prior_feedback:
        prompt += f"\n\nPrevious feedback to address: {prior_feedback}"

    response = model.invoke([
        SystemMessage(content="""You are a research analyst. Provide detailed,
structured research with specific data points, dates, and examples.
Cite your reasoning as you go."""),
        HumanMessage(content=prompt)
    ])

    return {
        "messages": [response],
        "research_results": [f"[Iteration {iteration}]\n{response.content}"],
        "iteration": iteration + 1
    }

def critic(state: ResearchState) -> dict:
    """Evaluate the quality of research gathered."""
    import json, re

    combined = "\n\n".join(state.get("research_results", []))
    response = model.invoke([
        SystemMessage(content="""Evaluate research quality. Return JSON only:
{"score": 0.0-1.0, "strengths": ["..."], "gaps": ["..."]}
Score 0.8+ means ready to write. Below 0.6 means needs more research."""),
        HumanMessage(content=f"Evaluate research on '{state['topic']}':\n\n{combined}")
    ])

    # Fall back to a neutral score if the model did not return valid JSON
    try:
        match = re.search(r'\{.*\}', response.content, re.DOTALL)
        data = json.loads(match.group()) if match else {"score": 0.7}
    except Exception:
        data = {"score": 0.7}

    return {
        "messages": [response],
        "quality_score": float(data.get("score", 0.7))
    }

def writer(state: ResearchState) -> dict:
    """Write the research report."""
    combined = "\n\n".join(state.get("research_results", []))
    feedback = state.get("human_feedback", "")

    context = combined
    if feedback:
        context += f"\n\n--- Human Reviewer Feedback ---\n{feedback}"

    response = model.invoke([
        SystemMessage(content="""Write a comprehensive research report with:
1. Executive Summary (3-4 sentences)
2. Key Findings (bulleted)
3. Detailed Analysis
4. Conclusions and Implications
Be specific. Include data points from the research."""),
        HumanMessage(content=f"Write a report on: {state['topic']}\n\nResearch:\n{context}")
    ])

    return {
        "messages": [response],
        "draft": response.content
    }

def human_review_gate(state: ResearchState) -> dict:
    """Pause for human review using interrupt()."""
    # interrupt() suspends execution here and returns control to the caller.
    # The value passed to interrupt() is shown to the human reviewer.
    feedback = interrupt({
        "action": "review_required",
        "topic": state["topic"],
        "draft_preview": state["draft"][:500] + "...",
        "instruction": "Reply 'approve' to publish, or provide specific feedback for revision."
    })

    if str(feedback).lower().strip() == "approve":
        return {"approved": True, "human_feedback": ""}
    return {"approved": False, "human_feedback": str(feedback)}

# ─── Routing functions ─────────────────────────────────────────────────────────

def should_continue_research(state: ResearchState) -> str:
    score = state.get("quality_score", 0.0)
    iteration = state.get("iteration", 0)
    if score >= 0.75 or iteration >= 3:
        return "write"
    return "research_more"

def after_human_review(state: ResearchState) -> str:
    return "end" if state.get("approved") else "revise"

# ─── Build the graph ───────────────────────────────────────────────────────────

def build_research_graph():
    builder = StateGraph(ResearchState)

    builder.add_node("researcher", researcher)
    builder.add_node("critic", critic)
    builder.add_node("writer", writer)
    builder.add_node("human_review", human_review_gate)

    builder.add_edge(START, "researcher")
    builder.add_edge("researcher", "critic")

    builder.add_conditional_edges(
        "critic",
        should_continue_research,
        {"write": "writer", "research_more": "researcher"}
    )

    builder.add_edge("writer", "human_review")

    builder.add_conditional_edges(
        "human_review",
        after_human_review,
        {"end": END, "revise": "writer"}  # Revise goes back to writer with feedback
    )

    # Compile with in-memory checkpointing (use PostgresSaver in production)
    checkpointer = MemorySaver()
    return builder.compile(checkpointer=checkpointer)

# ─── Running the graph ─────────────────────────────────────────────────────────

def run_research(topic: str, thread_id: str):
    """Run the research agent, handling human-in-the-loop."""
    import uuid
    from langgraph.types import Command

    graph = build_research_graph()
    config = {"configurable": {"thread_id": thread_id or str(uuid.uuid4())}}

    print(f"Starting research on: {topic}")
    initial_state = {
        "topic": topic,
        "research_results": [],
        "iteration": 0,
        "approved": False
    }

    # Run until the first interrupt or completion
    for event in graph.stream(initial_state, config, stream_mode="values"):
        if event.get("draft"):
            print("\n--- DRAFT READY FOR REVIEW ---")
            print(event["draft"][:300] + "...")
            print("-------------------------------")

    # While the graph is paused at interrupt(), collect feedback and resume.
    # A revision request loops back through writer and pauses here again.
    while graph.get_state(config).next:
        print(f"\nAgent paused at: {graph.get_state(config).next}")
        feedback = input("Your review (or 'approve'): ").strip()

        # Command(resume=...) supplies the return value of interrupt()
        for event in graph.stream(Command(resume=feedback), config, stream_mode="values"):
            if event.get("approved"):
                print("\nResearch approved! Final report saved.")
                print(event.get("draft", "")[:200] + "...")

    return graph.get_state(config).values.get("draft", "")

if __name__ == "__main__":
    import uuid
    result = run_research("The state of open-source LLMs in 2025", str(uuid.uuid4()))

Parallel Execution (Fan-Out / Fan-In)

LangGraph supports parallel node execution when multiple edges leave the same node:

from typing import TypedDict, Annotated
import operator
from langgraph.graph import StateGraph, END, START

class ParallelState(TypedDict):
    query: str
    results: Annotated[list[dict], operator.add]  # MUST have operator.add
    final_answer: str

def make_api_fetcher(api_name: str, delay: float = 0):
    """Factory that creates a fetcher node for a specific API."""
    def fetcher(state: ParallelState) -> dict:
        import time
        time.sleep(delay)  # Simulate API latency
        return {
            "results": [{"api": api_name, "data": f"Result from {api_name} for '{state['query']}'"}]
        }
    return fetcher

def synthesizer(state: ParallelState) -> dict:
    """Synthesize results from all parallel fetchers."""
    all_results = state.get("results", [])
    summary = f"Synthesized {len(all_results)} API results:\n"
    for r in all_results:
        summary += f"  - {r['api']}: {r['data']}\n"
    return {"final_answer": summary}

builder = StateGraph(ParallelState)

# Add nodes
builder.add_node("fetch_github", make_api_fetcher("GitHub", delay=0.5))
builder.add_node("fetch_news", make_api_fetcher("NewsAPI", delay=0.8))
builder.add_node("fetch_arxiv", make_api_fetcher("arXiv", delay=1.2))
builder.add_node("synthesize", synthesizer)

# Fan-out: all three fetchers start simultaneously
builder.add_edge(START, "fetch_github")
builder.add_edge(START, "fetch_news")
builder.add_edge(START, "fetch_arxiv")

# Fan-in: synthesizer waits for all three
builder.add_edge("fetch_github", "synthesize")
builder.add_edge("fetch_news", "synthesize")
builder.add_edge("fetch_arxiv", "synthesize")

builder.add_edge("synthesize", END)
graph = builder.compile()

# The three fetchers run in parallel - total time ~1.2s, not 2.5s
result = graph.invoke({"query": "LLM agents 2025", "results": []})
print(result["final_answer"])
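For comparison, the same fan-out and fan-in shape can be written by hand with the standard library. This is only an analogy for the pattern, not how LangGraph schedules nodes; `fetch`, `SOURCES`, and `fan_out_fan_in` are hypothetical names and the delays are shortened so the sketch runs quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(api_name: str, delay: float, query: str) -> dict:
    """Stand-in for an API call; sleeps to simulate latency."""
    time.sleep(delay)
    return {"api": api_name, "data": f"Result from {api_name} for '{query}'"}

# (name, simulated latency in seconds)
SOURCES = [("GitHub", 0.05), ("NewsAPI", 0.08), ("arXiv", 0.12)]

def fan_out_fan_in(query: str) -> list[dict]:
    with ThreadPoolExecutor(max_workers=len(SOURCES)) as pool:
        # Fan-out: submit every fetch before waiting on any of them
        futures = [pool.submit(fetch, name, delay, query) for name, delay in SOURCES]
        # Fan-in: block until every future has a result, in submission order
        return [future.result() for future in futures]

start = time.perf_counter()
results = fan_out_fan_in("LLM agents 2025")
elapsed = time.perf_counter() - start

print(f"{len(results)} results in {elapsed:.2f}s")
```

The elapsed time should track the slowest fetch rather than the sum of all three, which is the same effect the graph version gets from its parallel edges. What the hand-written version lacks is the state merging and checkpointing that the graph provides around it.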

Checkpointing for Production

from langgraph.checkpoint.postgres import PostgresSaver

# Production setup: PostgreSQL checkpointing
DATABASE_URL = "postgresql://user:password@localhost:5432/agent_db"

with PostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
    # Create the checkpoint tables (one-time setup)
    checkpointer.setup()

    # Compile graph with persistent checkpointing
    graph = builder.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "research-task-42"}}

    # If the process crashes, resume from the last checkpoint:
    # state = graph.get_state(config)
    # print(f"Resuming from: {state.next}")
    # for event in graph.stream(None, config):
    #     ...  # Continues from last successful checkpoint

    # View the full checkpoint history for a thread
    for checkpoint_tuple in graph.get_state_history(config):
        step = checkpoint_tuple.metadata.get("step", "?")
        next_nodes = checkpoint_tuple.next
        print(f"Step {step} → next: {next_nodes}")

Thread IDs and Isolation

Every agent run must have a unique thread ID:

import uuid
from langgraph.types import Command

def create_agent_run(topic: str) -> str:
    """Create a new agent run with an isolated thread."""
    thread_id = str(uuid.uuid4())

    # Store thread_id in your database for later retrieval
    # (db is a placeholder for your own persistence layer)
    db.save_agent_run(thread_id=thread_id, topic=topic, status="running")

    return thread_id

def resume_agent_run(thread_id: str, human_input: str):
    """Resume a paused agent run with human input."""
    config = {"configurable": {"thread_id": thread_id}}

    # Command(resume=...) delivers human_input as the return value of interrupt()
    for event in graph.stream(Command(resume=human_input), config):
        pass  # Continue until next interrupt or completion
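One way to keep threads isolated is to record who owns each thread ID and check that before building a config. The sketch below assumes an in-memory dictionary in place of a real database; `ThreadRegistry` and its methods are hypothetical names introduced for illustration.

```python
import uuid

class ThreadRegistry:
    """Maps each thread ID to the user who created it (in-memory stand-in for a DB)."""

    def __init__(self) -> None:
        self._owners: dict[str, str] = {}

    def create(self, user_id: str) -> str:
        thread_id = str(uuid.uuid4())  # fresh random ID, not derived from user data
        self._owners[thread_id] = user_id
        return thread_id

    def config_for(self, thread_id: str, user_id: str) -> dict:
        """Build a run config only if the thread belongs to this user."""
        if self._owners.get(thread_id) != user_id:
            raise PermissionError("thread does not belong to this user")
        return {"configurable": {"thread_id": thread_id}}

registry = ThreadRegistry()
alice_thread = registry.create("alice")
print(registry.config_for(alice_thread, "alice"))

try:
    registry.config_for(alice_thread, "bob")
except PermissionError as exc:
    print(f"rejected: {exc}")
```

The ownership check sits in front of every resume call, so a mistyped or leaked thread ID cannot expose another user's state.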

Streaming

import asyncio

async def stream_research(topic: str, thread_id: str):
    """Stream agent events as they occur."""
    graph = build_research_graph()
    config = {"configurable": {"thread_id": thread_id}}

    # Stream individual LLM tokens (requires astream_events)
    async for event in graph.astream_events(
        {"topic": topic, "research_results": [], "iteration": 0},
        config,
        version="v2"
    ):
        event_type = event["event"]

        if event_type == "on_chat_model_stream":
            # Stream LLM tokens in real time
            chunk = event["data"]["chunk"]
            if hasattr(chunk, "content") and chunk.content:
                print(chunk.content, end="", flush=True)

        elif event_type == "on_chain_start":
            node_name = event.get("name", "unknown")
            print(f"\n[Starting: {node_name}]")

        elif event_type == "on_chain_end":
            node_name = event.get("name", "unknown")
            print(f"\n[Completed: {node_name}]")

# Run async
asyncio.run(stream_research("AI safety research 2025", "thread-123"))

Subgraphs: Modular Agent Composition

Large agent systems can be decomposed into reusable subgraphs:

# Define a reusable research subgraph
research_builder = StateGraph(ResearchState)
research_builder.add_node("fetch", researcher)
research_builder.add_node("evaluate", critic)
research_builder.add_edge(START, "fetch")
research_builder.add_edge("fetch", "evaluate")
research_builder.add_edge("evaluate", END)
research_subgraph = research_builder.compile()

# Main orchestration graph uses the subgraph as a node
class OrchestratorState(TypedDict):
    tasks: list[str]
    results: Annotated[list[str], operator.add]
    final_report: str

def make_research_task(task_index: int):
    def run_task(state: OrchestratorState) -> dict:
        if task_index >= len(state["tasks"]):
            return {"results": []}
        topic = state["tasks"][task_index]
        result = research_subgraph.invoke({
            "topic": topic,
            "research_results": [],
            "iteration": 0
        })
        # The subgraph has no writer node, so return its research output
        # rather than the "draft" field, which it never fills in
        return {"results": ["\n\n".join(result.get("research_results", []))]}
    return run_task

# The orchestrator graph fans out to research subgraphs
orch_builder = StateGraph(OrchestratorState)
orch_builder.add_node("research_0", make_research_task(0))
orch_builder.add_node("research_1", make_research_task(1))
orch_builder.add_node("synthesize", lambda s: {"final_report": "\n\n".join(s["results"])})

orch_builder.add_edge(START, "research_0")
orch_builder.add_edge(START, "research_1")
orch_builder.add_edge("research_0", "synthesize")
orch_builder.add_edge("research_1", "synthesize")
orch_builder.add_edge("synthesize", END)

Production Engineering Notes

Always Define Reducers for Parallel Fields

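The single most common LangGraph bug in production: missing reducers on fields updated by parallel nodes. Without a reducer such as operator.add, LangGraph has no way to merge the updates. When the writes land in the same step, current releases reject them with an InvalidUpdateError. When they land in different steps, the later write silently replaces the earlier one and you just lose data.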

# WRONG - updates from parallel nodes cannot be merged
class BuggyState(TypedDict):
    results: list[str]  # Missing Annotated + operator.add

# CORRECT - parallel results accumulate
class CorrectState(TypedDict):
    results: Annotated[list[str], operator.add]
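A missing reducer can be caught before the graph ever runs by inspecting the state class's type hints. The check below is a rough sketch using only the standard library: `list_fields_without_reducer` is a hypothetical helper, and it assumes that any `Annotated` field carries a reducer, which is a simplification.

```python
import operator
from typing import Annotated, TypedDict, get_origin, get_type_hints

def list_fields_without_reducer(state_cls: type) -> list[str]:
    """Return names of list-typed fields that are not wrapped in Annotated."""
    missing = []
    hints = get_type_hints(state_cls, include_extras=True)  # keep Annotated metadata
    for name, hint in hints.items():
        if get_origin(hint) is Annotated:
            continue  # assumed to carry a reducer
        if hint is list or get_origin(hint) is list:
            missing.append(name)
    return missing

class BuggyState(TypedDict):
    query: str
    results: list[str]  # no reducer

class CorrectState(TypedDict):
    query: str
    results: Annotated[list[str], operator.add]

print(list_fields_without_reducer(BuggyState))
print(list_fields_without_reducer(CorrectState))
```

Not every list field needs a reducer, since a field written by a single node is fine without one, so a check like this is best treated as a prompt for review rather than a hard rule.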

Graceful Error Handling in Nodes

Nodes that raise unhandled exceptions crash the graph. Implement try-except in every node that touches external systems:

def safe_external_node(state: ResearchState) -> dict:
    # external_api stands in for your own client, and "error" must be
    # declared as a field in the state schema for these updates to apply
    try:
        data = external_api.call(state["topic"])
        return {"research_results": [data], "error": None}
    except TimeoutError:
        return {"research_results": ["[API timeout]"], "error": "timeout"}
    except Exception as e:
        return {"research_results": [f"[Error: {e}]"], "error": str(e)}

Then use conditional edges to handle errors:

def route_after_fetch(state: ResearchState) -> str:
    if state.get("error") == "timeout" and state.get("retry_count", 0) < 2:
        return "retry"
    return "continue"
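For the retry branch to terminate, something has to increment retry_count on each failed attempt. The sketch below shows one way to do that, driven by a plain loop instead of a graph; `fetch_node`, `always_times_out`, and the driver loop are hypothetical, and the state is merged with a simple `dict.update` rather than LangGraph's reducers.

```python
from typing import Callable

def fetch_node(state: dict, fetch: Callable[[str], str]) -> dict:
    """Return a state update; count the attempt when the fetch times out."""
    try:
        return {"research_results": [fetch(state["topic"])], "error": None}
    except TimeoutError:
        return {"error": "timeout", "retry_count": state.get("retry_count", 0) + 1}

def route_after_fetch(state: dict) -> str:
    if state.get("error") == "timeout" and state.get("retry_count", 0) < 2:
        return "retry"
    return "continue"

def always_times_out(topic: str) -> str:
    raise TimeoutError(f"no response for {topic}")

# Minimal driver loop standing in for the graph's retry edge
state = {"topic": "quantum computing"}
attempts = 0
while True:
    attempts += 1
    state.update(fetch_node(state, always_times_out))
    if route_after_fetch(state) == "continue":
        break

print(attempts, state["retry_count"])
```

With the threshold `retry_count < 2`, a fetch that always times out is attempted twice before the router moves on.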

LangGraph Cloud for Managed Deployment

For teams that do not want to manage their own PostgreSQL checkpointing:

import asyncio
from langgraph_sdk import get_client

async def main():
    client = get_client(url="https://your-deployment.us.langgraph.app")

    # Create and run an agent
    thread = await client.threads.create()
    run = await client.runs.create(
        thread_id=thread["thread_id"],
        assistant_id="research-agent",  # Your deployed graph name
        input={"topic": "Quantum computing 2025", "research_results": [], "iteration": 0}
    )

    # Stream events from the cloud run
    async for event in client.runs.join_stream(thread["thread_id"], run["run_id"]):
        print(event)

# await is only valid inside a coroutine, so run everything through asyncio
asyncio.run(main())

:::danger Reducer Mismatch: The Silent Data Loss Bug

If a field is updated by multiple nodes and does NOT have a reducer annotation, LangGraph cannot merge the updates. Writes that land in different steps overwrite each other: you will not get an error, and the graph will run successfully but with silently dropped data. Writes that land in the same step are rejected by current LangGraph releases with an InvalidUpdateError. Both outcomes are particularly easy to hit in fan-out patterns where each parallel node writes to the same list.

Rule: any field that can be updated by parallel nodes MUST have a reducer. Use Annotated[list[T], operator.add] for lists. For numeric aggregations, use a lambda like Annotated[int, lambda a, b: a + b]. For sets, use Annotated[set[str], lambda a, b: a | b].

:::

:::warning Thread ID Reuse Across Users

If you accidentally reuse a thread ID across different user requests, users will see each other's agent state. This is a data privacy violation.

Generate a fresh UUID4 for every new agent session. Store it in your database mapped to the user and request. Never derive thread IDs from user-identifiable information. Use import uuid; str(uuid.uuid4()) - the IDs are random, so a collision is vanishingly unlikely.

:::


Interview Questions and Answers

Q1: What is the fundamental difference between LangGraph and LangChain's AgentExecutor, and why was LangGraph created?

AgentExecutor runs an implicit loop: call agent, execute tools, repeat. The state is the growing message list. You cannot define additional state fields, you cannot checkpoint it, you cannot route based on non-message state, and you cannot parallelize.

LangGraph makes the loop explicit as a directed graph. State is a TypedDict you define. Nodes are Python functions that read state and return updates. Edges (including conditional edges) define what runs next. The graph is the agent's control flow made visible.

LangGraph was created because complex agents outgrew AgentExecutor's model. Real workflows need cycles, conditional branches, parallel execution, and persistence across restarts. None of these are natural in AgentExecutor. LangGraph provides them all through the graph model.

Q2: How does LangGraph's checkpointing work, and what production problems does it solve?

After each step of the graph completes successfully (a step may contain several parallel nodes), LangGraph serializes the complete state and saves it to the configured checkpointer (MemorySaver for development, PostgresSaver or RedisSaver for production). Each checkpoint is stored under the run's thread_id together with its own checkpoint ID.

When a process crashes mid-run, you can resume by calling graph.stream(None, config) with the same thread_id. LangGraph loads the last checkpoint, determines which nodes have not yet run, and continues from that point - not from the beginning.

This solves two production problems. First, it eliminates wasted work: a 50-step agent that crashes at step 40 resumes at step 40, not step 0. Second, it enables human-in-the-loop workflows that span hours or days: the agent can interrupt, save state, wait for a human reviewer, and resume after the review - even if the process has restarted multiple times in between.

Q3: Explain LangGraph's fan-out and fan-in patterns and the critical implementation detail you must get right.

Fan-out: add edges from one node to multiple downstream nodes. All of them will execute simultaneously. Fan-in: add edges from multiple nodes to one downstream node. LangGraph waits until all upstream nodes have completed before executing the downstream node.

The critical detail: reducer annotations on shared state fields. When three parallel nodes all add to the same list field, LangGraph needs to merge their updates. With Annotated[list[str], operator.add], all three lists are concatenated. Without it there is nothing to merge with: current LangGraph releases raise an InvalidUpdateError when several nodes write the same unannotated field in one step.

The silent version of the bug appears when the writes land in different steps. The graph runs without errors and the later write simply replaces the earlier one, so your state has fewer results than you expected. Always annotate list fields with operator.add when using fan-out patterns.

Q4: How does human-in-the-loop work in LangGraph? What are the two mechanisms available?

Two mechanisms: interrupt_before and interrupt().

interrupt_before=["node_name"] is specified at compile time. When the graph reaches that node, it saves state and pauses before executing the node. The caller sees that state.next contains that node name. The caller gets human input, optionally updates the state with graph.update_state(), and calls graph.stream(None, config) to resume.

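interrupt() called inside a node body is more flexible. It can be called conditionally - only interrupt if the confidence score is below threshold, for example. It can pass a payload to the caller explaining what needs review. The caller resumes by streaming or invoking the graph with Command(resume=value), and that value becomes the return value of the interrupt() call. On resume the node runs again from its start, so any code placed before interrupt() should be safe to repeat.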

Both require a persistent checkpointer in production. MemorySaver works for development and processes that will not restart, but a human review that takes 4 hours requires PostgresSaver so the state survives the inevitable process restart.

Q5: You are building a LangGraph agent that scrapes five websites in parallel and synthesizes the results. Walk through the state design and what could go wrong.

State design:

class ScraperState(TypedDict):
    urls: list[str]
    # MUST use operator.add - five parallel scrapers all write here
    scraped_content: Annotated[list[dict], operator.add]
    synthesis: str
    failed_urls: Annotated[list[str], operator.add]  # Track failures separately

Graph: five scraper nodes all receive edges from START (fan-out). All five send edges to a synthesizer node (fan-in). The synthesizer runs once all five have completed.

What can go wrong: (1) A scraper raises an exception and crashes its node - the synthesis never runs. Solution: wrap every scraper in try-except, return failure info in failed_urls rather than raising. (2) A slow website holds up the synthesizer - LangGraph runs the scrapers in parallel, but the fan-in waits for the slowest one. Solution: set a timeout in each scraper and return a partial result if the timeout fires. (3) Missing operator.add on scraped_content - the five updates cannot be merged. Current LangGraph releases raise an InvalidUpdateError when several nodes write an unannotated field in the same step, and writes in different steps overwrite each other without any error. Solution: always verify reducer annotations before deployment by writing a test that runs two nodes in parallel and checks that both results are present in the final state.

© 2026 EngineersOfAI. All rights reserved.