Real-Time Aggregations

Ten Thousand Events, Five Windows, One Wrong Design

A risk scoring system needed five features: transaction count in the last 1 minute, 5 minutes, 1 hour, 24 hours, and 7 days. A natural first implementation: maintain five separate Flink windows, one per feature. Each window holds raw transaction events. The window for 7 days holds 7 days' worth of events per user.

For a user with 10,000 transactions over 7 days - not unusual for an active merchant account - the Flink state was: 10,000 events in the 7-day window + 10,000 events (duplicated) in the 24-hour window + 2,500 events in the 1-hour window + 500 events in the 5-minute window + 100 events in the 1-minute window. Total: roughly 23,100 stored events per user, for what amounts to five integers.

The correct design: maintain a single time-indexed event log per user, then query it with different window boundaries. For a user with 10,000 transactions over 7 days, state size is 10,000 events total - not 23,100. The five feature values are computed by querying this single log with five different start times.
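As a minimal sketch of the single-log design, assuming an in-memory sorted list of timestamps per user (the `UserEventLog` class and its method names are hypothetical, not part of the original system), the five features reduce to five binary searches over one log:

```python
from bisect import bisect_left, insort

# The five windows from the risk scoring example, in seconds
WINDOWS_SECONDS = {"1m": 60, "5m": 300, "1h": 3600, "24h": 86400, "7d": 604800}


class UserEventLog:
    """One time-sorted list of event timestamps for a single user."""

    def __init__(self) -> None:
        self.timestamps = []

    def add(self, ts: float) -> None:
        # Keeps the list sorted; list insertion is O(n) in the worst case
        insort(self.timestamps, ts)

    def count_since(self, start: float) -> int:
        # Number of events with timestamp >= start, found by binary search
        return len(self.timestamps) - bisect_left(self.timestamps, start)

    def features(self, now: float) -> dict:
        return {
            f"tx_count_{label}": self.count_since(now - seconds)
            for label, seconds in WINDOWS_SECONDS.items()
        }


log = UserEventLog()
now = 1_000_000.0
for age_seconds in (10, 30, 200, 3000, 50_000, 500_000):
    log.add(now - age_seconds)

features = log.features(now)
print(features)
```

Each event is stored once, and adding a sixth window costs one more dictionary entry rather than another copy of the events.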

Better still: pre-aggregate into minute-level buckets. Over 7 days a user has at most 10,080 minute buckets (7 days × 24 hours × 60 minutes), no matter how many transactions arrive. Each bucket stores a count and a sum - two numbers. Assuming 4 bytes per number, total state is 10,080 × 2 × 4 bytes ≈ 80KB per user, down from roughly 500KB for the raw event log (10,000 events at ~50 bytes each). And the window query becomes a sum over contiguous buckets - O(window_minutes), not O(events).

This lesson covers the right way to compute aggregations over time windows at scale, and the data structures that make it efficient.


Why This Exists

Batch aggregation is trivial: scan all events, group by entity and time window, count/sum/average. But batch aggregation is too slow for real-time features. A model that needs tx_count_1m at the moment a transaction arrives cannot wait for a Spark job to finish.

The alternative - scanning raw event history on every request - is too expensive at scale. For a user with 10,000 events, counting events in the last minute requires scanning the full history to find the recent ones. With millions of active users, this is impractical.

Real-time aggregation solves this by maintaining pre-aggregated state that makes window queries cheap. The state is updated incrementally as events arrive (O(1) per event) and queried efficiently (O(window_size) where window_size is measured in buckets, not events).


Window Types

Tumbling windows - non-overlapping fixed-size windows. "This hour" (00:00–00:59, 01:00–01:59, ...). Each event belongs to exactly one window. Used for hourly/daily batch-like aggregations.

Sliding windows - overlapping windows. "The last 60 minutes" - the window slides forward as time advances (in Flink, by a fixed step such as one minute). Each event belongs to multiple windows. The standard for "last N minutes" velocity features.

Session windows - event-driven. A session window opens when an event arrives and closes when no events arrive for a configurable gap (e.g., 30 minutes of inactivity). Used for user session features: "items in current session."
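The three window types differ mainly in how an event is assigned to windows. The sketch below illustrates that assignment in plain Python; the function names are hypothetical, timestamps are integer seconds, and the session function reports first and last event times rather than Flink's "last event + gap" end time.

```python
from typing import List, Tuple


def tumbling_window(ts: int, size: int) -> Tuple[int, int]:
    """The single [start, end) window of length `size` that contains ts."""
    start = ts - (ts % size)
    return start, start + size


def sliding_windows(ts: int, size: int, slide: int) -> List[Tuple[int, int]]:
    """Every [start, end) window of length `size`, starting on a multiple
    of `slide`, that contains ts. There are size / slide of them."""
    last_start = ts - (ts % slide)
    return [(s, s + size) for s in range(last_start, ts - size, -slide)]


def session_windows(timestamps: List[int], gap: int) -> List[Tuple[int, int]]:
    """Group events into sessions; a new session starts after more than
    `gap` seconds of inactivity. Returns (first_event, last_event) pairs."""
    sessions: List[Tuple[int, int]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][1] <= gap:
            sessions[-1] = (sessions[-1][0], ts)
        else:
            sessions.append((ts, ts))
    return sessions


print(tumbling_window(3700, 3600))
print(len(sliding_windows(3700, size=3600, slide=60)))
print(session_windows([0, 600, 1200, 4000, 4100], gap=1800))
```

With a 1-hour window sliding every minute, one event lands in 60 windows, which is why incremental aggregation matters far more for sliding windows than for tumbling ones.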


Exact vs. Approximate Aggregations

Not all aggregations can be computed exactly at bounded memory. Understanding when to use approximations - and which approximations - is core to real-time aggregation design.

| Aggregation | Exact Possible? | Memory | Approximate Algorithm |
| --- | --- | --- | --- |
| COUNT | Yes | O(1) | - |
| SUM | Yes | O(1) | - |
| MEAN | Yes | O(1) (count + sum) | - |
| MAX / MIN | Yes | O(1) | - |
| COUNT DISTINCT | Requires O(N) for exact | O(N) | HyperLogLog (1–2% error, 1.5KB) |
| TOP-K | Requires O(K·N) | O(K·N) | Count-Min Sketch + Heap |
| PERCENTILE | Requires O(N) for exact | O(N) | t-digest (5% error) |

For COUNT, SUM, MEAN, MAX, MIN - use exact. Memory is O(1), so there's no cost.

For COUNT DISTINCT - use HyperLogLog. Exact distinct counting requires storing every unique value seen, which grows unboundedly for high-cardinality sets (distinct merchants, distinct IPs, distinct items). HyperLogLog provides a 1–2% relative error estimate using only ~1.5KB of state, independent of the true cardinality.
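To make the fixed-memory claim concrete, here is a toy HyperLogLog in pure Python. It is a teaching sketch, not a production implementation: the class name `TinyHLL`, the SHA-1 hash, and the choice of 4,096 registers are all assumptions made for the demo, and it omits the bias corrections that real libraries apply.

```python
import hashlib
import math


class TinyHLL:
    """Toy HyperLogLog: state is a fixed array of 2**p small integers."""

    def __init__(self, p: int = 12) -> None:
        self.p = p
        self.m = 1 << p              # number of registers (4096 for p=12)
        self.registers = [0] * self.m

    def add(self, item: str) -> None:
        # 64-bit hash: the top p bits pick a register, the rest feed the rank
        h = int.from_bytes(hashlib.sha1(item.encode()).digest()[:8], "big")
        idx = h >> (64 - self.p)
        rest = h & ((1 << (64 - self.p)) - 1)
        # rank = position of the leftmost 1-bit in the remaining bits
        rank = (64 - self.p) - rest.bit_length() + 1
        self.registers[idx] = max(self.registers[idx], rank)

    def count(self) -> int:
        alpha = 0.7213 / (1 + 1.079 / self.m)
        raw = alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * self.m and zeros:
            # Small-range correction (linear counting)
            return round(self.m * math.log(self.m / zeros))
        return round(raw)


hll = TinyHLL()
for _ in range(2):                   # the second pass adds only duplicates
    for i in range(10_000):
        hll.add(f"ip-{i}")

estimate = hll.count()
print(f"true=10000 estimate={estimate} registers={len(hll.registers)}")
```

The register array never grows: adding ten thousand or ten million distinct values touches the same 4,096 slots, and duplicates leave the state unchanged.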


The Two-Level Aggregation Pattern

This is the core pattern for efficient real-time aggregation at scale. Instead of maintaining raw events in window state, maintain pre-aggregated minute-level buckets.

Level 1 (event ingestion): on each event, add it to the current minute's bucket. O(1) per event.

Level 2 (window query): to compute the "last N minutes" window, sum the most recent N minute buckets. O(N) where N is the window size in minutes.

For a 24-hour window (1440 minutes), a single query sums 1440 bucket values - a few microseconds of computation once the buckets have been fetched. Contrast with scanning raw events: 10,000 events × comparison operation = milliseconds.

import redis
import time
from typing import Dict, List, Optional
from dataclasses import dataclass, field


@dataclass
class MinuteBucket:
    """Aggregated statistics for one minute."""
    count: int = 0
    total_amount: float = 0.0
    merchant_ids: set = field(default_factory=set)  # For HLL approximation demo


class RealTimeAggregator:
    """
    Efficient real-time aggregation using minute-level buckets stored in Redis.

    Architecture:
    - Events are bucketed by minute: bucket_key = floor(timestamp / 60) * 60
    - Redis Hash per entity-minute: {count: N, sum: X.XX}
    - Window query: sum buckets from (now - window) to now
    - Expired buckets evict automatically via TTL on each bucket key

    Hash fields are read back as bytes, so the client is assumed to be
    created without decode_responses=True.
    """

    # Maximum window we support - determines TTL for buckets
    MAX_WINDOW_MINUTES = 7 * 24 * 60  # 7 days

    def __init__(self, redis_client: redis.Redis, namespace: str = "agg"):
        self.redis = redis_client
        self.ns = namespace

    def _bucket_ts(self, timestamp: float) -> int:
        """Floor timestamp to the start of its minute."""
        return int(timestamp // 60) * 60

    def _bucket_key(self, entity_id: str, bucket_ts: int) -> str:
        return f"{self.ns}:{entity_id}:{bucket_ts}"

    def ingest_event(
        self,
        entity_id: str,
        amount: float,
        timestamp: Optional[float] = None,
    ) -> None:
        """
        Ingest a transaction event. Updates the appropriate minute bucket.
        O(1) - one Redis pipeline per event.
        """
        # Explicit None check so that a timestamp of 0 is not replaced by "now"
        ts = timestamp if timestamp is not None else time.time()
        bucket_ts = self._bucket_ts(ts)
        key = self._bucket_key(entity_id, bucket_ts)

        pipe = self.redis.pipeline()
        pipe.hincrby(key, "count", 1)          # integer counter
        pipe.hincrbyfloat(key, "sum", amount)  # floating-point running sum
        # TTL = 7 days + 1 hour buffer
        pipe.expire(key, self.MAX_WINDOW_MINUTES * 60 + 3600)
        pipe.execute()

    def query_window(
        self,
        entity_id: str,
        window_minutes: int,
        now: Optional[float] = None,
    ) -> Dict[str, float]:
        """
        Compute aggregations over the last window_minutes.
        O(window_minutes) - one Redis pipeline with window_minutes HGETALL calls.

        Supports any window up to MAX_WINDOW_MINUTES without re-computing.
        """
        if window_minutes > self.MAX_WINDOW_MINUTES:
            raise ValueError("window_minutes exceeds MAX_WINDOW_MINUTES")

        now = now if now is not None else time.time()
        current_bucket = self._bucket_ts(now)

        # Generate keys for all minute buckets in the window
        bucket_timestamps = [
            current_bucket - (i * 60)
            for i in range(window_minutes)
        ]
        keys = [self._bucket_key(entity_id, bts) for bts in bucket_timestamps]

        # Fetch all buckets in a single pipeline
        pipe = self.redis.pipeline(transaction=False)
        for key in keys:
            pipe.hgetall(key)
        results = pipe.execute()

        total_count = 0
        total_amount = 0.0
        active_buckets = 0

        for raw in results:
            if raw:
                total_count += int(float(raw.get(b"count", 0)))
                total_amount += float(raw.get(b"sum", 0))
                active_buckets += 1

        return {
            "count": total_count,
            "sum": round(total_amount, 2),
            "mean": round(total_amount / max(total_count, 1), 2),
            "active_buckets": active_buckets,
            "window_minutes": window_minutes,
        }

    def query_multi_window(
        self,
        entity_id: str,
        window_minutes_list: List[int],
        now: Optional[float] = None,
    ) -> Dict[int, Dict[str, float]]:
        """
        Query multiple windows in a single Redis round trip.
        Fetches the max window's buckets once, then sub-sums for shorter windows.
        O(max_window) instead of O(sum_of_all_windows).
        """
        now = now if now is not None else time.time()
        max_window = max(window_minutes_list)
        if max_window > self.MAX_WINDOW_MINUTES:
            raise ValueError("largest window exceeds MAX_WINDOW_MINUTES")
        current_bucket = self._bucket_ts(now)

        bucket_timestamps = [
            current_bucket - (i * 60)
            for i in range(max_window)
        ]
        keys = [self._bucket_key(entity_id, bts) for bts in bucket_timestamps]

        pipe = self.redis.pipeline(transaction=False)
        for key in keys:
            pipe.hgetall(key)
        all_results = pipe.execute()

        # Parse all buckets once (index 0 is the current minute)
        bucket_data = []
        for raw in all_results:
            if raw:
                bucket_data.append((
                    int(float(raw.get(b"count", 0))),
                    float(raw.get(b"sum", 0)),
                ))
            else:
                bucket_data.append((0, 0.0))

        # Build cumulative sums for efficient sub-window queries:
        # cum_count[w] is the total over the most recent w buckets
        cum_count = [0] * (max_window + 1)
        cum_sum = [0.0] * (max_window + 1)
        for i, (cnt, amt) in enumerate(bucket_data):
            cum_count[i + 1] = cum_count[i] + cnt
            cum_sum[i + 1] = cum_sum[i] + amt

        results = {}
        for w in window_minutes_list:
            count = cum_count[w]
            total = cum_sum[w]
            results[w] = {
                "count": count,
                "sum": round(total, 2),
                "mean": round(total / max(count, 1), 2),
                "window_minutes": w,
            }

        return results
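For unit tests or local experiments without a Redis server, the same two-level pattern can be sketched with plain dictionaries. This is an illustrative stand-in under stated assumptions, not part of the original implementation: the class name `InMemoryBucketAggregator` and its methods are hypothetical, and buckets are never expired.

```python
from collections import defaultdict
from typing import Dict, List

BUCKET_SECONDS = 60


class InMemoryBucketAggregator:
    """Minute buckets held in a dict: entity_id -> bucket start -> [count, sum]."""

    def __init__(self) -> None:
        self.buckets = defaultdict(dict)

    @staticmethod
    def _bucket_ts(ts: float) -> int:
        return int(ts // BUCKET_SECONDS) * BUCKET_SECONDS

    def ingest(self, entity_id: str, amount: float, ts: float) -> None:
        # Level 1: O(1) update of the bucket the event falls into
        bucket = self.buckets[entity_id].setdefault(self._bucket_ts(ts), [0, 0.0])
        bucket[0] += 1
        bucket[1] += amount

    def query(self, entity_id: str, windows: List[int], now: float) -> Dict[int, dict]:
        # Level 2: one pass over the largest window, prefix sums for the rest
        entity = self.buckets.get(entity_id, {})
        current = self._bucket_ts(now)
        cum_count, cum_sum = [0], [0.0]
        for i in range(max(windows)):
            cnt, amt = entity.get(current - i * BUCKET_SECONDS, (0, 0.0))
            cum_count.append(cum_count[-1] + cnt)
            cum_sum.append(cum_sum[-1] + amt)
        return {
            w: {
                "count": cum_count[w],
                "sum": cum_sum[w],
                "mean": cum_sum[w] / max(cum_count[w], 1),
            }
            for w in windows
        }


agg = InMemoryBucketAggregator()
now = 600_030.0                        # 30 seconds into the current minute
agg.ingest("u001", 20.0, 600_020.0)    # current minute
agg.ingest("u001", 30.0, 599_950.0)    # previous minute
agg.ingest("u001", 50.0, 593_000.0)    # about two hours ago
features = agg.query("u001", [1, 5, 60, 1440], now)
print(features)
```

Note that a bucketed "last 1 minute" covers only the current, partially filled minute, so short windows are approximate to within one bucket.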

HyperLogLog for COUNT DISTINCT

HyperLogLog (HLL) is a probabilistic data structure for estimating the number of distinct elements in a multiset. It uses ~1.5KB of memory regardless of the number of distinct elements - you can estimate the number of distinct IP addresses in the last hour from 10 million events using 1.5KB of state.

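Redis has native HyperLogLog support via PFADD and PFCOUNT. Its implementation uses up to 12KB per key (less for small sets, which use a sparse encoding) and has a 0.81% standard error: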

import redis
import time
from typing import Optional

r = redis.Redis(decode_responses=True)


def track_distinct_merchants(
    user_id: str,
    merchant_id: str,
    window_minutes: int = 60,
    timestamp: Optional[float] = None,
) -> None:
    """
    Track distinct merchants a user transacted with using HyperLogLog.
    Each minute has its own HLL key (one per minute bucket).
    """
    ts = timestamp if timestamp is not None else time.time()
    bucket_ts = int(ts // 60) * 60
    key = f"hll:merchants:{user_id}:{bucket_ts}"

    pipe = r.pipeline()
    pipe.pfadd(key, merchant_id)
    pipe.expire(key, window_minutes * 60 + 3600)  # TTL with buffer
    pipe.execute()


def count_distinct_merchants(
    user_id: str,
    window_minutes: int = 60,
    now: Optional[float] = None,
) -> int:
    """
    Estimate distinct merchants in the last window_minutes.
    PFCOUNT over several keys returns the cardinality of their union, so
    the per-minute HLL buckets are combined without a temporary merge key
    (which could collide between concurrent queries) - O(window_minutes).
    Missing buckets are treated as empty HLLs.
    Standard error: ~0.81% (Redis HLL).
    """
    now = now if now is not None else time.time()
    current_bucket = int(now // 60) * 60

    source_keys = [
        f"hll:merchants:{user_id}:{current_bucket - i * 60}"
        for i in range(window_minutes)
    ]

    return r.pfcount(*source_keys)


# Example usage
for merchant in ["amazon", "walmart", "target", "amazon", "bestbuy", "amazon"]:
    track_distinct_merchants("u001", merchant)

distinct = count_distinct_merchants("u001", window_minutes=60)
print(f"Distinct merchants (HLL estimate): {distinct}")  # Expected: ~4

Redis HLL accuracy: 0.81% standard error - for 100 distinct merchants, expect an estimate between 99 and 101 about 68% of the time.
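The 0.81% figure follows from the standard HyperLogLog error formula, 1.04 / sqrt(m), applied to the 16,384 six-bit registers that Redis uses per key. A short check of that arithmetic and of the 99-to-101 range quoted above (variable names are illustrative):

```python
import math

REGISTERS = 16_384        # 2**14 registers per Redis HyperLogLog
BITS_PER_REGISTER = 6

std_error = 1.04 / math.sqrt(REGISTERS)
dense_bytes = REGISTERS * BITS_PER_REGISTER // 8

true_cardinality = 100
low = true_cardinality * (1 - std_error)
high = true_cardinality * (1 + std_error)

print(f"standard error: {std_error:.2%}")
print(f"dense encoding size: {dense_bytes} bytes")
print(f"one-sigma range for {true_cardinality}: {low:.2f} to {high:.2f}")
```

The same register count explains the dense size of roughly 12KB per key, which matters when one HLL is kept per entity per minute.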


Redis Sorted Set for Precise Sliding Windows

When you need exact counts (not HLL estimates) for recent events and memory is acceptable, use a Redis Sorted Set with event timestamps as scores. This enables precise O(log N) insertion and O(log N + result_count) window queries.

import redis
import time
import uuid
from typing import Optional, Tuple

r = redis.Redis(decode_responses=True)


def log_transaction(
    user_id: str,
    amount: float,
    timestamp: Optional[float] = None,
) -> None:
    """Log a transaction event to the user's sorted set. O(log N)."""
    ts = timestamp if timestamp is not None else time.time()
    # Short random prefix keeps members unique when amounts repeat
    tx_id = str(uuid.uuid4())[:8]
    member = f"{tx_id}:{amount}"

    key = f"tx_log:{user_id}"
    pipe = r.pipeline()
    pipe.zadd(key, {member: ts})
    # Keep only 7 days of events - evict older entries
    cutoff = ts - 7 * 86400
    pipe.zremrangebyscore(key, "-inf", cutoff)
    pipe.expire(key, 7 * 86400 + 3600)
    pipe.execute()


def count_transactions_in_window(
    user_id: str,
    window_seconds: int,
    now: Optional[float] = None,
) -> Tuple[int, float]:
    """
    Count transactions and sum amounts in the last window_seconds. O(log N + k).
    Returns (count, total_amount).
    """
    now = now if now is not None else time.time()
    cutoff = now - window_seconds
    key = f"tx_log:{user_id}"

    # Get all events in [now - window, now]
    members = r.zrangebyscore(key, cutoff, now)

    count = len(members)
    # Members are "tx_id:amount"; the amount is the part after the colon
    total = sum(float(m.split(":")[1]) for m in members)
    return count, round(total, 2)


# Demonstrate multi-window query
def get_all_velocity_features(user_id: str) -> dict:
    now = time.time()
    windows = {
        "1m": 60,
        "5m": 300,
        "1h": 3600,
        "24h": 86400,
        "7d": 604800,
    }

    features = {}
    for label, seconds in windows.items():
        count, total = count_transactions_in_window(user_id, seconds, now)
        features[f"tx_count_{label}"] = count
        features[f"tx_sum_{label}"] = total
    return features

The sorted set approach stores actual events (not buckets), which enables exact counts and makes it easy to add new window sizes without schema changes. The trade-off: memory scales with event count (O(events)), not window size. For users with very high event rates (10,000+ transactions per day), prefer the bucket approach.


CUSUM for Real-Time Anomaly Detection

CUSUM (Cumulative Sum) is a sequential analysis algorithm for detecting shifts in a metric. In a risk context: detect when transaction velocity has deviated from the user's baseline by a statistically significant amount.

The algorithm:

$$S_t = \max(0,\; S_{t-1} + (x_t - \mu) - k)$$

where $x_t$ is the current observation, $\mu$ is the baseline mean, $k$ is the slack parameter (typically $k = \sigma / 2$), and $S_t$ is the cumulative sum. Raise an alert when $S_t > h$ (the threshold).
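As a worked instance of the recurrence, assume illustrative values $\mu = 2$, $k = 1$, $h = 8$, starting from $S_0 = 0$. A quiet observation $x = 2$ followed by two spike observations $x = 15$ and $x = 14$ gives:

$$
\begin{aligned}
S_1 &= \max(0,\; 0 + (2 - 2) - 1) = 0 \\
S_2 &= \max(0,\; 0 + (15 - 2) - 1) = 12 \\
S_3 &= \max(0,\; 12 + (14 - 2) - 1) = 23
\end{aligned}
$$

The slack $k$ keeps the sum pinned at zero while observations sit at the baseline, and $S_2 = 12 > h$ raises the alert on the first spike.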

from dataclasses import dataclass
from typing import Tuple


@dataclass
class CUSUMState:
    s_pos: float = 0.0  # Positive CUSUM (detects upward shifts)
    s_neg: float = 0.0  # Negative CUSUM (detects downward shifts)
    n_samples: int = 0
    alert_count: int = 0


class CUSUMDetector:
    """
    CUSUM-based anomaly detector for real-time velocity features.
    Detects sustained deviations from baseline velocity.

    Parameters:
        baseline_mean: expected transaction count per minute (user baseline)
        slack_k: slack factor (typically sigma / 2)
        threshold_h: alert threshold (typically 4-5 sigma)
    """

    def __init__(
        self,
        baseline_mean: float,
        slack_k: float,
        threshold_h: float,
    ):
        self.mu = baseline_mean
        self.k = slack_k
        self.h = threshold_h

    def update(self, state: CUSUMState, observation: float) -> Tuple[CUSUMState, bool]:
        """
        Update CUSUM state with a new observation.
        Returns (new_state, alert_triggered).
        """
        deviation = observation - self.mu

        # Upward CUSUM (detects fraud spikes)
        s_pos = max(0.0, state.s_pos + deviation - self.k)
        # Downward CUSUM (detects sudden drops - e.g., account takeover)
        s_neg = max(0.0, state.s_neg - deviation - self.k)

        alert = s_pos > self.h or s_neg > self.h

        new_state = CUSUMState(
            s_pos=s_pos,
            s_neg=s_neg,
            n_samples=state.n_samples + 1,
            alert_count=state.alert_count + (1 if alert else 0),
        )

        return new_state, alert


# Example: user with baseline 2 tx/minute suddenly makes 15/minute
detector = CUSUMDetector(
    baseline_mean=2.0,  # Normal: 2 transactions per minute
    slack_k=1.0,        # Slack = 0.5 * baseline sigma
    threshold_h=8.0,    # Alert after sustained deviation
)

state = CUSUMState()
observations = [2, 1, 3, 2, 15, 14, 18, 16, 15]  # Spike starts at index 4

for i, obs in enumerate(observations):
    state, alert = detector.update(state, obs)
    print(f"t={i}: obs={obs}, S+={state.s_pos:.1f}, alert={alert}")

Expected output:

t=0: obs=2, S+=0.0, alert=False
t=1: obs=1, S+=0.0, alert=False
t=2: obs=3, S+=0.0, alert=False
t=3: obs=2, S+=0.0, alert=False
t=4: obs=15, S+=12.0, alert=True
t=5: obs=14, S+=23.0, alert=True
t=6: obs=18, S+=38.0, alert=True
t=7: obs=16, S+=51.0, alert=True
t=8: obs=15, S+=63.0, alert=True

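CUSUM raises the alert in the first anomalous minute (t=4), because a single deviation of 13 already pushes S+ past h = 8. A smaller sustained shift would also be caught after a few minutes of accumulation. A simple threshold on the raw count needs a cutoff chosen in advance; CUSUM measures each observation against the user's own baseline.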


For sliding window features computed in a Flink pipeline rather than Redis, use SlidingEventTimeWindows:

from pyflink.datastream.window import SlidingEventTimeWindows, Time
from pyflink.datastream.functions import AggregateFunction, ProcessWindowFunction
from pyflink.common import Types


class TransactionCountAgg(AggregateFunction):
    """Incremental aggregation: O(1) state per event."""
    def create_accumulator(self): return {"count": 0, "sum": 0.0}
    def add(self, value, acc):
        acc["count"] += 1
        acc["sum"] += value.amount
        return acc
    def get_result(self, acc): return acc
    def merge(self, a, b):
        return {"count": a["count"] + b["count"], "sum": a["sum"] + b["sum"]}


class WindowResultExtractor(ProcessWindowFunction):
    """Adds window metadata to the aggregated result."""
    def process(self, key, context, elements):
        agg = next(iter(elements))  # One element from AggregateFunction
        yield {
            "user_id": key,
            "tx_count": agg["count"],
            "tx_sum": agg["sum"],
            # Window bounds are in milliseconds; convert to seconds
            "window_start": context.window().start / 1000,
            "window_end": context.window().end / 1000,
        }


# In the Flink pipeline (transaction_stream is an existing DataStream of
# events with user_id and amount attributes and assigned event-time timestamps)
velocity_stream = (
    transaction_stream
    .key_by(lambda e: e.user_id)
    .window(SlidingEventTimeWindows.of(
        Time.hours(1),    # 1-hour window
        Time.minutes(1),  # Slide: compute new feature every minute
    ))
    .aggregate(
        TransactionCountAgg(),
        window_function=WindowResultExtractor(),
    )
)

Two-Level Aggregation Architecture


Production Notes

Choose bucket granularity based on smallest window: if your smallest window is 1 minute, use 1-minute buckets. If smallest is 5 minutes, use 5-minute buckets (1/5 the Redis keys). Finer granularity enables smaller windows but increases the number of Redis keys and pipeline size for large windows.
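The granularity trade-off can be quantified with a few lines of arithmetic. The sketch below (function and variable names are illustrative) counts how many bucket reads one query needs for each window at 1-minute and 5-minute granularity:

```python
def buckets_per_query(window_minutes: int, bucket_minutes: int) -> int:
    """Bucket reads needed to cover a window (ceiling division)."""
    return -(-window_minutes // bucket_minutes)


WINDOWS = {"5m": 5, "1h": 60, "24h": 1440, "7d": 10080}

for bucket_minutes in (1, 5):
    reads = {
        label: buckets_per_query(minutes, bucket_minutes)
        for label, minutes in WINDOWS.items()
    }
    print(f"{bucket_minutes}-minute buckets: {reads}")
```

At 5-minute granularity the 7-day query reads 2,016 buckets instead of 10,080, at the cost of no longer being able to serve a 1-minute window.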

Pre-warm features for new entities: a new user has no bucket history. Serve defaults (zeros) for count/sum features. Alert when a model endpoint receives all-zero features - it may indicate a new user type not well-represented in training data.

Monitor bucket fill rates: if event ingestion falls behind, recent buckets will have fewer events than expected. This is feature staleness at the bucket level. Alert when the current-minute bucket count is more than 50% below the historical p50 for this time of day.
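The fill-rate rule above can be expressed as a small check. This is a sketch under assumptions: the function name `bucket_underfilled` is hypothetical, and `historical_counts` stands for whatever per-minute counts you have recorded for the same time of day on previous days.

```python
from statistics import median
from typing import Sequence


def bucket_underfilled(
    current_count: int,
    historical_counts: Sequence[int],
    max_drop: float = 0.5,
) -> bool:
    """True when the current bucket is more than `max_drop` below the
    historical median (p50) for this time of day."""
    if not historical_counts:
        return False  # no baseline yet, so nothing to compare against
    p50 = median(historical_counts)
    return current_count < (1 - max_drop) * p50


history = [100, 90, 110, 95, 105]        # same minute on previous days
print(bucket_underfilled(40, history))   # 40 is below half the median of 100
print(bucket_underfilled(60, history))
```

In practice the check would run against the last fully closed minute, since the current minute is still filling.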

:::danger Exact COUNT DISTINCT at Scale Is Expensive
An exact count of distinct values requires storing every unique value seen in the window. For distinct_merchants_1h across 10M users with 100 merchants each, that's 1 billion stored values, which ranges from tens of gigabytes to around 1TB of state depending on ID size and per-entry overhead. Use HyperLogLog for any count-distinct feature with cardinality above a few hundred. Redis's native PFADD/PFCOUNT provide a 0.81% standard error at no more than ~12KB per key. This is sufficient for fraud and recommendation features.
:::

:::danger Storing Raw Events in Window State
Maintaining raw event history in Flink state or Redis for window queries leads to O(events) memory - unbounded growth for high-volume entities. A merchant with 1M transactions per day maintains 1M events in state per window size. At 5 windows, that's 5M events. Use the two-level bucketing approach: O(window_minutes) state, independent of event rate.
:::


Interview Q&A

Q: Explain the two-level aggregation pattern and why it is preferable to storing raw events in window state.

The two-level pattern pre-aggregates events into fixed time buckets (e.g., one-minute intervals). At ingestion, each event increments the current minute's bucket (O(1)). At query time, computing a window sum requires summing N buckets where N is the window size in minutes. Memory is O(window_minutes), independent of event rate. Storing raw events is O(events) - for a user with 10,000 transactions per day and a 7-day window, that's 70,000 events per user. The bucket approach stores 10,080 buckets (7 × 24 × 60 minutes), each containing just a count and sum. 70,000 events at ~50 bytes each = 3.5MB vs. 10,080 buckets at ~16 bytes each = 160KB. 22x less memory per user.
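The memory comparison in this answer is easy to reproduce. The byte sizes below are the rough per-item figures quoted in the answer, not measurements:

```python
EVENTS_PER_DAY = 10_000
DAYS = 7
BYTES_PER_EVENT = 50      # rough size of one raw event
BYTES_PER_BUCKET = 16     # one count plus one sum

raw_events = EVENTS_PER_DAY * DAYS
raw_bytes = raw_events * BYTES_PER_EVENT

buckets = DAYS * 24 * 60
bucket_bytes = buckets * BYTES_PER_BUCKET

ratio = raw_bytes / bucket_bytes

print(f"raw events: {raw_events:,} -> {raw_bytes / 1e6:.1f} MB")
print(f"buckets:    {buckets:,} -> {bucket_bytes / 1e3:.0f} KB")
print(f"ratio:      {ratio:.1f}x")
```

The ratio grows linearly with event rate, because the bucket side of the comparison stays fixed at 10,080 entries.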

Q: When should you use HyperLogLog instead of exact COUNT DISTINCT in real-time features?

Use HyperLogLog when the set cardinality is large (more than a few hundred elements), when memory is constrained (HLL uses ~1.5KB regardless of cardinality in its classic form, and at most ~12KB per key in Redis), or when 1–2% relative error is acceptable. For fraud detection features like distinct_ips_24h or distinct_merchants_7d, a 1–2% error on a count of 50 (estimated as 49 or 51) has no material impact on model performance. Use exact counting only when: the cardinality is guaranteed to be small (under 100 elements), the feature is a hard threshold (exactly 1 distinct IP - exact), or downstream logic requires bit-for-bit reproducibility. Redis's PFADD/PFCOUNT implements HLL natively and is the preferred implementation for Redis-based feature pipelines.

Q: A Flink sliding window job computes tx_count_1h every minute. How does this interact with late-arriving events?

A sliding window fires when Flink's watermark passes the window end time. The watermark is typically max(event_time) minus a bounded out-of-orderness delay, which is a separate setting from the window's allowed lateness. With a 30s out-of-orderness delay, a window ending at 2:00 PM fires when Flink sees an event timestamped at 2:00:30 PM or later. Events that arrive at Flink after the window has fired but have event times within the window are "late data." By default, Flink discards them. To handle late events, configure allowedLateness(Duration.ofMinutes(2)) on the window - this keeps the window's state alive until the watermark is 2 minutes past the window end, allowing late events to trigger window recomputation and emit a corrected result. The downstream sink must handle receiving updated results for the same window.
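The interaction between watermark, firing, and allowed lateness can be traced with a small simulation. This is a simplified model for intuition, not Flink's implementation: the function name is hypothetical, times are plain seconds, and the watermark is recomputed on every event rather than periodically.

```python
from typing import List


def classify_events(
    arrivals: List[int],
    window_end: int,
    out_of_orderness: int,
    allowed_lateness: int,
) -> List[str]:
    """Label each event (given in arrival order by its event time) relative
    to one window ending at window_end."""
    max_event_time = float("-inf")
    labels = []
    for event_time in arrivals:
        # Watermark as it stood just before this event arrived
        watermark = max_event_time - out_of_orderness
        if event_time >= window_end:
            labels.append("next_window")
        elif watermark < window_end:
            labels.append("on_time")          # window has not fired yet
        elif watermark < window_end + allowed_lateness:
            labels.append("late_update")      # window re-fires with a correction
        else:
            labels.append("dropped")          # window state already cleaned up
        max_event_time = max(max_event_time, event_time)
    return labels


labels = classify_events(
    arrivals=[3590, 3610, 3595, 3631, 3598, 3760, 3599],
    window_end=3600,
    out_of_orderness=30,
    allowed_lateness=120,
)
print(labels)
```

The event at 3598 arrives after the watermark has reached 3601, so it produces a corrected result; the event at 3599 arrives after the watermark has passed 3720 and is discarded.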

Q: How would you implement the CUSUM algorithm in a Flink streaming pipeline?

Implement CUSUM as a stateful Flink KeyedProcessFunction. The state is the CUSUM accumulator (S_pos, S_neg) per user, stored in Flink's ValueState<CUSUMState>. On each transaction event, retrieve the user's CUSUM state, compute the new observation (e.g., transaction count in last minute from a side call to Redis), update the CUSUM values, store the new state, and emit an alert event if the threshold is exceeded. The CUSUM state resets when an alert fires (or can be reset manually). Key considerations: the baseline (mu) and threshold (h) should be user-specific, learned from historical data, not fixed constants. Store them in the feature store and load them at job startup or per-user.
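The per-key logic described in this answer can be sketched in plain Python, with a dictionary standing in for Flink's keyed ValueState. Everything here is illustrative: the `KeyedCusum` and `UserBaseline` names are hypothetical, only the upward sum is tracked, and the baselines are hard-coded where a real job would load them from a feature store.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict


@dataclass
class UserBaseline:
    mu: float   # baseline mean for this user
    k: float    # slack
    h: float    # alert threshold


class KeyedCusum:
    """Per-user upward CUSUM that resets its state when an alert fires."""

    def __init__(self, baselines: Dict[str, UserBaseline], default: UserBaseline):
        self.baselines = baselines
        self.default = default
        self.state = defaultdict(float)   # user_id -> S+ (stand-in for ValueState)

    def process(self, user_id: str, observation: float) -> bool:
        b = self.baselines.get(user_id, self.default)
        s = max(0.0, self.state[user_id] + (observation - b.mu) - b.k)
        alert = s > b.h
        # Reset after an alert so the next shift is detected from scratch
        self.state[user_id] = 0.0 if alert else s
        return alert


cusum = KeyedCusum(
    baselines={
        "u1": UserBaseline(mu=2.0, k=1.0, h=8.0),      # low-volume user
        "u2": UserBaseline(mu=20.0, k=5.0, h=40.0),    # high-volume merchant
    },
    default=UserBaseline(mu=2.0, k=1.0, h=8.0),
)

events = [("u1", 15), ("u2", 15), ("u1", 6), ("u1", 9)]
alerts = [cusum.process(user, obs) for user, obs in events]
print(alerts)
```

The same observation of 15 alerts for u1 but not for u2, which is the point of user-specific baselines.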

Q: How do you size a Redis cluster for a real-time aggregation system?

Memory estimate: for each active entity, you need (max_window_minutes) bucket keys × (count + sum values per bucket) × overhead. For a 7-day window with 1-minute buckets: 10,080 keys × ~100 bytes/key = ~1MB per entity. For 1M active entities: 1TB. This typically requires Redis Cluster with sharding across 10–20 nodes for cost efficiency. Optimization: pack many buckets into one Redis HASH per entity (minute timestamps as fields) instead of one key per bucket, which reduces memory overhead 3–5x via the compact encoding Redis uses for small hashes (ziplist, or listpack in Redis 7 and later). Profile with redis-cli --bigkeys, MEMORY USAGE, and DEBUG OBJECT to measure actual memory per entity. Add 30% headroom for hot-key replication overhead and peak traffic.
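The sizing estimate in this answer can be wrapped in a small helper for trying different assumptions. The function name is illustrative, and the 100 bytes per bucket is the rough figure from the answer, which should be replaced by a measured value:

```python
def estimate_redis_memory_gb(
    entities: int,
    window_minutes: int,
    bytes_per_bucket: int,
    headroom: float = 0.30,
) -> float:
    """Estimated memory in GB: entities x buckets x bytes, plus headroom."""
    raw_bytes = entities * window_minutes * bytes_per_bucket
    return raw_bytes * (1 + headroom) / 1e9


base_gb = estimate_redis_memory_gb(1_000_000, 10_080, 100, headroom=0.0)
with_headroom_gb = estimate_redis_memory_gb(1_000_000, 10_080, 100)

print(f"without headroom: {base_gb:,.0f} GB")
print(f"with 30% headroom: {with_headroom_gb:,.0f} GB")
```

Rerunning with 5-minute buckets (2,016 per entity) or a smaller measured per-bucket size shows how quickly the cluster size responds to those two choices.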

© 2026 EngineersOfAI. All rights reserved.