Event Sourcing for ML Systems
The Audit Trail Problem
A financial institution's credit scoring model made a decision in October: deny a loan application. The applicant appeals six months later. By April, the model has been retrained twice, the feature engineering pipeline has been updated, and the team that built the original model has partially turned over. The compliance team needs to answer one question: what features did the model see when it made that decision in October, and were they computed correctly?
In a conventional system, this question is unanswerable. The model was replaced. The feature values at that instant were never persisted - only the current state of each feature is stored. The PostgreSQL row for this user shows their current credit utilization, not what it was in October. You can reconstruct the model's decision approximately, but not exactly. For a loan denial under the Equal Credit Opportunity Act, "approximately" is not good enough.
Event sourcing solves this problem by changing the fundamental storage primitive. Instead of storing the current state of each entity, you store every event that ever changed that entity's state. The current state is derived from the event history by replaying events forward. You never lose history. You can reconstruct the exact state of any entity at any point in time by replaying events up to that moment.
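Here is a minimal sketch of that time-travel replay. It assumes a simple hypothetical event shape: dicts with a timezone-aware `occurred_at`, an `event_type`, and a `payload`. The event types `credit_utilization_updated` and `email_changed` are also illustrative, chosen to mirror the credit-scoring scenario above.

```python
from datetime import datetime, timezone


def state_as_of(events: list, as_of: datetime) -> dict:
    """Rebuild an entity's state from its events that occurred before `as_of`.

    Event types and payload fields here are hypothetical examples.
    """
    state: dict = {}
    # Replay in time order; later events overwrite earlier values.
    for event in sorted(events, key=lambda e: e["occurred_at"]):
        if event["occurred_at"] >= as_of:
            break  # ignore anything at or after the moment of interest
        if event["event_type"] == "credit_utilization_updated":
            state["credit_utilization"] = event["payload"]["utilization"]
        elif event["event_type"] == "email_changed":
            state["email"] = event["payload"]["email"]
    return state


def utc(*args) -> datetime:
    return datetime(*args, tzinfo=timezone.utc)


history = [
    {"occurred_at": utc(2024, 9, 1), "event_type": "credit_utilization_updated",
     "payload": {"utilization": 0.42}},
    {"occurred_at": utc(2024, 11, 20), "event_type": "credit_utilization_updated",
     "payload": {"utilization": 0.18}},
]

# What the model could have seen on the October decision date
october_state = state_as_of(history, utc(2024, 10, 15))
# The current state reflects the later update
current_state = state_as_of(history, utc(2025, 4, 1))
```

The October replay gives a utilization of 0.42, and today's replay gives 0.18. Both come from the same immutable history.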
For ML systems, event sourcing is more than an audit mechanism. It is the foundation for a class of features - behavioral history, sequence models, temporal aggregations - that simply cannot be computed correctly without a complete, immutable record of what happened and when. It is also the key to the "time travel" capability: train your model on features computed as of time T, not as of today, which is essential for avoiding data leakage in temporal datasets.
Why This Exists: State vs Events
The State-Based Model (How Most Systems Work)
In a conventional database-backed system, each entity has one row that represents its current state. When a user's email changes, you run UPDATE users SET email = 'new@example.com' WHERE id = 123. The old email is gone. When a product's price changes, you overwrite the price column. The history is gone.
This is the "current state" model. It is simple, fast for reads, and sufficient for many applications. It fails ML systems in three specific ways:
- No history for feature computation: Your recommendation model wants to know "how many items has this user viewed in the last 30 days?" You cannot answer this from a row showing current state. You need the event history.
- No time travel for training data: When you retrain your model, you want to compute features "as of" the time of each training example, not as of today. If a user purchased something in January and you are computing their behavioral features for that training example in March, you should see their January behavior, not their March behavior. Without an event log, this is impossible.
- No audit trail for compliance: When a model makes a high-stakes decision, you need to be able to reconstruct exactly what it saw. A mutable state database cannot provide this.
The Event-Sourced Model
In an event-sourced system, you store events, not state. Every change to an entity is recorded as an immutable event with a timestamp, an entity ID, an event type, and a payload.
```yaml
event_id: "evt_a1b2c3"
entity_type: "user"
entity_id: "user_12345"
event_type: "item_viewed"
payload: {"item_id": "item_789", "category": "electronics", "price_usd": 299}
occurred_at: "2024-01-15T14:32:11.123Z"
```
The current state of a user - their view count, last viewed category, total spend - is derived by replaying all their events from the beginning. This derived representation is called a projection.
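A projection is a left fold over the event history. This sketch reuses the example event format above. It assumes a hypothetical `item_purchased` event type whose payload has the same shape, and it assumes all timestamps use one UTC ISO 8601 format, so sorting the strings sorts them chronologically.

```python
from functools import reduce

INITIAL = {"view_count": 0, "last_viewed_category": None, "total_spend_usd": 0}


def apply_event(state: dict, event: dict) -> dict:
    """Fold one event into the user projection, returning a new dict."""
    payload = event["payload"]
    if event["event_type"] == "item_viewed":
        return {**state,
                "view_count": state["view_count"] + 1,
                "last_viewed_category": payload["category"]}
    if event["event_type"] == "item_purchased":  # hypothetical event type
        return {**state,
                "total_spend_usd": state["total_spend_usd"] + payload["price_usd"]}
    return state  # unknown event types leave the projection unchanged


def project(events: list) -> dict:
    """Derive current state by replaying events in occurrence order."""
    # Same-format UTC ISO 8601 strings sort chronologically as plain strings.
    ordered = sorted(events, key=lambda e: e["occurred_at"])
    return reduce(apply_event, ordered, INITIAL)


events = [
    {"event_type": "item_viewed", "occurred_at": "2024-01-15T14:32:11.123Z",
     "payload": {"item_id": "item_789", "category": "electronics", "price_usd": 299}},
    {"event_type": "item_purchased", "occurred_at": "2024-01-15T14:40:02.000Z",
     "payload": {"item_id": "item_789", "category": "electronics", "price_usd": 299}},
    {"event_type": "item_viewed", "occurred_at": "2024-01-16T09:05:00.000Z",
     "payload": {"item_id": "item_123", "category": "books", "price_usd": 15}},
]

user_state = project(events)
```

Because `apply_event` is a pure function, calling `project` again over the same log always rebuilds an identical projection.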
The Event Log as the Single Source of Truth
The event log is the canonical truth. Every other representation - the feature store, the analytics database, the audit trail - is a derived view of the event log. This means:
- Any projection can be rebuilt by replaying the event log from the beginning
- Multiple projections can coexist with different schemas optimized for different query patterns
- New projections can be added retroactively - replay all events through the new projection logic to populate a new store
- Bugs in projections are recoverable - fix the projection code, delete the incorrect projection, replay from the event log
This is why long retention in Kafka (or unlimited retention, made affordable for old data by tiered storage) is so important for event-sourced ML systems. Your event log must be authoritative and permanent.
Kafka as an Event Store for ML
Kafka is not just a message queue for ML systems. With appropriate configuration, it functions as a durable, replayable event store.
```python
import json
import uuid
from datetime import datetime, timezone
from typing import Optional

from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError


class MLEventStore:
    """
    Kafka-backed event store for ML systems.
    Provides: durable event storage, replayable history, topic-per-entity-type.
    """

    TOPICS = {
        "user_events": {
            "partitions": 64,
            "replication_factor": 3,
            "retention_ms": -1,  # infinite retention
            "compression_type": "lz4",
        },
        "item_events": {
            "partitions": 32,
            "replication_factor": 3,
            "retention_ms": -1,
        },
        "label_events": {
            "partitions": 16,
            "replication_factor": 3,
            "retention_ms": -1,
        },
    }

    def __init__(self, bootstrap_servers: list):
        self.bootstrap_servers = bootstrap_servers
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
            key_serializer=lambda k: k.encode("utf-8") if k else None,
            acks="all",  # wait for all in-sync replicas to acknowledge
            retries=3,
            # With retries enabled, more than one in-flight request can
            # reorder writes; 1 keeps per-partition (per-entity) ordering.
            max_in_flight_requests_per_connection=1,
            compression_type="lz4",  # needs the lz4 Python package
            linger_ms=5,  # small batching window for throughput
            batch_size=32768,
        )

    def ensure_topics(self) -> None:
        """Create the topics declared in TOPICS if they do not exist yet."""
        admin = KafkaAdminClient(bootstrap_servers=self.bootstrap_servers)
        try:
            for name, cfg in self.TOPICS.items():
                configs = {
                    "retention.ms": str(cfg["retention_ms"]),
                    # Delete-based cleanup: never compact raw event topics,
                    # since compaction keeps only the latest event per key.
                    "cleanup.policy": "delete",
                }
                if "compression_type" in cfg:
                    configs["compression.type"] = cfg["compression_type"]
                topic = NewTopic(
                    name=name,
                    num_partitions=cfg["partitions"],
                    replication_factor=cfg["replication_factor"],
                    topic_configs=configs,
                )
                try:
                    admin.create_topics(new_topics=[topic])
                except TopicAlreadyExistsError:
                    pass  # created on an earlier run
        finally:
            admin.close()

    def publish_event(
        self,
        topic: str,
        entity_id: str,
        event_type: str,
        payload: dict,
        occurred_at: Optional[datetime] = None,
    ) -> str:
        """
        Publish an immutable event to the event log.
        entity_id is used as the Kafka key, so all events for the same
        entity go to the same partition (per-entity ordering guarantee).
        """
        event_id = str(uuid.uuid4())
        ts = occurred_at or datetime.now(timezone.utc)
        event = {
            "event_id": event_id,
            "entity_id": entity_id,
            "event_type": event_type,
            "payload": payload,
            "occurred_at": ts.isoformat(),
            "schema_version": "v1",
        }
        future = self.producer.send(
            topic,
            key=entity_id,  # partition by entity for ordering
            value=event,
        )
        # Block to confirm delivery. In a latency-sensitive hot path, skip
        # the blocking get() and check send results asynchronously instead.
        future.get(timeout=10)
        return event_id

    def publish_user_event(
        self,
        user_id: str,
        event_type: str,
        payload: dict,
    ) -> str:
        return self.publish_event("user_events", user_id, event_type, payload)

    def publish_label_event(
        self,
        entity_id: str,
        label_type: str,
        label_value,
        reference_event_id: str,
        occurred_at: Optional[datetime] = None,
    ) -> str:
        """
        Publish a label event (ground truth signal).
        reference_event_id links the label back to the original event
        that generated the prediction.
        """
        payload = {
            "label_type": label_type,
            "label_value": label_value,
            "reference_event_id": reference_event_id,
        }
        return self.publish_event(
            "label_events", entity_id, "label_received", payload, occurred_at
        )
```
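For unit tests and local experiments, an in-memory stand-in with the same publish semantics can be useful. This is a hypothetical helper, not part of any Kafka client. It adds a `get_events(entity_id, max_timestamp, start_offset)` reader, matching the reader calls used by `PointInTimeFeatureComputer` and `SnapshotManager` later in this section. Offsets here are positions in one list, standing in for Kafka partition offsets, and timestamps must be timezone-aware.

```python
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional


class InMemoryEventStore:
    """Append-only, replayable event log for tests (hypothetical helper)."""

    def __init__(self):
        self._log: list = []  # only ever appended to, never mutated

    def publish_event(self, topic: str, entity_id: str, event_type: str,
                      payload: dict, occurred_at: Optional[datetime] = None) -> str:
        ts = occurred_at or datetime.now(timezone.utc)
        event = {
            "offset": len(self._log),
            "topic": topic,
            "event_id": str(uuid.uuid4()),
            "entity_id": entity_id,
            "event_type": event_type,
            "payload": payload,
            "occurred_at": ts.isoformat(),
            "schema_version": "v1",
        }
        self._log.append(event)
        return event["event_id"]

    def get_events(self, entity_id: str,
                   max_timestamp: Optional[datetime] = None,
                   start_offset: int = 0) -> list:
        """Replay one entity's events in log order, optionally bounded."""
        result = []
        for event in self._log[start_offset:]:
            if event["entity_id"] != entity_id:
                continue
            ts = datetime.fromisoformat(event["occurred_at"])
            if max_timestamp is not None and ts >= max_timestamp:
                continue  # strict upper bound: only events before max_timestamp
            result.append(dict(event))  # copy so callers cannot edit the log
        return result


store = InMemoryEventStore()
t0 = datetime(2024, 1, 15, tzinfo=timezone.utc)
store.publish_event("user_events", "user_1", "item_viewed", {"item_category": "books"}, t0)
store.publish_event("user_events", "user_2", "item_viewed", {"item_category": "toys"}, t0)
store.publish_event("user_events", "user_1", "item_purchased", {"item_id": "item_9"},
                    t0 + timedelta(days=1))
```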
Event Schema Design for ML
Designing event schemas for ML is different from designing them for application state. The schema must capture everything the model might ever need as a feature, even if you do not know exactly what features you will compute today.
Three Event Categories
Entity events capture state transitions. When a user's subscription changes from free to paid, record the full before-and-after state. This allows you to compute features like "was the user a paid subscriber at time T?"
Interaction events capture user actions. These are the most important for behavioral feature computation. Include everything relevant: user context (device, location, time of day), item context (position in list, price, category), and action details.
Label events capture ground truth signals. These are often delayed - a chargeback (fraud signal) arrives days after the transaction. Event sourcing handles delayed labels naturally because the label event carries its own occurred_at timestamp and a reference to the original event it labels.
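Recording the full before-and-after state pays off when answering "was the user a paid subscriber at time T?". This is a sketch under a hypothetical schema: each subscription event carries `payload.before.tier` and `payload.after.tier` plus a timezone-aware `occurred_at`. The `before` state even covers times earlier than the first recorded transition.

```python
from bisect import bisect_right
from datetime import datetime, timezone


def subscription_tier_at(events: list, as_of: datetime) -> str:
    """Return the tier in effect at `as_of` (hypothetical event schema)."""
    ordered = sorted(events, key=lambda e: e["occurred_at"])
    times = [e["occurred_at"] for e in ordered]
    # Number of transitions that happened at or before as_of
    i = bisect_right(times, as_of)
    if i == 0:
        # No transition yet: the first event's "before" state was in effect
        return ordered[0]["payload"]["before"]["tier"] if ordered else "unknown"
    return ordered[i - 1]["payload"]["after"]["tier"]


def utc(*args) -> datetime:
    return datetime(*args, tzinfo=timezone.utc)


changes = [
    {"occurred_at": utc(2024, 3, 1),
     "payload": {"before": {"tier": "free"}, "after": {"tier": "paid"}}},
    {"occurred_at": utc(2024, 8, 1),
     "payload": {"before": {"tier": "paid"}, "after": {"tier": "free"}}},
]
```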
Schema Example: Interaction Event
```python
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class ItemViewedEvent:
    """
    Schema for an item view event.
    Designed to capture all context needed for future feature computation.
    """

    # Entity identifiers
    user_id: str
    item_id: str
    session_id: str

    # Action context
    position_in_list: int  # where was the item shown?
    recommendation_source: str  # "collaborative_filter", "trending", "search"
    page_type: str  # "home", "search_results", "item_detail"

    # Item context at view time (snapshot: the item may change later)
    item_price_usd: float
    item_category: str
    item_is_on_sale: bool

    # User context at view time
    user_session_view_count: int  # items viewed so far this session
    user_device_type: str  # "mobile", "desktop", "tablet"
    user_country_code: str

    # Temporal
    occurred_at: Optional[str] = None  # ISO 8601 UTC; defaults to now

    def __post_init__(self):
        if self.occurred_at is None:
            self.occurred_at = datetime.now(timezone.utc).isoformat()

    def to_event_payload(self) -> dict:
        return asdict(self)


# Publishing an event
def record_item_view(
    event_store: "MLEventStore",
    user_id: str,
    item_id: str,
    **context,
) -> str:
    # Missing or unexpected context fields raise TypeError here,
    # before a malformed event can reach the permanent log.
    event = ItemViewedEvent(
        user_id=user_id,
        item_id=item_id,
        **context,
    )
    return event_store.publish_user_event(
        user_id=user_id,
        event_type="item_viewed",
        payload=event.to_event_payload(),
    )
```
Projections: Deriving Current State from Event History
A projection is a function that transforms a stream of events into a read-optimized data store. Each projection serves one specific query pattern.
```python
import json
from datetime import datetime, timezone
from typing import Optional

import redis
from kafka import KafkaConsumer

DAY_SECONDS = 86400
RETENTION_DAYS = 30  # sorted sets keep only this window


class UserFeatureProjection:
    """
    Projection: user_events -> Redis feature store.
    Maintains real-time aggregates for each user:
    - views_last_7d: item view count in the last 7 days
    - purchases_last_30d: purchase count in the last 30 days
    - favorite_category: most viewed category ever
    """

    def __init__(self, kafka_brokers: list, redis_host: str = "localhost"):
        self.consumer = KafkaConsumer(
            "user_events",
            bootstrap_servers=kafka_brokers,
            group_id="user_feature_projection_v2",
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
            auto_offset_reset="earliest",  # a new group replays from the beginning
        )
        self.redis = redis.Redis(host=redis_host, decode_responses=True)

    @staticmethod
    def _parse_ts(value: str) -> datetime:
        # fromisoformat() accepts a trailing "Z" only from Python 3.11 on
        return datetime.fromisoformat(value.replace("Z", "+00:00"))

    def apply_event(self, event: dict) -> None:
        """
        Update the projection for one event.
        This function must be idempotent: it may be called multiple times
        for the same event (redelivery or replay), so each event_id is
        applied at most once.
        """
        event_id = event["event_id"]
        # SET ... NX returns None if this event_id was already recorded.
        # Note: the marker and the updates below are not atomic.
        first_time = self.redis.set(
            f"processed_event:{event_id}", 1,
            nx=True, ex=RETENTION_DAYS * DAY_SECONDS,
        )
        if not first_time:
            return

        entity_id = event["entity_id"]
        event_type = event["event_type"]
        payload = event["payload"]
        occurred_at = self._parse_ts(event["occurred_at"])

        if event_type == "item_viewed":
            self._handle_view(entity_id, event_id, payload, occurred_at)
        elif event_type == "item_purchased":
            self._handle_purchase(entity_id, event_id, occurred_at)

    def _handle_view(self, user_id: str, event_id: str, payload: dict,
                     ts: datetime) -> None:
        key_prefix = f"user_features:{user_id}"
        pipe = self.redis.pipeline()
        # Sorted set: member=event_id, score=unix timestamp, so time ranges
        # can be queried with ZCOUNT/ZRANGEBYSCORE. Using event_id rather
        # than item_id counts repeat views of the same item separately.
        pipe.zadd(f"{key_prefix}:view_history", {event_id: ts.timestamp()})
        # Keep only the last 30 days in the sorted set to bound memory
        cutoff = ts.timestamp() - RETENTION_DAYS * DAY_SECONDS
        pipe.zremrangebyscore(f"{key_prefix}:view_history", "-inf", cutoff)
        # Increment category counter (all time, for favorite category)
        category = payload.get("item_category", "unknown")
        pipe.zincrby(f"{key_prefix}:category_counts", 1, category)
        pipe.execute()

    def _handle_purchase(self, user_id: str, event_id: str,
                         ts: datetime) -> None:
        key = f"user_features:{user_id}:purchase_history"
        pipe = self.redis.pipeline()
        pipe.zadd(key, {event_id: ts.timestamp()})
        cutoff = ts.timestamp() - RETENTION_DAYS * DAY_SECONDS
        pipe.zremrangebyscore(key, "-inf", cutoff)
        pipe.execute()

    def get_features(self, user_id: str,
                     as_of: Optional[datetime] = None) -> dict:
        """
        Compute features for a user, optionally as of an earlier time.
        Caveat: the sorted sets only retain the last 30 days, so `as_of`
        is point-in-time correct only inside that window, and
        favorite_category is always the current all-time value. For older
        training examples, replay the event log instead.
        """
        now = (as_of or datetime.now(timezone.utc)).timestamp()
        key_prefix = f"user_features:{user_id}"

        # Views in the last 7 days (ZCOUNT bounds are inclusive)
        views_7d = self.redis.zcount(
            f"{key_prefix}:view_history", now - 7 * DAY_SECONDS, now
        )
        # Purchases in the last 30 days
        purchases_30d = self.redis.zcount(
            f"{key_prefix}:purchase_history", now - 30 * DAY_SECONDS, now
        )
        # Favorite category (highest-scored member of the sorted set)
        fav_cats = self.redis.zrevrange(f"{key_prefix}:category_counts", 0, 0)
        favorite_category = fav_cats[0] if fav_cats else "unknown"

        return {
            "views_last_7d": int(views_7d),
            "purchases_last_30d": int(purchases_30d),
            "favorite_category": favorite_category,
        }

    def run(self) -> None:
        for message in self.consumer:
            try:
                self.apply_event(message.value)
            except Exception as e:
                # Sketch only: production code should not silently skip
                # events that fail to apply.
                print(f"[Projection] Error at offset {message.offset}: {e}")
```
Temporal Queries: "What Did the Model See at Time T?"
This is the killer application of event sourcing for ML. When you are preparing training data, you need features computed "as of" the time of each training example - not as of today. This is called point-in-time correct feature computation, and it prevents the most insidious form of data leakage: training a model that has access to information it would not have had at the time of the prediction.
```python
from datetime import datetime


class PointInTimeFeatureComputer:
    """
    Compute features for training examples using only information
    available at the time of the example.
    This is what separates real event-sourced ML systems from systems
    that accidentally leak future information into training data.
    """

    def __init__(self, event_store_reader):
        # Assumed reader interface: get_events(entity_id=..., max_timestamp=...)
        # returns the entity's events as dicts. Use timezone-aware datetimes.
        self.event_store = event_store_reader

    def compute_user_features_at_time(
        self,
        user_id: str,
        as_of: datetime,
    ) -> dict:
        """
        Replay the user's events up to `as_of` and compute features.
        Only events that occurred strictly BEFORE as_of are included.
        """
        events = self.event_store.get_events(
            entity_id=user_id,
            max_timestamp=as_of,
        )

        as_of_ts = as_of.timestamp()
        cutoff_30d = as_of_ts - 30 * 86400
        views_30d = 0
        purchases_30d = 0
        category_counts: dict = {}

        for event in events:
            # fromisoformat() accepts a trailing "Z" only from Python 3.11 on
            ts = datetime.fromisoformat(
                event["occurred_at"].replace("Z", "+00:00")
            ).timestamp()
            if ts >= as_of_ts:
                continue  # enforce the strict upper bound locally as well
            if event["event_type"] == "item_viewed":
                if ts >= cutoff_30d:
                    views_30d += 1
                # favorite_category counts every view before as_of
                category = event["payload"].get("item_category", "unknown")
                category_counts[category] = category_counts.get(category, 0) + 1
            elif event["event_type"] == "item_purchased":
                if ts >= cutoff_30d:
                    purchases_30d += 1

        favorite_category = (
            max(category_counts, key=category_counts.get)
            if category_counts else "unknown"
        )
        return {
            "views_last_30d": views_30d,
            "purchases_last_30d": purchases_30d,
            "favorite_category": favorite_category,
            "computed_as_of": as_of.isoformat(),
        }

    def build_training_dataset(
        self,
        examples: list,  # [{"user_id": ..., "occurred_at": ..., "label": ...}]
    ) -> list:
        """
        For each training example, compute features as of the example's time.
        This ensures no future leakage.
        """
        dataset = []
        for example in examples:
            as_of = datetime.fromisoformat(
                example["occurred_at"].replace("Z", "+00:00")
            )
            features = self.compute_user_features_at_time(
                user_id=example["user_id"],
                as_of=as_of,
            )
            dataset.append({
                **features,
                "label": example["label"],
                "user_id": example["user_id"],
            })
        return dataset
```
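Replaying each user's full history once per training example gets expensive when one user contributes many examples. One alternative is a different technique from the replay loop above, offered here as an assumption-laden sketch rather than the method used by the class: sort each user's event timestamps once, then answer every "count in [as_of - window, as_of)" query with two binary searches. The class name and the `(user_id, occurred_at)` input shape are hypothetical.

```python
from bisect import bisect_left
from collections import defaultdict
from datetime import datetime, timedelta, timezone


class AsOfWindowCounter:
    """Count a user's events in [as_of - window, as_of) via binary search."""

    def __init__(self, events: list):
        # events: (user_id, occurred_at) pairs with timezone-aware datetimes
        by_user = defaultdict(list)
        for user_id, occurred_at in events:
            by_user[user_id].append(occurred_at)
        self._times = {u: sorted(ts) for u, ts in by_user.items()}

    def count(self, user_id: str, as_of: datetime, window: timedelta) -> int:
        times = self._times.get(user_id, [])
        hi = bisect_left(times, as_of)  # excludes events at exactly as_of
        lo = bisect_left(times, as_of - window)  # includes the window start
        return hi - lo


t = datetime(2024, 3, 1, tzinfo=timezone.utc)
views = [("u1", t - timedelta(days=40)), ("u1", t - timedelta(days=10)),
         ("u1", t - timedelta(days=1)), ("u1", t), ("u1", t + timedelta(days=5))]
counter = AsOfWindowCounter(views)
```

Each lookup costs O(log n) after a one-time sort. The window boundaries match the replay loop: the start is inclusive and `as_of` itself is excluded.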
Snapshots: Avoiding Full Replay
Replaying all events from the beginning is correct but slow. For a user with 5 years of behavioral history (tens of thousands of events), replaying everything to compute current features takes too long. Snapshots solve this.
A snapshot is a periodic checkpoint of a projection's state. Instead of replaying all events from event 0, you load the most recent snapshot and replay only the events that occurred after the snapshot.
```python
import json
from datetime import datetime
from typing import Optional


class SnapshotManager:
    """
    Manages snapshots for event-sourced projections.
    Snapshots reduce replay time from O(all events) to O(events since last snapshot).
    """

    def __init__(self, s3_client, bucket: str = "ml-snapshots"):
        self.s3 = s3_client  # e.g. a boto3 S3 client
        self.bucket = bucket

    def save_snapshot(
        self,
        entity_type: str,
        entity_id: str,
        state: dict,
        last_event_offset: int,
        snapshot_ts: datetime,
    ) -> None:
        """
        Save current state as a snapshot with the last processed event offset.
        Kafka offsets are per partition; keying events by entity_id keeps
        each entity in one partition, so the offset is unambiguous for it.
        """
        key = f"snapshots/{entity_type}/{entity_id}/latest.json"
        snapshot = {
            "entity_id": entity_id,
            "state": state,
            "last_event_offset": last_event_offset,
            "snapshot_ts": snapshot_ts.isoformat(),
        }
        self.s3.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=json.dumps(snapshot),
            ContentType="application/json",
        )

    def load_snapshot(
        self,
        entity_type: str,
        entity_id: str,
    ) -> Optional[dict]:
        """Load the most recent snapshot. Returns None if no snapshot exists."""
        key = f"snapshots/{entity_type}/{entity_id}/latest.json"
        try:
            response = self.s3.get_object(Bucket=self.bucket, Key=key)
            return json.loads(response["Body"].read())
        except self.s3.exceptions.NoSuchKey:
            return None

    def get_events_since_snapshot(
        self,
        entity_id: str,
        snapshot: Optional[dict],
        event_store,
    ) -> list:
        """
        If a snapshot exists, fetch only events after the snapshot.
        If there is no snapshot, fetch all events from the beginning.
        event_store is assumed to expose get_events(entity_id, start_offset).
        """
        if snapshot:
            start_offset = snapshot["last_event_offset"] + 1
        else:
            start_offset = 0
        return event_store.get_events(
            entity_id=entity_id,
            start_offset=start_offset,
        )
```
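Snapshots are only safe if "snapshot plus tail" produces exactly the same state as a full replay. That holds when the projection is a deterministic function of the events. This in-memory sketch checks the property, using a hypothetical flattened event shape where list indices double as offsets.

```python
def apply_event(state: dict, event: dict) -> dict:
    """One deterministic projection step (hypothetical event shape)."""
    if event["event_type"] == "item_viewed":
        return {**state, "views": state["views"] + 1}
    if event["event_type"] == "item_purchased":
        return {**state, "spend_usd": state["spend_usd"] + event["price_usd"]}
    return state


def replay(events: list, state: dict) -> dict:
    for event in events:
        state = apply_event(state, event)
    return state


INITIAL = {"views": 0, "spend_usd": 0}

log = [
    {"offset": i, "event_type": kind, "price_usd": price}
    for i, (kind, price) in enumerate([
        ("item_viewed", 0), ("item_purchased", 20), ("item_viewed", 0),
        ("item_viewed", 0), ("item_purchased", 5),
    ])
]

# Snapshot taken after processing offsets 0..2
snapshot = {"state": replay(log[:3], INITIAL), "last_event_offset": 2}

# Resume: start from the snapshot state, replay only later offsets
resumed = replay(log[snapshot["last_event_offset"] + 1:], snapshot["state"])
full = replay(log, INITIAL)
```

Here `resumed` and `full` are both `{"views": 3, "spend_usd": 25}`. The resumed path touched two events instead of five.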
Real Example: Uber Michelangelo
Uber's Michelangelo ML platform (published 2017) uses event sourcing as the foundation of its feature computation pipeline. User actions (trip requests, driver assignments, ratings, payments) are published to Kafka topics. Michelangelo's data pipeline consumes these events and projects them into a unified feature store - Hive for batch features, Cassandra for online features.
The temporal query capability is built into Michelangelo's training data generation: when you initiate a training run, you specify the time range of training examples, and the system computes features "as of" each example's timestamp by joining against a snapshot of the feature values at that time. This prevents leakage and ensures training features match serving features.
Uber has described its Kafka deployment as one of the largest in the world, handling trillions of messages per day. Recent events stay in Kafka, while older data is archived to long-term storage (HDFS). This gives them both low-latency replay for recent events and full historical replay for training data generation.
:::danger Projection Idempotency
Every projection must be idempotent: applying the same event twice produces the same state as applying it once. This sounds obvious, but it is easy to violate. If your projection does INCR views_count for each view event, and Kafka delivers the same event twice (which it can, due to at-least-once delivery semantics), you will over-count.
Solution: use the event_id as an idempotency key. Before applying an event, check whether you have already processed this event_id. For example, write a per-event marker key with a TTL (SET processed:<event_id> 1 NX EX <seconds>) and skip the event if the key already exists. Alternatively, design your projection operations to be naturally idempotent: use SET key value instead of INCR, store each event in a sorted set with its event_id as the member, and recompute aggregates from the set rather than incrementally updating them.
:::
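Here is a minimal in-memory illustration of the idempotency-key approach. A Python set stands in for the per-event marker keys you would keep in Redis, and the class name is hypothetical. In this sketch the set grows without bound; a real store would expire markers with a TTL.

```python
class IdempotentViewCounter:
    """Per-user view counts that tolerate redelivered events."""

    def __init__(self):
        self.counts: dict = {}
        self._processed: set = set()  # stands in for SET ... NX marker keys

    def apply(self, event: dict) -> bool:
        """Apply an event at most once; return False for a duplicate."""
        if event["event_id"] in self._processed:
            return False
        self._processed.add(event["event_id"])
        user = event["entity_id"]
        self.counts[user] = self.counts.get(user, 0) + 1
        return True


e1 = {"event_id": "evt_1", "entity_id": "u1"}
e2 = {"event_id": "evt_2", "entity_id": "u1"}
deliveries = [e1, e2, e1]  # evt_1 is redelivered (at-least-once)

counter = IdempotentViewCounter()
applied = [counter.apply(e) for e in deliveries]
```

A naive INCR-style counter would report 3 views for `u1`. The idempotent version reports 2.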
:::warning Event Schema Evolution
Events are immutable and permanent. You cannot change an event you have already published. This means your event schema must be designed for evolution from day one. Use a schema registry (Confluent Schema Registry, AWS Glue Schema Registry) to version your event schemas. Use backward-compatible changes only: add optional fields, never remove required fields, never change field types. When you need a breaking change, introduce a new event type and handle both the old and new event types in your projections. With unlimited retention the old events never age out, so projections must keep handling both types for as long as the old events remain in the log. :::
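A common complement to keeping old event types around is upcasting. Here, old events are translated into the current shape at read time, so projection code only handles the latest version, and the stored events are never modified. This sketch assumes a hypothetical change: v1 `item_viewed` payloads store `price_usd`, and v2 payloads store a `price` object with an explicit currency.

```python
def upcast(event: dict) -> dict:
    """Return the event in the latest (v2) shape without mutating the log."""
    version = event.get("schema_version", "v1")
    if version == "v2":
        return event
    if version == "v1":
        payload = dict(event["payload"])  # copy: stored events stay immutable
        amount = payload.pop("price_usd")
        payload["price"] = {"amount": amount, "currency": "USD"}
        return {**event, "payload": payload, "schema_version": "v2"}
    raise ValueError(f"unknown schema_version: {version}")


old = {"event_type": "item_viewed", "schema_version": "v1",
       "payload": {"item_id": "item_789", "price_usd": 299}}
new = upcast(old)
```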
Interview Q&A
Q1: What is event sourcing and why is it useful for ML systems?
Event sourcing is an architectural pattern where you store a log of events that happened (facts about the past) rather than the current state of entities. The current state is derived by replaying events. For ML systems, event sourcing provides three critical capabilities: (1) complete behavioral history for feature computation - you can compute any temporal aggregate over any time window; (2) point-in-time correct features for training - you can compute features "as of" any historical timestamp, preventing data leakage; (3) full auditability - you can reconstruct exactly what the model saw when it made any historical decision, which is essential for compliance, debugging, and appeals processes.
Q2: How do you handle delayed labels in an event-sourced ML system?
Delayed labels are natural in event sourcing because label events are just events with their own timestamps. A fraud chargeback that arrives 7 days after the transaction is published as a label event (label_type = fraud_confirmed) with occurred_at = the chargeback date and reference_event_id = the original transaction event ID.
The training pipeline handles this by: (1) generating candidate training examples from transaction events; (2) waiting for a label observation window (e.g., 14 days) to close before including an example in the training dataset; (3) joining transaction events with label events by reference_event_id to attach the label. Examples that did not receive a label within the observation window are labeled as negative (non-fraud). This mirrors common practice in fraud labeling, with one caveat: a chargeback that arrives after the window closes leaves a fraudulent example labeled negative, so the window length trades label freshness against label noise.
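The three steps can be sketched as a single join. The flattened shapes are assumptions: transactions carry `event_id` and `occurred_at`, and label events carry `reference_event_id` and `occurred_at`, mirroring the `publish_label_event` payload. Every label event is assumed to be a fraud confirmation.

```python
from datetime import datetime, timedelta, timezone


def label_transactions(transactions, label_events, now,
                       window=timedelta(days=14)):
    """Return (event_id, label) pairs for transactions whose window closed.

    label = 1 if a fraud label arrived within the window, else 0.
    """
    # Earliest label time per referenced transaction
    first_label = {}
    for lab in label_events:
        ref = lab["reference_event_id"]
        if ref not in first_label or lab["occurred_at"] < first_label[ref]:
            first_label[ref] = lab["occurred_at"]

    dataset = []
    for txn in transactions:
        window_end = txn["occurred_at"] + window
        if window_end > now:
            continue  # window still open: the label may not have arrived yet
        label_time = first_label.get(txn["event_id"])
        is_fraud = label_time is not None and label_time <= window_end
        dataset.append((txn["event_id"], int(is_fraud)))
    return dataset


def utc(*args) -> datetime:
    return datetime(*args, tzinfo=timezone.utc)


transactions = [
    {"event_id": "t1", "occurred_at": utc(2024, 2, 1)},
    {"event_id": "t2", "occurred_at": utc(2024, 2, 2)},
    {"event_id": "t3", "occurred_at": utc(2024, 2, 25)},  # window still open
    {"event_id": "t4", "occurred_at": utc(2024, 1, 1)},
]
labels = [
    {"reference_event_id": "t1", "occurred_at": utc(2024, 2, 8)},
    {"reference_event_id": "t4", "occurred_at": utc(2024, 2, 1)},  # too late
]
labeled = label_transactions(transactions, labels, now=utc(2024, 3, 1))
```

In this example `t3` is held back because its window is still open. `t4` shows the caveat above: its chargeback arrived after the window closed, so it is labeled 0.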
Q3: Explain the projection pattern and how it differs from a traditional database query.
A projection is a derived read model built by replaying events through a transformation function. Unlike a traditional database query (which reads from a mutable table), a projection is: (1) built from immutable events, so it can always be rebuilt if corrupted; (2) optimized for one specific query pattern rather than being a general-purpose table; (3) eventually consistent - the projection is updated asynchronously as events arrive, not synchronously as in a transactional database.
For ML, a typical projection works like this: consume all item_viewed events from Kafka and maintain a sorted set in Redis per user_id, where each member identifies a view and the score is the view timestamp. This projection answers "how many items has this user viewed in the last N days?" with ZCOUNT in O(log n) time, without scanning a large transaction table.
Q4: How does Uber Michelangelo use event sourcing for feature computation?
Uber publishes all application events (trip requests, completions, payments, ratings, driver locations) to Kafka topics. Michelangelo's data pipeline consumes these events in two modes: batch (Spark jobs reading Kafka topics via HDFS archive, producing features in Hive) and streaming (Flink jobs consuming from Kafka in real time, producing features in Cassandra). Both modes read from the same event source, ensuring consistency.
When a training run is initiated, Michelangelo's training data generation joins the label dataset (e.g., "did this driver complete the trip?") with point-in-time correct features from Hive - specifically, the feature values that were present at the time of each training example, not the current values. This prevents leakage and is only possible because the full event history is stored, not just current state.
Q5: What is the difference between an event store and a message queue?
A message queue (RabbitMQ, SQS, or Kafka with short retention) is designed for temporary message passing. Messages are deleted once consumed (RabbitMQ, SQS) or once the retention period expires (Kafka). An event store is permanent: events are never deleted and can be replayed arbitrarily. The key differences: (1) retention - an event store keeps events indefinitely, while a message queue discards them; (2) replay - an event store supports seeking to any offset and replaying the full history, while a message queue can replay at most what it still retains; (3) purpose - a message queue decouples producers from consumers, while an event store is the authoritative system of record.
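In practice, Kafka with retention.ms=-1 (unlimited) and the default cleanup.policy=delete functions as an event store. Log compaction (cleanup.policy=compact) keeps only the latest record per key. That suits derived state such as snapshot or changelog topics, but compaction must not be used on raw event topics keyed by entity_id, where it would discard all but each entity's most recent event. Many production ML systems use this setup, sometimes combined with Kafka tiered storage (introduced in Kafka 3.6) to move cold data to object storage such as S3 at much lower cost than broker disks.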
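Log compaction retains only the most recent record for each key. The simulation below shows what that does to a topic keyed by entity_id, like the event topics in this section. It is a simplification: real compaction runs in the background, never touches the active segment, and also handles tombstones.

```python
def compact(log: list) -> list:
    """Simulate log compaction: keep only the latest record for each key.

    Surviving records keep their original relative order.
    """
    latest_offset = {}
    for offset, (key, _value) in enumerate(log):
        latest_offset[key] = offset
    keep = set(latest_offset.values())
    return [record for offset, record in enumerate(log) if offset in keep]


log = [
    ("user_1", "item_viewed:a"),
    ("user_2", "item_viewed:b"),
    ("user_1", "item_viewed:c"),
    ("user_1", "item_purchased:c"),
]
compacted = compact(log)
```

After compaction only `user_1`'s purchase survives, and both of its earlier view events are gone. No projection can recover them from this topic.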
Summary
Event sourcing stores immutable events rather than mutable state, enabling the three capabilities that ML systems need most: complete behavioral history for feature computation, point-in-time correct features for leak-free training, and full auditability for compliance and debugging. Kafka with long retention serves as the event store. Projections derive read-optimized views from the event log - feature stores in Redis, training datasets in S3, audit trails in a document store. Snapshots avoid full replay overhead by providing periodic checkpoints. Delayed labels are handled naturally because label events carry their own timestamps. The pattern is foundational at Uber Michelangelo and any ML platform that needs to answer "what did the model see when it made that decision?"
