

Apache Kafka Architecture - The Nervous System of Real-Time ML


Reading time: ~40 minutes | Interview relevance: Very High | Target roles: Data Engineer, ML Engineer, AI Platform Engineer, Backend Engineer


The Real Production Moment

It is 2011 at LinkedIn. The platform has 175 activity streams: page views, clicks, search queries, job applications, connection requests, profile updates, InMail messages, endorsements. Each stream has its own ingestion pipeline. The site reliability team draws out the data flow on a whiteboard and steps back. What they see is a web - no, a spiderweb. Every producer service is directly connected to every consumer service that needs its data. 175 sources, dozens of destinations. If you want to add a new consumer - say, a new ML model for "People You May Know" recommendations - you have to coordinate with every producer team to wire up a new direct feed. The complexity is $O(N \times M)$, where $N$ is the number of producers and $M$ is the number of consumers. Adding one producer means wiring it to all existing consumers. Adding one consumer means convincing all existing producers to feed it.

The team - Jay Kreps, Neha Narkhede, and Jun Rao - recognizes the pattern. This is not a data problem. This is an integration architecture problem. They have accidentally recreated the enterprise service bus anti-pattern, but at internet scale. Every company that reaches a certain size hits this wall.

Their insight: replace the point-to-point web with a central log. Every producer writes to one place. Every consumer reads from one place. Producers and consumers are fully decoupled. Adding a new consumer requires zero changes to any producer. Adding a new producer requires zero changes to any consumer. The $O(N \times M)$ complexity collapses to $O(N + M)$. They call it Kafka - named after Franz Kafka because it is "a system optimized for writing." It is open-sourced in 2011 and becomes the defining infrastructure of the modern data platform.

Today, LinkedIn processes 7 trillion Kafka messages per day. Netflix uses Kafka as the backbone for its entire data infrastructure - recommendation signals, viewing events, error logs, A/B test results. Uber uses Kafka to coordinate real-time surge pricing, dispatching, and driver-rider matching. Every major technology company running ML at scale runs Kafka. When an ML engineer asks "how does this event get from the mobile app to the fraud model," the answer, almost universally, is: through Kafka.

Understanding Kafka deeply - not just how to produce and consume, but how partitions work, how replication guarantees durability, why consumer groups enable horizontal scaling, and what makes it fast - is one of the highest-leverage skills for any engineer building ML systems. This is what this lesson teaches.


Why This Exists - The N×M Integration Problem

Before Kafka (and systems like it), data integration at scale looked like this: every system that needed data from another system had a direct pipeline to it. A recommendation system needing clickstream data had a direct feed from the clickstream service. A fraud detection system needing transaction data had a direct feed from the payment service. A feature pipeline needing user events had direct feeds from every service that generates user events.

The problems multiply as the organization grows:

  1. N×M coupling: If you have 10 producer services and 10 consumer services, you potentially have 100 direct connections. Every connection is a piece of code someone owns, monitors, and fixes when it breaks.

  2. Different protocols: The clickstream service might push via HTTP. The payment service might write to a PostgreSQL queue. The user service might use an internal RPC. Consumers must implement different clients for each producer.

  3. No independent scaling: If the recommendation team wants to consume events faster, they must coordinate with the producer team to increase throughput on the producer side.

  4. No replay: If a consumer has a bug and processes events incorrectly, the data is gone. There is no way to reprocess historical events.

  5. No multi-consumer fan-out: If three teams all need the same event stream, the producer must maintain three separate feeds.

Kafka solves all five problems:

  1. $O(N + M)$ coupling: producers write to Kafka topics; consumers read from Kafka topics. No direct connections.
  2. Single protocol: all producers and consumers use the Kafka protocol.
  3. Independent scaling: consumer throughput is controlled by adding consumer instances, independent of producers.
  4. Replay: Kafka retains data for a configurable duration (days to weeks). Consumers can reset their offset and replay from any point.
  5. Fan-out: unlimited consumer groups can read the same topic independently, each at their own pace, without impacting each other.
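
To make the coupling argument concrete, here is a minimal sketch that counts integration links under both designs. The function names and the service counts in the loop are illustrative assumptions, not figures from any real deployment.

```python
def point_to_point_links(producers: int, consumers: int) -> int:
    """Every producer is wired directly to every consumer: N x M links."""
    return producers * consumers


def central_log_links(producers: int, consumers: int) -> int:
    """Every service holds one connection to the shared log: N + M links."""
    return producers + consumers


# Hypothetical organization sizes, chosen only to show how the gap widens.
for n, m in [(10, 10), (50, 30), (175, 40)]:
    print(
        f"{n} producers, {m} consumers: "
        f"direct={point_to_point_links(n, m)}, via log={central_log_links(n, m)}"
    )
```

With 10 producers and 10 consumers this gives 100 direct links versus 20 connections to the log, which matches the example in the problem list above.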

The Distributed Commit Log

The central abstraction in Kafka is the distributed commit log (also called an append-only log, or just "the log"). Understanding this abstraction is the key to understanding everything else.

A commit log is:

  • Append-only: you can only add records to the end; you cannot update or delete existing records
  • Ordered: records are assigned sequential offsets; the log is totally ordered within a partition
  • Immutable: once written, a record cannot be changed
  • Durable: records are written to disk and replicated across multiple brokers

This is not a new idea. Databases have used commit logs (write-ahead logs, redo logs) for decades to provide crash recovery. What Kafka does is make the commit log itself the primary data structure - not an implementation detail of a database, but the core interface through which all data flows.

The implications for ML are profound. Because the log retains all events in order, you can:

  • Reprocess any historical window for feature backfills
  • Rebuild your feature store from scratch by replaying from offset 0
  • Join two event streams by reading both in parallel at the same logical time
  • Debug production issues by replaying exactly what happened
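
The properties above can be sketched with a tiny in-memory log. This is a toy model for intuition only: `CommitLog` and the reader names are hypothetical, and nothing here is persisted or replicated the way a real Kafka partition is.

```python
from typing import Any, Dict, List


class CommitLog:
    """In-memory append-only log: records get sequential offsets and are never modified."""

    def __init__(self) -> None:
        self._records: List[Any] = []

    def append(self, record: Any) -> int:
        """Append a record to the end and return the offset it was assigned."""
        self._records.append(record)
        return len(self._records) - 1

    def read(self, offset: int, max_records: int = 100) -> List[Any]:
        """Return up to max_records starting at offset; reading never removes data."""
        return self._records[offset:offset + max_records]


log = CommitLog()
for event in ["login", "click", "purchase"]:
    log.append(event)

# Each reader tracks its own position, so readers do not affect each other.
positions: Dict[str, int] = {"fraud-model": 0, "feature-backfill": 0}
positions["fraud-model"] += len(log.read(positions["fraud-model"]))

# The backfill reader has not moved, so it can still replay from offset 0.
replayed = log.read(positions["feature-backfill"])
```

Because reads do not consume records, the position lives with the reader rather than with the log, which is what makes replay and independent consumers cheap.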

Topics, Partitions, and Offsets

A topic is a logical channel for a category of events. payment-transactions, user-logins, model-predictions, feature-updates are all topics. Topics are what producers write to and consumers read from.

A partition is the physical subdivision of a topic. When you create a topic with 12 partitions, Kafka splits the event stream into 12 independent sub-streams. Each partition is an independent ordered log. Events within a partition are totally ordered (by their offset). Events across partitions have no ordering guarantee.

An offset is the position of an event within a partition - a monotonically increasing integer starting at 0. The triple (topic, partition, offset) uniquely identifies any event in Kafka.

Topic: payment-transactions (12 partitions)

Partition 0: [offset 0] [offset 1] [offset 2] [offset 3] → ...
Partition 1: [offset 0] [offset 1] [offset 2] [offset 3] → ...
Partition 2: [offset 0] [offset 1] [offset 2] [offset 3] → ...
...
Partition 11: [offset 0] [offset 1] [offset 2] [offset 3] → ...

How partitioning works: when a producer sends a message, Kafka assigns it to a partition. The assignment follows this priority:

  1. If the producer specifies a partition explicitly, use it.
  2. If the producer specifies a key (user_id, transaction_id, etc.), hash the key modulo the partition count: partition = hash(key) % num_partitions. All events with the same key go to the same partition - and therefore are ordered relative to each other.
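  3. If no key, spread messages across partitions for throughput. Older clients do this round-robin; since Kafka 2.4 the Java producer's default is a sticky strategy that fills a batch for one partition before moving to the next.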
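
The priority order above can be sketched as a small function. This is an illustration under stated assumptions, not any client's actual partitioner: `choose_partition` is a hypothetical name, `zlib.crc32` stands in for the client's hash (the Java client uses murmur2, and different clients can hash the same key differently), and the unkeyed case is shown as plain round-robin.

```python
import zlib
from typing import Optional


def choose_partition(
    num_partitions: int,
    key: Optional[bytes] = None,
    explicit_partition: Optional[int] = None,
    round_robin_counter: int = 0,
) -> int:
    """Pick a partition using the explicit > keyed > unkeyed priority order."""
    if explicit_partition is not None:
        # 1. The producer named a partition: use it as-is.
        return explicit_partition
    if key is not None:
        # 2. Keyed message: a stable hash keeps each key on one partition.
        return zlib.crc32(key) % num_partitions
    # 3. Unkeyed message: rotate across partitions.
    return round_robin_counter % num_partitions


first = choose_partition(12, key=b"user_000042")
second = choose_partition(12, key=b"user_000042")
print(f"user_000042 -> partition {first} (again: {second})")
```

The property that matters for ordering is step 2: as long as the hash function and the partition count stay the same, every event for a key lands on the same partition.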

Why partition count matters for ML: partition count determines parallelism. If your topic has 12 partitions and you run 12 consumer instances, each instance gets exactly one partition. Maximum parallelism = partition count. If you have 6 consumer instances and 12 partitions, each instance gets 2 partitions. If you have 24 consumer instances and 12 partitions, 12 of them have no partitions assigned and sit idle. Within a single consumer group, you cannot consume a topic with more parallelism than its partition count.
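
A short sketch of that arithmetic, assuming a simple modulo assignment. Kafka's real assignors (range, round-robin, sticky) distribute partitions differently in detail, but the upper bound on parallelism is the same. `assign_partitions` is a hypothetical helper.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, num_consumers: int) -> Dict[int, List[int]]:
    """Spread partitions over consumers; each partition goes to exactly one consumer."""
    assignment: Dict[int, List[int]] = {c: [] for c in range(num_consumers)}
    for partition in range(num_partitions):
        assignment[partition % num_consumers].append(partition)
    return assignment


for consumers in (6, 12, 24):
    assignment = assign_partitions(12, consumers)
    idle = sum(1 for owned in assignment.values() if not owned)
    busiest = max(len(owned) for owned in assignment.values())
    print(f"{consumers} consumers: up to {busiest} partitions each, {idle} idle")
```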

For ML feature pipelines:

  • Low-volume topics (10K events/second): 6-12 partitions
  • Medium-volume topics (100K events/second): 24-48 partitions
  • High-volume topics (1M+ events/second): 96-192 partitions

:::warning
Partition count can be increased after a topic is created, but never decreased (short of deleting and recreating the topic). Increasing it is not free either: the key-to-partition mapping depends on the partition count, so a key that hashed to one of partitions 0-11 in a 12-partition topic may hash to a different partition once the topic has 24. This breaks key-based ordering and can corrupt stateful stream processing operators that depend on all events for a key arriving at the same partition. Plan partition counts carefully at topic creation time. Over-provision rather than under-provision.
:::
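
The effect of growing a topic from 12 to 24 partitions can be checked directly. The sketch below again uses `zlib.crc32` as a stand-in hash and synthetic keys, both of which are assumptions; the exact share of keys that move depends on the hash and the key set.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Stand-in keyed partitioner: stable hash modulo the partition count."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


keys = [f"user_{i:06d}" for i in range(10_000)]
moved = [k for k in keys if partition_for(k, 12) != partition_for(k, 24)]

print(f"{len(moved)} of {len(keys)} keys map to a different partition after 12 -> 24")
```

When the count doubles, a key either stays on partition p or moves to p + 12, so a large share of keys change partition. New events for a moved key land on a partition that holds none of its history.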


Brokers, Clusters, and Replication

A Kafka broker is a server that stores partitions and serves producers and consumers. A Kafka cluster is a group of brokers. In production, you run at least 3 brokers.

Partitions are distributed across brokers for fault tolerance and load balancing. For a 12-partition topic on a 3-broker cluster, each broker stores 4 partitions. But Kafka does not just store one copy - it replicates each partition across multiple brokers.

Replication factor (replication.factor) determines how many copies of each partition exist. With replication.factor=3, each partition has 3 copies: one leader and two followers (ISR members).

  • Leader partition: handles all reads and writes for that partition. Producers always write to the leader. Consumers (by default) read from the leader.
  • Follower partitions: replicate data from the leader. They send fetch requests to the leader and copy new records.
  • ISR (In-Sync Replicas): the set of replicas that are fully caught up with the leader. A replica must be within replica.lag.time.max.ms (default: 30 seconds) of the leader to be in-sync.
Partition 0 (replication.factor=3):
Broker 1: LEADER ← producers write here; consumers read here
Broker 2: FOLLOWER (ISR) ← replicates from leader
Broker 3: FOLLOWER (ISR) ← replicates from leader

What happens when a broker fails: if the broker hosting the leader of Partition 0 crashes, Kafka elects one of the in-sync followers as the new leader. Producers and consumers automatically discover the new leader (via metadata fetched from any live broker) and reconnect. This failover typically takes 5-30 seconds in practice.

min.insync.replicas: this topic/broker configuration sets the minimum number of in-sync replicas (the leader included) that must hold a write before the broker acknowledges it. It is enforced only for producers that use acks=all. With min.insync.replicas=2 and replication.factor=3, Kafka requires at least 2 brokers (the leader + 1 follower) to hold each write. If only the leader is in sync (both followers are down or lagging), writes with acks=all fail - the producer receives a NOT_ENOUGH_REPLICAS error.

This is the durability tradeoff: stricter min.insync.replicas means higher durability (a write is not acknowledged until it is on multiple brokers), but also higher write latency and reduced availability (a write fails if not enough replicas are available).
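
The interaction between the producer's acks setting and min.insync.replicas can be sketched as a decision function. This is a simplified model, not broker code: `write_outcome` and its return strings are hypothetical, and it relies on the fact that min.insync.replicas is only checked for acks=all.

```python
def write_outcome(acks: str, in_sync_replicas: int, min_insync_replicas: int) -> str:
    """Model whether a produce request is acknowledged, given the current ISR size."""
    if in_sync_replicas < 1:
        # No leader available for the partition at all.
        return "NO_LEADER"
    if acks == "all" and in_sync_replicas < min_insync_replicas:
        # The durability floor is not met, so the broker rejects the write.
        return "NOT_ENOUGH_REPLICAS"
    # acks=0 and acks=1 do not consult min.insync.replicas.
    return "ACKNOWLEDGED"


for isr in (3, 2, 1):
    print(f"ISR={isr}: acks=all -> {write_outcome('all', isr, 2)}, "
          f"acks=1 -> {write_outcome('1', isr, 2)}")
```

With replication.factor=3 and min.insync.replicas=2, the topic tolerates one lagging or failed replica while still accepting acks=all writes; losing a second one trades availability for durability.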

For ML feature pipelines:

  • Training data topics: replication.factor=3, min.insync.replicas=2 - you cannot afford to lose training events
  • Prediction logging topics: replication.factor=3, min.insync.replicas=1 - occasional loss is acceptable; high availability is more important

Producers - Batching, Compression, and Acks

The Kafka producer is not a naive "send one message, get one acknowledgment" mechanism. Understanding its internal behavior is essential for building high-throughput, reliable ML event pipelines.

Batching: producers accumulate messages into batches before sending. Two config parameters control this:

  • batch.size (default: 16KB): the maximum size of a batch in bytes. When a batch reaches this size, it is sent immediately.
  • linger.ms (default: 0): how long to wait for more messages before sending an incomplete batch. Setting linger.ms=5 means the producer waits up to 5ms to accumulate more messages before sending, increasing throughput at the cost of 0-5ms additional latency.

For ML pipelines, linger.ms=5-20ms is typically acceptable and significantly improves throughput.

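Compression: producers can compress batches before sending. Options: none, gzip, snappy, lz4, zstd. LZ4 is a common choice for JSON event payloads because it compresses and decompresses quickly at a reasonable ratio; zstd usually produces smaller output at a somewhat higher CPU cost. For ML feature events (typically JSON), compression often achieves 3-5× size reduction, though the actual ratio depends on the payload and batch size.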
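
Batching and compression reinforce each other, because the producer compresses whole batches rather than individual messages. The sketch below compares the two using gzip from the standard library as a stand-in (lz4 is not in the standard library); the event fields are synthetic and the resulting sizes are illustrative only.

```python
import gzip
import json

# Synthetic feature events with the repetitive structure typical of JSON payloads.
events = [
    {
        "user_id": f"user_{i:06d}",
        "feature_name": "tx_count_1h",
        "feature_value": float(i % 50),
        "model_version": "fraud-v2.1",
    }
    for i in range(500)
]
messages = [json.dumps(e).encode("utf-8") for e in events]

raw_bytes = sum(len(m) for m in messages)
# Compressing each message alone: no shared context, per-message header overhead.
per_message_bytes = sum(len(gzip.compress(m)) for m in messages)
# Compressing the whole batch: repeated field names are encoded once.
batch_bytes = len(gzip.compress(b"".join(messages)))

print(f"raw={raw_bytes} per-message={per_message_bytes} batch={batch_bytes}")
```

This is one reason a small linger.ms can pay off twice: larger batches mean fewer requests and a better compression ratio per byte sent.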

Acks: the acks configuration controls when the producer considers a send complete:

  • acks=0: fire-and-forget. No acknowledgment. Maximum throughput, no durability.
  • acks=1: leader acknowledgment only. The leader writes to disk and acknowledges, but followers may not have replicated yet. If the leader fails before replication, the event is lost.
  • acks=all (equivalent to acks=-1): all ISR members must acknowledge. Highest durability, higher latency.

Idempotent producers: setting enable.idempotence=True ensures that even if the producer retries a send (due to a network timeout), the broker deduplicates it and the message is written exactly once. This is implemented using a producer ID (PID) and sequence numbers per partition. Idempotent producers are required for exactly-once semantics.
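
A minimal sketch of the deduplication idea, assuming one sequence number per record. The real broker tracks sequences per record batch and keeps metadata for the last few batches per producer; `PartitionLog` and its return strings are hypothetical.

```python
from typing import Dict, List


class PartitionLog:
    """Toy partition that rejects duplicate sends using (producer id, sequence)."""

    def __init__(self) -> None:
        self.records: List[bytes] = []
        self._last_seq: Dict[int, int] = {}  # producer id -> last sequence written

    def append(self, producer_id: int, sequence: int, value: bytes) -> str:
        last = self._last_seq.get(producer_id, -1)
        if sequence <= last:
            # A retry of something already written: acknowledge without appending.
            return "DUPLICATE"
        if sequence != last + 1:
            # A gap means an earlier send is missing; refuse to write out of order.
            return "OUT_OF_ORDER"
        self.records.append(value)
        self._last_seq[producer_id] = sequence
        return "APPENDED"


partition = PartitionLog()
print(partition.append(producer_id=7, sequence=0, value=b"tx-1"))
# The acknowledgment is lost in transit, so the producer retries the same send.
print(partition.append(producer_id=7, sequence=0, value=b"tx-1"))
print(f"records stored: {len(partition.records)}")
```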

# Production Kafka producer for ML feature events (confluent-kafka)
from confluent_kafka import Producer
import json
import time
import logging
from typing import Optional, Dict
from dataclasses import dataclass, asdict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class FeatureEvent:
    user_id: str
    feature_name: str
    feature_value: float
    event_time: float  # Unix timestamp seconds
    model_version: str


def create_producer(bootstrap_servers: str = "localhost:9092") -> Producer:
    """
    Create a Kafka producer configured for reliable ML feature event delivery.

    Key settings:
    - acks=all: wait for all ISR replicas (durability)
    - enable.idempotence=True: exactly-once per partition
    - compression.type=lz4: fast compression for JSON payloads
    - linger.ms=10: wait 10ms to batch more messages (throughput)
    - batch.size=65536: 64KB batches
    - retries=5: retry on transient failures
    """
    config = {
        "bootstrap.servers": bootstrap_servers,
        "acks": "all",
        "enable.idempotence": True,
        "compression.type": "lz4",
        "linger.ms": 10,
        "batch.size": 65536,
        "retries": 5,
        "retry.backoff.ms": 100,
        "max.in.flight.requests.per.connection": 5,  # max 5 with idempotence
        "delivery.timeout.ms": 30000,
    }
    return Producer(config)


def delivery_report(err, msg):
    """
    Callback invoked by confluent-kafka on message delivery or failure.
    Called from the producer's poll() loop - do not block here.
    """
    if err is not None:
        logger.error(
            f"Delivery failed for topic={msg.topic()} "
            f"partition={msg.partition()} offset={msg.offset()}: {err}"
        )
        # In production: increment a failure counter, trigger an alert
        # if failure rate exceeds threshold
    else:
        logger.debug(
            f"Delivered to topic={msg.topic()} "
            f"partition={msg.partition()} offset={msg.offset()} "
            f"latency={msg.latency():.3f}s"
        )


def publish_feature_event(
    producer: Producer,
    topic: str,
    event: FeatureEvent,
    headers: Optional[Dict[str, str]] = None,
) -> None:
    """
    Publish a feature event to Kafka.

    Key: user_id - ensures all events for the same user go to the same
    partition, maintaining per-user ordering.
    """
    payload = json.dumps(asdict(event)).encode("utf-8")
    key = event.user_id.encode("utf-8")

    kafka_headers = [(k, v.encode()) for k, v in (headers or {}).items()]

    while True:
        try:
            producer.produce(
                topic=topic,
                key=key,
                value=payload,
                headers=kafka_headers,
                on_delivery=delivery_report,
            )
            # Poll to trigger delivery callbacks for accumulated messages
            producer.poll(0)
            break
        except BufferError:
            # Internal queue full - poll to flush some messages, then retry
            logger.warning("Producer queue full, polling to drain...")
            producer.poll(0.1)


def flush_producer(producer: Producer, timeout_seconds: float = 10.0) -> int:
    """
    Flush all pending messages. Returns number of messages still in queue
    (non-zero means flush timed out).
    """
    remaining = producer.flush(timeout=timeout_seconds)
    if remaining > 0:
        logger.error(f"Producer flush timed out with {remaining} messages undelivered")
    return remaining


# Example usage
if __name__ == "__main__":
    BOOTSTRAP_SERVERS = "localhost:9092"
    TOPIC = "ml-feature-events"

    producer = create_producer(BOOTSTRAP_SERVERS)

    # Simulate a burst of feature events (e.g., from a real-time feature computation job)
    events = [
        FeatureEvent(
            user_id=f"user_{i:06d}",
            feature_name="tx_count_1h",
            feature_value=float(i % 50),
            event_time=time.time(),
            model_version="fraud-v2.1",
        )
        for i in range(1000)
    ]

    start = time.time()
    for event in events:
        publish_feature_event(producer, TOPIC, event)

    undelivered = flush_producer(producer, timeout_seconds=30.0)
    elapsed = time.time() - start

    logger.info(f"Published 1000 events in {elapsed:.2f}s ({1000/elapsed:.0f} events/sec)")
    logger.info(f"Undelivered messages: {undelivered}")

Consumer Groups - Parallel Consumption

The consumer group is Kafka's mechanism for horizontal scaling of consumption. A consumer group is a set of consumer instances that cooperatively consume a topic. Kafka guarantees: each partition is assigned to at most one consumer instance in a given group at any time.

This has two important consequences:

  1. Parallelism: to consume a 12-partition topic with maximum parallelism, run 12 consumer instances in the same group. Each instance processes events from exactly one partition. Events within a partition are processed in order by one instance.

  2. Independent groups: multiple consumer groups can read the same topic completely independently. The fraud detection pipeline and the recommendation pipeline can both consume the same payment-transactions topic at their own pace, with their own offsets. One group being slow does not affect the other.

Consumer rebalancing: when a consumer joins or leaves the group, Kafka triggers a rebalance - it reassigns partitions among the current members. During a rebalance, all consumers in the group stop consuming (the "stop-the-world" rebalance problem). Kafka 2.4+ introduced cooperative rebalancing (incremental cooperative rebalancing): instead of revoking all partitions from all consumers at once, only the partitions that need to change ownership are revoked. This dramatically reduces the stop-the-world period.

Offset management: consumer offsets (each group's position in every partition it reads) are stored in a special Kafka topic: __consumer_offsets. The committed value is the offset of the next record the group should read, so after processing the record at offset X the consumer commits X+1. Committing only after processing is what makes at-least-once delivery possible: if the consumer crashes before committing, it re-processes from the last committed offset after restart.
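
The at-least-once behavior can be simulated without a broker. In this sketch a list stands in for a partition and an integer for the committed offset; `consume_batch` and its `crash_after` parameter are hypothetical names used only to inject a failure.

```python
from typing import List, Optional, Tuple


def consume_batch(
    log: List[str], committed: int, crash_after: Optional[int] = None
) -> Tuple[List[str], int]:
    """Process records from the committed offset; commit only if the batch completes."""
    processed: List[str] = []
    for offset in range(committed, len(log)):
        if crash_after is not None and len(processed) == crash_after:
            # Crash before the commit: the committed offset does not move.
            return processed, committed
        processed.append(log[offset])
    # Whole batch processed: commit the offset of the next record to read.
    return processed, len(log)


log = ["e0", "e1", "e2", "e3"]
first_run, committed = consume_batch(log, committed=0, crash_after=2)
second_run, committed = consume_batch(log, committed=committed)

print(f"first run processed {first_run}, second run processed {second_run}")
```

Records e0 and e1 are processed twice, which is why downstream writes in an at-least-once pipeline should be idempotent (for example, an upsert keyed by entity and event time).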

# Production Kafka consumer with manual offset commit and rebalance handling
from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition
import json
import logging
from typing import Callable, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)


class MLFeatureConsumer:
    """
    A Kafka consumer for ML feature event processing with:
    - Manual offset commit (at-least-once, avoids auto-commit pitfalls)
    - Rebalance callbacks (for graceful partition hand-off)
    - Configurable batch processing (process N messages before committing)
    """

    def __init__(
        self,
        bootstrap_servers: str,
        group_id: str,
        topics: List[str],
        auto_offset_reset: str = "latest",  # 'latest' or 'earliest'
    ):
        self.config = {
            "bootstrap.servers": bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": auto_offset_reset,
            # NEVER set enable.auto.commit=true for ML feature pipelines
            # Auto-commit can acknowledge messages before you have processed them
            "enable.auto.commit": False,
            "session.timeout.ms": 30000,
            "heartbeat.interval.ms": 10000,
            "max.poll.interval.ms": 300000,  # 5min - max time between polls
            # Cooperative rebalancing reduces stop-the-world pauses
            "partition.assignment.strategy": "cooperative-sticky",
        }

        self.consumer = Consumer(self.config)
        self.topics = topics
        self._running = False
        self._current_assignment: List[TopicPartition] = []
        # Events polled but not yet processed
        self._batch: List[Dict] = []
        # Next offset to commit per (topic, partition). Advanced only after an
        # event has been processed, never when it is merely polled.
        self._processed: Dict[Tuple[str, int], int] = {}

        # Register rebalance callbacks when subscribing
        self.consumer.subscribe(
            topics,
            on_assign=self._on_assign,
            on_revoke=self._on_revoke,
        )

    def _on_assign(self, consumer, partitions: List[TopicPartition]):
        """Called when partitions are assigned to this consumer."""
        self._current_assignment = partitions
        logger.info(f"Partitions assigned: {[(p.topic, p.partition) for p in partitions]}")
        # Opportunity to: load state from the feature store for these partitions,
        # initialize per-partition accumulators, etc.

    def _commit_processed(
        self,
        asynchronous: bool,
        partitions: Optional[List[TopicPartition]] = None,
    ) -> None:
        """Commit offsets of processed events only (optionally for given partitions)."""
        wanted = None if partitions is None else {(p.topic, p.partition) for p in partitions}
        offsets = [
            TopicPartition(topic, partition, next_offset)
            for (topic, partition), next_offset in self._processed.items()
            if wanted is None or (topic, partition) in wanted
        ]
        if offsets:
            self.consumer.commit(offsets=offsets, asynchronous=asynchronous)

    def _on_revoke(self, consumer, partitions: List[TopicPartition]):
        """
        Called before partitions are revoked (rebalance or shutdown).
        THIS IS YOUR LAST CHANCE to commit offsets for these partitions.
        """
        logger.info(f"Partitions being revoked: {[(p.topic, p.partition) for p in partitions]}")

        # Commit processed offsets synchronously before giving up partitions.
        # Without this, another consumer taking over these partitions may
        # re-process events you have already processed.
        try:
            self._commit_processed(asynchronous=False, partitions=partitions)
            logger.info("Committed offsets before partition revoke")
        except KafkaException as e:
            logger.error(f"Failed to commit on revoke: {e}")

        # Drop local state for revoked partitions. Buffered events that were
        # not processed stay uncommitted, so the new owner will read them.
        revoked = {(p.topic, p.partition) for p in partitions}
        for tp_key in revoked:
            self._processed.pop(tp_key, None)
        self._batch = [
            e for e in self._batch
            if (e["_kafka_meta"]["topic"], e["_kafka_meta"]["partition"]) not in revoked
        ]

    def consume(
        self,
        process_fn: Callable[[Dict], None],
        batch_size: int = 100,
        poll_timeout_seconds: float = 1.0,
    ):
        """
        Main consumption loop.

        Accumulates messages into batches, processes them, then commits.
        If processing fails, does NOT commit - events will be reprocessed
        on restart (at-least-once semantics).
        """
        self._running = True
        self._batch = []

        while self._running:
            msg = self.consumer.poll(timeout=poll_timeout_seconds)

            if msg is None:
                # No messages in the poll window - process and commit any pending batch
                if self._batch:
                    self._process_and_commit(process_fn)
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # Reached end of partition - not an error, just no new data
                    # (only emitted when enable.partition.eof is set to true)
                    logger.debug(
                        f"End of partition {msg.topic()}/{msg.partition()} "
                        f"at offset {msg.offset()}"
                    )
                else:
                    logger.error(f"Consumer error: {msg.error()}")
                continue

            try:
                event = json.loads(msg.value().decode("utf-8"))
                event["_kafka_meta"] = {
                    "topic": msg.topic(),
                    "partition": msg.partition(),
                    "offset": msg.offset(),
                    "timestamp": msg.timestamp()[1],
                }
                self._batch.append(event)
            except (UnicodeDecodeError, json.JSONDecodeError) as e:
                logger.error(f"Failed to deserialize message at offset {msg.offset()}: {e}")
                # The malformed message is skipped: it never enters the batch, and
                # its offset is passed once a later event on the partition commits.
                # In production: write to dead-letter topic instead

            if len(self._batch) >= batch_size:
                self._process_and_commit(process_fn)

    def _process_and_commit(self, process_fn: Callable[[Dict], None]):
        """Process the pending batch, then commit offsets. Never commit before processing."""
        try:
            for event in self._batch:
                process_fn(event)
                meta = event["_kafka_meta"]
                # Committed offset = offset of the next record to read
                self._processed[(meta["topic"], meta["partition"])] = meta["offset"] + 1
            self._batch = []
            # Async commit for throughput - sync commit on rebalance/shutdown
            self._commit_processed(asynchronous=True)
        except Exception as e:
            logger.error(f"Batch processing failed: {e} - offsets NOT committed")
            # Do NOT commit - on restart, this batch will be reprocessed
            raise

    def close(self):
        """Graceful shutdown - commit final offsets, close consumer."""
        self._running = False
        try:
            self._commit_processed(asynchronous=False)
        except KafkaException:
            pass
        self.consumer.close()
        logger.info("Consumer closed cleanly")


def example_feature_processor(event: Dict) -> None:
    """Example: update a feature in an online feature store."""
    user_id = event.get("user_id")
    feature_name = event.get("feature_name")
    feature_value = event.get("feature_value")
    logger.info(f"Updating feature: {user_id} / {feature_name} = {feature_value}")
    # In production: redis_client.hset(f"features:{user_id}", feature_name, feature_value)

Consumer Lag Monitoring

Consumer lag - the number of messages in a partition that have been produced but not yet consumed - is the single most important operational metric for streaming ML pipelines. Growing lag means your pipeline is falling behind real time, which means features are getting staler.

$$
\text{lag}_{partition} = \text{latest\_offset}_{partition} - \text{committed\_offset}_{partition}
$$

$$
\text{total\_lag}_{topic} = \sum_{p=0}^{N_{partitions}-1} \text{lag}_{partition_p}
$$

# Consumer lag monitoring per partition
from confluent_kafka import Consumer, TopicPartition
from typing import Dict
import logging

logger = logging.getLogger(__name__)


def get_consumer_lag(
    bootstrap_servers: str,
    group_id: str,
    topic: str,
) -> Dict[int, Dict[str, int]]:
    """
    Calculate consumer lag per partition for a given consumer group and topic.

    Returns: dict mapping partition_id -> {
        'committed_offset': int,
        'log_end_offset': int,
        'lag': int,
    }
    """
    # Consumer used only for queries; it never subscribes, so it does not
    # join the group or trigger a rebalance.
    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
    })

    try:
        # Get the list of partitions for this topic
        metadata = consumer.list_topics(topic=topic, timeout=10)
        topic_metadata = metadata.topics.get(topic)

        if topic_metadata is None or topic_metadata.error is not None:
            raise ValueError(f"Topic '{topic}' not found or has error")

        partitions = [
            TopicPartition(topic, p)
            for p in topic_metadata.partitions.keys()
        ]

        # Get committed offsets for this consumer group
        committed = consumer.committed(partitions, timeout=10)

        # Query watermark offsets (low, high) per partition from the broker
        lag_info = {}
        for tp in committed:
            low, high = consumer.get_watermark_offsets(tp, timeout=5)
            # A negative offset (-1001 = OFFSET_INVALID) means nothing committed yet
            committed_offset = tp.offset if tp.offset >= 0 else low

            lag = max(0, high - committed_offset)
            lag_info[tp.partition] = {
                "committed_offset": committed_offset,
                "log_end_offset": high,
                "lag": lag,
            }
        return lag_info
    finally:
        consumer.close()


def check_lag_and_alert(
    bootstrap_servers: str,
    group_id: str,
    topic: str,
    warn_lag_threshold: int = 10_000,
    critical_lag_threshold: int = 100_000,
) -> None:
    """
    Check consumer lag and emit alerts if thresholds are exceeded.
    In production, this would integrate with PagerDuty, Slack, or Datadog.
    """
    lag_info = get_consumer_lag(bootstrap_servers, group_id, topic)
    total_lag = sum(p["lag"] for p in lag_info.values())

    for partition_id, info in sorted(lag_info.items()):
        logger.info(
            f"  Partition {partition_id}: committed={info['committed_offset']} "
            f"end={info['log_end_offset']} lag={info['lag']}"
        )

    if total_lag >= critical_lag_threshold:
        logger.critical(
            f"CRITICAL: consumer group '{group_id}' has {total_lag:,} messages of lag "
            f"on topic '{topic}'. Feature freshness severely degraded."
        )
    elif total_lag >= warn_lag_threshold:
        logger.warning(
            f"WARNING: consumer group '{group_id}' has {total_lag:,} messages of lag "
            f"on topic '{topic}'. Feature freshness may be degraded."
        )
    else:
        logger.info(f"OK: total lag = {total_lag:,} messages")

Compacted Topics

Standard Kafka topics retain all events up to a time or size limit. Every event is kept. For ML feature pipelines, this is usually what you want: you need the full history of transactions to compute rolling features.

But for some use cases, you only care about the latest value per key - not the history. User profile features, model configuration, schema definitions, lookup tables. For these, Kafka provides log compaction.

A compacted topic retains only the most recent record for each key. Kafka's log cleaner periodically scans partition segments and removes older records that have been superseded by a newer record with the same key. Sending a record with a null value (tombstone) deletes the key entirely.

Before compaction:
[user_001: balance=100] [user_002: score=0.3] [user_001: balance=150] [user_001: balance=200]

After compaction (only latest per key retained):
[user_002: score=0.3] [user_001: balance=200]
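
The before/after picture can be reproduced with a few lines. This is a simplified model of what the log cleaner achieves, not how it is implemented: `compact` is a hypothetical helper, and it drops tombstones immediately, whereas Kafka keeps them for delete.retention.ms so that consumers can observe the delete.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[str, Optional[str]]  # (key, value); a None value is a tombstone


def compact(log: List[Record]) -> List[Record]:
    """Keep only the latest record per key, preserving the original log order."""
    last_index: Dict[str, int] = {key: i for i, (key, _) in enumerate(log)}
    return [
        (key, value)
        for i, (key, value) in enumerate(log)
        if last_index[key] == i and value is not None
    ]


before = [
    ("user_001", "balance=100"),
    ("user_002", "score=0.3"),
    ("user_001", "balance=150"),
    ("user_001", "balance=200"),
]
after = compact(before)
print(after)
```

Surviving records keep their relative order, which is why user_002 appears before user_001 in the compacted log.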

Use cases for compacted topics in ML:

  • Feature store changelog: keep the latest feature value per entity ID. A new consumer can read the compacted topic from the beginning to reconstruct the full current feature table - without replaying months of history.
  • CDC (Change Data Capture): stream database changes to a downstream data lake or feature store. Compaction ensures you only need to replay the latest state per row.
  • Schema Registry: Confluent Schema Registry uses a compacted topic to store the latest schema per subject.
  • Model configuration: stream model hyperparameters, feature lists, and serving configuration to consumers. Compaction ensures new consumers always get the latest config.
# Working with compacted topics: writing and reading feature updates
from confluent_kafka import Producer, Consumer, KafkaError, TopicPartition, OFFSET_BEGINNING
from confluent_kafka.admin import AdminClient, NewTopic
import json
import time
import logging
from typing import Dict, Optional

logger = logging.getLogger(__name__)

BOOTSTRAP_SERVERS = "localhost:9092"
COMPACTED_TOPIC = "user-feature-store"


def create_compacted_topic(topic_name: str, num_partitions: int = 12) -> None:
    """Create a Kafka topic with log compaction enabled."""
    admin_client = AdminClient({"bootstrap.servers": BOOTSTRAP_SERVERS})

    new_topic = NewTopic(
        topic_name,
        num_partitions=num_partitions,
        replication_factor=3,
        config={
            "cleanup.policy": "compact",
            "min.cleanable.dirty.ratio": "0.1",  # compact when 10% of log is dirty
            "segment.ms": "86400000",            # 24h segments
            "delete.retention.ms": "86400000",   # 24h tombstone retention
            "min.compaction.lag.ms": "60000",    # wait 1min before compacting
        },
    )

    fs = admin_client.create_topics([new_topic])
    for topic, future in fs.items():
        try:
            future.result()
            logger.info(f"Compacted topic '{topic}' created")
        except Exception as e:
            if "already exists" in str(e):
                logger.info(f"Topic '{topic}' already exists")
            else:
                raise


def write_feature_update(
    producer: Producer,
    topic: str,
    entity_id: str,
    features: Dict[str, float],
) -> None:
    """
    Write a feature update as a key-value pair to a compacted topic.
    Key = entity_id (determines which partition and which record to compact with)
    Value = JSON of current feature values
    Null value = tombstone (delete this entity's features)
    """
    key = entity_id.encode("utf-8")
    value = json.dumps({
        "entity_id": entity_id,
        "features": features,
        "updated_at": time.time(),
    }).encode("utf-8")

    producer.produce(
        topic=topic,
        key=key,
        value=value,
        on_delivery=lambda err, msg: (
            logger.error(f"Feature write failed: {err}") if err else None
        ),
    )
    producer.poll(0)


def delete_entity_features(producer: Producer, topic: str, entity_id: str) -> None:
    """Send a tombstone record (null value) to delete this entity's features."""
    producer.produce(
        topic=topic,
        key=entity_id.encode("utf-8"),
        value=None,  # Tombstone - log compaction will remove this key
    )
    producer.poll(0)


def read_latest_features(
    topic: str,
    entity_ids: Optional[list] = None,
) -> Dict[str, Dict[str, float]]:
    """
    Read the latest feature values for all (or specified) entities
    by consuming the compacted topic from the beginning.

    This is how you rebuild the feature store from scratch.
    """
    consumer = Consumer({
        "bootstrap.servers": BOOTSTRAP_SERVERS,
        "group.id": "feature-store-reader",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        # Without this, _PARTITION_EOF events are never delivered and the
        # loop below could only end on a poll timeout
        "enable.partition.eof": True,
    })

    features: Dict[str, Dict[str, float]] = {}
    target_set = set(entity_ids) if entity_ids else None
    end_offsets_reached = set()

    # Get the partitions so we know when we have read everything
    metadata = consumer.list_topics(topic=topic, timeout=10)
    partition_ids = list(metadata.topics[topic].partitions.keys())
    num_partitions = len(partition_ids)

    # Assign every partition explicitly from the beginning. This avoids waiting
    # for a group rebalance, which can outlast the first poll timeout.
    consumer.assign([TopicPartition(topic, p, OFFSET_BEGINNING) for p in partition_ids])

    try:
        while len(end_offsets_reached) < num_partitions:
            msg = consumer.poll(timeout=2.0)

            if msg is None:
                break  # Timed out - treat as end of data

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    end_offsets_reached.add(msg.partition())
                else:
                    logger.error(f"Consumer error: {msg.error()}")
                continue

            entity_id = msg.key().decode("utf-8") if msg.key() else None
            if entity_id is None:
                continue

            if target_set and entity_id not in target_set:
                continue

            if msg.value() is None:
                # Tombstone - remove this entity
                features.pop(entity_id, None)
            else:
                record = json.loads(msg.value().decode("utf-8"))
                features[entity_id] = record.get("features", {})

    finally:
        consumer.close()

    return features

Why Kafka Is Fast - Architectural Decisions

Kafka achieves throughput measured in millions of messages per second on commodity hardware. Understanding why requires looking at four architectural decisions.

1. Sequential disk I/O. Kafka writes to disk as an append-only log. Sequential writes on an HDD perform at 100-200 MB/s. Random writes on the same HDD perform at only 1-2 MB/s - 100× slower. By committing to sequential, append-only writes, Kafka achieves near-maximum disk throughput. Even on SSDs, where the random vs sequential gap is smaller, sequential I/O is still faster and generates less write amplification.

2. Zero-copy data transfer (sendfile). When a consumer requests data, a naive implementation would: read from disk into kernel page cache → copy to user-space buffer → copy back into a kernel socket buffer → copy to the network card. That is 4 copies. Kafka uses the Linux sendfile(2) system call, which hands data from the page cache to the network socket inside the kernel and skips the two user-space copies. The saving grows with fan-out: for a topic receiving 1GB/s of writes with 10 consumers, the broker ships 10GB/s of reads, and zero-copy reduces CPU usage by roughly 60%. The optimization does not apply when TLS is enabled, because the broker then has to encrypt the data in user space.

3. Page cache. Kafka relies on the operating system's page cache rather than managing its own in-process cache. When a producer writes data, it goes to the kernel page cache first and is then flushed to disk asynchronously. When a consumer reads data that was recently written, it hits the page cache - effectively memory speed, not disk speed. On a typical Kafka broker, the working set (recent messages) fits entirely in page cache. Kafka's memory footprint in the JVM heap is intentionally small; it "borrows" memory from the OS.

4. Batching at every layer. Producers batch messages before sending. The broker batches writes to disk. The network layer batches TCP segments. Consumers batch fetch requests. Batching amortizes the per-message overhead (network round-trip, disk seek for metadata, context switch) across many messages, dramatically increasing effective throughput per message.
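The zero-copy path from point 2 can be tried from Python's standard library. This is a minimal sketch, not Kafka's implementation: `send_segment_zero_copy` and `demo` are hypothetical names, the "log segment" is a temporary file, and a local `socketpair` stands in for the broker-to-consumer connection. `socket.sendfile()` uses `os.sendfile()` where the platform supports it and silently falls back to an ordinary read/send loop elsewhere.

```python
import os
import socket
import tempfile


def send_segment_zero_copy(path: str, sock: socket.socket) -> int:
    """Send a whole file over a socket and return the number of bytes sent."""
    with open(path, "rb") as segment:
        # No user-space buffer is allocated here when os.sendfile() is available
        return sock.sendfile(segment)


def demo() -> tuple:
    # Small payload so it fits in the socket buffer without a reader thread
    payload = b"record-batch;" * 512
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(payload)
        path = tmp.name

    broker_side, consumer_side = socket.socketpair()
    try:
        sent = send_segment_zero_copy(path, broker_side)
        broker_side.close()  # signals end-of-stream to the reader
        chunks = []
        while True:
            chunk = consumer_side.recv(65536)
            if not chunk:
                break
            chunks.append(chunk)
    finally:
        broker_side.close()
        consumer_side.close()
        os.unlink(path)
    return sent, b"".join(chunks), payload
```

The sending function never touches the bytes itself, which is the point: the application only tells the kernel which file and which socket to connect.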

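The combined effect: a single Kafka broker (16 cores, 64GB RAM, RAID-0 SSDs) can sustain 2-4 million messages per second at ~1KB per message, with sub-10ms write latency at P99. Note that 2-4 million 1KB messages per second is 2-4 GB/s of traffic, so this figure assumes network links well beyond 10 GbE (about 1.25 GB/s); with slower networking, the NIC becomes the limit long before disk or CPU.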


ZooKeeper to KRaft

Prior to Kafka 2.8, Kafka relied on Apache ZooKeeper to store cluster metadata: which brokers are alive, which broker is the controller, which broker is the leader for each partition, topic configurations, and ACLs. (Consumer group offsets had already moved out of ZooKeeper into the internal __consumer_offsets topic well before KRaft.) This created a second distributed system to operate alongside Kafka, adding operational complexity, two sets of logs to monitor, two sets of configuration, and two failure modes.

KRaft (Kafka Raft, introduced in Kafka 2.8 as an early-access preview, production-ready in Kafka 3.3) replaces ZooKeeper with a built-in Raft-based consensus protocol. A subset of Kafka nodes are designated as controllers (typically 3). These controllers use Raft consensus to elect an active controller, replicate metadata, and handle broker registration and partition leadership. The metadata is stored in an internal Kafka log (the __cluster_metadata topic) rather than in ZooKeeper.
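The "typically 3" follows from majority-quorum arithmetic, which is general to Raft rather than specific to Kafka. A small worked sketch (the function names are illustrative):

```python
def quorum_size(num_controllers: int) -> int:
    """Smallest majority of the controller set."""
    return num_controllers // 2 + 1


def tolerated_failures(num_controllers: int) -> int:
    """Controllers that can be lost while a majority is still reachable."""
    return num_controllers - quorum_size(num_controllers)


for n in (1, 3, 4, 5):
    print(n, quorum_size(n), tolerated_failures(n))
```

Three controllers survive one failure and five survive two. Four controllers still survive only one, which is why odd-sized controller quorums are the norm.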

Changes with KRaft:

  • Single system to operate: no ZooKeeper cluster to provision, monitor, or upgrade.
  • Faster controller failover: standby controllers replicate the metadata log through Raft and already hold the current state, so a newly elected controller does not have to reload metadata from ZooKeeper or wait for a ZooKeeper session to expire.
  • Higher partition counts: ZooKeeper had practical limits around 200,000 partitions per cluster. KRaft scales to millions of partitions.
  • Simplified security: one set of authentication/authorization configs instead of two.

For new Kafka deployments, use KRaft. ZooKeeper mode was deprecated in Kafka 3.5 and removed entirely in Kafka 4.0.


Kafka for ML - Architecture Overview


Creating Topics with the Admin API

Topic creation deserves deliberate engineering. The wrong partition count, replication factor, or retention settings will cause problems that are expensive to fix in production.

# Topic creation with correct settings for ML pipelines
from confluent_kafka.admin import AdminClient, NewTopic
import logging

logger = logging.getLogger(__name__)


def create_ml_pipeline_topics(bootstrap_servers: str = "localhost:9092") -> None:
    """
    Create all Kafka topics needed for a real-time ML feature pipeline.

    Topic design considerations:
    - Partition count: target_throughput_msgs_per_sec / msgs_per_sec_per_partition
      A single partition can handle ~50K messages/second at 1KB per message
    - Replication factor: always 3 in production, 1 in local dev
    - Retention: enough to support full replay for training data backfills
    """
    admin = AdminClient({"bootstrap.servers": bootstrap_servers})

    topics = [
        # Raw transaction events - high volume, long retention for training data
        NewTopic(
            "transactions",
            num_partitions=24,
            replication_factor=3,
            config={
                "retention.ms": str(30 * 24 * 60 * 60 * 1000),  # 30 days
                "compression.type": "lz4",
                "min.insync.replicas": "2",
            },
        ),

        # ML feature updates - moderate volume, compacted for latest-value semantics
        NewTopic(
            "feature-updates",
            num_partitions=12,
            replication_factor=3,
            config={
                "cleanup.policy": "compact",
                "min.cleanable.dirty.ratio": "0.1",
                "compression.type": "lz4",
            },
        ),

        # Model predictions - low volume, short retention (predictions expire quickly)
        NewTopic(
            "model-predictions",
            num_partitions=12,
            replication_factor=3,
            config={
                "retention.ms": str(7 * 24 * 60 * 60 * 1000),  # 7 days
                "compression.type": "snappy",
                "min.insync.replicas": "2",
            },
        ),

        # Dead-letter queue - low volume, long retention for debugging
        NewTopic(
            "dead-letter",
            num_partitions=6,
            replication_factor=3,
            config={
                "retention.ms": str(90 * 24 * 60 * 60 * 1000),  # 90 days
                "compression.type": "gzip",  # max compression for archival
            },
        ),
    ]

    futures = admin.create_topics(topics)

    for topic_name, future in futures.items():
        try:
            future.result()
            logger.info(f"Topic '{topic_name}' created successfully")
        except Exception as e:
            if "already exists" in str(e).lower():
                logger.info(f"Topic '{topic_name}' already exists - skipping")
            else:
                logger.error(f"Failed to create topic '{topic_name}': {e}")
                raise
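The partition-count rule in the docstring above can be turned into a small sizing helper. This is a sketch under the page's own rules of thumb (about 50K messages/second per partition, 2-3× headroom over peak consumer parallelism, a floor of 12 partitions for production ML topics); `recommended_partitions` and its parameter names are hypothetical, and the constants should be replaced with numbers measured on your own cluster.

```python
import math


def recommended_partitions(
    target_msgs_per_sec: float,
    per_partition_msgs_per_sec: float = 50_000,  # rule of thumb, not a guarantee
    peak_consumers: int = 1,
    headroom: float = 2.0,                        # 2-3x peak parallelism
    minimum: int = 12,
) -> int:
    """Return the larger of the throughput-driven and parallelism-driven counts."""
    for_throughput = math.ceil(target_msgs_per_sec / per_partition_msgs_per_sec)
    for_parallelism = math.ceil(peak_consumers * headroom)
    return max(for_throughput, for_parallelism, minimum)


# 600K msgs/s needs 12 partitions for throughput, but 10 peak consumers
# with 2x headroom need 20, so parallelism is the binding constraint.
print(recommended_partitions(600_000, peak_consumers=10))
```

Because the partition count is hard to change later, it is usually safer to round the result up to a number that divides evenly among the expected consumer counts.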

Production Notes

Monitor ISR shrinkage as a leading indicator of broker stress. When a broker is overloaded, its follower replicas may fall behind the leader and be removed from the ISR. This does not immediately cause data loss (the leader continues serving), but if the leader then fails, the only remaining replicas are out of sync. With unclean.leader.election.enable=false (the default), the partition goes offline until an in-sync replica returns; with it set to true, a lagging replica becomes leader and every message it had not yet replicated is lost. Alert when kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions is non-zero for more than 30 seconds.

Key selection is a correctness concern, not just a throughput concern. For stateful stream processing (counting transactions per user), all events for the same user must go to the same partition (to be processed by the same Flink task or Kafka Streams thread). Always set the message key to the entity ID you are aggregating over. Producers that send records without a key will scatter events for the same user across partitions, making correct stateful aggregation impossible.
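The routing guarantee comes from hashing the key modulo the partition count. The sketch below uses CRC32 purely for illustration; it is not claimed to match the hash any particular Kafka client uses, and `partition_for` and `keys_that_move` are hypothetical names. It shows both halves of the argument: a fixed key always lands on the same partition, and changing the partition count silently re-routes a large share of keys.

```python
import zlib
from typing import Iterable, List


def partition_for(key: str, num_partitions: int) -> int:
    """Deterministic key -> partition mapping (illustrative hash only)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def keys_that_move(keys: Iterable[str], old_count: int, new_count: int) -> List[str]:
    """Keys whose partition changes when the partition count changes."""
    return [
        k for k in keys
        if partition_for(k, old_count) != partition_for(k, new_count)
    ]


users = [f"user-{i}" for i in range(1000)]
moved = keys_that_move(users, old_count=6, new_count=12)
print(f"{len(moved)} of {len(users)} keys change partition going from 6 to 12")
```

Any state a stream processor had accumulated per partition is now split across two partitions for every moved key, which is why partition counts should be sized generously up front.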

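Set max.poll.interval.ms correctly. If a consumer takes longer than max.poll.interval.ms between polls (because processing is slow), it is considered failed and removed from the group, which triggers a rebalance. The default is 300000 (5 minutes), so a batch of 100 events that takes up to 10 seconds to process is safe out of the box; trouble starts when the value has been lowered or when a batch can block for minutes (a slow model endpoint, a large backfill). Keep max.poll.interval.ms at least 1.5× your worst-case time between polls (15000 for that 10-second batch is the floor, not a target), or shrink the batch. Rebalances triggered by this setting are a common source of "consumer lag spikes" in production that look like outages but are actually misconfiguration.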

Topic naming conventions matter at scale. Use a consistent naming scheme: <domain>.<entity>.<event-type> - e.g., payments.transactions.created, ml.features.updated, ml.predictions.scored. Include environment (dev, staging, prod) as a prefix if using a shared cluster. Good names prevent the "what does this topic contain" debugging exercise at 3 AM.


Common Mistakes

danger

enable.auto.commit=True in ML feature consumers. Auto-commit runs on a timer (default: every 5 seconds) and commits the latest offset regardless of whether you have successfully processed the messages. If your consumer crashes between the auto-commit and completing processing, you have committed offsets beyond what you processed. On restart, you will skip those events. For ML features, this means silently incorrect feature values. Always use manual commit and commit only after successful processing.

danger

Under-provisioning partition count. If you create a topic with 6 partitions, you can never consume it with more than 6 consumer instances in a group. If your feature pipeline needs to scale to handle a 10× traffic spike, you cannot scale beyond 6 instances. Partition count cannot be decreased, and increasing it breaks key-based routing. Set partition counts at 2-3× your expected peak parallelism need and never use fewer than 12 partitions for any topic that will feed a production ML pipeline.

warning

Ignoring min.insync.replicas during broker maintenance. If you are doing rolling broker upgrades and one broker is down, you are temporarily at replication.factor=3, available_ISR=2. If a second broker goes down during the maintenance window, you drop to available_ISR=1. If min.insync.replicas=2, every write sent with acks=all will then fail with NOT_ENOUGH_REPLICAS - your feature pipeline stops producing. Always schedule broker maintenance during low-traffic periods and monitor ISR size throughout.

warning

Using message offsets as unique identifiers. An offset is unique within a partition, not across partitions or topics. Two messages can have the same offset if they are in different partitions, and the same (partition, offset) pair can exist in two different topics. Never use the offset alone, or (partition, offset), as a globally unique ID for deduplication - use (topic, partition, offset), or better, embed a UUID in the message payload or headers.

tip

Use Kafka consumer groups for A/B testing feature pipelines. Run your existing feature pipeline as consumer group feature-pipeline-v1 and your new pipeline as feature-pipeline-v2. Both read the same topic independently, each tracking its own offsets, so neither slows the other down. Compare outputs. When you are confident in v2, remove v1. Zero-downtime, parallel evaluation, easy rollback.


Interview Q&A

Q: What is a Kafka partition and why does partition count matter for ML systems?

A Kafka partition is the physical unit of parallelism and ordering in Kafka. A topic is divided into N partitions, each of which is an independent ordered log. Each partition is assigned to exactly one consumer instance in a consumer group, so the partition count is the maximum parallelism you can achieve for a given consumer group. For ML feature pipelines, partition count matters because: (1) it determines maximum throughput - adding more consumer instances beyond the partition count yields no benefit, (2) it determines ordering scope - events for the same user key always go to the same partition, guaranteeing per-user ordering which is required for stateful aggregations, and (3) it cannot be easily changed after creation - increasing partitions breaks key-based routing for existing consumers. Always provision partition counts at 2-3× your expected peak parallelism need.

Q: What is the ISR and what happens when min.insync.replicas is not met?

ISR (In-Sync Replicas) is the set of replicas of a partition that are caught up with the leader: the leader itself plus every follower that has fetched up to the leader's log end within the last replica.lag.time.max.ms. When a producer writes with acks=all, Kafka requires all current ISR members to acknowledge the write before responding to the producer. min.insync.replicas sets the minimum ISR size required to accept such writes. If the current ISR falls below min.insync.replicas (because followers are lagging or brokers are down), the broker rejects acks=all writes with a NOT_ENOUGH_REPLICAS error. This is the safety mechanism that prevents data loss: if not enough replicas can acknowledge, the write is rejected rather than acknowledged with incomplete replication. For production ML topics, replication.factor=3, min.insync.replicas=2 is the standard configuration - tolerate one broker failure without data loss.

Q: How does a consumer group achieve parallel consumption?

A consumer group is a set of consumer instances sharing a group ID. Kafka's group coordinator (a broker) assigns each partition to at most one consumer in the group at any time. With a 24-partition topic and 12 consumers in the group, each consumer is assigned 2 partitions. With 24 consumers, each gets 1 partition - maximum parallelism. Events within a partition are processed by one consumer in order, maintaining per-partition ordering. Events across partitions can be processed in parallel. When a consumer joins or leaves the group (or crashes), Kafka triggers a rebalance to reassign partitions among the remaining members. Multiple consumer groups can read the same topic independently - each group has its own offset per partition, and one group's consumption rate has no effect on another.
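The arithmetic in this answer can be checked with a toy assignor. This is a deliberately simplified round-robin sketch, not the algorithm of any of Kafka's real assignors (range, round-robin, cooperative sticky); `assign_round_robin` is a hypothetical name.

```python
from typing import Dict, List


def assign_round_robin(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Give each partition to exactly one consumer, cycling through the group."""
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    if not consumers:
        return assignment
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


twelve = assign_round_robin(24, [f"c{i}" for i in range(12)])
thirty = assign_round_robin(24, [f"c{i}" for i in range(30)])
idle = [c for c, parts in thirty.items() if not parts]
print(len(twelve["c0"]), len(idle))
```

With 12 consumers every member owns 2 partitions; with 30 consumers, 6 members own nothing, which is the concrete meaning of "partition count caps parallelism".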

Q: What is a compacted topic and when would you use it for ML?

A compacted topic retains at least the most recent record per key. Kafka's log cleaner periodically scans closed partition segments (the active segment is never compacted) and removes records whose key has a newer record in the log. Sending a null-value record (tombstone) deletes the key; the tombstone itself is removed once delete.retention.ms has passed. For ML, compacted topics are used for: (1) feature store changelogs - the latest feature value per entity ID, allowing new consumers to reconstruct current feature state by reading the topic from the beginning; (2) CDC (change data capture) from databases, where you only need the current row state per primary key; (3) model configuration propagation, where consumers always need the current config, not the history. The key difference from a regular topic: regular topics retain all events for a time window; compacted topics retain all distinct keys forever, but only the most recent value per key.
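The end state that compaction converges to can be modelled in a few lines. This is an in-memory sketch of the semantics only: `compact` is a hypothetical function, the log is a plain list of (key, value) pairs, and tombstones are dropped immediately, whereas a real broker keeps them for delete.retention.ms and never rewrites the active segment.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[str, Optional[str]]  # (key, value); value None is a tombstone


def compact(log: List[Record]) -> List[Record]:
    """Keep only the last record per key, in log order, and drop deleted keys."""
    last_index: Dict[str, int] = {}
    for index, (key, _value) in enumerate(log):
        last_index[key] = index
    return [
        (key, value)
        for index, (key, value) in enumerate(log)
        if last_index[key] == index and value is not None
    ]


log = [
    ("user-1", "clicks=3"),
    ("user-2", "clicks=1"),
    ("user-1", "clicks=4"),   # supersedes the first user-1 record
    ("user-2", None),         # tombstone: user-2 is deleted
    ("user-3", "clicks=9"),
]
print(compact(log))
```

A consumer that replays the compacted log and applies each record to a dictionary ends up with the same state as one that replayed the full history, which is what makes the topic usable as a feature-store changelog.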

Q: What is the danger of enable.auto.commit=True?

Auto-commit commits offsets on a timer (default: every 5 seconds), regardless of whether the consumer has successfully processed the messages. The danger: the consumer reads the event at offset X, begins processing it, and the auto-commit timer fires and commits X+1 (a committed offset is the next position to read) before processing is complete. If the consumer crashes during processing, on restart it resumes from X+1 - the event at X is never reprocessed. For ML feature computation, this means silently skipping events: rolling aggregate counts are wrong, and the model makes predictions on incorrect features. The correct approach is manual commit: accumulate a batch, process it completely, then commit the offset only after successful processing. On crash and restart, the batch is reprocessed (at-least-once semantics), which is correct if your downstream writes are idempotent.

Q: Why is Kafka fast - what architectural decisions enable high throughput?

Four decisions: (1) Sequential disk I/O - Kafka uses an append-only log, enabling sequential writes that are 100× faster than random writes on HDDs and significantly faster even on SSDs. (2) Zero-copy data transfer - Kafka uses the sendfile(2) Linux system call to transfer data from the page cache directly to the network socket, eliminating two data copies that a naive implementation would require. (3) OS page cache - Kafka stores data in the OS page cache rather than JVM heap, enabling recent data (which consumers are most likely to read) to be served at memory speed without any disk I/O. (4) Batching at every layer - producers batch messages, brokers batch disk writes, consumers batch fetch requests. Batching amortizes per-message overhead (network round trips, context switches) across many messages. The combination enables a single broker to sustain 2-4 million messages per second at 1KB per message.

Q: What changed when ZooKeeper was replaced by KRaft?

ZooKeeper stored Kafka cluster metadata externally: partition leaders, broker registrations, controller election, topic configs, and ACLs. (Consumer group offsets were not part of this; they live in the internal __consumer_offsets topic.) This required operating two separate distributed systems (Kafka + ZooKeeper), with different configuration, monitoring, and failure modes. KRaft replaces ZooKeeper with a built-in Raft consensus protocol among a subset of Kafka nodes designated as controllers. The controllers elect an active controller via Raft, store metadata in an internal Kafka log (the __cluster_metadata topic), and handle all cluster management. Changes: (1) single system to operate - no separate ZooKeeper cluster; (2) faster controller failover - no ZooKeeper session expiry delays; (3) higher partition limits - practical limits scale from ~200K partitions (ZooKeeper) to millions (KRaft); (4) simplified deployment - Kafka can now run with a single process, making development and testing significantly simpler.

© 2026 EngineersOfAI. All rights reserved.