Feature Validation and Testing
The Bug That Ran for Three Weeks
The customer lifetime value model had been in production for four months. CLV predictions drove personalization: high-CLV customers received premium support routing, early access to sales, and higher-value promotional offers. The model was performing well - or so everyone thought.
A data scientist was doing routine feature exploration for a new model when she noticed something in the feature distribution dashboard. The days_since_last_purchase feature, one of the top three predictors in the CLV model, showed a spike in NaN values starting exactly 21 days ago. Not a few nulls - 2.1% of all customer records, every day for three weeks.
She traced the NaN back to its source. A data engineering team had refactored the purchase events pipeline three weeks earlier. As part of the refactor, they had renamed a column in the events table - transaction_date became purchase_timestamp. The feature pipeline joined on transaction_date, which now returned null for all records. The join silently failed and propagated NaN through every downstream computation that depended on days_since_last_purchase.
The feature pipeline had no schema validation. There were no unit tests. The monitoring system watched the model's prediction distribution, not the feature distributions. The column rename was never communicated to the ML platform team. The result: a 2% NaN rate silently degraded CLV model quality for 21 days. The business impact was real - some low-CLV customers had been over-served while some high-CLV customers had been under-served.
The fix: point the pipeline at the renamed column, re-run it for the affected dates, and regenerate the affected predictions. The prevention: feature validation and testing.
Why This Exists: The Silent Failure Problem
Software bugs in application code usually fail loudly: exceptions are thrown, services crash, error rates spike. Bugs in ML feature pipelines often fail silently. The pipeline completes successfully, the feature values are written to storage, and the model continues to produce predictions - just from degraded input data. If no one is looking at feature distributions, the degradation is invisible.
This happens because feature pipelines sit at the boundary between data engineering and ML. They are not simple functions that can be unit-tested with obvious expected outputs. Their correctness depends on statistical properties of data: distributions, ranges, cardinalities, completeness rates. These properties are not checked by standard software tests.
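The silence is often built into the tooling itself. The following is a minimal pandas sketch with illustrative data. Direct column access raises a `KeyError` on a renamed column, but a selection written with `DataFrame.reindex` quietly returns a NaN-filled column instead. Whether any particular pipeline uses this pattern is an assumption here; it is one of several that behave this way.

```python
import pandas as pd

# The upstream table after the refactor: transaction_date is now purchase_timestamp
events = pd.DataFrame({
    "customer_id": ["c1", "c2"],
    "purchase_timestamp": pd.to_datetime(["2024-01-10", "2024-01-15"]),
})

# Direct column access fails loudly...
try:
    events["transaction_date"]
    loud = False
except KeyError:
    loud = True

# ...but reindex() creates any missing column and fills it with NaN, so this
# hypothetical feature step "succeeds" with the right row count and an empty column
selected = events.reindex(columns=["customer_id", "transaction_date"])

print(loud)                                         # True
print(len(selected))                                # 2
print(selected["transaction_date"].isnull().all())  # True
```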
Feature validation and testing is the discipline of making feature quality explicit and verifiable:
- Schema validation ensures features have the expected data types and column names
- Statistical validation ensures feature values are within expected ranges and distributions
- Unit tests verify the computation logic of individual feature transformations
- Integration tests verify the end-to-end pipeline from raw data to feature output
- Monitoring detects drift or degradation in production features before it impacts models
Historical Context
The concept of "data testing" as a software engineering practice was advocated by Jeff Leek and Roger Peng in their 2015 essay on reproducible data science, which called for treating data cleaning and feature engineering as code subject to version control and testing.
Great Expectations, the dominant open-source data validation library, was released in 2019. It introduced the concept of "expectations" - assertions about data that can be defined in code, versioned, and run automatically. The name comes from the idea that a data scientist forms expectations about what valid data looks like, and the library makes those expectations executable.
Pandera (released 2019) focused specifically on schema-level validation for pandas DataFrames, with a declarative API that makes schema definitions readable and maintainable.
The MLOps community's emphasis on "data validation as a first-class ML concern" was crystallized in the TensorFlow Extended (TFX) paper (2017), which described how Google's ML production system validates features automatically as part of every training and serving pipeline.
Core Concepts
Feature Validation Framework
Feature validation operates at three levels:
- Schema validation: Does the feature have the expected data type? Do all expected columns exist? Are there unexpected columns?
- Statistical validation: Are values within expected ranges? Is the null rate below threshold? Is the distribution consistent with historical baselines?
- Business rule validation: Does the feature satisfy domain-specific constraints? (e.g., `age` must be positive, `spend` cannot be negative)
```python
import pandas as pd
import numpy as np
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List
from enum import Enum


class ValidationSeverity(Enum):
    WARN = "WARN"    # log warning, continue
    ERROR = "ERROR"  # fail pipeline


@dataclass
class FeatureExpectation:
    """A single validation rule for a feature."""
    feature_name: str
    check_type: str  # "not_null", "range", "dtype", "cardinality"
    severity: ValidationSeverity = ValidationSeverity.ERROR
    params: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ValidationResult:
    feature_name: str
    check_type: str
    passed: bool
    severity: ValidationSeverity
    message: str
    observed_value: Any = None


class FeatureValidator:
    """
    Validates a feature DataFrame against a set of expectations.
    """

    def __init__(self, expectations: List[FeatureExpectation]):
        self.expectations = expectations

    def validate(self, df: pd.DataFrame) -> List[ValidationResult]:
        results = []
        for exp in self.expectations:
            # A missing column is always an ERROR: this is the check that catches renames
            if exp.feature_name not in df.columns:
                results.append(ValidationResult(
                    feature_name=exp.feature_name,
                    check_type="column_exists",
                    passed=False,
                    severity=ValidationSeverity.ERROR,
                    message=f"Column '{exp.feature_name}' not found in DataFrame"
                ))
                continue

            series = df[exp.feature_name]

            if exp.check_type == "not_null":
                null_rate = series.isnull().mean()
                max_null = exp.params.get("max_null_rate", 0.01)
                passed = null_rate <= max_null
                results.append(ValidationResult(
                    feature_name=exp.feature_name,
                    check_type="not_null",
                    passed=passed,
                    severity=exp.severity,
                    message=f"Null rate {null_rate:.3%} {'<=' if passed else '>'} threshold {max_null:.3%}",
                    observed_value=null_rate
                ))

            elif exp.check_type == "range":
                # Nulls compare as False, so they are never counted as range violations
                min_val = exp.params.get("min")
                max_val = exp.params.get("max")
                below_min = (series < min_val).sum() if min_val is not None else 0
                above_max = (series > max_val).sum() if max_val is not None else 0
                violations = below_min + above_max
                passed = violations == 0
                results.append(ValidationResult(
                    feature_name=exp.feature_name,
                    check_type="range",
                    passed=passed,
                    severity=exp.severity,
                    message=f"{violations} values outside [{min_val}, {max_val}]",
                    observed_value={"below_min": int(below_min), "above_max": int(above_max)}
                ))

            elif exp.check_type == "dtype":
                expected_dtype = exp.params["dtype"]
                actual_dtype = str(series.dtype)
                passed = actual_dtype == expected_dtype
                results.append(ValidationResult(
                    feature_name=exp.feature_name,
                    check_type="dtype",
                    passed=passed,
                    severity=exp.severity,
                    message=f"Expected dtype {expected_dtype}, got {actual_dtype}",
                    observed_value=actual_dtype
                ))

            elif exp.check_type == "cardinality":
                actual_card = series.nunique()
                min_card = exp.params.get("min_unique")
                max_card = exp.params.get("max_unique")
                passed = True
                if min_card is not None and actual_card < min_card:
                    passed = False
                if max_card is not None and actual_card > max_card:
                    passed = False
                results.append(ValidationResult(
                    feature_name=exp.feature_name,
                    check_type="cardinality",
                    passed=passed,
                    severity=exp.severity,
                    message=f"Cardinality {actual_card}, expected [{min_card}, {max_card}]",
                    observed_value=actual_card
                ))

            else:
                # Unknown check types fail loudly instead of being skipped silently
                results.append(ValidationResult(
                    feature_name=exp.feature_name,
                    check_type=exp.check_type,
                    passed=False,
                    severity=ValidationSeverity.ERROR,
                    message=f"Unsupported check_type '{exp.check_type}'"
                ))

        return results

    def validate_and_raise(self, df: pd.DataFrame) -> List[ValidationResult]:
        """Run validation. Raise on any ERROR-severity failures."""
        results = self.validate(df)
        errors = [r for r in results if not r.passed and r.severity == ValidationSeverity.ERROR]
        warnings = [r for r in results if not r.passed and r.severity == ValidationSeverity.WARN]
        for w in warnings:
            print(f"[WARN] {w.feature_name}: {w.message}")
        if errors:
            error_messages = "\n".join(f"  - {e.feature_name}: {e.message}" for e in errors)
            raise ValueError(f"Feature validation failed:\n{error_messages}")
        return results


# Define expectations for the CLV feature pipeline
clv_expectations = [
    FeatureExpectation("days_since_last_purchase", "not_null",
                       ValidationSeverity.ERROR, {"max_null_rate": 0.005}),
    FeatureExpectation("days_since_last_purchase", "range",
                       ValidationSeverity.ERROR, {"min": 0, "max": 3650}),
    FeatureExpectation("spend_30d", "not_null",
                       ValidationSeverity.ERROR, {"max_null_rate": 0.01}),
    FeatureExpectation("spend_30d", "range",
                       ValidationSeverity.WARN, {"min": 0}),  # negative spend is suspicious
    FeatureExpectation("customer_segment", "cardinality",
                       ValidationSeverity.ERROR, {"min_unique": 3, "max_unique": 10}),
    FeatureExpectation("customer_segment", "dtype",
                       ValidationSeverity.ERROR, {"dtype": "object"}),
]

validator = FeatureValidator(clv_expectations)
```
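The `FeatureValidator` above covers the schema and statistical levels. Business-rule validation, the third level, is often easiest to express as named, vectorized predicates. The sketch below is a minimal example of that approach. The helper name, the rule names, and the `age`/`spend` columns are illustrative assumptions that follow the examples in the list above. Each rule treats nulls as valid and leaves them to the null-rate expectations.

```python
from typing import Callable, Dict

import pandas as pd

# A rule is a vectorized predicate returning True for rows that satisfy it
BusinessRule = Callable[[pd.DataFrame], pd.Series]


def check_business_rules(df: pd.DataFrame, rules: Dict[str, BusinessRule]) -> Dict[str, int]:
    """Return the number of violating rows per rule (0 means the rule passed)."""
    return {name: int((~predicate(df)).sum()) for name, predicate in rules.items()}


rules = {
    # Nulls pass here; null rates are checked by a separate expectation
    "age_positive": lambda d: d["age"].isna() | (d["age"] > 0),
    "spend_non_negative": lambda d: d["spend"].isna() | (d["spend"] >= 0),
}

features = pd.DataFrame({
    "age": [34, -1, None],
    "spend": [10.0, 0.0, -5.0],
})
violations = check_business_rules(features, rules)
print(violations)  # {'age_positive': 1, 'spend_non_negative': 1}
```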
Feature Unit Tests
Unit tests verify that individual feature transformation functions produce the correct output for known inputs.
```python
import pytest
import pandas as pd
import numpy as np


# The feature function being tested
def compute_days_since_last_purchase(
    events_df: pd.DataFrame,
    as_of_date: pd.Timestamp,
    customer_id_col: str = "customer_id",
    event_date_col: str = "purchase_timestamp"  # note: new column name after refactor
) -> pd.DataFrame:
    """Compute days since last purchase for each customer as of a reference date."""
    # Keep only events on or before as_of_date (no future leakage)
    valid_events = events_df[events_df[event_date_col] <= as_of_date]
    last_purchase = valid_events.groupby(customer_id_col)[event_date_col].max().reset_index()
    last_purchase.columns = [customer_id_col, "last_purchase_date"]
    last_purchase["days_since_last_purchase"] = (
        as_of_date - last_purchase["last_purchase_date"]
    ).dt.days
    return last_purchase


class TestDaysSinceLastPurchase:
    """Unit tests for the days_since_last_purchase feature."""

    def test_basic_computation(self):
        events = pd.DataFrame({
            "customer_id": ["c1", "c1", "c2"],
            "purchase_timestamp": pd.to_datetime([
                "2024-01-10", "2024-01-20", "2024-01-15"
            ])
        })
        as_of = pd.Timestamp("2024-02-01")
        result = compute_days_since_last_purchase(events, as_of)
        c1_row = result[result["customer_id"] == "c1"].iloc[0]
        c2_row = result[result["customer_id"] == "c2"].iloc[0]
        assert c1_row["days_since_last_purchase"] == 12  # Jan 20 to Feb 1
        assert c2_row["days_since_last_purchase"] == 17  # Jan 15 to Feb 1

    def test_no_future_leakage(self):
        """Events after as_of_date must not influence the result."""
        events = pd.DataFrame({
            "customer_id": ["c1", "c1"],
            "purchase_timestamp": pd.to_datetime([
                "2024-01-10",
                "2024-02-10"  # This is in the future relative to as_of
            ])
        })
        as_of = pd.Timestamp("2024-01-31")
        result = compute_days_since_last_purchase(events, as_of)
        # Only the Jan 10 event should count
        assert result.iloc[0]["days_since_last_purchase"] == 21

    def test_customer_with_no_purchases_not_in_output(self):
        """Customers with no purchases before as_of_date should not appear."""
        events = pd.DataFrame({
            "customer_id": ["c1"],
            "purchase_timestamp": pd.to_datetime(["2024-02-15"])
        })
        as_of = pd.Timestamp("2024-01-31")
        result = compute_days_since_last_purchase(events, as_of)
        assert len(result) == 0

    def test_non_negative_output(self):
        """Days since last purchase must always be >= 0."""
        events = pd.DataFrame({
            "customer_id": ["c1", "c2", "c3"],
            "purchase_timestamp": pd.to_datetime([
                "2024-01-01", "2024-01-15", "2024-01-30"
            ])
        })
        as_of = pd.Timestamp("2024-01-31")
        result = compute_days_since_last_purchase(events, as_of)
        assert (result["days_since_last_purchase"] >= 0).all()
```
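The tests above cover the main computation. They do not pin down what happens when the upstream schema changes, when there are no events at all, or when timestamps are null. The sketch below adds minimal regression tests for those cases. They are written as plain assert-based functions, so they run with or without pytest. They target a compact copy of `compute_days_since_last_purchase` so the block runs on its own. The old-schema test encodes the incident from the opening section: a pre-refactor column name must raise rather than produce NaN.

```python
import pandas as pd


def compute_days_since_last_purchase(events_df, as_of_date,
                                     customer_id_col="customer_id",
                                     event_date_col="purchase_timestamp"):
    # Same logic as the function above, repeated so this block is self-contained
    valid = events_df[events_df[event_date_col] <= as_of_date]
    last = valid.groupby(customer_id_col)[event_date_col].max().reset_index()
    last.columns = [customer_id_col, "last_purchase_date"]
    last["days_since_last_purchase"] = (as_of_date - last["last_purchase_date"]).dt.days
    return last


AS_OF = pd.Timestamp("2024-01-31")


def test_old_column_name_fails_loudly():
    """Schema regression: the pre-refactor column name must raise, not yield NaN."""
    old_schema = pd.DataFrame({
        "customer_id": ["c1"],
        "transaction_date": pd.to_datetime(["2024-01-10"]),
    })
    try:
        compute_days_since_last_purchase(old_schema, AS_OF)
    except KeyError:
        return
    raise AssertionError("expected KeyError for the old column name")


def test_null_timestamps_are_ignored():
    events = pd.DataFrame({
        "customer_id": ["c1", "c1", "c2"],
        "purchase_timestamp": pd.to_datetime(["2024-01-20", None, None]),
    })
    result = compute_days_since_last_purchase(events, AS_OF)
    # c2 has only null timestamps, so it drops out; c1 uses its one valid event
    assert list(result["customer_id"]) == ["c1"]
    assert result.iloc[0]["days_since_last_purchase"] == 11


def test_purchase_on_as_of_date_is_zero():
    events = pd.DataFrame({"customer_id": ["c1"], "purchase_timestamp": [AS_OF]})
    result = compute_days_since_last_purchase(events, AS_OF)
    assert result.iloc[0]["days_since_last_purchase"] == 0


for test in (test_old_column_name_fails_loudly,
             test_null_timestamps_are_ignored,
             test_purchase_on_as_of_date_is_zero):
    test()
print("edge-case tests passed")
```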
Integration Tests for Feature Pipelines
Integration tests verify the end-to-end pipeline from raw data to feature output, catching bugs that individual unit tests miss - especially schema mismatches and join errors.
```python
import pandas as pd
import pytest


class TestCLVFeaturePipeline:
    """
    Integration tests for the full CLV feature pipeline.
    Uses a small synthetic dataset to verify end-to-end behavior.
    """

    @pytest.fixture
    def synthetic_raw_data(self, tmp_path):
        """Create synthetic raw data files in pytest's per-test temp directory."""
        # Customer table
        customers = pd.DataFrame({
            "customer_id": ["c1", "c2", "c3", "c4"],
            "signup_date": pd.to_datetime(["2022-01-01", "2022-06-15",
                                           "2023-03-10", "2023-11-01"]),
            "customer_segment": ["premium", "standard", "premium", "standard"]
        })
        # Purchase events table (new schema: purchase_timestamp)
        purchases = pd.DataFrame({
            "customer_id": ["c1", "c1", "c2", "c3"],
            "purchase_timestamp": pd.to_datetime([  # Note: new column name
                "2024-01-10", "2024-01-20", "2024-01-05", "2024-01-25"
            ]),
            "amount": [120.0, 85.0, 45.0, 200.0]
        })
        # Writing parquet requires pyarrow or fastparquet
        customers.to_parquet(tmp_path / "customers.parquet")
        purchases.to_parquet(tmp_path / "purchases.parquet")
        return tmp_path

    def test_pipeline_produces_expected_schema(self, synthetic_raw_data):
        """Pipeline output must have all expected feature columns."""
        from clv_pipeline import run_clv_feature_pipeline  # hypothetical module
        output_df = run_clv_feature_pipeline(
            data_dir=synthetic_raw_data,
            as_of_date=pd.Timestamp("2024-02-01")
        )
        expected_columns = [
            "customer_id", "days_since_last_purchase",
            "spend_30d", "txn_count_30d", "customer_segment",
            "tenure_days"
        ]
        for col in expected_columns:
            assert col in output_df.columns, f"Missing expected column: {col}"

    def test_no_unexpected_nulls(self, synthetic_raw_data):
        """Customers with purchase history must have non-null features."""
        from clv_pipeline import run_clv_feature_pipeline
        output_df = run_clv_feature_pipeline(
            data_dir=synthetic_raw_data,
            as_of_date=pd.Timestamp("2024-02-01")
        )
        # c4 has no purchases, so their days_since_last_purchase may be null
        # c1, c2, c3 all have purchases - must be non-null
        customers_with_purchases = ["c1", "c2", "c3"]
        filtered = output_df[output_df["customer_id"].isin(customers_with_purchases)]
        assert filtered["days_since_last_purchase"].isnull().sum() == 0

    def test_record_count_matches_customers(self, synthetic_raw_data):
        """Output must have exactly one row per customer."""
        from clv_pipeline import run_clv_feature_pipeline
        output_df = run_clv_feature_pipeline(
            data_dir=synthetic_raw_data,
            as_of_date=pd.Timestamp("2024-02-01")
        )
        assert len(output_df) == 4  # 4 customers in synthetic data
        assert output_df["customer_id"].nunique() == 4
```
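The integration tests import a hypothetical `clv_pipeline.run_clv_feature_pipeline`. The sketch below shows the core transformation such a pipeline might run once the parquet files are loaded. It is a minimal version that assumes a hypothetical `build_clv_features(customers, purchases, as_of_date)`, with `spend_30d` and `txn_count_30d` covering the 30 days ending at `as_of_date`. The design choice that makes the tests above pass is to start from the customer table and map purchase aggregates onto it, which behaves like a left join. Every customer keeps exactly one row, and customers without purchases get NaN rather than disappearing.

```python
import pandas as pd


def build_clv_features(customers: pd.DataFrame, purchases: pd.DataFrame,
                       as_of_date: pd.Timestamp) -> pd.DataFrame:
    """Hypothetical core of run_clv_feature_pipeline: one row per customer."""
    # Only events at or before the reference date (no future leakage)
    past = purchases[purchases["purchase_timestamp"] <= as_of_date]
    recent = past[past["purchase_timestamp"] > as_of_date - pd.Timedelta(days=30)]

    last_purchase = past.groupby("customer_id")["purchase_timestamp"].max()
    spend = recent.groupby("customer_id")["amount"].sum()
    txn_count = recent.groupby("customer_id")["amount"].size()

    features = customers[["customer_id", "customer_segment", "signup_date"]].copy()
    ids = features["customer_id"]
    # map() keeps every customer row; customers with no events get NaN/NaT
    features["days_since_last_purchase"] = (as_of_date - ids.map(last_purchase)).dt.days
    features["spend_30d"] = ids.map(spend).fillna(0.0)
    features["txn_count_30d"] = ids.map(txn_count).fillna(0).astype(int)
    features["tenure_days"] = (as_of_date - features["signup_date"]).dt.days
    return features.drop(columns="signup_date")


customers = pd.DataFrame({
    "customer_id": ["c1", "c2", "c3", "c4"],
    "signup_date": pd.to_datetime(["2022-01-01", "2022-06-15", "2023-03-10", "2023-11-01"]),
    "customer_segment": ["premium", "standard", "premium", "standard"],
})
purchases = pd.DataFrame({
    "customer_id": ["c1", "c1", "c2", "c3"],
    "purchase_timestamp": pd.to_datetime(["2024-01-10", "2024-01-20", "2024-01-05", "2024-01-25"]),
    "amount": [120.0, 85.0, 45.0, 200.0],
})
out = build_clv_features(customers, purchases, pd.Timestamp("2024-02-01"))
print(out)
```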
Great Expectations in Production
Great Expectations provides a rich DSL for defining data expectations and generating data documentation (called "data docs").
```python
import great_expectations as gx


def create_feature_expectation_suite(suite_name: str = "clv_features"):
    """
    Create a Great Expectations suite for CLV feature validation.
    """
    context = gx.get_context()

    # Create expectation suite
    suite = context.suites.add(
        gx.ExpectationSuite(name=suite_name)
    )

    # Define expectations
    expectations = [
        # Schema expectations
        gx.expectations.ExpectColumnToExist(column="customer_id"),
        gx.expectations.ExpectColumnToExist(column="days_since_last_purchase"),
        gx.expectations.ExpectColumnToExist(column="spend_30d"),

        # Null rate expectations
        gx.expectations.ExpectColumnValuesToNotBeNull(
            column="customer_id"
        ),
        gx.expectations.ExpectColumnValuesToNotBeNull(
            column="days_since_last_purchase",
            mostly=0.995  # allow up to 0.5% nulls
        ),

        # Range expectations
        gx.expectations.ExpectColumnValuesToBeBetween(
            column="days_since_last_purchase",
            min_value=0,
            max_value=3650
        ),
        gx.expectations.ExpectColumnValuesToBeBetween(
            column="spend_30d",
            min_value=0
        ),

        # Cardinality expectations
        gx.expectations.ExpectColumnUniqueValueCountToBeBetween(
            column="customer_segment",
            min_value=3,
            max_value=10
        ),

        # Distribution expectations (set thresholds from historical baselines)
        gx.expectations.ExpectColumnMeanToBeBetween(
            column="days_since_last_purchase",
            min_value=10,
            max_value=200
        ),
    ]

    for exp in expectations:
        suite.add_expectation(exp)

    return suite
```
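The `mostly` argument turns an all-or-nothing expectation into a tolerance: `mostly=0.995` passes as long as at least 99.5% of values satisfy the condition. The sketch below reproduces that rule in plain pandas for the not-null case. `not_null_mostly` is a hypothetical helper, not part of the Great Expectations API. The example applies it to a column with the 2.1% null rate from the opening incident.

```python
import numpy as np
import pandas as pd


def not_null_mostly(series: pd.Series, mostly: float) -> dict:
    """Plain-pandas analogue of ExpectColumnValuesToNotBeNull(mostly=...)."""
    non_null_fraction = float(series.notna().mean())
    return {"success": non_null_fraction >= mostly, "non_null_fraction": non_null_fraction}


# 1,000 rows with 21 nulls: a 2.1% null rate, as in the CLV incident
values = pd.Series(np.arange(1000, dtype=float))
values.iloc[:21] = np.nan

result = not_null_mostly(values, mostly=0.995)
print(result)  # success is False because 0.979 < 0.995
```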
Feature Monitoring in Pipelines
Validation runs at pipeline execution time. Monitoring runs continuously in production, watching feature distributions over time.
```python
import pandas as pd
from scipy import stats


def compute_feature_statistics(series: pd.Series) -> dict:
    """Compute summary statistics for feature monitoring."""
    non_null = series.dropna()
    has_values = len(non_null) > 0
    return {
        "count": len(series),
        "null_rate": float(series.isnull().mean()) if len(series) > 0 else None,
        "mean": float(non_null.mean()) if has_values else None,
        "std": float(non_null.std()) if has_values else None,
        "min": float(non_null.min()) if has_values else None,
        "p25": float(non_null.quantile(0.25)) if has_values else None,
        "p50": float(non_null.quantile(0.50)) if has_values else None,
        "p75": float(non_null.quantile(0.75)) if has_values else None,
        "max": float(non_null.max()) if has_values else None,
    }


def detect_distribution_shift(
    current: pd.Series,
    reference: pd.Series,
    alpha: float = 0.05
) -> dict:
    """
    Detect distribution shift using KS test and mean comparison.
    Returns a dict with test results and a severity assessment.
    """
    current_clean = current.dropna()
    reference_clean = reference.dropna()

    # Null rate change (absolute difference in the fraction of nulls)
    null_rate_change = abs(current.isnull().mean() - reference.isnull().mean())

    # An all-null sample cannot be KS-tested; report it as critical instead of crashing
    if current_clean.empty or reference_clean.empty:
        return {
            "ks_statistic": None,
            "ks_p_value": None,
            "is_shifted_ks": None,
            "mean_diff_pct": None,
            "null_rate_change": null_rate_change,
            "severity": "CRITICAL",
        }

    # KS test: compares the two empirical distributions
    ks_stat, ks_pval = stats.ks_2samp(current_clean, reference_clean)

    # Mean shift relative to the reference mean (epsilon avoids division by zero)
    mean_diff_pct = abs(current_clean.mean() - reference_clean.mean()) / (abs(reference_clean.mean()) + 1e-9)

    return {
        "ks_statistic": ks_stat,
        "ks_p_value": ks_pval,
        "is_shifted_ks": ks_pval < alpha,
        "mean_diff_pct": mean_diff_pct,
        "null_rate_change": null_rate_change,
        "severity": (
            "CRITICAL" if null_rate_change > 0.05 or mean_diff_pct > 0.5 else
            "HIGH" if ks_pval < 0.001 else
            "MEDIUM" if ks_pval < alpha else
            "NONE"
        )
    }
```
Production Engineering Notes
CI integration: Run feature unit tests and schema validation in CI on every pipeline code change. A commit that breaks feature computation should fail the CI pipeline before deployment.
Baseline capture: To detect drift, you need a reference distribution. Capture feature statistics (mean, std, quantiles, null rate) when a model is first deployed. Use these as the "approved baseline" for subsequent drift detection.
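The sketch below shows one minimal way to capture a baseline. It assumes baselines are stored as JSON next to the model artifact. The file layout and the `capture_baseline`, `save_baseline`, and `load_baseline` helpers are illustrative, not any specific tool's format. Every statistic is converted to a plain `float` so the file stays JSON-serializable.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, Iterable

import pandas as pd


def capture_baseline(df: pd.DataFrame, features: Iterable[str]) -> Dict[str, dict]:
    """Summarize each numeric feature at deployment time."""
    baseline = {}
    for name in features:
        col = df[name]
        non_null = col.dropna()
        baseline[name] = {
            "null_rate": float(col.isna().mean()),
            "mean": float(non_null.mean()),
            "std": float(non_null.std()),
            "p05": float(non_null.quantile(0.05)),
            "p50": float(non_null.quantile(0.50)),
            "p95": float(non_null.quantile(0.95)),
        }
    return baseline


def save_baseline(baseline: Dict[str, dict], path: Path) -> None:
    path.write_text(json.dumps(baseline, indent=2, sort_keys=True))


def load_baseline(path: Path) -> Dict[str, dict]:
    return json.loads(path.read_text())


# Illustrative training-time feature values
training_features = pd.DataFrame({"days_since_last_purchase": [1, 5, 10, None, 40]})
baseline = capture_baseline(training_features, ["days_since_last_purchase"])

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "clv_baseline.json"
    save_baseline(baseline, path)
    round_tripped = load_baseline(path)
```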
Alerting thresholds: Start with loose alert thresholds - alerting on every 5% distribution shift will produce many false alarms. Then tighten or relax them based on how strongly observed shifts correlate with production metric degradation in your specific use case.
Common Mistakes
:::danger No tests for schema changes
The most common feature pipeline failure is a silent column rename in an upstream data source. Without schema validation, the pipeline completes successfully with null values everywhere. Add schema tests that run on every pipeline execution. Catch the KeyError or the silent null join before the features are written to the feature store.
:::
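The sketch below is a minimal version of such a schema test. It assumes the expected schema is kept in code as a mapping from column name to dtype; the `EXPECTED_EVENTS_SCHEMA` contract and both helpers are hypothetical. The check reports missing, unexpected, and retyped columns separately. A rename like `transaction_date` to `purchase_timestamp` therefore shows up as one missing and one unexpected column.

```python
from typing import Dict, List

import pandas as pd

# Hypothetical contract for the purchase events table, as the feature pipeline expects it.
# dtype strings can differ across pandas versions (e.g. for strings), so pin them to yours.
EXPECTED_EVENTS_SCHEMA: Dict[str, str] = {
    "customer_id": "object",
    "transaction_date": "datetime64[ns]",
    "amount": "float64",
}


def diff_schema(df: pd.DataFrame, expected: Dict[str, str]) -> Dict[str, List[str]]:
    """Compare a DataFrame's columns and dtypes against an expected schema."""
    actual = {col: str(dtype) for col, dtype in df.dtypes.items()}
    return {
        "missing": sorted(set(expected) - set(actual)),
        "unexpected": sorted(set(actual) - set(expected)),
        "wrong_dtype": sorted(
            col for col in set(expected) & set(actual) if actual[col] != expected[col]
        ),
    }


def assert_schema(df: pd.DataFrame, expected: Dict[str, str]) -> None:
    """Raise before any feature computation if the contract is violated."""
    problems = diff_schema(df, expected)
    if any(problems.values()):
        raise ValueError(f"Schema contract violated: {problems}")


events_after_refactor = pd.DataFrame({
    "customer_id": ["c1", "c2"],
    "purchase_timestamp": pd.to_datetime(["2024-01-10", "2024-01-15"]),
    "amount": [120.0, 45.0],
})
problems = diff_schema(events_after_refactor, EXPECTED_EVENTS_SCHEMA)
print(problems["missing"], problems["unexpected"])  # ['transaction_date'] ['purchase_timestamp']
```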
:::danger Testing only the happy path
Feature pipeline unit tests that only verify correct behavior on clean, complete data miss the cases that matter most in production: nulls, empty dataframes, all-null columns, negative values where only positive are valid, extreme outliers. Write explicit tests for edge cases and boundary conditions.
:::

:::warning Monitoring model outputs instead of features
Model output monitoring (watching prediction distributions) catches degradation late. By the time the model's output distribution shifts measurably, the model has already been degraded for days or weeks. Monitor feature distributions first - a feature distribution shift is an early warning that model quality will degrade.
:::

:::tip Run validation as part of the pipeline, not after it
Validation that runs as a post-hoc check catches failures after data has already been written to the feature store. Run validation inline, before writing outputs, and fail the pipeline immediately on error. This prevents corrupted features from being served.
:::
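The sketch below shows the inline pattern in minimal form. It assumes the pipeline's validation and write steps can be passed in as callables; `validate_then_write` and `max_null_rate` are hypothetical names. In this lesson's code, the validation hook could be `FeatureValidator.validate_and_raise`. Because validation runs first and raises, a failing batch never reaches the write step.

```python
from typing import Callable

import pandas as pd


def validate_then_write(
    features: pd.DataFrame,
    validate: Callable[[pd.DataFrame], None],
    write: Callable[[pd.DataFrame], None],
) -> None:
    """Write features only if validation returns without raising."""
    validate(features)  # any exception propagates and fails the pipeline run
    write(features)     # never reached for a failing batch


def max_null_rate(column: str, threshold: float) -> Callable[[pd.DataFrame], None]:
    """Hypothetical validation hook: raise if a column's null rate exceeds threshold."""
    def check(df: pd.DataFrame) -> None:
        rate = float(df[column].isna().mean())
        if rate > threshold:
            raise ValueError(f"{column} null rate {rate:.3%} exceeds {threshold:.3%}")
    return check


written = []  # stands in for the feature store
bad_batch = pd.DataFrame({"days_since_last_purchase": [3.0, None, 7.0]})
try:
    validate_then_write(bad_batch, max_null_rate("days_since_last_purchase", 0.005), written.append)
except ValueError as err:
    print(f"pipeline failed, nothing written: {err}")
```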
Interview Q&A
Q: What is the difference between feature validation and feature monitoring?
A: Feature validation is a point-in-time check run at pipeline execution time - it asserts that the features produced by today's pipeline run satisfy defined expectations (null rates, ranges, schema). If validation fails, the pipeline fails. Feature monitoring is a continuous process that watches feature distributions over time in production - it detects statistical drift by comparing current distributions against a historical baseline. Validation catches pipeline bugs immediately; monitoring catches gradual data drift that pipelines cannot detect. You need both.
Q: How do you test a feature pipeline that reads from external data sources?
A: Use test fixtures with synthetic data that you control. Create small, curated datasets that cover the expected cases (normal data, nulls, edge cases) and test the pipeline logic against those rather than against live production data. For integration tests that must verify end-to-end behavior including database reads, use a test database or a local file-based substitute. Mock the data access layer so that unit tests of transformation logic don't require any external connections. Never run tests against the production database - use a staging environment with a copy of a recent snapshot.
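One way to mock the data access layer is to pass it into the pipeline as a dependency. The sketch below is a minimal example of that approach; `EventSource`, `run_feature_step`, and `FakeEventSource` are hypothetical names. In production, the source would wrap the warehouse client. In tests, it serves a small, curated DataFrame, so the transformation logic runs without any external connection.

```python
from typing import Protocol

import pandas as pd


class EventSource(Protocol):
    def load_purchases(self, as_of_date: pd.Timestamp) -> pd.DataFrame: ...


def run_feature_step(source: EventSource, as_of_date: pd.Timestamp) -> pd.DataFrame:
    """Transformation logic depends only on the interface, not on a database."""
    events = source.load_purchases(as_of_date)
    past = events[events["purchase_timestamp"] <= as_of_date]
    return past.groupby("customer_id").size().rename("txn_count").reset_index()


class FakeEventSource:
    """Test double: serves a curated dataset, including a future-dated event."""

    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.calls = 0

    def load_purchases(self, as_of_date: pd.Timestamp) -> pd.DataFrame:
        self.calls += 1
        return self.df.copy()


fake = FakeEventSource(pd.DataFrame({
    "customer_id": ["c1", "c1", "c2"],
    "purchase_timestamp": pd.to_datetime(["2024-01-05", "2024-03-01", "2024-01-20"]),
}))
result = run_feature_step(fake, pd.Timestamp("2024-01-31"))
print(result)
```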
Q: A feature that was previously stable suddenly has 15% null rate. How do you investigate?
A: Start by pinpointing when the null rate changed - look at daily feature statistics going back until the null rate was at its baseline. The change date often correlates with a deployment or schema change. Then trace backward through the computation graph: which upstream table does this feature join to? Check if that table's schema changed (column rename, type change), if a filter was added that excludes more records, or if the join key changed. Use the pipeline's execution logs to find any errors that were swallowed. Run the pipeline manually on yesterday's data and print intermediate DataFrames to find where nulls are introduced. In the CLV example from this lesson, the null appeared exactly at the join to the purchase events table - tracing one step back to that table immediately revealed the column rename.
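The first step, pinpointing when the null rate changed, is easy to script once daily feature statistics are stored. The sketch below is a minimal example. It assumes a date-indexed Series of daily null rates and uses a hypothetical rule: flag the first day whose rate exceeds a multiple of the baseline. A small floor keeps a baseline of zero from flagging every single null.

```python
from typing import Optional

import pandas as pd


def first_breach_date(daily_null_rate: pd.Series, baseline: float,
                      factor: float = 2.0, floor: float = 0.001) -> Optional[pd.Timestamp]:
    """Return the first date whose null rate exceeds max(factor * baseline, floor)."""
    threshold = max(factor * baseline, floor)
    breaches = daily_null_rate[daily_null_rate > threshold]
    return breaches.index.min() if len(breaches) else None


# Illustrative history: stable for six days, then a jump to 2.1%
dates = pd.date_range("2024-03-01", periods=10, freq="D")
rates = pd.Series([0.0003] * 6 + [0.021] * 4, index=dates)
print(first_breach_date(rates, baseline=0.0003).date())  # 2024-03-07
```

The date it returns is the point to compare against deployment and schema-change logs before tracing backward through the computation graph.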
Q: How do you decide which features to monitor in production and what thresholds to set?
A: Monitor all features used by production models. For threshold setting: start with the null rate (alert if it increases by more than 2× the historical baseline), the KS test (alert at p less than 0.001 for a conservative threshold that avoids false alarms), and mean shift greater than 20% of the feature's standard deviation. These are starting points - tune based on your use case. For features that are top predictors by SHAP value, set tighter thresholds. For less important features, wider. Run the monitoring in shadow mode for 2–4 weeks before enabling alerts, to calibrate thresholds against real variation. Correlate alert triggers with actual model performance degradation - eliminate alerts that trigger without corresponding degradation (they will be ignored over time).
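The null-rate and mean-shift starting points above translate directly into code. The sketch below is a minimal example. It assumes a baseline dict holding `null_rate`, `mean`, and `std` for the feature, such as the statistics captured at deployment. The parameter names and the `1e-4` floor on the baseline null rate are illustrative assumptions. The KS rule is left to `detect_distribution_shift` from earlier in this lesson. Tightening thresholds for top predictors amounts to passing smaller `null_factor` or `mean_shift_stds` values for those features.

```python
from typing import Dict, List

import pandas as pd


def threshold_alerts(current: pd.Series, baseline: Dict[str, float],
                     null_factor: float = 2.0, mean_shift_stds: float = 0.2) -> List[str]:
    """Return alert messages for one feature; an empty list means no alert."""
    alerts = []
    null_rate = float(current.isna().mean())
    # Floor the baseline so a historical null rate of 0 doesn't alert on one null
    null_limit = null_factor * max(baseline["null_rate"], 1e-4)
    if null_rate > null_limit:
        alerts.append(f"null rate {null_rate:.2%} > {null_limit:.2%}")
    non_null = current.dropna()
    if len(non_null) > 0 and baseline["std"] > 0:
        # Mean shift measured in units of the baseline standard deviation
        shift = abs(float(non_null.mean()) - baseline["mean"]) / baseline["std"]
        if shift > mean_shift_stds:
            alerts.append(f"mean shifted by {shift:.2f} std")
    return alerts


baseline = {"null_rate": 0.001, "mean": 30.0, "std": 20.0}
todays_values = pd.Series([30.0] * 979 + [float("nan")] * 21)  # 2.1% nulls
print(threshold_alerts(todays_values, baseline))
```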
Q: How would you add Great Expectations validation to a production Spark pipeline?
A: Great Expectations has native Spark integration. With the GX 1.x API used in the suite example above, you register a Spark data source with a DataFrame asset and batch definition. You then bind your expectation suite to it through a validation definition and a checkpoint, and pass the current Spark DataFrame at run time via checkpoint.run(batch_parameters={"dataframe": df}). Older (pre-1.0) releases did this with SparkDFDataset and context.run_checkpoint(checkpoint_name) instead. The validation runs in the Spark context, distributed across executors. For production pipelines: define expectation suites in version-controlled YAML or JSON files, run validation as a pipeline step before the write action, push validation results to a Data Docs site for observability, and fail the pipeline when validation fails. checkpoint.run returns a CheckpointResult whose success flag you can inspect programmatically. Store validation results alongside feature data so you can audit quality for any historical date.
