
Databricks for MLOps

The Streaming Pipeline That Could Not Keep Up

The ML platform team at an e-commerce company had a real-time recommendation engine they were proud of. It ran on Kafka → Flink → Redis → FastAPI, and it could score a user's next recommended product in under 20 milliseconds. The problem was the features. There were 847 features per user. Computing them required joining 14 tables - user events, purchase history, product catalog, inventory, promotions, seasonal trends, and eight more. The offline feature computation ran in Spark, took four hours per night, and loaded its results into Redis by 6 AM.

Then the business changed the refresh requirement. Marketing wanted features updated every 15 minutes so that the recommendations reflected what a user had clicked in the current session. Four hours became 15 minutes. The Spark job that worked overnight could not be made fast enough. The Redis memory footprint for keeping features that fresh across 80 million users was astronomical. The Flink pipeline for streaming feature computation turned into a six-month engineering project that was still not done.

The data engineering team was already on Databricks. They were using Delta Lake for their data lake, running Spark for ETL, and had recently migrated to Unity Catalog for governance. The ML team had been running MLflow locally. The question was whether Databricks could solve the feature serving problem - bring the feature computation and the model serving close enough to the data that the 15-minute requirement was achievable.

This lesson documents what that architecture looks like: how Databricks ties together Delta Lake for historical features, the Databricks Feature Store for training-serving consistency, MLflow for experiment tracking and model registry, and Databricks Model Serving for low-latency inference - all within a single Lakehouse platform that eliminates the traditional data engineering / ML engineering divide.



Why This Exists - The Lakehouse Architecture for ML

The traditional separation between a data lake (raw storage, Spark, cheap) and a data warehouse (structured, SQL, fast) created a painful ML workflow. Training data came from the lake. Features for online serving came from the warehouse or a separate feature store. The two environments used different technologies, had different schemas, and were maintained by different teams. The inevitable result was training-serving skew: the features computed in Spark for training were subtly different from the features computed in SQL for serving.

The Lakehouse architecture, pioneered by Databricks, merges these two worlds. Delta Lake adds ACID transactions, schema enforcement, and time travel to object storage (S3/GCS/ADLS). Unity Catalog adds governance - data lineage, access control, and a unified metastore - across the entire Lakehouse. The result is a platform where data engineering, analytics, and ML share the same data layer, the same governance model, and increasingly the same compute.

For ML specifically, this matters because:

  1. Feature computation happens where the data lives - no ETL to a separate feature database
  2. Training and serving use the same feature definitions - registered in the Databricks Feature Store
  3. Timestamp-keyed feature tables and Delta time travel enable reproducible, point-in-time feature lookups - critical for preventing label leakage
  4. MLflow is embedded - every Databricks cluster has MLflow available; tracking is not an add-on

:::note Historical Context
Databricks was founded in 2013 by the creators of Apache Spark at UC Berkeley. Delta Lake was open-sourced in 2019. The Databricks Feature Store launched in 2021. MLflow was open-sourced by Databricks in 2018 and has since become the de facto open-source ML experiment tracking standard. Unity Catalog launched in 2022 as a unified governance layer. Databricks Model Serving reached general availability in 2023 with GPU support for LLM serving.
:::


The Databricks Lakehouse Architecture for ML


Delta Lake for ML Workflows

Delta Lake is the foundation. Understanding a few key features is essential for ML use cases.

Time Travel

Delta Lake stores a transaction log that lets you query any previous version of a table. For ML, this means you can always recreate the exact training dataset used to train any model.

from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder \
.appName("ml-feature-pipeline") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()

# Write features to Delta
features_df.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable("ml_platform.features.user_churn_features")

# Time travel: read the table as it was at a specific version
historical_df = spark.read.format("delta") \
.option("versionAsOf", 42) \
.table("ml_platform.features.user_churn_features")

# Or by timestamp - recreate training data from 3 months ago
historical_df = spark.read.format("delta") \
.option("timestampAsOf", "2025-10-15") \
.table("ml_platform.features.user_churn_features")

# Show table history
dt = DeltaTable.forName(spark, "ml_platform.features.user_churn_features")
dt.history(10).select(
"version", "timestamp", "operation", "operationParameters"
).show(truncate=False)
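
To make the idea behind `versionAsOf` concrete, here is a minimal pure-Python sketch of a versioned table. It is a simplification and not the Delta Lake implementation: `ToyVersionedTable` is a hypothetical class, and each commit stores row-level upserts, whereas Delta's transaction log records file-level add/remove actions. The principle is the same: replaying the log up to version N reproduces the table as it was at version N.

```python
from typing import Any, Dict, List, Optional


class ToyVersionedTable:
    """Hypothetical in-memory stand-in for a versioned table (not a Delta API)."""

    def __init__(self) -> None:
        # One entry per commit; each entry maps a row key to the row written in that commit.
        self._commits: List[Dict[str, Any]] = []

    def commit(self, upserts: Dict[str, Any]) -> int:
        """Append a commit to the log and return its version number."""
        self._commits.append(dict(upserts))
        return len(self._commits) - 1

    def read(self, version_as_of: Optional[int] = None) -> Dict[str, Any]:
        """Replay the log up to version_as_of (latest version if None)."""
        if not self._commits and version_as_of is None:
            return {}
        last = len(self._commits) - 1 if version_as_of is None else version_as_of
        if not 0 <= last < len(self._commits):
            raise ValueError(f"version {version_as_of} does not exist")
        state: Dict[str, Any] = {}
        for commit in self._commits[: last + 1]:
            state.update(commit)
        return state


table = ToyVersionedTable()
v0 = table.commit({"u1": {"total_revenue_30d": 10.0}})
v1 = table.commit({"u1": {"total_revenue_30d": 25.0}, "u2": {"total_revenue_30d": 5.0}})

training_snapshot = table.read(version_as_of=v0)  # what a model trained at v0 saw
latest_snapshot = table.read()                    # current state of the table
```

Recording the table version alongside each training run is what lets the snapshot be recreated later, provided the underlying files have not been vacuumed.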

Schema Enforcement and Evolution

Delta Lake enforces schema by default. This prevents silent data corruption when upstream pipelines change.

# Schema enforcement catches errors before they reach the model
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

expected_schema = StructType([
StructField("user_id", StringType(), nullable=False),
StructField("feature_timestamp", StringType(), nullable=False),
StructField("days_since_last_purchase", IntegerType(), nullable=True),
StructField("total_revenue_30d", DoubleType(), nullable=True),
StructField("session_count_7d", IntegerType(), nullable=True),
])

# This will FAIL if upstream schema changes break ML assumptions
# (e.g., total_revenue_30d changes type to String)
features_df.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "false") \
.saveAsTable("ml_platform.features.user_churn_features")

# Controlled schema evolution (only when you intend it)
features_df.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.saveAsTable("ml_platform.features.user_churn_features")
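
The two write modes above can be summarised with a small pure-Python sketch. This is an illustration of the rule, not Delta's actual logic: `check_schema` is a hypothetical helper, schemas are plain name-to-type dictionaries, and it treats every type change as an error, whereas Delta's real behaviour around type widening is more nuanced.

```python
from typing import Dict


def check_schema(
    expected: Dict[str, str],
    incoming: Dict[str, str],
    merge_schema: bool = False,
) -> Dict[str, str]:
    """Return the table schema after the write, or raise if the write is rejected."""
    errors = []
    for name, dtype in incoming.items():
        if name in expected and expected[name] != dtype:
            # A changed type is rejected in both modes in this simplified model.
            errors.append(f"type mismatch for {name}: expected {expected[name]}, got {dtype}")
        elif name not in expected and not merge_schema:
            # New columns are only accepted when schema evolution is requested.
            errors.append(f"unexpected column {name}")
    if errors:
        raise ValueError("; ".join(errors))
    merged = dict(expected)
    merged.update({n: t for n, t in incoming.items() if n not in expected})
    return merged


table_schema = {"user_id": "string", "total_revenue_30d": "double"}

# Additive change, accepted only because merge_schema=True
evolved = check_schema(
    table_schema,
    {"user_id": "string", "total_revenue_30d": "double", "session_count_7d": "int"},
    merge_schema=True,
)
```

Keeping the strict mode as the default and opting in to evolution per write means an upstream type change fails the pipeline instead of silently reaching the model.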

MERGE for Incremental Feature Updates

from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp

def upsert_features(
    spark: SparkSession,
    new_features_df,
    target_table: str,
    merge_key: str = "user_id",
):
    """Incrementally update feature table using Delta MERGE."""
    target = DeltaTable.forName(spark, target_table)

    target.alias("target").merge(
        new_features_df.alias("source"),
        f"target.{merge_key} = source.{merge_key}"
    ).whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

    print(f"MERGE complete on {target_table}")
    print(target.history(1).select("version", "operationMetrics").collect()[0])
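
The upsert semantics of `whenMatchedUpdateAll` plus `whenNotMatchedInsertAll` can be sketched in plain Python. `toy_merge` is a hypothetical helper operating on lists of dictionaries, not a Delta API. The duplicate-key guard is an assumption about good practice: a MERGE in which several source rows match the same target row is ambiguous, so deduplicating the source on the merge key first is a common precaution.

```python
from typing import Dict, List, Tuple


def toy_merge(
    target: List[dict],
    source: List[dict],
    key: str = "user_id",
) -> Tuple[List[dict], Dict[str, int]]:
    """Upsert source rows into target rows by key; return merged rows and counts."""
    source_keys = [row[key] for row in source]
    if len(source_keys) != len(set(source_keys)):
        raise ValueError("duplicate merge keys in source; deduplicate first")

    merged = {row[key]: dict(row) for row in target}
    metrics = {"updated": 0, "inserted": 0}
    for row in source:
        # Matched keys are overwritten in full; unmatched keys are inserted.
        metrics["updated" if row[key] in merged else "inserted"] += 1
        merged[row[key]] = dict(row)
    return list(merged.values()), metrics


target_rows = [
    {"user_id": "u1", "session_count_7d": 2},
    {"user_id": "u2", "session_count_7d": 5},
]
source_rows = [
    {"user_id": "u2", "session_count_7d": 6},  # existing user: updated
    {"user_id": "u3", "session_count_7d": 1},  # new user: inserted
]
merged_rows, merge_metrics = toy_merge(target_rows, source_rows)
```

Rows that appear only in the target (here `u1`) are left untouched, which is why MERGE suits incremental refreshes where only a subset of users changed.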

Databricks Feature Store

The Feature Store solves training-serving skew at the architectural level. Feature engineering logic is written once as Python functions, the resulting feature tables are registered in the Feature Store, and the same tables feed both training and serving.

Creating a Feature Store Table

from databricks.feature_store import FeatureStoreClient
from databricks.feature_store.entities.feature_lookup import FeatureLookup
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

fs = FeatureStoreClient()

def compute_user_features(user_events_df: DataFrame) -> DataFrame:
    """
    Feature engineering function.
    Same logic used for training AND serving.
    """
    return user_events_df.groupBy("user_id").agg(
        F.countDistinct("session_id").alias("session_count_30d"),
        F.sum(
            F.when(F.col("event_type") == "purchase", F.col("revenue")).otherwise(0)
        ).alias("total_revenue_30d"),
        F.max("event_timestamp").alias("last_event_timestamp"),
        F.count(
            F.when(F.col("event_type") == "page_view", 1)
        ).alias("page_view_count_30d"),
        F.countDistinct("product_id").alias("unique_products_viewed_30d"),
    ).withColumn(
        "days_since_last_activity",
        F.datediff(
            F.current_date(),
            F.to_date(F.col("last_event_timestamp"))
        )
    )


# Load raw events (last 30 days)
events_df = spark.read.table("raw.events") \
    .filter(F.col("event_date") >= F.date_sub(F.current_date(), 30))

# Compute features and add the timestamp key up front, so the DataFrame
# written to the table contains every column the table declares
user_features = compute_user_features(events_df) \
    .withColumn("feature_timestamp", F.current_timestamp())

# Create feature table in Feature Store
# (the schema is taken from df; passing the pre-timestamp schema would not match)
fs.create_table(
    name="ml_platform.feature_store.user_engagement_features",
    primary_keys=["user_id"],
    timestamp_keys=["feature_timestamp"],  # For point-in-time lookups
    df=user_features,
    description="User engagement features for churn and recommendation models",
    tags={
        "team": "ml-platform",
        "refresh_frequency": "daily",
        "owner": "data-eng",
    },
)

print("Feature table created")

Writing Features to an Existing Table

def refresh_feature_table(spark: SparkSession, fs: FeatureStoreClient):
    """Daily feature refresh pipeline."""
    events_df = spark.read.table("raw.events") \
        .filter(F.col("event_date") >= F.date_sub(F.current_date(), 30))

    user_features = compute_user_features(events_df) \
        .withColumn("feature_timestamp", F.current_timestamp())

    # Write mode can be "merge" (upsert by primary key) or "overwrite"
    fs.write_table(
        name="ml_platform.feature_store.user_engagement_features",
        df=user_features,
        mode="merge",  # Upsert - new users added, existing users updated
    )

    print(f"Feature table refreshed: {user_features.count():,} rows")

Point-in-Time Joins for Training Data

Point-in-time joins are the most important Feature Store capability for ML correctness. When you join features to labels for training, you must use only features that were available at the label timestamp - not future features.

def create_training_dataset(
    spark: SparkSession,
    fs: FeatureStoreClient,
    labels_table: str,
):
    """
    Create a point-in-time correct training dataset.

    labels_table must have columns:
    - user_id (join key)
    - label_timestamp (as-of time for feature lookup)
    - churned (label)
    """
    labels_df = spark.read.table(labels_table)

    training_set = fs.create_training_set(
        df=labels_df,
        feature_lookups=[
            FeatureLookup(
                table_name="ml_platform.feature_store.user_engagement_features",
                lookup_key="user_id",
                timestamp_lookup_key="label_timestamp",  # Point-in-time!
                feature_names=[
                    "session_count_30d",
                    "total_revenue_30d",
                    "page_view_count_30d",
                    "unique_products_viewed_30d",
                    "days_since_last_activity",
                ],
            ),
            # Can join multiple feature tables in one training set
            FeatureLookup(
                table_name="ml_platform.feature_store.user_demographics",
                lookup_key="user_id",
                timestamp_lookup_key="label_timestamp",
                feature_names=["account_age_days", "plan_tier", "country_code"],
            ),
        ],
        label="churned",
        exclude_columns=["user_id", "label_timestamp"],
    )

    # training_set.load_df() returns the Spark DataFrame with all features joined
    training_pd = training_set.load_df().toPandas()

    print(f"Training set created: {len(training_pd):,} rows, "
          f"{len(training_pd.columns)} columns")
    return training_set  # Return the TrainingSet for MLflow logging
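
The rule that `timestamp_lookup_key` enforces can be shown with a small pure-Python as-of join. This is a sketch of the semantics only, not how the Feature Store implements it: `as_of_join` is a hypothetical helper, and timestamps are plain integers (for example day numbers) to keep the example short. For each label it picks the latest feature row whose timestamp is less than or equal to the label timestamp, and returns `None` when no such row exists.

```python
from bisect import bisect_right
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

FeatureRow = Tuple[str, int, Dict[str, float]]  # (user_id, feature_ts, features)
LabelRow = Tuple[str, int, int]                 # (user_id, label_ts, label)


def as_of_join(labels: List[LabelRow], feature_rows: List[FeatureRow]) -> List[dict]:
    """Attach to each label the most recent features known at label_ts."""
    by_user: Dict[str, List[Tuple[int, Dict[str, float]]]] = defaultdict(list)
    for user_id, feature_ts, feats in feature_rows:
        by_user[user_id].append((feature_ts, feats))
    for rows in by_user.values():
        rows.sort(key=lambda r: r[0])

    joined = []
    for user_id, label_ts, label in labels:
        rows = by_user.get(user_id, [])
        # Index of the first feature row strictly after label_ts
        idx = bisect_right([ts for ts, _ in rows], label_ts)
        feats: Optional[Dict[str, float]] = rows[idx - 1][1] if idx > 0 else None
        joined.append({"user_id": user_id, "label_ts": label_ts,
                       "label": label, "features": feats})
    return joined


feature_history = [
    ("u1", 1, {"session_count_30d": 4.0}),
    ("u1", 5, {"session_count_30d": 9.0}),  # computed after the first label below
]
label_rows = [("u1", 3, 1), ("u1", 5, 0), ("u1", 0, 0)]
training_rows = as_of_join(label_rows, feature_history)
```

A naive join on `user_id` alone would hand the label at time 3 the features computed at time 5, which is exactly the leakage that point-in-time lookups prevent.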

Online Feature Lookup

For real-time inference, the Feature Store serves features from a low-latency online store (for example DynamoDB on AWS or Cosmos DB on Azure; the supported stores depend on your cloud). Models logged with fs.log_model() fetch these features automatically at serving time.

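from pyspark.sql.window import Window

# Feature lookup by key - useful for batch scoring and for debugging serving results
def get_user_features_for_inference(user_ids: list) -> dict:
    """
    Look up the latest feature row for each user.
    Returns same features as training (no skew).

    Note: fs.read_table() reads the offline Delta table. Real-time lookups from
    the online store happen inside Model Serving for models logged with
    fs.log_model(); they are not issued through FeatureStoreClient.
    """
    latest_first = Window.partitionBy("user_id") \
        .orderBy(F.col("feature_timestamp").desc())

    feature_data = fs.read_table(
        "ml_platform.feature_store.user_engagement_features"
    ).filter(F.col("user_id").isin(user_ids)) \
        .withColumn("_rank", F.row_number().over(latest_first)) \
        .filter(F.col("_rank") == 1) \
        .collect()

    # Returns a dict: {user_id: {feature_name: value}}
    return {
        row.user_id: {
            "session_count_30d": row.session_count_30d,
            "total_revenue_30d": row.total_revenue_30d,
            "page_view_count_30d": row.page_view_count_30d,
            "unique_products_viewed_30d": row.unique_products_viewed_30d,
            "days_since_last_activity": row.days_since_last_activity,
        }
        for row in feature_data
    }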

MLflow on Databricks

Databricks has MLflow built in - every cluster has it available, and every workspace has a managed MLflow tracking server.

Experiment Tracking

import mlflow
import mlflow.xgboost
from mlflow.models.signature import infer_signature
import xgboost as xgb
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

# Replace <your-user-email> with your workspace user name
mlflow.set_experiment("/Users/<your-user-email>/churn-experiments")

def train_with_mlflow_tracking(
    training_set,  # Databricks FeatureStore TrainingSet
    params: dict,
):
    with mlflow.start_run(run_name=f"xgboost-{pd.Timestamp.now().strftime('%Y%m%d-%H%M')}") as run:
        # Log all parameters
        mlflow.log_params(params)

        # Load data from the Feature Store training set
        df = training_set.load_df().toPandas()
        feature_cols = [c for c in df.columns if c != "churned"]
        X = df[feature_cols]
        y = df["churned"]

        X_train, X_val, y_train, y_val = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # Train. early_stopping_rounds is set on the estimator (XGBoost >= 1.6);
        # XGBoost 2.0 removed it from fit().
        model = xgb.XGBClassifier(
            **params,
            eval_metric="logloss",
            early_stopping_rounds=20,
        )
        model.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            verbose=False,
        )

        # Log the validation loss curve to MLflow every 10 rounds.
        # (mlflow.xgboost.autolog() returns None, so it cannot be passed as a callback.)
        val_logloss = model.evals_result()["validation_0"]["logloss"]
        for step in range(0, len(val_logloss), 10):
            mlflow.log_metric("val_logloss", val_logloss[step], step=step)

        # Compute final metrics
        val_probs = model.predict_proba(X_val)[:, 1]
        val_auc = roc_auc_score(y_val, val_probs)

        mlflow.log_metrics({
            "val_auc": val_auc,
            "n_train": len(X_train),
            "n_val": len(X_val),
            "best_iteration": model.best_iteration,
        })

        # Log model with Feature Store integration
        # This logs the feature lookup logic WITH the model
        # so the serving layer can automatically fetch features
        fs.log_model(
            model=model,
            artifact_path="model",
            flavor=mlflow.xgboost,
            training_set=training_set,  # Embeds feature lookup in the model
            registered_model_name="churn-xgboost",
            # Infer schema from training data for input validation
            signature=infer_signature(X_train, model.predict_proba(X_train)),
        )

        print(f"Run ID: {run.info.run_id}")
        print(f"Validation AUC: {val_auc:.4f}")
        return run.info.run_id, val_auc

Hyperparameter Tuning with Hyperopt

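Databricks integrates natively with Hyperopt for distributed hyperparameter search. With SparkTrials, each trial runs as a Spark task on a worker; the example below uses the default Trials class, which evaluates trials sequentially on the driver.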

from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
import mlflow

def hyperopt_objective(params):
    """Objective function for Hyperopt - returns loss to minimize."""
    params["max_depth"] = int(params["max_depth"])
    params["n_estimators"] = int(params["n_estimators"])

    with mlflow.start_run(nested=True):
        mlflow.log_params(params)

        X_train, X_val, y_train, y_val = get_training_data()  # your function

        # early_stopping_rounds is set on the estimator (XGBoost >= 1.6);
        # XGBoost 2.0 removed it from fit()
        model = xgb.XGBClassifier(**params, eval_metric="logloss",
                                  early_stopping_rounds=15)
        model.fit(X_train, y_train,
                  eval_set=[(X_val, y_val)],
                  verbose=False)

        val_auc = roc_auc_score(y_val, model.predict_proba(X_val)[:, 1])
        mlflow.log_metric("val_auc", val_auc)

    # Hyperopt minimizes, so return the negated AUC
    return {"loss": -val_auc, "status": STATUS_OK}


search_space = {
    "max_depth": hp.quniform("max_depth", 3, 10, 1),
    "n_estimators": hp.quniform("n_estimators", 50, 500, 50),
    "learning_rate": hp.loguniform("learning_rate", np.log(0.01), np.log(0.3)),
    "subsample": hp.uniform("subsample", 0.6, 1.0),
    "colsample_bytree": hp.uniform("colsample_bytree", 0.6, 1.0),
}

with mlflow.start_run(run_name="hyperopt-search"):
    best_params = fmin(
        fn=hyperopt_objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=50,
        trials=Trials(),
    )

print(f"Best params: {best_params}")

MLflow Model Registry - Staging and Production Promotion

import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

def promote_to_staging(model_name: str, run_id: str) -> str:
    """Register a run artifact as a new model version and move to Staging."""
    # Note: stages apply to the workspace Model Registry. Models registered in
    # Unity Catalog use aliases instead of stage transitions.

    # Get the model URI from the run
    model_uri = f"runs:/{run_id}/model"

    # Register the model (creates a new version)
    model_version = mlflow.register_model(
        model_uri=model_uri,
        name=model_name,
    )

    version_num = model_version.version
    print(f"Registered model version {version_num}")

    # Add run metadata as tags (tag values are stored as strings)
    run = client.get_run(run_id)
    client.set_model_version_tag(
        name=model_name,
        version=version_num,
        key="val_auc",
        value=str(run.data.metrics.get("val_auc", "unknown")),
    )
    client.set_model_version_tag(
        name=model_name,
        version=version_num,
        key="trained_at",
        value=str(pd.Timestamp.now()),
    )

    # Transition to Staging
    client.transition_model_version_stage(
        name=model_name,
        version=version_num,
        stage="Staging",
        archive_existing_versions=False,  # Keep existing staging for comparison
    )

    print(f"Model {model_name} v{version_num} moved to Staging")
    return version_num


def promote_to_production(model_name: str, version: str):
    """Promote a Staging model to Production after validation."""

    # Validate the model is in Staging
    model_version = client.get_model_version(model_name, version)
    if model_version.current_stage != "Staging":
        raise ValueError(
            f"Model version {version} is in stage '{model_version.current_stage}', "
            f"not 'Staging'. Must be in Staging before promoting to Production."
        )

    # Check minimum AUC gate
    val_auc = float(model_version.tags.get("val_auc", 0))
    if val_auc < 0.78:
        raise ValueError(
            f"Model AUC {val_auc:.4f} below production threshold 0.78. "
            f"Promotion rejected."
        )

    # Promote to Production, archive previous production
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production",
        archive_existing_versions=True,  # Archive the previous production version
    )

    client.set_model_version_tag(
        name=model_name,
        version=version,
        key="promoted_to_prod_at",
        value=str(pd.Timestamp.now()),
    )

    print(f"Model {model_name} v{version} promoted to Production")


def get_production_model(model_name: str):
    """Load the current production model."""
    model_uri = f"models:/{model_name}/Production"
    model = mlflow.xgboost.load_model(model_uri)
    return model


def setup_registry_webhook(model_name: str, webhook_url: str):
    """Set up a webhook to notify Slack when model transitions to Production."""
    # Registry webhooks are not part of MlflowClient. On Databricks they are
    # managed through the separate databricks-registry-webhooks package
    # (pip install databricks-registry-webhooks).
    from databricks_registry_webhooks import RegistryWebhooksClient, HttpUrlSpec

    RegistryWebhooksClient().create_webhook(
        model_name=model_name,
        events=["MODEL_VERSION_TRANSITIONED_TO_PRODUCTION"],
        # SSL certificate verification is enabled by default
        http_url_spec=HttpUrlSpec(url=webhook_url),
        status="ACTIVE",
    )
    print(f"Webhook created for {model_name} → Production transitions")

Databricks Model Serving

Databricks Model Serving deploys MLflow models as REST API endpoints. Models registered in the Model Registry can be deployed with one API call or through the Databricks UI.

import requests
import json

def create_model_serving_endpoint(
workspace_url: str,
token: str,
endpoint_name: str,
model_name: str,
model_version: str,
workload_size: str = "Small", # Small / Medium / Large
scale_to_zero: bool = True,
):
"""
Create a Databricks Model Serving endpoint.
workload_size: Small (0-4 concurrent requests), Medium, Large.
"""
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}

config = {
"name": endpoint_name,
"config": {
"served_models": [
{
"name": f"{model_name}-v{model_version}",
"model_name": model_name,
"model_version": model_version,
"workload_size": workload_size,
"scale_to_zero_enabled": scale_to_zero,
}
],
"traffic_config": {
"routes": [
{
"served_model_name": f"{model_name}-v{model_version}",
"traffic_percentage": 100,
}
]
},
},
}

response = requests.post(
f"{workspace_url}/api/2.0/serving-endpoints",
headers=headers,
json=config,
)

if response.status_code == 200:
print(f"Endpoint '{endpoint_name}' created successfully")
else:
raise RuntimeError(f"Failed to create endpoint: {response.text}")

return response.json()


def canary_update_endpoint(
workspace_url: str,
token: str,
endpoint_name: str,
model_name: str,
old_version: str,
new_version: str,
canary_pct: int = 10,
):
"""Update endpoint to split traffic between old and new model versions."""
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}

config = {
"served_models": [
{
"name": f"{model_name}-v{old_version}",
"model_name": model_name,
"model_version": old_version,
"workload_size": "Small",
"scale_to_zero_enabled": True,
},
{
"name": f"{model_name}-v{new_version}",
"model_name": model_name,
"model_version": new_version,
"workload_size": "Small",
"scale_to_zero_enabled": True,
},
],
"traffic_config": {
"routes": [
{
"served_model_name": f"{model_name}-v{old_version}",
"traffic_percentage": 100 - canary_pct,
},
{
"served_model_name": f"{model_name}-v{new_version}",
"traffic_percentage": canary_pct,
},
]
},
}

response = requests.put(
f"{workspace_url}/api/2.0/serving-endpoints/{endpoint_name}/config",
headers=headers,
json=config,
)

if response.status_code == 200:
print(f"Canary update: {canary_pct}% to v{new_version}, "
f"{100-canary_pct}% to v{old_version}")
else:
raise RuntimeError(f"Failed to update endpoint: {response.text}")


def score_endpoint(
workspace_url: str,
token: str,
endpoint_name: str,
data: dict,
) -> dict:
"""Call a Databricks Model Serving endpoint."""
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}

# Databricks Model Serving expects {"dataframe_records": [...]} or
# {"dataframe_split": {"columns": [...], "data": [[...]]}}
payload = {
"dataframe_records": data if isinstance(data, list) else [data]
}

response = requests.post(
f"{workspace_url}/serving-endpoints/{endpoint_name}/invocations",
headers=headers,
json=payload,
)

return response.json()


# Example: score with Feature Store-backed model
# When model is logged via fs.log_model(), the serving endpoint
# automatically fetches features from the online store by user_id
result = score_endpoint(
workspace_url="https://adb-1234567890.azuredatabricks.net",
token=dbutils.secrets.get("ml-secrets", "databricks-token"),
endpoint_name="churn-prediction",
data={"user_id": "user_12345"}, # Feature lookup happens server-side!
)
print(result)
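
A canary rollout usually proceeds in several steps rather than a single 10% split. The sketch below builds the `traffic_config` payloads for such a ramp, using the same field names as the request bodies above. `build_canary_routes` and `ramp_plan` are hypothetical helpers, and the default step percentages are an assumption for illustration, not a Databricks recommendation.

```python
from typing import Dict, List, Sequence


def build_canary_routes(
    model_name: str, old_version: str, new_version: str, canary_pct: int
) -> Dict[str, List[dict]]:
    """Build a traffic_config that sends canary_pct percent to the new version."""
    if not 0 <= canary_pct <= 100:
        raise ValueError(f"canary_pct must be between 0 and 100, got {canary_pct}")
    return {
        "routes": [
            {"served_model_name": f"{model_name}-v{old_version}",
             "traffic_percentage": 100 - canary_pct},
            {"served_model_name": f"{model_name}-v{new_version}",
             "traffic_percentage": canary_pct},
        ]
    }


def ramp_plan(
    model_name: str,
    old_version: str,
    new_version: str,
    steps: Sequence[int] = (10, 25, 50, 100),
) -> List[Dict[str, List[dict]]]:
    """Return one traffic_config per ramp step; steps must strictly increase."""
    if list(steps) != sorted(set(steps)):
        raise ValueError("ramp steps must be strictly increasing")
    return [build_canary_routes(model_name, old_version, new_version, pct)
            for pct in steps]


plan = ramp_plan("churn-xgboost", "3", "4")
```

Each element of `plan` could be sent as the `traffic_config` of a config update, with a check of error rate and latency between steps before moving to the next one.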

Spark-Scale Feature Engineering Pipeline

For large-scale feature engineering, PySpark is the right tool. Here is a production feature engineering pipeline with proper structure.

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType
from databricks.feature_store import FeatureStoreClient
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

class ChurnFeaturePipeline:
"""
Production feature engineering pipeline.
Designed for daily refresh of churn features across 80M+ users.
"""

def __init__(self, spark: SparkSession, feature_store_client: FeatureStoreClient):
self.spark = spark
self.fs = feature_store_client
self.feature_table = "ml_platform.feature_store.user_churn_features"

def compute_recency_features(self, events_df: DataFrame) -> DataFrame:
"""Days since various event types."""
today = F.current_date()

purchase_recency = events_df \
.filter(F.col("event_type") == "purchase") \
.groupBy("user_id") \
.agg(F.max("event_date").alias("last_purchase_date"))

login_recency = events_df \
.filter(F.col("event_type") == "login") \
.groupBy("user_id") \
.agg(F.max("event_date").alias("last_login_date"))

return purchase_recency.join(login_recency, "user_id", "outer") \
.withColumn(
"days_since_last_purchase",
F.datediff(today, F.col("last_purchase_date"))
) \
.withColumn(
"days_since_last_login",
F.datediff(today, F.col("last_login_date"))
)

def compute_frequency_features(self, events_df: DataFrame) -> DataFrame:
"""Rolling count features over multiple windows."""
windows = [7, 30, 90]
dfs = []

for days in windows:
cutoff = F.date_sub(F.current_date(), days)
window_df = events_df \
.filter(F.col("event_date") >= cutoff) \
.groupBy("user_id") \
.agg(
F.count("*").alias(f"event_count_{days}d"),
F.countDistinct("session_id").alias(f"session_count_{days}d"),
F.sum(
F.when(F.col("event_type") == "purchase",
F.col("revenue")).otherwise(0)
).alias(f"revenue_{days}d"),
)
dfs.append(window_df)

# Join all window features
result = dfs[0]
for df in dfs[1:]:
result = result.join(df, "user_id", "outer")
return result

def compute_trend_features(self, events_df: DataFrame) -> DataFrame:
"""Month-over-month trends - is the user more or less active?"""
current_month = events_df \
.filter(F.col("event_date") >= F.date_sub(F.current_date(), 30)) \
.groupBy("user_id") \
.agg(F.count("*").alias("events_current_month"))

prev_month = events_df \
.filter(
(F.col("event_date") >= F.date_sub(F.current_date(), 60)) &
(F.col("event_date") < F.date_sub(F.current_date(), 30))
) \
.groupBy("user_id") \
.agg(F.count("*").alias("events_prev_month"))

trend = current_month.join(prev_month, "user_id", "outer") \
.na.fill(0) \
.withColumn(
"event_count_mom_change",
(F.col("events_current_month") - F.col("events_prev_month")).cast(DoubleType())
) \
.withColumn(
"event_count_mom_pct_change",
F.when(F.col("events_prev_month") > 0,
(F.col("events_current_month") - F.col("events_prev_month"))
/ F.col("events_prev_month") * 100.0
).otherwise(F.lit(None))
)

return trend

    def run(self, lookback_days: int = 90):
        """Execute the full feature pipeline."""
        logger.info("Starting feature pipeline refresh")

        events_df = self.spark.read.table("raw.user_events") \
            .filter(
                F.col("event_date") >= F.date_sub(F.current_date(), lookback_days)
            )

        # Cache events - used in multiple transforms
        events_df.cache()
        event_count = events_df.count()
        logger.info(f"Loaded {event_count:,} events from last {lookback_days} days")

        # Compute feature groups
        recency = self.compute_recency_features(events_df)
        frequency = self.compute_frequency_features(events_df)
        trends = self.compute_trend_features(events_df)

        # Join all features
        all_users = self.spark.read.table("raw.users").select("user_id")

        features = all_users \
            .join(recency, "user_id", "left") \
            .join(frequency, "user_id", "left") \
            .join(trends, "user_id", "left") \
            .withColumn("feature_timestamp", F.current_timestamp()) \
            .na.fill({
                "days_since_last_purchase": 9999,
                "days_since_last_login": 9999,
                "event_count_7d": 0,
                "session_count_7d": 0,
                "revenue_7d": 0.0,
            })

        # Write to Feature Store
        self.fs.write_table(
            name=self.feature_table,
            df=features,
            mode="merge",
        )

        # Spark is lazy: the transforms above only execute during the write and
        # the count, so release the cache after both have used it
        user_count = features.count()
        events_df.unpersist()

        logger.info(f"Feature refresh complete: {user_count:,} user records updated")
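
Feature logic like the month-over-month trend is easier to trust when the same rule exists as a small pure function that can be unit tested without a cluster. The sketch below mirrors the arithmetic in `compute_trend_features`; `mom_change` is a hypothetical helper and is not part of the pipeline class.

```python
from typing import Optional, Tuple


def mom_change(events_current: int, events_prev: int) -> Tuple[float, Optional[float]]:
    """
    Return (absolute change, percent change) between two 30-day windows.

    Percent change is None when the previous window had no events, matching
    the otherwise(None) branch in the Spark version.
    """
    change = float(events_current - events_prev)
    pct_change = change / events_prev * 100.0 if events_prev > 0 else None
    return change, pct_change


growing = mom_change(30, 20)      # user became more active
declining = mom_change(5, 20)     # user became less active
new_user = mom_change(5, 0)       # no activity in the previous window
```

Leaving the percentage undefined for users with no prior activity avoids a division by zero and lets the model treat "no baseline" differently from "no change".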

Databricks AutoML

AutoML in Databricks generates a Glass Box ML experiment: it produces Python notebooks showing every step of the automated model selection and hyperparameter tuning process. You can read the generated code, understand what it did, and modify it. This is the opposite of a black-box AutoML.

from databricks import automl
from datetime import datetime

def run_automl_experiment(
    spark: SparkSession,
    training_data_table: str,
    target_col: str,
    experiment_name: str,
    timeout_minutes: int = 30,
):
    """
    Run Databricks AutoML on a Delta table.
    Returns the AutoML summary; its best trial carries the generated notebook path.
    """
    df = spark.read.table(training_data_table)

    summary = automl.classify(
        dataset=df,
        target_col=target_col,
        primary_metric="roc_auc",
        timeout_minutes=timeout_minutes,
        experiment_name=experiment_name,  # Name of the MLflow experiment AutoML creates
        time_col=None,  # Set if doing time-series splits
    )

    best = summary.best_trial
    print(f"Best model: {best.model_description}")
    print(f"Best val_roc_auc: {best.evaluation_metric_score:.4f}")
    print(f"Generated notebook: {best.notebook_path}")

    # The best model is logged to an MLflow run; register it explicitly if needed
    print(f"Best run ID: {best.mlflow_run_id}")

    return summary

Databricks Workflows for Orchestration

Databricks Workflows (Jobs) orchestrate multi-step ML pipelines across notebooks, Python scripts, and Spark tasks.

import requests

def create_feature_training_workflow(
workspace_url: str,
token: str,
compute_cluster_id: str,
):
"""Create a Databricks Job that runs feature refresh → train → register."""
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}

job_config = {
"name": "churn-ml-daily-pipeline",
"tags": {"team": "ml-platform", "project": "churn"},
"email_notifications": {
"on_failure": ["<team-alerts-email>"],  # Replace with your alerting address
},
"schedule": {
"quartz_cron_expression": "0 0 2 * * ?", # 2 AM daily
"timezone_id": "UTC",
"pause_status": "UNPAUSED",
},
"tasks": [
{
"task_key": "refresh_features",
"description": "Refresh user engagement features",
"notebook_task": {
"notebook_path": "/ML-Platform/feature-pipeline/refresh-features",
"source": "WORKSPACE",
"base_parameters": {
"lookback_days": "90",
},
},
"existing_cluster_id": compute_cluster_id,
"timeout_seconds": 3600,
},
{
"task_key": "train_model",
"description": "Train churn XGBoost model",
"depends_on": [{"task_key": "refresh_features"}],
"python_wheel_task": {
"package_name": "churn_trainer",
"entry_point": "train",
"parameters": [
"--max-depth=6",
"--n-estimators=300",
"--learning-rate=0.04",
],
},
"new_cluster": {
"spark_version": "13.3.x-ml-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 0, # Single node for training
"spark_conf": {
"spark.databricks.delta.preview.enabled": "true",
},
},
"timeout_seconds": 7200,
},
{
"task_key": "validate_and_register",
"description": "Validate and register model to staging",
"depends_on": [{"task_key": "train_model"}],
"notebook_task": {
"notebook_path": "/ML-Platform/model-registry/validate-and-promote",
},
"existing_cluster_id": compute_cluster_id,
},
],
"max_concurrent_runs": 1,
}

response = requests.post(
f"{workspace_url}/api/2.1/jobs/create",
headers=headers,
json=job_config,
)

return response.json()
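
Before submitting a job definition, it can help to check that the `depends_on` references form a valid graph. The sketch below does this with the standard library's `graphlib` (Python 3.9+). `task_order` is a hypothetical helper, and the task list is a trimmed copy of the `tasks` array above that keeps only the keys relevant to ordering.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def task_order(tasks: List[dict]) -> List[str]:
    """Return task keys in an order that respects depends_on, or raise ValueError."""
    keys = {t["task_key"] for t in tasks}
    graph: Dict[str, List[str]] = {}
    for task in tasks:
        deps = [d["task_key"] for d in task.get("depends_on", [])]
        missing = [d for d in deps if d not in keys]
        if missing:
            raise ValueError(f"{task['task_key']} depends on unknown tasks: {missing}")
        graph[task["task_key"]] = deps
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        raise ValueError(f"cyclic task dependencies: {exc.args[1]}") from exc


pipeline_tasks = [
    {"task_key": "validate_and_register", "depends_on": [{"task_key": "train_model"}]},
    {"task_key": "refresh_features"},
    {"task_key": "train_model", "depends_on": [{"task_key": "refresh_features"}]},
]
execution_order = task_order(pipeline_tasks)
```

Running a check like this in CI catches a mistyped `task_key` before the job is created, rather than at 2 AM when the schedule fires.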

Unity Catalog for ML Governance

Unity Catalog provides a three-level namespace (catalog.schema.table) and centralized governance across all Databricks assets including Delta tables, Feature Store tables, and MLflow models.

# Create catalog and schema for ML assets
spark.sql("CREATE CATALOG IF NOT EXISTS ml_platform")
spark.sql("USE CATALOG ml_platform")
spark.sql("CREATE SCHEMA IF NOT EXISTS feature_store")
spark.sql("CREATE SCHEMA IF NOT EXISTS training_data")
spark.sql("CREATE SCHEMA IF NOT EXISTS model_outputs")

# Grant access - data scientists can read features but not write
spark.sql("""
GRANT SELECT ON SCHEMA ml_platform.feature_store
TO `data-scientists`
""")

spark.sql("""
GRANT SELECT, MODIFY ON SCHEMA ml_platform.feature_store
TO `ml-platform-engineers`
""")

# Tag tables for data catalog discoverability
spark.sql("""
ALTER TABLE ml_platform.feature_store.user_churn_features
SET TAGS ('domain' = 'churn', 'team' = 'ml-platform', 'pii' = 'false')
""")

# View lineage - Unity Catalog records which jobs and notebooks wrote to which tables
# (the audit log in system.access.audit records access events, not lineage)
spark.sql("""
SELECT source_table_full_name, entity_type, entity_id, created_by, event_time
FROM system.access.table_lineage
WHERE target_table_full_name = 'ml_platform.feature_store.user_churn_features'
ORDER BY event_time DESC
LIMIT 20
""").show(truncate=False)

Production Engineering Notes

Delta Table Maintenance

Delta tables accumulate transaction log files and small data files over time. Run OPTIMIZE and VACUUM regularly.

from delta.tables import DeltaTable

def maintain_delta_table(spark: SparkSession, table_name: str):
    """Run optimization and cleanup on a Delta table."""
    # Compact small files for better read performance
    spark.sql(f"OPTIMIZE {table_name} ZORDER BY (user_id)")

    # Remove data files older than 7 days that are no longer referenced
    # CAUTION: this limits time travel to 7 days
    spark.sql(f"VACUUM {table_name} RETAIN 168 HOURS")

    print(f"Maintenance complete for {table_name}")
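
Because VACUUM retention directly limits how far back time travel can reach, it is worth deriving the retention from the reproducibility window you actually need instead of hard-coding it. The helper below is a hypothetical sketch that only builds the SQL string. The 168-hour floor reflects Delta's default safety check, which rejects shorter retention periods unless that check is explicitly disabled.

```python
MIN_RETENTION_HOURS = 168  # Delta's default safety threshold (7 days)


def vacuum_statement(table_name: str, time_travel_days: int) -> str:
    """Build a VACUUM statement that keeps time_travel_days of history."""
    hours = time_travel_days * 24
    if hours < MIN_RETENTION_HOURS:
        raise ValueError(
            f"retention of {hours}h is below the {MIN_RETENTION_HOURS}h safety threshold"
        )
    return f"VACUUM {table_name} RETAIN {hours} HOURS"


# Keep 90 days of history so last quarter's training sets stay reproducible
statement = vacuum_statement("ml_platform.features.user_churn_features", 90)
```

Note that data-file retention is only one limit: the transaction log has its own retention setting on the table (`delta.logRetentionDuration`), so both need to cover the window you intend to travel back over.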

Feature Store Online Sync

Databricks Feature Store syncs to DynamoDB (AWS), Cosmos DB (Azure), or Cloud Bigtable (GCP) for online serving. The sync must be explicitly triggered.

from databricks.feature_store.online_store_spec import AmazonDynamoDBSpec

# Publish features to the online store
fs.publish_table(
    name="ml_platform.feature_store.user_churn_features",
    online_store=AmazonDynamoDBSpec(
        region="us-east-1",
        table_name="churn-features-online",
        read_secret_prefix="ml-secrets/dynamodb",
        write_secret_prefix="ml-secrets/dynamodb",
    ),
    mode="merge",  # Upsert rows by primary key instead of overwriting the table
)

Common Mistakes

:::danger Training on Future Features (Label Leakage)
The single most damaging MLOps mistake in Databricks environments is joining features without point-in-time correctness. If your labels table has a churned column as of date T, and you join features from the feature table without the timestamp_lookup_key parameter, you get features from the most recent snapshot, which may include features computed after date T. This can inflate validation AUC by 10-20 points and cause dramatic production underperformance. Always use timestamp_lookup_key in FeatureLookup. If you are not using the Feature Store API for training set creation, implement point-in-time joins manually with a range join on timestamps that keeps, for each label row, only the latest feature row at or before the label timestamp.
:::
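The manual point-in-time join mentioned above can be sketched in plain Python. This is a minimal illustration of the rule, not the Feature Store's implementation: the function name `point_in_time_lookup` and the sample rows are hypothetical, and in Spark the same logic would be written as a range join that keeps the latest feature row at or before each label timestamp.

```python
from bisect import bisect_right
from collections import defaultdict


def point_in_time_lookup(labels, features):
    """For each (user_id, label_ts), pick the latest feature row with ts <= label_ts.

    labels:   list of (user_id, label_ts) tuples
    features: list of (user_id, feature_ts, value) tuples
    Returns a list of (user_id, label_ts, value_or_None).
    """
    # Group the feature history per user, sorted by timestamp
    history = defaultdict(list)
    for user_id, ts, value in sorted(features, key=lambda r: (r[0], r[1])):
        history[user_id].append((ts, value))

    joined = []
    for user_id, label_ts in labels:
        rows = history.get(user_id, [])
        timestamps = [ts for ts, _ in rows]
        # bisect_right gives the number of feature rows with ts <= label_ts
        idx = bisect_right(timestamps, label_ts)
        value = rows[idx - 1][1] if idx > 0 else None
        joined.append((user_id, label_ts, value))
    return joined


# Hypothetical data: ISO date strings compare correctly as plain strings
features = [
    ("u1", "2024-01-01", 3),
    ("u1", "2024-02-01", 9),  # computed after the u1 label date, must not leak
    ("u2", "2024-01-15", 5),
]
labels = [("u1", "2024-01-20"), ("u2", "2024-01-10")]
result = point_in_time_lookup(labels, features)
```

A naive "latest snapshot" join would have attached the value 9 to `u1` and 5 to `u2`; the point-in-time version returns the older value for `u1` and no value for `u2`, because no feature existed yet at its label date.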

:::danger Using the Same Cluster for Development and Production Jobs
Databricks clusters have state. Libraries installed interactively in a notebook affect every job running on the same cluster. Production jobs must run on isolated clusters with explicit library declarations. Use Job Clusters (new clusters created for each job run) for production workflows, not existing shared clusters. Job Clusters ensure a clean environment, prevent library conflicts, and improve cost attribution.
:::

:::warning Not Archiving Old Model Registry Versions
The MLflow Model Registry has no automatic archiving. If you promote model versions to Production without archiving old ones, you accumulate "Production" versions that are not actually serving traffic. Always set archive_existing_versions=True when calling transition_model_version_stage to move a version to Production. Build a weekly job that checks for orphaned Production versions (registered but not serving) and archives them.
:::
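The weekly orphan check could look roughly like the sketch below. It only shows the comparison logic: the function name and the hard-coded inputs are hypothetical, and a real job would read the version list from the registry and the served versions from the endpoint configuration.

```python
def find_orphaned_production_versions(registry_versions, serving_versions):
    """Return Production versions that no endpoint is serving.

    registry_versions: list of dicts with "name", "version", "stage"
    serving_versions:  set of (name, version) pairs currently served
    """
    orphans = []
    for mv in registry_versions:
        key = (mv["name"], mv["version"])
        if mv["stage"] == "Production" and key not in serving_versions:
            orphans.append(key)
    return sorted(orphans)


# Hypothetical inputs, hard-coded for illustration only
registry_versions = [
    {"name": "churn_model", "version": "3", "stage": "Production"},
    {"name": "churn_model", "version": "4", "stage": "Production"},
    {"name": "churn_model", "version": "5", "stage": "Staging"},
]
serving_versions = {("churn_model", "4")}

orphans = find_orphaned_production_versions(registry_versions, serving_versions)
```

Each pair in `orphans` is a candidate for a transition to the Archived stage; logging the list for review before archiving is a cautious first step.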

:::warning Delta Table Small File Problem
Incremental writes to Delta tables (especially via streaming or frequent appends) create many small files, degrading read performance. Symptoms: feature pipeline reads that take 3x longer than they should, and a Spark DAG showing thousands of tasks for a table with only 1M rows. Fix: run OPTIMIZE table_name ZORDER BY (primary_key) on a schedule. For tables that receive streaming or frequent incremental writes, also enable auto-optimize through the delta.autoOptimize.optimizeWrite and delta.autoOptimize.autoCompact table properties.
:::
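One way to detect the small file problem before it shows up as slow reads is to compare a table's average file size against a target. The sketch below assumes the file count and total size have already been read from the `numFiles` and `sizeInBytes` columns of `DESCRIBE DETAIL`; the function name and both thresholds are illustrative choices, not Databricks defaults.

```python
def needs_compaction(num_files, size_in_bytes, target_file_mb=128, ratio=0.25):
    """Flag a table whose average file size is far below the target.

    target_file_mb and ratio are illustrative thresholds (assumptions).
    """
    if num_files == 0:
        return False
    avg_mb = size_in_bytes / num_files / (1024 * 1024)
    return avg_mb < target_file_mb * ratio


# Hypothetical table: 4,000 files totalling 2 GiB (about 0.5 MiB per file)
fragmented = needs_compaction(num_files=4000, size_in_bytes=2 * 1024**3)

# Hypothetical table: 16 files totalling 2 GiB (128 MiB per file)
compacted = needs_compaction(num_files=16, size_in_bytes=2 * 1024**3)
```

A scheduled job could run this check per table and call OPTIMIZE only where it returns True, which avoids rewriting tables that are already well compacted.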


Interview Q&A

Q1: How does the Databricks Feature Store prevent training-serving skew, and what is the mechanism for point-in-time correctness?

Answer: The Feature Store prevents training-serving skew by ensuring both training and serving use the same feature lookup logic. When you register a feature table, the Feature Store tracks the feature definitions. When you log a model with fs.log_model(), the Feature Store embeds the feature lookup specification into the MLflow model artifact. When the model is deployed to Model Serving, the serving endpoint automatically fetches features from the online store using the embedded lookup spec, not from a separate feature computation step, so the developer never writes separate training and serving feature code. Point-in-time correctness is achieved in training via the timestamp_lookup_key parameter in FeatureLookup: for each training example with a label timestamp T, the Feature Store looks up the latest feature values whose timestamp is at or before T, using the timestamp key column of the feature table (feature_timestamp in this lesson's tables). This prevents label leakage from future feature values.

Q2: What is Delta Lake time travel, and how does it solve a specific ML reproducibility problem?

Answer: Delta Lake time travel lets you query a Delta table as it existed at a previous version or timestamp. The transaction log records every write operation, so the table can be reconstructed at any retained point in its history. For ML, this solves the training data reproducibility problem: given a model trained six months ago, you can recreate the exact training dataset by reading the feature table AS OF the timestamp when training occurred, provided the table's retention settings cover that period. VACUUM deletes data files older than its retention threshold, so a 7-day VACUUM schedule limits time travel to 7 days; tables that back long-lived models need a longer retention window or a separate snapshot of the training set. Without time travel, the feature table has been overwritten many times since, and the original training data is gone. Time travel also enables safe feature backfills: you can add new features to historical records and validate them before retraining, then revert with RESTORE TABLE table_name TO VERSION AS OF N if something goes wrong.
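The interaction between time travel and retention can be sketched as a small helper that builds a TIMESTAMP AS OF query and refuses to do so when the requested snapshot is older than the retention window. The function name is hypothetical, and the single `retention_hours` value is a simplification that assumes one window covers both data files and the transaction log.

```python
from datetime import datetime, timedelta


def time_travel_query(table_name, as_of, now, retention_hours):
    """Build a TIMESTAMP AS OF query, or raise if the snapshot is too old.

    Simplifying assumption: one retention window governs what is still readable.
    """
    if now - as_of > timedelta(hours=retention_hours):
        raise ValueError(
            f"{as_of:%Y-%m-%d} is older than the {retention_hours}-hour retention window"
        )
    return f"SELECT * FROM {table_name} TIMESTAMP AS OF '{as_of:%Y-%m-%d %H:%M:%S}'"


now = datetime(2024, 6, 1)
table = "ml_platform.feature_store.user_churn_features"

# Training happened two days ago: inside a 168-hour (7-day) window
query = time_travel_query(table, datetime(2024, 5, 30, 6, 0, 0), now, retention_hours=168)

# Training happened six months ago: outside the window, so the helper raises
try:
    time_travel_query(table, datetime(2023, 12, 1), now, retention_hours=168)
    too_old_rejected = False
except ValueError:
    too_old_rejected = True
```

The returned string can be passed to `spark.sql(...)`. Failing fast on an out-of-window timestamp is a design choice: it surfaces a reproducibility gap at request time rather than as a missing-file error in the middle of a training run.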

Q3: Explain the difference between Databricks Model Serving serverless endpoints and the older cluster-based serving approach.

Answer: Legacy Databricks model serving ran on always-on clusters - you allocated a cluster, loaded the model, and it sat there serving requests (or not). You paid for the cluster 24/7 regardless of traffic. Serverless Model Serving (introduced in 2023) provisions compute on-demand when a request arrives and scales to zero when idle. For models with bursty traffic (e.g., nightly scoring peaks), serverless is dramatically cheaper. The tradeoff is cold start latency - the first request after a zero-scale period takes a few seconds longer while the serving infrastructure initializes. For latency-sensitive production APIs with consistent traffic, scale-to-zero should be disabled. For batch-oriented or low-traffic models, scale-to-zero saves significant cost. GPU endpoints for LLM serving generally should not scale to zero because cold starts require loading multi-GB model weights, adding 30-120 seconds of latency.

Q4: How would you implement a feature pipeline that computes rolling window aggregations over 80 million users efficiently in PySpark?

Answer: Three key optimizations. First, partition the input data by date and Z-order it on the join key. Partition pruning lets Spark skip dates outside the largest window, and Z-ordering improves data skipping on the key, so both reduce I/O. Second, compute all window sizes in a single pass over the data rather than scanning the event table once per window. Load the events once, restricted to the longest window, cache the DataFrame (events_df.cache()) if it is reused, then apply all window filters and aggregations against it. Third, write the results using mode="merge" (Delta MERGE) rather than overwrite. MERGE only touches changed records, which is much faster when only a fraction of users have new activity each day. For extreme scale, partition the user population and use mapPartitions with custom aggregation to avoid shuffle. Profile with the Spark UI to identify skewed partitions: users with massive event histories can cause single-partition stragglers that slow the entire job.
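The single-pass idea in the second optimization can be shown in plain Python. This is a toy sketch with hypothetical names and data; in PySpark the same pattern is usually one groupBy with a conditional aggregate per window, such as F.sum(F.when(condition, 1).otherwise(0)).

```python
from collections import defaultdict
from datetime import date


def rolling_counts(events, as_of, windows=(7, 30, 90)):
    """Count events per user for several lookback windows in one pass.

    events: iterable of (user_id, event_date) tuples
    Returns {user_id: {window_days: count}}.
    """
    counts = defaultdict(lambda: {w: 0 for w in windows})
    for user_id, event_date in events:
        age_days = (as_of - event_date).days
        if age_days < 0:
            continue  # ignore events dated after the as-of date
        # One event updates every window it falls into; no rescan per window
        for w in windows:
            if age_days < w:
                counts[user_id][w] += 1
    return dict(counts)


# Hypothetical events
events = [
    ("u1", date(2024, 3, 30)),  # 1 day old
    ("u1", date(2024, 3, 10)),  # 21 days old
    ("u1", date(2024, 1, 15)),  # 76 days old
    ("u2", date(2024, 3, 29)),  # 2 days old
]
features = rolling_counts(events, as_of=date(2024, 3, 31))
```

Each event is read once and contributes to every window it belongs to, which is the property that keeps the cost of adding a fourth or fifth window close to zero.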

Q5: What is the MLflow Model Registry staging workflow, and how do webhooks integrate with CI/CD systems?

Answer: The MLflow Model Registry has four stages: None (just registered), Staging, Production, and Archived. The typical workflow: a training pipeline registers a new version (stage=None), then automatically promotes it to Staging after passing quality gates. A CI/CD webhook fires when the version reaches Staging. The webhook calls an endpoint (GitHub Actions, Jenkins, Azure DevOps) that runs integration tests, loads the model, runs it against a held-out validation set, and checks the results against production model metrics. If tests pass, the pipeline promotes to Production with archive_existing_versions=True to retire the previous version. On Databricks, registry webhooks are configured through the Registry Webhooks REST API or the databricks-registry-webhooks Python client, for example RegistryWebhooksClient().create_webhook(events=["MODEL_VERSION_TRANSITIONED_STAGE"], ...). Webhooks can also trigger Databricks Jobs directly, enabling a fully in-platform promotion workflow without external CI/CD systems. Note that recent MLflow releases deprecate stages in favor of aliases, and models registered in Unity Catalog use aliases rather than stages, so this workflow applies to the workspace model registry.
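The comparison step inside such a CI/CD job can be reduced to a small gate function. The sketch below is an assumption about how a team might encode its gates: the function name, the `auc` metric key, and both thresholds are hypothetical and would be replaced by whatever the validation run actually produces.

```python
def should_promote(candidate, production=None, min_auc=0.75, max_regression=0.01):
    """Decide whether a Staging version may move to Production.

    candidate / production: dicts of validation metrics, e.g. {"auc": 0.84}
    min_auc and max_regression are illustrative thresholds (assumptions).
    """
    # Gate 1: absolute quality floor
    if candidate["auc"] < min_auc:
        return False
    # Gate 2: no meaningful regression against the current Production model
    if production is not None and candidate["auc"] < production["auc"] - max_regression:
        return False
    return True


# Hypothetical validation results
better = should_promote({"auc": 0.84}, {"auc": 0.82})
worse = should_promote({"auc": 0.80}, {"auc": 0.82})
first_model_too_weak = should_promote({"auc": 0.70})
```

Keeping the gate as a pure function makes it easy to unit test in the CI/CD pipeline, separately from the code that loads models and talks to the registry.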

Q6: When would you choose Databricks over a dedicated ML platform like Vertex AI or SageMaker?

Answer: Choose Databricks when (1) your data engineering team is already on Databricks and your training data is in Delta Lake - the platform affinity eliminates the ETL layer between data and ML. (2) Your features require Spark-scale computation - 100M+ row aggregations, complex window functions, graph features - that would be prohibitively slow in pandas or a single-node environment. (3) You need a unified governance model across data and ML assets - Unity Catalog provides lineage from raw event to feature to model to prediction in a single platform. (4) You are doing LLM fine-tuning at scale - Databricks has strong integrations with Hugging Face and DeepSpeed, and GPU clusters that autoscale. Choose Vertex AI or SageMaker when you need tighter integration with other cloud-native services (IAM, managed databases, serverless functions), when your data pipeline is already in BigQuery/Redshift and moving it to Delta would be disruptive, or when your team's expertise is in the specific cloud ecosystem rather than Spark.

© 2026 EngineersOfAI. All rights reserved.