Data Governance in Lakehouses
The Audit That Failed in 45 Minutes
The SOC 2 auditor asked three questions. The CISO thought they were simple questions. They were not.
Question one: "Show me a list of every employee who accessed your customer PII dataset in the last 90 days, with timestamps." The data engineering lead opened the AWS S3 access logs. They existed - but they were raw CloudTrail JSON, unindexed, partitioned by hour, covering 90 days of every API call across the entire AWS account. Finding "who read this specific Parquet file" required joining CloudTrail events against S3 key names against IAM identities. It could be done - with about two weeks of engineering work.
Question two: "What is the data lineage from your raw customer transaction data to the credit risk model that made this loan decision?" The ML team lead opened a Confluence page with a hand-drawn diagram that was 14 months old. The actual pipeline had been rewritten twice since then. Nobody knew which version of the training data the current model was trained on.
Question three: "How do you ensure that analysts in your data warehouse cannot see individual customer social security numbers when they run reports?" The answer was: they could not. The SSN column existed in the customer table. Analysts had read access to the table. There was no masking. The auditor noted this as a critical finding.
The company failed the SOC 2 audit. The remediation took six months. They implemented Databricks Unity Catalog - a unified governance layer that answered all three questions with built-in tooling. Question one: Unity Catalog's query audit log answered it in a SQL query. Question two: Unity Catalog's column-level lineage showed the complete chain from raw source to model. Question three: Unity Catalog's dynamic masking showed analysts a masked SSN (XXX-XX-XXXX) while the underlying data remained intact for authorized roles.
This lesson is about how to build a lakehouse that can answer those three questions before the auditor asks them - because in regulated industries, "we can figure it out eventually" is not a compliant answer.
Why Governance Is Different in a Lakehouse
In a traditional data warehouse (Snowflake, Redshift), governance is relatively straightforward because the warehouse controls everything: the storage format, the query engine, the access control layer, and the audit log. One vendor, one control plane.
The lakehouse breaks this tight coupling on purpose - that is its architectural advantage. Open storage (S3/GCS/ADLS), open formats (Parquet/Iceberg), multiple query engines (Spark, Trino, DuckDB). But this openness creates three governance challenges that simply do not exist in a traditional warehouse:
Challenge 1: Open Storage Bypasses Table-Level Security
In Snowflake, every query goes through the Snowflake query engine, which enforces row-level and column-level security. There is no other way to read the data - the format is proprietary.
In a lakehouse, the data is Parquet files in S3. Anyone with the S3 IAM role can run aws s3 cp s3://my-lakehouse/iceberg/customers/data/00001.parquet ./ and open the file with any Parquet reader. No table-level security is involved. This means:
- Row-level security must be enforced at the query engine layer AND access to the underlying S3 must be restricted
- Column-level masking requires either query engine enforcement (Unity Catalog, Apache Ranger) or actual data transformation at write time (encryption, tokenization)
- Access audit logs must capture both table-level queries (via the query engine) and direct S3 access (via CloudTrail)
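The storage-side half of the first bullet can be sketched as an S3 bucket policy that denies object reads to every principal except the query engine's role. This is a minimal illustration, not a complete policy: the bucket name reuses the example above, and the account ID and role name are hypothetical placeholders.

```python
import json


def build_engine_only_policy(bucket: str, engine_role_arn: str) -> dict:
    """Deny object reads to every principal except the query engine's role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "DenyDirectReadsExceptQueryEngine",
                "Effect": "Deny",
                "Principal": "*",
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket}/*",
                # aws:PrincipalArn is compared against the role ARN of the caller
                "Condition": {
                    "ArnNotEquals": {"aws:PrincipalArn": engine_role_arn}
                },
            }
        ],
    }


policy = build_engine_only_policy(
    bucket="my-lakehouse",
    engine_role_arn="arn:aws:iam::123456789012:role/query-engine",  # hypothetical role
)
print(json.dumps(policy, indent=2))
```

An explicit Deny wins over any Allow attached to a user, which is why this shape is often preferred to simply not granting access. Real deployments usually need further exceptions (administrators, replication roles), so treat the single-role condition as a starting point.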
Challenge 2: Schema Evolution Can Silently Break Downstream Consumers
A data engineer adds a new column to an Iceberg table - a legitimate schema evolution. But a downstream dbt model doesn't account for it and now produces incorrect aggregations. An ML model trained before the schema change reads the table after migration and silently uses the wrong column index. In a traditional warehouse, schema changes trigger alerts in the catalog. In a lakehouse without a governance layer, this often goes undetected until a business metric moves unexpectedly.
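One way to catch this before deployment is to diff the old and new schema and flag anything that is not a pure append. The sketch below is a hypothetical pre-merge check in plain Python; the schema representation (ordered name/type pairs) and the column names are assumptions for illustration, not an Iceberg API.

```python
Schema = list[tuple[str, str]]  # ordered (column_name, type) pairs


def diff_schemas(old: Schema, new: Schema) -> dict[str, list[str]]:
    """Compare two schema versions and report changes that can affect consumers."""
    old_types, new_types = dict(old), dict(new)
    old_pos = {name: i for i, (name, _) in enumerate(old)}
    new_pos = {name: i for i, (name, _) in enumerate(new)}
    common = [name for name, _ in old if name in new_types]
    return {
        "added": [name for name, _ in new if name not in old_types],
        "removed": [name for name, _ in old if name not in new_types],
        "retyped": [n for n in common if old_types[n] != new_types[n]],
        # Columns whose ordinal position moved: dangerous for positional readers
        "shifted": [n for n in common if old_pos[n] != new_pos[n]],
    }


def is_breaking(diff: dict[str, list[str]]) -> bool:
    """Treat anything other than a position-preserving addition as breaking."""
    return any(diff[kind] for kind in ("removed", "retyped", "shifted"))


v1 = [("trip_id", "long"), ("fare", "double"), ("status", "string")]
v2 = [("trip_id", "long"), ("surge_multiplier", "double"),
      ("fare", "double"), ("status", "string")]

diff = diff_schemas(v1, v2)
print(diff)
print("breaking for positional readers:", is_breaking(diff))
```

Here the new column is inserted in the middle, so `fare` and `status` shift position. A consumer that reads by column index would silently pick up the wrong field, which is the failure mode described above.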
Challenge 3: Lineage Is Hard When Data Moves Through Multiple Layers
A lakehouse typically has data moving through: source systems → Kafka → raw Iceberg tables → curated Iceberg tables → dbt models → feature tables → ML training jobs → model predictions → Iceberg prediction tables → dashboards. At each step, data is transformed, filtered, joined, and restructured. Understanding that a specific field in a dashboard comes from a specific column in a source system - and understanding what transformations it went through - requires lineage tracking across all these systems simultaneously.
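To make the tracing problem concrete, here is a toy sketch that walks a column-level lineage map backwards from a dashboard field to its source. The map, the column names, and the transformations are all hypothetical; a real catalog would populate this graph from emitted lineage events rather than a hand-written dictionary.

```python
# child column -> list of (parent column, transformation applied)
UPSTREAM = {
    "dashboard.revenue_by_city.total_revenue": [("curated.trips.fare", "SUM")],
    "curated.trips.fare": [("raw.trips.fare_cents", "fare_cents / 100")],
    "raw.trips.fare_cents": [("kafka.trip_events.fare_cents", "copy")],
}


def trace_to_sources(column: str, upstream: dict) -> list[list[str]]:
    """Return every path from `column` back to a column with no recorded parent.

    Assumes the lineage graph is acyclic.
    """
    parents = upstream.get(column, [])
    if not parents:
        return [[column]]
    return [
        [column, f"({transform})", *rest]
        for parent, transform in parents
        for rest in trace_to_sources(parent, upstream)
    ]


for path in trace_to_sources("dashboard.revenue_by_city.total_revenue", UPSTREAM):
    print(" <- ".join(path))
```

The traversal itself is trivial. The hard part in practice is keeping the map complete when each hop is executed by a different system.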
Historical Context: From Hadoop Governance to Unified Catalogs
Data governance in the Hadoop era was managed by Apache Atlas (metadata tagging and lineage) and Apache Ranger (policy-based access control). These tools were designed for the Hadoop ecosystem: HDFS files, Hive tables, HBase, Kafka. They worked, but they required significant operational investment and were deeply tied to the Hadoop stack.
As the industry moved to cloud object storage and open table formats, governance tooling had to evolve. The key developments:
- 2021: Databricks launches Unity Catalog - a unified governance layer for the Databricks lakehouse, with three-level namespacing, column-level security, and built-in lineage. This is the first commercially mature, cloud-native lakehouse governance solution.
- 2022: OpenLineage specification matures, providing a vendor-neutral standard for lineage metadata emission across Spark, Airflow, dbt, and Flink.
- 2023: DataHub and Atlan emerge as open-source and commercial data catalog solutions that integrate with Iceberg, Delta, and Unity Catalog.
- 2022-2024: Apache Iceberg's REST Catalog specification, first shipped in 2022, gains broad adoption, giving governance tools a standard API for tracking table operations.
The current landscape is still fragmented - there is no single "Iceberg of governance" that works across all engines and cloud providers. But the tooling has matured significantly from the Hadoop era.
Unity Catalog: The Current Standard
Databricks Unity Catalog is the most complete lakehouse governance solution available today. If you're on Databricks, it is the answer to almost every governance question. Understanding its architecture clarifies what good lakehouse governance looks like even if you're not on Databricks.
Three-Level Namespace
Unity Catalog organizes data into a three-level hierarchy:
catalog.schema.table
Examples:
production.rides.trips -- production catalog, rides schema, trips table
analytics.features.driver_stats -- analytics catalog, features schema
sandbox.amit.experiment_table -- sandbox catalog, per-user schema
This hierarchy enables isolation at multiple granularities:
- Catalog-level isolation: different business units or environments (prod vs dev)
- Schema-level isolation: different domains or teams within a catalog
- Table-level isolation: individual tables with their own permission sets
-- Unity Catalog: grant access at any level of the hierarchy
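-- Grant a group read access to an entire catalog
-- (USE CATALOG alone only allows entering the catalog; SELECT and USE SCHEMA are inherited by child objects)
GRANT USE CATALOG, USE SCHEMA, SELECT ON CATALOG production TO `data-analysts`;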
-- Grant access to a specific schema
GRANT USE SCHEMA, SELECT ON SCHEMA production.rides TO `rides-team`;
-- Grant access to a specific table
GRANT SELECT ON TABLE production.rides.trips TO `external-auditor`;
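-- Column-level access (exclude sensitive columns)
-- Unity Catalog GRANT statements do not take a column list, so expose only the
-- permitted columns through a view (trips_partner is an illustrative name)
CREATE VIEW production.rides.trips_partner AS
SELECT trip_id, driver_id, fare, trip_date, status
FROM production.rides.trips;
GRANT SELECT ON VIEW production.rides.trips_partner TO `external-partner`;
-- external-partner cannot query user_id or payment_method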
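The way grants at different levels combine can be sketched with a toy model. This is a simplification written for illustration, not Unity Catalog's actual resolution logic: it assumes that privileges granted on a parent are inherited by its children, and that reading a table requires `USE CATALOG`, `USE SCHEMA`, and `SELECT` somewhere along the path. The grant table is hypothetical.

```python
# (securable, principal) -> privileges granted directly on that securable
GRANTS = {
    ("production", "rides-team"): {"USE CATALOG"},
    ("production.rides", "rides-team"): {"USE SCHEMA", "SELECT"},
    ("production.rides.trips", "external-auditor"): {"SELECT"},
}

REQUIRED_TO_READ = {"USE CATALOG", "USE SCHEMA", "SELECT"}


def effective_privileges(securable: str, principal: str, grants=GRANTS) -> set[str]:
    """Union of privileges granted on the securable and on each of its ancestors."""
    parts = securable.split(".")
    held: set[str] = set()
    for depth in range(1, len(parts) + 1):
        ancestor = ".".join(parts[:depth])
        held |= grants.get((ancestor, principal), set())
    return held


def can_read(table: str, principal: str, grants=GRANTS) -> bool:
    """A principal can read a table only if it holds all three privileges."""
    return REQUIRED_TO_READ <= effective_privileges(table, principal, grants)


print(can_read("production.rides.trips", "rides-team"))
print(can_read("production.rides.trips", "external-auditor"))
```

In this model a table-level `SELECT` by itself is not enough: the auditor also needs the usage privileges on the parent catalog and schema before the table becomes readable.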
Dynamic Data Masking
Column-level security in Unity Catalog includes dynamic masking - the ability to return different values for the same column based on the querying user's role. The underlying data is never changed; the masking happens at query time in the Unity Catalog enforcement layer.
-- Define a masking policy for SSN column
CREATE OR REPLACE FUNCTION mask_ssn(ssn STRING)
RETURNS STRING
RETURN CASE
WHEN is_member('pii-authorized') THEN ssn -- authorized users see full SSN
ELSE CONCAT('XXX-XX-', SUBSTR(ssn, 8, 4)) -- others see masked version
END;
-- Apply the masking policy to the column
ALTER TABLE production.customers.profiles
ALTER COLUMN ssn
SET MASK mask_ssn;
-- Now when an unauthorized analyst queries:
SELECT customer_id, name, ssn FROM production.customers.profiles LIMIT 5;
-- They see XXX-XX- followed by the last four digits of each row's SSN (for example XXX-XX-6789)
-- A PII-authorized role sees the real SSN value
Row Filters
Row-level security restricts which rows a user can see based on the querying user's attributes:
-- Only show trips for the driver_id that matches the current user's driver account
CREATE OR REPLACE FUNCTION driver_row_filter(driver_id STRING)
RETURNS BOOLEAN
RETURN is_member('fleet-managers') -- fleet managers see all rows
OR driver_id = current_user(); -- drivers see only their own trips
ALTER TABLE production.rides.trips
SET ROW FILTER driver_row_filter ON (driver_id);
-- A driver querying the table only sees their own records
-- No application-level WHERE clause required - it's enforced in the catalog
Apache Atlas and Apache Ranger: The Open-Source Stack
For organizations not on Databricks, or running on-premises Hadoop infrastructure, Apache Atlas (metadata and lineage) plus Apache Ranger (access control) is the traditional governance stack.
Apache Ranger: Policy-Based Access Control
Ranger enforces access policies across HDFS, Hive, HBase, Kafka, and Spark. Policies are defined centrally and enforced by Ranger plugins embedded in each service:
// Ranger policy: allow data-analysts to SELECT only the listed non-PII columns of rides.trips
// (columns that are not listed, such as PII columns, are not granted by this policy)
{
"policyType": 0,
"name": "rides-analyst-policy",
"resources": {
"database": {"values": ["rides"], "isRecursive": true},
"table": {"values": ["trips"], "isRecursive": true},
"column": {"values": ["trip_id", "fare", "status", "trip_date"], "isRecursive": false}
},
"allowExceptions": [],
"policyItems": [
{
"users": [],
"groups": ["data-analysts"],
"roles": [],
"accesses": [{"type": "select", "isAllowed": true}],
"conditions": []
}
],
"denyPolicyItems": []
}
Ranger also provides a centralized audit log: every Hive or Spark query that touches a Ranger-governed resource is logged with the username, timestamp, resource name, and access result (allowed/denied).
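The interaction between allow and deny items is easy to get wrong, so a simplified evaluator helps build intuition. The sketch below is not Ranger's implementation: it models only the rule that a matching deny item takes precedence over a matching allow item, and it ignores exceptions, conditions, and resource wildcards. The policy dictionary mirrors the field names of the JSON above.

```python
def is_allowed(policy: dict, groups: set[str], column: str, access: str = "select") -> bool:
    """Simplified check: deny items win over allow items; no match means no access."""
    if column not in policy["resources"]["column"]["values"]:
        return False  # this policy does not cover the column, so it grants nothing

    def matches(items: list[dict]) -> bool:
        return any(
            bool(groups & set(item["groups"]))
            and any(a["type"] == access for a in item["accesses"])
            for item in items
        )

    if matches(policy.get("denyPolicyItems", [])):
        return False
    return matches(policy.get("policyItems", []))


analyst_item = {"groups": ["data-analysts"], "accesses": [{"type": "select"}]}
policy = {
    "resources": {"column": {"values": ["trip_id", "fare", "status", "trip_date"]}},
    "policyItems": [analyst_item],
    "denyPolicyItems": [],
}

print(is_allowed(policy, {"data-analysts"}, "fare"))     # covered and allowed
print(is_allowed(policy, {"data-analysts"}, "user_id"))  # column not covered

# Listing the same group under denyPolicyItems cancels the allow entirely
contradictory = {**policy, "denyPolicyItems": [analyst_item]}
print(is_allowed(contradictory, {"data-analysts"}, "fare"))
```

The last case is a common misconfiguration: a deny item that duplicates the allow item locks the group out of every column the policy covers.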
Data Lineage: From Source to Model
Data lineage answers the question: where did this data come from, and what transformations did it go through? For regulated industries, column-level lineage is required - not just "this table came from that table," but "this specific column in the output table is derived from these specific source columns via this transformation."
OpenLineage: The Vendor-Neutral Standard
OpenLineage is a specification for emitting lineage metadata as structured events. It defines a JSON schema for "job" (a transformation) and "dataset" (a table or topic), with inputs and outputs. Any tool that emits OpenLineage events (Spark, Airflow, dbt, Flink) can feed into any OpenLineage-compatible catalog (Marquez, DataHub, Atlan).
from openlineage.client import OpenLineageClient
from openlineage.client.run import (
RunEvent, RunState, Run, Job,
Dataset, InputDataset, OutputDataset,
)
from openlineage.client.facet import (
SchemaDatasetFacet, SchemaField,
DataSourceDatasetFacet,
SqlJobFacet,
)
import uuid
from datetime import datetime
client = OpenLineageClient.from_environment()
# Define the transformation job
job = Job(
namespace="rides-etl",
name="compute_driver_daily_stats",
facets={
"sql": SqlJobFacet(query="""
SELECT
driver_id,
trip_date,
COUNT(*) AS total_trips,
SUM(fare) AS total_revenue,
AVG(rating) AS avg_rating
FROM trips
WHERE status = 'completed'
GROUP BY driver_id, trip_date
""")
}
)
# Define input dataset (source table)
input_dataset = InputDataset(
namespace="s3://my-lakehouse",
name="iceberg.rides.trips",
facets={
"schema": SchemaDatasetFacet(fields=[
SchemaField(name="trip_id", type="LONG"),
SchemaField(name="driver_id", type="STRING"),
SchemaField(name="fare", type="DOUBLE"),
SchemaField(name="status", type="STRING"),
SchemaField(name="rating", type="DOUBLE"),
SchemaField(name="trip_date", type="STRING"),
])
}
)
# Define output dataset (result table)
output_dataset = OutputDataset(
namespace="s3://my-lakehouse",
name="iceberg.analytics.driver_daily_stats",
facets={
"schema": SchemaDatasetFacet(fields=[
SchemaField(name="driver_id", type="STRING"),
SchemaField(name="trip_date", type="STRING"),
SchemaField(name="total_trips", type="LONG"),
SchemaField(name="total_revenue", type="DOUBLE"),
SchemaField(name="avg_rating", type="DOUBLE"),
])
}
)
run_id = str(uuid.uuid4())
# Emit START event
client.emit(RunEvent(
eventType=RunState.START,
eventTime=datetime.utcnow().isoformat() + "Z",
run=Run(runId=run_id),
job=job,
producer="https://example.com/rides-etl",  # placeholder URI identifying the emitting code
inputs=[input_dataset],
outputs=[output_dataset],
))
# ... run the actual Spark job ...
# Emit COMPLETE event
client.emit(RunEvent(
eventType=RunState.COMPLETE,
eventTime=datetime.utcnow().isoformat() + "Z",
run=Run(runId=run_id),
job=job,
producer="https://example.com/rides-etl",  # placeholder URI identifying the emitting code
inputs=[input_dataset],
outputs=[output_dataset],
))
When this runs, Marquez (or DataHub) receives the lineage events and builds a directed acyclic graph showing: rides.trips → compute_driver_daily_stats (SQL transform) → analytics.driver_daily_stats. Over time, this graph extends automatically as more jobs emit events.
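How a backend might fold such events into a graph can be sketched in a few lines. This is an illustration only, not how Marquez or DataHub store lineage: events are represented as plain dictionaries that loosely follow the OpenLineage event shape, and the second job and its output table are hypothetical.

```python
from collections import defaultdict


def build_graph(events: list[dict]) -> dict[str, set[str]]:
    """Fold COMPLETE events into edges: dataset -> job -> dataset."""
    edges: dict[str, set[str]] = defaultdict(set)
    for event in events:
        if event["eventType"] != "COMPLETE":
            continue  # only completed runs contribute edges in this sketch
        job = f'job:{event["job"]["namespace"]}.{event["job"]["name"]}'
        for dataset in event.get("inputs", []):
            edges[f'dataset:{dataset["name"]}'].add(job)
        for dataset in event.get("outputs", []):
            edges[job].add(f'dataset:{dataset["name"]}')
    return edges


def downstream(node: str, edges: dict[str, set[str]]) -> set[str]:
    """Every job and dataset reachable from `node`."""
    seen: set[str] = set()
    stack = [node]
    while stack:
        for nxt in edges.get(stack.pop(), ()):
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen


events = [
    {"eventType": "START", "job": {"namespace": "rides-etl", "name": "compute_driver_daily_stats"},
     "inputs": [{"name": "iceberg.rides.trips"}], "outputs": []},
    {"eventType": "COMPLETE", "job": {"namespace": "rides-etl", "name": "compute_driver_daily_stats"},
     "inputs": [{"name": "iceberg.rides.trips"}],
     "outputs": [{"name": "iceberg.analytics.driver_daily_stats"}]},
    {"eventType": "COMPLETE", "job": {"namespace": "finance-etl", "name": "weekly_payouts"},
     "inputs": [{"name": "iceberg.analytics.driver_daily_stats"}],
     "outputs": [{"name": "iceberg.finance.weekly_payouts"}]},
]

graph = build_graph(events)
print(sorted(downstream("dataset:iceberg.rides.trips", graph)))
```

Because each job only declares its own inputs and outputs, the multi-hop path emerges from joining events on dataset names. That is why consistent dataset naming across tools matters so much for lineage quality.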
Column-Level Lineage
Unity Catalog automatically tracks column-level lineage for all SQL operations. No instrumentation required:
# After running this Spark SQL in Unity Catalog...
spark.sql("""
CREATE OR REPLACE TABLE analytics.driver_monthly_revenue AS
SELECT
driver_id,
DATE_TRUNC('month', CAST(trip_date AS DATE)) AS revenue_month,
SUM(fare) AS total_revenue
FROM production.rides.trips
WHERE status = 'completed'
GROUP BY 1, 2
""")
# Unity Catalog now knows:
# analytics.driver_monthly_revenue.driver_id ← production.rides.trips.driver_id
# analytics.driver_monthly_revenue.revenue_month ← production.rides.trips.trip_date (via DATE_TRUNC)
# analytics.driver_monthly_revenue.total_revenue ← production.rides.trips.fare (via SUM)
# Query lineage via the Unity Catalog REST API
import requests
lineage = requests.get(
"https://your-workspace.azuredatabricks.net/api/2.0/lineage-tracking/table-lineage",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={"table_name": "analytics.driver_monthly_revenue"}
).json()
# Impact analysis: if I change trips.fare, what downstream columns are affected?
column_lineage = requests.get(
"https://your-workspace.azuredatabricks.net/api/2.0/lineage-tracking/column-lineage",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={
"table_name": "production.rides.trips",
"column_name": "fare"
}
).json()
# The response lists both upstream and downstream columns; the downstream side
# would include entries such as analytics.driver_monthly_revenue.total_revenue,
# finance.reports.weekly_revenue, ml.features.driver_fare_features, ...
The Governance Architecture: Layer by Layer
Data Masking and Tokenization
Dynamic Masking (Query-Time, Reversible)
Dynamic masking returns obfuscated data at query time without changing the stored data. It is reversible - authorized roles see the real value. This is the standard approach for most PII governance use cases.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_replace, sha2, concat, lit
spark = SparkSession.builder.appName("MaskingDemo").getOrCreate()
# Simulate masking at the Spark layer (when Unity Catalog isn't available)
def apply_masking(df, user_role: str):
    """Apply dynamic masking based on the querying user's role."""
    if user_role in ("pii_authorized", "data_owner"):
        # Authorized roles see unmasked data
        return df
    # Apply masking transformations for each PII column type
    masked = df.withColumn(
        "email",
        # Keep the first character and the domain: alice@example.com → a***@example.com
        regexp_replace(
            col("email"),
            r"^(.{1}).*(@.*)",
            "$1***$2"
        )
    ).withColumn(
        "phone",
        # Keep last 4 digits: +1-555-123-4567 → ***-***-4567
        regexp_replace(col("phone"), r"^\+?[\d\s\-\(\)]+(?=\d{4}$)", "***-***-")
    ).withColumn(
        "ssn",
        # Mask first 5 digits: 123-45-6789 → XXX-XX-6789
        regexp_replace(col("ssn"), r"^\d{3}-\d{2}", "XXX-XX")
    ).withColumn(
        "credit_card",
        # Keep last 4 digits: 4111-1111-1111-1111 → ****-****-****-1111
        regexp_replace(col("credit_card"), r"(\d{4}-){3}", "****-****-****-")
    )
    return masked
# Usage
customers_raw = spark.table("production.customers.profiles")
customers_masked = apply_masking(customers_raw, user_role="data_analyst")
customers_masked.createOrReplaceTempView("customers")
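The masking patterns can be sanity-checked without a cluster. The sketch below re-expresses the same four regular expressions with Python's `re` module; the helper names and sample values are made up for illustration. One difference worth noting: Spark's `regexp_replace` uses Java-style `$1` group references, while Python uses `\1`.

```python
import re


def mask_email(value: str) -> str:
    # Keep the first character and the domain
    return re.sub(r"^(.).*(@.*)$", r"\1***\2", value)


def mask_phone(value: str) -> str:
    # Replace everything before the final four digits
    return re.sub(r"^\+?[\d\s\-()]+(?=\d{4}$)", "***-***-", value)


def mask_ssn(value: str) -> str:
    # Mask the first five digits
    return re.sub(r"^\d{3}-\d{2}", "XXX-XX", value)


def mask_card(value: str) -> str:
    # Mask the first three groups of four digits
    return re.sub(r"(\d{4}-){3}", "****-****-****-", value)


samples = {
    "email": mask_email("alice@example.com"),
    "phone": mask_phone("+1-555-123-4567"),
    "ssn": mask_ssn("123-45-6789"),
    "credit_card": mask_card("4111-1111-1111-1111"),
}
for name, masked in samples.items():
    print(f"{name}: {masked}")
```

Values that do not match the expected format pass through unchanged, so it is worth testing with malformed inputs (an SSN stored without dashes, for example) before relying on these patterns.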
Deterministic Tokenization (Write-Time, for ML)
For ML use cases, masking creates a problem: if user_id 12345 maps to different masked values each time it appears, the model cannot learn patterns about that user. Deterministic tokenization solves this: the same input always produces the same token, preserving record linkage while removing the identifying value.
import hashlib
import hmac
import base64
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Secret key - stored in AWS Secrets Manager, not in code
SECRET_KEY = b"your-hmac-secret-key-from-secrets-manager"
def tokenize_pii(value: str, salt: str = "") -> str:
    """
    Deterministic tokenization using HMAC-SHA256.
    Same input always produces the same token.
    The token is one-way: it cannot be decoded back to the input, and without
    the secret key nobody can recompute tokens to test guessed values.
    """
    if value is None:
        return None
    message = (salt + value).encode("utf-8")
    token_bytes = hmac.new(SECRET_KEY, message, hashlib.sha256).digest()
    # Base64 encode to get a readable string, take first 16 chars
    return base64.urlsafe_b64encode(token_bytes).decode("utf-8")[:16]
tokenize_udf = udf(tokenize_pii, StringType())
spark = SparkSession.builder.getOrCreate()
# Apply tokenization at ingestion time - write tokenized data to ML feature table
raw_events = spark.table("production.rides.trips")
ml_features = raw_events.withColumn(
"user_token", # tokenized user_id - ML can learn per-user patterns
tokenize_udf("user_id")
).withColumn(
"driver_token", # tokenized driver_id
tokenize_udf("driver_id")
).drop("user_id", "driver_id") # drop original PII columns
ml_features.write.format("iceberg") \
.mode("overwrite") \
.saveAsTable("ml_features.rides.trip_features")
# ML team trains on ml_features.rides.trip_features
# They never see user_id or driver_id - only tokens
# But user_token=AbCdEfGhIjKlMn12 always refers to the same user across all records
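The properties that matter for ML (stable tokens, separate token spaces, and the effect of key rotation) can be checked in plain Python. This is a standalone sketch of the same HMAC approach with the key passed explicitly; the key values and the `driver:` salt are placeholders chosen for the demonstration.

```python
import base64
import hashlib
import hmac


def tokenize(value: str, key: bytes, salt: str = "") -> str:
    """HMAC-SHA256 of salt+value, base64url-encoded and truncated to 16 characters."""
    digest = hmac.new(key, (salt + value).encode("utf-8"), hashlib.sha256).digest()
    return base64.urlsafe_b64encode(digest).decode("ascii")[:16]


key = b"demo-key-not-for-production"

first = tokenize("user_12345", key)
second = tokenize("user_12345", key)
other_domain = tokenize("user_12345", key, salt="driver:")
after_rotation = tokenize("user_12345", b"rotated-demo-key")

print("stable across calls:", first == second)
print("salt separates token spaces:", first != other_domain)
print("rotating the key changes tokens:", first != after_rotation)
```

The last property has an operational consequence: rotating the key breaks the link between old and new feature rows, so a rotation typically means re-tokenizing historical data or retraining on freshly tokenized data.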
GDPR and CCPA Compliance in Lakehouses
Right to Erasure (Right to Be Forgotten)
GDPR Article 17 and CCPA both require that a data subject can request deletion of all their personal data. In a lakehouse with time travel, this creates a complication: even if you delete a row from the current snapshot, historical snapshots still contain that row.
The complete deletion workflow:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("GDPRDelete").getOrCreate()
user_id_to_delete = "user_12345"
# Step 1: Delete from current snapshot
spark.sql(f"""
DELETE FROM production.customers.profiles
WHERE user_id = '{user_id_to_delete}'
""")
spark.sql(f"""
DELETE FROM production.rides.trips
WHERE user_id = '{user_id_to_delete}'
""")
spark.sql(f"""
DELETE FROM production.payments.transactions
WHERE user_id = '{user_id_to_delete}'
""")
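# Step 2: Rewrite data files so the deleted rows are physically dropped from Parquet
# (with merge-on-read tables, DELETE only writes delete files that mark rows as removed;
# copy-on-write tables already rewrote the affected files during the DELETE)
spark.sql("""
CALL production.system.rewrite_data_files(
table => 'customers.profiles',
options => map('delete-file-threshold', '1')
)
""")
# Step 3: Expire the older snapshots, which still reference the pre-deletion files
# Run this after the rewrite; expiring first would leave the pre-rewrite snapshot readable
# Iceberg's CALL syntax expects literal arguments, so the cutoff is computed in Python
# (this assumes the Spark session time zone is UTC)
from datetime import datetime
cutoff = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
spark.sql(f"""
CALL production.system.expire_snapshots(
table => 'customers.profiles',
older_than => TIMESTAMP '{cutoff}',
retain_last => 1
)
""")
# Repeat steps 2 and 3 for rides.trips and payments.transactions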
# Step 4: Log the deletion for compliance audit
import json
from datetime import datetime
deletion_record = {
"user_id": user_id_to_delete,
"deletion_timestamp": datetime.utcnow().isoformat(),
"deleted_by": "gdpr-deletion-service",
"tables_affected": ["customers.profiles", "rides.trips", "payments.transactions"],
"request_ticket": "GDPR-2024-01-789",
}
print(json.dumps(deletion_record))
# Write this record to a compliance audit table
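Because erasure requests arrive from outside the data team, the identifier that ends up inside the `DELETE` statements deserves validation before it is interpolated into SQL. The helper below is a hypothetical sketch: the allowed ID format is an assumption, and the table list simply mirrors the three tables used above.

```python
import re

ERASURE_TABLES = [
    "production.customers.profiles",
    "production.rides.trips",
    "production.payments.transactions",
]

# Assumed ID format: letters, digits, underscore, hyphen; at most 64 characters
_USER_ID = re.compile(r"[A-Za-z0-9_-]{1,64}")


def build_erasure_statements(user_id: str, tables=ERASURE_TABLES) -> list[str]:
    """Return one DELETE per table, refusing IDs that could alter the SQL."""
    if not _USER_ID.fullmatch(user_id):
        raise ValueError(f"refusing suspicious user_id: {user_id!r}")
    return [f"DELETE FROM {table} WHERE user_id = '{user_id}'" for table in tables]


for statement in build_erasure_statements("user_12345"):
    print(statement)
```

Keeping the table list in one place also makes it harder to forget a table when a new dataset starts storing the same identifier.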
Data Residency
For organizations with data residency requirements (EU data must stay in EU), the lakehouse architecture needs explicit enforcement:
# Partition data by region and apply S3 bucket policies
# EU data goes to the eu-west-1 bucket, US data to the us-east-1 bucket
# The bucket is a property of each catalog rather than of the write call, for example:
#   spark.sql.catalog.eu_catalog.warehouse = s3://my-lakehouse-eu/iceberg
#   spark.sql.catalog.us_catalog.warehouse = s3://my-lakehouse-us/iceberg
eu_trips = trips_df.filter(col("region") == "EU")
us_trips = trips_df.filter(col("region") == "US")
eu_trips.write.format("iceberg") \
    .saveAsTable("eu_catalog.rides.trips")
us_trips.write.format("iceberg") \
    .saveAsTable("us_catalog.rides.trips")
# IAM policies ensure that EU users cannot access the US bucket and vice versa
# Unity Catalog enforces catalog-level isolation on top of S3 bucket policies
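One gap in filter-based routing is that rows with an unexpected or missing region are silently dropped from both writes. A fail-closed router makes that case visible. The sketch below uses plain dictionaries in place of DataFrame rows; the mapping reuses the catalog names above, and the sample rows are invented.

```python
from collections import defaultdict

REGION_TO_TABLE = {
    "EU": "eu_catalog.rides.trips",
    "US": "us_catalog.rides.trips",
}


def target_table(region) -> str:
    """Resolve the regional table, failing loudly for unmapped regions."""
    try:
        return REGION_TO_TABLE[region]
    except KeyError:
        raise ValueError(f"no residency rule for region {region!r}") from None


def route_rows(rows: list[dict]) -> dict[str, list[dict]]:
    """Group rows by destination table; any unmapped region aborts the batch."""
    batches: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        batches[target_table(row.get("region"))].append(row)
    return dict(batches)


rows = [
    {"trip_id": 1, "region": "EU"},
    {"trip_id": 2, "region": "US"},
    {"trip_id": 3, "region": "EU"},
]
for table, batch in route_rows(rows).items():
    print(table, [r["trip_id"] for r in batch])
```

Whether an unmapped region should abort the batch or be diverted to a quarantine table is a policy decision. The point is that it should not disappear without a trace.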
Data Catalog: Discovery and Context
A data catalog is the human-facing interface to governance metadata. It enables data consumers to find datasets, understand what they contain, assess their quality, and determine access requirements - without needing to know which S3 path to query.
Apache Atlas
Atlas is the open-source metadata governance tool from the Hadoop ecosystem. It stores metadata about tables, columns, lineage relationships, and governance tags (like PII, CONFIDENTIAL, FINANCIAL).
# Apache Atlas REST API - tagging a column as PII
import requests
atlas_url = "http://atlas:21000"
auth = ("admin", "admin")
# Create a PII classification (tag type)
tag_definition = {
"classificationDefs": [{
"name": "PII",
"description": "Personally Identifiable Information",
"superTypes": [],
"attributeDefs": [
{
"name": "pii_type",
"typeName": "string",
"isOptional": True,
"cardinality": "SINGLE",
}
]
}]
}
requests.post(
f"{atlas_url}/api/atlas/v2/types/typedefs",
json=tag_definition,
auth=auth
)
# Apply PII tag to the 'email' column in the customers table
column_guid = "12345678-abcd-1234-abcd-123456789abc" # GUID from Atlas entity catalog
requests.post(
f"{atlas_url}/api/atlas/v2/entity/guid/{column_guid}/classifications",
json=[{
"typeName": "PII",
"attributes": {"pii_type": "email_address"}
}],
auth=auth
)
DataHub
DataHub is the modern open-source alternative to Atlas, with better Iceberg integration, a React-based UI, and a more active community. It uses an event-driven metadata architecture (all metadata changes are emitted as Kafka events):
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
DatasetPropertiesClass,
GlobalTagsClass,
TagAssociationClass,
TagKeyClass,
SchemaMetadataClass,
SchemaFieldClass,
SchemaFieldDataTypeClass,
StringTypeClass,
)
from datahub.emitter.mce_builder import (
make_dataset_urn,
make_tag_urn,
)
emitter = DatahubRestEmitter("http://datahub-gms:8080")
dataset_urn = make_dataset_urn(
platform="iceberg",
name="production.rides.trips",
env="PROD"
)
# Emit dataset metadata to DataHub
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
# Tag the table as containing PII
tags_aspect = GlobalTagsClass(
tags=[
TagAssociationClass(tag=make_tag_urn("PII")),
TagAssociationClass(tag=make_tag_urn("SENSITIVE")),
]
)
mcp = MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=tags_aspect,
)
emitter.emit(mcp)
print(f"Tagged {dataset_urn} with PII and SENSITIVE")
Production Engineering Notes
Governance Incident Response Playbook
When a governance incident occurs (unauthorized access, data leak, audit finding), the first 30 minutes determine the outcome. Have these queries ready:
-- Unity Catalog: who queried this table in the last 30 days?
-- (the request_params key that carries the table name varies by action_name; adjust to your audit events)
SELECT
user_identity.email AS user_name,
action_name,
request_params.table_full_name AS table_queried,
event_time,
source_ip_address
FROM system.access.audit
WHERE request_params.table_full_name = 'production.customers.profiles'
AND event_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
ORDER BY event_time DESC;
-- What columns did they access?
SELECT
executed_by AS user_name,
statement_text AS query_text,
start_time AS event_time
FROM system.query.history
WHERE contains(statement_text, 'customers.profiles')
AND contains(statement_text, 'ssn')
AND start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
ORDER BY start_time DESC;
Automated PII Detection
Before data reaches the governed catalog, scan it for PII patterns using automated detection:
import re
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
PII_PATTERNS = {
    "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
    "credit_card": r"\b(?:\d[ -]*?){13,16}\b",
    "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
    "phone": r"\b[\+]?[(]?[0-9]{3}[)]?[-\s\.]?[0-9]{3}[-\s\.]?[0-9]{4,6}\b",
}
def scan_for_pii(df: DataFrame, sample_fraction: float = 0.01) -> dict:
    """
    Scan a DataFrame sample for PII patterns.
    Returns: {column_name: [detected_pii_types]}
    """
    string_columns = [
        f.name for f in df.schema.fields
        if isinstance(f.dataType, StringType)
    ]
    # Collect the sample once so every column is checked against the same rows
    sample_rows = df.select(string_columns).sample(fraction=sample_fraction).collect()
    findings = {}
    for col_name in string_columns:
        col_values = [
            str(row[col_name])
            for row in sample_rows
            if row[col_name] is not None
        ]
        detected = []
        for pii_type, pattern in PII_PATTERNS.items():
            matches = sum(1 for v in col_values if re.search(pattern, v))
            if matches / max(len(col_values), 1) > 0.05:  # >5% match rate = flag it
                detected.append(pii_type)
        if detected:
            findings[col_name] = detected
            print(f"WARNING: Column '{col_name}' contains potential PII: {detected}")
    return findings
# Run before writing to governed tables
raw_df = spark.read.format("csv").load("s3://raw-uploads/customer-data.csv")
pii_findings = scan_for_pii(raw_df)
if pii_findings:
    print("PII detected - applying masking before write")
    # Apply masking transformations before writing to governed catalog
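Pattern matching alone tends to over-flag long digit strings such as order numbers as credit cards. A common refinement is to confirm candidates with the Luhn checksum, which most payment card numbers satisfy. The sketch below is a standalone helper, not part of the scanner above; the accepted length range of 13 to 19 digits is an assumption.

```python
def luhn_valid(candidate: str) -> bool:
    """Return True if the digits in `candidate` pass the Luhn checksum."""
    digits = [int(ch) for ch in candidate if ch.isdigit()]
    if not 13 <= len(digits) <= 19:
        return False
    total = 0
    # Walk from the rightmost digit, doubling every second one
    for index, digit in enumerate(reversed(digits)):
        if index % 2 == 1:
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    return total % 10 == 0


print(luhn_valid("4111-1111-1111-1111"))  # well-known test card number
print(luhn_valid("1234-5678-9012-3456"))  # same shape, fails the checksum
```

Applying this check to regex hits before counting them would lower the false-positive rate for the `credit_card` pattern, at the cost of missing numbers that were mistyped at the source.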
:::danger Direct S3 Access Bypasses All Table-Level Security
Column-level security, row filters, and dynamic masking are all enforced at the query engine layer. If a user has direct S3 read permissions, they can download the raw Parquet files and read every column - including masked ones - without going through any governance layer. This is the single most critical governance gap in most lakehouse deployments. Solution: S3 bucket policies must deny direct object-level access to all principals except the service account running the query engine. Users must query through the engine, never directly through S3.
:::
:::warning Time Travel and Right to Erasure Are Fundamentally in Conflict
When a user exercises their GDPR right to erasure, you delete their record from the current snapshot. But Iceberg's time travel feature means the record still exists in every historical snapshot created before the deletion. If the snapshot retention period is 90 days, the deleted record is accessible for 90 more days via SELECT * FROM table TIMESTAMP AS OF '2024-01-01'. The only complete solution is to run rewrite_data_files so the row is physically removed from the current Parquet files, and then expire all historical snapshots so the old files are no longer referenced and can be deleted. This destroys your time travel history for that table - a real trade-off. Document this limitation explicitly in your compliance posture and discuss with your legal team whether snapshot history before the deletion request must also be purged.
:::
:::tip Tag Governance at the Column Level, Not the Table Level
A common mistake is applying governance tags (PII, SENSITIVE, FINANCIAL) at the table level and assuming this is sufficient. It is not - if a table contains 50 columns and only 3 contain PII, table-level tagging either over-restricts access (analysts can't use the non-PII columns) or under-restricts it (PII columns are accessible to anyone with table access). Always tag at the column level, apply masking policies at the column level, and grant access at the column level for sensitive attributes.
:::
:::note The Data Governance Maturity Ladder
Most organizations progress through four stages of lakehouse governance maturity:
- Stage 1: no catalog, no access control (S3 bucket policies only), no lineage.
- Stage 2: a catalog exists (Glue, Hive Metastore), table-level access control, manual lineage documentation.
- Stage 3: column-level security, automated PII tagging, OpenLineage integration, automated audit logs.
- Stage 4: real-time lineage, impact analysis on schema changes, data quality SLAs enforced as pipeline gates, automated compliance reporting.

Most companies are at Stage 2. Aiming for Stage 3 is the right medium-term goal for regulated industries.
:::
Interview Questions and Answers
Q1: What are the three biggest data governance challenges specific to lakehouses, and how do you address each?
Answer:
Challenge 1: Open storage bypasses query-engine security
In a traditional warehouse, all access goes through the query engine, which enforces access controls. In a lakehouse, data is stored as open Parquet files in S3. Any user with S3 read permissions can download the raw files, bypassing column-level security, row filters, and dynamic masking entirely.
Solution: Apply two layers of control. First, restrict S3 bucket access to the service account(s) running your query engines (Spark, Trino) only - no individual users should have direct S3 access. Second, enforce column-level security and row filters at the query engine layer via Unity Catalog or Apache Ranger. Both layers are necessary; either alone is insufficient.
Challenge 2: Schema evolution silently breaks downstream consumers
Lakehouse tables support schema evolution - adding columns, changing types, renaming fields. Without governance tooling, a schema change to a source table can silently break downstream dbt models, ML feature pipelines, and dashboards without any alert.
Solution: Implement a schema registry or use Unity Catalog's lineage-based impact analysis to understand which downstream tables consume a given column before making changes. Establish a schema change management process: any change to a table marked "production" requires review of its downstream lineage graph. Rely on Iceberg's schema evolution rules, which track columns by ID rather than by name or position and allow only safe type promotions, to limit accidental incompatible changes.
Challenge 3: Multi-engine lineage is fragmented
Data moves through Spark (ETL), dbt (transformations), Airflow (orchestration), and Flink (streaming) - each with its own lineage model. Understanding the complete transformation chain from source to dashboard requires stitching together lineage from multiple systems.
Solution: Adopt the OpenLineage standard. Any tool that emits OpenLineage events (Spark, Airflow, dbt, Flink all have integrations) feeds into a single lineage backend (Marquez or DataHub). This creates a unified lineage graph that spans all engines and orchestration tools, giving you column-level provenance from source system to serving layer.
Q2: Explain the difference between dynamic masking and tokenization. When would you use each?
Answer:
Dynamic masking applies obfuscation at query time. The underlying stored data is unchanged. When an unauthorized user queries a masked column, the governance layer intercepts the result and replaces real values with masked versions (e.g., XXX-XX-6789 instead of 123-45-6789). Authorized users see the real value in the same query.
Use dynamic masking when:
- Data masking is for access control purposes (not all users should see PII)
- The original data must remain intact for authorized operations (customer support, fraud investigation)
- Masking needs are role-based and change as users change roles
- Example: SSNs masked for analysts, visible for fraud investigators
Deterministic tokenization replaces identifying values with fixed tokens at write time. The same input always produces the same token. The token cannot be reversed without the secret key. The stored data never contains the original value.
Use tokenization when:
- Data is used for ML training where the model must learn per-entity patterns (a model needs to know that all events with token AbCd1234 are the same user, without knowing who that user is)
- Data is shared with third parties who should never see the original PII
- The original value is not needed for any downstream computation
- Regulatory requirements mandate that PII is never stored in the analytical environment
The key distinction: dynamic masking is an access control mechanism applied to stored PII; tokenization is a data transformation that removes PII from the stored data entirely. Both may be used in the same organization - tokenization for ML features, dynamic masking for analyst reporting.
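The contrast can be sketched in a few lines of Python. This is an illustration under stated assumptions, not how any particular governance product implements either feature: mask_ssn stands in for a query-time masking policy, and tokenize uses HMAC-SHA256 as one possible deterministic scheme. A keyed hash is one-way even for the key holder; if authorized systems must recover the original value, a token vault or format-preserving encryption would be used instead.

```python
import hashlib
import hmac


def mask_ssn(ssn, can_see_pii):
    """Dynamic masking: the stored value is untouched; the result depends on the caller."""
    if can_see_pii:
        return ssn
    return "XXX-XX-" + ssn[-4:]


def tokenize(value, secret_key):
    """Deterministic tokenization: a keyed hash, so equal inputs yield equal tokens."""
    return hmac.new(secret_key, value.encode("utf-8"), hashlib.sha256).hexdigest()


# Placeholder key for the demo; a real key would come from a secrets manager.
key = b"demo-key-do-not-use"

print(mask_ssn("123-45-6789", can_see_pii=False))  # XXX-XX-6789
print(mask_ssn("123-45-6789", can_see_pii=True))   # 123-45-6789

# Same input, same token: joins and per-entity features still work.
print(tokenize("123-45-6789", key) == tokenize("123-45-6789", key))  # True
```

Masking runs on every read and leaves PII at rest; tokenization runs once at write time and the analytical store never holds the original value.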
Q3: How does data lineage work in practice, and why is column-level lineage more valuable than table-level lineage?
Answer:
Table-level lineage tells you: "Table B was derived from Table A." This is useful for understanding high-level data flow but insufficient for operational governance.
Consider this scenario: Table A has 60 columns. Table B has 30 columns derived from Table A via various aggregations and joins. Table-level lineage tells you "B depends on A." But it doesn't tell you:
- Which of A's 60 columns are used in B (so you know which column changes in A matter)
- How the values are transformed (SUM? JOIN key? FILTER condition?)
- Whether column credit_score in B comes from raw_credit_score in A unchanged, or from an average of multiple source columns
Column-level lineage answers all of these. It creates a directed graph at the column granularity:
production.customers.raw_credit_score
↓ (used as-is)
curated.customers.credit_score
↓ (averaged with bureau score)
analytics.risk.customer_risk_score
↓ (feature in ML model)
ml.models.loan_approval.feature_set.credit_score_feature
This level of detail is required for:
- Impact analysis: "I need to change the type of raw_credit_score from INT to FLOAT. What downstream tables and models will be affected?" With column-level lineage, you get an exact list in seconds. Without it, you manually trace through dbt files and Spark jobs.
- Regulatory audit: "This loan was denied. What data did the model use, and where did each feature come from?" Column-level lineage provides a complete, auditable provenance chain.
- Data quality root cause: "Our revenue dashboard shows a $2M anomaly this week. Which source column introduced the error?" Column-level lineage lets you trace from the dashboard metric backward to the exact source column and transformation step that introduced the error.
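Impact analysis over such a graph is a plain graph traversal. The sketch below encodes the example chain above as an adjacency map and walks it breadth-first; the dict layout and function name are illustrative, since real lineage backends expose this through their own APIs.

```python
from collections import deque

# Edges from the example chain: source column -> columns derived from it.
LINEAGE = {
    "production.customers.raw_credit_score": ["curated.customers.credit_score"],
    "curated.customers.credit_score": ["analytics.risk.customer_risk_score"],
    "analytics.risk.customer_risk_score": [
        "ml.models.loan_approval.feature_set.credit_score_feature"
    ],
}


def downstream_columns(graph, start):
    """Return every column reachable from start, in breadth-first order."""
    seen = {start}
    queue = deque([start])
    affected = []
    while queue:
        current = queue.popleft()
        for child in graph.get(current, []):
            if child not in seen:
                seen.add(child)
                affected.append(child)
                queue.append(child)
    return affected


for column in downstream_columns(LINEAGE, "production.customers.raw_credit_score"):
    print(column)
```

Reversing the edges and running the same traversal from a dashboard metric gives the root-cause direction: every upstream column that could have introduced an error.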
Q4: How would you implement GDPR right-to-erasure compliance in an Iceberg-based lakehouse?
Answer:
Right to erasure in an Iceberg lakehouse requires addressing three layers:
Layer 1: Current snapshot deletion
Use Iceberg's DELETE statement to remove the subject's records from the current snapshot:
DELETE FROM production.customers.profiles WHERE user_id = :user_id;
DELETE FROM production.rides.trips WHERE user_id = :user_id;
DELETE FROM production.payments.transactions WHERE user_id = :user_id;
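When the table uses merge-on-read deletes, an Iceberg DELETE creates a new snapshot with delete files (deletion markers); the original Parquet data files are not changed. With copy-on-write, the affected files are rewritten immediately, but the old files stay reachable from earlier snapshots. Either way, the subject's data is still physically present after this step.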
Layer 2: Physical removal from Parquet files
The deletion markers must be materialized into new Parquet files:
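-- Rewrite data files to physically remove deleted rows (repeat for every table touched in Layer 1)
-- delete-file-threshold = 1 selects any data file with at least one delete attached
CALL system.rewrite_data_files(
table => 'production.customers.profiles',
options => map('delete-file-threshold', '1')
);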
Layer 3: Historical snapshot expiration
The deleted records still exist in snapshots created before the deletion. These must be expired:
-- Expire all snapshots to remove historical access to deleted data
-- WARNING: This destroys time-travel history for the table
CALL system.expire_snapshots(
table => 'production.customers.profiles',
older_than => NOW(),
retain_last => 1
);
The compliance tension: Snapshot expiration destroys time-travel capability. For highly regulated industries, you must decide upfront whether time-travel history is incompatible with right-to-erasure requirements and design accordingly. One common pattern: use a "staging" retention zone where data is held for 30 days (for debugging) and expires automatically, rather than indefinite time travel. Another pattern: encrypt PII columns with per-user keys. Key deletion immediately makes the historical data unreadable without changing any snapshots - the physical bits exist but are cryptographically indecipherable.
Document the deletion in a compliance audit table with the user ID, timestamp, tables affected, and ticket reference. This documentation is itself protected PII and must be handled accordingly.
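One way to keep the three layers and the audit record in step is to generate them from a single table list, so no table is deleted from but never compacted or expired. The sketch below only builds the statements and the record; it does not execute anything. The table list mirrors the example above, the procedure calls assume Iceberg's Spark procedures, and the function names and ticket format are hypothetical.

```python
from datetime import datetime, timezone

# Tables that hold the data subject's records (from the example above).
PII_TABLES = [
    "production.customers.profiles",
    "production.rides.trips",
    "production.payments.transactions",
]


def build_erasure_plan(tables):
    """Return the SQL for all three layers, in order, for every table."""
    plan = []
    for table in tables:
        # Layer 1: logical delete (user_id stays a bind parameter).
        plan.append(f"DELETE FROM {table} WHERE user_id = :user_id")
        # Layer 2: rewrite data files that have deletes attached.
        plan.append(
            f"CALL system.rewrite_data_files(table => '{table}', "
            "options => map('delete-file-threshold', '1'))"
        )
        # Layer 3: expire older snapshots (destroys time travel for the table).
        plan.append(
            f"CALL system.expire_snapshots(table => '{table}', "
            "older_than => NOW(), retain_last => 1)"
        )
    return plan


def build_audit_record(user_id, tables, ticket):
    """Compliance record; treat it as protected PII because it contains the user ID."""
    return {
        "user_id": user_id,
        "erased_at": datetime.now(timezone.utc).isoformat(),
        "tables_affected": list(tables),
        "ticket": ticket,
    }


for statement in build_erasure_plan(PII_TABLES):
    print(statement)
print(build_audit_record("u-123", PII_TABLES, "PRIV-0001"))
```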
Q5: What is the difference between Apache Atlas and Unity Catalog? When would you use each?
Answer:
Apache Atlas is an open-source metadata governance framework from the Hadoop ecosystem. It stores metadata about datasets, processes, and their relationships. It supports a rich tag/classification system (PII, SENSITIVE, FINANCIAL) and generates lineage graphs from Hive, HBase, and Kafka operations. It integrates with Apache Ranger for policy enforcement.
Use Atlas when:
- You're operating an on-premises Hadoop stack with Hive, HDFS, and HBase
- You need open-source governance with no vendor lock-in
- Your team has Hadoop operational experience
- Budget constraints prohibit commercial solutions
Limitations of Atlas: the UI is dated, Iceberg integration is limited (Atlas was designed before Iceberg existed), and the ecosystem of tools that emit Atlas lineage events is narrower than OpenLineage-based tools.
Unity Catalog is Databricks' commercial unified governance layer. It provides a three-level namespace, column-level security, row filters, dynamic masking, automatic lineage (for all Unity Catalog SQL operations), Delta Sharing, and an audit log that answers governance questions via SQL.
Use Unity Catalog when:
- You're running on Databricks (it integrates natively - the governance layer and the compute layer are one product)
- You want access control, masking, lineage, and audit logging built into one product rather than assembled from separate tools
- Your primary query engines are Spark and Databricks SQL
- Operational simplicity is more important than vendor neutrality
The key difference: Atlas requires significant integration work (configuring lineage hooks for every tool, writing custom Ranger policies, operating both services). Unity Catalog is a managed service that "just works" for Databricks-native workloads but creates a dependency on the Databricks platform. Neither is universally better - the right choice depends on your existing stack.
For polyglot environments (Spark + Trino + Flink + dbt, multiple clouds), DataHub (open-source, able to ingest OpenLineage events, REST API integrations) is an increasingly common choice.
Q6: How do you audit who accessed a specific dataset in the last 90 days in a lakehouse?
Answer:
The answer depends on your governance stack, but the pattern is consistent: you need query-level audit logs (who ran what SQL and when) plus, for complete compliance, direct storage access logs (S3 CloudTrail).
With Unity Catalog (Databricks):
-- Query the system audit log - answers in seconds
SELECT
user_identity.email AS user_name,
action_name,
event_time,
request_params.table_full_name AS table_accessed,
source_ip_address,
user_agent
FROM system.access.audit
WHERE request_params.table_full_name = 'production.customers.profiles'
AND event_time >= DATEADD(day, -90, CURRENT_TIMESTAMP())
ORDER BY event_time DESC;
Without Unity Catalog (self-managed stack):
Enable query logging in your query engine:
# Trino: enable the query event listener (in etc/config.properties)
event-listener.config-files=/etc/trino/query-logger.properties
# query-logger.properties
event-listener.name=http
http-event-listener.connect-ingest-uri=<your ingest URI>
http-event-listener.log-completed=true
http-event-listener.log-created=true
For Spark:
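# spark-defaults.conf: register a SparkListener to capture query events
# (spark.extraListeners is read at SparkContext startup, so it cannot be set at runtime)
spark.extraListeners  com.company.audit.SparkAuditListener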
Store all query events (user, timestamp, SQL text, tables accessed) in a queryable table. For compliance, this audit table itself must be immutable - use an append-only Iceberg table with restricted DELETE permissions.
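Supplement with AWS CloudTrail analysis for direct S3 access (to catch anyone who bypassed the query engine). Object-level calls such as GetObject are CloudTrail data events, which are not logged by default, so S3 data event logging must be enabled on the trail for the relevant buckets: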
# Athena query on CloudTrail S3 data-event logs
# Find direct S3 GetObject calls on the raw Parquet files
# requestparameters is a JSON string in the standard CloudTrail table, so fields are extracted explicitly
query = """
SELECT
useridentity.arn AS principal,
eventtime,
json_extract_scalar(requestparameters, '$.bucketName') AS bucket,
json_extract_scalar(requestparameters, '$.key') AS object_key
FROM cloudtrail_logs
WHERE eventsource = 's3.amazonaws.com'
AND eventname = 'GetObject'
AND json_extract_scalar(requestparameters, '$.key') LIKE 'iceberg/customers/profiles/%'
AND from_iso8601_timestamp(eventtime) >= date_add('day', -90, now())
ORDER BY eventtime DESC
"""
The combination of query-level audit logs and CloudTrail coverage is required for a complete compliance answer. Query logs alone miss direct S3 access; CloudTrail alone lacks context about what SQL was run.
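A minimal sketch of how the two sources might be merged into one access timeline for the auditor. It assumes both feeds have already been exported as lists of dicts with UTC ISO 8601 timestamps in the same format, which sort correctly as strings. The field names and sample rows are hypothetical.

```python
def merge_access_events(query_events, storage_events):
    """Normalize both sources into one timeline, newest first."""
    unified = []
    for event in query_events:
        unified.append({
            "principal": event["user"],
            "time": event["time"],
            "source": "query_engine",
            "detail": event["sql"],
        })
    for event in storage_events:
        unified.append({
            "principal": event["principal"],
            "time": event["eventtime"],
            "source": "direct_s3",
            "detail": event["object_key"],
        })
    # Same-format UTC ISO 8601 strings sort chronologically.
    return sorted(unified, key=lambda row: row["time"], reverse=True)


# Hypothetical sample rows, one from each source.
query_events = [
    {"user": "analyst@example.com", "time": "2024-03-02T10:00:00Z",
     "sql": "SELECT * FROM production.customers.profiles"},
]
storage_events = [
    {"principal": "arn:aws:iam::123456789012:role/example-etl",
     "eventtime": "2024-03-03T08:30:00Z",
     "object_key": "iceberg/customers/profiles/data/example.parquet"},
]

for row in merge_access_events(query_events, storage_events):
    print(row["time"], row["source"], row["principal"])
```

Rows tagged direct_s3 with no matching query-engine activity are the ones worth investigating first, since they indicate access that bypassed the governed path.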
