Feature Engineering Pipelines
"The most expensive computation is the one you repeat unnecessarily. The second most expensive is the one you design incorrectly and have to rewrite at scale."
The 4-Hour Problem
The feature was straightforward: user_purchase_count_30d, the number of purchases a user made in the past 30 days. The model needed it refreshed every 30 minutes. The Spark job took 4 hours.
The job was not inefficient by Spark standards. It read 90 days of raw purchase events from S3 - approximately 2.4 billion rows - applied a window function partitioned by user_id, and computed the rolling 30-day count for every user. The result was 40 million rows, one per active user, written back to S3 as Parquet. Spark did its job correctly. The problem was the design.
The team spent two weeks trying to optimize the Spark job. They tuned the number of executors, adjusted the shuffle partition count from 200 to 2000, enabled adaptive query execution, and switched from row-based to columnar execution. They reduced runtime to 2.5 hours. Still too slow. The 30-minute requirement was not negotiable - the fraud model needed near-current purchase velocity to catch bust-out fraud.
The actual fix required stepping back from Spark entirely and rethinking the feature's computational structure. A rolling 30-day count does not require reading 30 days of history on every run. If you maintain a daily aggregate - the number of purchases per user per day - then the 30-day rolling count is the sum of the last 30 daily aggregates. Each daily aggregate is computed once, from one day's events, and stored. Each subsequent run reads only new events from the current day and updates only that day's aggregate. The full rolling window recomputes in O(30) additions per user rather than O(30-days-of-events) per user.
Runtime after redesign: 12 seconds.
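The redesign can be sketched in plain Python. This is a minimal illustration rather than the team's actual job: the in-memory `daily_counts` dictionary stands in for the stored daily aggregate table, and the helper names are hypothetical.

```python
from collections import defaultdict
from datetime import date, timedelta


def update_daily_counts(daily_counts, new_events):
    """Fold new (user_id, event_date) purchase events into per-day counts."""
    for user_id, event_date in new_events:
        daily_counts[(user_id, event_date)] += 1


def rolling_count(daily_counts, user_id, as_of, days=30):
    """Sum the daily counts for the `days` calendar days ending on `as_of`."""
    return sum(
        daily_counts.get((user_id, as_of - timedelta(days=offset)), 0)
        for offset in range(days)
    )


daily_counts = defaultdict(int)
today = date(2024, 11, 30)

# Earlier runs already folded in older events
update_daily_counts(daily_counts, [
    ("user_1", date(2024, 10, 31)),  # one day before the window starts
    ("user_1", date(2024, 11, 1)),   # first day of the 30-day window
    ("user_1", date(2024, 11, 15)),
])

# The current run only touches today's events
update_daily_counts(daily_counts, [("user_1", today), ("user_1", today)])

count_30d = rolling_count(daily_counts, "user_1", today)
print(count_30d)  # 4
```

Each run adds a handful of increments and then performs 30 lookups per user, independent of how many raw events the window contains.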
This lesson is about designing feature pipelines correctly the first time, and recognizing which pattern to apply to which problem. The wrong architecture is not just slow - it is often architecturally incompatible with the freshness requirements the business actually needs.
Why Feature Pipelines Exist as a Separate Engineering Domain
In the early days of ML at most companies, feature computation was embedded directly in model training code. A data scientist would write a Pandas script that read raw events, computed window aggregations, merged several tables, and output a training DataFrame. That script was the "pipeline." It ran once, produced a CSV, and the model trained on that CSV.
The problems emerged when the model went to production. The Pandas script ran on a laptop. The production environment had 50 million users. The script took 18 hours on a laptop; it ran out of memory on a cloud VM. The data scientist rewrote it in PySpark. But the Spark version handled null timestamps differently than the Pandas version. The production model received slightly different feature values than the training model had seen. Predictions diverged from validation set performance. Nobody knew why until someone spent a week auditing the two implementations line by line.
Feature pipeline engineering exists to solve this class of problem: how do you compute features that are:
- Correct - consistent between training and serving, with no leakage
- Fresh - updated at the frequency the model requires
- Efficient - computing only what needs to be computed, at the granularity that matters
- Reliable - monitored, retryable, and observable when they fail
None of these properties come for free. Each one requires intentional design decisions.
The Feature Pipeline Design Space
There are four fundamental patterns for feature computation. Every feature you build belongs to exactly one of them. Choosing the wrong pattern does not just affect performance - it determines whether the freshness requirement is even achievable.
Pattern 1: Batch Full Refresh
Recompute the entire feature from scratch on every run. Read all relevant history, apply transformations, write the complete output.
When it works: Features over small datasets, features with no time dependency, features that require joining multiple data sources where the join itself is expensive to make incremental.
When it breaks: When the history required for the feature grows with time (a 90-day window over a dataset that adds 10M rows per day), when the computation time exceeds the required refresh interval, when recompute costs become prohibitive at scale.
# Full refresh pattern - simple but scales poorly
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum as spark_sum, window
spark = SparkSession.builder.appName("user_stats_full_refresh").getOrCreate()
# Read ALL history - this is the expensive part
events_df = spark.read.parquet("s3://your-bucket/raw/purchase_events/")
# Compute 30-day window for every user
user_stats = (
events_df
.filter(col("event_type") == "PURCHASE")
.groupBy(
"user_id",
window("event_timestamp", "30 days", "1 day")
)
.agg(
count("*").alias("purchase_count_30d"),
spark_sum("amount").alias("total_spend_30d"),
)
.select(
"user_id",
col("window.end").alias("feature_timestamp"),
"purchase_count_30d",
"total_spend_30d",
)
)
user_stats.write.mode("overwrite").parquet("s3://your-bucket/features/user_stats/")
The cost of this pattern scales as $O(H \times N)$, where $H$ is the history length required for the window (in days) and $N$ is the number of rows per day. For a 30-day window over a dataset with 10M events per day, every run processes $30 \times 10\text{M} = 300\text{M}$ rows regardless of how many new events arrived since the last run.
Pattern 2: Batch Incremental
Compute only the new delta since the last run. Merge with existing state. The definition of "delta" and the merge strategy depend on the feature semantics.
When it works: Most production batch features, especially window aggregations where intermediate state can be efficiently maintained.
When it breaks: Features that require global sorts across all history, features whose recompute is truly all-or-nothing (rare).
Watermark-based incremental
import json
import boto3
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum as spark_sum
spark = SparkSession.builder.appName("user_stats_incremental").getOrCreate()
# 1. Load the last successful run timestamp from a checkpoint
def get_last_run_timestamp() -> datetime:
    s3 = boto3.client("s3")
    try:
        obj = s3.get_object(
            Bucket="your-bucket",
            Key="checkpoints/user_stats_last_run.json",
        )
        data = json.loads(obj["Body"].read())
        return datetime.fromisoformat(data["last_run_timestamp"])
    except s3.exceptions.NoSuchKey:
        return datetime(2024, 1, 1)  # Cold start: process from beginning


def save_last_run_timestamp(ts: datetime):
    s3 = boto3.client("s3")
    s3.put_object(
        Bucket="your-bucket",
        Key="checkpoints/user_stats_last_run.json",
        Body=json.dumps({"last_run_timestamp": ts.isoformat()}),
    )
last_run = get_last_run_timestamp()
current_run = datetime.utcnow()
# 2. Read only NEW events since last run (predicate pushdown on Parquet partitions)
new_events = (
spark.read.parquet("s3://your-bucket/raw/purchase_events/")
.filter(
(col("event_timestamp") > last_run) &
(col("event_timestamp") <= current_run) &
(col("event_type") == "PURCHASE")
)
)
# 3. Compute daily aggregates from new events only
new_daily_aggs = (
new_events
.withColumn("event_date", col("event_timestamp").cast("date"))
.groupBy("user_id", "event_date")
.agg(
count("*").alias("daily_purchase_count"),
spark_sum("amount").alias("daily_spend"),
)
)
# 4. Merge with existing daily aggregates
existing_daily_aggs = spark.read.parquet(
"s3://your-bucket/intermediate/user_daily_aggs/"
)
merged_daily_aggs = (
existing_daily_aggs.unionByName(new_daily_aggs)
.groupBy("user_id", "event_date")
.agg(
spark_sum("daily_purchase_count").alias("daily_purchase_count"),
spark_sum("daily_spend").alias("daily_spend"),
)
)
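# Spark will not overwrite a path that the same query is still reading from.
# Write the merged result to a staging prefix (hypothetical path) and promote it
# to user_daily_aggs/ once the write succeeds, or keep this table in Delta Lake.
merged_daily_aggs.write.mode("overwrite").parquet(
    "s3://your-bucket/intermediate/user_daily_aggs_staging/"
)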
# 5. Compute rolling 30-day totals from daily aggregates (cheap!)
import pyspark.sql.functions as F

# Filter by calendar date rather than taking the last 30 rows: users rarely
# have a row for every day, so a 30-row frame could span far more than 30 days.
window_start = (current_run - timedelta(days=29)).date()
user_stats = (
    merged_daily_aggs
    .filter(F.col("event_date") >= F.lit(window_start))
    .groupBy("user_id")
    .agg(
        F.sum("daily_purchase_count").alias("purchase_count_30d"),
        F.sum("daily_spend").alias("total_spend_30d"),
    )
)
user_stats.write.mode("overwrite").parquet("s3://your-bucket/features/user_stats/")
# 6. Save checkpoint
save_last_run_timestamp(current_run)
The cost of this pattern is $O(\Delta N + U)$, where $\Delta N$ is the number of new events since the last run and $U$ is the number of users with activity in the last 30 days. For hourly incremental runs, $\Delta N$ is typically 1/24th of the daily event volume - a 24x reduction in data read per run.
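To put numbers on the comparison, here is the arithmetic for the 10M-events-per-day example. It assumes events are spread evenly across the day and counts only raw event rows, not the much smaller daily aggregate table.

```python
EVENTS_PER_DAY = 10_000_000   # from the example above
WINDOW_DAYS = 30
RUNS_PER_DAY = 24             # hourly incremental runs

# Full refresh re-reads the whole window on every run
full_refresh_rows = EVENTS_PER_DAY * WINDOW_DAYS

# Incremental reads only the events that arrived since the previous run
incremental_rows = EVENTS_PER_DAY // RUNS_PER_DAY

print(full_refresh_rows)                      # 300000000
print(incremental_rows)                       # 416666
print(full_refresh_rows // incremental_rows)  # 720
```

Relative to a single day's volume the saving is 24x; relative to the 30-day full refresh it is roughly 720x fewer raw rows per run.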
Delta Lake MERGE pattern
When your feature table lives in Delta Lake, you can use the MERGE INTO statement for upsert semantics - update existing rows for known users, insert rows for new users.
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Recompute 30-day feature values, but only for users who had activity in the last hour.
# The one-hour condition selects the users; it must not narrow the 30-day aggregation itself.
new_values = spark.sql("""
    SELECT
        user_id,
        current_timestamp() AS feature_timestamp,
        COUNT(*) AS purchase_count_30d,
        SUM(amount) AS total_spend_30d
    FROM purchase_events
    WHERE event_timestamp >= current_timestamp() - INTERVAL 30 DAYS
      AND user_id IN (
          SELECT user_id
          FROM purchase_events
          WHERE event_timestamp >= current_timestamp() - INTERVAL 1 HOUR
      )
    GROUP BY user_id
""")
# MERGE: update existing users, insert new users
delta_table = DeltaTable.forPath(spark, "s3://your-bucket/delta/user_stats/")
(
delta_table.alias("existing")
.merge(
new_values.alias("updates"),
"existing.user_id = updates.user_id",
)
.whenMatchedUpdate(set={
"feature_timestamp": "updates.feature_timestamp",
"purchase_count_30d": "updates.purchase_count_30d",
"total_spend_30d": "updates.total_spend_30d",
})
.whenNotMatchedInsert(values={
"user_id": "updates.user_id",
"feature_timestamp": "updates.feature_timestamp",
"purchase_count_30d": "updates.purchase_count_30d",
"total_spend_30d": "updates.total_spend_30d",
})
.execute()
)
Pattern 3: Streaming
Update the feature value as each event arrives. The online store reflects the event within seconds. No batch job, no materialization schedule - the feature is always current.
When it works: Features that must reflect events in the last 60 seconds (fraud velocity, real-time inventory, session-level features). When the aggregation is naturally amenable to incremental computation (counts, sums, averages - not medians or percentiles without approximations).
When it breaks: Features that require complex multi-source joins, features over very large historical windows (a true 90-day window is expensive to maintain in streaming state), when the data source is not a real-time stream (batch files updated hourly cannot be streamed).
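A minimal sketch of why counts, sums, and averages suit streaming: each can be updated from a small fixed-size state without revisiting earlier events. The `RunningStats` class and the in-memory `state` dictionary are hypothetical stand-ins for keyed state in a stream processor.

```python
from dataclasses import dataclass


@dataclass
class RunningStats:
    count: int = 0
    total: float = 0.0

    def update(self, amount: float) -> None:
        # O(1) work and O(1) state per event
        self.count += 1
        self.total += amount

    @property
    def mean(self) -> float:
        return self.total / self.count if self.count else 0.0


state = {}  # user_id -> RunningStats


def on_event(user_id: str, amount: float) -> RunningStats:
    stats = state.setdefault(user_id, RunningStats())
    stats.update(amount)
    return stats


for user_id, amount in [("u1", 20.0), ("u2", 5.0), ("u1", 40.0)]:
    on_event(user_id, amount)

print(state["u1"].count, state["u1"].total, state["u1"].mean)  # 2 60.0 30.0
```

An exact median has no equivalent constant-size state, because it needs every value seen so far. That is why medians and percentiles call for approximations in a streaming setting.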
Pattern 4: On-Demand
Compute the feature value at request time using data provided in the request or retrieved from other pre-computed features. No batch job, no stream, no storage.
When it works: Features that depend on request-time context (current user location, current transaction amount), lightweight transformations on pre-retrieved features (ratios, differences), very high-cardinality features where pre-computing all combinations is impractical.
When it breaks: When the computation takes more than 1–2 milliseconds (adds directly to serving latency), when it requires external API calls (latency unpredictable), when it requires data that is not available in the request context.
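As an illustration, an on-demand feature might compare the transaction amount in the request with the user's pre-computed average spend. The function name and the zero fallback for users without history are assumptions made for this sketch.

```python
def amount_vs_average_ratio(request_amount: float, precomputed: dict) -> float:
    """Ratio of the current transaction amount to the user's 30-day average."""
    count = precomputed.get("purchase_count_30d", 0)
    total = precomputed.get("total_spend_30d", 0.0)
    if count == 0:
        return 0.0  # no history: neutral fallback (an assumption, tune per model)
    return request_amount / (total / count)


# Pre-computed features fetched from the online store for this user
features = {"purchase_count_30d": 4, "total_spend_30d": 200.0}

ratio = amount_vs_average_ratio(150.0, features)
print(ratio)  # 3.0
```

The computation is a few arithmetic operations on values already in memory, which keeps it well inside a serving latency budget.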
Streaming Feature Pipelines with Apache Flink
For features that must reflect real-time events, Apache Flink is the production standard. Flink's strength is stateful stream processing: it maintains per-entity state in a fault-tolerant state backend (RocksDB) and supports exactly-once processing semantics.
Time Windows in Flink
Flink supports three window types that map directly to feature semantics:
- Tumbling windows: non-overlapping fixed-size windows. purchases_in_last_hour where "last hour" means the calendar hour (00:00–01:00, 01:00–02:00). Use for discrete time buckets.
- Sliding windows: overlapping windows that slide at a fixed interval. purchases_in_last_60_minutes computed every 10 minutes. The window moves, not the bucket boundary.
- Session windows: variable-length windows bounded by inactivity. A session ends after 30 minutes of no events. Use for session-level features.
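The three window types differ only in how an event timestamp maps to window boundaries. The plain-Python sketch below illustrates that mapping under simplified assumptions (millisecond timestamps, windows aligned to the epoch). It is not Flink's implementation.

```python
MINUTE = 60_000  # milliseconds


def tumbling_window(ts_ms, size_ms):
    """The single [start, end) window that contains ts_ms."""
    start = ts_ms - (ts_ms % size_ms)
    return (start, start + size_ms)


def sliding_windows(ts_ms, size_ms, slide_ms):
    """Every [start, end) window that contains ts_ms."""
    start = ts_ms - (ts_ms % slide_ms)
    windows = []
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows


def session_windows(timestamps_ms, gap_ms):
    """Group timestamps into sessions separated by at least gap_ms of inactivity."""
    sessions = []
    for ts in sorted(timestamps_ms):
        if sessions and ts - sessions[-1][-1] < gap_ms:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


ts = 125 * MINUTE  # an event at 02:05

print(tumbling_window(ts, 60 * MINUTE))                    # (7200000, 10800000)
print(len(sliding_windows(ts, 60 * MINUTE, 10 * MINUTE)))  # 6
print(len(session_windows([0, 10 * MINUTE, 50 * MINUTE, 55 * MINUTE], 30 * MINUTE)))  # 2
```

A single event belongs to exactly one tumbling window but to size/slide sliding windows (six here), which is why sliding windows cost more state and more output per event.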
# Flink PyFlink streaming feature pipeline
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.datastream.window import TumblingEventTimeWindows, SlidingEventTimeWindows
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import ProcessWindowFunction
from pyflink.common.watermark_strategy import WatermarkStrategy, Duration
import json
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(8)
# --- Define the ProcessWindowFunction ---
class PurchaseWindowAggregator(ProcessWindowFunction):
    """Aggregate purchase events in a sliding window per user."""

    def process(self, key, context, elements):
        purchase_count = 0
        total_spend = 0.0
        for event in elements:
            purchase_count += 1
            total_spend += event["amount"]
        # TimeWindow exposes start and end as attributes (epoch milliseconds)
        window_end = context.window().end
        yield {
            "user_id": key,
            "window_end_ms": window_end,
            "purchase_count": purchase_count,
            "total_spend": total_spend,
        }
# --- Configure Kafka source ---
kafka_properties = {
"bootstrap.servers": "kafka-broker:9092",
"group.id": "flink-feature-pipeline",
"auto.offset.reset": "earliest",
}
kafka_source = FlinkKafkaConsumer(
topics="purchase_events",
deserialization_schema=..., # JSON deserialization schema
properties=kafka_properties,
)
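# --- Build the pipeline ---
# PyFlink expects a TimestampAssigner instance here rather than a bare lambda
from pyflink.common.watermark_strategy import TimestampAssigner


class EventTimestampAssigner(TimestampAssigner):
    """Read the event-time timestamp (epoch milliseconds) from the payload."""

    def extract_timestamp(self, value, record_timestamp):
        return value["event_timestamp_ms"]


purchase_stream = (
    env.add_source(kafka_source)
    .assign_timestamps_and_watermarks(
        WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(10))
        .with_timestamp_assigner(EventTimestampAssigner())
    )
    .filter(lambda e: e["event_type"] == "PURCHASE")
    .key_by(lambda e: e["user_id"])
    .window(
        SlidingEventTimeWindows.of(
            Time.minutes(60),  # window size: 60 minutes
            Time.minutes(5),   # slide interval: every 5 minutes
        )
    )
    .process(PurchaseWindowAggregator())
)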
# --- Write to Redis online store ---
purchase_stream.add_sink(redis_sink) # custom Redis sink
# --- Write to S3 offline store (for training consistency) ---
purchase_stream.add_sink(s3_parquet_sink) # custom Parquet sink
env.execute("user_purchase_feature_pipeline")
Exactly-Once Semantics in Flink
Exactly-once guarantees are critical for feature correctness. Without them, a restarted Flink job may replay events, inflating counts. Or it may skip events during a partition rebalance, undercounting.
Flink achieves exactly-once through checkpointing with two-phase commit. Flink periodically snapshots all operator state to a durable store (S3 or HDFS). If the job fails, it restarts from the last successful checkpoint, replaying events from that point. For sinks (Redis, S3), Flink coordinates a two-phase commit: events are staged but not visible until the checkpoint completes successfully. This ensures the sink and the source offset are always consistent.
# Enable exactly-once checkpointing
from pyflink.datastream import CheckpointingMode
env.enable_checkpointing(60_000) # checkpoint every 60 seconds
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
env.get_checkpoint_config().set_min_pause_between_checkpoints(30_000)
env.get_checkpoint_config().set_checkpoint_timeout(120_000)
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
Exactly-once requires that your Kafka source supports offset management (standard in Kafka) and that your sink supports idempotent writes or two-phase commit. Redis SET operations are idempotent by design, making it a natural fit. For S3/Parquet, Flink uses the FileSink with staged writes.
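The difference between an idempotent and a non-idempotent sink shows up as soon as results are replayed after a restart. In this sketch plain dictionaries stand in for Redis, and the key layout (user plus window end) is an assumption.

```python
def apply_increment(store, results):
    """Non-idempotent: behaves like INCRBY, so replays inflate the value."""
    for user_id, _window_end, count in results:
        store[user_id] = store.get(user_id, 0) + count


def apply_set(store, results):
    """Idempotent: behaves like SET on a deterministic key, so replays are harmless."""
    for user_id, window_end, count in results:
        store[(user_id, window_end)] = count


window_results = [("u1", 1000, 3), ("u2", 1000, 1)]

# The job restarts from a checkpoint and emits the same window results again
replayed = window_results + window_results

incr_store, set_store = {}, {}
apply_increment(incr_store, replayed)
apply_set(set_store, replayed)

print(incr_store["u1"])         # 6 (inflated by the replay)
print(set_store[("u1", 1000)])  # 3 (unchanged by the replay)
```

Writing the full aggregate under a deterministic key is what makes the replay safe. A sink that applies deltas needs transactional support to stay correct.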
Feature Transformation Best Practices
Parameterized Transformations
When a feature makes sense at multiple time windows, write the transformation once as a function that accepts the window size as a parameter. This avoids maintaining multiple nearly-identical functions.
from datetime import date, timedelta
from typing import List, Optional

from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def compute_purchase_aggregates(
    events_df: DataFrame,
    user_id_col: str,
    timestamp_col: str,
    amount_col: str,
    window_days: List[int],
    as_of_date: Optional[date] = None,
) -> DataFrame:
    """
    Compute rolling purchase count and spend for multiple window sizes.

    Each window covers the N calendar days ending on as_of_date (inclusive).
    If as_of_date is omitted, the latest event date in events_df is used.

    Returns a DataFrame with columns:
        user_id,
        purchase_count_{N}d,
        total_spend_{N}d
    for each N in window_days. Users with no events in a window get 0.
    """
    # Compute daily aggregates once
    daily_aggs = (
        events_df
        .withColumn("event_date", F.col(timestamp_col).cast("date"))
        .groupBy(user_id_col, "event_date")
        .agg(
            F.count("*").alias("daily_count"),
            F.sum(amount_col).alias("daily_spend"),
        )
    )

    if as_of_date is None:
        as_of_date = daily_aggs.agg(F.max("event_date")).first()[0]

    # One conditional sum per window size. Filtering by calendar date (not by
    # row count) matters because most users do not have a row for every day.
    agg_exprs = []
    for n_days in window_days:
        in_window = F.col("event_date").between(
            F.lit(as_of_date - timedelta(days=n_days - 1)), F.lit(as_of_date)
        )
        agg_exprs.append(
            F.sum(F.when(in_window, F.col("daily_count")).otherwise(0))
            .alias(f"purchase_count_{n_days}d")
        )
        agg_exprs.append(
            F.sum(F.when(in_window, F.col("daily_spend")).otherwise(0.0))
            .alias(f"total_spend_{n_days}d")
        )

    return daily_aggs.groupBy(user_id_col).agg(*agg_exprs)
# Usage: compute 7-day, 30-day, and 90-day windows with one function
user_features = compute_purchase_aggregates(
events_df=purchase_events,
user_id_col="user_id",
timestamp_col="event_timestamp",
amount_col="purchase_amount",
window_days=[7, 30, 90],
)
# Output columns: user_id, purchase_count_7d, total_spend_7d,
# purchase_count_30d, total_spend_30d,
# purchase_count_90d, total_spend_90d
Schema Evolution Without Breaking Consumers
When adding a new feature to an existing feature view, use a backward-compatible migration strategy:
- Add the new column to the transformation code with a default value
- Run the transformation and write to the feature table
- Update the feature view schema definition in the feature store (feast apply or equivalent)
- Do not remove old columns until all consumers have migrated (minimum: 2 weeks)
# Schema-safe feature table write with Spark
from pyspark.sql import functions as F
from pyspark.sql.types import LongType
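# Add the new column with a safe default if it does not exist in the source data
default_count = F.lit(0).cast(LongType())
if "purchase_count_90d" in existing_features.columns:
    # Column already exists: fill gaps with 0 (graceful cold start)
    new_feature_df = existing_features.withColumn(
        "purchase_count_90d",
        F.coalesce(F.col("purchase_count_90d"), default_count),
    )
else:
    # Column not computed yet: F.col() would fail, so add it outright
    new_feature_df = existing_features.withColumn("purchase_count_90d", default_count)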
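The consumer side of a backward-compatible change can be sketched the same way: a reader that tolerates rows written before the new column existed. The `FEATURE_DEFAULTS` contract and `read_features` helper are hypothetical names for this illustration.

```python
# Hypothetical feature contract: name -> default used when the value is absent
FEATURE_DEFAULTS = {
    "purchase_count_30d": 0,
    "purchase_count_90d": 0,  # newly added column
}


def read_features(row: dict, defaults: dict = FEATURE_DEFAULTS) -> dict:
    """Return every contracted feature, substituting defaults for missing or null values."""
    features = {}
    for name, default in defaults.items():
        value = row.get(name)
        features[name] = default if value is None else value
    return features


old_row = {"user_id": "u1", "purchase_count_30d": 4}  # written before the migration
new_row = {"user_id": "u1", "purchase_count_30d": 4, "purchase_count_90d": 9}

print(read_features(old_row))  # {'purchase_count_30d': 4, 'purchase_count_90d': 0}
print(read_features(new_row))  # {'purchase_count_30d': 4, 'purchase_count_90d': 9}
```

Because old and new rows both satisfy the reader, producers and consumers can be upgraded in either order.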
Testing Feature Pipelines
Feature pipelines are difficult to test because they process time-series data with complex semantics. Three levels of testing are required.
Unit Tests: Test Transformation Logic in Isolation
Transformation functions should be pure: given an input DataFrame, they return an output DataFrame without side effects. Pure functions are straightforward to unit test.
import pytest
from pyspark.sql import SparkSession
from datetime import date
from features.user_stats import compute_purchase_aggregates
@pytest.fixture(scope="module")
def spark():
    return SparkSession.builder.master("local[2]").appName("test").getOrCreate()


def test_purchase_count_7d(spark):
    """Rolling 7-day count sums only events within the window."""
    events = spark.createDataFrame([
        # user 1: 3 purchases inside the 7-day window, 1 purchase just outside it
        ("user_1", date(2024, 11, 1), 50.0),   # 7 days before today - outside window
        ("user_1", date(2024, 11, 2), 30.0),   # 6 days before today - first day of window (included)
        ("user_1", date(2024, 11, 5), 20.0),
        ("user_1", date(2024, 11, 8), 40.0),   # today
        # user 2: 1 purchase in last 7 days
        ("user_2", date(2024, 11, 8), 100.0),
    ], schema=["user_id", "event_date", "amount"])

    result = compute_purchase_aggregates(
        events_df=events,
        user_id_col="user_id",
        timestamp_col="event_date",
        amount_col="amount",
        window_days=[7],
    ).collect()

    result_dict = {row["user_id"]: row for row in result}
    assert result_dict["user_1"]["purchase_count_7d"] == 3
    assert result_dict["user_2"]["purchase_count_7d"] == 1
def test_null_handling(spark):
    """Users with no events in the window period return 0, not null."""
    events = spark.createDataFrame([
        ("user_1", date(2024, 10, 1), 50.0),   # 30+ days ago, outside window
        # A recent event from another user makes Nov 8 the latest date in the data,
        # so user_1's only purchase falls outside the 7-day window
        ("user_2", date(2024, 11, 8), 10.0),
    ], schema=["user_id", "event_date", "amount"])

    result = compute_purchase_aggregates(
        events_df=events,
        user_id_col="user_id",
        timestamp_col="event_date",
        amount_col="amount",
        window_days=[7],
    ).collect()

    result_dict = {row["user_id"]: row for row in result}
    # Expect 0, not null - the model cannot handle null inputs
    assert result_dict["user_1"]["purchase_count_7d"] == 0
Integration Tests: Verify Full Pipeline Behavior
Integration tests run the complete pipeline against synthetic data and verify output schema, data types, and aggregate values.
def test_pipeline_schema(spark):
    """Output schema matches expected feature store contract."""
    from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

    expected_schema = StructType([
        StructField("user_id", StringType(), nullable=False),
        StructField("purchase_count_7d", LongType(), nullable=True),
        StructField("total_spend_7d", DoubleType(), nullable=True),
        StructField("purchase_count_30d", LongType(), nullable=True),
        StructField("total_spend_30d", DoubleType(), nullable=True),
    ])
    result = compute_purchase_aggregates(...)
    assert result.schema == expected_schema


def test_no_duplicate_user_ids(spark):
    """Each user_id appears exactly once in the output."""
    result = compute_purchase_aggregates(...)
    total_rows = result.count()
    distinct_users = result.select("user_id").distinct().count()
    assert total_rows == distinct_users, "Duplicate user_ids detected in output"


def test_no_negative_counts(spark):
    """Purchase counts must be non-negative."""
    result = compute_purchase_aggregates(...)
    negative_count = result.filter(
        (result["purchase_count_7d"] < 0) |
        (result["purchase_count_30d"] < 0)
    ).count()
    assert negative_count == 0
Backtesting: Validate Against Known-Good Reference
For critical features, generate a reference output by running the simplest correct implementation (even if slow) on a bounded historical slice, then compare against the optimized incremental implementation.
from pyspark.sql import functions as F


def backtest_incremental_vs_full_refresh(
    spark,
    events_df,
    test_date: str,
    sample_users: list,
):
    """
    Verify that the incremental implementation matches the full-refresh result
    for a set of sampled users on a specific test date.
    """
    test_events = events_df.filter(f"event_date <= '{test_date}'")
    sample_df = spark.createDataFrame([(u,) for u in sample_users], ["user_id"])

    # Reference: full refresh (slow but simple enough to trust).
    # A left-semi join keeps only the sampled users; filter() cannot take a DataFrame.
    reference = (
        full_refresh_aggregation(test_events)
        .join(sample_df, on="user_id", how="left_semi")
        .select("user_id", F.col("purchase_count_7d").alias("purchase_count_7d_ref"))
    )

    # Optimized: incremental implementation
    optimized = (
        incremental_aggregation(test_events)
        .join(sample_df, on="user_id", how="left_semi")
        .select("user_id", F.col("purchase_count_7d").alias("purchase_count_7d_opt"))
    )

    # Join and compare. PySpark joins have no suffix option, so the feature
    # columns are renamed before the join.
    comparison = reference.join(optimized, on="user_id", how="outer")

    mismatches = comparison.filter(
        (F.col("purchase_count_7d_ref") != F.col("purchase_count_7d_opt")) |
        F.col("purchase_count_7d_ref").isNull() |
        F.col("purchase_count_7d_opt").isNull()
    )

    mismatch_count = mismatches.count()
    assert mismatch_count == 0, (
        f"Backtest failed: {mismatch_count} mismatches between "
        f"full refresh and incremental implementation"
    )
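The same comparison can be prototyped without a cluster. The sketch below uses plain Python and synthetic events, and its two functions are simplified stand-ins rather than the pipeline implementations referenced above. It replays the data one day at a time through an incremental path and checks the result against a direct full scan.

```python
from collections import Counter
from datetime import date, timedelta


def full_refresh(events, as_of, days=7):
    """Reference: scan every event and count those inside the window."""
    start = as_of - timedelta(days=days - 1)
    return Counter(user for user, day in events if start <= day <= as_of)


def incremental(events, as_of, days=7):
    """Simulate one run per day; each run folds only that day's events into daily counts."""
    daily = Counter()
    day = min(d for _, d in events)
    while day <= as_of:
        daily.update((user, d) for user, d in events if d == day)
        day += timedelta(days=1)

    start = as_of - timedelta(days=days - 1)
    result = Counter()
    for (user, d), n in daily.items():
        if start <= d <= as_of:
            result[user] += n
    return result


events = [
    ("u1", date(2024, 11, 1)),  # outside the 7-day window ending Nov 8
    ("u1", date(2024, 11, 2)),
    ("u1", date(2024, 11, 8)),
    ("u2", date(2024, 11, 8)),
    ("u2", date(2024, 11, 8)),
]
as_of = date(2024, 11, 8)

reference = full_refresh(events, as_of)
optimized = incremental(events, as_of)
mismatches = {u for u in set(reference) | set(optimized) if reference[u] != optimized[u]}

print(dict(reference))  # {'u1': 2, 'u2': 2}
print(mismatches)       # set()
```

Boundary dates such as Nov 1 and Nov 2 here are the cases worth including deliberately, since off-by-one window errors only show up at the edges.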
Feature Pipeline Architecture: Pattern Comparison
Performance Patterns
Partition Pruning
Partition pruning is the single highest-impact optimization for batch feature pipelines reading Parquet from S3. Organize your raw event data by event_date (or event_year/event_month/event_day for Hive-style partitioning). When your pipeline filters by date range, Spark reads only the relevant Parquet files rather than scanning the entire dataset.
# Without partition pruning: scans ALL partitions (expensive)
events_df = spark.read.parquet("s3://your-bucket/raw/events/")
recent_events = events_df.filter("event_timestamp >= current_date() - 7")
# With partition pruning: scans only the last 7 day-partitions (cheap)
# Requires data to be partitioned by date column on disk
recent_events = (
spark.read.parquet("s3://your-bucket/raw/events/")
.filter("event_date >= current_date() - 7") # partition column, not timestamp
)
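To make the effect concrete, the sketch below lists the directories a date filter would touch under a Hive-style layout. The `event_date=YYYY-MM-DD` directory naming and the `partition_paths` helper are assumptions about how the data is organized.

```python
from datetime import date, timedelta


def partition_paths(base: str, end_date: date, days: int) -> list:
    """Partition directories for the `days` calendar days ending on end_date."""
    return [
        f"{base}/event_date={(end_date - timedelta(days=offset)).isoformat()}/"
        for offset in range(days)
    ]


paths = partition_paths("s3://your-bucket/raw/events", date(2024, 11, 8), 7)

print(len(paths))  # 7
print(paths[0])    # s3://your-bucket/raw/events/event_date=2024-11-08/
print(paths[-1])   # s3://your-bucket/raw/events/event_date=2024-11-02/
```

With pruning, the engine lists and reads only these seven prefixes. Without it, every partition is opened just to discard most of the rows.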
Broadcast Joins for Lookup Tables
When joining feature data against a small lookup table (e.g., a product category hierarchy, a user segment table with fewer than 10M rows), broadcast the small table to all executors to avoid an expensive shuffle.
from pyspark.sql.functions import broadcast
# Small lookup table (e.g., 500K rows of category metadata)
category_metadata = spark.read.parquet("s3://your-bucket/dim/categories/")
# Broadcast to all executors - eliminates the shuffle
user_features_with_category = (
user_features
.join(broadcast(category_metadata), on="category_id", how="left")
)
Spark vs. Flink for Different Workloads
| Workload | Spark | Flink |
|---|---|---|
| Historical backfill (one-time, large) | Excellent | Overkill |
| Hourly batch aggregations | Excellent | Adequate |
| Sub-minute streaming aggregations | Limited | Excellent |
| Complex event pattern detection | Limited | Excellent (CEP library) |
| ML training data generation | Excellent | Poor |
| Exactly-once streaming guarantees | Limited | Native |
The practical rule: use Spark for batch features and Flink for streaming features. Avoid mixing them in the same feature's pipeline - the operational complexity of maintaining two compute frameworks for one feature is rarely justified.
Anti-pattern: Full refresh at scale. A 90-day window over raw events computed on every hourly run is an architectural error, not a tuning problem. Once the history required for a feature exceeds what can be scanned in the available time window, no amount of Spark tuning will fix it. The solution is always to redesign the feature's computational structure: maintain daily aggregates, use rolling accumulators, or decompose the window into a hierarchy (daily aggregates → weekly aggregates → monthly aggregates). Recognize this pattern early - it appears innocent at 10M events per day and becomes catastrophically expensive at 1B events per day.
Anti-pattern: Streaming everything. Engineering teams that discover streaming feature pipelines sometimes over-apply the pattern. A feature whose underlying data changes once per day (a user's city, a product's category, a merchant's risk tier) does not benefit from a streaming pipeline. The streaming infrastructure adds operational complexity, increases failure surface area, and provides no freshness improvement over an hourly batch job. Apply streaming computation only when the business actually requires sub-minute feature freshness. For most features, hourly batch incremental is the right answer.
Warning: Ignoring late-arriving events. Real event streams have late arrivals - events that occurred at 10:00am but arrive at the processing system at 10:15am due to mobile offline buffers, retry queues, or CDN delays. If your streaming pipeline does not account for this, purchase events that arrive late are excluded from the window they belong to and counted in the wrong window. Flink's event-time processing with configurable watermarks handles this: events up to N seconds late are still attributed to the correct window. Set your watermark delay based on the observed 95th percentile arrival latency of your event source.
Warning: Testing only with small, evenly distributed data. Feature pipelines that pass a lightweight unit test can still fail at production scale due to data skew. A single user_id that appears in 50 million events will cause one Spark executor to process 50 million rows while others process thousands - a classic data skew problem that does not appear in test data. Add a skew detection step: log the top 10 user_ids by event count before aggregation, and alert if any single key exceeds 1% of total events.
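A skew check along these lines might look like the following sketch. It runs on an in-memory list of keys for illustration (in a real pipeline the counts would come from a grouped aggregation), and the `detect_skew` name is hypothetical.

```python
from collections import Counter


def detect_skew(user_ids, top_n=10, max_share=0.01):
    """Return the top keys by event count and those exceeding max_share of all events."""
    counts = Counter(user_ids)
    total = sum(counts.values())
    top = counts.most_common(top_n)
    hot_keys = [(user, n) for user, n in top if n / total > max_share]
    return top, hot_keys


# Synthetic data: one key holds 5% of 1,000 events, the rest hold 0.1% each
events = ["bot_user"] * 50 + [f"user_{i}" for i in range(950)]

top, hot_keys = detect_skew(events)

print(top[0])    # ('bot_user', 50)
print(hot_keys)  # [('bot_user', 50)]
```

Any key in `hot_keys` is a candidate for salting, separate handling, or filtering before the aggregation runs.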
Interview Q&A
Q1: A feature takes 4 hours to compute but needs to be refreshed every 30 minutes. What do you do?
The answer is to redesign the feature's computational structure, not to optimize the current implementation. A feature that takes 4 hours on Spark is almost certainly doing a full history scan for each run. The fix is to identify what intermediate state can be incrementally maintained. For window aggregations (counts, sums, averages), the answer is always to precompute daily (or hourly) aggregates once and roll them up. A 30-day rolling count becomes the sum of 30 daily counts - computed from a Delta table of daily aggregates that is updated incrementally in seconds. The design question is: what is the smallest unit of intermediate state that makes incremental computation feasible? That is always the right question before touching executor configuration.
Q2: What is the difference between event time and processing time in streaming feature pipelines, and why does it matter?
Event time is when the event actually occurred (embedded in the event payload as a timestamp). Processing time is when the event arrived at and was processed by the streaming system. For feature correctness, you must use event time. Consider a user who made a purchase at 11:55pm on November 30 but the event arrived at 12:05am on December 1 due to network delay. If you use processing time, that purchase is attributed to December 1 and excluded from the "November purchase count" window. If you use event time with a 30-minute watermark, the event is correctly attributed to November 30. Training-serving skew is the consequence of getting this wrong: during training, you used historical event timestamps correctly, but in production, the streaming pipeline used processing time. The model was trained on features that the production pipeline cannot produce.
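The November 30 example can be worked through directly. This is a simplified illustration: the lateness check compares arrival delay with the watermark delay, whereas a real stream processor compares event timestamps with the highest timestamp seen so far.

```python
from datetime import datetime, timedelta

WATERMARK_DELAY = timedelta(minutes=30)  # the delay used in the example above

event_time = datetime(2024, 11, 30, 23, 55)    # when the purchase happened
processing_time = datetime(2024, 12, 1, 0, 5)  # when the event reached the pipeline


def month_bucket(ts: datetime) -> str:
    return ts.strftime("%Y-%m")


by_processing_time = month_bucket(processing_time)
by_event_time = month_bucket(event_time)
arrived_in_time = (processing_time - event_time) <= WATERMARK_DELAY

print(by_processing_time)  # 2024-12 (wrong month)
print(by_event_time)       # 2024-11 (correct month)
print(arrived_in_time)     # True: 10 minutes late, within the 30-minute delay
```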
Q3: How do you design a feature pipeline that is consistent between training and serving?
The fundamental principle is: training and serving must execute identical transformation code on identical data. In practice this means: (1) For batch features, the feature store (Feast, Tecton) materializes from the same offline store that is used for get_historical_features(). There is no separate code path. (2) For on-demand features, the transformation function is deployed as a Python module shared between the feature store definition and the offline retrieval code. Any change to the transformation requires re-generating training data. (3) For streaming features, the Flink job writes to both the online store (Redis) and the offline store (S3 Parquet) in the same execution graph. Training retrieval reads from the same Parquet files that served as the online store's source. (4) Never compute features in-line in the training notebook that differ from how they are computed in the serving pipeline. If you must, the offline-computed version must be byte-for-byte equivalent.
Q4: What is a watermark in Flink and how do you choose the watermark delay?
A watermark is a signal to Flink's windowing operator that no more events with timestamps at or below a certain point are expected. When a watermark with timestamp T arrives, Flink can safely close and emit all windows with an end time less than or equal to T. The watermark delay (the bounded out-of-orderness interval) is how far the watermark trails the highest event timestamp seen so far, which makes it the maximum time Flink waits for late-arriving events before closing a window. It is distinct from Flink's allowed lateness setting, which keeps window state around after the watermark has passed so that late events can still trigger an updated result. Choosing the right delay requires measuring the distribution of arrival latency in your event stream. In practice: instrument your Kafka consumer to compute processing_time - event_time for every event, take the 95th or 99th percentile of that distribution, and set your watermark delay to that value. A delay of 10–30 seconds handles most mobile event sources. If you set the delay too low, you drop late events and undercount. If you set it too high, you increase window output latency by the delay amount.
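The measurement step can be sketched as follows. The latency samples are synthetic, and the nearest-rank percentile is one reasonable choice among several definitions.

```python
def percentile(values, pct: int):
    """Nearest-rank percentile for an integer pct between 1 and 100."""
    ordered = sorted(values)
    rank = max(1, -(-pct * len(ordered) // 100))  # ceiling division
    return ordered[rank - 1]


# Synthetic arrival latencies in ms: most events are fast, a few arrive very late
lags_ms = [200] * 90 + [4_000] * 8 + [25_000] * 2

# (event_time_ms, processing_time_ms) pairs as a consumer might record them
samples = [(i * 1_000, i * 1_000 + lag) for i, lag in enumerate(lags_ms)]
latencies = [processing - event for event, processing in samples]

p95 = percentile(latencies, 95)
p99 = percentile(latencies, 99)

print(p95)  # 4000
print(p99)  # 25000
```

With this distribution, a 4-second delay would drop about 2% of events, while a 25-second delay would admit every event in the sample at the cost of 21 more seconds of output latency. That trade-off is the one the choice of percentile controls.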
Q5: How do you test that an incremental feature pipeline produces the same results as a full-refresh pipeline?
This is a backtesting problem. The approach: (1) Choose a bounded historical window (e.g., one month of data) and a set of sample entity keys (e.g., 1,000 randomly sampled users). (2) Run the full-refresh pipeline on that data - this is your ground truth because full refresh is simple and obviously correct. (3) Run the incremental pipeline on the same data, simulating day-by-day incremental runs starting from cold start. (4) Compare the final output for the sampled entity keys. Any divergence indicates a bug in the incremental logic - typically boundary condition bugs (off-by-one in the window, inclusive vs. exclusive range checks) or merge logic bugs (upsert not handling null replacement correctly). Run this backtest as part of CI on every change to the pipeline code.
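The four steps can be sketched for a rolling 30-day count. This is a toy, in-memory version under stated assumptions: events are `(user_id, day)` tuples, all events fall on or after `first_day`, and the function names are hypothetical rather than part of any pipeline framework.

```python
from collections import defaultdict
from datetime import date, timedelta

WINDOW_DAYS = 30


def full_refresh(events: list[tuple[str, date]], as_of: date) -> dict[str, int]:
    """Ground truth: count events in the WINDOW_DAYS days ending on as_of (inclusive)."""
    start = as_of - timedelta(days=WINDOW_DAYS - 1)
    counts: dict[str, int] = defaultdict(int)
    for user_id, day in events:
        if start <= day <= as_of:
            counts[user_id] += 1
    return dict(counts)


def incremental(events: list[tuple[str, date]], first_day: date, as_of: date) -> dict[str, int]:
    """Simulate day-by-day runs from a cold start, keeping per-user daily aggregates."""
    by_day = defaultdict(list)
    for user_id, day in events:
        by_day[day].append(user_id)

    daily = defaultdict(lambda: defaultdict(int))  # user_id -> day -> count
    day = first_day
    while day <= as_of:
        # One simulated run: touch only this day's events and this day's aggregate.
        for user_id in by_day.get(day, []):
            daily[user_id][day] += 1
        day += timedelta(days=1)

    # The rolling count is the sum of the daily aggregates inside the window.
    start = as_of - timedelta(days=WINDOW_DAYS - 1)
    result = {}
    for user_id, per_day in daily.items():
        total = sum(c for d, c in per_day.items() if start <= d <= as_of)
        if total:
            result[user_id] = total
    return result


def backtest(events, first_day, as_of, sample_users) -> dict[str, tuple[int, int]]:
    """Return {user: (full_refresh_value, incremental_value)} for every mismatch."""
    truth = full_refresh(events, as_of)
    candidate = incremental(events, first_day, as_of)
    return {
        u: (truth.get(u, 0), candidate.get(u, 0))
        for u in sample_users
        if truth.get(u, 0) != candidate.get(u, 0)
    }
```

An empty result from `backtest` means the two pipelines agree on the sampled users. Test data should deliberately include events exactly on the window boundary, since that is where the off-by-one bugs live.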
Q6: When should you use Spark versus Flink for a feature pipeline?
Use Spark for batch features - any feature refreshed on a schedule of minutes to hours, any feature that requires joining multiple large datasets, any feature generated during ML training dataset creation. Spark's optimizer, adaptive query execution, and broad library ecosystem (Delta Lake, MLlib) make it the right tool for bounded datasets with rich transformations. Use Flink for streaming features - any feature that must reflect events within seconds, any feature using complex windowed aggregations over a real-time event stream, any feature that requires stateful per-entity computation with exactly-once guarantees. Flink's event-time processing, native watermark support, and RocksDB-backed state management are specifically designed for these workloads. The operational cost of running both is real but usually justified: use Spark materialization for 80% of features and Flink streaming for the 20% that genuinely require sub-minute freshness.
Q7: A feature was computed correctly during training but is wrong at serving time. What are the most common causes?
In order of frequency: (1) Different transformation code paths - the training pipeline runs a Spark SQL job while the serving pipeline uses a Python function with slightly different semantics (different null handling, different type casting, different edge case behavior). (2) Data source differences - training reads from a historical snapshot while serving reads from a live table that has been modified since the snapshot was taken. (3) Timestamp handling - training uses event time while serving uses processing time, or the two make different timezone assumptions. (4) Window boundary definitions - training computes "last 7 days" as events strictly newer than 7 × 24 hours ago while serving uses "7 calendar days back" from midnight - events that fall between the two cutoffs are counted on one side only, so the feature diverges for every user with activity near the boundary. (5) Missing features defaulting differently - training drops rows with null features while serving defaults to 0, and the model never saw 0-defaulted rows during training. (6) Entity key format inconsistency - training uses integer user IDs while serving serializes them as strings, so lookups in the online store miss.
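Cause (4) is easy to reproduce. The sketch below contrasts two plausible readings of "last 7 days"; both function names are hypothetical, and timestamps are assumed to be timezone-aware UTC.

```python
from datetime import datetime, timedelta, timezone


def count_rolling_7d(event_times: list[datetime], now: datetime) -> int:
    """Events strictly newer than exactly 7 * 24 hours before `now`."""
    cutoff = now - timedelta(days=7)
    return sum(1 for t in event_times if cutoff < t <= now)


def count_calendar_7d(event_times: list[datetime], now: datetime) -> int:
    """Events on or after midnight UTC seven calendar days before `now`."""
    cutoff = (now - timedelta(days=7)).replace(hour=0, minute=0, second=0, microsecond=0)
    return sum(1 for t in event_times if cutoff <= t <= now)


now = datetime(2024, 3, 15, 12, 0, tzinfo=timezone.utc)
events = [
    datetime(2024, 3, 8, 9, 0, tzinfo=timezone.utc),   # between the two cutoffs
    datetime(2024, 3, 8, 12, 0, tzinfo=timezone.utc),  # exactly on the rolling cutoff
    datetime(2024, 3, 10, 8, 0, tzinfo=timezone.utc),  # inside both windows
]
rolling = count_rolling_7d(events, now)
calendar = count_calendar_7d(events, now)
```

Here the two definitions disagree on two of the three events. Whichever definition is chosen, it should live in one shared function so training and serving cannot drift apart.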
Feature Pipeline Orchestration Patterns
Dependency Management Between Feature Pipelines
Features are not independent. A feature view computing user_segment may depend on the output of another pipeline computing user_purchase_count_30d. If the upstream pipeline fails or produces stale output, the downstream feature will be wrong.
Model this as a directed acyclic graph (DAG) in your orchestrator. In Airflow:
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

# run_user_segment_pipeline and feast_materialize_user_segments are this
# project's own callables and are assumed to be defined or imported elsewhere.

with DAG(
    dag_id="user_segment_features",
    start_date=datetime(2024, 1, 1),
    schedule_interval="0 * * * *",
    catchup=False,
) as dag:
    # Wait for the upstream purchase stats pipeline to complete this hour
    wait_for_purchase_stats = ExternalTaskSensor(
        task_id="wait_for_purchase_stats",
        external_dag_id="user_purchase_stats",
        external_task_id="validate_output",
        timeout=1800,  # fail after 30 minutes of waiting
        mode="reschedule",  # release the worker slot between checks
    )

    compute_user_segments = PythonOperator(
        task_id="compute_user_segments",
        python_callable=run_user_segment_pipeline,
    )

    materialize_to_feature_store = PythonOperator(
        task_id="materialize_to_feature_store",
        python_callable=feast_materialize_user_segments,
    )

    wait_for_purchase_stats >> compute_user_segments >> materialize_to_feature_store
The ExternalTaskSensor holds the downstream pipeline until the upstream pipeline's validate_output task has succeeded for the same logical date, which is why both DAGs must share the hourly schedule (or the sensor must be given an execution_delta). If the upstream pipeline is delayed, the sensor waits up to 30 minutes and then fails, and the failure surfaces through whatever alerting is configured on the DAG. This prevents a downstream pipeline from reading stale upstream data silently.
Handling Cold Start for New Entities
New users, new items, and new merchants do not have any feature history. Every feature pipeline must explicitly handle this case, and the handling strategy must be consistent between training and serving.
Three cold start strategies:
Strategy 1: Return null, let the model handle it. The model is trained on data that includes null features for new users (sampled from the historical period when those users were also new). Use this when the model's framework handles nulls natively (e.g., LightGBM, XGBoost can process NaN values).
Strategy 2: Return a global default. Compute population-level statistics (median purchase count, mean spend) and use them as the default for new entities. Easy to implement, but the default is a static snapshot and may become stale.
Strategy 3: Return tier-specific defaults. Group new entities by observable attributes available at request time (e.g., new users who installed from a paid ad vs. organic) and return different defaults per group. More accurate, more complex.
from datetime import datetime, timezone

from feast import FeatureStore

store = FeatureStore(repo_path=".")


def get_features_with_cold_start_handling(
    user_id: str,
    user_created_at: datetime,  # must be timezone-aware (UTC)
    population_defaults: dict,
) -> dict:
    """Handle cold start for users with insufficient history."""
    user_age_hours = (
        datetime.now(timezone.utc) - user_created_at
    ).total_seconds() / 3600

    # Users less than 1 hour old have no meaningful purchase history,
    # so skip the online store lookup entirely
    if user_age_hours < 1:
        return {
            "purchase_count_7d": 0,
            "total_spend_7d": 0.0,
            "is_cold_start": True,
        }

    features = store.get_online_features(
        features=["user_stats:purchase_count_7d", "user_stats:total_spend_7d"],
        entity_rows=[{"user_id": user_id}],
    ).to_dict()

    # Fill nulls with population defaults for users with some history
    result = {}
    for key, value in features.items():
        if key == "user_id":
            continue  # to_dict() also returns the entity key; it is not a feature
        if value[0] is None:
            result[key] = population_defaults.get(key, 0)
        else:
            result[key] = value[0]
    result["is_cold_start"] = False
    return result
Always include a boolean is_cold_start feature (or equivalently, a feature representing user account age in hours) in your training data. This allows the model to learn different behavior for new vs. established users rather than treating null-filled defaults as regular observations.
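Strategy 3 can be sketched as a lookup keyed on an attribute known at request time. Everything here is illustrative: `acquisition_channel`, the tier names, and the default values are placeholder assumptions, and in practice the defaults would be recomputed from population statistics on a schedule.

```python
from typing import Optional

# Placeholder values for illustration only, not measured statistics.
GLOBAL_DEFAULTS = {"purchase_count_7d": 1.0, "total_spend_7d": 20.0}
TIER_DEFAULTS = {
    "paid_ad": {"purchase_count_7d": 2.0, "total_spend_7d": 35.0},
    "organic": {"purchase_count_7d": 1.0, "total_spend_7d": 15.0},
}


def fill_with_tier_defaults(features: dict, acquisition_channel: Optional[str]) -> dict:
    """Replace missing features with the defaults for the entity's tier.

    Unknown or missing tiers fall back to the global defaults.
    """
    defaults = TIER_DEFAULTS.get(acquisition_channel, GLOBAL_DEFAULTS)
    filled = {}
    for name, default in defaults.items():
        value = features.get(name)
        filled[name] = default if value is None else value
    # Flag entities with no observed features at all, so the model can
    # tell defaulted rows from real observations.
    filled["is_cold_start"] = all(features.get(name) is None for name in defaults)
    return filled
```

The same function has to run when building training data, otherwise the model is served defaults it never saw during training.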
Feature Pipeline Cost Optimization
At scale, feature pipeline compute is a significant cost center. Three optimizations that consistently deliver 40–70% cost reduction:
1. Spot Instance Materialization
Batch materialization jobs are highly amenable to spot/preemptible instances - they are stateless (checkpoint-based restart works), have flexible timing (a 15-minute delay is usually acceptable), and run for bounded durations. Configuring Spark materialization jobs on spot instances typically reduces compute cost by 60–70%.
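# Airflow EMR operator with spot instance configuration
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator

emr_config = {
    "Name": "feast-materialization",
    "ReleaseLabel": "emr-6.15.0",
    "Instances": {
        "KeepJobFlowAliveWhenNoSteps": False,
        "Ec2KeyName": "your-key",
        # EMR accepts either instance groups (MasterInstanceType,
        # SlaveInstanceType, InstanceCount) or instance fleets, not both,
        # so the whole cluster is described as fleets.
        "InstanceFleets": [
            {
                # Keep the primary node on-demand so a spot reclaim
                # cannot take down the whole cluster
                "InstanceFleetType": "MASTER",
                "TargetOnDemandCapacity": 1,
                "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}],
            },
            {
                # Use spot for core instances
                "InstanceFleetType": "CORE",
                "TargetSpotCapacity": 8,
                "InstanceTypeConfigs": [
                    {"InstanceType": "m5.2xlarge"},
                    {"InstanceType": "m5a.2xlarge"},  # fallback
                ],
                "LaunchSpecifications": {
                    "SpotSpecification": {
                        "TimeoutDurationMinutes": 10,
                        "TimeoutAction": "SWITCH_TO_ON_DEMAND",
                    }
                },
            },
        ],
    },
}

# The task_id is a placeholder; attach the operator to your materialization DAG
create_cluster = EmrCreateJobFlowOperator(
    task_id="create_materialization_cluster",
    job_flow_overrides=emr_config,
)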
2. Partition-Aligned Scheduling
If your feature pipelines process Parquet files partitioned by event_date, schedule materialization to run after the day's partition is complete - not in the middle of it. A job that runs mid-partition scans incomplete data and misses events that land after the scan starts. Aligning the schedule to the partition boundary is a correctness optimization as much as a cost optimization.
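A completeness check can gate the materialization task. This sketch assumes a Hive-style `event_date=YYYY-MM-DD` directory layout on a filesystem path and assumes the upstream writer leaves a `_SUCCESS` marker file in each finished partition directory; both are assumptions about your setup, and the function names are hypothetical.

```python
from datetime import date
from pathlib import Path


def partition_path(root: Path, event_date: date) -> Path:
    """Directory for one day's partition, e.g. <root>/event_date=2024-03-01."""
    return root / f"event_date={event_date.isoformat()}"


def partition_is_complete(root: Path, event_date: date, today: date) -> bool:
    """True when the partition's day has ended and its writer left a _SUCCESS marker."""
    if event_date >= today:
        return False  # the day is still open, so more events can arrive
    return (partition_path(root, event_date) / "_SUCCESS").exists()
```

In an orchestrator this check would typically sit in a sensor ahead of the materialization task, so the job waits for the partition rather than reading half of it.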
3. Feature Pipeline Deduplication
As organizations grow, feature teams independently compute similar features. A "30-day purchase count" may be independently computed by the recommendations team, the fraud team, and the churn team. Auditing for duplicate feature computation and consolidating to a single shared pipeline typically reveals 20–40% of feature compute is redundant. The feature store registry makes this audit tractable - query all feature views with similar names or tag attributes and compare transformation logic.
# Audit feature registry for potential duplicates
from difflib import SequenceMatcher

from feast import FeatureStore

store = FeatureStore(repo_path=".")
registry = store.registry

all_features = []
for fv in registry.list_feature_views(project=store.project):
    for field in fv.schema:
        all_features.append({
            "view": fv.name,
            "feature": field.name,
        })

# Find features with similar names across different views
for i, feat_a in enumerate(all_features):
    for feat_b in all_features[i+1:]:
        if feat_a["view"] == feat_b["view"]:
            continue
        similarity = SequenceMatcher(
            None,
            feat_a["feature"],
            feat_b["feature"]
        ).ratio()
        if similarity > 0.8:
            print(
                f"Potential duplicate: "
                f"{feat_a['view']}.{feat_a['feature']} "
                f"<-> {feat_b['view']}.{feat_b['feature']} "
                f"(similarity: {similarity:.0%})"
            )
