Skip to main content

:::tip 🎮 Interactive Playground Visualize this concept: Try the Data Lakehouse Architecture demo on the EngineersOfAI Playground - no code required. :::

Data Versioning with Delta Lake

The Production Moment

The compliance audit email arrives on a Tuesday at 2 PM. The subject line: "Model Performance Review - Training Data Audit Required."

The ML team's recommendation model has been live for 11 months, serving 8 million users. The audit committee needs to verify that the model version currently in production (v47) was trained on the same data that was validated by the data quality team six weeks ago. Standard procedure for financial services models.

The engineer assigned to the audit pulls up the MLflow experiment logs. Model v47 was trained on September 23rd. The training job read features from s3://ml-features/user_features/. She types the S3 path into her terminal and checks the current state of the bucket.

The data is there. But there is no record of what the data looked like on September 23rd. The feature engineering pipeline has run 42 times since then - users have been added, features have been updated, some records have been corrected. The S3 bucket contains the current state of the world, not the state from six weeks ago.

The audit cannot be completed. The question "was this model trained on validated data?" cannot be answered. Model v47 must be pulled from production until the team can establish a chain of custody for the training data.

The root cause: the team was writing to Parquet files on S3 with no versioning. Every write overwrote or appended data without creating any record of what changed, when, or what the data looked like before. They had logging for code (git), logging for models (MLflow), but no versioning for the data that fed those models.

Delta Lake solves this problem. It adds a transaction log to object storage - a complete, immutable record of every write operation, enabling exact reproduction of any historical table state.

Why Plain Parquet Falls Short

Before Delta Lake, the standard pattern was Parquet files on S3. This worked well for read-heavy analytics but failed for ML data pipelines in three specific ways:

Problem 1: No atomicity. A Spark job writing 500 files to S3 is not atomic. If the job fails after writing 300 files, the table is now in a corrupt mixed state - half old data, half new data, no way to tell which is which. Downstream jobs reading the table get inconsistent results.

Problem 2: No history. Once you overwrite a partition, the old data is gone. You cannot query "what did this table look like on September 23rd?" You cannot reproduce a training dataset after the underlying data has changed.

Problem 3: No concurrent write safety. If two Spark jobs try to write to the same table simultaneously - the feature pipeline and the backfill job, for example - they may both succeed but produce corrupted output. Parquet has no mechanism to coordinate concurrent writers.

Delta Lake addresses all three by adding a transaction log: a directory (_delta_log/) in the same S3 path that records every table operation as an atomic JSON commit.

Delta Lake Architecture

Delta Lake's design is elegant: it stores data as standard Parquet files and tracks operations in a sequential transaction log.

Each commit file in _delta_log/ is a JSON document that records the exact files added and removed:

{
"add": {
"path": "part-00001-abc123.parquet",
"size": 134217728,
"modificationTime": 1727136000000,
"stats": "{\"numRecords\":1200000,\"minValues\":{\"user_id\":\"u0001\",\"feature_date\":\"2024-09-23\"},\"maxValues\":{\"user_id\":\"u9999\",\"feature_date\":\"2024-09-23\"}}"
}
}
{
"remove": {
"path": "part-00001-old789.parquet",
"deletionTimestamp": 1727136000000
}
}

When you read a Delta table, the engine replays the transaction log from the beginning (or from the most recent checkpoint) to determine which files are part of the current table version. When you request a historical version, it replays only the operations up to that point.

ACID Transactions on Object Storage

Setting up a Spark session with Delta Lake support:

from delta import DeltaTable, configure_spark_with_delta_pip
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

builder = SparkSession.builder \
.appName("delta_ml_features") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog"
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

The most important write pattern is replaceWhere - it atomically replaces a specific partition while leaving all other partitions untouched:

def write_daily_features(
features_df,
delta_path: str,
partition_date: str,
):
"""
Write feature data for a specific date to Delta Lake.

Key properties:
- Atomic: either all files for this date are replaced, or none are
- Idempotent: running twice produces the same result (safe for retries)
- Concurrent-safe: other readers see a consistent state throughout
"""
features_df.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", f"feature_date = '{partition_date}'") \
.save(delta_path)

# The transaction log now has one new commit entry:
# - "add" for each new Parquet file
# - "remove" for each old Parquet file for that date
# This is atomic: readers see either the old data or the new data, never both


def append_new_events(events_df, delta_path: str):
"""
Append new event records atomically.
Safe to run concurrently with other readers or writers.
"""
events_df.write \
.format("delta") \
.mode("append") \
.save(delta_path)

Time Travel: Querying Any Historical Version

Time travel is Delta Lake's most powerful feature for ML reproducibility. Every commit is assigned a monotonically increasing version number, and every version is queryable by number or timestamp.

def read_features_at_timestamp(delta_path: str, as_of_timestamp: str):
"""
Read feature data as it existed at a specific point in time.

Use cases:
- Reproduce a training dataset exactly
- Debug a model regression (what did the data look like when it broke?)
- Audit: verify what data was used in production on a specific date
- Point-in-time feature retrieval for backtesting

as_of_timestamp: ISO 8601 string, e.g. "2024-09-23 14:30:00"
"""
return spark.read \
.format("delta") \
.option("timestampAsOf", as_of_timestamp) \
.load(delta_path)


def read_features_at_version(delta_path: str, version: int):
"""
Read feature data at an exact table version number.
More precise than timestamp -- no ambiguity about which commit.
"""
return spark.read \
.format("delta") \
.option("versionAsOf", version) \
.load(delta_path)


def inspect_table_history(delta_path: str, limit: int = 20):
"""
View the complete history of a Delta table.
Shows: version, timestamp, operation type, user, number of files added/removed.
Essential for audit trails and debugging.
"""
delta_table = DeltaTable.forPath(spark, delta_path)
delta_table.history(limit).select(
"version",
"timestamp",
"operation",
"operationParameters",
"operationMetrics",
"userMetadata",
).show(truncate=False)


# SQL time travel syntax (often more readable)
spark.sql("""
SELECT * FROM delta.`s3://ml-features/user_features/`
TIMESTAMP AS OF '2024-09-23 14:30:00'
WHERE feature_date BETWEEN '2024-08-24' AND '2024-09-23'
""")

# By version:
spark.sql("""
SELECT COUNT(*), feature_date
FROM delta.`s3://ml-features/user_features/`
VERSION AS OF 42
GROUP BY feature_date
ORDER BY feature_date
""")

Creating a Versioned Training Dataset

Here is the full pattern for creating a reproducible training dataset that can be exactly reproduced months later:

import mlflow
from datetime import datetime
from dataclasses import dataclass

@dataclass
class TrainingDataset:
delta_path: str
delta_version: int
timestamp: str
num_rows: int
feature_date_range: tuple
feature_columns: list


def create_versioned_training_dataset(
feature_delta_path: str,
label_delta_path: str,
training_window_days: int = 90,
as_of_date: str = None,
) -> TrainingDataset:
"""
Create a training dataset and record the exact Delta table versions used.
Returns a TrainingDataset object that can be used to reproduce this
exact dataset at any future point.

as_of_date: optional -- use this to create a point-in-time snapshot.
If None, uses current state of the tables.
"""
if as_of_date is None:
as_of_date = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

# Read features at the exact point in time
features_df = spark.read \
.format("delta") \
.option("timestampAsOf", as_of_date) \
.load(feature_delta_path) \
.filter(F.col("feature_date") >= F.date_sub(F.lit(as_of_date[:10]), training_window_days))

labels_df = spark.read \
.format("delta") \
.option("timestampAsOf", as_of_date) \
.load(label_delta_path)

training_data = features_df.join(labels_df, on="user_id", how="inner")

# Record the exact version numbers for reproducibility
feature_version = (
DeltaTable.forPath(spark, feature_delta_path)
.history(1)
.collect()[0]["version"]
)
label_version = (
DeltaTable.forPath(spark, label_delta_path)
.history(1)
.collect()[0]["version"]
)

dataset_info = TrainingDataset(
delta_path=feature_delta_path,
delta_version=feature_version,
timestamp=as_of_date,
num_rows=training_data.count(),
feature_date_range=(
training_data.agg(F.min("feature_date")).collect()[0][0],
training_data.agg(F.max("feature_date")).collect()[0][0],
),
feature_columns=features_df.columns,
)

return training_data, dataset_info


def reproduce_training_run(
feature_delta_path: str,
feature_version: int,
label_delta_path: str,
label_version: int,
) -> "DataFrame":
"""
Reproduce the exact training dataset used in a specific past run.
Given version numbers from MLflow experiment metadata, returns
a DataFrame identical to what was used at training time.
"""
features_df = spark.read \
.format("delta") \
.option("versionAsOf", feature_version) \
.load(feature_delta_path)

labels_df = spark.read \
.format("delta") \
.option("versionAsOf", label_version) \
.load(label_delta_path)

return features_df.join(labels_df, on="user_id", how="inner")

MLflow Integration for Full Reproducibility

Record Delta table version numbers as MLflow run parameters so model artifacts are permanently linked to the exact data version used to train them:

def train_with_versioned_data(
feature_delta_path: str,
label_delta_path: str,
model_params: dict,
):
"""
Train a model and record data provenance in MLflow.
Enables exact data reproduction from any past model version.
"""
with mlflow.start_run() as run:
# Create versioned training dataset
training_data, dataset_info = create_versioned_training_dataset(
feature_delta_path=feature_delta_path,
label_delta_path=label_delta_path,
)

# Log data provenance (the critical step)
mlflow.log_params({
"feature_delta_path": feature_delta_path,
"feature_delta_version": dataset_info.delta_version,
"feature_delta_timestamp": dataset_info.timestamp,
"num_training_rows": dataset_info.num_rows,
"feature_date_start": str(dataset_info.feature_date_range[0]),
"feature_date_end": str(dataset_info.feature_date_range[1]),
})
mlflow.log_params(model_params)

# Train model
import lightgbm as lgb
X = training_data.select(dataset_info.feature_columns).toPandas()
y = training_data.select("label").toPandas()

model = lgb.LGBMClassifier(**model_params)
model.fit(X, y)

# Log model
mlflow.lightgbm.log_model(model, artifact_path="model")

return run.info.run_id, model


# To reproduce: load version from MLflow and replay
def reproduce_from_mlflow_run(run_id: str) -> "DataFrame":
"""
Given an MLflow run ID, reproduce the exact training dataset.
Works months or years later as long as VACUUM hasn't deleted the old files.
"""
run = mlflow.get_run(run_id)
params = run.data.params

return reproduce_training_run(
feature_delta_path=params["feature_delta_path"],
feature_version=int(params["feature_delta_version"]),
label_delta_path=params.get("label_delta_path", ""),
label_version=int(params.get("label_delta_version", 0)),
)

Schema Evolution

ML feature schemas evolve as models are updated. New features are added, old features are deprecated, data types change. Delta Lake handles schema changes safely.

# Scenario: adding a new feature column to an existing Delta table

# Original schema (v1 features)
original_features = spark.createDataFrame([
("user_001", 5, 2.3, "2024-09-01"),
("user_002", 12, 1.1, "2024-09-01"),
], ["user_id", "purchase_count_30d", "avg_session_duration", "feature_date"])

original_features.write.format("delta").save("s3://ml-features/user_features/")


# 6 weeks later: adding click_through_rate as a new feature
# New data has the new column; historical data does not

def safe_add_feature_column(
new_features_df,
delta_path: str,
new_column: str,
partition_date: str,
):
"""
Add a new column to a Delta table without breaking existing readers.
Old partitions will have null for the new column -- handle this in model training.
"""
new_features_df.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", f"feature_date = '{partition_date}'") \
.option("mergeSchema", "true") \
.save(delta_path)

print(f"Added column '{new_column}' to schema.")
print("Historical partitions will have null for this column.")
print("Update training code to handle nulls: fill with median or 0.")


# Alternative: enable schema auto-merge globally for the session
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")


# Schema evolution options:
# 1. mergeSchema=true -- adds new columns, preserves existing ones (safe, backward compatible)
# 2. overwriteSchema=true -- replaces entire schema (destructive -- use carefully)

def force_schema_change_dangerous(df, delta_path: str):
"""
WARNING: This replaces the entire schema. Existing columns not in df will be lost.
Only use for table migrations, never for incremental updates.
"""
df.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save(delta_path)

DML Operations: UPDATE, DELETE, MERGE

Before Delta Lake, data lakes were append-only. Once written, data could not be corrected without rewriting entire partitions. Delta Lake brings full DML capabilities to object storage.

UPDATE

def correct_feature_values(
delta_path: str,
buggy_feature: str,
correction_factor: float,
affected_date_range: tuple,
):
"""
Correct a known data quality issue by updating specific column values.
This generates a new table version -- time travel still works on the pre-correction data.
"""
delta_table = DeltaTable.forPath(spark, delta_path)

delta_table.update(
condition=F.col("feature_date").between(*affected_date_range),
set={buggy_feature: F.col(buggy_feature) * correction_factor}
)

# SQL equivalent:
spark.sql(f"""
UPDATE delta.`{delta_path}`
SET {buggy_feature} = {buggy_feature} * {correction_factor}
WHERE feature_date BETWEEN '{affected_date_range[0]}' AND '{affected_date_range[1]}'
""")

DELETE (GDPR Right to Erasure)

def delete_user_data_for_gdpr(
delta_path: str,
user_id: str,
request_id: str,
):
"""
Delete all records for a user from a Delta table.
GDPR Article 17: right to erasure.

IMPORTANT: This marks files as removed in the transaction log,
but the underlying Parquet files still exist on disk until VACUUM runs.
Schedule VACUUM after bulk deletions to complete the erasure.
"""
delta_table = DeltaTable.forPath(spark, delta_path)

# Count records before deletion (for audit log)
records_before = delta_table.toDF().filter(
F.col("user_id") == user_id
).count()

# Delete
delta_table.delete(condition=F.col("user_id") == user_id)

records_after = delta_table.toDF().filter(
F.col("user_id") == user_id
).count()

return {
"request_id": request_id,
"user_id": user_id,
"records_deleted": records_before - records_after,
"status": "logical_delete_complete",
"physical_delete_status": "pending_vacuum",
"note": "Run VACUUM to physically remove Parquet files containing deleted records",
}

MERGE (Upsert)

MERGE is the most powerful DML operation - it handles insert, update, and delete in a single atomic operation:

def upsert_user_features(
new_features_df,
delta_path: str,
merge_key: str = "user_id",
):
"""
MERGE: update existing user records, insert new users.
Used for incremental feature pipeline updates.

Equivalent to: for each row in new_features_df:
if user_id exists in delta table: UPDATE the row
else: INSERT the row
The entire operation is atomic.
"""
delta_table = DeltaTable.forPath(spark, delta_path)

delta_table.alias("existing") \
.merge(
new_features_df.alias("new"),
f"existing.{merge_key} = new.{merge_key}"
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()


def selective_upsert_with_deletes(
new_features_df,
deleted_user_ids_df,
delta_path: str,
):
"""
Advanced MERGE: update existing, insert new, delete removed users.
Three-way operation in a single atomic commit.
"""
delta_table = DeltaTable.forPath(spark, delta_path)

delta_table.alias("existing") \
.merge(
new_features_df.alias("new"),
"existing.user_id = new.user_id"
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.whenNotMatchedBySourceDelete(
condition=F.col("existing.user_id").isin(
[row.user_id for row in deleted_user_ids_df.collect()]
)
) \
.execute()

Optimistic Concurrency

Delta Lake uses optimistic concurrency control - multiple writers can proceed in parallel, and the transaction log arbitrates conflicts at commit time:

Delta Lake checks whether the files a writer read are still the current files at commit time. If another writer committed in between, the conflicting writer retries. For non-overlapping partitions (Writer 1 writes date A, Writer 2 writes date B), both succeed without conflict - this is the common case for daily feature pipelines.

OPTIMIZE and VACUUM

OPTIMIZE: Compaction and Z-Ordering

Streaming pipelines and frequent small writes produce many small Parquet files. Small files hurt query performance because each file requires a separate S3 API call and read operation. OPTIMIZE consolidates small files:

def run_table_optimization(
delta_path: str,
z_order_columns: list,
recent_days: int = 7,
):
"""
Compact small files and apply Z-ordering for query acceleration.

Z-ordering physically colocates data with similar column values in the
same Parquet files. A query filtering on a Z-ordered column reads far
fewer files because Delta can use per-file min/max statistics to skip
files that don't contain matching values.

Only OPTIMIZE recent partitions during daily maintenance.
Historical partitions were already optimized and rarely change.
"""
delta_table = DeltaTable.forPath(spark, delta_path)

# Compact recent partitions only (cost-effective)
delta_table.optimize() \
.where(f"feature_date >= date_sub(current_date(), {recent_days})") \
.executeZOrderBy(*z_order_columns)


def benchmark_z_order_impact(delta_path: str, filter_user_id: str) -> dict:
"""Compare query performance before and after Z-ordering."""
import time

# Before Z-order
start = time.time()
count_before = spark.read.format("delta").load(delta_path) \
.filter(F.col("user_id") == filter_user_id).count()
time_before = time.time() - start

# Apply Z-order
DeltaTable.forPath(spark, delta_path).optimize().executeZOrderBy("user_id")

# After Z-order
start = time.time()
count_after = spark.read.format("delta").load(delta_path) \
.filter(F.col("user_id") == filter_user_id).count()
time_after = time.time() - start

assert count_before == count_after, "Z-ordering should not change row counts"

return {
"rows": count_before,
"query_time_before_seconds": round(time_before, 1),
"query_time_after_seconds": round(time_after, 1),
"speedup_factor": round(time_before / time_after, 1),
}

VACUUM: Storage Reclamation

VACUUM removes Parquet files that are no longer referenced by any version within the retention window:

def vacuum_with_gdpr_compliance(
delta_path: str,
retention_hours: int = 168,
dry_run: bool = True,
):
"""
Remove old data files from Delta table storage.

retention_hours: keep files needed for time travel within this window.
After VACUUM, you cannot time-travel to versions older than the retention period.

Workflow for GDPR erasure:
1. Run DELETE to logically remove user records
2. Wait for any running queries to complete
3. Run VACUUM to physically remove the Parquet files containing deleted records
4. Log VACUUM completion timestamp as proof of physical erasure
"""
delta_table = DeltaTable.forPath(spark, delta_path)

if dry_run:
print("DRY RUN -- files that would be deleted:")
delta_table.vacuum(retention_hours, dry_run=True).show(truncate=False)
else:
delta_table.vacuum(retention_hours)
print(f"VACUUM complete. Time travel window: {retention_hours / 24:.0f} days")

return {
"delta_path": delta_path,
"retention_hours": retention_hours,
"time_travel_window_days": retention_hours / 24,
}


def gdpr_erasure_complete_workflow(
delta_paths: list,
user_id: str,
erasure_request_id: str,
):
"""
Complete GDPR erasure workflow: logical delete + physical delete.
Returns a signed audit record confirming physical erasure.
"""
from datetime import datetime

audit_trail = {
"request_id": erasure_request_id,
"user_id": user_id,
"tables_processed": [],
}

for delta_path in delta_paths:
# Step 1: Logical delete
delete_result = delete_user_data_for_gdpr(delta_path, user_id, erasure_request_id)

# Step 2: Physical delete (VACUUM with minimum retention for GDPR compliance)
# Disable time travel safety check to allow immediate physical deletion
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
vacuum_with_gdpr_compliance(
delta_path,
retention_hours=0,
dry_run=False,
)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")

audit_trail["tables_processed"].append({
"delta_path": delta_path,
"records_deleted": delete_result["records_deleted"],
"physical_erasure_timestamp": datetime.utcnow().isoformat(),
"vacuum_completed": True,
})

return audit_trail

Production Engineering Notes

Automated Table Maintenance

Set up a daily Airflow/Prefect task for each production Delta table:

class DeltaTableMaintainer:
"""
Automated daily maintenance for production Delta tables.
Run as a scheduled task after feature pipelines complete.
"""
def __init__(self, spark, table_config: dict):
self.spark = spark
self.table_config = table_config

def run_daily_maintenance(self, table_path: str):
import logging

config = self.table_config.get(table_path, {})
z_order_cols = config.get("z_order_columns", [])
retention_days = config.get("retention_days", 30)

delta_table = DeltaTable.forPath(self.spark, table_path)

# 1. Compact small files from recent writes
logging.info(f"Compacting {table_path}")
if z_order_cols:
delta_table.optimize() \
.where("feature_date >= date_sub(current_date(), 7)") \
.executeZOrderBy(*z_order_cols)
else:
delta_table.optimize() \
.where("feature_date >= date_sub(current_date(), 7)") \
.executeCompaction()

# 2. Remove files outside retention window
logging.info(f"Vacuuming {table_path} (retaining {retention_days} days)")
delta_table.vacuum(retentionHours=retention_days * 24)

# 3. Log current table version
current_version = delta_table.history(1).collect()[0]["version"]
logging.info(f"Maintenance complete. Current version: {current_version}")

return {"table_path": table_path, "current_version": current_version}

Setting Table-Level Retention Policies

# Set retention policy on a Delta table (survives VACUUM calls)
spark.sql("""
ALTER TABLE delta.`s3://ml-features/user_features/`
SET TBLPROPERTIES (
'delta.logRetentionDuration' = 'interval 90 days',
'delta.deletedFileRetentionDuration' = 'interval 90 days'
)
""")

# Verify table properties
spark.sql("""
DESCRIBE DETAIL delta.`s3://ml-features/user_features/`
""").select("format", "location", "numFiles", "properties").show(truncate=False)

:::danger VACUUM Destroys Time Travel History After VACUUM runs, you cannot time-travel to versions older than the retention window. If you rely on time travel for model reproducibility, set the VACUUM retention to cover your longest model lifecycle. If a model deployed 6 months ago needs to be reproduced for an audit, you need 6 months of retention (4380 hours). The default 7 days (168 hours) is far too short for most ML compliance requirements. Set retention based on your longest model version lifecycle, not on storage cost. :::

:::danger VACUUM After GDPR DELETE Removes Proof of Consent When you delete user data for GDPR compliance, VACUUM physically removes the Parquet files. Before running VACUUM, ensure your audit log (the record of what data the user consented to and what was deleted) is stored separately in an immutable audit store. The audit log itself should NOT be in a VACUUM-able Delta table - it should be in a write-once store like S3 with Object Lock or a dedicated audit database. :::

:::warning OPTIMIZE Rewrites Data Files OPTIMIZE with ZORDER physically rewrites the Parquet files it touches. On a large table, running full-table OPTIMIZE is expensive - proportional to table size. Always scope OPTIMIZE to recently written partitions (last 7 days). Run full-table OPTIMIZE once when setting up a new table, then only for new data. A 5 TB full-table OPTIMIZE is a 5 TB write operation. :::

:::warning Time Travel Requires Sufficient VACUUM Retention If another team or automation runs VACUUM with aggressive retention (less than 7 days), your time travel queries will fail with DeltaVersionNotFoundException. Establish a table-level retention policy as part of your data platform standards and enforce it via Delta table properties: delta.logRetentionDuration = "interval 90 days" and delta.deletedFileRetentionDuration = "interval 90 days". :::

Interview Q&A

Q1: What is Delta Lake and why is it needed for ML data pipelines? How does it differ from plain Parquet on S3?

Plain Parquet on S3 has three critical limitations for ML pipelines. First, no atomicity: if a Spark job writing 500 files fails halfway through, the table is in a corrupt mixed state with no way to recover. Second, no history: once data is overwritten, the previous state is gone. You cannot reproduce a training dataset after the underlying data has changed. Third, no safe concurrent access: two Spark jobs writing to the same table simultaneously can corrupt each other's output.

Delta Lake solves all three by adding a transaction log - a directory (_delta_log/) that records every table operation as an atomic JSON commit. Each commit records exactly which files were added and removed. Reads replay the log to determine current table state. Writes are atomic: either the full commit succeeds (all files registered in the log) or the table is unchanged. Time travel works by replaying the log up to a specified version or timestamp. Concurrent writers use optimistic concurrency - they proceed in parallel and the log arbitrates conflicts at commit time.

The practical difference: a Delta Lake table is a Parquet table where ACID properties are guaranteed, history is queryable, and corrections can be applied via UPDATE/DELETE/MERGE rather than full partition rewrites.

Q2: How does time travel work in Delta Lake, and how do you use it for reproducible ML experiments?

Every Delta Lake write creates a new version entry in the transaction log with a monotonically increasing version number and a timestamp. The log entry records which Parquet files were added and removed. To query the table at version N, the engine replays the log entries 0 through N to determine which files existed at that point, then reads only those files.

For ML reproducibility: after loading training data, read the current version number from DeltaTable.history(1) and log it to MLflow with the run metadata. When you need to reproduce the exact training dataset - for an audit, for a retrain comparison, or for debugging - use spark.read.format("delta").option("versionAsOf", N).load(path) with the logged version number. This returns exactly the same rows that were present when the model was trained, regardless of how many updates have happened since.

The constraint: time travel only works if VACUUM has not removed the files for that version. Set VACUUM retention to cover your longest model lifecycle. For financial services models with 12-month audit requirements, set retention to 365 days. For standard ML, 30-90 days is often sufficient.

Q3: Explain optimistic concurrency control in Delta Lake. What happens when two Spark jobs try to write to the same table at the same time?

Delta Lake uses optimistic concurrency: writers proceed optimistically (assuming no conflict) and check for conflicts only at commit time. Each writer reads the current table state, prepares its data files, then attempts to commit by writing a new version to the transaction log.

If two writers prepare commits at the same time and both try to write version 42, only one succeeds (using S3's atomic put-if-not-exists semantics). The losing writer detects the conflict, re-reads the current state (now version 42), and retries with version 43.

Delta Lake is smart about conflict detection: it checks whether the conflicting commit touched the same data the current writer read. If Writer 1 wrote date 2024-09-23 and Writer 2 also writes date 2024-09-23, Delta detects a conflict and forces a retry. If Writer 1 writes date 2024-09-23 and Writer 2 writes date 2024-09-24, they operate on disjoint partitions - Delta detects no conflict and both commits succeed without retry. This makes daily feature pipelines writing to different date partitions concurrency-safe by design.

Q4: What is the difference between OPTIMIZE and VACUUM in Delta Lake? When do you run each?

OPTIMIZE improves future read performance by consolidating small Parquet files into larger ones and (optionally) applying Z-ordering to colocate related data by column value. Run OPTIMIZE periodically (daily or weekly) on recently written partitions to maintain good query performance. OPTIMIZE does not delete any data - it rewrites files and adds new entries to the transaction log. OPTIMIZE delta.path ZORDER BY (user_id, feature_date) on recent partitions is a standard daily maintenance task.

VACUUM physically deletes Parquet files that are no longer referenced by any table version within the retention window. It frees storage but permanently removes the ability to time-travel to versions before the vacuum point. Run VACUUM monthly (or after GDPR deletion requests). The default retention is 7 days - VACUUM will not delete files newer than 7 days. To maintain 90-day time travel, run VACUUM with retentionHours=2160 (90 days).

The operational rule: OPTIMIZE is safe to run frequently (it only adds data, never removes it). VACUUM is permanent (it removes data) and should be run deliberately with full awareness of your time travel requirements.

Q5: How do you implement GDPR right-to-erasure for a user whose data is stored in a Delta Lake feature table?

The process has two phases: logical deletion (immediate) and physical deletion (via VACUUM).

Phase 1 - Logical deletion: call delta_table.delete(condition=F.col("user_id") == user_id). This creates a new transaction log entry marking the user's rows as deleted. The rows no longer appear in any current-version query. However, the underlying Parquet files still exist on disk and are accessible via time travel.

Phase 2 - Physical deletion: run delta_table.vacuum(retentionHours=0) with the retention duration check disabled (spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")). This physically deletes the Parquet files that contained the user's records. After VACUUM completes, the data is permanently gone - time travel to versions containing the user's data will fail.

Important nuance: the transaction log entries themselves (the JSON commit files in _delta_log/) record that a delete operation occurred on this user ID, but do not contain the actual feature values. The Parquet data files are what VACUUM removes. Store the GDPR audit trail (request ID, timestamp, tables processed, records deleted) in a separate immutable audit log that is not subject to VACUUM.

Summary

Delta Lake's transaction log turns object storage into a full database with ACID guarantees, history, and DML operations. For ML pipelines, the critical capabilities are: ACID writes that eliminate partial-write corruption, time travel that enables exact reproduction of any historical dataset, MERGE for efficient upsert pipelines, and DELETE for GDPR compliance.

The reproducibility pattern every ML team should implement: log the Delta table version number to MLflow with every training run. This creates an immutable link between model artifacts and data - you can reproduce the exact training dataset years later as long as VACUUM retention covers the time window. Set VACUUM retention based on your audit requirements, not on default values.

The operations to internalize: replaceWhere for atomic partition writes, timestampAsOf/versionAsOf for time travel, OPTIMIZE ZORDER BY for query acceleration, and VACUUM for storage reclamation (with full awareness that it permanently removes time travel capability for the vacuumed window).

© 2026 EngineersOfAI. All rights reserved.