

Data Quality and Validation for ML

Bad data does not announce itself. It arrives silently, trains your model, corrupts your features, and shows up as a 15% drop in production metrics three weeks later - by which time it has touched every layer of your system.

The Production Moment

The recommendation model had been working well for six months. Then the metrics started a slow, silent decline. Click-through rate dropped 3% in week one. Four more percent in week two. The ML team spent a week retraining the model with different hyperparameters, trying new architectures, A/B testing feature combinations. Nothing helped.

In week three, a data engineer noticed that the session_count_7d feature - number of user sessions in the last 7 days - had a median value of 0. For six months, the median had been 2.3.

Root cause: three weeks earlier, the session tracking service had deployed a new version that started writing session IDs in a different format. The feature pipeline joined on session_id using an exact string match. The new-format session IDs never matched, so all users appeared to have zero sessions. The model had been trained to weight session count as a strong recency signal. With it zeroed out, recommendations degraded.

The feature pipeline had no data quality checks. It had no schema validation. It had no distribution monitoring. It produced a DataFrame, wrote it to S3, and never verified that the output made sense.

The fix was two lines of code: a nullable check and a median value assertion. Had they existed, the incident would have been caught the morning of the session tracking deployment and resolved in an hour. Instead, it cost three weeks of degraded recommendations.
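As a rough illustration of what those two checks might look like, here is a minimal sketch in plain Python. The function name, the thresholds (1% nulls, median of at least 1.5), and the list-of-values input are assumptions made for illustration; the original pipeline worked on a DataFrame, and its real thresholds are not given in this lesson.

```python
from statistics import median

def assert_session_feature_healthy(values, min_median=1.5, max_null_rate=0.01):
    """Fail loudly if session_count_7d looks broken.

    Hypothetical helper: thresholds are illustrative, not from a real system.
    """
    if not values:
        raise ValueError("session_count_7d: pipeline produced no rows")

    # Check 1: null rate
    null_rate = sum(v is None for v in values) / len(values)
    if null_rate > max_null_rate:
        raise ValueError(f"session_count_7d: null rate {null_rate:.2%} too high")

    # Check 2: median assertion (historically ~2.3 in the incident above)
    observed = median(v for v in values if v is not None)
    if observed < min_median:
        raise ValueError(f"session_count_7d: median {observed} below {min_median}")
    return observed

# A healthy day passes; a day where the join silently fails raises immediately.
healthy_median = assert_session_feature_healthy([2, 3, 1, 2, 4, 2, 3])
```

With the join broken, every user has a session count of 0, the median is 0, and the second check raises before anything is written.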

Why Data Quality Is the Number-One ML Failure Mode

Google's paper "Hidden Technical Debt in Machine Learning Systems" (Sculley et al., NeurIPS 2015) argued that data dependencies cost more than code dependencies in ML systems and are harder to detect. The subsequent decade of ML production experience has consistently confirmed this: most production ML failures trace to data, not model architecture.

The reasons data quality failures are dangerous for ML:

Silent degradation: A null value in a feature column does not throw an exception. Depending on the pipeline, it is silently coerced to 0, passed through as NaN, or mean-imputed, which changes the model's input distribution without any error. The model continues serving, just worse.

Temporal distance: Data quality issues at the feature engineering stage show up as model degradation during serving - which is often 1–7 days later (after model training and deployment). By then the root cause is buried in logs.

Cascading effects: A single upstream schema change can corrupt dozens of downstream features, which corrupt every model that uses those features.

No automated testing: Software systems have unit tests that catch breaking changes immediately. Data pipelines typically have no equivalent - features are computed and written without assertions about the output.

The solution is a systematic data quality framework applied at every stage of the ML pipeline.

Data Quality Dimensions

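Data quality is not a single property - it has five distinct dimensions (completeness, accuracy, consistency, timeliness, and validity), each with different failure modes and detection methods. The first four are illustrated with code below; validity, meaning conformance to the defined schema and format, is covered in the Interview Q&A.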

Completeness

Are all expected records present? Are key fields populated?

from pyspark.sql import SparkSession, functions as F

def check_completeness(df, required_columns: list[str],
                       min_row_count: int, entity_col: str) -> dict:
    """Check that all required data is present."""
    issues = []
    row_count = df.count()

    # Row count check: did the pipeline produce a reasonable number of rows?
    if row_count < min_row_count:
        issues.append({
            "check": "row_count",
            "expected": f">= {min_row_count}",
            "actual": row_count,
            "severity": "critical"
        })

    # Null check: key features should not be null
    for col in required_columns:
        null_count = df.filter(df[col].isNull()).count()
        null_rate = null_count / max(row_count, 1)
        if null_rate > 0.01:  # Alert if >1% null
            issues.append({
                "check": f"null_rate:{col}",
                "expected": "< 1%",
                "actual": f"{null_rate:.2%}",
                "severity": "high" if null_rate > 0.05 else "medium"
            })

    # Uniqueness: entity_col should not have unexpected duplicates
    distinct_count = df.select(entity_col).distinct().count()
    duplicate_rate = 1 - (distinct_count / max(row_count, 1))
    if duplicate_rate > 0.001:  # Alert if >0.1% duplicates
        issues.append({
            "check": f"duplicates:{entity_col}",
            "expected": "< 0.1%",
            "actual": f"{duplicate_rate:.3%}",
            "severity": "medium"
        })

    return {"passed": len(issues) == 0, "issues": issues, "row_count": row_count}

Accuracy

Are values within expected ranges and semantically correct?

def check_value_ranges(df, feature_specs: dict) -> list[dict]:
    """
    Check that feature values fall within expected ranges.
    feature_specs: {
        "age": {"min": 0, "max": 120},
        "amount": {"min": 0.01, "max": 100000},
        "country_code": {"allowed_values": ["US", "GB", "DE", "FR"]}
    }
    """
    issues = []
    row_count = df.count()

    for feature_name, spec in feature_specs.items():
        if "min" in spec and "max" in spec:
            out_of_range = df.filter(
                (F.col(feature_name) < spec["min"]) |
                (F.col(feature_name) > spec["max"])
            ).count()
            out_of_range_rate = out_of_range / max(row_count, 1)
            if out_of_range_rate > 0:
                issues.append({
                    "feature": feature_name,
                    "check": "range",
                    "expected": f"[{spec['min']}, {spec['max']}]",
                    "out_of_range_rows": out_of_range,
                    "out_of_range_rate": out_of_range_rate
                })

        if "allowed_values" in spec:
            invalid = df.filter(
                ~F.col(feature_name).isin(spec["allowed_values"])
            ).count()
            if invalid > 0:
                issues.append({
                    "feature": feature_name,
                    "check": "allowed_values",
                    "invalid_rows": invalid
                })

    return issues

Consistency

Do related fields agree with each other?

def check_consistency(df) -> list[dict]:
    """Check inter-field consistency rules."""
    issues = []

    # Purchase amount > 0 whenever event_type = 'purchase'
    inconsistent_purchase = df.filter(
        (F.col("event_type") == "purchase") &
        ((F.col("amount").isNull()) | (F.col("amount") <= 0))
    ).count()
    if inconsistent_purchase > 0:
        issues.append({
            "check": "purchase_amount_positive",
            "failed_rows": inconsistent_purchase,
            "description": "Purchase events must have positive amount"
        })

    # No future timestamps
    future_events = df.filter(
        F.col("event_timestamp") > F.current_timestamp()
    ).count()
    if future_events > 0:
        issues.append({
            "check": "no_future_timestamps",
            "failed_rows": future_events
        })

    return issues

Timeliness

Is the data fresh enough?

from datetime import datetime, timezone

def check_timeliness(df, timestamp_col: str, max_lag_hours: float = 24.0) -> dict:
    """Check that data is within the expected freshness window."""
    latest_ts = df.agg(F.max(timestamp_col)).collect()[0][0]
    if latest_ts is None:
        return {"check": "timeliness", "passed": False, "reason": "No timestamps found"}

    # Assumption: naive timestamps collected from Spark represent UTC
    # (driver and session time zone set to UTC). Adjust if yours differ.
    if latest_ts.tzinfo is None:
        latest_ts = latest_ts.replace(tzinfo=timezone.utc)

    now = datetime.now(timezone.utc)
    lag_hours = (now - latest_ts).total_seconds() / 3600
    passed = lag_hours <= max_lag_hours
    if passed:
        severity = "ok"
    elif lag_hours > max_lag_hours * 2:
        severity = "critical"
    else:
        severity = "warning"

    return {
        "check": "timeliness",
        "latest_timestamp": latest_ts.isoformat(),
        "lag_hours": lag_hours,
        "max_allowed_hours": max_lag_hours,
        "passed": passed,
        "severity": severity
    }

Great Expectations: Data Quality as Code

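Great Expectations (GE) is a widely used open-source library for defining and running data quality tests. The core concept is the expectation: an assertion about data that can be tested automatically and re-run on every pipeline execution. The example below uses the legacy Dataset API (SparkDFDataset) from older GE releases; newer releases replaced it with a different validation API, so pin your version or port the calls accordingly.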

import great_expectations as gx
from great_expectations.dataset import SparkDFDataset

def validate_ml_features_with_ge(spark_df) -> dict:
    """
    Run comprehensive data quality validation using Great Expectations.
    Returns a validation report suitable for pipeline gating decisions.
    """
    ge_dataset = SparkDFDataset(spark_df)
    validation_results = []

    # Completeness
    validation_results.append(ge_dataset.expect_column_to_exist("user_id"))
    validation_results.append(ge_dataset.expect_column_values_to_not_be_null("user_id"))
    validation_results.append(
        ge_dataset.expect_column_values_to_not_be_null("tx_count_30d", mostly=0.99)
    )

    # Range checks
    validation_results.append(
        ge_dataset.expect_column_values_to_be_between("tx_count_30d", min_value=0, max_value=10000)
    )
    validation_results.append(
        ge_dataset.expect_column_values_to_be_between(
            "avg_amount_30d", min_value=0.0, max_value=100000.0, mostly=0.999
        )
    )

    # Distribution expectations (based on historical baseline)
    # This is the check that would have caught the session_count_7d bug
    validation_results.append(
        ge_dataset.expect_column_median_to_be_between(
            "session_count_7d",
            min_value=1.5,  # Historically ~2.3, alert if drops below 1.5
            max_value=6.0
        )
    )

    # Row count
    validation_results.append(
        ge_dataset.expect_table_row_count_to_be_between(
            min_value=1_000_000,
            max_value=200_000_000
        )
    )

    # Categorical constraints
    validation_results.append(
        ge_dataset.expect_column_values_to_be_in_set(
            "device_type", value_set=["mobile", "desktop", "tablet", "unknown"]
        )
    )

    passed = sum(1 for r in validation_results if r.success)
    failed = sum(1 for r in validation_results if not r.success)
    failed_checks = [str(r.expectation_config) for r in validation_results if not r.success]

    return {
        "total_checks": len(validation_results),
        "passed": passed,
        "failed": failed,
        "failed_checks": failed_checks,
        "gate_decision": "proceed" if failed == 0 else "halt"
    }

def feature_engineering_pipeline(**kwargs):
    """Airflow task: compute features, validate, then write."""
    spark = SparkSession.getActiveSession()
    features_df = compute_user_features(spark)

    # Quality gate: validate BEFORE writing
    validation = validate_ml_features_with_ge(features_df)
    if validation["gate_decision"] == "halt":
        raise ValueError(f"Data quality validation failed: {validation['failed_checks']}")

    # Only write if all quality checks pass
    features_df.write.format("delta").mode("overwrite").save("s3://lake/features/users/")
    print(f"Features written. {validation['passed']}/{validation['total_checks']} checks passed")

:::tip Building Expectation Suites Generate initial expectations from a known-good reference dataset instead of writing them all by hand. In the legacy API this looks roughly like gx.from_pandas(reference_df).profile(), which auto-generates expectations for all columns; the exact call depends on your Great Expectations version. Then review and adjust thresholds. This gives you a working expectation suite in 30 minutes rather than writing 50 expectations by hand. :::

Distribution Monitoring: Detecting Feature Drift

Schema validation catches structural problems. Distribution monitoring catches semantic problems - values that are structurally valid but statistically unusual.

Population Stability Index (PSI)

PSI is the standard metric for detecting distribution shift between training and serving:

$$PSI = \sum_{i=1}^{N} (P_i - Q_i) \times \ln\left(\frac{P_i}{Q_i}\right)$$

Where $P_i$ is the fraction of values in bucket $i$ for the reference (training) distribution and $Q_i$ is the fraction for the current (serving) distribution.

PSI interpretation:

  • PSI below 0.1: No significant change
  • 0.1 to 0.2: Small shift - monitor closely
  • PSI above 0.2: Significant shift - investigate and consider retraining
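
To make the formula concrete, here is a small worked example on three hand-picked buckets. The proportions are invented for illustration; only the formula comes from the text above.

```python
import math

def psi_from_proportions(p, q):
    """PSI for pre-bucketed proportions p (reference) and q (current)."""
    return sum((pi - qi) * math.log(pi / qi) for pi, qi in zip(p, q))

reference = [0.5, 0.3, 0.2]  # P_i: training fractions per bucket (made up)
current = [0.4, 0.3, 0.3]    # Q_i: serving fractions per bucket (made up)

# Bucket 1: (0.5 - 0.4) * ln(0.5 / 0.4) ~= 0.0223
# Bucket 2: (0.3 - 0.3) * ln(1.0)        = 0
# Bucket 3: (0.2 - 0.3) * ln(0.2 / 0.3) ~= 0.0405
psi = psi_from_proportions(reference, current)
print(f"PSI = {psi:.4f}")  # PSI = 0.0629
```

A value of about 0.063 falls below 0.1, so this shift would be read as "no significant change". Every term is non-negative, and swapping the roles of reference and current gives the same PSI.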

import numpy as np
from scipy import stats

def compute_psi(reference: np.ndarray, current: np.ndarray,
                num_bins: int = 10) -> float:
    """
    Compute Population Stability Index between reference and current distributions.
    reference: training data feature values (the baseline)
    current: recent serving data feature values
    """
    # Create bins based on reference distribution percentiles (equal-frequency binning)
    percentiles = np.linspace(0, 100, num_bins + 1)
    bin_edges = np.percentile(reference, percentiles)
    bin_edges = np.unique(bin_edges)  # Remove duplicate edges from ties

    if len(bin_edges) < 2:
        return 0.0

    # Clip serving values into the reference range. Without this, np.histogram
    # silently drops values outside [min, max] of the reference, which hides
    # exactly the kind of shift PSI is supposed to detect.
    current = np.clip(current, bin_edges[0], bin_edges[-1])

    # Bin the data
    ref_counts, _ = np.histogram(reference, bins=bin_edges)
    curr_counts, _ = np.histogram(current, bins=bin_edges)

    # Convert to proportions with epsilon to avoid log(0)
    epsilon = 1e-10
    ref_prop = (ref_counts + epsilon) / len(reference)
    curr_prop = (curr_counts + epsilon) / len(current)

    # Symmetric in ref/curr, so this matches the (P - Q) * ln(P / Q) form above
    psi = np.sum((curr_prop - ref_prop) * np.log(curr_prop / ref_prop))
    return float(psi)

class FeatureDriftMonitor:
    """
    Monitor ML features for distribution drift compared to training baseline.
    Run daily - comparing yesterday's serving distribution against training.
    """
    def __init__(self, training_data: dict[str, np.ndarray],
                 psi_alert_threshold: float = 0.2):
        self.training_distributions = training_data
        self.psi_threshold = psi_alert_threshold

    def monitor_batch(self, serving_sample: dict[str, np.ndarray]) -> dict:
        """Compare serving sample distributions against training baselines."""
        results = {}
        alerts = []

        for feature_name, serving_values in serving_sample.items():
            if feature_name not in self.training_distributions:
                continue

            training_values = self.training_distributions[feature_name]
            psi = compute_psi(training_values, serving_values)

            # KS test for additional signal
            ks_stat, p_value = stats.ks_2samp(training_values, serving_values)

            results[feature_name] = {
                "psi": round(psi, 4),
                "psi_severity": "high" if psi > 0.2 else "medium" if psi > 0.1 else "ok",
                "ks_statistic": round(ks_stat, 4),
                "ks_p_value": round(p_value, 4),
                "training_mean": float(np.mean(training_values)),
                "serving_mean": float(np.mean(serving_values)),
                "training_p50": float(np.percentile(training_values, 50)),
                "serving_p50": float(np.percentile(serving_values, 50)),
            }

            if psi > self.psi_threshold:
                alerts.append({
                    "feature": feature_name,
                    "psi": psi,
                    "action": "investigate and consider retraining"
                })

        return {
            "feature_results": results,
            "drift_alerts": alerts,
            "num_drifted_features": len(alerts)
        }

# Usage in production monitoring pipeline
def daily_drift_check(spark, training_path: str, serving_log_path: str) -> None:
    training_df = spark.read.parquet(training_path).sample(fraction=0.01).toPandas()
    serving_df = spark.read.parquet(serving_log_path).sample(fraction=0.01).toPandas()

    feature_cols = ["tx_count_30d", "avg_amount_30d", "session_count_7d", "days_active_30d"]

    monitor = FeatureDriftMonitor(
        training_data={col: training_df[col].dropna().values for col in feature_cols}
    )

    results = monitor.monitor_batch(
        {col: serving_df[col].dropna().values for col in feature_cols}
    )

    for alert in results["drift_alerts"]:
        print(f"DRIFT ALERT: {alert['feature']} PSI={alert['psi']:.3f} - {alert['action']}")

Data Contracts

A data contract is a formal agreement between a data producer (application service, ETL pipeline) and a data consumer (ML feature pipeline). It specifies the schema, quality guarantees, and SLAs the producer commits to maintaining.

Data contracts address a specific failure mode: a producer changes their data format or semantics - for perfectly valid application reasons - and silently breaks all downstream ML consumers.

from dataclasses import dataclass, field

@dataclass
class DataContract:
    """
    Formal contract between a data producer and ML consumers.
    Stored in a schema registry or version-controlled repository.
    """
    version: str
    producer: str            # "payment-service"
    topic_or_table: str      # "transactions" Kafka topic
    consumers: list[str]     # ["fraud-model", "recommendation-model"]

    # Schema: field definitions with types and constraints
    # Example:
    # {
    #     "user_id": {"type": "string", "nullable": False, "format": "UUID"},
    #     "amount": {"type": "float", "nullable": False, "min": 0.01, "max": 100000},
    #     "currency": {"type": "string", "nullable": False, "allowed": ["USD", "EUR", "GBP"]}
    # }
    schema: dict = field(default_factory=dict)

    # Quality SLAs the producer commits to
    completeness_sla: float = 0.999     # 99.9% non-null on required fields
    latency_sla_minutes: float = 5.0    # Data arrives within 5 minutes of event
    availability_sla: float = 0.999     # Pipeline available 99.9%

    # Notification channels
    slack_channel: str = "#data-quality-alerts"

class DataContractValidator:
    """Validate incoming data against a registered contract."""

    def __init__(self, contract: DataContract):
        self.contract = contract

    def validate(self, df) -> dict:
        """Validate DataFrame against contract schema and quality SLAs."""
        violations = []
        row_count = df.count()

        for field_name, field_spec in self.contract.schema.items():
            # Check field existence
            if field_name not in df.columns:
                violations.append({
                    "field": field_name,
                    "violation": "MISSING_FIELD",
                    "severity": "critical"
                })
                continue

            # Check nullability
            if not field_spec.get("nullable", True):
                null_count = df.filter(F.col(field_name).isNull()).count()
                null_rate = null_count / max(row_count, 1)
                if null_rate > (1 - self.contract.completeness_sla):
                    violations.append({
                        "field": field_name,
                        "violation": "NULL_RATE_EXCEEDS_SLA",
                        "null_rate": null_rate,
                        "severity": "high"
                    })

            # Check allowed values
            if "allowed" in field_spec:
                invalid_count = df.filter(
                    ~F.col(field_name).isin(field_spec["allowed"])
                ).count()
                if invalid_count > 0:
                    violations.append({
                        "field": field_name,
                        "violation": "INVALID_VALUES",
                        "count": invalid_count,
                        "allowed": field_spec["allowed"],
                        "severity": "high"
                    })

        critical = [v for v in violations if v["severity"] == "critical"]
        return {
            "contract_version": self.contract.version,
            "total_violations": len(violations),
            "critical_violations": len(critical),
            "violations": violations,
            # Any violation, including an exceeded null-rate SLA or an invalid
            # value, means the producer did not meet the contract. Use
            # critical_violations if you only want to halt on missing fields.
            "contract_satisfied": len(violations) == 0
        }

Pipeline Quality Gates

Quality gates are checkpoints that halt ML pipeline execution if data quality drops below a threshold. They are the data-pipeline equivalent of the quality gates in a CI/CD pipeline.

from enum import Enum
from dataclasses import dataclass

class QualityGateAction(Enum):
    PROCEED = "proceed"
    WARN = "warn"
    HALT = "halt"

@dataclass
class QualityGate:
    name: str
    threshold: float
    on_failure: QualityGateAction
    description: str

class MLPipelineQualityGates:
    def __init__(self, gates: list[QualityGate], alert_fn=None):
        self.gates = {gate.name: gate for gate in gates}
        self.alert_fn = alert_fn or print

    def evaluate(self, gate_name: str, actual_value: float) -> QualityGateAction:
        """Evaluate a named quality gate. Returns action to take."""
        gate = self.gates.get(gate_name)
        if gate is None:
            # Fail loudly: a misspelled gate name must not pass silently
            raise KeyError(f"Unknown quality gate: {gate_name}")

        if actual_value < gate.threshold:
            message = (f"Quality gate FAILED: {gate_name}\n"
                       f"Expected >= {gate.threshold:.4f}, Got {actual_value:.4f}\n"
                       f"Description: {gate.description}")
            self.alert_fn(message)
            return gate.on_failure

        return QualityGateAction.PROCEED

# Define quality gates
feature_gates = MLPipelineQualityGates([
    QualityGate(
        name="row_count_ratio",
        threshold=0.8,  # Must have >= 80% of yesterday's rows
        on_failure=QualityGateAction.HALT,
        description="Significant row count drop suggests upstream failure"
    ),
    QualityGate(
        name="feature_completeness",
        threshold=0.99,  # 99% of required fields must be non-null
        on_failure=QualityGateAction.HALT,
        description="High null rate in required features"
    ),
    QualityGate(
        name="median_session_count_min",
        threshold=1.5,  # Session count median must be >= 1.5
        on_failure=QualityGateAction.HALT,
        description="Session count median has dropped - possible join failure"
    ),
])

# Usage in pipeline
def validated_feature_write(features_df, yesterday_row_count: int):
    today_count = features_df.count()

    # Gate 1: Row count
    ratio = today_count / max(yesterday_row_count, 1)
    action = feature_gates.evaluate("row_count_ratio", ratio)
    if action == QualityGateAction.HALT:
        raise ValueError(f"Row count gate failed: today={today_count}, yesterday={yesterday_row_count}")

    # Gate 2: Completeness (judged on the worst null rate across required columns)
    null_rates = {col: features_df.filter(F.col(col).isNull()).count() / max(today_count, 1)
                  for col in ["user_id", "tx_count_30d", "session_count_7d"]}
    completeness = 1 - max(null_rates.values())
    action = feature_gates.evaluate("feature_completeness", completeness)
    if action == QualityGateAction.HALT:
        raise ValueError(f"Completeness gate failed: null rates = {null_rates}")

    # Gate 3: Distribution (the check that would have caught the session bug)
    quantiles = features_df.approxQuantile("session_count_7d", [0.5], 0.01)
    if not quantiles:
        raise ValueError("Session count median gate failed: no non-null values")
    session_median = quantiles[0]
    action = feature_gates.evaluate("median_session_count_min", session_median)
    if action == QualityGateAction.HALT:
        raise ValueError(f"Session count median gate failed: median = {session_median}")

    # All gates passed
    features_df.write.format("delta").mode("overwrite").save("s3://lake/features/users/")
    print(f"Features written successfully. Row count: {today_count:,}")

Anomaly Detection on Pipeline Metrics

Beyond static thresholds, production pipelines use anomaly detection to catch issues that don't violate predefined rules but are statistically unusual compared to historical patterns.

import numpy as np
from collections import deque

class TimeSeriesAnomalyDetector:
    """
    Detect anomalies in pipeline metrics using a rolling window control chart.
    Uses 3-sigma rule: value beyond 3 standard deviations from rolling mean is flagged.
    Production systems often use Prophet or ARIMA for more sophisticated detection.
    """
    def __init__(self, window_size: int = 14, sigma_threshold: float = 3.0):
        self.window_size = window_size
        self.sigma_threshold = sigma_threshold
        self.history: dict[str, deque] = {}

    def update(self, metric_name: str, value: float) -> dict:
        if metric_name not in self.history:
            self.history[metric_name] = deque(maxlen=self.window_size)

        history = self.history[metric_name]

        if len(history) >= 7:
            mean = float(np.mean(list(history)))
            std = float(np.std(list(history)))

            if std > 0:
                z_score = abs(value - mean) / std
                is_anomaly = z_score > self.sigma_threshold
            else:
                is_anomaly = (value != mean)
                z_score = 0.0

            result = {
                "metric": metric_name,
                "value": value,
                "rolling_mean": mean,
                "rolling_std": std,
                "z_score": z_score,
                "is_anomaly": is_anomaly,
                "severity": "high" if z_score > 5 else "medium" if is_anomaly else "ok"
            }
        else:
            result = {
                "metric": metric_name,
                "value": value,
                "is_anomaly": False,
                "note": "Insufficient history (need >= 7 data points)"
            }

        history.append(value)
        return result

# Daily pipeline health check
def run_daily_health_check(spark, features_path: str,
                           detector: TimeSeriesAnomalyDetector) -> list[dict]:
    """Comprehensive health check on daily feature pipeline output.

    The detector must carry history across daily runs (restore it from
    persistent storage before calling). A freshly constructed detector has
    no baseline and would never flag an anomaly.
    """
    features = spark.read.parquet(features_path)
    anomalies = []

    # Check row count trend
    row_count = features.count()
    result = detector.update("daily_row_count", row_count)
    if result["is_anomaly"]:
        anomalies.append(result)

    # Check null rate stability
    for col in ["tx_count_30d", "avg_amount_30d", "session_count_7d"]:
        null_rate = features.filter(F.col(col).isNull()).count() / max(row_count, 1)
        result = detector.update(f"null_rate:{col}", null_rate)
        if result["is_anomaly"]:
            anomalies.append(result)

    # Check mean stability (the session bug would show up here)
    for col in ["tx_count_30d", "session_count_7d"]:
        mean_val = features.agg(F.mean(col)).collect()[0][0] or 0.0
        result = detector.update(f"mean:{col}", float(mean_val))
        if result["is_anomaly"]:
            anomalies.append(result)

    return anomalies
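
A rolling-window detector is only useful if its window survives between daily runs. The sketch below shows one possible way to persist the per-metric history as JSON. The helper names and the file-based storage are assumptions for illustration; a production system might keep the same data in a metrics store or a small table instead.

```python
import json
from collections import deque
from pathlib import Path

def save_history(history: dict, path: str) -> None:
    """Write per-metric rolling windows to a JSON file (hypothetical helper)."""
    serializable = {name: list(values) for name, values in history.items()}
    Path(path).write_text(json.dumps(serializable))

def load_history(path: str, window_size: int = 14) -> dict:
    """Read rolling windows back as bounded deques; empty dict if no file yet."""
    file = Path(path)
    if not file.exists():
        return {}
    raw = json.loads(file.read_text())
    # maxlen keeps only the most recent window_size values per metric
    return {name: deque(values, maxlen=window_size) for name, values in raw.items()}
```

In a daily job, the loaded dictionary would be assigned to the detector's history attribute before the checks run and saved again afterwards, so that the seventh and later runs have a baseline to compare against.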

Incident Response for Data Quality Issues

class DataQualityIncidentPlaybook:
    """Standard operating procedure for data quality incidents."""

    @staticmethod
    def p1_response_steps() -> list[str]:
        """P1: Immediate model degradation affecting users."""
        return [
            "1. IMMEDIATE: Roll back to previous model version or enable rule-based fallback",
            "2. IDENTIFY: Which feature(s) are affected? Check distribution dashboard",
            "3. TRACE: Follow data lineage backward - which upstream service changed?",
            "4. ISOLATE: Use default values for corrupted features instead of computed values",
            "5. FIX: Correct the upstream pipeline issue",
            "6. VALIDATE: Run quality gates on fixed data before re-enabling model",
            "7. RETRAIN: If corrupted features reached model training, retrain from fresh data",
            "8. POST-MORTEM: Add quality gate that would have caught this issue within 1 hour"
        ]

    @staticmethod
    def p2_response_steps() -> list[str]:
        """P2: Data quality issue detected, model still serving but may degrade."""
        return [
            "1. ASSESS: How long has the issue existed? Check git blame for upstream changes",
            "2. FREEZE: Disable automated retraining until root cause is identified",
            "3. IDENTIFY: Which pipeline stage introduced the issue?",
            "4. FIX and VALIDATE: Correct and run full quality gate suite",
            "5. BACKFILL: Reprocess any corrupted feature dates from raw data",
            "6. MONITOR: Watch model metrics for 48h after fix",
            "7. POST-MORTEM: Document, add prevention measures, update data contracts"
        ]
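
The ISOLATE step in the P1 playbook can be sketched as a small override applied at serving time. The function name, the feature dictionary shape, and the choice of a historical median as the default are assumptions for illustration, not a prescribed implementation.

```python
def isolate_corrupted_features(features: dict, corrupted: set, defaults: dict) -> dict:
    """Return a copy of a feature vector with corrupted features replaced by defaults.

    Hypothetical helper: `defaults` would typically hold a known-good
    historical value (for example the long-run median) per feature.
    """
    patched = dict(features)  # never mutate the caller's feature vector
    for name in corrupted:
        if name not in defaults:
            # Refuse to guess: a missing default should be a visible error
            raise KeyError(f"No default registered for corrupted feature {name!r}")
        patched[name] = defaults[name]
    return patched

request_features = {"session_count_7d": 0, "tx_count_30d": 12}
patched = isolate_corrupted_features(
    request_features,
    corrupted={"session_count_7d"},
    defaults={"session_count_7d": 2.3},  # historical median from the incident above
)
```

Substituting a neutral default keeps the model serving on its healthy features while the upstream fix is in progress. It is a stopgap, since the model still loses the real signal for that feature.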

Common Mistakes

:::danger No Quality Gates in ML Pipelines The most expensive mistake: a feature pipeline that computes features and writes them to storage with no validation. When something goes wrong - and it will - you discover it from degraded model metrics, not from a quality alert, days later. Every pipeline stage that produces features must have quality gates that run before the write commits. Fail fast and loudly. :::

:::danger Monitoring Input Data but Not Output Features Many teams validate raw input data (schema of Kafka messages, API responses) but not output features. Input validation catches upstream schema changes. But feature bugs - wrong window computations, incorrect joins, timezone issues, format mismatches - produce structurally valid output that only shows up on distribution checks. Monitor both input and output of every pipeline stage. :::

:::warning Setting Thresholds Once and Never Revisiting Data distributions change over time due to seasonality, product changes, and user base growth. A session count median threshold of 1.5 calibrated when DAU was 1M may be wrong at 50M DAU. Review quality thresholds quarterly or after any significant product change. Automate threshold suggestions by using the p5 and p95 of the rolling 30-day distribution as dynamic thresholds. :::
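
The dynamic-threshold suggestion above can be sketched as follows. This is a minimal illustration that assumes one metric value per day in chronological order; the helper names and the linear-interpolation percentile are choices made here, not part of any particular library.

```python
def percentile(sorted_values: list, q: float) -> float:
    """Linearly interpolated percentile of pre-sorted values, q in [0, 100]."""
    if not sorted_values:
        raise ValueError("percentile of empty list")
    position = (len(sorted_values) - 1) * q / 100
    lower = int(position)
    upper = min(lower + 1, len(sorted_values) - 1)
    fraction = position - lower
    return sorted_values[lower] + (sorted_values[upper] - sorted_values[lower]) * fraction

def suggest_thresholds(daily_values: list, window: int = 30) -> tuple:
    """Suggest (low, high) alert bounds from the p5 and p95 of the last `window` days."""
    recent = sorted(daily_values[-window:])
    return percentile(recent, 5), percentile(recent, 95)

# Example: 30 days of a metric that happened to take the values 1.0 .. 30.0
low, high = suggest_thresholds([float(v) for v in range(1, 31)])
```

Suggested bounds are best treated as a proposal for human review. If they are applied automatically, a slow degradation can drag the thresholds along with it and never trigger an alert.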

:::warning Not Validating Validation Sets Every ML team validates training data. Few validate validation sets. But validation sets are equally susceptible to temporal leakage, data quality issues, and distribution shift. Run the same quality checks on your validation set as your training set. A corrupted validation set produces misleading evaluation metrics that propagate into model selection decisions. :::
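
One check that applies specifically to validation sets is a temporal-split check: no validation row should be dated on or before the end of the training window. The sketch below assumes each row carries an event date and that the split is meant to be strictly chronological; both are assumptions, since the lesson does not specify how the split is made.

```python
from datetime import date

def check_temporal_split(train_dates: list, validation_dates: list) -> dict:
    """Flag validation rows that fall inside the training time range."""
    if not train_dates or not validation_dates:
        return {"passed": False, "reason": "empty split"}
    train_end = max(train_dates)
    overlapping = sum(1 for d in validation_dates if d <= train_end)
    return {
        "passed": overlapping == 0,
        "train_end": train_end,
        "overlapping_rows": overlapping,
    }

clean = check_temporal_split(
    [date(2024, 1, 1), date(2024, 1, 31)],
    [date(2024, 2, 1), date(2024, 2, 5)],
)
leaky = check_temporal_split(
    [date(2024, 1, 1), date(2024, 1, 31)],
    [date(2024, 1, 20), date(2024, 2, 5)],
)
```

The same completeness, range, and distribution checks used for training data can then be run on the validation set unchanged.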

Interview Q&A

Q1: What are the five dimensions of data quality and how do you check each one for ML features?

Completeness: all expected records are present and required fields are populated. Check row counts against daily baselines and null rates per column. Alert when row count drops more than 20% from prior day or null rate exceeds 1% for required fields.

Accuracy: values are within semantically correct ranges. Check min/max ranges and allowed value sets (enum validation).

Consistency: related fields agree. Check cross-field rules (purchases must have positive amounts, end_time must be after start_time), that derived fields are logically consistent with source fields, and that temporal ordering constraints hold.

Timeliness: data is fresh enough for its intended use. Check the maximum timestamp in the dataset against the expected arrival time. For ML features that must be under 1 hour stale, alert when the maximum event timestamp is more than 1 hour behind current time.

Validity: data conforms to the defined schema and format. Check data types, string formats (UUID format for IDs, ISO 8601 for timestamps), and referential integrity.
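
Validity is the one dimension without a code example earlier in this lesson, so here is a minimal sketch using only the Python standard library. The field names, the validator registry, and the row-dictionary input are assumptions for illustration; in a Spark pipeline the same rules would more likely be expressed as column expressions.

```python
import uuid
from datetime import datetime

def is_canonical_uuid(value) -> bool:
    """True if value is a UUID string in canonical hyphenated form."""
    if not isinstance(value, str):
        return False
    try:
        # Round-trip so that brace-wrapped or unhyphenated variants are rejected
        return str(uuid.UUID(value)) == value.lower()
    except ValueError:
        return False

def is_iso8601(value) -> bool:
    """True if datetime.fromisoformat accepts the string.

    Note: the accepted subset of ISO 8601 depends on the Python version.
    """
    if not isinstance(value, str):
        return False
    try:
        datetime.fromisoformat(value)
        return True
    except ValueError:
        return False

def invalid_rates(rows: list, validators: dict) -> dict:
    """Fraction of rows failing each field's validator."""
    total = max(len(rows), 1)
    return {
        field: sum(1 for row in rows if not check(row.get(field))) / total
        for field, check in validators.items()
    }

rows = [
    {"session_id": "123e4567-e89b-12d3-a456-426614174000",
     "event_timestamp": "2024-05-01T12:30:00+00:00"},
    {"session_id": "sess-000123", "event_timestamp": "yesterday"},
]
rates = invalid_rates(rows, {"session_id": is_canonical_uuid,
                             "event_timestamp": is_iso8601})
```

A format check like this on session_id is the kind of rule that flags a changed ID format on the first batch that contains it.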

Q2: What is the Population Stability Index and how do you use it to detect training-serving skew?

PSI measures distribution shift between a reference (training) distribution and a current (serving) distribution. The formula: $PSI = \sum_i (P_i - Q_i) \times \ln(P_i / Q_i)$ where bins are created using equal-frequency binning on the reference data.

Interpretation: PSI below 0.1 = no significant change; 0.1 to 0.2 = small shift worth monitoring; above 0.2 = significant shift likely impacting model performance.

Production monitoring workflow: (1) store a 1% sample of features used for every serving request; (2) run daily PSI computation comparing the last 7 days of serving features against the training distribution; (3) emit PSI per feature as a metric to your observability system; (4) alert when PSI exceeds 0.2 for any top-10 most important feature. When multiple features drift simultaneously, suspect an upstream pipeline change rather than genuine distribution shift - and investigate data lineage.

Q3: What is a data contract and how does it prevent ML pipeline failures?

A data contract is a formal schema-and-quality agreement between a data producer (application service) and its consumers (ML feature pipelines). It specifies what fields exist, their types and constraints, nullability, and quality SLAs like completeness and latency.

Without contracts: an application team deploys a new service version that changes a field format - say, session_id changes from uuid-v4 to a new format. The ML feature pipeline's JOIN ON session_id silently stops matching. Features become zero. The model degrades. The application team never knew they broke anything.

With contracts: the contract specifies the session_id format. An automated validator runs on every batch of incoming data. When the new-format session IDs fail the format check, an alert fires on the first bad batch, the application team is notified the same day, and the ML team can update the join logic before corrupted features reach the model. If the producer also checks its changes against the contract in its own CI, the break is caught before deployment.

Contracts are stored in schema registries (Confluent Schema Registry for Kafka, custom registries for batch pipelines) and validated automatically by the consuming pipeline on every run.
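
One way a producer could catch a breaking change before deployment is to diff the proposed contract schema against the registered one. The sketch below assumes schemas shaped like the dictionaries used in the DataContract example (type, nullable, format, allowed); the function name and the specific rules for what counts as breaking are assumptions for illustration.

```python
def find_breaking_changes(old_schema: dict, new_schema: dict) -> list[str]:
    """List changes in new_schema that could break consumers of old_schema."""
    problems = []
    for name, old_spec in old_schema.items():
        new_spec = new_schema.get(name)
        if new_spec is None:
            problems.append(f"{name}: field removed")
            continue
        for key in ("type", "format"):
            if old_spec.get(key) != new_spec.get(key):
                problems.append(
                    f"{name}: {key} changed from {old_spec.get(key)!r} to {new_spec.get(key)!r}"
                )
        # A field that was guaranteed non-null must stay non-null
        if not old_spec.get("nullable", True) and new_spec.get("nullable", True):
            problems.append(f"{name}: became nullable")
        # Consumers may not handle values outside the old allowed set
        old_allowed, new_allowed = old_spec.get("allowed"), new_spec.get("allowed")
        if old_allowed is not None and (
            new_allowed is None or not set(new_allowed) <= set(old_allowed)
        ):
            problems.append(f"{name}: allowed values widened")
    return problems

registered = {"session_id": {"type": "string", "nullable": False, "format": "UUID"}}
proposed = {"session_id": {"type": "string", "nullable": False, "format": "prefixed-hex"}}
changes = find_breaking_changes(registered, proposed)
```

Run in the producer's CI, a non-empty result would block the release or at least require sign-off from the listed consumers.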

Q4: How do you build a quality gate that prevents a model from training on bad data?

A quality gate is a pre-training validation checkpoint that halts the pipeline if data quality is below threshold. Build it in layers:

Layer 1 - Freshness: check that the training dataset was generated within the expected freshness window. If the daily feature job failed silently 2 days ago, training on stale features is worse than skipping training.

Layer 2 - Row count: check that the training dataset has a reasonable number of rows compared to historical training runs (within 20%). A sudden 50% drop in training data suggests a pipeline failure.

Layer 3 - Feature completeness: null rates for required features must be below threshold. Use Great Expectations expect_column_values_to_not_be_null(mostly=0.99).

Layer 4 - Distribution check: PSI for key features must be under 0.2 compared to the prior training dataset. This catches the case where data is complete and structurally valid but statistically wrong.

Layer 5 - Post-training validation: before deploying, compare the new model's validation metrics against the previous production model. If the new model is worse by more than X%, halt deployment and keep the current model running.

If any layer fails: send an alert with the specific violation, do not proceed to the next pipeline stage, keep the current production model serving, and create an incident for investigation.
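
The layered structure above can be sketched as an ordered list of checks that stops at the first failure. Everything named here (the runner, the result type, the 2% tolerance standing in for "X%") is a hypothetical illustration; the individual checks are stubbed with fixed results where a real pipeline would call its freshness, row count, completeness, and PSI logic.

```python
from typing import Callable, NamedTuple

class LayerResult(NamedTuple):
    name: str
    passed: bool
    detail: str

Check = Callable[[], tuple[bool, str]]

def run_quality_layers(layers: list[tuple[str, Check]]) -> list[LayerResult]:
    """Run checks in order and stop at the first failure."""
    results = []
    for name, check in layers:
        passed, detail = check()
        results.append(LayerResult(name, passed, detail))
        if not passed:
            break  # later layers are skipped; the current model keeps serving
    return results

def model_not_worse(new_metric: float, prod_metric: float,
                    max_relative_drop: float = 0.02) -> tuple[bool, str]:
    """Layer 5 for a higher-is-better metric; the tolerance is a placeholder."""
    floor = prod_metric * (1 - max_relative_drop)
    return new_metric >= floor, f"new={new_metric:.4f}, floor={floor:.4f}"

# Stubbed example run: the distribution layer fails, so layer 5 never executes
layers = [
    ("freshness", lambda: (True, "generated 3h ago")),
    ("row_count", lambda: (True, "within 20% of last run")),
    ("completeness", lambda: (True, "max null rate 0.2%")),
    ("distribution", lambda: (False, "PSI(session_count_7d)=0.41")),
    ("post_training", lambda: model_not_worse(0.81, 0.80)),
]
results = run_quality_layers(layers)
should_proceed = len(results) == len(layers) and all(r.passed for r in results)
```

Ordering the cheap checks first means an obviously stale or truncated dataset is rejected before any expensive distribution comparison or training run starts.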

Q5: How do you detect when the features served by a production model have drifted from the features it was trained on?

This requires a three-part monitoring setup:

First, log serving features: at model serving time, sample 1% of all inference requests and log the feature values used. Store these logs in a time-series table (S3 + Parquet, partitioned by day).

Second, run drift detection: daily job computes PSI for each feature by comparing the last 7 days of serving logs against the training dataset's feature distribution. Emit PSI metrics to your observability system (Prometheus/Datadog/CloudWatch) with feature name as a dimension.

Third, alert and investigate: alert when PSI exceeds 0.2 for any feature in the model's top-10 by importance, and treat values between 0.1 and 0.2 as a warning to watch. When alerted, distinguish between: (a) genuine distribution shift - gradual drift in user behavior or market conditions, suggests retraining; (b) pipeline failure - sudden large shift in multiple features simultaneously, suggests upstream data issue, requires pipeline investigation before retraining; (c) preprocessing inconsistency - features drift immediately after model deployment, suggests the serving preprocessing logic differs from training, the worst case.

The key diagnostic: plot PSI over time per feature. Gradual drift looks like a slow upward trend. Pipeline failures look like a step change. Preprocessing inconsistencies look like an immediate jump at deployment time.
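
The visual diagnostic above can be approximated with a simple heuristic on the daily PSI series for one feature. The rule used here (a single day-over-day jump accounting for at least 60% of the total range counts as a step) is an assumption chosen for illustration, not an established standard, and it should be tuned against real incident history.

```python
def classify_psi_trend(psi_series: list, alert_threshold: float = 0.2,
                       step_fraction: float = 0.6) -> str:
    """Label a daily PSI series as stable, gradual_drift, or step_change."""
    if len(psi_series) < 2:
        return "insufficient_data"
    if max(psi_series) < alert_threshold:
        return "stable"

    span = max(psi_series) - min(psi_series)
    largest_jump = max(b - a for a, b in zip(psi_series, psi_series[1:]))

    # One jump that explains most of the range looks like a step, not a trend
    if span > 0 and largest_jump >= step_fraction * span:
        return "step_change"
    return "gradual_drift"

gradual = classify_psi_trend([0.02, 0.05, 0.08, 0.12, 0.16, 0.21])
step = classify_psi_trend([0.02, 0.03, 0.02, 0.45, 0.46, 0.44])
stable = classify_psi_trend([0.01, 0.02, 0.03])
```

A step change by itself does not say whether the cause is a pipeline failure or a preprocessing inconsistency. Comparing the date of the step with the model deployment date separates the two.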

Summary

Data quality is the discipline that determines whether ML systems are reliable in production. Schema validation catches structural problems immediately. Distribution monitoring detects semantic drift over time. Data contracts prevent silent breakage from upstream changes. Quality gates prevent bad data from reaching model training or serving.

The production principle: every data pipeline that produces ML features should validate its output before committing the write. A pipeline that fails loudly on bad data is infinitely more valuable than one that silently produces corrupt features.

The habit to build: add at least three quality checks to every feature pipeline you write - a row count check, a null rate check on key columns, and a median/mean check on the most important numerical features. Those three checks would have prevented the session_count_7d incident that opened this lesson.

:::tip Key Takeaway The highest-ROI investment in an ML platform is data quality monitoring, not model architecture research. A 10-hour investment in Great Expectations expectations and PSI monitoring will prevent more production incidents than a week of hyperparameter tuning. Instrument your data pipelines as thoroughly as you instrument your software services - because they are software services that happen to produce data instead of API responses. :::

© 2026 EngineersOfAI. All rights reserved.