
Python Building an Async API Service: Practice Problems & Exercises

Practice: Building an Async API Service

11 problems · 4 Easy · 4 Medium · 3 Hard · 60–80 min

:::note Environment
Problems in this set use pure asyncio patterns runnable without FastAPI. Where FastAPI-specific patterns are taught, equivalent asyncio patterns are demonstrated with notes about the FastAPI equivalent.
:::


#1 Async Request Handler (Easy)
async def · handler · request/response

Write an async request handler that simulates an async database lookup.

Solution:

import asyncio

DB = {1: 'Alice', 2: 'Bob', 3: 'Carol'}

async def get_user(user_id: int):
  await asyncio.sleep(0.05)
  name = DB.get(user_id)
  if name is None:
      return {'error': 'not found'}
  return {'id': user_id, 'name': name}

Starter code:

import asyncio

# Simulate a FastAPI-style async route handler.
# In FastAPI: @app.get('/users/{user_id}')
# async def get_user(user_id: int): ...

DB = {1: 'Alice', 2: 'Bob', 3: 'Carol'}

async def get_user(user_id: int):
  # TODO: Simulate an async DB lookup (sleep 0.05s) and return
  # {'id': user_id, 'name': name} or {'error': 'not found'}
  pass

async def main():
  r1 = await get_user(1)
  r2 = await get_user(99)
  print(r1)
  print(r2)

asyncio.run(main())
Expected Output
{'id': 1, 'name': 'Alice'}
{'error': 'not found'}
Hints

Hint 1: await asyncio.sleep(0.05) simulates an async database query.

Hint 2: Return the dict directly — FastAPI would serialize it to JSON automatically.

Hint 3: DB.get(user_id) returns None if not found.
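
Hint 2 mentions that the returned dict would be serialized to JSON. As a rough illustration of that step outside any framework, the sketch below uses the standard library's `json.dumps` as a stand-in; `render` is a hypothetical helper, and FastAPI's own encoding machinery is not shown here.

```python
import asyncio
import json

DB = {1: 'Alice', 2: 'Bob', 3: 'Carol'}

async def get_user(user_id: int):
    await asyncio.sleep(0.05)  # simulated DB latency
    name = DB.get(user_id)
    if name is None:
        return {'error': 'not found'}
    return {'id': user_id, 'name': name}

async def render(user_id: int) -> str:
    # Hypothetical helper: stands in for a framework's serialization step.
    return json.dumps(await get_user(user_id))

print(asyncio.run(render(1)))   # {"id": 1, "name": "Alice"}
print(asyncio.run(render(99)))  # {"error": "not found"}
```

The handler itself stays unaware of the wire format, which is why returning a plain dict is enough.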


#2 Concurrent Request Processing (Easy)
gather · concurrent requests · throughput

Demonstrate the throughput difference between serial and concurrent async request handling.

Solution:

async def process_requests_concurrent(n):
  t0 = time.perf_counter()
  results = await asyncio.gather(*[handle_request(i) for i in range(n)])
  return list(results), time.perf_counter() - t0

Starter code:

import asyncio
import time

async def handle_request(request_id):
  await asyncio.sleep(0.1)  # Simulate I/O
  return {'request_id': request_id, 'status': 'ok'}

async def process_requests_serial(n):
  t0 = time.perf_counter()
  results = [await handle_request(i) for i in range(n)]
  return results, time.perf_counter() - t0

async def process_requests_concurrent(n):
  # TODO: Handle n requests concurrently using gather
  t0 = time.perf_counter()
  pass

async def main():
  n = 10
  serial_results, serial_time = await process_requests_serial(n)
  concurrent_results, concurrent_time = await process_requests_concurrent(n)

  print(f'Serial: {serial_time:.2f}s')
  print(f'Concurrent: {concurrent_time:.2f}s')
  print(f'Results equal: {serial_results == concurrent_results}')

asyncio.run(main())
Expected Output (timings are approximate and may differ by a few hundredths of a second)
Serial: 1.00s
Concurrent: 0.10s
Results equal: True
Hints

Hint 1: asyncio.gather(*[handle_request(i) for i in range(n)]) runs all concurrently.

Hint 2: Serial: each await blocks until the previous completes — 10 * 0.1s = 1s.

Hint 3: Concurrent: all run in parallel — max(0.1s) = 0.1s.
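
The `Results equal: True` line relies on `asyncio.gather` returning results in the order its arguments were passed, not the order in which they finish. A small sketch of that property follows; the per-request delays and the `tracked` wrapper are assumptions added for illustration.

```python
import asyncio

async def handle_request(request_id, delay):
    await asyncio.sleep(delay)
    return request_id

async def demo():
    finished = []  # completion order, recorded as each request ends

    async def tracked(request_id, delay):
        result = await handle_request(request_id, delay)
        finished.append(result)
        return result

    # Request 0 is the slowest, request 2 the fastest.
    results = await asyncio.gather(
        tracked(0, 0.15), tracked(1, 0.10), tracked(2, 0.05)
    )
    return results, finished

results, finished = asyncio.run(demo())
print(results)   # [0, 1, 2]  (argument order)
print(finished)  # [2, 1, 0]  (completion order)
```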


#3 Background Task Simulation (Easy)
background task · create_task · fire-and-forget

Implement fire-and-forget background tasks using asyncio.create_task for post-response processing.

Solution:

async def create_order(user_id, items):
  order = {'user_id': user_id, 'items': items, 'status': 'created'}
  asyncio.create_task(send_notification(user_id, 'Order created'))
  return order

Starter code:

import asyncio

# In FastAPI: BackgroundTasks.add_task(fn, arg)
# In pure asyncio: asyncio.create_task(fn(arg))

notification_log = []

async def send_notification(user_id, message):
  await asyncio.sleep(0.1)  # Simulate sending email/webhook
  notification_log.append({'to': user_id, 'msg': message})

async def create_order(user_id, items):
  # TODO: Create the order (immediate return), then
  # launch send_notification as a background task.
  # Return the order immediately without waiting for notification.
  order = {'user_id': user_id, 'items': items, 'status': 'created'}
  pass

async def main():
  notification_log.clear()
  order = await create_order(42, ['widget', 'gadget'])
  print('Order:', order)
  print('Notification pending at return time:', len(notification_log) == 0)

  # Give background task time to complete
  await asyncio.sleep(0.2)
  print('Notification sent after wait:', len(notification_log) == 1)

asyncio.run(main())
Expected Output
Order: {'user_id': 42, 'items': ['widget', 'gadget'], 'status': 'created'}
Notification pending at return time: True
Notification sent after wait: True
Hints

Hint 1: asyncio.create_task(send_notification(user_id, 'Order created')) fires the task.

Hint 2: The task runs in the background — the function returns immediately.

Hint 3: After await asyncio.sleep(0.2), the background task has had time to complete.
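
One caveat with fire-and-forget tasks: the event loop keeps only weak references to tasks, so the Python documentation recommends holding a reference to the result of `asyncio.create_task` until the task is done. A minimal sketch of that pattern is below; the `background_tasks` set and the `spawn` helper are hypothetical names, not part of the exercise.

```python
import asyncio

background_tasks = set()  # hypothetical module-level registry
notification_log = []

async def send_notification(user_id, message):
    await asyncio.sleep(0.05)
    notification_log.append({'to': user_id, 'msg': message})

def spawn(coro):
    # Keep a strong reference until the task finishes, then drop it.
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task

async def create_order(user_id, items):
    order = {'user_id': user_id, 'items': items, 'status': 'created'}
    spawn(send_notification(user_id, 'Order created'))
    return order

async def main():
    order = await create_order(42, ['widget'])
    pending_at_return = len(background_tasks)
    # Wait for outstanding background work before shutting down.
    await asyncio.gather(*background_tasks)
    return order, pending_at_return, len(notification_log)

order, pending_at_return, sent = asyncio.run(main())
print(pending_at_return, sent, len(background_tasks))  # 1 1 0
```

Keeping the registry also gives the service something to await at shutdown, instead of sleeping and hoping the work has finished.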


#4 Async Health Check Endpoint (Easy)
health check · gather · async · dependencies

Build a concurrent health check that runs multiple dependency checks in parallel and aggregates results.

Solution:

import time

async def health_check():
  t0 = time.perf_counter()
  db, cache, queue = await asyncio.gather(
      check_database(), check_cache(), check_queue()
  )
  elapsed_ms = (time.perf_counter() - t0) * 1000
  services = [db, cache, queue]
  overall = 'ok' if all(s['status'] == 'ok' for s in services) else 'degraded'
  return {'status': overall, 'services': services, 'total_ms': round(elapsed_ms)}

Starter code:

import asyncio
import time

async def check_database():
  await asyncio.sleep(0.05)
  return {'service': 'database', 'status': 'ok', 'latency_ms': 50}

async def check_cache():
  await asyncio.sleep(0.02)
  return {'service': 'cache', 'status': 'ok', 'latency_ms': 20}

async def check_queue():
  await asyncio.sleep(0.03)
  return {'service': 'queue', 'status': 'ok', 'latency_ms': 30}

# TODO: Implement health_check() that runs all three checks concurrently.
# Return {'status': 'ok', 'services': [...], 'total_ms': N}
# where status is 'ok' if all services are ok, else 'degraded'.

async def health_check():
  pass

result = asyncio.run(health_check())
print(f"Overall status: {result['status']}")
print(f"Services checked: {len(result['services'])}")
print(f"Total ms < 100: {result['total_ms'] < 100}")  # Concurrent, so ~50ms
Expected Output
Overall status: ok
Services checked: 3
Total ms < 100: True
Hints

Hint 1: asyncio.gather(check_database(), check_cache(), check_queue()) runs all concurrently.

Hint 2: Total time is max(50, 20, 30) = 50ms, not 50+20+30 = 100ms.

Hint 3: Check if any service status != 'ok' to set overall status to 'degraded'.
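
The exercise only covers checks that return a status. If a check raises instead, a plain `gather` would propagate the exception and the whole health check would fail. One possible way to report `degraded` in that case is `return_exceptions=True`, sketched below; the failing `check_cache`, the `'down'` status, and the `detail` field are assumptions for illustration.

```python
import asyncio

async def check_database():
    await asyncio.sleep(0.02)
    return {'service': 'database', 'status': 'ok'}

async def check_cache():
    await asyncio.sleep(0.01)
    raise ConnectionError('cache unreachable')  # simulated outage

async def health_check():
    names = ['database', 'cache']
    outcomes = await asyncio.gather(
        check_database(), check_cache(), return_exceptions=True
    )
    services = []
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, Exception):
            # Convert the raised exception into a failed service entry.
            services.append({'service': name, 'status': 'down', 'detail': str(outcome)})
        else:
            services.append(outcome)
    overall = 'ok' if all(s['status'] == 'ok' for s in services) else 'degraded'
    return {'status': overall, 'services': services}

report = asyncio.run(health_check())
print(report['status'])  # degraded
```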


#5 Async Connection Pool (Medium)
connection pool · Semaphore · async context manager

Build an async connection pool using asyncio.Semaphore and @asynccontextmanager.

Solution:

import asyncio
from contextlib import asynccontextmanager

class AsyncConnectionPool:
  def __init__(self, max_connections=5):
      self._sem = asyncio.Semaphore(max_connections)
      self._lock = asyncio.Lock()
      self._active = 0

  @asynccontextmanager
  async def acquire(self):
      async with self._sem:
          async with self._lock:
              self._active += 1
              active = self._active
          try:
              yield {'id': id(self), 'active': active}
          finally:
              async with self._lock:
                  self._active -= 1

Starter code:

import asyncio
from contextlib import asynccontextmanager

# Simulate an async database connection pool.
# Real libraries: asyncpg.create_pool(), aiopg, aiosqlite

class AsyncConnectionPool:
  def __init__(self, max_connections=5):
      self._sem = asyncio.Semaphore(max_connections)
      self._conn_count = 0

  @asynccontextmanager
  async def acquire(self):
      # TODO: Acquire semaphore, yield a fake "connection" dict,
      # release on exit. Track active connection count.
      pass

async def test_pool():
  pool = AsyncConnectionPool(max_connections=3)
  active_counts = []

  async def query(query_id):
      async with pool.acquire() as conn:
          active_counts.append(conn['active'])
          await asyncio.sleep(0.05)
          return f'result_{query_id}'

  results = await asyncio.gather(*[query(i) for i in range(8)])
  return sorted(results), max(active_counts)

results, max_concurrent = asyncio.run(test_pool())
print(f'All 8 results: {len(results) == 8}')
print(f'Max concurrent connections: {max_concurrent}')
print(f'Pool limit respected: {max_concurrent <= 3}')
Expected Output
All 8 results: True
Max concurrent connections: 3
Pool limit respected: True
Hints

Hint 1: async with self._sem: limits to max_connections concurrent acquisitions.

Hint 2: yield {'id': id(self), 'active': ...} gives a fake connection object.

Hint 3: @asynccontextmanager from contextlib works with yield for async context managers.
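
The semaphore version counts slots but never hands out distinct connection objects. As a contrast, here is a rough sketch of a pool that keeps a fixed set of fake connections in an `asyncio.Queue`; `QueuePool` and the `conn_id` field are hypothetical, and real drivers such as asyncpg manage their pools differently.

```python
import asyncio
from contextlib import asynccontextmanager

class QueuePool:
    """Hypothetical pool that hands out a fixed set of fake connections."""

    def __init__(self, size=3):
        self._idle = asyncio.Queue()
        for n in range(size):
            self._idle.put_nowait({'conn_id': n})

    @asynccontextmanager
    async def acquire(self):
        conn = await self._idle.get()    # waits while every connection is in use
        try:
            yield conn
        finally:
            self._idle.put_nowait(conn)  # return it even if the body raised

async def demo():
    pool = QueuePool(size=3)
    in_use = set()
    peak = 0

    async def query(i):
        nonlocal peak
        async with pool.acquire() as conn:
            in_use.add(conn['conn_id'])
            peak = max(peak, len(in_use))
            await asyncio.sleep(0.02)
            in_use.discard(conn['conn_id'])
            return i

    results = await asyncio.gather(*[query(i) for i in range(8)])
    return results, peak

results, peak = asyncio.run(demo())
print(results, peak)  # [0, 1, 2, 3, 4, 5, 6, 7] 3
```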


#6 Request Timeout Middleware (Medium)
timeout · middleware · wait_for · circuit breaker

Implement a @with_timeout decorator that enforces response time limits on async handlers.

Solution:

import asyncio
import functools

def with_timeout(timeout_seconds):
  def decorator(handler):
      @functools.wraps(handler)
      async def wrapper(*args, **kwargs):
          try:
              return await asyncio.wait_for(
                  handler(*args, **kwargs),
                  timeout=timeout_seconds
              )
          except asyncio.TimeoutError:
              return {'error': 'timeout', 'handler': handler.__name__}
      return wrapper
  return decorator

Starter code:

import asyncio
import functools

# Middleware pattern: wrap every handler with a timeout.
# In FastAPI: use middleware or asyncio.wait_for in dependency.

def with_timeout(timeout_seconds):
  def decorator(handler):
      @functools.wraps(handler)
      async def wrapper(*args, **kwargs):
          # TODO: Run handler with timeout. On TimeoutError, return
          # {'error': 'timeout', 'handler': handler.__name__}
          pass
      return wrapper
  return decorator

@with_timeout(0.2)
async def fast_handler():
  await asyncio.sleep(0.05)
  return {'status': 'ok', 'data': 'fast result'}

@with_timeout(0.2)
async def slow_handler():
  await asyncio.sleep(0.5)
  return {'status': 'ok', 'data': 'slow result'}

async def main():
  r1 = await fast_handler()
  r2 = await slow_handler()
  print(r1)
  print(r2)

asyncio.run(main())
Expected Output
{'status': 'ok', 'data': 'fast result'}
{'error': 'timeout', 'handler': 'slow_handler'}
Hints

Hint 1: asyncio.wait_for(handler(*args, **kwargs), timeout=timeout_seconds) applies the timeout.

Hint 2: Catch asyncio.TimeoutError and return the error dict.

Hint 3: handler.__name__ gives the function name for the error response.
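
It may help to see what happens to the handler when the timeout fires: `asyncio.wait_for` cancels the wrapped coroutine and waits for that cancellation to finish before raising `TimeoutError`. The sketch below records both events; the `events` list is an assumption added for illustration.

```python
import asyncio

events = []

async def slow_handler():
    try:
        await asyncio.sleep(0.5)
        return {'status': 'ok'}
    except asyncio.CancelledError:
        events.append('handler cancelled')  # cleanup hook runs here
        raise  # re-raise so the cancellation completes

async def main():
    try:
        return await asyncio.wait_for(slow_handler(), timeout=0.05)
    except asyncio.TimeoutError:
        events.append('timeout reported')
        return {'error': 'timeout'}

result = asyncio.run(main())
print(events)  # ['handler cancelled', 'timeout reported']
print(result)  # {'error': 'timeout'}
```

A handler that holds resources can therefore release them in an `except asyncio.CancelledError` or `finally` block, as long as it lets the cancellation propagate.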


#7 Request ID Context Variable (Medium)
ContextVar · request ID · tracing · middleware

Implement request ID middleware using ContextVar so each concurrent request has its own trace ID.

Solution:

import uuid

async def middleware(handler, *args, **kwargs):
  req_id = f'req-{uuid.uuid4().hex[:8]}'
  token = request_id.set(req_id)
  try:
      await log('Handling request')
      result = await handler(*args, **kwargs)
      await log('Request complete')
      return result
  finally:
      request_id.reset(token)

Starter code:

import asyncio
from contextvars import ContextVar
import uuid

# Each request gets a unique ID for tracing through logs.
# ContextVar ensures each task/request has its own isolated ID.

request_id: ContextVar[str] = ContextVar('request_id', default='no-request')

def get_request_id() -> str:
  return request_id.get()

async def log(message):
  print(f'[{get_request_id()}] {message}')

# TODO: Implement a request middleware that:
# 1. Assigns a unique request_id to each incoming request
# 2. Calls the handler
# 3. Logs start and end with the request_id

async def middleware(handler, *args, **kwargs):
  pass

async def get_user_handler(user_id):
  await log(f'Fetching user {user_id}')
  await asyncio.sleep(0.05)
  return {'user_id': user_id}

async def main():
  results = await asyncio.gather(
      middleware(get_user_handler, 1),
      middleware(get_user_handler, 2),
      middleware(get_user_handler, 3),
  )
  print('All completed:', len(results) == 3)

asyncio.run(main())
Expected Output
[req-XXXX] Handling request
[req-XXXX] Fetching user 1
...
All completed: True
Hints

Hint 1: token = request_id.set(f'req-{uuid.uuid4().hex[:8]}') sets the per-request ID.

Hint 2: Call handler(*args, **kwargs) after setting the context variable.

Hint 3: Use try/finally with request_id.reset(token) to clean up.
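
The isolation comes from each task running in its own copy of the current context, and `asyncio.gather` wraps each coroutine in a task. A small sketch of that behavior, using fixed IDs instead of UUIDs so the output is predictable (the `handle` function is a hypothetical stand-in for middleware plus handler):

```python
import asyncio
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar('request_id', default='no-request')

async def handle(name, delay):
    request_id.set(name)          # visible only inside this task's context
    await asyncio.sleep(delay)    # other tasks run while this one waits
    return request_id.get()

async def main():
    seen = await asyncio.gather(handle('req-a', 0.02), handle('req-b', 0.01))
    # The parent never called set(), so it still sees the default.
    return seen, request_id.get()

seen, parent_value = asyncio.run(main())
print(seen)          # ['req-a', 'req-b']
print(parent_value)  # no-request
```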


#8 Async Cache with TTL (Medium)
async cache · TTL · asyncio.Lock · memoization

Implement an async TTL cache with stampede prevention, ensuring only one fetch per key at a time.

Solution:

import asyncio
import time

class AsyncTTLCache:
  def __init__(self, ttl_seconds=1.0):
      self.ttl = ttl_seconds
      self._cache = {}      # key -> (value, expire_at)
      self._in_flight = {}  # key -> asyncio.Future
      self._lock = asyncio.Lock()

  async def get_or_fetch(self, key, fetch_fn):
      async with self._lock:
          if key in self._cache:
              value, expire_at = self._cache[key]
              if time.monotonic() < expire_at:
                  return value
          future = self._in_flight.get(key)
          is_owner = future is None
          if is_owner:
              # First caller for this key: register a Future for the others
              future = asyncio.get_running_loop().create_future()
              self._in_flight[key] = future

      if not is_owner:
          # Another coroutine is already fetching; wait outside the lock
          return await future

      # Fetch outside the lock so requests for other keys are not blocked
      try:
          value = await fetch_fn()
      except Exception as e:
          async with self._lock:
              self._in_flight.pop(key, None)
          future.set_exception(e)
          raise
      async with self._lock:
          self._cache[key] = (value, time.monotonic() + self.ttl)
          del self._in_flight[key]
      future.set_result(value)
      return value

Starter code:

import asyncio
import time

# An async cache prevents duplicate concurrent fetches (cache stampede prevention).

class AsyncTTLCache:
  def __init__(self, ttl_seconds=1.0):
      self.ttl = ttl_seconds
      self._cache = {}
      self._in_flight = {}
      self._lock = asyncio.Lock()

  async def get_or_fetch(self, key, fetch_fn):
      # TODO: Return cached value if valid.
      # If another coroutine is already fetching this key, wait for it.
      # Otherwise, fetch it yourself and cache the result.
      pass

fetch_count = [0]

async def expensive_fetch(key):
  fetch_count[0] += 1
  await asyncio.sleep(0.1)
  return f'data_for_{key}'

async def main():
  cache = AsyncTTLCache(ttl_seconds=0.5)

  # 5 concurrent requests for the same key — should only fetch once
  results = await asyncio.gather(*[
      cache.get_or_fetch('user:1', lambda: expensive_fetch('user:1'))
      for _ in range(5)
  ])

  print(f'Fetch count: {fetch_count[0]} (should be 1)')
  print(f'All same value: {len(set(results)) == 1}')

  await asyncio.sleep(0.6)  # TTL expires

  await cache.get_or_fetch('user:1', lambda: expensive_fetch('user:1'))
  print(f'After TTL expiry, fetches: {fetch_count[0]} (should be 2)')

asyncio.run(main())
Expected Output
Fetch count: 1 (should be 1)
All same value: True
After TTL expiry, fetches: 2 (should be 2)
Hints

Hint 1: Check if key is in cache and not expired. If so, return cached value.

Hint 2: If the key is already in _in_flight, wait for the existing Future to complete (await future).

Hint 3: Otherwise, create a Future, store in _in_flight, fetch, set result, cache it.
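
For comparison, the in-flight part of the problem can also be approached by sharing an `asyncio.Task` rather than a hand-made Future. The sketch below is one possible variant under that assumption; `InFlightDeduper` is a hypothetical name, and the sketch has no TTL or result cache at all.

```python
import asyncio

class InFlightDeduper:
    """Hypothetical helper: concurrent callers for one key share one task."""

    def __init__(self):
        self._tasks = {}  # key -> asyncio.Task running the fetch

    async def fetch_once(self, key, fetch_fn):
        task = self._tasks.get(key)
        if task is None:
            # No await between the lookup and the store, so no other
            # coroutine can slip in and start a second fetch.
            task = asyncio.create_task(fetch_fn())
            self._tasks[key] = task
            task.add_done_callback(lambda _: self._tasks.pop(key, None))
        return await task

calls = 0

async def expensive_fetch():
    global calls
    calls += 1
    await asyncio.sleep(0.05)
    return 'data'

async def main():
    dedupe = InFlightDeduper()
    return await asyncio.gather(
        *[dedupe.fetch_once('user:1', expensive_fetch) for _ in range(5)]
    )

results = asyncio.run(main())
print(calls, results)  # 1 ['data', 'data', 'data', 'data', 'data']
```

One trade-off to keep in mind: cancelling a caller that is awaiting the shared task also cancels the task for everyone else, so a production version would probably wrap the await in `asyncio.shield`.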


#9 Circuit Breaker Pattern (Hard)
circuit breaker · state machine · half-open · resilience

Implement the Circuit Breaker pattern with CLOSED, OPEN, and HALF_OPEN states for resilient async services.

Solution:

class CircuitBreaker:
  CLOSED = 'CLOSED'
  OPEN = 'OPEN'
  HALF_OPEN = 'HALF_OPEN'

  def __init__(self, failure_threshold=3, recovery_timeout=0.5):
      self.state = self.CLOSED
      self.failure_count = 0
      self.failure_threshold = failure_threshold
      self.recovery_timeout = recovery_timeout
      self._opened_at = None

  async def call(self, fn, *args, **kwargs):
      if self.state == self.OPEN:
          if time.monotonic() - self._opened_at >= self.recovery_timeout:
              self.state = self.HALF_OPEN
          else:
              raise RuntimeError('Circuit OPEN')

      try:
          result = await fn(*args, **kwargs)
          self._on_success()
          return result
      except Exception:
          self._on_failure()
          raise

  def _on_success(self):
      self.state = self.CLOSED
      self.failure_count = 0

  def _on_failure(self):
      self.failure_count += 1
      if self.failure_count >= self.failure_threshold or self.state == self.HALF_OPEN:
          self.state = self.OPEN
          self._opened_at = time.monotonic()

Starter code:

import asyncio
import time

# Circuit breaker: CLOSED (normal) -> OPEN (failing) -> HALF_OPEN (testing)
# CLOSED: pass requests through
# OPEN: fail fast, no requests sent
# HALF_OPEN: let one request through to test recovery

class CircuitBreaker:
  CLOSED = 'CLOSED'
  OPEN = 'OPEN'
  HALF_OPEN = 'HALF_OPEN'

  def __init__(self, failure_threshold=3, recovery_timeout=0.5):
      self.state = self.CLOSED
      self.failure_count = 0
      self.failure_threshold = failure_threshold
      self.recovery_timeout = recovery_timeout
      self._opened_at = None

  async def call(self, fn, *args, **kwargs):
      pass

  def _on_success(self):
      pass

  def _on_failure(self):
      pass

# Test
call_count = [0]
fail_until = [5]

async def unreliable_service():
  call_count[0] += 1
  if call_count[0] <= fail_until[0]:
      raise RuntimeError('Service down')
  return 'OK'

async def main():
  cb = CircuitBreaker(failure_threshold=3, recovery_timeout=0.3)
  results = []

  for i in range(12):
      try:
          result = await cb.call(unreliable_service)
          results.append(('ok', result, cb.state))
      except Exception as e:
          results.append(('error', str(e), cb.state))
      await asyncio.sleep(0.05)

  for r in results:
      print(r)

asyncio.run(main())
Expected Output
('error', 'Service down', 'CLOSED')
('error', 'Service down', 'CLOSED')
('error', 'Service down', 'OPEN')
('error', 'Circuit OPEN', 'OPEN')
...
Hints

Hint 1: In CLOSED: call fn; on failure increment count; if count >= threshold, transition to OPEN.

Hint 2: In OPEN: check if recovery_timeout has elapsed; if yes, transition to HALF_OPEN; if no, raise fast.

Hint 3: In HALF_OPEN: let one call through; on success -> CLOSED; on failure -> OPEN.
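
The three hints describe a small state machine, and its transitions are easier to check when time is controllable. The sketch below is a condensed variant of the same idea with an injected `clock` callable (an assumption added here for deterministic testing, not part of the exercise's interface) and a threshold of 2 to keep the trace short.

```python
import asyncio

class CircuitBreaker:
    def __init__(self, failure_threshold, recovery_timeout, clock):
        self.state = 'CLOSED'
        self.failures = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock          # assumption: injected for deterministic tests
        self.opened_at = None

    async def call(self, fn):
        if self.state == 'OPEN':
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise RuntimeError('Circuit OPEN')
            self.state = 'HALF_OPEN'  # timeout elapsed: allow a trial call
        try:
            result = await fn()
        except Exception:
            self.failures += 1
            if self.state == 'HALF_OPEN' or self.failures >= self.failure_threshold:
                self.state = 'OPEN'
                self.opened_at = self.clock()
            raise
        self.state = 'CLOSED'
        self.failures = 0
        return result

async def demo():
    now = [0.0]                      # fake clock, advanced by hand
    healthy = [False]

    async def service():
        if not healthy[0]:
            raise ConnectionError('down')
        return 'OK'

    cb = CircuitBreaker(failure_threshold=2, recovery_timeout=10, clock=lambda: now[0])
    states = []
    for step in range(5):
        if step == 3:
            now[0] = 10.0            # recovery timeout has passed
            healthy[0] = True
        try:
            await cb.call(service)
        except Exception:
            pass
        states.append(cb.state)
    return states

states = asyncio.run(demo())
print(states)  # ['CLOSED', 'OPEN', 'OPEN', 'CLOSED', 'CLOSED']
```

Step 2 is rejected without calling the service at all, and step 3 is the trial call that closes the circuit again.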


#10 Async Middleware Chain (Hard)
middleware chain · ASGI · handler pipeline · composable

Compose an async middleware chain where each middleware wraps the next in sequence.

Solution:

import functools

def apply_middleware(handler, middlewares):
  wrapped = handler
  # Wrap from the inside out so the first middleware ends up outermost
  for middleware in reversed(middlewares):
      wrapped = functools.partial(middleware, wrapped)
  return wrapped

Starter code:

import asyncio
import time

# ASGI middleware chains: each middleware wraps the next.
# request -> middleware1 -> middleware2 -> handler -> middleware2 -> middleware1 -> response

async def timing_middleware(handler, request):
  t0 = time.perf_counter()
  response = await handler(request)
  elapsed = (time.perf_counter() - t0) * 1000
  response['timing_ms'] = round(elapsed, 2)
  return response

async def auth_middleware(handler, request):
  if 'token' not in request:
      return {'error': 'unauthorized', 'status': 401}
  response = await handler(request)
  response['authenticated'] = True
  return response

async def logging_middleware(handler, request):
  print(f'  >> {request.get("path")} start')
  response = await handler(request)
  print(f'  << {request.get("path")} done')
  return response

# TODO: Implement apply_middleware(handler, middlewares) that
# composes the middleware chain so they execute in order.
# The last middleware in the list wraps the handler directly.

def apply_middleware(handler, middlewares):
  pass

async def user_handler(request):
  await asyncio.sleep(0.05)
  return {'user': 'Alice', 'status': 200}

async def main():
  middlewares = [timing_middleware, auth_middleware, logging_middleware]
  chained = apply_middleware(user_handler, middlewares)

  request = {'path': '/users/1', 'token': 'abc123'}
  response = await chained(request)
  print('Response:', response)

asyncio.run(main())
Expected Output
  >> /users/1 start
  << /users/1 done
Response: {'user': 'Alice', 'status': 200, 'authenticated': True, 'timing_ms': X.XX}
Hints

Hint 1: Build from the inside out: wrap handler with last middleware, then wrap that with second-to-last, etc.

Hint 2: Use functools.reduce or a loop: for mw in reversed(middlewares): wrapped = partial(mw, wrapped)

Hint 3: Each middleware receives (handler, request) and calls await handler(request).
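
Hint 2 also mentions `functools.reduce`. A sketch of that variant is below, with a `trace` list and a `make_middleware` factory (both hypothetical, added for illustration) to make the nesting order visible.

```python
import asyncio
from functools import partial, reduce

trace = []

def make_middleware(name):
    async def middleware(handler, request):
        trace.append(f'{name} in')
        response = await handler(request)
        trace.append(f'{name} out')
        return response
    return middleware

async def handler(request):
    trace.append('handler')
    return {'status': 200}

def apply_middleware(handler, middlewares):
    # Fold from the innermost middleware outward: the last one in the
    # list wraps the handler, the first one ends up outermost.
    return reduce(lambda inner, mw: partial(mw, inner), reversed(middlewares), handler)

chained = apply_middleware(handler, [make_middleware('a'), make_middleware('b')])
response = asyncio.run(chained({'path': '/'}))
print(trace)  # ['a in', 'b in', 'handler', 'b out', 'a out']
```

The trace matches the request and response flow described in the starter code comment: each middleware runs its "in" half on the way down and its "out" half on the way back.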


#11 Full Async Service: Integrated Pipeline (Hard)
full service · integration · rate limiting · caching · circuit breaker

Build a complete async service that integrates caching, concurrency limiting, timeouts, and metrics into a single get method.

Solution:

import asyncio
import time

class AsyncService:
  def __init__(self):
      self._cache = {}
      self._metrics = {'hits': 0, 'misses': 0, 'timeouts': 0, 'total': 0}
      self._sem = asyncio.Semaphore(5)

  async def fetch(self, resource_id):
      await asyncio.sleep(0.1)
      if resource_id > 100:
          raise ValueError(f'Resource {resource_id} not found')
      return {'id': resource_id, 'data': f'content_{resource_id}'}

  async def get(self, resource_id):
      self._metrics['total'] += 1

      # Check cache
      if resource_id in self._cache:
          value, expire_at = self._cache[resource_id]
          if time.monotonic() < expire_at:
              self._metrics['hits'] += 1
              return value

      self._metrics['misses'] += 1

      # Cap concurrent fetches at 5, then fetch with a timeout
      async with self._sem:
          try:
              result = await asyncio.wait_for(
                  self.fetch(resource_id), timeout=0.5
              )
              self._cache[resource_id] = (result, time.monotonic() + 1.0)
              return result
          except asyncio.TimeoutError:
              self._metrics['timeouts'] += 1
              raise

  def get_metrics(self):
      return dict(self._metrics)

Starter code:

import asyncio
import time

# Build a complete async service integrating:
# 1. Concurrency limiting (max 5 fetches in flight)
# 2. Caching (TTL 1s)
# 3. Timeout (0.5s per request)
# 4. Metrics collection

class AsyncService:
  def __init__(self):
      self._cache = {}
      self._metrics = {'hits': 0, 'misses': 0, 'timeouts': 0, 'total': 0}
      self._sem = asyncio.Semaphore(5)

  async def fetch(self, resource_id):
      # Simulate external API call
      await asyncio.sleep(0.1)
      if resource_id > 100:
          raise ValueError(f'Resource {resource_id} not found')
      return {'id': resource_id, 'data': f'content_{resource_id}'}

  async def get(self, resource_id):
      # TODO: Implement with caching, rate limiting, and timeout
      pass

  def get_metrics(self):
      return dict(self._metrics)

async def main():
  service = AsyncService()

  # Run 10 concurrent requests with repeated IDs
  ids = [1, 2, 1, 3, 2, 4, 1, 5, 6, 1]
  results = await asyncio.gather(
      *[service.get(rid) for rid in ids],
      return_exceptions=True
  )

  metrics = service.get_metrics()
  print(f'Total requests: {metrics["total"]}')
  print(f'Cache hits: {metrics["hits"]}')
  print(f'Cache misses: {metrics["misses"]}')
  successes = [r for r in results if isinstance(r, dict)]
  print(f'Successful: {len(successes)}')

asyncio.run(main())
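Expected Output
Total requests: 10
Cache hits: 0
Cache misses: 10
Successful: 10
Note: all ten requests start before any fetch finishes, so every cache check misses with the solution above. Requesting the same IDs again after the first batch completes, within the 1 s TTL, would produce hits.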
Hints

Hint 1: Check whether resource_id is in self._cache first. If the entry is present and unexpired, increment metrics['hits'] and return it.

Hint 2: Use async with self._sem: to cap the number of fetches in flight at once.

Hint 3: Wrap the fetch in asyncio.wait_for(self.fetch(resource_id), timeout=0.5).
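
`asyncio.Semaphore(5)` caps how many fetches are in flight at once; on its own it does not limit calls per second. If a per-second limit is what you need, one rough approach is to space out request starts. The sketch below assumes a hypothetical `IntervalLimiter` class and a rate of 50 per second to keep the demo fast.

```python
import asyncio
import time

class IntervalLimiter:
    """Hypothetical limiter: starts are spaced at least 1/rate seconds apart."""

    def __init__(self, rate_per_sec):
        self._interval = 1.0 / rate_per_sec
        self._next_slot = 0.0  # earliest monotonic time for the next start

    async def wait(self):
        now = time.monotonic()
        slot = max(now, self._next_slot)
        # Reserve the slot before yielding, so concurrent callers queue up.
        self._next_slot = slot + self._interval
        await asyncio.sleep(slot - now)

async def demo():
    limiter = IntervalLimiter(rate_per_sec=50)  # one start every 20 ms

    async def request(i):
        await limiter.wait()
        return i

    t0 = time.monotonic()
    results = await asyncio.gather(*[request(i) for i in range(5)])
    return results, time.monotonic() - t0

results, elapsed = asyncio.run(demo())
print(results)                  # [0, 1, 2, 3, 4]
print(f'elapsed: {elapsed:.2f}s')  # roughly 0.08s: five starts need four 20 ms gaps
```

The two mechanisms compose: a service could call `limiter.wait()` before entering `async with self._sem:` to bound both the start rate and the number of concurrent fetches.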

© 2026 EngineersOfAI. All rights reserved.