Agent Communication Protocols
Reading time: ~35 minutes | Relevance: Critical for multi-agent reliability | Target roles: AI Engineer, Backend Engineer, Systems Architect
The Scenario
You have five agents: a planner, a researcher, a coder, a tester, and a deployer. They communicate through function calls. The coder sends its output to the tester as a plain string: "Here's the code." Six weeks into production, the tester occasionally receives output from the researcher by mistake. Sometimes the code is truncated. Sometimes the format changes and the tester can't parse it. Debugging takes hours because there are no message IDs, no timestamps, no sender information.
The problem wasn't the agents. It was the communication protocol - specifically, the lack of one.
Agents that can't communicate clearly are useless to each other. The format, schema, error handling, and metadata of every message determine whether a multi-agent system succeeds or silently fails.
:::tip 🎮 Interactive Playground
Visualize this concept: Try the Agent Communication Protocols demo on the EngineersOfAI Playground - no code required.
:::
Why This Exists
Early multi-agent systems (AutoGen 0.1 era, early CrewAI) treated agent communication as a string-passing problem: output of agent A is the input string to agent B. Simple, fast, and fragile.
As systems grew in complexity - more agents, longer pipelines, parallel execution, retry logic - string-passing broke down:
- No way to identify which agent sent a message
- No way to correlate a response with the request that generated it
- No error metadata - just a success string or an exception
- No audit trail - impossible to replay or debug a failed run
Production multi-agent systems need structured communication: typed messages, explicit routing, delivery guarantees, and tracing. The distributed systems community learned much the same lesson about microservice communication during the 2010s.
Communication Primitives
Four primitives underlie all agent communication:
1. Direct Function Calls
Agent A calls Agent B's function synchronously. The simplest primitive.
# Direct call - simple but tightly coupled
result = writer_agent.write(topic="RAG systems", research=research_output)
Pros: Simple, no infrastructure, easy debugging
Cons: Tight coupling, no async, no retry built in, no tracing
2. Message Queues
Agent A puts a message in a queue. Agent B reads from the queue when ready.
import queue
message_queue = queue.Queue()
# Agent A sends
message_queue.put(Message(to="writer", content="RAG research findings..."))
# Agent B receives
msg = message_queue.get(timeout=30)
Pros: Decoupled, supports retry, buffering
Cons: More infrastructure, async complexity, ordering guarantees vary
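The queue snippet above uses a `Message` type without defining it. A minimal runnable sketch follows, assuming a hypothetical `Message` dataclass and a worker thread standing in for Agent B; neither comes from a library, and a production system would more likely use a broker than an in-process queue.

```python
import queue
import threading
from dataclasses import dataclass


@dataclass
class Message:
    """Hypothetical envelope used only for this illustration."""
    to: str
    content: str


def writer_worker(inbox: queue.Queue, results: list) -> None:
    """Agent B: consume messages until a None sentinel arrives."""
    while True:
        msg = inbox.get()
        if msg is None:          # sentinel: sender is done
            inbox.task_done()
            break
        results.append(f"{msg.to} handled: {msg.content}")
        inbox.task_done()


inbox: queue.Queue = queue.Queue()
results: list = []
worker = threading.Thread(target=writer_worker, args=(inbox, results))
worker.start()

# Agent A sends and moves on; it never calls Agent B directly
inbox.put(Message(to="writer", content="RAG research findings"))
inbox.put(None)
worker.join(timeout=5)
```

The sender only knows about the queue, which is what makes buffering and retry possible without changing either agent.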
3. Shared Memory
All agents read/write a shared state object (in-process) or shared store (Redis, database).
shared_state = {}
# Research agent writes
shared_state["research"] = {"topic": "RAG", "findings": [...]}
# Writer reads
research = shared_state.get("research")
Pros: No serialization, agents can observe each other's state
Cons: Race conditions, hard to audit, coupling through state shape
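The race-condition risk applies to any read-modify-write on shared state. One way to contain it in-process is to route every update through a lock. The `SharedState` class below is a hypothetical sketch of that idea, not part of any framework:

```python
import threading


class SharedState:
    """Dictionary guarded by a lock so read-modify-write steps stay atomic."""

    def __init__(self):
        self._data: dict = {}
        self._lock = threading.Lock()

    def update(self, key, fn, default=None):
        # Read, transform, and write back while holding the lock
        with self._lock:
            self._data[key] = fn(self._data.get(key, default))
            return self._data[key]

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)


state = SharedState()


def add_findings() -> None:
    """Simulated agent that records 1000 findings, one at a time."""
    for _ in range(1000):
        state.update("findings_count", lambda n: n + 1, default=0)


threads = [threading.Thread(target=add_findings) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = state.get("findings_count")
```

With four writers and the lock in place, no increments are lost. A shared store such as Redis needs its own equivalent (transactions or atomic commands) because an in-process lock does not cross process boundaries.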
4. Event Streams
Agents emit events; other agents subscribe to relevant events.
from collections import defaultdict

class EventBus:
    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event_type: str, handler):
        self._handlers[event_type].append(handler)

    def emit(self, event_type: str, data: dict):
        for handler in self._handlers[event_type]:
            handler(data)

bus = EventBus()
bus.subscribe("research.completed", writer_agent.on_research_complete)
bus.emit("research.completed", {"topic": "RAG", "findings": [...]})
Pros: Fully decoupled, supports fan-out
Cons: Complex to reason about, hard to trace causality
Message Formats
Plain Text (Flexible, Fragile)
"Here are the research findings on RAG systems. RAG stands for..."
No schema, no metadata, no error information. Works for prototypes. Breaks in production.
Structured JSON (Strict, Reliable)
{
"message_id": "msg_abc123",
"timestamp": "2025-01-15T10:30:00Z",
"sender": "research_agent",
"recipient": "writer_agent",
"task_id": "task_789",
"message_type": "research_complete",
"content": {
"topic": "RAG systems",
"findings": ["RAG combines retrieval with generation...", "..."],
"confidence": 0.87,
"sources": ["arxiv:2312.10997"]
},
"metadata": {
"tokens_used": 1240,
"model": "claude-opus-4-5",
"duration_ms": 3200
},
"status": "success"
}
Every field serves a purpose. message_id enables deduplication. task_id enables tracing. status enables error handling. metadata enables cost tracking.
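As an illustration of the deduplication point, a receiver can remember which `message_id` values it has already handled and drop repeats. The `Deduplicator` class is a hypothetical helper written for this sketch:

```python
import json
from typing import Optional


class Deduplicator:
    """Drops messages whose message_id has already been accepted."""

    def __init__(self):
        self._seen: set = set()

    def accept(self, raw: str) -> Optional[dict]:
        message = json.loads(raw)
        message_id = message["message_id"]   # KeyError if the envelope is malformed
        if message_id in self._seen:
            return None                      # duplicate delivery, ignore it
        self._seen.add(message_id)
        return message


dedup = Deduplicator()
raw = json.dumps({"message_id": "msg_abc123", "status": "success"})
first = dedup.accept(raw)    # first delivery is processed
second = dedup.accept(raw)   # redelivery of the same message is dropped
```

The in-memory set grows without bound, so a long-running system would likely want expiry or a bounded store instead.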
Tool Call Format
When agents communicate through function calling, the format is determined by the tool calling schema:
# The orchestrator calls the writer agent as a tool
tools = [{
"name": "writer_agent",
"description": "Write content based on research findings",
"input_schema": {
"type": "object",
"properties": {
"topic": {"type": "string"},
"research_findings": {"type": "array", "items": {"type": "string"}},
"target_length": {"type": "integer"},
"format": {"type": "string", "enum": ["article", "summary", "report"]}
},
"required": ["topic", "research_findings"]
}
}]
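The tool-call format pushes schema compliance to the LLM level: the model is asked to produce JSON arguments that match the schema. Unless the provider offers a strict or structured-output mode, treat this as strong guidance rather than a guarantee, and validate the arguments on receipt before executing the call.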
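A receiving agent can still check tool arguments defensively before acting on them. The sketch below is a deliberately small, hand-rolled check (required fields, top-level types, enums) against the `input_schema` shown above. `validate_tool_input` is a hypothetical helper, not a full JSON Schema validator.

```python
TYPE_MAP = {"string": str, "integer": int, "array": list, "object": dict}

WRITER_SCHEMA = {
    "type": "object",
    "properties": {
        "topic": {"type": "string"},
        "research_findings": {"type": "array", "items": {"type": "string"}},
        "target_length": {"type": "integer"},
        "format": {"type": "string", "enum": ["article", "summary", "report"]},
    },
    "required": ["topic", "research_findings"],
}


def validate_tool_input(args: dict, schema: dict) -> list:
    """Return a list of problems; an empty list means the input passed."""
    problems = []
    properties = schema.get("properties", {})
    for name in schema.get("required", []):
        if name not in args:
            problems.append(f"missing required field: {name}")
    for name, value in args.items():
        spec = properties.get(name)
        if spec is None:
            problems.append(f"unexpected field: {name}")
            continue
        expected = TYPE_MAP.get(spec.get("type"))
        if expected is not None:
            # bool is a subclass of int in Python, so reject it explicitly
            bool_as_int = expected is int and isinstance(value, bool)
            if not isinstance(value, expected) or bool_as_int:
                problems.append(f"{name}: expected {spec['type']}")
        if "enum" in spec and value not in spec["enum"]:
            problems.append(f"{name}: {value!r} not in {spec['enum']}")
    return problems


ok = validate_tool_input({"topic": "RAG", "research_findings": ["a"]}, WRITER_SCHEMA)
bad = validate_tool_input({"topic": 5, "format": "essay"}, WRITER_SCHEMA)
```

This sketch does not descend into array items or nested objects; a dedicated schema validation library would cover those cases.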
Message Schema Design
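A complete message schema for a production multi-agent system carries identity, routing, payload, status, and tracing fields. The AgentMessage dataclass in the typed message system below implements one.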
Synchronous vs Asynchronous Communication
Choosing between synchronous and asynchronous communication is the most important design decision in agent communication.
Synchronous: Agent A calls Agent B, waits for the response, then continues.
- Simple to reason about
- Blocking - Agent A is idle while Agent B works
- Natural for sequential pipelines
Asynchronous: Agent A sends a message to Agent B, continues other work, handles the response when it arrives.
- Complex but non-blocking
- Natural for parallel pipelines
- Requires correlation of requests and responses
import asyncio

# Synchronous (sequential agents)
def sync_pipeline(task: str) -> str:
    research = researcher(task)   # blocks until done
    draft = writer(research)      # blocks until done
    return critic(draft)          # blocks until done

# Asynchronous (parallel agents)
async def async_pipeline(tasks: list[str]) -> list[str]:
    # All researchers run simultaneously
    research_results = await asyncio.gather(*[
        researcher_async(task) for task in tasks
    ])
    # All writers run simultaneously with their research
    drafts = await asyncio.gather(*[
        writer_async(research) for research in research_results
    ])
    return drafts
When to use sync: Sequential pipelines where each step depends on the previous. Simple systems. Debugging-heavy workflows.
When to use async: Independent parallel subtasks. High-throughput production systems. Any pipeline where multiple agents can work simultaneously.
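Asynchronous communication needs a way to match each response to its request. One possible sketch keeps a pending future per request ID and resolves it when a response with that ID arrives. `Correlator` and `fake_agent` are hypothetical names invented for this example.

```python
import asyncio
import uuid


class Correlator:
    """Matches asynchronous responses to the requests that caused them."""

    def __init__(self):
        self._pending: dict = {}

    def register(self):
        """Create a request ID and a future that will hold its response."""
        request_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return request_id, future

    def resolve(self, correlation_id: str, payload: str) -> bool:
        future = self._pending.pop(correlation_id, None)
        if future is None or future.done():
            return False   # unknown or duplicate response
        future.set_result(payload)
        return True


async def fake_agent(correlator: Correlator, request_id: str, text: str, delay: float):
    """Stand-in for a remote agent that answers after a delay."""
    await asyncio.sleep(delay)
    correlator.resolve(request_id, text.upper())


async def demo() -> list:
    correlator = Correlator()
    id_a, fut_a = correlator.register()
    id_b, fut_b = correlator.register()
    # The second request finishes first; IDs keep answers matched to requests
    tasks = [
        asyncio.create_task(fake_agent(correlator, id_a, "alpha", 0.02)),
        asyncio.create_task(fake_agent(correlator, id_b, "beta", 0.01)),
    ]
    results = await asyncio.gather(fut_a, fut_b)
    await asyncio.gather(*tasks)
    return list(results)


results = asyncio.run(demo())
```

Responses arrive out of order here, yet each future receives the answer to its own request.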
Communication Patterns
Request-Response
The most common pattern. Agent A sends a request and expects exactly one response.
Agent A → [request] → Agent B
Agent A ← [response] ← Agent B
Broadcast
Agent A sends a message to all agents. Used for notifications, shared context updates.
Agent A → [broadcast] → Agent B, Agent C, Agent D
Point-to-Point
Directed message from one agent to one specific agent. Default for most agent communication.
Publish-Subscribe
Agents publish to topics; other agents subscribe. Used in event-driven architectures.
from typing import Callable

class PubSubBus:
    def __init__(self):
        # topic name -> handlers subscribed to that topic
        self._subscriptions: dict[str, list[Callable]] = {}

    def publish(self, topic: str, message: "AgentMessage"):
        subscribers = self._subscriptions.get(topic, [])
        for handler in subscribers:
            handler(message)

    def subscribe(self, topic: str, handler: Callable):
        self._subscriptions.setdefault(topic, []).append(handler)
Full Python Code: Typed Message System
"""
agent_communication.py
A typed message-passing system for multi-agent coordination.
Features: dataclass schemas, routing, retries with backoff, tracing.
"""
import uuid
import time
import asyncio
from datetime import datetime, timezone
from typing import Optional, Any, Callable, Awaitable
from dataclasses import dataclass, field
from enum import Enum
import json
import anthropic
client = anthropic.Anthropic()
MODEL = "claude-opus-4-5"
# ─── Message Types ────────────────────────────────────────────────────────────

class MessageStatus(str, Enum):
    PENDING = "pending"
    DELIVERED = "delivered"
    PROCESSED = "processed"
    FAILED = "failed"
    RETRYING = "retrying"


class MessageType(str, Enum):
    TASK_REQUEST = "task_request"
    TASK_RESPONSE = "task_response"
    ERROR = "error"
    BROADCAST = "broadcast"
    HEARTBEAT = "heartbeat"


@dataclass
class MessageMetadata:
    tokens_used: int = 0
    duration_ms: int = 0
    model_id: str = MODEL
    retry_count: int = 0


@dataclass
class AgentMessage:
    """
    Typed message for agent-to-agent communication.
    Every field has a purpose; no implicit string passing.
    """
    message_type: MessageType
    sender: str
    recipient: str  # agent name or "broadcast"
    content: Any    # typed payload

    # Auto-assigned. IDs are truncated to 8 characters to keep demo logs readable;
    # use the full UUID in production to avoid collisions.
    message_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    task_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    conversation_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    status: MessageStatus = MessageStatus.PENDING
    metadata: MessageMetadata = field(default_factory=MessageMetadata)
    error_detail: Optional[str] = None
    correlation_id: Optional[str] = None  # links responses to requests

    def to_dict(self) -> dict:
        return {
            "message_id": self.message_id,
            "task_id": self.task_id,
            "conversation_id": self.conversation_id,
            "timestamp": self.timestamp,
            "type": self.message_type.value,
            "sender": self.sender,
            "recipient": self.recipient,
            "status": self.status.value,
            "content": self.content,
            "metadata": {
                "tokens": self.metadata.tokens_used,
                "duration_ms": self.metadata.duration_ms,
                "model": self.metadata.model_id,
                "retries": self.metadata.retry_count
            },
            "error": self.error_detail,
            "correlation_id": self.correlation_id
        }

    def make_response(self, content: Any, status: MessageStatus = MessageStatus.PROCESSED) -> "AgentMessage":
        """Create a response message correlated to this request."""
        return AgentMessage(
            message_type=MessageType.TASK_RESPONSE,
            sender=self.recipient,
            recipient=self.sender,
            content=content,
            task_id=self.task_id,
            conversation_id=self.conversation_id,
            status=status,
            correlation_id=self.message_id
        )

    def make_error(self, error: str) -> "AgentMessage":
        """Create an error response correlated to this request."""
        return AgentMessage(
            message_type=MessageType.ERROR,
            sender=self.recipient,
            recipient=self.sender,
            content=None,
            task_id=self.task_id,
            conversation_id=self.conversation_id,
            status=MessageStatus.FAILED,
            correlation_id=self.message_id,
            error_detail=error
        )
# ─── Message Router ───────────────────────────────────────────────────────────

class AgentRouter:
    """
    Routes messages between registered agents.
    Tracks delivery, handles retries, records traces.
    """

    def __init__(self, max_retries: int = 2):
        self._handlers: dict[str, Callable] = {}
        self._trace: list[dict] = []
        self._max_retries = max_retries

    def register(self, agent_name: str, handler: Callable[[AgentMessage], AgentMessage]):
        """Register an agent to receive messages."""
        self._handlers[agent_name] = handler
        print(f"[Router] Registered agent: {agent_name}")

    async def send(self, message: AgentMessage) -> AgentMessage:
        """Send a message to the target agent, with retry logic."""
        self._trace.append({
            "event": "sent",
            "message_id": message.message_id,
            "from": message.sender,
            "to": message.recipient,
            "type": message.message_type.value,
            "timestamp": message.timestamp
        })

        handler = self._handlers.get(message.recipient)
        if not handler:
            error_msg = f"No handler registered for agent: {message.recipient}"
            # Record routing failures too, so they show up in the trace
            self._trace.append({
                "event": "failed",
                "message_id": message.message_id,
                "from": message.sender,
                "to": message.recipient,
                "error": error_msg,
                "attempts": 0
            })
            return message.make_error(error_msg)

        # Mark as delivered only once a recipient is known to exist
        message.status = MessageStatus.DELIVERED

        for attempt in range(self._max_retries + 1):
            try:
                start = time.perf_counter()
                # Support both sync and async handlers
                if asyncio.iscoroutinefunction(handler):
                    response = await handler(message)
                else:
                    # Run blocking handlers in a worker thread so the event loop stays responsive
                    response = await asyncio.to_thread(handler, message)
                response.metadata.duration_ms = int((time.perf_counter() - start) * 1000)
                response.metadata.retry_count = attempt
                self._trace.append({
                    "event": "processed",
                    "message_id": message.message_id,
                    "from": message.recipient,
                    "to": message.sender,
                    "response_id": response.message_id,
                    "status": response.status.value,
                    "duration_ms": response.metadata.duration_ms
                })
                return response
            except Exception as e:
                if attempt < self._max_retries:
                    message.status = MessageStatus.RETRYING
                    message.metadata.retry_count = attempt + 1
                    await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, ...
                else:
                    error_response = message.make_error(str(e))
                    self._trace.append({
                        "event": "failed",
                        "message_id": message.message_id,
                        "from": message.sender,
                        "to": message.recipient,
                        "error": str(e),
                        "attempts": attempt + 1
                    })
                    return error_response

    async def broadcast(self, sender: str, content: Any) -> list[AgentMessage]:
        """Send a message to all registered agents."""
        responses = []
        for recipient_name in self._handlers:
            if recipient_name == sender:
                continue
            msg = AgentMessage(
                message_type=MessageType.BROADCAST,
                sender=sender,
                recipient=recipient_name,
                content=content
            )
            response = await self.send(msg)
            responses.append(response)
        return responses

    def get_trace(self) -> list[dict]:
        return self._trace

    def print_trace(self):
        print("\n[Message Trace]")
        for event in self._trace:
            print(f" {event['event'].upper():10s} | {event.get('from', '?'):15s} → {event.get('to', '?'):15s} | {event.get('message_id', '?')}")
# ─── Agent Implementations ────────────────────────────────────────────────────

def make_llm_agent(name: str, system_prompt: str) -> Callable[[AgentMessage], AgentMessage]:
    """Factory that creates an LLM-backed agent handler."""

    def handler(message: AgentMessage) -> AgentMessage:
        if message.message_type not in (MessageType.TASK_REQUEST, MessageType.BROADCAST):
            return message.make_error(f"{name}: unexpected message type {message.message_type}")

        task = message.content if isinstance(message.content, str) else json.dumps(message.content)
        start = time.time()
        response = client.messages.create(
            model=MODEL,
            max_tokens=800,
            system=system_prompt,
            messages=[{"role": "user", "content": task}]
        )
        duration_ms = int((time.time() - start) * 1000)
        output = response.content[0].text
        tokens = response.usage.input_tokens + response.usage.output_tokens

        result = message.make_response(output)
        result.metadata.tokens_used = tokens
        result.metadata.duration_ms = duration_ms
        return result

    handler.__name__ = name
    return handler
# ─── Multi-Agent System with Typed Communication ──────────────────────────────

async def run_typed_pipeline(task: str) -> dict:
    """
    A 3-agent pipeline using typed message passing.
    Researcher → Writer → Critic
    """
    router = AgentRouter(max_retries=2)

    # Register agents
    router.register("researcher", make_llm_agent(
        "researcher",
        "You are a research agent. Given a topic, return structured research findings: "
        "key facts, definitions, and key insights. Be specific and factual."
    ))
    router.register("writer", make_llm_agent(
        "writer",
        "You are a technical writer. Given research findings, write a clear, "
        "well-structured explanation suitable for engineers. Use examples."
    ))
    router.register("critic", make_llm_agent(
        "critic",
        "You are an editorial critic. Review content for accuracy, clarity, "
        "and completeness. Output: VERDICT (APPROVE/REVISE) + specific feedback."
    ))

    conversation_id = str(uuid.uuid4())[:8]

    # Step 1: Research
    research_request = AgentMessage(
        message_type=MessageType.TASK_REQUEST,
        sender="orchestrator",
        recipient="researcher",
        content=f"Research this topic: {task}",
        conversation_id=conversation_id
    )
    print(f"\n[Pipeline] Sending to researcher (msg={research_request.message_id})")
    research_response = await router.send(research_request)
    if research_response.status == MessageStatus.FAILED:
        return {"success": False, "error": research_response.error_detail}

    # Step 2: Write (using research output)
    write_request = AgentMessage(
        message_type=MessageType.TASK_REQUEST,
        sender="orchestrator",
        recipient="writer",
        content=f"Write about: {task}\n\nResearch:\n{research_response.content}",
        conversation_id=conversation_id,
        task_id=research_request.task_id  # same task, different step
    )
    print(f"[Pipeline] Sending to writer (msg={write_request.message_id})")
    write_response = await router.send(write_request)
    if write_response.status == MessageStatus.FAILED:
        return {"success": False, "error": write_response.error_detail}

    # Step 3: Critique
    critique_request = AgentMessage(
        message_type=MessageType.TASK_REQUEST,
        sender="orchestrator",
        recipient="critic",
        content=f"Task: {task}\n\nContent to review:\n{write_response.content}",
        conversation_id=conversation_id,
        task_id=research_request.task_id  # same task, different step
    )
    print(f"[Pipeline] Sending to critic (msg={critique_request.message_id})")
    critique_response = await router.send(critique_request)
    # Check the final step too, so a failed critique is not reported as success
    if critique_response.status == MessageStatus.FAILED:
        return {"success": False, "error": critique_response.error_detail}

    router.print_trace()

    total_tokens = sum([
        research_response.metadata.tokens_used,
        write_response.metadata.tokens_used,
        critique_response.metadata.tokens_used
    ])
    return {
        "success": True,
        "research": research_response.content,
        "draft": write_response.content,
        "critique": critique_response.content,
        "total_tokens": total_tokens,
        "conversation_id": conversation_id,
        "trace_events": len(router.get_trace())
    }
# ─── Usage ────────────────────────────────────────────────────────────────────

async def main():
    task = "Explain how vector embeddings work in RAG systems"
    result = await run_typed_pipeline(task)
    if result["success"]:
        print(f"\nConversation: {result['conversation_id']}")
        print(f"Tokens used: {result['total_tokens']}")
        print(f"Trace events: {result['trace_events']}")
        print(f"\n--- CRITIQUE ---\n{result['critique']}")
    else:
        print(f"Pipeline failed: {result['error']}")


if __name__ == "__main__":
    asyncio.run(main())
Error Propagation
How errors travel through a multi-agent system determines how robust the system is.
Propagation Models
Fail-fast: Error in any agent immediately aborts the pipeline. Simplest to implement. Appropriate when intermediate results have no value.
Partial completion: Pipeline continues past failures, skipping failed steps. Final output notes what's missing. Appropriate when partial results are useful.
Error isolation: Each agent's failure is contained; other agents continue with fallback input. Most complex, most resilient.
def handle_agent_error(
    error_response: AgentMessage,
    fallback_value: Optional[str] = None,
    required: bool = True
) -> tuple[Optional[str], bool]:
    """
    Handle an agent error message.
    Returns (value, success) tuple.
    """
    if error_response.status == MessageStatus.FAILED:
        if required:
            raise RuntimeError(f"Required agent failed: {error_response.error_detail}")
        print(f"[Warning] Optional agent failed: {error_response.error_detail}")
        return fallback_value, False
    return error_response.content, True
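The partial completion model can be sketched independently of the message classes. The example below assumes each step is a plain zero-argument callable and uses a hypothetical `run_partial` helper. It keeps going past failures and reports what is missing in the final output:

```python
from typing import Callable, Optional


def run_partial(steps: dict) -> dict:
    """Run every step, skipping failures and recording what is missing."""
    results: dict = {}
    missing: list = []
    for name, step in steps.items():
        try:
            results[name] = step()
        except Exception as exc:
            results[name] = None                 # placeholder for the failed step
            missing.append(f"{name}: {exc}")     # surfaced in the final output
    return {"results": results, "missing": missing, "complete": not missing}


def failing_sources() -> str:
    raise TimeoutError("source lookup timed out")


report = run_partial({
    "summary": lambda: "RAG combines retrieval with generation.",
    "sources": failing_sources,
    "glossary": lambda: "RAG: retrieval-augmented generation",
})
```

Fail-fast would instead re-raise on the first exception. Error isolation would substitute a fallback value for each failed step so downstream steps still receive usable input.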
Message Tracing
Every production multi-agent system needs message tracing - the ability to reconstruct the complete sequence of messages for any given task.
@dataclass
class MessageTrace:
    conversation_id: str
    messages: list[AgentMessage] = field(default_factory=list)

    def add(self, msg: AgentMessage):
        self.messages.append(msg)

    def replay(self) -> str:
        """Human-readable trace for debugging."""
        lines = [f"Conversation: {self.conversation_id}"]
        for i, msg in enumerate(self.messages):
            lines.append(
                f" [{i+1}] {msg.sender:15s} → {msg.recipient:15s} "
                f"| {msg.message_type.value:20s} "
                f"| {msg.status.value:10s} "
                f"| tokens={msg.metadata.tokens_used}"
            )
        return "\n".join(lines)
Communication Patterns Comparison
Production Notes
Message IDs everywhere: Generate a unique ID for every message, every agent call, every pipeline run. Without IDs, debugging is nearly impossible.
Correlation IDs: When Agent B responds to Agent A's request, the response must carry the request's message ID as a correlation ID. This enables tracing which responses correspond to which requests in async systems.
Schema versioning: As your system evolves, message schemas change. Include a schema_version field in every message. Handle schema mismatches gracefully - don't crash because an agent running v1.2 sent a message to an agent expecting v1.3.
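A minimal sketch of tolerant version handling follows. It assumes a `major.minor` version string, a hypothetical set of known fields, and a default of `"1.0"` when the field is absent; all three are illustrative choices, not a standard.

```python
SUPPORTED_MAJOR = 1
KNOWN_FIELDS = {"schema_version", "message_id", "sender", "content"}


def parse_versioned(message: dict) -> dict:
    """Accept any minor version of the supported major; drop unknown fields."""
    version = message.get("schema_version", "1.0")   # assumed default for old senders
    major = int(version.split(".")[0])
    if major != SUPPORTED_MAJOR:
        raise ValueError(f"unsupported schema major version: {version}")
    # Same major version: keep the fields we understand, ignore newer additions
    return {key: value for key, value in message.items() if key in KNOWN_FIELDS}


newer_minor = parse_versioned({
    "schema_version": "1.3",
    "message_id": "msg_1",
    "sender": "research_agent",
    "content": "findings",
    "priority": "high",        # field added in a later minor version
})

try:
    parse_versioned({"schema_version": "2.0", "message_id": "msg_2"})
    rejected = False
except ValueError:
    rejected = True
```

An agent built against v1.2 accepts the v1.3 message and ignores the field it does not know, while a major-version mismatch fails loudly instead of being misread.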
Dead letter queues: In queue-based systems, messages that consistently fail delivery should go to a dead letter queue for manual inspection, not silent discard.
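The dead letter pattern can be sketched with two in-process queues. `process_with_dlq` and `flaky_handler` are hypothetical names for this illustration; a real broker would typically provide dead-lettering as a configuration option instead.

```python
import queue


def process_with_dlq(inbox: queue.Queue, dead_letters: queue.Queue,
                     handler, max_attempts: int = 3) -> list:
    """Process every message; park the ones that keep failing."""
    processed = []
    while not inbox.empty():
        msg = inbox.get()
        last_error = ""
        for _ in range(max_attempts):
            try:
                processed.append(handler(msg))
                break
            except Exception as exc:
                last_error = f"{type(exc).__name__}: {exc}"
        else:
            # Loop finished without a break: every attempt failed
            dead_letters.put({
                "message": msg,
                "error": last_error,
                "attempts": max_attempts,
            })
    return processed


def flaky_handler(msg: str) -> str:
    if msg == "bad":
        raise ValueError("cannot parse payload")
    return msg.upper()


inbox: queue.Queue = queue.Queue()
dead_letters: queue.Queue = queue.Queue()
for item in ("ok", "bad", "fine"):
    inbox.put(item)

processed = process_with_dlq(inbox, dead_letters, flaky_handler)
parked = dead_letters.get_nowait()
```

The parked entry keeps the original message alongside the last error and the attempt count, which is the context needed for manual inspection or replay.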
:::warning Silent Failures in String-Based Communication
The most insidious failure mode in multi-agent systems is silent failure: an agent returns an empty string, a malformed response, or a canned error message - and the next agent treats it as valid input. Always validate agent output before passing it downstream. At minimum: check length, check format, check for error indicators in the content.
:::
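The minimum checks named above (length, format, error indicators) might be sketched as follows. The marker list and the length threshold are hypothetical values chosen for illustration and would need tuning per agent.

```python
# Hypothetical phrases that suggest the "output" is really an error message
ERROR_MARKERS = ("error:", "traceback", "unable to", "i'm sorry")


def validate_output(text, min_chars: int = 20) -> list:
    """Return a list of problems; an empty list means the output looks usable."""
    if not isinstance(text, str) or not text.strip():
        return ["empty output"]
    problems = []
    if len(text.strip()) < min_chars:
        problems.append(f"too short: {len(text.strip())} chars")
    lowered = text.lower()
    for marker in ERROR_MARKERS:
        if marker in lowered:
            problems.append(f"contains error marker: {marker!r}")
    return problems


empty_check = validate_output("")
error_check = validate_output("Error: upstream timeout while fetching sources")
valid_check = validate_output("RAG combines retrieval with generation to ground answers.")
```

A caller would stop or fall back whenever the returned list is non-empty, instead of forwarding the text to the next agent.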
:::danger Schema Drift Across Agent Versions
When you update one agent's output format, every downstream agent that consumes that output must be updated simultaneously. In production systems with independent agent deployments, schema drift causes cascading failures. Use explicit versioning in message schemas and graceful fallback for version mismatches.
:::
Interview Q&A
Q: What's the difference between synchronous and asynchronous agent communication, and when would you choose each?
A: Synchronous communication blocks the calling agent until the recipient responds - simple, predictable, but serializes execution. Asynchronous communication lets the caller continue other work while waiting - enables parallelism but requires correlation of requests and responses. Choose sync for sequential pipelines where each step depends on the previous. Choose async for fan-out patterns where multiple agents can work simultaneously, or when you need to issue multiple requests and aggregate results.
Q: How do you prevent schema drift in a multi-agent system?
A: Schema versioning in every message, schema validation at ingestion (not just at generation), and integration tests that check cross-agent compatibility. Treat agent message schemas like API contracts - version them, document breaking changes, and maintain backward compatibility during transitions. In Python, Pydantic models with explicit version fields are a common way to enforce this.
Q: How do you trace a request through multiple agents?
A: Assign a conversation_id at the pipeline entry point and propagate it through every message in that pipeline. Every message carries the conversation_id and its own message_id. Responses carry a correlation_id pointing to the request that triggered them. With this structure, you can reconstruct the complete message graph for any conversation_id from logs.
Q: What is the dead letter queue pattern and why does it matter for agents?
A: A dead letter queue collects messages that fail delivery after all retry attempts. Without it, failed messages are silently dropped - you never know a failure occurred. In agent systems, messages fail for many reasons: agent crashes, schema validation errors, timeouts, and output quality failures. Routing these to a dead letter queue with full context enables post-hoc investigation, manual retry, and alerting on elevated failure rates.
Q: How do you design message schemas that are forward compatible?
A: Use optional fields with defaults for new additions, never remove required fields in minor versions, and include a schema_version field. When an agent receives a message with a higher schema version, it should handle known fields and ignore unknown fields rather than rejecting the message. For breaking schema changes, bump a major version and run old/new versions in parallel during the transition period.
