Real-Time Feature Engineering at Scale
The Production Scenario
It is 9:47 AM on a Monday. A user on a ride-sharing platform opens the app and requests a ride. In the next 200ms, the app must show a price, an ETA, and a confidence score that a driver will accept the request quickly. To produce these numbers, the ML system needs features that describe the current state of supply and demand in the user's geographic area - right now, at 9:47 AM on this specific Monday, not yesterday's average.
The features that matter are: how many drivers are active within 2km in the last 3 minutes, how many riders have requested in this area in the last 10 minutes, the ratio of active drivers to active riders (supply-demand ratio), the average driver acceptance rate in this area in the last hour, and the user's historical acceptance rate at this price point at this time of day.
None of these features can be pre-computed for 9:47 AM on Monday. They depend on what is happening right now, in this specific area, and they need to be ready within the 200ms window before the user's app times out. The feature engineering that produces these values - from a stream of GPS pings, ride requests, and acceptance events - is what real-time feature engineering actually involves.
Why This Exists - The Gap Between Training Features and Serving Features
In a training notebook, features look like pandas DataFrame columns. You read a table, join it with another table, compute some aggregates, and hand the resulting matrix to a model. The computation runs in minutes or hours. You are never worried about a 200ms deadline.
In production, those same features must be computed within a hard latency budget. The aggregates that took minutes to compute in training must be ready in milliseconds at serving time. The joins between tables must happen against live data, not historical snapshots.
This creates three distinct engineering problems:
- Temporal aggregations: Computing "count of events in last N minutes" requires maintaining a rolling window over a continuous stream of events, updating it as new events arrive, and serving the current value in under 5ms.
- Training-serving skew: If training uses a batch computation (Spark job over historical data) and serving uses a streaming computation (Flink over live data), subtle differences in how edge cases are handled - events that arrive out of order, duplicate events, different window semantics - will cause the features to differ. This degrades model quality silently.
- Scale: For a platform with 50 million active users, you have 50 million per-user feature states. Storing and serving these states requires careful data structure and storage choices.
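To make the skew problem concrete, here is a minimal sketch of how two window counters that look equivalent can disagree. The function names and the sample timestamps are illustrative assumptions, not taken from any specific batch or streaming framework; the only difference between the two functions is whether the start of the window is inclusive.

```python
def count_closed_window(timestamps, now, window_seconds):
    """Count events with (now - window) <= ts <= now: both ends inclusive."""
    start = now - window_seconds
    return sum(1 for ts in timestamps if start <= ts <= now)


def count_half_open_window(timestamps, now, window_seconds):
    """Count events with (now - window) < ts <= now: window start excluded."""
    start = now - window_seconds
    return sum(1 for ts in timestamps if start < ts <= now)


# An event that lands exactly on the window boundary (ts=100.0) is counted
# by one implementation and dropped by the other.
events = [100.0, 250.0, 400.0]
batch_value = count_closed_window(events, now=400.0, window_seconds=300)
stream_value = count_half_open_window(events, now=400.0, window_seconds=300)
print(batch_value, stream_value)
```

A difference of one event per window looks harmless, but it shifts the feature distribution the model was trained on, and nothing in the serving path raises an error when it happens.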
Historical Context
The concept of a feature store emerged from Uber's Michelangelo platform (2017) and Airbnb's Zipline (2018). Before these systems, each ML team maintained its own feature computation pipelines, often with logic duplicated between training (Python/Spark) and serving (Java/C++). The two copies frequently diverged, causing training-serving skew bugs.
Uber's Michelangelo was the first publicly described system to provide both offline feature computation (for training data) and online feature serving (for real-time inference) from a unified feature definition. Feast (open-sourced in 2019 by Gojek in collaboration with Google Cloud, with Tecton joining as a core contributor in 2020) brought this pattern to the open-source community.
The real-time feature engineering problem specifically - computing features from live events within milliseconds - is still commonly solved with Redis, using a different data structure for each aggregation type.
Feature Taxonomy for Real-Time Systems
Redis Data Structures for ML Features
Redis is the standard backend for real-time ML feature storage. Different Redis data structures map to different feature types:
Hash for User Feature Vectors
# redis_feature_store.py
import json

import numpy as np
import redis

# decode_responses=False keeps values as bytes, which binary embeddings need
r = redis.Redis(host="localhost", port=6379, decode_responses=False)


def write_user_features(
    user_id: str,
    features: dict,
    ttl_seconds: int = 7200,
):
    """Write a user's feature vector as a Redis Hash."""
    key = f"features:user:{user_id}"
    pipe = r.pipeline()
    # Convert all values to strings (or raw bytes) for the Redis hash
    string_features = {}
    for name, value in features.items():
        if isinstance(value, (int, float)):
            string_features[name] = str(value)
        elif isinstance(value, np.ndarray):
            # Store embeddings as binary (more efficient than JSON).
            # The field is written as "<name>:binary", so readers must ask for that name.
            string_features[f"{name}:binary"] = value.astype(np.float32).tobytes()
        else:
            string_features[name] = json.dumps(value)
    pipe.hset(key, mapping=string_features)
    pipe.expire(key, ttl_seconds)
    pipe.execute()


def read_user_features(
    user_id: str,
    feature_names: list[str],
) -> dict:
    """Read specific features for a user - HMGET for efficiency."""
    key = f"features:user:{user_id}"
    values = r.hmget(key, feature_names)
    result = {}
    for name, value in zip(feature_names, values):
        if value is None:
            result[name] = None
        elif name.endswith(":binary"):
            # Return the embedding under its original name, without the suffix
            result[name.removesuffix(":binary")] = np.frombuffer(value, dtype=np.float32)
        else:
            try:
                result[name] = float(value)
            except (ValueError, TypeError):
                result[name] = json.loads(value)
    return result


def read_batch_user_features(
    user_ids: list[str],
    feature_names: list[str],
) -> dict[str, dict]:
    """
    Read features for multiple users in a single Redis pipeline.
    Much faster than N individual HMGET calls.

    Assumes every requested feature is numeric. Missing values default to 0.0
    here, whereas read_user_features returns None for them.
    """
    pipe = r.pipeline()
    for user_id in user_ids:
        key = f"features:user:{user_id}"
        pipe.hmget(key, feature_names)
    results = pipe.execute()
    user_features = {}
    for user_id, values in zip(user_ids, results):
        user_features[user_id] = {
            name: float(val) if val is not None else 0.0
            for name, val in zip(feature_names, values)
        }
    return user_features
Sorted Set for Sliding Window Aggregations
Redis Sorted Sets (ZSET) are ideal for sliding window counts. Each event is added with its timestamp as the score. To count events in the last N seconds, remove old members and count the remaining:
# sliding_window_redis.py
import time
from typing import Optional

import redis

r = redis.Redis(host="localhost", port=6379)


def add_event(
    window_key: str,
    event_id: str,
    timestamp: Optional[float] = None,
    max_window_seconds: int = 3600,
):
    """
    Add an event to a sliding window.
    The sorted set key is the window identifier (e.g., "tx:count:user_123").
    Score is the event timestamp in seconds.
    Member is the event ID. It must be unique: ZADD on an existing member
    only updates its score, so a repeated ID is counted once.
    """
    # Compare against None so that an explicit timestamp of 0 is respected
    ts = time.time() if timestamp is None else timestamp
    pipe = r.pipeline()
    # Add event with timestamp as score
    pipe.zadd(window_key, {event_id: ts})
    # Remove events older than the maximum window size
    # (avoid unbounded growth)
    pipe.zremrangebyscore(window_key, 0, ts - max_window_seconds)
    # Set TTL on the key itself
    pipe.expire(window_key, max_window_seconds + 60)
    pipe.execute()


def count_events_in_window(
    window_key: str,
    window_seconds: int,
    now: Optional[float] = None,
) -> int:
    """Count events in the last N seconds using ZCOUNT (cutoff is inclusive)."""
    ts = time.time() if now is None else now
    cutoff = ts - window_seconds
    return r.zcount(window_key, cutoff, "+inf")


def sum_in_window(
    window_key: str,
    amount_key: str,  # Separate hash for amounts
    window_seconds: int,
    now: Optional[float] = None,
) -> float:
    """
    Sum amounts for events in the last N seconds.
    Uses a separate hash for amounts since ZSET scores are timestamps.
    """
    ts = time.time() if now is None else now
    cutoff = ts - window_seconds
    # Get event IDs in window
    event_ids = r.zrangebyscore(window_key, cutoff, "+inf")
    if not event_ids:
        return 0.0
    # Get amounts for those events (IDs come back as bytes, so decode them)
    amounts = r.hmget(amount_key, [eid.decode() for eid in event_ids])
    return sum(float(a) for a in amounts if a is not None)
# Example: fraud velocity features
def update_fraud_features(user_id: str, transaction_id: str, amount: float):
    """Update all fraud velocity features for a user after a transaction."""
    tx_window_key = f"fraud:tx:window:{user_id}"
    amount_key = f"fraud:tx:amounts:{user_id}"
    now = time.time()  # One timestamp so the add and the cleanup agree
    cutoff = now - 3600
    # Find transactions about to leave the 1-hour window so their amounts can
    # be deleted as well; otherwise the hash keeps growing for active users.
    expired_ids = r.zrangebyscore(tx_window_key, 0, cutoff)
    pipe = r.pipeline()
    if expired_ids:
        pipe.hdel(amount_key, *expired_ids)
    # Add transaction to sorted set with current timestamp
    pipe.zadd(tx_window_key, {transaction_id: now})
    # Store the amount
    pipe.hset(amount_key, transaction_id, str(amount))
    pipe.expire(amount_key, 7200)
    # Clean up old events (older than 1 hour)
    pipe.zremrangebyscore(tx_window_key, 0, cutoff)
    pipe.expire(tx_window_key, 7200)
    pipe.execute()


def get_fraud_features(user_id: str) -> dict:
    """Get fraud velocity features for model serving."""
    tx_window_key = f"fraud:tx:window:{user_id}"
    amount_key = f"fraud:tx:amounts:{user_id}"
    now = time.time()
    pipe = r.pipeline()
    # Count transactions in each window
    pipe.zcount(tx_window_key, now - 300, "+inf")   # Last 5 min
    pipe.zcount(tx_window_key, now - 900, "+inf")   # Last 15 min
    pipe.zcount(tx_window_key, now - 3600, "+inf")  # Last 1 hour
    # Get event IDs in 5-min window for amount sum
    pipe.zrangebyscore(tx_window_key, now - 300, "+inf")
    count_5min, count_15min, count_1hr, recent_tx_ids = pipe.execute()
    # Sum amounts for 5-min window (second round trip, not atomic with the first)
    amount_5min = 0.0
    if recent_tx_ids:
        amounts = r.hmget(amount_key, [tid.decode() for tid in recent_tx_ids])
        amount_5min = sum(float(a) for a in amounts if a is not None)
    return {
        "tx_count_5min": int(count_5min),
        "tx_count_15min": int(count_15min),
        "tx_count_1hr": int(count_1hr),
        "total_amount_5min": amount_5min,
    }
Session Feature Detection
# session_features.py
import time
import uuid

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

SESSION_TIMEOUT_SECONDS = 1800  # 30 minutes


def update_session(user_id: str, event_type: str, page: str) -> dict:
    """
    Update session features for a user event.
    Returns current session state.

    Note: this is a read-modify-write and is not atomic. Concurrent events
    for the same user can race unless updates are serialized per user.
    """
    session_key = f"session:{user_id}"
    now = time.time()
    # Get current session state
    session = r.hgetall(session_key)
    if not session or (now - float(session.get("last_event_ts", 0))) > SESSION_TIMEOUT_SECONDS:
        # New session (first event or timeout)
        session = {
            "session_id": str(uuid.uuid4()),
            "session_start_ts": str(now),
            "last_event_ts": str(now),
            "event_count": "1",
            "max_funnel_depth": str(_funnel_depth(event_type)),
            "unique_pages": page,
        }
    else:
        # Existing session - update
        # filter(None, ...) drops the empty string produced by splitting ""
        existing_pages = set(filter(None, session.get("unique_pages", "").split(",")))
        existing_pages.add(page)
        session["last_event_ts"] = str(now)
        session["event_count"] = str(int(session["event_count"]) + 1)
        session["max_funnel_depth"] = str(max(
            int(session["max_funnel_depth"]),
            _funnel_depth(event_type),
        ))
        # Page names are assumed not to contain commas
        session["unique_pages"] = ",".join(sorted(existing_pages))
    # Write updated session
    r.hset(session_key, mapping=session)
    r.expire(session_key, SESSION_TIMEOUT_SECONDS + 60)
    return {
        "session_id": session["session_id"],
        "session_duration_seconds": now - float(session["session_start_ts"]),
        "events_in_session": int(session["event_count"]),
        "max_funnel_depth": int(session["max_funnel_depth"]),
        "unique_pages": len(session["unique_pages"].split(",")),
        "is_new_session": int(session["event_count"]) == 1,
    }


def _funnel_depth(event_type: str) -> int:
    """Map event type to funnel depth (0=browse, 1=pdp, 2=cart, 3=checkout, 4=purchase)."""
    depths = {
        "page_view": 0,
        "product_view": 1,
        "add_to_cart": 2,
        "checkout_start": 3,
        "purchase": 4,
    }
    return depths.get(event_type, 0)
Supply and Demand Features: Geographic Aggregations
For ride-sharing, delivery, and marketplace platforms, features often require geographic aggregations - how many drivers are active within X km:
# geo_features.py
import time

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

DRIVER_TTL_SECONDS = 300  # A driver counts as active for 5 min after the last ping


# Redis has native GEO commands - no external library needed
def update_driver_location(
    driver_id: str,
    latitude: float,
    longitude: float,
    status: str = "active",
):
    """Update driver location using Redis GEO commands."""
    # Redis GEO stores lat/lon as a geohash score in a sorted set
    r.geoadd("drivers:active", (longitude, latitude, driver_id))
    # EXPIRE applies to the whole key, not to individual members, so it cannot
    # age out a single driver. Per-driver liveness comes from the status key.
    r.expire("drivers:active", DRIVER_TTL_SECONDS)
    # Store status separately; this key disappears 5 min after the last update
    r.setex(f"driver:status:{driver_id}", DRIVER_TTL_SECONDS, status)


def count_active_drivers_nearby(
    latitude: float,
    longitude: float,
    radius_km: float = 2.0,
) -> int:
    """Count active drivers within radius_km of the given location."""
    # GEORADIUS returns members within the radius (deprecated since Redis 6.2
    # in favor of GEOSEARCH, but still available)
    nearby = r.georadius(
        "drivers:active",
        longitude,
        latitude,
        radius_km,
        unit="km",
        count=1000,  # Max results
    )
    if not nearby:
        return 0
    # Keep only drivers whose status key is still alive and set to "active".
    # Stale members stay in the geo set until a cleanup job removes them (ZREM).
    pipe = r.pipeline(transaction=False)
    for driver_id in nearby:
        pipe.get(f"driver:status:{driver_id}")
    return sum(1 for status in pipe.execute() if status == "active")


def get_supply_demand_ratio(
    latitude: float,
    longitude: float,
    area_key: str,  # Geohash of the area for bucketing requests
) -> dict:
    """
    Compute supply/demand ratio for a geographic area.
    Supply = active drivers in area.
    Demand = active ride requests in area (last 10 minutes).
    """
    now = time.time()
    # Count active drivers
    active_drivers = count_active_drivers_nearby(latitude, longitude, radius_km=2.0)
    # Count active requests (from a sorted set of request events)
    request_window_key = f"requests:area:{area_key}"
    active_requests = r.zcount(request_window_key, now - 600, "+inf")
    # max(1, ...) avoids division by zero when there are no requests
    ratio = active_drivers / max(1, active_requests)
    return {
        "active_drivers_2km": active_drivers,
        "active_requests_10min": int(active_requests),
        "supply_demand_ratio": ratio,
        "surge_predicted": ratio < 0.5,  # Fewer than one driver per two requests
    }
The Training-Serving Skew Solution
The root cause of training-serving skew in real-time features: different code computing the same feature in training and serving. The solution is one definition, two implementations that are tested for equivalence:
# feature_definitions.py - canonical feature logic
# Import this in BOTH training (Spark UDFs) and serving (Python functions)
import time

# Needed only by the integration test at the bottom. In practice the test
# would live in its own module so training jobs do not need a Redis connection.
from sliding_window_redis import get_fraud_features


def compute_tx_count_window(
    event_timestamps: list[float],
    window_start: float,
    window_end: float,
) -> int:
    """
    Count events within a time window.

    Args:
        event_timestamps: Sorted list of event timestamps (seconds since epoch)
        window_start: Window start timestamp (inclusive)
        window_end: Window end timestamp (inclusive)

    This function is used in:
    - Training: applied to historical event lists per user per window
    - Serving: Redis ZCOUNT over the same window boundaries
    - Integration test: compare outputs on a reference dataset
    """
    return sum(1 for ts in event_timestamps if window_start <= ts <= window_end)


def compute_tx_amount_sum(
    event_timestamps: list[float],
    event_amounts: list[float],
    window_start: float,
    window_end: float,
) -> float:
    """Sum of transaction amounts within a time window."""
    total = 0.0
    for ts, amount in zip(event_timestamps, event_amounts):
        if window_start <= ts <= window_end:
            total += amount
    return total


# Integration test: verify Redis implementation matches reference implementation
def verify_redis_matches_reference(
    user_id: str,
    reference_events: list[dict],  # [{timestamp, amount}, ...]
    window_seconds: int = 300,
):
    """
    Assert that the Redis-based feature computation matches
    the reference Python implementation.
    Run this in CI for every change to the feature pipeline.

    Assumes the same events were already written for user_id via
    update_fraud_features and that each timestamp matches its write time.
    get_fraud_features always uses a 5-minute window, so keep window_seconds=300.
    """
    now = time.time()
    window_start = now - window_seconds
    # Reference implementation
    timestamps = [e["timestamp"] for e in reference_events]
    amounts = [e["amount"] for e in reference_events]
    ref_count = compute_tx_count_window(timestamps, window_start, now)
    ref_sum = compute_tx_amount_sum(timestamps, amounts, window_start, now)
    # Redis implementation
    redis_features = get_fraud_features(user_id)
    redis_count = redis_features["tx_count_5min"]
    redis_sum = redis_features["total_amount_5min"]
    # get_fraud_features reads the clock a few milliseconds later, so an event
    # on the window edge may fall on either side: allow a difference of one.
    assert abs(ref_count - redis_count) <= 1, (
        f"Count mismatch: ref={ref_count}, redis={redis_count}"
    )
    # 1% relative tolerance. A boundary event can shift the sum by a full
    # transaction amount, so keep reference events away from the window edge.
    assert abs(ref_sum - redis_sum) / max(1, ref_sum) < 0.01, (
        f"Sum mismatch: ref={ref_sum:.2f}, redis={redis_sum:.2f}"
    )
Uber's Michelangelo Real-Time Feature Pipeline
Uber's Michelangelo platform, described in their 2017 engineering blog post, is a widely cited reference architecture for large-scale real-time ML feature pipelines.
Key design decisions from Michelangelo:
- Point-in-time correct joins: Training data must use the feature values that would have been available at the time of each training example, not the current (retrospectively computed) values. Michelangelo stores feature values with timestamps and performs point-in-time joins for training.
- Unified feature definitions: Features are defined once in a DSL, and the same definition drives both offline (batch, Spark) and online (streaming) computation. This removes the main source of training-serving skew, divergent implementations, although differences in event ordering and late-arriving data still need to be monitored.
- Feature lineage: Every feature value is traceable to its source events. When a model's performance degrades, engineers can identify which features changed and trace them back to the upstream data pipelines that produced them.
Production Engineering Notes
Measure feature freshness as a first-class metric: Track how many seconds old each feature is when it is read at serving time. Define SLAs per feature type (velocity features: under 30s stale, near-real-time features: under 5 min stale). Alert when staleness exceeds 2x the SLA - this indicates the streaming pipeline is falling behind.
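A staleness check along these lines could look like the sketch below. It assumes the writer stores a `written_at` timestamp alongside each feature value; the SLA table, the 2x multiplier constant, and the function names are illustrative choices based on the numbers in the paragraph above, not part of any library.

```python
import time
from typing import Optional

# Per-feature-type staleness SLAs in seconds (values from the text above).
STALENESS_SLA_SECONDS = {"velocity": 30.0, "near_real_time": 300.0}
ALERT_MULTIPLIER = 2.0  # Alert when staleness exceeds 2x the SLA


def staleness_seconds(written_at: float, read_at: Optional[float] = None) -> float:
    """Age of a feature value at read time, never negative."""
    read_at = time.time() if read_at is None else read_at
    return max(0.0, read_at - written_at)


def should_alert(feature_type: str, written_at: float, read_at: Optional[float] = None) -> bool:
    """True when the feature is older than ALERT_MULTIPLIER times its SLA."""
    age = staleness_seconds(written_at, read_at)
    return age > ALERT_MULTIPLIER * STALENESS_SLA_SECONDS[feature_type]


# A velocity feature written 45s ago is past its 30s SLA but below the 60s alert line.
print(should_alert("velocity", written_at=1000.0, read_at=1045.0))
```

In practice the per-read staleness would be emitted as a histogram metric so that percentiles, not just threshold breaches, are visible on a dashboard.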
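Redis memory planning: At 50 million users with 20 features each (average 8 bytes per feature), the raw values alone take 50M * 20 * 8 = 8 GB. Treat that as a floor rather than an estimate: field names, hash encoding, per-key overhead, and the sorted sets used for windowed features all add to it, often by a multiple. Measure actual usage on a sample of real keys before sizing the cluster. A Redis Cluster with 3 shards of 32 GB RAM each (96 GB total) leaves headroom for this overhead.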
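The arithmetic can be kept in a small helper so that the assumptions are explicit. This is a back-of-the-envelope sketch: the `overhead_factor` of 3.0 is a placeholder assumption, not a measured Redis figure, and should be replaced with a ratio observed on real keys (for example via the `MEMORY USAGE` command on a sample).

```python
def estimate_redis_memory_gb(
    n_users: int,
    features_per_user: int,
    bytes_per_value: int,
    overhead_factor: float = 1.0,
) -> float:
    """Raw value bytes times an overhead multiplier, in decimal gigabytes."""
    raw_bytes = n_users * features_per_user * bytes_per_value
    return raw_bytes * overhead_factor / 1e9


# Raw values only: 50M users * 20 features * 8 bytes
raw_gb = estimate_redis_memory_gb(50_000_000, 20, 8)

# Same workload with a hypothetical 3x overhead for field names and encoding
padded_gb = estimate_redis_memory_gb(50_000_000, 20, 8, overhead_factor=3.0)

print(f"raw: {raw_gb:.1f} GB, with assumed overhead: {padded_gb:.1f} GB")
```

Even with the assumed 3x multiplier, the total stays well under the 96 GB of a three-shard, 32 GB-per-shard cluster, which is where the headroom comes from.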
Pipeline backpressure: When the Kafka consumer lag grows (streaming pipeline falling behind), feature staleness increases. Monitor consumer lag alongside feature staleness. Set up alerts on consumer lag that fire before staleness SLAs are violated.
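One way to turn consumer lag into an early warning is to convert it into a rough delay estimate and compare that against a fraction of the staleness SLA. The sketch below assumes the per-partition end offsets and committed offsets have already been fetched from the Kafka client; how they are fetched is left out, and the function names and the 0.5 safety fraction are illustrative assumptions.

```python
def total_consumer_lag(end_offsets: dict, committed_offsets: dict) -> int:
    """Sum of (latest offset - committed offset) across partitions."""
    return sum(
        max(0, end - committed_offsets.get(partition, 0))
        for partition, end in end_offsets.items()
    )


def lag_breaches_budget(
    lag_events: int,
    consume_rate_per_second: float,
    staleness_sla_seconds: float,
    safety_fraction: float = 0.5,
) -> bool:
    """True when the estimated time to drain the lag exceeds a fraction of the SLA."""
    if consume_rate_per_second <= 0:
        # A stalled consumer with any backlog is always a breach
        return lag_events > 0
    estimated_delay = lag_events / consume_rate_per_second
    return estimated_delay > safety_fraction * staleness_sla_seconds


lag = total_consumer_lag({0: 1000, 1: 2000}, {0: 900, 1: 1500})
print(lag, lag_breaches_budget(lag, consume_rate_per_second=100.0, staleness_sla_seconds=30.0))
```

Because the alert fires at a fraction of the SLA, it gives the on-call engineer time to scale consumers before served features actually go stale. The drain-time estimate ignores newly arriving events, so it is a lower bound on the real delay.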
:::warning Feature Backfilling After Pipeline Bug Fix
When you fix a bug in a feature computation pipeline, you must backfill the corrected features for the period when the bug was active. Without backfill, the feature store contains a mix of correct and incorrect values. The pattern: run the fixed pipeline over the historical event log for the affected period and write corrected values to the feature store with their original timestamps. Monitor for anomalies in the feature distribution during the backfill window.
:::
:::danger The Lookup vs Compute Confusion
A common architecture mistake is computing expensive features at serving time that could have been pre-computed. If a feature takes 50ms to compute (e.g., joining two tables) and your total SLA is 50ms, you cannot compute it at serving time. Always ask: can this feature be pre-computed and stored? If yes, how fresh does it need to be? If the freshness requirement allows pre-computation (for example, values up to 1 minute stale are acceptable), move the computation to the feature pipeline and serve from Redis.
:::
Interview Q&A
Q: How do you implement a sliding window count feature using Redis?
Use a Redis Sorted Set where each member is an event ID and the score is the event timestamp. To add an event: ZADD window_key timestamp event_id. To count events in the last N seconds: ZCOUNT window_key (current_time - N) +inf. This is O(log n) for both operations. To prevent unbounded growth, periodically call ZREMRANGEBYSCORE to remove events older than the maximum window size you need. Set a TTL on the key for additional safety. Wrap the add in a pipeline (ZADD + ZREMRANGEBYSCORE + EXPIRE) for atomicity and reduced round trips.
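The semantics described above can be mirrored in a small in-memory class, which is handy as a reference implementation in unit tests that should not depend on a running Redis. This is a sketch, not a Redis client: the class name is hypothetical, and it trades the O(log n) behavior of a sorted set for simplicity. It reproduces three behaviors: re-adding an existing event ID only updates its timestamp (like ZADD), pruning removes scores at or below the cutoff (like ZREMRANGEBYSCORE), and counting includes the lower bound (like ZCOUNT).

```python
class InMemorySlidingWindow:
    """Reference model of the ZADD / ZREMRANGEBYSCORE / ZCOUNT pattern."""

    def __init__(self, max_window_seconds: float):
        self.max_window_seconds = max_window_seconds
        self._events = {}  # event_id -> timestamp (member -> score)

    def add(self, event_id: str, ts: float) -> None:
        # Like ZADD: an existing member keeps one entry, with the new score
        self._events[event_id] = ts
        # Like ZREMRANGEBYSCORE key 0 (ts - max_window): the bound is inclusive
        cutoff = ts - self.max_window_seconds
        self._events = {e: t for e, t in self._events.items() if t > cutoff}

    def count(self, window_seconds: float, now: float) -> int:
        # Like ZCOUNT key (now - window) +inf: the lower bound is inclusive
        cutoff = now - window_seconds
        return sum(1 for t in self._events.values() if t >= cutoff)


window = InMemorySlidingWindow(max_window_seconds=3600)
window.add("a", 0.0)
window.add("b", 3000.0)
window.add("c", 3500.0)
window.add("d", 3700.0)   # prunes "a", which is now older than one hour
window.add("d", 3710.0)   # duplicate ID: updates the score, adds no new member
print(window.count(300, now=3710.0), window.count(3600, now=3710.0))
```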
Q: What is point-in-time correct joining and why does it matter for training data?
Point-in-time correct joining ensures that training data uses only the feature values that would have been available at the moment each training label was generated - not future values. Without it, you have data leakage: a training example for a ride at 2:00 PM uses features computed from events that happened at 2:30 PM. The model learns patterns that include information from the future, leading to inflated offline metrics and poor production performance. Implementation: store all feature values with their computation timestamps. When generating training data, for each training example (label with timestamp T), take the most recent feature value whose feature_timestamp <= T. This requires a time-series feature store that retains historical values, not just the current value.
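The lookup rule can be sketched with the standard library alone. The function below assumes each feature's history is a list of `(feature_timestamp, value)` pairs sorted by timestamp; the function name and the sample data are illustrative. It returns the latest value at or before the label timestamp, and `None` when no value existed yet.

```python
import bisect


def point_in_time_value(history, label_ts):
    """Latest feature value with feature_timestamp <= label_ts, or None.

    history: list of (feature_timestamp, value) pairs, sorted ascending by timestamp.
    """
    timestamps = [ts for ts, _ in history]
    # bisect_right returns the number of entries with timestamp <= label_ts
    idx = bisect.bisect_right(timestamps, label_ts)
    if idx == 0:
        return None  # The feature had not been computed yet at label time
    return history[idx - 1][1]


acceptance_rate_history = [(100.0, 0.2), (200.0, 0.5), (300.0, 0.9)]

# A label at t=250 must see the value written at t=200, not the later 0.9
print(point_in_time_value(acceptance_rate_history, 250.0))
```

For tabular training sets, the same backward-looking join is what an "as-of" join performs across many entities at once.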
Q: How would you detect and respond to training-serving skew in real-time features?
Instrument both sides: log features at training time and at serving time. For a sample of serving requests, compare the feature values that were served against the values computed by replaying the training pipeline on the same inputs. Compute distribution statistics (mean, percentiles) for each feature in both training and serving. Alert when the KL divergence between training and serving distributions exceeds a threshold per feature. When skew is detected: first verify the serving pipeline is processing events correctly by comparing against a reference computation. Then check if the training data generation code matches the serving code (same window boundaries, same handling of missing values).
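A per-feature divergence check could be sketched as follows. The bin edges, the add-one smoothing, the sample values, and the 0.1 alert threshold are all assumptions chosen for illustration; a real threshold would be tuned per feature from historical drift.

```python
import bisect
import math


def smoothed_histogram(values, edges):
    """Bucket values into len(edges) - 1 bins; return add-one-smoothed probabilities.

    Values outside [edges[0], edges[-1]] are ignored.
    """
    counts = [0] * (len(edges) - 1)
    for v in values:
        if edges[0] <= v <= edges[-1]:
            # Clamp so that v == edges[-1] lands in the last bin
            idx = min(bisect.bisect_right(edges, v) - 1, len(counts) - 1)
            counts[idx] += 1
    smoothed = [c + 1 for c in counts]  # Avoids log(0) and division by zero
    total = sum(smoothed)
    return [c / total for c in smoothed]


def kl_divergence(p, q):
    """KL(p || q) for two discrete distributions over the same bins."""
    return sum(pi * math.log(pi / qi) for pi, qi in zip(p, q))


edges = [0, 2, 4, 6]
training_values = [1, 2, 2, 3, 3, 3]
serving_values = [3, 4, 4, 5, 5, 5]

train_p = smoothed_histogram(training_values, edges)
serve_p = smoothed_histogram(serving_values, edges)
skew = kl_divergence(train_p, serve_p)

SKEW_THRESHOLD = 0.1  # Hypothetical per-feature alert threshold
print(f"KL divergence: {skew:.3f}, alert: {skew > SKEW_THRESHOLD}")
```

KL divergence is asymmetric, so the direction (training as the reference distribution here) should be fixed once and kept consistent across features.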
Q: How do you handle geographic aggregations as ML features within a 50ms serving budget?
Pre-compute geographic aggregations at the geohash level and store them in Redis. Divide the map into geohash cells (precision 5 gives roughly 4.9 km x 4.9 km cells at the equator). Run a Flink job that maintains per-cell counts of active drivers and recent requests. At serving time, compute the geohash for the user's location (a constant-time calculation) and read the pre-aggregated counts from Redis (typically 1-2ms). Ride-sharing platforms use this kind of spatial bucketing widely; Uber, for example, open-sourced its H3 hexagonal grid for the same purpose. Pre-computing at several cell sizes gives the model both a local and a neighborhood-level view of supply and demand. The Flink job updates the per-cell aggregates as new GPS pings and requests arrive.
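The cell lookup at serving time needs only a geohash encoder and a key convention. The sketch below implements the standard geohash bit-interleaving in pure Python; the `geo:cell:` key prefix and the function names are hypothetical, and a production system would more likely use an existing geohash or H3 library.

```python
_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # Geohash alphabet (no a, i, l, o)


def geohash_encode(latitude: float, longitude: float, precision: int = 5) -> str:
    """Encode a lat/lon pair as a geohash string of the given length."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    chars = []
    bits, bit_count = 0, 0
    use_longitude = True  # Bits alternate, starting with longitude
    while len(chars) < precision:
        if use_longitude:
            mid = (lon_lo + lon_hi) / 2
            if longitude >= mid:
                bits, lon_lo = (bits << 1) | 1, mid
            else:
                bits, lon_hi = bits << 1, mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if latitude >= mid:
                bits, lat_lo = (bits << 1) | 1, mid
            else:
                bits, lat_hi = bits << 1, mid
        use_longitude = not use_longitude
        bit_count += 1
        if bit_count == 5:  # Every 5 bits become one base32 character
            chars.append(_BASE32[bits])
            bits, bit_count = 0, 0
    return "".join(chars)


def cell_feature_key(latitude: float, longitude: float, precision: int = 5) -> str:
    """Redis key holding the pre-aggregated counts for the cell (hypothetical naming)."""
    return f"geo:cell:{precision}:{geohash_encode(latitude, longitude, precision)}"


print(cell_feature_key(57.64911, 10.40744))
```

Including the precision in the key lets the streaming job maintain several cell sizes side by side without collisions.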
Q: Describe the architecture of Uber's Michelangelo feature pipeline and why it eliminates training-serving skew.
Michelangelo uses a unified feature definition layer: features are defined in a domain-specific language, and the same definition is applied both when generating training data offline (Spark) and when computing features in the online streaming path. Because both paths share one definition, the computation is structurally identical - the main difference is the data source (historical events for training, live Kafka streams for serving). Michelangelo also implements point-in-time correct joins for training: when generating training data, it reconstructs the feature values that would have been available at each training example's label timestamp. Together these remove the main causes of training-serving skew, though differences in event ordering and late-arriving data can still produce mismatches, so serving-time features should still be logged and compared against training values.
