
Delta Lake and Iceberg for ML

The Silent Schema Change

It is 2:30 AM. Your retraining pipeline has been running for 6 hours. It is scheduled to finish by morning, load the new model into staging, and run automated evaluations before the 9 AM team review.

At 2:47 AM, the pipeline fails with a cryptic error:

```text
pyspark.sql.utils.AnalysisException: Cannot resolve column name `user_segment_v2`
in schema [user_id, user_segment, purchase_history, session_count, ...]
```

The upstream data team renamed user_segment_v2 to user_segment two days ago as part of a "cleanup" migration. They sent a Slack message to #data-platform at 4 PM on a Tuesday. Your ML team was not in that channel.

Now it is 3 AM. You have no model ready for the 9 AM review. The previous model is still serving, but it was supposed to be replaced. And worse: the old column name user_segment_v2 no longer exists in the live table. You cannot even roll back to a previous run of the pipeline - the data it would consume is gone.

Delta Lake solves two of these three problems (losing the pre-migration data, failing late, and not being told). Time travel would have given you access to the table state before the migration, and its schema enforcement would have caught the mismatch immediately rather than 6 hours in. The third problem, notification, is solved by data contracts (next lesson).



What Is Delta Lake

Delta Lake is an open-source storage layer, created by Databricks and open-sourced in 2019, that brings ACID transactions to data lakes. It sits on top of cloud object storage (S3, GCS, Azure Blob) and adds:

  • ACID transactions: reads see a consistent snapshot; concurrent writes do not corrupt data
  • Time travel: query any historical version of a table via timestamp or version number
  • Schema enforcement: writes that violate the current schema fail fast
  • Schema evolution: controlled, opt-in schema changes with full history
  • Scalable metadata: handles tables with billions of rows and millions of files

Delta Lake is not a database - it is a table format. The data is stored in Parquet files in your object store. Delta adds a transaction log (_delta_log/) that records every change to the table.

Apache Iceberg is an alternative table format with similar capabilities, preferred in non-Databricks environments. We cover both.


Delta Lake Architecture

Each entry in the _delta_log/ directory is a JSON file describing one transaction: which files were added, which were removed, and the metadata of the change. To read the table at version N, Delta starts from the latest checkpoint at or before version N and replays the JSON entries after it, up to version N, to determine which data files are active.
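The replay can be illustrated with a toy model in plain Python. This is a sketch under simplifying assumptions, not Delta's implementation: each log entry is reduced to the sets of file names it adds and removes (real entries are JSON actions such as add, remove, and metaData), a checkpoint is just the set of files active at its version, and Commit and active_files are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Commit:
    """Toy stand-in for one _delta_log JSON entry (hypothetical, simplified)."""
    version: int
    add: set[str] = field(default_factory=set)
    remove: set[str] = field(default_factory=set)


def active_files(
    log: list[Commit],
    version: int,
    checkpoint: tuple[int, set[str]] | None = None,
) -> set[str]:
    """Return the data files that make up the table at `version`.

    `checkpoint` is (checkpoint_version, files_active_at_that_version); when it is
    at or before `version`, replay starts from it instead of from version 0.
    """
    start, files = -1, set()
    if checkpoint is not None and checkpoint[0] <= version:
        start, files = checkpoint[0], set(checkpoint[1])
    for commit in sorted(log, key=lambda c: c.version):
        if start < commit.version <= version:
            files -= commit.remove
            files |= commit.add
    return files


log = [
    Commit(0, add={"part-000.parquet", "part-001.parquet"}),
    Commit(1, add={"part-002.parquet"}),
    # A compaction: two small files replaced by one larger file
    Commit(2, add={"part-003.parquet"}, remove={"part-000.parquet", "part-001.parquet"}),
]
print(sorted(active_files(log, 1)))  # ['part-000.parquet', 'part-001.parquet', 'part-002.parquet']
print(sorted(active_files(log, 2)))  # ['part-002.parquet', 'part-003.parquet']
```

Version 1 stays readable after the compaction because part-000 and part-001 are only removed from the log, not from storage. That is exactly what VACUUM later changes.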


Setting Up Delta Lake

```shell
pip install delta-spark pyspark
```

```python
# On a Databricks runtime, Delta Lake is already included and `spark` is preconfigured.
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Build a Spark session with the Delta Lake SQL extension and catalog
builder = (
    SparkSession.builder
    .appName("ML Data Pipeline")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Leave spark.databricks.delta.retentionDurationCheck.enabled at its default (true):
    # it blocks VACUUM runs whose retention is shorter than the table's configured minimum.
)

# Adds the Delta Lake JARs that match the installed delta-spark package
spark = configure_spark_with_delta_pip(builder).getOrCreate()
```

Writing Delta Tables for ML

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    DateType, DoubleType, IntegerType, StringType, StructField, StructType, TimestampType,
)
from delta.tables import DeltaTable

# Returns the Delta-enabled session built above if it already exists
spark = SparkSession.builder.appName("ml-features").getOrCreate()

TABLE_PATH = "s3://ml-data/delta/user_features/"

# Define the schema explicitly
feature_schema = StructType([
    StructField("user_id", StringType(), nullable=False),
    StructField("session_count_30d", IntegerType(), nullable=True),
    StructField("purchase_amount_90d", DoubleType(), nullable=True),
    StructField("user_segment", StringType(), nullable=True),
    StructField("last_active_date", DateType(), nullable=True),
    StructField("feature_version", StringType(), nullable=False),
    StructField("created_at", TimestampType(), nullable=False),
])

# Create the table with this schema up front; Delta then enforces it on every write,
# including the NOT NULL constraints on user_id, feature_version, and created_at
(
    DeltaTable.createIfNotExists(spark)
    .location(TABLE_PATH)
    .addColumns(feature_schema)
    .partitionedBy("last_active_date")
    .execute()
)

# Write features to the Delta table
# (compute_features and df_raw are placeholders for your own feature logic and input)
df_features = compute_features(df_raw)

(
    df_features
    .write
    .format("delta")
    .mode("overwrite")                   # or "append"
    .option("overwriteSchema", "false")  # the default: refuse schema changes on overwrite
    .partitionBy("last_active_date")     # partition for query efficiency
    .save(TABLE_PATH)
)

# Read back
df = spark.read.format("delta").load(TABLE_PATH)
print(f"Table has {df.count():,} records")
```

MERGE (Upsert) - Critical for Feature Stores

The MERGE operation is one of Delta Lake's most powerful capabilities. It lets you upsert records (update if exists, insert if not) atomically.

```python
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "s3://ml-data/delta/user_features/")

# New batch of features computed today
# (compute_daily_features and df_raw_today are placeholders for your own logic and input).
# new_features must hold at most one row per user_id: Delta fails a MERGE in which
# several source rows match the same target row.
new_features = compute_daily_features(df_raw_today)

# Upsert: update existing users, insert new users, all in one atomic commit
(
    delta_table.alias("existing")
    .merge(
        new_features.alias("new"),
        "existing.user_id = new.user_id",
    )
    .whenMatchedUpdate(set={
        "session_count_30d": "new.session_count_30d",
        "purchase_amount_90d": "new.purchase_amount_90d",
        "user_segment": "new.user_segment",
        "last_active_date": "new.last_active_date",
        "created_at": "new.created_at",
        # Keep feature_version unchanged unless explicitly updating it
    })
    .whenNotMatchedInsert(values={
        "user_id": "new.user_id",
        "session_count_30d": "new.session_count_30d",
        "purchase_amount_90d": "new.purchase_amount_90d",
        "user_segment": "new.user_segment",
        "last_active_date": "new.last_active_date",
        "feature_version": "new.feature_version",
        "created_at": "new.created_at",
    })
    .execute()
)
```
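The semantics of the MERGE above can be modeled with plain dictionaries keyed by user_id. This is a hedged sketch, not Delta's algorithm: merge_upsert is a hypothetical helper, rows are dicts, and the update_cols list mirrors the whenMatchedUpdate clause by leaving feature_version alone.

```python
from __future__ import annotations


def merge_upsert(
    target: dict[str, dict],
    source: list[dict],
    update_cols: list[str],
    key: str = "user_id",
) -> dict[str, dict]:
    """Toy MERGE: update `update_cols` for matched keys, insert unmatched rows whole.

    Returns a new dict; `target` is left untouched, mimicking an all-or-nothing commit.
    """
    keys = [row[key] for row in source]
    if len(keys) != len(set(keys)):
        # Delta fails a MERGE when several source rows match one target row;
        # this toy is stricter and rejects any duplicate key in the source.
        raise ValueError("MERGE source has duplicate keys")
    result = {k: dict(row) for k, row in target.items()}
    for row in source:
        k = row[key]
        if k in result:  # whenMatchedUpdate
            for col in update_cols:
                result[k][col] = row[col]
        else:  # whenNotMatchedInsert
            result[k] = dict(row)
    return result


target = {"u1": {"user_id": "u1", "session_count_30d": 3, "feature_version": "v1"}}
source = [
    {"user_id": "u1", "session_count_30d": 5, "feature_version": "v2"},
    {"user_id": "u2", "session_count_30d": 1, "feature_version": "v2"},
]
merged = merge_upsert(target, source, update_cols=["session_count_30d"])
print(merged["u1"])  # {'user_id': 'u1', 'session_count_30d': 5, 'feature_version': 'v1'}
print(merged["u2"]["feature_version"])  # v2
```

The matched user keeps feature_version v1 because it is not in the update list, while the new user is inserted with every column, just as the insert clause lists all columns.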

Time Travel: The Core ML Use Case

Time travel is Delta Lake's most important feature for ML teams. It lets you query a table as it existed at any past version or timestamp.

Query by Version Number or Timestamp

```python
from datetime import datetime, timedelta

TABLE_PATH = "s3://ml-data/delta/user_features/"

# Read the table as it was at version 5
df_v5 = (
    spark.read
    .format("delta")
    .option("versionAsOf", 5)
    .load(TABLE_PATH)
)

# Read the table as it was two weeks ago.
# The string is interpreted in the Spark session time zone (spark.sql.session.timeZone).
two_weeks_ago = (datetime.now() - timedelta(weeks=2)).strftime("%Y-%m-%d %H:%M:%S")
df_historical = (
    spark.read
    .format("delta")
    .option("timestampAsOf", two_weeks_ago)
    .load(TABLE_PATH)
)
```
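A timestamp is only a way of naming a version: Delta reads the latest version committed at or before the given timestamp, and raises an error for a timestamp earlier than the oldest available version. The lookup can be sketched as a binary search over commit times. resolve_timestamp and the commit times below are hypothetical, and the real resolver handles more edge cases (for example, timestamps after the latest commit).

```python
from __future__ import annotations

from bisect import bisect_right
from datetime import datetime


def resolve_timestamp(commit_times: list[datetime], ts: datetime) -> int:
    """Toy timestampAsOf: latest version whose commit time is <= ts.

    commit_times[v] is the commit time of version v, assumed strictly increasing.
    """
    idx = bisect_right(commit_times, ts)
    if idx == 0:
        raise ValueError(f"{ts} is before the earliest available version")
    return idx - 1


# Hypothetical commit times for versions 0, 1, 2
commit_times = [
    datetime(2024, 10, 1, 9, 0),
    datetime(2024, 10, 2, 9, 0),
    datetime(2024, 10, 3, 9, 0),
]
print(resolve_timestamp(commit_times, datetime(2024, 10, 2, 18, 30)))  # 1
```

Because many timestamps map to the same version, the resolved integer version is the more precise thing to record for reproducibility.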

Inspect the Table History

```python
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "s3://ml-data/delta/user_features/")

# Show all historical versions (newest first)
history = delta_table.history()
history.select(
    "version", "timestamp", "operation", "operationParameters",
    # operationMetrics is a map whose keys depend on the operation: OPTIMIZE reports
    # numAddedFiles/numRemovedFiles, while WRITE and MERGE use other keys (e.g. numFiles)
    "operationMetrics.numAddedFiles", "operationMetrics.numRemovedFiles",
    "userMetadata",  # custom metadata you can attach to each write
).show(truncate=False)
```

Illustrative output (abridged):

```text
+-------+-------------------+---------+-------------+---------------+
|version|timestamp          |operation|numAddedFiles|numRemovedFiles|
+-------+-------------------+---------+-------------+---------------+
|7      |2024-10-15 09:23:11|MERGE    |3            |2              |
|6      |2024-10-14 18:45:02|OPTIMIZE |1            |12             |
|5      |2024-10-13 09:01:33|WRITE    |4            |0              |
|4      |2024-10-12 09:00:51|WRITE    |4            |0              |
...
```

Using Time Travel for ML Reproducibility

The critical use case: when you log dataset_version=7 in your experiment tracker, you can reconstruct the exact training data for as long as version 7 remains within the table's retention window:

```python
from __future__ import annotations

import mlflow
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()


def load_training_data(
    table_path: str,
    version: int | None = None,
    timestamp: str | None = None,
) -> DataFrame:
    """Load training data at a specific Delta version or timestamp and log it to MLflow."""
    reader = spark.read.format("delta")

    if version is not None:
        reader = reader.option("versionAsOf", version)
        version_label = f"v{version}"
    elif timestamp is not None:
        # A timestamp is less precise than a version number; prefer pinning versions
        reader = reader.option("timestampAsOf", timestamp)
        version_label = timestamp
    else:
        raise ValueError("Must specify version or timestamp")

    df = reader.load(table_path)

    # Log the exact version being used (MLflow starts a run if none is active;
    # count() scans the whole pinned version)
    mlflow.log_params({
        "dataset_path": table_path,
        "dataset_version": version_label,
        "dataset_record_count": df.count(),
    })

    return df


# In the training run: pin to version 7
df_train = load_training_data(
    "s3://ml-data/delta/user_features/",
    version=7,
)
```

Schema Evolution

Delta Lake supports controlled schema evolution. By default, writing data with a different schema raises an error - this is schema enforcement, and it is the right default. Schema evolution is opt-in.

Schema Evolution Options

```python
TABLE_PATH = "s3://ml-data/delta/user_features/"

# Option 1: Merge schema - adds new columns, keeps old ones
(
    df_with_new_columns
    .write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")  # columns not yet in the table are added to its schema
    .save(TABLE_PATH)
)

# Option 2: Overwrite schema - replaces the schema entirely (dangerous)
(
    df_different_schema
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")  # replaces schema - loses type safety
    .save(TABLE_PATH)
)

# Option 3: Explicit DDL (SQL) - recommended for planned schema changes
spark.sql(f"""
    ALTER TABLE delta.`{TABLE_PATH}`
    ADD COLUMNS (new_feature_score DOUBLE COMMENT 'model score v3 feature')
""")

# Option 4: Rename a column. This requires column mapping mode, which upgrades the
# table protocol: Delta clients too old to support column mapping can no longer read it.
spark.sql(f"""
    ALTER TABLE delta.`{TABLE_PATH}` SET TBLPROPERTIES (
        'delta.columnMapping.mode' = 'name',
        'delta.minReaderVersion' = '2',
        'delta.minWriterVersion' = '5'
    )
""")
spark.sql(f"""
    ALTER TABLE delta.`{TABLE_PATH}`
    RENAME COLUMN user_segment_v2 TO user_segment
""")
```
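The enforcement rules for an append can be modeled in a few lines. This sketch is a deliberate simplification: check_append is hypothetical, schemas are plain name-to-type dicts, and any type change is rejected, whereas Delta's mergeSchema permits a few safe upcasts. Note how a write carrying user_segment_v2 into a table that only knows user_segment fails immediately instead of silently creating a new column.

```python
from __future__ import annotations


def check_append(
    table_schema: dict[str, str],
    write_schema: dict[str, str],
    merge_schema: bool = False,
) -> dict[str, str]:
    """Toy schema enforcement for an append; returns the table schema after the write.

    Schemas map column name -> type name (hypothetical simplification).
    """
    for col, typ in write_schema.items():
        if col in table_schema and table_schema[col] != typ:
            raise TypeError(f"column {col}: table has {table_schema[col]}, write has {typ}")
    new_cols = [c for c in write_schema if c not in table_schema]
    if new_cols and not merge_schema:
        raise ValueError(f"unknown columns {new_cols}; enable mergeSchema to add them")
    # Nullable table columns missing from the write are allowed (filled with null);
    # with mergeSchema, new columns are appended and existing rows read them as null.
    return {**table_schema, **{c: write_schema[c] for c in new_cols}}


table = {"user_id": "string", "user_segment": "string"}
print(check_append(table, {"user_id": "string", "score": "double"}, merge_schema=True))
# {'user_id': 'string', 'user_segment': 'string', 'score': 'double'}
```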

The Key Rule for ML Teams

After every schema change, run your training pipeline in a staging environment before promoting to production. Never apply a schema change to a table that production models consume without first testing the downstream pipelines against it.
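A cheap complement to staging runs, sketched here as an assumption rather than a Delta feature, is a pre-flight check at the very start of the training job. With the columns from the opening incident, it would have failed at startup instead of hours into the run. preflight_schema_check and REQUIRED_COLUMNS are hypothetical names; in PySpark the table's column names come from spark.read.format("delta").load(path).columns.

```python
from __future__ import annotations


def preflight_schema_check(table_columns: list[str], required: set[str]) -> None:
    """Fail fast, before any expensive work, if required input columns are missing."""
    missing = sorted(required - set(table_columns))
    if missing:
        raise RuntimeError(f"input table is missing required columns: {missing}")


# Hypothetical: the columns the training pipeline reads
REQUIRED_COLUMNS = {"user_id", "user_segment_v2", "session_count"}

# In PySpark: spark.read.format("delta").load(path).columns
table_columns = ["user_id", "user_segment", "purchase_history", "session_count"]
try:
    preflight_schema_check(table_columns, REQUIRED_COLUMNS)
except RuntimeError as err:
    print(err)  # input table is missing required columns: ['user_segment_v2']
```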


OPTIMIZE and Z-Ordering

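Delta Lake accumulates many small Parquet files over time (from many MERGE/APPEND operations), and these hurt query performance. OPTIMIZE compacts them into larger files. Z-ORDER co-locates rows with similar values in the chosen columns, so the per-file min/max statistics Delta keeps let queries skip files that cannot match.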

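```python
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "s3://ml-data/delta/user_features/")

# Compact small files into larger ones
delta_table.optimize().executeCompaction()

# Z-order by the data columns most commonly used in WHERE clauses.
# Z-ordering co-locates rows with similar values in the same files (and also compacts).
# Partition columns cannot be Z-ordered, so last_active_date (the partition column
# here) is left out; partition pruning already handles filters on it.
delta_table.optimize().executeZOrderBy("user_id")

# After Z-ordering, a query like:
#   WHERE user_id = 'u123' AND last_active_date > '2024-09-01'
# prunes partitions on last_active_date and uses file statistics on user_id
# to skip irrelevant files (data skipping), reducing scan cost
```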
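Why Z-ordering helps filters on either column can be shown with a toy Morton curve in plain Python. This is a simplified sketch, not Delta's implementation: as we understand it, Delta first maps each column's values into range buckets and then interleaves their bits, whereas this sketch interleaves small integers directly. interleave_bits and files_scanned are hypothetical helpers, and each "file" is a chunk of eight rows whose min/max values stand in for Delta's per-file statistics.

```python
from __future__ import annotations

from itertools import product


def interleave_bits(x: int, y: int, bits: int = 3) -> int:
    """Z-order (Morton) key: bit i of x goes to position 2i, bit i of y to 2i + 1."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


def files_scanned(rows: list[tuple[int, int]], col: int, value: int,
                  rows_per_file: int = 8) -> int:
    """Count files whose min/max for `col` could contain `value` (data skipping)."""
    scanned = 0
    for start in range(0, len(rows), rows_per_file):
        vals = [row[col] for row in rows[start:start + rows_per_file]]
        if min(vals) <= value <= max(vals):
            scanned += 1
    return scanned


grid = list(product(range(8), range(8)))  # 64 rows of (x, y)
linear = sorted(grid)                     # sorted by x, then y
zordered = sorted(grid, key=lambda r: interleave_bits(r[0], r[1]))

for name, rows in [("linear", linear), ("z-order", zordered)]:
    print(f"{name}: x == 3 scans {files_scanned(rows, 0, 3)} of 8 files, "
          f"y == 3 scans {files_scanned(rows, 1, 3)} of 8 files")
# linear: x == 3 scans 1 of 8 files, y == 3 scans 8 of 8 files
# z-order: x == 3 scans 4 of 8 files, y == 3 scans 2 of 8 files
```

Sorting by x alone is ideal for filters on x and useless for filters on y; the Z-order keeps both filters reasonably selective, which is the trade-off Z-ordering makes across its columns.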

When to Run OPTIMIZE

Run OPTIMIZE: after large MERGE operations that create many small files, before training jobs that do full-table scans, and on a weekly schedule for actively written tables.
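The compaction half of OPTIMIZE is essentially bin packing: group small files within a partition into output files close to a target size. A greedy sketch follows, under stated assumptions: plan_compaction is hypothetical, sizes are in MB, and the 1024 MB target is an assumed value (we believe it matches Delta's default for spark.databricks.delta.optimize.maxFileSize, but check your version).

```python
from __future__ import annotations


def plan_compaction(file_sizes_mb: list[int], target_mb: int = 1024) -> list[list[int]]:
    """Greedy bin packing of files smaller than target_mb into bins of <= target_mb.

    Bins holding a single file are dropped: rewriting one file alone gains nothing.
    """
    bins: list[list[int]] = []
    current: list[int] = []
    total = 0
    for size in sorted(s for s in file_sizes_mb if s < target_mb):
        if current and total + size > target_mb:
            bins.append(current)
            current, total = [], 0
        current.append(size)
        total += size
    if current:
        bins.append(current)
    return [b for b in bins if len(b) > 1]


# 200 small 10 MB files left behind by frequent MERGEs
plan = plan_compaction([10] * 200)
print([len(b) for b in plan], [sum(b) for b in plan])  # [102, 98] [1020, 980]
```

Two hundred small files become two output files, which is where the scan-time savings come from.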


VACUUM: Managing Storage Costs

Delta Lake's time travel depends on old data files that the current version no longer references. VACUUM permanently deletes those unreferenced files once they are older than the retention threshold.

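```python
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "s3://ml-data/delta/user_features/")

# Default retention: the table's delta.deletedFileRetentionDuration (7 days unless changed).
# Afterwards, you can no longer rely on time travel to versions older than that window.
delta_table.vacuum()

# Custom retention: longer retention keeps more history at a higher storage cost
delta_table.vacuum(retentionHours=336)  # 14 days

# Dry run (lists files that would be deleted): the Python vacuum() has no dry-run
# flag, so use the SQL form
spark.sql(
    "VACUUM delta.`s3://ml-data/delta/user_features/` RETAIN 168 HOURS DRY RUN"
).show(truncate=False)
```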
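:::warning
For ML tables, retention has to cover two things: your longest-running job that reads the table (so VACUUM never deletes files a running job still needs) and your reproducibility window. If training takes 12 hours but you want to be able to reproduce any run from the last 30 days, set retention to at least 30 days (720 hours), not the default 7 days. Time travel also needs the transaction log, which Delta keeps for delta.logRetentionDuration (30 days by default), so raise delta.deletedFileRetentionDuration and delta.logRetentionDuration together.
:::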
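Roughly, VACUUM deletes a data file once it has been unreferenced for longer than the retention period, and any version that needed that file stops being readable. The toy model below is a sketch under simplifying assumptions (it ignores log retention and files Delta never tracked); files_to_vacuum and readable_versions are hypothetical names, and removed_at records when each file was removed from the table.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def files_to_vacuum(removed_at: dict[str, datetime], now: datetime,
                    retention: timedelta) -> set[str]:
    """Files removed from the table longer ago than the retention period."""
    cutoff = now - retention
    return {path for path, when in removed_at.items() if when < cutoff}


def readable_versions(snapshots: dict[int, set[str]], deleted: set[str]) -> list[int]:
    """Versions whose data files all survived the vacuum."""
    return [v for v, files in sorted(snapshots.items()) if not (files & deleted)]


now = datetime(2024, 10, 15)
# Version -> files active at that version (hypothetical history)
snapshots = {
    0: {"a.parquet"},               # committed 20 days ago
    1: {"b.parquet"},               # 10 days ago: b replaces a
    2: {"c.parquet"},               # 3 days ago: c replaces b
    3: {"c.parquet", "d.parquet"},  # 1 day ago: d appended
}
removed_at = {
    "a.parquet": now - timedelta(days=10),
    "b.parquet": now - timedelta(days=3),
}
deleted = files_to_vacuum(removed_at, now, retention=timedelta(days=7))
print(sorted(deleted), readable_versions(snapshots, deleted))  # ['a.parquet'] [1, 2, 3]
```

Version 1 was committed ten days ago but is still readable in this model, because retention counts from when a file stopped being referenced, not from when the version was written.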


Delta Lake + MLflow: Full Lineage

```python
import mlflow
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

TABLE_PATH = "s3://ml-data/delta/user_features/"

spark = SparkSession.builder.getOrCreate()
delta_table = DeltaTable.forPath(spark, TABLE_PATH)

# Capture the latest commit once, so version and timestamp come from the same commit
latest = delta_table.history(1).first()
current_version = latest["version"]

with mlflow.start_run(run_name=f"training_delta_v{current_version}"):
    mlflow.log_params({
        "dataset_table": TABLE_PATH,
        "dataset_delta_version": current_version,
        "dataset_delta_timestamp": str(latest["timestamp"]),
    })

    # Load the pinned version for training
    # (assumes the table has a `split` column marking train/validation rows)
    df_train = (
        spark.read
        .format("delta")
        .option("versionAsOf", current_version)
        .load(TABLE_PATH)
        .filter("split = 'train'")
    )

    # ... training, which computes val_auc ...
    mlflow.log_metrics({"val_auc": val_auc})
```

To reproduce this run 6 months later (which requires both VACUUM retention and log retention of at least 6 months on this table):

```python
import mlflow

# Find the run in MLflow
run = mlflow.get_run(run_id="abc123...")
dataset_version = int(run.data.params["dataset_delta_version"])

# Load the same table version (TABLE_PATH as in the training code above)
df_train = (
    spark.read
    .format("delta")
    .option("versionAsOf", dataset_version)
    .load(TABLE_PATH)
    .filter("split = 'train'")
)
# Same data as the original run, provided the version's data files and log entries
# have not been removed by VACUUM or log cleanup
```

Apache Iceberg: The Alternative

Apache Iceberg was developed at Netflix and donated to the Apache Software Foundation. It solves the same problems as Delta Lake but with:

  • Better vendor neutrality: supported by Trino, Presto, Flink, Spark, Hive, and more
  • Hidden partitioning: Iceberg partitions data transparently without requiring partition column syntax in queries
  • Partition evolution: change partitioning schemes without rewriting data
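  • Row-level deletes: positional delete files can be more efficient than Delta Lake's default file-rewrite approach for large tables with sparse deletes (newer Delta Lake releases narrow the gap with deletion vectors)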
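```python
# Iceberg with Spark (assumes ml_catalog is configured as an Iceberg catalog)
spark.sql("""
    CREATE TABLE ml_catalog.features.user_features (
        user_id STRING NOT NULL,
        session_count_30d INT,
        purchase_amount_90d DOUBLE,
        user_segment STRING,
        last_active_date DATE,
        feature_version STRING
    )
    USING iceberg
    PARTITIONED BY (days(last_active_date))
    TBLPROPERTIES (
        'format-version' = '2',
        'write.delete.mode' = 'merge-on-read',  -- write position deletes, not file rewrites
        'history.expire.max-snapshot-age-ms' = '2592000000'  -- 30 days
    )
""")

# Time travel in Iceberg: VERSION AS OF takes a snapshot ID (or a branch/tag name),
# not a sequential version number. Snapshot IDs are listed in the snapshots metadata table.
spark.sql("""
    SELECT committed_at, snapshot_id, operation
    FROM ml_catalog.features.user_features.snapshots
""").show(truncate=False)

# Here: the oldest retained snapshot
snapshot_id = spark.sql("""
    SELECT snapshot_id FROM ml_catalog.features.user_features.snapshots
    ORDER BY committed_at LIMIT 1
""").first()["snapshot_id"]

df_snapshot = spark.sql(f"""
    SELECT * FROM ml_catalog.features.user_features
    VERSION AS OF {snapshot_id}
""")

df_at_time = spark.sql("""
    SELECT * FROM ml_catalog.features.user_features
    TIMESTAMP AS OF '2024-10-01 00:00:00'
""")

# Row-level delete; with merge-on-read this writes position delete files.
# The erased rows remain in older data files until those files are rewritten
# and the snapshots that reference them are expired.
spark.sql("""
    DELETE FROM ml_catalog.features.user_features
    WHERE user_id = 'gdpr_erased_user_123'
""")
```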
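Hidden partitioning works because Iceberg derives the partition value from the column itself: the days transform in the DDL above stores the number of days since 1970-01-01, and a filter on last_active_date is translated into a range of partition values without the query mentioning any partition column. A toy version of that translation, with hypothetical helper names, might look like this.

```python
from __future__ import annotations

from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def day_transform(d: date) -> int:
    """Iceberg-style day transform: days since 1970-01-01."""
    return (d - EPOCH).days


def prune_partitions(partitions: list[int], lo: date, hi: date) -> list[int]:
    """Keep day partitions that can satisfy lo <= last_active_date <= hi."""
    lo_p, hi_p = day_transform(lo), day_transform(hi)
    return [p for p in partitions if lo_p <= p <= hi_p]


# One partition per day from 2024-09-28 through 2024-10-05 (8 partitions)
start = date(2024, 9, 28)
partitions = [day_transform(start + timedelta(days=i)) for i in range(8)]

# WHERE last_active_date BETWEEN '2024-10-01' AND '2024-10-03'
kept = prune_partitions(partitions, date(2024, 10, 1), date(2024, 10, 3))
print(len(kept), "of", len(partitions), "partitions scanned")  # 3 of 8 partitions scanned
```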

Delta Lake vs. Iceberg at a Glance

| Feature | Delta Lake | Apache Iceberg |
|---|---|---|
| ACID transactions | Yes | Yes |
| Time travel | Yes | Yes |
| Schema evolution | Yes | Yes |
| Vendor neutrality | Databricks-first | Broad (Apache) |
| Hidden partitioning | No | Yes |
| Partition evolution | Limited | Full |
| Row-level deletes | File rewrite | Positional deletes |
| Best ecosystem | Databricks/Spark | Trino/Presto/Flink |
| ML tooling | MLflow integration | OpenLineage |

For Databricks-centric teams: use Delta Lake. For heterogeneous compute environments (Spark + Trino + Flink): use Iceberg.


Common Mistakes

:::danger Not Pinning Delta Versions in Training Jobs
Reading a Delta table without specifying a version reads the current version. If the table is updated between two training runs (even by a background job), the two runs use different data despite having the same table path in their config. Always pin: .option("versionAsOf", N).
:::

:::danger Running VACUUM With Short Retention on ML Tables
Vacuuming with a 7-day retention on a table used for weekly retraining means you can only reproduce runs from the last week. Set retention to cover your full reproducibility window (often 90 days for ML teams), and raise the transaction log retention (delta.logRetentionDuration, 30 days by default) to match.
:::

:::warning Not Running OPTIMIZE Before Large Training Scans
A table with 50,000 small Parquet files (common after heavy MERGE traffic) can scan 10-100x slower than the same data in 50 optimized files, because every file adds its own listing, open, and footer-read overhead. Run OPTIMIZE on any table that feeds a training job with a full-table scan.
:::

:::warning Applying Schema Changes Without Downstream Testing
Renaming a column in a Delta table instantly breaks any pipeline that references the old column name. Always test schema changes against staging copies of downstream pipelines before applying them to production tables.
:::


Interview Q&A

Q: What is Delta Lake's transaction log and how does it enable time travel?

A: Delta Lake maintains a directory called _delta_log/ in the table path. Each commit to the table writes a new JSON file to this directory, describing which files were added, which files were removed, and metadata about the operation. To read the table at version N, Delta starts from the latest checkpoint at or before version N (a Parquet file that summarizes the log up to that point) and replays the JSON commits after it up to version N, reconstructing which Parquet data files were active at that version. Time travel is purely a metadata operation: files removed from the table are not physically deleted until VACUUM runs. This makes reading a historical version about as fast as reading the current version.

Q: How would you design a feature store using Delta Lake for an ML team?

A: Use a single Delta table per feature group (e.g., user_behavioral_features, item_content_features). Use MERGE for incremental updates - the feature store always has the latest features while keeping history, within the retention window, for point-in-time queries. Partition by date to enable efficient range scans. Add Z-ordering on entity IDs (user_id, item_id) for fast lookup by entity. Log the Delta version number used for each training run in MLflow. Set 90-day retention for both data files (delta.deletedFileRetentionDuration, which VACUUM uses) and the transaction log (delta.logRetentionDuration). Add a monitoring job that computes feature distribution statistics at each version and alerts when distributions shift significantly.

Q: What is the difference between Delta Lake's schema enforcement and schema evolution?

A: Schema enforcement is the default: any write that violates the current table schema (wrong column types, missing required columns, unknown columns) raises an error immediately. This protects ML pipelines from silent data corruption. Schema evolution is opt-in: by adding option("mergeSchema", "true"), a write that introduces new columns adds them to the table schema rather than failing. Existing rows have null for the new column. The table history records the schema at every version, so you can always inspect what the schema was at the time of any historical version.

Q: When would you choose Iceberg over Delta Lake for an ML data platform?

A: Choose Iceberg when: (1) your compute environment is heterogeneous - you use both Spark for batch feature engineering and Trino or Presto for ad-hoc analysis (Iceberg has first-class support for all three; Delta Lake's non-Spark support is secondary); (2) you need hidden partitioning - Iceberg abstracts partition columns from query writers, preventing accidental full-table scans when engineers forget partition predicates; (3) you need efficient row-level deletes for GDPR erasure at scale - with merge-on-read deletes enabled, Iceberg's positional delete files avoid the whole-file rewrites of Delta's default approach for sparse deletes on large tables (for true erasure, the old data files must still be compacted away and their snapshots expired); (4) vendor lock-in is a concern and you want to avoid Databricks dependency.

Q: How does Delta Lake handle concurrent writes from multiple training jobs?

A: Delta Lake uses optimistic concurrency control. Each transaction reads the current version of the table, writes its changes to new Parquet files, and then attempts to commit the next log entry. If another transaction committed in the interim, Delta checks whether the two actually conflict. If they touched different partitions (or both were blind appends), both succeed. If they read or modified the same data, the later commit fails with a ConcurrentModificationException subclass such as ConcurrentAppendException, and that transaction must retry. How strict these checks are is set per table by the isolation level: WriteSerializable (the default) or Serializable. For ML training jobs that only read the table, there are no conflicts: they always see a consistent snapshot.
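The commit protocol can be sketched as a toy in plain Python. It is a simplification under stated assumptions: every writer is treated as reading and rewriting the partitions it touches (as MERGE or UPDATE do), whereas in Delta blind appends do not conflict with one another; ToyLog and ConflictError are hypothetical names, the latter standing in for Delta's ConcurrentModificationException subclasses.

```python
from __future__ import annotations

from dataclasses import dataclass, field


class ConflictError(Exception):
    """Stands in for Delta's ConcurrentModificationException subclasses."""


@dataclass
class ToyLog:
    # commits[v] = set of partitions written by the commit that created version v
    commits: list[set[str]] = field(default_factory=lambda: [set()])

    @property
    def version(self) -> int:
        return len(self.commits) - 1

    def try_commit(self, read_version: int, touched: set[str]) -> int:
        """Commit as the next version unless a commit after read_version overlaps."""
        for winner in self.commits[read_version + 1:]:
            overlap = winner & touched
            if overlap:
                raise ConflictError(f"conflict on {sorted(overlap)}; re-read and retry")
        self.commits.append(set(touched))
        return self.version


log = ToyLog()  # version 0: empty table
read_a = read_b = read_c = log.version  # three writers all read version 0
print(log.try_commit(read_a, {"date=2024-10-14"}))  # 1
print(log.try_commit(read_b, {"date=2024-10-15"}))  # 2 (disjoint partitions, no conflict)
try:
    log.try_commit(read_c, {"date=2024-10-14"})  # overlaps the commit that created version 1
except ConflictError as err:
    print(err)  # conflict on ['date=2024-10-14']; re-read and retry
```

Readers never append to this log at all, which is why training jobs that only read the table never conflict.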

© 2026 EngineersOfAI. All rights reserved.