Cloud ML Cost Optimization
The $80,000 Monthly Bill
The Slack message arrived on a Monday morning: "Hey, can you explain why our AWS bill jumped 40% last month?" The sender was the VP of Engineering. The recipient was the ML platform lead who had spent the past six months building out a proper MLOps platform on AWS. The bill was $80,000, up from $57,000 the month before.
The answer, once the platform lead spent two hours in AWS Cost Explorer, was simultaneously obvious and mortifying. A training job scheduled to run overnight had been misconfigured: it launched four on-demand ml.p3.8xlarge instances (4 × V100 GPUs each, at $14.49/hr per instance) and kept them running for 264 hours, or 11 days. $14.49 × 4 instances × 264 hours ≈ $15,300. Just that one job. But pulling on that thread revealed more: experiments that were never cleaned up, SageMaker endpoints running with zero traffic, training datasets stored in S3 Standard that were read exactly once, and notebook instances running over weekends with no one using them.
Six weeks later, through a systematic program of cost reduction, the monthly bill was $31,000. The same model training volume. The same inference traffic. The same team. The difference was discipline: spot instances for interruptible workloads, reserved capacity for predictable loads, storage tiering, idle resource cleanup, and a tagging policy that finally made it possible to know what was costing what.
This lesson documents exactly how to do that. We cover the ML cost taxonomy, every major cost reduction lever, cross-cloud pricing comparisons, and the Python tooling to make cost visibility a first-class engineering concern.
Why This Exists - Cloud Billing Is Adversarially Opaque
Cloud providers have a strong incentive to make costs hard to understand. Dozens of pricing dimensions, regional variations, data transfer fees, API call charges, and storage tier complexity mean that the total cost of running an ML workload is rarely obvious from the list price. A team that knows the GPU hourly rate might overlook the storage egress cost that doubles the effective compute bill.
ML workloads have specific cost characteristics that differ from web applications. Training is bursty and expensive - you might spend $10,000 in a 6-hour training run and nothing for a week. Inference is steady but has very different cost profiles depending on whether you use serverless or always-on serving. Feature computation in Spark has data transfer costs that vary by cloud provider and region. Understanding these dynamics is not a finance team problem - it is an engineering problem that requires code.
:::note Historical Context Cloud FinOps (Financial Operations) emerged as a discipline around 2015, and the FinOps Foundation was founded in 2019 (it joined the Linux Foundation in 2020). Spot/preemptible capacity dates back to 2009, when AWS introduced Spot Instances; Google followed with Preemptible VMs in 2015, and Azure Spot VMs became generally available in 2020. The challenge of managing cloud ML costs specifically became a recognized problem as ML workloads moved from research to production around 2018-2020. :::
The ML Cost Taxonomy
Understanding exactly what you are paying for is the prerequisite for reducing it.
Compute Cost: On-Demand vs Spot vs Reserved
The Three Compute Tiers
On-Demand instances are billed per second (or per minute on older instance types) with no commitment. They are always available, never interrupted, and cost the list price. Use them only for workloads that cannot tolerate interruption and run unpredictably (e.g., a CI job triggered on every commit).
Spot / Preemptible instances use spare cloud capacity at 60-90% discounts. The tradeoff: they can be interrupted with 2 minutes' notice (AWS) or 30 seconds' notice (GCP). For ML training, this is manageable if you checkpoint regularly. Most training jobs are interruptible - if the job fails, you restart from the last checkpoint. For jobs under 2 hours, the spot termination risk is low enough that most teams accept it.
Reserved Instances / Committed Use Discounts provide 20-60% savings in exchange for a 1 or 3-year capacity commitment. Use these for predictable, always-running workloads: production inference endpoints, always-on feature pipeline clusters, dedicated training nodes for regular overnight jobs.
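The trade-offs between the three tiers come down to two small calculations, sketched below. This is an illustration under stated assumptions, not provider data: the function names are hypothetical, the interruption rate is something you would estimate from your own job history, and each interruption is assumed to lose half a checkpoint interval plus a fixed restart overhead.

```python
def reserved_break_even_utilization(reserved_discount: float) -> float:
    """Fraction of hours an instance must be busy for a reservation to pay off.

    A reservation bills every hour at (1 - discount) x list price, while
    on-demand bills only the hours used, so the two cost the same when
    utilization equals 1 - discount.
    """
    if not 0.0 <= reserved_discount < 1.0:
        raise ValueError("reserved_discount must be in [0, 1)")
    return 1.0 - reserved_discount


def effective_spot_cost(
    on_demand_hourly: float,
    spot_discount: float,
    job_hours: float,
    interruptions_per_hour: float,   # assumption: estimated from your own job history
    checkpoint_interval_hours: float,
    restart_overhead_hours: float,   # assumption: billed time to reload and warm up
) -> dict:
    """Expected spot cost once re-done work after interruptions is included."""
    expected_interruptions = interruptions_per_hour * job_hours
    # On average an interruption lands halfway between two checkpoints.
    lost_hours_each = checkpoint_interval_hours / 2 + restart_overhead_hours
    billed_hours = job_hours + expected_interruptions * lost_hours_each
    spot_cost = on_demand_hourly * (1 - spot_discount) * billed_hours
    on_demand_cost = on_demand_hourly * job_hours
    return {
        "billed_hours": round(billed_hours, 2),
        "spot_cost": round(spot_cost, 2),
        "on_demand_cost": round(on_demand_cost, 2),
        "effective_savings_pct": round((1 - spot_cost / on_demand_cost) * 100, 1),
    }


# A 40% reservation discount only pays off above this utilization
print(reserved_break_even_utilization(0.40))
# 24h job on a $3.06/hr instance, 70% spot discount, 15-minute checkpoints
print(effective_spot_cost(3.06, 0.70, 24, 0.05, 0.25, 0.10))
```

With frequent checkpoints the re-done work barely dents the spot discount; the picture changes only when checkpoints are hours apart or restarts are slow.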
Cross-Cloud GPU Pricing Comparison
These are approximate prices as of early 2025. Always check current pricing - GPU instance pricing changes frequently.
| GPU | AWS | GCP | Azure |
|---|---|---|---|
| A100 40GB (8x) | p4d.24xlarge: $32.77/hr | a2-highgpu-8g: $29.39/hr* | ND96asr_v4: $27.20/hr |
| A100 (1x) | n/a (A100s are sold only in 8-GPU instances) | a2-highgpu-1g (40GB): $3.67/hr | NC24ads_A100_v4 (80GB): $3.67/hr |
| V100 (1x) | p3.2xlarge: $3.06/hr | n1+v100: $2.48/hr | NC6s_v3: $3.06/hr |
| T4 (1x) | g4dn.xlarge: $0.526/hr | n1+t4: $0.35/hr | NC4as_T4_v3: $0.526/hr |
*GCP also offers a2-megagpu (16 × A100 40GB) for larger workloads and a2-ultragpu for the 80GB variant.
Key insight: GCP is generally 10-20% cheaper than AWS for equivalent GPU instances. Azure pricing tends to match AWS. For pure training cost minimization, run on GCP with preemptible VMs.
# cost_calculator.py - estimate training job cost before submitting
from dataclasses import dataclass
from typing import Optional
@dataclass
class ComputeConfig:
cloud: str # "aws", "gcp", "azure"
instance_type: str
gpu_count: int
on_demand_hourly: float
spot_discount: float # Fraction (e.g., 0.7 = 70% discount)
reserved_discount: float # Fraction (e.g., 0.4 = 40% discount)
# Approximate pricing data (update regularly from cloud price lists)
INSTANCE_CATALOG = {
"aws": {
"ml.p3.2xlarge": ComputeConfig("aws", "ml.p3.2xlarge", 1, 3.825, 0.70, 0.40),
"ml.p3.8xlarge": ComputeConfig("aws", "ml.p3.8xlarge", 4, 14.49, 0.70, 0.40),
"ml.p4d.24xlarge": ComputeConfig("aws", "ml.p4d.24xlarge", 8, 32.77, 0.65, 0.45),
"ml.g4dn.xlarge": ComputeConfig("aws", "ml.g4dn.xlarge", 1, 0.736, 0.70, 0.30),
"ml.g5.xlarge": ComputeConfig("aws", "ml.g5.xlarge", 1, 1.006, 0.70, 0.30),
},
"gcp": {
"a2-highgpu-1g": ComputeConfig("gcp", "a2-highgpu-1g", 1, 3.673, 0.80, 0.55),
"a2-highgpu-8g": ComputeConfig("gcp", "a2-highgpu-8g", 8, 29.39, 0.80, 0.55),
"n1-standard-8+1xT4": ComputeConfig("gcp", "n1-standard-8+1xT4", 1, 0.760, 0.80, 0.45),
},
"azure": {
"Standard_NC6s_v3": ComputeConfig("azure", "Standard_NC6s_v3", 1, 3.06, 0.60, 0.35),
"Standard_NC24s_v3": ComputeConfig("azure", "Standard_NC24s_v3", 4, 12.24, 0.60, 0.35),
"Standard_ND96asr_v4": ComputeConfig("azure", "Standard_ND96asr_v4", 8, 27.20, 0.60, 0.40),
},
}
def estimate_training_cost(
cloud: str,
instance_type: str,
estimated_hours: float,
pricing_model: str = "spot",
storage_gb: float = 100.0,
egress_gb: float = 10.0,
) -> dict:
"""
Estimate total cost for a training job.
Args:
cloud: Cloud provider ("aws", "gcp", "azure")
instance_type: Instance type string
estimated_hours: Expected training duration in hours
pricing_model: "on_demand", "spot", or "reserved"
storage_gb: Amount of data to read from storage (GB)
egress_gb: Data transferred out of cloud (GB)
"""
config = INSTANCE_CATALOG[cloud][instance_type]
# Compute cost
if pricing_model == "spot":
effective_rate = config.on_demand_hourly * (1 - config.spot_discount)
elif pricing_model == "reserved":
effective_rate = config.on_demand_hourly * (1 - config.reserved_discount)
else: # on_demand
effective_rate = config.on_demand_hourly
compute_cost = effective_rate * estimated_hours
# Storage cost (approximate, region-dependent)
storage_costs = {
"aws": 0.023, # S3 Standard per GB/month
"gcp": 0.020, # GCS Standard per GB/month
"azure": 0.018, # Azure Blob per GB/month
}
storage_cost = storage_gb * storage_costs[cloud]
# Egress cost (worst case: transfer out to the internet; cross-region transfer is cheaper)
egress_costs = {
"aws": 0.09, # per GB after 100GB free
"gcp": 0.08, # per GB
"azure": 0.087, # per GB
}
egress_cost = egress_gb * egress_costs[cloud]
total = compute_cost + storage_cost + egress_cost
return {
"compute_cost": round(compute_cost, 2),
"storage_cost": round(storage_cost, 2),
"egress_cost": round(egress_cost, 2),
"total_cost": round(total, 2),
"effective_hourly_rate": round(effective_rate, 3),
"gpu_count": config.gpu_count,
"pricing_model": pricing_model,
}
def compare_across_clouds(
estimated_hours: float,
gpu_count: int = 1,
pricing_model: str = "spot",
) -> None:
"""Compare cost for equivalent GPU configurations across clouds."""
print(f"\nCost comparison for {gpu_count}x GPU, {estimated_hours}h training ({pricing_model}):")
print("-" * 70)
comparisons = [
("aws", "ml.g5.xlarge"),
("aws", "ml.p3.2xlarge"),
("gcp", "n1-standard-8+1xT4"),
("gcp", "a2-highgpu-1g"),
("azure", "Standard_NC6s_v3"),
]
results = []
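for cloud, instance in comparisons:
# Skip catalog entries whose GPU count differs from the requested gpu_count
if INSTANCE_CATALOG[cloud][instance].gpu_count != gpu_count:
continue
cost = estimate_training_cost(cloud, instance, estimated_hours,
pricing_model=pricing_model)
results.append((cloud, instance, cost))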
results.sort(key=lambda x: x[2]["total_cost"])
for cloud, instance, cost in results:
print(f"{cloud:6} | {instance:30} | "
f"${cost['effective_hourly_rate']:6.3f}/hr × {estimated_hours}h | "
f"Total: ${cost['total_cost']:8.2f}")
# Example usage
compare_across_clouds(estimated_hours=24, gpu_count=1, pricing_model="spot")
Spot Instance Interruption Handling
The fear of spot interruptions stops many teams from using them. The antidote is checkpoint-and-restart: save model state frequently enough that a 2-minute warning lets you save and restart from a recent checkpoint.
# checkpoint_trainer.py - spot-safe training with automatic recovery
import os
import signal
import time
import json
import boto3
import torch
import torch.nn as nn
from pathlib import Path
from typing import Optional
class SpotSafeTrainer:
"""
Training loop designed for spot instance interruption.
Checkpoints every N steps and handles SIGTERM gracefully.
"""
def __init__(
self,
model: nn.Module,
optimizer: torch.optim.Optimizer,
checkpoint_dir: str,
checkpoint_every_steps: int = 500,
s3_backup_bucket: Optional[str] = None,
):
self.model = model
self.optimizer = optimizer
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
self.checkpoint_every_steps = checkpoint_every_steps
self.s3_bucket = s3_backup_bucket
self.s3_client = boto3.client("s3") if s3_backup_bucket else None
self.global_step = 0
self.epoch = 0
self.interrupted = False
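# Register SIGTERM handler. SageMaker sends SIGTERM to the training container on a
# spot interruption; on plain EC2 the 2-minute notice is published in instance
# metadata instead, so something must poll it and signal this process.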
signal.signal(signal.SIGTERM, self._handle_sigterm)
signal.signal(signal.SIGINT, self._handle_sigterm)
def _handle_sigterm(self, signum, frame):
"""On SIGTERM, save checkpoint and exit gracefully."""
print(f"\nReceived signal {signum}. Saving checkpoint before termination...")
self.interrupted = True
self._save_checkpoint(is_final=False)
print("Checkpoint saved. Exiting.")
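# Return - the training loop will check self.interrupted and exit.
# Caveat: a signal can arrive mid-step, so this checkpoint may capture a partially
# applied update; a stricter variant only sets the flag here and saves from the loop.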
def _save_checkpoint(self, is_final: bool = False):
"""Save model, optimizer, and training state."""
checkpoint = {
"global_step": self.global_step,
"epoch": self.epoch,
"model_state_dict": self.model.state_dict(),
"optimizer_state_dict": self.optimizer.state_dict(),
"is_final": is_final,
}
# Save locally first (fast)
checkpoint_path = self.checkpoint_dir / f"checkpoint_step_{self.global_step}.pt"
torch.save(checkpoint, checkpoint_path)
print(f" Checkpoint saved: {checkpoint_path}")
# Keep only the last 3 checkpoints to save disk space
all_checkpoints = sorted(
self.checkpoint_dir.glob("checkpoint_step_*.pt"),
key=lambda p: int(p.stem.split("_")[-1]),
)
for old_checkpoint in all_checkpoints[:-3]:
old_checkpoint.unlink()
# Backup to S3 (upload_file is synchronous: training blocks until the upload finishes)
if self.s3_bucket:
s3_key = f"checkpoints/{os.environ.get('JOB_ID', 'default')}/latest.pt"
self.s3_client.upload_file(str(checkpoint_path), self.s3_bucket, s3_key)
print(f" Backed up to s3://{self.s3_bucket}/{s3_key}")
# Write metadata file for easy discovery
meta = {
"latest_checkpoint": str(checkpoint_path.name),
"global_step": self.global_step,
"epoch": self.epoch,
"timestamp": time.time(),
}
with open(self.checkpoint_dir / "checkpoint_meta.json", "w") as f:
json.dump(meta, f)
def load_checkpoint(self) -> bool:
"""Load the latest checkpoint if one exists. Returns True if loaded."""
meta_file = self.checkpoint_dir / "checkpoint_meta.json"
# Also check S3 for checkpoints (in case instance is new after interruption)
if self.s3_bucket and not meta_file.exists():
s3_key = f"checkpoints/{os.environ.get('JOB_ID', 'default')}/latest.pt"
local_path = self.checkpoint_dir / "latest_from_s3.pt"
try:
self.s3_client.download_file(self.s3_bucket, s3_key, str(local_path))
checkpoint = torch.load(local_path)
self.model.load_state_dict(checkpoint["model_state_dict"])
self.optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
self.global_step = checkpoint["global_step"]
self.epoch = checkpoint["epoch"]
print(f"Resumed from S3 checkpoint at step {self.global_step}")
return True
except Exception as e:
print(f"No S3 checkpoint found: {e}")
return False
if not meta_file.exists():
print("No checkpoint found. Starting from scratch.")
return False
with open(meta_file) as f:
meta = json.load(f)
checkpoint_path = self.checkpoint_dir / meta["latest_checkpoint"]
checkpoint = torch.load(checkpoint_path)
self.model.load_state_dict(checkpoint["model_state_dict"])
self.optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
self.global_step = checkpoint["global_step"]
self.epoch = checkpoint["epoch"]
print(f"Resumed from checkpoint at step {self.global_step}, epoch {self.epoch}")
return True
def train_step(self, batch) -> float:
"""Single training step - implement in subclass."""
raise NotImplementedError
def train(self, train_loader, num_epochs: int):
"""Main training loop with spot-safe checkpointing."""
self.load_checkpoint()
for epoch in range(self.epoch, num_epochs):
self.epoch = epoch
for batch_idx, batch in enumerate(train_loader):
if self.interrupted:
print(f"Training interrupted at epoch {epoch}, step {self.global_step}")
return
loss = self.train_step(batch)
self.global_step += 1
# Log every 100 steps
if self.global_step % 100 == 0:
print(f"Epoch {epoch} | Step {self.global_step} | Loss: {loss:.4f}")
# Checkpoint every N steps
if self.global_step % self.checkpoint_every_steps == 0:
self._save_checkpoint()
# Checkpoint at end of each epoch
self._save_checkpoint()
self._save_checkpoint(is_final=True)
print("Training complete.")
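On plain EC2 (outside SageMaker), the 2-minute spot notice is published through the instance metadata service rather than delivered as a signal, so a trainer like the one above needs something to poll for it. The following is a minimal sketch, assuming IMDSv2 and the `spot/instance-action` metadata path; the helper names (`fetch_instance_action`, `parse_interruption`, `watch_for_interruption`) are hypothetical and the polling interval is an arbitrary choice.

```python
import json
import time
import urllib.error
import urllib.request

IMDS_BASE = "http://169.254.169.254/latest"


def fetch_instance_action(timeout: float = 1.0):
    """Return the raw spot/instance-action body, or None when no notice is pending."""
    try:
        # IMDSv2: obtain a short-lived session token first
        token_request = urllib.request.Request(
            f"{IMDS_BASE}/api/token",
            method="PUT",
            headers={"X-aws-ec2-metadata-token-ttl-seconds": "60"},
        )
        with urllib.request.urlopen(token_request, timeout=timeout) as resp:
            token = resp.read().decode()
        action_request = urllib.request.Request(
            f"{IMDS_BASE}/meta-data/spot/instance-action",
            headers={"X-aws-ec2-metadata-token": token},
        )
        with urllib.request.urlopen(action_request, timeout=timeout) as resp:
            return resp.read().decode()
    except (urllib.error.URLError, OSError):
        # A 404 means no interruption is scheduled; off EC2 the request simply fails.
        return None


def parse_interruption(body):
    """Turn the metadata body into (action, time), or None if there is no notice."""
    if not body:
        return None
    data = json.loads(body)
    return data.get("action"), data.get("time")


def watch_for_interruption(on_notice, fetch=fetch_instance_action,
                           poll_seconds: float = 5.0, max_polls=None):
    """Poll until a notice appears, then call on_notice once and return the notice."""
    polls = 0
    while max_polls is None or polls < max_polls:
        notice = parse_interruption(fetch())
        if notice is not None:
            on_notice(notice)
            return notice
        polls += 1
        time.sleep(poll_seconds)
    return None
```

One way to wire this up is to run `watch_for_interruption` in a daemon thread and have `on_notice` set the trainer's `interrupted` flag, so the existing checkpoint path is reused.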
Spot Instance Strategy by Workload Type
def recommend_pricing_model(
training_hours: float,
interruption_cost: str, # "low", "medium", "high"
team_size: int,
jobs_per_week: int,
) -> dict:
"""
Recommend compute pricing model based on workload characteristics.
interruption_cost: "low" = can restart from checkpoint with <5min overhead
"medium" = restart takes 30-60 min to reach same state
"high" = cannot tolerate interruption (real-time serving, etc.)
"""
recommendation = {}
if interruption_cost == "low" and training_hours <= 72:
recommendation["model"] = "spot"
recommendation["expected_savings"] = "60-90% vs on-demand"
recommendation["checkpoint_interval"] = "every 500 steps or 15 minutes"
recommendation["risk"] = "low - checkpoints minimize restart overhead"
elif interruption_cost == "low" and training_hours > 72:
recommendation["model"] = "spot with auto-restart"
recommendation["expected_savings"] = "60-90% vs on-demand"
recommendation["note"] = (
"Configure SageMaker Managed Spot Training or GCP managed "
"preemptible spot with auto-restart for very long jobs"
)
recommendation["checkpoint_interval"] = "every 250 steps"
elif interruption_cost == "medium":
recommendation["model"] = "spot for experiments, on-demand for final run"
recommendation["expected_savings"] = "40-60% overall"
recommendation["note"] = (
"Use spot for hyperparameter tuning and ablations (cheap, restartable), "
"then on-demand for the final training run that produces the production model"
)
elif interruption_cost == "high":
# Reservations only pay off when the capacity is used regularly
if jobs_per_week >= 10:
recommendation["model"] = "reserved (1-year commitment)"
recommendation["expected_savings"] = "40% vs on-demand"
else:
recommendation["model"] = "on-demand"
recommendation["expected_savings"] = "0% but fully reliable"
return recommendation
AWS SageMaker Managed Spot Training
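SageMaker has built-in spot training support. It automatically requests spot capacity, syncs the local checkpoint directory to S3, and restarts the job if the instance is reclaimed. With a custom training script, the script is still responsible for writing checkpoints to that directory and for resuming from them on startup.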
import time
import sagemaker
from sagemaker.estimator import Estimator
def launch_spot_training_job(
role: str,
bucket: str,
training_script: str,
max_training_hours: float = 12,
max_wait_hours: float = 24,
):
estimator = Estimator(
image_uri=sagemaker.image_uris.retrieve("xgboost", "us-east-1", "1.7-1"),
role=role,
instance_count=1,
instance_type="ml.g4dn.xlarge",
# Spot training configuration
use_spot_instances=True,
max_run=int(max_training_hours * 3600),
max_wait=int(max_wait_hours * 3600), # Must be >= max_run
# SageMaker syncs checkpoint_local_path with checkpoint_s3_uri; custom scripts must write and reload checkpoints there
checkpoint_s3_uri=f"s3://{bucket}/checkpoints/",
checkpoint_local_path="/opt/ml/checkpoints",
output_path=f"s3://{bucket}/training-output/",
hyperparameters={
"max_depth": 6,
"num_round": 300, # the built-in XGBoost algorithm takes num_round, not n_estimators
},
)
estimator.fit(
inputs={"train": f"s3://{bucket}/data/train/"},
job_name=f"spot-churn-{int(time.time())}",
wait=False, # Don't block - the job will restart automatically if interrupted
)
print(f"Spot training job submitted: {estimator.latest_training_job.name}")
return estimator
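Once a managed spot job finishes, the discount actually realized can be read off the job description, which reports both the wall-clock training time and the time that was billed. The helper below is a small sketch of that calculation; `managed_spot_savings` is a hypothetical name, and it assumes the dictionary passed in is the response of the SageMaker `describe_training_job` call.

```python
from typing import Optional


def managed_spot_savings(job_description: dict) -> Optional[dict]:
    """Compute realized spot savings from a DescribeTrainingJob response.

    Savings are (1 - BillableTimeInSeconds / TrainingTimeInSeconds) * 100.
    Returns None while the job is still running and the fields are missing.
    """
    training_s = job_description.get("TrainingTimeInSeconds")
    billable_s = job_description.get("BillableTimeInSeconds")
    if not training_s or billable_s is None:
        return None
    return {
        "training_seconds": training_s,
        "billable_seconds": billable_s,
        "savings_pct": round((1 - billable_s / training_s) * 100, 1),
    }


# In practice the input would come from:
#   boto3.client("sagemaker").describe_training_job(TrainingJobName=job_name)
example = {"TrainingTimeInSeconds": 500, "BillableTimeInSeconds": 100}
print(managed_spot_savings(example))
```

Logging this number per job makes it easy to spot jobs where interruptions and restarts are eating into the expected discount.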
Storage Cost Optimization
S3 Lifecycle Policies for ML Artifacts
ML teams accumulate artifacts at an alarming rate. Every experiment produces model weights, evaluation artifacts, and logs. Without lifecycle policies, these accumulate indefinitely in S3 Standard at $0.023/GB/month.
import boto3
import json
def configure_ml_artifact_lifecycle(bucket_name: str):
"""
Configure S3 lifecycle rules for ML artifact retention.
Strategy:
- Active experiments: Standard (immediate access)
- Completed experiments (30+ days): Standard-IA (rare access)
- Old experiments (90+ days): Glacier Instant Retrieval (archive)
- Very old artifacts (365+ days): Deep Archive (cold storage, hours to restore)
- Deleted experiments: Clean up immediately via cleanup job
"""
s3 = boto3.client("s3")
lifecycle_config = {
"Rules": [
{
"ID": "ml-experiments-transition",
"Status": "Enabled",
"Filter": {"Prefix": "experiments/"},
"Transitions": [
{
"Days": 30,
"StorageClass": "STANDARD_IA", # ~46% cheaper than Standard
},
{
"Days": 90,
"StorageClass": "GLACIER_IR", # ~83% cheaper than Standard
},
{
"Days": 365,
"StorageClass": "DEEP_ARCHIVE", # ~96% cheaper than Standard
},
],
},
{
"ID": "ml-training-data-transition",
"Status": "Enabled",
"Filter": {"Prefix": "data/processed/"},
"Transitions": [
{
"Days": 60,
"StorageClass": "STANDARD_IA",
},
{
"Days": 180,
"StorageClass": "GLACIER_IR",
},
],
},
{
"ID": "ml-logs-expiration",
"Status": "Enabled",
"Filter": {"Prefix": "logs/"},
"Expiration": {
"Days": 90, # Delete training logs after 90 days
},
},
{
"ID": "ml-tmp-cleanup",
"Status": "Enabled",
"Filter": {"Prefix": "tmp/"},
"Expiration": {
"Days": 7, # Temporary files gone in a week
},
},
]
}
s3.put_bucket_lifecycle_configuration(
Bucket=bucket_name,
LifecycleConfiguration=lifecycle_config,
)
print(f"Lifecycle policy configured for s3://{bucket_name}")
def estimate_storage_savings(
bucket_name: str,
experiment_prefix: str = "experiments/",
) -> dict:
"""
Estimate monthly savings from storage tiering.
Analyzes actual bucket contents.
"""
s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
total_bytes = 0
recent_bytes = 0 # < 30 days old (stays in Standard)
mid_age_bytes = 0 # 30-90 days (Standard-IA candidate)
old_bytes = 0 # 90-365 days (Glacier candidate)
ancient_bytes = 0 # > 365 days (Deep Archive candidate)
from datetime import datetime, timezone, timedelta
now = datetime.now(timezone.utc)
for page in paginator.paginate(Bucket=bucket_name, Prefix=experiment_prefix):
for obj in page.get("Contents", []):
age_days = (now - obj["LastModified"]).days
size_bytes = obj["Size"]
total_bytes += size_bytes
if age_days < 30:
recent_bytes += size_bytes
elif age_days < 90:
mid_age_bytes += size_bytes
elif age_days < 365:
old_bytes += size_bytes
else:
ancient_bytes += size_bytes
# Cost per GB per month (approximate)
standard_rate = 0.023
standard_ia_rate = 0.0125 # 46% cheaper
glacier_ir_rate = 0.004 # 83% cheaper
deep_archive_rate = 0.00099 # 96% cheaper
def gb(b): return b / (1024 ** 3)
current_monthly_cost = total_bytes / (1024**3) * standard_rate
optimized_monthly_cost = (
gb(recent_bytes) * standard_rate +
gb(mid_age_bytes) * standard_ia_rate +
gb(old_bytes) * glacier_ir_rate +
gb(ancient_bytes) * deep_archive_rate
)
return {
"total_gb": round(gb(total_bytes), 2),
"current_monthly_cost": round(current_monthly_cost, 2),
"optimized_monthly_cost": round(optimized_monthly_cost, 2),
"monthly_savings": round(current_monthly_cost - optimized_monthly_cost, 2),
"savings_pct": round(
(current_monthly_cost - optimized_monthly_cost) / current_monthly_cost * 100, 1
) if current_monthly_cost > 0 else 0,
"breakdown": {
"recent_gb": round(gb(recent_bytes), 2),
"mid_age_gb": round(gb(mid_age_bytes), 2),
"old_gb": round(gb(old_bytes), 2),
"ancient_gb": round(gb(ancient_bytes), 2),
},
}
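Tiering is not free for every object: each lifecycle transition is billed as a request, and infrequent-access classes bill small objects at a minimum size. The sketch below estimates how many months a single object must stay in the cheaper class before the move pays for itself. The storage rates are the ones used above; the transition request price and the 128 KB minimum billable size are assumptions to verify against current S3 pricing, and `transition_break_even_months` is a hypothetical helper.

```python
GIB = 1024 ** 3


def transition_break_even_months(
    object_size_bytes: int,
    from_rate: float = 0.023,                 # S3 Standard, $/GB/month (from the lesson)
    to_rate: float = 0.0125,                  # Standard-IA, $/GB/month (from the lesson)
    transition_cost_per_1000: float = 0.01,   # assumption: lifecycle request price
    min_billable_bytes: int = 128 * 1024,     # assumption: minimum billable object size
) -> float:
    """Months until the storage saving repays the one-time transition request."""
    billed_bytes = max(object_size_bytes, min_billable_bytes)
    monthly_before = object_size_bytes / GIB * from_rate
    monthly_after = billed_bytes / GIB * to_rate
    monthly_saving = monthly_before - monthly_after
    if monthly_saving <= 0:
        return float("inf")  # the object would cost more after the move
    return (transition_cost_per_1000 / 1000) / monthly_saving


for size in (64 * 1024, 1024 ** 2, GIB):
    print(size, transition_break_even_months(size))
```

Large checkpoints repay the transition almost immediately, while buckets full of tiny log or metric files may never do so, which is a reason to expire such prefixes instead of tiering them.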
Model Artifact Deduplication
The same model weights are often stored multiple times: by the training script, by the MLflow artifact, by the model registry, and by the deployment pipeline. Deduplication using content-addressable storage saves significant space.
import hashlib
import boto3
from pathlib import Path
def deduplicate_model_artifacts(
bucket: str,
prefix: str,
dry_run: bool = True,
) -> dict:
"""
Find duplicate model artifacts in S3 by content hash.
In practice, model.pkl and model.joblib from different experiment runs
of the same hyperparameters are often identical.
"""
s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
# Collect all model files with their ETags (S3's content hash)
hash_to_keys = {}
total_bytes = 0
duplicate_bytes = 0
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []):
key = obj["Key"]
if not any(key.endswith(ext) for ext in [".pkl", ".pt", ".joblib", ".bin"]):
continue
etag = obj["ETag"].strip('"') # S3 ETag is MD5 (for single-part uploads)
size = obj["Size"]
total_bytes += size
if etag not in hash_to_keys:
hash_to_keys[etag] = []
hash_to_keys[etag].append((key, size))
# Find groups of duplicates
duplicates = {k: v for k, v in hash_to_keys.items() if len(v) > 1}
for etag, keys_and_sizes in duplicates.items():
# Keep the first key (list_objects_v2 returns keys in lexicographic order, not by age), delete the rest
_, size = keys_and_sizes[0]
duplicate_count = len(keys_and_sizes) - 1
duplicate_bytes += size * duplicate_count
if not dry_run:
for key, _ in keys_and_sizes[1:]:
s3.delete_object(Bucket=bucket, Key=key)
print(f" Deleted duplicate: {key}")
total_gb = total_bytes / (1024 ** 3)
dup_gb = duplicate_bytes / (1024 ** 3)
print(f"Total artifacts: {total_gb:.2f} GB")
dup_pct = (dup_gb / total_gb * 100) if total_gb > 0 else 0.0 # avoid ZeroDivisionError on an empty prefix
print(f"Duplicate artifacts: {dup_gb:.2f} GB ({dup_pct:.1f}%)")
print(f"Potential savings: ${dup_gb * 0.023:.2f}/month")
if dry_run:
print("DRY RUN - no files deleted. Set dry_run=False to proceed.")
return {"total_gb": total_gb, "duplicate_gb": dup_gb, "duplicate_groups": len(duplicates)}
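Because an ETag equals the MD5 only for single-part uploads, identical files uploaded in multiple parts can end up with different ETags and slip past an ETag comparison. A content hash computed from the bytes themselves does not depend on how the file was uploaded. The sketch below groups local files by SHA-256; it assumes the artifacts are available on disk (for example, hashed just before upload), and the file names in the demo are made up.

```python
import hashlib
import tempfile
from collections import defaultdict
from pathlib import Path


def sha256_of_file(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Hash a file in chunks so large model files never sit fully in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def find_duplicate_files(paths) -> dict:
    """Map each content hash shared by 2+ files to the sorted list of those files."""
    by_digest = defaultdict(list)
    for path in paths:
        by_digest[sha256_of_file(Path(path))].append(str(path))
    return {d: sorted(p) for d, p in by_digest.items() if len(p) > 1}


# Demo on throwaway files (hypothetical artifact names)
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "run_a_model.pkl").write_bytes(b"weights-v1")
    (root / "run_b_model.pkl").write_bytes(b"weights-v1")  # same content as run_a
    (root / "run_c_model.pkl").write_bytes(b"weights-v2")
    duplicates = find_duplicate_files(sorted(root.iterdir()))
    duplicate_names = [sorted(Path(p).name for p in group) for group in duplicates.values()]

print(duplicate_names)
```

Storing the digest as object metadata or in the experiment tracker at upload time would let a later cleanup job compare hashes without downloading anything.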
Inference Cost Optimization
Inference cost is often larger than training cost in production systems. A model trained once serves millions of requests.
Batching Requests
The single most impactful inference cost reduction for throughput-optimized serving is batching. GPUs are designed for parallel computation: a batch of 32 requests often takes only 3-4x as long as a single request, so each instance serves roughly 8-10x as many requests per second.
import asyncio
import time
from collections import defaultdict
from typing import List, Any
import numpy as np
class DynamicBatcher:
"""
Groups individual prediction requests into batches for efficient GPU inference.
Waits up to max_wait_ms or until max_batch_size is reached.
"""
def __init__(
self,
model,
max_batch_size: int = 32,
max_wait_ms: float = 10.0,
):
self.model = model
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
self.pending_requests = []
self.lock = asyncio.Lock()
self.batch_ready = asyncio.Event()
async def predict(self, features: dict) -> float:
"""Submit a single prediction request and await the result."""
future = asyncio.get_running_loop().create_future() # bind the future to the running event loop
async with self.lock:
self.pending_requests.append((features, future))
if len(self.pending_requests) >= self.max_batch_size:
self.batch_ready.set()
# Wait for this request's result
return await future
async def batch_processor(self):
"""Background task that processes batches."""
while True:
# Wait until batch is ready OR timeout
try:
await asyncio.wait_for(
self.batch_ready.wait(),
timeout=self.max_wait_ms / 1000.0
)
except asyncio.TimeoutError:
pass
async with self.lock:
if not self.pending_requests:
self.batch_ready.clear()
continue
# Take up to max_batch_size requests
batch = self.pending_requests[:self.max_batch_size]
self.pending_requests = self.pending_requests[self.max_batch_size:]
if not self.pending_requests:
self.batch_ready.clear()
# Process the batch
features_batch = [req[0] for req in batch]
futures = [req[1] for req in batch]
try:
# Convert to numpy array for model
X = np.array([list(f.values()) for f in features_batch])
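# Run the blocking model call in a worker thread so the event loop keeps accepting requests (Python 3.9+)
predictions = (await asyncio.to_thread(self.model.predict_proba, X))[:, 1]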
for future, pred in zip(futures, predictions):
if not future.done():
future.set_result(float(pred))
except Exception as e:
for future in futures:
if not future.done():
future.set_exception(e)
def estimate_inference_savings_from_batching(
requests_per_second: float,
single_request_ms: float, # Latency for batch_size=1
batch_size: int = 32,
batch_request_ms: float = None, # If None, estimate as 4x single
instance_hourly_cost: float = 0.526, # g4dn.xlarge
) -> dict:
"""Estimate cost savings from batching vs individual requests."""
if batch_request_ms is None:
# Empirical: batch of 32 typically takes 3-4x single request time
batch_request_ms = single_request_ms * 4.0
# Throughput comparison
single_throughput_rps = 1000.0 / single_request_ms # requests per second per instance
batch_throughput_rps = (1000.0 / batch_request_ms) * batch_size
# Instances needed to handle load
instances_without_batching = max(1, int(np.ceil(requests_per_second / single_throughput_rps)))
instances_with_batching = max(1, int(np.ceil(requests_per_second / batch_throughput_rps)))
hourly_cost_without = instances_without_batching * instance_hourly_cost
hourly_cost_with = instances_with_batching * instance_hourly_cost
return {
"requests_per_second": requests_per_second,
"instances_without_batching": instances_without_batching,
"instances_with_batching": instances_with_batching,
"hourly_cost_without": round(hourly_cost_without, 2),
"hourly_cost_with": round(hourly_cost_with, 2),
"hourly_savings": round(hourly_cost_without - hourly_cost_with, 2),
"monthly_savings": round((hourly_cost_without - hourly_cost_with) * 730, 2),
"savings_pct": round(
(1 - instances_with_batching / instances_without_batching) * 100, 1
),
}
Serverless vs Always-On Inference
def compare_inference_deployment_options(
requests_per_day: int,
avg_latency_requirement_ms: float,
model_load_time_seconds: float = 2.0,
on_demand_hourly_cost: float = 0.526,
serverless_per_million_requests: float = 0.20,
serverless_per_gb_second: float = 0.0000166,
model_size_gb: float = 0.5,
avg_inference_ms: float = 50,
) -> dict:
"""
Compare costs of always-on vs serverless inference.
Always-on: pay for 24/7 instance regardless of traffic
Serverless: pay per invocation (Lambda, Cloud Run, etc.)
"""
# Always-on cost (per month = 730 hours)
always_on_monthly = on_demand_hourly_cost * 730
# Serverless cost
requests_per_month = requests_per_day * 30
invocation_cost = (requests_per_month / 1_000_000) * serverless_per_million_requests
gb_seconds = (model_size_gb * avg_inference_ms / 1000) * requests_per_month
compute_cost = gb_seconds * serverless_per_gb_second
serverless_monthly = invocation_cost + compute_cost
# Break-even calculation
# Always-on is cheaper when: always_on < serverless
# Break-even requests = always_on / (serverless cost per request)
cost_per_request_serverless = serverless_monthly / requests_per_month if requests_per_month > 0 else 0
break_even_requests_per_month = (
always_on_monthly / cost_per_request_serverless
if cost_per_request_serverless > 0 else float("inf")
)
# Latency check - serverless has cold start
cold_start_ms = model_load_time_seconds * 1000
meets_latency = cold_start_ms < avg_latency_requirement_ms
return {
"requests_per_month": requests_per_month,
"always_on_monthly": round(always_on_monthly, 2),
"serverless_monthly": round(serverless_monthly, 2),
"cheaper_option": "always-on" if always_on_monthly < serverless_monthly else "serverless",
"savings": round(abs(always_on_monthly - serverless_monthly), 2),
"break_even_requests_per_month": int(break_even_requests_per_month),
"cold_start_ms": cold_start_ms,
"meets_latency_with_serverless": meets_latency,
"recommendation": (
"Use serverless" if serverless_monthly < always_on_monthly and meets_latency
else "Use always-on" if not meets_latency
else "Use always-on (cheaper at this traffic volume)"
),
}
# Example
result = compare_inference_deployment_options(
requests_per_day=10_000,
avg_latency_requirement_ms=500,
model_load_time_seconds=1.5,
)
print(f"Recommendation: {result['recommendation']}")
print(f"Always-on: ${result['always_on_monthly']}/mo")
print(f"Serverless: ${result['serverless_monthly']}/mo")
Model Distillation ROI Calculator
Distilling a large model into a smaller one reduces serving cost at the expense of a one-time training investment.
import numpy as np
def distillation_roi(
teacher_model_latency_ms: float,
student_model_latency_ms: float,
teacher_accuracy: float,
student_accuracy: float,
requests_per_second: float,
instance_hourly_cost: float,
distillation_training_cost: float,
acceptable_accuracy_drop: float = 0.02,
) -> dict:
"""
Calculate ROI for model distillation.
Args:
teacher_model_latency_ms: Current (large) model latency
student_model_latency_ms: Distilled (small) model latency
teacher_accuracy: Current model accuracy (e.g., 0.85)
student_accuracy: Distilled model accuracy
requests_per_second: Production request rate
instance_hourly_cost: Inference instance cost per hour
distillation_training_cost: One-time cost to train the student model
acceptable_accuracy_drop: Maximum allowed accuracy degradation
"""
    accuracy_drop = teacher_accuracy - student_accuracy
    if accuracy_drop > acceptable_accuracy_drop:
        return {
            "viable": False,
            "reason": (
                f"Accuracy drop {accuracy_drop:.3f} exceeds threshold {acceptable_accuracy_drop:.3f}. "
                "Distillation not recommended."
            )
        }
    # Throughput per instance in requests/second
    # (assumes each instance serves one request at a time, with no batching)
    teacher_throughput = 1000.0 / teacher_model_latency_ms
    student_throughput = 1000.0 / student_model_latency_ms
    # Instances needed to sustain the production request rate
    teacher_instances = max(1, int(np.ceil(requests_per_second / teacher_throughput)))
    student_instances = max(1, int(np.ceil(requests_per_second / student_throughput)))
    # 730 = average hours in a month
    teacher_monthly = teacher_instances * instance_hourly_cost * 730
    student_monthly = student_instances * instance_hourly_cost * 730
    monthly_savings = teacher_monthly - student_monthly
    # Payback period: months of savings needed to recover the one-time training cost
    payback_months = (
        distillation_training_cost / monthly_savings
        if monthly_savings > 0 else float("inf")
    )
    return {
        "viable": monthly_savings > 0,
        "teacher_instances": teacher_instances,
        "student_instances": student_instances,
        "teacher_monthly": round(teacher_monthly, 2),
        "student_monthly": round(student_monthly, 2),
        "monthly_savings": round(monthly_savings, 2),
        "annual_savings": round(monthly_savings * 12, 2),
        "distillation_cost": distillation_training_cost,
        "payback_months": round(payback_months, 1),
        "accuracy_drop": round(accuracy_drop, 4),
        "latency_improvement_pct": round(
            (teacher_model_latency_ms - student_model_latency_ms)
            / teacher_model_latency_ms * 100, 1
        ),
    }
# Example: BERT → DistilBERT for a text classification API
roi = distillation_roi(
    teacher_model_latency_ms=120,
    student_model_latency_ms=35,
    teacher_accuracy=0.892,
    student_accuracy=0.878,
    requests_per_second=200,
    instance_hourly_cost=0.526,
    distillation_training_cost=3500,
    acceptable_accuracy_drop=0.02,
)
print(f"Payback period: {roi['payback_months']} months")
print(f"Annual savings: ${roi['annual_savings']:,.0f}")
Resource Tagging Strategy
Without tags, it is impossible to know which model, team, or project is responsible for which costs.
import boto3
# The six tag keys every ML resource must carry
REQUIRED_TAG_KEYS = ("team", "project", "environment", "cost-center", "owner", "managed-by")
def apply_ml_resource_tags(resource_arn: str, tags: dict) -> None:
    """Apply standardized tags to a SageMaker resource, rejecting incomplete tag sets."""
    # Treat absent and empty values alike: both make cost attribution impossible
    missing = [k for k in REQUIRED_TAG_KEYS if not tags.get(k)]
    if missing:
        raise ValueError(f"Missing required tags: {missing}")
    sagemaker = boto3.client("sagemaker")
    tag_list = [{"Key": k, "Value": str(v)} for k, v in tags.items()]
    sagemaker.add_tags(ResourceArn=resource_arn, Tags=tag_list)
# Standard tag set for every ML resource
STANDARD_ML_TAGS = {
    "team": "ml-platform",
    "project": "churn-prediction",
    "environment": "production",  # dev / staging / prod
    "cost-center": "ML-2024",
    "owner": "owner@example.com",  # placeholder: use the owning individual's email
    "managed-by": "terraform",
}
FinOps Tooling
AWS Cost Explorer Query
import boto3
from datetime import datetime, timedelta, timezone
from typing import Optional
def get_ml_costs_by_tag(
    tag_key: str = "project",
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
) -> dict:
    """Query AWS Cost Explorer for ML costs grouped by a tag key and by service."""
    client = boto3.client("ce", region_name="us-east-1")
    if not end_date:
        end_date = datetime.today().strftime("%Y-%m-%d")  # End date is exclusive
    if not start_date:
        start_date = (datetime.today() - timedelta(days=30)).strftime("%Y-%m-%d")
    response = client.get_cost_and_usage(
        TimePeriod={"Start": start_date, "End": end_date},
        Granularity="MONTHLY",
        Filter={
            "Dimensions": {
                "Key": "SERVICE",
                # SERVICE values must match Cost Explorer's own service names exactly;
                # confirm them for your account with get_dimension_values.
                "Values": [
                    "Amazon SageMaker",
                    "Amazon Elastic Compute Cloud - Compute",
                    "Amazon Simple Storage Service",
                    "Amazon EC2 Container Registry (ECR)",
                ],
            }
        },
        GroupBy=[
            {"Type": "TAG", "Key": tag_key},
            {"Type": "DIMENSION", "Key": "SERVICE"},
        ],
        Metrics=["UnblendedCost"],
    )
    # Parse results into {tag_value: {service: cost}}.
    # A 30-day window usually spans two calendar months, so sum across periods
    # instead of letting the later month overwrite the earlier one.
    cost_breakdown = {}
    for result in response["ResultsByTime"]:
        for group in result["Groups"]:
            tag_value, service = group["Keys"]
            # Tag group keys look like "project$churn-prediction"; untagged spend is "project$"
            project = tag_value.replace(f"{tag_key}$", "", 1) or "untagged"
            cost = float(group["Metrics"]["UnblendedCost"]["Amount"])
            by_service = cost_breakdown.setdefault(project, {})
            by_service[service] = round(by_service.get(service, 0.0) + cost, 2)
    return cost_breakdown
def identify_idle_resources() -> list:
    """Find InService SageMaker endpoints with zero invocations over the last 7 days."""
    sagemaker = boto3.client("sagemaker")
    cloudwatch = boto3.client("cloudwatch")
    # Use timezone-aware UTC timestamps so the window is not shifted by the local offset
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=7)
    idle_resources = []
    # Paginate: a single list_endpoints call returns only the first page
    paginator = sagemaker.get_paginator("list_endpoints")
    for page in paginator.paginate(StatusEquals="InService"):
        for endpoint in page["Endpoints"]:
            name = endpoint["EndpointName"]
            config = sagemaker.describe_endpoint(EndpointName=name)
            total_invocations = 0
            # Invocations is published per (EndpointName, VariantName) pair, so querying
            # by EndpointName alone returns no datapoints and every endpoint looks idle
            for variant in config.get("ProductionVariants", []):
                metrics = cloudwatch.get_metric_statistics(
                    Namespace="AWS/SageMaker",
                    MetricName="Invocations",
                    Dimensions=[
                        {"Name": "EndpointName", "Value": name},
                        {"Name": "VariantName", "Value": variant["VariantName"]},
                    ],
                    StartTime=start_time,
                    EndTime=end_time,
                    Period=604800,  # 7 days in seconds
                    Statistics=["Sum"],
                )
                total_invocations += sum(
                    dp["Sum"] for dp in metrics.get("Datapoints", [])
                )
            if total_invocations == 0:
                idle_resources.append({
                    "type": "SageMaker Endpoint",
                    "name": name,
                    "invocations_7d": 0,
                    "created": str(config["CreationTime"]),
                    "action": "DELETE - zero traffic in 7 days",
                })
    return idle_resources
The $80K to $31K Case Study
Here is the specific breakdown of what changed:
| Category | Before | After | Savings/Month |
|---|---|---|---|
| Training compute (on-demand GPUs) | $28,000 | $8,400 (70% spot) | $19,600 |
| Zombie endpoints (zero traffic) | $9,200 | $0 | $9,200 |
| Notebook instances (running 24/7) | $4,800 | $1,200 (scheduled stop) | $3,600 |
| S3 storage (all Standard tier) | $6,100 | $2,800 (lifecycle policies) | $3,300 |
| Training datasets (recomputed daily) | $5,400 | $1,200 (cached features) | $4,200 |
| Data egress (cross-region) | $4,200 | $1,800 (same-region) | $2,400 |
| Idle cluster time (scale to zero off) | $3,100 | $600 | $2,500 |
| Miscellaneous (logging, ECR) | $1,200 | $900 | $300 |
| Total | $62,000 | $16,900 | $45,100 |
The remaining $18K (the $80K bill minus the $62K itemized above, treated as unavoidable baseline) was reserved instances for the three production serving endpoints, which were locked in at a 40% discount on a 1-year commitment.
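As a quick sanity check on the table, the sketch below recomputes the totals and ranks the categories by monthly savings. The figures are copied from the table; the `summarize` helper and its output keys are illustrative names, not part of any tooling described in this lesson.

```python
# (category, before, after) in USD per month, copied from the table above
ROWS = [
    ("Training compute", 28_000, 8_400),
    ("Zombie endpoints", 9_200, 0),
    ("Notebook instances", 4_800, 1_200),
    ("S3 storage", 6_100, 2_800),
    ("Training datasets", 5_400, 1_200),
    ("Data egress", 4_200, 1_800),
    ("Idle cluster time", 3_100, 600),
    ("Miscellaneous", 1_200, 900),
]


def summarize(rows):
    """Return totals and the categories ranked by monthly savings (largest first)."""
    before = sum(b for _, b, _ in rows)
    after = sum(a for _, _, a in rows)
    ranked = sorted(((name, b - a) for name, b, a in rows),
                    key=lambda item: item[1], reverse=True)
    return {
        "before": before,
        "after": after,
        "savings": before - after,
        "savings_pct": round(100 * (before - after) / before, 1),
        "ranked": ranked,
    }


summary = summarize(ROWS)
print(f"Before ${summary['before']:,}, after ${summary['after']:,}, "
      f"saved ${summary['savings']:,} ({summary['savings_pct']}%)")
for name, saved in summary["ranked"][:3]:
    share = 100 * saved / summary["savings"]
    print(f"{name}: ${saved:,}/month ({share:.0f}% of itemized savings)")
```

Ranking by absolute savings rather than by percentage reduction is a deliberate choice here: it points a small team at the categories that move the bill the most.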
Common Mistakes
:::danger Not Terminating Hung Training Jobs
The opening scenario of this lesson was a real pattern. Training jobs that hang - due to a bug in the checkpoint logic, a deadlock in distributed training, or a data loading issue - continue to bill indefinitely. Implement a maximum job runtime as a hard limit at the infrastructure level. In SageMaker, set max_run on every Estimator. In GCP Vertex AI, set timeout on every training job. In Azure ML, set timeout_seconds on every component. Never rely on "someone will notice" - set hard limits.
:::
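The managed-service limits named above are the primary control. For training launched outside a managed service, the same idea can be approximated with a wall-clock limit around the training process. The following is a minimal sketch using only the standard library; `run_with_hard_limit` is a hypothetical helper, and it only kills the direct child process, so training commands that spawn their own workers would need process-group handling on top of this.

```python
import subprocess
import sys


def run_with_hard_limit(cmd: list, max_runtime_s: float) -> dict:
    """Run a command and kill it once it exceeds max_runtime_s of wall-clock time."""
    try:
        completed = subprocess.run(cmd, timeout=max_runtime_s)
        return {"timed_out": False, "returncode": completed.returncode}
    except subprocess.TimeoutExpired:
        # subprocess.run kills and reaps the child before re-raising TimeoutExpired
        return {"timed_out": True, "returncode": None}


if __name__ == "__main__":
    # Stand-in for a hung training job: sleeps far longer than the limit
    hung_job = [sys.executable, "-c", "import time; time.sleep(30)"]
    print(run_with_hard_limit(hung_job, max_runtime_s=1.0))
```

In practice the limit would be set to a generous multiple of the expected job duration, so that healthy jobs are never cut off but a hung job cannot bill for days.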
:::danger Storing All Model Artifacts in Standard Storage Forever
Model weights from 50 failed hyperparameter tuning experiments from 18 months ago are sitting in S3 Standard, costing you $0.023/GB/month, and nobody is ever going to use them. This is the most common ML storage waste pattern. Apply lifecycle policies immediately. If you are concerned about losing valuable experiments, implement a policy: any model version with a production deployment or a manually added keep tag is exempt from lifecycle rules. Everything else ages out.
:::
:::warning Cross-Region Data Transfer
Training data in us-east-1 being read by a training job in us-west-2 incurs inter-region transfer charges (roughly $0.02/GB between US regions), which can add on the order of $100/month in data transfer alone - on top of the compute cost. Always verify that your training data, training compute, and model storage are in the same region. For multi-cloud setups, understand that cross-cloud egress ($0.08-0.09/GB) makes heavy data transfer between clouds economically prohibitive.
:::
:::warning Ignoring Inference Cost When Choosing Model Architecture
Teams spend months optimizing training cost and then deploy a model that costs 10x more to serve than the previous one. A BERT-large model with 340M parameters costs ~$5,000/month to serve. The same task solved with a distilled model (66M parameters) costs $1,200/month. The distillation project costs $10,000 in engineering time and pays back in about 3 months. Always compute inference cost at production scale before committing to a model architecture.
:::
Interview Q&A
Q1: Explain the spot instance interruption problem for ML training and the technical approaches to mitigate it.
Answer: Spot/preemptible instances can be terminated with 2 minutes' notice (AWS) or 30 seconds' notice (GCP). For ML training, interruption means losing all training progress since the last checkpoint. The mitigation is checkpoint-and-restart: save model weights, optimizer state, and training step count to durable storage (S3, GCS) at regular intervals - typically every 15-30 minutes or every 500-1000 gradient steps. On restart, load the latest checkpoint and continue from that point. The implementation requires registering a SIGTERM signal handler so that a checkpoint is saved before the instance is terminated. AWS SageMaker Managed Spot Training automates most of this: you configure a checkpoint S3 path, your training script writes checkpoints to the local checkpoint directory and resumes from them on startup, and SageMaker syncs them to S3 and restarts the job after an interruption. The residual risk is losing the work since the last checkpoint - this is acceptable for most training jobs where the checkpoint interval is short relative to the job duration.
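A minimal sketch of the checkpoint-and-restart pattern follows, using only the standard library. `CheckpointingTrainer`, its JSON checkpoint file, and the placeholder "training step" are all illustrative assumptions; a real job would save model and optimizer state with its framework's own serializer and write to durable storage rather than a local path.

```python
import json
import os
import signal
import tempfile


class CheckpointingTrainer:
    """Toy training loop that checkpoints periodically and on a stop request."""

    def __init__(self, checkpoint_path: str, checkpoint_every: int = 500):
        self.checkpoint_path = checkpoint_path
        self.checkpoint_every = checkpoint_every
        self.step = 0
        self.state = {"loss": None}  # stand-in for model weights + optimizer state
        self.stop_requested = False

    def request_stop(self, signum=None, frame=None):
        # Keep the signal handler tiny: set a flag and let the loop save safely
        self.stop_requested = True

    def save(self):
        # Write to a temp file, then rename atomically so a kill mid-write
        # cannot leave a corrupt checkpoint behind
        tmp_path = self.checkpoint_path + ".tmp"
        with open(tmp_path, "w") as f:
            json.dump({"step": self.step, "state": self.state}, f)
        os.replace(tmp_path, self.checkpoint_path)

    def load(self):
        if os.path.exists(self.checkpoint_path):
            with open(self.checkpoint_path) as f:
                ckpt = json.load(f)
            self.step, self.state = ckpt["step"], ckpt["state"]

    def train(self, total_steps: int) -> int:
        self.load()  # resume from the latest checkpoint if one exists
        while self.step < total_steps and not self.stop_requested:
            self.step += 1
            self.state["loss"] = 1.0 / self.step  # placeholder for a real gradient step
            if self.step % self.checkpoint_every == 0:
                self.save()
        self.save()  # final checkpoint, or the pre-termination one after SIGTERM
        return self.step


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "checkpoint.json")
    trainer = CheckpointingTrainer(path, checkpoint_every=100)
    signal.signal(signal.SIGTERM, trainer.request_stop)
    print("finished at step", trainer.train(total_steps=1000))
```

The handler only sets a flag instead of saving directly; the loop then finishes the current step and writes a consistent checkpoint, which avoids serializing state that is halfway through an update.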
Q2: How do you calculate whether a reserved instance commitment is worth making for an ML training cluster?
Answer: Three data points are needed: (1) utilization rate - what fraction of time the instance type is actually running; (2) on-demand vs reserved price difference (typically 30-60%); (3) commitment term (1 year or 3 year). The break-even utilization is 1 - reserved_discount, because a reserved instance is billed for every hour of the term while on-demand is billed only for the hours used. For a 40% reserved discount, the break-even utilization is 60% - if the instance runs more than 60% of the time, reserved is cheaper. Most scheduled overnight training jobs run 6-8 hours per night, which is 25-33% utilization - below break-even for reserved instances. Reserved instances are worth it for always-on production inference endpoints (100% utilization) and for dedicated training servers that run multi-day training runs regularly. For bursty experimental training, spot instances with no commitment are almost always better.
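The break-even rule can be checked with a few lines of arithmetic. In this sketch the function names and the $10/hour on-demand rate are hypothetical, chosen only to make the comparison concrete; it also ignores upfront-payment options and spot pricing.

```python
def reserved_break_even(reserved_discount: float) -> float:
    """Utilization above which a reserved instance is cheaper than on-demand."""
    return 1.0 - reserved_discount


def compare_annual_cost(hours_per_day: float, on_demand_hourly: float,
                        reserved_discount: float) -> dict:
    """Compare one year of on-demand (pay per hour used) vs reserved (pay every hour)."""
    on_demand = on_demand_hourly * hours_per_day * 365
    reserved = on_demand_hourly * (1.0 - reserved_discount) * 24 * 365
    return {
        "utilization": round(hours_per_day / 24.0, 3),
        "on_demand_annual": round(on_demand, 2),
        "reserved_annual": round(reserved, 2),
        "cheaper": "reserved" if reserved < on_demand else "on-demand",
    }


# Hypothetical $10/hour instance with a 40% reserved discount
print(reserved_break_even(0.40))
print(compare_annual_cost(hours_per_day=8, on_demand_hourly=10.0, reserved_discount=0.40))
print(compare_annual_cost(hours_per_day=24, on_demand_hourly=10.0, reserved_discount=0.40))
```

An 8-hour nightly job sits at 33% utilization and stays cheaper on-demand, while an always-on endpoint is well past the 60% break-even.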
Q3: What is the practical approach to reducing inference cost by 80% without retraining the model?
Answer: Three techniques that compound: (1) Response caching - if the same input appears multiple times (common for popular items in recommendation systems), cache the prediction in Redis with a TTL matched to how stale a prediction is allowed to be. Cache hit rates of 30-70% are typical, and each hit costs no model compute. (2) Request batching - group individual requests into batches of 16-64 before hitting the model. GPU throughput scales approximately linearly with batch size up to saturation, while latency increases by only 2-4x. At 200 requests/second, batching to 32 reduces required instances from 20 to 2-3. (3) Quantization - convert model weights from float32 to int8 using post-training quantization. This reduces model size by 4x and typically increases GPU throughput by 2-3x with less than 1% accuracy drop for most tabular and NLP tasks. Combined, these three techniques routinely reduce inference compute cost by 70-85%.
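To illustrate the response-caching technique, here is a minimal in-process sketch. It is an assumption-laden stand-in for a Redis cache: `TTLCache` and `get_or_compute` are hypothetical names, the store is a plain dict with no size limit or eviction, and the injectable clock exists only to make expiry easy to demonstrate.

```python
import time


class TTLCache:
    """Cache predictions by input key, expiring each entry after ttl_seconds."""

    def __init__(self, ttl_seconds: float, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._store = {}    # key -> (value, expires_at)
        self.hits = 0
        self.misses = 0

    def get_or_compute(self, key, compute):
        now = self.clock()
        entry = self._store.get(key)
        if entry is not None and entry[1] > now:
            self.hits += 1  # served from cache: no model call
            return entry[0]
        self.misses += 1
        value = compute(key)  # the expensive model inference
        self._store[key] = (value, now + self.ttl)
        return value

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0


def fake_model(item_id):
    # Stand-in for a real model call
    return {"item": item_id, "score": 0.5}


cache = TTLCache(ttl_seconds=300)
for item in ["a", "b", "a", "a", "c", "b"]:
    cache.get_or_compute(item, fake_model)
print(f"hit rate: {cache.hit_rate:.0%}")
```

The hit rate measured this way translates directly into avoided model calls, so it is the number to track when deciding whether a longer TTL is worth the staleness.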
Q4: Describe the tagging strategy you would implement for a team with 5 ML projects and 3 cloud accounts.
Answer: A mandatory tagging policy enforced at the infrastructure level, not as a suggestion. Six required tags: team (who owns this resource), project (which ML project this belongs to), environment (dev/staging/prod - used to set lifecycle and retention policies), cost-center (for chargeback to finance), owner (individual email, for idle resource alerts), and managed-by (terraform/manual - so you know what can be safely deleted). Implement enforcement via AWS Organizations Service Control Policies or GCP Organization Policy that deny resource creation without required tags. For resources created through CI/CD (SageMaker training jobs, Vertex AI pipelines), inject tags programmatically from the pipeline configuration. Run a weekly Cost Explorer report grouped by project tag to give each team their cost attribution. Alert on untagged resources in a Slack channel - make the absence of tags visible.
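The "alert on untagged resources" step can be sketched as a pure function over an inventory of resources. Everything here is illustrative: `find_tag_violations` is a hypothetical helper, the ARNs are made-up placeholders, and the inventory would in practice come from whatever listing API or export each cloud account provides.

```python
REQUIRED_TAGS = ("team", "project", "environment", "cost-center", "owner", "managed-by")


def find_tag_violations(resources, required=REQUIRED_TAGS):
    """Return one entry per resource that is missing (or has empty) required tags.

    resources: iterable of dicts shaped like {"arn": str, "tags": {key: value}}.
    """
    violations = []
    for resource in resources:
        tags = resource.get("tags") or {}
        missing = [key for key in required if not str(tags.get(key, "")).strip()]
        if missing:
            violations.append({"arn": resource["arn"], "missing": missing})
    return violations


# Placeholder inventory; the ARNs below are invented for illustration only
inventory = [
    {"arn": "arn:aws:sagemaker:us-east-1:111111111111:endpoint/churn-prod",
     "tags": {"team": "ml-platform", "project": "churn-prediction",
              "environment": "prod", "cost-center": "ML-2024",
              "owner": "owner@example.com", "managed-by": "terraform"}},
    {"arn": "arn:aws:sagemaker:us-east-1:111111111111:notebook-instance/scratch",
     "tags": {"team": "ml-platform", "owner": ""}},
]

for violation in find_tag_violations(inventory):
    print(f"{violation['arn']} is missing tags: {', '.join(violation['missing'])}")
```

Treating an empty value the same as a missing key is intentional, since an empty `owner` tag is just as useless for routing an idle-resource alert.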
Q5: How would you reduce the $80K monthly bill described in this lesson in 6 weeks given a team of two engineers?
Answer: Week 1: inventory and quick wins. Run identify_idle_resources() to find zero-traffic endpoints and idle notebooks. Delete or stop them immediately - this is typically the fastest $5-10K reduction. Week 2: storage tiering. Apply S3 lifecycle policies to experiment and training data prefixes. This does not save money immediately (objects must age into cheaper tiers) but sets up future savings. Also run storage deduplication. Week 3: convert training jobs to spot. Audit all recurring training jobs, add checkpoint logic to training scripts, and convert them to spot instances. This requires engineering work but delivers a 60-70% reduction on training compute. Week 4: right-size endpoints. Check CloudWatch metrics for endpoint instances - many production endpoints are over-provisioned. If an ml.m5.4xlarge is at 15% CPU utilization, move to ml.m5.xlarge for a 75% cost reduction. Week 5: reserved instances for predictable loads. Identify the endpoints and clusters that run at 80%+ utilization and purchase 1-year reserved instances. Week 6: implement cost monitoring. Set up CloudWatch billing alarms, daily cost reports by project tag, and a maximum job runtime on all training jobs.
Q6: What is the relationship between model size, inference latency, and cost, and how do you make the tradeoff decision?
Answer: Model size drives latency in two ways: memory bandwidth (loading weights from GPU HBM is the bottleneck for large models at small batch sizes) and compute (more parameters = more FLOPs per inference). The relationship is roughly linear - a model with 2x the parameters takes ~2x longer to infer at the same batch size, though this varies significantly by architecture. Cost is proportional to latency × instances needed to handle QPS × instance price. The tradeoff decision requires four numbers: (1) current model accuracy; (2) minimum acceptable accuracy (from the business, not engineering); (3) inference QPS and latency SLA; (4) distillation or quantization development cost. If the accuracy headroom (item 1 minus item 2) allows for a 5-10 point compression while maintaining SLA, and the monthly inference savings exceed the one-time engineering cost in less than 6 months, compression is worth pursuing. For models at the accuracy frontier (where every 0.1% matters), the tradeoff often does not make sense. For models with significant headroom (e.g., AUROC 0.92 where business needs 0.85), aggressive compression is almost always justified.
