Data Contracts - The API Contract for Data Pipelines
Reading time: ~24 minutes | Level: Data Engineering → AI Systems
The upstream team changed the user_id field from int to string. They had a good reason: the system was migrating from sequential integer IDs to GUIDs to support multi-region sharding. The change was backward compatible in their view - short GUIDs look like numbers. They updated their README. They sent a Slack message to #data-infra. They did not update any machine-readable contract.
Three downstream ML pipelines picked up the change silently. Each one cast the new string IDs back to integers with a coercing cast that returns NaN instead of raising on unparseable input. Short GUIDs like "12847" parsed correctly for about two weeks. Then the system started generating GUIDs with alphabetic characters. The casts returned NaN. The feature pipelines did not fail - they propagated NaN into the feature store. The models trained on the next cycle included two months of poisoned user_id join keys, producing features that matched nothing at serving time.
No exception was raised. No alert fired. The model's offline metrics looked fine - the training set was internally consistent. The degradation appeared in a monthly business review, attributed to "seasonal factors." It took a senior engineer three days of log archaeology to find the field type change.
This is the canonical data contract failure. It happens because data teams treat data like a database table - something that can change quietly - rather than like an API - something with a contract, a version, and a breaking-change policy.
Data contracts are the solution. They are machine-readable, versioned agreements between a data producer and every consumer. They specify what the data contains, what it means, how fresh it will be, and who owns it when it breaks. They do not prevent all failures. But they make violations loud and immediate rather than silent and delayed.
What You Will Learn
- What a data contract is - and what distinguishes it from a database schema
- The open-source data contract specification (YAML structure)
- How to implement contract validation in Python - programmatic enforcement
- Contract lifecycle: define → publish → validate → monitor → evolve
- Versioning strategy for breaking vs. non-breaking changes
- Enforcement patterns across CI/CD pipelines and at read time
- Real tools: Data Contract CLI, Soda Core, dbt contracts, Monte Carlo
- Interview questions and how to answer "how do you convince upstream teams to adopt contracts"
Prerequisites
- Lesson 01 - Data Quality Dimensions (understand the five dimensions)
- Basic Python: classes, dataclasses, type annotations
- Familiarity with YAML syntax
- Awareness that ML pipelines have upstream/downstream dependency structures
Part 1 - Why Schemas Alone Are Not Enough
The most common response to the user_id type change story is: "Just use schema validation." It is a reasonable instinct. The problem is that schema validation only catches structural problems.
A schema tells you:
- The column exists
- The column has a specific data type
- The column may or may not be nullable
A schema does not tell you:
- What the column means (is it a stable join key, or does it recycle IDs after deletion?)
- What values are semantically valid (not just structurally typed)
- How fresh the data should be when it arrives
- Who is responsible when it breaks
- What downstream systems are allowed to depend on it
- When and how it is allowed to change
This is the gap between a schema and a contract. Schema is syntax. A contract adds semantics, operational guarantees, and ownership.
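To make the distinction concrete, here is a minimal plain-Python sketch of the user_id story. The structural check only asks whether the value is a string; the contract-level check also asks whether it is a UUID. The function names are illustrative and not part of any tool.

```python
import uuid


def passes_schema(value: object) -> bool:
    """Structural check: the contract says user_id is a string."""
    return isinstance(value, str)


def passes_contract(value: object) -> bool:
    """Semantic check: the string must also parse as a UUID."""
    if not passes_schema(value):
        return False
    try:
        uuid.UUID(value)
    except ValueError:
        return False
    return True


for user_id in ["12847", "550e8400-e29b-41d4-a716-446655440000"]:
    # "12847" is a valid string (schema OK) but not a valid UUID (contract violated).
    print(user_id, passes_schema(user_id), passes_contract(user_id))
```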
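:::note Historical Context The concept of data contracts gained formal momentum around 2021-2022, when Andrew Jones (then at GoCardless) and Chad Sanderson (then at Convoy) described the "data contract" pattern as a solution to the coordination problem between data producers and consumers, particularly in data mesh architectures. In 2023 PayPal open-sourced its data contract template, which later became the basis of the Open Data Contract Standard; the Data Contract Specification used in this lesson is a separate open-source YAML standard with its own tooling (the Data Contract CLI). The insight was simple but transformative: data pipelines should treat data interfaces the same way software engineers treat API interfaces - with explicit, versioned, machine-enforceable contracts. :::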
Part 2 - The Data Contract Specification
The open-source data contract specification defines a standard YAML structure that any tool or team can use. Here is the full structure for a production-grade contract:
```yaml
# data-contracts/user_events.contract.yaml
dataContractSpecification: 0.9.3
id: urn:datacontract:user-events:v2
info:
  title: User Events
  version: 2.1.0
  status: active
  description: >
    Clickstream events for all authenticated users.
    Produced by the web tracking service. Primary consumer:
    recommendation engine feature pipeline.
  owner: platform-data-team
  contact:
    name: Platform Data Team
    url: https://wiki.internal/teams/platform-data
    slackChannel: "#platform-data-oncall"
servers:
  production:
    type: kafka
    host: kafka.prod.company.com:9092
    topic: user.events.v2
    format: avro
    schemaRegistry: https://schema-registry.prod.company.com
  staging:
    type: kafka
    host: kafka.staging.company.com:9092
    topic: user.events.v2.staging
    format: avro
terms:
  usage: >
    Consumers may use this dataset for ML feature computation,
    analytics, and product experimentation. Real-time consumer
    decisions (fraud, content moderation) are prohibited without
    explicit approval from the data governance board.
  limitations: >
    Data is retained for 90 days in the Kafka topic.
    Historical data is available in S3 at s3://data/user-events/.
  billing: Internal - no chargeback.
  noticePeriod: P30D  # 30 days notice before breaking changes
models:
  user_event:
    description: A single user interaction event.
    type: object
    fields:
      event_id:
        description: Globally unique event identifier (UUID v4).
        type: string
        format: uuid
        required: true
        unique: true
        example: "550e8400-e29b-41d4-a716-446655440000"
      user_id:
        description: >
          Stable user identifier. Assigned at registration and
          never reused. Encoded as UUID v4 string since 2024-01.
          Legacy integer IDs were migrated; all IDs are now UUIDs.
        type: string
        format: uuid
        required: true
        pii: true
        classification: internal
      event_type:
        description: The user action that generated this event.
        type: string
        required: true
        enum:
          - page_view
          - product_click
          - add_to_cart
          - purchase
          - search
          - review_submitted
      event_timestamp:
        description: >
          UTC timestamp of the event as recorded by the client.
          Monotonically increasing per user_id within a session.
        type: timestamp
        required: true
        format: "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
      session_id:
        description: Groups events into a contiguous browsing session.
        type: string
        required: false  # null for mobile app events pre-2024
        example: "sess_8xK2mNpQrL"
      properties:
        description: Event-specific metadata as key-value pairs.
        type: object
        required: false
quality:
  type: SodaCL
  specification:
    checks for user_event:
      - row_count > 10000:
          name: Minimum event volume (alerts on pipeline failure)
      - duplicate_count(event_id) = 0:
          name: Event IDs must be globally unique
      - missing_count(user_id) = 0:
          name: All events must have a user_id
      - missing_count(event_type) = 0:
          name: Event type is required
      - invalid_count(event_type) = 0:
          valid values: [page_view, product_click, add_to_cart, purchase, search, review_submitted]
          name: Event type must be in approved enum
      - freshness(event_timestamp) < 10m:
          name: Events must arrive within 10 minutes of occurrence
servicelevels:
  availability:
    description: Kafka topic availability
    percentage: "99.9%"
  retention:
    description: Data available in Kafka
    period: P90D
  freshness:
    description: Maximum lag between event occurrence and topic availability
    threshold: PT10M  # ISO 8601 duration: 10 minutes
  completeness:
    description: Percentage of events that arrive without data loss
    percentage: "99.5%"
```
This YAML is not documentation. It is a machine-readable specification that tooling can parse, validate against, and monitor in production.
Part 3 - Implementing Contract Validation in Python
The YAML contract is useful as a specification. But contracts only create value when they are enforced - programmatically, automatically, and loudly when violated.
Here is a production-grade DataContract class that parses the YAML and enforces it:
```python
from __future__ import annotations

import re
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone

import pandas as pd
import yaml


@dataclass
class FieldSpec:
    name: str
    dtype: str
    required: bool
    unique: bool = False
    enum: list[str] | None = None
    format: str | None = None
    min_value: float | None = None
    max_value: float | None = None
    description: str = ""


@dataclass
class QualitySpec:
    min_row_count: int = 0
    max_freshness_minutes: int | None = None
    timestamp_column: str | None = None


@dataclass
class ContractViolation:
    field: str
    rule: str
    severity: str  # "critical" or "warning"
    message: str
    sample_values: list = field(default_factory=list)


def _is_valid_uuid(value) -> bool:
    """True if the value parses as a UUID (hyphenated or not)."""
    try:
        uuid.UUID(str(value))
        return True
    except ValueError:
        return False


class DataContract:
    """
    Parses a data contract YAML and validates a DataFrame against it.

    Usage:
        contract = DataContract.from_yaml("contracts/user_events.contract.yaml")
        violations = contract.validate(df)
        contract.enforce(df)  # raises ContractViolationError if critical violations exist
    """

    def __init__(self, spec: dict):
        self.spec = spec
        self.id = spec.get("id", "unknown")
        self.version = spec.get("info", {}).get("version", "0.0.0")
        self.title = spec.get("info", {}).get("title", "Untitled Contract")
        self._parse_fields()
        self._parse_quality()

    @classmethod
    def from_yaml(cls, path: str) -> "DataContract":
        with open(path) as f:
            spec = yaml.safe_load(f)
        return cls(spec)

    def _parse_fields(self):
        # Flatten every model's fields into one {column_name: FieldSpec} map.
        self.fields: dict[str, FieldSpec] = {}
        for model_def in self.spec.get("models", {}).values():
            for col_name, col_def in model_def.get("fields", {}).items():
                self.fields[col_name] = FieldSpec(
                    name=col_name,
                    dtype=col_def.get("type", "string"),
                    required=col_def.get("required", False),
                    unique=col_def.get("unique", False),
                    enum=col_def.get("enum"),
                    format=col_def.get("format"),
                    min_value=col_def.get("minimum"),
                    max_value=col_def.get("maximum"),
                    description=col_def.get("description", ""),
                )

    def _parse_quality(self):
        sla = self.spec.get("servicelevels", {})
        freshness_str = sla.get("freshness", {}).get("threshold")
        self.quality = QualitySpec(
            max_freshness_minutes=self._parse_iso_duration_minutes(freshness_str),
        )

    @staticmethod
    def _parse_iso_duration_minutes(duration_str: str | None) -> int | None:
        """Parse a single-component ISO 8601 duration: PT10M -> 10, PT2H -> 120."""
        if not duration_str:
            return None
        match = re.fullmatch(r"PT(\d+)([HM])", duration_str)
        if not match:
            return None
        value, unit = int(match.group(1)), match.group(2)
        return value * 60 if unit == "H" else value

    def validate(self, df: pd.DataFrame, timestamp_col: str | None = None) -> list[ContractViolation]:
        """
        Validate a DataFrame against the contract.
        Returns a list of ContractViolation objects.
        """
        violations: list[ContractViolation] = []

        for col_name, field_spec in self.fields.items():
            # 1. Schema - required columns present
            if col_name not in df.columns:
                if field_spec.required:
                    violations.append(ContractViolation(
                        field=col_name,
                        rule="required_column_present",
                        severity="critical",
                        message=f"Required column '{col_name}' is missing from the dataset.",
                    ))
                continue
            col = df[col_name]

            # 2. Nullability
            if field_spec.required:
                null_count = int(col.isna().sum())
                if null_count > 0:
                    violations.append(ContractViolation(
                        field=col_name,
                        rule="not_null",
                        severity="critical",
                        message=f"'{col_name}' has {null_count} null values ({null_count / len(df):.1%}).",
                    ))

            # 3. Enum validation (nulls are handled by the nullability rule)
            if field_spec.enum:
                invalid_mask = col.notna() & ~col.isin(field_spec.enum)
                invalid_count = int(invalid_mask.sum())
                if invalid_count > 0:
                    violations.append(ContractViolation(
                        field=col_name,
                        rule="enum_values",
                        severity="critical",
                        message=(
                            f"'{col_name}' has {invalid_count} values not in approved enum "
                            f"{field_spec.enum}."
                        ),
                        sample_values=col[invalid_mask].unique()[:5].tolist(),
                    ))

            # 4. Uniqueness
            if field_spec.unique:
                dupe_count = int(col.duplicated().sum())
                if dupe_count > 0:
                    violations.append(ContractViolation(
                        field=col_name,
                        rule="unique",
                        severity="critical",
                        message=f"'{col_name}' has {dupe_count} duplicate values.",
                        sample_values=col[col.duplicated(keep=False)].head(3).tolist(),
                    ))

            # 5. UUID format check
            if field_spec.format == "uuid":
                non_null = col.dropna()
                invalid_uuids = ~non_null.map(_is_valid_uuid).astype(bool)
                invalid_count = int(invalid_uuids.sum())
                if invalid_count > 0:
                    violations.append(ContractViolation(
                        field=col_name,
                        rule="uuid_format",
                        severity="critical",
                        message=f"'{col_name}' has {invalid_count} values that are not valid UUIDs.",
                        sample_values=non_null[invalid_uuids].head(3).tolist(),
                    ))

        # 6. Freshness check (dataset-level)
        ts_col = timestamp_col or self.quality.timestamp_column
        max_fresh = self.quality.max_freshness_minutes
        if ts_col and max_fresh and ts_col in df.columns:
            latest_ts = pd.to_datetime(df[ts_col]).max()
            if latest_ts.tzinfo is None:
                latest_ts = latest_ts.tz_localize("UTC")  # treat naive timestamps as UTC
            now = datetime.now(tz=timezone.utc)
            staleness_min = (now - latest_ts).total_seconds() / 60
            if staleness_min > max_fresh:
                violations.append(ContractViolation(
                    field=ts_col,
                    rule="freshness",
                    severity="critical",
                    message=(
                        f"Data is {staleness_min:.0f} minutes stale "
                        f"(SLA: max {max_fresh} minutes). "
                        f"Latest record: {latest_ts.isoformat()}"
                    ),
                ))

        return violations

    def enforce(self, df: pd.DataFrame, timestamp_col: str | None = None) -> None:
        """
        Validate and raise ContractViolationError if any critical violations exist.
        Call this at pipeline ingestion boundaries.
        """
        violations = self.validate(df, timestamp_col=timestamp_col)
        critical = [v for v in violations if v.severity == "critical"]
        warnings = [v for v in violations if v.severity == "warning"]
        for w in warnings:
            print(f"[CONTRACT WARNING] {w.field} / {w.rule}: {w.message}")
        if critical:
            lines = [f"Data contract '{self.id}' (v{self.version}) violated:"]
            for v in critical:
                lines.append(f"  [CRITICAL] {v.field} / {v.rule}: {v.message}")
                if v.sample_values:
                    lines.append(f"    Sample values: {v.sample_values}")
            raise ContractViolationError("\n".join(lines))
        print(f"[CONTRACT OK] '{self.title}' v{self.version} - {len(df)} rows validated.")


class ContractViolationError(Exception):
    """Raised when a critical data contract violation is detected."""
```
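_parse_iso_duration_minutes handles only a single hour or minute component. If your contracts use compound durations such as PT1H30M or day-based ones such as P1D, a slightly more general standard-library sketch might look like this. It is an assumption-laden helper, not part of any contract tool, and it deliberately rejects year, month and week components, whose length in minutes is ambiguous.

```python
import re

# Matches ISO 8601 durations built from days, hours, minutes and seconds,
# e.g. P1D, PT10M, PT1H30M, P1DT2H. Years, months and weeks are not supported.
_DURATION_RE = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def iso_duration_to_minutes(duration: str) -> float:
    """Convert an ISO 8601 duration string to minutes, or raise ValueError."""
    match = _DURATION_RE.match(duration)
    # The regex alone accepts the empty forms "P", "PT" and a trailing "T"; reject them.
    if not match or duration in ("P", "PT") or duration.endswith("T"):
        raise ValueError(f"Unsupported ISO 8601 duration: {duration!r}")
    parts = {k: int(v) if v else 0 for k, v in match.groupdict().items()}
    return (
        parts["days"] * 24 * 60
        + parts["hours"] * 60
        + parts["minutes"]
        + parts["seconds"] / 60
    )
```

Raising instead of returning None is a design choice: a contract whose freshness SLA silently fails to parse would otherwise skip the freshness check without telling anyone.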
Now here is how you use it in a production Airflow task:
```python
# dags/feature_pipeline/user_events_task.py
import pandas as pd
from airflow.decorators import task

from data_contracts import DataContract, ContractViolationError


@task
def validate_and_ingest_user_events(s3_path: str, **context) -> str:
    """
    Reads raw events from S3, enforces the data contract,
    and writes validated data to the feature store.
    Fails the Airflow task (not just logs) on contract violation.
    """
    contract = DataContract.from_yaml(
        "/contracts/user_events.contract.yaml"
    )
    df = pd.read_parquet(s3_path)
    try:
        contract.enforce(df, timestamp_col="event_timestamp")
    except ContractViolationError as e:
        # This raises -> Airflow marks the task as FAILED -> triggers alerting
        raise RuntimeError(
            f"Contract validation failed for {s3_path}:\n{e}"
        ) from e
    # Contract passed - safe to write to the feature store.
    # write_to_feature_store is your project's own sink; it is not defined in this lesson.
    write_to_feature_store(df, table="user_events")
    return f"Ingested {len(df)} events from {s3_path}"
```
Part 4 - The Data Contract Lifecycle
Contracts are not static documents. They have a lifecycle that mirrors the lifecycle of the data they govern: define, publish, validate, monitor, and evolve.
Part 5 - Versioning Strategy for Data Contracts
Data contracts use semantic versioning (MAJOR.MINOR.PATCH) applied to data, not software. The rules are different from API versioning because consumers often cannot update immediately:
| Change Type | Version Bump | Examples | Notice Period |
|---|---|---|---|
| Patch | 0.0.X | Fix typo in description, add example value | None |
| Minor (backward-compatible) | 0.X.0 | Add a new optional column, relax a constraint, extend an enum | 7 days |
| Major (breaking) | X.0.0 | Rename a column, change a type, remove a column, make nullable column required, change primary key | 30 days minimum |
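The table can be encoded as a small helper that a CI job could call once it has classified a change. This is a hypothetical sketch: the bump rules follow semantic versioning, and the notice periods are the ones in the table (30 days being the minimum for a major change).

```python
from datetime import timedelta

# Notice periods from the versioning table (the major value is a minimum).
NOTICE_PERIODS = {
    "patch": timedelta(0),
    "minor": timedelta(days=7),
    "major": timedelta(days=30),
}


def next_version(current: str, change_type: str) -> str:
    """Return the semantic version a contract should move to for a given change type."""
    major, minor, patch = (int(part) for part in current.split("."))
    if change_type == "major":
        return f"{major + 1}.0.0"
    if change_type == "minor":
        return f"{major}.{minor + 1}.0"
    if change_type == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"Unknown change type: {change_type!r}")


print(next_version("2.1.0", "major"), NOTICE_PERIODS["major"].days)  # 3.0.0 30
```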
:::danger The Backward-Compatible Type Change Trap The most dangerous versioning mistake is treating a type change as a minor (backward-compatible) change because "it still holds the same information."
Changing user_id from int64 to string is a breaking change. Changing price from float32 to float64 may seem safe but silently breaks consumers that assume 32-bit precision. Changing event_date from a date type to timestamp looks like an upgrade but breaks joins against tables that still use date.
Rule: Any type change is a MAJOR version bump. No exceptions. The cost of a 30-day deprecation cycle is always lower than the cost of debugging a silent type coercion. :::
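The user_id failure from the introduction is easy to reproduce without any data tooling. In this minimal sketch (the feature values are made up), a feature table is keyed by integer IDs while the new event stream delivers the same IDs as strings. The lookup does not raise; it simply finds nothing. The second half shows why "widening" float32 to float64 is not free either: a value that passed through 32-bit storage no longer equals the 64-bit value a consumer compares it against.

```python
import struct

# Feature table built before the change: keys are integers.
features_by_user = {12847: {"purchases_30d": 4}, 99120: {"purchases_30d": 1}}

# After the upstream change, the same IDs arrive as strings.
incoming_user_ids = ["12847", "99120"]

# A dict.get-style join never raises; it silently returns None for every row.
joined = [features_by_user.get(uid) for uid in incoming_user_ids]
print(joined)  # [None, None]

# Round-trip 0.1 through a 32-bit float, then compare it as a 64-bit float.
price32 = struct.unpack("f", struct.pack("f", 0.1))[0]
print(price32 == 0.1)  # False
```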
Here is how to implement version checking on top of the DataContract class:
```python
from packaging.version import Version

from data_contracts import DataContract  # the DataContract class from Part 3 (or define this in the same module)


def check_contract_compatibility(
    current_contract: DataContract,
    new_contract: DataContract,
) -> dict:
    """
    Compare two contract versions and classify the change type.
    Returns a dict with 'change_type' ("patch", "minor" or "major") and an 'issues' list.
    """
    current_v = Version(current_contract.version)
    new_v = Version(new_contract.version)
    issues = []
    change_type = "patch"
    current_fields = current_contract.fields
    new_fields = new_contract.fields

    for col, old_spec in current_fields.items():
        # Removed columns (breaking)
        if col not in new_fields:
            issues.append(f"BREAKING: Column '{col}' removed.")
            change_type = "major"
            continue
        new_spec = new_fields[col]

        # Type changes (breaking)
        if old_spec.dtype != new_spec.dtype:
            issues.append(
                f"BREAKING: Column '{col}' type changed "
                f"from '{old_spec.dtype}' to '{new_spec.dtype}'."
            )
            change_type = "major"

        # Optional column made required (breaking)
        if new_spec.required and not old_spec.required:
            issues.append(f"BREAKING: Column '{col}' changed from optional to required.")
            change_type = "major"

        # Enum restriction (breaking - values that were valid are now rejected).
        # Dropping an enum entirely relaxes the constraint, so it is not flagged.
        if old_spec.enum and new_spec.enum:
            removed_values = set(old_spec.enum) - set(new_spec.enum)
            if removed_values:
                issues.append(
                    f"BREAKING: Enum values removed from '{col}': {sorted(removed_values)}"
                )
                change_type = "major"
        elif new_spec.enum and not old_spec.enum:
            issues.append(f"BREAKING: Enum constraint added to previously unconstrained '{col}'.")
            change_type = "major"

    for col, spec in new_fields.items():
        if col in current_fields:
            continue
        if spec.required:
            # New required column (breaking for existing producers)
            issues.append(
                f"BREAKING: New required column '{col}' added "
                f"- existing producers do not emit this field."
            )
            change_type = "major"
        elif change_type == "patch":
            # New optional column -> minor
            change_type = "minor"

    # Validate that the version bump matches the change type
    if change_type == "major" and new_v.major <= current_v.major:
        issues.append(
            f"ERROR: Breaking changes detected but version was not bumped to "
            f"a new major version. Current: {current_v}, New: {new_v}"
        )
    return {"change_type": change_type, "issues": issues}
```
Part 6 - Contract Enforcement Patterns
Defining a contract in YAML creates no value. Enforcement creates value. There are four enforcement points in a real data pipeline, and each has a different role:
Pattern 1 - Pre-Commit Hook on Schema Changes
```bash
#!/bin/bash
# .git/hooks/pre-commit (or the same logic wired into .pre-commit-config.yaml with the pre-commit framework)
# Validate any added, copied or modified contract files before committing.
changed_contracts=$(git diff --cached --name-only --diff-filter=ACM | grep -E '\.contract\.yaml$')
for contract_file in $changed_contracts; do
  if ! datacontract lint "$contract_file"; then
    echo "Contract validation failed: $contract_file"
    exit 1
  fi
  # Check backward compatibility against the version in main (skipped for brand-new files)
  if git show "main:$contract_file" > /tmp/current_contract.yaml 2>/dev/null; then
    if ! python scripts/check_contract_compat.py \
        /tmp/current_contract.yaml \
        "$contract_file"; then
      echo "Breaking change detected in $contract_file - bump major version and add deprecation notice."
      exit 1
    fi
  fi
done
```
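The hook assumes a scripts/check_contract_compat.py that exits non-zero on an unversioned breaking change; the lesson does not show that script. Here is one hypothetical way to write it. It compares the parsed YAML dictionaries directly (models → fields), covers only removed columns and type changes, and imports PyYAML lazily so the comparison functions stay standard-library only.

```python
import sys


def breaking_changes(old_spec: dict, new_spec: dict) -> list[str]:
    """List removed columns and type changes between two parsed contract specs."""

    def fields(spec: dict) -> dict:
        # Flatten models -> fields into {column_name: field_definition}.
        merged = {}
        for model in spec.get("models", {}).values():
            merged.update(model.get("fields", {}))
        return merged

    old_fields, new_fields = fields(old_spec), fields(new_spec)
    problems = []
    for name, old_def in old_fields.items():
        if name not in new_fields:
            problems.append(f"column '{name}' removed")
        elif old_def.get("type") != new_fields[name].get("type"):
            problems.append(
                f"column '{name}' type changed from "
                f"{old_def.get('type')!r} to {new_fields[name].get('type')!r}"
            )
    return problems


def exit_code(old_spec: dict, new_spec: dict) -> int:
    """Return 1 if there are breaking changes without a major version bump, else 0."""
    old_major = int(str(old_spec["info"]["version"]).split(".")[0])
    new_major = int(str(new_spec["info"]["version"]).split(".")[0])
    problems = breaking_changes(old_spec, new_spec)
    for problem in problems:
        print(f"BREAKING: {problem}")
    return 1 if problems and new_major <= old_major else 0


def main(argv: list[str]) -> int:
    """Entry point: main(["old.yaml", "new.yaml"]) returns the process exit code."""
    if len(argv) != 2:
        print("usage: check_contract_compat.py OLD.yaml NEW.yaml", file=sys.stderr)
        return 2
    import yaml  # PyYAML, only needed when files are actually loaded

    with open(argv[0]) as f_old, open(argv[1]) as f_new:
        return exit_code(yaml.safe_load(f_old), yaml.safe_load(f_new))
```

When saving this as the script, finish it with the usual `if __name__ == "__main__": sys.exit(main(sys.argv[1:]))` guard so the hook sees the exit code.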
Pattern 2 - CI/CD Pipeline Validation
```yaml
# .gitlab-ci.yml (or GitHub Actions equivalent)
validate-data-contracts:
  stage: validate
  image: python:3.11-slim
  script:
    - pip install datacontract-cli soda-core
    - |
      set -e
      # find recurses into subdirectories; a bare ** glob does not without bash's globstar
      for f in $(find contracts -name '*.contract.yaml'); do
        echo "Validating $f..."
        datacontract lint "$f"
        datacontract test "$f" --server staging
      done
  rules:
    - if: $CI_PIPELINE_SOURCE == "merge_request_event"
      changes:
        - contracts/**/*.contract.yaml
        - models/**/*.yaml
```
Pattern 3 - Consumer-Side Enforcement at Read Time
This is the most important pattern. Every pipeline that reads data should verify the contract at the point of consumption - not trust that the producer did the right thing.
```python
import pandas as pd

from data_contracts import DataContract  # the DataContract class from Part 3

# The reader contract - the fields this consumer declares it needs.
# Shown for documentation; read_with_contract below enforces the producer's contract file.
REQUIRED_CONTRACT = {
    "user_id": {"type": "string", "format": "uuid", "required": True},
    "event_type": {
        "type": "string",
        "required": True,
        "enum": ["page_view", "product_click", "add_to_cart", "purchase"],
    },
    "event_timestamp": {"type": "timestamp", "required": True},
}


def read_with_contract(
    path: str,
    contract_path: str,
    timestamp_col: str = "event_timestamp",
) -> pd.DataFrame:
    """
    Read data and enforce the contract before returning.
    Fail fast at the pipeline boundary, not three steps later.
    """
    contract = DataContract.from_yaml(contract_path)
    df = pd.read_parquet(path)
    # Enforce - raises ContractViolationError on failure
    contract.enforce(df, timestamp_col=timestamp_col)
    return df
```
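REQUIRED_CONTRACT is declared but not used by read_with_contract, which enforces the producer's full contract file. A consumer that wants to check only the subset it depends on could validate against the dictionary directly. A minimal pandas sketch, assuming the same dictionary shape (type, format, required and enum keys); check_reader_contract is a hypothetical name.

```python
import uuid

import pandas as pd

REQUIRED_CONTRACT = {
    "user_id": {"type": "string", "format": "uuid", "required": True},
    "event_type": {
        "type": "string",
        "required": True,
        "enum": ["page_view", "product_click", "add_to_cart", "purchase"],
    },
    "event_timestamp": {"type": "timestamp", "required": True},
}


def _is_uuid(value: object) -> bool:
    try:
        uuid.UUID(str(value))
        return True
    except ValueError:
        return False


def check_reader_contract(df: pd.DataFrame, required: dict) -> list[str]:
    """Return human-readable problems; an empty list means the consumer's needs are met."""
    problems = []
    for col, rules in required.items():
        if col not in df.columns:
            if rules.get("required"):
                problems.append(f"missing column '{col}'")
            continue
        values = df[col]
        if rules.get("required") and values.isna().any():
            problems.append(f"'{col}' has {int(values.isna().sum())} nulls")
        present = values.dropna()
        if "enum" in rules:
            unexpected = sorted(set(present) - set(rules["enum"]))
            if unexpected:
                problems.append(f"'{col}' has unexpected values {unexpected}")
        if rules.get("format") == "uuid":
            bad = int((~present.map(_is_uuid).astype(bool)).sum())
            if bad:
                problems.append(f"'{col}' has {bad} non-UUID values")
    return problems
```

Note that this consumer's enum lists only four of the six event types the producer publishes, so a strict check flags search events. If the consumer simply ignores event types it does not use, filter them out before checking.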
Pattern 4 - Producer-Side Contract Publishing
Producers should publish their contract to a central registry as part of their deployment process. This makes all contracts discoverable:
```python
# In the producer's deployment pipeline
import requests
import yaml


def publish_contract(contract_path: str, registry_url: str, api_key: str) -> None:
    """
    Publish a contract to the central data contract registry.
    Called from CI/CD on every merge to main.
    The PUT /contracts/<id> endpoint is this example's registry API, not a standard one.
    """
    with open(contract_path) as f:
        contract = yaml.safe_load(f)
    response = requests.put(
        f"{registry_url}/contracts/{contract['id']}",
        json=contract,
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30,
    )
    response.raise_for_status()
    print(f"Published contract {contract['id']} v{contract['info']['version']}")

# In CI/CD (GitLab CI example):
# publish-contract:
#   stage: deploy
#   script: python scripts/publish_contract.py contracts/user_events.contract.yaml
```
Part 7 - Contract Violation Handling
When a violation is detected, three strategies are available. The choice depends on the severity and the pipeline context:
Here is the quarantine pattern implemented in Python:
```python
import uuid
from datetime import datetime, timezone

import pandas as pd

from data_contracts import DataContract  # the DataContract class from Part 3


def _looks_like_uuid(value) -> bool:
    try:
        uuid.UUID(str(value))
        return True
    except ValueError:
        return False


def ingest_with_quarantine(
    df: pd.DataFrame,
    contract: DataContract,
    good_path: str,
    quarantine_path: str,
    timestamp_col: str = "event_timestamp",
) -> dict:
    """
    Separate clean records from violating records.
    Write clean records to the pipeline. Write violators to quarantine.
    Returns a summary dict with counts.

    Only row-level rules (not_null, enum_values, uuid_format) are mapped to rows.
    Dataset-level violations (missing columns, freshness, duplicates) quarantine
    nothing here; block on those with contract.enforce() first.
    """
    violations = contract.validate(df, timestamp_col=timestamp_col)

    # Boolean mask of rows that break at least one row-level rule
    bad_mask = pd.Series(False, index=df.index)
    for violation in violations:
        col = violation.field
        if col not in df.columns:
            continue
        spec = contract.fields.get(col)
        if violation.rule == "not_null":
            bad_mask |= df[col].isna()
        elif violation.rule == "enum_values" and spec and spec.enum:
            bad_mask |= df[col].notna() & ~df[col].isin(spec.enum)
        elif violation.rule == "uuid_format":
            bad_mask |= df[col].notna() & ~df[col].map(_looks_like_uuid).astype(bool)

    bad_df = df[bad_mask]
    good_df = df[~bad_mask]

    # Write good records downstream
    good_df.to_parquet(good_path, index=False)

    # Write bad records to quarantine with metadata
    if len(bad_df) > 0:
        bad_df = bad_df.copy()
        bad_df["_quarantine_reason"] = str([v.rule for v in violations])
        bad_df["_quarantine_timestamp"] = datetime.now(timezone.utc).isoformat()
        bad_df["_contract_id"] = contract.id
        bad_df.to_parquet(quarantine_path, index=False)
        # Alert
        violation_rate = len(bad_df) / len(df) * 100
        print(
            f"[QUARANTINE] {len(bad_df)} records ({violation_rate:.2f}%) quarantined "
            f"to {quarantine_path}. Violations: {[v.rule for v in violations]}"
        )

    total = len(df)
    return {
        "total_records": total,
        "clean_records": len(good_df),
        "quarantined_records": len(bad_df),
        "violation_rate_pct": round(len(bad_df) / total * 100, 4) if total else 0.0,
        "violations": [v.rule for v in violations],
    }
```
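ingest_with_quarantine stamps every quarantined row with the full list of violated rules, including rules that row did not break. If you want per-row reasons, one option is to evaluate one boolean mask per rule and join the names of the rules each row fails. A minimal pandas sketch; the rule names and lambdas are hypothetical, and each function must return a boolean Series aligned with the DataFrame.

```python
import pandas as pd


def row_violation_reasons(df: pd.DataFrame, rules: dict) -> pd.Series:
    """
    rules maps a rule name to a function returning a boolean Series that is True
    where a row violates the rule. Returns one comma-separated string of rule
    names per row; the empty string means the row is clean.
    """
    masks = pd.DataFrame(
        {name: violates(df).astype(bool) for name, violates in rules.items()},
        index=df.index,
    )
    return masks.apply(lambda row: ",".join(name for name, hit in row.items() if hit), axis=1)


events = pd.DataFrame({
    "user_id": ["u1", None, "u3"],
    "event_type": ["purchase", "search", "teleport"],
})
rules = {
    "not_null:user_id": lambda d: d["user_id"].isna(),
    "enum:event_type": lambda d: ~d["event_type"].isin(["page_view", "purchase", "search"]),
}
reasons = row_violation_reasons(events, rules)
clean = events[reasons == ""]
quarantined = events[reasons != ""].assign(_quarantine_reason=reasons[reasons != ""])
```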
Part 8 - Real Tools for Data Contracts
You do not need to build everything from scratch. A mature ecosystem exists:
| Tool | What It Does | Best For |
|---|---|---|
| Data Contract CLI (datacontract-cli) | Lint, test, and publish contracts from YAML | Teams adopting the open-source spec |
| Soda Core | Declarative quality checks with SodaCL, contract enforcement in CI | Python-native pipelines with Airflow |
| dbt contracts | Schema and constraint enforcement in the dbt transformation layer | dbt-heavy warehouses (Snowflake, BigQuery, Databricks) |
| Monte Carlo | Data observability platform - automated anomaly detection + contract monitoring | Enterprise teams needing a managed SaaS solution |
| Atlan / DataHub | Data catalog + contract registry - tracks producers, consumers, and contract status | Organizations needing discoverability at scale |
| Great Expectations | Expectation suites for dataset-level validation - integrates with contracts as the quality spec | Standalone validation with rich HTML data docs |
Data Contract CLI in Practice
```bash
# Install
pip install datacontract-cli

# Lint a contract YAML for syntax and semantic correctness
datacontract lint contracts/user_events.contract.yaml

# Test a contract against actual data
datacontract test contracts/user_events.contract.yaml \
  --server production

# Export to HTML data catalog documentation
datacontract export contracts/user_events.contract.yaml \
  --format html \
  --output docs/contracts/user_events.html

# Check backward compatibility
datacontract diff \
  contracts/user_events.v1.contract.yaml \
  contracts/user_events.v2.contract.yaml
```
dbt Contracts (dbt 1.5+)
dbt added native model contracts in version 1.5. In schema.yml:
```yaml
models:
  - name: user_features
    description: "User-level features for recommendation, refreshed hourly."
    config:
      contract:
        enforced: true  # dbt will validate column types on every run
    columns:
      - name: user_id
        data_type: varchar
        constraints:
          - type: not_null
          - type: unique
        tests:
          - not_null
          - unique
      - name: event_type
        data_type: varchar
        constraints:
          - type: not_null
        tests:
          - not_null
          - accepted_values:
              values: [page_view, product_click, add_to_cart, purchase]
      - name: computed_at
        data_type: timestamp
        constraints:
          - type: not_null
```
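When contract.enforced: true, dbt checks before materializing the model that the columns and data types returned by its SQL match the ones declared in schema.yml, and fails the run if they differ. It also includes the declared columns, types, and constraints in the DDL it submits to the warehouse. This turns every dbt run into a contract enforcement step.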
Part 9 - Production Engineering Notes
The Nullable: False Trap
:::danger The nullable: false Trap
Declaring required: true (or nullable: false) on a column that becomes null in edge cases is the most common contract misconfiguration. When your contract says a field is required and a legitimate upstream event omits it (mobile app offline events, new user sign-ups without a session), your pipeline blocks on valid data.
Correct approach:
- Distinguish between "always required" fields (primary keys, event types) and "conditionally required" fields (session_id is null for mobile events).
- For conditionally required fields, use a conditional constraint:
required: falsewith a custom quality check that validates: ifplatform != 'mobile'thensession_idmust not be null. - Never use
required: trueas a shortcut for "this seems important." Every required constraint will eventually cause a production incident. :::
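The conditional rule can be expressed as a small pandas check. This sketch assumes a platform column with a "mobile" value, as in the example above; neither is part of the user_events contract, and the function name is hypothetical.

```python
import pandas as pd


def conditional_null_violations(
    df: pd.DataFrame,
    column: str = "session_id",
    condition_col: str = "platform",
    exempt_value: str = "mobile",
) -> pd.DataFrame:
    """Rows where `column` is null even though the row is not exempt from the rule."""
    must_be_present = df[condition_col] != exempt_value
    return df[must_be_present & df[column].isna()]


events = pd.DataFrame(
    {
        "platform": ["web", "mobile", "web"],
        "session_id": ["sess_8xK2mNpQrL", None, None],
    }
)
violations = conditional_null_violations(events)
print(violations.index.tolist())  # [2]: the web event without a session
```

Only the web event with a missing session is flagged; the mobile event is exempt, so valid offline traffic never blocks the pipeline.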
The Silent Type Coercion Trap
:::danger Silent Type Coercion
Pandas and most SQL engines will silently coerce types when a column changes. In pandas, an integer column that gains a single null is upcast to float64, and integers above 2^53 can no longer be represented exactly. A string → int cast with pd.to_numeric(..., errors="coerce") succeeds for numeric-looking strings and produces NaN for non-numeric ones.
The result: your pipeline does not fail. It continues producing output. But 5-20% of records now carry corrupted values that look valid.
Pattern to detect this:
```python
import pandas as pd


def detect_coercion_failures(df: pd.DataFrame, column: str, target_type: type) -> pd.Series:
    """Find values that fail type coercion - these would become NaN silently."""
    col = df[column].dropna()
    if target_type not in (int, float):
        # Only numeric targets are checked; return an empty selection otherwise.
        return col.iloc[0:0]
    coerced = pd.to_numeric(col, errors="coerce")
    return col[coerced.isna()]
```
Always test type conversions explicitly. Do not trust that a type change is "just a widening." :::
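Detection is one half; the other is making the cast itself loud. In pandas, the same pd.to_numeric call either hides or surfaces the problem depending on its errors argument. A minimal sketch of a strict cast that reports the offending values instead of returning NaN; the wrapper name strict_to_int is illustrative.

```python
import pandas as pd


def strict_to_int(values: pd.Series) -> pd.Series:
    """Cast to int64, raising with sample offenders instead of producing NaN."""
    coerced = pd.to_numeric(values, errors="coerce")
    unparseable = coerced.isna() & values.notna()
    fractional = coerced.notna() & (coerced % 1 != 0)  # "1.5" parses but is not an int
    failed = values[unparseable | fractional]
    if not failed.empty:
        raise ValueError(
            f"{len(failed)} values cannot be cast to int: {failed.head(3).tolist()}"
        )
    if values.isna().any():
        raise ValueError("null values cannot be represented in int64")
    return coerced.astype("int64")


ids = pd.Series(["12847", "99120", "3f2a9c"])
print(pd.to_numeric(ids, errors="coerce").tolist())  # [12847.0, 99120.0, nan]
```

The coercing call quietly turns "3f2a9c" into NaN, while strict_to_int(ids) raises a ValueError naming it.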
Part 10 - Convincing Teams to Adopt Contracts
The technical case for data contracts is easy. The organizational case is harder. Upstream teams perceive contracts as constraints on their autonomy. Here is how to navigate this:
Start with the blast radius argument. Show the upstream team a diagram of every downstream system that depends on their data. When they can see that a schema change in their Kafka topic affects 12 downstream pipelines, they are more motivated to communicate changes formally.
Make the contract protect them too. Contracts define what consumers are allowed to depend on. If a consumer is not in the contract's declared downstream list, the producer owes them nothing. Contracts protect producers from being blamed for undocumented consumer assumptions.
Offer tooling, not process. "We need you to write a YAML file" is friction. "Here is a script that generates a contract YAML from your existing schema - it takes 5 minutes" is an offer of help.
Start with the most painful relationship. Find the upstream-downstream pair that has had the most production incidents. Instrument that relationship with a contract. Show the reduction in incidents. The story sells itself.
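The "offer tooling" point is cheap to act on. Here is a hypothetical starting point for such a generator: it drafts the fields section of a contract from a sample pandas DataFrame's dtypes and null counts, leaving descriptions for the owning team. The dtype-to-type mapping is an assumption; adjust it to the types your specification and tools accept.

```python
import pandas as pd
from pandas.api import types as ptypes


def draft_fields(df: pd.DataFrame) -> dict:
    """Draft a contract `fields` mapping from a sample DataFrame."""
    fields = {}
    for name in df.columns:
        col = df[name]
        if ptypes.is_bool_dtype(col):
            dtype = "boolean"
        elif ptypes.is_integer_dtype(col):
            dtype = "integer"
        elif ptypes.is_float_dtype(col):
            dtype = "double"
        elif ptypes.is_datetime64_any_dtype(col):
            dtype = "timestamp"
        else:
            dtype = "string"
        fields[name] = {
            "description": "TODO: owning team to describe meaning and valid values",
            "type": dtype,
            # No nulls in the sample only makes the column a *candidate* for required.
            "required": bool(col.notna().all()),
        }
    return fields


sample = pd.DataFrame({"user_id": ["a", "b"], "purchases": [1, 2], "session_id": ["s1", None]})
draft = draft_fields(sample)
```

The result can be written out with yaml.safe_dump. Given the nullable: false trap in Part 9, treat the drafted required flags as suggestions to review with the owning team, not constraints to publish as-is.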
Interview Questions
Q1: What is a data contract, and how does it differ from a database schema?
A database schema is a structural declaration: column names, types, and nullability. It describes the shape of data at a point in time.
A data contract is a broader, machine-readable agreement between a data producer and its consumers. It includes the schema, but adds:
- Semantic constraints: valid values, business rules, conditional nullability
- Operational SLAs: freshness requirements, availability guarantees
- Ownership: which team owns the data, who is on-call when it breaks
- Versioning: what constitutes a breaking change, how much notice is required
- Consumer registry: which downstream systems are allowed to depend on this data
The critical difference: a schema change can happen silently in a database migration. A contract change requires a version bump, a notice period, and (for breaking changes) explicit consumer sign-off.
A schema says "the data looks like this." A contract says "the data looks like this, means this, will always be this fresh, and will change in these ways with this much notice."
Q2: How do you handle backward compatibility when versioning a data contract?
Use semantic versioning with data-specific rules:
- Patch (0.0.X): No structural or semantic changes - only documentation, examples, or description fixes. No consumer action required.
- Minor (0.X.0): Backward-compatible additions - new optional columns, relaxed constraints, extended enums. Consumers may adopt the new features but are not required to. 7-day notice.
- Major (X.0.0): Breaking changes - column renames, type changes, removals, making optional columns required, restricting enums. All consumers must be notified and given time to adapt. 30-day minimum notice.
The key rule: a type change is always major. Even "widening" type changes (int → long, float32 → float64) are breaking because consumers may have downstream casts or storage assumptions.
In practice, implement this with:
- A
check_contract_compat.pyscript that runs in the pre-commit hook and CI - A contract registry where consumers register their dependency on a contract version
- A deprecation webhook that notifies all registered consumers when a major version is published
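A minimal sketch of the registry and notification pieces, assuming an in-memory store and a caller-supplied notify callback standing in for the deprecation webhook. ContractRegistry and its methods are hypothetical; a production registry would persist state and expose an API.

```python
from collections import defaultdict
from typing import Callable


class ContractRegistry:
    """Hypothetical in-memory registry; a real one would persist state and expose an API."""

    def __init__(self, notify: Callable[[str, str, str], None]):
        self._notify = notify  # e.g. POST to each consumer's deprecation webhook
        self._latest: dict[str, str] = {}
        self._consumers: dict[str, dict[str, str]] = defaultdict(dict)

    def register_consumer(self, contract: str, consumer: str, version: str) -> None:
        """Record that `consumer` depends on `contract` at `version`."""
        self._consumers[contract][consumer] = version

    def latest(self, contract: str) -> str:
        return self._latest[contract]

    def publish(self, contract: str, version: str) -> list[str]:
        """Publish a version; on a major bump, notify consumers pinned to an older major."""
        new_major = int(version.split(".")[0])
        notified = []
        for consumer, pinned in sorted(self._consumers[contract].items()):
            if int(pinned.split(".")[0]) < new_major:
                self._notify(consumer, contract, version)
                notified.append(consumer)
        self._latest[contract] = version
        return notified


sent = []
registry = ContractRegistry(notify=lambda c, k, v: sent.append((c, k, v)))
registry.register_consumer("user_events", "churn-features", "1.4.0")
registry.register_consumer("user_events", "ads-ranking", "2.0.0")
print(registry.publish("user_events", "2.0.0"))  # ['churn-features']
```

Only consumers pinned to an older major version are notified, because minor and patch releases are backward compatible by definition.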
Q3: An upstream team wants to rename a column. They say it is a "minor" change because the data is the same. How do you respond?
A rename is a breaking change, not a minor one. It requires a major version bump.
Here is why: every consumer that reads the old column name breaks. Code that accesses the column directly raises an error such as KeyError; code that uses .get() or column selection with a default silently receives nulls. The data is the same, but the interface has changed, and interface changes break consumers.
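The two failure modes look like this for a consumer still written against the old name. The record and the customer_id column are hypothetical.

```python
# Hypothetical record after the producer renamed user_id -> customer_id
record = {"customer_id": "u-12847", "amount": 42.0}

# Loud failure: direct access raises KeyError
try:
    record["user_id"]
except KeyError as exc:
    print("KeyError:", exc)  # KeyError: 'user_id'

# Silent failure: .get() returns None, which flows downstream as a null join key
user_id = record.get("user_id")
print(user_id)  # None
```

The silent path is the dangerous one: nothing fails, and the null propagates into joins and features exactly as in the opening incident.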
The correct process:
- The new column name is added as a new optional column in the current major version (minor bump).
- Both old and new column names are emitted for the duration of the deprecation period (30 days).
- All registered consumers update their code to use the new column name.
- The old column is removed in the next major version, after all consumers have migrated.
- The major version bump is published with a "removed: old_column_name" change log.
This dual-emit pattern is the data equivalent of a blue/green deployment - it allows zero-downtime migration across a breaking interface change.
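A minimal producer-side sketch of dual-emit, assuming the producer already writes the new name and a small shim adds the deprecated alias until a cutoff. The DEPRECATED_ALIASES table, the function name, and the dates are hypothetical; the cutoff date stands in for publishing the next major version.

```python
from datetime import date

# Hypothetical deprecation plan: new column -> (deprecated old name, first date it is no longer emitted).
# The cutoff date stands in for publishing the next major version.
DEPRECATED_ALIASES = {"customer_id": ("user_id", date(2025, 7, 1))}


def with_deprecated_aliases(record: dict, today: date) -> dict:
    """Producer-side shim: also emit old column names until their removal date."""
    out = dict(record)  # never mutate the caller's record
    for new_name, (old_name, removed_on) in DEPRECATED_ALIASES.items():
        if new_name in out and today < removed_on:
            out[old_name] = out[new_name]  # old consumers keep working during the window
    return out


row = {"customer_id": "u-12847", "amount": 42.0}
print(with_deprecated_aliases(row, date(2025, 6, 15)))
# {'customer_id': 'u-12847', 'amount': 42.0, 'user_id': 'u-12847'}
print(with_deprecated_aliases(row, date(2025, 7, 1)))
# {'customer_id': 'u-12847', 'amount': 42.0}
```

Keeping the alias logic in one table makes the deprecation window auditable: removing an entry is the code change that accompanies the major release.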
Q4: How do you convince an upstream team to adopt data contracts when they see it as extra work?
The mistake is framing contracts as work the upstream team does for the benefit of others. Reframe it as work they do for their own protection.
The protection argument: A contract defines the boundary of what downstream consumers are allowed to depend on. Anything not in the contract is not a guaranteed interface. If a downstream team builds a dependency on an undocumented behavior, and the upstream team changes it, the upstream team is not responsible. Without a contract, every undocumented field is an implicit promise - and implicit promises get violated and create blame.
The practical argument: Show them how many times they were pinged in Slack about data questions in the last quarter. Contracts answer those questions automatically, in a machine-readable format that tooling can query.
The low-friction argument: Offer to write the first contract for them, generated from their existing schema. A contract generator can produce about 80% of the YAML automatically; the team only needs to add descriptions, owners, and SLAs.
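A sketch of such a generator, assuming the existing schema is available as (column, SQL type, nullable) tuples. The YAML layout and the TYPE_MAP below are hypothetical and do not follow any particular published contract specification; the point is that structure is filled in automatically and only the human-owned fields are left as TODOs.

```python
# Hypothetical mapping from warehouse column types to contract types
TYPE_MAP = {"BIGINT": "long", "INTEGER": "int", "VARCHAR": "string",
            "DOUBLE": "double", "BOOLEAN": "boolean", "TIMESTAMP": "timestamp"}


def generate_contract_yaml(table: str, columns: list[tuple[str, str, bool]]) -> str:
    """Draft a contract from (name, sql_type, nullable) tuples; humans fill the TODOs."""
    lines = [
        f"name: {table}",
        "version: 0.1.0",
        "owner: TODO            # team accountable for this data",
        "freshness_sla: TODO    # e.g. 1h",
        "fields:",
    ]
    for name, sql_type, nullable in columns:
        lines += [
            f"  {name}:",
            f"    type: {TYPE_MAP.get(sql_type.upper(), 'TODO')}",  # unknown types need review
            f"    required: {str(not nullable).lower()}",
            "    description: TODO",
        ]
    return "\n".join(lines) + "\n"


print(generate_contract_yaml("user_events", [("user_id", "VARCHAR", False),
                                             ("referrer", "VARCHAR", True)]))
```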
The incident argument: Find a recent incident where a schema change by this team broke a downstream pipeline. Show the timeline of the impact, the debugging cost, the engineering hours lost. Then show how a contract with a 30-day deprecation window would have prevented it.
Most teams agree in principle. The remaining resistance is friction. Removing friction - tooling, templates, contract generators - is the fastest path to adoption.
Q5: How would you implement data contract enforcement in a Kafka-based streaming pipeline?
In a Kafka pipeline, the enforcement point is the consumer - every pipeline that reads from the topic should validate the contract before processing each batch.
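Step 1 - Use a schema registry for structural enforcement. The Confluent Schema Registry with Avro or Protobuf enforces column names and types at the Kafka serialization layer: producers using the registry-aware serializers cannot emit messages that do not match the registered schema, and the registry rejects new schema versions that violate the subject's configured compatibility mode. This is the structural layer of the contract.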
Step 2 - Add semantic enforcement at the consumer. After deserializing from Kafka, validate the business rules from the contract - enum values, conditional nullability, range checks. Do this in the first Flink or Spark operator, before any stateful computation.
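```python
# In PyFlink: contract enforcement as a flat_map operator.
# DataContract.validate_record() and get_quarantine_sink() are placeholders
# for your own contract library and dead-letter sink.
from pyflink.datastream import FlatMapFunction


class ContractEnforcer(FlatMapFunction):
    def __init__(self, contract: "DataContract"):
        self.contract = contract
        self.quarantine_sink = None

    def open(self, runtime_context):
        # Create the sink client on each worker so it is not pickled with the operator
        self.quarantine_sink = get_quarantine_sink()

    def flat_map(self, record: dict):
        violations = self.contract.validate_record(record)
        if violations:
            # Route to quarantine; do not forward to the main pipeline
            self.quarantine_sink.send({**record, "violations": str(violations)})
        else:
            # PyFlink's flat_map emits output by yielding it (no collector argument)
            yield record
```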
Step 3 - Monitor contract compliance metrics in Prometheus/Grafana. Export a counter for each violation type. Alert when the violation rate crosses a threshold (e.g., > 0.1% of messages in a 5-minute window).
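In production you would export two counters (records checked, violations) and let Prometheus compute the ratio, for example with an alert expression like sum(rate(contract_violations_total[5m])) / sum(rate(contract_records_checked_total[5m])) > 0.001, where the metric names are hypothetical. The pure-Python sketch below shows the same sliding-window arithmetic so the threshold logic is explicit; ViolationRateMonitor is a hypothetical stand-in, not a Prometheus API, and it keeps every event in the window in memory.

```python
from collections import deque


class ViolationRateMonitor:
    """Hypothetical sliding-window violation rate, mirroring a rate()-based alert."""

    def __init__(self, window_s: float = 300.0, threshold: float = 0.001):
        self.window_s = window_s      # 5-minute window
        self.threshold = threshold    # alert above 0.1% of messages
        self.events = deque()         # (timestamp, violated) pairs inside the window
        self.violations = 0

    def record(self, ts: float, violated: bool) -> None:
        self.events.append((ts, violated))
        self.violations += int(violated)
        # Evict events that have fallen out of the window
        while self.events and self.events[0][0] <= ts - self.window_s:
            _, old_violated = self.events.popleft()
            self.violations -= int(old_violated)

    def rate(self) -> float:
        return self.violations / len(self.events) if self.events else 0.0

    def should_alert(self) -> bool:
        return self.rate() > self.threshold
```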
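Step 4 - Watch consumer group lag alongside data freshness. Growing consumer group lag means the consumer is falling behind the producer, so the data reaching downstream systems is getting staler; alert on lag before it breaches the freshness SLA in the contract rather than waiting for downstream effects. Lag alone does not catch a producer that stops emitting (lag stays at zero), so also alert when the newest message on the topic is older than the SLA.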
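The two checks are simple arithmetic once you have the numbers. This sketch assumes per-partition end offsets, committed offsets, and the newest message timestamp have already been fetched; with the confluent-kafka client they can be obtained via Consumer.get_watermark_offsets() and Consumer.committed(), but the function names here are hypothetical helpers.

```python
import time
from typing import Optional


def consumer_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> int:
    """Total lag across partitions: messages written but not yet processed.

    Assumes offset 0 for partitions with no committed offset.
    """
    return sum(end - committed.get(p, 0) for p, end in end_offsets.items())


def freshness_breached(newest_msg_ts: float, sla_s: float, now: Optional[float] = None) -> bool:
    """True if the newest message on the topic is older than the contract's SLA.

    Catches a producer that has stopped emitting, which lag alone cannot.
    """
    now = time.time() if now is None else now
    return now - newest_msg_ts > sla_s
```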
Q6: What is the difference between a data contract and a data catalog?
A data catalog is a discovery tool - it answers "what data exists, where is it, and what does it contain?" It is primarily read-only documentation.
A data contract is an enforcement tool - it answers "what do I guarantee about this data, and what happens when I violate that guarantee?" It is an active, machine-enforced agreement.
The relationship: a good data catalog imports and displays contracts. Contracts provide the machine-readable content (schema, SLAs, owners) that populates the catalog. But a catalog without contracts is documentation that may be wrong. Contracts without a catalog are enforcement without discoverability.
In practice: use a catalog (DataHub, Atlan, OpenMetadata) as the UI layer that makes contracts discoverable. Use the contract YAML as the source of truth that the catalog ingests. This separation means the contract file is always authoritative - the catalog is a view on top of it.
Quick Reference
| Concept | Definition | Why It Matters |
|---|---|---|
| Data contract | Machine-readable agreement on schema, semantics, SLAs, and ownership | Makes silent breaking changes into loud, catchable violations |
| Breaking change | Any change that could cause a correct consumer to fail or produce wrong output | Requires major version bump + notice period |
| Non-breaking change | Adding optional fields, relaxing constraints, extending enums | Minor version bump, no consumer action required |
| Contract registry | Central store of all published contracts and their versions | Enables discoverability and consumer notification |
| Quarantine | Routing invalid records to a dead-letter store instead of failing the pipeline | Allows clean records to continue while bad records are investigated |
| Fail-fast | Raising an exception immediately on contract violation | Correct for critical fields where downstream corruption is worse than pipeline downtime |
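Quarantine and fail-fast are usually combined in one enforcement step. A minimal sketch, assuming a caller-supplied validate function that returns a dict of field name to violation message; enforce_batch and ContractViolation are hypothetical names. Violations on non-critical fields are quarantined; a violation on any critical field aborts the whole batch.

```python
from typing import Callable


class ContractViolation(Exception):
    """Raised when a critical field violates the contract (fail-fast path)."""


def enforce_batch(
    records: list[dict],
    validate: Callable[[dict], dict[str, str]],  # field name -> violation message
    critical_fields: set[str],
) -> tuple[list[dict], list[dict]]:
    """Split a batch into (clean, quarantined); raise if a critical field is violated."""
    clean, quarantined = [], []
    for record in records:
        violations = validate(record)
        critical = critical_fields.intersection(violations)
        if critical:
            # Partial data is worse than no data for these fields: stop the batch
            raise ContractViolation(f"critical fields violated: {sorted(critical)}")
        if violations:
            quarantined.append({**record, "_violations": violations})
        else:
            clean.append(record)
    return clean, quarantined
```

A join key such as user_id is a typical critical field; a malformed optional attribute is a typical candidate for quarantine.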
Key Takeaways
- A database schema describes structure. A data contract adds semantics, SLAs, and ownership.
- Every type change is a breaking change. No exceptions.
- Enforce contracts at the consumer, not just the producer - never trust that the upstream team did the right thing.
- The organizational case for contracts is stronger than the technical case: contracts protect producers from undocumented consumer assumptions.
- Use the open-source data contract YAML specification as a standard - it integrates with Data Contract CLI, Soda Core, and dbt.
- Quarantine invalid records rather than failing the entire pipeline - except for critical fields where partial data is worse than no data.