
Retail Data Engineering

The Hidden Infrastructure of Retail AI

Every recommendation shown to a Target customer, every dynamic price computed by Amazon's pricing engine, every inventory replenishment order triggered by Walmart's supply chain system - all of it depends on data infrastructure that most engineers never see.

The ML model is 10% of the problem. The data pipeline that ensures that model receives fresh, accurate, and complete features at the moment of inference is the other 90%. And in retail, this is extraordinarily hard.

Consider what happens in the 30 seconds after a customer at a Target store scans their loyalty card and begins checking out. Their purchase events are being recorded. Their loyalty program balance is being updated. Their purchase history is being enriched with new transaction data. A recommendation engine is deciding what coupon to print at the bottom of their receipt. A demand forecasting system is noting that Tide Pods just sold at store #4423 and updating its current-period sales accumulator. An inventory system is checking whether this sale brings store #4423 below reorder point for this SKU.

All of this happens simultaneously, at scale, across 2,000 stores, 80,000 SKUs, millions of customers. The data infrastructure making this possible is not just "a database." It is a distributed event-driven system with real-time stream processing, a feature store serving ML models with sub-100ms latency, a data lake storing petabytes of historical data for batch training, and a customer identity resolution layer that recognizes the same customer across in-store, online, and mobile channels.

This lesson covers the engineering of that infrastructure.


Why This Exists

The naive approach to retail data: extract transactions from POS systems nightly, load them into a data warehouse, run SQL reports every morning. This worked in 1995. It is inadequate for modern retail AI for three reasons.

Latency: ML-powered decisions need data from seconds ago, not yesterday. A recommendation engine needs to know that the customer just added a shirt to their cart (10 seconds ago) to suggest matching pants. A dynamic pricing system needs to know that a competitor changed their price (5 minutes ago) to respond. A fraud detection system needs to process a transaction while the customer is still at the checkout terminal.

Volume: A major retailer generates 10-50 million POS transactions per day. A large e-commerce site generates 100-500 million clickstream events per day. These volumes require distributed stream processing, not batch ETL jobs.

Complexity: Retail data is not just transactions. It is product catalog data (constantly updated as items change, launch, and discontinue), customer identity data (the same customer shops in-store, online, and via mobile app under potentially different identifiers), supplier data, logistics data, and external signals (weather, competitor prices, social trends). Joining all of these correctly, at scale, with proper handling of late-arriving data and schema evolution, is a substantial engineering challenge.


Historical Context

The history of retail data infrastructure mirrors the history of data engineering broadly.

1970s-1990s: Mainframe and early relational databases. POS systems ran on proprietary mainframe terminals. Transaction data was batch-loaded nightly into DB2 or Oracle databases. Reports were generated weekly. The concept of "real-time" was not on the table.

1990s-2000s: Data warehousing era. Teradata, Netezza, and later Oracle Exadata made large-scale analytic queries feasible. Walmart built one of the world's largest data warehouses of the era (reportedly around half a petabyte by 2004, growing to multiple petabytes within a few years). The extract-transform-load (ETL) model became standard. But the architecture was still fundamentally batch and backward-looking.

2010s: The Hadoop and Spark era. Open-source distributed computing (HDFS, MapReduce, then Spark) broke the proprietary lock of expensive data warehouse appliances. Retailers could now store raw data cheaply and process it with distributed compute. The "data lake" concept emerged: store everything in raw form, process on demand.

2015-2020: The streaming revolution. Kafka (open-sourced by LinkedIn in 2011) and Apache Flink (whose 1.0 release arrived in 2016) enabled stream processing at massive scale. Netflix, Uber, and Airbnb published their streaming architectures. Retailers began deploying streaming pipelines for real-time inventory updates, fraud detection, and recommendation systems.

2020-present: Lakehouse and feature stores. The Delta Lake, Iceberg, and Hudi table formats brought ACID transactions and schema evolution to data lakes. Feature stores (Feast, Tecton, Vertex AI Feature Store, Databricks Feature Store) formalized the pattern of pre-computing and serving ML features at low latency. The "customer data platform" (CDP) emerged as a category specifically for managing first-party customer data.


Core Concepts

Point-of-Sale Data Streams

POS transactions are the ground truth of retail. Every sale is recorded with:

  • Timestamp (to the second)
  • Store ID and terminal ID
  • Cashier ID
  • Customer loyalty ID (if scanned)
  • Item: SKU, UPC, quantity, unit price, extended price
  • Payment method (cash, credit, debit, mobile pay)
  • Promotions applied (discount code, price reduction)
  • Tender and change amounts

At a 2,000-store retailer with average 5,000 transactions per store per day, this is 10 million transactions per day. Each transaction has 5-20 line items. That is 50-200 million transaction line item records per day.
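
As a rough sketch of what one such record looks like when flattened into a single line-item row (field names here are illustrative, not any particular vendor's POS export format):

from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class POSLineItem:
    """One denormalized POS line item (transaction header fields repeated per line)."""
    transaction_id: str
    transaction_timestamp: datetime   # to the second
    store_id: str
    terminal_id: str
    cashier_id: str
    loyalty_id: Optional[str]         # None if no loyalty card was scanned
    sku_id: str
    upc: str
    quantity: int
    unit_price: float
    extended_price: float             # quantity * unit_price after line-level discounts
    payment_method: str               # cash | credit | debit | mobile_pay
    promotion_code: Optional[str]     # discount code or price reduction applied
    tender_amount: float              # amount tendered (transaction level)
    change_given: float               # change returned (transaction level)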

The data quality challenges unique to POS:

  • POS system outages: stores may buffer transactions locally during connectivity issues and send them in batches when connectivity is restored. These arrive out of order relative to their actual timestamps.
  • Price override events: cashiers can override prices for customer complaints. These create records that look like individual transactions but are corrections to prior transactions.
  • Voided transactions: a customer changes their mind. The void must cancel the associated inventory decrement.
  • Duplicate transactions: network retry logic sometimes sends the same transaction twice.
  • Test transactions: stores run test scans during training and store setup. These must be filtered.

Customer Data Platform Architecture

A Customer Data Platform (CDP) maintains a unified profile of each customer across all channels and touchpoints. For a retailer, this means:

  • In-store purchases (linked via loyalty card or phone number)
  • E-commerce orders (linked via account)
  • Mobile app activity (linked via device ID and account)
  • Email marketing engagement (linked via email address)
  • Website sessions (linked via cookie, or account if logged in)

Identity resolution is the core technical challenge: the same person shops with their loyalty card on Monday, uses a different email for an online order on Wednesday, and browses the website anonymously on Friday. Connecting these three events to the same person requires a probabilistic matching engine that links identifiers using email, phone number, name+address combinations, and device fingerprints.
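
A minimal sketch of the deterministic core of identity resolution: cluster records that share a normalized hard identifier (email or phone) using a union-find structure. Production systems layer probabilistic scoring (name+address similarity, device fingerprints) on top; all names and record IDs below are illustrative.

from collections import defaultdict

class IdentityGraph:
    """Union-find over customer records that share a hard identifier."""

    def __init__(self):
        self.parent = {}  # record_id -> parent record_id

    def _find(self, rid):
        # Path-halving find
        while self.parent.setdefault(rid, rid) != rid:
            self.parent[rid] = self.parent[self.parent[rid]]
            rid = self.parent[rid]
        return rid

    def _union(self, a, b):
        ra, rb = self._find(a), self._find(b)
        if ra != rb:
            self.parent[rb] = ra

    def resolve(self, records):
        """records: list of dicts with 'record_id' and optional 'email' / 'phone'."""
        by_identifier = defaultdict(list)
        for r in records:
            for key in ('email', 'phone'):
                value = (r.get(key) or '').strip().lower()
                if value:
                    by_identifier[(key, value)].append(r['record_id'])
        # Any two records sharing a normalized identifier belong to the same person
        for rids in by_identifier.values():
            for other in rids[1:]:
                self._union(rids[0], other)
        return {r['record_id']: self._find(r['record_id']) for r in records}

# Example: a loyalty profile and a web account sharing a phone number collapse into one cluster
records = [
    {'record_id': 'loyalty:123', 'email': 'a@example.com', 'phone': '555-0100'},
    {'record_id': 'web:789', 'email': 'b@example.com', 'phone': '555-0100'},
    {'record_id': 'web:555', 'email': 'c@example.com'},
]
print(IdentityGraph().resolve(records))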

Event schema design: The CDP ingests events in a standardized schema. A canonical retail event schema:

{
  "event_id": "uuid",
  "event_type": "product_view | add_to_cart | purchase | return | search",
  "timestamp": "ISO8601",
  "session_id": "string",
  "customer_id": "string or null (anonymous)",
  "device_type": "mobile | desktop | tablet | in_store",
  "channel": "web | app | store | email",
  "properties": {
    "sku_id": "string",
    "category": "string",
    "price": "number",
    "quantity": "integer"
  }
}

Medallion Architecture for Retail

The medallion architecture (Bronze/Silver/Gold) organizes data by refinement level:

Bronze (Raw): Exactly as received from source systems. No transformation. Immutable. Schema-on-read. Preserves all data including anomalies. Retention: 7+ years for compliance.

Silver (Cleansed): Cleaned, validated, and deduplicated data. Schema-on-write. Transformations applied: duplicate transactions removed, test records filtered, PII masked, timestamps normalized to UTC. The single source of truth for analytical queries.

Gold (Aggregated): Pre-aggregated, business-domain-specific views built from Silver. Examples:

  • daily_sales_by_store_sku: daily roll-up of units sold and revenue per store-SKU pair
  • customer_360_profile: denormalized customer profile with purchase history, preferences, and CLV score
  • product_performance_weekly: weekly category and SKU performance metrics

The medallion architecture addresses the common retail data lake antipattern: a "data swamp" where all data is raw and nobody can find or trust anything.

Real-Time Stream Processing

Apache Flink is a de facto standard for stateful stream processing at retail scale. The key operations:

Aggregations over time windows: count purchases per SKU in the last hour, compute rolling 7-day average basket size per store, track session duration and page views per user.

Stream-stream joins: join clickstream events with product catalog data to enrich browsing events with category information and price.

Late data handling: events arrive out of order due to network delays, mobile offline buffering, and batch flush from POS terminals. Flink's watermark mechanism handles late-arriving data: define a maximum expected lateness, and windows remain open until the watermark passes their end time.

Stateful computation: a user's session needs to be aggregated across all events in that session, which can span hours. Flink maintains per-key state that persists across events.
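
The window-plus-watermark idea can be sketched in plain Python. This is a deliberately simplified stand-in for what a Flink job does natively (event-time tumbling windows, a watermark derived from observed event times, late events routed away from finalized windows); it is not Flink API code, and all names are illustrative.

from collections import defaultdict
from datetime import datetime, timedelta

class TumblingWindowCounter:
    """Event-time tumbling window with a bounded-lateness watermark (simplified)."""

    def __init__(self, window: timedelta, max_lateness: timedelta):
        self.window = window
        self.max_lateness = max_lateness
        self.windows = defaultdict(lambda: defaultdict(int))  # window_start -> {sku: count}
        self.watermark = datetime.min
        self.finalized = []  # list of (window_start, {sku: count})

    def _window_start(self, ts: datetime) -> datetime:
        epoch = datetime(1970, 1, 1)
        return epoch + ((ts - epoch) // self.window) * self.window

    def on_event(self, event_time: datetime, sku_id: str):
        # Watermark = max event time seen so far, minus the allowed lateness
        self.watermark = max(self.watermark, event_time - self.max_lateness)
        start = self._window_start(event_time)
        if start + self.window <= self.watermark:
            # The window this event belongs to was already finalized:
            # in Flink this would go to a late-data side output for replay
            return
        self.windows[start][sku_id] += 1
        self._finalize_closed_windows()

    def _finalize_closed_windows(self):
        for start in sorted(list(self.windows)):
            if start + self.window <= self.watermark:
                self.finalized.append((start, dict(self.windows.pop(start))))

# Example: hourly purchase counts per SKU, tolerating up to 15 minutes of lateness
counter = TumblingWindowCounter(window=timedelta(hours=1), max_lateness=timedelta(minutes=15))
counter.on_event(datetime(2024, 3, 1, 9, 5), 'SKU-1')
counter.on_event(datetime(2024, 3, 1, 9, 50), 'SKU-1')
counter.on_event(datetime(2024, 3, 1, 10, 20), 'SKU-2')  # advances the watermark past the 9:00 window
print(counter.finalized)  # the 9:00 window is emitted with {'SKU-1': 2}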

Session-Based Event Tracking

Web and mobile sessions are the basic unit of user intent detection. A session is a contiguous sequence of user actions separated by no more than a timeout gap (typically 30 minutes).

Session features that are valuable for recommendations and personalization:

  • Session intent: what category did the user primarily browse? what was their search query?
  • Session progress: how many pages viewed, how much time spent, did they add to cart?
  • Session context: device type, day of week, time of day, referral source
  • Recency relative to prior session: how many days since last session? what did they buy last?

Computing session features requires stateful streaming: as events arrive, accumulate them into the current session. When a 30-minute gap is detected, emit the completed session as a record to downstream systems.

Privacy Engineering for Retail Data

CCPA (California Consumer Privacy Act) and GDPR impose specific requirements on retail data systems:

  • Right to know: users can request what data you have on them
  • Right to delete: users can request deletion of their personal data
  • Right to opt-out: users can opt out of sale of their personal data
  • Data minimization: only collect data necessary for stated purpose

Engineering implications:

  • PII catalog: maintain a catalog of where PII exists in your systems. Name, email, phone, address, and device identifiers must be tracked across every table and system that stores them.
  • Deletion propagation: when a user requests deletion, the deletion must propagate to raw event logs, aggregated tables, feature stores, model training datasets, and ML model parameters that may have memorized individual records.
  • Pseudonymization: replace directly identifying fields (name, email) with a customer_id token as early as possible in the pipeline. Store the token-to-identity mapping in a separate access-controlled system (a sketch follows this list).
  • Differential privacy for ML: when training models on customer data, techniques like differential privacy (adding calibrated noise to gradients) provide mathematical guarantees that individual records cannot be recovered from model parameters.
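
A minimal sketch of the pseudonymization bullet above: PII is replaced with a keyed token as early as possible, the token-to-identity mapping lives in a separate access-controlled vault, and a deletion request destroys the customer's key and mapping so tokens already written to immutable logs can no longer be re-linked. Class and method names are illustrative.

import hashlib
import hmac
import os

class PIIVault:
    """Tokenize direct identifiers; keep per-customer keys so deletion destroys linkability."""

    def __init__(self):
        self._keys = {}        # customer_id -> secret key (a KMS/HSM in production)
        self._identities = {}  # token -> original identity record (access-controlled)

    def tokenize(self, customer_id: str, email: str) -> str:
        """Replace an email with a stable keyed token for use downstream."""
        key = self._keys.setdefault(customer_id, os.urandom(32))
        token = hmac.new(key, email.lower().encode(), hashlib.sha256).hexdigest()[:16]
        self._identities[token] = {'customer_id': customer_id, 'email': email}
        return token

    def forget(self, customer_id: str):
        """Right-to-delete: destroy the key and the identity mapping.
        Tokens already written into immutable Bronze logs can no longer be re-linked."""
        self._keys.pop(customer_id, None)
        self._identities = {
            token: ident for token, ident in self._identities.items()
            if ident['customer_id'] != customer_id
        }

# Usage: downstream events carry the token, never the raw email
vault = PIIVault()
token = vault.tokenize('cust-42', 'jane@example.com')
vault.forget('cust-42')  # deletion request: the token can no longer be resolved to a person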

Practical Implementation

# ============================================================
# 1. Canonical Retail Event Schema and Validation
# ============================================================

from dataclasses import dataclass, field, asdict
from typing import Optional, Dict, Any
from datetime import datetime
import json
import hashlib
import uuid


@dataclass
class RetailEvent:
    """
    Canonical schema for all retail events.
    Enforces required fields and type consistency.
    """
    event_id: str
    event_type: str
    timestamp: datetime
    session_id: str
    channel: str  # web | app | store | email

    # Optional - populated after identity resolution
    customer_id: Optional[str] = None
    anonymous_id: Optional[str] = None  # cookie or device ID

    # Event-specific properties
    sku_id: Optional[str] = None
    category: Optional[str] = None
    price: Optional[float] = None
    quantity: Optional[int] = None
    store_id: Optional[str] = None
    search_query: Optional[str] = None
    extra: Dict[str, Any] = field(default_factory=dict)

    VALID_EVENT_TYPES = frozenset([
        'session_start', 'product_view', 'search',
        'add_to_cart', 'remove_from_cart', 'checkout_start',
        'purchase', 'return', 'wishlist_add', 'page_view'
    ])
    VALID_CHANNELS = frozenset(['web', 'app', 'store', 'email', 'kiosk'])

    def __post_init__(self):
        if self.event_type not in self.VALID_EVENT_TYPES:
            raise ValueError(f"Invalid event_type: {self.event_type}")
        if self.channel not in self.VALID_CHANNELS:
            raise ValueError(f"Invalid channel: {self.channel}")
        if self.event_id is None:
            self.event_id = str(uuid.uuid4())

    def to_json(self) -> str:
        d = asdict(self)
        d['timestamp'] = self.timestamp.isoformat()
        return json.dumps(d)

    @classmethod
    def from_json(cls, json_str: str) -> 'RetailEvent':
        d = json.loads(json_str)
        d['timestamp'] = datetime.fromisoformat(d['timestamp'])
        return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__})

    def pseudonymize(self) -> 'RetailEvent':
        """
        Replace PII with pseudonymous tokens.
        Store the mapping separately in a controlled vault.
        """
        import copy
        event = copy.copy(self)
        # In production: look up or create a stable pseudonymous token
        # for customer_id using a tokenization service
        if event.customer_id:
            event.customer_id = hashlib.sha256(
                event.customer_id.encode()
            ).hexdigest()[:16]
        return event


# ============================================================
# 2. POS Data Deduplication and Validation
# ============================================================

import pandas as pd
import numpy as np
from typing import List


class POSDataCleaner:
    """
    Clean and validate POS transaction data.
    Handles duplicates, voids, test transactions, and out-of-order records.
    """

    # Store IDs used for training/testing (to be filtered)
    TEST_STORE_PATTERNS = ['TEST', 'DEMO', 'TRAIN', '9999']

    def deduplicate(
        self,
        transactions_df: pd.DataFrame,
        dedup_keys: List[str] = None
    ) -> pd.DataFrame:
        """
        Remove duplicate transactions.
        Uses transaction_id as primary dedup key.
        For transactions without IDs, use a composite key.
        """
        if dedup_keys is None:
            dedup_keys = ['store_id', 'terminal_id', 'transaction_timestamp', 'total_amount']

        original_count = len(transactions_df)

        # Primary dedup by transaction_id if available
        if 'transaction_id' in transactions_df.columns:
            df = transactions_df.drop_duplicates(subset=['transaction_id'], keep='last')
        else:
            df = transactions_df.drop_duplicates(subset=dedup_keys, keep='last')

        removed = original_count - len(df)
        if removed > 0:
            print(f"Removed {removed} duplicate transactions ({removed/original_count:.2%})")

        return df

    def apply_voids(
        self,
        transactions_df: pd.DataFrame
    ) -> pd.DataFrame:
        """
        Remove voided transactions. A void cancels a previous transaction.
        Voids are identified by is_void=True flag and original_transaction_id reference.
        """
        if 'is_void' not in transactions_df.columns:
            return transactions_df

        void_ids = transactions_df[transactions_df['is_void'] == True]['original_transaction_id']
        # Remove both the void record and the voided original
        mask = ~(
            transactions_df['transaction_id'].isin(void_ids) |
            transactions_df['is_void'].fillna(False)
        )
        return transactions_df[mask]

    def filter_test_records(
        self,
        transactions_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Filter out test and training transaction records."""
        if 'store_id' not in transactions_df.columns:
            return transactions_df

        mask = ~transactions_df['store_id'].astype(str).str.contains(
            '|'.join(self.TEST_STORE_PATTERNS), case=False, na=False
        )

        # Also filter zero-value transactions (training scans)
        if 'total_amount' in transactions_df.columns:
            mask = mask & (transactions_df['total_amount'] != 0)

        return transactions_df[mask]

    def handle_late_arriving_data(
        self,
        transactions_df: pd.DataFrame,
        processing_timestamp: pd.Timestamp,
        max_late_hours: int = 48
    ) -> tuple:
        """
        Separate on-time records from late-arriving records.
        Late records may need to be replayed into already-closed aggregation windows.
        """
        transactions_df['transaction_timestamp'] = pd.to_datetime(
            transactions_df['transaction_timestamp']
        )
        lateness_hours = (
            processing_timestamp - transactions_df['transaction_timestamp']
        ).dt.total_seconds() / 3600

        on_time = transactions_df[lateness_hours <= max_late_hours]
        late = transactions_df[lateness_hours > max_late_hours]

        return on_time, late

    def clean(
        self,
        transactions_df: pd.DataFrame,
        processing_timestamp: pd.Timestamp = None
    ) -> pd.DataFrame:
        """Full cleaning pipeline."""
        if processing_timestamp is None:
            processing_timestamp = pd.Timestamp.now()

        df = self.deduplicate(transactions_df)
        df = self.apply_voids(df)
        df = self.filter_test_records(df)
        on_time, late = self.handle_late_arriving_data(df, processing_timestamp)

        if len(late) > 0:
            print(f"Warning: {len(late)} late-arriving records detected. Manual replay may be needed.")

        return on_time


# ============================================================
# 3. Medallion Architecture Implementation
# ============================================================

class RetailDataLake:
    """
    Medallion architecture implementation for retail data.
    In production: use Delta Lake / Iceberg on S3 or GCS.
    Here we use Pandas as a simplified illustration.
    """

    def __init__(self, base_path: str = '/data/retail-lake'):
        self.base_path = base_path
        self.cleaner = POSDataCleaner()

    def write_bronze(
        self,
        data: pd.DataFrame,
        source: str,
        partition_date: str
    ):
        """
        Bronze layer: raw data, exactly as received.
        Never modify. Append-only.
        Partitioned by source and date for efficient pruning.
        """
        path = f"{self.base_path}/bronze/{source}/date={partition_date}/"
        # In production: write as Parquet with Delta Lake for ACID guarantees
        # data.to_parquet(path, engine='pyarrow', partition_cols=['store_id'])
        print(f"[Bronze] Written {len(data)} records to {path}")

    def process_silver_transactions(
        self,
        bronze_data: pd.DataFrame,
        processing_timestamp: pd.Timestamp
    ) -> pd.DataFrame:
        """
        Silver layer: cleaned, validated transactions.
        Apply business rules: dedup, void handling, test filtering.
        """
        # Clean
        clean_data = self.cleaner.clean(bronze_data, processing_timestamp)

        # Standardize
        if 'transaction_timestamp' in clean_data.columns:
            clean_data['transaction_timestamp'] = pd.to_datetime(
                clean_data['transaction_timestamp'], utc=True
            )

        # Add metadata columns
        clean_data['_processed_at'] = processing_timestamp
        clean_data['_source_system'] = 'pos'

        # Pseudonymize PII: hash the email, then drop raw identifying columns
        if 'customer_email' in clean_data.columns:
            clean_data['customer_email_hash'] = clean_data['customer_email'].apply(
                lambda x: hashlib.sha256(x.encode()).hexdigest() if pd.notna(x) else None
            )
        clean_data = clean_data.drop(columns=['customer_email', 'customer_name'], errors='ignore')

        print(f"[Silver] Processed {len(clean_data)} clean records from {len(bronze_data)} raw")
        return clean_data

    def build_gold_daily_sales(
        self,
        silver_transactions: pd.DataFrame,
        date: str
    ) -> pd.DataFrame:
        """
        Gold layer: daily_sales_by_store_sku aggregation.
        Used by demand forecasting, inventory management, reporting.
        """
        daily_sales = (
            silver_transactions
            .groupby(['store_id', 'sku_id'])
            .agg(
                units_sold=('quantity', 'sum'),
                gross_revenue=('extended_price', 'sum'),
                num_transactions=('transaction_id', 'nunique'),
                num_customers=('customer_id', lambda x: x.dropna().nunique()),
                avg_selling_price=('unit_price', 'mean'),
                on_promotion_units=('is_on_promotion', 'sum'),  # line items sold on promotion
            )
            .reset_index()
        )
        daily_sales['date'] = pd.to_datetime(date)
        return daily_sales

    def build_gold_customer_profile(
        self,
        silver_transactions: pd.DataFrame,
        observation_end: pd.Timestamp = None
    ) -> pd.DataFrame:
        """
        Gold layer: customer_360_profile.
        Aggregated customer features used by personalization and CLV models.
        """
        if observation_end is None:
            observation_end = pd.Timestamp.now(tz='UTC')

        silver_transactions['transaction_timestamp'] = pd.to_datetime(
            silver_transactions['transaction_timestamp'], utc=True
        )

        profile = (
            silver_transactions
            .dropna(subset=['customer_id'])
            .groupby('customer_id')
            .agg(
                total_spend=('extended_price', 'sum'),
                total_transactions=('transaction_id', 'nunique'),
                avg_order_value=('transaction_id', lambda x: silver_transactions.loc[x.index, 'extended_price'].sum() / x.nunique()),
                first_purchase_date=('transaction_timestamp', 'min'),
                last_purchase_date=('transaction_timestamp', 'max'),
                favorite_category=('category', lambda x: x.mode().iloc[0] if not x.mode().empty else None),
                channel_preference=('channel', lambda x: x.mode().iloc[0] if not x.mode().empty else None),
            )
            .reset_index()
        )

        profile['recency_days'] = (observation_end - profile['last_purchase_date']).dt.days
        profile['tenure_days'] = (profile['last_purchase_date'] - profile['first_purchase_date']).dt.days

        return profile


# ============================================================
# 4. Real-Time Feature Computation (Flink-like in Python)
# ============================================================

from collections import defaultdict, deque
from datetime import timedelta


class SessionAggregator:
    """
    Simulates Flink stateful session computation in Python.
    In production: implement as a Flink DataStream job.
    """

    def __init__(self, session_timeout_minutes: int = 30):
        self.timeout = timedelta(minutes=session_timeout_minutes)
        self.active_sessions = {}  # customer_id -> session state
        self.completed_sessions = []

    def process_event(self, event: RetailEvent) -> Optional[dict]:
        """
        Process a single event. Returns a completed session if one was closed.
        """
        key = event.customer_id or event.anonymous_id
        if not key:
            return None

        now = event.timestamp

        # Check if existing session has expired
        if key in self.active_sessions:
            session = self.active_sessions[key]
            if now - session['last_event_time'] > self.timeout:
                # Close the existing session
                completed = self._close_session(key, session)
                self.completed_sessions.append(completed)
                del self.active_sessions[key]

        # Start or continue session
        if key not in self.active_sessions:
            self.active_sessions[key] = {
                'session_id': event.session_id,
                'customer_id': key,
                'start_time': now,
                'last_event_time': now,
                'event_count': 0,
                'product_views': [],
                'categories_viewed': set(),
                'category_views': [],  # one entry per categorized product view
                'search_queries': [],
                'add_to_cart_count': 0,
                'channel': event.channel,
                'device': event.extra.get('device_type', 'unknown'),
            }

        session = self.active_sessions[key]
        session['last_event_time'] = now
        session['event_count'] += 1

        # Accumulate signals by event type
        if event.event_type == 'product_view' and event.sku_id:
            session['product_views'].append(event.sku_id)
            if event.category:
                session['categories_viewed'].add(event.category)
                session['category_views'].append(event.category)

        elif event.event_type == 'search' and event.search_query:
            session['search_queries'].append(event.search_query)

        elif event.event_type == 'add_to_cart':
            session['add_to_cart_count'] += 1

        elif event.event_type == 'purchase':
            completed = self._close_session(key, session)
            self.completed_sessions.append(completed)
            del self.active_sessions[key]
            return completed

        return None

    def _close_session(self, key: str, session: dict) -> dict:
        """Finalize session and compute derived features."""
        duration = (session['last_event_time'] - session['start_time']).total_seconds()
        category_views = session['category_views']
        return {
            'session_id': session['session_id'],
            'customer_id': key,
            'start_time': session['start_time'],
            'end_time': session['last_event_time'],
            'duration_seconds': duration,
            'event_count': session['event_count'],
            'distinct_products_viewed': len(set(session['product_views'])),
            'distinct_categories': len(session['categories_viewed']),
            # Most frequently viewed category in the session (None if no categorized views)
            'primary_category': max(set(category_views), key=category_views.count) if category_views else None,
            'search_query_count': len(session['search_queries']),
            'add_to_cart_count': session['add_to_cart_count'],
            'channel': session['channel'],
            'device': session['device'],
            'intent_score': min(
                1.0,
                session['add_to_cart_count'] * 0.3
                + len(session['search_queries']) * 0.1
                + len(set(session['product_views'])) * 0.02
            ),
        }


# ============================================================
# 5. Feature Store Interface
# ============================================================

class RetailFeatureStore:
    """
    Simplified feature store interface.
    In production: use Feast, Tecton, or Databricks Feature Store.
    """

    def __init__(self):
        # Simulate online store (Redis in production)
        self._online_store = {}
        # Simulate offline store (Parquet/Delta Lake in production)
        self._offline_store = {}

    def write_customer_features(
        self,
        customer_id: str,
        features: dict,
        event_time: datetime
    ):
        """Write real-time customer features to online store."""
        key = f"customer:{customer_id}"
        self._online_store[key] = {
            'features': features,
            'event_time': event_time.isoformat(),
        }

    def get_customer_features(
        self,
        customer_id: str,
        feature_names: List[str] = None
    ) -> dict:
        """
        Retrieve customer features from online store.
        Falls back to offline store if not found in online.
        """
        key = f"customer:{customer_id}"
        if key in self._online_store:
            features = self._online_store[key]['features']
            if feature_names:
                features = {k: v for k, v in features.items() if k in feature_names}
            return features

        # Fallback to offline store
        if customer_id in self._offline_store:
            return self._offline_store.get(customer_id, {})

        return {}  # Unknown customer - cold start

    def get_point_in_time_features(
        self,
        customer_ids: List[str],
        timestamps: List[datetime],
        feature_names: List[str]
    ) -> pd.DataFrame:
        """
        Point-in-time correct feature lookup for training data generation.
        Returns the feature values as they existed at each timestamp.
        Critical for avoiding training data leakage.
        """
        # In production: Feast's get_historical_features() handles this.
        # It joins the entity dataframe (customer_id, event_timestamp)
        # against the offline feature store, returning features valid AT that timestamp.
        results = []
        for cid, ts in zip(customer_ids, timestamps):
            features = self.get_customer_features(cid, feature_names)
            results.append({'customer_id': cid, 'timestamp': ts, **features})
        return pd.DataFrame(results)
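
To make point-in-time correctness concrete, here is a small pandas sketch of what a historical feature lookup does conceptually: for each (customer_id, prediction timestamp) pair, take the most recent feature row computed at or before that timestamp. Column names are illustrative.

# Offline store: feature values with the time at which each value was computed
feature_history = pd.DataFrame({
    'customer_id': ['c1', 'c1', 'c2'],
    'feature_timestamp': pd.to_datetime(['2024-01-01', '2024-02-01', '2024-01-15']),
    'days_since_last_purchase': [3, 40, 7],
})

# Entity dataframe: the prediction times for which we need training features
entity_df = pd.DataFrame({
    'customer_id': ['c1', 'c2'],
    'prediction_timestamp': pd.to_datetime(['2024-01-20', '2024-02-10']),
})

# merge_asof keeps, per row, the last feature value at or before the prediction time
training_features = pd.merge_asof(
    entity_df.sort_values('prediction_timestamp'),
    feature_history.sort_values('feature_timestamp'),
    left_on='prediction_timestamp',
    right_on='feature_timestamp',
    by='customer_id',
    direction='backward',
)
print(training_features)
# c1 at 2024-01-20 gets the 2024-01-01 value (3), not the later 2024-02-01 value (40)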

Architecture Diagrams

Retail Data Platform Architecture (diagram)

Medallion Architecture Layers (diagram)


Production Engineering Notes

Training-Serving Skew Prevention

Training-serving skew is the most insidious source of production ML bugs. It happens when the features computed during training differ from the features served at inference time - even when both use the same feature definition.

Root causes:

  • Different code paths for batch feature computation (training) vs. real-time feature computation (serving)
  • Data freshness differences: training features use 30-day windows; serving features have only 7 days of real data available
  • Missing value handling differences: training fills nulls with 0; serving returns null which downstream code handles differently

Prevention:

  1. Use the same feature computation code for both training and serving. Feature stores like Feast enforce this through "feature views" that define feature computation once and execute in both contexts.
  2. Log the feature values used for every prediction. Compare their distributions daily against training data distributions using the Population Stability Index (PSI), alerting when PSI exceeds 0.25; a sketch of the computation follows this list.
  3. Implement shadow mode: run the new model alongside the production model, logging its predictions without serving them. Compare feature distributions between training and shadow serving before promotion.
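
A small sketch of the PSI computation referenced above, using quantile bins fixed from the training distribution; the bin count and the 0.25 threshold are conventions, not hard rules.

import numpy as np

def population_stability_index(train_values, serve_values, n_bins: int = 10) -> float:
    """PSI between training and serving distributions of one feature.
    Bin edges are fixed from the training distribution's quantiles."""
    train_values = np.asarray(train_values, dtype=float)
    serve_values = np.asarray(serve_values, dtype=float)
    edges = np.quantile(train_values, np.linspace(0, 1, n_bins + 1))
    # Clip both samples into the training range so outliers fall into the end bins
    train_counts = np.histogram(np.clip(train_values, edges[0], edges[-1]), bins=edges)[0]
    serve_counts = np.histogram(np.clip(serve_values, edges[0], edges[-1]), bins=edges)[0]
    train_frac = np.clip(train_counts / len(train_values), 1e-6, None)
    serve_frac = np.clip(serve_counts / len(serve_values), 1e-6, None)
    return float(np.sum((serve_frac - train_frac) * np.log(serve_frac / train_frac)))

# Example: a shifted serving distribution produces a PSI well above a 0.25 alert threshold
rng = np.random.default_rng(0)
training_basket_values = rng.normal(50, 10, 10_000)
serving_basket_values = rng.normal(58, 10, 10_000)
print(round(population_stability_index(training_basket_values, serving_basket_values), 3))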

Data Quality Monitoring

In retail, data quality issues are frequent and often invisible. A POS system upgrade at 200 stores might change the format of a product attribute field. An app update might add a new event type that downstream pipelines do not recognize. A promotional system change might alter how discount amounts are recorded.

Monitoring framework:

  • Volume anomalies: daily transaction count per store should not deviate more than 20% from same-day-last-year without a business explanation. Alert on drops (might indicate data pipeline failure) and spikes (might indicate duplication).
  • Schema drift: detect new or missing columns in incoming data, changed data types, unexpected null rates
  • Statistical drift: monitor mean and standard deviation of key fields (transaction amount, item count per transaction) using z-scores over rolling 7-day windows

Great Expectations (open source) and Monte Carlo Data (commercial) are standard tools for implementing these checks in data pipelines.
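
A minimal pandas sketch of the volume and null-rate checks described above. Thresholds and column names are illustrative; in practice these rules would typically be encoded in a tool like Great Expectations rather than written as ad hoc functions.

import pandas as pd

def check_daily_volume(today_counts: pd.Series, same_day_last_year: pd.Series,
                       tolerance: float = 0.20) -> pd.DataFrame:
    """Flag stores whose daily transaction count deviates more than 20% from same day last year."""
    joined = pd.concat([today_counts.rename('today'),
                        same_day_last_year.rename('last_year')], axis=1).dropna()
    joined['pct_change'] = (joined['today'] - joined['last_year']) / joined['last_year']
    return joined[joined['pct_change'].abs() > tolerance]

def check_null_rates(df: pd.DataFrame, expected_max_null_rate: dict) -> dict:
    """Compare per-column null rates against expected ceilings (a schema-drift smoke test)."""
    violations = {}
    for column, ceiling in expected_max_null_rate.items():
        rate = df[column].isna().mean() if column in df.columns else 1.0  # missing column counts as all-null
        if rate > ceiling:
            violations[column] = rate
    return violations

# Example usage against a silver transactions dataframe (inputs are illustrative)
# anomalies = check_daily_volume(today_by_store, last_year_by_store)
# issues = check_null_rates(silver_df, {'customer_id': 0.60, 'sku_id': 0.001})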


Common Mistakes

:::danger Point-in-Time Feature Leakage in Training Data

The most dangerous data engineering mistake for retail ML: computing training features using data from after the prediction timestamp. Example: for a model trained to predict whether a customer will churn in the next 30 days, you use the customer's "days since last purchase" feature computed today rather than as it existed 30 days ago. The model sees the future (days_since_last_purchase is already 30 by the time you compute it for churned customers) and achieves artificially high training accuracy. In production, it sees the feature as it exists at prediction time (days_since_last_purchase might be 5) and performs much worse.

Prevention: always use a feature store with point-in-time correct lookups for training data generation. The entity dataframe passed to get_historical_features() must include the exact prediction timestamp, and the feature store must return feature values as they existed at that timestamp.

:::

:::danger Ignoring Late-Arriving Data in Aggregations

POS transactions from stores with intermittent connectivity arrive hours or days late. If your daily sales aggregation job runs at midnight and a store uploads its afternoon transactions at 3 AM, your aggregation has already closed. The store's afternoon sales are missing from today's aggregation and will appear in tomorrow's, creating a systematic undercount in your demand forecasting training data for any day with late data.

Prevention: design your aggregations with a late data buffer period (e.g., wait 24-48 hours before finalizing daily aggregations), use event time rather than processing time for window boundaries, and maintain an "is_final" flag on aggregations so downstream systems know which records may be revised.

:::

:::warning Storing PII in Feature Stores

Feature stores serving ML models often contain sensitive customer data: purchase history, browsing behavior, location data. If this data is not properly governed, it may persist longer than required, be accessible to more systems than authorized, and create CCPA/GDPR compliance exposure.

Implement: (1) automatic TTL (time-to-live) on customer feature records based on your retention policy; (2) role-based access control separating read access for production model serving from write access for data engineering; (3) customer deletion propagation - when a customer exercises their right to deletion, the deletion must trigger cleanup in the feature store, not just the source database.

:::

:::warning Session Attribution in Omnichannel Retail

Attributing a purchase to the right marketing touchpoint is hard when customers research online, visit a store, and then purchase via mobile app. If your session tracking only covers the web, you attribute the purchase to the last web touchpoint, ignoring the in-store visit that likely drove the decision. This produces misleading attribution data that leads to under-investment in stores and over-investment in digital retargeting.

Build identity-resolved cross-channel event streams: connect web sessions, app sessions, and in-store visits for authenticated (loyalty card) customers. Use a multi-touch attribution model rather than last-click. The data engineering requirement: a customer identifier that persists across channels, populated via identity resolution from loyalty card, email, and phone number matching.

:::


Interview Questions and Answers

Q1: Explain the medallion architecture and why it is preferred over a single unified data store for retail.

A: The medallion architecture organizes data into Bronze (raw), Silver (cleansed), and Gold (aggregated) layers. Bronze stores data exactly as received from source systems - immutable, append-only. Silver applies business rules: deduplication, PII masking, void handling, schema normalization. Gold pre-aggregates into domain-specific views. The reason this beats a single unified store: retail data is heterogeneous in quality and has multiple consumers with different needs. A fraud detection system needs raw, unprocessed events as quickly as possible - Bronze. A demand forecasting model needs clean, validated sales records without test transactions or voids - Silver. A business report needs pre-aggregated daily sales by category - Gold. A single store would either be too raw (analysts spend hours cleaning data) or too processed (data scientists cannot access the raw signals they need). The medallion pattern also makes quality assurance explicit: Silver-to-Bronze transformation is where business rules are enforced, creating a documented, testable cleaning layer. When a new data quality issue is discovered (a POS upgrade changes a field format), you fix it in the Bronze-to-Silver transformation and replay affected dates through Silver and Gold. With a single-layer approach, every consumer of the data needs to handle the quality issue independently.

Q2: What is point-in-time correctness in a feature store and why does it matter?

A: Point-in-time correctness means that when you generate training data, the feature values returned for each training example reflect the state of those features at the prediction timestamp - not their current state. This matters because features change over time. A customer's "days since last purchase" on January 1st is different from their "days since last purchase" on March 1st. If you train a churn model and fetch features using the current values rather than the historical values at each training timestamp, your model learns from the future. Specifically: churned customers (those who stopped buying after January 1st) will have high "days since last purchase" when you compute features today - giving the model a feature that perfectly predicts churn because it already reflects the churn that has occurred. In production at prediction time, the feature value is current (potentially small) and the model is confused. The result is high training accuracy and poor production performance. Feature stores like Feast's get_historical_features() solve this by joining the entity dataframe (customer_id, prediction_timestamp) against a time-partitioned offline feature store, returning the last feature value computed before each prediction_timestamp.

Q3: How would you design a Kafka topic schema strategy for a retail event streaming platform?

A: The core decision is topic granularity: one topic per event type (product_view, add_to_cart, purchase) vs. one topic per domain (all commerce events in one topic) vs. one global topic. My recommendation: one topic per high-level event domain, with event_type as a field within the message. Rationale: separate topics for every event type creates 20+ topics that all need partition management, consumer group management, and retention policies. A single global topic creates a processing bottleneck where all consumers must process all events even if they only care about purchases. Domain-based topics (commerce_events, logistics_events, catalog_events) balance manageability against consumer isolation. Key schema design decisions: (1) Use Avro or Protobuf for serialization with a Schema Registry - this enforces schema contracts and allows schema evolution; (2) Include event_id, event_type, timestamp (both event time and ingestion time), source_system, customer_id (nullable), session_id, and a properties blob for event-specific fields; (3) Partition by customer_id for customer-centric consumers (recommendation systems) or by store_id for store-centric consumers (inventory systems). When consumers have conflicting partitioning needs, serve them from separate re-partitioned topics (for example produced by a Kafka Streams job), accepting the extra topic management and processing overhead.

Q4: Describe how you would implement CCPA right-to-delete for a retail data platform with data in a data lake, feature store, and ML model registry.

A: Right-to-delete for a retail customer propagates through at least five systems. First, the source database: delete or anonymize the customer record in your CRM and loyalty system. This is the relatively easy part. Second, the data lake: raw event logs (Bronze) are immutable by design - you cannot delete individual records from Parquet files efficiently. Use crypto-shredding: encrypt customer PII using a per-customer encryption key. On deletion request, delete the encryption key. The data remains but becomes unreadable without the key. Silver and Gold aggregations that include customer PII must be rebuilt after crypto-shredding. Third, the feature store: purge all feature vectors associated with the customer_id from the online store (Redis) and mark records in the offline store as deleted with a TTL. Fourth, the ML model registry: this is the hardest part. Models trained on data including the deleted customer may have "memorized" their specific behavior. Differential privacy during training provides mathematical guarantees against this, but is rarely used retroactively. Machine unlearning is an active research area but not yet production-grade. Pragmatic approach: document that models trained before the deletion request may retain statistical influence from the deleted customer, but individual records are not recoverable from model parameters. This is legally acceptable in most jurisdictions as long as the source data and identifiable features are deleted. Implement a model retraining schedule that ensures models trained after deletion requests do not include deleted customer data. Fifth, downstream systems: audit all systems that receive data feeds from your platform to ensure the deletion propagates to analytics databases, data warehouses, and third-party integrations.

Q5: How do you handle schema evolution in a high-volume retail event stream where downstream consumers cannot all update simultaneously?

A: Schema evolution in event streaming is one of the most practically challenging data engineering problems in retail. The solution has three components. First, Schema Registry: use Confluent Schema Registry or AWS Glue Schema Registry to version all event schemas. Producers register schemas before producing; consumers can retrieve the schema for any message. This makes evolution explicit and auditable. Second, compatibility mode: configure the Schema Registry with backward compatibility - new schemas must be readable by consumers using the old schema. This means you can only add optional fields (with defaults) or remove fields. Adding required fields, renaming fields, or changing field types all require a major version increment (a new topic, essentially). Forward compatibility allows old producers to work with new consumers. Full compatibility requires both. Choosing backward compatibility allows rolling consumer updates: some consumers are on v1, some on v2; all can read messages produced by v1 or v2 producers. Third, migration strategy: for major breaking changes that require a new topic, run dual-write for an extended period. Producers write to both the old and new topic for 2-4 weeks. Consumers migrate to the new topic one by one. After all consumers have migrated, drain the old topic and decommission it. The key operational practice: maintain a schema changelog (what changed, when, who changed it, why) and communicate schema changes to all downstream team owners at least 2 sprint cycles in advance.
