Python Building an Async API Service: Practice Problems & Exercises
Practice: Building an Async API Service
← Back to lesson
:::note Environment
Problems in this set use pure asyncio patterns runnable without FastAPI. Where FastAPI-specific patterns are taught, equivalent asyncio patterns are demonstrated with notes about the FastAPI equivalent.
:::
Write an async request handler that simulates an async database lookup.
Solution:
import asyncio
# In-memory stand-in for a users table: user id -> display name.
DB = {1: 'Alice', 2: 'Bob', 3: 'Carol'}


async def get_user(user_id: int):
    """Look up a user by id, simulating the latency of an async DB query.

    Returns ``{'id': user_id, 'name': name}`` when the id exists in ``DB``,
    otherwise ``{'error': 'not found'}``.
    """
    # Yield to the event loop for 50 ms, as a real driver would while
    # waiting on the network.
    await asyncio.sleep(0.05)
    found = DB.get(user_id)
    if found is not None:
        return {'id': user_id, 'name': found}
    return {'error': 'not found'}
import asyncio
# Simulate a FastAPI-style async route handler.
# In FastAPI: @app.get('/users/{user_id}')
# async def get_user(user_id: int): ...
DB = {1: 'Alice', 2: 'Bob', 3: 'Carol'}
async def get_user(user_id: int):
# TODO: Simulate an async DB lookup (sleep 0.05s) and return
# {'id': user_id, 'name': name} or {'error': 'not found'}
pass
async def main():
r1 = await get_user(1)
r2 = await get_user(99)
print(r1)
print(r2)
asyncio.run(main())
Expected Output
{'id': 1, 'name': 'Alice'}
{'error': 'not found'}
Hints
Hint 1: await asyncio.sleep(0.05) simulates an async database query.
Hint 2: Return the dict directly — FastAPI would serialize it to JSON automatically.
Hint 3: DB.get(user_id) returns None if not found.
Demonstrate the throughput difference between serial and concurrent async request handling.
Solution:
async def process_requests_concurrent(n):
    """Handle ``n`` requests concurrently and time the whole batch.

    Returns a ``(results, elapsed_seconds)`` tuple; ``results`` is a list in
    request-id order because ``asyncio.gather`` preserves argument order.
    """
    started = time.perf_counter()
    # Create every coroutine up front so gather can schedule them together.
    pending = [handle_request(request_id) for request_id in range(n)]
    responses = await asyncio.gather(*pending)
    elapsed = time.perf_counter() - started
    return list(responses), elapsed
import asyncio
import time
async def handle_request(request_id):
await asyncio.sleep(0.1) # Simulate I/O
return {'request_id': request_id, 'status': 'ok'}
async def process_requests_serial(n):
t0 = time.perf_counter()
results = [await handle_request(i) for i in range(n)]
return results, time.perf_counter() - t0
async def process_requests_concurrent(n):
# TODO: Handle n requests concurrently using gather
t0 = time.perf_counter()
pass
async def main():
n = 10
serial_results, serial_time = await process_requests_serial(n)
concurrent_results, concurrent_time = await process_requests_concurrent(n)
print(f'Serial: {serial_time:.2f}s')
print(f'Concurrent: {concurrent_time:.2f}s')
print(f'Results equal: {serial_results == concurrent_results}')
asyncio.run(main())
Expected Output
Serial: 1.00s
Concurrent: 0.10s
Results equal: True
Hints
Hint 1: asyncio.gather(*[handle_request(i) for i in range(n)]) runs all concurrently.
Hint 2: Serial: each await blocks until the previous completes — 10 * 0.1s = 1s.
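Hint 3: Concurrent: all requests wait at the same time on one event loop (concurrency, not CPU parallelism), so the total is roughly the slowest request — about 0.1s.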
Implement fire-and-forget background tasks using asyncio.create_task for post-response processing.
Solution:
# Strong references to fire-and-forget tasks. The event loop itself only
# keeps weak references to tasks, so an unreferenced task may be
# garbage-collected before it finishes.
_background_tasks = set()


async def create_order(user_id, items):
    """Create an order and send its notification in the background.

    The order dict is returned immediately; ``send_notification`` runs as a
    separate task and is not awaited here.
    """
    order = {'user_id': user_id, 'items': items, 'status': 'created'}
    task = asyncio.create_task(send_notification(user_id, 'Order created'))
    # Hold the task until it completes, then drop the reference so the set
    # does not grow without bound.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return order
import asyncio
# In FastAPI: BackgroundTasks.add_task(fn, arg)
# In pure asyncio: asyncio.create_task(fn(arg))
notification_log = []
async def send_notification(user_id, message):
await asyncio.sleep(0.1) # Simulate sending email/webhook
notification_log.append({'to': user_id, 'msg': message})
async def create_order(user_id, items):
# TODO: Create the order (immediate return), then
# launch send_notification as a background task.
# Return the order immediately without waiting for notification.
order = {'user_id': user_id, 'items': items, 'status': 'created'}
pass
async def main():
notification_log.clear()
order = await create_order(42, ['widget', 'gadget'])
print('Order:', order)
print('Notification sent at return time:', len(notification_log) == 0)
# Give background task time to complete
await asyncio.sleep(0.2)
print('Notification sent after wait:', len(notification_log) == 1)
asyncio.run(main())
Expected Output
Order: {'user_id': 42, 'items': ['widget', 'gadget'], 'status': 'created'}
Notification sent at return time: True
Notification sent after wait: True
Hints
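Hint 1: asyncio.create_task(send_notification(user_id, 'Order created')) fires the task. Keep a reference to the returned Task (for example in a module-level set) so it cannot be garbage-collected before it finishes.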
Hint 2: The task runs in the background — the function returns immediately.
Hint 3: After await asyncio.sleep(0.2), the background task has had time to complete.
Build a concurrent health check that runs multiple dependency checks in parallel and aggregates results.
Solution:
import time
async def health_check():
    """Run the database, cache and queue checks concurrently and aggregate.

    Returns ``{'status', 'services', 'total_ms'}`` where ``status`` is
    ``'ok'`` only if every service reported ``'ok'`` (otherwise
    ``'degraded'``), and ``total_ms`` is the rounded wall-clock time of the
    concurrent batch.
    """
    started = time.perf_counter()
    reports = await asyncio.gather(
        check_database(),
        check_cache(),
        check_queue(),
    )
    total_ms = round((time.perf_counter() - started) * 1000)
    services = list(reports)
    any_unhealthy = any(report['status'] != 'ok' for report in services)
    return {
        'status': 'degraded' if any_unhealthy else 'ok',
        'services': services,
        'total_ms': total_ms,
    }
import asyncio
import time
async def check_database():
await asyncio.sleep(0.05)
return {'service': 'database', 'status': 'ok', 'latency_ms': 50}
async def check_cache():
await asyncio.sleep(0.02)
return {'service': 'cache', 'status': 'ok', 'latency_ms': 20}
async def check_queue():
await asyncio.sleep(0.03)
return {'service': 'queue', 'status': 'ok', 'latency_ms': 30}
# TODO: Implement health_check() that runs all three checks concurrently.
# Return {'status': 'ok', 'services': [...], 'total_ms': N}
# where status is 'ok' if all services are ok, else 'degraded'.
async def health_check():
pass
result = asyncio.run(health_check())
print(f"Overall status: {result['status']}")
print(f"Services checked: {len(result['services'])}")
print(f"Total ms < 100: {result['total_ms'] < 100}") # Concurrent, so ~50ms
Expected Output
Overall status: ok
Services checked: 3
Total ms < 100: True
Hints
Hint 1: asyncio.gather(check_database(), check_cache(), check_queue()) runs all concurrently.
Hint 2: Total time is max(50, 20, 30) = 50ms, not 50+20+30 = 100ms.
Hint 3: Check if any service status != 'ok' to set overall status to 'degraded'.
Build an async connection pool using asyncio.Semaphore and @asynccontextmanager.
Solution:
import asyncio
from contextlib import asynccontextmanager
class AsyncConnectionPool:
    """Simulated async connection pool bounded by a semaphore.

    At most ``max_connections`` callers can hold a connection at once;
    further callers wait inside ``acquire()`` until a slot frees up.
    """

    def __init__(self, max_connections=5):
        self._sem = asyncio.Semaphore(max_connections)
        # Number of connections currently checked out.
        self._active = 0

    @asynccontextmanager
    async def acquire(self):
        """Yield a fake connection dict while holding one pool slot.

        The dict carries ``'id'`` (identity of the pool) and ``'active'``
        (how many connections were checked out, including this one, at the
        moment it was handed out).
        """
        async with self._sem:
            # No lock is needed around the counter: there is no await
            # between read and write, so the update cannot be interleaved
            # with another task on the same event loop. Keeping the release
            # path await-free also means cancellation cannot skip it.
            self._active += 1
            try:
                yield {'id': id(self), 'active': self._active}
            finally:
                self._active -= 1
import asyncio
from contextlib import asynccontextmanager
# Simulate an async database connection pool.
# Real libraries: asyncpg.create_pool(), aiopg, aiosqlite
class AsyncConnectionPool:
def __init__(self, max_connections=5):
self._sem = asyncio.Semaphore(max_connections)
self._conn_count = 0
@asynccontextmanager
async def acquire(self):
# TODO: Acquire semaphore, yield a fake "connection" dict,
# release on exit. Track active connection count.
pass
async def test_pool():
pool = AsyncConnectionPool(max_connections=3)
active_counts = []
async def query(query_id):
async with pool.acquire() as conn:
active_counts.append(conn['active'])
await asyncio.sleep(0.05)
return f'result_{query_id}'
results = await asyncio.gather(*[query(i) for i in range(8)])
return sorted(results), max(active_counts)
results, max_concurrent = asyncio.run(test_pool())
print(f'All 8 results: {len(results) == 8}')
print(f'Max concurrent connections: {max_concurrent}')
print(f'Pool limit respected: {max_concurrent <= 3}')
Expected Output
All 8 results: True
Max concurrent connections: 3
Pool limit respected: True
Hints
Hint 1: async with self._sem: limits to max_connections concurrent acquisitions.
Hint 2: yield {'id': id(self), 'active': ...} gives a fake connection object.
Hint 3: @asynccontextmanager from contextlib works with yield for async context managers.
Implement a @with_timeout decorator that enforces response time limits on async handlers.
Solution:
import asyncio
import functools
def with_timeout(timeout_seconds):
    """Build a decorator that caps an async handler's run time.

    The decorated handler returns its normal result when it finishes within
    ``timeout_seconds``. Otherwise it returns
    ``{'error': 'timeout', 'handler': <handler name>}`` instead of raising.
    """
    def decorator(handler):
        async def wrapper(*args, **kwargs):
            try:
                outcome = await asyncio.wait_for(
                    handler(*args, **kwargs),
                    timeout=timeout_seconds,
                )
            except asyncio.TimeoutError:
                # Convert the timeout into an error payload so callers get
                # a response-shaped value rather than an exception.
                outcome = {'error': 'timeout', 'handler': handler.__name__}
            return outcome

        # Preserve the handler's name/docstring on the wrapper.
        return functools.wraps(handler)(wrapper)

    return decorator
import asyncio
import functools
# Middleware pattern: wrap every handler with a timeout.
# In FastAPI: use middleware or asyncio.wait_for in dependency.
def with_timeout(timeout_seconds):
def decorator(handler):
@functools.wraps(handler)
async def wrapper(*args, **kwargs):
# TODO: Run handler with timeout. On TimeoutError, return
# {'error': 'timeout', 'handler': handler.__name__}
pass
return wrapper
return decorator
@with_timeout(0.2)
async def fast_handler():
await asyncio.sleep(0.05)
return {'status': 'ok', 'data': 'fast result'}
@with_timeout(0.2)
async def slow_handler():
await asyncio.sleep(0.5)
return {'status': 'ok', 'data': 'slow result'}
async def main():
r1 = await fast_handler()
r2 = await slow_handler()
print(r1)
print(r2)
asyncio.run(main())
Expected Output
{'status': 'ok', 'data': 'fast result'}
{'error': 'timeout', 'handler': 'slow_handler'}
Hints
Hint 1: asyncio.wait_for(handler(*args, **kwargs), timeout=timeout_seconds) applies the timeout.
Hint 2: Catch asyncio.TimeoutError and return the error dict.
Hint 3: handler.__name__ gives the function name for the error response.
Implement request ID middleware using ContextVar so each concurrent request has its own trace ID.
Solution:
import uuid
async def middleware(handler, *args, **kwargs):
    """Run ``handler`` with a fresh request id bound to the context.

    A new ``req-<8 hex chars>`` id is stored in the ``request_id`` context
    variable for the duration of the call, start and completion are logged,
    and the previous value is restored afterwards even if the handler
    raises. Returns whatever the handler returns.
    """
    token = request_id.set(f'req-{uuid.uuid4().hex[:8]}')
    try:
        await log('Handling request')
        response = await handler(*args, **kwargs)
        await log('Request complete')
    finally:
        # Always restore the previous id so it cannot leak past this call.
        request_id.reset(token)
    return response
import asyncio
from contextvars import ContextVar
import uuid
# Each request gets a unique ID for tracing through logs.
# ContextVar ensures each task/request has its own isolated ID.
request_id: ContextVar[str] = ContextVar('request_id', default='no-request')
def get_request_id() -> str:
return request_id.get()
async def log(message):
print(f'[{get_request_id()}] {message}')
# TODO: Implement a request middleware that:
# 1. Assigns a unique request_id to each incoming request
# 2. Calls the handler
# 3. Logs start and end with the request_id
async def middleware(handler, *args, **kwargs):
pass
async def get_user_handler(user_id):
await log(f'Fetching user {user_id}')
await asyncio.sleep(0.05)
return {'user_id': user_id}
async def main():
results = await asyncio.gather(
middleware(get_user_handler, 1),
middleware(get_user_handler, 2),
middleware(get_user_handler, 3),
)
print('All completed:', len(results) == 3)
asyncio.run(main())
Expected Output
[req-XXXX] Handling request
[req-XXXX] Fetching user 1
...
All completed: True
Hints
Hint 1: token = request_id.set(f'req-{uuid.uuid4().hex[:8]}') sets the per-request ID.
Hint 2: Call handler(*args, **kwargs) after setting the context variable.
Hint 3: Use try/finally with request_id.reset(token) to clean up.
Implement an async TTL cache with stampede prevention, ensuring only one fetch per key at a time.
Solution:
import asyncio
import time
class AsyncTTLCache:
    """Async TTL cache that coalesces concurrent fetches per key.

    While one caller is fetching a key, other callers for the same key wait
    for that fetch instead of starting their own (stampede prevention).

    No lock is used: every check-then-register sequence below runs without
    an intervening ``await``, so it cannot be interleaved with another task
    on the same event loop.
    """

    def __init__(self, ttl_seconds=1.0):
        self.ttl = ttl_seconds
        self._cache = {}      # key -> (value, expire_at)
        self._in_flight = {}  # key -> asyncio.Future resolved by the fetcher

    async def get_or_fetch(self, key, fetch_fn):
        """Return the cached value for ``key``, fetching it if needed.

        Args:
            key: Hashable cache key.
            fetch_fn: Zero-argument callable returning an awaitable that
                produces the value.

        Returns:
            The cached or freshly fetched value.

        Raises:
            Exception: Whatever ``fetch_fn`` raises; the same exception is
                delivered to every caller waiting on that fetch, and nothing
                is cached.
        """
        while True:
            entry = self._cache.get(key)
            if entry is not None:
                value, expire_at = entry
                if time.monotonic() < expire_at:
                    return value

            pending = self._in_flight.get(key)
            if pending is None:
                break  # Nobody is fetching this key: we become the fetcher.

            try:
                # shield() keeps our own cancellation from cancelling the
                # future that every other waiter shares.
                return await asyncio.shield(pending)
            except asyncio.CancelledError:
                if not pending.cancelled():
                    raise  # We were cancelled ourselves.
                # The fetcher was cancelled before producing a value; loop
                # so that one of the waiters takes over the fetch.

        future = asyncio.get_running_loop().create_future()
        self._in_flight[key] = future
        try:
            value = await fetch_fn()
        except Exception as exc:
            future.set_exception(exc)
            # Mark the exception as retrieved: it is re-raised to our caller
            # below, so asyncio need not warn when there are no waiters.
            future.exception()
            raise
        else:
            self._cache[key] = (value, time.monotonic() + self.ttl)
            future.set_result(value)
            return value
        finally:
            # Runs on success, failure and cancellation alike, so a key can
            # never remain stuck in _in_flight.
            self._in_flight.pop(key, None)
            if not future.done():
                future.cancel()  # Fetcher cancelled: wake the waiters.
import asyncio
import time
# An async cache prevents duplicate concurrent fetches (cache stampede prevention).
class AsyncTTLCache:
def __init__(self, ttl_seconds=1.0):
self.ttl = ttl_seconds
self._cache = {}
self._in_flight = {}
self._lock = asyncio.Lock()
async def get_or_fetch(self, key, fetch_fn):
# TODO: Return cached value if valid.
# If another coroutine is already fetching this key, wait for it.
# Otherwise, fetch it yourself and cache the result.
pass
fetch_count = [0]
async def expensive_fetch(key):
fetch_count[0] += 1
await asyncio.sleep(0.1)
return f'data_for_{key}'
async def main():
cache = AsyncTTLCache(ttl_seconds=0.5)
# 5 concurrent requests for the same key — should only fetch once
results = await asyncio.gather(*[
cache.get_or_fetch('user:1', lambda: expensive_fetch('user:1'))
for _ in range(5)
])
print(f'Fetch count: {fetch_count[0]} (should be 1)')
print(f'All same value: {len(set(results)) == 1}')
await asyncio.sleep(0.6) # TTL expires
await cache.get_or_fetch('user:1', lambda: expensive_fetch('user:1'))
print(f'After TTL expiry, fetches: {fetch_count[0]} (should be 2)')
asyncio.run(main())
Expected Output
Fetch count: 1 (should be 1)
All same value: True
After TTL expiry, fetches: 2 (should be 2)
Hints
Hint 1: Check if key is in cache and not expired. If so, return cached value.
Hint 2: If the key is already in self._in_flight, another coroutine is fetching it — wait for that existing Future to complete (await future) instead of fetching again.
Hint 3: Otherwise, create a Future, store in _in_flight, fetch, set result, cache it.
Implement the Circuit Breaker pattern with CLOSED, OPEN, and HALF_OPEN states for resilient async services.
Solution:
class CircuitOpenError(RuntimeError):
    """Raised when the breaker rejects a call without invoking the service.

    Subclasses ``RuntimeError`` so existing ``except RuntimeError`` handlers
    keep working, while letting callers tell a rejection apart from a
    ``RuntimeError`` raised by the service itself.
    """


class CircuitBreaker:
    """Circuit breaker for async callables.

    States:
        CLOSED: calls pass through; failures are counted.
        OPEN: calls fail fast until ``recovery_timeout`` seconds have passed.
        HALF_OPEN: exactly one probe call is let through; success closes the
            circuit, failure re-opens it.
    """

    CLOSED = 'CLOSED'
    OPEN = 'OPEN'
    HALF_OPEN = 'HALF_OPEN'

    def __init__(self, failure_threshold=3, recovery_timeout=0.5):
        self.state = self.CLOSED
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._opened_at = None
        # True while the single HALF_OPEN probe call is running.
        self._probe_in_flight = False

    async def call(self, fn, *args, **kwargs):
        """Invoke ``fn(*args, **kwargs)`` through the breaker.

        Returns:
            The awaited result of ``fn``.

        Raises:
            CircuitOpenError: The circuit is OPEN and the recovery timeout
                has not elapsed, or it is HALF_OPEN and a probe is already
                in flight. The message is ``'Circuit OPEN'``.
            Exception: Anything ``fn`` raises is counted as a failure and
                re-raised unchanged.
        """
        if self.state == self.OPEN:
            if time.monotonic() - self._opened_at < self.recovery_timeout:
                raise CircuitOpenError('Circuit OPEN')
            self.state = self.HALF_OPEN

        is_probe = self.state == self.HALF_OPEN
        if is_probe:
            # Only one trial call may test recovery; concurrent callers are
            # rejected rather than stampeding a possibly unhealthy service.
            if self._probe_in_flight:
                raise CircuitOpenError('Circuit OPEN')
            self._probe_in_flight = True

        try:
            result = await fn(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        finally:
            # Also runs on cancellation, so a cancelled probe cannot leave
            # the breaker stuck rejecting every call.
            if is_probe:
                self._probe_in_flight = False
        self._on_success()
        return result

    def _on_success(self):
        """Close the circuit and reset the failure counter."""
        self.state = self.CLOSED
        self.failure_count = 0

    def _on_failure(self):
        """Count a failure; open the circuit at the threshold or on a failed probe."""
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold or self.state == self.HALF_OPEN:
            self.state = self.OPEN
            self._opened_at = time.monotonic()
import asyncio
import time
# Circuit breaker: CLOSED (normal) -> OPEN (failing) -> HALF_OPEN (testing)
# CLOSED: pass requests through
# OPEN: fail fast, no requests sent
# HALF_OPEN: let one request through to test recovery
class CircuitBreaker:
CLOSED = 'CLOSED'
OPEN = 'OPEN'
HALF_OPEN = 'HALF_OPEN'
def __init__(self, failure_threshold=3, recovery_timeout=0.5):
self.state = self.CLOSED
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self._opened_at = None
async def call(self, fn, *args, **kwargs):
pass
def _on_success(self):
pass
def _on_failure(self):
pass
# Test
call_count = [0]
fail_until = [5]
async def unreliable_service():
call_count[0] += 1
if call_count[0] <= fail_until[0]:
raise RuntimeError('Service down')
return 'OK'
async def main():
cb = CircuitBreaker(failure_threshold=3, recovery_timeout=0.3)
results = []
for i in range(12):
try:
result = await cb.call(unreliable_service)
results.append(('ok', result, cb.state))
except Exception as e:
results.append(('error', str(e), cb.state))
await asyncio.sleep(0.05)
for r in results:
print(r)
asyncio.run(main())
Expected Output
('error', 'Service down', 'CLOSED')
('error', 'Service down', 'CLOSED')
('error', 'Service down', 'OPEN')
('error', 'Circuit OPEN', 'OPEN')
...
Hints
Hint 1: In CLOSED: call fn; on failure increment count; if count >= threshold, transition to OPEN.
Hint 2: In OPEN: check if recovery_timeout has elapsed; if yes, transition to HALF_OPEN; if no, raise fast.
Hint 3: In HALF_OPEN: let one call through; on success -> CLOSED; on failure -> OPEN.
Compose an async middleware chain where each middleware wraps the next in sequence.
Solution:
import functools
def apply_middleware(handler, middlewares):
    """Compose ``middlewares`` around ``handler`` into a single async callable.

    The first middleware in the list is outermost and runs first on the way
    in; the last one wraps ``handler`` directly. Each middleware is called
    as ``await middleware(next_handler, request)``.

    Args:
        handler: Async callable taking one ``request`` argument.
        middlewares: Sequence of async callables ``(handler, request)``.

    Returns:
        An async callable taking ``request``. With an empty sequence this is
        ``handler`` itself.
    """
    def _wrap(middleware, inner):
        # A factory gives each layer its own closure, so the binding is not
        # affected by the loop variable changing and is not exposed as an
        # overridable keyword argument on the wrapper.
        async def wrapped(request):
            return await middleware(inner, request)
        return wrapped

    chained = handler
    # Build inside-out: wrap the handler with the last middleware first.
    for middleware in reversed(middlewares):
        chained = _wrap(middleware, chained)
    return chained
import asyncio
import time
# ASGI middleware chains: each middleware wraps the next.
# request -> middleware1 -> middleware2 -> handler -> middleware2 -> middleware1 -> response
async def timing_middleware(handler, request):
    """Call ``handler`` and record how long it took on the response.

    Adds a ``'timing_ms'`` key (milliseconds, rounded to two decimals) to
    the response dict returned by ``handler`` and returns that same dict.
    """
    started = time.perf_counter()
    reply = await handler(request)
    duration_ms = 1000 * (time.perf_counter() - started)
    reply['timing_ms'] = round(duration_ms, 2)
    return reply
async def auth_middleware(handler, request):
    """Reject requests without a ``'token'`` key; otherwise call ``handler``.

    Unauthenticated requests get ``{'error': 'unauthorized', 'status': 401}``
    and the handler is never invoked. Authenticated responses are returned
    with ``'authenticated': True`` added.
    """
    if 'token' in request:
        reply = await handler(request)
        reply['authenticated'] = True
        return reply
    return {'error': 'unauthorized', 'status': 401}
async def logging_middleware(handler, request):
    """Print a start line and a done line around the ``handler`` call.

    The request's ``'path'`` value is read separately for each line, and
    the handler's response is returned unchanged.
    """
    print(f' >> {request.get("path")} start')
    reply = await handler(request)
    # Only reached when the handler returns normally; an exception skips it.
    print(f' << {request.get("path")} done')
    return reply
# TODO: Implement apply_middleware(handler, middlewares) that
# composes the middleware chain so they execute in order.
# The last middleware in the list wraps the handler directly.
def apply_middleware(handler, middlewares):
pass
async def user_handler(request):
await asyncio.sleep(0.05)
return {'user': 'Alice', 'status': 200}
async def main():
middlewares = [timing_middleware, auth_middleware, logging_middleware]
chained = apply_middleware(user_handler, middlewares)
request = {'path': '/users/1', 'token': 'abc123'}
response = await chained(request)
print('Response:', response)
asyncio.run(main())
Expected Output
>> /users/1 start
<< /users/1 done
Response: {'user': 'Alice', 'status': 200, 'authenticated': True, 'timing_ms': X.XX}
Hints
Hint 1: Build from the inside out: wrap handler with last middleware, then wrap that with second-to-last, etc.
Hint 2: Use functools.reduce or a loop: for mw in reversed(middlewares): wrapped = partial(mw, wrapped)
Hint 3: Each middleware receives (handler, request) and calls await handler(request).
Build a complete async service that integrates caching, rate limiting, timeouts, and metrics into a single get method.
Solution:
import asyncio
import time
class AsyncService:
    """Async resource service with caching, bounded concurrency, timeouts and metrics.

    Concurrent requests for the same uncached resource share a single
    upstream fetch. Without this, a burst of concurrent ``get`` calls all
    see an empty cache and each triggers its own fetch.

    Args:
        max_concurrency: Maximum number of upstream fetches in flight at
            once. A semaphore caps concurrency, not requests per second.
        ttl_seconds: How long a fetched value stays valid in the cache.
        timeout_seconds: Per-fetch time limit.
    """

    def __init__(self, max_concurrency=5, ttl_seconds=1.0, timeout_seconds=0.5):
        self._cache = {}      # resource_id -> (value, expire_at)
        self._in_flight = {}  # resource_id -> Future of the running fetch
        self._metrics = {'hits': 0, 'misses': 0, 'timeouts': 0, 'total': 0}
        self._sem = asyncio.Semaphore(max_concurrency)
        self._ttl = ttl_seconds
        self._timeout = timeout_seconds

    async def fetch(self, resource_id):
        """Simulate the external API call; ids above 100 do not exist."""
        await asyncio.sleep(0.1)
        if resource_id > 100:
            raise ValueError(f'Resource {resource_id} not found')
        return {'id': resource_id, 'data': f'content_{resource_id}'}

    async def get(self, resource_id):
        """Return the resource, served from cache or a shared fetch if possible.

        Metrics: every call increments ``total``. A call served from the
        cache, or attached to a fetch already in flight, counts as a hit;
        a call that starts a new fetch counts as a miss.

        Raises:
            asyncio.TimeoutError: The upstream fetch exceeded the timeout.
            ValueError: The resource does not exist (raised by ``fetch``).
        """
        self._metrics['total'] += 1

        entry = self._cache.get(resource_id)
        if entry is not None:
            value, expire_at = entry
            if time.monotonic() < expire_at:
                self._metrics['hits'] += 1
                return value

        pending = self._in_flight.get(resource_id)
        if pending is None:
            self._metrics['misses'] += 1
            pending = asyncio.ensure_future(self._fetch_and_cache(resource_id))
            self._in_flight[resource_id] = pending
        else:
            self._metrics['hits'] += 1

        # shield() keeps one caller's cancellation from aborting a fetch
        # that other callers are also waiting on.
        return await asyncio.shield(pending)

    async def _fetch_and_cache(self, resource_id):
        """Fetch one resource under the concurrency limit and cache the result."""
        try:
            async with self._sem:
                try:
                    result = await asyncio.wait_for(
                        self.fetch(resource_id), timeout=self._timeout
                    )
                except asyncio.TimeoutError:
                    self._metrics['timeouts'] += 1
                    raise
            self._cache[resource_id] = (result, time.monotonic() + self._ttl)
            return result
        finally:
            # Clear the marker on success and failure alike, so a failed
            # fetch is retried by the next request instead of being replayed.
            self._in_flight.pop(resource_id, None)

    def get_metrics(self):
        """Return a snapshot copy of the metrics counters."""
        return dict(self._metrics)
import asyncio
import time
# Build a complete async service integrating:
# 1. Concurrency limiting (max 5 fetches in flight; a Semaphore caps concurrency, not req/sec)
# 2. Caching (TTL 1s)
# 3. Timeout (0.5s per request)
# 4. Metrics collection
class AsyncService:
def __init__(self):
self._cache = {}
self._metrics = {'hits': 0, 'misses': 0, 'timeouts': 0, 'total': 0}
self._sem = asyncio.Semaphore(5)
async def fetch(self, resource_id):
# Simulate external API call
await asyncio.sleep(0.1)
if resource_id > 100:
raise ValueError(f'Resource {resource_id} not found')
return {'id': resource_id, 'data': f'content_{resource_id}'}
async def get(self, resource_id):
# TODO: Implement with caching, rate limiting, and timeout
pass
def get_metrics(self):
return dict(self._metrics)
async def main():
service = AsyncService()
# Run 10 concurrent requests — some cached, some new
ids = [1, 2, 1, 3, 2, 4, 1, 5, 6, 1]
results = await asyncio.gather(
*[service.get(rid) for rid in ids],
return_exceptions=True
)
metrics = service.get_metrics()
print(f'Total requests: {metrics["total"]}')
print(f'Cache hits: {metrics["hits"]}')
print(f'Cache misses: {metrics["misses"]}')
successes = [r for r in results if isinstance(r, dict)]
print(f'Successful: {len(successes)}')
asyncio.run(main())
Expected Output
Total requests: 10
Cache hits: 4
Cache misses: 6
Successful: 10
Hints
Hint 1: Check self._cache[resource_id] first — return if hit, increment metrics['hits'].
Hint 2: Use async with self._sem: to cap how many fetches run at once (a Semaphore limits concurrency, not requests per second).
Hint 3: Wrap the fetch in asyncio.wait_for(self.fetch(resource_id), timeout=0.5).
