Data Lineage
The Two-Week Trace
The company was preparing for a regulatory audit. A compliance officer needed to verify that a specific metric in a quarterly financial report - "adjusted recurring revenue" - had been computed correctly and that all source data was from authorized systems.
The metric appeared in a Tableau dashboard. That dashboard read from a Redshift view. The view was produced by a dbt model. That dbt model joined three other dbt models. Those three models ingested data from five different source systems: the billing platform, the CRM, the invoicing system, a legacy Oracle database, and a partner data feed. Two of the five sources had data transformation logic applied in Python scripts before it reached dbt. One of the Python scripts had been modified six months earlier.
The investigation took two weeks. Two senior data engineers spent most of that time reading dbt model SQL, tracing table dependencies in Redshift, and interviewing the engineer who wrote the Python scripts. At the end, they could construct the lineage graph manually. They had no way to verify that the manual reconstruction was complete.
With column-level lineage tracked by OpenLineage and stored in a lineage catalog, the same trace would take 30 seconds. You click on the metric in the catalog, select "trace upstream lineage," and the system shows you a complete graph - every table, every transformation, every column mapping - from the Tableau dashboard back to the source columns in each system.
Lineage is not just an audit tool. It is the infrastructure that makes every other data reliability practice faster and more precise. Root cause analysis, impact assessment, onboarding, compliance - all of these become tractable problems when lineage is systematically captured.
Why This Exists
Before automated lineage, data teams maintained lineage in one of three ways: spreadsheets documenting "which tables come from which sources," comments in SQL files, or tribal knowledge held by senior engineers.
All three approaches fail at scale. Spreadsheets go stale immediately after the first schema change. SQL comments are optimistic documentation - they describe intent, not reality. Tribal knowledge walks out the door when the engineer who wrote the pipeline leaves the company.
The problem is not that engineers are undisciplined. It is that hand-maintained lineage documentation does not scale: the number of dependencies grows much faster than the number of tables. A data warehouse with 200 tables and 50 transformation models can easily have thousands of column-to-column dependencies. Keeping that graph accurate and current by hand is not a realistic expectation.
Automated lineage captures dependency information as a byproduct of running the pipeline. Once the integrations are in place, every time Airflow executes a task, dbt runs a model, or Spark submits a job, the lineage metadata is emitted automatically. This keeps lineage as current as the most recent pipeline run, with no ongoing documentation burden.
Historical Context
The idea of tracking data provenance is older than the modern data stack. The W3C PROV standard for recording data provenance was published in 2013. Apache Atlas, which includes lineage tracking for the Hadoop ecosystem, shipped with HDP 2.x around 2016.
The key shift happened between roughly 2019 and 2021 with two converging developments. First, dbt popularized documenting the full transformation graph declaratively - dbt docs generate produces a lineage graph of all models as a first-class output. Second, OpenLineage launched in 2021 as an open standard for emitting lineage metadata from any data tool, so Airflow, Spark, Flink, and dbt could all emit lineage events in the same format and send them to the same store.
This turned lineage from a proprietary feature of specific platforms (Atlas for Hadoop, Informatica for enterprise ETL) into a portable, open standard that works across the entire modern data stack.
What Data Lineage Is
Data lineage is the complete record of a data asset's origin, its movement through systems, and the transformations applied to it. It answers three questions:
- Where did this data come from? (upstream lineage)
- How was it transformed? (transformation lineage)
- What does it feed into? (downstream lineage)
Upstream and downstream lineage map onto the two primary use cases, with transformation lineage supporting both: debugging (trace backward from a wrong output to find the source of the error) and impact analysis (trace forward from a proposed change to understand what will break).
Table-Level vs. Column-Level Lineage
Table-level lineage tracks which tables feed which tables. It tells you that staging.events is derived from raw.events, and that mart.user_features reads from staging.events. This is useful for high-level understanding but insufficient for precise debugging.
Column-level lineage tracks which source columns produce which output columns, including the transformations applied. It tells you that mart.user_features.last_event_ts is derived from raw.events.created_at by applying a UTC normalization in staging.events. This is the level of detail needed to answer: "did the wrong prediction come from the timezone bug in the created_at field?"
Column-level lineage is much more useful for debugging and impact analysis, but it requires parsing SQL or code to extract column-to-column mappings. This is a non-trivial problem - SQL is flexible and expressive, and with a SELECT * the column mappings cannot be determined from the query text alone; you need the source table's actual schema.
OpenLineage: The Open Standard
OpenLineage is an open standard for capturing and communicating lineage metadata across the data ecosystem. It was created by Datakin (later acquired by Astronomer) and donated to the Linux Foundation in 2021.
The core of OpenLineage is a JSON event schema. Whenever a data job runs - an Airflow task, a Spark job, a dbt model - it emits an OpenLineage RunEvent that describes:
- The job that ran (name, namespace, run ID)
- The inputs: which tables were read, including their schema at the time of reading
- The outputs: which tables were written, including their schema
- The column-level lineage: for each output column, which input columns contributed to it
These events are collected by a lineage server. The reference implementation is Marquez, an open-source metadata service that stores OpenLineage events and exposes them via API and a web UI.
```python
from datetime import datetime, timezone
import uuid

from openlineage.client import OpenLineageClient
from openlineage.client.run import (
    RunEvent, RunState, Run, Job,
    InputDataset, OutputDataset,
)
from openlineage.client.facet import (
    ColumnLineageDatasetFacet,
    ColumnLineageDatasetFacetFieldsAdditional,
    ColumnLineageDatasetFacetFieldsAdditionalInputField,
)

# RunEvent requires a producer URI identifying the emitting code (placeholder value)
PRODUCER = "https://example.com/lineage-emitter"


def emit_lineage_event(
    job_name: str,
    input_table: str,
    output_table: str,
    column_mappings: dict,  # {output_col: [input_cols]}
) -> None:
    """
    Emit an OpenLineage RunEvent to Marquez.

    column_mappings example:
        {"revenue_usd": ["amount_usd", "currency_code"],
         "invoice_ts": ["created_at"]}
    """
    client = OpenLineageClient(url="http://marquez:5000")
    run_id = str(uuid.uuid4())

    # Build the column-level lineage facet: one entry per output column
    col_lineage = {}
    for output_col, input_cols in column_mappings.items():
        col_lineage[output_col] = ColumnLineageDatasetFacetFieldsAdditional(
            inputFields=[
                ColumnLineageDatasetFacetFieldsAdditionalInputField(
                    namespace="warehouse",
                    name=input_table,
                    field=ic,
                )
                for ic in input_cols
            ],
            # IDENTITY means "copied unchanged"; only claim it for pass-through columns
            transformationType="IDENTITY" if input_cols == [output_col] else "",
            transformationDescription=f"Derived from {', '.join(input_cols)}",
        )

    event = RunEvent(
        eventType=RunState.COMPLETE,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=Run(runId=run_id),
        job=Job(namespace="warehouse", name=job_name),
        producer=PRODUCER,
        inputs=[InputDataset(namespace="warehouse", name=input_table)],
        outputs=[OutputDataset(
            namespace="warehouse",
            name=output_table,
            facets={
                "columnLineage": ColumnLineageDatasetFacet(fields=col_lineage),
            },
        )],
    )
    client.emit(event)
    print(f"Lineage emitted: {job_name} → {output_table}")
```
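To see what actually travels to the lineage server, it can help to build the same kind of event by hand as plain JSON. The sketch below uses only the standard library and assumes the OpenLineage RunEvent layout (eventType, eventTime, run, job, inputs, outputs, producer); it is simplified and omits `schemaURL` and the `_producer`/`_schemaURL` metadata that real facets carry. The producer URI, job name, and dataset names are hypothetical.

```python
import json
import uuid
from datetime import datetime, timezone


def build_run_event(
    job_name: str,
    input_table: str,
    output_table: str,
    column_mappings: dict,  # {output_col: [input_cols]}
    namespace: str = "warehouse",
) -> dict:
    """Build a simplified OpenLineage-style RunEvent as a plain dict."""
    fields = {
        output_col: {
            "inputFields": [
                {"namespace": namespace, "name": input_table, "field": col}
                for col in input_cols
            ],
            "transformationDescription": f"Derived from {', '.join(input_cols)}",
        }
        for output_col, input_cols in column_mappings.items()
    }
    return {
        "eventType": "COMPLETE",
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.uuid4())},
        "job": {"namespace": namespace, "name": job_name},
        "inputs": [{"namespace": namespace, "name": input_table}],
        "outputs": [{
            "namespace": namespace,
            "name": output_table,
            "facets": {"columnLineage": {"fields": fields}},
        }],
        # Hypothetical URI identifying the code that produced the event
        "producer": "https://example.com/lineage-sketch",
    }


event = build_run_event(
    "stg_invoices_job",
    "billing.invoices",
    "staging.stg_invoices",
    {"invoice_id": ["id"], "invoice_ts": ["created_at"]},
)
print(json.dumps(event, indent=2))
```

Reading the serialized output makes it clear that column-level lineage lives on the output dataset, keyed by output column name.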
Lineage in dbt
dbt provides table-level lineage as a built-in feature. Every ref() and source() call in a dbt model is a declared dependency. Running dbt docs generate builds a full DAG of all models and their dependencies, which is rendered in the dbt docs serve UI.
For partial builds, dbt uses this lineage graph directly. dbt run --select +my_model runs my_model and all of its upstream dependencies. dbt run --select my_model+ runs my_model and all of its downstream dependents. This is lineage being used operationally, not just as documentation.
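The two selector directions can be illustrated in a few lines. This is not dbt's implementation, only a standard-library sketch over a hypothetical model graph given as `model -> parents`: `+model` adds every ancestor, `model+` adds every descendant, and the selection is then ordered so each model runs after its selected parents.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Hypothetical model graph: model -> models it ref()s (its parents)
PARENTS: Dict[str, Set[str]] = {
    "stg_invoices": set(),
    "stg_customers": set(),
    "int_revenue": {"stg_invoices", "stg_customers"},
    "fct_revenue": {"int_revenue"},
    "revenue_dashboard_extract": {"fct_revenue"},
}


def _walk(start: str, edges: Dict[str, Set[str]]) -> Set[str]:
    """All nodes reachable from start by following edges (start excluded)."""
    seen: Set[str] = set()
    stack = [start]
    while stack:
        for nxt in edges.get(stack.pop(), ()):
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen


def select_models(selector: str, parents: Dict[str, Set[str]]) -> List[str]:
    """Resolve '+model', 'model+', or '+model+' into an execution order."""
    children: Dict[str, Set[str]] = {}
    for child, ps in parents.items():
        for p in ps:
            children.setdefault(p, set()).add(child)

    name = selector.strip("+")
    selected = {name}
    if selector.startswith("+"):
        selected |= _walk(name, parents)   # upstream dependencies
    if selector.endswith("+"):
        selected |= _walk(name, children)  # downstream dependents

    # Keep only edges inside the selection, then order parents before children
    graph = {m: parents.get(m, set()) & selected for m in selected}
    return list(TopologicalSorter(graph).static_order())


print(select_models("int_revenue+", PARENTS))
# ['int_revenue', 'fct_revenue', 'revenue_dashboard_extract']
```

The same graph serves both purposes in the paragraph above: lineage decides *which* models to run, and the topological order decides *when*.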
dbt Core does not natively produce column-level lineage, but third-party packages and SQL parsers such as sqlglot can parse dbt's compiled SQL to extract column-level mappings.
```sql
-- staging/stg_invoices.sql
-- dbt automatically tracks that this model depends on source('billing', 'invoices')
SELECT
    id AS invoice_id,
    customer_id,
    -- Column-level lineage: invoice_ts derived from created_at via UTC conversion
    CONVERT_TIMEZONE('America/New_York', 'UTC', created_at) AS invoice_ts,
    amount_usd,
    currency_code,
    status
FROM {{ source('billing', 'invoices') }}
WHERE status != 'voided'
```
```python
# Parse dbt-compiled SQL to extract column-level lineage
import sqlglot
from sqlglot import exp


def extract_column_lineage(sql: str) -> dict:
    """
    Parse a single SELECT statement to extract column-level lineage.
    Returns an {output_column: [source_columns]} mapping.
    SELECT * is skipped: resolving it requires the source table's schema.
    """
    parsed = sqlglot.parse_one(sql)
    select = parsed if isinstance(parsed, exp.Select) else parsed.find(exp.Select)
    lineage = {}
    if select is None:
        return lineage

    # Walk only the projection list, so columns used in WHERE/JOIN clauses
    # (e.g. status below) are not mistaken for output columns
    for projection in select.expressions:
        if isinstance(projection, exp.Star):
            continue
        output_col = projection.alias_or_name  # alias if present, else the column name
        source_cols = [col.name for col in projection.find_all(exp.Column)]
        if source_cols:
            lineage[output_col] = source_cols
    return lineage


# Example
sql = """
SELECT
    id AS invoice_id,
    CONVERT_TIMEZONE('UTC', created_at) AS invoice_ts,
    amount_usd,
    customer_id
FROM billing.invoices
WHERE status != 'voided'
"""
lineage = extract_column_lineage(sql)
print(lineage)
# {
#     'invoice_id': ['id'],
#     'invoice_ts': ['created_at'],
#     'amount_usd': ['amount_usd'],
#     'customer_id': ['customer_id'],
# }
```
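A per-model mapping like this becomes end-to-end lineage once you chain mappings across models. The sketch below assumes each hop has already been extracted as `{output_col: [input_cols]}`; the mart-level mapping is hypothetical, and columns the upstream mapping does not know about are kept as-is rather than dropped.

```python
from typing import Dict, List

Mapping = Dict[str, List[str]]


def compose(downstream: Mapping, upstream: Mapping) -> Mapping:
    """Resolve downstream output columns through an upstream model's mapping.

    `downstream` maps its outputs to columns produced by the upstream model;
    `upstream` maps those columns to the upstream model's own inputs.
    Columns missing from `upstream` are treated as untracked and kept as-is.
    """
    result: Mapping = {}
    for out_col, mid_cols in downstream.items():
        sources: List[str] = []
        for mid in mid_cols:
            for src in upstream.get(mid, [mid]):
                if src not in sources:  # de-duplicate, preserve order
                    sources.append(src)
        result[out_col] = sources
    return result


stg_invoices = {
    "invoice_id": ["id"],
    "invoice_ts": ["created_at"],
    "amount_usd": ["amount_usd"],
    "customer_id": ["customer_id"],
}
# Hypothetical mart model built on top of stg_invoices
mart_revenue = {
    "revenue_usd": ["amount_usd"],
    "first_invoice_ts": ["invoice_ts"],
    "customer_key": ["customer_id", "invoice_id"],
}

print(compose(mart_revenue, stg_invoices))
# {'revenue_usd': ['amount_usd'], 'first_invoice_ts': ['created_at'], 'customer_key': ['customer_id', 'id']}
```

For longer chains, apply `compose` repeatedly from the final model back toward the sources.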
Lineage for Impact Analysis
Impact analysis answers: "If I change column X in table A, what downstream assets will be affected?" This is the most operationally valuable use of lineage.
Consider a scenario: the source team wants to change raw.events.event_type from a free-text string to an integer enum. Before making this change, you need to know every downstream table, model, and dashboard that reads event_type and whether any of them will break when the type changes.
Without lineage, you grep the dbt repository for "event_type", hope nothing is in a notebook, and ask around. With column-level lineage, you query the lineage store.
```python
import requests
from typing import Dict, List, Set, Tuple

MARQUEZ_API = "http://marquez:5000/api/v1"


def get_downstream_impact(
    table: str,
    column: str,
    namespace: str = "warehouse",
    depth: int = 10,
) -> Dict:
    """
    Query Marquez for every downstream column that (transitively) derives
    from table.column. Returns affected datasets ordered by hop distance.
    """
    # Fetch the lineage graph around this dataset. The response contains both
    # upstream and downstream nodes, up to `depth` hops away.
    resp = requests.get(
        f"{MARQUEZ_API}/lineage",
        params={"nodeId": f"dataset:{namespace}:{table}", "depth": depth},
        timeout=30,
    )
    resp.raise_for_status()
    graph = resp.json()

    # Index the columnLineage facet of every dataset node:
    # {dataset_name: {output_col: {"inputFields": [...]}}}
    # Assumes the facet is present on the dataset nodes and a single namespace.
    column_lineage: Dict[str, Dict] = {}
    for node in graph.get("graph", []):
        if node.get("type") != "DATASET":
            continue
        data = node["data"]
        facets = data.get("facets") or {}
        column_lineage[data["name"]] = facets.get("columnLineage", {}).get("fields", {})

    # Breadth-first search over column-level edges: each round finds output
    # columns fed by any column reached in the previous round, so indirect
    # consumers (2+ hops away) are found too, not just direct readers.
    frontier: Set[Tuple[str, str]] = {(table, column)}
    seen = set(frontier)
    affected: Dict[str, Dict] = {}
    hop = 0
    while frontier and hop < depth:
        hop += 1
        next_frontier: Set[Tuple[str, str]] = set()
        for dataset_name, fields in column_lineage.items():
            for out_col, lineage_data in fields.items():
                inputs = {
                    (f.get("name"), f.get("field"))
                    for f in lineage_data.get("inputFields", [])
                }
                if (dataset_name, out_col) in seen or not inputs & frontier:
                    continue
                seen.add((dataset_name, out_col))
                next_frontier.add((dataset_name, out_col))
                entry = affected.setdefault(dataset_name, {
                    "dataset": dataset_name,
                    "affected_output_columns": [],
                    "hop_distance": hop,
                })
                entry["affected_output_columns"].append(out_col)
        frontier = next_frontier

    assets: List[Dict] = sorted(affected.values(), key=lambda x: x["hop_distance"])
    return {
        "source_table": table,
        "source_column": column,
        "affected_assets": assets,
        "total_affected": len(assets),
    }


# Usage
impact = get_downstream_impact("raw.events", "event_type")
# Illustrative result for a warehouse shaped like the scenario above:
# {
#     "source_table": "raw.events",
#     "source_column": "event_type",
#     "affected_assets": [
#         {"dataset": "staging.events", "affected_output_columns": ["event_category"], "hop_distance": 1},
#         {"dataset": "mart.user_features", "affected_output_columns": ["top_event_type"], "hop_distance": 2},
#         {"dataset": "ml.churn_features", "affected_output_columns": ["feature_event_diversity"], "hop_distance": 3},
#     ],
#     "total_affected": 3
# }
```
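The lineage query tells you *who* is affected; you still have to judge *how badly*. A minimal triage sketch follows, using common rules of thumb: a rename breaks any SQL that references the old name, a removal always breaks consumers, and a type change may fail on casts or silently change aggregations. The change kinds, the severity labels, and the `impact` dict (shaped like the result above) are assumptions of this sketch.

```python
from typing import Dict, List

# Hypothetical severity rules for common schema changes
SEVERITY = {
    "rename": ("BREAKING", "SQL that references the old column name will fail"),
    "remove": ("BREAKING", "consumers lose the column entirely"),
    "type_change": ("REVIEW", "may fail on casts or silently change aggregations"),
}


def triage(impact: Dict, change_kind: str) -> List[Dict]:
    """Attach a severity and reason to each affected asset in an impact report."""
    severity, reason = SEVERITY[change_kind]
    return [
        {
            "dataset": asset["dataset"],
            "columns": asset["affected_output_columns"],
            "hop_distance": asset["hop_distance"],
            "severity": severity,
            "reason": reason,
        }
        for asset in impact["affected_assets"]
    ]


impact = {
    "source_table": "raw.events",
    "source_column": "event_type",
    "affected_assets": [
        {"dataset": "staging.events", "affected_output_columns": ["event_category"], "hop_distance": 1},
        {"dataset": "mart.user_features", "affected_output_columns": ["top_event_type"], "hop_distance": 2},
    ],
}
for row in triage(impact, "type_change"):
    print(row["hop_distance"], row["dataset"], row["severity"], row["reason"])
```

Every asset gets the same label here; in practice you would refine the verdict per consumer, for example by checking whether a downstream column actually casts or aggregates the changed field.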
Lineage for Root Cause Debugging
When a prediction or metric is wrong, lineage provides the fastest path to root cause. Instead of reading every pipeline file and asking every engineer, you start at the wrong output and trace backward.
Here is a structured debugging process using lineage:
```python
from collections import deque
from typing import Dict, List


def trace_root_cause(
    wrong_output_table: str,
    wrong_column: str,
    symptom: str,
    lineage_client,
    max_hops: int = 10,
) -> List[Dict]:
    """
    Trace upstream lineage to identify possible root causes.
    Returns the transformation hops to investigate, nearest first.

    lineage_client.get_upstream(table, column) is assumed to return a list of
    {"table", "column", "transformation"} dicts; adapt it to your lineage store.
    """
    investigation_path = []
    # Breadth-first, so every upstream branch is explored, not just the last one
    queue = deque([(wrong_output_table, wrong_column, 0)])
    visited = {(wrong_output_table, wrong_column)}

    while queue:
        table, column, hop = queue.popleft()
        if hop >= max_hops:
            continue
        upstream = lineage_client.get_upstream(table, column)
        if not upstream:
            investigation_path.append({
                "hop": hop,
                "table": table,
                "column": column,
                "symptom": symptom,
                "status": "SOURCE - no further upstream",
            })
            continue

        for source in upstream:
            source_table, source_column = source["table"], source["column"]
            if (source_table, source_column) in visited:
                continue
            visited.add((source_table, source_column))
            investigation_path.append({
                "hop": hop + 1,
                "table": source_table,
                "column": source_column,
                "transformation": source["transformation"],
                # Profiling template: assumes a numeric column and a created_at
                # timestamp on the table; adjust both per table
                "investigation_query": f"""
                    -- Check distribution of {source_column} in {source_table}
                    SELECT
                        COUNT(*) AS total_rows,
                        MIN({source_column}) AS min_val,
                        MAX({source_column}) AS max_val,
                        AVG({source_column}::float) AS avg_val,
                        PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY {source_column}) AS p95
                    FROM {source_table}
                    WHERE DATE(created_at) = CURRENT_DATE - 1
                """,
            })
            queue.append((source_table, source_column, hop + 1))

    return investigation_path
```
Lineage for GDPR Compliance
Data subject access requests (DSARs) and right-to-erasure requests require knowing every table that contains data about a specific user. Column-level lineage makes this systematic.
The process: given a user_id, find every table with a column that has a lineage path back to users.user_id. This gives you a complete inventory of user-linked tables for both data subject access and erasure verification.
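```python
from typing import Dict, List, Optional


def find_tables_with_user_data(
    lineage_store,
    user_id_source_table: str = "raw.users",
    user_id_source_column: str = "user_id",
) -> List[str]:
    """
    Find all tables that contain user-linked data via the lineage graph.
    Used for GDPR DSAR and right-to-erasure compliance.

    lineage_store.get_all_downstream(table, column) is assumed to return
    a list of {"dataset": ...} nodes reachable from that column.
    """
    # Get the full downstream graph from raw.users.user_id
    downstream = lineage_store.get_all_downstream(
        user_id_source_table,
        user_id_source_column,
    )
    tables_with_user_data = {user_id_source_table}
    for node in downstream:
        tables_with_user_data.add(node["dataset"])
    return sorted(tables_with_user_data)


# After erasure: verify the deletion propagated through the lineage graph
def verify_erasure(conn, tables: List[str], user_id: str) -> Dict[str, Optional[bool]]:
    """
    For each table in the lineage graph, verify the user no longer exists.

    Returns True (no rows left), False (rows remain), or None (no candidate
    identifier column could be queried, so the table needs manual review).
    Assumes a DB-API connection using the %s paramstyle (e.g. psycopg2).
    """
    candidate_cols = ["user_id", "customer_id", "account_id", "external_user_id"]
    results: Dict[str, Optional[bool]] = {}
    for table in tables:
        verdict: Optional[bool] = None
        for col in candidate_cols:
            # Table and column names come from the lineage graph, not user input
            query = f"SELECT COUNT(*) FROM {table} WHERE {col} = %s"
            try:
                with conn.cursor() as cur:
                    cur.execute(query, (user_id,))
                    count = cur.fetchone()[0]
            except Exception:
                # Usually "column does not exist"; roll back so the failed
                # statement does not abort the next query in this transaction
                conn.rollback()
                continue
            # Check every identifier column that exists, not just the first one
            verdict = (count == 0) if verdict is None else (verdict and count == 0)
        results[table] = verdict
    return results
```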
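Verification results are only useful to an auditor if they are recorded alongside the lineage they were based on. One way to do that, sketched here with the standard library only, is a single JSON-serializable record holding the table inventory, the per-table results, and a content hash of the lineage snapshot. The record layout and the `request_id` field are hypothetical; adapt them to what your compliance team needs.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Dict, List, Optional


def build_erasure_evidence(
    request_id: str,
    lineage_snapshot: Dict,
    tables: List[str],
    results: Dict[str, Optional[bool]],
) -> Dict:
    """Bundle an erasure check into a self-describing audit record."""
    # Canonical JSON (sorted keys) so the same snapshot always hashes the same
    snapshot_bytes = json.dumps(lineage_snapshot, sort_keys=True).encode("utf-8")
    # Anything not positively verified (False, None, or missing) needs follow-up
    unverified = sorted(t for t in tables if results.get(t) is not True)
    return {
        "request_id": request_id,
        "checked_at": datetime.now(timezone.utc).isoformat(),
        "lineage_snapshot_sha256": hashlib.sha256(snapshot_bytes).hexdigest(),
        "tables_checked": sorted(tables),
        "results": results,
        "complete": not unverified,
        "unverified_tables": unverified,
    }


evidence = build_erasure_evidence(
    "dsar-0001",
    {"raw.users.user_id": ["staging.users", "mart.user_features"]},
    ["raw.users", "staging.users", "mart.user_features"],
    {"raw.users": True, "staging.users": True, "mart.user_features": None},
)
print(json.dumps(evidence, indent=2))
```

Hashing the snapshot lets you show later exactly which version of the lineage graph defined the set of tables that were checked.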
Lineage Storage: Graph Databases vs. Metadata Stores
Lineage data is inherently a graph: nodes are datasets and jobs, edges are the dependency relationships between them. Querying this graph - "find all nodes reachable from X in N hops" - is a graph traversal problem.
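The basic "reachable from X in N hops" query does not strictly require a graph database. As a point of comparison, here is a minimal sketch that stores table-level edges in SQLite (from the standard library) and answers the query with a recursive CTE; the edge data is hypothetical. Relational recursion is workable at moderate depth; the deep, path-heavy queries described next are where graph databases start to pay off.

```python
import sqlite3
from typing import List, Tuple


def downstream_within(conn: sqlite3.Connection, start: str, max_hops: int) -> List[Tuple[str, int]]:
    """Return (dataset, min_hops) for every dataset reachable from start in <= max_hops."""
    return conn.execute(
        """
        WITH RECURSIVE reach(node, hops) AS (
            SELECT ?, 0
            UNION
            SELECT e.dst, r.hops + 1
            FROM edges AS e
            JOIN reach AS r ON e.src = r.node
            WHERE r.hops < ?          -- stop expanding at the hop limit
        )
        SELECT node, MIN(hops) AS dist
        FROM reach
        WHERE node != ?               -- exclude the starting dataset itself
        GROUP BY node
        ORDER BY dist, node
        """,
        (start, max_hops, start),
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE edges (src TEXT, dst TEXT)")
conn.executemany(
    "INSERT INTO edges VALUES (?, ?)",
    [
        ("raw.events", "staging.events"),
        ("staging.events", "mart.user_features"),
        ("mart.user_features", "ml.churn_features"),
        ("staging.events", "mart.daily_active_users"),
    ],
)
print(downstream_within(conn, "raw.events", max_hops=2))
# [('staging.events', 1), ('mart.daily_active_users', 2), ('mart.user_features', 2)]
```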
Graph databases (Neo4j, Amazon Neptune) are optimized for graph traversals. They excel at deep lineage queries (10+ hops) and complex path-finding. The tradeoff is operational complexity and a specialized query language (Cypher for Neo4j).
Metadata stores (DataHub, Apache Atlas, Marquez) store lineage as a specialized graph within a broader metadata platform. They provide REST APIs, a web UI, and integration with catalog features such as ownership, documentation, and search.
For most teams, a dedicated metadata store is the pragmatic choice. Graph databases make sense when you need complex lineage queries with deep traversals and you are already running graph infrastructure.
```python
# Storing lineage in Neo4j (for teams that need complex graph queries)
from typing import List

from neo4j import GraphDatabase


class LineageGraphStore:
    def __init__(self, uri: str, auth: tuple):
        self.driver = GraphDatabase.driver(uri, auth=auth)

    def close(self) -> None:
        self.driver.close()

    def record_transformation(
        self,
        source_table: str,
        source_columns: List[str],
        output_table: str,
        output_column: str,
        transformation_sql: str,
        job_name: str,
    ) -> None:
        with self.driver.session() as session:
            session.execute_write(
                self._create_lineage_edge,
                source_table, source_columns,
                output_table, output_column,
                transformation_sql, job_name,
            )

    @staticmethod
    def _create_lineage_edge(tx, src_table, src_cols, out_table, out_col, sql, job):
        for src_col in src_cols:
            # MERGE the FEEDS edge on its endpoints only, then SET properties,
            # so a changed SQL string updates the edge instead of duplicating it
            tx.run(
                """
                MERGE (src:Table {name: $src_table})
                MERGE (out:Table {name: $out_table})
                MERGE (sc:Column {name: $src_col, table: $src_table})
                MERGE (oc:Column {name: $out_col, table: $out_table})
                MERGE (src)-[:HAS_COLUMN]->(sc)
                MERGE (out)-[:HAS_COLUMN]->(oc)
                MERGE (sc)-[r:FEEDS]->(oc)
                SET r.job = $job, r.sql = $sql
                """,
                src_table=src_table,
                out_table=out_table,
                src_col=src_col,
                out_col=out_col,
                job=job,
                sql=sql[:500],  # truncate long SQL
            )

    def find_all_downstream(
        self, table: str, column: str, max_hops: int = 10
    ) -> List[dict]:
        # Cypher does not accept parameters in variable-length bounds, so the
        # hop limit is formatted into the query (as an int) instead
        query = """
            MATCH path = (start:Column {name: $col, table: $table})-[:FEEDS*1..%d]->(downstream:Column)
            RETURN
                downstream.table AS downstream_table,
                downstream.name AS downstream_column,
                min(length(path)) AS hop_distance
            ORDER BY hop_distance
        """ % int(max_hops)
        with self.driver.session() as session:
            result = session.run(query, col=column, table=table)
            return [record.data() for record in result]
```
:::danger Lineage without column-level detail is incomplete
Table-level lineage tells you which tables are connected. Column-level lineage tells you what will break when a specific column changes. For impact analysis and root cause debugging, table-level lineage is almost always insufficient. You need to know not just that mart.user_features reads from staging.events, but that mart.user_features.last_event_ts is derived from staging.events.event_ts, which is in turn derived from raw.events.created_at. Without that chain, you cannot confidently say which downstream columns are affected by a change to created_at.
:::
:::warning The SELECT * lineage gap
SELECT * in SQL transformations creates a lineage gap. If staging.events is defined as SELECT * FROM raw.events, a column-level lineage tool cannot determine from the SQL text alone which columns are included; it needs the schema of raw.events. Parsers such as sqlglot can expand SELECT * only when they are given that schema; without it, the best you can record is table-level lineage. When writing transformations that will be tracked for lineage, prefer explicit column lists over SELECT *.
:::
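The remedy for the SELECT * gap is schema introspection: resolve `*` against the source table's actual columns before building column mappings. A minimal standard-library sketch of that idea follows, limited to a single-table SELECT with bare column names; the `schema` dict stands in for whatever your warehouse's information schema returns, and `None` signals that only table-level lineage is available.

```python
from typing import Dict, List, Optional


def resolve_projection(
    projection: List[str],
    source_table: str,
    schema: Dict[str, List[str]],
) -> Optional[Dict[str, List[str]]]:
    """Map output columns to source columns for a simple single-table SELECT.

    Returns None when the projection contains '*' and the source table's
    columns are unknown: column-level lineage cannot be resolved statically.
    """
    mapping: Dict[str, List[str]] = {}
    for item in projection:
        if item == "*":
            columns = schema.get(source_table)
            if columns is None:
                return None  # fall back to table-level lineage
            for col in columns:
                mapping[col] = [col]
        else:
            mapping[item] = [item]
    return mapping


schema = {"raw.events": ["event_id", "user_id", "event_type", "created_at"]}
print(resolve_projection(["*"], "raw.events", schema))
print(resolve_projection(["*"], "raw.events", {}))  # None: schema unknown
```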
Production Engineering Notes
In production, lineage collection should be zero-overhead and automatic. The right model is:
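- Instrument your orchestration layer (Airflow) with the OpenLineage Airflow provider (apache-airflow-providers-openlineage; older Airflow versions used the openlineage-airflow package). It emits a lineage event for every task run without code changes to existing DAGs, with dataset-level detail for operators that support OpenLineage.
- For dbt, use the openlineage-dbt integration, which wraps dbt commands (dbt-ol) and emits lineage built from dbt's run artifacts.
- For Spark, use the OpenLineage Spark listener, which extracts lineage from Spark's query plans automatically.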
Store all lineage events in an open-source lineage server such as Marquez or DataHub. Set a retention policy on raw events but keep the aggregated lineage graph indefinitely - it grows slowly and rarely needs pruning.
Build lineage freshness checks into your observability pipeline. A lineage graph that has not received new events in 24 hours on an active warehouse is a signal that the lineage collection pipeline is broken, not that no jobs have run.
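The freshness rule can be expressed directly. A minimal sketch, assuming you can obtain the timestamp of the most recent lineage event (for example from your lineage server's API) and whether any warehouse jobs ran in the same window; both inputs and the function name are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def lineage_collection_broken(
    last_event_at: Optional[datetime],
    warehouse_active: bool,
    now: Optional[datetime] = None,
    max_silence: timedelta = timedelta(hours=24),
) -> bool:
    """True when an active warehouse has produced no lineage events for too long."""
    if not warehouse_active:
        return False  # silence is expected when nothing ran
    if last_event_at is None:
        return True   # jobs ran but no lineage event was ever recorded
    now = now or datetime.now(timezone.utc)
    return now - last_event_at > max_silence
```

Passing `warehouse_active` explicitly is what separates "no jobs ran" from "the lineage pipeline is broken", which is exactly the distinction the check exists to make.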
Interview Q&A
Q: What is the difference between table-level and column-level data lineage, and when does each matter?
A: Table-level lineage tracks which tables feed which tables - it answers "does table A depend on table B?" Column-level lineage tracks which specific source columns produce which specific output columns, including the transformations applied. Table-level is sufficient for high-level dependency documentation and for understanding coarse-grained blast radius. Column-level is required for precise impact analysis ("if I rename column X, exactly which downstream columns will break?"), for root cause debugging ("trace this wrong predicted value back to its source"), and for compliance ("prove that this sensitive column's data was purged from all derived tables"). In practice, you want both: table-level for the broad view, column-level for investigation.
Q: How does OpenLineage work, and what tools emit OpenLineage events?
A: OpenLineage is an open standard that defines a JSON event schema for lineage metadata. When a data job runs, it emits a RunEvent to a lineage server. The event includes the job identity, the input datasets (with their schemas), and the output datasets (with schemas and column-level lineage facets). The reference collection server is Marquez. Tools that natively emit OpenLineage events include Apache Airflow (via the apache-airflow-providers-openlineage provider, or the older openlineage-airflow package), Apache Spark (via the OpenLineage Spark listener), dbt (via the openlineage-dbt integration), and Flink. DataHub also consumes OpenLineage events. The advantage of the open standard is that you get a unified lineage graph across all tools without vendor lock-in.
Q: A data scientist reports that a machine learning feature looks wrong in production but correct in the training data. How does lineage help you debug this?
A: This is training-serving skew, and lineage is the fastest debugging tool. I start by finding the feature in the lineage graph and tracing its upstream path in both the training pipeline and the serving pipeline. Usually, you find a fork point - a place where the training pipeline and the serving pipeline diverge and apply different transformations to the same source column. Common patterns: the training pipeline applies a timezone normalization that the serving pipeline does not; the training pipeline joins on an attribute that has since changed; or the training pipeline uses a different date filter than the serving pipeline. Column-level lineage makes the fork point visible as a structural difference in the graph.
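Once both paths are extracted from the lineage graph, the fork point can be found mechanically. A minimal sketch, assuming each path is a list of hypothetical `(table, column, transformation)` steps ordered from the shared source column to the feature; it returns the index of the first step where training and serving differ.

```python
from typing import List, Optional, Tuple

Step = Tuple[str, str, str]  # (table, column, transformation)


def find_fork_point(training: List[Step], serving: List[Step]) -> Optional[int]:
    """Index of the first differing step, or None if the paths are identical."""
    for i, (t_step, s_step) in enumerate(zip(training, serving)):
        if t_step != s_step:
            return i
    if len(training) != len(serving):
        return min(len(training), len(serving))  # one path has extra steps
    return None


training_path = [
    ("raw.events", "created_at", "source"),
    ("staging.events", "event_ts", "CONVERT_TIMEZONE to UTC"),
    ("features.user", "last_event_ts", "MAX per user"),
]
serving_path = [
    ("raw.events", "created_at", "source"),
    ("online.events", "event_ts", "no timezone conversion"),
    ("features.user_online", "last_event_ts", "MAX per user"),
]
fork = find_fork_point(training_path, serving_path)
print(fork, training_path[fork], serving_path[fork])
```

Here the fork sits at step 1, the timezone normalization that only the training path applies.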
Q: How would you implement lineage for a GDPR right-to-erasure request?
A: The process has three steps. First, trace the full downstream lineage from users.user_id - find every table and column that contains a lineage path back to the user identifier. This gives you a complete inventory of tables to check. Second, after running the deletion process, query each table in the lineage graph to verify that rows linked to this user ID no longer exist. Third, store the lineage graph snapshot and the verification results as audit evidence - regulators want to see that you have a systematic, repeatable process, not ad hoc SQL queries. The key advantage of this approach over a manually maintained list of tables is that it automatically includes any new table that was added after the deletion process was last reviewed.
Q: How do you handle lineage for Python transformations, not just SQL?
A: Python transformations are harder to instrument than SQL because there is no declarative query to parse. The practical approaches are: (1) use OpenLineage's Python client to emit lineage events manually from your Python scripts - you specify input datasets, output datasets, and column mappings explicitly; (2) use a framework such as Kedro, where datasets and the functions that connect them are declared in a pipeline definition, so dataset-level dependencies can be derived from that definition rather than from the code itself; (3) for Pandas-based scripts, route I/O through small wrappers around calls like read_csv, read_sql, and to_parquet that record which datasets were read and written, then emit one event per run. For complex Python transformations where the column mapping is determined dynamically, manual OpenLineage emission is the most reliable approach - you treat the Python function as the job and explicitly declare its lineage in the event.
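For the manual-emission approach, a decorator keeps the lineage declaration next to the function it describes. A minimal sketch, assuming a hypothetical `emit` callable that forwards a dict to your lineage backend (for example by wrapping the OpenLineage Python client); the event is a simplified dict, not the full RunEvent schema.

```python
import functools
import uuid
from datetime import datetime, timezone
from typing import Callable, Dict, List


def lineage(
    job_name: str,
    inputs: List[str],
    outputs: List[str],
    columns: Dict[str, List[str]],
    emit: Callable[[dict], None],
):
    """Declare a Python function's lineage and emit it each time the function runs."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            run_id = str(uuid.uuid4())
            try:
                result = func(*args, **kwargs)
            except Exception:
                # Record the failed run, then re-raise the original error
                emit({"eventType": "FAIL", "job": job_name, "runId": run_id,
                      "eventTime": datetime.now(timezone.utc).isoformat()})
                raise
            emit({
                "eventType": "COMPLETE",
                "job": job_name,
                "runId": run_id,
                "eventTime": datetime.now(timezone.utc).isoformat(),
                "inputs": inputs,
                "outputs": outputs,
                "columnLineage": columns,  # {output_col: [input_cols]}
            })
            return result
        return wrapper
    return decorator


events: List[dict] = []


@lineage(
    job_name="normalize_partner_feed",
    inputs=["partner.raw_feed"],
    outputs=["staging.partner_feed"],
    columns={"amount_usd": ["amount", "fx_rate"]},
    emit=events.append,
)
def normalize_partner_feed(rows):
    return [{"amount_usd": r["amount"] * r["fx_rate"]} for r in rows]


normalize_partner_feed([{"amount": 10.0, "fx_rate": 1.1}])
print(events[0]["eventType"], events[0]["columnLineage"])
```

The declaration is still manual, so it can drift from the code; keeping it in the decorator at least puts it in the same diff as any change to the function.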
Q: What is impact analysis in the context of data lineage, and how do you use it before making schema changes?
A: Impact analysis uses the downstream lineage graph to predict what will break if you change a table or column. Before making a schema change - renaming a column, changing a type, removing a column - you query the lineage store for all downstream consumers of that column. For each consumer, you assess the severity of impact: a rename breaks any SQL that references the old name; a type change may cause silent errors in aggregations or downstream type casts; a removal is always a breaking change. Armed with this list, you either fix all consumers before making the change, version the column by adding the new column while keeping the old one with a deprecation notice, or run a coordinated migration in which consumers are updated before the source changes. Without lineage, impact analysis is guesswork and grep searches. With lineage, it is a systematic and complete inventory.
