Python Request-Response Lifecycle: Practice Problems & Exercises
Practice: Request-Response Lifecycle
← Back to lessonImplement the minimal WSGI callable protocol that handles two routes.Solution
def simple_wsgi_app(environ, start_response):
"""A minimal WSGI application.
Respond with 200 OK and 'Hello, WSGI!' for GET /
Respond with 404 and 'Not Found' for any other path.
Content-Type should be text/plain.
start_response(status, headers) sets the response status and headers.
Return an iterable of byte strings.
"""
pass
# Minimal WSGI test harness
def call_wsgi(app, path='/', method='GET'):
status_holder = []
def start_response(status, headers):
status_holder.append(status)
environ = {'PATH_INFO': path, 'REQUEST_METHOD': method}
body = b''.join(app(environ, start_response))
return status_holder[0], body.decode()
status, body = call_wsgi(simple_wsgi_app, '/')
print(f"{status}: {body}")
status, body = call_wsgi(simple_wsgi_app, '/other')
print(f"{status}: {body}")
Expected Output
200 OK: Hello, WSGI!
404 Not Found: Not FoundHints
Hint 1: Check environ['PATH_INFO'] to determine which route to serve.
Hint 2: Call start_response('200 OK', [('Content-Type', 'text/plain')]) before returning the body.
Hint 3: Return a list of byte strings: [b'Hello, WSGI!']
Implement CORS header checking for regular and preflight requests.Solution
def check_cors(request_headers, allowed_origins):
"""Check CORS policy for a request.
Returns a dict with:
- 'allowed': bool
- 'is_preflight': bool (OPTIONS + Access-Control-Request-Method present)
- 'response_headers': dict of CORS headers to add to the response
If origin is in allowed_origins, set Access-Control-Allow-Origin to that origin.
For preflight, also add Access-Control-Allow-Methods and Access-Control-Allow-Headers.
"""
pass
# Regular request from allowed origin
r1 = check_cors(
{'Origin': 'https://app.example.com', 'Method': 'GET'},
allowed_origins=['https://app.example.com', 'https://admin.example.com']
)
print(f"allowed={r1['allowed']}, preflight={r1['is_preflight']}")
print(f"ACAO: {r1['response_headers'].get('Access-Control-Allow-Origin')}")
# Preflight request
r2 = check_cors(
{'Origin': 'https://app.example.com', 'Method': 'OPTIONS',
'Access-Control-Request-Method': 'POST',
'Access-Control-Request-Headers': 'Content-Type'},
allowed_origins=['https://app.example.com']
)
print(f"preflight: {r2['is_preflight']}, methods: {r2['response_headers'].get('Access-Control-Allow-Methods')}")
# Disallowed origin
r3 = check_cors({'Origin': 'https://evil.com', 'Method': 'GET'}, ['https://app.example.com'])
print(f"evil allowed={r3['allowed']}")
Expected Output
allowed=True, preflight=False
ACAO: https://app.example.com
preflight: True, methods: GET, POST, PUT, DELETE, OPTIONS
evil allowed=FalseHints
Hint 1: A preflight request uses the OPTIONS method AND includes Access-Control-Request-Method.
Hint 2: Check if the Origin header value is in allowed_origins.
Hint 3: For preflight responses, include Access-Control-Allow-Methods and Access-Control-Allow-Headers.
Build a Cache-Control header factory that returns correct caching directives for different strategies.Solution
def build_cache_headers(strategy):
"""Build Cache-Control and related headers for a given caching strategy.
strategies:
- 'no-cache': must revalidate every time
- 'no-store': never cache (sensitive data)
- 'public-immutable': CDN can cache forever (static assets)
- 'private-short': user-specific, cache 5 minutes
- 'revalidate': cache 1 hour, must revalidate after
Return a dict of header names to values.
"""
pass
for strategy in ['no-cache', 'no-store', 'public-immutable', 'private-short', 'revalidate']:
headers = build_cache_headers(strategy)
print(f"{strategy}: {headers.get('Cache-Control')}")
Expected Output
no-cache: no-cache, must-revalidate
no-store: no-store
public-immutable: public, max-age=31536000, immutable
private-short: private, max-age=300
revalidate: public, max-age=3600, must-revalidateHints
Hint 1: Use a dictionary mapping strategy names to Cache-Control values.
Hint 2: 31536000 seconds = 1 year (standard for immutable assets).
Hint 3: max-age=300 = 5 minutes; max-age=3600 = 1 hour.
Implement a Cookie header parser and a Set-Cookie header builder.Solution
def parse_cookie_header(cookie_header):
"""Parse a Cookie request header into a dict.
Input: "session=abc123; user_id=42; theme=dark"
Output: {"session": "abc123", "user_id": "42", "theme": "dark"}
"""
pass
def build_set_cookie(name, value, max_age=None, http_only=True, secure=True, same_site='Lax'):
"""Build a Set-Cookie header value string.
Include Max-Age if provided, always add HttpOnly if True, Secure if True, SameSite.
"""
pass
cookies = parse_cookie_header("session=abc123; user_id=42; theme=dark")
print(cookies)
sc = build_set_cookie('session', 'xyz789', max_age=3600)
print(sc)
sc2 = build_set_cookie('guest', 'true', http_only=False, secure=False, same_site='None')
print(sc2)
Expected Output
{'session': 'abc123', 'user_id': '42', 'theme': 'dark'}
session=xyz789; Max-Age=3600; HttpOnly; Secure; SameSite=Lax
guest=true; SameSite=NoneHints
Hint 1: Split the Cookie header on '; ' to get individual name=value pairs.
Hint 2: For Set-Cookie, start with 'name=value', then append optional attributes.
Hint 3: Only append 'HttpOnly' and 'Secure' strings (no value) when their flags are True.
Implement HTTP content negotiation by parsing the Accept header and selecting the best matching type.Solution
def negotiate_content_type(accept_header, available_types):
"""Select the best Content-Type based on the Accept header and what's available.
Parse quality factors: 'application/json;q=0.9, text/html;q=1.0, */*;q=0.1'
Return the available type with the highest matching quality factor.
Return None if no match (should result in 406 Not Acceptable).
"""
pass
available = ['application/json', 'text/html', 'text/plain']
tests = [
('text/html, application/json;q=0.9, */*;q=0.5', 'text/html'),
('application/json', 'application/json'),
('text/xml, application/xml', None),
('*/*', 'application/json'),
('application/json;q=0.5, text/html;q=0.9', 'text/html'),
]
for accept, expected in tests:
result = negotiate_content_type(accept, available)
status = 'PASS' if result == expected else 'FAIL'
print(f"{status}: '{accept[:40]}' -> {result}")
Expected Output
PASS: 'text/html, application/json;q=0.9, */*;q=0.5' -> text/html
PASS: 'application/json' -> application/json
PASS: 'text/xml, application/xml' -> None
PASS: '*/*' -> application/json
PASS: 'application/json;q=0.5, text/html;q=0.9' -> text/htmlHints
Hint 1: Split the Accept header on ',' to get each media type entry.
Hint 2: Each entry may have ';q=0.x' — default quality is 1.0 if absent.
Hint 3: For '*/*', match any available type; for 'type/*', match any subtype of that type.
Implement conditional gzip compression based on the client's Accept-Encoding header.Solution
import gzip
import json
def compress_response(body_str, request_headers):
"""Compress the response body if the client accepts gzip.
Check Accept-Encoding header for 'gzip'.
If supported: return (compressed_bytes, {'Content-Encoding': 'gzip', 'Vary': 'Accept-Encoding'})
If not supported: return (body_str.encode(), {})
"""
pass
data = {'users': [{'id': i, 'name': f'User {i}'} for i in range(100)]}
body = json.dumps(data)
# Client supports gzip
compressed, headers = compress_response(body, {'Accept-Encoding': 'gzip, deflate, br'})
print(f"gzip headers: {headers}")
print(f"compressed smaller: {len(compressed) < len(body.encode())}")
decompressed = json.loads(gzip.decompress(compressed))
print(f"round-trip ok: {len(decompressed['users']) == 100}")
# Client does NOT support gzip
raw, headers2 = compress_response(body, {'Accept-Encoding': 'identity'})
print(f"no-gzip headers: {headers2}")
print(f"raw is original: {raw == body.encode()}")
Expected Output
gzip headers: {'Content-Encoding': 'gzip', 'Vary': 'Accept-Encoding'}
compressed smaller: True
round-trip ok: True
no-gzip headers: {}
raw is original: TrueHints
Hint 1: Check if 'gzip' is in the Accept-Encoding header value.
Hint 2: Use gzip.compress(body_str.encode()) to compress; it returns bytes.
Hint 3: Always add 'Vary: Accept-Encoding' when compression depends on that header.
Implement ETag-based conditional GET that returns 304 Not Modified when the resource hasn't changed.Solution
import hashlib
import json
def handle_conditional_get(resource_data, request_headers):
"""Handle a conditional GET request using ETags.
1. Compute ETag as MD5 hex of the serialized resource.
2. If request has 'If-None-Match' header matching the ETag, return 304.
3. Otherwise return 200 with body and ETag header.
Return: (status_code, response_headers, body_or_none)
body_or_none is None for 304 responses.
"""
pass
resource = {'id': 1, 'name': 'Alice', 'role': 'admin'}
# First request — no conditional headers
status, headers, body = handle_conditional_get(resource, {})
etag = headers.get('ETag')
print(f"first: status={status}, etag={etag is not None}, body={body is not None}")
# Second request — correct ETag (not modified)
status2, headers2, body2 = handle_conditional_get(resource, {'If-None-Match': etag})
print(f"cached: status={status2}, body={body2}")
# Request with stale ETag
status3, headers3, body3 = handle_conditional_get(resource, {'If-None-Match': '"stale-etag"'})
print(f"stale: status={status3}, body={body3 is not None}")
Expected Output
first: status=200, etag=True, body=True
cached: status=304, body=None
stale: status=200, body=TrueHints
Hint 1: Compute ETag as '"' + hashlib.md5(json.dumps(resource, sort_keys=True).encode()).hexdigest() + '"'.
Hint 2: Compare the If-None-Match header value to the computed ETag.
Hint 3: 304 responses have no body — return None for the body.
Build a request processing pipeline that runs phases in order and short-circuits on the first non-None response.Solution
class RequestPipeline:
"""Simulates a web framework request processing pipeline.
Phases (in order):
1. parse_headers
2. authenticate
3. authorize
4. validate_body
5. handle (the actual route handler)
Each phase can short-circuit by returning a response dict.
If it returns None, processing continues to the next phase.
"""
def __init__(self):
self.phases = []
self.executed = []
def add_phase(self, name, fn):
self.phases.append((name, fn))
def process(self, request):
for name, fn in self.phases:
self.executed.append(name)
result = fn(request)
if result is not None:
return {'phase': name, 'response': result}
return {'phase': 'completed', 'response': None}
pipeline = RequestPipeline()
pipeline.add_phase('parse_headers', lambda r: None)
pipeline.add_phase('authenticate', lambda r: {'error': 'Unauthorized'} if not r.get('token') else None)
pipeline.add_phase('authorize', lambda r: {'error': 'Forbidden'} if r.get('role') != 'admin' else None)
pipeline.add_phase('validate_body', lambda r: None)
pipeline.add_phase('handle', lambda r: {'data': 'success'})
# Unauthenticated request
pipeline.executed.clear()
result = pipeline.process({'path': '/admin'})
print(f"no-auth: stopped_at={result['phase']}, phases_run={pipeline.executed}")
# Authenticated but wrong role
pipeline.executed.clear()
result = pipeline.process({'path': '/admin', 'token': 'abc', 'role': 'user'})
print(f"no-admin: stopped_at={result['phase']}, phases_run={pipeline.executed}")
# Fully authorized
pipeline.executed.clear()
result = pipeline.process({'path': '/admin', 'token': 'abc', 'role': 'admin'})
print(f"success: stopped_at={result['phase']}, phases_run={pipeline.executed}")
Expected Output
no-auth: stopped_at=authenticate, phases_run=['parse_headers', 'authenticate']
no-admin: stopped_at=authorize, phases_run=['parse_headers', 'authenticate', 'authorize']
success: stopped_at=completed, phases_run=['parse_headers', 'authenticate', 'authorize', 'validate_body', 'handle']Hints
Hint 1: The pipeline iterates through phases in order; if a phase returns non-None, it stops.
Hint 2: Record the phase name in self.executed before calling the function.
Hint 3: Return the phase name and response when short-circuiting.
Implement a session manager that creates and validates HMAC-signed tokens with expiry.Solution
import hmac
import hashlib
import time
import json
import base64
class SessionManager:
"""Manages server-side sessions with signed tokens.
Token format: base64(json_payload).base64(signature)
Signature: HMAC-SHA256 of the payload using secret_key.
"""
def __init__(self, secret_key: str, ttl_seconds: int = 3600):
self.secret_key = secret_key.encode()
self.ttl = ttl_seconds
def create_token(self, user_id: int, role: str) -> str:
"""Create a signed session token."""
pass
def validate_token(self, token: str):
"""Validate a token. Return user dict or raise ValueError.
Check: signature valid, not expired.
"""
pass
def _sign(self, payload_b64: str) -> str:
sig = hmac.new(self.secret_key, payload_b64.encode(), hashlib.sha256).hexdigest()
return sig
mgr = SessionManager('supersecret', ttl_seconds=3600)
token = mgr.create_token(42, 'admin')
print(f"token created: {len(token) > 10}")
user = mgr.validate_token(token)
print(f"valid: user_id={user['user_id']}, role={user['role']}")
# Tampered token
parts = token.split('.')
tampered = parts[0] + '.' + 'invalidsig'
try:
mgr.validate_token(tampered)
print("tamper: allowed (bad)")
except ValueError as e:
print(f"tamper: blocked ({e})")
# Expired token
old_mgr = SessionManager('supersecret', ttl_seconds=0)
old_token = old_mgr.create_token(1, 'user')
time.sleep(0.01)
try:
old_mgr.validate_token(old_token)
print("expired: allowed (bad)")
except ValueError as e:
print(f"expired: blocked ({e})")
Expected Output
token created: True
valid: user_id=42, role=admin
tamper: blocked (Invalid signature)
expired: blocked (Token expired)Hints
Hint 1: Encode the payload as JSON, then base64url-encode it for the first part of the token.
Hint 2: Use hmac.new(key, payload.encode(), hashlib.sha256).hexdigest() for the signature.
Hint 3: On validation: split on '.', verify signature, then check 'exp' field against time.time().
Compose three WSGI middleware layers (timing, request ID, error catching) around an inner app.Solution
import time
# The inner WSGI app
def inner_app(environ, start_response):
path = environ.get('PATH_INFO', '/')
if path == '/error':
raise RuntimeError("Simulated server error")
start_response('200 OK', [('Content-Type', 'application/json')])
return [b'{"status":"ok"}']
# TODO: Implement these WSGI middleware wrappers:
def timing_middleware(app):
"""Wrap app to add X-Response-Time header (milliseconds as string)."""
pass
def request_id_middleware(app):
"""Add X-Request-ID header to the response (use a counter or uuid)."""
pass
def error_catching_middleware(app):
"""Catch exceptions, return 500 JSON: {"error": "Internal Server Error"}."""
pass
# Stack: error_catching -> request_id -> timing -> inner_app
def call_wsgi(app, path='/'):
captured = {'status': None, 'headers': {}}
def start_response(status, headers):
captured['status'] = status
captured['headers'] = dict(headers)
environ = {'PATH_INFO': path, 'REQUEST_METHOD': 'GET'}
body = b''.join(app(environ, start_response))
return captured['status'], captured['headers'], body.decode()
stacked = error_catching_middleware(request_id_middleware(timing_middleware(inner_app)))
status, headers, body = call_wsgi(stacked, '/')
print(f"status: {status}")
print(f"has timing: {'X-Response-Time' in headers}")
print(f"has request-id: {'X-Request-ID' in headers}")
status2, headers2, body2 = call_wsgi(stacked, '/error')
print(f"error status: {status2}")
print(f"error body: {body2}")
Expected Output
status: 200 OK
has timing: True
has request-id: True
error status: 500 Internal Server Error
error body: {"error": "Internal Server Error"}Hints
Hint 1: Each middleware is a function that takes an app and returns a new WSGI callable.
Hint 2: To add headers: intercept start_response, add to the headers list, then forward.
Hint 3: For error catching: wrap the app call in try/except and call start_response manually on error.
Implement an HTTP keep-alive connection pool that tracks created vs reused connections.Solution
import time
import threading
from collections import defaultdict
class ConnectionPool:
"""Simulates HTTP/1.1 keep-alive connection pooling.
Connections to the same host are reused instead of creating new ones.
Max connections per host: max_per_host.
Connection idle timeout: idle_timeout_s.
"""
def __init__(self, max_per_host=5, idle_timeout_s=30):
self.max_per_host = max_per_host
self.idle_timeout_s = idle_timeout_s
self._pool = defaultdict(list) # host -> list of (conn_id, last_used)
self._conn_counter = 0
self._created = 0
self._reused = 0
def get_connection(self, host: str) -> int:
"""Return an existing connection ID if available, else create a new one."""
pass
def release_connection(self, host: str, conn_id: int):
"""Return connection to pool for reuse."""
pass
def evict_idle(self):
"""Remove connections idle longer than idle_timeout_s."""
pass
pool = ConnectionPool(max_per_host=3)
# Simulate 5 requests to same host
conns = []
for _ in range(3):
c = pool.get_connection('api.example.com')
conns.append(c)
# Release all
for c in conns:
pool.release_connection('api.example.com', c)
# Next 3 requests should reuse connections
for _ in range(3):
c = pool.get_connection('api.example.com')
pool.release_connection('api.example.com', c)
print(f"created: {pool._created}")
print(f"reused: {pool._reused}")
print(f"reuse_rate: {pool._reused / (pool._created + pool._reused):.0%}")
Expected Output
created: 3
reused: 3
reuse_rate: 50%Hints
Hint 1: When getting a connection: if the pool for the host has idle connections, pop one and record as reused.
Hint 2: If no idle connections available, create a new one and increment _created.
Hint 3: release_connection appends the conn_id back to the pool with the current timestamp.
