:::tip 🎮 Interactive Playground
Visualize this concept: Try the Stream Pipeline Viz demo on the EngineersOfAI Playground - no code required.
:::
# Streaming Concepts - Why Batch Fails for Real-Time ML
import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem';
Reading time: ~35 minutes | Interview relevance: Very High | Target roles: Data Engineer, ML Engineer, AI Platform Engineer
This lesson covers the theoretical foundations of stream processing. The concepts here - event time, watermarks, windows, delivery semantics - underpin everything in the remaining lessons. If you plan to read only one lesson before a streaming interview, read this one. Every Kafka and Flink question ultimately reduces to: did you get the time semantics right? Did you choose the right delivery guarantee? Did you handle late data?
## The Real Production Moment
It is 2:17 AM on a Tuesday. Your phone buzzes with a PagerDuty alert. Not a system outage - the fraud detection model is running fine, making predictions every hour on schedule, achieving 94% precision. The alert is from the business: a customer's account was drained of $12,000 in 38 minutes. The model never flagged it.
You pull up the logs. The features the model used were computed at 11 PM, about three hours before the fraud began. The fraudster made 22 transactions in 38 minutes, a velocity pattern the model would have detected immediately if the features had been fresh. But your velocity feature, "number of transactions in the last 60 minutes", was already roughly 3 hours stale when the fraud started and 3 hours 38 minutes stale by the last transaction, and it would not be recomputed until the next batch run. By then, the account was empty and the fraudster was gone.
The fraud analyst pulls you aside the next morning. "We've been telling the data team about this for six months. The model is great. The features are useless for real-time fraud." You check the feature freshness SLA in the internal wiki. It says "refresh every 4 hours." The model team wrote the SLA without knowing what it implied operationally. The data team hit the SLA. The fraud kept happening.
This is not a story about a bad model. It is a story about the fundamental mismatch between batch computation and real-time decision-making. Your fraud model is a time machine that can only see the past. The question is how far into the past you make it look. And the answer, for fraud detection, is: seconds, not hours. That is what stream processing is for.
The next day, you start designing a streaming feature pipeline. Within two weeks, feature freshness is 8 seconds. Fraud detection rate increases by 31% in the first month. The batch job still runs - for backfill, for training data generation, for the weekly reporting dashboard. But the serving path runs on streams. This is the architecture that most mature ML platforms converge on: batch for learning, streams for serving.
## Why This Exists - The Problem With Batch
Batch processing is the natural first architecture for any data system. You collect data, you process it at some interval (hourly, daily, nightly), you produce outputs. This works extraordinarily well for a large class of problems. But it has a fundamental property that makes it unsuitable for real-time ML: the worst-case staleness you can guarantee is never better than the batch interval.
If you run your feature pipeline every 4 hours, a feature is only as fresh as the last completed run. Just after a run publishes, the feature is as old as the job's runtime; just before the next run publishes, it is a full 4 hours old plus that runtime. You cannot improve this without running the batch more frequently. If you run it every 15 minutes, worst-case staleness drops to roughly 15 minutes. If you run it every 1 minute, it drops to roughly 1 minute. But at some point, usually around 1-minute intervals, you have built a poor man's streaming system. You are spawning, scheduling, and tearing down Spark jobs every 60 seconds. The overhead dominates the work.
Beyond latency, batch processing has a semantic problem for ML: it treats time as discrete rather than continuous. Your model was trained on a world where features represent point-in-time snapshots. At serving time, if those snapshots are stale by variable amounts (anywhere from minutes to 4 hours, depending on where you are in the batch cycle), you have introduced training-serving skew: the features at training time had a specific freshness profile that does not match the features at serving time. This degrades model accuracy in ways that are subtle and hard to debug.
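The staleness arithmetic is easy to check directly. The sketch below is a simplified model with assumed parameters: runs start on a fixed schedule at minute 0, each run reads every event up to its start time, and its output becomes servable `runtime` minutes later. The runtimes are made-up values and the function names are hypothetical.

```python
def batch_staleness_minutes(request_minute: int, interval: int, runtime: int) -> int:
    """Age of the newest data behind a batch feature served at `request_minute`."""
    # Start time of the most recent run whose output is already published.
    last_published_start = ((request_minute - runtime) // interval) * interval
    return request_minute - last_published_start


def staleness_profile(interval: int, runtime: int, horizon: int = 24 * 60):
    """Min, max, and mean staleness over one day of per-minute requests."""
    samples = [
        batch_staleness_minutes(t, interval, runtime)
        for t in range(runtime, horizon)  # the first output is published at `runtime`
    ]
    return min(samples), max(samples), sum(samples) / len(samples)


# Hypothetical (interval, runtime) pairs, in minutes
for interval, runtime in [(240, 10), (60, 5), (15, 2)]:
    lo, hi, mean = staleness_profile(interval, runtime)
    print(f"every {interval:>3} min: staleness {lo}-{hi} min (mean {mean:.0f} min)")
```

Under this model the worst case is one full interval plus the runtime and the average is about half an interval plus the runtime; shortening the interval is the only lever that moves either number.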
The problems batch processing fails to solve well:
- Fraud and risk: Fraudsters act in minutes. Any batch latency longer than a few minutes is operationally useless.
- Real-time recommendations: Recommendation freshness directly correlates with engagement. A recommendation based on what a user did yesterday is weaker than one based on what they did 30 seconds ago.
- Anomaly detection: An anomaly that persists for 4 hours before you detect it has already caused damage. Anomaly detection only prevents damage when detection latency is low.
- Dynamic pricing: Surge pricing, yield management, bid adjustments - these require observing current supply/demand state, not state from 4 hours ago.
Stream processing solves the latency problem by abandoning the batch interval entirely. Instead of processing data at scheduled intervals, a streaming system processes each event as it arrives. The system is always running, always consuming, always producing outputs. Feature freshness is measured in seconds, not hours.
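To make "processes each event as it arrives" concrete, here is a minimal sketch of the velocity feature from the opening story maintained incrementally. The class name `VelocityFeature` is hypothetical, and the sketch assumes events arrive in event-time order; late and out-of-order events are handled by watermarks, discussed later in this lesson.

```python
from collections import deque
from typing import Deque


class VelocityFeature:
    """
    Streaming "transactions in the last `window_seconds`" for one user,
    updated on every event. Assumes in-order event times.
    """

    def __init__(self, window_seconds: float = 3600.0) -> None:
        self.window_seconds = window_seconds
        self.timestamps: Deque[float] = deque()

    def update(self, event_time: float) -> int:
        """Add one event and return the fresh feature value."""
        self.timestamps.append(event_time)
        # Evict events that have slid out of the trailing window (t - window, t].
        while self.timestamps[0] <= event_time - self.window_seconds:
            self.timestamps.popleft()
        return len(self.timestamps)


feature = VelocityFeature(window_seconds=3600)
# A burst like the one in the opening story: 22 transactions, one every 100 seconds
counts = [feature.update(1_000.0 + 100 * i) for i in range(22)]
print(counts[-1])  # 22
```

The feature value the model sees is updated the moment each transaction lands, instead of waiting for the next scheduled run.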
## Historical Context - From Unix Pipes to Flink
The idea of processing data as a stream is older than most engineers realize. Unix pipes (introduced in 1973 by Doug McIlroy at Bell Labs) are the original streaming primitive: `cat access.log | grep 404 | wc -l`. Each program produces output, the next consumes it, and the whole composition runs continuously. This is stream processing at the most fundamental level.
The modern stream processing lineage starts with Storm, created by Nathan Marz at BackType and open-sourced by Twitter in 2011. Storm was the first widely adopted distributed stream processing system - it introduced the concepts of spouts (sources), bolts (processing operators), and topologies (directed acyclic graphs of operators). Storm had a critical limitation: its core API provided at-least-once delivery but not exactly-once, and it had no built-in state management.
LinkedIn Samza (2013) addressed the state problem by tightly coupling stream processing with Kafka for storage and using RocksDB for local state. Samza was built for exactly the same LinkedIn use cases we will see in the Kafka lesson.
Apache Spark Streaming (2013) took a different approach: instead of true streaming, it ran micro-batches - collecting events for a short interval (typically 500ms-2s) and then processing them as a tiny Spark batch. This gave Spark's batch semantics (including fault tolerance) to near-real-time processing. The tradeoff: latency floor bounded by the micro-batch interval, and event-time semantics are awkward because you are still fundamentally batching.
Apache Flink (2014, commercialized by data Artisans, later Ververica) became the first widely adopted true streaming system to combine per-event processing with strong correctness guarantees. Flink processes each event individually, not in micro-batches. It has native event-time support, watermarks, state backends with RocksDB, and exactly-once semantics via distributed snapshots (asynchronous barrier snapshotting, a variant of the Chandy-Lamport algorithm). Flink is now a de facto standard for latency-sensitive ML feature pipelines.
Kafka Streams (2016) embedded stream processing directly into Kafka clients - no separate cluster, no separate scheduler. A Kafka Streams application is just a JVM process that reads from Kafka, applies transformations, and writes back to Kafka. This dramatically lowered the operational overhead for teams that already ran Kafka.
The progression represents a consistent trend: each generation solved the problems its predecessor could not - latency, state management, exactly-once semantics, operational simplicity. Modern ML platforms typically use Kafka as the transport layer and Flink or Kafka Streams as the processing layer.
## Core Concept 1 - Event Time vs Processing Time
This is the single most important concept in stream processing for ML, and the one most commonly misunderstood. Let us define the terms precisely and then build intuition for why the distinction matters.
Processing time ($t_p$): the wall-clock time at which a system processes an event - the time on the processing machine's clock when your code runs on that event.
Event time ($t_e$): the time at which an event actually occurred in the real world - typically embedded in the event's payload as a timestamp.
The gap between them is the event delay:

$$\Delta t = t_p - t_e$$

In an ideal world with perfect networks and no buffering, $t_p \approx t_e$: events arrive and are processed almost instantly after they occur. In the real world, $\Delta t$ can be anywhere from milliseconds to hours.
Consider a mobile banking app. When a user makes a transaction, the app records $t_e$ in the transaction payload. If the user is connected to the internet, the event arrives at your Kafka topic within milliseconds. But if the user is on a subway with intermittent connectivity, the event might be buffered on-device and not transmitted until they emerge from the tunnel - 12 minutes later. When your stream processor receives that event, $t_p$ is 12 minutes after $t_e$.
Now consider what this means for your fraud detection features. If you compute "number of transactions in the last 60 minutes" based on processing time, an event that was delayed 12 minutes on the subway will be counted in the wrong 60-minute window. The feature will be incorrect. Your model will make a prediction based on wrong inputs.
This is not a hypothetical. Consider the classic example:
- Event A: occurred at 10:00:00, arrived at 10:00:01 ($\Delta t$ = 1 second)
- Event B: occurred at 10:00:05, arrived at 10:00:06 ($\Delta t$ = 1 second)
- Event C: occurred at 10:00:03, arrived at 10:12:14 ($\Delta t \approx$ 12 minutes)
If you compute a 10-second tumbling window from 10:00:00 to 10:00:10 based on event time:
- Event A belongs in this window (event time 10:00:00)
- Event B belongs in this window (event time 10:00:05)
- Event C belongs in this window (event time 10:00:03)
But if you compute based on processing time, Event C arrives at 10:12:14 - it falls into the 10:12:10-10:12:20 window instead of the 10:00:00-10:00:10 window. Wrong window. Wrong aggregation. Wrong features. Wrong prediction.
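Window assignment itself is simple arithmetic: the tumbling window of size $s$ that contains timestamp $t$ starts at $t - (t \bmod s)$. The sketch below applies that rule to events A, B, and C with each clock. It assumes windows aligned to midnight, and the helper names are hypothetical.

```python
def to_seconds(hms: str) -> int:
    """'10:00:03' -> seconds since midnight."""
    h, m, s = map(int, hms.split(":"))
    return h * 3600 + m * 60 + s


def to_hms(seconds: int) -> str:
    return f"{seconds // 3600:02d}:{seconds % 3600 // 60:02d}:{seconds % 60:02d}"


def tumbling_window(ts: int, size: int = 10) -> str:
    """Window [start, start + size) containing ts, with start = ts - ts % size."""
    start = ts - ts % size
    return f"[{to_hms(start)}, {to_hms(start + size)})"


examples = {  # name: (event time, processing time), from the example above
    "A": ("10:00:00", "10:00:01"),
    "B": ("10:00:05", "10:00:06"),
    "C": ("10:00:03", "10:12:14"),
}
for name, (event_time, processing_time) in examples.items():
    print(f"Event {name}: event-time window {tumbling_window(to_seconds(event_time))}, "
          f"processing-time window {tumbling_window(to_seconds(processing_time))}")
```

Only event C changes windows: by event time it lands in `[10:00:00, 10:00:10)`, by processing time in `[10:12:10, 10:12:20)`.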
The rule: always use event time for ML feature computation. Processing time is only appropriate when you do not care about correctness and just want "how many events arrived in the last N seconds."
```python
# Simulating event-time vs processing-time divergence
import random
import time
from dataclasses import dataclass
from typing import List


@dataclass
class Transaction:
    transaction_id: str
    user_id: str
    amount: float
    event_time: float       # when the transaction actually occurred
    processing_time: float  # when we receive/process it


def simulate_events_with_delays() -> List[Transaction]:
    """
    Simulate a stream of transactions where some events arrive late.
    Mobile users, network issues, retries - all cause event-time lag.
    """
    base_time = time.time()
    events = []
    for i in range(10):
        event_time = base_time + i * 2  # events every 2 seconds real-world
        # Most events arrive quickly, some are delayed (mobile buffering, retries)
        delay_seconds = random.choices(
            population=[0.1, 0.5, 30.0, 180.0, 720.0],
            weights=[0.70, 0.20, 0.05, 0.03, 0.02],  # 2% delayed 12 minutes
            k=1,
        )[0]
        processing_time = event_time + delay_seconds
        events.append(Transaction(
            transaction_id=f"txn_{i:04d}",
            user_id="user_001",
            amount=round(random.uniform(10, 500), 2),
            event_time=event_time,
            processing_time=processing_time,
        ))
    return events


def count_in_window_by_event_time(
    events: List[Transaction],
    window_start: float,
    window_end: float,
) -> int:
    """Count events whose event_time falls in [window_start, window_end)."""
    return sum(
        1 for e in events
        if window_start <= e.event_time < window_end
    )


def count_in_window_by_processing_time(
    events: List[Transaction],
    window_start: float,
    window_end: float,
) -> int:
    """Count events whose processing_time falls in [window_start, window_end)."""
    return sum(
        1 for e in events
        if window_start <= e.processing_time < window_end
    )


# Demonstrate the divergence (delays are random, so output varies run to run)
events = simulate_events_with_delays()
base = min(e.event_time for e in events)

# First 10-second window
window_start = base
window_end = base + 10.0
event_time_count = count_in_window_by_event_time(events, window_start, window_end)
processing_time_count = count_in_window_by_processing_time(events, window_start, window_end)
print(f"Window 0s-10s event count by EVENT TIME:      {event_time_count}")
print(f"Window 0s-10s event count by PROCESSING TIME: {processing_time_count}")
print(f"Divergence: {abs(event_time_count - processing_time_count)} events miscounted")

# Late events: show which events would be miscounted
for e in events:
    delay = e.processing_time - e.event_time
    if delay > 5.0:
        et_win = int((e.event_time - base) / 10)
        pt_win = int((e.processing_time - base) / 10)
        print(f"\nLate event {e.transaction_id}:")
        print(f"  Event time window:      {et_win}")
        print(f"  Processing time window: {pt_win}")
        print(f"  Delay: {delay:.1f}s - MISCOUNTED by processing-time logic")
```
## Core Concept 2 - Watermarks: Handling Late Data
Event-time processing creates an immediate problem: how do you know when a window is complete? If you are computing a 10:00-10:10 window, and it is now 10:15, have all the events for that window arrived? What about the mobile user who was on the subway? What if an event arrives at 10:30 with event time 10:05?
You cannot wait forever. A streaming system must eventually emit window results. Watermarks are the mechanism that answers the question: "At processing time $t_p$, we believe all events with event time less than $W(t_p)$ have arrived."
The heuristic watermark formula is:

$$W(t_p) = t_e^{\max}(t_p) - \delta$$

where $t_e^{\max}(t_p)$ is the maximum event timestamp seen so far across all events received by processing time $t_p$, and $\delta$ is the allowed lateness - how far back in event time you are willing to wait for late arrivals.
Intuition: if the largest event time you have seen so far is 10:14:30 and your allowed lateness is 30 seconds, your watermark is 10:14:00. This means: "I am confident that all events with event time before 10:14:00 have arrived. Any window that closes at or before 10:14:00 can now be finalized."
When a late event arrives with event time before the current watermark, you have three choices:
- Drop it - simplest, but the late event's contribution is lost
- Update the window (accurate, but expensive) - emit a retraction of the old result plus a correction
- Send it to a side output (dead-letter pattern) - process late data separately
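The three choices above can be sketched as policies on a per-window SUM. This is a hypothetical helper, not a framework API. It treats an event as late when the window it belongs to has already been finalized, meaning the window's end is at or before the watermark.

```python
from typing import Dict, List, Tuple


class LateDataHandler:
    """Per-window SUM with a configurable late-data policy (illustrative only)."""

    POLICIES = ("drop", "update", "side_output")

    def __init__(self, window_seconds: float, policy: str) -> None:
        if policy not in self.POLICIES:
            raise ValueError(f"unknown policy: {policy}")
        self.window_seconds = window_seconds
        self.policy = policy
        self.sums: Dict[float, float] = {}
        self.side_output: List[Tuple[float, float]] = []
        # (window_start, retracted_value, corrected_value) sent downstream
        self.corrections: List[Tuple[float, float, float]] = []

    def on_event(self, event_time: float, amount: float, watermark: float) -> str:
        start = event_time - event_time % self.window_seconds
        old = self.sums.get(start, 0.0)
        if start + self.window_seconds > watermark:  # window still open
            self.sums[start] = old + amount
            return "accumulated"
        if self.policy == "drop":
            return "dropped"
        if self.policy == "side_output":
            self.side_output.append((event_time, amount))
            return "side output"
        # "update": retract the emitted result and emit a corrected one
        self.sums[start] = old + amount
        self.corrections.append((start, old, old + amount))
        return "retract + correct"


for policy in LateDataHandler.POLICIES:
    handler = LateDataHandler(window_seconds=60.0, policy=policy)
    handler.on_event(10.0, 100.0, watermark=0.0)            # window [0, 60) still open
    outcome = handler.on_event(20.0, 50.0, watermark=90.0)  # window [0, 60) already closed
    print(f"{policy:>11}: {outcome:<17} sums={handler.sums} "
          f"corrections={handler.corrections} side_output={handler.side_output}")
```

The "update" policy is the only one that keeps the window value correct, but every consumer of the output must understand retractions, which is where its cost comes from.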
```python
# Watermark calculation and late event detection
from typing import Optional


class WatermarkTracker:
    """
    Tracks watermark position across a stream of events.
    The watermark advances as new events arrive with higher event timestamps.
    Events that arrive with event_time < watermark are considered 'late'.
    """

    def __init__(self, allowed_lateness_seconds: float = 30.0):
        self.allowed_lateness = allowed_lateness_seconds
        self.max_event_time_seen: Optional[float] = None
        self.total_events = 0
        self.late_events = 0

    def advance(self, event_time: float) -> float:
        """
        Process an event and return the current watermark.
        Returns: current watermark position (seconds since epoch)
        """
        self.total_events += 1
        # Update the maximum event time seen
        if self.max_event_time_seen is None or event_time > self.max_event_time_seen:
            self.max_event_time_seen = event_time
        return self.current_watermark()

    def current_watermark(self) -> float:
        """
        Current watermark = max_event_time_seen - allowed_lateness
        All windows with close_time <= watermark can be safely finalized.
        """
        if self.max_event_time_seen is None:
            return float('-inf')
        return self.max_event_time_seen - self.allowed_lateness

    def is_late(self, event_time: float) -> bool:
        """Returns True if this event's timestamp is already behind the watermark."""
        is_late_event = event_time < self.current_watermark()
        if is_late_event:
            self.late_events += 1
        return is_late_event

    def late_event_fraction(self) -> float:
        if self.total_events == 0:
            return 0.0
        return self.late_events / self.total_events


# Demonstrate watermark behavior with a realistic event stream
tracker = WatermarkTracker(allowed_lateness_seconds=30.0)

# Simulate events arriving out of order: (event_time, transaction_id)
incoming_events = [
    (1000.0, "txn_001"),   # on time - watermark 970
    (1005.0, "txn_002"),   # on time - watermark 975
    (1010.0, "txn_003"),   # on time - watermark 980
    (1020.0, "txn_004"),   # on time - watermark advances to 990
    (1050.0, "txn_005"),   # on time - watermark advances to 1020
    (990.0, "txn_late1"),  # LATE - 30s behind the watermark (1020)
    (1025.0, "txn_ooo1"),  # out of order, but still ahead of the watermark - on time
    (970.0, "txn_late2"),  # very LATE - 50s behind the watermark, 80s behind max event time
]

print("Event processing with watermark (allowed lateness = 30s):\n")
for event_time, txn_id in incoming_events:
    # Check lateness against the watermark *before* this event can advance it
    late = tracker.is_late(event_time)
    watermark = tracker.advance(event_time)
    status = "LATE - will be dropped or side-output" if late else "on time"
    print(f"  {txn_id}: event_time={event_time:.0f}  watermark={watermark:.0f}  [{status}]")

print(f"\nLate event rate: {tracker.late_event_fraction():.1%}")
```
Setting allowed lateness too high means windows are held open longer, increasing memory usage and output latency. Setting it too low means legitimate late events are dropped, causing incorrect feature values. For many ML feature pipelines, 30–120 seconds of allowed lateness is a reasonable starting point, but sources that buffer offline (like the subway example above) can produce delays of many minutes. Measure your P99 event delay in production and set allowed lateness to cover it.
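A sketch of that measurement step, assuming you can log `processing_time - event_time` for a sample of events. The delay distribution below is synthetic, and the 20% headroom factor is an arbitrary assumption rather than a standard; the function names are hypothetical.

```python
import math
from typing import Sequence


def percentile(values: Sequence[float], q: float) -> float:
    """Nearest-rank percentile, q in (0, 100]."""
    ordered = sorted(values)
    rank = math.ceil(q * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


def suggest_allowed_lateness(delays: Sequence[float], q: float = 99.0, headroom: float = 1.2) -> float:
    """Cover the q-th percentile delay, padded by an assumed safety factor."""
    return percentile(delays, q) * headroom


# Synthetic delays (processing_time - event_time, in seconds) for 1,000 events
delays = [0.2] * 900 + [2.0] * 80 + [45.0] * 15 + [600.0] * 5

p99 = percentile(delays, 99)
lateness = suggest_allowed_lateness(delays)
covered = sum(d <= lateness for d in delays) / len(delays)
print(f"P99 delay: {p99:.0f}s -> allowed lateness: {lateness:.0f}s, covers {covered:.1%} of events")
```

The remaining tail (the 600-second stragglers here) still arrives behind the watermark, which is exactly the traffic a side output is meant to catch.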
## Core Concept 3 - Windows: The Aggregation Abstraction
Stream processing would be useless for ML without windows. A raw event stream gives you individual events. Your ML model needs features: "how many transactions in the last 60 minutes," "average spend in the last 24 hours," "has this user had more than 5 failed logins in the last 10 minutes." These require aggregating events over a time range. Windows are that time range.
There are four window types. Understanding when to use each is a core interview topic.
### Tumbling Windows
Non-overlapping, fixed-size windows. Each event belongs to exactly one window.
```text
time: |---0:00---||---0:05---||---0:10---||---0:15---|
      |    W1    ||    W2    ||    W3    ||    W4    |
```
Use tumbling windows when: you need non-overlapping time buckets (hourly counts, daily aggregates, 5-minute fraud velocity).
Fraud example: Count transactions per user in each 5-minute window. A user with 0-2 transactions per 5-minute window is normal. A user with 20+ transactions in one 5-minute window is suspicious.
### Sliding Windows
Overlapping windows of fixed size, advancing by a fixed step. When the size is a multiple of the step, each event belongs to $\text{size} / \text{step}$ windows (2 in the example below).
```text
size=10min, step=5min:
W1: [0:00, 0:10)
W2: [0:05, 0:15)
W3: [0:10, 0:20)
```
Use sliding windows when: you need a rolling aggregate - "spend in last 60 minutes" updated every 5 minutes, "average response time in last 10 minutes" updated every minute.
ML example: A 1-hour sliding window updated every 5 minutes gives you 12 overlapping windows per hour. Each 5-minute slide, you emit a fresh feature value. This is how most real-time feature pipelines compute rolling statistics.
### Session Windows
Gap-based windows with no fixed size. A session window opens when an event arrives and closes when no events are seen for the gap duration.
```text
events:  * * * *                         * * *
        |session1|  gap > 30min closes  |session2|
```
Use session windows when: natural user sessions exist in the data - browsing sessions, login sessions, app usage sessions. The window duration is defined by the user's behavior, not a fixed interval.
ML example: Compute "number of pages viewed in this browsing session" for a recommendation model. Session closes when the user is inactive for 30 minutes.
### Global Windows
A single window covering all time. Requires manual triggers (a custom trigger fires when some condition is met). Rarely used in ML except for very specific patterns.
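A minimal sketch of a global window with a custom trigger, assuming a hypothetical "fire every N events" rule (real frameworks let you plug in count, time, or arbitrary triggers; this is not any framework's API):

```python
from typing import Callable, List, Optional


class GlobalWindow:
    """
    One window per key that never closes on its own. A custom trigger,
    here an assumed "fire every N events" rule, decides when to emit.
    """

    def __init__(self, fire_every: int, aggregator: Callable[[List[float]], float]) -> None:
        self.fire_every = fire_every
        self.aggregator = aggregator
        self.values: List[float] = []

    def add(self, value: float) -> Optional[float]:
        self.values.append(value)
        if len(self.values) % self.fire_every == 0:
            # Fire without purging: each result covers all history so far.
            return self.aggregator(self.values)
        return None


window = GlobalWindow(fire_every=3, aggregator=sum)
print([window.add(v) for v in [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]])
```

Because nothing ever closes the window, its state grows without bound unless the trigger also purges it or you keep a running aggregate instead of the raw values.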
```python
# Implementing the three main window types from scratch (pure Python - no framework)
# This is for conceptual clarity; production systems use Flink or Kafka Streams
import math
import random
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Tuple


@dataclass
class Event:
    user_id: str
    value: float
    event_time: float  # seconds since epoch


def tumbling_window_aggregate(
    events: List[Event],
    window_size_seconds: float,
    aggregator: Callable[[List[float]], float],
    start_time: float,
) -> Dict[Tuple[float, float], float]:
    """
    Assign each event to a non-overlapping tumbling window and aggregate.
    Returns: dict mapping (window_start, window_end) -> aggregated value
    """
    windows: Dict[Tuple[float, float], List[float]] = defaultdict(list)
    for event in events:
        # Which window does this event belong to?
        window_index = math.floor((event.event_time - start_time) / window_size_seconds)
        window_start = start_time + window_index * window_size_seconds
        window_end = window_start + window_size_seconds
        windows[(window_start, window_end)].append(event.value)
    return {
        window: aggregator(values)
        for window, values in sorted(windows.items())
    }


def sliding_window_aggregate(
    events: List[Event],
    window_size_seconds: float,
    slide_seconds: float,
    aggregator: Callable[[List[float]], float],
    start_time: float,
    end_time: float,
) -> Dict[Tuple[float, float], float]:
    """
    Compute sliding window aggregates. Each event belongs to multiple windows.
    Returns: dict mapping (window_start, window_end) -> aggregated value
    """
    results = {}
    current = start_time
    while current + window_size_seconds <= end_time + slide_seconds:
        window_start = current
        window_end = current + window_size_seconds
        # Collect events in this window
        window_values = [
            e.value for e in events
            if window_start <= e.event_time < window_end
        ]
        if window_values:
            results[(window_start, window_end)] = aggregator(window_values)
        current += slide_seconds
    return results


def session_window_aggregate(
    events: List[Event],
    gap_seconds: float,
    aggregator: Callable[[List[float]], float],
) -> List[Dict[str, Any]]:
    """
    Group events into sessions based on inactivity gap.
    A new session starts when the gap between consecutive events exceeds gap_seconds.
    """
    if not events:
        return []
    # Sort by event time
    sorted_events = sorted(events, key=lambda e: e.event_time)
    sessions = []
    session_start = sorted_events[0].event_time
    session_values = [sorted_events[0].value]
    session_last_time = sorted_events[0].event_time
    for event in sorted_events[1:]:
        gap = event.event_time - session_last_time
        if gap > gap_seconds:
            # Close current session, start new one
            sessions.append({
                "start": session_start,
                "end": session_last_time,
                "duration": session_last_time - session_start,
                "event_count": len(session_values),
                "aggregate": aggregator(session_values),
            })
            session_start = event.event_time
            session_values = [event.value]
        else:
            session_values.append(event.value)
        session_last_time = event.event_time
    # Close the final session
    sessions.append({
        "start": session_start,
        "end": session_last_time,
        "duration": session_last_time - session_start,
        "event_count": len(session_values),
        "aggregate": aggregator(session_values),
    })
    return sessions


# Example: compute transaction velocity features
base = 1_700_000_000.0  # arbitrary epoch base
sample_events = [
    Event("user_001", random.uniform(10, 200), base + t)
    for t in [0, 45, 90, 120, 200, 310, 800, 850, 900,
              2500, 2510, 2520, 2530]  # gap of ~1600s between clusters
]

# Tumbling 5-minute windows (300 seconds)
tumbling = tumbling_window_aggregate(
    sample_events,
    window_size_seconds=300,
    aggregator=len,
    start_time=base,
)
print("Tumbling 5-min windows (transaction count):")
for (ws, we), count in tumbling.items():
    print(f"  [{int(ws - base)}s, {int(we - base)}s): {count} transactions")

# Sliding 10-minute windows advancing every 5 minutes
sliding = sliding_window_aggregate(
    sample_events,
    window_size_seconds=600,
    slide_seconds=300,
    aggregator=len,
    start_time=base,
    end_time=base + 2530,
)
print("\nSliding 10-min windows, 5-min step (transaction count):")
for (ws, we), count in sliding.items():
    print(f"  [{int(ws - base)}s, {int(we - base)}s): {count} transactions")

# Session windows with 5-minute gap
sessions = session_window_aggregate(
    sample_events,
    gap_seconds=300,
    aggregator=sum,
)
print("\nSession windows (5-min gap, total spend):")
for i, session in enumerate(sessions):
    print(f"  Session {i+1}: {session['event_count']} events, "
          f"duration={session['duration']:.0f}s, "
          f"total=${session['aggregate']:.2f}")
```
## Core Concept 4 - Delivery Semantics
When a streaming system fails and recovers, which events get processed? This question defines the three delivery semantics. They are a fundamental interview topic and a critical design decision for ML pipelines.
At-most-once: Events may be lost, but are never processed twice.
- Producer sends event, does not wait for acknowledgment.
- The consumer commits its offset before processing; if it crashes after committing but before finishing, it restarts after the committed offset and the unfinished events are skipped.
- Use when: loss is acceptable and simplicity is paramount. Metrics aggregation where losing a few data points is tolerable. Almost never appropriate for ML feature computation.
At-least-once: Events are never lost, but may be processed more than once.
- Producer retries until acknowledgment. Consumer re-processes events after crash recovery.
- If a consumer crashes after processing event X but before committing offset X, it will re-process X on restart.
- Use when: your processing is idempotent (safe to process twice) or you have downstream deduplication.
- This is the default for most Kafka consumer configurations.
Exactly-once: Each event is processed exactly once, even in the presence of failures.
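- Requires: idempotent producers + transactions that commit output records and consumer offsets atomically + downstream consumers reading with `isolation.level=read_committed`.
- The cost: lower throughput and higher latency than at-least-once. The overhead depends heavily on the transaction commit interval and message size, so benchmark it on your own workload.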
- Use when: feature computation must be precise - counts, sums, and window aggregates must be exactly correct.
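The three guarantees differ mainly in the order of two steps, writing the output and committing the offset, relative to where a crash can land. The toy model below (hypothetical function `consume`, no real Kafka client involved) crashes once between those two steps and restarts from the last committed offset. Its `exactly_once` mode stands in for an atomic output-plus-offset commit; it is not Kafka's actual transaction protocol.

```python
from typing import List


def consume(events: List[str], crash_at: int, mode: str) -> List[str]:
    """
    Toy consumer that crashes once, between its two per-event steps, while
    handling offset `crash_at`, then restarts from its last committed offset.
    Returns the output the downstream system ends up with.
    """
    output: List[str] = []
    committed = 0      # next offset to read after a (re)start
    crashed = False
    while committed < len(events):
        offset = committed
        event = events[offset]
        crash_now = offset == crash_at and not crashed
        if mode == "at_most_once":
            committed = offset + 1      # step 1: commit the offset
            if crash_now:
                crashed = True
                continue                # crash: the event is never processed
            output.append(event)        # step 2: write the output
        elif mode == "at_least_once":
            output.append(event)        # step 1: write the output
            if crash_now:
                crashed = True
                continue                # crash: offset not committed, event replayed
            committed = offset + 1      # step 2: commit the offset
        elif mode == "exactly_once":
            staged = [event]            # step 1: stage output inside a transaction
            if crash_now:
                crashed = True
                continue                # crash: the open transaction is aborted
            output.extend(staged)       # step 2: commit output + offset atomically
            committed = offset + 1
        else:
            raise ValueError(f"unknown mode: {mode}")
    return output


events = ["e0", "e1", "e2", "e3"]
for mode in ("at_most_once", "at_least_once", "exactly_once"):
    print(f"{mode:>14}: {consume(events, crash_at=1, mode=mode)}")
```

With the crash at offset 1, at-most-once loses `e1`, at-least-once emits it twice, and the atomic commit emits it exactly once.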
For ML feature computation, the right answer depends on the feature type:
| Feature type | Appropriate semantic | Reasoning |
|---|---|---|
| COUNT, SUM in a window | Exactly-once or idempotent at-least-once | Duplicate counts corrupt features |
| MAX, MIN in a window | At-least-once (idempotent) | Max/min of duplicates = correct max/min |
| Model inference logging | At-least-once | Duplicate log entries can be deduped |
| Training data collection | Exactly-once | Duplicate training examples bias models |
| Feature serving (online) | At-least-once + read-your-writes | Reads are idempotent |
The cost of exactly-once is primarily latency and throughput. Kafka implements it with transactions: a transaction coordinator runs a two-phase commit across the output partitions and the `__consumer_offsets` topic. The producer begins a transaction, writes the output events, adds the consumer offsets to the same transaction, and then commits; the coordinator then writes commit markers to every partition involved. If any step fails, the entire transaction is aborted, and consumers reading with `read_committed` never see its output.
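For most ML feature pipelines, the pragmatic choice is at-least-once delivery with idempotent writes to the feature store. Write features using upsert semantics (key = user ID + window start time) so that reprocessing overwrites the same row instead of appending a new one. Upserts alone cannot repair an aggregate that has already counted a duplicate event twice, so COUNT and SUM features also need deduplication by event ID (or operator state restored from a checkpoint consistent with the replayed offsets). With that in place, you get exactly-once results at close to at-least-once throughput.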
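A minimal sketch of that pattern, with an in-memory dict standing in for the feature store and hypothetical class names. Because a redelivered event could otherwise be counted twice before the upsert, this version also deduplicates by event ID inside each window; the upsert then makes rewriting the same row harmless.

```python
from typing import Dict, Set, Tuple

FeatureKey = Tuple[str, int]  # (user_id, window_start)


class InMemoryFeatureStore:
    """Stand-in for an online feature store table with upsert semantics."""

    def __init__(self) -> None:
        self.rows: Dict[FeatureKey, int] = {}

    def upsert(self, key: FeatureKey, value: int) -> None:
        self.rows[key] = value  # overwrite by key, never append


class TumblingCountFeature:
    """Per-(user, window) transaction count that tolerates redelivered events."""

    def __init__(self, store: InMemoryFeatureStore, window_seconds: int = 300) -> None:
        self.store = store
        self.window_seconds = window_seconds
        self.event_ids: Dict[FeatureKey, Set[str]] = {}

    def on_event(self, event_id: str, user_id: str, event_time: int) -> None:
        window_start = event_time - event_time % self.window_seconds
        key = (user_id, window_start)
        ids = self.event_ids.setdefault(key, set())
        ids.add(event_id)                 # a redelivered event ID changes nothing
        self.store.upsert(key, len(ids))  # rewriting the same row is harmless


store = InMemoryFeatureStore()
feature = TumblingCountFeature(store, window_seconds=300)
# At-least-once delivery: txn_2 arrives twice after a simulated retry
deliveries = [("txn_1", "user_1", 10), ("txn_2", "user_1", 20),
              ("txn_2", "user_1", 20), ("txn_3", "user_1", 310)]
for event_id, user_id, event_time in deliveries:
    feature.on_event(event_id, user_id, event_time)
print(store.rows)
```

The per-window ID sets grow with traffic; a real pipeline would expire them once a window is past the watermark plus allowed lateness.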
## Core Concept 5 - Backpressure
Backpressure is what happens when your consumers cannot keep up with your producers. Imagine your transaction topic receives 100,000 events per second during a flash sale. Your feature computation consumer can only process 60,000 events per second. The gap grows: 40,000 events/second accumulate in Kafka partitions as consumer lag.
Different systems handle backpressure differently:
Blocking (usually the right answer): The producer is slowed down to match consumer speed. In Flink, receiving tasks grant credits upstream, roughly one per free network buffer. When a downstream operator's buffers are full, it stops granting credits, so the upstream operator blocks, and the pressure propagates back to the source. This prevents data loss at the cost of increased end-to-end latency.
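The credit mechanism can be illustrated with a toy model of one link between two operators. This is a simplification for intuition, not Flink's implementation; the class name and the per-tick rates are assumptions.

```python
from collections import deque
from typing import Deque


class CreditChannel:
    """
    Toy credit-based link between an upstream and a downstream operator.
    The receiver starts by granting one credit per free buffer; the sender
    may transmit only while it holds credits, so the buffer can never overflow.
    """

    def __init__(self, receiver_buffers: int) -> None:
        self.credits = receiver_buffers
        self.inbox: Deque[str] = deque()

    def try_send(self, record: str) -> bool:
        if self.credits == 0:
            return False          # no credit: upstream must wait (backpressure)
        self.credits -= 1
        self.inbox.append(record)
        return True

    def receive(self) -> str:
        record = self.inbox.popleft()
        self.credits += 1         # buffer freed: grant a new credit upstream
        return record


channel = CreditChannel(receiver_buffers=4)
sent = blocked = 0
for tick in range(6):
    for _ in range(3):            # upstream wants to emit 3 records per tick
        if channel.try_send(f"r{sent}"):
            sent += 1
        else:
            blocked += 1
    if channel.inbox:
        channel.receive()         # downstream can only process 1 record per tick
print(f"sent={sent} blocked_attempts={blocked} buffered={len(channel.inbox)}")
```

The receiver's buffer never holds more than 4 records, and the upstream's effective rate settles at the downstream's rate of one record per tick; the excess shows up as blocked attempts, which in a real pipeline become lag at the source.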
Dropping (at-most-once): Events beyond a buffer threshold are dropped. Fast, but loses data.
Buffering (temporary): Accumulate events in a queue or Kafka itself. This is what actually happens at scale - Kafka is the buffer. Consumer lag is the measurement of how far behind the consumer is.
```python
# Simulating backpressure with a bounded queue
import queue
import threading
import time


def simulate_backpressure(
    producer_rate_per_second: float,
    consumer_rate_per_second: float,
    simulation_duration_seconds: float,
    max_queue_size: int = 10_000,
):
    """
    Simulate a producer-consumer pipeline with a bounded buffer.
    While the queue has room it acts as a buffer and lag grows (analogous to
    Kafka consumer lag); once it is full, put() blocks and the producer is
    slowed down (blocking backpressure). Events are dropped only if put() times out.
    """
    event_queue: queue.Queue = queue.Queue(maxsize=max_queue_size)
    stats = {
        "produced": 0,
        "consumed": 0,
        "dropped": 0,
        "peak_queue_depth": 0,
    }
    stop_flag = threading.Event()

    def producer():
        interval = 1.0 / producer_rate_per_second
        end_time = time.time() + simulation_duration_seconds
        while time.time() < end_time and not stop_flag.is_set():
            event = {"id": stats["produced"], "timestamp": time.time()}
            try:
                # block=True: blocking backpressure - producer waits while the queue is full
                event_queue.put(event, block=True, timeout=5.0)
                stats["produced"] += 1
            except queue.Full:
                stats["dropped"] += 1
            stats["peak_queue_depth"] = max(stats["peak_queue_depth"], event_queue.qsize())
            time.sleep(interval)

    def consumer():
        interval = 1.0 / consumer_rate_per_second
        # Keep consuming until the producer has stopped AND the backlog is drained
        while not stop_flag.is_set() or not event_queue.empty():
            try:
                event_queue.get(timeout=1.0)
                stats["consumed"] += 1
                time.sleep(interval)  # simulate processing work
            except queue.Empty:
                continue

    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    # Measure lag when the producer stops, before the consumer drains the backlog
    lag = stats["produced"] - stats["consumed"]
    stop_flag.set()
    consumer_thread.join()

    print(f"Producer rate: {producer_rate_per_second}/s, Consumer rate: {consumer_rate_per_second}/s")
    print(f"Produced: {stats['produced']}, Dropped: {stats['dropped']}")
    print(f"Consumer lag when producer stopped: {lag} events")
    print(f"Peak queue depth: {stats['peak_queue_depth']}")
    print(f"Estimated time to drain lag: {lag / consumer_rate_per_second:.1f}s\n")


# Fast consumer keeps up
simulate_backpressure(
    producer_rate_per_second=100,
    consumer_rate_per_second=120,
    simulation_duration_seconds=5,
)

# Slow consumer falls behind
simulate_backpressure(
    producer_rate_per_second=100,
    consumer_rate_per_second=60,
    simulation_duration_seconds=5,
)
```
In production Kafka + Flink pipelines, monitor consumer lag per partition as the primary health metric. A growing lag means your pipeline is not keeping up. Set alerts at 2× your normal lag baseline. A lag spike during a flash sale is expected; a lag that keeps growing after the sale ends indicates a processing bottleneck that needs to be fixed.
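A sketch of the per-partition lag check. In practice the offsets come from the broker (for example, `kafka-consumer-groups.sh --describe` reports current offset, log-end offset, and lag per partition) or from your monitoring stack; here they are hard-coded hypothetical values, and the function names are illustrative.

```python
from typing import Dict


def partition_lag(log_end_offsets: Dict[int, int], committed_offsets: Dict[int, int]) -> Dict[int, int]:
    """Consumer lag per partition: log-end offset minus the group's committed offset."""
    return {p: log_end_offsets[p] - committed_offsets[p] for p in log_end_offsets}


def lagging_partitions(lag: Dict[int, int], baseline: Dict[int, int], factor: float = 2.0) -> Dict[int, int]:
    """Partitions whose lag exceeds `factor` times their normal baseline."""
    return {p: value for p, value in lag.items() if value > factor * baseline[p]}


# Hypothetical offsets and baselines for a 3-partition topic
log_end = {0: 10_500, 1: 9_800, 2: 12_000}
committed = {0: 10_300, 1: 9_650, 2: 9_000}
baseline = {0: 250, 1: 200, 2: 300}

lag = partition_lag(log_end, committed)
print(lag)
print(lagging_partitions(lag, baseline))
```

Alerting per partition rather than on the topic total makes a single stuck or hot partition visible even when the other partitions are healthy.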
## Core Concept 6 - State in Streaming
Stateless operators process each event independently. A stateless filter (`event.amount > 1000`) requires no memory of past events. These are easy to parallelize and recover from failures.
Stateful operators require memory of past events. A count of transactions in the last 60 minutes requires remembering which transactions have been seen. This is where stream processing gets complex - and where most ML features live.
State in a distributed streaming system must be:
- Partitioned: each partition of the input is processed by one worker, and that worker owns the state for that partition's keys
- Persistent: state survives worker restarts via periodic checkpointing to durable storage (HDFS, S3); a local store such as RocksDB holds the working state, and its contents are what gets checkpointed
- Queryable: in advanced setups, state can be queried externally (Flink queryable state) for low-latency feature serving
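The partitioned and persistent properties above can be sketched in plain Python: keys are routed to workers with a stable hash, each worker owns the state for its keys, and a checkpoint plus replay restores correct counts after a failure. The names `owner` and `Worker` are hypothetical, and copying a dict stands in for writing a snapshot to durable storage.

```python
import zlib
from collections import defaultdict
from typing import Dict, List


def owner(key: str, num_workers: int) -> int:
    """Stable key -> worker routing (Python's built-in hash() is salted per process)."""
    return zlib.crc32(key.encode("utf-8")) % num_workers


class Worker:
    """Owns keyed state (per-user transaction counts) for the keys routed to it."""

    def __init__(self) -> None:
        self.counts: Dict[str, int] = defaultdict(int)
        self.checkpoint: Dict[str, int] = {}

    def process(self, user_id: str) -> None:
        self.counts[user_id] += 1

    def take_checkpoint(self) -> None:
        # Stand-in for writing a state snapshot to durable storage such as S3.
        self.checkpoint = dict(self.counts)

    def restore(self) -> None:
        # After a failure, reload the snapshot; events after it must be replayed.
        self.counts = defaultdict(int, self.checkpoint)


NUM_WORKERS = 2
workers: List[Worker] = [Worker() for _ in range(NUM_WORKERS)]
stream = ["alice", "bob", "alice", "carol", "bob", "alice"]

for offset, user in enumerate(stream):
    workers[owner(user, NUM_WORKERS)].process(user)
    if offset == 3:  # checkpoint after offset 3 (a real system stores this offset too)
        for w in workers:
            w.take_checkpoint()

# Simulate a crash that loses in-memory state, then restore and replay from offset 4.
for w in workers:
    w.restore()
for user in stream[4:]:
    workers[owner(user, NUM_WORKERS)].process(user)

totals: Dict[str, int] = {}
for w in workers:
    totals.update(w.counts)
print(totals)
```

Replay only works because the source can be re-read from the offset recorded with the checkpoint, which is one reason Kafka pairs so naturally with stateful stream processors.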
The state backends in Apache Flink:
- HashMapStateBackend: stores state in JVM heap memory. Fast but limited by heap size.
- EmbeddedRocksDBStateBackend: stores state on local disk via RocksDB. Handles state much larger than memory. The usual choice for ML feature pipelines with large key spaces (millions of users) whose state will not fit in the JVM heap.
## Core Concept 7 - Stream-Table Duality
One of the most elegant ideas in stream processing: a stream and a table are two views of the same data.
A stream is a sequence of change events ordered by time: "user 123 changed their spending limit," "user 456 logged in," "user 789 made a transaction."
A table is the current state obtained by applying all changes up to now: the current user profile, the current account balance, the current spending limit.
Formally:
- A table is a snapshot of a stream at a point in time
- A stream is the changelog of a table over time
This duality is implemented directly in Apache Kafka via compacted topics and Kafka Streams' KTable abstraction, and in Flink via the Table API. For ML, it enables a powerful pattern: your feature store is a table; the stream is the mechanism by which features are updated. A transaction event updates the user's rolling aggregate features. A model prediction event updates the user's last-predicted-risk-score. The stream is the change, the feature store is the current state.
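The duality can be made concrete by materializing a changelog into a table. This sketch mirrors the latest-value-per-key behaviour of a compacted topic or a KTable, with `None` playing the role of a tombstone (delete marker); the function and record layout are illustrative, not a Kafka API.

```python
from typing import Dict, Iterable, Optional, Tuple

Change = Tuple[str, Optional[dict]]  # (key, new value); None means "delete this key"

def materialize(changelog: Iterable[Change]) -> Dict[str, dict]:
    """Apply changes in order to an empty table; the last write per key wins."""
    table: Dict[str, dict] = {}
    for key, value in changelog:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table

changelog = [
    ("user_123", {"spend_limit": 500}),
    ("user_456", {"spend_limit": 200}),
    ("user_123", {"spend_limit": 800}),  # later update overwrites the earlier one
    ("user_456", None),                  # tombstone: key removed
]
table_now = materialize(changelog)         # table as of the latest change
table_earlier = materialize(changelog[:2])  # snapshot at an earlier point in the stream
```

Replaying a prefix of the changelog gives the table as it was at that point, which is exactly the "table is a snapshot of a stream" half of the duality.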
True Streaming vs Micro-Batch - A Practical Comparison
Before diving into architecture patterns, it is worth clarifying a distinction that matters for ML latency: true streaming versus micro-batch.
A true streaming system (Flink, Kafka Streams) processes each event individually as it arrives. The minimum latency is bounded only by network transit time + processing time per event - typically single-digit milliseconds.
A micro-batch system (Spark Structured Streaming) collects events for a short configurable interval (the trigger interval, typically 100 ms to 30 s) and then processes the entire batch atomically. An event can wait up to a full trigger interval before its batch runs, so the latency floor is on the order of the trigger interval.
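A back-of-the-envelope model of that buffering delay, assuming events arrive evenly and ignoring batch processing time. Times are integer milliseconds to avoid floating-point edge cases; the function name is hypothetical.

```python
def microbatch_wait_ms(arrival_ms: int, trigger_ms: int) -> int:
    """Buffering delay until the next trigger fires. An event landing exactly on a
    trigger boundary is assumed to just miss it and wait a full interval.
    Batch processing time is not modelled."""
    return trigger_ms - (arrival_ms % trigger_ms)

trigger_ms = 1_000
arrivals_ms = range(0, 10_000, 100)    # one event every 100 ms for 10 s
waits = [microbatch_wait_ms(t, trigger_ms) for t in arrivals_ms]
mean_wait = sum(waits) / len(waits)    # 550.0 ms
worst_wait = max(waits)                # 1000 ms
```

With a 1-second trigger, the average event waits about half the interval and the unluckiest waits the whole interval before any processing starts. A per-event streaming engine has no equivalent buffering step.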
| Property | True Streaming | Micro-Batch (Spark Streaming) |
|---|---|---|
| Latency floor | Sub-10ms | Trigger interval (typically 100ms+) |
| Throughput | High | Very high (batch amortizes overhead) |
| Event-time support | Native, first-class | Supported, but more complex |
| Exactly-once | Via distributed snapshots | Via idempotent writes + checkpointing |
| State management | RocksDB or JVM heap (large state supported) | In-memory by default; RocksDB state store available since Spark 3.2 |
| Operational complexity | Higher | Lower (reuses Spark skillset) |
| Best for ML | Feature pipelines requiring sub-second freshness | Near-real-time features (1-30s latency acceptable) |
For most ML feature pipelines where the freshness requirement is "under 5 seconds," both work. Where the requirement is "under 500ms," only true streaming works. Where the team already operates Spark heavily and the freshness requirement is 10-30 seconds, Spark Structured Streaming may be the right pragmatic choice.
Lambda vs Kappa Architecture
Two architectural patterns define how teams structure batch and streaming together.
Lambda Architecture (Nathan Marz, 2011): run both a batch layer (for accurate, complete results) and a speed layer (for approximate, low-latency results). Query time merges the two.
```text
Raw data → [Batch layer → Batch views    ] → Merge → Serving
         → [Speed layer → Real-time views]
```
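A sketch of the query-time merge for a count feature. It assumes the batch view is complete up to the last batch run and the speed view holds only events seen since then, so the two add up; other aggregates (max, distinct counts) need their own merge rules. Names and numbers are illustrative.

```python
from typing import Dict

def lambda_query(user: str, batch_view: Dict[str, int], speed_view: Dict[str, int]) -> int:
    """Serve a count by combining the batch view with the speed view."""
    return batch_view.get(user, 0) + speed_view.get(user, 0)

batch_view = {"u1": 40, "u2": 7}   # written by the nightly batch job
speed_view = {"u1": 3}             # written by the streaming job since that run

total_u1 = lambda_query("u1", batch_view, speed_view)   # 43
total_u2 = lambda_query("u2", batch_view, speed_view)   # 7
```

The merge itself is trivial; the cost is that the batch job and the streaming job must compute "count" in exactly the same way for the sum to be meaningful.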
Problems with Lambda: you maintain two codebases - one batch job and one streaming job - that must produce semantically identical results. In practice, they drift. Bugs fixed in one codebase do not get fixed in the other. Testing both is twice the work.
Kappa Architecture (Jay Kreps, 2014, "Questioning the Lambda Architecture"): eliminate the batch layer. Run only the streaming layer. When you need to reprocess historical data (model retraining, bug fixes), replay the Kafka topic from the beginning.
```text
Raw data → [Streaming layer → Streaming views] → Serving
```
Many modern ML platforms use Kappa for feature serving: streaming is the single serving path, and batch is used only for training data generation and backfills (which are themselves just Kafka replays). The key enabler is Kafka's configurable retention - keep raw events for 30-90 days and you can always replay history.
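A toy illustration of reprocessing by replay: a Python list stands in for a retained Kafka topic, and a fixed version of the job rebuilds its view by consuming from offset 0. The job functions and the refund bug are hypothetical.

```python
from typing import Callable, Dict, List

Event = dict
View = Dict[str, float]

def run_view(log: List[Event], from_offset: int, update: Callable[[View, Event], None]) -> View:
    """Build a view by consuming the retained log from a given offset."""
    view: View = {}
    for event in log[from_offset:]:
        update(view, event)
    return view

def v1_total_spend(view: View, e: Event) -> None:
    view[e["user"]] = view.get(e["user"], 0.0) + e["amount"]

def v2_total_spend(view: View, e: Event) -> None:
    # Bug fix: refunds (negative amounts) should not reduce spend.
    if e["amount"] > 0:
        view[e["user"]] = view.get(e["user"], 0.0) + e["amount"]

log = [
    {"user": "u1", "amount": 30.0},
    {"user": "u1", "amount": -10.0},
    {"user": "u2", "amount": 5.0},
]
live = run_view(log, 0, v1_total_spend)      # the view the buggy job produced
rebuilt = run_view(log, 0, v2_total_spend)   # fixed job, replayed from offset 0
```

No separate batch codebase is involved: the same streaming logic, pointed at the start of the log, produces the corrected history.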
Streaming for ML - Three Patterns
Pattern 1: Real-time feature computation. Events flow from Kafka into a Flink job. The Flink job maintains stateful aggregations (rolling counts, sums, averages) keyed by entity ID (user ID, device ID, merchant ID). When the aggregation updates, the new feature value is written to an online feature store (Redis, Cassandra). The model serving layer reads from the feature store at inference time.
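A minimal sketch of Pattern 1 in plain Python: a keyed, event-time tumbling count that pushes every update to an online store. A dict stands in for Redis or Cassandra, the `txn_count_1h:<user>:<window_start>` key layout is a hypothetical convention, and state expiry for old windows (timers or TTLs in a real job) is omitted.

```python
from collections import defaultdict
from typing import Dict, Tuple

WINDOW_MS = 60 * 60 * 1000  # 1-hour tumbling windows on event time

class TxnCountFeature:
    """Keyed tumbling count; each update is written to the online store immediately."""
    def __init__(self, online_store: Dict[str, int]) -> None:
        self.counts: Dict[Tuple[str, int], int] = defaultdict(int)  # (user, window_start) -> count
        self.online_store = online_store

    def on_event(self, user_id: str, event_time_ms: int) -> None:
        window_start = event_time_ms - event_time_ms % WINDOW_MS
        self.counts[(user_id, window_start)] += 1
        # Including the window start in the key keeps a late event for an older
        # window from overwriting the current window's value.
        self.online_store[f"txn_count_1h:{user_id}:{window_start}"] = self.counts[(user_id, window_start)]

store: Dict[str, int] = {}
feature = TxnCountFeature(store)
for t in (0, 10_000, 3_700_000):
    feature.on_event("u1", t)
```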
Pattern 2: Online model serving with streaming features. Events flow into a Kafka topic. A consumer reads the event, fetches current features from the feature store, calls the model, and writes the prediction back to a Kafka output topic. The serving path is entirely streaming. Latency is measured from event arrival to prediction in milliseconds.
Pattern 3: Streaming model evaluation. Predictions flow through a Kafka topic. Ground truth labels arrive on another Kafka topic (with delay - you know the outcome of a loan application weeks later). A Flink job joins predictions with ground truth (stream-stream join with long window), computes rolling precision/recall/AUC, and writes drift alerts when metrics degrade.
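The join at the heart of Pattern 3 can be sketched as a keyed buffer of pending predictions that labels are matched against. This is an assumption-laden toy: it joins on a hypothetical `request_id`, computes cumulative rather than rolling precision, and never expires unmatched predictions (a real job would clear them with timers).

```python
from typing import Dict, Optional, Tuple

class DelayedLabelEvaluator:
    """Joins predictions with labels that arrive later and tracks precision."""
    def __init__(self, max_wait_ms: int) -> None:
        self.max_wait_ms = max_wait_ms
        self.pending: Dict[str, Tuple[bool, int]] = {}  # request_id -> (predicted, prediction time)
        self.tp = 0
        self.fp = 0

    def on_prediction(self, request_id: str, predicted: bool, t_ms: int) -> None:
        self.pending[request_id] = (predicted, t_ms)

    def on_label(self, request_id: str, actual: bool, t_ms: int) -> None:
        entry = self.pending.pop(request_id, None)
        if entry is None:
            return  # no matching prediction
        predicted, pred_t = entry
        if t_ms - pred_t > self.max_wait_ms:
            return  # label arrived outside the join window
        if predicted:
            if actual:
                self.tp += 1
            else:
                self.fp += 1

    def precision(self) -> Optional[float]:
        flagged = self.tp + self.fp
        return self.tp / flagged if flagged else None

ev = DelayedLabelEvaluator(max_wait_ms=30 * 24 * 3600 * 1000)  # 30-day join window
ev.on_prediction("r1", True, 0)
ev.on_prediction("r2", True, 0)
ev.on_prediction("r3", False, 0)
ev.on_label("r1", True, 1_000)
ev.on_label("r2", False, 2_000)
ev.on_label("r3", True, 3_000)   # a missed positive: affects recall, not precision
```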
Production Notes
Measure your P99 event delay before setting allowed lateness. Do not guess. Instrument your producers to embed event timestamps, consume from your Kafka topic for 24 hours, and compute the distribution of processing_time - event_time. The P99 of this distribution is your allowed lateness floor. Set allowed lateness to 1.5-2× P99 to cover tail events.
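A minimal sketch of that calculation, assuming you have already collected processing_time - event_time for a day of consumed events. It uses the nearest-rank percentile definition; the sample delays and function names are made up.

```python
import math
from typing import List

def percentile(values: List[float], q: float) -> float:
    """Nearest-rank percentile, q in (0, 100]."""
    ordered = sorted(values)
    rank = math.ceil(q * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]

def allowed_lateness_ms(delays_ms: List[float], factor: float = 1.5) -> float:
    """Allowed lateness = factor x P99 of (processing_time - event_time)."""
    return factor * percentile(delays_ms, 99)

# Hypothetical delay sample: mostly fast, with a slow tail
delays_ms = [20.0] * 980 + [400.0] * 15 + [2000.0] * 5
p99 = percentile(delays_ms, 99)            # 400.0
lateness = allowed_lateness_ms(delays_ms)  # 600.0
```

Note the 0.5% of events delayed by 2 seconds: they fall beyond even 2× P99, so the late-data policy (drop or side output) still matters.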
Window size selection for ML features: Start with the feature importance analysis from your model. If your batch model used 1-hour rolling features, start with 1-hour windows (tumbling, or sliding with a 5-minute slide). Validate that the streaming feature distribution matches the batch feature distribution before switching serving paths.
State size grows proportionally to cardinality × window size. A 1-hour sliding window for 10 million users, each storing 3 aggregates (count, sum, max), requires at least $10^7 \times 3 \times 8\ \text{bytes} \approx 240\ \text{MB}$ of state, assuming one 8-byte value per aggregate and ignoring key and serialization overhead. Use RocksDB state backend for anything over a few GB. Profile state size before production launch.
Kafka topic retention must cover your maximum allowed replay window. If you want to reprocess the last 30 days of events for a model retraining, your Kafka topic must retain 30 days of data. Set retention.ms = 2592000000 (30 days) for topics that feed model training pipelines.
Choosing event-time semantics adds operational cost - make it worth it. Event-time processing requires watermarks, which require you to measure and configure allowed lateness. You need to instrument your producers to embed event timestamps. You need to monitor the gap between event time and processing time across your pipeline. For ML features that are time-sensitive (fraud velocity, session activity, real-time engagement), this cost is absolutely justified. For features that are not time-sensitive (user age, account creation date, static profile attributes), use processing time or just read from a database at serving time - streaming is overkill.
Separate your feature computation frequency from your feature freshness. These are different things. A 60-minute rolling window does not mean you compute it once per 60 minutes. You should compute it (emit a new value) every time a new event arrives - potentially thousands of times per minute - but each computation looks back 60 minutes of event time. The window size defines the lookback period; the trigger frequency defines how often you emit a result. Most production ML feature pipelines trigger on every event, giving near-real-time feature freshness regardless of window size.
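A sketch of "window size = lookback, trigger = every event" for a single key. It assumes events for the key arrive in event-time order (late data is not handled here) and uses a deque to evict timestamps that have fallen out of the trailing 60 minutes.

```python
from collections import deque
from typing import Deque

LOOKBACK_MS = 60 * 60 * 1000  # window size: how far back each emitted value looks

class TrailingCount:
    """Emits 'transactions in the last 60 minutes' on every event for one key."""
    def __init__(self) -> None:
        self.times: Deque[int] = deque()

    def on_event(self, event_time_ms: int) -> int:
        self.times.append(event_time_ms)
        # Evict events outside (event_time - 60 min, event_time].
        while self.times and self.times[0] <= event_time_ms - LOOKBACK_MS:
            self.times.popleft()
        return len(self.times)  # a fresh value per event, regardless of window size

MIN = 60_000
feature = TrailingCount()
emitted = [feature.on_event(t) for t in (0, 10 * MIN, 50 * MIN, 65 * MIN)]  # [1, 2, 3, 3]
```

The window is 60 minutes long, yet a new value is emitted four times in 65 minutes, once per event.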
Model your late data policy as a first-class design decision. Before writing any code, answer these questions: What is our P99 event delay? What is our P99.9? What do we do with events that arrive after their window closes? The answers drive your allowed lateness setting and your late-data handling policy. Document the policy explicitly - the choice of dropping vs. side-outputting late events affects model correctness and is not obvious from reading the code six months later.
Stream-table joins are the most common source of unexpected complexity in ML pipelines. When you enrich streaming events with profile data (joining transaction events with user profile features), you are doing a stream-table join. The "table" side must be kept current - either by reading from a database at join time (adds latency, external dependency) or by maintaining a copy as a Kafka Streams KTable or as keyed or broadcast state in a Flink job (adds state complexity). Decide early which approach you need. Lesson 05 covers stream-table join patterns in depth.
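A toy version of the second approach: one ordered stream carries both profile updates (the changelog that keeps the table current) and transactions, and each transaction is enriched with the table state as of its position in the stream. The record layout and the `home_country` field are hypothetical.

```python
from typing import Dict, List, Tuple

Record = Tuple[str, dict]  # ("profile", ...) updates the table; ("txn", ...) is enriched

def stream_table_join(records: List[Record]) -> List[dict]:
    table: Dict[str, dict] = {}
    out: List[dict] = []
    for kind, payload in records:
        if kind == "profile":
            # Changelog side: keep the local copy of the table current.
            table[payload["user_id"]] = payload
        else:
            # Event side: left join against the table as it is right now.
            profile = table.get(payload["user_id"])
            out.append({**payload, "home_country": profile["home_country"] if profile else None})
    return out

records = [
    ("txn", {"user_id": "u1", "amount": 90}),
    ("profile", {"user_id": "u1", "home_country": "DE"}),
    ("txn", {"user_id": "u1", "amount": 15}),
]
enriched = stream_table_join(records)
```

The first transaction arrives before the profile and is enriched with `None`; the second sees the profile. This ordering sensitivity is exactly the complexity the paragraph warns about.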
Plan for replay from day one. One of Kafka's most valuable properties for ML is its replay capability: keep events for 30-90 days and you can always reprocess history. This is what enables Kappa architecture. But replay only works if: (1) your Kafka topic retention covers your reprocessing window, (2) your streaming job can be restarted from an arbitrary offset, and (3) your downstream writes are idempotent (safe to overwrite). Design all three properties into your pipeline architecture before launch. It is very painful to retrofit replay capability after a production incident exposes the need for it.
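Requirement (3) is the one most often missed. The sketch below contrasts an append-only sink with an upsert sink keyed by (user, window) when the same results are written twice, once originally and once during a replay. Both sinks are in-memory stand-ins for a real feature store.

```python
from typing import Dict, List, Tuple

Result = Tuple[str, int, int]  # (user_id, window_start_ms, count)

def append_sink(results: List[Result], store: List[Result]) -> None:
    store.extend(results)                     # not idempotent: a replay duplicates rows

def upsert_sink(results: List[Result], store: Dict[Tuple[str, int], int]) -> None:
    for user, window_start, count in results:
        store[(user, window_start)] = count   # idempotent: same key, same value

results = [("u1", 0, 4), ("u2", 0, 1)]
appended: List[Result] = []
upserted: Dict[Tuple[str, int], int] = {}
for _ in range(2):                            # original run + a full replay
    append_sink(results, appended)
    upsert_sink(results, upserted)
```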
Instrument training-serving skew from the moment you go live. Log both the event time and processing time for every feature written to the online feature store. In production, periodically compute the distribution of serving_time - event_time across all feature writes - this is your actual feature freshness. If the P50 is 3 seconds and the P99 is 4 minutes, you have a tail latency problem that will silently degrade your model for 1% of predictions. Compare this distribution against the freshness profile of your training data. If training data was computed with 5-second freshness and production features have 4-minute P99 freshness, your model has been trained on a distribution it will rarely see in production. This is one of the most common causes of unexplained model degradation in production ML systems.
Common Mistakes
Using processing time for ML features. This is the most common correctness error in streaming ML systems. "Number of transactions in the last 60 minutes" computed using processing time will give wrong answers whenever events arrive out of order (which they always do at scale). Always use event time for any feature that involves temporal semantics.
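A small example of the error: the same three transactions, queried at 1:20, give different trailing-hour counts depending on which timestamp defines the window. The events and function name are illustrative; only events that have arrived by the query time are visible.

```python
from typing import List, Tuple

MIN = 60_000  # one minute in ms

def count_last_hour(events: List[Tuple[int, int]], now_ms: int, use_event_time: bool) -> int:
    """Count events in (now - 60 min, now] by event time or by arrival (processing) time."""
    key = 0 if use_event_time else 1
    return sum(
        1 for e in events
        if e[1] <= now_ms and now_ms - 60 * MIN < e[key] <= now_ms
    )

# (event_time_ms, arrival_time_ms); the second transaction sat in a phone's offline buffer
events = [
    (10 * MIN, 10 * MIN),
    (5 * MIN, 70 * MIN),    # happened at 0:05, arrived at 1:10
    (70 * MIN, 70 * MIN),
]
by_event_time = count_last_hour(events, now_ms=80 * MIN, use_event_time=True)         # 1
by_processing_time = count_last_hour(events, now_ms=80 * MIN, use_event_time=False)   # 2
```

Processing time counts a transaction that happened 75 minutes ago because it arrived 10 minutes ago.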
Setting allowed lateness to 0. With zero allowed lateness, any event that arrives even 1 millisecond after its window's theoretical close time is dropped. Real networks have variable latency. Even in a low-latency datacenter setup, P99 end-to-end delivery delay is easily 10-50 ms, which means you will drop legitimate events. Set allowed lateness based on measured P99 event delay.
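A simplified illustration of how many legitimate events zero lateness discards. It assumes the watermark tracks wall-clock arrival time exactly, so the window is finalized at window end plus allowed lateness and anything arriving after that is dropped; real watermarks are derived from event timestamps, so treat this only as an intuition pump.

```python
from typing import List, Tuple

def count_dropped(events: List[Tuple[int, int]], window_end_ms: int, allowed_lateness_ms: int) -> int:
    """events: (event_time_ms, arrival_time_ms) for events belonging to the window
    that ends at window_end_ms. Simplification: the window is finalized at
    window_end_ms + allowed_lateness_ms in arrival time."""
    deadline = window_end_ms + allowed_lateness_ms
    return sum(1 for _, arrival in events if arrival > deadline)

# Four events from the last 10 ms of a window ending at t=1000, with 5-45 ms delivery delay
events = [(990, 995), (990, 1002), (990, 1020), (990, 1035)]
dropped_zero = count_dropped(events, window_end_ms=1000, allowed_lateness_ms=0)    # 3
dropped_fifty = count_dropped(events, window_end_ms=1000, allowed_lateness_ms=50)  # 0
```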
Ignoring consumer lag as a signal. Consumer lag is the canary in the coal mine for streaming pipelines. A lag that grows over time indicates that your processing speed is slower than your ingestion speed. Left unchecked, this lag can grow to hours, turning your "real-time" feature pipeline into a de facto batch pipeline. Alert on lag, not just consumer errors.
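Assuming exactly-once is free. Enabling Kafka transactions and exactly-once semantics adds overhead: transaction coordination and commit markers on the write path, plus extra end-to-end latency because read_committed consumers only see data after a transaction commits. How much throughput you lose depends heavily on the commit interval and message size. This is acceptable for low-volume, high-value pipelines (financial transactions, fraud features). For high-volume, low-value pipelines (clickstream impressions, ad views), at-least-once with idempotent writes is more appropriate. Always benchmark before choosing semantics.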
Test with late data in your CI pipeline. Write test cases that deliberately inject late events - events with timestamps 30, 60, 300 seconds behind the watermark. Verify that your pipeline handles them according to your configured policy (drop, side output, or recompute). Most streaming pipelines are only tested with on-time events and break in production when late data appears.
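A sketch of such a CI test. `tumbling_counts` is a hypothetical stand-in for your pipeline's windowing logic: it uses a watermark of max event time seen minus allowed lateness and routes events for already-closed windows to a side output. In a real project you would call your actual job or operator under test instead.

```python
from typing import Dict, List, Optional, Tuple

def tumbling_counts(events: List[int], window_ms: int, lateness_ms: int) -> Tuple[Dict[int, int], List[int]]:
    """Process event timestamps in arrival order; late events go to a side output."""
    counts: Dict[int, int] = {}
    late: List[int] = []
    max_seen: Optional[int] = None
    for t in events:
        max_seen = t if max_seen is None else max(max_seen, t)
        watermark = max_seen - lateness_ms
        start = t - t % window_ms
        if start + window_ms <= watermark:
            late.append(t)                      # window already finalized
        else:
            counts[start] = counts.get(start, 0) + 1
    return counts, late

def test_late_event_goes_to_side_output() -> None:
    # Last event is 370 s behind the newest one, far beyond 30 s of allowed lateness.
    counts, late = tumbling_counts([10_000, 70_000, 400_000, 30_000], window_ms=60_000, lateness_ms=30_000)
    assert late == [30_000]
    assert counts == {0: 1, 60_000: 1, 360_000: 1}

def test_slightly_late_event_is_still_counted() -> None:
    # 20 s out of order: within allowed lateness, so its window is still open.
    counts, late = tumbling_counts([10_000, 70_000, 50_000], window_ms=60_000, lateness_ms=30_000)
    assert late == []
    assert counts == {0: 2, 60_000: 1}
```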
The "correct" window type is defined by the ML feature, not engineering preference. Before choosing a window type, ask: "What does this feature represent semantically?" A feature defined as "spend in the trailing 60 minutes" is naturally a sliding window. A feature defined as "number of transactions in the 5 PM to 6 PM hour" is naturally a tumbling window. A feature defined as "total spend in the current visit" is naturally a session window. Let the feature semantics drive the window choice. If you choose tumbling because it is simpler but the feature semantics require sliding, you will serve systematically incorrect features - just in a way that is hard to detect.
Interview Q&A
Q: What is the difference between event time and processing time, and why does it matter for ML?
Event time is when an event actually occurred in the real world, embedded as a timestamp in the event payload. Processing time is when the stream processor receives and processes the event. They diverge whenever there is network delay, buffering, or out-of-order delivery - common in mobile apps, edge devices, and multi-datacenter systems. For ML features, the difference matters critically: a "transactions in the last 60 minutes" feature computed using processing time will include events that occurred outside that window and exclude events that occurred inside it but arrived late. This creates training-serving skew if training data was computed with correct event-time semantics. Always use event time for temporal ML features.
Q: What is a watermark in stream processing?
A watermark is a statement about completeness: "all events with event time less than W have arrived." The heuristic watermark formula is $W = \max_i t_i - \delta$, where $t_i$ ranges over the event timestamps observed so far and $\delta$ is the allowed lateness. As events arrive with increasing timestamps, the watermark advances, allowing windows to be finalized. A window covering the event-time range $[t_{\text{start}}, t_{\text{end}})$ can be safely closed once the watermark passes its end, $W \geq t_{\text{end}}$. Late events (event time below the current watermark) are either dropped, written to a side output, or used to trigger a window retraction and recomputation, depending on the pipeline's late data policy.
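The formula translates almost directly into code. This tracker is an illustrative sketch of $W = \max_i t_i - \delta$, not Flink's watermark API (Flink's bounded-out-of-orderness strategy is similar in spirit but differs in details).

```python
from typing import Optional

class HeuristicWatermark:
    """W = (max event time seen) - delta, where delta is the allowed lateness."""
    def __init__(self, delta_ms: int) -> None:
        self.delta_ms = delta_ms
        self.max_event_time: Optional[int] = None

    def observe(self, event_time_ms: int) -> None:
        if self.max_event_time is None or event_time_ms > self.max_event_time:
            self.max_event_time = event_time_ms

    def current(self) -> Optional[int]:
        return None if self.max_event_time is None else self.max_event_time - self.delta_ms

    def can_close(self, window_end_ms: int) -> bool:
        w = self.current()
        return w is not None and w >= window_end_ms

    def is_late(self, event_time_ms: int) -> bool:
        w = self.current()
        return w is not None and event_time_ms < w

wm = HeuristicWatermark(delta_ms=5_000)
for t in (1_000, 9_000, 4_000):      # out-of-order arrival; 4_000 does not move W back
    wm.observe(t)
before = wm.current()                # 4_000
closable_before = wm.can_close(5_000)  # False: window [0, 5000) must stay open
wm.observe(12_000)
after = wm.current()                 # 7_000: window [0, 5000) can now be finalized
```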
Q: What are the three delivery semantics and which is appropriate for ML feature computation?
At-most-once means events may be lost but never duplicated - producers don't retry, consumers skip events after a crash. At-least-once means events are never lost but may be processed more than once - producers retry, consumers re-process events from the last committed offset after a crash. Exactly-once means each event affects the output exactly once even in the presence of failures - in Kafka this requires idempotent, transactional producers, consumers reading with read_committed isolation, and consumer offsets committed atomically in the same transaction as the output. For ML feature computation, the answer depends on the feature: idempotent features (max, min, last-value) can use at-least-once safely; non-idempotent features (count, sum) require exactly-once or at-least-once with idempotent upsert semantics to the feature store. Model training data collection typically requires exactly-once to avoid biasing training data.
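A quick demonstration of why the feature type matters under at-least-once delivery: one redelivered event leaves a max unchanged but inflates a count and a sum. Deduplicating by a producer-assigned event ID is shown as one possible fix; it assumes every event carries a unique `id`, and the unbounded `seen` set would need a TTL in a real system.

```python
from typing import Iterable, List, Set

# After a crash, the consumer re-reads from the last committed offset: e2 arrives twice.
deliveries = [
    {"id": "e1", "amount": 40},
    {"id": "e2", "amount": 90},
    {"id": "e2", "amount": 90},   # redelivered duplicate
    {"id": "e3", "amount": 15},
]

max_amount = max(e["amount"] for e in deliveries)   # 90: duplicates do not change a max
naive_count = len(deliveries)                       # 4: duplicates inflate a count
naive_sum = sum(e["amount"] for e in deliveries)    # 235: and a sum

def dedup(events: Iterable[dict]) -> List[dict]:
    """Drop redeliveries by event id."""
    seen: Set[str] = set()
    out: List[dict] = []
    for e in events:
        if e["id"] not in seen:
            seen.add(e["id"])
            out.append(e)
    return out

deduped = dedup(deliveries)
correct_count = len(deduped)                        # 3
correct_sum = sum(e["amount"] for e in deduped)     # 145
```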
Q: What is the difference between tumbling, sliding, and session windows?
Tumbling windows are non-overlapping fixed-size windows - each event belongs to exactly one window. Use for hourly/daily aggregates or fixed-interval fraud velocity. Sliding windows are overlapping fixed-size windows that advance by a step smaller than the window size - with window size $w$ and slide $s$ (where $w$ is a multiple of $s$), each event belongs to $w/s$ windows, e.g. 12 windows for a 60-minute window sliding every 5 minutes. Use for rolling statistics like "spend in last 60 minutes, updated every 5 minutes." Session windows are variable-size windows defined by inactivity gaps - a window opens on the first event and closes when no event is seen for the gap duration. Use for user sessions, browsing sessions, or any natural grouping by inactivity.
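A sketch of window assignment for all three types, with windows as half-open `[start, end)` intervals in arbitrary time units. The sliding version assumes the window size is a multiple of the slide; the session version returns `(first event, last event)` per session. These helpers are illustrative, not a framework API.

```python
from typing import List, Tuple

Window = Tuple[int, int]

def tumbling_window(t: int, size: int) -> Window:
    start = t - t % size
    return (start, start + size)

def sliding_windows(t: int, size: int, slide: int) -> List[Window]:
    """Every window [s, s + size) with s a multiple of slide that contains t."""
    last_start = t - t % slide
    return [(s, s + size) for s in range(last_start, t - size, -slide)]

def session_windows(times: List[int], gap: int) -> List[Window]:
    """Group event times into sessions split by inactivity of at least `gap`."""
    sessions: List[Window] = []
    for t in sorted(times):
        if sessions and t - sessions[-1][1] < gap:
            sessions[-1] = (sessions[-1][0], t)   # extend the current session
        else:
            sessions.append((t, t))               # inactivity gap: start a new session
    return sessions

tumble = tumbling_window(65, size=60)                   # (60, 120)
slides = sliding_windows(65, size=60, slide=5)          # 12 windows, (65, 125) ... (10, 70)
sessions = session_windows([0, 5, 40, 42, 100], gap=30)  # [(0, 5), (40, 42), (100, 100)]
```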
Q: What is backpressure and how do streaming systems handle it?
Backpressure occurs when the consumer processes events slower than the producer emits them. Without a mechanism to handle this, the gap grows until either memory is exhausted (if buffering without bound) or data is dropped (if using bounded buffers). Apache Flink handles backpressure via credit-based flow control: each credit corresponds to a free network buffer at the receiving operator, and an upstream operator can only send as many buffers as it holds credits. When a downstream operator is slow, it issues fewer credits, which slows the upstream operator, which propagates back to the source. This prevents out-of-memory crashes at the cost of increased end-to-end latency during backpressure periods. In Kafka-based pipelines, consumer lag is the observable manifestation of backpressure - the Kafka topic acts as a very large buffer, bounded only by the topic's retention (unread events older than the retention period are deleted), and consumer lag is the metric to alert on.
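A toy model of credit-based flow control. It counts credits in records rather than network buffers and runs in discrete ticks, so it illustrates the idea rather than Flink's implementation. The downstream buffer can never exceed its credits; whatever cannot be sent stays at the source, which is what Kafka reports as consumer lag.

```python
from typing import Tuple

def simulate_credit_flow(produce_per_tick: int, consume_per_tick: int,
                         credits: int, ticks: int) -> Tuple[int, int]:
    """Returns (records buffered downstream, records still waiting at the source)."""
    downstream_buffer = 0   # bounded by `credits` after every send
    source_backlog = 0      # what Kafka would report as consumer lag
    for _ in range(ticks):
        source_backlog += produce_per_tick
        available_credits = credits - downstream_buffer   # one credit per free slot
        sent = min(source_backlog, available_credits)
        source_backlog -= sent
        downstream_buffer += sent
        downstream_buffer -= min(downstream_buffer, consume_per_tick)  # downstream work
    return downstream_buffer, source_backlog

fast = simulate_credit_flow(produce_per_tick=100, consume_per_tick=120, credits=200, ticks=10)
slow = simulate_credit_flow(produce_per_tick=100, consume_per_tick=60, credits=200, ticks=10)
```

With the slow consumer, memory use downstream stays capped while the backlog at the source grows by 40 records per tick once the buffer fills: backpressure converts a potential out-of-memory crash into visible lag.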
Q: What is the Lambda architecture and why is Kappa preferred for modern ML?
Lambda architecture runs both a batch layer (accurate but high-latency) and a speed layer (approximate but low-latency) in parallel, merging their outputs at query time. The motivation was that in 2011, streaming systems could not provide the accuracy of batch systems. The problem: you maintain two separate codebases (one batch, one streaming) that must compute semantically identical results. In practice they drift - bugs fixed in one are not fixed in the other. Kappa architecture eliminates the batch layer entirely. A single streaming pipeline handles both real-time serving and historical reprocessing (by replaying from Kafka). Modern systems (Flink, Kafka Streams) have improved enough to compute exactly the same results as batch for the feature types ML systems need. For most ML platforms, Kappa is simpler, more maintainable, and equally correct.
Q: What is the stream-table duality?
A stream and a table are two representations of the same underlying data. A table is a snapshot of the current state - the result of applying all changes up to now. A stream is the changelog of a table - the sequence of events that produced the current state. Every table can be reconstructed from its stream (by replaying from the beginning), and every stream can be "materialized" into a table (by applying all changes to an initial empty state). This duality is fundamental to how Kafka, Kafka Streams, and Flink model data. For ML, it explains why the feature store is a table (current feature values), the event stream is the mechanism by which features are updated (the changelog), and why you can rebuild feature history by replaying the stream from Kafka.
Q: What is the difference between true streaming and micro-batch, and when does it matter for ML?
True streaming (Flink, Kafka Streams) processes each event individually as it arrives. Latency is bounded by network transit time plus per-event processing time - typically sub-10ms. Micro-batch (Spark Structured Streaming) collects events for a configurable trigger interval (100 ms to 30 s) and processes the collected batch atomically. Because an event can wait up to a full trigger interval before its batch runs, latency is on the order of the trigger interval. For ML feature pipelines with freshness requirements under 1 second (fraud detection, real-time bidding), only true streaming is viable. For requirements of 5-30 seconds, micro-batch is often the pragmatic choice - especially when the team already operates Spark, because it reuses existing infrastructure and skills. The throughput advantage of micro-batching (batch amortizes per-record overhead) is meaningful at very high event rates (millions of events/second), but for most ML feature pipelines, true streaming throughput is sufficient.
Q: How do you handle state in a distributed streaming system, and why is it harder than stateless processing?
Stateless operators - filters, maps, projections - process each event independently and scale trivially: add more parallel instances, distribute events across them, done. Stateful operators require memory of past events, which creates three challenges. First, state must be partitioned consistently: all events for a given key (e.g., user ID) must be routed to the same processing instance, so that instance has the complete history needed for that key's aggregation. Second, state must survive failures: if a processing instance crashes, its state cannot be lost - it must be checkpointed periodically to durable storage (such as HDFS or S3) so that on recovery the operator resumes from its last checkpoint rather than starting from scratch. Third, state size is bounded by storage capacity: a 24-hour rolling window for 50 million users storing 10 aggregates each requires gigabytes of state, which must live on disk (RocksDB backend) rather than in memory. In Apache Flink, stateful operators use the ValueState, ListState, MapState, and AggregatingState APIs, all backed by configurable state backends. The combination of partitioned state, periodic checkpointing, and RocksDB-backed storage is what makes Flink suitable for production ML feature pipelines with large key spaces.
The concepts in this lesson recur throughout the module. Every Kafka lesson references delivery semantics. Every Flink lesson builds on event-time and watermarks. The window types you learned here are the same abstractions Flink and Kafka Streams expose in their APIs - just with production-grade state management and fault tolerance underneath. Read this lesson. Then read it again before any streaming interview.
