Message Queues and Kafka

The Feature Pipeline That Could Not Keep Up

The fraud detection team had a problem. Their model was excellent - 98.7% precision on holdout data. But in production, the share of fraud they actually caught was closer to 60%. The gap was not the model. It was the features.

Their feature pipeline worked like this: when a transaction arrived, the API server called the feature store synchronously, which queried a Postgres database for the user's transaction history, computed velocity features (transactions in the last hour, last 24 hours, average amount), and returned them to the model server. Total latency: 180ms. Their SLA for fraud scoring was 200ms, so theoretically fine.

The problem was coupling. When transaction volume spiked 10x during a major sales event, the Postgres database became the bottleneck. Feature store latency climbed to 800ms. The model service started timing out. Engineers added circuit breakers that returned default features when the feature store was slow. The model ran on stale or default features. Fraud slipped through.

The root cause: the fraud scoring pipeline was synchronous and tightly coupled. A spike in transaction volume directly caused feature quality degradation, which directly caused model accuracy degradation. Three services tied together with synchronous calls, no buffering, no isolation between stages.

The team rebuilt the pipeline with Kafka. Transactions publish to a Kafka topic. A Faust stream processor consumes the topic, maintains a sliding window of transaction history in Redis, and continuously updates precomputed velocity features. The model server reads from the feature store asynchronously. When transaction volume spikes, Kafka absorbs the burst - messages queue up but nothing times out. The feature processor works through the backlog at its own pace. The model always has features, even if they are 5 seconds old during peak load instead of 50ms.

This is the core value proposition of message queues in ML systems: decoupling. The producer does not need to wait for the consumer. Services fail independently. Throughput spikes do not cascade into failures.

This lesson builds from that intuition to the specific patterns that make Kafka the backbone of production ML data infrastructure.


Why This Exists - The Problem Message Queues Solve

Direct synchronous communication between services creates three structural problems in ML systems:

Temporal coupling: Service A can only run when Service B is running. If B is down for a deploy, A fails or must wait. For ML training pipelines that run for days, this is untenable.

Throughput coupling: If A produces data faster than B can consume it, either A must slow down (backpressure) or data is dropped. In ML, your data ingestion rate is often determined by user behavior, which you cannot control. Your training pipeline or feature computation rate is determined by your infrastructure, which you can scale. Decoupling them with a buffer (the message queue) lets both run at their natural rates.

Spatial coupling: A must know B's address. In dynamic systems where instances scale up and down, maintaining direct connection state is complex. A message queue is a stable rendezvous point.

Message queues solve all three by introducing an intermediary that stores messages until consumers are ready to process them. The producer publishes and moves on. The consumer reads when it is ready.

For ML specifically, message queues appear in:

  • Feature pipelines: raw events in, computed features out, consumed by model servers
  • Training data pipelines: streaming data continuously written to training datasets
  • Prediction logging: every model prediction captured for monitoring and retraining
  • Model monitoring: real-time feature drift and prediction drift detection
  • Async inference: decouple request receipt from actual model inference

Historical Context - From Simple Queues to Distributed Logs

Message queuing has been in enterprise software since the 1980s. IBM's MQSeries (now IBM MQ) launched in 1992. Microsoft Message Queuing (MSMQ) in 1997. These early systems were point-to-point: one producer, one consumer per message.

AMQP (Advanced Message Queuing Protocol) in 2003 standardized the protocol for more complex routing. RabbitMQ, launched in 2007 and built on AMQP, introduced flexible routing, exchanges, and bindings that allowed sophisticated message routing patterns.

Apache Kafka was created at LinkedIn between 2009 and 2011 and open-sourced in 2011. Jay Kreps, Neha Narkhede, and Jun Rao built it because existing message systems could not handle LinkedIn's scale of activity stream data. The fundamental insight that differentiated Kafka: it is not a message queue in the traditional sense. It is a distributed commit log. Messages are not deleted when consumed - they are retained for a configurable period. Multiple consumers can read the same messages independently, each at their own pace. This single design decision opened up use cases that traditional message queues could not support: replaying historical data, adding new consumers without changing producers, and treating the log as the source of truth.

By 2020, Kafka was processing over 7 trillion messages per day at LinkedIn. At Uber, Kafka handles hundreds of billions of messages daily including real-time ML feature computation for surge pricing and fraud detection. At Netflix, Kafka is the backbone of their recommendation system's online learning pipeline.


Core Concepts

Kafka Architecture

Understanding Kafka's architecture is understanding why it can handle the scale of modern ML data pipelines.

Broker: A Kafka server process. Brokers store messages and serve consumer and producer requests. A Kafka cluster has multiple brokers for fault tolerance and throughput.

Topic: A named category of messages. Analogous to a database table. Topics are split into partitions for parallelism.

Partition: The unit of parallelism in Kafka. A topic with 12 partitions can be consumed by up to 12 consumers simultaneously. Messages within a partition are strictly ordered. Messages across partitions are not ordered relative to each other. This is the key tradeoff: more partitions = more parallelism but weaker ordering guarantees.
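Key-based routing follows directly from this: the producer hashes each message key onto a partition, so all messages sharing a key land on the same partition and stay ordered relative to each other. A toy sketch of the invariant (real clients use murmur2 or crc32 variants, so actual partition numbers differ across clients, but equal keys always map to equal partitions):

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Hash the key and map it onto a partition. The Java client uses
    # murmur2 and librdkafka uses crc32 by default; the hash differs,
    # but the invariant is the same: equal keys -> equal partitions.
    return zlib.crc32(key) % num_partitions

# All events for one user land in one partition, so they stay ordered
# relative to each other - but not relative to other users' events.
assert partition_for(b"user-123", 12) == partition_for(b"user-123", 12)
```

This is why choosing the key matters: keying by user_id gives per-user ordering, while keying by a random UUID gives even load but no ordering guarantee at all.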

Offset: A sequential integer identifying each message within a partition. Consumers track their position (offset) in each partition independently. This allows replay: set your offset back to 0 and reprocess all historical messages.

Consumer Group: A set of consumers that jointly consume a topic. Each partition is assigned to exactly one consumer in the group. Consumer groups enable horizontal scaling: add a consumer to the group, Kafka rebalances so the new consumer gets some partitions.

Replication factor: How many copies of each partition to keep across brokers. Replication factor 3 means 3 brokers hold a copy, tolerating up to 2 broker failures. One copy is the "leader" (handles reads and writes), others are "followers."

Kafka Producer - Batching, Compression, and Acks

The Kafka producer has three parameters that dramatically affect both throughput and reliability:

Acks (acks setting): Controls when the producer considers a message "sent":

  • acks=0: Fire and forget. No confirmation. Maximum throughput, possible data loss.
  • acks=1: Leader acknowledges. Data safe as long as leader doesn't fail before replication.
  • acks=all (or acks=-1): All in-sync replicas acknowledge. No data loss under normal failures.

For ML training data and feature pipelines: use acks=all. You want to know your features are durably stored. For monitoring telemetry: acks=1 is acceptable - losing 0.1% of metric messages is fine.

Batching (linger.ms, batch.size): The producer buffers messages and sends them in batches. linger.ms=5 means "wait up to 5ms to accumulate a batch." More batching = higher throughput, higher latency. For ML feature pipelines processing millions of events/hour, batching is essential.

Compression (compression.type): Kafka supports gzip, snappy, lz4, and zstd. For ML feature data (JSON objects with repeated field names), compression ratios of 5-10x are common. LZ4 is recommended for ML: fast compression/decompression, good ratio.
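The repeated-field-name effect is easy to demonstrate. The sketch below uses zlib as a stand-in, since lz4 is not in the Python standard library; the record shape is hypothetical but typical of ML feature payloads:

```python
import json
import zlib

# A batch of feature records with repeated field names - the typical
# shape of ML feature payloads on a Kafka topic.
records = [
    {"user_id": f"user-{i}", "tx_count_1h": i % 7, "total_amount_24h": 12.5 * i}
    for i in range(200)
]
raw = json.dumps(records).encode("utf-8")
compressed = zlib.compress(raw)  # stand-in for lz4 (not in the stdlib)

# Repeated keys compress heavily; the ratio grows with batch size,
# which is one reason batching and compression work well together.
ratio = len(raw) / len(compressed)
assert ratio > 3
```

Kafka compresses whole producer batches, not individual messages, so larger batches (higher linger.ms) also mean better compression ratios.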

Kafka Consumer - The Poll Loop and Offset Management

The Kafka consumer works fundamentally differently from traditional message queue consumers. There is no push. The consumer is always the one asking for messages via a poll loop.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "transactions",
    bootstrap_servers=["kafka:9092"],
    group_id="fraud-model",
    enable_auto_commit=False,  # commit manually after processing
)

while True:
    records = consumer.poll(timeout_ms=1000)
    for partition, messages in records.items():
        for message in messages:
            process(message)
    # Commit offsets after processing
    if records:
        consumer.commit()

The poll loop calls poll() which fetches a batch of records from all assigned partitions. You process them, then commit the offsets to indicate "I have successfully processed up to this point."

Offset commit strategies:

  • Auto-commit (enable.auto.commit=True): Kafka automatically commits offsets every 5 seconds. Simple but dangerous: if your process crashes after committing but before processing, messages are lost. If it crashes after processing but before committing, messages are reprocessed.
  • Manual commit after processing (enable.auto.commit=False): Commit only after successfully processing each batch. Guarantees at-least-once delivery.
  • Transactional (exactly-once): Commit Kafka offset and write results atomically. More complex but eliminates duplicates.

For ML feature computation and prediction logging, at-least-once with idempotent processing is usually the right choice.
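Idempotent processing is what makes at-least-once safe: replaying a message must not change the result. A toy in-memory sketch - in production the dedupe usually lives in the sink itself (e.g. an upsert keyed by a unique event_id):

```python
processed: set[str] = set()
feature_store: dict[str, dict] = {}

def handle(event: dict) -> None:
    # At-least-once delivery means this may be called twice for the
    # same event after a crash-and-replay. Keying every write by a
    # unique event_id makes the second call a no-op.
    if event["event_id"] in processed:
        return
    feature_store[event["user_id"]] = event["features"]
    processed.add(event["event_id"])

evt = {"event_id": "e-1", "user_id": "u-1", "features": {"tx_count_1h": 3}}
handle(evt)
handle(evt)  # replayed after a simulated restart - state is unchanged
assert feature_store == {"u-1": {"tx_count_1h": 3}}
```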

Exactly-Once Semantics

Kafka supports exactly-once delivery through idempotent producers and transactional API. This is important for ML use cases where duplicate processing would corrupt feature values or double-count predictions.

The idempotent producer assigns a sequence number to each message. If a network error causes a retry and the same message arrives twice, the broker deduplicates using the sequence number.

The transactional API allows atomically writing to multiple partitions and committing consumer offsets in a single transaction:

# kafka-python style API; requires a client version with transaction support
from kafka import KafkaProducer, TopicPartition
from kafka.structs import OffsetAndMetadata

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    enable_idempotence=True,
    transactional_id="feature-processor-1",  # Unique per producer instance
)

producer.init_transactions()
producer.begin_transaction()

# These writes are atomic
producer.send("features", key=b"user-123", value=computed_features)
producer.send("feature-audit", key=b"user-123", value=audit_record)

# Commit the consumer offset as part of the same transaction
producer.send_offsets_to_transaction(
    {TopicPartition("transactions", 0): OffsetAndMetadata(100, None)},
    group_id="feature-processor",
)

producer.commit_transaction()
# If anything above fails, nothing is committed

Consumer Rebalancing

When a consumer joins or leaves a group, Kafka triggers a rebalance: all consumers in the group temporarily stop processing while Kafka reassigns partition ownership. During the rebalance, no progress is made.

For ML feature pipelines, rebalance latency can cause spikes in feature freshness. The remediation:

  1. Larger max.poll.interval.ms: Default is 5 minutes. Increase for ML workloads that do expensive per-batch processing (e.g., batch inference on consumed messages).

  2. Static membership (group.instance.id): Assign stable IDs to consumers. When a consumer restarts, it reclaims its partitions without triggering a full rebalance. Critical for ML feature processors running as Kubernetes pods.

  3. Cooperative rebalancing (partition.assignment.strategy=CooperativeStickyAssignor): Only reassign partitions that need to move, not all partitions. Consumers that keep their partitions do not stop processing.
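The three remediations above map directly onto consumer configuration. A sketch in librdkafka/confluent-kafka property names - values are illustrative starting points, and note that librdkafka spells the Java CooperativeStickyAssignor as "cooperative-sticky":

```python
import socket

# Consumer settings that reduce rebalance pain for ML workloads.
config = {
    "bootstrap.servers": "kafka:9092",
    "group.id": "feature-processor",
    # 1. Allow slow per-batch work (e.g. batch inference) before
    #    the broker assumes the consumer is dead.
    "max.poll.interval.ms": 600000,
    # 2. Static membership: a restarted pod reclaims its old
    #    partitions instead of triggering a full rebalance.
    "group.instance.id": f"feature-processor-{socket.gethostname()}",
    # 3. Only reassign the partitions that actually need to move.
    "partition.assignment.strategy": "cooperative-sticky",
}
```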


Code Examples

Confluent-Kafka Producer for ML Feature Logging

"""
Kafka producer for ML prediction logging.
Every model prediction is logged to Kafka for:
- Model monitoring (drift detection)
- Training data collection for future retraining
- Audit trail for regulated use cases
"""
import json
import uuid
import time
import logging
from datetime import datetime, timezone
from typing import Any

from confluent_kafka import Producer, KafkaError

logger = logging.getLogger(__name__)


class MLPredictionLogger:
"""
Logs ML predictions to Kafka for monitoring and retraining.

Design decisions:
- Asynchronous delivery: on_delivery callback, not blocking
- LZ4 compression: fast, good ratio for JSON feature data
- High linger.ms: batch many predictions together for efficiency
- acks=1: we want confirmation but exact durability less critical
than prediction throughput. For compliance use cases, use acks=all.
"""

def __init__(
self,
bootstrap_servers: str,
topic: str = "ml-predictions",
model_name: str = "unknown",
):
self.topic = topic
self.model_name = model_name
self._errors = 0

self.producer = Producer({
"bootstrap.servers": bootstrap_servers,
"acks": "1", # Leader acknowledgment
"linger.ms": 10, # Batch up to 10ms for throughput
"batch.size": 32768, # 32KB batch size
"compression.type": "lz4", # Fast compression for JSON
"queue.buffering.max.messages": 100000,
"queue.buffering.max.kbytes": 102400,
"message.max.bytes": 1000000, # 1MB max message
})

def log_prediction(
self,
request_id: str,
features: dict[str, Any],
prediction: dict[str, Any],
model_version: str = "unknown",
latency_ms: float = 0.0,
metadata: dict = None,
) -> None:
"""
Log a prediction asynchronously.
Does NOT block - fire and forget with error callback.
"""
record = {
"event_id": str(uuid.uuid4()),
"request_id": request_id,
"model_name": self.model_name,
"model_version": model_version,
"timestamp": datetime.now(timezone.utc).isoformat(),
"features": features,
"prediction": prediction,
"latency_ms": latency_ms,
"metadata": metadata or {},
}

# Use request_id as partition key so same user's predictions
# always go to the same partition (ordered per user)
key = request_id.encode("utf-8")
value = json.dumps(record).encode("utf-8")

self.producer.produce(
self.topic,
key=key,
value=value,
on_delivery=self._delivery_callback,
)

# Non-blocking poll - handle delivery callbacks without waiting
self.producer.poll(0)

def _delivery_callback(self, err, msg):
"""Called asynchronously when message delivery is confirmed or fails."""
if err is not None:
self._errors += 1
logger.error(
f"Prediction log delivery failed: {err} "
f"[topic={msg.topic()}, partition={msg.partition()}]"
)
# On success: msg.offset() contains the committed offset

def flush(self, timeout_sec: float = 30.0) -> int:
"""
Flush all buffered messages.
Call at shutdown or after each inference batch.
Returns number of messages still in queue (should be 0).
"""
remaining = self.producer.flush(timeout=timeout_sec)
if remaining > 0:
logger.warning(
f"Kafka flush timeout: {remaining} messages not delivered"
)
return remaining

@property
def error_count(self) -> int:
return self._errors


class MLFeatureProducer:
"""
Produces computed features to Kafka for consumption by model servers.
Uses acks=all for feature durability - feature loss can degrade model quality.
"""

def __init__(
self,
bootstrap_servers: str,
topic: str = "ml-features",
):
self.topic = topic

self.producer = Producer({
"bootstrap.servers": bootstrap_servers,
"acks": "all", # All replicas must confirm
"enable.idempotence": True, # Prevent duplicate writes
"retries": 2147483647, # Retry indefinitely on transient failures
"max.in.flight.requests.per.connection": 5, # Required for idempotence
"compression.type": "lz4",
"linger.ms": 5,
})

def publish_features(
self,
entity_id: str,
feature_set: str,
features: dict[str, Any],
timestamp: float = None,
) -> None:
"""Publish computed features for an entity."""
record = {
"entity_id": entity_id,
"feature_set": feature_set,
"features": features,
"computed_at": timestamp or time.time(),
}

# Partition by entity_id for locality - all features for
# the same entity go to the same partition
self.producer.produce(
self.topic,
key=entity_id.encode("utf-8"),
value=json.dumps(record).encode("utf-8"),
on_delivery=lambda err, msg: (
logger.error(f"Feature publish failed: {err}") if err else None
),
)
self.producer.poll(0)

Kafka Consumer with Manual Offset Commits

"""
Kafka consumer for ML feature extraction from event stream.
Demonstrates manual offset commits, error handling,
graceful shutdown, and lag monitoring.
"""
import json
import signal
import logging
import threading
from typing import Callable

from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition

logger = logging.getLogger(__name__)


class MLEventConsumer:
"""
Kafka consumer for processing events in ML pipelines.

Key decisions:
- Manual offset commits for at-least-once delivery
- Commit after each batch, not per-message (for throughput)
- Graceful shutdown handling
- Consumer lag monitoring built in
"""

def __init__(
self,
bootstrap_servers: str,
topics: list[str],
group_id: str,
batch_size: int = 100,
poll_timeout_ms: float = 1000,
):
self.topics = topics
self.group_id = group_id
self.batch_size = batch_size
self.poll_timeout_ms = poll_timeout_ms
self._running = False
self._processed_count = 0
self._error_count = 0

self.consumer = Consumer({
"bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"auto.offset.reset": "earliest", # Start from beginning if no committed offset
"enable.auto.commit": False, # Manual commits for reliability
"max.poll.interval.ms": 300000, # 5 minutes for slow ML processing
"session.timeout.ms": 30000,
"heartbeat.interval.ms": 3000,
# Static membership: stable partition assignments, fewer rebalances
# "group.instance.id": f"{group_id}-{socket.gethostname()}",
"fetch.max.bytes": 52428800, # 50MB max fetch per poll
"max.partition.fetch.bytes": 10485760, # 10MB per partition
})

self.consumer.subscribe(topics)

# Handle SIGTERM gracefully (Kubernetes sends this before SIGKILL)
signal.signal(signal.SIGTERM, self._handle_shutdown)
signal.signal(signal.SIGINT, self._handle_shutdown)

def run(self, process_fn: Callable[[list], None]) -> None:
"""
Main consumption loop.

process_fn: called with a list of (key, value, partition, offset) tuples
for each batch of messages. Must be idempotent.

Commits offsets after successful process_fn completion.
On exception: does NOT commit - messages will be reprocessed after restart.
"""
logger.info(
f"Starting consumer group={self.group_id} topics={self.topics}"
)
self._running = True

try:
while self._running:
# Poll for a batch of messages
messages = []
deadline = time.time() + (self.poll_timeout_ms / 1000)

while len(messages) < self.batch_size and time.time() < deadline:
msg = self.consumer.poll(timeout=0.1)

if msg is None:
continue

if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# Reached end of partition - not an error
continue
elif msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
logger.error(f"Topic not found: {msg.error()}")
raise KafkaException(msg.error())
else:
logger.warning(f"Consumer error: {msg.error()}")
self._error_count += 1
continue

messages.append({
"key": msg.key().decode("utf-8") if msg.key() else None,
"value": json.loads(msg.value().decode("utf-8")),
"partition": msg.partition(),
"offset": msg.offset(),
"timestamp": msg.timestamp()[1],
})

if not messages:
continue

# Process the batch
try:
process_fn(messages)
self._processed_count += len(messages)

# Commit after successful processing
self.consumer.commit(asynchronous=False)

except Exception as e:
logger.exception(
f"Failed to process batch of {len(messages)} messages: {e}"
)
self._error_count += len(messages)
# Do NOT commit - messages will be reprocessed
# This implements at-least-once delivery
# Make sure process_fn is idempotent

finally:
logger.info(
f"Consumer shutting down. "
f"Processed: {self._processed_count}, Errors: {self._error_count}"
)
self.consumer.close()

def _handle_shutdown(self, signum, frame):
"""Handle graceful shutdown signal from Kubernetes."""
logger.info(f"Received signal {signum}, shutting down gracefully")
self._running = False

def get_lag(self) -> dict[str, int]:
"""
Get current consumer lag per partition.
High lag = consumer is falling behind producers.
Alert if lag exceeds your SLA (e.g., >10k messages = >10s at 1k msg/s).
"""
lag = {}
assignment = self.consumer.assignment()

for partition in assignment:
# Get our committed offset
committed = self.consumer.committed([partition])[0]
committed_offset = committed.offset if committed else 0

# Get the latest available offset (end of partition)
high_watermark = self.consumer.get_watermark_offsets(partition)
end_offset = high_watermark[1]

lag[f"{partition.topic}:{partition.partition}"] = (
max(0, end_offset - committed_offset)
)

return lag

Kafka Consumer for Real-Time ML Feature Extraction

"""
Real-time feature extraction from transaction events.
Demonstrates the fraud detection pipeline pattern:
transactions -> Kafka -> feature computation -> feature store -> model.
"""
import json
import time
import redis
import logging
from collections import defaultdict

from confluent_kafka import Consumer, Producer

logger = logging.getLogger(__name__)

import time # already imported above but kept for clarity


class VelocityFeatureProcessor:
"""
Computes velocity features from transaction stream.

Features computed:
- transactions_last_1h: count in last 60 minutes
- transactions_last_24h: count in last 24 hours
- amount_last_1h: total amount in last 60 minutes
- avg_amount_last_24h: average amount in last 24 hours
- unique_merchants_last_1h: distinct merchant count
"""

WINDOWS = {
"1h": 3600,
"24h": 86400,
}

def __init__(
self,
kafka_bootstrap: str,
redis_host: str = "localhost",
redis_port: int = 6379,
input_topic: str = "transactions",
output_topic: str = "features",
group_id: str = "velocity-feature-processor",
):
self.input_topic = input_topic
self.output_topic = output_topic

self.consumer = Consumer({
"bootstrap.servers": kafka_bootstrap,
"group.id": group_id,
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"max.poll.interval.ms": 300000,
})
self.consumer.subscribe([input_topic])

self.producer = Producer({
"bootstrap.servers": kafka_bootstrap,
"acks": "all",
"enable.idempotence": True,
"compression.type": "lz4",
})

self.redis = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)

def _compute_velocity_features(self, user_id: str, transaction: dict) -> dict:
"""
Compute velocity features using Redis sorted sets.

Redis sorted set stores (transaction_id, timestamp) pairs.
ZRANGEBYSCORE with time window gives us recent transactions efficiently.
Time complexity: O(log N + M) where M = transactions in window.
"""
now = transaction.get("timestamp", time.time())
amount = float(transaction.get("amount", 0))
merchant_id = transaction.get("merchant_id", "unknown")
tx_id = transaction.get("transaction_id", str(now))

features = {}

for window_name, window_seconds in self.WINDOWS.items():
window_start = now - window_seconds
key_prefix = f"user:{user_id}:txn:{window_name}"
amount_key = f"user:{user_id}:amount:{window_name}"
merchant_key = f"user:{user_id}:merchants:{window_name}"

# Add current transaction to sorted set (score = timestamp)
pipe = self.redis.pipeline()

# Store transaction with timestamp as score
pipe.zadd(key_prefix, {tx_id: now})
pipe.expire(key_prefix, window_seconds * 2)

# Store amount in separate sorted set
pipe.zadd(amount_key, {f"{tx_id}:{amount}": now})
pipe.expire(amount_key, window_seconds * 2)

# Store merchant
pipe.zadd(merchant_key, {merchant_id: now})
pipe.expire(merchant_key, window_seconds * 2)

pipe.execute()

# Remove expired entries
self.redis.zremrangebyscore(key_prefix, 0, window_start)
self.redis.zremrangebyscore(amount_key, 0, window_start)
self.redis.zremrangebyscore(merchant_key, 0, window_start)

# Compute features from current window
tx_count = self.redis.zcount(key_prefix, window_start, now)
merchant_count = self.redis.zcount(merchant_key, window_start, now)

# Sum amounts for this window
amount_entries = self.redis.zrangebyscore(
amount_key, window_start, now, withscores=False
)
total_amount = sum(
float(entry.split(":")[1])
for entry in amount_entries
if ":" in entry
)

features[f"tx_count_{window_name}"] = tx_count
features[f"total_amount_{window_name}"] = total_amount
features[f"avg_amount_{window_name}"] = (
total_amount / tx_count if tx_count > 0 else 0.0
)
features[f"unique_merchants_{window_name}"] = merchant_count

return features

def process(self) -> None:
"""Main processing loop."""
logger.info("Starting velocity feature processor")

while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue

if msg.error():
logger.warning(f"Consumer error: {msg.error()}")
continue

try:
transaction = json.loads(msg.value().decode("utf-8"))
user_id = transaction.get("user_id")

if not user_id:
logger.warning("Transaction missing user_id, skipping")
continue

# Compute velocity features
features = self._compute_velocity_features(user_id, transaction)

# Add raw transaction features
features["current_amount"] = float(transaction.get("amount", 0))
features["is_online"] = transaction.get("is_online", False)
features["merchant_category"] = transaction.get("merchant_category", "unknown")

# Publish computed features to output topic
feature_record = {
"user_id": user_id,
"transaction_id": transaction.get("transaction_id"),
"features": features,
"computed_at": time.time(),
}

self.producer.produce(
self.output_topic,
key=user_id.encode("utf-8"),
value=json.dumps(feature_record).encode("utf-8"),
)
self.producer.poll(0)

# Commit after each message for simplicity
# In production, commit in batches for throughput
self.consumer.commit(asynchronous=False)

except Exception as e:
logger.exception(f"Failed to process transaction: {e}")
# Do not commit - reprocess this message on restart

Faust Stream Processor for Feature Computation

"""
Faust stream processor for ML feature computation.
Faust is Python's answer to Kafka Streams - expressive, type-safe,
and integrates naturally with Python ML libraries.

Install: pip install faust-streaming
"""
import faust
import json
import time
from datetime import datetime, timezone


# Define record types for type safety
class Transaction(faust.Record, serializer="json"):
transaction_id: str
user_id: str
amount: float
merchant_id: str
merchant_category: str
timestamp: float
is_online: bool = False


class UserFeatures(faust.Record, serializer="json"):
user_id: str
tx_count_1h: int
tx_count_24h: int
total_amount_1h: float
total_amount_24h: float
unique_merchants_1h: int
computed_at: float


# Create Faust app
app = faust.App(
"ml-feature-processor",
broker="kafka://localhost:9092",
value_serializer="json",
# Store intermediate state in RocksDB (persistent across restarts)
store="rocksdb://",
)

# Define topics
transactions_topic = app.topic("transactions", value_type=Transaction)
features_topic = app.topic("ml-features", value_type=UserFeatures)

# Windowed state tables for velocity features
# hopping windows: count transactions in rolling 1h and 24h windows
tx_count_1h = app.Table(
"tx-count-1h",
default=int,
help="Transaction count per user in last 1 hour",
)

amount_sum_1h = app.Table(
"amount-sum-1h",
default=float,
help="Transaction amount sum per user in last 1 hour",
)


@app.agent(transactions_topic)
async def process_transaction(transactions):
"""
Process each transaction and emit computed features.

Faust agent: consumes from transactions_topic,
updates state tables, publishes to features_topic.
"""
async for transaction in transactions:
# Update rolling counts
user_key = transaction.user_id

# Increment transaction count
tx_count_1h[user_key] += 1
amount_sum_1h[user_key] += transaction.amount

# Compute features from current state
features = UserFeatures(
user_id=user_key,
tx_count_1h=tx_count_1h[user_key],
tx_count_24h=tx_count_1h[user_key], # Simplified - use separate table in prod
total_amount_1h=amount_sum_1h[user_key],
total_amount_24h=amount_sum_1h[user_key],
unique_merchants_1h=1, # Simplified - use HyperLogLog in prod
computed_at=time.time(),
)

# Publish features (key = user_id for co-partitioning with model serving)
await features_topic.send(key=user_key, value=features)


@app.timer(interval=60.0)
async def log_processing_stats():
"""Log feature pipeline health every 60 seconds."""
app.log.info(
f"Feature pipeline alive. "
f"Tables: tx_count_1h={len(tx_count_1h)}"
)

Dead Letter Queue Pattern for ML Pipelines

"""
Dead Letter Queue (DLQ) pattern for ML event processing.
When a message cannot be processed (bad schema, computation error),
route it to a DLQ instead of blocking the main pipeline.

DLQ messages can be:
- Inspected for debugging
- Replayed after fixing the processing bug
- Used to identify schema drift in upstream producers
"""
import json
import logging
import traceback
from datetime import datetime, timezone
from typing import Callable

from confluent_kafka import Consumer, Producer

logger = logging.getLogger(__name__)


class DLQConsumer:
"""
Kafka consumer with dead letter queue support.

Failed messages are written to {topic}-dlq with error metadata.
Main pipeline continues processing without blocking on bad messages.
"""

def __init__(
self,
bootstrap_servers: str,
input_topic: str,
group_id: str,
max_retries: int = 3,
):
self.input_topic = input_topic
self.dlq_topic = f"{input_topic}-dlq"
self.max_retries = max_retries

self.consumer = Consumer({
"bootstrap.servers": bootstrap_servers,
"group.id": group_id,
"enable.auto.commit": False,
"auto.offset.reset": "earliest",
})
self.consumer.subscribe([input_topic])

self.dlq_producer = Producer({
"bootstrap.servers": bootstrap_servers,
"acks": "all",
})

def run(self, process_fn: Callable[[dict], None]) -> None:
"""
Consume messages, sending failures to DLQ after max_retries.
"""
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue

if msg.error():
logger.warning(f"Consumer error: {msg.error()}")
continue

raw_value = msg.value()
last_error = None

# Retry logic for transient failures
for attempt in range(self.max_retries):
try:
value = json.loads(raw_value.decode("utf-8"))
process_fn(value)
last_error = None
break # Success

except json.JSONDecodeError as e:
# Schema error - no point retrying
last_error = e
break

except Exception as e:
last_error = e
logger.warning(
f"Processing attempt {attempt + 1}/{self.max_retries} failed: {e}"
)
if attempt < self.max_retries - 1:
import time
time.sleep(0.1 * (2 ** attempt))

if last_error is not None:
# All retries exhausted - send to DLQ
self._send_to_dlq(
raw_value=raw_value,
key=msg.key(),
partition=msg.partition(),
offset=msg.offset(),
error=last_error,
)

# Always commit - even failed messages are "done" (sent to DLQ)
self.consumer.commit(asynchronous=False)

def _send_to_dlq(
self,
raw_value: bytes,
key: bytes,
partition: int,
offset: int,
error: Exception,
) -> None:
"""Route failed message to DLQ with error metadata."""
dlq_record = {
"original_topic": self.input_topic,
"original_partition": partition,
"original_offset": offset,
"original_key": key.decode("utf-8") if key else None,
"error_type": type(error).__name__,
"error_message": str(error),
"error_traceback": traceback.format_exc(),
"failed_at": datetime.now(timezone.utc).isoformat(),
"raw_value": raw_value.decode("utf-8", errors="replace"),
}

self.dlq_producer.produce(
self.dlq_topic,
key=key,
value=json.dumps(dlq_record).encode("utf-8"),
)
self.dlq_producer.flush(timeout=5.0)

logger.error(
f"Message sent to DLQ: partition={partition} offset={offset} "
f"error={type(error).__name__}: {error}"
)

Production Engineering Notes

Kafka for ML - Topic Design

Topic design is a strategic decision that affects performance, scalability, and operational complexity:

ML Platform Kafka Topic Taxonomy:
- raw.transactions - Upstream source events (high volume)
- features.user-velocity - Computed velocity features
- features.item-embeddings - Precomputed item representations
- predictions.fraud-v2 - Fraud model predictions
- predictions.recs-v3 - Recommendation model predictions
- labels.fraud-feedback - Delayed fraud labels for monitoring
- models.training-triggers - Events that trigger retraining
- monitoring.feature-drift - Feature distribution statistics
- monitoring.pred-drift - Prediction distribution statistics

Partition count guidance for ML:

  • Input topics (transactions): 12-48 partitions for high parallelism
  • Feature topics: match your model server replica count (e.g., 8 partitions for 8 replicas)
  • Prediction logging: 1 partition per model replica to preserve per-request ordering
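Applied at topic-creation time, the guidance above might look like the following kafka-topics.sh invocation. This is a CLI sketch against a hypothetical broker; the topic name, broker address, and counts are illustrative, and it assumes a Kafka 2.2+ CLI that accepts --bootstrap-server:

```shell
# Create a feature topic sized to the planned consumer replica count
kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic features.user-velocity \
  --partitions 8 \
  --replication-factor 3
```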

Redis Streams vs Kafka

For lower-scale ML applications, Redis Streams is a viable alternative to Kafka:

| Dimension | Redis Streams | Kafka |
| --- | --- | --- |
| Throughput | 100k-1M msgs/sec | 1M-10M msgs/sec |
| Retention | Limited by RAM | TB-scale on disk |
| Replay | Yes (XREAD from ID) | Yes (offset-based) |
| Consumer groups | Yes | Yes |
| Operational complexity | Low (if you have Redis) | High |
| Best for ML use case | Feature store updates, small ML apps | Large-scale data pipelines |

RabbitMQ vs Kafka

Choose RabbitMQ when:

  • You need complex routing logic (exchanges, bindings, routing keys)
  • Message TTL and per-message priorities matter
  • You do not need replay - messages are consumed once
  • Team already operates RabbitMQ

Choose Kafka when:

  • You need replay for debugging, reprocessing, or adding new consumers
  • High throughput (>100k msgs/sec)
  • Multiple independent consumer groups (ML training, monitoring, fraud all reading the same event stream)
  • Event sourcing - Kafka log as source of truth

For ML platforms, Kafka is almost always the right choice because replay (for retraining, debugging, and backfills) is invaluable.


Common Mistakes

:::danger Auto-Commit Without Idempotent Processing

enable.auto.commit=True commits offsets every 5 seconds regardless of whether your processing succeeded. If your ML feature computation crashes after the auto-commit but before writing to the feature store, those events are marked as consumed and will never be reprocessed. Your feature store has gaps.

Equally problematic: if your process crashes after writing to the feature store but before the auto-commit fires, those events will be reprocessed. If your feature update is not idempotent (e.g., you increment a counter), you get double-counted features.

Fix: Use enable.auto.commit=False. Commit manually after successful processing. Make your processing logic idempotent (overwrite, do not increment).

# Wrong
consumer = Consumer({
    "enable.auto.commit": True,  # Silent data quality corruption
})

# Correct
consumer = Consumer({
    "enable.auto.commit": False,
})
# After successful processing:
consumer.commit(asynchronous=False)

:::
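The difference between incrementing and overwriting under redelivery can be seen with a small simulation (plain Python, no Kafka; the event shape is made up for illustration):

```python
# At-least-once delivery: after a crash between processing and commit,
# the same event is delivered again. Compare increment vs overwrite.
events = [
    {"user_id": "u1", "txn_count_1h": 3},
    {"user_id": "u1", "txn_count_1h": 3},  # duplicate redelivery
]

incremented = {}  # non-idempotent: apply a delta on every delivery
overwritten = {}  # idempotent: write the computed value as-is

for event in events:
    uid = event["user_id"]
    incremented[uid] = incremented.get(uid, 0) + event["txn_count_1h"]
    overwritten[uid] = event["txn_count_1h"]

print(incremented["u1"])  # 6 - double-counted
print(overwritten["u1"])  # 3 - correct despite the duplicate
```

The overwrite version can safely reprocess the same event any number of times, which is exactly what at-least-once semantics require.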

:::danger No Consumer Lag Monitoring

If your Kafka consumers fall behind producers, the lag grows silently. In ML feature pipelines, high lag means your model is running on stale features. In prediction logging, high lag means your monitoring system is delayed. Neither condition triggers any alert by default.

A 10-minute feature lag in a fraud detection system means you are scoring transactions with features that are 10 minutes old. During a fraud attack, velocity features computed 10 minutes ago are useless.

Monitor consumer lag continuously:

# Check lag from command line
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group fraud-model --describe

# Alert thresholds for ML feature consumers:
# WARNING: lag > 1000 messages (about 1 second at 1k msg/s)
# CRITICAL: lag > 10000 messages (about 10 seconds)

Set up alerts in your monitoring system. Consumer lag is a leading indicator of feature freshness degradation.

:::

:::warning Unkeyed Messages Lose Ordering Guarantees

Kafka guarantees ordering within a partition but not across partitions. If you produce messages without a key, Kafka distributes them round-robin across all partitions. Transactions for the same user end up in different partitions, consumed by different consumers, potentially in different orders.

For ML feature computation, this matters: if transaction T1 (creates account) and T2 (first purchase) for the same user end up in different partitions, T2 might be processed before T1, producing incorrect features.

Always set a meaningful partition key:

# Wrong - round-robin distribution, no ordering guarantee per user
producer.produce(topic, value=event_bytes)

# Correct - user's events always go to the same partition
producer.produce(
    topic,
    key=user_id.encode("utf-8"),  # All user's events, same partition
    value=event_bytes,
)

:::
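The key-to-partition mapping can be modeled in a few lines. Kafka's default partitioner hashes the key with murmur2; any deterministic hash demonstrates the same property, so this sketch uses CRC32 as a stand-in, with an illustrative partition count:

```python
import zlib

NUM_PARTITIONS = 12

def partition_for(key: str) -> int:
    # Stand-in for Kafka's murmur2-based default partitioner: any
    # deterministic hash means one key always lands on one partition.
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS

# Keyed: every event for user-42 maps to the same partition.
user_events = ["create_account", "first_purchase", "second_purchase"]
keyed = {partition_for("user-42") for _ in user_events}
print(len(keyed))  # 1 - per-user ordering preserved

# Unkeyed: round-robin spreads consecutive events across partitions.
unkeyed = [i % NUM_PARTITIONS for i in range(len(user_events))]
print(unkeyed)  # [0, 1, 2] - no ordering guarantee for the user
```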

:::warning Oversizing Partitions During Topic Creation

The number of partitions in a Kafka topic can only be increased, never decreased. Over-partitioning wastes resources: each partition has overhead (file handles, memory, ZooKeeper metadata). Under-partitioning limits throughput. For ML feature topics, start with a reasonable number (match your consumer replica count) and increase only when needed.

A topic with 200 partitions but only 5 consumers wastes 195 partitions of file handle and memory overhead for no benefit. The 5 consumers will each handle 40 partitions, which does not improve throughput beyond 5 consumers' worth.

Rule of thumb: Start with partitions = max_expected_consumer_parallelism. For a feature processor that will run at most 12 replicas, start with 12 partitions.

:::


Interview Q&A

Q1: Explain why Kafka is a distributed commit log, not a traditional message queue, and why this distinction matters for ML.

Traditional message queues (RabbitMQ, SQS) delete messages after they are consumed. Kafka retains all messages for a configurable retention period (often 7 days) regardless of consumption. Multiple independent consumer groups can read the same topic at their own pace, each maintaining their own offset pointer.

This distinction matters enormously for ML because of the need for replay. In a traditional message queue, once your feature processor has consumed a transaction, it is gone. If you deploy a new feature transformation, you cannot backfill features for historical data without maintaining a separate archive. With Kafka, you simply start a new consumer group with auto.offset.reset=earliest, and it reprocesses all retained messages from the beginning. This enables: (1) backfilling features when you add a new feature to your set, (2) debugging by replaying events that caused bad model behavior, (3) adding new downstream consumers (a new monitoring service, a new model) without any changes to producers.
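In confluent-kafka terms, the backfill described above is just a consumer configuration. This is a config sketch, not a runnable pipeline; the group id and broker address are illustrative:

```python
# A fresh group.id has no committed offsets, so auto.offset.reset applies
# and the consumer replays the log from the oldest retained message.
backfill_config = {
    "bootstrap.servers": "kafka:9092",
    "group.id": "feature-backfill-v2",  # new group = independent offsets
    "auto.offset.reset": "earliest",    # start at the beginning of retention
    "enable.auto.commit": False,        # commit manually after processing
}
print(backfill_config["auto.offset.reset"])  # earliest
```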

Q2: What are the three Kafka delivery semantics and when is each appropriate for ML?

At-most-once: messages may be lost but never duplicated. Auto-commit with no retries. Appropriate for high-frequency monitoring telemetry where losing 0.01% of metric messages is acceptable and throughput matters more than completeness.

At-least-once: messages are never lost but may be duplicated. Manual commits after successful processing with retries on failure. Appropriate for ML feature computation and prediction logging where losing data is unacceptable, but you can make processing idempotent (overwrite features rather than increment).

Exactly-once: messages are delivered and processed exactly once, with no loss and no duplicates. Requires idempotent producers (enable.idempotence=True) and transactional consumer-producer loops. Appropriate for financial ML applications (fraud labels, transaction amounts) where duplicates would corrupt model training data or cause regulatory issues. Higher complexity and lower throughput than at-least-once.

For most ML pipelines, at-least-once with idempotent processing is the pragmatic choice.

Q3: Explain Kafka consumer groups and how they enable horizontal scaling of ML feature processors.

A consumer group is a set of consumers with the same group.id that jointly consume a topic. Kafka assigns each partition to exactly one consumer in the group. Two consumers in the same group never read the same partition simultaneously - this is what gives point-to-point delivery semantics within a group.

Horizontal scaling works by adding consumers to the group. A topic with 12 partitions can be consumed by up to 12 consumers simultaneously, each handling one partition. Adding a 13th consumer gives you zero additional throughput - one consumer will be idle with no partition assigned.

For ML feature processors: if your velocity feature computation can process 10k transactions/second per process, but your transaction topic has 100k msgs/second, you need 10 consumer instances in the same group. With 12 partitions: run 10-12 consumers, each processing one partition independently and in parallel. The feature computation throughput scales linearly with consumer count up to the partition limit.

Different ML systems (fraud model, recommendation system, monitoring) use different consumer groups. Each group gets its own independent view of the topic at its own position. The fraud consumer group's slow processing does not affect the recommendation consumer group's position.
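The partition-to-consumer mapping can be simulated directly. This is a simplified round-robin model (real Kafka uses pluggable assignors such as range or cooperative-sticky), but the capacity limit it illustrates is the same:

```python
def assign(num_partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    # Round-robin model: each partition goes to exactly one group member.
    assignment = {c: [] for c in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment

group = [f"consumer-{i}" for i in range(13)]
assignment = assign(12, group)
idle = [c for c, parts in assignment.items() if not parts]
print(idle)  # ['consumer-12'] - the 13th consumer gets no partition
```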

Q4: What is consumer lag and how does it affect ML feature freshness?

Consumer lag is the difference between the latest message offset in a partition and the consumer's committed offset. Lag = messages_waiting_to_be_processed. At a processing rate of 1000 messages/second, a lag of 10,000 means your consumer is 10 seconds behind real-time.

For ML feature freshness, consumer lag directly translates to feature staleness. If your velocity feature processor has 30,000 messages of lag in your transaction topic at 3000 messages/second, your model is scoring transactions with velocity features computed on transaction data that is 10 seconds old. During a fraud attack where an attacker runs 50 fraudulent transactions in 30 seconds, features that are 10 seconds stale will have no visibility into the attack pattern.

Monitor lag continuously with kafka-consumer-groups.sh --describe or via Kafka's JMX metrics. Alert when lag exceeds your feature freshness SLA. The remediation is to either add consumer instances (if throughput is the bottleneck) or optimize the processing logic (if compute is the bottleneck).
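The staleness arithmetic from the answer above, as a quick check with the same numbers:

```python
# Lag-to-staleness conversion: lag in messages divided by sustained
# processing rate gives seconds behind real-time.
latest_offset = 1_030_000     # newest message in the partition
committed_offset = 1_000_000  # consumer's committed position
processing_rate = 3_000       # messages/second the consumer sustains

lag = latest_offset - committed_offset
staleness_seconds = lag / processing_rate
print(lag, staleness_seconds)  # 30000 10.0
```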

Q5: Compare Kafka Streams vs Faust for ML feature computation in Python shops.

Kafka Streams is a Java library that runs embedded in the JVM application consuming from Kafka. It provides stateful stream processing with persistent RocksDB state stores, windowed aggregations, and exactly-once processing semantics. It is mature, battle-tested, and has excellent performance. However, it requires JVM deployment in a Python ML stack, adding operational complexity.

Faust is a Python stream-processing library, originally built at Robinhood, that mirrors the Kafka Streams programming model (agents, tables, windowed aggregations) in pure Python. This is compelling for ML teams because the feature computation logic can call Python ML libraries directly - compute embeddings with PyTorch, run sklearn transformations, call custom Python feature functions.

For Python ML shops: use Faust for feature computation that requires Python libraries (custom tokenizers, domain-specific feature logic). Use Kafka Streams (Java) for high-throughput, low-latency pipelines where you need microsecond processing and battle-tested exactly-once semantics. The choice is often organizational - if your team does not write Java, Faust in Python is more maintainable despite the performance difference.

Q6: What is the exactly-once semantics pattern in Kafka and when is it worth the complexity cost?

Exactly-once delivery in Kafka requires two components: idempotent producers and transactional consumers. An idempotent producer assigns a sequence number to each message; the broker deduplicates retries using the sequence number, so network-level retries do not produce duplicates. The transactional API allows atomically committing both the results of processing (writes to output topics) and the consumer offset, ensuring that a failure between processing and committing cannot cause partial results.
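In confluent-kafka, the two components map onto configuration roughly as follows. This is a config sketch only (the actual transactional loop needs a running broker); broker address and ids are illustrative:

```python
# Idempotent, transactional producer: the broker deduplicates retries,
# and output writes plus offset commits become atomic.
producer_config = {
    "bootstrap.servers": "kafka:9092",
    "enable.idempotence": True,              # sequence-number deduplication
    "transactional.id": "feature-writer-0",  # must be stable per instance
}

# Consumer side of the loop: never auto-commit, never read aborted data.
consumer_config = {
    "bootstrap.servers": "kafka:9092",
    "group.id": "fraud-features",
    "enable.auto.commit": False,             # offsets commit in the transaction
    "isolation.level": "read_committed",     # skip aborted transactions
}
print(producer_config["enable.idempotence"])  # True
```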

The complexity cost is real: transactions require a transactional.id that is unique and stable per producer instance (challenging with ephemeral Kubernetes pods), increase latency by 10-20% due to the two-phase commit protocol, and require careful handling of transaction aborts and zombie producers (old instances continuing to write after a restart, which the broker fences off).

Worth the cost for: financial ML applications where duplicate feature writes could corrupt training data used for regulatory reporting, fraud label pipelines where double-counting labels would bias model training, and ML pipelines where idempotent processing is genuinely difficult to implement (e.g., incremental aggregations that cannot simply be overwritten).

Not worth the cost for: prediction logging (duplicates are harmless), feature pipelines where you can implement idempotent writes (overwrite by entity ID), and monitoring telemetry.

© 2026 EngineersOfAI. All rights reserved.