Skip to main content

Automated Retraining Pipelines

The Fraud Model That Could Not Keep Up

The fraud detection team at a payments company had a problem that grew slowly and then all at once. Their model was trained once per month, a deliberate schedule set when model training took 6 hours and required a data scientist to babysit it. Over 18 months, three things changed: training time dropped to 45 minutes with better hardware, fraudsters started adapting their patterns faster (sometimes within days of the model update), and transaction volume grew 3x, meaning more training data was available than ever.

The symptoms were clear in hindsight: model performance metrics in production would be strong for the first two weeks after retraining, then drift steadily downward through weeks three and four. By the end of the month, the model was barely better than a simple threshold rule. But no one was watching closely enough to see the decay - they were watching monthly reports that averaged over the whole cycle.

When a new engineer proposed retraining every 48 hours instead of monthly, the team's first reaction was: "That's impossible, we can't have someone monitoring training twice a day." The second reaction, after a week of thinking: "Why does someone need to monitor it?" The answer was that nobody had ever designed the pipeline to run unattended. The retraining script assumed a human was watching, had no automated validation, and had no automated deployment. It was a training script, not a pipeline.

This lesson builds the pipeline that turned that 48-hour manual process into a fully automated system with automated validation, human approval only when something unexpected happens, and complete auditability.

:::tip 🎮 Interactive Playground Visualize this concept: Try the CI/CD Pipeline for ML demo on the EngineersOfAI Playground - no code required. :::

Why This Exists

The history of ML retraining automation mirrors the history of software deployment automation. In the early days of web software, deployments were manual - an engineer would SSH in and run a deploy script. This worked when deployments were rare. As deployment frequency increased, manual processes became bottlenecks. The DevOps movement responded with continuous deployment: automated pipelines that deploy code on every merge to main.

ML retraining automation followed the same arc, with a 5-10 year lag. The insight from teams that figured it out first: retraining should be a pipeline event, not a human task. The human should only be involved when something unusual happens - validation fails, quality regresses, an anomaly appears in the data. Routine retraining should be as unattended as routine software deployment.

The challenge is that ML retraining has more failure modes than software deployment, which is why automated validation and gating are prerequisites for automated deployment.

Retraining Trigger Types

For a fraud detection model, the recommended trigger strategy combines time-based and data-based triggers: retrain every 48 hours AND whenever a significant distribution shift is detected in transaction features. This provides predictability while remaining responsive to sudden changes.

The Complete Automated Retraining Pipeline

# src/retraining/pipeline.py
"""
Automated retraining pipeline for fraud detection model.
Designed to run unattended via scheduler. Raises exceptions with
detailed context for alerting when intervention is needed.
"""

import json
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
import boto3
import mlflow
from mlflow.tracking import MlflowClient

logger = logging.getLogger(__name__)


class RetrainingPipeline:
"""
Fully automated model retraining pipeline.

Stages:
1. Data ingestion (rolling 30-day window)
2. Data validation
3. Training
4. Evaluation + gating
5. Registration (if gate passes)
6. Deployment (if auto-deploy enabled, or await approval)
"""

def __init__(self, config: dict):
self.config = config
self.client = MlflowClient(tracking_uri=config["mlflow_tracking_uri"])
self.s3 = boto3.client("s3")
self.run_id: Optional[str] = None
self.run_timestamp = datetime.utcnow()

def run(self) -> dict:
"""Execute the full retraining pipeline. Returns summary dict."""
logger.info(f"Starting retraining pipeline at {self.run_timestamp.isoformat()}")

try:
# Stage 1: Ingest training data
training_data_path = self._ingest_training_data()

# Stage 2: Validate data
self._validate_data(training_data_path)

# Stage 3: Train
model_path, train_metrics = self._train_model(training_data_path)

# Stage 4: Evaluate + gate
eval_results = self._evaluate_and_gate(model_path)

# Stage 5: Register
model_version = self._register_model(model_version_metadata={
"trained_at": self.run_timestamp.isoformat(),
"train_data_window": self.config["training_window_days"],
"mlflow_run_id": self.run_id,
"gate_results": eval_results,
})

# Stage 6: Deploy (auto or await approval)
deployment_result = self._deploy_or_await_approval(model_version)

summary = {
"status": "success",
"model_version": model_version,
"metrics": eval_results,
"deployment": deployment_result,
"run_id": self.run_id,
}
logger.info(f"Retraining pipeline complete: {summary}")
return summary

except DataValidationError as e:
# Data is bad - alert data engineering team
self._alert(
severity="high",
message=f"Data validation failed: {e}",
channel="#data-quality",
team="data-engineering",
)
raise

except GateFailureError as e:
# Model quality regressed - alert ML team
self._alert(
severity="medium",
message=f"Model gate failed during automated retraining: {e}",
channel="#ml-alerts",
team="ml-platform",
)
raise

except Exception as e:
# Unexpected error - alert on-call
self._alert(
severity="critical",
message=f"Unexpected error in retraining pipeline: {type(e).__name__}: {e}",
channel="#ml-oncall",
team="oncall",
)
raise

def _ingest_training_data(self) -> str:
"""
Fetch training data for the rolling window.
Uses a rolling window (last N days) rather than expanding window
to avoid the model being dominated by old patterns.
"""
window_days = self.config["training_window_days"]
end_date = self.run_timestamp.date()
start_date = end_date - timedelta(days=window_days)

logger.info(f"Ingesting training data: {start_date} to {end_date}")

# Build S3 path for date-partitioned data
local_data_dir = Path(self.config["local_data_dir"])
local_data_dir.mkdir(parents=True, exist_ok=True)
output_path = local_data_dir / f"train_{start_date}_{end_date}.parquet"

if output_path.exists():
logger.info(f"Training data already exists at {output_path}, reusing")
return str(output_path)

# Fetch from S3 using Athena query or direct partition scan
import pandas as pd
dfs = []
current = start_date
while current <= end_date:
s3_key = (
f"fraud/transactions/year={current.year}/"
f"month={current.month:02d}/day={current.day:02d}/"
"transactions.parquet"
)
try:
obj = self.s3.get_object(
Bucket=self.config["training_data_bucket"],
Key=s3_key
)
df = pd.read_parquet(obj["Body"])
dfs.append(df)
except self.s3.exceptions.NoSuchKey:
logger.warning(f"No data for {current} at s3://{self.config['training_data_bucket']}/{s3_key}")
current += timedelta(days=1)

if not dfs:
raise DataValidationError(
f"No training data found for window {start_date} to {end_date}"
)

training_df = pd.concat(dfs, ignore_index=True)
logger.info(f"Ingested {len(training_df):,} training rows from {len(dfs)} partitions")

training_df.to_parquet(output_path, index=False)
return str(output_path)

def _validate_data(self, data_path: str) -> None:
"""Run data validation - raises DataValidationError on failure."""
import pandas as pd
df = pd.read_parquet(data_path)

# Row count check
min_rows = self.config.get("min_training_rows", 50_000)
if len(df) < min_rows:
raise DataValidationError(
f"Training data has {len(df):,} rows, minimum is {min_rows:,}. "
"Possible data pipeline failure."
)

# Schema check
required_cols = self.config["required_feature_columns"] + ["is_fraud"]
missing = set(required_cols) - set(df.columns)
if missing:
raise DataValidationError(f"Missing required columns: {missing}")

# Fraud rate sanity check
fraud_rate = df["is_fraud"].mean()
min_fraud_rate = self.config.get("min_fraud_rate", 0.001)
max_fraud_rate = self.config.get("max_fraud_rate", 0.10)
if not (min_fraud_rate <= fraud_rate <= max_fraud_rate):
raise DataValidationError(
f"Fraud rate {fraud_rate:.4%} outside expected range "
f"[{min_fraud_rate:.4%}, {max_fraud_rate:.4%}]"
)

logger.info(f"Data validation passed: {len(df):,} rows, fraud_rate={fraud_rate:.4%}")

def _train_model(self, data_path: str) -> tuple[str, dict]:
"""Run training. Returns (model_path, training_metrics)."""
from src.training.train import run_training

output_dir = Path(self.config["model_output_dir"]) / self.run_timestamp.strftime("%Y%m%d_%H%M%S")
output_dir.mkdir(parents=True, exist_ok=True)

with mlflow.start_run(
experiment_id=mlflow.get_experiment_by_name(
self.config["mlflow_experiment_name"]
).experiment_id,
run_name=f"auto-retrain-{self.run_timestamp.strftime('%Y%m%d-%H%M')}",
) as run:
self.run_id = run.info.run_id

# Log retraining metadata
mlflow.set_tags({
"trigger": self.config.get("trigger_type", "scheduled"),
"training_window_days": str(self.config["training_window_days"]),
"automated_retrain": "true",
})

result = run_training(
data_path=data_path,
output_dir=str(output_dir),
config=self.config["training_config"],
mlflow_run_id=self.run_id,
)

return result["model_path"], result["metrics"]

def _evaluate_and_gate(self, model_path: str) -> dict:
"""Evaluate model and run gate checks. Raises GateFailureError on gate failure."""
from src.evaluation.evaluate import evaluate_model
from scripts.check_gate import run_all_gates

# Evaluate on fixed eval set
eval_results = evaluate_model(
model_path=model_path,
eval_data_path=self.config["eval_data_path"],
)

# Fetch baseline
baseline_metrics = self._fetch_production_baseline_metrics()

# Run gates
all_passed, gate_results = run_all_gates(
new_metrics=eval_results,
baseline_metrics=baseline_metrics,
config=self.config["gate_config"],
)

if not all_passed:
failed_gates = [r for r in gate_results if not r.passed]
raise GateFailureError(
f"{len(failed_gates)} gate(s) failed: "
+ "; ".join(r.message for r in failed_gates)
)

return eval_results

def _fetch_production_baseline_metrics(self) -> dict:
"""Fetch evaluation metrics from the current production model."""
try:
versions = self.client.get_latest_versions(
self.config["model_registry_name"],
stages=["Production"]
)
if not versions:
logger.warning("No production model found - skipping regression gate")
return {}

prod_version = versions[0]
run = self.client.get_run(prod_version.run_id)
return dict(run.data.metrics)
except Exception as e:
logger.warning(f"Could not fetch baseline metrics: {e}. Skipping regression gate.")
return {}

def _register_model(self, model_version_metadata: dict) -> str:
"""Register model in MLflow Registry. Returns version number."""
import mlflow.sklearn

model_uri = f"runs:/{self.run_id}/model"

# Register to Staging first
model_version = mlflow.register_model(
model_uri=model_uri,
name=self.config["model_registry_name"],
)

# Add metadata tags
for key, value in model_version_metadata.items():
self.client.set_model_version_tag(
name=self.config["model_registry_name"],
version=model_version.version,
key=key,
value=str(value),
)

logger.info(
f"Registered model version {model_version.version} "
f"in Staging: {self.config['model_registry_name']}"
)
return model_version.version

def _deploy_or_await_approval(self, model_version: str) -> dict:
"""
If auto_deploy is enabled, deploy automatically.
Otherwise, send approval request and await human decision.
"""
if self.config.get("auto_deploy", False):
return self._deploy_to_production(model_version)
else:
return self._send_approval_request(model_version)

def _deploy_to_production(self, model_version: str) -> dict:
"""Promote model version to Production in registry."""
self.client.transition_model_version_stage(
name=self.config["model_registry_name"],
version=model_version,
stage="Production",
archive_existing_versions=True,
)
logger.info(f"Promoted model v{model_version} to Production")
return {"status": "deployed", "version": model_version}

def _send_approval_request(self, model_version: str) -> dict:
"""Send approval request to ML team via Slack/email."""
import requests
registry_url = (
f"{self.config['mlflow_tracking_uri']}/models/"
f"{self.config['model_registry_name']}/versions/{model_version}"
)
message = (
f":robot_face: *Automated Retraining Complete - Approval Needed*\n"
f"Model: `{self.config['model_registry_name']}` v{model_version}\n"
f"Trigger: {self.config.get('trigger_type', 'scheduled')}\n"
f"Training window: last {self.config['training_window_days']} days\n"
f"All evaluation gates: PASSED\n"
f"Review and approve: {registry_url}\n"
f"To approve: run `python scripts/approve_model.py --version {model_version}`"
)
requests.post(
self.config["slack_webhook_url"],
json={"text": message}
)
logger.info(f"Approval request sent for model v{model_version}")
return {"status": "awaiting_approval", "version": model_version}

def _alert(self, severity: str, message: str, channel: str, team: str):
"""Send alert to Slack and/or PagerDuty."""
import requests
emoji = {"critical": ":fire:", "high": ":warning:", "medium": ":yellow_circle:"}.get(severity, ":info:")
slack_message = f"{emoji} *[{severity.upper()}] Retraining Pipeline Alert*\n{message}"

if self.config.get("slack_webhook_url"):
requests.post(
self.config["slack_webhook_url"],
json={"text": slack_message, "channel": channel}
)

if severity == "critical" and self.config.get("pagerduty_routing_key"):
requests.post(
"https://events.pagerduty.com/v2/enqueue",
json={
"routing_key": self.config["pagerduty_routing_key"],
"event_action": "trigger",
"payload": {
"summary": f"ML Retraining Pipeline Critical Failure: {message[:100]}",
"severity": "critical",
"source": "retraining-pipeline",
}
}
)


class DataValidationError(Exception):
pass


class GateFailureError(Exception):
pass

Training Data Management: Rolling vs Expanding Window

The choice between rolling window and expanding window training data has significant implications:

# src/retraining/data_strategy.py

def rolling_window_strategy(
end_date,
window_days: int = 30,
feature_store_client=None,
) -> str:
"""
Rolling window: use only the last N days of data.

Pros:
- Model stays current - not dominated by old patterns
- Consistent training data size (predictable training time)
- Natural forgetting of outdated patterns (good for fraud)

Cons:
- Loses long-term patterns
- Requires enough data in the window for good model performance

Best for: fraud detection, demand forecasting, news ranking
"""
start_date = end_date - timedelta(days=window_days)
return fetch_date_range(start_date, end_date, feature_store_client)


def expanding_window_strategy(
end_date,
min_start_date,
feature_store_client=None,
) -> str:
"""
Expanding window: use all data from min_start_date to end_date.

Pros:
- More data = generally better model
- Captures long-term seasonal patterns

Cons:
- Training time grows unboundedly
- Old data may hurt if distributions shifted

Best for: rare event prediction, long-cycle seasonality
"""
return fetch_date_range(min_start_date, end_date, feature_store_client)


def weighted_window_strategy(
end_date,
window_days: int = 90,
recent_weight_multiplier: float = 3.0,
feature_store_client=None,
) -> str:
"""
Weighted window: include 90 days but oversample recent data.
Compromise between rolling and expanding.

Best for: models where long-term patterns matter but recent
data is more predictive (e.g., credit risk).
"""
import pandas as pd
start_date = end_date - timedelta(days=window_days)
df = fetch_date_range_as_df(start_date, end_date, feature_store_client)

# Oversample last 30 days by multiplier
recent_cutoff = end_date - timedelta(days=30)
recent_mask = pd.to_datetime(df["timestamp"]).dt.date >= recent_cutoff
recent_df = df[recent_mask]
extra_recent = pd.concat(
[recent_df] * (int(recent_weight_multiplier) - 1),
ignore_index=True
)
weighted_df = pd.concat([df, extra_recent], ignore_index=True)
weighted_df = weighted_df.sample(frac=1, random_state=42).reset_index(drop=True)
return save_and_return_path(weighted_df)

Scheduling the Pipeline

# kubernetes/cronjob-retraining.yaml
# Runs the retraining pipeline every 48 hours
apiVersion: batch/v1
kind: CronJob
metadata:
name: fraud-model-retraining
namespace: ml-platform
spec:
schedule: "0 2 */2 * *" # 2 AM every 2 days
timeZone: "UTC"
concurrencyPolicy: Forbid # Don't start new job if previous is still running
successfulJobsHistoryLimit: 5
failedJobsHistoryLimit: 5
jobTemplate:
spec:
backoffLimit: 0 # Don't retry - alert on failure instead
activeDeadlineSeconds: 14400 # 4 hour hard timeout
template:
spec:
restartPolicy: Never
serviceAccountName: ml-training
containers:
- name: retraining
image: gcr.io/myproject/ml-training:latest
command:
- python
- -m
- src.retraining.run
- --config
- /config/retraining.yaml
env:
- name: MLFLOW_TRACKING_URI
valueFrom:
secretKeyRef:
name: mlflow-credentials
key: tracking-uri
- name: SLACK_WEBHOOK_URL
valueFrom:
secretKeyRef:
name: slack-webhooks
key: ml-alerts
resources:
requests:
cpu: "4"
memory: "16Gi"
limits:
cpu: "8"
memory: "32Gi"
volumeMounts:
- name: config
mountPath: /config
volumes:
- name: config
configMap:
name: retraining-config

Handling Pipeline Failures

# src/retraining/failure_handler.py

class RetrainingFailureHandler:
"""
Handles different failure modes with appropriate responses.
The goal: human is involved only when automation cannot recover.
"""

RECOVERABLE_ERRORS = [
"connection timeout",
"temporary resource unavailable",
"rate limit exceeded",
]

def handle(self, error: Exception, pipeline_config: dict, attempt: int = 1) -> str:
"""
Returns: 'retry', 'abort', or 'alert_and_abort'
"""
error_str = str(error).lower()

# Transient errors: retry up to 3 times
if any(recoverable in error_str for recoverable in self.RECOVERABLE_ERRORS):
if attempt < 3:
logger.info(f"Recoverable error on attempt {attempt}, will retry: {error}")
return "retry"
else:
logger.error(f"Exhausted retries for recoverable error: {error}")
return "alert_and_abort"

# Data validation errors: do not retry, alert data team
if isinstance(error, DataValidationError):
return "alert_and_abort"

# Gate failures: do not retry (retraining won't fix quality issues)
if isinstance(error, GateFailureError):
return "alert_and_abort"

# Unknown errors: alert immediately
return "alert_and_abort"

Production Notes

Idempotency: Design the retraining pipeline to be idempotent - running it twice in a row should produce the same result and not create duplicate model versions. Check if a model was already registered for the same data window before starting training.

Training budget: Set a maximum training budget (both time and cost). If training exceeds the budget, abort and alert. An unexpectedly long training run is often a symptom of a problem (training on too much data, infinite gradient descent loop) that will not resolve itself.

Audit trail: Every automated retraining run should be logged with: trigger reason, data window, training parameters, evaluation metrics, gate results, and deployment decision. This trail is essential for debugging production issues and for regulatory compliance.

:::tip Start with Manual Approval When building automated retraining for the first time, start with auto_deploy: false and require manual approval for all automated retraining runs. After running for two months with zero incidents, switch to automatic deployment for routine runs. Build trust in the automation before removing the human checkpoint. :::

:::warning Do Not Retrain on the Same Data Twice Track which data windows have been used for training. If a retraining run fails midway and restarts, it should use the same data window - not start fresh with a new window. Consistent data windows make model versioning meaningful and prevent train-eval contamination. :::

:::danger Cascading Failures in Automated Pipelines If your retraining pipeline runs every 48 hours and a bug causes every run to fail, you can quickly accumulate a large number of failing jobs. Use concurrencyPolicy: Forbid in Kubernetes CronJobs (or the equivalent in your scheduler) to prevent pile-up, and set backoffLimit: 0 to fail fast rather than retry indefinitely. :::

Interview Q&A

Q: What are the four types of retraining triggers and which is most appropriate for fraud detection?

The four types are: (1) schedule-based (retrain every N hours/days), (2) data-based (retrain when N new samples arrive), (3) drift-based (retrain when feature distribution shifts beyond a threshold), and (4) performance-based (retrain when production metrics drop). For fraud detection, a combination of schedule-based and drift-based is most appropriate. Schedule-based provides predictability and ensures the model stays current with recent fraud patterns. Drift-based adds responsiveness when fraudsters make sudden tactical shifts. Performance-based is too lagging - by the time you see production metric degradation, fraud has already slipped through.

Q: What is the difference between a rolling window and an expanding window training strategy?

Rolling window uses only the last N days of data (e.g., last 30 days), discarding older data. Expanding window uses all historical data from a fixed start date to the present. Rolling window keeps the model current with recent patterns and ensures consistent training time; it is ideal for fast-changing domains like fraud. Expanding window captures more historical patterns and rare events; it is better for slowly evolving domains or when rare events need many examples. A hybrid approach (weighted window) oversamples recent data while retaining older data at lower weight.

Q: How do you handle the case where an automated retraining run produces a model that fails the gate?

The pipeline should catch the gate failure exception, log the failure details (which gates failed, by how much), send an alert to the ML team with the specific failure information, and abort without promoting the model. The previous production model continues serving. The ML team investigates the root cause: was there a data quality issue? Did the model architecture need updating? Did the evaluation set become stale? Gate failures in automated retraining are valuable signals - they indicate something has changed in the data or model landscape that requires human attention.

Q: How do you implement human-in-the-loop approval for automated retraining without blocking the pipeline for days?

Use a two-stage approach. The pipeline runs fully automatically through training, evaluation, and gating. If all gates pass, the model is registered in Staging and an approval notification is sent via Slack with a direct link to the model registry. The notification includes all key metrics and gate results inline. An SRE or ML engineer reviews and clicks approve. Set an SLA for approval (e.g., must be approved within 24 hours or the model expires). Automate the approval for routine runs after sufficient track record - see the tip about starting with manual approval.

Q: How do you prevent automated retraining from creating a training-serving skew?

Training-serving skew in automated retraining usually comes from feature computation differences: features computed differently in the training pipeline vs the serving pipeline. To prevent this, enforce a single feature computation implementation that both pipelines call. Use a feature store that serves the same feature values at training time (point-in-time correct) and serving time. Run end-to-end integration tests that check: train a model on synthetic data, serve predictions using the exact serving infrastructure, verify predictions match expectations.

© 2026 EngineersOfAI. All rights reserved.