
Python Build a Production API Platform: Practice Problems & Exercises

Practice: Build a Production API Platform

11 problems · 2 Easy · 4 Medium · 5 Hard · 90–120 min

Easy

#1 Route Registration Registry (Easy)
Tags: routing, registry, fastapi-patterns

Build a simple RouteRegistry class that stores and lists API route registrations. It should support register(method, path, handler) and list_routes().

from dataclasses import dataclass
from typing import Callable, List

@dataclass
class Route:
    method: str
    path: str
    handler: Callable

class RouteRegistry:
    def __init__(self):
        self.routes: List[Route] = []

    def register(self, method: str, path: str, handler: Callable):
        self.routes.append(Route(method.upper(), path, handler))

    def list_routes(self):
        for route in self.routes:
            print(f"{route.method} {route.path} -> {route.handler.__name__}")

# Test
registry = RouteRegistry()

def list_users(): pass
def create_user(): pass
def get_user(): pass
def delete_user(): pass

registry.register("GET", "/users", list_users)
registry.register("POST", "/users", create_user)
registry.register("GET", "/users/{id}", get_user)
registry.register("DELETE", "/users/{id}", delete_user)

registry.list_routes()
Solution
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class Route:
    method: str
    path: str
    handler: Callable

class RouteRegistry:
    def __init__(self):
        self.routes: List[Route] = []

    def register(self, method: str, path: str, handler: Callable):
        # Normalize the verb so "get" and "GET" are stored the same way
        self.routes.append(Route(method.upper(), path, handler))

    def list_routes(self):
        # Print routes in registration order
        for route in self.routes:
            print(f"{route.method} {route.path} -> {route.handler.__name__}")

registry = RouteRegistry()

def list_users(): pass
def create_user(): pass
def get_user(): pass
def delete_user(): pass

registry.register("GET", "/users", list_users)
registry.register("POST", "/users", create_user)
registry.register("GET", "/users/{id}", get_user)
registry.register("DELETE", "/users/{id}", delete_user)

registry.list_routes()

Key insight: A route registry is the core data structure behind a web framework's router. FastAPI, Flask, and Django each maintain an internal table that maps URL patterns (and, in FastAPI and Flask, HTTP methods) to handler callables. Understanding this structure helps you reason about how decorators such as @app.get("/users") work under the hood: conceptually, they do the equivalent of registry.register("GET", "/users", handler) and hand the function back unchanged.
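
To make the decorator connection concrete, here is a minimal sketch of a decorator layered on top of a registry. MiniApp and its route()/get() methods are hypothetical names chosen for illustration; they are not FastAPI's or Flask's actual internals.

```python
from typing import Callable, Dict, Tuple


class MiniApp:
    """Hypothetical toy app: decorators record handlers in a registry."""

    def __init__(self) -> None:
        # Maps (METHOD, path) -> handler callable.
        self.routes: Dict[Tuple[str, str], Callable] = {}

    def route(self, method: str, path: str) -> Callable[[Callable], Callable]:
        def decorator(handler: Callable) -> Callable:
            self.routes[(method.upper(), path)] = handler
            return handler  # hand the function back unchanged
        return decorator

    def get(self, path: str) -> Callable[[Callable], Callable]:
        # Convenience wrapper, assumed here to mirror the @app.get style.
        return self.route("GET", path)


app = MiniApp()


@app.get("/users")
def list_users():
    return ["alice", "bob"]


handler = app.routes[("GET", "/users")]
print(handler.__name__, handler())
```

Because the decorator returns the original function, list_users can still be called directly (in tests, for example) while the registry holds a reference to the same object.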

Expected Output
GET /users -> list_users
POST /users -> create_user
GET /users/{id} -> get_user
DELETE /users/{id} -> delete_user
Hints

Hint 1: A route registry is just a list of (method, path, handler_name) tuples. Iterate and print each entry.

Hint 2: Use a dataclass or namedtuple to represent a route entry. Group by HTTP method for clarity.

#2 Request/Response Model Validator (Easy)
Tags: pydantic, validation, models

Build a UserCreateRequest validator (without Pydantic) that validates user creation input and returns structured errors.

from dataclasses import dataclass
from typing import Dict, Any, Tuple

@dataclass
class UserCreateRequest:
    name: str
    email: str
    age: int

    def validate(self) -> Tuple[bool, Dict[str, Any]]:
        errors = {}

        if not self.name or not self.name.strip():
            errors["name"] = "Name cannot be empty"

        if not self.email or "@" not in self.email:
            errors["email"] = "Invalid email address"

        if not isinstance(self.age, int) or self.age < 0 or self.age > 150:
            errors["age"] = "Age must be between 0 and 150"

        if errors:
            return False, errors
        return True, {"name": self.name, "email": self.email, "age": self.age}

# Test
valid_req = UserCreateRequest(name="Alice", email="[email protected]", age=30)
invalid_req = UserCreateRequest(name="", email="not-an-email", age=-1)

ok, result = valid_req.validate()
print(f"Valid: {ok}", result)

ok, result = invalid_req.validate()
print(f"Valid: {ok}", result)
Solution
from dataclasses import dataclass
from typing import Dict, Any, Tuple

@dataclass
class UserCreateRequest:
    name: str
    email: str
    age: int

    def validate(self) -> Tuple[bool, Dict[str, Any]]:
        # Collect every failure instead of stopping at the first one
        errors = {}

        if not self.name or not self.name.strip():
            errors["name"] = "Name cannot be empty"

        if not self.email or "@" not in self.email:
            errors["email"] = "Invalid email address"

        if not isinstance(self.age, int) or self.age < 0 or self.age > 150:
            errors["age"] = "Age must be between 0 and 150"

        # On failure return the errors; on success return the cleaned data
        if errors:
            return False, errors
        return True, {"name": self.name, "email": self.email, "age": self.age}

valid_req = UserCreateRequest(name="Alice", email="[email protected]", age=30)
invalid_req = UserCreateRequest(name="", email="not-an-email", age=-1)

ok, result = valid_req.validate()
print(f"Valid: {ok}", result)

ok, result = invalid_req.validate()
print(f"Valid: {ok}", result)

Production note: Pydantic covers the same ground with far more power: it coerces types, generates JSON Schema, and integrates with FastAPI's OpenAPI documentation. The pattern here (validate, then report per-field errors) parallels Pydantic, with one difference: Pydantic raises a ValidationError whose errors() method lists the failing fields, rather than returning an errors dict.

Expected Output
Valid: True {'name': 'Alice', 'email': '[email protected]', 'age': 30}
Valid: False {'name': 'Name cannot be empty', 'email': 'Invalid email address', 'age': 'Age must be between 0 and 150'}
Hints

Hint 1: Use a dataclass with a `validate()` method that checks each field against rules and returns (is_valid, errors) or (is_valid, cleaned_data).

Hint 2: Check: name non-empty, email contains @, age between 0 and 150.


Medium

#3 Dependency Injection Container (Medium)
Tags: dependency-injection, ioc-container, fastapi-di

Build a lightweight dependency injection container that resolves dependencies by name and injects them into functions automatically.

import inspect
from typing import Callable, Any, Dict

class DIContainer:
    def __init__(self):
        self._factories: Dict[str, Callable] = {}
        self._cache: Dict[str, Any] = {}

    def register(self, name: str, factory: Callable) -> None:
        self._factories[name] = factory

    def get(self, name: str) -> Any:
        if name not in self._cache:
            if name not in self._factories:
                raise KeyError(f"No dependency registered for '{name}'")
            self._cache[name] = self._factories[name]()
        return self._cache[name]

    def inject(self, fn: Callable) -> Any:
        sig = inspect.signature(fn)
        kwargs = {name: self.get(name) for name in sig.parameters}
        return fn(**kwargs)


# Test fixtures
class Database:
    def __init__(self, url: str):
        self.url = url
    def __repr__(self):
        return f"<Database url={self.url}>"

class Mailer:
    def __init__(self, smtp: str):
        self.smtp = smtp
    def __repr__(self):
        return f"<Mailer smtp={self.smtp}>"

container = DIContainer()
container.register("db", lambda: Database("postgresql://localhost/app"))
container.register("mailer", lambda: Mailer("smtp.example.com"))

def create_user(db, mailer):
    return f"User created: [email protected]"

print(f"db={container.get('db')}")
print(f"mailer={container.get('mailer')}")
print(f"result={container.inject(create_user)}")
Solution
import inspect
from typing import Callable, Any, Dict

class DIContainer:
    def __init__(self):
        self._factories: Dict[str, Callable] = {}
        self._cache: Dict[str, Any] = {}

    def register(self, name: str, factory: Callable) -> None:
        self._factories[name] = factory

    def get(self, name: str) -> Any:
        # Build the dependency on first use, then reuse the cached instance
        if name not in self._cache:
            if name not in self._factories:
                raise KeyError(f"No dependency registered for '{name}'")
            self._cache[name] = self._factories[name]()
        return self._cache[name]

    def inject(self, fn: Callable) -> Any:
        # Match each parameter name of fn to a registered dependency
        sig = inspect.signature(fn)
        kwargs = {name: self.get(name) for name in sig.parameters}
        return fn(**kwargs)

class Database:
    def __init__(self, url: str):
        self.url = url
    def __repr__(self):
        return f"<Database url={self.url}>"

class Mailer:
    def __init__(self, smtp: str):
        self.smtp = smtp
    def __repr__(self):
        return f"<Mailer smtp={self.smtp}>"

container = DIContainer()
container.register("db", lambda: Database("postgresql://localhost/app"))
container.register("mailer", lambda: Mailer("smtp.example.com"))

def create_user(db, mailer):
    return f"User created: [email protected]"

print(f"db={container.get('db')}")
print(f"mailer={container.get('mailer')}")
print(f"result={container.inject(create_user)}")

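How FastAPI's DI works: FastAPI uses Depends() to declare dependencies in route handlers. When a request arrives, FastAPI inspects the handler's signature, resolves each Depends() by calling the dependency function, and injects the results. This container follows the same idea: inspect.signature for introspection, a factory registry, and a cache of resolved values. One difference is scope: this container caches each dependency for its whole lifetime (a singleton), whereas FastAPI by default caches a dependency's result only within a single request.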
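
The resolution step described above can be sketched with a marker object placed in parameter defaults. This is a rough illustration only: the Depends class and call_with_dependencies function below are hypothetical stand-ins written for this example, not FastAPI's implementation, and the sketch assumes every parameter carries a Depends default.

```python
import inspect
from typing import Any, Callable, Dict


class Depends:
    """Hypothetical marker, loosely modeled on the idea behind FastAPI's Depends()."""

    def __init__(self, dependency: Callable) -> None:
        self.dependency = dependency


def call_with_dependencies(fn: Callable, cache: Dict[Callable, Any]) -> Any:
    """Call fn, resolving every parameter whose default is a Depends marker."""
    kwargs = {}
    for name, param in inspect.signature(fn).parameters.items():
        if isinstance(param.default, Depends):
            dep = param.default.dependency
            if dep not in cache:
                # Dependencies may declare their own dependencies, so recurse.
                cache[dep] = call_with_dependencies(dep, cache)
            kwargs[name] = cache[dep]
    return fn(**kwargs)


calls = {"get_settings": 0}


def get_settings() -> dict:
    calls["get_settings"] += 1
    return {"db_url": "postgresql://localhost/app"}


def get_db(settings: dict = Depends(get_settings)) -> str:
    return f"connected to {settings['db_url']}"


def list_users(db: str = Depends(get_db), settings: dict = Depends(get_settings)) -> str:
    return db


# A fresh cache per call plays the role of a per-request cache.
result = call_with_dependencies(list_users, cache={})
print(result)
print(f"get_settings called {calls['get_settings']} time(s)")
```

Although get_settings is needed twice (once by get_db, once by list_users), the shared cache means it runs only once per call to call_with_dependencies.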

from typing import Callable, Any, Dict, Type

class DIContainer:
    """Lightweight dependency injection container.
    
    Supports:
      - register(name, factory_fn) to register a factory
      - get(name) to resolve and cache a dependency
      - inject(fn) to call fn with resolved dependencies
    """
    def __init__(self):
        self._factories: Dict[str, Callable] = {}
        self._cache: Dict[str, Any] = {}

    def register(self, name: str, factory: Callable) -> None:
        pass

    def get(self, name: str) -> Any:
        pass

    def inject(self, fn: Callable) -> Any:
        pass
Expected Output
db=<Database url=postgresql://localhost/app>
mailer=<Mailer smtp=smtp.example.com>
result=User created: [email protected]
Hints

Hint 1: Use `inspect.signature(fn).parameters` to get parameter names, then call `self.get(name)` for each to build the kwargs dict.

Hint 2: Cache resolved dependencies in `self._cache` so factories are only called once (singleton pattern).

#4 Token Bucket Rate Limiter (Medium)
Tags: rate-limiting, token-bucket, middleware

Implement a token bucket rate limiter that allows bursting up to capacity requests but limits sustained throughput to refill_rate requests/second.

import time
from typing import Dict, Tuple

class TokenBucketRateLimiter:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self._buckets: Dict[str, Tuple[float, float]] = {}

    def _get_bucket(self, key: str) -> Tuple[float, float]:
        if key not in self._buckets:
            self._buckets[key] = (float(self.capacity), time.monotonic())
        return self._buckets[key]

    def is_allowed(self, client_key: str) -> bool:
        tokens, last_refill = self._get_bucket(client_key)
        now = time.monotonic()
        elapsed = now - last_refill
        tokens = min(self.capacity, tokens + elapsed * self.refill_rate)
        if tokens >= 1:
            self._buckets[client_key] = (tokens - 1, now)
            return True
        self._buckets[client_key] = (tokens, now)
        return False

# Test: capacity=5, refill 2/sec
limiter = TokenBucketRateLimiter(capacity=5, refill_rate=2.0)

results = [limiter.is_allowed("user_1") for _ in range(6)]
for i, allowed in enumerate(results, 1):
    status = "allowed" if allowed else "rate_limited"
    print(f"Request {i}: {status}")

time.sleep(1.0)
print(f"After 1s refill: {'allowed' if limiter.is_allowed('user_1') else 'rate_limited'}")
Solution
import time
from typing import Dict, Tuple

class TokenBucketRateLimiter:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        # client_key -> (tokens, last_refill_time)
        self._buckets: Dict[str, Tuple[float, float]] = {}

    def _get_bucket(self, key: str) -> Tuple[float, float]:
        # New clients start with a full bucket
        if key not in self._buckets:
            self._buckets[key] = (float(self.capacity), time.monotonic())
        return self._buckets[key]

    def is_allowed(self, client_key: str) -> bool:
        tokens, last_refill = self._get_bucket(client_key)
        now = time.monotonic()
        elapsed = now - last_refill
        # Refill for the elapsed time, capped at capacity
        tokens = min(self.capacity, tokens + elapsed * self.refill_rate)
        if tokens >= 1:
            self._buckets[client_key] = (tokens - 1, now)
            return True
        self._buckets[client_key] = (tokens, now)
        return False

limiter = TokenBucketRateLimiter(capacity=5, refill_rate=2.0)

results = [limiter.is_allowed("user_1") for _ in range(6)]
for i, allowed in enumerate(results, 1):
    status = "allowed" if allowed else "rate_limited"
    print(f"Request {i}: {status}")

time.sleep(1.0)
print(f"After 1s refill: {'allowed' if limiter.is_allowed('user_1') else 'rate_limited'}")

Token bucket vs fixed window: A fixed window counter allows a burst of N requests at the end of one window and another N at the start of the next, so up to 2N requests can land in a short span around the boundary. Token bucket smooths this out: tokens accumulate over time but are capped at capacity, so a client can never bank more than capacity requests to fire at once.
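
The boundary burst is easy to reproduce. The sketch below uses a hypothetical FixedWindowLimiter written for this comparison (it is not part of the exercise) and a fake clock held in a dict so the result is deterministic.

```python
from typing import Callable, Dict, Tuple


class FixedWindowLimiter:
    """Hypothetical fixed-window counter: at most `limit` requests per window."""

    def __init__(self, limit: int, window: float, clock: Callable[[], float]) -> None:
        self.limit = limit
        self.window = window
        self.clock = clock
        self._counts: Dict[Tuple[str, int], int] = {}

    def is_allowed(self, key: str) -> bool:
        # Requests are counted per (client, window index) pair.
        bucket = (key, int(self.clock() // self.window))
        count = self._counts.get(bucket, 0)
        if count >= self.limit:
            return False
        self._counts[bucket] = count + 1
        return True


fake_time = {"t": 0.0}  # fake clock so the demo does not depend on real time
limiter = FixedWindowLimiter(limit=5, window=1.0, clock=lambda: fake_time["t"])

fake_time["t"] = 0.9  # just before the window boundary
before = sum(limiter.is_allowed("user_1") for _ in range(5))

fake_time["t"] = 1.1  # just after the boundary: the counter starts fresh
after = sum(limiter.is_allowed("user_1") for _ in range(5))

print(f"Allowed within 0.2s: {before + after}")
```

All ten requests pass even though the nominal limit is 5 per second. Under the same timing, a token bucket with capacity 5 and a refill rate of 5 tokens/second would admit 5 requests at t=0.9 and only about 1 more at t=1.1, since just 0.2 seconds' worth of tokens has been refilled.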

Production note: In distributed systems, store bucket state in Redis (with Lua scripts for atomic compare-and-set) rather than in-process memory.

import time
from typing import Dict

class TokenBucketRateLimiter:
    """Token bucket rate limiter per client key.
    
    Each client gets a bucket of 'capacity' tokens.
    Tokens refill at 'refill_rate' tokens/second.
    Each request consumes 1 token.
    Returns True if request is allowed, False if rate-limited.
    """
    def __init__(self, capacity: int, refill_rate: float):
        pass

    def is_allowed(self, client_key: str) -> bool:
        pass
Expected Output
Request 1: allowed
Request 2: allowed
Request 3: allowed
Request 4: allowed
Request 5: allowed
Request 6: rate_limited
After 1s refill: allowed
Hints

Hint 1: Store (tokens, last_refill_time) per client key. On each request, compute elapsed time, add elapsed * refill_rate tokens (capped at capacity), then check if tokens >= 1.

Hint 2: Use `time.monotonic()` for the timestamp — it never goes backwards, unlike `time.time()`.

#5 JWT Auth Middleware (Medium)
Tags: jwt, auth, middleware, security

Implement a minimal JWT HS256 token creator and verifier from scratch — no external libraries.

import base64
import json
import hmac
import hashlib
from typing import Optional, Dict, Any

def b64url_encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def b64url_decode(s: str) -> bytes:
    padding = 4 - len(s) % 4
    if padding != 4:
        s += "=" * padding
    return base64.urlsafe_b64decode(s)

class JWTMiddleware:
    @staticmethod
    def create_token(payload: Dict[str, Any], secret: str) -> str:
        header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
        body = b64url_encode(json.dumps(payload).encode())
        signing_input = f"{header}.{body}"
        sig = hmac.new(
            secret.encode(),
            signing_input.encode(),
            hashlib.sha256,
        ).digest()
        return f"{signing_input}.{b64url_encode(sig)}"

    @staticmethod
    def verify_token(token: str, secret: str) -> Optional[Dict[str, Any]]:
        try:
            parts = token.split(".")
            if len(parts) != 3:
                return None
            header_b64, payload_b64, sig_b64 = parts
            signing_input = f"{header_b64}.{payload_b64}"
            expected_sig = hmac.new(
                secret.encode(),
                signing_input.encode(),
                hashlib.sha256,
            ).digest()
            if not hmac.compare_digest(b64url_decode(sig_b64), expected_sig):
                return None
            return json.loads(b64url_decode(payload_b64))
        except Exception:
            return None

# Test
secret = "super-secret-key-do-not-share"
token = JWTMiddleware.create_token({"user_id": 42, "role": "admin"}, secret)
print(f"Token: {token[:20]}...")
print(f"Payload: {JWTMiddleware.verify_token(token, secret)}")

tampered = token[:-5] + "XXXXX"
print(f"Tampered token rejected: {JWTMiddleware.verify_token(tampered, secret) is None}")
Solution
import base64
import json
import hmac
import hashlib
from typing import Optional, Dict, Any

def b64url_encode(data: bytes) -> str:
    # base64url without the trailing "=" padding
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def b64url_decode(s: str) -> bytes:
    # Restore the padding that b64url_encode stripped
    padding = 4 - len(s) % 4
    if padding != 4:
        s += "=" * padding
    return base64.urlsafe_b64decode(s)

class JWTMiddleware:
    @staticmethod
    def create_token(payload: Dict[str, Any], secret: str) -> str:
        header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
        body = b64url_encode(json.dumps(payload).encode())
        signing_input = f"{header}.{body}"
        sig = hmac.new(
            secret.encode(),
            signing_input.encode(),
            hashlib.sha256,
        ).digest()
        return f"{signing_input}.{b64url_encode(sig)}"

    @staticmethod
    def verify_token(token: str, secret: str) -> Optional[Dict[str, Any]]:
        try:
            parts = token.split(".")
            if len(parts) != 3:
                return None
            header_b64, payload_b64, sig_b64 = parts
            # Recompute the signature over the received header and payload
            signing_input = f"{header_b64}.{payload_b64}"
            expected_sig = hmac.new(
                secret.encode(),
                signing_input.encode(),
                hashlib.sha256,
            ).digest()
            # Constant-time comparison to avoid timing attacks
            if not hmac.compare_digest(b64url_decode(sig_b64), expected_sig):
                return None
            return json.loads(b64url_decode(payload_b64))
        except Exception:
            return None

secret = "super-secret-key-do-not-share"
token = JWTMiddleware.create_token({"user_id": 42, "role": "admin"}, secret)
print(f"Token: {token[:20]}...")
print(f"Payload: {JWTMiddleware.verify_token(token, secret)}")

tampered = token[:-5] + "XXXXX"
print(f"Tampered token rejected: {JWTMiddleware.verify_token(tampered, secret) is None}")

Critical security details:

  • hmac.compare_digest() uses constant-time comparison to prevent timing attacks. Never use == to compare MAC values.
  • Always validate the alg header to prevent the "none" algorithm attack (never accept alg: none).
  • In production, add exp (expiry) and iat (issued-at) claims and validate them on every request.
import base64
import json
import hmac
import hashlib
from typing import Optional, Dict, Any

class JWTMiddleware:
    """Minimal JWT HS256 auth middleware.
    
    Supports:
      - create_token(payload, secret) -> token string
      - verify_token(token, secret) -> payload dict or None
    Does NOT handle expiry (add exp claim checking as an extension).
    """
    @staticmethod
    def create_token(payload: Dict[str, Any], secret: str) -> str:
        pass

    @staticmethod
    def verify_token(token: str, secret: str) -> Optional[Dict[str, Any]]:
        pass
Expected Output
Token: eyJhbGciOiAiSFMyNTYi...
Payload: {'user_id': 42, 'role': 'admin'}
Tampered token rejected: True
Hints

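Hint 1: JWT = base64url(header) + "." + base64url(payload) + "." + base64url(signature), where signature = HMAC-SHA256(secret, base64url(header) + "." + base64url(payload)). The signature covers the encoded header and payload, not the raw JSON. Use hmac.new() with hashlib.sha256.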

Hint 2: For base64url encoding: replace + with - and / with _ and strip = padding. For decoding, add back = padding before decoding.

#6 Middleware Stack Pipeline (Medium)
Tags: middleware, pipeline, decorator-chain

Build a middleware pipeline where each middleware can intercept requests and responses, similar to Express.js or ASGI middleware.

from typing import Callable, Dict, Any

class MiddlewareStack:
    def __init__(self, final_handler: Callable):
        self._handler = final_handler
        self._middlewares: list = []

    def use(self, middleware: Callable) -> 'MiddlewareStack':
        self._middlewares.append(middleware)
        return self

    def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        handler = self._handler
        for mw in reversed(self._middlewares):
            next_handler = handler
            def make_mw(m, n):
                return lambda req: m(req, n)
            handler = make_mw(mw, next_handler)
        return handler(request)

# Test middlewares
import time

def logging_middleware(request, next_handler):
    print(f"[logging] {request['method']} {request['path']}")
    response = next_handler(request)
    print(f"[logging] Response: {response['status']}")
    return response

def auth_middleware(request, next_handler):
    token = request.get("headers", {}).get("Authorization", "")
    print(f"[auth] Checking token: {token}")
    return next_handler(request)

def timing_middleware(request, next_handler):
    start = time.perf_counter()
    print("[timing] Processing...")
    response = next_handler(request)
    elapsed = time.perf_counter() - start
    print(f"[timing] Response time: {elapsed:.2f}s")
    return response

def final_handler(request):
    print("Handler: listing users")
    return {"status": 200, "body": ["alice", "bob"]}

stack = MiddlewareStack(final_handler)
stack.use(logging_middleware).use(auth_middleware).use(timing_middleware)

stack.handle({
    "method": "GET",
    "path": "/api/users",
    "headers": {"Authorization": "Bearer test123"},
})
Solution
from typing import Callable, Dict, Any
import time

class MiddlewareStack:
    def __init__(self, final_handler: Callable):
        self._handler = final_handler
        self._middlewares: list = []

    def use(self, middleware: Callable) -> 'MiddlewareStack':
        self._middlewares.append(middleware)
        return self

    def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
        # Start from the final handler and wrap it from the inside out,
        # so the first middleware registered ends up outermost
        handler = self._handler
        for mw in reversed(self._middlewares):
            next_handler = handler
            def make_mw(m, n):
                # Bind m and n now, not when the lambda is eventually called
                return lambda req: m(req, n)
            handler = make_mw(mw, next_handler)
        return handler(request)

def logging_middleware(request, next_handler):
    print(f"[logging] {request['method']} {request['path']}")
    response = next_handler(request)
    print(f"[logging] Response: {response['status']}")
    return response

def auth_middleware(request, next_handler):
    token = request.get("headers", {}).get("Authorization", "")
    print(f"[auth] Checking token: {token}")
    return next_handler(request)

def timing_middleware(request, next_handler):
    start = time.perf_counter()
    print("[timing] Processing...")
    response = next_handler(request)
    elapsed = time.perf_counter() - start
    print(f"[timing] Response time: {elapsed:.2f}s")
    return response

def final_handler(request):
    print("Handler: listing users")
    return {"status": 200, "body": ["alice", "bob"]}

stack = MiddlewareStack(final_handler)
stack.use(logging_middleware).use(auth_middleware).use(timing_middleware)

stack.handle({
    "method": "GET",
    "path": "/api/users",
    "headers": {"Authorization": "Bearer test123"},
})

Execution order: Middlewares are applied outermost-first (the first use() call wraps the outermost). Request flows in: logging -> auth -> timing -> handler. Response flows out: timing -> auth -> logging. This is the classic "onion model" used by Express, Koa, Django, and Starlette/FastAPI.

Common closure bug: Without make_mw(), all closures in the for loop would capture the same mw variable (its final value). Always bind loop variables explicitly when building closure chains.
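
The closure pitfall can be seen in isolation with a few lines. The sketch below is independent of the middleware classes; the labels list and make_getter helper are made up for the demonstration.

```python
labels = ["logging", "auth", "timing"]

# Broken: each lambda looks up `label` when it is called, not when it is
# defined, so all three see the value left over from the last iteration.
broken = []
for label in labels:
    broken.append(lambda: label)
print([f() for f in broken])  # ['timing', 'timing', 'timing']


# Fix 1: a factory function gives each closure its own `value` binding.
def make_getter(value):
    return lambda: value


fixed = [make_getter(label) for label in labels]
print([f() for f in fixed])  # ['logging', 'auth', 'timing']

# Fix 2: a default argument is evaluated once, at definition time.
also_fixed = [lambda value=label: value for label in labels]
print([f() for f in also_fixed])  # ['logging', 'auth', 'timing']
```

make_mw() in the solution is an instance of the first fix: it binds both the middleware and its next handler at the moment each layer is built.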

from typing import Callable, List, Any

class MiddlewareStack:
    """WSGI/ASGI-style middleware pipeline.
    
    Middleware functions have signature:
      middleware(request, next_handler) -> response
    
    They can modify request before calling next_handler,
    modify response after, or short-circuit entirely.
    """
    def __init__(self, final_handler: Callable):
        pass

    def use(self, middleware: Callable) -> 'MiddlewareStack':
        pass

    def handle(self, request: dict) -> dict:
        pass
Expected Output
[logging] GET /api/users
[auth] Checking token: Bearer test123
[timing] Processing...
Handler: listing users
[timing] Response time: 0.00s
[logging] Response: 200
Hints

Hint 1: Store middleware in a list. When handling a request, build a chain of callables: each middleware gets a `next` that calls the next middleware in the chain (or the final handler).

Hint 2: Build the chain by iterating middleware in reverse. Start with the final_handler, then wrap each middleware around it using a closure that captures the current `next`.


Hard

#7 Async Connection Pool (Hard)
Tags: asyncio, connection-pool, context-manager

Build an async connection pool that manages a fixed set of database connections with acquire/release semantics and proper async context manager support.

import asyncio
from contextlib import asynccontextmanager
from typing import Any

class FakeConnection:
    """Simulates a database connection."""
    _counter = 0
    def __init__(self):
        FakeConnection._counter += 1
        self.id = f"conn-{FakeConnection._counter - 1}"
        self.closed = False

    async def close(self):
        self.closed = True

class AsyncConnectionPool:
    def __init__(self, factory, min_size: int = 2, max_size: int = 10):
        self._factory = factory
        self._min_size = min_size
        self._max_size = max_size
        self._pool: asyncio.Queue = asyncio.Queue(maxsize=max_size)
        self._all_connections: list = []

    async def start(self) -> None:
        for _ in range(self._min_size):
            # Accept both async and sync factories
            # (asyncio.coroutine was removed in Python 3.11)
            if asyncio.iscoroutinefunction(self._factory):
                conn = await self._factory()
            else:
                conn = self._factory()
            self._all_connections.append(conn)
            await self._pool.put(conn)
        print(f"Pool started with {self._min_size} connections")

    async def acquire(self, timeout: float = 5.0):
        try:
            conn = await asyncio.wait_for(self._pool.get(), timeout=timeout)
            return conn
        except asyncio.TimeoutError:
            raise TimeoutError("No connection available in pool")

    async def release(self, conn) -> None:
        await self._pool.put(conn)

    @asynccontextmanager
    async def connection(self, timeout: float = 5.0):
        conn = await self.acquire(timeout)
        try:
            yield conn
        finally:
            await self.release(conn)

    async def close(self) -> None:
        for conn in self._all_connections:
            await conn.close()
        print("Pool closed")

async def main():
    pool = AsyncConnectionPool(factory=FakeConnection, min_size=2)
    await pool.start()

    conn1 = await pool.acquire()
    conn2 = await pool.acquire()
    print(f"Acquired: {conn1.id}")
    print(f"Acquired: {conn2.id}")

    await pool.release(conn1)
    print(f"Released: {conn1.id}")

    conn3 = await pool.acquire()
    print(f"Acquired: {conn3.id} (reused)")

    await pool.release(conn2)
    await pool.release(conn3)
    await pool.close()

asyncio.run(main())
Solution
import asyncio
from contextlib import asynccontextmanager

class FakeConnection:
    _counter = 0
    def __init__(self):
        FakeConnection._counter += 1
        self.id = f"conn-{FakeConnection._counter - 1}"
        self.closed = False

    async def close(self):
        self.closed = True

class AsyncConnectionPool:
    def __init__(self, factory, min_size: int = 2, max_size: int = 10):
        self._factory = factory
        self._min_size = min_size
        self._max_size = max_size
        self._pool: asyncio.Queue = asyncio.Queue(maxsize=max_size)
        self._all_connections: list = []

    async def start(self) -> None:
        # Pre-create min_size connections and make them available
        for _ in range(self._min_size):
            conn = self._factory()
            self._all_connections.append(conn)
            await self._pool.put(conn)
        print(f"Pool started with {self._min_size} connections")

    async def acquire(self, timeout: float = 5.0):
        # Wait for a free connection, giving up after `timeout` seconds
        try:
            return await asyncio.wait_for(self._pool.get(), timeout=timeout)
        except asyncio.TimeoutError:
            raise TimeoutError("No connection available in pool")

    async def release(self, conn) -> None:
        await self._pool.put(conn)

    @asynccontextmanager
    async def connection(self, timeout: float = 5.0):
        conn = await self.acquire(timeout)
        try:
            yield conn
        finally:
            # Always return the connection, even if the body raised
            await self.release(conn)

    async def close(self) -> None:
        for conn in self._all_connections:
            await conn.close()
        print("Pool closed")

async def main():
    pool = AsyncConnectionPool(factory=FakeConnection, min_size=2)
    await pool.start()
    conn1 = await pool.acquire()
    conn2 = await pool.acquire()
    print(f"Acquired: {conn1.id}")
    print(f"Acquired: {conn2.id}")
    await pool.release(conn1)
    print(f"Released: {conn1.id}")
    conn3 = await pool.acquire()
    print(f"Acquired: {conn3.id} (reused)")
    await pool.release(conn2)
    await pool.release(conn3)
    await pool.close()

asyncio.run(main())

Why asyncio.Queue: asyncio.Queue is the ideal backing store for a connection pool. await queue.get() suspends the coroutine (yielding control to the event loop) if the queue is empty, then automatically resumes it when a connection becomes available via queue.put(). This gives you correct async backpressure with zero busy-waiting.
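
The suspend-and-resume behavior can be observed directly with a bare queue. This is a small sketch separate from the pool class; the borrower coroutine and the events list are illustrative names only.

```python
import asyncio


async def main() -> list:
    pool: asyncio.Queue = asyncio.Queue()
    events = []

    async def borrower() -> None:
        events.append("borrower: waiting")
        conn = await pool.get()  # suspends here while the queue is empty
        events.append(f"borrower: got {conn}")

    task = asyncio.create_task(borrower())
    await asyncio.sleep(0)  # let the borrower run until it blocks on get()
    events.append("main: releasing conn-0")
    await pool.put("conn-0")  # makes an item available, waking the borrower
    await task
    return events


events = asyncio.run(main())
print("\n".join(events))
```

The borrower neither polls nor sleeps: it is parked on get() until put() supplies an item, which is what makes the queue a natural fit for acquire() and release().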

Production additions: health-check pings on borrowed connections, max connection lifetime (recycle old connections), circuit-breaker integration if the database is unreachable.

import asyncio
from typing import Optional, List

class AsyncConnectionPool:
    """Async connection pool with acquire/release semantics.
    
    - Pool pre-creates min_size connections on startup
    - acquire() waits if no connection is available (up to timeout)
    - release() returns connection to pool
    - Supports async context manager: async with pool.connection() as conn
    - close() drains and closes all connections
    """
    def __init__(self, factory, min_size: int = 2, max_size: int = 10):
        pass

    async def start(self) -> None:
        pass

    async def acquire(self, timeout: float = 5.0):
        pass

    async def release(self, conn) -> None:
        pass

    async def close(self) -> None:
        pass
Expected Output
Pool started with 2 connections
Acquired: conn-0
Acquired: conn-1
Released: conn-0
Acquired: conn-0 (reused)
Pool closed
Hints

Hint 1: Use `asyncio.Queue` as the pool storage. Put connections in, get them out. `asyncio.wait_for(queue.get(), timeout)` handles the timeout case.

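Hint 2: The context manager should yield the connection and call release() on exit, even if an exception occurs. With @asynccontextmanager that means a try/finally around the yield; in a class-based context manager it is the __aexit__ method.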

#8 Typed API Response Builder (Hard)
Tags: generics, typing, api-response, envelope-pattern

Build a generic typed API response envelope used across all routes to ensure consistent JSON output shape.

from dataclasses import dataclass
from typing import TypeVar, Generic, Optional, List, Any, Dict
import math

T = TypeVar('T')

@dataclass
class ErrorInfo:
    message: str
    code: int

@dataclass
class PaginationMeta:
    total: int
    page: int
    page_size: int

    @property
    def pages(self) -> int:
        return math.ceil(self.total / self.page_size)

@dataclass
class APIResponse(Generic[T]):
    success: bool
    data: Optional[T] = None
    error: Optional[ErrorInfo] = None
    meta: Optional[PaginationMeta] = None

    @classmethod
    def ok(cls, data: T) -> 'APIResponse[T]':
        return cls(success=True, data=data)

    @classmethod
    def fail(cls, message: str, code: int = 400) -> 'APIResponse[None]':
        return cls(success=False, error=ErrorInfo(message=message, code=code))

    @classmethod
    def paginated(cls, items: List[T], total: int, page: int, page_size: int) -> 'APIResponse[List[T]]':
        return cls(
            success=True,
            data=items,
            meta=PaginationMeta(total=total, page=page, page_size=page_size),
        )

    def to_dict(self) -> Dict[str, Any]:
        # Always emit success, data, and error so every response has the same shape
        result: Dict[str, Any] = {
            "success": self.success,
            "data": self.data,
            "error": None,
        }
        if self.error:
            result["error"] = {"message": self.error.message, "code": self.error.code}
        if self.meta:
            result["meta"] = {
                "total": self.meta.total,
                "page": self.meta.page,
                "pages": self.meta.pages,
            }
        return result

# Test
print("Success:", APIResponse.ok({"id": 1, "name": "Alice"}).to_dict())
print("Error:", APIResponse.fail("Not found", 404).to_dict())
print("Paged:", APIResponse.paginated([1, 2, 3], total=10, page=1, page_size=3).to_dict())
Solution
from dataclasses import dataclass
from typing import TypeVar, Generic, Optional, List, Any, Dict
import math

T = TypeVar('T')

@dataclass
class ErrorInfo:
    message: str
    code: int

@dataclass
class PaginationMeta:
    total: int
    page: int
    page_size: int

    @property
    def pages(self) -> int:
        # Round up so a partial last page still counts
        return math.ceil(self.total / self.page_size)

@dataclass
class APIResponse(Generic[T]):
    success: bool
    data: Optional[T] = None
    error: Optional[ErrorInfo] = None
    meta: Optional[PaginationMeta] = None

    @classmethod
    def ok(cls, data: T) -> 'APIResponse[T]':
        return cls(success=True, data=data)

    @classmethod
    def fail(cls, message: str, code: int = 400) -> 'APIResponse[None]':
        return cls(success=False, error=ErrorInfo(message=message, code=code))

    @classmethod
    def paginated(cls, items: List[T], total: int, page: int, page_size: int) -> 'APIResponse[List[T]]':
        return cls(
            success=True,
            data=items,
            meta=PaginationMeta(total=total, page=page, page_size=page_size),
        )

    def to_dict(self) -> Dict[str, Any]:
        # Always emit success, data, and error so every response has the same shape
        result: Dict[str, Any] = {
            "success": self.success,
            "data": self.data,
            "error": None,
        }
        if self.error:
            result["error"] = {"message": self.error.message, "code": self.error.code}
        # meta is only present on paginated responses
        if self.meta:
            result["meta"] = {
                "total": self.meta.total,
                "page": self.meta.page,
                "pages": self.meta.pages,
            }
        return result

print("Success:", APIResponse.ok({"id": 1, "name": "Alice"}).to_dict())
print("Error:", APIResponse.fail("Not found", 404).to_dict())
print("Paged:", APIResponse.paginated([1, 2, 3], total=10, page=1, page_size=3).to_dict())

Envelope pattern: Wrapping all API responses in a consistent {"success": bool, "data": ..., "error": ...} envelope makes client code simpler: clients always know the shape, regardless of which endpoint they hit. In FastAPI, declaring the envelope as a route's response_model has the framework validate and document that shape for the route.
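
The benefit on the client side can be sketched as a single helper that handles every endpoint's response the same way. APIError and unwrap are hypothetical names for this illustration; the envelope keys are the ones described above.

```python
from typing import Any, Dict


class APIError(Exception):
    """Hypothetical client-side exception carrying the envelope's error info."""

    def __init__(self, message: str, code: int) -> None:
        super().__init__(f"{code}: {message}")
        self.code = code


def unwrap(envelope: Dict[str, Any]) -> Any:
    """Return the payload of a successful envelope, or raise APIError."""
    if envelope.get("success"):
        return envelope.get("data")
    error = envelope.get("error") or {}
    raise APIError(error.get("message", "Unknown error"), error.get("code", 500))


print(unwrap({"success": True, "data": {"id": 1, "name": "Alice"}, "error": None}))

try:
    unwrap({"success": False, "data": None, "error": {"message": "Not found", "code": 404}})
except APIError as exc:
    print(f"Request failed: {exc}")
```

Because every endpoint shares the envelope, this one function is all the response handling a client needs, whichever route it calls.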

from dataclasses import dataclass, field
from typing import TypeVar, Generic, Optional, List, Any
from datetime import datetime

T = TypeVar('T')

@dataclass
class APIResponse(Generic[T]):
    """Typed API response envelope.
    
    Supports:
      - APIResponse.ok(data) -> success response
      - APIResponse.fail(message, code) -> error response
      - APIResponse.paginated(items, total, page, page_size) -> paged response
      - .to_dict() -> serializable dict
    """
    pass
Expected Output
Success: {'success': True, 'data': {'id': 1, 'name': 'Alice'}, 'error': None}
Error: {'success': False, 'data': None, 'error': {'message': 'Not found', 'code': 404}}
Paged: {'success': True, 'data': [1, 2, 3], 'meta': {'total': 10, 'page': 1, 'pages': 4}}
Hints

Hint 1: Use Generic[T] so the response can carry any data type with full type hints. The `data` field is Optional[T].

Hint 2: Use @classmethod constructors (ok, fail, paginated); they are more readable than calling __init__ directly. The error constructor is named fail because `error` is already a field on the dataclass. Store pagination metadata in a separate `meta` field.

#9 Background Task Queue (Hard)
asyncio, background-tasks, task-queue, worker-pool

Build an async background task queue that tracks task status, supports concurrent workers, and gracefully shuts down.

import asyncio
import uuid
from typing import Callable, Any
from dataclasses import dataclass
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"

@dataclass
class TaskRecord:
    id: str
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: str = ""

class BackgroundTaskQueue:
    def __init__(self):
        self._queue: asyncio.Queue = asyncio.Queue()
        self._records: dict = {}
        self._workers: list = []
        self._running = False

    def enqueue(self, coro_fn: Callable, *args) -> str:
        task_id = f"task-{len(self._records)}"
        self._records[task_id] = TaskRecord(id=task_id)
        self._queue.put_nowait((task_id, coro_fn, args))
        return task_id

    def status(self, task_id: str) -> TaskRecord:
        return self._records[task_id]

    async def _worker(self):
        while self._running:
            try:
                task_id, coro_fn, args = await asyncio.wait_for(
                    self._queue.get(), timeout=0.1
                )
            except asyncio.TimeoutError:
                continue
            record = self._records[task_id]
            record.status = TaskStatus.RUNNING
            try:
                record.result = await coro_fn(*args)
                record.status = TaskStatus.DONE
            except Exception as e:
                record.status = TaskStatus.FAILED
                record.error = str(e)
            finally:
                self._queue.task_done()

    async def start(self, workers: int = 3) -> None:
        self._running = True
        self._workers = [
            asyncio.create_task(self._worker()) for _ in range(workers)
        ]

    async def stop(self) -> None:
        await self._queue.join()
        self._running = False
        for w in self._workers:
            await w

async def double(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2

async def main():
    queue = BackgroundTaskQueue()
    await queue.start(workers=2)

    ids = [queue.enqueue(double, i + 1) for i in range(3)]
    print(f"Enqueued {len(ids)} tasks")

    await queue.stop()
    print("All tasks completed")

    for task_id in ids:
        rec = queue.status(task_id)
        print(f"Task {rec.id}: {rec.status.value}, result={rec.result}")

asyncio.run(main())
Solution
import asyncio
from typing import Callable, Any
from dataclasses import dataclass
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"

@dataclass
class TaskRecord:
    id: str
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: str = ""

class BackgroundTaskQueue:
    def __init__(self):
        self._queue: asyncio.Queue = asyncio.Queue()
        self._records: dict = {}
        self._workers: list = []
        self._running = False

    def enqueue(self, coro_fn: Callable, *args) -> str:
        task_id = f"task-{len(self._records)}"
        self._records[task_id] = TaskRecord(id=task_id)
        self._queue.put_nowait((task_id, coro_fn, args))
        return task_id

    def status(self, task_id: str) -> TaskRecord:
        return self._records[task_id]

    async def _worker(self):
        while self._running:
            # Poll with a short timeout so the loop can notice _running going False.
            try:
                task_id, coro_fn, args = await asyncio.wait_for(
                    self._queue.get(), timeout=0.1
                )
            except asyncio.TimeoutError:
                continue
            record = self._records[task_id]
            record.status = TaskStatus.RUNNING
            try:
                record.result = await coro_fn(*args)
                record.status = TaskStatus.DONE
            except Exception as e:
                record.status = TaskStatus.FAILED
                record.error = str(e)
            finally:
                # Always mark the item done so queue.join() can return.
                self._queue.task_done()

    async def start(self, workers: int = 3) -> None:
        self._running = True
        self._workers = [asyncio.create_task(self._worker()) for _ in range(workers)]

    async def stop(self) -> None:
        # Drain pending work first, then let the workers exit their loops.
        await self._queue.join()
        self._running = False
        for w in self._workers:
            await w

async def double(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2

async def main():
    queue = BackgroundTaskQueue()
    await queue.start(workers=2)
    ids = [queue.enqueue(double, i + 1) for i in range(3)]
    print(f"Enqueued {len(ids)} tasks")
    await queue.stop()
    print("All tasks completed")
    for task_id in ids:
        rec = queue.status(task_id)
        print(f"Task {rec.id}: {rec.status.value}, result={rec.result}")

asyncio.run(main())

queue.join() semantics: queue.join() blocks until task_done() has been called once for every item that was put into the queue. Awaiting it before clearing the _running flag drains all pending work before the workers exit, which is what makes the shutdown graceful. FastAPI's BackgroundTasks is a simpler mechanism inherited from Starlette: tasks added to it run in order after the response has been sent, with no worker pool or status tracking.
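
The join()/task_done() contract can be seen in isolation with a smaller example. This is a sketch under stated assumptions: it swaps the solution's polling flag for a `None` sentinel per worker (an alternative shutdown technique, not the one used above), and `worker` and `run_demo` are hypothetical names.

```python
import asyncio
from typing import List


async def worker(queue: asyncio.Queue, results: List[int]) -> None:
    while True:
        item = await queue.get()
        try:
            if item is None:  # sentinel: no more work for this worker
                return
            await asyncio.sleep(0.01)  # stand-in for real work
            results.append(item * 2)
        finally:
            # join() only unblocks once task_done() has been called per item.
            queue.task_done()


async def run_demo() -> List[int]:
    queue: asyncio.Queue = asyncio.Queue()
    results: List[int] = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(2)]

    for n in (1, 2, 3):
        queue.put_nowait(n)

    await queue.join()  # returns only after all three items are marked done
    assert len(results) == 3

    for _ in workers:  # one sentinel per worker ends its loop
        queue.put_nowait(None)
    await asyncio.gather(*workers)
    return sorted(results)


demo_results = asyncio.run(run_demo())
print(demo_results)  # [2, 4, 6]
```

With sentinels the workers block on get() without a timeout, at the cost of having to enqueue exactly one sentinel per worker.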

import asyncio
from typing import Callable, Any, Awaitable
from dataclasses import dataclass, field
from enum import Enum
import uuid

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"

@dataclass
class TaskRecord:
    id: str
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: str = ""

class BackgroundTaskQueue:
    """Async background task queue with status tracking.
    
    - enqueue(coro_fn, *args) -> task_id
    - status(task_id) -> TaskRecord
    - start(workers=3) -> launches worker coroutines
    - stop() -> drains queue and shuts down workers
    """
    pass
Expected Output
Enqueued 3 tasks
All tasks completed
Task task-0: done, result=2
Task task-1: done, result=4
Task task-2: done, result=6
Hints

Hint 1: Use `asyncio.Queue` to hold pending tasks as (task_id, coro_fn, args) tuples. Worker coroutines loop on `await queue.get()` and process each task.

Hint 2: Track task records in a dict keyed by task_id. Update status to RUNNING before executing, then DONE or FAILED after.

#10 Request Deduplication Cache (Hard)
idempotency, caching, deduplication, api-patterns

Build an idempotency cache that deduplicates identical API requests within a TTL window, preventing duplicate payments, duplicate user creation, etc.

import asyncio
import time
from typing import Any, Callable, Awaitable, Optional, Tuple

class IdempotencyCache:
    def __init__(self, ttl: float = 60.0):
        self._ttl = ttl
        self._store: dict = {}
        self._locks: dict = {}

    async def execute(
        self,
        idempotency_key: str,
        handler: Callable[..., Awaitable[Any]],
        *args,
    ) -> Tuple[Any, bool]:
        """
        Returns (result, was_cached).
        If was_cached is True, handler was NOT called again.
        """
        now = time.monotonic()

        # Check cache (fast path, no lock)
        if idempotency_key in self._store:
            result, stored_at = self._store[idempotency_key]
            if now - stored_at < self._ttl:
                return result, True

        # Get or create a per-key lock to prevent duplicate execution
        if idempotency_key not in self._locks:
            self._locks[idempotency_key] = asyncio.Lock()
        lock = self._locks[idempotency_key]

        async with lock:
            # Re-check after acquiring lock (another coroutine may have filled it)
            if idempotency_key in self._store:
                result, stored_at = self._store[idempotency_key]
                if time.monotonic() - stored_at < self._ttl:
                    return result, True

            result = await handler(*args)
            self._store[idempotency_key] = (result, time.monotonic())
            return result, False

execution_count = 0

async def charge_card(amount: float) -> dict:
    global execution_count
    execution_count += 1
    return {"charged": amount, "tx_id": f"tx_{execution_count}"}

async def main():
    cache = IdempotencyCache(ttl=0.5)

    result, cached = await cache.execute("req-abc", charge_card, 99.99)
    print(f"First call: {'returned cached result' if cached else 'executed handler'}")

    result2, cached2 = await cache.execute("req-abc", charge_card, 99.99)
    print(f"Second call (same key): {'returned cached result' if cached2 else 'executed handler'}")
    assert result == result2

    result3, cached3 = await cache.execute("req-xyz", charge_card, 49.99)
    print(f"Different key: {'returned cached result' if cached3 else 'executed handler'}")

    await asyncio.sleep(0.6)
    result4, cached4 = await cache.execute("req-abc", charge_card, 99.99)
    print(f"After TTL: {'returned cached result' if cached4 else 'executed handler (cache expired)'}")

asyncio.run(main())
Solution
import asyncio
import time
from typing import Any, Callable, Awaitable, Tuple

class IdempotencyCache:
    def __init__(self, ttl: float = 60.0):
        self._ttl = ttl
        self._store: dict = {}
        self._locks: dict = {}

    async def execute(
        self,
        idempotency_key: str,
        handler: Callable[..., Awaitable[Any]],
        *args,
    ) -> Tuple[Any, bool]:
        now = time.monotonic()

        # Fast path: return a fresh cached result without taking the lock.
        if idempotency_key in self._store:
            result, stored_at = self._store[idempotency_key]
            if now - stored_at < self._ttl:
                return result, True

        # One lock per key, so unrelated keys never wait on each other.
        if idempotency_key not in self._locks:
            self._locks[idempotency_key] = asyncio.Lock()
        lock = self._locks[idempotency_key]

        async with lock:
            # Re-check: another coroutine may have stored a result while we waited.
            if idempotency_key in self._store:
                result, stored_at = self._store[idempotency_key]
                if time.monotonic() - stored_at < self._ttl:
                    return result, True
            result = await handler(*args)
            self._store[idempotency_key] = (result, time.monotonic())
            return result, False

execution_count = 0

async def charge_card(amount: float) -> dict:
    global execution_count
    execution_count += 1
    return {"charged": amount, "tx_id": f"tx_{execution_count}"}

async def main():
    cache = IdempotencyCache(ttl=0.5)
    result, cached = await cache.execute("req-abc", charge_card, 99.99)
    print(f"First call: {'returned cached result' if cached else 'executed handler'}")
    result2, cached2 = await cache.execute("req-abc", charge_card, 99.99)
    print(f"Second call (same key): {'returned cached result' if cached2 else 'executed handler'}")
    result3, cached3 = await cache.execute("req-xyz", charge_card, 49.99)
    print(f"Different key: {'returned cached result' if cached3 else 'executed handler'}")
    await asyncio.sleep(0.6)
    result4, cached4 = await cache.execute("req-abc", charge_card, 99.99)
    print(f"After TTL: {'returned cached result' if cached4 else 'executed handler (cache expired)'}")

asyncio.run(main())

Double-checked locking: Checking the cache before taking the lock and again after acquiring it is the classic double-checked locking pattern. The first check lets cache hits return without touching the lock. The second check, made while holding the lock, stops coroutines that all missed the cache at the same time from each executing the handler: only the first one runs it, and the rest find the stored result.
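
The driver in this exercise only makes sequential calls, so the second check is never exercised. A minimal sketch of the concurrent case follows; `OnceCache` is a hypothetical, stripped-down cache (no TTL) written only to isolate the locking behaviour.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple


class OnceCache:
    """Hypothetical stripped-down cache: no TTL, just double-checked locking."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    async def execute(
        self, key: str, handler: Callable[[], Awaitable[Any]]
    ) -> Tuple[Any, bool]:
        if key in self._store:  # first check, no lock
            return self._store[key], True
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:
            if key in self._store:  # second check, under the lock
                return self._store[key], True
            self._store[key] = await handler()
            return self._store[key], False


async def race() -> Tuple[int, List[Tuple[Any, bool]]]:
    cache = OnceCache()
    calls = 0

    async def slow_handler() -> str:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.05)  # long enough for every caller to miss the first check
        return "charged"

    results = await asyncio.gather(
        *(cache.execute("req-abc", slow_handler) for _ in range(5))
    )
    return calls, list(results)


call_count, outcomes = asyncio.run(race())
print(call_count)                          # 1
print([cached for _, cached in outcomes])  # [False, True, True, True, True]
```

All five coroutines miss the unlocked check, but only the first to acquire the lock runs the handler; the other four hit the second check and return the stored result.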

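Stripe's idempotency: Stripe's API accepts an Idempotency-Key header on POST requests. The result of the first request made with a given key is saved and returned for later requests that reuse the key, so clients can safely retry after a network failure without double-charging customers. Keys become eligible for removal once they are at least 24 hours old.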
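
The starter template mentions a request body and imports hashlib and json, which suggests fingerprinting the body alongside the key. The sketch below is one possible reading of that, not the reference solution: `request_fingerprint`, `check_key`, `KeyReuseError`, and the module-level `seen` dict are all hypothetical names, and the body is assumed to be JSON-serializable.

```python
import hashlib
import json
from typing import Any, Dict


def request_fingerprint(body: Dict[str, Any]) -> str:
    """Stable SHA-256 digest of a JSON-serializable request body."""
    # sort_keys makes the digest independent of dict key order.
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class KeyReuseError(Exception):
    """Hypothetical error: the same key was sent with a different body."""


seen: Dict[str, str] = {}  # idempotency_key -> fingerprint of the first body


def check_key(idempotency_key: str, body: Dict[str, Any]) -> bool:
    """Return False for a new request, True for a retry of a known one."""
    fingerprint = request_fingerprint(body)
    if idempotency_key not in seen:
        seen[idempotency_key] = fingerprint
        return False
    if seen[idempotency_key] != fingerprint:
        raise KeyReuseError(f"{idempotency_key} reused with a different body")
    return True


print(check_key("req-abc", {"amount": 99.99, "currency": "usd"}))  # False
print(check_key("req-abc", {"currency": "usd", "amount": 99.99}))  # True
try:
    check_key("req-abc", {"amount": 49.99, "currency": "usd"})
except KeyReuseError as exc:
    print("rejected:", exc)
```

Rejecting a reused key with a different body keeps a client bug from silently receiving the cached result of an unrelated request.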

import asyncio
import hashlib
import json
import time
from typing import Any, Callable, Optional, Awaitable

class IdempotencyCache:
    """Request deduplication cache for idempotent API operations.
    
    Given an idempotency_key and request body, ensures that
    executing the same request twice returns the cached result
    rather than executing the handler a second time.
    
    TTL-based expiry removes stale entries.
    """
    pass
Expected Output
First call: executed handler
Second call (same key): returned cached result
Different key: executed handler
After TTL: executed handler (cache expired)
Hints

Hint 1: Store {idempotency_key: (result, timestamp)} in a dict. On each call, check if key exists and not expired before running the handler.

Hint 2: Use asyncio.Lock per key to prevent concurrent duplicate requests from both executing the handler simultaneously (the "thundering herd" for the same key).

#11 Health Check Subsystem (Hard)
health-checks, circuit-breaker, observability, production

Build a production health check subsystem that runs component checks concurrently, handles timeouts, and computes an overall system health status.

import asyncio
import time
from typing import Callable, Dict, Any, Awaitable, List, Optional
from dataclasses import dataclass
from enum import Enum

class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

@dataclass
class ComponentHealth:
    name: str
    status: HealthStatus
    latency_ms: Optional[float] = None
    detail: str = ""

@dataclass
class HealthReport:
    overall: HealthStatus
    components: List[ComponentHealth]

    def __str__(self):
        lines = [f"Overall: {self.overall.value}"]
        for c in self.components:
            if c.status == HealthStatus.HEALTHY:
                lines.append(f"{c.name}: healthy ({c.latency_ms:.0f}ms)")
            else:
                lines.append(f"{c.name}: {c.status.value} - {c.detail}")
        return "\n".join(lines)

class HealthChecker:
    def __init__(self, timeout: float = 2.0):
        self._timeout = timeout
        self._checks: List[tuple] = []

    def register(self, name: str, check_fn: Callable, critical: bool = True) -> None:
        self._checks.append((name, check_fn, critical))

    async def _run_one(self, name: str, check_fn: Callable, critical: bool) -> ComponentHealth:
        start = time.monotonic()
        try:
            await asyncio.wait_for(check_fn(), timeout=self._timeout)
            latency = (time.monotonic() - start) * 1000
            return ComponentHealth(name=name, status=HealthStatus.HEALTHY, latency_ms=latency)
        except asyncio.TimeoutError:
            return ComponentHealth(name=name, status=HealthStatus.UNHEALTHY, detail="timeout")
        except Exception as e:
            return ComponentHealth(name=name, status=HealthStatus.UNHEALTHY, detail=str(e))

    async def run_all(self) -> HealthReport:
        results = await asyncio.gather(
            *[self._run_one(name, fn, critical) for name, fn, critical in self._checks],
            return_exceptions=False,
        )

        critical_names = {name for name, _, critical in self._checks if critical}
        overall = HealthStatus.HEALTHY
        for comp in results:
            if comp.status == HealthStatus.UNHEALTHY:
                if comp.name in critical_names:
                    overall = HealthStatus.UNHEALTHY
                    break
                overall = HealthStatus.DEGRADED

        return HealthReport(overall=overall, components=list(results))

async def main():
    async def db_check():
        await asyncio.sleep(0.002)

    async def cache_check():
        await asyncio.sleep(0.001)

    async def failing_cache():
        raise ConnectionError("Connection refused")

    async def slow_db():
        await asyncio.sleep(5.0)

    checker = HealthChecker(timeout=1.0)
    checker.register("db", db_check, critical=True)
    checker.register("cache", cache_check, critical=False)
    print(await checker.run_all())
    print()

    checker2 = HealthChecker(timeout=1.0)
    checker2.register("db", db_check, critical=True)
    checker2.register("cache", failing_cache, critical=False)
    print(await checker2.run_all())
    print()

    checker3 = HealthChecker(timeout=0.1)
    checker3.register("db", slow_db, critical=True)
    checker3.register("cache", cache_check, critical=False)
    print(await checker3.run_all())

asyncio.run(main())
Solution
import asyncio
import time
from typing import Callable, List, Optional
from dataclasses import dataclass
from enum import Enum

class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

@dataclass
class ComponentHealth:
    name: str
    status: HealthStatus
    latency_ms: Optional[float] = None
    detail: str = ""

@dataclass
class HealthReport:
    overall: HealthStatus
    components: List[ComponentHealth]

    def __str__(self):
        lines = [f"Overall: {self.overall.value}"]
        for c in self.components:
            if c.status == HealthStatus.HEALTHY:
                lines.append(f"{c.name}: healthy ({c.latency_ms:.0f}ms)")
            else:
                lines.append(f"{c.name}: {c.status.value} - {c.detail}")
        return "\n".join(lines)

class HealthChecker:
    def __init__(self, timeout: float = 2.0):
        self._timeout = timeout
        self._checks: List[tuple] = []

    def register(self, name: str, check_fn: Callable, critical: bool = True) -> None:
        self._checks.append((name, check_fn, critical))

    async def _run_one(self, name: str, check_fn: Callable, critical: bool) -> ComponentHealth:
        # Every failure mode is converted into a ComponentHealth, so this never raises.
        start = time.monotonic()
        try:
            await asyncio.wait_for(check_fn(), timeout=self._timeout)
            latency = (time.monotonic() - start) * 1000
            return ComponentHealth(name=name, status=HealthStatus.HEALTHY, latency_ms=latency)
        except asyncio.TimeoutError:
            return ComponentHealth(name=name, status=HealthStatus.UNHEALTHY, detail="timeout")
        except Exception as e:
            return ComponentHealth(name=name, status=HealthStatus.UNHEALTHY, detail=str(e))

    async def run_all(self) -> HealthReport:
        # Run all checks concurrently; results come back in registration order.
        results = await asyncio.gather(
            *[self._run_one(name, fn, critical) for name, fn, critical in self._checks],
            return_exceptions=False,
        )
        critical_names = {name for name, _, critical in self._checks if critical}
        overall = HealthStatus.HEALTHY
        for comp in results:
            if comp.status == HealthStatus.UNHEALTHY:
                if comp.name in critical_names:
                    # A critical failure decides the outcome; stop looking.
                    overall = HealthStatus.UNHEALTHY
                    break
                overall = HealthStatus.DEGRADED
        return HealthReport(overall=overall, components=list(results))

async def main():
    async def db_check(): await asyncio.sleep(0.002)
    async def cache_check(): await asyncio.sleep(0.001)
    async def failing_cache(): raise ConnectionError("Connection refused")
    async def slow_db(): await asyncio.sleep(5.0)

    checker = HealthChecker(timeout=1.0)
    checker.register("db", db_check, critical=True)
    checker.register("cache", cache_check, critical=False)
    print(await checker.run_all())
    print()

    checker2 = HealthChecker(timeout=1.0)
    checker2.register("db", db_check, critical=True)
    checker2.register("cache", failing_cache, critical=False)
    print(await checker2.run_all())
    print()

    checker3 = HealthChecker(timeout=0.1)
    checker3.register("db", slow_db, critical=True)
    checker3.register("cache", cache_check, critical=False)
    print(await checker3.run_all())

asyncio.run(main())

Liveness vs readiness: Kubernetes distinguishes liveness checks (is the process alive?) from readiness checks (is it ready to serve traffic?). A failing readiness check removes the pod from its Service endpoints, so it stops receiving traffic; a failing liveness check makes the kubelet restart the container. This HealthChecker implements readiness-style checks. In FastAPI, expose GET /healthz (liveness, always 200) and GET /ready (readiness, uses this checker).
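
How the overall status could map onto those two endpoints is sketched below, framework-free. The function names and response bodies are hypothetical, and treating DEGRADED as still ready (200) is an assumption; a stricter deployment might return 503 for it as well.

```python
from enum import Enum
from typing import Any, Dict, Tuple


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


def liveness_response() -> Tuple[int, Dict[str, Any]]:
    """Liveness: the process can answer at all, so always 200."""
    return 200, {"status": "alive"}


def readiness_response(overall: HealthStatus) -> Tuple[int, Dict[str, Any]]:
    """Readiness: 503 tells the orchestrator to stop routing traffic here."""
    # Assumption: DEGRADED (a non-critical component is down) still serves traffic.
    status_code = 503 if overall is HealthStatus.UNHEALTHY else 200
    return status_code, {"status": overall.value}


print(liveness_response())
print(readiness_response(HealthStatus.DEGRADED))
print(readiness_response(HealthStatus.UNHEALTHY))
```

In a FastAPI app, the /ready handler would await the checker's run_all() and pass the report's overall status to a function like readiness_response.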

import asyncio
import time
from typing import Callable, Dict, Any, Awaitable
from enum import Enum

class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

class HealthChecker:
    """Production health check subsystem.
    
    - register(name, check_fn, critical=True) adds a component check
    - run_all() executes all checks concurrently with a timeout and
      returns a HealthReport (overall status + per-component details)
    - Critical failures -> UNHEALTHY; non-critical -> DEGRADED
    """
    pass
Expected Output
Overall: healthy
db: healthy (2ms)
cache: healthy (1ms)

Overall: degraded
db: healthy (2ms)
cache: unhealthy - Connection refused

Overall: unhealthy
db: unhealthy - timeout
cache: healthy (1ms)

(Latency values are illustrative and vary from run to run.)
Hints

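Hint 1: Use asyncio.gather() to run all checks concurrently. Wrap each check in asyncio.wait_for() to enforce the timeout, and catch exceptions inside the per-check wrapper so that a failing check becomes an UNHEALTHY result instead of propagating out of gather(). (If you do not catch them per check, pass return_exceptions=True and handle the returned exception objects.)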

Hint 2: Compute overall status: if any critical check is unhealthy -> UNHEALTHY; elif any check is unhealthy -> DEGRADED; else HEALTHY.

© 2026 EngineersOfAI. All rights reserved.