

Data Contracts - The API Contract for Data Pipelines

Reading time: ~24 minutes | Level: Data Engineering → AI Systems

The upstream team changed the user_id field from int to string. They had a good reason: the system was migrating from sequential integer IDs to GUIDs to support multi-region sharding. The change was backward compatible in their view - short GUIDs look like numbers. They updated their README. They sent a Slack message to #data-infra. They did not update any machine-readable contract.

Three downstream ML pipelines picked up the change silently. Each one cast the new string IDs back to integers with a coercing cast (pd.to_numeric(..., errors="coerce")), which turns unparseable values into NaN instead of raising. Short GUIDs like "12847" parsed correctly for about two weeks. Then the system started generating GUIDs with alphabetic characters. The casts returned NaN. The feature pipelines did not fail - they propagated NaN into the feature store. The models trained on the next cycle included two months of poisoned user_id join keys, producing features that matched nothing at serving time.

No exception was raised. No alert fired. The model's offline metrics looked fine - the training set was internally consistent. The degradation appeared in a monthly business review, attributed to "seasonal factors." It took a senior engineer three days of log archaeology to find the field type change.

This is the canonical data contract failure. It happens because data teams treat data like a database table - something that can change quietly - rather than like an API - something with a contract, a version, and a breaking-change policy.

Data contracts are the solution. They are machine-readable, versioned agreements between a data producer and every consumer. They specify what the data contains, what it means, how fresh it will be, and who owns it when it breaks. They do not prevent all failures. But they make violations loud and immediate rather than silent and delayed.


What You Will Learn

  • What a data contract is - and what distinguishes it from a database schema
  • The open-source data contract specification (YAML structure)
  • How to implement contract validation in Python - programmatic enforcement
  • Contract lifecycle: define → publish → validate → monitor → evolve
  • Versioning strategy for breaking vs. non-breaking changes
  • Enforcement patterns across CI/CD pipelines and at read time
  • Real tools: Data Contract CLI, Soda Core, dbt contracts, Monte Carlo
  • Interview questions and how to answer "how do you convince upstream teams to adopt contracts"

Prerequisites

  • Lesson 01 - Data Quality Dimensions (understand the five dimensions)
  • Basic Python: classes, dataclasses, type annotations
  • Familiarity with YAML syntax
  • Awareness that ML pipelines have upstream/downstream dependency structures

Part 1 - Why Schemas Alone Are Not Enough

The most common response to the user_id type change story is: "Just use schema validation." It is a reasonable instinct. The problem is that schema validation only catches structural problems.

A schema tells you:

  • The column exists
  • The column has a specific data type
  • The column may or may not be nullable

A schema does not tell you:

  • What the column means (is it a stable join key, or does it recycle IDs after deletion?)
  • What values are semantically valid (not just structurally typed)
  • How fresh the data should be when it arrives
  • Who is responsible when it breaks
  • What downstream systems are allowed to depend on it
  • When and how it is allowed to change

This is the gap between a schema and a contract. Schema is syntax. A contract adds semantics, operational guarantees, and ownership.
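Here is a minimal sketch that makes the gap concrete, using only the standard library. The helper names `passes_schema_check` and `passes_contract_check` are illustrative and do not come from any tool. A schema-style check only asks "is user_id a non-null string?", so it accepts the legacy value "12847". A contract-style check also enforces the documented meaning (user_id is a UUID), so it rejects that value.

```python
import uuid


def passes_schema_check(value: object) -> bool:
    """Structural check only: the value exists and has the declared type."""
    return value is not None and isinstance(value, str)


def passes_contract_check(value: object) -> bool:
    """Structural check plus the semantic rule the contract documents:
    user_id must be a UUID string."""
    if not passes_schema_check(value):
        return False
    try:
        uuid.UUID(value)
    except ValueError:
        return False
    return True


ids = ["550e8400-e29b-41d4-a716-446655440000", "12847", "a9f3"]
schema_ok = [passes_schema_check(v) for v in ids]
contract_ok = [passes_contract_check(v) for v in ids]
print(schema_ok)    # [True, True, True]
print(contract_ok)  # [True, False, False]
```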

:::note Historical Context The concept of data contracts gained momentum in 2021-2022. Andrew Jones described implementing them at GoCardless, and Chad Sanderson (then at Convoy) popularized the "data contract" pattern as a solution to the coordination problem in data mesh architectures. In 2023, PayPal open-sourced its data contract template, which became the basis of the Open Data Contract Standard. The same year saw the publication of the open-source Data Contract Specification, whose YAML format this lesson uses. The insight was simple but transformative: data pipelines should treat data interfaces the same way software engineers treat API interfaces - with explicit, versioned, machine-enforceable contracts. :::


Part 2 - The Data Contract Specification

The open-source data contract specification defines a standard YAML structure that any tool or team can use. Here is the full structure for a production-grade contract:

```yaml
# contracts/user_events.contract.yaml
dataContractSpecification: 0.9.3
id: urn:datacontract:user-events:v2
info:
  title: User Events
  version: 2.1.0
  status: active
  description: >
    Clickstream events for all authenticated users.
    Produced by the web tracking service. Primary consumer:
    recommendation engine feature pipeline.
  owner: platform-data-team
  contact:
    name: Platform Data Team
    url: https://wiki.internal/teams/platform-data
    email: platform-[email protected]
    slackChannel: "#platform-data-oncall"

servers:
  production:
    type: kafka
    host: kafka.prod.company.com:9092
    topic: user.events.v2
    format: avro
    schemaRegistry: https://schema-registry.prod.company.com

  staging:
    type: kafka
    host: kafka.staging.company.com:9092
    topic: user.events.v2.staging
    format: avro

terms:
  usage: >
    Consumers may use this dataset for ML feature computation,
    analytics, and product experimentation. Real-time consumer
    decisions (fraud, content moderation) are prohibited without
    explicit approval from the data governance board.
  limitations: >
    Data is retained for 90 days in the Kafka topic.
    Historical data is available in S3 at s3://data/user-events/.
  billing: Internal - no chargeback.
  noticePeriod: P30D  # 30 days notice before breaking changes

models:
  user_event:
    description: A single user interaction event.
    type: object
    fields:
      event_id:
        description: Globally unique event identifier (UUID v4).
        type: string
        format: uuid
        required: true
        unique: true
        example: "550e8400-e29b-41d4-a716-446655440000"

      user_id:
        description: >
          Stable user identifier. Assigned at registration and
          never reused. Encoded as UUID v4 string since 2024-01.
          Legacy integer IDs were migrated; all IDs are now UUIDs.
        type: string
        format: uuid
        required: true
        pii: true
        classification: internal

      event_type:
        description: The user action that generated this event.
        type: string
        required: true
        enum:
          - page_view
          - product_click
          - add_to_cart
          - purchase
          - search
          - review_submitted

      event_timestamp:
        description: >
          UTC timestamp of the event as recorded by the client.
          Monotonically increasing per user_id within a session.
        type: timestamp
        required: true
        format: "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"

      session_id:
        description: Groups events into a contiguous browsing session.
        type: string
        required: false  # null for mobile app events pre-2024
        example: "sess_8xK2mNpQrL"

      properties:
        description: Event-specific metadata as key-value pairs.
        type: object
        required: false

quality:
  type: SodaCL
  specification:
    checks for user_event:
      - row_count > 10000:
          name: Minimum event volume (alerts on pipeline failure)
      - duplicate_count(event_id) = 0:
          name: Event IDs must be globally unique
      - missing_count(user_id) = 0:
          name: All events must have a user_id
      - missing_count(event_type) = 0:
          name: Event type is required
      - invalid_count(event_type) = 0:
          valid values: [page_view, product_click, add_to_cart, purchase, search, review_submitted]
          name: Event type must be in approved enum
      - freshness(event_timestamp) < 10m:
          name: Events must arrive within 10 minutes of occurrence

servicelevels:
  availability:
    description: Kafka topic availability
    percentage: "99.9%"
  retention:
    description: Data available in Kafka
    period: P90D
  freshness:
    description: Maximum lag between event occurrence and topic availability
    threshold: PT10M  # ISO 8601 duration: 10 minutes
  completeness:
    description: Percentage of events that arrive without data loss
    percentage: "99.5%"
```

This YAML is not documentation. It is a machine-readable specification that tooling can parse, validate against, and monitor in production.
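Because the contract is plain YAML, any language can load it. In Python, `yaml.safe_load` returns nested dicts. Here is a minimal sketch of what "machine-readable" buys you: a function that validates a single Kafka-style event against a `fields` mapping shaped like the `models.user_event.fields` section above. The dict is written inline so the example runs without PyYAML. `validate_event` is a hypothetical helper, and it only covers the `required`, `enum` and `format: uuid` keys.

```python
import uuid

# Subset of models.user_event.fields, shaped as yaml.safe_load would return it
FIELDS = {
    "event_id": {"type": "string", "format": "uuid", "required": True},
    "user_id": {"type": "string", "format": "uuid", "required": True},
    "event_type": {
        "type": "string",
        "required": True,
        "enum": ["page_view", "product_click", "add_to_cart",
                 "purchase", "search", "review_submitted"],
    },
    "session_id": {"type": "string", "required": False},
}


def validate_event(event: dict, fields: dict) -> list:
    """Return human-readable violations for one event (empty list = valid)."""
    errors = []
    for name, spec in fields.items():
        value = event.get(name)
        if value is None:
            if spec.get("required", False):
                errors.append(f"{name}: required field is missing or null")
            continue
        if "enum" in spec and value not in spec["enum"]:
            errors.append(f"{name}: {value!r} not in allowed values")
        if spec.get("format") == "uuid":
            try:
                uuid.UUID(str(value))
            except ValueError:
                errors.append(f"{name}: {value!r} is not a valid UUID")
    return errors


good = {
    "event_id": "550e8400-e29b-41d4-a716-446655440000",
    "user_id": "3f2b8c1e-7d4a-4e9b-9c2d-1a5e6f7b8c9d",
    "event_type": "purchase",
}
bad = {
    "event_id": "550e8400-e29b-41d4-a716-446655440000",
    "user_id": "12847",        # legacy integer-style ID
    "event_type": "refund",    # not in the approved enum
}
print(validate_event(good, FIELDS))  # []
print(validate_event(bad, FIELDS))
```

Validating per event like this suits a streaming consumer. The DataFrame-based class below suits batch pipelines.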


Part 3 - Implementing Contract Validation in Python

The YAML contract is useful as a specification. But contracts only create value when they are enforced - programmatically, automatically, and loudly when violated.

Here is a production-grade DataContract class that parses the YAML and enforces it:

```python
from __future__ import annotations

import re
import uuid
from dataclasses import dataclass
from dataclasses import field as dc_field  # aliased: ContractViolation has a member named `field`

import pandas as pd
import yaml  # PyYAML


@dataclass
class FieldSpec:
    name: str
    dtype: str
    required: bool
    unique: bool = False
    enum: list[str] | None = None
    format: str | None = None
    min_value: float | None = None
    max_value: float | None = None
    description: str = ""


@dataclass
class QualitySpec:
    min_row_count: int = 0
    max_freshness_minutes: int | None = None
    timestamp_column: str | None = None


@dataclass
class ContractViolation:
    field: str
    rule: str
    severity: str  # "critical" or "warning"
    message: str
    sample_values: list = dc_field(default_factory=list)


class ContractViolationError(Exception):
    """Raised when a critical data contract violation is detected."""


def _is_valid_uuid(value) -> bool:
    try:
        uuid.UUID(str(value))
        return True
    except ValueError:
        return False


class DataContract:
    """
    Parses a data contract YAML and validates a DataFrame against it.

    Usage:
        contract = DataContract.from_yaml("contracts/user_events.contract.yaml")
        violations = contract.validate(df)
        contract.enforce(df)  # raises ContractViolationError if critical violations exist
    """

    def __init__(self, spec: dict):
        self.spec = spec
        self.id = spec.get("id", "unknown")
        self.version = str(spec.get("info", {}).get("version", "0.0.0"))
        self.title = spec.get("info", {}).get("title", "Untitled Contract")
        self._parse_fields()
        self._parse_quality()

    @classmethod
    def from_yaml(cls, path: str) -> DataContract:
        with open(path) as f:
            spec = yaml.safe_load(f)
        return cls(spec)

    def _parse_fields(self) -> None:
        # Fields from all models are merged into one dict keyed by column name
        self.fields: dict[str, FieldSpec] = {}
        for model_def in self.spec.get("models", {}).values():
            for col_name, col_def in model_def.get("fields", {}).items():
                self.fields[col_name] = FieldSpec(
                    name=col_name,
                    dtype=col_def.get("type", "string"),
                    required=col_def.get("required", False),
                    unique=col_def.get("unique", False),
                    enum=col_def.get("enum"),
                    format=col_def.get("format"),
                    min_value=col_def.get("minimum"),
                    max_value=col_def.get("maximum"),
                    description=col_def.get("description", ""),
                )

    def _parse_quality(self) -> None:
        sla = self.spec.get("servicelevels", {})
        freshness_str = sla.get("freshness", {}).get("threshold")
        self.quality = QualitySpec(
            max_freshness_minutes=self._parse_iso_duration_minutes(freshness_str),
        )

    @staticmethod
    def _parse_iso_duration_minutes(duration_str: str | None) -> int | None:
        """Parse an ISO 8601 time duration: PT10M → 10, PT2H → 120, PT1H30M → 90.

        Only hours and minutes are supported; anything else (e.g. P30D) returns None.
        """
        if not duration_str:
            return None
        match = re.fullmatch(r"PT(?:(\d+)H)?(?:(\d+)M)?", duration_str)
        if not match or not any(match.groups()):
            return None
        hours, minutes = (int(g) if g else 0 for g in match.groups())
        return hours * 60 + minutes

    def validate(self, df: pd.DataFrame, timestamp_col: str | None = None) -> list[ContractViolation]:
        """
        Validate a DataFrame against the contract.
        Returns a list of ContractViolation objects.
        """
        violations: list[ContractViolation] = []

        for col_name, field_spec in self.fields.items():
            # 1. Schema - required columns present
            if col_name not in df.columns:
                if field_spec.required:
                    violations.append(ContractViolation(
                        field=col_name,
                        rule="required_column_present",
                        severity="critical",
                        message=f"Required column '{col_name}' is missing from the dataset.",
                    ))
                continue

            col = df[col_name]

            # 2. Nullability
            if field_spec.required:
                null_count = int(col.isna().sum())
                if null_count > 0:
                    violations.append(ContractViolation(
                        field=col_name,
                        rule="not_null",
                        severity="critical",
                        message=f"'{col_name}' has {null_count} null values ({null_count / len(df):.1%}).",
                        sample_values=col[col.isna()].head(3).tolist(),
                    ))

            # 3. Enum validation (nulls are handled by the nullability rule)
            if field_spec.enum:
                invalid_mask = col.notna() & ~col.isin(field_spec.enum)
                invalid_count = int(invalid_mask.sum())
                if invalid_count > 0:
                    violations.append(ContractViolation(
                        field=col_name,
                        rule="enum_values",
                        severity="critical",
                        message=(
                            f"'{col_name}' has {invalid_count} values not in approved enum "
                            f"{field_spec.enum}."
                        ),
                        sample_values=col[invalid_mask].unique()[:5].tolist(),
                    ))

            # 4. Uniqueness
            if field_spec.unique:
                dupe_count = int(col.duplicated().sum())
                if dupe_count > 0:
                    violations.append(ContractViolation(
                        field=col_name,
                        rule="unique",
                        severity="critical",
                        message=f"'{col_name}' has {dupe_count} duplicate values.",
                        sample_values=col[col.duplicated(keep=False)].head(3).tolist(),
                    ))

            # 5. UUID format check
            if field_spec.format == "uuid":
                non_null = col.dropna()
                invalid_uuids = ~non_null.map(_is_valid_uuid).astype(bool)
                invalid_count = int(invalid_uuids.sum())
                if invalid_count > 0:
                    violations.append(ContractViolation(
                        field=col_name,
                        rule="uuid_format",
                        severity="critical",
                        message=f"'{col_name}' has {invalid_count} values that are not valid UUIDs.",
                        sample_values=non_null[invalid_uuids].head(3).tolist(),
                    ))

        # 6. Freshness check (dataset-level)
        ts_col = timestamp_col or self.quality.timestamp_column
        max_fresh = self.quality.max_freshness_minutes
        if ts_col and max_fresh and ts_col in df.columns:
            # utc=True localizes naive timestamps to UTC and converts aware ones
            latest_ts = pd.to_datetime(df[ts_col], utc=True).max()
            staleness_min = (pd.Timestamp.now(tz="UTC") - latest_ts).total_seconds() / 60
            if staleness_min > max_fresh:
                violations.append(ContractViolation(
                    field=ts_col,
                    rule="freshness",
                    severity="critical",
                    message=(
                        f"Data is {staleness_min:.0f} minutes stale "
                        f"(SLA: max {max_fresh} minutes). "
                        f"Latest record: {latest_ts.isoformat()}"
                    ),
                ))

        return violations

    def enforce(self, df: pd.DataFrame, timestamp_col: str | None = None) -> None:
        """
        Validate and raise ContractViolationError if any critical violations exist.
        Call this at pipeline ingestion boundaries.
        """
        violations = self.validate(df, timestamp_col=timestamp_col)
        critical = [v for v in violations if v.severity == "critical"]
        warnings = [v for v in violations if v.severity == "warning"]

        for w in warnings:
            print(f"[CONTRACT WARNING] {w.field} / {w.rule}: {w.message}")

        if critical:
            lines = [f"Data contract '{self.id}' (v{self.version}) violated:"]
            for v in critical:
                lines.append(f"  [CRITICAL] {v.field} / {v.rule}: {v.message}")
                if v.sample_values:
                    lines.append(f"    Sample values: {v.sample_values}")
            raise ContractViolationError("\n".join(lines))

        print(f"[CONTRACT OK] '{self.title}' v{self.version} - {len(df)} rows validated.")
```

Now here is how you use it in a production Airflow task:

```python
# dags/feature_pipeline/user_events_task.py
import pandas as pd
from airflow.decorators import task

# Assumes the Part 3 classes live in an importable module named data_contracts
from data_contracts import ContractViolationError, DataContract


@task
def validate_and_ingest_user_events(s3_path: str, **context) -> str:
    """
    Reads raw events from S3, enforces the data contract,
    and writes validated data to the feature store.
    Fails the Airflow task (not just logs) on contract violation.
    """
    contract = DataContract.from_yaml("/contracts/user_events.contract.yaml")

    df = pd.read_parquet(s3_path)  # s3:// paths require s3fs to be installed

    try:
        contract.enforce(df, timestamp_col="event_timestamp")
    except ContractViolationError as e:
        # Raising marks the task FAILED, which triggers any configured alerting
        raise RuntimeError(f"Contract validation failed for {s3_path}:\n{e}") from e

    # Contract passed - safe to write to the feature store.
    # write_to_feature_store is a placeholder for your own writer; it is not defined here.
    write_to_feature_store(df, table="user_events")
    return f"Ingested {len(df)} events from {s3_path}"
```

Part 4 - The Data Contract Lifecycle

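Contracts are not static documents. They have a lifecycle that mirrors the lifecycle of the data they govern: define → publish → validate → monitor → evolve.

  • Define: the producer writes the YAML contract (Part 2).
  • Publish: the contract is pushed to a central registry on every merge to main (Pattern 4 in Part 6).
  • Validate: data is checked against the contract in CI and at read time (Parts 3 and 6).
  • Monitor: service levels such as freshness and availability are tracked in production.
  • Evolve: changes go through versioning and notice periods (Part 5), and the new version is published again.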


Part 5 - Versioning Strategy for Data Contracts

Data contracts use semantic versioning (MAJOR.MINOR.PATCH) applied to data, not software. The rules are different from API versioning because consumers often cannot update immediately:

| Change type | Version bump | Examples | Notice period |
|---|---|---|---|
| Patch | 0.0.X | Fix typo in description, add example value | None |
| Minor (backward-compatible) | 0.X.0 | Add a new optional column, relax a constraint, extend an enum | 7 days |
| Major (breaking) | X.0.0 | Rename a column, change a type, remove a column, make a nullable column required, change the primary key | 30 days minimum |

:::danger The Backward-Compatible Type Change Trap The most dangerous versioning mistake is treating a type change as a minor (backward-compatible) change because "it still holds the same information."

Changing user_id from int64 to string is a breaking change. Changing price from float32 to float64 may seem safe but silently breaks consumers that assume 32-bit precision. Changing event_date from a date type to timestamp looks like an upgrade but breaks joins against tables that still use date.

Rule: Any type change is a MAJOR version bump. No exceptions. The cost of a 30-day deprecation cycle is always lower than the cost of debugging a silent type coercion. :::
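Two of the "harmless-looking" changes from the warning above can be reproduced with the standard library alone. This is a sketch. `to_float32` is a hypothetical helper that round-trips a Python float (64-bit) through a 32-bit IEEE 754 representation with `struct`. This mimics a consumer that still stores the column as float32. The dict lookup stands in for a join between a DATE-keyed consumer table and a producer that now emits timestamps.

```python
import struct
from datetime import date, datetime


def to_float32(x: float) -> float:
    """Round-trip a 64-bit float through 32-bit storage."""
    return struct.unpack("<f", struct.pack("<f", x))[0]


# float64 -> float32: precision is silently lost
print(to_float32(0.1) == 0.1)      # False
print(to_float32(16_777_217.0))    # 16777216.0 (2**24 + 1 is not representable in float32)

# date -> timestamp: keys that "mean the same day" no longer match
daily_totals = {date(2024, 1, 15): 42}   # consumer table keyed by DATE
event_key = datetime(2024, 1, 15, 0, 0)  # producer now emits TIMESTAMP
print(daily_totals.get(event_key))       # None: the lookup (join) misses
```

Neither case raises an exception, which is exactly why these changes deserve a major version bump.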

Here is how to implement version checking in the contract class:

```python
from packaging.version import Version

# DataContract is the Part 3 class (assumed importable from a data_contracts module)
from data_contracts import DataContract


def check_contract_compatibility(
    current_contract: DataContract,
    new_contract: DataContract,
) -> dict:
    """
    Compare two contract versions and classify the change type.
    Returns a dict with a 'change_type' string and an 'issues' list.
    """
    current_v = Version(current_contract.version)
    new_v = Version(new_contract.version)

    issues = []
    change_type = "patch"

    current_fields = current_contract.fields
    new_fields = new_contract.fields

    # Check for removed columns (breaking)
    for col in current_fields:
        if col not in new_fields:
            issues.append(f"BREAKING: Column '{col}' removed.")
            change_type = "major"

    # Check for type changes (breaking)
    for col in current_fields:
        if col in new_fields:
            old_type = current_fields[col].dtype
            new_type = new_fields[col].dtype
            if old_type != new_type:
                issues.append(
                    f"BREAKING: Column '{col}' type changed "
                    f"from '{old_type}' to '{new_type}'."
                )
                change_type = "major"

    # Optional -> required is breaking; required -> optional relaxes a constraint (minor)
    for col in current_fields:
        if col not in new_fields:
            continue
        was_required, now_required = current_fields[col].required, new_fields[col].required
        if now_required and not was_required:
            issues.append(f"BREAKING: Column '{col}' changed from optional to required.")
            change_type = "major"
        elif was_required and not now_required and change_type == "patch":
            change_type = "minor"

    # Check for new required columns (breaking: existing producers do not emit them)
    for col, spec in new_fields.items():
        if col not in current_fields and spec.required:
            issues.append(
                f"BREAKING: New required column '{col}' added "
                f"- existing producers do not emit this field."
            )
            change_type = "major"

    # Enum changes: restricting values is breaking, extending them is minor
    for col in current_fields:
        if col not in new_fields or new_fields[col].enum is None:
            continue  # column removed (handled above) or values unrestricted
        old_enum = current_fields[col].enum
        new_enum = set(new_fields[col].enum)
        if old_enum is None:
            issues.append(f"BREAKING: Enum constraint added to '{col}'.")
            change_type = "major"
            continue
        removed_values = set(old_enum) - new_enum
        if removed_values:
            issues.append(
                f"BREAKING: Enum values removed from '{col}': {removed_values}"
            )
            change_type = "major"
        elif new_enum - set(old_enum) and change_type == "patch":
            change_type = "minor"

    # New optional column -> minor
    for col, spec in new_fields.items():
        if col not in current_fields and not spec.required:
            if change_type == "patch":
                change_type = "minor"

    # Validate version bump matches change type
    if change_type == "major" and new_v.major <= current_v.major:
        issues.append(
            f"ERROR: Breaking changes detected but version was not bumped to "
            f"a new major version. Current: {current_v}, New: {new_v}"
        )

    return {"change_type": change_type, "issues": issues}
```
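`check_contract_compatibility` only flags a missing major bump. A stricter gate applies the whole versioning table, and is sketched here as an assumption rather than a feature of any tool. A breaking change needs a new major version, and a backward-compatible addition needs at least a minor bump. Any change needs the version to move forward. The gate uses only the standard library, so a hook script such as `scripts/check_contract_compat.py` could turn its result into an exit code. The names `bump_kind` and `bump_is_sufficient` are hypothetical.

```python
from __future__ import annotations

RANK = {"patch": 0, "minor": 1, "major": 2}


def parse_semver(version: str) -> tuple[int, int, int]:
    """Parse 'MAJOR.MINOR.PATCH' into integers (pre-release tags are not handled)."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def bump_kind(old: str, new: str) -> str | None:
    """Classify how the version moved: 'major', 'minor', 'patch', or None if it did not increase."""
    o, n = parse_semver(old), parse_semver(new)
    if n <= o:
        return None
    if n[0] > o[0]:
        return "major"
    if n[1] > o[1]:
        return "minor"
    return "patch"


def bump_is_sufficient(old: str, new: str, change_type: str) -> bool:
    """True if the version increase is at least as large as the detected change requires."""
    kind = bump_kind(old, new)
    return kind is not None and RANK[kind] >= RANK[change_type]


print(bump_is_sufficient("2.1.0", "2.2.0", "minor"))  # True
print(bump_is_sufficient("2.1.0", "2.1.1", "minor"))  # False: a new optional column needs a minor bump
print(bump_is_sufficient("2.1.0", "2.9.0", "major"))  # False: a breaking change needs 3.0.0
```

Feeding it the `change_type` returned by `check_contract_compatibility` and calling `sys.exit(0 if ok else 1)` would make the pre-commit hook block under-versioned changes. Note that this gate also rejects an edit that leaves the version unchanged.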

Part 6 - Contract Enforcement Patterns

Defining a contract in YAML creates no value. Enforcement creates value. There are four enforcement points in a real data pipeline, and each has a different role:

Pattern 1 - Pre-Commit Hook on Schema Changes

```bash
#!/bin/bash
# .git/hooks/pre-commit (or the same logic as a hook in .pre-commit-config.yaml)
# Validate any added or modified contract files before committing

changed_contracts=$(git diff --cached --name-only --diff-filter=ACM | grep -E '\.contract\.yaml$')

for contract_file in $changed_contracts; do
  if ! datacontract lint "$contract_file"; then
    echo "Contract validation failed: $contract_file"
    exit 1
  fi

  # Check backward compatibility against the version in main
  if git show main:"$contract_file" > /tmp/current_contract.yaml 2>/dev/null; then
    if ! python scripts/check_contract_compat.py \
        /tmp/current_contract.yaml \
        "$contract_file"; then
      echo "Breaking change detected in $contract_file - bump major version and add deprecation notice."
      exit 1
    fi
  fi
done
```

Pattern 2 - CI/CD Pipeline Validation

```yaml
# .gitlab-ci.yml (or GitHub Actions equivalent)
validate-data-contracts:
  stage: validate
  image: python:3.11-slim
  script:
    - pip install datacontract-cli soda-core
    - |
      # find recurses into subdirectories; a bare ** glob needs `shopt -s globstar` in bash
      for f in $(find contracts -name '*.contract.yaml'); do
        echo "Validating $f..."
        datacontract lint "$f"
        datacontract test "$f" --server staging
      done
  rules:
    - if: $CI_PIPELINE_SOURCE == "merge_request_event"
      changes:
        - contracts/**/*.contract.yaml
        - models/**/*.yaml
```

Pattern 3 - Consumer-Side Enforcement at Read Time

This is the most important pattern. Every pipeline that reads data should verify the contract at the point of consumption - not trust that the producer did the right thing.

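```python
import pandas as pd

# Assumes the Part 3 classes live in an importable module named data_contracts
from data_contracts import ContractViolationError, DataContract

# The reader contract - what this consumer declares it needs
REQUIRED_CONTRACT = {
    "user_id": {"type": "string", "format": "uuid", "required": True},
    "event_type": {
        "type": "string",
        "required": True,
        "enum": ["page_view", "product_click", "add_to_cart", "purchase"],
    },
    "event_timestamp": {"type": "timestamp", "required": True},
}


def read_with_contract(
    path: str,
    contract_path: str,
    timestamp_col: str = "event_timestamp",
) -> pd.DataFrame:
    """
    Read data and enforce the contract before returning.
    Fail fast at the pipeline boundary, not three steps later.
    """
    contract = DataContract.from_yaml(contract_path)
    df = pd.read_parquet(path)

    # Enforce the producer's contract - raises ContractViolationError on failure
    contract.enforce(df, timestamp_col=timestamp_col)

    # Then check the columns this consumer specifically depends on
    missing = [col for col in REQUIRED_CONTRACT if col not in df.columns]
    if missing:
        raise ContractViolationError(f"Columns this consumer requires are missing: {missing}")

    return df
```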
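Read-time enforcement catches bad data. A complementary check catches a bad *dependency* before any data flows: does the producer contract actually guarantee everything this consumer relies on? Here is a sketch. It assumes both sides are available as plain dicts, for example `REQUIRED_CONTRACT` and the producer's parsed `fields` section. `uncovered_requirements` is a hypothetical helper, not part of any tool.

```python
def uncovered_requirements(consumer: dict, producer_fields: dict) -> list:
    """List what this consumer relies on that the producer contract does not guarantee."""
    problems = []
    for name, need in consumer.items():
        offer = producer_fields.get(name)
        if offer is None:
            problems.append(f"{name}: not in the producer contract")
            continue
        if need.get("type") != offer.get("type"):
            problems.append(
                f"{name}: consumer expects type {need.get('type')!r}, "
                f"producer declares {offer.get('type')!r}"
            )
        if need.get("format") and need.get("format") != offer.get("format"):
            problems.append(
                f"{name}: consumer expects format {need.get('format')!r}, "
                f"producer declares {offer.get('format')!r}"
            )
        if need.get("required") and not offer.get("required"):
            problems.append(f"{name}: consumer requires it, producer marks it optional")
        if "enum" in need:
            if offer.get("enum") is None:
                problems.append(f"{name}: producer does not restrict its values")
            else:
                extra = set(offer["enum"]) - set(need["enum"])
                if extra:
                    problems.append(
                        f"{name}: producer may emit values this consumer does not accept: "
                        f"{sorted(extra)}"
                    )
    return problems


CONSUMER = {
    "user_id": {"type": "string", "format": "uuid", "required": True},
    "event_type": {"type": "string", "required": True,
                   "enum": ["page_view", "product_click", "add_to_cart", "purchase"]},
    "event_timestamp": {"type": "timestamp", "required": True},
}
PRODUCER_FIELDS = {
    "user_id": {"type": "string", "format": "uuid", "required": True},
    "event_type": {"type": "string", "required": True,
                   "enum": ["page_view", "product_click", "add_to_cart",
                            "purchase", "search", "review_submitted"]},
    "event_timestamp": {"type": "timestamp", "required": True},
    "session_id": {"type": "string", "required": False},
}
for problem in uncovered_requirements(CONSUMER, PRODUCER_FIELDS):
    print(problem)
# event_type: producer may emit values this consumer does not accept: ['review_submitted', 'search']
```

Here the check surfaces a real mismatch in this lesson's own example. The consumer only accepts four event types, while the producer contract allows six. The consumer must filter `search` and `review_submitted` explicitly rather than fail on them.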

Pattern 4 - Producer-Side Contract Publishing

Producers should publish their contract to a central registry as part of their deployment process. This makes all contracts discoverable:

```python
# scripts/publish_contract.py - run in the producer's deployment pipeline
import os
import sys

import requests
import yaml


def publish_contract(contract_path: str, registry_url: str, api_key: str) -> None:
    """
    Publish a contract to the central data contract registry.
    Called from CI/CD on every merge to main.
    """
    with open(contract_path) as f:
        contract = yaml.safe_load(f)

    response = requests.put(
        f"{registry_url}/contracts/{contract['id']}",
        json=contract,
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30,
    )
    response.raise_for_status()
    print(f"Published contract {contract['id']} v{contract['info']['version']}")


if __name__ == "__main__":
    # The environment variable names are placeholders; use whatever your CI provides
    publish_contract(
        sys.argv[1],
        registry_url=os.environ["CONTRACT_REGISTRY_URL"],
        api_key=os.environ["CONTRACT_REGISTRY_API_KEY"],
    )


# In CI/CD (GitLab CI example):
# publish-contract:
#   stage: deploy
#   script: python scripts/publish_contract.py contracts/user_events.contract.yaml
```

Part 7 - Contract Violation Handling

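When a violation is detected, three strategies are available. The choice depends on the severity and the pipeline context:

  • Fail the pipeline: raise on critical violations, as enforce() does, so nothing downstream consumes bad data.
  • Quarantine: route violating records to a separate location and let clean records continue.
  • Warn and continue: log the violation (the "warning" severity) and let the data through.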

Here is the quarantine pattern implemented in Python:

```python
from datetime import datetime, timezone

import pandas as pd

# Assumes the Part 3 DataContract class lives in an importable module named data_contracts
from data_contracts import DataContract


def ingest_with_quarantine(
    df: pd.DataFrame,
    contract: DataContract,
    good_path: str,
    quarantine_path: str,
    timestamp_col: str = "event_timestamp",
) -> dict:
    """
    Separate clean records from violating records.
    Write clean records to the pipeline. Write violators to quarantine.

    Only row-level rules (not_null, enum_values) are mapped back to rows here;
    dataset-level violations such as freshness are reported but quarantine nothing.

    Returns a summary dict with counts.
    """
    violations = contract.validate(df, timestamp_col=timestamp_col)

    # Collect the index labels of rows that break a row-level rule
    bad_row_indices: set = set()

    for violation in violations:
        col = violation.field
        if col not in df.columns:
            continue
        if violation.rule == "not_null":
            bad_row_indices.update(df.index[df[col].isna()])
        elif violation.rule == "enum_values":
            enum_vals = contract.fields[col].enum if col in contract.fields else None
            if enum_vals:
                bad_row_indices.update(df.index[df[col].notna() & ~df[col].isin(enum_vals)])

    bad_mask = df.index.isin(list(bad_row_indices))
    bad_df = df[bad_mask].copy()
    good_df = df[~bad_mask]

    # Write good records downstream
    good_df.to_parquet(good_path, index=False)

    violation_rules = [v.rule for v in violations]
    violation_rate = len(bad_df) / len(df) * 100 if len(df) else 0.0

    # Write bad records to quarantine with metadata
    if len(bad_df) > 0:
        # Batch-level reason list: rows are not tagged with their individual rule
        bad_df["_quarantine_reason"] = str(violation_rules)
        bad_df["_quarantine_timestamp"] = datetime.now(timezone.utc).isoformat()
        bad_df["_contract_id"] = contract.id
        bad_df.to_parquet(quarantine_path, index=False)

        # Alert
        print(
            f"[QUARANTINE] {len(bad_df)} records ({violation_rate:.2f}%) quarantined "
            f"to {quarantine_path}. Violations: {violation_rules}"
        )

    return {
        "total_records": len(df),
        "clean_records": len(good_df),
        "quarantined_records": len(bad_df),
        "violation_rate_pct": round(violation_rate, 4),
        "violations": violation_rules,
    }
```
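Quarantine works when violations are rare and random. A sudden jump in the violation rate usually signals a systemic upstream change, like the user_id type change in the opening story. Quietly quarantining most of a batch would hide it. One way to combine the strategies is to escalate from quarantine to a hard failure above a threshold. Here is a sketch of that approach. The threshold `fail_above_pct` and its 1% default are assumptions to tune per dataset, not a standard value.

```python
def choose_action(total_records: int, quarantined_records: int,
                  fail_above_pct: float = 1.0) -> str:
    """Return 'pass', 'quarantine', or 'fail' for one batch."""
    if total_records == 0:
        # Assumption: an empty batch signals an upstream outage, so treat it as a failure
        return "fail"
    rate_pct = quarantined_records / total_records * 100
    if rate_pct == 0:
        return "pass"
    if rate_pct <= fail_above_pct:
        return "quarantine"
    return "fail"


# e.g. with the summary dict returned by ingest_with_quarantine
summary = {"total_records": 20_000, "quarantined_records": 40}
print(choose_action(summary["total_records"], summary["quarantined_records"]))  # quarantine (0.2%)
```

Treating an empty batch as a failure mirrors the contract's `row_count > 10000` check, which exists to alert on pipeline failure.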

Part 8 - Real Tools for Data Contracts

You do not need to build everything from scratch. A mature ecosystem exists:

| Tool | What it does | Best for |
|---|---|---|
| Data Contract CLI (datacontract-cli) | Lint, test, and publish contracts from YAML | Teams adopting the open-source spec |
| Soda Core | Declarative quality checks with SodaCL, contract enforcement in CI | Python-native pipelines with Airflow |
| dbt contracts | Schema and constraint enforcement in the dbt transformation layer | dbt-heavy warehouses (Snowflake, BigQuery, Databricks) |
| Monte Carlo | Data observability platform - automated anomaly detection + contract monitoring | Enterprise teams needing a managed SaaS solution |
| Atlan / DataHub | Data catalog + contract registry - tracks producers, consumers, and contract status | Organizations needing discoverability at scale |
| Great Expectations | Expectation suites for dataset-level validation - integrates with contracts as the quality spec | Standalone validation with rich HTML data docs |

Data Contract CLI in Practice

```bash
# Install
pip install datacontract-cli

# Lint a contract YAML for syntax and semantic correctness
datacontract lint contracts/user_events.contract.yaml

# Test a contract against actual data
datacontract test contracts/user_events.contract.yaml \
  --server production

# Export to HTML data catalog documentation
datacontract export contracts/user_events.contract.yaml \
  --format html \
  --output docs/contracts/user_events.html

# Check backward compatibility
datacontract diff \
  contracts/user_events.v1.contract.yaml \
  contracts/user_events.v2.contract.yaml
```

dbt Contracts (dbt 1.5+)

dbt introduced native model contracts in version 1.5. In schema.yml:

```yaml
models:
  - name: user_features
    description: "User-level features for recommendation, refreshed hourly."
    config:
      contract:
        enforced: true  # dbt will validate column names and types on every run
    columns:
      - name: user_id
        data_type: varchar
        constraints:
          - type: not_null
          - type: unique
        tests:
          - not_null
          - unique

      - name: event_type
        data_type: varchar
        constraints:
          - type: not_null
        tests:
          - not_null
          - accepted_values:
              values: [page_view, product_click, add_to_cart, purchase]

      - name: computed_at
        data_type: timestamp
        constraints:
          - type: not_null
```

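When contract.enforced: true, dbt checks that the column names and data types returned by the model's SQL match those declared in schema.yml. If they differ, it fails the build before the model is materialized. This turns every dbt run into a contract enforcement step. Constraint enforcement varies by platform. On Snowflake, for example, not_null is enforced but unique is only recorded as metadata, which is why the example also declares unique as a test.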


Part 9 - Production Engineering Notes

The Nullable: False Trap

:::danger The nullable: false Trap Declaring required: true (or nullable: false) on a column that becomes null in edge cases is the most common contract misconfiguration. When your contract says a field is required and a legitimate upstream event omits it (mobile app offline events, new user sign-ups without a session), your pipeline blocks on valid data.

Correct approach:

  1. Distinguish between "always required" fields (primary keys, event types) and "conditionally required" fields (session_id is null for mobile events).
  2. For conditionally required fields, use a conditional constraint: required: false with a custom quality check that validates: if platform != 'mobile' then session_id must not be null.
  3. Never use required: true as a shortcut for "this seems important." A required constraint that the data cannot always satisfy will eventually cause a production incident. :::
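Here is a sketch of the conditional rule from step 2, written as a small check on plain dicts so it runs without pandas. It assumes a `platform` field, which is not part of the user_events contract above. It is hypothetical and only used to express the condition. `required_unless` is likewise an illustrative name. In Soda or Great Expectations you would express the same rule in that tool's own syntax.

```python
def required_unless(rows: list, column: str, unless_column: str, unless_value: object) -> list:
    """Indices of rows where `column` is null although `unless_column` != `unless_value`."""
    return [
        i
        for i, row in enumerate(rows)
        if row.get(unless_column) != unless_value and row.get(column) is None
    ]


rows = [
    {"platform": "web", "session_id": "sess_8xK2mNpQrL"},
    {"platform": "mobile", "session_id": None},  # allowed: mobile events may lack a session
    {"platform": "web", "session_id": None},     # violation
]
print(required_unless(rows, "session_id", "platform", "mobile"))  # [2]
```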

The Silent Type Coercion Trap

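:::danger Silent Type Coercion Pandas and many SQL engines (through lenient casts such as TRY_CAST or SAFE_CAST) will silently coerce types when a column changes. int → float succeeds in pandas, but as soon as a null appears the column is upcast to float64 and missing values become NaN (for example, where an upstream system previously sent 0 as a sentinel). string → int succeeds for numeric-looking strings and, with a coercing cast such as pd.to_numeric(..., errors="coerce"), produces NaN for non-numeric ones.

The result: your pipeline does not fail. It continues producing output. But every record that failed the cast now carries a corrupted value that looks valid.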

Pattern to detect this:

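```python
import pandas as pd


def detect_coercion_failures(df: pd.DataFrame, column: str, target_type: type) -> pd.Series:
    """Return the non-null values in `column` that would silently become NaN
    (or lose information) if coerced to `target_type` with errors="coerce"."""
    col = df[column].dropna()
    if target_type not in (int, float):
        return col.iloc[0:0]  # this helper only checks numeric targets
    coerced = pd.to_numeric(col, errors="coerce")
    failures = coerced.isna()
    if target_type is int:
        # Values such as "3.5" parse as numbers but are not valid integers
        failures |= coerced.notna() & (coerced % 1 != 0)
    return col[failures]
```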

Always test type conversions explicitly. Do not trust that a type change is "just a widening." :::


Part 10 - Convincing Teams to Adopt Contracts

The technical case for data contracts is easy. The organizational case is harder. Upstream teams perceive contracts as constraints on their autonomy. Here is how to navigate this:

Start with the blast radius argument. Show the upstream team a diagram of every downstream system that depends on their data. When they can see that a schema change in their Kafka topic affects 12 downstream pipelines, they are more motivated to communicate changes formally.

Make the contract protect them too. Contracts define what consumers are allowed to depend on. If a consumer is not in the contract's declared downstream list, the producer owes them nothing. Contracts protect producers from being blamed for undocumented consumer assumptions.

Offer tooling, not process. "We need you to write a YAML file" is friction. "Here is a script that generates a contract YAML from your existing schema - it takes 5 minutes" is an offer of help.

Start with the most painful relationship. Find the upstream-downstream pair that has had the most production incidents. Instrument that relationship with a contract. Show the reduction in incidents. The story sells itself.
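As an example of "tooling, not process", here is a minimal sketch of a generator that drafts the `fields` section of a contract from sample records. Two parts of it are assumptions made for the sketch: the type mapping (`str` → string, `int` → integer, and so on) and the rule "required only if never null in the sample". `draft_fields` is a hypothetical name, and its output is a starting point for the owning team to review, not a finished contract. It prints JSON to stay standard-library only. A real generator would likely emit YAML.

```python
import json

# Assumed mapping from Python types to contract field types
PY_TO_CONTRACT_TYPE = {bool: "boolean", int: "integer", float: "number", str: "string"}


def draft_fields(records: list) -> dict:
    """Draft a contract `fields` section from sample records for a human to review."""
    names = sorted({name for record in records for name in record})
    fields = {}
    for name in names:
        values = [record.get(name) for record in records]  # an absent key counts as null
        non_null = [v for v in values if v is not None]
        types = {PY_TO_CONTRACT_TYPE.get(type(v), "object") for v in non_null}
        if len(types) == 1:
            inferred = next(iter(types))
        else:
            # Zero or several observed types: leave the decision to the owning team
            inferred = "TODO: " + (", ".join(sorted(types)) or "no non-null values")
        fields[name] = {
            "type": inferred,
            # Required only if no sample record had it null or missing
            "required": len(non_null) == len(values),
        }
    return fields


sample = [
    {"event_id": "550e8400-e29b-41d4-a716-446655440000", "event_type": "page_view",
     "session_id": "sess_8xK2mNpQrL"},
    {"event_id": "3f2b8c1e-7d4a-4e9b-9c2d-1a5e6f7b8c9d", "event_type": "purchase",
     "session_id": None},
]
print(json.dumps(draft_fields(sample), indent=2))
```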


Interview Questions

Q1: What is a data contract, and how does it differ from a database schema?

A database schema is a structural declaration: column names, types, and nullability. It describes the shape of data at a point in time.

A data contract is a broader, machine-readable agreement between a data producer and its consumers. It includes the schema, but adds:

  • Semantic constraints: valid values, business rules, conditional nullability
  • Operational SLAs: freshness requirements, availability guarantees
  • Ownership: which team owns the data, who is on-call when it breaks
  • Versioning: what constitutes a breaking change, how much notice is required
  • Consumer registry: which downstream systems are allowed to depend on this data

The critical difference: a schema change can happen silently in a database migration. A contract change requires a version bump, a notice period, and (for breaking changes) explicit consumer sign-off.

A schema says "the data looks like this." A contract says "the data looks like this, means this, will always be this fresh, and will change in these ways with this much notice."

Q2: How do you handle backward compatibility when versioning a data contract?

Use semantic versioning with data-specific rules:

  • Patch (0.0.X): No structural or semantic changes - only documentation, examples, or description fixes. No consumer action required.
  • Minor (0.X.0): Backward-compatible additions - new optional columns, relaxed constraints, extended enums. Consumers may adopt the new features but are not required to. 7-day notice.
  • Major (X.0.0): Breaking changes - column renames, type changes, removals, making optional columns required, restricting enums. All consumers must be notified and given time to adapt. 30-day minimum notice.

The key rule: a type change is always major. Even "widening" type changes (int → long, float32 → float64) are breaking because consumers may have downstream casts or storage assumptions.

In practice, implement this with:

  1. A check_contract_compat.py script that runs in the pre-commit hook and CI
  2. A contract registry where consumers register their dependency on a contract version
  3. A deprecation webhook that notifies all registered consumers when a major version is published

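Steps 2 and 3 reduce to a lookup: which consumers depend on a major version older than the one just published? The sketch below is a minimal in-memory version. It assumes consumers pin a major version and models the deprecation webhook as a callable. `ContractRegistry` and its methods are hypothetical names. A real registry would persist this state and make HTTP calls.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import date, timedelta
from typing import Callable

MAJOR_NOTICE = timedelta(days=30)  # minimum notice for a major version


@dataclass
class ContractRegistry:
    """Hypothetical in-memory registry of contracts and their registered consumers."""

    notify: Callable[[str, str], None]  # stand-in for the deprecation webhook
    latest: dict[str, str] = field(default_factory=dict)
    # contract name -> {consumer name -> major version it depends on}
    consumers: dict[str, dict[str, int]] = field(default_factory=dict)

    def register_consumer(self, contract: str, consumer: str, major: int) -> None:
        self.consumers.setdefault(contract, {})[consumer] = major

    def publish(self, contract: str, version: str, today: date) -> list[str]:
        """Record a new version and notify every consumer pinned to an older major."""
        new_major = int(version.split(".")[0])
        self.latest[contract] = version
        deadline = today + MAJOR_NOTICE
        notified = []
        for consumer, pinned in sorted(self.consumers.get(contract, {}).items()):
            if pinned < new_major:
                self.notify(
                    consumer,
                    f"{contract} {version} is a breaking release; "
                    f"migrate from v{pinned} by {deadline.isoformat()}",
                )
                notified.append(consumer)
        return notified


messages = []
registry = ContractRegistry(notify=lambda who, msg: messages.append((who, msg)))
registry.register_consumer("users", "churn-model", major=1)
registry.register_consumer("users", "ltv-features", major=2)
print(registry.publish("users", "2.0.0", today=date(2026, 3, 1)))  # ['churn-model']
```

Minor releases never trigger the webhook here, because a consumer pinned to the same major version is unaffected by definition.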
Q3: An upstream team wants to rename a column. They say it is a "minor" change because the data is the same. How do you respond?

A rename is a breaking change, not a minor one: it requires a MAJOR version bump.

Here is why: every consumer that reads the old column name either fails loudly (a KeyError when it indexes the column directly) or, worse, silently receives nulls (when it uses .get() or reindexes a pandas DataFrame with default values). The data is the same, but the interface has changed - and interface changes break consumers.

The correct process:

  1. The new column name is added as a new optional column in the current major version (minor bump).
  2. Both old and new column names are emitted for the duration of the deprecation period (30 days).
  3. All registered consumers update their code to use the new column name.
  4. The old column is removed in the next major version, after all consumers have migrated.
  5. The major version bump is published with a "removed: old_column_name" change log.

This dual-emit pattern is the data equivalent of an expand-and-contract database migration: it allows zero-downtime migration across a breaking interface change.
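The dual-emit step can be sketched on the producer side. This sketch assumes records are plain dicts and that the deprecation period is configured as a hypothetical `DEPRECATED_RENAMES` map from old name to new name. Removing an entry from that map, and with it the old column, is the change that ships in the next major version.

```python
from __future__ import annotations

# Hypothetical rename map for the deprecation period: old name -> new name.
# Deleting an entry (and so the old column) is what ships in the next MAJOR version.
DEPRECATED_RENAMES = {"uid": "user_id"}


def dual_emit(record: dict, renames: dict[str, str]) -> dict:
    """Emit every renamed column under both its old and its new name."""
    out = dict(record)
    for old, new in renames.items():
        if new in record:
            out[old] = record[new]  # producer already writes the new name
        elif old in record:
            out[new] = record[old]  # producer still writes the old name
    return out


print(dual_emit({"uid": "a1f9", "amount": 12.5}, DEPRECATED_RENAMES))
# {'uid': 'a1f9', 'amount': 12.5, 'user_id': 'a1f9'}
```

Because the shim works regardless of which name the producer's own code has switched to, the producer can update its code and the consumers can update theirs in any order during the window.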

Q4: How do you convince an upstream team to adopt data contracts when they see it as extra work?

The mistake is framing contracts as work the upstream team does for the benefit of others. Reframe it as work they do for their own protection.

The protection argument: A contract defines the boundary of what downstream consumers are allowed to depend on. Anything not in the contract is not a guaranteed interface. If a downstream team builds a dependency on an undocumented behavior, and the upstream team changes it, the upstream team is not responsible. Without a contract, every undocumented field is an implicit promise - and implicit promises get violated and create blame.

The practical argument: Show them how many times they were pinged in Slack about data questions in the last quarter. Contracts answer those questions automatically, in a machine-readable format that tooling can query.

The low-friction argument: Offer to write the first contract for them, generated from their existing schema. A contract generator can produce about 80% of the YAML automatically; the team only needs to add descriptions, owners, and SLAs.
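A contract generator does not need to be sophisticated. This sketch assumes column metadata arrives as `(name, type, nullable)` tuples, as you might read from `information_schema.columns`. `TYPE_MAP`, `generate_contract_skeleton`, and the YAML layout are illustrative only. They do not follow any particular contract specification. Anything the generator cannot infer is left as `TODO` for the producer team.

```python
from __future__ import annotations

# Hypothetical mapping from database types to contract types; extend as needed.
TYPE_MAP = {"integer": "int", "bigint": "long", "varchar": "string", "text": "string",
            "timestamp": "timestamp", "boolean": "boolean", "double precision": "double"}


def generate_contract_skeleton(table: str, columns: list[tuple[str, str, bool]]) -> str:
    """Build a YAML contract skeleton from (name, db_type, nullable) tuples."""
    lines = [
        f"name: {table}",
        "version: 1.0.0",
        "owner: TODO            # team accountable for this data",
        "on_call: TODO          # who is paged when it breaks",
        "freshness_sla: TODO    # e.g. 1h",
        "columns:",
    ]
    for name, db_type, nullable in columns:
        lines += [
            f"  - name: {name}",
            f"    type: {TYPE_MAP.get(db_type.lower(), 'TODO')}",  # unknown types need a human
            f"    required: {str(not nullable).lower()}",
            "    description: TODO",
        ]
    return "\n".join(lines) + "\n"


print(generate_contract_skeleton("users", [("user_id", "varchar", False),
                                           ("signup_ts", "timestamp", True)]))
```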

The incident argument: Find a recent incident where a schema change by this team broke a downstream pipeline. Show the timeline of the impact, the debugging cost, the engineering hours lost. Then show how a contract with a 30-day deprecation window would have prevented it.

Most teams agree in principle. The remaining resistance is friction. Removing friction - tooling, templates, contract generators - is the fastest path to adoption.

Q5: How would you implement data contract enforcement in a Kafka-based streaming pipeline?

In a Kafka pipeline, the enforcement point that matters most is the consumer: every pipeline that reads from the topic should validate the contract before processing each batch, whatever guarantees the producer claims to offer.

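Step 1 - Use a schema registry for structural enforcement. The Confluent Schema Registry with Avro or Protobuf enforces field names and types at the Kafka serialization layer: producers using the registry-aware serializers cannot publish with a schema that breaks the subject's configured compatibility rules (e.g., BACKWARD). This is the structural layer of the contract. Plain Kafka brokers do not inspect payloads, so a producer that bypasses the serializer can still write anything - one more reason to validate at the consumer.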

Step 2 - Add semantic enforcement at the consumer. After deserializing from Kafka, validate the business rules from the contract - enum values, conditional nullability, range checks. Do this in the first Flink or Spark operator, before any stateful computation.

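```python
# In PyFlink: contract enforcement as a flat_map operator.
# DataContract and get_quarantine_sink() are helpers assumed to exist elsewhere;
# they are not PyFlink APIs.
from pyflink.datastream.functions import FlatMapFunction, RuntimeContext


class ContractEnforcer(FlatMapFunction):
    def __init__(self, contract: "DataContract"):
        self.contract = contract
        self.quarantine_sink = None

    def open(self, runtime_context: RuntimeContext):
        # Create the sink on the worker, not in the constructor that gets serialized
        self.quarantine_sink = get_quarantine_sink()

    def flat_map(self, record: dict):
        violations = self.contract.validate_record(record)
        if violations:
            # Route to quarantine; do not forward to the main pipeline
            self.quarantine_sink.send({**record, "violations": str(violations)})
        else:
            # PyFlink's flat_map emits output by yielding; there is no collector argument
            yield record
```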
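The routing logic itself does not depend on Flink, and it is easier to unit-test outside it. This sketch adds a fail-fast policy for critical fields. Ordinary violations go to quarantine. The batch raises when a critical field such as a join key breaks, because partial data is worse than none there. `route_batch`, `CriticalContractViolation`, and the convention that each violation string starts with `"<field>:"` are assumptions made for illustration.

```python
from __future__ import annotations

from typing import Callable


class CriticalContractViolation(Exception):
    """A critical field broke the contract: stop rather than emit partial data."""


def route_batch(
    records: list[dict],
    validate: Callable[[dict], list[str]],
    critical_fields: set[str],
) -> tuple[list[dict], list[dict]]:
    """Split a batch into (clean, quarantined) records.

    Assumes each violation string starts with "<field>:" so the field can be recovered.
    """
    clean, quarantined = [], []
    for record in records:
        violations = validate(record)
        if not violations:
            clean.append(record)
            continue
        broken = {v.split(":", 1)[0] for v in violations}
        critical = broken & critical_fields
        if critical:
            # Fail fast: downstream corruption is worse than pipeline downtime
            raise CriticalContractViolation(f"{sorted(critical)}: {violations}")
        # Quarantine: keep the bad record, with its reasons, out of the main path
        quarantined.append({**record, "violations": violations})
    return clean, quarantined


def validate(record: dict) -> list[str]:
    """Hypothetical validator for the example below."""
    errors = []
    if not isinstance(record.get("user_id"), str):
        errors.append("user_id: expected str")
    if record.get("country") not in {"US", "DE", "IN"}:
        errors.append("country: unknown code")
    return errors


clean, bad = route_batch(
    [{"user_id": "a1", "country": "US"}, {"user_id": "b2", "country": "XX"}],
    validate,
    critical_fields={"user_id"},
)
print(len(clean), len(bad))  # 1 1
```

An integer `user_id`, as in the opening incident, would raise instead of flowing into the feature store.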

Step 3 - Monitor contract compliance metrics in Prometheus/Grafana. Export a counter for each violation type. Alert when the violation rate crosses a threshold (e.g., > 0.1% of messages in a 5-minute window).
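In production the counters would be exported to Prometheus and the threshold expressed as an alert rule. The arithmetic behind that rule is a violation rate over a sliding window. This stdlib-only sketch uses the example numbers above (5 minutes, 0.1%). `ViolationRateMonitor` is a hypothetical name. So is its `min_messages` guard, which suppresses alerts on tiny samples.

```python
from __future__ import annotations

from collections import deque


class ViolationRateMonitor:
    """Violation rate over a sliding time window (timestamps in seconds)."""

    def __init__(self, window_s: float = 300.0, threshold: float = 0.001,
                 min_messages: int = 1000):
        self.window_s = window_s
        self.threshold = threshold
        self.min_messages = min_messages  # assumed guard against tiny samples
        self.events: deque[tuple[float, bool]] = deque()  # (timestamp, is_violation)
        self.violations = 0

    def record(self, ts: float, is_violation: bool) -> None:
        self.events.append((ts, is_violation))
        self.violations += int(is_violation)
        # Evict events that have fallen out of the window
        while self.events and self.events[0][0] <= ts - self.window_s:
            _, old_violation = self.events.popleft()
            self.violations -= int(old_violation)

    def rate(self) -> float:
        return self.violations / len(self.events) if self.events else 0.0

    def should_alert(self) -> bool:
        return len(self.events) >= self.min_messages and self.rate() > self.threshold


mon = ViolationRateMonitor()
for i in range(2000):
    # 4 violations in 2,000 messages over ~200 s: a 0.2% rate
    mon.record(ts=i * 0.1, is_violation=(i % 500 == 0))
print(mon.rate(), mon.should_alert())  # 0.002 True
```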

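Step 4 - Use consumer group lag as a freshness proxy. Growing lag means the consuming pipeline is falling behind the topic, so the data it delivers will breach the contract's freshness SLA even if the producer is on time. Alert on lag rather than waiting for downstream effects. Lag cannot detect a producer that has stopped writing (lag stays at zero), so pair it with a check on the age of the newest message.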
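Lag is measured in messages, while a freshness SLA is measured in time, so a useful alert converts one into the other. This minimal sketch assumes you already have per-partition log-end offsets and the group's committed offsets. In confluent-kafka-python these can be obtained with `Consumer.get_watermark_offsets()` and `Consumer.committed()`. It also assumes a rough per-partition consumption rate. The helper names are hypothetical, and the time estimate is only as good as that rate.

```python
from __future__ import annotations


def partition_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> dict[int, int]:
    """Lag per partition: log-end offset minus the group's committed offset.

    Partitions with no commit are treated as starting from offset 0 (an assumption;
    the real starting point depends on the consumer's auto.offset.reset setting).
    """
    return {p: end - committed.get(p, 0) for p, end in end_offsets.items()}


def breaches_freshness(end_offsets: dict[int, int], committed: dict[int, int],
                       msgs_per_sec: float, freshness_sla_s: float) -> bool:
    """Convert the worst partition's lag into seconds and compare it to the SLA."""
    worst = max(partition_lag(end_offsets, committed).values(), default=0)
    return worst / msgs_per_sec > freshness_sla_s


end = {0: 10_500, 1: 9_800}
committed = {0: 10_000, 1: 9_790}
print(partition_lag(end, committed))  # {0: 500, 1: 10}
# 500 messages / 2 msg/s = 250 s behind, above a 120 s SLA
print(breaches_freshness(end, committed, msgs_per_sec=2.0, freshness_sla_s=120))  # True
```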

Q6: What is the difference between a data contract and a data catalog?

A data catalog is a discovery tool - it answers "what data exists, where is it, and what does it contain?" It is primarily read-only documentation.

A data contract is an enforcement tool - it answers "what do I guarantee about this data, and what happens when I violate that guarantee?" It is an active, machine-enforced agreement.

The relationship: a good data catalog imports and displays contracts. Contracts provide the machine-readable content (schema, SLAs, owners) that populates the catalog. But a catalog without contracts is documentation that may be wrong. Contracts without a catalog are enforcement without discoverability.

In practice: use a catalog (DataHub, Atlan, OpenMetadata) as the UI layer that makes contracts discoverable. Use the contract YAML as the source of truth that the catalog ingests. This separation means the contract file is always authoritative - the catalog is a view on top of it.


Quick Reference

| Concept | Definition | Why It Matters |
|---|---|---|
| Data contract | Machine-readable agreement on schema, semantics, SLAs, and ownership | Makes silent breaking changes into loud, catchable violations |
| Breaking change | Any change that could cause a correct consumer to fail or produce wrong output | Requires major version bump + notice period |
| Non-breaking change | Adding optional fields, relaxing constraints, extending enums | Minor version bump, no consumer action required |
| Contract registry | Central store of all published contracts and their versions | Enables discoverability and consumer notification |
| Quarantine | Routing invalid records to a dead-letter store instead of failing the pipeline | Allows clean records to continue while bad records are investigated |
| Fail-fast | Raising an exception immediately on contract violation | Correct for critical fields where downstream corruption is worse than pipeline downtime |

Key Takeaways

  • A database schema describes structure. A data contract adds semantics, SLAs, and ownership.
  • Every type change is a breaking change. No exceptions.
  • Enforce contracts at the consumer, not just the producer - never trust that the upstream team did the right thing.
  • The organizational case for contracts is stronger than the technical case: contracts protect producers from undocumented consumer assumptions.
  • Use the open-source data contract YAML specification as a standard - it integrates with Data Contract CLI, Soda Core, and dbt.
  • Quarantine invalid records rather than failing the entire pipeline - except for critical fields where partial data is worse than no data.


© 2026 EngineersOfAI. All rights reserved.