

Apache Iceberg

The GDPR Deletion That Took 2 Seconds

The on-call engineer got the ticket at 9am on a Friday: a user in France had submitted a GDPR erasure request. Per regulation, all records tied to this user must be deleted within 30 days. The ticket had been sitting in the backlog for 25 days. Five days left.

Before Iceberg, this process was manual and brutal. A Spark job would scan the entire 3TB events table, filter out the target user's rows, write a new version of every Parquet file containing their data, and then swap the old directory for the new one. If the job failed halfway through - and it often did, because 3TB is a lot of data and Spot instances are unreliable - it had to start over. Total time: 3 to 5 hours on a good day. Eight to twelve hours when something went wrong. Engineers dreaded these tickets. They ran them manually because automating a process this slow and fragile was asking for trouble.

After migrating to Iceberg, the engineer opened a Python script they had written three weeks earlier. They ran it. The terminal printed "Deleted 847 records in 2.1 seconds." The engineer stared at the terminal for a moment, reread the number, and then pasted the output into the ticket. Two seconds.

What changed? Iceberg can implement row-level deletes as delete files - small files stored alongside the data files that record which rows are deleted, identified either by row position (data file path plus row ordinal) or by equality predicates on column values. For a point deletion by user ID in merge-on-read mode, Iceberg can write a tiny equality-delete file that says "rows where user_id = 'abc123' are deleted." No data file is rewritten; the commit adds only the delete file and new metadata. The existing Parquet files are untouched, and readers apply the delete file at query time until the next scheduled compaction job rewrites the affected files. For a deletion touching one user across a 3TB table, the work is proportional at most to the data files containing that user's rows - not to the total table size.
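
The mechanism can be sketched as a toy model in plain Python. This is not PyIceberg's API; `ToyTable`, `DataFile`, and `EqualityDelete` are hypothetical names. Data files are immutable, a delete only appends an equality-delete record, and readers filter matching rows at scan time.

```python
from dataclasses import dataclass, field

# Toy model only: hypothetical classes, not PyIceberg or the Iceberg file format.

@dataclass(frozen=True)
class DataFile:
    """Immutable stand-in for a Parquet data file: a path plus its rows."""
    path: str
    rows: tuple

@dataclass(frozen=True)
class EqualityDelete:
    """Marks every row whose `column` equals `value` as deleted."""
    column: str
    value: object

@dataclass
class ToyTable:
    data_files: list = field(default_factory=list)
    delete_files: list = field(default_factory=list)

    def delete_where_equals(self, column, value):
        # Merge-on-read delete: record the predicate; no data file is rewritten.
        self.delete_files.append(EqualityDelete(column, value))

    def scan(self):
        # Readers apply every delete file while reading the untouched data files.
        for data_file in self.data_files:
            for row in data_file.rows:
                if not any(row.get(d.column) == d.value for d in self.delete_files):
                    yield row

table = ToyTable(data_files=[
    DataFile("events/a.parquet", ({"user_id": "abc123", "n": 1}, {"user_id": "u2", "n": 2})),
    DataFile("events/b.parquet", ({"user_id": "u3", "n": 3}, {"user_id": "abc123", "n": 4})),
])

table.delete_where_equals("user_id", "abc123")
print([row["n"] for row in table.scan()])       # [2, 3]
print([len(f.rows) for f in table.data_files])  # [2, 2] - data files unchanged
```

Real equality deletes also carry sequence numbers, so a delete file applies only to data committed before it; this sketch omits that detail.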

This is what Apache Iceberg was built for. Not just fast deletes, but a complete reimagining of how a table format should work at scale.


Why This Exists - Hive's Scaling Failure at Netflix

In 2017, Netflix was running one of the largest Hive deployments in the world. Their data warehouse held tens of petabytes. A single large table might have hundreds of thousands of partitions. A simple ALTER TABLE ADD PARTITION call to the Hive Metastore for a table with 500,000 partitions could take over ten minutes - the Metastore had to enumerate all existing partitions to verify the new one was unique.

Hive's fundamental architectural problem was that partitions were directories. If you had a table partitioned by (year, month, day, hour), the Hive Metastore tracked millions of individual directories. Every query started with a partition listing operation on HDFS or S3 - listing a bucket with millions of keys is slow. Listing every partition for a year of hourly data meant listing 8,760 directories before reading a single byte of actual data.

Beyond performance, Hive had no ACID at the partition level. Two jobs could both try to add data to the same partition directory and produce corrupt mixed results. Schema changes were risky - renaming a column in Hive Metastore did not update the underlying Parquet column references, so old files used the old name and new files used the new name, silently breaking queries.

Ryan Blue, one of Iceberg's original designers at Netflix, set out to fix this. The design goals were explicit:

  1. Table operations must be $O(1)$ in partition count, not $O(n)$
  2. Schema evolution must be correct by design, not by convention
  3. ACID commits must be safe under concurrent writes
  4. The format must be queryable by any engine, not just Hive

The result was Apache Iceberg. It became a top-level Apache project in 2020. By 2024, it had been adopted by Netflix, Apple, LinkedIn, Adobe, Airbnb, and virtually every major cloud vendor as the open table format standard.


Core Concept - What Iceberg Is and Is Not

Iceberg is a table format specification, not a storage engine. It defines how to organize data files, how to record metadata about those files, and how to implement ACID semantics - but it does not store or read data itself. Any compliant engine (Spark, Trino, Flink, DuckDB, PyIceberg) can read and write an Iceberg table by implementing the spec.

The key design decision is that Iceberg tracks individual files - not directories. Every data file is explicitly listed in Iceberg's metadata with its full path, its partition values, its row count, and its column-level statistics. This means:

  • No directory listing at query time - all file paths come from metadata
  • Partition changes do not require rewriting existing data - just update the spec
  • Statistics are always available - no need for ANALYZE TABLE jobs
  • Any file can be added or removed atomically by committing a new metadata version

The Three-Layer Metadata Architecture

Iceberg adds three metadata layers (manifest files, manifest lists, and metadata.json) on top of the data files, giving a four-level hierarchy. Understanding this hierarchy is essential for understanding how ACID, time travel, and performance all work.

Layer 1 - Data Files. The actual Parquet (or ORC, or Avro) files stored in object storage. Each file stores a subset of the table's rows. These are standard Parquet files - any tool that reads Parquet can read them, though without Iceberg metadata you would not know which files are "current."

Layer 2 - Manifest Files. Avro files that list a set of data files. Each entry in a manifest file includes the full path to the data file, the partition values it contains, and per-column statistics (lower bound, upper bound, null count, nan count). These statistics are what enable data skipping - the query planner reads manifest files to decide which data files to skip.

Layer 3 - Manifest List. An Avro file that lists all the manifest files belonging to a particular version of the table. Each snapshot points to exactly one manifest list, so the two terms are often used interchangeably. Each commit creates a new snapshot with a new manifest list. The manifest list also records which manifests are new (added in this snapshot) vs. inherited (unchanged from the previous snapshot) - this is how Iceberg achieves $O(\text{changed files})$ commit overhead rather than $O(\text{total files})$.

Layer 4 - metadata.json. A JSON file that is the entry point for every table operation. It contains the current schema, the partition spec, the full list of snapshots, and a pointer to the current snapshot. When a writer commits a new snapshot, it writes a new metadata.json and asks the catalog to atomically swap the table's pointer from the old file to the new one. The catalog implements this compare-and-swap with a conditional update or a lock, depending on the catalog.
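
To make the hierarchy concrete, here is a toy model in plain Python (hypothetical classes, not Iceberg's file formats or any library's API). Each commit reuses the previous snapshot's manifests unchanged and adds one new manifest, and planning walks from metadata.json to the manifest list to the manifests, skipping data files whose column statistics rule them out.

```python
from dataclasses import dataclass

# Toy model only: hypothetical structures standing in for Iceberg metadata files.

@dataclass(frozen=True)
class DataFileEntry:
    """One manifest entry: file path, partition value, and column stats."""
    path: str
    event_day: str        # partition value
    revenue_lower: float  # lower bound of the revenue column in this file
    revenue_upper: float  # upper bound of the revenue column in this file

@dataclass(frozen=True)
class Manifest:
    path: str
    entries: tuple  # DataFileEntry objects

@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    manifest_list: tuple  # Manifest objects listed by this snapshot's manifest list

# Stand-in for metadata.json: all snapshots plus a pointer to the current one.
metadata = {"snapshots": {}, "current_snapshot_id": None}

def commit_append(new_manifest):
    """New snapshot = previous snapshot's manifests (reused, not rewritten) + one new manifest."""
    current = metadata["snapshots"].get(metadata["current_snapshot_id"])
    inherited = current.manifest_list if current else ()
    snapshot = Snapshot(len(metadata["snapshots"]) + 1, inherited + (new_manifest,))
    metadata["snapshots"][snapshot.snapshot_id] = snapshot
    metadata["current_snapshot_id"] = snapshot.snapshot_id
    return snapshot

def plan_files(day, min_revenue):
    """Prune by partition value, then by stats: revenue >= min needs upper bound >= min."""
    snapshot = metadata["snapshots"][metadata["current_snapshot_id"]]
    return [entry.path
            for manifest in snapshot.manifest_list
            for entry in manifest.entries
            if entry.event_day == day and entry.revenue_upper >= min_revenue]

commit_append(Manifest("m1.avro", (
    DataFileEntry("d1.parquet", "2024-01-15", 5.0, 40.0),
    DataFileEntry("d2.parquet", "2024-01-15", 60.0, 120.0),
)))
s2 = commit_append(Manifest("m2.avro", (DataFileEntry("d3.parquet", "2024-01-16", 10.0, 99.0),)))

print([m.path for m in s2.manifest_list])  # ['m1.avro', 'm2.avro']
print(plan_files("2024-01-15", 50.0))       # ['d2.parquet']
```

Real planners also prune whole manifests using the partition summaries stored in the manifest list before opening them; this sketch checks every entry.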


ACID Transactions - Optimistic Concurrency

Iceberg uses optimistic concurrency for ACID commits. The assumption is that most writes do not conflict with each other - they write to different partitions, or they append new data rather than updating existing data. Only when a genuine conflict is detected does a write fail.

The commit protocol:

  1. Read the current metadata.json to get the base snapshot
  2. Plan the write: determine which files will be added, which will be removed
  3. Write the data files and new manifest files to object storage
  4. Commit by writing a new metadata.json with the updated snapshot list and asking the catalog to atomically swap the table's pointer to it (a compare-and-swap against the metadata.json read in step 1)
  5. On conflict (another writer committed a new metadata.json between steps 1 and 4): refresh the table state, re-validate the planned changes against it, and retry the commit

For append operations (adding new rows only), conflicts are trivially resolved - both commits can succeed because they write to different files. For updates that modify the same rows, a conflict is a genuine data integrity issue and the second writer fails and must retry.
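
The protocol above can be sketched as a single-process toy model (hypothetical `ToyCatalog` and `commit`, not a real catalog API). The catalog exposes a compare-and-swap on its version pointer, appends simply retry on top of the new base, and a writer whose planned removals have already disappeared fails instead of retrying.

```python
import threading

# Toy model only: hypothetical catalog and commit loop, not a real Iceberg API.

class ToyCatalog:
    """Holds the pointer to the current table version and swaps it atomically."""

    def __init__(self):
        self._lock = threading.Lock()
        self.version = 0
        self.files = frozenset()  # data files in the current snapshot

    def read(self):
        with self._lock:
            return self.version, self.files

    def compare_and_swap(self, expected_version, new_files):
        with self._lock:
            if self.version != expected_version:
                return False  # another writer committed first
            self.version += 1
            self.files = frozenset(new_files)
            return True

def commit(catalog, add=frozenset(), remove=frozenset(), max_retries=5, on_attempt=None):
    """Optimistic commit: read base, validate, try CAS, retry on conflict."""
    add, remove = frozenset(add), frozenset(remove)
    for attempt in range(1, max_retries + 1):
        base_version, base_files = catalog.read()          # step 1: read base state
        if not remove <= base_files:
            # Files this writer planned to rewrite are gone: a genuine conflict.
            raise ValueError("conflict: files to rewrite were changed by another commit")
        new_files = (base_files - remove) | add            # steps 2-3 (data already written)
        if on_attempt is not None:
            on_attempt(attempt)                            # hook to simulate a concurrent writer
        if catalog.compare_and_swap(base_version, new_files):  # step 4: atomic swap
            return attempt
    raise RuntimeError("commit failed after retries")

catalog = ToyCatalog()

def writer_b_sneaks_in(attempt):
    # Writer B commits between writer A's read and writer A's swap.
    if attempt == 1:
        commit(catalog, add={"b.parquet"})

# Appends: A's first swap loses to B; the retry succeeds on top of B's snapshot.
print(commit(catalog, add={"a.parquet"}, on_attempt=writer_b_sneaks_in))  # 2
print(sorted(catalog.files))  # ['a.parquet', 'b.parquet']

# Rewrites: one writer replaces a.parquet; a writer that planned to rewrite
# the same file earlier must fail rather than silently retry.
commit(catalog, add={"a2.parquet"}, remove={"a.parquet"})
try:
    commit(catalog, add={"a3.parquet"}, remove={"a.parquet"})
except ValueError as err:
    print(err)
```

Real Iceberg validation is richer (it depends on the operation and the configured isolation level), but the shape is the same: cheap optimistic retries for non-overlapping changes, hard failures for overlapping ones.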

```python
# Snapshot isolation in action:
# - Reader A starts reading at snapshot 42
# - Writer B commits snapshot 43 (adds new data)
# - Reader A still sees snapshot 42, consistently, for its whole scan
# - Reader C starts after snapshot 43 and sees snapshot 43

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default", **{
    "type": "rest",
    "uri": "http://localhost:8181",
})

table = catalog.load_table("analytics.events")

# Each scan is bound to one snapshot: an explicit snapshot_id here, otherwise
# the current snapshot of the table metadata the scan was created from.
scan = table.scan(snapshot_id=42)  # explicit snapshot (42 is illustrative)
for batch in scan.to_arrow().to_batches():
    # Always sees snapshot 42, regardless of concurrent writes
    pass
```

Schema Evolution - Correct by Design

Schema evolution is where Hive gets painful and Iceberg gets elegant. The core problem is simple: in Parquet files, columns are referenced by name. If you rename a column in your schema, old files use the old name and new files use the new name. A query for the column will return nulls for old files.

Iceberg solves this by assigning every column a column ID (field ID) at creation time. Column IDs are integers that never change. The mapping from column ID to column name is stored in metadata.json, and Iceberg writes each column's ID into the schema of every Parquet file it produces. When you rename a column, you update the name in the schema but the column ID stays the same. Readers resolve columns by ID rather than by the name stored in the file, so old Parquet files still return their data under the new name.
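
A toy model (plain dictionaries, not Iceberg's real reader) contrasts resolving columns by field ID with resolving them by the name stored in each file, for a file written before a rename and an add. The schemas and helper names are hypothetical.

```python
# Toy model only: field IDs never change; names are labels kept in the table schema.
schema_v1 = {1: "event_id", 2: "user_id", 3: "revenue", 4: "event_time"}
schema_v2 = {1: "event_id", 2: "customer_id", 3: "revenue", 4: "event_time",
             5: "session_id"}  # rename user_id -> customer_id, add session_id

# A file written under schema_v1: columns stored with their field IDs and old names.
old_file = {
    "names": schema_v1,
    "columns": {1: ["e001"], 2: ["u100"], 3: [29.99], 4: ["2024-01-15T10:00:00"]},
}

def row_count(data_file):
    return len(next(iter(data_file["columns"].values())))

def read_by_id(data_file, table_schema, column_name):
    """Iceberg-style: resolve name -> field ID in the *table* schema, then read by ID."""
    field_id = next(fid for fid, name in table_schema.items() if name == column_name)
    # A column added after the file was written is absent from it and reads as nulls.
    return data_file["columns"].get(field_id, [None] * row_count(data_file))

def read_by_name(data_file, column_name):
    """Name-based resolution: match against the names stored in the file itself."""
    for field_id, name in data_file["names"].items():
        if name == column_name:
            return data_file["columns"][field_id]
    return [None] * row_count(data_file)

print(read_by_id(old_file, schema_v2, "customer_id"))  # ['u100']
print(read_by_name(old_file, "customer_id"))           # [None] (the Hive-style failure)
print(read_by_id(old_file, schema_v2, "session_id"))   # [None]
```

Because IDs are never reused, dropping a column and later adding one with the same name gives the new column a new ID, so old values do not reappear.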

The supported schema evolution operations in Iceberg:

| Operation | Effect | Data rewrite? |
|---|---|---|
| Add column | New column appended (null in old files) | No |
| Drop column | Column removed from schema | No (data remains, just hidden) |
| Rename column | New name maps to same column ID | No |
| Reorder columns | Columns reordered in schema | No |
| Widen type | e.g., int to long, float to double | No |
| Change type (incompatible) | Not allowed - must add a new column | N/A |

```python
import os

from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import DayTransform
from pyiceberg.types import DoubleType, NestedField, StringType, TimestampType

os.makedirs("/tmp/iceberg_warehouse", exist_ok=True)

catalog = load_catalog("default", **{
    "type": "sql",
    "uri": "sqlite:///iceberg_catalog.db",
    "warehouse": "file:///tmp/iceberg_warehouse",
})

# Create the namespace (ignore the error if it already exists)
try:
    catalog.create_namespace("analytics")
except NamespaceAlreadyExistsError:
    pass

schema = Schema(
    NestedField(field_id=1, name="event_id", field_type=StringType(), required=True),
    NestedField(field_id=2, name="user_id", field_type=StringType(), required=True),
    NestedField(field_id=3, name="revenue", field_type=DoubleType(), required=False),
    NestedField(field_id=4, name="event_time", field_type=TimestampType(), required=True),
)

# Partition by day(event_time); source_id=4 is the event_time field ID
partition_spec = PartitionSpec(
    PartitionField(
        source_id=4,
        field_id=1000,
        transform=DayTransform(),
        name="event_day",
    )
)

table = catalog.create_table(
    identifier="analytics.events",
    schema=schema,
    partition_spec=partition_spec,
)

print(f"Table created: {table.name()}")
print(f"Schema: {table.schema()}")
```

Writing and Reading with PyIceberg

```python
import datetime

import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default", **{
    "type": "sql",
    "uri": "sqlite:///iceberg_catalog.db",
    "warehouse": "file:///tmp/iceberg_warehouse",
})

table = catalog.load_table("analytics.events")

# Arrow schema matching the Iceberg schema (required columns are non-nullable)
arrow_schema = pa.schema([
    pa.field("event_id", pa.string(), nullable=False),
    pa.field("user_id", pa.string(), nullable=False),
    pa.field("revenue", pa.float64(), nullable=True),
    pa.field("event_time", pa.timestamp("us"), nullable=False),
])

# --- Write data ---
data = pa.table({
    "event_id": ["e001", "e002", "e003", "e004", "e005"],
    "user_id": ["u100", "u200", "u100", "u300", "u200"],
    "revenue": [29.99, 49.99, None, 99.99, 19.99],
    "event_time": [
        datetime.datetime(2024, 1, 15, 10, 0),
        datetime.datetime(2024, 1, 15, 11, 0),
        datetime.datetime(2024, 1, 16, 9, 0),
        datetime.datetime(2024, 1, 16, 14, 0),
        datetime.datetime(2024, 1, 17, 8, 0),
    ],
}, schema=arrow_schema)

table.append(data)

snapshot_after_first_write = table.current_snapshot().snapshot_id
print(f"Snapshot after first write: {snapshot_after_first_write}")

# Write more data (creates a new snapshot)
more_data = pa.table({
    "event_id": ["e006", "e007"],
    "user_id": ["u400", "u100"],
    "revenue": [149.99, 9.99],
    "event_time": [
        datetime.datetime(2024, 1, 18, 12, 0),
        datetime.datetime(2024, 1, 18, 13, 0),
    ],
}, schema=arrow_schema)
table.append(more_data)

snapshot_after_second_write = table.current_snapshot().snapshot_id
print(f"Snapshot after second write: {snapshot_after_second_write}")

# --- Read: current state ---
current_df = table.scan().to_pandas()
print(f"Current rows: {len(current_df)}")

# --- Read: time travel to first write ---
historical_df = table.scan(snapshot_id=snapshot_after_first_write).to_pandas()
print(f"Rows at first snapshot: {len(historical_df)}")

# --- Read: time travel by timestamp ---
# Time travel matches snapshot *commit* times, not event_time values in the rows.
# Here the cutoff is the commit time of the first write.
cutoff_ms = table.snapshot_by_id(snapshot_after_first_write).timestamp_ms
snapshot_before_cutoff = None
for entry in table.history():  # snapshot log, oldest first
    if entry.timestamp_ms <= cutoff_ms:
        snapshot_before_cutoff = entry.snapshot_id

if snapshot_before_cutoff is not None:
    df_before = table.scan(snapshot_id=snapshot_before_cutoff).to_pandas()
    print(f"Rows as of cutoff: {len(df_before)}")
```

Schema Evolution - Live Example

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.types import StringType

catalog = load_catalog("default", **{
    "type": "sql",
    "uri": "sqlite:///iceberg_catalog.db",
    "warehouse": "file:///tmp/iceberg_warehouse",
})

table = catalog.load_table("analytics.events")

# Add a new column - no data rewrite
with table.update_schema() as update:
    update.add_column(
        path="session_id",
        field_type=StringType(),
        doc="Browser session identifier",
    )

print(f"Schema after add: {table.schema()}")

# Rename a column - still no data rewrite.
# Old Parquet files written with 'user_id' still resolve correctly
# because Iceberg maps columns by field ID, not by name.
with table.update_schema() as update:
    update.rename_column("user_id", "customer_id")

print(f"Schema after rename: {table.schema()}")

# Read old data - 'customer_id' works even for rows written as 'user_id'
df = table.scan().to_pandas()
print(df[["event_id", "customer_id", "revenue"]].head())
```

Partition Evolution - Hidden Partitioning

Iceberg's hidden partitioning is one of its most important features. In Hive, users must manually specify partition predicates in every query: WHERE year=2024 AND month=1 AND day=15. If the partition layout changes (e.g., you switch from daily to hourly partitions), all existing queries break.

In Iceberg, partitioning is hidden from query writers. The partition spec is part of the table metadata - queries filter by the underlying column values (WHERE event_time BETWEEN ...), and Iceberg's query planner automatically translates that into the appropriate partition pruning. If you change the partition spec, old data remains in the old partition layout and new data uses the new layout - Iceberg handles both transparently.
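
A toy model of hidden partitioning (hypothetical helpers, with string partition values chosen for readability rather than Iceberg's integer encodings): the query names only `event_time`, and each file is pruned using the transform of the spec it was written with, so day-partitioned and hour-partitioned files coexist.

```python
from datetime import datetime, timedelta

# Toy model only: partition transforms applied to event_time.
TRANSFORMS = {
    "day": lambda ts: ts.strftime("%Y-%m-%d"),
    "hour": lambda ts: ts.strftime("%Y-%m-%d-%H"),
}

# Files written under two partition specs: day(event_time) first, hour(event_time) later.
files = [
    {"path": "old-1.parquet", "spec": "day", "partition": "2024-01-15"},
    {"path": "old-2.parquet", "spec": "day", "partition": "2024-01-16"},
    {"path": "new-1.parquet", "spec": "hour", "partition": "2024-01-17-09"},
    {"path": "new-2.parquet", "spec": "hour", "partition": "2024-01-17-14"},
]

def touched_partitions(transform, start, end):
    """Partition values that rows with start <= event_time < end can fall into."""
    values = set()
    t = start.replace(minute=0, second=0, microsecond=0)
    while t < end:
        values.add(transform(t))
        t += timedelta(hours=1)
    return values

def plan(start, end):
    """The query filters only on event_time; each file is pruned under its own spec."""
    return [f["path"] for f in files
            if f["partition"] in touched_partitions(TRANSFORMS[f["spec"]], start, end)]

# WHERE event_time >= '2024-01-16 12:00' AND event_time < '2024-01-17 12:00'
print(plan(datetime(2024, 1, 16, 12), datetime(2024, 1, 17, 12)))
# ['old-2.parquet', 'new-1.parquet']
```

Iceberg's planner does this symbolically, by projecting the predicate through each partition transform, rather than enumerating hours as this sketch does.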

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.transforms import HourTransform

catalog = load_catalog("default", **{
    "type": "sql",
    "uri": "sqlite:///iceberg_catalog.db",
    "warehouse": "file:///tmp/iceberg_warehouse",
})
table = catalog.load_table("analytics.events")

# The table is currently partitioned by day(event_time).
# Switch to hour(event_time) for higher-resolution partitioning.
with table.update_spec() as update:
    # Remove the old partition field
    update.remove_field("event_day")
    # Add the new partition field
    update.add_field(
        source_column_name="event_time",
        transform=HourTransform(),
        partition_field_name="event_hour",
    )

# Old data: still partitioned by day (unchanged on disk)
# New data: partitioned by hour
# Queries on event_time work seamlessly across both partition layouts
# No data migration required
```

Row-Level Operations - MERGE, DELETE, UPDATE

Iceberg supports full row-level DML via two strategies: Copy-on-Write (CoW) and Merge-on-Read (MoR).

In CoW mode (the default in Iceberg's Spark integration), a DELETE FROM rewrites every affected data file without the deleted rows. In MoR mode, a DELETE FROM writes a small delete file that marks the deleted rows - fast to write, but every read must apply the delete files until compaction rewrites the data.
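
A toy comparison of the two strategies on hypothetical data, with plain Python lists standing in for Parquet files: CoW rewrites every surviving row of each touched file, while MoR writes only (file, position) pairs and shifts the cost to every read.

```python
# Toy model only: 4 hypothetical data files of 1,000 rows each (values are row IDs).
files = {f"data-{i}.parquet": list(range(i * 1000, (i + 1) * 1000)) for i in range(4)}
to_delete = {5, 17, 2503}

def copy_on_write_delete(files, doomed):
    """Rewrite every file that contains a deleted row; return new file map and rows rewritten."""
    new_files, rows_rewritten = {}, 0
    for path, rows in files.items():
        if doomed.isdisjoint(rows):
            new_files[path] = rows  # untouched file is kept as-is
        else:
            kept = [r for r in rows if r not in doomed]
            new_files[path.replace(".parquet", "-v2.parquet")] = kept
            rows_rewritten += len(kept)
    return new_files, rows_rewritten

def merge_on_read_delete(files, doomed):
    """Write a position-delete file of (data file path, row position) pairs."""
    return [(path, pos) for path, rows in files.items()
            for pos, r in enumerate(rows) if r in doomed]

def merge_on_read_scan(files, position_deletes):
    """Every MoR read pays for applying the delete file(s)."""
    deleted = set(position_deletes)
    return [r for path, rows in files.items()
            for pos, r in enumerate(rows) if (path, pos) not in deleted]

cow_files, rewritten = copy_on_write_delete(files, to_delete)
position_deletes = merge_on_read_delete(files, to_delete)
print(rewritten)         # 1997 rows rewritten by CoW
print(position_deletes)  # [('data-0.parquet', 5), ('data-0.parquet', 17), ('data-2.parquet', 503)]
```

Both strategies expose the same 3,997 rows to readers; they differ only in where the work lands.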

```python
# Using Spark with Iceberg (local Hadoop catalog, for development)
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("iceberg-dml")
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg_spark")
    .getOrCreate()
)

spark.sql("CREATE NAMESPACE IF NOT EXISTS local.analytics")

# Create table via Spark SQL
spark.sql("""
    CREATE TABLE IF NOT EXISTS local.analytics.events (
        event_id    STRING NOT NULL,
        customer_id STRING NOT NULL,
        revenue     DOUBLE,
        event_time  TIMESTAMP NOT NULL
    )
    USING iceberg
    PARTITIONED BY (days(event_time))
""")

# Insert data
spark.sql("""
    INSERT INTO local.analytics.events VALUES
    ('e001', 'u100', 29.99, TIMESTAMP '2024-01-15 10:00:00'),
    ('e002', 'u200', 49.99, TIMESTAMP '2024-01-15 11:00:00'),
    ('e003', 'u100', NULL, TIMESTAMP '2024-01-16 09:00:00')
""")

# Remember the snapshot created by the INSERT, for time travel later
insert_snapshot_id = spark.sql("""
    SELECT snapshot_id
    FROM local.analytics.events.snapshots
    ORDER BY committed_at DESC
    LIMIT 1
""").first()["snapshot_id"]

# GDPR deletion - point delete by customer_id (copy-on-write by default)
spark.sql("""
    DELETE FROM local.analytics.events
    WHERE customer_id = 'u100'
""")

# MERGE INTO (upsert) - common for SCD Type 1 updates
spark.sql("""
    MERGE INTO local.analytics.events AS target
    USING (
        SELECT 'e002' AS event_id,
               'u200' AS customer_id,
               59.99 AS revenue,
               TIMESTAMP '2024-01-15 11:00:00' AS event_time
    ) AS source
    ON target.event_id = source.event_id
    WHEN MATCHED THEN
        UPDATE SET target.revenue = source.revenue
    WHEN NOT MATCHED THEN
        INSERT *
""")

# Time travel: row count as of the INSERT snapshot (before the DELETE and MERGE).
# TIMESTAMP AS OF '...' also works, but it is matched against snapshot commit
# times, not against event_time values stored in the rows.
spark.sql(f"""
    SELECT COUNT(*) AS row_count
    FROM local.analytics.events
    VERSION AS OF {insert_snapshot_id}
""").show()

# Inspect snapshots
spark.sql("""
    SELECT snapshot_id, committed_at, operation, summary
    FROM local.analytics.events.snapshots
    ORDER BY committed_at
""").show(truncate=False)
```

Catalog Options

An Iceberg catalog is the service responsible for mapping table names to their metadata.json locations. Choosing the right catalog determines your operability and vendor lock-in.

| Catalog | Best for | Notes |
|---|---|---|
| REST catalog (Polaris, Tabular, Nessie) | Multi-engine, vendor-neutral | Open standard; Apache Polaris is one open-source implementation |
| AWS Glue | AWS-native deployments | Managed, integrates with Athena, EMR, Glue ETL |
| Hive Metastore | Existing Hive/Spark shops | Works but does not support Iceberg-specific features well |
| JDBC catalog | Dev/test | Uses any JDBC-compatible DB (Postgres, MySQL); simple but limited |
| Hadoop catalog | Local / dev only | Commits rely on atomic file rename, which HDFS provides but S3 does not; not safe for concurrent writes on object storage |

:::tip Production Catalog Choice For new production deployments, use a REST-compatible catalog. Apache Polaris (donated by Snowflake in 2024) is an open-source implementation of the REST catalog spec. It provides multi-engine access, namespace management, and access control in one service. For AWS shops, Glue is the pragmatic default - it is managed, available, and supported by every AWS analytics service. :::


Production Notes

File compaction is mandatory. Streaming ingestion creates tiny files. A table receiving 1,000 records/second with micro-batch writes every 30 seconds creates at least 2 new Parquet files per minute (one per micro-batch, more if a batch spans several partitions) - 2,880 files per day. After a week that is over 20,000 files. Query planning overhead (manifest entries to read, files to open) grows linearly with the file count. Schedule a daily rewriteDataFiles job to compact small files into 256MB–512MB Parquet files.
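
The arithmetic above, as a quick Python check. The one-file-per-micro-batch and 200-bytes-per-record figures are assumptions for illustration, not measurements.

```python
SECONDS_PER_DAY = 24 * 60 * 60

records_per_second = 1_000
batch_interval_s = 30
files_per_batch = 1               # assumption: one file per micro-batch
bytes_per_record = 200            # hypothetical average compressed row size
target_file_bytes = 512 * 1024 * 1024

files_per_day = SECONDS_PER_DAY // batch_interval_s * files_per_batch
files_per_week = files_per_day * 7
avg_file_mb = records_per_second * batch_interval_s * bytes_per_record / 1_000_000
daily_bytes = records_per_second * SECONDS_PER_DAY * bytes_per_record
compacted_files_per_day = -(-daily_bytes // target_file_bytes)  # ceiling division

print(f"{files_per_day} files/day, {files_per_week} files/week, ~{avg_file_mb:.0f} MB each")
print(f"After compaction: {compacted_files_per_day} files/day (target 512 MiB each)")
```

Under these assumptions, compaction turns 2,880 files of about 6 MB into 33 files per day.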

```python
# Compaction here runs through Spark's rewrite_data_files procedure; PyIceberg's
# maintenance support varies by version. Assumes a SparkSession `spark` with an
# Iceberg catalog named `prod`, configured as in the Spark production section.
result = spark.sql("""
    CALL prod.system.rewrite_data_files(
        table => 'analytics.events',
        options => map(
            'rewrite-job-order', 'bytes-asc',
            'target-file-size-bytes', '536870912',  -- aim for 512 MB files
            'min-file-size-bytes', '33554432',      -- files under 32 MB are compacted
            'max-file-size-bytes', '838860800'      -- files over 800 MB are split
        )
    )
""").first()

print(f"Rewrote {result['rewritten_data_files_count']} files")
print(f"Added {result['added_data_files_count']} files")
```

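Expire snapshots to reclaim storage. Every commit creates a new snapshot. Snapshots reference data files - a file that has been deleted or rewritten cannot be garbage collected while any retained snapshot still references it. If you need 7 days of time travel, expire snapshots older than 7 days; expiration also deletes the files that only the expired snapshots referenced. Separately, run remove_orphan_files periodically to clean up files that no snapshot ever referenced, such as leftovers from failed writes.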

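```python
from datetime import datetime, timedelta

# Assumes the same SparkSession `spark` and Iceberg catalog `prod`.

# Expire snapshots older than 7 days (this also deletes data files that
# only the expired snapshots referenced)
expiry_cutoff = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
spark.sql(f"""
    CALL prod.system.expire_snapshots(
        table => 'analytics.events',
        older_than => TIMESTAMP '{expiry_cutoff}'
    )
""")

# Remove orphan files: files under the table location that no snapshot references
# (typically left by failed writes). Keep older_than longer than your longest-running
# write, or in-progress files may be deleted; the procedure defaults to 3 days.
orphan_cutoff = (datetime.now() - timedelta(days=3)).strftime("%Y-%m-%d %H:%M:%S")
orphans = spark.sql(f"""
    CALL prod.system.remove_orphan_files(
        table => 'analytics.events',
        older_than => TIMESTAMP '{orphan_cutoff}'
    )
""").collect()
print(f"Deleted {len(orphans)} orphaned files")
```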
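
A toy model of snapshot expiry (hypothetical data and helper, loosely mirroring the `older_than` and `retain_last` arguments of the Spark procedure): a data file becomes deletable only once no retained snapshot references it.

```python
from datetime import datetime, timedelta

# Toy model only: hypothetical snapshot history.
now = datetime(2024, 1, 31)
# snapshot id -> (commit time, data files it references)
snapshots = {
    1: (now - timedelta(days=20), {"a.parquet", "b.parquet"}),
    2: (now - timedelta(days=10), {"a.parquet", "c.parquet"}),  # b.parquet rewritten into c.parquet
    3: (now - timedelta(days=2), {"c.parquet", "d.parquet"}),   # a.parquet deleted, d.parquet added
}
current_id = 3

def expire(snapshots, current_id, older_than, retain_last=1):
    """Return (snapshots kept, files safe to delete)."""
    newest_first = sorted(snapshots, key=lambda sid: snapshots[sid][0], reverse=True)
    protected = set(newest_first[:retain_last]) | {current_id}
    kept = {sid: snap for sid, snap in snapshots.items()
            if sid in protected or snap[0] >= older_than}
    still_referenced = set().union(*(files for _, files in kept.values()))
    every_file = set().union(*(files for _, files in snapshots.values()))
    return kept, sorted(every_file - still_referenced)

kept, deletable = expire(snapshots, current_id, older_than=now - timedelta(days=7))
print(sorted(kept))  # [3]
print(deletable)     # ['a.parquet', 'b.parquet']
```

Orphan files are a different case: no snapshot ever referenced them, so expiry never sees them, which is why orphan-file removal is a separate job.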

Common Mistakes

:::danger Name-Based Column References in Custom Code Never write code that references Iceberg columns by name using string comparisons against schema metadata. Always use column IDs. A column rename will break name-based references. Iceberg's own query engines always use column IDs internally - your custom tooling should too. :::

:::danger Using Hadoop Catalog in Production The Hadoop catalog keeps table state in files under the warehouse directory and relies on the file system's atomic rename to commit. HDFS provides that guarantee; object stores such as S3 do not, so two simultaneous commits to the same table can overwrite each other and corrupt the table. Only use the Hadoop catalog in local development. Use a REST catalog, Glue, or the JDBC catalog for anything that has concurrent writes. :::

:::warning Not Running Compaction Iceberg tables that receive streaming writes without scheduled compaction jobs degrade over months. We have seen production tables with 4 million small files where a query that should take 30 seconds takes 45 minutes because the query planner has to parse 4 million manifest entries before reading any data. Compaction is not optional - it is maintenance. :::

:::warning MoR vs CoW at the Wrong Scale Merge-on-Read is tempting for high-frequency upsert workloads because writes are fast. But at query time, every read must merge the base files with potentially hundreds of delete/update files. For tables with many small updates that are read frequently, CoW typically outperforms MoR in total system throughput. Benchmark your specific access pattern before committing to a strategy. :::


Iceberg + Spark - Full Production Pattern

Iceberg's Spark integration is the most complete in the ecosystem. Production Iceberg deployments on Spark use the iceberg-spark-runtime JAR and the IcebergSparkSessionExtensions SQL extensions.

```python
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    DoubleType, StringType, StructField, StructType, TimestampType,
)

# Configure Spark with Iceberg
spark = (
    SparkSession.builder
    .appName("iceberg-production")
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # Configure a REST catalog (production pattern)
    .config("spark.sql.catalog.prod", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.prod.catalog-impl",
            "org.apache.iceberg.rest.RESTCatalog")
    .config("spark.sql.catalog.prod.uri", "http://iceberg-catalog:8181")
    .config("spark.sql.catalog.prod.warehouse", "s3://my-bucket/iceberg/")
    # Configure AWS credentials (if on S3)
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.InstanceProfileCredentialsProvider")
    .getOrCreate()
)

# Create a namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS prod.analytics")

# Create an Iceberg table with partitioning
spark.sql("""
    CREATE TABLE IF NOT EXISTS prod.analytics.orders (
        order_id     STRING NOT NULL,
        customer_id  STRING NOT NULL,
        product_id   STRING NOT NULL,
        revenue      DOUBLE,
        order_status STRING,
        order_time   TIMESTAMP NOT NULL
    )
    USING iceberg
    PARTITIONED BY (days(order_time), truncate(16, customer_id))
    TBLPROPERTIES (
        'write.target-file-size-bytes' = '536870912',
        'write.delete.mode' = 'copy-on-write',
        'write.update.mode' = 'merge-on-read',
        'history.expire.max-snapshot-age-ms' = '604800000'
    )
""")

# Write data; the explicit schema marks the NOT NULL columns as non-nullable
orders_schema = StructType([
    StructField("order_id", StringType(), nullable=False),
    StructField("customer_id", StringType(), nullable=False),
    StructField("product_id", StringType(), nullable=False),
    StructField("revenue", DoubleType(), nullable=True),
    StructField("order_status", StringType(), nullable=True),
    StructField("order_time", TimestampType(), nullable=False),
])
orders_df = spark.createDataFrame([
    ("o001", "c100", "p001", 29.99, "completed", datetime(2024, 1, 15, 10, 0)),
    ("o002", "c200", "p002", 99.99, "completed", datetime(2024, 1, 15, 11, 0)),
    ("o003", "c100", "p003", 49.99, "pending", datetime(2024, 1, 16, 9, 0)),
], orders_schema)

orders_df.writeTo("prod.analytics.orders").append()

# Read with predicate pushdown - Iceberg prunes partitions automatically
spark.sql("""
    SELECT customer_id, SUM(revenue) AS total_revenue
    FROM prod.analytics.orders
    WHERE order_time >= TIMESTAMP '2024-01-15 00:00:00'
      AND order_time < TIMESTAMP '2024-01-16 00:00:00'
    GROUP BY customer_id
""").show()

# Inspect files and partitions
spark.sql("""
    SELECT file_path, record_count, file_size_in_bytes,
           lower_bounds, upper_bounds
    FROM prod.analytics.orders.files
    LIMIT 10
""").show(truncate=False)

# Inspect all snapshots
spark.sql("""
    SELECT snapshot_id, committed_at, operation
    FROM prod.analytics.orders.snapshots
    ORDER BY committed_at DESC
""").show()
```

Iceberg Maintenance - Compaction and Snapshot Expiry

Iceberg tables require periodic maintenance. Without it, query performance degrades as files accumulate.

```python
from datetime import datetime, timedelta

# Assumes the SparkSession `spark` with the `prod` Iceberg catalog configured above.

# --- File Compaction ---
# Rewrites many small files into fewer large files.
# Schedule this as a nightly job on each table.
spark.sql("""
    CALL prod.system.rewrite_data_files(
        table => 'prod.analytics.orders',
        options => map(
            'target-file-size-bytes', '536870912',
            'min-file-size-bytes', '33554432',
            'max-file-size-bytes', '805306368'
        )
    )
""")

# --- Rewrite manifests ---
# After many small commits, the table's manifests become fragmented.
# Compacting manifests reduces the overhead of reading table state.
spark.sql("CALL prod.system.rewrite_manifests('prod.analytics.orders')")

# --- Expire old snapshots ---
# Iceberg keeps every snapshot until you expire it.
# Expire snapshots older than 7 days (keeping at least the last 10) to reclaim storage.
expiry_cutoff = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
spark.sql(f"""
    CALL prod.system.expire_snapshots(
        table => 'prod.analytics.orders',
        older_than => TIMESTAMP '{expiry_cutoff}',
        retain_last => 10
    )
""")

# --- Remove orphan files ---
# Files under the table location that no snapshot references (e.g. from failed writes).
# The default older_than window is 3 days; never shorten it below the duration of
# your longest-running write, or in-progress files may be deleted.
spark.sql("""
    CALL prod.system.remove_orphan_files(
        table => 'prod.analytics.orders',
        dry_run => false
    )
""")
```

Recommended maintenance schedule:

| Job | Frequency | Notes |
|---|---|---|
| rewrite_data_files | Daily (off-peak) | Compact files created by streaming or high-frequency batch writes |
| rewrite_manifests | Weekly | Reduce manifest fragmentation from many small commits |
| expire_snapshots | Daily | Keep 7–30 days depending on time travel requirements |
| remove_orphan_files | Weekly | Use an older_than window longer than your longest-running write (default 3 days) |

Iceberg REST Catalog - The Open Standard

The Iceberg REST catalog is a community standard HTTP API specification for catalog operations: create namespace, create table, load table, commit transaction, list tables. Any engine that implements the REST catalog client can interact with any REST catalog server.

This matters because it breaks the catalog lock-in problem. Historically, using Hive Metastore as your catalog meant your catalog was tied to Hive. With the REST catalog spec, you can swap catalog implementations without changing your Spark, Trino, or Flink code.

```python
# REST catalog configuration looks the same across engines
# Spark:
spark_config = {
    "spark.sql.catalog.my_catalog": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.my_catalog.catalog-impl": "org.apache.iceberg.rest.RESTCatalog",
    "spark.sql.catalog.my_catalog.uri": "http://catalog-service:8181",
    "spark.sql.catalog.my_catalog.credential": "client:secret",
}

# PyIceberg (same URI and credential):
from pyiceberg.catalog import load_catalog

catalog = load_catalog("my_catalog", **{
    "type": "rest",
    "uri": "http://catalog-service:8181",
    "credential": "client:secret",
})

# The catalog server can be:
# - Apache Polaris (open source, donated by Snowflake 2024)
# - Project Nessie (version-controlled catalog, git-like branching)
# - Tabular (managed service, founded by Iceberg creators)
# - Snowflake Open Catalog
# - AWS Glue (supports REST catalog protocol as of 2024)
# Switching between them mostly means changing the URI and credentials.
```

Project Nessie deserves special mention. Nessie is a REST catalog that implements git-like branching for data: you can create a branch of your catalog, run experimental transformations on that branch, validate the results, and merge back to main - exactly like git branching for code but for data tables. This enables safe experimentation without risk to production tables.
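
The branching idea can be sketched as a toy catalog that keeps one table-to-metadata.json map per branch. This is a hypothetical class, not Nessie's API, and its merges are fast-forward only; real Nessie merges are more flexible.

```python
# Toy model only: a git-like catalog where each branch maps tables to metadata.json paths.

class ToyBranchingCatalog:
    def __init__(self):
        self.heads = {"main": {}}  # branch -> {table: metadata location}
        self.logs = {"main": []}   # branch -> list of commits (table, location)

    def create_branch(self, name, from_branch="main"):
        self.heads[name] = dict(self.heads[from_branch])
        self.logs[name] = list(self.logs[from_branch])

    def commit(self, branch, table, metadata_location):
        self.heads[branch][table] = metadata_location
        self.logs[branch].append((table, metadata_location))

    def merge(self, source, target="main"):
        """Fast-forward only: the target must not have commits the source lacks."""
        src, tgt = self.logs[source], self.logs[target]
        if src[:len(tgt)] != tgt:
            raise ValueError(f"'{target}' has diverged from '{source}'")
        self.heads[target] = dict(self.heads[source])
        self.logs[target] = list(src)

catalog = ToyBranchingCatalog()
catalog.commit("main", "analytics.events", "events/metadata/v1.metadata.json")
catalog.create_branch("experiment")
catalog.commit("experiment", "analytics.events", "events/metadata/v2.metadata.json")

print(catalog.heads["main"]["analytics.events"])  # events/metadata/v1.metadata.json
catalog.merge("experiment")
print(catalog.heads["main"]["analytics.events"])  # events/metadata/v2.metadata.json
```

Because a branch only swaps pointers to metadata.json files, creating and merging branches copies no data files.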


Performance Benchmarks - Iceberg at Scale

Netflix published performance data on their Iceberg migration (2020):

| Metric | Before Iceberg (Hive) | After Iceberg |
|---|---|---|
| Partition listing for 500K partitions | 10+ minutes | Less than 1 second |
| Daily partition add latency | Minutes (Metastore bottleneck) | Milliseconds |
| Query planning overhead per query | Seconds (directory listing) | Sub-second (manifest read) |
| Schema change risk | High (name-based, fragile) | None (ID-based, correct) |
| Concurrent write failures | Frequent | Rare (optimistic concurrency handles most) |

Apple has publicly stated they run Iceberg at exabyte scale. LinkedIn, Adobe, and Airbnb have all published case studies showing 10–100x improvements in query planning time after migrating from Hive to Iceberg.

The key insight from these benchmarks is that Iceberg's performance advantage is most pronounced for:

  1. Tables with many partitions (more than 10,000)
  2. Tables with frequent concurrent writes
  3. Tables that require schema changes
  4. Tables where ad-hoc queries need to filter on non-partition columns (data skipping)

For small tables (less than 100GB) with simple access patterns, the performance difference between Iceberg and raw Parquet is negligible. The value of Iceberg for small tables is in correctness (ACID) and operational simplicity (schema evolution, time travel), not raw performance.


Interview Q&A

Q: Explain Iceberg's three-layer metadata architecture and why it was designed this way.

A: Iceberg has three metadata layers on top of the data files (Parquet/ORC): manifest files (Avro files listing data files plus per-file statistics), manifest lists (Avro files listing manifest files; each snapshot has one), and metadata.json (the entry point listing all snapshots and the current schema). The layered design solves Hive's $O(n)$ partition listing problem. Hive listed every partition directory at query time. Iceberg instead reads the manifest list for the current snapshot, which contains pointers to manifest files, which contain explicit file paths and statistics. A query planner can prune manifest files by partition boundaries and then prune individual data files by column statistics - all without any S3 LIST operations. The overhead is $O(\text{manifests read})$, which is far smaller than $O(\text{all partition directories})$.

Q: How does Iceberg implement schema evolution correctly when the underlying Parquet files use column names, not IDs?

A: Iceberg assigns every column a column ID (an integer) at creation time. Column IDs are immutable - they never change regardless of renames, reorders, or additions. The mapping from column ID to column name lives in metadata.json. When Iceberg writes Parquet, it stores each column's ID in the Parquet schema as the field ID, alongside the column name. When reading, Iceberg resolves columns by ID and ignores the name stored in the file. So if column 3 was originally named user_id and is later renamed customer_id, old Parquet files still carry data for field ID 3 under the old name, and a query for customer_id reads it through ID 3. For Parquet files written without field IDs (for example, files imported from Hive), Iceberg falls back to a name mapping stored in the table properties. The data is always found correctly.

Q: What is hidden partitioning and why does it matter?

A: In Hive, users must write partition predicates explicitly: WHERE year=2024 AND month=1. If the partition scheme changes, all queries break. Iceberg's hidden partitioning means the partition spec is internal metadata - queries filter by the original column values (WHERE event_time BETWEEN ...) and Iceberg's query planner automatically applies partition pruning. Users never write partition predicates. If the partition spec evolves (e.g., from daily to hourly), old data uses the old layout and new data uses the new layout - both are transparently handled by Iceberg. This is critically important for long-running tables where partition granularity needs to change as data volume grows.

Q: How does Iceberg handle concurrent writes from two Spark jobs simultaneously?

A: Iceberg uses optimistic concurrency. Each writer reads the current table metadata, plans its changes, writes new data files and manifest files, then attempts to commit by atomically swapping the catalog's pointer from the metadata file it read to a new metadata file. The swap is a compare-and-swap implemented by the catalog (for example, a Hive Metastore lock, a Glue table version check, or a REST catalog commit), so it succeeds only if no one else committed in the meantime. If two writers commit simultaneously, one succeeds and the other finds that the current metadata changed since it read it. The loser refreshes to the new current metadata, checks whether the other commit conflicts with its own, and if not, re-applies its change on top of the new state and retries the swap; the data files it already wrote are reused, not rewritten. For append-only operations, conflicts are always retryable. For overwrites, deletes, or updates that touch the same data files as the concurrent commit, the conflict is a genuine data integrity issue, and the commit fails with a conflict error instead of retrying.
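The commit protocol can be modeled as a compare-and-swap on a version pointer. `ToyCatalog` and `commit_append` are hypothetical and cover only appends, which never conflict semantically, so a retry always re-applies cleanly; the conflict validation Iceberg performs for overwrites and deletes is left out.

```python
from typing import Callable, Optional, Tuple


class CommitConflict(Exception):
    """Raised when the catalog pointer moved after the writer read it."""


class ToyCatalog:
    """Hypothetical stand-in for a catalog: tracks the current metadata version."""

    def __init__(self) -> None:
        self.versions = {0: ()}  # metadata version -> data files in that snapshot
        self.current = 0

    def load(self) -> Tuple[int, Tuple[str, ...]]:
        return self.current, self.versions[self.current]

    def swap(self, expected_version: int, new_files: Tuple[str, ...]) -> None:
        # The only atomic step: succeed only if nobody committed since we read.
        if self.current != expected_version:
            raise CommitConflict(f"expected v{expected_version}, found v{self.current}")
        self.current += 1
        self.versions[self.current] = new_files


def commit_append(
    catalog: ToyCatalog,
    data_file: str,
    max_retries: int = 3,
    before_swap: Optional[Callable[[int], None]] = None,
) -> int:
    """Append an already-written data file; return how many retries it took.

    On conflict the writer reloads the latest metadata and re-applies its
    append on top; the data file itself is never rewritten.
    """
    for attempt in range(max_retries + 1):
        base_version, base_files = catalog.load()
        new_files = base_files + (data_file,)
        if before_swap is not None:
            before_swap(attempt)  # test hook: lets another writer sneak in
        try:
            catalog.swap(base_version, new_files)
            return attempt
        except CommitConflict:
            continue
    raise CommitConflict(f"gave up after {max_retries} retries")


catalog = ToyCatalog()


def racing_writer(attempt: int) -> None:
    # Writer B commits while writer A sits between load() and swap().
    if attempt == 0:
        commit_append(catalog, "b-00001.parquet")


retries = commit_append(catalog, "a-00001.parquet", before_swap=racing_writer)
print(retries, catalog.load())
# 1 (2, ('b-00001.parquet', 'a-00001.parquet'))
```

Writer A loses the first race, reloads version 1 (which already contains B's file), and commits version 2 on its second attempt without rewriting its data file.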

Q: Compare Iceberg's time travel implementation to Delta Lake's. What are the differences?

A: Both support time travel, but through different mechanisms. Iceberg stores every snapshot in metadata.json with a commit timestamp. Time travel queries (FOR SYSTEM_TIME AS OF timestamp) pick the most recent snapshot committed at or before the specified time, then read from that snapshot's manifest list. Delta Lake stores every commit as a JSON file in _delta_log/, numbered sequentially. Time travel in Delta reconstructs the table state at the specified version number or timestamp from that log. Iceberg's approach scales well for large tables because manifest files are shared between snapshots (only changed manifests are new), so you can keep thousands of snapshots without proportionally large storage overhead, and reading any one of them starts directly from its manifest list. Delta's log is append-only and linear: reading a Delta table at version 1000 means loading the nearest checkpoint at or before version 1000 and replaying the JSON commit files after it. Both provide equivalent user-facing functionality.
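Timestamp-based time travel reduces to a search over the snapshot log. This sketch assumes a hypothetical `Snapshot` record holding just an ID, a commit time, and a manifest list path; the point is that the lookup lands directly on one manifest list, with no log replay.

```python
import bisect
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Snapshot:
    # Hypothetical, simplified snapshot record (not Iceberg's class).
    snapshot_id: int
    timestamp_ms: int   # commit time
    manifest_list: str  # path of this snapshot's manifest list


def snapshot_as_of(snapshots: List[Snapshot], as_of_ms: int) -> Snapshot:
    """Return the latest snapshot committed at or before as_of_ms.

    Assumes snapshots are ordered by commit time, as an append-only log is.
    """
    times = [s.timestamp_ms for s in snapshots]
    idx = bisect.bisect_right(times, as_of_ms) - 1
    if idx < 0:
        raise ValueError("table had no snapshot at the requested time")
    return snapshots[idx]


log = [
    Snapshot(101, 1_000, "snap-101.avro"),
    Snapshot(102, 2_000, "snap-102.avro"),
    Snapshot(103, 3_000, "snap-103.avro"),
]
print(snapshot_as_of(log, 2_500).manifest_list)  # snap-102.avro
```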

Q: A GDPR erasure request comes in for a single user who has records across 500 partitions in a 10TB Iceberg table. Walk through what happens when you run DELETE FROM events WHERE customer_id = 'abc123' in Merge-on-Read mode.

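A: In MoR mode, Iceberg does not rewrite any data files. Instead, it writes small delete files. Depending on the engine, these are position-delete files, which list the data file path and row position of each matching row (this is what Spark writes for a MoR DELETE, after scanning the files whose statistics could contain 'abc123'), or equality-delete files, which record the deleted value customer_id = 'abc123'. The delete files are tracked in delete manifests, and Iceberg commits a new snapshot that adds them and removes no existing data files. The physical deletion has not happened: the records still exist in the Parquet files, but they are logically deleted because every read merges the delete files and excludes matching rows. Physical cleanup takes two steps. Compaction (rewrite_data_files) rewrites the affected files without the deleted rows, and snapshot expiration (expire_snapshots) then removes the old files, which earlier snapshots keep referencing until they expire. For GDPR compliance, logical deletion may be sufficient (the data is not returned by any current query), but some interpretations require physical deletion. In that case, run compaction and then snapshot expiration after the delete; CoW mode alone is not enough either, because the original files stay reachable through time travel until their snapshots expire.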
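A toy model of how reads apply the two kinds of delete files, and what compaction does afterward. `read_with_deletes` and `compact` are hypothetical helpers over in-memory rows, not Iceberg or Spark APIs, and the sketch ignores the sequence numbers that decide which data files a real delete file applies to.

```python
from typing import Dict, List, Set, Tuple

Row = Dict[str, object]


def read_with_deletes(
    data_files: Dict[str, List[Row]],
    position_deletes: Set[Tuple[str, int]],
    equality_deletes: List[Row],
) -> List[Row]:
    """Merge-on-read (toy): return live rows after applying delete files.

    position_deletes holds (data file path, row position) pairs.
    equality_deletes holds column->value dicts; a row is deleted if it
    matches every pair in any one of them.
    """
    live: List[Row] = []
    for path, rows in data_files.items():
        for pos, row in enumerate(rows):
            if (path, pos) in position_deletes:
                continue
            if any(all(row.get(col) == val for col, val in eq.items()) for eq in equality_deletes):
                continue
            live.append(row)
    return live


def compact(
    data_files: Dict[str, List[Row]],
    position_deletes: Set[Tuple[str, int]],
    equality_deletes: List[Row],
) -> Dict[str, List[Row]]:
    """Rewrite each file without its deleted rows, dropping files that end up empty.

    Returns new files only; the inputs are not modified, just as old data
    files stay reachable from earlier snapshots until those snapshots expire.
    """
    rewritten: Dict[str, List[Row]] = {}
    for path, rows in data_files.items():
        kept = read_with_deletes({path: rows}, position_deletes, equality_deletes)
        if kept:
            rewritten[path.replace(".parquet", "-compacted.parquet")] = kept
    return rewritten


files = {
    "part-0.parquet": [{"customer_id": "abc123", "amount": 5}, {"customer_id": "xyz", "amount": 7}],
    "part-1.parquet": [{"customer_id": "abc123", "amount": 9}],
}
# Position deletes name each matched row by file and row position.
pos_deletes = {("part-0.parquet", 0), ("part-1.parquet", 0)}
# An equality delete covers every row with the value, wherever it lives.
eq_deletes = [{"customer_id": "abc123"}]

print(read_with_deletes(files, pos_deletes, []))    # [{'customer_id': 'xyz', 'amount': 7}]
print(read_with_deletes(files, set(), eq_deletes))  # [{'customer_id': 'xyz', 'amount': 7}]
print(compact(files, pos_deletes, []))
# {'part-0-compacted.parquet': [{'customer_id': 'xyz', 'amount': 7}]}
```

`compact` leaves `files` untouched, mirroring the point above: until the old snapshots are expired, the original files (and the deleted customer's rows) remain readable through time travel.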


Iceberg Table Properties Reference

Iceberg table behavior is controlled through table properties set in TBLPROPERTIES. Knowing the important ones helps you tune performance and correctness.

CREATE TABLE prod.analytics.events (
    event_id    STRING NOT NULL,
    customer_id STRING,
    revenue     DOUBLE,
    event_time  TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(event_time))
TBLPROPERTIES (
    -- File size targets
    'write.target-file-size-bytes' = '536870912',         -- 512MB target
    'write.delete.target-file-size-bytes' = '67108864',   -- 64MB for delete files

    -- CoW vs MoR per operation type
    'write.delete.mode' = 'copy-on-write',   -- DELETE rewrites affected data files (safe default)
    'write.update.mode' = 'merge-on-read',   -- UPDATE writes delete files + new rows (fast writes)
    'write.merge.mode' = 'merge-on-read',    -- MERGE writes delete files + new rows

    -- Snapshot retention (defaults used by expire_snapshots)
    'history.expire.max-snapshot-age-ms' = '604800000',   -- 7 days in ms
    'history.expire.min-snapshots-to-keep' = '5',

    -- Format version (v2 required for row-level deletes)
    'format-version' = '2',

    -- Metadata compression
    'write.metadata.compression-codec' = 'gzip',

    -- Column statistics
    'write.metadata.metrics.default' = 'truncate(16)',    -- default: string bounds truncated to 16 chars
    'write.metadata.metrics.column.customer_id' = 'full'  -- untruncated bounds for a string filter column
)
| Property | Default | When to Change |
|---|---|---|
| write.target-file-size-bytes | 512MB | Reduce for streaming (128MB), increase for batch-heavy workloads (1GB) |
| write.delete.mode | copy-on-write | Switch to merge-on-read for high-frequency GDPR deletion workflows |
| history.expire.max-snapshot-age-ms | 5 days | Increase to 30–90 days for ML reproducibility requirements |
| format-version | 2 since Iceberg 1.4 (1 in earlier releases) | Set explicitly to 2 for new tables so behavior does not depend on the engine's Iceberg version; required for row-level deletes |
| write.metadata.metrics.default | truncate(16) | Override to full per column (write.metadata.metrics.column.&lt;name&gt;) for high-cardinality string columns used in WHERE clauses |
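The byte and millisecond values in these properties are easy to get wrong by hand. A small helper (hypothetical, not part of any Iceberg library) can derive them from readable units and render the TBLPROPERTIES clause; it assumes keys and values contain no single quotes, since it does no escaping.

```python
from typing import Dict, Union

MB = 1024 * 1024
DAY_MS = 24 * 60 * 60 * 1000


def tblproperties(props: Dict[str, Union[int, str]]) -> str:
    """Render properties as a Spark SQL TBLPROPERTIES clause (hypothetical helper).

    Every value is quoted, because Iceberg table properties are string-valued.
    No escaping is done, so keys and values must not contain single quotes.
    """
    items = ",\n  ".join(f"'{key}' = '{value}'" for key, value in props.items())
    return f"TBLPROPERTIES (\n  {items}\n)"


props = {
    "format-version": 2,
    "write.target-file-size-bytes": 512 * MB,          # 536870912
    "write.delete.target-file-size-bytes": 64 * MB,    # 67108864
    "history.expire.max-snapshot-age-ms": 7 * DAY_MS,  # 604800000
    "history.expire.min-snapshots-to-keep": 5,
}
print(tblproperties(props))
```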

Iceberg with AWS Glue Catalog - Production Setup

For AWS-native deployments, AWS Glue is a practical catalog choice: it is fully managed, integrates natively with Athena and EMR, and leaves no catalog servers for you to operate.

# Configure Spark to use AWS Glue as the Iceberg catalog
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("iceberg-glue")
    # Iceberg runtime for Spark 3.5 / Scala 2.12, plus the AWS SDK bundle that
    # GlueCatalog and S3FileIO need (skip the bundle if the SDK is already on the classpath)
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,"
            "org.apache.iceberg:iceberg-aws-bundle:1.5.0")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # Glue catalog configuration
    .config("spark.sql.catalog.glue_catalog",
            "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse",
            "s3://my-bucket/iceberg/")
    .config("spark.sql.catalog.glue_catalog.io-impl",
            "org.apache.iceberg.aws.s3.S3FileIO")
    # DynamoDB lock manager for metadata commits (see the note below)
    .config("spark.sql.catalog.glue_catalog.lock-impl",
            "org.apache.iceberg.aws.dynamodb.DynamoDbLockManager")
    .config("spark.sql.catalog.glue_catalog.lock.table",
            "iceberg_locks")
    .getOrCreate()
)

# Create the namespace (a Glue database) and the table: visible in the
# Glue Data Catalog and queryable from Athena
spark.sql("CREATE NAMESPACE IF NOT EXISTS glue_catalog.analytics")

spark.sql("""
    CREATE TABLE IF NOT EXISTS glue_catalog.analytics.events (
        event_id    STRING NOT NULL,
        customer_id STRING,
        revenue     DOUBLE,
        event_time  TIMESTAMP NOT NULL
    )
    USING iceberg
    PARTITIONED BY (days(event_time))
    LOCATION 's3://my-bucket/iceberg/analytics/events/'
    TBLPROPERTIES ('format-version' = '2')
""")

# This same table is now queryable from:
# - Amazon Athena (SELECT * FROM analytics.events WHERE ...)
# - Amazon EMR / Spark jobs
# - Trino/Presto with the Glue catalog connector
# - Any tool that implements the Glue catalog API

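The DynamoDB lock manager (DynamoDbLockManager) is essential only with older AWS SDK versions. With AWS SDK v2 2.17.131 or later, GlueCatalog commits through Glue's optimistic locking: each commit checks the Glue table's version ID and, if another writer updated the table first, refreshes the metadata and retries instead of overwriting it. With older SDK versions, Iceberg falls back to an in-memory lock that only coordinates writers inside one JVM, so concurrent Spark jobs on different clusters can overwrite each other's metadata commits. In that setup, the DynamoDB lock table provides the distributed mutual exclusion for metadata commits.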


Iceberg Branch and Tag Support - Safe Experimentation

Iceberg tables support branches and tags: named references to snapshots, stored in the table metadata. Branches are mutable references that advance as new commits are made to them. Tags are fixed references that stay pinned to one snapshot.

This enables git-like workflows for data:

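# Using Spark with Iceberg branches and tags
# (assumes the Iceberg SQL extensions and an Iceberg catalog named "prod" are configured)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("iceberg-branches").getOrCreate()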

# Create a branch for a risky schema migration
spark.sql("""
ALTER TABLE prod.analytics.orders
CREATE BRANCH migration_experiment
RETAIN 7 DAYS
""")

# Write experimental data to the branch (does not affect main)
spark.sql("""
INSERT INTO prod.analytics.orders.branch_migration_experiment
VALUES ('o999', 'c100', 'p001', 1234.56, 'experimental', TIMESTAMP '2024-06-01 00:00:00')
""")

# Read from the branch
spark.sql("""
SELECT * FROM prod.analytics.orders
FOR SYSTEM_VERSION AS OF 'migration_experiment'
""").show()

# Validate results on the branch
migration_count = spark.sql("""
SELECT COUNT(*) FROM prod.analytics.orders
FOR SYSTEM_VERSION AS OF 'migration_experiment'
""").collect()[0][0]

print(f"Branch row count: {migration_count}")

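# If satisfied, fast-forward main to the branch's latest snapshot
# (fast_forward procedure; the Java API equivalent is ManageSnapshots.fastForwardBranch)
spark.sql("CALL prod.system.fast_forward('analytics.orders', 'main', 'migration_experiment')")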

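# Create a tag to mark the state used for model training (142 stands in for a real snapshot ID).
# While the tag exists, expire_snapshots will not remove that snapshot; RETAIN 365 DAYS makes
# the tag itself expire after a year (omit RETAIN to keep it until the tag is dropped)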
spark.sql("""
ALTER TABLE prod.analytics.orders
CREATE TAG model_training_2024_q1
AS OF VERSION 142
RETAIN 365 DAYS
""")

# Read exactly the data used for model training (6 months later)
spark.sql("""
SELECT * FROM prod.analytics.orders
FOR SYSTEM_VERSION AS OF 'model_training_2024_q1'
""").show()

Tags solve one of the most painful ML reproducibility problems: time travel by snapshot ID works only until expire_snapshots deletes the snapshot. A tag explicitly protects its snapshot from expiry for as long as the tag exists, which means until the tag is dropped or, if it was created with RETAIN, until that retention period lapses. Use tags to mark the exact dataset state used for each model training run, and omit RETAIN for runs that must stay reproducible indefinitely.
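How tags interact with expiry can be sketched as a simplified expire_snapshots. `Ref` and `surviving_snapshots` are hypothetical; the sketch assumes ref age is measured from the tagged snapshot's commit time, and it ignores min-snapshots-to-keep and branch-history retention, protecting only the snapshot each live ref points at.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Set

DAY_MS = 24 * 60 * 60 * 1000


@dataclass(frozen=True)
class Ref:
    # Hypothetical simplified snapshot reference (branch or tag).
    snapshot_id: int
    max_ref_age_ms: Optional[int]  # from RETAIN n DAYS; None means the ref never expires


def surviving_snapshots(
    snapshots: Dict[int, int],  # snapshot ID -> commit timestamp (ms)
    refs: Dict[str, Ref],       # ref name -> ref
    now_ms: int,
    max_snapshot_age_ms: int,
) -> Set[int]:
    """Simplified expire_snapshots: return the snapshot IDs that are kept."""
    # Assumption of this sketch: a ref's age is measured from its snapshot's commit time.
    live_refs = [
        ref for ref in refs.values()
        if ref.max_ref_age_ms is None
        or now_ms - snapshots[ref.snapshot_id] <= ref.max_ref_age_ms
    ]
    protected = {ref.snapshot_id for ref in live_refs}
    return {
        sid for sid, ts in snapshots.items()
        if now_ms - ts <= max_snapshot_age_ms or sid in protected
    }


SNAPSHOTS = {140: 10 * DAY_MS, 141: 20 * DAY_MS, 142: 100 * DAY_MS, 143: 399 * DAY_MS}
REFS = {
    "main": Ref(143, None),
    "model_training_2024_q1": Ref(142, 365 * DAY_MS),  # RETAIN 365 DAYS
}

# Day 400: 142 is far outside the 7-day window but still tagged, so it survives.
print(sorted(surviving_snapshots(SNAPSHOTS, REFS, 400 * DAY_MS, 7 * DAY_MS)))  # [142, 143]
# Day 500: the tag has outlived RETAIN 365 DAYS, so 142 becomes eligible for expiry.
print(sorted(surviving_snapshots(SNAPSHOTS, REFS, 500 * DAY_MS, 7 * DAY_MS)))  # [143]
```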


Summary

Apache Iceberg is an open table format specification, not a storage engine, that adds ACID transactions, schema evolution, partition evolution, time travel, and row-level deletes to Parquet/ORC/Avro files stored in object storage. Its three-layer metadata architecture (manifest files → manifest lists → metadata.json) solves the scaling problems that made Hive unusable at petabyte scale: query planning no longer lists partition directories (the planner follows a single $O(1)$ pointer to the current metadata and reads only the manifests it needs), schema changes are correct by design through column IDs, and concurrent writes use optimistic concurrency with snapshot isolation. Iceberg is the format of choice for multi-engine deployments and is now supported natively by Snowflake, BigQuery, Athena, Trino, Spark, Flink, and DuckDB, making it the closest thing the lakehouse ecosystem has to a universal open standard.

© 2026 EngineersOfAI. All rights reserved.