Cost and Performance Trade-offs in Data Infrastructure
Reading time: 40–45 min | Relevance: Very high for data engineers, ML engineers, platform engineers | Target roles: Data Engineer, Senior Data Engineer, ML Platform Engineer, Data Architect
The $500 Spark Job
It is 6 AM at DoorDash's data platform team and the overnight model training pipeline failed - again. The PySpark job that computes delivery time features across 2 billion courier GPS pings finished in 4 hours and 12 minutes. The SLA is 3 hours. The compute cost: about $512 per run, which at one run per night is $3,584 per week, $186,000 per year. For a single feature engineering job.
The platform engineer on call, Marcus, pulls up the Spark UI. He can see immediately that the job is stuck in a shuffle stage - the stage bar in the timeline is 90% gray (waiting) and 10% green (running). Specifically, one task in the aggregateByCoordinateGrid stage has been running for 3 hours while all 499 other tasks finished in under 2 minutes. One partition has 1.8 billion rows. The rest have an average of 4 million.
This is data skew. The coordinate grid cell for downtown Manhattan contains a disproportionate fraction of all deliveries. Every join, every aggregation, every operation that partitions by grid cell routes 90% of the data to a single executor. That executor runs out of memory, spills to disk, and the job grinds to a halt. The other 499 executors sit idle - paid for, doing nothing - while the one overloaded executor struggles.
Marcus makes two changes: he adds a random salt to the join key for the skewed grid cells, and he converts the dimension table join to a broadcast join (the dimension table is 200MB - small enough to fit in every executor's memory). The next run completes in 41 minutes, well inside the SLA. The weekly cost, previously $3,584, drops sharply, and the annual savings come to roughly $154,000. From two code changes that took 45 minutes to implement.
This lesson is about the reasoning that lets you find those two changes. Understanding the cost model for cloud data infrastructure, diagnosing performance bottlenecks, and knowing which trade-offs to make - these are the skills that separate a data engineer who builds systems from a data engineer who just writes pipelines.
Why This Exists
The economics of data engineering changed dramatically between 2010 and 2020. In 2010, running Hadoop MapReduce on an on-premises cluster meant paying for servers whether they ran or not. The marginal cost of one more job was zero. Engineers optimized for developer velocity, not computational efficiency.
Cloud computing changed this. On AWS EMR, you pay per core-hour. On Google BigQuery, you pay per byte scanned. On Databricks, you pay per Databricks Unit (DBU) consumed. Every query, every join, every full-table scan has a line item. A poorly written BigQuery query against a 10TB table costs $50 per run at $5 per TB scanned. Run it 100 times during development and you have spent $5,000 before deploying to production.
The pay-per-use model created a new engineering discipline: cost engineering. Not just making code fast, but making code economically efficient. A job that costs $500 per run and a job that costs $80 and finishes in 45 minutes may both meet the SLA, but one is $420 cheaper per run. At scale, these differences determine whether a company's data platform is a competitive advantage or a budget crisis.
The fundamental insight is that performance and cost are related but not identical. Throwing more compute at a problem makes it faster but costs more. The goal is to find the point on the trade-off curve that satisfies your SLA at minimum cost - and to understand which optimizations move the curve itself rather than just trading one resource for another.
Historical Context
MapReduce (Google, 2004; Apache Hadoop, 2006) introduced the modern paradigm of distributed data processing. The cost model was simple: CPU time × number of machines. But Hadoop MapReduce was notoriously slow - every stage wrote intermediate results to HDFS (disk). Engineers spent enormous effort minimizing the number of MapReduce stages.
Apache Spark (UC Berkeley AMPLab, 2012) addressed Hadoop's disk I/O bottleneck by keeping intermediate data in memory. DAG execution replaced the fixed two-phase MapReduce model. Spark was 10–100x faster than Hadoop MapReduce for iterative algorithms. But Spark introduced a new cost bottleneck: the shuffle - the process of redistributing data across executors for joins and aggregations.
BigQuery (Google, 2010; public, 2012) introduced the per-byte-scanned pricing model. A full scan of a 1TB table costs $5 (at $5/TB). This single pricing decision made partitioning and column selection economically critical in a way they never were with per-hour compute pricing. Engineers who understood partitioning could run the same query for a small fraction of that $5.
Databricks commercialized Spark with the DBU model (Databricks Units per hour). This added another dimension: instance type selection. A memory-optimized instance (for shuffle-heavy jobs) costs more per DBU but may complete jobs faster, resulting in fewer total DBUs. The optimization space became multidimensional.
Serverless analytics (BigQuery, Athena, Redshift Spectrum, DuckDB) represents the current evolution: no cluster to manage, pure pay-per-query pricing. The cost model forces better query design but removes the control over execution that experts relied on for optimization.
Core Concepts
The Trade-off Triangle
Every data processing decision sits in a three-dimensional trade-off space:
- Low latency, high throughput: requires lots of expensive compute (fast, scalable - but expensive)
- Low latency, low cost: requires clever optimization (fast, cheap - but limited throughput)
- High throughput, low cost: requires batching and scheduling off-peak (scalable, cheap - but slow)
You cannot have all three simultaneously. A Spark job running on 100 executors with SSDs has low latency and high throughput - but costs 100x what a 1-executor job costs. Running that same job on a single machine with DuckDB has low cost and reasonable throughput for the data size - but cannot parallelize across nodes.
The practical skill is knowing where your system sits on this triangle and whether that position matches your requirements.
:::note The SLA determines your minimum latency requirement
Once you know you need to finish in 3 hours, that is a hard constraint. The optimization question becomes: what is the cheapest way to finish in 3 hours? Not: what is the fastest possible time? Optimizing for speed beyond the SLA is wasted money.
:::
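The rule in the note can be sketched as a small selection function: discard every configuration that misses the SLA, then take the cheapest of what remains. The `ClusterOption` type and all runtimes and prices below are hypothetical illustration values, not measurements from this lesson.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ClusterOption:
    """Hypothetical candidate configuration with an estimated runtime."""
    name: str
    runtime_hours: float
    cost_per_hour_usd: float

    @property
    def cost_per_run_usd(self) -> float:
        return self.runtime_hours * self.cost_per_hour_usd


def cheapest_within_sla(options: list, sla_hours: float) -> Optional[ClusterOption]:
    """Return the lowest-cost option whose runtime meets the SLA, or None."""
    feasible = [o for o in options if o.runtime_hours <= sla_hours]
    return min(feasible, key=lambda o: o.cost_per_run_usd, default=None)


# Illustrative numbers only.
options = [
    ClusterOption("small", runtime_hours=4.2, cost_per_hour_usd=20.0),   # cheapest, but misses a 3h SLA
    ClusterOption("medium", runtime_hours=2.5, cost_per_hour_usd=40.0),  # meets the SLA
    ClusterOption("large", runtime_hours=1.0, cost_per_hour_usd=120.0),  # fastest, costs more per run
]
best = cheapest_within_sla(options, sla_hours=3.0)
print(best.name, best.cost_per_run_usd)
```

The fastest option is deliberately not selected: once the SLA is met, extra speed only adds cost.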
Compute Cost Models
BigQuery charges per byte scanned: $5 per TB on-demand in this lesson's examples (check the current price list, since on-demand rates change), or a flat monthly reservation (capacity pricing). The scan is what triggers the cost - not compute time, not storage, not the number of rows returned.
A full table scan of a 10TB events table costs:
$$10\ \text{TB} \times \$5/\text{TB} = \$50$$
A partitioned query that filters by date (reading 1/365 of the data):
$$\frac{10\ \text{TB}}{365} \times \$5/\text{TB} \approx \$0.14$$
The partitioned query is about 365x cheaper. This single optimization - partitioning by date and always filtering on the partition column - is the most impactful cost optimization for BigQuery users.
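The arithmetic above can be checked with a few lines of Python. This is a minimal sketch that assumes the lesson's $5 per TB rate and evenly sized daily partitions; `scan_cost_usd` is a helper name introduced here for illustration.

```python
def scan_cost_usd(tb_scanned: float, price_per_tb: float = 5.0) -> float:
    """On-demand cost of scanning `tb_scanned` terabytes."""
    return tb_scanned * price_per_tb


TABLE_TB = 10.0
full_scan = scan_cost_usd(TABLE_TB)        # no partition filter
one_day = scan_cost_usd(TABLE_TB / 365)    # filter to a single daily partition

print(f"Full scan: ${full_scan:.2f}")
print(f"One day:   ${one_day:.2f}")
print(f"Ratio:     {full_scan / one_day:.0f}x")
```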
Spark on EMR charges per EC2 instance-hour. A 20-node cluster with r5.4xlarge instances (16 vCPU, 128 GB RAM, $1.008/hr on-demand) costs:
$$20 \times \$1.008/\text{hr} = \$20.16\ \text{per hour}$$
The optimization vectors here are different from BigQuery:
- Reduce wall-clock time (fewer hours billed)
- Right-size instances (use smaller instances if memory is underutilized)
- Use Spot instances (60–80% discount, with interruption risk)
- Minimize shuffle (the dominant cost of most Spark jobs)
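Two of these levers, wall-clock time and Spot discounts, can be put into a rough cost formula. The sketch below is an approximation: the 4-hour runtime and 70% discount are assumed example values, and it ignores the EMR service fee, storage, and the common practice of keeping some nodes on-demand.

```python
def cluster_cost_usd(
    nodes: int,
    price_per_node_hour: float,
    hours: float,
    spot_discount: float = 0.0,
) -> float:
    """Approximate compute cost of one run; spot_discount is a fraction in [0, 1)."""
    if not 0.0 <= spot_discount < 1.0:
        raise ValueError("spot_discount must be in [0, 1)")
    return nodes * price_per_node_hour * hours * (1.0 - spot_discount)


# 20 x r5.4xlarge at the on-demand price quoted above, for an assumed 4-hour job.
on_demand = cluster_cost_usd(nodes=20, price_per_node_hour=1.008, hours=4.0)
spot = cluster_cost_usd(nodes=20, price_per_node_hour=1.008, hours=4.0, spot_discount=0.70)

print(f"On-demand: ${on_demand:.2f} per run")
print(f"Spot (70% off): ${spot:.2f} per run")
```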
Databricks uses DBUs (Databricks Units). A DBU is an abstract unit of compute that varies by workload type (jobs, all-purpose clusters, SQL warehouses). Pricing is DBU rate × hours × node count. For jobs clusters (batch processing), the DBU rate is lower than for interactive all-purpose clusters. The optimization advice is the same as EMR, plus: use the jobs cluster type for production pipelines, not all-purpose clusters.
Storage Cost Models
Object storage (S3, GCS, Azure Blob) charges approximately $0.023 per GB per month. This seems trivial but accumulates:
- 100 TB of raw CSV data = $2,300/month = $27,600/year just for storage
- The same data in Parquet with Snappy compression: ~25 TB = $575/month = $6,900/year
- Savings from compression alone: $20,700/year
Format choice is a storage cost decision:
| Format | Relative Size | Monthly Cost for 100TB raw |
|---|---|---|
| CSV (uncompressed) | 100% | $2,300 |
| CSV (gzip) | 20–30% | $460–$690 |
| Parquet (Snappy) | 15–25% | $345–$575 |
| Parquet (Zstandard) | 10–20% | $230–$460 |
| ORC (Zlib) | 12–18% | $276–$414 |
The compression ratio varies significantly by data type. Integer and timestamp columns compress very well (10:1 or better in Parquet). High-cardinality string columns like UUIDs compress poorly (2:1 at best).
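The table's figures follow from one multiplication. As a minimal sketch, assuming the $0.023 per GB-month price above and decimal units (1 TB = 1,000 GB); `monthly_storage_cost_usd` is a helper name introduced here:

```python
PRICE_PER_GB_MONTH = 0.023  # object storage price used in this section


def monthly_storage_cost_usd(
    raw_tb: float,
    relative_size: float,
    price_per_gb_month: float = PRICE_PER_GB_MONTH,
) -> float:
    """Monthly cost of storing `raw_tb` of raw data at a given compressed relative size."""
    return raw_tb * 1000 * relative_size * price_per_gb_month


csv_cost = monthly_storage_cost_usd(100, 1.00)      # uncompressed CSV
parquet_cost = monthly_storage_cost_usd(100, 0.25)  # Parquet + Snappy at ~25% of raw

print(f"CSV:     ${csv_cost:,.0f}/month")
print(f"Parquet: ${parquet_cost:,.0f}/month")
print(f"Saved:   ${(csv_cost - parquet_cost) * 12:,.0f}/year")
```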
Tiered storage: S3 offers multiple storage classes with different cost/access-latency trade-offs:
| Class | Cost/GB/mo | Retrieval | Use Case |
|---|---|---|---|
| S3 Standard | $0.023 | Immediate | Hot data, last 30 days |
| S3 Standard-IA | $0.0125 | Immediate, $0.01/GB fee | Warm data, 30–90 days |
| S3 Glacier Instant | $0.004 | Milliseconds | Cold data, accessed occasionally |
| S3 Glacier Flexible | $0.0036 | Minutes (expedited) to hours (standard) | Archive, infrequent access |
| S3 Glacier Deep Archive | $0.00099 | 12–48 hours | Compliance archives |
S3 Intelligent-Tiering moves objects between tiers automatically based on access patterns ($0.0025/1000 objects per month monitoring fee). For data lakes with mixed hot/cold patterns, Intelligent-Tiering often saves 30–50% on storage costs.
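To see how tiering changes the bill, a blended monthly cost can be computed from the per-GB prices in the table. The hot/warm/cold split below is a hypothetical example, and the sketch ignores retrieval fees, request charges, and minimum storage durations.

```python
# Per-GB monthly prices taken from the storage class table above.
TIER_PRICE_PER_GB_MONTH = {
    "standard": 0.023,
    "standard_ia": 0.0125,
    "glacier_instant": 0.004,
}


def blended_monthly_cost_usd(total_tb: float, tier_fractions: dict) -> float:
    """Monthly storage cost when data is split across tiers by the given fractions."""
    if abs(sum(tier_fractions.values()) - 1.0) > 1e-9:
        raise ValueError("tier fractions must sum to 1")
    total_gb = total_tb * 1000
    return sum(
        total_gb * fraction * TIER_PRICE_PER_GB_MONTH[tier]
        for tier, fraction in tier_fractions.items()
    )


all_standard = blended_monthly_cost_usd(100, {"standard": 1.0})
# Hypothetical split: 20% hot, 30% warm, 50% cold.
tiered = blended_monthly_cost_usd(
    100, {"standard": 0.2, "standard_ia": 0.3, "glacier_instant": 0.5}
)
saving_pct = 100 * (1 - tiered / all_standard)
print(f"All Standard: ${all_standard:,.0f}/month")
print(f"Tiered:       ${tiered:,.0f}/month ({saving_pct:.0f}% less)")
```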
Partitioning Strategy
Partitioning is the single most impactful performance and cost optimization for large datasets. A partition is a subset of data stored in a separate directory with a key-value path: s3://bucket/events/date=2024-01-15/country=US/part-00001.parquet.
When a query filters on date = '2024-01-15', the query engine only scans the date=2024-01-15 partitions. In BigQuery, only those partitions are billed. In Spark, only those files are scheduled for tasks.
Partition pruning math: A 10TB table partitioned by date (365 partitions/year). Assuming evenly sized partitions, a query filtering to one day reads:
$$\frac{10\ \text{TB}}{365} \approx 27.4\ \text{GB}$$
versus a full scan of 10 TB. The partition elimination makes this query 365x cheaper to run and 365x less data for the executors to process.
Choosing the right partition key:
- Date (most common): partition by event_date, transaction_date. Almost every analytical query has a time range filter. Date partitioning is nearly always the right first choice.
- Date + one low-cardinality column: a common pattern is date=2024-01-15/region=EMEA. Enables efficient regional queries.
- Avoid high-cardinality partitioning: partitioning by user_id or event_id creates millions of tiny files. This is "partition explosion" - the Parquet file reader overhead dominates, and queries become slower than a non-partitioned table.
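A quick sanity check before choosing partition keys is to estimate how many partitions a scheme creates and how large each one would be on average. The sketch below is a rough upper bound: it assumes every key combination occurs and that data is spread evenly. The 128 MiB floor and the helper names are assumptions made for this example.

```python
import math

MIN_AVG_PARTITION_BYTES = 128 * 1024**2  # assumed floor for a healthy average partition size


def partition_layout(total_bytes: int, cardinalities: dict) -> tuple:
    """Return (partition count, average bytes per partition) for a partitioning scheme."""
    partitions = math.prod(cardinalities.values())
    return partitions, total_bytes / partitions


def is_partition_explosion(total_bytes: int, cardinalities: dict) -> bool:
    """Flag schemes whose average partition falls below the assumed size floor."""
    _, avg_bytes = partition_layout(total_bytes, cardinalities)
    return avg_bytes < MIN_AVG_PARTITION_BYTES


TABLE_BYTES = 10 * 1024**4  # 10 TiB table

by_date_country = {"event_date": 365, "country_code": 200}
by_user = {"user_id": 10_000_000}

for name, scheme in [("date+country", by_date_country), ("user_id", by_user)]:
    count, avg = partition_layout(TABLE_BYTES, scheme)
    print(f"{name}: {count:,} partitions, ~{avg / 1024**2:.1f} MiB each, "
          f"explosion={is_partition_explosion(TABLE_BYTES, scheme)}")
```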
Hive-style vs Iceberg hidden partitioning:
Hive-style partitioning (the directory naming convention above) requires queries to know the partition columns and filter on them explicitly. If you change from daily partitioning to hourly partitioning, all existing queries break.
Apache Iceberg's hidden partitioning stores the partition logic in the table metadata, not the directory structure. The table knows to partition event_timestamp by DAY. Queries filter on event_timestamp > '2024-01-15' - the partition is applied automatically, invisibly. If you evolve from daily to hourly partitioning, no queries need to change.
Shuffle Cost in Spark
The shuffle is Spark's most expensive operation. When Spark needs to redistribute data across executors - for a groupBy, join, or repartition - every executor must write its shuffle data to disk, and every executor must read shuffle data from all other executors over the network.
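For a Spark job with 1 TB of data and a shuffle, and no map-side reduction, roughly 1 TB is written to local disk on the map side, 1 TB crosses the network, and 1 TB is read on the reduce side.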
The shuffle is bounded by network bandwidth, not CPU. A cluster with 100 Gb/s network can sustain ~12 GB/s of shuffle throughput. Shuffling 1 TB takes at minimum ~83 seconds just for the network transfer, plus disk I/O on both ends.
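The lower bound above is a unit conversion plus a division. A minimal sketch, assuming the full link bandwidth is available to the shuffle (real clusters will do worse); with exactly 12.5 GB/s the bound is 80 seconds, and rounding the bandwidth down to ~12 GB/s gives the ~83 seconds quoted in the text.

```python
def min_shuffle_seconds(shuffle_bytes: float, network_gbit_per_s: float) -> float:
    """Lower bound on shuffle time from network transfer alone (no disk I/O, no overhead)."""
    bytes_per_second = network_gbit_per_s * 1e9 / 8  # gigabits/s -> bytes/s
    return shuffle_bytes / bytes_per_second


ONE_TB = 1e12  # decimal terabyte

fast_network = min_shuffle_seconds(ONE_TB, network_gbit_per_s=100)
slow_network = min_shuffle_seconds(ONE_TB, network_gbit_per_s=10)
print(f"100 Gb/s: at least {fast_network:.0f} s")
print(f" 10 Gb/s: at least {slow_network:.0f} s")
```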
Reducing shuffle with broadcast joins:
A standard join between a 1TB fact table and a 1GB dimension table requires shuffling both tables by the join key - 1TB + 1GB of shuffle. A broadcast join copies the small table to every executor in memory - the large table never shuffles. Spark executes the join locally on each executor with no network transfer for the large table.
The threshold for automatic broadcast joins in Spark is controlled by spark.sql.autoBroadcastJoinThreshold (default: 10MB). Tables under this size are automatically broadcast. For tables up to ~1GB, explicit broadcasting is worth considering:
from pyspark.sql.functions import broadcast
# Explicit broadcast join - no shuffle of dimension table
result = fact_df.join(broadcast(dim_df), "merchant_id")
Pre-partitioning with bucketing: If you repeatedly join two large tables on the same key, you can pre-partition both tables by that key (bucketing in Hive/Spark). Subsequent joins on that key require no shuffle - the data is already co-located by key on disk.
Data Skew
Data skew occurs when one partition (or a few partitions) contains orders of magnitude more data than the others. In Spark, each partition is processed by one task. A skewed partition means one task runs for hours while all others finish in minutes.
Diagnosing skew: In the Spark UI, open the detail page of a shuffle stage and look at its task table and summary metrics. If the max task duration is 10x the median task duration, you have skew. The size/records columns (for example "Shuffle Read Size / Records") show how much data each task handled - one task with 10x the rows of the others is the smoking gun.
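The same max-versus-median rule can be applied to task durations exported from the Spark UI or event logs. This is a small sketch in plain Python; the function names are introduced here, and the sample durations mirror the opening story (499 tasks at about 2 minutes, one at 3 hours).

```python
from statistics import median


def skew_ratio(task_durations_s: list) -> float:
    """Ratio of the slowest task to the median task in a stage."""
    return max(task_durations_s) / median(task_durations_s)


def is_skewed(task_durations_s: list, threshold: float = 10.0) -> bool:
    """Apply the 10x max/median rule of thumb from the text."""
    return skew_ratio(task_durations_s) >= threshold


# 499 tasks finishing in ~2 minutes and one straggler running for 3 hours.
durations = [120.0] * 499 + [3 * 3600.0]
print(f"max/median = {skew_ratio(durations):.0f}x, skewed = {is_skewed(durations)}")
```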
Salting technique for skewed join keys:
from pyspark.sql import functions as F
# Without salting: all rows with merchant_id = "AMAZON" go to one partition
result = transactions.join(merchants, "merchant_id")
# With salting: distribute AMAZON rows across 10 partitions
SALT_FACTOR = 10
# Add random salt to fact table
transactions_salted = transactions.withColumn(
"salted_merchant_id",
F.concat(
F.col("merchant_id"),
F.lit("_"),
(F.rand() * SALT_FACTOR).cast("int").cast("string")
)
)
# Expand dimension table with all salt values
# Each merchant gets SALT_FACTOR copies, one per salt value
salt_values = spark.range(SALT_FACTOR).toDF("salt")
merchants_expanded = merchants.crossJoin(salt_values).withColumn(
"salted_merchant_id",
F.concat(
F.col("merchant_id"),
F.lit("_"),
F.col("salt").cast("string")
)
).drop("salt", "merchant_id")  # drop the duplicate key so the join output keeps one merchant_id
# Join on salted key - AMAZON rows now distributed across 10 partitions
result = transactions_salted.join(
merchants_expanded,
"salted_merchant_id"
).drop("salted_merchant_id")
The cost of salting: the dimension table expands by SALT_FACTOR. For a 200MB dimension table with salt factor 10, it becomes 2GB. This is acceptable if the 2GB fits in executor memory (broadcast join threshold). For larger dimension tables, lower salt factors or adaptive query execution (AQE) may be more appropriate.
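The effect of salting on partition sizes can be illustrated without a cluster. The sketch below is a plain-Python simulation, not Spark's actual partitioner: it uses CRC32 modulo the partition count as a stand-in hash, and the 80/20 key distribution and partition count are made-up example values.

```python
import random
import zlib
from collections import Counter


def partition_of(key: str, num_partitions: int) -> int:
    """Stand-in for hash partitioning (deterministic CRC32, not Spark's hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def max_partition_rows(keys: list, num_partitions: int) -> int:
    """Row count of the largest partition after hash partitioning the keys."""
    counts = Counter(partition_of(k, num_partitions) for k in keys)
    return max(counts.values())


rng = random.Random(42)
NUM_PARTITIONS = 200
SALT_FACTOR = 10

# 80% of rows share one hot key; the rest are spread over 10,000 merchants.
keys = ["AMAZON"] * 80_000 + [f"MERCHANT_{rng.randrange(10_000)}" for _ in range(20_000)]
salted_keys = [f"{k}_{rng.randrange(SALT_FACTOR)}" for k in keys]

unsalted_max = max_partition_rows(keys, NUM_PARTITIONS)
salted_max = max_partition_rows(salted_keys, NUM_PARTITIONS)
print(f"Largest partition without salt: {unsalted_max:,} rows")
print(f"Largest partition with salt:    {salted_max:,} rows")
```

Without salt, every hot-key row lands in one partition. With salt, the hot key becomes `SALT_FACTOR` distinct keys, so the largest partition shrinks unless several salted keys happen to hash to the same partition.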
Adaptive Query Execution (AQE): Spark 3.0+ includes AQE, which can dynamically coalesce partitions during shuffle (reducing skew), automatically convert shuffle joins to broadcast joins when possible, and optimize skewed joins. Enable it with:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
AQE is not a silver bullet - it handles moderate skew well but severe skew (one partition with 1000x more data than others) still requires manual salting.
Caching Strategy
Caching stores the result of a computation in memory (or disk) so repeated reads do not recompute or re-read from source. The trade-off is memory/storage cost for reduced latency and compute cost.
What to cache:
- Frequently queried aggregates: a daily summary table queried 1000 times/day. Cache as a materialized view.
- Small dimension tables: reference data (product catalog, merchant metadata) used in many joins. Cache in executor memory.
- Intermediate Spark DataFrames: when the same DataFrame is used in multiple downstream operations, cache it to avoid recomputing.
What not to cache:
- Large, infrequently queried tables: caching a 10TB table that is queried once/day wastes cache storage.
- Real-time data: cached data becomes stale. If freshness matters more than latency, do not cache.
- Highly skewed access patterns: if 80% of queries hit 1% of the data, cache that 1% (a "hot" subset), not everything.
Materialized views in BigQuery:
-- Create a materialized view - BigQuery computes and caches the result
CREATE MATERIALIZED VIEW `project.dataset.daily_transaction_summary`
OPTIONS (
enable_refresh = true,
refresh_interval_minutes = 60
)
AS
SELECT
DATE(transaction_timestamp) AS date,
country_code,
transaction_type,
SUM(amount) AS total_amount,
COUNT(*) AS num_transactions,
COUNTIF(is_fraud) AS fraud_count
FROM `project.dataset.transactions`
WHERE amount > 0
GROUP BY 1, 2, 3;
Queries against the materialized view do not scan the source transactions table - they read the precomputed result. You pay for the materialized view's storage and refreshes, and each query is billed for the bytes it scans in the view rather than in the source. For a 10TB source table queried with the above aggregation, the materialized view might be 10GB - 1000x smaller. Query cost drops from $50 to $0.05.
Code: BigQuery Cost Estimation with dry_run
from google.cloud import bigquery
from google.cloud.bigquery import QueryJobConfig
client = bigquery.Client()
def estimate_query_cost(
    query: str,
    price_per_tb: float = 5.0
) -> dict:
    """
    Estimate BigQuery query cost using dry_run mode.
    No data is scanned, no charge is incurred.
    """
    job_config = QueryJobConfig(
        dry_run=True,           # No execution - just cost estimate
        use_query_cache=False,  # Get accurate estimate, not cached result
    )
    job = client.query(query, job_config=job_config)
    bytes_processed = job.total_bytes_processed
    tb_processed = bytes_processed / (1024 ** 4)
    estimated_cost = tb_processed * price_per_tb
    return {
        "bytes_processed": bytes_processed,
        "gb_processed": bytes_processed / (1024 ** 3),
        "tb_processed": tb_processed,
        "estimated_cost_usd": estimated_cost,
    }
# Example: compare full scan vs partitioned query cost
full_scan_query = """
SELECT
DATE(transaction_timestamp) AS date,
country_code,
SUM(amount) AS total
FROM `my-project.transactions.events`
GROUP BY 1, 2
"""
partitioned_query = """
SELECT
DATE(transaction_timestamp) AS date,
country_code,
SUM(amount) AS total
FROM `my-project.transactions.events`
WHERE DATE(transaction_timestamp) = '2024-01-15'
GROUP BY 1, 2
"""
full_cost = estimate_query_cost(full_scan_query)
partitioned_cost = estimate_query_cost(partitioned_query)
print(f"Full table scan:")
print(f" Data scanned: {full_cost['gb_processed']:.1f} GB")
print(f" Estimated cost: ${full_cost['estimated_cost_usd']:.2f}")
print(f"\nPartitioned query (single day):")
print(f" Data scanned: {partitioned_cost['gb_processed']:.1f} GB")
print(f" Estimated cost: ${partitioned_cost['estimated_cost_usd']:.2f}")
ratio = full_cost['bytes_processed'] / max(partitioned_cost['bytes_processed'], 1)
print(f"\nPartition pruning ratio: {ratio:.0f}x cheaper")
# Use in CI/CD: fail builds if a query exceeds cost threshold
MAX_COST_USD = 10.0
for query_name, query in [("full_scan", full_scan_query), ("partitioned", partitioned_query)]:
    estimate = estimate_query_cost(query)
    if estimate["estimated_cost_usd"] > MAX_COST_USD:
        raise ValueError(
            f"Query '{query_name}' estimated cost ${estimate['estimated_cost_usd']:.2f} "
            f"exceeds threshold ${MAX_COST_USD:.2f}. "
            f"Add partition filter or column selection."
        )
Code: Spark Broadcast Join vs Shuffle Join
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast
import time
# Disable auto-broadcast so the baseline join really shuffles (fair comparison)
spark = SparkSession.builder \
    .appName("JoinOptimization") \
    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
    .getOrCreate()
# Simulate fact table: 100M rows, 10GB
transactions = spark.range(100_000_000).toDF("transaction_id") \
.withColumn("merchant_id", (F.rand() * 10_000).cast("int")) \
.withColumn("amount", F.rand() * 1000) \
.withColumn("user_id", (F.rand() * 1_000_000).cast("long"))
# Simulate dimension table: 10K rows, ~200KB
merchants = spark.range(10_000).toDF("merchant_id") \
.withColumn("merchant_name", F.concat(F.lit("Merchant_"), F.col("merchant_id").cast("string"))) \
.withColumn("category", F.element_at(
F.array(F.lit("food"), F.lit("retail"), F.lit("travel"), F.lit("tech")),
((F.rand() * 4 + 1).cast("int"))
))
# --- Shuffle Join (baseline) ---
print("Running shuffle join...")
start = time.perf_counter()
shuffle_result = transactions.join(merchants, "merchant_id") \
.groupBy("category") \
.agg(F.sum("amount").alias("total_amount")) \
.collect()
shuffle_time = time.perf_counter() - start
print(f"Shuffle join: {shuffle_time:.1f}s")
# --- Broadcast Join ---
print("\nRunning broadcast join...")
start = time.perf_counter()
broadcast_result = transactions.join(broadcast(merchants), "merchant_id") \
.groupBy("category") \
.agg(F.sum("amount").alias("total_amount")) \
.collect()
broadcast_time = time.perf_counter() - start
print(f"Broadcast join: {broadcast_time:.1f}s")
print(f"Speedup: {shuffle_time / broadcast_time:.1f}x")
# Typical results on 8-core machine:
# Shuffle join: 87.3s (network shuffle of 10GB fact table)
# Broadcast join: 18.4s (broadcast 200KB dimension table, no shuffle)
# Speedup: 4.7x
# --- Cost Comparison (EMR, r5.4xlarge at $1.008/hr, 10 nodes) ---
cost_per_hour = 1.008 * 10
shuffle_cost = (shuffle_time / 3600) * cost_per_hour
broadcast_cost = (broadcast_time / 3600) * cost_per_hour
print(f"\nCost for 1000 daily runs:")
print(f" Shuffle join: ${shuffle_cost * 1000:.0f}")
print(f" Broadcast join: ${broadcast_cost * 1000:.0f}")
print(f" Annual savings: ${(shuffle_cost - broadcast_cost) * 365_000:.0f}")
Code: Salting for Skewed Keys in PySpark
import time  # used for the timing measurements below

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("SkewFix").getOrCreate()

def create_skewed_dataset():
    """Simulate a dataset where merchant MEGA_CORP has 80% of transactions."""
    normal = spark.range(2_000_000).toDF("id") \
        .withColumn("merchant_id", F.concat(F.lit("MERCHANT_"),
                    (F.rand() * 9_999).cast("int").cast("string"))) \
        .withColumn("amount", F.rand() * 100)
    skewed = spark.range(8_000_000).toDF("id") \
        .withColumn("merchant_id", F.lit("MEGA_CORP")) \
        .withColumn("amount", F.rand() * 100)
    return normal.union(skewed)

def create_merchants():
    normal = spark.range(10_000).toDF("id") \
        .withColumn("merchant_id", F.concat(F.lit("MERCHANT_"), F.col("id").cast("string"))) \
        .withColumn("merchant_name", F.concat(F.lit("Name_"), F.col("id").cast("string"))) \
        .withColumn("fee_rate", F.lit(0.02))
    mega = spark.createDataFrame([("MEGA_CORP", "Mega Corporation", 0.015)],
                                 ["merchant_id", "merchant_name", "fee_rate"])
    return normal.select("merchant_id", "merchant_name", "fee_rate").union(mega)
transactions = create_skewed_dataset()
merchants = create_merchants()
# ---- Without salting ----
print("Running skewed join (no salting)...")
start = time.perf_counter()
skewed_result = transactions.join(merchants, "merchant_id") \
.withColumn("fee", F.col("amount") * F.col("fee_rate")) \
.groupBy("merchant_id") \
.agg(F.sum("fee").alias("total_fees")) \
.count()
skewed_time = time.perf_counter() - start
print(f"Skewed join time: {skewed_time:.1f}s")
# ---- With salting ----
SALT_FACTOR = 10
def salt_join(fact_df, dim_df, join_key: str, salt_factor: int):
    """
    Perform a join with salting to fix data skew.
    Args:
        fact_df: Large fact table (may have skewed keys)
        dim_df: Dimension table (smaller)
        join_key: Column to join on
        salt_factor: Number of salt values (higher = better skew distribution)
    """
    # Step 1: Add random salt to fact table
    fact_salted = fact_df.withColumn(
        "_salt",
        (F.rand() * salt_factor).cast("int")
    ).withColumn(
        "_salted_key",
        F.concat(F.col(join_key), F.lit("_"), F.col("_salt").cast("string"))
    )
    # Step 2: Expand dimension table with all salt values.
    # Drop the original join key here so the joined result has only one copy of it.
    salts = spark.range(salt_factor).toDF("_salt")
    dim_expanded = dim_df.crossJoin(salts).withColumn(
        "_salted_key",
        F.concat(F.col(join_key), F.lit("_"), F.col("_salt").cast("string"))
    ).drop("_salt", join_key)
    # Step 3: Join on salted key
    result = fact_salted.join(
        F.broadcast(dim_expanded),  # dim table may now be larger - check if still broadcastable
        "_salted_key"
    ).drop("_salt", "_salted_key")
    return result
print("\nRunning salted join...")
start = time.perf_counter()
salted_result = salt_join(transactions, merchants, "merchant_id", SALT_FACTOR) \
.withColumn("fee", F.col("amount") * F.col("fee_rate")) \
.groupBy("merchant_id") \
.agg(F.sum("fee").alias("total_fees")) \
.count()
salted_time = time.perf_counter() - start
print(f"Salted join time: {salted_time:.1f}s")
print(f"Speedup: {skewed_time / salted_time:.1f}x")
Cost Optimization Playbook
10 concrete actions that can reduce cloud data costs by 40–60%:
- Always partition by date (or the primary filter column). Add partition filters in all production queries. A single missing WHERE event_date = ... turns a $0.14 query into a $50 query in BigQuery.
- Select only the columns you need. In BigQuery, if you use 3 columns of a 20-column table with similarly sized columns, selecting just those 3 costs about 15% of the full-scan cost. SELECT * is expensive.
- Convert CSV to Parquet. Raw CSV storage is 4–7x larger than equivalent Parquet with Snappy. For a 100TB data lake, this saves roughly $1,700/month in storage alone. Add compute savings from less data to process.
- Enable Spark AQE. spark.sql.adaptive.enabled=true is a free performance win on Spark 3.0+. It reduces shuffle partitions, handles moderate skew automatically, and converts joins dynamically.
- Use Spot/Preemptible instances for batch jobs. Spark jobs with checkpointing tolerate interruption. AWS Spot instances are 60–80% cheaper than On-Demand. For batch jobs without strict SLAs, Spot is an easy win.
- Identify and fix shuffle-heavy jobs. In the Spark UI, any stage with high "Shuffle Read" is a candidate for optimization: broadcast the smaller side of joins, pre-bucket repeatedly joined tables, increase spark.sql.shuffle.partitions to reduce spilling.
- Set up S3 lifecycle rules. Move data older than 90 days to S3 Standard-IA, older than 1 year to Glacier. For a data lake with 5 years of history, 80% of data is cold - tiering can save 60% on storage.
- Use materialized views for repeated aggregations. If 50 different dashboards query the same aggregation of a 10TB table, compute the aggregation once as a materialized view. Each dashboard reads the 10GB view instead of the 10TB source.
- Cache frequently-read Spark DataFrames. Any DataFrame read more than once in a Spark job should be .cache()'d. Without caching, Spark recomputes the DataFrame from scratch each time it is needed downstream.
- Profile before optimizing. The biggest cost wins come from the biggest bottlenecks. Use Spark UI (stages → shuffle size, task duration), BigQuery Query Plan (bytes processed per stage), and cloud cost dashboards to find the 20% of queries causing 80% of costs. Do not optimize queries that cost $0.01.
Decision Tree Diagram
YouTube Resources
| Title | Channel | Why Watch |
|---|---|---|
| How to Optimize Your Spark Jobs | Data Engineering Podcast | Deep dive into shuffle, skew, AQE - with Spark UI walkthrough |
| BigQuery Cost Optimization Strategies | Google Cloud Tech | Official guide to partitioning, clustering, and materialized views |
| Apache Spark Performance Tuning | Databricks | Broadcast joins, AQE, data skew - from the Spark maintainers |
| Data Skew in Apache Spark Explained | DataWithBaraa | Clear explanation of skew diagnosis and salting fix |
| Cloud Data Engineering Cost Optimization | Seattle Data Guy | Real cost numbers from production BigQuery, Redshift, Snowflake |
Production Engineering Notes
The Spark UI Is Your Best Tool
Every Spark job you tune should start with the Spark UI. The stages tab shows you: which stages take the most time (sort by "Duration"), which stages have the most shuffle (sort by "Shuffle Read"), and which tasks within a stage are slow (look at max task duration vs median). The "SQL" tab shows the physical query plan with estimated row counts and bytes at each step - discrepancies between estimated and actual counts indicate statistics are stale (run ANALYZE TABLE to fix).
The Spark UI is ephemeral - it disappears when the cluster terminates. For post-hoc analysis, persist Spark event logs to S3 and use the Spark History Server or Databricks SQL analytics.
BigQuery Slot Utilization vs Cost
BigQuery on-demand pricing charges per byte scanned. BigQuery reservations charge per slot-hour. A slot is a unit of compute - approximately one virtual CPU. For high-volume workloads (querying 100+ TB/day), reservations are significantly cheaper than on-demand pricing.
The break-even calculation: if your daily on-demand spend exceeds slots × $0.05 / hour × 24, reservations are cheaper. For most data engineering teams running scheduled pipelines, reservations with slot sharing across workloads are the right model. On-demand is appropriate for development and ad-hoc analytics with unpredictable query volumes.
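The break-even rule can be written as a direct comparison. This is a sketch under the text's assumed $0.05 per slot-hour and this lesson's $5 per TB on-demand rate; the slot count and daily volumes are hypothetical, and actual reservation pricing depends on edition and commitment.

```python
def reservation_daily_cost_usd(slots: int, price_per_slot_hour: float = 0.05) -> float:
    """Daily cost of keeping `slots` reserved around the clock."""
    return slots * price_per_slot_hour * 24


def reservation_is_cheaper(
    daily_on_demand_usd: float,
    slots: int,
    price_per_slot_hour: float = 0.05,
) -> bool:
    """True when daily on-demand spend exceeds the daily reservation cost."""
    return daily_on_demand_usd > reservation_daily_cost_usd(slots, price_per_slot_hour)


SLOTS = 500  # hypothetical reservation size
daily_reservation = reservation_daily_cost_usd(SLOTS)

for tb_per_day in (100, 200):
    on_demand = tb_per_day * 5.0  # $5 per TB scanned
    print(f"{tb_per_day} TB/day: on-demand ${on_demand:,.0f} vs reservation "
          f"${daily_reservation:,.0f} -> reserve={reservation_is_cheaper(on_demand, SLOTS)}")
```

The comparison only holds if the reserved slots can actually finish the workload in time; too few slots will make queries queue rather than fail.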
Parquet File Size Matters
Parquet files that are too small cause problems: the file reader overhead (opening files, reading footers, deserializing row group metadata) dominates over actual data reading. Parquet files that are too large prevent parallelism: Spark cannot split a single file across multiple tasks.
The right Parquet file size is 128MB–1GB per file. Spark's target partition size is controlled by spark.sql.files.maxPartitionBytes (default 128MB). After writing Parquet, if you have many small files (common with streaming pipelines), run a compaction job periodically:
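# Compact small files into target-size Parquet files.
# Adjust repartition(100) to target ~512MB per file.
# Write to a separate path (events_compacted is a hypothetical staging location):
# overwriting the same path that is being read is unsafe.
spark.read.parquet("s3://bucket/events/date=2024-01-15/") \
    .repartition(100) \
    .write \
    .mode("overwrite") \
    .parquet("s3://bucket/events_compacted/date=2024-01-15/")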
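The repartition count for a compaction job can be derived from the partition's total size and the target file size rather than guessed. A minimal sketch, assuming a 512 MiB target and a hypothetical 50 GiB daily partition:

```python
import math

TARGET_FILE_BYTES = 512 * 1024**2  # assumed ~512MB target per output file


def target_file_count(total_bytes: int, target_file_bytes: int = TARGET_FILE_BYTES) -> int:
    """Number of output files needed to land near the target file size (at least 1)."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


partition_bytes = 50 * 1024**3  # hypothetical 50 GiB daily partition
num_files = target_file_count(partition_bytes)
print(f"repartition({num_files}) for a {partition_bytes / 1024**3:.0f} GiB partition")
```

The result would be passed to `repartition(...)` before the write. On-disk Parquet size after compression may differ from the in-memory size, so treat the count as a starting point.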
Columnar Predicate Pushdown Is Not Free
Parquet predicate pushdown is free for partition column filters (file-level elimination). For non-partition column filters, Parquet uses row group min/max statistics to skip row groups - but this requires that the data is sorted or clustered by the filter column. Random data provides no row group skipping benefit.
For frequently filtered non-partition columns, write Parquet sorted by those columns. Spark's sortWithinPartitions sorts within each Parquet file without a global sort (which requires a shuffle). BigQuery clustering achieves the same effect for BigQuery tables.
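Why sorting matters for row group skipping can be shown with a small simulation. This sketch models only the min/max check, not a real Parquet reader: values are split into fixed-size "row groups", and a group must be read whenever its [min, max] range overlaps the filter range. Sizes and the filter range are arbitrary example values.

```python
import random


def row_groups_read(values: list, row_group_size: int, lo: int, hi: int) -> int:
    """Count row groups whose [min, max] range overlaps the filter range [lo, hi]."""
    groups_read = 0
    for start in range(0, len(values), row_group_size):
        group = values[start:start + row_group_size]
        if min(group) <= hi and max(group) >= lo:
            groups_read += 1
    return groups_read


rng = random.Random(0)
values = [rng.randrange(1_000_000) for _ in range(100_000)]
ROW_GROUP_SIZE = 10_000  # 10 row groups in total

# Filter: WHERE value BETWEEN 500000 AND 510000 (about 1% of rows)
unsorted_reads = row_groups_read(values, ROW_GROUP_SIZE, 500_000, 510_000)
sorted_reads = row_groups_read(sorted(values), ROW_GROUP_SIZE, 500_000, 510_000)

print(f"Unsorted data: {unsorted_reads} of 10 row groups read")
print(f"Sorted data:   {sorted_reads} of 10 row groups read")
```

With random ordering, every row group spans nearly the full value range, so none can be skipped. With sorted data, only the one or two groups covering the filter range are read.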
Common Mistakes
:::danger Missing partition filters in BigQuery (the most expensive mistake)
-- WRONG: full table scan of 10TB table - costs $50 per run
SELECT user_id, SUM(amount)
FROM `project.transactions.events`
WHERE country_code = 'US'
GROUP BY user_id;
-- RIGHT: partition filter limits scan to 1 day - costs $0.14
SELECT user_id, SUM(amount)
FROM `project.transactions.events`
WHERE country_code = 'US'
AND DATE(event_timestamp) = '2024-01-15' -- partition filter
GROUP BY user_id;
If your BigQuery table is partitioned by event_timestamp, you must filter on the partition column in every production query. Without the filter, BigQuery scans the entire table. Monitor query costs with INFORMATION_SCHEMA.JOBS and alert when a query exceeds a cost threshold.
:::
:::danger Forgetting to persist (cache) DataFrames used multiple times in Spark
# WRONG: expensive DataFrame computed twice
features = transactions.join(merchants, "merchant_id") \
.withColumn("fee", F.col("amount") * F.col("fee_rate"))
# Each action triggers recomputation from source
count = features.count() # full recomputation
fraud_rate = features.filter("is_fraud").count() / count # full recomputation again
# RIGHT: cache after expensive computation
features = transactions.join(merchants, "merchant_id") \
.withColumn("fee", F.col("amount") * F.col("fee_rate"))
features.cache()
features.count() # triggers computation and caching
count = features.count() # served from cache
fraud_rate = features.filter("is_fraud").count() / count # served from cache
# IMPORTANT: always unpersist when done
features.unpersist()
Every Spark action re-triggers the entire lineage unless the DataFrame is cached. For pipelines with branching or multiple downstream operations on the same base DataFrame, forgetting .cache() can 2–5x your compute time.
:::
:::warning Partition explosion from high-cardinality partition keys
# WRONG: partition by user_id - millions of partitions, millions of tiny files
df.write.partitionBy("user_id").parquet("output/")
# Creates 10M directories with 10M files, each ~1KB
# Every query opens 10M files - slower than no partitioning
# RIGHT: partition by date (and optionally one categorical column)
df.write.partitionBy("event_date", "country_code").parquet("output/")
# Creates 365 * 200 = 73,000 partitions - manageable
# Each file is 10–500MB - efficient
Partition cardinality should be in the hundreds to tens of thousands. Millions of partitions kill query performance because file listing overhead dominates. The symptom is that queries against a partitioned table are slower than the same query against a non-partitioned table.
:::
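Before choosing partition keys, it can help to estimate how many partitions a scheme produces and how large each one would be. The sketch below assumes a hypothetical 5 TB dataset spread evenly across partitions; `partition_layout` is an illustrative helper, not a Spark function.

```python
def partition_layout(total_bytes, cardinalities):
    """Return (partition_count, average_bytes_per_partition).

    `cardinalities` lists the number of distinct values of each partition
    column; the partition count is their product.
    """
    count = 1
    for c in cardinalities:
        count *= c
    return count, total_bytes / count

DATASET_BYTES = 5e12  # assumption: 5 TB dataset, evenly distributed

by_user = partition_layout(DATASET_BYTES, [10_000_000])       # partitionBy("user_id")
by_date_country = partition_layout(DATASET_BYTES, [365, 200])  # date x country

for name, (count, avg) in [("user_id", by_user), ("event_date, country_code", by_date_country)]:
    print(f"{name}: {count:,} partitions, ~{avg / 1e6:.1f} MB each")
```

Real data is rarely spread evenly, so treat the average as a first sanity check rather than a guarantee about individual file sizes.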
:::warning Oversizing Spark clusters for jobs that are bottlenecked on network
If a Spark job is bottlenecked on shuffle (network I/O), adding more executors does not help - more executors means more network connections and potentially more shuffle data. The bottleneck is the network bandwidth per executor, not the number of executors.
For shuffle-heavy jobs, the right optimization is reducing the shuffle itself (broadcast joins, pre-bucketing, better partitioning), not adding more machines. Diagnose before scaling: if adding 2x machines reduces runtime by less than 1.5x, you have a bottleneck that is not compute-bound.
:::
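The 2x-machines rule of thumb above can be turned into a small check. This is a sketch only: the 1.5x threshold comes from the text, while the function names and the sample runtimes are made up for illustration.

```python
def speedup(runtime_before, runtime_after):
    """Speedup factor after a change, e.g. 2.0 means twice as fast."""
    return runtime_before / runtime_after

def looks_compute_bound(runtime_before, runtime_after, threshold=1.5):
    """Rule of thumb: doubling machines should yield at least a 1.5x speedup.

    A smaller speedup suggests the job is limited by something other than
    compute, such as shuffle traffic over the network.
    """
    return speedup(runtime_before, runtime_after) >= threshold

# Hypothetical runtimes in minutes, before and after doubling the cluster.
shuffle_heavy = looks_compute_bound(120, 100)   # 1.2x speedup
cpu_heavy = looks_compute_bound(120, 65)        # ~1.85x speedup

print("shuffle-heavy job scales with machines:", shuffle_heavy)
print("cpu-heavy job scales with machines:", cpu_heavy)
```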
Interview Q&A
Q1: Your Spark job takes 4 hours and costs $500 per run. Walk through how you diagnose and fix it.
Start with diagnosis before making any changes. The Spark UI is the first stop:
Step 1: Find the bottleneck stage. In the Spark UI stages tab, sort by duration. One stage will dominate - that is where you spend your time. Note whether it is a shuffle stage (non-zero Shuffle Read or Shuffle Write bytes) or a compute stage.
Step 2: Check for skew. Click into the slow stage, look at the Tasks tab. If the max task duration is 10x+ the median, you have data skew. Check "Input Size / Records" per task to confirm one partition has far more data.
Step 3: Check the query plan. In the SQL tab, look at the physical plan. Sort-merge joins on large tables are expensive. Identify joins where one side is small enough to broadcast (under 200MB typically).
Step 4: Check shuffle metrics. Large "Shuffle Read" bytes in a stage indicates excessive data movement. This can come from too many GROUP BY columns, unnecessary repartition calls, or Cartesian joins.
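The skew check in Step 2 boils down to comparing the slowest task with the median. A minimal sketch, assuming the per-task durations have already been copied out of the Spark UI into a list; the numbers echo the opening story (499 tasks under 2 minutes, one task at 3 hours) and `skew_ratio` is a hypothetical helper.

```python
from statistics import median

def skew_ratio(task_durations_s):
    """Ratio of the slowest task to the median task duration."""
    return max(task_durations_s) / median(task_durations_s)

# Hypothetical durations in seconds for one stage with 500 tasks.
durations = [110] * 499 + [10_800]

ratio = skew_ratio(durations)
is_skewed = ratio >= 10   # 10x+ over the median, per Step 2

print(f"max/median = {ratio:.1f}x, skewed: {is_skewed}")
```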
Common fixes:
- If skew detected: apply salting for the skewed join key, or enable spark.sql.adaptive.skewJoin.enabled=true
- If large shuffle join: convert to broadcast join with broadcast(small_df)
- If the shuffle partition count does not fit the data size: set it explicitly (for example 200 or 2000, depending on data size) with spark.conf.set("spark.sql.shuffle.partitions", "2000") instead of leaving the default of 200 in place
- If reading too much data: add partition filters, select only needed columns
- If recomputing: cache DataFrames used in multiple downstream operations
In the opening example, the cost per run went from $512 to $87. The diagnosis took 20 minutes, the fix took 25 minutes.
Q2: Explain the latency vs throughput vs cost triangle. Give a real example where you made a deliberate trade-off.
The triangle states: you can optimize any two of latency, throughput, and cost, but not all three simultaneously.
Real example: A feature engineering job for a fraud detection model. The SLA says features must be ready within 3 hours of transaction data landing. Current setup: 50-node Spark cluster running for 2.5 hours = $350/run.
If we optimize for cost over latency (while keeping throughput): move to a 20-node cluster. Runtime increases to 5 hours - violates the 3-hour SLA. Cost drops to $140/run. This trade-off is unacceptable.
If we optimize for latency and throughput within the cost constraint: use Spark AQE + broadcast join optimization (no cost change), reducing runtime to 1.5 hours on the 50-node cluster. Or switch to DuckDB on a single high-memory machine: runtime of 2.1 hours, within the SLA, at $105/run instead of $350. This is the acceptable trade-off.
The key insight: always identify your hard constraints (SLA = 3 hours) first, then optimize the free variables (cost, within the constraint). Optimizing for speed beyond the SLA is wasted money.
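The "hard constraint first, then minimize cost" reasoning can be sketched as a filter followed by a minimum. The runtimes and costs come from the example above, with one labeled assumption: the cost of the AQE option is taken to scale linearly with runtime on the same 50-node cluster, which the text does not state.

```python
SLA_HOURS = 3.0

# (name, runtime in hours, cost per run in USD)
options = [
    ("50-node Spark, current", 2.5, 350.0),
    ("20-node Spark", 5.0, 140.0),
    # Assumption: same cluster, cost proportional to runtime.
    ("50-node Spark + AQE and broadcast join", 1.5, 350.0 * 1.5 / 2.5),
    ("DuckDB, single high-memory machine", 2.1, 105.0),
]

def cheapest_within_sla(options, sla_hours):
    """Drop options that violate the SLA, then pick the cheapest survivor."""
    feasible = [o for o in options if o[1] <= sla_hours]
    return min(feasible, key=lambda o: o[2]) if feasible else None

best = cheapest_within_sla(options, SLA_HOURS)
print(best)
```

The 20-node cluster is second cheapest on paper but never enters the comparison, because it fails the SLA filter before cost is considered.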
Q3: What is data skew in Spark and how do you fix it? Why does salting work?
Data skew occurs when one partition has significantly more rows than others. In Spark, one task processes one partition. If partition A has 1 billion rows and all other partitions have 1 million rows each, task A takes 1000x longer than the others. The entire stage cannot complete until all tasks finish - so you wait for task A while 499 executors sit idle.
Skew typically occurs on join keys with high concentration - a single merchant processing 90% of transactions, a single event type dominating the logs, or a single user generating most of the activity.
Salting works by splitting the skewed key into multiple virtual keys. "AMAZON" becomes "AMAZON_0", "AMAZON_1", ..., "AMAZON_9". The rows with key "AMAZON" are randomly distributed across 10 partitions instead of one. Each partition has 1/10 the rows of the original skewed partition.
For the join to work, the dimension table must also be expanded: the "AMAZON" row in the merchant table is duplicated 10 times, one for each salt value. When you join "AMAZON_0" transactions to "AMAZON_0" merchants, "AMAZON_1" to "AMAZON_1", etc., you get the correct result.
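The trade-off: the dimension table grows by SALT_FACTOR. A 200MB dimension table with salt factor 10 becomes 2GB if every row is replicated; replicating only the rows for the skewed keys keeps the growth far smaller. A 2GB table can still be broadcast when the driver and executors have memory to spare, but it is far above Spark's default automatic broadcast threshold of 10MB, so the broadcast has to be requested explicitly. The fact table's salted key column adds a small amount of memory overhead. The key metric: if the skewed partition is 10x larger than average, a salt factor of 10 should make all partitions roughly equal in size.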
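The mechanics of salting can be illustrated without a cluster. The following plain-Python sketch is not Spark code: it salts every key for simplicity (a real job would usually salt only the skewed keys), and the merchant names, fee rates, and row counts are invented. It checks two things: the salted join yields the same result as the plain join, and the largest key group shrinks by roughly the salt factor.

```python
import random
from collections import Counter

SALT_FACTOR = 10
random.seed(7)

# Fact rows: (merchant, amount). "AMAZON" is the skewed key.
transactions = [("AMAZON", 1.0)] * 9_000 + [("ETSY", 1.0)] * 500 + [("EBAY", 1.0)] * 500
merchants = {"AMAZON": 0.02, "ETSY": 0.05, "EBAY": 0.03}  # merchant -> fee_rate

# 1. Salt the fact side: append a random suffix 0..SALT_FACTOR-1 to each key.
salted_tx = [(f"{m}_{random.randrange(SALT_FACTOR)}", amt) for m, amt in transactions]

# 2. Expand the dimension side: one copy of each row per salt value.
salted_merchants = {
    f"{m}_{s}": rate for m, rate in merchants.items() for s in range(SALT_FACTOR)
}

# 3. Join on the salted key and compare with the unsalted join.
plain_fees = sum(amt * merchants[m] for m, amt in transactions)
salted_fees = sum(amt * salted_merchants[k] for k, amt in salted_tx)

largest_before = max(Counter(m for m, _ in transactions).values())
largest_after = max(Counter(k for k, _ in salted_tx).values())

print(f"fees match: {abs(plain_fees - salted_fees) < 1e-9}")
print(f"largest key group: {largest_before} rows before, {largest_after} after salting")
```

The dimension dictionary grows from 3 to 30 entries, which is the SALT_FACTOR growth described above.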
Q4: When is BigQuery better than Spark, and when is Spark better than BigQuery?
BigQuery wins when:
- Workloads are ad-hoc or highly variable in query patterns - no cluster to right-size
- Queries are purely SQL (joins, aggregations, window functions) - no custom code needed
- The team does not have Spark expertise - BigQuery needs no infrastructure management
- Queries are infrequent enough that on-demand pricing is cheaper than reservation
- Fast individual query latency matters (BigQuery often starts in under 5 seconds vs Spark cluster startup of 3–5 minutes)
Spark wins when:
- You need custom Python/Scala/Java code in the transformation (ML model scoring, complex UDFs, streaming)
- You run the same heavy transformation repeatedly on large data - bulk compute is cheaper with reserved instances than per-byte pricing
- You need exact control over execution (custom partitioning, specific join strategies)
- You need Spark's streaming capabilities (Spark Structured Streaming for Kafka integration)
- The transformation involves multiple hops and intermediate materializations - Spark handles DAG execution natively
The practical boundary: under 1TB per query and SQL-expressible → BigQuery. Over 1TB repeatedly or needing custom code → Spark.
Q5: What is partition pruning and how does it affect both cost and performance?
Partition pruning is the query engine's ability to skip entire partitions (directories of files) that cannot contain rows matching the query's filter predicates.
In a Parquet dataset partitioned by event_date, stored as:
s3://data-lake/events/event_date=2024-01-01/part-001.parquet
s3://data-lake/events/event_date=2024-01-02/part-001.parquet
...
s3://data-lake/events/event_date=2024-12-31/part-001.parquet
A query with WHERE event_date = '2024-01-15' triggers partition pruning: the query engine reads the directory listing, identifies that only event_date=2024-01-15 matches, and schedules tasks only for those files. The other 364 directories are never read.
The performance impact: instead of reading 365 files (~10TB), the query reads 1 file (~27GB). Runtime drops from hours to minutes. In BigQuery, cost drops proportionally (365x cheaper). In Spark, the number of tasks drops 365x, so the same cluster finishes far sooner or a much smaller cluster is enough.
Partition pruning only works when:
- The table is actually partitioned (not just stored in flat directories)
- The query filter is on the partition column
- The filter compares the partition column directly to a literal, not to a computed expression - WHERE DATE(event_timestamp) = '2024-01-15' may not prune efficiently; WHERE event_date = '2024-01-15' (if event_date is the partition column) will prune.
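Conceptually, pruning is a filter over the directory listing that runs before any file is opened. The sketch below imitates that step in plain Python for 365 daily partitions laid out like the paths above; `partition_paths` and `prune` are illustrative names, and real engines do this inside the query planner rather than with string matching.

```python
from datetime import date, timedelta

def partition_paths(start, days, root="s3://data-lake/events"):
    """Build one Hive-style partition path per day, starting at `start`."""
    return [
        f"{root}/event_date={start + timedelta(days=d)}/part-001.parquet"
        for d in range(days)
    ]

def prune(paths, event_date):
    """Keep only files whose partition directory matches the filter literal."""
    needle = f"/event_date={event_date}/"
    return [p for p in paths if needle in p]

paths = partition_paths(date(2024, 1, 1), 365)
selected = prune(paths, "2024-01-15")

print(f"listed {len(paths)} files, scheduled {len(selected)}")
print(selected[0])
```

The filter works because the partition value is encoded in the path. A predicate on a column that only exists inside the files gives the listing step nothing to match against.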
Q6: Explain BigQuery capacity reservations vs on-demand pricing. When should a team switch?
BigQuery on-demand charges $5 per TB scanned. For light usage (a few TB/day), this is cost-effective - you pay only for what you use.
Reservations purchase a fixed number of BigQuery slots (compute units) for a flat monthly price. In 2024, one slot costs roughly $29/month with an annual commitment. Slots are shared across all jobs in the reservation.
The math for switching: If your team processes 1,000 TB/day on on-demand pricing:
Daily cost (on-demand): 1,000 TB × $5/TB = $5,000/day
Monthly cost: ~$150,000
With 1,000 slots (sufficient for moderate query parallelism):
Monthly cost (commitment): 1,000 slots × $29/slot = $29,000/month
Monthly savings: $121,000
The break-even is typically around 200–500 TB/month for most workloads. Below that, on-demand is simpler and cheaper. Above that, reservations save 60–80%.
The caveat: reservations require planning slot capacity. If your query load spikes and you are out of slots, queries queue rather than auto-scaling. On-demand has no such constraint. Many teams use a hybrid: reservations for predictable scheduled workloads, on-demand for ad-hoc analytics.
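The switching math can be parameterized so the break-even point is easy to recompute for a different slot count. This sketch uses the rates quoted in this answer ($5 per TB on demand, $29 per slot per month) and assumes a 30-day month; the function names are hypothetical and current prices should be checked before relying on the output.

```python
ON_DEMAND_USD_PER_TB = 5.0    # rate quoted in this lesson
SLOT_USD_PER_MONTH = 29.0     # annual-commitment rate quoted in this lesson
DAYS_PER_MONTH = 30           # assumption

def monthly_on_demand_usd(tb_per_day):
    return tb_per_day * DAYS_PER_MONTH * ON_DEMAND_USD_PER_TB

def monthly_reservation_usd(slots):
    return slots * SLOT_USD_PER_MONTH

def break_even_tb_per_month(slots):
    """Monthly scan volume at which on-demand cost equals the reservation."""
    return monthly_reservation_usd(slots) / ON_DEMAND_USD_PER_TB

on_demand = monthly_on_demand_usd(1_000)
reserved = monthly_reservation_usd(1_000)
savings = on_demand - reserved

print(f"on-demand: ${on_demand:,.0f}/month")
print(f"reserved:  ${reserved:,.0f}/month")
print(f"savings:   ${savings:,.0f}/month ({savings / on_demand:.0%})")
print(f"break-even for 1,000 slots: {break_even_tb_per_month(1_000):,.0f} TB/month")
```

The break-even volume scales with the number of slots purchased, so the right threshold for a given team depends on how many slots its workload actually needs.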
Summary
Cost and performance in data infrastructure are inseparable. The latency-throughput-cost triangle defines the trade-space for every architectural decision. BigQuery's per-byte pricing makes partitioning and column selection economically critical - not just for performance but for literally how much you pay per query. Spark's per-hour pricing makes shuffle minimization the primary optimization target - shuffle is both the performance bottleneck and the driver of cluster hours billed.
Data skew is the single most common cause of Spark jobs that take 10x longer than they should. The diagnosis is straightforward (Spark UI → slow stage → skewed task duration). The fix - salting for large join key concentration, broadcast joins for small dimension tables, Adaptive Query Execution for automatic handling - is well-understood and reliably effective.
The cost optimization playbook compresses to a few key practices: partition by date and always filter on the partition column, select only the columns you need, use broadcast joins for small dimension tables, enable Spark AQE, and tier cold storage to cheaper storage classes. These five practices alone can reduce cloud data costs by 40–60% on most data lakes. The engineers who master these trade-offs build systems that scale with data volume without scaling the cost proportionally.
