Skip to main content

Python Build a Web Framework Practice Problems & Exercises

Practice: Build a Web Framework

11 problems · 2 Easy · 4 Medium · 5 Hard · 90–120 min
← Back to lesson

Easy

#1HTTP Status Code RegistryEasy
httpstatus-codesregistry

Build an HTTPStatus class that acts as a registry for HTTP status codes with helper methods to classify them by category.

class HTTPStatus:
    """Registry of common HTTP status codes with range-based classifiers.

    Every helper is a classmethod, so the class is used directly and is
    never instantiated.
    """

    # Reason phrase for each registered numeric status code.
    _codes = {
        # 2xx
        200: "OK", 201: "Created", 204: "No Content",
        # 3xx
        301: "Moved Permanently", 302: "Found", 304: "Not Modified",
        # 4xx
        400: "Bad Request", 401: "Unauthorized", 403: "Forbidden",
        404: "Not Found", 405: "Method Not Allowed",
        422: "Unprocessable Entity", 429: "Too Many Requests",
        # 5xx
        500: "Internal Server Error", 502: "Bad Gateway",
        503: "Service Unavailable",
    }

    @classmethod
    def reason(cls, code: int) -> str:
        """Return the reason phrase for ``code``, or "Unknown" if unregistered."""
        try:
            return cls._codes[code]
        except KeyError:
            return "Unknown"

    @classmethod
    def line(cls, code: int) -> str:
        """Return ``"<code> <reason>"``, e.g. ``"404 Not Found"``."""
        phrase = cls.reason(code)
        return f"{code} {phrase}"

    @classmethod
    def is_success(cls, code: int) -> bool:
        """Return True for codes in the 2xx range."""
        return code >= 200 and code < 300

    @classmethod
    def is_redirect(cls, code: int) -> bool:
        """Return True for codes in the 3xx range."""
        return code >= 300 and code < 400

    @classmethod
    def is_client_error(cls, code: int) -> bool:
        """Return True for codes in the 4xx range."""
        return code >= 400 and code < 500

    @classmethod
    def is_server_error(cls, code: int) -> bool:
        """Return True for codes in the 5xx range."""
        return code >= 500 and code < 600

# Test
print(HTTPStatus.line(200))
print(HTTPStatus.line(404))
print(HTTPStatus.line(500))
print(f"Is success: {HTTPStatus.is_success(200)}")
print(f"Is client error: {HTTPStatus.is_client_error(200)}")
print(f"Is server error: {HTTPStatus.is_server_error(404)}")
Solution
class HTTPStatus:
    """HTTP status code lookup table and category predicates."""

    # Maps numeric status code -> reason phrase.
    _codes = {
        200: "OK",
        201: "Created",
        204: "No Content",
        301: "Moved Permanently",
        302: "Found",
        304: "Not Modified",
        400: "Bad Request",
        401: "Unauthorized",
        403: "Forbidden",
        404: "Not Found",
        405: "Method Not Allowed",
        422: "Unprocessable Entity",
        429: "Too Many Requests",
        500: "Internal Server Error",
        502: "Bad Gateway",
        503: "Service Unavailable",
    }

    @staticmethod
    def _within(code, floor):
        """Return True when ``code`` lies in the half-open range [floor, floor + 100)."""
        return floor <= code < floor + 100

    @classmethod
    def reason(cls, code: int) -> str:
        """Look up the reason phrase; unregistered codes yield "Unknown"."""
        return cls._codes[code] if code in cls._codes else "Unknown"

    @classmethod
    def line(cls, code: int) -> str:
        """Format the code and its reason phrase separated by one space."""
        return " ".join((f"{code}", cls.reason(code)))

    @classmethod
    def is_success(cls, code: int) -> bool:
        """2xx check."""
        return cls._within(code, 200)

    @classmethod
    def is_redirect(cls, code: int) -> bool:
        """3xx check."""
        return cls._within(code, 300)

    @classmethod
    def is_client_error(cls, code: int) -> bool:
        """4xx check."""
        return cls._within(code, 400)

    @classmethod
    def is_server_error(cls, code: int) -> bool:
        """5xx check."""
        return cls._within(code, 500)

print(HTTPStatus.line(200))
print(HTTPStatus.line(404))
print(HTTPStatus.line(500))
print(f"Is success: {HTTPStatus.is_success(200)}")
print(f"Is client error: {HTTPStatus.is_client_error(200)}")
print(f"Is server error: {HTTPStatus.is_server_error(404)}")

Status code semantics: The HTTP specification (RFC 9110, which obsoletes RFC 7231) defines these ranges. 4xx means the client made a bad request (the server understood it but refuses). 5xx means the server failed despite a valid request. This distinction matters for monitoring — a spike in 4xx errors indicates a client bug or API misuse; 5xx errors indicate a server-side problem.

Expected Output
200 OK\n404 Not Found\n500 Internal Server Error\nIs success: True\nIs client error: False\nIs server error: False
Hints

Hint 1: Store status codes in a dict mapping int to string. Group them by range: 2xx success, 3xx redirect, 4xx client error, 5xx server error.

Hint 2: Helper methods: `is_success(code)` returns 200 <= code < 300. Similar ranges for the others.

#2Query String ParserEasy
query-stringurl-parsinghttp

Implement a query string parser that handles multi-value keys, percent-encoding, and missing key defaults.

from urllib.parse import unquote_plus
from typing import Dict, List, Optional

class QueryString:
    """Parsed URL query string that keeps every value of a repeated key.

    Keys and values are percent-decoded with ``unquote_plus`` (so ``+`` and
    ``%20`` both become a space). A key without ``=`` is stored with an
    empty-string value. A single leading ``?`` is ignored.
    """

    def __init__(self, raw: str = ""):
        # key -> list of values, in order of appearance
        self._data: Dict[str, List[str]] = {}
        if raw.startswith("?"):
            raw = raw[1:]
        for part in raw.split("&"):
            if not part:
                # Skip empty segments produced by "a=1&&b=2" or a trailing "&".
                continue
            key, _, value = part.partition("=")
            self._data.setdefault(unquote_plus(key), []).append(unquote_plus(value))

    def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """Return the first value for ``key``, or ``default`` if the key is absent."""
        values = self._data.get(key)
        return values[0] if values else default

    def get_list(self, key: str) -> List[str]:
        """Return all values for ``key`` (empty list if absent).

        A copy is returned so callers cannot alter the parsed data by
        mutating the result.
        """
        return list(self._data.get(key, ()))

    def all(self) -> Dict[str, List[str]]:
        """Return a copy of the full key -> values mapping.

        The value lists are copied as well; a shallow ``dict(...)`` would
        still hand out the internal lists.
        """
        return {key: list(values) for key, values in self._data.items()}

# Test
# The input is kept identical to the solution's run so the printed output
# matches the Expected Output shown for this problem.
qs = QueryString("name=Alice&age=30&tags=python&tags=web")
print(f"Parsed: {qs.all()}")
print(f"Single value: {qs.get('name')}")
print(f"Multi-value: {qs.get_list('tags')}")
print(f"Missing key default: {qs.get('user', 'guest')}")
Solution
from urllib.parse import unquote_plus
from typing import Dict, List, Optional

class QueryString:
    """Query string parser with list-based storage for repeated keys.

    Both keys and values are decoded with ``unquote_plus``; a bare key
    (no ``=``) maps to an empty string. A leading ``?`` is stripped.
    """

    def __init__(self, raw: str = ""):
        self._data: Dict[str, List[str]] = {}
        query = raw[1:] if raw.startswith("?") else raw
        # Empty segments (from "&&" or a trailing "&") carry no key and are skipped.
        for part in filter(None, query.split("&")):
            key, _, value = part.partition("=")
            self._data.setdefault(unquote_plus(key), []).append(unquote_plus(value))

    def get(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """Return the first value stored for ``key``, else ``default``."""
        values = self._data.get(key)
        return values[0] if values else default

    def get_list(self, key: str) -> List[str]:
        """Return a new list with every value for ``key`` (empty if absent).

        Returning a copy keeps the internal lists private, so mutating the
        result does not change what later lookups see.
        """
        return list(self._data.get(key, ()))

    def all(self) -> Dict[str, List[str]]:
        """Return an independent copy of the whole mapping, lists included."""
        return {key: list(values) for key, values in self._data.items()}

qs = QueryString("name=Alice&age=30&tags=python&tags=web")
print(f"Parsed: {qs.all()}")
print(f"Single value: {qs.get('name')}")
print(f"Multi-value: {qs.get_list('tags')}")
print(f"Missing key default: {qs.get('user', 'guest')}")

Multi-value keys: The HTML select multiple and checkboxes produce multi-value keys (?tags=a&tags=b). This is why request.args.getlist('tags') exists in Flask and request.query_params.getlist('tags') exists in Starlette — both use list-based storage internally, just like this implementation.

Expected Output
Parsed: {'name': ['Alice'], 'age': ['30'], 'tags': ['python', 'web']}\nSingle value: Alice\nMulti-value: ['python', 'web']\nMissing key default: guest
Hints

Hint 1: Split on "&" to get key=value pairs, then split each pair on "=" (maxsplit=1). URL-decode percent-encoded values with urllib.parse.unquote_plus.

Hint 2: Keys can repeat (e.g., tags=python&tags=web). Store values as lists to handle multi-value keys correctly.


Medium

#3URL Router with Path ParametersMedium
routingpath-paramsregexurl-matching

Build a URL router that matches paths with named parameters and type coercion (e.g., {id:int} automatically converts to integer).

import re
from typing import Optional, Tuple, Dict, Callable, List

class Router:
    """URL router with named, typed path parameters.

    Patterns use ``{name}`` or ``{name:type}`` placeholders, where ``type``
    is one of ``str`` (default), ``int``, ``float`` or ``path``. Everything
    outside a placeholder is matched literally.
    """

    # Regex fragment used for each placeholder type.
    _TYPE_PATTERNS = {
        "str": r"[^/]+",
        "int": r"[0-9]+",
        "float": r"[0-9]+\.[0-9]+",
        "path": r".+",
    }
    # Callable used to coerce the captured text for each placeholder type.
    _PYTHON_TYPES = {"str": str, "int": int, "float": float, "path": str}
    # Finds ``{...}`` placeholders inside a route pattern.
    _PARAM_RE = re.compile(r"\{([^}]+)\}")

    def __init__(self):
        # Each entry: (METHOD, compiled_regex, {param: coercer}, handler)
        self._routes: List[tuple] = []

    def _compile_pattern(self, pattern: str):
        """Translate a route pattern into ``(compiled_regex, type_map)``.

        Literal text between placeholders is passed through ``re.escape`` so
        characters such as ``.`` or ``+`` in a route only match themselves.
        Unknown type names fall back to ``str`` semantics.
        """
        type_map = {}
        pieces = []
        cursor = 0
        for m in self._PARAM_RE.finditer(pattern):
            pieces.append(re.escape(pattern[cursor:m.start()]))
            name, _, type_str = m.group(1).partition(":")
            type_str = type_str or "str"
            type_map[name] = self._PYTHON_TYPES.get(type_str, str)
            regex_part = self._TYPE_PATTERNS.get(type_str, r"[^/]+")
            pieces.append(f"(?P<{name}>{regex_part})")
            cursor = m.end()
        pieces.append(re.escape(pattern[cursor:]))
        return re.compile(f"^{''.join(pieces)}$"), type_map

    def register(self, method: str, pattern: str, handler: Callable) -> None:
        """Add a route; ``method`` is stored upper-cased."""
        compiled, type_map = self._compile_pattern(pattern)
        self._routes.append((method.upper(), compiled, type_map, handler))

    def match(self, method: str, path: str) -> Tuple[Optional[Callable], Dict]:
        """Return ``(handler, params)`` for the first matching route.

        Routes are tried in registration order. Returns ``(None, {})`` when
        nothing matches.
        """
        wanted = method.upper()  # hoisted: invariant across routes
        for route_method, regex, type_map, handler in self._routes:
            if route_method != wanted:
                continue
            m = regex.fullmatch(path)
            if not m:
                continue
            try:
                params = {k: type_map[k](v) for k, v in m.groupdict().items()}
            except (ValueError, KeyError):
                # Coercion failed: treat as a non-match and keep looking.
                continue
            return handler, params
        return None, {}

# Test
router = Router()

def list_users(): pass
def get_user(id): pass
def create_user(): pass

router.register("GET", "/users", list_users)
router.register("GET", "/users/{id:int}", get_user)
router.register("POST", "/users", create_user)

for method, path in [
("GET", "/users"),
("GET", "/users/42"),
("GET", "/users/abc"),
("POST", "/users"),
]:
handler, params = router.match(method, path)
name = handler.__name__ if handler else "None"
print(f"{method} {path} -> {name}, params={params}")
Solution
import re
from typing import Optional, Tuple, Dict, Callable, List

class Router:
    """Regex-backed URL router supporting ``{name}`` / ``{name:type}`` parameters.

    Supported types are ``str`` (default), ``int``, ``float`` and ``path``.
    Text outside placeholders is matched literally.
    """

    _TYPE_PATTERNS = {"str": r"[^/]+", "int": r"[0-9]+", "float": r"[0-9]+\.[0-9]+", "path": r".+"}
    _PYTHON_TYPES = {"str": str, "int": int, "float": float, "path": str}
    # Splits a pattern into literal text and ``{...}`` placeholder contents.
    _PARAM_RE = re.compile(r"\{([^}]+)\}")

    def __init__(self):
        # Registered routes as (METHOD, compiled_regex, type_map, handler) tuples.
        self._routes: List[tuple] = []

    def _compile_pattern(self, pattern: str):
        """Build ``(compiled_regex, type_map)`` from a route pattern.

        ``re.split`` with one capture group alternates literal text (even
        indexes) and placeholder contents (odd indexes). Literals are
        escaped so regex metacharacters in a route are not interpreted.
        """
        type_map = {}
        out = []
        for index, chunk in enumerate(self._PARAM_RE.split(pattern)):
            if index % 2 == 0:
                out.append(re.escape(chunk))
                continue
            name, _, type_str = chunk.partition(":")
            type_str = type_str or "str"
            type_map[name] = self._PYTHON_TYPES.get(type_str, str)
            out.append(f"(?P<{name}>{self._TYPE_PATTERNS.get(type_str, r'[^/]+')})")
        return re.compile(f"^{''.join(out)}$"), type_map

    def register(self, method: str, pattern: str, handler: Callable) -> None:
        """Register ``handler`` for ``method`` (case-insensitive) and ``pattern``."""
        compiled, type_map = self._compile_pattern(pattern)
        self._routes.append((method.upper(), compiled, type_map, handler))

    def match(self, method: str, path: str) -> Tuple[Optional[Callable], Dict]:
        """Return the first route matching ``method`` and ``path``.

        The result is ``(handler, coerced_params)``, or ``(None, {})`` when
        no registered route matches.
        """
        wanted = method.upper()
        for route_method, regex, type_map, handler in self._routes:
            if route_method != wanted:
                continue
            m = regex.fullmatch(path)
            if m:
                try:
                    return handler, {k: type_map[k](v) for k, v in m.groupdict().items()}
                except (ValueError, KeyError):
                    # A failed coercion means this route does not apply.
                    continue
        return None, {}

router = Router()
def list_users(): pass
def get_user(id): pass
def create_user(): pass

router.register("GET", "/users", list_users)
router.register("GET", "/users/{id:int}", get_user)
router.register("POST", "/users", create_user)

for method, path in [("GET","/users"),("GET","/users/42"),("GET","/users/abc"),("POST","/users")]:
handler, params = router.match(method, path)
print(f"{method} {path} -> {handler.__name__ if handler else 'None'}, params={params}")

How Django/Flask routing works: Django converts URL patterns to regexes at startup. Flask uses Werkzeug's routing engine which does the same with variable converters (:int, :string, :path). FastAPI adds Pydantic validation on top of the same path parameter extraction concept. The underlying mechanism in all cases is: compile pattern -> regex, match incoming URL, extract and coerce named groups.

import re
from typing import Optional, Tuple, Dict, Callable, List

class Router:
    """URL router with path parameter extraction.

    Supports patterns like:
      /users                -> static
      /users/{id}           -> captures id as string
      /users/{id:int}       -> captures id as int
      /posts/{slug:str}     -> captures slug as string

    register(method, pattern, handler) adds a route.
    match(method, path) returns (handler, path_params) or (None, {}).

    A path segment that does not fit the declared type (for example
    "abc" for {id:int}) is not a match, so match() returns (None, {}).
    """
    pass
Expected Output
GET /users -> list_users, params={}\nGET /users/42 -> get_user, params={'id': 42}\nGET /users/abc -> None, params={}\nPOST /users -> create_user, params={}
Hints

Hint 1: Convert route patterns to regex: replace {name} with (?P<name>[^/]+) and {name:int} with (?P<name>[0-9]+). Then use re.fullmatch() to test each registered pattern.

Hint 2: Store routes as a list of (method, compiled_regex, type_map, handler) tuples. On match, extract named groups and coerce types according to type_map.

#4WSGI Application InterfaceMedium
wsgiapplication-interfacepep3333

Implement a WSGI request wrapper and response builder that follows the PEP 3333 WSGI interface.

from typing import Callable, List, Tuple
import io

class WSGIRequest:
    """Read-only convenience wrapper around a WSGI ``environ`` dict."""

    def __init__(self, environ: dict):
        self._env = environ
        # Cached request body; None means "not read yet".
        self._body = None

    @property
    def method(self) -> str:
        """HTTP method from ``REQUEST_METHOD`` (defaults to "GET")."""
        return self._env.get("REQUEST_METHOD", "GET")

    @property
    def path(self) -> str:
        """Request path from ``PATH_INFO`` (defaults to "/")."""
        return self._env.get("PATH_INFO", "/")

    @property
    def query_string(self) -> str:
        """Raw query string from ``QUERY_STRING`` (defaults to "")."""
        return self._env.get("QUERY_STRING", "")

    @property
    def body(self) -> bytes:
        """Request body bytes, limited to ``CONTENT_LENGTH``.

        ``wsgi.input`` is a stream, so it is read once and the result is
        cached; repeated access returns the same bytes. A missing or empty
        ``CONTENT_LENGTH`` yields ``b""`` without touching the stream.
        """
        if self._body is None:
            length = int(self._env.get("CONTENT_LENGTH", 0) or 0)
            if length > 0:
                self._body = self._env["wsgi.input"].read(length)
            else:
                self._body = b""
        return self._body

class WSGIResponse:
    """WSGI response builder.

    The text body is encoded as UTF-8. ``Content-Type`` defaults to
    ``text/plain; charset=utf-8`` and ``Content-Length`` is always set to
    the encoded body length. Calling the instance with ``start_response``
    sends the status line and headers and returns the body as a one-item list.
    """

    def __init__(self, status: int, body: str, headers: dict = None):
        self.status_code = status
        self.body = body.encode("utf-8")
        # Copy so the defaults added below never leak into the caller's dict.
        self.headers = dict(headers) if headers else {}
        self.headers.setdefault("Content-Type", "text/plain; charset=utf-8")
        self.headers["Content-Length"] = str(len(self.body))

    def _status_line(self) -> str:
        """Return ``"<code> <reason>"``; unrecognised codes use "Unknown"."""
        # Standard-library reason-phrase table; imported here so the block
        # does not depend on a new module-level import.
        from http.client import responses

        return f"{self.status_code} {responses.get(self.status_code, 'Unknown')}"

    def __call__(self, start_response: Callable) -> List[bytes]:
        """Invoke ``start_response(status, header_pairs)`` and return ``[body]``."""
        headers_list = list(self.headers.items())
        start_response(self._status_line(), headers_list)
        return [self.body]

# Test: simulate a WSGI call
def app(environ, start_response):
req = WSGIRequest(environ)
name = "world"
for part in req.query_string.split("&"):
if part.startswith("name="):
name = part[5:]
response = WSGIResponse(200, f"Hello, {name}!")
return response(start_response)

status_sent = []
headers_sent = []

def fake_start_response(status, headers):
status_sent.append(status)
headers_sent.extend(headers)

environ = {
"REQUEST_METHOD": "GET",
"PATH_INFO": "/hello",
"QUERY_STRING": "name=world",
"CONTENT_LENGTH": "",
"wsgi.input": io.BytesIO(b""),
}

req = WSGIRequest(environ)
print(f"Method: {req.method}")
print(f"Path: {req.path}")
print(f"Query: {req.query_string}")

body_chunks = app(environ, fake_start_response)
print(f"Status sent: {status_sent[0]}")
print(f"Body: {b''.join(body_chunks).decode()}")
print(f"Content-Length header set: {any(k == 'Content-Length' for k, v in headers_sent)}")
Solution
from typing import Callable, List
import io

class WSGIRequest:
    """Thin accessor layer over a WSGI ``environ`` mapping."""

    def __init__(self, environ: dict):
        self._env = environ
        # Body bytes once read; stays None until ``body`` is first accessed.
        self._body = None

    @property
    def method(self) -> str:
        """Value of ``REQUEST_METHOD``, "GET" when absent."""
        return self._env.get("REQUEST_METHOD", "GET")

    @property
    def path(self) -> str:
        """Value of ``PATH_INFO``, "/" when absent."""
        return self._env.get("PATH_INFO", "/")

    @property
    def query_string(self) -> str:
        """Value of ``QUERY_STRING``, "" when absent."""
        return self._env.get("QUERY_STRING", "")

    @property
    def body(self) -> bytes:
        """Return up to ``CONTENT_LENGTH`` bytes of the request body.

        The underlying ``wsgi.input`` stream is consumed only on the first
        access; the bytes are cached so later accesses return the same value
        instead of reading further into the stream.
        """
        if self._body is None:
            length = int(self._env.get("CONTENT_LENGTH", 0) or 0)
            self._body = self._env["wsgi.input"].read(length) if length > 0 else b""
        return self._body

class WSGIResponse:
    """Callable that emits a complete WSGI response.

    ``body`` is encoded as UTF-8. ``Content-Type`` is filled in when the
    caller did not supply one, and ``Content-Length`` is always derived
    from the encoded body.
    """

    def __init__(self, status: int, body: str, headers: dict = None):
        self.status_code = status
        self.body = body.encode("utf-8")
        # Work on a private copy: the caller's dict must not gain our defaults.
        self.headers = dict(headers) if headers else {}
        self.headers.setdefault("Content-Type", "text/plain; charset=utf-8")
        self.headers["Content-Length"] = str(len(self.body))

    def _status_line(self) -> str:
        """Return the status string for ``start_response``, e.g. "200 OK"."""
        # Local import keeps this block free of new module-level dependencies.
        from http.client import responses

        return f"{self.status_code} {responses.get(self.status_code, 'Unknown')}"

    def __call__(self, start_response: Callable) -> List[bytes]:
        """Send status and headers via ``start_response``; return ``[body]``."""
        start_response(self._status_line(), list(self.headers.items()))
        return [self.body]

def app(environ, start_response):
req = WSGIRequest(environ)
name = next((p[5:] for p in req.query_string.split("&") if p.startswith("name=")), "world")
return WSGIResponse(200, f"Hello, {name}!")(start_response)

status_sent = []
headers_sent = []

def fake_start_response(status, headers):
status_sent.append(status)
headers_sent.extend(headers)

environ = {"REQUEST_METHOD": "GET", "PATH_INFO": "/hello", "QUERY_STRING": "name=world",
"CONTENT_LENGTH": "", "wsgi.input": io.BytesIO(b"")}

req = WSGIRequest(environ)
print(f"Method: {req.method}")
print(f"Path: {req.path}")
print(f"Query: {req.query_string}")

body_chunks = app(environ, fake_start_response)
print(f"Status sent: {status_sent[0]}")
print(f"Body: {b''.join(body_chunks).decode()}")
print(f"Content-Length header set: {any(k=='Content-Length' for k,v in headers_sent)}")

WSGI's legacy: WSGI was first defined by PEP 333 in 2003; PEP 3333 (2010) updated it for Python 3. It is the synchronous predecessor to ASGI, which is maintained as a separate community specification rather than a PEP. The classic Python web frameworks (Django, Flask, Pyramid, Bottle) all speak WSGI. ASGI extends the idea to support async and WebSockets. Gunicorn and uWSGI are WSGI servers — they call your app(environ, start_response) function for each request.

from typing import Callable, Dict, List, Tuple, Any

class WSGIRequest:
    """Wrap a WSGI environ dict into a convenient request object."""
    def __init__(self, environ: dict):
        # Keep hold of environ; the properties below read their values from it.
        pass

    # HTTP method, taken from environ["REQUEST_METHOD"].
    @property
    def method(self) -> str: pass

    # Request path, taken from environ["PATH_INFO"].
    @property
    def path(self) -> str: pass

    # Raw query string, taken from environ["QUERY_STRING"].
    @property
    def query_string(self) -> str: pass

    # Request body: read content-length bytes from environ["wsgi.input"],
    # which is a file-like object.
    @property
    def body(self) -> bytes: pass

class WSGIResponse:
    """Build a WSGI-compatible response.

    Calling the instance must invoke start_response(status_string,
    headers_list), where headers_list is a list of (name, value) pairs,
    and then return [body_bytes].
    """
    def __init__(self, status: int, body: str, headers: dict = None):
        pass

    def __call__(self, start_response: Callable):
        pass
Expected Output
Method: GET\nPath: /hello\nQuery: name=world\nStatus sent: 200 OK\nBody: Hello, world!\nContent-Length header set: True
Hints

Hint 1: WSGI environ keys: REQUEST_METHOD, PATH_INFO, QUERY_STRING, wsgi.input (file-like body). Read body with environ["wsgi.input"].read(content_length).

Hint 2: WSGI response: call start_response(status_string, headers_list) where headers_list is [(name, value), ...]. Then return [body_bytes].

#5Response Content NegotiationMedium
content-negotiationaccept-headermime-types

Build a content negotiator that parses HTTP Accept headers and selects the appropriate response renderer.

import json
from typing import Dict, Callable, Optional, Any, Tuple, List

class ContentNegotiator:
    """Select a renderer for response data based on an HTTP Accept header."""

    def __init__(self):
        # mime type -> renderer callable, in registration order
        self._renderers: Dict[str, Callable] = {}

    def register_renderer(self, mime_type: str, renderer: Callable) -> None:
        """Register ``renderer`` (``data -> str``) for ``mime_type``."""
        self._renderers[mime_type] = renderer

    def _parse_accept(self, accept_header: str) -> List[Tuple[str, float]]:
        """Parse an Accept header into ``(mime, q)`` pairs, highest q first.

        Handles optional whitespace around ``;`` and ``=``, a ``q`` key in
        either case, and other media-type parameters before ``q`` (they
        are ignored). A missing or malformed q counts as 1.0. Entries with
        ``q=0`` are dropped because that marks the type as not acceptable.
        The sort is stable, so equal-q entries keep header order.
        """
        types = []
        for part in accept_header.split(","):
            mime, *params = (piece.strip() for piece in part.split(";"))
            if not mime:
                continue
            q = 1.0
            for param in params:
                key, _, raw_q = param.partition("=")
                if key.strip().lower() == "q":
                    try:
                        q = float(raw_q)
                    except ValueError:
                        q = 1.0
            if q > 0:
                types.append((mime, q))
        return sorted(types, key=lambda x: x[1], reverse=True)

    def _pick(self, mime: str):
        """Return the registered mime type satisfying ``mime``, or None.

        Exact matches win. ``*/*`` selects the first registered renderer
        and ``type/*`` selects the first registered renderer of that type.
        """
        if mime in self._renderers:
            return mime
        if mime == "*/*":
            return next(iter(self._renderers), None)
        if mime.endswith("/*"):
            prefix = mime[:-1]  # keep the slash, e.g. "text/"
            return next((m for m in self._renderers if m.startswith(prefix)), None)
        return None

    def negotiate(self, accept_header: str, data: Any) -> Tuple[str, str]:
        """Return ``(mime_type, rendered_body)`` for the best acceptable type.

        Raises:
            ValueError: with message "406 Not Acceptable" when no registered
                renderer satisfies the header.
        """
        for mime, _ in self._parse_accept(accept_header):
            chosen = self._pick(mime)
            if chosen is not None:
                return chosen, self._renderers[chosen](data)
        # Message kept identical to the exercise's expected output.
        raise ValueError("406 Not Acceptable")

# Register renderers
negotiator = ContentNegotiator()
negotiator.register_renderer(
"application/json",
lambda d: json.dumps(d),
)
negotiator.register_renderer(
"text/html",
lambda d: f"<p>{', '.join(f'{k.capitalize()}: {v}' for k,v in d.items())}</p>",
)
negotiator.register_renderer(
"text/plain",
lambda d: ", ".join(f"{k.capitalize()}: {v}" for k, v in d.items()),
)

data = {"name": "Alice", "age": 30}

for accept in [
"application/json",
"text/html",
"text/plain",
"application/xml",
]:
try:
mime, body = negotiator.negotiate(accept, data)
print(f"{mime} -> {body}")
except ValueError as e:
print(f"Unknown type -> {e}")
Solution
import json
from typing import Dict, Callable, Any, Tuple, List

class ContentNegotiator:
    """Accept-header driven renderer selection."""

    def __init__(self):
        # Registered renderers keyed by mime type (insertion-ordered).
        self._renderers: Dict[str, Callable] = {}

    def register_renderer(self, mime_type: str, renderer: Callable) -> None:
        """Associate ``renderer`` with ``mime_type``."""
        self._renderers[mime_type] = renderer

    def _parse_accept(self, header: str) -> List[Tuple[str, float]]:
        """Return ``(mime, q)`` pairs from ``header`` sorted by descending q.

        Each ``;``-separated parameter is inspected individually, so a
        parameter before ``q`` (e.g. ``charset``) no longer hides the q
        value. A missing or unparsable q counts as 1.0. Entries with
        ``q=0`` are omitted, since that marks the type as not acceptable.
        """
        types = []
        for part in header.split(","):
            mime, *params = (piece.strip() for piece in part.split(";"))
            if not mime:
                continue
            q = 1.0
            for param in params:
                key, _, raw_q = param.partition("=")
                if key.strip().lower() != "q":
                    continue
                try:
                    q = float(raw_q)
                except ValueError:
                    q = 1.0
            if q > 0:
                types.append((mime, q))
        return sorted(types, key=lambda x: x[1], reverse=True)

    def _pick(self, mime: str):
        """Map an accepted range to a registered mime type, or None.

        Supports exact types, ``*/*`` (first registered renderer) and
        ``type/*`` (first registered renderer with that main type).
        """
        if mime in self._renderers:
            return mime
        if mime == "*/*":
            return next(iter(self._renderers), None)
        if mime.endswith("/*"):
            prefix = mime[:-1]
            return next((m for m in self._renderers if m.startswith(prefix)), None)
        return None

    def negotiate(self, accept_header: str, data: Any) -> Tuple[str, str]:
        """Render ``data`` with the best renderer the client accepts.

        Returns ``(mime_type, body)``. Raises ``ValueError("406 Not
        Acceptable")`` when nothing registered satisfies the header.
        """
        for mime, _ in self._parse_accept(accept_header):
            chosen = self._pick(mime)
            if chosen is not None:
                return chosen, self._renderers[chosen](data)
        raise ValueError("406 Not Acceptable")

negotiator = ContentNegotiator()
negotiator.register_renderer("application/json", json.dumps)
negotiator.register_renderer("text/html", lambda d: f"<p>{', '.join(f'{k.capitalize()}: {v}' for k,v in d.items())}</p>")
negotiator.register_renderer("text/plain", lambda d: ", ".join(f"{k.capitalize()}: {v}" for k,v in d.items()))

data = {"name": "Alice", "age": 30}
for accept in ["application/json", "text/html", "text/plain", "application/xml"]:
try:
mime, body = negotiator.negotiate(accept, data)
print(f"{mime} -> {body}")
except ValueError as e:
print(f"Unknown type -> {e}")

REST API best practice: A well-designed REST API supports content negotiation so the same endpoint can serve application/json to API clients and text/html to browsers. Django REST Framework uses a renderer_classes list on each view; FastAPI uses response_class. The Accept header lets clients declare their preference — q=0.9 means "I accept this but prefer something with higher q".

from typing import Dict, Callable, Optional, Any

class ContentNegotiator:
    """HTTP content negotiation based on Accept header.

    Renders the same data in different formats (JSON, HTML, plain text)
    depending on what the client advertises in the Accept header.

    register_renderer(mime_type, renderer_fn) adds a renderer.
    negotiate(accept_header, data) returns (mime_type, body), or raises
    ValueError("406 Not Acceptable") when no registered renderer matches.
    """
    pass
Expected Output
application/json -> {"name": "Alice", "age": 30}
text/html -> <p>Name: Alice, Age: 30</p>
text/plain -> Name: Alice, Age: 30
Unknown type -> 406 Not Acceptable
Hints

Hint 1: Parse the Accept header into a list of (mime_type, quality) tuples sorted by quality (q value). The format is: text/html,application/json;q=0.9,*/*;q=0.8

Hint 2: Match each accepted mime type (in quality order) against registered renderers. If no match and */* is accepted, use the first registered renderer as fallback.

#6View Decorator SystemMedium
decoratorsview-layerauthorizationvalidation

Build a decorator system for view functions that handles authentication checks, method restrictions, and request body validation.

from typing import Callable, Dict, Any
from functools import wraps

def require_auth(fn: Callable) -> Callable:
    """Wrap a view so it only runs when the request carries a truthy "user".

    Requests without one get a 401 response dict and the view is not called.
    """
    @wraps(fn)
    def guarded(request: dict) -> dict:
        if request.get("user"):
            return fn(request)
        return {"status": 401, "body": "Authentication required"}

    return guarded

def require_methods(*methods: str):
    """Build a decorator that restricts a view to the given HTTP methods.

    Method names are compared case-insensitively. A request without a
    "method" key is treated as GET. Disallowed methods get a 405 response
    dict and the view is not called.
    """
    permitted = tuple(name.upper() for name in methods)

    def decorator(fn: Callable) -> Callable:
        @wraps(fn)
        def checked(request: dict) -> dict:
            verb = request.get("method", "GET").upper()
            if verb in permitted:
                return fn(request)
            return {"status": 405, "body": "Method not allowed"}

        return checked

    return decorator

def validate_json_body(schema: Dict[str, type]):
    """Build a decorator that validates ``request["body"]`` against ``schema``.

    ``schema`` maps each required field name to the type its value must be
    an instance of. Fields are checked in schema order and the first
    failure produces a 400 response dict without calling the view. A
    missing "body" key is treated as an empty object.
    """
    def decorator(fn: Callable) -> Callable:
        @wraps(fn)
        def wrapper(request: dict) -> dict:
            body = request.get("body", {})
            if not isinstance(body, dict):
                # A None, string or list body cannot hold named fields; without
                # this guard the checks below would raise or do substring tests.
                return {"status": 400, "body": "Invalid request body"}
            for field_name, field_type in schema.items():
                if field_name not in body:
                    return {"status": 400, "body": f"Missing required field: {field_name}"}
                if not isinstance(body[field_name], field_type):
                    return {"status": 400, "body": f"Invalid type for field: {field_name}"}
            return fn(request)
        return wrapper
    return decorator

# Test views
@require_auth
def hello_view(request):
return {"status": 200, "body": f"hello {request['user']}"}

@require_auth
@require_methods("POST")
@validate_json_body({"name": str, "email": str})
def create_user_view(request):
name = request["body"]["name"]
return {"status": 200, "body": f"Created user {name}"}

# Test cases
auth_req = {"method": "GET", "user": "Alice"}
print(f"Authenticated GET: {hello_view(auth_req)['status']} {hello_view(auth_req)['body']}")
print(f"Unauthenticated: {hello_view({'method': 'GET'})['status']} {hello_view({'method': 'GET'})['body']}")

wrong_method = {"method": "GET", "user": "Alice", "body": {"name": "Alice", "email": "[email protected]"}}
print(f"Wrong method: {create_user_view(wrong_method)['status']} {create_user_view(wrong_method)['body']}")

missing_field = {"method": "POST", "user": "Alice", "body": {"name": "Alice"}}
print(f"Missing field: {create_user_view(missing_field)['status']} {create_user_view(missing_field)['body']}")

valid_post = {"method": "POST", "user": "Alice", "body": {"name": "Alice", "email": "[email protected]"}}
print(f"Valid POST: {create_user_view(valid_post)['status']} {create_user_view(valid_post)['body']}")
Solution
from typing import Callable, Dict
from functools import wraps

def require_auth(fn: Callable) -> Callable:
    """Reject unauthenticated requests before they reach ``fn``.

    A request counts as authenticated when ``request.get("user")`` is
    truthy; otherwise a 401 response dict is returned.
    """
    @wraps(fn)
    def authenticated_only(request: dict) -> dict:
        user = request.get("user")
        if not user:
            response = {"status": 401, "body": "Authentication required"}
        else:
            response = fn(request)
        return response

    return authenticated_only

def require_methods(*methods: str):
    """Decorator factory limiting a view to ``methods`` (case-insensitive).

    Requests lacking a "method" key default to GET. Any other method
    yields a 405 response dict instead of calling the view.
    """
    allowed = []
    for m in methods:
        allowed.append(m.upper())

    def decorator(fn: Callable) -> Callable:
        @wraps(fn)
        def method_gate(request: dict) -> dict:
            requested = request.get("method", "GET").upper()
            rejected = requested not in allowed
            return {"status": 405, "body": "Method not allowed"} if rejected else fn(request)

        return method_gate

    return decorator

def validate_json_body(schema: Dict[str, type]):
    """Decorator factory enforcing required fields and types on the request body.

    Each key of ``schema`` must be present in ``request["body"]`` and its
    value must be an instance of the mapped type. The first violation
    returns a 400 response dict and the view is skipped. An absent "body"
    is treated as an empty object.
    """
    def decorator(fn: Callable) -> Callable:
        @wraps(fn)
        def wrapper(request: dict) -> dict:
            body = request.get("body", {})
            if not isinstance(body, dict):
                # Bodies that are not objects (None, strings, lists) used to
                # crash or be substring-matched; report them as a bad request.
                return {"status": 400, "body": "Invalid request body"}
            for name, typ in schema.items():
                if name not in body:
                    return {"status": 400, "body": f"Missing required field: {name}"}
                if not isinstance(body[name], typ):
                    return {"status": 400, "body": f"Invalid type for field: {name}"}
            return fn(request)
        return wrapper
    return decorator

@require_auth
def hello_view(request):
return {"status": 200, "body": f"hello {request['user']}"}

@require_auth
@require_methods("POST")
@validate_json_body({"name": str, "email": str})
def create_user_view(request):
return {"status": 200, "body": f"Created user {request['body']['name']}"}

print(f"Authenticated GET: {hello_view({'method':'GET','user':'Alice'})['status']} {hello_view({'method':'GET','user':'Alice'})['body']}")
print(f"Unauthenticated: {hello_view({'method':'GET'})['status']} {hello_view({'method':'GET'})['body']}")
print(f"Wrong method: {create_user_view({'method':'GET','user':'Alice','body':{'name':'A','email':'[email protected]'}})['status']} {create_user_view({'method':'GET','user':'Alice','body':{'name':'A','email':'[email protected]'}})['body']}")
print(f"Missing field: {create_user_view({'method':'POST','user':'Alice','body':{'name':'Alice'}})['status']} {create_user_view({'method':'POST','user':'Alice','body':{'name':'Alice'}})['body']}")
print(f"Valid POST: {create_user_view({'method':'POST','user':'Alice','body':{'name':'Alice','email':'[email protected]'}})['status']} {create_user_view({'method':'POST','user':'Alice','body':{'name':'Alice','email':'[email protected]'}})['body']}")

Decorator order matters: Decorators are applied bottom-up, so @require_auth wraps last (outermost). This means authentication runs first (outermost wrapper executes first on the request path). If auth fails, require_methods and validate_json_body are never reached. Always put authentication decorators outermost (topmost in source) — check identity before checking anything else.

from typing import Callable, Any, Dict
from functools import wraps

def require_auth(fn: Callable) -> Callable:
    """Decorator: reject request if 'user' not in request context.

    A rejected request gets an error response dict (status 401) and the
    wrapped handler is not called; otherwise the handler's result is returned.
    """
    pass

def require_methods(*methods: str):
    """Decorator factory: reject request if method not in allowed list.

    Takes the allowed methods as positional arguments and returns a
    decorator. The decorated handler checks request["method"] and answers
    with an error response dict (status 405) when it is not allowed.
    """
    pass

def validate_json_body(schema: Dict[str, type]):
    """Decorator factory: validate required JSON fields and types.

    schema maps each required field name to the type its value must have.
    A missing field or a wrongly typed value produces an error response
    dict (status 400) without calling the wrapped handler.
    """
    pass
Expected Output
Authenticated GET: 200 hello Alice\nUnauthenticated: 401 Authentication required\nWrong method: 405 Method not allowed\nMissing field: 400 Missing required field: email\nValid POST: 200 Created user Alice
Hints

Hint 1: Each decorator wraps the handler function. Check the condition in the wrapper; if it fails, return an error response dict directly without calling the original handler.

Hint 2: For require_methods, use a decorator factory that takes *methods as arguments, returns a decorator that checks request["method"] against the allowed list.


Hard

#7Minimal ASGI ApplicationHard
asgiasyncscopereceive-send

Implement a minimal ASGI application that correctly handles the ASGI HTTP connection lifecycle (receive body, send start, send body).

import asyncio
from typing import Callable, Awaitable

async def asgi_app(scope, receive, send):
    """Minimal ASGI HTTP application.

    Answers "/" with 200 "Hello from ASGI!" and every other path with
    404 "Not Found", both as UTF-8 plain text. Scopes whose type is not
    "http" are ignored.

    Args:
        scope: connection info dict; "type" is required, "path" defaults to "/".
        receive: async callable returning the next incoming event.
        send: async callable accepting outgoing response events.
    """
    if scope["type"] != "http":
        return

    # Drain the request body. The content is not needed for routing, so the
    # chunks are consumed without being stored.
    while True:
        event = await receive()
        if event.get("type") == "http.disconnect":
            # The client went away before the request finished, so there is
            # nobody to answer.
            return
        if not event.get("more_body", False):
            break

    path = scope.get("path", "/")

    if path == "/":
        response_body = b"Hello from ASGI!"
        status = 200
    else:
        response_body = b"Not Found"
        status = 404

    await send({
        "type": "http.response.start",
        "status": status,
        # ASGI headers are [name, value] pairs of bytes, not a dict.
        "headers": [
            [b"content-type", b"text/plain; charset=utf-8"],
            [b"content-length", str(len(response_body)).encode()],
        ],
    })
    await send({
        "type": "http.response.body",
        "body": response_body,
        "more_body": False,
    })

# Test with a mock ASGI transport
async def test_asgi_app():
scope = {
"type": "http",
"method": "GET",
"path": "/",
"query_string": b"",
"headers": [],
}

async def receive():
return {"type": "http.request", "body": b"", "more_body": False}

sent_events = []
async def send(event):
sent_events.append(event)

await asgi_app(scope, receive, send)

start_event = next(e for e in sent_events if e["type"] == "http.response.start")
body_event = next(e for e in sent_events if e["type"] == "http.response.body")

headers_dict = {k.decode(): v.decode() for k, v in start_event["headers"]}

print(f"Status: {start_event['status']}")
print(f"Body: {body_event['body'].decode()}")
print(f"Headers include Content-Type: {'content-type' in headers_dict}")

asyncio.run(test_asgi_app())
Solution
import asyncio

async def asgi_app(scope, receive, send):
    """Handle one ASGI HTTP request: drain the body, then send start + body.

    "/" yields 200 "Hello from ASGI!"; any other path yields 404
    "Not Found". Non-"http" scopes return immediately without sending.
    """
    if scope["type"] != "http":
        return

    # Consume request events until the final body chunk arrives.
    more_body = True
    while more_body:
        event = await receive()
        if event.get("type") == "http.disconnect":
            # Connection closed mid-request: stop without sending a response.
            return
        more_body = event.get("more_body", False)

    found = scope.get("path", "/") == "/"
    response_body = b"Hello from ASGI!" if found else b"Not Found"
    status = 200 if found else 404

    await send({
        "type": "http.response.start",
        "status": status,
        # Header names and values must both be bytes.
        "headers": [
            [b"content-type", b"text/plain; charset=utf-8"],
            [b"content-length", str(len(response_body)).encode()],
        ],
    })
    await send({"type": "http.response.body", "body": response_body, "more_body": False})

async def test_asgi_app():
scope = {"type": "http", "method": "GET", "path": "/", "query_string": b"", "headers": []}

async def receive():
return {"type": "http.request", "body": b"", "more_body": False}

sent_events = []

async def send(event):
sent_events.append(event)

await asgi_app(scope, receive, send)
start = next(e for e in sent_events if e["type"] == "http.response.start")
body = next(e for e in sent_events if e["type"] == "http.response.body")
headers = {k.decode(): v.decode() for k, v in start["headers"]}
print(f"Status: {start['status']}")
print(f"Body: {body['body'].decode()}")
print(f"Headers include Content-Type: {'content-type' in headers}")

asyncio.run(test_asgi_app())

ASGI vs WSGI: WSGI is synchronous (callable(environ, start_response)). ASGI is async (async callable(scope, receive, send)). The three-argument signature (scope, receive, send) enables WebSockets and other long-lived connections, which do not fit WSGI's synchronous request-response model; long-polling is possible under WSGI but ties up a worker for each waiting client. Uvicorn and Hypercorn are ASGI servers; Starlette and FastAPI are built on ASGI.

import asyncio
from typing import Callable, Awaitable

async def asgi_app(
    scope: dict,
    receive: Callable[[], Awaitable[dict]],
    send: Callable[[dict], Awaitable[None]],
) -> None:
    """Minimal ASGI application that handles HTTP requests.

    scope: connection info dict with 'type', 'method', 'path', 'query_string'
    receive: async callable that returns the next event (e.g., request body)
    send: async callable to send response events

    Must send: http.response.start (status + headers), then http.response.body

    Headers are a list of [name, value] pairs in which both items are
    bytes, not a dict of strings.
    """
    pass
Expected Output
Status: 200\nBody: Hello from ASGI!\nHeaders include Content-Type: True
Hints

Hint 1: ASGI HTTP flow: (1) call receive() to get {"type": "http.request", "body": bytes, "more_body": bool}; (2) call send() with {"type": "http.response.start", "status": 200, "headers": [...]}; (3) call send() with {"type": "http.response.body", "body": bytes}.

Hint 2: Headers in ASGI are a list of [name_bytes, value_bytes] pairs, not a dict. Both name and value must be bytes, not strings.

#8Template Engine with Context RenderingHard
template-enginerenderingjinja-internals

Build a minimal template engine that supports variable substitution, conditionals, for-loops, and simple filters.

import re
from typing import Dict, Any

class TemplateEngine:
    """Minimal regex-based template engine.

    Supported syntax:
      {{ variable }}                         - variable substitution
      {{ variable | filter }}                - filters: upper, lower, len, default:X
      {% if name %}...{% endif %}            - body kept when context[name] is truthy
      {% for item in name %}...{% endfor %}  - body rendered once per element

    Block patterns are non-greedy, so a block cannot contain another block of
    the same kind.
    """

    # Filters receive the raw context value rather than its string form, so
    # ``len`` counts the elements of a list instead of the characters of its
    # repr. Values without a length (e.g. ints) fall back to their text length.
    _FILTERS = {
        "upper": lambda value: str(value).upper(),
        "lower": lambda value: str(value).lower(),
        "len": lambda value: str(
            len(value) if hasattr(value, "__len__") else len(str(value))
        ),
    }

    # Compiled once at class creation instead of on every render() call.
    _FOR_RE = re.compile(
        r"\{%\s*for\s+(\w+)\s+in\s+(\w+)\s*%\}(.*?)\{%\s*endfor\s*%\}",
        re.DOTALL,
    )
    _IF_RE = re.compile(
        r"\{%\s*if\s+(\w+)\s*%\}(.*?)\{%\s*endif\s*%\}",
        re.DOTALL,
    )
    _VAR_RE = re.compile(r"\{\{(.+?)\}\}")

    def _resolve(self, expr: str, context: Dict[str, Any]) -> str:
        """Evaluate the inside of a ``{{ ... }}`` tag and return its text.

        Args:
            expr: Tag contents, either ``name`` or ``name | filter``.
            context: Mapping of variable names to values.

        Returns:
            The rendered string. Unknown variables render as "" and unknown
            filters leave the value unfiltered.
        """
        expr = expr.strip()
        name, pipe, filter_spec = expr.partition("|")
        if not pipe:
            return str(context.get(expr, ""))

        value = context.get(name.strip(), "")
        filter_spec = filter_spec.strip()
        if filter_spec.startswith("default:"):
            # Any falsy value (missing, "", 0, empty list) selects the default.
            return str(value) if value else filter_spec[len("default:"):]
        fn = self._FILTERS.get(filter_spec)
        return fn(value) if fn else str(value)

    def render(self, template: str, context: Dict[str, Any]) -> str:
        """Render *template* against *context* and return the result.

        The template is processed in three passes: for-loops, then if-blocks,
        then variable substitutions.

        NOTE: text produced by a for-loop is scanned again by the later
        if/variable passes, so loop values that themselves contain template
        syntax will be interpreted.
        """
        def expand_for(match):
            item_name, list_name, body = match.groups()
            # Each iteration sees the outer context plus the loop variable.
            return "".join(
                self.render(body, {**context, item_name: item})
                for item in context.get(list_name, [])
            )

        def expand_if(match):
            condition, body = match.groups()
            return body if context.get(condition) else ""

        template = self._FOR_RE.sub(expand_for, template)
        template = self._IF_RE.sub(expand_if, template)
        return self._VAR_RE.sub(
            lambda m: self._resolve(m.group(1), context), template
        )

# Test
engine = TemplateEngine()

t1 = "Hello, {{ name }}!"
print(engine.render(t1, {"name": "Alice"}))

# The engine has no "last iteration" flag, so every item is rendered with a
# trailing separator and the final separator is trimmed after rendering.
t2 = "{{ count }} items: {% for item in fruits %}{{ item }}, {% endfor %}"
fruits = ["apple", "banana", "cherry"]
out = engine.render(t2, {"count": len(fruits), "fruits": fruits})
print(out[:-2] if out.endswith(", ") else out)

t3 = "UPPERCASE: {{ greeting | upper }}"
print(engine.render(t3, {"greeting": "hello"}))

t4 = "Default: {{ username | default:unknown }}"
print(engine.render(t4, {}))
Solution
import re
from typing import Dict, Any

class TemplateEngine:
    """Minimal regex-based template engine (reference solution).

    Supports ``{{ var }}``, ``{{ var | filter }}`` (upper, lower, len,
    default:X), ``{% if name %}...{% endif %}`` and
    ``{% for item in name %}...{% endfor %}``. Block patterns are non-greedy,
    so a block cannot contain another block of the same kind.
    """

    # Filters are given the raw context value, not its string form; otherwise
    # ``len`` would measure the repr of a list instead of counting its items.
    # Values without a length (e.g. ints) fall back to their text length.
    _FILTERS = {
        "upper": lambda value: str(value).upper(),
        "lower": lambda value: str(value).lower(),
        "len": lambda value: str(
            len(value) if hasattr(value, "__len__") else len(str(value))
        ),
    }

    # Patterns are compiled once rather than on every render() call.
    _FOR_RE = re.compile(r"\{%\s*for\s+(\w+)\s+in\s+(\w+)\s*%\}(.*?)\{%\s*endfor\s*%\}", re.DOTALL)
    _IF_RE = re.compile(r"\{%\s*if\s+(\w+)\s*%\}(.*?)\{%\s*endif\s*%\}", re.DOTALL)
    _VAR_RE = re.compile(r"\{\{(.+?)\}\}")

    def _resolve(self, expr: str, context: Dict[str, Any]) -> str:
        """Return the text for the contents of one ``{{ ... }}`` tag.

        Unknown variables render as "" and unknown filters leave the value
        unfiltered. ``default:X`` yields X whenever the value is falsy.
        """
        expr = expr.strip()
        if "|" not in expr:
            return str(context.get(expr, ""))
        var, filt = [s.strip() for s in expr.split("|", 1)]
        value = context.get(var, "")
        if filt.startswith("default:"):
            return str(value) if value else filt[len("default:"):]
        fn = self._FILTERS.get(filt)
        return fn(value) if fn else str(value)

    def render(self, template: str, context: Dict[str, Any]) -> str:
        """Render *template* in three passes: for-loops, if-blocks, variables.

        NOTE: for-loop output is scanned again by the if/variable passes, so
        loop values containing template syntax will be interpreted.
        """
        def expand_for(m):
            item_name, list_name, body = m.groups()
            # Each iteration sees the outer context plus the loop variable.
            return "".join(
                self.render(body, {**context, item_name: item})
                for item in context.get(list_name, [])
            )

        template = self._FOR_RE.sub(expand_for, template)
        template = self._IF_RE.sub(
            lambda m: m.group(2) if context.get(m.group(1)) else "", template
        )
        return self._VAR_RE.sub(
            lambda m: self._resolve(m.group(1), context), template
        )

# Demo run covering substitution, a for-loop, a filter and a default value.
engine = TemplateEngine()
print(engine.render("Hello, {{ name }}!", {"name": "Alice"}))

fruits = ["apple", "banana", "cherry"]
listing = engine.render(
    "{{ count }} items: {% for item in fruits %}{{ item }}, {% endfor %}",
    {"count": 3, "fruits": fruits},
)
# Every loop iteration emits a trailing ", "; trim it from the end.
print(listing.rstrip(", "))

print(engine.render("UPPERCASE: {{ greeting | upper }}", {"greeting": "hello"}))
print(engine.render("Default: {{ username | default:unknown }}", {}))

Jinja2 internals: Jinja2 parses templates into an AST, then compiles the AST to Python bytecode for fast repeated rendering. The parse -> compile -> render separation is what makes Jinja2 fast in production — the compilation cost is paid once, then the compiled template is cached and re-executed for each request. This re.sub-based approach re-processes the template every render call — fine for learning, not for production.

import re
from typing import Dict, Any

class TemplateEngine:
    """Minimal template engine (exercise stub; render is left to implement).

    Supports:
      {{ variable }}                         - variable substitution
      {% if condition %}...{% endif %}       - conditionals (variable truthiness)
      {% for item in list %}...{% endfor %}  - iteration
      {{ variable | filter }}                - filters: upper, lower, len, default:X

    No nested blocks required.
    """
    def render(self, template: str, context: Dict[str, Any]) -> str:
        """Return *template* with its tags expanded using values from *context*.

        Suggested approach: process the template in passes - expand for-loops
        first, then if-blocks, then variable substitutions.
        """
        pass
Expected Output
Hello, Alice!\n3 items: apple, banana, cherry\nUPPERCASE: HELLO\nDefault: unknown
Hints

Hint 1: Process the template in passes: first resolve for-loops (expand them), then if-blocks, then variable substitutions. Use regex to find each construct.

Hint 2: For for-loops: find the for..endfor block, evaluate the iterable from context, repeat the inner template for each item (with item variable bound), join results, replace the block.

#9Session Store with HMAC SigningHard
sessionshmaccookiessecurity

Build a cookie-based session store that signs session data with HMAC-SHA256 and enforces TTL expiry.

import hmac
import hashlib
import json
import base64
import time
from typing import Dict, Any, Optional

def b64url(data: bytes) -> str:
    """Encode *data* as URL-safe base64 text with the '=' padding removed."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def b64url_decode(s: str) -> bytes:
    """Decode URL-safe base64 text, restoring any stripped '=' padding."""
    return base64.urlsafe_b64decode(s + "=" * (-len(s) % 4))


class SessionStore:
    """Cookie-based session store signed with HMAC-SHA256.

    Cookie format: ``base64url(json({"data": ..., "exp": ...})) + "." +
    base64url(signature)``, where the signature covers the encoded payload.
    """

    def __init__(self, ttl: int = 3600):
        """Store the session lifetime *ttl*, in seconds."""
        self.ttl = ttl

    @staticmethod
    def _sign(payload_b64: str, secret: str) -> str:
        """Return the base64url HMAC-SHA256 signature of *payload_b64*."""
        digest = hmac.new(secret.encode(), payload_b64.encode(), hashlib.sha256).digest()
        return b64url(digest)

    def create_session(self, data: Dict[str, Any], secret: str) -> str:
        """Serialize *data* with an expiry timestamp and return the signed cookie."""
        payload = json.dumps({"data": data, "exp": int(time.time()) + self.ttl}).encode()
        payload_b64 = b64url(payload)
        return f"{payload_b64}.{self._sign(payload_b64, secret)}"

    def load_session(self, cookie_value: str, secret: str) -> Optional[Dict[str, Any]]:
        """Verify *cookie_value* and return its data, or None if it is invalid.

        None is returned for a malformed cookie, a bad signature, or an
        expired session; this method does not raise on bad input.
        """
        try:
            # Unpacking raises ValueError unless there are exactly two parts.
            payload_b64, sig_b64 = cookie_value.split(".")
            expected_sig = self._sign(payload_b64, secret)
            # Compare the *encoded* signatures: base64 decoding is lenient
            # (extra padding and unused trailing bits are ignored), so
            # comparing decoded bytes would accept altered cookie strings.
            if not hmac.compare_digest(sig_b64.encode(), expected_sig.encode()):
                return None
            payload = json.loads(b64url_decode(payload_b64))
            if payload.get("exp", 0) < time.time():
                return None
            return payload.get("data")
        except (ValueError, TypeError, AttributeError):
            # Covers bad base64/JSON/encoding, a non-dict payload, a
            # non-numeric "exp", and non-string arguments.
            return None

# Test
store = SessionStore(ttl=3600)
secret = "my-app-secret"

# Round trip: a freshly signed cookie loads back to the original data.
session_data = {"user_id": 42, "role": "admin"}
cookie = store.create_session(session_data, secret)
print(f"Cookie created: {cookie[:20]}...")
print(f"Session loaded: {store.load_session(cookie, secret)}")

# Overwriting the tail of the signature must invalidate the cookie.
tampered = cookie[:-5] + "XXXXX"
print(f"Tampered cookie: {store.load_session(tampered, secret)}")

# A negative TTL produces a cookie that is already expired when created.
expired_cookie = SessionStore(ttl=-1).create_session({"user_id": 1}, secret)
print(f"Expired cookie: {store.load_session(expired_cookie, secret)}")
Solution
import hmac
import hashlib
import json
import base64
import time
from typing import Dict, Any, Optional

def b64url(data: bytes) -> str:
    """Encode *data* as unpadded URL-safe base64 text."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def b64url_decode(s: str) -> bytes:
    """Decode unpadded URL-safe base64 text by re-adding the '=' padding."""
    return base64.urlsafe_b64decode(s + "=" * (-len(s) % 4))


class SessionStore:
    """Signed-cookie session store (reference solution).

    A cookie is ``<payload>.<signature>``: the payload is base64url-encoded
    JSON holding "data" and an "exp" timestamp, and the signature is the
    base64url HMAC-SHA256 of the encoded payload.
    """

    def __init__(self, ttl: int = 3600):
        """Store the session lifetime *ttl*, in seconds."""
        self.ttl = ttl

    @staticmethod
    def _sign(p64: str, secret: str) -> str:
        """Return the base64url HMAC-SHA256 signature of the encoded payload."""
        return b64url(hmac.new(secret.encode(), p64.encode(), hashlib.sha256).digest())

    def create_session(self, data: Dict[str, Any], secret: str) -> str:
        """Return a signed cookie carrying *data* and its expiry time."""
        payload = json.dumps({"data": data, "exp": int(time.time()) + self.ttl}).encode()
        p64 = b64url(payload)
        return f"{p64}.{self._sign(p64, secret)}"

    def load_session(self, cookie_value: str, secret: str) -> Optional[Dict[str, Any]]:
        """Return the session data, or None if the cookie is malformed,
        tampered with, or expired. Does not raise on bad input."""
        try:
            # rsplit raises ValueError on unpacking when there is no "." at all.
            p64, s64 = cookie_value.rsplit(".", 1)
            # Compare encoded signatures: base64 decoding ignores extra
            # padding and unused trailing bits, so comparing decoded bytes
            # would accept altered cookie strings.
            if not hmac.compare_digest(s64.encode(), self._sign(p64, secret).encode()):
                return None
            payload = json.loads(b64url_decode(p64))
            if payload.get("exp", 0) < time.time():
                return None
            return payload.get("data")
        except (ValueError, TypeError, AttributeError):
            # Covers bad base64/JSON/encoding, a non-dict payload, a
            # non-numeric "exp", and non-string arguments.
            return None

# Demo run: valid, tampered and expired cookies.
store = SessionStore(ttl=3600)
secret = "my-app-secret"

cookie = store.create_session({"user_id": 42, "role": "admin"}, secret)
print(f"Cookie created: {cookie[:20]}...")
print(f"Session loaded: {store.load_session(cookie, secret)}")

# Replace the last five signature characters to simulate tampering.
tampered = cookie[:-5] + "XXXXX"
print(f"Tampered cookie: {store.load_session(tampered, secret)}")

# ttl=-1 places the expiry timestamp in the past.
short_lived_store = SessionStore(ttl=-1)
expired = short_lived_store.create_session({"user_id": 1}, secret)
print(f"Expired cookie: {store.load_session(expired, secret)}")

Django's approach: Django's signed cookie sessions use exactly this pattern (django.core.signing). The session data is serialized, base64url-encoded, and HMAC-SHA256 signed. Django adds a TimestampSigner that encodes the creation time for TTL expiry. No server-side session store is needed — the data travels with the cookie, reducing database load at the cost of cookie size.

import hmac
import hashlib
import json
import base64
import time
from typing import Dict, Any, Optional

class SessionStore:
    """Cookie-based signed session store (exercise stub).

    - create_session(data, secret) -> signed cookie value
    - load_session(cookie_value, secret) -> data dict or None
    - Sessions expire after ttl seconds
    - HMAC-SHA256 signed to prevent tampering

    Cookie format: base64url(json({"data": data, "exp": expiry_ts})) + "." +
    base64url(hmac_signature).
    """
    def __init__(self, ttl: int = 3600):
        """Configure the session lifetime *ttl*, in seconds."""
        pass

    def create_session(self, data: Dict[str, Any], secret: str) -> str:
        """Return a signed cookie value carrying *data* and its expiry time."""
        pass

    def load_session(self, cookie_value: str, secret: str) -> Optional[Dict[str, Any]]:
        """Return the session data, or None for an invalid or expired cookie.

        Verify the signature first (with hmac.compare_digest, never ==),
        then check the expiry.
        """
        pass
Expected Output
Cookie created: eyJ...\nSession loaded: {'user_id': 42, 'role': 'admin'}\nTampered cookie: None\nExpired cookie: None
Hints

Hint 1: Cookie format: base64url(json({"data": data, "exp": expiry_ts})) + "." + base64url(hmac_signature). On load, verify signature first, then check expiry.

Hint 2: Use hmac.compare_digest() for constant-time comparison. Never use == to compare HMAC signatures — timing attacks can leak information about the expected value.

#10Exception Handler HierarchyHard
exception-handlingerror-hierarchyhttp-errors

Build an exception handler registry that dispatches to the most specific registered handler, with MRO-based fallback.

from typing import Type, Callable, Dict, Any, Optional
from dataclasses import dataclass

class HTTPError(Exception):
    """Base class for errors that map to an HTTP status code."""

    status_code: int = 500
    default_message: str = "Internal Server Error"

    def __init__(self, message: str = ""):
        # An empty message falls back to the class-level default text.
        if not message:
            message = type(self).default_message
        self.message = message
        super().__init__(message)


class NotFoundError(HTTPError):
    """404 - the requested resource does not exist."""

    status_code = 404
    default_message = "Not Found"


class ValidationError(HTTPError):
    """422 - the request could not be processed as submitted."""

    status_code = 422
    default_message = "Unprocessable Entity"


class BadRequestError(HTTPError):
    """400 - the request is malformed."""

    status_code = 400
    default_message = "Bad Request"

class ExceptionHandlerRegistry:
    """Registry that dispatches an exception to its most specific handler.

    Handlers are keyed by exception class. Lookup walks the raised
    exception's MRO, so a handler registered for a base class also serves
    subclasses that have no handler of their own.
    """

    def __init__(self):
        self._handlers: Dict[Type[Exception], Callable] = {}

    def register(self, exc_type: Type[Exception], handler: Callable) -> None:
        """Register *handler* for *exc_type*, replacing any previous handler.

        The handler is called as ``handler(exc, request)`` and its return
        value is returned from handle().
        """
        self._handlers[exc_type] = handler

    def handle(self, exc: Exception, request: dict) -> dict:
        """Return the response produced by the best-matching handler for *exc*.

        When no class in the exception's MRO has a handler, a default
        500 response is returned.
        """
        # The MRO lists the exception's own class first, then its bases, so
        # the first registered class found is the most specific one.
        for cls in type(exc).__mro__:
            if cls in self._handlers:
                return self._handlers[cls](exc, request)
        # The status prefix keeps the body in the "<code>: <reason> - <detail>"
        # shape used by the registered handlers.
        return {"status": 500, "body": f"500: Internal Server Error - {exc}"}

# Test
registry = ExceptionHandlerRegistry()

registry.register(
    NotFoundError,
    lambda exc, req: {"status": exc.status_code, "body": f"{exc.status_code}: Not Found - {exc.message}"},
)
registry.register(
    ValidationError,
    lambda exc, req: {"status": exc.status_code, "body": f"{exc.status_code}: Unprocessable Entity - {exc.message}"},
)
# Catch-all for HTTPError subclasses without a dedicated handler: take the
# reason phrase from the exception class instead of hard-coding "Bad Request",
# which would mislabel every other subclass.
registry.register(
    HTTPError,
    lambda exc, req: {"status": exc.status_code, "body": f"{exc.status_code}: {exc.default_message} - <generic>"},
)

errors = [
    NotFoundError("User not found"),
    ValidationError("Invalid email"),
    BadRequestError(),  # no own handler: served by the HTTPError catch-all
    RuntimeError("Unexpected error"),  # not an HTTPError: default 500 response
]

for err in errors:
    result = registry.handle(err, {})
    print(result["body"])
Solution
from typing import Type, Callable, Dict

class HTTPError(Exception):
    """Root of the HTTP error hierarchy; carries a status code and a message."""

    status_code: int = 500
    default_message: str = "Internal Server Error"

    def __init__(self, message: str = ""):
        # Use the class-level default when no message text was supplied.
        chosen = message if message else type(self).default_message
        self.message = chosen
        super().__init__(chosen)


class NotFoundError(HTTPError):
    """Error reported with status 404."""

    status_code = 404
    default_message = "Not Found"


class ValidationError(HTTPError):
    """Error reported with status 422."""

    status_code = 422
    default_message = "Unprocessable Entity"


class BadRequestError(HTTPError):
    """Error reported with status 400."""

    status_code = 400
    default_message = "Bad Request"

class ExceptionHandlerRegistry:
    """Map exception classes to handlers, with MRO-based fallback.

    A handler registered for a base class also serves subclasses that have
    no handler of their own.
    """

    def __init__(self):
        self._handlers: Dict[Type[Exception], Callable] = {}

    def register(self, exc_type: Type[Exception], handler: Callable) -> None:
        """Register *handler* (called as ``handler(exc, request)``) for *exc_type*."""
        self._handlers[exc_type] = handler

    def handle(self, exc: Exception, request: dict) -> dict:
        """Dispatch *exc* to the most specific registered handler.

        Returns the handler's result, or a default 500 response when no
        class in the exception's MRO has a handler.
        """
        # type(exc).__mro__ starts with the concrete class, so the first hit
        # is the most specific registration.
        for cls in type(exc).__mro__:
            if cls in self._handlers:
                return self._handlers[cls](exc, request)
        # Prefix the status code so the fallback body has the same
        # "<code>: <reason> - <detail>" shape as the registered handlers.
        return {"status": 500, "body": f"500: Internal Server Error - {exc}"}

registry = ExceptionHandlerRegistry()
registry.register(NotFoundError, lambda exc, req: {"status": exc.status_code, "body": f"{exc.status_code}: Not Found - {exc.message}"})
registry.register(ValidationError, lambda exc, req: {"status": exc.status_code, "body": f"{exc.status_code}: Unprocessable Entity - {exc.message}"})
# Catch-all for HTTPError subclasses: use the class's own reason phrase rather
# than a hard-coded "Bad Request", which would mislabel other subclasses.
registry.register(HTTPError, lambda exc, req: {"status": exc.status_code, "body": f"{exc.status_code}: {exc.default_message} - <generic>"})

for err in [NotFoundError("User not found"), ValidationError("Invalid email"), BadRequestError(), RuntimeError("Unexpected error")]:
    print(registry.handle(err, {})["body"])

FastAPI's exception handlers: FastAPI's @app.exception_handler(HTTPException) uses the same MRO-based lookup. Starlette iterates the exception class hierarchy (via __mro__) to find the most specific handler. This is why you can register a handler for HTTPException as a catch-all, while also registering more specific handlers for RequestValidationError that override the base handler.

from typing import Type, Callable, Dict, Any

class HTTPError(Exception):
    """Base HTTP exception with status code.

    Exercise stub: subclasses are expected to override both class attributes.
    """
    # HTTP status code associated with this error type.
    status_code: int = 500
    # Fallback message text for this error type.
    default_message: str = "Internal Server Error"

class ExceptionHandlerRegistry:
    """Register and dispatch exception handlers (exercise stub).

    - register(exc_type, handler_fn) adds a handler
    - handle(exc, request) finds the most specific handler and calls it
    - Falls back to parent exception class if no exact match
    - Default handler for unregistered exceptions returns 500

    Handlers are called as handler_fn(exc, request) and return a response
    dict with "status" and "body" keys.
    """
    pass
Expected Output
404: Not Found - User not found\n422: Unprocessable Entity - Invalid email\n400: Bad Request - <generic>\n500: Internal Server Error - Unexpected error
Hints

Hint 1: Use `isinstance(exc, exc_type)` to check if an exception is an instance of a registered type. Iterate registered types from most-specific to least-specific (check subclasses before base classes).

Hint 2: Sort registered handlers by MRO depth: `len(exc_type.__mro__)` — deeper in the hierarchy = more specific. Try the most specific matching handler first.

#11Streaming Response GeneratorHard
streamingssechunked-transferasync-generators

Build an async streaming response class that supports chunked transfer encoding and the Server-Sent Events (SSE) protocol.

import asyncio
from typing import AsyncIterator, Callable, Optional

class StreamingResponse:
    """Async streaming HTTP response that forwards chunks as ASGI events."""

    def __init__(self, generator: AsyncIterator, media_type: str = "text/plain"):
        """Wrap *generator*, whose items (str or bytes) become body chunks.

        Args:
            generator: Async iterator producing the response chunks.
            media_type: Value sent in the content-type header.
        """
        self._generator = generator
        self.media_type = media_type

    @classmethod
    def sse(cls, generator: AsyncIterator) -> 'StreamingResponse':
        """Build a Server-Sent Events response from *generator*.

        Each item becomes one event terminated by a blank line. An item
        containing line breaks is emitted as several ``data:`` lines;
        putting the raw text after a single ``data:`` prefix would make
        every line after the first an unrelated field and lose it.
        """
        async def sse_wrapper():
            async for item in generator:
                # SSE accepts CRLF, LF or CR as line endings; normalise them
                # so each payload line gets its own "data:" prefix.
                text = str(item).replace("\r\n", "\n").replace("\r", "\n")
                yield "".join(f"data: {line}\n" for line in text.split("\n")) + "\n"
        return cls(sse_wrapper(), media_type="text/event-stream")

    async def stream_to(self, send: Callable) -> None:
        """Send the response through the ASGI *send* callable.

        Emits one "http.response.start" event, one "http.response.body"
        event per chunk with more_body=True, and a final empty body event
        with more_body=False to close the stream.
        """
        await send({
            "type": "http.response.start",
            "status": 200,
            "headers": [
                [b"content-type", self.media_type.encode()],
                [b"cache-control", b"no-cache"],
                [b"x-accel-buffering", b"no"],
            ],
        })

        async for chunk in self._generator:
            if isinstance(chunk, str):
                chunk = chunk.encode("utf-8")
            await send({
                "type": "http.response.body",
                "body": chunk,
                "more_body": True,
            })

        await send({
            "type": "http.response.body",
            "body": b"",
            "more_body": False,
        })

# Test
async def event_source():
    """Yield five numbered events, pausing briefly after each one."""
    index = 0
    while index < 5:
        yield f"event {index}"
        await asyncio.sleep(0.01)
        index += 1


async def main():
    """Stream an SSE response into an in-memory list and report on it."""
    chunks = []

    async def fake_send(event):
        # Keep only non-empty body events; the start event and the empty
        # terminating body are ignored.
        is_body = event["type"] == "http.response.body"
        if is_body and event["body"]:
            chunks.append(event["body"])

    response = StreamingResponse.sse(event_source())
    await response.stream_to(fake_send)

    first, last = chunks[0], chunks[-1]
    print(f"Chunks received: {len(chunks)}")
    print(f"First chunk: {first.decode()!r}")
    print(f"Last chunk: {last.decode()!r}")
    print(f"Total bytes streamed: {sum(map(len, chunks))}")


asyncio.run(main())
Solution
import asyncio
from typing import AsyncIterator, Callable

class StreamingResponse:
    """Async streaming HTTP response (reference solution)."""

    def __init__(self, generator: AsyncIterator, media_type: str = "text/plain"):
        """Wrap *generator*; its str/bytes items are sent as body chunks."""
        self._generator = generator
        self.media_type = media_type

    @classmethod
    def sse(cls, generator: AsyncIterator) -> 'StreamingResponse':
        """Return a text/event-stream response framing each item as an event.

        Items containing line breaks are split into one ``data:`` line per
        payload line; a single ``data:`` prefix would leave the later lines
        outside the data field.
        """
        async def sse_wrapper():
            async for item in generator:
                # Normalise CRLF/CR to LF, then prefix every payload line.
                lines = str(item).replace("\r\n", "\n").replace("\r", "\n").split("\n")
                yield "".join(f"data: {line}\n" for line in lines) + "\n"
        return cls(sse_wrapper(), media_type="text/event-stream")

    async def stream_to(self, send: Callable) -> None:
        """Send the start event, one body event per chunk, then an empty
        final body event with more_body=False."""
        await send({
            "type": "http.response.start",
            "status": 200,
            "headers": [
                [b"content-type", self.media_type.encode()],
                [b"cache-control", b"no-cache"],
                [b"x-accel-buffering", b"no"],
            ],
        })
        async for chunk in self._generator:
            await send({
                "type": "http.response.body",
                "body": chunk.encode() if isinstance(chunk, str) else chunk,
                "more_body": True,
            })
        await send({"type": "http.response.body", "body": b"", "more_body": False})

async def event_source():
    """Produce the events "event 0" through "event 4" with a short pause after each."""
    for label in (f"event {i}" for i in range(5)):
        yield label
        await asyncio.sleep(0.01)


async def main():
    """Run the SSE response against a collecting send() and print a summary."""
    chunks = []

    async def fake_send(event):
        # Skip the start event and the empty terminating body.
        if event["type"] != "http.response.body":
            return
        if event["body"]:
            chunks.append(event["body"])

    response = StreamingResponse.sse(event_source())
    await response.stream_to(fake_send)

    total_bytes = sum(len(c) for c in chunks)
    print(f"Chunks received: {len(chunks)}")
    print(f"First chunk: {chunks[0].decode()!r}")
    print(f"Last chunk: {chunks[-1].decode()!r}")
    print(f"Total bytes streamed: {total_bytes}")


asyncio.run(main())

SSE vs WebSockets: SSE is simpler — it is a one-way HTTP connection (server to client) built on top of chunked transfer encoding. The browser opens a persistent HTTP connection and the server pushes events as data: ...\n\n lines. No special protocol upgrade needed. Use SSE for dashboards, notifications, and LLM token streaming (ChatGPT's streaming responses use SSE). Use WebSockets when you need bidirectional communication.

import asyncio
from typing import AsyncIterator, Callable

class StreamingResponse:
    """Async streaming HTTP response (exercise stub).

    Supports:
      - Chunked transfer encoding
      - Server-Sent Events (SSE) format
      - Backpressure via async generator protocol

    Usage:
      async def my_generator():
          for i in range(10):
              yield f"data chunk {i}"
              await asyncio.sleep(0.1)

      response = StreamingResponse(my_generator(), media_type="text/event-stream")
    """
    def __init__(self, generator: AsyncIterator, media_type: str = 'text/plain'):
        """Store the chunk *generator* and the response *media_type*."""
        pass

    async def stream_to(self, send: Callable) -> None:
        """Send the response through the ASGI *send* callable.

        For each chunk, call send() with {"type": "http.response.body",
        "body": chunk_bytes, "more_body": True}; after the loop, send a
        final empty body with more_body=False.
        """
        pass

    @classmethod
    def sse(cls, generator: AsyncIterator) -> 'StreamingResponse':
        """Build a Server-Sent Events response from *generator*.

        Each event is formatted as "data: {content}\\n\\n" and the response
        uses the text/event-stream content type.
        """
        pass
Expected Output
Chunks received: 5\nFirst chunk: data: event 0\n\nLast chunk: data: event 4\n\nTotal bytes streamed: > 0
Hints

Hint 1: SSE format: each event is "data: {content}\n\n". Send headers with Content-Type: text/event-stream, Cache-Control: no-cache.

Hint 2: Async for loop over the generator. For each chunk, call send() with {"type": "http.response.body", "body": chunk_bytes, "more_body": True}. After the loop, send the final empty body with more_body=False.

© 2026 EngineersOfAI. All rights reserved.