Skip to main content

:::tip 🎮 Interactive Playground Visualize this concept: Try the Stream Pipeline Viz demo on the EngineersOfAI Playground - no code required. :::

Stream Processing for ML Systems

The Production Scenario

Your fraud detection team has a model that uses three groups of features: static user profile features (updated daily), historical aggregate features (updated hourly), and velocity features - how many transactions has this user made in the last 5 minutes, 15 minutes, and hour? The velocity features are the most predictive. Fraudsters typically run multiple small test transactions in rapid succession before attempting a large fraudulent charge.

The problem is computing those velocity features. Every transaction event arrives on a Kafka topic. To answer "how many transactions has this user made in the last 5 minutes," you need to maintain a rolling count per user over a sliding 5-minute window, updated every time a new transaction arrives, accessible within 20ms for any user ID. There are 50 million active users. The transaction rate peaks at 80,000 per second.

You cannot query the transaction database - it has a 200ms query latency. You cannot recompute from scratch on every request - that requires scanning the last 5 minutes of transactions for that user, which means searching 80,000 * 60 * 5 = 24 million records per request. You cannot precompute and store the values - they need to be exact for the current timestamp, not approximated from a batch job that ran 30 minutes ago.

The only viable solution is a streaming computation: a continuously running process that reads every transaction event as it arrives, updates the per-user window counts in memory, and makes those counts available via a fast lookup. This is exactly what stream processing was designed to do.

Why This Exists - The Limits of Batch for Real-Time Features

Batch processing is the default for computing ML features. A nightly Spark job reads all events from the data warehouse, computes aggregates per user, and writes the results to a feature store. This works for features where daily freshness is acceptable: historical engagement rates, demographic attributes, long-term behavioral signals.

But velocity features - and many other fraud, ad ranking, and personalization features - require freshness measured in seconds, not hours. A user who made 20 transactions in the last 5 minutes looks very different from a user who made 20 transactions in the last year. The batch approach collapses both into the same historical count.

Stream processing solves this by making computation continuous. Instead of running once per day, the computation runs on every event as it arrives. The state (rolling counts, aggregates) is maintained in memory between events. Features are always fresh.

Historical Context

The theoretical foundation for stream processing is the Dataflow model, published by Akidau et al. at Google in 2015. Google had been running large-scale streaming computations internally since at least 2003 (the MillWheel paper, 2013 describes an earlier version). The Dataflow paper formalized the concepts of watermarks, windows, and triggers that are now standard across all streaming systems.

Apache Flink was originally developed at TU Berlin and became an Apache top-level project in 2014. It was among the first open-source systems to implement the full Dataflow model, including event-time processing and exactly-once semantics. Kafka Streams was introduced by Confluent in 2016 as a lighter-weight option embedded in client applications.

The application of these systems to ML feature computation was a natural extension: as ML models grew more sophisticated and required features from live event streams, teams at Uber (Michelangelo, 2017), LinkedIn, and Airbnb built Flink-based real-time feature pipelines that are now standard in the industry.

Core Concepts

Bounded vs Unbounded Data

A batch computation reads a bounded dataset - a file, a table with a finite number of rows, a partition from yesterday's events. You know when you have processed all the data.

A stream computation reads an unbounded dataset - a Kafka topic with no end, a live event stream that continues indefinitely. You never know when you have processed "all" the data, because more data is always arriving. This fundamentally changes how you compute aggregates: instead of SUM(all_transactions), you compute SUM(transactions_in_window) where the window defines a finite scope over the infinite stream.

Windowing Types

Windows are the mechanism for computing aggregates over finite slices of an infinite stream:

Tumbling windows: Non-overlapping fixed-size windows. Every event belongs to exactly one window. A 5-minute tumbling window groups events from 12:00-12:05, 12:05-12:10, etc.

Event stream: ---e1--e2--e3--e4--e5--e6---→
| window 1 | window 2 |
12:00 12:05 12:10

Sliding windows: Overlapping windows of fixed size, sliding by a fixed step. A "last 5 minutes" rolling count is typically modeled as a 5-minute window sliding every 1 minute.

Session windows: Variable-length windows defined by activity gaps. A session starts when a user event arrives and ends when there is no event for a configurable timeout period. Useful for session-level feature computation.

Watermarks: Handling Late Data

Events arrive out of order. A mobile app might send events from 2:00 PM that arrive at the stream processor at 2:05 PM due to network delays. A streaming system must decide: when is a time window "closed" - when can we emit the final aggregate and discard the state?

Watermarks are the mechanism. A watermark at time TT asserts: "All events with timestamp earlier than TT have arrived." Events with timestamp before TT that arrive after the watermark are "late events." The system can either:

  • Drop late events (fastest, least accurate)
  • Accept late events and emit corrected results (slower, more accurate)
  • Apply a side output for late events (handle them separately)

The watermark lag (how far behind the current time the watermark is) is the key parameter: larger lag allows more late events at the cost of higher latency in result emission.

Apache Flink is the production standard for stateful stream processing for ML. The following example computes fraud detection velocity features from a Kafka transaction stream:

# flink_fraud_features.py
# Uses PyFlink - Apache Flink's Python API
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.window import SlidingEventTimeWindows
from pyflink.datastream.functions import ProcessWindowFunction, AggregateFunction
from pyflink.common import WatermarkStrategy, Duration, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.state import ValueStateDescriptor
import json
import time
from dataclasses import dataclass
from typing import Iterable


@dataclass
class Transaction:
user_id: str
transaction_id: str
amount: float
merchant_category: str
timestamp_ms: int
is_international: bool


@dataclass
class FraudFeatures:
user_id: str
window_end_ms: int
# Velocity features
tx_count_5min: int
tx_count_15min: int
tx_count_1hr: int
# Amount features
total_amount_5min: float
max_amount_5min: float
# Behavioral features
unique_merchants_5min: int
international_fraction: float


class TransactionAggregator(AggregateFunction):
"""Accumulator for window aggregation."""

def create_accumulator(self) -> dict:
return {
"count": 0,
"total_amount": 0.0,
"max_amount": 0.0,
"merchants": set(),
"international_count": 0,
}

def add(self, transaction: Transaction, accumulator: dict) -> dict:
accumulator["count"] += 1
accumulator["total_amount"] += transaction.amount
accumulator["max_amount"] = max(accumulator["max_amount"], transaction.amount)
accumulator["merchants"].add(transaction.merchant_category)
if transaction.is_international:
accumulator["international_count"] += 1
return accumulator

def get_result(self, accumulator: dict) -> dict:
return {
"count": accumulator["count"],
"total_amount": accumulator["total_amount"],
"max_amount": accumulator["max_amount"],
"unique_merchants": len(accumulator["merchants"]),
"international_fraction": (
accumulator["international_count"] / accumulator["count"]
if accumulator["count"] > 0 else 0.0
),
}

def merge(self, a: dict, b: dict) -> dict:
return {
"count": a["count"] + b["count"],
"total_amount": a["total_amount"] + b["total_amount"],
"max_amount": max(a["max_amount"], b["max_amount"]),
"unique_merchants": a["unique_merchants"] + b["unique_merchants"],
"international_fraction": (
(a["international_fraction"] * a["count"] +
b["international_fraction"] * b["count"])
/ max(1, a["count"] + b["count"])
),
}


class FraudFeatureWriter(ProcessWindowFunction):
"""Writes computed features to Redis after window closes."""

def process(
self,
user_id: str,
context: ProcessWindowFunction.Context,
agg_result: Iterable[dict],
) -> Iterable[str]:
"""Called once per window when the window closes."""
agg = list(agg_result)[0] # One aggregate per window

feature_key = f"fraud:features:{user_id}"
window_end = context.window().max_timestamp()

# In production: write to Redis via a Flink Redis sink
# Here we emit as JSON for the sink to handle
features = {
"user_id": user_id,
"window_end_ms": window_end,
"tx_count_5min": agg["count"],
"total_amount_5min": agg["total_amount"],
"max_amount_5min": agg["max_amount"],
"unique_merchants_5min": agg["unique_merchants"],
"international_fraction": agg["international_fraction"],
}
yield json.dumps(features)


def build_fraud_feature_pipeline():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(16) # 16 parallel tasks

# Read from Kafka
kafka_source = (
KafkaSource.builder()
.set_bootstrap_servers("kafka:9092")
.set_topics("transactions")
.set_group_id("flink-fraud-features")
.set_starting_offsets(KafkaOffsetsInitializer.latest())
.set_value_only_deserializer(SimpleStringSchema())
.build()
)

# Parse transactions with event-time watermarks
# Watermark lag = 5 seconds: accept events up to 5s late
watermark_strategy = (
WatermarkStrategy
.for_bounded_out_of_orderness(Duration.of_seconds(5))
.with_timestamp_assigner(
lambda tx, _: json.loads(tx)["timestamp_ms"]
)
)

transaction_stream = (
env
.from_source(kafka_source, watermark_strategy, "Kafka Transactions")
.map(lambda s: Transaction(**json.loads(s)))
.key_by(lambda tx: tx.user_id) # Partition by user - one state per user
)

# 5-minute sliding window, sliding every 1 minute
five_min_features = (
transaction_stream
.window(SlidingEventTimeWindows.of(
size=Duration.of_minutes(5),
slide=Duration.of_minutes(1),
))
.aggregate(TransactionAggregator(), FraudFeatureWriter())
)

# Write features to Redis (via Kafka intermediate topic here)
five_min_features.sink_to(
# In production: use a Redis sink connector
# Here: write to a Kafka topic that a Redis writer consumes
build_kafka_sink("fraud-features")
)

env.execute("FraudFeaturePipeline")

Flink's stateful processing is what makes it suitable for ML feature computation. State is the data that a processing task needs to remember between events:

# flink_stateful_features.py
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.state import (
ValueStateDescriptor,
MapStateDescriptor,
ListStateDescriptor,
)
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimerService


class SessionFeatureComputer(KeyedProcessFunction):
"""
Computes session-level features with explicit state management.

Session = a sequence of user events with no gap longer than 30 minutes.
Features: session duration, events in session, conversion funnel depth.
"""

def open(self, runtime_context):
"""Initialize state descriptors - called once at task startup."""
# Session start time
self.session_start = runtime_context.get_state(
ValueStateDescriptor("session_start", Types.LONG())
)
# Event count in current session
self.event_count = runtime_context.get_state(
ValueStateDescriptor("event_count", Types.INT())
)
# Max funnel depth reached (0=browse, 1=cart, 2=checkout, 3=purchase)
self.max_funnel_depth = runtime_context.get_state(
ValueStateDescriptor("max_funnel_depth", Types.INT())
)
# Timer handle for session timeout
self.session_timeout_timer = runtime_context.get_state(
ValueStateDescriptor("session_timeout_timer", Types.LONG())
)

SESSION_TIMEOUT_MS = 30 * 60 * 1000 # 30 minutes

def process_element(self, event: dict, ctx: KeyedProcessFunction.Context):
"""Called for each event. ctx.timer_service() allows setting timers."""
event_time = event["timestamp_ms"]
funnel_depth = {"browse": 0, "cart": 1, "checkout": 2, "purchase": 3}.get(
event["event_type"], 0
)

# Initialize session if first event
if self.session_start.value() is None:
self.session_start.update(event_time)
self.event_count.update(0)
self.max_funnel_depth.update(0)

# Update state
self.event_count.update(self.event_count.value() + 1)
self.max_funnel_depth.update(
max(self.max_funnel_depth.value(), funnel_depth)
)

# Cancel existing timeout timer and set a new one
if self.session_timeout_timer.value() is not None:
ctx.timer_service().delete_event_time_timer(
self.session_timeout_timer.value()
)
timeout = event_time + self.SESSION_TIMEOUT_MS
ctx.timer_service().register_event_time_timer(timeout)
self.session_timeout_timer.update(timeout)

# Emit current session features (for serving - updated on every event)
yield {
"user_id": event["user_id"],
"session_duration_ms": event_time - self.session_start.value(),
"events_in_session": self.event_count.value(),
"max_funnel_depth": self.max_funnel_depth.value(),
"timestamp": event_time,
}

def on_timer(self, timestamp: int, ctx: KeyedProcessFunction.OnTimerContext):
"""Called when the session timeout fires - session ended."""
# Clear session state
self.session_start.clear()
self.event_count.clear()
self.max_funnel_depth.clear()
self.session_timeout_timer.clear()
# Could emit a "session ended" event here for analysis

Real Example: Fraud Detection Feature Pipeline

A realistic fraud detection velocity feature computation with Kafka consumer:

# fraud_feature_consumer.py
import json
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional
import redis
from confluent_kafka import Consumer, Producer, KafkaException


@dataclass
class SlidingWindowCounter:
"""
Approximate sliding window counter using a circular buffer of buckets.

Divides the window into N buckets. Each bucket counts events in a
time slice. When the window slides, old buckets are evicted and
the count is the sum of remaining buckets.

Trade-off: O(N) memory, O(1) update, approximate (within 1 bucket's error).
"""
window_size_seconds: int
n_buckets: int = 60 # One bucket per unit (second, minute, etc.)
buckets: dict = field(default_factory=lambda: defaultdict(float))
last_cleanup: float = field(default_factory=time.time)

@property
def bucket_size_seconds(self) -> float:
return self.window_size_seconds / self.n_buckets

def _current_bucket(self) -> int:
return int(time.time() / self.bucket_size_seconds)

def _evict_old_buckets(self):
cutoff = self._current_bucket() - self.n_buckets
old_keys = [k for k in self.buckets if k < cutoff]
for k in old_keys:
del self.buckets[k]

def add(self, value: float = 1.0):
self._evict_old_buckets()
self.buckets[self._current_bucket()] += value

@property
def sum(self) -> float:
self._evict_old_buckets()
return sum(self.buckets.values())

@property
def count(self) -> int:
return int(self.sum)


class InMemoryFraudFeatureComputer:
"""
Lightweight in-process fraud feature computation.

For lower scale (under 10K tx/sec): compute in a single consumer.
For higher scale: use Apache Flink with distributed state.
"""

def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
# Per-user sliding window counters (in memory - lost on restart)
self.user_tx_counts: dict = defaultdict(
lambda: {
"5min": SlidingWindowCounter(window_size_seconds=300),
"15min": SlidingWindowCounter(window_size_seconds=900),
"1hr": SlidingWindowCounter(window_size_seconds=3600),
}
)
self.user_amount_windows: dict = defaultdict(
lambda: {
"5min": SlidingWindowCounter(window_size_seconds=300),
}
)

def process_transaction(self, tx: dict):
user_id = tx["user_id"]
amount = tx["amount"]

# Update sliding windows
self.user_tx_counts[user_id]["5min"].add(1)
self.user_tx_counts[user_id]["15min"].add(1)
self.user_tx_counts[user_id]["1hr"].add(1)
self.user_amount_windows[user_id]["5min"].add(amount)

# Write to Redis for serving layer to read
features = {
"tx_count_5min": self.user_tx_counts[user_id]["5min"].count,
"tx_count_15min": self.user_tx_counts[user_id]["15min"].count,
"tx_count_1hr": self.user_tx_counts[user_id]["1hr"].count,
"total_amount_5min": self.user_amount_windows[user_id]["5min"].sum,
"updated_at": time.time(),
}

# Use a pipeline for atomic multi-key write
pipe = self.redis.pipeline()
pipe.hmset(f"fraud:features:{user_id}", features)
pipe.expire(f"fraud:features:{user_id}", 7200) # 2hr TTL
pipe.execute()

def run_kafka_consumer(self, bootstrap_servers: str, topic: str):
consumer = Consumer({
"bootstrap.servers": bootstrap_servers,
"group.id": "fraud-feature-computer",
"auto.offset.reset": "latest",
"enable.auto.commit": False, # Manual commit for reliability
})
consumer.subscribe([topic])

tx_processed = 0
last_log = time.time()

try:
while True:
msg = consumer.poll(timeout=0.1)

if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())

tx = json.loads(msg.value().decode())
self.process_transaction(tx)

tx_processed += 1
consumer.commit(msg) # Commit after processing

# Log throughput every 10 seconds
if time.time() - last_log > 10:
rate = tx_processed / 10
print(f"Throughput: {rate:.0f} tx/sec")
tx_processed = 0
last_log = time.time()

except KeyboardInterrupt:
pass
finally:
consumer.close()

Production Engineering Notes

Flink checkpointing for exactly-once semantics: Without checkpointing, a Flink job crash loses all in-memory state. Enable RocksDB-backed checkpointing to periodically persist state to S3 or HDFS:

from pyflink.datastream.checkpoint_config import CheckpointingMode
from pyflink.datastream.state_backend import RocksDBStateBackend

env.set_state_backend(RocksDBStateBackend("s3://my-bucket/flink-checkpoints/"))
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
env.get_checkpoint_config().set_checkpoint_interval(60_000) # Every 60 seconds
env.get_checkpoint_config().set_checkpoint_timeout(600_000) # 10 minute timeout

Backpressure monitoring: When a downstream sink (Redis write) is slower than the upstream source (Kafka), Flink applies backpressure - it slows down the source to avoid memory overflow. Monitor backpressure in the Flink Web UI. Sustained backpressure indicates either a slow sink or insufficient parallelism.

Watermark lag calibration: Set your watermark lag based on your network delay distribution. If 99% of events arrive within 2 seconds of their event timestamp, a 3-second lag handles most late events without significant window close latency. Profile your actual event delivery latency distribution and set the lag at the 99th percentile.

:::warning Stateful Flink Jobs and Rescaling When you rescale a Flink job (add or remove parallelism), the state is redistributed. For keyed state (per-user windows), Flink redistributes state by key range - each key goes to a specific new task. This works correctly but takes time proportional to the state size. During rescaling, the job pauses briefly. Plan rescaling during low-traffic periods and monitor state migration time. :::

:::danger Training-Serving Skew in Stream Features If your training data uses batch-computed velocity features (e.g., a Spark job that computes "tx count in last 5 minutes" over a historical dataset) but your serving uses streaming features (Flink computing the same window live), the computation semantics may differ. Specifically: how are window boundaries defined? Does the batch computation use event time or processing time? Does the streaming computation handle late events the same way? Test explicitly that both approaches produce identical results on a reference dataset. :::

Interview Q&A

Q: What is the difference between event time and processing time in stream processing, and why does it matter for ML?

Event time is the timestamp when an event actually occurred (embedded in the event payload). Processing time is when the event is processed by the streaming system (the current wall clock). These differ because events travel through networks, get buffered in queues, and arrive out of order. For ML features that represent user behavior at a specific point in time - like "did the user click anything in the last 5 minutes before making a purchase?" - you need event time. If you use processing time, a mobile event sent at 2:00 PM but received at 2:03 PM would be counted in the 2:00-2:05 PM window correctly with event time, but in the 2:00-2:05 PM window even though it was processed at 2:03 with processing time. The difference causes training-serving skew when the serving system uses processing time but training used event time.

Q: How do watermarks handle late-arriving events in a streaming ML pipeline?

A watermark at time TT tells the streaming system: "I am confident that all events with timestamp less than TT have arrived." When a window's end time falls below the current watermark, the window is closed and its aggregate is emitted. Late events (events with timestamp below the current watermark) can be handled in three ways: drop them (fast, loses accuracy), accept them and update the result (slow, fully accurate, requires retracting previous result), or route them to a side output (handle separately with different logic). For ML feature computation, dropping late events beyond a threshold and accepting late events within a few seconds is the standard approach - exact velocity counts matter less than low latency.

Q: What is Flink's state backend and why does it matter for large-scale ML feature computation?

Flink's state backend determines where task state is stored during job execution. The heap backend stores state in Java heap memory - fastest access but limited by available RAM. The RocksDB backend stores state in RocksDB on local disk, with hot data in memory - supports much larger state than fits in RAM at the cost of some latency for cold reads. For ML feature computation with 50 million users and per-user sliding windows, the state size may exceed available heap memory. RocksDB allows you to maintain state for all users without OOM errors. The trade-off: RocksDB adds 1-5ms latency for state reads compared to the heap backend. For most ML feature use cases, this is acceptable.

Q: Describe the architecture of a real-time fraud detection feature pipeline serving 80,000 transactions per second.

At 80K tx/sec, a single consumer cannot keep up. The architecture: Kafka with 32 partitions (each partition handles ~2,500 tx/sec), Flink job with 32 parallel tasks (one per partition, ensuring keyed state locality), RocksDB state backend for per-user window state, Redis Cluster as the feature store output (16 shards), and Redis cluster read replicas co-located with the model serving tier. Flink computes 5-min, 15-min, and 1-hour rolling counts per user on the stream, checkpoints state to S3 every 60 seconds, and writes feature updates to Redis with a 2-hour TTL. The model serving layer reads from Redis (1-2ms lookup) at transaction decision time.

Q: How would you test a Flink stream processing job for correctness before deploying to production?

Three testing levels: (1) Unit test individual operators by creating test harnesses with MiniClusterWithClientResource (Flink's embedded test cluster). Feed synthetic events and assert on output events. (2) Integration test by replaying a snapshot of production Kafka data through the job and comparing output to a batch-computed reference (the batch computation is assumed correct). This catches training-serving skew. (3) Shadow run in production: deploy the new job alongside the production job, compare feature values for the same users, and alert on divergence above a threshold before routing any model serving traffic to the new features.

© 2026 EngineersOfAI. All rights reserved.