Apache Hudi
The 6-Hour Batch Job That Broke Every Morning
The on-call data engineer at a ride-sharing company knew the drill: wake up at 5 AM, check the Airflow dashboard, pray the nightly update job finished before the BI team arrived at 9. Most days it didn't. The job was responsible for updating trip records - 50 million of them - with final fare adjustments, driver rating changes, dispute resolutions, and status corrections that trickled in throughout the day. Every night, a Spark job would scan the raw S3 data, identify which Parquet partitions had changed records, and rewrite those entire partitions from scratch.
The problem was write amplification. A single fare dispute touching one row in a partition of 10 million records meant rewriting all 10 million. A regulatory correction affecting 500,000 records spread across 200 partitions meant rewriting 200 full partitions - gigabytes of data moving through S3 for the sake of 500K changed rows. The job took 6 hours on a good night. On nights with high dispute volumes, it ran into the morning BI window and caused read errors while partitions were mid-rewrite.
The engineering team evaluated three options: Snowflake (too expensive at their volume), rebuilding the pipeline around custom merge logic (too complex to maintain), or adopting Apache Hudi. They chose Hudi, specifically its Merge-On-Read storage type, and deployed it on their existing EMR cluster. The migration took three weeks. The nightly update job became a near-real-time streaming pipeline, and all 50 million daily updates now processed in 12 minutes. The 5 AM wake-ups stopped.
This is what Hudi was built for - not as a general-purpose lakehouse format, but as a purpose-built solution to the hardest problem in data engineering at scale: updating records that already exist in your data lake without the cost of full partition rewrites.
Why This Exists: The Immutability Tax
Object storage - S3, GCS, ADLS - is architecturally immutable at the file level. You cannot partially update a Parquet file the way you update a row in PostgreSQL. This is a feature: immutability enables concurrent reads, cheap replication, and simple disaster recovery. But it creates a painful mismatch with operational reality.
Most real-world data is not append-only:
- E-commerce orders change status from pending → processing → shipped → delivered, and sometimes to returned
- Financial transactions get corrected, reversed, or annotated days after they occur
- User profiles update continuously - preferences, addresses, consent flags
- IoT sensor readings get backfilled corrections when calibration errors are discovered
- CDC streams from operational databases emit UPDATE and DELETE events, not just INSERTs
Before table formats like Hudi existed, engineering teams handled updates with one of three patterns, all of which were painful:
- Full table overwrite: Read everything, apply changes, write everything back. Simple but catastrophically slow at scale.
- Partition overwrite: Identify changed partitions, rewrite only those. Better, but still wastes I/O on unchanged rows within a partition, and the "identify changed partitions" step requires expensive scans.
- Upsert staging tables: Write changes to a delta table and merge at query time using SQL MERGE. Readable, but query complexity grows without bound as deltas accumulate.
Hudi's insight was to bring the write-ahead log model from database engineering - where changes are first written to an append-only log and only periodically compacted into the main storage - into the object storage world. This is the core architectural idea that makes Hudi fundamentally different from simply "better Parquet."
Historical Context: From Uber's Hadoop Cluster to the Open Lakehouse
Apache Hudi was created at Uber in 2016. The engineering team, led by Vinoth Chandar, was grappling with a version of the exact problem described above: Uber's data lake contained billions of trip records, driver records, and payment records that required continuous correction and update. The Hadoop ecosystem of 2016 offered HBase (too complex, required ZooKeeper) or full HDFS partition rewrites (too slow).
Vinoth's team built Hudi - originally standing for Hadoop Upserts Deletes and Incrementals - as an internal library. The name was deliberately operational: it named what the library did on Hadoop (upserts, deletes, incrementals), not the abstract concept it embodied. Uber open-sourced Hudi in 2017, and it entered the Apache Incubator in 2019. It graduated to a Top-Level Apache Project in 2020.
The original design was Hadoop-centric (HDFS, MapReduce, Hive metastore), but as the industry shifted to cloud object storage, Hudi evolved to support S3, GCS, and ADLS as first-class storage backends. Today Hudi integrates with Spark, Flink, Presto, Trino, Hive, and the major cloud query services.
What makes Hudi's trajectory interesting is that it emerged from a specific operational pain point - not from a research lab or a clean-slate architectural vision. This origin story explains why Hudi's API feels more operational than Iceberg's (which was designed from scratch for correctness) and why Hudi's documentation emphasizes ingestion patterns over query optimization.
Core Concept: The Hudi Data Model
The Timeline
Every Hudi table maintains a timeline - a totally ordered log of all actions ever performed on the table. The timeline is stored in the .hoodie/ directory alongside the data files. Each entry on the timeline is called an instant and has three attributes:
- Instant time: A monotonically increasing timestamp in yyyyMMddHHmmssSSS format (millisecond precision, e.g. 20240115120000000)
- Action: commit, deltacommit, compaction, clean, rollback, or savepoint
- State: requested, inflight, or completed
The three-state model prevents partial writes from being visible to readers. An instant transitions from requested → inflight → completed. Readers only see completed instants. If a writer crashes mid-write, the instant remains in inflight state and is invisible - equivalent to an automatic transaction rollback.
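The visibility rule can be sketched in plain Python. This is a conceptual model, not Hudi's API: the Instant and Timeline classes are hypothetical, and real instants are files under .hoodie/ rather than in-memory objects.

```python
from dataclasses import dataclass, field

# Conceptual model only: Instant and Timeline are hypothetical classes, not Hudi's API.
STATES = ("requested", "inflight", "completed")


@dataclass
class Instant:
    time: str    # yyyyMMddHHmmssSSS, e.g. "20240115120000000"
    action: str  # commit, deltacommit, compaction, clean, rollback, savepoint
    state: str = "requested"


@dataclass
class Timeline:
    instants: list = field(default_factory=list)

    def start(self, time: str, action: str) -> Instant:
        instant = Instant(time, action)
        self.instants.append(instant)
        return instant

    def advance(self, instant: Instant) -> None:
        # Move exactly one step: requested -> inflight -> completed.
        idx = STATES.index(instant.state)
        if idx == len(STATES) - 1:
            raise ValueError(f"instant {instant.time} is already completed")
        instant.state = STATES[idx + 1]

    def visible_instants(self) -> list:
        # Readers only ever see completed instants, in time order.
        return sorted(i.time for i in self.instants if i.state == "completed")


timeline = Timeline()

ok = timeline.start("20240115120000000", "commit")
timeline.advance(ok)  # requested -> inflight
timeline.advance(ok)  # inflight -> completed

crashed = timeline.start("20240115130000000", "commit")
timeline.advance(crashed)  # writer dies here, leaving the instant inflight

print(timeline.visible_instants())  # ['20240115120000000']
```

Because all instant times share one fixed-width format, sorting the strings also sorts them chronologically, which is why the sketch can compare them directly.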
This timeline is what enables incremental queries: instead of asking "give me all the data," you ask "give me all changes since instant time T." The timeline makes this efficient because it's an ordered index of mutations.
The Two Storage Types
Hudi offers two fundamentally different storage strategies, each optimized for a different workload:
Copy-On-Write (COW)
In COW mode, every write operation - whether insert or update - rewrites the affected base Parquet files. The write is more expensive (more I/O), but reads are cheap because every file is a clean, complete Parquet file with no merging required.
When to use COW:
- Data is mostly append-only with occasional corrections
- Read throughput is the primary concern (BI dashboards, reporting)
- The update rate is low relative to the total data volume
- Downstream readers include tools with limited Hudi awareness (they can read the base Parquet directly)
Merge-On-Read (MOR)
In MOR mode, updates are written to delta log files (Avro-encoded) rather than rewriting the base Parquet files. A background compaction process periodically merges the delta logs into the base files. Reads require merging the base file with any outstanding delta logs, which adds read latency.
When to use MOR:
- High-frequency updates (CDC streams, event sourcing)
- Write throughput is the primary concern
- Latency from write to query visibility must be minimal
- Compaction can run in the background without blocking writes
The key insight is that MOR trades read cost for write cost. Instead of paying to rewrite a 1 GB Parquet file because 1,000 rows changed, you append a 50 KB delta log. Reads become a merge operation, but for workloads where most rows are not updated frequently, the base file dominates and the merge is cheap.
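A back-of-the-envelope comparison makes the trade concrete. This sketch counts only bytes written, reuses the 1 GB file and 50 KB log figures from the paragraph above, and assumes (as a simplification) that MOR pays for one compaction that rewrites the base file once per run of batches; it ignores read-side merge cost entirely.

```python
GB = 1024 ** 3
KB = 1024


def cow_bytes_written(batches: int, base_file_bytes: int) -> int:
    # COW rewrites the whole base file on every batch that touches it.
    return batches * base_file_bytes


def mor_bytes_written(batches: int, log_bytes: int, base_file_bytes: int) -> int:
    # MOR appends one small log per batch, then one compaction rewrites the base file.
    return batches * log_bytes + base_file_bytes


batches = 5  # e.g. one compaction every 5 delta commits
cow = cow_bytes_written(batches, 1 * GB)
mor = mor_bytes_written(batches, 50 * KB, 1 * GB)
print(f"COW: {cow / GB:.2f} GB, MOR: {mor / GB:.2f} GB")  # COW: 5.00 GB, MOR: 1.00 GB
```

The gap widens as more batches share one compaction, and disappears (MOR is even slightly worse) if you compact after every batch.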
Three Query Types
Hudi exposes three distinct query surfaces on the same physical table:
| Query Type | What It Reads | Use Case |
|---|---|---|
| Snapshot | Latest merged state of all data | Standard analytical queries |
| Incremental | Only records changed since a checkpoint | CDC, event-driven pipelines |
| Read-Optimized (MOR only) | Base Parquet files only, no delta merge | Maximum read speed, accepts slight staleness |
The incremental query type is what makes Hudi a first-class citizen for streaming architectures. Instead of polling a source and re-processing everything, downstream consumers can checkpoint their last-seen instant time and request only the delta.
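The three query types, and a consumer that checkpoints instant times, can be modeled on one toy MOR file slice. Everything here is hypothetical plain Python (dicts standing in for a base file and log blocks). A real incremental query returns the records written by commits after the checkpoint; this sketch approximates that by filtering the merged state on each record's latest commit time.

```python
# One toy file slice: a base file plus log blocks. Each record carries the
# instant time of the commit that last wrote it.
base_file = {
    1001: {"fare": 18.50, "commit": "20240115120000000"},
    1004: {"fare": 45.00, "commit": "20240115120000000"},
}
log_blocks = [
    {1004: {"fare": 38.00, "commit": "20240115130000000"}},  # fare correction
    {1006: {"fare": 15.50, "commit": "20240115140000000"}},  # late-arriving trip
]


def snapshot(base, logs):
    # Latest merged state: later log blocks override earlier versions by key.
    merged = dict(base)
    for block in logs:
        merged.update(block)
    return merged


def read_optimized(base, logs):
    # Base file only; logs is accepted just to mirror the other signatures and is
    # ignored, since log blocks stay invisible until compaction folds them in.
    return dict(base)


def incremental(base, logs, begin_time):
    # Records whose latest commit is strictly after the checkpoint.
    # Fixed-width instant strings compare chronologically.
    return {k: v for k, v in snapshot(base, logs).items() if v["commit"] > begin_time}


checkpoint = "20240115120000000"
changes = incremental(base_file, log_blocks, checkpoint)
# Persist the newest instant seen so the next run only asks for newer changes.
checkpoint = max(v["commit"] for v in changes.values())
```

Running incremental again with the advanced checkpoint returns nothing until a new commit lands, which is the polling loop a downstream consumer repeats.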
Code: Writing and Querying a Hudi Table with PySpark
Setup
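# requirements: the Hudi Spark bundle (org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0)
# on the classpath, e.g. via spark.jars.packages below or --packages on spark-submit
# pip install "pyspark==3.3.*"   # the bundle must match the Spark minor version
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp
spark = (
    SparkSession.builder
    .appName("HudiDemo")
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate()
)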
Writing a Hudi Table (COW)
# Sample trip data - ride-sharing scenario
trip_data = [
(1001, "driver_001", "completed", 18.50, 4.8, "2024-01-15"),
(1002, "driver_002", "completed", 32.00, 4.5, "2024-01-15"),
(1003, "driver_001", "completed", 11.25, 5.0, "2024-01-15"),
(1004, "driver_003", "disputed", 45.00, 1.0, "2024-01-15"),
(1005, "driver_002", "completed", 22.75, 4.9, "2024-01-15"),
]
columns = ["trip_id", "driver_id", "status", "fare", "rating", "trip_date"]
trips_df = spark.createDataFrame(trip_data, columns)
# Hudi write configuration - COW mode
hudi_options_cow = {
"hoodie.table.name": "trips_cow",
"hoodie.datasource.write.recordkey.field": "trip_id", # unique record key
"hoodie.datasource.write.partitionpath.field": "trip_date", # partition column
"hoodie.datasource.write.table.type": "COPY_ON_WRITE",
"hoodie.datasource.write.operation": "upsert", # insert or update
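# Demo shortcut: the sample data has no timestamp column. In production, use a
# monotonically increasing field (event timestamp, version) as the precombine field.
"hoodie.datasource.write.precombine.field": "fare",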
"hoodie.upsert.shuffle.parallelism": 2,
"hoodie.insert.shuffle.parallelism": 2,
}
table_path = "s3://my-lakehouse/hudi/trips_cow"
trips_df.write.format("hudi") \
.options(**hudi_options_cow) \
.mode("append") \
.save(table_path)
print("Initial write complete - 5 trips written")
Performing Upserts
# Simulate fare adjustments and a dispute resolution
update_data = [
# trip 1004: dispute resolved, fare corrected, status updated
(1004, "driver_003", "completed", 38.00, 4.2, "2024-01-15"),
# trip 1002: rating correction
(1002, "driver_002", "completed", 32.00, 4.7, "2024-01-15"),
# New trip arriving late
(1006, "driver_001", "completed", 15.50, 4.6, "2024-01-15"),
]
updates_df = spark.createDataFrame(update_data, columns)
# Upsert - Hudi merges by trip_id. Always use mode("append") for upserts;
# Hudi handles the merge (mode("overwrite") would recreate the table).
updates_df.write.format("hudi") \
    .options(**hudi_options_cow) \
    .mode("append") \
    .save(table_path)
print("Upsert complete - 2 updated, 1 inserted")
Reading the Snapshot
# Read the latest state - snapshot query (default)
trips_snapshot = spark.read.format("hudi").load(table_path)
trips_snapshot.select(
"trip_id", "driver_id", "status", "fare", "rating"
).orderBy("trip_id").show()
# Output (annotations added):
# +-------+----------+---------+-----+------+
# |trip_id| driver_id|   status| fare|rating|
# +-------+----------+---------+-----+------+
# |   1001|driver_001|completed| 18.5|   4.8|
# |   1002|driver_002|completed| 32.0|   4.7|  ← rating updated from 4.5 to 4.7
# |   1003|driver_001|completed|11.25|   5.0|
# |   1004|driver_003|completed| 38.0|   4.2|  ← dispute resolved, fare corrected
# |   1005|driver_002|completed|22.75|   4.9|
# |   1006|driver_001|completed| 15.5|   4.6|  ← new record
# +-------+----------+---------+-----+------+
Incremental Query - The Killer Feature
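# Find commit instants via the _hoodie_commit_time metadata column.
# In practice, you'd persist the last-consumed instant in your pipeline state.
commits = [
    row.commit_time
    for row in trips_snapshot
        .select(col("_hoodie_commit_time").alias("commit_time"))
        .distinct()
        .orderBy("commit_time")
        .collect()
]
begin_time = commits[0]  # first commit; incremental reads return commits strictly after it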
incremental_options = {
"hoodie.datasource.query.type": "incremental",
"hoodie.datasource.read.begin.instanttime": begin_time,
}
incremental_df = (
spark.read.format("hudi")
.options(**incremental_options)
.load(table_path)
)
# Only records changed since begin_time are returned
incremental_df.select(
col("trip_id"),
col("status"),
col("fare"),
col("_hoodie_commit_time").alias("changed_at")
).show()
# This is the foundation of CDC pipelines:
# downstream consumers receive only the delta, not the full table
Writing in MOR Mode
# MOR configuration - optimized for high-frequency updates
hudi_options_mor = {
"hoodie.table.name": "trips_mor",
"hoodie.datasource.write.recordkey.field": "trip_id",
"hoodie.datasource.write.partitionpath.field": "trip_date",
"hoodie.datasource.write.table.type": "MERGE_ON_READ", # Key difference
"hoodie.datasource.write.operation": "upsert",
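"hoodie.datasource.write.precombine.field": "fare",  # demo only; use a timestamp/version in production
# No inline compaction here, so compaction must be scheduled and run separately
# (see the Compaction section); this count is the delta-commit trigger for scheduling
"hoodie.compact.inline": "false",
"hoodie.compact.inline.max.delta.commits": "5",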
"hoodie.upsert.shuffle.parallelism": 4,
}
table_path_mor = "s3://my-lakehouse/hudi/trips_mor"
trips_df.write.format("hudi") \
.options(**hudi_options_mor) \
.mode("append") \
.save(table_path_mor)
Read-Optimized Query on MOR
# Read only base Parquet files - maximum speed, accepts delta lag
read_optimized_options = {
"hoodie.datasource.query.type": "read_optimized",
}
fast_df = (
spark.read.format("hudi")
.options(**read_optimized_options)
.load(table_path_mor)
)
# This query will NOT reflect updates written to delta logs since last compaction
# Trade-off: speed vs. freshness
fast_df.count()
Hudi + Flink: Real-Time Ingestion Pipeline
For sub-minute latency from source to lakehouse, Hudi integrates with Apache Flink to write MOR tables in streaming mode. Flink's checkpoint mechanism aligns with Hudi's commit model - each Flink checkpoint triggers a Hudi deltacommit, making every successful checkpoint a queryable snapshot.
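# Flink SQL for a real-time Kafka → Hudi MOR pipeline, kept in a Python string here.
# Run the statements one at a time in the Flink SQL client (or via a TableEnvironment),
# with checkpointing enabled: Hudi's Flink writer commits only on checkpoints.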
flink_sql = """
CREATE TABLE kafka_trips (
trip_id BIGINT,
driver_id STRING,
status STRING,
fare DOUBLE,
rating DOUBLE,
trip_date STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'trip-events',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
CREATE TABLE hudi_trips_sink (
trip_id BIGINT,
driver_id STRING,
status STRING,
fare DOUBLE,
rating DOUBLE,
trip_date STRING,
event_time TIMESTAMP(3),
PRIMARY KEY (trip_id) NOT ENFORCED
) PARTITIONED BY (trip_date)
WITH (
'connector' = 'hudi',
'path' = 's3://my-lakehouse/hudi/flink-trips',
'table.type' = 'MERGE_ON_READ',
'write.operation' = 'upsert',
'write.precombine.field' = 'event_time',
'compaction.async.enabled' = 'true',
'compaction.schedule.enabled' = 'true'
);
INSERT INTO hudi_trips_sink
SELECT trip_id, driver_id, status, fare, rating, trip_date, event_time
FROM kafka_trips;
"""
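The checkpoint-to-deltacommit alignment described above can be modeled in a few lines. This is a simplified, hypothetical model rather than Flink's or Hudi's API: the writer buffers records and publishes them as one completed deltacommit only when a checkpoint succeeds; after a failure, the buffered records are dropped and the source replays them from the last successful checkpoint.

```python
class CheckpointedWriter:
    """Hypothetical model of checkpoint-aligned commits (not a real Flink/Hudi class)."""

    def __init__(self):
        self.buffer = []     # records written since the last checkpoint
        self.committed = []  # (instant_time, records) pairs visible to readers

    def write(self, record):
        self.buffer.append(record)

    def on_checkpoint(self, instant_time, succeeded):
        if succeeded:
            # One successful checkpoint -> one completed deltacommit.
            self.committed.append((instant_time, list(self.buffer)))
        # On failure nothing becomes visible; the source will replay these records.
        self.buffer.clear()

    def visible_records(self):
        return [r for _, records in self.committed for r in records]


writer = CheckpointedWriter()
writer.write({"trip_id": 1, "fare": 10.0})
writer.on_checkpoint("20240115120000000", succeeded=True)

writer.write({"trip_id": 2, "fare": 12.0})
writer.on_checkpoint("20240115120100000", succeeded=False)  # checkpoint fails

writer.write({"trip_id": 2, "fare": 12.0})  # replayed from the Kafka offsets
writer.on_checkpoint("20240115120200000", succeeded=True)
```

The failed checkpoint leaves no trace for readers, and the replayed record appears exactly once, in the next successful deltacommit.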
CDC Pipelines with Hudi and Debezium
One of the most powerful production patterns is using Hudi as the target of a Change Data Capture pipeline. Debezium captures row-level changes from PostgreSQL (or MySQL) and emits them as structured events. Hudi consumes these events and applies them as upserts or deletes to the lakehouse copy.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, lit
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
spark = (
    SparkSession.builder
    .appName("CDC-Hudi")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)
# Debezium CDC event schema (simplified: assumes the JSON converter with schemas
# disabled, so events arrive without the "schema"/"payload" envelope)
row_schema = StructType([
    StructField("trip_id", LongType()),
    StructField("driver_id", StringType()),
    StructField("status", StringType()),
    StructField("fare", DoubleType()),
    StructField("rating", DoubleType()),
    StructField("trip_date", StringType()),
])
cdc_schema = StructType([
    StructField("op", StringType()),    # 'c'=create, 'r'=snapshot read, 'u'=update, 'd'=delete
    StructField("before", row_schema),  # row state before the change (set for deletes)
    StructField("after", row_schema),   # row state after the change (null for deletes)
    StructField("ts_ms", LongType()),   # change timestamp, used as the precombine field
])
# Read from Kafka (Debezium CDC topic); tombstone messages parse to null and are dropped
cdc_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "postgres.public.trips")
    .load()
    .select(from_json(col("value").cast("string"), cdc_schema).alias("cdc"))
    .select("cdc.*")
    .filter(col("op").isNotNull())
)
# Upserts carry the "after" image. Deletes carry the "before" image and are flagged
# with _hoodie_is_deleted=true, which Hudi treats as a delete during an upsert.
# (On PostgreSQL, "before" includes non-key columns such as trip_date, the partition
# path, only when the source table uses REPLICA IDENTITY FULL.)
upserts = (
    cdc_stream.filter(col("op").isin("c", "r", "u"))
    .select("after.*", "ts_ms")
    .withColumn("_hoodie_is_deleted", lit(False))
)
deletes = (
    cdc_stream.filter(col("op") == "d")
    .select("before.*", "ts_ms")
    .withColumn("_hoodie_is_deleted", lit(True))
)
changes = upserts.unionByName(deletes)
# Hudi upsert options
hudi_opts = {
    "hoodie.table.name": "trips_cdc",
    "hoodie.datasource.write.recordkey.field": "trip_id",
    "hoodie.datasource.write.partitionpath.field": "trip_date",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "ts_ms",  # latest change per key wins
}
def write_batch(batch_df, batch_id):
    # Skip empty micro-batches
    if not batch_df.isEmpty():
        batch_df.write.format("hudi") \
            .options(**hudi_opts) \
            .mode("append") \
            .save("s3://my-lakehouse/hudi/trips_cdc")
# Write upserts and deletes together in 30-second micro-batches
query = (
    changes.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "s3://my-lakehouse/checkpoints/trips_cdc")
    .trigger(processingTime="30 seconds")
    .start()
)
query.awaitTermination()
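What a CDC pipeline like this does to each key can be checked without Kafka or Spark. The sketch below is a hypothetical plain-Python model, not Hudi code: within a micro-batch it keeps the event with the highest ts_ms per trip_id (the precombine rule), then applies creates and updates as upserts and deletes as removals, reading the key from the before image for deletes, since Debezium delete events carry a null after image.

```python
def apply_cdc_batch(table, events):
    # Keep only the newest event per key in this batch (highest ts_ms wins).
    latest = {}
    for event in events:
        row = event["before"] if event["op"] == "d" else event["after"]
        key = row["trip_id"]
        if key not in latest or event["ts_ms"] > latest[key]["ts_ms"]:
            latest[key] = event
    # Apply the surviving events: deletes remove the key, everything else upserts.
    for key, event in latest.items():
        if event["op"] == "d":
            table.pop(key, None)
        else:
            table[key] = event["after"]
    return table


table = {
    1004: {"trip_id": 1004, "status": "disputed", "fare": 45.0},
    1005: {"trip_id": 1005, "status": "completed", "fare": 22.75},
}
events = [
    {"op": "u", "before": table[1004], "ts_ms": 2000,
     "after": {"trip_id": 1004, "status": "completed", "fare": 38.0}},
    # An older update for the same key arrives late in the same batch.
    {"op": "u", "before": table[1004], "ts_ms": 1500,
     "after": {"trip_id": 1004, "status": "disputed", "fare": 40.0}},
    {"op": "c", "before": None, "ts_ms": 1800,
     "after": {"trip_id": 1006, "status": "completed", "fare": 15.5}},
    {"op": "d", "before": table[1005], "after": None, "ts_ms": 1900},
]
apply_cdc_batch(table, events)
```

The late-arriving update with the smaller ts_ms loses, which is exactly why ts_ms, and not a business field, is the precombine field here.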
Compaction: Keeping MOR Tables Healthy
In MOR mode, delta log files accumulate over time. Without compaction, reads become progressively more expensive as they must merge an increasing number of delta logs against the base file. Compaction is the background process that merges delta logs into new base Parquet files, resetting the merge overhead.
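The plan-then-execute shape of compaction can be sketched with dicts standing in for file groups. This is a simplified, hypothetical model: scheduling triggers once the number of delta commits since the last compaction reaches a threshold (the role hoodie.compact.inline.max.delta.commits plays in the configurations in this document), and the plan simply selects every file group with outstanding log blocks, whereas Hudi's pluggable compaction strategies can select file groups more selectively.

```python
def should_schedule(delta_commits_since_last: int, max_delta_commits: int) -> bool:
    # Trigger: enough delta commits have accumulated since the last compaction.
    return delta_commits_since_last >= max_delta_commits


def plan_compaction(file_groups: dict) -> list:
    # Plan: every file group that has outstanding log blocks.
    return sorted(fg_id for fg_id, fg in file_groups.items() if fg["logs"])


def execute_compaction(file_groups: dict, plan: list) -> None:
    # Execute: fold each planned group's logs (oldest first) into a new base.
    for fg_id in plan:
        fg = file_groups[fg_id]
        new_base = dict(fg["base"])
        for block in fg["logs"]:
            new_base.update(block)
        fg["base"] = new_base
        # Compacted logs are no longer read; real Hudi removes the files later, via cleaning.
        fg["logs"] = []


file_groups = {
    "fg-1": {"base": {1001: 18.5, 1004: 45.0}, "logs": [{1004: 41.0}, {1001: 18.0, 1004: 38.0}]},
    "fg-2": {"base": {1005: 22.75}, "logs": []},
}
if should_schedule(delta_commits_since_last=5, max_delta_commits=5):
    plan = plan_compaction(file_groups)
    execute_compaction(file_groups, plan)
```

After execution, a snapshot read of fg-1 needs no merge at all, which is the "resetting the merge overhead" effect described above.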
Compaction Strategies
Inline Compaction: Runs synchronously during the write job. Simplest to configure but adds write latency.
# Inline compaction - compacts every 5 deltacommits
hudi_opts_inline_compact = {
"hoodie.compact.inline": "true",
"hoodie.compact.inline.max.delta.commits": "5", # compact every 5 writes
"hoodie.parquet.max.file.size": "128000000", # target 128 MB Parquet files
}
Async Compaction: A separate Spark job triggered by a scheduler (Airflow, Databricks Jobs). Decouples write latency from compaction cost.
# Step 1: Schedule compaction (writes a compaction plan to the timeline as a requested instant)
# Both steps assume the session has the Hudi SQL extensions configured, as in Setup.
from pyspark.sql import SparkSession
def schedule_compaction(table_path: str) -> None:
    spark = SparkSession.builder.appName("HudiCompactionScheduler").getOrCreate()
    spark.sql(f"CALL run_compaction(op => 'schedule', path => '{table_path}')")
# Step 2: Execute the pending compaction plan(s)
def run_compaction(table_path: str) -> None:
    spark = SparkSession.builder.appName("HudiCompactionExecutor").getOrCreate()
    spark.sql(f"CALL run_compaction(op => 'run', path => '{table_path}')")
# Airflow DAG calls these two functions on separate schedules:
# - schedule_compaction: runs with every write batch
# - run_compaction: runs every 30 minutes
Clustering: A separate optimization that reorganizes data files for better query performance (similar to Z-ordering in Delta Lake). Useful after compaction when read patterns are known.
hudi_clustering_opts = {
"hoodie.clustering.inline": "false",
"hoodie.clustering.async.enabled": "true",
"hoodie.clustering.plan.strategy.class":
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy",
"hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824", # 1 GB
"hoodie.clustering.plan.strategy.small.file.limit": "629145600", # 600 MB
# Sort clustered files by these columns for better predicate pushdown
"hoodie.clustering.plan.strategy.sort.columns": "driver_id,trip_date",
}
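Why sorting helps can be shown with min/max statistics alone. This toy model (hypothetical helpers, fixed 100-row "files", not Hudi's clustering code) counts how many files a point lookup on driver_id must open when each file is skipped unless the value falls inside its [min, max] range.

```python
def split_into_files(values, rows_per_file):
    return [values[i:i + rows_per_file] for i in range(0, len(values), rows_per_file)]


def files_to_scan(files, target):
    # A file can be skipped when target falls outside its [min, max] range.
    return sum(1 for f in files if min(f) <= target <= max(f))


# 1,000 rows arriving with 50 drivers interleaved, as a stream would deliver them.
driver_ids = [f"driver_{i % 50:03d}" for i in range(1000)]

unsorted_files = split_into_files(driver_ids, 100)           # every file spans driver_000..driver_049
clustered_files = split_into_files(sorted(driver_ids), 100)  # each file holds 5 drivers

print(files_to_scan(unsorted_files, "driver_007"))   # 10
print(files_to_scan(clustered_files, "driver_007"))  # 1
```

The data is identical in both layouts; only the sort order changes, and with it how much of the table a selective predicate can skip.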
Hudi vs. Delta Lake vs. Apache Iceberg
All three are open table formats that solve the same high-level problem. The right choice depends on your specific constraints:
| Criterion | Hudi | Delta Lake | Apache Iceberg |
|---|---|---|---|
| Primary strength | High-frequency record updates | Spark-native DML, ACID | Interoperability, spec correctness |
| Update model | COW or MOR (your choice) | COW by default; deletion vectors for MOR-style deletes and updates | COW by default; MOR via v2 position/equality delete files |
| Incremental queries | First-class (built-in) | Via Change Data Feed | Incremental append scans between snapshots; changelog views |
| Streaming writes | Flink-native, Spark Structured Streaming | Spark Structured Streaming | Flink, Spark |
| Query engine support | Spark, Flink, Hive, Presto, Trino | Spark (best), Trino, Presto | Spark, Flink, Trino, Dremio, Athena |
| Compaction | Required for MOR | OPTIMIZE (auto compaction on Databricks) | rewrite_data_files (Spark), optimize (Trino) |
| Metadata layer | Timeline in .hoodie/ | Transaction log in _delta_log/ | Manifest tree in metadata dir |
| Best ecosystem fit | AWS (EMR), GCP (Dataproc), standalone Spark | Databricks, Azure Synapse | Multi-cloud, multi-engine shops |
| Record-level deletes | Native, efficient | Native | Native |
| Schema evolution | Supported, with caveats on type widening | Strong (schema enforcement + evolution) | Strongest spec (type promotions, struct renames) |
Decision guide:
- Choose Hudi when: you have high-frequency updates (CDC, event correction), need sub-minute write-to-read latency, or are building on open-source Spark/Flink platforms such as Amazon EMR or Google Dataproc rather than Databricks.
- Choose Delta Lake when: your team lives in Databricks, your ETL is heavily Spark-based, or you want the simplest operational experience with managed OPTIMIZE.
- Choose Iceberg when: you need multiple query engines to read the same table, you care about spec stability and long-term format compatibility, or you're on a multi-cloud architecture.
Production Engineering Notes
File Sizing
One of the most common Hudi performance problems is undersized Parquet files, created when frequent small write batches produce many small files faster than Hudi's small-file handling or clustering consolidates them. Target 128–256 MB per file. Configure:
prod_sizing_opts = {
"hoodie.parquet.max.file.size": "268435456", # 256 MB max Parquet file
"hoodie.parquet.small.file.limit": "104857600", # files < 100 MB are "small"
"hoodie.copyonwrite.insert.split.size": "500000", # rows per insert split
}
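The interplay of the two size settings can be sketched as simple bin-packing. This is a simplified, hypothetical model (sizes in MB, no per-record accounting): incoming inserts first top up existing files below the small-file limit, each up to the max file size, and only the remainder opens new files. Hudi's actual small-file handling works in record counts derived from the average record size, so treat this as an illustration of the idea.

```python
def assign_inserts(existing_mb, incoming_mb, small_file_limit_mb=100, max_file_mb=256):
    """Return (MB added to each existing small file, sizes of new files in MB)."""
    topped_up = {}
    remaining = incoming_mb
    for name, size in sorted(existing_mb.items()):
        if remaining <= 0:
            break
        if size < small_file_limit_mb:
            # Fill this small file up to the max file size.
            added = min(max_file_mb - size, remaining)
            topped_up[name] = added
            remaining -= added
    new_files = []
    while remaining > 0:
        # Whatever is left goes into new files of at most max_file_mb each.
        size = min(max_file_mb, remaining)
        new_files.append(size)
        remaining -= size
    return topped_up, new_files


# Defaults mirror prod_sizing_opts above: 100 MB small-file limit, 256 MB max file.
topped_up, new_files = assign_inserts({"f1": 40, "f2": 200, "f3": 90}, incoming_mb=500)
print(topped_up, new_files)  # {'f1': 216, 'f3': 166} [118]
```

f2 is left alone because it is already above the small-file limit; routing inserts into f1 and f3 is what keeps the file count from growing with every batch.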
Partition Design
Hudi inherits Spark's partition model: partition paths are directory names. Poorly designed partitions cause the same problems here as in raw Parquet:
- Over-partitioning: partitioning by user_id with 10M users → 10M directories → metadata explosion
- Under-partitioning: partitioning by year → one directory holding 500 GB → little partition-pruning benefit
- Ideal: partition by a column that appears in most query filters and has cardinality between 100 and 100,000 values. Date (YYYY-MM-DD) is the most common choice.
Monitoring
Track these metrics in production:
# Hudi provides a metrics reporter interface
metrics_opts = {
"hoodie.metrics.on": "true",
"hoodie.metrics.reporter.type": "PROMETHEUS", # or GRAPHITE, DATADOG
"hoodie.metrics.prometheus.port": "9090",
}
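# Key signals to alert on (names below are illustrative; exact metric names and
# prefixes depend on the Hudi version and the reporter):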
# - hoodie_commit_duration_in_ms: write latency per commit
# - hoodie_compaction_duration_in_ms: compaction latency
# - hoodie_delta_log_file_size_sum: total delta log accumulation (trigger compaction if high)
# - hoodie_num_write_errors: failed write operations
:::danger Hudi MOR Without Compaction is a Time Bomb
A MOR table that never compacts will degrade read performance indefinitely. Each read must merge ALL outstanding delta logs against the base file. In a high-volume table with daily writes and no compaction, reads can become 10–100x slower within weeks. Always configure async compaction and monitor hoodie_delta_log_file_size_sum. Set an alert threshold and treat it as a production incident if compaction falls behind.
:::
:::warning The precombine.field Must Be Monotonically Increasing
The hoodie.datasource.write.precombine.field is used to resolve conflicts when two records share the same record key in the same batch. Hudi keeps the record with the highest precombine value. If you use a field that does not track recency (like a name or category), Hudi still picks a winner, but it is whichever record sorts highest on that field, not the latest one. Always use a timestamp, version number, or sequence ID as the precombine field.
:::
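The rule is easy to check with a plain-Python model of within-batch deduplication (a hypothetical helper, not Hudi's implementation). Using a version number keeps the newest record; using status, a field that does not track recency, lets the stale disputed record win simply because "disputed" sorts after "completed".

```python
def precombine(records, key_field, precombine_field):
    # Keep one record per key: the one with the highest precombine value.
    # Ties keep the first record seen (an arbitrary choice in this sketch).
    kept = {}
    for rec in records:
        key = rec[key_field]
        if key not in kept or rec[precombine_field] > kept[key][precombine_field]:
            kept[key] = rec
    return kept


batch = [
    {"trip_id": 1004, "status": "completed", "fare": 38.00, "version": 2},  # newest
    {"trip_id": 1004, "status": "disputed", "fare": 45.00, "version": 1},   # stale
]

by_version = precombine(batch, "trip_id", "version")
by_status = precombine(batch, "trip_id", "status")
print(by_version[1004]["status"], by_status[1004]["status"])  # completed disputed
```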
:::tip Use bulk_insert for Initial Data Loading
For the initial load of a large existing dataset into a Hudi table, use hoodie.datasource.write.operation = bulk_insert instead of upsert. Bulk insert bypasses the expensive index lookup (Hudi doesn't need to check for existing records when the table is empty) and can sort data (controlled by hoodie.bulkinsert.sort.mode) for a better file layout. Switch to upsert for all subsequent incremental loads.
:::
:::note Hudi Metadata Table
Hudi 0.10+ ships with an internal metadata table (hoodie.metadata.enable=true, on by default for writers since 0.11) that indexes file listings and, when enabled, column statistics, avoiding expensive S3 LIST calls and speeding up query planning. Enable it in production - the cold start cost is worth the ongoing benefit, especially on tables with thousands of partitions.
:::
Interview Questions and Answers
Q1: What is the difference between Hudi's Copy-On-Write and Merge-On-Read storage types? When would you choose each?
Answer:
Copy-On-Write (COW) rewrites base Parquet files on every update. When you upsert 1,000 records into a partition containing 10 million records, Hudi rewrites every base file that contains at least one of the updated keys; if those keys are scattered, that can mean most of the partition. Reads are fast - every base file is complete and clean. Writes are expensive.
Merge-On-Read (MOR) appends updates to Avro-format delta log files without touching the base Parquet files. Reads must merge the base file with any outstanding delta logs at query time. Writes are cheap. Reads incur merge overhead, which grows until compaction runs.
Choose COW when:
- Update rate is low (less than 10–20% of records per partition per day)
- Read throughput is the bottleneck (BI dashboards, reporting tools)
- Downstream tools lack Hudi awareness and read base Parquet directly
- Simpler operability is preferred (no compaction scheduling required)
Choose MOR when:
- Update rate is high (CDC streams, ride-sharing status updates, IoT corrections)
- Write latency must be minimized
- You can afford async compaction infrastructure
- You need the incremental query type with very low latency between write and visibility
A useful mental model: COW is like an immutable database where every write creates new versions of the files it touches; MOR is like a database with a write-ahead log that gets periodically checkpointed.
Q2: Explain Hudi's timeline and why it matters for data pipeline reliability.
Answer:
The Hudi timeline is a totally ordered, append-only log of every action performed on a table, stored in the .hoodie/ directory. Each entry (called an instant) records an action type (commit, deltacommit, compaction, rollback), a timestamp, and a state (requested, inflight, completed).
The three-state model is what makes Hudi ACID-compliant on object storage: a write job first writes requested, then transitions to inflight as data files are written, then atomically transitions to completed only if all writes succeeded. If the writer crashes mid-operation, the instant remains inflight and is invisible to readers - there is no partial write visible to downstream consumers.
For pipeline reliability, the timeline enables:
- Automatic rollback: Hudi rolls back failed inflight instants before subsequent writes, cleaning up partial writes from crashed jobs.
- Incremental processing: Downstream consumers checkpoint the last-seen instant time and request only changes since that point, avoiding full re-scans.
- Point-in-time queries: You can query the table as it existed at any historical instant whose files have not yet been cleaned.
- Audit trails: The timeline provides a complete history of every mutation, useful for compliance and debugging.
Q3: How does Hudi handle GDPR deletion (right to erasure) requests?
Answer:
Hudi supports hard deletes through the delete write operation. The workflow:
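# Step 1: Create a DataFrame containing only the record keys (and partition values) to delete
user_id_to_delete = "user_12345"   # hypothetical key from the erasure request
partition_value = "2024-01-15"     # hypothetical partition holding that user's events
table_path = "s3://my-lakehouse/hudi/user_events"  # hypothetical table location
delete_keys = spark.createDataFrame(
    [(user_id_to_delete, partition_value)],
    ["user_id", "event_date"]
)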
# Step 2: Write with operation=delete
delete_opts = {
"hoodie.table.name": "user_events",
"hoodie.datasource.write.recordkey.field": "user_id",
"hoodie.datasource.write.partitionpath.field": "event_date",
"hoodie.datasource.write.operation": "delete",
}
delete_keys.write.format("hudi") \
.options(**delete_opts) \
.mode("append") \
.save(table_path)
To physically remove the data (required for GDPR), you must also run clean to remove older file versions that still contain the deleted records. On MOR tables, a deleted record also stays in its base file until compaction rewrites that file.
spark.sql("CALL run_clean(table => 'user_events', retain_commits => 1)")
The challenge is time travel: even after deletion, older snapshots contain the deleted record. To fully comply with GDPR, you must set hoodie.cleaner.commits.retained = 1 and run clean aggressively, or accept that point-in-time queries before the deletion will still return the record. This is a fundamental tension between time-travel capabilities and right-to-erasure requirements that exists in all table formats, not just Hudi.
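The tension can be made concrete with a toy model of cleaning. Assume, as a simplification of commit-based retention, that the cleaner keeps only the file versions written by the latest N commits (hypothetical helpers below, not Hudi's cleaner). The deleted user stays readable through time travel for as long as any retained version still contains it.

```python
def clean(versions, commits_retained):
    # versions: (commit_time, keys present in that file version), oldest first.
    # Keep only the versions written by the latest `commits_retained` commits.
    if commits_retained < 1:
        raise ValueError("commits_retained must be at least 1")
    return versions[-commits_retained:]


def readable_via_time_travel(versions, key):
    return any(key in keys for _, keys in versions)


versions = [
    ("20240101000000000", {"user_1", "user_2"}),
    ("20240102000000000", {"user_1", "user_2", "user_3"}),
    ("20240103000000000", {"user_1", "user_3"}),  # commit that deleted user_2
]

print(readable_via_time_travel(clean(versions, 2), "user_2"))  # True
print(readable_via_time_travel(clean(versions, 1), "user_2"))  # False
```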
Q4: When would you choose Hudi over Apache Iceberg? What are the trade-offs?
Answer:
Choose Hudi over Iceberg when:
- You have high-frequency record updates from CDC or event streams. Hudi's MOR storage type is purpose-built for this. Iceberg v2 also supports merge-on-read through delete files, but Hudi's indexing and built-in compaction services were designed around high-rate record-level upserts.
- You need sub-minute write-to-read latency with streaming writes. Flink + Hudi MOR is a mature, production-proven stack for this. Iceberg's streaming support is improving, but Hudi has more operational history here.
- You're building a CDC pipeline and need incremental query semantics. Hudi's incremental query type is a first-class API feature. With Iceberg, you'd build incremental processing on top of snapshot metadata, which is possible but more manual.
Choose Iceberg over Hudi when:
- You need multiple query engines to read the same table natively. Iceberg has the broadest engine support (Spark, Flink, Trino, Dremio, BigQuery, Snowflake, Athena). Hudi's engine support is narrower, especially for non-Spark engines.
- Schema evolution correctness is critical. Iceberg's schema evolution spec handles type promotions, struct field renames, and nested type changes more robustly than Hudi.
- You're in a multi-cloud or vendor-neutral architecture. Iceberg is the most vendor-neutral format. Hudi has stronger AWS ties (Amazon EMR ships Hudi as a first-class citizen).
The key trade-off in one sentence: Hudi optimizes for the writer; Iceberg optimizes for the reader ecosystem.
Q5: What is Hudi compaction and why is it essential for MOR tables?
Answer:
Compaction is the background process that merges delta log files (Avro-encoded change records) into base Parquet files in a Merge-On-Read table. Without compaction, every read must merge the base file with all accumulated delta logs - and as more deltas accumulate, read latency grows linearly.
Compaction proceeds in two steps:
- Planning: Hudi scans the table, identifies file groups with many outstanding delta logs, and writes a compaction plan to the timeline as a requested instant. The plan specifies which base files and delta logs to merge.
- Execution: A Spark job reads each base file plus its delta logs, merges them into a new clean base Parquet file, and marks the compaction instant as completed. Old delta logs are retained until cleaning runs.
Operational strategies:
- Inline compaction (simple, higher write latency): Set hoodie.compact.inline=true and hoodie.compact.inline.max.delta.commits=5. Compaction runs as part of the write job.
- Async compaction (production recommendation): Run compaction as a separate Spark job on a schedule. This decouples write latency from compaction cost. Use Airflow to schedule compaction every 30–60 minutes.
The key metric to monitor: hoodie_delta_log_file_size_sum - the total accumulated delta log size across the table. Alert if it exceeds a threshold (e.g., 50 GB or 10x the average file size). A large delta accumulation means compaction has fallen behind and reads are degrading.
Q6: How do you handle schema evolution in a Hudi table when a new field is added?
Answer:
Hudi supports schema evolution through configuration options such as `hoodie.schema.on.read.enable` (the `hoodie.datasource.write.schema.allow.auto.evolution.column.drop` flag covers the separate case of dropping columns, discussed below). Adding a nullable column is the safest schema change:
```python
# New schema adds a nullable 'surge_multiplier' column.
# hudi_options_cow, new_df, and table_path are assumed to be defined earlier.
new_schema_opts = {
    "hoodie.datasource.write.schema.evolution.enable": "true",
    "hoodie.schema.on.read.enable": "true",
}

# Write with the new schema; Hudi reconciles old and new records at read time
new_df.write.format("hudi") \
    .options(**{**hudi_options_cow, **new_schema_opts}) \
    .mode("append") \
    .save(table_path)
```
When a reader queries the table after a schema evolution, Hudi uses the latest schema to read all files. Old files missing the new column will return null for that column. This is the Schema-on-Read model for evolution.
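The null-filling behavior can be modeled in a few lines of plain Python. This is a toy illustration of schema-on-read projection, not Hudi code: records written before the evolution lack `surge_multiplier`, and a reader that projects every record onto the latest schema returns `None` for the missing field. The helper `read_with_schema` is hypothetical.

```python
from typing import Dict, List, Optional


def read_with_schema(records: List[Dict[str, object]],
                     latest_schema: List[str]) -> List[Dict[str, Optional[object]]]:
    """Project each record onto the latest schema; absent columns read as None."""
    return [{col: rec.get(col) for col in latest_schema} for rec in records]


old_file = [{"trip_id": "t1", "fare": 10.0}]                           # written before evolution
new_file = [{"trip_id": "t2", "fare": 8.0, "surge_multiplier": 1.5}]  # written after
latest = ["trip_id", "fare", "surge_multiplier"]

rows = read_with_schema(old_file + new_file, latest)
```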
Dropping columns is more dangerous and requires explicit configuration (`hoodie.schema.on.read.enable=true` together with `hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true`). Renaming columns is not handled by write-time evolution: with schema-on-read enabled, recent Hudi releases support Spark SQL's `ALTER TABLE ... RENAME COLUMN`; otherwise, you must add the new column, backfill it from the old column, and then drop the old column in separate steps. For complex schema evolution needs, Iceberg offers a more robust spec.
Hudi Indexing: How Upserts Find Existing Records
The upsert operation in Hudi must answer a question before writing: "Does this incoming record already exist in the table, and if so, which file group contains it?" This lookup is what the Hudi index solves. Without an index, every upsert would require a full table scan to determine whether a record exists - catastrophically slow at scale.
Hudi supports several index types, each with different trade-offs:
Bloom Filter Index (Default)
Each Parquet file maintains a bloom filter of the record keys it contains. On upsert, Hudi checks the bloom filters to narrow down which files may contain the incoming keys, then verifies with a precise key lookup.
```python
bloom_index_opts = {
    "hoodie.index.type": "BLOOM",
    "hoodie.bloom.index.parallelism": "200",
    "hoodie.bloom.index.filter.dynamic.max.entries": "100000",
    # Tune false positive rate - lower FPR = larger bloom filter
    "hoodie.index.bloom.fpp": "0.000000001",
}
```
Best for: medium-sized tables where the record key distribution is uniform. Bloom filters scale with the number of files, not with the number of records per file. As tables grow into millions of files, bloom filter lookup itself becomes expensive.
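The pruning idea behind the bloom index can be sketched in plain Python. This illustrates the technique, not Hudi's on-disk filter format: each filter is sized with the standard formulas m = -n ln p / (ln 2)^2 bits and k = (m/n) ln 2 hash functions, and the k bit positions come from double hashing over a SHA-256 digest (a choice made here for simplicity). For n = 100,000 keys at p = 1e-9 these formulas give about 4.3 million bits (roughly 540 KB) and 30 hash functions per filter. A file whose filter answers "no" is skipped; a "maybe" still needs the exact key lookup described above. All names are hypothetical.

```python
import hashlib
import math
from typing import Dict, Iterable, List


class BloomFilter:
    def __init__(self, expected_entries: int, fpp: float):
        # Optimal bit count and hash count for n entries at false-positive rate p.
        self.m = max(1, math.ceil(-expected_entries * math.log(fpp) / (math.log(2) ** 2)))
        self.k = max(1, round(self.m / expected_entries * math.log(2)))
        self.bits = bytearray((self.m + 7) // 8)

    def _positions(self, key: str) -> Iterable[int]:
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1  # odd step for double hashing
        return ((h1 + i * h2) % self.m for i in range(self.k))

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key: str) -> bool:
        # False means "definitely absent"; True means "possibly present".
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key))


def candidate_files(key: str, filters: Dict[str, BloomFilter]) -> List[str]:
    """Files that may hold `key`; only these need an exact key lookup."""
    return [name for name, bf in filters.items() if bf.might_contain(key)]


# One filter per (hypothetical) Parquet file, built from the keys it stores.
filters: Dict[str, BloomFilter] = {}
for name, keys in {"f1.parquet": ["t1", "t2"], "f2.parquet": ["t3"]}.items():
    bf = BloomFilter(expected_entries=100_000, fpp=1e-9)
    for key in keys:
        bf.add(key)
    filters[name] = bf
```

Note that the cost of `candidate_files` grows with the number of filters checked, which matches the scaling concern above: more files means more filters to probe per incoming key.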
HBase Index
For tables with billions of records, Hudi can use an external HBase cluster as an index store. HBase serves point lookups by record key at a cost that stays roughly constant as the table grows.
```python
hbase_index_opts = {
    "hoodie.index.type": "HBASE",
    "hoodie.index.hbase.zkquorum": "zookeeper-host",
    "hoodie.index.hbase.zkport": "2181",
    "hoodie.index.hbase.table": "hudi_index_trips",
    "hoodie.hbase.index.update.partition.path": "true",
}
```
Best for: tables with billions of records where bloom filter lookup is too slow, and you already operate an HBase cluster.
Simple Index
No bloom filter - directly looks up record keys in the relevant Parquet files. Simpler but slower than bloom index for large tables. Useful for small tables or during initial development.
```python
simple_index_opts = {
    "hoodie.index.type": "SIMPLE",
    "hoodie.simple.index.parallelism": "50",
}
```
Bucket Index
The bucket index assigns each record to a fixed bucket based on a hash of the record key. Updates for a key always go to the same bucket, eliminating the index lookup entirely - Hudi knows which file group to write to deterministically.
```python
bucket_index_opts = {
    "hoodie.index.type": "BUCKET",
    "hoodie.bucket.index.num.buckets": "256",     # tune based on data volume
    "hoodie.bucket.index.hash.field": "trip_id",  # key to hash
}
```
Best for: very high-frequency upsert workloads (Flink streaming) where index lookup latency is a bottleneck. The trade-off: bucket count is fixed at table creation; resizing requires a full table rewrite.
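The deterministic routing, and the reason resizing forces a rewrite, can be sketched with any stable hash. This sketch uses `zlib.crc32` purely for illustration; Hudi's own bucket hashing differs, so these bucket numbers will not match a real table. The helper `bucket_for` is hypothetical.

```python
import zlib


def bucket_for(record_key: str, num_buckets: int) -> int:
    """Map a record key to a bucket id in [0, num_buckets). Illustrative hash only."""
    return zlib.crc32(record_key.encode("utf-8")) % num_buckets


keys = [f"trip-{i}" for i in range(10_000)]

# Same key and same bucket count always yield the same bucket,
# so an upsert knows its target file group without any index lookup.
routing = {k: bucket_for(k, 256) for k in keys}

# Changing the bucket count (256 -> 300) sends most keys to a different bucket,
# which is why existing data must be rewritten after resizing.
moved = sum(bucket_for(k, 256) != bucket_for(k, 300) for k in keys)
moved_fraction = moved / len(keys)
```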
Index Selection Guide
| Table Size | Update Frequency | Recommended Index |
|---|---|---|
| Less than 100 GB | Any | Bloom Filter (default) |
| 100 GB – 10 TB | Low to medium | Bloom Filter with tuned FPP |
| 10 TB+ | Medium | HBase Index |
| Any size | Very high (streaming) | Bucket Index |
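The guide can be encoded as a small helper, for instance to choose default writer options in a pipeline template. A sketch only: the function name is hypothetical, 10 TB is treated as 10,000 GB, "very high" update frequency overrides table size (as in the last row), and combinations the table does not list (such as 10 TB+ with low update frequency) fall through to the size rule, which is an assumption. The return values are the `hoodie.index.type` strings used in the configs above.

```python
VALID_FREQUENCIES = {"low", "medium", "very_high"}


def recommend_index(table_size_gb: float, update_frequency: str) -> str:
    """Return a hoodie.index.type value following the selection guide (sketch)."""
    if update_frequency not in VALID_FREQUENCIES:
        raise ValueError(f"unknown update frequency: {update_frequency!r}")
    if update_frequency == "very_high":  # streaming upserts, any table size
        return "BUCKET"
    if table_size_gb < 10_000:           # under 10 TB: bloom (tune FPP above 100 GB)
        return "BLOOM"
    return "HBASE"                       # 10 TB+: external HBase index
```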
Hudi Savepoints and Disaster Recovery
Hudi provides a savepoint mechanism - a way to mark a specific commit as protected from cleaning. Savepoints prevent the cleaner from removing old file versions, enabling point-in-time recovery to a known-good state.
```python
# Savepoints are managed with Hudi's Spark SQL procedures.
# Assumes the table is registered in the catalog as trips_cow
# (data stored at s3://my-lakehouse/hudi/trips_cow).

# Savepoint a specific completed commit on the timeline
spark.sql("""
    CALL create_savepoint(
        table => 'trips_cow',
        commit_time => '20240115120000000'
    )
""")

# List all savepoints
spark.sql("CALL show_savepoints(table => 'trips_cow')").show()

# Roll the table back to a savepoint (disaster recovery after a bad batch)
spark.sql("""
    CALL rollback_to_savepoint(
        table => 'trips_cow',
        instant_time => '20240115120000000'
    )
""")

# Delete a savepoint when it's no longer needed
spark.sql("""
    CALL delete_savepoint(
        table => 'trips_cow',
        instant_time => '20240115120000000'
    )
""")
```
The production pattern: create a savepoint before every major batch job (daily ETL, weekly backfill). If the job produces corrupt data, roll back to the savepoint instead of manually reprocessing. Once the downstream data has been verified, delete older savepoints, keeping two or three at any given time (one before each of the most recent major jobs) so you always have a safe recovery point.
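The rotation can be scripted around the procedures above. A minimal sketch with hypothetical helper names: `instant_time` formats a timestamp as a 17-digit Hudi-style instant (`yyyyMMddHHmmssSSS`, the format used in the examples above), and `savepoints_to_delete` returns every savepoint except the newest `keep`. Because the instants are fixed-width digit strings, sorting them as strings sorts them chronologically. The returned times would then be passed to `delete_savepoint`.

```python
from datetime import datetime
from typing import List


def instant_time(ts: datetime) -> str:
    """Format a timestamp as a 17-digit Hudi-style instant (yyyyMMddHHmmssSSS)."""
    return ts.strftime("%Y%m%d%H%M%S") + f"{ts.microsecond // 1000:03d}"


def savepoints_to_delete(existing: List[str], keep: int = 3) -> List[str]:
    """Return all but the newest `keep` savepoint instants, oldest first."""
    ordered = sorted(existing)
    return ordered[:-keep] if keep > 0 else ordered
```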
Cleaning: Managing Storage Costs in Hudi Tables
The Hudi cleaner is the background process that removes old file versions no longer needed for time travel or active queries. Without cleaning, Hudi tables grow without bound, since every write creates new file versions.
Cleaning Policies
KEEP_LATEST_COMMITS: Retains the last N commits' worth of file versions. Any file versions older than the Nth commit back are cleaned.
```python
cleaning_opts = {
    "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
    "hoodie.cleaner.commits.retained": "10",  # keep 10 commits of history
    "hoodie.clean.automatic": "true",         # run cleaner after every commit
    "hoodie.clean.async": "true",             # run cleaner asynchronously
}
```
KEEP_LATEST_FILE_VERSIONS: Retains the last N versions of each file group, regardless of how many commits have passed. This gives a simpler mental model for MOR tables.
```python
file_version_opts = {
    "hoodie.cleaner.policy": "KEEP_LATEST_FILE_VERSIONS",
    "hoodie.cleaner.fileversions.retained": "3",
}
```
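The two policies diverge when some file groups are rewritten by every commit and others only occasionally. The toy model below (plain Python, not Hudi's cleaner) represents each file group as the list of commit times that produced a version of it and returns what each policy would retain. It simplifies the commit-based rule to "keep every version written within the last N commits, plus the newest older version if a reader at the start of that window still needs it"; the real cleaner also accounts for savepoints, pending compactions, and other details. Function names are hypothetical.

```python
from typing import Dict, List


def keep_latest_commits(versions: Dict[str, List[int]], all_commits: List[int],
                        retained: int) -> Dict[str, List[int]]:
    """Keep versions readable as of any of the last `retained` commits."""
    commits = sorted(all_commits)
    window_start = commits[-min(retained, len(commits))]
    kept = {}
    for fg, times in versions.items():
        times = sorted(times)
        in_window = [t for t in times if t >= window_start]
        older = [t for t in times if t < window_start]
        # A reader at window_start sees the newest older version unless
        # a version was written exactly at window_start.
        needs_older = bool(older) and not (in_window and in_window[0] == window_start)
        kept[fg] = ([older[-1]] if needs_older else []) + in_window
    return kept


def keep_latest_file_versions(versions: Dict[str, List[int]],
                              retained: int) -> Dict[str, List[int]]:
    """Keep the newest `retained` versions of each file group."""
    return {fg: sorted(times)[-retained:] for fg, times in versions.items()}


# "hot" is rewritten by all 10 commits; "cold" only by commits 1 and 2.
versions = {"hot": list(range(1, 11)), "cold": [1, 2]}
by_commits = keep_latest_commits(versions, all_commits=list(range(1, 11)), retained=3)
by_versions = keep_latest_file_versions(versions, retained=3)
```

With three commits retained, the cold file group keeps only the version still visible at commit 8, while the file-version policy keeps both of its versions no matter how old they are.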
Balancing Cleaning with Time Travel
The number of commits retained directly determines how far back incremental queries can look. If you retain 10 commits and write every 30 minutes, incremental consumers can only look back 5 hours. Set hoodie.cleaner.commits.retained based on your longest acceptable incremental query lookback window:
```python
# If downstream consumers need to catch up after a 24-hour outage,
# and you write every 15 minutes (96 commits per day):
cleaning_opts = {
    "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
    "hoodie.cleaner.commits.retained": "100",  # ~25 hours of history
}
```
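The sizing rule is simple arithmetic: commits to retain = lookback window / write interval, rounded up, plus any headroom you want. A sketch with hypothetical helper names; the headroom is left to the caller (the example above effectively adds 4 commits on top of the 96 needed).

```python
import math


def commits_to_retain(lookback_hours: float, write_interval_minutes: float,
                      headroom: int = 0) -> int:
    """Commits needed so incremental readers can look back `lookback_hours`."""
    return math.ceil(lookback_hours * 60 / write_interval_minutes) + headroom


def lookback_hours(commits_retained: int, write_interval_minutes: float) -> float:
    """How far back incremental queries can reach for a given retention."""
    return commits_retained * write_interval_minutes / 60
```

For example, `commits_to_retain(24, 15)` gives the 96 commits from the comment above, and `lookback_hours(10, 30)` reproduces the 5-hour window from the earlier paragraph.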
Savepoints (described above) override the cleaner: a savepointed commit is never cleaned, regardless of the cleaning policy. Use savepoints to protect specific historical states that must survive beyond the regular retention window.
