:::tip 🎮 Interactive Playground Visualize this concept: Try the Stream Pipeline Viz demo on the EngineersOfAI Playground - no code required. :::
Data Pipeline Patterns for AI/ML Workflows
The Pipeline That Corrupted Six Months of Training Data
It's a Tuesday morning when Marcus, a senior data engineer at a fintech company processing millions of transactions daily, gets a message from the ML team: "The fraud model's AUC dropped from 0.94 to 0.81 overnight. We're holding the deployment." Marcus pulls up the feature pipeline logs. The training data assembly job, which runs every Sunday at 2 AM, completed successfully. No errors. He checks the feature store. The features look reasonable. He checks the model training metrics. The validation AUC had also been declining gradually over the past six weeks, but no one had noticed because each individual weekly drop was within noise tolerance.
He starts digging into the feature pipeline code. The rolling_7d_transaction_count feature is supposed to count the number of transactions a user made in the seven days before each labeled event. But six weeks ago, a colleague fixed a bug in the pipeline that was double-counting cancelled transactions. The fix was correct. The pipeline was redeployed. The problem: the fix could not be applied retroactively. The pipeline had no mechanism to re-run against historical data, so the fix applied only going forward. Now the training dataset contained six weeks of data computed with the corrected logic and twenty-two weeks of data computed with the buggy logic. The model was trained on a mixed distribution that matched neither historical reality nor the current production feature computation.
The remediation took three weeks. The ML team had to invalidate six months of training data, rerun the feature computation from bronze sources all the way through the feature store, and retrain the model from scratch. The root cause was not the bug - bugs happen. The root cause was that the pipeline was not designed for reprocessing. There was no backfill capability, no idempotency guarantee, and no mechanism to audit which version of the logic produced which row of data.
What Marcus learned that week is the idea at the center of this lesson: pipeline patterns are not optional architecture decisions - they are the difference between a data platform that can be trusted and one that produces silent corruption. Idempotency, backfill, exactly-once semantics, dead letter queues - these are not engineering luxuries. They are the minimum requirements for a production data system that serves ML models where data quality directly affects business outcomes.
This lesson covers the patterns in depth: not just what they are, but why they are necessary, how to implement them, and where they break in practice.
Why This Exists
The Era of Brittle Pipelines
Before these patterns were formalized, data pipelines were sequences of scripts. Job A extracts from the database. Job B transforms the extract. Job C loads into the warehouse. Each job is triggered by the previous one's success. When Job B fails halfway through, the team re-runs it manually. But Job B has already processed and written the first half of the records. Re-running it processes all the records again - including the first half - which now appear twice in the warehouse. The data is corrupted. The analyst who notices the duplication files a bug report. The engineer who investigates spends days trying to determine which records are legitimate and which are duplicates.
This was not an edge case. It was the normal state of data pipelines in 2010. Every company operating at scale had a backlog of data quality issues caused by non-idempotent pipelines. The patterns in this lesson exist because teams were burned by these failures enough times to formalize the solutions.
The Cloud Warehouse Shift
The shift from on-premise data warehouses to cloud warehouses (Redshift, BigQuery, Snowflake) changed what was architecturally feasible. Compute and storage became cheap and elastically scalable. This enabled ELT as a viable alternative to ETL - load raw data first, transform inside the warehouse using cheap compute. It enabled Lambda architecture - run a batch layer on the full dataset and a speed layer on recent data simultaneously. It enabled on-demand backfill - reprocess six months of history by spinning up a large Spark cluster for a few hours instead of waiting for a dedicated cluster to become available.
The patterns we cover in this lesson were all developed in response to real production problems, refined over years of operational experience, and are now standard knowledge for any data engineer working at scale.
Historical Context
ETL (Extract, Transform, Load) dates to the 1970s as a concept and was the dominant pattern through the 2000s. Commercial ETL tools (Informatica, SSIS, DataStage) generated significant revenue. The transformation happened outside the warehouse - in the ETL tool itself - because warehouses were expensive and storage was the bottleneck.
ELT (Extract, Load, Transform) emerged around 2010–2013 as cloud warehouses became viable. The insight: load raw data first, then transform inside the warehouse using SQL. The warehouse is now the computation engine, not just the storage layer.
Lambda architecture was introduced by Nathan Marz in 2011, starting with his blog post "How to beat the CAP theorem." Marz developed the ideas at BackType, an analytics startup that Twitter acquired in 2011. The insight was to run batch processing (high accuracy, high latency) and stream processing (approximate results, low latency) in parallel, merging their outputs in a serving layer.
Kappa architecture was proposed by Jay Kreps (Kafka co-creator) in 2014 as a critique of Lambda. Kreps argued that maintaining two separate code paths (batch and streaming) was too complex - unified stream processing should handle everything.
Exactly-once semantics in Kafka was not available until version 0.11 (released 2017). Before that, exactly-once was not achievable at the message broker level, and engineers had to implement idempotent consumers to handle at-least-once delivery.
ETL vs ELT: Why ELT Won
ETL: The Classic Pattern
In ETL, data moves through three sequential stages: Extract from the source, Transform using an external compute engine, Load the transformed result into the destination.
The extract might be a SQL dump from Postgres. The transform happens in a Python script or an Informatica job - cleaning nulls, casting types, applying business logic. The load writes the result to the data warehouse.
The critical characteristic: transformation happens before the data reaches the warehouse. The warehouse never sees raw data.
ETL works well when:
- Raw data volume is very large and the transformed result is much smaller (e.g., pre-aggregate before loading to avoid warehouse storage costs)
- The destination system can't run arbitrary compute (an OLTP database, not a warehouse)
- Sensitive data must be masked or tokenized before it reaches the warehouse (compliance requirement)
- Transformation requires external systems - an API call to enrich each record, for example
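The compliance case is the easiest to make concrete. Below is a minimal sketch of an ETL transform step that tokenizes PII before the load, assuming a keyed HMAC-SHA256 token is acceptable for your compliance requirements. The field names and the `TOKEN_KEY` value are hypothetical; in practice the key would come from a secrets manager.

```python
import hashlib
import hmac

# Hypothetical key: in production, load it from a secrets manager, never hard-code it.
TOKEN_KEY = b"hypothetical-secret-from-a-secrets-manager"
PII_FIELDS = ("email", "phone")  # hypothetical column names


def tokenize(value: str) -> str:
    """Deterministic keyed token: same input -> same token, so joins still work."""
    return hmac.new(TOKEN_KEY, value.encode("utf-8"), hashlib.sha256).hexdigest()


def transform_before_load(record: dict) -> dict:
    """ETL transform step: replace PII with tokens so raw values never reach the warehouse."""
    masked = dict(record)  # copy; leave the extracted record untouched
    for field in PII_FIELDS:
        if masked.get(field) is not None:
            masked[field] = tokenize(str(masked[field]))
    return masked


row = {"user_id": "u1", "email": "ada@example.com", "phone": None, "amount_usd": 12.5}
print(transform_before_load(row))
```

Because the token is deterministic, analysts can still count distinct users or join on the tokenized column, but the raw email address exists only in the extraction process.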
ETL fails when:
- Transformation logic has bugs. Raw data is gone - you can only re-extract from source, which may be slow or rate-limited.
- Business requirements change and you need to recompute with different logic. No raw data = recompute from scratch from source.
- Schema evolution in the source breaks the transformation step. The load never happens and data is lost for the failed window.
ELT: Load First, Transform in the Warehouse
In ELT, data is extracted from source, loaded immediately to the warehouse in raw form, and transformations are applied inside the warehouse using SQL (typically via dbt).
The raw data is always available. If a transformation has a bug, fix the transformation and rerun it - the raw source data is sitting in the warehouse's raw/bronze layer. Recomputing from bronze is fast (no network round trip to the source system) and cheap (warehouse compute is elastic).
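A small sketch of that recovery loop follows, using SQLite as a stand-in for a cloud warehouse (an assumption; in practice dbt would usually manage the SQL). Raw rows land in a hypothetical bronze_orders table exactly as received, and silver_orders is rebuilt from bronze by a SQL transform. When the v1 transform turns out to be buggy, the fix is a change to the SQL followed by a rebuild; the source system is never touched.

```python
import sqlite3

# Hypothetical extract, loaded to bronze exactly as received (all strings).
RAW_ORDERS = [
    ("o1", "19.99", "completed"),
    ("o2", "5.00", "cancelled"),
    ("o3", "n/a", "completed"),  # malformed amount stays in bronze; transform filters it
]

# v1 transform has a bug: it forgets to exclude cancelled orders.
SILVER_V1 = """
    CREATE TABLE silver_orders AS
    SELECT order_id, CAST(amount AS REAL) AS amount_usd
    FROM bronze_orders
    WHERE amount GLOB '[0-9]*'
"""
# v2 fixes the bug. Only the SQL changes; bronze is untouched.
SILVER_V2 = SILVER_V1 + " AND status != 'cancelled'"


def load_bronze(conn: sqlite3.Connection) -> None:
    """The 'L' in ELT: land raw rows without transforming them."""
    conn.execute("CREATE TABLE bronze_orders (order_id TEXT, amount TEXT, status TEXT)")
    with conn:
        conn.executemany("INSERT INTO bronze_orders VALUES (?, ?, ?)", RAW_ORDERS)


def rebuild_silver(conn: sqlite3.Connection, transform_sql: str) -> int:
    """The 'T' in ELT: recompute silver from bronze inside the warehouse."""
    with conn:
        conn.execute("DROP TABLE IF EXISTS silver_orders")
        conn.execute(transform_sql)
    return conn.execute("SELECT COUNT(*) FROM silver_orders").fetchone()[0]


conn = sqlite3.connect(":memory:")
load_bronze(conn)
print(rebuild_silver(conn, SILVER_V1))  # buggy: includes the cancelled order
print(rebuild_silver(conn, SILVER_V2))  # fixed: recomputed from bronze only
```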
ELT works well when:
- Cloud warehouse costs are low enough that storing raw data doesn't break the budget
- Transformation logic evolves frequently (iteration is cheap - just re-run the SQL)
- Multiple teams need access to raw data for different purposes (a single bronze table serves analytics, ML, and compliance)
- Transformation is primarily SQL-expressible (most OLAP transformations are)
ELT fails when:
- The source data contains PII that must never reach the warehouse. ETL with masking at the extraction stage is required.
- Transformation requires ML model inference on each row (enrich each record with a sentiment score, for example). SQL can't do this; Python in the extraction stage or a separate post-load enrichment step is needed.
- Storage costs are genuinely prohibitive (rare with modern cloud warehouses, but real for extreme-scale IoT data)
:::tip The practical answer For most AI/ML data platforms built today: use ELT. Load raw data to bronze. Transform with dbt. The ability to reprocess from bronze when bugs are found is worth the storage cost.
Reserve ETL patterns for compliance-sensitive data that must be masked before storage, or for enrichment workflows that require external API calls per record. :::
Lambda Architecture: Two Layers, One Answer
The Lambda architecture was a breakthrough when it was proposed - a systematic way to have both low latency and high accuracy.
The Three Layers
Batch layer - runs on the complete dataset, recomputing results from scratch periodically (nightly or weekly). Results are highly accurate because they process all historical data. Latency is high - results are available hours after the data arrives. Technology: Spark, Hive, dbt on a warehouse.
Speed layer - processes only the most recent data (the last few hours or days) using a streaming engine. Results are available within seconds or minutes but may be approximate (because they haven't seen all historical data). Technology: Kafka Streams, Flink, Spark Structured Streaming.
Serving layer - merges the batch layer's accurate historical results with the speed layer's recent approximations. A query for "total orders in the last 30 days" returns: the batch layer's answer for days 1–27 (highly accurate) plus the speed layer's answer for the last 3 days (approximate). The combination is low-latency and accurate for all but the most recent days, and the next batch run replaces the speed layer's approximations with exact results.
Lambda in ML Context
For a recommendation model's feature computation:
- Batch layer: every night, recompute user_7d_order_count from all raw events. Accurate, always self-consistent.
- Speed layer: every minute, process new order events from Kafka and update a running counter in Redis.
- Serving layer: at inference time, the feature server reads the nightly batch result from the warehouse and adds the Redis counter for today's events.
The model gets a feature that is accurate for the last 7 days and updated within minutes for the most recent orders.
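A minimal sketch of the serving-layer merge for this feature is below. It assumes the batch layer materializes per-day order counts rather than a single 7-day total, so the window can slide forward each day without double counting; the dicts stand in for the warehouse table and the Redis counter, and all names are hypothetical.

```python
from datetime import date, timedelta

WINDOW_DAYS = 7  # the window includes today plus the 6 previous days


def serve_user_7d_order_count(
    user_id: str,
    today: date,
    batch_daily_counts: dict[tuple[str, date], int],
    speed_today_counts: dict[str, int],
) -> int:
    """Serving-layer merge: batch result for past days + speed-layer count for today.

    batch_daily_counts: nightly batch output, orders per (user, day) for days before today.
    speed_today_counts: streaming counter (stand-in for Redis), orders since midnight.
    """
    batch_part = sum(
        batch_daily_counts.get((user_id, today - timedelta(days=k)), 0)
        for k in range(1, WINDOW_DAYS)  # the 6 full days before today
    )
    return batch_part + speed_today_counts.get(user_id, 0)


today = date(2024, 3, 8)
batch = {
    ("u1", date(2024, 3, 7)): 2,
    ("u1", date(2024, 3, 2)): 1,
    ("u1", date(2024, 3, 1)): 5,  # 7 days ago: outside the window
}
print(serve_user_7d_order_count("u1", today, batch, {"u1": 4}))  # 2 + 1 + 4 = 7
```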
Lambda's Problems
Lambda has one fatal flaw: two code paths for the same logic.
The batch aggregation SQL that computes user_7d_order_count over 90 days of Parquet files is different code from the Flink streaming job that increments a Redis counter per event. Both implement the same business logic. When the business logic changes - say, "cancelled orders should not count toward order history" - you must update both code paths and keep them in sync.
In practice, they drift. The batch layer handles edge cases the streaming layer doesn't. The streaming layer uses a slightly different join key. The serving layer is merging results that were computed with subtly different semantics. The resulting features have small inconsistencies that are difficult to diagnose.
Kappa Architecture: Unified Streaming
Jay Kreps proposed Kappa in 2014 as a simplification: eliminate the batch layer. Treat all data as a stream. Reprocess historical data by replaying the event log from the beginning.
The Core Insight
A batch processing job is just a streaming job that happens to start at the beginning of the data and stop at the end. If your streaming engine (Kafka + Flink, or Spark Structured Streaming) can replay historical events and produce the same results as a batch job, you don't need a separate batch layer.
Kafka's log retention (configurable, can be infinite with tiered storage) is the key enabler. Keep all events in Kafka. When you need to reprocess history - deploy a new version of the streaming job, point it at the beginning of the Kafka log, let it catch up, then promote it to the serving layer.
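A toy sketch of the Kappa idea follows, with a Python list standing in for the retained Kafka log and a dict standing in for the serving layer (both assumptions). The live job and the reprocessing job run the same per-event function, so a business-logic change means writing one new function and replaying the log through it from offset 0.

```python
from collections import Counter
from typing import Callable, Iterable

State = Counter  # user_id -> order count


def apply_v1(state: State, event: dict) -> None:
    if event["type"] == "order":
        state[event["user_id"]] += 1


def apply_v2(state: State, event: dict) -> None:
    # New business rule: cancelled orders no longer count.
    if event["type"] == "order" and event.get("status") != "cancelled":
        state[event["user_id"]] += 1


def replay(log: Iterable[dict], apply_fn: Callable[[State, dict], None]) -> State:
    """Reprocess history: the same per-event logic, started from the beginning of the log."""
    state: State = Counter()
    for event in log:
        apply_fn(state, event)
    return state


event_log = [
    {"type": "order", "user_id": "u1", "status": "completed"},
    {"type": "order", "user_id": "u1", "status": "cancelled"},
    {"type": "click", "user_id": "u2"},
]
serving = {"live": replay(event_log, apply_v1)}  # v1 job, fully caught up
candidate = replay(event_log, apply_v2)          # v2 job replays the log from offset 0
serving["live"] = candidate                      # promote once the new job has caught up
print(serving["live"])
```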
Kappa's Trade-offs
Simpler code path - one streaming job, not two codebases. Business logic changes happen in one place.
More complex infrastructure - Kafka with long retention (terabytes or petabytes of event log) is operationally intensive. The cluster must be sized for both throughput and retention. Compacted topics, tiered storage, and cross-region replication add operational surface area.
Reprocessing is slow - replaying three years of event history through a Flink job to produce updated features takes time. A batch-optimized Spark job reading Parquet files from S3 is often 10–100x faster for historical reprocessing than a streaming job replaying Kafka. Kappa's reprocessing cost is real.
Not suitable for all workloads - complex joins over large historical windows (join today's events with six months of user history) are awkward in streaming. They require windowed state, which at large scale requires careful state backend management (RocksDB in Flink) and can cause excessive memory pressure.
Idempotency: The Most Important Pipeline Property
An idempotent pipeline produces the same output regardless of how many times it is run with the same input. Re-running an idempotent pipeline for the same time window produces no duplicates, no missing records, no data corruption.
Why Pipelines Must Be Idempotent
Pipelines fail. Networks time out. Workers crash. Cloud spot instances are reclaimed mid-job. When a pipeline fails and is retried - and all production orchestrators retry on failure - if the pipeline is not idempotent, retrying it corrupts data.
Consider a naive pipeline:
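```python
def load_events(date: str) -> None:
    # Naive, NON-idempotent load. fetch_from_source, write_to_staging, and db
    # are placeholders for a source client and a warehouse connection.
    events = fetch_from_source(date)
    write_to_staging(events)  # stage the day's records in staging_events
    db.execute("INSERT INTO events SELECT * FROM staging_events")
    # If this fails and retries: duplicates in the events table.
```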
If the INSERT succeeds but the job fails afterward (updating a checkpoint, sending a success notification), the orchestrator retries. The same records are inserted again. The events table now has duplicates.
Implementing Idempotency
Pattern 1: Delete-then-insert (truncate-and-reload)
Before inserting new data for a partition, delete the existing data for that partition. Re-running the job produces the same result because you always start from empty for that partition.
```python
import sqlalchemy as sa


def load_partition_idempotent(
    date: str,
    conn: sa.engine.Connection,
    new_records: list[dict],
) -> None:
    """
    Idempotent partition load: delete existing data for date, then insert.
    Safe to re-run any number of times.
    """
    with conn.begin():
        # Delete existing data for this partition
        conn.execute(
            sa.text("DELETE FROM events WHERE event_date = :date"),
            {"date": date},
        )
        # Insert new records (a list of dicts is executed as an executemany)
        if new_records:
            conn.execute(
                sa.text(
                    "INSERT INTO events (event_id, user_id, event_date, amount_usd) "
                    "VALUES (:event_id, :user_id, :event_date, :amount_usd)"
                ),
                new_records,
            )
    # Both operations are in the same transaction.
    # If insert fails, delete is rolled back.
    # Re-run: delete removes the previous run's rows, insert rewrites them = same result.
```
This is simple but requires the destination to support transactions and the partition key to be well-defined.
Pattern 2: Upsert (merge/insert-or-update)
Use a unique key to detect existing records and update them rather than inserting duplicates. Supported natively in most databases (PostgreSQL's INSERT ... ON CONFLICT DO UPDATE, BigQuery's MERGE statement).
```python
import sqlalchemy as sa


def upsert_events(
    conn: sa.engine.Connection,
    records: list[dict],
) -> int:
    """
    Upsert events by event_id (PostgreSQL syntax). Re-running with the same
    records is safe: existing rows are overwritten with identical values,
    new records are inserted.
    Returns the driver-reported rowcount, which some DBAPI drivers do not
    report reliably for executemany.
    """
    upsert_sql = sa.text("""
        INSERT INTO events (event_id, user_id, event_date, amount_usd, updated_at)
        VALUES (:event_id, :user_id, :event_date, :amount_usd, :updated_at)
        ON CONFLICT (event_id) DO UPDATE SET
            amount_usd = EXCLUDED.amount_usd,
            updated_at = EXCLUDED.updated_at
    """)
    with conn.begin():
        result = conn.execute(upsert_sql, records)
    return result.rowcount
```
Upsert is the correct pattern for tables where individual records may be updated (user profiles, order status updates). It requires a natural unique key - if your data doesn't have one, delete-then-insert is simpler.
Pattern 3: Write-to-new-location (atomic swap)
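Write output to a new directory or staging table. When complete, atomically swap it into place (rename the directory, or swap the table pointer). Re-running the job overwrites the temporary location before swapping, so the final result is always from the latest complete run. Whether the swap is truly atomic depends on the storage system. On HDFS, directory rename is atomic, which is what Spark's classic file output committer relies on. S3 has no atomic rename: a "rename" is a copy followed by a delete, so partially moved output can become visible. On S3, teams instead use S3-specific committers (such as the Hadoop S3A committers) or table formats like Delta Lake, Apache Iceberg, or Apache Hudi, which commit by atomically updating metadata rather than moving data files.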
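A minimal local-filesystem sketch of the pattern is below, assuming a POSIX filesystem where os.replace of a file is atomic within a single filesystem. Rather than renaming the data directory, it writes each run to a fresh run directory and then atomically replaces a small _CURRENT pointer file, similar in spirit to swapping a table pointer. The directory layout and file names are hypothetical.

```python
import json
import os
import tempfile
import uuid
from pathlib import Path


def publish_partition(root: Path, partition: str, rows: list[dict]) -> Path:
    """Write rows to a fresh run directory, then atomically repoint the partition.

    Hypothetical layout: root/<partition>/run=<id>/data.jsonl, plus
    root/<partition>/_CURRENT holding the name of the live run directory.
    """
    part_dir = root / partition
    run_dir = part_dir / f"run={uuid.uuid4().hex}"
    run_dir.mkdir(parents=True)
    with open(run_dir / "data.jsonl", "w") as f:
        for row in rows:
            f.write(json.dumps(row) + "\n")

    # Write the new pointer to a temp file in the same directory, then
    # os.replace it over _CURRENT: readers see the old run or the new one, never a mix.
    fd, tmp_path = tempfile.mkstemp(dir=part_dir)
    with os.fdopen(fd, "w") as f:
        f.write(run_dir.name)
    os.replace(tmp_path, part_dir / "_CURRENT")
    return run_dir


def read_partition(root: Path, partition: str) -> list[dict]:
    """Readers always follow the pointer, so they only see complete runs."""
    part_dir = root / partition
    current = (part_dir / "_CURRENT").read_text().strip()
    with open(part_dir / current / "data.jsonl") as f:
        return [json.loads(line) for line in f]
```

Re-running publish_partition for the same partition leaves the earlier run directory in place but unreferenced; a separate cleanup job can delete run directories that _CURRENT no longer points to.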
Exactly-Once Semantics: What It Means and Why It's Hard
Exactly-once semantics means each message is processed exactly once - not zero times (at-most-once) and not one-or-more times (at-least-once), but precisely once. It is the hardest semantic to achieve in distributed systems.
The Three Levels
At-most-once - messages may be lost, but never duplicated. Easiest to implement: fire-and-forget. Acceptable for metrics, logging, or analytics where occasional loss is tolerable. Never acceptable for financial transactions or ML features.
At-least-once - messages are never lost, but may be duplicated. The standard default for Kafka consumers. Consumer reads a message, processes it, then commits the offset. If the consumer crashes after processing but before committing, it re-reads and reprocesses the same message. Duplicates are possible. Acceptable when downstream is idempotent (de-duplicates by message ID).
Exactly-once - messages are never lost and never duplicated. Requires coordination between the message broker, the compute layer, and the destination system. Kafka has supported exactly-once since 0.11 (2017) for Kafka-to-Kafka pipelines using the transactional producer API.
Why It's Hard
Achieving exactly-once requires atomic coordination across three systems: the source (Kafka), the compute (Flink/Spark), and the sink (database, warehouse, another Kafka topic). Each transition between systems introduces a failure window.
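Consider: a Flink job reads from Kafka, applies a transformation, writes to a PostgreSQL table, then records the Kafka offset. If the Postgres write succeeds but the offset is not recorded before a failure, Flink re-reads and re-processes the same record. The result is a duplicate in Postgres.
Flink solves this with checkpointing combined with a two-phase-commit sink. Each checkpoint records the Kafka offsets; the sink writes each checkpoint interval's output inside a transaction that is pre-committed when the checkpoint barrier arrives and committed only after the checkpoint completes. On failure, Flink restores the offsets from the last completed checkpoint and aborts any transaction that belongs to an incomplete checkpoint, so replayed records are not written twice. This requires a sink whose transactions can be prepared and committed later: Postgres supports this (PREPARE TRANSACTION, which Flink's JDBC exactly-once sink uses via XA), while S3 has no transactions at all, which is why Flink's file sink instead stages multipart uploads and completes them only when a checkpoint commits.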
Practical Approaches
For most teams, at-least-once delivery with idempotent sinks is the pragmatic answer:
- Accept that the streaming engine delivers messages at-least-once.
- Make the sink idempotent: use upsert by message ID, or deduplication windows.
- Reserve true exactly-once for cases where duplicate processing has severe consequences (financial ledger updates, user-visible notifications).
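```python
# At-least-once with idempotent deduplication:
# the Flink job may deliver a message more than once,
# the sink drops repeats of the same event_id within a TTL window.
import time

DEDUP_TTL_SECONDS = 24 * 3600  # should cover the maximum redelivery window


def process_with_dedup(
    event: dict,
    dedup_cache: dict[str, float],  # event_id -> first-seen time; in production: Redis
    now: float | None = None,
) -> bool:
    """
    Returns True if the event should be processed (not a duplicate).
    Uses event_id as the deduplication key; entries expire after DEDUP_TTL_SECONDS.
    """
    now = time.time() if now is None else now
    event_id = event["event_id"]
    seen_at = dedup_cache.get(event_id)
    if seen_at is not None and now - seen_at < DEDUP_TTL_SECONDS:
        return False  # Duplicate within the window: skip processing
    # Mark as seen. With Redis, check-and-set atomically in one call, e.g.
    # redis_client.set(event_id, 1, nx=True, ex=DEDUP_TTL_SECONDS), and let Redis expire keys.
    dedup_cache[event_id] = now
    return True
```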
In production, the dedup cache is Redis with a TTL equal to the maximum expected redelivery window (24 hours is a common choice).
Dead Letter Queues and Error Handling
A dead letter queue (DLQ) is a separate storage destination for messages that could not be processed successfully after a configurable number of retries.
Why DLQs Are Essential
Without a DLQ, a pipeline has two options when it encounters an unprocessable message:
- Skip it - the message is silently lost. For ML feature pipelines, silent loss accumulates into systematic feature bias.
- Halt - the pipeline stops processing all subsequent messages until the problematic one is resolved. One corrupt event blocks all future events.
Neither is acceptable in production. A DLQ provides a third option: route unprocessable messages to a side channel for investigation, continue processing the rest of the stream, and replay DLQ messages after fixing the underlying cause.
DLQ Message Structure
A good DLQ record preserves the original message plus metadata about why it failed:
```python
import json
import traceback
from datetime import datetime, timezone
from dataclasses import dataclass, asdict


@dataclass
class DLQRecord:
    """
    A message that could not be processed, preserved for later investigation.
    """
    original_topic: str
    original_partition: int
    original_offset: int
    original_message: str      # JSON-serialized original
    error_type: str            # e.g., "ValidationError", "SchemaError"
    error_message: str
    error_traceback: str
    retry_count: int
    first_failure_at: str      # ISO 8601 UTC
    last_failure_at: str
    pipeline_version: str      # which version of the code failed


class PipelineWithDLQ:
    """
    Stream processing pipeline with DLQ error handling.
    Processes events from a Kafka topic, routes failures to DLQ.
    """

    MAX_RETRIES = 3  # total failed attempts before a message is routed to the DLQ

    def __init__(
        self,
        dlq_writer,  # callable: writes DLQRecord to storage (e.g. json.dumps(asdict(record)))
        pipeline_version: str,
    ):
        self.dlq_writer = dlq_writer
        self.pipeline_version = pipeline_version
        self._retry_counts: dict[str, int] = {}
        self._first_failure_times: dict[str, str] = {}

    def process_event(
        self,
        message: dict,
        topic: str,
        partition: int,
        offset: int,
    ) -> bool:
        """
        Process a single event with retry logic and DLQ routing.
        Returns True if processed successfully, False if sent to DLQ.
        Re-raises the exception while retries remain.
        """
        message_key = f"{topic}:{partition}:{offset}"
        try:
            self._validate_schema(message)
            self._apply_transform(message)
            self._write_to_sink(message)
            # Success - clear retry state
            self._retry_counts.pop(message_key, None)
            self._first_failure_times.pop(message_key, None)
            return True
        except Exception as exc:
            retry_count = self._retry_counts.get(message_key, 0) + 1
            self._retry_counts[message_key] = retry_count
            now = datetime.now(timezone.utc).isoformat()
            if message_key not in self._first_failure_times:
                self._first_failure_times[message_key] = now

            if retry_count >= self.MAX_RETRIES:
                # Route to DLQ
                dlq_record = DLQRecord(
                    original_topic=topic,
                    original_partition=partition,
                    original_offset=offset,
                    original_message=json.dumps(message),
                    error_type=type(exc).__name__,
                    error_message=str(exc),
                    error_traceback=traceback.format_exc(),
                    retry_count=retry_count,
                    first_failure_at=self._first_failure_times[message_key],
                    last_failure_at=now,
                    pipeline_version=self.pipeline_version,
                )
                self.dlq_writer(dlq_record)
                self._retry_counts.pop(message_key, None)
                self._first_failure_times.pop(message_key, None)
                return False

            # Retry will be triggered by the calling framework
            raise

    def _validate_schema(self, message: dict) -> None:
        required = {"event_id", "user_id", "event_type", "timestamp"}
        missing = required - message.keys()
        if missing:
            raise ValueError(f"Missing required fields: {missing}")

    def _apply_transform(self, message: dict) -> None:
        # Business logic transformation
        message["processed_at"] = datetime.now(timezone.utc).isoformat()

    def _write_to_sink(self, message: dict) -> None:
        # Write to database / warehouse / feature store
        pass
```
DLQ records should be monitored: alert when DLQ depth exceeds a threshold (e.g., more than 100 messages in the DLQ within one hour indicates a systemic problem, not a random corrupt event). DLQ records should be replayed by a separate process after the underlying cause is fixed.
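Both practices can be sketched in a few lines. The version below is a minimal sketch that assumes DLQ records are available as dicts with the DLQRecord field names (for example, loaded from wherever dlq_writer stores them) and that the fixed processing function writes to an idempotent sink, since a replayed message may have partially succeeded before it failed. The function names are hypothetical.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import Callable

DLQ_ALERT_THRESHOLD = 100  # example from the text: >100 DLQ records within one hour


def dlq_depth_alert(
    records: list[dict],
    now: datetime,
    window: timedelta = timedelta(hours=1),
) -> bool:
    """True if more than DLQ_ALERT_THRESHOLD records failed within the window.
    Each record carries an ISO 8601 UTC 'last_failure_at' field."""
    recent = sum(
        1 for r in records
        if now - datetime.fromisoformat(r["last_failure_at"]) <= window
    )
    return recent > DLQ_ALERT_THRESHOLD


def replay_dlq(
    records: list[dict],
    process_fn: Callable[[dict], None],
) -> list[dict]:
    """Re-run each original message through the fixed processing function.
    Returns the records that still fail; they stay in the DLQ."""
    still_failing = []
    for record in records:
        try:
            process_fn(json.loads(record["original_message"]))
        except Exception:
            still_failing.append(record)
    return still_failing
```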
Backfilling: Reprocessing History
Backfilling is rerunning a pipeline over historical data. It is needed when: a bug is discovered in a transformation, business logic changes, a new feature is added to a model (and you need historical values), or a schema migration changes how data should be interpreted.
Backfill Design Principles
Separation from the live pipeline. A backfill job should run on separate infrastructure from the live pipeline. A backfill consuming 100% of a Spark cluster's resources while the live pipeline waits is a production incident.
Idempotent writes. The backfill pipeline must use the same idempotent write patterns as the live pipeline. If backfill runs in parallel with live data, both must produce consistent results.
Partition-level parallelism. Backfills are embarrassingly parallel: each date partition can be reprocessed independently. Design the backfill to process N partitions concurrently.
```python
"""
backfill.py
Reprocess historical partitions of the bronze->silver pipeline.
Each date partition is processed independently and idempotently.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, timedelta
from typing import Callable
import logging

logger = logging.getLogger(__name__)


def generate_date_range(start: date, end: date) -> list[date]:
    """Generate list of dates from start (inclusive) to end (inclusive)."""
    result = []
    current = start
    while current <= end:
        result.append(current)
        current += timedelta(days=1)
    return result


def backfill_partition(
    partition_date: date,
    process_fn: Callable[[date], None],
) -> tuple[date, bool, str | None]:
    """
    Process a single partition with error handling.
    Returns (date, success, error_message).
    """
    try:
        process_fn(partition_date)
        return (partition_date, True, None)
    except Exception as exc:
        logger.error(f"Failed to process partition {partition_date}: {exc}")
        return (partition_date, False, str(exc))


def run_backfill(
    start_date: date,
    end_date: date,
    process_fn: Callable[[date], None],
    max_workers: int = 4,
) -> dict:
    """
    Backfill all partitions from start_date to end_date.
    Processes up to max_workers partitions concurrently.
    Returns summary with success/failure counts and failed dates.
    """
    dates = generate_date_range(start_date, end_date)
    total = len(dates)
    logger.info(
        f"Starting backfill: {start_date} to {end_date} "
        f"({total} partitions, {max_workers} workers)"
    )

    results = {"success": [], "failed": [], "errors": {}}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(backfill_partition, d, process_fn): d
            for d in dates
        }
        for future in as_completed(futures):
            partition_date, success, error = future.result()
            if success:
                results["success"].append(partition_date)
                logger.info(
                    f"[{len(results['success'])}/{total}] "
                    f"Completed partition {partition_date}"
                )
            else:
                results["failed"].append(partition_date)
                results["errors"][str(partition_date)] = error
                logger.error(f"Failed partition {partition_date}: {error}")

    logger.info(
        f"Backfill complete. "
        f"Success: {len(results['success'])}, "
        f"Failed: {len(results['failed'])}"
    )
    if results["failed"]:
        logger.warning(
            f"Failed partitions (rerun these): "
            f"{sorted(results['failed'])}"
        )
    return results


# Example usage:
# from pipeline import process_date_partition
#
# results = run_backfill(
#     start_date=date(2024, 1, 1),
#     end_date=date(2024, 6, 30),
#     process_fn=process_date_partition,
#     max_workers=8,
# )
```
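What might a process_fn look like? Below is a hypothetical stand-in for process_date_partition, with SQLite in place of the warehouse and a dict in place of the bronze layer (both assumptions). It uses delete-then-insert per partition so reruns are safe, opens one connection per call so run_backfill's worker threads do not share a connection, and stamps every row with a LOGIC_VERSION column so you can audit which version of the logic produced which row.

```python
import sqlite3
from datetime import date
from typing import Callable

LOGIC_VERSION = "v2"  # hypothetical: bump whenever the transformation logic changes


def create_silver_table(db_path: str) -> None:
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS silver_events ("
            "event_id TEXT, event_date TEXT, amount_usd REAL, logic_version TEXT)"
        )
    finally:
        conn.close()


def make_partition_processor(
    db_path: str,
    bronze: dict[date, list[dict]],
) -> Callable[[date], None]:
    """Build a process_fn for a backfill driver. `bronze` maps a partition
    date to its raw rows (a stand-in for reading the bronze layer)."""

    def process_date_partition(partition_date: date) -> None:
        day = partition_date.isoformat()
        rows = [
            (r["event_id"], day, r["amount_usd"], LOGIC_VERSION)
            for r in bronze.get(partition_date, [])
            if r.get("status") != "cancelled"  # example transformation
        ]
        conn = sqlite3.connect(db_path, timeout=30)  # one connection per call/thread
        try:
            with conn:  # delete + insert commit together or not at all
                conn.execute("DELETE FROM silver_events WHERE event_date = ?", (day,))
                conn.executemany("INSERT INTO silver_events VALUES (?, ?, ?, ?)", rows)
        finally:
            conn.close()

    return process_date_partition
```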
Watermarking: Handling Late-Arriving Data
In stream processing, events rarely arrive in the order they were generated. A user clicks on a mobile app while on a subway. The click event is generated at 2:00:00 PM. The network is spotty. The event arrives at the Kafka broker at 2:07:32 PM. The streaming pipeline processing minute-level windows has already closed the 2:00 window.
Should this event be counted in the 2:00–2:01 window, or dropped? If counted, how long should the pipeline wait for late events before closing each window? The answer is the watermark.
What a Watermark Is
A watermark is the pipeline's assertion about the minimum event time it expects to receive going forward. Formally, if the current watermark is $W$, the pipeline assumes that no event with event time $t_e < W$ will arrive in the future; an event that violates this assumption is late.
When the watermark advances past a window's end time, that window is closed and its result is emitted. A watermark with a lag of $\delta$ means: "we are willing to wait $\delta$ time for late events before closing each window."
The watermark is typically computed as:
$$W = \max_{e \in \text{events seen}} t_e - \delta$$
where $\delta$ is the allowed lateness. As the pipeline processes events with increasing timestamps, the watermark advances. When an event arrives with event time less than the current watermark, it is "late" - the framework either drops it or routes it to a side output for handling.
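Plugging the subway example into the formula makes the trade-off concrete. This sketch assumes the pipeline has already seen events up to 2:07:30 PM and compares two hypothetical values of $\delta$; naive datetimes are used only to keep the arithmetic readable.

```python
from datetime import datetime, timedelta


def watermark(max_event_time: datetime, allowed_lateness: timedelta) -> datetime:
    """W = (maximum event time seen so far) - delta."""
    return max_event_time - allowed_lateness


def is_late(event_time: datetime, current_watermark: datetime) -> bool:
    """An event is late if its event time is below the current watermark."""
    return event_time < current_watermark


max_seen = datetime(2024, 1, 1, 14, 7, 30)  # assumed newest event time seen so far
click = datetime(2024, 1, 1, 14, 0, 0)      # the subway click's event time

for delta in (timedelta(minutes=5), timedelta(minutes=10)):
    w = watermark(max_seen, delta)
    print(delta, w.time(), "late" if is_late(click, w) else "accepted")
# 0:05:00 14:02:30 late
# 0:10:00 13:57:30 accepted
```

A larger $\delta$ accepts the delayed click but holds every window open five minutes longer before its result is emitted.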
Implementing Watermarking Manually
Understanding watermarking at the code level - before relying on Flink or Spark's built-in watermark generators - makes debugging and configuration much clearer:
```python
"""
watermark.py
Manual implementation of watermark-based windowing to illustrate
what frameworks like Flink and Spark Structured Streaming do internally.
This is educational - in production, use the framework's built-in API.
"""
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from collections import defaultdict
from typing import Callable


@dataclass
class Event:
    event_id: str
    user_id: str
    event_type: str
    event_time: datetime  # time the event occurred (from the payload); must be tz-aware UTC
    value: float


@dataclass
class WindowResult:
    window_start: datetime
    window_end: datetime
    user_id: str
    event_count: int
    total_value: float
    late_events_dropped: int  # cumulative count across all windows so far, not per window


class WatermarkWindowProcessor:
    """
    Processes events into tumbling time windows with watermark-based
    late event handling.

    allowed_lateness: how far the watermark trails the maximum event time seen
    window_size: size of each tumbling window
    emit_callback: called when a window is finalized
    """

    def __init__(
        self,
        allowed_lateness: timedelta,
        window_size: timedelta,
        emit_callback: Callable[[WindowResult], None],
    ):
        self.allowed_lateness = allowed_lateness
        self.window_size = window_size
        self.emit_callback = emit_callback

        # Current watermark: the minimum event time we accept
        self._watermark: datetime | None = None
        # Max event time seen (watermark = max_event_time - allowed_lateness)
        self._max_event_time: datetime | None = None

        # In-flight windows: window_key -> {user_id -> accumulated state}
        # window_key = window_start timestamp
        self._windows: dict[datetime, dict[str, dict]] = defaultdict(
            lambda: defaultdict(lambda: {"count": 0, "total_value": 0.0})
        )
        self._late_drop_count = 0

    def _window_start_for(self, event_time: datetime) -> datetime:
        """Compute the tumbling window start for a given event time."""
        window_seconds = int(self.window_size.total_seconds())
        ts = int(event_time.timestamp())
        window_ts = (ts // window_seconds) * window_seconds
        return datetime.fromtimestamp(window_ts, tz=timezone.utc)

    def _advance_watermark(self, event_time: datetime) -> None:
        """Update watermark based on the new event's event time."""
        if self._max_event_time is None or event_time > self._max_event_time:
            self._max_event_time = event_time

        new_watermark = self._max_event_time - self.allowed_lateness
        if self._watermark is None or new_watermark > self._watermark:
            old_watermark = self._watermark
            self._watermark = new_watermark
            # Close any windows that are now past the watermark
            self._close_expired_windows(old_watermark)

    def _close_expired_windows(self, old_watermark: datetime | None) -> None:
        """Close and emit windows whose end time is <= current watermark."""
        windows_to_close = []
        for window_start in list(self._windows.keys()):
            window_end = window_start + self.window_size
            if window_end <= self._watermark:
                windows_to_close.append(window_start)

        for window_start in windows_to_close:
            window_end = window_start + self.window_size
            window_state = self._windows.pop(window_start)
            for user_id, state in window_state.items():
                result = WindowResult(
                    window_start=window_start,
                    window_end=window_end,
                    user_id=user_id,
                    event_count=state["count"],
                    total_value=state["total_value"],
                    late_events_dropped=self._late_drop_count,
                )
                self.emit_callback(result)

    def process_event(self, event: Event) -> None:
        """
        Process a single event. Routes late events to drop or side output.
        Advances the watermark based on event_time.
        """
        # Advance watermark first
        self._advance_watermark(event.event_time)

        # Check if event is late (event_time < current watermark)
        if self._watermark is not None and event.event_time < self._watermark:
            # Late event: beyond allowed lateness, drop it
            self._late_drop_count += 1
            return  # In production: emit to side output / DLQ

        # Accumulate into the appropriate window
        window_start = self._window_start_for(event.event_time)
        self._windows[window_start][event.user_id]["count"] += 1
        self._windows[window_start][event.user_id]["total_value"] += event.value

    def flush(self) -> None:
        """
        Force-close all open windows. Call at end of stream or for testing.
        """
        if self._max_event_time is not None:
            # Push the watermark past every open window's end, then close them all
            self._watermark = self._max_event_time + self.window_size
            self._close_expired_windows(None)
```
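This implementation mirrors the core of Flink's bounded-out-of-orderness watermarking (WatermarkStrategy.forBoundedOutOfOrderness()): track the maximum event time seen, subtract a fixed bound to compute the watermark, and close windows once the watermark passes their end time. Flink splits this across two knobs, the out-of-orderness bound on the watermark strategy and allowedLateness() on the windowed stream, whereas this sketch uses a single allowed_lateness value as the out-of-orderness bound and never re-fires a window once it has closed. Understanding the mechanics makes configuring Flink's watermark generators in production far less mysterious.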
Push vs Pull Pipelines
Pull (scheduled/polling) - the pipeline runs on a schedule. Every hour, the Airflow DAG triggers, the job reads new records from the source, processes them, and writes to the destination. Simple, predictable, easy to monitor. The downside is that data freshness is bounded by the poll interval: a record that lands just after a run waits almost a full interval before it is processed.
Push (event-driven) - source systems push events to the pipeline as they occur (via Kafka, SNS, webhooks). The pipeline reacts to each event immediately. Low latency, but requires the source to support event emission and the pipeline to handle backpressure.
For ML feature pipelines, the choice often depends on the freshness requirement:
- Model retrained weekly: pull, daily batch pipeline
- Model retrained daily: pull, hourly pipeline, or daily with micro-batch for the last few hours
- Real-time inference with a freshness requirement under 5 minutes: push, streaming pipeline
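A pull pipeline usually remembers how far it got. The sketch below is a minimal illustration, assuming a hypothetical source whose rows carry a monotonically increasing updated_at value; CursorState, InMemorySink, and pull_once are invented names. Each scheduled run reads only rows past the stored cursor, and the cursor advances only after the write succeeds, so a failed run simply re-reads the same slice next time.

```python
from dataclasses import dataclass, field


@dataclass
class CursorState:
    # Highest updated_at already written downstream (hypothetical cursor)
    last_updated_at: int = 0


@dataclass
class InMemorySink:
    rows: dict = field(default_factory=dict)

    def upsert(self, batch: list[dict]) -> None:
        # Keyed writes keep a re-read slice from producing duplicates
        for row in batch:
            self.rows[row["id"]] = row


def pull_once(source_rows: list[dict], state: CursorState, sink: InMemorySink) -> int:
    """One scheduled pull run: read new rows, write them, then advance the cursor."""
    new_rows = [r for r in source_rows if r["updated_at"] > state.last_updated_at]
    if not new_rows:
        return 0
    sink.upsert(new_rows)
    # Advance the cursor only after the write succeeded
    state.last_updated_at = max(r["updated_at"] for r in new_rows)
    return len(new_rows)
```

One known weakness of a strict "greater than cursor" read: a row committed late with an updated_at at or below the cursor is skipped, which is why many pull pipelines re-read a small overlap and let the keyed upsert absorb the repeats.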
The Fan-out / Fan-in Pattern
Fan-out and fan-in are patterns for parallelizing pipeline stages that are independent of each other.
Fan-out - split a single input stream or dataset into multiple parallel processing stages. Example: a gold-layer user features table is split by user_id range into 16 parallel Spark partitions, each processed by a separate executor.
Fan-in - merge the results of multiple parallel stages into a single output. Example: after 16 parallel partitions each compute their user features, the results are merged into a single output table.
Fan-out/fan-in is how batch pipelines achieve horizontal scalability. A Spark job processing 500 million user event records fans out across 200 executors (fan-out), each processing 2.5 million records, then merges the aggregated results (fan-in) to produce the final feature table.
In orchestration, fan-out is implemented with dynamic task mapping: one upstream task's output spawns N parallel downstream tasks, then a final aggregation task depends on all N. In Airflow this is .expand() on a task; Dagster provides the same shape through dynamic outputs (DynamicOut).
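A single-machine sketch of the same shape, assuming events are plain dicts with hypothetical user_id and value keys. Fan-out hashes user_id into a fixed number of partitions and aggregates each in a worker thread (Spark would use executors instead); fan-in merges the per-partition results. Because every user lands in exactly one partition, the merge is a plain union with no cross-partition combining.

```python
import zlib
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def partition_of(user_id: str, num_partitions: int) -> int:
    # Stable hash (built-in hash() on str is salted per process)
    return zlib.crc32(user_id.encode("utf-8")) % num_partitions


def aggregate_partition(events: list[dict]) -> dict[str, dict]:
    """Per-partition work: count events and sum value per user."""
    features = defaultdict(lambda: {"event_count": 0, "total_value": 0.0})
    for e in events:
        features[e["user_id"]]["event_count"] += 1
        features[e["user_id"]]["total_value"] += e["value"]
    return dict(features)


def fan_out_fan_in(events: list[dict], num_partitions: int = 16) -> dict[str, dict]:
    # Fan-out: route each event to exactly one partition by user_id
    partitions: list[list[dict]] = [[] for _ in range(num_partitions)]
    for e in events:
        partitions[partition_of(e["user_id"], num_partitions)].append(e)

    # Process partitions in parallel
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        partial_results = list(pool.map(aggregate_partition, partitions))

    # Fan-in: each user lives in one partition, so merging is a union
    merged: dict[str, dict] = {}
    for result in partial_results:
        merged.update(result)
    return merged
```

The result does not depend on num_partitions, which is a useful property to test: the parallelism is an execution detail, not part of the output.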
YouTube Resources
| Title | Channel | Why Watch |
|---|---|---|
| Lambda vs Kappa Architecture | Bartosz Konieczny | Clear side-by-side comparison with real trade-offs |
| Exactly Once Semantics in Kafka | Confluent | Deep dive into Kafka's transactional producer and idempotent producers |
| Apache Flink Watermarks Explained | Apache Flink | Official Flink watermarking explanation from core contributors |
| dbt Incremental Models | dbt Labs | How to implement idempotent incremental loads in dbt |
| Backfilling in Data Engineering | Data With Zach | Practical backfill strategies for production pipelines |
Production Engineering Notes
Idempotency keys must be chosen carefully. The event ID or message ID from the source system is the natural deduplication key. But not all source systems provide unique, stable IDs. When the source doesn't provide one, construct a composite key from fields that uniquely identify the event (user_id + event_type + timestamp at microsecond resolution). Document the key choice - a future engineer changing the key without understanding its role will break idempotency.
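When the source provides no stable ID, the composite key can be made deterministic by hashing a canonical encoding of its fields. This sketch assumes the user_id + event_type + microsecond-timestamp combination described above; the field order, the separator, and the KEY_VERSION prefix are illustrative choices, and they are exactly what should be documented, because changing any of them changes every key.

```python
import hashlib
from datetime import datetime, timezone

KEY_VERSION = "v1"  # hypothetical; change only together with a planned re-key


def idempotency_key(user_id: str, event_type: str, event_time: datetime) -> str:
    """Deterministic dedup key built from the fields that identify one event."""
    if event_time.tzinfo is None:
        raise ValueError("event_time must be timezone-aware")
    # Normalize to UTC at microsecond resolution so equal instants hash equally
    ts = event_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    # A separator assumed never to appear in the fields prevents
    # ("ab", "c") and ("a", "bc") from producing the same string
    canonical = "\x1f".join([KEY_VERSION, user_id, event_type, ts])
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```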
Watermark lag is a tuning parameter, not a constant. The right watermark lag depends on the network and processing latency distribution of your event sources. A mobile app with users in poor-network areas may have events arrive 30 minutes late. A server-side event in the same datacenter arrives within milliseconds. Use different watermark configurations for different event sources. Monitor the "late event rate" - if more than 1% of events are arriving after the watermark, the lag is too small.
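A small sketch of per-source lag configuration plus the late-event-rate check. The source names, lag values, and 1% threshold are illustrative assumptions. As a simplification, an event counts as late here if it arrives more than its source's lag after its event time, which approximates the watermark comparison when the stream's max event time tracks wall-clock time.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

# Hypothetical per-source lags: mobile clients straggle, server events do not
WATERMARK_LAG = {
    "mobile_app": timedelta(minutes=30),
    "server_side": timedelta(seconds=5),
}
LATE_RATE_THRESHOLD = 0.01  # the 1% rule of thumb


@dataclass
class Arrival:
    source: str
    event_time: datetime
    processing_time: datetime


def late_rates(arrivals: list[Arrival]) -> dict[str, float]:
    """Fraction of events per source arriving later than that source's lag."""
    totals: dict[str, int] = {}
    late: dict[str, int] = {}
    for a in arrivals:
        totals[a.source] = totals.get(a.source, 0) + 1
        if a.processing_time - a.event_time > WATERMARK_LAG[a.source]:
            late[a.source] = late.get(a.source, 0) + 1
    return {src: late.get(src, 0) / n for src, n in totals.items()}


def sources_needing_more_lag(arrivals: list[Arrival]) -> list[str]:
    """Sources whose late rate exceeds the threshold."""
    return sorted(s for s, r in late_rates(arrivals).items() if r > LATE_RATE_THRESHOLD)
```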
DLQ debt accumulates silently. Teams instrument the DLQ to alert when it receives messages, then stop monitoring it after the first few weeks when it's "clean." DLQ messages accumulate over months. When someone eventually investigates, the root cause of each failure is no longer reproducible. Treat DLQ messages as production bugs: assign ownership, triage weekly, resolve within an SLA.
Backfill resource contention is a common weekend incident. A backfill job scheduled over a weekend to avoid impacting business hours consumes the entire Spark cluster, causing the Sunday morning live pipeline runs to queue behind it. Always run backfills on separate clusters or with explicit resource limits. In Databricks, use separate job clusters. In self-managed Spark, submit backfills to a dedicated scheduler queue or pool with capped resources (for example, a YARN queue with a capacity limit).
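Beyond separate clusters, the backfill driver itself can cap how much work it has in flight. This sketch assumes a hypothetical process_partition(date) that idempotently overwrites one daily partition and a completed set that stands in for persisted progress (a table in practice). It runs at most max_parallel partitions at a time, oldest first, and records finished dates after each batch, so a killed backfill resumes where it stopped instead of leaving an unknown half-reprocessed range.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import date, timedelta
from typing import Callable


def date_range(start: date, end: date) -> list[date]:
    """Inclusive list of daily partitions from start to end."""
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]


def run_backfill(
    start: date,
    end: date,
    process_partition: Callable[[date], None],  # hypothetical idempotent overwrite
    completed: set[date],                        # stand-in for persisted progress
    max_parallel: int = 2,
) -> list[date]:
    """Reprocess pending partitions in small batches; return dates finished this run."""
    pending = [d for d in date_range(start, end) if d not in completed]
    done_now: list[date] = []
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        for i in range(0, len(pending), max_parallel):
            batch = pending[i : i + max_parallel]
            # list() re-raises the first worker exception, stopping the backfill
            list(pool.map(process_partition, batch))
            completed.update(batch)
            done_now.extend(batch)
    return done_now
```

Because each partition write is idempotent, a partition that finished but was not yet recorded when the run died is simply reprocessed on resume.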
The ELT "transform in the warehouse" assumption breaks for ML at scale. dbt running on BigQuery is excellent for SQL-based transformations at warehouse scale. But ML feature transformations often require Python (scikit-learn preprocessing, custom aggregations, embeddings), which dbt's SQL models cannot express; dbt's Python models (added in dbt 1.3) run only on adapters with a Python runtime, such as Snowflake, Databricks, and BigQuery. The pattern breaks and teams need a hybrid: SQL transformations in dbt, Python transformations in Spark or a dedicated feature computation service.
Common Mistakes
:::danger Non-idempotent write patterns
Using plain INSERT without any deduplication mechanism, then relying on "the pipeline never fails twice" as the safety assumption. Pipelines always fail twice eventually. The first retry produces duplicates. Duplicates compound over time - especially in feature tables where duplicate rows cause inflated aggregates (a user's 7-day order count of 50 becomes 100). This corrupts model features silently. Always use upsert, delete-then-insert, or atomic swap patterns.
:::
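A minimal sqlite3 sketch of the failure mode and the fix, assuming a hypothetical user_features table keyed by (user_id, feature_date). A retried plain INSERT into an unconstrained table doubles the aggregate a downstream query sees, while an upsert (INSERT ... ON CONFLICT ... DO UPDATE, available in SQLite 3.24+) leaves it unchanged. Warehouses typically spell this as MERGE, but the idea is the same.

```python
import sqlite3


def make_table(keyed: bool) -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    # Without a key (as in many warehouses) nothing stops duplicate rows
    key = ", PRIMARY KEY (user_id, feature_date)" if keyed else ""
    conn.execute(
        "CREATE TABLE user_features "
        f"(user_id TEXT, feature_date TEXT, order_count_7d INTEGER{key})"
    )
    return conn


def load_features(conn: sqlite3.Connection, rows: list[tuple], upsert: bool) -> None:
    """Write (user_id, feature_date, order_count_7d) rows, optionally as an upsert."""
    sql = "INSERT INTO user_features VALUES (?, ?, ?)"
    if upsert:
        sql += (
            " ON CONFLICT(user_id, feature_date)"
            " DO UPDATE SET order_count_7d = excluded.order_count_7d"
        )
    with conn:  # commits on success, rolls back on exception
        conn.executemany(sql, rows)


def total_orders(conn: sqlite3.Connection, user_id: str) -> int:
    # What a downstream aggregate would see for this user
    (total,) = conn.execute(
        "SELECT COALESCE(SUM(order_count_7d), 0) FROM user_features WHERE user_id = ?",
        (user_id,),
    ).fetchone()
    return total
```

Loading [("u1", "2024-01-07", 50)] twice (the original run plus one retry) gives a total of 100 with plain inserts and 50 with the upsert.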
:::danger Treating watermark lag as "just set it high to be safe"
Setting watermark lag to 24 hours because "we want to be safe and not drop any events." The consequence: every streaming window stays open until 24 hours after it ends before emitting results. A pipeline computing hourly user activity features now has 24-hour-old results at serving time. The model is using yesterday's features. This is worse than batch. Watermark lag must match the actual latency distribution of your event sources - typically measured in minutes, not hours.
:::
:::warning Forgetting to backfill after bug fixes
Fixing a pipeline bug in the transformation logic and deploying the fix to the live pipeline, without reprocessing the historical data that was computed under the buggy logic. The historical data remains incorrect. The next model training run uses a mixture of correctly-computed recent data and incorrectly-computed historical data. The model trains on a corrupted mixed distribution. This is one of the most common causes of unexplained model degradation.
:::
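One way to make this visible is to stamp every output partition with the version of the transformation logic that produced it, then treat "partition version < current version" as the backfill queue and refuse to train on a window that mixes versions. This is a minimal sketch; FEATURE_LOGIC_VERSION and the partition-to-version dictionary are hypothetical stand-ins for whatever lineage table or table property a team actually keeps.

```python
from datetime import date

# Bump whenever the transformation logic changes (hypothetical convention)
FEATURE_LOGIC_VERSION = 3


def stale_partitions(
    partition_versions: dict[date, int], current: int = FEATURE_LOGIC_VERSION
) -> list[date]:
    """Partitions computed by older logic, oldest first: the backfill queue."""
    return sorted(d for d, v in partition_versions.items() if v < current)


def training_window_is_consistent(
    partition_versions: dict[date, int], start: date, end: date
) -> bool:
    """True if every partition in [start, end] came from the same logic version."""
    versions = {v for d, v in partition_versions.items() if start <= d <= end}
    return len(versions) <= 1
```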
:::warning Running backfill on the live pipeline cluster
Triggering a full historical backfill (6 months of data) on the same Spark cluster that runs the hourly live pipelines. The backfill consumes available executor slots. Live pipelines queue. SLAs are breached. The on-call engineer kills the backfill partway through - leaving the historical data in a partially-reprocessed state that is more confusing than the original bug.
:::
Interview Q&A
Q1: How would you handle late-arriving data in a streaming pipeline that feeds ML features?
This is the core watermarking question. The answer has three parts.
First, quantify "late" - late relative to what? Event time (when the event happened) or processing time (when the pipeline sees it)? Late data in streaming refers to events whose event_time is less than the current watermark. Before designing the solution, measure the actual latency distribution of your event sources: p50, p95, p99 of (processing_time - event_time). This measurement drives the watermark lag parameter.
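Measuring that distribution takes only a few lines. The sketch assumes per-event delays (processing_time minus event_time, in seconds) have already been collected, and uses statistics.quantiles with n=100, so index k-1 of the result approximates the k-th percentile. The suggested lag of p99 plus a 20% margin is a hypothetical rule of thumb, not a standard.

```python
import statistics


def lateness_profile(delays_s: list[float]) -> dict[str, float]:
    """p50/p95/p99 of (processing_time - event_time), in seconds."""
    if len(delays_s) < 2:
        raise ValueError("need at least two observations")
    cuts = statistics.quantiles(delays_s, n=100)  # cuts[k - 1] ~ k-th percentile
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


def suggested_watermark_lag_s(delays_s: list[float], margin: float = 1.2) -> float:
    # Hypothetical rule: cover p99 plus a safety margin
    return lateness_profile(delays_s)["p99"] * margin
```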
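Second, implement watermarking with an appropriate lag. In Flink: WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(5)) when p99 lateness is about 5 minutes. In Spark Structured Streaming: .withWatermark("event_time", "5 minutes"). A window then stays open until the stream's event time is 5 minutes past its end, so events up to 5 minutes out of order are still included. In Flink, events that arrive after their window has closed can be routed to a side output (sideOutputLateData on the windowed stream), such as a DLQ or late-event table, for monitoring; Spark Structured Streaming has no built-in side output and may silently drop them, so late-event monitoring there has to be built separately.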
Third, handle the edge cases. For ML features specifically: decide whether late events should trigger window recomputation or be dropped. Recomputation (updating the feature retroactively) is correct but expensive and complex. Dropping late events and accepting the small approximation error is simpler and often acceptable. For fraud detection features where accuracy is critical, recomputation with a correction mechanism (emit a "corrected" feature value) is required.
Q2: Explain exactly-once semantics. When do you actually need it?
Exactly-once means each input event is reflected exactly once in the output - no duplicates (from retries), no missing records (from failures). It requires coordination between the source (Kafka), the compute engine (Flink, Spark), and the sink (database, warehouse).
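In Kafka, exactly-once for Kafka-to-Kafka pipelines is achieved with the transactional producer API: the records a processing step produces and the consumer offsets it read (sent via sendOffsetsToTransaction) are committed atomically in a single transaction, and downstream consumers set isolation.level=read_committed so they never see records from aborted transactions. If any step fails, the transaction is aborted and processing resumes from the last committed offset.
For Kafka-to-database sinks, true exactly-once requires a sink that supports two-phase commit (a transaction that can be pre-committed, then committed or aborted later). Flink supports this with its TwoPhaseCommitSinkFunction: the sink writes into an open database transaction, pre-commits it when the checkpoint barrier arrives (the Kafka offsets are stored in the same checkpoint), and commits it only after Flink reports the checkpoint complete. On recovery, transactions pre-committed for a completed checkpoint are committed, any others are aborted, and processing resumes from that checkpoint.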
When do you actually need it? Rarely. For financial transactions (debit/credit), billing systems, and user-visible notifications - duplicates have real consequences. For ML feature pipelines, at-least-once delivery with idempotent sinks (upsert by event_id) is almost always sufficient. The operational complexity of true exactly-once is significant; justify it with a real requirement, not theoretical purity.
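To see why at-least-once plus an idempotent sink is usually enough, here is a toy consumer (an illustration, not Kafka client code; ToyConsumer and its fields are hypothetical). It writes a batch to a sink keyed by event_id and only then commits its offset. A simulated crash between write and commit forces the batch to be redelivered, and the keyed sink absorbs the repeats.

```python
from dataclasses import dataclass, field


@dataclass
class ToyConsumer:
    log: list[dict]                            # stand-in for a partition's messages
    committed_offset: int = 0                  # next offset to read after a restart
    sink: dict = field(default_factory=dict)   # event_id -> row (idempotent sink)

    def poll_and_process(self, batch_size: int, crash_before_commit: bool = False) -> None:
        start = self.committed_offset
        batch = self.log[start : start + batch_size]
        for event in batch:
            # Upsert by event_id: a redelivered event overwrites, never duplicates
            self.sink[event["event_id"]] = event
        if crash_before_commit:
            return  # offset not committed, so the same batch is redelivered
        self.committed_offset += len(batch)
```

After a crash-and-retry sequence the sink holds each event exactly once, even though some events were delivered twice: the effect is exactly-once, while the delivery is only at-least-once.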
Q3: What is the difference between Lambda and Kappa architecture? When would you choose each?
Lambda: two parallel compute layers - batch (accurate, slow) and speed (approximate, fast) - merged in a serving layer. Kappa: unified streaming layer that handles all time ranges by replaying the event log.
Choose Lambda when: (1) batch processing performance for historical data is significantly better than streaming (complex joins over years of data are faster in Spark than in Flink with large state); (2) the batch and speed layers use genuinely different algorithms (e.g., batch uses a full-pass algorithm that streaming can't replicate); (3) the team has separate expertise in batch and streaming and can maintain two codebases.
Choose Kappa when: (1) the processing logic is the same for historical and recent data; (2) the team wants to minimize code duplication and operational complexity; (3) the event store (Kafka with long retention, or a data lake with streaming reads) can support full historical replay at acceptable performance.
The honest answer: most teams at medium scale use something between the two - batch-dominant with streaming for the most recent window. True Kappa is only viable when your streaming engine can reprocess historical data at batch speeds, which requires either very fast streaming (Flink on large clusters) or a log-structured storage system that supports parallel replay.
Q4: What makes a pipeline truly idempotent? Give a concrete example.
A pipeline is idempotent if running it N times with the same input produces the same output as running it once. For a data pipeline, this means: re-running for the same time partition leaves the destination table in exactly the state it would be in after a single successful run - no duplicates, no missing records.
Concrete example: a daily pipeline loads e-commerce orders into a warehouse table. On Monday at midnight, the job runs and loads all orders from Sunday. On Monday at 12:05 AM, the job fails during the write phase and is retried. The idempotent implementation:
```sql
BEGIN;
-- Step 1: delete any existing rows for Sunday's partition
DELETE FROM orders WHERE order_date = '2024-01-07';
-- Step 2: insert the freshly computed rows for that partition
INSERT INTO orders SELECT * FROM staging_orders_20240107;
COMMIT;
```
Both steps run in a single database transaction. If the insert fails, the whole transaction (including the delete) is rolled back, and the partition is untouched. The pattern stays idempotent even without a transaction (for example, in autocommit mode, where a failure can leave the delete committed but the insert incomplete): on retry, the delete clears whatever the failed attempt left behind, and the insert starts fresh. Re-running this pipeline 10 times produces exactly the same result as running it once.
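The same pattern can be exercised end to end with the standard library's sqlite3. The orders table follows the example above, but the single staging_orders table filtered by date and its rows are invented for illustration. The with conn: block wraps the delete and the insert in one transaction, committing on success and rolling back on an exception.

```python
import sqlite3


def load_partition(conn: sqlite3.Connection, order_date: str) -> None:
    """Idempotently (re)load one day's orders: delete-then-insert in one transaction."""
    with conn:  # one transaction: commit on success, rollback on exception
        conn.execute("DELETE FROM orders WHERE order_date = ?", (order_date,))
        conn.execute(
            "INSERT INTO orders SELECT * FROM staging_orders WHERE order_date = ?",
            (order_date,),
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, order_date TEXT, amount REAL)")
conn.execute("CREATE TABLE staging_orders (order_id INTEGER, order_date TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO staging_orders VALUES (?, ?, ?)",
    [(1, "2024-01-07", 20.0), (2, "2024-01-07", 35.0), (3, "2024-01-08", 10.0)],
)
conn.commit()

for _ in range(10):  # simulate retries / re-runs of the same partition
    load_partition(conn, "2024-01-07")
```

After ten runs, orders still contains exactly the two rows for 2024-01-07, just as after one.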
Q5: What is a dead letter queue and how would you monitor it in a production pipeline?
A dead letter queue is a storage location for messages that could not be processed after N retries. It exists so that one unprocessable message doesn't halt the entire pipeline. Instead of blocking or discarding, the pipeline routes the message to the DLQ and continues processing subsequent messages.
Monitoring: (1) DLQ depth alert - alert if more than X messages accumulate in the DLQ within a time window. A small number of DLQ messages is normal (occasional malformed events from upstream). A spike indicates a systemic problem (schema change from an upstream service, infrastructure failure causing a class of events to be unprocessable). (2) Error type distribution - aggregate DLQ messages by error_type. If 99% are SchemaValidationError, the upstream service changed its schema. If 50% are DatabaseTimeoutError, the sink database is under-provisioned. (3) Age monitoring - alert if DLQ messages are older than X days without being resolved. DLQ debt that sits unresolved for weeks becomes impossible to triage (the context is lost, the bug may have been fixed but history corrupted). Treat DLQ messages as production bugs with an SLA for resolution.
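The three checks can be computed from a snapshot of the DLQ. This sketch assumes each DLQ record carries an error_type and an enqueued_at timestamp (hypothetical field names); every threshold is a placeholder to tune per pipeline, and the dominance check waits for a minimum sample so a single message does not trigger it.

```python
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class DLQMessage:
    error_type: str         # e.g. "SchemaValidationError"
    enqueued_at: datetime   # when the message landed in the DLQ


def dlq_alerts(
    messages: list[DLQMessage],
    now: datetime,
    depth_window: timedelta = timedelta(hours=1),
    max_depth: int = 100,
    min_for_dominance: int = 20,
    max_age: timedelta = timedelta(days=7),
) -> list[str]:
    """Evaluate depth, error-type distribution, and age checks."""
    alerts: list[str] = []
    # (1) Depth: messages that arrived within the recent window
    recent = [m for m in messages if now - m.enqueued_at <= depth_window]
    if len(recent) > max_depth:
        alerts.append(f"depth: {len(recent)} messages in the last {depth_window}")
    # (2) Error-type distribution: one dominant type suggests a systemic cause
    if len(messages) >= min_for_dominance:
        top_type, top_count = Counter(m.error_type for m in messages).most_common(1)[0]
        if top_count / len(messages) >= 0.5:
            alerts.append(f"dominant error: {top_type} ({top_count}/{len(messages)})")
    # (3) Age: unresolved messages older than the resolution SLA
    stale = [m for m in messages if now - m.enqueued_at > max_age]
    if stale:
        alerts.append(f"age: {len(stale)} messages older than {max_age}")
    return alerts
```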
Q6: When would you choose ELT over ETL? Are there cases where ETL is still the right answer?
ELT is the default for modern cloud-native data platforms. Load raw data to the warehouse (cheap storage), transform using warehouse compute (elastic, SQL-based). Advantages: raw data always available for reprocessing; transformation logic is version-controlled SQL; warehouse compute is elastic and cheap; no external transformation engine to maintain.
ETL is still appropriate when: (1) compliance requires data to never reach the warehouse in raw form (PII must be masked before storage). An ETL process masks at extraction time. (2) Transformation requires per-record enrichment from external APIs (enrich each user record with a third-party credit score). This is an API call per record - not SQL-expressible, must happen before or after loading. (3) The source data volume is extremely large and the transformation dramatically reduces it (10TB of raw IoT sensor data → 10GB of aggregated readings). Pre-aggregating before loading avoids storing 10TB you never query directly. (4) The destination is not a warehouse (it's an application database or a real-time API). ELT assumes a powerful warehouse as the destination; if the destination is a Postgres operational database, ELT is inappropriate.
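For case (1), masking at extraction time can be as small as a keyed hash applied before the record leaves the ETL process. This sketch uses HMAC-SHA256 from the standard library so the same value always maps to the same token (joins on the masked column still work) without the raw value reaching the warehouse. The PII_FIELDS list and the normalization are assumptions; in practice the key would come from a secrets manager, not from code.

```python
import hashlib
import hmac

PII_FIELDS = ("email", "phone")  # hypothetical field list


def mask_record(record: dict, key: bytes) -> dict:
    """Return a copy with PII fields replaced by deterministic HMAC tokens."""
    masked = dict(record)
    for name in PII_FIELDS:
        value = masked.get(name)
        if value is not None:
            # Normalize so trivially different spellings map to one token
            normalized = str(value).strip().lower()
            masked[name] = hmac.new(key, normalized.encode("utf-8"), hashlib.sha256).hexdigest()
    return masked
```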
