Delta Lake
The Half-Written Table
The streaming pipeline had been running reliably for six months. Every five minutes, a Spark Structured Streaming job picked up events from Kafka, aggregated them, and wrote the results as Parquet files to an S3 prefix. The dashboard read from that prefix. Simple, fast, cheap.
Then the on-call alert fired at 3:47am. The revenue dashboard was showing numbers that were 30% lower than expected for the last two hours. The SRE woke up the data engineer. The data engineer connected to the cluster and ran a quick file listing on S3. There were 847 Parquet files in the directory from the last hour. That was correct. The counts looked right.
Then he ran the query that the dashboard was running and watched it execute. He watched the row count scroll up to 4.2 million. Expected was 6 million. He ran it again. 5.8 million. The numbers were different every time.
The problem was a race condition. Readers were scanning the S3 directory concurrently with writers. A reader would start listing files at the moment a Spark micro-batch was mid-write - after files 1 through 412 were written but before files 413 through 500 arrived. The reader saw a consistent snapshot of the file listing but that snapshot was taken at a moment of partial truth. The listing was consistent; the data was not.
The fix was not complex. The team added Delta Lake as a dependency, changed .format("parquet") to .format("delta") in both the writer and the reader, and redeployed. The next morning, the dashboard was accurate to the second. No more race conditions. No more partial reads. Delta Lake's transaction log meant readers always saw a consistent snapshot - if a write was in progress, readers saw the previous committed state.
This is the problem Delta Lake was built to solve first: atomic visibility. A batch of files is either all visible or none of it is. The transaction log is the mechanism.
Why This Exists - The Parquet Directory Problem
By 2019, Databricks customers were running Spark jobs on cloud object storage at large scale, and the pattern was the same everywhere: Spark writes Parquet files, a downstream job reads the directory. And everywhere, the same bugs appeared.
Partial writes. A Spark job writing 200 files fails at file 150. Files 1–150 are on S3. Files 151–200 never arrive. The next reader sees 150 files and processes incomplete data - silently, without error.
Inconsistent reads. A reader starts scanning a directory at the moment a writer is adding files. Some files are new, some are old. The reader sees a mix that represents no consistent point in time.
No efficient updates. Parquet files are immutable, so updating a single record means rewriting the entire file that contains it. Without a log recording which file holds which row, the usual workaround is to rewrite the whole partition: for a daily partition with 100 files, updating one row means reading 100 files and writing 100 new ones. This makes CDC (change data capture) patterns - receiving ongoing updates to existing records - extremely expensive on raw Parquet.
No way to undo. Once a Spark job overwrites a directory, the previous data is gone. There is no rollback, no version history, no audit trail.
Databricks had been working on these problems internally. In 2019 they open-sourced Delta Lake - a storage layer that adds ACID transactions to Parquet files on object storage through a simple append-only transaction log.
By 2023, Delta Lake had become one of the most widely deployed open table formats in the industry, primarily through Databricks but increasingly standalone through the deltalake Python library (built on Rust via delta-rs), which requires no Spark or JVM dependency.
The Transaction Log - _delta_log/
Every Delta table has a _delta_log/ directory at its root. This directory is the heart of Delta Lake - it is the transaction log that records every change ever made to the table.
Each committed transaction creates one JSON file in _delta_log/ with a sequentially incrementing name:
s3://bucket/my_table/
    _delta_log/
        00000000000000000000.json                 ← version 0 (table creation)
        00000000000000000001.json                 ← version 1 (first append)
        00000000000000000002.json                 ← version 2 (second append)
        00000000000000000003.json                 ← version 3 (MERGE INTO)
        00000000000000000010.checkpoint.parquet   ← checkpoint at version 10
        _last_checkpoint                          ← points to latest checkpoint
    part-00001-abc.parquet
    part-00002-def.parquet
    part-00003-ghi.parquet
    ...
Each JSON file is a sequence of action records. The most important actions are:
- add: a new data file was added - includes path, size, statistics, and partition values
- remove: a data file was logically deleted - still on disk until VACUUM runs
- metaData: schema definition or schema change
- commitInfo: who committed, when, what operation, what parameters
To reconstruct the current state of the table, replay all JSON files from 0 to the latest version - collect all add actions, remove those that were subsequently removed. The remaining set of files is the current table state.
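A minimal sketch of that replay in plain Python, assuming a simplified log where each commit file holds one JSON action per line and only `add` and `remove` matter (real commits also carry `metaData`, `protocol`, and `commitInfo`, and real readers start from a checkpoint). The helpers `write_commit` and `replay_log` are hypothetical, not part of any Delta library.

```python
import json
import os
import tempfile


def write_commit(log_dir, version, actions):
    """Write one commit file: zero-padded version name, one JSON action per line."""
    path = os.path.join(log_dir, f"{version:020d}.json")
    with open(path, "w") as f:
        for action in actions:
            f.write(json.dumps(action) + "\n")


def replay_log(log_dir, as_of=None):
    """Return the set of live data files after replaying commits 0..as_of."""
    live = set()
    # Zero-padded names sort in version order; checkpoint files are skipped here.
    commits = sorted(n for n in os.listdir(log_dir) if n.endswith(".json"))
    for name in commits:
        version = int(name.split(".")[0])
        if as_of is not None and version > as_of:
            break
        with open(os.path.join(log_dir, name)) as f:
            for line in f:
                action = json.loads(line)
                if "add" in action:
                    live.add(action["add"]["path"])
                elif "remove" in action:
                    live.discard(action["remove"]["path"])
    return live


log_dir = os.path.join(tempfile.mkdtemp(), "_delta_log")
os.makedirs(log_dir)
write_commit(log_dir, 0, [{"add": {"path": "part-a.parquet"}}, {"add": {"path": "part-b.parquet"}}])
write_commit(log_dir, 1, [{"add": {"path": "part-c.parquet"}}])
# A rewrite (e.g. an UPDATE) logically removes part-a and adds its replacement part-d.
write_commit(log_dir, 2, [{"remove": {"path": "part-a.parquet"}}, {"add": {"path": "part-d.parquet"}}])

current = replay_log(log_dir)
version_1 = replay_log(log_dir, as_of=1)
print(sorted(current))    # ['part-b.parquet', 'part-c.parquet', 'part-d.parquet']
print(sorted(version_1))  # ['part-a.parquet', 'part-b.parquet', 'part-c.parquet']
```

The `as_of` argument is the same idea behind isolation and time travel: a reader pinned to version 1 replays only commits 0 and 1, so later commits never change its file list.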
Checkpoints. After every 10 commits (configurable), Delta writes a Parquet checkpoint file that captures the full table state at that version. Reading the table no longer requires replaying all JSON files from the beginning - just replay from the last checkpoint forward. This keeps read overhead bounded as the log grows.
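A sketch of which log files a reader needs once a checkpoint exists, assuming `_last_checkpoint` holds JSON with a `version` field and the checkpoint is a single Parquet file (multi-part checkpoints are ignored). `files_to_read` is a hypothetical helper.

```python
import json


def files_to_read(log_files, last_checkpoint_json):
    """Pick the checkpoint plus the JSON commits written after it."""
    checkpoint_version = json.loads(last_checkpoint_json)["version"]
    checkpoint = f"{checkpoint_version:020d}.checkpoint.parquet"
    tail = sorted(
        name for name in log_files
        if name.endswith(".json") and int(name.split(".")[0]) > checkpoint_version
    )
    return [checkpoint] + tail


# A log with commits 0..13 and a checkpoint at version 10.
log_files = [f"{v:020d}.json" for v in range(14)]
log_files.append(f"{10:020d}.checkpoint.parquet")

plan = files_to_read(log_files, '{"version": 10}')
print(len(plan))  # 4: the checkpoint plus commits 11, 12 and 13
```

However long the log grows, the reader touches one checkpoint and at most a handful of JSON files.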
ACID in Delta Lake - How It Actually Works
Delta Lake implements three of the four ACID properties directly. The fourth (consistency) is enforced by schema checking and constraint validation at commit time.
Atomicity. A transaction either fully commits (all files written + JSON log entry created) or fails (no log entry). If a Spark job crashes after writing 100 of 200 data files but before writing the JSON commit entry, those 100 orphaned files are not referenced by any log entry. Readers never see them. They are cleaned up by VACUUM.
Isolation. Each read works against a specific log version. A reader scanning at version 5 is unaffected by writers committing versions 6, 7, or 8 concurrently. The reader's file list was determined by replaying the log up to version 5, and that list does not change.
Durability. The transaction log is written to the same durable object storage as the data files. S3, GCS, and ADLS all provide 11 nines of durability. Once the JSON commit entry exists, the transaction is permanent.
Concurrent write handling. When two writers attempt to commit the same version number simultaneously:
Writer A reads current version: 5
Writer B reads current version: 5
Writer A writes 10 data files
Writer A attempts to write _delta_log/000...006.json ← succeeds
Writer B writes 10 data files
Writer B attempts to write _delta_log/000...006.json ← fails (already exists)
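Writer B conflict resolution:
- If version 6 did not touch data B read (e.g., different partitions) → no logical conflict; commit B's same data files as version 7
- If version 6 added or removed files B read (e.g., the same partition) → fail with a concurrent-modification exception such as ConcurrentAppendException or ConcurrentDeleteReadException
Delta's conflict resolution logic handles the most common cases automatically. Blind appends never conflict with each other, and writes to different partitions can be committed without rewriting anything. Concurrent updates to the same files are a genuine conflict and require application-level retry logic. All of this depends on the storage layer offering an atomic "create only if absent" for the commit file; on S3, which historically lacked that primitive, open-source Delta has relied on a coordination layer such as the DynamoDB-backed LogStore for safe multi-cluster writes.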
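A minimal local-filesystem sketch of this optimistic protocol, assuming `os.O_CREAT | os.O_EXCL` as the put-if-absent primitive and a hypothetical commit payload that records only the partitions a writer touched. Real Delta conflict checks compare the files a transaction read against the files added or removed by the winning commits, so `commit` and `ConcurrentWriteConflict` are illustrative names only.

```python
import json
import os
import tempfile


class ConcurrentWriteConflict(Exception):
    """Hypothetical stand-in for Delta's concurrent-modification exceptions."""


def commit_path(log_dir, version):
    return os.path.join(log_dir, f"{version:020d}.json")


def try_commit(log_dir, version, partitions):
    """Create the commit file only if it does not exist yet (put-if-absent)."""
    try:
        fd = os.open(commit_path(log_dir, version), os.O_WRONLY | os.O_CREAT | os.O_EXCL)
    except FileExistsError:
        return False
    with os.fdopen(fd, "w") as f:
        json.dump({"partitions": sorted(partitions)}, f)
    return True


def commit(log_dir, read_version, partitions, max_attempts=10):
    """Try read_version + 1; after losing a race, inspect the winner and retry later."""
    version = read_version + 1
    for _ in range(max_attempts):
        if try_commit(log_dir, version, partitions):
            return version
        with open(commit_path(log_dir, version)) as f:
            winner = set(json.load(f)["partitions"])
        overlap = winner & set(partitions)
        if overlap:
            raise ConcurrentWriteConflict(f"version {version} also touched {sorted(overlap)}")
        version += 1  # no logical conflict: same data files, later version number
    raise ConcurrentWriteConflict("gave up after too many retries")


log_dir = tempfile.mkdtemp()
version_a = commit(log_dir, 5, {"region=us"})  # writer A wins version 6
version_b = commit(log_dir, 5, {"region=eu"})  # writer B loses 6, commits as 7
print(version_a, version_b)  # 6 7
```

A third writer that also read version 5 and touched region=us would find its partition in the winning commit and raise instead of retrying.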
DML Operations - UPDATE, DELETE, MERGE INTO
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("delta-dml")
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0")
.config("spark.sql.extensions",
"io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
)
# --- Setup: create a Delta table ---
from pyspark.sql import Row
from datetime import date
customers = spark.createDataFrame([
Row(customer_id="c001", name="Alice", plan="free", mrr=0.0, updated=date(2024, 1, 1)),
Row(customer_id="c002", name="Bob", plan="basic", mrr=29.0, updated=date(2024, 1, 1)),
Row(customer_id="c003", name="Carol", plan="pro", mrr=99.0, updated=date(2024, 1, 1)),
Row(customer_id="c004", name="Dave", plan="basic", mrr=29.0, updated=date(2024, 1, 1)),
])
customers.write.format("delta").mode("overwrite").save("/tmp/customers_delta")
# --- UPDATE ---
spark.sql("""
UPDATE delta.`/tmp/customers_delta`
SET mrr = 49.0, updated = DATE '2024-02-01'
WHERE plan = 'basic'
""")
# --- DELETE ---
spark.sql("""
DELETE FROM delta.`/tmp/customers_delta`
WHERE plan = 'free'
""")
# --- MERGE INTO (upsert) ---
# Simulate incoming CRM changes
changes = spark.createDataFrame([
# Existing customer upgrading plan
Row(customer_id="c002", name="Bob", plan="pro", mrr=99.0, updated=date(2024, 3, 1)),
# New customer
Row(customer_id="c005", name="Eve", plan="basic", mrr=29.0, updated=date(2024, 3, 1)),
# Customer flagged for deletion
Row(customer_id="c003", name="Carol", plan="deleted", mrr=0.0, updated=date(2024, 3, 1)),
])
changes.createOrReplaceTempView("customer_changes")
spark.sql("""
MERGE INTO delta.`/tmp/customers_delta` AS target
USING customer_changes AS source
ON target.customer_id = source.customer_id
WHEN MATCHED AND source.plan = 'deleted' THEN
DELETE
WHEN MATCHED THEN
UPDATE SET
target.name = source.name,
target.plan = source.plan,
target.mrr = source.mrr,
target.updated = source.updated
WHEN NOT MATCHED THEN
INSERT (customer_id, name, plan, mrr, updated)
VALUES (source.customer_id, source.name, source.plan, source.mrr, source.updated)
""")
# Verify results
spark.read.format("delta").load("/tmp/customers_delta").show()
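For intuition about what that MERGE does row by row, here is a pure-Python model, assuming both tables fit in memory, source keys are unique, and the `updated` column is omitted. For a matched row, WHEN MATCHED clauses are checked in order and the first whose condition holds wins, which is why the delete clause sits before the unconditional update. Real Delta MERGE also fails when more than one source row matches the same target row; this sketch does not model that. `merge_customers` is a hypothetical helper.

```python
def merge_customers(target_rows, source_rows):
    """Model the MERGE above on lists of dicts keyed by customer_id."""
    table = {row["customer_id"]: dict(row) for row in target_rows}
    for src in source_rows:
        key = src["customer_id"]
        if key in table:
            # WHEN MATCHED clauses in order: the first matching clause wins.
            if src["plan"] == "deleted":
                del table[key]            # WHEN MATCHED AND source.plan = 'deleted' THEN DELETE
            else:
                table[key].update(src)    # WHEN MATCHED THEN UPDATE SET ...
        else:
            table[key] = dict(src)        # WHEN NOT MATCHED THEN INSERT ...
    return [table[k] for k in sorted(table)]


# Table state after the UPDATE and DELETE above.
target = [
    {"customer_id": "c002", "name": "Bob", "plan": "basic", "mrr": 49.0},
    {"customer_id": "c003", "name": "Carol", "plan": "pro", "mrr": 99.0},
    {"customer_id": "c004", "name": "Dave", "plan": "basic", "mrr": 49.0},
]
source = [
    {"customer_id": "c002", "name": "Bob", "plan": "pro", "mrr": 99.0},
    {"customer_id": "c005", "name": "Eve", "plan": "basic", "mrr": 29.0},
    {"customer_id": "c003", "name": "Carol", "plan": "deleted", "mrr": 0.0},
]
result = merge_customers(target, source)
print([(r["customer_id"], r["plan"]) for r in result])
# [('c002', 'pro'), ('c004', 'basic'), ('c005', 'basic')]
```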
Time Travel
Delta Lake preserves every version of the table - old data files that were removed by UPDATE or DELETE operations remain on disk until VACUUM is explicitly run. This enables reading any historical version.
# --- Time travel by version number ---
df_v0 = (
spark.read
.format("delta")
.option("versionAsOf", 0)
.load("/tmp/customers_delta")
)
print("Version 0:")
df_v0.show()
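# --- Time travel by timestamp ---
# timestampAsOf is matched against commit times (when each version was written),
# not against the updated column. The timestamp must fall within the table's
# history or the read fails, so use a time after this table was created.
df_feb = (
    spark.read
    .format("delta")
    .option("timestampAsOf", "2024-02-01 00:00:00")
    .load("/tmp/customers_delta")
)
print("As of Feb 1:")
df_feb.show()
# --- SQL time travel syntax ---
spark.sql("""
SELECT * FROM delta.`/tmp/customers_delta` VERSION AS OF 0
""").show()
spark.sql("""
SELECT * FROM delta.`/tmp/customers_delta`
TIMESTAMP AS OF '2024-02-01'
""").show()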
# --- Inspect history ---
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/tmp/customers_delta")
dt.history().select("version", "timestamp", "operation", "operationParameters").show(truncate=False)
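A timestamp read resolves to a version number: the latest version committed at or before the given timestamp. A small sketch of that lookup, assuming a sorted list of commit timestamps indexed by version; `version_as_of` is a hypothetical helper that only checks the before-first-commit case.

```python
from bisect import bisect_right
from datetime import datetime


def version_as_of(commit_times, ts):
    """Return the latest version whose commit timestamp is <= ts.

    commit_times[v] is the commit timestamp of version v, in ascending order.
    """
    idx = bisect_right(commit_times, ts)
    if idx == 0:
        raise ValueError("timestamp is before the table's first commit")
    return idx - 1


commit_times = [
    datetime(2024, 3, 1, 9, 0),   # version 0
    datetime(2024, 3, 1, 12, 0),  # version 1
    datetime(2024, 3, 2, 8, 0),   # version 2
]
print(version_as_of(commit_times, datetime(2024, 3, 1, 18, 0)))  # 1
print(version_as_of(commit_times, datetime(2024, 3, 2, 8, 0)))   # 2 (an exact match counts)
```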
:::tip Time Travel for ML Reproducibility
Time travel is one of the most underused features of Delta Lake for ML teams. When training a model, record the table version you trained on, for example DeltaTable.forPath(spark, path).history(1).collect()[0]["version"] in Spark, or DeltaTable(path).version() with the deltalake library. To reproduce the exact training dataset six months later, read VERSION AS OF {recorded_version}, provided VACUUM has not removed that version's files. This is far more reliable than trying to recreate data from audit logs.
:::
Change Data Feed - Streaming CDC
Change Data Feed (CDF) is a Delta Lake feature that exposes a changelog of row-level changes - which rows were inserted, updated (before and after values), and deleted. It enables downstream consumers to process only the changes since they last read, rather than scanning the full table on every run.
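# Enable CDF on a table
spark.sql("""
ALTER TABLE delta.`/tmp/customers_delta`
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# CDF only covers commits from the version where it was enabled onward,
# so record that version (the ALTER TABLE commit just made)
from delta.tables import DeltaTable
cdf_start = (
    DeltaTable.forPath(spark, "/tmp/customers_delta")
    .history(1)
    .collect()[0]["version"]
)
# After making some changes...
spark.sql("""
UPDATE delta.`/tmp/customers_delta`
SET mrr = 149.0
WHERE customer_id = 'c002'
""")
# Read the change feed from cdf_start onward (add endingVersion to bound the range)
changes_df = (
    spark.read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", cdf_start)  # or "startingTimestamp"
    .load("/tmp/customers_delta")
)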
# _change_type column: insert, update_preimage, update_postimage, delete
changes_df.select(
"customer_id", "name", "mrr",
"_change_type", "_commit_version", "_commit_timestamp"
).show(truncate=False)
CDF output for an UPDATE produces two rows per changed record:
- update_preimage - the row values before the update
- update_postimage - the row values after the update
This is essential for downstream consumers that need to know what changed, not just what the current state is. Use cases include:
- Downstream aggregate tables: recompute only the affected aggregates
- ML feature stores: update feature values for changed entities
- Search index updates: only reindex changed documents
- Event-driven pipelines: trigger downstream processing for specific change types
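The four _change_type values can be reproduced by diffing two versions of a keyed table. A minimal sketch, assuming each version fits in a dict from key to row; `change_feed` is a hypothetical helper, and real CDF records changes at write time rather than diffing snapshots.

```python
def change_feed(before, after):
    """Emit (change_type, key, row) tuples describing how `before` became `after`."""
    changes = []
    for key in sorted(before.keys() | after.keys()):
        old, new = before.get(key), after.get(key)
        if old is None:
            changes.append(("insert", key, new))
        elif new is None:
            changes.append(("delete", key, old))
        elif old != new:
            changes.append(("update_preimage", key, old))
            changes.append(("update_postimage", key, new))
    return changes


v_before = {"c002": {"mrr": 99.0}, "c004": {"mrr": 49.0}}
v_after = {"c002": {"mrr": 149.0}, "c005": {"mrr": 29.0}}
for change in change_feed(v_before, v_after):
    print(change)
# ('update_preimage', 'c002', {'mrr': 99.0})
# ('update_postimage', 'c002', {'mrr': 149.0})
# ('delete', 'c004', {'mrr': 49.0})
# ('insert', 'c005', {'mrr': 29.0})
```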
# Streaming CDC consumer pattern
# Continuously reads new changes as they arrive
streaming_changes = (
spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "latest")
.load("/tmp/customers_delta")
)
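# Keep inserts and post-update values (feature store sync pattern).
# Appending these rows keeps every version of each entity; to hold only the
# latest row per key, write with foreachBatch and MERGE instead.
updates_only = streaming_changes.filter(
    streaming_changes["_change_type"].isin("update_postimage", "insert")
)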
query = (
updates_only.writeStream
.format("delta")
.option("checkpointLocation", "/tmp/feature_store_checkpoint")
.outputMode("append")
.start("/tmp/feature_store_delta")
)
Z-Ordering - Multi-Dimensional Clustering
Z-ordering (also called Z-curve ordering) is Delta Lake's mechanism for co-locating related data within Parquet files to improve data skipping. The problem it solves is that data written over time lands in files in arrival order, not in query-efficient order. A query filtering by customer_id has to scan every file because customer_id values are scattered across all files.
After Z-ordering by customer_id, related customer IDs are co-located in the same files. The min/max customer_id range per file becomes much narrower, so a filter WHERE customer_id = 'c001' can skip far more files.
Z-ordering is a multi-dimensional generalization: it can co-locate by multiple columns simultaneously using a space-filling curve that maps N-dimensional space to a 1-dimensional ordering that preserves locality. A Z-order by (customer_id, event_date) gives useful file skipping for queries that filter on either column or both, whereas a plain sort by customer_id helps only customer_id filters. The trade-off is that skipping on any single column is somewhat weaker than a sort on that column alone.
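The core of a Z-order is bit interleaving (a Morton code). A minimal sketch, assuming two small non-negative integer columns and files of exactly four rows; Delta's OPTIMIZE first maps real column values to range IDs before interleaving, so this shows the idea rather than Delta's code. `z_value` and `file_ranges` are hypothetical helpers.

```python
def z_value(x, y, bits=8):
    """Interleave the bits of x and y: x fills even bit positions, y fills odd ones."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


def file_ranges(rows, rows_per_file):
    """Split sorted rows into 'files' and report each file's (min, max) per column."""
    files = [rows[i:i + rows_per_file] for i in range(0, len(rows), rows_per_file)]
    return [
        ((min(r[0] for r in f), max(r[0] for r in f)),
         (min(r[1] for r in f), max(r[1] for r in f)))
        for f in files
    ]


grid = [(x, y) for x in range(4) for y in range(4)]
by_x = sorted(grid)                             # linear sort on x, then y
by_z = sorted(grid, key=lambda p: z_value(*p))  # Z-order on (x, y)

print(file_ranges(by_x, 4)[0])  # ((0, 0), (0, 3)): tight on x, full range on y
print(file_ranges(by_z, 4)[0])  # ((0, 1), (0, 1)): moderately tight on both
```

With the x-sorted layout, a filter on y = 3 can skip none of the four files because each spans y 0..3; with the Z-ordered layout it skips half of them. A filter on x = 0, in turn, skips three files under the linear sort but only two under the Z-order.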
# OPTIMIZE + ZORDER - rewrites and clusters existing data files
spark.sql("""
OPTIMIZE delta.`/tmp/customers_delta`
ZORDER BY (customer_id, plan)
""")
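# Collect min/max statistics on the first 5 columns only (the default is 32)
spark.sql("""
ALTER TABLE delta.`/tmp/customers_delta`
SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '5')
""")
# Bloom filter indexes for high-cardinality point-lookup columns are a
# Databricks feature, not part of open-source Delta Lake
spark.sql("""
CREATE BLOOMFILTER INDEX
ON TABLE delta.`/tmp/customers_delta`
FOR COLUMNS (customer_id OPTIONS (fpp = 0.01))
""")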
:::note OPTIMIZE + ZORDER is Not Free
OPTIMIZE rewrites all affected Parquet files, which involves reading and writing potentially gigabytes or terabytes of data. Schedule it during low-traffic periods - typically nightly for tables with daily batch writes, or weekly for append-heavy tables. On selective filters over the Z-ordered columns the speedup can be substantial (5–50x is a commonly cited range), but it depends on how well the data clusters. For full-table scans, there is no benefit.
:::
Delta Lake for ML Workflows
Delta Lake is an excellent foundation for ML data pipelines because versioned datasets and reproducible reads are built in, for as long as the retention window keeps old versions around.
import mlflow
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
# Assumes the session is configured for Delta (see the DML setup above)
spark = SparkSession.builder.appName("ml-training").getOrCreate()
# Record the exact dataset version used for training
table_path = "/tmp/training_data_delta"
dt = DeltaTable.forPath(spark, table_path)
dataset_version = dt.history(1).collect()[0]["version"]
with mlflow.start_run():
    mlflow.log_param("dataset_path", table_path)
    mlflow.log_param("dataset_version", dataset_version)
    # Read the training data at a pinned version
    df = (
        spark.read
        .format("delta")
        .option("versionAsOf", dataset_version)
        .load(table_path)
    )
    # ... train model ...
    # mlflow.sklearn.log_model(model, "model")
# Six months later, to reproduce:
# df = spark.read.format("delta").option("versionAsOf", recorded_version).load(table_path)
# This gives you the exact same rows that were used in training,
# as long as VACUUM has not deleted that version's files.
Delta as a feature store backend. The combination of MERGE INTO (for upserts from feature pipelines), Change Data Feed (for incremental feature updates), and time travel (for point-in-time correct features to avoid training-serving skew) makes Delta tables a practical feature store backend without requiring a dedicated feature store platform.
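# Feature store upsert pattern
from delta.tables import DeltaTable
feature_updates = spark.createDataFrame([
    ("c001", 5, 0.82, "2024-03-01"),
    ("c002", 12, 0.91, "2024-03-01"),
    ("c005", 2, 0.45, "2024-03-01"),
], ["customer_id", "sessions_30d", "churn_risk_score", "as_of_date"])
# MERGE needs an existing Delta table: create an empty one on the first run
if not DeltaTable.isDeltaTable(spark, "/tmp/customer_features"):
    feature_updates.limit(0).write.format("delta").save("/tmp/customer_features")
feature_updates.createOrReplaceTempView("feature_updates")
spark.sql("""
MERGE INTO delta.`/tmp/customer_features` AS target
USING feature_updates AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
""")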
Vacuum - Cleaning Up Old Files
VACUUM permanently deletes data files that are no longer referenced by any version within the retention window. This reclaims storage but also destroys time travel capability for those versions.
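Conceptually, VACUUM keeps every file the current version references and deletes unreferenced files only once they are older than the retention threshold. A minimal sketch of that selection, assuming a simple mapping from file name to last-modified time; `vacuum_candidates` is hypothetical, and real VACUUM also consults tombstones (remove actions) in the log and skips directories whose names start with an underscore, such as `_delta_log/`.

```python
from datetime import datetime, timedelta


def vacuum_candidates(files_on_disk, live_files, now, retention=timedelta(hours=168)):
    """Return unreferenced files older than the retention threshold."""
    cutoff = now - retention
    return sorted(
        path for path, modified in files_on_disk.items()
        if path not in live_files and modified < cutoff
    )


now = datetime(2024, 3, 31)
files_on_disk = {
    "part-a.parquet": datetime(2024, 3, 1),   # replaced by an old UPDATE
    "part-b.parquet": datetime(2024, 3, 1),   # still referenced by the current version
    "part-d.parquet": datetime(2024, 3, 28),  # unreferenced, but inside the 7-day window
}
live = {"part-b.parquet"}
print(vacuum_candidates(files_on_disk, live, now))  # ['part-a.parquet']
```

Passing `retention=timedelta(0)` (the equivalent of RETAIN 0 HOURS) also selects part-d.parquet, which is exactly how short retention wipes out recent time travel.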
# Check what VACUUM would delete (dry run)
spark.sql("""
VACUUM delta.`/tmp/customers_delta` RETAIN 168 HOURS DRY RUN
""").show(truncate=False)
# Actually run vacuum with 7-day retention (168 hours)
spark.sql("""
VACUUM delta.`/tmp/customers_delta` RETAIN 168 HOURS
""")
# IMPORTANT: The default retention is 7 days (168 hours)
# Running VACUUM with a shorter retention (e.g., RETAIN 0 HOURS)
# requires disabling the safety check:
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
# Only do this if you are absolutely certain you do not need time travel
:::danger VACUUM and Time Travel Destruction
Running VACUUM permanently deletes old data files. After VACUUM RETAIN 168 HOURS, you cannot time travel to any version older than 7 days. For tables used in ML training, consider setting a longer retention (30 or 90 days) so you can reproduce experiments from the past month. Set delta.logRetentionDuration = interval 90 days and delta.deletedFileRetentionDuration = interval 90 days in table properties to extend the default.
:::
Delta vs. Iceberg - Technical Differences
| Dimension | Delta Lake | Apache Iceberg |
|---|---|---|
| Transaction log format | Append-only JSON files (_delta_log/) | metadata.json + Avro manifest lists and manifest files |
| Catalog requirement | None (can work without catalog) | A catalog (REST, Glue, Hive, JDBC) tracks the current metadata file; a file-system Hadoop catalog also exists |
| Multi-engine support | Improving (Delta Universal Format) | Excellent (native open standard) |
| Spark streaming | Excellent (native integration) | Good (Flink native, Spark via connector) |
| Time travel syntax | VERSION AS OF N / TIMESTAMP AS OF ts | Spark SQL: VERSION AS OF snapshot_id / TIMESTAMP AS OF ts; some engines (e.g., Hive) use FOR SYSTEM_TIME AS OF ts |
| Partition evolution | Requires rewrite or new partition column | Metadata-only, no rewrite needed (with hidden partitioning) |
| Schema evolution | Column add by name; rename/drop require column mapping | Column add/rename/drop by column ID (more robust by default) |
| Compaction API | OPTIMIZE (SQL) | rewrite_data_files procedure (SQL CALL) or rewriteDataFiles action |
| Change Data Feed | Built-in, simple | Supported via changelog scan |
| Community | Databricks-led (Linux Foundation since 2019) | Apache Software Foundation |
| Python without Spark | delta-rs / deltalake library (Rust) | pyiceberg (pure Python) |
The practical decision: if you are on Databricks or are Spark-first, Delta Lake is the natural choice and has better tooling integration. If you need multi-engine interoperability (reading the same tables from Trino, Flink, Athena, and Spark), Iceberg is the better foundation.
Delta Live Tables - Declarative Pipelines
Delta Live Tables (DLT) is a Databricks-specific feature that brings declarative, constraint-aware pipeline definitions to Delta Lake. It handles dependency management, incremental updates, and data quality enforcement automatically.
# Delta Live Tables pipeline definition
import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
@dlt.table(
    name="raw_events",
    comment="Raw events ingested from Kafka",
)
def raw_events():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "events")
        .load()
        .selectExpr("CAST(value AS STRING) AS json_value", "timestamp")
    )
@dlt.table(
    name="parsed_events",
    comment="Parsed and validated events",
)
@dlt.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect("positive_revenue", "revenue >= 0")
def parsed_events():
    schema = StructType([
        StructField("event_id", StringType()),
        StructField("customer_id", StringType()),
        StructField("revenue", DoubleType()),
    ])
    return (
        dlt.read_stream("raw_events")
        .select(F.from_json("json_value", schema).alias("data"), "timestamp")
        .select("data.*", "timestamp")
    )
@dlt.table(
    name="customer_daily_revenue",
    comment="Daily revenue rollup per customer - SLA: fresh within 30 min",
)
def customer_daily_revenue():
    return (
        dlt.read("parsed_events")
        .groupBy(
            "customer_id",
            F.to_date("timestamp").alias("event_date"),
        )
        .agg(F.sum("revenue").alias("daily_revenue"))
    )
DLT records the results of @dlt.expect and its variants (@dlt.expect_or_drop, @dlt.expect_or_fail) in the pipeline's event log. You can query it to see how many rows passed and failed each expectation in each pipeline update. This turns data quality from a silent failure into an observable system property.
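The expectation flavours differ only in what happens to a failing row: @dlt.expect keeps it and records the failure, @dlt.expect_or_drop removes it, and @dlt.expect_or_fail stops the update. A plain-Python sketch of those semantics, assuming rows as dicts and Python callables in place of SQL predicates; `apply_expectations` is hypothetical and is not how DLT is implemented.

```python
def apply_expectations(rows, expectations):
    """expectations: list of (name, predicate, action), action in {"warn", "drop", "fail"}."""
    metrics = {name: {"passed": 0, "failed": 0} for name, _, _ in expectations}
    kept = []
    for row in rows:
        keep = True
        for name, predicate, action in expectations:
            if predicate(row):
                metrics[name]["passed"] += 1
                continue
            metrics[name]["failed"] += 1
            if action == "fail":
                raise ValueError(f"expectation {name!r} failed for {row}")
            if action == "drop":
                keep = False
        if keep:
            kept.append(row)
    return kept, metrics


rows = [
    {"customer_id": "c001", "revenue": 10.0},
    {"customer_id": None, "revenue": 5.0},     # dropped by valid_customer_id
    {"customer_id": "c002", "revenue": -3.0},  # kept, but counted as a failure
]
expectations = [
    ("valid_customer_id", lambda r: r["customer_id"] is not None, "drop"),
    ("positive_revenue", lambda r: r["revenue"] >= 0, "warn"),
]
kept, metrics = apply_expectations(rows, expectations)
print(len(kept), metrics)
```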
Production Notes
Checkpointing is handled automatically. Delta writes a Parquet checkpoint every 10 commits by default, so you do not need to manage this manually. For tables with extremely high commit rates (thousands of commits per day), checkpoint more often by lowering the interval: delta.checkpointInterval = 5.
Small file problem with streaming. Spark Structured Streaming writing to Delta creates one or more Parquet files per micro-batch. A 30-second micro-batch interval creates at least 2,880 files per day - more than 86,000 after a month. Schedule OPTIMIZE nightly (or on a cron) to compact them. On Databricks, auto optimize (optimized writes and auto compaction) can compact files automatically as part of each write.
Column statistics coverage. By default, Delta collects statistics for the first 32 columns. If your most-filtered columns are beyond position 32, data skipping will not help. Set delta.dataSkippingNumIndexedCols to a higher value, or reorder columns so the high-cardinality filter columns come first.
Partition pruning vs. Z-ordering. Partitioning by a column prunes entire directories. Z-ordering by a column prunes files within a partition. Use partitioning for low-cardinality date/region columns (the table will have O(days) partitions). Use Z-ordering for high-cardinality columns like customer_id or user_id where partitioning would create millions of tiny directories.
Common Mistakes
:::danger Running VACUUM with Short Retention
The most common irreversible mistake with Delta Lake is running VACUUM RETAIN 0 HOURS to save on storage costs. This permanently deletes all historical data files, destroying time travel capability and making it impossible to audit past states. Never vacuum below 7 days unless you have explicitly decided that time travel is not needed for that table. For ML training tables, 30–90 day retention is recommended.
:::
:::danger Overwriting with mode("overwrite") Instead of MERGE
Developers new to Delta Lake often use .write.format("delta").mode("overwrite") for daily loads. This is a full table overwrite - it logically removes all existing data and replaces it with the new batch. If the source data was incomplete that day, you have just dropped historical rows from the current version, recoverable only through time travel until VACUUM runs. Use MERGE INTO for idempotent upserts, or replaceWhere for partition-scoped overwrites.
:::
:::warning Concurrent Writes Without Partitioning
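Blind appends never conflict with each other, but two jobs running UPDATE, DELETE, or MERGE against the same unpartitioned Delta table concurrently often do: each rewrites files the other read, so one commit fails with a concurrent-modification exception and must be retried by the application. At moderate concurrency (3–5 parallel writers) this creates significant retry overhead. Partition the table by a natural key that separates writer domains (e.g., partition by region, have each writer only touch one region, and include the partition column in its predicate) to eliminate conflicts.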
:::
:::warning Change Data Feed Storage Overhead
Enabling CDF (delta.enableChangeDataFeed = true) makes UPDATE, DELETE, and MERGE commits write extra change data files (under _change_data/) in addition to the normal data files. Insert-only appends and full-partition deletes do not need them, so for append-heavy tables or tables with low update rates the overhead is small. For tables with many small updates, the extra files can add substantial storage overhead; benchmark storage costs before enabling.
:::
Interview Q&A
Q: Explain how Delta Lake achieves atomic writes on object storage like S3.
A: Delta Lake's atomicity comes from the transaction log in _delta_log/. A writer writes all Parquet data files first - they land on S3, but they are not referenced by any commit yet. Then the writer creates the commit JSON file (e.g., 000...007.json) in _delta_log/. This JSON file atomically lists all the new files added and all files removed. The creation of this single JSON file is the commit. S3 object creation is atomic (an object either exists or it does not - there is no partial object visible to readers). Before the JSON file exists, readers do not know about the new data files. After the JSON file exists, all new files are visible simultaneously. If the Spark job crashes before writing the JSON file, the orphaned Parquet files are cleaned up by VACUUM - readers never see them.
Q: What is the Delta Lake transaction log and how does it handle concurrent writers?
A: The transaction log is a directory of append-only JSON files in _delta_log/, one per committed version. Each JSON file describes what files were added and removed in that commit. Two concurrent writers both read the current version (say, version 5) and attempt to write version 6. The first to succeed writes 000...006.json. The second writer's attempt to create 000...006.json fails because the storage layer only lets the commit file be created if it does not already exist (on S3, open-source Delta has historically needed a coordination layer such as the DynamoDB LogStore to provide this guarantee). The second writer then checks what version 6 changed. If version 6 did not touch data the second writer read (different partitions, or both are pure appends), the second writer commits the same data files as version 7 without rewriting them. If version 6 added or removed files the second writer read (for example, in the same partition), the commit fails with a concurrent-modification exception and the application must handle the conflict.
Q: When would you use Change Data Feed and what does it output?
A: CDF is used when downstream consumers need to process only the changes to a Delta table rather than re-scanning the full table on every run. Enable it with SET TBLPROPERTIES (delta.enableChangeDataFeed = true). Read it with .option("readChangeFeed", "true").option("startingVersion", N). The output adds three columns: _change_type (values: insert, update_preimage, update_postimage, delete), _commit_version, and _commit_timestamp. An UPDATE to a single row produces two rows: the update_preimage (before values) and update_postimage (after values). CDF is used for: incrementally updating ML feature stores, syncing changes to downstream Elasticsearch indexes, and building event-driven pipelines that react to specific change types.
Q: What is Z-ordering and how does it improve query performance?
A: Z-ordering is a data layout optimization that co-locates related rows within Parquet files by sorting data along a Z-curve (a space-filling curve that maps multi-dimensional space to one dimension while preserving locality). Running OPTIMIZE table ZORDER BY (customer_id, region) rewrites Parquet files so that rows with similar customer_id and region values end up in the same files. This makes the per-file min/max statistics for those columns much tighter - a query filtering WHERE customer_id = 'c001' can skip far more files because most files contain a narrow range of customer IDs. The improvement is most dramatic for high-cardinality columns on selective queries. A 1TB table with random layout might require scanning all 500 files for a point query. After Z-ordering, the same query might read only 3 files.
Q: Compare Delta Lake's schema evolution to Iceberg's. Which is more robust and why?
A: By default, Delta Lake resolves columns in Parquet files by name. That is why, in the default mode, you cannot rename or drop a column: if revenue were renamed to gross_revenue, old Parquet files would still contain a column called revenue and a by-name reader would not find gross_revenue in them. Delta's fix is column mapping (delta.columnMapping.mode = 'name'), which gives every column a stable physical name and ID in the table metadata; data files are written with the physical name, so a rename changes only the logical name and no files are rewritten. Iceberg's schema evolution has tracked columns by immutable integer column IDs from the start. The column name is just an alias for the ID. Rename revenue (column ID 3) to gross_revenue - old files have data for column ID 3, new files have data for column ID 3, and both are returned correctly as gross_revenue regardless of what name was used when the file was written. Iceberg's approach is more robust because ID-based resolution is the default for every table, whereas Delta reaches the same behavior only after enabling column mapping, a protocol upgrade that older readers may not support.
Q: Walk through what happens physically when you run DELETE FROM customers WHERE plan = 'free' on a Delta table with Copy-on-Write semantics.
A: Delta (with CoW) reads the current table state from the transaction log to find all active data files. It then filters these files to find which ones contain rows where plan = 'free' - this uses the column statistics in the log to skip files that definitely do not contain free plan rows. For each affected file, Delta reads the entire Parquet file, filters out the rows where plan = 'free', and writes a new Parquet file containing only the non-deleted rows. The old Parquet files are listed in a remove action in the new commit JSON. The new Parquet files are listed in add actions. The commit is written to _delta_log/. After this commit, readers see the new files (without the deleted rows) and the old files are considered logically deleted - they remain on disk until VACUUM runs after the retention window expires. The cost of the DELETE is proportional to the number of affected files, not to the total table size - if plan = 'free' rows are concentrated in 3 files out of 500, only 3 files are rewritten.
Delta Without Spark - The deltalake Python Library
A major friction point with early Delta Lake was the Spark dependency. Every operation - even reading 1,000 rows from a Delta table - required spinning up a JVM and a Spark session. For lightweight Python scripts, ML inference jobs, and feature store lookups, this was unacceptable.
The delta-rs project (also published as the deltalake Python package) is a pure Rust implementation of the Delta Lake protocol with Python bindings. No JVM, no Spark, no cluster. Just Python and fast Rust-based Parquet reads via Apache Arrow.
# pip install deltalake pyarrow pandas
from deltalake import DeltaTable, write_deltalake
import pandas as pd
import pyarrow as pa
# --- Write with deltalake (no Spark) ---
df = pd.DataFrame({
"order_id": ["o001", "o002", "o003"],
"customer_id": ["c100", "c200", "c100"],
"revenue": [29.99, 99.99, 49.99],
"order_date": ["2024-01-15", "2024-01-15", "2024-01-16"],
})
write_deltalake(
"/tmp/orders_delta",
df,
mode="overwrite",
partition_by=["order_date"],
schema_mode="overwrite",
)
# --- Read with deltalake ---
dt = DeltaTable("/tmp/orders_delta")
# Read as Pandas
df_pandas = dt.to_pandas()
# Read as PyArrow Table (faster for large datasets)
arrow_table = dt.to_pyarrow_table()
# Read with filtering (pushes down to Parquet reader)
filtered = dt.to_pandas(
filters=[("customer_id", "=", "c100")]
)
# --- Time travel ---
dt_v0 = DeltaTable("/tmp/orders_delta", version=0)
df_v0 = dt_v0.to_pandas()
# --- Inspect table metadata ---
print("Protocol:", dt.protocol())
print("Metadata:", dt.metadata())
print("History:", dt.history())
print("Files:", dt.files())
# --- Schema ---
print("Schema:", dt.schema())
# --- Upsert / MERGE using deltalake (no Spark) ---
updates = pa.table({
"order_id": ["o001", "o004"],
"customer_id": ["c100", "c300"],
"revenue": [39.99, 19.99], # o001 revenue changed
"order_date": ["2024-01-15", "2024-01-17"],
})
(
dt.merge(
source=updates,
predicate="target.order_id = source.order_id",
source_alias="source",
target_alias="target",
)
.when_matched_update_all()
.when_not_matched_insert_all()
.execute()
)
print("After merge:", dt.to_pandas())
The deltalake library supports reads, writes, MERGE, DELETE, schema evolution, and time travel - all without Spark. It reads Parquet in parallel through Arrow and returns Arrow data, which hands off to Pandas and NumPy cheaply (often without copying). For ML training pipelines that need to load a Delta table that fits on one machine into memory quickly, deltalake is often several times faster than Spark because there is no JVM startup time and no serialization overhead between the JVM and Python.
Writing Efficient Delta Tables - Real Patterns
Pattern 1: Idempotent Batch Loads
The most common Delta anti-pattern is using a plain .mode("overwrite") for daily loads. When one day's source data is reprocessed (common in event pipelines where late-arriving data needs a backfill), an unqualified overwrite replaces the whole table with that single day, so every other partition disappears from the current version.
Use replaceWhere instead - a partition-scoped overwrite that only affects the specified partitions:
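```python
from pyspark.sql import SparkSession

# Assumes a Delta-enabled session (preconfigured on Databricks; elsewhere,
# install delta-spark and configure the Delta SQL extension and catalog).
spark = SparkSession.builder.appName("delta-patterns").getOrCreate()

# df_today: a DataFrame of (re)processed events for 2024-01-15,
# with at least event_id and event_date columns.

# BAD: full table overwrite - replaces every partition with today's rows
# (older data survives only via time travel, until VACUUM removes it)
df_today.write.format("delta").mode("overwrite").save("/tmp/events/")

# GOOD: partition-scoped overwrite - only affects today's partition.
# Every row in df_today must match the predicate, or the write fails.
(
    df_today.write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", "event_date = '2024-01-15'")
    .save("/tmp/events/")
)

# Even better: MERGE for true idempotency
# (handles both new records and updates to existing records).
# Deduplicate df_today on event_id first: duplicate keys either
# fail the MERGE or get inserted twice.
df_today.createOrReplaceTempView("events_today")
spark.sql("""
    MERGE INTO delta.`/tmp/events` AS target
    USING events_today AS source
    ON target.event_id = source.event_id
    WHEN MATCHED THEN
      UPDATE SET *
    WHEN NOT MATCHED THEN
      INSERT *
""")
```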
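To see why the three approaches behave differently on a rerun, here is a toy in-memory model in plain Python (no Spark). The table is a dict keyed by event_id, with event_date acting as the partition column; `full_overwrite`, `replace_where`, and `merge` are hypothetical stand-ins that mimic the semantics of the three writes above, not Delta APIs.

```python
from typing import Dict, List

Row = Dict[str, object]
Table = Dict[str, Row]  # event_id -> row; each row carries its event_date


def full_overwrite(table: Table, rows: List[Row]) -> Table:
    """Mimics mode("overwrite"): existing contents are ignored entirely."""
    return {r["event_id"]: r for r in rows}


def replace_where(table: Table, partition: str, rows: List[Row]) -> Table:
    """Mimics replaceWhere: only rows in `partition` are replaced."""
    if any(r["event_date"] != partition for r in rows):
        raise ValueError("a row falls outside the replaceWhere predicate")
    kept = {k: r for k, r in table.items() if r["event_date"] != partition}
    kept.update({r["event_id"]: r for r in rows})
    return kept


def merge(table: Table, rows: List[Row]) -> Table:
    """Mimics MERGE with UPDATE SET * / INSERT *: upsert keyed on event_id."""
    merged = dict(table)
    merged.update({r["event_id"]: r for r in rows})
    return merged


table = {
    "e1": {"event_id": "e1", "event_date": "2024-01-14", "amount": 10},
    "e2": {"event_id": "e2", "event_date": "2024-01-15", "amount": 20},
}
today = [
    {"event_id": "e2", "event_date": "2024-01-15", "amount": 25},  # late correction
    {"event_id": "e3", "event_date": "2024-01-15", "amount": 30},
]

print(sorted(full_overwrite(table, today)))               # ['e2', 'e3']
print(sorted(replace_where(table, "2024-01-15", today)))  # ['e1', 'e2', 'e3']
once = merge(table, today)
print(merge(once, today) == once)                         # True
```

The two scoped variants differ in one case: an event that was in the old 2024-01-15 partition but is missing from the reprocessed batch is dropped by `replace_where` and kept by `merge`. That suggests replaceWhere when the reprocessed batch is the complete truth for that day, and MERGE when it only carries new or corrected rows.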
Pattern 2: Schema-on-Write Enforcement
Delta enforces the schema on every write by default. This is a feature - it prevents silent schema drift from upstream sources.
```python
# Schema enforcement is always on for Delta; no option is needed.
# (df is a DataFrame of new rows for the events table.)
(
    df.write
    .format("delta")
    .mode("append")
    # This will FAIL if df has extra columns or wrong types;
    # nullable table columns missing from df are filled with nulls
    .save("/tmp/events/")
)

# To allow schema evolution (additive only - new columns are OK, type changes are not):
(
    df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")  # Allow new columns to be added
    .save("/tmp/events/")
)

# To overwrite the schema entirely (dangerous - use only for table rebuilds):
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/tmp/events/")
)
```
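The enforcement rules in the comments above can be written down as a small decision function. This is a simplified model, not Delta's implementation: `check_write` and `SchemaMismatch` are hypothetical names, schemas are flat `{column: type}` dicts, and the model ignores nested fields, NOT NULL constraints, the few type upcasts that mergeSchema does allow, and how Delta handles column-name case.

```python
from typing import Dict

Schema = Dict[str, str]  # column name -> type name, e.g. {"revenue": "double"}


class SchemaMismatch(Exception):
    """Raised when an append would violate the table schema (hypothetical)."""


def check_write(table: Schema, incoming: Schema, merge_schema: bool = False) -> Schema:
    """Return the table schema after appending `incoming`, or raise SchemaMismatch."""
    for col, typ in incoming.items():
        if col in table and table[col] != typ:
            # In this model, type changes are rejected with or without mergeSchema
            raise SchemaMismatch(f"type change on {col}: {table[col]} -> {typ}")
    extra = [col for col in incoming if col not in table]
    if extra and not merge_schema:
        raise SchemaMismatch(f"columns not in table schema: {extra}")
    # Table columns absent from `incoming` are kept (their values become null);
    # with mergeSchema, new columns are appended to the end of the schema
    evolved = dict(table)
    for col in extra:
        evolved[col] = incoming[col]
    return evolved


events = {"event_id": "string", "revenue": "double"}
print(check_write(events, {"event_id": "string"}))
print(check_write(events, {"event_id": "string", "country": "string"}, merge_schema=True))
```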
Pattern 3: Streaming with Auto Compaction
Streaming writes create many small files. Enable auto-optimization (Databricks) or schedule compaction manually:
```python
# On Databricks: enable optimized writes and auto compaction
spark.sql("""
    ALTER TABLE delta.`/tmp/events`
    SET TBLPROPERTIES (
      'delta.autoOptimize.optimizeWrite' = 'true',
      'delta.autoOptimize.autoCompact' = 'true'
    )
""")

# Open-source Delta: schedule OPTIMIZE (e.g. as a daily job) to compact
# recent partitions. The WHERE clause may reference partition columns only.
spark.sql("""
    OPTIMIZE delta.`/tmp/events`
    WHERE event_date >= date_sub(current_date(), 2)
""")
```
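OPTIMIZE compaction is essentially bin-packing: within each partition, small files are grouped until a batch approaches a target size, and each batch is rewritten as one larger file. Below is a minimal sketch of that planning step in plain Python, assuming a greedy next-fit over files sorted by size; `plan_compaction`, its parameters, and its default sizes are illustrative, and Delta's actual planner differs in detail.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

MB = 1024 * 1024


def plan_compaction(
    files: List[Tuple[str, str, int]],  # (partition, path, size_bytes)
    target_bytes: int = 1024 * MB,
    small_file_bytes: int = 128 * MB,
) -> Dict[str, List[List[str]]]:
    """Per partition, group small files into bins of at most target_bytes.

    Each returned bin would be rewritten as one file. Files at or above
    small_file_bytes are left alone.
    """
    small: Dict[str, List[Tuple[str, int]]] = defaultdict(list)
    for partition, path, size in files:
        if size < small_file_bytes:
            small[partition].append((path, size))

    plan: Dict[str, List[List[str]]] = {}
    for partition, candidates in small.items():
        bins: List[List[str]] = []
        current: List[str] = []
        current_size = 0
        for path, size in sorted(candidates, key=lambda f: f[1]):
            if current and current_size + size > target_bytes:
                bins.append(current)
                current, current_size = [], 0
            current.append(path)
            current_size += size
        if current:
            bins.append(current)
        # Rewriting a lone file gains nothing, so skip single-file bins
        useful = [b for b in bins if len(b) > 1]
        if useful:
            plan[partition] = useful
    return plan


files = [
    ("2024-01-15", "a.parquet", 10 * MB),
    ("2024-01-15", "b.parquet", 20 * MB),
    ("2024-01-15", "c.parquet", 30 * MB),
    ("2024-01-15", "big.parquet", 900 * MB),
    ("2024-01-16", "d.parquet", 5 * MB),
]
print(plan_compaction(files, target_bytes=40 * MB, small_file_bytes=100 * MB))
# {'2024-01-15': [['a.parquet', 'b.parquet']]}
```

With the small 40 MB target, `c.parquet` no longer fits alongside the first two and ends up alone, so it is skipped; the 900 MB file is already large enough, and the single small file in 2024-01-16 has nothing to combine with.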
Concurrent Write Patterns - Advanced
Understanding Delta's concurrency model is important for designing pipelines that can safely write in parallel.
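Delta uses optimistic concurrency: each writer prepares its data files independently, then tries to commit the next version of the log. If another writer committed first, Delta checks whether the winning commit conflicts with what the later transaction read or changed. Without a conflict, the later writer simply retries its commit at the next version, with no data rewritten; with one, its commit fails. The cases break down as follows:
- If both operations are pure appends (blind `INSERT`s) → no conflict, both succeed
- If both operations read and write disjoint partitions → no conflict, both succeed
- If one is an `INSERT` and the other is a `DELETE`/`UPDATE` on a different partition → no conflict, both succeed
- If both modify the same partition with conflicting changes → the later commit fails with a `ConcurrentModificationException` (in practice a subclass such as `ConcurrentAppendException` or `ConcurrentDeleteReadException`)

For pipelines with genuinely concurrent writes to the same table, the most reliable strategy is to design writes so they are always partition-disjoint (e.g., each Kafka partition maps to a Delta partition, each regional pipeline writes to its own region partition). Also state that partition explicitly in every UPDATE, DELETE, or MERGE condition: if the condition does not restrict partitions, Delta has to treat the operation as having read the whole table, and it can conflict even when the rows it changes do not overlap.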
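The rules above can be modeled as a check that the later committer (the "loser") runs against the commit that beat it (the "winner"). This is a simplified sketch: it works at partition granularity, whereas Delta tracks individual files and read predicates and also checks metadata and protocol changes. `Txn` and `check_conflict` are hypothetical names; the returned strings are names of real Delta exception classes. The blind-append exemption follows Delta's default WriteSerializable isolation level as we understand it, under which a concurrent INSERT does not invalidate an UPDATE or DELETE.

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class Txn:
    """Partition-level summary of one write, for this simplified model."""
    read: Optional[FrozenSet[str]]          # partitions read; None = whole table
    added: FrozenSet[str] = frozenset()     # partitions where files were added
    removed: FrozenSet[str] = frozenset()   # partitions where files were removed

    @property
    def blind_append(self) -> bool:
        # An INSERT that read nothing and removed nothing
        return self.read == frozenset() and not self.removed


def _touches(read: Optional[FrozenSet[str]], parts: FrozenSet[str]) -> bool:
    """Did a transaction that read `read` look at any of `parts`?"""
    return bool(parts) if read is None else bool(read & parts)


def check_conflict(winner: Txn, loser: Txn) -> Optional[str]:
    """Exception the later committer would get, or None if it can just retry."""
    # Under WriteSerializable, files added by a blind append are not treated
    # as data the loser should have read.
    if not winner.blind_append and _touches(loser.read, winner.added):
        return "ConcurrentAppendException"
    if _touches(loser.read, winner.removed):
        return "ConcurrentDeleteReadException"
    if winner.removed & loser.removed:
        return "ConcurrentDeleteDeleteException"
    return None


insert_a = Txn(read=frozenset(), added=frozenset({"A"}))
update_a = Txn(read=frozenset({"A"}), added=frozenset({"A"}), removed=frozenset({"A"}))
update_b = Txn(read=frozenset({"B"}), added=frozenset({"B"}), removed=frozenset({"B"}))
update_b_unscoped = Txn(read=None, added=frozenset({"B"}), removed=frozenset({"B"}))

print(check_conflict(insert_a, insert_a))           # None: appends never conflict
print(check_conflict(update_a, update_b))           # None: disjoint partitions
print(check_conflict(update_a, update_b_unscoped))  # ConcurrentAppendException
```

The last call shows why explicit partition conditions matter: `update_b_unscoped` only changed partition B, but because its condition did not restrict partitions, the model must assume it also read A, where the winner added files.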
Monitoring a Delta Table in Production
Keeping a Delta table healthy in production requires ongoing monitoring. Key metrics to track:
```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("delta-monitoring").getOrCreate()
dt = DeltaTable.forPath(spark, "/tmp/events/")

# --- Table detail: current state ---
detail = spark.sql("DESCRIBE DETAIL delta.`/tmp/events/`")
detail.select(
    "name", "location", "numFiles", "sizeInBytes",
    "partitionColumns", "lastModified"
).show(truncate=False)

# --- File size summary: detect the small file problem ---
# Delta has no SQL `.files` metadata table, but DESCRIBE DETAIL reports
# the file count and total size of the current version.
detail.selectExpr(
    "numFiles AS file_count",
    "sizeInBytes / nullif(numFiles, 0) / 1024 / 1024 AS avg_size_mb",
    "sizeInBytes / 1024 / 1024 / 1024 AS total_size_gb",
).show()

# --- Recent operations: detect unexpected overwrites or large deletes ---
# dt.history(20) returns the same rows as DESCRIBE HISTORY, newest first.
# operationMetrics is a map of strings; available keys depend on the operation.
dt.history(20).selectExpr(
    "version",
    "timestamp",
    "operation",
    "operationParameters",
    "operationMetrics['numOutputRows'] AS numOutputRows",
    "operationMetrics['numRemovedFiles'] AS numRemovedFiles",
    "operationMetrics['numAddedFiles'] AS numAddedFiles",
).show(truncate=False)

# Alert conditions (heuristics; tune thresholds to your workload):
# - avg_size_mb < 32: small file problem, run OPTIMIZE
# - file_count > 10000 for a single partition: run OPTIMIZE (optionally with ZORDER BY)
# - numRemovedFiles > 1000 in a single operation: unexpected large DELETE or OVERWRITE
# - gap in version numbers (not explained by log retention cleanup): commit files
#   were deleted from _delta_log outside Delta, i.e. the log is corrupted
```
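The alert heuristics above can be wrapped in a plain-Python function that runs on values already collected from Spark, for example `detail.first()` for `numFiles` and `sizeInBytes`, and `[row.asDict(recursive=True) for row in dt.history(20).collect()]` for the history. This is a minimal sketch: `delta_alerts` is a hypothetical helper, the thresholds are the same rules of thumb as above, and `files_per_partition` is optional because DESCRIBE DETAIL does not report per-partition file counts.

```python
from typing import Dict, List, Optional

MB = 1024 * 1024


def delta_alerts(
    num_files: int,
    size_in_bytes: int,
    history: List[Dict],
    files_per_partition: Optional[Dict[str, int]] = None,
) -> List[str]:
    """Apply the alert heuristics to already-collected table stats."""
    alerts: List[str] = []
    if num_files and size_in_bytes / num_files / MB < 32:
        alerts.append("small files: average file size below 32 MB, run OPTIMIZE")
    for partition, count in (files_per_partition or {}).items():
        if count > 10_000:
            alerts.append(f"partition {partition}: {count} files, run OPTIMIZE")
    for entry in history:
        metrics = entry.get("operationMetrics") or {}
        removed = int(metrics.get("numRemovedFiles") or 0)  # metric values are strings
        if removed > 1000:
            alerts.append(
                f"version {entry['version']}: {entry['operation']} removed {removed} files"
            )
    versions = sorted(entry["version"] for entry in history)
    for prev, nxt in zip(versions, versions[1:]):
        if nxt != prev + 1:
            alerts.append(f"version gap between {prev} and {nxt}")
    return alerts


history = [
    {"version": 7, "operation": "DELETE", "operationMetrics": {"numRemovedFiles": "1500"}},
    {"version": 5, "operation": "WRITE", "operationMetrics": {"numOutputRows": "10"}},
    {"version": 4, "operation": "WRITE", "operationMetrics": None},
]
for alert in delta_alerts(1000, 1000 * 8 * MB, history, {"2024-01-15": 12_000}):
    print(alert)
```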
Summary
Delta Lake solves the core reliability problems of raw Parquet on object storage (partial writes, inconsistent reads, no efficient updates, no version history) through a simple but powerful mechanism: an append-only JSON transaction log in _delta_log/. Every commit is an atomic entry in this log, so readers always see a consistent snapshot, and writers use optimistic concurrency to handle simultaneous writes. On top of this foundation, Delta Lake adds MERGE INTO for upserts, Change Data Feed for CDC consumers, Z-ordering for query performance, and time travel for reproducibility; on Databricks, Delta Live Tables adds declarative pipeline definitions. The delta-rs / deltalake Python library (written in Rust) brings the core of this to Python programs without requiring Spark or a JVM, making Delta Lake practical for lightweight scripts, feature store updates, and ML pipelines that do not need distributed processing.
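The log mechanism summarized above fits in a few lines of plain Python. This is a toy model under heavy assumptions: commit files follow Delta's naming (a 20-digit zero-padded version plus `.json`, one JSON action per line), but only `add` and `remove` actions with a `path` are interpreted, and checkpoints, `metaData`, `protocol`, and `commitInfo` are ignored. `commit` and `snapshot` are hypothetical helpers. The sketch shows the two properties that matter: a commit becomes visible all at once when its log file appears, and any version can be rebuilt by replaying the log up to it.

```python
import json
import os
import tempfile
from typing import List, Optional, Set


def commit(log_dir: str, version: int, actions: List[dict]) -> None:
    """Publish one commit atomically; FileExistsError if the version is taken."""
    final = os.path.join(log_dir, f"{version:020d}.json")
    fd, tmp = tempfile.mkstemp(dir=log_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        f.write("\n".join(json.dumps(a) for a in actions) + "\n")
    try:
        # Hard-linking fails if `final` already exists (put-if-absent), and the
        # content is complete before the link, so readers never see half a commit
        os.link(tmp, final)
    finally:
        os.remove(tmp)


def snapshot(log_dir: str, version: Optional[int] = None) -> Set[str]:
    """Replay commits 0..version (default: latest) and return live data files."""
    live: Set[str] = set()
    for name in sorted(n for n in os.listdir(log_dir) if n.endswith(".json")):
        if version is not None and int(name[: -len(".json")]) > version:
            break
        with open(os.path.join(log_dir, name)) as f:
            for line in f:
                action = json.loads(line)
                if "add" in action:
                    live.add(action["add"]["path"])
                elif "remove" in action:
                    live.discard(action["remove"]["path"])
    return live


log_dir = tempfile.mkdtemp()
commit(log_dir, 0, [{"add": {"path": "part-000.parquet"}},
                    {"add": {"path": "part-001.parquet"}}])
commit(log_dir, 1, [{"remove": {"path": "part-000.parquet"}},
                    {"add": {"path": "part-002.parquet"}}])
print(sorted(snapshot(log_dir)))     # ['part-001.parquet', 'part-002.parquet']
print(sorted(snapshot(log_dir, 0)))  # ['part-000.parquet', 'part-001.parquet']
```

Committing version 1 a second time raises FileExistsError, which is the point where a real Delta writer would run its conflict check and retry at the next version. On object stores, Delta relies on its LogStore to provide the same put-if-absent guarantee that the hard link gives here on a local disk.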
