
:::tip 🎮 Interactive Playground Visualize this concept: Try the Kafka Architecture demo on the EngineersOfAI Playground - no code required. :::

Kafka for ML Systems


Reading time: 35 minutes | Interview relevance: Very High | Target roles: ML Engineer, Data Engineer, AI Platform Engineer


The Production Scenario

It is 9:47 AM on a Tuesday. The credit risk team at a major retail bank has just pushed a new fraud detection model to production. Within six minutes, the on-call ML engineer gets a Slack message: "Model scores are identical for every transaction. Something is very wrong."

The post-mortem reveals the root cause: upstream system #7 - the customer balance service - quietly changed its event schema three days ago. A field renamed from account_balance to balance_usd. The model's feature pipeline consumed raw JSON from a direct connection, silently treated the missing field as zero, and served 50,000 transactions an hour with a zeroed-out balance feature. The model kept running, producing plausible-looking numbers, with no errors in the logs. Silent data corruption at scale.

This is the problem that every ML platform team at the scale of thousands of transactions per second eventually confronts. Before Kafka, the architecture looked like a nightmare: 12 upstream systems, each with a direct connection to the feature computation service, each connection with its own schema contract negotiated informally in a Confluence document. One team renames a field on a Friday afternoon and goes home for the weekend. By Monday morning, three models are silently broken.

The ML platform team rebuilds the architecture around Kafka as a central event bus. Every upstream system publishes to a topic. Every downstream consumer subscribes to topics. Crucially, they add Confluent Schema Registry: every message is tagged with a schema ID, and consumers reject messages that don't conform to the registered schema. The silent corruption is impossible now - a schema change either passes the compatibility check or it's blocked at the producer. The 200ms scoring SLA holds, 50,000 transactions per second flow cleanly, and the on-call engineer sleeps through the night.

This lesson is about how that architecture works in detail - the Kafka event bus pattern for ML, schema management, change data capture, exactly-once guarantees, and the operational patterns that make the difference between a fragile prototype and a production ML system.


Why This Exists

Before event buses, ML feature pipelines were built with point-to-point integrations. Each producer sent data directly to each consumer that needed it. With $N$ producers and $M$ consumers, you eventually have $N \times M$ connections. Each connection is a custom integration: different serialization formats, different retry logic, different error handling. When one producer changes, it potentially breaks all $M$ consumers simultaneously.

Kafka solves this by inserting a durable, replayable log between producers and consumers. Producers write to topics without knowing who reads them. Consumers read at their own pace without knowing who writes. This decoupling is the foundational property that makes Kafka valuable for ML systems specifically - because ML systems have a deeply asymmetric relationship with data: a single feature can be consumed by dozens of models, and a single upstream system can feed hundreds of features.

The other critical property is durability and replay. When you need to retrain a model, you need historical data. When you deploy a new model version and need to backfill its feature values, you replay from an offset. When a consumer fails and recovers, it picks up where it left off. None of this is possible with direct streaming connections. Kafka's log retention makes it a time machine for ML data.


Historical Context

Apache Kafka was created at LinkedIn by Jay Kreps, Neha Narkhede, and Jun Rao, open-sourced in 2011. The name comes from Franz Kafka - Jay Kreps was reading Kafka at the time and thought it was a fitting name for a system built around writing. The original use case was activity tracking: LinkedIn needed to pipeline hundreds of billions of user activity events per day from the website into their Hadoop cluster for analytics.

The key insight that distinguished Kafka from existing message queues (RabbitMQ, ActiveMQ) was treating the log as the primary storage abstraction. Messages are not deleted when consumed - they are retained for a configurable window (days or weeks). Multiple independent consumers can read the same messages at different offsets. This log-centric design made Kafka a natural fit for ML pipelines, which often require the same events to be consumed by stream processors, batch training jobs, monitoring systems, and audit logs simultaneously.

Confluent, founded by the original Kafka creators in 2014, added the Schema Registry, Kafka Connect, and KSQL - the ecosystem components that transform raw Kafka into a complete data platform.


Kafka's Role in ML Systems

Think of Kafka as the central nervous system of an ML platform. Information flows from the periphery (upstream systems) inward (to feature computation) and then back out (to model serving, monitoring, and retraining). Without a central bus, this information flow is chaotic and brittle. With Kafka, it becomes structured and observable.

Three Topic Types in an ML System

Every ML Kafka deployment has the same three categories of topics. Understanding this taxonomy is essential for designing a clean architecture.

1. Raw Event Topics - the unprocessed signals from upstream systems. These contain transactions, clicks, sensor readings, database change events. They are append-only and retained for days or weeks for replay. Schema evolution happens here most often, which is why Schema Registry matters most for these topics.

2. Feature Topics - the computed features, ready for model consumption. The stream processor reads raw topics, applies transformations (aggregations, joins, normalizations), and writes the result here. Feature topics are the API boundary between the data engineering team (who owns the stream processor) and the ML engineering team (who owns the model server). They should be treated as versioned contracts.

3. Prediction and Feedback Topics - the model's outputs and eventually the ground truth labels. The prediction topic is the model server's output. The feedback topic receives delayed ground truth (the actual outcome of a credit decision, for example) which is later joined back to the prediction for training data generation.


Schema Management with Confluent Schema Registry

The Schema Registry is a standalone service that stores schemas indexed by subject (topic name + suffix). Every Kafka message produced with a schema-aware serializer is prefixed with:

  • 1 byte: magic byte 0x00
  • 4 bytes: schema ID (integer)
  • N bytes: the actual serialized payload

When a consumer receives a message, it reads the schema ID, fetches the schema from the registry (with aggressive caching), and deserializes the payload. If the producer changed the schema in an incompatible way, the consumer gets an error - a loud, obvious error, not silent corruption.
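The framing described above can be sketched in a few lines of standard-library Python. This is only an illustration of the 5-byte header layout, not the serializer's actual code; the function name `split_confluent_frame` is hypothetical.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Schema Registry framed message


def split_confluent_frame(message: bytes) -> tuple[int, bytes]:
    """Split a framed message into (schema_id, serialized_payload)."""
    if len(message) < 5:
        raise ValueError("message too short to carry the 5-byte header")
    # ">bI" = big-endian: 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


# Build a fake frame: magic byte 0, schema ID 42, then the payload bytes
frame = struct.pack(">bI", 0, 42) + b"avro-bytes"
print(split_confluent_frame(frame))  # (42, b'avro-bytes')
```

A message that was produced without a schema-aware serializer will usually fail the magic-byte check, which is one way such messages surface as loud errors rather than silent corruption.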

Avro vs Protobuf vs JSON Schema

For ML systems, the choice matters because of how schemas evolve and how features are represented.

| Format | Pros | Cons | Best for |
|---|---|---|---|
| Avro | Compact binary, native Schema Registry support, schema ID carried in the message prefix | Requires a separate schema definition; payload cannot be decoded without the writer schema | Feature events, CDC events |
| Protobuf | Very compact, language-agnostic code generation, excellent versioning semantics | More complex toolchain, proto file management | Cross-team APIs, gRPC-integrated pipelines |
| JSON Schema | Human-readable, easy debugging | Verbose, slower parsing, larger messages | Development/debugging, low-throughput pipelines |

For most ML teams, Avro with Schema Registry is the right default. It is the native format of the Confluent ecosystem, Schema Registry has first-class Avro support, and the binary encoding is compact enough for high-throughput feature pipelines.

Schema Evolution: What ML Teams Actually Need

Schema evolution compatibility modes determine which schema changes are allowed:

  • BACKWARD: New schema can read messages written with the old schema. Consumers can be upgraded before producers. This is the most common mode - add optional fields with defaults.
  • FORWARD: Old schema can read messages written with the new schema. Producers can be upgraded before consumers. Useful when you need to add fields quickly and update consumers later.
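  • FULL: Both backward and forward compatible. The strictest of the three: fields may be added or removed only if they have defaults. This is what ML feature pipelines need, because producers and consumers can then be upgraded in either order. Note that FULL checks a new schema only against the latest registered version; use FULL_TRANSITIVE if every deployed model version, including older ones, must be able to read the feature topic.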
```python
import json

from confluent_kafka import DeserializingConsumer, SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer, AvroSerializer

# --- 1. Schema Registry: register Avro schema for ML credit risk events ---

SCHEMA_REGISTRY_URL = "http://schema-registry:8081"
KAFKA_BOOTSTRAP = "kafka:9092"

# Avro schema for a transaction event
TRANSACTION_SCHEMA_STR = json.dumps({
    "type": "record",
    "name": "TransactionEvent",
    "namespace": "com.bank.creditrisk",
    "fields": [
        {"name": "transaction_id", "type": "string"},
        {"name": "account_id", "type": "string"},
        {"name": "merchant_id", "type": "string"},
        {"name": "amount_usd", "type": "double"},
        {"name": "merchant_category_code", "type": "int"},
        {"name": "balance_usd", "type": "double"},
        # Optional field with default - BACKWARD and FORWARD compatible
        {
            "name": "device_fingerprint",
            "type": ["null", "string"],
            "default": None,  # serialized as JSON null
        },
        {"name": "event_time_ms", "type": "long"},
    ]
})

schema_registry_client = SchemaRegistryClient({"url": SCHEMA_REGISTRY_URL})

# Set FULL compatibility on the value subject of raw.transactions
schema_registry_client.set_compatibility(
    subject_name="raw.transactions-value",
    level="FULL",
)

avro_serializer = AvroSerializer(
    schema_registry_client,
    TRANSACTION_SCHEMA_STR,
    lambda event, ctx: event,  # pass-through - event is already a dict
)

avro_deserializer = AvroDeserializer(
    schema_registry_client,
    TRANSACTION_SCHEMA_STR,
)


# --- 2. Producer: publish transaction events with Avro schema ---

def create_producer():
    return SerializingProducer({
        "bootstrap.servers": KAFKA_BOOTSTRAP,
        "key.serializer": lambda k, ctx: k.encode("utf-8"),
        "value.serializer": avro_serializer,
        # Idempotent producer - prevents duplicate messages on retry
        "enable.idempotence": True,
        "acks": "all",
        "retries": 10,
        "max.in.flight.requests.per.connection": 5,
    })


def publish_transaction(producer, transaction: dict):
    producer.produce(
        topic="raw.transactions",
        key=transaction["account_id"],
        value=transaction,
        on_delivery=lambda err, msg: (
            print(f"Delivery failed: {err}") if err else None
        ),
    )
    producer.poll(0)  # serve pending delivery callbacks without blocking


# --- 3. Consumer: deserialize and process ---

def create_consumer(group_id: str):
    return DeserializingConsumer({
        "bootstrap.servers": KAFKA_BOOTSTRAP,
        "group.id": group_id,
        "auto.offset.reset": "earliest",
        # Keys may be absent, so guard against None before decoding
        "key.deserializer": lambda k, ctx: k.decode("utf-8") if k is not None else None,
        "value.deserializer": avro_deserializer,
    })
```
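To make the compatibility modes concrete, here is a deliberately simplified sketch of the field-level rules for Avro record schemas. It only looks at added and removed fields and ignores type changes, aliases, and nested records, so it is not a substitute for the registry's own check; all function names are hypothetical.

```python
def _fields(schema: dict) -> dict:
    """Index a record schema's fields by name."""
    return {f["name"]: f for f in schema["fields"]}


def is_backward_compatible(old: dict, new: dict) -> bool:
    # New schema reads old data: every field it adds needs a default to fall back on
    old_f, new_f = _fields(old), _fields(new)
    return all("default" in new_f[name] for name in set(new_f) - set(old_f))


def is_forward_compatible(old: dict, new: dict) -> bool:
    # Old schema reads new data: every field the new schema drops must have had a default
    old_f, new_f = _fields(old), _fields(new)
    return all("default" in old_f[name] for name in set(old_f) - set(new_f))


def is_full_compatible(old: dict, new: dict) -> bool:
    return is_backward_compatible(old, new) and is_forward_compatible(old, new)


v1 = {"type": "record", "name": "Account", "fields": [
    {"name": "account_id", "type": "string"},
    {"name": "account_balance", "type": "double"},
]}

# The rename from the opening scenario: one field removed, one added, no defaults
renamed = {"type": "record", "name": "Account", "fields": [
    {"name": "account_id", "type": "string"},
    {"name": "balance_usd", "type": "double"},
]}

# Adding an optional field with a default
with_optional = {"type": "record", "name": "Account", "fields": v1["fields"] + [
    {"name": "device_fingerprint", "type": ["null", "string"], "default": None},
]}

print(is_full_compatible(v1, renamed))        # False
print(is_full_compatible(v1, with_optional))  # True
```

Under these rules the rename that caused the incident is rejected in both directions, while the optional `device_fingerprint` addition passes.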

Change Data Capture with Debezium

Change Data Capture (CDC) is the practice of capturing every row-level change in a relational database (INSERT, UPDATE, DELETE) and publishing it as an event to Kafka. For ML systems, CDC is the bridge between the transactional world (where the truth about customers, accounts, and merchants lives) and the streaming feature pipeline.

How Debezium Works

Debezium reads the database's transaction log directly: the MySQL binlog, the PostgreSQL WAL, or Oracle redo logs via LogMiner. It requires no changes to the application and typically adds little overhead to the database. Each committed row change usually appears as a Kafka message within milliseconds.

The Debezium message envelope carries the row state before and after the change, the operation type, a timestamp, and source metadata:

```json
{
  "before": { "account_id": "ACC-123", "balance_usd": 4200.00, "risk_score": 0.12 },
  "after": { "account_id": "ACC-123", "balance_usd": 3850.00, "risk_score": 0.12 },
  "op": "u",
  "ts_ms": 1702484731000,
  "source": {
    "db": "banking_core",
    "table": "accounts",
    "lsn": 24523417
  }
}
```

The op field values: "c" (create/insert), "u" (update), "d" (delete), "r" (read/snapshot).

Using CDC to Keep Feature Stores in Sync

The core pattern: the transactional database is the source of truth for customer state. The feature store (Redis, DynamoDB) needs to reflect that state with minimal latency. Without CDC, you run a polling job every N minutes, which introduces lag and hammers the database. With CDC, every database commit triggers an immediate feature store update.

```python
import json

import redis
from confluent_kafka import Consumer, KafkaError, Producer

# --- Debezium CDC message processing for feature store sync ---
# Assumes the JSON converter with schemas.enable=false, so the envelope
# is the top-level JSON object (no surrounding "payload" wrapper).

REDIS_CLIENT = redis.Redis(host="redis", port=6379, decode_responses=True)

DEBEZIUM_TOPIC = "debezium.banking_core.accounts"
FEATURE_TOPIC = "features.account_state"

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "feature-store-sync",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,  # Manual commit: at-least-once, commit after processing
})

producer = Producer({"bootstrap.servers": "kafka:9092"})

consumer.subscribe([DEBEZIUM_TOPIC])


def extract_feature_delta(debezium_msg: dict) -> dict:
    """
    Extract the ML-relevant features from a Debezium CDC envelope.
    Delete operations return a marker dict with "deleted": True.
    """
    op = debezium_msg.get("op")

    if op == "d":
        # Account closed - mark the feature entry for deletion
        account_id = debezium_msg["before"]["account_id"]
        return {"account_id": account_id, "deleted": True}

    after = debezium_msg.get("after") or {}
    before = debezium_msg.get("before") or {}  # null for inserts and snapshots

    # Compute balance delta - a useful feature for fraud detection
    balance_delta = after.get("balance_usd", 0) - before.get("balance_usd", 0)

    return {
        "account_id": after["account_id"],
        "balance_usd": after["balance_usd"],
        "risk_score": after.get("risk_score", 0.0),
        "balance_delta": balance_delta,
        "op": op,
        "source_ts_ms": debezium_msg.get("ts_ms"),
    }


def sync_to_feature_store(feature: dict):
    """Write feature to Redis for low-latency online lookup."""
    if feature.get("deleted"):
        REDIS_CLIENT.delete(f"account:{feature['account_id']}")
        return

    REDIS_CLIENT.hset(
        f"account:{feature['account_id']}",
        mapping={
            "balance_usd": feature["balance_usd"],
            "risk_score": feature["risk_score"],
            "balance_delta": feature["balance_delta"],
            "updated_ts": feature["source_ts_ms"],
        },
    )
    # Set TTL: expire stale features after 48 hours of no updates
    REDIS_CLIENT.expire(f"account:{feature['account_id']}", 172800)


while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        raise RuntimeError(f"Kafka error: {msg.error()}")

    # Debezium follows each delete with a tombstone (null value): nothing to parse
    if msg.value() is not None:
        debezium_payload = json.loads(msg.value())
        feature = extract_feature_delta(debezium_payload)
        sync_to_feature_store(feature)

        # Also publish to the feature topic for stream processing consumers
        producer.produce(
            topic=FEATURE_TOPIC,
            key=feature["account_id"],
            value=json.dumps(feature).encode("utf-8"),
        )
        # Wait for delivery before committing; simple but slow, batch in production
        producer.flush()

    # Commit only after successful processing
    consumer.commit(message=msg, asynchronous=False)
```

Kafka Connect for ML Data Ingestion

Kafka Connect is a framework for building and running connectors - plugins that move data between Kafka and external systems without writing custom producer/consumer code. For ML teams, the most important connectors are:

Source Connectors (external system → Kafka):

  • JDBC Source: polls MySQL/Postgres tables on a schedule, publishes new/updated rows
  • Debezium CDC: reads database replication logs, publishes every change in real-time
  • S3 Source: reads files from S3 buckets for historical data backfill

Sink Connectors (Kafka → external system):

  • S3 Sink: writes Kafka topics to S3 as Parquet/JSON for batch retraining data
  • JDBC Sink: writes predictions back to a relational database
  • Elasticsearch Sink: indexes feature data for search-based ML

S3 Sink Configuration for ML Training Data

```json
{
  "name": "s3-sink-training-data",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "8",
    "topics": "features.credit_risk,predictions.fraud_score,labels.ground_truth",
    "s3.region": "us-east-1",
    "s3.bucket.name": "ml-training-data-lake",
    "s3.part.size": "67108864",
    "flush.size": "50000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "snappy",
    "schema.compatibility": "FULL",
    "rotate.interval.ms": "3600000",
    "rotate.schedule.interval.ms": "3600000",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en_US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "event_time_ms",
    "transforms": "AddMetadata",
    "transforms.AddMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.AddMetadata.offset.field": "_kafka_offset",
    "transforms.AddMetadata.timestamp.field": "_ingest_ts"
  }
}
```

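This configuration lands every event from the three ML topics into S3 in Parquet format, partitioned by hour. The training pipeline reads from this S3 path. The _kafka_offset field supports deduplication during retraining, but offsets are unique only within a topic-partition, so a precise deduplication key also needs the topic and partition (InsertField can add them through topic.field and partition.field).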
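A minimal sketch of offset-based deduplication at training time follows. Only `_kafka_offset` comes from the configuration above; the `_kafka_topic` and `_kafka_partition` columns are hypothetical names assumed here because an offset is unique only within one partition of one topic.

```python
def dedupe_records(records: list[dict]) -> list[dict]:
    """Keep the first occurrence of each (topic, partition, offset) triple."""
    seen = set()
    unique = []
    for record in records:
        key = (
            record["_kafka_topic"],      # assumed metadata column
            record["_kafka_partition"],  # assumed metadata column
            record["_kafka_offset"],     # added by the InsertField transform
        )
        if key in seen:
            continue  # same Kafka record landed twice, e.g. after a sink task retry
        seen.add(key)
        unique.append(record)
    return unique


rows = [
    {"_kafka_topic": "features.credit_risk", "_kafka_partition": 0, "_kafka_offset": 10, "amount_usd": 25.0},
    {"_kafka_topic": "features.credit_risk", "_kafka_partition": 0, "_kafka_offset": 10, "amount_usd": 25.0},
    {"_kafka_topic": "features.credit_risk", "_kafka_partition": 1, "_kafka_offset": 10, "amount_usd": 90.0},
]
print(len(dedupe_records(rows)))  # 2
```

The third row shares offset 10 with the first two but lives in a different partition, so it is kept.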


Exactly-Once in ML: Idempotent Producers and Transactional Consumers

In a feature pipeline, "at-least-once" delivery means a feature value might be computed and written multiple times. For aggregations, this means inflated counts. For balance computations, this means incorrect sums. For fraud scoring, this means the same transaction might be scored twice with different feature states.

Exactly-once semantics in Kafka requires two pieces working together:

  1. Idempotent producers: the broker deduplicates retries using a producer ID + sequence number. Even if the producer sends the same message twice due to a network retry, the broker writes it once.

  2. Transactional consumers + producers: the consumer commits its offset and the producer writes its output atomically as a single transaction. Either both happen or neither does.

```python
import json

from confluent_kafka import Consumer, Producer, TopicPartition

# --- Exactly-once: transactional feature computation ---

TRANSACTIONAL_ID = "feature-pipeline-credit-risk-v2"

# Transactional producer
producer = Producer({
    "bootstrap.servers": "kafka:9092",
    "enable.idempotence": True,
    "transactional.id": TRANSACTIONAL_ID,
    "acks": "all",
    "retries": 2147483647,
    "max.in.flight.requests.per.connection": 5,
})

producer.init_transactions()

# Consumer with isolation.level=read_committed - only reads committed transactions
consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "feature-pipeline-credit-risk",
    "auto.offset.reset": "earliest",
    # Critical: only read messages from completed transactions
    "isolation.level": "read_committed",
    "enable.auto.commit": False,
})

consumer.subscribe(["raw.transactions"])


def compute_credit_risk_features(transaction: dict) -> dict:
    """Compute ML features from a raw transaction event."""
    return {
        "account_id": transaction["account_id"],
        "amount_usd": transaction["amount_usd"],
        "normalized_amount": transaction["amount_usd"] / max(transaction["balance_usd"], 1.0),
        "is_high_risk_mcc": transaction["merchant_category_code"] in {5912, 7995, 9754},
        "balance_utilization": 1.0 - (transaction["balance_usd"] / max(transaction["amount_usd"], 1.0)),
        "feature_computed_ts": transaction["event_time_ms"],
    }


# Main exactly-once processing loop
while True:
    messages = consumer.consume(num_messages=500, timeout=1.0)
    if not messages:
        continue

    # Filter out errors
    valid_messages = [m for m in messages if not m.error()]
    if not valid_messages:
        continue

    # Track, per partition, the first offset in the batch (for rewinding on abort)
    # and the next offset to consume (for the transactional offset commit)
    first_offsets = {}
    next_offsets = {}
    for msg in valid_messages:
        tp = (msg.topic(), msg.partition())
        first_offsets[tp] = min(first_offsets.get(tp, msg.offset()), msg.offset())
        next_offsets[tp] = max(next_offsets.get(tp, 0), msg.offset() + 1)

    try:
        producer.begin_transaction()

        for msg in valid_messages:
            raw = json.loads(msg.value())
            features = compute_credit_risk_features(raw)

            producer.produce(
                topic="features.credit_risk",
                key=features["account_id"],
                value=json.dumps(features).encode("utf-8"),
            )

        # Send consumer offsets as part of the transaction - atomic commit.
        # The API expects a list of TopicPartition objects with the offset set.
        producer.send_offsets_to_transaction(
            [TopicPartition(t, p, off) for (t, p), off in next_offsets.items()],
            consumer.consumer_group_metadata(),
        )

        producer.commit_transaction()

    except Exception as e:
        producer.abort_transaction()
        print(f"Transaction aborted: {e}")
        # Offsets were not committed, but the consumer's in-memory position has
        # already advanced. Rewind so the batch is actually reprocessed.
        for (t, p), off in first_offsets.items():
            consumer.seek(TopicPartition(t, p, off))
```
note

Exactly-once comes with a performance cost. Transaction coordination typically adds on the order of 5–20ms of overhead per transaction, depending on broker configuration and load. For a 200ms SLA with 50K TPS, you amortize this cost by batching many messages per transaction (as shown above with consume(num_messages=500)). The SLA still holds because the commit overhead is paid once per 500 events rather than once per event.
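The broker-side deduplication behind idempotent producers can be illustrated with a toy in-memory model. This is a sketch of the idea only (one partition, one sequence number per record), not how the broker is implemented; the class name `ToyPartitionLog` is hypothetical.

```python
class ToyPartitionLog:
    """Toy model of one partition that deduplicates by (producer ID, sequence number)."""

    def __init__(self):
        self.records = []
        self._last_seq = {}  # producer_id -> last sequence number appended

    def append(self, producer_id: int, seq: int, value: str) -> bool:
        """Append a record; return False if it is a duplicate retry."""
        last = self._last_seq.get(producer_id, -1)
        if seq <= last:
            return False  # already written: the retry is acknowledged but not re-appended
        if seq != last + 1:
            raise ValueError(f"out-of-order sequence {seq}, expected {last + 1}")
        self.records.append(value)
        self._last_seq[producer_id] = seq
        return True


log = ToyPartitionLog()
print(log.append(producer_id=7, seq=0, value="txn-a"))  # True
print(log.append(producer_id=7, seq=0, value="txn-a"))  # False (network retry)
print(log.append(producer_id=7, seq=1, value="txn-b"))  # True
print(log.records)  # ['txn-a', 'txn-b']
```

Idempotence alone covers retries from one producer session to one partition; the transactional API is what extends the guarantee to the read-process-write loop across topics.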


Dead Letter Queues in ML Pipelines

A Dead Letter Queue (DLQ) is a secondary topic where malformed, unprocessable, or schema-invalid messages are routed instead of being dropped or causing the consumer to crash. In ML pipelines, DLQs are essential for three reasons:

  1. Upstream schema violations: a producer sends a message that fails schema validation. You cannot process it, but you need to investigate it.
  2. Feature computation failures: a message is valid but causes an exception (division by zero, null reference, model input out of range). You need to capture the failing input.
  3. Poisoned messages: a message that is structurally valid but semantically wrong (negative amounts, future timestamps). These should be isolated and investigated.
```python
import json
import traceback
from dataclasses import asdict, dataclass
from datetime import datetime, timezone

from confluent_kafka import Producer

# --- DLQ pattern: route malformed events to {topic}.dlq ---

@dataclass
class DLQMessage:
    original_topic: str
    original_partition: int
    original_offset: int
    original_key: str
    original_value: str    # raw bytes as hex string
    error_type: str        # "SCHEMA_VIOLATION", "COMPUTE_ERROR", "SEMANTIC_ERROR"
    error_message: str
    error_stacktrace: str
    failed_at_ts: str      # ISO-8601


dlq_producer = Producer({"bootstrap.servers": "kafka:9092"})


def send_to_dlq(msg, error_type: str, exception: Exception):
    """Route a failed message to the dead letter queue topic."""
    dlq_topic = f"{msg.topic()}.dlq"

    dlq_payload = DLQMessage(
        original_topic=msg.topic(),
        original_partition=msg.partition(),
        original_offset=msg.offset(),
        original_key=msg.key().decode("utf-8", errors="replace") if msg.key() else "",
        original_value=msg.value().hex() if msg.value() else "",
        error_type=error_type,
        error_message=str(exception),
        # Must be called while the exception is being handled (inside except)
        error_stacktrace=traceback.format_exc(),
        failed_at_ts=datetime.now(timezone.utc).isoformat(),
    )

    dlq_producer.produce(
        topic=dlq_topic,
        key=msg.key(),
        value=json.dumps(asdict(dlq_payload)).encode("utf-8"),
    )
    dlq_producer.poll(0)


# Usage in the feature pipeline consumer loop.
# compute_credit_risk_features is the function from the exactly-once example.
def process_with_dlq(msg):
    try:
        raw = json.loads(msg.value())

        # Semantic validation before feature computation
        if raw.get("amount_usd", 0) < 0:
            raise ValueError(f"Negative amount: {raw['amount_usd']}")
        if raw.get("balance_usd", 0) < 0:
            raise ValueError(f"Negative balance: {raw['balance_usd']}")

        features = compute_credit_risk_features(raw)
        return features, None

    # JSONDecodeError is a subclass of ValueError, so it must be caught first
    except json.JSONDecodeError as e:
        send_to_dlq(msg, "SCHEMA_VIOLATION", e)
        return None, e

    except ValueError as e:
        send_to_dlq(msg, "SEMANTIC_ERROR", e)
        return None, e

    except Exception as e:
        send_to_dlq(msg, "COMPUTE_ERROR", e)
        return None, e
```
warning

Never silently drop messages in an ML pipeline. A dropped malformed message means you never know it arrived. Send it to the DLQ, which gives you an observable record of every failure, its cause, and its original content. Your DLQ message volume is a health metric: if it spikes, an upstream system changed its format.


Consumer Lag Monitoring

Consumer lag is the number of messages in a topic that have been produced but not yet consumed by a specific consumer group. In ML systems, consumer lag directly translates to feature freshness: if your feature pipeline has a lag of 10,000 messages and your producer is writing 1,000 messages/second, your features are 10 seconds stale. For a real-time fraud system where the 200ms SLA matters, 10 seconds of feature lag is catastrophic.
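The lag-to-staleness arithmetic above can be written as a one-line estimate. It assumes a roughly constant produce rate; the function name is hypothetical.

```python
def feature_staleness_seconds(lag_messages: int, produce_rate_per_s: float) -> float:
    """Estimate how far behind real time a consumer is, given its lag."""
    if produce_rate_per_s <= 0:
        raise ValueError("produce rate must be positive")
    return lag_messages / produce_rate_per_s


print(feature_staleness_seconds(10_000, 1_000))  # 10.0
```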

The throughput equation for partitions:

$$\text{max\_throughput} = \text{num\_partitions} \times \text{throughput\_per\_partition}$$

If each partition handles 5,000 messages/second and you need 50,000 messages/second, you need at least 10 partitions. Consumer lag tells you when you are falling behind - when you need more partitions or more consumer instances.
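Rearranging the throughput equation gives the minimum partition count. A small sketch, rounding up because partitions are whole units (the function name is hypothetical, and the per-partition figure should come from your own benchmarks):

```python
import math


def min_partitions(target_msgs_per_s: float, per_partition_msgs_per_s: float) -> int:
    """Smallest partition count whose combined throughput meets the target."""
    if per_partition_msgs_per_s <= 0:
        raise ValueError("per-partition throughput must be positive")
    return math.ceil(target_msgs_per_s / per_partition_msgs_per_s)


print(min_partitions(50_000, 5_000))  # 10
print(min_partitions(50_001, 5_000))  # 11
```

In practice you would add headroom on top of this minimum so that traffic spikes and consumer restarts do not immediately translate into lag.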

```python
import time

from confluent_kafka import Consumer, TopicPartition
from confluent_kafka.admin import AdminClient

# --- Consumer lag monitoring for ML feature pipeline health ---

KAFKA_BOOTSTRAP = "kafka:9092"
CONSUMER_GROUP = "feature-pipeline-credit-risk"
TOPIC = "raw.transactions"
LAG_ALERT_THRESHOLD = 5000  # Messages; corresponds to ~1s at 5K/s/partition


def compute_consumer_lag(bootstrap_servers: str, group_id: str, topic: str) -> dict:
    """
    Compute per-partition consumer lag for a consumer group.
    Returns dict: {partition_id: {"committed": int, "end": int, "lag": int}}
    """
    admin = AdminClient({"bootstrap.servers": bootstrap_servers})

    # Temporary consumer to query committed offsets (it never subscribes,
    # so it does not join the group or trigger a rebalance)
    tmp_consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
    })

    try:
        # Get partition metadata
        metadata = admin.list_topics(topic=topic, timeout=10)
        partitions = [
            TopicPartition(topic, p)
            for p in metadata.topics[topic].partitions.keys()
        ]

        # Get committed offsets for the consumer group
        committed = tmp_consumer.committed(partitions, timeout=10)

        lag_report = {}
        for tp in committed:
            # Low and high watermarks (earliest and next-to-be-written offsets)
            low, high = tmp_consumer.get_watermark_offsets(tp, timeout=5)
            # No committed offset yet: measure lag from the start of the log
            committed_offset = tp.offset if tp.offset >= 0 else low
            lag = high - committed_offset

            lag_report[tp.partition] = {
                "committed": committed_offset,
                "end": high,
                "lag": lag,
            }

            if lag > LAG_ALERT_THRESHOLD:
                print(
                    f"ALERT: partition {tp.partition} lag={lag} "
                    f"(>{LAG_ALERT_THRESHOLD}). Feature freshness degraded."
                )

        return lag_report
    finally:
        tmp_consumer.close()


def lag_monitor_loop(interval_seconds: int = 30):
    """Run lag monitoring in a loop - push metrics to Datadog/Prometheus in production."""
    while True:
        lag = compute_consumer_lag(KAFKA_BOOTSTRAP, CONSUMER_GROUP, TOPIC)
        total_lag = sum(p["lag"] for p in lag.values())
        print(f"[{time.strftime('%H:%M:%S')}] total_lag={total_lag} partitions={lag}")
        time.sleep(interval_seconds)
```

Production Notes

Topic configuration for ML systems:

  • Set retention.ms based on your retraining frequency. If you retrain weekly, retain at least 10 days. If you need replay for new model versions, retain longer.
  • Use cleanup.policy=compact for feature topics - you only need the latest value per key (account ID, user ID), not the full history.
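  • Partition count should be a multiple of your consumer count so that partitions divide evenly across instances. A reasonable starting point is 3× the number of consumer instances (for example, 12 partitions for 4 consumers), which leaves room to scale out.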

Replication factor:

  • Development: replication.factor=1
  • Production: replication.factor=3, min.insync.replicas=2. Never deploy ML-critical topics with replication.factor=1.

Message size:

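  • The default maximum is about 1MB. For feature vectors with hundreds of dimensions, consider raising message.max.bytes on the broker (or max.message.bytes on the topic) together with the producer-side limit (max.request.size in the Java client, message.max.bytes in librdkafka-based clients such as confluent-kafka), or compress with compression.type=lz4.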

Key design:

  • Use account_id or user_id as the message key. This guarantees that all events for the same account go to the same partition, preserving per-account ordering. This is critical for stateful stream processors that maintain per-account state.
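The key-to-partition guarantee can be illustrated with a hash-and-modulo sketch. Real clients use their own hash functions (the Java client uses murmur2, for instance); CRC32 is used here purely for illustration, and `partition_for` is a hypothetical helper.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a message key to a partition: hash the key bytes, then take the modulo."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# The same key always lands on the same partition, so per-account order is preserved
assert partition_for("ACC-123", 12) == partition_for("ACC-123", 12)

# Changing the partition count changes the modulo, so many keys move partitions
keys = [f"ACC-{i}" for i in range(1000)]
moved = sum(1 for k in keys if partition_for(k, 12) != partition_for(k, 16))
print(f"{moved} of {len(keys)} keys map to a different partition after 12 -> 16")
```

The second half shows why partition counts are best chosen up front: after a resize, new events for an account can land on a different partition than its earlier events, which breaks per-key ordering and any stream-processor state keyed by partition.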

Common Mistakes

danger

Mistake: Using auto.offset.reset=latest in production. If the consumer group has no committed offset (first deploy, or after offset loss), latest means you silently skip all messages produced before the consumer started. For ML feature pipelines, this means features are computed on an incomplete event history. Always use earliest for feature pipelines, or explicitly manage offset initialization.

danger

Mistake: Not setting min.insync.replicas. acks=all on the producer means "all in-sync replicas must acknowledge." But with min.insync.replicas=1 (the default), the in-sync set is allowed to shrink to the leader alone, so acks=all on a 3-replica topic can be satisfied by a single acknowledgment. Set min.insync.replicas=2 on production ML topics so that a single broker failure doesn't cause data loss.

warning

Mistake: One partition = one consumer. Kafka's parallelism is bounded by partition count. If your topic has 4 partitions, you can have at most 4 active consumers in a group. Plan partition count at topic creation time - increasing partitions later is possible, but it changes the key-to-partition mapping, so per-key ordering is not preserved across the change.

warning

Mistake: Ignoring DLQ message volume. DLQs exist to be monitored. If you set up a DLQ and never alert on it, you've created a black hole where data quality problems disappear silently. Set up a Datadog/Prometheus metric for DLQ message rate and alert when it exceeds 0.1% of total throughput.
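The 0.1% alert rule can be expressed as a small check over two counters taken from the same time window. This is a sketch only; the function name is hypothetical, and in production the comparison would normally live in your metrics system rather than in application code.

```python
def dlq_rate_exceeded(dlq_count: int, total_count: int, threshold: float = 0.001) -> bool:
    """Return True when DLQ messages exceed `threshold` (default 0.1%) of throughput."""
    if total_count == 0:
        # No traffic at all: any DLQ message is worth a look
        return dlq_count > 0
    return dlq_count / total_count > threshold


print(dlq_rate_exceeded(dlq_count=5, total_count=10_000))   # False (0.05%)
print(dlq_rate_exceeded(dlq_count=20, total_count=10_000))  # True (0.2%)
```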


Interview Q&A

What is Schema Registry and why is it essential for ML Kafka pipelines?

Schema Registry is a service that stores and enforces Avro/Protobuf/JSON schemas for Kafka topics. Every message produced is tagged with a schema ID (a 4-byte integer that follows a 1-byte magic prefix at the start of the message). Consumers fetch the schema and validate/deserialize accordingly.

For ML, it prevents the most dangerous class of data pipeline failure: silent schema drift. Without Schema Registry, a producer can rename a field, add a new required field, or change a type and the consumer will silently process corrupt data - potentially training models on incorrect features or serving predictions based on zeroed-out inputs. Schema Registry enforces compatibility rules (FULL compatibility is the ML standard - fields may be added or removed only if they have defaults) and rejects schema changes that would break consumers. It turns silent data corruption into loud, observable schema validation failures.

What is CDC and how does Debezium enable real-time feature updates?

Change Data Capture (CDC) is the process of capturing every row-level change (INSERT, UPDATE, DELETE) in a relational database and publishing it as a stream of events. Debezium implements CDC by reading the database's replication log directly (PostgreSQL WAL, MySQL binlog) rather than polling tables. This gives sub-second latency, very little additional load on the database under normal operation, and complete change history including the before and after states of every row.

For ML feature pipelines, CDC solves the synchronization problem between transactional databases (the source of truth for customer state) and online feature stores (the sub-millisecond lookup store used during model serving). Without CDC, you poll the database on a schedule - introducing lag and database load. With CDC, every database commit triggers an immediate feature store update, keeping features within milliseconds of the ground truth.

How do you implement exactly-once semantics in a Kafka ML pipeline?

Exactly-once in Kafka requires both idempotent producers and transactional consumers. Idempotent producers: each producer is assigned a producer ID and attaches sequence numbers to messages. The broker deduplicates retries - if the same message is sent twice due to a network timeout, it is written once. Transactional consumers: the consumer commits its offset and the producer writes its output atomically within a single Kafka transaction. You call producer.begin_transaction(), produce output messages, call producer.send_offsets_to_transaction() to include the consumer offset commit, then producer.commit_transaction(). Either both the output messages and the offset commit succeed, or neither does.

The critical consumer configuration is isolation.level=read_committed - without this, consumers can read messages from aborted transactions, breaking the exactly-once guarantee on the consumer side.
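The consume-transform-produce loop described above can be sketched as follows. This is a minimal illustration, not a production implementation: it assumes `consumer` and `producer` are confluent_kafka `Consumer` and `Producer` instances, that the producer was created with a `transactional.id` and has already had `init_transactions()` called, and that the consumer has `enable.auto.commit` set to false. The function name and the `transform` callback are hypothetical.

```python
from typing import Callable


def process_one_transactionally(consumer, producer, output_topic: str,
                                transform: Callable[[bytes], bytes],
                                timeout: float = 1.0) -> bool:
    """Consume one message and commit its output and offset atomically.

    Returns True if a message was processed and committed, False if
    nothing was available (error handling is omitted in this sketch).
    """
    msg = consumer.poll(timeout)
    if msg is None or msg.error():
        return False

    producer.begin_transaction()
    try:
        producer.produce(output_topic, key=msg.key(),
                         value=transform(msg.value()))
        # Attach the consumer's current positions to the same transaction,
        # so the offset commit and the output succeed or fail together.
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata(),
        )
        producer.commit_transaction()
        return True
    except Exception:
        # Neither the output nor the offsets become visible. A real
        # pipeline would also rewind the consumer to its last committed
        # offsets (or restart) before retrying.
        producer.abort_transaction()
        raise
```

Processing one message per transaction keeps the sketch short; in practice you would usually batch many messages into each transaction to amortize the commit overhead.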

What is consumer lag and how does it relate to ML feature freshness?

Consumer lag is the difference between the latest produced offset and the last committed consumer offset for a consumer group and partition. It represents the number of messages that have been produced but not yet consumed. In ML feature pipelines, lag directly maps to staleness: if your topic has 10,000 messages of lag and your producer is writing at 1,000 messages/second, your features are 10 seconds behind real-time events. This matters enormously for systems with tight latency SLAs - a fraud detection system scoring transactions based on 10-second-old balance features may miss rapid card-testing attacks that drain an account in seconds. Monitor lag per partition via get_watermark_offsets vs committed offsets, and alert when lag exceeds your feature freshness budget.
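The lag-to-staleness arithmetic above is simple enough to sketch directly. The helper names below are hypothetical, and the inputs are assumed to have been collected already: with confluent_kafka, the high watermark would come from `consumer.get_watermark_offsets(...)` and the committed offset from `consumer.committed(...)`.

```python
def partition_lag(high_watermark: int, committed_offset: int) -> int:
    """Messages produced to a partition but not yet committed by the group."""
    return max(high_watermark - committed_offset, 0)


def staleness_seconds(total_lag: int, produce_rate_per_sec: float) -> float:
    """Approximate how far behind real time the consumer is."""
    return total_lag / produce_rate_per_sec


def exceeds_freshness_budget(high_watermarks: dict, committed: dict,
                             produce_rate_per_sec: float,
                             budget_seconds: float) -> bool:
    """True when summed lag across partitions implies features older than the budget."""
    total = sum(partition_lag(high_watermarks[p], committed[p])
                for p in high_watermarks)
    return staleness_seconds(total, produce_rate_per_sec) > budget_seconds


# Two partitions with 6,000 and 4,000 messages of lag at 1,000 messages/second.
high = {0: 52_000, 1: 48_000}
done = {0: 46_000, 1: 44_000}
print(exceeds_freshness_budget(high, done, produce_rate_per_sec=1_000, budget_seconds=5))
```

Dividing lag by the produce rate is a steady-state approximation; during a burst or a consumer slowdown, actual staleness can be worse than this estimate.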

How would you design a Kafka topic structure for a real-time fraud detection system?

Three tiers: raw event topics (raw.transactions, raw.account_events, raw.device_events), feature topics (features.transaction_risk, features.account_state, features.device_fingerprint), and ML output topics (predictions.fraud_score, feedback.ground_truth, alerts.fraud_cases).

Key design decisions: partition by account ID (not transaction ID) so all events for an account go to the same partition, preserving per-account ordering for stateful feature computation. Use cleanup.policy=compact on feature topics - you need the latest state, not full history. Set replication.factor=3, min.insync.replicas=2 for all topics. Set retention.ms=604800000 (7 days) on raw topics for replay during model retraining. Apply FULL schema compatibility to all feature topics.
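One way to keep these decisions consistent across topics is to derive each topic's settings from its tier prefix. The sketch below does that with plain dicts; `topic_spec` and `partition_for` are hypothetical helpers, and the CRC32-based partition function is only a stand-in to show the property that matters (same key, same partition), not the hash any particular Kafka client uses.

```python
import zlib

SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000  # 604800000


def topic_spec(name: str) -> dict:
    """Derive replication and topic-level configs from the topic's tier prefix."""
    config = {"min.insync.replicas": "2"}
    if name.startswith("raw."):
        # Keep full history for 7 days so retraining jobs can replay it.
        config["cleanup.policy"] = "delete"
        config["retention.ms"] = str(SEVEN_DAYS_MS)
    elif name.startswith("features."):
        # Only the latest value per key matters for serving.
        config["cleanup.policy"] = "compact"
    return {"topic": name, "replication_factor": 3, "config": config}


def partition_for(account_id: str, num_partitions: int) -> int:
    """Illustrative keyed partitioning: every event for an account maps to one partition."""
    return zlib.crc32(account_id.encode("utf-8")) % num_partitions


print(topic_spec("raw.transactions"))
print(topic_spec("features.account_state"))
```

Note that the replication factor is set when the topic is created rather than as a topic-level config entry, which is why it sits outside the `config` dict here.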

What is a Dead Letter Queue and why is it important for ML data pipelines?

A Dead Letter Queue (DLQ) is a secondary Kafka topic (conventionally named {original_topic}.dlq) where unprocessable messages are routed instead of being silently dropped or causing consumer crashes. When a message fails processing - due to schema violation, semantic error, or computation exception - the consumer captures the original message bytes, the error type, the stack trace, and a timestamp, then writes this diagnostic payload to the DLQ and commits the original message's offset.

For ML pipelines, DLQs are essential because silent message drops cause invisible feature gaps. If 0.1% of transactions are malformed and silently dropped, the model serves on incomplete feature history, potentially producing systematically biased scores. The DLQ makes every failure visible and auditable. In production, alert on DLQ throughput as a percentage of normal throughput - a spike in DLQ messages is a leading indicator that an upstream system changed its format.
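A minimal sketch of the diagnostic payload and the throughput alert described above. The field names, the `.dlq` suffix helper, and the 0.1% default threshold are illustrative assumptions; the returned topic and bytes would be handed to whatever producer the pipeline already uses.

```python
import base64
import json
import time
import traceback


def build_dlq_record(source_topic: str, raw_value: bytes,
                     exc: BaseException) -> tuple[str, bytes]:
    """Return (dlq_topic, payload) describing a message that failed processing."""
    payload = {
        "original_topic": source_topic,
        # Base64 keeps the original bytes intact even if they are not valid UTF-8.
        "original_value_b64": base64.b64encode(raw_value).decode("ascii"),
        "error_type": type(exc).__name__,
        "stack_trace": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)),
        "failed_at_ms": int(time.time() * 1000),
    }
    return f"{source_topic}.dlq", json.dumps(payload).encode("utf-8")


def dlq_rate_exceeded(dlq_count: int, total_count: int,
                      max_fraction: float = 0.001) -> bool:
    """True when DLQ traffic exceeds the allowed fraction of normal throughput."""
    return total_count > 0 and dlq_count / total_count > max_fraction


bad_message = b'{"account_id": "acct-1", "balance_usd": '
try:
    json.loads(bad_message)
except ValueError as err:
    dlq_topic, dlq_payload = build_dlq_record("raw.transactions", bad_message, err)
    print(dlq_topic)
```

Storing the original bytes rather than a parsed form means the message can be replayed unchanged once the upstream format issue is understood.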


Appendix: Kafka Throughput Math and Sizing

Understanding the relationship between partitions, consumers, and throughput is essential for capacity planning. The core model is simple:

$$\text{total\_throughput} = \text{num\_partitions} \times \text{throughput\_per\_partition}$$

Each partition is a sequential append-only log. A single partition can typically sustain 10–100 MB/s of write throughput depending on hardware. For ML feature pipelines processing JSON events at 1KB per event, this translates to roughly 10,000–100,000 events/second per partition.

The consumer parallelism bound:

$$\text{max\_consumer\_parallelism} = \text{num\_partitions}$$

You cannot have more active consumers in a group than partitions. If your feature pipeline needs to process 50,000 events/second and each consumer instance handles 5,000 events/second, you need at least 10 partitions. A safe rule is to provision 2–3x your current needs:

current rate: 50,000 events/sec
target headroom: 3x → 150,000 events/sec
per consumer: 5,000 events/sec
partitions: ceil(150,000 / 5,000) = 30 partitions
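The same calculation as a small helper, in case you want to rerun it with different rates. The function name and the default headroom of 3x are assumptions for illustration.

```python
import math


def partitions_needed(current_rate: float, per_consumer_rate: float,
                      headroom: float = 3.0) -> int:
    """Partitions required so that consumers can absorb current_rate * headroom."""
    return math.ceil(current_rate * headroom / per_consumer_rate)


print(partitions_needed(50_000, 5_000))                # 3x headroom
print(partitions_needed(50_000, 5_000, headroom=1.0))  # bare minimum
```

Because the partition count caps consumer parallelism, it is usually easier to over-provision partitions up front than to repartition a keyed topic later.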

Replication and durability math:

For a topic with replication.factor=3 and min.insync.replicas=2:

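  • Can tolerate 1 broker failure without data loss (2 of 3 replicas still in sync), provided producers use acks=all
  • Can tolerate 1 broker failure without write unavailability
  • Cannot keep accepting acks=all writes through 2 simultaneous broker failures (only 1 replica in sync, below min.insync.replicas=2); acknowledged data survives on the remaining replica, but producers are rejected until a second replica rejoins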
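The write-availability rule in this list reduces to a single comparison. The sketch below assumes producers use acks=all and that every failed broker hosted a replica of the partition in question; the function names are hypothetical.

```python
def can_accept_writes(replication_factor: int, min_insync_replicas: int,
                      failed_brokers: int) -> bool:
    """True if enough replicas remain in sync to satisfy min.insync.replicas."""
    in_sync = max(replication_factor - failed_brokers, 0)
    return in_sync >= min_insync_replicas


def tolerable_failures(replication_factor: int, min_insync_replicas: int) -> int:
    """Broker failures a partition can absorb while still accepting writes."""
    return max(replication_factor - min_insync_replicas, 0)


print(can_accept_writes(3, 2, failed_brokers=1))
print(can_accept_writes(3, 2, failed_brokers=2))
print(tolerable_failures(3, 2))
```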

Retention and storage sizing:

$$\text{storage\_per\_topic} = \text{throughput\_MB\_per\_sec} \times \text{retention\_seconds} \times \text{replication\_factor}$$

For a raw transactions topic at 50 MB/s, 7-day retention, replication factor 3: $50 \times 604800 \times 3 \text{ MB} \approx 90.7\text{ TB}$

This is the total disk footprint of 7 days of retention across the cluster; on a 3-broker cluster that is roughly 30.2 TB per broker. S3-backed tiered storage (Confluent Tiered Storage, Redpanda S3 tiering) can dramatically reduce local broker storage by offloading older segments to object storage.
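The storage formula can be checked with a few lines of arithmetic. This sketch uses decimal units (1 TB = 1,000,000 MB) and ignores compression and index overhead; the function name is hypothetical.

```python
def storage_tb(throughput_mb_per_sec: float, retention_seconds: int,
               replication_factor: int) -> float:
    """Total cluster storage for one topic, in decimal terabytes."""
    total_mb = throughput_mb_per_sec * retention_seconds * replication_factor
    return total_mb / 1_000_000


SEVEN_DAYS_SECONDS = 7 * 24 * 60 * 60  # 604800

total = storage_tb(50, SEVEN_DAYS_SECONDS, 3)
print(f"cluster total: {total:.1f} TB")
print(f"per broker (3 brokers): {total / 3:.1f} TB")
```

If producers enable compression, the on-disk figure will be lower than this estimate, so treat the result as an upper bound for planning.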


The Feature Pipeline Architecture End-to-End

Pulling together all the concepts from this lesson, the production-grade ML feature pipeline looks like this:

Every component in this diagram maps to a concept from this lesson:

  • Debezium CDC powers real-time account feature updates without polling
  • Schema Registry enforces FULL compatibility on raw topics, preventing silent schema drift
  • DLQ routes malformed transactions to an observable failure queue
  • Exactly-once stream processor ensures feature values are correct even after failures
  • Compact feature topics retain only the latest value per account - no historical bloat
  • S3 Sink connector lands all three topic types (features, predictions, labels) to the training data lake
  • Consumer lag monitoring on the stream processor's consumer group gives real-time feature freshness visibility

This is the kind of architecture that runs fraud detection, credit scoring, and real-time personalization at financial institutions and tech companies building ML at scale.

© 2026 EngineersOfAI. All rights reserved.