:::tip 🎮 Interactive Playground
Visualize this concept: Try the Flink Stream Processing demo on the EngineersOfAI Playground - no code required.
:::
# Apache Flink Fundamentals
import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem';
Reading time: 40 minutes | Interview relevance: Very High | Target roles: Data Engineer, ML Engineer, Stream Processing Engineer
## The Production Scenario
The recommendation team at a large e-commerce platform - let's call it ShopCore - has a problem that cannot be solved with batch processing. Their model needs 50 real-time features per user per event: rolling 5-minute purchase totals, current session length, category affinity shifts over the last hour, price sensitivity indicators computed from the last 20 browsing events, and a real-time estimate of cart abandonment risk based on the sequence of interactions in the current session.
The first architecture they try is Spark Structured Streaming with 10-second micro-batches. On paper it works. In production, the 10-second minimum latency means the model is always scoring based on a user's state from 10 seconds ago. During a flash sale where user intent shifts rapidly - browsing → cart → purchase in 30 seconds - the recommendation system is perpetually behind. The model's category affinity features are stale by the time the serving layer receives them. Add-to-cart conversion drops 3.2% during the flash sale window.
The team rebuilds the feature pipeline on Apache Flink. True event-at-a-time processing means each user interaction updates all 50 features within 80ms of arrival. During the next flash sale, the recommendation model sees real-time user state. Add-to-cart conversion improves by 4.8% compared to the Spark baseline. More importantly, exactly-once state guarantees mean the rolling purchase totals are correct even after a TaskManager failure - no double-counted amounts, no missed events.
This lesson is a deep technical dive into Apache Flink: why it was built the way it was, how its runtime executes stateful computation at scale, and the specific APIs and configurations that matter for ML feature pipelines.
## Why Flink: True Streaming vs Micro-Batch
The fundamental difference between Flink and Spark Streaming is architectural. Spark's streaming model is micro-batch: it accumulates events into small batches (minimum ~100ms, typically seconds), then executes a batch computation on each one. The programming model is elegant, but the latency floor is bounded by the batch interval. You cannot process an event the instant it arrives.
Flink's model is event-at-a-time streaming. The runtime maintains a continuously running operator graph. Each event flows through the graph immediately upon arrival. There are no micro-batches. Latency is bounded mainly by processing time and network buffer flushing (controlled by the execution.buffer-timeout setting) - typically 10–80ms end-to-end, depending on the complexity of the operator graph.
For ML features that require sub-second freshness (fraud signals, real-time personalization, live bidding), the latency difference is decisive. For features where minutes of lag are acceptable (daily active users, weekly purchase history), either model works fine.
The second critical difference is state. Flink's state management is first-class - the framework manages state persistence, fault tolerance, and scaling. In Spark, you can maintain state across micro-batches using mapGroupsWithState or flatMapGroupsWithState, but it is bolted on. In Flink, stateful operators are the fundamental primitive.
## Historical Context
Apache Flink originated as the Stratosphere research project at TU Berlin in 2009, led by Professor Volker Markl. The research focused on parallel data processing systems that could handle both batch and stream data within a unified programming model. The core insight - that batch processing is a special case of stream processing (a bounded stream) - was radical at the time, when Hadoop's batch paradigm dominated.
In 2014, Stratosphere was donated to the Apache Software Foundation and renamed Flink - the German word for "agile" or "nimble." The key figures in the transition were Stephan Ewen and Kostas Tzoumas, who co-founded data Artisans (later renamed Ververica) as CTO and CEO respectively, with Ewen leading the technical design of the streaming engine, together with the broader TU Berlin group. Alibaba later became one of Flink's largest users and contributors, running it at a scale of trillions of events per day during its Double 11 (Singles' Day) shopping events.
The name choice - "flink" meaning agile - reflects the design philosophy: a processing system that reacts immediately to events, rather than waiting for batches to accumulate.
## Flink Runtime Architecture
JobManager - the coordinator. It translates the user's program into an execution graph (JobGraph → ExecutionGraph), schedules tasks on TaskManagers, coordinates checkpoints, and handles failures. There is typically one active JobManager, plus one or more standby JobManagers for high availability.
TaskManager - the workers. Each TaskManager is a JVM process with a fixed number of task slots. A task slot is a fixed slice of the TaskManager's resources: managed memory is divided evenly among slots, but CPU is not isolated between them. If you have 3 TaskManagers with 4 slots each, you have 12 total slots and can run a job with parallelism up to 12.
Task Slots - slots within the same TaskManager share the JVM (heap, GC, TCP connections). Slots on different TaskManagers run in separate processes and are isolated from each other. Without slot sharing, one slot hosts one subtask (one parallel instance of an operator). Flink's slot sharing (enabled by default) lets one slot host one subtask of each operator in the job, so a job needs only as many slots as its highest operator parallelism.
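The slot arithmetic can be made concrete with a toy Python model. This is not Flink's scheduler: it assumes every operator sits in the default slot sharing group, and the names Cluster, slots_needed and fits are illustrative only.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class Cluster:
    task_managers: int
    slots_per_tm: int

    @property
    def total_slots(self) -> int:
        return self.task_managers * self.slots_per_tm


def slots_needed(operator_parallelism: Dict[str, int], slot_sharing: bool = True) -> int:
    """With slot sharing, one slot can hold one subtask of every operator,
    so the job needs max(parallelism) slots. Without sharing, every subtask
    needs a slot of its own."""
    if slot_sharing:
        return max(operator_parallelism.values())
    return sum(operator_parallelism.values())


def fits(cluster: Cluster, operator_parallelism: Dict[str, int], slot_sharing: bool = True) -> bool:
    return slots_needed(operator_parallelism, slot_sharing) <= cluster.total_slots


cluster = Cluster(task_managers=3, slots_per_tm=4)
job = {"kafka_source": 12, "window_agg": 12, "kafka_sink": 12}
print(cluster.total_slots, slots_needed(job), slots_needed(job, slot_sharing=False))
```

With sharing, the 3 x 4 cluster fits a job in which every operator has parallelism 12. If every subtask needed its own slot, the same job would need 36 slots.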
### DataStream API vs Table API/SQL
Flink offers two primary APIs:
DataStream API - the low-level, procedural API. You operate on `DataStream<T>` objects, applying transformations one at a time. Full control over state, timers, side outputs, and watermarks. Required when you need ProcessFunction-level access to arbitrary state and timers.
Table API / Flink SQL - the high-level declarative API. You define tables over streams and write SQL queries. Flink compiles the SQL into an optimized execution plan. Best for standard aggregations, joins, and projections. Much easier to write and maintain, but less flexible for complex stateful logic.
For ML feature pipelines:
- Use Flink SQL for standard window aggregations (rolling sums, counts, averages)
- Use DataStream + ProcessFunction for complex state machines, model hot-reload, and custom timer logic
## Core Flink Concepts
### DataStream and KeyedStream
The `DataStream<T>` is the fundamental abstraction - an unbounded sequence of events of type T. The `KeyedStream<T, K>` is a DataStream partitioned by a key: all events with the same key are guaranteed to be processed by the same operator instance. This is the foundation of stateful processing - keyed state is always scoped to the current key.
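keyBy routes each key in two steps: first to a key group, then from the key group to an operator instance. The sketch below models that mapping in plain Python. It assumes a max parallelism of 128. It uses zlib.crc32 as a stand-in for the murmur hash Flink actually applies to the key's hashCode(), so the concrete indices will not match a real job.

```python
import zlib
from typing import Dict, Iterable, List, Tuple

MAX_PARALLELISM = 128  # assumed; Flink's default max parallelism is at least 128


def key_group_for(key: str, max_parallelism: int = MAX_PARALLELISM) -> int:
    # Stand-in hash: deterministic across Python processes (unlike hash())
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def operator_index_for(key: str, parallelism: int, max_parallelism: int = MAX_PARALLELISM) -> int:
    # Each parallel instance owns a contiguous range of key groups
    return key_group_for(key, max_parallelism) * parallelism // max_parallelism


def route(events: Iterable[Tuple[str, float]], parallelism: int) -> Dict[int, List[Tuple[str, float]]]:
    """Group (key, value) events by the operator instance that would process them."""
    instances: Dict[int, List[Tuple[str, float]]] = {i: [] for i in range(parallelism)}
    for key, value in events:
        instances[operator_index_for(key, parallelism)].append((key, value))
    return instances


events = [("user_1", 10.0), ("user_2", 5.0), ("user_1", 7.5), ("user_3", 1.0), ("user_1", 2.5)]
routed = route(events, parallelism=4)
```

The key group depends only on the key and the max parallelism. Rescaling therefore changes which instance owns a key group, but it never splits one key's state across instances.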
### Transformations
| Transformation | Description |
|---|---|
| `map(f)` | One-to-one transformation. `f: T → R` |
| `flatMap(f)` | One-to-many. `f: T → Iterable[R]` |
| `filter(f)` | Keep events where `f` returns `True` |
| `keyBy(key_selector)` | Partition stream by key - returns `KeyedStream` |
| `window(assigner)` | Group `KeyedStream` events into windows |
| `process(ProcessFunction)` | Full control: state, timers, side outputs |
### ProcessFunction
The ProcessFunction is the lowest-level, most powerful Flink API. It gives you:
- Access to arbitrary managed state (ValueState, ListState, MapState)
- Access to timers (event-time and processing-time)
- The ability to emit to side outputs
- Access to the watermark
If you can write it as a windowed aggregation, use the window API. If you need timer-based logic, custom state machines, or late event routing, use ProcessFunction.
## State in Flink
Flink's state is the distinguishing feature of the framework. State is managed by the framework - not by the user in external databases. This means Flink handles persistence, fault tolerance, and scaling automatically.
### State Primitives
| State Type | Description | ML Use Case |
|---|---|---|
| `ValueState<T>` | Single value per key | Last seen event, running total, current session start time |
| `ListState<T>` | List of values per key | Last N events for sequence features |
| `MapState<K, V>` | Map per key | Category-level counts per user |
| `ReducingState<T>` | Aggregation with a ReduceFunction | Running sum, min, max |
| `AggregatingState<IN, OUT>` | Aggregation with different input/output types | Running average (amounts in, mean out) |
### State Backends
The state backend determines where state is stored.
HashMapStateBackend (which replaced the older MemoryStateBackend and FsStateBackend in Flink 1.13): stores all state as Java objects on the JVM heap. Fast access (no serialization on reads or writes). But: bounded by heap size, GC pressure with large state, not suitable for millions of keys with large values.
RocksDBStateBackend (EmbeddedRocksDBStateBackend in current APIs): stores all state in embedded RocksDB instances on the TaskManager's local disk. State is serialized/deserialized on every access. That makes each access noticeably slower than a heap lookup, and slower still when a read misses RocksDB's block cache and goes to disk. But: state size is bounded by local disk rather than JVM heap, so it can hold far more state and many millions of concurrent keys. It is the usual choice for large-scale ML feature computation.
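One practical consequence of serialization is easy to miss. The toy classes below are illustrative stand-ins, not Flink classes. One is a heap-style state that hands back the stored object. The other is a RocksDB-style state that stores bytes and returns a fresh copy on every read. Mutating the returned value without calling update() may appear to work on the heap, but the change is silently lost with serialized state.

```python
import pickle


class HeapValueState:
    """Toy heap backend: keeps the Python object itself (no serialization)."""

    def __init__(self):
        self._obj = None

    def value(self):
        return self._obj  # a reference to the stored object

    def update(self, obj):
        self._obj = obj


class SerializedValueState:
    """Toy RocksDB-style backend: stores bytes, (de)serializes on every access."""

    def __init__(self):
        self._bytes = None

    def value(self):
        return None if self._bytes is None else pickle.loads(self._bytes)  # fresh copy

    def update(self, obj):
        self._bytes = pickle.dumps(obj)


def append_without_update(state, item):
    """Anti-pattern: mutate the returned object but never write it back."""
    state.value().append(item)


def append_with_update(state, item):
    history = state.value() or []
    history.append(item)
    state.update(history)  # explicit write-back works with any backend
```

In real Flink jobs, always write changes back with update() (or the state type's add/put methods) rather than relying on object identity.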
### State TTL
State TTL allows Flink to automatically expire state entries that haven't been updated within a specified time window. Without TTL, state for users who haven't interacted with the system in months continues to consume memory/disk space indefinitely.
The following PyFlink pipeline reads purchase events from Kafka, keys them by user, and sums spend in 5-minute tumbling event-time windows:
```python
import json

from pyflink.common import Duration, Types, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaOffsetsInitializer,
    KafkaRecordSerializationSchema,
    KafkaSink,
    KafkaSource,
)
from pyflink.datastream.window import TumblingEventTimeWindows

# --- PyFlink DataStream: Kafka → keyBy → tumbling 5-min window → feature sum ---
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

# Add the Flink Kafka connector jar (required for PyFlink; path and version depend on your installation)
env.add_jars("file:///opt/flink/lib/flink-connector-kafka-1.17.0.jar")

# Source: Kafka topic with user purchase events (JSON strings)
kafka_source = (
    KafkaSource.builder()
    .set_bootstrap_servers("kafka:9092")
    .set_topics("raw.purchases")
    .set_group_id("flink-feature-pipeline")
    .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets())
    .set_value_only_deserializer(SimpleStringSchema())
    .build()
)

# Read raw strings; event-time timestamps and watermarks are assigned after parsing
ds = env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "Kafka Source")

EVENT_TYPE = Types.TUPLE([Types.STRING(), Types.DOUBLE(), Types.INT(), Types.LONG()])


def parse_event(raw: str) -> tuple:
    """Parse a JSON event into (user_id, amount_usd, category_id, event_time_ms)."""
    event = json.loads(raw)
    return (
        event["user_id"],
        float(event["amount_usd"]),
        int(event["category_id"]),
        int(event["event_time_ms"]),
    )


class EventTimeAssigner(TimestampAssigner):
    """Use event_time_ms (tuple index 3) as the event-time timestamp."""

    def extract_timestamp(self, value, record_timestamp: int) -> int:
        return value[3]


# Watermark strategy: allow 5 seconds of out-of-order events
watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(5))
    .with_timestamp_assigner(EventTimeAssigner())
)

parsed = (
    ds.map(parse_event, output_type=EVENT_TYPE)
    .assign_timestamps_and_watermarks(watermark_strategy)
)

# KeyBy user_id - all events for a user go to the same operator instance
keyed = parsed.key_by(lambda x: x[0], key_type=Types.STRING())

# Tumbling 5-minute window: total spend per user per window
result = (
    keyed
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .reduce(
        # Sum amounts; keep user_id and the first category_id; track the latest event time
        lambda a, b: (a[0], a[1] + b[1], a[2], max(a[3], b[3])),
        output_type=EVENT_TYPE,
    )
)

# Write the feature to Kafka as JSON strings
kafka_sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("kafka:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("features.user_5min_spend")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    .build()
)

result.map(
    lambda x: json.dumps({
        "user_id": x[0],
        "spend_5min_usd": x[1],
        "category_id": x[2],
        # Latest event timestamp seen in the window (not the window end)
        "last_event_ts": x[3],
    }),
    output_type=Types.STRING(),  # SimpleStringSchema needs a STRING-typed stream
).sink_to(kafka_sink)

env.execute("user-5min-spend-feature")
```
### ProcessFunction with ValueState: Stateful Session Tracker
The ProcessFunction gives you timer access and full state control. This example tracks user sessions - essential for session-length and session-event-count features.
```python
import json

from pyflink.common import Types
from pyflink.common.time import Time
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor

SESSION_TIMEOUT_MS = 30 * 60 * 1000  # 30 minutes


class UserSessionTracker(KeyedProcessFunction):
    """
    Tracks user sessions: emits a session feature record (JSON string) when a
    session ends, either because an event arrives after a >30-minute gap or
    because the 30-minute inactivity timer fires.
    State:
    - session_start_ts: timestamp of first event in current session
    - event_count: number of events in current session
    - last_event_ts: timestamp of most recent event (for gap detection)
    - timer_ts: the currently registered inactivity timer (so it can be replaced)
    """

    def open(self, runtime_context):
        # State TTL: expire all session state after 24 hours without writes
        ttl_config = (
            StateTtlConfig
            .new_builder(Time.hours(24))
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )

        def ttl_state(name, type_info):
            desc = ValueStateDescriptor(name, type_info)
            desc.enable_time_to_live(ttl_config)
            return runtime_context.get_state(desc)

        self.session_start_ts = ttl_state("session_start_ts", Types.LONG())
        self.event_count = ttl_state("event_count", Types.INT())
        self.last_event_ts = ttl_state("last_event_ts", Types.LONG())
        self.timer_ts = ttl_state("timer_ts", Types.LONG())

    @staticmethod
    def _session_record(user_id, start_ts, end_ts, event_count, closed_by):
        return json.dumps({
            "user_id": user_id,
            "session_start_ts": start_ts,
            "session_end_ts": end_ts,
            "session_duration_s": (end_ts - start_ts) / 1000.0,
            "session_event_count": event_count,
            "session_closed_by": closed_by,
        })

    def process_element(self, event, ctx: KeyedProcessFunction.Context):
        # PyFlink process functions emit results with `yield` (there is no Collector argument)
        user_id, _, _, event_ts = event
        current_start = self.session_start_ts.value()
        current_count = self.event_count.value() or 0
        last_ts = self.last_event_ts.value()

        if current_start is None:
            # New session
            self.session_start_ts.update(event_ts)
            self.event_count.update(1)
        elif last_ts is not None and (event_ts - last_ts) > SESSION_TIMEOUT_MS:
            # Gap longer than the timeout: emit the completed session, then start a new one
            yield self._session_record(user_id, current_start, last_ts, current_count, "gap")
            self.session_start_ts.update(event_ts)
            self.event_count.update(1)
        else:
            self.event_count.update(current_count + 1)
        self.last_event_ts.update(event_ts)

        # Replace the previous processing-time timer so only the latest one can fire;
        # otherwise the timer from the session's first event would close an active session
        timer_service = ctx.timer_service()
        old_timer = self.timer_ts.value()
        if old_timer is not None:
            timer_service.delete_processing_time_timer(old_timer)
        new_timer = timer_service.current_processing_time() + SESSION_TIMEOUT_MS
        timer_service.register_processing_time_timer(new_timer)
        self.timer_ts.update(new_timer)

    def on_timer(self, timestamp: int, ctx: KeyedProcessFunction.OnTimerContext):
        """Called when the 30-minute inactivity timer fires."""
        current_start = self.session_start_ts.value()
        last_ts = self.last_event_ts.value()
        if current_start is not None and last_ts is not None:
            yield self._session_record(
                ctx.get_current_key(), current_start, last_ts,
                self.event_count.value() or 0, "timeout",
            )
        # Clear state - session is over
        for state in (self.session_start_ts, self.event_count, self.last_event_ts, self.timer_ts):
            state.clear()


# Usage, with `parsed` being the (user_id, amount, category_id, event_time_ms) stream above:
# sessions = parsed.key_by(lambda x: x[0]).process(UserSessionTracker(), output_type=Types.STRING())
```
### RocksDBStateBackend for Large State
For ML feature pipelines with millions of concurrent users, the HashMapStateBackend (heap-based) cannot hold enough state without running out of JVM heap. RocksDBStateBackend stores state on local disk. Flink disables RocksDB's own write-ahead log, because durability comes from checkpoints instead.
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.state_backend import EmbeddedRocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()

# Checkpoint every 60 seconds
env.enable_checkpointing(60_000)

# Switch to the RocksDB state backend.
# Incremental checkpoints upload only the RocksDB files created since the last
# checkpoint, which dramatically reduces checkpoint size for large state.
env.set_state_backend(EmbeddedRocksDBStateBackend(enable_incremental_checkpointing=True))

checkpoint_config = env.get_checkpoint_config()
# Checkpoint storage: where checkpoints land (S3 for production)
checkpoint_config.set_checkpoint_storage_dir(
    "s3://my-flink-checkpoints/credit-risk-feature-pipeline/"
)
# Tolerate 1 consecutive checkpoint failure before declaring job failure
checkpoint_config.set_tolerable_checkpoint_failure_number(1)
# Minimum pause between the end of one checkpoint and the start of the next
checkpoint_config.set_min_pause_between_checkpoints(30_000)
# Maximum concurrent checkpoints (1 = sequential, safer for RocksDB)
checkpoint_config.set_max_concurrent_checkpoints(1)
```
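For RocksDB with large state (tens of GB per TaskManager), tune the following options in flink-conf.yaml:
```yaml
taskmanager.memory.managed.fraction: 0.4  # 40% of total Flink memory as managed memory (used by RocksDB)
state.backend.rocksdb.block.cache-size: 256mb
state.backend.rocksdb.writebuffer.size: 64mb
state.backend.rocksdb.writebuffer.count: 2
```
The managed memory pool is used by RocksDB's block cache and write buffers. The default is state.backend.rocksdb.memory.managed: true, and in that mode Flink derives RocksDB's memory budget from managed memory. The managed fraction is then the main knob, and the per-column-family sizes above matter mostly when that option is disabled. Increase managed memory if reads frequently miss the block cache and go to disk. RocksDB's native block cache metrics (off by default) can confirm this.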
### State TTL: Automatic Expiration
Without TTL, every user who ever touched your system accumulates state forever. With 10 million users and 1KB of state per user, that's 10GB of RocksDB state that grows indefinitely. State TTL allows Flink to automatically delete state entries that haven't been accessed or updated within a specified duration.
```python
from pyflink.common import Types
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


def configure_state_with_ttl(runtime_context, ttl_hours: int = 24):
    """
    Configure ValueState with TTL - state expires after `ttl_hours` without updates.
    Call from a function's open(); essential for ML feature pipelines with millions of users.
    """
    ttl_config = (
        StateTtlConfig
        .new_builder(Time.hours(ttl_hours))
        # Reset TTL on every write - ttl_hours from the last update, not from creation
        .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
        # Never return expired state (treat as if it doesn't exist)
        .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        # Drop expired entries during RocksDB compaction; refresh the timestamp
        # used for expiry checks every 1000 processed entries
        .cleanup_in_rocksdb_compact_filter(1000)
        .build()
    )
    desc = ValueStateDescriptor("user_purchase_history", Types.LIST(Types.STRING()))
    desc.enable_time_to_live(ttl_config)
    return runtime_context.get_state(desc)
```
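The TTL semantics configured above (OnCreateAndWrite, NeverReturnExpired) can be modeled in a few lines. This is a toy, single-process sketch with explicit timestamps, for illustration only. Flink tracks the last-write time of each entry internally and removes expired entries lazily, for example in the RocksDB compaction filter.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple


@dataclass
class TtlValueStore:
    """Toy per-key store mimicking OnCreateAndWrite + NeverReturnExpired."""

    ttl_ms: int
    _entries: Dict[str, Tuple[Any, int]] = field(default_factory=dict)  # key -> (value, last_write_ms)

    def update(self, key: str, value: Any, now_ms: int) -> None:
        # OnCreateAndWrite: only creates and writes refresh the expiration clock
        self._entries[key] = (value, now_ms)

    def value(self, key: str, now_ms: int) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, last_write_ms = entry
        if now_ms - last_write_ms >= self.ttl_ms:
            return None  # NeverReturnExpired: an expired entry reads as absent
        return value  # reads do not refresh the TTL

    def cleanup(self, now_ms: int) -> int:
        """Physically drop expired entries; returns how many were removed."""
        expired = [k for k, (_, ts) in self._entries.items() if now_ms - ts >= self.ttl_ms]
        for k in expired:
            del self._entries[k]
        return len(expired)


HOUR = 3_600_000
store = TtlValueStore(ttl_ms=24 * HOUR)
store.update("user_1", 42.0, now_ms=0)
store.update("user_2", 1.0, now_ms=10 * HOUR)
store.update("user_2", 2.0, now_ms=30 * HOUR)  # a write pushes user_2's expiry to 54h
```

Until cleanup runs, expired entries still occupy storage even though reads no longer return them. That is why the cleanup strategy matters for disk usage.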
## Windows in Flink
Windows are the mechanism for grouping events over time for aggregate computation. Flink's window model is more powerful than most streaming frameworks because it separates the window assigner (which events go into which window) from the window function (what to compute on the window contents).
### Window Types
Tumbling Event-Time Windows - non-overlapping, fixed-size windows. Every event belongs to exactly one window. The standard for per-interval features such as "purchases per 5-minute window."
Sliding Event-Time Windows - overlapping windows defined by a size and a slide. An event belongs to multiple windows simultaneously. Used for features like "purchases in the last hour, refreshed every 5 minutes" (size 1 hour, slide 5 minutes). They are more computationally expensive because each event is assigned to size / slide windows (12 in this example).
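Window assignment is simple arithmetic on timestamps. The sketch below assumes millisecond timestamps and windows aligned to the epoch (offset 0). window_start follows the start computation Flink uses for aligned windows. The helper names are illustrative.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # [start, end) in epoch milliseconds


def window_start(ts: int, size: int, offset: int = 0) -> int:
    # Start of the aligned window of length `size` that contains ts
    return ts - (ts - offset + size) % size


def tumbling_windows(ts: int, size: int) -> List[Window]:
    start = window_start(ts, size)
    return [(start, start + size)]


def sliding_windows(ts: int, size: int, slide: int) -> List[Window]:
    windows = []
    start = window_start(ts, slide)  # latest window start at or before ts
    while start > ts - size:         # every earlier start whose window still covers ts
        windows.append((start, start + size))
        start -= slide
    return windows


MIN = 60_000
ts = 17 * MIN + 30_000  # 00:17:30
hourly = sliding_windows(ts, size=60 * MIN, slide=5 * MIN)
```

For this event, the tumbling 5-minute assigner yields the single window [00:15, 00:20). The hourly sliding assigner yields 12 windows, the newest being [00:15, 01:15).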
Session Windows - variable-size windows that close after a gap of inactivity. Natural for user session features. No fixed size - the window ends when no events arrive for a specified gap duration.
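Session windows can be pictured this way: each event opens a window of length gap, and overlapping windows are merged. Below is a toy Python version of that merging. It assumes the timestamps all belong to one key, and in this sketch windows that merely touch are also merged.

```python
from typing import List, Tuple


def session_windows(timestamps: List[int], gap_ms: int) -> List[Tuple[int, int]]:
    """Give each event the window [ts, ts + gap) and merge overlapping or touching windows."""
    sessions: List[Tuple[int, int]] = []
    for ts in sorted(timestamps):
        start, end = ts, ts + gap_ms
        if sessions and start <= sessions[-1][1]:
            # Event falls inside (or at the edge of) the current session: extend it
            prev_start, prev_end = sessions[-1]
            sessions[-1] = (prev_start, max(prev_end, end))
        else:
            sessions.append((start, end))
    return sessions


MIN = 60_000
user_events = [0, 10 * MIN, 25 * MIN, 70 * MIN, 71 * MIN]
print(session_windows(user_events, gap_ms=30 * MIN))
```

Here the first three events form one session that closes 30 minutes after the last of them (at 55 minutes). The event at 70 minutes starts a new session.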
### Late Element Handling
In a distributed system, events arrive out of order. A transaction processed in Australia may be delayed by network issues and arrive at the Flink cluster 30 seconds after events from the same time window. Without handling, these late events are silently dropped when their window has already been computed.
```python
import json

from pyflink.common import Types
from pyflink.common.time import Time
from pyflink.datastream import OutputTag
from pyflink.datastream.window import TumblingEventTimeWindows

# Continues the pipeline above (`keyed`, `kafka_sink`); `late_events_sink` is a
# hypothetical second KafkaSink writing to an investigation topic.
EVENT_TYPE = Types.TUPLE([Types.STRING(), Types.DOUBLE(), Types.INT(), Types.LONG()])

# Side output tag for late events; its type must match the window's input elements
late_fraud_tag = OutputTag("late-fraud-signals", EVENT_TYPE)

windowed = (
    keyed
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    # Allow up to 30 seconds of lateness (PyFlink takes milliseconds) - late elements update the window result
    .allowed_lateness(30_000)
    # Route elements arriving after even the allowed lateness to the side output
    .side_output_late_data(late_fraud_tag)
    .reduce(lambda a, b: (a[0], a[1] + b[1], a[2], max(a[3], b[3])), output_type=EVENT_TYPE)
)

# Main stream: window results (including late updates) → feature topic
windowed.map(
    lambda x: json.dumps({"user_id": x[0], "spend_5min": x[1]}),
    output_type=Types.STRING(),
).sink_to(kafka_sink)

# Side output: very late events → investigation topic
windowed.get_side_output(late_fraud_tag).map(
    lambda x: json.dumps({"user_id": x[0], "amount_usd": x[1], "event_time_ms": x[3]}),
    output_type=Types.STRING(),
).sink_to(late_events_sink)
```
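The decision Flink makes for each element of the windowed stream above can be sketched as a pure function. This toy assumes epoch-aligned tumbling windows. It follows Flink's rule that a window fires once the watermark reaches its last timestamp, and that an element is dropped (here: sent to the side output) once the watermark has passed that timestamp plus the allowed lateness.

```python
from enum import Enum


class Fate(Enum):
    ON_TIME = "included before the window first fires"
    LATE_UPDATE = "window already fired; re-fires with an updated result"
    SIDE_OUTPUT = "beyond allowed lateness; routed to the late-data side output"


def classify(event_ts: int, watermark: int, size_ms: int, allowed_lateness_ms: int) -> Fate:
    window_start = event_ts - event_ts % size_ms  # epoch-aligned tumbling window
    max_ts = window_start + size_ms - 1           # last millisecond inside the window
    if max_ts + allowed_lateness_ms <= watermark:
        return Fate.SIDE_OUTPUT
    if max_ts <= watermark:
        return Fate.LATE_UPDATE
    return Fate.ON_TIME


MIN = 60_000
# An event stamped 00:04:00, seen at three different watermark positions
print(classify(4 * MIN, 3 * MIN, 5 * MIN, 30_000))
print(classify(4 * MIN, 5 * MIN + 10_000, 5 * MIN, 30_000))
print(classify(4 * MIN, 6 * MIN, 5 * MIN, 30_000))
```

With 30 seconds of allowed lateness, an event for the [00:00, 00:05) window still updates the result while the watermark is before about 00:05:30. After that, it goes to the side output.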
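### Watermarks
Watermarks are the mechanism Flink uses to reason about event time in a distributed system. A watermark with timestamp T asserts: "no more events with a timestamp ≤ T are expected." When Flink's watermark advances past the end of a window, that window is closed and its result emitted.
For a bounded out-of-orderness strategy with bound δ, the key equation is:
$$
W = \max_{e\ \text{seen so far}} t_e - \delta
$$
Flink's built-in generator subtracts one more millisecond, so an event exactly δ behind the maximum still counts as on time.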
For WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), the watermark lags the maximum observed event timestamp by 5 seconds. This means Flink is prepared to wait up to 5 seconds for out-of-order events before closing a window.
Watermark too tight (0 seconds): any out-of-order event is late. High accuracy only for perfectly ordered streams. Causes excessive late event routing.
Watermark too loose (60 seconds): allows 60 seconds of disorder. Low late event rate, but every window result is delayed by 60 seconds. The tail latency of your feature pipeline is at least 60 seconds.
For ML feature pipelines, 5–15 seconds of allowed out-of-orderness is a common starting point. To tune it, monitor watermark progress, for example each operator's currentOutputWatermark metric compared with wall-clock time.
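A toy replay makes the tightness trade-off visible. It recomputes the watermark after every event as max seen minus the bound minus 1 ms. Flink instead emits watermarks periodically, every 200 ms by default. The event sequence is made up for illustration.

```python
from typing import Iterable, List, Tuple

LONG_MIN = -(2 ** 63)  # watermark before any event has been seen


def watermark_trace(arrivals: Iterable[int], bound_ms: int) -> List[Tuple[int, int, bool]]:
    """Replay event timestamps in arrival order; return (event_ts, watermark, is_late),
    where watermark is the value in effect when the event arrives."""
    max_seen = None
    trace = []
    for ts in arrivals:
        watermark = LONG_MIN if max_seen is None else max_seen - bound_ms - 1
        trace.append((ts, watermark, ts <= watermark))
        max_seen = ts if max_seen is None else max(max_seen, ts)
    return trace


def late_count(arrivals: Iterable[int], bound_ms: int) -> int:
    return sum(1 for _, _, late in watermark_trace(arrivals, bound_ms) if late)


# Event timestamps (ms) in the order they reach Flink: 3000 and 4000 arrive out of order
arrivals = [1000, 2000, 8000, 3000, 9000, 4000]
for bound in (0, 5000):
    print(bound, late_count(arrivals, bound))
```

With a 0 ms bound, both out-of-order events are late. A 5-second bound absorbs them, but every window now closes 5 seconds later.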
## Flink SQL on Streams
Flink SQL allows you to write standard SQL against a continuous stream. The SQL compiler translates GROUP BY with TUMBLE/HOP/SESSION window functions into Flink's streaming execution plan.
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Define Kafka source as a SQL table
t_env.execute_sql("""
    CREATE TABLE raw_transactions (
        transaction_id STRING,
        user_id STRING,
        amount_usd DOUBLE,
        category_id INT,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'raw.transactions',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'flink-sql-features',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    )
""")

# Define feature output table.
# The plain 'kafka' connector is append-only and rejects PRIMARY KEY constraints;
# window aggregations emit append-only results, so no key is needed here.
t_env.execute_sql("""
    CREATE TABLE features_user_spend (
        user_id STRING,
        window_start TIMESTAMP(3),
        window_end TIMESTAMP(3),
        total_spend_usd DOUBLE,
        tx_count BIGINT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'features.user_5min_spend',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Streaming SQL: 5-minute tumbling window aggregation
# This is compiled to Flink's native window operator - comparable performance to the DataStream API.
# execute_sql submits the INSERT job asynchronously; call .wait() on the result to block in a local script.
t_env.execute_sql("""
    INSERT INTO features_user_spend
    SELECT
        user_id,
        window_start,
        window_end,
        SUM(amount_usd) AS total_spend_usd,
        COUNT(*) AS tx_count
    FROM TABLE(
        TUMBLE(TABLE raw_transactions, DESCRIPTOR(event_time), INTERVAL '5' MINUTE)
    )
    GROUP BY user_id, window_start, window_end
""")
```
Flink SQL's TUMBLE, HOP, and SESSION window table functions are the streaming equivalents of SQL GROUP BY with time-based intervals. TUMBLE creates non-overlapping windows. HOP creates sliding windows. SESSION creates gap-based windows; it is available as a window table function only in recent Flink releases, and older releases use the legacy GROUP BY SESSION(...) syntax. All of them compile to Flink's native streaming window operators, so the SQL abstraction adds little or no runtime overhead.
## Checkpoints and Savepoints
Flink's fault tolerance is built on asynchronous barrier snapshotting, a variant of the Chandy-Lamport distributed snapshot algorithm. Periodically, the JobManager's checkpoint coordinator injects a special marker (a "checkpoint barrier") into every source stream. When an operator receives the barrier on one input, it holds back further records from that input (aligned checkpoints) while continuing to process its other inputs. Once the barrier has arrived on every input, the operator snapshots its state and forwards the barrier downstream. When all sinks have acknowledged the barrier, the checkpoint is complete.
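Barrier alignment can be simulated for a single summing operator with two inputs. This is a toy model that assumes one checkpoint, no network, and aligned (not unaligned) checkpoints. It shows that the snapshot contains exactly the records that arrived before the barrier on each input.

```python
from typing import Dict, List, Optional, Tuple

BARRIER = object()  # stand-in for the barrier of a single checkpoint


def aligned_checkpoint(channels: Dict[str, List[object]]) -> Tuple[Optional[int], int]:
    """Round-robin over input channels of a summing operator.
    Returns (state captured in the snapshot, final state)."""
    queues = {name: list(items) for name, items in channels.items()}
    blocked = set()  # channels whose barrier has arrived
    state, snapshot = 0, None
    while any(queues.values()):
        progressed = False
        for name, queue in queues.items():
            if not queue or name in blocked:
                continue  # empty, or held back until alignment completes
            item = queue.pop(0)
            progressed = True
            if item is BARRIER:
                blocked.add(name)
                if blocked == set(queues):
                    snapshot = state  # barrier seen on every input: snapshot, then unblock
                    blocked.clear()
            else:
                state += item
        if not progressed:
            break  # only blocked channels remain: a barrier never arrived upstream
    return snapshot, state


channels = {"a": [1, 2, BARRIER, 100], "b": [10, 20, 30, BARRIER, 1000]}
print(aligned_checkpoint(channels))
```

The snapshot holds 63 (1 + 2 + 10 + 20 + 30). The records 100 and 1000 belong to the next checkpoint epoch, so they are counted only in the final state.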
Checkpoints - automatic, for fault tolerance only. If a TaskManager crashes, Flink restarts the job from the last successful checkpoint. Checkpoints are managed by Flink and deleted automatically when superseded by newer checkpoints.
Savepoints - manual snapshots taken before planned maintenance. Unlike checkpoints, savepoints are retained indefinitely and survive job restarts. Use savepoints when:
- Upgrading Flink version
- Changing job parallelism (scaling)
- Deploying a new feature pipeline version that must continue from the previous state
- A/B testing two feature computation approaches on the same state
```bash
# Take a savepoint before a planned upgrade
flink savepoint <job-id> s3://flink-savepoints/feature-pipeline-v2/
# Restore from savepoint after the upgrade
flink run -s s3://flink-savepoints/feature-pipeline-v2/<savepoint-dir> \
  -c com.shopcore.features.UserSpendPipeline \
  feature-pipeline-v3.jar
```
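The key difference is the state serialization format. Checkpoints may use a backend-specific internal format and are not guaranteed to be compatible across Flink versions. Savepoints use a canonical format by default, designed to stay compatible across Flink versions and state backends. Since Flink 1.15 you can also take native-format savepoints, which trade that portability for speed.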
## Production Notes
Parallelism tuning:
Start with parallelism equal to the number of Kafka partitions on your source topic. If your source has 12 partitions, set env.set_parallelism(12), so each parallel Flink source instance reads exactly one partition. Source instances beyond the partition count receive no partitions and sit idle. Downstream operators can have higher parallelism if CPU-bound.
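A toy round-robin model shows why source parallelism above the partition count buys nothing. The round-robin rule is an assumption for illustration. Flink's KafkaSource uses its own assignment (the starting subtask also depends on the topic name), but the resulting balance is similar.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, parallelism: int) -> Dict[int, List[int]]:
    """Toy round-robin assignment of Kafka partitions to source subtasks."""
    assignment: Dict[int, List[int]] = {subtask: [] for subtask in range(parallelism)}
    for partition in range(num_partitions):
        assignment[partition % parallelism].append(partition)
    return assignment


def idle_subtasks(num_partitions: int, parallelism: int) -> int:
    return sum(1 for parts in assign_partitions(num_partitions, parallelism).values() if not parts)


for parallelism in (8, 12, 16):
    busiest = max(len(p) for p in assign_partitions(12, parallelism).values())
    print(parallelism, busiest, idle_subtasks(12, parallelism))
```

With 12 partitions, parallelism 8 leaves some subtasks reading two partitions. Parallelism 12 gives one partition each, and parallelism 16 leaves 4 source subtasks holding a slot with nothing to read.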
Memory configuration:
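```yaml
# flink-conf.yaml for a 16GB TaskManager
taskmanager.memory.process.size: 16g
taskmanager.memory.managed.fraction: 0.4  # 40% of total Flink memory → managed memory (RocksDB)
taskmanager.memory.network.fraction: 0.1  # 10% of total Flink memory → network buffers (subject to min/max caps)
# Most of the remainder is task heap; JVM metaspace, JVM overhead, and framework memory take the rest
```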
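The fractions apply to total Flink memory, not to the process size. Below is a worked calculation that assumes default values for the other memory components: metaspace 256 MB, JVM overhead 10% of the process clamped between 192 MB and 1 GB, framework heap and off-heap 128 MB each, network clamped between 64 MB and 1 GB, and task off-heap 0. These defaults vary between Flink versions, so check the memory configuration docs for yours.

```python
def tm_memory_breakdown_mb(
    process_mb: float,
    managed_fraction: float = 0.4,
    network_fraction: float = 0.1,
    network_min_mb: float = 64,
    network_max_mb: float = 1024,
    metaspace_mb: float = 256,
    overhead_fraction: float = 0.1,
    overhead_min_mb: float = 192,
    overhead_max_mb: float = 1024,
    framework_heap_mb: float = 128,
    framework_off_heap_mb: float = 128,
) -> dict:
    """Toy version of the TaskManager memory split under the assumed defaults."""
    # JVM overhead is a fraction of the process size, clamped to [min, max]
    overhead = min(max(process_mb * overhead_fraction, overhead_min_mb), overhead_max_mb)
    # Total Flink memory = process size minus JVM metaspace and JVM overhead
    total_flink = process_mb - metaspace_mb - overhead
    managed = total_flink * managed_fraction
    network = min(max(total_flink * network_fraction, network_min_mb), network_max_mb)
    task_heap = total_flink - managed - network - framework_heap_mb - framework_off_heap_mb
    return {
        "jvm_overhead": overhead,
        "jvm_metaspace": metaspace_mb,
        "total_flink": total_flink,
        "managed": managed,
        "network": network,
        "framework": framework_heap_mb + framework_off_heap_mb,
        "task_heap": task_heap,
    }


breakdown = tm_memory_breakdown_mb(16 * 1024)
print({name: round(mb) for name, mb in breakdown.items()})
```

Under these assumptions, a 16 GB TaskManager gets about 5.9 GB of managed memory (6041.6 MB) and about 7.6 GB of task heap.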
Backpressure: If downstream operators are slower than upstream (e.g., the feature store write is slower than event arrival rate), Flink applies backpressure - upstream operators slow down to match the slowest downstream. Monitor the Flink Web UI's backpressure indicators. If an operator is persistently backpressured, it needs more parallelism or optimization.
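A discrete-time toy model shows the effect. It is an illustration, not Flink's credit-based flow control: a bounded buffer between two operators forces the source down to the slower operator's rate once the buffer fills.

```python
from typing import List, Tuple


def simulate_backpressure(
    source_rate: int, sink_rate: int, buffer_capacity: int, seconds: int
) -> Tuple[int, int, int, List[int]]:
    """Each second the source emits only into free buffer space, then the
    downstream operator drains up to sink_rate records.
    Returns (emitted, consumed, still_buffered, per_second_emits)."""
    buffered = emitted = consumed = 0
    per_second = []
    for _ in range(seconds):
        can_emit = min(source_rate, buffer_capacity - buffered)  # blocked when the buffer is full
        emitted += can_emit
        buffered += can_emit
        drained = min(sink_rate, buffered)
        consumed += drained
        buffered -= drained
        per_second.append(can_emit)
    return emitted, consumed, buffered, per_second


emitted, consumed, buffered, per_second = simulate_backpressure(
    source_rate=100, sink_rate=60, buffer_capacity=200, seconds=60
)
print(per_second[:6])
```

After a few seconds the buffer is full, and the source settles at 60 records per second, the downstream rate. In a real job, the same effect appears as high backpressure on the source in the Web UI.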
Restart strategy:
```python
from pyflink.common.restart_strategy import RestartStrategies

# Fixed delay restart: 3 attempts, 30 seconds between retries
# (`env` is the StreamExecutionEnvironment from the pipeline above)
env.set_restart_strategy(
    RestartStrategies.fixed_delay_restart(
        restart_attempts=3,
        delay_between_attempts=30_000,  # ms
    )
)
```
## Common Mistakes
Mistake: Using HashMapStateBackend for large-scale user state
If you have millions of users with non-trivial per-user state (session history, rolling counts, ListState of recent events), the default heap-based state backend will OOM your TaskManagers. Switch to EmbeddedRocksDBStateBackend for any production ML feature pipeline with more than a few hundred thousand concurrent keys.
Mistake: Not configuring State TTL
State without TTL grows forever. For user-keyed state in a feature pipeline, this means every user who ever used the platform (including churned users from years ago) continues to consume RocksDB space. Configure TTL based on your feature's temporal window - a 7-day rolling feature needs at most 7 days of TTL on its state.
Mistake: Confusing checkpoint failure with job failure
By default, a single checkpoint failure causes the job to fail. In production, transient S3 issues or slow GC pauses can cause occasional checkpoint timeouts. Set execution.checkpointing.tolerable-failed-checkpoints: 2 (or call set_tolerable_checkpoint_failure_number(2) on the checkpoint config) to tolerate transient failures without job restarts.
Mistake: Setting allowed lateness too high without understanding the memory cost
Every window with allowedLateness(Duration) must buffer its state until the allowed lateness window closes. For a 5-minute tumbling window with 60 seconds of allowed lateness, each window lives for 6 minutes (5 + 1) before state is released. With millions of keys and large windows, this multiplies state size significantly.
## Interview Q&A
### What is the difference between Flink and Spark Streaming?
The fundamental difference is the processing model. Spark Streaming (Structured Streaming) uses micro-batches: events are accumulated into small batches (minimum ~100ms in practice, often seconds) and processed as a batch. Latency is bounded below by the batch interval. Flink uses true event-at-a-time streaming: the operator graph runs continuously and each event is processed immediately upon arrival. Latency is bounded only by processing time - typically 10–80ms.
The second major difference is state management. Flink's state is a first-class primitive: the framework manages state persistence, TTL, fault tolerance, and scaling. Spark's stateful streaming (mapGroupsWithState) is more limited and harder to reason about. For ML feature pipelines requiring sub-second feature freshness or complex per-user state machines, Flink is the right choice. For standard aggregations where seconds of latency is acceptable, Spark Structured Streaming is simpler to operate.
### What is KeyedStream and why is it fundamental to stateful processing?
A `KeyedStream<T, K>` is a DataStream partitioned by a key function - all events with the same key (e.g., the same user_id) are guaranteed to be processed by the same operator instance. This is fundamental to stateful processing because Flink's keyed state is per-key, per-operator. When an operator maintains `ValueState<Double>` for "user's rolling 5-minute spend," that state is scoped to one user. For this to be correct, all events for that user must arrive at the same operator instance - which is what keyBy guarantees via hash-partitioning.
Without keyBy, you can only use operator state, which is scoped to each parallel operator instance rather than to a key, and you lose the ability to compute per-entity features. Every meaningful ML feature computation in Flink starts with keyBy.
### What is the difference between a checkpoint and a savepoint?
Both capture a consistent snapshot of the Flink job's state. The differences are in purpose, lifecycle, and format.
Checkpoints are automatic, periodic snapshots taken for fault tolerance. They are managed by Flink, may be deleted when superseded, and are not guaranteed to be compatible across Flink versions. If the job fails, it restarts from the last successful checkpoint. The application developer does not interact with checkpoints directly.
Savepoints are manual snapshots taken by the operator (via CLI or REST API) before planned maintenance. They are retained indefinitely until explicitly deleted. By default they use a canonical format designed to stay compatible across Flink versions and state backends. Use savepoints when upgrading the Flink version, changing job parallelism, deploying a new code version that must resume from existing state, or running A/B tests between feature pipeline versions on the same state.
### How does Flink achieve exactly-once semantics?
Flink's exactly-once guarantee uses asynchronous barrier snapshotting, a variant of the Chandy-Lamport distributed snapshot algorithm. The JobManager periodically injects checkpoint barriers into every source stream. Barriers flow through the operator graph with the event data. When an operator receives a barrier on all its inputs, it snapshots its state to durable storage (S3/HDFS) and forwards the barrier downstream. The checkpoint is complete when all operators (including sinks) have acknowledged the barrier.
For end-to-end exactly-once (including the sink), the sink must be a transactional system. Flink's Kafka sink implements a two-phase commit: during a checkpoint, it pre-commits messages to Kafka but keeps them in an uncommitted transaction. When the checkpoint completes successfully, it commits the transaction. If the job fails and restarts from the checkpoint, any uncommitted transactions are aborted - no duplicates are written. Consumers with isolation.level=read_committed only see committed messages.
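The two-phase commit flow can be sketched with an in-memory stand-in for Kafka. This is a toy that assumes one sink instance, and it assumes that a restart replays every record after the last completed checkpoint. It omits details such as re-committing transactions that were pre-committed for a checkpoint that completed just before a crash.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyTransactionalLog:
    """Stands in for a Kafka topic read with isolation.level=read_committed."""

    committed: List[str] = field(default_factory=list)

    def visible(self) -> List[str]:
        return list(self.committed)


@dataclass
class TwoPhaseCommitSink:
    log: ToyTransactionalLog
    open_txn: List[str] = field(default_factory=list)
    pending: List[List[str]] = field(default_factory=list)  # pre-committed, awaiting checkpoint completion

    def write(self, record: str) -> None:
        self.open_txn.append(record)  # written inside the current transaction, invisible to readers

    def pre_commit(self) -> None:
        """On the checkpoint barrier: seal the current transaction and start a new one."""
        self.pending.append(self.open_txn)
        self.open_txn = []

    def notify_checkpoint_complete(self) -> None:
        """All operators acknowledged the checkpoint: commit sealed transactions."""
        for txn in self.pending:
            self.log.committed.extend(txn)
        self.pending = []

    def fail_and_recover(self) -> None:
        """Crash before the checkpoint completed: uncommitted transactions are aborted."""
        self.open_txn = []
        self.pending = []


log = ToyTransactionalLog()
sink = TwoPhaseCommitSink(log)
sink.write("a"); sink.write("b"); sink.pre_commit(); sink.notify_checkpoint_complete()  # checkpoint 1
sink.write("c"); sink.pre_commit()  # checkpoint 2 starts...
sink.fail_and_recover()             # ...but the job crashes before it completes
after_crash = log.visible()
sink.write("c"); sink.pre_commit(); sink.notify_checkpoint_complete()  # replay from checkpoint 1
```

A read_committed consumer sees ["a", "b"] after the crash and ["a", "b", "c"] after recovery. Record "c" was written twice, but it is visible only once.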
What is RocksDBStateBackend and when should you use it over HashMapStateBackend?
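RocksDBStateBackend (EmbeddedRocksDBStateBackend since Flink 1.13) stores Flink state in embedded RocksDB instances on the TaskManager's local disk. Durability comes from Flink's checkpoints rather than from RocksDB's own write-ahead log, which Flink disables. State is serialized to bytes on every write and deserialized on every read. Each state access is therefore noticeably slower than with heap-based state, and reads that miss RocksDB's block cache also pay for disk I/O.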
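Here is a toy illustration of what "serialized on every write, deserialized on every read" means in practice, with pickle standing in for Flink's type serializers. HeapValueState and SerializedValueState are hypothetical names, not Flink classes. The first keeps live objects, like HashMapStateBackend, and the second keeps bytes, like RocksDB. One visible consequence is that an object read from a serializing backend is a fresh copy, so mutating it in place changes nothing until you write it back.

```python
import pickle


class HeapValueState:
    """Toy heap-style state: stores live Python objects by reference."""

    def __init__(self):
        self._store = {}

    def value(self, key):
        return self._store.get(key)

    def update(self, key, value):
        self._store[key] = value


class SerializedValueState:
    """Toy RocksDB-style state: stores bytes, so every read/write (de)serializes."""

    def __init__(self):
        self._store = {}

    def value(self, key):
        raw = self._store.get(key)
        return None if raw is None else pickle.loads(raw)  # fresh copy on each read

    def update(self, key, value):
        self._store[key] = pickle.dumps(value)              # serialize on each write
```

Writing every change back with update() keeps a ProcessFunction correct on either backend, and makes it safe to switch backends later.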
Use RocksDBStateBackend when:
- Your state per TaskManager exceeds available JVM heap (typically 4-8 GB)
- You have millions of concurrent keys (e.g., per-user state for a large platform)
- You want incremental checkpoints (only changed state is uploaded, dramatically reducing checkpoint time for large state)
Use HashMapStateBackend when:
- State is small (tens of MB per TaskManager)
- You need the lowest possible state access latency and cannot afford RocksDB's serialization overhead
- Development or testing environments where simplicity matters
For production ML feature pipelines at any meaningful scale, RocksDBStateBackend with incremental checkpoints is the standard.
How do you handle late events in Flink?
Flink provides three mechanisms for late event handling, typically used together:
1. Watermark slack (forBoundedOutOfOrderness): the watermark lags the maximum observed timestamp by a configured duration. Events arriving within this slack are treated as on-time. Set this to the 99th percentile of observed event delay in your stream.
2. allowedLateness: by default, a window's state is discarded as soon as the watermark passes the end of the window. With allowedLateness(Duration), the window's state is kept for that additional duration, so late events can still arrive and update the window result. Each late event triggers a corrected window result emission. This is useful for fraud signals that arrive slightly late: you want to update the feature even if the initial window result has already been emitted.
3. Side outputs: with sideOutputLateData(OutputTag), events arriving after even the allowed lateness are routed to a side output stream instead of being silently dropped. You can sink this side output to an investigation topic, a DLQ, or a reprocessing pipeline. In fraud detection, a very late fraud signal (arriving 10 minutes late) may still be valuable for a retraining dataset even if it cannot update the real-time feature.
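The three mechanisms can be seen working together in a toy event-time window. This sketch is plain Python, not the Flink API, and ToyTumblingWindowSum is a hypothetical class. Its watermark follows Flink's bounded-out-of-orderness rule (maximum timestamp seen, minus the slack, minus 1 ms) but advances on every event rather than periodically. When allowed lateness is set, it emits a corrected result for each late event, as Flink's default event-time trigger does.

```python
from collections import defaultdict


class ToyTumblingWindowSum:
    """Event-time tumbling-window sum with watermark slack, allowed lateness,
    and a side output for events that are too late."""

    def __init__(self, size_ms, out_of_orderness_ms, allowed_lateness_ms):
        self.size = size_ms
        self.slack = out_of_orderness_ms
        self.lateness = allowed_lateness_ms
        self.max_ts = float("-inf")
        self.windows = defaultdict(int)  # window start -> running sum
        self.fired = set()               # windows that already emitted once
        self.results = []                # emitted (window_start, sum) pairs
        self.side_output = []            # events too late even for allowed lateness

    def watermark(self):
        # Bounded out-of-orderness: lag the max timestamp seen by the slack.
        return self.max_ts - self.slack - 1

    def process(self, ts, value):
        start = ts - ts % self.size
        window_max_ts = start + self.size - 1
        if window_max_ts + self.lateness <= self.watermark():
            self.side_output.append((ts, value))  # window state already purged
            return
        self.windows[start] += value
        if start in self.fired:
            # Late but within allowed lateness: emit a corrected result.
            self.results.append((start, self.windows[start]))
        self.max_ts = max(self.max_ts, ts)
        wm = self.watermark()
        for s in sorted(self.windows):
            if s not in self.fired and s + self.size - 1 <= wm:
                self.fired.add(s)                 # on-time firing
                self.results.append((s, self.windows[s]))
        for s in [s for s in self.windows if s + self.size - 1 + self.lateness <= wm]:
            del self.windows[s]                   # past allowed lateness: purge
```

Consider 10-second windows, 2 seconds of slack, and 5 seconds of allowed lateness. An event stamped at 9s that arrives after the first window has fired produces a corrected result for that window. An event stamped at 8s that arrives once the watermark has reached 17,999 ms is past 9,999 + 5,000 ms, so it goes to the side output.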
What is State TTL and why is it important for ML feature pipelines?
State TTL is a Flink mechanism that automatically expires state entries that have not been written (the default) or, if configured, read or written within a specified duration. When a state entry's TTL expires, Flink treats it as non-existent and eventually removes it from storage, for example lazily on read or during RocksDB compaction.
For ML feature pipelines, TTL solves the state growth problem. Consider a platform with 50 million registered users, 2 million of whom are active daily. Without TTL, the Flink job accumulates state for all 50 million users indefinitely. With TTL configured to 30 days (matching the feature's temporal window), only the state for users active in the last 30 days is retained - roughly 5-10 million users instead of 50 million. This reduces RocksDB storage, checkpoint size, and checkpoint duration proportionally.
Configure TTL to match the temporal window of your feature. A 7-day rolling purchase count needs at most 7 days of TTL. A session feature (30-minute timeout) needs 30 minutes of TTL after the last event. Setting TTL too short means you lose state for users who have been inactive for a legitimate reason (vacation, business trip) and incorrectly treat their return event as a fresh start.
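In PyFlink, TTL is configured with StateTtlConfig and enabled on a state descriptor. The behaviour itself can be sketched in plain Python. The hypothetical TtlValueState below refreshes an entry's timestamp on every write (mirroring Flink's default OnCreateAndWrite update type), never returns an expired value, and cleans up lazily on read. The injected clock is an assumption that keeps the example deterministic.

```python
class TtlValueState:
    """Toy keyed value state with a time-to-live refreshed on every write."""

    def __init__(self, ttl_ms, clock):
        self.ttl_ms = ttl_ms
        self.clock = clock   # callable returning the current time in ms
        self._store = {}     # key -> (value, last_write_ms)

    def update(self, key, value):
        self._store[key] = (value, self.clock())  # writes refresh the TTL

    def value(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, last_write = entry
        if self.clock() - last_write >= self.ttl_ms:
            del self._store[key]  # lazy cleanup on access
            return None           # expired values are never returned
        return value              # reads do not refresh the TTL

    def __len__(self):
        return len(self._store)
```

A user who returns after the TTL has elapsed is treated as brand new, which is the vacation problem described above. Choose the TTL so that this only happens when the feature's window has genuinely expired.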
Flink Metrics and Observability for ML Pipelines
A Flink job that runs but produces wrong features is worse than a job that fails loudly. These are the metrics that matter most for ML feature pipelines.
Key Metrics to Monitor
| Metric | What it Measures | Alert Threshold |
|---|---|---|
| records-lag-max (Kafka source) | Max consumer lag across all partitions | Greater than your feature freshness budget |
| currentInputWatermark | Current event-time watermark per operator | Stalled watermark → stream has stopped |
| numRecordsInPerSecond | Events processed per operator per second | Drop below baseline → backpressure or failure |
| numLateRecordsDropped | Late events dropped per window operator | Greater than 0.1% of total → watermark too tight |
| rocksdb.block-cache-miss-rate | RocksDB cache miss rate | Greater than 20% → need more managed memory |
| lastCheckpointDuration | Time to complete last checkpoint (ms) | Greater than checkpoint interval → checkpoint lag |
| numberOfFailedCheckpoints | Count of failed checkpoints | Greater than 0 → investigate immediately |
| backPressuredTimeMsPerSecond | Milliseconds per second an operator is backpressured | Greater than 100 ms/s → add parallelism |
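The thresholds in the table can be encoded as a simple alert rule. This is a hypothetical sketch with several assumptions: the metrics have already been scraped into a flat dict (for example via a metrics reporter), the freshness budget is expressed as a record count, numRecordsIn is available to compute the late-drop ratio, and a watermark that has not moved since the previous snapshot counts as stalled.

```python
def evaluate_alerts(metrics, prev_watermark, *, freshness_budget_records,
                    checkpoint_interval_ms, baseline_records_per_sec):
    """Apply the table's alert thresholds to one metrics snapshot.

    `metrics` maps metric name -> latest value; `prev_watermark` is the
    currentInputWatermark from the previous snapshot.
    """
    alerts = []
    if metrics["records-lag-max"] > freshness_budget_records:
        alerts.append("consumer lag exceeds feature freshness budget")
    # A single unchanged snapshot can be noise; production rules usually
    # require several consecutive unchanged snapshots before paging.
    if metrics["currentInputWatermark"] <= prev_watermark:
        alerts.append("watermark stalled")
    if metrics["numRecordsInPerSecond"] < baseline_records_per_sec:
        alerts.append("throughput below baseline")
    total_in = metrics["numRecordsIn"]
    if total_in and metrics["numLateRecordsDropped"] / total_in > 0.001:
        alerts.append("late drops above 0.1%: watermark too tight")
    if metrics["rocksdb.block-cache-miss-rate"] > 0.20:
        alerts.append("RocksDB cache misses high: add managed memory")
    if metrics["lastCheckpointDuration"] > checkpoint_interval_ms:
        alerts.append("checkpoint slower than interval")
    if metrics["numberOfFailedCheckpoints"] > 0:
        alerts.append("failed checkpoints")
    if metrics["backPressuredTimeMsPerSecond"] > 100:
        alerts.append("backpressured: add parallelism")
    return alerts
```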
Watermark Monitoring: The Most Important Flink Metric
A stalled watermark is the silent killer of Flink streaming pipelines. If the watermark stops advancing, no windows ever close, output stops, and state grows unboundedly. The watermark stalls when:
- One Kafka partition receives no events - Flink's watermark is the minimum across all source partitions
- One partition carries events whose timestamps lag far behind the others (for example, a backfill or a producer with a skewed clock), holding back the minimum
- The source consumer is paused due to consumer group rebalancing
Fix for stalled watermarks from idle partitions:
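```python
import json

from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.watermark_strategy import TimestampAssigner


class EventTimeAssigner(TimestampAssigner):
    """PyFlink expects a TimestampAssigner instance here, not a lambda."""

    def extract_timestamp(self, value, record_timestamp):
        # Event time (epoch ms) from the JSON payload; 0 if the field is missing
        return json.loads(value).get("event_time_ms", 0)


# Idle source timeout: if a source partition emits no events for 30 seconds,
# treat it as idle and stop waiting for its watermark contribution
watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(5))
    .with_idleness(Duration.of_seconds(30))  # Critical for ML pipelines with uneven traffic
    .with_timestamp_assigner(EventTimeAssigner())
)
```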
Without withIdleness(), one quiet Kafka partition can stall the entire Flink job's watermark. This is a common production gotcha in ML pipelines that partition by user segment - some segments may have zero traffic for minutes at a time.
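The following helper shows why one quiet partition stalls everything. It is hypothetical, but it combines per-partition watermarks the way the text describes: the source watermark is the minimum over partitions, and with an idle timeout, partitions that have produced nothing for longer than the timeout drop out of that minimum. The dict layout (partition ID mapped to its watermark and the wall-clock time of its last event) is an assumption made for the example.

```python
def combined_watermark(partitions, now_ms, idle_timeout_ms=None):
    """partitions: dict of partition id -> (partition_watermark_ms, last_event_wall_ms).

    Returns the source watermark (minimum over non-idle partitions), or None
    when every partition is idle and the source should not hold anything back.
    """
    active = [
        watermark
        for watermark, last_event in partitions.values()
        if idle_timeout_ms is None or now_ms - last_event < idle_timeout_ms
    ]
    return min(active) if active else None
```

Partition 2 in the test below last produced an event 50 seconds ago. Without an idle timeout, its old watermark pins the whole source. With a 30-second timeout, it is ignored and the watermark moves with the busy partitions.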
Flink at ML Scale: ShopCore Numbers
To make the concepts concrete, here are the actual numbers for a production-scale ML feature pipeline:
ShopCore recommendation system:
- Event rate: 80,000 user interaction events/second at peak (Double 11-style sale)
- Features per event: 50 real-time features
- Unique active users: 2.5 million concurrent sessions
- State per user: ~4KB (session history, rolling counts, category affinities)
- Total state: ~10GB across the cluster (2.5 million users × ~4KB), held in RocksDB
- Checkpoint interval: 60 seconds
- Checkpoint duration: 8-12 seconds (incremental RocksDB checkpoint)
- P99 end-to-end latency: 85ms (Kafka source to Kafka feature topic)
- Flink cluster: 24 TaskManagers, 4 slots each = 96 slots total
- Parallelism: 48 (source partitions: 48)
- RocksDB state backend: incremental=True, 40% managed memory fraction
Key sizing formula for ML Flink pipelines:
For 10GB state, 24 TMs, 40% managed memory fraction:
Each TaskManager needs ~5GB heap. 24 TMs = ~120GB cluster-wide RAM for the feature pipeline alone.
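The arithmetic behind these numbers can be checked with a small helper. This is a back-of-the-envelope sketch, not a Flink API, and it rests on several assumptions. Units are decimal (1KB = 1,000 bytes), and keys are spread evenly across TaskManagers and subtasks. The ~5GB per TaskManager is treated as total process memory, with the 40% managed fraction applied to all of it. That overstates managed memory slightly, because Flink applies the fraction to total Flink memory, which excludes JVM metaspace and overhead.

```python
def size_feature_pipeline(active_users, state_per_user_bytes, task_managers,
                          parallelism, tm_memory_bytes, managed_pct,
                          change_pct_range=(2, 5)):
    """Back-of-the-envelope sizing for a keyed, RocksDB-backed feature job."""
    total_state = active_users * state_per_user_bytes
    return {
        "total_state_bytes": total_state,
        # Assumes keys (and therefore state) spread evenly.
        "state_per_tm_bytes": total_state / task_managers,
        "state_per_subtask_bytes": total_state / parallelism,
        # Upper bound: the real fraction applies to total Flink memory.
        "managed_memory_per_tm_bytes": tm_memory_bytes * managed_pct // 100,
        # An incremental checkpoint uploads roughly the changed share of state.
        "incremental_upload_bytes": tuple(total_state * p // 100 for p in change_pct_range),
        "cluster_memory_bytes": tm_memory_bytes * task_managers,
    }


GB, MB = 10**9, 10**6
shopcore = size_feature_pipeline(
    active_users=2_500_000, state_per_user_bytes=4_000, task_managers=24,
    parallelism=48, tm_memory_bytes=5 * GB, managed_pct=40,
)
```

For ShopCore this gives 10GB of total state, about 417MB per TaskManager and 208MB per subtask, at most 2GB of managed memory per TaskManager, 200-500MB per incremental checkpoint, and 120GB of cluster-wide memory.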
Flink Job Graph: End-to-End Feature Pipeline
This job graph has six operator layers at parallelism 48. With Flink's default slot sharing, one task slot can host one parallel subtask of every layer, so the job needs 48 of the 96 available slots. The checkpoint barrier flows from sources through every operator layer to the sinks. On each incremental checkpoint, the state backend uploads only the RocksDB SST files created since the previous checkpoint. With 10GB total state and a typical 2-5% change rate per 60-second interval, each incremental checkpoint uploads roughly 200-500MB to S3. That upload completes in 8-12 seconds, well within the 60-second checkpoint interval.
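The "only changed SST files" rule follows from the fact that RocksDB SST files are immutable. A file already uploaded for an earlier checkpoint can be referenced again instead of being re-sent. The hypothetical helper below models only that bookkeeping. Note that new files come from both memtable flushes and compactions, so compaction can make an upload larger than the logical change in state.

```python
def plan_incremental_checkpoint(live_sst_files, already_uploaded):
    """Toy model of an incremental RocksDB checkpoint.

    live_sst_files: dict of SST file name -> size in bytes currently in the DB.
    already_uploaded: set of file names stored by earlier checkpoints.
    Returns (files to upload, files reused by reference, bytes to upload).
    """
    new_files = {name: size for name, size in live_sst_files.items()
                 if name not in already_uploaded}
    reused = sorted(set(live_sst_files) & already_uploaded)
    return new_files, reused, sum(new_files.values())
```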
This architecture is the foundation of real-time ML feature computation at scale. Every concept in this lesson - KeyedStream, ProcessFunction, RocksDBStateBackend, watermarks, State TTL, checkpoints, Flink SQL - is present in this one job.
Choosing Between Flink APIs: A Decision Guide
One of the most common questions on ML engineering teams switching to Flink is: when do I use the DataStream API vs Flink SQL? The answer comes down to complexity and flexibility requirements.
Use Flink SQL when:
- The computation is expressible as a SELECT with GROUP BY and a window function
- The team has SQL experience and the logic is stable (not changing frequently)
- You need rapid prototyping or non-engineers need to understand the logic
- Standard aggregations: SUM, COUNT, AVG, MIN, MAX over tumbling/sliding/session windows
Use DataStream API + ProcessFunction when:
- You need per-event timer logic (session timeouts, delayed joins with custom logic)
- You need access to multiple state types simultaneously (ValueState + ListState + MapState)
- You need side outputs (routing events to DLQ, late event handling)
- You are implementing a state machine (e.g., order lifecycle tracking for logistics ML)
- You need to load and execute external artifacts (ONNX models, custom serializers)
- You need custom watermark generation based on domain-specific logic
Use Table API (programmatic SQL) when:
- You want the expressiveness of SQL but need to generate queries programmatically
- The schema is dynamic or the query structure is templated at runtime
- You are building a meta-layer that generates Flink SQL from a configuration spec (e.g., a feature platform that compiles feature definitions to Flink SQL)
In practice, most production ML feature pipelines use both: Flink SQL for the standard windowed aggregations (rolling counts, rolling sums, rolling averages) and DataStream API for the complex state machines (session tracking, model hot-reload, delayed label joins). The two APIs can be mixed in the same Flink job - a Table result can be converted to a DataStream and vice versa.
```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Assumes a `raw_transactions` table (user_id STRING, amount_usd DOUBLE,
# event_time TIMESTAMP(3) with a WATERMARK) and a `UserSessionTracker`
# KeyedProcessFunction are defined elsewhere in the pipeline.

# Step 1: Use Flink SQL for the simple window aggregations
t_env.execute_sql("""
    CREATE VIEW user_5min_spend AS
    SELECT user_id, SUM(amount_usd) AS total_spend, COUNT(*) AS tx_count,
           window_start, window_end
    FROM TABLE(TUMBLE(TABLE raw_transactions, DESCRIPTOR(event_time), INTERVAL '5' MINUTE))
    GROUP BY user_id, window_start, window_end
""")

# Step 2: Convert the (insert-only) windowed Table result to a DataStream
spend_stream = t_env.to_append_stream(
    t_env.from_path("user_5min_spend"),
    type_info=Types.ROW([
        Types.STRING(), Types.DOUBLE(), Types.LONG(),
        Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP(),
    ]),
)

# Step 3: Apply complex stateful logic via DataStream ProcessFunction
enriched = (
    spend_stream
    .key_by(lambda row: row[0], key_type=Types.STRING())  # key by user_id
    .process(UserSessionTracker())
)
# Attach a sink (e.g. the feature Kafka topic) to `enriched`, then call env.execute()
```
This hybrid pattern - SQL for aggregations, DataStream for state machines - is how mature ML platform teams structure their Flink feature pipelines. It gives the readability and maintainability of SQL where SQL is appropriate, and the full power of the DataStream API where it is not.
