Python Middleware Practice Problems & Exercises
Practice: Middleware
← Back to lesson
Implement a logging middleware that records request and response events around a handler.
Solution
log = []


def logging_middleware(handler):
    """Wrap a handler function to log calls.

    Before calling handler: append {'event': 'request', 'path': request['path']} to log.
    After calling handler: append {'event': 'response', 'status': response['status']} to log.

    Args:
        handler: Callable that takes a request dict and returns a response dict.

    Returns:
        A wrapper that records both events in the module-level ``log`` list
        and returns the handler's response unchanged.
    """
    def wrapper(request):
        log.append({'event': 'request', 'path': request['path']})
        response = handler(request)
        log.append({'event': 'response', 'status': response['status']})
        return response

    return wrapper
def my_handler(request):
    """Return a fixed 200 'Hello' response; the request is not inspected."""
    return {'status': 200, 'body': 'Hello'}
# Demo: wrap my_handler, send one GET request through the wrapper, then print
# the returned response and the entries accumulated in `log`.
wrapped = logging_middleware(my_handler)
resp = wrapped({'path': '/hello', 'method': 'GET'})
print(f"response: {resp}")
print(f"log entries: {log}")
Expected Output
response: {'status': 200, 'body': 'Hello'}
log entries: [{'event': 'request', 'path': '/hello'}, {'event': 'response', 'status': 200}]
Hints
Hint 1: logging_middleware returns a new function (the wrapper) that calls the original handler.
Hint 2: Append the request log entry before calling the handler.
Hint 3: Append the response log entry after calling the handler, then return the response.
Write authentication middleware that validates Bearer tokens and attaches the user role to the request.
Solution
VALID_TOKENS = {'token-admin': 'admin', 'token-user': 'user'}


def auth_middleware(handler):
    """Middleware that validates Bearer tokens.

    If Authorization header is missing or token invalid: return {'status': 401, 'body': 'Unauthorized'}
    If valid: attach 'user_role' to the request and call handler.

    Args:
        handler: Callable that takes a request dict and returns a response dict.

    Returns:
        A wrapper that either short-circuits with a 401 response or forwards
        the (mutated) request to ``handler`` and returns its response.
    """
    def wrapper(request):
        authorization = request.get('headers', {}).get('Authorization', '')
        # partition never raises, so a missing header or a bare "Bearer"
        # simply yields an empty token that is not in VALID_TOKENS.
        scheme, _, token = authorization.partition(' ')
        role = VALID_TOKENS.get(token) if scheme == 'Bearer' else None
        if role is None:
            # Build a fresh dict per call so callers cannot mutate a shared one.
            return {'status': 401, 'body': 'Unauthorized'}
        request['user_role'] = role
        return handler(request)

    return wrapper
def protected_handler(request):
    """Greet the caller using the 'user_role' value found on the request."""
    return {'status': 200, 'body': f"Hello, {request['user_role']}"}
# Demo: call the wrapped handler three times and print each response.
wrapped = auth_middleware(protected_handler)
# Token that is present in VALID_TOKENS
r1 = wrapped({'path': '/secure', 'headers': {'Authorization': 'Bearer token-admin'}})
print(r1)
# Token that is not present in VALID_TOKENS
r2 = wrapped({'path': '/secure', 'headers': {'Authorization': 'Bearer bad'}})
print(r2)
# No Authorization header at all
r3 = wrapped({'path': '/secure', 'headers': {}})
print(r3)
Expected Output
{'status': 200, 'body': 'Hello, admin'}
{'status': 401, 'body': 'Unauthorized'}
{'status': 401, 'body': 'Unauthorized'}
Hints
Hint 1: Extract the token by checking if Authorization starts with 'Bearer ' and splitting.
Hint 2: Look up the token in VALID_TOKENS to get the role.
Hint 3: Mutate the request dict to add 'user_role' before calling the handler.
Implement request ID injection middleware that preserves existing IDs or generates new UUIDs.
Solution
import uuid
request_id_log = []
def request_id_middleware(handler):
    """Inject a unique request ID into each request.

    If request already has 'X-Request-ID' header, use that value.
    Otherwise generate a UUID4 string.
    Add the ID to the request dict as 'request_id'.
    Add 'X-Request-ID' to the response headers.
    response is a dict with 'status', 'body', and optional 'headers' dict.

    Args:
        handler: Callable that takes a request dict and returns a response dict.

    Returns:
        A wrapper that tags the request, calls ``handler``, and returns the
        handler's response with the 'X-Request-ID' header set.
    """
    def wrapper(request):
        incoming = request.get('headers', {}).get('X-Request-ID')
        request_id = incoming if incoming else str(uuid.uuid4())
        request['request_id'] = request_id

        response = handler(request)

        # 'headers' is optional on the response, so create it when absent.
        response_headers = response.get('headers')
        if response_headers is None:
            response_headers = response['headers'] = {}
        response_headers['X-Request-ID'] = request_id
        return response

    return wrapper
def echo_handler(request):
    """Echo the request's 'request_id' (None if absent) back in the body."""
    return {'status': 200, 'body': f"id={request.get('request_id')}", 'headers': {}}
# Demo: send one request without an ID and one with a caller-supplied ID,
# then inspect the 'X-Request-ID' header on each response.
wrapped = request_id_middleware(echo_handler)
# Without pre-existing ID
r1 = wrapped({'path': '/test', 'headers': {}})
print(f"has_id: {'X-Request-ID' in r1['headers']}")
# With pre-existing ID
r2 = wrapped({'path': '/test', 'headers': {'X-Request-ID': 'my-trace-123'}})
print(f"preserved: {r2['headers']['X-Request-ID']}")
Expected Output
has_id: True
preserved: my-trace-123
Hints
Hint 1: Check if 'X-Request-ID' exists in request['headers']; use it if present.
Hint 2: Otherwise use str(uuid.uuid4()) to generate a new ID.
Hint 3: After getting the response, set response['headers']['X-Request-ID'] = request_id.
Build compression middleware that gzip-encodes response bodies when the client requests it.
Solution
import gzip
import json
def compression_middleware(handler):
    """Compress response body with gzip if client accepts it.

    Check request['headers']['Accept-Encoding'] for 'gzip'.
    If accepted: compress body, add Content-Encoding: gzip to response headers.
    If not: pass through unchanged.
    Works on string or bytes body.

    Args:
        handler: Callable that takes a request dict and returns a response dict.

    Returns:
        A wrapper returning the handler's response, with 'body' replaced by
        gzip-compressed bytes when the client listed 'gzip'.
    """
    def accepts_gzip(request):
        accept_encoding = request.get('headers', {}).get('Accept-Encoding', '')
        # Compare whole codings so e.g. "notgzip" does not match. Parameters
        # after ';' (q-values) are stripped and otherwise ignored.
        codings = {part.split(';')[0].strip().lower() for part in accept_encoding.split(',')}
        return 'gzip' in codings

    def wrapper(request):
        response = handler(request)
        if not accepts_gzip(request):
            return response

        body = response.get('body')
        if isinstance(body, str):
            raw = body.encode('utf-8')
        elif isinstance(body, (bytes, bytearray)):
            raw = bytes(body)
        else:
            # Nothing compressible (e.g. missing body): leave response alone.
            return response

        response['body'] = gzip.compress(raw)
        response_headers = response.get('headers')
        if response_headers is None:
            response_headers = response['headers'] = {}
        response_headers['Content-Encoding'] = 'gzip'
        return response

    return wrapper
def data_handler(request):
    """Return a 200 response whose body is a JSON array of 50 small items."""
    data = [{'id': i, 'name': f'item_{i}'} for i in range(50)]
    return {'status': 200, 'body': json.dumps(data), 'headers': {}}
# Demo: request the same data once with gzip accepted and once with only
# 'identity', then report the body type and Content-Encoding of each response.
wrapped = compression_middleware(data_handler)
r1 = wrapped({'path': '/data', 'headers': {'Accept-Encoding': 'gzip, deflate'}})
body_is_bytes = isinstance(r1['body'], bytes)
# Treated as compressed only if the body is bytes and shorter than 500 bytes.
compressed_ok = body_is_bytes and len(r1['body']) < 500
print(f"compressed: {compressed_ok}, Content-Encoding: {r1['headers'].get('Content-Encoding')}")
r2 = wrapped({'path': '/data', 'headers': {'Accept-Encoding': 'identity'}})
print(f"uncompressed type: {type(r2['body']).__name__}")
Expected Output
compressed: True, Content-Encoding: gzip
uncompressed type: str
Hints
Hint 1: Check 'gzip' in request['headers'].get('Accept-Encoding', '').
Hint 2: Encode the body to bytes if it's a string, then call gzip.compress().
Hint 3: Set response['headers']['Content-Encoding'] = 'gzip' on the response.
Implement sliding window rate limiting middleware that tracks requests per client IP.
Solution
import time
from collections import defaultdict, deque
def rate_limit_middleware(handler, max_requests=3, window_seconds=1):
    """Sliding window rate limiter per client IP.

    Track request timestamps per client_ip.
    If client exceeds max_requests in the last window_seconds: return 429.
    client_ip comes from request['client_ip'].

    Args:
        handler: Callable that takes a request dict and returns a response dict.
        max_requests: Number of requests allowed per client inside one window.
        window_seconds: Length of the sliding window, in seconds.

    Returns:
        A wrapper that forwards allowed requests to ``handler`` and answers
        rejected ones with {'status': 429, 'body': 'Too Many Requests'}
        without calling ``handler``.
    """
    # One deque of accepted-request timestamps per client, oldest first.
    history = defaultdict(deque)

    def wrapper(request):
        # A monotonic clock keeps the window immune to wall-clock adjustments.
        now = time.monotonic()
        stamps = history[request['client_ip']]

        # Drop timestamps that have slid out of the window.
        while stamps and now - stamps[0] >= window_seconds:
            stamps.popleft()

        if len(stamps) >= max_requests:
            # Rejected requests are not recorded, so they do not extend the block.
            return {'status': 429, 'body': 'Too Many Requests'}

        stamps.append(now)
        return handler(request)

    return wrapper
def api_handler(request):
    """Return a fixed 200 'OK' response; the request is not inspected."""
    return {'status': 200, 'body': 'OK'}
# Demo: allow at most 3 requests per 10-second window for each client IP,
# then fire 5 requests from one IP and collect the status codes.
limited = rate_limit_middleware(api_handler, max_requests=3, window_seconds=10)
results = []
for _ in range(5):
    r = limited({'path': '/api', 'client_ip': '192.168.1.1'})
    results.append(r['status'])
print(f"statuses: {results}")
print(f"allowed: {results.count(200)}, blocked: {results.count(429)}")
# Different IP should not be limited
r = limited({'path': '/api', 'client_ip': '10.0.0.1'})
print(f"different IP: {r['status']}")
Expected Output
statuses: [200, 200, 200, 429, 429]
allowed: 3, blocked: 2
different IP: 200
Hints
Hint 1: Use a defaultdict of deques to store request timestamps per IP.
Hint 2: Before each request, remove timestamps older than window_seconds from the deque.
Hint 3: If len(deque) >= max_requests, return 429; otherwise append current time and call handler.
Implement CORS middleware that handles both regular requests and OPTIONS preflight requests.
Solution
def cors_middleware(handler, allowed_origins=None, allow_credentials=False):
    """CORS middleware for a dict-based request/response system.

    allowed_origins: list of allowed origins, or ['*'] for all.
    Preflight: OPTIONS request with Access-Control-Request-Method header.
    -> Return 204 with CORS headers (do NOT call inner handler).
    Regular request: add Access-Control-Allow-Origin if origin is allowed.
    If origin not allowed: return response without CORS headers.

    Args:
        handler: Callable that takes a request dict and returns a response dict.
        allowed_origins: Allowed origins; None means ['*'].
        allow_credentials: When true, allowed responses also carry
            'Access-Control-Allow-Credentials: true'.

    Returns:
        A wrapper that answers preflights itself and decorates the handler's
        response for every other request.
    """
    if allowed_origins is None:
        allowed_origins = ['*']

    def build_cors_headers(origin):
        """Return the CORS headers for ``origin``, or {} when it is not allowed."""
        if origin is None:
            return {}
        if origin in allowed_origins:
            allow_origin = origin
        elif '*' in allowed_origins:
            # Browsers reject '*' on credentialed requests, so echo the origin then.
            allow_origin = origin if allow_credentials else '*'
        else:
            return {}
        cors_headers = {'Access-Control-Allow-Origin': allow_origin}
        if allow_credentials:
            cors_headers['Access-Control-Allow-Credentials'] = 'true'
        return cors_headers

    def wrapper(request):
        request_headers = request.get('headers', {})
        cors_headers = build_cors_headers(request_headers.get('Origin'))
        requested_method = request_headers.get('Access-Control-Request-Method')

        if request.get('method') == 'OPTIONS' and requested_method is not None:
            # Preflight: answer directly; the inner handler is never called.
            if cors_headers:
                cors_headers['Access-Control-Allow-Methods'] = requested_method
                requested_headers = request_headers.get('Access-Control-Request-Headers')
                if requested_headers:
                    cors_headers['Access-Control-Allow-Headers'] = requested_headers
            return {'status': 204, 'body': '', 'headers': cors_headers}

        response = handler(request)
        if cors_headers:
            response_headers = response.get('headers')
            if response_headers is None:
                response_headers = response['headers'] = {}
            response_headers.update(cors_headers)
        return response

    return wrapper
def api_handler(request):
    """Return a fixed 200 'data' response with an empty headers dict."""
    return {'status': 200, 'body': 'data', 'headers': {}}
# Demo: allow a single origin, then send a regular request from it, a
# preflight from it, and a regular request from a different origin.
cors = cors_middleware(api_handler, allowed_origins=['https://app.example.com'])
# Allowed origin
r1 = cors({'method': 'GET', 'path': '/api',
'headers': {'Origin': 'https://app.example.com'}})
print(f"allowed: {r1['headers'].get('Access-Control-Allow-Origin')}")
# Preflight
r2 = cors({'method': 'OPTIONS', 'path': '/api',
'headers': {'Origin': 'https://app.example.com',
'Access-Control-Request-Method': 'POST'}})
print(f"preflight: status={r2['status']}, ACAO={r2['headers'].get('Access-Control-Allow-Origin')}")
# Blocked origin
r3 = cors({'method': 'GET', 'path': '/api',
'headers': {'Origin': 'https://evil.com'}})
print(f"blocked: ACAO={r3['headers'].get('Access-Control-Allow-Origin')}")
Expected Output
allowed: https://app.example.com
preflight: status=204, ACAO=https://app.example.com
blocked: ACAO=None
Hints
Hint 1: Check if request['method'] == 'OPTIONS' and 'Access-Control-Request-Method' is in headers for preflight.
Hint 2: For preflight, return status 204 with CORS headers without calling the inner handler.
Hint 3: Check if '*' is in allowed_origins OR the specific origin is in the list.
Write caching middleware that stores GET responses in memory with TTL and tracks cache hits/misses.
Solution
import time
def caching_middleware(handler, ttl_seconds=60):
    """Cache GET request responses in memory.

    Cache key: (method, path, query_string).
    Only cache GET requests with 200 status.
    Expired cache entries should be served fresh.
    Track hits and misses.

    Args:
        handler: Callable that takes a request dict and returns a response dict.
        ttl_seconds: How long a cached response stays valid, in seconds.

    Returns:
        A wrapper exposing integer ``hits`` and ``misses`` attributes. Every
        request answered by ``handler`` (including non-GET requests) counts
        as a miss; every request answered from the cache counts as a hit.
    """
    import copy

    # key -> (response snapshot, expiry on the monotonic clock)
    store = {}

    def wrapper(request):
        method = request.get('method')
        key = (method, request.get('path'), request.get('query', ''))
        now = time.monotonic()

        if method == 'GET':
            entry = store.get(key)
            if entry is not None:
                cached_response, expires_at = entry
                if now < expires_at:
                    wrapper.hits += 1
                    # Hand out a copy so callers cannot corrupt the cache.
                    return copy.deepcopy(cached_response)
                del store[key]

        wrapper.misses += 1
        response = handler(request)
        if method == 'GET' and response.get('status') == 200:
            store[key] = (copy.deepcopy(response), now + ttl_seconds)
        return response

    wrapper.hits = 0
    wrapper.misses = 0
    return wrapper
call_count = 0


def slow_handler(request):
    """Count invocations in the module-level ``call_count`` and embed it in the body."""
    global call_count
    call_count += 1
    return {'status': 200, 'body': f'data-{call_count}', 'headers': {}}
# Demo: two identical GETs followed by a POST to the same path, then report
# the bodies, how often the handler actually ran, and the hit/miss counters.
cache = caching_middleware(slow_handler, ttl_seconds=60)
r1 = cache({'method': 'GET', 'path': '/data', 'query': ''})
r2 = cache({'method': 'GET', 'path': '/data', 'query': ''}) # cache hit
r3 = cache({'method': 'POST', 'path': '/data', 'query': ''}) # not cached
print(f"r1 body: {r1['body']}")
print(f"r2 body: {r2['body']} (same as r1 = {r1['body'] == r2['body']})")
print(f"handler called: {call_count} times (POST not cached)")
print(f"hits: {cache.hits}, misses: {cache.misses}")
Expected Output
r1 body: data-1
r2 body: data-1 (same as r1 = True)
handler called: 2 times (POST not cached)
hits: 1, misses: 2
Hints
Hint 1: Store cached responses as (response, expiry_time) tuples.
Hint 2: Only cache when method == 'GET' and status == 200.
Hint 3: Attach hits and misses as attributes on the wrapper function object.
Build timing middleware that records response times and exposes percentile statistics.
Solution
import time
import statistics
def timing_middleware(handler):
    """Record response times for every request.

    Attach a .stats() method to the wrapper that returns:
    {'count': N, 'mean_ms': float, 'p50_ms': float, 'p95_ms': float, 'max_ms': float}
    All times rounded to 2 decimal places.

    Args:
        handler: Callable that takes a request dict and returns a response dict.

    Returns:
        A wrapper that returns the handler's response unchanged and exposes
        ``stats()``. Before any request has been timed, ``stats()`` reports a
        count of 0 and 0.0 for every time field.
    """
    durations_ms = []

    def wrapper(request):
        started = time.perf_counter()
        try:
            return handler(request)
        finally:
            # Record the duration even when the handler raises.
            durations_ms.append((time.perf_counter() - started) * 1000.0)

    def stats():
        if not durations_ms:
            return {'count': 0, 'mean_ms': 0.0, 'p50_ms': 0.0, 'p95_ms': 0.0, 'max_ms': 0.0}
        ordered = sorted(durations_ms)
        count = len(ordered)
        # Nearest-rank percentile: rank = ceil(0.95 * count), in integer maths.
        p95_index = (95 * count + 99) // 100 - 1
        return {
            'count': count,
            'mean_ms': round(statistics.mean(ordered), 2),
            'p50_ms': round(statistics.median(ordered), 2),
            'p95_ms': round(ordered[p95_index], 2),
            'max_ms': round(ordered[-1], 2),
        }

    wrapper.stats = stats
    return wrapper
import time as _time


def variable_handler(request):
    """Sleep for request['delay'] seconds (0 if absent), then return a 200 response."""
    _time.sleep(request.get('delay', 0))
    return {'status': 200, 'body': 'ok'}
# Demo: time ten requests with different artificial delays and sanity-check
# the ordering of the reported statistics.
timed = timing_middleware(variable_handler)
# Simulate 10 requests with varying delays
delays = [0.01, 0.02, 0.01, 0.05, 0.01, 0.02, 0.01, 0.08, 0.01, 0.03]
for d in delays:
    timed({'method': 'GET', 'path': '/test', 'delay': d})
stats = timed.stats()
print(f"count: {stats['count']}")
print(f"mean > 0: {stats['mean_ms'] > 0}")
print(f"p95 >= p50: {stats['p95_ms'] >= stats['p50_ms']}")
print(f"max >= p95: {stats['max_ms'] >= stats['p95_ms']}")
Expected Output
count: 10
mean > 0: True
p95 >= p50: True
max >= p95: True
Hints
Hint 1: Record elapsed time using time.time() before and after calling the handler.
Hint 2: statistics.median() gives p50; use sorted list indexing for p95 (index at 95th percentile).
Hint 3: Attach .stats as a method by setting wrapper.stats = stats_fn after defining wrapper.
Implement an ASGI timing middleware class that injects an x-response-time header into HTTP responses.
Solution
import asyncio
# Minimal ASGI app
async def inner_asgi_app(scope, receive, send):
    """Answer HTTP scopes with a 200 JSON response; send nothing for other scopes.

    Args:
        scope: ASGI connection scope dict; only scope['type'] is read.
        receive: ASGI receive callable (unused).
        send: Awaitable callable that accepts ASGI event dicts.
    """
    if scope['type'] == 'http':
        await send({'type': 'http.response.start', 'status': 200,
                    'headers': [[b'content-type', b'application/json']]})
        await send({'type': 'http.response.body', 'body': b'{"ok": true}'})
class TimingMiddleware:
    """ASGI middleware that adds an 'x-response-time' header to HTTP responses.

    For http scopes the start time is recorded, ``send`` is wrapped, and the
    elapsed time up to the 'http.response.start' event is appended to that
    event's headers as bytes (for example b'12.34ms').
    Non-http scopes are passed through unchanged.
    """

    def __init__(self, app):
        """Store the wrapped ASGI application."""
        self.app = app

    async def __call__(self, scope, receive, send):
        """Run the wrapped app, decorating the response start event for http scopes."""
        # Imported here so the class does not depend on a module-level import.
        import time

        if scope['type'] != 'http':
            await self.app(scope, receive, send)
            return

        started = time.perf_counter()

        async def timed_send(event):
            if event['type'] == 'http.response.start':
                elapsed_ms = (time.perf_counter() - started) * 1000.0
                # Copy rather than mutate: the inner app may reuse its event.
                headers = list(event.get('headers', []))
                headers.append([b'x-response-time', f'{elapsed_ms:.2f}ms'.encode('ascii')])
                event = {**event, 'headers': headers}
            await send(event)

        await self.app(scope, receive, timed_send)
async def run_test():
    """Drive TimingMiddleware with a fake ``send`` and print what the start event carries."""
    app = TimingMiddleware(inner_asgi_app)
    headers_sent = []

    async def mock_send(event):
        # Collect every event the middleware forwards.
        headers_sent.append(event)

    scope = {'type': 'http', 'method': 'GET', 'path': '/'}
    await app(scope, None, mock_send)
    start_event = headers_sent[0]
    header_names = [h[0] for h in start_event.get('headers', [])]
    print(f"status: {start_event['status']}")
    print(f"has timing header: {b'x-response-time' in header_names}")


asyncio.run(run_test())
Expected Output
status: 200
has timing header: True
Hints
Hint 1: ASGI middleware is a class with __init__(self, app) and async __call__(self, scope, receive, send).
Hint 2: Wrap the send callable: intercept 'http.response.start' events to inject headers.
Hint 3: Add the header as a two-item list of bytes, matching the existing header entries: [b'x-response-time', b'12.34ms'].
Implement compose_middleware that stacks multiple middlewares with correct onion-layer execution order.
Solution
execution_order = []


def make_middleware(name, pre_msg=None, post_msg=None):
    """Factory that creates middleware recording execution order.

    Args:
        name: Label used in the recorded entries.
        pre_msg: If truthy, append "<name>:pre" to ``execution_order``
            before calling the inner handler.
        post_msg: If truthy, append "<name>:post" to ``execution_order``
            after calling the inner handler.

    Returns:
        A middleware: a function that takes a handler and returns a wrapper
        which returns the handler's response unchanged.
    """
    def middleware(handler):
        def wrapper(request):
            if pre_msg:
                execution_order.append(f"{name}:pre")
            response = handler(request)
            if post_msg:
                execution_order.append(f"{name}:post")
            return response
        return wrapper
    return middleware
def compose_middleware(*middlewares):
    """Compose multiple middlewares into one.

    Middlewares are applied in order: first in list = outermost layer.
    compose_middleware(A, B, C)(handler) = A(B(C(handler)))

    Args:
        *middlewares: Functions that each take a handler and return a handler.

    Returns:
        A single middleware. With no middlewares it returns the handler itself.
    """
    def composed(handler):
        wrapped = handler
        # Wrap from the inside out so the first middleware ends up outermost.
        for middleware in reversed(middlewares):
            wrapped = middleware(wrapped)
        return wrapped

    return composed
def final_handler(request):
    """Record 'handler' in ``execution_order`` and return a fixed 200 response."""
    execution_order.append('handler')
    return {'status': 200, 'body': 'done'}
# Demo: build three recording middlewares (auth records only its "pre" step),
# compose them with logging first, run one request, and print the order.
auth_mw = make_middleware('auth', pre_msg=True, post_msg=False)
logging_mw = make_middleware('logging', pre_msg=True, post_msg=True)
timing_mw = make_middleware('timing', pre_msg=True, post_msg=True)
composed = compose_middleware(logging_mw, auth_mw, timing_mw)(final_handler)
composed({'method': 'GET', 'path': '/test'})
print(f"execution order: {execution_order}")
Expected Output
execution order: ['logging:pre', 'auth:pre', 'timing:pre', 'handler', 'timing:post', 'logging:post']
Hints
Hint 1: Use functools.reduce to fold middleware over the handler from right to left.
Hint 2: A(B(C(handler))): C wraps handler first, then B wraps that, then A wraps that.
Hint 3: reduce(lambda h, mw: mw(h), reversed(middlewares), handler) applies them inside-out.
Implement the circuit breaker pattern as middleware with CLOSED, OPEN, and HALF_OPEN states.
Solution
import time
class CircuitBreakerMiddleware:
    """Circuit breaker pattern middleware.

    States: CLOSED (normal), OPEN (failing fast), HALF_OPEN (testing recovery).
    CLOSED -> OPEN: after failure_threshold consecutive failures.
    OPEN -> HALF_OPEN: after reset_timeout_s seconds.
    HALF_OPEN -> CLOSED: if next request succeeds.
    HALF_OPEN -> OPEN: if next request fails.
    A failed response: status >= 500.
    Return {'status': 503, 'body': 'Circuit Open'} when circuit is OPEN.
    """

    CLOSED = 'closed'
    OPEN = 'open'
    HALF_OPEN = 'half_open'

    def __init__(self, handler, failure_threshold=3, reset_timeout_s=5):
        """Start in the CLOSED state with no recorded failures.

        Args:
            handler: Callable that takes a request dict and returns a response dict.
            failure_threshold: Consecutive failures that open the circuit.
            reset_timeout_s: Seconds the circuit stays OPEN before a probe is allowed.
        """
        self.handler = handler
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self.state = self.CLOSED
        self.failure_count = 0
        # Monotonic timestamp of the most recent transition to OPEN.
        self.opened_at = None

    def __call__(self, request):
        """Forward the request unless the circuit is OPEN, and update the state.

        Returns:
            The handler's response, or {'status': 503, 'body': 'Circuit Open'}
            when the circuit is OPEN and the reset timeout has not elapsed.
        """
        if self.state == self.OPEN:
            if time.monotonic() - self.opened_at < self.reset_timeout_s:
                return {'status': 503, 'body': 'Circuit Open'}
            # Timeout elapsed: let this request through as the recovery probe.
            self.state = self.HALF_OPEN

        response = self.handler(request)

        if response['status'] < 500:
            # Any success closes the circuit and resets the failure streak.
            self.state = self.CLOSED
            self.failure_count = 0
            return response

        if self.state == self.HALF_OPEN:
            # A failed probe re-opens the circuit immediately.
            self._open()
        else:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self._open()
        return response

    def _open(self):
        """Move to OPEN and remember when, using a clock immune to wall-clock changes."""
        self.state = self.OPEN
        self.opened_at = time.monotonic()
fail_count = [0]


def flaky_handler(request):
    """Return 500 for the first three invocations, then 200.

    The invocation count lives in the single-element list ``fail_count`` so
    the function can update it without a ``global`` statement.
    """
    fail_count[0] += 1
    # With failure_threshold=3 the breaker opens after three failures, so the
    # next requests are answered with 503 and never reach this handler. The
    # half-open probe is therefore invocation #4 and must succeed for the
    # circuit to close.
    if fail_count[0] <= 3:
        return {'status': 500, 'body': 'error'}
    return {'status': 200, 'body': 'ok'}
# Demo: trip the breaker with repeated failures, wait past the reset timeout,
# then send one more request as the half-open probe.
cb = CircuitBreakerMiddleware(flaky_handler, failure_threshold=3, reset_timeout_s=0.1)
statuses = []
for _ in range(5):
    r = cb({'path': '/api'})
    statuses.append(r['status'])
print(f"first 5 statuses: {statuses}")
print(f"circuit state after failures: {cb.state}")
time.sleep(0.15)
r = cb({'path': '/api'})  # half-open probe
print(f"after reset: status={r['status']}, state={cb.state}")
Expected Output
first 5 statuses: [500, 500, 500, 503, 503]
circuit state after failures: open
after reset: status=200, state=closed
Hints
Hint 1: In OPEN state: check if reset_timeout_s has passed (time.time() - opened_at); if so, move to HALF_OPEN.
Hint 2: In HALF_OPEN state: call the handler once; success -> CLOSED, failure -> OPEN.
Hint 3: In CLOSED state: call handler; if status >= 500, increment failure_count; if it reaches threshold, open the circuit.
