Building an Evaluation Harness
The Model That Shipped Without a Net
The fine-tuning job finished at 2:47 AM. The training loss looked great. The sample outputs looked great. The engineer pushed the model weights to the serving infrastructure and went to sleep with a sense of accomplishment.
By 9 AM the next morning, three customer success tickets had come in. The new model - trained to be more concise and task-focused - had improved on the target task but quietly regressed on a critical adjacent capability. The customer support summarization use case, which had been working reliably for months, was now producing summaries that omitted the resolution status. Not wrong exactly, just incomplete in a specific way that required human review on every output. The regression cost two days of manual QA to fully characterize and another three days of fine-tuning to fix.
The engineer had evaluated the model. Carefully, even. They ran it on a sample of held-out examples from the training distribution, reviewed a few dozen outputs, checked the target task metrics. What they had not done was run a systematic evaluation across the full range of capabilities the model was responsible for. There was no automated check that would have caught "summarization now omits resolution status." There was no comparison against a previous checkpoint. There was no regression gate that would have blocked the deployment.
This is the gap that an evaluation harness fills. Not one-off manual evaluation - that is necessary but not sufficient. A harness is the infrastructure that makes evaluation systematic, reproducible, automated, and integrated into the deployment workflow. It is the difference between "we checked this before shipping" and "we have a defined bar and nothing ships unless it clears it."
Building an evaluation harness is engineering work, not research work. It requires the same discipline you apply to a data pipeline or a model serving system: well-defined interfaces, versioned configs, automated runs, structured result storage, and dashboards that make regressions visible. This lesson covers all of it - from the architecture of the harness itself to integrating it with CI/CD and tracking results over time.
Why This Exists - The Cost of Manual Evaluation
Before evaluation harnesses became standard practice, the workflow looked like this: train the model, spot-check outputs by hand, maybe run a favorite benchmark, and ship if it seemed fine. This worked acceptably when models were simple, few in number, and serving narrow use cases.
As model deployment scaled, the problems with manual evaluation became acute. First, manual evaluation does not scale. A team shipping one model per month can review outputs manually. A team shipping checkpoints weekly, running ablation studies, and maintaining multiple model variants cannot. The bottleneck shifts from training to evaluation.
Second, manual evaluation is not reproducible. Two engineers reviewing the same model outputs will emphasize different failure modes, have different quality bars, and produce different assessments. When a model regresses, it is often unclear whether the regression was always present, when it started, or which change introduced it. Without versioned evaluation runs against a consistent benchmark suite, you cannot answer these questions.
Third, manual evaluation is not integrated. It is a gate at the end of training, run by humans, dependent on someone remembering to do it. Automated evaluation can run on every checkpoint, every PR, every hyperparameter sweep. It transforms evaluation from a periodic ceremony into continuous measurement.
The lm-evaluation-harness project (Gao et al., 2021) from EleutherAI was the first widely adopted open framework for standardizing LLM evaluation. Before it, every research team ran their own ad-hoc benchmark code, making result comparison across papers unreliable. The harness provided a common task format, a standard few-shot templating system, and a reproducible execution environment. HuggingFace's Open LLM Leaderboard is built on it.
For production deployment, the harness pattern extends further: not just standardized benchmarks but custom tasks specific to your application, regression gates tied to CI/CD, result storage with version tracking, and dashboards that surface capability trends over time.
Historical Context - From Ad-Hoc Scripts to Infrastructure
The first generation of LLM evaluation was entirely ad-hoc. OpenAI's original GPT papers reported perplexity on held-out text. BERT's evaluation was task-specific: run the fine-tuned model on the GLUE benchmark tasks. Each benchmark had its own code, its own format, its own quirks.
The problem became critical around 2020-2021 when the open-source community started training and releasing models at scale. EleutherAI was training GPT-NeoX models and needed a way to evaluate them against the same benchmarks GPT-3 had been evaluated on, but OpenAI had not released their evaluation code. Gao et al. built lm-evaluation-harness from scratch - a framework that could implement any language modeling benchmark as a standardized task and run it against any HuggingFace-compatible model.
The key design insight of lm-evaluation-harness was separating task definition from model execution. A task is a pure data specification: how to format examples, what metric to compute, how to handle few-shot prompting. The model is a pluggable backend. This separation made it possible to add new tasks without touching model code and to evaluate new models without rewriting benchmark code.
The HuggingFace Open LLM Leaderboard (launched in 2023) adopted lm-evaluation-harness as its evaluation backend, which drove rapid adoption across the open-source community. Suddenly every model release came with harness-compatible evaluation results, making comparison between models meaningful for the first time.
The next evolution - which is where production engineering is now - is extending this pattern beyond academic benchmarks to application-specific evaluation. The architecture is the same: standardized task format, pluggable model interface, versioned configs, automated runs. The tasks are custom to your domain rather than drawn from academic benchmarks.
Core Concepts
The Evaluation Harness Architecture
A production evaluation harness has five components:
1. Model Interface - An abstract layer that hides the details of how you run the model. Whether you are calling a local HuggingFace model, a vLLM server, an OpenAI-compatible API, or a remote endpoint, the rest of the harness should not need to know. The interface exposes exactly two operations: `generate(prompts, config) -> completions` and `logprob(prompt, continuation) -> float`.
2. Task Registry - A catalog of all evaluation tasks the harness knows about. Each task is a self-contained specification: dataset location, prompt template, few-shot examples, metric function, and pass/fail threshold. Tasks are versioned so that results from two months ago are comparable to today's results even if the underlying dataset is updated.
3. Metrics Library - A collection of scoring functions that convert model outputs into numbers. Exact match, F1, pass@k for code, BLEU, ROUGE, perplexity, LLM-as-judge scores. The library handles normalization (lowercasing, punctuation stripping) and provides confidence intervals where appropriate.
4. Result Storage - A structured database of every evaluation run: which model checkpoint, which tasks, which parameters, timestamps, per-example scores, aggregate metrics, and metadata. This is the foundation of regression detection and trend analysis.
5. Reporting Layer - Dashboards and automated summaries that surface results to the team. Alerts on regression. Comparison between model versions. Sliced analysis by input type, length, or category.
Task Format in lm-evaluation-harness
Understanding how lm-evaluation-harness represents tasks is essential for adding custom tasks. Each task inherits from the Task base class and defines:
- `dataset_path`: where to load data from (HuggingFace datasets or local path)
- `doc_to_text`: converts a dataset example to an input prompt string
- `doc_to_target`: extracts the expected output from an example
- `doc_to_choice`: for multiple-choice tasks, returns the list of choices
- `process_results`: converts raw model outputs into task-specific scores
- `aggregation`: how to aggregate per-example scores into a single metric value
- `higher_is_better`: whether higher metric values are better
- `fewshot_examples`: optionally specifies fixed few-shot examples
This structure enforces a clean separation. You can change the underlying dataset format, prompt template, or metric without changing any model code.
Few-Shot Templating
Few-shot prompting in lm-evaluation-harness follows a specific pattern. For a k-shot evaluation:
- Retrieve k examples from a few-shot pool (separate from the test set)
- Format each example using `doc_to_text` + `doc_to_target`
- Concatenate them with a separator
- Append the test example's `doc_to_text` output (without the target)
- Run the model and compare the generated output to `doc_to_target`
The default separator is `\n\n` between examples. The few-shot pool is deterministically sampled using a fixed seed, ensuring that the same examples are used across all runs of a given task, which is critical for reproducibility.
The number of few-shot examples is specified at the harness level, not in the task definition. This allows you to evaluate the same task at 0-shot, 1-shot, 3-shot, and 5-shot without modifying the task code.
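The assembly logic is simple enough to sketch directly. A minimal illustration, where the hypothetical `doc_to_text` and `doc_to_target` callables stand in for the task's templates:

```python
import random

def build_fewshot_prompt(test_doc, fewshot_pool, k, doc_to_text, doc_to_target, seed=1234):
    """Sketch of k-shot prompt assembly as described above; not the harness's
    actual implementation."""
    rng = random.Random(seed)                # fixed seed -> same shots every run
    shots = rng.sample(fewshot_pool, k)      # pool is disjoint from the test set
    blocks = [doc_to_text(d) + doc_to_target(d) for d in shots]
    blocks.append(doc_to_text(test_doc))     # test example: input only, no target
    return "\n\n".join(blocks)               # default separator
```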
Metric Computation and Statistical Significance
For a benchmark with $n$ examples, the expected accuracy is $\hat{p} = \frac{1}{n}\sum_{i=1}^{n} x_i$, where $x_i \in \{0,1\}$ is correctness on example $i$. The standard error is:

$$\mathrm{SE} = \sqrt{\frac{\hat{p}\,(1-\hat{p})}{n}}$$

A 95% confidence interval is approximately $\hat{p} \pm 1.96 \cdot \mathrm{SE}$.
This matters for regression detection. If your benchmark has 500 examples and a model scores 72.4%, the 95% CI is approximately $\pm 3.9$ percentage points: $[68.5\%,\ 76.3\%]$. A subsequent run scoring 70.1% is not necessarily a regression - it is within the confidence interval of the original score. A run scoring 66.0% is clearly outside the CI and should trigger an alert.
For small task sets (under 200 examples), confidence intervals are wide enough that individual-run comparisons are unreliable. Use bootstrap confidence intervals instead:

$$\mathrm{CI}_{95} = \left[\hat{p}^{*}_{2.5},\ \hat{p}^{*}_{97.5}\right]$$

where $\hat{p}^{*}_{2.5}$ and $\hat{p}^{*}_{97.5}$ are the 2.5th and 97.5th percentiles of the bootstrap distribution of accuracy over 1000 resamples.
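A minimal sketch of the bootstrap computation over stored per-example scores:

```python
import numpy as np

def bootstrap_ci_95(per_example_scores, n_resamples=1000, seed=42):
    """95% bootstrap CI over per-example scores (0/1 or continuous)."""
    rng = np.random.default_rng(seed)
    scores = np.asarray(per_example_scores, dtype=float)
    resample_means = [
        rng.choice(scores, size=scores.size, replace=True).mean()
        for _ in range(n_resamples)
    ]
    return (float(np.percentile(resample_means, 2.5)),
            float(np.percentile(resample_means, 97.5)))
```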
Evaluation at Scale
Running evaluation on large benchmarks or large models requires engineering beyond a simple inference loop:
Batching: process multiple prompts in a single forward pass. The optimal batch size balances GPU memory against throughput. For generation tasks, different prompt lengths in the same batch waste compute due to padding - sort prompts by length before batching.
Parallelization: for multi-GPU setups or distributed evaluation, parallelize across shards of the evaluation dataset. Each shard runs independently; results are aggregated at the end.
Caching: store model completions keyed by (model_hash, prompt_hash, parameters). If you re-run the same evaluation with a bug fix in the metric computation but the same model, you can restore cached completions instead of re-running expensive inference.
Timeout handling: set per-example timeouts. A model stuck in a generation loop (infinite repetition) will block the entire evaluation run without timeout handling. Discard examples that exceed the timeout and flag them separately.
Code Examples
Building the Model Interface
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, Union
import torch
@dataclass
class GenerationConfig:
max_new_tokens: int = 256
temperature: float = 0.0
top_p: float = 1.0
do_sample: bool = False
    stop_sequences: Optional[list[str]] = None
timeout_seconds: float = 30.0
class ModelInterface(ABC):
"""
Abstract model interface. All backends (local HF, vLLM, API) implement this.
The rest of the harness depends only on this interface.
"""
@abstractmethod
def generate(
self,
prompts: list[str],
config: GenerationConfig,
) -> list[str]:
"""Generate completions for a batch of prompts."""
...
@abstractmethod
def logprob(
self,
prompt: str,
continuation: str,
) -> float:
"""Return log-probability of continuation given prompt."""
...
@property
@abstractmethod
def model_id(self) -> str:
"""Unique identifier for this model version."""
...
class HuggingFaceModel(ModelInterface):
"""Local HuggingFace model backend."""
def __init__(
self,
model_name_or_path: str,
device: str = "auto",
dtype: str = "bfloat16",
max_memory: Optional[dict] = None,
):
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
self._model_id = model_name_or_path
dtype_map = {
"bfloat16": torch.bfloat16,
"float16": torch.float16,
"float32": torch.float32,
}
        # Left padding so batched generation works correctly for causal models
        # and completions can be sliced off the end of the output
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_name_or_path, padding_side="left"
        )
self.model = AutoModelForCausalLM.from_pretrained(
model_name_or_path,
torch_dtype=dtype_map.get(dtype, torch.bfloat16),
device_map=device,
max_memory=max_memory,
)
self.model.eval()
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
@property
def model_id(self) -> str:
return self._model_id
def generate(
self,
prompts: list[str],
config: GenerationConfig,
) -> list[str]:
inputs = self.tokenizer(
prompts,
return_tensors="pt",
padding=True,
truncation=True,
).to(self.model.device)
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_new_tokens=config.max_new_tokens,
temperature=config.temperature if config.do_sample else 1.0,
top_p=config.top_p if config.do_sample else 1.0,
do_sample=config.do_sample,
pad_token_id=self.tokenizer.eos_token_id,
)
# Decode only the generated tokens (not the prompt)
prompt_lengths = inputs["input_ids"].shape[1]
completions = [
self.tokenizer.decode(
outputs[i][prompt_lengths:],
skip_special_tokens=True
)
for i in range(len(prompts))
]
return completions
def logprob(self, prompt: str, continuation: str) -> float:
full_text = prompt + continuation
inputs = self.tokenizer(full_text, return_tensors="pt").to(self.model.device)
prompt_inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
prompt_len = prompt_inputs["input_ids"].shape[1]
with torch.no_grad():
outputs = self.model(**inputs, labels=inputs["input_ids"])
# Extract per-token logprobs for the continuation only
logits = outputs.logits[0, prompt_len - 1:-1] # Shift for next-token prediction
targets = inputs["input_ids"][0, prompt_len:]
log_probs = torch.nn.functional.log_softmax(logits, dim=-1)
token_log_probs = log_probs[torch.arange(len(targets)), targets]
return token_log_probs.sum().item()
class VLLMModel(ModelInterface):
"""vLLM inference server backend (OpenAI-compatible API)."""
def __init__(self, base_url: str, model_name: str, api_key: str = "EMPTY"):
import openai
self._model_id = model_name
self.client = openai.OpenAI(base_url=base_url, api_key=api_key)
self.model_name = model_name
@property
def model_id(self) -> str:
return self._model_id
def generate(self, prompts: list[str], config: GenerationConfig) -> list[str]:
        # Issue one request per prompt and parallelize client-side for throughput
import concurrent.futures
def _single(prompt):
resp = self.client.completions.create(
model=self.model_name,
prompt=prompt,
max_tokens=config.max_new_tokens,
temperature=config.temperature,
top_p=config.top_p,
stop=config.stop_sequences,
)
return resp.choices[0].text
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
results = list(executor.map(_single, prompts))
return results
def logprob(self, prompt: str, continuation: str) -> float:
resp = self.client.completions.create(
model=self.model_name,
prompt=prompt + continuation,
max_tokens=0,
echo=True,
logprobs=1,
)
# Sum logprobs for continuation tokens only
tokens = resp.choices[0].logprobs.tokens
log_probs = resp.choices[0].logprobs.token_logprobs
        # Second request only to count prompt tokens; cache this if logprob
        # is called frequently
        prompt_token_count = len(self.client.completions.create(
model=self.model_name, prompt=prompt, max_tokens=0, echo=True, logprobs=1
).choices[0].logprobs.tokens)
return sum(lp for lp in log_probs[prompt_token_count:] if lp is not None)
Implementing the Task Registry
import yaml
import hashlib
import json
from pathlib import Path
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
@dataclass
class TaskConfig:
"""Complete specification for an evaluation task."""
name: str
version: str # Semantic version; bump on breaking changes
dataset_path: str # HF dataset name or local path
dataset_split: str = "test"
prompt_template: str = "" # Jinja2 template or Python format string
few_shot_split: Optional[str] = None
n_few_shot: int = 0
output_type: str = "generate" # "generate" or "logprob"
metric: str = "exact_match" # Metric function name
metric_kwargs: dict = field(default_factory=dict)
pass_threshold: Optional[float] = None # Minimum acceptable score; used for regression gates
max_examples: Optional[int] = None # Subsample for fast CI runs
generation_config: dict = field(default_factory=lambda: {
"max_new_tokens": 256,
"temperature": 0.0,
"do_sample": False,
})
class TaskRegistry:
"""
Registry of all evaluation tasks.
Tasks are registered by name and loaded from YAML configs.
"""
def __init__(self, config_dir: str):
self.config_dir = Path(config_dir)
self._tasks: dict[str, TaskConfig] = {}
self._load_all_configs()
def _load_all_configs(self):
for yaml_path in self.config_dir.glob("**/*.yaml"):
with open(yaml_path) as f:
raw = yaml.safe_load(f)
task = TaskConfig(**raw)
self._tasks[task.name] = task
print(f"Registered task: {task.name} v{task.version}")
def get(self, name: str) -> TaskConfig:
if name not in self._tasks:
raise KeyError(f"Task '{name}' not found in registry. Available: {list(self._tasks.keys())}")
return self._tasks[name]
    def list_tasks(self) -> list[str]:
        return sorted(self._tasks.keys())
def task_hash(self, name: str) -> str:
"""Stable hash of task config for cache keying."""
config = self._tasks[name]
config_dict = {
"name": config.name,
"version": config.version,
"dataset_path": config.dataset_path,
"prompt_template": config.prompt_template,
"n_few_shot": config.n_few_shot,
}
return hashlib.sha256(json.dumps(config_dict, sort_keys=True).encode()).hexdigest()[:16]
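Hypothetical usage, assuming a `tasks/` directory containing YAML configs like the examples in the next subsection:

```python
registry = TaskRegistry("tasks/")
task = registry.get("customer_support_summarization")
print(task.version, task.pass_threshold)
print(registry.task_hash("customer_support_summarization"))  # cache key component
```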
Example Task YAML Configs
# tasks/customer_support_summarization.yaml
name: customer_support_summarization
version: "1.2.0"
dataset_path: internal/customer_support_eval_v3
dataset_split: test
few_shot_split: train
n_few_shot: 3
output_type: generate
metric: rouge_l
pass_threshold: 0.62
max_examples: 200
prompt_template: |
Summarize the following customer support conversation. Include:
- The customer's issue
- Steps taken to resolve it
- Resolution status
Conversation:
{{ doc.conversation }}
Summary:
generation_config:
max_new_tokens: 150
temperature: 0.0
do_sample: false
# tasks/classification_sentiment.yaml
name: classification_sentiment
version: "2.0.0"
dataset_path: stanfordnlp/sst2
dataset_split: validation
n_few_shot: 0
output_type: logprob
metric: accuracy
pass_threshold: 0.90
prompt_template: |
Classify the sentiment of this review as positive or negative.
Review: {{ doc.sentence }}
Sentiment:
The Evaluation Runner
import time
import uuid
from datetime import datetime, timezone
from typing import Optional
import numpy as np
class EvaluationRunner:
"""
Core runner: executes tasks against a model and stores results.
"""
def __init__(
self,
model: ModelInterface,
task_registry: TaskRegistry,
result_store: "ResultStore",
cache: Optional["CompletionCache"] = None,
batch_size: int = 8,
):
self.model = model
self.registry = task_registry
self.result_store = result_store
self.cache = cache
self.batch_size = batch_size
def run_task(
self,
task_name: str,
run_id: Optional[str] = None,
seed: int = 42,
) -> dict:
"""
Run a single task. Returns per-task result dict.
"""
run_id = run_id or str(uuid.uuid4())
task = self.registry.get(task_name)
config = GenerationConfig(**task.generation_config)
# Load dataset
dataset = self._load_dataset(task)
if task.max_examples:
rng = np.random.default_rng(seed)
indices = rng.choice(len(dataset), size=min(task.max_examples, len(dataset)), replace=False)
dataset = [dataset[int(i)] for i in indices]
# Load few-shot examples
few_shot_examples = self._get_few_shot_examples(task, seed=seed)
# Build prompts
prompts = [
self._build_prompt(task, doc, few_shot_examples)
for doc in dataset
]
targets = [self._extract_target(task, doc) for doc in dataset]
# Run inference (with optional caching)
t0 = time.time()
completions = self._run_inference_batched(
prompts=prompts,
targets=targets,
task=task,
config=config,
)
inference_time = time.time() - t0
# Score
metric_fn = self._get_metric_fn(task.metric)
scores = [
metric_fn(pred, target, **task.metric_kwargs)
for pred, target in zip(completions, targets)
]
aggregate_score = float(np.mean(scores))
n = len(scores)
        # Bernoulli standard error: exact for 0/1 metrics, an approximation
        # for continuous metrics such as ROUGE
        se = float(np.sqrt(aggregate_score * (1 - aggregate_score) / n)) if n > 0 else 0.0
result = {
"run_id": run_id,
"task_name": task_name,
"task_version": task.version,
"model_id": self.model.model_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
"seed": seed,
"n_examples": n,
"metric": task.metric,
"score": aggregate_score,
"standard_error": se,
"ci_95_low": aggregate_score - 1.96 * se,
"ci_95_high": aggregate_score + 1.96 * se,
"pass_threshold": task.pass_threshold,
"passed": (
aggregate_score >= task.pass_threshold
if task.pass_threshold is not None else None
),
"inference_time_seconds": inference_time,
"per_example_scores": scores,
}
self.result_store.save(result)
return result
def run_suite(
self,
task_names: list[str],
run_id: Optional[str] = None,
seed: int = 42,
) -> dict:
"""
Run a suite of tasks and produce a combined report.
"""
run_id = run_id or str(uuid.uuid4())
suite_results = {}
for task_name in task_names:
print(f"\n[EVAL] Running task: {task_name}")
try:
result = self.run_task(task_name, run_id=run_id, seed=seed)
suite_results[task_name] = result
status = "PASS" if result.get("passed") else "FAIL" if result.get("passed") is False else "OK"
print(f" {status} | {task_name}: {result['score']:.4f} +/- {result['standard_error']:.4f}")
except Exception as e:
print(f" ERROR | {task_name}: {e}")
suite_results[task_name] = {"error": str(e), "task_name": task_name}
# Overall suite summary
all_scores = [r["score"] for r in suite_results.values() if "score" in r]
all_passed = [r["passed"] for r in suite_results.values() if r.get("passed") is not None]
summary = {
"run_id": run_id,
"model_id": self.model.model_id,
"n_tasks": len(task_names),
"n_completed": len(all_scores),
"mean_score": float(np.mean(all_scores)) if all_scores else None,
"all_gates_passed": all(all_passed) if all_passed else None,
"task_results": suite_results,
}
return summary
def _run_inference_batched(self, prompts, targets, task, config):
"""Batch inference with optional cache lookup."""
completions = [None] * len(prompts)
uncached_indices = []
# Check cache
if self.cache:
for i, prompt in enumerate(prompts):
cached = self.cache.get(self.model.model_id, prompt, config)
if cached is not None:
completions[i] = cached
else:
uncached_indices.append(i)
else:
uncached_indices = list(range(len(prompts)))
# Run inference in batches for uncached examples
uncached_prompts = [prompts[i] for i in uncached_indices]
for batch_start in range(0, len(uncached_prompts), self.batch_size):
batch = uncached_prompts[batch_start:batch_start + self.batch_size]
if task.output_type == "generate":
batch_completions = self.model.generate(batch, config)
else:
# For logprob tasks, compute per-choice logprob
batch_completions = batch # Placeholder - implement per task type
for j, completion in enumerate(batch_completions):
idx = uncached_indices[batch_start + j]
completions[idx] = completion
if self.cache:
self.cache.put(self.model.model_id, prompts[idx], config, completion)
return completions
def _get_metric_fn(self, metric_name: str) -> Callable:
"""Return the scoring function for a metric name."""
metrics = {
"exact_match": lambda pred, target, **kw: (
pred.strip().lower() == target.strip().lower()
),
"contains": lambda pred, target, **kw: (
target.strip().lower() in pred.strip().lower()
),
"accuracy": lambda pred, target, **kw: (
pred.strip().lower() == target.strip().lower()
),
"rouge_l": self._rouge_l_score,
"pass_at_k": self._pass_at_k_score,
}
if metric_name not in metrics:
raise ValueError(f"Unknown metric: {metric_name}. Available: {list(metrics.keys())}")
return metrics[metric_name]
@staticmethod
def _rouge_l_score(prediction: str, reference: str, **kwargs) -> float:
try:
from rouge_score import rouge_scorer
scorer = rouge_scorer.RougeScorer(["rougeL"], use_stemmer=True)
return scorer.score(reference, prediction)["rougeL"].fmeasure
except ImportError:
            # Fallback: token-overlap F1 as a rough approximation (not true LCS)
pred_tokens = prediction.lower().split()
ref_tokens = reference.lower().split()
if not pred_tokens or not ref_tokens:
return 0.0
# Simple token overlap as approximation
common = set(pred_tokens) & set(ref_tokens)
if not common:
return 0.0
precision = len(common) / len(pred_tokens)
recall = len(common) / len(ref_tokens)
return 2 * precision * recall / (precision + recall)
@staticmethod
def _pass_at_k_score(predictions: list[str], test_cases: list[str], k: int = 1, **kwargs) -> float:
"""pass@k for code generation - simplified version."""
# Full pass@k requires running test cases; this is a placeholder
return 0.0
def _load_dataset(self, task: TaskConfig) -> list[dict]:
"""Load dataset from HF or local path."""
try:
from datasets import load_dataset
ds = load_dataset(task.dataset_path, split=task.dataset_split)
return list(ds)
except Exception:
# Local JSON/JSONL fallback
import json
path = Path(task.dataset_path)
if path.suffix == ".jsonl":
with open(path) as f:
return [json.loads(line) for line in f]
with open(path) as f:
return json.load(f)
def _get_few_shot_examples(self, task: TaskConfig, seed: int) -> list[dict]:
if task.n_few_shot == 0 or not task.few_shot_split:
return []
try:
from datasets import load_dataset
pool = list(load_dataset(task.dataset_path, split=task.few_shot_split))
except Exception:
return []
rng = np.random.default_rng(seed)
indices = rng.choice(len(pool), size=min(task.n_few_shot, len(pool)), replace=False)
return [pool[int(i)] for i in indices]
def _build_prompt(self, task: TaskConfig, doc: dict, few_shot: list[dict]) -> str:
"""Render the prompt template with few-shot examples prepended."""
from jinja2 import Template
tmpl = Template(task.prompt_template)
few_shot_text = ""
for fs_doc in few_shot:
fs_input = tmpl.render(doc=fs_doc)
fs_target = self._extract_target(task, fs_doc)
few_shot_text += fs_input + fs_target + "\n\n"
return few_shot_text + tmpl.render(doc=doc)
def _extract_target(self, task: TaskConfig, doc: dict) -> str:
"""Extract the expected answer from a dataset example."""
# Common field names; override per task in config if needed
for field in ["answer", "label", "target", "output", "completion"]:
if field in doc:
val = doc[field]
return str(val) if not isinstance(val, str) else val
raise KeyError(f"Cannot find target field in doc keys: {list(doc.keys())}")
Result Storage and Regression Detection
import json
import sqlite3
from pathlib import Path
from typing import Optional
import numpy as np
class ResultStore:
"""
SQLite-backed result store for evaluation runs.
Supports version comparison and regression detection.
"""
def __init__(self, db_path: str):
self.db_path = Path(db_path)
self._init_db()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS eval_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL,
task_name TEXT NOT NULL,
task_version TEXT NOT NULL,
model_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
seed INTEGER,
n_examples INTEGER,
metric TEXT,
score REAL,
standard_error REAL,
ci_95_low REAL,
ci_95_high REAL,
pass_threshold REAL,
passed INTEGER,
inference_time_seconds REAL,
extra_json TEXT
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_model_task ON eval_results(model_id, task_name)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_run_id ON eval_results(run_id)")
def save(self, result: dict):
extra = {k: v for k, v in result.items() if k not in {
"run_id", "task_name", "task_version", "model_id", "timestamp",
"seed", "n_examples", "metric", "score", "standard_error",
"ci_95_low", "ci_95_high", "pass_threshold", "passed", "inference_time_seconds",
}}
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT INTO eval_results (
run_id, task_name, task_version, model_id, timestamp,
seed, n_examples, metric, score, standard_error,
ci_95_low, ci_95_high, pass_threshold, passed,
inference_time_seconds, extra_json
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
result.get("run_id"), result.get("task_name"), result.get("task_version"),
result.get("model_id"), result.get("timestamp"), result.get("seed"),
result.get("n_examples"), result.get("metric"), result.get("score"),
result.get("standard_error"), result.get("ci_95_low"), result.get("ci_95_high"),
result.get("pass_threshold"), int(result.get("passed", 0) or 0),
result.get("inference_time_seconds"), json.dumps(extra),
))
def get_latest(self, model_id: str, task_name: str, n: int = 5) -> list[dict]:
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute("""
SELECT * FROM eval_results
WHERE model_id = ? AND task_name = ?
ORDER BY timestamp DESC LIMIT ?
""", (model_id, task_name, n)).fetchall()
return [dict(r) for r in rows]
def detect_regression(
self,
current_model_id: str,
baseline_model_id: str,
task_name: str,
min_delta: float = 0.02,
) -> dict:
"""
Compare current model to baseline on a task.
Returns regression report.
"""
current_rows = self.get_latest(current_model_id, task_name, n=1)
baseline_rows = self.get_latest(baseline_model_id, task_name, n=3)
if not current_rows or not baseline_rows:
return {"status": "insufficient_data", "task": task_name}
current_score = current_rows[0]["score"]
baseline_score = float(np.mean([r["score"] for r in baseline_rows]))
delta = current_score - baseline_score
        # Statistical significance check: flag a regression only when the
        # current run's entire 95% CI sits below the baseline score
        current_ci_high = current_rows[0].get("ci_95_high", current_score)
        is_regression = (
            delta < -min_delta and
            current_ci_high < baseline_score
        )
return {
"task": task_name,
"current_score": current_score,
"baseline_score": baseline_score,
"delta": delta,
"regression": is_regression,
"status": "REGRESSION" if is_regression else "OK",
}
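Hypothetical usage in a CI gate script (the model IDs and paths are illustrative):

```python
store = ResultStore("eval_results.db")
report = store.detect_regression(
    current_model_id="ckpt-2024-06-01",        # hypothetical IDs
    baseline_model_id="prod-baseline",
    task_name="customer_support_summarization",
)
if report.get("regression"):
    raise SystemExit(f"Regression gate failed: {report}")
```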
CI/CD Integration - GitHub Actions Workflow
# .github/workflows/eval.yml
name: Model Evaluation
on:
  push:
    paths:
      - "checkpoints/**"
      - "model_configs/**"
  pull_request:
    paths:
      - "checkpoints/**"
      - "model_configs/**"
  workflow_dispatch:
inputs:
model_path:
description: "Path or HF name of model to evaluate"
required: true
suite:
description: "Evaluation suite name"
default: "core"
jobs:
evaluate:
runs-on: [self-hosted, gpu]
timeout-minutes: 120
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install dependencies
run: pip install -r requirements-eval.txt
- name: Run evaluation suite
env:
MODEL_PATH: ${{ github.event.inputs.model_path || 'checkpoints/latest' }}
EVAL_SUITE: ${{ github.event.inputs.suite || 'core' }}
WANDB_API_KEY: ${{ secrets.WANDB_API_KEY }}
BASELINE_MODEL: ${{ vars.BASELINE_MODEL_ID }}
run: |
python scripts/run_evaluation.py \
--model "$MODEL_PATH" \
--suite "$EVAL_SUITE" \
--baseline "$BASELINE_MODEL" \
--output-dir eval_results/ \
--wandb-project "model-evals"
- name: Check regression gates
run: |
python scripts/check_gates.py \
--results-dir eval_results/ \
--fail-on-regression
- name: Upload evaluation results
uses: actions/upload-artifact@v4
if: always()
with:
name: eval-results-${{ github.sha }}
path: eval_results/
- name: Comment results on PR
if: github.event_name == 'pull_request'
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
const summary = JSON.parse(fs.readFileSync('eval_results/summary.json'));
const body = `## Evaluation Results\n\n` +
`Model: \`${summary.model_id}\`\n\n` +
`| Task | Score | Baseline | Delta | Status |\n` +
`|------|-------|----------|-------|--------|\n` +
summary.tasks.map(t =>
`| ${t.name} | ${t.score.toFixed(4)} | ${t.baseline.toFixed(4)} | ` +
`${(t.score - t.baseline >= 0 ? '+' : '')}${(t.score - t.baseline).toFixed(4)} | ` +
`${t.passed ? 'PASS' : 'FAIL'} |`
).join('\n');
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: body
});
WandB Integration for Result Tracking
import wandb
from typing import Optional
class WandBEvalTracker:
"""
Logs evaluation results to Weights and Biases.
Enables trend dashboards and regression alerts.
"""
def __init__(
self,
project: str,
entity: Optional[str] = None,
tags: Optional[list[str]] = None,
):
self.project = project
self.entity = entity
self.default_tags = tags or []
def log_suite_results(
self,
suite_results: dict,
model_id: str,
commit_sha: Optional[str] = None,
extra_metadata: Optional[dict] = None,
):
"""Log a complete evaluation suite run to WandB."""
run = wandb.init(
project=self.project,
entity=self.entity,
name=f"eval-{model_id[:20]}-{commit_sha[:8] if commit_sha else 'manual'}",
tags=self.default_tags + (["regression-check"] if commit_sha else []),
config={
"model_id": model_id,
"commit_sha": commit_sha,
**(extra_metadata or {}),
},
)
task_results = suite_results.get("task_results", {})
# Log per-task metrics as WandB summary
summary_metrics = {}
for task_name, result in task_results.items():
if "score" in result:
summary_metrics[f"eval/{task_name}/score"] = result["score"]
summary_metrics[f"eval/{task_name}/ci_low"] = result.get("ci_95_low")
summary_metrics[f"eval/{task_name}/ci_high"] = result.get("ci_95_high")
summary_metrics[f"eval/{task_name}/passed"] = int(result.get("passed", 0) or 0)
summary_metrics["eval/mean_score"] = suite_results.get("mean_score")
summary_metrics["eval/all_gates_passed"] = int(
suite_results.get("all_gates_passed", False) or False
)
wandb.log(summary_metrics)
# Log results table for detailed analysis
results_table = wandb.Table(
columns=["task", "score", "baseline_delta", "passed", "n_examples"]
)
for task_name, result in task_results.items():
if "score" in result:
results_table.add_data(
task_name,
result["score"],
result.get("delta_from_baseline", None),
result.get("passed"),
result.get("n_examples"),
)
wandb.log({"eval_results_table": results_table})
run.finish()
return run.id
Mermaid Diagrams
Evaluation Harness Architecture
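A minimal sketch of the five-component flow described in Core Concepts:

```mermaid
flowchart LR
    TR["Task Registry"] --> RUN["Evaluation Runner"]
    MI["Model Interface"] --> RUN
    RUN --> MET["Metrics Library"]
    MET --> RS["Result Storage"]
    RS --> REP["Reporting Layer"]
    REP --> GATE{"Regression gate"}
    GATE -->|pass| SHIP["Deploy"]
    GATE -->|fail| BLOCK["Block and alert"]
```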
Custom Task Integration into lm-eval-harness
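A minimal sketch of how a custom task plugs into lm-evaluation-harness (per the workflow described in Q2 below):

```mermaid
flowchart TD
    Y["Task YAML: dataset_path, doc_to_text, doc_to_target, metric_list"] --> R["Task registry via --include_path"]
    P["Optional Python module with custom process_results"] --> R
    R --> E["Harness run: few-shot templating plus model backend"]
    E --> S["Per-example scores and aggregation"]
```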
Regression Detection Decision Flow
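A minimal sketch of the decision logic implemented in `detect_regression` above:

```mermaid
flowchart TD
    A["New evaluation run completes"] --> B{"Baseline runs available?"}
    B -->|no| C["Status: insufficient_data"]
    B -->|yes| D["delta = current score minus mean of recent baseline runs"]
    D --> E{"delta below -min_delta and entire 95% CI below baseline?"}
    E -->|yes| F["REGRESSION: block deployment and alert"]
    E -->|no| G["OK: allow deployment"]
```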
Production Engineering Notes
Seed Control and Determinism
Reproducible evaluation requires controlling every source of randomness:
- Model generation: always use temperature=0 and greedy decoding for evaluation. Sampling introduces run-to-run variance that makes regression detection unreliable. If your application requires non-zero temperature, evaluate with temperature=0 and benchmark separately at your application temperature.
- Dataset sampling: when subsampling a large dataset for fast CI runs, use a fixed numpy seed and store the seed with the result. The same seed must produce the same subsample across all Python and numpy versions - use `numpy.random.default_rng(seed)` (not the legacy `numpy.random.seed`) for consistency.
- Few-shot selection: few-shot examples must be selected deterministically from a fixed pool. Varying the few-shot examples can shift accuracy by several points on some tasks, masking or creating apparent regressions.
- Batch ordering: the order in which examples are processed should not affect scores in a correctly implemented harness, but batching affects padding, which can affect outputs for some models. Use consistent batch sizes across runs.
Evaluation Dataset Versioning
Your evaluation datasets are as important as your model weights. Version them explicitly:
- Store dataset versions in the task YAML as `dataset_version: "v2.1.0"`
- Never modify an existing evaluation dataset in place - create a new version
- Store the exact examples used in each evaluation run (or a hash that allows reconstruction)
- When you update an evaluation dataset (fix label errors, add examples, change format), increment the version and document what changed
A common mistake: updating the evaluation dataset to fix label errors, running evaluation on the new dataset, and interpreting the score change as a model improvement. The score change is partly a dataset change. Version explicitly to avoid this confusion.
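A lightweight way to satisfy the third point - making every run reconstructible - is to hash the exact examples evaluated and store the hash with the result. A sketch:

```python
import hashlib
import json

def dataset_content_hash(examples: list[dict]) -> str:
    """Stable hash over the exact evaluated examples; store it alongside each run."""
    blob = json.dumps(examples, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(blob.encode()).hexdigest()[:16]
```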
Batching Strategy for Maximum Throughput
Naive batching pads all prompts in a batch to the length of the longest prompt. For evaluation datasets with variable-length inputs, this wastes significant compute. Length-sorted batching:
- Sort all prompts by token length
- Process the sorted prompts in consecutive batches of the configured batch size
- Within each batch, all prompts are similar in length, minimizing padding waste
This typically improves throughput by 30-50% compared to random-order batching with the same batch size. For very large evaluation runs (50k+ examples), this optimization is worth implementing.
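A minimal sketch of length-sorted batching (any tokenizer object with an `encode` method works; the yielded indices let you restore original order):

```python
def length_sorted_batches(prompts: list[str], tokenizer, batch_size: int):
    """Yield (original_indices, batch) pairs with similar-length prompts
    grouped together to minimize padding waste."""
    order = sorted(range(len(prompts)), key=lambda i: len(tokenizer.encode(prompts[i])))
    for start in range(0, len(order), batch_size):
        idx = order[start:start + batch_size]
        yield idx, [prompts[i] for i in idx]
```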
Running lm-evaluation-harness
The standard way to run lm-evaluation-harness for academic benchmarks:
# Install
pip install lm-eval
# Run a standard benchmark suite
lm_eval \
--model hf \
--model_args pretrained=mistralai/Mistral-7B-v0.1,dtype=bfloat16 \
--tasks mmlu,hellaswag,arc_challenge,truthfulqa \
--num_fewshot 5 \
--batch_size auto \
--output_path results/mistral-7b-baseline \
--log_samples
# Add a custom task
lm_eval \
--model hf \
--model_args pretrained=your-finetuned-model \
--tasks your_custom_task \
--include_path ./custom_tasks/ \
--num_fewshot 3 \
--output_path results/your-model
The --log_samples flag stores per-example inputs and outputs, which is essential for debugging failures and for building the custom analysis you need beyond aggregate scores.
Sliced Analysis
Aggregate metrics hide important patterns. Always compute sliced metrics:
- By input length: does performance degrade on longer inputs?
- By difficulty tier: if your task has difficulty labels, does regression affect only hard examples?
- By category: for multi-topic tasks, which topics are affected?
- By date: for tasks with temporal information, is there a cutoff date effect?
Sliced analysis often reveals that a model regression is concentrated in a specific subset rather than uniformly distributed. "The model regressed on customer support tickets about billing issues but not on technical troubleshooting" is actionable. "The model regressed by 2.1 points on average" is not.
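Given per-example scores from the result store and a slice label per example, the aggregation is a few lines. A sketch:

```python
import numpy as np

def sliced_scores(per_example_scores: list[float], slice_labels: list[str]) -> dict:
    """Aggregate per-example scores by slice (category, length bucket, etc.)."""
    slices: dict[str, list[float]] = {}
    for score, label in zip(per_example_scores, slice_labels):
        slices.setdefault(label, []).append(score)
    return {
        label: {"n": len(s), "score": float(np.mean(s))}
        for label, s in sorted(slices.items())
    }
```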
Common Mistakes
:::danger Evaluating at temperature > 0
Running evaluation with sampling (temperature > 0, do_sample=True) makes your results non-reproducible and your regression detection unreliable. The same model on the same input can score differently on two consecutive runs due to sampling randomness. This variance can easily exceed the delta from a genuine regression, making the regression invisible.
Always evaluate with temperature=0 and greedy decoding. If you need to measure performance at your application's sampling temperature, run separate benchmarks for it and document that they are sampling-temperature benchmarks, not deterministic evaluation benchmarks. Never mix the two in your regression detection system.
:::
:::danger Updating the Baseline Without Documentation
The baseline model is the anchor for all regression detection. When you promote a new model to become the baseline, every subsequent model will be compared to it. If you do this silently (just updating a config variable), you lose the ability to compare across the transition point.
Always: (1) document why a new model became the baseline, (2) run the old baseline and new baseline against all tasks simultaneously and store both, (3) keep the old baseline accessible in the result store for at least 90 days. This preserves the ability to answer "did model X regress relative to where we were three months ago."
:::
:::warning Using a Single Run for Regression Detection
A single evaluation run at temperature=0 is deterministic for the model, but your evaluation dataset may be small enough that sampling variance in the subsample matters. A task with 200 examples and a standard error of 3.4% can appear to regress by 2 points just because a different random subsample was drawn.
For critical tasks, run evaluation on the full dataset (no subsampling). For tasks where you must subsample, run at least 3 independent subsamples with different seeds and average the results before comparing to baseline.
:::
:::warning Ignoring Latency and Throughput in Evaluation
Evaluation harnesses typically measure accuracy metrics only. But in production, a model that is 15% more accurate and 4x slower may be a worse choice than the baseline for latency-sensitive applications.
Include inference time per example in your evaluation results. Track p50, p95, and p99 latency, not just mean. A model that adds 300ms of p95 latency may be unacceptable even if accuracy improves. Include throughput (examples/second) and cost per example if you are using API models. These are production metrics, not academic ones, and your evaluation harness should treat them as first-class outputs.
:::
Interview Q&A
Q1: Walk me through how you would design an evaluation harness for a production LLM application. What are the key components and how do they interact?
A production evaluation harness has five components that fit together in a specific way.
The model interface is the abstraction layer. It exposes exactly two methods: generate(prompts) -> completions and logprob(prompt, continuation) -> float. Every backend - local HuggingFace model, vLLM server, OpenAI API, custom endpoint - implements this interface. Nothing else in the harness imports model-specific code directly. This separation is critical: when you add a new serving infrastructure, you write one new class and the rest of the harness works unchanged.
The task registry is a catalog of evaluation tasks defined in YAML. Each task specifies where to load data, how to format prompts, which metric to use, and what score threshold constitutes a pass. Tasks are versioned: when you update a task, the old version is still accessible for historical comparison. The registry provides the task to the runner; the runner does not need to know where data comes from.
The evaluation runner executes tasks against the model. It handles batching, caching, timeout management, and calls the metric library. It emits result records with per-example scores, aggregate metrics, and confidence intervals. It does not make decisions about whether a result is a regression - that is the regression detector's job.
The result store persists every evaluation run. It supports querying: "give me all runs for model X on task Y in the last 30 days." This is the foundation of regression detection and trend analysis.
The reporting layer includes the regression detector (compares current run against baseline using statistical tests) and dashboards (WandB or similar) that visualize trends over time. Automated regression checks run in CI; the dashboard is for human review.
These components interact linearly during a CI run: new checkpoint triggers the runner, which uses the task registry and model interface to produce results, which are stored and then passed to the regression detector, which either blocks or approves deployment.
Q2: How do you add a custom task to lm-evaluation-harness? What files do you need to create?
Adding a custom task to lm-evaluation-harness requires two things: a YAML configuration file and optionally a Python file for non-standard metric computation.
The YAML file defines the task data and prompt format. At minimum it needs: task (name), dataset_path (HF dataset or local), doc_to_text (a Jinja2 template that formats a single example as input), doc_to_target (a Jinja2 template or field name for the expected output), metric_list (which metrics to compute), and output_type (either generate_until for generation tasks or multiple_choice for MCQ).
For a generation task, the YAML looks like:
task: my_summarization_task
dataset_path: my_org/my_dataset
doc_to_text: "Summarize this article:\n{{doc['article']}}\nSummary:"
doc_to_target: "{{doc['summary']}}"
metric_list:
  - metric: rouge
    aggregation: mean
    higher_is_better: true
output_type: generate_until
generation_kwargs:
  until: ["\n\n"]
You run this with lm_eval --tasks my_summarization_task --include_path ./my_tasks_dir/.
If your metric is not in the default metrics library (ROUGE, exact match, F1 are all built in), you add a Python file alongside the YAML that defines a process_results function. This function takes the model's output and the expected output and returns a dict of metric values. Register it by referencing the function name in the YAML.
The most common pitfall is the doc_to_text template. Make sure it exactly reproduces the format your model was trained on. A mismatch between training format and evaluation format can suppress accuracy by 5-15 points on instruction-tuned models, creating a false negative in your evaluation.
Q3: How do you implement regression detection that is statistically rigorous? What are the pitfalls of naive regression detection?
Naive regression detection compares two scalar values: "old model scored 0.724, new model scored 0.703, that's a decrease." The problem is that evaluation scores have uncertainty. With 200 examples and 72.4% accuracy, the 95% confidence interval is approximately plus or minus 6.2 percentage points. The new score of 70.3% is within that interval - you cannot conclude there is a genuine regression.
Rigorous regression detection uses the confidence interval of the current run to decide. If the lower bound of the current run's 95% CI is above the baseline score, the current model is confidently not worse. If the upper bound is below the baseline, it is confidently worse. If the CI straddles the baseline, the result is inconclusive.
To reduce uncertainty, I recommend: (1) evaluate on the full task dataset, not a subsample, wherever feasible; (2) for unavoidable subsampling, use bootstrap confidence intervals rather than normal approximation; (3) use multiple baseline runs (last 3-5 evaluations) and average the baseline score to reduce its variance.
The pitfalls of naive regression detection: (1) false positives from statistical noise blocking valid deployments; (2) false negatives from averaging over multiple runs where one compensates for another; (3) ignoring that regression thresholds should be task-specific - a 2-point drop on a 5-class classification task might be noise, while a 2-point drop on a binary safety classifier might be critical; (4) treating all tasks equally when some tasks are P0 (block deployment) and others are P2 (flag for review but do not block).
Q4: Your evaluation pipeline runs for 4 hours and becomes a bottleneck. How do you speed it up without sacrificing reliability?
Four hours of evaluation typically breaks down as: most of the time is inference, a small amount is data loading and metric computation. Speeding it up requires targeting inference.
First, batching optimization. Sort examples by input length before batching to minimize padding waste. This alone often cuts inference time by 30-50%. Use batch_size=auto in lm-eval or implement adaptive batching that finds the largest batch size that fits in GPU memory.
Second, task parallelization. If you have multiple GPUs or multiple evaluation nodes, distribute tasks across them. Tasks are embarrassingly parallel - task A and task B do not depend on each other. With 4 GPUs, you can run 4 tasks simultaneously and cut wall-clock time by 4x.
Third, subsampling with coverage guarantees. Instead of running every task on its full dataset, define two tiers: (1) "fast CI" tasks - subsampled to 200 examples, run on every PR, complete in under 20 minutes; (2) "full eval" tasks - run on the complete dataset nightly or before major releases. This is the most impactful single change for improving CI velocity.
Fourth, completion caching. Cache model outputs keyed by (model_hash, prompt_hash, generation_config). If you re-run evaluation after fixing a metric bug, you can restore cached completions and recompute metrics without re-running inference. This pays off immediately when you iterate on metrics or thresholds.
Fifth, async inference for API-backed models. If you are evaluating via an OpenAI-compatible API, use async HTTP requests instead of sequential calls. 16-32 concurrent requests to a vLLM server can saturate the GPU and cut API evaluation time by 10-20x.
The combination of length-sorted batching, task parallelization across 4 GPUs, and fast CI subsampling typically reduces a 4-hour evaluation suite to under 30 minutes for regular PRs, with the full suite remaining available for release gates.
Q5: How do you build an evaluation harness that works for both open-source local models and API-served models like GPT-4 or Claude?
The key is the model interface abstraction. Define an interface with two methods: generate(prompts: list[str], config: GenerationConfig) -> list[str] and logprob(prompt: str, continuation: str) -> float. The rest of the harness calls only these methods and never imports anything model-specific.
For local HuggingFace models, the implementation uses AutoModelForCausalLM.generate(). For vLLM, it calls the /v1/completions endpoint. For OpenAI-compatible APIs, it uses the OpenAI Python client. Each is a separate class; they are completely independent.
The logprob method is the trickier part for API models. OpenAI's completion API supports log probability retrieval with echo=True and logprobs=1. The Anthropic API does not support logprob retrieval at all. This means multiple-choice tasks implemented as logprob comparisons will not work for Claude. For API models with no logprob support, you need to implement a fallback: re-cast the task as a generation task (generate a choice letter and check if it matches) rather than a logprob comparison.
The practical implication: when writing custom tasks, prefer generation-based metrics over logprob-based metrics. They work with every backend. Reserve logprob tasks for local models where you have direct access to logits.
One additional consideration for API models: rate limiting and cost. The runner should support configurable request rate limits, retry with backoff on 429 errors, and estimate cost before running. For a task with 2000 examples averaging 500 input tokens and 100 output tokens each, a GPT-4 evaluation processes roughly 1.2 million tokens - enough to cost tens of dollars at current pricing. Estimate before running so there are no billing surprises.
Q6: What does "evaluation reproducibility" actually mean in practice, and how do you achieve it?
Reproducibility in evaluation means that two separate runs of the evaluation harness on the same model checkpoint produce identical results. In practice it means controlling six things.
First, the model is identical: same weights, same quantization, same precision. A checkpoint loaded in bfloat16 and the same checkpoint loaded in float16 will produce different scores due to numerical differences. Specify dtype in the evaluation config and pin it.
Second, generation is deterministic: temperature=0, greedy decoding, fixed random seed for any stochastic elements. This is the most important control.
Third, the dataset is identical: same examples, same order, or the same seed for subsampling. Store the exact indices or the seed used to select them.
Fourth, few-shot examples are identical: same examples, in the same order. Store the few-shot pool alongside the evaluation config.
Fifth, the prompt template is versioned: a change to whitespace, capitalization, or punctuation in a prompt template can shift accuracy by several points on instruction-tuned models. Version control the template and include its hash in result records.
Sixth, the metric implementation is versioned: include the version of the metric library (rouge-score, sacrebleu, etc.) in results. A bug fix in a metric library changes scores without changing the model.
In practice, the way to enforce all of this is to include a "reproducibility bundle" with every evaluation result: the model checkpoint hash, the task YAML hash, the metric library versions (as a pip freeze snapshot), the seed, and the few-shot example indices. Given this bundle, any engineer should be able to reproduce the exact result. If they cannot, you have identified a reproducibility gap to fix.
