:::tip 🎮 Interactive Playground
Visualize this concept: Try the Spark Batch Processing demo on the EngineersOfAI Playground - no code required.
:::
# SQL at Scale for ML Feature Engineering
import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem';
Reading time: 30 min | Relevance: High for Data Engineers, ML Engineers, Data Scientists | Level: Intermediate to Advanced
## The $400 Query
A data scientist at a major streaming platform is building features for a content recommendation model. The feature set includes user engagement metrics - watch time, session frequency, completion rates, genre preferences - computed over the last 90 days. She writes the SQL in the development environment against a 1M-row sample. The query runs in 8 seconds. It looks clean. She's confident. She promotes it to production.
The production dataset is 10 billion rows, 3.5 years of streaming events across 180 million users. The query hits BigQuery and a progress bar appears. Then it keeps going. Forty-five minutes later, it times out with a "Resources exceeded" error. She adjusts the timeout config and reruns. This time it finishes - after 2 hours and 40 minutes. The BigQuery bill for that single query: $412.
The query has four problems: it scans all 3.5 years of partitions when it only needs 90 days, it runs COUNT(DISTINCT user_id) over 180 million distinct users in a subquery (one of the most expensive operations in distributed SQL), it has a correlated subquery inside a WHERE clause that executes once per row, and its CTEs are re-evaluated on every reference, which forces three full scans of the events table. None of these problems were visible on 1M rows.

Learning to write SQL that scales from 1M to 10B rows is one of the highest-leverage skills a data engineer or ML engineer can develop. The difference is not cleverness - it is understanding how a query planner works, where the data lives (columns, partitions, clusters), and which SQL patterns turn into $400 operations at scale.
This lesson systematically covers the patterns that matter.
## Why SQL at Scale Is Different
On a small dataset, every SQL operation is fast because everything fits in memory. The query planner's decisions do not matter - even a bad plan finishes in seconds. At 10 billion rows, the query planner's decisions are the difference between a query that runs in 30 seconds and one that runs for 3 hours.
The fundamental constraint at scale is I/O, not compute. Reading data from disk - or from cloud storage - is orders of magnitude slower than computing on data already in memory. SQL at scale is therefore about reading as little data as possible: using partition pruning to skip entire files, using column-oriented storage to scan only the columns you need, using approximate algorithms when exact answers are not required.
The key mental model: at 10B rows, every SQL operation implicitly answers the question "how many bytes does this force the engine to read?"
## Historical Context
Google's Dremel paper (2010) introduced the columnar-storage, massively-parallel query engine that became BigQuery. The key insight was separating compute from storage and using a columnar format (Capacitor in BigQuery, Parquet in open-source systems) that allows the engine to read only the columns referenced in the query. This made analytical SQL on petabyte-scale datasets economically viable.
DuckDB emerged from academic research at CWI Amsterdam in 2019. The key observation was that the "big data" tools (Spark, BigQuery) were dramatically over-engineered for the terabyte-scale workloads that most practitioners actually had. DuckDB brought columnar, vectorized query execution to the local process - no cluster, no server, just import duckdb. For ML feature development on datasets up to ~100GB, DuckDB is often faster than BigQuery and runs on a laptop.
## BigQuery Architecture: What You Actually Need to Know
BigQuery stores data in Capacitor - Google's proprietary columnar format. Tables are split into partitions (typically by date or a user-defined column) and within partitions, data is organized into clusters (sorted by up to 4 columns).
The query planner reads the partition metadata before touching any data. If your query's WHERE clause filters on the partition column, the planner skips all non-matching partitions entirely - no I/O, no cost. This is partition pruning.
Bytes billed is the BigQuery on-demand cost model: you pay for how many bytes are read, not for compute time (this lesson uses the classic list price of about $5 per TB; check current pricing before budgeting). This means:
- Scanning a full 10TB table costs ~$50
- Scanning 100GB of a 10TB table (via partition pruning) costs ~$0.50
- A `SELECT *` on a 10TB table costs ~$50 regardless of what you do with the result

The practical implication: always filter on partition columns, never use `SELECT *`, and be aware that anything that defeats pruning (such as wrapping the partition column in a function) costs full table price.
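The cost figures above are simple arithmetic on bytes read. A minimal Python sketch of that arithmetic, assuming the lesson's flat rate of about 5 USD per decimal TB, equal-size partitions, and a hypothetical table of 100 equally sized columns (real column sizes vary, and billing details such as per-query minimums are not modeled):

```python
from __future__ import annotations

# Sketch of BigQuery on-demand cost arithmetic. Assumptions (illustrative only):
# a flat ~5 USD per decimal TB, equal-size partitions, hypothetical column sizes.
USD_PER_TB = 5.0
BYTES_PER_TB = 10**12


def scan_cost_usd(bytes_scanned: int, usd_per_tb: float = USD_PER_TB) -> float:
    """On-demand cost depends only on bytes read, not rows returned or CPU time."""
    return bytes_scanned / BYTES_PER_TB * usd_per_tb


def bytes_scanned(column_bytes: dict[str, int], selected: list[str],
                  partitions_read: int, total_partitions: int) -> int:
    """Columnar storage reads only the selected columns; partition pruning
    reads only partitions_read of total_partitions (assumed equal in size)."""
    selected_bytes = sum(column_bytes[c] for c in selected)
    return selected_bytes * partitions_read // total_partitions


# Hypothetical 10TB table: 100 columns of 100GB each, 1,000 daily partitions
COLUMNS = {f"col{i}": 100 * 10**9 for i in range(100)}

if __name__ == "__main__":
    full = bytes_scanned(COLUMNS, list(COLUMNS), 1000, 1000)              # SELECT *, no filter
    three_cols = bytes_scanned(COLUMNS, ["col0", "col1", "col2"], 1000, 1000)
    pruned = bytes_scanned(COLUMNS, ["col0", "col1", "col2"], 90, 1000)   # 90-day filter
    for label, b in [("SELECT *", full), ("3 columns", three_cols),
                     ("3 columns, 90 days", pruned)]:
        print(f"{label:>20}: {b / 10**9:>8.1f} GB  {scan_cost_usd(b):.2f} USD")
```

Column projection and partition pruning multiply: reading 3 of 100 columns over 90 of 1,000 partitions touches well under 1% of the table.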
## Partitioned Tables and Partition Pruning
### Creating a Partitioned Table
```sql
-- BigQuery: create a table partitioned by date
CREATE TABLE `my_project.ml_features.streaming_events`
PARTITION BY DATE(event_timestamp)
CLUSTER BY user_id, content_id
OPTIONS (
  partition_expiration_days = 365,  -- partitions older than 365 days are dropped automatically
  require_partition_filter = true   -- forces callers to always filter on the partition column
)
AS
SELECT
  user_id,
  content_id,
  event_type,
  watch_duration_seconds,
  event_timestamp
FROM `my_project.raw.events_raw`
```
The CLUSTER BY user_id, content_id sorts data within each partition. Queries that filter on user_id can skip files within the partition that don't contain matching user IDs - a second layer of pruning called block pruning or micro-partition pruning (Snowflake's term).
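Block pruning works because each storage block carries min/max metadata for the clustering columns, and sorting by the clustering key keeps those ranges narrow. BigQuery does not expose its block layout, so the following is a hypothetical, simplified Python model (the `Block` class and helper names are illustrative, not a real API) that counts how many blocks a `user_id = ...` filter must read with and without clustering:

```python
from __future__ import annotations
from dataclasses import dataclass


@dataclass
class Block:
    """A storage block with min/max metadata for the clustering column."""
    min_user: str
    max_user: str
    rows: list[tuple[str, int]]  # (user_id, watch_seconds)


def build_blocks(rows: list[tuple[str, int]], block_size: int, cluster: bool) -> list[Block]:
    """Cut rows into fixed-size blocks; cluster=True sorts by user_id first."""
    ordered = sorted(rows) if cluster else list(rows)
    blocks = []
    for i in range(0, len(ordered), block_size):
        chunk = ordered[i:i + block_size]
        users = [u for u, _ in chunk]
        blocks.append(Block(min(users), max(users), chunk))
    return blocks


def sum_watch_for_user(blocks: list[Block], user_id: str) -> tuple[int, int]:
    """Return (total watch seconds, number of blocks actually read)."""
    total, blocks_read = 0, 0
    for block in blocks:
        if not (block.min_user <= user_id <= block.max_user):
            continue  # skipped using metadata alone: no data read
        blocks_read += 1
        total += sum(w for u, w in block.rows if u == user_id)
    return total, blocks_read


def make_rows() -> list[tuple[str, int]]:
    """100 users, 10 events each, arriving interleaved as in an append-only log."""
    return [(f"u{u:03d}", rep + 1) for rep in range(10) for u in range(100)]


if __name__ == "__main__":
    rows = make_rows()
    for cluster in (False, True):
        total, read = sum_watch_for_user(build_blocks(rows, 100, cluster), "u042")
        print(f"clustered={cluster}: total={total}, blocks read={read} of 10")
```

Without clustering every block spans the full user range, so all 10 blocks are read; with clustering the same answer comes from a single block.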
### Ensuring Partition Pruning Is Used
```sql
-- GOOD: partition column compared against a constant-foldable expression,
-- so BigQuery reads only the relevant partitions
SELECT
  user_id,
  SUM(watch_duration_seconds) AS total_watch_seconds_90d
FROM `my_project.ml_features.streaming_events`
WHERE DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY)
  AND event_type = 'watch'
GROUP BY user_id;

-- BAD: arithmetic on the partition column itself hides the partition bounds
-- from the planner, so it cannot prune (with require_partition_filter = true,
-- BigQuery rejects this query instead of running a full scan)
SELECT
  user_id,
  SUM(watch_duration_seconds) AS total_watch_seconds_1d
FROM `my_project.ml_features.streaming_events`
WHERE TIMESTAMP_ADD(event_timestamp, INTERVAL 1 DAY) >= CURRENT_TIMESTAMP()  -- full scan!
GROUP BY user_id;
```
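:::danger Non-sargable predicates destroy pruning
A non-sargable predicate is a filter condition that the query engine cannot use to prune data at the storage layer ("sargable" is short for Search ARGument ABLE). Functions applied to indexed or partitioned columns are the most common example: `WHERE EXTRACT(YEAR FROM event_timestamp) = 2024` forces a full scan, while `WHERE event_timestamp >= '2024-01-01' AND event_timestamp < '2025-01-01'` enables pruning. Always write filter conditions in the form `column [operator] value`, not `function(column) [operator] value`. BigQuery can typically still prune on `DATE(event_timestamp)` when the table is partitioned by exactly that expression, which is why the queries in this lesson use it; when in doubt, a dry run's bytes-processed estimate shows whether pruning happened.
:::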
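The sargability rule is not specific to BigQuery. As an analogy you can run locally, SQLite's `EXPLAIN QUERY PLAN` shows the same effect on a B-tree index (an index rather than partitions, but the principle is identical): a bare-column range becomes an index SEARCH, while wrapping the column in `strftime` forces a SCAN. The table and index names are hypothetical:

```python
import sqlite3


def plan_details(conn: sqlite3.Connection, sql: str, params: tuple = ()) -> list[str]:
    """Return the human-readable 'detail' column of EXPLAIN QUERY PLAN."""
    return [row[-1] for row in conn.execute("EXPLAIN QUERY PLAN " + sql, params)]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_ts TEXT, user_id TEXT)")
conn.execute("CREATE INDEX idx_events_ts ON events (event_ts)")

# Sargable: the bare column is compared with constants -> index range SEARCH
sargable_plan = plan_details(
    conn,
    "SELECT COUNT(*) FROM events WHERE event_ts >= ? AND event_ts < ?",
    ("2024-01-01", "2025-01-01"),
)

# Non-sargable: a function wraps the column -> every entry must be examined (SCAN)
non_sargable_plan = plan_details(
    conn,
    "SELECT COUNT(*) FROM events WHERE strftime('%Y', event_ts) = ?",
    ("2024",),
)

print("sargable:    ", sargable_plan)
print("non-sargable:", non_sargable_plan)
```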
## Window Functions: The Core ML Feature Engine
Window functions are the most powerful SQL tool for ML feature engineering. They allow you to compute aggregations over a sliding window without a GROUP BY - preserving the row-level granularity needed for training data.
### Anatomy of a Window Function
```sql
function_name([expression]) OVER (
  PARTITION BY partition_columns
  ORDER BY order_column
  ROWS BETWEEN start AND end
)
```
The OVER() clause defines the window. Every combination of these three clauses produces a different computation.
### ROWS BETWEEN vs RANGE BETWEEN
This distinction confuses even experienced SQL engineers and causes subtle ML feature errors.
ROWS BETWEEN: a physical row offset. "The last 30 rows."
RANGE BETWEEN: a logical value range based on the ORDER BY column. "All rows where the ORDER BY value is within 30 of the current row's value."
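For time-based ML features, ROWS BETWEEN on a day-level aggregate is usually what you want, but only if you have exactly one row per user for every calendar day, including days with no activity. If a user has multiple events per day, ROWS BETWEEN 29 PRECEDING AND CURRENT ROW gives you the last 30 rows (events), not the last 30 days. To get the last 30 days, use a RANGE frame on the time column. DuckDB and PostgreSQL accept an interval offset directly (RANGE BETWEEN INTERVAL '30 days' PRECEDING AND CURRENT ROW); BigQuery only allows RANGE frames over a numeric ORDER BY key, so order by UNIX_SECONDS(event_timestamp) and express 30 days in seconds: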
```sql
-- Correct 30-day rolling window regardless of event frequency.
-- BigQuery RANGE frames need a numeric ORDER BY key, so order by epoch
-- seconds and express 30 days in seconds (30 * 86400 = 2592000).
SELECT
  user_id,
  event_timestamp,
  SUM(watch_duration_seconds) OVER (
    PARTITION BY user_id
    ORDER BY UNIX_SECONDS(event_timestamp)
    RANGE BETWEEN 2592000 PRECEDING AND CURRENT ROW
  ) AS watch_seconds_30d_range,
  -- Versus: last 30 ROWS (not 30 DAYS) - wrong for variable-frequency data
  SUM(watch_duration_seconds) OVER (
    PARTITION BY user_id
    ORDER BY event_timestamp
    ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
  ) AS watch_seconds_last_30_rows  -- misleading name, not a time window
FROM `my_project.ml_features.streaming_events`
WHERE DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 120 DAY)  -- partition filter (required by the table)
  AND event_type = 'watch'
```
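The difference between the two frame types is easiest to see on a handful of rows. A minimal pure-Python sketch of both frames over one user's events, assuming integer day numbers as the ORDER BY key (the function names are hypothetical, not a library API):

```python
from __future__ import annotations


def rows_frame_sum(values: list[int], n_preceding: int) -> list[int]:
    """ROWS BETWEEN n PRECEDING AND CURRENT ROW: the current row plus the
    n physical rows before it, whatever their timestamps."""
    return [sum(values[max(0, i - n_preceding): i + 1]) for i in range(len(values))]


def range_frame_sum(keys: list[int], values: list[int], span: int) -> list[int]:
    """RANGE BETWEEN span PRECEDING AND CURRENT ROW: every row whose ORDER BY
    key lies in [key - span, key]. Rows with an equal key (peers) are all
    included, as in SQL. keys must be sorted ascending."""
    return [
        sum(v for kk, v in zip(keys, values) if k - span <= kk <= k)
        for k in keys
    ]


# One user's watch events: three on day 1, one on day 2, one on day 6
days = [1, 1, 1, 2, 6]
watch = [10, 10, 10, 20, 30]

print(rows_frame_sum(watch, 2))          # "last 3 rows" -> [10, 20, 30, 40, 60]
print(range_frame_sum(days, watch, 2))   # "last 3 days" -> [30, 30, 30, 50, 30]
```

On day 6 the ROWS frame reports 60 seconds even though only 30 were watched in days 4 to 6: it silently reached back to day 1.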
### Complete Rolling Window Feature Query
```sql
-- Rolling engagement features: 7d, 30d, 90d windows
-- One row per user per ACTIVE day - used for time-series ML training
WITH daily_events AS (
  SELECT
    user_id,
    DATE(event_timestamp) AS event_date,
    SUM(watch_duration_seconds) AS daily_watch_seconds,
    COUNT(*) AS daily_session_count,
    COUNT(DISTINCT content_id) AS daily_unique_content
  FROM `my_project.ml_features.streaming_events`
  WHERE DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 120 DAY)  -- partition prune; extra days warm up the 90d window
    AND event_type = 'watch'
  GROUP BY user_id, event_date
)
SELECT
  user_id,
  event_date,
  -- 7-day rolling features
  SUM(daily_watch_seconds) OVER w7 AS watch_seconds_7d,
  SUM(daily_session_count) OVER w7 AS sessions_7d,
  AVG(daily_watch_seconds) OVER w7 AS avg_daily_watch_7d,  -- averaged over active days only
  COUNT(*) OVER w7 AS active_days_7d,
  -- 30-day rolling features
  SUM(daily_watch_seconds) OVER w30 AS watch_seconds_30d,
  SUM(daily_session_count) OVER w30 AS sessions_30d,
  SUM(daily_unique_content) OVER w30 AS unique_content_30d,  -- sum of daily distinct counts: a title watched on 3 days counts 3 times
  COUNT(*) OVER w30 AS active_days_30d,
  -- 90-day rolling features
  SUM(daily_watch_seconds) OVER w90 AS watch_seconds_90d,
  SUM(daily_session_count) OVER w90 AS sessions_90d,
  -- Recency features: look backward only (LEAD would leak future data into a feature)
  LAG(daily_watch_seconds, 1) OVER w_order AS prev_active_day_watch,
  DATE_DIFF(event_date, LAG(event_date, 1) OVER w_order, DAY) AS days_since_prev_active,
  -- Trend: last 7 calendar days vs the 7 days before that
  SUM(daily_watch_seconds) OVER w7 /
    NULLIF(SUM(daily_watch_seconds) OVER w14_prior, 0) AS watch_trend_ratio
FROM daily_events
-- RANGE over UNIX_DATE (days since 1970-01-01) makes each frame a span of
-- calendar days, so inactive days do not stretch the window
WINDOW
  w_order AS (PARTITION BY user_id ORDER BY event_date),
  w7 AS (PARTITION BY user_id ORDER BY UNIX_DATE(event_date) RANGE BETWEEN 6 PRECEDING AND CURRENT ROW),
  w30 AS (PARTITION BY user_id ORDER BY UNIX_DATE(event_date) RANGE BETWEEN 29 PRECEDING AND CURRENT ROW),
  w90 AS (PARTITION BY user_id ORDER BY UNIX_DATE(event_date) RANGE BETWEEN 89 PRECEDING AND CURRENT ROW),
  w14_prior AS (PARTITION BY user_id ORDER BY UNIX_DATE(event_date) RANGE BETWEEN 13 PRECEDING AND 7 PRECEDING)
```
The named WINDOW clause at the bottom defines window specifications once and reuses them across multiple function calls. Beyond being cleaner, it keeps frame definitions consistent and makes explicit which functions share the same PARTITION BY and ORDER BY, so the engine can evaluate them from a single sort of the data (most planners detect shared specifications even when they are written inline).
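On a table like daily_events, which only contains days on which the user was active, a ROWS frame counts active days rather than calendar days. A small Python sketch with hypothetical data (a user active every third day) compares a 7-calendar-day window with a 7-row window on such a sparse table:

```python
from __future__ import annotations
from datetime import date, timedelta


def calendar_window_7d(daily: dict[date, int], as_of: date) -> tuple[int, int]:
    """(watch_seconds, active_days) over the 7 calendar days ending at as_of,
    like RANGE BETWEEN 6 PRECEDING AND CURRENT ROW on a day number."""
    window = [as_of - timedelta(days=k) for k in range(7)]
    active = [d for d in window if d in daily]
    return sum(daily[d] for d in active), len(active)


def row_window_7(daily: dict[date, int], as_of: date) -> tuple[int, int]:
    """(watch_seconds, row_count) over the last 7 rows up to as_of, like
    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW on a table of active days only."""
    last_rows = sorted(d for d in daily if d <= as_of)[-7:]
    return sum(daily[d] for d in last_rows), len(last_rows)


# Hypothetical user: 100 seconds watched every third day, Jan 1 to Jan 28, 2024
daily = {date(2024, 1, 1) + timedelta(days=3 * i): 100 for i in range(10)}
as_of = date(2024, 1, 28)

print(calendar_window_7d(daily, as_of))  # (300, 3): Jan 22, 25, 28
print(row_window_7(daily, as_of))        # (700, 7): reaches back to Jan 10
```

With a ROWS frame, an "active days in the last 7" feature is simply 7 for every row once a user has seven active days, so it carries no information.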
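### Point-in-Time Join in Pure SQL
When you do not have SCD2 (slowly changing dimension, type 2) tables with explicit valid_from/valid_to ranges, you can still implement a point-in-time join over periodic feature snapshots entirely in SQL, using temporal conditions: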
```sql
-- Point-in-time join: get user features as they existed at each training event
WITH training_events AS (
  -- The events that define the training examples
  SELECT
    user_id,
    event_timestamp AS label_time,
    churned_within_30d AS label
  FROM `my_project.ml.churn_labels`
  WHERE DATE(event_timestamp) BETWEEN '2023-01-01' AND '2024-01-01'
),
user_feature_history AS (
  -- A historical log of user feature snapshots
  -- (e.g., computed daily and stored, or from a CDC pipeline)
  SELECT
    user_id,
    snapshot_timestamp,
    account_age_days,
    plan_tier,
    total_spend_usd,
    feature_count
  FROM `my_project.ml_features.user_feature_snapshots`
),
-- As-of join: for each training event, find the most recent feature snapshot
-- that existed BEFORE the label time (strict less-than to prevent leakage)
latest_features_at_label_time AS (
  SELECT
    te.user_id,
    te.label_time,
    te.label,
    ufs.account_age_days,
    ufs.plan_tier,
    ufs.total_spend_usd,
    ufs.feature_count,
    ROW_NUMBER() OVER (
      PARTITION BY te.user_id, te.label_time
      ORDER BY ufs.snapshot_timestamp DESC
    ) AS rn
  FROM training_events te
  LEFT JOIN user_feature_history ufs
    ON te.user_id = ufs.user_id
    AND ufs.snapshot_timestamp < te.label_time  -- strict less-than: no future information
)
SELECT
  user_id,
  label_time,
  label,
  account_age_days,
  plan_tier,
  total_spend_usd,
  feature_count
FROM latest_features_at_label_time
WHERE rn = 1  -- keep only the most recent snapshot before label_time
```
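The ROW_NUMBER() ... ORDER BY snapshot_timestamp DESC ... WHERE rn = 1 pattern selects the most recent record before each label time. It is correct but not free: the join first pairs each label with every earlier snapshot for that user, so with daily snapshots over a long history the intermediate result can be far larger than either input; if older snapshots are never needed, adding a lookback bound on snapshot_timestamp keeps it in check. The strict less-than (`<`, not `<=`) on the timestamp is critical: using `<=` would allow a snapshot computed at exactly the label time to be used, which could include information from the event itself.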
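Conceptually, an as-of join means "sort each user's snapshots by time, then find the last one strictly before the label time", the same semantics a native ASOF JOIN (DuckDB has one) provides. A minimal pure-Python sketch using binary search, with hypothetical data, that keeps the strict less-than and the LEFT JOIN behavior for users with no earlier snapshot:

```python
from __future__ import annotations
import bisect
from datetime import datetime


def as_of_join(labels, snapshots):
    """labels: [(user_id, label_time)]; snapshots: [(user_id, snapshot_time, features)].
    Returns [(user_id, label_time, features_or_None)] using the latest snapshot
    strictly before label_time."""
    by_user: dict[str, list[tuple[datetime, dict]]] = {}
    for user_id, ts, features in snapshots:
        by_user.setdefault(user_id, []).append((ts, features))
    for history in by_user.values():
        history.sort(key=lambda pair: pair[0])
    times_by_user = {u: [ts for ts, _ in h] for u, h in by_user.items()}

    result = []
    for user_id, label_time in labels:
        history = by_user.get(user_id, [])
        # bisect_left returns the first index with ts >= label_time, so the
        # entry before it is the last snapshot with ts < label_time
        i = bisect.bisect_left(times_by_user.get(user_id, []), label_time)
        result.append((user_id, label_time, history[i - 1][1] if i > 0 else None))
    return result


snapshots = [
    ("u1", datetime(2024, 1, 1), {"plan_tier": "basic"}),
    ("u1", datetime(2024, 1, 10), {"plan_tier": "premium"}),
]
labels = [
    ("u1", datetime(2024, 1, 10)),  # snapshot at the same instant is excluded
    ("u1", datetime(2024, 1, 11)),
    ("u2", datetime(2024, 1, 5)),   # no history: LEFT JOIN yields None
]

for row in as_of_join(labels, snapshots):
    print(row)
```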
## COUNT(DISTINCT) at Scale: HyperLogLog Sketches
COUNT(DISTINCT user_id) on a 10-billion-row table is one of the most expensive operations in distributed SQL. Every value must be deduplicated, which means shuffling rows so that equal values meet on the same worker and holding state proportional to the number of distinct values. For exact counts, this is unavoidable. But for ML features where you need "approximate number of unique users who viewed this item in the last 30 days," an approximate answer is almost always sufficient.
In BigQuery's GoogleSQL, COUNT(DISTINCT) is exact. APPROX_COUNT_DISTINCT and the HLL_COUNT.* sketch functions use HyperLogLog++ (Google's enhancement of HyperLogLog), trading a small error for much lower cost; the sketch functions also make the counts composable across days:
```sql
-- Standard COUNT(DISTINCT) - expensive, exact
SELECT
  content_id,
  COUNT(DISTINCT user_id) AS exact_unique_viewers
FROM streaming_events
WHERE DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY content_id;

-- HyperLogLog approximate COUNT(DISTINCT) - fast, ~1% error
SELECT
  content_id,
  APPROX_COUNT_DISTINCT(user_id) AS approx_unique_viewers
FROM streaming_events
WHERE DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY content_id;

-- HyperLogLog sketch: compute sketch per day, then merge
-- This allows incremental computation - you don't need to reprocess all 30 days
WITH daily_sketches AS (
  SELECT
    content_id,
    DATE(event_timestamp) AS event_date,
    HLL_COUNT.INIT(user_id) AS user_sketch  -- build sketch per day
  FROM streaming_events
  WHERE DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
  GROUP BY content_id, event_date
)
SELECT
  content_id,
  HLL_COUNT.MERGE(user_sketch) AS approx_unique_viewers_30d  -- merge daily sketches
FROM daily_sketches
GROUP BY content_id;
```
The HLL sketch approach enables incremental approximate counting. The query above still rebuilds all 30 daily sketches on every run; the saving comes from persisting daily_sketches as a table, appending only the newest day each night, and merging the stored sketches (one small row per content item per day) instead of rescanning 30 days of raw events. For feature tables that update nightly, this reduces the daily cost from scanning 30 days of events to scanning 1 day of events plus 29 stored daily sketches.
The mathematical guarantee of HyperLogLog: for a cardinality estimate $\hat{n}$, the relative standard error is approximately $1.04/\sqrt{m}$, where $m$ is the number of registers ($m = 2^p$ for precision parameter $p$). At BigQuery's default HLL_COUNT.INIT precision of $p = 15$ this is about 0.6%, so the error is typically under 1%.
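To make the register-and-merge idea concrete, here is a minimal Python implementation of classic HyperLogLog. It is not Google's HLL++ (which adds bias correction and a sparse representation) and not BigQuery's sketch format; it assumes a 64-bit BLAKE2b hash, precision p = 12 (4,096 registers, expected relative error about 1.6%), and hypothetical user IDs. Merging two sketches is an element-wise max of their registers, which is why daily sketches can be combined into a multi-day count without revisiting raw events:

```python
from __future__ import annotations
import hashlib
import math


class HyperLogLog:
    """Classic HyperLogLog with 2**p registers (illustrative, not HLL++)."""

    def __init__(self, p: int = 12):
        if p < 7:
            raise ValueError("the bias constant used below assumes p >= 7")
        self.p = p
        self.m = 1 << p
        self.registers = [0] * self.m

    @staticmethod
    def _hash64(value: str) -> int:
        digest = hashlib.blake2b(value.encode("utf-8"), digest_size=8).digest()
        return int.from_bytes(digest, "big")

    def add(self, value: str) -> None:
        h = self._hash64(value)
        index = h >> (64 - self.p)                    # top p bits choose a register
        rest = h & ((1 << (64 - self.p)) - 1)         # remaining 64 - p bits
        rank = (64 - self.p) - rest.bit_length() + 1  # leading zeros + 1
        self.registers[index] = max(self.registers[index], rank)

    def merge(self, other: HyperLogLog) -> HyperLogLog:
        """Sketch of the union: element-wise max of the two register arrays."""
        if other.p != self.p:
            raise ValueError("precision mismatch")
        merged = HyperLogLog(self.p)
        merged.registers = [max(a, b) for a, b in zip(self.registers, other.registers)]
        return merged

    def estimate(self) -> float:
        alpha = 0.7213 / (1 + 1.079 / self.m)         # bias constant for m >= 128
        raw = alpha * self.m ** 2 / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * self.m and zeros:             # small-range correction
            return self.m * math.log(self.m / zeros)
        return raw


def sketch_of(ids) -> HyperLogLog:
    hll = HyperLogLog(p=12)
    for user_id in ids:
        hll.add(user_id)
    return hll


# Hypothetical viewers: day 1 sees users 0..29,999, day 2 sees users 20,000..49,999
day1 = sketch_of(f"user_{i}" for i in range(0, 30_000))
day2 = sketch_of(f"user_{i}" for i in range(20_000, 50_000))
two_day = day1.merge(day2)            # analogous to HLL_COUNT.MERGE over daily sketches

print(round(two_day.estimate()))      # close to the true 50,000 distinct users
print(1.04 / math.sqrt(two_day.m))    # expected relative standard error, about 1.6%
```

The merged sketch is identical to one built directly from the union of both days, so the 20,000 users seen on both days are not double-counted.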
## SQL Query Execution Pipeline
A query moves through parsing, logical planning, optimization, and distributed execution. The optimizer stage is where partition pruning, predicate pushdown (moving filters as early as possible in the plan), and join reordering (for example, joining smaller inputs first) happen. For ML engineers, the most actionable insight is: filters on partition columns must be in a form the optimizer can evaluate at planning time (constants, date functions with constant arguments). If the optimizer cannot evaluate the filter at plan time, it cannot prune - and you pay for a full scan.
## Aggregation Patterns for ML Features
### Conditional Aggregation: The Pivot Pattern
SQL's CASE WHEN inside an aggregate function is one of the most powerful patterns for generating feature tables with many columns:
```sql
-- Generate one row per user with multiple behavioral signals
-- All computed in a single scan of the events table
SELECT
  user_id,
  -- Activity by time of day (non-overlapping buckets; EXTRACT from a
  -- TIMESTAMP uses UTC unless a time zone is given)
  COUNT(CASE WHEN EXTRACT(HOUR FROM event_time) BETWEEN 6 AND 11
             THEN 1 END) AS morning_sessions,
  COUNT(CASE WHEN EXTRACT(HOUR FROM event_time) BETWEEN 12 AND 17
             THEN 1 END) AS afternoon_sessions,
  COUNT(CASE WHEN EXTRACT(HOUR FROM event_time) BETWEEN 18 AND 23
             THEN 1 END) AS evening_sessions,
  -- Activity by day of week (BigQuery DAYOFWEEK: 1 = Sunday, 7 = Saturday)
  COUNT(CASE WHEN EXTRACT(DAYOFWEEK FROM event_date) IN (1, 7)
             THEN 1 END) AS weekend_sessions,
  COUNT(CASE WHEN EXTRACT(DAYOFWEEK FROM event_date) NOT IN (1, 7)
             THEN 1 END) AS weekday_sessions,
  -- Recency segmentation
  SUM(CASE WHEN event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
           THEN revenue ELSE 0 END) AS revenue_7d,
  SUM(CASE WHEN event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
           THEN revenue ELSE 0 END) AS revenue_30d,
  SUM(revenue) AS revenue_365d,  -- bounded by the WHERE clause below
  -- Ratio features: SAFE_DIVIDE returns NULL instead of failing on a zero denominator
  SAFE_DIVIDE(
    SUM(CASE WHEN event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
             THEN revenue END),
    SUM(revenue)
  ) AS recent_revenue_share  -- fraction of the last 365 days' revenue earned in the last 7 days
FROM `my_project.ml_features.user_events`
WHERE event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)
GROUP BY user_id
```
The key insight: this is a single scan of the events table that produces many feature columns. Writing it as 8 separate queries and joining them would scan the table 8 times, roughly 8× the cost. Aggregate everything you need from the same base table in one pass.
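The same single-pass idea in plain Python: one loop over the events fills every feature column, just as each CASE WHEN aggregate is evaluated during the same scan. The event tuples and feature names are hypothetical; note that Python's `weekday()` numbers Monday as 0 and Sunday as 6, unlike BigQuery's DAYOFWEEK (1 = Sunday, 7 = Saturday):

```python
from __future__ import annotations
from collections import defaultdict
from datetime import datetime


def single_pass_features(events):
    """events: iterable of (user_id, event_time, revenue).
    One pass fills every feature column, like many CASE WHEN aggregates
    evaluated during the same table scan."""
    features = defaultdict(lambda: {"morning": 0, "afternoon": 0, "evening": 0,
                                    "weekend": 0, "weekday": 0, "revenue": 0.0})
    for user_id, ts, revenue in events:
        f = features[user_id]
        # Non-overlapping hour buckets: [6, 12), [12, 18), [18, 24); hours 0-5 fall in none
        if 6 <= ts.hour < 12:
            f["morning"] += 1
        elif 12 <= ts.hour < 18:
            f["afternoon"] += 1
        elif ts.hour >= 18:
            f["evening"] += 1
        # datetime.weekday(): Monday = 0 ... Saturday = 5, Sunday = 6
        if ts.weekday() >= 5:
            f["weekend"] += 1
        else:
            f["weekday"] += 1
        f["revenue"] += revenue
    return dict(features)


EVENTS = [
    ("u1", datetime(2024, 3, 2, 12, 0), 1.5),   # Saturday noon: afternoon, weekend
    ("u1", datetime(2024, 3, 4, 6, 30), 2.0),   # Monday morning
    ("u1", datetime(2024, 3, 4, 23, 59), 0.5),  # Monday evening
    ("u1", datetime(2024, 3, 5, 3, 0), 1.0),    # Tuesday 3am: no time-of-day bucket
]

print(single_pass_features(EVENTS)["u1"])
```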
### The QUALIFY Clause: Efficient Row Deduplication
QUALIFY (BigQuery, Snowflake, DuckDB) allows you to filter window function results without a subquery - essential for building training datasets where you want the most recent record per entity:
```sql
-- Get the most recent feature snapshot per user

-- Without QUALIFY (subquery approach - more verbose)
SELECT * FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY snapshot_ts DESC) AS rn
  FROM user_feature_snapshots
) WHERE rn = 1;

-- With QUALIFY (cleaner, equally efficient)
SELECT *
FROM user_feature_snapshots
WHERE TRUE  -- harmless; some BigQuery versions reject QUALIFY without WHERE, GROUP BY, or HAVING
QUALIFY ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY snapshot_ts DESC) = 1;
```
QUALIFY is particularly useful in training data preparation pipelines where you routinely need to deduplicate or select ranked rows.
### LATERAL Joins and Array Features
BigQuery and Snowflake natively support array columns. For ML features derived from array-structured data (tags, categories, events), CROSS JOIN UNNEST is the efficient pattern:
```sql
-- User genre preference features from an array column.
-- Assumes streaming_events carries a genre_tags ARRAY<STRING> column
-- (not part of the CREATE TABLE example earlier in this lesson).
WITH user_genre_events AS (
  SELECT
    user_id,
    genre_tag,
    COUNT(*) AS views,
    SUM(watch_duration_seconds) AS watch_seconds
  FROM `my_project.ml_features.streaming_events`,
    UNNEST(genre_tags) AS genre_tag  -- lateral unnest: one row per genre tag per event
  WHERE DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
  GROUP BY user_id, genre_tag
),
-- Pivot genres into columns. An event tagged with two genres credits its
-- full watch time to both, so these columns can sum to more than total watch time.
genre_features AS (
  SELECT
    user_id,
    SUM(CASE WHEN genre_tag = 'action' THEN watch_seconds ELSE 0 END) AS watch_action_30d,
    SUM(CASE WHEN genre_tag = 'drama' THEN watch_seconds ELSE 0 END) AS watch_drama_30d,
    SUM(CASE WHEN genre_tag = 'comedy' THEN watch_seconds ELSE 0 END) AS watch_comedy_30d,
    SUM(CASE WHEN genre_tag = 'thriller' THEN watch_seconds ELSE 0 END) AS watch_thriller_30d,
    COUNT(DISTINCT genre_tag) AS genre_diversity_30d  -- distinct genres watched: a crude entropy proxy
  FROM user_genre_events
  GROUP BY user_id
)
SELECT * FROM genre_features
```
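What the comma-join `UNNEST` does to the row count, sketched in Python with hypothetical events: each event becomes one row per tag, so multi-genre events credit their full duration to every tag, and events with an empty tag array produce no rows at all (the comma form behaves like a CROSS JOIN; BigQuery's `LEFT JOIN UNNEST(...)` would keep them):

```python
from __future__ import annotations
from collections import defaultdict


def genre_watch_seconds(events):
    """events: [(user_id, genre_tags, watch_seconds)] -> {user_id: {genre: seconds}}"""
    per_user: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
    for user_id, tags, seconds in events:
        for tag in tags:  # the UNNEST step: one output row per array element
            per_user[user_id][tag] += seconds
    return {user: dict(genres) for user, genres in per_user.items()}


EVENTS = [
    ("u1", ["action", "thriller"], 600),  # credited to both genres
    ("u1", ["drama"], 300),
    ("u1", [], 100),                      # empty array: dropped entirely
]

print(genre_watch_seconds(EVENTS))
# {'u1': {'action': 600, 'thriller': 600, 'drama': 300}}
```

The per-genre totals add up to 1,500 seconds even though the user watched 1,000, which matters if these columns are later turned into share-of-watch-time ratios.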
## CTEs vs Subqueries vs Temp Tables
This is a frequent source of confusion that has real performance implications.
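CTEs (Common Table Expressions) - defined with WITH. In BigQuery, non-recursive CTEs are inlined by the optimizer rather than materialized, so a CTE referenced 3 times may be evaluated 3 times. In Snowflake, CTEs may or may not be materialized depending on the optimizer. In PostgreSQL before version 12, CTEs were always materialized (an optimization fence); since version 12, a side-effect-free CTE referenced only once is inlined, one referenced more than once is materialized by default, and WITH name AS MATERIALIZED (...) or AS NOT MATERIALIZED (...) overrides the default.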
```sql
-- Potential issue: daily_events CTE is referenced twice
-- In BigQuery, this may scan the events table twice
WITH daily_events AS (
  SELECT user_id, DATE(event_timestamp) AS d, COUNT(*) AS cnt
  FROM streaming_events
  WHERE DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
  GROUP BY 1, 2
)
SELECT b.user_id, a.cnt_7d, b.cnt_30d
FROM (SELECT user_id, SUM(cnt) AS cnt_30d
      FROM daily_events
      GROUP BY user_id) b
LEFT JOIN (SELECT user_id, SUM(cnt) AS cnt_7d
           FROM daily_events
           WHERE d >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
           GROUP BY user_id) a
  ON a.user_id = b.user_id
```
Temp tables are explicitly materialized. In BigQuery, use a CREATE TEMP TABLE to force materialization when a CTE is expensive and referenced multiple times:
```sql
-- Force materialization: compute once, reference twice
CREATE TEMP TABLE daily_events AS
SELECT
  user_id,
  DATE(event_timestamp) AS event_date,
  COUNT(*) AS session_count,
  SUM(watch_duration_seconds) AS watch_seconds
FROM `my_project.ml_features.streaming_events`
WHERE DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY)
GROUP BY 1, 2;

-- Now reference it cheaply, as it's already materialized
SELECT
  user_id,
  SUM(CASE WHEN event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
           THEN watch_seconds END) AS watch_7d,
  SUM(CASE WHEN event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
           THEN watch_seconds END) AS watch_30d,
  SUM(watch_seconds) AS watch_90d
FROM daily_events
GROUP BY user_id;
```
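The cost difference between an inlined CTE and a temp table comes down to how many times the base table is scanned. A toy Python simulation (all names hypothetical; it models only scan counts, not BigQuery's actual optimizer) makes the bookkeeping explicit: an inlined CTE re-runs its body once per reference, while a temp table runs it once:

```python
from __future__ import annotations


class CountingTable:
    """Stand-in for a large base table that counts the full scans it serves."""

    def __init__(self, rows: list[tuple[str, int]]):
        self.rows = rows  # (user_id, day_number)
        self.scans = 0

    def scan(self) -> list[tuple[str, int]]:
        self.scans += 1
        return list(self.rows)


def daily_counts(table: CountingTable) -> dict[tuple[str, int], int]:
    """The CTE body: aggregate events to (user_id, day) -> event count."""
    counts: dict[tuple[str, int], int] = {}
    for key in table.scan():
        counts[key] = counts.get(key, 0) + 1
    return counts


def totals(daily: dict[tuple[str, int], int], min_day: int) -> dict[str, int]:
    """Sum the per-day counts for days >= min_day, per user."""
    out: dict[str, int] = {}
    for (user, day), n in daily.items():
        if day >= min_day:
            out[user] = out.get(user, 0) + n
    return out


def with_inlined_cte(table: CountingTable, today: int):
    # Each reference re-evaluates the CTE body: two references, two scans
    return totals(daily_counts(table), today - 6), totals(daily_counts(table), today - 29)


def with_temp_table(table: CountingTable, today: int):
    daily = daily_counts(table)  # materialized once, referenced twice
    return totals(daily, today - 6), totals(daily, today - 29)


ROWS = [("u1", d) for d in range(30)] + [("u2", 5), ("u2", 28)]

if __name__ == "__main__":
    for strategy in (with_inlined_cte, with_temp_table):
        table = CountingTable(ROWS)
        print(strategy.__name__, strategy(table, today=29), "scans:", table.scans)
```

Both strategies return identical features; only the scan count, and therefore the bytes billed, differs.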
## DuckDB: Local ML Data Preparation
DuckDB is an in-process analytical database that runs inside your Python process. It executes columnar, vectorized SQL against Parquet files, CSV files, and Pandas DataFrames - no server, no cluster, no configuration. For ML feature development on datasets up to ~100GB, it is frequently faster than BigQuery and runs entirely on your laptop.
```python
import duckdb  # features_df below also needs pandas (and pyarrow for to_parquet)

# Connect to an in-process database (no file = in-memory)
conn = duckdb.connect()

# Query Parquet files directly - DuckDB reads them natively
conn.execute("""
    CREATE VIEW streaming_events AS
    SELECT * FROM read_parquet('/data/streaming_events/*.parquet')
""")

# Define the rolling window features once, as a view, so the same SQL can
# feed both a pandas DataFrame and a direct Parquet export
conn.execute("""
    CREATE OR REPLACE VIEW features_view AS
    WITH daily_events AS (
        SELECT
            user_id,
            CAST(event_timestamp AS DATE) AS event_date,
            SUM(watch_duration_seconds) AS daily_watch_seconds,
            COUNT(*) AS daily_sessions,
            COUNT(DISTINCT content_id) AS daily_unique_content
        FROM streaming_events
        WHERE event_type = 'watch'
          AND event_timestamp >= CURRENT_TIMESTAMP - INTERVAL 90 DAY
        GROUP BY user_id, event_date
    )
    SELECT
        user_id,
        event_date,
        -- RANGE with an INTERVAL offset = calendar days, even when days are missing
        SUM(daily_watch_seconds) OVER (
            PARTITION BY user_id
            ORDER BY event_date
            RANGE BETWEEN INTERVAL 6 DAYS PRECEDING AND CURRENT ROW
        ) AS watch_seconds_7d,
        SUM(daily_watch_seconds) OVER (
            PARTITION BY user_id
            ORDER BY event_date
            RANGE BETWEEN INTERVAL 29 DAYS PRECEDING AND CURRENT ROW
        ) AS watch_seconds_30d,
        SUM(daily_sessions) OVER (
            PARTITION BY user_id
            ORDER BY event_date
            RANGE BETWEEN INTERVAL 29 DAYS PRECEDING AND CURRENT ROW
        ) AS sessions_30d
    FROM daily_events
""")

# Pull the features into pandas for inspection
features_df = conn.execute(
    "SELECT * FROM features_view ORDER BY user_id, event_date"
).df()  # .df() returns a pandas DataFrame

print(f"Feature rows: {len(features_df):,}")
print(features_df.head())

# Export as Parquet for training
features_df.to_parquet('/data/features/user_engagement_features.parquet', index=False)

# Or let DuckDB write directly - much faster for large outputs, because the
# rows never pass through pandas
conn.execute("""
    COPY (
        SELECT * FROM features_view
        ORDER BY user_id, event_date
    ) TO '/data/features/user_engagement_features.parquet'
    (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 100000)
""")
```
DuckDB's performance on a modern laptop (for example, an M2 MacBook Pro with 32GB RAM) on a 50GB Parquet dataset can be comparable to a medium Spark cluster for workloads that parallelize well on a single node. For ML feature development workflows - where you iterate quickly on feature definitions and export training data - DuckDB eliminates the cluster overhead entirely.
:::tip DuckDB for feature prototyping, BigQuery for production
The standard workflow: prototype features in DuckDB on a representative sample (a 1-5GB Parquet file on your local machine), iterate quickly without cloud costs, then translate the final SQL to BigQuery or Spark SQL for the production pipeline. DuckDB's SQL dialect is close to BigQuery Standard SQL, so the translation is usually mechanical rather than a rewrite: adjust date arithmetic (CURRENT_TIMESTAMP - INTERVAL 90 DAY becomes DATE_SUB or TIMESTAMP_SUB), turn interval-based RANGE frames into numeric ones, and swap file paths for table names.
:::
## Reading EXPLAIN ANALYZE Output
Understanding query plans is essential for diagnosing slow queries. In BigQuery, use the Query Execution Details panel in the console. In PostgreSQL/DuckDB, use EXPLAIN ANALYZE:
```sql
-- DuckDB EXPLAIN ANALYZE example
EXPLAIN ANALYZE
SELECT
  user_id,
  SUM(watch_duration_seconds) OVER (
    PARTITION BY user_id
    ORDER BY event_date
    ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
  ) AS watch_30d
FROM daily_events;
```
Simplified, illustrative output (DuckDB's real output is a box-drawn operator tree printed root first), annotated with what each section means:
```text
HASH_GROUP_BY                       ← aggregation step
  - groups: 45,291,022              ← 45M user+date combinations
  - actual time: 12.4s              ← expensive; consider filtering earlier or pre-aggregating
SEQ_SCAN streaming_events           ← sequential scan of the table
  - filters: event_type='watch'     ← filter applied during scan (good)
  - rows: 10,234,567,891            ← 10B rows scanned - too many?
  - actual time: 48.2s
WINDOW                              ← window function computation
  - function: sum(watch_duration_seconds)
  - partition: user_id
  - rows: 45,291,022
  - actual time: 8.1s
```
The key numbers to look for:
- Rows scanned vs rows output: a high ratio indicates filtering could be pushed earlier
- Hash group by with very high group count: may benefit from pre-aggregation
- SEQ_SCAN on a large table with no partition filter: the classic $400 query sign
## Materialized Views for Pre-Computed Features
When the same expensive feature computation is needed by multiple downstream queries - ML training, A/B test analysis, dashboards - a materialized view computes it once and serves all consumers. Unlike a regular view (re-executed on every query), a materialized view stores a precomputed result that the warehouse refreshes automatically, on a best-effort schedule, when the base table changes.
### BigQuery Materialized Views
```sql
-- Create a materialized view of DAILY engagement aggregates.
-- BigQuery materialized views cannot use non-deterministic functions such as
-- CURRENT_DATE(), and incremental views do not support window functions or
-- exact COUNT(DISTINCT), so keep the view to a plain daily GROUP BY and
-- compute rolling windows on top of it.
CREATE MATERIALIZED VIEW `my_project.ml_features.mv_user_engagement_daily`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 60  -- refresh at most about once an hour (best effort)
)
AS
SELECT
  user_id,
  DATE(event_timestamp) AS event_date,
  COUNT(*) AS sessions,
  SUM(watch_duration_seconds) AS watch_seconds,
  APPROX_COUNT_DISTINCT(content_id) AS unique_content  -- exact COUNT(DISTINCT) is not allowed here
FROM `my_project.ml_features.streaming_events`
GROUP BY user_id, event_date;

-- 7-day rolling window computed at query time over the small daily view
SELECT
  user_id,
  event_date,
  SUM(watch_seconds) OVER (
    PARTITION BY user_id
    ORDER BY UNIX_DATE(event_date)
    RANGE BETWEEN 6 PRECEDING AND CURRENT ROW
  ) AS watch_seconds_7d
FROM `my_project.ml_features.mv_user_engagement_daily`
WHERE event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY);
```
BigQuery automatically rewrites queries that match the materialized view definition to use the pre-computed result. This means downstream SQL that queries streaming_events with the same aggregation structure may automatically benefit from the materialized view without any code changes - the optimizer detects the equivalence.
### When to Use Materialized Views vs dbt Incremental Models
| Factor | Materialized View | dbt Incremental |
|---|---|---|
| Refresh trigger | Automatic (base table change) | Manual / scheduled |
| Refresh granularity | Partial (only changed partitions) | Configurable |
| Test integration | None | Full dbt test suite |
| Lineage tracking | Partial (warehouse-level) | Full dbt DAG |
| Cross-warehouse portability | No (warehouse-specific) | Yes (dbt adapter) |
| Recommended for | High-frequency, simple aggregations consumed by many | Complex feature tables with business logic and tests |
The practical rule: use materialized views for simple, high-frequency aggregations that many teams consume. Use dbt incremental models for complex, business-logic-heavy feature tables that need testing, documentation, and version control.
## Spark SQL: Distributed SQL for Very Large Datasets
When your dataset is genuinely too large for BigQuery's per-query cost model (petabytes of historical data queried many times per day), Spark SQL on Databricks or EMR is the alternative. Spark SQL supports almost all of the same SQL syntax as BigQuery Standard SQL, with a few differences:
```python
# PySpark: SQL-first interface for ML feature engineering
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("UserEngagementFeatures")
    # AQE re-optimizes shuffles at runtime (enabled by default since Spark 3.2)
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

# Register a temp view for SQL queries
spark.read.parquet("s3://my-bucket/streaming-events/") \
    .createOrReplaceTempView("streaming_events")

features_df = spark.sql("""
    WITH daily_events AS (
        SELECT
            user_id,
            CAST(event_timestamp AS DATE) AS event_date,
            SUM(watch_duration_seconds) AS daily_watch_seconds,
            COUNT(*) AS daily_sessions
        FROM streaming_events
        WHERE TO_DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), 90)
          AND event_type = 'watch'
        GROUP BY user_id, event_date
    )
    SELECT
        user_id,
        event_date,
        -- RANGE over UNIX_DATE (Spark 3.1+) = 30 calendar days, even with inactive days
        SUM(daily_watch_seconds) OVER (
            PARTITION BY user_id
            ORDER BY UNIX_DATE(event_date)
            RANGE BETWEEN 29 PRECEDING AND CURRENT ROW
        ) AS watch_seconds_30d
    FROM daily_events
""")

# Write as a Delta Lake table (efficient for incremental updates).
# Requires the Delta Lake package (delta-spark); it is preinstalled on Databricks.
features_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("event_date") \
    .save("s3://my-bucket/ml-features/user-engagement/")
```
The key difference from BigQuery: Spark requires explicit cluster sizing and configuration. BigQuery is serverless - you pay per byte scanned. Spark is cluster-based - you pay per cluster-hour regardless of how much data you actually process. For steady, high-throughput workloads, Spark can be cheaper. For intermittent, variable-scale workloads, BigQuery's serverless model wins.
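The trade-off can be framed as a break-even calculation. A minimal sketch, assuming purely hypothetical prices (the lesson's ~5 USD/TB on-demand rate and an illustrative cluster cost per hour) and ignoring storage, idle time, BigQuery capacity pricing, autoscaling, and engineering effort:

```python
def bigquery_daily_cost(tb_scanned_per_day: float, usd_per_tb: float = 5.0) -> float:
    """Serverless on-demand: pay for bytes scanned."""
    return tb_scanned_per_day * usd_per_tb


def cluster_daily_cost(usd_per_cluster_hour: float, hours_per_day: float) -> float:
    """Cluster-based: pay for the hours the cluster runs, however much data it reads."""
    return usd_per_cluster_hour * hours_per_day


def break_even_tb_per_day(usd_per_cluster_hour: float, hours_per_day: float,
                          usd_per_tb: float = 5.0) -> float:
    """Daily scan volume above which the cluster becomes the cheaper option."""
    return cluster_daily_cost(usd_per_cluster_hour, hours_per_day) / usd_per_tb


if __name__ == "__main__":
    # Hypothetical: a 20 USD/hour cluster running 24 hours a day
    print(break_even_tb_per_day(20.0, 24))                        # 96.0 TB/day
    print(bigquery_daily_cost(10), cluster_daily_cost(20.0, 24))  # 50.0 480.0
```

Under these made-up numbers, an intermittent workload scanning 10 TB/day is far cheaper on BigQuery, while a steady workload above roughly 96 TB/day flips the comparison in favor of the always-on cluster.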
## SQL at Scale: When to Use What
| Dataset Size | Frequency | Tool | Reason |
|---|---|---|---|
| Less than 10GB | Interactive | DuckDB | Fastest for local iteration, zero cost |
| 10GB-1TB | Daily batch | BigQuery/Snowflake | Serverless, partition pruning, low ops cost |
| 1TB-100TB | Daily batch | BigQuery or Spark SQL | BigQuery for ad-hoc; Spark for predictable workloads |
| 100TB+ | Daily batch | Spark SQL on Databricks | BigQuery cost becomes prohibitive at this scale |
| Any size | Real-time | Flink SQL / Kafka Streams | SQL at scale for streaming - different lesson |
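## Practical BigQuery Cost Control
BigQuery on-demand pricing bills by bytes scanned (about $5 per TB at the rate used in this lesson), so an unoptimized feature query that runs daily against a multi-terabyte table adds up quickly. Here are the concrete techniques that reduce that cost:
### 1. Column Projection: Never `SELECT *`
Columnar storage means you pay only for the columns you read. On a 100-column table, `SELECT col1, col2, col3` costs roughly 3% of `SELECT *` (exactly 3% only if all columns are the same size):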
```sql
-- BAD: scans all 100 columns = full cost
SELECT * FROM streaming_events WHERE ...

-- GOOD: scans only 3 columns = roughly 3% of the cost
SELECT user_id, event_type, watch_duration_seconds
FROM streaming_events WHERE ...
```
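### 2. LIMIT Does Not Reduce Cost
A common misconception: `SELECT * FROM huge_table LIMIT 100` still scans (and bills) the full table in BigQuery. Use a partition filter to limit scan scope, or the free table preview in the console when you only want to look at a few rows.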
### 3. Clustering Keys for Sub-Partition Pruning
After partitioning (which prunes at the file level), clustering sorts data within each partition by up to 4 columns. Queries that filter on clustered columns skip blocks within partitions without reading them:
```sql
-- Table clustered by user_id - queries filtering on user_id are faster and cheaper
SELECT SUM(watch_duration_seconds)
FROM streaming_events
WHERE DATE(event_timestamp) = '2024-03-01'  -- partition filter
  AND user_id = 'user_12345'                 -- clustering filter: skips blocks
```
### 4. Cost Monitoring in BigQuery
```sql
-- Query INFORMATION_SCHEMA to find expensive queries (run this in BigQuery)
SELECT
  query,
  user_email,
  total_bytes_processed / POW(1024, 4) AS tib_processed,
  total_bytes_billed / POW(1024, 4) * 5 AS estimated_cost_usd,  -- billed bytes at the lesson's ~5 USD/TiB rate
  creation_time
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
  AND job_type = 'QUERY'
  AND state = 'DONE'
ORDER BY total_bytes_billed DESC
LIMIT 20
```
This query identifies the 20 most expensive queries run in the last 7 days. Run it weekly to catch runaway queries before the monthly bill arrives.
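### 5. BI Engine and Cached Results
BigQuery caches query results for about 24 hours: re-running an identical query (same SQL text, unchanged underlying tables) returns the cached result instantly and at zero cost. Two caveats limit the benefit for feature pipelines: queries that use non-deterministic functions such as CURRENT_DATE() or CURRENT_TIMESTAMP(), which most rolling-window feature queries do, are not cached, and neither are queries that write their results to a destination table. In practice the cache helps ad-hoc re-runs and dashboards more than a nightly feature job.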
Production Engineering Notes
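:::warning COUNT(DISTINCT) is exact, and exactness is expensive
In BigQuery's GoogleSQL, COUNT(DISTINCT) returns an exact count, which requires moving every distinct value across workers. APPROX_COUNT_DISTINCT returns a HyperLogLog++ estimate with a small relative error at a fraction of the cost. (BigQuery's legacy SQL dialect behaved differently: its COUNT(DISTINCT) was approximate above a threshold, which is a common source of confusion.) For ML features where a 1% error in "unique viewers" is acceptable (almost always), use APPROX_COUNT_DISTINCT explicitly, which also documents that the approximation is intentional. When you need distinct counts that can be merged across days or partitions, build sketches with HLL_COUNT.INIT and combine them with HLL_COUNT.MERGE.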
:::
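To see why approximate distinct counts are cheap and mergeable, here is a minimal Python sketch. It deliberately uses the k-minimum-values (KMV) estimator rather than HyperLogLog++ (which BigQuery's APPROX_COUNT_DISTINCT and HLL_COUNT functions use) because KMV fits in a few lines; the KMVSketch class is hypothetical. Both share the properties that matter here: memory stays bounded at k hashes regardless of input size, and per-day sketches merge into a multi-day estimate without revisiting raw rows. With k = 1024, the typical relative error is roughly 1/sqrt(k), about 3%.

```python
import hashlib
import heapq


class KMVSketch:
    """K-minimum-values distinct-count sketch (a simpler cousin of HLL++)."""

    def __init__(self, k=1024):
        self.k = k
        self._heap = []        # negated hashes: a max-heap of the k smallest
        self._members = set()  # the same hashes, for O(1) duplicate checks

    @staticmethod
    def _hash(value):
        digest = hashlib.blake2b(str(value).encode(), digest_size=8).digest()
        return int.from_bytes(digest, "big") / 2**64  # roughly uniform on [0, 1]

    def add(self, value):
        h = self._hash(value)
        if h in self._members:
            return  # duplicate value: sketch unchanged
        if len(self._heap) < self.k:
            heapq.heappush(self._heap, -h)
            self._members.add(h)
        elif h < -self._heap[0]:
            evicted = -heapq.heapreplace(self._heap, -h)  # drop the largest kept hash
            self._members.discard(evicted)
            self._members.add(h)

    def estimate(self):
        if len(self._heap) < self.k:
            return float(len(self._heap))  # fewer than k distinct values: exact
        return (self.k - 1) / -self._heap[0]  # (k - 1) / k-th smallest hash

    def merge(self, other):
        """Sketch of the union: keep the k smallest hashes seen by either side."""
        merged = KMVSketch(self.k)
        smallest = heapq.nsmallest(self.k, self._members | other._members)
        merged._members = set(smallest)
        merged._heap = [-h for h in smallest]
        heapq.heapify(merged._heap)
        return merged


day1, day2 = KMVSketch(), KMVSketch()
for user in range(0, 60_000):
    day1.add(f"user_{user}")
for user in range(40_000, 100_000):
    day2.add(f"user_{user}")
day1.add("user_0")  # re-adding a seen value changes nothing
both_days = day1.merge(day2)  # 100,000 distinct users across both days
print(round(day1.estimate()), round(both_days.estimate()))
```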
:::tip Materialized views for pre-computed expensive features
BigQuery materialized views are refreshed automatically in the background when base table data changes, and queries that can be answered from a materialized view can be rewritten automatically to use it. Materialized views cannot use non-deterministic functions such as CURRENT_DATE(), so a feature like "total orders per user in the last 7 days" is best served by materializing the stable building block (daily order counts per user) and letting downstream queries sum the trailing 7 days. Computed once and read by many queries, this is similar to a dbt incremental model but managed at the warehouse layer.
:::
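One way to serve a feature like "orders per user in the last 7 days" from precomputed data is to maintain daily counts incrementally and sum the trailing window at query time. This Python sketch is a toy model under stated assumptions, not how BigQuery refreshes materialized views internally: the base table is append-only, the view keeps a watermark of rows already processed, and each refresh folds in only the new rows. The class and function names are hypothetical.

```python
from collections import Counter
from datetime import date, timedelta


def daily_counts(orders):
    """Full recompute: (user_id, order_date) -> number of orders."""
    return Counter((o["user_id"], o["order_date"]) for o in orders)


class DailyOrderCounts:
    """Toy append-only materialized aggregate refreshed from new rows only."""

    def __init__(self):
        self.counts = Counter()
        self._rows_seen = 0  # watermark: base rows already folded in

    def refresh(self, base_table):
        delta = base_table[self._rows_seen:]  # only rows appended since last refresh
        self.counts.update((o["user_id"], o["order_date"]) for o in delta)
        self._rows_seen = len(base_table)


def orders_last_n_days(view, user_id, as_of, n):
    """Downstream rolling feature computed from the stable daily aggregate."""
    return sum(view.counts[(user_id, as_of - timedelta(days=i))] for i in range(n))


orders = [{"user_id": "u1", "order_date": date(2024, 3, d)} for d in (1, 2, 2, 5)]
view = DailyOrderCounts()
view.refresh(orders)
orders += [{"user_id": "u1", "order_date": date(2024, 3, 8)},
           {"user_id": "u2", "order_date": date(2024, 3, 8)}]
view.refresh(orders)  # folds in only the two new rows

assert view.counts == daily_counts(orders)  # incremental == full recompute
print(orders_last_n_days(view, "u1", date(2024, 3, 8), 7))  # 4
```

Deletes and updates are out of scope for this sketch; handling them would require either retractions (negative counts) or a full recompute.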
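:::danger Correlated subqueries kill performance
A correlated subquery references a column from the outer query, so it is logically evaluated once per outer row. Many optimizers decorrelate common patterns into joins automatically; when they cannot, the per-row cost is real. At 10B rows, a subquery that takes 1 ms per execution adds up to 10,000,000 seconds, roughly 2,800 hours. Prefer writing the JOIN or window function yourself. NOT EXISTS (SELECT 1 FROM t2 WHERE t2.id = t1.id) maps to an anti-join (LEFT JOIN t2 ... WHERE t2.id IS NULL). EXISTS maps to a semi-join (t1.id IN (SELECT id FROM t2), or a join against SELECT DISTINCT id FROM t2), because a plain LEFT JOIN with an IS NOT NULL check duplicates t1 rows that have several matches. When the optimizer could not decorrelate the original, these rewrites are often dramatically faster at scale.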
:::
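Correlated-subquery rewrites can be checked on a small dataset with Python's built-in sqlite3 module, with SQLite standing in for the warehouse. The table and column names are hypothetical. The sketch confirms three things: a correlated scalar subquery and its aggregate-then-join rewrite return the same rows; NOT EXISTS and the LEFT JOIN ... IS NULL anti-join agree; and EXISTS needs a semi-join (IN) rather than a LEFT JOIN with IS NOT NULL, which duplicates users with several events.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users  (user_id TEXT PRIMARY KEY);
    CREATE TABLE events (user_id TEXT, watch_seconds INTEGER);
    INSERT INTO users  VALUES ('u1'), ('u2'), ('u3');
    INSERT INTO events VALUES ('u1', 100), ('u1', 50), ('u2', 30);
""")


def q(sql):
    return conn.execute(sql).fetchall()


# Correlated scalar subquery: logically evaluated once per outer row.
correlated = q("""
    SELECT u.user_id,
           (SELECT COALESCE(SUM(e.watch_seconds), 0)
              FROM events e WHERE e.user_id = u.user_id) AS total_watch
    FROM users u ORDER BY u.user_id""")

# Rewrite: aggregate events once, then join.
joined = q("""
    SELECT u.user_id, COALESCE(agg.total_watch, 0) AS total_watch
    FROM users u
    LEFT JOIN (SELECT user_id, SUM(watch_seconds) AS total_watch
               FROM events GROUP BY user_id) AS agg
      ON agg.user_id = u.user_id
    ORDER BY u.user_id""")

# NOT EXISTS and its anti-join rewrite.
not_exists = q("""
    SELECT u.user_id FROM users u
    WHERE NOT EXISTS (SELECT 1 FROM events e WHERE e.user_id = u.user_id)
    ORDER BY u.user_id""")
anti_join = q("""
    SELECT u.user_id FROM users u
    LEFT JOIN events e ON e.user_id = u.user_id
    WHERE e.user_id IS NULL ORDER BY u.user_id""")

# EXISTS: LEFT JOIN + IS NOT NULL duplicates users with several events;
# a semi-join (IN) does not.
left_join_not_null = q("""
    SELECT u.user_id FROM users u
    LEFT JOIN events e ON e.user_id = u.user_id
    WHERE e.user_id IS NOT NULL ORDER BY u.user_id""")
semi_join = q("""
    SELECT u.user_id FROM users u
    WHERE u.user_id IN (SELECT user_id FROM events) ORDER BY u.user_id""")

print(correlated == joined, not_exists == anti_join)  # True True
print(left_join_not_null)  # [('u1',), ('u1',), ('u2',)]
print(semi_join)           # [('u1',), ('u2',)]
```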
Common Mistakes
Mistake 1: Applying functions to partition columns in WHERE clauses.
WHERE EXTRACT(YEAR FROM event_timestamp) = 2024 can prevent partition pruning. WHERE event_timestamp >= '2024-01-01' AND event_timestamp < '2025-01-01' keeps the bare column on one side of the comparison and enables it. This single mistake is one of the most common causes of runaway BigQuery costs.
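SQLite has no partitions, but an index plays the same role here: the planner can use it only when the column stands alone in the predicate. This sketch uses Python's built-in sqlite3 with a hypothetical events table and compares EXPLAIN QUERY PLAN for a function-wrapped filter and the equivalent half-open range. Exact plan wording varies across SQLite versions, so the check looks only for the index name.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE events (
        event_id INTEGER PRIMARY KEY,
        event_timestamp TEXT,          -- ISO-8601 strings sort chronologically
        user_id TEXT,
        watch_seconds INTEGER
    );
    CREATE INDEX idx_events_ts ON events (event_timestamp);
""")


def plan(where):
    """Return the human-readable detail of each EXPLAIN QUERY PLAN row."""
    sql = f"SELECT user_id, watch_seconds FROM events WHERE {where}"
    return [row[-1] for row in conn.execute("EXPLAIN QUERY PLAN " + sql)]


wrapped = plan("strftime('%Y', event_timestamp) = '2024'")
isolated = plan("event_timestamp >= '2024-01-01' AND event_timestamp < '2025-01-01'")
print(wrapped)   # a full SCAN of events
print(isolated)  # a SEARCH using idx_events_ts
```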
Mistake 2: Using ROWS BETWEEN when you need RANGE BETWEEN for time windows.
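If users can have multiple events per day, ROWS BETWEEN 29 PRECEDING AND CURRENT ROW covers the last 30 events, not the last 30 days. For a true 30-day window over variable-frequency events, use a RANGE frame on the time column: RANGE BETWEEN INTERVAL '30 days' PRECEDING AND CURRENT ROW in engines that accept interval offsets (PostgreSQL, DuckDB), or, in BigQuery, where RANGE offsets require a numeric ORDER BY, ORDER BY UNIX_DATE(DATE(event_timestamp)) RANGE BETWEEN 29 PRECEDING AND CURRENT ROW. The alternative is to pre-aggregate to one row per user per day, fill in days with no activity, and then use ROWS BETWEEN 29 PRECEDING AND CURRENT ROW.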
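The difference is easy to see on a handful of events with Python's built-in sqlite3. The day_num column is a hypothetical stand-in for a numeric day index such as BigQuery's UNIX_DATE(DATE(event_timestamp)). One user has three events in the first two days and two more on days 40 and 45. A ROWS frame of 29 preceding rows silently reaches back to day 1, while the RANGE frame keeps only rows within 29 days of the current row (a 30-day window including the current day). RANGE frames with numeric offsets require SQLite 3.28 or later.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE events (user_id TEXT, day_num INTEGER, watch_seconds INTEGER);
    INSERT INTO events VALUES
        ('u1', 1, 10), ('u1', 1, 20), ('u1', 2, 30), ('u1', 40, 40), ('u1', 45, 50);
""")

rows = conn.execute("""
    SELECT day_num, watch_seconds,
           SUM(watch_seconds) OVER (
               PARTITION BY user_id ORDER BY day_num
               ROWS BETWEEN 29 PRECEDING AND CURRENT ROW) AS last_30_events,
           SUM(watch_seconds) OVER (
               PARTITION BY user_id ORDER BY day_num
               RANGE BETWEEN 29 PRECEDING AND CURRENT ROW) AS last_30_days
    FROM events
    ORDER BY day_num, watch_seconds
""").fetchall()

for row in rows:
    print(row)
```

On day 45 the ROWS frame returns 150 (all five events, spanning 45 days) while the RANGE frame returns 90 (days 40 and 45 only).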
Mistake 3: Assuming a CTE referenced several times is computed only once. BigQuery does not materialize non-recursive CTEs: a CTE referenced multiple times is executed once per reference. If the CTE is expensive (scanning a large table, computing complex aggregations), materialize it into a temp table instead. In dbt, reserve ephemeral models (which are inlined as CTEs) for lightweight transformations, and use table or incremental materializations for expensive logic that multiple downstream models reference.
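The materialize-once pattern can be sketched with Python's built-in sqlite3, using a counting user-defined function as a stand-in for expensive work; the function and table names are hypothetical. In BigQuery, the equivalent is a CREATE TEMP TABLE statement inside a multi-statement query. The sketch confirms the expensive expression runs once per base row while building the temp table, and not again when the result is read twice.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
calls = {"n": 0}


def expensive_score(watch_seconds):
    """Stand-in for a costly per-row computation; counts its invocations."""
    calls["n"] += 1
    return watch_seconds * 2


conn.create_function("expensive_score", 1, expensive_score)
conn.execute("CREATE TABLE events (user_id TEXT, watch_seconds INTEGER)")
conn.executemany("INSERT INTO events VALUES (?, ?)",
                 [(f"u{i % 10}", i) for i in range(1000)])

# Materialize the expensive aggregate exactly once...
conn.execute("""
    CREATE TEMP TABLE user_scores AS
    SELECT user_id, SUM(expensive_score(watch_seconds)) AS score
    FROM events GROUP BY user_id""")
calls_after_build = calls["n"]

# ...then reference it as often as needed without recomputing it.
top_user = conn.execute(
    "SELECT user_id FROM user_scores ORDER BY score DESC LIMIT 1").fetchone()
avg_score = conn.execute("SELECT AVG(score) FROM user_scores").fetchone()[0]

print(calls_after_build, calls["n"], top_user, avg_score)  # 1000 1000 ('u9',) 99900.0
```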
Mistake 4: SELECT * in production SQL.
Columnar databases read only the columns you reference, so SELECT * forces a read of every column in the table. In BigQuery, where on-demand pricing is per byte scanned, SELECT * on a 100-column table with similarly sized columns costs roughly 33× more than SELECT col1, col2, col3. Always specify exactly the columns you need.
Mistake 5: Running development SQL at production scale without checking the cost estimate or query plan first. Before running an unfamiliar query against a production dataset, check BigQuery's estimated bytes processed (shown in the console before execution, or via bq query --dry_run) or the EXPLAIN output in Spark or DuckDB. An estimate of "1.2 TB processed" should trigger a review before execution, not after the roughly $6 charge.
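A pre-flight check can be as simple as converting the dry-run byte estimate into dollars and comparing it with a budget. In the google-cloud-bigquery Python client, the estimate comes from running the query with QueryJobConfig(dry_run=True, use_query_cache=False) and reading the job's total_bytes_processed; the sketch below leaves that call out so it runs without credentials. The $5/TiB default mirrors the figure used in this lesson and is an assumption, since on-demand pricing has changed over time, so pass your current rate. The preflight function name is hypothetical.

```python
TIB = 1024 ** 4


def preflight(estimated_bytes, budget_usd, usd_per_tib=5.0):
    """Return (ok, estimated_cost_usd) for a dry-run byte estimate.

    estimated_bytes would come from a BigQuery dry run; usd_per_tib is an
    assumed on-demand rate, not a quoted price.
    """
    cost = estimated_bytes / TIB * usd_per_tib
    return cost <= budget_usd, cost


ok, cost = preflight(int(1.2 * TIB), budget_usd=1.0)
print(ok, round(cost, 2))  # False 6.0
```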
Interview Q&A
Q: What is partition pruning and how do you ensure queries use it?
Partition pruning is the query optimizer's ability to skip entire partitions (files or file groups) based on a filter condition, without reading them at all. In BigQuery, tables are typically partitioned by a date or timestamp column, and the planner checks partition metadata before any I/O. To ensure pruning is used, filter on the partition column with expressions the optimizer can evaluate at plan time: constants, string literals, or DATE_SUB(CURRENT_DATE(), INTERVAL N DAY). Do not wrap the partition column in a function: WHERE TIMESTAMP_TRUNC(event_timestamp, MONTH) = '2024-01-01' looks like a date filter but can prevent pruning. Set require_partition_filter = true in BigQuery table options to reject any query that lacks a partition filter, which prevents accidental full scans in production.
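To illustrate why plan-time constants matter, here is a toy Python model; the PartitionedTable class is hypothetical and only mimics the behavior described in this answer. The planner can compare a constant date range against partition keys without reading anything, but an opaque predicate, standing in for a function wrapped around the column, can only be evaluated row by row after each partition is read. The require_partition_filter flag rejects queries with no usable partition filter.

```python
from datetime import date, timedelta


class PartitionedTable:
    """Toy date-partitioned table: pruning can use partition keys only."""

    def __init__(self, partitions, require_partition_filter=False):
        self.partitions = partitions  # date -> list of row dicts
        self.require_partition_filter = require_partition_filter
        self.partitions_read = 0

    def scan(self, date_range=None, row_predicate=None):
        """date_range: constant half-open (start, end) the planner can inspect.
        row_predicate: opaque callable (like a function-wrapped column); it can
        only be applied after a partition's rows have been read."""
        if self.require_partition_filter and date_range is None:
            raise ValueError("query rejected: no filter on the partition column")
        self.partitions_read = 0
        out = []
        for day, rows in self.partitions.items():
            if date_range is not None and not (date_range[0] <= day < date_range[1]):
                continue  # pruned from metadata, never read
            self.partitions_read += 1
            out.extend(r for r in rows if row_predicate is None or row_predicate(r))
        return out


start = date(2024, 1, 1)
parts = {start + timedelta(days=i): [{"day": start + timedelta(days=i)}] for i in range(100)}
table = PartitionedTable(parts)

# "January" via an opaque, function-style predicate: every partition is read.
opaque_rows = table.scan(row_predicate=lambda r: r["day"].month == 1)
opaque_read = table.partitions_read

# "January" via a constant range on the bare partition key: 31 partitions read.
pruned_rows = table.scan(date_range=(date(2024, 1, 1), date(2024, 2, 1)))
pruned_read = table.partitions_read

strict = PartitionedTable(parts, require_partition_filter=True)
try:
    strict.scan(row_predicate=lambda r: r["day"].month == 1)
    rejected = False
except ValueError:
    rejected = True

print(len(opaque_rows), opaque_read, len(pruned_rows), pruned_read, rejected)
# 31 100 31 31 True
```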
Q: How do you compute rolling window features in SQL?
The standard approach is a two-step process: first aggregate to the granularity the window is defined in (usually daily), then apply window functions over the aggregated result. The key decision in the window function is ROWS BETWEEN versus RANGE BETWEEN. ROWS BETWEEN N PRECEDING AND CURRENT ROW counts the last N physical rows, which matches a time-based window only if there is exactly one row per time unit per entity. Daily aggregation alone does not guarantee that, because inactive days produce no row. Either fill the gaps with a date spine (in BigQuery, typically GENERATE_DATE_ARRAY cross-joined with the entity list) before using ROWS BETWEEN 29 PRECEDING AND CURRENT ROW, or use a RANGE frame over a numeric day number. Named WINDOW clauses mainly improve readability by defining the window spec once and reusing it across several function calls; many planners already compute window functions that share an identical specification from a single sort, named or not.
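The gap problem shows up with just four days of data. In this Python sqlite3 sketch (a hypothetical single-user daily table, with a 3-day window to keep the numbers small), ROWS BETWEEN 2 PRECEDING on the sparse daily rows reaches back across the inactive days, while both a RANGE frame and a densified date spine give the correct 3-day totals. The recursive CTE is SQLite's way of generating the spine; in BigQuery it would typically come from GENERATE_DATE_ARRAY and UNNEST.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Daily aggregate for one user; days 3-5 had no activity, so they have no rows.
    CREATE TABLE daily (user_id TEXT, day_num INTEGER, watch_seconds INTEGER);
    INSERT INTO daily VALUES ('u1', 1, 10), ('u1', 2, 20), ('u1', 6, 30), ('u1', 7, 40);
""")

# 3-day window computed directly on the sparse daily rows.
sparse = conn.execute("""
    SELECT day_num,
           SUM(watch_seconds) OVER (ORDER BY day_num
               ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS rows_3d,
           SUM(watch_seconds) OVER (ORDER BY day_num
               RANGE BETWEEN 2 PRECEDING AND CURRENT ROW) AS range_3d
    FROM daily ORDER BY day_num
""").fetchall()

# Same window after densifying with a date spine (one row per day, zeros filled).
dense = conn.execute("""
    WITH RECURSIVE spine(day_num) AS (
        SELECT MIN(day_num) FROM daily
        UNION ALL
        SELECT day_num + 1 FROM spine WHERE day_num < (SELECT MAX(day_num) FROM daily)
    )
    SELECT s.day_num,
           SUM(COALESCE(d.watch_seconds, 0)) OVER (ORDER BY s.day_num
               ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS rows_3d
    FROM spine s
    LEFT JOIN daily d ON d.day_num = s.day_num AND d.user_id = 'u1'
    ORDER BY s.day_num
""").fetchall()

print(sparse)  # [(1, 10, 10), (2, 30, 30), (6, 60, 30), (7, 90, 70)]
print(dense)   # [(1, 10), (2, 30), (3, 30), (4, 20), (5, 0), (6, 30), (7, 70)]
```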
Q: What is the difference between ROWS BETWEEN and RANGE BETWEEN?
ROWS BETWEEN defines the window frame by physical row offsets relative to the current row. ROWS BETWEEN 6 PRECEDING AND CURRENT ROW includes the current row and up to 6 rows before it in the partition's ORDER BY sequence, regardless of what values those rows have. RANGE BETWEEN defines the frame by value distance on the ORDER BY column. RANGE BETWEEN INTERVAL '7 days' PRECEDING AND CURRENT ROW (PostgreSQL or DuckDB syntax; BigQuery requires a numeric ORDER BY, such as a day number, for RANGE offsets) includes every row whose timestamp is within 7 days of the current row's timestamp. Rows that share the same ORDER BY value (peers) are therefore always included or excluded together. For ML features, the practical consequence: use ROWS BETWEEN after pre-aggregating to a fixed granularity with no gaps (exactly one row per day per entity); use RANGE BETWEEN when events occur at arbitrary timestamps and you want a true calendar-based window.
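The peer behavior is easiest to see with a running total over tied timestamps. In this Python sqlite3 sketch (hypothetical table; integer event_ts values stand in for timestamps), two events share event_ts = 200. With a RANGE frame, which is also the default frame when ORDER BY is present, both tied rows see the same total. The ROWS frame advances one row at a time, so it needs a tiebreaker column in its ORDER BY to be deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE events (event_ts INTEGER, watch_seconds INTEGER);
    INSERT INTO events VALUES (100, 5), (200, 7), (200, 11), (300, 13);
""")

rows = conn.execute("""
    SELECT event_ts, watch_seconds,
           SUM(watch_seconds) OVER (ORDER BY event_ts
               RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS range_running,
           SUM(watch_seconds) OVER (ORDER BY event_ts, watch_seconds  -- tiebreaker
               ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rows_running,
           SUM(watch_seconds) OVER (ORDER BY event_ts) AS default_frame
    FROM events
    ORDER BY event_ts, watch_seconds
""").fetchall()

for row in rows:
    print(row)
# (100, 5, 5, 5, 5)
# (200, 7, 23, 12, 23)
# (200, 11, 23, 23, 23)
# (300, 13, 36, 36, 36)
```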
Q: When would you use DuckDB vs BigQuery vs Spark SQL?
DuckDB is for local development and single-machine datasets (up to roughly 100 GB is a comfortable range on typical hardware): zero cost, fast iteration, a PostgreSQL-style SQL dialect with native Parquet support, and it runs in-process in Python. Use it for feature prototyping, data exploration, and offline experimentation. BigQuery is for production batch features on datasets from about 100 GB to a few petabytes: it is serverless (no cluster management), its on-demand pricing charges per byte scanned so well-pruned intermittent queries stay cheap, and it suits teams that need SQL without infrastructure expertise. Spark SQL on Databricks fits very large datasets (100 TB and up) or very high query frequency, where per-byte billing becomes expensive: it requires cluster management but gives more control over cost and performance, and it integrates with Delta Lake for incremental processing. For most ML teams, the practical answer is DuckDB for development and BigQuery for production.
Q: How do you optimize a slow SQL query that costs too much on BigQuery?
Five steps in order: (1) Check whether partition pruning is active by looking at the bytes-processed estimate; if the query scans the full table, add or fix the partition filter. (2) Remove SELECT * and select only the columns needed, which can reduce bytes scanned by 10-100x on wide tables. (3) Replace COUNT(DISTINCT) with APPROX_COUNT_DISTINCT for any high-cardinality deduplication that does not require an exact result. (4) Identify CTEs that are referenced multiple times and convert them to temp tables to avoid re-scanning. (5) Look for correlated subqueries (subqueries in WHERE or SELECT that reference outer query columns) and rewrite them as JOINs. In practice, steps 1 and 2 usually deliver most of the savings on runaway BigQuery queries.
Q: What is a non-sargable predicate and why does it matter?
A sargable predicate (from "Search ARGument ABLE") is a filter condition that the query engine can use to seek into an index, skip partitions, or prune storage blocks without reading the full dataset. A non-sargable predicate cannot be used this way: the engine must read every row and evaluate the expression to decide whether it passes the filter. In SQL at scale, the most common non-sargable patterns are applying functions to the filtered column (WHERE EXTRACT(YEAR FROM created_at) = 2024, WHERE LOWER(email) = 'alice@example.com'), using LIKE with a leading wildcard (WHERE name LIKE '%smith'), and arithmetic on the filtered column (WHERE total_amount / 100 > 50). The fix is to reformulate the predicate so the bare column appears alone on one side: WHERE created_at >= '2024-01-01' AND created_at < '2025-01-01'; WHERE total_amount > 5000 (equivalent in BigQuery, where / returns FLOAT64, but not under integer division); and, for case-insensitive matching, store a lowercased copy of the column at write time and filter on it directly. A leading-wildcard LIKE cannot be rewritten this way and needs a different access path, such as a search index or a stored reversed-string column.
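A rewrite only helps if it selects exactly the same rows under your engine's semantics, which is cheap to check on a few boundary values. This Python sqlite3 sketch (hypothetical orders table) compares WHERE total_amount / 100 > 50 with WHERE total_amount > 5000. With floating-point division, which is what BigQuery's / operator performs, they agree. With integer division, as in SQLite or PostgreSQL when both operands are integers, the original predicate really means total_amount >= 5100.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, total_amount INTEGER)")
amounts = [4999, 5000, 5001, 5050, 5099, 5100, 9000]  # boundary values
conn.executemany("INSERT INTO orders (total_amount) VALUES (?)", [(a,) for a in amounts])


def amounts_where(predicate):
    """Return total_amount for rows matching predicate, in a stable order."""
    sql = f"SELECT total_amount FROM orders WHERE {predicate} ORDER BY order_id"
    return [row[0] for row in conn.execute(sql)]


float_division = amounts_where("total_amount / 100.0 > 50")  # float semantics
integer_division = amounts_where("total_amount / 100 > 50")  # int / int truncates
rewritten = amounts_where("total_amount > 5000")             # sargable form

print(float_division == rewritten)  # True
print(integer_division)             # [5100, 9000]
```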
