Event-Driven ML Architecture

The Production Scenario

A food delivery platform has built a rider ETA model that runs every time a user opens the order tracking screen. The model needs five features: distance from rider to restaurant, distance from restaurant to customer, current traffic conditions, time of day, and a rider-specific speed estimate based on recent deliveries. The first four are easy. The fifth - rider speed - requires knowing the last 10 GPS pings for the rider and computing distance/time.

The naive implementation: when the user opens the tracking screen, the ETA service queries the database for the last 10 rider GPS records, computes the speed estimate, assembles all five features, and calls the model. Database query latency: 80ms. Total ETA response: 150ms. The user sees a loading spinner for 150ms before the map loads.

The team optimizes by switching to an event-driven approach. Every GPS ping from a rider device is published as an event to a Kafka topic. A Flink job processes these events, maintains a per-rider rolling speed estimate, and writes the result to Redis. Now the ETA service reads from Redis (2ms) instead of querying the database. Total ETA response: 25ms. The loading spinner disappears.
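The per-rider rolling speed estimate can be sketched in plain Python. This is a minimal illustration, not the Flink job itself: the `RollingSpeedEstimator` class, the `haversine_m` helper, and the choice of great-circle distance are assumptions made for the example. It keeps the last 10 pings and divides path length by elapsed time.

```python
import math
from collections import deque

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius in meters


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two WGS84 points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = (
        math.sin(dphi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    )
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


class RollingSpeedEstimator:
    """Keeps the last `window` pings for one rider and estimates speed in m/s."""

    def __init__(self, window: int = 10):
        # Each entry is (latitude, longitude, event_timestamp_ms);
        # deque(maxlen=...) drops the oldest ping automatically.
        self.pings = deque(maxlen=window)

    def add_ping(self, lat, lon, timestamp_ms):
        self.pings.append((lat, lon, timestamp_ms))

    def speed_mps(self):
        """Path length over elapsed time, or None if it cannot be computed."""
        if len(self.pings) < 2:
            return None
        pings = list(self.pings)
        distance_m = sum(
            haversine_m(a[0], a[1], b[0], b[1]) for a, b in zip(pings, pings[1:])
        )
        elapsed_s = (pings[-1][2] - pings[0][2]) / 1000.0
        if elapsed_s <= 0:
            return None
        return distance_m / elapsed_s


estimator = RollingSpeedEstimator(window=10)
estimator.add_ping(47.6062, -122.3321, 1704067200000)
estimator.add_ping(47.6058, -122.3325, 1704067205000)
current_speed = estimator.speed_mps()  # value that would be written to Redis
```

In the event-driven design, one such estimator would live in the stream processor's per-rider state, so the serving path reads only the finished number.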

But more importantly: the ETA service is now decoupled from the GPS processing logic. The GPS event stream is consumed independently by multiple systems - the ETA model, the fraud detection system (detecting GPS spoofing), and the payments system (computing delivery completion). Adding a new consumer of GPS events requires no changes to the GPS publishing code. This is the fundamental value of event-driven architecture: decoupling producers from consumers through a shared event log.

Why This Exists - The Coupling Problem in ML Systems

Direct service calls create tight coupling. When the ETA service calls the GPS service directly (HTTP or gRPC), the ETA service depends on the GPS service's availability, latency, and API contract. If the GPS service is slow, the ETA service is slow. If the GPS service changes its API, the ETA service breaks.

More subtly: direct calls create a dependency on the GPS service's processing rate. At peak - 100,000 active deliveries, each sending a GPS ping every 5 seconds - the GPS service receives 20,000 pings per second. Every ETA request that calls the GPS service directly adds to this load. The GPS service needs to scale with both GPS ping volume and ETA request volume simultaneously.

Event-driven architecture breaks this dependency. GPS events are published to Kafka. The Flink feature pipeline consumes events at its own pace, maintaining per-rider state in Redis. The ETA service reads pre-computed features from Redis. GPS publishing, feature computation, and ETA serving are three separate concerns, each scalable independently.

Historical Context

Event-driven architecture has roots in the publish-subscribe pattern and message queuing systems of the 1990s. The modern event-driven patterns - event sourcing, CQRS, and the outbox pattern - were popularized by the Domain-Driven Design community from the mid-2000s onward, particularly through Greg Young's work on CQRS and event sourcing (around 2010) and Martin Fowler's write-ups of event sourcing (2005) and CQRS (2011).

The application to ML systems was driven by the scale requirements of social networks and e-commerce platforms. LinkedIn's description of their Unified Messaging Platform (2021) is a canonical reference: billions of events per day processed by dozens of ML pipelines, all decoupled through Kafka. The pattern is now standard at any company with more than a handful of ML models consuming real-time data.

Core Concepts

Event Sourcing

Event sourcing stores the history of events as the primary source of truth, not the current state. Instead of storing "rider speed = 12.5 km/h," you store every GPS ping: "(rider_123, 47.6062, -122.3321, 1704067200000ms), (rider_123, 47.6058, -122.3325, 1704067205000ms), ..."

The current state (speed) is computed by replaying the events. This has a profound implication for ML: you can recompute any feature at any historical point in time by replaying the event log from that point. Training data for the ETA model can be constructed from historical GPS event replay - the exact features that would have been computed at each point in history, using the exact same feature computation code used in production.

The most important ML benefit of event sourcing: it eliminates training-serving skew in feature computation by construction. When training data is generated by replaying the same event stream through the same feature computation code, keyed on event time, the features in training match the features in serving.
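A small sketch of replay-based training data, assuming a simplified event with only a rider ID and an event timestamp. The `Event` type, the `pings_in_window` feature, and `build_training_rows` are hypothetical names for this example. The same feature function serves both paths, and replay only exposes events that had already occurred at each label time.

```python
from typing import List, NamedTuple, Tuple


class Event(NamedTuple):
    rider_id: str
    event_timestamp_ms: int


def pings_in_window(events, rider_id, as_of_ms, window_ms=60_000):
    """Feature logic shared by serving and training: number of pings from
    `rider_id` in the window (as_of_ms - window_ms, as_of_ms]."""
    return sum(
        1
        for e in events
        if e.rider_id == rider_id
        and as_of_ms - window_ms < e.event_timestamp_ms <= as_of_ms
    )


def build_training_rows(
    event_log: List[Event], label_times: List[Tuple[str, int]]
):
    """Replay: for each (rider_id, as_of_ms) pair, compute the feature using
    only events whose event time is not later than as_of_ms."""
    rows = []
    for rider_id, as_of_ms in label_times:
        visible = [e for e in event_log if e.event_timestamp_ms <= as_of_ms]
        rows.append(
            (rider_id, as_of_ms, pings_in_window(visible, rider_id, as_of_ms))
        )
    return rows


event_log = [
    Event("r1", 1_000),
    Event("r1", 30_000),
    Event("r2", 40_000),
    Event("r1", 70_000),
]
training_rows = build_training_rows(event_log, [("r1", 60_000), ("r1", 90_000)])
```

Filtering by event time before calling the feature function is what keeps future events from leaking into a historical training row.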

CQRS: Separate Read and Write Paths for ML Features

CQRS (Command Query Responsibility Segregation) separates the write path (feature computation from events) from the read path (feature lookup at serving time). For ML, this maps naturally:

  • Command side (write): Flink job processes events and writes features to Redis
  • Query side (read): Model serving layer reads features from Redis

The read side can be optimized independently of the write side. If the read pattern requires low-latency lookups by user ID, use Redis. If the read pattern requires complex queries (give me all users with feature X greater than Y), use a different store (Elasticsearch, Cassandra) consuming from the same write path.
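The split can be sketched with in-memory stand-ins. In this illustration the dictionaries replace Redis and a query-oriented store, and all class names are hypothetical. One write path fans out to two read models, each shaped for its own query pattern.

```python
class KeyLookupReadModel:
    """Query side for serving: point lookups by rider_id (stands in for Redis)."""

    def __init__(self):
        self._by_rider = {}

    def apply(self, rider_id, speed_kmh):
        self._by_rider[rider_id] = speed_kmh

    def get(self, rider_id):
        return self._by_rider.get(rider_id)


class RangeQueryReadModel:
    """Query side for analysis: 'all riders with speed greater than X'."""

    def __init__(self):
        self._speeds = {}

    def apply(self, rider_id, speed_kmh):
        self._speeds[rider_id] = speed_kmh

    def riders_faster_than(self, threshold_kmh):
        return sorted(r for r, s in self._speeds.items() if s > threshold_kmh)


class FeatureWriteSide:
    """Command side: applies each computed feature to every read model."""

    def __init__(self, read_models):
        self.read_models = read_models

    def handle_speed_update(self, rider_id, speed_kmh):
        for model in self.read_models:
            model.apply(rider_id, speed_kmh)


lookup = KeyLookupReadModel()
ranged = RangeQueryReadModel()
writer = FeatureWriteSide([lookup, ranged])

writer.handle_speed_update("rider_1", 12.5)
writer.handle_speed_update("rider_2", 22.0)
writer.handle_speed_update("rider_1", 14.0)  # overwrites the earlier value
```

Adding a third read model means registering one more object with the write side; the existing read models and their callers are untouched.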

Event Schema Design for ML

Event schemas define the contract between producers and consumers. For ML, poorly designed schemas cause two classes of bugs: parsing failures (events that cannot be deserialized) and semantic skew (events that parse successfully but contain different semantics than expected).

Avro for ML Events

Avro is a widely used schema format for Kafka events in ML systems. Its schemas can be stored and versioned in a schema registry (such as Confluent Schema Registry), records are compactly encoded (binary), and it supports backward/forward compatibility rules:

// gps_ping.avsc - Avro schema for GPS events
{
  "type": "record",
  "name": "GpsPing",
  "namespace": "com.delivery.events",
  "doc": "GPS location ping from a rider device.",
  "fields": [
    {
      "name": "rider_id",
      "type": "string",
      "doc": "Unique rider identifier"
    },
    {
      "name": "latitude",
      "type": "double",
      "doc": "WGS84 latitude in degrees"
    },
    {
      "name": "longitude",
      "type": "double",
      "doc": "WGS84 longitude in degrees"
    },
    {
      "name": "event_timestamp_ms",
      "type": "long",
      "doc": "Event time: milliseconds since Unix epoch"
    },
    {
      "name": "accuracy_meters",
      "type": ["null", "float"],
      "default": null,
      "doc": "GPS accuracy estimate in meters. Null if unknown."
    },
    {
      "name": "speed_mps",
      "type": ["null", "float"],
      "default": null,
      "doc": "Device-reported speed in meters per second. Null if unavailable."
    },
    {
      "name": "schema_version",
      "type": "int",
      "default": 1,
      "doc": "Schema version for debugging. Use Schema Registry for compatibility."
    }
  ]
}

Producer code:

# gps_event_producer.py
import time
from typing import Optional

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField


schema_registry_client = SchemaRegistryClient({"url": "http://schema-registry:8081"})

# Load the schema definition from a local file. The file must contain only
# the JSON object (no leading comment line).
with open("gps_ping.avsc") as schema_file:
    gps_schema_str = schema_file.read()

avro_serializer = AvroSerializer(
    schema_registry_client=schema_registry_client,
    schema_str=gps_schema_str,
)

producer = Producer({"bootstrap.servers": "kafka:9092"})


def publish_gps_ping(
    rider_id: str,
    latitude: float,
    longitude: float,
    accuracy_meters: Optional[float] = None,
    speed_mps: Optional[float] = None,
):
    event = {
        "rider_id": rider_id,
        "latitude": latitude,
        "longitude": longitude,
        "event_timestamp_ms": int(time.time() * 1000),
        "accuracy_meters": accuracy_meters,
        "speed_mps": speed_mps,
        "schema_version": 1,
    }

    producer.produce(
        topic="gps-pings",
        key=rider_id.encode(),  # Partition by rider_id for ordering
        value=avro_serializer(
            event,
            SerializationContext("gps-pings", MessageField.VALUE),
        ),
    )
    producer.poll(0)  # Serve delivery callbacks from earlier produce() calls

Schema Evolution Rules

When you need to change an event schema - adding a new field for a new ML feature - follow these rules to maintain compatibility:

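| Change | Safe? | Notes |
| --- | --- | --- |
| Add field with default value | Yes | Backward compatible |
| Remove optional field | Yes | Forward compatible |
| Rename a field | No | Breaking - old consumers break |
| Change field type | No | Breaking - binary encoding changes (Avro tolerates only a few promotions, such as int to long) |
| Add field without default | No | Not backward compatible - readers on the new schema cannot decode older records that lack the field |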

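The Confluent Schema Registry enforces compatibility rules whenever a new schema version is registered. Set the subject's compatibility level to BACKWARD ("compatibility": "BACKWARD", which is also the registry's default) to prevent incompatible schema changes from being registered.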
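To make the rules concrete, here is a deliberately simplified backward-compatibility check over Avro schemas loaded as Python dicts. It is a sketch for intuition, not the Schema Registry's actual algorithm: it ignores Avro type promotions, aliases, and nested records. The function name and the `heading_deg` field are hypothetical.

```python
def backward_compat_problems(old_schema: dict, new_schema: dict) -> list:
    """Return reasons why a reader on `new_schema` could not decode
    records written with `old_schema` (empty list means compatible)."""
    old_fields = {f["name"]: f for f in old_schema["fields"]}
    problems = []
    for field in new_schema["fields"]:
        name = field["name"]
        if name not in old_fields:
            # A new field is only readable from old data if it has a default.
            if "default" not in field:
                problems.append(f"added field '{name}' has no default")
        elif old_fields[name]["type"] != field["type"]:
            problems.append(f"field '{name}' changed type")
    # Fields removed in new_schema are fine: the reader simply skips them.
    return problems


old = {
    "fields": [
        {"name": "rider_id", "type": "string"},
        {"name": "latitude", "type": "double"},
    ]
}
with_default = {
    "fields": old["fields"]
    + [{"name": "heading_deg", "type": ["null", "float"], "default": None}]
}
without_default = {
    "fields": old["fields"] + [{"name": "heading_deg", "type": "float"}]
}
retyped = {
    "fields": [
        {"name": "rider_id", "type": "long"},
        {"name": "latitude", "type": "double"},
    ]
}

ok_result = backward_compat_problems(old, with_default)
bad_result = backward_compat_problems(old, without_default)
retyped_result = backward_compat_problems(old, retyped)
```

A check like this can run in CI as a fast pre-flight before the registry's own validation, which remains the authority.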

The Outbox Pattern: Reliable Event Publishing

A common ML bug: an application updates a database and then publishes an event to Kafka. If the Kafka publish fails (or the process crashes between the DB write and the Kafka publish), the event is lost. The database is updated but downstream ML feature pipelines never see the event. The features in Redis are now stale relative to the database state.

The outbox pattern solves this by making event publishing atomic with the database transaction:

# outbox_pattern.py
import json
import uuid
from dataclasses import dataclass
from typing import Callable, Optional

import psycopg2  # db_conn below is expected to be a psycopg2 connection
from confluent_kafka import Producer


@dataclass
class OutboxEvent:
    """An event stored in the database outbox table before Kafka publishing."""
    event_id: str
    event_type: str
    aggregate_type: str
    aggregate_id: str
    payload: dict
    created_at: float
    published_at: Optional[float] = None


# Database schema for outbox table:
# CREATE TABLE outbox_events (
#     event_id UUID PRIMARY KEY,
#     event_type VARCHAR(100) NOT NULL,
#     aggregate_type VARCHAR(100) NOT NULL,
#     aggregate_id VARCHAR(100) NOT NULL,
#     payload JSONB NOT NULL,
#     created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
#     published_at TIMESTAMPTZ NULL
# );
# CREATE INDEX idx_outbox_unpublished ON outbox_events(created_at)
#     WHERE published_at IS NULL;


class OutboxService:
    """
    Implements the outbox pattern for reliable event publishing.

    Write Phase: Application writes to DB + outbox in same transaction.
    Relay Phase: Background job reads outbox and publishes to Kafka.
    """

    def __init__(self, db_conn, kafka_producer: Producer):
        self.db = db_conn
        self.producer = kafka_producer

    def write_with_event(
        self,
        db_operation: Callable,
        event_type: str,
        aggregate_type: str,
        aggregate_id: str,
        payload: dict,
    ):
        """
        Execute a DB operation and write an outbox event atomically.
        If either fails, both are rolled back - no partial state.
        """
        event_id = str(uuid.uuid4())

        with self.db:  # Transaction block: commit on success, rollback on error
            # Step 1: Execute the business logic DB operation
            db_operation(self.db)

            # Step 2: Write to outbox table in the same transaction
            cursor = self.db.cursor()
            cursor.execute(
                """
                INSERT INTO outbox_events
                    (event_id, event_type, aggregate_type, aggregate_id, payload, created_at)
                VALUES (%s, %s, %s, %s, %s, NOW())
                """,
                (event_id, event_type, aggregate_type, aggregate_id, json.dumps(payload)),
            )

        # If we get here, both DB write and outbox write committed atomically
        return event_id

    def relay_outbox_to_kafka(self, batch_size: int = 100):
        """
        Background relay: read unpublished outbox events and publish to Kafka.
        Safe to run continuously. Delivery is at-least-once: a crash after the
        publish but before the UPDATE republishes the event on the next run,
        so consumers must deduplicate by event_id.
        """
        cursor = self.db.cursor()
        cursor.execute(
            """
            SELECT event_id, event_type, aggregate_id, payload
            FROM outbox_events
            WHERE published_at IS NULL
            ORDER BY created_at ASC
            LIMIT %s
            """,
            (batch_size,),
        )
        events = cursor.fetchall()

        if not events:
            self.db.rollback()  # End the read-only transaction
            return 0

        # produce() only enqueues a message; success or failure is reported
        # asynchronously through the delivery callback.
        published_ids = []

        def make_delivery_callback(event_id):
            def on_delivery(err, msg):
                if err is None:
                    published_ids.append(str(event_id))
                else:
                    print(f"Kafka delivery failed for {event_id}: {err}")
            return on_delivery

        for event_id, event_type, aggregate_id, payload in events:
            try:
                self.producer.produce(
                    topic=event_type,  # Event type as topic name
                    key=aggregate_id.encode(),
                    value=json.dumps(payload).encode(),
                    on_delivery=make_delivery_callback(event_id),
                )
            except Exception as e:
                print(f"Kafka publish failed for {event_id}: {e}")
                break  # Stop on first failure - retry next time

        # Block until every enqueued message is delivered or has failed
        self.producer.flush()

        # Mark only the events Kafka acknowledged as done
        if published_ids:
            cursor.execute(
                """
                UPDATE outbox_events
                SET published_at = NOW()
                WHERE event_id = ANY(%s::uuid[])
                """,
                (published_ids,),
            )
        self.db.commit()

        return len(published_ids)

Event Ordering and Idempotency in ML Pipelines

Kafka guarantees ordering within a partition, not across partitions. With key-based partitioning, events with the same key (e.g., rider_id) go to the same partition as long as the topic's partition count does not change. GPS pings for a given rider are therefore consumed in the order they were written.
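The key-to-partition mapping can be illustrated with a stable hash. This sketch uses `zlib.crc32` as a stand-in, which is an assumption for the example: real Kafka clients use their own hash functions (the Java client uses murmur2), so the partition numbers here will not match a real cluster. The property it shows is the same: the mapping is deterministic for a fixed partition count and shifts when that count changes.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Deterministically map a key to a partition (illustrative hash only)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def count_remapped(keys, old_partitions: int, new_partitions: int) -> int:
    """How many keys land on a different partition after a resize."""
    return sum(
        1
        for key in keys
        if partition_for(key, old_partitions) != partition_for(key, new_partitions)
    )


rider_keys = [f"rider_{i}" for i in range(1000)]

# Same key, same partition count: always the same partition.
same_partition = partition_for("rider_123", 12) == partition_for("rider_123", 12)

# Growing the topic from 12 to 16 partitions moves many keys, which breaks
# per-rider ordering for events written around the resize.
moved = count_remapped(rider_keys, 12, 16)
```

This is why partition counts for keyed topics are usually chosen with headroom up front rather than increased later.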

But consumers can fail and reprocess events. Your feature computation must be idempotent - processing the same event twice must produce the same result as processing it once:

# idempotent_feature_writer.py
import redis


class IdempotentFeatureWriter:
    """
    Writes features to Redis with deduplication.

    Uses event_id to prevent duplicate processing from causing
    incorrect feature values.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.dedup_ttl = 3600  # 1 hour dedup window

    def _dedup_key(self, event_id: str) -> str:
        return f"processed:event:{event_id}"

    def write_feature_if_new(
        self,
        event_id: str,
        user_id: str,
        feature_name: str,
        feature_value: float,
        ttl: int = 7200,
    ) -> bool:
        """
        Write a feature value, but only if this event_id has not been processed.
        Returns True if written, False if duplicate.
        """
        dedup_key = self._dedup_key(event_id)

        # SET NX: set only if not exists - atomic check-and-set
        is_new = self.redis.set(dedup_key, "1", ex=self.dedup_ttl, nx=True)

        if not is_new:
            return False  # Already processed this event

        # Write the feature and its TTL in a single command
        feature_key = f"features:{user_id}:{feature_name}"
        try:
            self.redis.set(feature_key, feature_value, ex=ttl)
        except redis.RedisError:
            # Release the dedup marker so a retry of this event is not skipped
            self.redis.delete(dedup_key)
            raise

        return True

LinkedIn's Architecture Pattern

LinkedIn describes their ML feature pipeline as built on a "Unified Messaging Platform" - a Kafka-based event bus that serves as the backbone for all ML feature computation. Key properties:

  1. Single event bus: All raw events (profile views, connection requests, content engagement) flow through one Kafka cluster. Any ML feature pipeline subscribes to relevant event types.

  2. Standardized schemas: All events use Pegasus (LinkedIn's schema language, similar to Avro) with mandatory fields: event_time, source_service, event_id. These fields enable event sourcing replay and deduplication.

  3. Multiple consumers per event type: A single "profile_view" event is consumed independently by the "who viewed your profile" feature pipeline, the content recommendation pipeline, and the ad targeting pipeline. Each is a separate Flink job with its own state.

  4. Offline/online parity: Historical events are replayed through the same Flink jobs to generate training features. This eliminates training-serving skew at the infrastructure level - the same code runs both offline and online.

Production Engineering Notes

Schema Registry is essential at scale: Without a schema registry, different teams independently evolve their event schemas and consumers break silently (parsing the wrong field type). Confluent Schema Registry enforces compatibility rules and stores schema versions. Any team deploying a new consumer can look up the schema for any topic.

Monitor consumer lag per ML pipeline: Kafka consumer lag measures how far the consumer's committed offset trails the latest offset in each partition, counted in messages. For ML feature pipelines, that lag converted to time is the staleness of your features. If your "rider speed" Flink job is 5 minutes behind the latest GPS pings, your ETA model is using 5-minute-old speed estimates. Alert when the time lag exceeds 2x your target freshness.
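A rough sketch of turning offset lag into a freshness alert. The function names are hypothetical, and the staleness estimate assumes a roughly constant produce rate; in practice the offsets would come from your Kafka monitoring tooling rather than hard-coded dicts.

```python
def partition_lag(end_offsets: dict, committed_offsets: dict) -> dict:
    """Per-partition lag in messages: log end offset minus committed offset."""
    return {
        partition: max(end - committed_offsets.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    }


def estimated_staleness_s(total_lag_messages: int, produce_rate_per_s: float) -> float:
    """Approximate age of the oldest unprocessed message, assuming the topic
    receives about `produce_rate_per_s` messages per second."""
    if produce_rate_per_s <= 0:
        return float("inf")
    return total_lag_messages / produce_rate_per_s


def should_alert(staleness_s: float, target_freshness_s: float, factor: float = 2.0) -> bool:
    """Alert when staleness exceeds `factor` times the freshness target."""
    return staleness_s > factor * target_freshness_s


lag = partition_lag(
    end_offsets={0: 1_000_500, 1: 2_000_250},
    committed_offsets={0: 1_000_400, 1: 2_000_200},
)
total_lag = sum(lag.values())

# 6,000,000 messages behind at 20,000 pings/s is about 5 minutes of staleness.
staleness = estimated_staleness_s(6_000_000, 20_000)
alert = should_alert(staleness, target_freshness_s=60)
```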

Compacted Kafka topics for feature stores: For features that are frequently overwritten (current rider location), use Kafka log compaction. A compacted topic retains only the latest value per key - this allows new consumers to bootstrap the current state without replaying the entire history.
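The effect of compaction can be simulated in a few lines. This is a model of the end result only, not of how the broker's log cleaner runs. The `compact` function is a hypothetical name, and treating a `None` value as a delete marker mirrors Kafka's tombstone convention.

```python
def compact(log):
    """Return the records a fully compacted topic would retain:
    the latest value per key, in offset order, with deleted keys dropped."""
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)  # later records replace earlier ones
    survivors = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (offset, value) in survivors if value is not None]


location_log = [
    ("rider_1", "47.6062,-122.3321"),
    ("rider_2", "47.6101,-122.3420"),
    ("rider_1", "47.6058,-122.3325"),  # supersedes rider_1's first record
    ("rider_2", None),                 # tombstone: rider_2 is removed
    ("rider_3", "47.6205,-122.3493"),
]

bootstrap_state = dict(compact(location_log))
```

A new consumer that reads the compacted topic from the beginning ends up with `bootstrap_state` without having to process every historical ping.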

:::warning Event Time vs Ingestion Time in Event Sourcing
When replaying historical events for training data generation, ensure you use event time (when the event occurred) not ingestion time (when Kafka received it). If a mobile event is delayed 3 minutes in transit, replaying with ingestion time places it in the wrong feature window. Always embed event_timestamp_ms in every event payload and use it as the authoritative time.
:::
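The difference is easy to see with one-minute tumbling windows. In this sketch the `ingestion_timestamp_ms` field name and the helper functions are assumptions for illustration; the second event is delayed by 3 minutes in transit.

```python
from collections import Counter

WINDOW_MS = 60_000  # one-minute tumbling windows


def window_start(timestamp_ms: int, window_ms: int = WINDOW_MS) -> int:
    """Start of the tumbling window that contains `timestamp_ms`."""
    return timestamp_ms - (timestamp_ms % window_ms)


def count_per_window(events, time_field: str) -> dict:
    """Count events per window, bucketing by the chosen timestamp field."""
    return dict(Counter(window_start(e[time_field]) for e in events))


events = [
    # Delivered almost immediately.
    {"event_timestamp_ms": 30_000, "ingestion_timestamp_ms": 31_000},
    # Occurred in the same minute, but arrived 3 minutes late.
    {"event_timestamp_ms": 45_000, "ingestion_timestamp_ms": 225_000},
]

by_event_time = count_per_window(events, "event_timestamp_ms")
by_ingestion_time = count_per_window(events, "ingestion_timestamp_ms")
```

Bucketing by event time puts both events in the window starting at 0, while bucketing by ingestion time splits them across two windows and changes the feature value.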

:::danger The Dual-Write Problem
Never update a database AND publish to Kafka in two separate operations without the outbox pattern. A failure between the two operations creates inconsistency: the database is updated but the event is not published (or vice versa). Downstream ML features become permanently out of sync with the database state, and the discrepancy may be invisible until model quality degrades. Always use the outbox pattern or a CDC (Change Data Capture) tool like Debezium to capture database changes and publish them as events atomically.
:::

Interview Q&A

Q: What is event sourcing and how does it eliminate training-serving skew?

Event sourcing stores all changes as a sequence of immutable events rather than storing current state. For ML, this means: instead of storing "user has clicked 5 times in the last hour," you store every click event with its timestamp. Current feature values are computed by processing the event stream. To eliminate training-serving skew, you replay the historical event stream through the same feature computation code that runs in production. The training data is computed by exactly the same code path as the serving features - the only difference is whether the events come from Kafka (live) or from a replay (historical). This structural equivalence makes training-serving skew impossible at the feature computation level.

Q: Explain CQRS and how it applies to an ML feature store.

CQRS separates the system into a write path (commands, which change state) and a read path (queries, which read state). For ML feature stores: the command side is a stream processing job (Flink) that computes features from events and writes them to a feature store. The command side is optimized for write throughput. The query side is the model serving layer reading features from Redis - optimized for low-latency key-value reads. Separating these paths allows each to be scaled independently: if feature computation is slow, add more Flink parallelism. If serving is slow, add more Redis replicas. Neither change affects the other.

Q: What is the outbox pattern and when is it necessary for ML systems?

The outbox pattern guarantees that a committed database write always leads to its corresponding event being published to Kafka, and that a rolled-back write never does. It is necessary whenever you update application state and need ML feature pipelines to see the corresponding event. The implementation: write both the application state change and an outbox record in the same database transaction, then have a relay process read unpublished outbox records and publish them to Kafka. After successful Kafka publish, mark the outbox record as published. This two-step approach tolerates failures at any step: a crash before Kafka publish leaves an unpublished outbox record that the relay will pick up and retry. A crash after the publish but before the record is marked can publish the event twice, so delivery is at-least-once and consumers must deduplicate.

Q: How do you handle event ordering and idempotency in an ML event pipeline?

Ordering: Use key-based Kafka partitioning (rider_id, user_id) to ensure all events for the same entity arrive in the same partition and are processed in order. Cross-entity ordering would require a single partition or extra coordination between consumers and is generally avoided. Idempotency: any event can arrive twice (due to producer retry, consumer rebalance, or checkpoint recovery). Feature writes must be idempotent - processing the same event twice must produce the same result. Implementation: use event_id-based deduplication with a Redis SET NX operation. Store processed event_ids with a TTL matching your expected duplicate window (typically 1 hour). If the event_id is already in Redis, skip processing.

Q: How would you design the event schema for a user click event that is consumed by three different ML feature pipelines?

Design for the union of all consumer requirements with mandatory common fields: event_id (UUID, for deduplication), event_time_ms (event time, not ingestion time), user_id, session_id. Add event-specific fields: item_id, click_position, page_type, device_type. Store in Avro (schema registry enforced) with BACKWARD compatibility - new fields must have default values so older consumers continue to work. Register the schema before deploying the producer. Each ML consumer (recommendation features, session features, ad targeting features) subscribes to the same topic and uses only the fields it needs, ignoring others. Never split events into separate topics per consumer - that creates coupling between the producer and each consumer's schema requirements.
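One way to picture this answer is an Avro schema written as a Python dict, plus a projection helper that each consumer uses to pick its fields. The record name, namespace, field types, and the `project` helper are assumptions for illustration; only the field names come from the answer above.

```python
import json

# Hypothetical Avro schema for the shared click event.
CLICK_EVENT_SCHEMA = {
    "type": "record",
    "name": "ClickEvent",               # assumed name
    "namespace": "com.example.events",  # assumed namespace
    "fields": [
        # Mandatory common fields
        {"name": "event_id", "type": "string", "doc": "UUID for deduplication"},
        {"name": "event_time_ms", "type": "long", "doc": "Event time, not ingestion time"},
        {"name": "user_id", "type": "string"},
        {"name": "session_id", "type": "string"},
        # Event-specific fields
        {"name": "item_id", "type": "string"},
        {"name": "click_position", "type": "int"},
        {"name": "page_type", "type": "string"},
        {"name": "device_type", "type": "string"},
    ],
}


def project(event: dict, wanted_fields) -> dict:
    """Each consumer keeps only the fields it needs and ignores the rest."""
    return {name: event[name] for name in wanted_fields}


click = {
    "event_id": "3f2b6c1e-0000-4000-8000-000000000001",
    "event_time_ms": 1704067200000,
    "user_id": "user_42",
    "session_id": "sess_7",
    "item_id": "item_9",
    "click_position": 3,
    "page_type": "search_results",
    "device_type": "mobile",
}

schema_json = json.dumps(CLICK_EVENT_SCHEMA, indent=2)  # content of a .avsc file
recommendation_view = project(click, ["user_id", "item_id", "click_position"])
session_view = project(click, ["session_id", "event_time_ms", "page_type"])
```

All three pipelines read the same topic; a field added later with a default would be invisible to consumers that never project it.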

© 2026 EngineersOfAI. All rights reserved.