

Stream-to-Feature Pipelines

Four Components, Not One

A recommendation team had a clear goal: build a feature called "items the user clicked in the last 30 minutes." The raw material existed. Every user click produced a Kafka event. The data was there, flowing at 50,000 events per second.

But the path from Kafka event to a queryable, low-latency feature required four components that the team hadn't fully accounted for. First, a stateful stream processor to maintain user-level state across the event stream - you can't compute a 30-minute window from a stateless function because the window spans many individual events. Second, a sliding window aggregation over event time - not processing time, because events arrive out of order and you need the window boundaries to reflect when clicks actually happened. Third, a write to Redis so the model server can retrieve the feature within its 15ms latency budget. Fourth, a schema registration in the feature store registry so the training pipeline knows the feature exists and can include it in training datasets.

Miss any one of these components and you have a broken pipeline: compute the wrong window (processing time instead of event time) and your 30-minute features drift when the pipeline falls behind. Skip the Redis write and the model server can't serve the feature. Skip the registry and the training pipeline generates its features independently and training-serving skew sets in.

This lesson builds all four components from scratch.


Why This Exists

Before stream processing matured, real-time features were built with a problematic pattern: a microservice subscribed to Kafka, maintained state in memory, and served features via REST. This worked until the service restarted and lost all state. Or until the service fell behind and state diverged from the event stream. Or until two services needed the same feature and each built it independently with subtly different logic.

Apache Flink grew out of academic research (the Stratosphere project at Technische Universität Berlin, ~2011, production release 2014) as a general-purpose stateful stream processor with first-class support for event time, watermarks, and fault-tolerant state. Flink changed the economics of stream-to-feature pipelines: state is durable (RocksDB backend, snapshots to S3), event time is a first-class citizen, and the same job can write to multiple sinks (Redis for online serving, Iceberg for offline training), all coordinated by the same checkpoints.

The modern stream-to-feature pipeline runs in Flink. The event source is Kafka. The sinks are the online store (Redis) and the offline store (Iceberg or Delta Lake). The schema registry (Confluent Schema Registry, AWS Glue) tracks event schemas and feature definitions. This is not four separate systems bolted together - it is a single Flink job that reads, transforms, and writes in a coordinated, fault-tolerant manner.


The Stream-to-Feature Pipeline Pattern


Event Time vs. Processing Time

This distinction is the most important concept in stream-to-feature pipelines. Get it wrong and your features are subtly incorrect in ways that are hard to detect.

Processing time is when the event arrives at Flink. If Kafka has a 5-minute backlog, events generated at 2:00 PM arrive at Flink at 2:05 PM. A processing-time window covering 2:00 PM to 2:05 PM would then hold events that actually happened around 1:55 PM to 2:00 PM, and as the backlog grows or shrinks, events from different real-world periods end up mixed in the same window.

Event time is when the event actually occurred, recorded in the event payload (e.g., click_timestamp). A 30-minute event-time window includes exactly the events that occurred within that 30-minute real-world window, regardless of when they arrived at Flink.

For feature engineering, event time is almost always correct. The feature items_clicked_30m means "items clicked in the 30 real-world minutes preceding this transaction" - not "items that arrived at Flink in the 30 minutes before this message was processed."
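The difference can be seen in a small simulation. This is a plain-Python sketch, not Flink code: the 5-minute tumbling window, the `window_start` helper, and the sample clicks are all assumptions chosen for illustration. Every click is delayed by a 5-minute backlog, and the same clicks are bucketed once by event time and once by arrival time.

```python
from collections import Counter

WINDOW_S = 300  # 5-minute tumbling windows, for illustration only


def window_start(ts: float, size: int = WINDOW_S) -> int:
    """Start of the tumbling window that contains timestamp ts (seconds)."""
    return int(ts // size) * size


# (event_time, arrival_time) pairs in seconds. Every click is delayed by a
# 5-minute backlog, so arrival_time = event_time + 300.
clicks = [(t, t + 300) for t in (0, 60, 120, 180, 240)]

by_event_time = Counter(window_start(ev) for ev, _ in clicks)
by_processing_time = Counter(window_start(arr) for _, arr in clicks)

print(by_event_time)       # all five clicks fall in the window starting at 0
print(by_processing_time)  # the same clicks fall in the window starting at 300
```

Bucketing by arrival time shifts the whole window by the size of the backlog, which is exactly the drift the feature definition is supposed to avoid.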

The cost of event time: you must handle late arrivals. Events that were generated at 2:00 PM but arrive at 2:07 PM are 7 minutes late. The watermark mechanism handles this: you configure a maximum out-of-orderness (e.g., 2 minutes), and Flink delays window evaluation until the watermark (max event time seen minus that bound) passes the window boundary. Events that arrive after their window has already fired are dropped or routed to a side output.


Watermarks and Late Arrivals

The watermark is Flink's estimate of "what is the latest event time for which all earlier events have arrived." It is computed from the stream itself.

A simple watermark strategy: subtract a fixed delay from the maximum event timestamp seen so far.

$$\text{watermark}(t) = \max(\text{event\_time seen up to } t) - \Delta$$

where $\Delta$ is the allowed out-of-orderness. A $\Delta$ of 2 minutes means: "I will wait up to 2 minutes for late events. Any event more than 2 minutes late is discarded (or sent to the late data side output)."
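The formula can be traced by hand with a few events. The class below is a plain-Python sketch of the bounded out-of-orderness idea, not Flink's implementation; the class name, the timestamps, and the 120-second delta are assumptions for illustration. It reports whether each event is already behind the watermark when it arrives. In Flink, such an event is only dropped if the window it belongs to has already fired.

```python
class BoundedOutOfOrdernessWatermark:
    """Tracks the max event time seen and derives watermark = max - delta."""

    def __init__(self, delta_s: float):
        self.delta_s = delta_s
        self.max_event_time = float("-inf")

    def watermark(self) -> float:
        return self.max_event_time - self.delta_s

    def observe(self, event_time: float) -> bool:
        """Return True if the event is at or behind the current watermark."""
        behind = event_time <= self.watermark()
        self.max_event_time = max(self.max_event_time, event_time)
        return behind


wm = BoundedOutOfOrdernessWatermark(delta_s=120)
arrivals = [1000, 1060, 900, 1100, 970]  # event times, in arrival order
flags = [wm.observe(t) for t in arrivals]

print(flags)           # which events arrived behind the watermark
print(wm.watermark())  # max event time (1100) minus delta (120)
```

The events at 900 and 970 arrive more than 120 seconds behind the newest event seen so far, so they are the ones flagged.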

The $\Delta$ trade-off:

  • Larger $\Delta$: more complete windows (fewer late events discarded), but higher latency before a window fires
  • Smaller $\Delta$: lower latency, but more late events dropped

For user session features (30-minute window), a $\Delta$ of 30 seconds is typically sufficient - click events are generated by browsers and mobile apps and rarely delayed more than a few seconds. For IoT sensor data with unreliable network connectivity, a $\Delta$ of 5–10 minutes may be necessary.
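To make the trade-off concrete, the sketch below replays one small, made-up stream under three values of delta and counts how many events would miss their window. It is a simplified plain-Python model under stated assumptions: 60-second tumbling windows, a window treated as closed once the watermark reaches its end, and hypothetical `(event_time, arrival_time)` pairs.

```python
def dropped_count(events, delta_s, window_s=60):
    """Count events whose tumbling window has already closed on arrival."""
    max_event_time = float("-inf")
    dropped = 0
    # Process events in the order they arrive, not the order they occurred.
    for event_time, _arrival in sorted(events, key=lambda e: e[1]):
        window_end = (event_time // window_s + 1) * window_s
        if max_event_time - delta_s >= window_end:
            dropped += 1
        max_event_time = max(max_event_time, event_time)
    return dropped


# (event_time, arrival_time) in seconds; two events arrive well out of order.
events = [(10, 11), (50, 52), (70, 71), (55, 75), (100, 101), (130, 131), (58, 140)]

results = {delta: dropped_count(events, delta) for delta in (5, 30, 90)}
print(results)
```

A larger delta keeps both stragglers, but the window ending at 60 cannot fire until an event with a timestamp of at least 60 plus delta has been seen, which is the latency side of the trade-off.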


This PyFlink job reads click events from Kafka, computes session features (items clicked in the last 30 minutes per user), and writes to Redis (online) and Iceberg (offline).

# pipeline/session_features_job.py
"""
PyFlink job: compute user session features from click events.
Reads from Kafka, writes to Redis (online) and Iceberg (offline).
"""

import json
import time
import redis
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
KafkaSource,
KafkaOffsetsInitializer,
)
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import (
WatermarkStrategy,
TimestampAssigner,
)
from pyflink.datastream.functions import (
MapFunction,
ProcessWindowFunction,
RichMapFunction,
)
from pyflink.datastream.window import SlidingEventTimeWindows, Time
from pyflink.common import Types, Duration
from pyflink.datastream.state import ValueStateDescriptor, MapStateDescriptor


class ClickEvent:
def __init__(self, user_id: str, item_id: str, category: str, timestamp: float):
self.user_id = user_id
self.item_id = item_id
self.category = category
self.timestamp = timestamp

@classmethod
def from_json(cls, json_str: str) -> "ClickEvent":
d = json.loads(json_str)
return cls(
user_id=d["user_id"],
item_id=d["item_id"],
category=d.get("category", "unknown"),
timestamp=d["timestamp"], # Unix epoch float
)


class ParseClickEvent(MapFunction):
"""Parse raw JSON click events. Route malformed events to side output."""

def map(self, raw: str):
try:
return ClickEvent.from_json(raw)
except (json.JSONDecodeError, KeyError) as e:
# In production, route to DLQ side output
# For simplicity here, return None (filtered downstream)
return None


class ClickEventTimestampAssigner(TimestampAssigner):
def extract_timestamp(self, element, record_timestamp: int) -> int:
# Convert float seconds to milliseconds for Flink's event time API
return int(element.timestamp * 1000)


class SessionFeatureWindowFunction(ProcessWindowFunction):
"""
Computes session features from a 30-minute sliding window of click events.
Called once per window per user when the watermark passes the window end.
"""

def process(self, key: str, context, elements):
clicks = list(elements)
if not clicks:
return

item_ids = [c.item_id for c in clicks]
categories = [c.category for c in clicks]

feature_vector = {
"user_id": key,
"items_clicked_30m": item_ids,
"item_count_30m": len(item_ids),
"distinct_items_30m": len(set(item_ids)),
"distinct_categories_30m": len(set(categories)),
"top_category_30m": max(set(categories), key=categories.count),
"window_end": context.window().end / 1000, # back to seconds
"computed_at": time.time(),
}

yield feature_vector


class RedisWriter(RichMapFunction):
"""Write feature vectors to Redis online store."""

def __init__(self, redis_host: str, redis_port: int):
self.redis_host = redis_host
self.redis_port = redis_port
self.redis_client = None

def open(self, runtime_context):
self.redis_client = redis.Redis(
host=self.redis_host,
port=self.redis_port,
socket_timeout=0.1,
decode_responses=True,
)

def map(self, feature_vector: dict):
user_id = feature_vector["user_id"]
key = f"user:{user_id}:session_features"

# Serialize list features as JSON strings
redis_features = {
"items_clicked_30m": json.dumps(feature_vector["items_clicked_30m"]),
"item_count_30m": str(feature_vector["item_count_30m"]),
"distinct_items_30m": str(feature_vector["distinct_items_30m"]),
"distinct_categories_30m": str(feature_vector["distinct_categories_30m"]),
"top_category_30m": feature_vector["top_category_30m"],
"computed_at": str(feature_vector["computed_at"]),
}

pipe = self.redis_client.pipeline()
pipe.hset(key, mapping=redis_features)
pipe.expire(key, 7200) # 2-hour TTL for session features
pipe.execute()

return feature_vector # Pass through for Iceberg write


def build_pipeline():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(8)  # Match to Kafka partition count
    env.enable_checkpointing(60000)  # Checkpoint every 60 seconds

    # Configure RocksDB state backend for large state
    from pyflink.datastream.state_backend import EmbeddedRocksDBStateBackend
    env.set_state_backend(EmbeddedRocksDBStateBackend())

    # Kafka source
    kafka_source = (
        KafkaSource.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_topics("click-events")
        .set_group_id("session-feature-pipeline")
        .set_starting_offsets(KafkaOffsetsInitializer.latest())
        .set_value_only_deserializer(SimpleStringSchema())
        .build()
    )

    # Watermark strategy: allow up to 30s of out-of-orderness
    watermark_strategy = (
        WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(30))
        .with_timestamp_assigner(ClickEventTimestampAssigner())
    )

    stream = (
        # The source emits raw JSON strings, which have no .timestamp attribute,
        # so timestamps and watermarks are assigned after parsing.
        env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "click-events")
        .map(ParseClickEvent(), output_type=Types.PICKLED_BYTE_ARRAY())
        .filter(lambda x: x is not None)
        .assign_timestamps_and_watermarks(watermark_strategy)
        .key_by(lambda event: event.user_id)
        .window(SlidingEventTimeWindows.of(
            Time.minutes(30),  # window size
            Time.minutes(1),   # slide interval - new feature every minute
        ))
        .process(SessionFeatureWindowFunction())
        .map(RedisWriter("redis", 6379))
        # TODO: add Iceberg sink for offline store
    )

    env.execute("session-feature-pipeline")


if __name__ == "__main__":
    build_pipeline()
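One consequence of a 30-minute window sliding every minute is that each click belongs to 30 overlapping windows. The helper below is a plain-Python sketch of how a sliding-window assigner maps a timestamp to its windows; the function name and the sample timestamp are assumptions, and window offsets are ignored.

```python
def assign_sliding_windows(ts_ms: int, size_ms: int, slide_ms: int):
    """Return (start, end) for every sliding window [start, end) containing ts_ms."""
    last_start = ts_ms - (ts_ms % slide_ms)  # latest window that starts at or before ts
    return [
        (start, start + size_ms)
        for start in range(last_start, ts_ms - size_ms, -slide_ms)
    ]


SIZE_MS = 30 * 60 * 1000   # 30-minute window
SLIDE_MS = 60 * 1000       # 1-minute slide

windows = assign_sliding_windows(5_430_000, SIZE_MS, SLIDE_MS)
print(len(windows))   # number of windows this one click is added to
print(windows[0])     # most recent window containing the click
print(windows[-1])    # oldest window containing the click
```

At 50,000 events per second this means roughly 1.5 million window insertions per second, which is why buffering raw events per window (as a `ProcessWindowFunction` does) becomes expensive and incremental aggregation is worth considering.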

Sliding Window Aggregations

Flink's window API supports four aggregation patterns relevant to feature engineering:

Count - how many events in the window. Exact, O(1) per event.

Sum - total amount in the window. Exact, O(1) per event with an AggregateFunction that maintains a running sum.

Distinct count - how many unique items in the window. Exact distinct count requires O(N) state (a set). For large N, use HyperLogLog for an approximate distinct count, with roughly 2–3% standard error in about 1–2 KB of state.

Top-N items - the N most frequent items in the window. Use a CountMinSketch for approximate frequency counting at bounded memory.
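For the approximate frequency counting mentioned above, a Count-Min Sketch can be written in a few lines. This is a minimal standalone sketch, not a Flink API: the class, its `width` and `depth` defaults, and the sample clicks are assumptions for illustration. Estimates never undercount; they can overcount when items collide in every row.

```python
import hashlib


class CountMinSketch:
    """Fixed-size frequency table: depth rows of width counters each."""

    def __init__(self, width: int = 256, depth: int = 4):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _index(self, item: str, row: int) -> int:
        # One independent hash per row, derived by salting with the row number.
        digest = hashlib.blake2b(
            item.encode("utf-8"), digest_size=8, salt=row.to_bytes(8, "little")
        ).digest()
        return int.from_bytes(digest, "little") % self.width

    def add(self, item: str, count: int = 1) -> None:
        for row in range(self.depth):
            self.table[row][self._index(item, row)] += count

    def estimate(self, item: str) -> int:
        # The smallest counter is the one least inflated by collisions.
        return min(self.table[row][self._index(item, row)] for row in range(self.depth))


clicks = ["shoes"] * 40 + ["socks"] * 7 + ["hats"] * 3
sketch = CountMinSketch()
for item in clicks:
    sketch.add(item)

print(sketch.estimate("shoes"), sketch.estimate("socks"), sketch.estimate("hats"))
```

Memory stays at `width * depth` counters regardless of how many distinct items pass through; a Top-N feature would pair the sketch with a small heap of current candidates.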

from pyflink.datastream.functions import AggregateFunction


class VelocityAccumulator:
    """Running accumulator for velocity features within a window."""

    def __init__(self):
        self.count = 0
        self.total_amount = 0.0
        self.merchants = set()  # For distinct count - fine for small windows


class VelocityAggregateFunction(AggregateFunction):
    def create_accumulator(self):
        return VelocityAccumulator()

    def add(self, value, accumulator):
        accumulator.count += 1
        accumulator.total_amount += value.amount
        accumulator.merchants.add(value.merchant_id)
        return accumulator

    def get_result(self, accumulator):
        return {
            "tx_count": accumulator.count,
            "tx_sum": accumulator.total_amount,
            "distinct_merchants": len(accumulator.merchants),
        }

    def merge(self, acc_a, acc_b):
        acc_a.count += acc_b.count
        acc_a.total_amount += acc_b.total_amount
        acc_a.merchants |= acc_b.merchants
        return acc_a
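The `merge` method is used when windows are merged (session windows, for example), so it has to give the same answer as feeding every event into a single accumulator. The sketch below checks that property without Flink; the `Acc` dataclass and the sample transactions are stand-ins assumed for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Acc:
    count: int = 0
    total: float = 0.0
    merchants: set = field(default_factory=set)


def add(acc: Acc, amount: float, merchant_id: str) -> Acc:
    acc.count += 1
    acc.total += amount
    acc.merchants.add(merchant_id)
    return acc


def merge(a: Acc, b: Acc) -> Acc:
    return Acc(a.count + b.count, a.total + b.total, a.merchants | b.merchants)


transactions = [(10.0, "m1"), (20.0, "m2"), (5.5, "m1"), (4.5, "m3")]

# One accumulator over all events
single = Acc()
for amount, merchant in transactions:
    add(single, amount, merchant)

# Two partial accumulators, merged afterwards
left, right = Acc(), Acc()
for amount, merchant in transactions[:2]:
    add(left, amount, merchant)
for amount, merchant in transactions[2:]:
    add(right, amount, merchant)
merged = merge(left, right)

print(single == merged)
```

State per window stays at one small accumulator instead of a buffered list of events, which is the reason to prefer this pattern for counts and sums.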

State Management at Scale

Flink state for a billion-user feature pipeline cannot fit in JVM heap. Use the RocksDB state backend, which stores state on local disk (with an in-memory cache), so state size is bounded by available disk rather than heap and a job can hold multiple terabytes of state in total.

Critical configuration for production:

# flink-conf.yaml equivalent via Python API
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig
from pyflink.datastream.state_backend import EmbeddedRocksDBStateBackend

# RocksDB state backend. Whether RocksDB uses Flink's managed memory is
# controlled by the state.backend.rocksdb.memory.managed configuration key,
# not by a constructor argument.
env.set_state_backend(EmbeddedRocksDBStateBackend())

# State TTL - automatically expire user state after 25 hours
ttl_config = (
    StateTtlConfig.new_builder(Time.hours(25))
    .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build()
)
# Attach it to a state descriptor: descriptor.enable_time_to_live(ttl_config)

State TTL is critical: without it, state accumulates indefinitely. A user who made 1,000 transactions over 3 years has 3 years of transaction state. With TTL set to 25 hours (the max window size + buffer), only the last 25 hours of events are kept.
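The two TTL settings used above can be illustrated with a small model. This is a plain-Python sketch of the semantics only, not Flink's state implementation; the `TtlMapState` class and the explicit `now` arguments are assumptions for illustration. Writes reset the timer (OnCreateAndWrite), reads do not, and an expired entry is never returned (NeverReturnExpired).

```python
class TtlMapState:
    """Toy keyed state where each entry expires ttl_s seconds after its last write."""

    def __init__(self, ttl_s: float):
        self.ttl_s = ttl_s
        self._data = {}  # key -> (value, last_write_time)

    def put(self, key, value, now: float) -> None:
        # OnCreateAndWrite: every write refreshes the expiry timer
        self._data[key] = (value, now)

    def get(self, key, now: float):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, written_at = entry
        if now - written_at >= self.ttl_s:
            # NeverReturnExpired: hide the value and clean it up
            del self._data[key]
            return None
        return value


HOUR = 3600
state = TtlMapState(ttl_s=25 * HOUR)
state.put("user:42", ["item_a"], now=0)

fresh = state.get("user:42", now=24 * HOUR)     # still within 25 hours
expired = state.get("user:42", now=25 * HOUR)   # reads did not extend the TTL

print(fresh, expired)
```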


Dual-Write: Online and Offline Simultaneously

The stream-to-feature pipeline must write to both the online store (Redis) and the offline store (Iceberg) in the same Flink job. This ensures both stores are computed from the same events by the same logic - eliminating a source of training-serving skew.

The naive approach - two separate jobs, one writing to Redis and one to Iceberg - introduces subtle inconsistencies. Events may arrive at different times in each job. Schema changes must be applied to both jobs simultaneously. Any bug in the feature logic must be fixed in both places.

The correct approach: a single Flink job with two sinks.

# After computing features once, fan the same stream out to two sinks
computed_features = stream.process(SessionFeatureWindowFunction())

# Sink 1: Redis (online serving)
computed_features.map(RedisWriter("redis", 6379))

# Sink 2: Iceberg (offline training)
# IcebergWriter is a placeholder that is not defined in this lesson;
# in practice, use the Flink Iceberg connector
computed_features.map(IcebergWriter(
    catalog="my_catalog",
    database="feature_store",
    table="session_features",
))
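The property that matters here is that each feature row is computed once and the identical row reaches both stores. The sketch below models that with in-memory stand-ins; the `compute_features` and `dual_write` helpers, the dict standing in for Redis, and the list standing in for Iceberg are all assumptions for illustration.

```python
def compute_features(user_id: str, clicks: list, window_end: int) -> dict:
    """Single source of truth for the feature logic."""
    return {
        "user_id": user_id,
        "item_count_30m": len(clicks),
        "distinct_items_30m": len(set(clicks)),
        "window_end": window_end,
    }


online_store = {}   # stand-in for Redis: latest row per user
offline_rows = []   # stand-in for Iceberg: append-only history


def dual_write(row: dict) -> None:
    online_store[row["user_id"]] = row
    offline_rows.append(row)


# Two consecutive window results for the same user
for window_end, clicks in [(60, ["a", "b"]), (120, ["a", "b", "b", "c"])]:
    dual_write(compute_features("u1", clicks, window_end))

latest_offline = max(
    (row for row in offline_rows if row["user_id"] == "u1"),
    key=lambda row: row["window_end"],
)
print(online_store["u1"] == latest_offline)
```

The online store keeps only the newest value while the offline store keeps the full history, but both are derived from the same function call, so there is no second copy of the logic to drift.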

:::note Iceberg Connector The production Iceberg connector for Flink is provided by the iceberg-flink-runtime JAR from the Apache Iceberg project. It supports Flink's exactly-once semantics via Flink checkpoints, ACID writes, and automatic partition management. See iceberg.apache.org for the full connector configuration. :::


Schema Evolution with Confluent Schema Registry

As the business adds new event fields (e.g., adding device_type to click events), the pipeline must handle both old and new event schemas without downtime.

Confluent Schema Registry enforces compatibility rules:

  • BACKWARD: new schema can read old data (new fields must have defaults)
  • FORWARD: old schema can read new data (old consumers ignore new fields)
  • FULL: both backward and forward compatible

from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

schema_registry_conf = {"url": "http://schema-registry:8081"}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Schema v1 - original
click_event_schema_v1 = """
{
  "type": "record",
  "name": "ClickEvent",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "item_id", "type": "string"},
    {"name": "timestamp", "type": "double"}
  ]
}
"""

# Schema v2 - added device_type with default (BACKWARD compatible)
click_event_schema_v2 = """
{
  "type": "record",
  "name": "ClickEvent",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "item_id", "type": "string"},
    {"name": "timestamp", "type": "double"},
    {"name": "device_type", "type": "string", "default": "unknown"}
  ]
}
"""

# Register v2 - Schema Registry validates BACKWARD compatibility.
# register_schema expects a Schema object, not a raw string.
schema_registry_client.register_schema(
    "click-events-value",
    Schema(click_event_schema_v2, schema_type="AVRO"),
)

Without a schema registry, schema changes cause runtime failures: the consumer expects field X, the producer stopped sending it, the pipeline throws a deserialization exception. With a schema registry, all changes are validated before being applied. An added field with a default lets the upgraded pipeline keep reading old events (BACKWARD compatibility), and old consumers can keep reading new events by ignoring the extra field (FORWARD compatibility) while the pipeline is upgraded.
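The core of the BACKWARD rule for added fields can be checked in a few lines. This is a simplified stand-in for Avro schema resolution, not the Schema Registry's actual compatibility checker: it ignores type promotion, unions, and removed fields, and the helper names are assumptions for illustration.

```python
def added_fields_have_defaults(old_schema: dict, new_schema: dict) -> bool:
    """True if every field added in new_schema declares a default."""
    old_names = {f["name"] for f in old_schema["fields"]}
    return all(
        "default" in f for f in new_schema["fields"] if f["name"] not in old_names
    )


def read_with_schema(record: dict, reader_schema: dict) -> dict:
    """Project a record onto the reader's schema, filling defaults for missing fields."""
    out = {}
    for f in reader_schema["fields"]:
        name = f["name"]
        if name in record:
            out[name] = record[name]
        elif "default" in f:
            out[name] = f["default"]
        else:
            raise KeyError(f"missing field without default: {name}")
    return out


v1 = {"fields": [{"name": "user_id"}, {"name": "item_id"}, {"name": "timestamp"}]}
v2 = {"fields": v1["fields"] + [{"name": "device_type", "default": "unknown"}]}
v2_bad = {"fields": v1["fields"] + [{"name": "device_type"}]}

old_event = {"user_id": "u1", "item_id": "i9", "timestamp": 1700000000.0}
new_event = dict(old_event, device_type="ios")

print(added_fields_have_defaults(v1, v2))      # v2 can be registered
print(added_fields_have_defaults(v1, v2_bad))  # v2_bad would be rejected
print(read_with_schema(old_event, v2))         # new reader, old data: default is filled
print(read_with_schema(new_event, v1))         # old reader, new data: extra field ignored
```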


Dead Letter Queue

Malformed events - missing required fields, incorrect types, timestamps in the future - must not block the pipeline. Route them to a dead letter queue (DLQ) Kafka topic for investigation.

import json
import time

from pyflink.common import Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import OutputTag
from pyflink.datastream.connectors.kafka import (
    KafkaRecordSerializationSchema,
    KafkaSink,
)
from pyflink.datastream.functions import ProcessFunction

# Define a side output for malformed events (emitted as JSON strings)
malformed_output_tag = OutputTag("malformed-events", Types.STRING())


class ParseWithDLQ(ProcessFunction):
    def process_element(self, raw: str, ctx):
        try:
            event = ClickEvent.from_json(raw)  # ClickEvent from the job above
            # Validate
            if event.timestamp > time.time() + 60:
                raise ValueError(f"Future timestamp: {event.timestamp}")
            if not event.user_id or not event.item_id:
                raise ValueError("Missing required fields")
            yield event
        except Exception as e:
            # Route to DLQ side output. PyFlink emits side outputs by
            # yielding a (tag, value) pair.
            yield malformed_output_tag, json.dumps(
                {"raw": raw, "error": str(e), "ts": time.time()}
            )


# In the main pipeline (env_stream is the raw string stream from Kafka)
parsed_stream = env_stream.process(ParseWithDLQ())
malformed_stream = parsed_stream.get_side_output(malformed_output_tag)

# Write DLQ events to a separate Kafka topic for investigation
malformed_stream.sink_to(
    KafkaSink.builder()
    .set_bootstrap_servers("kafka:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("click-events-dlq")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    .build()
)

The DLQ serves two purposes: it keeps the main pipeline running (bad events don't block good ones), and it provides an audit trail for investigation. Alert when DLQ volume exceeds a threshold - a spike in malformed events often indicates a producer bug or schema change that wasn't registered.
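One way to express the alerting rule is a rolling ratio of malformed to total events. The monitor below is a hypothetical plain-Python sketch; the class name, the 100-event window, and the 5% threshold are assumptions for illustration. In production the same signal would more likely come from metrics on the DLQ topic.

```python
from collections import deque


class DlqRateMonitor:
    """Alert when the share of malformed events in the last N events exceeds a threshold."""

    def __init__(self, window_size: int = 100, threshold: float = 0.05):
        self.threshold = threshold
        self.recent = deque(maxlen=window_size)  # True = malformed

    def record(self, is_malformed: bool) -> bool:
        """Record one event; return True if the alert condition holds."""
        self.recent.append(is_malformed)
        if len(self.recent) < self.recent.maxlen:
            return False  # not enough data yet to judge the rate
        ratio = sum(self.recent) / len(self.recent)
        return ratio > self.threshold


monitor = DlqRateMonitor(window_size=100, threshold=0.05)

# 95 good events followed by 5 malformed ones: exactly 5%, no alert yet
alerts = [monitor.record(False) for _ in range(95)]
alerts += [monitor.record(True) for _ in range(5)]
at_threshold = alerts[-1]

# One more malformed event pushes the rolling rate to 6%
over_threshold = monitor.record(True)

print(at_threshold, over_threshold)
```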


Production Notes

Match parallelism to Kafka partitions: each Kafka partition is read by exactly one source subtask, so a topic with 16 partitions is fully used at a source parallelism of 16. Higher parallelism leaves source subtasks idle; lower parallelism makes each subtask read several partitions, which caps throughput.
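The effect of a mismatch is easy to see by assigning partitions to subtasks. The modulo assignment below is a simplified stand-in, not Flink's actual split assignment, and the function name is an assumption for illustration.

```python
def assign_partitions(num_partitions: int, parallelism: int) -> dict:
    """Spread partitions over subtasks; each partition goes to exactly one subtask."""
    assignment = {subtask: [] for subtask in range(parallelism)}
    for partition in range(num_partitions):
        assignment[partition % parallelism].append(partition)
    return assignment


matched = assign_partitions(num_partitions=16, parallelism=16)
under = assign_partitions(num_partitions=16, parallelism=8)
over = assign_partitions(num_partitions=16, parallelism=24)

idle_subtasks = [s for s, parts in over.items() if not parts]

print(max(len(p) for p in matched.values()))  # partitions per subtask when matched
print(max(len(p) for p in under.values()))    # partitions per subtask at parallelism 8
print(len(idle_subtasks))                     # subtasks with nothing to read at parallelism 24
```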

Checkpoint interval determines recovery time: with 60-second checkpoints, a failed job restarts from a snapshot that is at most about 60 seconds old (plus the time the checkpoint took to complete) and replays events from that point. Shorter checkpoint intervals reduce recovery time at the cost of more checkpoint overhead. For feature pipelines, 30–60 seconds is typical.

Monitor consumer lag: the gap between the latest Kafka offset and the offset the Flink job has processed. A growing lag means the pipeline is falling behind the event rate. Alert and autoscale before the lag becomes hours-long.
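A quick calculation shows why lag should be caught early. The sketch below uses the 50,000 events per second rate and 60-second checkpoint interval from this lesson; the 60,000 events per second processing capacity is an assumed figure for illustration.

```python
def replay_events(event_rate: float, checkpoint_interval_s: float) -> float:
    """Upper bound on events replayed after recovering from the last checkpoint."""
    return event_rate * checkpoint_interval_s


def catch_up_seconds(lag_events: float, capacity: float, event_rate: float) -> float:
    """Time to drain a backlog while new events keep arriving."""
    headroom = capacity - event_rate
    if headroom <= 0:
        return float("inf")  # the job can never catch up
    return lag_events / headroom


EVENT_RATE = 50_000   # events/s, from the lesson
CAPACITY = 60_000     # events/s the job can process (assumed)

backlog = replay_events(EVENT_RATE, checkpoint_interval_s=60)
drain_time = catch_up_seconds(backlog, CAPACITY, EVENT_RATE)

print(backlog)     # events to replay after a failure
print(drain_time)  # seconds until lag returns to zero
```

With only 20% headroom, a 60-second replay takes five minutes to drain, and a job with no headroom never recovers, which is the case for alerting on lag growth rather than on an absolute lag value alone.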

Flink's exactly-once mode for the offline sink: configure CheckpointingMode.EXACTLY_ONCE and use the Flink Iceberg connector's two-phase commit sink. This prevents duplicate rows in the offline store during recovery from failures.


Interview Q&A

Q: What is the difference between event time and processing time in stream processing, and why does it matter for feature engineering?

Event time is when the event occurred in the real world (recorded in the event payload). Processing time is when the event is processed by the stream processor. For feature engineering, event time is almost always correct - "items clicked in the last 30 minutes" means 30 real-world minutes, not 30 minutes of processing time. When the pipeline falls behind (consumer lag), processing-time windows include events from different real-world time periods, making the feature semantically incorrect. Event time requires watermarks to handle late arrivals but produces features that are semantically accurate regardless of pipeline lag.

Q: How do you prevent duplicate feature writes when a Flink job fails and recovers from a checkpoint?

Flink's checkpoints keep internal state exactly-once, but a non-transactional sink only gets at-least-once delivery: after recovery, the job replays from the last checkpoint and may reprocess events, causing duplicate writes. For the Redis online store, duplicate writes are harmless because they are idempotent - writing the same feature value to the same key twice leaves the same result (HSET overwrites). For the Iceberg offline store, use the Flink Iceberg connector with CheckpointingMode.EXACTLY_ONCE, which implements two-phase commit: Flink commits data to Iceberg only after the checkpoint is confirmed, ensuring exactly-once semantics for the offline sink.
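The difference between the two sink types shows up as soon as records are replayed. This sketch uses a dict as a stand-in for a keyed store like Redis and a list as a stand-in for an append-only table without transactional commits; the helper names and sample records are assumptions for illustration.

```python
def upsert(store: dict, records) -> None:
    """Keyed overwrite: replaying a record leaves the store unchanged."""
    for key, value in records:
        store[key] = value


def append(log: list, records) -> None:
    """Append-only write: replaying a record adds a duplicate row."""
    log.extend(records)


records = [("user:1", 3), ("user:2", 5), ("user:1", 4)]
replayed = records + records[1:]  # recovery replays the last two records

clean_store, replay_store = {}, {}
upsert(clean_store, records)
upsert(replay_store, replayed)

clean_log, replay_log = [], []
append(clean_log, records)
append(replay_log, replayed)

print(clean_store == replay_store)      # keyed store is unaffected by the replay
print(len(clean_log), len(replay_log))  # append-only log gains duplicate rows
```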

Q: A stream-to-feature pipeline is falling behind (consumer lag is growing). What do you do?

First, diagnose: is the lag caused by a sudden event rate spike (temporary) or a sustained increase? Check Flink back-pressure metrics - if upstream operators are back-pressured, the bottleneck is downstream (Redis writes, Iceberg writes). If the source is the bottleneck, increase Kafka partitions and Flink parallelism. Second, scale the Flink job: increase the number of task managers and the parallelism. Flink rescales by restarting from a savepoint or checkpoint and redistributing keyed state across the new parallelism, which the RocksDB backend supports. Third, optimize the sinks: batch Redis writes (send HSET calls for many keys in one pipeline round trip), tune the Iceberg commit interval. Fourth, consider whether the pipeline needs to catch up (replay historical events) or just keep pace going forward - catching up requires provisioning extra capacity temporarily.

Q: How do you handle schema changes to Kafka events without downtime in a feature pipeline?

Use Confluent Schema Registry with a compatibility mode enforced (BACKWARD at minimum, FULL if producers may be upgraded before consumers): new fields must carry defaults, and fields are never removed without a versioning process. The upgrade proceeds as follows: (1) register the new schema version; (2) deploy updated producers to start sending events with the new field; (3) the existing consumer (Flink job) keeps reading new events and ignores the field it does not know about (forward compatibility) until the job is upgraded; (4) upgrade the Flink job to consume the new field; (5) old events in Kafka are still readable by the upgraded job because the default value fills the missing field (backward compatibility). No downtime required.

Q: Why is dual-write (Redis + Iceberg in the same Flink job) better than two separate pipelines?

With two separate pipelines, the same event is processed at different times and with independent progress in each pipeline, so the online store and offline store can diverge. A Kafka event at offset 1,234,567 may be processed by the Redis pipeline at time T and the Iceberg pipeline at time T+30s. If one pipeline fails or drops the event (for example, as late data) and the other does not, the stores stay out of sync. Any bug in the feature logic must be fixed in two places. Schema changes must be coordinated across two jobs. A single Flink job with two sinks reads each event once, computes the feature once, and sends the same result to both stores. Fault tolerance is provided by Flink's checkpoint mechanism, which covers both sinks: the Iceberg sink commits with the checkpoint, and replayed Redis writes are idempotent.

Q: What is a dead letter queue in a streaming pipeline and when should you use it?

A dead letter queue (DLQ) is a separate Kafka topic (or storage location) that receives events that could not be processed successfully - malformed JSON, missing required fields, schema violations, out-of-range values. Instead of failing the pipeline or silently dropping events, the DLQ captures them for investigation and potential reprocessing after fixing. Use a DLQ whenever: the pipeline receives events from external producers (you don't control the schema), the pipeline must be resilient to upstream bugs, or audit requirements demand that no data be silently discarded. Alert when DLQ volume spikes - it's often the first signal of an upstream schema change or producer bug.

© 2026 EngineersOfAI. All rights reserved.