Designing a Content Moderation System
The Scale Impossibility
YouTube processes 500 hours of video per minute. That is 720,000 hours of content per day. If you hired 10,000 human moderators working 8-hour shifts, each reviewing video in real time, they could collectively review 80,000 hours per day - 9x slower than upload rate. The content backlog would grow forever.
At Facebook's scale, 100 billion pieces of content are created per day: posts, photos, comments, videos, Stories. Human review cannot cover even 1% of that: one billion items a day would need on the order of a million reviewers at an assumed 1,000 items per reviewer per day. Meta employs approximately 15,000 content reviewers globally, so human review reaches only a tiny fraction of the volume, and most reviewers deal with the most severe categories (CSAM, imminent violence), where automated systems are insufficiently accurate and the stakes of errors are highest.
The engineering reality: for any platform at meaningful scale, the vast majority of moderation decisions must be made by automated systems. Human reviewers handle the hardest cases, the edge cases, and the appeals. Automated systems handle the volume. The design challenge is building automated systems that are accurate enough to make this work: a false positive rate low enough that legitimate creators' content is not unfairly removed, and recall high enough that harmful content is caught before it reaches a large audience.
This lesson builds the architecture for a production content moderation system that processes text, images, and video across multiple harm categories at platform scale.
Requirements
Functional requirements:
- Detect and act on policy violations: CSAM, hate speech, violence, harassment, spam, misinformation, copyright infringement
- Support text, image, video, and audio content
- Provide an appeals process for incorrectly moderated content
- Prioritize review of content with high potential reach (trending, amplified by recommendations)
Non-functional requirements:
- Latency: new content scored within 30 seconds of upload
- Throughput: 10,000 content submissions per second at peak
- Recall: category-dependent - CSAM requires near-100% recall; spam requires 90%+
- False positive rate: under 0.5% for text and images; under 1% for video (due to context complexity)
Moderation Taxonomy and Priority
Different harm categories have fundamentally different thresholds and workflows.
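One way to make category-specific thresholds and workflows concrete is a small policy table. The sketch below is illustrative only: the `CategoryPolicy` fields, the `route` helper, and every numeric value are assumptions chosen to show the shape of such a table, not values from a real platform.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CategoryPolicy:
    """Hypothetical per-category moderation policy (all values are illustrative)."""
    auto_remove_at: float  # confidence at or above which content is removed automatically
    review_at: float       # confidence at or above which content goes to human review
    review_priority: int   # 1 = most urgent review queue


# Assumed example values: severe categories get lower thresholds and more urgent review
POLICIES = {
    "csam": CategoryPolicy(auto_remove_at=0.50, review_at=0.10, review_priority=1),
    "violence_incitement": CategoryPolicy(auto_remove_at=0.85, review_at=0.55, review_priority=2),
    "hate_speech": CategoryPolicy(auto_remove_at=0.85, review_at=0.60, review_priority=2),
    "spam": CategoryPolicy(auto_remove_at=0.90, review_at=0.50, review_priority=4),
}


def route(category: str, confidence: float) -> str:
    """Map a (category, confidence) pair to an action using the policy table."""
    policy = POLICIES[category]
    if confidence >= policy.auto_remove_at:
        return "remove"
    if confidence >= policy.review_at:
        return "review"
    return "allow"
```

Keeping the table as data rather than branching logic means policy teams can adjust a threshold without touching classifier code.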
Multi-Modal Pipeline
Text Moderation
```python
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from dataclasses import dataclass
from typing import Optional


@dataclass
class ModerationResult:
    content_id: str
    policy_category: str
    confidence: float
    action: str  # "allow", "review", "remove", "age_restrict"
    explanation: Optional[str] = None


class TextModerationClassifier:
    """
    Multi-label text classifier for policy violations.
    Fine-tuned from RoBERTa on platform-specific policy data.
    Categories: hate_speech, harassment, spam, violence_incitement,
    adult_content, misinformation, self_harm
    """

    CATEGORIES = [
        "hate_speech", "harassment", "spam",
        "violence_incitement", "adult_content",
        "misinformation", "self_harm",
    ]

    # Per-category probability at or above which a result is reported at all
    THRESHOLDS = {
        "hate_speech": 0.6,
        "harassment": 0.65,
        "spam": 0.5,
        "violence_incitement": 0.55,
        "adult_content": 0.7,
        "misinformation": 0.75,
        "self_harm": 0.5,
    }

    def __init__(self, model_path: str, max_length: int = 512):
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_path,
            num_labels=len(self.CATEGORIES),
            problem_type="multi_label_classification",
        )
        self.model.eval()
        self.max_length = max_length

    def classify(self, text: str, content_id: str) -> list:
        """
        Classify text against all policy categories.
        Returns list of ModerationResult for triggered categories.
        """
        inputs = self.tokenizer(
            text,
            return_tensors="pt",
            max_length=self.max_length,
            truncation=True,
            padding=True,
        )
        with torch.no_grad():
            logits = self.model(**inputs).logits
        # Multi-label: one independent sigmoid per category.
        # squeeze(0) drops only the batch dimension.
        probabilities = torch.sigmoid(logits).squeeze(0).cpu().numpy()

        results = []
        for category, prob in zip(self.CATEGORIES, probabilities):
            threshold = self.THRESHOLDS[category]
            if prob >= threshold:
                action = self._determine_action(category, float(prob))
                results.append(ModerationResult(
                    content_id=content_id,
                    policy_category=category,
                    confidence=float(prob),
                    action=action,
                    explanation=f"Model confidence: {prob:.1%}",
                ))
        return results

    def _determine_action(self, category: str, confidence: float) -> str:
        """Determine action based on category and confidence."""
        if category in ("hate_speech", "violence_incitement") and confidence > 0.85:
            return "remove"
        if category == "self_harm":
            return "review"  # always human review for self-harm
        if category == "spam" and confidence > 0.9:
            return "remove"
        if confidence > 0.8:
            return "review"
        # Over the reporting threshold but not above 0.8: keep the content up
        # and record the flag in the background
        return "allow"


class HateSpeechContextualizer:
    """
    Hate speech classification requires context:
    - "I love fighting" (sports) vs "I love fighting [group]" (violence)
    - Counter-speech that references slurs in quotation
    - Satire and news reporting on hate speech
    Multi-step pipeline: classify, then context-check high-confidence results.
    """

    def __init__(self, classifier: TextModerationClassifier):
        self.classifier = classifier

    def classify_with_context(
        self,
        text: str,
        content_id: str,
        author_context: dict,
        conversation_context: Optional[list] = None,
    ) -> list:
        """
        Classify text with contextual signals:
        author_context: {"is_news_org": bool, "verified": bool, "prior_violations": int}
        conversation_context: previous messages in the thread (not used yet)
        """
        primary_results = self.classifier.classify(text, content_id)

        # Context downgrade: verified news organizations reporting on hate speech
        # are not committing hate speech
        if author_context.get("is_news_org") and author_context.get("verified"):
            primary_results = [
                r for r in primary_results
                if r.policy_category != "hate_speech"
                or r.confidence > 0.95  # only act on extremely high confidence
            ]

        # Context upgrade: repeat offenders are escalated to human review.
        # Only "allow" is upgraded, so an existing "remove" is never weakened.
        # classify() has already dropped results below the category thresholds,
        # so truly lowering the threshold would need the raw probabilities.
        if author_context.get("prior_violations", 0) > 3:
            for result in primary_results:
                if result.action == "allow":
                    result.action = "review"

        return primary_results
```
Image Moderation
```python
import io

import torch
import torchvision.transforms as T
from PIL import Image

# ModerationResult is the dataclass defined in the Text Moderation section.


class ImageModerationPipeline:
    """
    Multi-stage image moderation:
    1. PhotoDNA hash matching (CSAM, known illegal content) - instant
    2. Perceptual hash matching (spam, known violations) - fast
    3. Neural classifier (novel violations) - slower
    """

    def __init__(self, phash_db, classifier_model):
        self.phash_db = phash_db
        self.classifier = classifier_model
        self.transform = T.Compose([
            T.Resize((224, 224)),
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def moderate(self, image_bytes: bytes, content_id: str) -> list:
        results = []

        # Stage 1: PhotoDNA hash matching for CSAM
        # PhotoDNA is a Microsoft technology that computes a robust hash.
        # The hash is compared against databases of known CSAM such as NCMEC's.
        # If a match is found, auto-remove immediately.
        csam_match = self._check_photodna(image_bytes)
        if csam_match:
            return [ModerationResult(
                content_id=content_id,
                policy_category="csam",
                confidence=1.0,
                action="remove",
                explanation="PhotoDNA match - NCMEC database",
            )]

        # Stage 2: Perceptual hash for known spam/copyright
        phash = self._compute_perceptual_hash(image_bytes)
        # An empty hash means hashing failed; skip the lookup rather than query with ""
        phash_matches = (
            self.phash_db.lookup(phash, threshold=5)  # threshold = max Hamming distance
            if phash else []
        )
        if phash_matches:
            category = phash_matches[0]["category"]
            results.append(ModerationResult(
                content_id=content_id,
                policy_category=category,
                confidence=0.95,
                action="remove" if category in ("csam", "known_spam") else "review",
                explanation=f"Perceptual hash match: {phash_matches[0]['match_id']}",
            ))

        # Stage 3: Neural classifier for novel violations
        image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
        tensor = self.transform(image).unsqueeze(0)
        with torch.no_grad():
            logits = self.classifier(tensor)
        probas = torch.softmax(logits, dim=1).squeeze(0).cpu().numpy()

        # Classes: safe, adult, violence, graphic, hate_symbols
        categories = ["safe", "adult", "violence", "graphic", "hate_symbols"]
        # Skip index 0 ("safe"); softmax means at most one class can exceed 0.5
        for category, prob in zip(categories[1:], probas[1:]):
            if prob > 0.5:
                action = "remove" if prob > 0.85 else "review"
                results.append(ModerationResult(
                    content_id=content_id,
                    policy_category=category,
                    confidence=float(prob),
                    action=action,
                ))
        return results

    def _check_photodna(self, image_bytes: bytes) -> bool:
        """
        Query PhotoDNA service for CSAM hash match.
        PhotoDNA computes a robust hash that survives minor edits.
        In production: call Microsoft PhotoDNA API or NCMEC hash API.
        """
        # Placeholder - actual PhotoDNA is a licensed Microsoft service
        # integrated via SDK or API call
        return False

    def _compute_perceptual_hash(self, image_bytes: bytes) -> str:
        """
        Compute perceptual hash (pHash) for image deduplication.
        pHash is robust to minor image edits, resizing, and compression.
        Returns "" if the image cannot be hashed.
        """
        try:
            import imagehash  # optional dependency, imported lazily
            img = Image.open(io.BytesIO(image_bytes))
            return str(imagehash.phash(img))
        except Exception:
            # Covers a missing imagehash package as well as undecodable images
            return ""
```
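`phash_db.lookup` is not defined in this lesson. As a rough sketch of what such a lookup might do, the hypothetical `InMemoryPHashDB` below compares hex-encoded 64-bit hashes by Hamming distance. The class name, the `add` method, and the sample hashes are assumptions for illustration; a linear scan like this is only workable for small databases.

```python
class InMemoryPHashDB:
    """Hypothetical stand-in for phash_db: linear scan over known hex-encoded hashes."""

    def __init__(self):
        self._entries = []  # list of (hash_as_int, metadata dict)

    def add(self, hex_hash: str, match_id: str, category: str) -> None:
        metadata = {"match_id": match_id, "category": category}
        self._entries.append((int(hex_hash, 16), metadata))

    def lookup(self, hex_hash: str, threshold: int = 5) -> list:
        """Return metadata for entries within `threshold` differing bits, closest first."""
        query = int(hex_hash, 16)
        hits = []
        for known, meta in self._entries:
            # XOR leaves a 1 bit wherever the two hashes disagree
            distance = bin(query ^ known).count("1")
            if distance <= threshold:
                hits.append({**meta, "distance": distance})
        return sorted(hits, key=lambda h: h["distance"])


db = InMemoryPHashDB()
db.add("ffd8e0c0a0908880", match_id="spam-001", category="known_spam")

near_duplicate = "ffd8e0c0a0908883"  # differs from the stored hash in 2 bits
unrelated = "0000000000000000"

near_hits = db.lookup(near_duplicate, threshold=5)
far_hits = db.lookup(unrelated, threshold=5)
```

The returned dictionaries carry the `category` and `match_id` keys that `moderate` reads from the first match.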
Video Moderation
Video is the hardest moderation challenge: it is large (a 10-minute video at 30 fps is 18,000 frames), temporally structured (a violation may appear in only 2 seconds of a 10-minute video), and multimodal (you need audio, visual, and text/caption moderation).
```python
import cv2

# ImageModerationPipeline is the class defined in the Image Moderation section.


class VideoModerationPipeline:
    """
    Video moderation using frame sampling.
    The full video is not classified: frames are sampled at a fixed rate.
    A production system could sample adaptively based on motion and content.
    """

    def __init__(
        self,
        image_moderator: ImageModerationPipeline,
        audio_moderator,
        sample_rate: int = 2,  # frames per second
    ):
        self.image_moderator = image_moderator
        self.audio_moderator = audio_moderator
        self.sample_rate = sample_rate

    def moderate_video(
        self,
        video_path: str,
        content_id: str,
        max_duration_seconds: int = 600,  # cap at 10 minutes
    ) -> dict:
        """
        Moderate video by sampling frames.
        Audio moderation is not wired in here: audio_moderator is stored,
        but its interface is not defined in this lesson.
        Returns: {
            "worst_frame_score": float,
            "violation_timestamps": [float],
            "frame_violations": [dict],
            "overall_decision": str,
        }
        """
        frame_results = []
        violation_timestamps = []

        cap = cv2.VideoCapture(video_path)
        # Some containers report 0 FPS; fall back to 30 to avoid dividing by zero
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        frame_interval = max(1, int(fps / self.sample_rate))
        # CAP_PROP_FRAME_COUNT can be 0 or wrong for some files, so rely on
        # the duration cap and on cap.read() failing at end of stream
        max_frames = int(max_duration_seconds * fps)

        frame_idx = 0
        while cap.isOpened() and frame_idx < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % frame_interval == 0:
                # Convert frame to JPEG bytes for the image moderator
                _, buffer = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
                frame_bytes = buffer.tobytes()

                # Check frame against image moderator
                frame_violations = self.image_moderator.moderate(
                    frame_bytes, f"{content_id}_frame_{frame_idx}"
                )
                for v in frame_violations:
                    if v.confidence > 0.5:
                        timestamp = frame_idx / fps
                        frame_results.append({
                            "timestamp": timestamp,
                            "category": v.policy_category,
                            "confidence": v.confidence,
                        })
                        violation_timestamps.append(timestamp)

                # Early stopping: if we find CSAM or severe violence, stop immediately
                if any(
                    v.policy_category in ("csam", "violence") and v.confidence > 0.9
                    for v in frame_violations
                ):
                    break
            frame_idx += 1
        cap.release()

        # Determine overall decision from worst frame
        worst_score = max(
            [r["confidence"] for r in frame_results], default=0.0
        )
        decision = (
            "remove" if worst_score > 0.85
            else "review" if worst_score > 0.5
            else "allow"
        )
        return {
            "worst_frame_score": worst_score,
            "violation_timestamps": violation_timestamps,
            "frame_violations": frame_results,
            "overall_decision": decision,
        }
```
Active Learning for Efficient Annotation
You cannot afford to label all content for training. Active learning selects the most informative examples for human annotation - examples where the model is uncertain and where labeling would improve performance most.
```python
import numpy as np
from typing import Callable


class ModerationActiveLearner:
    """
    Active learning for content moderation annotation.
    Selects examples for human labeling based on model uncertainty.
    """

    def score_uncertainty(self, probabilities: np.ndarray) -> float:
        """
        Uncertainty sampling: prefer examples where the model is most uncertain.
        For one label, uncertainty is 0 when p=0 or p=1 and maximal at p=0.5.
        For multi-label output, the per-label binary entropies are summed.
        """
        # Clip to avoid log(0)
        p = np.clip(probabilities, 1e-7, 1 - 1e-7)
        entropy = -np.sum(p * np.log(p) + (1 - p) * np.log(1 - p))
        return float(entropy)

    def select_for_annotation(
        self,
        content_batch: list,
        model_predict: Callable,
        n_to_annotate: int = 100,
        high_reach_boost: float = 2.0,
    ) -> list:
        """
        Select the most valuable examples for human annotation.
        Combines uncertainty with reach (high-reach content matters more).
        content_batch: [{"content_id": ..., "content": ..., "reach": float}]
        """
        scored = []
        for item in content_batch:
            probs = model_predict(item["content"])
            uncertainty = self.score_uncertainty(probs)
            # Reach is capped at 1.0 once content passes one million views
            reach_score = min(item.get("reach", 1.0) / 1000000, 1.0)
            # Additive score: either high uncertainty or high reach raises priority,
            # and content with both ranks highest
            combined_score = uncertainty + high_reach_boost * reach_score
            scored.append({
                **item,
                "uncertainty": uncertainty,
                "reach_score": reach_score,
                "annotation_priority": combined_score,
            })
        # Return top N by combined priority
        return sorted(scored, key=lambda x: -x["annotation_priority"])[:n_to_annotate]
```
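To see how the entropy score behaves, the standalone sketch below recomputes the same summed binary entropy with the standard library for two hypothetical probability vectors. The function name `binary_entropy_sum` and the example values are illustrative assumptions.

```python
import math


def binary_entropy_sum(probabilities: list) -> float:
    """Sum of per-label binary entropies in nats (same formula as score_uncertainty)."""
    total = 0.0
    for p in probabilities:
        p = min(max(p, 1e-7), 1 - 1e-7)  # clip to avoid log(0)
        total -= p * math.log(p) + (1 - p) * math.log(1 - p)
    return total


confident = [0.01, 0.99, 0.02]   # model is sure about every label
borderline = [0.50, 0.45, 0.55]  # model sits near the decision boundary

scores = {
    "confident": binary_entropy_sum(confident),
    "borderline": binary_entropy_sum(borderline),
}
# Highest uncertainty first: this is the order items would be sent for labeling
ranked = sorted(scores, key=scores.get, reverse=True)
```

Each label contributes at most ln 2 (about 0.693) to the score, so with seven categories the uncertainty term tops out near 4.85, which gives a sense of scale for `high_reach_boost`.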
Human Review Queue Design
The human review queue must prioritize high-impact content and ensure reviewer wellbeing (reviewing graphic content is psychologically harmful).
```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class ReviewPriority(int, Enum):
    CRITICAL = 1  # live violence, imminent threat
    HIGH = 2      # high-confidence policy violation, high reach
    NORMAL = 3    # medium-confidence, normal reach
    LOW = 4       # low-confidence, low reach


@dataclass
class ReviewQueueItem:
    content_id: str
    content_type: str  # "text", "image", "video"
    policy_category: str
    ml_confidence: float
    reach_estimate: int  # estimated audience if not removed
    submitted_at: datetime
    priority: ReviewPriority
    reviewer_id: Optional[str] = None  # assigned when claimed


class ReviewQueueManager:
    """
    Manages the human review queue with priority scheduling.
    Design goals:
    - Critical items reviewed within 1 hour
    - Reviewer rotation to prevent burnout on severe content
    - Fair distribution of content types across reviewers
    This class implements only priority ordering. SLA escalation and the
    per-reviewer cap below are not enforced here.
    """

    MAX_SEVERE_CONTENT_PER_REVIEWER_PER_SHIFT = 50

    def __init__(self, redis_client):
        self.redis = redis_client

    def enqueue(self, item: ReviewQueueItem) -> None:
        """Add item to appropriate priority queue."""
        queue_key = f"review_queue:priority_{item.priority.value}"
        # Score by submission time so ZPOPMIN returns the oldest item in the
        # tier first (every item in one queue shares the same priority)
        self.redis.zadd(
            queue_key,
            {item.content_id: item.submitted_at.timestamp()},
        )
        # Store item metadata
        self.redis.hset(
            f"review_item:{item.content_id}",
            mapping={
                "content_type": item.content_type,
                "policy_category": item.policy_category,
                "ml_confidence": str(item.ml_confidence),
                "reach_estimate": str(item.reach_estimate),
                "submitted_at": item.submitted_at.isoformat(),
                "priority": str(item.priority.value),
            }
        )
        # Expire metadata after 24 hours. Expiry only deletes the hash;
        # escalating items that miss their SLA needs a separate job.
        self.redis.expire(f"review_item:{item.content_id}", 86400)

    def claim_item(self, reviewer_id: str) -> dict:
        """Claim the highest-priority unclaimed item."""
        for priority in ReviewPriority:  # iterates CRITICAL first
            queue_key = f"review_queue:priority_{priority.value}"
            items = self.redis.zpopmin(queue_key, 1)
            if items:
                content_id = items[0][0]
                # Decode before building the metadata key; otherwise the key
                # would contain the bytes repr (b'...') and never match
                if isinstance(content_id, bytes):
                    content_id = content_id.decode()
                item_data = self.redis.hgetall(f"review_item:{content_id}")
                item_data = {
                    k.decode() if isinstance(k, bytes) else k:
                    v.decode() if isinstance(v, bytes) else v
                    for k, v in item_data.items()
                }
                item_data["content_id"] = content_id
                item_data["claimed_by"] = reviewer_id
                item_data["claimed_at"] = datetime.now(timezone.utc).isoformat()
                return item_data
        return {}  # no items available

    def submit_decision(
        self,
        content_id: str,
        reviewer_id: str,
        decision: str,  # "remove", "allow", "age_restrict"
        policy_category: str,
        note: str = "",
    ) -> None:
        """Record reviewer decision and update ML training data."""
        decision_record = {
            "content_id": content_id,
            "reviewer_id": reviewer_id,
            "decision": decision,
            "policy_category": policy_category,
            "note": note,
            "decided_at": datetime.now(timezone.utc).isoformat(),
        }
        # Store decision as JSON (used as training label for model retraining);
        # str(dict) would produce a Python repr that is awkward to parse later
        self.redis.lpush(
            "review_decisions",
            json.dumps(decision_record),
        )
```
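`MAX_SEVERE_CONTENT_PER_REVIEWER_PER_SHIFT` is declared in the queue manager, but nothing in it enforces the cap. One possible enforcement, sketched in memory rather than in Redis, is to count severe items per reviewer and skip severe work once the cap is reached. The names `ShiftExposureTracker`, `pick_item`, and `SEVERE_CATEGORIES`, and the tiny cap used in the demo, are assumptions for illustration.

```python
from collections import defaultdict

SEVERE_CATEGORIES = {"csam", "violence", "graphic"}  # assumed set of severe categories


class ShiftExposureTracker:
    """Hypothetical in-memory counter of severe items reviewed per reviewer per shift."""

    def __init__(self, max_severe_per_shift: int = 50):
        self.max_severe_per_shift = max_severe_per_shift
        self._severe_counts = defaultdict(int)

    def can_review(self, reviewer_id: str, policy_category: str) -> bool:
        if policy_category not in SEVERE_CATEGORIES:
            return True
        return self._severe_counts[reviewer_id] < self.max_severe_per_shift

    def record(self, reviewer_id: str, policy_category: str) -> None:
        if policy_category in SEVERE_CATEGORIES:
            self._severe_counts[reviewer_id] += 1

    def reset_shift(self, reviewer_id: str) -> None:
        self._severe_counts.pop(reviewer_id, None)


def pick_item(pending: list, reviewer_id: str, tracker: ShiftExposureTracker):
    """Return the first pending item (already priority-ordered) this reviewer may take."""
    for item in pending:
        if tracker.can_review(reviewer_id, item["policy_category"]):
            return item
    return None


tracker = ShiftExposureTracker(max_severe_per_shift=2)  # tiny cap to keep the demo short
for _ in range(2):
    tracker.record("reviewer_a", "graphic")

pending = [
    {"content_id": "c1", "policy_category": "graphic"},
    {"content_id": "c2", "policy_category": "spam"},
]
chosen_for_a = pick_item(pending, "reviewer_a", tracker)  # reviewer_a has hit the cap
chosen_for_b = pick_item(pending, "reviewer_b", tracker)  # reviewer_b has not
```

In a Redis-backed version the counter would more likely be a per-reviewer key with a shift-length TTL, but that design is outside what the lesson specifies.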
Adversarial Robustness
Spammers and bad actors actively try to evade moderation. Common evasion techniques and defenses:
Text evasion: using character substitutions ("h@te" for "hate"), adding zero-width spaces between characters, translating to a less-monitored language. Defense: normalize text before classification (map character variants, remove invisible characters, translate to English for common languages).
Image evasion: adding imperceptible noise (adversarial examples), overlaying patterns that fool neural networks, using filters or color shifts. Defense: train with augmented adversarial examples, use ensemble of diverse models, implement perceptual hash matching as a non-neural backstop.
Video evasion: flipping the video horizontally, adding visual noise in the first/last frames (knowing moderation samples frames), embedding policy-violating content at low frequency among benign frames. Defense: random frame sampling (adversary cannot predict which frames are checked), temporal consistency checking (flag videos where a small number of frames are flagged while the rest are clean - suspicious pattern).
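```python
import re
import unicodedata


class TextNormalizer:
    """
    Normalize text to counter common evasion techniques.
    """

    CHAR_SUBSTITUTIONS = {
        "@": "a", "4": "a", "0": "o", "1": "i", "3": "e",
        "$": "s", "5": "s", "!": "i", "|": "l", "7": "t",
    }

    def normalize(self, text: str) -> str:
        # 1. Remove zero-width and other invisible characters.
        #    Cf covers zero-width spaces and joiners; Cc covers control characters.
        #    Ordinary spaces (category Zs) are kept so words stay separated.
        text = "".join(
            c for c in text
            if c in "\n\t" or unicodedata.category(c) not in ("Cf", "Cc")
        )
        # 2. NFKD folds compatibility forms (fullwidth letters, ligatures) into
        #    base characters; dropping combining marks then strips accents.
        #    Cross-script look-alikes (e.g. Cyrillic letters) need a separate map.
        text = unicodedata.normalize("NFKD", text)
        text = "".join(c for c in text if not unicodedata.combining(c))
        # 3. Common character substitutions (l33tspeak).
        #    This also rewrites genuine digits and punctuation ("100" -> "ioo"),
        #    so classify the normalized text alongside the original, not instead of it.
        normalized = "".join(
            self.CHAR_SUBSTITUTIONS.get(char, char) for char in text.lower()
        )
        # 4. Collapse runs of 3+ repeated characters down to 2 ("haaaate" -> "haate").
        #    Keeping two preserves legitimate doubles such as "good".
        normalized = re.sub(r"(.)\1{2,}", r"\1\1", normalized)
        return normalized
```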
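The random frame sampling defense described above for video can be sketched as stratified sampling: split the video into equal windows and pick one frame at a random offset inside each, so coverage stays even while the exact frames stay unpredictable. The function name and parameters below are illustrative assumptions, not part of the pipeline code in this lesson.

```python
import random


def jittered_frame_indices(
    total_frames: int,
    fps: float,
    samples_per_second: float = 2.0,
    rng=None,
) -> list:
    """Pick one random frame index from each sampling window."""
    # A seeded Random is only for reproducible tests; see the note below
    rng = rng or random.Random()
    window = max(1, int(fps / samples_per_second))
    indices = []
    for start in range(0, total_frames, window):
        end = min(start + window, total_frames)
        indices.append(rng.randrange(start, end))
    return indices


# 10 seconds of 30 fps video, sampled at 2 frames per second
indices = jittered_frame_indices(total_frames=300, fps=30.0, rng=random.Random(0))
```

Against a real adversary the generator should not be predictable; passing `random.SystemRandom()` as `rng` is one option, since it draws from the operating system's entropy source.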
:::danger Over-Removal Destroys Platform Trust
False positives in moderation - removing legitimate content - are often treated as less serious than false negatives (missing policy violations). They are equally serious. Over-removal destroys trust among the creator community, chills legitimate speech (creators self-censor out of fear of being incorrectly removed), and creates a hostile environment for minority communities whose speech is disproportionately flagged (documented in multiple platform transparency reports).
Maintain a separate metric dashboard for false positive rate across demographic groups. If a particular language, community, or content category has a significantly higher false positive rate than the platform average, investigate and fix before deploying model updates. :::
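A per-group false positive rate can be computed from human-reviewed decisions. The sketch below assumes each record carries a hypothetical `group` label (for example, content language), whether the system flagged the item, and the human-verified ground truth. The field names, the sample data, and the 2x alert ratio are all assumptions.

```python
from collections import defaultdict


def false_positive_rates(records: list) -> dict:
    """FPR per group = flagged benign items / all benign items in that group."""
    flagged_benign = defaultdict(int)
    benign = defaultdict(int)
    for r in records:
        if not r["violating"]:  # ground truth from human review
            benign[r["group"]] += 1
            if r["flagged"]:
                flagged_benign[r["group"]] += 1
    return {group: flagged_benign[group] / n for group, n in benign.items()}


def overall_false_positive_rate(records: list) -> float:
    benign = [r for r in records if not r["violating"]]
    return sum(1 for r in benign if r["flagged"]) / len(benign)


def groups_over_baseline(rates: dict, overall: float, max_ratio: float = 2.0) -> list:
    """Groups whose FPR exceeds max_ratio times the platform-wide rate."""
    return sorted(g for g, rate in rates.items() if rate > max_ratio * overall)


# Synthetic example: all items are benign, some were wrongly flagged
records = (
    [{"group": "en", "flagged": False, "violating": False}] * 98
    + [{"group": "en", "flagged": True, "violating": False}] * 2
    + [{"group": "sw", "flagged": False, "violating": False}] * 45
    + [{"group": "sw", "flagged": True, "violating": False}] * 5
)
rates = false_positive_rates(records)
overall = overall_false_positive_rate(records)
needs_investigation = groups_over_baseline(rates, overall)
```

A check like this could run as a gate before a model update ships, blocking deployment when any group appears in `needs_investigation`.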
:::warning Reviewer Wellbeing
Content moderators review harmful, disturbing, and traumatic material at volume. Multiple studies have documented PTSD rates of 20-40% among content reviewers who work on the most severe categories (CSAM, graphic violence, live violence). This is a genuine engineering responsibility: design systems that limit reviewer exposure to the most severe content, require mandatory breaks and rotation, provide mental health support infrastructure, and allow reviewers to choose to exclude themselves from severe content categories.
Technology can help: auto-blur or desaturate images before human review, display only thumbnails initially with click-to-expand, randomize the order to prevent numbing from pattern exposure. But these are mitigations, not solutions. Human review of severe content at scale is a harm that the platform creates and must take responsibility for. :::
Interview Q&A
Q1: How does a content moderation system handle 500 hours of video upload per minute?
No single moderation pipeline processes all 500 hours simultaneously. Instead, the system uses a priority-tiered approach. First, all content is fingerprinted (PhotoDNA for CSAM, perceptual hash for known spam) - this is essentially instant and catches known violations without any model inference. Second, all content goes through a lightweight classifier (a fast MobileNet or DistilBERT equivalent) that produces an initial risk score within seconds. Third, only content above a risk threshold enters the full pipeline (deeper classifiers, video frame sampling, audio transcription, human review queue).
The key numbers: at 500 hours/minute, you have roughly 30,000 video files per minute (assuming 1-minute average length), or 500 per second. At 30ms for fingerprinting, 100ms for fast classification, and 2 seconds for full video scanning, and assuming each machine handles one item at a time: fingerprinting requires 15 machines (500 x 0.03), fast classification requires 50 machines (500 x 0.1), and full scanning of 20% of videos requires 200 machines (100 x 2). This is manageable at cloud scale. The full pipeline is asynchronous - users can post immediately, content is reviewed within 30-60 seconds, and removed retroactively if violations are found.
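Machine counts of this kind follow from multiplying the arrival rate by the per-item service time (Little's law). The sketch below assumes each machine processes one item at a time with no batching or headroom, which is a simplification; the function name is illustrative.

```python
import math


def machines_needed(items_per_second: int, ms_per_item: int) -> int:
    """Busy workers required = arrival rate x service time, rounded up."""
    return math.ceil(items_per_second * ms_per_item / 1000)


# 500 hours of upload per minute, assuming 1-minute videos
videos_per_minute = 500 * 60
videos_per_second = videos_per_minute // 60

fingerprinting = machines_needed(videos_per_second, ms_per_item=30)
fast_classification = machines_needed(videos_per_second, ms_per_item=100)
# Only the riskiest 20% of videos reach the full scan
full_scan = machines_needed(videos_per_second // 5, ms_per_item=2000)
```

Real deployments batch requests and add headroom for traffic peaks, so these figures are a lower-bound sanity check rather than a capacity plan.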
Q2: How do you balance precision and recall differently for different harm categories?
The precision-recall trade-off is category-specific, driven by the severity and reversibility of harm.
For CSAM: recall is paramount. A false negative (missing CSAM) means child exploitation material remains on the platform. The threshold is set for near-100% recall, accepting higher false positive rates. False positives (removing legal content) are investigated and corrected by human review.
For hate speech: more balanced. Setting the threshold too low (high recall) over-removes legitimate speech, news reporting, satire, and counter-speech. Setting it too high (high precision) misses harmful content. Typically target 90%+ precision with 70-80% recall, with human review handling the boundary cases.
For spam: precision matters more. Incorrectly blocking legitimate users is a poor experience and increases support costs. Set precision target at 99%+ and handle lower recall by allowing users to report spam that the automated system missed.
The general principle: severity of harm (CSAM vs spam), reversibility of the action (removal vs review queue), and cost of false positives (creator trust, legal liability) drive the threshold setting for each category independently.
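Per-category thresholds are typically chosen on a labeled validation set. As a minimal sketch, the function below picks the highest threshold that still meets a recall target, which keeps precision as high as that recall allows. The validation data and function names are hypothetical.

```python
def threshold_for_recall(scores: list, labels: list, target_recall: float) -> float:
    """Highest score threshold whose recall on the validation set meets the target."""
    total_positives = sum(labels)
    if total_positives == 0:
        raise ValueError("need at least one positive example")
    # Try candidate thresholds from strictest to most lenient
    for threshold in sorted(set(scores), reverse=True):
        caught = sum(1 for s, y in zip(scores, labels) if y and s >= threshold)
        if caught / total_positives >= target_recall:
            return threshold
    # Only reached if target_recall > 1, which no threshold can satisfy
    return min(scores)


def precision_at(scores: list, labels: list, threshold: float) -> float:
    flagged = [y for s, y in zip(scores, labels) if s >= threshold]
    return sum(flagged) / len(flagged) if flagged else 1.0


# Hypothetical validation scores with ground-truth labels (1 = violating)
val_scores = [0.95, 0.90, 0.80, 0.70, 0.60, 0.40, 0.30, 0.10]
val_labels = [1, 1, 0, 1, 0, 1, 0, 0]

# Severe category: catch everything, accept lower precision
strict = threshold_for_recall(val_scores, val_labels, target_recall=1.0)
# Less severe category: settle for half the violations at higher precision
lenient = threshold_for_recall(val_scores, val_labels, target_recall=0.5)

strict_precision = precision_at(val_scores, val_labels, strict)
lenient_precision = precision_at(val_scores, val_labels, lenient)
```

On this toy data the full-recall threshold is much lower than the half-recall one and its precision drops accordingly, which is the trade-off each category has to settle independently.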
Q3: How does active learning improve moderation model quality?
Active learning reduces annotation cost by selecting the most informative examples for human labeling. Without active learning, you might randomly sample 10,000 examples from a content pool that is 99.9% benign and get only 10 positive examples for training. With active learning, you select examples where the model is most uncertain - these are the decision boundary examples that, once labeled, teach the model the most.
For moderation, uncertainty sampling identifies the hardest cases: content that the model scores at 0.4-0.6 (near the decision boundary). These are the genuinely ambiguous cases - satire that resembles hate speech, graphic content in a news reporting context, violence in an educational documentary. Labeling these cases improves the model's ability to make these contextual distinctions.
Reach-weighted selection adds an important signal: prioritize labeling content that has reached many users. A borderline video that has been viewed 10 million times is more important to label correctly than one with 10 views. The active learner combines uncertainty and reach into a single priority score.
Q4: How do you detect and counter adversarial evasion attempts?
Adversarial evasion is a cat-and-mouse game. Common patterns: character substitution (l33tspeak, look-alike Unicode), image manipulation (adding noise, flipping, adding benign overlay frames), volume attacks (flooding with borderline content to overwhelm reviewers).
The defenses: (1) text normalization as a preprocessing step - map all character variants to their canonical forms before classification; (2) ensemble classifiers - a single classifier, even one tuned against adversarial examples, is easier to evade than an ensemble of diverse models trained on different data and architectures; (3) behavioral signals - legitimate users do not systematically test what gets flagged, so systematic probing of the moderation system is itself a signal; (4) platform-level rate limits - an account submitting 1,000 pieces of content per day is suspicious regardless of individual content quality; (5) hash matching - known violations are fingerprinted and any near-duplicate (perceptual hash distance under threshold) is immediately flagged regardless of content changes.
Q5: How does Meta's content moderation infrastructure handle 100 billion pieces of content per day?
Meta processes 100 billion pieces of content daily through a layered system. The first layer is proactive detection - before a user even posts, the platform knows the content type, size, and basic features. Text goes through lightweight n-gram classifiers, images go through perceptual hash lookup and a fast CNN. This first layer runs at submission time and handles 90%+ of decisions in under 100ms.
The second layer is distributed classification at scale. Meta uses a distributed inference infrastructure (Triton + custom serving) that runs hundreds of specialized classifiers (hate speech in 50 languages, violence, spam, NSFW, coordinated inauthentic behavior). These run asynchronously after submission and result in delayed actions (content removed hours after posting).
The third layer is human review. Meta's Community Operations team of 15,000 reviewers handles appeals, novel harm categories, and high-confidence-but-borderline cases. Reviewers specialize by category and language. High-severity categories (live violence, CSAM) have dedicated 24/7 teams.
The key architectural choice: proactive detection (before reaching many users) vs reactive detection (after reaching many users). Meta's Integrity systems prioritize proactive detection for the highest-severity categories. For lower-severity categories, reactive detection based on user reports + automated classifiers is more cost-effective.
Summary
A production content moderation system processes text, images, and video through a multi-stage pipeline: fingerprint matching (known violations, instant), fast classifiers (lightweight models for initial risk scoring, seconds), full classification (deep models for uncertain cases, minutes), and human review (edge cases, appeals, severe content). Different harm categories demand different precision/recall trade-offs: CSAM requires near-100% recall, accepting lower precision; hate speech requires balanced precision and recall informed by false positive impact on creator communities. Active learning focuses annotation effort on uncertain and high-reach content. Adversarial robustness requires text normalization, ensemble models, behavioral signals, and hash-based matching. Human reviewer wellbeing is a design requirement, not an afterthought - limit exposure duration, provide rotation, and offer mental health support for all severe content categories.
