Stream Processing with Kafka for Real-Time ML

Batch processing tells you what happened. Stream processing tells you what is happening. The difference between fraud caught and fraud missed is often measured in seconds.

The Production Moment

The fraud team had a model that achieved 0.91 AUC on their validation set. When they deployed it, it caught only 72% of fraud in production. AUC and catch rate are different metrics, so the two numbers are not directly comparable, but the gap was large: far more fraud was escaping detection than the offline evaluation had suggested.

The post-mortem revealed the problem: the model's most important features were velocity-based - "number of transactions in the last 60 minutes," "distinct merchants visited in the last 2 hours," "total spend in the last 30 minutes." During offline evaluation, these features were computed correctly from the historical log using batch processing.

In production, the features were being computed from a batch job that ran every 4 hours. When a fraud campaign started - testing a stolen card with small transactions before making a large purchase - the velocity features at model serving time reflected the state from up to 4 hours ago. The model couldn't see the campaign because its features didn't include the activity from the last 4 hours.

The solution was Kafka + Flink: a streaming pipeline that consumed transaction events in real time, maintained sliding window aggregations, and kept the online feature store updated within seconds. After deployment, the model caught 89% of fraud - a 17-percentage-point improvement that came from fixing data freshness, not from changing the model architecture.

This is why stream processing matters for ML. Not because batch processing is wrong, but because some features are only useful if they are fresh.

Kafka Fundamentals

Apache Kafka is a distributed event streaming platform, developed at LinkedIn by Jay Kreps, Neha Narkhede, and Jun Rao and open-sourced in 2011. Understanding its core abstractions is essential before using it for ML.

Topics, Partitions, and Offsets

A topic is a named stream of events. Think of it as a database table for events - except append-only, retained for a configurable period.

A partition is a unit of parallelism within a topic. Each partition is an ordered, immutable sequence of events. The key property: all events with the same key go to the same partition (when using key-based partitioning). This guarantees ordering for events from the same source.

An offset is the position of an event within a partition. Consumers track their offset - how far they've read - allowing them to resume from where they left off after failure.
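
The key-to-partition guarantee can be illustrated with a toy partitioner. This is a minimal sketch, not Kafka's actual implementation: the function name `partition_for_key` is hypothetical, and CRC32 is only a stand-in for whatever hash the client library really uses (the Java client's default partitioner, for instance, uses murmur2).

```python
import zlib


def partition_for_key(key: str, num_partitions: int) -> int:
    """Map a key to a partition deterministically (toy stand-in hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Three events, two of them from the same user.
events = [("user_a", "tx1"), ("user_b", "tx2"), ("user_a", "tx3")]

# Each partition is an append-only list; order within a partition is preserved.
partitions: dict[int, list[str]] = {}
for key, payload in events:
    partitions.setdefault(partition_for_key(key, 12), []).append(payload)
```

Because the hash is deterministic, `tx1` and `tx3` always land in the same partition, in the order they were produced. Note that changing the partition count changes the mapping, which is one reason partition counts are usually chosen up front.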

Consumer Groups: Scaling and Isolation

A consumer group is a set of consumers that collectively read from a topic. Each partition is assigned to exactly one consumer within the group. This enables parallel processing: a topic with 12 partitions can be consumed by up to 12 consumers in parallel.

The critical property: multiple consumer groups can read the same topic independently. Kafka maintains a separate offset for each consumer group. This means:

  • The fraud detection service and the feature computation service can both consume from transactions without interfering with each other
  • Each service reads at its own pace
  • Adding a new ML pipeline that needs transaction data doesn't require modifying the data producers
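
The independent-offset property can be sketched with an in-memory stand-in for a single-partition topic. `ToyTopic` and the group names are hypothetical and exist only for illustration; a real broker also handles persistence, replication, and partition assignment.

```python
class ToyTopic:
    """In-memory stand-in for a single-partition topic (illustration only)."""

    def __init__(self):
        self.log = []        # append-only event log
        self.committed = {}  # group_id -> next offset to read

    def append(self, event):
        self.log.append(event)

    def poll(self, group_id, max_records=1):
        """Return the next records for this group and advance only its offset."""
        start = self.committed.get(group_id, 0)
        batch = self.log[start:start + max_records]
        self.committed[group_id] = start + len(batch)
        return batch


topic = ToyTopic()
for i in range(5):
    topic.append({"tx_id": i})

# Two groups read the same log at different speeds.
fraud_batch = topic.poll("fraud-scoring", max_records=5)
writer_batch = topic.poll("training-writer", max_records=2)
```

After these calls the fraud group has read all five events while the training writer has read two, and neither has affected the other or the producer.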

```python
from confluent_kafka import Consumer, Producer, KafkaError, KafkaException
import json
from dataclasses import dataclass


@dataclass
class KafkaConfig:
    bootstrap_servers: str
    schema_registry_url: str = ""


class TransactionConsumer:
    """
    Kafka consumer for real-time transaction processing.
    Reads from the 'transactions' topic for ML feature computation.
    """
    def __init__(self, config: KafkaConfig, consumer_group_id: str):
        self.consumer = Consumer({
            'bootstrap.servers': config.bootstrap_servers,
            'group.id': consumer_group_id,
            'auto.offset.reset': 'latest',    # With no committed offset, start from new messages
            'enable.auto.commit': False,      # Manual commit gives at-least-once delivery
            'max.poll.interval.ms': 300_000,  # 5 min max between polls before rebalance
            'session.timeout.ms': 30_000,
            'heartbeat.interval.ms': 10_000,
        })

    def consume_with_processing(self, topics: list[str], process_fn):
        """
        Consume events and process them, committing offsets after processing.
        Manual offset commit ensures at-least-once delivery.
        """
        self.consumer.subscribe(topics)
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)

                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue  # End of partition, not an error
                    raise KafkaException(msg.error())

                # Deserialize and process
                event = json.loads(msg.value().decode('utf-8'))
                try:
                    process_fn(event)
                except Exception as e:
                    # Stop without committing. Continuing the loop would let a
                    # later commit advance past this message and silently skip it.
                    print(f"Processing error: {e}. Message will be reprocessed on restart.")
                    raise
                # Commit AFTER successful processing
                # (at-least-once: if processing fails, the message is reprocessed)
                self.consumer.commit(message=msg, asynchronous=False)
        finally:
            self.consumer.close()


# Producer example: application services publishing transaction events
class TransactionProducer:
    """Publish transaction events to Kafka from payment services."""
    def __init__(self, config: KafkaConfig):
        self.producer = Producer({
            'bootstrap.servers': config.bootstrap_servers,
            'acks': 'all',               # Wait for all in-sync replicas to acknowledge
            'retries': 3,
            'linger.ms': 5,              # Batch messages for 5 ms before sending
            'compression.type': 'snappy',
            'enable.idempotence': True,  # Broker deduplicates producer retries
        })

    def publish_transaction(self, transaction: dict):
        """Publish a transaction event. user_id as key for partition affinity."""
        self.producer.produce(
            topic='transactions',
            key=transaction['user_id'].encode('utf-8'),  # Key determines partition
            value=json.dumps(transaction).encode('utf-8'),
            callback=self._delivery_callback
        )
        self.producer.poll(0)  # Non-blocking poll to serve delivery callbacks

    def close(self):
        """Block until all buffered messages are delivered."""
        self.producer.flush()

    def _delivery_callback(self, err, msg):
        if err is not None:
            print(f'Message delivery failed: {err}')
```

Kafka for ML: Decoupling Feature Computation from Model Serving

The core architectural pattern: Kafka sits between data producers (application services) and data consumers (ML feature pipelines, model serving), decoupling them completely.

The benefits of this decoupling:

  • Producers don't know about consumers: The payment service publishes transactions. It doesn't know or care that a fraud model reads them.
  • Multiple consumers, independent pace: The real-time scoring service reads transactions as they arrive (milliseconds lag). The training data writer batches them hourly.
  • Replay capability: Kafka retains events for a configurable retention period (days to weeks). If a consumer fails, it can replay from its last committed offset.
  • Schema evolution: A schema registry (such as Confluent Schema Registry, a separate component that runs alongside Kafka) enables schema versioning without breaking producers or consumers.

Exactly-Once Semantics: Why It Matters for ML Feature Correctness

For ML feature computation, data correctness is critical. If a streaming feature computes "transactions in the last hour" and messages are processed twice (at-least-once delivery), the count will be inflated. Incorrect features produce incorrect model predictions.

Kafka's delivery semantics:

  • At-most-once: Messages may be lost. Fastest, lowest overhead. Never use for ML features.
  • At-least-once: Messages are never lost but may be processed multiple times (duplicates). Safe for idempotent operations (max, last-value-wins). Not safe for counts or sums.
  • Exactly-once: Each message is processed exactly once. Required for accurate count/sum features.
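
The difference between idempotent and non-idempotent aggregations under redelivery can be seen in a few lines. This is a minimal sketch under stated assumptions: the `tx_id` field and the `apply_updates` helper are hypothetical, and deduplicating by event ID in memory is just one way to get an effectively-once result, not what Kafka transactions do internally.

```python
def apply_updates(events, dedupe=False):
    """Return (count, max_amount) over events, optionally skipping repeated tx_ids."""
    seen = set()
    count, max_amount = 0, 0.0
    for e in events:
        if dedupe:
            if e["tx_id"] in seen:
                continue  # already applied, ignore the redelivery
            seen.add(e["tx_id"])
        count += 1                                 # NOT idempotent: duplicates inflate it
        max_amount = max(max_amount, e["amount"])  # idempotent: duplicates are harmless
    return count, max_amount


delivered = [
    {"tx_id": "t1", "amount": 20.0},
    {"tx_id": "t2", "amount": 75.0},
    {"tx_id": "t2", "amount": 75.0},  # redelivered after a consumer restart
]

naive = apply_updates(delivered)
deduped = apply_updates(delivered, dedupe=True)
```

The naive count is 3 while the true count is 2; the max is 75.0 in both cases. That asymmetry is why at-least-once is acceptable for max or last-value features but not for counts and sums.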

```python
from confluent_kafka import Producer, TopicPartition
import json


# Kafka producer with exactly-once semantics (idempotent + transactions)
class ExactlyOnceFeatureProducer:
    """
    Publishes feature updates to the 'feature_updates' topic inside Kafka transactions.
    Ensures each transaction event contributes to the output exactly once.
    The guarantee covers Kafka-to-Kafka processing: downstream readers must use
    isolation.level=read_committed, and any write to Redis must itself be idempotent.
    """
    def __init__(self, bootstrap_servers: str, transactional_id: str):
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'enable.idempotence': True,            # Deduplicates retries on producer side
            'transactional.id': transactional_id,  # Enables Kafka transactions
            'acks': 'all',
            'retries': 10,
            'max.in.flight.requests.per.connection': 5,
        })
        self.producer.init_transactions()

    def process_batch_exactly_once(self, messages: list, consumer, feature_store):
        """
        Process a batch of messages with exactly-once semantics.
        Atomically: commit consumer offsets + write to output topic.
        (feature_store is not used here; it is kept from the original signature.)
        """
        try:
            self.producer.begin_transaction()

            for msg in messages:
                event = json.loads(msg.value().decode('utf-8'))
                # Compute feature update
                feature_update = self._compute_feature_update(event)
                # Write feature update to output topic
                self.producer.produce(
                    'feature_updates',
                    key=event['user_id'].encode(),
                    value=json.dumps(feature_update).encode()
                )

            # Next offset to read per (topic, partition): highest processed offset + 1
            next_offsets = {}
            for msg in messages:
                tp = (msg.topic(), msg.partition())
                next_offsets[tp] = max(next_offsets.get(tp, 0), msg.offset() + 1)
            offsets = [TopicPartition(t, p, o) for (t, p), o in next_offsets.items()]

            # Atomically commit consumer offsets WITH the producer transaction.
            # If this fails, neither the offsets nor the output messages are committed.
            self.producer.send_offsets_to_transaction(
                offsets, consumer.consumer_group_metadata()
            )
            self.producer.commit_transaction()  # Atomic!

        except Exception:
            self.producer.abort_transaction()
            raise

    def _compute_feature_update(self, event: dict) -> dict:
        return {
            "user_id": event["user_id"],
            "feature": "tx_count_1h_increment",
            "value": 1,
            "timestamp": event["timestamp"]
        }
```

Kafka handles event transport. Flink handles stateful computations on those events - maintaining aggregations across time windows, joining streams, and computing features that require state.

Apache Flink (which grew out of research at TU Berlin and is now an Apache Software Foundation project) is a widely used stateful stream processing framework for production ML pipelines. Its key advantages over Kafka Streams: it supports larger state (spilling to disk via RocksDB), offers richer windowing semantics, and handles batch and stream processing with the same API.

```python
# Flink Python API (PyFlink) for real-time feature computation
# Note: production Flink is typically Java/Scala, but PyFlink enables Python ML workflows

from pyflink.common import Duration
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
# FlinkKafkaConsumer is the legacy connector; newer Flink releases prefer KafkaSource
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.functions import AggregateFunction
from pyflink.datastream.window import SlidingEventTimeWindows


class EventTimestampAssigner(TimestampAssigner):
    """Use the event's own timestamp (epoch milliseconds) as event time."""

    def extract_timestamp(self, value, record_timestamp):
        return value['timestamp_ms']


class TransactionVelocityFeature:
    """
    Compute real-time velocity features using Flink sliding windows.
    Updates online feature store every minute with fresh counts.
    """

    def compute_velocity_features(self):
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(12)  # Match Kafka topic partitions

        # Read from Kafka
        kafka_source = FlinkKafkaConsumer(
            topics='transactions',
            deserialization_schema=...,  # JSON deserialization
            properties={
                'bootstrap.servers': 'kafka:9092',
                'group.id': 'flink-velocity-features'
            }
        )

        # Create stream with watermark strategy
        # WatermarkStrategy handles late events: allow events up to 30s late
        stream = env.add_source(kafka_source) \
            .assign_timestamps_and_watermarks(
                WatermarkStrategy
                .for_bounded_out_of_orderness(Duration.of_seconds(30))
                .with_timestamp_assigner(EventTimestampAssigner())
            )

        # Compute velocity features using sliding windows
        velocity_1h = stream \
            .key_by(lambda event: event['user_id']) \
            .window(SlidingEventTimeWindows.of(
                Time.hours(1),    # Window size: 1 hour
                Time.minutes(1)   # Slide interval: recompute every minute
            )) \
            .aggregate(VelocityAggregator())

        # Write results to Redis (online feature store)
        # RedisFeatureSink is a placeholder sink that is not defined in this listing
        velocity_1h.add_sink(RedisFeatureSink())

        env.execute("TransactionVelocityFeatures")


# Pseudo-code for the aggregation logic
class VelocityAggregator(AggregateFunction):
    """Computes count, sum, and distinct-merchant features in a time window."""

    def create_accumulator(self):
        return {
            "tx_count": 0,
            "total_amount": 0.0,
            "unique_merchants": set(),
            "user_id": None
        }

    def add(self, value, accumulator):
        accumulator["tx_count"] += 1
        accumulator["total_amount"] += value.get("amount", 0.0)
        accumulator["unique_merchants"].add(value.get("merchant_id"))
        accumulator["user_id"] = value["user_id"]
        return accumulator

    def get_result(self, accumulator):
        return {
            "user_id": accumulator["user_id"],
            "tx_count_1h": accumulator["tx_count"],
            "total_amount_1h": accumulator["total_amount"],
            "unique_merchants_1h": len(accumulator["unique_merchants"]),
            "computed_at": ...  # current timestamp
        }

    def merge(self, acc1, acc2):
        acc1["tx_count"] += acc2["tx_count"]
        acc1["total_amount"] += acc2["total_amount"]
        acc1["unique_merchants"] |= acc2["unique_merchants"]
        acc1["user_id"] = acc1["user_id"] or acc2["user_id"]
        return acc1
```
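
To make the sliding-window mechanics concrete, the following sketch computes which windows a single event belongs to. It is plain Python rather than Flink code: the function name `sliding_window_starts` is hypothetical, windows are assumed to be aligned to the epoch with no offset, and each window covers the half-open interval from its start to start plus size.

```python
def sliding_window_starts(ts_ms: int, size_ms: int, slide_ms: int) -> list[int]:
    """Return the start times of every sliding window that contains ts_ms."""
    last_start = ts_ms - (ts_ms % slide_ms)  # latest window start at or before the event
    starts = []
    start = last_start
    while start > ts_ms - size_ms:           # window [start, start + size) still covers ts
        starts.append(start)
        start -= slide_ms
    return starts


HOUR_MS, MINUTE_MS = 3_600_000, 60_000
event_ts = 5 * HOUR_MS + 90_000  # an event 90 seconds past the 5-hour mark

starts = sliding_window_starts(event_ts, size_ms=HOUR_MS, slide_ms=MINUTE_MS)
```

With a 1-hour window and a 1-minute slide, every event falls into 60 overlapping windows. That multiplication is the main state and CPU cost of fine-grained sliding windows, and it is the trade-off to weigh when choosing the slide interval.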

Event Time vs Processing Time

This distinction is critical for ML feature correctness (and directly related to the production failure in the opening story).

Event time: The time when the event actually occurred (stored in the event payload). A transaction that happened at 14:23:05 has event time 14:23:05, regardless of when Kafka received it.

Processing time: The time when the event is processed by Flink. If the Kafka message was delayed by network issues, processing time could be 14:24:30 - 85 seconds after event time.

For velocity features (transactions in the last X minutes), always use event time. Features computed with processing time are incorrect when events arrive late.
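
A small simulation shows how the two notions of time diverge when one event is delayed. This is a sketch with made-up timestamps; the tuple layout and the `minute_bucket` helper are assumptions for illustration, and one-minute tumbling buckets stand in for real windows.

```python
from collections import Counter

MINUTE_MS = 60_000


def minute_bucket(ts_ms: int) -> int:
    """Index of the one-minute bucket that contains the timestamp."""
    return ts_ms // MINUTE_MS


# (event_time_ms, processing_time_ms) for three transactions by one user
events = [
    (10_000, 10_050),
    (20_000, 20_040),
    (50_000, 135_000),  # delivered 85 seconds after it happened
]

by_event_time = Counter(minute_bucket(e) for e, _ in events)
by_processing_time = Counter(minute_bucket(p) for _, p in events)
```

Bucketing by event time puts all three transactions in minute 0, which is what actually happened. Bucketing by processing time reports two transactions in minute 0 and one in minute 2, so the burst the model needs to see is smeared across windows.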

```python
# Assumes `stream` and the imports from the job above, plus:
# from pyflink.datastream.window import SlidingProcessingTimeWindows

# WRONG: Processing time windows
# Features are incorrect when events arrive out-of-order or with delay
wrong_stream = stream \
    .key_by(lambda e: e['user_id']) \
    .window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(1)))

# CORRECT: Event time windows with watermark
# Handles late events correctly; features reflect actual occurrence time
# (add .with_timestamp_assigner(...) as in the job above so that event time
# comes from the payload)
correct_stream = stream \
    .assign_timestamps_and_watermarks(
        WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(30))
    ) \
    .key_by(lambda e: e['user_id']) \
    .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(1)))
```

Watermarks are Flink's mechanism for handling late events. A watermark at time T is an assertion that all events with timestamps up to T have been received. An event that arrives after the watermark has passed the end of its window is late: by default it is dropped, or it can be routed to a side output for debugging or reprocessing.

The 30-second watermark means: if an event arrives more than 30 seconds late, it may be excluded from the window computation. For fraud detection velocity features, 30 seconds is acceptable. For financial reporting, you might set a 1-hour watermark.
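
The bounded-out-of-orderness idea can be sketched in plain Python. This is a simplified model, not Flink's implementation: the class name is hypothetical, the watermark is recomputed on every event rather than periodically, and "behind the watermark" is only a necessary condition for being dropped (an event is excluded only if every window that contains it has already fired).

```python
from typing import Optional


class BoundedOutOfOrdernessWatermark:
    """Toy watermark: highest event time seen so far minus a fixed delay."""

    def __init__(self, max_delay_ms: int):
        self.max_delay_ms = max_delay_ms
        self.max_ts_seen: Optional[int] = None

    def current(self) -> Optional[int]:
        if self.max_ts_seen is None:
            return None
        return self.max_ts_seen - self.max_delay_ms

    def on_event(self, ts_ms: int) -> bool:
        """Observe an event; return True if it is already behind the watermark."""
        watermark = self.current()
        behind = watermark is not None and ts_ms <= watermark
        if self.max_ts_seen is None or ts_ms > self.max_ts_seen:
            self.max_ts_seen = ts_ms
        return behind


wm = BoundedOutOfOrdernessWatermark(max_delay_ms=30_000)
arrivals = [100_000, 130_000, 105_000, 95_000]  # event times in arrival order
behind_flags = [wm.on_event(ts) for ts in arrivals]
```

The event at 105,000 arrives out of order but within the 30-second bound, so it is still accepted. The event at 95,000 arrives when the watermark is already at 100,000, so it is a candidate for being dropped or sent to a side output.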

Kafka Connect: Ingesting Data Without Custom Code

Kafka Connect is a framework for ingesting data from external systems into Kafka (source connectors) and delivering Kafka data to external systems (sink connectors) - without writing custom consumer code.

For ML pipelines, the most important connectors:

Source connectors:

  • Debezium: Change Data Capture (CDC) from PostgreSQL, MySQL, MongoDB. Captures every database insert, update, and delete as a Kafka event. Critical for keeping ML features synchronized with production databases.
  • S3 Source: Read files from S3 and publish to Kafka (for batch-to-stream bridge)
  • JDBC Source: Poll databases for changes and publish to Kafka

Sink connectors:

  • S3 Sink: Write Kafka messages to S3 in Parquet format (for training data collection)
  • Redis Sink: Write to Redis (for online feature store updates)
  • BigQuery Sink: Write to BigQuery (for analytics)

```json
// Debezium PostgreSQL source connector configuration
// Captures all changes from the transactions and users tables
// (comments are explanatory; remove them before submitting the JSON to Kafka Connect)
{
  "name": "postgres-transactions-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres.internal",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${secrets:postgres_password}",
    "database.dbname": "payments_db",
    // Debezium 2.x renames this setting to "topic.prefix"
    "database.server.name": "payments",
    "table.include.list": "public.transactions,public.users",
    "plugin.name": "pgoutput",
    "publication.name": "debezium_publication",
    // Transforms: flatten nested CDC envelope into flat record
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,ts_ms", // Add operation type and timestamp
    // Connect uses converters, not serializer classes.
    // The schema registry URL below is a placeholder for your environment.
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
```

```json
// S3 Sink connector: save Kafka events to S3 for offline training
{
  "name": "s3-events-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "10",
    "topics": "transactions,user_events",
    "s3.region": "us-east-1",
    "s3.bucket.name": "company-data-lake",
    "s3.part.size": "67108864",       // 64 MB parts (S3 multipart upload)
    "flush.size": "100000",           // Write after 100K records
    "rotate.interval.ms": "3600000",  // Or every hour (whichever first)
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "snappy",
    "locale": "en_US",
    "timezone": "UTC",
    // Hive-style partitioning
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000", // Required by TimeBasedPartitioner: one partition per hour
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "event_timestamp"
  }
}
```

Real-Time Feature Computation: Session Features

Session features - "user behavior in the current session" - are among the most valuable ML features and the hardest to compute in batch. A session starts when a user begins interacting and ends after a period of inactivity.

```python
import redis
import json
from datetime import datetime


class SessionFeatureComputer:
    """
    Compute session-based features in real-time using Redis.
    Called from a Kafka consumer as each event arrives.
    Session: sequence of events with less than 30 min gap between them.

    The get/update/set sequence below is not atomic. It is safe only if all
    events for a user are handled by one consumer at a time, which holds when
    the topic is keyed by user_id.
    """
    SESSION_TIMEOUT_SECONDS = 1800  # 30 minutes of inactivity = new session

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def update_session(self, user_id: str, event: dict) -> dict:
        """
        Update session state for a user based on a new event.
        Returns the current session features for use in ML inference.
        """
        session_key = f"session:{user_id}"
        # Wall-clock (processing) time is used here for simplicity. Under
        # consumer lag or replay, prefer the event's own timestamp.
        now = datetime.now().timestamp()

        # Get current session state
        session_raw = self.redis.get(session_key)
        if session_raw:
            session = json.loads(session_raw)
            # Check if session has expired (too long since last event)
            if now - session['last_event_time'] > self.SESSION_TIMEOUT_SECONDS:
                session = self._new_session(user_id, now)
        else:
            session = self._new_session(user_id, now)

        # Update session with new event
        session['event_count'] += 1
        session['last_event_time'] = now
        session['session_duration_seconds'] = now - session['session_start_time']

        if event.get('event_type') == 'view':
            session['view_count'] += 1
            session['items_viewed'].append(event.get('item_id'))
        elif event.get('event_type') == 'add_to_cart':
            session['cart_count'] += 1
        elif event.get('event_type') == 'purchase':
            session['purchase_count'] += 1
            session['session_revenue'] += event.get('amount', 0.0)

        # Keep items_viewed list bounded (last 20 items)
        session['items_viewed'] = session['items_viewed'][-20:]

        # Compute derived features
        session['cart_to_view_ratio'] = (
            session['cart_count'] / max(session['view_count'], 1)
        )
        session['purchase_to_view_ratio'] = (
            session['purchase_count'] / max(session['view_count'], 1)
        )

        # Save updated session with TTL (auto-expire after inactivity)
        self.redis.setex(
            session_key,
            self.SESSION_TIMEOUT_SECONDS * 2,  # Keep for 2x the timeout period
            json.dumps(session)
        )

        return self._to_ml_features(session)

    def _new_session(self, user_id: str, timestamp: float) -> dict:
        return {
            "user_id": user_id,
            "session_id": f"{user_id}_{int(timestamp)}",
            "session_start_time": timestamp,
            "last_event_time": timestamp,
            "event_count": 0,
            "view_count": 0,
            "cart_count": 0,
            "purchase_count": 0,
            "session_revenue": 0.0,
            "session_duration_seconds": 0.0,
            "items_viewed": []
        }

    def _to_ml_features(self, session: dict) -> dict:
        """Extract ML-ready features from session state."""
        return {
            "session_event_count": session["event_count"],
            "session_view_count": session["view_count"],
            "session_cart_count": session["cart_count"],
            "session_purchase_count": session["purchase_count"],
            "session_revenue": session["session_revenue"],
            "session_duration_minutes": session["session_duration_seconds"] / 60,
            "cart_to_view_ratio": session["cart_to_view_ratio"],
            "purchase_to_view_ratio": session["purchase_to_view_ratio"],
            # Distinct items among the last 20 viewed (the list is bounded above)
            "unique_items_viewed": len(set(session["items_viewed"])),
        }
```

Architecture: Kafka as the ML System Backbone

Kafka Sizing and Configuration for ML Workloads

```python
def estimate_kafka_cluster_size(
    peak_events_per_second: float,
    avg_event_size_bytes: int = 500,
    replication_factor: int = 3,
    retention_hours: int = 168,                     # 7 days
    target_partition_throughput_mb_s: float = 10.0  # megabytes per second per partition
) -> dict:
    """Estimate Kafka cluster requirements for ML event pipelines.

    All throughput figures are in megabytes per second (MB/s), not megabits.
    """
    # Throughput requirements
    peak_throughput_mb_s = (peak_events_per_second * avg_event_size_bytes) / (1024 * 1024)
    total_throughput_with_replication = peak_throughput_mb_s * replication_factor

    # Partition count (each partition handles target_partition_throughput_mb_s)
    min_partitions = int(peak_throughput_mb_s / target_partition_throughput_mb_s) + 1

    # Storage requirements over the retention period.
    # Uses the peak rate for every hour, so this is an upper bound.
    events_per_hour = peak_events_per_second * 3600
    storage_per_hour_gb = (events_per_hour * avg_event_size_bytes * replication_factor) / (1024**3)
    total_storage_gb = storage_per_hour_gb * retention_hours

    # Broker count (target 70% utilization)
    # Assumes a 1 Gbps network link per broker, which is about 125 MB/s
    broker_throughput_mb_s = 125
    min_brokers = int(total_throughput_with_replication / (broker_throughput_mb_s * 0.7)) + 1
    # Minimum 3 for replication
    brokers = max(3, min_brokers)

    return {
        "peak_throughput_mb_s": peak_throughput_mb_s,
        "total_throughput_with_replication_mb_s": total_throughput_with_replication,
        "recommended_partitions": min_partitions,
        "total_storage_tb": total_storage_gb / 1024,
        "recommended_brokers": brokers,
        "storage_per_broker_tb": (total_storage_gb / brokers) / 1024
    }


# Fraud detection pipeline: 50K transactions/sec peak
result = estimate_kafka_cluster_size(
    peak_events_per_second=50_000,
    avg_event_size_bytes=800,
    replication_factor=3,
    retention_hours=168
)
for k, v in result.items():
    if isinstance(v, float):
        print(f"  {k}: {v:.1f}")
    else:
        print(f"  {k}: {v}")
```
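
Working the fraud example by hand is a useful sanity check on the estimator. The sketch below repeats the core arithmetic with the same inputs (50K events/sec, 800 bytes, replication factor 3, 7-day retention); the variable names are illustrative, and binary units (MiB, TiB) are assumed to match the divisions by 1024 in the function above.

```python
events_per_sec = 50_000
event_bytes = 800
replication = 3
retention_hours = 168

# Ingress before replication, in MiB per second
ingress_mib_s = events_per_sec * event_bytes / 1024**2

# Traffic the cluster must absorb once every event is written to 3 replicas
replicated_mib_s = ingress_mib_s * replication

# Retained bytes if the peak rate were sustained for the whole retention period
retained_bytes = events_per_sec * 3600 * retention_hours * event_bytes * replication
storage_tib = retained_bytes / 1024**4

print(f"ingress: {ingress_mib_s:.1f} MiB/s")
print(f"with replication: {replicated_mib_s:.1f} MiB/s")
print(f"storage: {storage_tib:.1f} TiB")
```

This gives roughly 38 MiB/s of ingress, 114 MiB/s with replication, and about 66 TiB of retained data. Throughput alone would fit on very few brokers, so in this scenario storage per broker and the three-broker replication minimum drive the cluster size more than bandwidth does.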

Common Mistakes

:::danger Using Processing Time for ML Features Instead of Event Time The most critical Kafka/Flink mistake for ML: computing velocity features (transactions/hour, events/session) using the time Flink processes the event, not the time the event occurred. Under normal conditions, these differ by milliseconds. Under queue backlog or replay scenarios, they can differ by hours. Features computed with processing time are silently wrong, degrading model performance without any obvious error. Always use event time with appropriate watermarks. :::

:::danger Not Handling Consumer Group Rebalancing When a new consumer joins a consumer group or a consumer fails, Kafka triggers a rebalance - reassigning partitions among active consumers. During rebalance, consumption stops. For ML serving systems that depend on near-real-time features, rebalances can cause feature staleness spikes. Mitigation: use partition.assignment.strategy=cooperative-sticky (incremental cooperative rebalancing, Kafka 2.4+) which rebalances only the necessary partitions instead of all. :::

:::warning Not Monitoring Consumer Lag Consumer lag - the number of messages in a topic that haven't been consumed by a consumer group - is the primary health metric for streaming ML pipelines. If the Flink feature computation job can't keep up with the Kafka producer, lag grows. Features become stale. Model quality degrades silently. Alert when consumer lag exceeds a threshold (e.g., more than 60 seconds of lag for fraud detection features). :::
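
Lag is measured in messages, while the alert threshold above is expressed in seconds, so a conversion is needed. The following is a rough sketch under stated assumptions: the helper names are hypothetical, the offsets are made-up inputs (in a real consumer they could come from `Consumer.get_watermark_offsets` and `Consumer.committed` in `confluent_kafka`), and the time estimate assumes a roughly constant produce rate.

```python
def partition_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> dict[int, int]:
    """Messages not yet consumed, per partition."""
    return {p: max(end - committed.get(p, 0), 0) for p, end in end_offsets.items()}


def estimated_staleness_seconds(total_lag_messages: int, produce_rate_per_sec: float) -> float:
    """Approximate age of the oldest unconsumed message."""
    if produce_rate_per_sec <= 0:
        return 0.0 if total_lag_messages == 0 else float("inf")
    return total_lag_messages / produce_rate_per_sec


end_offsets = {0: 10_500, 1: 9_800, 2: 12_000}  # latest offset per partition
committed = {0: 10_000, 1: 9_800, 2: 11_000}    # the group's committed offsets

lag = partition_lag(end_offsets, committed)
total_lag = sum(lag.values())
staleness = estimated_staleness_seconds(total_lag, produce_rate_per_sec=500.0)
should_alert = staleness > 60  # threshold from the fraud example above
```

Here the group is 1,500 messages behind, which at 500 messages per second is about 3 seconds of staleness, well under the 60-second threshold. Looking at lag per partition also matters: a single hot partition can be badly behind while the total looks healthy.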

Interview Q&A

Q1: What is a Kafka consumer group and why is it important for ML pipelines?

A consumer group is a set of consumers that collectively read from a Kafka topic, with each partition assigned to exactly one consumer in the group. If you have a topic with 12 partitions and a consumer group with 6 consumers, each consumer reads 2 partitions in parallel. If a consumer fails, its partitions are reassigned to the remaining consumers.

For ML, consumer groups enable two critical properties. First, parallelism: a feature computation job with 12 workers in the same consumer group can process events up to 12× faster than a single consumer, as long as the topic has at least 12 partitions. Second, isolation: different ML pipelines can independently consume the same Kafka topic. The fraud detection service and the recommendation feature service can both read from the transactions topic with their own consumer groups, progressing at different speeds without interference. Adding a new ML feature pipeline requires no changes to the data producers.

Q2: Explain exactly-once semantics in Kafka and why it matters for ML feature correctness.

At-least-once delivery guarantees events are never lost - but they may be processed multiple times (duplicates, when a consumer fails after processing but before committing the offset). This is dangerous for count and sum features: if the same transaction is counted twice, the velocity feature ("transactions in the last hour") is wrong.

Exactly-once semantics ensure each event is processed exactly once, even in the presence of failures. Kafka achieves this through two mechanisms: idempotent producers (the broker deduplicates retried messages using a producer ID + sequence number), and Kafka transactions (which atomically commit both the consumer offset and the producer output, so either both happen or neither does - no partial state).

For ML features: exactly-once semantics are required for accuracy-sensitive features (counts, sums). For features that are idempotent (latest value wins, max), at-least-once is acceptable and cheaper.

Q3: What is the difference between event time and processing time in stream processing, and why does it matter for ML?

Event time is when the event actually occurred (the timestamp in the event payload). Processing time is when the stream processing system processes the event. In normal operation, they differ by milliseconds. When a Kafka consumer is behind (consumer lag), or during replay of historical data, they can differ by hours.

For ML feature correctness, event time is almost always what you want. A "transactions in the last 60 minutes" feature should count transactions that occurred in the 60 minutes before the current transaction, not in the 60 minutes before the Flink task processed them. The fraud case in the opening scenario failed for a closely related reason: the velocity features came from a 4-hour batch job, so at serving time they did not reflect the events that had actually occurred in the window being scored.

Flink handles event time with watermarks: a watermark at time T tells Flink "all events with timestamp less than T have arrived." Windows close and results are emitted when the watermark passes the window end. The watermark lag (how much late arrival you tolerate) is a trade-off: more tolerance means more correct results but higher latency.

Q4: What is Kafka Connect and when would you use it instead of a custom Kafka consumer?

Kafka Connect is a framework for building and running data integration pipelines between Kafka and external systems. Source connectors read from external systems (databases, files, APIs) and write to Kafka. Sink connectors read from Kafka and write to external systems (S3, databases, search engines).

Use Kafka Connect instead of a custom consumer when: (1) you're doing CDC (Change Data Capture) from a database - Debezium's PostgreSQL or MySQL connector is far more reliable than a custom implementation, handling snapshots, WAL reading, and schema changes; (2) you're ingesting from a common data source (S3, JDBC, Elasticsearch) - connectors exist for hundreds of systems; (3) the transformation logic is simple - Connect supports a simple single-message transform API.

Write a custom consumer when: the transformation logic is complex ML-specific processing that can't be expressed in Connect's transform framework, or you need the control and observability of a custom Flink or Spark Structured Streaming job.

Q5: How would you design a real-time fraud detection feature pipeline using Kafka and Flink?

The architecture has three components:

First, event ingestion: Payment service publishes transaction events to a transactions Kafka topic (12 partitions, keyed by user_id for partition affinity). Debezium captures database changes (account updates, risk flags) to a user_events topic.

Second, streaming feature computation in Flink: A Flink job reads from both topics with event-time semantics and 30-second watermarks. It computes sliding window features: tx_count (1h, 6h, 24h), total_amount (1h, 24h), unique_merchants (1h), avg_amount_last_10_tx (stateful). Session features maintained in RocksDB state backend. Results published to feature_updates topic.

Third, feature store sync: A second consumer group reads feature_updates and writes to Redis (online feature store) using a Redis pipeline (batched writes for throughput). Redis TTL set to 48 hours - features automatically expire.

Correctness guarantees: exactly-once semantics for count/sum features (Kafka transactions). Event time windows for all velocity features. Watermark tolerance of 30 seconds. Alert when consumer lag exceeds 60 seconds (features more than 60 seconds stale).

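Model serving fetches features from Redis with p99 < 2ms. End-to-end freshness depends on the feature type. Windowed features are bounded by the slide interval plus the watermark delay: with a 1-minute slide and a 30-second watermark, a transaction can take up to about 90 seconds to appear in a window result. Getting a transaction into a feature within 5 seconds requires per-event state updates or a much shorter slide interval.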

Handling Late-Arriving Data in ML Feature Pipelines

Late-arriving events - events that appear in the Kafka topic significantly after their event timestamp - are a fundamental challenge in real-time ML systems. Mobile apps buffer events offline. Network partitions delay delivery. Batch exports catch up hours later. If your streaming feature pipeline doesn't handle late arrivals, features will be silently incorrect.

```python
from datetime import date, timedelta

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.streaming import StreamingQuery

# The Delta Lake reads and writes below assume the session is configured
# with the delta-spark package.
spark = SparkSession.builder.appName("late_arrival_handling").getOrCreate()

# Strategy 1: Watermarks with late event tolerance
# Watermark tells Spark how late events can arrive and still be processed
# Trade-off: larger tolerance = more correct features, but higher output latency

def build_streaming_features_with_late_tolerance(
    kafka_bootstrap: str,
    topic: str,
    checkpoint_path: str,
    output_path: str
) -> StreamingQuery:
    """
    Streaming feature computation with explicit late-arrival handling.
    Uses a 10-minute watermark: events up to 10 minutes late are processed.
    """
    events = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap) \
        .option("subscribe", topic) \
        .option("startingOffsets", "latest") \
        .load() \
        .select(
            F.col("timestamp").alias("kafka_ingestion_time"),
            F.from_json(
                F.col("value").cast("string"),
                "user_id STRING, amount DOUBLE, event_time TIMESTAMP, event_type STRING"
            ).alias("data")
        ) \
        .select("kafka_ingestion_time", "data.*")

    # Apply watermark on event_time (not Kafka ingestion time)
    # This is the key correctness decision: use the event's own timestamp
    events_with_watermark = events \
        .withWatermark("event_time", "10 minutes")

    # Sliding window aggregation with late arrival tolerance
    # Window slides every minute; result includes events up to 10 minutes late
    velocity_features = events_with_watermark \
        .groupBy(
            F.col("user_id"),
            F.window(F.col("event_time"), "60 minutes", "1 minute")
        ) \
        .agg(
            F.count("*").alias("tx_count_1h"),
            F.sum("amount").alias("total_amount_1h"),
            # Exact distinct counts are not supported in streaming aggregations,
            # so use the approximate version
            F.approx_count_distinct("event_type").alias("distinct_event_types_1h")
        ) \
        .select(
            F.col("user_id"),
            F.col("window.end").alias("window_end_time"),
            F.col("window.start").alias("window_start_time"),
            "tx_count_1h", "total_amount_1h", "distinct_event_types_1h"
        )

    # Write to Delta Lake with append mode
    # (in append mode a window is emitted only after the watermark passes its end)
    return velocity_features.writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", checkpoint_path) \
        .trigger(processingTime="30 seconds") \
        .start(output_path)


# Strategy 2: Detect and flag late arrivals for monitoring
def monitor_late_arrivals(events_df) -> None:
    """
    Add monitoring for late-arriving events to understand pipeline health.
    Expects a batch DataFrame (for example a micro-batch inside foreachBatch),
    because show() is not supported on a streaming DataFrame.
    The counts are printed here; in production, export them to a metrics
    system such as Datadog or Prometheus for alerting.
    """
    events_with_latency = events_df.withColumn(
        "arrival_latency_seconds",
        F.unix_timestamp(F.current_timestamp()) - F.unix_timestamp(F.col("event_time"))
    )

    # Classify events by how late they arrived
    classified = events_with_latency.withColumn(
        "latency_bucket",
        F.when(F.col("arrival_latency_seconds") < 5, "on_time")
        .when(F.col("arrival_latency_seconds") < 60, "slightly_late")
        .when(F.col("arrival_latency_seconds") < 600, "moderately_late")
        .otherwise("very_late")
    )

    # Aggregate by latency bucket
    classified.groupBy("latency_bucket").count().show()


# Strategy 3: Recompute features from historical data when late data is discovered
def recompute_stale_windows(
    delta_path: str,
    recompute_from: str,
    recompute_to: str
) -> None:
    """
    When a batch of late events arrives (e.g., mobile app goes online after
    12 hours offline), recompute affected feature windows.
    Delta Lake's replaceWhere makes this safe and idempotent.
    recompute_from and recompute_to are inclusive ISO dates (YYYY-MM-DD).
    """
    spark = SparkSession.getActiveSession()

    late_events = spark.read.format("delta").load(f"{delta_path}/raw_events") \
        .filter(
            (F.col("event_time").cast("date") >= recompute_from) &
            (F.col("event_time").cast("date") <= recompute_to)
        )

    # Recompute features for the affected time range (hourly tumbling buckets)
    corrected_features = late_events.groupBy(
        "user_id",
        F.date_trunc("hour", "event_time").alias("feature_hour")
    ).agg(
        F.count("*").alias("tx_count_1h"),
        F.sum("amount").alias("total_amount_1h"),
    )

    # The replaceWhere predicate must cover every row being written, including
    # all hours of the last day, so use an exclusive upper bound of the next day.
    end_exclusive = (date.fromisoformat(recompute_to) + timedelta(days=1)).isoformat()

    # Write back with replaceWhere to atomically overwrite the affected range
    corrected_features.write \
        .format("delta") \
        .mode("overwrite") \
        .option("replaceWhere",
                f"feature_hour >= '{recompute_from}' AND feature_hour < '{end_exclusive}'") \
        .save(f"{delta_path}/velocity_features")

    print(f"Recomputed features for range {recompute_from} to {recompute_to}")
```

Summary

Kafka and Flink solve the ML feature freshness problem: the gap between what features the model was trained on and what features it sees at serving time. Batch pipelines are insufficient when features must reflect behavior from the last minutes or seconds, not the last hours.

The key concepts: Kafka topics and consumer groups enable decoupled, parallel, multi-consumer event streaming. Exactly-once semantics ensure feature accuracy for count and sum operations. Flink's event time processing with watermarks handles out-of-order events correctly. The combination of Kafka (transport) + Flink (computation) + Redis (serving) is the standard architecture for production real-time ML features.

The operational principle: always use event time, monitor consumer lag religiously, and design features to be idempotent where possible.

:::tip Key Takeaway The decision between batch and stream processing for ML features comes down to one question: how stale can this feature be before it materially hurts model performance? If the answer is "more than a few hours," use batch (Spark). If the answer is "less than a few minutes," use streaming (Kafka + Flink). Most production ML systems need both, and the architecture question is which features go where. :::

© 2026 EngineersOfAI. All rights reserved.