Benchmarking Compressed Models: Measuring What Actually Matters
The deployment was supposed to be a win. The team had quantized their flagship 7B model to INT4 using AWQ - memory usage down from 14 GB to 4 GB, throughput up 3.5×, latency half of what it was. MMLU accuracy dropped only 1.8%. Perplexity on WikiText-2 increased by a modest 4.2%. The infra team had carefully validated both metrics. The compressed model shipped on a Thursday.
By Tuesday of the following week, their enterprise customers in financial services were filing support tickets. The model's portfolio rebalancing recommendations contained arithmetic errors. Not large ones - the kind that look plausible. A portfolio with 60% equity, 30% bonds, and 10% alternatives might be presented with the weights labeled correctly, but with the expected-return calculations based on the wrong totals. The model sounded confident and structured. The math was wrong in 23% of cases, up from 4% with the FP16 model.
The team had benchmarked MMLU and perplexity. They hadn't benchmarked arithmetic. And INT4 quantization had hit multi-step calculation accuracy particularly hard because floating-point arithmetic chains - where each step's rounding error compounds - are the first capability to degrade at low bit-widths. The "4.2% perplexity increase" masked a 19-point drop in arithmetic accuracy. Perplexity is an average over all token predictions; the few percent of tokens that are numbers simply don't dominate the average even when the model is dramatically wrong about them.
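To see how an average can hide a localized failure, here is a minimal sketch with made-up numbers. The 3% numeric-token share and the per-token losses are assumptions chosen for illustration, not measurements from the incident above.

```python
import math

def perplexity(nll_by_group: dict, share_by_group: dict) -> float:
    """Perplexity = exp(token-weighted mean negative log-likelihood)."""
    mean_nll = sum(nll_by_group[g] * share_by_group[g] for g in share_by_group)
    return math.exp(mean_nll)

# Assumed token mix: 3% of evaluated tokens are numbers.
share = {"text": 0.97, "numeric": 0.03}

# Assumed per-token NLL in nats. Text tokens are unchanged after quantization;
# numeric tokens get much worse (1.00 -> 2.37).
fp16_nll = {"text": 1.70, "numeric": 1.00}
int4_nll = {"text": 1.70, "numeric": 2.37}

ppl_fp16 = perplexity(fp16_nll, share)
ppl_int4 = perplexity(int4_nll, share)
increase_pct = (ppl_int4 / ppl_fp16 - 1) * 100

print(f"Perplexity increase: {increase_pct:.1f}%")
```

Under these assumptions the loss on numeric tokens more than doubles, yet overall perplexity moves by only a few percent, which sits inside the "generally safe" band most teams use.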
This lesson gives you a systematic framework for benchmarking compressed models that would have caught this regression before it shipped: the right evaluation hierarchy, task-specific capability tests that expose what generic benchmarks miss, hardware-realistic latency profiling, and automated regression detection that runs in CI/CD.
The Evaluation Hierarchy
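Compressed model evaluation has four tiers, each catching failures that the previous tier misses: standard accuracy benchmarks (Tier 1), capability regression checks (Tier 2), hardware performance benchmarking (Tier 3), and automated regression detection (Tier 4).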
The critical insight: each tier catches different types of failures. A model can pass Tier 1 and fail Tier 2 (capability collapse), pass both and fail Tier 3 (unacceptable latency), or pass all three but fail Tier 4 (subtle regression vs. last baseline). Skip any tier and you're flying partially blind.
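One way to picture the hierarchy is as an ordered list of gates, where the first failing gate names the tier that blocked the release. The sketch below is illustrative only: `first_failing_tier`, the metric keys, and the limits are hypothetical names and values, and the sample metrics loosely mirror the opening story.

```python
from typing import Callable, Optional

# Each tier is a (name, predicate) pair evaluated against a metrics dict.
Tier = tuple[str, Callable[[dict], bool]]

def first_failing_tier(metrics: dict, tiers: list[Tier]) -> Optional[str]:
    """Return the name of the first tier whose check fails, or None if all pass."""
    for name, passes in tiers:
        if not passes(metrics):
            return name
    return None

# Hypothetical limits, for illustration only.
tiers: list[Tier] = [
    ("tier1_standard_accuracy", lambda m: m["mmlu_drop"] <= 0.03),
    ("tier2_capability", lambda m: m["arithmetic_drop"] <= 0.08),
    ("tier3_latency", lambda m: m["itl_speedup"] >= 1.2),
    ("tier4_regression", lambda m: not m["regressions"]),
]

metrics = {
    "mmlu_drop": 0.018,       # passes Tier 1
    "arithmetic_drop": 0.19,  # fails Tier 2
    "itl_speedup": 2.0,
    "regressions": [],
}

blocked_by = first_failing_tier(metrics, tiers)
print(blocked_by)
```

A model that looks fine on Tier 1 alone is stopped at Tier 2 here, which is exactly the gap the four-tier structure is meant to close.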
Tier 1: Standard Accuracy Benchmarks
Standard benchmarks establish that compression hasn't broken general reasoning and knowledge. They're fast, well-understood, and comparable across models.
import torch
import math
import time
import json
import random
from dataclasses import dataclass, field
from typing import Optional
from pathlib import Path
@dataclass
class BenchmarkResult:
    """Structured result from a single benchmark run."""
    benchmark_name: str
    score: float
    baseline_score: Optional[float] = None
    n_examples: int = 0
    eval_time_s: float = 0.0
    metadata: dict = field(default_factory=dict)

    @property
    def relative_performance(self) -> Optional[float]:
        """Score relative to baseline (1.0 = identical, 0.95 = 5% degradation)."""
        if self.baseline_score is not None and self.baseline_score != 0:
            return self.score / self.baseline_score
        return None

    @property
    def degradation_pct(self) -> Optional[float]:
        """Percentage degradation from baseline (positive = worse)."""
        rel = self.relative_performance
        if rel is None:
            return None
        # For metrics where lower is better (perplexity, loss), an increase is a degradation
        if "perplexity" in self.benchmark_name or "loss" in self.benchmark_name:
            return (rel - 1.0) * 100  # +5 = 5% worse
        return (1.0 - rel) * 100  # +5 = 5% accuracy drop

    def __str__(self) -> str:
        parts = [f"{self.benchmark_name}: {self.score:.4f}"]
        deg = self.degradation_pct  # None when there is no usable (non-zero) baseline
        if deg is not None:
            # Perplexity rises when quality degrades; accuracy falls
            arrow = "↑" if "perplexity" in self.benchmark_name else "↓"
            parts.append(f"(baseline: {self.baseline_score:.4f}, {arrow}{abs(deg):.1f}%)")
        if self.n_examples:
            parts.append(f"n={self.n_examples}")
        if self.eval_time_s:
            parts.append(f"in {self.eval_time_s:.1f}s")
        return " ".join(parts)
def evaluate_perplexity(
    model,
    tokenizer,
    dataset_name: str = "wikitext",
    dataset_config: str = "wikitext-2-raw-v1",
    split: str = "test",
    stride: int = 512,
    max_length: int = 1024,
    max_tokens: int = 524288,  # ~500K tokens for reliable estimate
) -> BenchmarkResult:
    """
    Compute perplexity using a sliding window over the test set.

    Perplexity = exp(average negative log-likelihood per token).
    Lower is better. It's the primary quick-check metric for LLM compression.

    The sliding window approach:
    - Processes text in overlapping windows of `max_length` tokens
    - Only the last `stride` tokens contribute to the loss at each step
    - This prevents the model from being penalized for lacking context at window start

    Rule of thumb: < 5% perplexity increase is generally safe.
    > 10% increase indicates meaningful quality degradation.
    > 20% increase indicates the compression was too aggressive.
    """
    from datasets import load_dataset

    dataset = load_dataset(dataset_name, dataset_config, split=split)
    full_text = "\n\n".join(dataset["text"])

    # Tokenize the full corpus
    encodings = tokenizer(full_text, return_tensors="pt")
    seq_len = encodings.input_ids.size(1)
    # Cap at max_tokens for speed
    seq_len = min(seq_len, max_tokens)

    model.eval()
    device = next(model.parameters()).device
    nlls = []
    n_tokens_evaluated = 0
    start_time = time.perf_counter()
    prev_end_loc = 0

    with torch.no_grad():
        # Step through the whole corpus; the break below stops after the final window.
        # (Stopping the range at seq_len - max_length would skip the tail of the corpus
        # and produce no windows at all for texts shorter than max_length.)
        for begin_loc in range(0, seq_len, stride):
            end_loc = min(begin_loc + max_length, seq_len)
            target_len = end_loc - prev_end_loc  # Only evaluate new tokens
            input_ids = encodings.input_ids[:, begin_loc:end_loc].to(device)
            target_ids = input_ids.clone()
            # Mask previous tokens - they provide context but don't contribute to loss
            target_ids[:, :-target_len] = -100

            outputs = model(input_ids, labels=target_ids)
            # outputs.loss is mean NLL over non-masked tokens
            nll = outputs.loss.float()
            nlls.append(nll * target_len)  # Weight by number of tokens
            n_tokens_evaluated += target_len

            prev_end_loc = end_loc
            if end_loc >= seq_len:
                break

    if not nlls:
        return BenchmarkResult(benchmark_name="perplexity", score=float("inf"))

    # Weighted average NLL → perplexity
    avg_nll = torch.stack(nlls).sum() / n_tokens_evaluated
    ppl = math.exp(avg_nll.item())
    return BenchmarkResult(
        benchmark_name="perplexity_wikitext2",
        score=ppl,
        n_examples=n_tokens_evaluated,
        eval_time_s=time.perf_counter() - start_time,
        metadata={"n_tokens": n_tokens_evaluated, "stride": stride},
    )
def evaluate_mmlu(
    model,
    tokenizer,
    n_questions_per_subject: int = 30,
    subjects: Optional[list[str]] = None,
    shots: int = 5,
) -> BenchmarkResult:
    """
    Evaluate on MMLU (Massive Multitask Language Understanding).

    Tests knowledge across 57 subjects using multiple-choice questions.
    The standard prompt format: 5-shot (5 in-context examples before the question).

    Sensitive capabilities measured:
    - High school/college level knowledge
    - Logical deduction
    - Professional knowledge (medicine, law, finance)

    Less sensitive: pure arithmetic, complex multi-step reasoning
    (those need separate tests - see Tier 2).
    """
    from datasets import load_dataset

    if subjects is None:
        # Balanced sample across domains (names must match the cais/mmlu subject configs)
        subjects = [
            "high_school_mathematics", "medical_genetics",
            "college_computer_science", "philosophy", "high_school_physics",
            "high_school_world_history", "abstract_algebra", "clinical_knowledge",
        ]

    model.eval()
    device = next(model.parameters()).device
    # Fixed seed: baseline and compressed runs must score the same questions
    rng = random.Random(0)
    correct = 0
    total = 0
    start = time.perf_counter()
    # MMLU answer tokens - the model should prefer one of these as the next token
    answer_tokens = ["A", "B", "C", "D"]

    for subject in subjects:
        try:
            dataset = load_dataset("cais/mmlu", subject, split="test")
        except Exception as e:
            print(f" Warning: Could not load {subject}: {e}")
            continue

        # Build 5-shot examples from validation set
        try:
            val_dataset = load_dataset("cais/mmlu", subject, split="validation")
            few_shot_examples = val_dataset.select(range(min(shots, len(val_dataset))))
        except Exception:
            few_shot_examples = []

        few_shot_prompt = ""
        for ex in few_shot_examples:
            q = ex["question"]
            choices = ex["choices"]
            ans_letter = "ABCD"[ex["answer"]]
            few_shot_prompt += f"Question: {q}\n"
            for i, c in enumerate(choices):
                few_shot_prompt += f"{'ABCD'[i]}. {c}\n"
            few_shot_prompt += f"Answer: {ans_letter}\n\n"

        n_sample = min(n_questions_per_subject, len(dataset))
        indices = rng.sample(range(len(dataset)), n_sample)
        for idx in indices:
            item = dataset[idx]
            question = item["question"]
            choices = item["choices"]
            correct_idx = item["answer"]

            # Build prompt
            prompt = few_shot_prompt
            prompt += f"Question: {question}\n"
            for i, choice in enumerate(choices):
                prompt += f"{'ABCD'[i]}. {choice}\n"
            prompt += "Answer:"

            inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=2048)
            inputs = {k: v.to(device) for k, v in inputs.items()}

            with torch.no_grad():
                # One forward pass per question; the next-token logits cover all four letters.
                # Scoring logits is more robust than generating text and parsing it.
                logits = model(**inputs).logits[0, -1, :]  # Last token logits

            scores = {}
            for letter in answer_tokens:
                letter_id = tokenizer.encode(f" {letter}", add_special_tokens=False)[0]
                scores[letter] = logits[letter_id].item()

            predicted_letter = max(scores, key=scores.get)
            if predicted_letter == "ABCD"[correct_idx]:
                correct += 1
            total += 1

    return BenchmarkResult(
        benchmark_name="mmlu",
        score=correct / total if total > 0 else 0.0,
        n_examples=total,
        eval_time_s=time.perf_counter() - start,
        metadata={"subjects": subjects, "shots": shots},
    )
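Subsampled benchmarks carry sampling noise, and it helps to know how large that noise is before comparing a drop against a threshold. The sketch below uses the normal approximation to a binomial proportion; the 60% accuracy figure is an assumption for illustration, and `accuracy_margin` and `questions_needed` are hypothetical helper names.

```python
import math

def accuracy_margin(accuracy: float, n: int, z: float = 1.96) -> float:
    """Approximate 95% margin of error for an accuracy measured on n questions."""
    return z * math.sqrt(accuracy * (1 - accuracy) / n)

def questions_needed(accuracy: float, margin: float, z: float = 1.96) -> int:
    """Smallest n whose margin of error is at most `margin` (normal approximation)."""
    return math.ceil(z ** 2 * accuracy * (1 - accuracy) / margin ** 2)

# Default configuration above: 8 subjects x 30 questions each.
n_default = 8 * 30
margin_default = accuracy_margin(0.60, n_default)  # assumed accuracy of 60%
n_for_tight_margin = questions_needed(0.60, 0.015)

print(f"n={n_default}: accuracy is known to about ±{margin_default * 100:.1f} points")
print(f"questions needed for ±1.5 points: {n_for_tight_margin}")
```

With a few hundred questions the margin is several accuracy points, so a small measured drop may be noise rather than a regression. Scoring the same fixed question set for both models, or raising `n_questions_per_subject`, narrows the gap.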
Tier 2: Capability Regression Checks
Generic benchmarks miss localized capability degradation. For each compressed model, test the specific capabilities your application relies on:
import re
# Compression sensitivity by capability
CAPABILITY_SENSITIVITY = {
"arithmetic_calculation": {
"sensitivity": "Very High",
"reason": "Floating-point chains amplify rounding errors at low bit-width",
"typical_int4_drop": "10-20% accuracy loss vs FP16",
},
"code_generation": {
"sensitivity": "High",
"reason": "Syntactic correctness is binary; small errors cause syntax failures",
"typical_int4_drop": "5-12% pass@1 loss",
},
"multi_step_reasoning": {
"sensitivity": "High",
"reason": "Each reasoning step can accumulate quantization noise",
"typical_int4_drop": "5-15% accuracy loss on GSM8K/MATH",
},
"factual_recall": {
"sensitivity": "Medium",
"reason": "Specific facts can be lost but fuzzy matching still works",
"typical_int4_drop": "2-8% accuracy loss on TriviaQA",
},
"summarization": {
"sensitivity": "Low",
"reason": "Quality is diffuse; paraphrasing is robust to low-precision weights",
"typical_int4_drop": "1-3% ROUGE score loss",
},
"classification": {
"sensitivity": "Low",
"reason": "Few output classes; decision boundaries are robust",
"typical_int4_drop": "0-2% accuracy loss",
},
}
def evaluate_arithmetic_capability(
model,
tokenizer,
n_problems: int = 200,
difficulty_mix: dict = None,
device: str = "cuda",
) -> BenchmarkResult:
"""
Evaluate multi-step arithmetic accuracy.
This is the benchmark that catches INT4 regressions that MMLU misses.
Tests: 2-step multiplication, multi-step expressions, word problems with tax/discount.
Scoring: exact match within tolerance of 0.01 (handles float formatting variations).
"""
if difficulty_mix is None:
difficulty_mix = {"easy": 0.4, "medium": 0.4, "hard": 0.2}
random.seed(0)  # Fixed seed: baseline and compressed models must be scored on identical problems
problems = []
for _ in range(n_problems):
difficulty = random.choices(
list(difficulty_mix.keys()),
weights=list(difficulty_mix.values()),
)[0]
if difficulty == "easy":
# 2-digit multiplication
a, b = random.randint(12, 99), random.randint(12, 99)
problem = f"Calculate: {a} × {b}\nAnswer (number only):"
answer = a * b
tolerance = 0
elif difficulty == "medium":
# Multi-step expression: (a + b) × c - d
a = random.randint(10, 50)
b = random.randint(10, 50)
c = random.randint(3, 9)
d = random.randint(5, 40)
problem = f"Calculate: ({a} + {b}) × {c} - {d}\nShow your work, then give the final answer as a number."
answer = (a + b) * c - d
tolerance = 0
else:
# Word problem with percentages
items = random.randint(8, 25)
unit_price = round(random.uniform(5.0, 75.0), 2)
discount_pct = random.choice([10, 15, 20, 25])
tax_pct = random.choice([8, 9, 10])
problem = (
f"A customer orders {items} units at ${unit_price:.2f} each. "
f"They receive a {discount_pct}% discount, then {tax_pct}% sales tax is applied. "
f"What is the final total? Round to 2 decimal places.\nAnswer:"
)
discounted = items * unit_price * (1 - discount_pct / 100)
answer = round(discounted * (1 + tax_pct / 100), 2)
tolerance = 0.015 # Allow slight floating-point formatting differences
problems.append({
"problem": problem,
"answer": answer,
"tolerance": tolerance,
"difficulty": difficulty,
})
model.eval()
correct = 0
errors_by_difficulty = {"easy": 0, "medium": 0, "hard": 0}
start = time.perf_counter()
for prob in problems:
inputs = tokenizer(
prob["problem"],
return_tensors="pt",
truncation=True,
max_length=512,
).to(device)
with torch.no_grad():
output_ids = model.generate(
**inputs,
max_new_tokens=60,
do_sample=False,
pad_token_id=tokenizer.pad_token_id,
)
generated = tokenizer.decode(
output_ids[0][inputs.input_ids.shape[1]:],
skip_special_tokens=True,
).strip()
# Extract the last number in the response
# (model often shows work then gives final answer)
numbers = re.findall(r"-?\d+(?:,\d{3})*(?:\.\d+)?", generated)
# Remove commas from formatted numbers like "1,234"
numbers = [n.replace(",", "") for n in numbers]
is_correct = False
if numbers:
try:
predicted = float(numbers[-1]) # Take last number found
is_correct = abs(predicted - prob["answer"]) <= max(prob["tolerance"], 0.01)
except ValueError:
pass
if is_correct:
correct += 1
else:
errors_by_difficulty[prob["difficulty"]] += 1
score = correct / n_problems
return BenchmarkResult(
benchmark_name="arithmetic",
score=score,
n_examples=n_problems,
eval_time_s=time.perf_counter() - start,
metadata={
"errors_by_difficulty": errors_by_difficulty,
"difficulty_mix": difficulty_mix,
},
)
def evaluate_instruction_following(
model,
tokenizer,
device: str = "cuda",
) -> BenchmarkResult:
"""
Test instruction following accuracy using IFEval-style constraints.
Checks: word count constraints, format requirements, negation constraints.
Instruction following degrades at INT4 due to reduced precision
in representing instruction-relevant features.
"""
test_cases = [
{
"instruction": "Write exactly 3 sentences about machine learning. Do not use the word 'algorithm'.",
"checks": [
lambda r: len([s for s in r.split(".") if s.strip()]) == 3,
lambda r: "algorithm" not in r.lower(),
],
},
{
"instruction": "List exactly 5 European countries, one per line, in alphabetical order.",
"checks": [
lambda r: len([l for l in r.strip().split("\n") if l.strip()]) == 5,
],
},
{
"instruction": "Respond in JSON format with keys 'name' and 'age'. Use the name 'Alice' and age 30.",
"checks": [
lambda r: '"name"' in r and '"age"' in r,
lambda r: "Alice" in r,
lambda r: "30" in r,
],
},
{
"instruction": "Write a haiku (5-7-5 syllable structure) about neural networks.",
"checks": [
lambda r: len(r.strip().split("\n")) >= 3,
# Rough check: haiku has 3 lines
],
},
{
"instruction": "Summarize quantum computing in exactly 50 words. Count carefully.",
"checks": [
lambda r: 45 <= len(r.split()) <= 55, # ±5 word tolerance
],
},
]
model.eval()
total_checks = 0
passed_checks = 0
start = time.perf_counter()
for case in test_cases:
inputs = tokenizer(case["instruction"], return_tensors="pt").to(device)
with torch.no_grad():
output_ids = model.generate(
**inputs,
max_new_tokens=200,
do_sample=False,
)
response = tokenizer.decode(
output_ids[0][inputs.input_ids.shape[1]:],
skip_special_tokens=True,
)
for check_fn in case["checks"]:
try:
if check_fn(response):
passed_checks += 1
except Exception:
pass # Count as failed
total_checks += 1
return BenchmarkResult(
benchmark_name="instruction_following",
score=passed_checks / total_checks if total_checks > 0 else 0.0,
n_examples=len(test_cases),
eval_time_s=time.perf_counter() - start,
)
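The arithmetic scorer above takes the last number in the generated text as the model's answer. The sketch below isolates that parsing step with the same regular expression so its behavior is easy to inspect; `extract_last_number` is a hypothetical helper name, and the sample strings are invented.

```python
import re
from typing import Optional

# Same pattern as in evaluate_arithmetic_capability: optional sign,
# digits with optional thousands separators, optional decimal part.
NUMBER_RE = re.compile(r"-?\d+(?:,\d{3})*(?:\.\d+)?")

def extract_last_number(text: str) -> Optional[float]:
    """Return the last number found in text, or None if there is none."""
    matches = NUMBER_RE.findall(text)
    if not matches:
        return None
    return float(matches[-1].replace(",", ""))

worked = extract_last_number("(23 + 17) × 4 - 5 = 155")
money = extract_last_number("The final total is $1,234.50")
trailing = extract_last_number(
    "The final total is $1,234.50, rounded to 2 decimal places."
)

print(worked, money, trailing)
```

The third case is worth noting: a correct answer followed by a remark that contains a digit is scored as wrong. If that pattern shows up in your model's outputs, a stricter prompt format or an explicit answer marker is likely a safer choice than last-number extraction.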
Tier 3: Hardware Performance Benchmarking
Accuracy benchmarks tell you whether the model is still good. Latency benchmarks tell you whether it's still fast. These must run on your actual deployment hardware - RTX 3090 numbers don't predict A100 numbers.
Understanding prefill vs. decode is critical for understanding how compression affects latency:
- Quantization primarily speeds up the decode phase (reduces memory bandwidth for weight loading). Expect 2-4× ITL improvement from INT4 vs FP16.
- Structured pruning speeds up both phases (fewer attention heads and layers reduce compute). Expect 1.5-2× TTFT improvement at 30-40% layer removal.
- Batch size matters differently: small batches are bandwidth-bound (benefit from quantization); large batches are compute-bound (benefit from pruning).
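The bandwidth argument for decode can be turned into a back-of-the-envelope bound: at batch size 1, each generated token has to stream the full set of weights from memory once, so per-token latency cannot be lower than weight size divided by memory bandwidth. The sketch below uses the 14 GB and 4 GB footprints from the opening example; the 1,555 GB/s bandwidth is an assumed figure for illustration, so substitute your own hardware's number.

```python
def min_decode_latency_ms(weight_gb: float, bandwidth_gb_per_s: float) -> float:
    """Lower bound on per-token decode latency when weight loading is the bottleneck."""
    return weight_gb / bandwidth_gb_per_s * 1000

BANDWIDTH_GB_PER_S = 1555.0  # assumption: replace with your GPU's memory bandwidth

fp16_ms = min_decode_latency_ms(14.0, BANDWIDTH_GB_PER_S)  # 7B model in FP16
int4_ms = min_decode_latency_ms(4.0, BANDWIDTH_GB_PER_S)   # same model in INT4
ideal_speedup = fp16_ms / int4_ms

print(f"FP16 bound: {fp16_ms:.2f} ms/token, INT4 bound: {int4_ms:.2f} ms/token")
print(f"Ideal bandwidth-bound speedup: {ideal_speedup:.1f}x")
```

This is a ceiling, not a prediction. Dequantization overhead, KV cache reads, and kernel launch costs all eat into it, which is why measured ITL on the deployment hardware is the number that counts.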
import statistics
import torch
import time
from dataclasses import dataclass
from typing import Optional
@dataclass
class LatencyProfile:
"""Complete latency and throughput profile for a model configuration."""
model_name: str
compression_method: str
batch_size: int
input_seq_len: int
n_new_tokens: int
# Time to First Token - dominated by prefill (compute-bound)
ttft_mean_ms: float
ttft_p50_ms: float
ttft_p95_ms: float
# Inter-Token Latency - dominated by decode (memory-bandwidth bound)
itl_mean_ms: float
itl_p95_ms: float
# Total
total_mean_ms: float
total_p99_ms: float
# Throughput
tokens_per_second: float # Total output tokens / second
requests_per_second: float # Complete requests / second
# Memory
peak_vram_gb: float
def benchmark_latency(
model,
tokenizer,
prompt: str = "Explain the difference between transformers and RNNs in detail:",
batch_sizes: list[int] = None,
n_new_tokens: int = 128,
n_runs: int = 30,
warmup_runs: int = 5,
device: str = "cuda",
model_name: str = "model",
compression_method: str = "baseline",
) -> list[LatencyProfile]:
"""
Comprehensive TTFT and ITL benchmarking.
Separates prefill time (TTFT) from decode time (ITL) by:
1. Measuring time to generate 1 token (= TTFT)
2. Measuring time to generate N tokens total
3. ITL = (total - TTFT) / (N - 1)
Args:
batch_sizes: List of batch sizes to test. None = [1, 4, 8]
n_new_tokens: Output tokens to generate
n_runs: Number of timed runs (more = better statistics)
warmup_runs: Runs discarded for GPU warmup
"""
if batch_sizes is None:
batch_sizes = [1, 4, 8]
model.eval()
profiles = []
for batch_size in batch_sizes:
prompts = [prompt] * batch_size
inputs = tokenizer(
prompts,
return_tensors="pt",
padding=True,
truncation=True,
max_length=512,
)
inputs = {k: v.to(device) for k, v in inputs.items()}
input_seq_len = inputs["input_ids"].shape[1]
# GPU warmup - critical: first run is always slower due to CUDA JIT
print(f" Warming up batch_size={batch_size}...")
for _ in range(warmup_runs):
with torch.no_grad():
model.generate(
**inputs,
max_new_tokens=20,
do_sample=False,
pad_token_id=tokenizer.pad_token_id,
)
if torch.cuda.is_available():
torch.cuda.synchronize(device)
torch.cuda.reset_peak_memory_stats(device)
ttft_samples = []
itl_samples = []
total_samples = []
print(f" Benchmarking batch_size={batch_size}, {n_runs} runs...")
for run_idx in range(n_runs):
# --- Measure TTFT (time to first token = prefill time) ---
if torch.cuda.is_available():
torch.cuda.synchronize(device)
t_ttft_start = time.perf_counter()
with torch.no_grad():
model.generate(
**inputs,
max_new_tokens=1,
do_sample=False,
pad_token_id=tokenizer.pad_token_id,
)
if torch.cuda.is_available():
torch.cuda.synchronize(device)
ttft_ms = (time.perf_counter() - t_ttft_start) * 1000
ttft_samples.append(ttft_ms)
# --- Measure total generation time ---
if torch.cuda.is_available():
torch.cuda.synchronize(device)
t_total_start = time.perf_counter()
with torch.no_grad():
model.generate(
**inputs,
max_new_tokens=n_new_tokens,
do_sample=False,
pad_token_id=tokenizer.pad_token_id,
)
if torch.cuda.is_available():
torch.cuda.synchronize(device)
total_ms = (time.perf_counter() - t_total_start) * 1000
total_samples.append(total_ms)
# ITL = time for tokens 2 through N, divided by (N-1)
if n_new_tokens > 1:
itl_ms = (total_ms - ttft_ms) / (n_new_tokens - 1)
itl_samples.append(itl_ms)
# Compute statistics
peak_vram_gb = 0.0
if torch.cuda.is_available():
peak_vram_gb = torch.cuda.max_memory_allocated(device) / 1024**3
ttft_sorted = sorted(ttft_samples)
total_sorted = sorted(total_samples)
itl_sorted = sorted(itl_samples) if itl_samples else [0.0]
def percentile(data, p):
idx = min(int(len(data) * p), len(data) - 1)
return data[idx]
tokens_per_second = (n_new_tokens * batch_size) / (statistics.mean(total_samples) / 1000)
requests_per_second = batch_size / (statistics.mean(total_samples) / 1000)
profile = LatencyProfile(
model_name=model_name,
compression_method=compression_method,
batch_size=batch_size,
input_seq_len=input_seq_len,
n_new_tokens=n_new_tokens,
ttft_mean_ms=statistics.mean(ttft_samples),
ttft_p50_ms=percentile(ttft_sorted, 0.50),
ttft_p95_ms=percentile(ttft_sorted, 0.95),
itl_mean_ms=statistics.mean(itl_samples) if itl_samples else 0.0,
itl_p95_ms=percentile(itl_sorted, 0.95),
total_mean_ms=statistics.mean(total_samples),
total_p99_ms=percentile(total_sorted, 0.99),
tokens_per_second=tokens_per_second,
requests_per_second=requests_per_second,
peak_vram_gb=peak_vram_gb,
)
profiles.append(profile)
print(f" batch={batch_size}: "
f"TTFT={profile.ttft_mean_ms:.1f}ms (p95={profile.ttft_p95_ms:.1f}ms), "
f"ITL={profile.itl_mean_ms:.2f}ms, "
f"throughput={profile.tokens_per_second:.0f} tok/s, "
f"VRAM={profile.peak_vram_gb:.2f} GB")
return profiles
def find_optimal_batch_size(
    model,
    tokenizer,
    prompt: str = "Explain quantum computing:",
    n_new_tokens: int = 100,
    max_batch: int = 64,
    device: str = "cuda",
) -> dict:
    """
    Sweep batch sizes in powers of two (1, 2, 4, ...) and report the one that
    maximizes throughput. The sweep stops at `max_batch` or at the first
    out-of-memory error.

    GPU memory sets a hard ceiling; throughput typically peaks before OOM
    because large batches cause KV cache to dominate memory.
    """
    model.eval()
    results = []
    batch_size = 1
    while batch_size <= max_batch:
        prompts = [prompt] * batch_size
        inputs = tokenizer(prompts, return_tensors="pt", padding=True).to(device)
        try:
            # Warmup
            with torch.no_grad():
                model.generate(**inputs, max_new_tokens=10, do_sample=False,
                               pad_token_id=tokenizer.pad_token_id)

            # Reset the peak counter so VRAM is reported per batch size
            if torch.cuda.is_available():
                torch.cuda.reset_peak_memory_stats(device)

            # Benchmark 5 runs; synchronize so queued GPU work is included in the timing
            times = []
            for _ in range(5):
                if torch.cuda.is_available():
                    torch.cuda.synchronize(device)
                t0 = time.perf_counter()
                with torch.no_grad():
                    model.generate(**inputs, max_new_tokens=n_new_tokens,
                                   do_sample=False, pad_token_id=tokenizer.pad_token_id)
                if torch.cuda.is_available():
                    torch.cuda.synchronize(device)
                times.append(time.perf_counter() - t0)

            avg_time = statistics.mean(times)
            tps = batch_size * n_new_tokens / avg_time
            vram_gb = torch.cuda.max_memory_allocated(device) / 1024**3 if torch.cuda.is_available() else 0
            results.append({
                "batch_size": batch_size,
                "tokens_per_second": round(tps, 1),
                "latency_ms": round(avg_time * 1000, 1),
                "vram_gb": round(vram_gb, 2),
            })
            print(f" batch={batch_size}: {tps:.0f} tok/s, "
                  f"{avg_time*1000:.0f}ms, {vram_gb:.1f} GB VRAM")
            batch_size *= 2
        except RuntimeError as e:
            if "out of memory" in str(e).lower():
                print(f" OOM at batch_size={batch_size} - stopping")
                torch.cuda.empty_cache()
                break
            raise

    if not results:
        return {}
    best = max(results, key=lambda x: x["tokens_per_second"])
    return {
        "results_by_batch": results,
        "optimal_batch_size": best["batch_size"],
        "max_tokens_per_second": best["tokens_per_second"],
        "max_vram_at_optimal": best["vram_gb"],
    }
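Tail percentiles are only as good as the number of samples behind them. The sketch below copies the index-based `percentile` helper from `benchmark_latency` and feeds it stand-in latencies (1.0, 2.0, ..., n) so the returned value shows which rank was selected. The sample data is synthetic.

```python
def percentile(sorted_data: list[float], p: float) -> float:
    """Index-based lookup, mirroring the helper inside benchmark_latency."""
    idx = min(int(len(sorted_data) * p), len(sorted_data) - 1)
    return sorted_data[idx]

# Synthetic, already-sorted "latencies": the value equals the 1-based rank.
runs_30 = [float(i) for i in range(1, 31)]
runs_200 = [float(i) for i in range(1, 201)]

p95_of_30 = percentile(runs_30, 0.95)    # second-slowest of 30 runs
p99_of_30 = percentile(runs_30, 0.99)    # slowest of 30 runs, i.e. the maximum
p99_of_200 = percentile(runs_200, 0.99)  # below the maximum once n is large enough

print(p95_of_30, p99_of_30, p99_of_200)
```

With the default `n_runs=30`, the reported p99 is simply the single slowest run, so one scheduling hiccup can move it. If p99 is part of your SLA, a few hundred timed runs give a more stable estimate.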
Tier 4: Automated Regression Detection
The previous tiers catch regressions when you run them manually. Tier 4 makes regression detection automatic, continuous, and blocking - integrated into your deployment pipeline.
import json
from datetime import datetime
from pathlib import Path
from typing import Optional
class CompressionRegressionDetector:
    """
    Automated regression detector for compressed model deployments.

    Workflow:
    1. Run full benchmark suite on the approved baseline model → save as baseline.json
    2. For every compression change: run benchmarks, compare to baseline.json
    3. Fail deployment if any metric exceeds threshold

    The thresholds encode your quality requirements - they should be set
    based on user-facing impact, not arbitrary percentages.
    """

    # Default thresholds - customize per use case.
    # Every value is a relative change vs. baseline: (compressed - baseline) / |baseline|
    DEFAULT_THRESHOLDS = {
        # Accuracy metrics: negative = allowed drop (score can fall by this fraction)
        "mmlu": -0.03,  # Max 3% MMLU accuracy drop
        "arithmetic": -0.08,  # Max 8% arithmetic accuracy drop - tighten this for math-heavy apps
        "instruction_following": -0.05,  # Max 5% IF accuracy drop
        # Perplexity: positive = allowed increase (lower is better)
        "perplexity_wikitext2": 0.05,  # Max 5% perplexity increase
        # Latency: positive = allowed increase
        "ttft_mean_ms": 0.30,  # Max 30% TTFT increase (some compression can slow prefill)
        "itl_mean_ms": -0.20,  # ITL should IMPROVE by at least 20% for any speedup to matter
        # Memory: must improve (negative = must decrease)
        "peak_vram_gb": -0.10,  # VRAM must decrease by at least 10% (we're compressing!)
    }

    def __init__(
        self,
        baseline_path: str,
        thresholds: Optional[dict] = None,
        strict_mode: bool = False,  # If True, warn-level also blocks
    ):
        self.baseline_path = Path(baseline_path)
        self.thresholds = {**self.DEFAULT_THRESHOLDS, **(thresholds or {})}
        self.strict_mode = strict_mode
        self.baseline = self._load_baseline()

    def _load_baseline(self) -> dict:
        if self.baseline_path.exists():
            with open(self.baseline_path) as f:
                data = json.load(f)
            print(f"Loaded baseline from {self.baseline_path} "
                  f"(recorded: {data.get('timestamp', 'unknown')})")
            return data
        print(f"No baseline found at {self.baseline_path} - first run will create it")
        return {}

    def save_as_baseline(self, results: dict) -> None:
        """Save current results as the new baseline."""
        results = {**results, "timestamp": datetime.now().isoformat()}
        self.baseline_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.baseline_path, "w") as f:
            json.dump(results, f, indent=2)
        print(f"Baseline saved to {self.baseline_path}")

    def _is_lower_better(self, metric: str) -> bool:
        """Determine if lower values are better for this metric."""
        name = metric.lower()
        # Match unit suffixes exactly; a bare substring test for "ms" or "gb"
        # would also fire on unrelated names such as "problems_solved"
        if name.endswith(("_ms", "_gb")):
            return True
        lower_better_keywords = ["perplexity", "loss", "latency", "vram", "error"]
        return any(kw in name for kw in lower_better_keywords)

    def detect_regressions(
        self,
        compressed_results: dict,
    ) -> list[dict]:
        """
        Compare compressed model results against baseline.
        Returns list of regression events with severity.
        """
        if not self.baseline:
            print("No baseline - cannot detect regressions")
            return []

        regressions = []
        for metric, threshold in self.thresholds.items():
            if metric not in compressed_results:
                print(f" Note: metric '{metric}' not in compressed results - skipping")
                continue
            if metric not in self.baseline:
                print(f" Note: metric '{metric}' not in baseline - skipping")
                continue
            baseline_val = self.baseline[metric]
            compressed_val = compressed_results[metric]
            if baseline_val == 0:
                continue

            # Relative change: positive = compressed is higher than baseline
            relative_change = (compressed_val - baseline_val) / abs(baseline_val)
            lower_is_better = self._is_lower_better(metric)

            # How far past the threshold the change landed (positive = regression)
            if lower_is_better:
                # For latency/perplexity/VRAM: the change must not exceed the threshold.
                # A negative threshold means the metric must improve by at least that much.
                excess = relative_change - threshold
            else:
                # For accuracy: the change must not fall below the threshold
                # (e.g., -0.03 = 3% drop allowed)
                excess = threshold - relative_change

            if excess > 0:
                # Severity: critical if the overshoot exceeds the threshold magnitude
                # (same as "2× over threshold" when change and threshold share a sign,
                # and still meaningful for must-improve thresholds such as itl_mean_ms)
                severity = "critical" if excess > abs(threshold) else "warning"
                regressions.append({
                    "metric": metric,
                    "baseline_value": round(baseline_val, 4),
                    "compressed_value": round(compressed_val, 4),
                    "relative_change_pct": round(relative_change * 100, 2),
                    "threshold_pct": round(threshold * 100, 2),
                    "severity": severity,
                    "lower_is_better": lower_is_better,
                })
        return regressions

    def generate_report(
        self,
        compressed_results: dict,
        compression_method: str,
        model_name: str,
    ) -> tuple[str, bool]:
        """
        Generate human-readable benchmark comparison report.
        Returns: (report_text, deployment_approved)
        """
        regressions = self.detect_regressions(compressed_results)
        critical = [r for r in regressions if r["severity"] == "critical"]
        warnings = [r for r in regressions if r["severity"] == "warning"]
        deployment_approved = len(critical) == 0
        if self.strict_mode:
            deployment_approved = deployment_approved and len(warnings) == 0

        lines = [
            "=" * 60,
            "COMPRESSION BENCHMARK REPORT",
            f"Model: {model_name}",
            f"Compression: {compression_method}",
            f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
            "=" * 60,
            "",
            "METRIC COMPARISON (baseline → compressed):",
        ]

        # Display all metrics
        all_metrics = sorted(set(list(self.baseline.keys()) + list(compressed_results.keys())))
        for metric in all_metrics:
            if metric in ("timestamp", "model_name", "compression_method"):
                continue
            baseline_val = self.baseline.get(metric)
            compressed_val = compressed_results.get(metric)
            if baseline_val is None or compressed_val is None:
                continue
            is_numeric = isinstance(baseline_val, (int, float)) and isinstance(compressed_val, (int, float))
            if is_numeric and baseline_val != 0:  # Skip the percentage when the baseline is zero
                rel_change = (compressed_val - baseline_val) / abs(baseline_val) * 100
                direction = "↑" if compressed_val > baseline_val else "↓"
                lines.append(f" {metric:<35} {baseline_val:.4f} → {compressed_val:.4f} "
                             f"({direction}{abs(rel_change):.1f}%)")
            else:
                lines.append(f" {metric:<35} {baseline_val} → {compressed_val}")
        lines.append("")

        if critical:
            lines.append(f"CRITICAL REGRESSIONS ({len(critical)} - BLOCKING DEPLOYMENT):")
            for r in critical:
                dir_label = "increase" if r["lower_is_better"] else "drop"
                lines.append(f" [CRITICAL] {r['metric']}: "
                             f"{r['relative_change_pct']:+.1f}% {dir_label} "
                             f"(limit: {r['threshold_pct']:+.1f}%)")
        if warnings:
            lines.append(f"\nWARNINGS ({len(warnings)} - review before deployment):")
            for r in warnings:
                dir_label = "increase" if r["lower_is_better"] else "drop"
                lines.append(f" [WARNING] {r['metric']}: "
                             f"{r['relative_change_pct']:+.1f}% {dir_label} "
                             f"(limit: {r['threshold_pct']:+.1f}%)")
        if not regressions:
            lines.append("No regressions detected. All metrics within acceptable thresholds.")

        lines.append("")
        status = "APPROVED" if deployment_approved else "BLOCKED"
        lines.append(f"DEPLOYMENT STATUS: {status}")
        lines.append("=" * 60)
        return "\n".join(lines), deployment_approved
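The sign conventions in the thresholds are easy to misread, so here is a small standalone sketch of the comparison rule, separate from the detector class. `violates` is a hypothetical helper, and the metric values are illustrative apart from the 14 GB to 4 GB memory figure from the opening example.

```python
def relative_change(baseline: float, compressed: float) -> float:
    """Positive means the compressed value is higher than the baseline."""
    return (compressed - baseline) / abs(baseline)

def violates(baseline: float, compressed: float, threshold: float,
             lower_is_better: bool) -> bool:
    """True when the change falls on the wrong side of the threshold."""
    change = relative_change(baseline, compressed)
    return change > threshold if lower_is_better else change < threshold

checks = {
    # Accuracy fell 0.62 -> 0.59 (about -4.8%), past the -3% limit.
    "mmlu": violates(0.62, 0.59, -0.03, lower_is_better=False),
    # ITL improved 10 ms -> 9 ms (-10%), but the threshold demands at least -20%.
    "itl_mean_ms": violates(10.0, 9.0, -0.20, lower_is_better=True),
    # VRAM fell 14 GB -> 4 GB (about -71%), comfortably past the -10% requirement.
    "peak_vram_gb": violates(14.0, 4.0, -0.10, lower_is_better=True),
}

for metric, failed in checks.items():
    print(f"{metric}: {'REGRESSION' if failed else 'ok'}")
```

Note the second case: a negative threshold on a lower-is-better metric turns "did not get worse" into "did not improve enough", so a modest speedup still counts as a failure.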
The Complete Benchmarking Suite
def run_full_benchmark_suite(
compressed_model,
tokenizer,
baseline_path: str = "./benchmarks/baseline.json",
compression_method: str = "AWQ INT4",
model_name: str = "llama-7b",
output_dir: str = "./benchmarks",
run_latency: bool = True,
latency_batch_sizes: list[int] = None,
n_mmlu_per_subject: int = 30,
n_arithmetic: int = 200,
device: str = "cuda",
) -> tuple[dict, bool]:
"""
Run the complete 4-tier benchmark suite and generate a deployment decision.
Typical runtime on single A100:
- Tier 1 (PPL + MMLU): 20-40 min
- Tier 2 (arithmetic + IF): 5-10 min
- Tier 3 (latency): 5-15 min
- Tier 4 (regression detection): < 1 min
Total: ~30-65 minutes
Returns: (results_dict, deployment_approved)
"""
import os
os.makedirs(output_dir, exist_ok=True)
if latency_batch_sizes is None:
latency_batch_sizes = [1, 4, 8]
results = {
"model_name": model_name,
"compression_method": compression_method,
"timestamp": datetime.now().isoformat(),
}
print("=" * 60)
print("TIER 1: STANDARD ACCURACY BENCHMARKS")
print("=" * 60)
print("\n[1/5] Perplexity on WikiText-2...")
ppl_result = evaluate_perplexity(compressed_model, tokenizer)
results["perplexity_wikitext2"] = ppl_result.score
print(f" {ppl_result}")
print("\n[2/5] MMLU (knowledge and reasoning)...")
mmlu_result = evaluate_mmlu(compressed_model, tokenizer, n_questions_per_subject=n_mmlu_per_subject)
results["mmlu"] = mmlu_result.score
print(f" {mmlu_result}")
print("\n" + "=" * 60)
print("TIER 2: CAPABILITY REGRESSION CHECKS")
print("=" * 60)
print("\n[3/5] Arithmetic accuracy...")
arith_result = evaluate_arithmetic_capability(
compressed_model, tokenizer, n_problems=n_arithmetic, device=device
)
results["arithmetic"] = arith_result.score
print(f" {arith_result}")
print(f" Errors by difficulty: {arith_result.metadata.get('errors_by_difficulty')}")
print("\n[4/5] Instruction following...")
if_result = evaluate_instruction_following(compressed_model, tokenizer, device=device)
results["instruction_following"] = if_result.score
print(f" {if_result}")
print("\n" + "=" * 60)
print("TIER 3: HARDWARE PERFORMANCE BENCHMARKS")
print("=" * 60)
if run_latency:
print(f"\n[5/5] Latency benchmarking (batch sizes: {latency_batch_sizes})...")
latency_profiles = benchmark_latency(
compressed_model,
tokenizer,
batch_sizes=latency_batch_sizes,
n_new_tokens=128,
n_runs=20,
device=device,
model_name=model_name,
compression_method=compression_method,
)
if latency_profiles:
# Record single-request profile (batch_size=1) for regression detection
single = latency_profiles[0]
results["ttft_mean_ms"] = single.ttft_mean_ms
results["ttft_p95_ms"] = single.ttft_p95_ms
results["itl_mean_ms"] = single.itl_mean_ms
results["total_p99_ms"] = single.total_p99_ms
results["tokens_per_second_bs1"] = single.tokens_per_second
results["peak_vram_gb"] = single.peak_vram_gb
print("\n" + "=" * 60)
print("TIER 4: REGRESSION DETECTION")
print("=" * 60)
detector = CompressionRegressionDetector(baseline_path)
# If no baseline exists, save current results as baseline and exit
if not detector.baseline:
print("\nNo baseline found - saving current results as baseline")
detector.save_as_baseline(results)
print("Re-run after applying compression to detect regressions")
return results, True
report, deployment_approved = detector.generate_report(results, compression_method, model_name)
print("\n" + report)
# Save results
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
results_path = Path(output_dir) / f"{compression_method.replace(' ', '_').lower()}_{timestamp}.json"
with open(results_path, "w") as f:
json.dump(results, f, indent=2)
print(f"\nResults saved to {results_path}")
return results, deployment_approved
Key Metrics Reference
| Metric | What It Measures | Compression Sensitivity | Acceptable INT4 Loss |
|---|---|---|---|
| Perplexity (WikiText-2) | General language modeling quality | Low (averages out) | < 5% increase |
| MMLU | Knowledge recall and basic reasoning | Medium | < 3% drop |
| GSM8K / Arithmetic | Multi-step math reasoning | Very High | < 8% drop |
| HumanEval (pass@1) | Code generation correctness | High | < 5% drop |
| IFEval / Instruction Following | Constraint adherence | Medium | < 5% drop |
| TTFT (Time to First Token) | Perceived responsiveness | Should improve | Must not regress > 10% |
| ITL (Inter-Token Latency) | Sustained generation speed | Strong improvement | Should improve 2-4× |
| Peak VRAM | GPU memory footprint | Strong improvement | Must decrease |
| Max Throughput (tok/s) | Server-side capacity | Strong improvement | Should improve 2-4× |
:::danger Perplexity Alone is Not Sufficient for Compressed Model Validation
Perplexity measures average token prediction quality across the full distribution. It cannot detect localized degradation in specific capabilities - a 5% perplexity increase might hide a 20% arithmetic accuracy drop (because arithmetic tokens are <2% of the text distribution and don't dominate the average). Always run task-specific capability benchmarks (arithmetic, code, reasoning) as separate Tier 2 tests before declaring a compressed model safe to deploy.
:::
:::warning Benchmark on the Same Hardware You Will Deploy On
An A100 (80 GB) and an RTX 3090 differ in memory bandwidth (roughly 2 TB/s vs 936 GB/s), Tensor Core throughput, and NVLink configuration. The INT4 speedup from AWQ can be around 15% larger on the A100 than on the RTX 3090, or vice versa, depending on the batch size and model architecture. Always run Tier 3 hardware benchmarks on your production hardware, not your development machine. If CI runs on different hardware than production, record both and flag when they diverge significantly.
:::
:::tip Integrate Regression Detection into Your Deployment CI/CD
Add the benchmark suite to your model deployment pipeline: every time a new compressed model is produced (from a compression config change, calibration data update, or base model update), automatically run Tiers 1-2 and compare against the saved baseline. Block deployment if regressions are detected. Tiers 1-2 (PPL + MMLU subset + arithmetic + instruction following) typically run in 25-50 minutes on a single A100; adding the Tier 3 latency benchmarks brings the total to roughly 35-65 minutes. Either is acceptable for a CI gate. Gating in CI is the most dependable way to keep compression regressions from reaching production users.
:::
Interview Questions
Q: Why is perplexity insufficient as the sole quality metric for compressed models?
A: Perplexity is the exponential of the average cross-entropy loss across all predicted tokens. "Average" is the problem. In a typical LLM evaluation corpus, arithmetic expressions, precise factual claims, and code syntax constitute a small fraction of the total tokens - perhaps 1-5%. When INT4 quantization degrades these capabilities by 15-20%, the effect on the average is diluted by that fraction. For example, if the affected tokens are 1% of the corpus and their per-token loss rises by 0.20 nats, the average loss rises by only 0.01 × 0.20 = 0.002 nats, so perplexity rises by a factor of e^0.002 ≈ 1.002, or about 0.2%. The model passes the perplexity gate while having dramatically degraded arithmetic and code capabilities.
The fix: treat perplexity as a screening metric only. If perplexity increases by more than 10%, something is wrong with the compression. If perplexity looks fine, you still need task-specific evaluation for arithmetic (GSM8K, custom problem sets), code (HumanEval), and factual recall (TriviaQA). These capabilities degrade non-uniformly and require dedicated benchmarks.
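The dilution arithmetic in this answer can be checked with a few lines of Python. This is a minimal sketch: the function name `perplexity_increase_pct` and the example token fractions are illustrative assumptions, not measurements from a real model.

```python
import math


def perplexity_increase_pct(token_fraction: float, loss_increase_nats: float) -> float:
    """Relative perplexity increase (%) when only a fraction of tokens get worse.

    token_fraction: share of evaluated tokens affected (0.01 means 1%).
    loss_increase_nats: extra cross-entropy on those tokens, in nats.
    """
    # Unaffected tokens contribute zero extra loss, so the mean moves
    # only by the affected share times the per-token increase.
    avg_loss_increase = token_fraction * loss_increase_nats
    # Perplexity = exp(mean loss), so new/old perplexity = exp(delta mean loss).
    return (math.exp(avg_loss_increase) - 1.0) * 100.0


# Illustrative scenarios (assumed values): (token fraction, per-token loss increase)
for fraction, delta in [(0.01, 0.20), (0.02, 1.00), (0.05, 1.00)]:
    pct = perplexity_increase_pct(fraction, delta)
    print(f"{fraction:.0%} of tokens, +{delta:.2f} nats -> perplexity +{pct:.2f}%")
```

Even a full extra nat of loss on 5% of the tokens, which is a severe degradation on those tokens, moves perplexity by only about 5%. That is why the perplexity gate alone cannot protect a narrow capability.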
Q: What is the difference between TTFT and ITL, and how does each relate to the prefill vs. decode phases?
A: LLM generation has two distinct phases:
Prefill: The model processes all N input prompt tokens in parallel, computing attention over the full sequence. This is compute-bound - the GPU's CUDA cores and Tensor Cores are kept busy with attention that scales as O(N²) in sequence length plus feedforward computation that scales as O(N × d_model²). Prefill time grows with input length. TTFT (Time to First Token) is dominated by prefill time.
Decode: The model generates one output token at a time. Each step reads all of the model's weights from HBM (High Bandwidth Memory) to compute a single token's output, but only does a tiny amount of actual computation per byte loaded. This is memory-bandwidth bound - the bottleneck is how fast you can load weights from HBM. ITL (Inter-Token Latency) = decode time per token.
Compression affects these differently:
- Quantization (AWQ/GPTQ INT4) primarily improves ITL. Reducing weight size from FP16 to INT4 (4× smaller) means 4× less data loaded from HBM per decode step → up to 4× ITL improvement (practically 2-3× due to dequantization overhead and memory alignment).
- Structured pruning improves TTFT (fewer layers = fewer compute steps in prefill) and ITL (fewer weights to load).
- Increasing batch size shifts the bottleneck: small batches are bandwidth-bound; large batches become compute-bound (each weight read from HBM is reused across every sequence in the batch, so arithmetic, not memory traffic, becomes the limit).
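The bandwidth-bound view of decode gives a quick back-of-the-envelope floor for ITL. The sketch below assumes that every weight is read from memory exactly once per generated token at batch size 1, and it ignores KV-cache reads, dequantization cost, and kernel launch overhead. The function name `decode_itl_floor_ms` is hypothetical; the bandwidth figure is the RTX 3090 number quoted in this lesson.

```python
def decode_itl_floor_ms(n_params: float, bits_per_weight: float, bandwidth_gb_s: float) -> float:
    """Lower bound on inter-token latency at batch size 1, in milliseconds.

    Assumption: decode is purely memory-bandwidth bound, so the floor is
    (bytes of weights) / (memory bandwidth).
    """
    weight_bytes = n_params * bits_per_weight / 8
    seconds = weight_bytes / (bandwidth_gb_s * 1e9)
    return seconds * 1e3


N_PARAMS = 7e9        # 7B-parameter model, as in the opening example
BANDWIDTH_GB_S = 936  # RTX 3090 memory bandwidth in GB/s

fp16 = decode_itl_floor_ms(N_PARAMS, 16, BANDWIDTH_GB_S)
int4 = decode_itl_floor_ms(N_PARAMS, 4, BANDWIDTH_GB_S)

print(f"FP16 floor: {fp16:.2f} ms/token")
print(f"INT4 floor: {int4:.2f} ms/token")
print(f"Ideal speedup: {fp16 / int4:.1f}x")
```

The ideal ratio is exactly the ratio of bit-widths. Measured ITL improvements land below it for the reasons the first bullet gives, which is why the measured number from Tier 3 is what belongs in the report, not this estimate.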
Q: How do you design a regression detection system that catches compression failures before production?
A: Four components:
- Baseline establishment: Before any compression, run the full benchmark suite on the FP16 model and save results to `baseline.json`. This is the reference every future compressed model is compared against.
- Threshold configuration: Per-metric thresholds that reflect your application's requirements, not arbitrary numbers. If you're a financial platform: arithmetic accuracy threshold should be tight (-3% max). If you're a creative writing tool: looser thresholds (-8% or more) are appropriate. Thresholds should encode "what degradation would cause user-facing problems."
- CI/CD integration: Run benchmarks automatically on every compression configuration change. This means: a script that loads the compressed model, runs Tier 1-2 benchmarks (~40 minutes), compares against `baseline.json`, and exits with code 1 (blocking deployment) if any regression is detected.
- Severity tiering: Distinguish "warning" regressions (metric is degrading but within 2× of threshold - worth reviewing) from "critical" regressions (metric exceeds threshold - deployment is blocked). Route warnings to weekly review; route criticals to immediate on-call alert.
The key insight: regression detection is only as good as the metrics you track. Track perplexity and MMLU, and you'll ship INT4 models with broken arithmetic. Track perplexity, MMLU, and arithmetic accuracy, and you'll catch the failure at compression time instead of three weeks into production.
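A minimal sketch of the threshold check and CI exit code described above follows. It is not the lesson's `CompressionRegressionDetector`; the names `THRESHOLDS_PCT`, `find_regressions`, and `gate_exit_code` are hypothetical, the thresholds are example values, and the metric numbers are made up to mirror the relative changes in the opening story.

```python
# Hypothetical per-metric limits on relative change, in percent.
# A negative limit caps how far a metric may drop; a positive limit caps a rise.
THRESHOLDS_PCT = {
    "mmlu": -3.0,
    "arithmetic": -3.0,            # tight, e.g. for a financial application
    "perplexity_wikitext2": +5.0,  # lower is better, so cap the increase
}


def find_regressions(baseline: dict, current: dict, thresholds: dict) -> list[dict]:
    """Return one entry per metric that is missing or outside its limit."""
    failures = []
    for metric, limit_pct in thresholds.items():
        if metric not in baseline or metric not in current:
            # An untracked metric cannot be vouched for, so treat it as a failure.
            failures.append({"metric": metric, "reason": "missing"})
            continue
        change_pct = (current[metric] - baseline[metric]) / baseline[metric] * 100.0
        exceeded = change_pct < limit_pct if limit_pct < 0 else change_pct > limit_pct
        if exceeded:
            failures.append({
                "metric": metric,
                "reason": "threshold",
                "change_pct": round(change_pct, 2),
                "limit_pct": limit_pct,
            })
    return failures


def gate_exit_code(baseline: dict, current: dict) -> int:
    """0 if deployment may proceed, 1 if it should be blocked."""
    failures = find_regressions(baseline, current, THRESHOLDS_PCT)
    for failure in failures:
        print(f"REGRESSION: {failure}")
    return 1 if failures else 0


# Made-up numbers: arithmetic accuracy 96% -> 77%, perplexity +4.2%, MMLU -1.7%.
baseline = {"mmlu": 0.460, "arithmetic": 0.96, "perplexity_wikitext2": 5.50}
candidate = {"mmlu": 0.452, "arithmetic": 0.77, "perplexity_wikitext2": 5.73}

print("exit code:", gate_exit_code(baseline, candidate))
```

In a CI job, the result of `gate_exit_code` would be passed to `sys.exit` so that a non-zero code fails the pipeline step. Treating a missing metric as a failure is a deliberate choice here: it prevents a metric from silently dropping out of the gate.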
Q: What is the typical accuracy-efficiency tradeoff curve for INT4 quantization, and where are the safe operating points?
A: The tradeoff is not linear - it has a "safe region" and a "cliff":
Safe region (INT8, INT4 with good calibration):
- WikiText-2 perplexity: < 3% increase
- MMLU: < 2% drop
- Arithmetic: < 5% drop
- Throughput improvement: 2-3× over FP16 (ITL)
- VRAM: about 2× reduction for INT8 weights, 3.5-4× for INT4
Warning zone (aggressive INT4, some architectures):
- Perplexity: 3-8% increase
- Arithmetic: 5-12% drop
- Throughput: similar improvement
- May still be acceptable for use cases where arithmetic/precision is not critical
Cliff region (INT3 or poorly calibrated INT4):
- Perplexity: > 10% increase
- Arithmetic: > 20% drop
- Model behavior becomes unpredictably wrong on specific input patterns
- Not acceptable for any production use
The safe/warning boundary depends on model size (larger models tolerate more quantization) and calibration quality (high-quality calibration data narrows the cliff). The transition from warning to cliff is sharp - a 5% additional sparsity or 0.5-bit reduction can suddenly cause catastrophic failure in specific capabilities. Always benchmark at the exact bit-width and calibration setting you'll use in production, not at nearby configurations.
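The three zones can be turned into a small classifier, sketched below. The boundaries are the ones listed in this answer. The lists leave gaps (for example, arithmetic drops between 12% and 20%), and treating anything between the safe limit and the cliff limit as "warning" is an assumption of this sketch. The names `ZONE_BOUNDS`, `zone_for`, and `operating_zone` are hypothetical.

```python
# (safe_below, cliff_above) in percent, taken from the zone lists above.
ZONE_BOUNDS = {
    "perplexity_increase_pct": (3.0, 10.0),
    "arithmetic_drop_pct": (5.0, 20.0),
}
ZONE_ORDER = ["safe", "warning", "cliff"]  # least to most severe


def zone_for(metric: str, value: float) -> str:
    """Classify a single measurement into safe / warning / cliff."""
    safe_below, cliff_above = ZONE_BOUNDS[metric]
    if value < safe_below:
        return "safe"
    if value > cliff_above:
        return "cliff"
    return "warning"  # assumption: everything in between counts as warning


def operating_zone(measurements: dict[str, float]) -> str:
    """The overall zone is the worst zone across all measured metrics."""
    zones = [zone_for(metric, value) for metric, value in measurements.items()]
    return max(zones, key=ZONE_ORDER.index)


# Relative changes from the opening story: perplexity +4.2%, arithmetic -19 points.
print(operating_zone({"perplexity_increase_pct": 4.2, "arithmetic_drop_pct": 19.0}))
```

Taking the worst zone across metrics, instead of an average, reflects the point of this lesson: one badly degraded capability is enough to make a configuration unsafe.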
Q: How do you structure a benchmarking pipeline for a team releasing compressed models monthly?
A: A mature team needs a systematic process with three key artifacts:
- Benchmark registry: A version-controlled directory of JSON files (`benchmarks/baselines/`) with one baseline per major model version. When you upgrade from Llama-2-7B to Llama-3-8B, create a new baseline. When you change compression method (AWQ → GPTQ), record both the new baseline and the compression-specific results.
- Automated benchmark suite: A script that runs all four tiers and outputs both a human-readable report and a machine-readable results JSON. This script runs in CI/CD on every pull request that modifies compression config, calibration data, or base model. ~45-60 minute runtime is acceptable for a deployment gate.
- Metric dashboard: Track all key metrics over time for each model family. Plot perplexity, MMLU, arithmetic, TTFT, and throughput across compression configurations and dates. This makes it easy to see "did the June calibration data update improve arithmetic by more than it hurt MMLU?" without re-running all benchmarks.
Additional practices: (a) Run latency benchmarks on production hardware weekly, since driver updates and memory fragmentation both affect latency; (b) Keep three baselines: the best-quality FP16 model, the current production compressed model, and the previous production compressed model. Comparing against the production models catches regressions introduced by your most recent compression update even when the new model still passes its FP16-relative thresholds; (c) Store calibration data alongside the baseline, so that future re-compression can use the same calibration data for reproducibility.
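Practice (b) can be illustrated with a short comparison against several reference points at once. This is a sketch under assumptions: the baseline names, the helper `compare_to_baselines`, and all accuracy values are invented for illustration.

```python
def relative_change_pct(reference: float, candidate: float) -> float:
    """Relative change of candidate versus reference, in percent."""
    return (candidate - reference) / reference * 100.0


def compare_to_baselines(candidate: dict, baselines: dict[str, dict], metric: str) -> dict[str, float]:
    """Relative change of one metric against every baseline that records it."""
    return {
        name: round(relative_change_pct(reference[metric], candidate[metric]), 2)
        for name, reference in baselines.items()
        if metric in reference
    }


# Invented arithmetic accuracies for the three baselines and a new candidate.
baselines = {
    "fp16": {"arithmetic": 0.960},
    "production": {"arithmetic": 0.955},
    "previous_production": {"arithmetic": 0.950},
}
candidate = {"arithmetic": 0.935}

changes = compare_to_baselines(candidate, baselines, "arithmetic")
for name, change in changes.items():
    print(f"vs {name}: {change:+.2f}%")
```

With a -3% limit relative to FP16, this candidate would pass. The comparison against the current production model shows that it still gives up about two percent of what users have today, which is the kind of regression the extra baselines are meant to surface.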
