

Great Expectations: Data Quality as Code

The 400-Check Disaster

It started as a reasonable idea. A data engineering team of six people, shipping a feature platform for a recommendation system, decided they needed data quality checks. The checks went into Airflow DAGs as custom SQL queries. One DAG for the user features table, one for the item catalog, one for the interaction logs. Over eighteen months, the check count grew to four hundred. Twelve different DAGs, each with its own custom Python helper that connected to Redshift, ran a SQL query, and raised an exception if the result was unexpected.

The system worked, after a fashion. Engineers added checks when they found bugs. "NULL user_ids are now blocked" - great, add a check. "Item prices should never exceed $50,000" - add a check. "The feature engineering job should process at least ten thousand rows per day" - add a check. Nobody removed old checks. Nobody documented what any check was testing. The CI pipeline took forty-five minutes just to run the quality gates.

Then the schema changed. The data warehouse team renamed user_id to uid across fifteen tables to align with a new naming convention. They sent an email. The data engineering team acknowledged the email. They updated the column references in their dbt models and their Spark jobs. They did not update seventy-three of the four hundred SQL quality checks.

Nobody noticed for eleven days. The checks were still running, and they were still passing. The queries still referenced the old column name, matched zero rows, and reported zero failures: each check only raised when a count crossed a threshold, and a count of zero never crosses anything. The broken checks succeeded silently.

The incident cost two weeks of investigation, a degraded recommendation model shipped to ten percent of users, and an architecture review where the team had to admit that their quality system was actually an elaborate illusion. The checks existed. The checks ran. The checks told them nothing.

They adopted Great Expectations. Four hundred SQL checks became eighty declarative expectations. The suite lived in version control alongside the data models. When the schema changed again - and it did, three months later - the entire suite failed immediately, clearly, with a validation report that named every broken expectation in plain English. The Data Docs site updated automatically. The on-call engineer got a Slack message with a link to the report. The fix took two hours.


Why This Exists

Before frameworks like Great Expectations, data quality was a folklore practice. Every team invented their own approach: custom SQL scripts, ad-hoc Python assertions, whatever the senior engineer had done at their previous company. These approaches shared a common failure mode: they were imperative rather than declarative. You wrote how to check the data, not what you expected the data to be. The distinction matters enormously.

An imperative check looks like this:

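```python
from sqlalchemy import text

# engine: an existing SQLAlchemy Engine connected to the warehouse
with engine.connect() as conn:
    count = conn.execute(
        text("SELECT COUNT(*) FROM users WHERE user_id IS NULL")
    ).scalar()
if count > 0:
    raise ValueError(f"Found {count} null user_ids")
```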

A declarative expectation looks like this:

```python
validator.expect_column_values_to_not_be_null("user_id")
```

These look similar on the surface. The difference emerges at scale:

  • The imperative check is opaque - you have to read the SQL to understand what it's testing
  • The declarative expectation self-documents - any engineer reading it immediately understands the contract
  • The imperative check has no metadata - nobody knows when it was written, what table it covers, or whether it's still valid
  • The declarative expectation is tracked in a versioned suite - you can see every expectation, when it was added, and what its configuration is
  • The imperative check fails silently when the column it references no longer exists
  • The declarative expectation surfaces the schema mismatch explicitly

Great Expectations was built to solve this. It provides a language for expressing data contracts - what you expect the data to be - and an execution engine for verifying those contracts against real data, with human-readable reports that non-engineers can understand.
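To make the missing-column failure mode concrete, here is a plain-Python sketch. It is not the GX API: the `rows` data and both helper functions are hypothetical. In this dict-based model, a count-based imperative check quietly returns zero when the column is gone, while a declarative check that knows which column it is about treats the missing column as a failure.

```python
from typing import Any

# Hypothetical rows after user_id was renamed to uid upstream
rows: list[dict[str, Any]] = [{"uid": 1}, {"uid": None}, {"uid": 3}]


def imperative_null_count(rows: list[dict[str, Any]], column: str) -> int:
    # Counts rows whose value is None; rows missing the key are skipped,
    # so a renamed column yields 0 "violations"
    return sum(1 for r in rows if column in r and r[column] is None)


def expect_column_values_to_not_be_null(
    rows: list[dict[str, Any]], column: str
) -> dict[str, Any]:
    # Declarative-style result: a missing column is reported, not passed
    if any(column not in r for r in rows):
        return {"success": False, "reason": f"column '{column}' not found"}
    nulls = sum(1 for r in rows if r[column] is None)
    return {"success": nulls == 0, "unexpected_count": nulls}


print(imperative_null_count(rows, "user_id"))  # 0 -> the check "passes"
print(expect_column_values_to_not_be_null(rows, "user_id"))
```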


Historical Context

Great Expectations was created by Abe Gong and James Campbell at the data consultancy Superconductive (later renamed Great Expectations) around 2017. The origin story is telling: Gong was working on data pipelines at Jawbone, a health data company, and kept running into the same problem. Data arrived with subtle defects that downstream models amplified into major failures. The defects were hard to catch because nobody had written down what "correct" looked like. When you don't have a specification, you can't verify compliance.

The project went open-source in 2017 and grew rapidly because it addressed a universal pain point. The 1.0 release (GX Core 1.0, 2024) standardized on the redesigned "fluent" API that had been introduced during the 0.x series, breaking backward compatibility with much of the earlier API in exchange for a cleaner, more Pythonic interface. The ecosystem grew to include connectors for Pandas, Spark, SQLAlchemy, Snowflake, BigQuery, Redshift, and most major data storage systems.

The core insight that makes Great Expectations powerful is treating expectations as data. A GX expectation is not a Python function - it is a JSON-serializable object with a type, parameters, and metadata. This means suites can be stored in version control, diffed across time, compared between environments, and rendered into documentation automatically. The expectation becomes the specification, and the specification becomes the test.
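Because an expectation is just a type plus parameters, a suite round-trips through JSON and two versions can be diffed like any other file. The sketch below uses plain dicts; the field names (`type`, `kwargs`) are illustrative assumptions loosely modeled on a serialized expectation, not GX's exact schema.

```python
import json

suite_v1 = {
    "name": "user_features_suite",
    "expectations": [
        {"type": "expect_column_values_to_not_be_null", "kwargs": {"column": "user_id"}},
        {
            "type": "expect_column_values_to_be_between",
            "kwargs": {"column": "age", "min_value": 13, "max_value": 120},
        },
    ],
}

# Serialize and reload: the suite survives as plain data
text = json.dumps(suite_v1, indent=2, sort_keys=True)

# A schema change (user_id -> uid) edits a reloaded copy of the suite
suite_v2 = json.loads(text)
suite_v2["expectations"][0]["kwargs"]["column"] = "uid"


def canonical(expectation: dict) -> str:
    # Stable string form of one expectation, usable as a set key
    return json.dumps(expectation, sort_keys=True)


old = {canonical(e) for e in suite_v1["expectations"]}
new = {canonical(e) for e in suite_v2["expectations"]}
removed, added = old - new, new - old
print("removed:", removed)
print("added:", added)
```

The same property is what makes a pull-request diff of a suite file readable: the change shows up as one removed and one added expectation rather than as edited SQL.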


Core Concepts

Understanding Great Expectations requires understanding six concepts and how they compose:

Expectation: A single assertion about data. "Column user_id should never be null." "Column age should have values between 0 and 120." Each expectation has a type (like expect_column_values_to_not_be_null) and parameters (like which column, what thresholds). Expectations are serializable - they can be written to a JSON file and reloaded later.

ExpectationSuite: A named collection of expectations. Think of it as a test suite in unit testing. A suite called user_features_suite might contain twenty expectations covering all the quality contracts for the user features table. Suites are stored as JSON files.

Validator: The bridge between your data and your expectations. A Validator wraps a dataset (a Pandas DataFrame, a Spark DataFrame, a database table) and provides methods to run expectations against it. You call validator.expect_column_to_exist("user_id") and the Validator runs the check and records the result.

Checkpoint: The orchestration primitive. A Checkpoint ties together what to validate (in GX 1.x, one or more validation definitions, each pairing a batch definition for your data with an ExpectationSuite) and a set of Actions (what to do with the results). You define a Checkpoint once and run it repeatedly: in Airflow, in Prefect, in a CI job. The Checkpoint is the thing you call in production.

DataContext: The configuration hub. All GX configuration - where suites are stored, where validation results are saved, how Data Docs are built and hosted - flows through the DataContext. Think of it as the project-level configuration object that wires everything together.

Data Docs: Auto-generated HTML reports that show the state of your validation results. When a Checkpoint includes an UpdateDataDocsAction, each run adds a validation result page showing which expectations passed, which failed, and the observed statistics. Data Docs can be hosted in S3, served locally, or embedded in internal tooling.
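The way these pieces compose can be modeled in a few lines of plain Python. This is a toy sketch, not GX's classes: `Expectation`, `Suite`, `Checkpoint`, and the `notify_on_failure` action are hypothetical stand-ins showing a Checkpoint running a suite against data and handing the combined result to each action.

```python
from dataclasses import dataclass, field
from typing import Callable

Row = dict[str, object]


@dataclass
class Expectation:
    name: str
    check: Callable[[list[Row]], bool]  # True when the data satisfies the contract


@dataclass
class Suite:
    name: str
    expectations: list[Expectation] = field(default_factory=list)


@dataclass
class Checkpoint:
    name: str
    suite: Suite
    actions: list[Callable[[dict], None]] = field(default_factory=list)

    def run(self, rows: list[Row]) -> dict:
        # Evaluate every expectation, then pass the combined outcome to each action
        results = {e.name: e.check(rows) for e in self.suite.expectations}
        outcome = {"checkpoint": self.name, "success": all(results.values()), "results": results}
        for action in self.actions:
            action(outcome)
        return outcome


alerts: list[str] = []


def notify_on_failure(outcome: dict) -> None:
    # Stand-in for a notification action configured to fire only on failure
    if not outcome["success"]:
        failed = [name for name, ok in outcome["results"].items() if not ok]
        alerts.append(f"{outcome['checkpoint']} failed: {failed}")


suite = Suite("user_features_suite", [
    Expectation("user_id_not_null", lambda rows: all(r.get("user_id") is not None for r in rows)),
    Expectation("row_count_at_least_2", lambda rows: len(rows) >= 2),
])
checkpoint = Checkpoint("daily", suite, actions=[notify_on_failure])
result = checkpoint.run([{"user_id": 1}, {"user_id": None}])
print(result["success"], alerts)
```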


Installation and Setup

Install Great Expectations (version 1.x uses the fluent API):

```bash
pip install great-expectations
# For specific connectors:
pip install 'great-expectations[snowflake]'
pip install 'great-expectations[spark]'
pip install 'great-expectations[sqlalchemy]'
```

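Initialize a project. GX 1.x removed the great_expectations init CLI command used by older releases; creating a file-backed context from Python scaffolds the project instead:

```python
import great_expectations as gx

# Creates (or loads) the project directory under the current working directory
context = gx.get_context(mode="file")
```

This creates a project directory (named gx/ by default in GX 1.x) with the following structure:

```
gx/
├── great_expectations.yml      # DataContext configuration
├── expectations/               # ExpectationSuites stored as JSON
├── checkpoints/                # Checkpoint configurations
└── uncommitted/
    ├── config_variables.yml    # Secrets (gitignored)
    └── data_docs/              # Generated HTML reports (gitignored)
```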

The great_expectations.yml file holds the DataContext configuration. In production pipelines you often skip the YAML and configure the context entirely in Python:

```python
import great_expectations as gx

# Create an ephemeral context (in-memory, for pipelines)
context = gx.get_context(mode="ephemeral")

# Or load an existing project directory
context = gx.get_context()  # reads great_expectations.yml

# Or point at a project directory explicitly
context = gx.get_context(
    context_root_dir="/opt/airflow/great_expectations"
)
```

Connecting to Data Sources

The fluent API in GX 1.x makes connecting to data sources clean and explicit:

```python
import great_expectations as gx
import pandas as pd

context = gx.get_context(mode="ephemeral")

# --- Pandas datasource ---
datasource = context.data_sources.add_pandas("my_pandas_source")
data_asset = datasource.add_dataframe_asset("user_features")
batch_def = data_asset.add_batch_definition_whole_dataframe("batch")

df = pd.read_parquet("s3://my-bucket/user_features/2024-01-15.parquet")  # needs s3fs
batch = batch_def.get_batch(batch_parameters={"dataframe": df})

# --- SQL datasource (Postgres shown; Snowflake, SQLite, and generic
#     SQLAlchemy connection strings use sibling methods such as
#     add_snowflake, add_sqlite, and add_sql) ---
pg_datasource = context.data_sources.add_postgres(
    name="postgres_prod",
    connection_string="postgresql+psycopg2://user:pass@host:5432/dbname",
)
table_asset = pg_datasource.add_table_asset(
    name="user_features_table",
    table_name="user_features",
    schema_name="ml_features",
)
pg_batch_def = table_asset.add_batch_definition_whole_table("daily_batch")

# --- Spark datasource ---
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("gx_validation").getOrCreate()

spark_datasource = context.data_sources.add_spark("spark_source")
spark_asset = spark_datasource.add_dataframe_asset("events")
spark_batch_def = spark_asset.add_batch_definition_whole_dataframe("batch")

spark_df = spark.read.parquet("s3://my-bucket/events/dt=2024-01-15/")
spark_batch = spark_batch_def.get_batch(
    batch_parameters={"dataframe": spark_df}
)
```

Writing Expectations

The core of GX is the expectation API. The examples in this section use the interactive Validator style: you get a Validator bound to a batch and a suite, then call expect_* methods on it. Each call runs the check against the batch immediately and adds the expectation to the Validator's suite. (GX 1.x also exposes each expectation as a class, for example gx.expectations.ExpectColumnValuesToNotBeNull(column="user_id"), which you add to a suite with suite.add_expectation(...).)

```python
import great_expectations as gx
import pandas as pd

context = gx.get_context(mode="ephemeral")
datasource = context.data_sources.add_pandas("src")
asset = datasource.add_dataframe_asset("users")
batch_def = asset.add_batch_definition_whole_dataframe("b")

df = pd.read_parquet("users.parquet")

# Create a suite
suite = context.suites.add(
    gx.ExpectationSuite(name="user_features_suite")
)

# Get a validator bound to this batch of data and this suite
validator = context.get_validator(
    batch_request=batch_def.build_batch_request(
        batch_parameters={"dataframe": df}
    ),
    expectation_suite=suite,
)
```

Structural Expectations

```python
# Column must exist
validator.expect_column_to_exist("user_id")
validator.expect_column_to_exist("age")
validator.expect_column_to_exist("signup_date")
validator.expect_column_to_exist("subscription_tier")

# Table must have at least 10,000 rows
validator.expect_table_row_count_to_be_between(
    min_value=10_000,
    max_value=None  # no upper bound
)

# Exactly these columns (strict schema enforcement)
validator.expect_table_columns_to_match_ordered_list(
    column_list=["user_id", "age", "signup_date", "subscription_tier", "ltv_usd"]
)
```

Null and Completeness Expectations

```python
# Primary key must never be null
validator.expect_column_values_to_not_be_null("user_id")

# Optional field - at most 5% null (95% complete)
validator.expect_column_values_to_not_be_null(
    "age",
    mostly=0.95  # at least 95% of values must satisfy the expectation
)

# Revenue must be non-null for paying users only:
# row_condition restricts the check to rows matching the filter
validator.expect_column_values_to_not_be_null(
    column="ltv_usd",
    row_condition='subscription_tier!="free"',
    condition_parser="pandas",
)
```
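The mostly parameter turns a row-level expectation into a threshold on the fraction of rows that pass. A plain-Python sketch of that arithmetic for the not-null case (the helper name is hypothetical, not part of GX):

```python
def not_null_with_mostly(values: list, mostly: float = 1.0) -> dict:
    """Sketch of expect_column_values_to_not_be_null(..., mostly=...) semantics."""
    if not values:
        return {"success": True, "observed_fraction": None}  # nothing to check
    non_null = sum(v is not None for v in values)
    fraction = non_null / len(values)
    # Passes when at least `mostly` of the rows satisfy the condition
    return {"success": fraction >= mostly, "observed_fraction": fraction}


ages = [25, 31, None, 40, 22, 38, 29, 33, 27, 45]  # 1 null out of 10 rows
print(not_null_with_mostly(ages, mostly=0.95))  # 90% complete -> fails
print(not_null_with_mostly(ages, mostly=0.90))  # exactly at the threshold -> passes
```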

Range and Value Expectations

```python
# Age must be between 13 and 120
validator.expect_column_values_to_be_between(
    column="age",
    min_value=13,
    max_value=120
)

# Price can be zero (free items) but never negative
validator.expect_column_values_to_be_between(
    column="price_usd",
    min_value=0.0,
    max_value=None
)

# Subscription tier must be one of the valid values
validator.expect_column_values_to_be_in_set(
    column="subscription_tier",
    value_set=["free", "pro", "enterprise"]
)

# Exactly these distinct values - stricter than be_in_set
validator.expect_column_distinct_values_to_equal_set(
    column="event_type",
    value_set={"click", "view", "purchase", "refund"}
)
```
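The difference between the two set expectations is easiest to see on a day when one category simply never occurs. A plain-Python sketch of the two semantics (helper names are hypothetical):

```python
def values_in_set(values: list, value_set: set) -> bool:
    # Every observed value must be allowed; allowed values may be absent
    return all(v in value_set for v in values)


def distinct_values_equal_set(values: list, value_set: set) -> bool:
    # The observed distinct values must be exactly the set: no extras, none missing
    return set(values) == value_set


allowed = {"click", "view", "purchase", "refund"}
day_without_refunds = ["click", "view", "click", "purchase"]

print(values_in_set(day_without_refunds, allowed))             # True
print(distinct_values_equal_set(day_without_refunds, allowed))  # False: "refund" never seen
```

That strictness suits small enumerations where every value must appear in each batch; for rare categories it can fail on perfectly healthy data.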

Format and Pattern Expectations

```python
# Email format validation
validator.expect_column_values_to_match_regex(
    column="email",
    regex=r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$"
)

# ISO date format
validator.expect_column_values_to_match_regex(
    column="signup_date",
    regex=r"^\d{4}-\d{2}-\d{2}$"
)

# UUID format for generated keys
validator.expect_column_values_to_match_regex(
    column="session_id",
    regex=r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
)

# User IDs must be positive integers stored as strings (no leading zeros)
validator.expect_column_values_to_match_regex(
    column="user_id",
    regex=r"^[1-9]\d*$"
)
```
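Regex expectations are only as good as their patterns, so it pays to run a pattern over known-good and known-bad samples before committing it. This stdlib sketch (the helper name is hypothetical) shows that the UUID pattern above accepts only lowercase hex, so uppercase UUIDs emitted by some upstream systems would be reported as unexpected:

```python
import re
import uuid

UUID_PATTERN = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
)


def unexpected_values(values: list[str], pattern: re.Pattern) -> list[str]:
    # Mirrors the idea of a regex expectation's unexpected_list
    return [v for v in values if not pattern.match(v)]


session_ids = [
    str(uuid.uuid4()),                       # Python renders UUIDs in lowercase
    "123e4567-e89b-12d3-a456-426614174000",  # lowercase: matches
    "123E4567-E89B-12D3-A456-426614174000",  # uppercase: rejected by this pattern
]
print(unexpected_values(session_ids, UUID_PATTERN))
```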

Uniqueness Expectations

```python
# Primary key must be unique
validator.expect_column_values_to_be_unique("user_id")

# Composite key must be unique (session_id + event_sequence_number combination)
validator.expect_compound_columns_to_be_unique(
    column_list=["session_id", "event_sequence_number"]
)
```
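Compound uniqueness means no two rows share the same tuple of values across the listed columns, even though each column on its own may repeat freely. A stdlib sketch of that check (the helper is hypothetical, not GX code):

```python
from collections import Counter


def duplicate_compound_keys(rows: list[dict], columns: list[str]) -> list[tuple]:
    # Build the composite key for every row and report keys seen more than once
    counts = Counter(tuple(row[c] for c in columns) for row in rows)
    return [key for key, n in counts.items() if n > 1]


events = [
    {"session_id": "s1", "event_sequence_number": 1},
    {"session_id": "s1", "event_sequence_number": 2},
    {"session_id": "s2", "event_sequence_number": 1},  # same number, different session: fine
    {"session_id": "s1", "event_sequence_number": 2},  # replayed event: duplicate key
]
print(duplicate_compound_keys(events, ["session_id", "event_sequence_number"]))
```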

The Critical Temporal Ordering Expectation (ML Anti-Leakage)

This expectation matters enormously in ML pipelines. Label leakage - where information from the future contaminates training features - is one of the most dangerous and hardest-to-detect data quality failures. A feature timestamp that occurs after the label timestamp is a red flag.

```python
# feature_snapshot_time must be LESS THAN OR EQUAL TO label_timestamp
# (equivalently: label_timestamp >= feature_snapshot_time)
# This guards against label leakage in training data
validator.expect_column_pair_values_a_to_be_greater_than_b(
    column_A="label_timestamp",        # the target event time
    column_B="feature_snapshot_time",  # when features were computed
    or_equal=True,                     # allow equal (feature at same time as label)
    mostly=0.999                       # tolerate a tiny fraction of edge cases (e.g., clock skew)
)

# In a loan default prediction dataset:
# default_date must be strictly after snapshot_date
validator.expect_column_pair_values_a_to_be_greater_than_b(
    column_A="default_date",
    column_B="snapshot_date",
    or_equal=False  # strict: features must precede label
)
```
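Stripped of the framework, the column-pair expectation is a row-by-row comparison plus the mostly threshold. This plain-Python sketch (a hypothetical helper, not GX internals) shows how or_equal changes which rows count as leakage:

```python
from datetime import datetime


def label_after_features(pairs, or_equal: bool = True, mostly: float = 1.0) -> dict:
    """Sketch of a column-pair check: label time (A) vs feature snapshot time (B)."""
    ok = [(a >= b) if or_equal else (a > b) for a, b in pairs]
    if not ok:
        return {"success": True, "leaked_row_indices": []}
    fraction = sum(ok) / len(ok)
    leaked = [i for i, passed in enumerate(ok) if not passed]
    return {"success": fraction >= mostly, "leaked_row_indices": leaked}


pairs = [
    (datetime(2024, 1, 15, 12), datetime(2024, 1, 14)),      # features precede label
    (datetime(2024, 1, 15, 12), datetime(2024, 1, 15, 12)),  # same instant
    (datetime(2024, 1, 15, 12), datetime(2024, 1, 16)),      # snapshot after label: leakage
]
print(label_after_features(pairs, or_equal=True))   # only row 2 leaks
print(label_after_features(pairs, or_equal=False))  # rows 1 and 2 fail the strict check
```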

Statistical Distribution Expectations

```python
# Mean age should fall roughly between 25 and 50 (business domain knowledge)
validator.expect_column_mean_to_be_between(
    column="age",
    min_value=25.0,
    max_value=50.0
)

# Median revenue should be positive
validator.expect_column_median_to_be_between(
    column="revenue_usd",
    min_value=0.01,
    max_value=None
)

# Standard deviation of age shouldn't be extreme
validator.expect_column_stdev_to_be_between(
    column="age",
    min_value=1.0,
    max_value=30.0
)

# Quantile checks - important for drift detection
validator.expect_column_quantile_values_to_be_between(
    column="session_duration_seconds",
    quantile_ranges={
        "quantiles": [0.25, 0.5, 0.75, 0.95],
        "value_ranges": [
            [10, 120],    # p25: 10s to 2 minutes
            [30, 300],    # p50: 30s to 5 minutes
            [60, 900],    # p75: 1 minute to 15 minutes
            [120, 3600],  # p95: 2 minutes to 1 hour
        ]
    }
)
```
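To see what the quantile expectation evaluates, here is a stdlib sketch that computes each quantile and checks it against its range. It uses the nearest-rank quantile definition, which is an assumption: GX's backends may interpolate differently, so observed values near a range boundary can differ slightly.

```python
import math


def nearest_rank_quantile(sorted_values: list[float], q: float) -> float:
    # Nearest-rank: smallest value with at least a fraction q of the data at or below it
    idx = max(0, math.ceil(q * len(sorted_values)) - 1)
    return sorted_values[idx]


def quantiles_in_ranges(values, quantiles, value_ranges) -> dict:
    data = sorted(values)
    observed = {q: nearest_rank_quantile(data, q) for q in quantiles}
    per_quantile = {
        q: lo <= observed[q] <= hi for q, (lo, hi) in zip(quantiles, value_ranges)
    }
    return {"success": all(per_quantile.values()), "observed": observed, "per_quantile": per_quantile}


durations = list(range(1, 1001))  # hypothetical session durations: 1..1000 seconds
result = quantiles_in_ranges(
    durations,
    quantiles=[0.25, 0.5, 0.75, 0.95],
    value_ranges=[[10, 120], [30, 300], [60, 900], [120, 3600]],
)
print(result["observed"])      # {0.25: 250, 0.5: 500, 0.75: 750, 0.95: 950}
print(result["per_quantile"])  # p25 and p50 fall outside their ranges
```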

Custom Expectations

When built-in expectations don't cover your use case, you write custom ones. The GX framework provides a base class ColumnMapExpectation for row-level checks (does each row satisfy this condition?) and ColumnAggregateExpectation for aggregate checks (does the whole column satisfy this condition?).

```python
from great_expectations.expectations.expectation import ColumnMapExpectation
from great_expectations.execution_engine import (
    PandasExecutionEngine,
    SparkDFExecutionEngine,
)
from great_expectations.expectations.metrics import (
    ColumnMapMetricProvider,
    column_condition_partial,
)


class ExpectColumnValuesToBeValidUsPhone(ColumnMapExpectation):
    """Expect column values to be valid US phone numbers (10 digits, various formats)."""

    map_metric = "column_values.valid_us_phone"

    success_keys = ("mostly",)

    default_kwarg_values = {
        "mostly": 1.0,
    }

    examples = [
        {
            "data": {
                "phones": ["555-867-5309", "(555) 867-5309", "5558675309", "867-5309"]
            },
            "tests": [
                {
                    "title": "valid_us_phones",
                    "exact_match_out": False,
                    "include_in_gallery": True,
                    "in": {"column": "phones"},
                    "out": {
                        "success": False,  # "867-5309" has only 7 digits
                        "unexpected_index_list": [3],
                    },
                }
            ],
        }
    ]


class ColumnValuesValidUsPhone(ColumnMapMetricProvider):
    condition_metric_name = "column_values.valid_us_phone"

    @column_condition_partial(engine=PandasExecutionEngine)
    def _pandas(cls, column, **kwargs):
        import re
        pattern = re.compile(r"""
            ^(\+1)?[\s.-]?
            \(?([0-9]{3})\)?
            [\s.-]?([0-9]{3})
            [\s.-]?([0-9]{4})$
        """, re.VERBOSE)
        return column.astype(str).str.match(pattern)

    @column_condition_partial(engine=SparkDFExecutionEngine)
    def _spark(cls, column, **kwargs):
        pattern = r"^(\+1)?[\s.-]?\(?([0-9]{3})\)?[\s.-]?([0-9]{3})[\s.-]?([0-9]{4})$"
        return column.cast("string").rlike(pattern)
```
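Before wiring a condition into a metric provider, it helps to check the pattern against the gallery examples directly. This stdlib sketch reuses the pattern from the Spark branch above and reproduces the unexpected_index_list that the examples block expects:

```python
import re

# Same pattern as the Spark branch of the metric provider
US_PHONE = re.compile(r"^(\+1)?[\s.-]?\(?([0-9]{3})\)?[\s.-]?([0-9]{3})[\s.-]?([0-9]{4})$")

phones = ["555-867-5309", "(555) 867-5309", "5558675309", "867-5309"]
unexpected_index_list = [i for i, p in enumerate(phones) if not US_PHONE.match(p)]
print(unexpected_index_list)  # [3]

# The pattern is permissive about parentheses: an unbalanced one still matches
print(bool(US_PHONE.match("555) 867-5309")))  # True
```

Whether that permissiveness matters depends on your data; a stricter pattern would need alternation for the parenthesized and bare forms.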

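A second custom expectation follows the same two-part pattern: an Expectation class that names the metric, plus a MetricProvider that implements the row-level condition. A helper method defined only on the Expectation class does not register a metric, so the condition lives in the provider:

```python
import re

from great_expectations.execution_engine import PandasExecutionEngine
from great_expectations.expectations.expectation import ColumnMapExpectation
from great_expectations.expectations.metrics import (
    ColumnMapMetricProvider,
    column_condition_partial,
)

# Likely PII patterns: US Social Security numbers and 16-digit card numbers
SSN_PATTERN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
CC_PATTERN = re.compile(r"\b(?:\d{4}[-\s]?){3}\d{4}\b")


class ExpectColumnValuesToNotContainPII(ColumnMapExpectation):
    """Detect likely PII patterns: SSNs, credit card numbers."""

    map_metric = "column_values.no_pii_pattern"
    success_keys = ("mostly",)
    default_kwarg_values = {"mostly": 1.0}


class ColumnValuesNoPiiPattern(ColumnMapMetricProvider):
    # Must match map_metric above; True means the row is clean
    condition_metric_name = "column_values.no_pii_pattern"

    @column_condition_partial(engine=PandasExecutionEngine)
    def _pandas(cls, column, **kwargs):
        def is_clean(val):
            s = str(val)
            return not (SSN_PATTERN.search(s) or CC_PATTERN.search(s))

        return column.apply(is_clean)
```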

Profiling: Auto-Generating Suites

GX 0.x releases could profile an existing dataset and auto-generate an expectation suite from what they observed, using Data Assistants. GX Core 1.x removed Data Assistants, so the snippet below targets the 0.18 API. Either way, profiling is useful as a starting point, but comes with serious caveats.

```python
# GX 0.18.x only: Data Assistants are not available in GX Core 1.x
import great_expectations as gx
import pandas as pd

context = gx.get_context()

# Load a representative sample of your data
df = pd.read_parquet("user_features_sample.parquet")

# Create a datasource and a batch request (0.18 uses context.sources)
datasource = context.sources.add_pandas("profiling_src")
asset = datasource.add_dataframe_asset(name="users")
batch_request = asset.build_batch_request(dataframe=df)

# Profile to generate a suite automatically
result = context.assistants.onboarding.run(batch_request=batch_request)
suite = result.get_expectation_suite(
    expectation_suite_name="user_features_profiled_suite"
)

print(f"Generated {len(suite.expectations)} expectations")
context.add_or_update_expectation_suite(expectation_suite=suite)
```

:::danger Profiling Anti-Pattern: The Garbage-In Problem

Profiling learns what the data currently is, not what it should be. If your data has 3% NULL values in a critical column, the profiler generates an expectation that allows 3% NULLs - encoding a data quality problem as an accepted expectation.

Never deploy a profiled suite directly to production. Always review every generated expectation against your business rules and domain knowledge. Use profiling to discover what expectations might be useful, then manually write the expectations with the correct thresholds.

The worst outcome: you profile on a "bad" day when an upstream system sent malformed data, and now your expectation suite silently accepts that malformed shape forever. :::
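The garbage-in problem is easy to reproduce with a toy profiler. In this sketch (a hypothetical helper; real profilers are more elaborate but still learn from the observed data), the mostly threshold for a not-null check is derived from a day with an upstream bug:

```python
def profile_not_null_mostly(values: list) -> float:
    # A naive profiler proposes `mostly` equal to the completeness it observes
    return sum(v is not None for v in values) / len(values)


bad_day = [1, 2, None, 4, 5, 6, 7, 8, 9, None] * 10  # 20% nulls from an upstream bug

learned_mostly = profile_not_null_mostly(bad_day)  # 0.8, baked in from a defect
required_mostly = 1.0                              # the business rule: user_id is never null

# The profiled threshold accepts another bad day; the hand-written one rejects it
print(profile_not_null_mostly(bad_day) >= learned_mostly)   # True
print(profile_not_null_mostly(bad_day) >= required_mostly)  # False
```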


Checkpoints: Running Suites in Production

A Checkpoint ties together a batch of data, an expectation suite, and actions to take on the results. Checkpoints are what you call in your Airflow DAGs, Prefect flows, or CI pipelines.

```python
import great_expectations as gx
from great_expectations.checkpoint import (
    SlackNotificationAction,
    UpdateDataDocsAction,
)

context = gx.get_context()

# Pair the batch definition created earlier with the suite to run against it
batch_def = (
    context.data_sources.get("postgres_prod")
    .get_asset("user_features_table")
    .get_batch_definition("daily_batch")
)
suite = context.suites.get("user_features_suite")

validation_def = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="user_features_daily_validation",
        data=batch_def,
        suite=suite,
    )
)

# Create a checkpoint: GX 1.x takes action objects rather than config dicts
checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="user_features_daily_checkpoint",
        validation_definitions=[validation_def],
        actions=[
            UpdateDataDocsAction(name="update_data_docs"),
            SlackNotificationAction(
                name="send_slack_notification",
                slack_webhook="${SLACK_WEBHOOK_URL}",
                notify_on="failure",  # only on failure
            ),
        ],
    )
)
```

Running a Checkpoint in Airflow

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
import great_expectations as gx


def run_gx_checkpoint(checkpoint_name: str, **context):
    """Run a GX checkpoint and raise on failure so the Airflow task fails."""
    gx_context = gx.get_context(
        context_root_dir="/opt/airflow/great_expectations"
    )

    checkpoint = gx_context.checkpoints.get(checkpoint_name)
    result = checkpoint.run()

    # Flatten per-expectation results across every validation in the run
    expectation_results = [
        er
        for validation_result in result.run_results.values()
        for er in validation_result.results
    ]

    if not result.success:
        failed_expectations = [
            # GX 1.x exposes the expectation type as .type (0.x: .expectation_type)
            f"{er.expectation_config.type}: {er.expectation_config.kwargs}"
            for er in expectation_results
            if not er.success
        ]
        raise ValueError(
            "Data quality check failed. Failed expectations:\n"
            + "\n".join(failed_expectations)
        )

    return f"All {len(expectation_results)} expectations passed"


with DAG(
    dag_id="user_features_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",  # replaces schedule_interval in Airflow 2.4+
    catchup=False,
) as dag:

    validate_source = PythonOperator(
        task_id="validate_source_data",
        python_callable=run_gx_checkpoint,
        op_kwargs={"checkpoint_name": "user_features_source_checkpoint"},
    )

    run_feature_engineering = PythonOperator(
        task_id="run_feature_engineering",
        python_callable=lambda: print("Running feature engineering..."),
    )

    validate_output = PythonOperator(
        task_id="validate_output_features",
        python_callable=run_gx_checkpoint,
        op_kwargs={"checkpoint_name": "user_features_output_checkpoint"},
    )

    validate_source >> run_feature_engineering >> validate_output
```

Data Docs: Automated Quality Reports

Data Docs are HTML reports generated from validation results. Every time a Checkpoint runs, GX updates the Data Docs with the new results. The report shows:

  • Which expectations passed or failed, with expected vs. observed values
  • Past validation runs, listed with their pass/fail status
  • The full expectation suite as documentation
  • Statistics about the data (min, max, mean, null rates) for every column checked

```python
# Configure Data Docs to publish to S3
context = gx.get_context()

# Add S3-backed Data Docs site
context.add_data_docs_site(
    site_name="s3_data_docs",
    site_config={
        "class_name": "SiteBuilder",
        "store_backend": {
            "class_name": "TupleS3StoreBackend",
            "bucket": "my-data-docs-bucket",
            "prefix": "data-docs/",
        },
        "site_index_builder": {
            "class_name": "DefaultSiteIndexBuilder",
        },
    },
)

# After running a checkpoint, open locally for review
context.open_data_docs()  # opens in browser
```

The Data Docs site becomes the shared source of truth for data quality. Business analysts can see which tables passed validation and when. Data scientists can audit what was verified before training. Incident responders can trace when quality started degrading.


ML-Specific Expectation Patterns

Training Data Quality Gates

```python
from datetime import datetime

import great_expectations as gx
import pandas as pd
from great_expectations.data_context import AbstractDataContext


def validate_training_data(df: pd.DataFrame, context: AbstractDataContext) -> bool:
    """
    Run ML-specific validations on training data before model training.
    Returns True if all critical checks pass.
    """
    datasource = context.data_sources.add_pandas("training_src")
    asset = datasource.add_dataframe_asset("training_data")
    batch_def = asset.add_batch_definition_whole_dataframe("b")

    suite = context.suites.add(
        gx.ExpectationSuite(name="training_data_suite")
    )

    validator = context.get_validator(
        batch_request=batch_def.build_batch_request(
            batch_parameters={"dataframe": df}
        ),
        expectation_suite=suite,
    )

    # 1. Label balance: for a 0/1 label the mean is the positive rate,
    #    so require between 1% and 99% positive examples
    validator.expect_column_mean_to_be_between(
        column="label",
        min_value=0.01,
        max_value=0.99,
    )

    # 2. No future timestamps in features (anti-leakage)
    validator.expect_column_values_to_be_between(
        column="feature_timestamp",
        max_value=datetime.utcnow().isoformat(),
    )

    # 3. Temporal ordering: feature must precede label
    validator.expect_column_pair_values_a_to_be_greater_than_b(
        column_A="label_event_time",
        column_B="feature_snapshot_time",
        or_equal=True,
    )

    # 4. Minimum training set size
    validator.expect_table_row_count_to_be_between(
        min_value=50_000,
        max_value=None,
    )

    # 5. Feature completeness - key features must be >95% populated
    for critical_feature in ["user_age_days", "session_count_30d", "ltv_usd"]:
        validator.expect_column_values_to_not_be_null(
            critical_feature,
            mostly=0.95,
        )

    # 6. No duplicate training examples
    validator.expect_compound_columns_to_be_unique(
        column_list=["user_id", "label_event_time"]
    )

    results = validator.validate()
    return results.success
```
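For a binary 0/1 label, the proportion of unique values is 2 divided by the row count, while the column mean is the positive rate; only the latter measures class balance. A stdlib illustration with a hypothetical 10%-positive label column:

```python
labels = [1] * 100 + [0] * 900  # 10% positive examples

# Proportion of unique values = distinct values / rows: tiny for any large binary column
proportion_unique = len(set(labels)) / len(labels)

# Mean of a 0/1 label = positive rate, which is what a balance check needs
positive_rate = sum(labels) / len(labels)

print(proportion_unique)  # 0.002 -> would fail a 0.01 lower bound even on healthy data
print(positive_rate)      # 0.1   -> inside a 0.01-0.99 band
```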


Inference-Time Validation (Serving Freshness)

```python
import great_expectations as gx
import pandas as pd
from great_expectations.data_context import AbstractDataContext


def validate_serving_features(
    feature_df: pd.DataFrame,
    training_stats: dict,
    context: AbstractDataContext,
) -> bool:
    """
    Validate that inference-time features match training distribution.
    training_stats: computed from the training set, e.g.
    {"feature_name": {"mean": ..., "std": ...}}
    """
    datasource = context.data_sources.add_pandas("serving_src")
    asset = datasource.add_dataframe_asset("serving_features")
    batch_def = asset.add_batch_definition_whole_dataframe("b")

    suite = context.suites.add(
        gx.ExpectationSuite(name="serving_features_suite")
    )

    validator = context.get_validator(
        batch_request=batch_def.build_batch_request(
            batch_parameters={"dataframe": feature_df}
        ),
        expectation_suite=suite,
    )

    # Freshness: features must be recent (within 2 hours).
    # Assumes feature_computed_at holds UTC timestamps.
    two_hours_ago = (pd.Timestamp.now(tz="UTC") - pd.Timedelta(hours=2)).isoformat()
    validator.expect_column_values_to_be_between(
        column="feature_computed_at",
        min_value=two_hours_ago,
    )

    # Distribution drift: batch mean within 3 training standard deviations
    # of the training mean (a loose bound: it uses the spread of individual
    # values, not the standard error of a mean)
    for feature, stats in training_stats.items():
        lower = stats["mean"] - 3 * stats["std"]
        upper = stats["mean"] + 3 * stats["std"]
        validator.expect_column_mean_to_be_between(
            column=feature,
            min_value=lower,
            max_value=upper,
        )

    results = validator.validate()
    return results.success
```
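The 3-standard-deviation bound above is deliberately loose: it compares a batch mean against the spread of individual values. The sampling error of a mean shrinks with batch size (standard error = std / sqrt(n), assuming independent rows), so a much tighter bound is available when you want to catch smaller shifts. Worked arithmetic with hypothetical training statistics:

```python
import math

# Hypothetical training statistics for one feature
train_mean, train_std = 50.0, 10.0
n_serving = 10_000  # rows in the serving batch

# Bound used above: 3 population standard deviations around the training mean
loose = (train_mean - 3 * train_std, train_mean + 3 * train_std)

# Tighter alternative: 3 standard errors of a batch mean, std / sqrt(n)
se = train_std / math.sqrt(n_serving)
tight = (train_mean - 3 * se, train_mean + 3 * se)

print(loose)  # (20.0, 80.0)
print(tight)  # roughly (49.7, 50.3)
```

The tighter bound flags genuine but harmless shifts (seasonality, a new user cohort) far more often, so which one fits depends on how much alerting the team can absorb.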

Performance: Running GX on Large Datasets

Running GX on billion-row tables requires careful design. The key principle: never pull all the data to the GX process. Use native execution wherever possible.

```python
# For Spark: GX pushes computation to Spark
# Don't collect to Pandas first
spark_df = spark.read.parquet("s3://bucket/events/dt=2024-01-15/")

# GX SparkDFExecutionEngine runs expectations as Spark jobs
# Aggregates are computed with Spark, not Python
spark_datasource = context.data_sources.add_spark("spark_prod")
asset = spark_datasource.add_dataframe_asset("events")
batch_def = asset.add_batch_definition_whole_dataframe("b")

# For SQL sources: GX generates and executes SQL
# e.g., expect_column_mean_to_be_between becomes:
#   SELECT AVG(age) FROM user_features WHERE ...
# This runs in Snowflake/BigQuery/Redshift, not in Python


# Sampling strategy for exploratory validation
def sample_for_validation(
    spark_df,
    sample_fraction: float = 0.01,
    min_rows: int = 100_000,
) -> "DataFrame":
    """Sample large datasets for faster GX runs on expensive checks."""
    total_rows = spark_df.count()

    if total_rows < 1_000_000:
        return spark_df  # small enough to run full

    # Ensure minimum sample size
    actual_fraction = max(
        sample_fraction,
        min_rows / total_rows,
    )

    return spark_df.sample(
        withReplacement=False,
        fraction=actual_fraction,
        seed=42,
    )
```

:::tip Sampling Strategy for GX

For tables over 100M rows, run two tiers of validation:

  1. Full scan (via SQL pushdown): row count, null counts, min/max, distinct counts - these are aggregate queries that databases run efficiently
  2. Sampled (1-5% sample): regex matching, pair comparisons, quantile checks - these require row-level inspection

Configure expectations with mostly parameter to accommodate statistical sampling uncertainty. :::
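How far to relax mostly for a sampled run can be estimated with the normal approximation to a binomial proportion: if the full table sits exactly at the target pass rate p, a sample of n rows shows a pass rate with standard error sqrt(p(1 - p) / n). A sketch (the helper name and the 3-sigma default are assumptions, not GX settings):

```python
import math


def sampled_mostly(target: float, sample_size: int, z: float = 3.0) -> float:
    """Relaxed `mostly` so a table exactly at `target` rarely fails on a sample."""
    se = math.sqrt(target * (1 - target) / sample_size)
    return max(0.0, target - z * se)


full_rule = 0.999  # 99.9% of rows must pass on the full table
print(sampled_mostly(full_rule, 1_000_000))  # about 0.99891
print(sampled_mostly(full_rule, 100_000))    # about 0.99870: smaller sample, more slack
```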


Production Engineering Notes

Version-Controlling Expectation Suites

Treat expectation suites like test files - they live in Git alongside the code that uses them:

```
great_expectations/
├── expectations/
│   ├── user_features_suite.json          # committed to git
│   ├── item_catalog_suite.json           # committed to git
│   ├── interaction_events_suite.json     # committed to git
│   └── training_data_suite.json          # committed to git
└── checkpoints/
    ├── daily_feature_validation.yml      # committed to git
    └── training_gate_checkpoint.yml      # committed to git
```

When a schema change requires updating expectations, the PR diff shows exactly which expectations changed. Code review for data contracts.

Expectation Suite Naming Convention

Use a naming convention that encodes the table/model, the environment, and the criticality:

```
{table_name}_{environment}_{criticality}_suite

user_features_prod_critical_suite     # fails DAG if violated
item_catalog_prod_warning_suite       # Slack alert only, DAG continues
training_data_staging_gate_suite      # blocks model training
```

Separating Blocking from Warning Expectations

Not every expectation should block the pipeline. Use separate suites:

```python
import logging

logger = logging.getLogger(__name__)

# Run both suites
critical_result = context.checkpoints.get("critical_checkpoint").run()
warning_result = context.checkpoints.get("warning_checkpoint").run()

# Only critical failures stop the pipeline
if not critical_result.success:
    raise ValueError("Critical data quality check failed - aborting pipeline")

# Warning failures are logged and alerted but pipeline continues
if not warning_result.success:
    logger.warning("Warning-level data quality issues detected - continuing pipeline")
    send_slack_alert(warning_result)  # hypothetical helper: your own alerting function
```
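Since the naming convention encodes criticality, the block-or-warn decision can be derived from the suite name instead of being hard-coded per checkpoint. A plain-Python sketch; the BLOCKING set and both helpers are hypothetical, and treating gate as blocking follows the training_data_staging_gate_suite example:

```python
import logging

logger = logging.getLogger("dq_gate")

BLOCKING = {"critical", "gate"}  # assumption: these criticalities stop the pipeline


def parse_suite_name(name: str) -> dict:
    # Split from the right because table names may contain underscores
    table, environment, criticality, suffix = name.rsplit("_", 3)
    if suffix != "suite":
        raise ValueError(f"not a suite name: {name}")
    return {"table": table, "environment": environment, "criticality": criticality}


def handle_result(suite_name: str, success: bool) -> str:
    """Return 'pass' or 'warn', or raise for a blocking failure."""
    if success:
        return "pass"
    criticality = parse_suite_name(suite_name)["criticality"]
    if criticality in BLOCKING:
        raise RuntimeError(f"{suite_name} failed - aborting pipeline")
    logger.warning("%s failed - continuing pipeline", suite_name)
    return "warn"


print(parse_suite_name("user_features_prod_critical_suite"))
print(handle_result("item_catalog_prod_warning_suite", success=False))  # warn
```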

Common Mistakes

:::danger The Checkpoint Without Action Anti-Pattern

Running a Checkpoint that doesn't fail the DAG on validation failure is worse than having no checkpoint at all. It creates false confidence. Engineers see "checkpoint runs" in the DAG and assume quality is being enforced.

```python
# WRONG: checkpoint runs but doesn't fail the DAG
result = checkpoint.run()
# No check of result.success - pipeline always continues

# CORRECT: raise on failure
result = checkpoint.run()
if not result.success:
    raise ValueError("Data quality check failed")
```

Always check result.success. Always raise an exception (or return a non-zero exit code) on failure in production pipelines. :::

:::danger The Profiled Suite in Production Anti-Pattern

Using a profiled expectation suite directly in production encodes the data as it exists (including defects) as the expected standard. The profiler will generate:

  • expect_column_values_to_not_be_null(column="user_id", mostly=0.97) - if 3% are null when profiled
  • expect_column_values_to_be_between(column="age", min_value=0, max_value=130) - includes the 0-year-olds from data entry errors

Profiling is a discovery tool, not a deployment tool. Always manually review and harden every expectation before adding it to a production suite. :::

:::warning The Overly Strict Suite Anti-Pattern

An expectation suite that fails on every tiny data fluctuation creates alert fatigue. Engineers start ignoring failures because "it fails all the time." Use the mostly parameter thoughtfully and separate genuinely critical expectations (row count collapse, null primary keys) from advisory ones. :::


Interview Q&A

Q1: What is the difference between an ExpectationSuite and a Checkpoint in Great Expectations?

An ExpectationSuite is a collection of expectations - it defines what the data should look like. A Checkpoint is the orchestration object that runs an ExpectationSuite against a batch of data and defines what to do with the results. The suite is the specification; the Checkpoint is the execution and action layer. In production, you define Checkpoints once and run them repeatedly. Multiple Checkpoints can reference the same suite (for example, a daily Checkpoint and an on-demand validation Checkpoint both using the same user_features_suite).
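The split can be modeled in a few lines of plain Python. `Suite` and `Checkpoint` here are hypothetical classes, not the GX API. They only illustrate that one specification can be executed by several runners, each with its own data source and actions.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

Row = Dict[str, object]


@dataclass
class Suite:
    """Specification: named checks a batch should satisfy (hypothetical model)."""
    name: str
    expectations: Dict[str, Callable[[List[Row]], bool]]


@dataclass
class Checkpoint:
    """Execution layer: binds a suite to a batch source and post-validation actions."""
    suite: Suite
    load_batch: Callable[[], List[Row]]
    actions: List[Callable[[Dict[str, bool]], None]] = field(default_factory=list)

    def run(self) -> Dict[str, bool]:
        batch = self.load_batch()
        results = {name: check(batch) for name, check in self.suite.expectations.items()}
        for action in self.actions:
            action(results)
        return results


user_features_suite = Suite(
    name="user_features_suite",
    expectations={"user_id_not_null": lambda rows: all(r.get("user_id") is not None for r in rows)},
)

alerts: List[Dict[str, bool]] = []


def alert_on_failure(results: Dict[str, bool]) -> None:
    """Example action: record any run with at least one failed check."""
    if not all(results.values()):
        alerts.append(results)


# Two Checkpoints, one suite: a scheduled daily run and an ad-hoc validation of a backfill
daily = Checkpoint(user_features_suite, lambda: [{"user_id": 1}, {"user_id": 2}], [alert_on_failure])
backfill = Checkpoint(user_features_suite, lambda: [{"user_id": None}], [alert_on_failure])
```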

Q2: How do you prevent Great Expectations from becoming a bottleneck on large Spark tables?

GX's SparkDFExecutionEngine pushes expectation computation into Spark jobs rather than collecting data to Python. Aggregate expectations (row count, null counts, mean, max) become native Spark aggregations. Row-level expectations (regex matching, column-pair comparisons) are applied as Spark column operations across the cluster. For very large tables (billions of rows), run expensive row-level checks on a random sample of the DataFrame, with the mostly parameter absorbing the small failure rate you expect, while aggregate checks still run on the full table. Also validate the whole suite in one run rather than one expectation at a time, so GX can resolve shared metrics together instead of scanning the table once per expectation.
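The two ideas, a single scan for all aggregate metrics and sampling for expensive row-level checks, are illustrated below in plain Python rather than Spark. In PySpark the rough equivalents would be one `df.agg(...)` call with several aggregate expressions and `df.sample(fraction=..., seed=...)`. The function names here are hypothetical.

```python
import random
import re


def single_pass_metrics(rows, column):
    """Compute row count, null count, min and max in ONE scan of the data."""
    row_count = null_count = 0
    col_min = col_max = None
    for row in rows:
        row_count += 1
        value = row.get(column)
        if value is None:
            null_count += 1
            continue
        col_min = value if col_min is None else min(col_min, value)
        col_max = value if col_max is None else max(col_max, value)
    return {"row_count": row_count, "null_count": null_count, "min": col_min, "max": col_max}


def sampled_regex_pass_rate(values, pattern, fraction, seed=0):
    """Estimate the regex pass rate on a random sample instead of every row."""
    rng = random.Random(seed)  # fixed seed keeps the sample reproducible
    sample = [v for v in values if rng.random() < fraction]
    if not sample:
        return None  # sample too small to say anything
    compiled = re.compile(pattern)
    return sum(1 for v in sample if compiled.fullmatch(v)) / len(sample)
```

Four separate expectations on null count, min, max, and row count would naively mean four scans. The single-pass version answers all of them at once, which is the effect metric batching aims for.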

Q3: How do you use Great Expectations to prevent label leakage in ML training data?

Use expect_column_pair_values_a_to_be_greater_than_b to enforce temporal ordering. The expectation takes two column names: the label timestamp as column A (which should be later) and the feature snapshot timestamp as column B (which should be earlier). If features computed after the label event contaminate the training set, this expectation fails. Run it as a blocking gate before any model training job. Additionally, check that no feature timestamp column contains values later than the label cutoff date, using expect_column_values_to_be_between with max_value set to that cutoff.
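A plain-Python sketch of the same two checks, useful for unit-testing the logic before wiring it into a suite. The column names `label_ts` and `feature_snapshot_ts` and the sample rows are assumptions. In GX, the first check corresponds to expect_column_pair_values_a_to_be_greater_than_b with the label timestamp as column A.

```python
from datetime import datetime


def leakage_violations(rows, label_col="label_ts", feature_col="feature_snapshot_ts"):
    """Indices of rows whose feature snapshot is NOT strictly earlier than the label event."""
    return [i for i, r in enumerate(rows) if not r[label_col] > r[feature_col]]


def beyond_cutoff(rows, feature_col, cutoff):
    """Indices of rows whose feature timestamp is later than the label cutoff (the max_value check)."""
    return [i for i, r in enumerate(rows) if r[feature_col] > cutoff]


# Hypothetical training rows
training_rows = [
    {"label_ts": datetime(2024, 3, 2), "feature_snapshot_ts": datetime(2024, 3, 1)},  # valid
    {"label_ts": datetime(2024, 3, 2), "feature_snapshot_ts": datetime(2024, 3, 2)},  # same instant: not strictly earlier
    {"label_ts": datetime(2024, 3, 2), "feature_snapshot_ts": datetime(2024, 3, 5)},  # leaked future feature
]
```

Whether a feature snapshot taken at exactly the label instant counts as leakage is a design decision. This sketch flags it.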

Q4: What are Data Docs, and who is the intended audience?

Data Docs are auto-generated HTML reports that document your expectation suites and validation results. They serve two audiences: engineers (who need to see exactly which expectations failed, with observed vs. expected values and failure percentages) and non-technical stakeholders (who need to confirm that the pipeline ran its quality checks). Data Docs are rebuilt on each Checkpoint run that includes an UpdateDataDocsAction and can be hosted as a static site in S3, making them accessible to the whole team without requiring a GX installation. They also serve as living documentation of your data contracts: the full expectation suite is rendered in human-readable form, showing what every quality rule covers.

Q5: When would you write a custom expectation vs. using built-in expectations?

Use built-in expectations for standard structural and statistical checks (nulls, uniqueness, value ranges, regex patterns, distribution statistics). Write custom expectations when you have domain-specific quality rules that cannot be expressed with built-in primitives: complex PII detection, business logic that depends on multiple column relationships, industry-specific format validation (CUSIP numbers, ISIN codes, medical codes), or statistical tests not built into GX (KS tests for distribution drift, chi-squared tests for categorical distributions). Custom expectations inherit from base classes such as ColumnMapExpectation (row-level checks) or ColumnAggregateExpectation (whole-column statistics) and integrate with the GX ecosystem: they appear in Data Docs, map-style expectations support the mostly parameter, and they run on every execution engine for which you implement the corresponding metric backend.
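As an example of a statistic not built into GX, here is the two-sample Kolmogorov-Smirnov distance in pure Python, the kind of metric a custom ColumnAggregateExpectation could compute. The 0.1 threshold is an arbitrary illustrative choice. If SciPy is available, `scipy.stats.ks_2samp` computes the same statistic along with a p-value.

```python
from bisect import bisect_right


def ks_statistic(reference, current):
    """Two-sample KS statistic: the largest gap between the two empirical CDFs."""
    ref, cur = sorted(reference), sorted(current)
    n_ref, n_cur = len(ref), len(cur)
    d = 0.0
    # Both empirical CDFs are step functions that only change at observed values,
    # so the maximum gap is attained at one of those values.
    for x in ref + cur:
        f_ref = bisect_right(ref, x) / n_ref
        f_cur = bisect_right(cur, x) / n_cur
        d = max(d, abs(f_ref - f_cur))
    return d


def distribution_unchanged(reference, current, threshold=0.1):
    """Aggregate-style check: pass when the KS distance stays within a chosen threshold."""
    return ks_statistic(reference, current) <= threshold
```

A fixed threshold on the statistic is the simplest drift rule. A p-value-based rule adapts to sample size but needs a significance level chosen with the same care.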

Q6: How do you integrate GX validation results into a broader data observability strategy?

GX handles the expectation-based validation layer - you define contracts and GX verifies them. But data observability is broader: it also includes metric trending (is null rate increasing over time?), anomaly detection (did row count drop unexpectedly?), and lineage tracking (which upstream tables affect this table?). Integrate GX by exporting ValidationResults to a metrics store (write validation result statistics to a time-series database), feeding them into monitoring dashboards (Grafana, Datadog), and using them as data freshness signals in a catalog (DataHub, Amundsen). GX handles the "does the data meet spec" question; anomaly detection tools handle the "is this metric behaving unusually" question. Both are necessary.
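The export step can be sketched as below, assuming the validation result has already been serialized to a dict shaped roughly like the JSON GX writes: top-level `success` and `results`, with each result carrying `expectation_config`, `success`, and, for map expectations, `result.unexpected_percent`. Field names can differ between GX versions, and the metric names are hypothetical.

```python
import time


def to_metric_points(validation: dict, suite_name: str, ts=None):
    """Flatten a serialized validation result into (metric, tags, value, timestamp) points."""
    ts = time.time() if ts is None else ts
    points = [("dq.suite.success", {"suite": suite_name}, float(validation["success"]), ts)]
    for r in validation["results"]:
        cfg = r["expectation_config"]
        tags = {
            "suite": suite_name,
            "expectation": cfg["expectation_type"],
            "column": cfg["kwargs"].get("column", ""),  # table-level expectations have no column
        }
        points.append(("dq.expectation.success", tags, float(r["success"]), ts))
        unexpected = r.get("result", {}).get("unexpected_percent")
        if unexpected is not None:  # typically only present for map expectations
            points.append(("dq.expectation.unexpected_percent", tags, float(unexpected), ts))
    return points
```

Written to a time-series store, the `unexpected_percent` series is what lets a dashboard or anomaly detector answer "is the null rate creeping up?" long before any single run fails.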

© 2026 EngineersOfAI. All rights reserved.