

Real-Time Feature Computation for ML Inference

Reading time: ~28 minutes | Interview relevance: Very High | Target roles: ML Engineer, Data Engineer, AI Platform Engineer, MLOps Engineer


The 50-Millisecond Problem

It is 7:34 PM on a Thursday. A customer opens DoorDash in Chicago, selects a restaurant, and taps "Place Order." In the next 50 milliseconds, DoorDash's model must estimate: how long will this delivery take? The model needs to know the current driver availability within a 2-mile radius of the restaurant, the restaurant's average prep time based on recent orders (not last night's - right now, during the Thursday dinner rush), the real-time weather conditions in that neighborhood, the historical delivery duration for this specific route at this time of day and this day of the week, and a dozen more features.

Some of these features change slowly - historical route duration by day-of-week can be computed nightly. Others change every few minutes - restaurant prep time based on recent order completions. Others change every few seconds - driver availability as drivers complete and accept orders. Getting the estimate wrong costs money: under-estimate the delivery time and customers churn when it arrives late; over-estimate and customers order from a competitor that promises 10 minutes less.

This is the canonical real-time feature engineering problem, and every company running a recommendation, risk, pricing, or logistics model eventually arrives here. The question is not whether you need real-time features - if your business moves faster than your batch pipeline, you do. The question is how you build a system that makes the right features available at the right freshness level, at sub-millisecond read latency, while keeping the offline training data perfectly synchronized with what the model actually saw during serving.

There are three naive solutions, and each fails differently. You can compute features in your application layer and store them in memory - but they do not survive restarts and do not scale across replicas. You can run your batch pipeline more frequently - but "every 5 minutes" is not the same as "every 5 seconds," and batch jobs have scheduling overhead that makes sub-minute freshness impractical. Or you can write to a shared cache in your application code at the time of the event - but now you have embedded feature computation in your application, which breaks when the computation logic needs to change and creates a training-serving skew nightmare.

The right answer is a streaming feature pipeline with a dual-store architecture. DoorDash, Uber, Lyft, and most other large ML platform teams have built some variant of it. This lesson teaches you how to build it.


Why This Exists - The Staleness-Accuracy Tradeoff

Features for ML models have a staleness problem that batch pipelines cannot solve. A feature value computed yesterday is stale. A feature value computed an hour ago is stale for a real-time model. The gap between when a feature was computed and when it is used for inference is called feature lag, and it directly degrades model accuracy for time-sensitive predictions.

The relationship between feature freshness and model performance is problem-specific, but the pattern is consistent: for any prediction that depends on recent events, there is a staleness threshold beyond which the feature adds noise rather than signal. For fraud detection, a rolling 1-hour transaction count computed 2 hours ago may be completely wrong if the fraud burst happened in the last hour. For ETA prediction, a restaurant prep time computed yesterday cannot account for tonight's specific kitchen conditions.

Freshness SLA: the maximum acceptable age of a feature value, beyond which the feature is treated as unreliable. Different features have different freshness SLAs:

| Feature Type | Example | Freshness SLA |
|---|---|---|
| Static | User demographics, country | Days to weeks |
| Slow-changing | Historical 30-day average | Hours |
| Near-real-time | Recent 1-hour aggregation | Minutes |
| Real-time | Current session state | Seconds |
| Instantaneous | Current event context | Milliseconds |

A well-designed feature pipeline acknowledges this hierarchy and routes each feature type through the appropriate computation path.
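The snippet below is a rough sketch of that routing decision. It encodes the table as illustrative per-tier SLAs and maps an SLA to a computation path. The tier names, the specific SLA numbers, and the 1-hour and 1-minute cut-offs are assumptions chosen to mirror the table. They are not fixed rules.

```python
# Illustrative SLAs in seconds, one per tier in the table above (assumed values)
TIER_SLA_SECONDS = {
    "static": 7 * 24 * 3600,    # days to weeks
    "slow_changing": 6 * 3600,  # hours
    "near_real_time": 5 * 60,   # minutes
    "real_time": 10,            # seconds
    "instantaneous": 0.05,      # milliseconds
}


def is_fresh(feature_ts: float, now: float, tier: str) -> bool:
    """True if a value computed at feature_ts is still within its tier's SLA at `now`."""
    return (now - feature_ts) <= TIER_SLA_SECONDS[tier]


def computation_path(sla_seconds: float) -> str:
    """Map a freshness SLA to a computation path (the cut-offs are assumptions)."""
    if sla_seconds >= 3600:
        return "batch"  # scheduled job, then exported to the online store
    if sla_seconds >= 60:
        return "micro_batch_or_windowed_stream"
    return "per_event_stream"
```

For example, `computation_path(TIER_SLA_SECONDS["real_time"])` lands on the per-event streaming path. That is the tier that needs the full dual-store setup.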


Feature Categories by Update Frequency

Batch Features - Nightly Recomputation

These are features where the computation is expensive (scanning months of history) and the underlying data changes slowly enough that nightly recomputation is acceptable. Examples include user lifetime value, 90-day purchase frequency, historical category preferences, and demographic segment assignments.

These features are computed by a scheduled batch job (Spark, dbt, Airflow-orchestrated), written to the offline store (S3 Parquet, Delta Lake), and then synchronized to the online store (Redis, DynamoDB, Cassandra) once per batch cycle.

Implementation: write to offline store first, then run an export job to the online store.
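Below is a minimal sketch of the export step. It assumes the offline rows have already been read into dictionaries, one per entity. It also assumes the online store uses the `{entity_type}_features:{entity_id}` hash layout from the code examples in this lesson. The feature names `ltv_90d` and `purchase_freq_90d` are hypothetical. `push_to_redis` expects a redis-py client. It uses a non-transactional pipeline so a large export is sent with few network round trips.

```python
from typing import Dict, Iterable, List


def build_online_export(
    rows: Iterable[dict],
    entity_type: str,
    feature_names: List[str],
    batch_ts: float,
) -> Dict[str, Dict[str, str]]:
    """Turn offline-store rows into {redis_key: hash_mapping}, skipping null features."""
    export: Dict[str, Dict[str, str]] = {}
    for row in rows:
        key = f"{entity_type}_features:{row['entity_id']}"
        mapping = {name: str(row[name]) for name in feature_names if row.get(name) is not None}
        # Stamp every hash with the batch time so freshness can be monitored
        mapping["feature_timestamp"] = str(batch_ts)
        export[key] = mapping
    return export


def push_to_redis(client, export: Dict[str, Dict[str, str]], ttl_seconds: int) -> None:
    """Write the export with one pipelined batch (client is a redis-py Redis instance)."""
    pipe = client.pipeline(transaction=False)
    for key, mapping in export.items():
        pipe.hset(key, mapping=mapping)
        pipe.expire(key, ttl_seconds)
    pipe.execute()
```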

Near-Real-Time Features - Micro-Batch or Short-Window Streaming

Features that need to be fresh within minutes rather than days, such as recent purchase count in the last 15 minutes, current session page views, or a rolling 1-hour merchant activity score. These can be computed by a streaming job with short tumbling windows, or by a micro-batch job running every 1–5 minutes.

Flink or Faust with 5-minute tumbling windows handles this tier comfortably.
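The window assignment behind a tumbling window is simple arithmetic. An event belongs to the window that starts at its timestamp rounded down to a multiple of the window size (epoch-aligned, zero offset). This pure-Python sketch counts purchases per user per 5-minute window. A streaming engine also handles state, watermarks, and late events, which this sketch ignores.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

WINDOW_MS = 5 * 60 * 1000  # 5-minute tumbling window


def window_start(event_ts_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Start of the epoch-aligned tumbling window containing event_ts_ms."""
    return event_ts_ms - (event_ts_ms % size_ms)


def tumbling_purchase_counts(
    events: Iterable[Tuple[str, int]], size_ms: int = WINDOW_MS
) -> Dict[Tuple[str, int], int]:
    """events: (user_id, event_ts_ms) pairs -> {(user_id, window_start_ms): count}."""
    counts: Dict[Tuple[str, int], int] = defaultdict(int)
    for user_id, ts in events:
        counts[(user_id, window_start(ts, size_ms))] += 1
    return dict(counts)
```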

Real-Time Features - Per-Event Streaming

Features computed from each individual event as it arrives. Current transaction velocity (events per minute), ongoing session duration, live driver GPS position, current queue depth at a restaurant. These require a streaming pipeline that processes each event and immediately updates the feature store.

This is the tier that requires the full dual-store architecture described below.
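A tumbling window emits once, when the window closes. A per-event feature, by contrast, is recomputed on every arrival. The sketch below keeps a trailing 60-second deque of timestamps per entity and returns the updated velocity on each event. That returned value is what you would write to the online store. It assumes events for a given entity arrive in timestamp order. The class name `TransactionVelocity` is hypothetical.

```python
from collections import deque
from typing import Deque, Dict


class TransactionVelocity:
    """Per-entity count of events in a trailing window, updated on every event."""

    def __init__(self, window_seconds: float = 60.0):
        self.window_seconds = window_seconds
        self.events: Dict[str, Deque[float]] = {}  # entity_id -> event timestamps

    def update(self, entity_id: str, event_ts: float) -> int:
        q = self.events.setdefault(entity_id, deque())
        q.append(event_ts)
        # Evict timestamps that have fallen out of the trailing window
        while q and q[0] <= event_ts - self.window_seconds:
            q.popleft()
        return len(q)
```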


The Dual-Store Architecture

The dual-store architecture is the industry-standard solution to the real-time feature problem. Major in-house ML platforms, including Uber's Michelangelo, DoorDash's Riviera, and Meta's FBLearner, use variants of this pattern.

Online store: optimized for low-latency reads at inference time. Typical choices are Redis, DynamoDB, or Cassandra. Read latency ranges from sub-millisecond (Redis) to single-digit milliseconds (DynamoDB, Cassandra). Stores only the current (or recent) feature values - no history.

Offline store: optimized for high-throughput, analytical reads for model training. S3 Parquet, Delta Lake, Apache Hudi. Stores the full history of feature values with timestamps. Slower to read but handles petabytes.

The streaming pipeline writes to both simultaneously: every computed feature goes to the online store (for serving) and to a Kafka feature topic (which is batch-landed to the offline store for training).

```
Events → Kafka (raw) → Flink/Faust → Feature Store Write
                                     ├── Redis (online store, current value)
                                     └── Kafka (feature topic, for offline landing)
                                            └── Batch job → S3 Parquet (offline store)

Inference time:
  Model Server → Redis (online store) → feature values → prediction

Training time:
  Training job → S3 Parquet (offline store) → point-in-time join → training dataset
```

This architecture solves three problems simultaneously:

  1. Low-latency serving: Redis reads are sub-millisecond
  2. Training data availability: offline store has the full history
  3. Skew prevention: online and offline features are computed by the same streaming code
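Below is a minimal in-memory sketch of the dual write. A dict stands in for Redis, and a list of JSON strings stands in for the Kafka feature topic. It shows the key asymmetry: the online side keeps only the latest values per entity, while the offline side keeps every version with its timestamp. The class name and record layout are illustrative assumptions.

```python
import json
from typing import Any, Dict, List


class DualStoreWriter:
    """In-memory stand-in for the streaming job's two sinks."""

    def __init__(self) -> None:
        self.online: Dict[str, Dict[str, Any]] = {}  # like Redis hashes: latest only
        self.offline_log: List[str] = []             # like the feature topic: append-only

    def write(self, entity_key: str, features: Dict[str, Any], valid_from: float) -> None:
        # Online store: overwrite with the current values for fast serving
        self.online[entity_key] = {**features, "feature_timestamp": valid_from}
        # Offline path: keep every version so training can do point-in-time joins
        self.offline_log.append(json.dumps({
            "entity_id": entity_key,
            "features": features,
            "valid_from_timestamp": valid_from,
        }))
```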

Streaming Feature Computation Pipeline - Step by Step

Step 1: Events Flow into Kafka

Raw business events (payments, clicks, location updates, order state changes) are published to Kafka topics. Schema is enforced via Confluent Schema Registry. Producers use Avro or Protobuf serialization for efficiency.

Step 2: Flink/Faust Computes Features

The streaming job reads from raw event topics, performs stateful computations (aggregations, joins, pattern detection), and produces feature values. Each feature value has:

  • entity_id: what entity the feature describes (user_id, driver_id, restaurant_id)
  • feature_name: what is being measured (spend_1h, txn_count_15min)
  • feature_value: the computed value
  • event_time: the timestamp of the event that caused this update
  • computation_time: when this value was computed
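One way to represent this envelope in Python is a frozen dataclass that round-trips through JSON for the feature topic. The `lag_seconds` helper (computation_time minus event_time) is an illustrative addition. Tracking it per record is a cheap way to observe processing lag.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class FeatureRecord:
    entity_id: str
    feature_name: str
    feature_value: float
    event_time: float        # Unix seconds of the event that caused the update
    computation_time: float  # Unix seconds when the pipeline computed the value

    def lag_seconds(self) -> float:
        """Processing lag between the triggering event and the computation."""
        return self.computation_time - self.event_time

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @staticmethod
    def from_json(raw: str) -> "FeatureRecord":
        return FeatureRecord(**json.loads(raw))
```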

Step 3: Write to Online Store (Redis)

Computed features are written to Redis immediately. Use HSET to store all features for an entity in a single hash. Set TTL to freshness_sla * 3 to auto-expire stale features. Use event_time as a version key to prevent older computations from overwriting newer ones.
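The write rules in this step can be sketched without Redis. There are two rules: reject a computation whose event_time is not newer than what is stored, and expire keys after 3x the freshness SLA. The in-memory `VersionedTTLStore` below is a hypothetical stand-in. As in Redis, once a key has expired its version is forgotten, so a late write is accepted again. The injectable `clock` exists only to make the behavior testable.

```python
import time
from typing import Callable, Dict, Optional


class VersionedTTLStore:
    """Rejects out-of-order writes and expires entries after 3x the freshness SLA."""

    def __init__(self, clock: Callable[[], float] = time.time):
        self.clock = clock
        self._data: Dict[str, dict] = {}

    def _expired(self, entry: dict) -> bool:
        return self.clock() >= entry["expires_at"]

    def write(self, key: str, features: dict, event_time: float, freshness_sla: float) -> bool:
        current = self._data.get(key)
        if current and not self._expired(current) and current["event_time"] >= event_time:
            return False  # an equal or newer computation already landed
        self._data[key] = {
            "features": dict(features),
            "event_time": event_time,
            "expires_at": self.clock() + 3 * freshness_sla,  # TTL = freshness_sla * 3
        }
        return True

    def read(self, key: str) -> Optional[dict]:
        entry = self._data.get(key)
        if entry is None or self._expired(entry):
            return None  # cache miss: caller falls back to defaults
        return entry["features"]
```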

Step 4: Write to Feature Topic for Offline Landing

The same feature values are written to a Kafka feature topic with the full metadata envelope. A separate batch job (hourly or daily) reads this topic and writes to S3 Parquet, partitioned by date. entity_id is usually too high-cardinality to be a partition key, so it is stored as a column. This is the data used for training.
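The sketch below shows how the landing job might lay records out. It assumes date partitions derived from `valid_from_timestamp` in UTC and Hive-style `dt=YYYY-MM-DD` directory naming. The bucket path and feature group name are placeholders. Actually writing the Parquet files (for example with pyarrow) is omitted.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, Iterable, List


def partition_path(base: str, feature_group: str, valid_from: float) -> str:
    """Hive-style daily partition path for a record's valid_from timestamp (UTC)."""
    day = datetime.fromtimestamp(valid_from, tz=timezone.utc).strftime("%Y-%m-%d")
    return f"{base}/{feature_group}/dt={day}"


def group_for_landing(records: Iterable[dict], base: str, feature_group: str) -> Dict[str, List[dict]]:
    """Bucket feature-topic records by the partition they should be written to."""
    batches: Dict[str, List[dict]] = defaultdict(list)
    for rec in records:
        batches[partition_path(base, feature_group, rec["valid_from_timestamp"])].append(rec)
    return dict(batches)
```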

Step 5: Model Serving Reads from Redis

At inference time, the model server makes a single HMGET call to Redis to fetch all required features for an entity. For several entities, it sends one pipelined batch of HMGET calls. Because each entity's features live in a single hash, fetching 100+ features for one entity typically takes under 1 millisecond within the same data center.


Point-in-Time Consistency - The Training Data Problem

When you train a model on historical data, you need to reconstruct exactly what features were available at the time each prediction was made. This is the point-in-time join problem, and it is one of the hardest problems in ML infrastructure.

Consider: your model made a prediction at 2024-03-15 14:32:07. At that moment, the user's 1-hour spend was \$142. But by the time you construct your training dataset, the current 1-hour spend for that user is \$89 (the window has rolled). If you naively join features at query time, you get the wrong values.

The correct approach: when you write real-time features to the offline store, preserve the full history with timestamps. Each feature record includes entity_id, feature_name, feature_value, and valid_from_timestamp. Training data construction performs a point-in-time join: for each label event at time $T$, find the feature value with the largest valid_from_timestamp that is less than or equal to $T$.

$$
\text{feature\_value}_{T} = v^{*}, \qquad v^{*} = \underset{v \,:\, \text{valid\_from}(v) \le T}{\arg\max}\; \text{valid\_from}(v)
$$

In SQL:

```sql
-- Point-in-time join: get the feature value available at label time
SELECT
    l.label_event_id,
    l.label_timestamp,
    l.outcome,
    f.feature_value AS spend_1h
FROM labels l
LEFT JOIN LATERAL (
    SELECT fh.feature_value
    FROM feature_history fh
    WHERE fh.entity_id = l.user_id
      AND fh.feature_name = 'spend_1h'
      AND fh.valid_from_timestamp <= l.label_timestamp
    ORDER BY fh.valid_from_timestamp DESC
    LIMIT 1
) f ON TRUE;
```

This is why the offline store must retain the full history of feature values, not just the latest. Feast, Tecton, and Hopsworks all implement this pattern in their feature store query APIs.
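For smaller datasets held in pandas, the same backward-looking join can be written with `pandas.merge_asof`. For each label row, it picks the last feature row for the same entity whose timestamp is less than or equal to the label timestamp. This sketch assumes a wide feature table with a single `spend_1h` column, rather than the long `feature_name`/`feature_value` layout used in the SQL. Both inputs must be sorted on their time keys.

```python
import pandas as pd


def point_in_time_join(labels: pd.DataFrame, features: pd.DataFrame) -> pd.DataFrame:
    """Attach to each label the latest feature row with valid_from_timestamp <= label_timestamp."""
    return pd.merge_asof(
        labels.sort_values("label_timestamp"),
        features.sort_values("valid_from_timestamp"),
        left_on="label_timestamp",
        right_on="valid_from_timestamp",
        left_by="user_id",
        right_by="entity_id",
        direction="backward",  # never look forward in time: no label leakage
    )
```

Usage, mirroring the \$142 / \$89 scenario above:

```python
labels = pd.DataFrame({
    "label_event_id": [1, 2],
    "user_id": ["u1", "u1"],
    "label_timestamp": pd.to_datetime(["2024-03-15 14:32:07", "2024-03-15 16:00:00"]),
})
features = pd.DataFrame({
    "entity_id": ["u1", "u1"],
    "valid_from_timestamp": pd.to_datetime(["2024-03-15 14:30:00", "2024-03-15 15:00:00"]),
    "spend_1h": [142.0, 89.0],
})
print(point_in_time_join(labels, features)[["label_event_id", "spend_1h"]])
```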


Code Examples

1. Streaming Feature Pipeline - PyFlink Job Writing to Redis and Kafka

```python
import json

import redis
from pyflink.common import Duration, SimpleStringSchema, Time, WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaRecordSerializationSchema,
    KafkaSink,
    KafkaSource,
)
from pyflink.datastream.functions import ProcessWindowFunction, RuntimeContext
from pyflink.datastream.window import TumblingEventTimeWindows

# The Kafka connector JAR must be on the job classpath (for example via env.add_jars).
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(8)
env.enable_checkpointing(30_000)  # Checkpoint every 30 seconds

# Source: raw payment events (JSON strings) from Kafka
kafka_source = (
    KafkaSource.builder()
    .set_bootstrap_servers("kafka:9092")
    .set_topics("payments")
    .set_group_id("flink-feature-pipeline")
    .set_value_only_deserializer(SimpleStringSchema())
    .build()
)


class PaymentTimestampAssigner(TimestampAssigner):
    """Extracts event time (epoch milliseconds) from the payment JSON.

    PyFlink's with_timestamp_assigner() expects a TimestampAssigner instance,
    not a plain lambda.
    """

    def extract_timestamp(self, value, record_timestamp) -> int:
        return int(json.loads(value)["event_timestamp"] * 1000)


# Allow events to arrive up to 30 seconds out of order before a window fires
watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(30))
    .with_timestamp_assigner(PaymentTimestampAssigner())
)


class UserSpendFeatureWriter(ProcessWindowFunction):
    """
    Computes per-user spend features in a 5-minute tumbling window.
    Writes to Redis (online store) and emits JSON records for offline landing.

    The Redis write is a side effect outside Flink's checkpoints, so after a
    failover a window may be written twice (at-least-once). That is harmless
    here because a given window always produces the same values.
    """

    def open(self, runtime_context: RuntimeContext):
        self.redis_client = redis.Redis(
            host="redis-primary",
            port=6379,
            decode_responses=True,
            socket_connect_timeout=1,
            socket_timeout=0.5,
        )

    def process(self, user_id, context, elements):
        records = [json.loads(e) for e in elements]
        window_end_ms = context.window().end
        window_end_ts = window_end_ms / 1000.0

        # Compute features from the window's events
        spend_5min = sum(r["amount"] for r in records)
        txn_count_5min = len(records)
        avg_amount_5min = spend_5min / txn_count_5min if txn_count_5min > 0 else 0.0
        unique_merchants = len({r["merchant_id"] for r in records})

        # --- Write to Redis (online store) ---
        redis_key = f"user_features:{user_id}"
        pipeline = self.redis_client.pipeline()
        pipeline.hset(redis_key, mapping={
            "spend_5min": spend_5min,
            "txn_count_5min": txn_count_5min,
            "avg_amount_5min": avg_amount_5min,
            "unique_merchants_5min": unique_merchants,
            "feature_timestamp": window_end_ts,
        })
        # TTL of 3x the window size lets stale values auto-expire
        pipeline.expire(redis_key, 900)  # 15 minutes = 3 * 5 minutes
        pipeline.execute()

        # --- Emit record for the Kafka -> offline store path ---
        offline_record = {
            "entity_id": user_id,
            "entity_type": "user",
            "features": {
                "spend_5min": spend_5min,
                "txn_count_5min": txn_count_5min,
                "avg_amount_5min": avg_amount_5min,
                "unique_merchants_5min": unique_merchants,
            },
            "valid_from_timestamp": window_end_ts,
            "pipeline_version": "v2.3",
        }
        yield json.dumps(offline_record)

    def close(self):
        if hasattr(self, "redis_client"):
            self.redis_client.close()


# Build the pipeline
stream = (
    env
    .from_source(kafka_source, watermark_strategy, "Payments Source")
    .key_by(lambda raw: json.loads(raw)["user_id"], key_type=Types.STRING())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(UserSpendFeatureWriter(), Types.STRING())  # output: JSON strings
)

# Sink: write offline feature records to Kafka for offline landing
feature_sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("kafka:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("user-features-offline")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    .build()
)

stream.sink_to(feature_sink)
env.execute("User Spend Feature Pipeline v2.3")
```

2. Redis Feature Writer - Batch Atomic Write for Multiple Features

```python
from dataclasses import dataclass
from typing import Any, Dict, List

import redis


@dataclass
class FeatureWriteRequest:
    entity_id: str
    entity_type: str  # "user", "driver", "restaurant"
    features: Dict[str, Any]
    event_timestamp: float  # Unix seconds
    ttl_seconds: int = 900  # 15 minutes default


# Lua compare-and-set, executed atomically inside Redis.
# KEYS[1] = entity feature hash
# ARGV[1] = incoming event timestamp, ARGV[2] = TTL in seconds,
# ARGV[3..n] = field1, value1, field2, value2, ...
# The stored version is the hash's own feature_timestamp field, so the script
# touches a single key and stays valid under Redis Cluster slot rules.
CAS_WRITE_LUA = """
local current_ts = redis.call('HGET', KEYS[1], 'feature_timestamp')
if current_ts and tonumber(current_ts) >= tonumber(ARGV[1]) then
    return 0 -- existing value is same or newer: skip write
end
redis.call('HSET', KEYS[1], unpack(ARGV, 3))
redis.call('EXPIRE', KEYS[1], ARGV[2])
return 1 -- wrote successfully
"""


class RedisFeatureStore:
    """
    Writes features to Redis with:
    - Atomic per-entity writes via a Lua script
    - Version checking to prevent older computations overwriting newer ones
    - Automatic TTL for stale value cleanup
    """

    def __init__(self, redis_url: str = "redis://redis-primary:6379"):
        self.client = redis.from_url(
            redis_url,
            decode_responses=True,
            socket_connect_timeout=1,
            socket_timeout=0.5,
            retry_on_timeout=True,
        )
        # Register once; redis-py runs it via EVALSHA and reloads it if Redis lost it
        self._cas_write = self.client.register_script(CAS_WRITE_LUA)

    def write_features(self, request: FeatureWriteRequest) -> bool:
        """
        Atomically write all features for an entity, but only if the
        incoming timestamp is newer than the one already stored.
        """
        key = f"{request.entity_type}_features:{request.entity_id}"

        # Flatten features for HSET: [field1, value1, field2, value2, ...]
        feature_args: List[str] = []
        for field, value in request.features.items():
            feature_args.extend([field, str(value)])
        # Store the version alongside the features in the same hash
        feature_args.extend(["feature_timestamp", str(request.event_timestamp)])

        result = self._cas_write(
            keys=[key],
            args=[str(request.event_timestamp), str(request.ttl_seconds)] + feature_args,
        )
        return bool(result)

    def read_features_batch(
        self,
        entity_type: str,
        entity_ids: List[str],
        feature_names: List[str],
    ) -> Dict[str, Dict[str, Any]]:
        """
        Fetch features for multiple entities in a single pipeline call.
        Returns: {entity_id: {feature_name: value}}
        """
        pipeline = self.client.pipeline(transaction=False)
        fields = feature_names + ["feature_timestamp"]

        for entity_id in entity_ids:
            key = f"{entity_type}_features:{entity_id}"
            pipeline.hmget(key, fields)

        results = pipeline.execute()

        output: Dict[str, Dict[str, Any]] = {}
        for entity_id, values in zip(entity_ids, results):
            if values and any(v is not None for v in values):
                feature_dict: Dict[str, Any] = {}
                for name, value in zip(fields, values):
                    if value is not None:
                        try:
                            feature_dict[name] = float(value)
                        except (ValueError, TypeError):
                            feature_dict[name] = value
                output[entity_id] = feature_dict
            else:
                output[entity_id] = {}  # Cache miss - use fallback features

        return output


# Usage in a model serving context
store = RedisFeatureStore("redis://redis-primary:6379")

# At inference time: fetch features for 1 user
user_features = store.read_features_batch(
    entity_type="user",
    entity_ids=["user_12345"],
    feature_names=["spend_5min", "txn_count_5min", "avg_amount_5min", "spend_1h"],
)
print(user_features)
# Example shape: {"user_12345": {"spend_5min": 142.5, "txn_count_5min": 3.0, ...}}
```

3. Feature Freshness Monitor

```python
import time
from dataclasses import dataclass
from typing import Dict, List

import redis


@dataclass
class FreshnessAlert:
    entity_type: str
    feature_name: str
    staleness_seconds: float
    sla_seconds: float
    sample_entity_id: str


class FeatureFreshnessMonitor:
    """
    Monitors feature freshness by sampling Redis entries and checking
    the feature_timestamp field against the current time.
    Publishes metrics to Prometheus (or logs alerts directly).
    """

    # Freshness SLAs in seconds, grouped by the entity type that owns each feature
    FEATURE_SLAS: Dict[str, Dict[str, int]] = {
        "user": {
            "spend_5min": 600,  # 10-minute SLA for a 5-minute window feature
            "txn_count_5min": 600,
            "spend_1h": 3_600,  # 1-hour SLA for a 1-hour window feature
        },
        "driver": {
            "driver_location": 30,  # 30 seconds for real-time GPS features
        },
        "restaurant": {
            "restaurant_wait_time": 60,
        },
    }

    def __init__(self, redis_url: str, sample_entity_ids: Dict[str, List[str]]):
        self.client = redis.from_url(redis_url, decode_responses=True)
        self.sample_entity_ids = sample_entity_ids  # entity_type -> sample IDs

    def check_freshness(self, entity_type: str) -> List[FreshnessAlert]:
        alerts = []
        now = time.time()
        slas = self.FEATURE_SLAS.get(entity_type, {})

        for entity_id in self.sample_entity_ids.get(entity_type, [])[:20]:  # Up to 20 samples
            key = f"{entity_type}_features:{entity_id}"
            feature_ts_raw = self.client.hget(key, "feature_timestamp")

            if feature_ts_raw is None:
                # Key doesn't exist (or expired) - treat the features as infinitely stale
                for feature_name, sla in slas.items():
                    alerts.append(FreshnessAlert(
                        entity_type=entity_type,
                        feature_name=feature_name,
                        staleness_seconds=float("inf"),
                        sla_seconds=sla,
                        sample_entity_id=entity_id,
                    ))
                continue

            staleness = now - float(feature_ts_raw)

            for feature_name, sla in slas.items():
                if staleness > sla:
                    alerts.append(FreshnessAlert(
                        entity_type=entity_type,
                        feature_name=feature_name,
                        staleness_seconds=staleness,
                        sla_seconds=sla,
                        sample_entity_id=entity_id,
                    ))

        return alerts

    def run_once(self):
        for entity_type in self.FEATURE_SLAS:
            for alert in self.check_freshness(entity_type):
                staleness_ratio = alert.staleness_seconds / alert.sla_seconds
                if staleness_ratio > 2.0:
                    print(
                        f"[CRITICAL] {alert.entity_type}.{alert.feature_name}: "
                        f"{alert.staleness_seconds:.0f}s stale "
                        f"(SLA={alert.sla_seconds}s, ratio={staleness_ratio:.1f}x), "
                        f"sample_entity={alert.sample_entity_id}"
                    )
                elif staleness_ratio > 1.2:
                    print(
                        f"[WARNING] {alert.entity_type}.{alert.feature_name}: "
                        f"{alert.staleness_seconds:.0f}s stale (SLA={alert.sla_seconds}s)"
                    )


# Run as a cron job or a background thread in your monitoring service
monitor = FeatureFreshnessMonitor(
    redis_url="redis://redis-primary:6379",
    sample_entity_ids={
        "user": ["user_001", "user_002", "user_003", "user_004", "user_005"],
    },
)
monitor.run_once()
```

4. Training-Serving Skew Test - Compare Batch vs Stream Features

```python
from typing import Tuple

import numpy as np
import pandas as pd


def compute_spend_batch(events_df: pd.DataFrame, window_hours: int = 1) -> pd.Series:
    """
    Batch computation of the spend feature - simulates what a training pipeline does.
    The window ends at the latest event in events_df, so run this on data that is
    current as of the moment the stream values are read from Redis.
    """
    cutoff = events_df["event_time"].max() - pd.Timedelta(hours=window_hours)
    recent = events_df[events_df["event_time"] >= cutoff]
    return recent.groupby("user_id")["amount"].sum()


def load_stream_features_from_redis(
    user_ids: list,
    redis_client,
    feature_name: str = "spend_5min",
) -> pd.Series:
    """
    Load the same feature from the online store (stream-computed).
    """
    pipeline = redis_client.pipeline(transaction=False)
    for uid in user_ids:
        pipeline.hget(f"user_features:{uid}", feature_name)
    values = pipeline.execute()
    return pd.Series(
        {uid: float(v) if v is not None else np.nan for uid, v in zip(user_ids, values)},
        name=feature_name,
    )


def detect_training_serving_skew(
    events_df: pd.DataFrame,
    redis_client,
    feature_name: str = "spend_1h",
    skew_threshold: float = 0.1,  # 10% relative difference triggers alert
) -> Tuple[float, pd.DataFrame]:
    """
    Compare batch-computed features with stream-computed features.
    Returns (mean_relative_error, detailed_comparison_df)
    """
    user_ids = events_df["user_id"].unique().tolist()

    batch_features = compute_spend_batch(events_df, window_hours=1)
    stream_features = load_stream_features_from_redis(user_ids, redis_client, feature_name)

    comparison = pd.DataFrame({
        "batch": batch_features,
        "stream": stream_features,
    }).dropna()

    comparison["absolute_diff"] = (comparison["batch"] - comparison["stream"]).abs()
    comparison["relative_diff"] = comparison["absolute_diff"] / comparison["batch"].clip(lower=0.01)

    mean_relative_error = comparison["relative_diff"].mean()

    if mean_relative_error > skew_threshold:
        print(
            f"[SKEW ALERT] {feature_name}: mean relative error = {mean_relative_error:.3f} "
            f"(threshold={skew_threshold}). "
            f"Training and serving features differ - model predictions will be biased."
        )
        print("Top skewed users:")
        print(comparison.nlargest(5, "relative_diff")[["batch", "stream", "relative_diff"]])

    return mean_relative_error, comparison
```

5. Hot Key Mitigation - Splitting High-Cardinality Restaurant Features

```python
import hashlib
from typing import Optional

import redis


class HotKeyMitigatedFeatureStore:
    """
    For entities with extremely high event rates (popular restaurants during peak hours),
    naive Redis writes create hot keys - a single Redis node absorbs disproportionate load.

    Solution: shard hot-entity writes across N virtual keys and aggregate at read time.
    In Redis Cluster the shard keys hash to different slots, spreading load across nodes.
    """

    def __init__(self, redis_client: redis.Redis, num_shards: int = 8):
        self.client = redis_client
        self.num_shards = num_shards

    def _shard_key(self, entity_id: str, shard_num: int) -> str:
        return f"restaurant_features:{entity_id}:shard:{shard_num}"

    def _routing_shard(self, entity_id: str, event_id: str) -> int:
        """Route an event to a shard by hashing entity and event ID (deterministic, roughly uniform)."""
        combined = f"{entity_id}:{event_id}"
        return int(hashlib.md5(combined.encode()).hexdigest(), 16) % self.num_shards

    def write_order_event(self, restaurant_id: str, order_id: str, prep_time_seconds: float):
        """
        Write each order to a hash-selected shard to distribute write load.
        Each shard accumulates partial aggregates.

        Note: EXPIRE is refreshed on every write, so a shard's counters keep
        accumulating while orders keep arriving; they are not a strict
        10-minute window.
        """
        shard_num = self._routing_shard(restaurant_id, order_id)
        shard_key = self._shard_key(restaurant_id, shard_num)

        pipeline = self.client.pipeline()
        pipeline.hincrbyfloat(shard_key, "total_prep_time", prep_time_seconds)
        pipeline.hincrby(shard_key, "order_count", 1)
        pipeline.expire(shard_key, 600)  # 10-minute TTL, refreshed on each write
        pipeline.execute()

    def read_avg_prep_time(self, restaurant_id: str) -> Optional[float]:
        """
        Read all shards in one pipeline and aggregate at read time.
        Reads are slightly slower, but write throughput is distributed.
        """
        pipeline = self.client.pipeline(transaction=False)
        for shard_num in range(self.num_shards):
            pipeline.hgetall(self._shard_key(restaurant_id, shard_num))

        shard_results = pipeline.execute()

        total_prep_time = 0.0
        total_orders = 0

        for shard_data in shard_results:
            if shard_data:
                total_prep_time += float(shard_data.get("total_prep_time", 0))
                total_orders += int(shard_data.get("order_count", 0))

        if total_orders == 0:
            return None

        return total_prep_time / total_orders


# Usage
store = HotKeyMitigatedFeatureStore(
    # decode_responses=True so HGETALL returns str keys matching the .get() lookups
    redis_client=redis.Redis(host="redis-primary", decode_responses=True),
    num_shards=8,
)

# Write: distributed across 8 shards
store.write_order_event(
    restaurant_id="mcdonalds_michigan_ave",
    order_id="order_98765",
    prep_time_seconds=420,
)

# Read: aggregate across all shards - slightly more expensive but acceptable
avg_prep = store.read_avg_prep_time("mcdonalds_michigan_ave")
if avg_prep is not None:
    print(f"Avg prep time: {avg_prep:.0f}s")
```

6. Feature Store Read at Inference - Multi-Entity Batch Fetch

```python
import time
from typing import Any, Dict, Optional

import numpy as np
import redis


class InferenceFeatureLoader:
    """
    Loads all features required by the model at inference time.
    Uses pipelined Redis calls to minimize round trips.
    """

    # Feature registry: entity_type -> list of feature names
    MODEL_FEATURES = {
        "user": [
            "spend_5min", "txn_count_5min", "avg_amount_5min",
            "spend_1h", "txn_count_1h", "unique_merchants_1h",
            "spend_7d_avg", "risk_score_v3",
        ],
        "restaurant": [
            "avg_prep_time_30min", "order_count_30min", "completion_rate_1h",
        ],
        "driver": [
            "deliveries_completed_1h", "avg_delivery_time_1h", "acceptance_rate_6h",
        ],
    }

    # Fallback values when a feature is missing from the cache
    FEATURE_DEFAULTS = {
        "spend_5min": 0.0,
        "txn_count_5min": 0,
        "avg_amount_5min": 25.0,  # population average
        "spend_1h": 0.0,
        "risk_score_v3": 0.5,  # neutral risk
        "avg_prep_time_30min": 600.0,  # 10 min population average
        "deliveries_completed_1h": 2.0,
    }

    def __init__(self, redis_url: str):
        self.client = redis.from_url(
            redis_url,
            decode_responses=True,
            socket_connect_timeout=0.5,
            socket_timeout=0.5,
        )

    def load_for_prediction(
        self,
        user_id: str,
        restaurant_id: str,
        driver_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Load all features needed for one prediction in a single pipeline call.
        Latency target: under 2ms for the Redis portion.
        """
        start = time.perf_counter()

        entities = {
            "user": user_id,
            "restaurant": restaurant_id,
        }
        if driver_id:
            entities["driver"] = driver_id

        # Pipeline all HMGET calls in a single round trip
        pipeline = self.client.pipeline(transaction=False)
        request_order = []

        for entity_type, entity_id in entities.items():
            feature_names = self.MODEL_FEATURES[entity_type]
            key = f"{entity_type}_features:{entity_id}"
            pipeline.hmget(key, feature_names + ["feature_timestamp"])
            request_order.append((entity_type, feature_names))

        results = pipeline.execute()
        elapsed_ms = (time.perf_counter() - start) * 1000

        if elapsed_ms > 5:
            print(f"[SLOW FEATURE FETCH] {elapsed_ms:.1f}ms for entities: {entities}")

        # Build a flat feature dict
        all_features: Dict[str, Any] = {}
        stale_features = []

        for (entity_type, feature_names), values in zip(request_order, results):
            # The last value is the feature_timestamp
            feature_ts_raw = values[-1]
            values = values[:-1]

            if feature_ts_raw is not None:
                staleness = time.time() - float(feature_ts_raw)
                if staleness > 300:  # Alert if features are 5+ minutes stale
                    stale_features.append((entity_type, staleness))

            for name, raw_value in zip(feature_names, values):
                if raw_value is not None:
                    try:
                        all_features[name] = float(raw_value)
                    except (ValueError, TypeError):
                        all_features[name] = self.FEATURE_DEFAULTS.get(name, 0.0)
                else:
                    # Cache miss - use population-level default
                    all_features[name] = self.FEATURE_DEFAULTS.get(name, 0.0)

        if stale_features:
            print(f"[STALE FEATURES] {stale_features}")

        return all_features


# Usage at inference time
loader = InferenceFeatureLoader("redis://redis-primary:6379")

features = loader.load_for_prediction(
    user_id="user_12345",
    restaurant_id="restaurant_678",
    driver_id="driver_99",
)
# Now pass the features to the model, e.g. model.predict(feature_vector)
feature_vector = np.array([[
    features["spend_1h"],
    features["txn_count_5min"],
    features["avg_prep_time_30min"],
    features["deliveries_completed_1h"],
    features["risk_score_v3"],
]])
```


Production Notes

DoorDash Riviera - Real Architecture

DoorDash's public engineering blog describes their real-time feature platform (Riviera) with these specifics: Apache Flink for streaming computation, Redis as the online store, S3 Parquet as the offline store. 150+ features per delivery time prediction. Freshness SLA varies by feature: driver GPS features must be fresh within 30 seconds; historical route duration features can be up to 1 hour old. The Flink job runs at ~8 parallelism per feature group, with 30-second checkpoints. Redis is a cluster with 128 GB RAM, 6 nodes, replication factor 2.

Uber Michelangelo Dual-Store

Uber Michelangelo (their ML platform, described publicly) uses Apache Flink for real-time feature computation, Cassandra as the online store (chosen over Redis for its multi-region replication), and Hive on HDFS as the offline store. The same streaming pipeline writes to both - one Flink sink goes to Cassandra, another to a Kafka topic that is batch-landed to Hive.

The key insight from Michelangelo: train with the exact same feature values that were served. They enforce this by logging the feature values actually used for each prediction and joining those logged values back to labels for training - rather than recomputing features from raw events. This removes the feature-computation source of training-serving skew entirely, because the training inputs are the served inputs.
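Below is a minimal sketch of the log-and-join approach. It records the exact feature dict served with each prediction under a prediction ID, then joins labels to that log instead of recomputing features. The function names, the `prediction_id` key, and the in-memory list used as the log are illustrative assumptions. In production the log would typically be a Kafka topic or a table.

```python
import uuid
from typing import Any, Dict, List

import pandas as pd


def log_served_features(log: List[dict], entity_id: str, features: Dict[str, Any],
                        served_at: pd.Timestamp) -> str:
    """Record exactly what the model saw; return the prediction ID to attach to the response."""
    prediction_id = str(uuid.uuid4())
    log.append({"prediction_id": prediction_id, "entity_id": entity_id,
                "served_at": served_at, **features})
    return prediction_id


def build_training_set(log: List[dict], labels: pd.DataFrame) -> pd.DataFrame:
    """Join delayed labels (keyed by prediction_id) onto the served feature log."""
    return pd.DataFrame(log).merge(labels, on="prediction_id", how="inner")
```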

Choosing the Online Store

| Store | Read Latency | Write Throughput | Replication | Best For |
|---|---|---|---|---|
| Redis | <1ms | Very high | Single-region | Most ML serving use cases |
| DynamoDB | 1–5ms | High | Multi-region | AWS-native, global serving |
| Cassandra | 2–10ms | Very high | Multi-region | Large state, multi-region |
| Bigtable | 5–10ms | Very high | Multi-region | GCP-native, high cardinality |

Redis is the default choice. Cassandra when you need multi-region replication. DynamoDB when you are serverless on AWS and do not want to manage Redis.


Common Mistakes

:::danger Computing Features in Application Code Instead of a Pipeline

Embedding feature computation inside your API handler - "add this amount to the user's Redis counter when a payment arrives" - seems simple. It creates an unmaintainable mess: feature logic is scattered across microservices, there is no offline history (you cannot train on it), and you cannot replay or backfill without re-processing production traffic. Always centralize feature computation in a dedicated streaming pipeline.

:::

:::danger Training on Batch Features, Serving Real-Time Features

If your training pipeline recomputes features from raw events using different code than your serving pipeline, you have training-serving skew by construction. The model learns patterns from batch-computed features and then receives real-time features at inference - they will diverge during real-time events the batch pipeline cannot capture. The solution: serve from and train on the same feature store, with the streaming pipeline as the single source of truth.

:::

:::warning State Without TTL Causes Unbounded Growth

Every entity that ever sent an event gets a Redis key. Without TTL, inactive users accumulate indefinitely. A system with 50 million users that each transact once and never return will consume 50 million Redis keys with no cleanup. Set TTL on every key, sized to freshness_sla * 3 or the session timeout, whichever is larger. Accept that inactive entities will have cache misses and fall back to defaults.

:::
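The sizing rule and its effect can be sketched in a few lines. `feature_ttl_seconds` applies the "larger of 3x SLA or session timeout" rule. `live_keys_at` counts keys still resident at a given time. It assumes each write refreshes the TTL, as the pipelines in this lesson do. Both function names are hypothetical.

```python
from typing import Dict


def feature_ttl_seconds(freshness_sla: float, session_timeout: float = 0.0,
                        multiplier: float = 3.0) -> int:
    """TTL = max(multiplier * freshness SLA, session timeout)."""
    return int(max(multiplier * freshness_sla, session_timeout))


def live_keys_at(last_write_ts: Dict[str, float], now: float, ttl: float) -> int:
    """Keys still in Redis at `now`: only entities written within the last `ttl` seconds."""
    return sum(1 for ts in last_write_ts.values() if now - ts < ttl)
```

With a TTL, the live key count tracks recently active entities rather than every entity ever seen.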

:::warning Not Testing Freshness Under Load
Feature freshness SLAs are easy to meet in development and easy to violate in production during traffic spikes. Your Flink job may process 10,000 events/second normally but fall behind during peak (50,000/second), causing features to lag by several window periods. Load test your streaming pipeline at 2x peak throughput and verify that p99 feature freshness stays within SLA.
:::
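A back-of-envelope model shows why a peak turns into lag. This is a simple fluid-queue model and an assumption, not a property of Flink. Treat the 10,000 events/second figure as the job's sustained capacity. If events arrive faster than that capacity for `peak_s` seconds, the backlog grows by the difference times the duration. The last event of the peak then waits roughly `backlog / capacity` seconds. The model ignores autoscaling and batching.

```python
def backlog_events(arrival_rate: float, capacity: float, peak_s: float) -> float:
    """Events queued after a peak lasting `peak_s` seconds (zero if capacity keeps up)."""
    return max(arrival_rate - capacity, 0.0) * peak_s


def freshness_lag_s(arrival_rate: float, capacity: float, peak_s: float) -> float:
    """Approximate extra staleness of the last event in the peak: backlog / capacity."""
    return backlog_events(arrival_rate, capacity, peak_s) / capacity


# Numbers from the warning: 10,000 events/s capacity, a 50,000 events/s peak lasting one minute.
lag = freshness_lag_s(arrival_rate=50_000, capacity=10_000, peak_s=60)  # 240.0 seconds
```

Under these assumptions, a one-minute spike leaves features roughly four minutes stale. That is several periods of a 1-minute window, which is exactly the failure a 2x-peak load test is meant to surface.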

:::tip Use Pipeline Calls for Multi-Feature Reads
Never fetch features with individual Redis GET calls - each call is a separate network round trip. A model with 50 features served via 50 sequential GET calls pays for 50 round trips, which can add tens of milliseconds of latency. Use HMGET to read all features of one entity in a single call, and use a Redis pipeline to batch the reads for multiple entities into a single round trip. A well-implemented feature loader can fetch 150 features across 3 entities in under 2 milliseconds.
:::
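A minimal sketch of the batched read with redis-py follows, assuming hypothetical hash keys like `user_features:123`. It queues one `HMGET` per entity on a non-transactional pipeline, and `execute()` sends them all in one round trip. redis-py returns `None` for missing fields. Values come back as bytes unless the client was created with `decode_responses=True`.

```python
from typing import Any, Dict, List


def fetch_features(client: Any, requests: Dict[str, List[str]]) -> Dict[str, Dict[str, Any]]:
    """Fetch many fields for many entities in a single network round trip.

    `requests` maps a Redis hash key to the feature names needed from it.
    `client` is assumed to be a redis-py `redis.Redis` instance.
    """
    keys = list(requests)
    pipe = client.pipeline(transaction=False)  # no MULTI/EXEC, just batching
    for key in keys:
        pipe.hmget(key, requests[key])
    replies = pipe.execute()  # one list of values per queued HMGET, in queue order
    return {key: dict(zip(requests[key], values)) for key, values in zip(keys, replies)}
```

For example, `fetch_features(r, {"user_features:123": ["txn_count_1h", "avg_amount_7d"], "merchant_features:9": ["fraud_rate_30d"]})` issues two `HMGET`s but pays for only one round trip. Map any `None` values to the model's defaults before inference.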


Interview Q&A

Q: What is training-serving skew and how do you prevent it in real-time features?

A: Training-serving skew is when the feature values used to train a model are computed differently from the feature values delivered at inference time, causing the model to perform worse in production than in training. For real-time features, the classic cause is: training uses batch-computed historical features (SQL aggregations over a data warehouse), while serving uses streaming-computed features (Flink or Faust aggregations from Kafka). Even with the same logical computation, different code paths lead to different values.
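Here is a small illustration of how "the same logical computation" can diverge. Both functions compute a hypothetical "orders in the last hour" feature. The first mimics a batch SQL query over a sliding window that ends at prediction time. The second mimics a streaming job that publishes the count from the most recently closed 1-hour tumbling window. Neither is taken from a specific system; both are assumptions chosen to show the effect.

```python
from typing import List


def sliding_count(event_times: List[float], now: float, window_s: float) -> int:
    """Batch/SQL-style: events with timestamp in (now - window_s, now]."""
    return sum(1 for t in event_times if now - window_s < t <= now)


def tumbling_count(event_times: List[float], now: float, window_s: float) -> int:
    """Streaming-style: events in the most recently *closed* tumbling window."""
    window_end = (now // window_s) * window_s
    window_start = window_end - window_s
    return sum(1 for t in event_times if window_start <= t < window_end)


# Orders at 00:05, 00:10 and 01:10 (seconds since midnight); prediction at 01:20.
events = [5 * 60.0, 10 * 60.0, 70 * 60.0]
now = 80 * 60.0
batch_value = sliding_count(events, now, 3600.0)    # only the 01:10 order counts
stream_value = tumbling_count(events, now, 3600.0)  # the closed [00:00, 01:00) window
```

Both are defensible definitions of "orders in the last hour". Yet at the same moment, the model would be trained on the batch value (1) and served the streaming value (2).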

Prevention: use the same streaming pipeline to compute features for both serving and training. The pipeline writes to the online store (Redis, for serving) and to a Kafka feature topic (for offline landing to S3). Training jobs read from the offline store - they see the exact same values that were written by the same streaming code that serves predictions. An alternative is to log the feature values actually used for each prediction and join those logged values to training labels, which guarantees the training set reflects exactly what the model saw.

Q: How do you design a dual-store architecture for ML features?

A: A dual-store architecture has two storage layers with different characteristics. The online store (Redis, DynamoDB, or Cassandra) is optimized for low-latency reads at inference time - sub-millisecond reads, stores only current feature values, auto-expires with TTL. The offline store (S3 Parquet, Delta Lake) is optimized for high-throughput analytical reads for training - stores the full history of feature values with timestamps, supports point-in-time joins, handles petabytes.

A streaming pipeline (Flink or Faust) writes to both simultaneously: every computed feature value goes to the online store immediately, and also to a Kafka feature topic that a batch job periodically lands to the offline store. This dual-write pattern ensures that training data and serving data come from the same computation, preventing skew.
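The dual-write pattern can be sketched with in-memory stand-ins: a dict plays the online store (Redis) and a list plays the Kafka feature topic. `DualWriteSink`, `FeatureRecord`, and the `order_count` feature are hypothetical. The point is structural. One computation emits each value once. The online side overwrites, because serving only needs the current value. The offline side appends, because training needs the timestamped history.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class FeatureRecord:
    entity_id: str
    name: str
    value: float
    valid_from: float  # event time at which this value became current


class DualWriteSink:
    """Hypothetical sink: one computation, two destinations."""

    def __init__(self) -> None:
        self.online: Dict[Tuple[str, str], float] = {}  # stand-in for Redis: latest value only
        self.offline_topic: List[FeatureRecord] = []    # stand-in for the Kafka feature topic

    def emit(self, rec: FeatureRecord) -> None:
        self.online[(rec.entity_id, rec.name)] = rec.value  # overwrite for serving
        self.offline_topic.append(rec)                      # append for training history


def running_order_count(events: List[Tuple[str, float]], sink: DualWriteSink) -> None:
    """Toy streaming feature: cumulative order count per restaurant."""
    counts: Dict[str, int] = {}
    for restaurant_id, event_time in events:
        counts[restaurant_id] = counts.get(restaurant_id, 0) + 1
        sink.emit(FeatureRecord(restaurant_id, "order_count", float(counts[restaurant_id]), event_time))


sink = DualWriteSink()
running_order_count([("r1", 100.0), ("r1", 160.0), ("r2", 170.0)], sink)
```

After this run, the online store holds only `r1 -> 2.0` and `r2 -> 1.0`. The offline topic holds all three records with their `valid_from` times, which is what a point-in-time join needs later.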

Q: What is feature freshness and how do you monitor it?

A: Feature freshness is the age of the most recent feature value in your online store - how long ago it was computed relative to the current time. A freshness SLA defines the maximum acceptable age beyond which the feature is considered unreliable.

Monitoring: store the computation timestamp alongside each feature value (HSET user_features:123 feature_timestamp 1710512340.5). A monitoring process periodically samples a representative set of entities, reads their feature_timestamp, and computes now - feature_timestamp. Alert when staleness exceeds the SLA. Key metrics: p50 and p99 staleness by feature group, count of entities with missing keys (cache misses), and end-to-end latency from Kafka event production to Redis write completion.
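A minimal sketch of the staleness computation follows. It assumes the sampled `feature_timestamp` values have already been read from Redis (for example with HMGET in a pipeline, as in the pipeline-calls tip) and decoded to floats, with `None` for missing keys. The nearest-rank percentile and the report fields are illustrative choices, not a standard.

```python
import math
from typing import Dict, List, Optional


def nearest_rank(sorted_values: List[float], pct: float) -> float:
    """Nearest-rank percentile of an ascending list; pct in (0, 100]."""
    rank = math.ceil(pct / 100 * len(sorted_values))
    return sorted_values[max(rank, 1) - 1]


def staleness_report(sampled: Dict[str, Optional[float]], now: float, sla_s: float) -> dict:
    """Summarize staleness (now - feature_timestamp) over a sample of entity keys."""
    ages = sorted(now - ts for ts in sampled.values() if ts is not None)
    missing = sum(1 for ts in sampled.values() if ts is None)  # cache misses
    p50 = nearest_rank(ages, 50) if ages else None
    p99 = nearest_rank(ages, 99) if ages else None
    return {
        "p50_s": p50,
        "p99_s": p99,
        "missing_keys": missing,
        "alert": p99 is not None and p99 > sla_s,  # page when p99 breaches the SLA
    }


report = staleness_report(
    {"user_features:1": 100.0, "user_features:2": 90.0,
     "user_features:3": None, "user_features:4": 40.0},
    now=100.0,
    sla_s=30.0,
)
```

In this sample, one entity is 60 seconds stale, so p99 exceeds the 30-second SLA and the report raises an alert. It also counts one missing key.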

Q: How would you design the feature pipeline for DoorDash ETA prediction?

A: I would classify the 150+ features by freshness SLA and route each through the appropriate computation path. Driver GPS and availability features need freshness within 30 seconds - compute in a Flink job with per-event triggers, write to Redis immediately. Restaurant prep time features need freshness within 5 minutes - compute in a Flink 5-minute tumbling window. Route historical features (delivery time by route, day, time) can tolerate 1-hour staleness - compute in a micro-batch or hourly Spark job.

The shared infrastructure: all three computation paths write to the same Redis cluster (online store) and the same feature topic (for offline landing to S3). The model server reads all features from Redis in a single pipeline call. The training pipeline reads from S3 using a point-in-time join to reconstruct what features were available at each historical prediction time.
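The classification step can be sketched as a lookup from freshness SLA to computation path. The feature names are hypothetical. The rule is one reasonable reading of the answer: pick the cheapest path whose update interval still fits within the SLA. The intervals come from the answer: per-event, 5-minute windows, and hourly batch.

```python
from typing import Dict, List, Tuple

# (update interval in seconds, path), ordered from most to least expensive.
PATHS: List[Tuple[int, str]] = [
    (0, "flink_per_event"),        # per-event triggers, immediate Redis write
    (300, "flink_tumbling_5min"),  # 5-minute tumbling window
    (3600, "hourly_batch"),        # micro-batch or hourly Spark job
]

# Hypothetical catalog: feature name -> freshness SLA in seconds.
FEATURE_SLAS: Dict[str, int] = {
    "driver_available_count_2mi": 30,
    "restaurant_prep_time_recent": 300,
    "route_duration_by_dow_hour": 3600,
}


def computation_path(sla_s: int) -> str:
    """Cheapest path whose update interval does not exceed the SLA."""
    chosen = PATHS[0][1]
    for interval_s, path in PATHS:
        if interval_s <= sla_s:
            chosen = path
    return chosen


routing = {name: computation_path(sla) for name, sla in FEATURE_SLAS.items()}
```

Keeping this mapping as data rather than scattering it across jobs makes it easy to audit which features pay for per-event computation.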

Q: What are hot keys in streaming feature computation and how do you handle them?

A: A hot key occurs when a single entity (a popular restaurant, a high-traffic user, a trending product) receives a disproportionately high rate of events. In Kafka, all events for the same entity key go to the same partition - a hot key creates one overloaded partition while others are idle. In Redis, a hot key concentrates writes on one node in the cluster.

Solutions: for Kafka partitioning, use a composite key that adds a random shard suffix to the entity ID - restaurant_123:shard:3 - and distribute events across multiple partitions. Aggregate results from all shards before writing to the feature store. For Redis hot keys, use the same shard-and-aggregate pattern: write incremental updates to sharded keys and aggregate at read time. The read becomes slightly more expensive (fetching 8 shard keys instead of 1), but write throughput scales roughly linearly with shard count.
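A minimal sketch of the Redis side of shard-and-aggregate follows. It assumes a redis-py client and simple integer counters, and `NUM_SHARDS = 8` mirrors the example in the answer. Reads use a pipeline of individual `GET`s rather than one `MGET`. In Redis Cluster a multi-key `MGET` requires all keys to share a hash slot, and spreading the shard keys across slots is the whole point. A cluster-aware client can route individual `GET`s per key.

```python
import random
from typing import Any, Optional

NUM_SHARDS = 8  # mirrors the 8-shard example in the text


def shard_key(entity_id: str, shard: int) -> str:
    return f"{entity_id}:shard:{shard}"  # e.g. restaurant_123:shard:3


def record_event(client: Any, entity_id: str, amount: int = 1,
                 rng: Optional[random.Random] = None) -> None:
    """Write path: increment one randomly chosen shard counter (INCRBY)."""
    shard = (rng or random).randrange(NUM_SHARDS)
    client.incrby(shard_key(entity_id, shard), amount)


def read_total(client: Any, entity_id: str) -> int:
    """Read path: fetch every shard counter in one round trip and sum them."""
    pipe = client.pipeline(transaction=False)
    for shard in range(NUM_SHARDS):
        pipe.get(shard_key(entity_id, shard))
    return sum(int(v) for v in pipe.execute() if v is not None)
```

Counters combine by summing. Other aggregates need a mergeable representation, such as sum and count for a mean.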

Q: How do you ensure point-in-time consistency when using real-time features for model training?

A: Point-in-time consistency means: when constructing a training example for a prediction made at time $T$, use only feature values that were available at time $T$ - not values computed later. This prevents information from the future leaking into training examples.

The technical solution: the offline feature store preserves the full history of feature values with their `valid_from_timestamp`. When constructing training data, perform a point-in-time join: for each label event at time $T$, select the feature record with the largest `valid_from_timestamp` less than or equal to $T$. Most feature stores (Feast, Tecton, Hopsworks) provide this as a native API. If building from scratch, join labels to feature records on entity ID with `valid_from_timestamp <= label_timestamp`, then keep the latest match per label event with a window function: `ROW_NUMBER() OVER (PARTITION BY label_id ORDER BY valid_from_timestamp DESC) = 1`.
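The same lookup in Python is a minimal in-memory sketch. It assumes each entity's history fits in a list sorted by `valid_from_timestamp`; the names are hypothetical, and feature stores perform this at warehouse scale. `bisect_right` returns the insertion point after any record stamped exactly at the label time. A feature written at $T$ therefore counts as available at $T$, matching the "less than or equal to" rule.

```python
import bisect
from typing import Dict, List, Optional, Tuple

# entity_id -> [(valid_from_timestamp, value), ...] in ascending timestamp order
History = Dict[str, List[Tuple[float, float]]]


def point_in_time_join(
    history: History, labels: List[Tuple[str, float, float]]
) -> List[Tuple[str, float, Optional[float], float]]:
    """Attach to each (entity_id, label_timestamp, label) the feature value with the
    largest valid_from_timestamp <= label_timestamp, or None if none existed yet."""
    times = {eid: [ts for ts, _ in recs] for eid, recs in history.items()}
    rows = []
    for entity_id, label_ts, label in labels:
        recs = history.get(entity_id, [])
        idx = bisect.bisect_right(times.get(entity_id, []), label_ts) - 1  # last ts <= label_ts
        value = recs[idx][1] if idx >= 0 else None
        rows.append((entity_id, label_ts, value, label))
    return rows


history = {"user_1": [(100.0, 1.0), (200.0, 5.0)]}
labels = [("user_1", 150.0, 0.0), ("user_1", 200.0, 1.0),
          ("user_1", 50.0, 0.0), ("user_2", 10.0, 1.0)]
training = point_in_time_join(history, labels)
```

A label at 150 picks up the value valid from 100, not the later value from 200. A label at 50 gets `None` and should fall back to the same defaults used at serving time.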

© 2026 EngineersOfAI. All rights reserved.