Temporal Features for Real-Time ML
Two Hundred Features, All Time-Based, No Leakage
The user behavior prediction team is engineering features for a model that predicts whether a user will make a purchase in the next 30 minutes. They have access to 18 months of user interaction logs and a feature engineering pipeline that runs on Apache Flink.
Their first model, built by a data scientist unfamiliar with temporal leakage, achieved 91.2% AUC in offline evaluation. In production: 73.8% AUC. The 17-point gap traced to a single feature: "average purchase rate of users who viewed this category." In the offline evaluation, this feature was computed using future data - the average included purchases that happened after the prediction point. In production, only historical data was available.
The second team, working more carefully, built 200+ temporal features without any leakage. The framework they used: every feature must be computable using only information available at the prediction timestamp, and the computation must be byte-for-byte identical between training and serving.
Key features in the final set:
- Recency-weighted view counts (recent views count more)
- Session-based features (views since the current session started)
- Sliding window aggregations (purchases in last 5/30/60 minutes)
- Inter-event timing features (time since last view, average time between views)
- User trajectory features (is view rate accelerating or decelerating?)
The model reached 89.4% AUC in production, within 1.8 points of its offline score. Eliminating temporal leakage shrank the offline/production gap from 17 points to under 2.
Why This Exists - The Leakage Problem
Temporal leakage is the most common cause of "model looks great offline, fails in production" in time-series and behavior prediction tasks. It occurs when features at training time incorporate information that would not be available at prediction time.
The leakage is usually subtle. Consider "user's average session length over the past 30 days." Computed naively in SQL: AVG(session_length) WHERE session_date >= prediction_date - 30 days. The filter has a lower bound but no upper bound. In offline training, prediction_date is a historical label date, so if the query runs at the END of training data collection, sessions that happened AFTER the label date are included too. In production, those future sessions do not exist yet. The fix is to bound the window on both sides: session_date >= prediction_date - 30 days AND session_date < prediction_date.
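Below is a minimal pandas sketch of the two filters. It assumes a hypothetical `sessions` table with `user_id`, `session_date` and `session_length` columns, and the dates and lengths are illustrative. The naive filter keeps every session after the lower bound. The bounded filter stops strictly before the prediction date.

```python
import pandas as pd

# Hypothetical session log for one user (illustrative values)
sessions = pd.DataFrame({
    "user_id": ["u1"] * 4,
    "session_date": pd.to_datetime(["2024-01-05", "2024-01-20", "2024-02-03", "2024-02-10"]),
    "session_length": [10.0, 20.0, 90.0, 120.0],
})
prediction_date = pd.Timestamp("2024-02-01")
lookback = pd.Timedelta(days=30)

# Naive: lower bound only, so sessions after prediction_date leak in
naive = sessions[sessions["session_date"] >= prediction_date - lookback]

# Point-in-time correct: window is [prediction_date - 30 days, prediction_date)
bounded = sessions[
    (sessions["session_date"] >= prediction_date - lookback)
    & (sessions["session_date"] < prediction_date)
]

naive_avg = naive["session_length"].mean()
bounded_avg = bounded["session_length"].mean()
print(naive_avg, bounded_avg)  # 60.0 15.0
```

The two future sessions (Feb 3 and Feb 10) quadruple the naive average. The offline model would learn from a value that production can never reproduce.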
The three forms of temporal leakage:
- Future data in aggregations: averages, counts, or statistics computed over a window that extends into the future
- Point-in-time join errors: joining features to labels using the current state of the feature table rather than the state at label time
- Target leakage: features that are derived from or correlated with the label after the prediction point
Temporal feature engineering is the discipline of computing features that are guaranteed leak-free - using only information available at the exact prediction timestamp.
Historical Context
Temporal feature engineering matured as a discipline around 2015-2020, driven by the proliferation of behavior prediction models at e-commerce and ad-tech companies. Kaggle competitions (especially time-series competitions) formalized temporal cross-validation strategies and made temporal leakage a well-understood failure mode.
The concept of point-in-time joins came from finance, where backtesting trading strategies required computing features using only the data historically available at each decision point. Feature stores such as Feast (2019) made the point-in-time join a first-class operation, which made it easier to compute leak-free features at scale.
Flink's event-time processing with watermarks provided the streaming infrastructure needed to compute temporal aggregations correctly in real-time - using the event's own timestamp rather than wall-clock time for window assignments, enabling replayable, deterministic feature computation.
The training/serving skew problem became widely recognized in the mid-to-late 2010s. Papers from Google ("Machine Learning: The High Interest Credit Card of Technical Debt," Sculley et al., 2014) and industry blog posts highlighted how subtle differences in feature computation between training and serving were a common cause of production ML failures.
The Temporal Feature Taxonomy
Recency-Weighted Features
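A simple count of views in the last hour treats a view from 5 minutes ago identically to a view from 59 minutes ago. Recency weighting gives more weight to recent events:

$$
S(t) = \sum_{i:\, t_i < t} v_i \, w(t - t_i), \qquad w(\Delta t) = e^{-\lambda \Delta t}
$$

where $w$ is an exponential decay function, $t_i$ is the timestamp of event $i$, and $v_i$ is the event value.

The half-life $T_{1/2}$ (time for weight to decay by 50%) controls the decay rate:

$$
\lambda = \frac{\ln 2}{T_{1/2}}
$$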
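As a quick numeric check of the half-life relation, take $T_{1/2} = 300$ s (an illustrative choice). An event's weight then halves every 5 minutes of age.

```python
import math

half_life_s = 300.0                     # T_1/2: 5 minutes
decay_rate = math.log(2) / half_life_s  # lambda = ln 2 / T_1/2

ages_s = [0, 300, 600, 900]             # t - t_i for four events
weights = [math.exp(-decay_rate * age) for age in ages_s]
for age, w in zip(ages_s, weights):
    print(f"age={age}s weight={w:.3f}")
# age=0s weight=1.000
# age=300s weight=0.500
# age=600s weight=0.250
# age=900s weight=0.125
```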
```python
# recency_weighted.py - recency-weighted feature computation
import math
import time
from collections import deque
from typing import Deque, Dict, List, Optional, Sequence, Tuple


class RecencyWeightedCounter:
    """
    Computes recency-weighted event counts.
    Events decay exponentially with configurable half-life.
    """

    def __init__(self, half_life_seconds: float = 300.0):
        """
        half_life_seconds: time after which an event's weight halves.
        300s = 5 minutes, 3600s = 1 hour
        """
        self.half_life = half_life_seconds
        self.decay_rate = math.log(2) / half_life_seconds
        # (timestamp, value) pairs, oldest first
        self._events: Deque[Tuple[float, float]] = deque()
        # After ~10 half-lives an event's weight is 2**-10 (~0.1%), so drop it
        self._max_history_seconds = half_life_seconds * 10

    def record(self, value: float = 1.0, timestamp: Optional[float] = None) -> None:
        """Record an event with optional explicit (event-time) timestamp."""
        # `is None`, not `or`: a legitimate timestamp of 0.0 must not fall back to now
        ts = time.time() if timestamp is None else timestamp
        self._events.append((ts, value))
        self._evict_old(ts)

    def compute(self, reference_time: Optional[float] = None) -> float:
        """
        Compute recency-weighted sum at reference_time.
        reference_time defaults to now.
        Critical for correctness: use event_time, not processing_time.
        In training: reference_time = label timestamp (not current time)
        In serving: reference_time = request timestamp
        """
        t_ref = time.time() if reference_time is None else reference_time
        self._evict_old(t_ref)
        total = 0.0
        for ts, value in self._events:
            delta = t_ref - ts
            if delta > 0:  # strict: only events before t_ref (event_time < prediction_time)
                total += value * math.exp(-self.decay_rate * delta)
        return total

    def _evict_old(self, current_ts: float) -> None:
        """Remove events beyond max history window (assumes time-ordered input)."""
        cutoff = current_ts - self._max_history_seconds
        while self._events and self._events[0][0] < cutoff:
            self._events.popleft()


def compute_recency_weighted_features(
    events: List[Tuple[float, str, float]],  # (timestamp, event_type, value)
    prediction_time: float,
    half_lives: Sequence[float] = (60, 300, 1800, 3600),  # 1m, 5m, 30m, 1h
) -> Dict[str, float]:
    """
    Compute recency-weighted features for all specified half-lives.
    prediction_time is the exact moment we are making a prediction.
    Only events BEFORE prediction_time are used.
    """
    # Strict filter (exclude events at or after prediction_time), sorted so that
    # eviction, which pops from the left, sees events in time order
    past_events = sorted(
        (e for e in events if e[0] < prediction_time),
        key=lambda e: e[0],
    )
    features = {}
    for half_life in half_lives:
        counter = RecencyWeightedCounter(half_life_seconds=half_life)
        for ts, _event_type, value in past_events:
            counter.record(value, timestamp=ts)
        score = counter.compute(reference_time=prediction_time)
        hl_name = f"{int(half_life)}s" if half_life < 3600 else f"{int(half_life / 3600)}h"
        features[f"recency_score_hl{hl_name}"] = score
    return features
```
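`RecencyWeightedCounter` stores every event and re-sums them on each call. A common alternative needs only two numbers of state: the current score and the time it was last updated. On each new event, decay the stored score to the new timestamp and then add the event's value. The sketch below assumes events arrive in non-decreasing timestamp order. `DecayedCounter` is a hypothetical name, not part of the code above. To respect the strict `event_time < prediction_time` rule, record only events that occur before the reference time you query.

```python
import math
from typing import Optional


class DecayedCounter:
    """Exponentially decayed sum with O(1) state: (score, last_update_time)."""

    def __init__(self, half_life_seconds: float):
        self.decay_rate = math.log(2) / half_life_seconds
        self.score = 0.0
        self.last_ts: Optional[float] = None

    def _decay_to(self, ts: float) -> float:
        """Score as seen at ts, without mutating state."""
        if self.last_ts is None:
            return 0.0
        if ts < self.last_ts:
            raise ValueError("timestamps must be non-decreasing")
        return self.score * math.exp(-self.decay_rate * (ts - self.last_ts))

    def record(self, ts: float, value: float = 1.0) -> None:
        # Decay the old score to ts, then add the new event at full weight
        self.score = self._decay_to(ts) + value
        self.last_ts = ts

    def compute(self, reference_time: float) -> float:
        return self._decay_to(reference_time)


counter = DecayedCounter(half_life_seconds=300)
for ts in (0.0, 300.0, 600.0):
    counter.record(ts)
score = counter.compute(reference_time=900.0)
# Equals 0.125 + 0.25 + 0.5 = 0.875, the same as summing exp(-lambda * age)
```

The result matches the full re-sum exactly, because multiplying by $e^{-\lambda \Delta t}$ distributes over the sum. The trade-off is that events cannot be inserted out of order after the fact.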
Session-Based Features
A "session" is a period of continuous user activity, typically defined as: a new session starts after N minutes of inactivity. Session features capture behavior within the current engagement window.
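The inactivity rule can be sketched as a single pass over sorted timestamps. The sketch assumes a 30-minute timeout, and `split_sessions` is a hypothetical helper. A gap larger than the timeout closes the current session and opens a new one. This is the same `> timeout` expiry rule that the session engine applies.

```python
from typing import List


def split_sessions(timestamps: List[float], timeout_s: float = 1800.0) -> List[List[float]]:
    """Group event timestamps into sessions separated by > timeout_s of inactivity."""
    sessions: List[List[float]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= timeout_s:
            sessions[-1].append(ts)  # gap within timeout: same session
        else:
            sessions.append([ts])    # first event, or gap > timeout: new session
    return sessions


print(split_sessions([0, 600, 1200, 4000, 4100, 9000]))
# [[0, 600, 1200], [4000, 4100], [9000]]
```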
```python
# session_features.py - real-time session-based feature computation
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set

# Cap on stored view timestamps, applied both in memory and in Redis
# so cached and reloaded sessions produce the same features.
MAX_VIEW_TIMESTAMPS = 50


@dataclass
class Session:
    session_id: str
    user_id: str
    start_time: float
    last_event_time: float
    timeout_seconds: float = 1800.0  # 30 minutes
    # Session-level counters
    view_count: int = 0
    cart_add_count: int = 0
    purchase_count: int = 0
    unique_items_viewed: Set[str] = field(default_factory=set)
    unique_categories_viewed: Set[str] = field(default_factory=set)
    view_timestamps: List[float] = field(default_factory=list)

    def is_active_at(self, ts: float) -> bool:
        """Session is active if the last event is within the timeout of ts.
        Uses event/request time, not wall-clock time, so replays behave like serving."""
        return (ts - self.last_event_time) <= self.timeout_seconds

    @property
    def duration_seconds(self) -> float:
        return self.last_event_time - self.start_time

    @property
    def view_rate_per_minute(self) -> float:
        duration_min = max(self.duration_seconds / 60, 0.1)  # floor avoids dividing by ~0
        return self.view_count / duration_min

    @property
    def inter_view_interval_seconds(self) -> Optional[float]:
        """Average time between consecutive views."""
        if len(self.view_timestamps) < 2:
            return None
        intervals = [b - a for a, b in zip(self.view_timestamps, self.view_timestamps[1:])]
        return sum(intervals) / len(intervals)

    def apply_event(self, event_type: str, item_id: str, category: str, ts: float) -> None:
        """Update session state with a new event."""
        self.last_event_time = ts
        if event_type == "item_view":
            self.view_count += 1
            self.unique_items_viewed.add(item_id)
            self.unique_categories_viewed.add(category)
            self.view_timestamps.append(ts)
            del self.view_timestamps[:-MAX_VIEW_TIMESTAMPS]  # keep only the most recent views
        elif event_type == "cart_add":
            self.cart_add_count += 1
        elif event_type == "purchase":
            self.purchase_count += 1

    def to_features(self, prediction_time: float) -> Dict[str, float]:
        """Convert session state to ML features."""
        return {
            "session_duration_s": prediction_time - self.start_time,
            "session_view_count": self.view_count,
            "session_cart_add_count": self.cart_add_count,
            "session_purchase_count": self.purchase_count,
            "session_unique_items": len(self.unique_items_viewed),
            "session_unique_categories": len(self.unique_categories_viewed),
            "session_view_rate_per_min": self.view_rate_per_minute,
            "session_inter_view_s": self.inter_view_interval_seconds or 0,
            "session_cart_add_rate": self.cart_add_count / max(self.view_count, 1),
            "time_since_session_start_s": prediction_time - self.start_time,
            "time_since_last_event_s": prediction_time - self.last_event_time,
        }


class SessionFeatureEngine:
    """
    Manages per-user sessions and computes session features in real-time.
    Stores session state in Redis for multi-server consistency
    (expects an asyncio Redis client, e.g. redis.asyncio.Redis).
    """

    def __init__(self, redis_client, session_timeout_s: float = 1800):
        self.redis = redis_client
        self.timeout = session_timeout_s
        # Hot-user cache. It goes stale if another server updates the same user
        # in Redis, so route each user to one server or drop this cache.
        self._local_cache: Dict[str, Session] = {}

    async def get_features(
        self, user_id: str, prediction_time: Optional[float] = None
    ) -> Dict[str, float]:
        """Get current session features for prediction."""
        ts = time.time() if prediction_time is None else prediction_time
        session = await self._get_or_create_session(user_id, ts)
        return session.to_features(ts)

    async def record_event(
        self, user_id: str, event_type: str,
        item_id: str, category: str, event_time: float,
    ) -> None:
        """Update session state with new event."""
        session = await self._get_or_create_session(user_id, event_time)
        session.apply_event(event_type, item_id, category, event_time)
        await self._save_session(user_id, session)

    def _new_session(self, user_id: str, start_time: float) -> Session:
        return Session(
            session_id=str(uuid.uuid4()),
            user_id=user_id,
            start_time=start_time,
            last_event_time=start_time,
            timeout_seconds=self.timeout,
        )

    async def _get_or_create_session(self, user_id: str, current_time: float) -> Session:
        """Load or create session, with timeout-based expiry (in event time)."""
        cached = self._local_cache.get(user_id)
        if cached is not None and cached.is_active_at(current_time):
            return cached

        session = self._new_session(user_id, current_time)
        raw = await self.redis.get(f"session:{user_id}")
        if raw:
            data = json.loads(raw)
            stored = Session(
                session_id=data["session_id"],
                user_id=user_id,
                start_time=data["start_time"],
                last_event_time=data["last_event_time"],
                timeout_seconds=self.timeout,
                view_count=data["view_count"],
                cart_add_count=data["cart_add_count"],
                purchase_count=data["purchase_count"],
                unique_items_viewed=set(data["unique_items_viewed"]),
                unique_categories_viewed=set(data["unique_categories_viewed"]),
                view_timestamps=data["view_timestamps"],
            )
            # Reuse the stored session only if it has not expired at current_time
            if stored.is_active_at(current_time):
                session = stored
        self._local_cache[user_id] = session
        return session

    async def _save_session(self, user_id: str, session: Session) -> None:
        data = {
            "session_id": session.session_id,
            "start_time": session.start_time,
            "last_event_time": session.last_event_time,
            "view_count": session.view_count,
            "cart_add_count": session.cart_add_count,
            "purchase_count": session.purchase_count,
            "unique_items_viewed": list(session.unique_items_viewed),
            "unique_categories_viewed": list(session.unique_categories_viewed),
            "view_timestamps": session.view_timestamps[-MAX_VIEW_TIMESTAMPS:],
        }
        await self.redis.setex(
            f"session:{user_id}",
            int(self.timeout * 2),  # TTL = 2× timeout
            json.dumps(data),
        )
```
Point-in-Time Joins
The most technically complex aspect of temporal feature engineering is the point-in-time join: computing feature values as they existed at the exact moment of each training label.
```python
# point_in_time_join.py - leak-free feature lookup for training
from typing import Any, Optional

import pandas as pd


def point_in_time_join(
    labels: pd.DataFrame,           # columns: user_id, label_time, label
    feature_history: pd.DataFrame,  # columns: user_id, feature_time, feature_value
    entity_col: str = "user_id",
    label_time_col: str = "label_time",
    feature_time_col: str = "feature_time",
    feature_col: str = "feature_value",
    max_lookback_seconds: float = 86400 * 30,  # 30-day max feature age
) -> pd.DataFrame:
    """
    For each label, find the most recent feature value
    that was available at label time (no future leakage).
    This is the critical operation for leak-free training data generation.
    Times are numeric epoch seconds. The row-by-row loop is
    O(labels x features) and shows the logic; use an as-of join at scale.
    """
    results = []
    for _, label_row in labels.iterrows():
        entity = label_row[entity_col]
        t_label = label_row[label_time_col]
        # All feature values for this entity
        entity_features = feature_history[feature_history[entity_col] == entity]
        # Filter: only features available at label time.
        # Critical: use strict less-than (<), not less-than-or-equal (<=),
        # to avoid using features computed from the label event itself.
        valid_features = entity_features[
            (entity_features[feature_time_col] < t_label)
            & (entity_features[feature_time_col] >= t_label - max_lookback_seconds)
        ]
        # No feature inside the lookback window: value and age stay None
        feature_value: Optional[Any] = None
        feature_age: Optional[float] = None
        if not valid_features.empty:
            # Take the most recent feature before label time
            most_recent = valid_features.nlargest(1, feature_time_col).iloc[0]
            feature_value = most_recent[feature_col]
            feature_age = t_label - most_recent[feature_time_col]
        results.append({
            **label_row.to_dict(),
            feature_col: feature_value,
            "feature_age_seconds": feature_age,
        })
    return pd.DataFrame(results)


# For large-scale production use, Feast exposes this as:
#   FeatureStore(repo_path=...).get_historical_features(entity_df=labels, features=[...]).to_df()
# which performs a point-in-time (as-of) join in the offline store instead of a per-row loop.
```
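pandas has a vectorized as-of join, `pd.merge_asof`, that replaces the per-row loop. The toy frames below are illustrative and use datetime columns instead of epoch seconds. `allow_exact_matches=False` reproduces the strict `<` rule, and `tolerance` plays the role of `max_lookback_seconds`.

```python
import pandas as pd

labels = pd.DataFrame({
    "user_id": ["u1", "u1", "u2"],
    "label_time": pd.to_datetime(["2024-03-01 10:00", "2024-03-01 12:00", "2024-03-01 11:00"]),
    "label": [0, 1, 0],
})
feature_history = pd.DataFrame({
    "user_id": ["u1", "u1", "u2", "u2"],
    "feature_time": pd.to_datetime([
        "2024-03-01 09:00", "2024-03-01 12:00", "2024-03-01 11:00", "2024-01-15 08:00",
    ]),
    "feature_value": [0.10, 0.90, 0.50, 0.20],
})

# merge_asof requires both frames to be sorted by their time keys
joined = pd.merge_asof(
    labels.sort_values("label_time"),
    feature_history.sort_values("feature_time"),
    left_on="label_time",
    right_on="feature_time",
    by="user_id",                     # match within each entity
    direction="backward",             # look only at feature rows at or before label_time...
    allow_exact_matches=False,        # ...and drop equal timestamps: strict feature_time < label_time
    tolerance=pd.Timedelta(days=30),  # max feature age
)
joined["feature_age"] = joined["label_time"] - joined["feature_time"]
```

u1's 12:00 label ignores the feature written at exactly 12:00 and falls back to the 09:00 value. u2's 11:00 label gets NaN: its same-time feature is excluded, and its only older feature is more than 30 days old.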
Temporal Leakage Prevention Framework
A systematic checklist for every feature:
```python
# leakage_checker.py - heuristic temporal leakage screening
from typing import List, Tuple

import pandas as pd
from sklearn.feature_selection import mutual_info_classif


def check_temporal_leakage(
    training_df: pd.DataFrame,
    label_col: str,
    feature_cols: List[str],
    high_mi: float = 0.5,
    moderate_mi: float = 0.3,
    high_corr: float = 0.8,
) -> List[Tuple[str, float, str]]:
    """
    Flag features whose dependence on the label is suspiciously strong.
    Features that peek at post-prediction data tend to look "too good",
    so very high mutual information or correlation is a signal to re-check
    the feature's time window. This is a heuristic screen, not proof of leakage.
    Returns list of (feature_name, suspicion_score, warning_message),
    sorted by score, highest first.
    """
    present = [f for f in feature_cols if f in training_df.columns]
    label_values = training_df[label_col].values
    warnings = []
    for feature in present:
        feature_values = training_df[feature].fillna(0).values
        # Mutual information between feature and label
        mi = mutual_info_classif(
            feature_values.reshape(-1, 1),
            label_values,
            discrete_features=False,
            random_state=0,  # the MI estimator is randomized; fix the seed for repeatable reports
        )[0]
        if mi > high_mi:
            warnings.append((
                feature, mi,
                f"HIGH MI ({mi:.3f}) - possible leakage, verify feature timing",
            ))
        elif mi > moderate_mi:
            warnings.append((
                feature, mi,
                f"MODERATE MI ({mi:.3f}) - review feature computation window",
            ))
    # Also check for near-perfect linear correlation (obvious leakage)
    corr = training_df[present].corrwith(training_df[label_col]).abs()
    for feature, correlation in corr.items():
        if correlation > high_corr:
            warnings.append((
                feature, correlation,
                f"VERY HIGH CORRELATION ({correlation:.3f}) - likely leakage",
            ))
    return sorted(warnings, key=lambda x: x[1], reverse=True)


# Rules for temporal leakage prevention:
LEAKAGE_PREVENTION_RULES = """
Temporal Feature Engineering Rules (NO EXCEPTIONS):
1. POINT-IN-TIME CORRECTNESS
   Every aggregation window must be computed using only events
   with timestamp STRICTLY BEFORE the prediction timestamp.
   Use: events WHERE event_time < prediction_time
   NOT: events WHERE event_time <= prediction_time (may include concurrent events)
2. NO TARGET STATISTICS
   Never use aggregate statistics that include the target event.
   Bad: avg_purchase_rate_including_current_purchase
   Good: avg_purchase_rate_excluding_current_session
3. FEATURE-LABEL TIMESTAMP ORDERING
   Ensure feature_computation_time < label_time for all training examples.
   Validate using: assert (training_df['feature_time'] < training_df['label_time']).all()
4. IDENTICAL COMPUTATION IN TRAINING AND SERVING
   The exact same code/logic must compute features in both contexts.
   Test: feed identical events to both training pipeline and serving pipeline,
   verify identical feature values.
5. CONSISTENT WINDOW DEFINITIONS
   A "last 5 minutes" window in training must use the same definition in serving.
   Training: events in [label_time - 300s, label_time)
   Serving: events in [now - 300s, now)
   They must use the same time unit, same boundary treatment.
"""
```
Clock Skew in Distributed Systems
Real-time ML systems running across multiple servers face clock skew: server A's clock may be 50-200ms ahead of server B's. For temporal features computed in distributed systems, this creates subtle inconsistencies:
```python
# clock_skew.py - handling clock skew in distributed temporal features
import logging
import time

import ntplib  # third-party package: pip install ntplib

logger = logging.getLogger(__name__)


class ClockSyncManager:
    """
    Manages clock synchronization for temporal feature consistency.
    Critical when event timestamps from different servers are compared.
    """

    def __init__(self, ntp_server: str = "pool.ntp.org"):
        self.ntp_server = ntp_server
        self._clock_offset_ms: float = 0.0
        self._last_sync: float = 0.0

    def sync_ntp(self) -> float:
        """Sync with NTP server and return clock offset in ms (last known offset on failure)."""
        try:
            response = ntplib.NTPClient().request(self.ntp_server, version=3)
            self._clock_offset_ms = response.offset * 1000  # ntplib reports offset in seconds
            self._last_sync = time.time()
        except (ntplib.NTPException, OSError) as e:
            logger.warning("NTP sync failed: %s", e)
        return self._clock_offset_ms

    def corrected_time(self) -> float:
        """Return NTP-corrected current time."""
        return time.time() + (self._clock_offset_ms / 1000)

    def safe_event_window(self, window_seconds: float, skew_buffer_ms: float = 200) -> float:
        """
        Return a window slightly larger than requested to account for clock skew.
        An event that "should" be in a 5-minute window from a server with 200ms skew
        might arrive labeled as 200ms outside the window.
        Apply the same buffer in training and serving, or it becomes a source of skew itself.
        """
        return window_seconds + (skew_buffer_ms / 1000)


# Best practice: use event timestamps from a single authoritative clock source.
# All services should use NTP-synchronized timestamps.
# Accept events within a clock-skew buffer of the window boundary.
# For Flink (PyFlink): WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_millis(200))
# (Java API: WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(200)))
```
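To make bounded out-of-orderness concrete, here is a plain-Python simulation. It is not the Flink API and simplifies Flink's exact semantics. Its assumptions are:

- The watermark trails the largest event time seen so far by a fixed bound.
- A tumbling window `[start, start + window)` fires once the watermark reaches its end.
- An event whose window has already fired is dropped as late.

```python
from typing import Dict, List, Tuple


def tumbling_counts_with_watermark(
    events: List[int], window_ms: int, out_of_orderness_ms: int
) -> Tuple[Dict[int, int], List[int]]:
    """Simplified model: count events per tumbling window, firing a window once
    the watermark reaches its end. Returns (counts per window start, late events)."""
    open_windows: Dict[int, int] = {}
    fired: Dict[int, int] = {}
    late: List[int] = []
    watermark = float("-inf")
    for ts in events:  # arrival order; values are event times in ms
        start = (ts // window_ms) * window_ms
        if start + window_ms <= watermark:
            late.append(ts)  # its window already fired
        else:
            open_windows[start] = open_windows.get(start, 0) + 1
        # Watermark = max event time seen minus the allowed out-of-orderness
        watermark = max(watermark, ts - out_of_orderness_ms)
        for s in sorted(open_windows):
            if s + window_ms <= watermark:
                fired[s] = open_windows.pop(s)
    fired.update(open_windows)  # end of stream: flush windows that never fired
    return fired, late


# 1-second windows with a 200 ms bound (arrival order differs from event order)
arrivals = [100, 500, 1100, 950, 1300, 2250, 1150, 1900]
counts, late = tumbling_counts_with_watermark(arrivals, window_ms=1000, out_of_orderness_ms=200)
```

The event at 950 ms arrives after the one at 1100 ms but only 150 ms behind it. It still lands in window 0. The events at 1150 ms and 1900 ms arrive after the watermark has passed 2000 ms, so they are dropped as late. A larger bound accepts more stragglers at the cost of emitting windows later.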
Building 200+ Temporal Features Without Leakage
The full feature set for user behavior prediction:
```python
# temporal_feature_set.py - comprehensive temporal feature computation
import math
from typing import Any, Dict, List


def compute_all_temporal_features(
    user_id: str,
    events: List[Dict[str, Any]],  # each has event_time, type; optional item_id, category, price
    prediction_time: float,        # the moment we are predicting for
    session_timeout_s: float = 1800,
) -> Dict[str, Any]:
    """
    Compute a representative slice of the 200+ temporal features from event history.
    ONLY uses events strictly before prediction_time.
    """
    # Filter to historical events only, oldest first
    history = sorted(
        (e for e in events if e["event_time"] < prediction_time),
        key=lambda e: e["event_time"],
    )
    features: Dict[str, Any] = {}
    now = prediction_time

    # =========================================
    # 1. Account-level features (point-in-time)
    # =========================================
    # Account age is approximated by the first event in the history.
    first_seen = history[0]["event_time"] if history else now
    features["account_age_days"] = (now - first_seen) / 86400
    features["total_events"] = len(history)

    # =========================================
    # 2. Sliding window counts: events in [now - window, now)
    # =========================================
    for window_s, window_name in [(60, "1m"), (300, "5m"), (1800, "30m"),
                                  (3600, "1h"), (86400, "1d"), (604800, "7d")]:
        cutoff = now - window_s
        window_events = [e for e in history if e["event_time"] >= cutoff]
        features[f"event_count_{window_name}"] = len(window_events)
        for etype in ("view", "purchase", "cart_add"):
            features[f"{etype}_count_{window_name}"] = sum(
                1 for e in window_events if e["type"] == etype
            )
        features[f"unique_items_{window_name}"] = len(
            {e["item_id"] for e in window_events if e.get("item_id")}
        )
        features[f"unique_categories_{window_name}"] = len(
            {e["category"] for e in window_events if e.get("category")}
        )
        features[f"spend_{window_name}"] = sum(
            e.get("price", 0) for e in window_events if e["type"] == "purchase"
        )

    # =========================================
    # 3. Recency-weighted features
    # =========================================
    for half_life_s, hl_name in [(60, "1m"), (300, "5m"), (1800, "30m")]:
        decay_rate = math.log(2) / half_life_s
        features[f"recency_views_hl{hl_name}"] = sum(
            math.exp(-decay_rate * (now - e["event_time"]))
            for e in history if e["type"] == "view"
        )

    # =========================================
    # 4. Session features
    # =========================================
    # Walk backwards from now; a gap longer than the timeout ends the current session
    current_session_start = now
    for e in reversed(history):
        if current_session_start - e["event_time"] > session_timeout_s:
            break
        current_session_start = e["event_time"]
    session_events = [e for e in history if e["event_time"] >= current_session_start]
    features["session_view_count"] = sum(1 for e in session_events if e["type"] == "view")
    features["session_duration_s"] = now - current_session_start
    features["session_purchase_count"] = sum(1 for e in session_events if e["type"] == "purchase")
    # Cart adds per view: the same definition Session.to_features uses in serving
    features["session_cart_add_rate"] = (
        sum(1 for e in session_events if e["type"] == "cart_add")
        / max(features["session_view_count"], 1)
    )

    # =========================================
    # 5. Trajectory (rate of change)
    # =========================================
    # View rate: first half vs second half of the 1h window (views per minute)
    hour_views = [e for e in history if e["type"] == "view" and e["event_time"] >= now - 3600]
    first_rate = sum(1 for e in hour_views if e["event_time"] < now - 1800) / 30
    second_rate = sum(1 for e in hour_views if e["event_time"] >= now - 1800) / 30
    features["view_rate_trend"] = second_rate - first_rate  # positive = accelerating

    # =========================================
    # 6. Inter-event timing
    # =========================================
    if len(history) >= 2:
        recent_events = history[-20:]  # last 20 events
        intervals = [
            b["event_time"] - a["event_time"]
            for a, b in zip(recent_events, recent_events[1:])
        ]
        features["mean_inter_event_s"] = sum(intervals) / len(intervals)
        features["min_inter_event_s"] = min(intervals)
    else:
        # Too few events for an interval: emit missing values, not 0 (which implies bursts)
        features["mean_inter_event_s"] = None
        features["min_inter_event_s"] = None
    # Missing when there is no history; 0 would wrongly suggest a very recent event
    features["time_since_last_event_s"] = now - history[-1]["event_time"] if history else None

    return features
```
Common Mistakes
:::danger Future Data in Aggregation Windows
The most common leakage bug: computing "average purchases per user in the category during the week" where the week includes days after the prediction date. Always validate: assert event_time < prediction_time for every event used in a feature. In SQL: WHERE event_time < :prediction_time. In Flink: use event-time semantics with watermarks. This check should be automated in your feature validation pipeline.
:::
:::danger Training/Serving Skew in Window Boundary Treatment
Subtle but common: training computes "last 5 minutes" as event_time >= label_time - 300, but serving computes it as event_time >= now - 300. These should be equivalent, but if there is any clock skew or the feature computation is delayed in serving (feature is computed 2 seconds after the request), the windows are different. Standardize on UTC timestamps, use the request timestamp (not current time) as the reference point in serving, and test that identical event sequences produce identical features in both pipelines.
:::
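Below is a minimal sketch of the parity test for a single "views in the last 5 minutes" feature. It assumes two hypothetical code paths: a batch function used to build training data and an incremental counter used in serving. It replays one event sequence through both, using the shared `[t - 300s, t)` window and the request timestamp `t` as the reference point, and asserts identical values at every prediction time.

```python
from collections import deque
from typing import Deque, List

WINDOW_S = 300.0  # shared window definition: events in [t - WINDOW_S, t)


def batch_view_count(event_times: List[float], prediction_time: float) -> int:
    """Training path: filter the full history for each label time."""
    return sum(1 for ts in event_times if prediction_time - WINDOW_S <= ts < prediction_time)


class StreamingViewCount:
    """Serving path: keep a deque of recent event times (events arrive in time order)."""

    def __init__(self) -> None:
        self._times: Deque[float] = deque()

    def record(self, ts: float) -> None:
        self._times.append(ts)

    def compute(self, prediction_time: float) -> int:
        # Drop events older than the window start
        while self._times and self._times[0] < prediction_time - WINDOW_S:
            self._times.popleft()
        # Strict upper bound, matching the batch path
        return sum(1 for ts in self._times if ts < prediction_time)


def check_parity(event_times: List[float], prediction_times: List[float]) -> List[int]:
    stream = StreamingViewCount()
    pending = sorted(event_times)
    i = 0
    results = []
    for t in sorted(prediction_times):
        # Feed only events strictly before t, i.e. what serving would have seen
        while i < len(pending) and pending[i] < t:
            stream.record(pending[i])
            i += 1
        online, offline = stream.compute(t), batch_view_count(event_times, t)
        assert online == offline, f"skew at t={t}: online={online}, offline={offline}"
        results.append(online)
    return results


counts = check_parity([10, 200, 300, 305, 700], [300, 306, 650, 1000])
```

If either path switched to `<=` at the upper bound, the check at t=300 would fail. That is exactly the kind of boundary drift this warning describes.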
:::warning Session Boundary Inconsistency Between Training and Serving
Training computes sessions by processing events sequentially and applying a 30-minute inactivity timeout. Serving loads session state from Redis, which may have persisted a session start time from a different server. If the server's clock is ahead, the session might be shorter in serving than in training. Test with synthetic event sequences: feed the same events to both the training pipeline and serving pipeline, verify that session boundaries and feature values match exactly.
:::
Interview Q&A
Q1: What is temporal leakage in ML feature engineering, and how do you prevent it?
A: Temporal leakage occurs when features computed for training include information that would not be available at prediction time - typically future data or data from after the label event. It causes models to appear highly accurate offline while performing much worse in production. Prevention has three levels: (1) Correct window boundaries: every aggregation uses events strictly before the prediction timestamp (event_time < prediction_time). (2) Point-in-time joins: when joining features to labels, use the most recent feature value before the label time, not the current feature value. (3) Code consistency: the exact same feature computation logic must run in training and serving. Different SQL in training and Python in serving, even if logically equivalent, creates opportunities for subtle discrepancies. Automate validation: for a sample of production requests, compare features computed by the offline pipeline with features computed by the serving system using identical inputs. Any discrepancy is a bug.
Q2: What is a point-in-time join and when is it used?
A: A point-in-time join (also called "as-of join" or "temporal join") looks up the most recent value of a feature that was available at each label's timestamp, without using any future information. For example: training labels have a label_time column. User features (like "average purchase rate") are stored with a feature_computation_time. A naive join would give each label the current feature value - which may include purchases that happened after label_time. A point-in-time join gives each label the feature value that was most recently computed before label_time. This is implemented as a sorted merge join: for each label sorted by time, find the latest feature update before that time. Feature stores like Feast, Tecton, and Hopsworks implement point-in-time joins as a built-in operation precisely because it is so commonly needed and so easy to get wrong.
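The sorted merge join can be sketched without any library. This version assumes a single entity, both inputs sorted by time, and strict `<` semantics. `asof_lookup` is a hypothetical helper name. Because the feature pointer only moves forward, the whole join is O(labels + updates).

```python
from typing import List, Optional, Tuple


def asof_lookup(
    label_times: List[float],
    feature_updates: List[Tuple[float, float]],  # (feature_time, value), sorted by time
) -> List[Optional[float]]:
    """For each label time (sorted), return the latest value with feature_time < label_time."""
    results: List[Optional[float]] = []
    j = 0
    current: Optional[float] = None
    for t in label_times:
        # Advance through every update strictly before t; the pointer never moves back
        while j < len(feature_updates) and feature_updates[j][0] < t:
            current = feature_updates[j][1]
            j += 1
        results.append(current)
    return results


values = asof_lookup([5, 10, 20], [(1, 0.1), (10, 0.5), (15, 0.7)])
# [0.1, 0.1, 0.7]: the update at t=10 is not visible to the label at t=10
```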
Q3: How do you engineer session-based features for a real-time recommendation model without training/serving skew?
A: The key is using a single, shared session state management system for both training data generation and serving. In serving: when a user event arrives, load the user's current session from Redis, update it with the event, save it back, and use the updated session state for feature computation. In training: replay historical events through the same session logic (same code, same timeout rules, same Redis-like in-memory state), producing session features at each label timestamp. The session definition (inactivity timeout, session attributes tracked) must be identical in both. Test by replaying a sequence of production events and comparing session features produced by the training pipeline vs what was recorded in the serving system - they should match within clock skew tolerances.
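Below is a minimal sketch of the training-side replay. It assumes one user's events and label times are available as timestamped lists and that the same 30-minute inactivity rule applies. `replay_session_features` is a hypothetical name, and its two features are illustrative, not the full set. It mirrors the serving engine in one respect: when the last session has expired at prediction time, it reports a fresh, empty session.

```python
from typing import Dict, List, Optional, Tuple


def replay_session_features(
    events: List[Tuple[float, str]],  # (event_time, event_type) for one user
    label_times: List[float],
    timeout_s: float = 1800.0,
) -> List[Dict[str, float]]:
    """Replay events in time order and snapshot session features at each label time."""
    ordered = sorted(events)
    out: List[Dict[str, float]] = []
    i = 0
    session_start: Optional[float] = None
    last_event: Optional[float] = None
    views = 0
    for t in sorted(label_times):
        # Apply every event strictly before the label time (same rule as serving)
        while i < len(ordered) and ordered[i][0] < t:
            ts, event_type = ordered[i]
            if last_event is None or ts - last_event > timeout_s:
                session_start, views = ts, 0  # inactivity gap: a new session begins
            if event_type == "item_view":
                views += 1
            last_event = ts
            i += 1
        if last_event is None or t - last_event > timeout_s:
            # No active session at prediction time: emit empty-session values
            out.append({"session_view_count": 0, "session_duration_s": 0.0})
        else:
            out.append({"session_view_count": views, "session_duration_s": t - session_start})
    return out


features = replay_session_features(
    events=[(0, "item_view"), (600, "item_view"), (700, "cart_add"), (5000, "item_view")],
    label_times=[650, 1000, 4000, 5100],
)
```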
Q4: Explain recency-weighted features and when they are better than simple counts.
A: Simple counts treat all events in a window equally - a view from 1 second ago and a view from 29 minutes ago contribute equally to a "views in last 30 minutes" count. Recency-weighted features use exponential decay: recent events contribute more than old events. The weight of an event at time $t_i$, evaluated at prediction time $t$, is $w = e^{-\lambda (t - t_i)}$, where $\lambda = \ln 2 / T_{1/2}$ and $T_{1/2}$ is the half-life (time for weight to halve). Recency weighting is better when: (a) you expect user interest to be time-sensitive (what a user viewed 1 minute ago is much more indicative of purchase intent than what they viewed 28 minutes ago); (b) you want smooth behavior near window boundaries (simple windows have a cliff: an event just inside the window has full weight, just outside has zero); (c) you want to capture momentum (rapidly accelerating recent activity vs slowly decelerating old activity). Use multiple half-lives (1min, 5min, 30min) as separate features - let the model learn which time horizon is most predictive.
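Here is a small numeric illustration of point (b). It assumes a 30-minute window and a 5-minute half-life. Moving one event across the window edge by two seconds changes the windowed count by a full unit but barely moves the decayed score.

```python
import math

WINDOW_S = 1800.0              # "views in last 30 minutes"
DECAY = math.log(2) / 300.0    # lambda for a 5-minute half-life


def window_count(age_s: float) -> int:
    # Event age_s seconds before prediction time; window is [t - 1800 s, t)
    return int(0 < age_s <= WINDOW_S)


def decayed_score(age_s: float) -> float:
    return math.exp(-DECAY * age_s)


inside, outside = 1799.0, 1801.0
count_jump = window_count(inside) - window_count(outside)    # a full unit: the cliff
score_jump = decayed_score(inside) - decayed_score(outside)  # on the order of 1e-4
```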
Q5: How do you handle clock skew in distributed systems when computing temporal features?
A: Clock skew (different servers having slightly different clock readings, typically 50-500ms) creates problems when events from different servers are compared or when window boundaries depend on server-local time. The mitigations: (1) Use NTP synchronization on all servers, reduce max drift to under 50ms, and add a 200ms buffer to window boundaries when consuming events. (2) Use the event's embedded timestamp rather than the consumer's wall clock for all feature computations - the event timestamp is set once by the producer and does not change as the event moves through the system. (3) For Flink stream processing, use WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_millis(200)) in PyFlink (WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(200)) in Java) to allow events up to 200ms out of order before closing windows. (4) For exact reproducibility (training/serving consistency), use logical timestamps where possible: the Kafka offset or a distributed sequence number as the ordering key, with event timestamps only for human-interpretable time windows. (5) Monitor clock drift with the node_timex_offset_seconds metric from the Prometheus Node Exporter - alert when any node's drift exceeds 100ms.
