Secure Coding Patterns - Defense in Depth
Before you read any further, study this FastAPI application and count the security issues:
import traceback

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(debug=True)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/api/users/{user_id}")
async def get_user(user_id: int):
    try:
        user = await db.get_user(user_id)
        return user
    except Exception as e:
        return {"error": str(e), "traceback": traceback.format_exc()}
There are at least six security issues in this code. By the end of this lesson, you will understand defense in depth - the practice of applying multiple, overlapping security controls so that no single failure compromises the system.
What You Will Learn
- The defense in depth principle and why single controls fail
- The least privilege principle applied to Python applications
- How to configure CORS correctly in FastAPI
- How to implement rate limiting with slowapi
- How to set security headers (CSP, HSTS, X-Frame-Options)
- How to audit dependencies with pip-audit and safety
- How to run bandit for static security analysis
- The OWASP Top 10 mapped to Python-specific patterns
- A complete hardening checklist for FastAPI applications
Prerequisites
- All previous lessons in this module (Lessons 01-06)
- FastAPI middleware and dependency injection (from Intermediate course)
- Understanding of HTTP headers and CORS
pip install slowapi pip-audit bandit safety
Part 1 - Defense in Depth
Defense in depth means applying security controls at every layer (network, rate limiting, authentication, input validation, database queries, and output handling), so that if one layer fails, the next layer can still catch the attack.
No single layer is sufficient. Input validation can miss edge cases. Authentication can have bugs. Rate limiting can be bypassed. But when all layers work together, an attacker must defeat every one of them, which is far harder than defeating any single control.
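The layering idea can be sketched in plain Python. This is a minimal illustration, not a real framework: `FakeRequest`, the three check functions, and `handle` are hypothetical names invented for this example, and each check stands in for a much richer control.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeRequest:
    """Hypothetical stand-in for an incoming HTTP request."""
    ip: str
    token: Optional[str]
    payload: dict = field(default_factory=dict)


def check_rate_limit(req: FakeRequest, seen: dict, limit: int = 3) -> Optional[str]:
    # Count requests per IP; reject once the (illustrative) limit is exceeded.
    seen[req.ip] = seen.get(req.ip, 0) + 1
    return "rate limit exceeded" if seen[req.ip] > limit else None


def check_auth(req: FakeRequest, valid_tokens: set) -> Optional[str]:
    return None if req.token in valid_tokens else "not authenticated"


def check_input(req: FakeRequest) -> Optional[str]:
    user_id = req.payload.get("user_id")
    return None if isinstance(user_id, int) and user_id > 0 else "invalid user_id"


def handle(req: FakeRequest, seen: dict, valid_tokens: set) -> str:
    """Run every layer in order; the first failing layer rejects the request."""
    layers = (
        lambda: check_rate_limit(req, seen),
        lambda: check_auth(req, valid_tokens),
        lambda: check_input(req),
    )
    for layer in layers:
        error = layer()
        if error:
            return f"rejected: {error}"
    return "accepted"
```

A request with a valid token still fails if its input is malformed, and a well-formed request still fails once the caller exceeds the rate limit. Each layer covers for gaps in the others.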
Part 2 - Least Privilege Principle
Every component should have the minimum permissions required to do its job:
Database Users
-- WRONG: Application uses the admin account
-- CREATE USER app WITH SUPERUSER PASSWORD 'secret';
-- RIGHT: Separate users with minimal permissions
CREATE USER app_readonly WITH PASSWORD 'secret1';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO app_readonly;
CREATE USER app_readwrite WITH PASSWORD 'secret2';
GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA public TO app_readwrite;
-- Note: No DELETE, no DROP, no ALTER
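CREATE USER app_migration WITH PASSWORD 'secret3';
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO app_migration;
-- Used only by Alembic migrations, never by the application
-- Note: in PostgreSQL, ALTER TABLE and DROP TABLE require ownership of the table,
-- so tables that migrations must change should be owned by app_migration.
-- GRANT ... ON ALL TABLES also covers only tables that exist when it runs.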
File System Permissions
import os
import stat
from pathlib import Path

def secure_file_permissions(filepath: str) -> None:
    """Set file permissions to owner-read-only."""
    path = Path(filepath)
    # chmod replaces the whole mode, so owner-read becomes the only bit set
    os.chmod(path, stat.S_IRUSR)  # 0o400 - owner can read, nothing else

def secure_directory_permissions(dirpath: str) -> None:
    """Set directory permissions to owner-read-execute."""
    path = Path(dirpath)
    os.chmod(path, stat.S_IRUSR | stat.S_IXUSR)  # 0o500

# For secret files
secure_file_permissions("/app/secrets/private_key.pem")
secure_file_permissions("/app/.env")
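Setting permissions is only half of the control. A startup check can also verify that nothing has loosened them since. The sketch below assumes a POSIX system; `is_owner_only` is a hypothetical helper, and the temporary file stands in for a real secret file.

```python
import os
import stat
import tempfile


def is_owner_only(path: str) -> bool:
    """Return True if neither group nor others have any permission bits set."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    return (mode & (stat.S_IRWXG | stat.S_IRWXO)) == 0


# Demonstrate on a throwaway file instead of a real secret.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    demo_path = tmp.name

os.chmod(demo_path, 0o644)         # group and others can read
loose = is_owner_only(demo_path)
os.chmod(demo_path, stat.S_IRUSR)  # 0o400, owner read only
strict = is_owner_only(demo_path)
os.remove(demo_path)
```

An application could refuse to start when a check like this fails for its key files, so that a misconfigured deployment is caught early.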
API Scope Restriction
from fastapi import Depends, HTTPException, status
from typing import Annotated
class CurrentUser:
    def __init__(self, user_id: str, roles: list[str], scopes: list[str]):
        self.user_id = user_id
        self.roles = roles
        self.scopes = scopes

def require_scope(*required_scopes: str):
    """Dependency that enforces fine-grained scopes."""
    async def checker(
        user: Annotated[CurrentUser, Depends(get_current_user)],
    ) -> CurrentUser:
        missing = set(required_scopes) - set(user.scopes)
        if missing:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing required scopes: {', '.join(missing)}",
            )
        return user
    return checker

# Endpoints with specific scope requirements
@app.get("/api/courses")
async def list_courses(
    user: Annotated[CurrentUser, Depends(require_scope("courses:read"))],
):
    return {"courses": [...]}

@app.post("/api/courses")
async def create_course(
    user: Annotated[CurrentUser, Depends(require_scope("courses:write"))],
):
    return {"message": "Created"}

@app.delete("/api/courses/{course_id}")
async def delete_course(
    user: Annotated[CurrentUser, Depends(require_scope("courses:delete", "admin"))],
):
    return {"message": "Deleted"}
Part 3 - CORS Configuration
Cross-Origin Resource Sharing controls which domains can make requests to your API from a browser. Misconfigured CORS is one of the most common web security mistakes:
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# VULNERABLE - allows any origin with credentials
# This is NEVER correct in production
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Any website can call your API
allow_credentials=True, # Cookies are sent with requests
allow_methods=["*"],
allow_headers=["*"],
)
# An attacker's website can make authenticated requests to your API
# using the victim's cookies
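allow_origins=["*"] with allow_credentials=True is a critical vulnerability. Browsers reject a literal wildcard on credentialed requests, so Starlette's CORSMiddleware answers such requests by echoing the caller's Origin instead, which in effect trusts every origin. Any website can then make authenticated requests to your API: the browser sends the user's cookies, your API processes the request as if it came from the user, and the attacking page can read the response. This goes beyond Cross-Site Request Forgery, because the attacker sees the returned data as well as triggering the action.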
# SECURE - explicit origin whitelist
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://engineersofai.com",
"https://www.engineersofai.com",
"http://localhost:3000", # Development only - remove in production
],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
expose_headers=["X-Request-ID"],
max_age=600, # Cache preflight for 10 minutes
)
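CORSMiddleware does the origin comparison for you when you pass an explicit list. If you ever write origin logic by hand, the comparison must be exact. The sketch below uses hypothetical helper names to contrast exact matching with a tempting suffix check.

```python
# Production origins from the allowlist above (the development entry is omitted).
ALLOWED_ORIGINS = frozenset({
    "https://engineersofai.com",
    "https://www.engineersofai.com",
})


def is_origin_allowed(origin: str) -> bool:
    """Exact match against the allowlist: scheme, host, and port must all agree."""
    return origin in ALLOWED_ORIGINS


def naive_suffix_check(origin: str) -> bool:
    """Anti-pattern shown for contrast: suffix matching accepts look-alike domains."""
    return origin.endswith("engineersofai.com")
```

The suffix check accepts https://evilengineersofai.com, a domain anyone can register. The exact match rejects it.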
Dynamic CORS for Multi-Tenant Applications
import os
def get_allowed_origins() -> list[str]:
    """Load allowed origins from environment."""
    origins = os.environ.get("CORS_ALLOWED_ORIGINS", "")
    if not origins:
        return ["https://engineersofai.com"]
    return [o.strip() for o in origins.split(",") if o.strip()]
app.add_middleware(
CORSMiddleware,
allow_origins=get_allowed_origins(),
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
# In production .env:
# CORS_ALLOWED_ORIGINS=https://engineersofai.com,https://app.engineersofai.com
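Loading origins from the environment moves the risk into configuration: a stray "*" or an http:// entry in a .env file silently weakens the policy. One option is to validate the value at startup. This is a sketch under an assumed production policy (HTTPS only, no wildcard, no path); `parse_allowed_origins` is a hypothetical helper, not part of FastAPI.

```python
from urllib.parse import urlsplit


def parse_allowed_origins(raw: str) -> list:
    """Split a comma-separated origin list and reject unsafe entries."""
    origins = []
    for item in raw.split(","):
        origin = item.strip()
        if not origin:
            continue
        parts = urlsplit(origin)
        # Assumed policy: wildcard and non-HTTPS origins are never acceptable.
        if origin == "*" or parts.scheme != "https" or not parts.hostname:
            raise ValueError(f"Refusing unsafe CORS origin: {origin!r}")
        # An origin is scheme + host (+ port); a path means a malformed entry.
        if parts.path or parts.query or parts.fragment:
            raise ValueError(f"CORS origin must not include a path: {origin!r}")
        origins.append(origin)
    return origins
```

Failing at startup is deliberate here: a service that refuses to boot with a bad CORS value is easier to notice than one that quietly accepts every origin.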
Part 4 - Rate Limiting with slowapi
Rate limiting prevents abuse, brute force attacks, and denial of service:
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# Create limiter with key function
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Per-route limit: 100 requests per minute per IP
@app.get("/api/courses")
@limiter.limit("100/minute")
async def list_courses(request: Request):
    return {"courses": [...]}

# Stricter limit for authentication endpoints
@app.post("/api/login")
@limiter.limit("5/minute")
async def login(request: Request):
    return {"token": "..."}

# Very strict limit for password reset
@app.post("/api/password-reset")
@limiter.limit("3/hour")
async def password_reset(request: Request):
    return {"message": "Reset email sent"}

# Custom key function for authenticated users
def get_user_id_or_ip(request: Request) -> str:
    """Rate limit by user ID if authenticated, otherwise by IP."""
    if hasattr(request.state, "user"):
        return f"user:{request.state.user.user_id}"
    return get_remote_address(request)

@app.post("/api/courses")
@limiter.limit("10/hour", key_func=get_user_id_or_ip)
async def create_course(request: Request):
    return {"message": "Created"}
Redis-Backed Rate Limiting (Distributed)
from slowapi import Limiter
from slowapi.util import get_remote_address
# For multi-instance deployments, use Redis as the storage backend
# (the redis package must be installed, but it does not need to be imported here)
limiter = Limiter(
key_func=get_remote_address,
storage_uri="redis://localhost:6379/1",
)
# All instances share the same rate limit counters
Set different rate limits for different endpoint categories: public endpoints (100/min), authenticated endpoints (300/min), login/registration (5/min), password reset (3/hour). Always rate limit login endpoints aggressively to prevent credential stuffing attacks.
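To make the mechanism concrete, here is a minimal fixed-window counter in plain Python. It is a teaching sketch, not how slowapi is implemented: `FixedWindowLimiter` is a hypothetical class, state lives in process memory, and old windows are never evicted.

```python
import time
from collections import defaultdict
from typing import Callable


class FixedWindowLimiter:
    """Allow at most `limit` hits per key within each `window_seconds` window."""

    def __init__(
        self,
        limit: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock  # injectable so the behavior can be tested without sleeping
        self._counters = defaultdict(int)

    def allow(self, key: str) -> bool:
        # All hits in the same window share one counter; a new window starts at zero.
        window = int(self.clock() // self.window_seconds)
        self._counters[(key, window)] += 1
        return self._counters[(key, window)] <= self.limit
```

With limit=5 and window_seconds=60 this mirrors the "5/minute" login limit: the sixth attempt from one key inside a window is refused, while other keys are unaffected. The key can be an IP address or a user ID, as in get_user_id_or_ip above.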
Part 5 - Security Headers
HTTP security headers instruct the browser to enable security features:
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        # Prevent MIME type sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"
        # Prevent clickjacking
        response.headers["X-Frame-Options"] = "DENY"
        # Control referrer information
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        # Enable HSTS (HTTP Strict Transport Security)
        response.headers["Strict-Transport-Security"] = (
            "max-age=63072000; includeSubDomains; preload"
        )
        # Content Security Policy
        response.headers["Content-Security-Policy"] = (
            "default-src 'self'; "
            "script-src 'self'; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' https:; "
            "connect-src 'self' https://api.engineersofai.com; "
            "frame-ancestors 'none'; "
            "base-uri 'self'; "
            "form-action 'self'"
        )
        # Permissions Policy (formerly Feature-Policy)
        response.headers["Permissions-Policy"] = (
            "camera=(), microphone=(), geolocation=(), "
            "payment=(), usb=(), magnetometer=()"
        )
        return response
app = FastAPI()
app.add_middleware(SecurityHeadersMiddleware)
Header Reference
| Header | Purpose | Recommended Value |
|---|---|---|
| X-Content-Type-Options | Prevent MIME sniffing | nosniff |
| X-Frame-Options | Prevent clickjacking | DENY |
| Strict-Transport-Security | Force HTTPS | max-age=63072000; includeSubDomains |
| Content-Security-Policy | Control resource loading | Whitelist specific sources |
| Referrer-Policy | Control referrer leaking | strict-origin-when-cross-origin |
| Permissions-Policy | Disable browser features | Disable unused APIs |
| X-Request-ID | Request tracing | UUID per request |
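
A table like this is easy to turn into an automated check. The sketch below audits a plain dictionary of response headers, such as one copied from an HTTP client in a smoke test. `audit_headers` and the two constants are hypothetical names, and the expected values are the ones recommended above.

```python
# Headers whose value must match exactly (compared case-insensitively).
EXACT_VALUES = {
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "DENY",
    "Referrer-Policy": "strict-origin-when-cross-origin",
}
# Headers whose value is site-specific, so only their presence is checked.
PRESENCE_ONLY = (
    "Strict-Transport-Security",
    "Content-Security-Policy",
    "Permissions-Policy",
)


def audit_headers(headers: dict) -> list:
    """Return a list of human-readable problems; an empty list means all checks passed."""
    normalized = {name.lower(): value for name, value in headers.items()}
    problems = []
    for name, expected in EXACT_VALUES.items():
        actual = normalized.get(name.lower())
        if actual is None:
            problems.append(f"missing {name}")
        elif actual.strip().lower() != expected.lower():
            problems.append(f"{name} is {actual!r}, expected {expected!r}")
    for name in PRESENCE_ONLY:
        if name.lower() not in normalized:
            problems.append(f"missing {name}")
    return problems
```

Header names are compared case-insensitively because HTTP treats them that way.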
Part 6 - Dependency Auditing with pip-audit and safety
Third-party packages are a major attack surface. A single vulnerable dependency can compromise your entire application:
pip-audit (by PyPA)
# Install
pip install pip-audit
# Audit all installed packages
pip-audit
# Example output:
# Name Version ID Fix Versions
# ---------- ------- ------------------ ------------
# cryptography 41.0.3 GHSA-jfhm-5gh6-2ck 41.0.4
# certifi 2023.7 GHSA-xqr8-7jwr-rhp 2023.7.22
# Audit with automatic fix suggestions
pip-audit --fix --dry-run
# Audit a requirements file
pip-audit -r requirements.txt
# Output as JSON for CI/CD
pip-audit --format=json --output=audit-results.json
safety
# Install
pip install safety
# Check installed packages (Safety 3.x deprecates "check" in favor of "safety scan")
safety check
# Check a requirements file
safety check -r requirements.txt
# Example output:
# +============================================+
# | REPORT |
# +============================================+
# | package: django |
# | installed: 4.1.0 |
# | affected: <4.1.13 |
# | vulnerability: SQL injection in |
# | Django's admin interface |
# +============================================+
CI/CD Integration
# .gitlab-ci.yml
security-audit:
  stage: test
  script:
    - pip install pip-audit safety bandit
    - pip-audit -r requirements.txt  # exits non-zero when vulnerabilities are found
    - safety check -r requirements.txt --full-report
    - bandit -r app/ -ll  # Only medium and high severity
  allow_failure: false  # Block merge on vulnerabilities
# requirements.in - pin dependencies with hashes for supply chain security
# pip install pip-tools
# pip-compile --generate-hashes requirements.in
# requirements.txt (generated by pip-compile)
# cryptography==42.0.0 \
# --hash=sha256:abc123... \
# --hash=sha256:def456...
# PyJWT==2.8.0 \
# --hash=sha256:ghi789...
Run pip-audit in your CI/CD pipeline on every merge request. Pin dependencies to exact versions with hash verification (--require-hashes) for production deployments. This prevents supply chain attacks where a malicious version is uploaded to PyPI.
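The check behind hash pinning is simple: compute the digest of the downloaded file and refuse it unless the digest is one of the pinned values. The sketch below illustrates that idea with hashlib. It is not pip's implementation, and the byte strings are stand-ins for real wheel files.

```python
import hashlib


def sha256_hex(data: bytes) -> str:
    """Hex-encoded SHA-256 digest, the same format used in --hash=sha256:... pins."""
    return hashlib.sha256(data).hexdigest()


def verify_artifact(data: bytes, pinned_hashes: set) -> bool:
    """Accept the artifact only if its digest matches one of the pinned hashes."""
    return sha256_hex(data) in pinned_hashes


# Stand-in for the package file that was reviewed and pinned.
original = b"pretend this is a wheel file"
pinned = {sha256_hex(original)}

# Stand-in for a file that was replaced upstream under the same version number.
tampered = original + b" plus a backdoor"
```

Any change to the file, even a single byte, produces a different digest, so a replaced upload fails the check even though its name and version are unchanged.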
Part 7 - Static Security Analysis with bandit
bandit is a Python static analysis tool that finds common security issues in code:
# Install
pip install bandit
# Scan your project
bandit -r app/ -ll # -ll = medium and high severity only
# Example output:
# >> Issue: [B105:hardcoded_password_string] Possible hardcoded password
# Severity: Low Confidence: Medium
# Location: app/config.py:15
# More Info: https://bandit.readthedocs.io/...
#
# >> Issue: [B608:hardcoded_sql_expressions] Possible SQL injection
# Severity: Medium Confidence: Low
# Location: app/routes/users.py:42
# More Info: https://bandit.readthedocs.io/...
#
# >> Issue: [B603:subprocess_without_shell_equals_true] Starting a process
# without a shell
# Severity: Low Confidence: High
# Location: app/utils/dns.py:8
# Generate HTML report
bandit -r app/ -f html -o bandit-report.html
# Exclude specific rules
bandit -r app/ --skip B101 # Skip assert warnings
# Report only medium severity and above
bandit -r app/ --severity-level medium
Common bandit Rules
| Rule | Issue | Severity |
|---|---|---|
| B101 | assert used (stripped when Python runs with -O) | Low |
| B105 | Hardcoded password string | Low |
| B106 | Hardcoded password in function argument | Low |
| B108 | Probable insecure temp file usage | Medium |
| B301 | pickle usage (arbitrary code execution) | Medium |
| B303 | Use of insecure hash function (MD5/SHA1) | Medium |
| B608 | Possible SQL injection via string formatting | Medium |
| B602 | subprocess with shell=True | High |
| B605 | Starting process with a shell | High |
| B701 | Jinja2 templates without autoescape | High |
Configuration File
# .bandit (or in pyproject.toml)
[bandit]
exclude = tests,venv,.venv
skips = B101
targets = app
# pyproject.toml alternative
[tool.bandit]
exclude_dirs = ["tests", "venv"]
skips = ["B101"]
targets = ["app"]
# Suppressing false positives inline
import subprocess
result = subprocess.run(  # nosec B603
    ["git", "log", "--oneline", "-5"],
    capture_output=True,
    text=True,
)
# The above is safe because the command is a hardcoded list with no user input
Part 8 - OWASP Top 10 Mapped to Python
The OWASP Top 10 is the industry standard list of the most critical web application security risks. Here is how each category in the 2021 edition maps to Python-specific patterns:
| OWASP Risk | Python Pattern | Defense |
|---|---|---|
| A01: Broken Access Control | Missing role checks, IDOR | FastAPI Depends(), role middleware |
| A02: Cryptographic Failures | MD5 for passwords, no TLS | Argon2/bcrypt, TLS termination |
| A03: Injection | f-string SQL, shell=True | Parameterized queries, subprocess lists |
| A04: Insecure Design | No rate limiting, no MFA | slowapi, TOTP, architecture review |
| A05: Security Misconfiguration | debug=True, CORS * | Pydantic settings, security headers |
| A06: Vulnerable Components | Outdated packages | pip-audit, safety, dependabot |
| A07: Auth Failures | Weak passwords, no lockout | Argon2, account lockout, MFA |
| A08: Data Integrity Failures | pickle deserialization, no CI checks | JSON-only, signed artifacts |
| A09: Logging Failures | No audit trail, secrets in logs | Structured logging, SecretStr |
| A10: SSRF | User-controlled URLs | URL validation, IP blocklist |
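
The A10 defense (URL validation plus an IP blocklist) can be sketched with the standard library. `is_public_address` and `is_safe_url` are hypothetical helpers, and the resolver is injectable so the logic can be exercised without network access. A real deployment would also need to connect to the address it validated, since DNS answers can change between the check and the fetch.

```python
import ipaddress
import socket
from urllib.parse import urlsplit


def is_public_address(address: str) -> bool:
    """Reject loopback, private, link-local, and other non-routable addresses."""
    try:
        ip = ipaddress.ip_address(address)
    except ValueError:
        return False
    return not (
        ip.is_private or ip.is_loopback or ip.is_link_local
        or ip.is_multicast or ip.is_reserved or ip.is_unspecified
    )


def is_safe_url(url: str, resolve=socket.getaddrinfo) -> bool:
    """Allow only http(s) URLs whose host resolves exclusively to public IPs."""
    try:
        parts = urlsplit(url)
        port = parts.port
    except ValueError:
        return False
    if parts.scheme not in {"http", "https"} or not parts.hostname:
        return False
    port = port or (443 if parts.scheme == "https" else 80)
    try:
        infos = resolve(parts.hostname, port)
    except OSError:
        return False
    # Each getaddrinfo entry ends with a sockaddr tuple whose first item is the IP.
    addresses = {info[4][0] for info in infos}
    return bool(addresses) and all(is_public_address(a) for a in addresses)
```

Checking the resolved addresses rather than the hostname string matters: a hostname that looks harmless can resolve to 127.0.0.1 or to a cloud metadata address such as 169.254.169.254.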
A08 Deep Dive: Insecure Deserialization
import pickle
# VULNERABLE - pickle can execute arbitrary code
@app.post("/api/import")
async def import_data(request: Request):
    body = await request.body()
    data = pickle.loads(body)  # ARBITRARY CODE EXECUTION
    return {"imported": len(data)}

# An attacker sends a crafted pickle payload:
# import pickle, os
# class Exploit:
#     def __reduce__(self):
#         return (os.system, ("rm -rf /",))
# payload = pickle.dumps(Exploit())
# requests.post("http://api/import", data=payload)

import json

# FIXED - use JSON, never pickle for untrusted data
@app.post("/api/import")
async def import_data(request: Request):
    body = await request.body()
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")
    return {"imported": len(data)}
Never use pickle.loads() on untrusted data. Pickle can execute arbitrary Python code during deserialization. This is not a theoretical risk - it is trivially exploitable. Use JSON, MessagePack, or Protocol Buffers for data interchange.
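A harmless demonstration makes the risk tangible. In the sketch below the payload only evaluates an arithmetic expression, but the same mechanism would run any callable the attacker chooses. `Harmless` is a hypothetical class written for this example.

```python
import json
import pickle


class Harmless:
    def __reduce__(self):
        # Tells pickle: "to rebuild this object, call eval('6 * 7')".
        # An attacker would put os.system and a shell command here instead.
        return (eval, ("6 * 7",))


payload = pickle.dumps(Harmless())

# Loading runs the callable chosen by whoever built the payload.
result = pickle.loads(payload)

# JSON can only produce plain data: dicts, lists, strings, numbers, booleans, None.
parsed = json.loads('{"__reduce__": "os.system"}')
```

Here result is the integer 42, not a Harmless instance: the receiver never got the object back, it executed what the sender told it to. The JSON document with a suspicious-looking key is just a dictionary.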
Part 9 - Secure Error Handling
Error messages can leak sensitive information about your application's internals:
# VULNERABLE - exposing internal details
@app.get("/api/users/{user_id}")
async def get_user(user_id: int):
    try:
        user = await db.get_user(user_id)
        return user
    except Exception as e:
        # Leaks: database schema, file paths, library versions, SQL queries
        return {
            "error": str(e),
            "traceback": traceback.format_exc(),
        }
import logging
import uuid
logger = logging.getLogger(__name__)
# FIXED - log internally, return generic message externally
@app.get("/api/users/{user_id}")
async def get_user(user_id: int):
    try:
        user = await db.get_user(user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        return user
    except HTTPException:
        raise  # Re-raise HTTP exceptions as-is
    except Exception:
        # Generate a correlation ID for support
        error_id = str(uuid.uuid4())
        # Log the full error with correlation ID
        logger.exception(f"Internal error [id={error_id}]")
        # Return a generic error with the correlation ID
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error. Reference: {error_id}",
        )
Global Exception Handler
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI(debug=False) # ALWAYS False in production
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    error_id = str(uuid.uuid4())
    logger.exception(
        f"Unhandled exception [id={error_id}] "
        f"path={request.url.path} method={request.method}"
    )
    return JSONResponse(
        status_code=500,
        content={
            "detail": "Internal server error",
            "reference": error_id,
        },
    )
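The core of the pattern is the split between what is logged and what is returned. It can be shown without a web framework. In this sketch, `to_client_error` is a hypothetical helper and the connection string is made up to show the kind of detail that must stay server-side.

```python
import logging
import uuid

logger = logging.getLogger("demo.errors")


def to_client_error(exc: Exception) -> dict:
    """Log the full exception under a fresh ID and return only the ID to the client."""
    error_id = str(uuid.uuid4())
    # Full details, including the traceback, go to the server-side log.
    logger.error("Unhandled error [id=%s]", error_id, exc_info=exc)
    # The client sees a generic message plus the ID to quote to support.
    return {"detail": "Internal server error", "reference": error_id}


try:
    # A made-up failure whose message contains a credential.
    raise ConnectionError("could not connect to postgres://app:secret2@db.internal/prod")
except ConnectionError as exc:
    body = to_client_error(exc)
```

Support staff can search the logs for the reference value to find the full traceback, while the response body carries nothing an attacker can use.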
Part 10 - Real-World: FastAPI Hardening Checklist
A complete security hardening checklist for a production FastAPI application:
"""
FastAPI Security Hardening Checklist
=====================================
Run this script to verify security configuration:
"""
import importlib
import sys
checks = []
def check(name: str, condition: bool, fix: str):
    status = "PASS" if condition else "FAIL"
    checks.append({"name": name, "status": status, "fix": fix})
# 1. Debug mode
from app.main import app
check(
"Debug mode disabled",
not app.debug,
"Set debug=False in FastAPI() constructor",
)
# 2. CORS configuration
cors_middleware = None
for mw in app.user_middleware:
    if "CORSMiddleware" in str(mw):
        cors_middleware = mw
        break

if cors_middleware:
    kwargs = cors_middleware.kwargs
    check(
        "CORS origins not wildcard",
        kwargs.get("allow_origins") != ["*"],
        "Set allow_origins to specific domains",
    )
    if kwargs.get("allow_credentials"):
        check(
            "CORS credentials not with wildcard origins",
            kwargs.get("allow_origins") != ["*"],
            "Do not use allow_credentials=True with allow_origins=['*']",
        )
# 3. Security headers
check(
"Security headers middleware present",
any("SecurityHeaders" in str(mw) for mw in app.user_middleware),
"Add SecurityHeadersMiddleware",
)
# 4. Rate limiting
check(
"Rate limiter configured",
hasattr(app.state, "limiter"),
"Add slowapi rate limiter",
)
# 5. Exception handler
check(
"Global exception handler present",
Exception in app.exception_handlers,
"Add global exception handler that hides internal errors",
)
# Print results
print("\n=== FastAPI Security Audit ===\n")
for c in checks:
    symbol = "[+]" if c["status"] == "PASS" else "[-]"
    print(f"{symbol} {c['name']}: {c['status']}")
    if c["status"] == "FAIL":
        print(f" Fix: {c['fix']}")
passed = sum(1 for c in checks if c["status"] == "PASS")
total = len(checks)
print(f"\n{passed}/{total} checks passed")
The Complete Hardened Application
import os
import uuid
import logging
from typing import Annotated
from fastapi import FastAPI, Depends, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response, JSONResponse
logger = logging.getLogger(__name__)
# --- Application setup ---
app = FastAPI(
title="EngineersOfAI API",
debug=False, # NEVER True in production
docs_url=None if os.environ.get("APP_ENV") == "production" else "/docs",
redoc_url=None,
openapi_url=None if os.environ.get("APP_ENV") == "production" else "/openapi.json",
)
# --- Security headers ---
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Strict-Transport-Security"] = (
            "max-age=63072000; includeSubDomains"
        )
        response.headers["Permissions-Policy"] = (
            "camera=(), microphone=(), geolocation=()"
        )
        # Add request ID for tracing
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        response.headers["X-Request-ID"] = request_id
        return response
app.add_middleware(SecurityHeadersMiddleware)
# --- Trusted hosts ---
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=[
"engineersofai.com",
"*.engineersofai.com",
"localhost",
],
)
# --- CORS ---
ALLOWED_ORIGINS = os.environ.get(
"CORS_ORIGINS",
"https://engineersofai.com,https://www.engineersofai.com",
).split(",")
app.add_middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
max_age=600,
)
# --- Rate limiting ---
limiter = Limiter(
key_func=get_remote_address,
storage_uri=os.environ.get("REDIS_URL", "memory://"),
)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# --- Global exception handler ---
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    error_id = str(uuid.uuid4())
    logger.exception(
        "Unhandled exception",
        extra={
            "error_id": error_id,
            "path": request.url.path,
            "method": request.method,
        },
    )
    return JSONResponse(
        status_code=500,
        content={
            "detail": "Internal server error",
            "reference": error_id,
        },
    )

# --- Health check (unauthenticated, rate-limited) ---
@app.get("/health")
@limiter.limit("10/minute")
async def health_check(request: Request):
    return {"status": "healthy"}
Part 11 - Returning to the Opening Puzzle
The original code had six security issues:
- debug=True - Returns detailed error pages with full tracebacks on unhandled exceptions. Must be False in production.
- allow_origins=["*"] with allow_credentials=True - Any website can make authenticated requests to the API using the victim's cookies.
- allow_methods=["*"] - Allows PUT, DELETE, PATCH from any origin. Restrict to the methods your API actually uses.
- allow_headers=["*"] - Allows any custom header. Restrict to Authorization and Content-Type.
- Exposing exception details - str(e) and traceback.format_exc() leak database schema, file paths, library versions, and internal logic to attackers.
- No authentication check - The endpoint has no authentication dependency. Any anonymous user can access any user's data by ID (IDOR - Insecure Direct Object Reference).
Key Takeaways
- Defense in depth: Apply security at every layer - network, rate limiting, auth, validation, queries, output
- Least privilege: Database users, file permissions, API scopes - minimum necessary access
- CORS: Never use allow_origins=["*"] with allow_credentials=True. Whitelist specific origins
- Rate limiting: Apply aggressive limits to login, registration, and password reset endpoints
- Security headers: Add HSTS, CSP, X-Frame-Options, X-Content-Type-Options to every response
- Dependency auditing: Run pip-audit and safety in CI/CD. Pin versions with hashes
- Static analysis: Run bandit to find hardcoded passwords, SQL injection, and unsafe deserialization
- Error handling: Log details internally, return generic errors externally, include a correlation ID
- Never use pickle for untrusted data - use JSON or structured formats
- Disable debug mode, API docs, and OpenAPI spec in production
- Security is not a feature - it is a property of every feature
Graded Practice Challenges
Level 1 - Identify the Vulnerability
Question 1: What is wrong with this error handler?
@app.exception_handler(Exception)
async def handle_error(request, exc):
    return JSONResponse(
        status_code=500,
        content={"error": str(exc), "type": type(exc).__name__},
    )
Answer
The handler exposes the exception message and type to the client. Exception messages can contain sensitive information: database connection strings (from connection errors), file paths (from FileNotFoundError), SQL queries (from SQLAlchemyError), and internal class names. Return a generic "Internal server error" message with a correlation ID instead. Log the full exception details server-side.
Question 2: A developer runs pip-audit and finds a vulnerability in cryptography==41.0.3. They add it to a "known vulnerabilities" ignore list instead of upgrading. When is this acceptable?
Answer
Almost never. The only acceptable scenario is when: (1) the vulnerability does not affect your usage of the library (e.g., a vulnerability in a feature you do not use), AND (2) upgrading breaks compatibility with a critical dependency that has no fix available, AND (3) you document the risk and set a calendar reminder to re-check. In practice, upgrade the dependency. If upgrading breaks something, fix the breakage. Security vulnerabilities in cryptographic libraries are especially critical and should never be ignored without exceptional justification.
Question 3: What OWASP Top 10 risk does this code represent?
@app.get("/api/invoices/{invoice_id}")
async def get_invoice(invoice_id: int, db):
    invoice = await db.get_invoice(invoice_id)
    return invoice
Answer
A01: Broken Access Control, specifically IDOR (Insecure Direct Object Reference). The endpoint does not verify that the authenticated user owns or has permission to view the requested invoice. An attacker can enumerate invoice IDs and access other users' invoices. Fix: Add an authentication dependency and verify invoice.user_id == current_user.user_id before returning the data.
Level 2 - Fix the Vulnerability
This FastAPI application has multiple security issues. Identify and fix all of them:
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import pickle
app = FastAPI(debug=True)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
SECRET_KEY = "mysecretkey123"
@app.post("/api/import")
async def import_data(request):
    data = pickle.loads(await request.body())
    return {"count": len(data)}

@app.get("/api/users/{user_id}")
async def get_user(user_id: int):
    try:
        user = await db.get_user(user_id)
        return user
    except Exception as e:
        return {"error": str(e)}
Solution
import os
import json
import uuid
import logging
from typing import Annotated
from fastapi import FastAPI, Depends, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

logger = logging.getLogger(__name__)

# CurrentUser, get_current_user, and db are assumed to come from the earlier sections

# Fix 1: debug=False
app = FastAPI(debug=False)

# Fix 2: Explicit CORS origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://engineersofai.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["Authorization", "Content-Type"],
)

# Fix 3: Secret from environment
SECRET_KEY = os.environ["SECRET_KEY"]

# Fix 4: Rate limiting (the handler turns RateLimitExceeded into a 429 response)
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Fix 5: JSON instead of pickle
@app.post("/api/import")
@limiter.limit("10/minute")
async def import_data(request: Request):
    try:
        data = json.loads(await request.body())
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")
    return {"count": len(data)}

# Fix 6: Auth check, IDOR prevention, safe error handling
@app.get("/api/users/{user_id}")
async def get_user(
    user_id: int,
    current_user: Annotated[CurrentUser, Depends(get_current_user)],
):
    if current_user.user_id != str(user_id) and "admin" not in current_user.roles:
        raise HTTPException(status_code=403, detail="Forbidden")
    try:
        user = await db.get_user(user_id)
        if not user:
            raise HTTPException(status_code=404, detail="Not found")
        return user
    except HTTPException:
        raise
    except Exception:
        error_id = str(uuid.uuid4())
        logger.exception(f"Error [id={error_id}]")
        raise HTTPException(
            status_code=500,
            detail=f"Internal error. Ref: {error_id}",
        )
Fixes: (1) Debug disabled. (2) Explicit CORS. (3) Secret from env. (4) Rate limiting. (5) JSON not pickle. (6) Authentication, authorization, safe errors.
Level 3 - Design a Secure System
Design a security architecture for the EngineersOfAI platform that covers:
- A FastAPI backend serving course content, user profiles, and certificates
- A Docusaurus frontend with user authentication
- Keycloak as the identity provider
- PostgreSQL for data storage
- Deployed on AWS (ECS, RDS, CloudFront)
Create a defense-in-depth design document covering: network layer security, application layer controls, data layer protection, monitoring and alerting, incident response procedures, and compliance considerations. Include specific Python code patterns, middleware configurations, and deployment settings.
Design Hints
- Network: CloudFront (WAF rules, DDoS protection) -> ALB (TLS termination) -> ECS (private subnet). RDS in private subnet, no public access. Security groups: API only accepts traffic from ALB, RDS only from API.
- Application: All middleware from this lesson (CORS, rate limiting, security headers, trusted hosts). FastAPI with debug=False, docs disabled. Global exception handler. Structured JSON logging with correlation IDs.
- Authentication: Keycloak RS256 JWTs verified via JWKS. Access tokens = 15 min, refresh tokens = 7 days. Role-based access (free-user, paid-individual, admin).
- Data: Argon2id password hashing (even though Keycloak manages passwords). All queries via SQLAlchemy ORM. text() banned via bandit custom rule. Database user with minimal privileges.
- Secrets: AWS Secrets Manager for database credentials, JWT keys, API keys. Rotated every 90 days. No secrets in environment variables (use SSM Parameter Store or Secrets Manager CSI).
- Dependencies: pip-audit in CI. Dependabot/renovate for automated updates. Hash-pinned requirements.txt.
- Monitoring: CloudWatch alarms for: 5xx rate > 1%, login failure rate > 10/min, unauthorized access attempts. AWS GuardDuty for infrastructure threats.
- Incident response: Runbook for secret rotation, user lockout, and service isolation. Security contact in API response headers.
What's Next
Congratulations on completing Module 6 - Security Engineering. You now have the knowledge to build applications that resist real-world attacks. In the Capstone Module, you will apply everything you have learned across all modules to build a complete, production-grade Python system from scratch.
