
Data Contracts

Two Weeks of Silent Degradation

The on-call engineer gets a PagerDuty alert at 11 PM: "recommendation CTR dropped 8% vs 7-day average." She opens the dashboard. The model's prediction scores are still within normal range. The feature distributions look slightly different but nothing obviously wrong.

The next morning, the ML team digs deeper. They find that two weeks earlier, the data platform team updated the user segmentation logic: what was previously segment=3 (budget-conscious shoppers) was split into segment=3a (budget-urban) and segment=3b (budget-suburban). The ML team was never notified. The model's feature pipeline still produced integer segment codes, but their semantics had changed: segment 3 now captured only urban budget shoppers, while the new segment 3b had no mapping in the pipeline and defaulted to 0.

The model had been silently making predictions on corrupted features for 14 days before anyone noticed. The CTR drop was real. The model needed retraining on data with the new segmentation.

The schema of the column user_segment had not changed - it was still an integer. The column's meaning had changed. A validator that checks only column types would not have caught this. A data contract would have.
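To make the distinction concrete, here is a minimal sketch in plain Python (no validation library). The segment counts are made up for illustration, and the sketch assumes the "3b" users were mapped to the default code 0 as described above: the type check passes on both snapshots, a value-set check flags the 0s, and the segment shares show segment 3 shrinking.

```python
from collections import Counter

# Hypothetical before/after snapshots of user_segment (illustrative numbers only)
before = [1] * 20 + [2] * 20 + [3] * 30 + [4] * 20 + [5] * 10
# After the upstream change: "3a" still maps to 3, "3b" falls through to the default 0
after = [1] * 20 + [2] * 20 + [3] * 12 + [0] * 18 + [4] * 20 + [5] * 10


def type_check(values):
    """Schema-style check: every value is an integer."""
    return all(isinstance(v, int) for v in values)


def value_set_check(values, allowed=frozenset({1, 2, 3, 4, 5})):
    """Value-set check: every value is a documented segment code."""
    return set(values) <= allowed


def segment_shares(values):
    """Fraction of rows per segment code."""
    counts = Counter(values)
    return {code: counts[code] / len(values) for code in sorted(counts)}


print(type_check(before), type_check(after))            # True True
print(value_set_check(before), value_set_check(after))  # True False
print(segment_shares(after)[3])                         # 0.12
```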



What Is a Data Contract

A data contract is a formal, enforceable agreement between a data producer (the team that creates or updates a dataset) and a data consumer (the team that uses it, typically for ML training or serving). It specifies:

  1. Schema contract: column names, types, nullability, value ranges
  2. Statistical contract: distributions, means, standard deviations, cardinalities
  3. Semantic contract: the meaning of values - what does segment=3 represent?
  4. SLA contract: freshness (when will data be updated?), completeness (what fraction of records is expected?)
  5. Versioning contract: when the contract changes, how will consumers be notified?

A contract is not documentation. Documentation can be ignored. A contract is code that fails the data pipeline if violated - creating an automatic notification to both producer and consumer.
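The five parts above can live in one object that the pipeline calls. The sketch below is hypothetical (DataContractSpec, ContractViolation, and the list-of-dicts input are illustrative names, not a library API); the point is that enforce() raises, so a violation stops the pipeline step instead of leaving a log line nobody reads.

```python
from dataclasses import dataclass, field


class ContractViolation(Exception):
    """Raised so the pipeline step fails loudly instead of logging and moving on."""


@dataclass(frozen=True)
class DataContractSpec:
    """Hypothetical contract object: one field group per contract part."""
    name: str
    version: str                                     # 5. versioning
    columns: dict                                    # 1. schema: column -> Python type
    mean_bounds: dict = field(default_factory=dict)  # 2. statistical: column -> (lo, hi)
    semantics: dict = field(default_factory=dict)    # 3. semantic: column -> {code: meaning}
    max_staleness_hours: float = 26.0                # 4. SLA: freshness

    def check(self, rows: list, data_age_hours: float) -> list:
        """Return a list of human-readable problems (empty if the contract holds)."""
        problems = []
        for col, typ in self.columns.items():
            if any(not isinstance(r.get(col), typ) for r in rows):
                problems.append(f"{col}: missing or not {typ.__name__}")
        for col, (lo, hi) in self.mean_bounds.items():
            vals = [r[col] for r in rows if r.get(col) is not None]
            mean = sum(vals) / len(vals) if vals else float("nan")
            if not lo <= mean <= hi:  # NaN (no values at all) also fails
                problems.append(f"{col}: mean {mean:.2f} outside [{lo}, {hi}]")
        for col, meanings in self.semantics.items():
            unknown = {r.get(col) for r in rows} - set(meanings)
            if unknown:
                problems.append(
                    f"{col}: values with no documented meaning {sorted(unknown, key=repr)}"
                )
        if data_age_hours > self.max_staleness_hours:
            problems.append(
                f"data is {data_age_hours:.1f}h old (max {self.max_staleness_hours}h)"
            )
        return problems

    def enforce(self, rows: list, data_age_hours: float) -> None:
        """Raise ContractViolation listing every broken clause."""
        problems = self.check(rows, data_age_hours)
        if problems:
            raise ContractViolation(f"{self.name} v{self.version}: " + "; ".join(problems))


contract = DataContractSpec(
    name="user_behavioral_features",
    version="3.1",
    columns={"user_id": str, "user_segment": int},
    semantics={"user_segment": {1: "price-sensitive", 2: "convenience",
                                3: "brand-loyal", 4: "frequent", 5: "VIP"}},
)
rows = [{"user_id": "u1", "user_segment": 3}, {"user_id": "u2", "user_segment": 0}]
try:
    contract.enforce(rows, data_age_hours=3.0)
except ContractViolation as err:
    print(err)
```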


Why Contracts Fail Without Code

Most data teams try to enforce contracts through documentation and communication. This fails for predictable reasons:

  • Documentation goes stale when the pipeline changes
  • Slack messages get missed, especially across team boundaries
  • Schema-only validation misses semantic changes (the segment=3 problem)
  • Manual validation requires an engineer to remember to run it
  • Violations are discovered weeks later when a model degrades, not when the bad data is written

Encoded contracts - checked automatically in CI - solve all of these.


Contract Architecture
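One way to picture the architecture is as an ordered chain of layers, cheapest first: schema, then statistical, SLA, and semantic checks. A minimal sketch, assuming each layer is a function that returns a list of violation strings (run_contract_layers and the two toy layers are hypothetical): the runner stops at the first failing layer, since distribution statistics computed on a structurally broken batch are mostly noise. The same runner could be imported by the producer's post-write step and by the consumer's pre-read step.

```python
from typing import Callable, Dict, List

# A layer takes a batch (here: a list of row dicts) and returns violation strings
Layer = Callable[[list], List[str]]


def run_contract_layers(batch: list, layers: Dict[str, Layer]) -> Dict[str, List[str]]:
    """Run layers in insertion order and stop at the first layer that fails."""
    report = {}
    for name, check in layers.items():
        violations = check(batch)
        report[name] = violations
        if violations:
            break  # later layers would be evaluating a broken batch
    return report


def schema_layer(batch):
    """Toy schema layer: every row needs a non-empty user_id."""
    return [f"row {i}: missing user_id" for i, r in enumerate(batch) if not r.get("user_id")]


def statistical_layer(batch):
    """Toy completeness layer: require at least two rows."""
    n = len(batch)
    return [] if n >= 2 else [f"row count {n} below minimum 2"]


layers = {"schema": schema_layer, "statistical": statistical_layer}
print(run_contract_layers([{"user_id": "a"}, {"user_id": ""}], layers))
# {'schema': ['row 1: missing user_id']}
```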


Schema Contracts with Pandera

Pandera is a Python library for DataFrame schema validation. It is one of the most ergonomic options for schema contracts on pandas and Spark DataFrames.

```bash
pip install pandera
```

Basic Schema Contract

```python
import pandas as pd
import pandera as pa
from pandera.typing import DataFrame, Series


# Define the contract as a Python class
class UserFeaturesSchema(pa.DataFrameModel):
    """
    Contract for the user_behavioral_features dataset.

    This schema is co-owned by:
    - Producer: data-platform team ([email protected])
    - Consumer: ML team ([email protected])

    Version: 3.1
    Last updated: 2024-10-01
    Change log: Added user_ltv_90d column. Deprecated user_segment_v2.
    """

    user_id: Series[str] = pa.Field(
        nullable=False,
        unique=True,
        description="Unique user identifier (UUID format)",
    )

    # NOTE: numpy int64 cannot hold nulls. If this column ever contains nulls,
    # pandas loads it as float64 and the int dtype check fails (coerce=False).
    session_count_30d: Series[int] = pa.Field(
        ge=0,
        le=10_000,  # sanity bound - more than 10k sessions is suspicious
        nullable=True,
        description="Number of sessions in the past 30 days",
    )

    purchase_amount_90d: Series[float] = pa.Field(
        ge=0.0,
        le=1_000_000.0,
        nullable=True,
        description="Total purchase amount in USD, past 90 days",
    )

    user_segment: Series[int] = pa.Field(
        isin=[1, 2, 3, 4, 5],  # SEMANTIC CONTRACT: valid segment values
        nullable=False,
        description=(
            "User segment (1=price-sensitive, 2=convenience, 3=brand-loyal, "
            "4=frequent, 5=VIP). "
            "NOTE: Do not add new values without updating all ML consumers."
        ),
    )

    last_active_date: Series[pa.DateTime] = pa.Field(
        nullable=True,
        description="Date of most recent session",
    )

    user_ltv_90d: Series[float] = pa.Field(
        ge=0.0,
        nullable=True,
        description="Predicted LTV over next 90 days (model score)",
    )

    class Config:
        strict = True   # fail on any unknown columns
        coerce = False  # do not silently coerce types - fail instead
        name = "UserFeaturesSchema"
        ordered = False

    @pa.dataframe_check
    def no_future_dates(cls, df: pd.DataFrame) -> bool:
        """last_active_date must not be in the future."""
        # Compare with the current time, not midnight: sessions earlier today are valid
        now = pd.Timestamp.now()
        valid = df["last_active_date"].dropna() <= now
        return bool(valid.all())

    @pa.dataframe_check
    def segment_distribution_not_dominated(cls, df: pd.DataFrame) -> bool:
        """No single segment should represent more than 60% of users."""
        counts = df["user_segment"].value_counts(normalize=True)
        return bool((counts <= 0.60).all())
```

Applying the Contract

```python
@pa.check_types
def load_and_validate_features(path: str) -> DataFrame[UserFeaturesSchema]:
    """
    Load user features and validate against the contract.
    Raises SchemaError if contract is violated.
    """
    df = pd.read_parquet(path)
    return df  # @pa.check_types validates the return value against the schema


# Usage in training pipeline
try:
    df = load_and_validate_features("s3://ml-data/user_features/v2024q3_002/")
    print(f"Contract passed. Loaded {len(df):,} records.")
except pa.errors.SchemaError as e:
    print(f"CONTRACT VIOLATION: {e}")
    raise  # re-raise to block the pipeline
```

Pandera for Spark (PySpark)

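```python
import pandera.pyspark as pa_spark
from pandera.pyspark import Column, DataFrameSchema
from pyspark.sql.types import IntegerType, StringType

spark_schema = DataFrameSchema(
    columns={
        "user_id": Column(StringType(), nullable=False),
        "session_count_30d": Column(
            IntegerType(),
            checks=[
                pa_spark.Check.ge(0),
                pa_spark.Check.le(10_000),
            ],
            nullable=True,
        ),
        "user_segment": Column(
            IntegerType(),
            checks=pa_spark.Check.isin([1, 2, 3, 4, 5]),
            nullable=False,
        ),
    },
    strict=True,
)

# Validate a Spark DataFrame (df_spark is assumed to be loaded already).
# Unlike the pandas API, the PySpark backend does not raise: it returns the
# DataFrame with a report of all failures attached as `.pandera.errors`.
df_validated = spark_schema.validate(check_obj=df_spark)
errors = df_validated.pandera.errors  # nested: {"SCHEMA"/"DATA": {reason: [details]}}
if errors:
    n_errors = 0
    for category, reasons in errors.items():
        for reason, details in reasons.items():
            for detail in details:
                n_errors += 1
                print(f"VIOLATION [{category}/{reason}]: "
                      f"Column={detail.get('column')}, Check={detail.get('check')}")
    raise ValueError(f"Contract violated: {n_errors} errors")
```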

Statistical Contracts with Great Expectations

Great Expectations (GX) specializes in statistical contracts - checking that the data's distribution matches expectations, not just its schema.

```bash
pip install great_expectations
```

Defining a Statistical Contract

```python
import great_expectations as gx
import pandas as pd

# Initialize an in-memory (ephemeral) Great Expectations data context
context = gx.get_context(mode="ephemeral")

# The suite is the contract: a named collection of expectations
suite = context.suites.add(gx.ExpectationSuite(name="user_features_v3_contract"))

# Register a pandas DataFrame data source; the DataFrame itself is supplied at run time
data_source = context.data_sources.add_pandas(name="ml_data")
data_asset = data_source.add_dataframe_asset(name="user_features")
batch_definition = data_asset.add_batch_definition_whole_dataframe("user_features_batch")

# Define statistical expectations
expectations = [
    # --- Schema expectations ---
    gx.expectations.ExpectTableColumnsToMatchSet(
        column_set=["user_id", "session_count_30d", "purchase_amount_90d",
                    "user_segment", "last_active_date", "user_ltv_90d"],
        exact_match=True,
    ),
    gx.expectations.ExpectColumnValuesToNotBeNull(column="user_id"),
    gx.expectations.ExpectColumnValuesToBeUnique(column="user_id"),

    # --- Type expectations ---
    gx.expectations.ExpectColumnValuesToBeOfType(
        column="user_segment", type_="int64"
    ),

    # --- Value set expectations ---
    gx.expectations.ExpectColumnDistinctValuesToBeInSet(
        column="user_segment",
        value_set=[1, 2, 3, 4, 5],
    ),

    # --- Statistical expectations ---
    gx.expectations.ExpectColumnMeanToBeBetween(
        column="session_count_30d",
        min_value=2.0,
        max_value=50.0,
        notes="Historical mean is ~12. Bounds are deliberately wide to catch gross breakage only.",
    ),
    gx.expectations.ExpectColumnQuantileValuesToBeBetween(
        column="purchase_amount_90d",
        quantile_ranges={
            "quantiles": [0.25, 0.50, 0.75, 0.90, 0.99],
            "value_ranges": [
                [0, 100],
                [0, 500],
                [50, 2000],
                [100, 5000],
                [500, 100000],
            ],
        },
        notes="Distribution bounds from Q3 2024 baseline. Update if business model changes.",
    ),
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="purchase_amount_90d",
        min_value=0.0,
        max_value=1_000_000.0,
    ),

    # --- Completeness expectations ---
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=10_000_000,
        max_value=25_000_000,
        notes="Expect ~15M users. Alert if the count falls outside 10M-25M.",
    ),
    gx.expectations.ExpectColumnValuesToNotBeNull(
        column="user_segment",
        mostly=0.99,  # 99% non-null - allow 1% for new users
    ),
]

for expectation in expectations:
    suite.add_expectation(expectation)

# A validation definition binds the data (batch definition) to the contract (suite)
validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="user_features_validation",
        data=batch_definition,
        suite=suite,
    )
)

# Load the batch to validate (reading s3:// paths with pandas requires s3fs)
df = pd.read_parquet("s3://ml-data/user_features/")
validation_result = validation_definition.run(batch_parameters={"dataframe": df})

if not validation_result.success:
    failed = [r for r in validation_result.results if not r.success]
    print(f"CONTRACT VIOLATED: {len(failed)} expectations failed")
    for r in failed:
        print(f"  FAIL: {r.expectation_config.type} - {r.result}")
    raise ValueError("Data contract validation failed - blocking downstream pipeline")

print(f"Contract passed: {len(validation_result.results)} checks, "
      f"{validation_result.statistics['successful_expectations']} passed")
```
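To see what a quantile-range expectation actually asserts, here is a stdlib-only sketch of the same idea using the bounds from the suite above. It uses linear-interpolation quantiles (numpy's default convention); GX computes quantiles with its own, backend-dependent method, so results near a bound may differ. The "cents" case is a hypothetical unit change: the column is still numeric, so type checks pass, but every quantile moves out of range.

```python
from typing import List, Sequence, Tuple


def quantile(sorted_vals: Sequence[float], q: float) -> float:
    """Linear-interpolation quantile of an already sorted, non-empty sequence."""
    pos = q * (len(sorted_vals) - 1)
    lo = int(pos)
    hi = min(lo + 1, len(sorted_vals) - 1)
    return sorted_vals[lo] + (sorted_vals[hi] - sorted_vals[lo]) * (pos - lo)


def check_quantile_ranges(values, quantiles, value_ranges) -> List[Tuple[float, float, tuple]]:
    """Return (quantile, observed, (lo, hi)) for every quantile outside its range."""
    vals = sorted(v for v in values if v is not None)
    if not vals:
        raise ValueError("no non-null values to check")
    failures = []
    for q, (lo, hi) in zip(quantiles, value_ranges):
        observed = quantile(vals, q)
        if not lo <= observed <= hi:
            failures.append((q, observed, (lo, hi)))
    return failures


QUANTILES = [0.25, 0.50, 0.75, 0.90, 0.99]
RANGES = [(0, 100), (0, 500), (50, 2000), (100, 5000), (500, 100000)]

# Hypothetical purchase_amount_90d samples in USD, then the same data in cents
dollars = [20] * 30 + [150] * 30 + [800] * 30 + [3000] * 9 + [20000]
cents = [v * 100 for v in dollars]

print(check_quantile_ranges(dollars, QUANTILES, RANGES))     # []
print(len(check_quantile_ranges(cents, QUANTILES, RANGES)))  # 5
```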

SLA Contracts: Freshness and Completeness

Schema and statistical contracts check what the data contains. SLA contracts check when the data arrives and whether it is complete.

```python
from dataclasses import dataclass
from datetime import datetime, timezone

import boto3


@dataclass
class DataSLAContract:
    """SLA contract for a dataset."""
    dataset_name: str
    expected_update_frequency: str  # "daily", "hourly", "weekly"
    max_staleness_hours: float      # alert if data is older than this
    min_completeness: float         # min fraction of expected records
    expected_record_count: int      # rough expected count (±30%)


def check_data_sla(
    table_path: str,
    contract: DataSLAContract,
    spark=None,
) -> dict:
    """Check SLA contract for a dataset."""
    violations = []
    results = {}

    # 1. Freshness check
    s3 = boto3.client("s3")
    bucket, prefix = table_path.replace("s3://", "").split("/", 1)

    # Find the most recent file. list_objects_v2 returns at most 1,000 keys
    # per call, so paginate to see every object under the prefix.
    paginator = s3.get_paginator("list_objects_v2")
    objects = [
        obj
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
        for obj in page.get("Contents", [])
    ]
    if not objects:
        violations.append("NO_DATA: No files found at dataset path")
        return {"violations": violations, "passed": False}

    latest_modified = max(obj["LastModified"] for obj in objects)
    # LastModified is timezone-aware (UTC), so compare with an aware "now"
    age_hours = (datetime.now(timezone.utc) - latest_modified).total_seconds() / 3600

    results["data_age_hours"] = age_hours
    results["last_modified"] = latest_modified.isoformat()

    if age_hours > contract.max_staleness_hours:
        violations.append(
            f"STALE_DATA: Data is {age_hours:.1f} hours old, "
            f"max allowed is {contract.max_staleness_hours} hours"
        )

    # 2. Record count / completeness check (only when a SparkSession is provided)
    if spark:
        actual_count = spark.read.parquet(table_path).count()
        results["actual_record_count"] = actual_count
        completeness = actual_count / contract.expected_record_count
        results["completeness"] = completeness

        if completeness < contract.min_completeness:
            violations.append(
                f"LOW_COMPLETENESS: {actual_count:,} records found, "
                f"expected ~{contract.expected_record_count:,} "
                f"({100*completeness:.1f}% < {100*contract.min_completeness:.0f}% threshold)"
            )

    results["violations"] = violations
    results["passed"] = len(violations) == 0
    return results


# Define and check SLA (assumes an active SparkSession named `spark` and a
# send_alert() helper defined elsewhere in your codebase)
sla = DataSLAContract(
    dataset_name="user_behavioral_features",
    expected_update_frequency="daily",
    max_staleness_hours=26.0,  # daily update + 2 hour buffer
    min_completeness=0.85,     # at least 85% of expected records
    expected_record_count=15_000_000,
)

result = check_data_sla("s3://ml-data/user_features/latest/", sla, spark=spark)
if not result["passed"]:
    send_alert(contract=sla, violations=result["violations"])
    raise ValueError(f"SLA violation: {result['violations']}")
```
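The threshold logic in check_data_sla is easier to unit-test once it is separated from the S3 and Spark I/O. A sketch, assuming a hypothetical evaluate_sla() helper that takes the already-fetched modification time and record count and applies the same two rules (freshness, then completeness):

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional


def evaluate_sla(
    last_modified: datetime,
    now: datetime,
    record_count: Optional[int],
    max_staleness_hours: float,
    expected_record_count: int,
    min_completeness: float,
) -> List[str]:
    """Apply the freshness and completeness rules to already-fetched facts."""
    violations = []
    age_hours = (now - last_modified).total_seconds() / 3600
    if age_hours > max_staleness_hours:
        violations.append(f"STALE_DATA: {age_hours:.1f}h old > {max_staleness_hours}h")
    if record_count is not None:  # None means the count was not computed
        completeness = record_count / expected_record_count
        if completeness < min_completeness:
            violations.append(
                f"LOW_COMPLETENESS: {completeness:.0%} < {min_completeness:.0%}"
            )
    return violations


now = datetime(2024, 10, 2, 8, 0, tzinfo=timezone.utc)
print(evaluate_sla(now - timedelta(hours=30), now, 12_000_000, 26.0, 15_000_000, 0.85))
# ['STALE_DATA: 30.0h old > 26.0h', 'LOW_COMPLETENESS: 80% < 85%']
```

Because `now` is a parameter rather than read from the clock, tests can pin it and check both rules deterministically.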

Semantic Contracts: Catching the Hardest Bugs

The segment=3 problem from our opening scenario is a semantic change - the column type is still valid, but the meaning changed. Catching this requires defining semantic invariants.

```python
import numpy as np
import pandas as pd


class SemanticContract:
    """
    Semantic invariants that must hold across data versions.
    Detects meaning changes even when schema is unchanged.
    """

    def __init__(self, baseline_stats: dict):
        """
        baseline_stats: statistical properties of a known-good dataset version.
        Generated by: SemanticContract.compute_baseline(df_baseline)
        """
        self.baseline = baseline_stats

    @classmethod
    def compute_baseline(cls, df: pd.DataFrame, categorical_cols=None) -> dict:
        """Compute semantic baseline statistics from a known-good dataset.

        categorical_cols: columns to treat as categories even though they are
        stored as integers (e.g. user_segment). Without this, integer codes get
        a mean/std summary, which says little about what the codes mean.
        """
        forced = list(categorical_cols or [])
        object_cols = [
            c for c in df.select_dtypes(include="object").columns if c not in forced
        ]
        categorical = forced + object_cols
        numeric_cols = [
            c for c in df.select_dtypes(include="number").columns if c not in categorical
        ]

        stats = {}
        for col in numeric_cols:
            stats[col] = {
                "mean": df[col].mean(),
                "std": df[col].std(),
                "median": df[col].median(),
                "q10": df[col].quantile(0.1),
                "q90": df[col].quantile(0.9),
            }

        for col in categorical:
            # Store the value distribution
            stats[col] = {
                "value_frequencies": df[col].value_counts(normalize=True).to_dict(),
                "n_unique": df[col].nunique(),
            }

        return stats

    def check_distribution_drift(
        self,
        df_new: pd.DataFrame,
        numeric_z_threshold: float = 3.0,
        categorical_psi_threshold: float = 0.2,
    ) -> list:
        """
        Check for semantic drift using PSI (Population Stability Index)
        for categorical columns and a z-score for numeric columns.
        PSI < 0.1: no change; 0.1-0.2: slight change; > 0.2: significant change
        """
        violations = []

        for col, baseline in self.baseline.items():
            if col not in df_new.columns:
                violations.append({"col": col, "type": "MISSING_COLUMN"})
                continue

            if "mean" in baseline:
                # Numeric column: coarse heuristic - the shift of the mean measured
                # in units of the baseline std (not a standard error)
                new_mean = df_new[col].mean()
                z_score = abs(new_mean - baseline["mean"]) / (baseline["std"] + 1e-10)
                if z_score > numeric_z_threshold:
                    violations.append({
                        "col": col,
                        "type": "DISTRIBUTION_DRIFT",
                        "metric": "mean_z_score",
                        "value": z_score,
                        "threshold": numeric_z_threshold,
                        "baseline_mean": baseline["mean"],
                        "current_mean": new_mean,
                    })

            elif "value_frequencies" in baseline:
                # Categorical column: PSI
                psi = compute_psi(
                    baseline["value_frequencies"],
                    df_new[col].value_counts(normalize=True).to_dict(),
                )
                if psi > categorical_psi_threshold:
                    violations.append({
                        "col": col,
                        "type": "SEMANTIC_DRIFT",
                        "metric": "PSI",
                        "value": psi,
                        "threshold": categorical_psi_threshold,
                        "note": (
                            "Category distribution shifted significantly. "
                            "Check if category MEANINGS have changed, "
                            "not just their frequencies."
                        ),
                    })

        return violations


def compute_psi(expected: dict, actual: dict, epsilon: float = 1e-10) -> float:
    """Population Stability Index between two categorical distributions."""
    all_categories = set(expected.keys()) | set(actual.keys())
    psi = 0.0
    for cat in all_categories:
        # Categories missing on one side get a tiny floor, so a brand-new
        # category produces a very large PSI contribution
        e = expected.get(cat, epsilon)
        a = actual.get(cat, epsilon)
        psi += (a - e) * np.log(a / e)
    return psi
```
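To see why PSI reacts to the segment change, here is a stdlib re-statement of compute_psi (same formula, same 1e-10 epsilon) applied to hypothetical segment shares. Case 1 assumes the split-off users were folded into another existing code, so every value is still in {1..5} and a value-set check passes; Case 2 assumes they defaulted to an unseen code 0. Note the epsilon floor: any brand-new category contributes roughly a × ln(a / 1e-10), so Case 2 is flagged mostly because something new appeared. You may prefer a larger floor if you want the magnitude to stay interpretable.

```python
import math


def psi(expected: dict, actual: dict, epsilon: float = 1e-10) -> float:
    """Same formula as compute_psi: sum over categories of (a - e) * ln(a / e)."""
    total = 0.0
    for cat in set(expected) | set(actual):
        e = expected.get(cat, epsilon)
        a = actual.get(cat, epsilon)
        total += (a - e) * math.log(a / e)
    return total


# Hypothetical segment shares before the upstream change
baseline = {1: 0.20, 2: 0.20, 3: 0.30, 4: 0.20, 5: 0.10}

# Case 1: the "3b" users land in an existing code (hypothetically, segment 1)
folded = {1: 0.38, 2: 0.20, 3: 0.12, 4: 0.20, 5: 0.10}

# Case 2: the "3b" users fall through to a default code 0 the baseline never saw
defaulted = {0: 0.18, 1: 0.20, 2: 0.20, 3: 0.12, 4: 0.20, 5: 0.10}

print(round(psi(baseline, folded), 3))  # 0.28 -> above the 0.2 "significant" threshold
print(psi(baseline, defaulted) > 3)     # True: the unseen code 0 dominates via the floor
```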

Contract Testing in CI

Contracts must run automatically. Manual validation is not a contract - it is a wishful guideline.

```yaml
# .github/workflows/data-contracts.yml
name: Data Contract Validation

on:
  schedule:
    - cron: "0 6 * * *"  # daily at 06:00 UTC (Actions cron is UTC), before training jobs
  workflow_dispatch:     # allow manual trigger

# Assuming an AWS role via OIDC (role-to-assume below) requires an ID token
permissions:
  id-token: write
  contents: read

jobs:
  validate-contracts:
    runs-on: ubuntu-latest
    timeout-minutes: 30

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: |
          pip install pandera great_expectations boto3 pyspark

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.AWS_ROLE_ARN }}
          aws-region: us-east-1

      - name: Run schema contracts
        run: python scripts/validate_schema_contracts.py
        env:
          DATASET_PATH: s3://ml-data/user_features/latest/
          SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_URL }}

      - name: Run statistical contracts
        run: python scripts/validate_statistical_contracts.py

      - name: Run SLA contracts
        run: python scripts/validate_sla_contracts.py

      - name: Run semantic drift checks
        run: python scripts/validate_semantic_contracts.py

      - name: Notify on failure
        if: failure()
        uses: rtCamp/action-slack-notify@v2
        env:
          SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK_URL }}
          SLACK_COLOR: "danger"
          SLACK_TITLE: "DATA CONTRACT VIOLATION"
          SLACK_MESSAGE: "Contract validation failed. Training pipeline blocked. See logs."
```
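The workflow relies on each script's exit code: a step fails when its command exits non-zero, and only then does the "Notify on failure" step run. The validation scripts themselves are not shown in this article; here is a minimal sketch of the part that matters for CI, assuming a hypothetical gate() wrapper and a run_semantic_checks() placeholder.

```python
import sys
from typing import Callable, List


def gate(run_checks: Callable[[], List[dict]]) -> int:
    """Run the checks and translate the outcome into a process exit code."""
    violations = run_checks()
    for v in violations:
        # stderr lands in the CI log directly under the failing step
        print(f"VIOLATION {v.get('col', 'table')}: {v.get('type', 'UNKNOWN')}", file=sys.stderr)
    # Any non-zero exit fails the step, which is what `if: failure()` reacts to
    return 1 if violations else 0


def run_semantic_checks() -> List[dict]:
    """Hypothetical placeholder: load the baseline and the new batch, then return
    SemanticContract(baseline).check_distribution_drift(df_new)."""
    return []

# Script entry point (omitted here so the module can be imported and tested):
# sys.exit(gate(run_semantic_checks))
```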

GitLab CI Integration

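```yaml
# .gitlab-ci.yml
# GitLab has no per-job `schedule:` keyword. Create a pipeline schedule in the
# project's pipeline schedules settings (e.g. cron "0 5 * * *", daily before
# training) and run this job only for scheduled pipelines.
stages:
  - validate

data-contract-validation:
  stage: validate
  image: python:3.11-slim
  rules:
    - if: '$CI_PIPELINE_SOURCE == "schedule"'
  script:
    - pip install pandera great_expectations requests
    - python scripts/validate_schema_contracts.py
    - python scripts/validate_statistical_contracts.py
  after_script:
    - |
      if [ "$CI_JOB_STATUS" = "failed" ]; then
        python scripts/send_contract_violation_alert.py \
          --channel "#ml-data-alerts" \
          --job-url "$CI_JOB_URL"
      fi
```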

Violation Alerting

A contract that fires silently is not a contract. Violations must reach both the producer and the consumer.

```python
from typing import Optional

import requests


def send_contract_violation_alert(
    violations: list[dict],
    contract_name: str,
    dataset_name: str,
    producer_webhook_url: str,
    consumer_webhook_url: str,
    pagerduty_routing_key: Optional[str] = None,
):
    """
    Send contract violation alerts to producer AND consumer teams.
    Slack incoming webhooks post only to the channel they were created for,
    so each team needs its own webhook URL.
    Optionally page on-call if violations are critical.
    """
    severity = (
        "critical"
        if any(v.get("type") == "MISSING_COLUMN" for v in violations)
        else "warning"
    )

    violation_text = "\n".join([
        f"• `{v.get('col', 'table')}`: {v['type']} - {v.get('note', '')}"
        for v in violations[:5]  # limit to first 5 for readability
    ])
    if len(violations) > 5:
        violation_text += f"\n• ... and {len(violations) - 5} more"

    slack_payload = {
        "text": f"DATA CONTRACT VIOLATION: {contract_name}",  # notification fallback
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f":warning: DATA CONTRACT VIOLATION: {contract_name}",
                },
            },
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*Dataset:*\n`{dataset_name}`"},
                    {"type": "mrkdwn", "text": f"*Severity:*\n`{severity}`"},
                    {"type": "mrkdwn", "text": f"*Violations:*\n{violation_text}"},
                    {"type": "mrkdwn", "text": (
                        f"*Action Required:*\n"
                        f"Data producer: fix the violation and re-publish\n"
                        f"Data consumer: do NOT run training until resolved"
                    )},
                ],
            },
        ],
    }

    # Alert the consumer team
    requests.post(consumer_webhook_url, json=slack_payload, timeout=10)

    # Alert the producer team (so they know what they broke). When blocks are
    # present, "text" is only a fallback, so the note gets its own block.
    producer_note = {
        "type": "section",
        "text": {
            "type": "mrkdwn",
            "text": (
                f":rotating_light: Your dataset `{dataset_name}` violated the "
                f"data contract that the ML team depends on. "
                f"Please fix before the next training run."
            ),
        },
    }
    requests.post(
        producer_webhook_url,
        json={**slack_payload, "blocks": [producer_note, *slack_payload["blocks"]]},
        timeout=10,
    )

    # Page on-call for critical violations (PagerDuty Events API v2)
    if pagerduty_routing_key and severity == "critical":
        requests.post(
            "https://events.pagerduty.com/v2/enqueue",
            json={
                "routing_key": pagerduty_routing_key,
                "event_action": "trigger",
                "payload": {
                    "summary": f"Data contract violation: {contract_name}",
                    "severity": "critical",
                    "source": "data-contract-ci",
                    "custom_details": {"violations": violations},
                },
            },
            timeout=10,
        )
```
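The PagerDuty call above sends no dedup_key, so a daily job that keeps failing can open a fresh alert on every run. The Events API v2 accepts an optional dedup_key; here is a sketch of a hypothetical helper that builds the request body with a key derived from the contract and dataset names, so repeated failures collapse into one alert and a later passing run can resolve it.

```python
def build_pagerduty_event(
    routing_key: str,
    contract_name: str,
    dataset_name: str,
    violations: list,
    event_action: str = "trigger",
) -> dict:
    """Build a PagerDuty Events API v2 body with a stable dedup_key.

    The key depends only on contract and dataset, so every failing run updates
    the same alert, and a passing run can send event_action="resolve"."""
    event = {
        "routing_key": routing_key,
        "event_action": event_action,
        "dedup_key": f"data-contract:{contract_name}:{dataset_name}"[:255],
    }
    if event_action == "trigger":
        # Only trigger events carry a payload; resolve events just need the key
        event["payload"] = {
            "summary": f"Data contract violation: {contract_name} ({dataset_name})"[:1024],
            "source": "data-contract-ci",
            "severity": "critical",
            "custom_details": {"violations": violations},
        }
    return event

# Send with: requests.post("https://events.pagerduty.com/v2/enqueue", json=event, timeout=10)
```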

Contract Governance

Contracts must be versioned and co-owned. Establish these norms before your first contract:

  1. Contracts live in git: the canonical contract definition is a Python file in the shared ML repository. Changes are PRs, reviewed by both producer and consumer.

  2. Contracts have versions: UserFeaturesSchema_v3_1. When the producer makes a breaking change, they bump the major version and give consumers a migration window (typically 2–4 weeks) before the old schema is removed.

  3. Breaking vs. non-breaking changes:

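    • Non-breaking: adding a new nullable column, relaxing a numeric range constraint (with strict = True, as in the Pandera schema above, consumers' validation still fails on a new column until the contract PR adds it)
    • Breaking: removing a column, changing a column's type, changing value semantics, adding new values to a categorical column, tightening a constraint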
  4. Change notification: any change to a contract requires an issue filed in the consuming team's tracker, not just a Slack message.
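Part of the breaking vs. non-breaking decision can be automated by diffing two versions of a contract. A sketch, assuming contracts are described as plain dicts of column specs (classify_change and bump are hypothetical helpers): it treats any change to an allowed value set as breaking, since the opening segment=3 incident began as exactly that kind of change, and it treats a new non-nullable column as breaking, a conservative choice the list above does not spell out. Semantic changes that keep the same values cannot be detected by a diff and still need review by both teams.

```python
from typing import Dict, List, Tuple


def classify_change(old: Dict[str, dict], new: Dict[str, dict]) -> Tuple[str, List[str]]:
    """Compare two contract versions given as {column: {"type", "nullable", "values"}}.

    Returns ("major" | "minor" | "none", reasons)."""
    breaking, additive = [], []
    for col, spec in old.items():
        if col not in new:
            breaking.append(f"removed column {col}")
            continue
        cur = new[col]
        if cur["type"] != spec["type"]:
            breaking.append(f"{col}: type {spec['type']} -> {cur['type']}")
        old_vals, new_vals = spec.get("values"), cur.get("values")
        if old_vals is not None and new_vals is not None and old_vals != new_vals:
            # New codes (like "3b") and dropped codes both change what consumers see
            breaking.append(f"{col}: value set {sorted(old_vals)} -> {sorted(new_vals)}")
    for col, cur in new.items():
        if col not in old:
            # Conservative assumption: only nullable additions count as non-breaking
            (additive if cur.get("nullable") else breaking).append(f"added column {col}")
    level = "major" if breaking else ("minor" if additive else "none")
    return level, breaking + additive


def bump(version: str, level: str) -> str:
    """Bump a "major.minor" version string according to the change level."""
    major, minor = (int(part) for part in version.split("."))
    if level == "major":
        return f"{major + 1}.0"
    if level == "minor":
        return f"{major}.{minor + 1}"
    return version
```

For example, `level, reasons = classify_change(v3_1_spec, proposed_spec)` followed by `bump("3.1", level)` gives the version the PR should carry.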


Common Mistakes

:::danger Using Schema Validation Only for Type Checking
Type checking (user_segment is an integer) catches the least interesting bugs. Value set validation (user_segment is in {1,2,3,4,5}) catches the next layer. Statistical validation (the distribution of user_segment has not shifted by more than 0.2 PSI) catches semantic changes. You need all three layers.
:::

:::danger Alerting Only the Consumer on Contract Violations
The consumer team discovering a violation does not help if the producer team does not know they caused it. Always alert both: the producer (so they fix it), and the consumer (so they do not run training on bad data). The producer alert should include enough information for them to identify what changed.
:::

:::warning Not Versioning Contracts Alongside Data
A contract that validates v2024q3_001 of a dataset may be invalid for v2024q3_002 if the schema changed intentionally. Version your contracts alongside your datasets. Store the contract version used for each training run in your experiment tracker.
:::

:::warning Running Contract Checks Only at Training Time
If a contract violation is discovered 6 hours into a training run, you have wasted 6 hours of compute on every GPU in the job. Run contracts immediately when data is written, not when it is consumed. Integrate contract checks into the data producer's pipeline as a post-write validation step.
:::
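For the versioning mistake above, one lightweight option is to log both the human-readable contract version and a hash of the contract definition with every training run, so an edit to a contract without a version bump is still visible. A sketch, assuming the contract can be described as a JSON-serializable dict (sets are allowed and sorted before hashing); contract_fingerprint is a hypothetical helper.

```python
import hashlib
import json


def contract_fingerprint(contract_spec: dict) -> str:
    """Short, stable hash of a contract definition.

    sort_keys and fixed separators make the JSON canonical; sets (e.g. allowed
    values) are serialized as sorted lists via `default=sorted`."""
    canonical = json.dumps(contract_spec, sort_keys=True, separators=(",", ":"), default=sorted)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]


spec = {
    "name": "UserFeaturesSchema",
    "version": "3.1",
    "columns": {"user_segment": {"type": "int", "values": {1, 2, 3, 4, 5}}},
}
run_tags = {
    "data_contract": f"{spec['name']}@{spec['version']}",
    "data_contract_sha": contract_fingerprint(spec),
}
# If you track experiments with MLflow: mlflow.set_tags(run_tags)
```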


Interview Q&A

Q: What is a data contract, and how is it different from schema validation?

A: A data contract is a formal, enforceable agreement between data producers and consumers specifying the structure, statistical properties, semantics, and SLAs of a dataset. Schema validation is one component of a data contract - it checks column names, types, and nullability. But schema validation alone misses: value set violations (the column is still an integer, but valid values changed from {1,2,3} to {1,2,3,4}), distribution drift (column values are in the right range but the distribution has shifted significantly), semantic changes (column values are valid but their meaning changed), and SLA violations (data arrives 4 hours late). A full data contract checks all of these.

Q: How would you implement a data contract between two teams that do not share a codebase?

A: Use a contract-as-code approach with a shared, git-hosted contract library that both teams depend on. The contract is a Python package that the producer's pipeline imports for post-write validation and the consumer's pipeline imports for pre-read validation. Both teams submit PRs to update the contract. Breaking changes (column removals, type changes) require a migration window: the old schema version remains valid for a specified period, giving consumers time to adapt. CI in both codebases runs the contract checks. Violations alert both teams' Slack channels.

Q: What is PSI (Population Stability Index) and why is it used for semantic drift detection?

A: PSI measures how much a distribution has changed between a baseline period and a current period. The formula is $\text{PSI} = \sum_i (A_i - E_i) \ln(A_i / E_i)$, where $A_i$ is the actual proportion in category $i$ and $E_i$ is the expected proportion from the baseline. Interpretation: PSI below 0.1 means no significant change; 0.1–0.2 means moderate change worth investigating; above 0.2 means significant change that likely affects model performance. PSI is used for semantic drift detection because a semantic change (a category's meaning changes) typically manifests as a distribution shift: users who were previously classified as segment 3 are now split between 3a and 3b, shifting the distribution of the user_segment column.

Q: How do you handle a contract violation that is discovered while a model is already in production?

A: Two immediate actions: (1) alert the data producer to revert or fix the violating change; (2) block the next retraining run until the contract is satisfied, so a new model is not trained on corrupted features. For the currently deployed model: assess whether the production data the model is receiving is also corrupted (if features are computed from the same violating dataset, yes). If the serving features are corrupted, consider rolling back to a previous model version while the data issue is resolved. Document the incident: when the violation started, what the impact on model predictions was, and what the fix was. Update the contract to catch this class of violation earlier (if a statistical check would have caught it, add that check).

Q: What is the difference between a hard contract (fail the pipeline) and a soft contract (warn only)?

A: A hard contract blocks the pipeline when violated - the downstream training job does not run. Use hard contracts for: schema violations (missing required columns, wrong types), value set violations (values outside the allowed set), and significant distribution shifts (PSI above 0.2, the "significant change" threshold). A soft contract logs a warning and continues. Use soft contracts for: minor distribution shifts (PSI 0.1–0.2), slight SLA lateness (data is 1 hour late instead of on time), or novel values that might be legitimate expansion (new category values that were not in the baseline). The rule: if training on violating data could silently produce a worse model without any visible error, use a hard contract. If the violation is informational and the team can decide manually, use a soft contract.
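The hard/soft rule can be written down as a small policy function. A sketch, assuming violation dicts shaped like the ones SemanticContract.check_distribution_drift returns (type, metric, value); the thresholds are parameters whose defaults follow the 0.2 "significant" and 0.1 "moderate" PSI cut-offs from the PSI answer above, and the type labels other than MISSING_COLUMN are hypothetical. To see warn-level PSI at all, the drift check would need to be called with categorical_psi_threshold=0.1.

```python
from enum import Enum
from typing import List


class Action(Enum):
    PASS = "pass"
    WARN = "warn"    # soft contract: log and continue
    BLOCK = "block"  # hard contract: fail the pipeline


# Violation types that always block (labels other than MISSING_COLUMN are hypothetical)
HARD_TYPES = {"MISSING_COLUMN", "WRONG_TYPE", "VALUE_NOT_IN_SET"}


def decide(violations: List[dict], psi_block: float = 0.2, psi_warn: float = 0.1) -> Action:
    """Map a list of violations to a single pipeline action (most severe wins)."""
    action = Action.PASS
    for v in violations:
        if v.get("type") in HARD_TYPES:
            return Action.BLOCK
        if v.get("metric") == "PSI":
            if v["value"] > psi_block:
                return Action.BLOCK
            if v["value"] > psi_warn:
                action = Action.WARN
        else:
            # Anything else (e.g. z-score drift, slight lateness): surface it, a human decides
            action = Action.WARN
    return action
```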

© 2026 EngineersOfAI. All rights reserved.