Skip to main content

:::tip 🎮 Interactive Playground Visualize this concept: Try the Synthetic Data Generation demo on the EngineersOfAI Playground - no code required. :::

Dataset Curation for Fine-Tuning

The 500K Problem

The team had every reason to be confident. Six months of customer support transcripts. Half a million examples. Real users. Real questions. Real resolutions. The data engineering lead had spent three weeks cleaning the raw exports - stripping HTML, normalizing encodings, converting to JSONL. It looked immaculate.

They fine-tuned Mistral-7B. The model finished training with a clean loss curve. Internal eval scores were up eleven points over the base model. Everyone celebrated.

Then a product manager ran a side experiment. She had personally curated 5,000 examples from the same ticket system - spending two weeks reading tickets, rewriting bad answers, removing duplicates by hand, balancing topics. She fine-tuned the same base model on her 5K dataset. Her model scored fifteen points higher than the 500K model on every benchmark they ran. Support engineers preferred her model in blind A/B tests by a two-to-one margin.

The team had made the most common mistake in fine-tuning: they treated data volume as a proxy for data quality. The 500K dataset was full of copy-paste boilerplate responses, near-duplicate tickets about the same three issues, agent answers that said "I'll escalate this" without solving anything, and examples where the user's question was never actually answered. Every one of those examples was teaching the model to be mediocre.

This lesson is about preventing that outcome. We will cover how to source data, how to detect and remove duplicates, how to score and filter for quality, and how to build a production curation pipeline. By the end, you will understand why dataset curation is not a preprocessing step - it is the most important engineering decision you make before fine-tuning.


Data Quality Is Not a Filter - It Is the Foundation

Most teams think about data quality the way they think about linting: something you run at the end to catch obvious problems. The correct mental model is the opposite. Data quality is the foundation your model's behavior is built on. Every example in your training set is a lesson. Bad lessons create bad behavior - and unlike bugs in code, bad fine-tuning behavior is hard to trace back to its source.

Why More Data Makes Things Worse

Adding low-quality data to a high-quality dataset degrades model performance. This is empirically well-established. The reason is that gradient descent optimizes over the entire dataset. If 30% of your examples teach the model to give vague non-answers, the model learns to give vague non-answers 30% of the time - or worse, it learns a compromised mixture of good and bad response styles that makes it mediocre across the board.

LIMA (Zhao et al., 2023) demonstrated this definitively: 1,000 carefully curated examples produced better instruction-following than models trained on datasets fifty times larger with lower curation standards. The signal-to-noise ratio in your dataset determines your ceiling.

Data Quality vs. Model Performance

High │ ● ← 5K curated
│ ●
│ ●
Perf │ ●
│ ●
│ ●
Low │●
└──────────────────────────────
0 100K 300K 500K
Dataset Size

Same base model, same training config.
The only variable: curation quality.

The Curation Pipeline Overview

Every stage removes noise. The goal is not to keep as many examples as possible - it is to keep only the examples that teach the behavior you want.


Data Sources: Where Fine-Tuning Data Comes From

Before you can curate, you need data. The three primary sources each have distinct characteristics, failure modes, and use cases.

Source Comparison Table

SourceQuality CeilingCostScaleMain Risk
Human-written (expert)HighestVery high ($20–100/example)Low (thousands)Inconsistency between annotators
Human-written (crowd)MediumMedium ($1–5/example)Medium (tens of thousands)Variable quality, gaming
Synthetic (teacher LLM)High if curatedLow ($0.001–0.01/example)UnlimitedStyle collapse, hallucination
Web-scrapedWildly variableNear zeroBillionsNoise, toxicity, duplication
Existing logs (user data)Depends on productNear zeroLargePrivacy, boilerplate, irrelevance

Human-Written Data

Expert-written data sets the quality ceiling for your dataset. When a domain expert writes an ideal response to a well-formed question, that example teaches the model exactly what good looks like. This is why LIMA, Alpaca, and OpenAssistant all invested heavily in expert curation even at small scale.

The failure modes are human: annotators disagree on what "good" means, they get tired and write shorter answers over time, they interpret instructions differently. Mitigation requires detailed annotation guidelines, inter-annotator agreement checks, and calibration sessions.

When to use: always include some expert examples as anchors. They set the quality bar that your filtering pipeline should aspire to match.

Synthetic Data

Synthetic data from a teacher LLM (GPT-4, Claude, Gemini) is the most practical path to scale. You design prompts that specify exactly what you want, and the teacher model generates examples. The quality is bounded by the teacher model's capability and your prompt design.

The failure modes are subtle. Teacher models have their own stylistic biases - Claude tends toward structured responses with headers, GPT-4 tends toward numbered lists. If you train exclusively on synthetic data, your student model inherits those biases whether you want them or not. More dangerously, synthetic data can hallucinate plausible-sounding but incorrect facts, and those facts get baked into your fine-tuned model with high confidence.

warning

Never use synthetic data without factual verification for knowledge-intensive domains (medicine, law, finance, security). LLM-generated content sounds authoritative even when wrong. Build in a factual spot-check layer before including synthetic examples in your training set.

When to use: synthetic data works well for format compliance (output structure, length), style transfer, and augmenting rare categories. Use a strong teacher model and always filter the output.

Web-Scraped Data

Web scraping can produce enormous datasets cheaply, but the quality distribution is extremely wide. Most text on the internet is not instruction-following data. You need both domain filtering (only include content relevant to your task) and quality filtering (remove low-quality text within that domain).

Common Quality Signals for Web Data:

  • Perplexity score: high perplexity on a reference LM = unusual/noisy text
  • Length distribution: too short = fragment; too long = padding
  • Repetition ratio: repeated n-grams = boilerplate or spam
  • Language detection: wrong language for your target domain
  • URL source quality: known low-quality domains

When to use: only with aggressive filtering. Web data is a starting point, not a destination.


Deduplication: The Most Underrated Step

Deduplication is routinely underestimated. Teams remove exact duplicates and consider the job done. But near-duplicates - examples that differ by a few words, by paraphrase, by minor formatting - are just as harmful. They bias the model toward overlearning the duplicated patterns while underlearning everything else.

Consider a customer support dataset where 40,000 of 100,000 examples are variations of "How do I reset my password?" with slightly different phrasing. Even after removing exact duplicates, you have 35,000 near-duplicate password reset examples. Your fine-tuned model will be excellent at password resets and mediocre at everything else - because that is what you trained it on.

Exact Deduplication

Exact deduplication is straightforward: normalize text (lowercase, strip whitespace, remove punctuation), compute SHA-256, group by hash. Keep one example from each group.

import hashlib
from collections import defaultdict


def compute_hash(text: str) -> str:
normalized = " ".join(text.lower().strip().split())
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def exact_dedup(examples: list[dict]) -> list[dict]:
seen = set()
unique = []
duplicates = 0

for ex in examples:
# Hash on the full conversation content
content = ex.get("input", "") + ex.get("output", "")
h = compute_hash(content)

if h not in seen:
seen.add(h)
unique.append(ex)
else:
duplicates += 1

print(f"Exact dedup: {len(examples)}{len(unique)} ({duplicates} removed)")
return unique

Near-Duplicate Detection with MinHash

MinHash (Locality Sensitive Hashing) detects near-duplicates efficiently without comparing every pair of documents. It works by computing a compact signature for each document based on its n-gram shingles, then grouping documents with similar signatures into buckets. Documents in the same bucket are candidate duplicates.

MinHash scales to millions of examples with sub-linear time complexity. It is the right tool when exact deduplication is not enough.

import re
from collections import defaultdict
from dataclasses import dataclass, field

import numpy as np


def get_shingles(text: str, k: int = 5) -> set[str]:
"""Extract character k-grams from text."""
normalized = re.sub(r"\s+", " ", text.lower().strip())
return {normalized[i : i + k] for i in range(len(normalized) - k + 1)}


def minhash_signature(shingles: set[str], num_hashes: int = 128) -> np.ndarray:
"""Compute MinHash signature for a set of shingles."""
# Use random hash functions: h(x) = (a*x + b) % large_prime
rng = np.random.default_rng(seed=42)
large_prime = (1 << 31) - 1
a = rng.integers(1, large_prime, size=num_hashes)
b = rng.integers(0, large_prime, size=num_hashes)

sig = np.full(num_hashes, np.inf)
for shingle in shingles:
h = hash(shingle) % large_prime
values = (a * h + b) % large_prime
sig = np.minimum(sig, values)

return sig.astype(np.int64)


def lsh_buckets(
signatures: list[np.ndarray], num_bands: int = 16
) -> dict[tuple, list[int]]:
"""Group document indices into LSH buckets by band."""
num_hashes = signatures[0].shape[0]
rows_per_band = num_hashes // num_bands
buckets: dict[tuple, list[int]] = defaultdict(list)

for doc_idx, sig in enumerate(signatures):
for band in range(num_bands):
start = band * rows_per_band
end = start + rows_per_band
band_key = (band, tuple(sig[start:end]))
buckets[band_key].append(doc_idx)

return buckets


def minhash_dedup(
examples: list[dict],
similarity_threshold: float = 0.8,
num_hashes: int = 128,
num_bands: int = 16,
) -> list[dict]:
"""Remove near-duplicates using MinHash LSH."""
print(f"Computing MinHash signatures for {len(examples)} examples...")

texts = [ex.get("input", "") + " " + ex.get("output", "") for ex in examples]
shingle_sets = [get_shingles(t) for t in texts]
signatures = [minhash_signature(s, num_hashes) for s in shingle_sets]

buckets = lsh_buckets(signatures, num_bands)

# Find connected components of near-duplicate groups
parent = list(range(len(examples)))

def find(x: int) -> int:
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x

def union(x: int, y: int) -> None:
px, py = find(x), find(y)
if px != py:
parent[px] = py

for bucket_members in buckets.values():
if len(bucket_members) < 2:
continue
# Verify Jaccard similarity for bucket candidates
for i in range(len(bucket_members)):
for j in range(i + 1, len(bucket_members)):
a, b = bucket_members[i], bucket_members[j]
sig_a, sig_b = signatures[a], signatures[b]
jaccard_est = np.mean(sig_a == sig_b)
if jaccard_est >= similarity_threshold:
union(a, b)

# Keep one example per connected component
kept = {}
for idx in range(len(examples)):
root = find(idx)
if root not in kept:
kept[root] = idx

unique_indices = sorted(kept.values())
removed = len(examples) - len(unique_indices)
print(f"MinHash dedup: {len(examples)}{len(unique_indices)} ({removed} near-duplicates removed)")

return [examples[i] for i in unique_indices]

Embedding-Based Near-Deduplication

For smaller datasets (up to ~100K examples) or when you need higher precision than MinHash, embedding-based similarity detection is more accurate. Compute embeddings for each example, then cluster or threshold by cosine similarity.

from typing import Optional

import numpy as np


def cosine_similarity_matrix(embeddings: np.ndarray) -> np.ndarray:
"""Compute pairwise cosine similarity matrix."""
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
normalized = embeddings / (norms + 1e-8)
return normalized @ normalized.T


def embedding_dedup(
examples: list[dict],
embeddings: np.ndarray,
similarity_threshold: float = 0.92,
batch_size: int = 1000,
) -> list[dict]:
"""
Remove near-duplicates based on embedding cosine similarity.

For large datasets, processes in batches to avoid OOM.
Keeps the first example when a duplicate pair is found.
"""
n = len(examples)
removed = set()

# Process in batches to manage memory
for start in range(0, n, batch_size):
end = min(start + batch_size, n)
batch_embeddings = embeddings[start:end]

# Compare batch against all subsequent examples
for i in range(len(batch_embeddings)):
global_i = start + i
if global_i in removed:
continue

# Compare with rest of batch
for j in range(i + 1, len(batch_embeddings)):
global_j = start + j
if global_j in removed:
continue

sim = float(
np.dot(batch_embeddings[i], batch_embeddings[j])
/ (
np.linalg.norm(batch_embeddings[i])
* np.linalg.norm(batch_embeddings[j])
+ 1e-8
)
)
if sim >= similarity_threshold:
removed.add(global_j)

unique = [ex for idx, ex in enumerate(examples) if idx not in removed]
print(
f"Embedding dedup: {n}{len(unique)} "
f"({len(removed)} near-duplicates removed, threshold={similarity_threshold})"
)
return unique
tip

For production pipelines on millions of examples, use FAISS approximate nearest neighbor search instead of brute-force cosine similarity. FAISS can process 10M+ embeddings in minutes on a single GPU.


Quality Filtering Pipeline

After deduplication, you filter for quality. Quality filtering happens in two layers: fast heuristic filters that remove obvious problems in milliseconds, and slow LLM-as-judge scoring that evaluates subtle quality dimensions.

Layer 1: Heuristic Filters

Heuristic filters are rules derived from domain knowledge about what bad examples look like. They are fast (microseconds per example), cheap, and should run first.

FilterWhat It CatchesThreshold (typical)
Minimum lengthFragments, one-word answersInput: 20 tokens, Output: 10 tokens
Maximum lengthPadded/bloated responsesInput: 2048 tokens, Output: 1024 tokens
Repetition ratioCopy-paste boilerplate, stuck loopsLess than 15% repeated bigrams
Language IDWrong-language examplesConfidence > 0.95 for target language
PII patternsSSNs, credit cards, emails, phone numbersZero tolerance
Toxic contentHate speech, explicit contentScore < 0.1
Completion ratioTruncated responsesOutput length / expected length > 0.8
Format complianceMissing required fields, malformed JSONSchema validation pass
import re
from typing import Optional


def check_length(
example: dict,
min_input_tokens: int = 20,
max_input_tokens: int = 2048,
min_output_tokens: int = 10,
max_output_tokens: int = 1024,
) -> tuple[bool, Optional[str]]:
"""Rough token length check (whitespace split as proxy)."""
input_len = len(example.get("input", "").split())
output_len = len(example.get("output", "").split())

if input_len < min_input_tokens:
return False, f"input too short ({input_len} tokens)"
if input_len > max_input_tokens:
return False, f"input too long ({input_len} tokens)"
if output_len < min_output_tokens:
return False, f"output too short ({output_len} tokens)"
if output_len > max_output_tokens:
return False, f"output too long ({output_len} tokens)"

return True, None


def check_repetition(text: str, threshold: float = 0.15) -> tuple[bool, Optional[str]]:
"""Flag examples with excessive repeated bigrams."""
words = text.lower().split()
if len(words) < 10:
return True, None

bigrams = [f"{words[i]} {words[i+1]}" for i in range(len(words) - 1)]
unique_bigrams = set(bigrams)
repetition_ratio = 1.0 - (len(unique_bigrams) / len(bigrams))

if repetition_ratio > threshold:
return False, f"high repetition ratio ({repetition_ratio:.2f})"
return True, None


def check_pii(text: str) -> tuple[bool, list[str]]:
"""Detect common PII patterns via regex."""
patterns = {
"ssn": r"\b\d{3}-\d{2}-\d{4}\b",
"credit_card": r"\b\d{4}[\s-]\d{4}[\s-]\d{4}[\s-]\d{4}\b",
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"phone": r"\b(\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
"ip_address": r"\b(?:\d{1,3}\.){3}\d{1,3}\b",
}

found = []
for pii_type, pattern in patterns.items():
if re.search(pattern, text):
found.append(pii_type)

return len(found) == 0, found


def heuristic_filter(examples: list[dict]) -> tuple[list[dict], list[dict]]:
"""
Apply all heuristic filters. Returns (passed, rejected) with rejection reasons.
"""
passed = []
rejected = []

for ex in examples:
rejection_reasons = []
input_text = ex.get("input", "")
output_text = ex.get("output", "")

# Length check
ok, reason = check_length(ex)
if not ok:
rejection_reasons.append(reason)

# Repetition check on output
ok, reason = check_repetition(output_text)
if not ok:
rejection_reasons.append(reason)

# PII check on full text
full_text = input_text + " " + output_text
ok, pii_types = check_pii(full_text)
if not ok:
rejection_reasons.append(f"PII detected: {', '.join(pii_types)}")

if rejection_reasons:
ex["_rejection_reasons"] = rejection_reasons
rejected.append(ex)
else:
passed.append(ex)

print(
f"Heuristic filter: {len(examples)}{len(passed)} passed, "
f"{len(rejected)} rejected"
)
return passed, rejected

Layer 2: LLM-as-Judge Quality Scoring

After heuristic filtering removes obvious problems, LLM-as-judge scoring evaluates subtle quality dimensions: correctness, helpfulness, instruction-following, response quality. This is expensive - use it only on examples that pass heuristics.

The key insight from LLM-as-judge research (Zheng et al., 2023) is that strong LLMs are reliable judges of response quality, especially when you use structured scoring rubrics and force explicit reasoning before the score.

import json
import os
import time
from typing import Optional

import anthropic


QUALITY_JUDGE_PROMPT = """You are a quality evaluator for LLM fine-tuning data.
Evaluate the following example on five dimensions.

<example>
<input>{input}</input>
<output>{output}</output>
</example>

Score each dimension from 1 to 5, where:
1 = Poor (unacceptable, should be excluded)
2 = Below average (significant issues)
3 = Average (acceptable but not great)
4 = Good (clearly useful training example)
5 = Excellent (ideal training example)

Dimensions:
1. CORRECTNESS: Is the output factually accurate and free of errors?
2. HELPFULNESS: Does the output directly address what the input asks for?
3. INSTRUCTION_FOLLOWING: Does the output follow any formatting/style instructions in the input?
4. COMPLETENESS: Is the output complete, or does it truncate / leave things unfinished?
5. QUALITY: Is the output well-written, clear, and appropriate in length?

Respond with ONLY valid JSON in this exact format:
{{
"reasoning": "Brief explanation of your scores (2-3 sentences)",
"correctness": <1-5>,
"helpfulness": <1-5>,
"instruction_following": <1-5>,
"completeness": <1-5>,
"quality": <1-5>,
"overall": <1-5>,
"exclude": <true|false>
}}

Set "exclude" to true if ANY dimension scores 1, or if overall score is below 3."""


def score_example_with_llm(
example: dict,
client: anthropic.Anthropic,
model: str = "claude-haiku-4-5-20251001",
max_retries: int = 3,
) -> Optional[dict]:
"""
Score a single example using Claude as judge.

Uses claude-haiku-4-5-20251001 for cost efficiency at scale.
Returns parsed score dict or None on failure.
"""
prompt = QUALITY_JUDGE_PROMPT.format(
input=example.get("input", "")[:1500], # truncate to stay within context
output=example.get("output", "")[:1500],
)

for attempt in range(max_retries):
try:
response = client.messages.create(
model=model,
max_tokens=512,
messages=[{"role": "user", "content": prompt}],
)
content = response.content[0].text.strip()

# Strip markdown code blocks if present
if content.startswith("```"):
content = content.split("```")[1]
if content.startswith("json"):
content = content[4:]

scores = json.loads(content)
return scores

except json.JSONDecodeError as e:
if attempt < max_retries - 1:
time.sleep(1.0 * (attempt + 1))
continue
print(f"JSON parse error after {max_retries} attempts: {e}")
return None

except anthropic.RateLimitError:
wait = 5.0 * (attempt + 1)
print(f"Rate limited. Waiting {wait}s...")
time.sleep(wait)

except anthropic.APIError as e:
print(f"API error: {e}")
return None

return None


def llm_judge_filter(
examples: list[dict],
min_overall_score: float = 3.5,
max_examples: Optional[int] = None,
batch_delay: float = 0.1,
) -> tuple[list[dict], list[dict]]:
"""
Run LLM-as-judge scoring on examples and filter below threshold.

Args:
examples: Pre-filtered examples to score
min_overall_score: Minimum overall score to keep (1-5 scale)
max_examples: Optional cap (useful for cost control during testing)
batch_delay: Seconds between API calls to avoid rate limits

Returns:
(passed, rejected) lists with scores attached
"""
client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

if max_examples:
examples = examples[:max_examples]

passed = []
rejected = []
failed_scoring = []

for i, ex in enumerate(examples):
if i % 100 == 0:
print(f"Scoring example {i}/{len(examples)}...")

scores = score_example_with_llm(ex, client)

if scores is None:
# Scoring failed - keep example but flag it
ex["_score_failed"] = True
failed_scoring.append(ex)
continue

ex["_scores"] = scores

overall = scores.get("overall", 0)
exclude = scores.get("exclude", False)

if exclude or overall < min_overall_score:
ex["_rejection_reason"] = f"LLM judge score {overall:.1f} < {min_overall_score}"
rejected.append(ex)
else:
passed.append(ex)

if batch_delay > 0:
time.sleep(batch_delay)

print(
f"LLM judge filter: {len(examples)} scored → "
f"{len(passed)} passed, {len(rejected)} rejected, "
f"{len(failed_scoring)} failed to score"
)
return passed, rejected
info

At Claude Haiku pricing (~0.00025per1Kinputtokens),scoring100Kexamplescostsroughly0.00025 per 1K input tokens), scoring 100K examples costs roughly 25–50 depending on example length. This is cheap insurance against training on low-quality data. Budget this into your pipeline from the start.


The Complete DataCurationPipeline

Now we assemble all the pieces into a single, end-to-end pipeline class with logging, statistics, and lineage tracking built in.

import hashlib
import json
import os
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

import numpy as np


@dataclass
class CurationConfig:
"""Configuration for the data curation pipeline."""

# Deduplication
exact_dedup: bool = True
minhash_dedup: bool = True
minhash_similarity_threshold: float = 0.8
minhash_num_hashes: int = 128
minhash_num_bands: int = 16
embedding_dedup: bool = False
embedding_similarity_threshold: float = 0.92

# Heuristic filters
min_input_tokens: int = 20
max_input_tokens: int = 2048
min_output_tokens: int = 10
max_output_tokens: int = 1024
max_repetition_ratio: float = 0.15

# LLM judge
llm_judge: bool = True
llm_judge_model: str = "claude-haiku-4-5-20251001"
min_quality_score: float = 3.5
llm_judge_max_examples: Optional[int] = None

# Output
output_dir: str = "./curated_dataset"
dataset_name: str = "curated"
random_seed: int = 42


@dataclass
class CurationStats:
"""Tracks statistics through each pipeline stage."""

stage_counts: dict[str, int] = field(default_factory=dict)
rejection_reasons: dict[str, int] = field(default_factory=dict)
score_distribution: list[float] = field(default_factory=list)
start_time: float = field(default_factory=time.time)

def record_stage(self, stage: str, count: int) -> None:
self.stage_counts[stage] = count

def record_rejection(self, reason: str) -> None:
self.rejection_reasons[reason] = self.rejection_reasons.get(reason, 0) + 1

def elapsed(self) -> float:
return time.time() - self.start_time

def summary(self) -> dict:
return {
"stage_counts": self.stage_counts,
"top_rejection_reasons": sorted(
self.rejection_reasons.items(), key=lambda x: -x[1]
)[:10],
"score_distribution": {
"mean": float(np.mean(self.score_distribution)) if self.score_distribution else None,
"median": float(np.median(self.score_distribution)) if self.score_distribution else None,
"p25": float(np.percentile(self.score_distribution, 25)) if self.score_distribution else None,
"p75": float(np.percentile(self.score_distribution, 75)) if self.score_distribution else None,
},
"elapsed_seconds": self.elapsed(),
}


class DataCurationPipeline:
"""
End-to-end dataset curation pipeline for LLM fine-tuning.

Stages:
1. load - Load and validate raw examples
2. dedup - Exact + near-duplicate removal
3. heuristic - Fast rule-based filtering
4. quality - LLM-as-judge quality scoring
5. balance - Topic/length balancing
6. export - Save with lineage metadata
"""

def __init__(self, config: CurationConfig) -> None:
self.config = config
self.stats = CurationStats()
self.rejected: list[dict] = []
self.run_id = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

Path(config.output_dir).mkdir(parents=True, exist_ok=True)

# ------------------------------------------------------------------
# Stage 1: Load
# ------------------------------------------------------------------

def load(self, source: str | list[str]) -> list[dict]:
"""
Load examples from one or more JSONL files.
Validates required fields and logs malformed records.
"""
sources = [source] if isinstance(source, str) else source
examples = []
malformed = 0

for path in sources:
print(f"Loading {path}...")
with open(path, "r", encoding="utf-8") as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
ex = json.loads(line)
if "input" not in ex or "output" not in ex:
malformed += 1
continue
# Attach source metadata
ex["_source_file"] = path
ex["_source_line"] = line_num
examples.append(ex)
except json.JSONDecodeError:
malformed += 1

print(
f"Loaded {len(examples)} examples from {len(sources)} files "
f"({malformed} malformed records skipped)"
)
self.stats.record_stage("load", len(examples))
return examples

# ------------------------------------------------------------------
# Stage 2: Deduplication
# ------------------------------------------------------------------

def dedup(
self, examples: list[dict], embeddings: Optional[np.ndarray] = None
) -> list[dict]:
"""Run exact and near-duplicate detection."""
current = examples

if self.config.exact_dedup:
current = exact_dedup(current)
self.stats.record_stage("exact_dedup", len(current))

if self.config.minhash_dedup:
current = minhash_dedup(
current,
similarity_threshold=self.config.minhash_similarity_threshold,
num_hashes=self.config.minhash_num_hashes,
num_bands=self.config.minhash_num_bands,
)
self.stats.record_stage("minhash_dedup", len(current))

if self.config.embedding_dedup and embeddings is not None:
current = embedding_dedup(
current,
embeddings,
similarity_threshold=self.config.embedding_similarity_threshold,
)
self.stats.record_stage("embedding_dedup", len(current))

return current

# ------------------------------------------------------------------
# Stage 3: Heuristic Filtering
# ------------------------------------------------------------------

def heuristic_filter_stage(self, examples: list[dict]) -> list[dict]:
"""Apply fast rule-based quality filters."""
passed, rejected = heuristic_filter(examples)

for ex in rejected:
for reason in ex.get("_rejection_reasons", ["unknown"]):
self.stats.record_rejection(f"heuristic: {reason}")
self.rejected.extend(rejected)

self.stats.record_stage("heuristic_filter", len(passed))
return passed

# ------------------------------------------------------------------
# Stage 4: LLM Quality Scoring
# ------------------------------------------------------------------

def quality_score(self, examples: list[dict]) -> list[dict]:
"""Score examples with LLM judge and filter below threshold."""
if not self.config.llm_judge:
print("LLM judge disabled - skipping quality scoring")
return examples

passed, rejected = llm_judge_filter(
examples,
min_overall_score=self.config.min_quality_score,
max_examples=self.config.llm_judge_max_examples,
)

# Record score distribution
for ex in passed:
if "_scores" in ex:
self.stats.score_distribution.append(ex["_scores"].get("overall", 0))

for ex in rejected:
self.stats.record_rejection(
ex.get("_rejection_reason", "llm_judge: low score")
)
self.rejected.extend(rejected)

self.stats.record_stage("llm_judge", len(passed))
return passed

# ------------------------------------------------------------------
# Stage 5: Balance
# ------------------------------------------------------------------

def balance(
self,
examples: list[dict],
max_per_topic: Optional[int] = None,
) -> list[dict]:
"""
Optional: cap examples per topic category to prevent over-representation.
Requires examples to have a '_topic' field (add during loading or scoring).
"""
if max_per_topic is None:
return examples

topic_counts: dict[str, int] = {}
balanced = []

rng = np.random.default_rng(seed=self.config.random_seed)
shuffled = list(examples)
rng.shuffle(shuffled)

for ex in shuffled:
topic = ex.get("_topic", "unknown")
count = topic_counts.get(topic, 0)
if count < max_per_topic:
balanced.append(ex)
topic_counts[topic] = count + 1

removed = len(examples) - len(balanced)
print(f"Balance: {len(examples)}{len(balanced)} ({removed} capped for balance)")
self.stats.record_stage("balance", len(balanced))
return balanced

# ------------------------------------------------------------------
# Stage 6: Export
# ------------------------------------------------------------------

def export(self, examples: list[dict]) -> dict[str, Any]:
"""
Export curated dataset and lineage metadata.

Writes:
- curated_RUNID.jsonl - clean training examples
- rejected_RUNID.jsonl - rejected examples with reasons
- lineage_RUNID.json - full provenance and stats
"""
output_dir = Path(self.config.output_dir)

# Write curated examples (strip internal metadata fields)
curated_path = output_dir / f"{self.config.dataset_name}_{self.run_id}.jsonl"
clean_keys = {"input", "output", "system", "metadata"}
written = 0

with open(curated_path, "w", encoding="utf-8") as f:
for ex in examples:
clean_ex = {k: v for k, v in ex.items() if k in clean_keys or not k.startswith("_")}
f.write(json.dumps(clean_ex, ensure_ascii=False) + "\n")
written += 1

# Write rejected examples for analysis
rejected_path = output_dir / f"rejected_{self.run_id}.jsonl"
with open(rejected_path, "w", encoding="utf-8") as f:
for ex in self.rejected:
f.write(json.dumps(ex, ensure_ascii=False, default=str) + "\n")

# Compute dataset fingerprint
fingerprint = hashlib.sha256(
"".join(ex.get("input", "") for ex in examples[:100]).encode()
).hexdigest()[:16]

# Write lineage metadata
lineage = {
"run_id": self.run_id,
"created_at": datetime.now(timezone.utc).isoformat(),
"dataset_name": self.config.dataset_name,
"fingerprint": fingerprint,
"config": self.config.__dict__,
"stats": self.stats.summary(),
"final_count": written,
"rejected_count": len(self.rejected),
"output_files": {
"curated": str(curated_path),
"rejected": str(rejected_path),
},
}

lineage_path = output_dir / f"lineage_{self.run_id}.json"
with open(lineage_path, "w", encoding="utf-8") as f:
json.dump(lineage, f, indent=2, ensure_ascii=False, default=str)

print(f"\nExport complete:")
print(f" Curated: {curated_path} ({written} examples)")
print(f" Rejected: {rejected_path} ({len(self.rejected)} examples)")
print(f" Lineage: {lineage_path}")
print(f" Elapsed: {self.stats.elapsed():.1f}s")

return lineage

# ------------------------------------------------------------------
# Full Pipeline
# ------------------------------------------------------------------

def run(
self,
source: str | list[str],
embeddings: Optional[np.ndarray] = None,
max_per_topic: Optional[int] = None,
) -> dict[str, Any]:
"""Execute the full curation pipeline end to end."""
print(f"\n{'='*60}")
print(f"DataCurationPipeline - Run {self.run_id}")
print(f"{'='*60}\n")

examples = self.load(source)
examples = self.dedup(examples, embeddings)
examples = self.heuristic_filter_stage(examples)
examples = self.quality_score(examples)
examples = self.balance(examples, max_per_topic)
lineage = self.export(examples)

# Print summary
print(f"\nPipeline Summary:")
for stage, count in self.stats.stage_counts.items():
print(f" {stage:25s}: {count:,}")

return lineage


# Example usage
if __name__ == "__main__":
config = CurationConfig(
exact_dedup=True,
minhash_dedup=True,
minhash_similarity_threshold=0.8,
llm_judge=True,
llm_judge_model="claude-haiku-4-5-20251001",
min_quality_score=3.5,
output_dir="./curated",
dataset_name="helpdesk_v1",
)

pipeline = DataCurationPipeline(config)
lineage = pipeline.run(
source=["data/raw_tickets_2024.jsonl", "data/raw_tickets_2023.jsonl"],
max_per_topic=500,
)

Dataset Statistics and Analysis

Understanding your dataset's statistical properties before fine-tuning is not optional - it is how you catch distribution problems before they become model problems.

Length Distribution Analysis

import json
from collections import Counter
from pathlib import Path

import numpy as np


def analyze_length_distribution(dataset_path: str) -> dict:
"""Compute input/output length statistics."""
input_lengths = []
output_lengths = []

with open(dataset_path) as f:
for line in f:
ex = json.loads(line)
input_lengths.append(len(ex.get("input", "").split()))
output_lengths.append(len(ex.get("output", "").split()))

def stats(values: list[int]) -> dict:
arr = np.array(values)
return {
"count": len(arr),
"mean": float(np.mean(arr)),
"median": float(np.median(arr)),
"p10": float(np.percentile(arr, 10)),
"p90": float(np.percentile(arr, 90)),
"p99": float(np.percentile(arr, 99)),
"min": int(np.min(arr)),
"max": int(np.max(arr)),
}

return {
"input_length": stats(input_lengths),
"output_length": stats(output_lengths),
"total_tokens_estimate": sum(input_lengths) + sum(output_lengths),
}


def analyze_topic_balance(dataset_path: str, topic_field: str = "topic") -> dict:
"""Analyze topic/category distribution."""
topic_counts: Counter = Counter()

with open(dataset_path) as f:
for line in f:
ex = json.loads(line)
topic = ex.get(topic_field, "unknown")
topic_counts[topic] += 1

total = sum(topic_counts.values())
distribution = {
topic: {"count": count, "fraction": count / total}
for topic, count in topic_counts.most_common()
}

# Compute imbalance ratio
counts = list(topic_counts.values())
imbalance_ratio = max(counts) / min(counts) if min(counts) > 0 else float("inf")

return {
"topic_distribution": distribution,
"num_topics": len(topic_counts),
"imbalance_ratio": imbalance_ratio,
"is_balanced": imbalance_ratio < 10,
}


def print_dataset_report(dataset_path: str) -> None:
"""Print a human-readable dataset quality report."""
lengths = analyze_length_distribution(dataset_path)
topics = analyze_topic_balance(dataset_path)

print(f"\nDataset Report: {dataset_path}")
print("=" * 50)

il = lengths["input_length"]
print(f"\nInput Length (tokens, whitespace-split):")
print(f" Count: {il['count']:,}")
print(f" Mean: {il['mean']:.1f}")
print(f" Median: {il['median']:.1f}")
print(f" P10: {il['p10']:.0f} | P90: {il['p90']:.0f} | P99: {il['p99']:.0f}")

ol = lengths["output_length"]
print(f"\nOutput Length (tokens, whitespace-split):")
print(f" Mean: {ol['mean']:.1f}")
print(f" Median: {ol['median']:.1f}")
print(f" P10: {ol['p10']:.0f} | P90: {ol['p90']:.0f} | P99: {ol['p99']:.0f}")

print(f"\nTotal Estimated Tokens: {lengths['total_tokens_estimate']:,}")

print(f"\nTopic Distribution ({topics['num_topics']} topics):")
print(f" Imbalance Ratio: {topics['imbalance_ratio']:.1f}x")
print(f" Balanced: {'Yes' if topics['is_balanced'] else 'No - consider capping over-represented topics'}")
print(f"\n Top Topics:")
for topic, info in list(topics["topic_distribution"].items())[:10]:
bar = "█" * int(info["fraction"] * 40)
print(f" {topic:25s} {info['count']:5,} {bar}")

What the Numbers Should Tell You

Before you launch a fine-tuning job, your dataset statistics should pass these checks:

CheckHealthy SignalWarning Signal
Input length P90/P10 ratioLess than 20xGreater than 50x
Output length median50–300 tokensLess than 20 or greater than 800 tokens
Topic imbalance ratioLess than 10xGreater than 50x
Examples with score less than 3.0Less than 5%Greater than 20%
Dedup reduction rate5–30%Greater than 60% (dataset was mostly duplicates)
Final dataset sizeProportional to task complexityLess than 1K (too small) or greater than 500K without curation
warning

A deduplication removal rate above 60% is a red flag. It means your raw dataset was essentially the same data repeated many times. Before fine-tuning on what remains, investigate whether you actually have enough topical diversity to generalize.


Data Versioning and Lineage Tracking

Dataset versions must be tracked with the same rigor as model versions. You will need to reproduce fine-tuning runs, debug behavior regressions, and audit what data a model was trained on. Without lineage, none of this is possible.

What Lineage Must Capture

import hashlib
import json
import os
from datetime import datetime, timezone
from pathlib import Path


class DatasetRegistry:
"""
Simple file-based dataset version registry.

Tracks versions, lineage, and links datasets to fine-tuning runs.
In production, replace with MLflow, W&B Artifacts, or DVC.
"""

def __init__(self, registry_path: str = "./dataset_registry.json") -> None:
self.registry_path = Path(registry_path)
self.registry = self._load()

def _load(self) -> dict:
if self.registry_path.exists():
with open(self.registry_path) as f:
return json.load(f)
return {"datasets": {}, "runs": []}

def _save(self) -> None:
with open(self.registry_path, "w") as f:
json.dump(self.registry, f, indent=2, ensure_ascii=False)

def compute_fingerprint(self, dataset_path: str) -> str:
"""SHA-256 of the first 10K lines for fast fingerprinting."""
sha = hashlib.sha256()
with open(dataset_path, "rb") as f:
for i, line in enumerate(f):
if i >= 10000:
break
sha.update(line)
return sha.hexdigest()

def register_dataset(
self,
name: str,
version: str,
dataset_path: str,
lineage: dict,
notes: str = "",
) -> str:
"""Register a curated dataset version."""
fingerprint = self.compute_fingerprint(dataset_path)
file_size = os.path.getsize(dataset_path)

entry = {
"name": name,
"version": version,
"fingerprint": fingerprint,
"file_size_bytes": file_size,
"dataset_path": str(dataset_path),
"lineage": lineage,
"notes": notes,
"registered_at": datetime.now(timezone.utc).isoformat(),
}

key = f"{name}@{version}"
self.registry["datasets"][key] = entry
self._save()

print(f"Registered dataset: {key} (fingerprint: {fingerprint[:12]}...)")
return fingerprint

def link_to_training_run(
self,
dataset_key: str,
run_id: str,
model_name: str,
hyperparams: dict,
) -> None:
"""Record that a training run used a specific dataset version."""
run_record = {
"run_id": run_id,
"dataset_key": dataset_key,
"model_name": model_name,
"hyperparams": hyperparams,
"started_at": datetime.now(timezone.utc).isoformat(),
}
self.registry["runs"].append(run_record)
self._save()
print(f"Linked run {run_id} to dataset {dataset_key}")

def get_dataset_history(self, name: str) -> list[dict]:
"""Return all versions of a named dataset."""
return [
v for k, v in self.registry["datasets"].items()
if k.startswith(f"{name}@")
]
tip

For production use, integrate with DVC (data version control) or Weights & Biases Artifacts. These tools give you dataset diffing, lineage visualization, and storage-efficient versioning with minimal setup.


Common Dataset Pitfalls

Every team building fine-tuning datasets makes some version of the same mistakes. Here they are, with the symptoms that surface when you hit each one.

PitfallRoot CauseSymptomFix
Label leakageAnswer appears in the input (e.g., "summarize this: ... SUMMARY: ...")Model copies input rather than generatingStrip answer portions from inputs during preprocessing
Distribution mismatchTraining distribution differs from production distributionHigh eval score, poor production performanceSample training data to match production query distribution
Recency biasTraining data skewed toward recent examplesModel fails on older or seasonal queriesBalance by time period; include historical examples
Format overfittingAll training examples have identical formatModel refuses inputs in different formatsVary formatting deliberately; include format-diverse examples
Near-duplicate contaminationParaphrased versions of eval set in trainingInflated eval scoresAlways dedup across train/eval splits, not just within train
Length biasTraining outputs consistently one lengthModel truncates or pads to match training length distributionAnalyze length distribution; ensure it matches expected production output
Topic concentration80% of examples cover 20% of topicsModel excellent on common topics, fails on rare onesCap per-topic examples; oversample rare topics
Annotation inconsistencyDifferent annotators applying different quality standardsNoisy training signal; unstable fine-tuning lossUse annotation rubrics; measure inter-annotator agreement
Eval contaminationEval examples processed by same pipeline as trainDedup removes eval examples instead of train duplicatesRun dedup on train-only; eval set is frozen before any processing
Temporal data leakageFuture information in features or contextModel learns patterns that won't generalizeSort by timestamp; train on past, eval on future
danger

Never run deduplication across your combined train and eval sets. Always freeze your eval set first, then run deduplication on the training set only. If a training example is a near-duplicate of an eval example, remove the training example - not the eval example. Running dedup on the combined pool will silently inflate your evaluation scores.


Interview Questions and Answers

Q1: You have a dataset of 200K examples but your fine-tuned model underperforms a baseline trained on 20K carefully curated examples. What happened and how do you diagnose it?

Answer:

The most likely explanation is that the 200K dataset has a lower signal-to-noise ratio than the 20K curated set. Gradient descent optimizes over all examples equally (unless you use weighted loss), so low-quality examples actively degrade learning.

Diagnosis steps:

First, compute the deduplication rate on the 200K dataset. If you remove 30–50% as near-duplicates, the "200K" dataset was really 100–140K unique examples - and many of those may still be on the same narrow set of topics.

Second, sample 200 random examples from the 200K dataset and manually score them using your quality rubric. This gives you an empirical quality distribution. If the average is below 3.5/5, the bulk of examples are actively harmful.

Third, compute the topic distribution. If 60% of examples cluster around 5 common topics and only 10% are spread across all other topics, you have severe topic concentration - the model learns the top-5 topics well and everything else badly.

Fourth, compare the length distributions. Bimodal distributions (many very short + many very long) often indicate mixing incompatible data sources, which confuses the model about expected response format.

The fix is curation: deduplicate, apply quality filtering with LLM-as-judge, cap per-topic examples, and verify the resulting dataset's statistics match your target distribution. Starting with 200K and ending at 30K after curation is a good outcome, not a failure.


Q2: What is MinHash LSH and why is it preferred over embedding similarity for large-scale deduplication?

Answer:

MinHash is a probabilistic algorithm that estimates Jaccard similarity between sets using compact signatures called MinHash signatures. For document deduplication, you represent each document as a set of n-gram shingles, then compute a fixed-size signature by applying multiple hash functions and taking the minimum value for each.

The key property: if two documents have Jaccard similarity J, the probability that their MinHash signatures agree on any given hash function is also J. This allows you to estimate similarity from the signatures without comparing full documents.

Locality Sensitive Hashing (LSH) then groups documents into buckets using bands of the signature. Documents that share any bucket are candidate duplicates. You only do pairwise comparison within buckets, making the algorithm sub-quadratic in the number of documents.

Why it beats embedding similarity at scale:

Embedding similarity requires computing a dense vector for every document (expensive) and then computing pairwise cosine similarity (O(n^2) naive, O(n log n) with FAISS). For 10M documents, even FAISS requires significant GPU resources and careful index tuning.

MinHash scales to billions of documents with commodity hardware. It requires no model inference - only n-gram extraction and hashing. The tradeoff is accuracy: MinHash is approximate and may miss near-duplicates that differ in character-level features but are semantically identical. For most fine-tuning datasets (up to 10M examples), MinHash is the right tool. For very high-precision deduplication (medical, legal, safety-critical), you add an embedding verification step on MinHash candidates.


Q3: How would you use an LLM-as-judge to score dataset quality, and what are the failure modes you must guard against?

Answer:

LLM-as-judge uses a strong frontier model (Claude, GPT-4) to score training examples on quality dimensions like correctness, helpfulness, instruction-following, and completeness. You design a structured prompt with a scoring rubric, force the model to reason before scoring, and parse the numeric scores from the output.

Implementation considerations:

Reasoning before score: asking the model to explain its scoring before giving the number significantly improves accuracy. The model "commits" to a position through reasoning, which reduces anchoring to superficial signals like response length.

Structured output: use JSON with explicit field names for each dimension. Parse the JSON and validate the schema - malformed outputs need retry logic.

Dimension weighting: not all dimensions are equally important for your task. A customer support model cares more about correctness and helpfulness than stylistic quality. Weight dimensions accordingly.

Failure modes:

Verbosity bias: LLMs tend to rate longer responses higher, even when shorter responses are more appropriate. Counter this by including an example of an excellent short response in your rubric.

Self-consistency bias: if you use the same model family to generate synthetic data and judge its quality, the judge will rate its own outputs higher than human-written alternatives. Use a different model family as judge when possible.

Calibration drift: scores from a judge LLM are not absolute - they depend on the prompt and the model. If you update either, recalibrate by having humans score a sample and aligning the LLM threshold to match human judgment.

Cost: at scale, even cheap models add up. Score only examples that pass heuristic filtering. For a pipeline that reduces 500K to 100K through heuristics, you only pay to LLM-score 100K examples.

Prompt injection: adversarial content in training examples can manipulate the judge prompt. Sanitize example content and use strict delimiters (XML tags or triple-quotes) to separate the rubric from the example being scored.


Q4: What is the difference between train/eval split contamination and label leakage, and how do you prevent each?

Answer:

These are distinct problems that are often conflated.

Train/eval contamination happens when examples in your training set are semantically similar to (or exact copies of) examples in your evaluation set. The model effectively "memorizes" the eval set during training, inflating your evaluation metrics. The result is that your eval scores look great but the model generalizes poorly to production queries.

Prevention: freeze your eval set before any preprocessing. Run deduplication only on the training pool. If a training example is a near-duplicate of an eval example, remove the training example. Never modify the eval set.

Label leakage happens when the input to a training example contains the answer you are trying to predict. For example, if you are training a summarization model and your input is "Summarize the following: [article text] SUMMARY: [the actual summary]", the model learns to copy the portion after "SUMMARY:" rather than learning to summarize.

Label leakage is subtle because it manifests as excellent training loss but complete failure on real inputs (where the answer is not in the input). The model has learned a shortcut that does not exist in production.

Prevention: inspect input-output pairs for answer contamination. Use schema validation to ensure inputs and outputs are structured correctly. Run a heuristic check: compute the longest common subsequence between input and output - if it exceeds 40% of output length, flag for review.

A third related problem is temporal leakage: training data that contains information from the future relative to the query timestamp. This is common in financial and time-series domains. Prevention requires sorting data by timestamp and ensuring training uses only past information.


Q5: How do you handle imbalanced datasets in fine-tuning, and when should you balance versus when should you let the imbalance stand?

Answer:

Dataset imbalance in fine-tuning means some topics, task types, or output patterns appear far more often than others. Whether to correct this depends on what you want the model to do.

When to balance:

If your production traffic is roughly uniform across topics but your training data is 80% one topic, the model will over-optimize for that topic and underperform on others. Balance by capping per-topic examples (say, 500 maximum per category) and oversampling rare categories using synthetic augmentation.

If you are training a classifier head and classes are severely imbalanced, use weighted loss (weight each example inversely proportional to its class frequency) rather than resampling. Resampling duplicates examples, which can cause overfitting on small classes.

When to let imbalance stand:

If your production traffic is itself imbalanced in the same way as your training data, then preserving the imbalance is correct. A customer support model where 60% of real queries are password resets should train on data where 60% of examples are password resets - that reflects real-world usage.

If you are doing instruction fine-tuning (RLHF / SFT) and you have naturally diverse data that happens to be unequal in topic counts, moderate imbalance (less than 10x) is usually fine. Modern base models are robust enough to learn minority patterns from a handful of examples.

Practical rule: measure your production query distribution, compare it to your training distribution, and correct only where there is a significant mismatch. A 10x imbalance in training on a topic that appears 0.1% in production is not worth correcting - you have five examples of that topic regardless of whether you balance. Synthetic augmentation is the right fix when a real topic has insufficient training coverage.


Summary

Dataset curation is the highest-leverage work in fine-tuning. The team with 5K curated examples beat the team with 500K raw ones because quality compounds: every good example teaches the model correctly, and every bad example adds noise that must be overcome by the good ones.

The complete curation pipeline runs in six stages: load and validate raw sources, deduplicate with exact matching and MinHash LSH, apply heuristic filters for obvious problems, score quality with LLM-as-judge, balance across topics to prevent over-representation, and export with full lineage metadata.

The most dangerous mistakes are not the obvious ones. Exact deduplication is easy - near-duplicate contamination is what silently ruins datasets. Label leakage produces perfect training loss and zero production value. Eval contamination inflates your confidence in a model that will fail immediately in production. Dataset lineage is not bureaucracy - it is how you debug model behavior six months after training.

The correct mental model is this: your training dataset is a curriculum. Every example is a lesson. You would not design a curriculum by randomly selecting 500K pages from the internet. Design it the same way a great teacher designs a course - deliberately, with attention to what each example teaches, and ruthless about removing anything that teaches the wrong lesson.

tip

Start every fine-tuning project by building a gold evaluation set of 200–500 hand-curated examples before you touch training data. This gold set is your compass: it tells you whether a dataset change is helping or hurting. Without it, you are flying blind.

© 2026 EngineersOfAI. All rights reserved.