Python Build a Production API Platform: Practice Problems & Exercises
Practice: Build a Production API Platform
← Back to lessonEasy
Build a simple RouteRegistry class that stores and lists API route registrations. It should support register(method, path, handler) and list_routes().
from dataclasses import dataclass, field
from typing import Callable, List
@dataclass
class Route:
method: str
path: str
handler: Callable
class RouteRegistry:
def __init__(self):
self.routes: List[Route] = []
def register(self, method: str, path: str, handler: Callable):
self.routes.append(Route(method.upper(), path, handler))
def list_routes(self):
for route in self.routes:
print(f"{route.method} {route.path} -> {route.handler.__name__}")
# Test
registry = RouteRegistry()
def list_users(): pass
def create_user(): pass
def get_user(): pass
def delete_user(): pass
registry.register("GET", "/users", list_users)
registry.register("POST", "/users", create_user)
registry.register("GET", "/users/{id}", get_user)
registry.register("DELETE", "/users/{id}", delete_user)
registry.list_routes()
Solution
from dataclasses import dataclass, field
from typing import Callable, List
@dataclass
class Route:
method: str
path: str
handler: Callable
class RouteRegistry:
def __init__(self):
self.routes: List[Route] = []
def register(self, method: str, path: str, handler: Callable):
self.routes.append(Route(method.upper(), path, handler))
def list_routes(self):
for route in self.routes:
print(f"{route.method} {route.path} -> {route.handler.__name__}")
registry = RouteRegistry()
def list_users(): pass
def create_user(): pass
def get_user(): pass
def delete_user(): pass
registry.register("GET", "/users", list_users)
registry.register("POST", "/users", create_user)
registry.register("GET", "/users/{id}", get_user)
registry.register("DELETE", "/users/{id}", delete_user)
registry.list_routes()
Key insight: A route registry is the core data structure powering every web framework. FastAPI, Flask, and Django all maintain an internal registry mapping (method, path) pairs to handler callables. Understanding this structure helps you reason about how @app.get("/users") decorators work under the hood — they simply call app.register("GET", "/users", handler).
Expected Output
GET /users -> list_users\nPOST /users -> create_user\nGET /users/{id} -> get_user\nDELETE /users/{id} -> delete_userHints
Hint 1: A route registry is just a list of (method, path, handler_name) tuples. Iterate and print each entry.
Hint 2: Use a dataclass or namedtuple to represent a route entry. Group by HTTP method for clarity.
Build a UserCreateRequest validator (without Pydantic) that validates user creation input and returns structured errors.
import re
from dataclasses import dataclass
from typing import Dict, Any, Tuple
@dataclass
class UserCreateRequest:
name: str
email: str
age: int
def validate(self) -> Tuple[bool, Dict[str, Any]]:
errors = {}
if not self.name or not self.name.strip():
errors["name"] = "Name cannot be empty"
if not self.email or "@" not in self.email:
errors["email"] = "Invalid email address"
if not isinstance(self.age, int) or self.age < 0 or self.age > 150:
errors["age"] = "Age must be between 0 and 150"
if errors:
return False, errors
return True, {"name": self.name, "email": self.email, "age": self.age}
# Test
invalid_req = UserCreateRequest(name="", email="not-an-email", age=-1)
ok, result = valid_req.validate()
print(f"Valid: {ok}", result)
ok, result = invalid_req.validate()
print(f"Valid: {ok}", result)
Solution
from dataclasses import dataclass
from typing import Dict, Any, Tuple
@dataclass
class UserCreateRequest:
name: str
email: str
age: int
def validate(self) -> Tuple[bool, Dict[str, Any]]:
errors = {}
if not self.name or not self.name.strip():
errors["name"] = "Name cannot be empty"
if not self.email or "@" not in self.email:
errors["email"] = "Invalid email address"
if not isinstance(self.age, int) or self.age < 0 or self.age > 150:
errors["age"] = "Age must be between 0 and 150"
if errors:
return False, errors
return True, {"name": self.name, "email": self.email, "age": self.age}
invalid_req = UserCreateRequest(name="", email="not-an-email", age=-1)
ok, result = valid_req.validate()
print(f"Valid: {ok}", result)
ok, result = invalid_req.validate()
print(f"Valid: {ok}", result)
Production note: Pydantic does exactly this but with far more power — it coerces types, generates JSON Schema, and integrates with FastAPI's OpenAPI documentation. The pattern here (validate -> return errors dict) mirrors how Pydantic's ValidationError works internally.
Expected Output
Valid: True {'name': 'Alice', 'email': '[email protected]', 'age': 30}\nValid: False {'name': '', 'email': 'not-an-email', 'age': -1}Hints
Hint 1: Use a dataclass with a `validate()` method that checks each field against rules and returns (is_valid, errors) or (is_valid, cleaned_data).
Hint 2: Check: name non-empty, email contains @, age between 0 and 150.
Medium
Build a lightweight dependency injection container that resolves dependencies by name and injects them into functions automatically.
import inspect
from typing import Callable, Any, Dict
class DIContainer:
def __init__(self):
self._factories: Dict[str, Callable] = {}
self._cache: Dict[str, Any] = {}
def register(self, name: str, factory: Callable) -> None:
self._factories[name] = factory
def get(self, name: str) -> Any:
if name not in self._cache:
if name not in self._factories:
raise KeyError(f"No dependency registered for '{name}'")
self._cache[name] = self._factories[name]()
return self._cache[name]
def inject(self, fn: Callable) -> Any:
sig = inspect.signature(fn)
kwargs = {name: self.get(name) for name in sig.parameters}
return fn(**kwargs)
# Test fixtures
class Database:
def __init__(self, url: str):
self.url = url
def __repr__(self):
return f"<Database url={self.url}>"
class Mailer:
def __init__(self, smtp: str):
self.smtp = smtp
def __repr__(self):
return f"<Mailer smtp={self.smtp}>"
container = DIContainer()
container.register("db", lambda: Database("postgresql://localhost/app"))
container.register("mailer", lambda: Mailer("smtp.example.com"))
def create_user(db, mailer):
print(f"db={container.get('db')}")
print(f"mailer={container.get('mailer')}")
print(f"result={container.inject(create_user)}")
Solution
import inspect
from typing import Callable, Any, Dict
class DIContainer:
def __init__(self):
self._factories: Dict[str, Callable] = {}
self._cache: Dict[str, Any] = {}
def register(self, name: str, factory: Callable) -> None:
self._factories[name] = factory
def get(self, name: str) -> Any:
if name not in self._cache:
if name not in self._factories:
raise KeyError(f"No dependency registered for '{name}'")
self._cache[name] = self._factories[name]()
return self._cache[name]
def inject(self, fn: Callable) -> Any:
sig = inspect.signature(fn)
kwargs = {name: self.get(name) for name in sig.parameters}
return fn(**kwargs)
class Database:
def __init__(self, url: str):
self.url = url
def __repr__(self):
return f"<Database url={self.url}>"
class Mailer:
def __init__(self, smtp: str):
self.smtp = smtp
def __repr__(self):
return f"<Mailer smtp={self.smtp}>"
container = DIContainer()
container.register("db", lambda: Database("postgresql://localhost/app"))
container.register("mailer", lambda: Mailer("smtp.example.com"))
def create_user(db, mailer):
print(f"db={container.get('db')}")
print(f"mailer={container.get('mailer')}")
print(f"result={container.inject(create_user)}")
How FastAPI's DI works: FastAPI uses Depends() to declare dependencies in route handlers. When a request arrives, FastAPI inspects the handler's signature, resolves each Depends() by calling the dependency function, and injects the results. This container is the same idea — inspect.signature for introspection, a factory registry, and singleton caching.
from typing import Callable, Any, Dict, Type
class DIContainer:
"""Lightweight dependency injection container.
Supports:
- register(name, factory_fn) to register a factory
- get(name) to resolve and cache a dependency
- inject(fn) to call fn with resolved dependencies
"""
def __init__(self):
self._factories: Dict[str, Callable] = {}
self._cache: Dict[str, Any] = {}
def register(self, name: str, factory: Callable) -> None:
pass
def get(self, name: str) -> Any:
pass
def inject(self, fn: Callable) -> Any:
passExpected Output
db=<Database url=postgresql://localhost/app>\nmailer=<Mailer smtp=smtp.example.com>\nresult=User created: [email protected]Hints
Hint 1: Use `inspect.signature(fn).parameters` to get parameter names, then call `self.get(name)` for each to build the kwargs dict.
Hint 2: Cache resolved dependencies in `self._cache` so factories are only called once (singleton pattern).
Implement a token bucket rate limiter that allows bursting up to capacity requests but limits sustained throughput to refill_rate requests/second.
import time
from typing import Dict, Tuple
class TokenBucketRateLimiter:
def __init__(self, capacity: int, refill_rate: float):
self.capacity = capacity
self.refill_rate = refill_rate
self._buckets: Dict[str, Tuple[float, float]] = {}
def _get_bucket(self, key: str) -> Tuple[float, float]:
if key not in self._buckets:
self._buckets[key] = (float(self.capacity), time.monotonic())
return self._buckets[key]
def is_allowed(self, client_key: str) -> bool:
tokens, last_refill = self._get_bucket(client_key)
now = time.monotonic()
elapsed = now - last_refill
tokens = min(self.capacity, tokens + elapsed * self.refill_rate)
if tokens >= 1:
self._buckets[client_key] = (tokens - 1, now)
return True
self._buckets[client_key] = (tokens, now)
return False
# Test: capacity=5, refill 2/sec
limiter = TokenBucketRateLimiter(capacity=5, refill_rate=2.0)
results = [limiter.is_allowed("user_1") for _ in range(6)]
for i, allowed in enumerate(results, 1):
status = "allowed" if allowed else "rate_limited"
print(f"Request {i}: {status}")
time.sleep(1.0)
print(f"After 1s refill: {'allowed' if limiter.is_allowed('user_1') else 'rate_limited'}")
Solution
import time
from typing import Dict, Tuple
class TokenBucketRateLimiter:
def __init__(self, capacity: int, refill_rate: float):
self.capacity = capacity
self.refill_rate = refill_rate
self._buckets: Dict[str, Tuple[float, float]] = {}
def _get_bucket(self, key: str) -> Tuple[float, float]:
if key not in self._buckets:
self._buckets[key] = (float(self.capacity), time.monotonic())
return self._buckets[key]
def is_allowed(self, client_key: str) -> bool:
tokens, last_refill = self._get_bucket(client_key)
now = time.monotonic()
elapsed = now - last_refill
tokens = min(self.capacity, tokens + elapsed * self.refill_rate)
if tokens >= 1:
self._buckets[client_key] = (tokens - 1, now)
return True
self._buckets[client_key] = (tokens, now)
return False
limiter = TokenBucketRateLimiter(capacity=5, refill_rate=2.0)
results = [limiter.is_allowed("user_1") for _ in range(6)]
for i, allowed in enumerate(results, 1):
status = "allowed" if allowed else "rate_limited"
print(f"Request {i}: {status}")
time.sleep(1.0)
print(f"After 1s refill: {'allowed' if limiter.is_allowed('user_1') else 'rate_limited'}")
Token bucket vs fixed window: A fixed window counter allows a burst of N requests at the end of one window and N at the start of the next (2N effectively). Token bucket naturally smooths this — tokens accumulate over time, so bursting is bounded by capacity and you can never accumulate more than capacity requests to fire at once.
Production note: In distributed systems, store bucket state in Redis (with Lua scripts for atomic compare-and-set) rather than in-process memory.
import time
from typing import Dict
class TokenBucketRateLimiter:
"""Token bucket rate limiter per client key.
Each client gets a bucket of 'capacity' tokens.
Tokens refill at 'refill_rate' tokens/second.
Each request consumes 1 token.
Returns True if request is allowed, False if rate-limited.
"""
def __init__(self, capacity: int, refill_rate: float):
pass
def is_allowed(self, client_key: str) -> bool:
passExpected Output
Requests 1-5: allowed\nRequest 6: rate_limited\nAfter 1s refill: allowedHints
Hint 1: Store (tokens, last_refill_time) per client key. On each request, compute elapsed time, add elapsed * refill_rate tokens (capped at capacity), then check if tokens >= 1.
Hint 2: Use `time.monotonic()` for the timestamp — it never goes backwards, unlike `time.time()`.
Implement a minimal JWT HS256 token creator and verifier from scratch — no external libraries.
import base64
import json
import hmac
import hashlib
from typing import Optional, Dict, Any
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def b64url_decode(s: str) -> bytes:
padding = 4 - len(s) % 4
if padding != 4:
s += "=" * padding
return base64.urlsafe_b64decode(s)
class JWTMiddleware:
@staticmethod
def create_token(payload: Dict[str, Any], secret: str) -> str:
header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
body = b64url_encode(json.dumps(payload).encode())
signing_input = f"{header}.{body}"
sig = hmac.new(
secret.encode(),
signing_input.encode(),
hashlib.sha256,
).digest()
return f"{signing_input}.{b64url_encode(sig)}"
@staticmethod
def verify_token(token: str, secret: str) -> Optional[Dict[str, Any]]:
try:
parts = token.split(".")
if len(parts) != 3:
return None
header_b64, payload_b64, sig_b64 = parts
signing_input = f"{header_b64}.{payload_b64}"
expected_sig = hmac.new(
secret.encode(),
signing_input.encode(),
hashlib.sha256,
).digest()
if not hmac.compare_digest(b64url_decode(sig_b64), expected_sig):
return None
return json.loads(b64url_decode(payload_b64))
except Exception:
return None
# Test
secret = "super-secret-key-do-not-share"
token = JWTMiddleware.create_token({"user_id": 42, "role": "admin"}, secret)
print(f"Token: {token[:20]}...")
print(f"Payload: {JWTMiddleware.verify_token(token, secret)}")
tampered = token[:-5] + "XXXXX"
print(f"Tampered token rejected: {JWTMiddleware.verify_token(tampered, secret) is None}")
Solution
import base64
import json
import hmac
import hashlib
from typing import Optional, Dict, Any
def b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).rstrip(b"=").decode()
def b64url_decode(s: str) -> bytes:
padding = 4 - len(s) % 4
if padding != 4:
s += "=" * padding
return base64.urlsafe_b64decode(s)
class JWTMiddleware:
@staticmethod
def create_token(payload: Dict[str, Any], secret: str) -> str:
header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
body = b64url_encode(json.dumps(payload).encode())
signing_input = f"{header}.{body}"
sig = hmac.new(
secret.encode(),
signing_input.encode(),
hashlib.sha256,
).digest()
return f"{signing_input}.{b64url_encode(sig)}"
@staticmethod
def verify_token(token: str, secret: str) -> Optional[Dict[str, Any]]:
try:
parts = token.split(".")
if len(parts) != 3:
return None
header_b64, payload_b64, sig_b64 = parts
signing_input = f"{header_b64}.{payload_b64}"
expected_sig = hmac.new(
secret.encode(),
signing_input.encode(),
hashlib.sha256,
).digest()
if not hmac.compare_digest(b64url_decode(sig_b64), expected_sig):
return None
return json.loads(b64url_decode(payload_b64))
except Exception:
return None
secret = "super-secret-key-do-not-share"
token = JWTMiddleware.create_token({"user_id": 42, "role": "admin"}, secret)
print(f"Token: {token[:20]}...")
print(f"Payload: {JWTMiddleware.verify_token(token, secret)}")
tampered = token[:-5] + "XXXXX"
print(f"Tampered token rejected: {JWTMiddleware.verify_token(tampered, secret) is None}")
Critical security details:
hmac.compare_digest()uses constant-time comparison to prevent timing attacks. Never use==to compare MAC values.- Always validate the
algheader to prevent the "none" algorithm attack (never acceptalg: none). - In production, add
exp(expiry) andiat(issued-at) claims and validate them on every request.
import base64
import json
import hmac
import hashlib
from typing import Optional, Dict, Any
class JWTMiddleware:
"""Minimal JWT HS256 auth middleware.
Supports:
- create_token(payload, secret) -> token string
- verify_token(token, secret) -> payload dict or None
Does NOT handle expiry (add exp claim checking as an extension).
"""
@staticmethod
def create_token(payload: Dict[str, Any], secret: str) -> str:
pass
@staticmethod
def verify_token(token: str, secret: str) -> Optional[Dict[str, Any]]:
passExpected Output
Token: eyJ...\nPayload: {'user_id': 42, 'role': 'admin'}\nTampered token rejected: TrueHints
Hint 1: JWT = base64url(header) + "." + base64url(payload) + "." + base64url(HMAC-SHA256(header+"."+payload, secret)). Use hmac.new() with hashlib.sha256.
Hint 2: For base64url encoding: replace + with - and / with _ and strip = padding. For decoding, add back = padding before decoding.
Build a middleware pipeline where each middleware can intercept requests and responses, similar to Express.js or ASGI middleware.
from typing import Callable, Dict, Any
class MiddlewareStack:
def __init__(self, final_handler: Callable):
self._handler = final_handler
self._middlewares: list = []
def use(self, middleware: Callable) -> 'MiddlewareStack':
self._middlewares.append(middleware)
return self
def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
handler = self._handler
for mw in reversed(self._middlewares):
next_handler = handler
def make_mw(m, n):
return lambda req: m(req, n)
handler = make_mw(mw, next_handler)
return handler(request)
# Test middlewares
import time
def logging_middleware(request, next_handler):
print(f"[logging] {request['method']} {request['path']}")
response = next_handler(request)
print(f"[logging] Response: {response['status']}")
return response
def auth_middleware(request, next_handler):
token = request.get("headers", {}).get("Authorization", "")
print(f"[auth] Checking token: {token}")
return next_handler(request)
def timing_middleware(request, next_handler):
start = time.perf_counter()
print("[timing] Processing...")
response = next_handler(request)
elapsed = time.perf_counter() - start
print(f"[timing] Response time: {elapsed:.2f}s")
return response
def final_handler(request):
print("Handler: listing users")
return {"status": 200, "body": ["alice", "bob"]}
stack = MiddlewareStack(final_handler)
stack.use(logging_middleware).use(auth_middleware).use(timing_middleware)
stack.handle({
"method": "GET",
"path": "/api/users",
"headers": {"Authorization": "Bearer test123"},
})
Solution
from typing import Callable, Dict, Any
import time
class MiddlewareStack:
def __init__(self, final_handler: Callable):
self._handler = final_handler
self._middlewares: list = []
def use(self, middleware: Callable) -> 'MiddlewareStack':
self._middlewares.append(middleware)
return self
def handle(self, request: Dict[str, Any]) -> Dict[str, Any]:
handler = self._handler
for mw in reversed(self._middlewares):
next_handler = handler
def make_mw(m, n):
return lambda req: m(req, n)
handler = make_mw(mw, next_handler)
return handler(request)
def logging_middleware(request, next_handler):
print(f"[logging] {request['method']} {request['path']}")
response = next_handler(request)
print(f"[logging] Response: {response['status']}")
return response
def auth_middleware(request, next_handler):
token = request.get("headers", {}).get("Authorization", "")
print(f"[auth] Checking token: {token}")
return next_handler(request)
def timing_middleware(request, next_handler):
start = time.perf_counter()
print("[timing] Processing...")
response = next_handler(request)
elapsed = time.perf_counter() - start
print(f"[timing] Response time: {elapsed:.2f}s")
return response
def final_handler(request):
print("Handler: listing users")
return {"status": 200, "body": ["alice", "bob"]}
stack = MiddlewareStack(final_handler)
stack.use(logging_middleware).use(auth_middleware).use(timing_middleware)
stack.handle({
"method": "GET",
"path": "/api/users",
"headers": {"Authorization": "Bearer test123"},
})
Execution order: Middlewares are applied outermost-first (the first use() call wraps the outermost). Request flows in: logging -> auth -> timing -> handler. Response flows out: timing -> auth -> logging. This is the classic "onion model" used by Express, Koa, Django, and Starlette/FastAPI.
Common closure bug: Without make_mw(), all closures in the for loop would capture the same mw variable (its final value). Always bind loop variables explicitly when building closure chains.
from typing import Callable, List, Any
class MiddlewareStack:
"""WSGI/ASGI-style middleware pipeline.
Middleware functions have signature:
middleware(request, next_handler) -> response
They can modify request before calling next_handler,
modify response after, or short-circuit entirely.
"""
def __init__(self, final_handler: Callable):
pass
def use(self, middleware: Callable) -> 'MiddlewareStack':
pass
def handle(self, request: dict) -> dict:
passExpected Output
[logging] GET /api/users\n[auth] Checking token: Bearer test123\n[timing] Processing...\nHandler: listing users\n[timing] Response time: 0.00s\n[logging] Response: 200Hints
Hint 1: Store middleware in a list. When handling a request, build a chain of callables: each middleware gets a `next` that calls the next middleware in the chain (or the final handler).
Hint 2: Build the chain by iterating middleware in reverse. Start with the final_handler, then wrap each middleware around it using a closure that captures the current `next`.
Hard
Build an async connection pool that manages a fixed set of database connections with acquire/release semantics and proper async context manager support.
import asyncio
from contextlib import asynccontextmanager
from typing import Any
class FakeConnection:
"""Simulates a database connection."""
_counter = 0
def __init__(self):
FakeConnection._counter += 1
self.id = f"conn-{FakeConnection._counter - 1}"
self.closed = False
async def close(self):
self.closed = True
class AsyncConnectionPool:
def __init__(self, factory, min_size: int = 2, max_size: int = 10):
self._factory = factory
self._min_size = min_size
self._max_size = max_size
self._pool: asyncio.Queue = asyncio.Queue(maxsize=max_size)
self._all_connections: list = []
async def start(self) -> None:
for _ in range(self._min_size):
conn = await asyncio.coroutine(self._factory)() if asyncio.iscoroutinefunction(self._factory) else self._factory()
self._all_connections.append(conn)
await self._pool.put(conn)
print(f"Pool started with {self._min_size} connections")
async def acquire(self, timeout: float = 5.0):
try:
conn = await asyncio.wait_for(self._pool.get(), timeout=timeout)
return conn
except asyncio.TimeoutError:
raise TimeoutError("No connection available in pool")
async def release(self, conn) -> None:
await self._pool.put(conn)
@asynccontextmanager
async def connection(self, timeout: float = 5.0):
conn = await self.acquire(timeout)
try:
yield conn
finally:
await self.release(conn)
async def close(self) -> None:
for conn in self._all_connections:
await conn.close()
print("Pool closed")
async def main():
pool = AsyncConnectionPool(factory=FakeConnection, min_size=2)
await pool.start()
conn1 = await pool.acquire()
conn2 = await pool.acquire()
print(f"Acquired: {conn1.id}")
print(f"Acquired: {conn2.id}")
await pool.release(conn1)
print(f"Released: {conn1.id}")
conn3 = await pool.acquire()
print(f"Acquired: {conn3.id} (reused)")
await pool.release(conn2)
await pool.release(conn3)
await pool.close()
asyncio.run(main())
Solution
import asyncio
from contextlib import asynccontextmanager
class FakeConnection:
_counter = 0
def __init__(self):
FakeConnection._counter += 1
self.id = f"conn-{FakeConnection._counter - 1}"
self.closed = False
async def close(self):
self.closed = True
class AsyncConnectionPool:
def __init__(self, factory, min_size: int = 2, max_size: int = 10):
self._factory = factory
self._min_size = min_size
self._max_size = max_size
self._pool: asyncio.Queue = asyncio.Queue(maxsize=max_size)
self._all_connections: list = []
async def start(self) -> None:
for _ in range(self._min_size):
conn = self._factory()
self._all_connections.append(conn)
await self._pool.put(conn)
print(f"Pool started with {self._min_size} connections")
async def acquire(self, timeout: float = 5.0):
try:
return await asyncio.wait_for(self._pool.get(), timeout=timeout)
except asyncio.TimeoutError:
raise TimeoutError("No connection available in pool")
async def release(self, conn) -> None:
await self._pool.put(conn)
@asynccontextmanager
async def connection(self, timeout: float = 5.0):
conn = await self.acquire(timeout)
try:
yield conn
finally:
await self.release(conn)
async def close(self) -> None:
for conn in self._all_connections:
await conn.close()
print("Pool closed")
async def main():
pool = AsyncConnectionPool(factory=FakeConnection, min_size=2)
await pool.start()
conn1 = await pool.acquire()
conn2 = await pool.acquire()
print(f"Acquired: {conn1.id}")
print(f"Acquired: {conn2.id}")
await pool.release(conn1)
print(f"Released: {conn1.id}")
conn3 = await pool.acquire()
print(f"Acquired: {conn3.id} (reused)")
await pool.release(conn2)
await pool.release(conn3)
await pool.close()
asyncio.run(main())
Why asyncio.Queue: asyncio.Queue is the ideal backing store for a connection pool. await queue.get() suspends the coroutine (yielding control to the event loop) if the queue is empty, then automatically resumes it when a connection becomes available via queue.put(). This gives you correct async backpressure with zero busy-waiting.
Production additions: health-check pings on borrowed connections, max connection lifetime (recycle old connections), circuit-breaker integration if the database is unreachable.
import asyncio
from typing import Optional, List
class AsyncConnectionPool:
"""Async connection pool with acquire/release semantics.
- Pool pre-creates min_size connections on startup
- acquire() waits if no connection is available (up to timeout)
- release() returns connection to pool
- Supports async context manager: async with pool.acquire() as conn
- close() drains and closes all connections
"""
def __init__(self, factory, min_size: int = 2, max_size: int = 10):
pass
async def start(self) -> None:
pass
async def acquire(self, timeout: float = 5.0):
pass
async def release(self, conn) -> None:
pass
async def close(self) -> None:
passExpected Output
Pool started with 2 connections\nAcquired: conn-0\nAcquired: conn-1\nReleased: conn-0\nAcquired: conn-0 (reused)\nPool closedHints
Hint 1: Use `asyncio.Queue` as the pool storage. Put connections in, get them out. `asyncio.wait_for(queue.get(), timeout)` handles the timeout case.
Hint 2: The context manager should yield the connection and call release() in the __aexit__ method, even if an exception occurs.
Build a generic typed API response envelope used across all routes to ensure consistent JSON output shape.
from dataclasses import dataclass, field
from typing import TypeVar, Generic, Optional, List, Any, Dict
import math
T = TypeVar('T')
@dataclass
class ErrorInfo:
message: str
code: int
@dataclass
class PaginationMeta:
total: int
page: int
page_size: int
@property
def pages(self) -> int:
return math.ceil(self.total / self.page_size)
@dataclass
class APIResponse(Generic[T]):
success: bool
data: Optional[T] = None
error: Optional[ErrorInfo] = None
meta: Optional[PaginationMeta] = None
@classmethod
def ok(cls, data: T) -> 'APIResponse[T]':
return cls(success=True, data=data)
@classmethod
def fail(cls, message: str, code: int = 400) -> 'APIResponse[None]':
return cls(success=False, error=ErrorInfo(message=message, code=code))
@classmethod
def paginated(cls, items: List[T], total: int, page: int, page_size: int) -> 'APIResponse[List[T]]':
return cls(
success=True,
data=items,
meta=PaginationMeta(total=total, page=page, page_size=page_size),
)
def to_dict(self) -> Dict[str, Any]:
result: Dict[str, Any] = {"success": self.success}
if self.data is not None:
result["data"] = self.data
if self.error:
result["error"] = {"message": self.error.message, "code": self.error.code}
if not self.error:
result.setdefault("data", None)
result.setdefault("error", None)
if self.meta:
result["meta"] = {
"total": self.meta.total,
"page": self.meta.page,
"pages": self.meta.pages,
}
return result
# Test
print("Success:", APIResponse.ok({"id": 1, "name": "Alice"}).to_dict())
print("Error:", APIResponse.fail("Not found", 404).to_dict())
print("Paged:", APIResponse.paginated([1, 2, 3], total=10, page=1, page_size=3).to_dict())
Solution
from dataclasses import dataclass
from typing import TypeVar, Generic, Optional, List, Any, Dict
import math
T = TypeVar('T')
@dataclass
class ErrorInfo:
message: str
code: int
@dataclass
class PaginationMeta:
total: int
page: int
page_size: int
@property
def pages(self) -> int:
return math.ceil(self.total / self.page_size)
@dataclass
class APIResponse(Generic[T]):
success: bool
data: Optional[T] = None
error: Optional[ErrorInfo] = None
meta: Optional[PaginationMeta] = None
@classmethod
def ok(cls, data: T) -> 'APIResponse[T]':
return cls(success=True, data=data)
@classmethod
def fail(cls, message: str, code: int = 400) -> 'APIResponse[None]':
return cls(success=False, error=ErrorInfo(message=message, code=code))
@classmethod
def paginated(cls, items: List[T], total: int, page: int, page_size: int) -> 'APIResponse[List[T]]':
return cls(
success=True,
data=items,
meta=PaginationMeta(total=total, page=page, page_size=page_size),
)
def to_dict(self) -> Dict[str, Any]:
result: Dict[str, Any] = {"success": self.success}
if self.data is not None:
result["data"] = self.data
if self.error:
result["error"] = {"message": self.error.message, "code": self.error.code}
if not self.error:
result.setdefault("data", None)
result.setdefault("error", None)
if self.meta:
result["meta"] = {
"total": self.meta.total,
"page": self.meta.page,
"pages": self.meta.pages,
}
return result
print("Success:", APIResponse.ok({"id": 1, "name": "Alice"}).to_dict())
print("Error:", APIResponse.fail("Not found", 404).to_dict())
print("Paged:", APIResponse.paginated([1, 2, 3], total=10, page=1, page_size=3).to_dict())
Envelope pattern: Wrapping all API responses in a consistent {"success": bool, "data": ..., "error": ...} envelope makes client code simpler — clients always know the shape, regardless of which endpoint they hit. FastAPI's response_model parameter enforces this at the framework level.
from dataclasses import dataclass, field
from typing import TypeVar, Generic, Optional, List, Any
from datetime import datetime
T = TypeVar('T')
@dataclass
class APIResponse(Generic[T]):
"""Typed API response envelope.
Supports:
- APIResponse.ok(data) -> success response
- APIResponse.error(message, code) -> error response
- APIResponse.paginated(items, total, page, page_size) -> paged response
- .to_dict() -> serializable dict
"""
passExpected Output
Success: {'success': True, 'data': {'id': 1, 'name': 'Alice'}, 'error': None}\nError: {'success': False, 'data': None, 'error': {'message': 'Not found', 'code': 404}}\nPaged: {'success': True, 'data': [1, 2, 3], 'meta': {'total': 10, 'page': 1, 'pages': 4}}Hints
Hint 1: Use Generic[T] so the response can carry any data type with full type hints. The `data` field is Optional[T].
Hint 2: Use @classmethod constructors (ok, error, paginated) — they are more readable than calling __init__ directly. Store pagination metadata in a separate `meta` field.
Build an async background task queue that tracks task status, supports concurrent workers, and gracefully shuts down.
import asyncio
import uuid
from typing import Callable, Any
from dataclasses import dataclass
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
DONE = "done"
FAILED = "failed"
@dataclass
class TaskRecord:
id: str
status: TaskStatus = TaskStatus.PENDING
result: Any = None
error: str = ""
class BackgroundTaskQueue:
def __init__(self):
self._queue: asyncio.Queue = asyncio.Queue()
self._records: dict = {}
self._workers: list = []
self._running = False
def enqueue(self, coro_fn: Callable, *args) -> str:
task_id = f"task-{len(self._records)}"
self._records[task_id] = TaskRecord(id=task_id)
self._queue.put_nowait((task_id, coro_fn, args))
return task_id
def status(self, task_id: str) -> TaskRecord:
return self._records[task_id]
async def _worker(self):
while self._running:
try:
task_id, coro_fn, args = await asyncio.wait_for(
self._queue.get(), timeout=0.1
)
except asyncio.TimeoutError:
continue
record = self._records[task_id]
record.status = TaskStatus.RUNNING
try:
record.result = await coro_fn(*args)
record.status = TaskStatus.DONE
except Exception as e:
record.status = TaskStatus.FAILED
record.error = str(e)
finally:
self._queue.task_done()
async def start(self, workers: int = 3) -> None:
self._running = True
self._workers = [
asyncio.create_task(self._worker()) for _ in range(workers)
]
async def stop(self) -> None:
await self._queue.join()
self._running = False
for w in self._workers:
await w
async def double(x: int) -> int:
await asyncio.sleep(0.01)
return x * 2
async def main():
queue = BackgroundTaskQueue()
await queue.start(workers=2)
ids = [queue.enqueue(double, i + 1) for i in range(3)]
print(f"Enqueued {len(ids)} tasks")
await queue.stop()
print("All tasks completed")
for task_id in ids:
rec = queue.status(task_id)
print(f"Task {rec.id}: {rec.status.value}, result={rec.result}")
asyncio.run(main())
Solution
import asyncio
from typing import Callable, Any
from dataclasses import dataclass
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
DONE = "done"
FAILED = "failed"
@dataclass
class TaskRecord:
id: str
status: TaskStatus = TaskStatus.PENDING
result: Any = None
error: str = ""
class BackgroundTaskQueue:
def __init__(self):
self._queue: asyncio.Queue = asyncio.Queue()
self._records: dict = {}
self._workers: list = []
self._running = False
def enqueue(self, coro_fn: Callable, *args) -> str:
task_id = f"task-{len(self._records)}"
self._records[task_id] = TaskRecord(id=task_id)
self._queue.put_nowait((task_id, coro_fn, args))
return task_id
def status(self, task_id: str) -> TaskRecord:
return self._records[task_id]
async def _worker(self):
while self._running:
try:
task_id, coro_fn, args = await asyncio.wait_for(
self._queue.get(), timeout=0.1
)
except asyncio.TimeoutError:
continue
record = self._records[task_id]
record.status = TaskStatus.RUNNING
try:
record.result = await coro_fn(*args)
record.status = TaskStatus.DONE
except Exception as e:
record.status = TaskStatus.FAILED
record.error = str(e)
finally:
self._queue.task_done()
async def start(self, workers: int = 3) -> None:
self._running = True
self._workers = [asyncio.create_task(self._worker()) for _ in range(workers)]
async def stop(self) -> None:
await self._queue.join()
self._running = False
for w in self._workers:
await w
async def double(x: int) -> int:
await asyncio.sleep(0.01)
return x * 2
async def main():
queue = BackgroundTaskQueue()
await queue.start(workers=2)
ids = [queue.enqueue(double, i + 1) for i in range(3)]
print(f"Enqueued {len(ids)} tasks")
await queue.stop()
print("All tasks completed")
for task_id in ids:
rec = queue.status(task_id)
print(f"Task {rec.id}: {rec.status.value}, result={rec.result}")
asyncio.run(main())
queue.join() semantics: queue.join() blocks until every item placed in the queue has been retrieved AND task_done() has been called for it. This is the correct shutdown mechanism — it drains all pending work before stopping workers. FastAPI's BackgroundTasks works similarly but delegates to Starlette's task runner.
import asyncio
from typing import Callable, Any, Awaitable
from dataclasses import dataclass, field
from enum import Enum
import uuid
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
DONE = "done"
FAILED = "failed"
@dataclass
class TaskRecord:
id: str
status: TaskStatus = TaskStatus.PENDING
result: Any = None
error: str = ""
class BackgroundTaskQueue:
"""Async background task queue with status tracking.
- enqueue(coro_fn, *args) -> task_id
- status(task_id) -> TaskRecord
- start(workers=3) -> launches worker coroutines
- stop() -> drains queue and shuts down workers
"""
passExpected Output
Enqueued 3 tasks\nAll tasks completed\nTask task-0: done, result=2\nTask task-1: done, result=4\nTask task-2: done, result=6Hints
Hint 1: Use `asyncio.Queue` to hold pending tasks as (task_id, coro_fn, args) tuples. Worker coroutines loop on `await queue.get()` and process each task.
Hint 2: Track task records in a dict keyed by task_id. Update status to RUNNING before executing, then DONE or FAILED after.
Build an idempotency cache that deduplicates identical API requests within a TTL window, preventing duplicate payments, duplicate user creation, etc.
import asyncio
import time
from typing import Any, Callable, Awaitable, Optional, Tuple
class IdempotencyCache:
def __init__(self, ttl: float = 60.0):
self._ttl = ttl
self._store: dict = {}
self._locks: dict = {}
async def execute(
self,
idempotency_key: str,
handler: Callable[..., Awaitable[Any]],
*args,
) -> Tuple[Any, bool]:
"""
Returns (result, was_cached).
If was_cached is True, handler was NOT called again.
"""
now = time.monotonic()
# Check cache (fast path, no lock)
if idempotency_key in self._store:
result, stored_at = self._store[idempotency_key]
if now - stored_at < self._ttl:
return result, True
# Get or create a per-key lock to prevent duplicate execution
if idempotency_key not in self._locks:
self._locks[idempotency_key] = asyncio.Lock()
lock = self._locks[idempotency_key]
async with lock:
# Re-check after acquiring lock (another coroutine may have filled it)
if idempotency_key in self._store:
result, stored_at = self._store[idempotency_key]
if time.monotonic() - stored_at < self._ttl:
return result, True
result = await handler(*args)
self._store[idempotency_key] = (result, time.monotonic())
return result, False
execution_count = 0
async def charge_card(amount: float) -> dict:
global execution_count
execution_count += 1
return {"charged": amount, "tx_id": f"tx_{execution_count}"}
async def main():
cache = IdempotencyCache(ttl=0.5)
result, cached = await cache.execute("req-abc", charge_card, 99.99)
print(f"First call: {'returned cached result' if cached else 'executed handler'}")
result2, cached2 = await cache.execute("req-abc", charge_card, 99.99)
print(f"Second call (same key): {'returned cached result' if cached2 else 'executed handler'}")
assert result == result2
result3, cached3 = await cache.execute("req-xyz", charge_card, 49.99)
print(f"Different key: {'returned cached result' if cached3 else 'executed handler'}")
await asyncio.sleep(0.6)
result4, cached4 = await cache.execute("req-abc", charge_card, 99.99)
print(f"After TTL: {'returned cached result' if cached4 else 'executed handler (cache expired)'}")
asyncio.run(main())
Solution
import asyncio
import time
from typing import Any, Callable, Awaitable, Tuple
class IdempotencyCache:
def __init__(self, ttl: float = 60.0):
self._ttl = ttl
self._store: dict = {}
self._locks: dict = {}
async def execute(
self,
idempotency_key: str,
handler: Callable[..., Awaitable[Any]],
*args,
) -> Tuple[Any, bool]:
now = time.monotonic()
if idempotency_key in self._store:
result, stored_at = self._store[idempotency_key]
if now - stored_at < self._ttl:
return result, True
if idempotency_key not in self._locks:
self._locks[idempotency_key] = asyncio.Lock()
lock = self._locks[idempotency_key]
async with lock:
if idempotency_key in self._store:
result, stored_at = self._store[idempotency_key]
if time.monotonic() - stored_at < self._ttl:
return result, True
result = await handler(*args)
self._store[idempotency_key] = (result, time.monotonic())
return result, False
execution_count = 0
async def charge_card(amount: float) -> dict:
global execution_count
execution_count += 1
return {"charged": amount, "tx_id": f"tx_{execution_count}"}
async def main():
cache = IdempotencyCache(ttl=0.5)
result, cached = await cache.execute("req-abc", charge_card, 99.99)
print(f"First call: {'returned cached result' if cached else 'executed handler'}")
result2, cached2 = await cache.execute("req-abc", charge_card, 99.99)
print(f"Second call (same key): {'returned cached result' if cached2 else 'executed handler'}")
result3, cached3 = await cache.execute("req-xyz", charge_card, 49.99)
print(f"Different key: {'returned cached result' if cached3 else 'executed handler'}")
await asyncio.sleep(0.6)
result4, cached4 = await cache.execute("req-abc", charge_card, 99.99)
print(f"After TTL: {'returned cached result' if cached4 else 'executed handler (cache expired)'}")
asyncio.run(main())
Double-checked locking: The pattern here — check before lock, re-check after lock — is a classic double-checked locking pattern. The first check avoids lock contention on the happy path (cache hit). The second check inside the lock prevents two coroutines that both missed the cache from both executing the handler.
Stripe's idempotency: Stripe requires clients to pass Idempotency-Key headers for payment operations. Their implementation stores results for 24 hours, allowing safe retries on network failures without double-charging customers.
import asyncio
import hashlib
import json
import time
from typing import Any, Callable, Optional, Awaitable
class IdempotencyCache:
"""Request deduplication cache for idempotent API operations.
Given an idempotency_key and request body, ensures that
executing the same request twice returns the cached result
rather than executing the handler a second time.
TTL-based expiry removes stale entries.
"""
passExpected Output
First call: executed handler\nSecond call (same key): returned cached result\nDifferent key: executed handler\nAfter TTL: executed handler (cache expired)Hints
Hint 1: Store {idempotency_key: (result, timestamp)} in a dict. On each call, check if key exists and not expired before running the handler.
Hint 2: Use asyncio.Lock per key to prevent concurrent duplicate requests from both executing the handler simultaneously (the "thundering herd" for the same key).
Build a production health check subsystem that runs component checks concurrently, handles timeouts, and computes an overall system health status.
import asyncio
import time
from typing import Callable, Dict, Any, Awaitable, List, Optional
from dataclasses import dataclass
from enum import Enum
class HealthStatus(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
@dataclass
class ComponentHealth:
name: str
status: HealthStatus
latency_ms: Optional[float] = None
detail: str = ""
@dataclass
class HealthReport:
overall: HealthStatus
components: List[ComponentHealth]
def __str__(self):
lines = [f"Overall: {self.overall.value}"]
for c in self.components:
if c.status == HealthStatus.HEALTHY:
lines.append(f"{c.name}: healthy ({c.latency_ms:.0f}ms)")
else:
lines.append(f"{c.name}: {c.status.value} - {c.detail}")
return "\n".join(lines)
class HealthChecker:
def __init__(self, timeout: float = 2.0):
self._timeout = timeout
self._checks: List[tuple] = []
def register(self, name: str, check_fn: Callable, critical: bool = True) -> None:
self._checks.append((name, check_fn, critical))
async def _run_one(self, name: str, check_fn: Callable, critical: bool) -> ComponentHealth:
start = time.monotonic()
try:
await asyncio.wait_for(check_fn(), timeout=self._timeout)
latency = (time.monotonic() - start) * 1000
return ComponentHealth(name=name, status=HealthStatus.HEALTHY, latency_ms=latency)
except asyncio.TimeoutError:
return ComponentHealth(name=name, status=HealthStatus.UNHEALTHY, detail="timeout")
except Exception as e:
return ComponentHealth(name=name, status=HealthStatus.UNHEALTHY, detail=str(e))
async def run_all(self) -> HealthReport:
results = await asyncio.gather(
*[self._run_one(name, fn, critical) for name, fn, critical in self._checks],
return_exceptions=False,
)
critical_names = {name for name, _, critical in self._checks if critical}
overall = HealthStatus.HEALTHY
for comp in results:
if comp.status == HealthStatus.UNHEALTHY:
if comp.name in critical_names:
overall = HealthStatus.UNHEALTHY
break
overall = HealthStatus.DEGRADED
return HealthReport(overall=overall, components=list(results))
async def main():
async def db_check():
await asyncio.sleep(0.002)
async def cache_check():
await asyncio.sleep(0.001)
async def failing_cache():
raise ConnectionError("Connection refused")
async def slow_db():
await asyncio.sleep(5.0)
checker = HealthChecker(timeout=1.0)
checker.register("db", db_check, critical=True)
checker.register("cache", cache_check, critical=False)
print(await checker.run_all())
print()
checker2 = HealthChecker(timeout=1.0)
checker2.register("db", db_check, critical=True)
checker2.register("cache", failing_cache, critical=False)
print(await checker2.run_all())
print()
checker3 = HealthChecker(timeout=0.1)
checker3.register("db", slow_db, critical=True)
checker3.register("cache", cache_check, critical=False)
print(await checker3.run_all())
asyncio.run(main())
Solution
import asyncio
import time
from typing import Callable, List, Optional
from dataclasses import dataclass
from enum import Enum
class HealthStatus(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
@dataclass
class ComponentHealth:
name: str
status: HealthStatus
latency_ms: Optional[float] = None
detail: str = ""
@dataclass
class HealthReport:
overall: HealthStatus
components: List[ComponentHealth]
def __str__(self):
lines = [f"Overall: {self.overall.value}"]
for c in self.components:
if c.status == HealthStatus.HEALTHY:
lines.append(f"{c.name}: healthy ({c.latency_ms:.0f}ms)")
else:
lines.append(f"{c.name}: {c.status.value} - {c.detail}")
return "\n".join(lines)
class HealthChecker:
def __init__(self, timeout: float = 2.0):
self._timeout = timeout
self._checks: List[tuple] = []
def register(self, name: str, check_fn: Callable, critical: bool = True) -> None:
self._checks.append((name, check_fn, critical))
async def _run_one(self, name: str, check_fn: Callable, critical: bool) -> ComponentHealth:
start = time.monotonic()
try:
await asyncio.wait_for(check_fn(), timeout=self._timeout)
latency = (time.monotonic() - start) * 1000
return ComponentHealth(name=name, status=HealthStatus.HEALTHY, latency_ms=latency)
except asyncio.TimeoutError:
return ComponentHealth(name=name, status=HealthStatus.UNHEALTHY, detail="timeout")
except Exception as e:
return ComponentHealth(name=name, status=HealthStatus.UNHEALTHY, detail=str(e))
async def run_all(self) -> HealthReport:
results = await asyncio.gather(
*[self._run_one(name, fn, critical) for name, fn, critical in self._checks],
return_exceptions=False,
)
critical_names = {name for name, _, critical in self._checks if critical}
overall = HealthStatus.HEALTHY
for comp in results:
if comp.status == HealthStatus.UNHEALTHY:
if comp.name in critical_names:
overall = HealthStatus.UNHEALTHY
break
overall = HealthStatus.DEGRADED
return HealthReport(overall=overall, components=list(results))
async def main():
async def db_check(): await asyncio.sleep(0.002)
async def cache_check(): await asyncio.sleep(0.001)
async def failing_cache(): raise ConnectionError("Connection refused")
async def slow_db(): await asyncio.sleep(5.0)
checker = HealthChecker(timeout=1.0)
checker.register("db", db_check, critical=True)
checker.register("cache", cache_check, critical=False)
print(await checker.run_all())
print()
checker2 = HealthChecker(timeout=1.0)
checker2.register("db", db_check, critical=True)
checker2.register("cache", failing_cache, critical=False)
print(await checker2.run_all())
print()
checker3 = HealthChecker(timeout=0.1)
checker3.register("db", slow_db, critical=True)
checker3.register("cache", cache_check, critical=False)
print(await checker3.run_all())
asyncio.run(main())
Liveness vs readiness: Kubernetes distinguishes liveness checks (is the process alive?) from readiness checks (is it ready to serve traffic?). A readiness check failing removes the pod from the load balancer; a liveness check failing causes a pod restart. This HealthChecker implements readiness-style checks. In FastAPI, expose GET /healthz (liveness, always 200) and GET /ready (readiness, uses this checker).
import asyncio
import time
from typing import Callable, Dict, Any, Awaitable
from enum import Enum
class HealthStatus(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
class HealthChecker:
"""Production health check subsystem.
- register(name, check_fn, critical=False) adds a component check
- run_all() executes all checks concurrently with timeout
- get_report() returns overall status + per-component details
- Critical failures -> UNHEALTHY; non-critical -> DEGRADED
"""
passExpected Output
Overall: healthy\ndb: healthy (2ms)\ncache: healthy (1ms)\nOverall: degraded\ndb: healthy (2ms)\ncache: unhealthy - Connection refused\nOverall: unhealthy\ndb: unhealthy - timeout\ncache: healthy (1ms)Hints
Hint 1: Use asyncio.gather() with return_exceptions=True to run all checks concurrently. Wrap each check in asyncio.wait_for() to enforce the timeout.
Hint 2: Compute overall status: if any critical check is unhealthy -> UNHEALTHY; elif any check is unhealthy -> DEGRADED; else HEALTHY.
