Lakehouse Query Engines

The Two-Cluster Tax

The BI team had a problem that looked like a success story. They had two world-class query engines: Presto for ad-hoc analytics, and Spark for ETL and large transformations. Both were running well. Both teams were happy with their tools. But the infrastructure team was quietly paying a tax that no one had fully accounted for.

Two EMR clusters - one always-on for Presto (BI needed sub-5-second queries at any time), one auto-scaling for Spark (ETL jobs burst to 50 nodes and scale down). Two Hive Metastore instances, one per cluster, which had to stay in sync. When a data engineer added a column to an Iceberg table via Spark, they had to manually run MSCK REPAIR TABLE on the Presto metastore before the BI team could see the change - and this step was forgotten often enough that it became a recurring support ticket. Two cost centers on the AWS bill. Two oncall rotations. Two sets of upgrades to coordinate.

The migration to Trino as a unified query layer took eight weeks. One cluster. One catalog (the Hive Metastore, or Glue, depending on the table). Trino's connector architecture meant it could query Iceberg tables, the legacy Delta Lake tables that hadn't been migrated yet, PostgreSQL operational databases, and a Kafka topic - all in the same SQL statement. The Presto cluster was decommissioned. The Spark cluster remained for heavy ETL but now shared the same metadata, so schema changes were instantly visible everywhere. Infrastructure cost dropped 40%. The catalog sync ticket closed permanently.

This story illustrates a fundamental truth about the lakehouse architecture: open table formats decouple storage from compute, but only if your query engines can read the metadata correctly. A perfectly structured Iceberg table is useless to a query engine that doesn't understand Iceberg's snapshot model. A Delta table's transaction log is meaningless to an engine that reads raw Parquet. The query engine layer is where the lakehouse architecture either delivers on its promise or falls apart.

Understanding which engine to use, when, and why is one of the most consequential decisions a data platform engineer makes.


Why Query Engines Matter in the Lakehouse

In a traditional data warehouse (Snowflake, Redshift, BigQuery), the query engine is inseparable from the storage format. The engine understands the internal format perfectly because it controls it. This tight coupling enables extreme optimization: the engine can push predicates deep into storage, leverage internal statistics, and compress data in engine-specific ways.

The lakehouse breaks this coupling. Storage is open - Parquet, ORC, or Avro files in S3. The table format layer (Iceberg, Delta, Hudi) adds metadata for transactions, schema, partitioning, and statistics. But the query engine must understand this metadata layer to exploit it effectively.

A query engine that doesn't understand Iceberg's manifest files will:

  • Scan every Parquet file in the table, ignoring partition pruning
  • Miss column-level min/max statistics, forcing full column scans
  • Never benefit from Z-ordering or clustering optimizations
  • Fail on tables with schema evolution (it'll see different schemas in different files and error out)

A query engine with native Iceberg support will:

  • Read the snapshot manifest to identify exactly which Parquet files contain relevant data
  • Apply predicate pushdown using Iceberg's column statistics (skipping entire files)
  • Handle schema evolution transparently using the table's schema evolution history
  • Use Z-order file groupings to skip large swaths of data

The difference between a "Parquet-aware" engine and a "table-format-native" engine can mean the difference between a 30-second query and a 3-second query on the same physical data.
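To make that contrast concrete, here is a minimal, engine-agnostic sketch of what planning from table metadata buys you. The `DataFile` record is a hypothetical stand-in for an Iceberg manifest entry (real manifests carry far more metadata), and the file names and statistics are made up for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataFile:
    """Hypothetical stand-in for one manifest entry: a path plus per-file stats."""
    path: str
    trip_date: str   # partition value shared by every row in the file
    fare_min: float  # min(fare) across the file
    fare_max: float  # max(fare) across the file


MANIFEST = [
    DataFile("f1.parquet", "2024-01-14", 3.5, 42.0),
    DataFile("f2.parquet", "2024-01-15", 4.0, 95.0),
    DataFile("f3.parquet", "2024-01-15", 80.0, 310.0),
    DataFile("f4.parquet", "2024-01-16", 120.0, 450.0),
]


def plan_naive(manifest):
    """A Parquet-only engine has no table metadata, so it must open every file."""
    return [f.path for f in manifest]


def plan_with_metadata(manifest, trip_date, min_fare):
    """Plan `WHERE trip_date = ? AND fare > ?` using metadata alone."""
    selected = []
    for f in manifest:
        if f.trip_date != trip_date:
            continue  # partition pruning: wrong partition, never opened
        if f.fare_max <= min_fare:
            continue  # data skipping: no row in this file can have fare > min_fare
        selected.append(f.path)
    return selected


naive = plan_naive(MANIFEST)
pruned = plan_with_metadata(MANIFEST, trip_date="2024-01-15", min_fare=100.0)
print(f"naive scan: {len(naive)} files, metadata-aware scan: {len(pruned)} files")
```

Only the planning step differs between the two functions. The data files are identical, which is why the same physical table can be fast in one engine and slow in another.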


Apache Spark: The Workhorse

Spark is not primarily a query engine - it's a general-purpose distributed compute framework. But its SQL interface (Spark SQL) and DataFrame API make it the most widely used engine for querying lakehouse data at scale. More importantly, Spark's native connectors for all three major table formats (Delta Lake, Iceberg, Hudi) are the most mature and feature-complete available.

What Spark Does Well

Large-scale ETL and transformations: When you need to join a 10 TB table against a 500 GB dimension table, filter on complex predicates, apply 20 business logic transformations, and write the result back as Iceberg - Spark is the right tool. Its Catalyst optimizer rewrites logical plans into physical plans, performs predicate pushdown, applies column pruning, and uses Adaptive Query Execution (AQE) to dynamically adjust partition counts during execution.

Adaptive Query Execution (AQE): A critical production feature introduced in Spark 3.0. AQE allows Spark to re-plan query execution at runtime based on actual data statistics collected during shuffle operations. This solves the data skew problem - if one partition has 100x more data than others (a common issue in real-world data), AQE automatically splits it into smaller partitions, preventing the single-task bottleneck that once required manual tuning.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("IcebergQuery")
    .config("spark.sql.adaptive.enabled", "true")  # AQE
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")  # merge small partitions
    .config("spark.sql.adaptive.skewJoin.enabled", "true")  # fix data skew
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.lakehouse.type", "glue")  # AWS Glue catalog
    .getOrCreate()
)

# Query Iceberg with full predicate pushdown and column pruning
result = spark.sql("""
SELECT
driver_id,
COUNT(*) AS total_trips,
AVG(fare) AS avg_fare,
SUM(fare) AS total_revenue
FROM lakehouse.rides.trips
WHERE trip_date BETWEEN '2024-01-01' AND '2024-03-31'
AND status = 'completed'
GROUP BY driver_id
ORDER BY total_revenue DESC
LIMIT 100
""")

result.explain("extended") # See the physical plan to verify predicate pushdown
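
The skew handling that AQE performs can be sketched without a cluster. The function below is a simplified model, not Spark's implementation: it treats a shuffle partition as skewed when it exceeds both a multiple of the median size and an absolute threshold, then cuts it into evenly sized chunks. The default values are meant to mirror Spark's `skewedPartitionFactor`, `skewedPartitionThresholdInBytes`, and `advisoryPartitionSizeInBytes` settings as I understand them; treat them as assumptions and check the documentation for your Spark version. Real AQE splits along map-output boundaries, so its chunks are not perfectly even.

```python
import statistics

MB = 1024 * 1024


def split_skewed_partitions(sizes_bytes, factor=5.0,
                            threshold_bytes=256 * MB, target_bytes=64 * MB):
    """Return [(partition_index, [chunk_sizes])] for a list of partition sizes.

    A partition is considered skewed when it is larger than both
    factor * median and threshold_bytes. Skewed partitions are cut into
    chunks of at most target_bytes; all others are left as a single task.
    """
    median = statistics.median(sizes_bytes)
    plan = []
    for idx, size in enumerate(sizes_bytes):
        skewed = size > factor * median and size > threshold_bytes
        if not skewed:
            plan.append((idx, [size]))
            continue
        n_chunks = -(-size // target_bytes)  # ceiling division
        base, extra = divmod(size, n_chunks)
        chunks = [base + (1 if i < extra else 0) for i in range(n_chunks)]
        plan.append((idx, chunks))
    return plan


# Four ordinary partitions and one that is roughly 100x larger
sizes = [40 * MB, 55 * MB, 48 * MB, 6400 * MB, 52 * MB]
plan = split_skewed_partitions(sizes)
for idx, chunks in plan:
    print(f"partition {idx}: {len(chunks)} task(s), largest {max(chunks) // MB} MB")
```

Without the split, one task would process 6,400 MB while the others finish in a fraction of the time. With it, the stage runs as many similarly sized tasks.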

Feature engineering for ML: Spark's MLlib and integration with Python data science libraries make it the natural engine for computing training features at scale. Reading versioned Iceberg snapshots ensures reproducible feature sets.

When NOT to use Spark: For interactive, low-latency analytical queries on moderate-sized data (under 1 TB). Spark has significant startup overhead (JVM, executor allocation, cluster coordination), making it poorly suited for the "type a SQL query, get an answer in 2 seconds" experience that BI users expect. For that, use Trino or DuckDB.


Trino: The Federation Layer

Trino (formerly PrestoSQL) is a distributed SQL query engine designed for interactive analytics across heterogeneous data sources. Its defining feature is the connector architecture: Trino can query Iceberg tables, Delta tables, Hudi tables, PostgreSQL, MySQL, Kafka, Elasticsearch, and dozens of other sources - and JOIN across them in a single SQL statement.

The Connector Architecture

Each Trino connector translates between Trino's internal query model and a specific data source. For Iceberg, the connector reads the Iceberg REST catalog or Hive Metastore to discover table locations and metadata, then reads Parquet files with full predicate pushdown using Iceberg's statistics. For PostgreSQL, the connector pushes predicates down to the Postgres query planner.

This architecture enables query federation - a concept that sounds abstract until you see it in action:

-- A single Trino query spanning Iceberg, Delta, and PostgreSQL
-- No data movement between clusters required

SELECT
t.trip_id,
t.fare,
t.status,
d.driver_name,
d.license_plate,
u.user_email,
u.account_tier
FROM iceberg.rides.trips t -- Iceberg table in S3
JOIN delta.drivers.profiles d -- Delta Lake table in ADLS
ON t.driver_id = d.driver_id
JOIN postgresql.production.users u -- Live PostgreSQL database
ON t.user_id = u.user_id
WHERE t.trip_date = CURRENT_DATE
AND t.status = 'disputed'
ORDER BY t.fare DESC
LIMIT 500;

This query would have required three separate queries plus Python glue code in a pre-Trino world. In Trino, it's one statement with a single result set.

Trino Configuration for Iceberg

# /etc/trino/catalog/iceberg.properties
connector.name=iceberg
iceberg.catalog.type=glue
hive.metastore.glue.region=us-east-1
iceberg.file-format=PARQUET
iceberg.compression-codec=ZSTD

# Statistics: use Iceberg column statistics for predicate pushdown
iceberg.statistics-enabled=true
iceberg.collect-column-statistics-on-write=true

# Query parallelism
hive.max-partitions-per-scan=1000000

-- Trino session properties for performance tuning
SET SESSION task_concurrency = 8;
SET SESSION query_max_memory_per_node = '8GB';

-- Query Iceberg with partition pruning
-- Trino reads the Iceberg manifest to identify relevant files,
-- never touching partitions outside the date range
EXPLAIN ANALYZE
SELECT
DATE_TRUNC('hour', event_time) AS hour,
COUNT(*) AS event_count
FROM iceberg.analytics.events
WHERE event_date BETWEEN DATE '2024-01-15' AND DATE '2024-01-16'
AND event_type = 'purchase'
GROUP BY 1
ORDER BY 1;

Trino Strengths and Limitations

Strengths:

  • Interactive query latency (seconds, not minutes)
  • Cross-source federation without data movement
  • No JVM startup overhead per query (workers are always running)
  • Broad table format support (Iceberg, Delta, Hudi all supported)
  • Lower cost than Spark for ad-hoc queries (no cluster spin-up)

Limitations:

  • Not suitable for very long-running queries (hours) - use Spark
  • No native machine learning or complex UDF framework
  • Stateful streaming queries require Flink, not Trino
  • Memory pressure: Trino is memory-intensive; large hash joins can OOM workers

DuckDB: The In-Process Analytical Database

DuckDB is the most surprising entry in the lakehouse query engine ecosystem. It's an in-process analytical database - a library you embed in Python, R, or a CLI, with no cluster, no JVM, no network calls. It runs entirely within a single process. For datasets that fit in memory (or somewhat larger via its spill-to-disk engine), DuckDB can outperform Spark on a 32-core machine at a fraction of the operational complexity.

More importantly for the lakehouse: DuckDB can read Parquet files and Iceberg tables directly from S3, with predicate pushdown and parallel file reading, without any cluster setup.

DuckDB Reading Iceberg from S3

import duckdb

con = duckdb.connect()

# Configure S3 access
con.execute("""
INSTALL httpfs;
LOAD httpfs;
SET s3_region = 'us-east-1';
SET s3_access_key_id = 'YOUR_KEY';
SET s3_secret_access_key = 'YOUR_SECRET';
""")

# Install and load the Iceberg extension
con.execute("""
INSTALL iceberg;
LOAD iceberg;
""")

# Query an Iceberg table directly - DuckDB reads the snapshot manifest
# and applies predicate pushdown to skip irrelevant Parquet files
result = con.execute("""
SELECT
driver_id,
COUNT(*) AS trips,
ROUND(AVG(fare), 2) AS avg_fare,
ROUND(SUM(fare), 2) AS total_revenue
FROM iceberg_scan(
's3://my-lakehouse/iceberg/trips',
allow_moved_paths = true
)
WHERE trip_date >= '2024-01-01'
AND trip_date < '2024-02-01'
AND status = 'completed'
GROUP BY driver_id
ORDER BY total_revenue DESC
LIMIT 20
""").df()

print(result)

DuckDB for Local Development and CI

One of DuckDB's killer use cases is running the same analytical queries locally that would otherwise require a Spark cluster. A data engineer can develop and test an ETL transformation against a 10 GB sample Iceberg table on their laptop, then deploy the identical query logic (via PySpark or Trino) against the 100 TB production table.

import duckdb
import pandas as pd
from pathlib import Path

def analyze_trip_revenue(
    iceberg_path: str,
    start_date: str,
    end_date: str,
    engine: str = "duckdb",  # "duckdb" | "spark" (a Trino branch is not implemented here)
) -> pd.DataFrame:
    """
    Run the same analysis across different engines.
    DuckDB for local dev, Spark/Trino for production.
    """
    sql = f"""
        SELECT
            DATE_TRUNC('week', CAST(trip_date AS DATE)) AS week,
            COUNT(*) AS trip_count,
            SUM(fare) AS revenue,
            AVG(fare) AS avg_fare
        FROM trips
        WHERE trip_date BETWEEN '{start_date}' AND '{end_date}'
          AND status = 'completed'
        GROUP BY 1
        ORDER BY 1
    """

    if engine == "duckdb":
        con = duckdb.connect()
        # Globbing the Parquet files bypasses Iceberg metadata (snapshots, deletes),
        # so this branch is only appropriate for a static local sample.
        parquet_glob = f"{iceberg_path.rstrip('/')}/**/*.parquet"
        con.execute(f"""
            CREATE VIEW trips AS
            SELECT * FROM parquet_scan('{parquet_glob}')
        """)
        return con.execute(sql).df()

    elif engine == "spark":
        from pyspark.sql import SparkSession
        spark = SparkSession.builder.getOrCreate()
        trips = spark.read.format("iceberg").load(iceberg_path)
        trips.createOrReplaceTempView("trips")
        return spark.sql(sql).toPandas()

    raise ValueError(f"Unknown engine: {engine}")

# Local dev - runs on laptop against a 10 GB sample
local_result = analyze_trip_revenue(
    iceberg_path="data/sample_trips/",
    start_date="2024-01-01",
    end_date="2024-03-31",
    engine="duckdb",
)

Apache Flink: The Streaming-First Engine

Apache Flink is architecturally different from Spark, Trino, and DuckDB. Flink is streaming-first - its core abstraction is an unbounded data stream, not a bounded batch dataset. This makes Flink the natural engine for pipelines where data arrives continuously and must be processed with low latency.

But Flink also has a powerful batch mode and a SQL API that works uniformly across streaming and batch. For lakehouse workloads, Flink shines in two scenarios:

  1. Writing to Iceberg/Hudi in real-time: Flink consumes from Kafka and writes to an Iceberg table with exactly-once semantics, using Flink checkpoints to trigger Iceberg commits.
  2. Continuous queries over lakehouse tables: Flink reads Iceberg tables in streaming mode, processing new snapshots as they arrive.

-- Flink SQL session

-- Create a Kafka source (unbounded stream)
CREATE TABLE kafka_events (
event_id BIGINT,
user_id BIGINT,
event_type STRING,
amount DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'raw-events',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
);

-- Create an Iceberg sink (lakehouse table)
CREATE TABLE iceberg_events (
event_id BIGINT,
user_id BIGINT,
event_type STRING,
amount DOUBLE,
event_time TIMESTAMP(3),
event_date STRING -- partition column
) PARTITIONED BY (event_date)
WITH (
'connector' = 'iceberg',
'catalog-name' = 'glue_catalog',
'catalog-type' = 'glue',
'warehouse' = 's3://my-lakehouse/iceberg',
'write.distribution-mode' = 'hash',
'write.upsert.enabled' = 'false'
);

-- Stream events from Kafka to Iceberg - exactly-once via Flink checkpoints
INSERT INTO iceberg_events
SELECT
event_id,
user_id,
event_type,
amount,
event_time,
DATE_FORMAT(event_time, 'yyyy-MM-dd') AS event_date
FROM kafka_events;

The key property: Flink commits to Iceberg when a checkpoint completes (checkpoint intervals of 1–5 minutes are typical). Between checkpoints, the writers flush data files to object storage, but no snapshot references those files yet, so readers cannot see them. On checkpoint completion, Flink atomically appends a new Iceberg snapshot that includes the pending files - so readers always see complete, consistent snapshots, never partial writes.
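The commit protocol can be modeled in a few lines. This is a toy sketch under stated assumptions, not Flink's or Iceberg's code: `SnapshotTable` and `CheckpointedWriter` are hypothetical classes, and a Python list append stands in for the atomic metadata swap that a real catalog performs.

```python
class SnapshotTable:
    """Toy table: a list of immutable snapshots, each a tuple of data file names."""

    def __init__(self):
        self.snapshots = [()]  # snapshot 0 is the empty table

    def current_files(self):
        """What a reader sees: only files referenced by the latest snapshot."""
        return self.snapshots[-1]

    def commit_append(self, new_files):
        # One list append stands in for the atomic metadata pointer swap.
        self.snapshots.append(self.current_files() + tuple(new_files))


class CheckpointedWriter:
    """Collects files between checkpoints and publishes them only on commit."""

    def __init__(self, table):
        self.table = table
        self.pending = []

    def write(self, file_name):
        # The file is written, but no snapshot references it yet.
        self.pending.append(file_name)

    def on_checkpoint_complete(self):
        if self.pending:
            self.table.commit_append(self.pending)
            self.pending = []


table = SnapshotTable()
writer = CheckpointedWriter(table)

writer.write("a.parquet")
writer.write("b.parquet")
visible_before = table.current_files()  # reader still sees the old snapshot

writer.on_checkpoint_complete()
visible_after = table.current_files()   # both files appear together

print("before checkpoint:", visible_before)
print("after checkpoint: ", visible_after)
```

A reader never observes `a.parquet` without `b.parquet`. If the job fails before the checkpoint, the pending files are never referenced, and replay from the last checkpoint produces them again.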


Query Optimization Techniques Across All Engines

Regardless of which engine you use, these optimization principles apply universally when querying lakehouse data:

1. Partition Pruning

Never scan what you don't need. Partition pruning eliminates entire directories from the scan based on the query's WHERE clause.

-- GOOD: Partition pruning eliminates 364/365 days of data
SELECT COUNT(*) FROM trips WHERE trip_date = '2024-01-15';

-- BAD: No partition pruning - scans the entire table
SELECT COUNT(*) FROM trips WHERE YEAR(trip_date) = 2024 AND MONTH(trip_date) = 1;
-- The function application prevents the engine from pruning partitions

-- FIX: Rewrite to explicit range
SELECT COUNT(*) FROM trips
WHERE trip_date BETWEEN '2024-01-01' AND '2024-01-31';
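
When queries are generated by application code, the rewrite from "function on the partition column" to "explicit range" can be automated. The helper below is a small sketch; `month_range` and `month_filter_sql` are hypothetical names, and the half-open range (`>=` start, `<` next month) is used so the filter stays correct for timestamp columns as well as dates.

```python
from datetime import date


def month_range(year, month):
    """Return (start, end_exclusive) ISO dates covering one calendar month."""
    start = date(year, month, 1)
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return start.isoformat(), end.isoformat()


def month_filter_sql(column, year, month):
    """Build a prunable range predicate instead of YEAR(col) = y AND MONTH(col) = m."""
    start, end = month_range(year, month)
    return f"{column} >= DATE '{start}' AND {column} < DATE '{end}'"


print(month_filter_sql("trip_date", 2024, 1))
```

The column name is interpolated directly into SQL here, so it should come from trusted code rather than user input.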

2. Column Statistics and Data Skipping

Iceberg and Delta maintain min/max statistics per column per data file. A query engine can use these statistics to skip entire Parquet files:

# Iceberg: enable column statistics collection on write
spark.sql("""
ALTER TABLE lakehouse.rides.trips
SET TBLPROPERTIES (
'write.metadata.metrics.default' = 'truncate(16)',
'write.metadata.metrics.column.fare' = 'full',
'write.metadata.metrics.column.status' = 'full'
)
""")

# After enabling, Iceberg tracks min/max/null_count for each Parquet file
# A query WHERE fare > 100 will skip all files where max(fare) <= 100
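
The `truncate(16)` mode is worth a closer look, because truncated bounds must remain valid bounds. As I understand Iceberg's behavior, the lower bound is simply the first N characters, while the upper bound is the first N characters with the last one incremented so that it still sorts at or above the original value. The sketch below illustrates that idea with hypothetical helper names. It only increments ASCII characters, which keeps the bound valid but can make it looser than necessary for non-ASCII data.

```python
def truncated_lower_bound(value, length):
    """A prefix sorts at or below every string that starts with it."""
    return value[:length]


def truncated_upper_bound(value, length):
    """Truncate, then bump the last incrementable character.

    Returns None when no valid truncated upper bound exists.
    """
    if len(value) <= length:
        return value
    prefix = value[:length]
    # Walk backwards to find a character that can be incremented (ASCII only).
    for i in range(len(prefix) - 1, -1, -1):
        code = ord(prefix[i])
        if code < 0x7F:
            return prefix[:i] + chr(code + 1)
    return None


def can_skip_equality(needle, lower, upper):
    """A file can be skipped for `col = needle` if needle falls outside its bounds."""
    return needle < lower or (upper is not None and needle > upper)


actual_min = actual_max = "completed_with_surcharge"
lower = truncated_lower_bound(actual_min, 16)
upper = truncated_upper_bound(actual_max, 16)
print(lower, "<=", actual_min, "<=", upper)
print("skip file for status = 'cancelled':", can_skip_equality("cancelled", lower, upper))
```

Truncation trades a little pruning precision for much smaller metadata, which is why the `full` mode is reserved above for the columns that are filtered on most often.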

3. Z-Ordering and Clustering

Z-ordering co-locates related data within Parquet files, improving the effectiveness of min/max statistics:

# Delta Lake Z-ordering
spark.sql("""
OPTIMIZE delta.`s3://my-lakehouse/delta/trips`
ZORDER BY (driver_id, trip_date)
""")

# Iceberg sort order (applied on write, not after the fact)
spark.sql("""
ALTER TABLE lakehouse.rides.trips
WRITE ORDERED BY driver_id, trip_date
""")

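To see why Z-ordering helps min/max statistics on more than one column, it helps to look at the underlying bit interleaving. The sketch below is a textbook Morton-code illustration on small integers, not Delta Lake's implementation (which has to handle arbitrary column types and value distributions). It sorts a 4x4 grid of `(x, y)` values by their interleaved key and splits the result into four "files".

```python
def interleave_bits(x, y, bits=16):
    """Interleave the low `bits` bits of x and y into one Morton (Z-order) value.

    Bit i of x lands at position 2*i, bit i of y at position 2*i + 1.
    """
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


# Sixteen rows covering every (x, y) combination in a 4x4 grid
points = [(x, y) for x in range(4) for y in range(4)]
z_sorted = sorted(points, key=lambda p: interleave_bits(*p))

# Write four rows per "file" and report each file's min/max per column
files = [z_sorted[i:i + 4] for i in range(0, len(z_sorted), 4)]
for n, rows in enumerate(files):
    xs = [p[0] for p in rows]
    ys = [p[1] for p in rows]
    print(f"file {n}: x in [{min(xs)}, {max(xs)}], y in [{min(ys)}, {max(ys)}]")
```

Each file ends up with a narrow range on both columns. Had the rows been sorted by `x` alone, every file would span the full range of `y`, and a filter on `y` could skip nothing.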
4. Predicate Pushdown Into Storage

Parquet stores data in row groups, with column statistics per row group. Push predicates down to the storage layer to skip row groups:

# Spark: verify predicate pushdown in the query plan
df = spark.read.format("iceberg").load("s3://my-lakehouse/iceberg/trips")
df.filter("fare > 50 AND status = 'completed'").explain()

# Look for "PushedFilters" in the output:
# PushedFilters: [IsNotNull(fare), GreaterThan(fare,50.0), EqualTo(status,completed)]
# This means Parquet row groups are being skipped before data reaches Spark

5. Vectorized Reads via Apache Arrow

Modern query engines read Parquet in columnar, vectorized batches, and many of them use Apache Arrow as their in-memory or interchange format. Processing a batch of column values at a time lets the CPU use SIMD instructions and cuts per-row overhead, which dramatically increases throughput:

# DuckDB has its own vectorized execution engine and can read and produce Arrow data efficiently
# Spark 3.x: use Arrow for Spark <-> pandas conversion (toPandas, createDataFrame)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Read Iceberg into Arrow for downstream ML or pandas processing
import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("glue", **{"type": "glue", "region_name": "us-east-1"})
table = catalog.load_table("rides.trips")

# Scan returns Arrow batches - zero-copy to pandas or numpy
arrow_table = table.scan(
row_filter="trip_date >= '2024-01-01'",
selected_fields=["trip_id", "driver_id", "fare"]
).to_arrow()

df = arrow_table.to_pandas()

Same Query, Three Engines: A Comparison

To make the engine differences concrete, here is the same analytical query implemented in Spark, DuckDB, and Trino:

The query: For each driver, compute total trips, total revenue, and average fare for January 2024.

# ── SPARK ─────────────────────────────────────────────────────────────
# Best when: data > 500 GB, or this is part of a larger ETL pipeline

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("DriverRevenue").getOrCreate()

result_spark = (
    spark.read.format("iceberg")
    .load("s3://my-lakehouse/iceberg/trips")
    .filter(
        (F.col("trip_date") >= "2024-01-01") &
        (F.col("trip_date") < "2024-02-01") &
        (F.col("status") == "completed")
    )
    .groupBy("driver_id")
    .agg(
        F.count("*").alias("total_trips"),
        F.round(F.sum("fare"), 2).alias("total_revenue"),
        F.round(F.avg("fare"), 2).alias("avg_fare")
    )
    .orderBy(F.col("total_revenue").desc())
)

result_spark.write.format("iceberg").mode("overwrite") \
    .save("s3://my-lakehouse/iceberg/driver_revenue_jan24")
# Use Spark when you need to WRITE the result back to the lakehouse


# ── DuckDB ────────────────────────────────────────────────────────────
# Best when: data < 100 GB, local dev, or one-off analysis

import duckdb

con = duckdb.connect()
con.execute("INSTALL iceberg; LOAD iceberg; INSTALL httpfs; LOAD httpfs;")
con.execute("SET s3_region='us-east-1';")

result_duckdb = con.execute("""
SELECT
driver_id,
COUNT(*) AS total_trips,
ROUND(SUM(fare), 2) AS total_revenue,
ROUND(AVG(fare), 2) AS avg_fare
FROM iceberg_scan('s3://my-lakehouse/iceberg/trips')
WHERE trip_date >= '2024-01-01'
AND trip_date < '2024-02-01'
AND status = 'completed'
GROUP BY driver_id
ORDER BY total_revenue DESC
""").df()

result_duckdb.to_csv("driver_revenue_jan24.csv", index=False)
# DuckDB is the fastest path from "Iceberg table" to "CSV on my laptop"


# ── Trino ─────────────────────────────────────────────────────────────
# Best when: BI team needs interactive results, or cross-source join required

import trino

conn = trino.dbapi.connect(
host="trino.internal.company.com",
port=443,
http_scheme="https",
auth=trino.auth.OAuth2Authentication(),
)
cursor = conn.cursor()

cursor.execute("""
SELECT
driver_id,
COUNT(*) AS total_trips,
ROUND(SUM(fare), 2) AS total_revenue,
ROUND(AVG(fare), 2) AS avg_fare
FROM iceberg.rides.trips
WHERE trip_date >= DATE '2024-01-01'
AND trip_date < DATE '2024-02-01'
AND status = 'completed'
GROUP BY driver_id
ORDER BY total_revenue DESC
""")

result_trino = cursor.fetchall()
# Trino returns results in ~3 seconds for 50 GB tables
# No cluster spin-up overhead - workers are always running

Engine Selection Decision Matrix
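
The selection guidance given throughout this article can be condensed into a small heuristic. The function below is a sketch, not a definitive rule: `choose_engine` and its parameters are hypothetical, the 100 GB and 500 GB thresholds are taken from the "Best when" comments in the comparison above, and the order in which the conditions are checked is a judgment call you may want to change for your platform.

```python
def choose_engine(data_gb, *, streaming=False, cross_source=False,
                  concurrent_users=1, writes_back=False):
    """Suggest an engine for a workload, following the article's rules of thumb.

    data_gb          -- approximate data scanned by the query or job
    streaming        -- continuous, low-latency processing of unbounded data
    cross_source     -- joins across systems (lakehouse + operational DB, ...)
    concurrent_users -- number of users issuing interactive queries at once
    writes_back      -- result is written back into a lakehouse table
    """
    if streaming:
        return "flink"   # streaming-first ingestion and continuous queries
    if cross_source or concurrent_users > 1:
        return "trino"   # federation and interactive multi-user BI
    if writes_back or data_gb > 500:
        return "spark"   # large-scale ETL with table-format writes
    if data_gb < 100:
        return "duckdb"  # local dev, one-off analysis, controlled pipelines
    return "trino"       # mid-sized read-only analytics


workloads = {
    "laptop exploration, 5 GB": choose_engine(5),
    "nightly 10 TB ETL": choose_engine(10_000, writes_back=True),
    "BI dashboards, 100 users": choose_engine(50, concurrent_users=100),
    "Kafka to Iceberg ingestion": choose_engine(1, streaming=True),
    "ad-hoc 200 GB aggregation": choose_engine(200),
}
for name, engine in workloads.items():
    print(f"{name:32s} -> {engine}")
```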


Lakehouse Federation in Practice

The most powerful use case for Trino is federating queries across systems that would otherwise require ETL to combine. A fintech company might run:

-- Trino federation query: combine lakehouse events with live operational DB
-- and a Kafka topic for real-time fraud detection context

SELECT
e.user_id,
e.amount,
e.merchant_id,
u.account_age_days,
u.fraud_score_model_v2,
m.merchant_category,
m.risk_tier,
rs.recent_decline_count -- from a Kafka-backed real-time table
FROM iceberg.finance.transactions e -- Iceberg: last 90 days of events
JOIN postgresql.production.users u -- Postgres: live user profiles
ON e.user_id = u.user_id
JOIN iceberg.reference.merchants m -- Iceberg: merchant reference data
ON e.merchant_id = m.merchant_id
LEFT JOIN kafka.realtime.user_risk_signals rs -- Kafka: last 5-minute signals
ON e.user_id = rs.user_id
WHERE e.transaction_date = CURRENT_DATE
AND e.amount > 1000
ORDER BY e.amount DESC;

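Expressing this in Spark is possible, but it takes application code: a JDBC read for PostgreSQL, a Kafka source, and the Iceberg catalog all have to be wired up in a job before the join can run. Trino executes it as a federated plan from plain SQL - pushing predicates to each source system where the connector supports it, pulling only the filtered rows, and performing the joins in Trino's distributed memory. No ETL pipeline required.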
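The shape of that federated plan can be imitated with the standard library. In this sketch two in-memory SQLite databases stand in for the lakehouse and the operational database. The table contents are made up, and `federated_join` is a hypothetical function that mimics only the three steps named above: push filters to each source, pull the surviving rows, and join in the engine's memory. Restricting the second source to the join keys found in the first is a simplification of what engines call dynamic filtering; whether a given connector does this is an assumption here.

```python
import sqlite3


def make_source(ddl, insert_sql, rows):
    """Create an in-memory database that plays the role of one remote system."""
    con = sqlite3.connect(":memory:")
    con.execute(ddl)
    con.executemany(insert_sql, rows)
    return con


lake = make_source(
    "CREATE TABLE transactions (user_id INTEGER, amount REAL, transaction_date TEXT)",
    "INSERT INTO transactions VALUES (?, ?, ?)",
    [(1, 2500.0, "2024-01-15"), (2, 40.0, "2024-01-15"),
     (3, 9000.0, "2024-01-14"), (1, 1200.0, "2024-01-15")],
)
opdb = make_source(
    "CREATE TABLE users (user_id INTEGER, account_age_days INTEGER)",
    "INSERT INTO users VALUES (?, ?)",
    [(1, 30), (2, 900), (3, 12)],
)


def federated_join(lake, opdb, day, min_amount):
    # Step 1: push the date and amount predicates down to the first source.
    txns = lake.execute(
        "SELECT user_id, amount FROM transactions "
        "WHERE transaction_date = ? AND amount > ?",
        (day, min_amount),
    ).fetchall()
    if not txns:
        return []

    # Step 2: pull only the users that can match, not the whole table.
    user_ids = sorted({user_id for user_id, _ in txns})
    placeholders = ",".join("?" for _ in user_ids)
    users = dict(opdb.execute(
        f"SELECT user_id, account_age_days FROM users WHERE user_id IN ({placeholders})",
        user_ids,
    ).fetchall())

    # Step 3: hash join in the engine's own memory, then order the result.
    joined = [(u, amount, users[u]) for u, amount in txns if u in users]
    return sorted(joined, key=lambda row: row[1], reverse=True)


result = federated_join(lake, opdb, "2024-01-15", 1000)
print(result)
```

Neither source ever sees the other's data, and only two transaction rows and one user row cross the "network". That is the property that makes federation cheaper than copying tables between systems.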


Production Engineering Notes

Caching

Trino and Spark both support caching hot datasets in memory or on local SSDs to avoid repeated S3 reads:

# Spark: cache a hot reference table in memory
spark.sql("CACHE TABLE lakehouse.reference.merchants")

# Trino: use local file system cache (Alluxio or native file cache)
# trino.properties:
# hive.cache.enabled=true
# hive.cache.location=/var/lib/trino/cache
# hive.cache.max-disk-usage=100GB

Cost-Based Optimization

All production engines support cost-based optimization (CBO) - using table statistics to choose better join orders and strategies. Collect statistics after major data loads:

# Spark / Iceberg: compute table statistics
spark.sql("ANALYZE TABLE lakehouse.rides.trips COMPUTE STATISTICS FOR ALL COLUMNS")

# Trino: collect table statistics (triggers Iceberg stats collection)
# Run after significant data additions
cursor.execute("ANALYZE iceberg.rides.trips")

:::danger Never Run Spark for Interactive BI Queries

Routing ad-hoc BI queries through Spark clusters is one of the most expensive mistakes in data platform engineering. Even a "small" Spark query incurs 30–120 seconds of cluster spin-up overhead (if auto-scaling) or wastes idle cluster resources (if always-on). A Trino cluster serves the same query in 2–5 seconds at a fraction of the cost. Use Spark for what it's designed for: large-scale transformations, ML feature engineering, and ETL jobs that need to write back to the lakehouse at scale.

:::

:::warning DuckDB in Production Requires Careful Concurrency Management

DuckDB is a single-process, in-process database. It does not have a server mode with connection pooling. If you run DuckDB in production for multiple concurrent users, each user needs their own DuckDB process, and each process independently reads from S3. At 50 concurrent users, you have 50 independent S3 scan processes - this can exceed your S3 request rate limits and produce surprisingly high S3 costs. Use DuckDB for individual analyst workloads, local development, and automated pipelines where concurrency is controlled. For interactive multi-user BI, use Trino.

:::

:::tip Match Your Engine to Your Data Volume, Not Your Team's Familiarity

The most common suboptimal pattern is using Spark for everything because the team knows Spark. A 5 GB analytical query that runs in 45 seconds on a Spark cluster (including startup) runs in 8 seconds on DuckDB locally. A 200 GB aggregation that takes 12 minutes in Spark SQL takes 45 seconds in a Trino cluster with proper partitioning. Choose the engine that fits the query, then learn it.

:::

:::note Unified Catalog is the Key to Multi-Engine Architectures

The Iceberg REST Catalog, AWS Glue, or Databricks Unity Catalog act as the shared metadata layer that makes multi-engine architectures practical. When Spark writes an Iceberg table and registers it in Glue, Trino can immediately query it without any schema synchronization step. This is the architectural win that consolidates multiple engine deployments from separate silos into a coherent lakehouse platform.

:::


Interview Questions and Answers

Q1: Why can't you just use Spark for everything in a lakehouse? What does Trino add?

Answer:

Spark is a batch compute framework with a SQL interface. Its design centers on distributed transformations over bounded datasets - reading, processing, and writing large amounts of data. This design creates inherent overhead that makes it unsuitable for interactive analytical queries:

  1. Startup latency: On a dynamically allocated cluster, Spark needs 30–120 seconds to provision executors before the first byte of data is read. For a BI analyst who clicks "Run" and expects results in 5 seconds, this is a broken experience.

  2. Resource idle cost: An always-on Spark cluster large enough to handle peak concurrent BI queries wastes compute during off-peak hours. Spark is designed for bursting, not sustained concurrent workloads.

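  3. Query concurrency: A single Spark application can run several queries at once, but they compete for one fixed pool of executors, and under the default FIFO scheduler earlier jobs get priority on resources. Isolating users in separate Spark applications avoids that contention but multiplies driver and executor memory requirements roughly linearly.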

Trino solves these specific problems: it has a long-running coordinator and always-warm workers, so query startup overhead is near zero. It is designed for concurrent, short-lived queries - 100 users running different queries simultaneously is the expected workload, not a corner case. It reads the same Iceberg/Delta metadata Spark writes, so they share one logical catalog without data duplication.

The right architecture: Spark for ETL and ML feature engineering (bulk transformations, writes back to lakehouse), Trino for ad-hoc queries and BI dashboards (reads from lakehouse, serves analysts). Both use the same Iceberg table - storage is shared, compute is separate.


Q2: Explain predicate pushdown in the context of a lakehouse query. Why does it matter?

Answer:

Predicate pushdown is the optimization where a query filter (predicate) is applied as early as possible in the execution pipeline - ideally before data is read from disk - rather than after loading it into memory.

In a lakehouse context, predicate pushdown happens at multiple levels:

Level 1: Partition pruning (coarsest, most impactful). If a table is partitioned by event_date and the query has WHERE event_date = '2024-01-15', the engine reads only the files in the event_date=2024-01-15/ directory. Every other partition's data is never touched. This is why partitioning strategy is so important - a query on a daily-partitioned table scanning one day reads 1/365th of the data.

Level 2: File-level skipping using Iceberg/Delta statistics. Iceberg maintains per-file min/max statistics for each column. If a file's max(fare) = 45.00 and the query has WHERE fare > 100, that file is skipped entirely without reading a single byte. This is O(number of files) metadata work rather than O(data volume) I/O work.

Level 3: Row-group-level skipping in Parquet. Within a Parquet file, data is stored in row groups (typically 128 MB each). Each row group has column statistics (min/max, null count). The engine evaluates predicates against these statistics and skips row groups where no matching rows can exist.

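Level 4: Page-level skipping. Within a row group, each column chunk is stored as a sequence of pages. When the file carries a Parquet page index, the engine can compare the predicate against per-page min/max values and skip pages that cannot match. For dictionary-encoded columns, the engine can also check whether the filter value appears in the column chunk's dictionary before decoding any data pages.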

The impact compounds across levels. A query on a 10 TB table with good partitioning, Z-ordering, and statistics might read only 50 MB of actual data - a 200,000x reduction. Without pushdown, every level degenerates to a full scan.
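The arithmetic behind that figure is easy to check, and a quick calculation shows how the levels multiply. In the sketch below the 10 TB and 50 MB values come from the paragraph above, while the per-level survival fractions are illustrative assumptions chosen only to show the compounding effect. Real fractions depend entirely on the table layout and the query.

```python
import math

TB = 10**12
MB = 10**6

table_bytes = 10 * TB
bytes_read = 50 * MB

# The headline ratio from the text: 10 TB scanned down to 50 MB
reduction = table_bytes // bytes_read
print(f"reduction factor: {reduction:,}x")

# Illustrative (assumed) fraction of data that survives each pruning level
survival = {
    "partition pruning (1 day of 365)": 1 / 365,
    "file-level statistics": 1 / 50,
    "row-group statistics": 1 / 10,
}

remaining = table_bytes
for level, fraction in survival.items():
    remaining *= fraction
    print(f"after {level}: {remaining / MB:,.1f} MB")

combined = math.prod(survival.values())
print(f"combined survival fraction: 1 in {1 / combined:,.0f}")
```

Each level only has to be moderately selective for the product to reach five or six orders of magnitude. The reverse also holds: losing any one level (for example by wrapping the partition column in a function) multiplies the scan size back up.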


Q3: When would you use DuckDB in a production data pipeline? What are its limitations?

Answer:

DuckDB is production-appropriate in several specific scenarios:

Single-process ETL jobs: A Python script runs on an EC2 instance, reads a 20 GB Iceberg table from S3, performs transformations, and writes a 500 MB result. DuckDB executes this faster than Spark (no JVM, no cluster coordination) and is simpler to deploy (no cluster management). The limitation: if the data grows to 200 GB, DuckDB's spill-to-disk behavior will slow it down significantly and you'll need to re-architect to Spark.

Local development and testing: Engineers develop and validate ETL logic against DuckDB using a representative sample of production Iceberg data. The same SQL runs in production on Spark or Trino. This eliminates the "write locally, break in production" problem from engine-specific SQL dialects.

Automated reporting pipelines with controlled concurrency: A nightly Python script generates 10 reports from an Iceberg table. No concurrent users, known data size (< 100 GB), scheduled execution. DuckDB is faster and cheaper than spinning up a Spark cluster.

Data export jobs: Read from Iceberg, write to CSV, Parquet, or a PostgreSQL staging table. DuckDB's COPY TO statement handles this in a single operation.

Limitations to understand:

  1. No horizontal scaling: DuckDB runs on a single machine. If data exceeds the machine's memory + disk, you need to rethink the approach.
  2. No built-in concurrency for multiple users: Each user needs their own process; no connection pooling.
  3. Limited write-back to table formats: DuckDB can read Iceberg well but writing back to Iceberg in production is less mature than Spark's writer.
  4. Not suited for streaming: DuckDB is a batch engine; streaming requires Flink or Spark Structured Streaming.

Q4: What is the difference between Trino and Spark for querying a table with 5 TB of data and 100 concurrent users?

Answer:

This scenario exposes the fundamental architectural difference between the two engines.

Trino is designed exactly for this workload. Each of the 100 queries is submitted to the Trino coordinator, which plans them independently and distributes them across the always-warm worker pool. Workers share resources across all concurrent queries. Memory is managed at the coordinator level - queries that exceed their memory budget are spilled to disk or killed, but they don't block other queries. A well-tuned Trino cluster handling 100 concurrent queries on 5 TB data is the system's expected operating mode, not a stress condition.

Typical Trino cluster for this workload: 1 coordinator (16 vCPU, 64 GB RAM) + 20 workers (32 vCPU, 128 GB RAM each). Cost: approximately $2,500/month on EC2 for always-on capacity.

Spark is designed for the exact opposite: one or a few large, long-running jobs with exclusive cluster access. With 100 concurrent users, you face a choice:

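  • Shared SparkContext: 100 users sharing one SparkSession. Under the default FIFO scheduler, queries queue behind each other, so user 100 waits for users 1–99 to finish. The FAIR scheduler softens this, but every query still competes for the executors of a single application. Unacceptable for interactive BI.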
  • One SparkSession per user: 100 separate Spark applications, each requiring its own executor allocation. On a 200-node cluster, this fragments resources into 100 2-node "clusters," each with massive overhead per query. Total cost: approximately $8,000–15,000/month.

The conclusion is not that Spark is bad - it's that Spark is the wrong tool for this workload. Use Trino for the 100-user interactive query workload, and reserve Spark for the nightly 5 TB ETL jobs that build the tables those users query.
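The queueing effect behind the shared-session option can be illustrated with a toy model. This is a deliberately simplified sketch, not a model of either engine: it assumes every query takes the same fixed time and that a single interactive query can only use a fixed slice of the cluster, so running queries one at a time leaves the rest idle. The function names, the 5-second query time, and the 25 slots are all made up for illustration.

```python
def fifo_completion_times(num_queries: int, query_seconds: float) -> list[float]:
    """Completion time of each query when they run strictly one after another."""
    return [(i + 1) * query_seconds for i in range(num_queries)]


def pooled_completion_times(num_queries: int, query_seconds: float, slots: int) -> list[float]:
    """Completion time when up to `slots` queries run side by side, in waves."""
    return [((i // slots) + 1) * query_seconds for i in range(num_queries)]


fifo = fifo_completion_times(100, 5.0)
pooled = pooled_completion_times(100, 5.0, slots=25)

print(f"FIFO:   last user done after {fifo[-1]:.0f}s")    # 500s
print(f"Pooled: last user done after {pooled[-1]:.0f}s")  # 20s
```

Real clusters are messier (queries differ in size, and concurrent queries slow each other down), but the shape of the result is the point: serial queueing makes the last user's latency grow linearly with the number of users.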


Q5: How do you choose a partition strategy for a lakehouse table to maximize query engine performance?

Answer:

Partition strategy is one of the highest-leverage decisions in lakehouse design because it determines the effectiveness of partition pruning - the most impactful single optimization in query engines.

The core principle: Partition by the column(s) most commonly used as filter predicates in your workload's queries.

Step 1: Understand your query patterns. Examine the WHERE clauses in your top 20 queries by frequency and cost. If 80% of queries filter on event_date, partition by event_date. If most queries filter on both event_date and region, consider a two-level partition with event_date as the outer level and region as the inner level, e.g. event_date=2024-01-01/region=US/ and event_date=2024-01-01/region=EU/.

Step 2: Evaluate partition cardinality. The partition column should have cardinality that produces files of 256 MB–1 GB each. Too low cardinality (e.g., partitioning by status with 3 values) produces 3 massive partitions with no pruning benefit for date-filtered queries. Too high cardinality (e.g., partitioning by user_id with 50M users) produces 50M directories - the metadata overhead collapses performance.

Step 3: Consider time-based partitioning for event data. Daily partitioning (YYYY-MM-DD) is the most common choice for event data. It aligns with most retention and query patterns. Hourly partitioning is appropriate for very high-volume streaming data (>1 TB/day) where you need sub-hour query isolation.

Step 4: Handle skew explicitly. If one partition value represents 40% of all data (e.g., status = 'completed' for most rides), partition pruning on status is ineffective. In this case, partition by date (evenly distributed) and use Z-ordering on status to cluster data within files.

Step 5: Use Iceberg hidden partitioning to decouple the physical partition from the logical column type:

-- Iceberg hidden partitioning: partition by MONTH(event_time)
-- but query using WHERE event_time >= '2024-01-01'
-- Iceberg handles the mapping - no function application in WHERE clause needed
ALTER TABLE events ADD PARTITION FIELD months(event_time)

This avoids the classic anti-pattern of WHERE MONTH(event_time) = 1 breaking partition pruning.
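The cardinality check from Step 2 and the skew check from Step 4 can be sketched as a small helper. This is a rough sketch under stated assumptions: `evaluate_partition_column` is a hypothetical name, rows are assumed to be roughly equal in size, the 256 MB–1 GB band is applied to the average partition size as a proxy for file size, and a sample will underestimate the distinct count of very high-cardinality columns.

```python
from collections import Counter

MB = 1024 ** 2
GB = 1024 ** 3


def evaluate_partition_column(values, table_bytes, min_bytes=256 * MB, max_bytes=1 * GB):
    """Estimate average partition size and skew for a candidate partition column.

    `values` is a sample of the candidate column, one entry per row.
    """
    counts = Counter(values)
    num_partitions = len(counts)
    avg_bytes = table_bytes / num_partitions
    # Share of rows held by the most common value (Step 4: skew).
    largest_share = max(counts.values()) / len(values)

    if avg_bytes < min_bytes:
        verdict = "too many partitions"
    elif avg_bytes > max_bytes:
        verdict = "too few partitions"
    else:
        verdict = "ok"
    return {
        "partitions": num_partitions,
        "avg_bytes": avg_bytes,
        "largest_share": largest_share,
        "verdict": verdict,
    }


table_bytes = 100 * GB  # hypothetical 100 GB table

status = evaluate_partition_column(
    ["completed"] * 80 + ["cancelled"] * 15 + ["in_progress"] * 5, table_bytes
)
by_day = evaluate_partition_column([f"day-{d}" for d in range(200)], table_bytes)
by_user = evaluate_partition_column(list(range(100_000)), table_bytes)

print(status["verdict"], status["largest_share"])  # too few partitions 0.8
print(by_day["verdict"])                           # ok
print(by_user["verdict"])                          # too many partitions
```

In practice the inputs would come from table statistics or a sampled query rather than in-memory lists; the thresholds are parameters so they can be tuned to the target file size.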


Q6: What is Apache Arrow and why does it matter for lakehouse query engines?

Answer:

Apache Arrow is an in-memory columnar data format specification and a set of libraries that implement it across languages (Python, Java, C++, R, Go). It is not a file format - it defines how columnar data is laid out in memory, using a specific binary layout optimized for SIMD CPU operations and zero-copy data sharing between processes.

For lakehouse query engines, Arrow matters for three reasons:

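1. Cheap data transfer between engines and languages. When Spark reads Iceberg data and hands it to Python for a UDF without Arrow, the data is serialized row by row (Java objects → pickled bytes → Python objects), which is expensive. With Arrow, the data moves between the JVM and the Python worker as columnar record batches that pandas can consume with little or no per-row conversion. Pandas (vectorized) UDFs always use Arrow; spark.sql.execution.arrow.pyspark.enabled=true additionally enables it for toPandas() and createDataFrame(). Within a single process, Arrow buffers can be shared with true zero-copy. The speedup for vectorized UDFs over row-at-a-time UDFs is typically 10–100x.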

2. SIMD-accelerated column operations. Arrow's memory layout is designed so that entire columns are contiguous in memory. Modern CPUs can process 8–32 values simultaneously using AVX2/AVX-512 SIMD instructions. A filter fare > 100 applied to a million-row column using SIMD processes 8 rows per CPU cycle instead of 1. DuckDB, Spark with Velox, and Trino with native connectors all exploit this.

3. The Arrow Flight protocol for engine interoperability. Arrow Flight is a gRPC-based protocol for efficiently transferring Arrow data between services over the network. This enables query results from Trino to be streamed directly to Python in Arrow format, or for DuckDB to receive data from an Arrow Flight server without deserialization. It's the emerging standard for cross-engine data exchange in lakehouse architectures.

The practical implication: when evaluating query engines and connectors, prefer those with native Arrow support. "Arrow-native" vs. "Arrow via conversion" can represent a 5–10x performance difference on large result sets.
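The "contiguous column" and "zero-copy" ideas can be seen without any Arrow library at all. The sketch below is only a standard-library stand-in, not Arrow itself: it uses Python's `array` and `memoryview` to show one column stored as a single contiguous buffer and a view over it that shares memory instead of copying. The `trip_id` and `fare` values are made up, and the pure-Python filter loop does not use SIMD.

```python
from array import array

# Row-oriented: each record is a separate Python object scattered in memory.
rows = [
    {"trip_id": 1, "fare": 12.5},
    {"trip_id": 2, "fare": 140.0},
    {"trip_id": 3, "fare": 99.9},
    {"trip_id": 4, "fare": 250.0},
]

# Column-oriented: one contiguous buffer of float64 values for the fare column.
fare_column = array("d", (r["fare"] for r in rows))

# memoryview exposes the underlying buffer without copying it.
view = memoryview(fare_column)
last_two = view[2:]            # slicing a memoryview is also zero-copy

fare_column[3] = 300.0         # mutate the underlying buffer...
assert last_two[1] == 300.0    # ...and the view sees the change: same memory

# A columnar filter scans only the fare buffer and never touches trip_id.
matching = [i for i, fare in enumerate(fare_column) if fare > 100]
print(matching)  # [1, 3]
```

Arrow applies the same principle across process and language boundaries by standardizing the buffer layout, so a consumer can interpret the bytes in place rather than rebuilding objects.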


Benchmarking Query Engines in Practice

Before committing to an engine for a workload, benchmark it on your actual data with your actual query patterns. Synthetic benchmarks (TPC-H, TPC-DS) provide directional guidance but rarely match the specific characteristics of production data - skew patterns, partition layouts, data types, and query complexity all affect results.

A Simple Benchmark Harness

import time
from dataclasses import dataclass
from typing import Any, Callable, Optional

import duckdb
import pandas as pd


@dataclass
class BenchmarkResult:
    engine: str
    query_name: str
    elapsed_seconds: float
    row_count: int
    error: Optional[str] = None


def benchmark_query(
    engine_name: str,
    query_fn: Callable[[], Any],
    iterations: int = 3,
) -> BenchmarkResult:
    """Run a query function multiple times and return the median result."""
    # Label the result with the function's name instead of a fixed placeholder
    query_name = getattr(query_fn, "__name__", "unknown")
    times = []
    row_count = 0

    for _ in range(iterations):
        try:
            start = time.perf_counter()
            result = query_fn()
            elapsed = time.perf_counter() - start
            times.append(elapsed)

            # Get row count from result (outside the timed region)
            if hasattr(result, "__len__"):
                row_count = len(result)
            elif hasattr(result, "count"):
                row_count = result.count()
        except Exception as e:
            # Any failure aborts the benchmark and is reported in the result
            return BenchmarkResult(
                engine=engine_name,
                query_name=query_name,
                elapsed_seconds=-1,
                row_count=0,
                error=str(e),
            )

    median_time = sorted(times)[len(times) // 2]
    return BenchmarkResult(
        engine=engine_name,
        query_name=query_name,
        elapsed_seconds=round(median_time, 3),
        row_count=row_count,
    )


# Benchmark the same aggregation across DuckDB and Spark
def duckdb_monthly_revenue():
    # Note: connection and extension setup run inside the timed call
    con = duckdb.connect()
    con.execute("INSTALL iceberg; LOAD iceberg; INSTALL httpfs; LOAD httpfs;")
    con.execute("SET s3_region='us-east-1';")
    return con.execute("""
        SELECT
            DATE_TRUNC('month', CAST(trip_date AS DATE)) AS month,
            COUNT(*) AS trip_count,
            SUM(fare) AS revenue
        FROM iceberg_scan('s3://my-lakehouse/iceberg/trips')
        WHERE trip_date >= '2024-01-01'
        GROUP BY 1
        ORDER BY 1
    """).df()


def spark_monthly_revenue():
    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    return (
        spark.read.format("iceberg")
        .load("s3://my-lakehouse/iceberg/trips")
        .filter(F.col("trip_date") >= "2024-01-01")
        .groupBy(F.date_trunc("month", F.col("trip_date")).alias("month"))
        .agg(F.count("*").alias("trip_count"), F.sum("fare").alias("revenue"))
        .orderBy("month")
        .collect()
    )


duckdb_result = benchmark_query("duckdb", duckdb_monthly_revenue)
spark_result = benchmark_query("spark", spark_monthly_revenue)

results_df = pd.DataFrame([
    {"engine": duckdb_result.engine, "elapsed_s": duckdb_result.elapsed_seconds,
     "rows": duckdb_result.row_count},
    {"engine": spark_result.engine, "elapsed_s": spark_result.elapsed_seconds,
     "rows": spark_result.row_count},
])
print(results_df.to_string(index=False))
# Typical output on a 50 GB table:
# engine  elapsed_s  rows
# duckdb      4.231    12
# spark      18.750    12   (on an always-warm cluster)
# spark     112.000    12   (including cluster startup)

Reading Benchmark Results Correctly

A benchmark result is only meaningful in the context of:

  1. Data volume: DuckDB typically wins at 10–50 GB; Spark typically wins at 1 TB+
  2. Cluster state: Spark numbers without cluster startup time are misleading for interactive use cases
  3. Query complexity: DuckDB's in-process execution struggles with joins that produce large intermediate results that exceed memory; Spark's shuffle handles this gracefully
  4. Concurrency: DuckDB benchmarks single-user; Trino benchmarks should include concurrent query load

Always benchmark with:

  • The actual data volume (or a representative sample with the same skew)
  • The actual cluster/machine configuration you'll run in production
  • Concurrent users if the workload is multi-user
  • Both cold-start and warm-start timings for interactive engines
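The concurrent-users point can be added to a harness with a thread pool. The sketch below is a minimal illustration, assuming the query is an I/O-bound client call so that threads are enough to generate load. `timed_call`, `concurrent_benchmark`, and `fake_query` are hypothetical names; `fake_query` just sleeps and would be replaced by a real client call against the engine under test. The p95 value uses a simple nearest-rank approximation.

```python
import statistics
import time
from concurrent.futures import ThreadPoolExecutor


def timed_call(query_fn):
    """Run one query and return its wall-clock latency in seconds."""
    start = time.perf_counter()
    query_fn()
    return time.perf_counter() - start


def concurrent_benchmark(query_fn, users=10, queries_per_user=3):
    """Issue queries from `users` threads at once and summarize latencies."""
    total = users * queries_per_user
    with ThreadPoolExecutor(max_workers=users) as pool:
        latencies = sorted(pool.map(timed_call, [query_fn] * total))
    return {
        "queries": total,
        "p50_s": statistics.median(latencies),
        "p95_s": latencies[min(total - 1, int(0.95 * total))],
        "max_s": latencies[-1],
    }


def fake_query():
    # Hypothetical stand-in for a real engine call; sleeps for 10 ms.
    time.sleep(0.01)


summary = concurrent_benchmark(fake_query, users=5, queries_per_user=4)
print(summary)
```

Comparing the p50 and p95 figures at increasing `users` values shows where latency starts to degrade, which a single-user median cannot reveal.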

These benchmarks, run on your specific workload and data, will tell you more than any published comparison - and they will tell you exactly which engine to choose.
