Delta Lake and Iceberg for ML
The Silent Schema Change
It is 2:30 AM. Your retraining pipeline has been running for 6 hours. It is scheduled to finish by morning, load the new model into staging, and run automated evaluations before the 9 AM team review.
At 2:47 AM, the pipeline fails with a cryptic error:
pyspark.sql.utils.AnalysisException: Cannot resolve column name `user_segment_v2`
in schema [user_id, user_segment, purchase_history, session_count, ...]
The upstream data team renamed user_segment_v2 to user_segment two days ago as part of a "cleanup" migration. They sent a Slack message to #data-platform at 4 PM on a Tuesday. Your ML team was not in that channel.
Now it is 3 AM. You have no model ready for the 9 AM review. The previous model is still serving, but it was supposed to be replaced. And worse: the old column name user_segment_v2 no longer exists in the live table. You cannot even roll back to a previous run of the pipeline - the data it would consume is gone.
Delta Lake solves two of these three problems: it would have given you time travel to access the table state before the migration, and its schema enforcement would have caught the mismatch immediately rather than 6 hours in. The third problem - notification - is solved by data contracts (next lesson).
What Is Delta Lake
Delta Lake is an open-source storage layer developed by Databricks (2019) that brings ACID transactions to data lakes. It sits on top of cloud object storage (S3, GCS, Azure Blob) and adds:
- ACID transactions: reads see a consistent snapshot; concurrent writes do not corrupt data
- Time travel: query any historical version of a table via timestamp or version number
- Schema enforcement: writes that violate the current schema fail fast
- Schema evolution: controlled, opt-in schema changes with full history
- Scalable metadata: handles tables with billions of rows and millions of files
Delta Lake is not a database - it is a table format. The data is stored in Parquet files in your object store. Delta adds a transaction log (_delta_log/) that records every change to the table.
Apache Iceberg is an alternative table format with similar capabilities, preferred in non-Databricks environments. We cover both.
Delta Lake Architecture
Each entry in _delta_log/ is a JSON file describing one transaction: which files were added, which were removed, and the metadata of the change. To read the table at version N, Delta replays the log from the last checkpoint up to version N to determine which files are active.
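The replay idea can be sketched in plain Python. This is a toy model, not Delta's actual log format: each commit is a hypothetical dict with `add` and `remove` lists, checkpoints are ignored, and the file names are invented for illustration.

```python
# Toy model of log replay: each commit lists the data files it adds and removes.
# The dict layout and file names are illustrative assumptions, not Delta's JSON schema.
Commit = dict[str, list[str]]


def active_files(log: list[Commit], version: int) -> set[str]:
    """Replay commits 0..version and return the set of live data files."""
    if not 0 <= version < len(log):
        raise ValueError(f"version {version} is not in the log")
    files: set[str] = set()
    for commit in log[: version + 1]:
        files -= set(commit.get("remove", []))
        files |= set(commit.get("add", []))
    return files


toy_log: list[Commit] = [
    {"add": ["part-000.parquet", "part-001.parquet"]},              # v0: initial write
    {"add": ["part-002.parquet"]},                                  # v1: append
    {"add": ["part-003.parquet"], "remove": ["part-000.parquet"]},  # v2: rewrite one file
]

print(sorted(active_files(toy_log, 1)))
print(sorted(active_files(toy_log, 2)))
```

Note that version 2 no longer references part-000.parquet, but the file itself still exists in storage, which is why version 1 remains readable.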
Setting Up Delta Lake
pip install delta-spark pyspark
# or with Databricks runtime (already included)
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
# Build Spark session with Delta Lake extensions
builder = (
SparkSession.builder
.appName("ML Data Pipeline")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
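# Disables the safety check that blocks VACUUM with a retention below the default.
# Convenient for local experiments; leave the check enabled on production tables.
.config("spark.databricks.delta.retentionDurationCheck.enabled", "false")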
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
Writing Delta Tables for ML
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from delta.tables import DeltaTable
spark = SparkSession.builder.appName("ml-features").getOrCreate()
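# Expected schema for the feature table. Delta enforces the table's stored schema
# on write; this StructType documents that contract for compute_features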
feature_schema = StructType([
StructField("user_id", StringType(), nullable=False),
StructField("session_count_30d", IntegerType(), nullable=True),
StructField("purchase_amount_90d", DoubleType(), nullable=True),
StructField("user_segment", StringType(), nullable=True),
StructField("last_active_date", DateType(), nullable=True),
StructField("feature_version", StringType(), nullable=False),
StructField("created_at", TimestampType(), nullable=False),
])
# Write features to a Delta table
df_features = compute_features(df_raw)
(
df_features
.write
.format("delta")
.mode("overwrite") # or "append"
.option("overwriteSchema", "false") # REFUSE schema changes on overwrite
.partitionBy("last_active_date") # partition for query efficiency
.save("s3://ml-data/delta/user_features/")
)
# Read back
df = spark.read.format("delta").load("s3://ml-data/delta/user_features/")
print(f"Table has {df.count():,} records")
MERGE (Upsert) - Critical for Feature Stores
The MERGE operation is one of Delta Lake's most powerful capabilities. It lets you upsert records (update if exists, insert if not) atomically.
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "s3://ml-data/delta/user_features/")
# New batch of features computed today
new_features = compute_daily_features(df_raw_today)
# Upsert: update existing users, insert new users
(
delta_table.alias("existing")
.merge(
new_features.alias("new"),
"existing.user_id = new.user_id",
)
.whenMatchedUpdate(set={
"session_count_30d": "new.session_count_30d",
"purchase_amount_90d": "new.purchase_amount_90d",
"user_segment": "new.user_segment",
"last_active_date": "new.last_active_date",
"created_at": "new.created_at",
# Keep feature_version unchanged unless explicitly updating it
})
.whenNotMatchedInsert(values={
"user_id": "new.user_id",
"session_count_30d": "new.session_count_30d",
"purchase_amount_90d": "new.purchase_amount_90d",
"user_segment": "new.user_segment",
"last_active_date": "new.last_active_date",
"feature_version": "new.feature_version",
"created_at": "new.created_at",
})
.execute()
)
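One practical caveat: a Delta MERGE fails when more than one source row matches the same target row for an update, so the incoming batch should contain at most one row per user_id. The sketch below shows the de-duplication logic in plain Python on a hypothetical batch; in Spark the same idea would typically be expressed with a window function or `dropDuplicates`. The helper name `latest_per_key` is invented for this example.

```python
from datetime import datetime


def latest_per_key(rows: list[dict], key: str, order_by: str) -> list[dict]:
    """Keep only the newest row for each key value."""
    newest: dict[object, dict] = {}
    for row in rows:
        current = newest.get(row[key])
        if current is None or row[order_by] > current[order_by]:
            newest[row[key]] = row
    return list(newest.values())


# Hypothetical batch in which user u1 appears twice.
batch = [
    {"user_id": "u1", "session_count_30d": 4, "created_at": datetime(2024, 10, 15, 8)},
    {"user_id": "u1", "session_count_30d": 5, "created_at": datetime(2024, 10, 15, 9)},
    {"user_id": "u2", "session_count_30d": 1, "created_at": datetime(2024, 10, 15, 8)},
]

deduped = latest_per_key(batch, key="user_id", order_by="created_at")
print(len(deduped))  # one row per user_id remains
```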
Time Travel: The Core ML Use Case
Time travel is Delta Lake's most important feature for ML teams. It lets you query a table as it existed at any past version or timestamp.
Query by Version Number
# Read the table as it was at version 5
df_v5 = (
spark.read
.format("delta")
.option("versionAsOf", 5)
.load("s3://ml-data/delta/user_features/")
)
# Read as it was two weeks ago
from datetime import datetime, timedelta
two_weeks_ago = (datetime.now() - timedelta(weeks=2)).isoformat()
df_historical = (
spark.read
.format("delta")
.option("timestampAsOf", two_weeks_ago)
.load("s3://ml-data/delta/user_features/")
)
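A timestamp query has to be mapped to a version. A simplified way to think about it is "the latest commit made at or before the requested time". The sketch below models only that rule on a hypothetical list of commit times; how Delta handles edge cases, such as timestamps later than the newest commit, is not modeled here.

```python
from bisect import bisect_right
from datetime import datetime


def version_as_of(commit_times: list[datetime], ts: datetime) -> int:
    """Return the latest version committed at or before ts.

    commit_times[i] is the commit time of version i, in ascending order.
    """
    idx = bisect_right(commit_times, ts) - 1
    if idx < 0:
        raise ValueError("timestamp is earlier than the first commit")
    return idx


# Hypothetical commit history for illustration.
commits = [
    datetime(2024, 10, 12, 9, 0, 51),   # version 0
    datetime(2024, 10, 13, 9, 1, 33),   # version 1
    datetime(2024, 10, 15, 9, 23, 11),  # version 2
]

print(version_as_of(commits, datetime(2024, 10, 14)))
```

Because a timestamp resolves to whatever commit happens to precede it, logging the resolved version number is a more stable reference than logging the timestamp alone.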
Inspect the Table History
delta_table = DeltaTable.forPath(spark, "s3://ml-data/delta/user_features/")
# Show all historical versions
history = delta_table.history()
history.select(
"version", "timestamp", "operation", "operationParameters",
"operationMetrics.numAddedFiles", "operationMetrics.numRemovedFiles",
"userMetadata", # custom metadata you can attach to each write
).show(truncate=False)
+-------+-------------------+---------+-------------+---------------+
|version|timestamp          |operation|numAddedFiles|numRemovedFiles|
+-------+-------------------+---------+-------------+---------------+
|7      |2024-10-15 09:23:11|MERGE    |3            |2              |
|6      |2024-10-14 18:45:02|OPTIMIZE |1            |12             |
|5      |2024-10-13 09:01:33|WRITE    |4            |0              |
|4      |2024-10-12 09:00:51|WRITE    |4            |0              |
...
Using Time Travel for ML Reproducibility
The critical use case: when you log dataset_version=7 in your experiment tracker, you can always reconstruct the exact training data:
import mlflow
def load_training_data(
    table_path: str,
    version: int | None = None,
    timestamp: str | None = None,
) -> "DataFrame":
    """Load training data at a specific version."""
    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
        version_label = f"v{version}"
    elif timestamp is not None:
        reader = reader.option("timestampAsOf", timestamp)
        version_label = timestamp
    else:
        raise ValueError("Must specify version or timestamp")
    df = reader.load(table_path)
    # Log the exact version being used
    mlflow.log_params({
        "dataset_path": table_path,
        "dataset_version": version_label,
        "dataset_record_count": df.count(),
    })
    return df
# In training run: pin to version 7
df_train = load_training_data(
"s3://ml-data/delta/user_features/",
version=7,
)
Schema Evolution
Delta Lake supports controlled schema evolution. By default, writing data with a different schema raises an error - this is schema enforcement, and it is the right default. Schema evolution is opt-in.
Schema Evolution Options
# Option 1: Merge schema - adds new columns, keeps old ones
(
df_with_new_columns
.write
.format("delta")
.mode("append")
.option("mergeSchema", "true") # adds new_column to the schema
.save("s3://ml-data/delta/user_features/")
)
# Option 2: Overwrite schema - replaces the schema entirely (dangerous)
(
df_different_schema
.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true") # replaces schema - loses type safety
.save("s3://ml-data/delta/user_features/")
)
# Option 3: Explicit DDL (SQL) - recommended for planned schema changes
spark.sql("""
ALTER TABLE delta.`s3://ml-data/delta/user_features/`
ADD COLUMNS (new_feature_score DOUBLE COMMENT 'model score v3 feature')
""")
# Option 4: Rename a column (requires column mapping mode)
spark.sql("""
ALTER TABLE delta.`s3://ml-data/delta/user_features/`
RENAME COLUMN user_segment_v2 TO user_segment
""")
The Key Rule for ML Teams
After every schema change, run your training pipeline in a staging environment before promoting to production. Never apply schema changes to tables that live production models consume without testing the downstream pipelines.
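A cheap complement to staging tests is a pre-flight check that compares the columns the training code needs against the table's schema before any expensive work starts. The sketch below is plain Python; the `REQUIRED` set and function names are hypothetical, and in a Spark job the available column list would come from `df.columns` on the pinned table version.

```python
def missing_columns(required: set[str], available: list[str]) -> list[str]:
    """Return required column names that are absent from the table schema."""
    return sorted(required - set(available))


def preflight_check(required: set[str], available: list[str]) -> None:
    """Raise before training starts if the schema has drifted."""
    missing = missing_columns(required, available)
    if missing:
        raise RuntimeError(f"Table is missing required columns: {missing}")


# Columns the training code reads (hypothetical list for illustration).
REQUIRED = {"user_id", "user_segment_v2", "session_count"}

# Schema after the upstream rename from the opening scenario.
live_schema = ["user_id", "user_segment", "purchase_history", "session_count"]

print(missing_columns(REQUIRED, live_schema))  # ['user_segment_v2']
```

Run as the first step of the pipeline, a check like this turns a failure 6 hours in into a failure in the first minute.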
OPTIMIZE and Z-Ordering
Delta Lake accumulates many small Parquet files over time (from many MERGE/APPEND operations). These hurt query performance. OPTIMIZE compacts them. Z-ORDER clusters related records together for faster range queries.
# Compact small files into larger ones
delta_table.optimize().executeCompaction()
# Z-order by the data columns most commonly used in WHERE clauses
# Z-ordering co-locates rows with similar values in the same files
# Partition columns cannot be Z-ordered; this table is partitioned by
# last_active_date, so only user_id is listed here
delta_table.optimize().executeZOrderBy("user_id")
# After Z-ordering, queries like:
# WHERE user_id = 'u123' AND last_active_date > '2024-09-01'
# prune partitions on last_active_date and skip irrelevant files on user_id
# (data skipping), which can substantially reduce scan cost
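To build intuition for what Z-ordering over two columns means, the sketch below interleaves the bits of two small integers into a single sort key (a Morton code). It only illustrates the space-filling-curve idea; Delta's implementation works on column statistics and differs in detail, and the function name is an assumption made for this example.

```python
def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Interleave the low `bits` bits of x and y into one Z-order (Morton) key."""
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)      # x takes the even bit positions
        key |= ((y >> i) & 1) << (2 * i + 1)  # y takes the odd bit positions
    return key


# Sort a 4x4 grid of (x, y) points by their Z-order key.
points = [(x, y) for x in range(4) for y in range(4)]
z_sorted = sorted(points, key=lambda p: interleave_bits(*p))

print(z_sorted[:4])  # [(0, 0), (1, 0), (0, 1), (1, 1)]
```

Points that are close in both dimensions end up near each other in the sorted order, so files written in that order cover compact ranges of both columns, and file-level min/max statistics can rule out more files for a given filter.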
When to Run OPTIMIZE
Run OPTIMIZE: after large MERGE operations that create many small files, before training jobs that do full-table scans, and on a weekly schedule for actively written tables.
VACUUM: Managing Storage Costs
Delta Lake's time travel retains old data files. VACUUM deletes files older than the retention threshold.
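# Default retention: 7 days (168 hours)
# After VACUUM, versions that depend on the deleted files can no longer be read
delta_table.vacuum()
# Custom retention: a longer window keeps more history available for time travel,
# a shorter one removes it sooner
delta_table.vacuum(retentionHours=336) # 14 days
# Show what would be deleted (dry run - useful before committing)
# The Python vacuum() method takes only retentionHours, so use SQL for a dry run
spark.sql("""
VACUUM delta.`s3://ml-data/delta/user_features/` RETAIN 168 HOURS DRY RUN
""").show(truncate=False)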
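For ML tables, set retention to cover your full reproducibility window, not just the duration of a training run. If you want to be able to reproduce any run from the last 30 days, retain at least 30 days (720 hours), plus a buffer for the longest training run, rather than the default 7 days. Time travel also needs the transaction log entries for that version, so check the table's log retention setting (the delta.logRetentionDuration table property) as well.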
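The arithmetic is simple enough to keep in a small helper. This is a sketch under stated assumptions: the function name is invented, and the 24-hour default buffer is an arbitrary illustrative choice, not a Delta recommendation.

```python
def retention_hours(
    repro_window_days: int,
    longest_run_hours: float,
    buffer_hours: float = 24,  # assumed safety margin, adjust to taste
) -> int:
    """Hours of history to retain so any run inside the window stays reproducible."""
    return int(repro_window_days * 24 + longest_run_hours + buffer_hours)


# 30-day reproducibility window, 12-hour training runs, 24-hour buffer.
print(retention_hours(30, 12))  # 756
```

The result can be passed as the `retentionHours` argument to `vacuum()`.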
Delta Lake + MLflow: Full Lineage
import mlflow
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
TABLE_PATH = "s3://ml-data/delta/user_features/"  # same table as the earlier examples
delta_table = DeltaTable.forPath(spark, TABLE_PATH)
# Read the latest history entry once, before training, so the logged version
# and timestamp come from the same commit
latest = delta_table.history(1).first()
current_version = latest["version"]
with mlflow.start_run(run_name="training_delta_v7"):
    mlflow.log_params({
        "dataset_table": TABLE_PATH,
        "dataset_delta_version": current_version,
        "dataset_delta_timestamp": str(latest["timestamp"]),
    })
    # Load pinned version for training
    df_train = (
        spark.read
        .format("delta")
        .option("versionAsOf", current_version)
        .load(TABLE_PATH)
        .filter("split = 'train'")  # assumes the table has a `split` column
    )
    # ... training (computes val_auc) ...
    mlflow.log_metrics({"val_auc": val_auc})
To reproduce this run 6 months later (assuming the table's retention settings still cover that version):
# Find the run in MLflow
run = mlflow.get_run(run_id="abc123...")
dataset_version = int(run.data.params["dataset_delta_version"])
# Load the exact same data
df_train = (
spark.read
.format("delta")
.option("versionAsOf", dataset_version)
.load(TABLE_PATH)
.filter("split = 'train'")
)
# Same data as the original run, provided that version's files have not been vacuumed
Apache Iceberg: The Alternative
Apache Iceberg was developed at Netflix and donated to the Apache Software Foundation. It solves the same problems as Delta Lake but with:
- Better vendor neutrality: supported by Trino, Presto, Flink, Spark, Hive, and more
- Hidden partitioning: Iceberg partitions data transparently without requiring partition column syntax in queries
- Partition evolution: change partitioning schemes without rewriting data
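- Row-level deletes: in merge-on-read mode Iceberg writes small delete files instead of rewriting data files, which is more efficient than Delta Lake's default file-rewrite approach for large tables with sparse deletes (Delta's optional deletion vectors narrow this gap)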
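Hidden partitioning works because Iceberg derives the partition value from a column through a transform, so a filter on the column can be translated into a filter on partitions. The sketch below mimics a day-granularity transform (days since 1970-01-01) in plain Python; the function names are hypothetical and the pruning logic is a simplification of what a query engine does.

```python
from datetime import date

EPOCH = date(1970, 1, 1)


def days_transform(d: date) -> int:
    """Days since the Unix epoch: the value a day-granularity partition is keyed on."""
    return (d - EPOCH).days


def partitions_for_range(start: date, end: date) -> range:
    """Partition values a filter `start <= last_active_date <= end` needs to read."""
    return range(days_transform(start), days_transform(end) + 1)


needed = partitions_for_range(date(2024, 9, 1), date(2024, 9, 3))
print(len(needed))  # 3 daily partitions, derived from the column filter alone
```

The query author only writes a predicate on last_active_date; no separate partition column has to appear in the query.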
# Iceberg with Spark
spark.sql("""
CREATE TABLE ml_catalog.features.user_features (
user_id STRING NOT NULL,
session_count_30d INT,
purchase_amount_90d DOUBLE,
user_segment STRING,
last_active_date DATE,
feature_version STRING
)
USING iceberg
PARTITIONED BY (days(last_active_date))
TBLPROPERTIES (
'history.expire.max-snapshot-age-ms' = '2592000000' -- 30 days
)
""")
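# Time travel in Iceberg
# VERSION AS OF takes a snapshot ID (or a branch/tag name), not a sequential
# version number; the 7 below is a placeholder. Snapshot IDs are listed in the
# table's `snapshots` metadata table.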
spark.sql("""
SELECT * FROM ml_catalog.features.user_features
VERSION AS OF 7
""")
spark.sql("""
SELECT * FROM ml_catalog.features.user_features
TIMESTAMP AS OF '2024-10-01 00:00:00'
""")
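# Row-level delete. In merge-on-read mode (table property 'write.delete.mode')
# Iceberg writes delete files instead of rewriting data files, which helps for sparse deletes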
spark.sql("""
DELETE FROM ml_catalog.features.user_features
WHERE user_id = 'gdpr_erased_user_123'
""")
Delta Lake vs. Iceberg at a Glance
| Feature | Delta Lake | Apache Iceberg |
|---|---|---|
| ACID transactions | Yes | Yes |
| Time travel | Yes | Yes |
| Schema evolution | Yes | Yes |
| Vendor neutrality | Databricks-first | Broad (Apache) |
| Hidden partitioning | No | Yes |
| Partition evolution | Limited | Full |
| Row-level deletes | File rewrite by default (deletion vectors optional) | Delete files (merge-on-read) |
| Best ecosystem | Databricks/Spark | Trino/Presto/Flink |
| ML tooling | MLflow integration | OpenLineage |
For Databricks-centric teams: use Delta Lake. For heterogeneous compute environments (Spark + Trino + Flink): use Iceberg.
Common Mistakes
:::danger Not Pinning Delta Versions in Training Jobs
Reading a Delta table without specifying a version reads the current version. If the table is updated between two training runs (even by a background job), the two runs use different data despite having the same table path in their config. Always pin: .option("versionAsOf", N).
:::
:::danger Running VACUUM With Short Retention on ML Tables
Vacuuming with a 7-day retention on a table used for weekly retraining means you can only reproduce runs from roughly the last week. Set retention to cover your full reproducibility window, for example 30 or 90 days depending on how far back you need to reproduce runs.
:::
:::warning Not Running OPTIMIZE Before Large Training Scans
A table with 50,000 small Parquet files (common after heavy MERGE traffic) can scan many times slower than the same data in 50 optimized files, because per-file overhead dominates. Run OPTIMIZE on any table that feeds a training job that does a full-table scan.
:::
:::warning Applying Schema Changes Without Downstream Testing
Renaming a column in a Delta table instantly breaks any pipeline that references the old column name. Always test schema changes against staging copies of downstream pipelines before applying to production tables.
:::
Interview Q&A
Q: What is Delta Lake's transaction log and how does it enable time travel?
A: Delta Lake maintains a directory called _delta_log/ in the table path. Each commit to the table adds a new JSON file to this directory, describing what files were added, what files were removed, and metadata about the operation. To read the table at version N, Delta reads the log from the last checkpoint (a Parquet file that compacts the log for performance) up to version N, reconstructing which Parquet data files were active at that version. Time travel is purely a metadata operation - data files removed from the table are only marked as removed in the log and stay in storage until VACUUM deletes them. This makes reading a historical version roughly as fast as reading the current version.
Q: How would you design a feature store using Delta Lake for an ML team?
A: Use a single Delta table per feature group (e.g., user_behavioral_features, item_content_features). Use MERGE for incremental updates - the feature store always has the latest features but maintains full history for point-in-time queries. Partition by date to enable efficient range scans. Add Z-ordering on entity IDs (user_id, item_id) for fast lookup by entity. Log the Delta version number used for each training run in MLflow. Enable 90-day retention with VACUUM. Add a monitoring job that computes feature distribution statistics at each version and alerts when distributions shift significantly.
Q: What is the difference between Delta Lake's schema enforcement and schema evolution?
A: Schema enforcement is the default: any write that violates the current table schema (wrong column types, missing required columns, unknown columns) raises an error immediately. This protects ML pipelines from silent data corruption. Schema evolution is opt-in: by adding option("mergeSchema", "true"), a write that introduces new columns adds them to the table schema rather than failing. Existing rows have null for the new column. The table history records the schema at every version, so you can always inspect what the schema was at the time of any historical version.
Q: When would you choose Iceberg over Delta Lake for an ML data platform?
A: Choose Iceberg when: (1) your compute environment is heterogeneous - you use Spark for batch feature engineering and Trino or Presto for ad-hoc analysis (Iceberg has first-class support for all three; Delta Lake's non-Spark support is secondary); (2) you need hidden partitioning - Iceberg abstracts partition columns from query writers, preventing accidental full-table scans when engineers forget partition predicates; (3) you need efficient row-level deletes for GDPR erasure at scale - Iceberg's merge-on-read delete files are more efficient than Delta's default file-rewrite approach for sparse deletes on large tables, although Delta's optional deletion vectors narrow this gap; (4) vendor lock-in is a concern and you want to avoid Databricks dependency.
Q: How does Delta Lake handle concurrent writes from multiple training jobs?
A: Delta Lake uses optimistic concurrency control. Each transaction reads the current version of the table, performs its writes to new Parquet files, and then attempts to commit a new log entry. If another transaction committed in the interim, Delta checks whether the two commits actually conflict: if they touched disjoint sets of files (for example, different partitions), both succeed. If they conflict, one commit succeeds and the other fails with a concurrent-modification error such as ConcurrentAppendException, and the failed transaction must retry. Delta's default isolation level for writes is WriteSerializable, while readers get snapshot isolation. For ML training jobs that only read the table, there are no conflicts - they always see a consistent snapshot.
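The commit-then-retry pattern can be sketched with an in-memory stand-in for the log. This is a toy under stated assumptions: `ToyLog` and `commit_with_retry` are invented names, and unlike Delta the toy treats every interleaved commit as a conflict instead of checking whether the two commits touched overlapping files.

```python
class ToyLog:
    """In-memory stand-in for a transaction log with a 'commit if unchanged' step."""

    def __init__(self) -> None:
        self.commits: list[str] = []

    @property
    def version(self) -> int:
        return len(self.commits) - 1  # -1 means no commits yet

    def try_commit(self, read_version: int, payload: str) -> bool:
        """Commit only if nobody else committed since read_version."""
        if self.version != read_version:
            return False  # stale read: another writer got there first
        self.commits.append(payload)
        return True


def commit_with_retry(log: ToyLog, payload: str, max_attempts: int = 3) -> int:
    """Re-read the latest version and retry when the commit loses the race."""
    for _ in range(max_attempts):
        read_version = log.version
        if log.try_commit(read_version, payload):
            return log.version
    raise RuntimeError("gave up after repeated conflicts")


log = ToyLog()
stale = log.version                    # two writers both read version -1
print(log.try_commit(stale, "job A"))  # True: first writer wins
print(log.try_commit(stale, "job B"))  # False: second writer is stale
print(commit_with_retry(log, "job B")) # 1: succeeds after re-reading the version
```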
