Data Lake and Data Warehouse for ML

Every database decision is a bet on the future shape of your queries. The data lake is what you build when you admit you don't know what shape the queries will take.

The Production Moment

The analytics team had built a clean, well-structured data warehouse in Snowflake. Three years of carefully modeled tables: fact tables, dimension tables, star schema. SQL queries ran in seconds. Dashboards were crisp. The business intelligence team was happy.

Then the ML team arrived.

Their first request: "We need to train a recommendation model. Can we get the raw click logs for the past two years?" The warehouse contained aggregated daily click statistics. The raw events - which user clicked which item at what exact timestamp, with what session context and what device fingerprint - had been thrown away during the ETL transformation. The warehouse had the summaries but not the raw material.

Their second request: "We need to compute 'number of sessions in the last 6 hours' as a feature. Can the warehouse compute that?" The warehouse was optimized for daily aggregations. Hourly queries were slow; 6-hour rolling windows were excruciatingly slow - full table scans with expensive window functions.

Their third request: "We want to store model predictions alongside the user features for training." The strict schema enforcement rejected their attempt to add a float column for a predicted probability alongside categorical features.

The warehouse was built for BI. The ML team needed something different.

This is the story that drove the evolution from data warehouse to data lake to lakehouse: each successive architecture responding to the limitations of the previous one, adding capabilities ML teams needed.

The Evolution of Data Storage for ML

The history of data infrastructure for ML follows four generations, each addressing specific failures of the previous:

Generation 1 - RDBMS (Oracle, PostgreSQL, MySQL): Designed for transactional workloads - row-level operations, ACID guarantees, strong consistency. A poor fit for large-scale ML training: practical capacity is limited (typically gigabytes to a few terabytes per node), storage is optimized for row access (bad for column scans), schema changes are expensive, and support for unstructured data is weak.

Generation 2 - Data Warehouse (Teradata, then Redshift, BigQuery, Snowflake): Columnar storage made analytical queries fast. SQL window functions enabled complex feature computation. Petabyte scale achieved. But: expensive for cold data storage, rigid schema (bad for raw ML data), and the "warehouse wall" - raw data is transformed before storage, losing the ability to reprocess from scratch.

Generation 3 - Data Lake (HDFS, then S3/GCS with Hive): Store everything raw, process on demand. Schema on read. Cheap storage ($0.023/GB/month). But: no ACID transactions (corrupt partial writes), no DML (can't update records), small file problem (thousands of small files kill query performance), and no SQL without setting up a query engine (Hive, Presto, Spark SQL).

Generation 4 - Lakehouse (Delta Lake, Apache Iceberg, Apache Hudi): ACID transactions on object storage, time travel, schema enforcement, DML operations - all while keeping the cost and scale advantages of the data lake. This is where production ML teams are today.

Data Warehouse Deep Dive

A data warehouse stores structured data in a format optimized for analytical queries. The key technology enabling this is columnar storage.

Why Columnar Storage Wins for ML Feature Engineering

In a row-oriented database (like PostgreSQL), a table is stored as a sequence of rows:

[user_id=1, age=25, income=50000, city="NYC", purchases_30d=12, ...]
[user_id=2, age=31, income=75000, city="LA", purchases_30d=5, ...]
[user_id=3, age=28, income=42000, city="NYC", purchases_30d=18, ...]

To compute "average purchases_30d for users in NYC," you must read every field of every row - even fields you don't need (age, income, city of non-NYC users).

In a columnar database (BigQuery, Snowflake), each column is stored together:

user_id column: [1, 2, 3, 4, 5, ...]
age column: [25, 31, 28, 42, 19, ...]
city column: ["NYC", "LA", "NYC", "CHI", "NYC", ...]
purchases_30d: [12, 5, 18, 3, 22, ...]

To compute "average purchases_30d for NYC users," you only read the city column (to filter) and the purchases_30d column. If the table has 100 columns of similar width, that is about 2% of the data, which can mean up to roughly 50× less I/O and proportionally lower scan cost.

For ML feature engineering, which typically computes aggregations over a small subset of columns across all rows, columnar storage is transformative.
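To make the 2-columns-out-of-N argument concrete, here is a minimal sketch in plain Python. The three-row table reuses the example values above, and the `cells_read` counter is an illustrative stand-in for I/O, not a model of how any particular engine reads data.

```python
# Toy comparison of row-oriented vs column-oriented layouts.
# The "cells read" counter is an illustrative assumption, not real I/O accounting.

rows = [
    {"user_id": 1, "age": 25, "income": 50000, "city": "NYC", "purchases_30d": 12},
    {"user_id": 2, "age": 31, "income": 75000, "city": "LA", "purchases_30d": 5},
    {"user_id": 3, "age": 28, "income": 42000, "city": "NYC", "purchases_30d": 18},
]

# Column-oriented layout: one list per column.
columns = {name: [row[name] for row in rows] for name in rows[0]}


def avg_purchases_row_store(rows, city):
    """Scan whole rows; every field of every row is touched."""
    cells_read = 0
    values = []
    for row in rows:
        cells_read += len(row)
        if row["city"] == city:
            values.append(row["purchases_30d"])
    return sum(values) / len(values), cells_read


def avg_purchases_column_store(columns, city):
    """Scan only the two columns the query references."""
    city_col = columns["city"]
    purchases_col = columns["purchases_30d"]
    cells_read = len(city_col) + len(purchases_col)
    values = [p for c, p in zip(city_col, purchases_col) if c == city]
    return sum(values) / len(values), cells_read


row_avg, row_cells = avg_purchases_row_store(rows, "NYC")
col_avg, col_cells = avg_purchases_column_store(columns, "NYC")
print(f"row store:    avg={row_avg}, cells read={row_cells}")
print(f"column store: avg={col_avg}, cells read={col_cells}")
```

Both layouts return the same answer; only the amount of data touched differs, and the gap widens as the table gains columns the query never references.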

-- BigQuery: compute user features for ML training
-- Scans only the referenced columns (not the entire table)
-- One row per user over a 30-day window. Grouping by user_id only keeps the
-- *_30d aggregates true to their names and avoids mixing window functions
-- with GROUP BY on an ungrouped column.
SELECT
  user_id,
  CURRENT_DATE() AS feature_date,

  -- Transaction features
  COUNT(DISTINCT session_id) AS sessions_30d,
  COUNT(*) AS pageviews_30d,
  SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) AS purchases_30d,
  AVG(CASE WHEN event_type = 'purchase' THEN amount ELSE NULL END) AS avg_purchase_amount,

  -- Recency features (measured within the 30-day window)
  DATE_DIFF(CURRENT_DATE(), DATE(MIN(event_time)), DAY) AS days_since_first_event,
  DATE_DIFF(CURRENT_DATE(), DATE(MAX(event_time)), DAY) AS days_since_last_event,

  -- Behavioral pattern: share of events between 09:00 and 17:59
  COUNTIF(EXTRACT(HOUR FROM event_time) BETWEEN 9 AND 17) / COUNT(*) AS business_hours_ratio

FROM `project.events.user_events`
WHERE event_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
GROUP BY user_id

Data Warehouse Cost Model

Understanding warehouse costs is critical for ML teams that process large volumes of training data.

BigQuery pricing (query-based, serverless):

  • $5 per TB of data scanned (the on-demand rate used throughout this section; check current list prices before budgeting)
  • Partitioned tables + column selection = massive cost reduction
  • No idle costs - you pay only when querying

Snowflake pricing (compute-based):

  • $2–4 per credit (the rate depends on edition and region)
  • XS warehouse (1 credit/hr) to 4XL warehouse (128 credits/hr)
  • Storage: $23/TB/month (compressed)

Cost comparison at scale:

def warehouse_cost_estimate(
    table_size_tb: float,
    columns_accessed_fraction: float,  # e.g., 0.1 = 10% of columns
    queries_per_day: int,
    # BigQuery pricing
    bigquery_cost_per_tb_scanned: float = 5.0,
    # Snowflake pricing
    snowflake_storage_per_tb_month: float = 23.0,
    snowflake_price_per_credit: float = 2.0,  # USD per credit
) -> dict:
    """Estimate warehouse costs for ML feature engineering workloads."""
    # BigQuery: columnar pruning significantly reduces effective scan size.
    # BigQuery storage is billed separately and is not included here.
    effective_scan_tb = table_size_tb * columns_accessed_fraction
    bigquery_daily_cost = effective_scan_tb * queries_per_day * bigquery_cost_per_tb_scanned

    # Snowflake: pay for compute time, not data scanned
    # Assume a Medium warehouse (4 credits/hr) running 4 hours/day for feature jobs
    snowflake_compute_daily = 4 * 4 * snowflake_price_per_credit  # 4 hrs * 4 credits/hr * $/credit
    snowflake_storage_monthly = table_size_tb * snowflake_storage_per_tb_month
    snowflake_daily_total = snowflake_compute_daily + snowflake_storage_monthly / 30

    return {
        "table_size_tb": table_size_tb,
        "bigquery_daily_usd": bigquery_daily_cost,
        "bigquery_monthly_usd": bigquery_daily_cost * 30,
        "snowflake_daily_usd": snowflake_daily_total,
        "snowflake_monthly_usd": snowflake_daily_total * 30,
    }

# 10 TB table, 10 feature engineering queries/day
result = warehouse_cost_estimate(10.0, 0.1, 10)
for k, v in result.items():
    # Only dollar amounts get a "$" prefix (table_size_tb is a float too)
    if k.endswith("_usd"):
        print(f"  {k}: ${v:,.0f}")
    else:
        print(f"  {k}: {v}")

Data Lake Deep Dive

A data lake is object storage (S3, GCS, ADLS) storing files in any format. The defining characteristic: schema on read. The lake accepts data in whatever format it arrives; structure is imposed only when the data is read.
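As a rough illustration of schema on read, the sketch below stores heterogeneous JSON lines untouched and applies a schema only when reading them. The field names, defaults, and the `read_with_schema` helper are hypothetical; real engines (Spark, Presto, Hive) do the equivalent with far more machinery.

```python
import json

# Raw events exactly as they landed: fields vary from record to record.
raw_lines = [
    '{"user_id": "u1", "event_type": "click", "ts": "2024-11-15T14:00:00"}',
    '{"user_id": "u2", "event_type": "purchase", "amount": "19.99", "ts": "2024-11-15T14:05:00"}',
    '{"user_id": "u3", "device": "ios"}',
]

# Schema chosen by the reader, not the writer: field name -> (cast, default).
READ_SCHEMA = {
    "user_id": (str, None),
    "event_type": (str, "unknown"),
    "amount": (float, 0.0),
}


def read_with_schema(lines, schema):
    """Project each raw record onto the schema, casting and defaulting as needed."""
    for line in lines:
        record = json.loads(line)
        yield {
            name: (cast(record[name]) if name in record else default)
            for name, (cast, default) in schema.items()
        }


events = list(read_with_schema(raw_lines, READ_SCHEMA))
for event in events:
    print(event)
```

A different consumer could read the same three lines with a different schema (for example, one that keeps `device`), which is exactly the flexibility the warehouse gives up by enforcing structure at write time.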

Why the Data Lake Was Revolutionary for ML

Immutability and reprocessing: Raw data in a data lake is never modified. If your feature engineering logic has a bug (and it will), you fix the bug and reprocess from the raw data. With a traditional ETL pipeline that overwrites data, you've lost the ability to reproduce historical features.

Any format: JSON event logs, CSV user profiles, Parquet feature tables, Avro Kafka messages, binary image files, unstructured text documents - all can coexist in the same data lake. ML teams need all of these.

Decoupled compute and storage: You pay for storage whether or not you process the data (cheap). You pay for compute only when you run a processing job (variable). This is fundamentally different from a traditional warehouse appliance, where compute and storage are bundled and scale together. (Cloud warehouses such as BigQuery and Snowflake have since adopted the same separation.)
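The reprocessing argument can be sketched in a few lines. The events and the two feature functions below are made up for illustration: the point is only that keeping raw events immutable lets a corrected function be rerun over the same history.

```python
# Immutable raw events (a tuple, so the "lake" copy is never modified in place).
RAW_EVENTS = (
    {"user_id": "u1", "event_type": "purchase", "amount": 20.0},
    {"user_id": "u1", "event_type": "refund", "amount": 20.0},
    {"user_id": "u2", "event_type": "purchase", "amount": 5.0},
)


def spend_v1(events):
    """First version of the feature. Bug: refunds are counted as spend."""
    totals = {}
    for event in events:
        totals[event["user_id"]] = totals.get(event["user_id"], 0.0) + event["amount"]
    return totals


def spend_v2(events):
    """Fixed version: refunds are subtracted."""
    totals = {}
    for event in events:
        sign = -1.0 if event["event_type"] == "refund" else 1.0
        totals[event["user_id"]] = totals.get(event["user_id"], 0.0) + sign * event["amount"]
    return totals


# Because the raw events still exist, the fix applies to all of history.
print("buggy:", spend_v1(RAW_EVENTS))
print("fixed:", spend_v2(RAW_EVENTS))
```

Had the pipeline stored only the output of `spend_v1`, there would be no way to separate refunds from purchases after the fact.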

File Formats: Parquet, ORC, Avro

The choice of file format significantly impacts query performance and storage cost.

Parquet: Columnar storage, highly compressed, excellent for analytical workloads. The default choice for ML feature data, training datasets, and any data that will be read column-by-column.

Parquet characteristics:
- Columnar: excellent for analytical queries (scan only needed columns)
- Compression: Snappy or ZSTD (4-6x compression ratio)
- Predicate pushdown: filter evaluation inside the file reader (skip irrelevant row groups)
- Splittable: can be read in parallel across multiple tasks
- Best for: training data, feature tables, historical analytics

ORC: Also columnar, slightly better compression than Parquet, but less ecosystem support. Used primarily in Hive-heavy environments.

Avro: Row-oriented, schema embedded in file. Excellent for streaming data (write one record at a time, schema always available). Not ideal for analytical queries.

# Writing Parquet with settings suited to ML workloads
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd

def write_ml_training_data(df: pd.DataFrame, path: str) -> None:
    """
    Write training data to a single Parquet file with ML-oriented settings.
    """
    table = pa.Table.from_pandas(df)

    pq.write_table(
        table,
        path,
        # Compression: ZSTD usually gives a better ratio than Snappy at comparable speed
        compression="zstd",
        compression_level=3,
        # Row group size is a ROW COUNT in pyarrow, not a byte size.
        # Tune it so each row group lands near your target size (e.g., ~128 MB).
        row_group_size=1_000_000,
        # Statistics enable predicate pushdown at the row group level
        write_statistics=True,
        # Data page size in bytes: smaller = finer-grained skipping, larger = better compression
        data_page_size=1024 * 1024,  # 1 MB
    )

# For large datasets, prefer the partitioned dataset writer
def write_partitioned_dataset(df: pd.DataFrame, base_path: str,
                              partition_cols: list[str]) -> None:
    """Write partitioned Parquet dataset for efficient querying by date/user_id."""
    table = pa.Table.from_pandas(df)
    pq.write_to_dataset(
        table,
        root_path=base_path,
        partition_cols=partition_cols,  # e.g., ["year", "month", "day"]
        compression="zstd",
        existing_data_behavior="overwrite_or_ignore",
    )

Partitioning Strategy

How you partition your data lake determines how much data must be scanned for any given query. Poor partitioning = full table scans = slow and expensive. Good partitioning = partition pruning = fast and cheap.

Partition by date for time-series ML data (the most common pattern):

s3://data-lake/events/
year=2024/month=11/day=15/hour=14/
part-000.parquet (~128 MB)
part-001.parquet (~128 MB)

A query for "last 7 days" only opens 7 × 24 = 168 partitions instead of all data.
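The 168-partition figure can be checked with a small helper that enumerates the hour-level prefixes a 7-day query would touch. This is a sketch: `hourly_partition_prefixes` is a hypothetical helper, and the zero-padded `month=`/`day=`/`hour=` values are an assumption about the naming convention.

```python
from datetime import datetime, timedelta


def hourly_partition_prefixes(base, end, days):
    """Return the hour-level prefixes covering the `days` days before `end`."""
    current = end - timedelta(days=days)
    prefixes = []
    while current < end:
        prefixes.append(
            f"{base}/year={current:%Y}/month={current:%m}"
            f"/day={current:%d}/hour={current:%H}/"
        )
        current += timedelta(hours=1)
    return prefixes


prefixes = hourly_partition_prefixes(
    "s3://data-lake/events", datetime(2024, 11, 16, 0), days=7
)
print(len(prefixes))   # 168
print(prefixes[0])     # s3://data-lake/events/year=2024/month=11/day=09/hour=00/
print(prefixes[-1])    # s3://data-lake/events/year=2024/month=11/day=15/hour=23/
```

A query engine performs the same pruning automatically when the filter is expressed on the partition columns; filtering on a non-partition timestamp column alone does not necessarily prune anything.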

Partition by entity type + date for feature data:

s3://data-lake/features/user_features/
feature_date=2024-11-15/
part-000.parquet (all users for this date)

Anti-patterns to avoid:

  • Partitioning by high-cardinality columns (user_id with 100M distinct values = 100M partitions = disaster)
  • Tiny files: thousands of 1 MB files perform worse than tens of 256 MB files
  • Too many partition columns: each additional partition level multiplies the metadata overhead

# Detecting and fixing the small file problem
import math

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SmallFileChecker").getOrCreate()

def check_partition_sizes(path: str, min_size_mb: float = 64.0) -> dict:
    """Check for small file problem in data lake partitions."""
    # Uses the Hadoop FileSystem API through Spark's JVM gateway (internal attributes)
    hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
    fs = hadoop_path.getFileSystem(spark._jsc.hadoopConfiguration())

    stats = {"total_files": 0, "small_files": 0, "total_size_gb": 0.0}
    file_statuses = fs.listFiles(hadoop_path, True)  # True = recursive

    while file_statuses.hasNext():
        status = file_statuses.next()
        size_mb = status.getLen() / (1024 * 1024)
        stats["total_files"] += 1
        stats["total_size_gb"] += size_mb / 1024
        if size_mb < min_size_mb:
            stats["small_files"] += 1

    stats["small_file_pct"] = stats["small_files"] / max(stats["total_files"], 1) * 100
    return stats

# Fix: compact small files during off-peak hours
def compact_partition(path: str, staging_path: str, target_file_size_mb: int = 256) -> None:
    """Compact small files into larger ones using Spark.

    Writes to a separate staging path: Spark cannot safely overwrite a path
    it is reading from in the same job. Validate the staging output, then
    swap it in for the original partition.
    """
    # Size the output from the actual bytes on disk rather than a per-row guess
    total_size_mb = check_partition_sizes(path)["total_size_gb"] * 1024
    target_files = max(1, math.ceil(total_size_mb / target_file_size_mb))

    # repartition() can raise or lower the file count; coalesce() can only lower it
    spark.read.parquet(path) \
        .repartition(target_files) \
        .write \
        .mode("overwrite") \
        .parquet(staging_path)

When Each Wins for ML

The practical decision guide:

| Use case | Best storage | Reason |
| --- | --- | --- |
| Raw event storage | Data lake (S3 + Parquet) | Cheap, flexible, immutable |
| Interactive feature exploration | Data warehouse (BigQuery) | Fast SQL, serverless |
| Daily feature engineering jobs | Data lake + Spark or DWH | Depends on volume |
| Training data export | Data lake (S3 + Parquet) | Spark/PyTorch reads directly |
| Online feature serving | Redis / Cassandra | Sub-10ms access needed |
| Model prediction logging | Data lake | Cheap, append-only |
| A/B test analysis | Data warehouse | Complex SQL, analyst-friendly |
| Historical feature debugging | Lakehouse (Delta) | Time travel capability |

Cost Comparison at 1 PB Scale

def cost_comparison_1pb():
    """Compare storage costs for 1 PB (uncompressed) of ML training data."""
    size_tb = 1000  # 1 PB of raw data, before compression
    compression_ratio = 5  # assumed columnar compression ratio

    # S3 Standard: $23/TB/month on stored bytes
    # Parquet + Snappy (assumed 5x): effectively $4.60 per raw TB per month
    s3_parquet_monthly = size_tb / compression_ratio * 23

    # BigQuery active storage: $20/TB/month
    # BigQuery long-term storage (>90 days): $10/TB/month
    # Default (logical) billing charges for uncompressed bytes
    bigquery_monthly = size_tb * 10  # long-term

    # Snowflake: $23/TB/month, billed on compressed bytes (same ratio assumed)
    snowflake_monthly = size_tb / compression_ratio * 23

    print("Monthly storage costs for 1 PB of raw data:")
    print(f"  S3 + Parquet (5x compression):  ${s3_parquet_monthly:,.0f}/month")
    print(f"  BigQuery long-term storage:     ${bigquery_monthly:,.0f}/month")
    print(f"  Snowflake (5x compression):     ${snowflake_monthly:,.0f}/month")

    # Query cost comparison for 100 daily ML jobs each scanning 1 TB
    bq_daily_query_cost = 100 * 1 * 5  # 100 jobs * 1 TB * $5/TB
    print(f"\nBigQuery: daily query cost = ${bq_daily_query_cost:,}/day")
    print("  (but only when querying - storage is separate)")
    print("  Use partitioning + column selection to reduce to 10% of this")

cost_comparison_1pb()

Production Engineering Notes

Data Skew in ML Training Data

Data skew - where certain partition keys have far more data than others - is a major problem for distributed ML training and feature engineering.

# Detecting and fixing data skew in Spark
from pyspark.sql import functions as F

def detect_skew(df, partition_col: str, threshold_factor: float = 5.0):
    """Identify skewed keys (significantly more rows than average)."""
    partition_sizes = df.groupBy(partition_col).count()
    stats = partition_sizes.agg(
        F.avg("count").alias("avg_count"),
        F.max("count").alias("max_count"),
        F.stddev("count").alias("stddev_count")
    ).collect()[0]

    avg = stats["avg_count"]
    max_count = stats["max_count"]
    skew_ratio = max_count / avg

    print(f"Average rows per {partition_col}: {avg:,.0f}")
    print(f"Maximum rows per {partition_col}: {max_count:,.0f}")
    print(f"Skew ratio: {skew_ratio:.1f}x")

    if skew_ratio > threshold_factor:
        print(f"WARNING: Data skew detected (>{threshold_factor}x). Consider:")
        print("  1. Salting the skewed key")
        print("  2. Repartitioning before joins")
        print("  3. Broadcast join if the smaller table fits in memory")

    # Return top 10 largest keys
    return partition_sizes.orderBy(F.desc("count")).limit(10)

# Fix skew with key salting (for joins)
def salt_join(large_df, small_df, join_key: str, num_buckets: int = 10):
    """
    Salt technique: artificially spread skewed keys across multiple buckets.
    Increases parallelism for highly imbalanced joins.
    """
    # Add a random integer salt in [0, num_buckets) to each row of the large table
    large_salted = large_df.withColumn(
        "salt", (F.rand() * num_buckets).cast("int")
    )

    # Replicate each row of the small table once per salt value
    small_exploded = small_df.withColumn(
        "salt",
        F.explode(F.array(*[F.lit(i) for i in range(num_buckets)]))
    )

    # Join on (key, salt). Joining by column name keeps a single join_key
    # column in the result instead of an ambiguous duplicate.
    result = large_salted.join(small_exploded, on=[join_key, "salt"], how="left")
    return result.drop("salt")
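To see why salting helps without starting a cluster, the toy simulation below assigns rows to workers by hashing the join key, with and without a salt. Everything here is an assumption made for illustration: the 90/10 key distribution, the byte-sum hash (Python's built-in `hash` is randomized per process), and the round-robin salt standing in for the random salt used in the Spark version.

```python
from collections import Counter

NUM_WORKERS = 4
NUM_BUCKETS = 8

# 90% of rows share one hot key; the rest are spread over 100 distinct keys.
keys = ["hot_user"] * 900 + [f"user_{i}" for i in range(100)]


def worker_for(key):
    """Deterministic toy hash partitioner: byte sum modulo worker count."""
    return sum(key.encode()) % NUM_WORKERS


# Without salting, every row of the hot key lands on the same worker.
plain = Counter(worker_for(k) for k in keys)

# With salting, the hot key becomes NUM_BUCKETS distinct keys.
# A round-robin salt (i % NUM_BUCKETS) replaces the random salt for repeatability.
salted = Counter(worker_for(f"{k}_{i % NUM_BUCKETS}") for i, k in enumerate(keys))

print("rows per worker, unsalted:", sorted(plain.values(), reverse=True))
print("rows per worker, salted:  ", sorted(salted.values(), reverse=True))
```

The cost of the technique is visible in the Spark version: the small table is replicated once per bucket, so the bucket count trades join-side memory for parallelism.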

Data Lake Zone Architecture

Production data lakes are organized into zones, each with a specific contract on data quality and transformation state. This is often called the medallion architecture.

Bronze zone (also called "raw" or "landing"): Data lands exactly as received. No transformations. Immutable. Everything is preserved - even malformed records. S3 path: s3://data-lake/bronze/events/year=2024/month=11/day=15/. Retention: unlimited (this is the source of truth you can reprocess from).

Silver zone (also called "cleaned" or "conformed"): Bronze data after deduplication, schema validation, null handling, and format normalization (everything converted to Parquet). Records that fail validation are routed to a quarantine path, not dropped. S3 path: s3://data-lake/silver/events/year=2024/month=11/day=15/.

Gold zone (also called "curated" or "business"): Silver data aggregated to business-level entities. For ML: this is where feature snapshots live - one row per user per day, with all computed features. S3 path: s3://data-lake/gold/user_features/feature_date=2024-11-15/.

from datetime import datetime, timedelta
from functools import reduce

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder.appName("medallion_pipeline").getOrCreate()

# Explicit read schema. The field list is inferred from the columns this
# pipeline uses; adjust it to your events. Declaring _corrupt_record guarantees
# the column exists even when every record parses cleanly.
EVENT_SCHEMA = StructType([
    StructField("user_id", StringType()),
    StructField("session_id", StringType()),
    StructField("item_id", StringType()),
    StructField("event_type", StringType()),
    StructField("event_timestamp", TimestampType()),
    StructField("amount", DoubleType()),
    StructField("_corrupt_record", StringType()),
])

def bronze_to_silver(
    bronze_path: str,
    silver_path: str,
    quarantine_path: str,
    partition_date: str
) -> dict:
    """
    Clean and validate raw events for one date (partition_date as YYYY-MM-DD).
    Routes invalid records to quarantine instead of dropping them.
    Returns: dict with counts for read, corrupt, quarantined, and valid records.
    """
    year, month, day = partition_date.split("-")
    partition_suffix = f"year={year}/month={month}/day={day}/"

    # Read raw data permissively (don't fail on malformed records).
    # cache() matters: Spark rejects queries that reference only the
    # corrupt-record column of an uncached JSON read.
    raw_df = spark.read \
        .schema(EVENT_SCHEMA) \
        .option("mode", "PERMISSIVE") \
        .option("columnNameOfCorruptRecord", "_corrupt_record") \
        .json(f"{bronze_path}/{partition_suffix}") \
        .cache()

    # Separate clean vs corrupt records
    corrupt_mask = F.col("_corrupt_record").isNotNull()
    corrupt_df = raw_df.filter(corrupt_mask)
    clean_df = raw_df.filter(~corrupt_mask).drop("_corrupt_record")

    # Deduplication: keep one record per (user_id, event_timestamp, event_type)
    deduplicated = clean_df.dropDuplicates(["user_id", "event_timestamp", "event_type"])

    # Validation: route records with null required fields to quarantine
    required_fields = ["user_id", "event_type", "event_timestamp"]
    valid_df = deduplicated
    invalid_df_list = []
    for field in required_fields:
        null_mask = F.col(field).isNull()
        invalid_df_list.append(
            deduplicated.filter(null_mask).withColumn("_validation_error", F.lit(f"null_{field}"))
        )
        valid_df = valid_df.filter(~null_mask)

    # Write valid data to this date's silver partition only.
    # Overwriting silver_path itself would erase every other date.
    valid_df.write \
        .mode("overwrite") \
        .parquet(f"{silver_path}/{partition_suffix}")

    # Write invalid data to quarantine for investigation.
    # unionByName tolerates the differing column sets (Spark 3.1+).
    invalid_df_list.append(
        corrupt_df.withColumn("_validation_error", F.lit("corrupt_json"))
    )
    quarantine_df = reduce(
        lambda a, b: a.unionByName(b, allowMissingColumns=True), invalid_df_list
    )
    quarantine_df.write.mode("append").json(f"{quarantine_path}/date={partition_date}/")

    return {
        "total_read": raw_df.count(),
        "corrupt_records": corrupt_df.count(),
        "quarantined": quarantine_df.count(),
        "valid_written": valid_df.count(),
    }


def silver_to_gold_user_features(
    silver_path: str,
    gold_path: str,
    feature_date: str
) -> None:
    """
    Compute daily user feature snapshot from silver events.
    Result: one row per user with 30-day rolling aggregates.
    """
    date_obj = datetime.strptime(feature_date, "%Y-%m-%d")
    # BETWEEN is inclusive on both ends, so 29 days back gives a 30-day window
    window_start = (date_obj - timedelta(days=29)).strftime("%Y-%m-%d")

    # Silver is partitioned by year/month/day, so derive the event date from the timestamp
    events = spark.read.parquet(silver_path) \
        .withColumn("event_date", F.to_date("event_timestamp")) \
        .filter(F.col("event_date").between(window_start, feature_date))

    user_features = events.groupBy("user_id").agg(
        F.count("*").alias("total_events_30d"),
        F.countDistinct("session_id").alias("sessions_30d"),
        F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("purchase_count_30d"),
        F.sum(F.when(F.col("event_type") == "purchase", F.col("amount")).otherwise(0)).alias("purchase_amount_30d"),
        F.avg(F.when(F.col("event_type") == "purchase", F.col("amount"))).alias("avg_purchase_amount_30d"),
        F.countDistinct("item_id").alias("unique_items_viewed_30d"),
        F.max("event_timestamp").alias("last_seen_at"),
    ).withColumn("feature_date", F.lit(feature_date))

    # replaceWhere overwrites only this feature_date's rows in the Delta table
    user_features.write \
        .format("delta") \
        .mode("overwrite") \
        .option("replaceWhere", f"feature_date = '{feature_date}'") \
        .save(gold_path)

Data Catalog and Lineage

A data lake without a catalog is an archaeological excavation site - full of artifacts nobody can identify. Production data lakes require metadata management to stay navigable at scale.

from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime

@dataclass
class DataAsset:
    """Metadata record for a data lake asset."""
    name: str
    path: str  # s3://bucket/prefix/
    zone: str  # bronze, silver, gold
    format: str  # parquet, delta, json
    schema_version: str  # "v1.2.0"
    owner_team: str
    owner_email: str
    description: str
    partition_columns: list[str]
    row_count_approx: int
    size_gb_approx: float
    created_at: str
    last_updated_at: str
    upstream_assets: list[str] = field(default_factory=list)  # lineage: what feeds this
    downstream_assets: list[str] = field(default_factory=list)  # lineage: what uses this
    tags: list[str] = field(default_factory=list)
    quality_score: float = 1.0  # 0.0 to 1.0, from data quality checks

class DataCatalog:
    """Centralized registry for data lake assets."""

    def __init__(self):
        self.assets: dict[str, DataAsset] = {}

    def register(self, asset: DataAsset):
        self.assets[asset.name] = asset

    def find_assets_by_tag(self, tag: str) -> list[DataAsset]:
        return [a for a in self.assets.values() if tag in a.tags]

    def get_lineage(self, asset_name: str, direction: str = "both") -> dict:
        """Return upstream and downstream lineage graph for an asset."""
        asset = self.assets.get(asset_name)
        if not asset:
            return {}

        result = {"asset": asset_name}
        if direction in ("upstream", "both"):
            result["upstream"] = [
                self.get_lineage(up, "upstream")
                for up in asset.upstream_assets
            ]
        if direction in ("downstream", "both"):
            result["downstream"] = [
                self.get_lineage(down, "downstream")
                for down in asset.downstream_assets
            ]
        return result

    def impact_analysis(self, asset_name: str) -> list[str]:
        """Which downstream assets are affected if this asset changes?"""
        asset = self.assets.get(asset_name)
        if not asset:
            return []
        affected = list(asset.downstream_assets)
        for downstream in asset.downstream_assets:
            affected.extend(self.impact_analysis(downstream))
        return list(set(affected))

# Usage example
catalog = DataCatalog()
catalog.register(DataAsset(
    name="silver_user_events",
    path="s3://company-lake/silver/user_events/",
    zone="silver",
    format="parquet",
    schema_version="v2.0.0",
    owner_team="data-platform",
    owner_email="[email protected]",
    description="Cleaned and deduplicated user event stream, 30-day retention in silver",
    partition_columns=["year", "month", "day"],
    row_count_approx=5_000_000_000,
    size_gb_approx=850.0,
    created_at="2023-06-01",
    last_updated_at=datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
    upstream_assets=["bronze_user_events"],
    downstream_assets=["gold_user_features", "gold_session_aggregates"],
    tags=["user", "events", "ml-input"],
    quality_score=0.99
))

Storage Cost Optimization for ML Workloads

At petabyte scale, storage costs dominate the data infrastructure budget. Understanding the cost levers prevents runaway bills.

from dataclasses import dataclass
from typing import Literal

@dataclass
class StorageCostEstimate:
    storage_class: str
    monthly_cost_per_tb: float
    retrieval_cost_per_tb: float  # cost to read
    min_storage_days: int

# AWS S3 pricing tiers (approximate as of 2024).
# Retrieval prices vary by retrieval option (standard vs bulk); verify before budgeting.
S3_TIERS = {
    "standard": StorageCostEstimate("S3 Standard", 23.0, 0.0, 0),
    "intelligent_tiering": StorageCostEstimate("S3 Intelligent-Tiering", 23.0, 0.0, 0),
    "standard_ia": StorageCostEstimate("S3 Standard-IA", 12.5, 10.0, 30),
    "glacier_ir": StorageCostEstimate("S3 Glacier Instant Retrieval", 4.0, 30.0, 90),
    "glacier_flexible": StorageCostEstimate("S3 Glacier Flexible", 3.6, 2.0, 90),
    "deep_archive": StorageCostEstimate("S3 Glacier Deep Archive", 0.99, 2.0, 180),
}

def recommend_storage_tier_for_ml_data(
    data_age_days: int,
    access_frequency_per_month: int,
    data_size_tb: float,
    fraction_read_per_access: float = 1 / 30,
) -> dict:
    """
    Recommend an S3 storage tier based on data access patterns.
    ML data tends to have predictable access patterns:
    - Recent data (0-30 days): hot, accessed daily for features
    - Mid-age data (30-90 days): warm, accessed for retraining
    - Older data (90-365 days): cool, accessed monthly for experiments
    - Old data (1-2 years): cold, accessed rarely for audits
    - Archive (2+ years): frozen, compliance evidence and legal holds only
    """
    if data_age_days <= 30 and access_frequency_per_month >= 10:
        tier = S3_TIERS["standard"]
        reasoning = "Hot data: daily feature computation accesses this"
    elif data_age_days <= 90 and access_frequency_per_month >= 2:
        tier = S3_TIERS["standard_ia"]
        reasoning = "Warm data: weekly model retraining reads"
    elif data_age_days <= 365 and access_frequency_per_month >= 1:
        tier = S3_TIERS["glacier_ir"]
        reasoning = "Cool data: monthly experiments, instant retrieval needed"
    elif data_age_days <= 730:
        tier = S3_TIERS["glacier_flexible"]
        reasoning = "Cold data: rare access, 3-5 hour retrieval acceptable"
    else:
        tier = S3_TIERS["deep_archive"]
        reasoning = "Archive: legal/compliance holds only, 12-48 hour retrieval"

    monthly_storage_cost = data_size_tb * tier.monthly_cost_per_tb
    # Each access is assumed to read a fraction of the dataset
    # (default 1/30, roughly one day's slice of a month of data)
    monthly_retrieval_cost = (
        access_frequency_per_month * fraction_read_per_access
        * data_size_tb * tier.retrieval_cost_per_tb
    )

    return {
        "recommended_tier": tier.storage_class,
        "reasoning": reasoning,
        "monthly_storage_cost_usd": round(monthly_storage_cost, 2),
        "monthly_retrieval_cost_usd": round(monthly_retrieval_cost, 2),
        "total_monthly_cost_usd": round(monthly_storage_cost + monthly_retrieval_cost, 2),
    }

# Lifecycle policy automation using boto3
import boto3

def configure_ml_data_lifecycle(bucket: str, prefix: str):
    """
    Set up S3 lifecycle rules to automatically tier ML training data
    from Standard → Standard-IA → Glacier Instant Retrieval → Deep Archive based on age.
    Note: this call replaces the bucket's entire existing lifecycle configuration.
    """
    s3 = boto3.client("s3")

    lifecycle_policy = {
        "Rules": [
            {
                "ID": f"ml-data-tiering-{prefix}",
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Transitions": [
                    {"Days": 30, "StorageClass": "STANDARD_IA"},
                    {"Days": 90, "StorageClass": "GLACIER_IR"},
                    {"Days": 365, "StorageClass": "DEEP_ARCHIVE"},
                ],
                # S3 requires at least 30 days before a transition to STANDARD_IA
                "NoncurrentVersionTransitions": [
                    {"NoncurrentDays": 30, "StorageClass": "STANDARD_IA"},
                ],
                # Delete non-current versions after 90 days (saves cost on versioned buckets)
                "NoncurrentVersionExpiration": {"NoncurrentDays": 90},
            }
        ]
    }

    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration=lifecycle_policy
    )
    print(f"Lifecycle policy configured for s3://{bucket}/{prefix}")
    print("Expected cost reduction: 60-80% for data older than 90 days")
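A colder tier only pays off if the data is read rarely enough. The sketch below works out the break-even point between S3 Standard and Standard-IA using the approximate prices quoted in the tier table above. It assumes each read retrieves the full dataset, which is a simplification: partial reads shift the break-even in favor of the colder tier. The helper names are hypothetical.

```python
def breakeven_full_reads_per_month(hot_storage, cold_storage, cold_retrieval):
    """Full-dataset reads per month at which the colder tier stops being cheaper.

    All prices are USD per TB (storage per month, retrieval per read).
    """
    return (hot_storage - cold_storage) / cold_retrieval


def monthly_cost(storage, retrieval, size_tb, full_reads_per_month):
    """Storage plus retrieval cost for one month."""
    return size_tb * (storage + retrieval * full_reads_per_month)


# (storage $/TB/month, retrieval $/TB): approximate 2024 prices from the table above.
STANDARD = (23.0, 0.0)
STANDARD_IA = (12.5, 10.0)

breakeven = breakeven_full_reads_per_month(STANDARD[0], STANDARD_IA[0], STANDARD_IA[1])
print(f"Break-even: {breakeven:.2f} full reads per month")

for reads in (0.25, 1, 4):
    std = monthly_cost(*STANDARD, size_tb=100, full_reads_per_month=reads)
    ia = monthly_cost(*STANDARD_IA, size_tb=100, full_reads_per_month=reads)
    print(f"{reads} reads/month on 100 TB: Standard ${std:,.0f} vs Standard-IA ${ia:,.0f}")
```

With these numbers, Standard-IA wins only when the dataset is fully read about once a month or less, which is why access frequency belongs in the tiering decision alongside data age.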

Common Mistakes

:::danger Storing Only Aggregated Data (The Warehouse Wall)
The most expensive data infrastructure mistake for ML teams: ETL pipelines that throw away raw events and store only daily aggregates. Three months later, the ML team needs raw events for feature engineering and they don't exist anymore. Always retain raw data in the data lake before aggregating. Storage is cheap. Re-collecting historical data is impossible.
:::

:::warning Ignoring File Sizes in the Data Lake
The "small file problem" is the data lake's silent killer. Thousands of 1 MB files from a streaming pipeline run significantly slower than dozens of 256 MB files with the same total size. Each file requires a separate S3 GET request, and Spark tasks have fixed overhead per file. Run compaction jobs during off-peak hours to maintain healthy file sizes (128–512 MB per file).
:::

:::warning Not Partitioning for Your Query Patterns
Partitioning by year/month/day is not always optimal. If your ML queries filter by a low-cardinality column such as product category or region, partition by that column too. But never partition by a column with millions of distinct values (user_id is the classic example) - you end up with millions of tiny partitions, each with overhead. The rule: partition columns should have low cardinality (days, months, categories) and be frequently used in WHERE clauses.
:::
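A quick sanity check before committing to a partition scheme is to multiply out the cardinalities and see what average partition size results. The sketch below assumes data is spread uniformly across partitions (real data rarely is), and the `partition_plan` helper and the example cardinalities are illustrative.

```python
from math import prod


def partition_plan(total_size_gb, cardinalities):
    """Return (number of partitions, average partition size in MB).

    `cardinalities` maps each partition column to its distinct-value count.
    Assumes data is spread uniformly across partitions.
    """
    num_partitions = prod(cardinalities.values())
    avg_partition_mb = total_size_gb * 1024 / num_partitions
    return num_partitions, avg_partition_mb


TOTAL_GB = 10 * 1024  # a 10 TB table

candidates = {
    "day only": {"day": 365},
    "day + category": {"day": 365, "category": 50},
    "user_id": {"user_id": 100_000_000},
}

for label, cardinalities in candidates.items():
    count, avg_mb = partition_plan(TOTAL_GB, cardinalities)
    print(f"{label:>15}: {count:>11,} partitions, ~{avg_mb:,.2f} MB each")
```

Partitioning by day and category still leaves partitions large enough to hold healthy file sizes, while partitioning by user_id drives the average partition far below 1 MB.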

Interview Q&A

Q1: What is the difference between schema-on-write (data warehouse) and schema-on-read (data lake), and why does it matter for ML?

Schema-on-write enforces the schema at write time: data must conform to the defined structure before it can be stored. This guarantees query-time correctness but makes ingesting new or variable data expensive (requires schema migration).

Schema-on-read stores data in any format and imposes schema only when reading. This makes ingestion flexible - any data lands immediately - but moves the correctness burden to query time. If data is malformed, you discover it when querying, not when ingesting.

For ML: schema-on-read (data lake) is critical because ML training often uses data from dozens of sources with different schemas, and you can't always define the schema upfront. But schema-on-read creates technical debt, because malformed data surfaces only at query time. The lakehouse pattern addresses this by adding schema enforcement (schema-on-write guarantees) on top of the data lake's flexibility, giving you the best of both.

Q2: When would you use BigQuery vs S3 + Spark for ML feature engineering?

BigQuery (or Snowflake): Use when your feature engineering can be expressed in SQL, the team has strong SQL skills, ad-hoc interactive exploration is needed, and the dataset is under a few hundred TB. BigQuery's serverless model means no cluster management and you pay only for what you scan. Excellent for window functions, complex joins, and aggregations.

S3 + Spark: Use when you need Python transforms that can't be expressed in SQL (custom ML preprocessing, model inference for feature generation), when you're joining datasets from different sources (Kafka streams + database snapshots + S3 files), or when you need more control over memory management and parallelism for very large datasets. Spark can process data that exceeds SQL warehouse capabilities, and integrates directly with PyTorch DataLoaders and ML frameworks.

In practice: most production ML teams use both. BigQuery/Snowflake for SQL-based feature engineering and data exploration; Spark for complex Python transforms, joining heterogeneous data sources, and final training dataset generation.

Q3: What is columnar storage and why is it better for ML feature engineering than row storage?

In row storage (PostgreSQL, MySQL), each database page contains complete rows. To compute "average purchase amount for users in NYC," the database must read every column of every row - even columns you don't need (name, email, phone) and rows that fail the filter (non-NYC users). For a table with 100 columns where you only need 3, you're reading 97% wasted data.

In columnar storage (BigQuery, Parquet), each column is stored together. The query engine reads only the city column (to filter) and the purchase_amount column (to aggregate). For ML feature engineering, which typically computes aggregations over a small subset of columns, columnar storage can be 10–50× faster and cheaper than row storage.

Additionally, columnar storage achieves much better compression because values of the same type and similar distribution are stored contiguously, enabling specialized encodings (dictionary, run-length) and codecs. Parquet with ZSTD compression typically achieves 4–6× compression on ML feature data.
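The I/O argument can be made concrete with a toy table held in both layouts. The sketch below counts values touched rather than bytes, uses made-up column names, and runs the two-column NYC query from the example above (so 2 of 100 columns are needed rather than 3). It models only the access pattern, not a real storage engine.

```python
import random

random.seed(0)  # deterministic toy data
N_ROWS, N_COLS = 1_000, 100

# Hypothetical table: two columns the query needs plus 98 it does not.
COLUMN_NAMES = ["city", "purchase_amount"] + [f"other_{i}" for i in range(N_COLS - 2)]

# Row layout: each record is stored (and fetched) as a whole.
rows = [
    (random.choice(["NYC", "SF", "LA"]), random.uniform(1.0, 100.0)) + (0,) * (N_COLS - 2)
    for _ in range(N_ROWS)
]

# Columnar layout: one contiguous list per column.
columns = {name: [row[i] for row in rows] for i, name in enumerate(COLUMN_NAMES)}


def avg_nyc_row_store(table):
    """Scan complete rows; every value in every row counts as read."""
    values_read, total, count = 0, 0.0, 0
    for row in table:
        values_read += len(row)
        if row[0] == "NYC":
            total += row[1]
            count += 1
    return total / count, values_read


def avg_nyc_column_store(cols):
    """Touch only the two columns the query references."""
    city, amount = cols["city"], cols["purchase_amount"]
    values_read, total, count = len(city) + len(amount), 0.0, 0
    for c, a in zip(city, amount):
        if c == "NYC":
            total += a
            count += 1
    return total / count, values_read


row_avg, row_values_read = avg_nyc_row_store(rows)
col_avg, col_values_read = avg_nyc_column_store(columns)
read_ratio = row_values_read / col_values_read
```

Both functions return the same average, but the row scan touches 100,000 values and the columnar scan 2,000, a 50× difference in values read. Real speedups depend on column widths, compression, and the query engine.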

Q4: What is the small file problem in data lakes and how do you fix it?

The small file problem occurs when a data lake partition contains thousands of small files (1–10 MB) instead of a small number of large files (128–512 MB). Causes: streaming ingestion (Kafka consumer writing one file per batch per partition), frequent micro-batch Spark jobs, or hourly ETL jobs on low-volume data.

The impact on ML workloads: Spark reads files in parallel, but each file incurs fixed overhead for task scheduling, S3 metadata operations, and task launch. Reading 1,000 files of 1 MB each takes much longer than reading 4 files of 256 MB each that hold roughly the same data - easily 10–100× slower.
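A back-of-the-envelope model shows why the fixed per-file cost dominates. Every constant below (overhead per file, throughput, number of task slots) is an illustrative assumption rather than a measurement, and `estimated_read_seconds` is a hypothetical helper, not a Spark API.

```python
def estimated_read_seconds(
    n_files: int,
    file_size_mb: float,
    per_file_overhead_s: float = 0.1,    # assumed: listing, open, task launch
    throughput_mb_per_s: float = 100.0,  # assumed: sustained read rate per task slot
    parallel_tasks: int = 4,             # assumed: concurrent task slots
) -> float:
    """Toy model: each file costs a fixed overhead plus size / throughput,
    and files are spread evenly across the available task slots."""
    per_file_s = per_file_overhead_s + file_size_mb / throughput_mb_per_s
    files_per_slot = -(-n_files // parallel_tasks)  # ceiling division
    return files_per_slot * per_file_s


small = estimated_read_seconds(n_files=1_000, file_size_mb=1)
large = estimated_read_seconds(n_files=4, file_size_mb=256)
slowdown = small / large
```

With these assumed constants the small-file layout comes out roughly 10× slower, and almost all of that time is per-file overhead rather than data transfer. Plugging in your own measured overhead and throughput gives a quick estimate of what compaction would buy.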

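Solutions: (1) Coalesce streaming writes before committing to the lake (Kafka Connect with an S3 sink can buffer to a target file size); (2) Run compaction jobs during off-peak hours that read each partition and rewrite it with optimal file sizes, using df.coalesce(N) (no shuffle, but possibly uneven files) or df.repartition(N) (full shuffle, evenly sized files); (3) Use the table format's built-in compaction, such as Delta Lake's OPTIMIZE command or Iceberg's rewrite_data_files procedure; (4) Tune Spark's spark.sql.files.maxPartitionBytes, which caps how many bytes Spark packs into a single partition when reading. This softens the read-side cost but does not fix the files themselves.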
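The core of a compaction job is deciding which small files get rewritten together. A minimal sketch of that planning step, assuming only file sizes are known, follows; `plan_compaction` is a hypothetical helper, and real implementations such as Delta Lake's OPTIMIZE apply their own packing logic.

```python
import math


def plan_compaction(file_sizes_mb: list[float], target_mb: float = 256.0) -> list[list[float]]:
    """Greedily pack input files into groups of at most target_mb each.

    Each group would be rewritten as one output file. A single input file
    larger than target_mb ends up in a group of its own.
    """
    groups: list[list[float]] = []
    current: list[float] = []
    current_size = 0.0
    for size in sorted(file_sizes_mb, reverse=True):
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0.0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups


small_files = [1.0] * 1_000  # the 1,000 x 1 MB partition from the example above
plan = plan_compaction(small_files)

# The same target size gives the N to pass to df.repartition(N) or df.coalesce(N).
n_output_files = math.ceil(sum(small_files) / 256.0)
```

For the 1,000 × 1 MB partition this yields four output files (three of 256 MB and one of 232 MB), which matches the N you would pass to repartition or coalesce.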

Q5: How would you design a data lake partition strategy for a recommendation system's training data?

The partition strategy depends on the query patterns. For recommendation system training data:

Layer 1 - Raw events: Partition by event date and hour. Queries always filter by date range, so this prunes irrelevant files efficiently. Example path: events/year=2024/month=11/day=15/hour=14/

Layer 2 - User feature snapshots: Partition by snapshot_date. ML training queries select "all users as of date X." Example path: user_features/snapshot_date=2024-11-15/

Layer 3 - Training datasets: Partition by model_version + training_run_id. Enables reproducibility - every training run has its own immutable dataset snapshot. Example path: training_data/model_version=v3.2/run_id=20241115/

Layer 4 - Model predictions (for training new models): Partition by model_version + date. Example path: predictions/model_version=v3.1/date=2024-11-15/

Key rule: partition columns should be used in WHERE clauses of actual queries. Never partition by user_id (too many unique values - creates millions of partitions). Limit partition cardinality to the number of directories you'd be comfortable with (hundreds to thousands, not millions).
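The raw-events layout and the pruning it enables can be sketched as follows. The helper names are hypothetical, and the functions only build path strings. In practice the query engine derives the same prefixes from the WHERE clause on the partition columns.

```python
from datetime import date, datetime, timedelta


def event_partition_path(ts: datetime, root: str = "events") -> str:
    """Hive-style directory for one raw event, matching the Layer 1 layout."""
    return (
        f"{root}/year={ts.year}/month={ts.month:02d}"
        f"/day={ts.day:02d}/hour={ts.hour:02d}/"
    )


def partitions_for_range(start: date, end: date, root: str = "events") -> list[str]:
    """Day-level prefixes a reader must scan for an inclusive date range.

    Every directory outside this list is pruned without being opened.
    """
    n_days = (end - start).days + 1
    days = [start + timedelta(days=i) for i in range(n_days)]
    return [f"{root}/year={d.year}/month={d.month:02d}/day={d.day:02d}/" for d in days]


path = event_partition_path(datetime(2024, 11, 15, 14, 30))
prefixes = partitions_for_range(date(2024, 11, 14), date(2024, 11, 16))
```

A three-day training window touches three day-level prefixes no matter how many years of events the lake holds, which is the payoff of partitioning on the column that queries actually filter by.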

Summary

The evolution from RDBMS to data warehouse to data lake to lakehouse reflects the iterative discovery of what ML teams actually need: flexible ingestion, historical retention, cheap storage for training-scale data, fast SQL for feature engineering, and, increasingly, ACID guarantees and time travel for reproducibility and debugging.

The practical rule: data lake for raw storage and training data export; data warehouse for SQL-based feature engineering and analytics; lakehouse (Delta Lake or Iceberg) when you need ACID transactions, time travel, or DML operations on the data lake.

The decision is rarely either/or. Most production ML teams use all three in combination, routing different workloads to the storage architecture best suited to the job.

:::tip Key Takeaway The data lake's core value proposition for ML is not its query capabilities (warehouses are faster at SQL) - it is the ability to store raw, immutable data indefinitely and reprocess it with corrected logic when bugs are discovered. The warehouse wall problem - where ETL pipelines discard raw data - is the most expensive mistake in ML data infrastructure. Always keep the raw data. :::

© 2026 EngineersOfAI. All rights reserved.