

Lakehouse for ML Workflows

Three Systems, One Change, Three Problems

The ML team at a fintech company ran three separate data systems. Snowflake held the analytical data that the BI team queried - cleaned, modeled, ready for dashboards. S3 Parquet files (a separate copy) held the training data that the ML team used - same underlying data, but denormalized differently, with different feature columns, sometimes at different freshness levels. Redis held the serving features - the subset of features that needed to be retrieved in under 10 milliseconds at prediction time.

The system worked, more or less. Then one day, a product manager asked a simple question: "Can we add the customer's average transaction amount from the last 30 days as a feature for the churn model?" The answer involved six weeks of engineering work. The Snowflake model needed a new column. The ETL pipeline that copied data to S3 needed to be updated. The feature computation job needed to add the new rolling average. The Redis precomputation job needed to index the new field. Four different teams, four different deployments, one new column.

A week after the feature went live in production, an analyst noticed something odd: the average transaction amounts in the BI dashboards didn't match what the ML model was using for predictions. The Snowflake computation used a calendar-day 30-day window; the S3 training data had been built with a rolling 30-day window. The model was trained on one definition and served on another. The discrepancy was small enough that it hadn't been caught in testing but large enough to degrade model accuracy.
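
The account above does not spell out exactly how each system defined its window, so the following is a minimal sketch under an assumed pair of definitions. The "calendar" version averages the 30 complete calendar days before the scoring date (today excluded), and the "rolling" version averages the 720 hours ending at the scoring moment. The function names and sample transactions are hypothetical; the point is only that two reasonable readings of "last 30 days" can disagree for the same customer.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Txn = Tuple[datetime, float]  # (timestamp, amount)

def avg_calendar_window(txns: List[Txn], as_of: datetime, days: int = 30) -> float:
    """Mean amount over the `days` complete calendar days before as_of's date."""
    end = datetime(as_of.year, as_of.month, as_of.day)  # midnight starting as_of's day
    start = end - timedelta(days=days)
    amounts = [amt for ts, amt in txns if start <= ts < end]
    return sum(amounts) / len(amounts) if amounts else 0.0

def avg_rolling_window(txns: List[Txn], as_of: datetime, days: int = 30) -> float:
    """Mean amount over the `days` * 24 hours ending at as_of (inclusive)."""
    start = as_of - timedelta(days=days)
    amounts = [amt for ts, amt in txns if start < ts <= as_of]
    return sum(amounts) / len(amounts) if amounts else 0.0

# Hypothetical transactions for one customer
txns = [
    (datetime(2024, 1, 1, 9, 0), 100.0),
    (datetime(2024, 1, 20, 18, 0), 40.0),
    (datetime(2024, 1, 31, 8, 0), 10.0),
]
as_of = datetime(2024, 1, 31, 12, 0)

print(avg_calendar_window(txns, as_of))  # 70.0
print(avg_rolling_window(txns, as_of))   # 25.0
```

Here the calendar window still includes the January 1 purchase and excludes this morning's, while the rolling window does the opposite. A gap like this can easily slip past tests that only compare aggregate metrics.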

This is the training-serving skew problem, the data duplication tax, and the organizational coordination overhead that arise when ML data lives in separate systems from analytical data. The lakehouse architecture solves all three by making the lakehouse the single source of truth: BI queries, ML training, and batch feature computation all read from the same Iceberg table, with the same definitions, at the same or explicitly versioned snapshots.


Why the Lakehouse Is the Natural Foundation for ML

The requirements of ML data management map almost perfectly onto what a lakehouse provides:

| ML Requirement | Lakehouse Feature |
|---|---|
| Reproducible training sets | Iceberg/Delta time travel: pin a snapshot ID |
| Large-scale raw data access | Direct Parquet reads with Spark, with no query result size limits |
| Versioned datasets | Immutable snapshots: roll back to any snapshot that has not been expired |
| Feature computation at scale | Spark running on the same data that warehouse queries use |
| Schema consistency between training and serving | One table definition used by all consumers |
| Experiment tracking | MLflow logs experiment metadata alongside Delta tables |
| Efficient incremental updates | Hudi/Iceberg appends for streaming features |
| GDPR compliance | Delete rows from training data with DELETE, then expire old snapshots (Iceberg) or run VACUUM (Delta) so the deleted rows are physically removed |

A traditional data warehouse (Snowflake, Redshift) satisfies most analytical requirements but is a poor fit for ML because: (a) it bills for the compute behind every scan, so repeatedly scanning large training sets is expensive; (b) large result sets are limited or throttled through the query interface, so bulk extraction usually means a separate export step; (c) it doesn't integrate natively with PyTorch, TensorFlow, or Spark MLlib; (d) it typically doesn't let training jobs read the underlying data directly in a columnar binary format (Arrow/Parquet).

A traditional ML data layer (raw S3 Parquet, DVC-tracked files) satisfies the ML requirements but creates the duplication problem: the same data must be maintained in two places (warehouse + ML store), and any transformation or definition change must be applied in both.

The lakehouse eliminates the duplication. The data lives once, in open Parquet files with Iceberg or Delta metadata. The BI team queries it through Trino. The ML team reads it through Spark or the PyIceberg library. The feature computation team writes new derived features back to the same lakehouse. One system, all consumers.


Versioned Training Datasets: The Foundation of Reproducibility

The most fundamental ML requirement that lakehouses satisfy better than any other system is training dataset versioning. Setting aside random seeds and library versions, which must also be pinned, a model's behavior is determined by the combination of its architecture, its hyperparameters, and the data it was trained on. If you cannot pin a training job to a specific, immutable version of the training data, you cannot reproduce the trained model - and you cannot debug it when something goes wrong in production.

The Traditional Problem

Without versioning, training data changes continuously as new events are ingested. A model trained on Monday and a model trained on Wednesday use different data even if all other parameters are identical. If the Wednesday model performs worse, you don't know whether the degradation is due to a pipeline change, a data drift, or noise. If you try to retrain the Monday model to debug it, the Monday data no longer exists - it has been overwritten by Tuesday's ingestion.

Iceberg Time Travel as Data Version Control

Iceberg's immutable snapshot model gives you dataset versioning for free. Every write to an Iceberg table creates a new snapshot with a unique snapshot ID. Previous snapshots are retained until explicitly expired. This means every training run can pin its training data to a specific snapshot:

```python
from datetime import datetime
from typing import Optional

import mlflow
import mlflow.sklearn
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split

TABLE_NAME = "production.features.churn_features"


def get_catalog(catalog_name: str):
    """Load the Glue-backed Iceberg catalog."""
    return load_catalog(catalog_name, **{"type": "glue", "region_name": "us-east-1"})


def load_training_data(
    catalog_name: str,
    table_name: str,
    snapshot_id: Optional[int] = None,      # None = latest snapshot
    as_of_timestamp: Optional[str] = None,  # ISO timestamp string
) -> pa.Table:
    """
    Load training data from an Iceberg table at a specific snapshot.
    Returns an Arrow table for efficient conversion to pandas/PyTorch.
    """
    table = get_catalog(catalog_name).load_table(table_name)

    scan_kwargs = {
        "selected_fields": ("user_token", "avg_txn_amount_30d", "days_since_last_txn",
                            "total_txn_count_90d", "declined_count_30d", "churn_label"),
        "row_filter": "event_date >= '2023-01-01' AND churn_label IS NOT NULL",
    }

    if snapshot_id is not None:
        scan_kwargs["snapshot_id"] = snapshot_id
    elif as_of_timestamp is not None:
        # scan() takes a snapshot ID, not a timestamp: resolve the latest snapshot
        # committed at or before the given time (naive timestamps use local time)
        ts_ms = int(datetime.fromisoformat(as_of_timestamp).timestamp() * 1000)
        candidates = [s for s in table.snapshots() if s.timestamp_ms <= ts_ms]
        if not candidates:
            raise ValueError(f"No snapshot of {table_name} at or before {as_of_timestamp}")
        scan_kwargs["snapshot_id"] = max(candidates, key=lambda s: s.timestamp_ms).snapshot_id

    return table.scan(**scan_kwargs).to_arrow()


# Training pipeline - pin to a specific snapshot for reproducibility
with mlflow.start_run() as run:

    # Capture the current snapshot ID before training starts, and record it
    snapshot_id = get_catalog("glue").load_table(TABLE_NAME).current_snapshot().snapshot_id
    mlflow.log_param("training_data_table", TABLE_NAME)
    mlflow.log_param("training_data_snapshot_id", snapshot_id)

    # Load the pinned snapshot - returns the same rows for as long as the snapshot is retained
    training_data = load_training_data(
        catalog_name="glue",
        table_name=TABLE_NAME,
        snapshot_id=snapshot_id,
    )
    mlflow.log_param("training_data_row_count", training_data.num_rows)

    training_df = training_data.to_pandas()
    X = training_df.drop(columns=["churn_label", "user_token"])
    y = training_df["churn_label"]

    # Train model with fixed seeds so the split and the trees are repeatable
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
    model = GradientBoostingClassifier(n_estimators=300, max_depth=5, random_state=42)
    model.fit(X_train, y_train)

    val_auc = roc_auc_score(y_val, model.predict_proba(X_val)[:, 1])
    mlflow.log_metric("val_auc_roc", val_auc)
    mlflow.sklearn.log_model(model, "churn_model")

    print(f"Training complete. Snapshot: {snapshot_id}, Val AUC: {val_auc:.4f}")
```

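Six months later, if the production model shows unexpected behavior, you can re-run this exact training job with the same snapshot ID and get exactly the same training rows. With the random seeds and library versions also pinned, the retrained model should match the original. This holds only while the snapshot is retained: snapshot expiration, a routine Iceberg maintenance job, deletes old snapshots, so retention must be configured to cover the period over which you need to reproduce models. No separate data versioning system such as DVC is needed - Iceberg's immutable snapshots handle it.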


The Feature Engineering Layer on a Lakehouse

dbt for Feature Transformations

dbt (data build tool) runs SQL transformations on top of lakehouse tables and materializes the results as new Iceberg or Delta tables. It is a good fit for batch feature definitions that can be expressed in SQL - features that are computed once per scheduled run and used for both training and serving:

```sql
-- dbt model: models/features/driver_performance_features.sql

{{
  config(
    materialized='incremental',
    file_format='iceberg',
    incremental_strategy='merge',
    unique_key=['driver_id', 'feature_date'],
    partition_by=['feature_date']
  )
}}

{% set feature_date = run_started_at.strftime('%Y-%m-%d') %}

-- Compute driver performance features for ML models
-- These features are used by: churn prediction model, surge pricing model,
-- driver quality ranking system
-- Each run merges one row per driver for the run date; rows for earlier dates
-- are kept as history for training.
SELECT
    driver_id,
    CAST('{{ feature_date }}' AS DATE) AS feature_date,

    -- Trip volume features
    COUNT(DISTINCT trip_id) AS total_trips_30d,
    COUNT(DISTINCT trip_id) FILTER (WHERE status = 'completed') AS completed_trips_30d,
    COUNT(DISTINCT trip_id) FILTER (WHERE status = 'cancelled') AS cancelled_trips_30d,
    ROUND(
        COUNT(DISTINCT trip_id) FILTER (WHERE status = 'cancelled') * 1.0 /
        NULLIF(COUNT(DISTINCT trip_id), 0),
        4
    ) AS cancellation_rate_30d,

    -- Revenue features
    ROUND(SUM(fare) FILTER (WHERE status = 'completed'), 2) AS total_revenue_30d,
    ROUND(AVG(fare) FILTER (WHERE status = 'completed'), 2) AS avg_fare_30d,

    -- Quality features
    ROUND(AVG(rating) FILTER (WHERE rating IS NOT NULL), 3) AS avg_rating_30d,
    COUNT(*) FILTER (WHERE rating <= 2.0) AS low_rating_count_30d,
    COUNT(*) FILTER (WHERE rating >= 4.8) AS high_rating_count_30d,

    -- Recency features
    MAX(trip_date) AS last_trip_date,
    DATEDIFF(CAST('{{ feature_date }}' AS DATE), MAX(trip_date)) AS days_since_last_trip

FROM {{ ref('trips_enriched') }}  -- upstream dbt model on top of raw Iceberg table
-- Every run aggregates the full 30-day window. Restricting incremental runs to
-- trips newer than the last feature_date would turn these into 1-day features.
WHERE trip_date >= DATE_SUB(CAST('{{ feature_date }}' AS DATE), 30)

GROUP BY driver_id
```

This dbt model runs nightly, materializes driver features into an Iceberg table, and that same Iceberg table is used by:

  • The surge pricing model (reads the latest features for active drivers in a scheduled Spark batch job)
  • The churn prediction model (reads historical feature snapshots for training)
  • The driver quality dashboard (queried via Trino)

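One definition, three consumers. No duplication. When the cancellation rate calculation is updated, the change reaches every consumer on the next dbt run. Because the model is incremental, rows for earlier feature dates keep the old definition until the model is rebuilt with dbt run --full-refresh.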
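
The unique_key on an incremental merge decides whether a feature table keeps history. The following toy, in-memory model of MERGE semantics (not dbt's actual implementation; merge_rows and the sample rows are hypothetical) shows the difference between keying on driver_id alone and keying on (driver_id, feature_date):

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]

def merge_rows(target: List[Row], source: List[Row], key: Tuple[str, ...]) -> List[Row]:
    """Upsert source rows into target: a matching key updates, a new key inserts."""
    merged = {tuple(row[k] for k in key): row for row in target}
    for row in source:
        merged[tuple(row[k] for k in key)] = row
    return list(merged.values())

day1 = [{"driver_id": 7, "feature_date": "2024-03-01", "total_trips_30d": 120}]
day2 = [{"driver_id": 7, "feature_date": "2024-03-02", "total_trips_30d": 118}]

# Keyed on driver_id only: day 2 overwrites day 1, so past feature values are lost
print(len(merge_rows(day1, day2, key=("driver_id",))))                # 1
# Keyed on (driver_id, feature_date): one row per driver per day is kept
print(len(merge_rows(day1, day2, key=("driver_id", "feature_date"))))  # 2
```

Training needs the second behavior, because a training set reads each driver's features as they were on past dates.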

Spark Feature Engineering for Large-Scale Computations

For features that need per-event sliding windows over billions of events, which are awkward to express and tune as dbt SQL models, a PySpark job is the better fit:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("FeatureEngineering").getOrCreate()

# Load raw events from Iceberg
events = spark.read.format("iceberg").load("production.events.raw_transactions")

# Define window specifications for temporal features. Ordering by the timestamp
# cast to long (epoch seconds) lets rangeBetween express the lookback in seconds.
user_window_7d = (
    Window.partitionBy("user_token")
    .orderBy(F.col("event_timestamp").cast("long"))
    .rangeBetween(-7 * 24 * 3600, 0)  # 7-day lookback in seconds
)

user_window_30d = (
    Window.partitionBy("user_token")
    .orderBy(F.col("event_timestamp").cast("long"))
    .rangeBetween(-30 * 24 * 3600, 0)  # 30-day lookback
)

user_window_90d = (
    Window.partitionBy("user_token")
    .orderBy(F.col("event_timestamp").cast("long"))
    .rangeBetween(-90 * 24 * 3600, 0)  # 90-day lookback
)

features = (
    events
    # 7-day features
    .withColumn("txn_count_7d", F.count("txn_id").over(user_window_7d))
    .withColumn("txn_amount_7d", F.sum("amount").over(user_window_7d))
    .withColumn("avg_amount_7d", F.avg("amount").over(user_window_7d))
    # 30-day features
    .withColumn("txn_count_30d", F.count("txn_id").over(user_window_30d))
    .withColumn("txn_amount_30d", F.sum("amount").over(user_window_30d))
    .withColumn("decline_count_30d",
                F.count(F.when(F.col("status") == "declined", 1)).over(user_window_30d))
    # 90-day features
    .withColumn("txn_count_90d", F.count("txn_id").over(user_window_90d))
    .withColumn("unique_merchants_90d",
                F.approx_count_distinct("merchant_id").over(user_window_90d))
    # Derived ratio features (handle division by zero)
    .withColumn("decline_rate_30d",
                F.col("decline_count_30d") / F.greatest(F.col("txn_count_30d"), F.lit(1)))
    # Keep only the latest feature point per user per day
    .withColumn("rn", F.row_number().over(
        Window.partitionBy("user_token", F.col("event_timestamp").cast("date"))
        .orderBy(F.col("event_timestamp").desc())
    ))
    .filter(F.col("rn") == 1)
    .drop("rn")
    .select(
        "user_token", "event_timestamp",
        F.col("event_timestamp").cast("date").alias("feature_date"),
        "txn_count_7d", "txn_amount_7d", "avg_amount_7d",
        "txn_count_30d", "txn_amount_30d", "decline_count_30d",
        "txn_count_90d", "unique_merchants_90d", "decline_rate_30d",
    )
)

# Write features back to the lakehouse. overwritePartitions() replaces only the
# feature_date partitions present in `features` (the Iceberg table must already
# exist, partitioned by feature_date).
features.writeTo("production.ml_features.user_transaction_features").overwritePartitions()

# count() re-runs the whole feature computation; cache `features` before the
# write if that extra pass is too expensive
print(f"Feature computation complete. {features.count():,} feature rows written.")
```
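
The rangeBetween frames above are easy to misread. The following sketch reproduces the frame of Window.partitionBy(user).orderBy(ts).rangeBetween(-lookback, 0) in plain Python, assuming timestamps are already epoch seconds. trailing_counts is a hypothetical helper, useful for checking expected values in a unit test rather than as a substitute for the Spark job:

```python
from bisect import bisect_left, bisect_right
from collections import defaultdict
from typing import Dict, List, Tuple

DAY = 24 * 3600  # seconds

def trailing_counts(events: List[Tuple[str, int]], lookback_s: int) -> List[int]:
    """For each (user, epoch_seconds) event, count that user's events in [t - lookback_s, t].

    Same frame as rangeBetween(-lookback_s, 0): both bounds are inclusive and
    every event tied at t is counted.
    """
    by_user: Dict[str, List[int]] = defaultdict(list)
    for user, ts in events:
        by_user[user].append(ts)
    for ts_list in by_user.values():
        ts_list.sort()

    counts = []
    for user, ts in events:
        ts_list = by_user[user]
        lo = bisect_left(ts_list, ts - lookback_s)  # first event >= t - lookback
        hi = bisect_right(ts_list, ts)              # one past the last event <= t
        counts.append(hi - lo)
    return counts

events = [("u1", 0), ("u1", 3 * DAY), ("u1", 9 * DAY), ("u2", 1 * DAY)]
print(trailing_counts(events, 7 * DAY))  # [1, 2, 2, 1]
```

Because the frame is defined on timestamp values rather than row positions, two events at the same second see each other, which is the main behavioral difference from a rowsBetween frame.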

MLflow + Delta Lake: Experiment Tracking on the Lakehouse

MLflow is the de facto standard for ML experiment tracking. When combined with a lakehouse, it enables tracking not just which hyperparameters produced which metrics, but which data snapshot trained which model. This closes the reproducibility loop on the data side.

```python
import mlflow
import mlflow.sklearn
import numpy as np
from pyspark.sql import SparkSession
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import StratifiedKFold, cross_val_score

# Point MLflow at a tracking server. Model files and plots are stored in that
# server's artifact store (object storage such as S3 or DBFS), not in Delta tables.
mlflow.set_tracking_uri("databricks")  # or "http://mlflow-server:5000"
mlflow.set_experiment("/ml/churn-prediction")

spark = SparkSession.builder.getOrCreate()

# Capture the current Delta version before loading training data
latest_commit = spark.sql(
    "DESCRIBE HISTORY production.features.churn_features LIMIT 1"
).first()
current_snapshot = latest_commit["version"]  # Delta table version number
snapshot_timestamp = latest_commit["timestamp"]

with mlflow.start_run(run_name=f"churn-gbm-snapshot-{current_snapshot}") as run:

    # Log all data provenance metadata
    mlflow.log_params({
        "training_table": "production.features.churn_features",
        "training_data_version": current_snapshot,  # Delta table version
        "training_data_timestamp": str(snapshot_timestamp),
        "training_start_date": "2023-01-01",
        "training_end_date": "2024-01-01",
        "feature_set_version": "v3.2",  # semantic version from your feature catalog
    })

    # Load training data at the pinned version
    training_df = (
        spark.read.format("delta")
        .option("versionAsOf", current_snapshot)
        .table("production.features.churn_features")
        .filter("event_date BETWEEN '2023-01-01' AND '2024-01-01'")
        .toPandas()
    )

    mlflow.log_metric("training_row_count", len(training_df))
    mlflow.log_metric("positive_class_rate", training_df["churn_label"].mean())

    X = training_df.drop(columns=["churn_label", "user_token", "event_date"])
    y = training_df["churn_label"]

    # Hyperparameters
    params = {
        "n_estimators": 500,
        "max_depth": 6,
        "learning_rate": 0.05,
        "subsample": 0.8,
        "min_samples_leaf": 50,
        "random_state": 42,
    }
    mlflow.log_params(params)

    model = GradientBoostingClassifier(**params)

    # Stratified 5-fold cross-validation with a fixed shuffle seed
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    cv_scores = cross_val_score(model, X, y, cv=cv, scoring="roc_auc", n_jobs=-1)
    mlflow.log_metric("cv_auc_mean", np.mean(cv_scores))
    mlflow.log_metric("cv_auc_std", np.std(cv_scores))

    # Final fit on all training data
    model.fit(X, y)

    # Log the model to the run's artifact store and register it in the Model Registry
    mlflow.sklearn.log_model(
        model,
        "churn_model",
        registered_model_name="churn-prediction-gbm",
        input_example=X.head(5),
    )

    print(f"Run ID: {run.info.run_id}")
    print(f"CV AUC: {np.mean(cv_scores):.4f} ± {np.std(cv_scores):.4f}")
    print(f"Training data snapshot: version {current_snapshot}")
```

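Now, six months later, any engineer can look up this MLflow run and know exactly which data version was used to train the model. They can reload the identical training data, provided the table's history still reaches back that far. Delta's defaults keep the transaction log for 30 days (delta.logRetentionDuration) and let VACUUM delete unreferenced data files after 7 days (delta.deletedFileRetentionDuration), so both must be raised on tables used for training: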

```python
# Reproduce the training data from 6 months ago - exact same rows
historical_training = (
    spark.read.format("delta")
    .option("versionAsOf", current_snapshot)  # the same version logged in MLflow
    .table("production.features.churn_features")
    .filter("event_date BETWEEN '2023-01-01' AND '2024-01-01'")
)
```
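
Logging the version number tells you which rows to reload; a content fingerprint lets you confirm that the reload actually returned them. The following is a minimal sketch, assuming the rows fit in driver memory (or that you fingerprint a fixed sample). dataset_fingerprint is a hypothetical helper; you would log its value alongside training_data_version at training time and compare it after reloading:

```python
import hashlib
from typing import Iterable, Sequence

def dataset_fingerprint(rows: Iterable[Sequence[object]]) -> str:
    """Order-independent SHA-256 fingerprint of a collection of rows (duplicates count)."""
    row_digests = sorted(
        hashlib.sha256(repr(tuple(row)).encode("utf-8")).hexdigest() for row in rows
    )
    combined = hashlib.sha256()
    for digest in row_digests:
        combined.update(digest.encode("ascii"))
    return combined.hexdigest()

rows_run_1 = [("u1", 12, 0.5), ("u2", 3, 0.0)]
rows_run_2 = [("u2", 3, 0.0), ("u1", 12, 0.5)]  # same rows, different order
print(dataset_fingerprint(rows_run_1) == dataset_fingerprint(rows_run_2))  # True
```

Sorting the per-row hashes before combining makes the result independent of row order, which Spark does not guarantee between reads, while missing or duplicated rows still change it. Both sides must produce rows with the same column order and value types.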

Streaming Features on the Lakehouse

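For features that must be near-real-time (computed within seconds or minutes of the triggering event), the architecture is: Kafka → Flink → Iceberg table → Spark Structured Streaming reader. Spark's Iceberg streaming source processes only append snapshots (overwrite and delete snapshots raise an error unless explicitly skipped), so the Flink sink should append rather than upsert. That fits this workload: a tumbling-window aggregate emits each window's result once, after the watermark passes the window end.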

```python
# Flink writes streaming features to an Iceberg table
# (statements to run in a Flink SQL session)

flink_streaming_sql = """
CREATE TABLE kafka_transactions (
    txn_id STRING,
    user_token STRING,
    amount DOUBLE,
    merchant_id STRING,
    status STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'payment-events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'realtime-feature-job',
    'format' = 'json'
);

-- Maps to the Glue table ml_features.realtime_user_features that Spark reads below.
-- Append-only: each tumbling window is emitted once, so no upserts are needed.
CREATE TABLE iceberg_realtime_features (
    user_token STRING,
    window_end TIMESTAMP(3),
    txn_count_5min BIGINT,
    total_amount_5min DOUBLE,
    unique_merchants_5min BIGINT
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'glue',
    'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
    'warehouse' = 's3://my-lakehouse/iceberg',
    'catalog-database' = 'ml_features',
    'catalog-table' = 'realtime_user_features'
);

-- 5-minute tumbling windows over payment events
INSERT INTO iceberg_realtime_features
SELECT
    user_token,
    TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
    COUNT(txn_id) AS txn_count_5min,
    SUM(amount) AS total_amount_5min,
    COUNT(DISTINCT merchant_id) AS unique_merchants_5min
FROM kafka_transactions
GROUP BY user_token, TUMBLE(event_time, INTERVAL '5' MINUTE);
"""

import mlflow.sklearn
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Spark Structured Streaming reads the Iceberg table
# - processes new append snapshots as they appear (committed by Flink every checkpoint)
realtime_features = (
    spark.readStream
    .format("iceberg")
    .option("stream-from-timestamp", "1704067200000")  # epoch millis for 2024-01-01T00:00:00Z
    .load("production.ml_features.realtime_user_features")
)

FEATURE_COLS = ["txn_count_5min", "total_amount_5min",
                "txn_count_30d", "decline_rate_30d", "avg_amount_7d"]

# Load the model once on the driver, not once per micro-batch. The sklearn flavor
# exposes predict_proba; pyfunc predict() on a classifier returns class labels.
# Assumes the registered model was trained on FEATURE_COLS.
model = mlflow.sklearn.load_model("models:/churn-prediction-gbm/Production")
model_broadcast = spark.sparkContext.broadcast(model)

@F.pandas_udf("double")
def predict_churn(features: pd.DataFrame) -> pd.Series:
    """Vectorized scoring: the struct of feature columns arrives as a pandas DataFrame."""
    X = features[FEATURE_COLS].fillna(0)  # left-join misses become 0
    return pd.Series(model_broadcast.value.predict_proba(X)[:, 1])

# Join real-time features with batch features for online scoring
def score_batch(batch_df, batch_id):
    # Today's batch features, re-read per micro-batch; broadcasting assumes
    # this slice fits in executor memory
    batch_features = (
        spark.read.format("iceberg")
        .load("production.ml_features.user_transaction_features")
        .filter("feature_date = current_date()")
        .select("user_token", "txn_count_30d", "decline_rate_30d", "avg_amount_7d")
    )

    combined = batch_df.join(F.broadcast(batch_features), on="user_token", how="left")

    scored = combined.withColumn(
        "churn_probability",
        predict_churn(F.struct(*[F.col(c) for c in FEATURE_COLS])),
    )

    # Write predictions back to the lakehouse
    (scored.select("user_token", "churn_probability", "window_end")
        .writeTo("production.ml_predictions.realtime_churn_scores")
        .append())

query = (
    realtime_features.writeStream
    .foreachBatch(score_batch)
    .option("checkpointLocation", "s3://my-lakehouse/checkpoints/realtime-scoring")
    .trigger(processingTime="1 minute")
    .start()
)
```
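
The Flink query above assigns each event to an epoch-aligned 5-minute tumbling window. A simplified batch sketch of that windowing arithmetic follows; it ignores watermarks and late events, and tumble_end and aggregate_5min are hypothetical names used only to illustrate what each output row contains:

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)

def tumble_end(ts: datetime, size: timedelta = WINDOW) -> datetime:
    """End of the epoch-aligned tumbling window [end - size, end) that contains ts."""
    offset = (ts - EPOCH) % size
    return ts - offset + size

def aggregate_5min(events: List[Tuple[str, datetime, float, str]]) -> Dict[Tuple[str, datetime], dict]:
    """Per (user_token, window_end): txn count, total amount, distinct merchant count."""
    acc: Dict[Tuple[str, datetime], dict] = defaultdict(
        lambda: {"count": 0, "total": 0.0, "merchants": set()}
    )
    for user, ts, amount, merchant in events:
        bucket = acc[(user, tumble_end(ts))]
        bucket["count"] += 1
        bucket["total"] += amount
        bucket["merchants"].add(merchant)
    return {
        key: {
            "txn_count_5min": b["count"],
            "total_amount_5min": b["total"],
            "unique_merchants_5min": len(b["merchants"]),
        }
        for key, b in acc.items()
    }

sample_events = [
    ("u1", datetime(2024, 1, 1, 10, 1), 20.0, "m1"),
    ("u1", datetime(2024, 1, 1, 10, 4, 59), 5.0, "m2"),
    ("u1", datetime(2024, 1, 1, 10, 5), 1.0, "m1"),  # falls into the next window
    ("u2", datetime(2024, 1, 1, 10, 2), 3.0, "m1"),
]
for key, row in sorted(aggregate_5min(sample_events).items()):
    print(key, row)
```

Windows are half-open, so an event exactly at 10:05:00 belongs to the window ending at 10:10, not 10:05.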

Full End-to-End: Churn Model Training and Batch Scoring

This section brings together all the concepts into a single, complete workflow: training a churn model, versioning the training data, logging the experiment, and scoring all users with a batch job that writes predictions back to the lakehouse.

The Complete Batch Scoring Pipeline

```python
from datetime import date, timedelta

import mlflow
import mlflow.sklearn
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (
    SparkSession.builder
    .appName("ChurnBatchScoring")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)


# ── Step 1: Load the production model from MLflow ────────────────────────────

model_uri = "models:/churn-prediction-gbm/Production"
# The sklearn flavor exposes predict_proba; pyfunc predict() on a classifier
# returns class labels, not probabilities
model = mlflow.sklearn.load_model(model_uri)
model_version = mlflow.MlflowClient().get_latest_versions(
    "churn-prediction-gbm", stages=["Production"]
)[0].version

print(f"Scoring with model version: {model_version}")


# ── Step 2: Load latest features from the lakehouse ──────────────────────────

scoring_date = date.today() - timedelta(days=1)  # Score on yesterday's features

features_df = (
    spark.read.format("iceberg")
    .load("production.ml_features.churn_features")
    .filter(F.col("feature_date") == str(scoring_date))
    .select(
        "user_token",
        "txn_count_7d",
        "txn_count_30d",
        "txn_count_90d",
        "avg_amount_7d",
        "avg_amount_30d",
        "decline_rate_30d",
        "days_since_last_txn",
        "unique_merchants_90d",
    )
)

feature_count = features_df.count()
print(f"Scoring {feature_count:,} users for {scoring_date}")


# ── Step 3: Score in batches using a pandas UDF (Arrow-based, vectorized) ────

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

# Assumes the registered model was trained on exactly these columns
feature_cols = [
    "txn_count_7d", "txn_count_30d", "txn_count_90d",
    "avg_amount_7d", "avg_amount_30d", "decline_rate_30d",
    "days_since_last_txn", "unique_merchants_90d",
]

# Broadcast the model to all executors
model_broadcast = spark.sparkContext.broadcast(model)

@pandas_udf(DoubleType())
def score_users(features: pd.DataFrame) -> pd.Series:
    """Vectorized scoring UDF; the struct of feature columns arrives as a DataFrame."""
    features = features[feature_cols].fillna(0)
    # Probability of the positive (churn) class
    return pd.Series(model_broadcast.value.predict_proba(features)[:, 1])


predictions = (
    features_df
    .withColumn("churn_probability",
                score_users(F.struct(*[F.col(c) for c in feature_cols])))
    .withColumn("churn_score_date", F.lit(scoring_date))  # DATE-typed column
    .withColumn("model_version", F.lit(str(model_version)))
    .withColumn(
        "score_bucket",
        F.when(F.col("churn_probability") >= 0.8, "high")
        .when(F.col("churn_probability") >= 0.5, "medium")
        .otherwise("low"),
    )
    # Cached: the write, the audit metrics, and the Redis export all reuse it
    .cache()
)


# ── Step 4: Write predictions back to the lakehouse ──────────────────────────

# Replaces only the churn_score_date partition being written
# (the table must already exist, partitioned by churn_score_date)
predictions.writeTo("production.ml_predictions.churn_scores").overwritePartitions()

# Log scoring run to MLflow for audit
with mlflow.start_run(run_name=f"batch-scoring-{scoring_date}"):
    mlflow.log_params({
        "scoring_date": str(scoring_date),
        "model_version": model_version,
        "model_uri": model_uri,
    })
    mlflow.log_metrics({
        "scored_user_count": feature_count,
        "high_risk_count": predictions.filter("score_bucket = 'high'").count(),
        "avg_churn_probability": predictions.agg(
            F.avg("churn_probability")
        ).first()[0],
    })


# ── Step 5: Materialize high-risk predictions to Redis for real-time serving ─

import redis

r = redis.Redis(host="redis", port=6379, decode_responses=True)

# collect() brings every high-risk row to the driver; fine while that set stays modest
high_risk = (
    predictions.filter(F.col("score_bucket") == "high")
    .select("user_token", "churn_probability")
    .collect()
)

pipe = r.pipeline(transaction=False)
for row in high_risk:
    pipe.setex(
        f"churn:{row['user_token']}",
        86400,  # TTL: 24 hours
        str(round(row["churn_probability"], 4)),
    )
pipe.execute()

print(f"Materialized {len(high_risk):,} high-risk scores to Redis")
print(f"Batch scoring complete for {scoring_date}")
```

Model Serving from the Lakehouse

Serving Architecture

The lakehouse serves ML predictions via two paths:

  1. Batch predictions in Iceberg (freshness: hours): Predictions are computed nightly by the batch scoring job, written to an Iceberg table, and read via Trino or DuckDB. Used for dashboards, reporting, and non-time-sensitive decisions (send a discount email tomorrow).

  2. Redis materialization (latency: milliseconds): High-risk or high-priority predictions from the batch job are materialized to Redis. The serving API reads Redis first (fast path) and falls back to Iceberg (slow path) for cache misses.

```python
# FastAPI serving endpoint - demonstrates the two-path serving model
from functools import lru_cache

import duckdb
import redis
from fastapi import FastAPI

app = FastAPI()
r = redis.Redis(host="redis", port=6379, decode_responses=True)

@lru_cache(maxsize=1)
def get_duckdb_connection():
    """Shared DuckDB connection for Iceberg fallback reads (S3 credentials must also be configured)."""
    con = duckdb.connect()
    con.execute("INSTALL iceberg; LOAD iceberg; INSTALL httpfs; LOAD httpfs;")
    con.execute("SET s3_region='us-east-1';")
    return con

# Declared with `def`, not `async def`: the Redis and DuckDB clients block, so
# FastAPI runs this handler in its thread pool instead of stalling the event loop.
@app.get("/churn-score/{user_token}")
def get_churn_score(user_token: str):
    """
    Return churn probability for a user.
    Fast path: Redis (target < 5ms)
    Slow path: DuckDB → Iceberg (target < 500ms)
    """

    # Fast path: check Redis
    cached = r.get(f"churn:{user_token}")
    if cached is not None:
        return {
            "user_token": user_token,
            "churn_probability": float(cached),
            "source": "cache",
            "latency_path": "redis"
        }

    # Slow path: query Iceberg via DuckDB. cursor() gives each request its own
    # handle, since one DuckDB connection should not be shared across threads.
    # iceberg_scan() on a table directory assumes DuckDB can find the current
    # metadata there; for catalog-managed tables, pass the metadata JSON path instead.
    cur = get_duckdb_connection().cursor()
    result = cur.execute("""
        SELECT churn_probability, model_version, churn_score_date
        FROM iceberg_scan('s3://my-lakehouse/iceberg/ml_predictions/churn_scores')
        WHERE user_token = ?
          AND churn_score_date = current_date - INTERVAL 1 DAY
        LIMIT 1
    """, [user_token]).fetchone()

    if result is None:
        # User has no recent score - return default
        return {
            "user_token": user_token,
            "churn_probability": 0.05,  # prior / default
            "source": "default",
            "latency_path": "no_score_found"
        }

    churn_prob, model_version, score_date = result
    return {
        "user_token": user_token,
        "churn_probability": float(churn_prob),
        "model_version": model_version,
        "score_date": str(score_date),
        "source": "lakehouse",
        "latency_path": "duckdb_iceberg"
    }
```
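
The decision logic of this endpoint can be pulled out of the FastAPI handler so it is testable without Redis or S3. The following is a minimal sketch: lookup_churn_score is a hypothetical helper, and plain callables stand in for the Redis read and the DuckDB/Iceberg query:

```python
from typing import Callable, Optional, Tuple

DEFAULT_CHURN_PROBABILITY = 0.05  # same prior the endpoint returns when no score exists

def lookup_churn_score(
    user_token: str,
    cache_get: Callable[[str], Optional[str]],
    lakehouse_get: Callable[[str], Optional[float]],
) -> Tuple[float, str]:
    """Return (churn_probability, latency_path): cache first, then lakehouse, then prior."""
    cached = cache_get(f"churn:{user_token}")
    if cached is not None:
        return float(cached), "redis"
    score = lakehouse_get(user_token)
    if score is not None:
        return float(score), "duckdb_iceberg"
    return DEFAULT_CHURN_PROBABILITY, "no_score_found"

# Dicts stand in for Redis and for the Iceberg predictions table
cache = {"churn:u1": "0.91"}
lakehouse = {"u2": 0.4}
print(lookup_churn_score("u1", cache.get, lakehouse.get))  # (0.91, 'redis')
print(lookup_churn_score("u3", cache.get, lakehouse.get))  # (0.05, 'no_score_found')
```

In the endpoint, cache_get would be r.get and lakehouse_get a small wrapper around the DuckDB query; tests can pass dict.get instead.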

Data Versioning: Lakehouse as DVC Alternative

Data Version Control (DVC) was created to solve the ML data versioning problem: Git tracks code changes, but committing multi-GB datasets to Git is impractical. DVC tracks dataset files separately, storing them in a remote (S3, GCS) and keeping pointers in Git.

A lakehouse with Iceberg or Delta makes DVC unnecessary for most use cases:

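| DVC Capability | Lakehouse Equivalent |
|---|---|
| Version a dataset | Each Iceberg write creates a new immutable snapshot |
| Pin a specific version | snapshot_id=8473920183740928374 in scan options |
| Tag a version | Add a table property: ALTER TABLE ... SET TBLPROPERTIES ('training-v3.2' = '8473920183740928374'), or create an Iceberg tag: ALTER TABLE ... CREATE TAG training_v3_2 AS OF VERSION 8473920183740928374 (a tag also protects the snapshot from expiration) |
| Roll back to previous version | Iceberg: spark.sql("CALL catalog_name.system.rollback_to_snapshot('db.table', snapshot_id)"); Delta: spark.sql("RESTORE TABLE ... TO VERSION AS OF 5") |
| Compare two versions | Read each snapshot in Spark (spark.read.option("snapshot-id", v1).table(...)) and diff with df1.exceptAll(df2) |
| Push/pull large files | Not needed - data lives in S3 behind the table format |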

```python
from datetime import datetime, timezone

from pyiceberg.catalog import load_catalog

catalog = load_catalog("glue", **{"type": "glue", "region_name": "us-east-1"})
table = catalog.load_table("production.features.churn_features")

# Inspect all historical snapshots - roughly the history you would reconstruct
# from `git log` on DVC's .dvc pointer files
for snapshot in table.snapshots():
    summary = snapshot.summary
    committed_at = datetime.fromtimestamp(snapshot.timestamp_ms / 1000, tz=timezone.utc)
    print(f"Snapshot: {snapshot.snapshot_id}")
    print(f"  Timestamp: {committed_at.isoformat()}")
    print(f"  Operation: {summary.operation.value if summary else 'unknown'}")
    print(f"  Added rows: {(summary.get('added-records') if summary else None) or 0}")
    print()

# Tag a snapshot as the official training dataset for model v3.2.
# Table properties are queryable metadata, but they do not stop snapshot
# expiration: retention must still cover this snapshot.
with table.transaction() as transaction:
    transaction.set_properties({
        "training-dataset.v3.2.snapshot_id": str(8473920183740928374),
        "training-dataset.v3.2.created_at": "2024-01-15T10:30:00Z",
        "training-dataset.v3.2.row_count": "1450000",
    })

# Later - reload the table and retrieve the tagged snapshot
table = catalog.load_table("production.features.churn_features")
v32_snapshot_id = int(table.properties["training-dataset.v3.2.snapshot_id"])

# Load exactly the same data
training_data_v32 = table.scan(snapshot_id=v32_snapshot_id).to_pandas()
```
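
For tables small enough to pull into memory (or a sample of a larger one), the version comparison in the table above can be done without Spark. The following sketch, with diff_snapshots as a hypothetical helper, computes the same result as exceptAll in both directions, counting duplicate rows:

```python
from collections import Counter
from typing import Hashable, Iterable, List, Tuple

def diff_snapshots(
    old_rows: Iterable[Hashable], new_rows: Iterable[Hashable]
) -> Tuple[List[Hashable], List[Hashable]]:
    """Return (added, removed) rows between two snapshots; duplicate rows are counted."""
    old_counts, new_counts = Counter(old_rows), Counter(new_rows)
    added = list((new_counts - old_counts).elements())    # like new.exceptAll(old)
    removed = list((old_counts - new_counts).elements())  # like old.exceptAll(new)
    return added, removed

v1 = [("u1", 0), ("u2", 1)]
v2 = [("u2", 1), ("u3", 0)]
added, removed = diff_snapshots(v1, v2)
print(added)    # [('u3', 0)]
print(removed)  # [('u1', 0)]
```

Rows from table.scan(snapshot_id=...).to_arrow().to_pylist() are dicts, which are not hashable; convert each one with tuple(row.values()) before diffing.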

Production Engineering Notes

Avoiding Training-Serving Skew

Training-serving skew is the silent killer of ML production systems. It occurs when the features used for training are computed differently from the features used at serving time.

```python
import pandas as pd

# BAD: Two different feature computation paths
# Training:
avg_amount = training_df["amount"].rolling(window=30).mean()  # last 30 *rows*, not 30 days

# Serving (r is a redis.Redis client):
avg_amount = r.hget(f"user:{user_id}", "avg_amount_30d")  # precomputed by a different job

# GOOD: One feature computation function, used in both training and serving
def compute_avg_amount_30d(events_df: pd.DataFrame) -> pd.Series:
    """
    Canonical feature computation - used for both training and serving.
    Training: called on historical Iceberg data.
    Serving: called on Redis-cached event history.
    Expects columns user_id, event_time (datetime64), amount; returns a Series
    indexed by (user_id, event_time).
    """
    return (
        events_df
        .sort_values("event_time")
        .set_index("event_time")       # time-based windows need a DatetimeIndex
        .groupby("user_id")["amount"]
        .rolling("30D")                # trailing 30 days per user, current event included
        .mean()
    )
```

The lakehouse architecture supports this by making the same feature computation logic (dbt SQL or Spark job) run in both the training pipeline (on historical data) and the batch scoring pipeline (on current data). One definition, no drift.
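
The principle is easier to see with a single scalar feature. In the following sketch (avg_amount_30d and the sample events are hypothetical), one function defines the feature; the training path calls it on the full event history, and the serving path calls it on a cache that holds only recent events. As long as the cache covers the window, both paths produce the same value:

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Event = Tuple[datetime, float]  # (event_time, amount)

def avg_amount_30d(events: List[Event], as_of: datetime) -> float:
    """Canonical feature: mean amount over the 30 days ending at as_of (inclusive)."""
    start = as_of - timedelta(days=30)
    amounts = [amount for ts, amount in events if start < ts <= as_of]
    return sum(amounts) / len(amounts) if amounts else 0.0

full_history = [
    (datetime(2023, 12, 1), 500.0),
    (datetime(2024, 1, 10), 30.0),
    (datetime(2024, 1, 25), 50.0),
]
as_of = datetime(2024, 1, 31)

# Training path: full historical event log (e.g. read from an Iceberg snapshot)
training_value = avg_amount_30d(full_history, as_of)

# Serving path: a recent-events cache that only keeps the last 30 days
recent_cache = [e for e in full_history if e[0] > as_of - timedelta(days=30)]
serving_value = avg_amount_30d(recent_cache, as_of)

print(training_value, serving_value)  # 40.0 40.0
```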

Efficient Large-Scale Reads for Training

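Loading 100M rows for training through a single pandas DataFrame is usually impractical, because the full Arrow table and its pandas copy sit in memory at the same time. Streaming Arrow record batches converts only one batch at a time; the concatenated arrays at the end must still fit in memory: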

```python
import numpy as np
from pyiceberg.catalog import load_catalog

catalog = load_catalog("glue", **{"type": "glue", "region_name": "us-east-1"})
table = catalog.load_table("production.features.churn_features")

# Stream data in Arrow RecordBatches - only one batch is converted to pandas at a time
scan = table.scan(
    row_filter="event_date >= '2023-01-01'",
    selected_fields=("user_token", "txn_count_30d", "decline_rate_30d", "churn_label"),
    snapshot_id=8473920183740928374,
)

X_batches = []
y_batches = []

for record_batch in scan.to_arrow_batch_reader():
    batch_df = record_batch.to_pandas()
    X_batches.append(
        batch_df.drop(columns=["user_token", "churn_label"]).to_numpy(dtype=np.float32)
    )
    y_batches.append(batch_df["churn_label"].to_numpy())

# The concatenated arrays must still fit in memory
X = np.concatenate(X_batches, axis=0)
y = np.concatenate(y_batches, axis=0)

print(f"Loaded {len(X):,} training examples with {X.shape[1]} features")
# Now X and y are numpy arrays - pass directly to sklearn, XGBoost, or PyTorch
```
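
When even the concatenated arrays are too large, quantities such as standardization parameters can be accumulated batch by batch instead. The following is a minimal pure-Python sketch using Chan et al.'s pairwise update for per-column mean and population standard deviation. streaming_mean_std is a hypothetical helper, and each batch is assumed to be a list of numeric rows (for example, one record batch's feature columns converted with .to_numpy().tolist()):

```python
import math
from typing import Iterable, List, Sequence, Tuple

def streaming_mean_std(
    batches: Iterable[Sequence[Sequence[float]]],
) -> Tuple[List[float], List[float]]:
    """Per-column mean and population std, merged one batch at a time (Chan et al.)."""
    n = 0
    mean: List[float] = []
    m2: List[float] = []  # running sum of squared deviations from the mean
    for batch in batches:
        if not batch:
            continue
        nb = len(batch)
        columns = list(zip(*batch))
        if not mean:
            mean = [0.0] * len(columns)
            m2 = [0.0] * len(columns)
        total = n + nb
        for j, col in enumerate(columns):
            batch_mean = sum(col) / nb
            batch_m2 = sum((x - batch_mean) ** 2 for x in col)
            delta = batch_mean - mean[j]
            mean[j] += delta * nb / total
            m2[j] += batch_m2 + delta ** 2 * n * nb / total
        n = total
    std = [math.sqrt(v / n) for v in m2] if n else []
    return mean, std

batches = [[[1.0, 10.0], [2.0, 20.0]], [[3.0, 30.0], [4.0, 40.0]]]
mean, std = streaming_mean_std(batches)
print(mean)  # [2.5, 25.0]
```

A NumPy version would vectorize the per-column loops; the merge formula stays the same, and only one batch needs to be in memory at a time.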

:::danger Training on the Latest Snapshot Without Pinning Is a Reproducibility Violation If your training job loads features from an Iceberg table without pinning a snapshot ID, the training data changes every time new features are ingested. Running the same training job on Monday and Tuesday will produce different models even with identical hyperparameters - because the feature table has been updated between runs. Always capture the current snapshot ID before loading training data, log it to MLflow, and use snapshot_id= in all subsequent loads of that training run. This is not optional for production ML systems. :::

:::warning Feature Store and Lakehouse Are Not the Same Thing Feast, Tecton, and Hopsworks are feature stores - systems that manage feature computation, serving, and monitoring. A lakehouse is not a feature store replacement. A lakehouse is the ideal offline store backend for a feature store (Feast can read from Iceberg as its offline store), but it does not handle online serving (sub-millisecond Redis lookups), point-in-time correct joins (complex for time-series features), or feature monitoring. Use the lakehouse as the persistent, versioned store for features. Use a feature store layer (or Redis + batch materialization) for online serving. They are complementary, not competing. :::
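
The warning above mentions point-in-time correct joins. The core idea is an as-of join: each label row gets the latest feature value known at or before the label's date, never a later one, so the model does not train on information from the future. A minimal sketch with hypothetical names, assuming one feature value per (user, date):

```python
from bisect import bisect_right
from collections import defaultdict
from datetime import date
from typing import Dict, List, Optional, Tuple

def point_in_time_join(
    labels: List[Tuple[str, date]],
    features: List[Tuple[str, date, float]],
) -> List[Tuple[str, date, Optional[float]]]:
    """Attach to each (user, label_date) the latest feature value with feature_date <= label_date."""
    history: Dict[str, List[Tuple[date, float]]] = defaultdict(list)
    for user, feature_date, value in features:
        history[user].append((feature_date, value))
    dates: Dict[str, List[date]] = {}
    for user, rows in history.items():
        rows.sort()
        dates[user] = [d for d, _ in rows]

    joined = []
    for user, label_date in labels:
        i = bisect_right(dates.get(user, []), label_date)  # rows with feature_date <= label_date
        value = history[user][i - 1][1] if i > 0 else None
        joined.append((user, label_date, value))
    return joined

features = [("u1", date(2024, 1, 1), 0.1), ("u1", date(2024, 1, 10), 0.5)]
labels = [("u1", date(2024, 1, 5)), ("u1", date(2024, 1, 10)), ("u2", date(2024, 1, 5))]
print(point_in_time_join(labels, features))
```

Whether the comparison should be <= or < depends on when a feature_date's value becomes available; a feature computed at the end of the day usually calls for the strict version.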

:::tip Use Iceberg Snapshots as Your ML Data Version Control System Before your team invests in DVC or a custom data versioning solution, evaluate whether Iceberg's built-in snapshot model meets your needs. For most teams, it does: snapshots are immutable, indexed by timestamp and ID, can be given named tags (Iceberg snapshot references), and support instant rollback. DVC adds value for teams with a strong Git-centric workflow where all ML artifacts (data, models, code) must be versioned together in one system. But if your data already lives in a lakehouse, adding DVC creates a parallel versioning system with extra complexity and no additional data safety. :::
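The sketch below is a hypothetical model of the table metadata involved, not Iceberg's implementation: committed snapshot IDs, a map of tag names to snapshot IDs, and a "current" pointer. It illustrates why tagging and rollback are cheap: both only change metadata, and no data is copied. In Iceberg itself these operations are exposed through engines (in Spark, for example, `ALTER TABLE ... CREATE TAG` and the `rollback_to_snapshot` procedure). The tag name used here is made up.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SnapshotRefs:
    """Hypothetical model of table metadata: snapshot IDs, named tags, current pointer."""
    snapshot_ids: list = field(default_factory=list)  # committed, immutable snapshots
    tags: dict = field(default_factory=dict)          # tag name -> snapshot ID
    current_id: Optional[int] = None                  # what an unpinned read returns

    def commit(self, snapshot_id):
        self.snapshot_ids.append(snapshot_id)
        self.current_id = snapshot_id

    def tag(self, name, snapshot_id):
        if snapshot_id not in self.snapshot_ids:
            raise KeyError(f"unknown snapshot {snapshot_id}")
        self.tags[name] = snapshot_id

    def resolve(self, name):
        return self.tags[name]

    def rollback_to(self, snapshot_id):
        # Rollback only moves the metadata pointer; no data files are copied or rewritten.
        if snapshot_id not in self.snapshot_ids:
            raise KeyError(f"unknown snapshot {snapshot_id}")
        self.current_id = snapshot_id


refs = SnapshotRefs()
for sid in (101, 102, 103):
    refs.commit(sid)

refs.tag("churn-train-2024-q1", 102)  # hypothetical tag naming a training dataset
refs.rollback_to(refs.resolve("churn-train-2024-q1"))
print(refs.current_id)    # 102
print(refs.snapshot_ids)  # [101, 102, 103]: the newer snapshot is still there
```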

:::note The Feedback Loop: Write Predictions Back to the Lakehouse The most overlooked part of the ML lakehouse workflow is the feedback loop. Predictions written back to Iceberg become training data for the next generation of models. A churn model's predictions, combined with the actual churn outcome (observed 30 days later), become labeled training data. This feedback loop is only practical when predictions and outcomes live in the same system. The lakehouse closes this loop naturally - the batch scoring job writes predictions to Iceberg, the ground truth labeling job joins predictions against actual outcomes (also in Iceberg), and the labeled dataset is immediately available for the next training run. :::
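The ground-truth labeling step in this loop is essentially a join with a time window. Below is a minimal sketch over in-memory rows that stand in for the prediction and outcome tables. The row layout, the 30-day horizon default, and the `label_predictions` helper are illustrative assumptions. Predictions whose 30-day window has not closed yet are skipped, because their outcome is not observable.

```python
from datetime import date, timedelta

# Hypothetical rows standing in for two Iceberg tables after a batch scoring run.
predictions = [
    # (user_token, prediction_date, churn_probability, model_version)
    ("u1", date(2024, 1, 1), 0.82, "v3"),
    ("u2", date(2024, 1, 1), 0.10, "v3"),
    ("u3", date(2024, 1, 1), 0.55, "v3"),
]
churn_events = {"u1": date(2024, 1, 20), "u3": date(2024, 3, 5)}  # user_token -> churn date


def label_predictions(predictions, churn_events, as_of, horizon_days=30):
    """Join predictions with observed outcomes; skip rows whose label window is still open."""
    labeled = []
    for user, pred_date, prob, version in predictions:
        window_end = pred_date + timedelta(days=horizon_days)
        if window_end > as_of:
            continue  # outcome not observable yet
        churn_date = churn_events.get(user)
        churned = churn_date is not None and pred_date < churn_date <= window_end
        labeled.append((user, pred_date, prob, version, int(churned)))
    return labeled


rows = label_predictions(predictions, churn_events, as_of=date(2024, 2, 15))
print([(user, label) for user, *_, label in rows])  # [('u1', 1), ('u2', 0), ('u3', 0)]
```

User `u3` churned, but after the 30-day window, so that user is labeled 0 for this prediction.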


Interview Questions and Answers

Q1: How does a lakehouse enable reproducible ML training? What does "reproducibility" actually require in practice?

Answer:

Reproducibility in ML means: given the same model architecture, hyperparameters, and training data, produce the same model weights (within floating-point tolerance). In practice, this requires three guarantees:

1. Identical training data: If two training runs use different data - even if the difference is "just" new records ingested between runs - they are not reproducible. Iceberg and Delta solve this with immutable snapshots. A training job captures the snapshot ID before loading data and logs it to MLflow. Any future run that loads the same snapshot ID reads the exact same rows, even if the table has received millions of new records in the interim, as long as that snapshot has not been removed by routine table maintenance (Iceberg snapshot expiration, Delta VACUUM).

2. Identical preprocessing: Feature computation logic must be deterministic and version-controlled. If you compute avg_amount_30d differently in January vs. July, training is not reproducible even with the same snapshot. Version your feature computation code alongside your training code (Git), and run the feature computation pipeline with the historical feature date to reproduce historical feature values.

3. Identical random seeds and software versions: ML frameworks have randomness in weight initialization, mini-batch ordering, and data sampling. Pin random seeds and library versions in your training environment.

The lakehouse specifically solves requirement 1 - the hardest one to solve with traditional storage. With a plain S3 bucket full of Parquet files, there is no mechanism to "pin" the state of the data at a specific point in time. With Iceberg, you have a first-class API for it.
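One way to make all three requirements auditable is to record them together for every run. The sketch below is a hypothetical helper, not part of MLflow or Iceberg. It builds a run manifest from the snapshot ID, the code version, the seed, and the library versions, then hashes the manifest into a fingerprint. Two runs with the same fingerprint have the same recorded inputs. The commit hash, the library versions, and the second snapshot ID are made-up values.

```python
import hashlib
import json
import platform


def build_run_manifest(table_name, snapshot_id, git_commit, seed, library_versions):
    """Record the inputs a rerun needs; the fingerprint changes if any of them change."""
    manifest = {
        "table": table_name,
        "snapshot_id": snapshot_id,  # requirement 1: identical training data
        "git_commit": git_commit,    # requirement 2: identical preprocessing code
        "seed": seed,                # requirement 3: identical randomness...
        "python": platform.python_version(),
        "libraries": dict(sorted(library_versions.items())),  # ...and software versions
    }
    canonical = json.dumps(manifest, sort_keys=True).encode("utf-8")
    manifest["fingerprint"] = hashlib.sha256(canonical).hexdigest()[:16]
    return manifest


# Hypothetical values for illustration only.
libs = {"xgboost": "2.0.3", "pyiceberg": "0.7.1"}
run_a = build_run_manifest("ml_features.churn_features", 8473920183740928374, "abc1234", 42, libs)
run_b = build_run_manifest("ml_features.churn_features", 8473920183740928374, "abc1234", 42, libs)
run_c = build_run_manifest("ml_features.churn_features", 8473920183740928999, "abc1234", 42, libs)

print(run_a["fingerprint"] == run_b["fingerprint"])  # True: same inputs
print(run_a["fingerprint"] == run_c["fingerprint"])  # False: different snapshot
```

The manifest only records the seed. The training code still has to apply that seed to every random number generator it uses.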


Q2: What is training-serving skew, and how does the lakehouse architecture help prevent it?

Answer:

Training-serving skew is when the statistical distribution of features at training time differs from the distribution at serving time. It's one of the most common causes of ML production failures and one of the hardest to diagnose because it is silent - the model appears to work correctly during evaluation (on held-out training data) but performs poorly in production.

Root causes:

  • Different code paths for feature computation in training vs. serving
  • Training data preprocessing that uses global statistics (e.g., mean-normalization using the training set mean, but the serving pipeline uses a different mean)
  • Time-based features computed differently: training uses a fixed window, serving uses a rolling window that doesn't match
  • Missing value imputation strategies that differ between training and serving
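The global-statistics case is easy to reproduce. In the minimal sketch below, the helper names and the numbers are illustrative. The serving batch is normalized twice: once with statistics fitted on the training data and persisted, and once with statistics recomputed on the batch itself, which is the bug. The same raw values come out completely different.

```python
from statistics import mean, pstdev


def fit_scaler(values):
    """Compute normalization statistics once, on the training data."""
    return {"mean": mean(values), "std": pstdev(values)}


def apply_scaler(values, stats):
    """Apply previously fitted statistics; serving never refits them."""
    return [(v - stats["mean"]) / stats["std"] for v in values]


train_amounts = [10.0, 20.0, 30.0, 40.0]
serving_batch = [30.0, 50.0]

stats = fit_scaler(train_amounts)  # persist next to the model artifact
consistent = apply_scaler(serving_batch, stats)
skewed = apply_scaler(serving_batch, fit_scaler(serving_batch))  # the bug: batch-local stats

print([round(v, 3) for v in consistent])  # [0.447, 2.236]
print(skewed)                             # [-1.0, 1.0]
```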

How the lakehouse helps:

The lakehouse provides one canonical feature table (ml_features.churn_features) computed by one pipeline (a dbt model or Spark job with a single definition). Both the training pipeline and the batch scoring pipeline read from this same table:

  • Training: reads the feature table at snapshot X (historical period)
  • Batch scoring: reads the feature table at the latest snapshot (current period)

Because both read from the same table with the same schema and the same column definitions, the feature computation is identical - there is only one code path. The training pipeline cannot accidentally compute features differently from the serving pipeline if they're reading from the same Iceberg table.
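The same principle applies to the computation behind the table. In the sketch below, the feature has exactly one definition, and both the historical (training) rows and the current (scoring) rows come from it, so the calendar-window versus rolling-window mismatch from the opening story cannot arise. In the architecture described here, that single definition would live in a dbt model or Spark job. The Python function and its `[as_of - 30 days, as_of)` window convention are assumptions made for illustration.

```python
from datetime import date, timedelta


def avg_amount_30d(transactions, as_of):
    """Single shared definition: mean amount over the 30 days before as_of.

    Assumed convention for this sketch: rolling window [as_of - 30 days, as_of),
    so events on the as_of day itself are excluded.
    """
    start = as_of - timedelta(days=30)
    amounts = [amount for day, amount in transactions if start <= day < as_of]
    return sum(amounts) / len(amounts) if amounts else 0.0


transactions = [(date(2024, 1, 5), 100.0), (date(2024, 1, 20), 50.0), (date(2024, 2, 10), 30.0)]

# Training builds rows for historical dates and scoring builds rows for "today".
# Both call the same function, so the window definition cannot diverge.
training_value = avg_amount_30d(transactions, as_of=date(2024, 2, 1))
scoring_value = avg_amount_30d(transactions, as_of=date(2024, 2, 15))
print(training_value, scoring_value)  # 75.0 40.0
```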

The remaining risk is time-based features: a 30-day rolling average computed as of January 1st will be different from the same feature computed as of July 1st because the underlying events are different. This is expected - it's data drift, not code drift. Monitor it with feature distribution tracking (log feature statistics per training run and per scoring run, alert when distributions diverge).
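A common way to compare logged feature distributions between a training run and a scoring run is the Population Stability Index (PSI). PSI is a swapped-in, widely used drift metric; the text above does not prescribe it. The sketch below buckets both samples with fixed edges and sums `(actual - expected) * ln(actual / expected)` over the buckets. The bucket edges and the samples are illustrative. A frequently cited rule of thumb treats PSI above roughly 0.2 as significant drift, but that threshold is a convention, not a law.

```python
import math


def psi(expected, actual, edges):
    """Population Stability Index of one feature between two runs.

    edges: sorted bucket boundaries; value v falls in bucket i if edges[i] <= v < edges[i + 1].
    """
    def proportions(values):
        counts = [0] * (len(edges) - 1)
        for v in values:
            for i in range(len(edges) - 1):
                if edges[i] <= v < edges[i + 1]:
                    counts[i] += 1
                    break
        total = sum(counts)
        # Floor empty buckets at a tiny share so the logarithm stays defined.
        return [max(c / total, 1e-6) for c in counts]

    e, a = proportions(expected), proportions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


edges = [-math.inf, 10, 20, 30, math.inf]
training_run = [5] * 25 + [15] * 25 + [25] * 25 + [35] * 25
scoring_run = [5] * 5 + [15] * 15 + [25] * 30 + [35] * 50

print(round(psi(training_run, training_run, edges), 6))  # 0.0
drift = psi(training_run, scoring_run, edges)
print(round(drift, 3), drift > 0.2)  # 0.555 True
```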


Q3: How would you design the data pipeline for an ML model that needs both historical (batch) features and near-real-time (streaming) features?

Answer:

This is essentially a Lambda Architecture applied to features, with the lakehouse as the storage layer for both of its paths:

Batch path (for slowly-changing features):

  1. Raw events → Iceberg raw table (Spark/Flink ingestion, hourly or daily)
  2. Iceberg raw → dbt feature models → Iceberg feature table (daily dbt run)
  3. Training reads the feature table at a specific snapshot (versioned, reproducible)
  4. Batch scoring reads the latest feature table snapshot (current features for all users)

Batch features include: 30-day rolling averages, 90-day counts, account age, historical ratings. These change slowly enough that hourly or daily recomputation is acceptable.

Streaming path (for rapidly-changing features):

  1. Operational events → Kafka topic (real-time)
  2. Kafka → Flink → Iceberg merge-on-read (MOR) table (5-minute windows, committed every checkpoint)
  3. Spark Structured Streaming reads new Iceberg snapshots for near-real-time scoring
  4. High-risk predictions materialized to Redis for API serving

Streaming features include: transaction count in the last 5 minutes, current session activity, recent declined attempts, real-time merchant category.
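To show what "transaction count in the last 5 minutes" means mechanically, here is a minimal in-memory sliding-window counter. It is a sketch under simplifying assumptions: a single process, and events arriving in timestamp order for each key. A Flink job would hold the equivalent state in keyed state and would also have to handle late, out-of-order events. The class name is hypothetical.

```python
from collections import deque


class SlidingWindowCounter:
    """Per-key event count over the last `window_s` seconds (in-memory sketch).

    Assumes events for a key arrive in timestamp order; a real Flink job would
    keep this in keyed state and handle late, out-of-order events.
    """

    def __init__(self, window_s=300):
        self.window_s = window_s
        self.events = {}  # key -> deque of event timestamps in seconds

    def add(self, key, ts):
        self.events.setdefault(key, deque()).append(ts)

    def count(self, key, now):
        q = self.events.get(key)
        if q is None:
            return 0
        # Evict events that fell out of the window (now - window_s, now].
        while q and q[0] <= now - self.window_s:
            q.popleft()
        return len(q)


counter = SlidingWindowCounter(window_s=300)
for ts in (0, 100, 250, 320):
    counter.add("u1", ts)

print(counter.count("u1", now=330))  # 3: events at 100, 250, 320
print(counter.count("u1", now=600))  # 1: only the event at 320
```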

At training time: replay the streaming path using historical Kafka events (Kafka retains events for configurable retention periods) or read historical Iceberg snapshots that were written by the Flink job. This allows training on the combined batch + streaming feature set.

The join at serving time: the serving API (or batch scoring job) reads batch features from Iceberg and streaming features from Redis (or from the Iceberg MOR table if Redis misses), joins them by user token, and scores. The join logic is the same in both training and serving - this is where the lakehouse architecture prevents skew by making the feature tables the single source of truth.
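The serving-time join can be written once and reused by both training-set assembly and scoring. In the minimal sketch below, plain dicts stand in for the Iceberg reads and for Redis, with a missing key modeling a cache miss. The function name and the feature column names are hypothetical.

```python
def assemble_features(user_token, batch_features, redis_cache, mor_fallback):
    """Combine batch and streaming features for one user with one shared join rule.

    batch_features and mor_fallback are dicts standing in for Iceberg reads;
    redis_cache is a dict standing in for Redis, where a missing key is a cache miss.
    """
    batch = batch_features.get(user_token, {})
    streaming = redis_cache.get(user_token)
    if streaming is None:  # Redis miss: fall back to the Iceberg MOR table
        streaming = mor_fallback.get(user_token, {})
    # Streaming columns win on a name clash; in practice the two sets should not overlap.
    return {"user_token": user_token, **batch, **streaming}


batch = {"u1": {"avg_amount_30d": 42.5, "account_age_days": 400}}
redis = {}  # cold cache
mor = {"u1": {"txn_count_5m": 3}}

row = assemble_features("u1", batch, redis, mor)
print(row)
# {'user_token': 'u1', 'avg_amount_30d': 42.5, 'account_age_days': 400, 'txn_count_5m': 3}
```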


Q4: Why would you store model predictions back in the lakehouse instead of in a separate database?

Answer:

Storing predictions in the lakehouse (Iceberg) rather than a separate database (PostgreSQL, MongoDB) has several advantages:

1. Analytical queries on predictions: "What percentage of users scored high risk last month? How did the distribution change after we retrained the model?" These queries are trivial in Trino or DuckDB against an Iceberg prediction table. Against a transactional database, they turn into large scans and aggregations that compete with the serving workload and often degrade its performance at scale.

2. Joining predictions with features and outcomes for model evaluation: The most important post-deployment activity is evaluating whether the model's predictions were correct. This requires joining: prediction table (user_token, churn_probability, model_version) + outcome table (user_token, actually_churned, churn_date) + feature table (user_token, feature_values_at_prediction_time). All three are Iceberg tables; joining them in Spark or Trino is a single SQL query. If predictions were in PostgreSQL and the rest in the lakehouse, this join would require ETL.

3. The feedback loop: Labeled data (prediction + ground truth) becomes the training data for the next model generation. If predictions are already in Iceberg, the labeling job is a simple join. If predictions are in a separate database, you first export them to the lakehouse, then join.

4. Time travel for retrospective analysis: "Show me every prediction we made for user X over the last 6 months, with the features that produced each prediction." With Iceberg time travel on the prediction table, this is trivial. With a transactional database that overwrites old predictions, this requires separate audit logging.
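Point 2 is easiest to see as the query itself. The sketch below uses Python's built-in `sqlite3` as a stand-in for Trino or DuckDB so that it runs anywhere. The SQL shape is what carries over to Iceberg tables. The table and column names follow the description above, while the rows, the dates, and the 0.5 decision threshold are made-up assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE predictions (user_token TEXT, prediction_date TEXT, churn_probability REAL, model_version TEXT);
CREATE TABLE outcomes (user_token TEXT, actually_churned INTEGER);
CREATE TABLE features (user_token TEXT, feature_date TEXT, txn_count_30d INTEGER);

INSERT INTO predictions VALUES
  ('u1', '2024-01-01', 0.90, 'v3'), ('u2', '2024-01-01', 0.20, 'v3'), ('u3', '2024-01-01', 0.70, 'v3');
INSERT INTO outcomes VALUES ('u1', 1), ('u2', 0), ('u3', 0);
INSERT INTO features VALUES ('u1', '2024-01-01', 2), ('u2', '2024-01-01', 14), ('u3', '2024-01-01', 9);
""")

# One query joins predictions, outcomes, and the features used at prediction time.
rows = conn.execute("""
SELECT p.model_version,
       COUNT(*) AS n,
       AVG(CASE WHEN (p.churn_probability >= 0.5) = o.actually_churned THEN 1.0 ELSE 0.0 END) AS accuracy,
       AVG(f.txn_count_30d) AS avg_txn_count_30d
FROM predictions p
JOIN outcomes o ON o.user_token = p.user_token
JOIN features f ON f.user_token = p.user_token AND f.feature_date = p.prediction_date
GROUP BY p.model_version
""").fetchall()

print(rows)  # one row for model v3: 3 predictions, accuracy 2/3
```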

The pattern is: use Redis for the low-latency serving path (cache, fast lookups), but always write predictions to Iceberg as the system of record. Redis is ephemeral and optimized for point lookups; the lakehouse is persistent, queryable, and joinable.


Q5: How do you handle point-in-time correct feature joins for ML training, and why does this matter?

Answer:

Point-in-time correctness is the requirement that, when creating a training dataset, each training example only uses features that were available at the time of the event - not features from the future.

The problem without point-in-time correctness (label leakage):

Imagine training a churn model. For each user observation on day D, you want to use the user's 30-day transaction count as a feature. If you naively join the user's current 30-day count (computed at training time), you're using future transactions for past observations:

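User A:
Observation date: 2024-01-15 (did they churn in the next 30 days?)
Features joined at 2024-03-01 (when the training job ran):
txn_count_30d = 15 ← counts transactions from roughly Jan 31 to Mar 1, all after the observation date;
about half of that window (Jan 31 to Feb 14) falls inside the churn label window (Jan 16 to Feb 14)
The model "sees the future" during training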

The correct approach - point-in-time join using Iceberg time travel:

from pyiceberg.catalog import load_catalog
import pandas as pd

catalog = load_catalog("glue", **{"type": "glue", "region_name": "us-east-1"})
feature_table = catalog.load_table("production.ml_features.user_features")

# One row per (user, observation date) with the churn label
observations = pd.DataFrame({
    "user_token": ["u1", "u2", "u3"],
    "observation_date": ["2024-01-15", "2024-01-20", "2024-02-01"],
    "churn_label": [1, 0, 1],
})

point_in_time_features = []

for obs_date in observations["observation_date"].unique():
    # Find the latest snapshot committed at or before the observation date (UTC midnight)
    obs_timestamp_ms = int(pd.Timestamp(obs_date).timestamp() * 1000)
    snapshot_at_obs_date = max(
        (s for s in feature_table.snapshots() if s.timestamp_ms <= obs_timestamp_ms),
        key=lambda s: s.timestamp_ms,
        default=None,
    )
    if snapshot_at_obs_date is None:
        continue  # no feature data existed yet; these observations get NaN features

    # Read the table as it existed then, keeping only features computed BEFORE obs_date
    features_at_obs = feature_table.scan(
        snapshot_id=snapshot_at_obs_date.snapshot_id,
        row_filter=f"feature_date < '{obs_date}'",
    ).to_arrow().to_pandas()

    # Keep each user's most recent feature row from before the observation date
    features_at_obs = (
        features_at_obs.sort_values("feature_date")
        .drop_duplicates(subset="user_token", keep="last")
    )
    features_at_obs["observation_date"] = obs_date
    point_in_time_features.append(features_at_obs)

# Join observations with their point-in-time correct features
pit_features_df = pd.concat(point_in_time_features, ignore_index=True)
training_data = observations.merge(
    pit_features_df,
    on=["user_token", "observation_date"],
    how="left",
)

Feature stores (Feast, Tecton) automate this point-in-time join. When using a lakehouse directly, you must implement it yourself using snapshot time travel. The Iceberg snapshot model makes this possible - without it (raw Parquet, Snowflake without time travel), point-in-time correctness requires maintaining separate historical copies of each feature table, which is extremely expensive to store and manage.
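Underneath, a point-in-time join is an "as-of" join: for each observation, take the latest feature row dated strictly before the observation date. The sketch below shows that core logic over in-memory rows using `bisect`. The function name and row layout are assumptions, and the strictly-before convention mirrors the `feature_date < obs_date` filter used above. It is not how Feast or Tecton implement the join internally.

```python
from bisect import bisect_left
from datetime import date


def point_in_time_join(observations, feature_rows):
    """Attach to each (user, obs_date) the latest feature value dated strictly before obs_date.

    observations: list of (user_token, obs_date, label)
    feature_rows: list of (user_token, feature_date, value)
    """
    history = {}  # user_token -> (sorted feature dates, matching values)
    for user, fdate, value in sorted(feature_rows, key=lambda r: (r[0], r[1])):
        dates, values = history.setdefault(user, ([], []))
        dates.append(fdate)
        values.append(value)

    joined = []
    for user, obs_date, label in observations:
        dates, values = history.get(user, ([], []))
        i = bisect_left(dates, obs_date)          # first feature_date >= obs_date
        value = values[i - 1] if i > 0 else None  # None: no feature known yet
        joined.append((user, obs_date, value, label))
    return joined


feature_rows = [("u1", date(2024, 1, 10), 4), ("u1", date(2024, 1, 15), 9), ("u1", date(2024, 2, 1), 15)]
observations = [("u1", date(2024, 1, 15), 1), ("u1", date(2024, 2, 20), 0), ("u2", date(2024, 1, 20), 0)]

for row in point_in_time_join(observations, feature_rows):
    print(row)  # values 4, 15, None: the Jan 15 feature row is NOT used for the Jan 15 observation
```

With pandas, `pandas.merge_asof` provides the same operation (`by="user_token"`, `allow_exact_matches=False` for strictly-before semantics), provided both frames are sorted by their date keys.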


Q6: What is the "feature store vs. lakehouse" debate, and how do you think about when to use each?

Answer:

The debate is not really a debate - it's a question of layer responsibility. The lakehouse and the feature store solve different problems and are best used together.

What a lakehouse provides for ML:

  • Large-scale raw data storage and access
  • Versioned snapshots for reproducible training
  • Efficient batch feature computation (Spark + dbt)
  • SQL-queryable predictions and feature history
  • GDPR compliance (row-level deletes from training data; deleted rows stay readable in older snapshots until those snapshots are expired)

What a feature store adds:

  • Online serving: millisecond-scale feature lookup during model inference (Redis or DynamoDB backend)
  • Point-in-time correct joins: automated, correct handling of temporal feature joins during training
  • Feature sharing: a catalog where teams publish and discover features, with versioned definitions
  • Feature monitoring: tracking feature distribution drift between training and serving in real time
  • Backfill: automated backfilling of historical feature values for new features

The integration pattern:

  • Lakehouse is the offline store backend: Feast can be configured to use an Iceberg or Delta table as its offline store. Training dataset generation reads from the lakehouse via the Feast SDK, with point-in-time joins handled by Feast.
  • Feast/Tecton handles the online materialization: reads batch features from the lakehouse, materializes them to Redis or DynamoDB, serves them at < 10ms.
  • The lakehouse stores the raw events and base features. The feature store layer adds serving and monitoring on top.
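The materialization step that a feature store performs, or that a custom script performs when there is no feature store, comes down to "latest offline row per entity, written to a key-value store". The minimal sketch below uses a plain dict where production would use Redis. With redis-py, the write would be a call such as `r.set(key, value)`. The key format, row layout, and helper name are assumptions, and Feast's own materialization commands handle this, plus scheduling and TTLs, for you.

```python
import json


def materialize_online(feature_rows, online_store, key_prefix="churn_features"):
    """Copy the latest offline feature row per user into a key-value online store.

    feature_rows: dicts with 'user_token', an ISO 'feature_date', and feature columns.
    online_store: any dict-like store; a plain dict here, Redis in production.
    """
    latest = {}
    for row in feature_rows:
        user = row["user_token"]
        # ISO dates compare correctly as strings, so ">" picks the newest row.
        if user not in latest or row["feature_date"] > latest[user]["feature_date"]:
            latest[user] = row
    for user, row in latest.items():
        payload = {k: v for k, v in row.items() if k != "user_token"}
        online_store[f"{key_prefix}:{user}"] = json.dumps(payload)
    return len(latest)


offline_rows = [
    {"user_token": "u1", "feature_date": "2024-03-01", "txn_count_30d": 12},
    {"user_token": "u1", "feature_date": "2024-03-02", "txn_count_30d": 14},
    {"user_token": "u2", "feature_date": "2024-03-02", "txn_count_30d": 3},
]
store = {}
written = materialize_online(offline_rows, store)
print(written)                                 # 2
print(json.loads(store["churn_features:u1"]))  # {'feature_date': '2024-03-02', 'txn_count_30d': 14}
```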

When you don't need a feature store:

  • Batch-only prediction (scoring runs daily, no real-time API)
  • Small team, few models, features computed fresh per batch
  • Features simple enough that Redis precomputation by a custom script suffices

When you do need a feature store:

  • Real-time serving APIs requiring < 10ms feature retrieval
  • Multiple teams sharing feature definitions (feature reuse across models)
  • Complex temporal features requiring point-in-time correct training
  • Feature drift monitoring at production scale

For most teams in the first 12–18 months of productionizing ML: use the lakehouse without a feature store. When you hit the scaling or operational limits of manual feature management, adopt a feature store with the lakehouse as the offline store backend.

© 2026 EngineersOfAI. All rights reserved.