Secure Coding Patterns - Defense in Depth

Before you read any further, study this FastAPI application and count the security issues:

import traceback

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(debug=True)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/api/users/{user_id}")
async def get_user(user_id: int):
    try:
        user = await db.get_user(user_id)
        return user
    except Exception as e:
        return {"error": str(e), "traceback": traceback.format_exc()}

There are at least six security issues in this code. By the end of this lesson, you will understand defense in depth - the practice of applying multiple, overlapping security controls so that no single failure compromises the system.

What You Will Learn

  • The defense in depth principle and why single controls fail
  • The least privilege principle applied to Python applications
  • How to configure CORS correctly in FastAPI
  • How to implement rate limiting with slowapi
  • How to set security headers (CSP, HSTS, X-Frame-Options)
  • How to audit dependencies with pip-audit and safety
  • How to run bandit for static security analysis
  • The OWASP Top 10 mapped to Python-specific patterns
  • A complete hardening checklist for FastAPI applications

Prerequisites

  • All previous lessons in this module (Lessons 01-06)
  • FastAPI middleware and dependency injection (from Intermediate course)
  • Understanding of HTTP headers and CORS
  • pip install slowapi pip-audit bandit safety

Part 1 - Defense in Depth

Defense in depth means applying security at every layer: network, rate limiting, authentication, input validation, database queries, and output handling. If one layer fails, the next layer catches the attack.

No single layer is sufficient. Input validation can miss edge cases. Authentication can have bugs. Rate limiting can be bypassed. But when all layers work together, an attacker must defeat every one of them, which is far harder than defeating any single control.
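To make the layering idea concrete, here is a minimal sketch in plain Python. The names (IncomingRequest, check_rate_limit, check_auth, check_input) and the hardcoded token are hypothetical and exist only for illustration; a real application would use the middleware and dependencies shown later in this lesson.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class IncomingRequest:
    client_ip: str
    token: Optional[str]
    payload: dict


# Hypothetical controls. Each returns an error message, or None if it passes.
def check_rate_limit(req: IncomingRequest, seen: dict) -> Optional[str]:
    seen[req.client_ip] = seen.get(req.client_ip, 0) + 1
    return "rate limit exceeded" if seen[req.client_ip] > 3 else None


def check_auth(req: IncomingRequest) -> Optional[str]:
    # Stand-in for real token verification
    return None if req.token == "valid-token" else "unauthenticated"


def check_input(req: IncomingRequest) -> Optional[str]:
    name = req.payload.get("name")
    ok = isinstance(name, str) and 0 < len(name) <= 50
    return None if ok else "invalid input"


def handle(req: IncomingRequest, seen: dict) -> str:
    """Run every layer in order; the first failing layer rejects the request."""
    layers = (lambda r: check_rate_limit(r, seen), check_auth, check_input)
    for layer in layers:
        error = layer(req)
        if error:
            return f"rejected: {error}"
    return "accepted"
```

Each layer is independent: a bug in check_input does not weaken check_auth, and a stolen token still runs into the rate limit.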

Part 2 - Least Privilege Principle

Every component should have the minimum permissions required to do its job:

Database Users

-- WRONG: Application uses the admin account
-- CREATE USER app WITH SUPERUSER PASSWORD 'secret';

-- RIGHT: Separate users with minimal permissions
CREATE USER app_readonly WITH PASSWORD 'secret1';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO app_readonly;

CREATE USER app_readwrite WITH PASSWORD 'secret2';
GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA public TO app_readwrite;
-- Note: No DELETE, no DROP, no ALTER

CREATE USER app_migration WITH PASSWORD 'secret3';
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO app_migration;
-- Used only by Alembic migrations, never by the application

File System Permissions

import os
import stat
from pathlib import Path

def secure_file_permissions(filepath: str) -> None:
    """Set file permissions to owner-read-only."""
    path = Path(filepath)
    # chmod replaces the whole mode, so only the owner-read bit remains set
    os.chmod(path, stat.S_IRUSR)  # 0o400 - owner can read, nothing else

def secure_directory_permissions(dirpath: str) -> None:
    """Set directory permissions to owner-read-execute."""
    path = Path(dirpath)
    os.chmod(path, stat.S_IRUSR | stat.S_IXUSR)  # 0o500

# For secret files
secure_file_permissions("/app/secrets/private_key.pem")
secure_file_permissions("/app/.env")
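Setting permissions is only half of the control; it also helps to verify them at startup. The following is a small sketch of such a check, assuming a POSIX file system. The helper name is_owner_only is hypothetical.

```python
import os
import stat
import tempfile


def is_owner_only(path: str) -> bool:
    """Return True if neither group nor others have any permission bit set."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    return mode & (stat.S_IRWXG | stat.S_IRWXO) == 0


# Demonstration on a throwaway file (POSIX permissions assumed)
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    secret_path = tmp.name

os.chmod(secret_path, 0o644)          # world-readable: too permissive
loose = is_owner_only(secret_path)

os.chmod(secret_path, 0o400)          # owner-read-only
tight = is_owner_only(secret_path)

os.remove(secret_path)
```

An application could refuse to start when a secret file fails this check, rather than silently running with readable secrets.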

API Scope Restriction

from fastapi import Depends, HTTPException, status
from typing import Annotated

class CurrentUser:
    def __init__(self, user_id: str, roles: list[str], scopes: list[str]):
        self.user_id = user_id
        self.roles = roles
        self.scopes = scopes

# get_current_user is the application's authentication dependency,
# assumed to be defined elsewhere and to return a CurrentUser.

def require_scope(*required_scopes: str):
    """Dependency that enforces fine-grained scopes (ALL listed scopes are required)."""
    async def checker(
        user: Annotated[CurrentUser, Depends(get_current_user)],
    ) -> CurrentUser:
        missing = set(required_scopes) - set(user.scopes)
        if missing:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing required scopes: {', '.join(sorted(missing))}",
            )
        return user
    return checker

# Endpoints with specific scope requirements
@app.get("/api/courses")
async def list_courses(
    user: Annotated[CurrentUser, Depends(require_scope("courses:read"))],
):
    return {"courses": [...]}

@app.post("/api/courses")
async def create_course(
    user: Annotated[CurrentUser, Depends(require_scope("courses:write"))],
):
    return {"message": "Created"}

@app.delete("/api/courses/{course_id}")
async def delete_course(
    user: Annotated[CurrentUser, Depends(require_scope("courses:delete", "admin"))],
):
    return {"message": "Deleted"}
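The core of require_scope is a set difference, which means every listed scope must be present. A stripped-down sketch of just that logic, with a hypothetical helper name missing_scopes, makes the behavior easy to test without FastAPI:

```python
def missing_scopes(required: list[str], granted: list[str]) -> list[str]:
    """Return the required scopes the user does not hold, in sorted order."""
    return sorted(set(required) - set(granted))


# A user holding only courses:delete cannot call an endpoint that
# requires both courses:delete and admin.
print(missing_scopes(["courses:delete", "admin"], ["courses:delete"]))
```

If you want "any of these scopes" semantics instead, the check would be an intersection test rather than a difference.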

Part 3 - CORS Configuration

Cross-Origin Resource Sharing controls which domains can make requests to your API from a browser. Misconfigured CORS is one of the most common web security mistakes:

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# VULNERABLE - allows any origin with credentials
# This is NEVER correct in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],     # Any website can call your API
    allow_credentials=True,  # Cookies are sent with requests
    allow_methods=["*"],
    allow_headers=["*"],
)
# An attacker's website can make authenticated requests to your API
# using the victim's cookies

Danger: allow_origins=["*"] with allow_credentials=True is a critical vulnerability. It allows any website to make authenticated requests to your API: the browser sends the user's cookies, and your API processes the request as if it came from the user. This enables cross-site request forgery and lets a malicious page read the authenticated responses, from any origin.

# SECURE - explicit origin whitelist
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://engineersofai.com",
        "https://www.engineersofai.com",
        "http://localhost:3000",  # Development only - remove in production
    ],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
    expose_headers=["X-Request-ID"],
    max_age=600,  # Cache preflight for 10 minutes
)

Dynamic CORS for Multi-Tenant Applications

import os

def get_allowed_origins() -> list[str]:
    """Load allowed origins from environment."""
    origins = os.environ.get("CORS_ALLOWED_ORIGINS", "")
    if not origins:
        return ["https://engineersofai.com"]
    return [o.strip() for o in origins.split(",") if o.strip()]

app.add_middleware(
    CORSMiddleware,
    allow_origins=get_allowed_origins(),
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
)

# In production .env:
# CORS_ALLOWED_ORIGINS=https://engineersofai.com,https://app.engineersofai.com
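Loading origins from the environment moves the risk into configuration: a stray "*" or an http:// origin in the variable would silently reopen the hole. One possible safeguard is to validate the value at startup. This is a sketch only; the function name parse_allowed_origins and the HTTPS-only rule are assumptions, not part of FastAPI.

```python
from urllib.parse import urlsplit


def parse_allowed_origins(raw: str) -> list[str]:
    """Parse a comma-separated origin list, rejecting wildcards and non-HTTPS entries."""
    origins = []
    for item in raw.split(","):
        origin = item.strip().rstrip("/")
        if not origin:
            continue
        parts = urlsplit(origin)
        # An origin is scheme + host (+ port): no path, no query string
        if parts.scheme != "https" or not parts.netloc or parts.path or parts.query:
            raise ValueError(f"Refusing CORS origin: {origin!r}")
        origins.append(origin)
    return origins
```

In development you would need a separate, explicit exception for http://localhost origins, since this sketch rejects them.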

Part 4 - Rate Limiting with slowapi

Rate limiting prevents abuse, brute force attacks, and denial of service:

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# Create limiter with key function
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Per-route limit: 100 requests per minute per IP
# (the route decorator must come first, and the endpoint must accept `request`)
@app.get("/api/courses")
@limiter.limit("100/minute")
async def list_courses(request: Request):
    return {"courses": [...]}

# Stricter limit for authentication endpoints
@app.post("/api/login")
@limiter.limit("5/minute")
async def login(request: Request):
    return {"token": "..."}

# Very strict limit for password reset
@app.post("/api/password-reset")
@limiter.limit("3/hour")
async def password_reset(request: Request):
    return {"message": "Reset email sent"}

# Custom key function for authenticated users
def get_user_id_or_ip(request: Request) -> str:
    """Rate limit by user ID if authenticated, otherwise by IP."""
    if hasattr(request.state, "user"):
        return f"user:{request.state.user.user_id}"
    return get_remote_address(request)

@app.post("/api/courses")
@limiter.limit("10/hour", key_func=get_user_id_or_ip)
async def create_course(request: Request):
    return {"message": "Created"}

Redis-Backed Rate Limiting (Distributed)

from slowapi import Limiter
from slowapi.util import get_remote_address

# For multi-instance deployments, use Redis as the storage backend
# (the redis package must be installed: pip install redis)
limiter = Limiter(
    key_func=get_remote_address,
    storage_uri="redis://localhost:6379/1",
)
# All instances share the same rate limit counters

Tip: Set different rate limits for different endpoint categories: public endpoints (100/min), authenticated endpoints (300/min), login/registration (5/min), password reset (3/hour). Always rate limit login endpoints aggressively to prevent credential stuffing attacks.
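If you are curious what a limit such as "5/minute" means mechanically, the sketch below implements a sliding-window counter in plain Python. It is an illustration of the general technique, not slowapi's actual implementation; the class name and the injected now parameter are assumptions made so the behavior is easy to test.

```python
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allow at most `limit` hits per key within any `window_seconds` span."""

    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window = window_seconds
        self.hits = defaultdict(deque)  # key -> timestamps of accepted hits

    def allow(self, key: str, now: float) -> bool:
        timestamps = self.hits[key]
        # Drop hits that have fallen out of the window
        while timestamps and now - timestamps[0] >= self.window:
            timestamps.popleft()
        if len(timestamps) >= self.limit:
            return False
        timestamps.append(now)
        return True


limiter_demo = SlidingWindowLimiter(limit=3, window_seconds=60)
results = [limiter_demo.allow("203.0.113.7", t) for t in (0, 1, 2, 3, 60)]
```

The in-memory dictionary is exactly why a shared store such as Redis is needed once more than one process serves traffic: each process would otherwise keep its own counters.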

Part 5 - Security Headers

HTTP security headers instruct the browser to enable security features:

from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)

        # Prevent MIME type sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"

        # Prevent clickjacking
        response.headers["X-Frame-Options"] = "DENY"

        # Control referrer information
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        # Enable HSTS (HTTP Strict Transport Security)
        response.headers["Strict-Transport-Security"] = (
            "max-age=63072000; includeSubDomains; preload"
        )

        # Content Security Policy
        response.headers["Content-Security-Policy"] = (
            "default-src 'self'; "
            "script-src 'self'; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' https:; "
            "connect-src 'self' https://api.engineersofai.com; "
            "frame-ancestors 'none'; "
            "base-uri 'self'; "
            "form-action 'self'"
        )

        # Permissions Policy (formerly Feature-Policy)
        response.headers["Permissions-Policy"] = (
            "camera=(), microphone=(), geolocation=(), "
            "payment=(), usb=(), magnetometer=()"
        )

        return response

app = FastAPI()
app.add_middleware(SecurityHeadersMiddleware)

Header Reference

Header | Purpose | Recommended Value
X-Content-Type-Options | Prevent MIME sniffing | nosniff
X-Frame-Options | Prevent clickjacking | DENY
Strict-Transport-Security | Force HTTPS | max-age=63072000; includeSubDomains
Content-Security-Policy | Control resource loading | Whitelist specific sources
Referrer-Policy | Control referrer leaking | strict-origin-when-cross-origin
Permissions-Policy | Disable browser features | Disable unused APIs
X-Request-ID | Request tracing | UUID per request
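A table like this is easy to drift away from in practice, so it can be worth checking responses automatically. The sketch below audits a plain dictionary of response headers against the table; the function name audit_headers and the choice of which headers need an exact value are assumptions for illustration.

```python
# Headers whose value must match exactly
EXPECTED_VALUES = {
    "x-content-type-options": "nosniff",
    "x-frame-options": "DENY",
    "referrer-policy": "strict-origin-when-cross-origin",
}
# Headers that only need to be present (their value is application-specific)
REQUIRED_PRESENT = ("strict-transport-security", "content-security-policy")


def audit_headers(headers: dict[str, str]) -> list[str]:
    """Return a list of problems found in a response's headers."""
    normalized = {name.lower(): value for name, value in headers.items()}
    problems = []
    for name, expected in EXPECTED_VALUES.items():
        actual = normalized.get(name)
        if actual is None:
            problems.append(f"missing {name}")
        elif actual != expected:
            problems.append(f"{name}: expected {expected!r}, got {actual!r}")
    for name in REQUIRED_PRESENT:
        if name not in normalized:
            problems.append(f"missing {name}")
    return problems
```

In a test suite you could pass it the headers of a response from your test client and assert that the returned list is empty.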

Part 6 - Dependency Auditing with pip-audit and safety

Third-party packages are a major attack surface. A single vulnerable dependency can compromise your entire application:

pip-audit (by PyPA, the Python Packaging Authority)

# Install
pip install pip-audit

# Audit all installed packages
pip-audit

# Example output:
# Name          Version  ID                  Fix Versions
# ------------  -------  ------------------  ------------
# cryptography  41.0.3   GHSA-jfhm-5gh6-2ck  41.0.4
# certifi       2023.7   GHSA-xqr8-7jwr-rhp  2023.7.22

# Audit with automatic fix suggestions
pip-audit --fix --dry-run

# Audit a requirements file
pip-audit -r requirements.txt

# Output as JSON for CI/CD
pip-audit --format=json --output=audit-results.json

safety

# Install
pip install safety

# Check installed packages
# (Safety 3.x deprecates `check` in favor of `safety scan`;
#  the commands below use the 2.x syntax)
safety check

# Check a requirements file
safety check -r requirements.txt

# Example output:
# +============================================+
# | REPORT                                     |
# +============================================+
# | package: django                            |
# | installed: 4.1.0                           |
# | affected: <4.1.13                          |
# | vulnerability: SQL injection in            |
# |   Django's admin interface                 |
# +============================================+

CI/CD Integration

# .gitlab-ci.yml
security-audit:
  stage: test
  script:
    - pip install pip-audit safety bandit
    - pip-audit -r requirements.txt  # exits non-zero when vulnerabilities are found
    - safety check -r requirements.txt --full-report
    - bandit -r app/ -ll  # Only medium and high severity
  allow_failure: false  # Block merge on vulnerabilities

# Pin dependencies with hashes for supply chain security
# pip install pip-tools
# pip-compile --generate-hashes requirements.in

# requirements.txt (generated by pip-compile)
# cryptography==42.0.0 \
#     --hash=sha256:abc123... \
#     --hash=sha256:def456...
# PyJWT==2.8.0 \
#     --hash=sha256:ghi789...

Note: Run pip-audit in your CI/CD pipeline on every merge request. Pin dependencies to exact versions with hash verification (--require-hashes) for production deployments. pip then refuses to install any file whose hash differs from the pinned one, which blocks supply chain attacks where a release file on PyPI is replaced or tampered with.
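Conceptually, hash pinning is a simple comparison: compute the SHA-256 of the downloaded file and accept it only if it matches one of the pinned values. The sketch below shows that idea in isolation. It is not pip's code; the function names and the byte strings standing in for package files are made up for illustration.

```python
import hashlib
import hmac


def sha256_hex(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def verify_artifact(data: bytes, pinned_hashes: set[str]) -> bool:
    """Accept the artifact only if its SHA-256 matches a pinned hash."""
    digest = sha256_hex(data)
    return any(hmac.compare_digest(digest, pinned) for pinned in pinned_hashes)


# At pin time (pip-compile --generate-hashes) the hash of the real file is recorded
original = b"pretend this is the wheel you reviewed"
pinned = {sha256_hex(original)}

# At install time, a modified file no longer matches
tampered = original + b" plus a backdoor"
```

Pinning only the version number would not catch the tampered file, because both files could carry the same version string.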

Part 7 - Static Security Analysis with bandit

bandit is a Python static analysis tool that finds common security issues in code:

# Install
pip install bandit

# Scan your project
bandit -r app/ -ll # -ll = medium and high severity only

# Example output:
# >> Issue: [B105:hardcoded_password_string] Possible hardcoded password
#    Severity: Low   Confidence: Medium
#    Location: app/config.py:15
#    More Info: https://bandit.readthedocs.io/...
#
# >> Issue: [B608:hardcoded_sql_expressions] Possible SQL injection
#    Severity: Medium   Confidence: Low
#    Location: app/routes/users.py:42
#    More Info: https://bandit.readthedocs.io/...
#
# >> Issue: [B603:subprocess_without_shell_equals_true] Starting a process
#    without a shell
#    Severity: Low   Confidence: High
#    Location: app/utils/dns.py:8

# Generate HTML report
bandit -r app/ -f html -o bandit-report.html

# Exclude specific rules
bandit -r app/ --skip B101 # Skip assert warnings

# Same threshold as -ll, written with the long option
bandit -r app/ --severity-level medium

Common bandit Rules

Rule | Issue | Severity
B101 | Use of assert (removed when Python runs with -O) | Low
B105 | Hardcoded password string | Low
B106 | Hardcoded password in function argument | Low
B108 | Probable insecure temp file usage | Medium
B301 | pickle usage (arbitrary code execution) | Medium
B303 | Use of insecure hash function (MD5/SHA1) | Medium
B608 | Possible SQL injection via string formatting | Medium
B602 | subprocess with shell=True | High
B605 | Starting process with a shell | High
B701 | Jinja2 templates without autoescape | High

Configuration File

# .bandit (INI format)
[bandit]
exclude = tests,venv,.venv
skips = B101
targets = app

# pyproject.toml alternative
# (bandit does not read this file automatically: run bandit -c pyproject.toml -r app/)
[tool.bandit]
exclude_dirs = ["tests", "venv"]
skips = ["B101"]
targets = ["app"]

# Suppressing false positives inline
import subprocess

# A "# nosec" comment on the flagged line suppresses the named check
result = subprocess.run(  # nosec B603
    ["git", "log", "--oneline", "-5"],
    capture_output=True,
    text=True,
)
# The above is safe because the command is a hardcoded list with no user input

Part 8 - OWASP Top 10 Mapped to Python

The OWASP Top 10 is a widely used reference list of the most critical web application security risks. The numbering below follows the 2021 edition. Here is how each risk maps to Python-specific patterns:

OWASP Risk | Python Pattern | Defense
A01: Broken Access Control | Missing role checks, IDOR | FastAPI Depends(), role middleware
A02: Cryptographic Failures | MD5 for passwords, no TLS | Argon2/bcrypt, TLS termination
A03: Injection | f-string SQL, shell=True | Parameterized queries, subprocess lists
A04: Insecure Design | No rate limiting, no MFA | slowapi, TOTP, architecture review
A05: Security Misconfiguration | debug=True, CORS * | Pydantic settings, security headers
A06: Vulnerable Components | Outdated packages | pip-audit, safety, dependabot
A07: Auth Failures | Weak passwords, no lockout | Argon2, account lockout, MFA
A08: Data Integrity Failures | pickle deserialization, no CI checks | JSON-only, signed artifacts
A09: Logging Failures | No audit trail, secrets in logs | Structured logging, SecretStr
A10: SSRF | User-controlled URLs | URL validation, IP blocklist
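The A10 row mentions URL validation without showing it. Here is one way such a check might look, using only the standard library. It is a sketch under stated assumptions: the function name is_url_allowed and the allowed_hosts parameter are hypothetical, and the check does not resolve DNS, so a production version must also validate the IP address a hostname resolves to at connection time.

```python
import ipaddress
from urllib.parse import urlsplit


def is_url_allowed(url: str, allowed_hosts: set[str]) -> bool:
    """Reject non-HTTP schemes, non-public IP literals, and unknown hostnames."""
    parts = urlsplit(url)
    if parts.scheme not in {"http", "https"} or not parts.hostname:
        return False
    try:
        ip = ipaddress.ip_address(parts.hostname)
    except ValueError:
        # Not an IP literal: only accept hostnames on the explicit allowlist
        return parts.hostname.lower() in allowed_hosts
    # IP literal: block loopback, private, link-local (cloud metadata), etc.
    return ip.is_global
```

An allowlist of hostnames is deliberately stricter than a blocklist of IP ranges: anything you did not explicitly approve is refused.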

A08 Deep Dive: Insecure Deserialization

import pickle

from fastapi import Request

# VULNERABLE - pickle can execute arbitrary code
@app.post("/api/import")
async def import_data(request: Request):
    body = await request.body()
    data = pickle.loads(body)  # ARBITRARY CODE EXECUTION
    return {"imported": len(data)}

# An attacker sends a crafted pickle payload:
# import pickle, os
# class Exploit:
#     def __reduce__(self):
#         return (os.system, ("rm -rf /",))
# payload = pickle.dumps(Exploit())
# requests.post("http://api/import", data=payload)

import json

from fastapi import HTTPException, Request

# FIXED - use JSON, never pickle for untrusted data
@app.post("/api/import")
async def import_data(request: Request):
    body = await request.body()
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")
    return {"imported": len(data)}

Danger: Never use pickle.loads() on untrusted data. Pickle can execute arbitrary Python code during deserialization. This is not a theoretical risk - it is trivially exploitable. Use JSON, MessagePack, or Protocol Buffers for data interchange.
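You can observe the mechanism safely. In the sketch below, __reduce__ returns a harmless built-in (sorted) instead of os.system, so nothing dangerous happens, but the effect is the same: pickle.loads calls a function chosen by whoever built the payload. The class name Harmless is made up for this demonstration.

```python
import json
import pickle


class Harmless:
    def __reduce__(self):
        # Tells pickle: "to rebuild me, call sorted([3, 1, 2])".
        # An attacker would return (os.system, ("...",)) here instead.
        return (sorted, ([3, 1, 2],))


payload = pickle.dumps(Harmless())
result = pickle.loads(payload)   # calls sorted(...) during deserialization

# JSON, by contrast, can only produce plain data types
parsed = json.loads('{"__reduce__": "ignored", "items": [3, 1, 2]}')
```

Note that result is not a Harmless instance at all: it is whatever the attacker-chosen call returned.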

Part 9 - Secure Error Handling

Error messages can leak sensitive information about your application's internals:

import traceback

# VULNERABLE - exposing internal details
@app.get("/api/users/{user_id}")
async def get_user(user_id: int):
    try:
        user = await db.get_user(user_id)
        return user
    except Exception as e:
        # Leaks: database schema, file paths, library versions, SQL queries
        return {
            "error": str(e),
            "traceback": traceback.format_exc(),
        }

import logging
import uuid

from fastapi import HTTPException

logger = logging.getLogger(__name__)

# FIXED - log internally, return generic message externally
@app.get("/api/users/{user_id}")
async def get_user(user_id: int):
    try:
        user = await db.get_user(user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        return user
    except HTTPException:
        raise  # Re-raise HTTP exceptions as-is
    except Exception:
        # Generate a correlation ID for support
        error_id = str(uuid.uuid4())
        # Log the full error with correlation ID
        logger.exception(f"Internal error [id={error_id}]")
        # Return a generic error with the correlation ID
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error. Reference: {error_id}",
        )

Global Exception Handler

import logging
import uuid

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

logger = logging.getLogger(__name__)

app = FastAPI(debug=False)  # ALWAYS False in production

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    error_id = str(uuid.uuid4())
    logger.exception(
        f"Unhandled exception [id={error_id}] "
        f"path={request.url.path} method={request.method}"
    )
    return JSONResponse(
        status_code=500,
        content={
            "detail": "Internal server error",
            "reference": error_id,
        },
    )
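The pattern in both handlers is the same: the details go to the log, and only a generic message plus a reference leaves the server. Stripped of the framework, it can be sketched and tested like this (the helper name to_public_error is hypothetical):

```python
import logging
import uuid

error_logger = logging.getLogger("app.errors")


def to_public_error(exc: Exception) -> dict:
    """Log the full exception server-side and return a body that is safe to send."""
    error_id = str(uuid.uuid4())
    # exc_info attaches the traceback to the log record, not to the response
    error_logger.error("Unhandled exception [id=%s]", error_id, exc_info=exc)
    return {"detail": "Internal server error", "reference": error_id}


try:
    raise RuntimeError("connection failed: postgres://app:hunter2@db/internal")
except RuntimeError as caught:
    body = to_public_error(caught)
```

When a user reports the reference, you search the logs for that ID and find the full traceback, without the client ever having seen it.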

Part 10 - Real-World: FastAPI Hardening Checklist

A complete security hardening checklist for a production FastAPI application:

"""
FastAPI Security Hardening Checklist
=====================================

Run this script to verify security configuration:
"""
import sys

checks = []

def check(name: str, condition: bool, fix: str):
    status = "PASS" if condition else "FAIL"
    checks.append({"name": name, "status": status, "fix": fix})

# 1. Debug mode
from app.main import app
check(
    "Debug mode disabled",
    not app.debug,
    "Set debug=False in FastAPI() constructor",
)

# 2. CORS configuration
cors_middleware = None
for mw in app.user_middleware:
    if "CORSMiddleware" in str(mw):
        cors_middleware = mw
        break

if cors_middleware:
    # Recent Starlette releases expose the middleware arguments as .kwargs
    # (older releases used .options)
    kwargs = cors_middleware.kwargs
    origins = kwargs.get("allow_origins") or []
    check(
        "CORS origins not wildcard",
        "*" not in origins,
        "Set allow_origins to specific domains",
    )
    if kwargs.get("allow_credentials"):
        check(
            "CORS credentials not with wildcard origins",
            "*" not in origins,
            "Do not use allow_credentials=True with allow_origins=['*']",
        )

# 3. Security headers
check(
    "Security headers middleware present",
    any("SecurityHeaders" in str(mw) for mw in app.user_middleware),
    "Add SecurityHeadersMiddleware",
)

# 4. Rate limiting
check(
    "Rate limiter configured",
    hasattr(app.state, "limiter"),
    "Add slowapi rate limiter",
)

# 5. Exception handler
check(
    "Global exception handler present",
    Exception in app.exception_handlers,
    "Add global exception handler that hides internal errors",
)

# Print results
print("\n=== FastAPI Security Audit ===\n")
for c in checks:
    symbol = "[+]" if c["status"] == "PASS" else "[-]"
    print(f"{symbol} {c['name']}: {c['status']}")
    if c["status"] == "FAIL":
        print(f"    Fix: {c['fix']}")

passed = sum(1 for c in checks if c["status"] == "PASS")
total = len(checks)
print(f"\n{passed}/{total} checks passed")

# Non-zero exit code lets a CI job fail when any check fails
sys.exit(0 if passed == total else 1)

The Complete Hardened Application

import os
import uuid
import logging
from typing import Annotated

from fastapi import FastAPI, Depends, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response, JSONResponse

logger = logging.getLogger(__name__)

# --- Application setup ---
app = FastAPI(
    title="EngineersOfAI API",
    debug=False,  # NEVER True in production
    docs_url=None if os.environ.get("APP_ENV") == "production" else "/docs",
    redoc_url=None,
    openapi_url=None if os.environ.get("APP_ENV") == "production" else "/openapi.json",
)

# --- Security headers ---
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Strict-Transport-Security"] = (
            "max-age=63072000; includeSubDomains"
        )
        response.headers["Permissions-Policy"] = (
            "camera=(), microphone=(), geolocation=()"
        )
        # Add request ID for tracing
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        response.headers["X-Request-ID"] = request_id
        return response

app.add_middleware(SecurityHeadersMiddleware)

# --- Trusted hosts ---
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=[
        "engineersofai.com",
        "*.engineersofai.com",
        "localhost",
    ],
)

# --- CORS ---
# Strip whitespace so "a.com, b.com" in the environment still matches
ALLOWED_ORIGINS = [
    origin.strip()
    for origin in os.environ.get(
        "CORS_ORIGINS",
        "https://engineersofai.com,https://www.engineersofai.com",
    ).split(",")
    if origin.strip()
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
    max_age=600,
)

# --- Rate limiting ---
limiter = Limiter(
    key_func=get_remote_address,
    storage_uri=os.environ.get("REDIS_URL", "memory://"),
)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# --- Global exception handler ---
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    error_id = str(uuid.uuid4())
    logger.exception(
        "Unhandled exception",
        extra={
            "error_id": error_id,
            "path": request.url.path,
            "method": request.method,
        },
    )
    return JSONResponse(
        status_code=500,
        content={
            "detail": "Internal server error",
            "reference": error_id,
        },
    )

# --- Health check (unauthenticated, rate-limited) ---
@app.get("/health")
@limiter.limit("10/minute")
async def health_check(request: Request):
    return {"status": "healthy"}

Part 11 - Returning to the Opening Puzzle

The original code had six security issues:

  1. debug=True - Makes the server return full tracebacks in the response when an unhandled exception occurs. Must be False in production.

  2. allow_origins=["*"] with allow_credentials=True - Any website can make authenticated requests to the API using the victim's cookies.

  3. allow_methods=["*"] - Allows PUT, DELETE, PATCH from any origin. Restrict to the methods your API actually uses.

  4. allow_headers=["*"] - Allows any custom header. Restrict to Authorization and Content-Type.

  5. Exposing exception details - str(e) and traceback.format_exc() leak database schema, file paths, library versions, and internal logic to attackers.

  6. No authentication check - The endpoint has no authentication dependency. Any anonymous user can access any user's data by ID (IDOR - Insecure Direct Object Reference).

Key Takeaways

  • Defense in depth: Apply security at every layer - network, rate limiting, auth, validation, queries, output
  • Least privilege: Database users, file permissions, API scopes - minimum necessary access
  • CORS: Never use allow_origins=["*"] with allow_credentials=True. Whitelist specific origins
  • Rate limiting: Apply aggressive limits to login, registration, and password reset endpoints
  • Security headers: Add HSTS, CSP, X-Frame-Options, X-Content-Type-Options to every response
  • Dependency auditing: Run pip-audit and safety in CI/CD. Pin versions with hashes
  • Static analysis: Run bandit to find hardcoded passwords, SQL injection, and unsafe deserialization
  • Error handling: Log details internally, return generic errors externally, include a correlation ID
  • Never use pickle for untrusted data - use JSON or structured formats
  • Disable debug mode, API docs, and OpenAPI spec in production
  • Security is not a feature - it is a property of every feature

Graded Practice Challenges

Level 1 - Identify the Vulnerability

Question 1: What is wrong with this error handler?

@app.exception_handler(Exception)
async def handle_error(request, exc):
    return JSONResponse(
        status_code=500,
        content={"error": str(exc), "type": type(exc).__name__},
    )

Answer

The handler exposes the exception message and type to the client. Exception messages can contain sensitive information: database connection strings (from connection errors), file paths (from FileNotFoundError), SQL queries (from SQLAlchemyError), and internal class names. Return a generic "Internal server error" message with a correlation ID instead. Log the full exception details server-side.

Question 2: A developer runs pip-audit and finds a vulnerability in cryptography==41.0.3. They add it to a "known vulnerabilities" ignore list instead of upgrading. When is this acceptable?

Answer

Almost never. The only acceptable scenario is when: (1) the vulnerability does not affect your usage of the library (e.g., a vulnerability in a feature you do not use), AND (2) upgrading breaks compatibility with a critical dependency that has no fix available, AND (3) you document the risk and set a calendar reminder to re-check. In practice, upgrade the dependency. If upgrading breaks something, fix the breakage. Security vulnerabilities in cryptographic libraries are especially critical and should never be ignored without exceptional justification.

Question 3: What OWASP Top 10 risk does this code represent?

@app.get("/api/invoices/{invoice_id}")
async def get_invoice(invoice_id: int, db):
    invoice = await db.get_invoice(invoice_id)
    return invoice

Answer

A01: Broken Access Control, specifically IDOR (Insecure Direct Object Reference). The endpoint does not verify that the authenticated user owns or has permission to view the requested invoice. An attacker can enumerate invoice IDs and access other users' invoices. Fix: Add an authentication dependency and verify invoice.user_id == current_user.user_id before returning the data.
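The ownership check described in the answer can be sketched independently of FastAPI. The Invoice class, the Forbidden exception, and the function name below are hypothetical; in a real endpoint you would raise HTTPException(status_code=403) instead.

```python
from dataclasses import dataclass


@dataclass
class Invoice:
    invoice_id: int
    user_id: str


class Forbidden(Exception):
    """Raised when the caller may not access the requested object."""


def authorize_invoice_access(invoice: Invoice, current_user_id: str) -> Invoice:
    # The ID in the URL proves nothing: compare the owner with the caller
    if invoice.user_id != current_user_id:
        raise Forbidden(f"user {current_user_id} may not read invoice {invoice.invoice_id}")
    return invoice
```

The important design point is that the check uses the identity from the verified session or token, never a user ID supplied in the request itself.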

Level 2 - Fix the Vulnerability

This FastAPI application has multiple security issues. Identify and fix all of them:

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import pickle

app = FastAPI(debug=True)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

SECRET_KEY = "mysecretkey123"

@app.post("/api/import")
async def import_data(request):
    data = pickle.loads(await request.body())
    return {"count": len(data)}

@app.get("/api/users/{user_id}")
async def get_user(user_id: int):
    try:
        user = await db.get_user(user_id)
        return user
    except Exception as e:
        return {"error": str(e)}

Solution
import os
import json
import uuid
import logging
from typing import Annotated

from fastapi import FastAPI, Depends, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

logger = logging.getLogger(__name__)

# CurrentUser, get_current_user, and db are assumed to be defined
# elsewhere in the application, as in the earlier examples.

# Fix 1: debug=False
app = FastAPI(debug=False)

# Fix 2: Explicit CORS origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://engineersofai.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["Authorization", "Content-Type"],
)

# Fix 3: Secret from environment
SECRET_KEY = os.environ["SECRET_KEY"]

# Fix 4: Rate limiting (the handler turns RateLimitExceeded into a 429 response)
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Fix 5: JSON instead of pickle
@app.post("/api/import")
@limiter.limit("10/minute")
async def import_data(request: Request):
    try:
        data = json.loads(await request.body())
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")
    return {"count": len(data)}

# Fix 6: Auth check, IDOR prevention, safe error handling
@app.get("/api/users/{user_id}")
async def get_user(
    user_id: int,
    current_user: Annotated[CurrentUser, Depends(get_current_user)],
):
    if current_user.user_id != str(user_id) and "admin" not in current_user.roles:
        raise HTTPException(status_code=403, detail="Forbidden")
    try:
        user = await db.get_user(user_id)
        if not user:
            raise HTTPException(status_code=404, detail="Not found")
        return user
    except HTTPException:
        raise
    except Exception:
        error_id = str(uuid.uuid4())
        logger.exception(f"Error [id={error_id}]")
        raise HTTPException(
            status_code=500,
            detail=f"Internal error. Ref: {error_id}",
        )

Fixes: (1) Debug disabled. (2) Explicit CORS. (3) Secret from env. (4) Rate limiting. (5) JSON not pickle. (6) Authentication, authorization, safe errors.

Level 3 - Design a Secure System

Design a security architecture for the EngineersOfAI platform that covers:

  • A FastAPI backend serving course content, user profiles, and certificates
  • A Docusaurus frontend with user authentication
  • Keycloak as the identity provider
  • PostgreSQL for data storage
  • Deployed on AWS (ECS, RDS, CloudFront)

Create a defense-in-depth design document covering: network layer security, application layer controls, data layer protection, monitoring and alerting, incident response procedures, and compliance considerations. Include specific Python code patterns, middleware configurations, and deployment settings.

Design Hints
  1. Network: CloudFront (WAF rules, DDoS protection) -> ALB (TLS termination) -> ECS (private subnet). RDS in private subnet, no public access. Security groups: API only accepts traffic from ALB, RDS only from API.
  2. Application: All middleware from this lesson (CORS, rate limiting, security headers, trusted hosts). FastAPI with debug=False, docs disabled. Global exception handler. Structured JSON logging with correlation IDs.
  3. Authentication: Keycloak RS256 JWTs verified via JWKS. Access tokens = 15 min, refresh tokens = 7 days. Role-based access (free-user, paid-individual, admin).
  4. Data: Argon2id password hashing (even though Keycloak manages passwords). All queries via SQLAlchemy ORM. text() banned via bandit custom rule. Database user with minimal privileges.
  5. Secrets: AWS Secrets Manager for database credentials, JWT keys, API keys. Rotated every 90 days. No secrets baked into images, source code, or plain-text task definitions; inject them at runtime from Secrets Manager or SSM Parameter Store.
  6. Dependencies: pip-audit in CI. Dependabot/renovate for automated updates. Hash-pinned requirements.txt.
  7. Monitoring: CloudWatch alarms for: 5xx rate > 1%, login failure rate > 10/min, unauthorized access attempts. AWS GuardDuty for infrastructure threats.
  8. Incident response: Runbook for secret rotation, user lockout, and service isolation. Security contact in API response headers.

What's Next

Congratulations on completing Module 6 - Security Engineering. You now have the knowledge to build applications that resist real-world attacks. In the Capstone Module, you will apply everything you have learned across all modules to build a complete, production-grade Python system from scratch.

© 2026 EngineersOfAI. All rights reserved.