:::tip 🎮 Interactive Playground Visualize this concept: Try the Event-Driven ML Architecture demo on the EngineersOfAI Playground - no code required. :::
Event-Driven Architecture for ML
The Recommendation System That Could Not Be Debugged
The recommendation team had a bug that drove them crazy for three weeks. The recommendation engine was sometimes returning items that users had already purchased - a particularly bad user experience. The fix seemed obvious: filter out purchased items. But the bug kept appearing intermittently.
The system architecture was request-response: user arrives → fetch user history from database → fetch candidate items → run recommendation model → filter out purchased items → return top-10. The database contained purchase history. The model was fine.
But the purchase history database had an issue: it was updated asynchronously from the payment service, with an unpredictable lag of 0.5 to 45 seconds. During that window, the recommendation engine did not know about the just-completed purchase. When a user bought item A and immediately browsed, item A could appear in their recommendations.
The real fix was architectural: switch to an event-driven design in which a "purchase_completed" event was published to Kafka as soon as a purchase succeeded. The recommendation system consumed these events and maintained its own read model of user purchase history, updated in real time. The asynchronous database sync, with its window of up to 45 seconds, was no longer in the path. The only remaining delay was the recommendation consumer's own lag, which the team owned and could monitor.
More importantly: the event log became a complete, immutable, replayable record of every user action. When a new bug appeared - recommendations ignoring user preferences set earlier in the session - the team replayed the event log through a local copy of the system to reproduce the exact conditions. They found the bug in 45 minutes instead of three weeks.
Event-driven architecture did not just fix the bug. It made the entire system debuggable.
Why This Exists - The Coupling Problem
Traditional ML systems have a tight coupling problem. The model needs features. Features come from databases. Databases are written by other services. Those services are owned by other teams. Changes in upstream data sources break the ML pipeline silently: not with an error, but with wrong predictions.
Event-driven architecture decouples these dependencies by making the communication channel explicit and durable. Instead of "recommendation system reads from purchase database" (tight coupling, hidden dependency), you have "payment service publishes purchase events" (explicit, decoupled, observable). The recommendation system owns its own materialized view of purchases, derived from the event log. If the payment service is slow or down, the event log is still there. If the schema changes, the event log captures the change history.
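To make "owns its own materialized view" concrete, here is a minimal in-memory sketch of the recommendation service's purchase view. A plain list stands in for the Kafka topic, and the event shape (`event_type`, `user_id`, `payload.item_id`) mirrors the schemas later in this article. `PurchaseView` and `filter_candidates` are hypothetical names for illustration, not part of any library.

```python
from dataclasses import dataclass, field


@dataclass
class PurchaseView:
    """Read model owned by the recommendation service (hypothetical).

    It is derived entirely from purchase events, so the service never
    queries the payment service's database.
    """
    purchased: dict[str, set[str]] = field(default_factory=dict)

    def apply(self, event: dict) -> None:
        # Only purchase events change this view; other event types are ignored.
        if event["event_type"] == "ecommerce.purchase_completed":
            self.purchased.setdefault(event["user_id"], set()).add(
                event["payload"]["item_id"]
            )

    def filter_candidates(self, user_id: str, candidates: list[str]) -> list[str]:
        """Drop items the user already bought, preserving ranking order."""
        owned = self.purchased.get(user_id, set())
        return [item for item in candidates if item not in owned]


# The payment service publishes; the recommendation service consumes.
event_log = [
    {"event_type": "ecommerce.purchase_completed", "user_id": "u1",
     "payload": {"item_id": "A"}},
    {"event_type": "ecommerce.item_viewed", "user_id": "u1",
     "payload": {"item_id": "B"}},
]

view = PurchaseView()
for event in event_log:
    view.apply(event)

print(view.filter_candidates("u1", ["A", "B", "C"]))  # ['B', 'C']
```

If the payment service is down, `view` still answers queries from the events it has already consumed; it just stops learning about new purchases until the stream resumes.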
For ML systems specifically, event-driven architecture solves three problems:
- Real-time feature updates: feature values update immediately when their source events arrive, not on the next database read
- Training/serving consistency: both training pipelines and serving systems read from the same event log, so feature values match as long as both also run the same feature-computation code
- Debuggability: every state change is a recorded event; you can replay history to reproduce any system state at any past moment
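The consistency and debuggability points both rest on one idea: state is a pure function of the events seen so far. The sketch below illustrates this with a toy two-counter state, assuming every event carries an `event_time` in epoch seconds (`apply_event` and `state_at` are hypothetical names). The same `state_at` call reproduces what the system knew at any past moment. A training pipeline can call it with a label's timestamp so that no future events leak into the features.

```python
from functools import reduce


def apply_event(state: dict, event: dict) -> dict:
    """Pure transition function: returns a new state, never mutates the input."""
    if event["event_type"] == "ecommerce.purchase_completed":
        return {**state, "n_purchases": state["n_purchases"] + 1}
    if event["event_type"] == "ecommerce.item_viewed":
        return {**state, "n_views": state["n_views"] + 1}
    return state


def state_at(events: list[dict], as_of: float) -> dict:
    """Rebuild state as of time `as_of` by replaying earlier events in order."""
    initial = {"n_purchases": 0, "n_views": 0}
    history = sorted(
        (e for e in events if e["event_time"] <= as_of),
        key=lambda e: e["event_time"],
    )
    return reduce(apply_event, history, initial)


events = [
    {"event_type": "ecommerce.item_viewed", "event_time": 100.0},
    {"event_type": "ecommerce.purchase_completed", "event_time": 105.0},
    {"event_type": "ecommerce.item_viewed", "event_time": 110.0},
]

# Debugging: what did the system know at t=106?
print(state_at(events, 106.0))  # {'n_purchases': 1, 'n_views': 1}
# Training: features for a label observed at t=120 see all three events.
print(state_at(events, 120.0))  # {'n_purchases': 1, 'n_views': 2}
```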
Historical Context
Event sourcing as a pattern was formalized by Martin Fowler (2005) and Greg Young (around 2010). The core idea: instead of storing current state, store the sequence of events that produced that state. Current state = replay of all events.
CQRS (Command Query Responsibility Segregation, also Greg Young and Udi Dahan, 2010) separated write operations (commands that change state) from read operations (queries that return state). In ML contexts, commands are model updates and user actions; queries are feature lookups and prediction requests. Separating these allows each to scale independently.
Apache Kafka became the event log backbone for most large-scale implementations. LinkedIn published their data infrastructure architecture in 2011, describing Kafka as the system of record for all data changes. Jay Kreps formalized this in "The Log" (2013), arguing that the append-only event log is the fundamental abstraction for distributed systems.
For ML specifically, the event-driven pattern gained traction as teams struggled with training/serving skew: models trained on batch features (computed from database snapshots) performed differently in production, where features were computed from live requests by different code. Deriving all features from the same event log, with the same computation logic offline and online, removes this source of skew.
CQRS Pattern for ML Systems
CQRS separates the write side (commands) from the read side (queries), with an event log in between.
The write side produces events. Event processors consume events and update read models. The ML model queries read models at prediction time. No direct coupling between the model and upstream data sources.
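The flow can be sketched in a single process. This is a minimal illustration, not a production design. A list stands in for the event log and a dict stands in for the Redis read model. `handle_add_to_cart`, `CartCountProjection`, and the `cart.item_added` event type are hypothetical names. Note the window between the command and `catch_up`: that is the consumer lag a real deployment has to monitor.

```python
from dataclasses import dataclass, field


@dataclass
class EventLog:
    """Append-only log standing in for a Kafka topic."""
    entries: list[dict] = field(default_factory=list)

    def append(self, event: dict) -> int:
        self.entries.append(event)
        return len(self.entries) - 1  # offset of the new event


def handle_add_to_cart(log: EventLog, user_id: str, item_id: str) -> int:
    """Command side: validate the request, then append a fact to the log."""
    if not item_id:
        raise ValueError("item_id is required")
    return log.append({"event_type": "cart.item_added",
                       "user_id": user_id, "item_id": item_id})


@dataclass
class CartCountProjection:
    """Event processor: folds new events into a read model, tracking its offset."""
    counts: dict[str, int] = field(default_factory=dict)
    next_offset: int = 0

    def catch_up(self, log: EventLog) -> None:
        for event in log.entries[self.next_offset:]:
            if event["event_type"] == "cart.item_added":
                user = event["user_id"]
                self.counts[user] = self.counts.get(user, 0) + 1
            self.next_offset += 1

    def query(self, user_id: str) -> int:
        """Query side: what the model reads at prediction time."""
        return self.counts.get(user_id, 0)


log = EventLog()
projection = CartCountProjection()
handle_add_to_cart(log, "u1", "A")
handle_add_to_cart(log, "u1", "B")
print(projection.query("u1"))  # 0: the read model has not consumed yet
projection.catch_up(log)
print(projection.query("u1"))  # 2
projection.catch_up(log)       # re-running is a no-op thanks to next_offset
print(projection.query("u1"))  # 2
```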
Event Schema Design for ML
Event schema design is critical for ML feature quality. Bad schemas lead to feature drift and debugging nightmares.
```python
# event_schema.py - well-designed event schemas for ML pipelines
import json
import time
import uuid
from dataclasses import asdict, dataclass, field, fields
from typing import Any, Dict, List, Optional


@dataclass
class MLEvent:
    """
    Base class for all ML-relevant domain events.
    Loosely modeled on the CloudEvents spec: https://cloudevents.io
    """
    # Event identification
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str = ""  # e.g. "user.purchase_completed", "model.prediction_served"
    event_version: str = "1.0"  # schema version - increment on breaking changes

    # Timing - critical for temporal features. Both are epoch seconds (UTC).
    # time.time() is used because datetime.utcnow().timestamp() treats the naive
    # UTC datetime as local time and is off by the host's UTC offset.
    event_time: float = field(default_factory=time.time)      # when it happened
    ingestion_time: float = field(default_factory=time.time)  # reset when ingested

    # Source context
    source_service: str = ""
    source_instance: str = ""

    # Entity identifiers - always present for feature lookups
    user_id: Optional[str] = None
    session_id: Optional[str] = None

    # Event-specific payload
    payload: Dict[str, Any] = field(default_factory=dict)

    # ML metadata
    experiment_context: Optional[Dict[str, Any]] = None  # active A/B experiments

    def to_kafka_message(self) -> bytes:
        # asdict() serializes every field, so none is silently dropped
        return json.dumps(asdict(self)).encode()

    @classmethod
    def from_kafka_message(cls, message_bytes: bytes) -> "MLEvent":
        data = json.loads(message_bytes.decode())
        # Ignore unknown fields so newer producers do not break older consumers
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in known})


# Domain-specific events with typed payloads
@dataclass
class PurchaseCompleted(MLEvent):
    event_type: str = "ecommerce.purchase_completed"

    @classmethod
    def create(
        cls,
        user_id: str,
        session_id: str,
        item_id: str,
        price: float,
        category: str,
        payment_method: str = "card",
    ) -> "PurchaseCompleted":
        return cls(
            user_id=user_id,
            session_id=session_id,
            source_service="payment-service",
            payload={
                "item_id": item_id,
                "price": price,
                "category": category,
                "payment_method": payment_method,
            },
        )


@dataclass
class RecommendationServed(MLEvent):
    """Records what recommendation was shown - critical for training labels."""
    event_type: str = "recommendation.items_shown"

    @classmethod
    def create(
        cls,
        user_id: str,
        session_id: str,
        recommended_items: List[str],
        model_version: str,
        features_snapshot: Dict[str, Any],  # features used for this prediction
    ) -> "RecommendationServed":
        return cls(
            user_id=user_id,
            session_id=session_id,
            source_service="recommendation-service",
            payload={
                "recommended_items": recommended_items,
                "model_version": model_version,
                # Storing the features snapshot enables offline evaluation:
                # "what accuracy would model X have on this exact request?"
                "features_snapshot": features_snapshot,
                "impression_id": str(uuid.uuid4()),
            },
        )
```
Event Sourcing for ML State
Event sourcing materializes current state by replaying the event log. For an ML recommendation system, the current state of "what has this user interacted with?" is derived from all interaction events:
```python
# event_sourcing.py - materializing ML feature state from events
import json
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


@dataclass
class UserMLState:
    """ML-relevant state for a user, derived from event log."""
    user_id: str
    purchased_items: Set[str] = field(default_factory=set)
    viewed_items: List[str] = field(default_factory=list)  # ordered by time
    category_interests: Dict[str, float] = field(default_factory=dict)
    session_start_time: Optional[float] = None
    last_active_time: Optional[float] = None
    total_spend: float = 0.0

    def apply_event(self, event: dict):
        """Update state by applying a domain event."""
        event_type = event["event_type"]
        payload = event.get("payload", {})
        event_time = event.get("event_time", 0)

        if event_type == "ecommerce.purchase_completed":
            self.purchased_items.add(payload["item_id"])
            self.total_spend += payload.get("price", 0)
            # Decay the old interest score, then boost this category
            category = payload.get("category", "unknown")
            self.category_interests[category] = (
                self.category_interests.get(category, 0) * 0.9 + 1.0
            )
            self.last_active_time = event_time

        elif event_type == "ecommerce.item_viewed":
            self.viewed_items.append(payload["item_id"])
            # Keep only the 100 most recent views
            if len(self.viewed_items) > 100:
                self.viewed_items = self.viewed_items[-100:]
            category = payload.get("category", "unknown")
            self.category_interests[category] = (
                self.category_interests.get(category, 0) * 0.95 + 0.3
            )
            self.last_active_time = event_time

        elif event_type == "session.started":
            self.session_start_time = event_time

    def to_ml_features(self) -> Dict:
        """Convert state to ML feature vector."""
        return {
            "n_purchased_items": len(self.purchased_items),
            "n_viewed_items": len(self.viewed_items),
            "total_spend": self.total_spend,
            "top_categories": sorted(
                self.category_interests.items(),
                key=lambda x: x[1], reverse=True
            )[:5],
            "session_active": self.session_start_time is not None,
        }


class EventSourcedFeatureStore:
    """
    Materializes user feature states from an event log.
    Supports both live updates and historical replay.
    """

    def __init__(self, redis_client):
        self.redis = redis_client  # an async client, e.g. redis.asyncio.Redis
        # In-memory cache for hot users. Unbounded and per-process in this
        # sketch; bound it (e.g. LRU) in production.
        self._cache: Dict[str, UserMLState] = {}

    async def process_event(self, event: dict):
        """Apply a new event to the user's materialized state."""
        user_id = event.get("user_id")
        if not user_id:
            return
        state = await self._load_state(user_id)  # load existing state
        state.apply_event(event)                 # apply event
        await self._save_state(user_id, state)   # persist updated state

    async def get_features(self, user_id: str) -> Dict:
        """Get current ML features for user."""
        state = await self._load_state(user_id)
        return state.to_ml_features()

    async def rebuild_from_log(
        self, user_id: str, event_log: List[dict]
    ) -> UserMLState:
        """
        Rebuild state by replaying event log.
        Used for: debugging, backfill after schema change, testing.
        """
        state = UserMLState(user_id=user_id)
        # Sort by event_time to replay in chronological order
        for event in sorted(event_log, key=lambda e: e.get("event_time", 0)):
            state.apply_event(event)
        return state

    async def _load_state(self, user_id: str) -> UserMLState:
        if user_id in self._cache:
            return self._cache[user_id]
        raw = await self.redis.get(f"user_state:{user_id}")
        if raw:
            data = json.loads(raw)
            return UserMLState(
                user_id=user_id,
                purchased_items=set(data.get("purchased_items", [])),
                viewed_items=data.get("viewed_items", []),
                category_interests=data.get("category_interests", {}),
                session_start_time=data.get("session_start_time"),
                last_active_time=data.get("last_active_time"),
                total_spend=data.get("total_spend", 0.0),
            )
        return UserMLState(user_id=user_id)

    async def _save_state(self, user_id: str, state: UserMLState):
        data = {
            "purchased_items": list(state.purchased_items),
            "viewed_items": state.viewed_items,
            "category_interests": state.category_interests,
            "session_start_time": state.session_start_time,  # was not persisted
            "last_active_time": state.last_active_time,
            "total_spend": state.total_spend,
        }
        await self.redis.setex(
            f"user_state:{user_id}",
            86400 * 30,  # 30 day TTL
            json.dumps(data),
        )
        self._cache[user_id] = state
```
Dead Letter Queues for Failed Inference
When inference fails (model error, malformed input, timeout), the event must not be silently dropped. Dead letter queues (DLQs) capture failed events for later inspection and retry.
```python
# dead_letter_queue.py - DLQ for failed inference events
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.structs import TopicPartition
from prometheus_client import Counter

logger = logging.getLogger(__name__)

# Labelled by failure reason; keep reasons to a small fixed set of error codes
# so the label stays low-cardinality.
dlq_events_total = Counter(
    "dlq_events_total", "Events routed to the dead letter queue", ["reason"]
)


@dataclass
class FailedEvent:
    """An event that failed processing, with failure metadata."""
    original_topic: str
    original_partition: int
    original_offset: int
    event: dict
    failure_reason: str
    failure_timestamp: float = field(default_factory=time.time)
    retry_count: int = 0
    max_retries: int = 3

    @property
    def should_retry(self) -> bool:
        return self.retry_count < self.max_retries

    @property
    def is_retryable_error(self) -> bool:
        """Some errors should not be retried (malformed input vs transient failure)."""
        retryable_errors = ["timeout", "model_unavailable", "gpu_oom"]
        non_retryable = ["invalid_schema", "missing_required_field"]
        reason = self.failure_reason.lower()
        return (any(e in reason for e in retryable_errors)
                and not any(e in reason for e in non_retryable))

    def to_kafka_message(self) -> bytes:
        return json.dumps({
            "original_topic": self.original_topic,
            "original_partition": self.original_partition,  # needed for replay
            "original_offset": self.original_offset,
            "event": self.event,
            "failure_reason": self.failure_reason,
            "failure_timestamp": self.failure_timestamp,
            "retry_count": self.retry_count,
        }).encode()


class DLQHandler:
    """Handles failed inference events - routes to DLQ or retries."""

    def __init__(self, kafka_producer: AIOKafkaProducer, dlq_topic: str):
        self.producer = kafka_producer
        self.dlq_topic = dlq_topic

    async def handle_failure(self, failed_event: FailedEvent, input_topic: str):
        """Route failed event to DLQ or re-queue for retry."""
        if failed_event.should_retry and failed_event.is_retryable_error:
            # Back off exponentially before retry (1s, 2s, 4s, ... capped at 60s).
            # Sleeping here stalls this consumer; high-volume systems often use
            # separate delayed-retry topics instead.
            retry_delay = min(2 ** failed_event.retry_count, 60)
            logger.warning("Retrying event after %ss (attempt %d)",
                           retry_delay, failed_event.retry_count + 1)
            await asyncio.sleep(retry_delay)
            failed_event.retry_count += 1
            # Carry the retry count in a header. The input consumer must read it
            # back; otherwise every re-queued event restarts at 0 and retries forever.
            await self.producer.send_and_wait(
                input_topic,
                value=json.dumps(failed_event.event).encode(),
                headers=[("retry_count", str(failed_event.retry_count).encode())],
            )
        else:
            # Final failure - send to DLQ for human inspection. Wait for the broker
            # ack so the event is not lost if the process dies right after.
            await self.producer.send_and_wait(
                self.dlq_topic,
                value=failed_event.to_kafka_message(),
            )
            # Alert if DLQ rate is high
            dlq_events_total.labels(reason=failed_event.failure_reason).inc()


class EventReplayDebugger:
    """
    Replays events from Kafka log to reproduce historical system states.
    Essential for debugging production issues.
    """

    def __init__(self, kafka_bootstrap: str):
        self.kafka_bootstrap = kafka_bootstrap

    async def replay_for_user(
        self,
        user_id: str,
        from_timestamp: float,
        to_timestamp: float,
        topic: str,
    ) -> list:
        """
        Fetch all events for a user in a time range (epoch seconds).
        Enables "what did the system see at time T?" debugging.
        """
        # group_id=None: manual assignment and no offset commits, so a debugging
        # session never touches production consumer-group offsets.
        consumer = AIOKafkaConsumer(
            bootstrap_servers=self.kafka_bootstrap,
            group_id=None,
            enable_auto_commit=False,
        )
        events = []
        await consumer.start()
        try:
            partitions = consumer.partitions_for_topic(topic)
            if not partitions:
                raise ValueError(f"no partition metadata for topic {topic!r}")
            tps = [TopicPartition(topic, p) for p in partitions]

            # Seek by Kafka record timestamp (milliseconds). Events whose event_time
            # is in the window but whose record timestamp is later are not returned.
            start_ms = int(from_timestamp * 1000)
            starts = await consumer.offsets_for_times({tp: start_ms for tp in tps})
            ends = await consumer.end_offsets(tps)
            # Partitions with data at/after from_timestamp -> their end offset
            remaining = {
                tp: ends[tp]
                for tp, found in starts.items()
                if found is not None and found.offset < ends[tp]
            }
            if not remaining:
                return []
            consumer.assign(list(remaining))
            for tp in remaining:
                consumer.seek(tp, starts[tp].offset)

            while remaining:
                batches = await consumer.getmany(*remaining, timeout_ms=5000)
                if not batches:
                    break  # no more data arriving; stop rather than block forever
                for tp, messages in batches.items():
                    for message in messages:
                        event = json.loads(message.value.decode())
                        # Filter by user and time range
                        if (event.get("user_id") == user_id and
                                from_timestamp <= event.get("event_time", 0) <= to_timestamp):
                            events.append(event)
                        # Done with this partition at its end offset or past the window
                        if (message.offset + 1 >= remaining[tp]
                                or message.timestamp / 1000 > to_timestamp):
                            del remaining[tp]
                            break
        finally:
            await consumer.stop()
        return sorted(events, key=lambda e: e.get("event_time", 0))
```
Consumer Group Patterns for ML Pipelines
Different ML consumers can read the same event stream independently, because each consumer group tracks its own committed offsets:
```python
# consumer_groups.py - multiple independent consumers on same event log
# Each consumer group processes the full event stream independently
CONSUMER_GROUPS = {
    # Feature materialization: maintains user/item feature state in Redis
    "feature-materializer": {
        "topic": "user-events",
        "purpose": "Update Redis feature store",
        "lag_tolerance_seconds": 5,  # features must be fresh within 5s
    },
    # Online learning: updates fraud model weights continuously
    "fraud-online-learner": {
        "topic": "labeled-transactions",
        "purpose": "Incremental model updates",
        "lag_tolerance_seconds": 60,  # can be slightly behind
    },
    # Audit logger: writes all events to data warehouse for compliance
    "audit-logger": {
        "topic": "user-events",
        "purpose": "Compliance and audit trail",
        "lag_tolerance_seconds": 3600,  # can be up to 1 hour behind
    },
    # Training data generator: batches events for offline model retraining
    "training-data-gen": {
        "topic": "user-events",
        "purpose": "Generate training dataset for next model version",
        "lag_tolerance_seconds": 86400,  # daily batch - can be 1 day behind
    },
}

# Key insight: each consumer group has its own offsets and lag tolerance.
# A backlog in audit-logger does not hold back feature-materializer: the groups
# read the same data but track separate positions (they share only broker capacity).
```
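The independence comes from Kafka storing a committed offset per consumer group and partition, rather than marking messages as consumed. The toy in-memory model below (not the Kafka API; `InMemoryTopic` is a hypothetical stand-in with a single partition) shows the effect. Two groups read the same log, and one group falling behind leaves the other's position and lag untouched. Lag measured this way is also what you would compare against each group's `lag_tolerance_seconds` after converting it to time.

```python
from collections import defaultdict


class InMemoryTopic:
    """Toy single-partition topic: one shared log, one offset per group."""

    def __init__(self) -> None:
        self.log: list[dict] = []
        self.committed: dict[str, int] = defaultdict(int)

    def publish(self, event: dict) -> None:
        self.log.append(event)

    def poll(self, group: str, max_records: int) -> list[dict]:
        """Return up to max_records unread events and advance that group's offset."""
        start = self.committed[group]
        batch = self.log[start:start + max_records]
        self.committed[group] = start + len(batch)
        return batch

    def lag(self, group: str) -> int:
        """Number of events published but not yet read by `group`."""
        return len(self.log) - self.committed[group]


topic = InMemoryTopic()
for i in range(10):
    topic.publish({"event_id": i})

topic.poll("feature-materializer", max_records=10)  # keeps up
topic.poll("audit-logger", max_records=3)           # falls behind

print(topic.lag("feature-materializer"))  # 0
print(topic.lag("audit-logger"))          # 7
```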
Production Engineering Notes
Schema Evolution Without Breaking Consumers
Event schemas must evolve as ML features change. Safe evolution rules (compatible with existing consumers):
- Add new optional fields: consumers written to ignore unknown fields keep working; new consumers can use the new fields
- Never rename fields: renaming is a breaking change, because existing consumers still look for the old field name
- Never remove required fields: consumers that depend on them will fail
- Increment schema version: allows consumers to handle multiple versions explicitly
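```python
# schema_evolution.py - backward-compatible schema evolution
def _parse_version(version: str) -> tuple:
    """"2.10" -> (2, 10). Comparing raw strings is wrong: "10.0" >= "2.0" is False."""
    return tuple(int(part) for part in version.split("."))


def parse_purchase_event(payload: dict, schema_version: str) -> dict:
    """Parse a purchase event payload, handling multiple schema versions."""
    version = _parse_version(schema_version)
    if version == (1, 0):
        return {
            "item_id": payload["item_id"],
            "price": payload["amount"],  # v1.0 used "amount"
            "category": "unknown",       # not in v1.0
        }
    if version >= (2, 0):
        return {
            "item_id": payload["item_id"],
            "price": payload["price"],  # v2.0+ uses "price"
            "category": payload.get("category", "unknown"),
        }
    raise ValueError(f"unsupported purchase schema version: {schema_version}")
```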
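The evolution rules can also be checked mechanically before a new schema ships, which is the job a schema registry's compatibility mode does for Avro, Protobuf, or JSON Schema. The sketch below applies the same rules to a deliberately simple, hypothetical schema format (field name mapped to a `required` flag). It is not the API of any registry. It also treats "new fields must be optional" as protecting replay: new consumers will read old events that lack the field.

```python
def breaking_changes(old: dict, new: dict) -> list:
    """Return rule violations when evolving schema `old` into schema `new`.

    Schemas map field name -> {"required": bool} (hypothetical format).
    """
    problems = []
    for name, spec in old.items():
        if name not in new and spec["required"]:
            # Consumers still reading this field will fail; a rename looks the same.
            problems.append(f"required field '{name}' removed or renamed")
    for name, spec in new.items():
        if spec["required"] and not old.get(name, {}).get("required", False):
            # Old events in the log may lack this field, so replaying them breaks.
            problems.append(f"field '{name}' is required but may be absent from old events")
    return problems


V1 = {"item_id": {"required": True}, "amount": {"required": True}}
V2 = {"item_id": {"required": True}, "price": {"required": True},
      "category": {"required": False}}
V2_SAFE = {"item_id": {"required": True}, "amount": {"required": True},
           "category": {"required": False}}

print(breaking_changes(V1, V2))       # flags the amount -> price rename
print(breaking_changes(V1, V2_SAFE))  # []
```

Run on the v1.0 to v2.0 change handled in `parse_purchase_event`, the check reports the rename as breaking, which is why that parser has to branch on the schema version.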
Common Mistakes
:::danger Event Schema Without Timestamps An event schema without event_time is useless for ML. Every event must carry the timestamp of when it occurred in the real world (not when it was processed or when it arrived in Kafka). Without event timestamps: (a) temporal features cannot be computed correctly; (b) event replay produces wrong results (events played back in processing order, not occurrence order); (c) training/serving skew becomes likely, because offline pipelines fall back on whatever ordering they have (ingestion time, file order) while serving sees events in arrival order, and the two diverge whenever events arrive late or out of order. Make event_time a required field in every event schema, validated at publish time. :::
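Validation at publish time can be as small as the guard below. It is a minimal sketch that assumes events are dicts shaped like `MLEvent.to_kafka_message()` output, with `event_time` in epoch seconds. `validate_for_publish` and the five-minute future-skew bound are illustrative choices, not a standard. A schema registry can enforce field presence, but a sanity check on the timestamp value is application logic.

```python
import time
from typing import Optional

REQUIRED_FIELDS = ("event_id", "event_type", "event_version", "event_time")
MAX_FUTURE_SKEW_SECONDS = 300  # illustrative bound; tune per producer fleet


def validate_for_publish(event: dict, now: Optional[float] = None) -> None:
    """Raise ValueError if the event must not be published."""
    now = time.time() if now is None else now
    missing = [name for name in REQUIRED_FIELDS if event.get(name) in (None, "")]
    if missing:
        raise ValueError(f"missing required fields: {missing}")
    event_time = event["event_time"]
    # bool is a subclass of int, so exclude it explicitly
    if (isinstance(event_time, bool) or not isinstance(event_time, (int, float))
            or event_time <= 0):
        raise ValueError("event_time must be a positive epoch timestamp in seconds")
    if event_time > now + MAX_FUTURE_SKEW_SECONDS:
        raise ValueError("event_time is in the future; check the producer clock")


event = {"event_id": "e-1", "event_type": "ecommerce.purchase_completed",
         "event_version": "1.0", "event_time": time.time()}
validate_for_publish(event)  # passes silently; a bad event raises ValueError
```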
:::danger Not Versioning Event Schemas Publishing events to Kafka without schema versioning makes it impossible to evolve the schema without coordinating all consumers simultaneously. Use a schema registry (Confluent Schema Registry, AWS Glue Schema Registry) to register event schemas and enforce backward compatibility. Every event should carry its schema version. Consumers can gracefully handle multiple versions. Attempting to evolve an unversioned schema requires simultaneously deploying all consumer changes - operationally complex and error-prone. :::
:::warning Event Log Not Being the Source of Truth A common mistake: events are published to Kafka but the "real" source of truth is still the database. The database is written first; then an event is published (or worse, a change-data-capture job publishes events from the database change log). This means the event log is derived, not primary - it is subject to the database's consistency properties. If you want event sourcing benefits (replay, debugging, consistency), the event log must be the primary write path. The database is a derived read model, updated by consuming events. Not the other way around. :::
Interview Q&A
Q1: What is CQRS and how does it apply to ML feature pipelines?
A: CQRS (Command Query Responsibility Segregation) separates write operations from read operations, using an event log as the communication channel between them. In ML feature pipelines: the write side consists of domain events (user purchases, clicks, logins) published to Kafka by source services. Event processors consume these events and materialize read models (feature stores in Redis, aggregated statistics) that the ML model can query. The ML model's inference path reads from these pre-materialized feature stores - never writing to or blocking on the event stream. Benefits for ML: (1) The model reads fast, pre-computed features rather than doing expensive joins at query time; (2) Features can be updated independently of the model (add a new feature processor without changing the model); (3) Training and serving both derive features from the same event log, which removes a major source of training/serving skew when both also use the same feature code; (4) The system degrades gracefully when feature processors are slow - stale features in Redis are used rather than blocking inference.
Q2: What is event sourcing and why is it valuable for debugging ML systems?
A: Event sourcing stores the complete sequence of events that produced the current system state, rather than storing only current state. For ML debugging, this means: every feature value change has a corresponding event that caused it; every model prediction has a snapshot of the features at that moment; the entire history is immutable and replayable. When a production bug is discovered - "the model recommended item X to user Y who had just purchased X" - you can replay the event log for that user at that time and see exactly what state the feature store had, what the model saw, and which event was missing or delayed. Without event sourcing, debugging requires reconstructing system state from database backups, log files, and memory - a process that takes days or weeks and often cannot definitively answer "what exactly happened?"
Q3: How do dead letter queues work in ML inference pipelines and what information should they capture?
A: A dead letter queue (DLQ) receives events that failed processing after all retries are exhausted. In ML inference pipelines, events fail for two categories of reasons: retryable (model server down, timeout, GPU OOM - temporary issues) and non-retryable (malformed input, schema mismatch, missing required feature - persistent issues). The DLQ should capture: the original event payload, the failure reason and error message, the number of retries attempted, the timestamps of each attempt, and the source topic/partition/offset for replay. DLQ consumers can: alert on high DLQ rate (indicates systemic issue), replay events after the underlying issue is fixed, route non-retryable events to a human review queue for data quality investigation. Monitor DLQ depth as a primary health metric - events accumulating in the DLQ mean inference outcomes are missing for those events.
Q4: How does event replay enable testing ML pipeline changes safely?
A: Event replay allows you to run a new ML pipeline version against historical events from production and compare outputs with the current pipeline. For a recommendation system: replay 1 million user interaction events from last Tuesday through both the current and new recommendation model. Compare predictions - do they diverge significantly for certain user segments? Did the new model correctly filter purchased items that the old one missed? This catches regressions before deployment without affecting production users. It also validates feature engineering changes: if you add a new windowed feature, replay historical events and verify the feature computes to the expected values. Event replay requires: (a) events stored in Kafka with sufficient retention (7-30 days typical); (b) the ability to seek to the offset corresponding to a given timestamp; (c) deterministic feature computation (same events → same features every time); (d) an isolated replay environment that does not affect production state.
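A minimal sketch of requirements (c) and (d), assuming each pipeline version can be called as a pure function from a user's ordered events to a ranked list. The two toy models, `compare_on_replay`, and the in-memory history are hypothetical. In practice the history would come from a timestamp-seeked replay such as `EventReplayDebugger`, and outputs would go to an isolated store rather than be compared in memory.

```python
from typing import Callable

Pipeline = Callable[[list[dict]], list[str]]


def compare_on_replay(events_by_user: dict[str, list[dict]],
                      current: Pipeline, candidate: Pipeline) -> dict:
    """Replay each user's history through both versions and report divergence."""
    diverged = []
    for user_id, events in events_by_user.items():
        ordered = sorted(events, key=lambda e: e["event_time"])
        baseline = current(ordered)
        # Determinism check: the same events must yield the same output twice.
        if baseline != current(ordered):
            raise RuntimeError(f"current pipeline is non-deterministic for {user_id}")
        if baseline != candidate(ordered):
            diverged.append(user_id)
    return {"users": len(events_by_user), "diverged": sorted(diverged)}


# Two toy "models": the candidate adds the purchased-item filter.
CATALOG = ["A", "B", "C"]


def current_model(events: list[dict]) -> list[str]:
    return CATALOG[:2]


def candidate_model(events: list[dict]) -> list[str]:
    bought = {e["item_id"] for e in events if e["event_type"] == "purchase"}
    return [item for item in CATALOG if item not in bought][:2]


history = {
    "u1": [{"event_type": "purchase", "item_id": "A", "event_time": 1.0}],
    "u2": [{"event_type": "view", "item_id": "B", "event_time": 2.0}],
}
print(compare_on_replay(history, current_model, candidate_model))
# {'users': 2, 'diverged': ['u1']}
```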
Q5: What are the key design decisions for Kafka event schemas in ML pipelines?
A: Five critical decisions: (1) Always include event_time (occurrence time) and ingestion_time (processing time) - separately. Temporal features and late event handling require both. (2) Include entity identifiers consistently (user_id, session_id, item_id) as top-level fields, not buried in payload - consumers key/filter on these without deserializing the full payload. (3) Schema version as a top-level field - enables graceful migration when schemas evolve. Use additive changes (new optional fields) to maintain backward compatibility. (4) Include causality context - what triggered this event? (user action vs system action vs ML model action). This prevents feedback loops where model predictions cause events that retrain the model in a circular way. (5) Immutable identity - the event_id is globally unique and set once, when the event is created. Downstream systems use it for deduplication: with at-least-once delivery, skipping events whose event_id has already been processed is what turns duplicate deliveries into effectively-once processing. Never give two different events the same event_id, and never mint a new one when re-sending the same event; a retry must keep its original event_id, or deduplication cannot recognize it.
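A minimal sketch of the deduplication in point (5), assuming at-least-once delivery and events carrying a price in their payload like `PurchaseCompleted`. `DeduplicatingConsumer` is hypothetical, and its bounded in-memory ID window stands in for what would usually be a Redis set with a TTL or a unique constraint in the sink. It shows why a retry must keep its original `event_id`: a fresh ID would make the second delivery look like a second purchase.

```python
from collections import OrderedDict


class DeduplicatingConsumer:
    """Applies each event at most once, keyed by its event_id.

    Remembers the last `capacity` IDs; a redelivery older than that window
    would slip through, so size it to cover the redelivery horizon.
    """

    def __init__(self, capacity: int = 100_000) -> None:
        self.capacity = capacity
        self._seen = OrderedDict()  # event_id -> None, oldest first
        self.total_spend = 0.0

    def handle(self, event: dict) -> bool:
        """Return True if the event was applied, False if it was a duplicate."""
        event_id = event["event_id"]
        if event_id in self._seen:
            self._seen.move_to_end(event_id)  # keep recently seen IDs longer
            return False
        self._seen[event_id] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # forget the oldest ID
        self.total_spend += event["payload"]["price"]
        return True


consumer = DeduplicatingConsumer()
purchase = {"event_id": "e-1", "payload": {"price": 20.0}}
consumer.handle(purchase)  # True: applied
consumer.handle(purchase)  # False: the retry reused the same event_id
print(consumer.total_spend)  # 20.0
```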
