dbt for ML Feature Preparation
import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem';
Reading time: 25 min | Relevance: High for Data Engineers, Analytics Engineers, MLOps Engineers | Level: Intermediate
The Night the Recommendation Model Went Silent
It is 2 AM on a Tuesday. The on-call data engineer at a mid-size e-commerce company gets paged: the recommendation model is returning near-random rankings. Revenue per session is down 18%. The SRE already confirmed the serving infrastructure is healthy. Something upstream has changed.
The on-call engineer opens a folder called etl_scripts/. There are 50 SQL files numbered 001_clean_orders.sql through 050_build_user_features.sql. They run in sequence via a cron job using a shell script that calls psql with each file in turn. There is no dependency graph. The README was last updated two years ago and references a table that no longer exists. The engineer starts reading scripts from the top, manually tracing which tables feed which.
Three hours later - at 5 AM - the root cause is found: a schema migration on the raw orders table renamed product_category_id to product_category_code. Script 023_category_rollup.sql silently produced a table full of NULLs because the column reference broke without error (the old column was still present in a legacy view). Scripts 031 through 044 happily consumed those NULLs and built a feature table where the category affinity scores were all zero. The model used three days of stale feature data before anyone noticed.
This is not a hypothetical. Variants of this incident happen at companies of every size. The core problem is not that engineers wrote bad SQL - it is that raw SQL scripts have no mechanism to express dependencies, no built-in tests, no documentation that stays in sync with the code, and no visibility into what data flows where. When something breaks, the only debugging tool is reading every file top to bottom.
dbt - the data build tool - was built specifically to solve this. It does not replace SQL. It wraps SQL in a layer of software engineering best practices: dependency graphs, version control, automated testing, and auto-generated documentation. For ML teams building feature pipelines, dbt is the difference between "we think the features are correct" and "we know the features are correct because 47 tests passed before this run was used for training."
Why This Exists
The Problem with Raw SQL Scripts
Before dbt, the standard pattern for data transformation was exactly what the opening scenario describes: a folder of SQL files run in sequence. The fundamental issues are:
No lineage. You cannot automatically determine which tables depend on which other tables. When a source table changes, you have to manually audit every downstream script.
No testing. SQL is executed and the results land in a table. Whether those results are correct - whether there are unexpected NULLs, whether counts match expectations, whether a join produced a fan-out - is unknown unless you write separate validation queries, and nobody writes them consistently.
Documentation is always stale. If documentation exists at all, it lives in a separate wiki that diverges from the code within weeks of being written.
No version control integration. The SQL files might be in Git, but the dependency between them is encoded in a shell script or a cron ordering, not in the SQL itself. Refactoring any step requires manual surgery.
Silent failures. SQL raises errors for syntax problems and missing objects, but not for results that are merely wrong. A LEFT JOIN whose key no longer matches produces NULLs, not an error. A missing partition produces an empty table, not a failure. Your pipeline finishes with exit code 0 while feeding corrupted features to your model.
The ELT Paradigm Shift
dbt emerged from a paradigm shift from ETL (Extract, Transform, Load) to ELT (Extract, Load, Transform). In ETL, data is transformed before being loaded into the warehouse. In ELT, raw data is loaded first, and transformation happens inside the warehouse using SQL - exploiting the warehouse's own compute (BigQuery, Snowflake, Redshift, Databricks).
This shift was enabled by the economics of cloud data warehouses: compute became cheap and elastic, so doing transformation inside the warehouse became viable at any scale. dbt is the tool that makes ELT engineering-grade.
Historical Context
dbt was created by Drew Banin and Tristan Handy at Fishtown Analytics in 2016. The original insight was simple: what if every transformation step was just a SELECT statement, and the tool took care of the boilerplate (CREATE TABLE AS, INSERT INTO, etc.), dependency resolution, and testing? The first version was a side project. By 2020 it was the center of what became known as the "modern data stack." Fishtown Analytics rebranded to dbt Labs in 2021 and raised a $150M Series C. Today dbt has over 35,000 active projects and is the de facto standard for SQL-based data transformation in the industry.
The key intellectual contribution was the ref() function: a way to reference another model by its model name rather than by a hard-coded table name. This single decision - writing ref('stg_orders') instead of schema.stg_orders - is what allows dbt to build a directed acyclic graph (DAG) of dependencies automatically.
dbt Core Concepts
What a dbt Model Is
In dbt, a model is a single SQL file containing a SELECT statement. That is it. No CREATE TABLE. No INSERT INTO. No DROP TABLE IF EXISTS. Just a SELECT.
dbt reads this SELECT, wraps it in the appropriate DDL for the configured materialization, and executes it against your data warehouse. The result is a relation (table or view) in your warehouse schema.
-- models/staging/stg_orders.sql
-- This entire file is just a SELECT. dbt handles the rest.
SELECT
order_id,
customer_id,
created_at,
total_amount_cents / 100.0 AS total_amount_usd,
status,
product_category_code -- use the new column name
FROM {{ source('raw', 'orders') }}
WHERE created_at >= '2020-01-01'
When you run dbt run, dbt executes CREATE TABLE AS (SELECT ...) or CREATE VIEW AS (SELECT ...) depending on the materialization you have configured.
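The wrapping step can be pictured with a few lines of Python. This is only a sketch of the idea, not dbt's implementation: `wrap_select` is a hypothetical helper, the relation name `analytics.dbt_prod.stg_orders` is an assumed example, and real adapters emit warehouse-specific DDL.

```python
def wrap_select(relation: str, select_sql: str, materialized: str = "view") -> str:
    """Wrap a model's SELECT in DDL, roughly as a materialization would."""
    kinds = {"view": "VIEW", "table": "TABLE"}
    if materialized not in kinds:
        raise ValueError(f"unsupported materialization: {materialized}")
    body = select_sql.strip().rstrip(";")
    return f"CREATE {kinds[materialized]} {relation} AS (\n{body}\n)"


# The model file contains only the SELECT; the DDL is added around it.
model_sql = "SELECT order_id, customer_id FROM raw_data.orders"
view_ddl = wrap_select("analytics.dbt_prod.stg_orders", model_sql, "view")
table_ddl = wrap_select("analytics.dbt_prod.stg_orders", model_sql, "table")
print(view_ddl)
```

The point of the design is that the model author never writes the DDL, so switching a model from view to table is a config change rather than a SQL rewrite.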
The ref() Function: The Heart of dbt
The ref() function is how one model references another:
-- models/intermediate/int_user_order_stats.sql
SELECT
customer_id,
COUNT(*) AS total_orders,
SUM(total_amount_usd) AS total_revenue_usd,
MAX(created_at) AS last_order_at,
MIN(created_at) AS first_order_at
FROM {{ ref('stg_orders') }} -- ← this is the magic
WHERE status = 'completed'
GROUP BY customer_id
When dbt sees ref('stg_orders'), it does two things:
- Resolves the name to the actual table/view name in the warehouse, including the correct schema and database prefix.
- Registers a dependency edge: int_user_order_stats depends on stg_orders. This edge is stored in a DAG.
After parsing every model file, dbt has a complete DAG of your entire transformation pipeline. It executes models in topological order, running independent models in parallel. This is the fundamental mechanism that replaces the numbered-file-cron-script pattern.
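To make the mechanism concrete, here is a minimal Python sketch of how ref() calls can be turned into an execution order. It is an illustration under simplifying assumptions, not dbt's parser: the regex, `build_dag`, and `execution_order` are hypothetical names, and the three model bodies are shortened stand-ins for the models in this lesson.

```python
import re
from graphlib import TopologicalSorter

# Matches {{ ref('model_name') }} with single or double quotes.
REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}")


def build_dag(models: dict[str, str]) -> dict[str, set[str]]:
    """Map each model name to the set of models it references via ref()."""
    return {name: set(REF_PATTERN.findall(sql)) for name, sql in models.items()}


def execution_order(models: dict[str, str]) -> list[str]:
    """Return model names so that every model comes after its dependencies."""
    return list(TopologicalSorter(build_dag(models)).static_order())


models = {
    "fct_user_features": "SELECT * FROM {{ ref('int_user_order_stats') }}",
    "int_user_order_stats": "SELECT * FROM {{ ref('stg_orders') }}",
    "stg_orders": "SELECT * FROM {{ source('raw', 'orders') }}",
}
order = execution_order(models)
print(order)
```

Because the order is derived from the SQL itself, renaming or inserting a model never requires renumbering files or editing a shell script.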
Sources: Documenting Raw Data
The source() function is the entry point for raw data - tables that dbt did not create:
# models/sources.yml
version: 2
sources:
  - name: raw
    database: analytics
    schema: raw_data
    tables:
      - name: orders
        description: "Raw orders from the transactional database (Postgres replica)"
        loaded_at_field: created_at # used for freshness checks
        freshness:
          warn_after: {count: 6, period: hour}
          error_after: {count: 24, period: hour}
        columns:
          - name: order_id
            description: "Primary key. UUID format."
            tests:
              - unique
              - not_null
          - name: product_category_code
            description: "Product category identifier. Renamed from product_category_id in March 2024."
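The freshness block is critical for ML: if the newest row in the raw orders table is more than 24 hours old, dbt source freshness reports an error and exits with a non-zero status. dbt run does not perform this check on its own, so run the freshness command as the first step of the pipeline and stop there if it fails. This catches the "silent stale data" problem at the source.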
Run dbt source freshness to check all sources independently of a full run.
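The threshold logic is simple enough to sketch in Python. This is an illustration of the comparison, not dbt's code: `freshness_status` and `PERIODS` are hypothetical names, and the timestamps are made-up example values.

```python
from datetime import datetime, timedelta, timezone

# Length of one unit for each period name used in the YAML thresholds.
PERIODS = {
    "minute": timedelta(minutes=1),
    "hour": timedelta(hours=1),
    "day": timedelta(days=1),
}


def freshness_status(max_loaded_at, now, warn_after, error_after):
    """Compare the age of the newest row against warn/error thresholds."""
    age = now - max_loaded_at
    if age > error_after["count"] * PERIODS[error_after["period"]]:
        return "error"
    if age > warn_after["count"] * PERIODS[warn_after["period"]]:
        return "warn"
    return "pass"


warn = {"count": 6, "period": "hour"}
error = {"count": 24, "period": "hour"}
now = datetime(2024, 3, 10, 12, 0, tzinfo=timezone.utc)

fresh = freshness_status(now - timedelta(hours=3), now, warn, error)
late = freshness_status(now - timedelta(hours=7), now, warn, error)
stale = freshness_status(now - timedelta(hours=25), now, warn, error)
print(fresh, late, stale)
```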
Model Materializations
Materialization is how dbt persists a model's result. Choosing the right materialization is one of the highest-leverage decisions in a dbt project - especially for ML feature tables that can be very large.
View
# dbt_project.yml
models:
  my_project:
    staging:
      +materialized: view
A view is re-computed on every query. Zero storage cost, always fresh. Use views for staging models and lightweight transformations. Do not use views for expensive aggregations that are queried repeatedly - every downstream query will re-run the full computation.
Table
A full CREATE TABLE AS SELECT on every dbt run. Simple, fast to query, but re-scans all input data on every run. Use for small-to-medium models where a full rebuild is acceptable. For a model that aggregates 500GB of history, this is too expensive to run nightly.
Incremental
The workhorse for ML feature tables. dbt only processes new or updated records:
-- models/marts/fct_user_features.sql
{{
config(
materialized='incremental',
unique_key='customer_id',
incremental_strategy='merge'
)
}}
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
-- Only process orders from the last 35 days on incremental runs
-- (30-day rolling window needs 30 days of lookback)
WHERE created_at >= CURRENT_DATE - INTERVAL '35 days'
{% endif %}
)
SELECT
customer_id,
COUNT(*) AS orders_30d,
SUM(total_amount_usd) AS revenue_30d,
MAX(created_at) AS last_order_at,
CURRENT_TIMESTAMP AS updated_at
FROM orders
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY customer_id
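The {% if is_incremental() %} Jinja block is only active when the target table already exists and the run is not a full refresh. On the first run, or with --full-refresh, the model processes all data. On subsequent runs, it only processes the last 35 days and merges the results into the existing table using customer_id as the unique key. Note that merge only touches customers present in the new result: a customer whose last order has aged out of the 30-day window produces no row, so the existing row keeps its old values until a full refresh or an explicit reset.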
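The upsert behaviour of a merge on a unique key can be simulated in a few lines of Python. This is a toy model under stated assumptions, not what the warehouse executes: `merge_on_key` is a hypothetical helper and the customer rows are invented.

```python
def merge_on_key(target: dict, new_rows: list[dict], unique_key: str) -> dict:
    """Upsert new_rows into target: update matching keys, insert new ones.

    Rows in target whose key is absent from new_rows are left untouched.
    """
    merged = dict(target)
    for row in new_rows:
        merged[row[unique_key]] = row
    return merged


# Existing feature table, keyed by customer_id.
target = {
    "c1": {"customer_id": "c1", "orders_30d": 4},
    "c2": {"customer_id": "c2", "orders_30d": 2},
}
# Incremental result: c1 ordered again, c3 is new, c2 had no orders in the window.
new_rows = [
    {"customer_id": "c1", "orders_30d": 5},
    {"customer_id": "c3", "orders_30d": 1},
]
result = merge_on_key(target, new_rows, "customer_id")
print(result)
```

In this sketch c2 keeps orders_30d = 2 even though the window no longer contains any of its orders, which is the kind of staleness a spot-check query should look for.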
Ephemeral
Ephemeral models are not materialized at all - they are inlined as CTEs into the queries of models that reference them:
# dbt_project.yml
models:
  my_project:
    intermediate:
      +materialized: ephemeral
Use ephemeral models for intermediate transformations that are only used by one downstream model and do not need to be inspected independently. Overusing ephemeral models can make queries very large and hard to debug.
The Full dbt Project Structure for ML
ecomm_dbt/
├── dbt_project.yml # project config, materialization defaults
├── packages.yml # dbt-utils, dbt-expectations, etc.
├── profiles.yml # warehouse connection (NOT in Git - keep in ~/.dbt/)
│
├── models/
│ ├── sources.yml # all raw source tables defined here
│ │
│ ├── staging/ # 1:1 with source tables, light cleaning
│ │ ├── stg_orders.sql
│ │ ├── stg_customers.sql
│ │ ├── stg_products.sql
│ │ └── schema.yml # tests + descriptions for staging models
│ │
│ ├── intermediate/ # business logic, joins, intermediate aggregations
│ │ ├── int_user_order_stats.sql
│ │ ├── int_user_session_stats.sql
│ │ └── schema.yml
│ │
│ └── marts/ # final outputs - feature tables for ML, reporting
│ ├── ml/
│ │ ├── fct_user_features.sql # incremental, feeds feature store
│ │ ├── fct_product_features.sql # incremental
│ │ └── schema.yml
│ └── reporting/
│ └── dim_customer_360.sql
│
├── tests/
│ └── assert_feature_completeness.sql # custom data test
│
├── macros/
│ ├── rolling_aggregates.sql # Jinja macro for 7d/14d/30d features
│ └── generate_schema_name.sql # custom schema routing
│
├── seeds/
│ └── product_category_mapping.csv # small lookup tables, versioned in Git
│
└── analyses/
└── feature_drift_check.sql # ad-hoc SQL, not run by dbt run
The three-layer architecture - staging → intermediate → marts - maps roughly onto the medallion architecture (bronze → silver → gold). The raw sources are the bronze layer. Staging and intermediate models are the silver layer: cleaned, typed, renamed. Marts are the gold layer: business-ready feature tables.
Code Walkthrough
Staging Model: Clean Raw Data
-- models/staging/stg_orders.sql
WITH source AS (
SELECT * FROM {{ source('raw', 'orders') }}
),
renamed AS (
SELECT
-- primary key
order_id,
-- foreign keys
customer_id,
product_id,
-- standardize naming (source had inconsistent conventions)
product_category_code,
LOWER(TRIM(status)) AS status,
-- cast and clean amounts
total_amount_cents / 100.0 AS total_amount_usd,
discount_cents / 100.0 AS discount_usd,
-- timestamps - always store in UTC
created_at AT TIME ZONE 'UTC' AS created_at_utc,
completed_at AT TIME ZONE 'UTC' AS completed_at_utc,
-- metadata
_loaded_at -- when the raw loader inserted this row
FROM source
WHERE order_id IS NOT NULL -- filter known garbage rows
)
SELECT * FROM renamed
This model does no business logic - only renaming, type casting, and light filtering. If the source schema changes, only this model needs to be updated. Everything downstream uses ref('stg_orders') and stays insulated as long as this model's output columns keep their names and types.
Incremental Feature Model: Rolling 30-Day Aggregates
-- models/marts/ml/fct_user_features.sql
{{
config(
materialized='incremental',
unique_key='customer_id',
incremental_strategy='merge',
on_schema_change='sync_all_columns'
)
}}
WITH orders AS (
SELECT
customer_id,
total_amount_usd,
discount_usd,
product_category_code,
created_at_utc
FROM {{ ref('stg_orders') }}
WHERE status = 'completed'
{% if is_incremental() %}
-- On incremental runs: only recompute for customers with
-- activity in the last 35 days (30d window + 5d buffer)
AND created_at_utc >= CURRENT_TIMESTAMP - INTERVAL '35 days'
{% endif %}
),
features AS (
SELECT
customer_id,
-- Volume features
COUNT(*) AS orders_30d,
SUM(total_amount_usd) AS revenue_30d,
AVG(total_amount_usd) AS avg_order_value_30d,
-- Recency features
MAX(created_at_utc) AS last_order_at,
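-- timestamp subtraction yields an interval; convert it to fractional days (Postgres-style syntax)
EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - MAX(created_at_utc))) / 86400.0 AS days_since_last_order,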
-- Discount sensitivity
SUM(discount_usd) / NULLIF(SUM(total_amount_usd), 0) AS discount_rate_30d,
-- Category diversity
COUNT(DISTINCT product_category_code) AS unique_categories_30d,
-- Metadata for lineage tracking
CURRENT_TIMESTAMP AS feature_computed_at
FROM orders
WHERE created_at_utc >= CURRENT_TIMESTAMP - INTERVAL '30 days'
GROUP BY customer_id
)
SELECT * FROM features
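:::tip Why the 35-day lookback in the incremental filter? The feature window is 30 days, but the incremental filter reads 35 days so that orders near the window boundary are still scanned when rows arrive late or timestamps drift across time zones. The buffer only widens the scan; the final WHERE clause still applies the exact 30-day cutoff. It does not refresh customers whose last order has left the window: they produce no new row, and merge leaves their old values in place. Schedule a periodic --full-refresh, or reset aged-out customers explicitly, to avoid that staleness. :::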
Jinja Macro: Generate Rolling Features for Multiple Windows
Instead of copy-pasting the same aggregation for 7d, 14d, and 30d, write a macro:
-- macros/rolling_aggregates.sql
{% macro rolling_order_features(days_list=[7, 14, 30]) %}
{% for days in days_list %}
-- {{ days }}-day window features
COUNT(CASE WHEN created_at_utc >= CURRENT_TIMESTAMP - INTERVAL '{{ days }} days'
THEN 1 END) AS orders_{{ days }}d,
SUM(CASE WHEN created_at_utc >= CURRENT_TIMESTAMP - INTERVAL '{{ days }} days'
THEN total_amount_usd ELSE 0 END) AS revenue_{{ days }}d,
AVG(CASE WHEN created_at_utc >= CURRENT_TIMESTAMP - INTERVAL '{{ days }} days'
THEN total_amount_usd END) AS avg_order_value_{{ days }}d{% if not loop.last %},{% endif %}
{% endfor %}
{% endmacro %}
Use it in a model:
-- models/marts/ml/fct_user_multi_window_features.sql
{{
config(materialized='incremental', unique_key='customer_id')
}}
SELECT
customer_id,
{{ rolling_order_features([7, 14, 30, 90]) }},
CURRENT_TIMESTAMP AS feature_computed_at
FROM {{ ref('stg_orders') }}
WHERE status = 'completed'
GROUP BY customer_id
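This macro generates 12 columns (4 windows × 3 metrics) without copy-paste. When the product team asks for a 60-day window, you add 60 to the list and re-run. Because the model is incremental and on_schema_change defaults to ignore, run it once with --full-refresh (or set on_schema_change) so the new columns are added to the existing table.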
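If Jinja loops are unfamiliar, the same expansion can be written as plain Python string building. This is an analogy to the macro above rather than anything dbt runs; `rolling_order_columns` is a hypothetical function name.

```python
def rolling_order_columns(days_list=(7, 14, 30)):
    """Build one COUNT, SUM and AVG expression per window, like the Jinja loop."""
    columns = []
    for days in days_list:
        window = f"created_at_utc >= CURRENT_TIMESTAMP - INTERVAL '{days} days'"
        columns += [
            f"COUNT(CASE WHEN {window} THEN 1 END) AS orders_{days}d",
            f"SUM(CASE WHEN {window} THEN total_amount_usd ELSE 0 END) AS revenue_{days}d",
            f"AVG(CASE WHEN {window} THEN total_amount_usd END) AS avg_order_value_{days}d",
        ]
    return columns


columns = rolling_order_columns([7, 14, 30, 90])
# Joining with commas plays the role of the loop.last check in the macro.
select_list = ",\n".join(columns)
print(select_list)
```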
dbt Tests: Making Data Quality Explicit
Schema Tests (YAML)
# models/marts/ml/schema.yml
version: 2
models:
  - name: fct_user_features
    description: >
      One row per customer. Rolling 30-day aggregations used as input to
      the recommendation and churn prediction models. Updated nightly at 02:00 UTC.
    columns:
      - name: customer_id
        description: "Foreign key to dim_customers. Primary key for this table."
        tests:
          - unique
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      - name: orders_30d
        description: "Number of completed orders in the last 30 days."
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 10000
      - name: discount_rate_30d
        description: "Discount as a fraction of total revenue. Between 0 and 1."
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0.0
              max_value: 1.0
      - name: feature_computed_at
        description: "Timestamp when this row was last computed."
        tests:
          - not_null
Custom Data Test
Schema tests catch column-level issues. Custom data tests catch business-logic issues:
-- tests/assert_feature_completeness.sql
-- This test FAILS if less than 90% of active customers have features
WITH active_customers AS (
SELECT COUNT(DISTINCT customer_id) AS cnt
FROM {{ ref('stg_orders') }}
WHERE created_at_utc >= CURRENT_TIMESTAMP - INTERVAL '30 days'
AND status = 'completed'
),
featured_customers AS (
SELECT COUNT(DISTINCT customer_id) AS cnt
FROM {{ ref('fct_user_features') }}
WHERE orders_30d > 0
),
coverage AS (
SELECT
featured_customers.cnt::float / NULLIF(active_customers.cnt, 0) AS coverage_ratio
FROM active_customers, featured_customers
)
-- dbt tests pass when the query returns 0 rows
-- Return a row (which causes failure) when coverage drops below 90%
SELECT *
FROM coverage
WHERE coverage_ratio < 0.90
:::note How dbt tests work A dbt test is a SQL query. The test passes when the query returns zero rows. The test fails when the query returns one or more rows. This is why custom tests are written as "find the problem" queries - if no problems are found, no rows are returned, and the test passes. :::
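The "zero rows means pass" rule is easy to try outside dbt. The sketch below uses Python's built-in sqlite3 module with a tiny, invented fct_user_features table; `run_test` is a hypothetical helper, and the three queries are hand-written approximations of what not_null, accepted_range, and unique check.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE fct_user_features (customer_id TEXT, orders_30d INTEGER);
    INSERT INTO fct_user_features VALUES ('c1', 3), ('c2', -1), ('c3', NULL);
""")


def run_test(conn, failing_rows_sql):
    """Return (passed, failing_rows): the test passes only if no rows come back."""
    rows = conn.execute(failing_rows_sql).fetchall()
    return (len(rows) == 0, rows)


tests = {
    "not_null": "SELECT customer_id FROM fct_user_features WHERE orders_30d IS NULL",
    "accepted_range": (
        "SELECT customer_id FROM fct_user_features "
        "WHERE orders_30d < 0 OR orders_30d > 10000"
    ),
    "unique": (
        "SELECT customer_id FROM fct_user_features "
        "GROUP BY customer_id HAVING COUNT(*) > 1"
    ),
}
results = {name: run_test(conn, sql) for name, sql in tests.items()}
for name, (passed, rows) in results.items():
    print(name, "PASS" if passed else f"FAIL {rows}")
```

Each failing test returns the offending rows themselves, which is what makes a failure immediately debuggable.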
dbt DAG: The Lineage Graph
The lineage graph is not drawn manually - dbt generates it automatically by parsing every ref() and source() call in your project. When you run dbt docs generate && dbt docs serve, you get an interactive version in your browser with search, lineage exploration, and column descriptions.
The dbt Compilation Process: What Happens Under the Hood
Understanding what dbt does when you run dbt run demystifies a lot of production issues.
Step 1: Parse. dbt reads every .sql, .yml, and .py file in the project. It builds an in-memory graph of all models, sources, tests, and their dependencies. This produces the manifest.json artifact - a complete description of the project state.
Step 2: Compile. Jinja templating is resolved: {{ ref('stg_orders') }} becomes the actual fully-qualified table name (my_project.dbt_prod.stg_orders), {% if is_incremental() %} blocks are evaluated, and macros are expanded. The output is pure SQL, stored in target/compiled/.
Step 3: Execute. dbt establishes connections to the warehouse and executes the compiled SQL, respecting the dependency order from the DAG. With threads: 8, up to 8 independent models run in parallel.
You can inspect what dbt will run without actually running it:
# Compile without executing - inspect the SQL that dbt will run
dbt compile --select fct_user_features
# View the compiled SQL
cat target/compiled/ecomm_dbt/models/marts/ml/fct_user_features.sql
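This is invaluable when debugging: you see the model's SELECT with all Jinja resolved. After a run, the full statements dbt sent to the warehouse, including the DDL or MERGE wrapper, are written to target/run/.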
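As a rough mental model of the compile step, the following Python sketch resolves ref() calls and is_incremental() blocks with regular expressions. dbt uses a real Jinja environment, so this is only an approximation: `compile_model` is a hypothetical function, and the `analytics.dbt_prod` database and schema are assumed example values.

```python
import re

REF_CALL = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}")
INCREMENTAL_BLOCK = re.compile(
    r"\{%\s*if\s+is_incremental\(\)\s*%\}(.*?)\{%\s*endif\s*%\}", re.DOTALL
)


def compile_model(sql: str, database: str, schema: str, is_incremental: bool) -> str:
    """Keep or drop is_incremental() blocks, then resolve ref() to full names."""
    sql = INCREMENTAL_BLOCK.sub(lambda m: m.group(1) if is_incremental else "", sql)
    return REF_CALL.sub(lambda m: f"{database}.{schema}.{m.group(1)}", sql)


raw = (
    "SELECT * FROM {{ ref('stg_orders') }}"
    "{% if is_incremental() %} WHERE created_at >= '2024-01-01'{% endif %}"
)
first_run = compile_model(raw, "analytics", "dbt_prod", is_incremental=False)
later_run = compile_model(raw, "analytics", "dbt_prod", is_incremental=True)
print(first_run)
print(later_run)
```

The same source file compiles to two different queries depending on run state, which is why inspecting the compiled output is the first step when an incremental model misbehaves.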
dbt_project.yml: Configuration Deep Dive
The dbt_project.yml is the central configuration file. Key sections for ML teams:
# dbt_project.yml
name: ecomm_dbt
version: '1.0.0'
config-version: 2
# Warehouse connection profile (defined in ~/.dbt/profiles.yml)
profile: ecomm_dbt
# Directories
model-paths: ["models"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
# Directory where dbt writes compiled SQL and run artifacts
target-path: "target"
clean-targets: ["target", "dbt_packages"]
models:
  ecomm_dbt:
    # Staging: views are cheap, always fresh
    staging:
      +materialized: view
      +tags: ['staging']
    # Intermediate: ephemeral - CTEs in downstream queries
    intermediate:
      +materialized: ephemeral
    # Marts: tables for performance, incremental for large ones
    marts:
      +materialized: table
      ml:
        +materialized: incremental # override for ML subfolder
        +tags: ['ml', 'nightly']
        +post-hook: "GRANT SELECT ON {{ this }} TO ROLE ml_training_role"
      reporting:
        +tags: ['reporting']
# Grant access to all mart models after they are created
on-run-end:
  - "{{ grant_select_on_schemas(schemas, 'analyst_role') }}"
The + prefix marks a key as a configuration rather than a folder name, which removes ambiguity when a directory shares a name with a config key. Configuration cascades down the folder hierarchy: a setting at ecomm_dbt applies to every model; a setting at marts.ml overrides it for the ml/ subdirectory only.
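The cascade can be sketched as a walk down a nested dictionary in which deeper folders overwrite shallower ones. This is a simplified illustration: `resolve_config` is a hypothetical helper, and it treats every key as a plain override, whereas dbt merges some configs such as tags additively.

```python
def resolve_config(project_config: dict, model_path: list[str]) -> dict:
    """Collect '+key' configs from the project root down to the model's folder.

    Deeper folders are visited later, so their values overwrite earlier ones.
    """
    resolved = {}
    node = project_config
    for part in [None, *model_path]:
        if part is not None:
            node = node.get(part, {})
        for key, value in node.items():
            if key.startswith("+"):
                resolved[key[1:]] = value
    return resolved


# Mirrors the models: block of the dbt_project.yml above (under ecomm_dbt).
project = {
    "staging": {"+materialized": "view", "+tags": ["staging"]},
    "marts": {
        "+materialized": "table",
        "ml": {"+materialized": "incremental", "+tags": ["ml", "nightly"]},
        "reporting": {"+tags": ["reporting"]},
    },
}
ml = resolve_config(project, ["marts", "ml"])
reporting = resolve_config(project, ["marts", "reporting"])
print(ml)
print(reporting)
```

Here the ml/ folder overrides the marts default, while reporting/ inherits materialized: table because it sets nothing of its own.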
dbt Docs: Auto-Generated Data Catalog
The dbt docs command generates a fully interactive data catalog from your project:
dbt docs generate # generates catalog.json and manifest.json
dbt docs serve # starts a local web server at localhost:8080
The generated documentation includes:
- DAG visualization: an interactive graph of every model, source, seed, and snapshot, with the ability to click any node and see its full lineage
- Model descriptions: pulled from the description fields in schema.yml
- Column descriptions: every documented column, with its type and tests
- Test coverage: which tests are attached to which models and columns
- Source documentation: raw tables, their descriptions, and their downstream dependents
For ML teams, the docs serve as the data contract between the data engineering team and the ML team. When the ML engineer asks "what does orders_30d actually mean - is it calendar days or business days, does it include cancelled orders?" - the answer should be in the docs, not in Slack.
:::note Documenting ML features pays dividends Models and columns without descriptions are a support burden: engineers repeatedly answer the same questions about what a column means. A one-time investment in writing good descriptions for ML feature columns - what the window is, what events are included, how NULLs are handled - eliminates dozens of hours of future context-switching. Treat feature documentation like docstrings in a library. :::
Running dbt in Practice
# Run all models in the project
dbt run
# Run only a specific model and all its upstream dependencies
dbt run --select +fct_user_features
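# Run only models that changed relative to a previous manifest (slim CI)
# path/to/prod-artifacts is a placeholder for the folder holding that manifest.json
dbt run --select state:modified+ --state path/to/prod-artifacts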
# Run all tests
dbt test
# Run tests only for a specific model
dbt test --select fct_user_features
# Check source freshness (is raw data up to date?)
dbt source freshness
# Generate and serve the data catalog
dbt docs generate
dbt docs serve # opens at http://localhost:8080
# Full refresh of an incremental model (reprocess all history)
dbt run --full-refresh --select fct_user_features
# Run everything: models + tests + snapshots
dbt build
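The dbt build command is the most important one for production: it runs models, tests, snapshots, and seeds in dependency order, and when a test fails it skips everything downstream of the failing node (add --fail-fast to stop the whole run at the first failure). This is what you put in your nightly CI job.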
The Incremental Strategy Decision
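Choosing the right incremental_strategy is one of the most impactful decisions for ML feature table performance. The options depend on your warehouse and adapter version, so treat the table below as a starting point and confirm it against your adapter's documentation: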
| Strategy | BigQuery | Snowflake | Databricks | Use When |
|---|---|---|---|---|
| merge | Yes | Yes | Yes | You have a unique key; updates and inserts both needed |
| insert_overwrite | Yes | No | Yes | You can re-process entire partitions; append-only |
| append | Yes | Yes | Yes | Truly append-only data; no updates ever |
| delete+insert | No | Yes | Yes | Snowflake alternative to merge; faster for large updates |
merge is the most common for ML feature tables: it upserts based on a unique_key, which handles both new users (insert) and existing users whose features have been recomputed (update).
insert_overwrite with partition_by is often faster than merge for date-partitioned feature tables: instead of row-level merge, dbt overwrites entire date partitions. This is correct when your feature computation is fully deterministic for a given date partition:
-- models/marts/ml/fct_daily_user_features.sql
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={
'field': 'feature_date',
'data_type': 'date',
'granularity': 'day'
}
)
}}
SELECT
customer_id,
DATE(CURRENT_TIMESTAMP) AS feature_date,
COUNT(*) AS orders_today,
SUM(total_amount_usd) AS revenue_today
FROM {{ ref('stg_orders') }}
WHERE DATE(created_at_utc) = DATE(CURRENT_TIMESTAMP)
-- Every run recomputes only today's rows, so dbt overwrites only
-- today's partition and leaves earlier partitions untouched.
GROUP BY customer_id
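The insert_overwrite approach for feature tables has a key advantage: it is idempotent at the partition level. If the pipeline fails and is re-run, the affected partition is replaced wholesale, so no duplicate or half-updated rows remain. A merge is also a single atomic statement in most warehouses, but it never deletes target rows that are absent from the new result, so stale rows can linger.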
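Partition-level overwrite can be modelled in Python with a dictionary of partitions. This is a toy illustration, not warehouse behaviour in full: `insert_overwrite` here is a hypothetical function, and the dates and rows are invented.

```python
def insert_overwrite(table: dict, new_rows: list[dict], partition_field: str) -> dict:
    """Replace every partition that appears in new_rows; keep all others as-is."""
    result = {partition: list(rows) for partition, rows in table.items()}
    for partition in {row[partition_field] for row in new_rows}:
        result[partition] = []  # drop whatever was there, including partial data
    for row in new_rows:
        result[row[partition_field]].append(row)
    return result


table = {
    "2024-03-09": [{"feature_date": "2024-03-09", "customer_id": "c1", "orders_today": 1}],
    # Leftover from a run that failed halfway through.
    "2024-03-10": [{"feature_date": "2024-03-10", "customer_id": "c1", "orders_today": 9}],
}
new_rows = [
    {"feature_date": "2024-03-10", "customer_id": "c1", "orders_today": 2},
    {"feature_date": "2024-03-10", "customer_id": "c2", "orders_today": 1},
]
once = insert_overwrite(table, new_rows, "feature_date")
twice = insert_overwrite(once, new_rows, "feature_date")
print(once == twice)
```

Running the same load twice gives the same table, and the earlier partition is never read or rewritten.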
dbt + Spark: Running at Scale
For feature tables that are too large for your data warehouse, dbt has adapters for distributed compute:
# profiles.yml - Databricks / Spark adapter
ecomm_dbt:
  target: prod
  outputs:
    prod:
      type: databricks
      host: adb-xxxxx.azuredatabricks.net
      http_path: /sql/1.0/warehouses/xxxxx
      token: "{{ env_var('DATABRICKS_TOKEN') }}"
      catalog: prod_catalog
      schema: ml_features
      threads: 8
With the dbt-spark or dbt-databricks adapter, the same SQL models run on Spark clusters via Databricks SQL. Incremental models use Spark's MERGE INTO syntax. For very large feature tables (billions of rows), you can also use file_format: delta and partition_by:
-- models/marts/ml/fct_user_features_spark.sql
{{
config(
materialized='incremental',
unique_key='customer_id',
incremental_strategy='merge',
file_format='delta',
partition_by=['event_date']
)
}}
Integrating dbt with Orchestration: Airflow and Prefect
dbt is not an orchestrator - it does not schedule itself, manage retries, or integrate with external systems. It needs to be called from an orchestrator. The two most common patterns are:
Pattern 1: dbt as a shell task in Airflow
# dags/nightly_feature_pipeline.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# Hypothetical placeholders: swap in your own alerting and export logic.
def alert_slack(context):
    """Failure callback; Airflow passes the task context."""
def export_features_to_feast():
    """Export the freshly built feature table to the feature store."""
default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
    'on_failure_callback': alert_slack,
}
with DAG(
    'nightly_ml_features',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # 2 AM UTC nightly
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Fail fast if the raw data is stale
    check_freshness = BashOperator(
        task_id='dbt_source_freshness',
        bash_command='dbt source freshness --profiles-dir /opt/dbt',
    )
    # Build and test every model tagged nightly
    run_models = BashOperator(
        task_id='dbt_build',
        bash_command='dbt build --select tag:nightly --profiles-dir /opt/dbt',
    )
    export_to_feature_store = PythonOperator(
        task_id='export_features',
        python_callable=export_features_to_feast,
    )
    check_freshness >> run_models >> export_to_feature_store
Pattern 2: Using the dbt Python API for programmatic control
# export_features.py - run programmatically, not via CLI
from dbt.cli.main import dbtRunner, dbtRunnerResult
runner = dbtRunner()
# Run specific models and capture the result
result: dbtRunnerResult = runner.invoke([
    "build",
    "--select", "tag:ml",
    "--target", "prod"
])
if result.success:
    print("dbt build succeeded")
    # Parse results to check individual model status
    for r in result.result.results:
        print(f"  {r.node.name}: {r.status} ({r.execution_time:.1f}s)")
else:
    raise Exception(f"dbt build failed: {result.exception}")
The programmatic API gives you access to the full result object - individual model runtimes, row counts, test outcomes - which you can feed into your pipeline monitoring system or use to gate downstream processes.
Production Engineering Notes
:::warning Incremental model correctness The most common production bug with incremental models is assuming that a rolling-window aggregation refreshes every row. With merge, only customers that appear in the new result are updated: if a customer has no activity inside the window, the incremental run emits no row for them and their old feature values stay in the table. A lookback buffer (35 days for a 30-day window) protects against late-arriving rows at the window edge, but it does not fix this. Schedule a periodic --full-refresh or reset aged-out customers explicitly, and always verify the incremental logic with a spot-check query. :::
:::danger Silent schema changes
dbt's on_schema_change='sync_all_columns' config does not protect you if an upstream source drops a column that your staging model references - that will raise a warehouse error. Add column-level tests on your sources and run them early in the pipeline so a renamed or repurposed column is caught before features are built. Reference raw tables only through {{ source('raw', 'orders') }} in staging models, rather than hard-coding table names elsewhere, so there is a single place to audit when a source schema changes.
:::
:::tip seeds for lookup tables
Small reference tables (category mappings, country codes, business rules) should be seeds - CSV files committed to your dbt project. This gives you version control for lookup data, not just code. Use dbt seed to load them. Reference them with ref('product_category_mapping') just like a model.
:::
Common Mistakes
Mistake 1: Putting business logic in staging models. Staging models should be 1:1 with source tables - only renaming, type casting, deduplication. Business logic (joins, aggregations, business rules) belongs in intermediate or marts models. If your staging model has a 5-table join, you have already violated this.
Mistake 2: Using table materialization for large models. A model that aggregates 2 years of event data should be incremental, not table. A full rebuild that takes 4 hours nightly is a time bomb. Move to incremental as soon as a model exceeds a few minutes of rebuild time.
Mistake 3: Not running dbt test in your production pipeline. Running dbt run without dbt test is like deploying code without running tests. Use dbt build (which runs both) or add a dbt test step after dbt run in your CI/CD pipeline. If a test fails, the pipeline should stop before the ML training job starts.
Mistake 4: Circular refs. dbt will raise an error for circular dependencies, but the error message can be confusing. The fix is always architectural: identify which model should be the source of truth and restructure the dependency direction.
Mistake 5: Ignoring source freshness. Teams configure sources without freshness checks because it seems optional. When a raw data loader fails silently for 8 hours, every model in your DAG runs successfully on stale data. Source freshness checks are your first line of defense.
Interview Q&A
Q: What is the difference between a view and an incremental materialization in dbt?
A view is a stored SQL definition that is executed at query time - no data is stored, and every query re-runs the full SQL. This is free in storage but expensive if queried repeatedly. An incremental model is a physical table that dbt updates on each run by only processing new or changed records, using a MERGE INTO or INSERT strategy. For ML feature tables with hundreds of millions of rows, incrementals are essential - a full rebuild every night would be prohibitively expensive. The trade-off is that incremental logic introduces complexity: you must correctly define the lookback window and the unique key, and you must periodically do a --full-refresh to reprocess corrected historical data.
Q: How does dbt construct the execution DAG?
dbt parses every model file during the parse step, scanning for ref() and source() function calls. Each call creates a directed edge in a graph: the calling model depends on the referenced model. dbt then performs a topological sort of this graph to determine execution order. Models with no dependencies run first, and every model runs only after all the models it references have finished. This also lets dbt parallelize execution: models that do not depend on one another run concurrently, up to the threads setting in profiles.yml.
Q: How do you use dbt to build ML feature tables?
The standard pattern is: staging models clean raw source data (one per source table), intermediate models perform joins and business logic, and mart models in a dedicated ml/ subfolder produce the final feature tables. Mart feature models use incremental materialization with a rolling window filter in the is_incremental() block. Schema tests verify uniqueness, null rates, and value ranges on every feature column. A custom data test verifies that coverage - the percentage of active users with computed features - stays above a threshold. The output table feeds either a feature store or is exported directly to S3 as Parquet for training.
Q: What are dbt sources and why are they different from refs?
ref() references a model that dbt itself created. source() references a raw table that exists in the warehouse but was loaded by an external process (a data loader, Fivetran, a Kafka consumer). The distinction matters because sources have different lifecycle management: they can have freshness checks, they are documented separately, and they represent the boundary between "data dbt controls" and "data that comes from elsewhere." If you use ref('raw_orders') instead of source('raw', 'orders'), you lose freshness checking and source documentation, and you imply that dbt created that table - which it did not.
Q: How do you test data quality in dbt?
dbt has two test types. Schema tests (called generic tests in current dbt documentation) are declared in YAML and apply to specific columns: unique, not_null, accepted_values (like an enum check), and relationships (foreign key integrity). Custom data tests (called singular tests) are SQL files in the tests/ directory - any query that returns rows is treated as a failure. For ML specifically, you want: not_null and unique on the primary key, accepted_range (from dbt-utils) on feature values to catch numerical anomalies (negative counts, rates above 1.0), and a custom coverage test that verifies the proportion of active users with computed features. Run all tests with dbt test or dbt build.
Q: How does dbt fit into the modern data stack?
dbt sits in the "T" of ELT. Raw data loaders (Fivetran, Airbyte, custom Kafka consumers) handle the "EL" - extracting data from operational systems and loading it into a cloud data warehouse (BigQuery, Snowflake, Databricks). dbt then handles the "T" - building the transformation layer inside the warehouse using SQL. Downstream consumers - BI tools (Looker, Metabase), ML training pipelines, feature stores - query the dbt-managed tables. The modern data stack is: source systems → loader → warehouse → dbt → consumers. dbt's role is to make the transformation layer reproducible, testable, and documented, replacing the ad-hoc SQL script approach that predated it.
