:::tip 🎮 Interactive Playground Visualize this concept: Try the Synthetic Data Generation demo on the EngineersOfAI Playground - no code required. :::
Dataset Curation for Fine-Tuning
The 500K Problem
The team had every reason to be confident. Six months of customer support transcripts. Half a million examples. Real users. Real questions. Real resolutions. The data engineering lead had spent three weeks cleaning the raw exports - stripping HTML, normalizing encodings, converting to JSONL. It looked immaculate.
They fine-tuned Mistral-7B. The model finished training with a clean loss curve. Internal eval scores were up eleven points over the base model. Everyone celebrated.
Then a product manager ran a side experiment. She had personally curated 5,000 examples from the same ticket system - spending two weeks reading tickets, rewriting bad answers, removing duplicates by hand, balancing topics. She fine-tuned the same base model on her 5K dataset. Her model scored fifteen points higher than the 500K model on every benchmark they ran. Support engineers preferred her model in blind A/B tests by a two-to-one margin.
The team had made the most common mistake in fine-tuning: they treated data volume as a proxy for data quality. The 500K dataset was full of copy-paste boilerplate responses, near-duplicate tickets about the same three issues, agent answers that said "I'll escalate this" without solving anything, and examples where the user's question was never actually answered. Every one of those examples was teaching the model to be mediocre.
This lesson is about preventing that outcome. We will cover how to source data, how to detect and remove duplicates, how to score and filter for quality, and how to build a production curation pipeline. By the end, you will understand why dataset curation is not a preprocessing step - it is the most important engineering decision you make before fine-tuning.
Data Quality Is Not a Filter - It Is the Foundation
Most teams think about data quality the way they think about linting: something you run at the end to catch obvious problems. The correct mental model is the opposite. Data quality is the foundation your model's behavior is built on. Every example in your training set is a lesson. Bad lessons create bad behavior - and unlike bugs in code, bad fine-tuning behavior is hard to trace back to its source.
Why More Data Makes Things Worse
Adding low-quality data to a high-quality dataset degrades model performance. This is empirically well-established. The reason is that gradient descent optimizes over the entire dataset. If 30% of your examples teach the model to give vague non-answers, the model learns to give vague non-answers 30% of the time - or worse, it learns a compromised mixture of good and bad response styles that makes it mediocre across the board.
LIMA (Zhao et al., 2023) demonstrated this definitively: 1,000 carefully curated examples produced better instruction-following than models trained on datasets fifty times larger with lower curation standards. The signal-to-noise ratio in your dataset determines your ceiling.
Data Quality vs. Model Performance
High │ ● ← 5K curated
│ ●
│ ●
Perf │ ●
│ ●
│ ●
Low │●
└──────────────────────────────
0 100K 300K 500K
Dataset Size
Same base model, same training config.
The only variable: curation quality.
The Curation Pipeline Overview
Every stage removes noise. The goal is not to keep as many examples as possible - it is to keep only the examples that teach the behavior you want.
Data Sources: Where Fine-Tuning Data Comes From
Before you can curate, you need data. The three primary sources each have distinct characteristics, failure modes, and use cases.
Source Comparison Table
| Source | Quality Ceiling | Cost | Scale | Main Risk |
|---|---|---|---|---|
| Human-written (expert) | Highest | Very high ($20–100/example) | Low (thousands) | Inconsistency between annotators |
| Human-written (crowd) | Medium | Medium ($1–5/example) | Medium (tens of thousands) | Variable quality, gaming |
| Synthetic (teacher LLM) | High if curated | Low ($0.001–0.01/example) | Unlimited | Style collapse, hallucination |
| Web-scraped | Wildly variable | Near zero | Billions | Noise, toxicity, duplication |
| Existing logs (user data) | Depends on product | Near zero | Large | Privacy, boilerplate, irrelevance |
Human-Written Data
Expert-written data sets the quality ceiling for your dataset. When a domain expert writes an ideal response to a well-formed question, that example teaches the model exactly what good looks like. This is why LIMA, Alpaca, and OpenAssistant all invested heavily in expert curation even at small scale.
The failure modes are human: annotators disagree on what "good" means, they get tired and write shorter answers over time, they interpret instructions differently. Mitigation requires detailed annotation guidelines, inter-annotator agreement checks, and calibration sessions.
When to use: always include some expert examples as anchors. They set the quality bar that your filtering pipeline should aspire to match.
Synthetic Data
Synthetic data from a teacher LLM (GPT-4, Claude, Gemini) is the most practical path to scale. You design prompts that specify exactly what you want, and the teacher model generates examples. The quality is bounded by the teacher model's capability and your prompt design.
The failure modes are subtle. Teacher models have their own stylistic biases - Claude tends toward structured responses with headers, GPT-4 tends toward numbered lists. If you train exclusively on synthetic data, your student model inherits those biases whether you want them or not. More dangerously, synthetic data can hallucinate plausible-sounding but incorrect facts, and those facts get baked into your fine-tuned model with high confidence.
Never use synthetic data without factual verification for knowledge-intensive domains (medicine, law, finance, security). LLM-generated content sounds authoritative even when wrong. Build in a factual spot-check layer before including synthetic examples in your training set.
When to use: synthetic data works well for format compliance (output structure, length), style transfer, and augmenting rare categories. Use a strong teacher model and always filter the output.
Web-Scraped Data
Web scraping can produce enormous datasets cheaply, but the quality distribution is extremely wide. Most text on the internet is not instruction-following data. You need both domain filtering (only include content relevant to your task) and quality filtering (remove low-quality text within that domain).
Common Quality Signals for Web Data:
- Perplexity score: high perplexity on a reference LM = unusual/noisy text
- Length distribution: too short = fragment; too long = padding
- Repetition ratio: repeated n-grams = boilerplate or spam
- Language detection: wrong language for your target domain
- URL source quality: known low-quality domains
When to use: only with aggressive filtering. Web data is a starting point, not a destination.
Deduplication: The Most Underrated Step
Deduplication is routinely underestimated. Teams remove exact duplicates and consider the job done. But near-duplicates - examples that differ by a few words, by paraphrase, by minor formatting - are just as harmful. They bias the model toward overlearning the duplicated patterns while underlearning everything else.
Consider a customer support dataset where 40,000 of 100,000 examples are variations of "How do I reset my password?" with slightly different phrasing. Even after removing exact duplicates, you have 35,000 near-duplicate password reset examples. Your fine-tuned model will be excellent at password resets and mediocre at everything else - because that is what you trained it on.
Exact Deduplication
Exact deduplication is straightforward: normalize text (lowercase, strip whitespace, remove punctuation), compute SHA-256, group by hash. Keep one example from each group.
import hashlib
from collections import defaultdict
def compute_hash(text: str) -> str:
normalized = " ".join(text.lower().strip().split())
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
def exact_dedup(examples: list[dict]) -> list[dict]:
seen = set()
unique = []
duplicates = 0
for ex in examples:
# Hash on the full conversation content
content = ex.get("input", "") + ex.get("output", "")
h = compute_hash(content)
if h not in seen:
seen.add(h)
unique.append(ex)
else:
duplicates += 1
print(f"Exact dedup: {len(examples)} → {len(unique)} ({duplicates} removed)")
return unique
Near-Duplicate Detection with MinHash
MinHash (Locality Sensitive Hashing) detects near-duplicates efficiently without comparing every pair of documents. It works by computing a compact signature for each document based on its n-gram shingles, then grouping documents with similar signatures into buckets. Documents in the same bucket are candidate duplicates.
MinHash scales to millions of examples with sub-linear time complexity. It is the right tool when exact deduplication is not enough.
import re
from collections import defaultdict
from dataclasses import dataclass, field
import numpy as np
def get_shingles(text: str, k: int = 5) -> set[str]:
"""Extract character k-grams from text."""
normalized = re.sub(r"\s+", " ", text.lower().strip())
return {normalized[i : i + k] for i in range(len(normalized) - k + 1)}
def minhash_signature(shingles: set[str], num_hashes: int = 128) -> np.ndarray:
"""Compute MinHash signature for a set of shingles."""
# Use random hash functions: h(x) = (a*x + b) % large_prime
rng = np.random.default_rng(seed=42)
large_prime = (1 << 31) - 1
a = rng.integers(1, large_prime, size=num_hashes)
b = rng.integers(0, large_prime, size=num_hashes)
sig = np.full(num_hashes, np.inf)
for shingle in shingles:
h = hash(shingle) % large_prime
values = (a * h + b) % large_prime
sig = np.minimum(sig, values)
return sig.astype(np.int64)
def lsh_buckets(
signatures: list[np.ndarray], num_bands: int = 16
) -> dict[tuple, list[int]]:
"""Group document indices into LSH buckets by band."""
num_hashes = signatures[0].shape[0]
rows_per_band = num_hashes // num_bands
buckets: dict[tuple, list[int]] = defaultdict(list)
for doc_idx, sig in enumerate(signatures):
for band in range(num_bands):
start = band * rows_per_band
end = start + rows_per_band
band_key = (band, tuple(sig[start:end]))
buckets[band_key].append(doc_idx)
return buckets
def minhash_dedup(
examples: list[dict],
similarity_threshold: float = 0.8,
num_hashes: int = 128,
num_bands: int = 16,
) -> list[dict]:
"""Remove near-duplicates using MinHash LSH."""
print(f"Computing MinHash signatures for {len(examples)} examples...")
texts = [ex.get("input", "") + " " + ex.get("output", "") for ex in examples]
shingle_sets = [get_shingles(t) for t in texts]
signatures = [minhash_signature(s, num_hashes) for s in shingle_sets]
buckets = lsh_buckets(signatures, num_bands)
# Find connected components of near-duplicate groups
parent = list(range(len(examples)))
def find(x: int) -> int:
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(x: int, y: int) -> None:
px, py = find(x), find(y)
if px != py:
parent[px] = py
for bucket_members in buckets.values():
if len(bucket_members) < 2:
continue
# Verify Jaccard similarity for bucket candidates
for i in range(len(bucket_members)):
for j in range(i + 1, len(bucket_members)):
a, b = bucket_members[i], bucket_members[j]
sig_a, sig_b = signatures[a], signatures[b]
jaccard_est = np.mean(sig_a == sig_b)
if jaccard_est >= similarity_threshold:
union(a, b)
# Keep one example per connected component
kept = {}
for idx in range(len(examples)):
root = find(idx)
if root not in kept:
kept[root] = idx
unique_indices = sorted(kept.values())
removed = len(examples) - len(unique_indices)
print(f"MinHash dedup: {len(examples)} → {len(unique_indices)} ({removed} near-duplicates removed)")
return [examples[i] for i in unique_indices]
Embedding-Based Near-Deduplication
For smaller datasets (up to ~100K examples) or when you need higher precision than MinHash, embedding-based similarity detection is more accurate. Compute embeddings for each example, then cluster or threshold by cosine similarity.
from typing import Optional
import numpy as np
def cosine_similarity_matrix(embeddings: np.ndarray) -> np.ndarray:
"""Compute pairwise cosine similarity matrix."""
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
normalized = embeddings / (norms + 1e-8)
return normalized @ normalized.T
def embedding_dedup(
examples: list[dict],
embeddings: np.ndarray,
similarity_threshold: float = 0.92,
batch_size: int = 1000,
) -> list[dict]:
"""
Remove near-duplicates based on embedding cosine similarity.
For large datasets, processes in batches to avoid OOM.
Keeps the first example when a duplicate pair is found.
"""
n = len(examples)
removed = set()
# Process in batches to manage memory
for start in range(0, n, batch_size):
end = min(start + batch_size, n)
batch_embeddings = embeddings[start:end]
# Compare batch against all subsequent examples
for i in range(len(batch_embeddings)):
global_i = start + i
if global_i in removed:
continue
# Compare with rest of batch
for j in range(i + 1, len(batch_embeddings)):
global_j = start + j
if global_j in removed:
continue
sim = float(
np.dot(batch_embeddings[i], batch_embeddings[j])
/ (
np.linalg.norm(batch_embeddings[i])
* np.linalg.norm(batch_embeddings[j])
+ 1e-8
)
)
if sim >= similarity_threshold:
removed.add(global_j)
unique = [ex for idx, ex in enumerate(examples) if idx not in removed]
print(
f"Embedding dedup: {n} → {len(unique)} "
f"({len(removed)} near-duplicates removed, threshold={similarity_threshold})"
)
return unique
For production pipelines on millions of examples, use FAISS approximate nearest neighbor search instead of brute-force cosine similarity. FAISS can process 10M+ embeddings in minutes on a single GPU.
Quality Filtering Pipeline
After deduplication, you filter for quality. Quality filtering happens in two layers: fast heuristic filters that remove obvious problems in milliseconds, and slow LLM-as-judge scoring that evaluates subtle quality dimensions.
Layer 1: Heuristic Filters
Heuristic filters are rules derived from domain knowledge about what bad examples look like. They are fast (microseconds per example), cheap, and should run first.
| Filter | What It Catches | Threshold (typical) |
|---|---|---|
| Minimum length | Fragments, one-word answers | Input: 20 tokens, Output: 10 tokens |
| Maximum length | Padded/bloated responses | Input: 2048 tokens, Output: 1024 tokens |
| Repetition ratio | Copy-paste boilerplate, stuck loops | Less than 15% repeated bigrams |
| Language ID | Wrong-language examples | Confidence > 0.95 for target language |
| PII patterns | SSNs, credit cards, emails, phone numbers | Zero tolerance |
| Toxic content | Hate speech, explicit content | Score < 0.1 |
| Completion ratio | Truncated responses | Output length / expected length > 0.8 |
| Format compliance | Missing required fields, malformed JSON | Schema validation pass |
import re
from typing import Optional
def check_length(
example: dict,
min_input_tokens: int = 20,
max_input_tokens: int = 2048,
min_output_tokens: int = 10,
max_output_tokens: int = 1024,
) -> tuple[bool, Optional[str]]:
"""Rough token length check (whitespace split as proxy)."""
input_len = len(example.get("input", "").split())
output_len = len(example.get("output", "").split())
if input_len < min_input_tokens:
return False, f"input too short ({input_len} tokens)"
if input_len > max_input_tokens:
return False, f"input too long ({input_len} tokens)"
if output_len < min_output_tokens:
return False, f"output too short ({output_len} tokens)"
if output_len > max_output_tokens:
return False, f"output too long ({output_len} tokens)"
return True, None
def check_repetition(text: str, threshold: float = 0.15) -> tuple[bool, Optional[str]]:
"""Flag examples with excessive repeated bigrams."""
words = text.lower().split()
if len(words) < 10:
return True, None
bigrams = [f"{words[i]} {words[i+1]}" for i in range(len(words) - 1)]
unique_bigrams = set(bigrams)
repetition_ratio = 1.0 - (len(unique_bigrams) / len(bigrams))
if repetition_ratio > threshold:
return False, f"high repetition ratio ({repetition_ratio:.2f})"
return True, None
def check_pii(text: str) -> tuple[bool, list[str]]:
"""Detect common PII patterns via regex."""
patterns = {
"ssn": r"\b\d{3}-\d{2}-\d{4}\b",
"credit_card": r"\b\d{4}[\s-]\d{4}[\s-]\d{4}[\s-]\d{4}\b",
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"phone": r"\b(\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
"ip_address": r"\b(?:\d{1,3}\.){3}\d{1,3}\b",
}
found = []
for pii_type, pattern in patterns.items():
if re.search(pattern, text):
found.append(pii_type)
return len(found) == 0, found
def heuristic_filter(examples: list[dict]) -> tuple[list[dict], list[dict]]:
"""
Apply all heuristic filters. Returns (passed, rejected) with rejection reasons.
"""
passed = []
rejected = []
for ex in examples:
rejection_reasons = []
input_text = ex.get("input", "")
output_text = ex.get("output", "")
# Length check
ok, reason = check_length(ex)
if not ok:
rejection_reasons.append(reason)
# Repetition check on output
ok, reason = check_repetition(output_text)
if not ok:
rejection_reasons.append(reason)
# PII check on full text
full_text = input_text + " " + output_text
ok, pii_types = check_pii(full_text)
if not ok:
rejection_reasons.append(f"PII detected: {', '.join(pii_types)}")
if rejection_reasons:
ex["_rejection_reasons"] = rejection_reasons
rejected.append(ex)
else:
passed.append(ex)
print(
f"Heuristic filter: {len(examples)} → {len(passed)} passed, "
f"{len(rejected)} rejected"
)
return passed, rejected
Layer 2: LLM-as-Judge Quality Scoring
After heuristic filtering removes obvious problems, LLM-as-judge scoring evaluates subtle quality dimensions: correctness, helpfulness, instruction-following, response quality. This is expensive - use it only on examples that pass heuristics.
The key insight from LLM-as-judge research (Zheng et al., 2023) is that strong LLMs are reliable judges of response quality, especially when you use structured scoring rubrics and force explicit reasoning before the score.
import json
import os
import time
from typing import Optional
import anthropic
QUALITY_JUDGE_PROMPT = """You are a quality evaluator for LLM fine-tuning data.
Evaluate the following example on five dimensions.
<example>
<input>{input}</input>
<output>{output}</output>
</example>
Score each dimension from 1 to 5, where:
1 = Poor (unacceptable, should be excluded)
2 = Below average (significant issues)
3 = Average (acceptable but not great)
4 = Good (clearly useful training example)
5 = Excellent (ideal training example)
Dimensions:
1. CORRECTNESS: Is the output factually accurate and free of errors?
2. HELPFULNESS: Does the output directly address what the input asks for?
3. INSTRUCTION_FOLLOWING: Does the output follow any formatting/style instructions in the input?
4. COMPLETENESS: Is the output complete, or does it truncate / leave things unfinished?
5. QUALITY: Is the output well-written, clear, and appropriate in length?
Respond with ONLY valid JSON in this exact format:
{{
"reasoning": "Brief explanation of your scores (2-3 sentences)",
"correctness": <1-5>,
"helpfulness": <1-5>,
"instruction_following": <1-5>,
"completeness": <1-5>,
"quality": <1-5>,
"overall": <1-5>,
"exclude": <true|false>
}}
Set "exclude" to true if ANY dimension scores 1, or if overall score is below 3."""
def score_example_with_llm(
example: dict,
client: anthropic.Anthropic,
model: str = "claude-haiku-4-5-20251001",
max_retries: int = 3,
) -> Optional[dict]:
"""
Score a single example using Claude as judge.
Uses claude-haiku-4-5-20251001 for cost efficiency at scale.
Returns parsed score dict or None on failure.
"""
prompt = QUALITY_JUDGE_PROMPT.format(
input=example.get("input", "")[:1500], # truncate to stay within context
output=example.get("output", "")[:1500],
)
for attempt in range(max_retries):
try:
response = client.messages.create(
model=model,
max_tokens=512,
messages=[{"role": "user", "content": prompt}],
)
content = response.content[0].text.strip()
# Strip markdown code blocks if present
if content.startswith("```"):
content = content.split("```")[1]
if content.startswith("json"):
content = content[4:]
scores = json.loads(content)
return scores
except json.JSONDecodeError as e:
if attempt < max_retries - 1:
time.sleep(1.0 * (attempt + 1))
continue
print(f"JSON parse error after {max_retries} attempts: {e}")
return None
except anthropic.RateLimitError:
wait = 5.0 * (attempt + 1)
print(f"Rate limited. Waiting {wait}s...")
time.sleep(wait)
except anthropic.APIError as e:
print(f"API error: {e}")
return None
return None
def llm_judge_filter(
examples: list[dict],
min_overall_score: float = 3.5,
max_examples: Optional[int] = None,
batch_delay: float = 0.1,
) -> tuple[list[dict], list[dict]]:
"""
Run LLM-as-judge scoring on examples and filter below threshold.
Args:
examples: Pre-filtered examples to score
min_overall_score: Minimum overall score to keep (1-5 scale)
max_examples: Optional cap (useful for cost control during testing)
batch_delay: Seconds between API calls to avoid rate limits
Returns:
(passed, rejected) lists with scores attached
"""
client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
if max_examples:
examples = examples[:max_examples]
passed = []
rejected = []
failed_scoring = []
for i, ex in enumerate(examples):
if i % 100 == 0:
print(f"Scoring example {i}/{len(examples)}...")
scores = score_example_with_llm(ex, client)
if scores is None:
# Scoring failed - keep example but flag it
ex["_score_failed"] = True
failed_scoring.append(ex)
continue
ex["_scores"] = scores
overall = scores.get("overall", 0)
exclude = scores.get("exclude", False)
if exclude or overall < min_overall_score:
ex["_rejection_reason"] = f"LLM judge score {overall:.1f} < {min_overall_score}"
rejected.append(ex)
else:
passed.append(ex)
if batch_delay > 0:
time.sleep(batch_delay)
print(
f"LLM judge filter: {len(examples)} scored → "
f"{len(passed)} passed, {len(rejected)} rejected, "
f"{len(failed_scoring)} failed to score"
)
return passed, rejected
At Claude Haiku pricing (~25–50 depending on example length. This is cheap insurance against training on low-quality data. Budget this into your pipeline from the start.
The Complete DataCurationPipeline
Now we assemble all the pieces into a single, end-to-end pipeline class with logging, statistics, and lineage tracking built in.
import hashlib
import json
import os
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
import numpy as np
@dataclass
class CurationConfig:
"""Configuration for the data curation pipeline."""
# Deduplication
exact_dedup: bool = True
minhash_dedup: bool = True
minhash_similarity_threshold: float = 0.8
minhash_num_hashes: int = 128
minhash_num_bands: int = 16
embedding_dedup: bool = False
embedding_similarity_threshold: float = 0.92
# Heuristic filters
min_input_tokens: int = 20
max_input_tokens: int = 2048
min_output_tokens: int = 10
max_output_tokens: int = 1024
max_repetition_ratio: float = 0.15
# LLM judge
llm_judge: bool = True
llm_judge_model: str = "claude-haiku-4-5-20251001"
min_quality_score: float = 3.5
llm_judge_max_examples: Optional[int] = None
# Output
output_dir: str = "./curated_dataset"
dataset_name: str = "curated"
random_seed: int = 42
@dataclass
class CurationStats:
"""Tracks statistics through each pipeline stage."""
stage_counts: dict[str, int] = field(default_factory=dict)
rejection_reasons: dict[str, int] = field(default_factory=dict)
score_distribution: list[float] = field(default_factory=list)
start_time: float = field(default_factory=time.time)
def record_stage(self, stage: str, count: int) -> None:
self.stage_counts[stage] = count
def record_rejection(self, reason: str) -> None:
self.rejection_reasons[reason] = self.rejection_reasons.get(reason, 0) + 1
def elapsed(self) -> float:
return time.time() - self.start_time
def summary(self) -> dict:
return {
"stage_counts": self.stage_counts,
"top_rejection_reasons": sorted(
self.rejection_reasons.items(), key=lambda x: -x[1]
)[:10],
"score_distribution": {
"mean": float(np.mean(self.score_distribution)) if self.score_distribution else None,
"median": float(np.median(self.score_distribution)) if self.score_distribution else None,
"p25": float(np.percentile(self.score_distribution, 25)) if self.score_distribution else None,
"p75": float(np.percentile(self.score_distribution, 75)) if self.score_distribution else None,
},
"elapsed_seconds": self.elapsed(),
}
class DataCurationPipeline:
"""
End-to-end dataset curation pipeline for LLM fine-tuning.
Stages:
1. load - Load and validate raw examples
2. dedup - Exact + near-duplicate removal
3. heuristic - Fast rule-based filtering
4. quality - LLM-as-judge quality scoring
5. balance - Topic/length balancing
6. export - Save with lineage metadata
"""
def __init__(self, config: CurationConfig) -> None:
self.config = config
self.stats = CurationStats()
self.rejected: list[dict] = []
self.run_id = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
Path(config.output_dir).mkdir(parents=True, exist_ok=True)
# ------------------------------------------------------------------
# Stage 1: Load
# ------------------------------------------------------------------
def load(self, source: str | list[str]) -> list[dict]:
"""
Load examples from one or more JSONL files.
Validates required fields and logs malformed records.
"""
sources = [source] if isinstance(source, str) else source
examples = []
malformed = 0
for path in sources:
print(f"Loading {path}...")
with open(path, "r", encoding="utf-8") as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
try:
ex = json.loads(line)
if "input" not in ex or "output" not in ex:
malformed += 1
continue
# Attach source metadata
ex["_source_file"] = path
ex["_source_line"] = line_num
examples.append(ex)
except json.JSONDecodeError:
malformed += 1
print(
f"Loaded {len(examples)} examples from {len(sources)} files "
f"({malformed} malformed records skipped)"
)
self.stats.record_stage("load", len(examples))
return examples
# ------------------------------------------------------------------
# Stage 2: Deduplication
# ------------------------------------------------------------------
def dedup(
self, examples: list[dict], embeddings: Optional[np.ndarray] = None
) -> list[dict]:
"""Run exact and near-duplicate detection."""
current = examples
if self.config.exact_dedup:
current = exact_dedup(current)
self.stats.record_stage("exact_dedup", len(current))
if self.config.minhash_dedup:
current = minhash_dedup(
current,
similarity_threshold=self.config.minhash_similarity_threshold,
num_hashes=self.config.minhash_num_hashes,
num_bands=self.config.minhash_num_bands,
)
self.stats.record_stage("minhash_dedup", len(current))
if self.config.embedding_dedup and embeddings is not None:
current = embedding_dedup(
current,
embeddings,
similarity_threshold=self.config.embedding_similarity_threshold,
)
self.stats.record_stage("embedding_dedup", len(current))
return current
# ------------------------------------------------------------------
# Stage 3: Heuristic Filtering
# ------------------------------------------------------------------
def heuristic_filter_stage(self, examples: list[dict]) -> list[dict]:
"""Apply fast rule-based quality filters."""
passed, rejected = heuristic_filter(examples)
for ex in rejected:
for reason in ex.get("_rejection_reasons", ["unknown"]):
self.stats.record_rejection(f"heuristic: {reason}")
self.rejected.extend(rejected)
self.stats.record_stage("heuristic_filter", len(passed))
return passed
# ------------------------------------------------------------------
# Stage 4: LLM Quality Scoring
# ------------------------------------------------------------------
def quality_score(self, examples: list[dict]) -> list[dict]:
"""Score examples with LLM judge and filter below threshold."""
if not self.config.llm_judge:
print("LLM judge disabled - skipping quality scoring")
return examples
passed, rejected = llm_judge_filter(
examples,
min_overall_score=self.config.min_quality_score,
max_examples=self.config.llm_judge_max_examples,
)
# Record score distribution
for ex in passed:
if "_scores" in ex:
self.stats.score_distribution.append(ex["_scores"].get("overall", 0))
for ex in rejected:
self.stats.record_rejection(
ex.get("_rejection_reason", "llm_judge: low score")
)
self.rejected.extend(rejected)
self.stats.record_stage("llm_judge", len(passed))
return passed
# ------------------------------------------------------------------
# Stage 5: Balance
# ------------------------------------------------------------------
def balance(
self,
examples: list[dict],
max_per_topic: Optional[int] = None,
) -> list[dict]:
"""
Optional: cap examples per topic category to prevent over-representation.
Requires examples to have a '_topic' field (add during loading or scoring).
"""
if max_per_topic is None:
return examples
topic_counts: dict[str, int] = {}
balanced = []
rng = np.random.default_rng(seed=self.config.random_seed)
shuffled = list(examples)
rng.shuffle(shuffled)
for ex in shuffled:
topic = ex.get("_topic", "unknown")
count = topic_counts.get(topic, 0)
if count < max_per_topic:
balanced.append(ex)
topic_counts[topic] = count + 1
removed = len(examples) - len(balanced)
print(f"Balance: {len(examples)} → {len(balanced)} ({removed} capped for balance)")
self.stats.record_stage("balance", len(balanced))
return balanced
# ------------------------------------------------------------------
# Stage 6: Export
# ------------------------------------------------------------------
def export(self, examples: list[dict]) -> dict[str, Any]:
"""
Export curated dataset and lineage metadata.
Writes:
- curated_RUNID.jsonl - clean training examples
- rejected_RUNID.jsonl - rejected examples with reasons
- lineage_RUNID.json - full provenance and stats
"""
output_dir = Path(self.config.output_dir)
# Write curated examples (strip internal metadata fields)
curated_path = output_dir / f"{self.config.dataset_name}_{self.run_id}.jsonl"
clean_keys = {"input", "output", "system", "metadata"}
written = 0
with open(curated_path, "w", encoding="utf-8") as f:
for ex in examples:
clean_ex = {k: v for k, v in ex.items() if k in clean_keys or not k.startswith("_")}
f.write(json.dumps(clean_ex, ensure_ascii=False) + "\n")
written += 1
# Write rejected examples for analysis
rejected_path = output_dir / f"rejected_{self.run_id}.jsonl"
with open(rejected_path, "w", encoding="utf-8") as f:
for ex in self.rejected:
f.write(json.dumps(ex, ensure_ascii=False, default=str) + "\n")
# Compute dataset fingerprint
fingerprint = hashlib.sha256(
"".join(ex.get("input", "") for ex in examples[:100]).encode()
).hexdigest()[:16]
# Write lineage metadata
lineage = {
"run_id": self.run_id,
"created_at": datetime.now(timezone.utc).isoformat(),
"dataset_name": self.config.dataset_name,
"fingerprint": fingerprint,
"config": self.config.__dict__,
"stats": self.stats.summary(),
"final_count": written,
"rejected_count": len(self.rejected),
"output_files": {
"curated": str(curated_path),
"rejected": str(rejected_path),
},
}
lineage_path = output_dir / f"lineage_{self.run_id}.json"
with open(lineage_path, "w", encoding="utf-8") as f:
json.dump(lineage, f, indent=2, ensure_ascii=False, default=str)
print(f"\nExport complete:")
print(f" Curated: {curated_path} ({written} examples)")
print(f" Rejected: {rejected_path} ({len(self.rejected)} examples)")
print(f" Lineage: {lineage_path}")
print(f" Elapsed: {self.stats.elapsed():.1f}s")
return lineage
# ------------------------------------------------------------------
# Full Pipeline
# ------------------------------------------------------------------
def run(
self,
source: str | list[str],
embeddings: Optional[np.ndarray] = None,
max_per_topic: Optional[int] = None,
) -> dict[str, Any]:
"""Execute the full curation pipeline end to end."""
print(f"\n{'='*60}")
print(f"DataCurationPipeline - Run {self.run_id}")
print(f"{'='*60}\n")
examples = self.load(source)
examples = self.dedup(examples, embeddings)
examples = self.heuristic_filter_stage(examples)
examples = self.quality_score(examples)
examples = self.balance(examples, max_per_topic)
lineage = self.export(examples)
# Print summary
print(f"\nPipeline Summary:")
for stage, count in self.stats.stage_counts.items():
print(f" {stage:25s}: {count:,}")
return lineage
# Example usage
if __name__ == "__main__":
config = CurationConfig(
exact_dedup=True,
minhash_dedup=True,
minhash_similarity_threshold=0.8,
llm_judge=True,
llm_judge_model="claude-haiku-4-5-20251001",
min_quality_score=3.5,
output_dir="./curated",
dataset_name="helpdesk_v1",
)
pipeline = DataCurationPipeline(config)
lineage = pipeline.run(
source=["data/raw_tickets_2024.jsonl", "data/raw_tickets_2023.jsonl"],
max_per_topic=500,
)
Dataset Statistics and Analysis
Understanding your dataset's statistical properties before fine-tuning is not optional - it is how you catch distribution problems before they become model problems.
Length Distribution Analysis
import json
from collections import Counter
from pathlib import Path
import numpy as np
def analyze_length_distribution(dataset_path: str) -> dict:
"""Compute input/output length statistics."""
input_lengths = []
output_lengths = []
with open(dataset_path) as f:
for line in f:
ex = json.loads(line)
input_lengths.append(len(ex.get("input", "").split()))
output_lengths.append(len(ex.get("output", "").split()))
def stats(values: list[int]) -> dict:
arr = np.array(values)
return {
"count": len(arr),
"mean": float(np.mean(arr)),
"median": float(np.median(arr)),
"p10": float(np.percentile(arr, 10)),
"p90": float(np.percentile(arr, 90)),
"p99": float(np.percentile(arr, 99)),
"min": int(np.min(arr)),
"max": int(np.max(arr)),
}
return {
"input_length": stats(input_lengths),
"output_length": stats(output_lengths),
"total_tokens_estimate": sum(input_lengths) + sum(output_lengths),
}
def analyze_topic_balance(dataset_path: str, topic_field: str = "topic") -> dict:
"""Analyze topic/category distribution."""
topic_counts: Counter = Counter()
with open(dataset_path) as f:
for line in f:
ex = json.loads(line)
topic = ex.get(topic_field, "unknown")
topic_counts[topic] += 1
total = sum(topic_counts.values())
distribution = {
topic: {"count": count, "fraction": count / total}
for topic, count in topic_counts.most_common()
}
# Compute imbalance ratio
counts = list(topic_counts.values())
imbalance_ratio = max(counts) / min(counts) if min(counts) > 0 else float("inf")
return {
"topic_distribution": distribution,
"num_topics": len(topic_counts),
"imbalance_ratio": imbalance_ratio,
"is_balanced": imbalance_ratio < 10,
}
def print_dataset_report(dataset_path: str) -> None:
"""Print a human-readable dataset quality report."""
lengths = analyze_length_distribution(dataset_path)
topics = analyze_topic_balance(dataset_path)
print(f"\nDataset Report: {dataset_path}")
print("=" * 50)
il = lengths["input_length"]
print(f"\nInput Length (tokens, whitespace-split):")
print(f" Count: {il['count']:,}")
print(f" Mean: {il['mean']:.1f}")
print(f" Median: {il['median']:.1f}")
print(f" P10: {il['p10']:.0f} | P90: {il['p90']:.0f} | P99: {il['p99']:.0f}")
ol = lengths["output_length"]
print(f"\nOutput Length (tokens, whitespace-split):")
print(f" Mean: {ol['mean']:.1f}")
print(f" Median: {ol['median']:.1f}")
print(f" P10: {ol['p10']:.0f} | P90: {ol['p90']:.0f} | P99: {ol['p99']:.0f}")
print(f"\nTotal Estimated Tokens: {lengths['total_tokens_estimate']:,}")
print(f"\nTopic Distribution ({topics['num_topics']} topics):")
print(f" Imbalance Ratio: {topics['imbalance_ratio']:.1f}x")
print(f" Balanced: {'Yes' if topics['is_balanced'] else 'No - consider capping over-represented topics'}")
print(f"\n Top Topics:")
for topic, info in list(topics["topic_distribution"].items())[:10]:
bar = "█" * int(info["fraction"] * 40)
print(f" {topic:25s} {info['count']:5,} {bar}")
What the Numbers Should Tell You
Before you launch a fine-tuning job, your dataset statistics should pass these checks:
| Check | Healthy Signal | Warning Signal |
|---|---|---|
| Input length P90/P10 ratio | Less than 20x | Greater than 50x |
| Output length median | 50–300 tokens | Less than 20 or greater than 800 tokens |
| Topic imbalance ratio | Less than 10x | Greater than 50x |
| Examples with score less than 3.0 | Less than 5% | Greater than 20% |
| Dedup reduction rate | 5–30% | Greater than 60% (dataset was mostly duplicates) |
| Final dataset size | Proportional to task complexity | Less than 1K (too small) or greater than 500K without curation |
A deduplication removal rate above 60% is a red flag. It means your raw dataset was essentially the same data repeated many times. Before fine-tuning on what remains, investigate whether you actually have enough topical diversity to generalize.
Data Versioning and Lineage Tracking
Dataset versions must be tracked with the same rigor as model versions. You will need to reproduce fine-tuning runs, debug behavior regressions, and audit what data a model was trained on. Without lineage, none of this is possible.
What Lineage Must Capture
import hashlib
import json
import os
from datetime import datetime, timezone
from pathlib import Path
class DatasetRegistry:
"""
Simple file-based dataset version registry.
Tracks versions, lineage, and links datasets to fine-tuning runs.
In production, replace with MLflow, W&B Artifacts, or DVC.
"""
def __init__(self, registry_path: str = "./dataset_registry.json") -> None:
self.registry_path = Path(registry_path)
self.registry = self._load()
def _load(self) -> dict:
if self.registry_path.exists():
with open(self.registry_path) as f:
return json.load(f)
return {"datasets": {}, "runs": []}
def _save(self) -> None:
with open(self.registry_path, "w") as f:
json.dump(self.registry, f, indent=2, ensure_ascii=False)
def compute_fingerprint(self, dataset_path: str) -> str:
"""SHA-256 of the first 10K lines for fast fingerprinting."""
sha = hashlib.sha256()
with open(dataset_path, "rb") as f:
for i, line in enumerate(f):
if i >= 10000:
break
sha.update(line)
return sha.hexdigest()
def register_dataset(
self,
name: str,
version: str,
dataset_path: str,
lineage: dict,
notes: str = "",
) -> str:
"""Register a curated dataset version."""
fingerprint = self.compute_fingerprint(dataset_path)
file_size = os.path.getsize(dataset_path)
entry = {
"name": name,
"version": version,
"fingerprint": fingerprint,
"file_size_bytes": file_size,
"dataset_path": str(dataset_path),
"lineage": lineage,
"notes": notes,
"registered_at": datetime.now(timezone.utc).isoformat(),
}
key = f"{name}@{version}"
self.registry["datasets"][key] = entry
self._save()
print(f"Registered dataset: {key} (fingerprint: {fingerprint[:12]}...)")
return fingerprint
def link_to_training_run(
self,
dataset_key: str,
run_id: str,
model_name: str,
hyperparams: dict,
) -> None:
"""Record that a training run used a specific dataset version."""
run_record = {
"run_id": run_id,
"dataset_key": dataset_key,
"model_name": model_name,
"hyperparams": hyperparams,
"started_at": datetime.now(timezone.utc).isoformat(),
}
self.registry["runs"].append(run_record)
self._save()
print(f"Linked run {run_id} to dataset {dataset_key}")
def get_dataset_history(self, name: str) -> list[dict]:
"""Return all versions of a named dataset."""
return [
v for k, v in self.registry["datasets"].items()
if k.startswith(f"{name}@")
]
For production use, integrate with DVC (data version control) or Weights & Biases Artifacts. These tools give you dataset diffing, lineage visualization, and storage-efficient versioning with minimal setup.
Common Dataset Pitfalls
Every team building fine-tuning datasets makes some version of the same mistakes. Here they are, with the symptoms that surface when you hit each one.
| Pitfall | Root Cause | Symptom | Fix |
|---|---|---|---|
| Label leakage | Answer appears in the input (e.g., "summarize this: ... SUMMARY: ...") | Model copies input rather than generating | Strip answer portions from inputs during preprocessing |
| Distribution mismatch | Training distribution differs from production distribution | High eval score, poor production performance | Sample training data to match production query distribution |
| Recency bias | Training data skewed toward recent examples | Model fails on older or seasonal queries | Balance by time period; include historical examples |
| Format overfitting | All training examples have identical format | Model refuses inputs in different formats | Vary formatting deliberately; include format-diverse examples |
| Near-duplicate contamination | Paraphrased versions of eval set in training | Inflated eval scores | Always dedup across train/eval splits, not just within train |
| Length bias | Training outputs consistently one length | Model truncates or pads to match training length distribution | Analyze length distribution; ensure it matches expected production output |
| Topic concentration | 80% of examples cover 20% of topics | Model excellent on common topics, fails on rare ones | Cap per-topic examples; oversample rare topics |
| Annotation inconsistency | Different annotators applying different quality standards | Noisy training signal; unstable fine-tuning loss | Use annotation rubrics; measure inter-annotator agreement |
| Eval contamination | Eval examples processed by same pipeline as train | Dedup removes eval examples instead of train duplicates | Run dedup on train-only; eval set is frozen before any processing |
| Temporal data leakage | Future information in features or context | Model learns patterns that won't generalize | Sort by timestamp; train on past, eval on future |
Never run deduplication across your combined train and eval sets. Always freeze your eval set first, then run deduplication on the training set only. If a training example is a near-duplicate of an eval example, remove the training example - not the eval example. Running dedup on the combined pool will silently inflate your evaluation scores.
Interview Questions and Answers
Q1: You have a dataset of 200K examples but your fine-tuned model underperforms a baseline trained on 20K carefully curated examples. What happened and how do you diagnose it?
Answer:
The most likely explanation is that the 200K dataset has a lower signal-to-noise ratio than the 20K curated set. Gradient descent optimizes over all examples equally (unless you use weighted loss), so low-quality examples actively degrade learning.
Diagnosis steps:
First, compute the deduplication rate on the 200K dataset. If you remove 30–50% as near-duplicates, the "200K" dataset was really 100–140K unique examples - and many of those may still be on the same narrow set of topics.
Second, sample 200 random examples from the 200K dataset and manually score them using your quality rubric. This gives you an empirical quality distribution. If the average is below 3.5/5, the bulk of examples are actively harmful.
Third, compute the topic distribution. If 60% of examples cluster around 5 common topics and only 10% are spread across all other topics, you have severe topic concentration - the model learns the top-5 topics well and everything else badly.
Fourth, compare the length distributions. Bimodal distributions (many very short + many very long) often indicate mixing incompatible data sources, which confuses the model about expected response format.
The fix is curation: deduplicate, apply quality filtering with LLM-as-judge, cap per-topic examples, and verify the resulting dataset's statistics match your target distribution. Starting with 200K and ending at 30K after curation is a good outcome, not a failure.
Q2: What is MinHash LSH and why is it preferred over embedding similarity for large-scale deduplication?
Answer:
MinHash is a probabilistic algorithm that estimates Jaccard similarity between sets using compact signatures called MinHash signatures. For document deduplication, you represent each document as a set of n-gram shingles, then compute a fixed-size signature by applying multiple hash functions and taking the minimum value for each.
The key property: if two documents have Jaccard similarity J, the probability that their MinHash signatures agree on any given hash function is also J. This allows you to estimate similarity from the signatures without comparing full documents.
Locality Sensitive Hashing (LSH) then groups documents into buckets using bands of the signature. Documents that share any bucket are candidate duplicates. You only do pairwise comparison within buckets, making the algorithm sub-quadratic in the number of documents.
Why it beats embedding similarity at scale:
Embedding similarity requires computing a dense vector for every document (expensive) and then computing pairwise cosine similarity (O(n^2) naive, O(n log n) with FAISS). For 10M documents, even FAISS requires significant GPU resources and careful index tuning.
MinHash scales to billions of documents with commodity hardware. It requires no model inference - only n-gram extraction and hashing. The tradeoff is accuracy: MinHash is approximate and may miss near-duplicates that differ in character-level features but are semantically identical. For most fine-tuning datasets (up to 10M examples), MinHash is the right tool. For very high-precision deduplication (medical, legal, safety-critical), you add an embedding verification step on MinHash candidates.
Q3: How would you use an LLM-as-judge to score dataset quality, and what are the failure modes you must guard against?
Answer:
LLM-as-judge uses a strong frontier model (Claude, GPT-4) to score training examples on quality dimensions like correctness, helpfulness, instruction-following, and completeness. You design a structured prompt with a scoring rubric, force the model to reason before scoring, and parse the numeric scores from the output.
Implementation considerations:
Reasoning before score: asking the model to explain its scoring before giving the number significantly improves accuracy. The model "commits" to a position through reasoning, which reduces anchoring to superficial signals like response length.
Structured output: use JSON with explicit field names for each dimension. Parse the JSON and validate the schema - malformed outputs need retry logic.
Dimension weighting: not all dimensions are equally important for your task. A customer support model cares more about correctness and helpfulness than stylistic quality. Weight dimensions accordingly.
Failure modes:
Verbosity bias: LLMs tend to rate longer responses higher, even when shorter responses are more appropriate. Counter this by including an example of an excellent short response in your rubric.
Self-consistency bias: if you use the same model family to generate synthetic data and judge its quality, the judge will rate its own outputs higher than human-written alternatives. Use a different model family as judge when possible.
Calibration drift: scores from a judge LLM are not absolute - they depend on the prompt and the model. If you update either, recalibrate by having humans score a sample and aligning the LLM threshold to match human judgment.
Cost: at scale, even cheap models add up. Score only examples that pass heuristic filtering. For a pipeline that reduces 500K to 100K through heuristics, you only pay to LLM-score 100K examples.
Prompt injection: adversarial content in training examples can manipulate the judge prompt. Sanitize example content and use strict delimiters (XML tags or triple-quotes) to separate the rubric from the example being scored.
Q4: What is the difference between train/eval split contamination and label leakage, and how do you prevent each?
Answer:
These are distinct problems that are often conflated.
Train/eval contamination happens when examples in your training set are semantically similar to (or exact copies of) examples in your evaluation set. The model effectively "memorizes" the eval set during training, inflating your evaluation metrics. The result is that your eval scores look great but the model generalizes poorly to production queries.
Prevention: freeze your eval set before any preprocessing. Run deduplication only on the training pool. If a training example is a near-duplicate of an eval example, remove the training example. Never modify the eval set.
Label leakage happens when the input to a training example contains the answer you are trying to predict. For example, if you are training a summarization model and your input is "Summarize the following: [article text] SUMMARY: [the actual summary]", the model learns to copy the portion after "SUMMARY:" rather than learning to summarize.
Label leakage is subtle because it manifests as excellent training loss but complete failure on real inputs (where the answer is not in the input). The model has learned a shortcut that does not exist in production.
Prevention: inspect input-output pairs for answer contamination. Use schema validation to ensure inputs and outputs are structured correctly. Run a heuristic check: compute the longest common subsequence between input and output - if it exceeds 40% of output length, flag for review.
A third related problem is temporal leakage: training data that contains information from the future relative to the query timestamp. This is common in financial and time-series domains. Prevention requires sorting data by timestamp and ensuring training uses only past information.
Q5: How do you handle imbalanced datasets in fine-tuning, and when should you balance versus when should you let the imbalance stand?
Answer:
Dataset imbalance in fine-tuning means some topics, task types, or output patterns appear far more often than others. Whether to correct this depends on what you want the model to do.
When to balance:
If your production traffic is roughly uniform across topics but your training data is 80% one topic, the model will over-optimize for that topic and underperform on others. Balance by capping per-topic examples (say, 500 maximum per category) and oversampling rare categories using synthetic augmentation.
If you are training a classifier head and classes are severely imbalanced, use weighted loss (weight each example inversely proportional to its class frequency) rather than resampling. Resampling duplicates examples, which can cause overfitting on small classes.
When to let imbalance stand:
If your production traffic is itself imbalanced in the same way as your training data, then preserving the imbalance is correct. A customer support model where 60% of real queries are password resets should train on data where 60% of examples are password resets - that reflects real-world usage.
If you are doing instruction fine-tuning (RLHF / SFT) and you have naturally diverse data that happens to be unequal in topic counts, moderate imbalance (less than 10x) is usually fine. Modern base models are robust enough to learn minority patterns from a handful of examples.
Practical rule: measure your production query distribution, compare it to your training distribution, and correct only where there is a significant mismatch. A 10x imbalance in training on a topic that appears 0.1% in production is not worth correcting - you have five examples of that topic regardless of whether you balance. Synthetic augmentation is the right fix when a real topic has insufficient training coverage.
Summary
Dataset curation is the highest-leverage work in fine-tuning. The team with 5K curated examples beat the team with 500K raw ones because quality compounds: every good example teaches the model correctly, and every bad example adds noise that must be overcome by the good ones.
The complete curation pipeline runs in six stages: load and validate raw sources, deduplicate with exact matching and MinHash LSH, apply heuristic filters for obvious problems, score quality with LLM-as-judge, balance across topics to prevent over-representation, and export with full lineage metadata.
The most dangerous mistakes are not the obvious ones. Exact deduplication is easy - near-duplicate contamination is what silently ruins datasets. Label leakage produces perfect training loss and zero production value. Eval contamination inflates your confidence in a model that will fail immediately in production. Dataset lineage is not bureaucracy - it is how you debug model behavior six months after training.
The correct mental model is this: your training dataset is a curriculum. Every example is a lesson. You would not design a curriculum by randomly selecting 500K pages from the internet. Design it the same way a great teacher designs a course - deliberately, with attention to what each example teaches, and ruthless about removing anything that teaches the wrong lesson.
Start every fine-tuning project by building a gold evaluation set of 200–500 hand-curated examples before you touch training data. This gold set is your compass: it tells you whether a dataset change is helping or hurting. Without it, you are flying blind.
