

Model Registry and Versioning

The 3-Minute Rollback

At 11:47 PM, the on-call engineer's phone rang. The recommendation model was returning zero results for 12% of users. Downstream services were falling back to popularity-based recommendations - degraded but functional. The root cause was immediately obvious: the new model version, deployed two hours earlier, had a preprocessing bug that returned empty tensors for a specific edge case in user history encoding.

With the old system, fixing this would have taken 45 minutes minimum: find the previous model artifact on S3, figure out which deployment config it used, update the serving configuration, restart the serving containers, wait for health checks. The on-call engineer would be manually hunting through S3 bucket paths and Slack messages at midnight.

With the model registry, it took 3 minutes:

# Production model registry rollback - entire process
import mlflow

client = mlflow.MlflowClient()

# Step 1: See what's currently in production
current = client.get_latest_versions("recommendation-model", stages=["Production"])[0]
print(f"Current version: {current.version} (deployed {current.last_updated_timestamp})")

# Step 2: Find the previous production version.
# MLflow returns version numbers as strings, so compare them as integers
# ("9" < "10" is False for strings). This picks the highest-numbered earlier
# version and assumes it was the one serving production before.
all_versions = client.search_model_versions("name='recommendation-model'")
previous = max(
    (v for v in all_versions if int(v.version) < int(current.version)),
    key=lambda v: int(v.version),
)

# Step 3: Rollback - archive current, promote previous
client.transition_model_version_stage(
    name="recommendation-model",
    version=current.version,
    stage="Archived",
    archive_existing_versions=False,
)
client.transition_model_version_stage(
    name="recommendation-model",
    version=previous.version,
    stage="Production",
)

print(f"Rolled back to version {previous.version}")

Step 4: The serving platform - configured to poll the model registry every 60 seconds and hot-reload production models - detected the version change and loaded the previous model without a container restart. Time from alert to resolution: 3 minutes 12 seconds.

This is the operational value of a properly designed model registry. Not just a versioned artifact store - a lifecycle management system with defined semantics, automated integration, and reliable rollback.


Why Model Registries Exist

Before model registries, ML deployment was a distributed coordination problem. The training team would produce a model artifact (a .pkl file, a PyTorch checkpoint, a TensorFlow SavedModel). They would upload it to an S3 path. They would Slack the infrastructure team with the path. The infrastructure team would update a config file. A deployment would happen. If it broke, reversing the process meant another Slack message, another config update, another deployment.

This process had six failure modes:

  1. S3 path not communicated correctly - wrong model deployed
  2. Model artifact uploaded without documentation - impossible to reproduce
  3. No staging environment - production is the test environment
  4. No lineage - can't trace which training data produced this model
  5. No history - rollback requires reconstructing what the previous model was
  6. No governance - anyone can deploy anything at any time

A model registry centralizes all of this into a single, auditable system with defined lifecycle states, explicit approval workflows, and programmatic integration with both training and serving systems.


The Registry Lifecycle Model

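The standard model lifecycle has four stages: None, Staging, Production, and Archived. MLflow 2.9 and later mark the stage-based APIs as deprecated in favor of model version aliases and tags, but the lifecycle semantics below apply either way.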

Stage Semantics

None: The model has been registered from a training run but has not been validated. This is the output state of every training pipeline.

Staging: The model has passed offline validation (accuracy above threshold, no performance regression) and is deployed for evaluation. In staging, the model may receive shadow traffic (requests routed to it in parallel with production, results not used) or A/B test traffic.

Production: The model is the current live version. Only one version can be in Production at a time (by convention; MLflow does allow multiple). Production models are subject to SLOs - latency, availability, and accuracy monitoring.

Archived: The model is no longer serving traffic but its artifact and metadata are retained permanently. Archived models must be retrievable for regulatory compliance, audit, and rollback purposes.
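The stage semantics above imply a small set of legal transitions. A minimal sketch of checking them before calling the registry might look like the following. The `ALLOWED_TRANSITIONS` table and both helper functions are hypothetical; the specific rules (for example, forbidding None → Production) are an illustrative policy, not something MLflow enforces.

```python
# Hypothetical promotion policy; MLflow itself permits any stage transition.
ALLOWED_TRANSITIONS = {
    "None": {"Staging", "Archived"},
    "Staging": {"Production", "Archived"},
    "Production": {"Archived"},
    "Archived": {"Production"},  # rollback path
}


def can_transition(current_stage: str, target_stage: str) -> bool:
    """Return True if the policy allows moving between the two stages."""
    return target_stage in ALLOWED_TRANSITIONS.get(current_stage, set())


def check_transition(current_stage: str, target_stage: str) -> None:
    """Raise ValueError for transitions the policy does not allow."""
    if not can_transition(current_stage, target_stage):
        raise ValueError(
            f"Transition {current_stage} -> {target_stage} is not allowed"
        )
```

Calling `check_transition(model_version.current_stage, "Production")` before a promotion would reject an unvalidated model while still allowing an archived version to be restored during a rollback.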


Implementing the Model Registry

Model Registration with Full Lineage

import json
from datetime import datetime, timezone

import mlflow
from mlflow.tracking import MlflowClient


class ModelRegistrar:
    """
    Handles model registration with complete lineage and metadata.
    Called at the end of every training pipeline.
    """

    def __init__(self, tracking_uri: str):
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()

    def register_from_run(
        self,
        run_id: str,
        model_name: str,
        artifact_path: str,
        validation_results: dict,
        description: str = "",
    ) -> mlflow.entities.model_registry.ModelVersion:
        """
        Register a model from a completed training run.
        Captures full lineage: run → data → code → model version.
        """
        # Get run metadata for lineage
        run = self.client.get_run(run_id)
        run_tags = run.data.tags

        # Construct model URI
        model_uri = f"runs:/{run_id}/{artifact_path}"

        # Register the model
        mv = mlflow.register_model(model_uri, model_name)

        # Set rich metadata on the model version
        lineage = {
            "run_id": run_id,
            "git_commit": run_tags.get("git_commit", "unknown"),
            "git_branch": run_tags.get("git_branch", "unknown"),
            "dataset_hash": run_tags.get("dataset_hash", "unknown"),
            "dataset_path": run_tags.get("dataset_path", "unknown"),
            "training_start": run.info.start_time,
            "training_end": run.info.end_time,
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            "registered_at": datetime.now(timezone.utc).isoformat(),
            "registered_by": run_tags.get("owner", "unknown"),
        }

        # Training performance metrics, recorded alongside the lineage
        performance = dict(run.data.metrics)

        self.client.update_model_version(
            name=model_name,
            version=mv.version,
            description=(
                f"{description}\n\n"
                f"Lineage: {json.dumps(lineage, indent=2)}\n\n"
                f"Training metrics: {json.dumps(performance, indent=2)}"
            ),
        )

        # Tag the version with searchable metadata
        searchable_tags = {
            "validation_status": "pending",
            "validation_results": json.dumps(validation_results),
            "git_commit": lineage["git_commit"],
            "dataset_hash": lineage["dataset_hash"],
        }
        for key, value in searchable_tags.items():
            self.client.set_model_version_tag(
                name=model_name,
                version=mv.version,
                key=key,
                value=value,
            )

        return mv

    def promote_to_staging(
        self,
        model_name: str,
        version: int,
        approval_notes: str = "",
    ) -> None:
        """Promote a model to Staging after offline validation passes."""
        self.client.transition_model_version_stage(
            name=model_name,
            version=version,
            stage="Staging",
        )
        self.client.set_model_version_tag(
            name=model_name,
            version=version,
            key="validation_status",
            value="approved_for_staging",
        )
        self.client.set_model_version_tag(
            name=model_name,
            version=version,
            key="staging_approval_notes",
            value=approval_notes,
        )

    def promote_to_production(
        self,
        model_name: str,
        version: int,
        ab_test_results: dict,
    ) -> None:
        """Promote a model to Production after A/B test passes."""
        # Promote the new version and archive the current production version
        # in the same call, so the Production stage is never left empty
        # between two separate transitions.
        self.client.transition_model_version_stage(
            name=model_name,
            version=version,
            stage="Production",
            archive_existing_versions=True,
        )
        self.client.set_model_version_tag(
            name=model_name,
            version=version,
            key="ab_test_results",
            value=json.dumps(ab_test_results),
        )
        self.client.set_model_version_tag(
            name=model_name,
            version=version,
            key="production_deployed_at",
            value=datetime.now(timezone.utc).isoformat(),
        )
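The `production_deployed_at` tag written by `promote_to_production` can also make rollback target selection more precise than "highest version number below the current one", which may pick a version that never served traffic. The sketch below assumes a hypothetical `VersionRecord` holding the fields you would read from registry model versions; it is not an MLflow type.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class VersionRecord:
    """Hypothetical stand-in for fields read from a registry model version."""
    version: str
    stage: str
    tags: dict = field(default_factory=dict)


def pick_rollback_target(versions: list[VersionRecord]) -> Optional[VersionRecord]:
    """Return the archived version most recently deployed to production."""
    candidates = [
        v for v in versions
        if v.stage == "Archived" and "production_deployed_at" in v.tags
    ]
    if not candidates:
        return None
    # Assumes all timestamps are ISO-8601 UTC strings in one format,
    # which sort chronologically when compared as strings.
    return max(candidates, key=lambda v: v.tags["production_deployed_at"])
```

Versions that were registered but never promoted carry no `production_deployed_at` tag, so they are skipped even if their version number is higher.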

Serving Platform Integration

The serving platform should poll the model registry to load the current production model - no manual deployment step required:

import threading
import time
from typing import Optional

import mlflow
import mlflow.pyfunc


class RegistryAwareModelServer:
    """
    Model server that continuously polls the registry for new production models.
    Hot-reloads new models without restarting the serving process.
    """

    def __init__(
        self,
        model_name: str,
        tracking_uri: str,
        poll_interval_seconds: int = 60,
    ):
        mlflow.set_tracking_uri(tracking_uri)
        self.model_name = model_name
        self.poll_interval = poll_interval_seconds
        self.client = mlflow.MlflowClient()

        self._current_model = None
        self._current_version: Optional[str] = None
        self._model_lock = threading.Lock()

        # Load initial model
        self._load_production_model()

        # Start background polling thread
        self._polling_thread = threading.Thread(
            target=self._poll_registry,
            daemon=True,
        )
        self._polling_thread.start()

    def _load_production_model(self) -> None:
        """Load the current production model from registry."""
        versions = self.client.get_latest_versions(
            self.model_name, stages=["Production"]
        )

        if not versions:
            raise RuntimeError(f"No production model found for '{self.model_name}'")

        latest = versions[0]

        if latest.version == self._current_version:
            return  # Already on the current version

        # Load by explicit version rather than by stage, so the loaded model
        # always matches the version recorded below, even if the Production
        # stage changes between the two registry calls.
        model_uri = f"models:/{self.model_name}/{latest.version}"
        new_model = mlflow.pyfunc.load_model(model_uri)

        # Swap model and version together so predict() never sees a mismatch
        with self._model_lock:
            self._current_model = new_model
            self._current_version = latest.version

        print(f"Loaded model version {latest.version}")

    def _poll_registry(self) -> None:
        """Background thread: poll registry for model version changes."""
        while True:
            time.sleep(self.poll_interval)
            try:
                self._load_production_model()
            except Exception as e:
                print(f"Registry poll failed: {e}")
                # Continue serving with current model - don't crash

    def predict(self, inputs) -> dict:
        """Thread-safe prediction using current production model."""
        # Hold the lock only long enough to grab consistent references
        with self._model_lock:
            model = self._current_model
            version = self._current_version

        predictions = model.predict(inputs)
        return {
            "predictions": predictions,
            "model_version": version,  # Include in response for debugging
        }
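Hot-reloading also means a bad model can be swapped in automatically, as in the empty-tensor incident described earlier. One possible safeguard is a smoke test that runs before the swap. The helper below is a hypothetical sketch: `passes_smoke_test` and the idea of keeping a small set of known edge-case inputs are assumptions, not part of MLflow.

```python
from typing import Any, Callable, Sequence


def passes_smoke_test(
    predict: Callable[[Any], Any],
    sample_inputs: Sequence[Any],
) -> bool:
    """Return True only if every sample input yields a non-empty prediction."""
    for sample in sample_inputs:
        try:
            output = predict(sample)
        except Exception:
            return False  # any exception disqualifies the candidate model
        if output is None or len(output) == 0:
            return False  # empty output is the failure mode we want to catch
    return True
```

Inside `_load_production_model`, the check could run as `passes_smoke_test(new_model.predict, samples)` before the lock is taken, keeping the current model in place when it returns False.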

Model Versioning Strategies

Strategy 1: Semantic Versioning for Models

MAJOR.MINOR.PATCH
  │     │     └── Artifact-only change (same architecture, retraining)
  │     └──────── Architecture change (different hidden dim, etc.)
  └────────────── API change (different input/output schema)

from dataclasses import dataclass
from enum import Enum


class VersionBumpType(Enum):
    PATCH = "patch"  # same architecture, retrained on new data
    MINOR = "minor"  # architecture change, compatible interface
    MAJOR = "major"  # interface change, requires serving update


@dataclass
class ModelVersion:
    major: int
    minor: int
    patch: int

    def bump(self, bump_type: VersionBumpType) -> "ModelVersion":
        if bump_type == VersionBumpType.MAJOR:
            return ModelVersion(self.major + 1, 0, 0)
        elif bump_type == VersionBumpType.MINOR:
            return ModelVersion(self.major, self.minor + 1, 0)
        else:
            return ModelVersion(self.major, self.minor, self.patch + 1)

    def __str__(self) -> str:
        return f"v{self.major}.{self.minor}.{self.patch}"
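Choosing the bump type can be automated by comparing descriptors of the old and new model. The following is a minimal sketch, assuming each model is described by a hypothetical dict with `input_schema`, `output_schema`, and `architecture` keys; how those descriptors are produced is left to the training pipeline.

```python
def classify_bump(old: dict, new: dict) -> str:
    """Return "major", "minor", or "patch" for a change between two models.

    Each descriptor is a hypothetical dict with "input_schema",
    "output_schema", and "architecture" keys.
    """
    old_interface = (old["input_schema"], old["output_schema"])
    new_interface = (new["input_schema"], new["output_schema"])
    if old_interface != new_interface:
        return "major"  # interface change, requires serving update
    if old["architecture"] != new["architecture"]:
        return "minor"  # architecture change, compatible interface
    return "patch"      # same architecture, retrained weights
```

The returned string matches the values of `VersionBumpType`, so `VersionBumpType(classify_bump(old, new))` can be passed straight to `bump`.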

Artifact Storage Layout

s3://ml-model-artifacts/
└── {model_name}/
    └── {version}/
        ├── model.pt                 # model weights
        ├── config.json              # model configuration
        ├── tokenizer/               # preprocessing artifacts
        ├── training_run.json        # link back to MLflow run
        └── validation_report.json   # test set performance

import json
import tempfile
from pathlib import Path

import boto3
import torch


def save_model_artifacts(
    model: torch.nn.Module,
    config: dict,
    tokenizer,
    validation_results: dict,
    s3_path: str,
    run_id: str,
) -> str:
    """
    Save complete model artifact bundle to S3.
    Returns the S3 URI for MLflow registration.
    """
    s3 = boto3.client("s3")
    bucket, _, prefix = s3_path.removeprefix("s3://").partition("/")
    prefix = prefix.strip("/")

    # Stage files in a fresh temporary directory so leftovers from an
    # earlier run are never uploaded alongside this version.
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_path = Path(tmp_dir)

        # Save model weights and metadata
        torch.save(model.state_dict(), local_path / "model.pt")
        (local_path / "config.json").write_text(json.dumps(config, indent=2))
        (local_path / "training_run.json").write_text(json.dumps({"run_id": run_id}))
        (local_path / "validation_report.json").write_text(
            json.dumps(validation_results, indent=2)
        )
        tokenizer.save_pretrained(str(local_path / "tokenizer"))

        # Upload all artifacts, preserving the relative directory layout
        for local_file in local_path.rglob("*"):
            if local_file.is_file():
                relative = local_file.relative_to(local_path).as_posix()
                s3_key = f"{prefix}/{relative}" if prefix else relative
                s3.upload_file(str(local_file), bucket, s3_key)

    return s3_path
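The `{model_name}/{version}/` layout is easier to keep consistent when the prefix is built by one helper rather than by hand. This is a small sketch under that assumption; `split_s3_uri` and `artifact_uri` are hypothetical helper names, and the version string is whatever scheme you adopt (for example the semantic version above).

```python
def split_s3_uri(uri: str) -> tuple[str, str]:
    """Split "s3://bucket/some/prefix" into (bucket, prefix)."""
    if not uri.startswith("s3://"):
        raise ValueError(f"Not an S3 URI: {uri!r}")
    bucket, _, prefix = uri[len("s3://"):].partition("/")
    if not bucket:
        raise ValueError(f"Missing bucket in {uri!r}")
    return bucket, prefix.strip("/")


def artifact_uri(base_uri: str, model_name: str, version: str) -> str:
    """Build the per-version prefix following the {model_name}/{version}/ layout."""
    return f"{base_uri.rstrip('/')}/{model_name}/{version}"
```

The result of `artifact_uri(...)` can be passed as `s3_path` to `save_model_artifacts`.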

Champion/Challenger Management

The champion is the current production model. Challengers are candidates that may replace it. The registry tracks both:

from typing import Optional

import mlflow


class ChampionChallengerManager:
    """
    Manage champion/challenger A/B testing through model registry.
    """

    def __init__(self, client: mlflow.MlflowClient, model_name: str):
        self.client = client
        self.model_name = model_name

    def get_champion(self) -> Optional[mlflow.entities.model_registry.ModelVersion]:
        """Return current production (champion) model."""
        versions = self.client.get_latest_versions(
            self.model_name, stages=["Production"]
        )
        return versions[0] if versions else None

    def get_challenger(self) -> Optional[mlflow.entities.model_registry.ModelVersion]:
        """Return current staging (challenger) model."""
        versions = self.client.get_latest_versions(
            self.model_name, stages=["Staging"]
        )
        return versions[0] if versions else None

    def run_ab_test_config(
        self,
        challenger_traffic_pct: float = 5.0,
    ) -> dict:
        """
        Return traffic routing config for champion/challenger A/B test.
        Serving platform reads this to split traffic.
        """
        if not 0.0 < challenger_traffic_pct < 100.0:
            raise ValueError("challenger_traffic_pct must be between 0 and 100")

        champion = self.get_champion()
        challenger = self.get_challenger()

        if not champion:
            raise RuntimeError("No production model to test against")
        if not challenger:
            raise RuntimeError("No staging model to test")

        return {
            "routes": [
                {
                    "model_version": champion.version,
                    "stage": "Production",
                    "traffic_pct": 100.0 - challenger_traffic_pct,
                    "variant": "control",
                },
                {
                    "model_version": challenger.version,
                    "stage": "Staging",
                    "traffic_pct": challenger_traffic_pct,
                    "variant": "treatment",
                },
            ],
            "ab_test_id": f"champion-{champion.version}-vs-challenger-{challenger.version}",
        }
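On the serving side, the `routes` list has to be turned into a per-request decision. A minimal sketch of one way to do that is shown below, using a hash of the user ID so the same user always lands on the same variant. The function name `assign_variant` and the use of `ab_test_id` as a salt are assumptions for illustration.

```python
import hashlib


def assign_variant(user_id: str, routes: list[dict], salt: str = "") -> dict:
    """Deterministically map a user to one route, proportional to traffic_pct."""
    digest = hashlib.sha256(f"{salt}:{user_id}".encode("utf-8")).digest()
    # Map the first 8 bytes of the hash onto the interval [0, 100).
    bucket = int.from_bytes(digest[:8], "big") / 2**64 * 100.0
    cumulative = 0.0
    for route in routes:
        cumulative += route["traffic_pct"]
        if bucket < cumulative:
            return route
    return routes[-1]  # guards against floating-point rounding at the boundary
```

Passing the `ab_test_id` as `salt` reshuffles users between experiments, so the same 5% of users are not always the ones exposed to challengers.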

Production Engineering Notes

Model Lineage Graph

Lineage is the complete causal chain: training data → code → training run → model version → deployment. Query it programmatically:

import mlflow


def get_model_lineage(
    client: mlflow.MlflowClient,
    model_name: str,
    version: int,
) -> dict:
    """Build complete lineage for a model version."""
    model_version = client.get_model_version(model_name, version)
    # Assumes the version was registered from a run, so run_id is set
    run = client.get_run(model_version.run_id)

    return {
        "model": {
            "name": model_name,
            "version": version,
            "stage": model_version.current_stage,
        },
        "training_run": {
            "run_id": model_version.run_id,
            "experiment": client.get_experiment(run.info.experiment_id).name,
            "metrics": dict(run.data.metrics),
            "params": dict(run.data.params),
        },
        "code": {
            "git_commit": run.data.tags.get("git_commit"),
            "git_branch": run.data.tags.get("git_branch"),
        },
        "data": {
            "dataset_hash": run.data.tags.get("dataset_hash"),
            "dataset_path": run.data.tags.get("dataset_path"),
        },
        "environment": {
            "python_version": run.data.tags.get("python_version"),
            "torch_version": run.data.tags.get("torch_version"),
        },
    }
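During an incident, the useful question is usually "what changed between the last good version and this one?". A minimal sketch of answering it by diffing two lineage dicts of the shape returned by `get_model_lineage` follows; `diff_lineage` is a hypothetical helper, not part of MLflow.

```python
def diff_lineage(old: dict, new: dict) -> dict:
    """Return {section: {key: (old_value, new_value)}} for fields that differ."""
    changes = {}
    for section in ("code", "data", "environment"):
        old_section = old.get(section, {})
        new_section = new.get(section, {})
        changed = {
            key: (old_section.get(key), new_section.get(key))
            for key in sorted(set(old_section) | set(new_section))
            if old_section.get(key) != new_section.get(key)
        }
        if changed:
            changes[section] = changed
    return changes
```

If only the `data` section shows a difference, the regression most likely came from the dataset; if only `code` differs, the preprocessing or training code is the first place to look.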

Common Mistakes

:::danger Not archiving old production models
If you don't archive old production models - just overwrite the Production stage - you lose rollback capability. Always use the archive lifecycle state. Keep all model versions indefinitely (storage cost is minimal vs the value of a 3-minute rollback in an incident).
:::

:::warning Using S3 paths directly instead of registry URIs
A hard-coded path such as s3://models/recommendation/v2.3.pkl works until someone moves the file, accidentally overwrites it, or renames the bucket. Model registry URIs (models:/recommendation-model/Production) are resolved by the registry at load time, so consumers never hard-code a storage location and a change of artifact location only has to be recorded in one place.
:::

:::danger Registering models without validation results
A model in the registry without documented validation results is a trap. When it gets promoted to production (perhaps by someone unfamiliar with the model), there's no record of what quality bar it met. Require validation results as metadata before allowing staging promotion.
:::
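One way to enforce that last rule is a gate that inspects the `validation_results` tag before any staging promotion. The sketch below assumes the tag holds a JSON object of metric names to values, as written by `register_from_run`; the `REQUIRED_METRICS` thresholds and the function name are hypothetical.

```python
import json

REQUIRED_METRICS = {"accuracy": 0.90}  # hypothetical minimum thresholds


def validation_gate(tags: dict, required: dict = REQUIRED_METRICS) -> list[str]:
    """Return the reasons a version may not be promoted (empty list = OK)."""
    raw = tags.get("validation_results")
    if not raw:
        return ["missing validation_results tag"]
    try:
        results = json.loads(raw)
    except json.JSONDecodeError:
        return ["validation_results tag is not valid JSON"]
    if not isinstance(results, dict):
        return ["validation_results tag is not a JSON object"]

    problems = []
    for metric, minimum in required.items():
        value = results.get(metric)
        if value is None:
            problems.append(f"{metric} not reported")
        elif value < minimum:
            problems.append(f"{metric}={value} below required {minimum}")
    return problems
```

A promotion wrapper could call `validation_gate(model_version.tags)` and refuse to transition the version when the returned list is non-empty.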


Interview Q&A

Q: What is model lineage and why does it matter for production systems?

A: Model lineage is the complete audit trail connecting a production model to its training inputs: which training data (hash/version), which code (git commit), which training run (hyperparameters, metrics), and which engineer approved the deployment. It matters for three reasons. First, debugging: when a production model fails, lineage lets you immediately identify whether the failure is due to a data issue, code bug, or model architecture change. Second, reproducibility: to retrain or improve a model, you need to know exactly what went into it. Third, compliance: regulated industries (finance, healthcare) require being able to answer "what data trained this model?" for every production model at any point in time. I store lineage as tags on the model version in MLflow - git commit, dataset hash, training run ID - and build a simple API to query the full chain from model version backward to raw data.

Q: How would you implement a model rollback system?

A: Three requirements. First, every production model version must be archived (not deleted) when a new version is promoted - you can't roll back to something that's been deleted. Second, the serving platform must poll the model registry for the current production version and hot-reload without a container restart - this is what makes rollback fast. Third, a rollback should be a single command: archive the current production version, promote the previous version. The serving platform detects the change within its polling interval (60 seconds) and reloads. With this architecture, rollback is under 5 minutes - faster than any manual intervention. I'd also add a rollback CLI command to the platform tooling that does the two-step operation atomically, with confirmation prompt and Slack notification.

Q: How do you version models that change their input/output schema?

A: Schema changes are the most dangerous type of model change because they break downstream consumers. I use semantic versioning with major version bumps for any interface change. Before deploying a major version, run a compatibility check against all registered consumers of the model. For the transition period, run old and new versions simultaneously - route new consumers to the new version, old consumers to the old version. Only deprecate the old version when all consumers have migrated. The model registry tags should include the input/output schema (as a JSON schema), making it queryable: "what models consume input X?" I also recommend never changing schema in a minor or patch version - even if the change is additive (new optional field), it can break consumers that do exact-match validation.
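To make the point about additive changes concrete, here is a small sketch of two hypothetical consumers validating a model response: one does exact-match validation, the other tolerates extra fields. The field names are illustrative only.

```python
def exact_match_valid(payload: dict, expected_fields: set[str]) -> bool:
    """Strict consumer: accepts only payloads with exactly the expected fields."""
    return set(payload) == expected_fields


def tolerant_valid(payload: dict, required_fields: set[str]) -> bool:
    """Tolerant consumer: accepts extra fields as long as required ones exist."""
    return required_fields <= set(payload)


v1_response = {"items": [1, 2, 3]}
v2_response = {"items": [1, 2, 3], "scores": [0.9, 0.7, 0.4]}  # additive change

strict_ok = exact_match_valid(v2_response, {"items"})   # strict consumer breaks
tolerant_ok = tolerant_valid(v2_response, {"items"})    # tolerant consumer is fine
```

Because you rarely know which kind of validation every consumer performs, treating even an additive field as a major version bump is the conservative choice.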

Q: What's the difference between staging and a shadow deployment?

A: Both let you test a new model without affecting production traffic, but they serve different purposes. Staging: the model is deployed and receives a small percentage of real traffic (5–10%), its predictions are used, and users see the results - it's a real A/B test. Shadow deployment: the model receives a copy of production traffic, computes predictions, but those predictions are discarded - users never see the results. Shadow deployment is useful for: validating new model infrastructure before serving real traffic, testing performance and latency under real load, catching prediction errors that would affect users (like NaN outputs or schema violations). Staging is useful for: measuring actual business impact via A/B test, building statistical confidence before full rollout. I typically use shadow deployment for 24–48 hours to validate no critical errors, then transition to staging for the A/B test.
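The shadow pattern can be sketched in a few lines. This version assumes both models are plain callables returning comparable Python values and runs the shadow model synchronously for simplicity; a real deployment would more likely run it asynchronously so it cannot add latency. The function and counter names are hypothetical.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("shadow")


def predict_with_shadow(
    champion: Callable[[Any], Any],
    shadow: Callable[[Any], Any],
    inputs: Any,
    stats: dict,
) -> Any:
    """Serve the champion's prediction; run the shadow model for comparison only."""
    result = champion(inputs)
    stats["requests"] = stats.get("requests", 0) + 1
    try:
        shadow_result = shadow(inputs)
        if shadow_result != result:
            stats["mismatches"] = stats.get("mismatches", 0) + 1
    except Exception:
        # A failing shadow model must never affect the user-facing response.
        stats["shadow_errors"] = stats.get("shadow_errors", 0) + 1
        logger.exception("shadow model failed")
    return result  # the shadow output is discarded
```

The `stats` counters are what you review after the 24-48 hour shadow period: a non-zero `shadow_errors` count is a reason not to move on to the A/B test.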

Q: How do you handle multiple models serving the same endpoint?

A: This is the champion/challenger pattern. The model registry holds multiple versions simultaneously: one in Production (champion), one in Staging (challenger). The serving platform reads a routing config - specified as metadata in the registry or in a separate routing table - that says "send X% of traffic to champion, Y% to challenger." The key requirements: requests must be randomly and consistently assigned to variants (use a hash of user ID or request ID, not true randomness, to avoid the same user switching between models mid-session), both variants must log predictions to the same schema (for direct comparison), and the A/B test framework must compute statistical significance before declaring a winner. I prefer to keep routing config in the registry itself (as a JSON tag on the model name object, not the version), making it easy to change traffic splits without redeploying.
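For the statistical significance step, one common choice for conversion-style metrics is a two-proportion z-test. The sketch below is one possible implementation using only the standard library; the metric (successes out of total requests per variant) and the function name are assumptions, and other tests may suit other metrics better.

```python
import math


def two_proportion_z_test(
    control_successes: int,
    control_total: int,
    treatment_successes: int,
    treatment_total: int,
) -> tuple[float, float]:
    """Return (z, two-sided p-value) for the difference in success rates."""
    p_control = control_successes / control_total
    p_treatment = treatment_successes / treatment_total
    pooled = (control_successes + treatment_successes) / (
        control_total + treatment_total
    )
    std_err = math.sqrt(
        pooled * (1 - pooled) * (1 / control_total + 1 / treatment_total)
    )
    if std_err == 0:
        return 0.0, 1.0  # no variation at all, nothing to distinguish
    z = (p_treatment - p_control) / std_err
    # Two-sided p-value from the standard normal distribution.
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return z, p_value
```

A challenger would only be declared the winner when the p-value falls below the threshold agreed before the test started and the effect is in the desired direction (positive `z` for a metric where higher is better).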

© 2026 EngineersOfAI. All rights reserved.