Lakehouse Architecture for ML

The Production Moment

The data engineering team is in the middle of what they privately call "the migration from hell."

The platform was originally built the standard way: a data lake (S3 + Parquet for cheap storage, flexible schema) sitting alongside a data warehouse (Snowflake for SQL analytics). The ML team reads raw data from S3, runs feature engineering in Spark, and writes features back to S3. The analytics team queries Snowflake for dashboards and reports. Two separate systems, each good at their specific job.

The problems emerged gradually. The ML feature engineering pipeline reads 8 TB from S3 and writes results back to S3. Then a separate ETL process moves feature data from S3 into Snowflake so the analytics team can query it. Then another process moves from Snowflake back to S3 for the next training run. Each pipeline has a latency: data from Tuesday is available in Snowflake by Wednesday noon, then available for training by Thursday morning. By the time the ML model trains, the "current" features are 36-48 hours old.

The second problem: the data warehouse costs $40,000/month. Half that is for queries the ML team runs just to inspect training data quality - queries that Snowflake is fast at but S3 + Spark could handle for a fraction of the cost.

The third problem: schema changes require coordinated migrations across two systems. Adding a new feature column means updating the S3 Parquet schema, the Snowflake table, the ETL pipeline, and the feature documentation. Every schema change is a 3-day project.

The solution the team is migrating to: a lakehouse. One storage layer (S3) with an open table format (Delta Lake) that serves both the ML workloads and the SQL analytics workloads. No data duplication, no ETL between lake and warehouse, no latency from round-trips. The ML team reads features directly from Delta Lake. The analytics team queries Delta Lake with Trino or DuckDB. Same data, same version, same schema - one system.

Why Lakehouse: The Unification of Lake and Warehouse

The lakehouse pattern emerged from a simple observation: data warehouses are expensive because they provide reliability and performance guarantees that cloud object storage cannot. But those guarantees (ACID transactions, schema enforcement, efficient queries) can now be added to object storage using open table formats - Delta Lake, Apache Iceberg, Apache Hudi.

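| Capability | Data Warehouse | Data Lake (raw) | Lakehouse |
| --- | --- | --- | --- |
| Cost at scale | High ($$$) | Low ($) | Low ($) |
| SQL analytics | Excellent | Poor (Athena: slow, expensive) | Good (Trino, DuckDB) |
| Unstructured data | No | Yes | Yes |
| ACID transactions | Yes | No | Yes |
| Schema enforcement | Yes | No | Yes |
| Time travel | Vendor-specific (e.g., Snowflake Time Travel) | No | Yes |
| ML/Spark workloads | Poor (ETL required) | Good | Excellent |
| DML (UPDATE, DELETE) | Yes | No | Yes |
| Streaming ingest | Limited | No | Yes (with Hudi/Flink) |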

The Three Open Table Formats

Delta Lake (Databricks)

Delta Lake is the most widely adopted open table format. Developed at Databricks and open-sourced in 2019, it is now a Linux Foundation project.

Core mechanism: a _delta_log/ directory containing JSON commit files that record every table operation. Readers replay the log, starting from the most recent Parquet checkpoint that Delta writes periodically, to determine the current set of valid data files.
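
The log-replay idea can be sketched in plain Python. This is a simplified model, not the Delta protocol itself: it assumes each commit is a newline-delimited JSON string containing only `add` and `remove` actions, and it ignores checkpoints, metadata, and protocol actions. The function name `replay_delta_log` and the file names are hypothetical.

```python
import json


def replay_delta_log(commits):
    """Replay commits in version order and return the set of live data files."""
    live_files = set()
    for commit in commits:
        for line in commit.splitlines():
            action = json.loads(line)
            if "add" in action:
                live_files.add(action["add"]["path"])
            elif "remove" in action:
                # A removed file stays on storage until it is vacuumed,
                # but it is no longer part of the current table version.
                live_files.discard(action["remove"]["path"])
    return live_files


# Version 0 writes two files; version 1 rewrites one of them.
commits = [
    '{"add": {"path": "part-0000.parquet"}}\n{"add": {"path": "part-0001.parquet"}}',
    '{"remove": {"path": "part-0000.parquet"}}\n{"add": {"path": "part-0002.parquet"}}',
]

current = replay_delta_log(commits)        # latest version
as_of_v0 = replay_delta_log(commits[:1])   # "time travel" to version 0
print(sorted(current), sorted(as_of_v0))
```

Replaying only a prefix of the log is what makes time travel possible: the old files are still on storage, so an earlier version is just a shorter replay.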

Key strengths:

  • Deepest Spark integration (Spark is Delta's primary engine)
  • Z-ordering for multi-dimensional data clustering
  • Generated columns and column mapping
  • Row-level change data feed (delta.enableChangeDataFeed) for streaming CDC
  • Native Databricks Runtime optimizations (Photon, liquid clustering)
from delta import DeltaTable
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder \
.appName("lakehouse_ml") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog"
) \
.getOrCreate()

# Enable Change Data Feed on a Delta table
# Allows streaming consumers to read only the changed rows
spark.sql("""
CREATE TABLE IF NOT EXISTS delta.`s3://lakehouse/user_features/`
USING delta
TBLPROPERTIES (
'delta.enableChangeDataFeed' = 'true',
'delta.logRetentionDuration' = 'interval 90 days',
'delta.deletedFileRetentionDuration' = 'interval 90 days'
)
""")

# Read only changed rows since version 100 (for downstream ML pipelines)
changes_df = spark.read \
.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", 100) \
.load("s3://lakehouse/user_features/")

# Filter to only inserts and updates (ignore deletes unless you need them)
new_features = changes_df.filter(
F.col("_change_type").isin(["insert", "update_postimage"])
)

Apache Iceberg (Netflix/Apple)

Apache Iceberg was developed at Netflix (2017) and is now an Apache top-level project. It is the preferred choice at Apple, Netflix, Adobe, and many cloud-native organizations. Its key differentiator from Delta Lake is catalog flexibility and multi-engine support.

Iceberg's architecture: a metadata tree (a JSON table-metadata file that points to Avro manifest lists and manifest files) rather than a sequential log. Query planning walks from the current snapshot down to only the manifests whose partition ranges match the filter, which keeps planning cheap on very large tables with many partitions.
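
A toy model of that metadata tree, assuming a single date partition column and hypothetical class names (`Snapshot`, `Manifest`). Real Iceberg keeps partition summaries in the manifest list and per-file statistics in each manifest; this sketch only shows why a filter lets the planner skip whole manifests without opening them.

```python
from dataclasses import dataclass


@dataclass
class Manifest:
    min_date: str      # lower bound of the partition values in this manifest
    max_date: str      # upper bound of the partition values in this manifest
    data_files: list   # data file paths tracked by this manifest


@dataclass
class Snapshot:
    snapshot_id: int
    manifests: list


def plan_scan(snapshot, date):
    """Return (candidate data files, number of manifests opened) for one date."""
    files, opened = [], 0
    for manifest in snapshot.manifests:
        # ISO-8601 date strings compare correctly as plain strings.
        if manifest.min_date <= date <= manifest.max_date:
            opened += 1
            files.extend(manifest.data_files)
    return files, opened


snapshot = Snapshot(
    snapshot_id=1,
    manifests=[
        Manifest("2024-09-01", "2024-09-10", ["a.parquet", "b.parquet"]),
        Manifest("2024-09-11", "2024-09-20", ["c.parquet"]),
        Manifest("2024-09-21", "2024-09-30", ["d.parquet", "e.parquet"]),
    ],
)

files, opened = plan_scan(snapshot, "2024-09-23")
print(files, opened)
```

Only one of the three manifests is opened for the 2024-09-23 query; the other two are pruned from their bounds alone.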

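# Iceberg with Glue catalog (AWS-native setup)
# The catalog itself is a SparkCatalog; GlueCatalog is its implementation class.
spark = SparkSession.builder \
    .appName("iceberg_ml_features") \
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.glue", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.glue.warehouse", "s3://lakehouse/") \
    .getOrCreate()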

# Create an Iceberg table with partition evolution support
spark.sql("""
CREATE TABLE IF NOT EXISTS glue.ml_features.user_features (
user_id STRING NOT NULL,
purchase_count INTEGER,
session_count INTEGER,
avg_order_value DOUBLE,
feature_date DATE NOT NULL
)
USING iceberg
PARTITIONED BY (days(feature_date))
TBLPROPERTIES (
'write.target-file-size-bytes' = '134217728',
'write.distribution-mode' = 'hash',
'history.expire.max-snapshot-age-ms' = '7776000000'
)
""")

# Iceberg partition evolution: add bucketing to existing date partition
# Delta Lake cannot do this without rewriting data
# Iceberg handles it as a metadata-only operation
spark.sql("""
ALTER TABLE glue.ml_features.user_features
ADD PARTITION FIELD bucket(256, user_id)
""")

# Time travel in Iceberg
spark.sql("""
SELECT COUNT(*) FROM glue.ml_features.user_features
FOR TIMESTAMP AS OF TIMESTAMP '2024-09-23 14:30:00'
""")

# Iceberg metadata inspection
spark.sql("""
SELECT snapshot_id, committed_at, operation, summary
FROM glue.ml_features.user_features.snapshots
ORDER BY committed_at DESC
LIMIT 20
""")

Apache Hudi (Uber)

Apache Hudi (Hadoop Upserts Deletes and Incrementals) was developed at Uber (2016) for their ride-sharing data platform. Hudi's primary design goal is efficient streaming upserts - incrementally processing CDC streams and maintaining low-latency tables.

Hudi's key differentiation: it is optimized for write-heavy workloads with frequent upserts, while Delta and Iceberg are optimized for batch writes with occasional updates.

# Hudi: streaming upserts with near-real-time freshness
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.appName("hudi_ml_features").getOrCreate()


# Hudi table write (Copy-on-Write table type)
def write_hudi_features(new_features_df, table_path: str, table_name: str):
    """
    Write to a Hudi table with upsert semantics.
    Hudi deduplicates on the record key and keeps the row with the
    largest precombine value.
    """
    hudi_options = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.recordkey.field": "user_id",
        "hoodie.datasource.write.precombine.field": "updated_at",
        "hoodie.datasource.write.partitionpath.field": "feature_date",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.hive_sync.enable": "false",

        # Compaction settings apply only to Merge-on-Read (MOR) tables.
        # They are kept here for a later switch to MOR with async compaction.
        "hoodie.compact.inline": "false",
        "hoodie.compact.inline.max.delta.commits": "5",
    }

    # mode("append") is required for upserts; "overwrite" would recreate the table
    new_features_df.write \
        .format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(table_path)


# Read incremental changes from Hudi (efficient for downstream ML pipelines)
def read_hudi_incremental(
    table_path: str,
    begin_instant: str,
) -> DataFrame:
    """
    Read only the records changed since begin_instant, so downstream
    jobs avoid rescanning the whole table.
    begin_instant: Hudi commit instant string, e.g. "20240923143000000"
    """
    return spark.read \
        .format("hudi") \
        .option("hoodie.datasource.query.type", "incremental") \
        .option("hoodie.datasource.read.begin.instanttime", begin_instant) \
        .load(table_path)
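
The upsert configuration above relies on two fields: a record key that identifies a row and a precombine field that decides which version wins. A minimal plain-Python model of that rule follows. The `upsert` helper is hypothetical, and it assumes the stored record is also compared on the precombine value; in Hudi that last part depends on the configured payload or merge behavior.

```python
def upsert(table, incoming, key="user_id", precombine="updated_at"):
    """Merge incoming records into table, keeping the latest version per key."""
    merged = dict(table)
    for record in incoming:
        current = merged.get(record[key])
        # Keep the record with the larger precombine value; ties go to the newcomer.
        if current is None or record[precombine] >= current[precombine]:
            merged[record[key]] = record
    return merged


existing = {
    "u1": {"user_id": "u1", "updated_at": "2024-09-23T10:00:00", "purchase_count": 3},
}
# The batch arrives out of order: the 12:00 update precedes the 11:00 one.
batch = [
    {"user_id": "u1", "updated_at": "2024-09-23T12:00:00", "purchase_count": 5},
    {"user_id": "u1", "updated_at": "2024-09-23T11:00:00", "purchase_count": 4},
    {"user_id": "u2", "updated_at": "2024-09-23T11:30:00", "purchase_count": 1},
]

result = upsert(existing, batch)
print(result["u1"]["purchase_count"], sorted(result))
```

Because the comparison uses the precombine value rather than arrival order, a late-arriving older update does not overwrite a newer one.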

Choosing Between Delta Lake, Iceberg, and Hudi

| Factor | Choose Delta Lake | Choose Iceberg | Choose Hudi |
| --- | --- | --- | --- |
| Primary engine | Databricks/Spark | Multi-engine (Trino, Flink, Spark) | Spark + Flink |
| Cloud platform | AWS, Azure, GCP with Databricks | AWS (Glue), multi-cloud neutral | AWS EMR, Cloudera |
| Write pattern | Batch daily/hourly | Batch or streaming | High-frequency streaming upserts |
| Update frequency | Low (daily partitions) | Low to medium | High (sub-minute upserts) |
| Catalog requirement | Unity Catalog or Glue | Nessie, REST, Glue (most flexible) | Hive, Glue |
| BigQuery integration | External tables only | Native Iceberg tables | Limited |
| Team expertise | Most mature ecosystem | Growing fast | Uber, streaming-focused teams |

:::tip The Practical Answer in 2026
If you are starting a new project, use Delta Lake if you are primarily on Databricks or Spark, and Apache Iceberg if you need multi-engine flexibility (Trino, Flink, BigQuery, Snowflake) or AWS Glue catalog. Use Hudi only if your primary use case is high-frequency streaming upserts (sub-minute CDC from databases).
:::

Medallion Architecture

The medallion architecture is the canonical data organization pattern for lakehouses. It defines three data quality layers:

Bronze Layer: Raw Data Preserved

The bronze layer stores raw data exactly as received from sources - no cleaning, no validation, no transformation. The goal is a permanent, immutable record of what was received, when it arrived, and from which source.

from pyspark.sql import functions as F


def ingest_to_bronze(
    raw_df,
    bronze_path: str,
    source_name: str,
    partition_date: str,
):
    """
    Write raw data to Bronze layer with ingestion metadata.
    No cleaning, no filtering - preserve exactly what was received.
    """
    # Capture the raw payload first so it holds only the source columns,
    # then add the ingestion metadata columns
    bronze_df = raw_df \
        .withColumn("_raw_payload", F.to_json(F.struct("*"))) \
        .withColumn("_ingested_at", F.current_timestamp()) \
        .withColumn("_ingestion_date", F.lit(partition_date).cast("date")) \
        .withColumn("_source", F.lit(source_name))

    # replaceWhere makes re-running the same date idempotent:
    # only that ingestion date is overwritten
    bronze_df.write \
        .format("delta") \
        .mode("overwrite") \
        .option("replaceWhere", f"_ingestion_date = '{partition_date}'") \
        .save(f"{bronze_path}/{source_name}/")

    return {"rows_ingested": bronze_df.count(), "source": source_name}


# Bronze layer schema: keep original columns + ingestion metadata
# Example: raw transaction events
# user_id STRING (as received -- may be null, malformed)
# transaction_id STRING (as received)
# amount STRING (as received -- may contain currency symbols)
# timestamp STRING (as received -- may be in various formats)
# _ingested_at TIMESTAMP (added at ingestion)
# _ingestion_date DATE (partition key)
# _source STRING ("kafka_transactions_topic" or "postgres_cdc")
# _raw_payload STRING (full JSON of original row -- for debugging)

Silver Layer: Cleaned and Validated

Silver transforms raw bronze data into clean, typed, validated records. Schema enforcement happens here. This is where you filter duplicates, cast types, validate ranges, and apply business rules.

from pyspark.sql.window import Window


def bronze_to_silver_transactions(
    bronze_path: str,
    silver_path: str,
    processing_date: str,
):
    """
    Clean and validate raw transaction records.
    Silver layer: typed, deduplicated, schema-validated.
    """
    # Read bronze
    raw_df = spark.read.format("delta").load(bronze_path) \
        .filter(F.col("_ingestion_date") == processing_date)

    # Step 1: Cast and validate types
    typed_df = raw_df \
        .withColumn("transaction_id", F.col("transaction_id").cast("string")) \
        .withColumn("user_id", F.col("user_id").cast("string")) \
        .withColumn("amount",
            # Strip currency symbols but keep the minus sign, so negative
            # amounts are caught by the range check below; cast to decimal
            F.regexp_replace(F.col("amount"), "[^0-9.-]", "").cast("decimal(18,2)")
        ) \
        .withColumn("event_timestamp",
            F.to_timestamp(F.col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSX")
        ) \
        .withColumn("transaction_date", F.to_date(F.col("event_timestamp")))

    # Step 2: Filter invalid records (log them separately for debugging)
    valid_df = typed_df.filter(
        F.col("user_id").isNotNull() &
        F.col("transaction_id").isNotNull() &
        F.col("event_timestamp").isNotNull() &
        F.col("amount").isNotNull() &
        (F.col("amount") > 0) &
        (F.col("amount") < 1_000_000)  # sanity check: no $1M single transactions
    )

    invalid_df = typed_df.filter(
        F.col("user_id").isNull() |
        F.col("transaction_id").isNull() |
        F.col("event_timestamp").isNull() |
        F.col("amount").isNull() |
        (F.col("amount") <= 0) |
        (F.col("amount") >= 1_000_000)
    )

    # Log invalid records to a quarantine table for investigation.
    # It lives outside bronze_path so it is not nested inside the Bronze table.
    invalid_count = invalid_df.count()
    if invalid_count > 0:
        invalid_df.write \
            .format("delta") \
            .mode("append") \
            .save(f"{silver_path}/quarantine/transactions/")

    # Step 3: Deduplicate (keep most recent record per transaction_id)
    dedup_df = valid_df \
        .withColumn("row_num",
            F.row_number().over(
                Window.partitionBy("transaction_id")
                .orderBy(F.col("_ingested_at").desc())
            )
        ) \
        .filter(F.col("row_num") == 1) \
        .drop("row_num", "_raw_payload", "_source")

    # Step 4: Write to Silver
    # Replace by ingestion date: a batch can contain several transaction
    # dates, and replaceWhere rejects rows that fall outside its predicate.
    dedup_df.write \
        .format("delta") \
        .mode("overwrite") \
        .option("replaceWhere", f"_ingestion_date = '{processing_date}'") \
        .save(f"{silver_path}/transactions/")

    return {
        "raw_count": raw_df.count(),
        "valid_count": valid_df.count(),
        "invalid_count": invalid_count,
        "deduplicated_count": dedup_df.count(),
    }

Gold Layer: ML and Analytics Ready

Gold layer aggregates silver data into ML features and analytics tables. Gold tables are the direct source for model training and dashboards.

def silver_to_gold_user_features(
    silver_path: str,
    gold_path: str,
    feature_date: str,
    lookback_days: int = 90,
):
    """
    Compute user features from clean silver transaction data.
    Gold layer: ML-ready feature vectors with point-in-time correctness
    (only transactions dated on or before feature_date are used).
    """
    from datetime import datetime, timedelta

    start_date = (
        datetime.strptime(feature_date, "%Y-%m-%d") - timedelta(days=lookback_days)
    ).strftime("%Y-%m-%d")

    # Read silver transactions for the lookback window
    transactions = spark.read.format("delta").load(f"{silver_path}/transactions/") \
        .filter(
            (F.col("transaction_date") >= start_date) &
            (F.col("transaction_date") <= feature_date)
        )

    # Compute user-level features
    features_df = transactions.groupBy("user_id").agg(

        # Volume features
        F.count("*").alias("transaction_count_90d"),
        F.sum("amount").alias("total_spend_90d"),
        F.avg("amount").alias("avg_transaction_amount_90d"),

        # Recency features
        F.max("event_timestamp").alias("last_transaction_at"),
        F.countDistinct("transaction_date").alias("active_days_90d"),

        # Temporal patterns (dayofweek: 1 = Sunday, 7 = Saturday)
        F.sum(
            F.when(F.dayofweek("event_timestamp").isin(1, 7), 1).otherwise(0)
        ).alias("weekend_transactions_90d"),

    ) \
    .withColumn("feature_date", F.lit(feature_date).cast("date")) \
    .withColumn("days_since_last_transaction",
        F.datediff(F.col("feature_date"), F.to_date(F.col("last_transaction_at")))
    ) \
    .withColumn("weekend_transaction_ratio",
        F.col("weekend_transactions_90d") / F.col("transaction_count_90d")
    )

    # Write to Gold
    features_df.write \
        .format("delta") \
        .mode("overwrite") \
        .option("replaceWhere", f"feature_date = '{feature_date}'") \
        .save(f"{gold_path}/user_features/")

    return {"feature_date": feature_date, "user_count": features_df.count()}

Query Engines on the Lakehouse

The lakehouse pattern enables multiple query engines to read the same Delta/Iceberg tables simultaneously - without data movement or duplication.

Trino: SQL Analytics at Scale

Trino (formerly PrestoSQL) is a distributed SQL query engine that reads Delta and Iceberg tables directly from S3. It enables BI tools and SQL analysts to query the same data the ML team uses, without ETL to a separate warehouse.

-- Trino: analyze user feature distributions for ML data validation
-- This query runs directly on Delta Lake files in S3 -- no Snowflake needed

SELECT
    feature_date,
    COUNT(DISTINCT user_id) AS user_count,
    AVG(transaction_count_90d) AS avg_txn_count,
    -- Trino has no PERCENTILE_CONT; approx_percentile is its percentile function
    approx_percentile(total_spend_90d, 0.5) AS median_spend,
    approx_percentile(total_spend_90d, 0.95) AS p95_spend,
    -- Stays 0 unless the feature table also holds users with no transactions
    SUM(CASE WHEN transaction_count_90d = 0 THEN 1 ELSE 0 END) AS inactive_users,
    AVG(days_since_last_transaction) AS avg_days_inactive
FROM delta.default.user_features
WHERE feature_date >= DATE_ADD('day', -30, CURRENT_DATE)
GROUP BY feature_date
ORDER BY feature_date DESC;

-- Trino: feature correlation analysis for feature selection
SELECT
CORR(transaction_count_90d, total_spend_90d) AS count_spend_corr,
CORR(active_days_90d, transaction_count_90d) AS days_count_corr,
CORR(weekend_transaction_ratio, total_spend_90d) AS weekend_spend_corr
FROM delta.default.user_features
WHERE feature_date = CURRENT_DATE - INTERVAL '1' DAY;

DuckDB: Local Analytics for Data Scientists

DuckDB is an embedded OLAP database that can query Parquet files (and Delta Lake tables via the delta extension) directly from a Python script or notebook. For analytical queries on datasets that fit on a single machine (roughly 1 GB to 1 TB), it is often several times faster than pandas, because it uses vectorized, columnar execution and reads only the columns a query needs:

import duckdb

# DuckDB: read Delta Lake directly (no Spark, no cluster)
con = duckdb.connect()

# Install and load the Delta extension
con.execute("INSTALL delta; LOAD delta;")

# Query Delta Lake table directly from S3
# DuckDB reads the Delta transaction log to find current Parquet files
result = con.execute("""
SELECT
feature_date,
COUNT(*) as user_count,
AVG(transaction_count_90d) as avg_txn_count,
PERCENTILE_CONT(0.95) WITHIN GROUP
(ORDER BY total_spend_90d) as p95_spend
FROM delta_scan('s3://lakehouse/gold/user_features/')
WHERE feature_date >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY feature_date
ORDER BY feature_date
""").df()

print(result)


# DuckDB vs pandas performance comparison
import pandas as pd
import time

# One day's partition of the feature table, read as plain Parquet files.
# Caveat: reading Parquet files directly bypasses the Delta log, so only do
# this for a benchmark on files you know are current.
parquet_dir = "s3://lakehouse/gold/user_features/feature_date=2024-09-23/"

# pandas: load every column, then filter and aggregate in memory
start = time.time()
df = pd.read_parquet(parquet_dir)
result_pd = df.loc[df["transaction_count_90d"] > 10, "total_spend_90d"].mean()
pd_time = time.time() - start

# DuckDB: same query, reading only the two columns it needs.
# DuckDB expects a file or glob pattern rather than a bare directory.
start = time.time()
result_ddb = con.execute(f"""
    SELECT AVG(total_spend_90d) AS avg_spend
    FROM read_parquet('{parquet_dir}*.parquet')
    WHERE transaction_count_90d > 10
""").df()
ddb_time = time.time() - start

# DuckDB is often several times faster than pandas for analytical queries
# because it uses vectorized execution and columnar reads
print(f"pandas: {pd_time:.1f}s | DuckDB: {ddb_time:.1f}s | speedup: {pd_time/ddb_time:.1f}x")

ML on the Lakehouse: End-to-End Pipeline

Here is a complete ML training pipeline that reads from a Gold layer Delta table, trains a model, and logs data provenance to MLflow:

from pyspark.sql import SparkSession, functions as F
from delta import DeltaTable
import mlflow
import mlflow.lightgbm
import lightgbm as lgb
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split


def resolve_delta_version(spark, table_path: str, as_of: str) -> int:
    """Return the latest Delta version committed at or before `as_of`."""
    row = DeltaTable.forPath(spark, table_path).history() \
        .filter(F.col("timestamp") <= F.lit(as_of).cast("timestamp")) \
        .orderBy(F.col("version").desc()) \
        .first()
    if row is None:
        raise ValueError(f"No version of {table_path} at or before {as_of}")
    return row["version"]


def run_lakehouse_ml_pipeline(
    gold_features_path: str,
    gold_labels_path: str,
    training_date: str,
    model_name: str = "user_conversion_model",
    min_feature_date: str = "2024-06-25",
) -> dict:
    """
    End-to-end ML training on the lakehouse.
    Reads from Gold Delta tables, trains, logs to MLflow.
    """
    spark = SparkSession.builder.appName("lakehouse_ml_training").getOrCreate()

    with mlflow.start_run(run_name=f"{model_name}_{training_date}") as run:

        # Step 1: Pin the table versions that were current at training time,
        # so the versions logged below are exactly the versions that are read
        read_timestamp = f"{training_date} 00:00:00"
        feature_version = resolve_delta_version(spark, gold_features_path, read_timestamp)
        label_version = resolve_delta_version(spark, gold_labels_path, read_timestamp)

        features_df = spark.read \
            .format("delta") \
            .option("versionAsOf", feature_version) \
            .load(gold_features_path) \
            .filter(F.col("feature_date") >= min_feature_date)

        labels_df = spark.read \
            .format("delta") \
            .option("versionAsOf", label_version) \
            .load(gold_labels_path) \
            .select("user_id", "converted_within_30d")

        training_data = features_df.join(labels_df, on="user_id", how="inner")

        # Step 2: Log data provenance
        mlflow.log_params({
            "training_date": training_date,
            "features_delta_path": gold_features_path,
            "features_delta_version": feature_version,
            "labels_delta_path": gold_labels_path,
            "labels_delta_version": label_version,
            "num_training_rows": training_data.count(),
        })

        # Step 3: Convert to pandas for training
        pandas_df = training_data.toPandas()

        feature_cols = [
            "transaction_count_90d",
            "total_spend_90d",
            "avg_transaction_amount_90d",
            "active_days_90d",
            "days_since_last_transaction",
            "weekend_transaction_ratio",
        ]

        X = pandas_df[feature_cols].fillna(0)
        y = pandas_df["converted_within_30d"].astype(int)

        # Hold out a validation set: early stopping on the training data
        # itself would never trigger meaningfully
        X_train, X_val, y_train, y_val = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # Step 4: Train
        model_params = {
            "n_estimators": 500,
            "learning_rate": 0.05,
            "max_depth": 6,
            "num_leaves": 63,
            "min_child_samples": 100,
            "colsample_bytree": 0.8,
        }
        mlflow.log_params(model_params)

        model = lgb.LGBMClassifier(**model_params)
        model.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            eval_metric="auc",
            callbacks=[lgb.early_stopping(50)],
        )

        # Step 5: Log metrics and model
        train_auc = roc_auc_score(y_train, model.predict_proba(X_train)[:, 1])
        val_auc = roc_auc_score(y_val, model.predict_proba(X_val)[:, 1])
        mlflow.log_metric("train_auc", train_auc)
        mlflow.log_metric("val_auc", val_auc)

        mlflow.lightgbm.log_model(
            model,
            artifact_path="model",
            registered_model_name=model_name,
        )

        return {
            "run_id": run.info.run_id,
            "train_auc": train_auc,
            "val_auc": val_auc,
            "feature_version": feature_version,
            "label_version": label_version,
        }

Lakehouse vs Separate Lake + Warehouse

The classic architecture uses a separate data lake and data warehouse. The lakehouse collapses them into one. Here is a concrete cost and complexity comparison:

def compare_architectures(
    data_volume_tb: float,
    daily_ml_queries: int,
    daily_analytics_queries: int,
    team_size: int,
) -> dict:
    """
    Rough cost comparison: separate lake+warehouse vs lakehouse.
    Prices as of 2025. Only data_volume_tb affects the estimate; the query
    counts and team size are accepted for context but not yet used.
    """
    # === Separate Lake + Warehouse Architecture ===

    # S3 storage cost
    s3_monthly = data_volume_tb * 1000 * 0.023  # $0.023/GB/month

    # Snowflake compute for analytics
    # Assume a Large warehouse (8 credits/hour), 8 hours/day, 20 days/month
    snowflake_monthly = 8 * 8 * 20 * 3.00  # ~$3/credit (approximate Enterprise on-demand rate)

    # ETL pipeline between lake and warehouse (Fivetran/Airbyte)
    etl_monthly = 1000  # rough estimate for data connector cost

    # Engineering time: 2 engineers maintaining dual-system ETL
    eng_overhead_monthly = 2 * 10000 / 4  # 25% of 2 eng time at $10K/mo each

    separate_total = s3_monthly + snowflake_monthly + etl_monthly + eng_overhead_monthly

    # === Lakehouse Architecture ===

    # S3 storage (same data, no duplication)
    lakehouse_s3 = s3_monthly

    # Trino cluster (analytics) -- open-source, run on EC2
    # Assume 4 x r5.2xlarge ($0.504/hr), 10 hours/day active use
    trino_monthly = 4 * 0.504 * 10 * 30

    # Spark/EMR for ML is the same in both architectures, so it is left out

    # No ETL needed (no data movement between lake and warehouse)

    # Reduced engineering overhead (one system, not two)
    eng_overhead_lakehouse = eng_overhead_monthly * 0.4  # 60% reduction

    lakehouse_total = lakehouse_s3 + trino_monthly + eng_overhead_lakehouse

    return {
        "separate_lake_warehouse_monthly": round(separate_total),
        "lakehouse_monthly": round(lakehouse_total),
        "monthly_savings": round(separate_total - lakehouse_total),
        "annual_savings": round((separate_total - lakehouse_total) * 12),
        "data_freshness_separate": "24-48 hours (ETL latency)",
        "data_freshness_lakehouse": "Minutes (direct write to Delta)",
        "schema_change_effort_separate": "3+ days (lake + warehouse + ETL)",
        "schema_change_effort_lakehouse": "Hours (one system, mergeSchema=true)",
    }


# Example: 10 TB of data, 10 ML jobs/day, 50 analytics queries/day, 8-person team
print(compare_architectures(10.0, 10, 50, 8))
# With these assumptions: about $7.8K/month (separate) vs $2.8K/month (lakehouse),
# i.e. roughly $4.9K/month or $59K/year in savings. A larger warehouse bill,
# like the $40,000/month in the opening scenario, scales the savings accordingly.

Production Engineering Notes

Catalog: The Lakehouse Metadata Layer

A catalog tracks all tables, schemas, locations, and access permissions in the lakehouse. Without a catalog, tables are just S3 paths that engineers must know and manage manually.

# Unity Catalog (Databricks) -- three-level namespace: catalog.schema.table
spark.sql("USE CATALOG ml_platform")
spark.sql("USE SCHEMA gold")

# Create a managed table (Unity Catalog tracks location, schema, ownership)
spark.sql("""
CREATE TABLE IF NOT EXISTS user_features (
user_id STRING NOT NULL,
transaction_count_90d INTEGER,
total_spend_90d DOUBLE,
feature_date DATE NOT NULL
)
USING delta
PARTITIONED BY (feature_date)
COMMENT 'User transaction features for ML models. Updated daily by feature_pipeline job.'
TBLPROPERTIES (
'owner' = 'ml-platform-team',
'delta.logRetentionDuration' = 'interval 90 days',
'delta.enableChangeDataFeed' = 'true'
)
""")

# Grant access: ML training job can read, feature pipeline can write
spark.sql("GRANT SELECT ON TABLE user_features TO `ml-training-service-principal`")
spark.sql("GRANT MODIFY ON TABLE user_features TO `feature-pipeline-service-principal`")


# Apache Polaris / REST Catalog (open standard, vendor-neutral)
# Used with Iceberg for multi-engine access control
# catalog_config = {
# "type": "rest",
# "uri": "https://catalog.company.com/api/catalog",
# "credential": "...",
# "warehouse": "s3://lakehouse/",
# }

Monitoring Lakehouse Table Health

import datetime

from delta import DeltaTable


class LakehouseHealthMonitor:
    """
    Monitor data freshness, quality, and storage efficiency
    across all Gold layer tables.
    """
    def __init__(self, spark, gold_tables: dict):
        self.spark = spark
        self.gold_tables = gold_tables  # {"table_name": "s3://path/"}

    def check_all_tables(self) -> list:
        """Run health checks on all monitored Gold tables."""
        results = []

        for table_name, table_path in self.gold_tables.items():
            result = self._check_table(table_name, table_path)
            results.append(result)

            if result["is_stale"]:
                self._alert_stale_table(table_name, result["hours_since_update"])

            if result["small_file_ratio"] > 0.3:
                self._trigger_compaction(table_path)

        return results

    def _check_table(self, table_name: str, table_path: str) -> dict:
        delta_table = DeltaTable.forPath(self.spark, table_path)
        history = delta_table.history(1).collect()[0]

        # PySpark returns a naive datetime in the driver's local time zone;
        # this comparison assumes the driver runs in UTC
        last_updated = history["timestamp"]
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        hours_since_update = (
            now_utc - last_updated.replace(tzinfo=None)
        ).total_seconds() / 3600

        # Check file size distribution (indicator of small file problem)
        detail = self.spark.sql(
            f"DESCRIBE DETAIL delta.`{table_path}`"
        ).collect()[0]

        num_files = detail["numFiles"]
        size_bytes = detail["sizeInBytes"]
        avg_file_size_mb = (size_bytes / num_files / 1024 / 1024) if num_files > 0 else 0

        # DESCRIBE DETAIL exposes only totals, not per-file sizes, so the
        # average is used as a proxy: under 64MB suggests many small files
        is_small_file_problem = num_files > 0 and avg_file_size_mb < 64

        return {
            "table_name": table_name,
            "last_updated": str(last_updated),
            "hours_since_update": round(hours_since_update, 1),
            "is_stale": hours_since_update > 26,  # alert if more than 26h old
            "num_files": num_files,
            "avg_file_size_mb": round(avg_file_size_mb, 1),
            # Coarse 0/1 flag derived from the average, not a true ratio
            "small_file_ratio": 1.0 if is_small_file_problem else 0.0,
            "current_version": history["version"],
        }

    def _alert_stale_table(self, table_name: str, hours: float):
        print(f"ALERT: {table_name} is {hours:.1f} hours old (expected: < 26h)")

    def _trigger_compaction(self, table_path: str):
        # Assumes the table is partitioned by feature_date; OPTIMIZE ... WHERE
        # only accepts predicates on partition columns
        print(f"Triggering OPTIMIZE for {table_path}")
        DeltaTable.forPath(self.spark, table_path) \
            .optimize() \
            .where("feature_date >= date_sub(current_date(), 7)") \
            .executeCompaction()
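
The monitor above derives a 0/1 flag from the average file size, which can hide a long tail of tiny files next to a few large ones. If per-file sizes are available (for example from a storage listing; how you obtain them is an assumption here), the actual ratio is straightforward to compute. The helper name `small_file_ratio` and the 32 MB default threshold are illustrative choices, not Delta Lake settings.

```python
MB = 1024 * 1024


def small_file_ratio(file_sizes_bytes, threshold_mb=32):
    """Fraction of files smaller than threshold_mb (0.0 for an empty table)."""
    if not file_sizes_bytes:
        return 0.0
    threshold = threshold_mb * MB
    small = sum(1 for size in file_sizes_bytes if size < threshold)
    return small / len(file_sizes_bytes)


# Two tiny files and two healthy ones: the average (102 MB) looks fine,
# but half of the files are below the threshold.
sizes = [8 * MB, 16 * MB, 128 * MB, 256 * MB]
average_mb = sum(sizes) / len(sizes) / MB
ratio = small_file_ratio(sizes)
print(average_mb, ratio)
```

With a real ratio, the `> 0.3` compaction trigger in the monitor becomes a meaningful threshold instead of an on/off switch.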

:::danger Schema-on-Read Trap in Bronze Layer
Bronze layer's flexibility (schema-on-read) is a feature, not a bug. But resist the temptation to start using Bronze data directly in ML models. Bronze data has no quality guarantees - it may contain nulls where values should exist, malformed strings, duplicate events, or schema inconsistencies across different source versions. Always train ML models on Gold layer data that has passed through Silver validation. Use Bronze data only for debugging and reprocessing.
:::

:::warning The Medallion Architecture Is Not Always Three Hops
Some teams add unnecessary hops - Bronze to Silver1 to Silver2 to Gold, each with marginal transformations. This increases pipeline latency and operational complexity without proportional benefit. The right number of layers is determined by your data quality requirements: if your source data is already clean (e.g., from a reliable internal API), you may not need a Bronze layer at all. If your Gold layer has two distinct consumer groups with different requirements, consider two Gold sub-layers. The pattern is a guide, not a law.
:::

:::tip DuckDB for Data Science Exploration
Data scientists should use DuckDB with the Delta or Iceberg extension for local exploration of Gold layer tables. DuckDB reads directly from S3, handles the Delta transaction log correctly (reading only the current valid Parquet files), and often runs analytical queries several times faster than pandas on the same data - all without spinning up a Spark cluster. The dev cycle is: explore in DuckDB locally, then write the production feature engineering in Spark when you need to scale.
:::

Interview Q&A

Q1: What is a lakehouse architecture and how does it differ from having a separate data lake and data warehouse?

A lakehouse adds an open table format layer (Delta Lake, Apache Iceberg, or Apache Hudi) on top of object storage (S3, GCS, ADLS), providing the SQL analytics capabilities of a data warehouse (ACID transactions, schema enforcement, efficient SQL queries) without the cost and data duplication of maintaining a separate warehouse.

In a separate lake + warehouse architecture, data flows: raw sources -> S3 (lake) -> ETL -> Snowflake/BigQuery (warehouse). This creates two copies of the data, 24-48 hours of ETL latency, and schema changes that must be synchronized across two systems. The lakehouse eliminates the ETL layer: sources write directly to Delta Lake, and both ML pipelines (Spark) and SQL analytics (Trino, DuckDB) read from the same tables. Benefits: no data duplication ($$$), no ETL latency (minutes instead of hours), single schema change propagation (mergeSchema), and unified access control. Trade-offs: the lakehouse SQL query performance (Trino) is somewhat less optimized than a dedicated data warehouse for very complex analytical workloads, though this gap has narrowed significantly.

Q2: Explain the medallion architecture (Bronze, Silver, Gold). Why is each layer necessary?

The medallion architecture organizes data into three quality tiers. Bronze stores raw data exactly as received - no transformations, with ingestion metadata added. Bronze is the system of record: if upstream data is corrupted or re-delivered, you can reprocess from Bronze rather than asking the source to resend. Silver applies cleaning (type casting), validation (null checks, range checks), deduplication, and business rule enforcement. Silver is typed and clean but not yet aggregated for any specific use case. Gold aggregates and joins Silver data into use-case-specific tables - user feature vectors for ML, revenue summaries for BI, event counts for dashboards.

Why each layer is necessary: Bronze ensures you never lose raw data (essential for debugging "why did the model predict X?"). Silver ensures consumers don't implement inconsistent data cleaning logic independently (if every team filters duplicates differently, models trained on different pipelines are incomparable). Gold ensures ML models train on consistent, pre-computed features rather than re-implementing the same aggregations in multiple jobs.

Q3: Compare Delta Lake, Apache Iceberg, and Apache Hudi. When would you choose each?

All three are open table formats that add ACID transactions, time travel, and schema evolution to object storage. The differences are in architecture and optimization target.

Delta Lake uses a sequential transaction log (_delta_log/) and has the deepest Spark integration. It is the best choice if your primary engine is Databricks or Spark. Its Z-ordering is the most mature, and its CDC feed (Change Data Feed) is well-integrated with Spark Structured Streaming.

Apache Iceberg uses a metadata tree (manifest files) rather than a sequential log, which makes it more efficient for very large tables and provides better multi-engine support. It is the best choice if you need to query the same tables from Spark, Trino, Flink, BigQuery, and Snowflake. Its partition evolution (adding bucketing to existing date partitions) is more flexible than Delta's.

Apache Hudi is optimized for high-frequency streaming upserts - frequent small writes from CDC streams. It has built-in async compaction designed for this write pattern. It is the best choice if you are processing database CDC streams (Kafka + Debezium) and need sub-minute feature freshness with upsert semantics.

Q4: What is Z-ordering and how does it improve ML training query performance?

Z-ordering is a multi-dimensional data layout optimization in Delta Lake and Iceberg. It physically colocates rows with similar values in the Z-ordered columns into the same Parquet files. When a query filters on those columns (e.g., WHERE user_id = 'u123'), the query engine reads file-level min/max statistics and skips files whose range doesn't include the filter value.

For ML training queries, the impact is largest for selective lookups (retrieving features for a specific user or cohort) rather than full table scans. For a 5 TB user features table with user_id Z-ordered, a query that retrieves all features for 10,000 specific users might scan 200 GB instead of 5 TB - a 25x reduction. The real scenario from the lesson introduction: a 5 TB full-table scan that ran for 2 hours and 17 minutes became a 4-minute query after Z-ordering on user_id and feature_date.

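The implementation: delta_table.optimize().where("feature_date >= date_sub(current_date(), 7)").executeZOrderBy("user_id", "feature_date"). Note that where() accepts only predicates on partition columns, and Delta rejects Z-ordering on a partition column, so if the table is partitioned by feature_date, pass only user_id to executeZOrderBy. Run on recently written partitions only: historical partitions were already optimized and rarely change. OPTIMIZE ZORDER rewrites data files, so scope it narrowly.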
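The file-skipping effect can be seen in a small pure-Python sketch. It is a simplification and not Delta's actual implementation: it sorts rows by a Morton (bit-interleaved) key, splits them into "files" of 256 rows, and counts how many files a lookup must open based on per-file min/max statistics. The integer user and day IDs, the file size, and all function names are assumptions made for the demo.

```python
import random


def morton_key(x, y, bits=16):
    """Interleave the bits of x and y into a single Z-order (Morton) key."""
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)
        key |= ((y >> i) & 1) << (2 * i + 1)
    return key


def write_files(rows, rows_per_file):
    """Split rows into 'files' and record per-file min/max for both columns."""
    files = []
    for start in range(0, len(rows), rows_per_file):
        chunk = rows[start:start + rows_per_file]
        users = [r[0] for r in chunk]
        days = [r[1] for r in chunk]
        files.append({
            "user_min": min(users), "user_max": max(users),
            "day_min": min(days), "day_max": max(days),
        })
    return files


def files_scanned(files, user_id, day):
    """Count files whose min/max ranges could contain the requested row."""
    return sum(
        1 for f in files
        if f["user_min"] <= user_id <= f["user_max"]
        and f["day_min"] <= day <= f["day_max"]
    )


random.seed(0)
rows = [(user, day) for user in range(256) for day in range(64)]  # 16,384 rows
random.shuffle(rows)                                              # arrival order

unordered = write_files(rows, rows_per_file=256)
zordered = write_files(
    sorted(rows, key=lambda r: morton_key(r[0], r[1])), rows_per_file=256
)

scanned_before = files_scanned(unordered, user_id=123, day=40)
scanned_after = files_scanned(zordered, user_id=123, day=40)
print(f"files scanned: {scanned_before} unordered vs {scanned_after} Z-ordered")
```

In arrival order, almost every file's min/max range covers the requested user and day, so statistics prune very little. After sorting by the interleaved key, each file covers a compact block of users and days, and the same lookup touches a single file.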

Q5: How do you design a production lakehouse pipeline with data quality guarantees, and how do you handle late-arriving data?

The core design: Bronze receives raw data with ingestion timestamps. Silver validates and cleans in a scheduled batch job (hourly or daily). Gold computes features from Silver with a configurable lookback window. Each layer uses replaceWhere writes so processing is idempotent (rerunning the job for a given date overwrites that date without affecting others).

Late-arriving data handling: rather than processing only "today's" data, always process a lookback window of 2-3 days in Bronze-to-Silver and Silver-to-Gold pipelines. If an event from 2024-09-21 arrives on 2024-09-23 (48 hours late), the next Silver processing run for dates 2024-09-21 to 2024-09-23 will pick it up. Since writes are idempotent (replaceWhere on date partition), reprocessing 2-3 days is safe and cheap.
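The lookback-plus-overwrite pattern can be sketched in plain Python. A dict keyed by date stands in for a date-partitioned Silver table, and replacing a key wholesale mimics a replaceWhere write. The function names and the event records are hypothetical.

```python
from datetime import date, timedelta


def lookback_dates(run_date, days):
    """Dates covered by a run: run_date and the (days - 1) dates before it."""
    return [run_date - timedelta(days=i) for i in range(days)]


def process_window(bronze_events, silver, run_date, lookback_days=3):
    """Rebuild every partition in the window from Bronze, replacing it wholesale.

    Partitions inside the window are overwritten; partitions outside it are
    left untouched, which is what makes reruns safe.
    """
    for d in lookback_dates(run_date, lookback_days):
        silver[d] = sorted(
            e["event_id"] for e in bronze_events if e["event_date"] == d
        )
    return silver


bronze = [
    {"event_id": "a", "event_date": date(2024, 9, 21)},
    {"event_id": "b", "event_date": date(2024, 9, 22)},
]
silver = {}
process_window(bronze, silver, run_date=date(2024, 9, 22))

# An event dated 2024-09-21 arrives 48 hours late, on 2024-09-23.
bronze.append({"event_id": "late", "event_date": date(2024, 9, 21)})
process_window(bronze, silver, run_date=date(2024, 9, 23))
first_result = {d: list(ids) for d, ids in silver.items()}

# Rerunning the same window leaves the table unchanged (idempotent).
process_window(bronze, silver, run_date=date(2024, 9, 23))
```

Because each run rebuilds whole partitions from Bronze instead of appending, the late event lands in the 2024-09-21 partition exactly once, no matter how many times the job is retried.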

For features that depend on multi-day windows (90-day aggregations), late data means the features for the users who had late events will be slightly stale. Accept this as a trade-off, or add a "recompute affected users" step that identifies which users had late events and recomputes only their features.

Data quality gates: add a validation step between Silver and Gold that checks row counts (Silver should have at least 90% of expected rows), key metrics (median transaction amount within historical range), and referential integrity (all user_ids in transactions exist in user_profiles). Fail the pipeline and alert if gates fail - don't propagate bad data to Gold silently.
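A minimal sketch of such a gate, assuming Silver transactions are available as a list of dicts. The function name, exception class, and the expected-row and median-range inputs are hypothetical; in practice the thresholds would come from historical run statistics.

```python
from statistics import median


class QualityGateError(Exception):
    """Raised when Silver data fails a gate and must not reach Gold."""


def run_quality_gates(transactions, known_user_ids, expected_rows,
                      median_range, min_row_ratio=0.9):
    """Check row count, a key metric, and referential integrity; raise on failure."""
    failures = []

    # Gate 1: row count should reach at least min_row_ratio of the expected volume.
    if len(transactions) < min_row_ratio * expected_rows:
        failures.append(
            f"row count {len(transactions)} below {min_row_ratio:.0%} of {expected_rows}"
        )

    # Gate 2: median transaction amount should fall within the historical range.
    if transactions:
        med = median(t["amount"] for t in transactions)
        low, high = median_range
        if not low <= med <= high:
            failures.append(f"median amount {med} outside [{low}, {high}]")

    # Gate 3: every user_id in transactions must exist in user_profiles.
    orphans = {t["user_id"] for t in transactions} - set(known_user_ids)
    if orphans:
        failures.append(f"unknown user_ids: {sorted(orphans)}")

    if failures:
        raise QualityGateError("; ".join(failures))


good = [{"user_id": "u1", "amount": 20.0}, {"user_id": "u2", "amount": 30.0}]
run_quality_gates(good, known_user_ids={"u1", "u2"},
                  expected_rows=2, median_range=(10.0, 50.0))  # passes silently

bad = [{"user_id": "u9", "amount": 900.0}]
try:
    run_quality_gates(bad, known_user_ids={"u1", "u2"},
                      expected_rows=2, median_range=(10.0, 50.0))
    gate_failed = False
except QualityGateError as exc:
    gate_failed = True
    failure_message = str(exc)
```

Collecting all failures before raising gives the on-call engineer the full picture in a single alert, instead of one failure per rerun.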

Summary

The lakehouse has become a standard data architecture for ML systems at scale, replacing the expensive and operationally complex pattern of a separate data lake and data warehouse. Open table formats (Delta Lake, Apache Iceberg, Apache Hudi) provide the reliability of a warehouse (ACID, schema enforcement, time travel, DML) at data-lake prices (object storage pricing).

The medallion architecture (Bronze, Silver, Gold) provides clear data quality tiers: raw-as-received in Bronze, clean and typed in Silver, ML-ready in Gold. Each layer serves a distinct purpose and should not be bypassed. ML models train on Gold. Debugging happens in Bronze. Data quality enforcement happens in Silver.

The query engine diversity is a feature: Spark for large-scale feature engineering, Trino for SQL analytics by the business team, DuckDB for local exploration by data scientists, all reading the same Delta or Iceberg tables. One storage layer, multiple query interfaces - no ETL, no duplication, no staleness.

The team applying the lakehouse pattern from the opening scenario - collapsing lake + warehouse into one system - can expect: the 36-48 hours of feature staleness caused by ETL round-trips eliminated, schema change time cut from 3 days to hours, and a meaningful reduction in infrastructure cost. The architectural simplification is the primary benefit, not any single technical feature.

© 2026 EngineersOfAI. All rights reserved.