

dbt Tests for Quality: Encoding Contracts at the Transformation Layer

The 340,000-Row Silent Failure

The churn prediction model had been in production for seven months. The ML team was happy with it - early metrics showed strong AUC and the business team had approved the quarterly plan based on its retention forecasts. Then someone noticed the model's performance was declining. Not crashing - just slowly, methodically eroding. AUC slid from 0.71 to 0.68 over four months. Nobody raised an alarm because there was no sudden event to trigger one.

When they finally dug into the feature table, they found it. The join that built the user_engagement_features table used three upstream tables: events, users, and subscriptions. The join key between events and users was user_id. The users table had migrated user_id to a new bigint type six months ago. The events table still had it as varchar. The mismatched join silently stopped matching rows, and the model's LEFT JOIN happily kept the unmatched rows, leaving NULL user identifiers and filling the downstream features with zeros and NULLs.

Three hundred and forty thousand rows in the feature table had NULL user identifiers. They had been training the churn model on ghost users - rows that represented no real person, filled with imputed zeros that taught the model nothing useful. The model learned from noise. It performed at noise level.

A single line in schema.yml:

- name: user_id
  tests:
    - not_null

That is all it would have taken. One test, run every time the model built. The failure would have surfaced on day one, before a single bad row reached the feature store.
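To make the failure mode concrete, here is a minimal sketch using Python's built-in sqlite3 module rather than a real warehouse. The table contents and the `user_key` column are invented for illustration; the point is only that a LEFT JOIN keeps unmatched rows with a NULL identifier, and that a not_null-style query surfaces them.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table users  (user_id text, tier text);
    create table events (event_id integer, user_key text);
    insert into users  values ('u1', 'pro'), ('u2', 'free');
    -- 'u3' has no match in users, standing in for a broken join key
    insert into events values (1, 'u1'), (2, 'u2'), (3, 'u3');
    create table user_engagement_features as
        select u.user_id, e.event_id
        from events e
        left join users u on u.user_id = e.user_key;
""")

# Same shape as a not_null test: select the offending rows.
failing_rows = conn.execute(
    "select event_id from user_engagement_features where user_id is null"
).fetchall()

# The check passes only when no offending rows come back.
check_passed = len(failing_rows) == 0
print(f"not_null(user_id): {'PASS' if check_passed else 'FAIL'} ({len(failing_rows)} rows)")
```

The join itself raises no error; only the explicit check turns the ghost row into a visible failure.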


Why This Exists

dbt is a transformation framework - you write SQL SELECT statements and dbt handles materialization, dependency management, and incremental builds. But transformation is only half of the job. The other half is verification: ensuring that the transformations produce correct output.

Without tests, a dbt project is a trust exercise. You trust that the SQL is correct. You trust that upstream schema hasn't changed. You trust that join keys are clean. You trust that the data that enters your ML pipeline is what you think it is. That trust is routinely misplaced.

dbt tests solve this by embedding quality checks directly in the transformation layer - at the same place and time where transformation happens. When you run dbt build, it builds and tests in one pass: each model's tests run immediately after it materializes, before downstream models consume its output. If a test fails with error severity, dbt skips the models downstream of it and the build fails, so the bad data does not propagate past the failing model.

This is the critical insight: test at the transformation layer, not after. Testing downstream (in the serving system, in the ML pipeline, after the data has already flowed) catches bugs after they've had time to cause damage. Testing in dbt catches them at the source.


Historical Context

dbt (data build tool) was created by Fishtown Analytics (now dbt Labs) and open-sourced in 2016. The original insight was that analytics engineers - people who write SQL for data transformation - deserved software engineering practices: version control, modularity, documentation, and testing. Before dbt, SQL transformations lived in stored procedures, ad-hoc scripts, or buried inside Airflow PythonOperators with no testing whatsoever.

The testing framework was part of dbt from early on, but it evolved significantly. Early versions had "schema tests" (applied via YAML to model columns) and "data tests" (custom SQL queries in a tests/ directory). dbt v0.20 (2021) introduced the {% test %} block syntax for reusable generic tests along with per-test configuration, and dbt Core v1.0 (late 2021) renamed the two kinds to "generic tests" and "singular tests" respectively. The dbt-utils and dbt-expectations packages extended the test library further. By 2023, dbt had become a de facto standard for SQL transformation, and its test framework a primary data quality layer for analytics engineering workflows.


Test Types: A Taxonomy

dbt has two fundamental test categories, each with distinct use cases:

Generic tests (formerly schema tests): Written once, applied to many models via YAML. They are parameterized - you specify the column and parameters in schema.yml, and dbt generates the SQL. Four are built in; any number can be added via packages or custom macros.

Singular tests (formerly data tests): Custom SQL files in the tests/ directory. Each file is a complete SQL query. dbt runs the query and considers the test failing if the query returns any rows. These encode business logic too complex for generic tests.

The decision rule: if a check applies to a single column or a standard structural property (nulls, uniqueness, referential integrity, accepted values), use a generic test. If the check requires joins across multiple tables, complex business logic, or multi-column aggregations, use a singular test.
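The difference between the two categories can be sketched in a few lines of Python. This is an illustration of the idea, not dbt's implementation: `render_not_null`, `query_returns_no_rows`, and the sample table are hypothetical names, and sqlite3 stands in for the warehouse.

```python
import sqlite3


def render_not_null(model: str, column_name: str) -> str:
    """Generic test: one template, rendered once per (model, column) pair."""
    return f"select * from {model} where {column_name} is null"


# Singular test: one fixed query that encodes a specific business rule.
SINGULAR_NO_NEGATIVE_REVENUE = (
    "select * from daily_user_revenue where revenue_usd < 0"
)


def query_returns_no_rows(conn: sqlite3.Connection, sql: str) -> bool:
    """Both kinds share one contract: the test passes only if zero rows come back."""
    return conn.execute(sql).fetchone() is None


conn = sqlite3.connect(":memory:")
conn.executescript("""
    create table daily_user_revenue (user_id text, revenue_usd real);
    insert into daily_user_revenue values ('u1', 9.99), (null, 4.50), ('u3', -1.00);
""")

results = {
    "not_null(user_id)": query_returns_no_rows(
        conn, render_not_null("daily_user_revenue", "user_id")),
    "not_null(revenue_usd)": query_returns_no_rows(
        conn, render_not_null("daily_user_revenue", "revenue_usd")),
    "no_negative_revenue": query_returns_no_rows(
        conn, SINGULAR_NO_NEGATIVE_REVENUE),
}
for name, passed in results.items():
    print(f"{name}: {'PASS' if passed else 'FAIL'}")
```

The generic template is reused for two columns with no new SQL, while the singular query exists exactly once.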


Built-in Generic Tests

The four built-in generic tests cover the most common structural checks. They are declared in a YAML properties file, conventionally schema.yml (the file name is arbitrary; models.yml works the same way), alongside model documentation.

Full Schema Example: User Features Model

# models/features/schema.yml
version: 2

models:
  - name: user_engagement_features
    description: >
      Daily user engagement features for churn prediction.
      Grain: one row per user per day. Joined from events, users, subscriptions.
    tests:
      # the grain is (user_id, partition_date), so uniqueness is checked on the pair
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - user_id
            - partition_date
    columns:
      - name: user_id
        description: Identifier for the user. Part of the composite key with partition_date.
        tests:
          - not_null # never null - JOIN key

      - name: partition_date
        description: Date this feature snapshot was computed for.
        tests:
          - not_null

      - name: subscription_tier
        description: User's subscription tier at partition_date.
        tests:
          - not_null
          - accepted_values:
              values: ['free', 'pro', 'enterprise', 'churned']

      - name: source_system_user_id
        description: Foreign key back to the users table.
        tests:
          - relationships:
              to: ref('users')
              field: user_id

      - name: events_30d
        description: Count of events in last 30 days.
        tests:
          - not_null
          # events count must be non-negative
          - dbt_utils.expression_is_true:
              expression: ">= 0"

      - name: last_active_date
        description: Most recent event date for this user.
        tests:
          - not_null:
              severity: warn # warn only - some users may be inactive

The Four Built-in Tests in Detail

not_null: Verifies no rows have NULL in the column. The compiled SQL is roughly:

-- Simplified form of what dbt compiles for not_null on user_id
select user_id
from {{ model }}
where user_id is null

unique: Verifies no duplicate values. NULLs are excluded, so pair it with not_null on key columns. The compiled SQL is roughly:

-- Simplified form of what dbt compiles for unique on user_id
select user_id, count(*) as n
from {{ model }}
where user_id is not null
group by user_id
having count(*) > 1

accepted_values: Verifies all non-null values are in the allowed set. The compiled SQL is roughly:

-- Simplified form of what dbt compiles for accepted_values on subscription_tier
select subscription_tier
from {{ model }}
where subscription_tier not in ('free', 'pro', 'enterprise', 'churned')
  and subscription_tier is not null

relationships: Verifies referential integrity - every non-null value in this column exists in the referenced table's column. dbt implements it as an anti-join, which stays correct even if the parent column contains NULLs:

-- Simplified form of what dbt compiles for relationships
select child.source_system_user_id
from {{ model }} as child
left join {{ ref('users') }} as parent
  on child.source_system_user_id = parent.user_id
where child.source_system_user_id is not null
  and parent.user_id is null

Source Freshness Tests

For ML pipelines, data freshness is as important as data correctness. A feature table with correct values but stale data can be worse than no data - the model makes predictions based on outdated reality, and nothing signals that anything is wrong.

dbt's source freshness checks are configured in sources.yml and measure how recently each source table was updated:

# models/sources.yml
version: 2

sources:
  - name: events_raw
    database: production
    schema: raw_events
    loaded_at_field: _loaded_at # timestamp column that tracks ingestion time
    freshness:
      warn_after: {count: 6, period: hour} # warn if older than 6 hours
      error_after: {count: 24, period: hour} # error if older than 24 hours
    tables:
      - name: clickstream_events
        description: Raw clickstream events from the web app.
        freshness:
          warn_after: {count: 1, period: hour} # stricter: real-time source
          error_after: {count: 4, period: hour}

  - name: subscriptions_raw
    database: production
    schema: raw_crm
    loaded_at_field: synced_at
    freshness:
      warn_after: {count: 12, period: hour}
      error_after: {count: 48, period: hour}
    tables:
      - name: subscription_changes
        description: CRM sync of subscription state changes.

Source freshness is its own command; dbt build does not check it. Run it first, then build:

dbt source freshness
# Then build the models downstream of the source and run their tests:
dbt build --select source:events_raw+

:::tip Freshness for ML Feature Stores

For ML features served at inference time, freshness thresholds need to be tighter than for analytical queries. A customer analytics dashboard can tolerate 24-hour-old data. A real-time fraud detection model serving predictions on live transactions cannot. Set freshness error thresholds to match your model's staleness tolerance, not your analyst's.

Also: test both source freshness (when did raw data arrive?) and feature freshness (when were features computed?). These can diverge when feature engineering jobs fail silently. :::
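The warn_after / error_after logic reduces to comparing the age of the newest record against two thresholds. The sketch below shows that comparison in plain Python; `freshness_status` is a hypothetical helper for illustration, not part of dbt, and the timestamps are made up.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def freshness_status(
    max_loaded_at: datetime,
    now: datetime,
    warn_after: Optional[timedelta],
    error_after: Optional[timedelta],
) -> str:
    """Classify a source as 'pass', 'warn', or 'error' from the age of its newest row."""
    age = now - max_loaded_at
    # Check the stricter outcome first so 'error' wins over 'warn'.
    if error_after is not None and age > error_after:
        return "error"
    if warn_after is not None and age > warn_after:
        return "warn"
    return "pass"


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
# Thresholds mirroring the events_raw source: warn at 6 hours, error at 24 hours.
status = freshness_status(
    max_loaded_at=now - timedelta(hours=7),
    now=now,
    warn_after=timedelta(hours=6),
    error_after=timedelta(hours=24),
)
print(status)
```

The same function can be pointed at a feature table's computed-at timestamp to check feature freshness alongside source freshness.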


Custom Generic Tests

When built-in tests don't cover your quality requirement, you write custom generic tests as Jinja {% test %} blocks. These live in the macros/ directory (or tests/generic/ in dbt 1.0 and later) and are applied in schema.yml like built-in tests.

test_not_null_proportion: Partial Completeness Check

-- macros/test_not_null_proportion.sql
{% test not_null_proportion(model, column_name, min_proportion=0.95) %}
/*
Passes if at least min_proportion of values are non-null.
Useful for optional fields where some nulls are acceptable.
Returns rows where the proportion falls below the threshold (i.e., fails if any rows returned).
*/
with validation as (
select
count(*) as total_rows,
count({{ column_name }}) as non_null_rows,
count({{ column_name }}) * 1.0 / nullif(count(*), 0) as non_null_proportion
from {{ model }}
),

check_result as (
select
total_rows,
non_null_rows,
non_null_proportion,
{{ min_proportion }} as required_proportion
from validation
where non_null_proportion < {{ min_proportion }}
)

select * from check_result

{% endtest %}

Apply it in schema.yml:

- name: age_years
  description: User age in years. Optional field from profile completion.
  tests:
    - not_null_proportion:
        min_proportion: 0.80 # 80% of users must have age filled in
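For intuition, the same check can be written in plain Python over an in-memory column. This is a sketch of the logic only; `not_null_proportion_passes` is a hypothetical helper, and the empty-table behavior is an assumption chosen to mirror the SQL, where the proportion is NULL and no failing row is returned.

```python
from typing import Optional, Sequence


def not_null_proportion_passes(
    values: Sequence[Optional[float]], min_proportion: float = 0.95
) -> bool:
    """Return True when at least min_proportion of the values are non-null."""
    total = len(values)
    if total == 0:
        # Mirrors the SQL: an empty table yields a NULL proportion, so nothing fails.
        return True
    non_null = sum(value is not None for value in values)
    return non_null / total >= min_proportion


ages = [34, None, 27, 45, None]  # 3 of 5 filled in -> proportion 0.6
print(not_null_proportion_passes(ages, min_proportion=0.5))
print(not_null_proportion_passes(ages, min_proportion=0.8))
```

Unlike not_null, the threshold lets an optional field stay optional while still catching a collapse in completeness.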

test_no_future_dates: Label Leakage Detection

-- macros/test_no_future_dates.sql
{% test no_future_dates(model, column_name, max_lag_hours=1) %}
/*
Passes if all values in column_name are not in the future
(allowing max_lag_hours of clock skew).
Critical for ML: ensures no label leakage from future events.
*/
with validation as (
select
{{ column_name }} as date_value,
current_timestamp as check_time,
dateadd(hour, {{ max_lag_hours }}, current_timestamp) as allowed_upper_bound
from {{ model }}
where {{ column_name }} is not null
),

failures as (
select *
from validation
where date_value > allowed_upper_bound
)

select * from failures

{% endtest %}

test_column_sum_equals_reference: Reconciliation

-- macros/test_column_sum_equals_reference.sql
{% test column_sum_equals_reference(
model,
column_name,
reference_model,
reference_column,
tolerance_pct=0.01
) %}
/*
Reconciliation test: sum of column_name in this model should equal
sum of reference_column in reference_model, within tolerance_pct.
Use for financial reconciliation, ensuring no rows dropped in transformation.
*/
with model_sum as (
select sum({{ column_name }}) as total
from {{ model }}
),

reference_sum as (
select sum({{ reference_column }}) as total
from {{ ref(reference_model) }}
),

comparison as (
select
m.total as model_total,
r.total as reference_total,
abs(m.total - r.total) as absolute_diff,
abs(m.total - r.total) / nullif(abs(r.total), 0) as relative_diff,
{{ tolerance_pct }} as allowed_tolerance
from model_sum m
cross join reference_sum r
)

select *
from comparison
where relative_diff > allowed_tolerance

{% endtest %}

Apply for financial pipeline reconciliation:

- name: revenue_usd
  tests:
    - column_sum_equals_reference:
        reference_model: payments_raw
        reference_column: payment_amount_usd
        tolerance_pct: 0.001 # within 0.1%
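The arithmetic behind the reconciliation is a relative difference compared against a tolerance expressed as a fraction (0.001 means 0.1%). A minimal Python sketch, with `sums_reconcile` as a hypothetical helper; note one deliberate difference from the macro, flagged in the code: a zero reference total is treated strictly here, whereas the SQL yields NULL and passes.

```python
from typing import Iterable


def sums_reconcile(
    model_values: Iterable[float],
    reference_values: Iterable[float],
    tolerance_pct: float = 0.01,
) -> bool:
    """Return True when the two totals agree within tolerance_pct (a fraction)."""
    model_total = sum(model_values)
    reference_total = sum(reference_values)
    if reference_total == 0:
        # Assumption: stricter than the SQL macro, which divides by NULL and passes.
        return model_total == 0
    relative_diff = abs(model_total - reference_total) / abs(reference_total)
    return relative_diff <= tolerance_pct


# Totals are 300 vs 301: a relative difference of about 0.33%.
print(sums_reconcile([100, 200], [100, 201], tolerance_pct=0.01))
print(sums_reconcile([100, 200], [100, 201], tolerance_pct=0.001))
```

The first call passes at a 1% tolerance and the second fails at 0.1%, which is the kind of gap a dropped row typically produces.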

test_distribution_within_bounds: Statistical Drift Check

-- macros/test_distribution_within_bounds.sql
{% test distribution_within_bounds(
model,
column_name,
min_mean,
max_mean,
max_stddev=None
) %}
/*
Passes if the mean of column_name is within [min_mean, max_mean].
Optionally checks that stddev doesn't exceed max_stddev.
Use for ML features to detect distribution drift.
*/
with stats as (
select
avg({{ column_name }}) as col_mean,
stddev({{ column_name }}) as col_stddev
from {{ model }}
where {{ column_name }} is not null
),

failures as (
select
col_mean,
col_stddev,
{{ min_mean }} as min_allowed_mean,
{{ max_mean }} as max_allowed_mean
from stats
where col_mean < {{ min_mean }}
or col_mean > {{ max_mean }}
{% if max_stddev is not none %}
or col_stddev > {{ max_stddev }}
{% endif %}
)

select * from failures

{% endtest %}
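The same bounds check is easy to prototype in Python before committing thresholds to schema.yml. The sketch below uses the standard library's sample standard deviation; `distribution_within_bounds` here is a hypothetical Python counterpart to the macro, and the sample values are invented.

```python
import statistics
from typing import Optional, Sequence


def distribution_within_bounds(
    values: Sequence[Optional[float]],
    min_mean: float,
    max_mean: float,
    max_stddev: Optional[float] = None,
) -> bool:
    """Return True when the mean (and optionally the stddev) stay within bounds."""
    observed = [v for v in values if v is not None]
    if not observed:
        # Mirrors the SQL: aggregates over no rows are NULL, so nothing fails.
        return True
    mean = statistics.fmean(observed)
    if mean < min_mean or mean > max_mean:
        return False
    # Sample stddev needs at least two points; skip the check otherwise.
    if max_stddev is not None and len(observed) >= 2:
        return statistics.stdev(observed) <= max_stddev
    return True


events_30d = [10, 20, None, 30]  # mean 20, sample stddev 10
print(distribution_within_bounds(events_30d, min_mean=15, max_mean=25))
print(distribution_within_bounds(events_30d, min_mean=25, max_mean=30))
print(distribution_within_bounds(events_30d, min_mean=15, max_mean=25, max_stddev=5))
```

Running this against a few historical partitions is one way to pick min_mean and max_mean that do not fire on ordinary variation.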

Singular Tests: Complex Business Logic

Singular tests are SQL files in the tests/ directory. The test fails if the SQL returns any rows. These handle business rules that require joins, subqueries, or logic that generic tests cannot express.

Test: No Duplicate Revenue on Same Day

-- tests/no_duplicate_revenue_per_user_day.sql
/*
Each user should have at most one revenue record per day.
Duplicates indicate double-counting in the billing pipeline.
FAILING THIS TEST means revenue is overstated.
*/
select
user_id,
revenue_date,
count(*) as record_count,
sum(revenue_usd) as total_revenue
from {{ ref('daily_user_revenue') }}
group by user_id, revenue_date
having count(*) > 1

Test: Churn Label Consistency

-- tests/churn_label_consistency.sql
/*
A user marked as churned should have zero activity in the 30 days
following their churn date. If we see activity from a churned user,
the churn label or the activity join is wrong.
*/
select
u.user_id,
u.churn_date,
e.event_date,
e.event_count
from {{ ref('churned_users') }} u
inner join {{ ref('daily_user_events') }} e
on u.user_id = e.user_id
and e.event_date > u.churn_date
and e.event_date <= dateadd(day, 30, u.churn_date)
where e.event_count > 0

Test: Feature Store - No Future Features in Training Set

-- tests/no_future_features_in_training.sql
/*
Label leakage check: features computed AFTER the label event date
should not appear in the training dataset.
Any row returned is a potential label leakage case.
*/
select
t.example_id,
t.user_id,
t.label_event_date,
t.feature_snapshot_date,
datediff(day, t.label_event_date, t.feature_snapshot_date) as days_ahead
from {{ ref('training_dataset') }} t
where t.feature_snapshot_date > t.label_event_date
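The leakage rule is a single comparison per training example. As an illustration outside the warehouse, the sketch below applies it to a list of dictionaries; `find_leaky_examples` and the sample rows are hypothetical.

```python
from datetime import date
from typing import Dict, List


def find_leaky_examples(rows: List[Dict]) -> List[Dict]:
    """Return rows whose features were snapshotted after the label event."""
    return [
        {
            **row,
            "days_ahead": (row["feature_snapshot_date"] - row["label_event_date"]).days,
        }
        for row in rows
        if row["feature_snapshot_date"] > row["label_event_date"]
    ]


training_rows = [
    {   # features computed before the label event: fine
        "example_id": 1,
        "label_event_date": date(2024, 3, 10),
        "feature_snapshot_date": date(2024, 3, 9),
    },
    {   # features computed three days after the label event: leakage
        "example_id": 2,
        "label_event_date": date(2024, 3, 10),
        "feature_snapshot_date": date(2024, 3, 13),
    },
]

leaky = find_leaky_examples(training_rows)
for row in leaky:
    print(f"example {row['example_id']} is {row['days_ahead']} days ahead of its label")
```

As with the singular test, an empty result is the passing case.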

Test: Partition Completeness

-- tests/partition_row_count_sanity.sql
/*
Today's partition should have at least 80% of yesterday's row count.
A sudden drop indicates ETL failure or upstream data loss.
*/
-- row_count is used as the alias because rows is a reserved word in some warehouses
with today_count as (
    select count(*) as row_count
    from {{ ref('user_engagement_features') }}
    where partition_date = current_date
),

yesterday_count as (
    select count(*) as row_count
    from {{ ref('user_engagement_features') }}
    where partition_date = current_date - 1
),

comparison as (
    select
        t.row_count as today_rows,
        y.row_count as yesterday_rows,
        t.row_count * 1.0 / nullif(y.row_count, 0) as ratio
    from today_count t
    cross join yesterday_count y
)

select *
from comparison
where ratio < 0.80 -- fail if today has less than 80% of yesterday's rows

Test Severity Configuration

Not every failing test should halt the pipeline. dbt supports two severity levels:

  • error (default): a failing test makes dbt build return a non-zero exit code and skips the models downstream of the tested node
  • warn: the failure is logged and reported, but dbt build succeeds and downstream models continue

models:
  - name: user_engagement_features
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - user_id
            - partition_date
          severity: error # HARD BLOCK: duplicates at the grain corrupt the join downstream
    columns:
      - name: user_id
        tests:
          - not_null:
              severity: error # HARD BLOCK: null user_ids are never acceptable

      - name: last_active_date
        tests:
          - not_null:
              severity: warn # SOFT: some inactive users may have null here

      - name: predicted_ltv_usd
        tests:
          - not_null:
              severity: warn # SOFT: derived field, null means model didn't run
          - distribution_within_bounds:
              min_mean: 10.0
              max_mean: 500.0
              severity: warn # SOFT: alert if distribution shifts, don't block

:::tip Severity Strategy for ML Pipelines

Use error (hard block) for:

  • Primary and foreign key integrity (nulls, duplicates, referential integrity)
  • Schema existence checks
  • Anti-leakage checks (future timestamps in features)
  • Source freshness for real-time features

Use warn (soft alert) for:

  • Statistical distribution bounds (mean, stddev)
  • Optional field completeness
  • Trend-based checks (row count vs. yesterday)

The guiding principle: block on structural failures (data is wrong), warn on distributional shifts (data might be drifting). :::
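The block-versus-warn policy can be expressed as a small decision function. This is a sketch of the policy, not of dbt's internals; `gate_exit_code` and the result dictionaries are hypothetical.

```python
from typing import Dict, List


def gate_exit_code(test_results: List[Dict]) -> int:
    """Return 1 if any error-severity test failed; warn-severity failures only get reported."""
    blocking = [r["name"] for r in test_results
                if r["failed"] and r["severity"] == "error"]
    warnings = [r["name"] for r in test_results
                if r["failed"] and r["severity"] == "warn"]
    for name in warnings:
        print(f"WARN  {name}")
    for name in blocking:
        print(f"ERROR {name}")
    return 1 if blocking else 0


drift_only = [
    {"name": "not_null_user_id", "severity": "error", "failed": False},
    {"name": "distribution_within_bounds_ltv", "severity": "warn", "failed": True},
]
structural = drift_only + [
    {"name": "unique_user_id_partition_date", "severity": "error", "failed": True},
]

print(gate_exit_code(drift_only))   # drift is reported, pipeline continues
print(gate_exit_code(structural))   # structural failure blocks the pipeline
```

Keeping the two lists separate in the output is what lets an on-call engineer tell "investigate soon" apart from "training was blocked".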


Integrating dbt Tests in CI/CD

Running the full test suite on every commit is expensive. dbt's state-based CI solves this: only run tests for models that changed or are downstream of changes.

# In CI (GitHub Actions, GitLab CI, etc.):

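# Step 1: Produce the production manifest. Run this from a checkout of the
# production branch, or instead download the manifest.json saved by the last production run.
dbt compile --target prod --target-path prod-artifacts/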

# Step 2: Run only modified models and their downstream dependencies
dbt build --select state:modified+ \
--defer \
--state prod-artifacts/

# The + means: modified models AND everything downstream of them
# --defer: use production tables for any models not in the modified set
# --state: compare against the production manifest to identify changes
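Conceptually, the trailing + is a graph traversal: start from the modified nodes and walk every downstream edge. The sketch below shows that traversal on a hand-written DAG; `modified_plus` and the model names in `children` are assumptions for illustration, and dbt's real state comparison (deciding what counts as modified) is not reproduced here.

```python
from collections import deque
from typing import Dict, Iterable, List, Set


def modified_plus(children: Dict[str, List[str]], modified: Iterable[str]) -> Set[str]:
    """Return the modified nodes plus everything downstream of them (breadth-first)."""
    selected = set(modified)
    queue = deque(selected)
    while queue:
        node = queue.popleft()
        for child in children.get(node, []):
            if child not in selected:
                selected.add(child)
                queue.append(child)
    return selected


# Hypothetical project graph: parent -> list of direct children.
children = {
    "stg_events": ["user_engagement_features"],
    "stg_users": ["user_engagement_features"],
    "stg_subscriptions": ["subscription_history_features"],
    "user_engagement_features": ["churn_training_dataset"],
    "subscription_history_features": ["churn_training_dataset"],
}

selected = modified_plus(children, {"stg_events"})
print(sorted(selected))
```

Everything outside the selected set (here stg_users, stg_subscriptions, and subscription_history_features) is what --defer resolves to the production tables instead of rebuilding.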

GitLab CI Integration

# .gitlab-ci.yml
dbt_test_ci:
  stage: test
  image: python:3.11
  script:
    # awscli is installed because the python image does not ship the aws command
    - pip install dbt-snowflake==1.7.0 awscli
    - |
      # Download production manifest for state comparison
      aws s3 cp s3://my-bucket/dbt-artifacts/manifest.json prod-artifacts/manifest.json
    - |
      # Build and test only modified models and their downstream dependents
      dbt build \
        --select state:modified+ \
        --defer \
        --state prod-artifacts/ \
        --target ci
  artifacts:
    paths:
      - target/run_results.json
      - target/manifest.json
    when: always # upload even on failure - needed for debugging
  only:
    - merge_requests

Full Production Build with Quality Gate

# Production DAG (Airflow, Prefect, etc.)

# 1. Run source freshness - fail if sources are stale
dbt source freshness --select source:events_raw

# 2. Build all models and run all tests
dbt build --target prod

# 3. If tests pass, trigger ML training
# (triggered by the DAG downstream of dbt build success)

dbt + Great Expectations: dbt-expectations

The dbt-expectations package brings Great Expectations-style tests into dbt's generic test framework. Install it via packages.yml:

# packages.yml
packages:
  - package: calogica/dbt_expectations
    version: [">=0.10.0", "<0.11.0"]
  - package: dbt-labs/dbt_utils
    version: [">=1.1.0", "<2.0.0"]

Then install the packages:

dbt deps

Now use GX-style tests directly in schema.yml:

models:
  - name: user_engagement_features
    tests:
      # Table-level: row count must be substantial
      - dbt_expectations.expect_table_row_count_to_be_between:
          min_value: 100000
          max_value: 50000000
    columns:
      - name: age_years
        tests:
          # GX-style range check
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 13
              max_value: 120
              row_condition: "age_years is not null" # only check non-null rows

          # GX-style regex match
          - dbt_expectations.expect_column_values_to_match_regex:
              regex: "^[0-9]+$"

      - name: subscription_tier
        tests:
          # GX-style set membership. A proportion-of-unique-values check does not
          # fit a low-cardinality column: 4 tiers over 100,000+ rows is far below 0.1%.
          - dbt_expectations.expect_column_values_to_be_in_set:
              value_set: ['free', 'pro', 'enterprise', 'churned']

      - name: events_30d
        tests:
          # Statistical bounds
          - dbt_expectations.expect_column_mean_to_be_between:
              min_value: 1.0
              max_value: 500.0

          - dbt_expectations.expect_column_quantile_values_to_be_between:
              quantile: 0.95
              min_value: 50
              max_value: 2000

ML Pipeline Quality Gates

The integration between dbt tests and ML training is the highest-value application. dbt tests become the automated gate that prevents training on bad data.

# airflow/dags/churn_model_pipeline.py
import json
import subprocess
from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

DBT_PROJECT_DIR = Path("/opt/airflow/dbt_project")


def run_dbt_quality_gate(models: list[str], **context):
    """
    Run dbt tests for specific models. Raise AirflowFailException if any fail.
    This function is the ML training gate - training only starts if this passes.
    """
    result = subprocess.run(
        ["dbt", "test", "--select", *models, "--target", "prod"],
        capture_output=True,
        text=True,
        cwd=DBT_PROJECT_DIR,
    )

    if result.returncode != 0:
        # dbt records per-test outcomes in target/run_results.json;
        # parse it for a meaningful error message.
        failed_tests = []
        results_path = DBT_PROJECT_DIR / "target" / "run_results.json"
        if results_path.exists():
            run_results = json.loads(results_path.read_text())
            for entry in run_results.get("results", []):
                if entry.get("status") in ("fail", "error"):
                    failed_tests.append(
                        f"FAIL: {entry.get('unique_id', 'unknown')} "
                        f"- {entry.get('failures') or 0} failures"
                    )

        error_msg = (
            f"dbt quality gate failed - {len(failed_tests)} tests failed.\n"
            + "\n".join(failed_tests)
            + "\nAborting ML training to prevent training on bad data."
        )
        raise AirflowFailException(error_msg)

    return "All dbt tests passed. Proceeding to model training."


with DAG(
    dag_id="churn_model_weekly_retrain",
    start_date=datetime(2024, 1, 1),  # placeholder date; Airflow needs a start_date to schedule tasks
    schedule_interval="0 6 * * 1",  # Monday 6am
    catchup=False,
) as dag:

    # Gate 1: Source freshness
    check_source_freshness = BashOperator(
        task_id="check_source_freshness",
        bash_command=(
            "cd /opt/airflow/dbt_project && "
            "dbt source freshness --select source:events_raw source:subscriptions_raw"
        ),
    )

    # Gate 2: Feature table quality
    validate_features = PythonOperator(
        task_id="validate_feature_table",
        python_callable=run_dbt_quality_gate,
        op_kwargs={
            "models": [
                "user_engagement_features",
                "subscription_history_features",
                "churn_training_dataset",
            ]
        },
    )

    # Only runs if Gate 2 passes
    trigger_training = BashOperator(
        task_id="trigger_ml_training",
        bash_command="python /opt/airflow/scripts/trigger_training_job.py",
    )

    check_source_freshness >> validate_features >> trigger_training

Test Coverage Tracking

Just as you track code coverage in software engineering, track test coverage across your dbt project:

# scripts/dbt_test_coverage.py
"""
Computes test coverage for all dbt models.
Reports: tested columns / total columns, by model and overall.
Only columns documented in YAML appear in the manifest, so undocumented
columns are invisible to this report.
"""
import json


def compute_test_coverage(manifest_path: str) -> dict:
    with open(manifest_path) as f:
        manifest = json.load(f)

    nodes = manifest["nodes"]

    # Find all models
    models = {k: v for k, v in nodes.items() if v["resource_type"] == "model"}

    # Find all tests
    tests = [v for v in nodes.values() if v["resource_type"] == "test"]

    coverage = {}
    for model_id, model in models.items():
        model_name = model["name"]
        all_columns = set(model.get("columns", {}).keys())

        # A test is attached to this model if the model is one of its dependencies.
        # (Matching on the model name inside the test id would confuse e.g.
        # "users" with "users_raw".)
        model_tests = [
            t for t in tests
            if model_id in t.get("depends_on", {}).get("nodes", [])
        ]

        # column_name is a top-level field on column-level test nodes
        tested_columns = {
            t["column_name"] for t in model_tests if t.get("column_name")
        } & all_columns

        total_cols = len(all_columns)
        tested_cols = len(tested_columns)
        coverage_pct = (tested_cols / total_cols * 100) if total_cols > 0 else 0

        coverage[model_name] = {
            "total_columns": total_cols,
            "tested_columns": tested_cols,
            "coverage_pct": round(coverage_pct, 1),
            "test_count": len(model_tests),
            "untested_columns": sorted(all_columns - tested_columns),
        }

    return coverage


if __name__ == "__main__":
    coverage = compute_test_coverage("target/manifest.json")

    print(f"{'Model':<40} {'Cols':<6} {'Tested':<8} {'Coverage':<10} {'Tests':<6}")
    print("-" * 75)

    for model, stats in sorted(coverage.items()):
        flag = " LOW" if stats["coverage_pct"] < 50 else ""
        pct = f"{stats['coverage_pct']}%"
        print(
            f"{model:<40} "
            f"{stats['total_columns']:<6} "
            f"{stats['tested_columns']:<8} "
            f"{pct:<10} "
            f"{stats['test_count']:<6}"
            f"{flag}"
        )

Common Mistakes

:::danger The unique Test on Non-Grain Columns

The unique test is only meaningful on the grain of the model - the column or set of columns that uniquely identifies one row. Applying unique to other columns is a logic error.

# WRONG: applying unique to a non-grain column
- name: subscription_tier # not a grain column!
  tests:
    - unique # fails as soon as two users share the "pro" tier

# CORRECT: unique only on the primary key (for a model with one row per user)
- name: user_id
  tests:
    - unique
    - not_null

For composite grains (one row per user per day), use dbt_utils.unique_combination_of_columns at the model level:

models:
  - name: user_engagement_features
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - user_id
            - partition_date

:::

:::danger Testing Transformed Data But Not Source Data

A common gap: testing the output of dbt models thoroughly, but not testing the raw source tables. Problems in source data pass through transformations, and while your model tests might catch them, they might not - especially if the transformation masks the defect.

Always add source tests in sources.yml alongside model tests:

sources:
  - name: events_raw
    tables:
      - name: clickstream_events
        columns:
          - name: user_id
            tests:
              - not_null # test the SOURCE, not just the model
              - relationships:
                  to: source('users_raw', 'users')
                  field: user_id
          - name: event_timestamp
            tests:
              - not_null
              - no_future_dates # custom test for leakage in source

The rule: test at every layer - sources, staging models, intermediate models, and final marts. :::

:::warning Test Failure Without Alerting

Running dbt test in a DAG but only checking the overall exit code is not enough. When tests fail, you need to know which tests failed and why - not just that something went wrong. Configure your orchestrator to:

  1. Capture dbt test output and store it
  2. Parse target/run_results.json to extract specific failures
  3. Send failures to Slack/PagerDuty with the model name, test name, and failure count
  4. Store historical test results so you can track when a test first started failing :::
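Steps 2 and 3 can be sketched as a small parser that turns run results into alert lines. The field names used here (results, status, unique_id, failures, message) are the ones I understand run_results.json to contain; treat that as an assumption and verify against the artifact your dbt version writes. `summarize_failures` and the sample payload are hypothetical.

```python
import json
from typing import Dict, List


def summarize_failures(run_results: Dict) -> List[str]:
    """Build one alert line per test that failed, errored, or warned."""
    lines = []
    for result in run_results.get("results", []):
        status = result.get("status")
        if status in ("fail", "error", "warn"):
            lines.append(
                f"[{status.upper()}] {result.get('unique_id', 'unknown')}: "
                f"{result.get('failures') or 0} failing rows "
                f"({result.get('message') or 'no message'})"
            )
    return lines


# Hand-written sample shaped like the artifact, for illustration only.
sample = json.loads("""
{
  "results": [
    {"unique_id": "test.proj.not_null_features_user_id", "status": "fail",
     "failures": 340000, "message": "Got 340000 results, configured to fail if != 0"},
    {"unique_id": "test.proj.accepted_values_tier", "status": "pass",
     "failures": 0, "message": null}
  ]
}
""")

alerts = summarize_failures(sample)
for line in alerts:
    print(line)
```

In an orchestrator task, the dictionary would come from `json.load` on target/run_results.json, and each line would be forwarded to the alerting channel and appended to a history table.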

Interview Q&A

Q1: What is the difference between a generic test and a singular test in dbt, and when do you use each?

Generic tests are parameterized macros that you apply in YAML - you attach them to columns in schema.yml with arguments, such as accepted_values with values: ['free', 'pro']. They're reusable across models, and dbt generates and runs the SQL for each application. Built-in examples are not_null, unique, accepted_values, relationships. Custom generic tests are Jinja {% test %} blocks in the macros/ directory. Use generic tests for structural properties: null checks, uniqueness, value set membership, referential integrity, or any check that applies to a single column and can be parameterized.

Singular tests are standalone SQL files in tests/. The test fails if the query returns any rows. Use singular tests for complex business logic: checks that require joining multiple tables, multi-column aggregations, business rules that don't fit a parameterizable template (like "churned users should have zero activity in the 30 days after churn date"). The decision rule: if you can express the check as a parameterized column check, use generic. If you need a full SQL query with JOINs and complex logic, use singular.

Q2: How do you use dbt source freshness tests, and why are they critical for ML pipelines?

Source freshness tests check how recently the raw source tables were updated, using a timestamp column you specify with loaded_at_field. You configure warn_after and error_after thresholds per source. Running dbt source freshness queries the max value of the timestamp column and compares it to the current time. For ML pipelines, freshness is critical because a model trained on features that are 48 hours stale makes predictions based on outdated user state. Real-time serving models are especially sensitive: if the feature store is stale, the model receives inputs that don't match the distribution it was trained on, degrading performance. Configure freshness error thresholds to match your model's actual staleness tolerance - not whatever is convenient for the data pipeline.

Q3: How do you implement a "slim CI" workflow with dbt to avoid running all tests on every commit?

Use dbt's state:modified+ selector with --defer and --state. In CI, you download the production manifest.json (the artifact that records what was last built in production) and run: dbt build --select state:modified+ --defer --state prod-artifacts/. The state:modified+ selector builds only models that changed relative to that manifest, plus their downstream dependents. --defer makes dbt use production tables for any unmodified upstream dependencies instead of rebuilding them in CI. This means a one-line change to a single model runs only that model's tests and the tests of models downstream of it, rather than the full suite. On a large project, that can cut CI time from tens of minutes to a few minutes for typical changes.

Q4: How do you prevent dbt tests from becoming a bottleneck that teams ignore?

Two failure modes to avoid: tests that are too slow (engineers skip them), and tests that fail too often on acceptable variation (alert fatigue). For speed: use state:modified+ in CI, run expensive singular tests on a schedule rather than every commit, and push aggregate computations to the warehouse rather than Python. For alert fatigue: separate tests into error-severity (hard blocks) and warn-severity (soft alerts), and be disciplined about what belongs in each category. Error tests should fail rarely and always require action. Warn tests should surface trends but not stop deployment. Also track test failure history - if a warn-severity test fires every day for two weeks, either fix the underlying data issue or adjust the threshold to reflect the actual acceptable range.

Q5: A dbt model's unique test is passing, but the ML feature store has duplicate rows. How?

The unique test runs against the materialized model in the database. If the model is incremental, duplicate rows can arise from:

  1. Backfill runs that overlap with incremental runs - if the same date range is processed twice, the incremental logic might append rather than replace
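  2. The unique test covering a different key than the feature store's grain - for example, the test checks a surrogate row id that is unique per load, while the feature store is keyed on (user_id, partition_date). Two loads of the same user-day get distinct row ids and pass the unique test, yet they are duplicates at the grain that matters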
  3. The test running before a full refresh - if you last ran dbt build in full-refresh mode and then only ran incremental, the unique test from the previous run passed but subsequent increments added duplicates

Fix: test the composite grain with dbt_utils.unique_combination_of_columns, run tests after every build (not just after full refreshes), and add a separate test that checks for exact row counts per partition date.
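A minimal sketch of why the choice of key matters: a uniqueness check on one key can pass while another key has duplicates. The `row_id` column and the `duplicate_keys` helper are hypothetical, invented for this illustration.

```python
from collections import Counter
from typing import Dict, List, Sequence, Tuple


def duplicate_keys(rows: List[Dict], key_columns: Sequence[str]) -> Dict[Tuple, int]:
    """Return each key combination that appears more than once, with its count."""
    counts = Counter(tuple(row[col] for col in key_columns) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}


feature_rows = [
    {"row_id": 1, "user_id": "u1", "partition_date": "2024-01-01"},
    {"row_id": 2, "user_id": "u1", "partition_date": "2024-01-02"},
    {"row_id": 3, "user_id": "u2", "partition_date": "2024-01-01"},
    # Same user-day loaded twice by an overlapping backfill, with a fresh row_id.
    {"row_id": 4, "user_id": "u2", "partition_date": "2024-01-01"},
]

# Uniqueness on the surrogate id finds nothing, so that test passes.
print(duplicate_keys(feature_rows, ["row_id"]))
# Uniqueness on the real grain exposes the duplicated user-day.
print(duplicate_keys(feature_rows, ["user_id", "partition_date"]))
```

The second call is the Python analogue of dbt_utils.unique_combination_of_columns on user_id and partition_date.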

Q6: How do you implement a dbt test that detects label leakage in training data?

Write a singular test that compares the feature snapshot timestamp to the label event timestamp. Any row where feature_snapshot_date > label_event_date is a potential leakage case:

-- tests/no_feature_leakage.sql
select
example_id,
feature_snapshot_date,
label_event_date,
datediff(day, label_event_date, feature_snapshot_date) as days_ahead
from {{ ref('churn_training_dataset') }}
where feature_snapshot_date > label_event_date

Apply this as an error-severity test so it blocks training. Also add a custom generic test no_future_dates on individual feature timestamp columns to catch feature columns that contain future timestamps. Run both tests in the DAG before any training trigger. Together, the dataset-level comparison against the label date and the per-column timestamp check catch both systematic leakage (an entire batch of features from the future) and isolated leakage (individual rows with bad timestamps).
