Skip to main content

Python Middleware Practice Problems & Exercises

Practice: Middleware

11 problems4 Easy4 Medium3 Hard55–75 min
← Back to lesson

#1Logging MiddlewareEasy
middlewareloggingdecorator pattern

Implement a logging middleware that records request and response events around a handler.

Solution
log = []

def logging_middleware(handler):
def wrapper(request):
log.append({'event': 'request', 'path': request['path']})
response = handler(request)
log.append({'event': 'response', 'status': response['status']})
return response
return wrapper

def my_handler(request):
return {'status': 200, 'body': 'Hello'}

wrapped = logging_middleware(my_handler)

resp = wrapped({'path': '/hello', 'method': 'GET'})
print(f"response: {resp}")
print(f"log entries: {log}")
log = []

def logging_middleware(handler):
  """Wrap a handler function to log calls.
  Before calling handler: append {'event': 'request', 'path': request['path']} to log.
  After calling handler: append {'event': 'response', 'status': response['status']} to log.
  Return the response unchanged.
  """
  pass

def my_handler(request):
  return {'status': 200, 'body': 'Hello'}

wrapped = logging_middleware(my_handler)

resp = wrapped({'path': '/hello', 'method': 'GET'})
print(f"response: {resp}")
print(f"log entries: {log}")
Expected Output
response: {'status': 200, 'body': 'Hello'}
log entries: [{'event': 'request', 'path': '/hello'}, {'event': 'response', 'status': 200}]
Hints

Hint 1: logging_middleware returns a new function (the wrapper) that calls the original handler.

Hint 2: Append the request log entry before calling the handler.

Hint 3: Append the response log entry after calling the handler, then return the response.


#2Authentication MiddlewareEasy
middlewareauthenticationBearer token401

Write authentication middleware that validates Bearer tokens and attaches the user role to the request.

Solution
VALID_TOKENS = {'token-admin': 'admin', 'token-user': 'user'}

def auth_middleware(handler):
def wrapper(request):
auth = request.get('headers', {}).get('Authorization', '')
if not auth.startswith('Bearer '):
return {'status': 401, 'body': 'Unauthorized'}
token = auth[len('Bearer '):]
role = VALID_TOKENS.get(token)
if role is None:
return {'status': 401, 'body': 'Unauthorized'}
request['user_role'] = role
return handler(request)
return wrapper

def protected_handler(request):
return {'status': 200, 'body': f"Hello, {request['user_role']}"}

wrapped = auth_middleware(protected_handler)

r1 = wrapped({'path': '/secure', 'headers': {'Authorization': 'Bearer token-admin'}})
print(r1)

r2 = wrapped({'path': '/secure', 'headers': {'Authorization': 'Bearer bad'}})
print(r2)

r3 = wrapped({'path': '/secure', 'headers': {}})
print(r3)
VALID_TOKENS = {'token-admin': 'admin', 'token-user': 'user'}

def auth_middleware(handler):
  """Middleware that validates Bearer tokens.
  If Authorization header is missing or token invalid: return {'status': 401, 'body': 'Unauthorized'}
  If valid: attach 'user_role' to the request and call handler.
  """
  pass

def protected_handler(request):
  return {'status': 200, 'body': f"Hello, {request['user_role']}"}

wrapped = auth_middleware(protected_handler)

r1 = wrapped({'path': '/secure', 'headers': {'Authorization': 'Bearer token-admin'}})
print(r1)

r2 = wrapped({'path': '/secure', 'headers': {'Authorization': 'Bearer bad'}})
print(r2)

r3 = wrapped({'path': '/secure', 'headers': {}})
print(r3)
Expected Output
{'status': 200, 'body': 'Hello, admin'}
{'status': 401, 'body': 'Unauthorized'}
{'status': 401, 'body': 'Unauthorized'}
Hints

Hint 1: Extract the token by checking if Authorization starts with 'Bearer ' and splitting.

Hint 2: Look up the token in VALID_TOKENS to get the role.

Hint 3: Mutate the request dict to add 'user_role' before calling the handler.


#3Request ID Injection MiddlewareEasy
middlewarerequest-idUUIDtracing

Implement request ID injection middleware that preserves existing IDs or generates new UUIDs.

Solution
import uuid

def request_id_middleware(handler):
def wrapper(request):
headers = request.get('headers', {})
request_id = headers.get('X-Request-ID') or str(uuid.uuid4())
request['request_id'] = request_id
response = handler(request)
if 'headers' not in response:
response['headers'] = {}
response['headers']['X-Request-ID'] = request_id
return response
return wrapper

def echo_handler(request):
return {'status': 200, 'body': f"id={request.get('request_id')}", 'headers': {}}

wrapped = request_id_middleware(echo_handler)

r1 = wrapped({'path': '/test', 'headers': {}})
print(f"has_id: {'X-Request-ID' in r1['headers']}")

r2 = wrapped({'path': '/test', 'headers': {'X-Request-ID': 'my-trace-123'}})
print(f"preserved: {r2['headers']['X-Request-ID']}")
import uuid

request_id_log = []

def request_id_middleware(handler):
  """Inject a unique request ID into each request.
  If request already has 'X-Request-ID' header, use that value.
  Otherwise generate a UUID4 string.
  Add the ID to the request dict as 'request_id'.
  Add 'X-Request-ID' to the response headers.
  response is a dict with 'status', 'body', and optional 'headers' dict.
  """
  pass

def echo_handler(request):
  return {'status': 200, 'body': f"id={request.get('request_id')}", 'headers': {}}

wrapped = request_id_middleware(echo_handler)

# Without pre-existing ID
r1 = wrapped({'path': '/test', 'headers': {}})
print(f"has_id: {'X-Request-ID' in r1['headers']}")

# With pre-existing ID
r2 = wrapped({'path': '/test', 'headers': {'X-Request-ID': 'my-trace-123'}})
print(f"preserved: {r2['headers']['X-Request-ID']}")
Expected Output
has_id: True
preserved: my-trace-123
Hints

Hint 1: Check if 'X-Request-ID' exists in request['headers']; use it if present.

Hint 2: Otherwise use str(uuid.uuid4()) to generate a new ID.

Hint 3: After getting the response, set response['headers']['X-Request-ID'] = request_id.


#4Response Compression MiddlewareEasy
middlewaregzipcompressionContent-Encoding

Build compression middleware that gzip-encodes response bodies when the client requests it.

Solution
import gzip
import json

def compression_middleware(handler):
def wrapper(request):
response = handler(request)
accept_enc = request.get('headers', {}).get('Accept-Encoding', '')
if 'gzip' in accept_enc:
body = response['body']
if isinstance(body, str):
body = body.encode('utf-8')
response['body'] = gzip.compress(body)
if 'headers' not in response:
response['headers'] = {}
response['headers']['Content-Encoding'] = 'gzip'
return response
return wrapper

def data_handler(request):
data = [{'id': i, 'name': f'item_{i}'} for i in range(50)]
return {'status': 200, 'body': json.dumps(data), 'headers': {}}

wrapped = compression_middleware(data_handler)

r1 = wrapped({'path': '/data', 'headers': {'Accept-Encoding': 'gzip, deflate'}})
body_is_bytes = isinstance(r1['body'], bytes)
compressed_ok = body_is_bytes and len(r1['body']) < 500
print(f"compressed: {compressed_ok}, Content-Encoding: {r1['headers'].get('Content-Encoding')}")

r2 = wrapped({'path': '/data', 'headers': {'Accept-Encoding': 'identity'}})
print(f"uncompressed type: {type(r2['body']).__name__}")
import gzip
import json

def compression_middleware(handler):
  """Compress response body with gzip if client accepts it.
  Check request['headers']['Accept-Encoding'] for 'gzip'.
  If accepted: compress body, add Content-Encoding: gzip to response headers.
  If not: pass through unchanged.
  Works on string or bytes body.
  """
  pass

def data_handler(request):
  data = [{'id': i, 'name': f'item_{i}'} for i in range(50)]
  return {'status': 200, 'body': json.dumps(data), 'headers': {}}

wrapped = compression_middleware(data_handler)

r1 = wrapped({'path': '/data', 'headers': {'Accept-Encoding': 'gzip, deflate'}})
body_is_bytes = isinstance(r1['body'], bytes)
compressed_ok = body_is_bytes and len(r1['body']) < 500
print(f"compressed: {compressed_ok}, Content-Encoding: {r1['headers'].get('Content-Encoding')}")

r2 = wrapped({'path': '/data', 'headers': {'Accept-Encoding': 'identity'}})
print(f"uncompressed type: {type(r2['body']).__name__}")
Expected Output
compressed: True, Content-Encoding: gzip
uncompressed type: str
Hints

Hint 1: Check 'gzip' in request['headers'].get('Accept-Encoding', '').

Hint 2: Encode the body to bytes if it's a string, then call gzip.compress().

Hint 3: Set response['headers']['Content-Encoding'] = 'gzip' on the response.


#5Rate Limiting MiddlewareMedium
middlewarerate limitingsliding window429

Implement sliding window rate limiting middleware that tracks requests per client IP.

Solution
import time
from collections import defaultdict, deque

def rate_limit_middleware(handler, max_requests=3, window_seconds=1):
request_history = defaultdict(deque)

def wrapper(request):
client_ip = request.get('client_ip', 'unknown')
now = time.time()
history = request_history[client_ip]

# Evict old timestamps
while history and now - history[0] > window_seconds:
history.popleft()

if len(history) >= max_requests:
return {'status': 429, 'body': 'Too Many Requests'}

history.append(now)
return handler(request)

return wrapper

def api_handler(request):
return {'status': 200, 'body': 'OK'}

limited = rate_limit_middleware(api_handler, max_requests=3, window_seconds=10)

results = []
for i in range(5):
r = limited({'path': '/api', 'client_ip': '192.168.1.1'})
results.append(r['status'])

print(f"statuses: {results}")
print(f"allowed: {results.count(200)}, blocked: {results.count(429)}")

r = limited({'path': '/api', 'client_ip': '10.0.0.1'})
print(f"different IP: {r['status']}")
import time
from collections import defaultdict, deque

def rate_limit_middleware(handler, max_requests=3, window_seconds=1):
  """Sliding window rate limiter per client IP.
  Track request timestamps per client_ip.
  If client exceeds max_requests in the last window_seconds: return 429.
  client_ip comes from request['client_ip'].
  """
  pass

def api_handler(request):
  return {'status': 200, 'body': 'OK'}

limited = rate_limit_middleware(api_handler, max_requests=3, window_seconds=10)

results = []
for i in range(5):
  r = limited({'path': '/api', 'client_ip': '192.168.1.1'})
  results.append(r['status'])

print(f"statuses: {results}")
print(f"allowed: {results.count(200)}, blocked: {results.count(429)}")

# Different IP should not be limited
r = limited({'path': '/api', 'client_ip': '10.0.0.1'})
print(f"different IP: {r['status']}")
Expected Output
statuses: [200, 200, 200, 429, 429]
allowed: 3, blocked: 2
different IP: 200
Hints

Hint 1: Use a defaultdict of deques to store request timestamps per IP.

Hint 2: Before each request, remove timestamps older than window_seconds from the deque.

Hint 3: If len(deque) >= max_requests, return 429; otherwise append current time and call handler.


#6CORS MiddlewareMedium
middlewareCORSpreflightallowed origins

Implement CORS middleware that handles both regular requests and OPTIONS preflight requests.

Solution
def cors_middleware(handler, allowed_origins=None, allow_credentials=False):
if allowed_origins is None:
allowed_origins = ['*']

def wrapper(request):
origin = request.get('headers', {}).get('Origin', '')
is_allowed = '*' in allowed_origins or origin in allowed_origins
is_preflight = (
request.get('method') == 'OPTIONS' and
'Access-Control-Request-Method' in request.get('headers', {})
)

if is_preflight:
headers = {}
if is_allowed:
headers['Access-Control-Allow-Origin'] = origin
headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS'
headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
return {'status': 204, 'body': '', 'headers': headers}

response = handler(request)
if 'headers' not in response:
response['headers'] = {}
if is_allowed and origin:
response['headers']['Access-Control-Allow-Origin'] = origin
return response

return wrapper

def api_handler(request):
return {'status': 200, 'body': 'data', 'headers': {}}

cors = cors_middleware(api_handler, allowed_origins=['https://app.example.com'])

r1 = cors({'method': 'GET', 'path': '/api',
'headers': {'Origin': 'https://app.example.com'}})
print(f"allowed: {r1['headers'].get('Access-Control-Allow-Origin')}")

r2 = cors({'method': 'OPTIONS', 'path': '/api',
'headers': {'Origin': 'https://app.example.com',
'Access-Control-Request-Method': 'POST'}})
print(f"preflight: status={r2['status']}, ACAO={r2['headers'].get('Access-Control-Allow-Origin')}")

r3 = cors({'method': 'GET', 'path': '/api',
'headers': {'Origin': 'https://evil.com'}})
print(f"blocked: ACAO={r3['headers'].get('Access-Control-Allow-Origin')}")
def cors_middleware(handler, allowed_origins=None, allow_credentials=False):
  """CORS middleware for a dict-based request/response system.
  allowed_origins: list of allowed origins, or ['*'] for all.
  Preflight: OPTIONS request with Access-Control-Request-Method header.
    -> Return 204 with CORS headers (do NOT call inner handler).
  Regular request: add Access-Control-Allow-Origin if origin is allowed.
  If origin not allowed: return response without CORS headers.
  """
  if allowed_origins is None:
      allowed_origins = ['*']
  pass

def api_handler(request):
  return {'status': 200, 'body': 'data', 'headers': {}}

cors = cors_middleware(api_handler, allowed_origins=['https://app.example.com'])

# Allowed origin
r1 = cors({'method': 'GET', 'path': '/api',
         'headers': {'Origin': 'https://app.example.com'}})
print(f"allowed: {r1['headers'].get('Access-Control-Allow-Origin')}")

# Preflight
r2 = cors({'method': 'OPTIONS', 'path': '/api',
         'headers': {'Origin': 'https://app.example.com',
                     'Access-Control-Request-Method': 'POST'}})
print(f"preflight: status={r2['status']}, ACAO={r2['headers'].get('Access-Control-Allow-Origin')}")

# Blocked origin
r3 = cors({'method': 'GET', 'path': '/api',
         'headers': {'Origin': 'https://evil.com'}})
print(f"blocked: ACAO={r3['headers'].get('Access-Control-Allow-Origin')}")
Expected Output
allowed: https://app.example.com
preflight: status=204, ACAO=https://app.example.com
blocked: ACAO=None
Hints

Hint 1: Check if request['method'] == 'OPTIONS' and 'Access-Control-Request-Method' is in headers for preflight.

Hint 2: For preflight, return status 204 with CORS headers without calling the inner handler.

Hint 3: Check if '*' is in allowed_origins OR the specific origin is in the list.


#7Caching MiddlewareMedium
middlewarecachingGET requestsTTLin-memory

Write caching middleware that stores GET responses in memory with TTL and tracks cache hits/misses.

Solution
import time

def caching_middleware(handler, ttl_seconds=60):
cache_store = {}

def wrapper(request):
method = request.get('method', 'GET')
path = request.get('path', '/')
query = request.get('query', '')
key = (method, path, query)

if method == 'GET' and key in cache_store:
response, expiry = cache_store[key]
if time.time() < expiry:
wrapper.hits += 1
return response

wrapper.misses += 1
response = handler(request)
if method == 'GET' and response.get('status') == 200:
cache_store[key] = (response, time.time() + ttl_seconds)
return response

wrapper.hits = 0
wrapper.misses = 0
return wrapper

call_count = 0

def slow_handler(request):
global call_count
call_count += 1
return {'status': 200, 'body': f'data-{call_count}', 'headers': {}}

cache = caching_middleware(slow_handler, ttl_seconds=60)

r1 = cache({'method': 'GET', 'path': '/data', 'query': ''})
r2 = cache({'method': 'GET', 'path': '/data', 'query': ''})
r3 = cache({'method': 'POST', 'path': '/data', 'query': ''})

print(f"r1 body: {r1['body']}")
print(f"r2 body: {r2['body']} (same as r1 = {r1['body'] == r2['body']})")
print(f"handler called: {call_count} times (POST not cached)")
print(f"hits: {cache.hits}, misses: {cache.misses}")
import time

def caching_middleware(handler, ttl_seconds=60):
  """Cache GET request responses in memory.
  Cache key: (method, path, query_string).
  Only cache GET requests with 200 status.
  Expired cache entries should be served fresh.
  Track hits and misses.
  """
  pass

call_count = 0

def slow_handler(request):
  global call_count
  call_count += 1
  return {'status': 200, 'body': f'data-{call_count}', 'headers': {}}

cache = caching_middleware(slow_handler, ttl_seconds=60)

r1 = cache({'method': 'GET', 'path': '/data', 'query': ''})
r2 = cache({'method': 'GET', 'path': '/data', 'query': ''})  # cache hit
r3 = cache({'method': 'POST', 'path': '/data', 'query': ''})  # not cached

print(f"r1 body: {r1['body']}")
print(f"r2 body: {r2['body']} (same as r1 = {r1['body'] == r2['body']})")
print(f"handler called: {call_count} times (POST not cached)")
print(f"hits: {cache.hits}, misses: {cache.misses}")
Expected Output
r1 body: data-1
r2 body: data-1 (same as r1 = True)
handler called: 2 times (POST not cached)
hits: 1, misses: 2
Hints

Hint 1: Store cached responses as (response, expiry_time) tuples.

Hint 2: Only cache when method == 'GET' and status == 200.

Hint 3: Attach hits and misses as attributes on the wrapper function object.


#8Timing Middleware with PercentilesMedium
middlewaretimingpercentilesstatistics

Build timing middleware that records response times and exposes percentile statistics.

Solution
import time
import statistics

def timing_middleware(handler):
times_ms = []

def wrapper(request):
start = time.time()
response = handler(request)
elapsed = (time.time() - start) * 1000
times_ms.append(elapsed)
return response

def stats():
if not times_ms:
return {'count': 0, 'mean_ms': 0, 'p50_ms': 0, 'p95_ms': 0, 'max_ms': 0}
sorted_times = sorted(times_ms)
p95_idx = max(0, int(len(sorted_times) * 0.95) - 1)
return {
'count': len(times_ms),
'mean_ms': round(statistics.mean(times_ms), 2),
'p50_ms': round(statistics.median(times_ms), 2),
'p95_ms': round(sorted_times[p95_idx], 2),
'max_ms': round(max(times_ms), 2),
}

wrapper.stats = stats
return wrapper

import time as _time

def variable_handler(request):
_time.sleep(request.get('delay', 0))
return {'status': 200, 'body': 'ok'}

timed = timing_middleware(variable_handler)

delays = [0.01, 0.02, 0.01, 0.05, 0.01, 0.02, 0.01, 0.08, 0.01, 0.03]
for d in delays:
timed({'method': 'GET', 'path': '/test', 'delay': d})

stats = timed.stats()
print(f"count: {stats['count']}")
print(f"mean > 0: {stats['mean_ms'] > 0}")
print(f"p95 >= p50: {stats['p95_ms'] >= stats['p50_ms']}")
print(f"max >= p95: {stats['max_ms'] >= stats['p95_ms']}")
import time
import statistics

def timing_middleware(handler):
  """Record response times for every request.
  Attach a .stats() method to the wrapper that returns:
  {'count': N, 'mean_ms': float, 'p50_ms': float, 'p95_ms': float, 'max_ms': float}
  All times rounded to 2 decimal places.
  """
  pass

import time as _time

def variable_handler(request):
  _time.sleep(request.get('delay', 0))
  return {'status': 200, 'body': 'ok'}

timed = timing_middleware(variable_handler)

# Simulate 10 requests with varying delays
delays = [0.01, 0.02, 0.01, 0.05, 0.01, 0.02, 0.01, 0.08, 0.01, 0.03]
for d in delays:
  timed({'method': 'GET', 'path': '/test', 'delay': d})

stats = timed.stats()
print(f"count: {stats['count']}")
print(f"mean > 0: {stats['mean_ms'] > 0}")
print(f"p95 >= p50: {stats['p95_ms'] >= stats['p50_ms']}")
print(f"max >= p95: {stats['max_ms'] >= stats['p95_ms']}")
Expected Output
count: 10
mean > 0: True
p95 >= p50: True
max >= p95: True
Hints

Hint 1: Record elapsed time using time.time() before and after calling the handler.

Hint 2: statistics.median() gives p50; use sorted list indexing for p95 (index at 95th percentile).

Hint 3: Attach .stats as a method by setting wrapper.stats = stats_fn after defining wrapper.


#9ASGI Middleware PatternHard
ASGImiddlewareasyncscope/receive/send

Implement an ASGI timing middleware class that injects an x-response-time header into HTTP responses.

Solution
import asyncio
import time

async def inner_asgi_app(scope, receive, send):
if scope['type'] == 'http':
await send({'type': 'http.response.start', 'status': 200,
'headers': [[b'content-type', b'application/json']]})
await send({'type': 'http.response.body', 'body': b'{"ok": true}'})

class TimingMiddleware:
def __init__(self, app):
self.app = app

async def __call__(self, scope, receive, send):
if scope['type'] != 'http':
await self.app(scope, receive, send)
return

start = time.time()

async def send_with_timing(event):
if event['type'] == 'http.response.start':
elapsed = (time.time() - start) * 1000
timing_header = [b'x-response-time', f'{elapsed:.2f}ms'.encode()]
headers = list(event.get('headers', []))
headers.append(timing_header)
event = dict(event, headers=headers)
await send(event)

await self.app(scope, receive, send_with_timing)

async def run_test():
app = TimingMiddleware(inner_asgi_app)
headers_sent = []

async def mock_send(event):
headers_sent.append(event)

scope = {'type': 'http', 'method': 'GET', 'path': '/'}
await app(scope, None, mock_send)

start_event = headers_sent[0]
header_names = [h[0] for h in start_event.get('headers', [])]
print(f"status: {start_event['status']}")
print(f"has timing header: {b'x-response-time' in header_names}")

asyncio.run(run_test())
import asyncio

# Minimal ASGI app
async def inner_asgi_app(scope, receive, send):
  if scope['type'] == 'http':
      await send({'type': 'http.response.start', 'status': 200,
                  'headers': [[b'content-type', b'application/json']]})
      await send({'type': 'http.response.body', 'body': b'{"ok": true}'})

# TODO: Implement an ASGI timing middleware class.
# class TimingMiddleware:
#   __init__(self, app): store app
#   async __call__(self, scope, receive, send):
#     For http scope: record start time, wrap send to capture first status event,
#     compute duration, add 'x-response-time' header (as bytes) to the response.
#     For non-http scope: pass through unchanged.

class TimingMiddleware:
  pass

async def run_test():
  app = TimingMiddleware(inner_asgi_app)
  headers_sent = []

  async def mock_send(event):
      headers_sent.append(event)

  scope = {'type': 'http', 'method': 'GET', 'path': '/'}
  await app(scope, None, mock_send)

  start_event = headers_sent[0]
  header_names = [h[0] for h in start_event.get('headers', [])]
  print(f"status: {start_event['status']}")
  print(f"has timing header: {b'x-response-time' in header_names}")

asyncio.run(run_test())
Expected Output
status: 200
has timing header: True
Hints

Hint 1: ASGI middleware is a class with __init__(self, app) and async __call__(self, scope, receive, send).

Hint 2: Wrap the send callable: intercept 'http.response.start' events to inject headers.

Hint 3: Add the header as a tuple of bytes: [b'x-response-time', b'12.34ms'].


#10Middleware Stack with Ordering GuaranteesHard
middlewarestackorderingcomposable

Implement compose_middleware that stacks multiple middlewares with correct onion-layer execution order.

Solution
from functools import reduce

execution_order = []

def make_middleware(name, pre_msg=None, post_msg=None):
def middleware(handler):
def wrapper(request):
if pre_msg:
execution_order.append(f"{name}:pre")
response = handler(request)
if post_msg:
execution_order.append(f"{name}:post")
return response
return wrapper
return middleware

def compose_middleware(*middlewares):
def apply(handler):
return reduce(lambda h, mw: mw(h), reversed(middlewares), handler)
return apply

def final_handler(request):
execution_order.append('handler')
return {'status': 200, 'body': 'done'}

auth_mw = make_middleware('auth', pre_msg=True, post_msg=False)
logging_mw = make_middleware('logging', pre_msg=True, post_msg=True)
timing_mw = make_middleware('timing', pre_msg=True, post_msg=True)

composed = compose_middleware(logging_mw, auth_mw, timing_mw)(final_handler)
composed({'method': 'GET', 'path': '/test'})

print(f"execution order: {execution_order}")
execution_order = []

def make_middleware(name, pre_msg=None, post_msg=None):
  """Factory that creates middleware recording execution order.
  pre_msg: log before calling inner handler
  post_msg: log after calling inner handler
  """
  def middleware(handler):
      def wrapper(request):
          if pre_msg:
              execution_order.append(f"{name}:pre")
          response = handler(request)
          if post_msg:
              execution_order.append(f"{name}:post")
          return response
      return wrapper
  return middleware

def compose_middleware(*middlewares):
  """Compose multiple middlewares into one.
  Middlewares are applied in order: first in list = outermost layer.
  compose_middleware(A, B, C)(handler) = A(B(C(handler)))
  """
  pass

def final_handler(request):
  execution_order.append('handler')
  return {'status': 200, 'body': 'done'}

auth_mw = make_middleware('auth', pre_msg=True, post_msg=False)
logging_mw = make_middleware('logging', pre_msg=True, post_msg=True)
timing_mw = make_middleware('timing', pre_msg=True, post_msg=True)

composed = compose_middleware(logging_mw, auth_mw, timing_mw)(final_handler)
composed({'method': 'GET', 'path': '/test'})

print(f"execution order: {execution_order}")
Expected Output
execution order: ['logging:pre', 'auth:pre', 'timing:pre', 'handler', 'timing:post', 'logging:post']
Hints

Hint 1: Use functools.reduce to fold middleware over the handler from right to left.

Hint 2: A(B(C(handler))): C wraps handler first, then B wraps that, then A wraps that.

Hint 3: reduce(lambda h, mw: mw(h), reversed(middlewares), handler) applies them inside-out.


#11Circuit Breaker MiddlewareHard
middlewarecircuit breakerresiliencefault tolerance

Implement the circuit breaker pattern as middleware with CLOSED, OPEN, and HALF_OPEN states.

Solution
import time

class CircuitBreakerMiddleware:
CLOSED = 'closed'
OPEN = 'open'
HALF_OPEN = 'half_open'

def __init__(self, handler, failure_threshold=3, reset_timeout_s=5):
self.handler = handler
self.failure_threshold = failure_threshold
self.reset_timeout_s = reset_timeout_s
self.state = self.CLOSED
self.failure_count = 0
self.opened_at = None

def __call__(self, request):
if self.state == self.OPEN:
if time.time() - self.opened_at >= self.reset_timeout_s:
self.state = self.HALF_OPEN
else:
return {'status': 503, 'body': 'Circuit Open'}

response = self.handler(request)

if self.state == self.HALF_OPEN:
if response['status'] >= 500:
self.state = self.OPEN
self.opened_at = time.time()
else:
self.state = self.CLOSED
self.failure_count = 0
return response

# CLOSED state
if response['status'] >= 500:
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self.state = self.OPEN
self.opened_at = time.time()
else:
self.failure_count = 0

return response

fail_count = [0]

def flaky_handler(request):
fail_count[0] += 1
if fail_count[0] <= 4:
return {'status': 500, 'body': 'error'}
return {'status': 200, 'body': 'ok'}

cb = CircuitBreakerMiddleware(flaky_handler, failure_threshold=3, reset_timeout_s=0.1)

statuses = []
for i in range(5):
r = cb({'path': '/api'})
statuses.append(r['status'])

print(f"first 5 statuses: {statuses}")
print(f"circuit state after failures: {cb.state}")

time.sleep(0.15)
r = cb({'path': '/api'})
print(f"after reset: status={r['status']}, state={cb.state}")
import time

class CircuitBreakerMiddleware:
  """Circuit breaker pattern middleware.
  States: CLOSED (normal), OPEN (failing fast), HALF_OPEN (testing recovery).
  CLOSED -> OPEN: after failure_threshold consecutive failures.
  OPEN -> HALF_OPEN: after reset_timeout_s seconds.
  HALF_OPEN -> CLOSED: if next request succeeds.
  HALF_OPEN -> OPEN: if next request fails.
  A failed response: status >= 500.
  Return {'status': 503, 'body': 'Circuit Open'} when circuit is OPEN.
  """

  CLOSED = 'closed'
  OPEN = 'open'
  HALF_OPEN = 'half_open'

  def __init__(self, handler, failure_threshold=3, reset_timeout_s=5):
      self.handler = handler
      self.failure_threshold = failure_threshold
      self.reset_timeout_s = reset_timeout_s
      self.state = self.CLOSED
      self.failure_count = 0
      self.opened_at = None

  def __call__(self, request):
      pass

fail_count = [0]

def flaky_handler(request):
  fail_count[0] += 1
  if fail_count[0] <= 4:
      return {'status': 500, 'body': 'error'}
  return {'status': 200, 'body': 'ok'}

cb = CircuitBreakerMiddleware(flaky_handler, failure_threshold=3, reset_timeout_s=0.1)

statuses = []
for i in range(5):
  r = cb({'path': '/api'})
  statuses.append(r['status'])

print(f"first 5 statuses: {statuses}")
print(f"circuit state after failures: {cb.state}")

time.sleep(0.15)
r = cb({'path': '/api'})  # half-open probe
print(f"after reset: status={r['status']}, state={cb.state}")
Expected Output
first 5 statuses: [500, 500, 500, 503, 503]
circuit state after failures: open
after reset: status=200, state=closed
Hints

Hint 1: In OPEN state: check if reset_timeout_s has passed (time.time() - opened_at); if so, move to HALF_OPEN.

Hint 2: In HALF_OPEN state: call the handler once; success -> CLOSED, failure -> OPEN.

Hint 3: In CLOSED state: call handler; if status >= 500, increment failure_count; if it reaches threshold, open the circuit.

© 2026 EngineersOfAI. All rights reserved.