Streaming Pipeline Reliability for ML Systems

Reading time: ~28 minutes | Interview relevance: Very High | Target roles: ML Engineer, Data Engineer, AI Platform Engineer, MLOps Engineer


The 3 AM Page

It is 3:17 AM. The on-call ML engineer at a credit card company has been asleep for two hours when PagerDuty goes off. The alert: "fraud_feature_lag CRITICAL - consumer lag 2.1M events." She opens her laptop and the picture is not good.

The Flink job that computes fraud features from the payments Kafka topic has restarted four times in the last hour. Each restart takes 4–7 minutes to restore from the last checkpoint, and during that window the online feature store receives no updates. The fraud model is now running on features that are 3 hours and 20 minutes old. Transactions are being scored with yesterday's risk signals. Fraudsters are slipping through. The Slack channel for the fraud team is waking up as manual review queues spike.

The root cause, discovered at 4 AM after log triage: a new merchant data provider started sending payment records with a field that previously was an integer but is now a float. The Flink job's deserialization code throws a ClassCastException on every affected record. The job retries, hits the same record, throws again, crashes. Kafka advances no offsets because the job never successfully commits. Lag grows by 50,000 events every minute. The DLQ is empty - nobody implemented one.

This incident is preventable. Every failure mode it exhibits - crash-on-bad-record, no dead letter queue, no schema compatibility enforcement, lag that grew invisibly until it was catastrophic - has a known solution. This lesson covers all of them. By the end you will know exactly how to build streaming ML pipelines that handle failures gracefully, monitor themselves continuously, and recover quickly when something does go wrong.

Production streaming pipelines fail differently from batch jobs. A batch job fails loudly and completely - the run shows a red status, the alert fires, you know immediately. A streaming pipeline can fail silently: it continues running, consuming events, emitting output, while the output is subtly wrong. Consumer lag grows slowly for hours before anyone notices. The feature store fills with stale values. The model's predictions drift without any explicit error signal. Reliability in streaming means detecting and handling both loud failures and silent degradations.

This is the hardest part of building ML infrastructure at scale. The tools are mature. The patterns are known. The cost of not using them is a 3 AM page and a fraud model running blind.


Why This Exists - Batch vs Streaming Failure Modes

In batch processing, failure is discrete. A Spark job either succeeds and writes all its output, or it fails and writes nothing (assuming atomic writes). You can re-run the job. The retry is identical to the first run. Idempotency is easy - overwrite the output partition.

In streaming processing, failure is continuous and partial. Events arrive constantly. Some succeed. Some fail. A crash mid-processing means some events were processed and their effects written to the feature store, others were not. When the job restarts, where does it begin? Which writes to the feature store need to be re-applied, and which are already correct? How do you ensure that resuming from an offset does not double-count events that were processed before the crash but after the last committed offset?

These problems require specific mechanisms: checkpointing for fault-tolerant state, exactly-once semantics for idempotent writes, dead letter queues for events that cannot be processed, and consumer lag monitoring to detect when processing falls behind.


Failure Modes in Streaming ML Pipelines

Understanding the failure taxonomy is the first step to preventing each class.

1. Consumer Lag Explosion

The pipeline falls behind - events arrive faster than they are processed. Lag accumulates. Feature freshness SLAs are violated. The model runs on increasingly stale features.

Root causes: sudden traffic spike, slow downstream (Redis overloaded), CPU-intensive processing (model inference on every event), network partition to the feature store, insufficient parallelism.

Detection: monitor consumer group lag via kafka-consumer-groups.sh --describe. Alert when lag exceeds a threshold (e.g., 100,000 events for a pipeline with SLA of 30 seconds).
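
One way to choose that threshold is to convert the freshness SLA into an event count: at steady state, lag divided by processing throughput approximates how far behind the pipeline is. A minimal sketch follows; the function name and the 3,000 events/s throughput are illustrative assumptions, not values from this lesson.

```python
def lag_threshold_events(throughput_eps: float, freshness_sla_s: float) -> int:
    """Largest lag (in events) that can still be drained within the freshness SLA."""
    if throughput_eps <= 0 or freshness_sla_s <= 0:
        raise ValueError("throughput and SLA must be positive")
    return int(throughput_eps * freshness_sla_s)


# Assumed example: the job sustains ~3,000 events/s and the SLA is 30 seconds
threshold = lag_threshold_events(throughput_eps=3_000, freshness_sla_s=30)
print(threshold)
```

Measure sustained throughput under load before fixing the number; a threshold derived from an optimistic throughput will fire too late.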

2. State Corruption

A bad checkpoint, a RocksDB I/O error, or an incompatible state schema after a code change causes the state store to become inconsistent. Aggregated features contain wrong values. Counts are wrong. Sums are inflated.

Detection: hard to detect directly. Symptom: feature values jump discontinuously after a job restart. Prevention: validate state structure on startup, use incremental checkpoints, test state schema migrations before deployment.

3. Serialization Errors

A Kafka message cannot be deserialized - wrong schema, missing field, unexpected type. If unhandled, this causes an exception on every attempt to process the record. The job retries endlessly, lag grows, nothing is processed.

The fix: catch all deserialization exceptions, route the offending record to a Dead Letter Queue, advance the offset, continue processing. Never let a single bad record stop the entire pipeline.

4. Back-Pressure Cascade

A slow downstream (Redis timeouts, feature store write failures) causes the Flink operator writing to Redis to slow down. Flink's back-pressure mechanism propagates upstream - the preceding operators slow down to match. Eventually the source (Kafka consumer) slows to a crawl. Lag grows.

Detection: Flink's web UI shows back-pressure as a red/yellow bar on operators. Flink metric: backPressuredTimeMsPerSecond.

5. Data Skew

One Kafka partition receives most of the high-value events (the hot key problem). One Flink task slot processes 90% of the work while others idle. The bottleneck task drives latency and lag for the entire job.

Detection: Flink metric numRecordsInPerSecond per task - if one task is 10x higher than others, you have skew. Solution: use a sub-key or random suffix to redistribute load.
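
The random-suffix approach can be sketched in plain Python as a two-stage aggregation: first aggregate per salted sub-key (which spreads a hot key over several tasks), then merge the partial results per original key. This is a sketch under assumptions: the salt count of 8 and all function names are hypothetical, and the trick only applies to aggregations whose partial results can be merged (counts, sums, min/max).

```python
import random
from collections import Counter, defaultdict

NUM_SALTS = 8  # assumed fan-out; tune to the degree of skew


def salted_key(key: str, rng: random.Random, num_salts: int = NUM_SALTS) -> str:
    """Append a random suffix so one hot key maps to several sub-keys."""
    return f"{key}#{rng.randrange(num_salts)}"


def base_key(salted: str) -> str:
    """Strip the suffix to recover the original key."""
    return salted.rsplit("#", 1)[0]


def two_stage_count(keys, rng: random.Random, num_salts: int = NUM_SALTS):
    # Stage 1: partial counts per salted key (this is the parallel, skew-free stage)
    partial = Counter(salted_key(k, rng, num_salts) for k in keys)
    # Stage 2: merge the partial counts back onto the original key
    totals = defaultdict(int)
    for sk, count in partial.items():
        totals[base_key(sk)] += count
    return partial, dict(totals)


events = ["hot_user"] * 9_000 + ["cold_user"] * 10
partial, totals = two_stage_count(events, random.Random(42))
print(totals)
```

The merged totals are identical to an unsalted count; only the distribution of work in the first stage changes.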

6. Silent Data Loss

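At-most-once processing drops events during failure recovery. If the consumer commits offsets before processing is complete and then crashes, it resumes after the unfinished events and they are silently lost. No error. No log entry. Just missing features. (The opposite ordering, a crash after processing but before the offset commit, causes reprocessing rather than loss, which is why at-least-once pipelines also need idempotent writes.)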

Prevention: use at-least-once or exactly-once semantics. Never commit Kafka offsets until processing and all downstream writes are complete.
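
The effect of commit ordering can be illustrated with a small in-memory simulation. This is a toy model, not Kafka client code: the list stands in for a partition, `committed` for the stored offset, and the crash is modeled as happening just before the event at `crash_at` is processed. All names are hypothetical.

```python
def run_consumer(log, start_offset, crash_at, commit_before_processing):
    """Consume `log` from `start_offset`; crash just before processing `crash_at`.

    Returns (processed_events, committed_offset).
    """
    processed = []
    committed = start_offset
    for offset in range(start_offset, len(log)):
        if commit_before_processing:
            committed = offset + 1       # at-most-once: commit first
        if offset == crash_at:
            return processed, committed  # crash: this event was never processed
        processed.append(log[offset])
        if not commit_before_processing:
            committed = offset + 1       # at-least-once: commit after processing
    return processed, committed


def run_with_restart(log, crash_at, commit_before_processing):
    """First run crashes; second run resumes from the committed offset."""
    first, committed = run_consumer(log, 0, crash_at, commit_before_processing)
    second, _ = run_consumer(log, committed, None, commit_before_processing)
    return first + second


log = ["e0", "e1", "e2", "e3"]
print(run_with_restart(log, crash_at=2, commit_before_processing=True))   # e2 is lost
print(run_with_restart(log, crash_at=2, commit_before_processing=False))  # nothing lost
```

Committing after processing never loses an event in this model, but a crash between the downstream write and the commit would replay that event, so the write path still has to tolerate duplicates.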


Checkpointing

Checkpointing is Flink's primary mechanism for fault tolerance. A checkpoint is a consistent snapshot of all operator state at a specific point in time. If the job fails, it restarts from the last successful checkpoint and resumes processing from the corresponding Kafka offsets.

Production Checkpoint Configuration

from pyflink.datastream import CheckpointingMode, StreamExecutionEnvironment
from pyflink.datastream.checkpoint_config import ExternalizedCheckpointCleanup
from pyflink.datastream.checkpoint_storage import FileSystemCheckpointStorage
from pyflink.datastream.state_backend import EmbeddedRocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()

# --- Checkpoint interval ---
# 1-5 minutes is the right range for ML feature jobs.
# Shorter: more overhead, less to replay after a failure.
# Longer: less overhead, more to replay after a failure.
env.enable_checkpointing(60_000)  # Every 60 seconds

checkpoint_config = env.get_checkpoint_config()

# --- Exactly-once vs at-least-once ---
# Exactly-once: consistent results, ~10-20% throughput overhead
# At-least-once: faster, acceptable for most windowed ML features
checkpoint_config.set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)

# --- CRITICAL: Retain checkpoints on cancellation ---
# Without this, cancelling the job deletes all checkpoints.
# You will lose all recovery points during planned deployments.
# Note: newer Flink releases deprecate this call in favor of
# set_externalized_checkpoint_cleanup(...); check the API of your version.
checkpoint_config.enable_externalized_checkpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
)

# --- Checkpoint timeout ---
# If a checkpoint takes longer than this, it is aborted and counted as failed.
# For large RocksDB state, set this to 10+ minutes.
checkpoint_config.set_checkpoint_timeout(300_000)  # 5 minutes

# --- Tolerable failures ---
# Allow up to 3 consecutive checkpoint failures before failing the job.
# Prevents job death from transient I/O blips.
checkpoint_config.set_tolerable_checkpoint_failure_number(3)

# --- Minimum pause between checkpoints ---
# Ensures processing time between checkpoints is at least this long.
# Prevents checkpointing from overwhelming I/O.
checkpoint_config.set_min_pause_between_checkpoints(30_000)  # 30 seconds

# --- RocksDB incremental checkpoints ---
# Only write the delta since the last checkpoint, not the full state.
# CRITICAL for large state (> 10 GB) - full checkpoints take too long.
env.set_state_backend(
    EmbeddedRocksDBStateBackend(enable_incremental_checkpointing=True)
)

# --- Checkpoint storage ---
# Store checkpoints on durable distributed storage, not local disk.
checkpoint_config.set_checkpoint_storage(
    FileSystemCheckpointStorage("s3://ml-platform-checkpoints/fraud-feature-job/")
)

Savepoints - Planned Migrations

A savepoint is a manually triggered checkpoint. Use it before every deployment. Unlike automatic checkpoints, savepoints are never automatically deleted. They are the mechanism for: updating Flink job code without losing state, migrating to a new Flink version, changing parallelism.

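# Trigger a savepoint before deploying new code
flink savepoint <job-id> s3://ml-platform-savepoints/fraud-feature-job/pre-deploy-2024-03-15/

# Cancel the job (savepoint is retained)
flink cancel <job-id>

# Alternative to the two commands above: take the savepoint and stop the job
# in one step, so no events are processed between savepoint and shutdown
# flink stop --savepointPath s3://ml-platform-savepoints/fraud-feature-job/pre-deploy-2024-03-15/ <job-id>

# Deploy new code and resume from savepoint
flink run -s s3://ml-platform-savepoints/fraud-feature-job/pre-deploy-2024-03-15/ \
  -p 8 fraud-feature-job-v2.jar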

Savepoint compatibility rules:

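  • You CAN change operator logic without breaking savepoint compatibility, as long as the state it reads keeps the same name and type
  • You CAN add or remove stateless operators; adding or removing stateful operators is safe only if every stateful operator has a stable, explicit uid (dropping one also requires --allowNonRestoredState on restore). Without explicit uids, any topology change can break the mapping from savepoint state to operators
  • You CAN change parallelism - Flink redistributes keyed state during restore, up to the job's max parallelism
  • Changing state type (e.g., ValueState to MapState) breaks savepoint compatibility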

Dead Letter Queue Strategy

A Dead Letter Queue (DLQ) is where events go when they cannot be processed. Every streaming ML pipeline must have one. Without it, a single malformed event can halt the entire pipeline.

DLQ Design Principles

  1. Never let a bad event stop processing - catch all exceptions, route to DLQ, advance offset
  2. Preserve full context - store the original event bytes, the exception type, stack trace, timestamp, and pipeline version
  3. Make DLQ events actionable - a human or automated process should be able to inspect, fix, and replay
  4. Alert on DLQ growth - a DLQ that receives more than N events per minute indicates a systemic problem

import base64
import json
import time
import traceback

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import OutputTag, StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaRecordSerializationSchema,
    KafkaSink,
)
from pyflink.datastream.functions import ProcessFunction, RuntimeContext

# Define the side output tag for DLQ records
DLQ_TAG = OutputTag("dlq", Types.STRING())


class PaymentFeatureProcessor(ProcessFunction):
    """
    Processes payment events to compute features.
    Routes malformed or processing-error records to DLQ via SideOutput.
    Never raises an unhandled exception - the pipeline must keep running.
    """

    def open(self, runtime_context: RuntimeContext):
        self.redis_client = None  # Initialize lazily or in open()
        self.pipeline_version = "v3.1.2"

    def process_element(self, raw_event: str, ctx: ProcessFunction.Context):
        # PyFlink emits results by yielding: a bare value goes to the main
        # output, a (tag, value) pair goes to the matching side output.
        try:
            event = self._parse_event(raw_event)
            features = self._compute_features(event)
            yield json.dumps(features)

        except json.JSONDecodeError as e:
            # Deserialization failure - record is unparseable
            dlq_record = self._build_dlq_record(
                original_bytes=raw_event.encode("utf-8"),
                error_class="JSONDecodeError",
                error_message=str(e),
                stack_trace=traceback.format_exc(),
                error_category="deserialization",
            )
            yield DLQ_TAG, json.dumps(dlq_record)

        except KeyError as e:
            # Missing required field - likely a schema change
            dlq_record = self._build_dlq_record(
                original_bytes=raw_event.encode("utf-8"),
                error_class="KeyError",
                error_message=f"Missing field: {e}",
                stack_trace=traceback.format_exc(),
                error_category="schema_mismatch",
            )
            yield DLQ_TAG, json.dumps(dlq_record)

        except Exception as e:
            # Catch-all for unexpected processing errors
            dlq_record = self._build_dlq_record(
                original_bytes=raw_event.encode("utf-8"),
                error_class=type(e).__name__,
                error_message=str(e),
                stack_trace=traceback.format_exc(),
                error_category="processing_error",
            )
            yield DLQ_TAG, json.dumps(dlq_record)

    def _parse_event(self, raw: str) -> dict:
        event = json.loads(raw)
        # Validate required fields - raises KeyError if missing
        required = ["user_id", "amount", "merchant_id", "event_timestamp"]
        for field in required:
            _ = event[field]
        return event

    def _compute_features(self, event: dict) -> dict:
        # Your feature computation logic here
        return {
            "user_id": event["user_id"],
            # Raises ValueError for a non-numeric string, TypeError for None
            "amount": float(event["amount"]),
            "computed_at": time.time(),
        }

    def _build_dlq_record(
        self,
        original_bytes: bytes,
        error_class: str,
        error_message: str,
        stack_trace: str,
        error_category: str,
    ) -> dict:
        return {
            "dlq_metadata": {
                "error_class": error_class,
                "error_message": error_message,
                # Keep the last 2000 chars: the end of a traceback names the failing frame
                "stack_trace": stack_trace[-2000:],
                "error_category": error_category,
                "pipeline_version": self.pipeline_version,
                "dlq_timestamp": time.time(),
            },
            "original_event_base64": base64.b64encode(original_bytes).decode(),
            "original_event_string": original_bytes.decode("utf-8", errors="replace"),
        }


# Wire up the DLQ side output
# kafka_source, watermark_strategy and redis_sink are assumed to be defined elsewhere
env = StreamExecutionEnvironment.get_execution_environment()

# Main stream
processed_stream = (
    env
    .from_source(kafka_source, watermark_strategy, "Payments")
    .process(PaymentFeatureProcessor(), output_type=Types.STRING())
)

# Main output: write to feature store
processed_stream.sink_to(redis_sink)

# DLQ side output: write to DLQ Kafka topic
dlq_stream = processed_stream.get_side_output(DLQ_TAG)
dlq_stream.sink_to(
    KafkaSink.builder()
    .set_bootstrap_servers("kafka:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("payments.dlq")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    .build()
)
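
Design principle 4 (alert on DLQ growth) is not covered by the wiring above. One possible shape for it is a sliding-window counter over DLQ arrival times, sketched below in plain Python. The class name, the threshold, and the 60-second window are assumptions for illustration; in practice the same rule is often expressed as an alert on a DLQ message-rate metric.

```python
from collections import deque


class DlqRateMonitor:
    """Counts DLQ events in a sliding time window and flags an excessive rate."""

    def __init__(self, max_events_per_window: int, window_seconds: float = 60.0):
        self.max_events = max_events_per_window
        self.window = window_seconds
        self._timestamps = deque()

    def record(self, now: float) -> bool:
        """Register one DLQ event at time `now`; return True if the alert should fire."""
        self._timestamps.append(now)
        cutoff = now - self.window
        # Drop events that have fallen out of the window
        while self._timestamps and self._timestamps[0] <= cutoff:
            self._timestamps.popleft()
        return len(self._timestamps) > self.max_events


monitor = DlqRateMonitor(max_events_per_window=3, window_seconds=60.0)
burst = [monitor.record(t) for t in (0.0, 1.0, 2.0, 3.0)]  # 4 events in 3 seconds
later = monitor.record(100.0)                              # isolated event much later
print(burst, later)
```

An isolated bad record stays below the threshold, while a burst (for example, a producer shipping a new schema) trips the alert within seconds.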

Idempotent Writes for Streaming ML

When a Flink job restarts from a checkpoint, it replays events from the Kafka offset saved in the checkpoint. If some of those events were already processed before the crash, their effects were already written to Redis. Replaying them without idempotency causes double-counting: rolling sums are inflated, counts are wrong.

The solution is idempotent writes: writing the same record twice has the same effect as writing it once.
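
The double-counting problem, and the effect of making the write idempotent, can be seen in a few lines of plain Python. This sketch assumes every event carries a unique `event_id` (a hypothetical field, not part of the payment schema shown in this lesson) and keeps the set of applied IDs in memory; a real pipeline would bound that set with a TTL or keep it in keyed state.

```python
def apply_increment(totals: dict, event: dict) -> None:
    """Non-idempotent: every call adds the amount again."""
    uid = event["user_id"]
    totals[uid] = totals.get(uid, 0.0) + event["amount"]


def apply_increment_once(totals: dict, seen_event_ids: set, event: dict) -> bool:
    """Idempotent: an event_id that was already applied is ignored."""
    if event["event_id"] in seen_event_ids:
        return False
    seen_event_ids.add(event["event_id"])
    apply_increment(totals, event)
    return True


events = [
    {"event_id": "e1", "user_id": "u1", "amount": 10.0},
    {"event_id": "e2", "user_id": "u1", "amount": 5.0},
]
replayed = events + [events[1]]  # e2 is delivered again after a restart

naive_totals = {}
for ev in replayed:
    apply_increment(naive_totals, ev)

safe_totals, seen = {}, set()
for ev in replayed:
    apply_increment_once(safe_totals, seen, ev)

print(naive_totals)  # {'u1': 20.0}
print(safe_totals)   # {'u1': 15.0}
```

Deduplicating by event ID suits additive features such as sums and counts. For "latest value" features, comparing a version such as the event time is enough.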

Idempotent Redis Write - Lua Script for Atomic Compare-and-Set

import redis


class IdempotentFeatureWriter:
    """
    Writes feature values to Redis idempotently using event_time as a version.
    If an older event arrives after a newer one (due to replay or out-of-order),
    the write is skipped.
    """

    # Lua script: atomic compare-and-set with version check
    # ARGV layout: [version, field1, value1, ..., fieldN, valueN, ttl_seconds]
    UPSERT_SCRIPT = """
    local current_version = redis.call('HGET', KEYS[1], 'event_time')
    if current_version and tonumber(current_version) >= tonumber(ARGV[1]) then
        return 0 -- current version is same or newer, skip
    end
    -- Build the HSET argument list from ARGV[2..end-1]
    local args = {KEYS[1]}
    for i = 2, #ARGV - 1 do
        args[#args + 1] = ARGV[i]
    end
    redis.call('HSET', unpack(args))
    redis.call('EXPIRE', KEYS[1], tonumber(ARGV[#ARGV]))
    return 1 -- wrote successfully
    """

    def __init__(self, redis_url: str):
        self.client = redis.from_url(redis_url, decode_responses=True)
        self.upsert = self.client.register_script(self.UPSERT_SCRIPT)

    def write(
        self,
        entity_type: str,
        entity_id: str,
        features: dict,
        event_time: float,
        ttl_seconds: int = 3600,
    ) -> bool:
        """
        Write feature values only if event_time is newer than stored version.
        Returns True if write occurred, False if skipped (not newer).
        """
        key = f"{entity_type}_features:{entity_id}"

        # Build flat HSET args: [event_time_field, event_time_value, field1, val1, ...]
        hset_args = ["event_time", str(event_time)]
        for field, value in features.items():
            hset_args.extend([field, str(value)])

        result = self.upsert(
            keys=[key],
            args=[str(event_time)] + hset_args + [str(ttl_seconds)],
        )
        return bool(result)


# Faust-compatible idempotent write
import faust

app = faust.App("fraud-features-idempotent", broker="kafka://kafka:9092")
payments_topic = app.topic("payments", value_type=dict)

# Faust table stores the last written event_time per user for dedup
last_written_ts = app.Table("last_written_ts", default=float)


@app.agent(payments_topic)
async def process_idempotent(payments):
    writer = IdempotentFeatureWriter("redis://redis-primary:6379")

    # group_by with a callable key requires an explicit name for the repartition topic
    async for payment in payments.group_by(lambda p: p["user_id"], name="user_id"):
        uid = payment["user_id"]
        event_time = payment["event_timestamp"]

        # Skip if we've already processed a newer event for this user
        if event_time <= last_written_ts[uid]:
            continue  # Deduplicate replayed events

        # Note: this redis-py call is synchronous and blocks the event loop;
        # acceptable for a sketch, consider an async client in production.
        written = writer.write(
            entity_type="user",
            entity_id=uid,
            features={
                "last_amount": payment["amount"],
                "last_merchant": payment["merchant_id"],
            },
            event_time=event_time,
        )

        if written:
            last_written_ts[uid] = event_time

Replay and Backfill

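When a pipeline fails and falls behind, you eventually need to replay events from a specific point in time. Kafka retains events for a configured period (default 7 days; set to longer for ML pipelines). Replaying means resetting the consumer group offset and letting the job re-process. Two caveats apply. First, the consumer group must be inactive while its offsets are reset. Second, a Flink job restored from a checkpoint or savepoint resumes from the offsets stored in that snapshot, not from the group's committed offsets, so the reset only takes effect if the job is started without restored state and its Kafka source is configured to start from committed offsets.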

Resetting Offsets for Replay

# Option 1: Reset to a specific timestamp (safest - replay from known good point)
kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--group fraud-feature-flink-job \
--topic payments \
--reset-offsets \
--to-datetime 2024-03-15T00:00:00.000 \
--execute

# Option 2: Reset to beginning of topic (full replay)
kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--group fraud-feature-flink-job \
--topic payments \
--reset-offsets \
--to-earliest \
--execute

# Option 3: Reset by relative offset (go back 500,000 events on each partition)
# Note: a comment after a trailing backslash breaks the line continuation,
# so keep comments on their own lines.
kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--group fraud-feature-flink-job \
--topic payments \
--reset-offsets \
--shift-by -500000 \
--execute

Dual Consumer Group Replay - Live and Replay Simultaneously

For zero-downtime recovery: run a separate consumer group for the replay while the live job continues processing new events. The replay group catches up on the backlog. Once it reaches the live position, switch over.

# Start a replay consumer group with a different group ID
# This does NOT interfere with the live consumer group
flink run -p 8 fraud-feature-job.jar \
--consumer-group-id fraud-feature-flink-job-REPLAY-2024-03-15 \
--kafka-offset-reset 2024-03-15T00:00:00.000 \
--write-target redis-staging # Write replay output to staging store first

# Once replay catches up to live position:
# 1. Validate staging store feature values match live store
# 2. Promote staging store to production OR merge corrections into production
# 3. Terminate replay consumer group
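
Step 1 of the checklist above (validate staging against live) can be sketched as a dictionary diff. The sketch assumes both stores have already been read into `{entity_key: {feature: value}}` dictionaries; the function name, the key format, and the tolerance are hypothetical, and numeric features are compared with a relative tolerance to absorb floating-point noise.

```python
import math


def diff_feature_stores(live: dict, staging: dict, rel_tol: float = 1e-6) -> dict:
    """Return {entity_key: {feature: (live_value, staging_value)}} for mismatches."""
    mismatches = {}
    for key in live.keys() | staging.keys():
        live_feats, staging_feats = live.get(key, {}), staging.get(key, {})
        for feat in live_feats.keys() | staging_feats.keys():
            a, b = live_feats.get(feat), staging_feats.get(feat)
            both_numeric = isinstance(a, (int, float)) and isinstance(b, (int, float))
            same = math.isclose(a, b, rel_tol=rel_tol) if both_numeric else a == b
            if not same:
                mismatches.setdefault(key, {})[feat] = (a, b)
    return mismatches


live = {
    "user:1": {"txn_count_1h": 3, "last_merchant": "m1"},
    "user:2": {"txn_count_1h": 7},
}
staging = {
    "user:1": {"txn_count_1h": 3, "last_merchant": "m1"},
    "user:2": {"txn_count_1h": 9},
}
print(diff_feature_stores(live, staging))
```

Differences are expected for entities whose events were affected by the incident; the diff is useful for confirming that everything else matches before promoting the staging store.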

Monitoring Streaming ML Pipelines

Consumer Lag Monitor - Python to Prometheus

import subprocess
import time
from typing import Dict

from prometheus_client import Gauge, start_http_server

# Prometheus metrics
CONSUMER_LAG_GAUGE = Gauge(
    "kafka_consumer_group_lag_total",
    "Total consumer group lag across all partitions",
    ["group", "topic"],
)
CONSUMER_LAG_PER_PARTITION = Gauge(
    "kafka_consumer_group_lag_partition",
    "Consumer group lag per partition",
    ["group", "topic", "partition"],
)


def get_consumer_lag(
    bootstrap_server: str,
    group_id: str,
    topic: str,
) -> Dict[int, int]:
    """
    Use kafka-consumer-groups.sh to get per-partition lag.
    Returns {partition_id: lag}.
    """
    result = subprocess.run(
        [
            "kafka-consumer-groups.sh",
            "--bootstrap-server", bootstrap_server,
            "--describe",
            "--group", group_id,
        ],
        capture_output=True,
        text=True,
        timeout=30,
    )
    if result.returncode != 0:
        # Do not report lag=0 when the tool itself failed
        raise RuntimeError(f"kafka-consumer-groups.sh failed: {result.stderr.strip()}")

    # Parse the output: GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
    lag_by_partition = {}
    for line in result.stdout.splitlines():
        parts = line.split()
        # Header rows and partitions without a committed offset (LAG "-") are skipped
        if len(parts) >= 6 and parts[1] == topic and parts[5].isdigit():
            partition = int(parts[2])
            lag = int(parts[5])
            lag_by_partition[partition] = lag

    return lag_by_partition


ALERT_THRESHOLD = 100_000  # Alert if total lag exceeds 100k events


def monitor_lag(bootstrap_server: str, group_id: str, topic: str) -> int:
    lag_by_partition = get_consumer_lag(bootstrap_server, group_id, topic)
    total_lag = sum(lag_by_partition.values())

    # Update Prometheus metrics
    CONSUMER_LAG_GAUGE.labels(group=group_id, topic=topic).set(total_lag)
    for partition, lag in lag_by_partition.items():
        CONSUMER_LAG_PER_PARTITION.labels(
            group=group_id, topic=topic, partition=str(partition)
        ).set(lag)

    # Alert logic
    if total_lag > ALERT_THRESHOLD:
        print(
            f"[ALERT] Consumer lag CRITICAL: group={group_id} topic={topic} "
            f"total_lag={total_lag:,} (threshold={ALERT_THRESHOLD:,})"
        )
        # In production: send to PagerDuty / Opsgenie / Slack
    elif total_lag > ALERT_THRESHOLD // 2:
        print(
            f"[WARNING] Consumer lag elevated: group={group_id} topic={topic} "
            f"total_lag={total_lag:,}"
        )

    return total_lag


if __name__ == "__main__":
    start_http_server(9100)  # Prometheus scrape endpoint
    while True:
        try:
            monitor_lag(
                bootstrap_server="kafka:9092",
                group_id="fraud-feature-flink-job",
                topic="payments",
            )
        except (RuntimeError, subprocess.TimeoutExpired) as exc:
            # Keep the exporter alive; a failed check is logged, not fatal
            print(f"[ERROR] lag check failed: {exc}")
        time.sleep(30)

Schema Evolution Strategy

Schema changes are inevitable. ML models get new features. Event producers add fields. Upstream teams rename things. Making these changes without breaking your streaming consumer requires a disciplined approach.

Schema Registry with Compatibility Rules

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.schema_registry import Schema

# Schema Registry client
registry_client = SchemaRegistryClient({"url": "http://schema-registry:8081"})

# Current schema - version 1
PAYMENT_SCHEMA_V1 = """
{
"type": "record",
"name": "Payment",
"fields": [
{"name": "user_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "merchant_id", "type": "string"},
{"name": "event_timestamp", "type": "double"}
]
}
"""

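# New schema - version 2: adds optional fields with defaults
# BACKWARD compatible: a consumer using V2 can read old V1 records,
# because the missing fields are filled in from their defaults.
# FORWARD compatible: a consumer still using V1 can read V2 records,
# because Avro schema resolution ignores fields the reader does not declare.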
PAYMENT_SCHEMA_V2 = """
{
"type": "record",
"name": "Payment",
"fields": [
{"name": "user_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "merchant_id", "type": "string"},
{"name": "event_timestamp", "type": "double"},
{"name": "currency_code", "type": "string", "default": "USD"},
{"name": "is_international", "type": "boolean", "default": false}
]
}
"""

# Set FULL compatibility for ML feature topics
# FULL = new schema can read old data AND old consumer can read new data
def set_schema_compatibility(subject: str, compatibility: str = "FULL"):
    registry_client.set_compatibility(subject, compatibility)
    print(f"Set {compatibility} compatibility for subject: {subject}")

# Safe schema evolution workflow:
# Step 1: Add field with default in V2 (backward compatible)
# Step 2: Deploy consumer that handles V2 (reads new field)
# Step 3: Deploy producer that sends V2 (adds new field)
# NEVER: remove a field, change a field type, or add a required field without default

Schema Evolution Rules for ML Topics

| Change | Compatible? | Safe Procedure |
| --- | --- | --- |
| Add optional field with default | Yes | Update schema registry, then deploy consumer, then producer |
| Remove field | No | Mark as deprecated, keep indefinitely, add doc: "deprecated" |
| Change field type (int to float) | No | Add new field with new name and type, deprecate old field |
| Rename field | No | Add field with new name (with default), deprecate old name |
| Add required field (no default) | No | Never. Always use defaults for new fields. |
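
The rules in this table can be encoded as a small pre-merge check over two Avro record schemas. This is a deliberately simplified sketch of the table's policy, not the Schema Registry's compatibility algorithm: it looks only at top-level fields, ignores unions, aliases, and nested records, and is stricter than Avro itself (which permits some type promotions). The function name and the V3 example schema are hypothetical.

```python
import json


def check_schema_change(old_schema: dict, new_schema: dict) -> list:
    """Return a sorted list of rule violations; an empty list means the change passes."""
    old = {f["name"]: f for f in old_schema["fields"]}
    new = {f["name"]: f for f in new_schema["fields"]}
    problems = []
    for name in new.keys() - old.keys():
        if "default" not in new[name]:
            problems.append(f"added field '{name}' has no default")
    for name in old.keys() - new.keys():
        # A rename also shows up here, as one removed and one added field
        problems.append(f"field '{name}' was removed")
    for name in old.keys() & new.keys():
        if old[name]["type"] != new[name]["type"]:
            problems.append(f"field '{name}' changed type")
    return sorted(problems)


V1 = json.loads("""{"type": "record", "name": "Payment", "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "merchant_id", "type": "string"},
    {"name": "event_timestamp", "type": "double"}]}""")

V2 = json.loads("""{"type": "record", "name": "Payment", "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "merchant_id", "type": "string"},
    {"name": "event_timestamp", "type": "double"},
    {"name": "currency_code", "type": "string", "default": "USD"},
    {"name": "is_international", "type": "boolean", "default": false}]}""")

# Hypothetical bad change: type change, removed field, new field without default
V3 = json.loads("""{"type": "record", "name": "Payment", "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "amount", "type": "float"},
    {"name": "event_timestamp", "type": "double"},
    {"name": "channel", "type": "string"}]}""")

print(check_schema_change(V1, V2))
print(check_schema_change(V1, V3))
```

A check like this is cheap enough to run in CI on every schema pull request; the registry's own compatibility setting remains the authoritative gate.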

Capacity Planning

Computing Required Partitions and Parallelism

$$\text{partitions} = \left\lceil \frac{\text{target\_throughput (events/s)}}{\text{throughput\_per\_consumer (events/s)}} \right\rceil$$

$$\text{flink\_parallelism} = \text{partitions (for source operator)}$$

For ML feature pipelines, throughput per consumer depends on processing complexity:

  • Simple aggregation: 50,000–100,000 events/s per task
  • With Redis write per event: 10,000–30,000 events/s per task
  • With ML inference per event: 1,000–5,000 events/s per task

Capacity Planning Script

import math
from dataclasses import dataclass
from typing import Dict

# Throughput benchmarks per processing type (events/s per task slot)
THROUGHPUT_BENCHMARKS: Dict[str, int] = {
    "aggregation_only": 100_000,
    "aggregation_with_redis_write": 25_000,
    "aggregation_with_batch_redis_write": 50_000,
    "ml_inference_sklearn": 5_000,
    "ml_inference_onnx": 15_000,
}


@dataclass
class PipelineCapacityPlan:
    target_throughput_eps: int  # events per second
    processing_type: str  # a key of THROUGHPUT_BENCHMARKS
    peak_multiplier: float = 2.0  # plan for 2x normal peak
    kafka_partition_safety_factor: float = 1.5

    def compute(self) -> dict:
        peak_throughput = self.target_throughput_eps * self.peak_multiplier
        # Unknown processing types fall back to the per-event Redis write benchmark
        throughput_per_task = THROUGHPUT_BENCHMARKS.get(
            self.processing_type,
            THROUGHPUT_BENCHMARKS["aggregation_with_redis_write"],
        )

        # Flink parallelism (task slots per operator)
        required_parallelism = math.ceil(peak_throughput / throughput_per_task)
        # Round up to next power of 2 for clean partition division
        flink_parallelism = 2 ** math.ceil(math.log2(max(required_parallelism, 1)))

        # Kafka partitions (at least equal to max Flink parallelism)
        kafka_partitions = math.ceil(flink_parallelism * self.kafka_partition_safety_factor)

        # Memory estimate for RocksDB state
        # Assume: 1 million unique users, 500 bytes per user state
        num_entities = 1_000_000
        bytes_per_entity = 500
        total_state_bytes = num_entities * bytes_per_entity
        state_per_task = total_state_bytes / flink_parallelism
        rocksdb_memory_gb = (state_per_task * 2) / (1024 ** 3)  # 2x for RocksDB overhead

        # Each TaskManager hosts several slots, so it needs memory for all of them
        num_task_managers = max(2, flink_parallelism // 4)
        slots_per_task_manager = math.ceil(flink_parallelism / num_task_managers)
        ram_per_task_manager_gb = math.ceil(
            rocksdb_memory_gb * slots_per_task_manager * 1.5
        )

        return {
            "target_throughput_eps": self.target_throughput_eps,
            "peak_throughput_eps": peak_throughput,
            "flink_parallelism": flink_parallelism,
            "kafka_partitions": kafka_partitions,
            "throughput_per_task": throughput_per_task,
            "rocksdb_memory_per_task_gb": round(rocksdb_memory_gb, 2),
            "total_rocksdb_memory_gb": round(rocksdb_memory_gb * flink_parallelism, 2),
            "recommendation": (
                f"Run {flink_parallelism} Flink task slots across "
                f"{num_task_managers} TaskManagers. "
                f"Set {kafka_partitions} Kafka partitions. "
                f"Allocate {ram_per_task_manager_gb} GB RAM per TaskManager."
            ),
        }


# Example: 100,000 events/second with Redis write per event
plan = PipelineCapacityPlan(
    target_throughput_eps=100_000,
    processing_type="aggregation_with_redis_write",
    peak_multiplier=2.0,
)
result = plan.compute()
for k, v in result.items():
    print(f"{k}: {v}")

Reliability Architecture Diagram


Alerting Runbook Structure

For each alert, the runbook must document exactly four things. Teams that skip this documentation spend 45 minutes in the middle of an incident figuring out what the alert means.

## Alert: kafka_consumer_group_lag_total > 100000

**What it means**: The Flink feature job is falling behind. It is processing events
slower than they arrive. Feature freshness is degrading. The fraud model will start
running on stale features within minutes.

**Immediate action (do this first)**:
1. Check Flink Web UI → are all 8 task slots running? If not, the job crashed.
2. Check Flink metrics → which operator is the bottleneck (highest backpressure)?
3. Check Redis latency → if Redis is slow, the Redis write operator is blocking.
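4. If job crashed and did not restart on its own: resubmit from the most recent retained checkpoint
(Flink writes each checkpoint to `<checkpoint-dir>/<job-id>/chk-<n>`; there is no `latest` alias)
`flink run -s s3://ml-platform-checkpoints/fraud-feature-job/<job-id>/chk-<n> -p 8 fraud-feature-job.jar`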

**Root cause investigation**:
- Traffic spike: check Kafka topic throughput metrics in Grafana
- Slow Redis: check Redis cluster metrics (CPU, memory, connection count)
- Processing bottleneck: look for long GC pauses in Flink TaskManager logs
- Skewed partitions: check per-partition lag - if one partition has 90% of the lag, hot key

**Resolution criteria**: alert resolves when lag drops below 10,000 and is decreasing

Common Mistakes

:::danger No Dead Letter Queue The single most common reliability mistake. A pipeline without a DLQ will halt on the first malformed event and stay halted - silently - until someone notices the lag alert. Implement the Flink SideOutput DLQ pattern before deploying anything to production. It is 20 lines of code that prevents the most common class of 3 AM incidents. :::

:::danger Checkpoints on Local Disk Only Storing Flink checkpoints on the TaskManager's local disk means that if the TaskManager machine fails, the checkpoint is gone. Use S3, GCS, or HDFS for checkpoint storage. Always. The checkpoint configuration example above shows the correct FileSystemCheckpointStorage setup. :::

:::warning Not Taking a Savepoint Before Deployment Deploying a new version of a Flink job without a savepoint means you cannot roll back if the new version has a bug. If you cancel the old job and the new version fails, you have no recovery point - you must replay from Kafka (potentially hours of data). Always: flink savepoint <job-id> before flink cancel. :::

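:::warning Schema Changes Without Backward Compatibility Test A producer starts sending a new schema that your consumer cannot parse. Consumer crashes. DLQ fills. Lag explodes. Run a compatibility test before any schema change: ask Schema Registry to check the proposed schema against the subject's registered versions (its compatibility endpoint, or test_compatibility in the Python client), and confirm in staging that a consumer built with the old schema can still read a topic receiving the new schema. If either check fails, the change is not safe to deploy. :::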

:::tip Use Batch Redis Writes for Throughput Writing to Redis after every individual event is expensive. A pipeline processing 50,000 events/second with per-event Redis writes will spend more time waiting for Redis than computing features. Batch Redis writes using the pipeline API: accumulate 100–1000 feature records, then issue a single pipeline flush. This can improve throughput 5–10x with no semantic change to the output.

# Batch Redis writes using a pipeline - one round trip per batch instead of one per command
pipeline = redis_client.pipeline(transaction=False)
for feature_record in batch:
    key = f"user_features:{feature_record['user_id']}"
    pipeline.hset(key, mapping=feature_record)
    pipeline.expire(key, 900)
pipeline.execute()  # Single round trip for the entire batch

:::


Interview Q&A

Q: What is consumer lag and what does it tell you about a streaming pipeline's health?

A: Consumer lag is the difference between the latest offset in a Kafka partition and the offset the consumer group has committed - it represents the number of events that have arrived in Kafka but have not yet been processed. A lag of zero means the consumer is keeping up with the producer in real time. Growing lag means the consumer is falling behind: either events are arriving faster than they can be processed, or the consumer is stalled entirely.

For ML feature pipelines, consumer lag directly translates to feature staleness. If your pipeline has 500,000 events of lag and processes 50,000 events per second, your features are approximately 10 seconds stale. If lag is growing faster than it is shrinking, staleness is increasing continuously. Monitor lag with kafka-consumer-groups.sh --describe and alert when total lag across all partitions exceeds your freshness SLA's equivalent in event count.
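
The 10-second figure is a lower bound: it assumes no new events arrive while the backlog drains. When events keep arriving, only the surplus capacity works off the lag. A small sketch of that arithmetic follows; the function name and the 40,000 events/s arrival rate are assumptions for illustration.

```python
def catch_up_seconds(lag_events: float, processing_eps: float, arrival_eps: float) -> float:
    """Time to drain the backlog while new events keep arriving.

    Returns infinity when the consumer cannot outpace the producer.
    """
    surplus = processing_eps - arrival_eps
    if surplus <= 0:
        return float("inf")
    return lag_events / surplus


# 500,000 events of lag, 50,000 events/s processed, 40,000 events/s arriving (assumed)
print(catch_up_seconds(500_000, 50_000, 40_000))  # 50.0
# Same lag, but arrivals match processing capacity: the lag never shrinks
print(catch_up_seconds(500_000, 50_000, 50_000))  # inf
```

This is why headroom matters: a pipeline sized exactly for average traffic can never recover from a backlog.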

Q: What is the difference between a checkpoint and a savepoint in Flink?

A: Both are consistent snapshots of all operator state in a Flink job, but they serve different purposes and have different lifecycle management.

A checkpoint is automatically triggered by Flink on a configured interval (e.g., every 60 seconds). It is used for automatic failure recovery - if the job crashes, Flink restarts from the last successful checkpoint and resumes processing from the corresponding Kafka offsets. Checkpoints are automatically cleaned up by Flink (unless you configure RETAIN_ON_CANCELLATION).

A savepoint is manually triggered by an operator (flink savepoint <job-id>). It is used for planned operations: deploying new code without losing state, changing parallelism, migrating Flink versions. Savepoints are never automatically deleted - they are retained indefinitely until you explicitly remove them. The practical rule: trigger a savepoint before every planned deployment so you can roll back if the new version has a bug.

Q: How do you implement a Dead Letter Queue in a streaming ML pipeline?

A: The standard Flink approach uses the SideOutput API. Every operator that can encounter a bad event (deserialization failures, missing fields, processing errors) catches all exceptions and emits the failing record to a side output stream tagged with OutputTag("dlq"). This side output stream is wired to a separate Kafka sink that writes to a {topic}.dlq topic.

The DLQ record should include: the original event bytes (encoded as base64 so it is recoverable), the exception class, the truncated stack trace (first 2000 characters), an error category (deserialization, schema_mismatch, processing_error), the pipeline version, and the timestamp when the error occurred. This metadata makes it possible for an operator to inspect the DLQ, understand whether the failures are systemic (new schema breaking all records) or isolated (a single malformed event), and decide whether to replay the corrected records or discard them.
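The record layout described above can be sketched in plain Python, independent of Flink. Everything named here is hypothetical: the field names, the `categorize` mapping from exception type to category, and the default pipeline version are assumptions for illustration. In a Flink job, the dictionary returned on failure would be emitted to the side output rather than returned.

```python
import base64
import json
import time
import traceback

MAX_STACK_TRACE_CHARS = 2000


def categorize(exc):
    """Hypothetical mapping from exception type to an error category."""
    if isinstance(exc, (UnicodeDecodeError, json.JSONDecodeError)):
        return "deserialization"
    if isinstance(exc, (KeyError, TypeError)):
        return "schema_mismatch"
    return "processing_error"


def build_dlq_record(raw_event, exc, pipeline_version):
    """Wrap the failing bytes with enough metadata to triage and replay."""
    stack = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    return {
        "original_event_b64": base64.b64encode(raw_event).decode("ascii"),
        "exception_class": type(exc).__name__,
        "stack_trace": stack[:MAX_STACK_TRACE_CHARS],
        "error_category": categorize(exc),
        "pipeline_version": pipeline_version,
        "failed_at_ms": int(time.time() * 1000),
    }


def process_or_dead_letter(raw_event, pipeline_version="1.4.2"):
    """Return (features, None) on success or (None, dlq_record) on failure."""
    try:
        event = json.loads(raw_event)
        features = {"user_id": event["user_id"], "amount": float(event["amount"])}
        return features, None
    except Exception as exc:  # broad by design: a bad record must not crash the job
        return None, build_dlq_record(raw_event, exc, pipeline_version)
```

Keeping the original bytes base64-encoded, rather than a parsed form, means the record can be replayed even when the failure was in parsing itself.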

Q: How do you handle schema evolution in a Kafka topic without downtime?

A: Use Confluent Schema Registry with FULL or BACKWARD compatibility mode enforced. BACKWARD compatibility means consumers using the new schema can read data written with the old schema, so consumers can be upgraded ahead of producers. FULL compatibility adds the reverse (FORWARD) guarantee: consumers still on the old schema can also read data written with the new schema.

The safe procedure for adding a new field: (1) register the new schema in the Schema Registry - if compatibility check passes, proceed; (2) deploy the consumer update that can handle both old and new schema (reads the new field with a default if absent); (3) deploy the producer update that starts sending the new field. The ordering matters: consumer first, producer second. This ensures there is never a moment when a consumer receives data it cannot parse.
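To make the "new field needs a default" rule concrete, here is a deliberately simplified compatibility check over Avro-style record schemas expressed as Python dicts. It is not the Schema Registry's algorithm: it only looks at top-level fields, it conservatively treats any type change as incompatible (real Avro allows some promotions), and the function names and the `Payment` schema are made up for the example.

```python
def fields_by_name(schema):
    """Index a record schema's top-level fields by field name."""
    return {f["name"]: f for f in schema["fields"]}


def backward_compatibility_errors(old_schema, new_schema):
    """Return reasons why a reader on new_schema could not read old_schema data.

    Simplified: flags added fields without defaults and changed field types.
    Ignores Avro type promotion, unions, aliases and nested records.
    """
    old_fields = fields_by_name(old_schema)
    errors = []
    for name, field in fields_by_name(new_schema).items():
        if name not in old_fields:
            if "default" not in field:
                errors.append(f"new field '{name}' has no default")
        elif field["type"] != old_fields[name]["type"]:
            errors.append(
                f"field '{name}' changed type "
                f"{old_fields[name]['type']} -> {field['type']}"
            )
    return errors


base_fields = [
    {"name": "user_id", "type": "string"},
    {"name": "amount", "type": "double"},
]
old = {"type": "record", "name": "Payment", "fields": base_fields}
good = {"type": "record", "name": "Payment", "fields": base_fields + [
    {"name": "merchant_category", "type": ["null", "string"], "default": None},
]}
bad = {"type": "record", "name": "Payment", "fields": base_fields + [
    {"name": "merchant_category", "type": "string"},
]}

good_errors = backward_compatibility_errors(old, good)
bad_errors = backward_compatibility_errors(old, bad)
```

Here `good_errors` is empty and `bad_errors` reports the missing default. A check like this is only a fast pre-commit signal; the registry's own compatibility check at registration time remains the authority.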

Never remove a field from a Kafka schema used by ML pipelines. Mark it deprecated in the doc field and leave it forever. Downstream consumers may depend on it, and historical data in the offline store uses it for training.

Q: How do you replay events from Kafka after a pipeline failure?

A: Kafka retains events for the configured retention period (set this to at least 30 days for ML pipelines - the default 7 days is often insufficient for incident recovery). Replay is done by resetting the consumer group offset to a specific point.

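Stop the job first, because offsets can only be reset while the consumer group is inactive. Then use kafka-consumer-groups.sh --reset-offsets --to-datetime <ISO-8601-timestamp> (adding --execute to apply the change rather than preview it) to set the offset to the precise point in time you want to replay from. Then restart the Flink job so that it starts from the group's committed offsets. Note that a job restored from a checkpoint or savepoint resumes from the offsets stored in that snapshot, not from the reset consumer group offsets.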

For zero-downtime replay: create a separate consumer group with a different ID, point it at the historical offset, and run the replay job writing to a staging feature store. Once it catches up to the live position, validate the staging output matches expectations, then promote. This lets the live job continue serving predictions while the replay job rebuilds the backlog.

Q: How would you capacity plan a Flink job for 100,000 events/second?

A: Start with the baseline throughput of 100,000 events/second and plan for 2x headroom, which gives 200,000 events/second. Then measure the throughput achievable per Flink task slot for your specific processing logic. As rough orders of magnitude: simple aggregation achieves 100,000+ events/second per slot; aggregation with Redis writes drops to 25,000–50,000 per slot; ML inference per event may be 5,000–15,000 per slot.

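For 200,000 events/second with Redis writes (25,000 per slot): you need 200,000 / 25,000 = 8 task slots minimum. Round to 8 or 16 (powers of 2 for clean partition division). Set Kafka partitions to at least 12 (1.5x the Flink parallelism for safety margin); choosing a multiple of the parallelism, such as 16, keeps the number of partitions per subtask even.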

For memory: estimate state size from entity cardinality × bytes per entity. 1 million users × 500 bytes = 500 MB total. Divide by a parallelism of 8 to get 62.5 MB of state per task slot. Multiply by 2–3x for RocksDB overhead and JVM heap. Add checkpoint overhead (incremental checkpoints need extra disk for delta files). Provision TaskManagers with at least 8 GB RAM per 4 task slots for typical ML feature pipelines.
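The sizing arithmetic in this answer can be reproduced with a short calculation. This is a back-of-the-envelope sketch using the numbers from the answer; the function names are hypothetical, and the per-slot throughput should come from your own measurements rather than these defaults.

```python
import math


def required_task_slots(baseline_eps, headroom_factor, per_slot_eps):
    """Slots needed to sustain baseline throughput times a headroom factor."""
    return math.ceil(baseline_eps * headroom_factor / per_slot_eps)


def state_mb_per_slot(entities, bytes_per_entity, parallelism):
    """Raw keyed state per task slot, in MB (1 MB = 1,000,000 bytes)."""
    total_mb = entities * bytes_per_entity / 1_000_000
    return total_mb / parallelism


slots = required_task_slots(100_000, 2, 25_000)        # 8 slots
state_mb = state_mb_per_slot(1_000_000, 500, slots)    # 62.5 MB per slot
budget_low, budget_high = state_mb * 2, state_mb * 3   # 125.0 to 187.5 MB
```

The 2–3x multiplier gives a per-slot state budget of roughly 125 to 190 MB, small next to the 8 GB per 4 slots recommendation. That gap is intentional headroom: state grows with entity cardinality, and network buffers and JVM overhead are not included in this estimate.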

© 2026 EngineersOfAI. All rights reserved.