Skip to main content

Python Request-Response Lifecycle: Practice Problems & Exercises

Practice: Request-Response Lifecycle

11 problems4 Easy4 Medium3 Hard50–70 min
← Back to lesson

#1Minimal WSGI ApplicationEasy
WSGIcallableenvironstart_response

Implement the minimal WSGI callable protocol that handles two routes.

Solution
def simple_wsgi_app(environ, start_response):
path = environ.get('PATH_INFO', '/')
if path == '/':
start_response('200 OK', [('Content-Type', 'text/plain')])
return [b'Hello, WSGI!']
else:
start_response('404 Not Found', [('Content-Type', 'text/plain')])
return [b'Not Found']

def call_wsgi(app, path='/', method='GET'):
status_holder = []
def start_response(status, headers):
status_holder.append(status)
environ = {'PATH_INFO': path, 'REQUEST_METHOD': method}
body = b''.join(app(environ, start_response))
return status_holder[0], body.decode()

status, body = call_wsgi(simple_wsgi_app, '/')
print(f"{status}: {body}")

status, body = call_wsgi(simple_wsgi_app, '/other')
print(f"{status}: {body}")
def simple_wsgi_app(environ, start_response):
  """A minimal WSGI application.
  Respond with 200 OK and 'Hello, WSGI!' for GET /
  Respond with 404 and 'Not Found' for any other path.
  Content-Type should be text/plain.
  start_response(status, headers) sets the response status and headers.
  Return an iterable of byte strings.
  """
  pass

# Minimal WSGI test harness
def call_wsgi(app, path='/', method='GET'):
  status_holder = []
  def start_response(status, headers):
      status_holder.append(status)
  environ = {'PATH_INFO': path, 'REQUEST_METHOD': method}
  body = b''.join(app(environ, start_response))
  return status_holder[0], body.decode()

status, body = call_wsgi(simple_wsgi_app, '/')
print(f"{status}: {body}")

status, body = call_wsgi(simple_wsgi_app, '/other')
print(f"{status}: {body}")
Expected Output
200 OK: Hello, WSGI!
404 Not Found: Not Found
Hints

Hint 1: Check environ['PATH_INFO'] to determine which route to serve.

Hint 2: Call start_response('200 OK', [('Content-Type', 'text/plain')]) before returning the body.

Hint 3: Return a list of byte strings: [b'Hello, WSGI!']


#2Parse CORS Request HeadersEasy
CORSOriginpreflightheaders

Implement CORS header checking for regular and preflight requests.

Solution
def check_cors(request_headers, allowed_origins):
origin = request_headers.get('Origin', '')
method = request_headers.get('Method', '')
acr_method = request_headers.get('Access-Control-Request-Method')
is_preflight = method == 'OPTIONS' and acr_method is not None
allowed = origin in allowed_origins

response_headers = {}
if allowed:
response_headers['Access-Control-Allow-Origin'] = origin
if is_preflight:
response_headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS'
response_headers['Access-Control-Allow-Headers'] = request_headers.get(
'Access-Control-Request-Headers', 'Content-Type')

return {'allowed': allowed, 'is_preflight': is_preflight, 'response_headers': response_headers}

r1 = check_cors(
{'Origin': 'https://app.example.com', 'Method': 'GET'},
allowed_origins=['https://app.example.com', 'https://admin.example.com']
)
print(f"allowed={r1['allowed']}, preflight={r1['is_preflight']}")
print(f"ACAO: {r1['response_headers'].get('Access-Control-Allow-Origin')}")

r2 = check_cors(
{'Origin': 'https://app.example.com', 'Method': 'OPTIONS',
'Access-Control-Request-Method': 'POST',
'Access-Control-Request-Headers': 'Content-Type'},
allowed_origins=['https://app.example.com']
)
print(f"preflight: {r2['is_preflight']}, methods: {r2['response_headers'].get('Access-Control-Allow-Methods')}")

r3 = check_cors({'Origin': 'https://evil.com', 'Method': 'GET'}, ['https://app.example.com'])
print(f"evil allowed={r3['allowed']}")
def check_cors(request_headers, allowed_origins):
  """Check CORS policy for a request.
  Returns a dict with:
  - 'allowed': bool
  - 'is_preflight': bool (OPTIONS + Access-Control-Request-Method present)
  - 'response_headers': dict of CORS headers to add to the response
  If origin is in allowed_origins, set Access-Control-Allow-Origin to that origin.
  For preflight, also add Access-Control-Allow-Methods and Access-Control-Allow-Headers.
  """
  pass

# Regular request from allowed origin
r1 = check_cors(
  {'Origin': 'https://app.example.com', 'Method': 'GET'},
  allowed_origins=['https://app.example.com', 'https://admin.example.com']
)
print(f"allowed={r1['allowed']}, preflight={r1['is_preflight']}")
print(f"ACAO: {r1['response_headers'].get('Access-Control-Allow-Origin')}")

# Preflight request
r2 = check_cors(
  {'Origin': 'https://app.example.com', 'Method': 'OPTIONS',
   'Access-Control-Request-Method': 'POST',
   'Access-Control-Request-Headers': 'Content-Type'},
  allowed_origins=['https://app.example.com']
)
print(f"preflight: {r2['is_preflight']}, methods: {r2['response_headers'].get('Access-Control-Allow-Methods')}")

# Disallowed origin
r3 = check_cors({'Origin': 'https://evil.com', 'Method': 'GET'}, ['https://app.example.com'])
print(f"evil allowed={r3['allowed']}")
Expected Output
allowed=True, preflight=False
ACAO: https://app.example.com
preflight: True, methods: GET, POST, PUT, DELETE, OPTIONS
evil allowed=False
Hints

Hint 1: A preflight request uses the OPTIONS method AND includes Access-Control-Request-Method.

Hint 2: Check if the Origin header value is in allowed_origins.

Hint 3: For preflight responses, include Access-Control-Allow-Methods and Access-Control-Allow-Headers.


#3HTTP Response Cache-Control BuilderEasy
cachingCache-ControlETagresponse headers

Build a Cache-Control header factory that returns correct caching directives for different strategies.

Solution
def build_cache_headers(strategy):
strategies = {
'no-cache': {'Cache-Control': 'no-cache, must-revalidate'},
'no-store': {'Cache-Control': 'no-store'},
'public-immutable':{'Cache-Control': 'public, max-age=31536000, immutable'},
'private-short': {'Cache-Control': 'private, max-age=300'},
'revalidate': {'Cache-Control': 'public, max-age=3600, must-revalidate'},
}
return strategies.get(strategy, {})

for strategy in ['no-cache', 'no-store', 'public-immutable', 'private-short', 'revalidate']:
headers = build_cache_headers(strategy)
print(f"{strategy}: {headers.get('Cache-Control')}")
def build_cache_headers(strategy):
  """Build Cache-Control and related headers for a given caching strategy.
  strategies:
  - 'no-cache': must revalidate every time
  - 'no-store': never cache (sensitive data)
  - 'public-immutable': CDN can cache forever (static assets)
  - 'private-short': user-specific, cache 5 minutes
  - 'revalidate': cache 1 hour, must revalidate after
  Return a dict of header names to values.
  """
  pass

for strategy in ['no-cache', 'no-store', 'public-immutable', 'private-short', 'revalidate']:
  headers = build_cache_headers(strategy)
  print(f"{strategy}: {headers.get('Cache-Control')}")
Expected Output
no-cache: no-cache, must-revalidate
no-store: no-store
public-immutable: public, max-age=31536000, immutable
private-short: private, max-age=300
revalidate: public, max-age=3600, must-revalidate
Hints

Hint 1: Use a dictionary mapping strategy names to Cache-Control values.

Hint 2: 31536000 seconds = 1 year (standard for immutable assets).

Hint 3: max-age=300 = 5 minutes; max-age=3600 = 1 hour.


#4Cookie Parser and SetterEasy
cookiesSet-CookieCookie headerparsing

Implement a Cookie header parser and a Set-Cookie header builder.

Solution
def parse_cookie_header(cookie_header):
result = {}
for part in cookie_header.split('; '):
if '=' in part:
name, _, value = part.partition('=')
result[name.strip()] = value.strip()
return result

def build_set_cookie(name, value, max_age=None, http_only=True, secure=True, same_site='Lax'):
parts = [f"{name}={value}"]
if max_age is not None:
parts.append(f"Max-Age={max_age}")
if http_only:
parts.append("HttpOnly")
if secure:
parts.append("Secure")
parts.append(f"SameSite={same_site}")
return '; '.join(parts)

cookies = parse_cookie_header("session=abc123; user_id=42; theme=dark")
print(cookies)

sc = build_set_cookie('session', 'xyz789', max_age=3600)
print(sc)

sc2 = build_set_cookie('guest', 'true', http_only=False, secure=False, same_site='None')
print(sc2)
def parse_cookie_header(cookie_header):
  """Parse a Cookie request header into a dict.
  Input: "session=abc123; user_id=42; theme=dark"
  Output: {"session": "abc123", "user_id": "42", "theme": "dark"}
  """
  pass

def build_set_cookie(name, value, max_age=None, http_only=True, secure=True, same_site='Lax'):
  """Build a Set-Cookie header value string.
  Include Max-Age if provided, always add HttpOnly if True, Secure if True, SameSite.
  """
  pass

cookies = parse_cookie_header("session=abc123; user_id=42; theme=dark")
print(cookies)

sc = build_set_cookie('session', 'xyz789', max_age=3600)
print(sc)

sc2 = build_set_cookie('guest', 'true', http_only=False, secure=False, same_site='None')
print(sc2)
Expected Output
{'session': 'abc123', 'user_id': '42', 'theme': 'dark'}
session=xyz789; Max-Age=3600; HttpOnly; Secure; SameSite=Lax
guest=true; SameSite=None
Hints

Hint 1: Split the Cookie header on '; ' to get individual name=value pairs.

Hint 2: For Set-Cookie, start with 'name=value', then append optional attributes.

Hint 3: Only append 'HttpOnly' and 'Secure' strings (no value) when their flags are True.


#5Content NegotiationMedium
content negotiationAccept headermedia typequality factor

Implement HTTP content negotiation by parsing the Accept header and selecting the best matching type.

Solution
def negotiate_content_type(accept_header, available_types):
preferences = []
for entry in accept_header.split(','):
entry = entry.strip()
if ';q=' in entry:
media_type, _, q_str = entry.partition(';q=')
quality = float(q_str)
else:
media_type = entry
quality = 1.0
preferences.append((media_type.strip(), quality))

preferences.sort(key=lambda x: -x[1])

for media_type, quality in preferences:
if media_type == '*/*':
return available_types[0] if available_types else None
main_type = media_type.split('/')[0]
if media_type.endswith('/*'):
for avail in available_types:
if avail.startswith(main_type + '/'):
return avail
elif media_type in available_types:
return media_type

return None

available = ['application/json', 'text/html', 'text/plain']

tests = [
('text/html, application/json;q=0.9, */*;q=0.5', 'text/html'),
('application/json', 'application/json'),
('text/xml, application/xml', None),
('*/*', 'application/json'),
('application/json;q=0.5, text/html;q=0.9', 'text/html'),
]

for accept, expected in tests:
result = negotiate_content_type(accept, available)
status = 'PASS' if result == expected else 'FAIL'
print(f"{status}: '{accept[:40]}' -> {result}")
def negotiate_content_type(accept_header, available_types):
  """Select the best Content-Type based on the Accept header and what's available.
  Parse quality factors: 'application/json;q=0.9, text/html;q=1.0, */*;q=0.1'
  Return the available type with the highest matching quality factor.
  Return None if no match (should result in 406 Not Acceptable).
  """
  pass

available = ['application/json', 'text/html', 'text/plain']

tests = [
  ('text/html, application/json;q=0.9, */*;q=0.5', 'text/html'),
  ('application/json', 'application/json'),
  ('text/xml, application/xml', None),
  ('*/*', 'application/json'),
  ('application/json;q=0.5, text/html;q=0.9', 'text/html'),
]

for accept, expected in tests:
  result = negotiate_content_type(accept, available)
  status = 'PASS' if result == expected else 'FAIL'
  print(f"{status}: '{accept[:40]}' -> {result}")
Expected Output
PASS: 'text/html, application/json;q=0.9, */*;q=0.5' -> text/html
PASS: 'application/json' -> application/json
PASS: 'text/xml, application/xml' -> None
PASS: '*/*' -> application/json
PASS: 'application/json;q=0.5, text/html;q=0.9' -> text/html
Hints

Hint 1: Split the Accept header on ',' to get each media type entry.

Hint 2: Each entry may have ';q=0.x' — default quality is 1.0 if absent.

Hint 3: For '*/*', match any available type; for 'type/*', match any subtype of that type.


#6gzip Response CompressionMedium
gzipcompressionContent-EncodingAccept-Encoding

Implement conditional gzip compression based on the client's Accept-Encoding header.

Solution
import gzip
import json

def compress_response(body_str, request_headers):
accept_encoding = request_headers.get('Accept-Encoding', '')
if 'gzip' in accept_encoding:
compressed = gzip.compress(body_str.encode('utf-8'))
return compressed, {'Content-Encoding': 'gzip', 'Vary': 'Accept-Encoding'}
return body_str.encode('utf-8'), {}

data = {'users': [{'id': i, 'name': f'User {i}'} for i in range(100)]}
body = json.dumps(data)

compressed, headers = compress_response(body, {'Accept-Encoding': 'gzip, deflate, br'})
print(f"gzip headers: {headers}")
print(f"compressed smaller: {len(compressed) < len(body.encode())}")
decompressed = json.loads(gzip.decompress(compressed))
print(f"round-trip ok: {len(decompressed['users']) == 100}")

raw, headers2 = compress_response(body, {'Accept-Encoding': 'identity'})
print(f"no-gzip headers: {headers2}")
print(f"raw is original: {raw == body.encode()}")
import gzip
import json

def compress_response(body_str, request_headers):
  """Compress the response body if the client accepts gzip.
  Check Accept-Encoding header for 'gzip'.
  If supported: return (compressed_bytes, {'Content-Encoding': 'gzip', 'Vary': 'Accept-Encoding'})
  If not supported: return (body_str.encode(), {})
  """
  pass

data = {'users': [{'id': i, 'name': f'User {i}'} for i in range(100)]}
body = json.dumps(data)

# Client supports gzip
compressed, headers = compress_response(body, {'Accept-Encoding': 'gzip, deflate, br'})
print(f"gzip headers: {headers}")
print(f"compressed smaller: {len(compressed) < len(body.encode())}")
decompressed = json.loads(gzip.decompress(compressed))
print(f"round-trip ok: {len(decompressed['users']) == 100}")

# Client does NOT support gzip
raw, headers2 = compress_response(body, {'Accept-Encoding': 'identity'})
print(f"no-gzip headers: {headers2}")
print(f"raw is original: {raw == body.encode()}")
Expected Output
gzip headers: {'Content-Encoding': 'gzip', 'Vary': 'Accept-Encoding'}
compressed smaller: True
round-trip ok: True
no-gzip headers: {}
raw is original: True
Hints

Hint 1: Check if 'gzip' is in the Accept-Encoding header value.

Hint 2: Use gzip.compress(body_str.encode()) to compress; it returns bytes.

Hint 3: Always add 'Vary: Accept-Encoding' when compression depends on that header.


#7ETag-based Conditional RequestMedium
ETagIf-None-Match304 Not Modifiedcaching

Implement ETag-based conditional GET that returns 304 Not Modified when the resource hasn't changed.

Solution
import hashlib
import json

def handle_conditional_get(resource_data, request_headers):
serialized = json.dumps(resource_data, sort_keys=True).encode()
etag = '"' + hashlib.md5(serialized).hexdigest() + '"'

if_none_match = request_headers.get('If-None-Match')
if if_none_match == etag:
return 304, {'ETag': etag}, None

return 200, {'ETag': etag, 'Content-Type': 'application/json'}, resource_data

resource = {'id': 1, 'name': 'Alice', 'role': 'admin'}

status, headers, body = handle_conditional_get(resource, {})
etag = headers.get('ETag')
print(f"first: status={status}, etag={etag is not None}, body={body is not None}")

status2, headers2, body2 = handle_conditional_get(resource, {'If-None-Match': etag})
print(f"cached: status={status2}, body={body2}")

status3, headers3, body3 = handle_conditional_get(resource, {'If-None-Match': '"stale-etag"'})
print(f"stale: status={status3}, body={body3 is not None}")
import hashlib
import json

def handle_conditional_get(resource_data, request_headers):
  """Handle a conditional GET request using ETags.
  1. Compute ETag as MD5 hex of the serialized resource.
  2. If request has 'If-None-Match' header matching the ETag, return 304.
  3. Otherwise return 200 with body and ETag header.
  Return: (status_code, response_headers, body_or_none)
  body_or_none is None for 304 responses.
  """
  pass

resource = {'id': 1, 'name': 'Alice', 'role': 'admin'}

# First request — no conditional headers
status, headers, body = handle_conditional_get(resource, {})
etag = headers.get('ETag')
print(f"first: status={status}, etag={etag is not None}, body={body is not None}")

# Second request — correct ETag (not modified)
status2, headers2, body2 = handle_conditional_get(resource, {'If-None-Match': etag})
print(f"cached: status={status2}, body={body2}")

# Request with stale ETag
status3, headers3, body3 = handle_conditional_get(resource, {'If-None-Match': '"stale-etag"'})
print(f"stale: status={status3}, body={body3 is not None}")
Expected Output
first: status=200, etag=True, body=True
cached: status=304, body=None
stale: status=200, body=True
Hints

Hint 1: Compute ETag as '"' + hashlib.md5(json.dumps(resource, sort_keys=True).encode()).hexdigest() + '"'.

Hint 2: Compare the If-None-Match header value to the computed ETag.

Hint 3: 304 responses have no body — return None for the body.


#8Request Pipeline SimulationMedium
middleware pipelinerequest lifecyclephases

Build a request processing pipeline that runs phases in order and short-circuits on the first non-None response.

Solution
class RequestPipeline:
def __init__(self):
self.phases = []
self.executed = []

def add_phase(self, name, fn):
self.phases.append((name, fn))

def process(self, request):
for name, fn in self.phases:
self.executed.append(name)
result = fn(request)
if result is not None:
return {'phase': name, 'response': result}
return {'phase': 'completed', 'response': None}

pipeline = RequestPipeline()
pipeline.add_phase('parse_headers', lambda r: None)
pipeline.add_phase('authenticate', lambda r: {'error': 'Unauthorized'} if not r.get('token') else None)
pipeline.add_phase('authorize', lambda r: {'error': 'Forbidden'} if r.get('role') != 'admin' else None)
pipeline.add_phase('validate_body', lambda r: None)
pipeline.add_phase('handle', lambda r: {'data': 'success'})

pipeline.executed.clear()
result = pipeline.process({'path': '/admin'})
print(f"no-auth: stopped_at={result['phase']}, phases_run={pipeline.executed}")

pipeline.executed.clear()
result = pipeline.process({'path': '/admin', 'token': 'abc', 'role': 'user'})
print(f"no-admin: stopped_at={result['phase']}, phases_run={pipeline.executed}")

pipeline.executed.clear()
result = pipeline.process({'path': '/admin', 'token': 'abc', 'role': 'admin'})
print(f"success: stopped_at={result['phase']}, phases_run={pipeline.executed}")
class RequestPipeline:
  """Simulates a web framework request processing pipeline.
  Phases (in order):
  1. parse_headers
  2. authenticate
  3. authorize
  4. validate_body
  5. handle (the actual route handler)
  Each phase can short-circuit by returning a response dict.
  If it returns None, processing continues to the next phase.
  """

  def __init__(self):
      self.phases = []
      self.executed = []

  def add_phase(self, name, fn):
      self.phases.append((name, fn))

  def process(self, request):
      for name, fn in self.phases:
          self.executed.append(name)
          result = fn(request)
          if result is not None:
              return {'phase': name, 'response': result}
      return {'phase': 'completed', 'response': None}

pipeline = RequestPipeline()

pipeline.add_phase('parse_headers', lambda r: None)
pipeline.add_phase('authenticate', lambda r: {'error': 'Unauthorized'} if not r.get('token') else None)
pipeline.add_phase('authorize', lambda r: {'error': 'Forbidden'} if r.get('role') != 'admin' else None)
pipeline.add_phase('validate_body', lambda r: None)
pipeline.add_phase('handle', lambda r: {'data': 'success'})

# Unauthenticated request
pipeline.executed.clear()
result = pipeline.process({'path': '/admin'})
print(f"no-auth: stopped_at={result['phase']}, phases_run={pipeline.executed}")

# Authenticated but wrong role
pipeline.executed.clear()
result = pipeline.process({'path': '/admin', 'token': 'abc', 'role': 'user'})
print(f"no-admin: stopped_at={result['phase']}, phases_run={pipeline.executed}")

# Fully authorized
pipeline.executed.clear()
result = pipeline.process({'path': '/admin', 'token': 'abc', 'role': 'admin'})
print(f"success: stopped_at={result['phase']}, phases_run={pipeline.executed}")
Expected Output
no-auth: stopped_at=authenticate, phases_run=['parse_headers', 'authenticate']
no-admin: stopped_at=authorize, phases_run=['parse_headers', 'authenticate', 'authorize']
success: stopped_at=completed, phases_run=['parse_headers', 'authenticate', 'authorize', 'validate_body', 'handle']
Hints

Hint 1: The pipeline iterates through phases in order; if a phase returns non-None, it stops.

Hint 2: Record the phase name in self.executed before calling the function.

Hint 3: Return the phase name and response when short-circuiting.


#9Session Token ManagerHard
session managementtokensexpiryHMAC

Implement a session manager that creates and validates HMAC-signed tokens with expiry.

Solution
import hmac
import hashlib
import time
import json
import base64

class SessionManager:
def __init__(self, secret_key: str, ttl_seconds: int = 3600):
self.secret_key = secret_key.encode()
self.ttl = ttl_seconds

def create_token(self, user_id: int, role: str) -> str:
payload = {
'user_id': user_id,
'role': role,
'exp': time.time() + self.ttl,
}
payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode()
sig = self._sign(payload_b64)
return f"{payload_b64}.{sig}"

def validate_token(self, token: str):
try:
payload_b64, sig = token.rsplit('.', 1)
except ValueError:
raise ValueError('Malformed token')

expected_sig = self._sign(payload_b64)
if not hmac.compare_digest(sig, expected_sig):
raise ValueError('Invalid signature')

payload = json.loads(base64.urlsafe_b64decode(payload_b64).decode())
if time.time() > payload['exp']:
raise ValueError('Token expired')
return payload

def _sign(self, payload_b64: str) -> str:
return hmac.new(self.secret_key, payload_b64.encode(), hashlib.sha256).hexdigest()

mgr = SessionManager('supersecret', ttl_seconds=3600)
token = mgr.create_token(42, 'admin')
print(f"token created: {len(token) > 10}")

user = mgr.validate_token(token)
print(f"valid: user_id={user['user_id']}, role={user['role']}")

parts = token.split('.')
tampered = parts[0] + '.' + 'invalidsig'
try:
mgr.validate_token(tampered)
print("tamper: allowed (bad)")
except ValueError as e:
print(f"tamper: blocked ({e})")

old_mgr = SessionManager('supersecret', ttl_seconds=0)
old_token = old_mgr.create_token(1, 'user')
time.sleep(0.01)
try:
old_mgr.validate_token(old_token)
print("expired: allowed (bad)")
except ValueError as e:
print(f"expired: blocked ({e})")
import hmac
import hashlib
import time
import json
import base64

class SessionManager:
  """Manages server-side sessions with signed tokens.
  Token format: base64(json_payload).base64(signature)
  Signature: HMAC-SHA256 of the payload using secret_key.
  """

  def __init__(self, secret_key: str, ttl_seconds: int = 3600):
      self.secret_key = secret_key.encode()
      self.ttl = ttl_seconds

  def create_token(self, user_id: int, role: str) -> str:
      """Create a signed session token."""
      pass

  def validate_token(self, token: str):
      """Validate a token. Return user dict or raise ValueError.
      Check: signature valid, not expired.
      """
      pass

  def _sign(self, payload_b64: str) -> str:
      sig = hmac.new(self.secret_key, payload_b64.encode(), hashlib.sha256).hexdigest()
      return sig

mgr = SessionManager('supersecret', ttl_seconds=3600)
token = mgr.create_token(42, 'admin')
print(f"token created: {len(token) > 10}")

user = mgr.validate_token(token)
print(f"valid: user_id={user['user_id']}, role={user['role']}")

# Tampered token
parts = token.split('.')
tampered = parts[0] + '.' + 'invalidsig'
try:
  mgr.validate_token(tampered)
  print("tamper: allowed (bad)")
except ValueError as e:
  print(f"tamper: blocked ({e})")

# Expired token
old_mgr = SessionManager('supersecret', ttl_seconds=0)
old_token = old_mgr.create_token(1, 'user')
time.sleep(0.01)
try:
  old_mgr.validate_token(old_token)
  print("expired: allowed (bad)")
except ValueError as e:
  print(f"expired: blocked ({e})")
Expected Output
token created: True
valid: user_id=42, role=admin
tamper: blocked (Invalid signature)
expired: blocked (Token expired)
Hints

Hint 1: Encode the payload as JSON, then base64url-encode it for the first part of the token.

Hint 2: Use hmac.new(key, payload.encode(), hashlib.sha256).hexdigest() for the signature.

Hint 3: On validation: split on '.', verify signature, then check 'exp' field against time.time().


#10Full WSGI Middleware StackHard
WSGImiddlewarestackrequest-response

Compose three WSGI middleware layers (timing, request ID, error catching) around an inner app.

Solution
import time
import uuid

def inner_app(environ, start_response):
path = environ.get('PATH_INFO', '/')
if path == '/error':
raise RuntimeError("Simulated server error")
start_response('200 OK', [('Content-Type', 'application/json')])
return [b'{"status":"ok"}']

def timing_middleware(app):
def wsgi(environ, start_response):
start = time.time()
captured_args = {}
def capturing_start_response(status, headers):
captured_args['status'] = status
captured_args['headers'] = list(headers)
captured_args['sr'] = start_response
body = app(environ, capturing_start_response)
elapsed_ms = str(round((time.time() - start) * 1000, 2))
headers = captured_args['headers'] + [('X-Response-Time', elapsed_ms)]
captured_args['sr'](captured_args['status'], headers)
return body
return wsgi

def request_id_middleware(app):
_counter = [0]
def wsgi(environ, start_response):
_counter[0] += 1
captured_args = {}
def capturing_start_response(status, headers):
captured_args['status'] = status
captured_args['headers'] = list(headers)
captured_args['sr'] = start_response
body = app(environ, capturing_start_response)
headers = captured_args['headers'] + [('X-Request-ID', str(_counter[0]))]
captured_args['sr'](captured_args['status'], headers)
return body
return wsgi

def error_catching_middleware(app):
def wsgi(environ, start_response):
try:
return app(environ, start_response)
except Exception:
start_response('500 Internal Server Error', [('Content-Type', 'application/json')])
return [b'{"error": "Internal Server Error"}']
return wsgi

def call_wsgi(app, path='/'):
captured = {'status': None, 'headers': {}}
def start_response(status, headers):
captured['status'] = status
captured['headers'] = dict(headers)
environ = {'PATH_INFO': path, 'REQUEST_METHOD': 'GET'}
body = b''.join(app(environ, start_response))
return captured['status'], captured['headers'], body.decode()

stacked = error_catching_middleware(request_id_middleware(timing_middleware(inner_app)))

status, headers, body = call_wsgi(stacked, '/')
print(f"status: {status}")
print(f"has timing: {'X-Response-Time' in headers}")
print(f"has request-id: {'X-Request-ID' in headers}")

status2, headers2, body2 = call_wsgi(stacked, '/error')
print(f"error status: {status2}")
print(f"error body: {body2}")
import time

# The inner WSGI app
def inner_app(environ, start_response):
  path = environ.get('PATH_INFO', '/')
  if path == '/error':
      raise RuntimeError("Simulated server error")
  start_response('200 OK', [('Content-Type', 'application/json')])
  return [b'{"status":"ok"}']

# TODO: Implement these WSGI middleware wrappers:

def timing_middleware(app):
  """Wrap app to add X-Response-Time header (milliseconds as string)."""
  pass

def request_id_middleware(app):
  """Add X-Request-ID header to the response (use a counter or uuid)."""
  pass

def error_catching_middleware(app):
  """Catch exceptions, return 500 JSON: {"error": "Internal Server Error"}."""
  pass

# Stack: error_catching -> request_id -> timing -> inner_app
def call_wsgi(app, path='/'):
  captured = {'status': None, 'headers': {}}
  def start_response(status, headers):
      captured['status'] = status
      captured['headers'] = dict(headers)
  environ = {'PATH_INFO': path, 'REQUEST_METHOD': 'GET'}
  body = b''.join(app(environ, start_response))
  return captured['status'], captured['headers'], body.decode()

stacked = error_catching_middleware(request_id_middleware(timing_middleware(inner_app)))

status, headers, body = call_wsgi(stacked, '/')
print(f"status: {status}")
print(f"has timing: {'X-Response-Time' in headers}")
print(f"has request-id: {'X-Request-ID' in headers}")

status2, headers2, body2 = call_wsgi(stacked, '/error')
print(f"error status: {status2}")
print(f"error body: {body2}")
Expected Output
status: 200 OK
has timing: True
has request-id: True
error status: 500 Internal Server Error
error body: {"error": "Internal Server Error"}
Hints

Hint 1: Each middleware is a function that takes an app and returns a new WSGI callable.

Hint 2: To add headers: intercept start_response, add to the headers list, then forward.

Hint 3: For error catching: wrap the app call in try/except and call start_response manually on error.


#11HTTP Keep-Alive Connection Pool SimulatorHard
keep-aliveconnection poolingHTTP/1.1reuse

Implement an HTTP keep-alive connection pool that tracks created vs reused connections.

Solution
import time
from collections import defaultdict

class ConnectionPool:
def __init__(self, max_per_host=5, idle_timeout_s=30):
self.max_per_host = max_per_host
self.idle_timeout_s = idle_timeout_s
self._pool = defaultdict(list)
self._conn_counter = 0
self._created = 0
self._reused = 0

def get_connection(self, host: str) -> int:
self.evict_idle()
idle = self._pool[host]
if idle:
conn_id, _ = idle.pop()
self._reused += 1
return conn_id
self._conn_counter += 1
self._created += 1
return self._conn_counter

def release_connection(self, host: str, conn_id: int):
if len(self._pool[host]) < self.max_per_host:
self._pool[host].append((conn_id, time.time()))

def evict_idle(self):
now = time.time()
for host in list(self._pool.keys()):
self._pool[host] = [
(cid, ts) for cid, ts in self._pool[host]
if now - ts < self.idle_timeout_s
]

pool = ConnectionPool(max_per_host=3)

conns = []
for _ in range(3):
c = pool.get_connection('api.example.com')
conns.append(c)

for c in conns:
pool.release_connection('api.example.com', c)

for _ in range(3):
c = pool.get_connection('api.example.com')
pool.release_connection('api.example.com', c)

print(f"created: {pool._created}")
print(f"reused: {pool._reused}")
print(f"reuse_rate: {pool._reused / (pool._created + pool._reused):.0%}")
import time
import threading
from collections import defaultdict

class ConnectionPool:
  """Simulates HTTP/1.1 keep-alive connection pooling.
  Connections to the same host are reused instead of creating new ones.
  Max connections per host: max_per_host.
  Connection idle timeout: idle_timeout_s.
  """

  def __init__(self, max_per_host=5, idle_timeout_s=30):
      self.max_per_host = max_per_host
      self.idle_timeout_s = idle_timeout_s
      self._pool = defaultdict(list)  # host -> list of (conn_id, last_used)
      self._conn_counter = 0
      self._created = 0
      self._reused = 0

  def get_connection(self, host: str) -> int:
      """Return an existing connection ID if available, else create a new one."""
      pass

  def release_connection(self, host: str, conn_id: int):
      """Return connection to pool for reuse."""
      pass

  def evict_idle(self):
      """Remove connections idle longer than idle_timeout_s."""
      pass

pool = ConnectionPool(max_per_host=3)

# Simulate 5 requests to same host
conns = []
for _ in range(3):
  c = pool.get_connection('api.example.com')
  conns.append(c)

# Release all
for c in conns:
  pool.release_connection('api.example.com', c)

# Next 3 requests should reuse connections
for _ in range(3):
  c = pool.get_connection('api.example.com')
  pool.release_connection('api.example.com', c)

print(f"created: {pool._created}")
print(f"reused: {pool._reused}")
print(f"reuse_rate: {pool._reused / (pool._created + pool._reused):.0%}")
Expected Output
created: 3
reused: 3
reuse_rate: 50%
Hints

Hint 1: When getting a connection: if the pool for the host has idle connections, pop one and record as reused.

Hint 2: If no idle connections available, create a new one and increment _created.

Hint 3: release_connection appends the conn_id back to the pool with the current timestamp.

© 2026 EngineersOfAI. All rights reserved.