Data Quality and Filtering: The Hidden Lever in Synthetic Data

The Pipeline That Made Things Worse

The team had done everything right - or so they thought. Two weeks of generation using a well-designed Self-Instruct pipeline. Half a million instruction-response pairs. Topic coverage across 40 domains. Diversity guaranteed by embedding-based deduplication. The dataset looked comprehensive. Impressive even.

They fine-tuned LLaMA-7B on the full dataset. Training ran for three days. Evaluation started and the numbers came in: the fine-tuned model was measurably worse than the base model on every benchmark they cared about. Worse, not just unchanged. It hallucinated more confidently. It gave verbose non-answers to simple questions. It occasionally produced content that violated their safety policies - behavior the base model had never exhibited. It had, by every metric they tracked, regressed.

The post-mortem was humbling. On manual review of 500 randomly sampled examples: 28% contained factual errors. 19% had format problems where the "response" field was actually another instruction continuation. 12% were near-duplicates that had slipped past embedding deduplication. 5% contained either inappropriate content or self-referential AI chatbot phrases ("As an AI language model, I cannot...") that taught the model to refuse legitimate questions. Fine-tuning amplifies signal and noise equally. Train on garbage at scale and you get a confidently garbage model.

The lesson the Phi team had articulated and this team rediscovered: data quality is the dominant factor in small model fine-tuning performance. The difference between 7 billion tokens of random internet data and 7 billion tokens of textbook-quality, filtered data is 10-30 percentage points on reasoning benchmarks. This lesson is about the systematic pipeline for turning raw synthetic generation into high-quality training data.

The Layered Filtering Architecture

Quality filtering works best as a cascade: fast, cheap layers first to eliminate obvious garbage, expensive layers last to make fine-grained quality judgments on the survivors. As a rule of thumb, each layer should reject 5-25% of the examples it sees. If a layer rejects most of its input, the upstream generation step is broken; if it rejects almost nothing, the layer is too lenient or redundant.
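
The cascade idea can be sketched in a few lines. This is a minimal illustration, not the pipeline built later in this lesson: `run_cascade` and the two toy layers are hypothetical names, and the 5-25% band is simply the rule of thumb from the paragraph above.

```python
from typing import Callable

# Each filter takes an example dict and returns True to keep it.
Filter = Callable[[dict], bool]


def run_cascade(
    examples: list[dict],
    layers: list[tuple[str, Filter]],
    low: float = 0.05,
    high: float = 0.25,
) -> tuple[list[dict], dict[str, float]]:
    """Apply filters in order (cheapest first) and record each layer's rejection rate."""
    rates: dict[str, float] = {}
    current = examples
    for name, keep in layers:
        if not current:
            break
        survivors = [ex for ex in current if keep(ex)]
        rate = 1 - len(survivors) / len(current)
        rates[name] = rate
        # A rate outside the band is a prompt to inspect the layer, not a hard error.
        if not (low <= rate <= high):
            print(f"[warn] {name}: rejection rate {rate:.0%} outside {low:.0%}-{high:.0%}")
        current = survivors
    return current, rates


# Toy layers (hypothetical): a non-empty check, then a minimum-length check.
layers = [
    ("non_empty", lambda ex: bool(ex["output"].strip())),
    ("min_words", lambda ex: len(ex["output"].split()) >= 3),
]
data = [{"output": "Paris is the capital of France."}] * 8 + [{"output": ""}, {"output": "Yes."}]
kept, rates = run_cascade(data, layers)
```

Note that each rate is measured against the examples that reached that layer, not against the raw input, so later layers are judged on what they actually saw.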

Layer 1: Structural Filters

Fast, rule-based checks. Zero API calls. Run these first - they eliminate obvious problems for free.

import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class FilterResult:
    passed: bool
    reason: Optional[str] = None
    score: float = 1.0


def structural_filter(instruction: str, response: str) -> FilterResult:
    """
    Fast structural validation - no API calls needed.

    Catches: empty fields, extreme lengths, self-referential responses,
    responses that are just the instruction repeated.

    Target rejection rate: 10-15% of raw data.
    """
    # None/empty checks
    if not instruction or not instruction.strip():
        return FilterResult(False, "empty_instruction")
    if not response or not response.strip():
        return FilterResult(False, "empty_response")

    instruction = instruction.strip()
    response = response.strip()

    # Minimum word counts
    instr_words = len(instruction.split())
    resp_words = len(response.split())

    if instr_words < 3:
        return FilterResult(False, f"instruction_too_short: {instr_words} words")
    if resp_words < 5:
        return FilterResult(False, f"response_too_short: {resp_words} words")

    # Maximum word counts (catch pathological outliers)
    if instr_words > 800:
        return FilterResult(False, f"instruction_too_long: {instr_words} words")
    if resp_words > 8000:
        return FilterResult(False, f"response_too_long: {resp_words} words")

    # Response starts with instruction metadata markers
    instruction_lower = instruction.lower()
    response_lower = response.lower()
    bad_starts = [
        "instruction:", "task:", "question:", "prompt:", "input:",
        "task 1:", "task 2:", "here's a task:", "here is a task:",
    ]
    if any(response_lower.startswith(s) for s in bad_starts):
        return FilterResult(False, "response_is_instruction")

    # Response is just the instruction repeated
    if instruction_lower == response_lower:
        return FilterResult(False, "response_equals_instruction")

    # Response is a substring of the instruction (generation cut off)
    if response_lower in instruction_lower:
        return FilterResult(False, "response_is_instruction_substring")

    # Character composition sanity check:
    # too many special chars suggests an encoding issue
    allowed_punctuation = set(" \n\t.,!?;:()-_'\"[]{}")
    special_chars = sum(
        1 for c in response
        if not c.isalnum() and c not in allowed_punctuation
    )
    special_char_ratio = special_chars / len(response)  # response is non-empty here
    if special_char_ratio > 0.4:
        return FilterResult(False, f"high_special_char_ratio: {special_char_ratio:.2f}")

    return FilterResult(True)

Layer 2: Heuristic Filters

Pattern-based filters for common quality problems specific to LLM-generated data:

import re
from typing import Optional


# Phrases indicating the model refused or couldn't answer
REFUSAL_PATTERNS = [
    r"i cannot (help|assist|provide|generate|create|write|complete)",
    r"i (am|'m) (not able|unable) to",
    r"i don't (have|possess) (the ability|access|information)",
    r"as an ai (language model|assistant|system)",
    r"i must (decline|refuse|respectfully decline)",
    r"this (request|question|task) (is|seems) (inappropriate|harmful|unethical)",
    r"i apologize,? but i (cannot|can't|won't|am not able)",
    r"i'm sorry,? but i (cannot|can't|won't)",
    r"i don't feel comfortable",
]

# Low-effort opener phrases that indicate generic responses
GENERIC_OPENER_PATTERNS = [
    r"^(sure|certainly|of course|absolutely|definitely)[,!.]?\s+(here|i)",
    r"^great (question|choice|point)[!.]",
    r"^(excellent|wonderful|fantastic) (question|point)[!.]",
    r"^thank(s| you) for (asking|your question)",
]

# Closer phrases that suggest filler content
FILLER_CLOSER_PATTERNS = [
    r"(feel free to|don't hesitate to) (ask|reach out)",
    r"i hope this (helps|answers|clarifies|is helpful)",
    r"please (let me know|don't hesitate) if you (have|need|want)",
    r"is there anything else (i can|you need)",
]

# Self-referential patterns (model talking about itself instead of answering)
SELF_REFERENTIAL_PATTERNS = [
    r"as an ai,? i",
    r"my training (data|cutoff|information)",
    r"i was trained (by|on|to|with)",
    r"my knowledge (cutoff|is limited|ends)",
    r"i don't have (real-time|live|current|up-to-date)",
    r"my (capabilities|limitations) (include|are)",
]


def heuristic_filter(instruction: str, response: str) -> FilterResult:
    """
    Pattern-based quality checks for LLM-specific failure modes.

    These patterns catch problems that are essentially impossible
    to catch without knowing what LLM-generated data looks like:
    refusals, generic openers, excessive self-reference, etc.

    Target rejection rate: 15-20% of structurally-valid data.
    """
    response_lower = response.lower()

    # Check refusal patterns (hard reject)
    for pattern in REFUSAL_PATTERNS:
        if re.search(pattern, response_lower):
            return FilterResult(False, "refusal_detected")

    # Check self-referential patterns (reject if multiple present)
    self_ref_count = sum(
        1 for p in SELF_REFERENTIAL_PATTERNS
        if re.search(p, response_lower)
    )
    if self_ref_count >= 2:
        return FilterResult(False, f"excessive_self_reference: {self_ref_count} indicators")

    # Generic opener check (patterns are anchored to the start of the response)
    for pattern in GENERIC_OPENER_PATTERNS:
        if re.search(pattern, response_lower[:100]):
            return FilterResult(False, "generic_opener_pattern")

    # Response should have substance relative to instruction complexity
    instruction_words = len(instruction.split())
    response_words = len(response.split())

    # Complex question with very short response
    if instruction_words > 30 and response_words < 20:
        return FilterResult(False, "response_too_brief_for_complex_question")

    # Very simple question with extremely long response (verbosity)
    if instruction_words < 10 and response_words > 1000:
        return FilterResult(False, "excessive_verbosity_for_simple_question")

    # Filler closer count
    filler_count = sum(
        1 for p in FILLER_CLOSER_PATTERNS
        if re.search(p, response_lower[-300:])  # Check last 300 chars
    )
    if filler_count >= 2:
        return FilterResult(False, "excessive_filler_closers")

    return FilterResult(True)


def modality_filter(instruction: str, response: str) -> FilterResult:
    """
    Filter tasks requiring unavailable modalities (images, audio, video).

    The key distinction: "image processing algorithm" is fine (conceptual reference),
    "analyze this image" is not (requires actual image input we don't have).
    """
    MODALITY_REFERENTIAL_PHRASES = {
        "image": ["this image", "the image", "given image", "following image", "attached image", "uploaded image", "show in the image"],
        "audio": ["this audio", "the audio", "listen to", "the sound file", "attached audio"],
        "video": ["this video", "the video", "watch the", "in the video"],
        "file": ["this file", "attached file", "uploaded file", "the spreadsheet", "the excel file"],
    }

    instruction_lower = instruction.lower()
    for modality, phrases in MODALITY_REFERENTIAL_PHRASES.items():
        for phrase in phrases:
            if phrase in instruction_lower:
                return FilterResult(False, f"requires_modality:{modality}")

    return FilterResult(True)

Layer 3: Semantic Deduplication

Near-duplicate removal using embedding similarity - the layer that catches semantic duplicates ROUGE-L misses:

import numpy as np
from typing import Optional


def semantic_deduplicate_dataset(
    examples: list[dict],
    similarity_threshold: float = 0.85,
    embedding_field: str = "instruction",
    batch_size: int = 128,
    verbose: bool = True,
) -> tuple[list[dict], list[dict]]:
    """
    Remove semantically duplicate examples using sentence embeddings.

    Why this matters: ROUGE-L catches lexical duplicates. Embedding similarity
    catches semantic duplicates - "Summarize this article" and "Give me the
    main points of the following text" are semantically identical but have
    low ROUGE-L overlap. Training on both wastes capacity.

    Algorithm: Greedy sequential deduplication. Process examples in order,
    keeping an example only if its cosine similarity to all kept examples
    is below the threshold. This is O(n²) comparisons in the worst case,
    which occurs when most examples are unique and the kept set keeps growing.
    The vectorized dot product keeps it practical for tens of thousands of
    examples; much larger datasets may need an approximate nearest-neighbor index.

    Args:
        examples: List of example dicts
        similarity_threshold: Cosine similarity above which to consider duplicate
        embedding_field: Field to embed for comparison (usually "instruction")
        batch_size: Embedding computation batch size

    Returns:
        (kept_examples, removed_examples) tuple
    """
    try:
        from sentence_transformers import SentenceTransformer
    except ImportError:
        print("pip install sentence-transformers")
        return examples, []

    if len(examples) == 0:
        return [], []

    model = SentenceTransformer("all-MiniLM-L6-v2")
    texts = [ex.get(embedding_field, "") for ex in examples]

    if verbose:
        print(f"Computing embeddings for {len(texts)} examples...")

    embeddings = np.asarray(model.encode(
        texts,
        batch_size=batch_size,
        show_progress_bar=verbose,
        normalize_embeddings=True,  # Enables dot product as cosine similarity
    ))

    # Preallocate storage for kept embeddings so the matrix is not rebuilt
    # from a Python list on every iteration
    kept_matrix = np.empty_like(embeddings)
    kept_matrix[0] = embeddings[0]
    n_kept = 1
    kept_indices = [0]
    removed_indices = []

    for i in range(1, len(examples)):
        # Vectorized dot product (cosine similarity since normalized)
        similarities = kept_matrix[:n_kept] @ embeddings[i]
        max_sim = similarities.max()

        if max_sim < similarity_threshold:
            kept_indices.append(i)
            kept_matrix[n_kept] = embeddings[i]
            n_kept += 1
        else:
            removed_indices.append(i)

        if verbose and (i + 1) % 10000 == 0:
            print(f" Processed {i+1}/{len(examples)} | Kept: {len(kept_indices)}")

    kept = [examples[i] for i in kept_indices]
    removed = [examples[i] for i in removed_indices]

    if verbose:
        print("\nDeduplication complete:")
        print(f" Input: {len(examples):,}")
        print(f" Kept: {len(kept):,} ({len(kept)/len(examples)*100:.1f}%)")
        print(f" Removed: {len(removed):,} ({len(removed)/len(examples)*100:.1f}%)")

    return kept, removed


def cluster_and_balance(
    examples: list[dict],
    n_clusters: int = 50,
    max_per_cluster: int = 1000,
    quality_field: Optional[str] = "_overall_score",
    embedding_field: str = "instruction",
    verbose: bool = True,
) -> list[dict]:
    """
    Cluster examples by topic and cap each cluster to prevent topic imbalance.

    Without this, Python programming might have 10,000 examples while
    database design has 200. The fine-tuned model becomes Python-expert
    and database-novice.

    Within each cluster, keeps the highest-quality examples (by quality_field)
    up to max_per_cluster.

    Args:
        examples: Examples with optional quality scores
        n_clusters: Number of topic clusters to fit
        max_per_cluster: Maximum examples to keep per cluster
        quality_field: Field to sort by within each cluster (optional)

    Returns:
        Balanced list of examples
    """
    try:
        from sentence_transformers import SentenceTransformer
        from sklearn.cluster import MiniBatchKMeans
    except ImportError:
        print("pip install sentence-transformers scikit-learn")
        return examples

    model = SentenceTransformer("all-MiniLM-L6-v2")
    texts = [ex.get(embedding_field, "") for ex in examples]
    embeddings = model.encode(texts, show_progress_bar=verbose, normalize_embeddings=True)

    kmeans = MiniBatchKMeans(n_clusters=n_clusters, random_state=42, batch_size=1000)
    labels = kmeans.fit_predict(embeddings)

    # Group example indices by cluster label
    clusters: dict[int, list[int]] = {}
    for i, label in enumerate(labels):
        clusters.setdefault(int(label), []).append(i)

    balanced_indices = []
    for cluster_id, indices in clusters.items():
        # Sort by quality score if available (higher quality first)
        if quality_field:
            indices_sorted = sorted(
                indices,
                key=lambda i: examples[i].get(quality_field, 0),
                reverse=True
            )
        else:
            # Sort by response length as proxy for detail/quality
            indices_sorted = sorted(
                indices,
                key=lambda i: len(examples[i].get("output", "").split()),
                reverse=True
            )

        balanced_indices.extend(indices_sorted[:max_per_cluster])

    balanced = [examples[i] for i in balanced_indices]

    if verbose:
        print(f"Cluster balancing: {len(examples):,} → {len(balanced):,} examples")
        print(f"({n_clusters} clusters, max {max_per_cluster} per cluster)")

    return balanced
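
To make the ROUGE-L point concrete, here is a small pure-Python sketch that scores the paraphrase pair from the docstring above. It assumes whitespace tokenization and the plain F1 variant of ROUGE-L; `lcs_length` and `rouge_l_f1` are illustrative helpers, not functions from any ROUGE library.

```python
def lcs_length(a: list[str], b: list[str]) -> int:
    """Length of the longest common subsequence of two token lists."""
    prev = [0] * (len(b) + 1)
    for tok_a in a:
        curr = [0]
        for j, tok_b in enumerate(b, start=1):
            if tok_a == tok_b:
                curr.append(prev[j - 1] + 1)
            else:
                curr.append(max(prev[j], curr[j - 1]))
        prev = curr
    return prev[-1]


def rouge_l_f1(reference: str, candidate: str) -> float:
    """ROUGE-L F1 over lowercase whitespace tokens (a simplifying assumption)."""
    ref, cand = reference.lower().split(), candidate.lower().split()
    lcs = lcs_length(ref, cand)
    if lcs == 0:
        return 0.0
    precision, recall = lcs / len(cand), lcs / len(ref)
    return 2 * precision * recall / (precision + recall)


# Paraphrases share no tokens, so the lexical score is zero.
paraphrase = rouge_l_f1(
    "Summarize this article",
    "Give me the main points of the following text",
)
# A lightly edited copy scores high (6/7, about 0.86).
near_copy = rouge_l_f1("Summarize this article", "Summarize this article briefly")
```

A lexical threshold would keep both paraphrased instructions, which is the gap the embedding-based layer is meant to close.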

Layer 4: LLM Quality Scoring

The most expensive but most accurate layer:

import anthropic
import json
import re
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional

client = anthropic.Anthropic()

QUALITY_RUBRIC = """You are a training data quality evaluator. Score this instruction-response pair for fine-tuning quality.

Instruction: {instruction}

Response: {response}

Rate each dimension 1-5:
1. accuracy: Is the response factually correct and technically sound?
(1=Wrong, 3=Partially correct, 5=Completely accurate)
2. helpfulness: Does the response directly address the instruction?
(1=Off-topic, 3=Partially relevant, 5=Perfectly on-target)
3. completeness: Is the response appropriately detailed for this instruction?
(1=Major gaps, 3=Covers basics, 5=Comprehensive)
4. clarity: Is the response clear and easy to understand?
(1=Confusing, 3=Mostly clear, 5=Crystal clear)
5. format: Is formatting appropriate (code blocks, lists, length)?
(1=Wrong format, 3=Acceptable, 5=Perfect)

Respond with ONLY this JSON, no other text:
{{"accuracy": N, "helpfulness": N, "completeness": N, "clarity": N, "format": N, "overall": N}}

Where overall is your holistic 1-5 assessment (can differ from average of dimensions)."""


def score_example_quality(
    instruction: str,
    response: str,
    scorer_model: str = "claude-haiku-4-5-20251001",
    max_instruction_chars: int = 800,
    max_response_chars: int = 1500,
) -> dict:
    """
    Score a single training example using LLM-as-judge.

    Uses claude-haiku (cheap/fast) for high-throughput scoring.
    Score 1-5 on 5 dimensions + holistic overall score.

    Args:
        instruction: The instruction text
        response: The response text
        scorer_model: Cheap model for scoring (not the generator model!)
        max_instruction_chars: Truncate instruction for cost control
        max_response_chars: Truncate response for cost control

    Returns:
        Dict with dimension scores and overall score (1-5 scale)
    """
    prompt = QUALITY_RUBRIC.format(
        instruction=instruction[:max_instruction_chars],
        response=response[:max_response_chars],
    )

    try:
        api_response = client.messages.create(
            model=scorer_model,
            max_tokens=200,
            temperature=0,  # Deterministic scoring
            messages=[{"role": "user", "content": prompt}]
        )

        text = api_response.content[0].text.strip()
        # The rubric asks for a flat JSON object, so a brace-to-brace match suffices
        json_match = re.search(r'\{[^}]+\}', text, re.DOTALL)
        if json_match:
            scores = json.loads(json_match.group())
            # Validate all required fields present
            required = ["accuracy", "helpfulness", "completeness", "clarity", "format", "overall"]
            if all(k in scores for k in required):
                return scores

        # Neutral fallback; falls below the default 3.5 threshold downstream
        return {"overall": 3.0, "parse_error": True}

    except Exception as e:
        return {"overall": 3.0, "error": str(e)}


def batch_score_parallel(
    examples: list[dict],
    task_description: str,
    min_overall_score: float = 3.5,
    max_workers: int = 10,
    instruction_field: str = "instruction",
    response_field: str = "output",
    score_sample_rate: float = 1.0,
    verbose: bool = True,
) -> tuple[list[dict], list[dict]]:
    """
    Score examples in parallel and filter by minimum quality.

    Args:
        examples: List of examples to score
        task_description: Used in reporting (not passed to scorer)
        min_overall_score: Minimum 1-5 score to keep example
        max_workers: Parallel scoring threads
        score_sample_rate: Only score this fraction (save cost by sampling)

    Returns:
        (accepted_examples, rejected_examples) tuple
    """
    import random

    # Optionally sample to reduce cost
    if score_sample_rate < 1.0:
        n_to_score = int(len(examples) * score_sample_rate)
        score_indices = set(random.sample(range(len(examples)), n_to_score))
        unscored = [ex for i, ex in enumerate(examples) if i not in score_indices]
        to_score = [ex for i, ex in enumerate(examples) if i in score_indices]
    else:
        to_score = examples
        unscored = []

    if verbose:
        print(f"Scoring {len(to_score)} examples with {max_workers} workers...")

    def score_one(example: dict) -> dict:
        scores = score_example_quality(
            example.get(instruction_field, ""),
            example.get(response_field, "")
        )
        return {**example, "_quality_scores": scores, "_overall_score": scores.get("overall", 0)}

    scored = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its example so failures can still be recorded
        futures = {executor.submit(score_one, ex): ex for ex in to_score}
        for i, future in enumerate(as_completed(futures)):
            try:
                result = future.result()
                scored.append(result)
            except Exception as e:
                scored.append({**futures[future], "_overall_score": 3.0, "_error": str(e)})

            if verbose and (i + 1) % 1000 == 0:
                print(f" Scored: {i+1}/{len(to_score)}")

    # Filter scored examples
    accepted_scored = [ex for ex in scored if ex["_overall_score"] >= min_overall_score]
    rejected_scored = [ex for ex in scored if ex["_overall_score"] < min_overall_score]

    # Unscored examples: default to accepted
    all_accepted = accepted_scored + unscored
    all_rejected = rejected_scored

    if verbose:
        print("\nQuality filter results:")
        if scored:  # mean/percentile are undefined when nothing was scored
            all_scores = [ex["_overall_score"] for ex in scored]
            print(f" Score distribution: mean={np.mean(all_scores):.2f}, "
                  f"p25={np.percentile(all_scores, 25):.2f}, "
                  f"p75={np.percentile(all_scores, 75):.2f}")
        print(f" Threshold: {min_overall_score}")
        print(f" Accepted: {len(all_accepted):,} | Rejected: {len(all_rejected):,}")

    return all_accepted, all_rejected
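
The scorer above falls back to a neutral 3.0 whenever the judge's reply cannot be parsed. One alternative, sketched here under the assumption that the reply is the flat JSON object the rubric requests, is to validate the parsed scores and return `None` so the caller can retry or drop the example. `parse_quality_scores` is a hypothetical helper, not part of the pipeline above.

```python
import json
import re
from typing import Optional

REQUIRED_KEYS = ("accuracy", "helpfulness", "completeness", "clarity", "format", "overall")


def parse_quality_scores(text: str) -> Optional[dict]:
    """Extract and validate the flat JSON score object from a judge reply."""
    match = re.search(r"\{[^{}]+\}", text, re.DOTALL)
    if not match:
        return None
    try:
        scores = json.loads(match.group())
    except json.JSONDecodeError:
        return None
    for key in REQUIRED_KEYS:
        value = scores.get(key)
        # bool is a subclass of int, so exclude it explicitly
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return None
        if not 1 <= value <= 5:
            return None
    return scores


good = parse_quality_scores(
    'Here you go: {"accuracy": 4, "helpfulness": 5, "completeness": 4, '
    '"clarity": 5, "format": 3, "overall": 4}'
)
out_of_range = parse_quality_scores(
    '{"accuracy": 9, "helpfulness": 5, "completeness": 4, '
    '"clarity": 5, "format": 3, "overall": 4}'
)
missing_key = parse_quality_scores('{"accuracy": 4, "overall": 4}')
```

Whether to retry, drop, or default on `None` is a policy choice; the point of the sketch is only that malformed or out-of-range scores are distinguishable from genuine 3s.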

Layer 5: Safety Filtering

import json
import re
from typing import Optional

# Note: `client` (Layer 4) and `FilterResult` (Layer 1) are defined in earlier blocks.


# Hard-blocked patterns (regex prefilter - no API call needed)
HARD_BLOCKED_PATTERNS = [
    r"\b(how to (make|build|create|synthesize) (a )?bomb\b)",
    r"\b(child (sexual abuse|pornography))\b",
    r"\bsocial security number[s]?:?\s+\d{3}[-\s]?\d{2}[-\s]?\d{4}\b",
    r"\bcredit card\s+\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b",
    r"\b(step.{0,20}step.{0,50}(kill|murder|harm|poison))\b",
]

SAFETY_SYSTEM_PROMPT = """You are a content safety classifier for LLM training data.

Review this instruction and response. Check if either contains:
- Violence, self-harm instructions, or graphic content
- Hate speech or discriminatory content targeting protected groups
- Sexual content involving minors or non-consensual scenarios
- Detailed instructions for illegal activities
- Personal identifying information (names+addresses, SSNs, credit cards)
- Dangerous health or safety misinformation

Respond with ONLY this JSON:
{"safe": true, "category": null}
or
{"safe": false, "category": "category_name", "confidence": 0.0-1.0}"""


def fast_safety_prefilter(text: str) -> bool:
    """
    Quick regex check before expensive LLM safety classifier.

    Hard-blocks patterns that are unambiguously harmful.
    Returns True if text passes (appears safe), False if blocked.
    """
    text_lower = text.lower()
    for pattern in HARD_BLOCKED_PATTERNS:
        if re.search(pattern, text_lower):
            return False
    return True


def llm_safety_filter(
    instruction: str,
    response: str,
    model: str = "claude-haiku-4-5-20251001",
) -> FilterResult:
    """
    LLM-based safety classification for subtle harmful content.

    Run after regex prefilter to check for nuanced harmful patterns
    that regexes can't reliably catch.
    """
    # Fast prefilter first
    combined = instruction + " " + response
    if not fast_safety_prefilter(combined):
        return FilterResult(False, "hard_blocked_pattern")

    content = f"Instruction: {instruction[:500]}\n\nResponse: {response[:800]}"

    try:
        api_response = client.messages.create(
            model=model,
            max_tokens=100,
            system=SAFETY_SYSTEM_PROMPT,
            messages=[{"role": "user", "content": content}]
        )

        text = api_response.content[0].text.strip()
        json_match = re.search(r'\{[^}]+\}', text)
        if json_match:
            result = json.loads(json_match.group())
            if result.get("safe", True):
                return FilterResult(True)
            else:
                return FilterResult(
                    False,
                    f"safety:{result.get('category', 'unknown')}",
                    score=1 - result.get("confidence", 0.5)
                )
    except Exception:
        # API and JSON errors fall through to the fail-open default below
        pass

    return FilterResult(True)  # Default to safe on parse failure (fail-open)

Full Pipeline Integration

import json
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class FilteringStats:
    total_input: int = 0
    structural_rejected: int = 0
    heuristic_rejected: int = 0
    modality_rejected: int = 0
    dedup_rejected: int = 0
    quality_rejected: int = 0
    safety_rejected: int = 0
    final_accepted: int = 0
    rejection_reasons: dict = field(default_factory=dict)

    def rejection_pct(self, count: int) -> str:
        """Format a count as a percentage of the total input."""
        if self.total_input == 0:
            return "0.0%"
        return f"{count/self.total_input*100:.1f}%"


class DataQualityPipeline:
    """
    Complete data quality pipeline for synthetic training data.

    Processes examples through 5 layers of increasing cost,
    tracking rejection statistics at each layer.

    Usage:
        pipeline = DataQualityPipeline(quality_threshold=3.5)
        clean_data = pipeline.run(raw_examples, output_path="clean.jsonl")
        # run() prints the summary itself; call print_summary() to print it again
    """

    def __init__(
        self,
        quality_threshold: float = 3.5,
        dedup_threshold: float = 0.85,
        run_llm_scoring: bool = True,
        run_safety_filter: bool = True,
        score_sample_rate: float = 1.0,
        max_scoring_workers: int = 10,
    ):
        self.quality_threshold = quality_threshold
        self.dedup_threshold = dedup_threshold
        self.run_llm_scoring = run_llm_scoring
        self.run_safety_filter = run_safety_filter
        self.score_sample_rate = score_sample_rate
        self.max_scoring_workers = max_scoring_workers
        self.stats = FilteringStats()

    def run(
        self,
        examples: list[dict],
        output_path: str,
        instruction_field: str = "instruction",
        response_field: str = "output",
    ) -> list[dict]:
        """Run the complete filtering pipeline."""

        self.stats.total_input = len(examples)
        current = list(examples)

        # Layer 1: Structural
        print("\n[Layer 1] Structural filtering...")
        current = self._apply_filter(current, instruction_field, response_field, structural_filter, "structural")
        self.stats.structural_rejected = self.stats.total_input - len(current)
        print(f" Remaining: {len(current):,}")

        # Layer 2a: Heuristic
        print("\n[Layer 2a] Heuristic filtering...")
        n_before = len(current)
        current = self._apply_filter(current, instruction_field, response_field, heuristic_filter, "heuristic")
        self.stats.heuristic_rejected = n_before - len(current)
        print(f" Remaining: {len(current):,}")

        # Layer 2b: Modality
        print("\n[Layer 2b] Modality filtering...")
        n_before = len(current)
        current = self._apply_filter(current, instruction_field, response_field, modality_filter, "modality")
        self.stats.modality_rejected = n_before - len(current)
        print(f" Remaining: {len(current):,}")

        # Layer 3: Semantic deduplication
        print(f"\n[Layer 3] Semantic deduplication (threshold={self.dedup_threshold})...")
        n_before = len(current)
        current, _ = semantic_deduplicate_dataset(
            current,
            similarity_threshold=self.dedup_threshold,
            embedding_field=instruction_field,
        )
        self.stats.dedup_rejected = n_before - len(current)
        print(f" Remaining: {len(current):,}")

        # Layer 4: LLM Quality Scoring
        if self.run_llm_scoring:
            print(f"\n[Layer 4] LLM quality scoring (threshold={self.quality_threshold}/5)...")
            n_before = len(current)
            current, _ = batch_score_parallel(
                current,
                task_description="",  # Summary only
                min_overall_score=self.quality_threshold,
                max_workers=self.max_scoring_workers,
                instruction_field=instruction_field,
                response_field=response_field,
                score_sample_rate=self.score_sample_rate,
            )
            self.stats.quality_rejected = n_before - len(current)
            print(f" Remaining: {len(current):,}")

        # Layer 5: Safety
        if self.run_safety_filter:
            print("\n[Layer 5] Safety filtering...")
            n_before = len(current)
            safe = []
            for ex in current:
                result = llm_safety_filter(
                    ex.get(instruction_field, ""),
                    ex.get(response_field, "")
                )
                if result.passed:
                    safe.append(ex)
                else:
                    # LLM verdicts already carry a "safety:" prefix; avoid doubling it
                    reason = result.reason or "unknown"
                    if not reason.startswith("safety:"):
                        reason = f"safety:{reason}"
                    self.stats.rejection_reasons[reason] = self.stats.rejection_reasons.get(reason, 0) + 1

            self.stats.safety_rejected = n_before - len(safe)
            current = safe
            print(f" Remaining: {len(current):,}")

        self.stats.final_accepted = len(current)

        # Save output as JSON Lines
        with open(output_path, "w") as f:
            for ex in current:
                f.write(json.dumps(ex) + "\n")

        self.print_summary()
        return current

    def _apply_filter(
        self,
        examples: list[dict],
        instruction_field: str,
        response_field: str,
        filter_fn,
        filter_name: str
    ) -> list[dict]:
        """Apply a filter function and track rejection reasons."""
        passed = []
        for ex in examples:
            result = filter_fn(
                ex.get(instruction_field, ""),
                ex.get(response_field, "")
            )
            if result.passed:
                passed.append(ex)
            else:
                key = f"{filter_name}:{result.reason}"
                self.stats.rejection_reasons[key] = self.stats.rejection_reasons.get(key, 0) + 1
        return passed

    def print_summary(self):
        """Print filtering pipeline summary."""
        s = self.stats
        total = s.total_input
        print(f"\n{'='*55}")
        print("DATA QUALITY PIPELINE SUMMARY")
        print(f"{'='*55}")
        print(f"Input:              {total:>10,}")
        print(f"Structural rejects: {s.structural_rejected:>10,} ({s.rejection_pct(s.structural_rejected)})")
        print(f"Heuristic rejects:  {s.heuristic_rejected:>10,} ({s.rejection_pct(s.heuristic_rejected)})")
        print(f"Modality rejects:   {s.modality_rejected:>10,} ({s.rejection_pct(s.modality_rejected)})")
        print(f"Dedup rejects:      {s.dedup_rejected:>10,} ({s.rejection_pct(s.dedup_rejected)})")
        print(f"Quality rejects:    {s.quality_rejected:>10,} ({s.rejection_pct(s.quality_rejected)})")
        print(f"Safety rejects:     {s.safety_rejected:>10,} ({s.rejection_pct(s.safety_rejected)})")
        print(f"{'─'*55}")
        print(f"Final accepted:     {s.final_accepted:>10,} ({s.rejection_pct(s.final_accepted)})")
        print("\nTop 10 rejection reasons:")
        sorted_reasons = sorted(s.rejection_reasons.items(), key=lambda x: x[1], reverse=True)
        for reason, count in sorted_reasons[:10]:
            print(f" {reason}: {count:,}")

Domain-Specific Quality Signals

Different domains require different quality checks:

import ast
import re


def code_quality_filter(instruction: str, response: str) -> FilterResult:
    """Domain-specific quality filter for coding training examples."""
    # Is this a coding task? (substring match, so expect some false positives)
    coding_keywords = [
        "function", "implement", "code", "python", "javascript",
        "algorithm", "class", "def ", "sort", "search", "write a"
    ]
    is_coding_task = any(kw in instruction.lower() for kw in coding_keywords)

    if not is_coding_task:
        return FilterResult(True)  # Not a coding task, skip

    # Coding tasks must have code blocks
    if "```" not in response and "def " not in response and "class " not in response:
        return FilterResult(False, "coding_task_missing_code")

    # Validate Python syntax for fenced blocks. Capturing the language tag means
    # non-Python blocks are consumed whole, so a closing fence is never mistaken
    # for the start of an untagged Python block.
    fenced_blocks = re.findall(r'```([^\n`]*)\n(.*?)```', response, re.DOTALL)
    for lang, code in fenced_blocks:
        if lang.strip().lower() not in ("", "python"):
            continue
        if code.strip() and len(code.strip().split('\n')) > 2:
            try:
                ast.parse(code)  # Syntax check only; the code is never executed
            except SyntaxError as e:
                return FilterResult(False, f"syntax_error:{str(e)[:50]}")

    return FilterResult(True)


def math_quality_filter(instruction: str, response: str) -> FilterResult:
    """Domain-specific quality filter for math training examples."""
    math_keywords = [
        "calculate", "solve", "compute", "find the", "what is",
        "how many", "percentage", "probability", "area", "volume"
    ]
    is_math_task = any(kw in instruction.lower() for kw in math_keywords)

    if not is_math_task:
        return FilterResult(True)

    # Math responses should show step-by-step work. Word boundaries stop short
    # indicators such as "so" from matching inside words like "also" or "solve".
    step_patterns = [
        r"\bsteps?\b", r"\bfirst\b", r"\bthen\b", r"\btherefore\b",
        r"=", r"\bso\b", r"\bbecause\b", r"\bsince\b",
    ]
    response_lower = response.lower()
    step_count = sum(1 for p in step_patterns if re.search(p, response_lower))

    if step_count < 3 and len(response.split()) < 50:
        return FilterResult(False, "math_missing_step_by_step")

    return FilterResult(True)

Diversity Measurement

After filtering, verify the resulting dataset is actually diverse:

import numpy as np


def measure_dataset_diversity(
    examples: list[dict],
    instruction_field: str = "instruction",
    n_clusters: int = 50,
) -> dict:
    """
    Comprehensive diversity analysis of a filtered dataset.

    Reports: topic Gini coefficient, topic coverage, response length distribution.
    A well-filtered dataset should have: Gini < 0.3, all topics covered, varied lengths.
    """
    try:
        from sentence_transformers import SentenceTransformer
        from sklearn.cluster import MiniBatchKMeans
    except ImportError:
        return {"error": "sentence-transformers and scikit-learn required"}

    texts = [ex.get(instruction_field, "") for ex in examples]
    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode(texts, normalize_embeddings=True, show_progress_bar=True)

    kmeans = MiniBatchKMeans(n_clusters=n_clusters, random_state=42)
    labels = kmeans.fit_predict(embeddings)

    unique, counts = np.unique(labels, return_counts=True)

    # Gini coefficient over cluster shares (0 = perfectly equal, 1 = maximally unequal)
    c = np.array(sorted(counts), dtype=float)
    c /= c.sum()
    n = len(c)
    gini = (2 * np.sum(np.arange(1, n+1) * c) - (n + 1)) / n

    # Response length distribution
    response_lengths = [len(ex.get("output", "").split()) for ex in examples]

    return {
        "n_examples": len(examples),
        "topic_gini": round(float(gini), 4),
        "n_topics_covered": len(unique),
        "largest_topic_pct": round(float(max(counts)) / len(examples) * 100, 2),
        "smallest_topic_pct": round(float(min(counts)) / len(examples) * 100, 4),
        "avg_response_words": round(float(np.mean(response_lengths)), 1),
        "p50_response_words": int(np.percentile(response_lengths, 50)),
        "p90_response_words": int(np.percentile(response_lengths, 90)),
        "interpretation": (
            "Well-balanced dataset - proceed to training" if gini < 0.3
            else "Moderately imbalanced - consider cluster-capping" if gini < 0.5
            else "Severely imbalanced - add topic-spread generation before filtering"
        )
    }
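
To get a feel for the Gini thresholds, here is the same formula applied to two small sets of cluster sizes in plain Python. The cluster counts are made up for illustration, and `gini` is a standalone helper rather than part of the function above.

```python
def gini(counts: list[int]) -> float:
    """Gini coefficient of cluster sizes, using the same formula as above."""
    total = sum(counts)
    shares = [c / total for c in sorted(counts)]  # ascending order matters
    n = len(shares)
    weighted = sum(rank * share for rank, share in enumerate(shares, start=1))
    return (2 * weighted - (n + 1)) / n


balanced = gini([250, 250, 250, 250])  # four equal clusters
skewed = gini([10, 20, 70, 900])       # one cluster holds 90% of the data
```

Equal clusters give 0, while the skewed case comes out around 0.68, well past the 0.5 "severely imbalanced" cutoff. With this formula the maximum for n clusters is (n - 1) / n, so it only approaches 1 as the number of clusters grows.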

Common Mistakes

:::danger The "More Data Is Always Better" Fallacy Adding low-quality examples to a high-quality dataset consistently makes fine-tuned models worse. This is counterintuitive - shouldn't more data help? The answer is: fine-tuning doesn't average over data quality, it trains until loss converges. Low-quality examples add gradient noise that interferes with learning from high-quality examples. Run the experiment yourself: train with 10K high-quality examples and with 100K mixed-quality examples. In most cases with models under 13B parameters, 10K wins on every benchmark. Prove it to yourself before scaling. :::

:::warning Don't Filter Too Aggressively on Length It's tempting to filter out short responses (under 50 words) because long, detailed responses look better. But many valid instructions have correct short answers: "What is the capital of France?" → "Paris." Filtering by length alone destroys factual recall tasks. Filter by length relative to instruction complexity instead: a 5-word response is fine for a 5-word question, problematic for a 50-word reasoning question. Use the instruction word count as a divisor when setting length thresholds. :::
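
A minimal sketch of the relative-length idea from the warning above. The function name `length_ratio_ok` and both thresholds (`min_ratio=0.5`, `short_instruction_words=10`) are assumptions chosen for illustration; calibrate them against your own labeled data.

```python
def length_ratio_ok(
    instruction: str,
    response: str,
    min_ratio: float = 0.5,            # hypothetical threshold
    short_instruction_words: int = 10,  # hypothetical threshold
) -> bool:
    """Judge response length relative to instruction length, not in absolute terms."""
    instr_words = len(instruction.split())
    resp_words = len(response.split())
    if instr_words == 0 or resp_words == 0:
        return False
    # Short questions may legitimately have one-word answers.
    if instr_words <= short_instruction_words:
        return True
    # Instruction word count acts as the divisor for longer prompts.
    return resp_words / instr_words >= min_ratio


factual = length_ratio_ok("What is the capital of France?", "Paris.")
long_question = " ".join(["word"] * 50)
too_brief = length_ratio_ok(long_question, "It depends on the context.")
```

Here the one-word factual answer passes, while a five-word reply to a 50-word question is flagged.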

:::tip Build a Labeled Quality Test Set Before Filtering Before running filtering on your full dataset, manually label 500-1000 examples as high/medium/low quality. Use this as a test set to calibrate your filtering thresholds. Measure: precision (are my "high quality" keeps actually good?) and recall (am I discarding too many good examples?). This prevents over-filtering - a common mistake that leaves too few examples to fine-tune effectively. Target: 75%+ precision on the high-quality bucket, and validate that your test-set distribution matches your full dataset. :::
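The calibration step can be sketched as a small function that measures precision and recall of the "keep" decision against the manual labels. The function name, the `"high"`/`"medium"`/`"low"` label strings, and the assumption that the filter emits a numeric score compared against a threshold are all illustrative choices here, not part of the pipeline described in this lesson.

```python
def keep_precision_recall(labels, scores, threshold):
    """Precision and recall of the 'keep' decision for the high-quality class.

    labels: manual labels, each 'high', 'medium', or 'low'
    scores: filter scores aligned with labels; an example is kept if score >= threshold
    """
    kept = [label for label, score in zip(labels, scores) if score >= threshold]
    n_high_total = sum(1 for label in labels if label == "high")
    n_high_kept = sum(1 for label in kept if label == "high")

    # Precision: of the examples we keep, how many are actually high quality?
    precision = n_high_kept / len(kept) if kept else 0.0
    # Recall: of all high-quality examples, how many survive the filter?
    recall = n_high_kept / n_high_total if n_high_total else 0.0
    return precision, recall


# Toy labeled set: raising the threshold trades recall for precision.
labels = ["high", "high", "low", "medium", "high", "low"]
scores = [4.5, 3.8, 4.1, 3.0, 2.9, 1.5]
for t in (3.5, 4.2):
    p, r = keep_precision_recall(labels, scores, t)
    print(f"threshold={t}: precision={p:.2f}, recall={r:.2f}")
```

Sweeping the threshold over the labeled set this way shows where precision reaches the target without recall collapsing.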

:::info The Cost of Each Layer Rough cost per 100K examples: Layer 1 (structural): $0. Layer 2 (heuristic): $0. Layer 3 (dedup): $0.50 in compute (embedding model). Layer 4 (LLM quality scoring with Haiku): ~$25 (assuming 500 tokens per example). Layer 5 (LLM safety with Haiku): ~$10. Total: ~$35 for 100K examples. This is why you run cheap layers first - you reduce the number of expensive Layer 4/5 calls by 30-40% just with the free structural and heuristic layers. :::
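The arithmetic behind the ordering argument can be checked with a few lines. This is a rough sketch: it takes the per-100K figures from the note above and assumes, as a simplification, that cost scales linearly with the number of examples and that every paid layer (dedup, quality scoring, safety) only sees the examples that survive the free layers. The function name `pipeline_cost` is made up for illustration.

```python
# Per-100K-example costs in USD, taken from the note above.
LAYER_COST_PER_100K = {
    "structural": 0.0,
    "heuristic": 0.0,
    "dedup": 0.50,
    "llm_quality": 25.0,
    "llm_safety": 10.0,
}


def pipeline_cost(n_examples: int, removed_by_free_layers: float = 0.0) -> float:
    """Estimate total cost when the free layers drop a fraction of examples first.

    Assumption: costs scale linearly, and the paid layers only process survivors.
    """
    survivors = n_examples * (1.0 - removed_by_free_layers)
    paid = ("dedup", "llm_quality", "llm_safety")
    return sum(LAYER_COST_PER_100K[name] for name in paid) * survivors / 100_000


print(pipeline_cost(100_000))        # every example reaches the paid layers
print(pipeline_cost(100_000, 0.40))  # free layers remove 40% first
```

Under these assumptions, removing 40% of examples in the free layers cuts the bill from about 35.5 to about 21.3 USD per 100K raw examples.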

Interview Q&A

Q: Why does data quality matter more than data quantity for small model fine-tuning?

Small models have limited representational capacity. During fine-tuning, they must update weights to minimize loss on the training distribution. With noisy data, the model tries to fit both signal (the correct patterns) and noise (errors, inconsistencies, hallucinations) simultaneously. Because noise is high-entropy (random, inconsistent), fitting it requires more parameters than fitting clean signal. Small models don't have spare parameters for noise accommodation - they sacrifice signal fitting to accommodate noise. Large models (70B+) have enough redundant capacity to partially tolerate noise. The Phi paper quantified this: 7B tokens of textbook-quality data produced a 1.3B model that beat models trained on 100x more internet data. Quality ROI is highest for the smallest models.

Q: What is semantic deduplication and why is ROUGE-L not sufficient?

ROUGE-L measures lexical overlap - longest common subsequence of words. Two semantically identical instructions with different wording ("Classify this email as spam" vs. "Determine whether this message is spam") have low ROUGE-L but are semantic duplicates - training on both wastes capacity. Semantic deduplication uses sentence embeddings (dense vectors capturing meaning) and removes examples whose embedding cosine similarity exceeds a threshold (~0.85). This catches semantically similar examples regardless of wording. The cost: you need to run an embedding model over your full dataset (fast with sentence-transformers) and do approximate nearest-neighbor search. In practice, semantic dedup removes 10-20% more examples than ROUGE-L alone - reducing effective dataset size but increasing effective diversity.
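A minimal sketch of the threshold step, assuming the embeddings have already been computed by some sentence-embedding model and are passed in as a NumPy array. It uses a brute-force greedy pass rather than approximate nearest-neighbor search, so it only suits small datasets; the function name `semantic_dedup` is illustrative.

```python
import numpy as np


def semantic_dedup(embeddings: np.ndarray, threshold: float = 0.85) -> list:
    """Greedy semantic dedup over precomputed embeddings.

    Keeps an example only if its cosine similarity to every already-kept
    example does not exceed `threshold`. Returns the indices of kept rows.
    Brute force: O(n * kept) similarity computations.
    """
    # Normalize rows so that a dot product equals cosine similarity.
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    unit = embeddings / np.clip(norms, 1e-12, None)

    kept = []
    for i in range(len(unit)):
        if kept:
            sims = unit[kept] @ unit[i]
            if float(np.max(sims)) > threshold:
                continue  # too close to something we already kept
        kept.append(i)
    return kept


# Toy vectors: rows 0 and 1 point in almost the same direction.
toy = np.array([[1.0, 0.0], [0.99, 0.1], [0.0, 1.0]])
print(semantic_dedup(toy))
```

Because the pass is greedy, the first example in each near-duplicate group is the one that survives, so sorting by quality score before deduplicating is one way to keep the best member of each group.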

Q: How would you design a quality scoring system for coding-specific training data?

Multi-signal scoring with automated verification: (1) Syntax validity - parse extracted code with ast.parse() for Python, acorn for JavaScript. Any syntax error is an automatic reject - this is the highest-value cheap check. (2) Test execution - if the example includes test cases (which high-quality coding examples should), run them and check the pass rate. A response that passes 0/3 test cases is wrong by definition. (3) Complexity analysis presence - check whether the response discusses time/space complexity. Responses without it are less useful as learning material. (4) Code style - check for docstrings, meaningful variable names, and appropriate comments using lightweight static analysis. (5) LLM judge - ask a capable model "Is this code correct, well-documented, and does it handle edge cases?" and score 1-5. Weight automated verification highest, because it measures objective correctness, and use the LLM judge for subjective quality. The critical insight: automated verification catches objective correctness; the LLM judge catches educational quality.
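Signals (1) and (2) can be sketched for Python with the standard library alone. The helper names `python_syntax_ok` and `pass_rate` are illustrative, and the tests are assumed to be plain `assert` statements stored as strings. Note that `exec` runs the generated code in the current process; in a real pipeline this should happen inside a sandbox with time and resource limits.

```python
import ast


def python_syntax_ok(code: str) -> bool:
    """Signal 1: reject any response whose code does not parse."""
    try:
        ast.parse(code)
        return True
    except (SyntaxError, ValueError):
        return False


def pass_rate(code: str, tests: list) -> float:
    """Signal 2: fraction of assert-style test strings that pass.

    WARNING: exec() runs untrusted code in-process. Sandbox this in practice.
    """
    if not tests or not python_syntax_ok(code):
        return 0.0
    namespace = {}
    try:
        exec(code, namespace)  # define the functions under test
    except Exception:
        return 0.0
    passed = 0
    for test in tests:
        try:
            exec(test, namespace)
            passed += 1
        except Exception:
            pass  # a failed assertion or a runtime error counts as a failure
    return passed / len(tests)


good = "def add(a, b):\n    return a + b\n"
broken = "def add(a, b)\n    return a + b\n"  # missing colon
tests = ["assert add(1, 2) == 3", "assert add(0, 0) == 0"]
print(python_syntax_ok(good), python_syntax_ok(broken), pass_rate(good, tests))
```

Running the syntax check first keeps the more expensive test execution and LLM judging off examples that would be rejected anyway.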

Q: What's the difference between filtering for quality and filtering for alignment, and why do you need both?

Quality filtering ensures examples are accurate, clear, and well-formatted - they provide good training signal for the task. A high-quality example teaches the model to do the task well. Alignment filtering ensures examples don't teach harmful behaviors - the response is safe, appropriate, and doesn't propagate dangerous misinformation. A response can be high quality but misaligned (a well-written, accurate guide to making malware - high quality as training signal, catastrophically misaligned). A response can also be both misaligned and low quality (a garbled, incoherent harmful response). The two properties are independent, so you need both filters: quality filtering alone leaves harmful examples in if they happen to be well-written, and alignment filtering alone doesn't improve accuracy or format. In practice, run quality filtering first (it reduces volume, making alignment filtering cheaper to run) and alignment filtering last.

Q: How do you validate that your filtered dataset actually improves fine-tuning results?

Dataset validation runs before full fine-tuning. Process: (1) Create a held-out evaluation set (500 examples) manually labeled by domain experts - this is your ground truth, never used for training. (2) Fine-tune on a 10-20% sample of your full dataset using the same hyperparameters as your planned full run. (3) Evaluate the fine-tuned model on your held-out set and compare against: (a) the base model without fine-tuning, (b) a model fine-tuned on the same volume of unfiltered data. If filtering helps, the filtered model outperforms both the unfiltered model and the base model. (4) Run quality threshold ablations: try thresholds of 3.0, 3.5, and 4.0 on the 5-point scale, and measure the tradeoff between dataset size and model quality. (5) Automate as CI/CD: every change to filtering logic triggers a small-scale fine-tune + eval run to catch regressions before they affect production.
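The dataset-size half of the threshold ablation in step (4) and the comparison in step (3) can be sketched without any training code. Both function names are hypothetical, the scores are assumed to be per-example quality scores on the 5-point scale, and the evaluation numbers passed to `filtering_helps` would come from your own held-out runs.

```python
def threshold_sweep(scores, thresholds=(3.0, 3.5, 4.0)):
    """Return {threshold: (n_kept, fraction_kept)} for each candidate cutoff."""
    total = len(scores)
    result = {}
    for t in thresholds:
        n_kept = sum(1 for s in scores if s >= t)
        result[t] = (n_kept, n_kept / total if total else 0.0)
    return result


def filtering_helps(base_score, unfiltered_score, filtered_score):
    """Step (3): the filtered run must beat both baselines on the held-out set."""
    return filtered_score > unfiltered_score and filtered_score > base_score


# Toy quality scores: how much data survives each cutoff?
toy_scores = [2.5, 3.0, 3.4, 3.5, 4.0, 4.8]
for t, (n, frac) in threshold_sweep(toy_scores).items():
    print(f"threshold={t}: kept {n} examples ({frac:.0%})")
```

Pairing each retained-size figure with the held-out score of a model fine-tuned at that threshold gives the size-versus-quality tradeoff described in step (4).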

© 2026 EngineersOfAI. All rights reserved.