
Python Production Async Architecture: Practice Problems & Exercises

Practice: Production Async Architecture

11 problems · 3 Easy · 4 Medium · 4 Hard · 60–90 min

Easy

#1 Graceful Shutdown with asyncio.Event (Easy)
Tags: graceful-shutdown, asyncio.Event, signal-handling

Implement a graceful shutdown pattern where a server processes in-flight requests before stopping.

import asyncio

async def handle_request(request_id, shutdown_event):
    if shutdown_event.is_set():
        return  # refuse new work
    # Simulated processing. It is longer than the 0.02 s uptime below, so
    # requests are still in flight when shutdown is initiated.
    await asyncio.sleep(0.05)
    return f"request_{request_id}_done"

async def server(shutdown_event):
    print("Server running")
    tasks = []
    for i in range(5):
        if not shutdown_event.is_set():
            tasks.append(asyncio.create_task(handle_request(i, shutdown_event)))
    results = await asyncio.gather(*tasks, return_exceptions=True)
    print("In-flight requests drained")

async def main():
    shutdown = asyncio.Event()
    server_task = asyncio.create_task(server(shutdown))

    await asyncio.sleep(0.02)  # simulate uptime
    print("Shutdown initiated")
    shutdown.set()

    await server_task
    print("Server stopped")

asyncio.run(main())
Solution
import asyncio

async def handle_request(request_id, shutdown_event):
    if shutdown_event.is_set():
        return
    # Longer than the 0.02 s uptime in main(), so the requests are still
    # in flight when shutdown is initiated.
    await asyncio.sleep(0.05)
    return f"request_{request_id}_done"

async def server(shutdown_event):
    print("Server running")
    tasks = []
    for i in range(5):
        if not shutdown_event.is_set():
            tasks.append(asyncio.create_task(handle_request(i, shutdown_event)))
    await asyncio.gather(*tasks, return_exceptions=True)
    print("In-flight requests drained")

async def main():
    shutdown = asyncio.Event()
    server_task = asyncio.create_task(server(shutdown))

    await asyncio.sleep(0.02)
    print("Shutdown initiated")
    shutdown.set()

    await server_task
    print("Server stopped")

asyncio.run(main())

Graceful shutdown pattern:

  • The asyncio.Event is a clean separation between "accept work" and "drain and stop" phases.
  • is_set() check at the start of each request handler prevents new work from starting.
  • asyncio.gather(*tasks) with return_exceptions=True awaits all in-flight work without aborting on individual failures.
  • In production, add a drain timeout: if in-flight work does not finish within N seconds, cancel all remaining tasks.
  • A signal handler for SIGTERM/SIGINT (registered with loop.add_signal_handler on Unix) calls shutdown.set(); the main coroutine then awaits cleanup naturally.
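
The drain timeout mentioned above can be sketched as follows. This is a minimal illustration, not part of the original solution: the helper name drain and the timeout values are assumptions chosen for the demo.

```python
import asyncio

async def drain(tasks, timeout):
    """Wait up to `timeout` seconds for tasks, then cancel whatever is left."""
    if not tasks:
        return set(), set()
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    # Let cancelled tasks finish unwinding; their CancelledError is collected,
    # not raised, because of return_exceptions=True.
    await asyncio.gather(*pending, return_exceptions=True)
    return done, pending

async def main():
    fast = asyncio.create_task(asyncio.sleep(0.01, result="fast"))
    slow = asyncio.create_task(asyncio.sleep(10, result="slow"))  # will not finish in time
    done, cancelled = await drain([fast, slow], timeout=0.1)
    return len(done), len(cancelled), slow.cancelled()

summary = asyncio.run(main())
print(summary)  # (1, 1, True)
```

Cancelling only after the timeout gives well-behaved requests a chance to finish while still bounding total shutdown time.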
Expected Output
Server running
Shutdown initiated
In-flight requests drained
Server stopped
Hints

Hint 1: Use an asyncio.Event as a shutdown signal. Workers check it periodically or await it.

Hint 2: Allow in-flight work to complete before stopping the loop.

#2 Health Check Endpoint (Easy)
Tags: health-check, asyncio, liveness, readiness

Implement async health check functions for liveness and readiness probes (Kubernetes-style).

import asyncio

class AppState:
    def __init__(self):
        self.ready = False
        self.db_connected = False

async def liveness_check(state):
    """Returns healthy if the process is running."""
    return {"status": "healthy"}

async def readiness_check(state):
    """Returns ready only if all dependencies are up."""
    if state.ready and state.db_connected:
        return {"status": "ready"}
    return {"status": "not_ready", "reason": "initializing"}

async def main():
    state = AppState()

    # Simulate startup
    await asyncio.sleep(0.01)
    state.db_connected = True
    state.ready = True

    live = await liveness_check(state)
    ready = await readiness_check(state)
    print(f"liveness: {live['status']}")
    print(f"readiness: {ready['status']}")

asyncio.run(main())
Solution
import asyncio

class AppState:
    def __init__(self):
        self.ready = False
        self.db_connected = False

async def liveness_check(state):
    return {"status": "healthy"}

async def readiness_check(state):
    if state.ready and state.db_connected:
        return {"status": "ready"}
    return {"status": "not_ready", "reason": "initializing"}

async def main():
    state = AppState()

    await asyncio.sleep(0.01)
    state.db_connected = True
    state.ready = True

    live = await liveness_check(state)
    ready = await readiness_check(state)
    print(f"liveness: {live['status']}")
    print(f"readiness: {ready['status']}")

asyncio.run(main())

Liveness vs readiness:

  • Liveness: is the process running? Kubernetes restarts the pod if liveness fails. Should almost always return healthy unless in an unrecoverable state.
  • Readiness: can the pod serve traffic? While readiness fails, Kubernetes removes the pod from Service endpoints so that no traffic is routed to it, and adds it back once the probe succeeds. This allows graceful startup and shutdown without dropping requests.
  • Production checks: ping each downstream dependency (DB, cache, message queue) with a short timeout. Surface individual dependency status in the response body.
  • Add a timeout to each check: if the DB is hanging, the health check itself should not hang.
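
A readiness check that pings each dependency with its own timeout might look like the sketch below. The dependency names and the ping_db / ping_cache coroutines are hypothetical stand-ins for real clients.

```python
import asyncio

async def check_dependency(name, ping, timeout=0.05):
    """Run one ping coroutine function under a timeout; never raise."""
    try:
        await asyncio.wait_for(ping(), timeout=timeout)
        return name, "ok"
    except asyncio.TimeoutError:
        return name, "timeout"
    except Exception as exc:
        return name, f"error: {exc}"

async def readiness_check(dependencies):
    # All dependencies are probed concurrently, so the check takes about
    # one timeout in the worst case rather than one per dependency.
    results = await asyncio.gather(
        *(check_dependency(name, ping) for name, ping in dependencies.items())
    )
    details = dict(results)
    status = "ready" if all(v == "ok" for v in details.values()) else "not_ready"
    return {"status": status, "dependencies": details}

async def ping_db():       # hypothetical healthy dependency
    await asyncio.sleep(0.001)

async def ping_cache():    # hypothetical hanging dependency
    await asyncio.sleep(10)

report = asyncio.run(readiness_check({"db": ping_db, "cache": ping_cache}))
print(report)
# {'status': 'not_ready', 'dependencies': {'db': 'ok', 'cache': 'timeout'}}
```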
Expected Output
liveness: healthy
readiness: ready
Hints

Hint 1: Liveness checks whether the process is alive. Readiness checks whether it can serve traffic.

Hint 2: Return a dict with status and optional details.

#3 Structured Async Logging (Easy)
Tags: structured-logging, asyncio, logging, context

Implement structured logging that automatically includes request context using contextvars.

import asyncio
import contextvars
import time

request_id_var = contextvars.ContextVar("request_id", default="unknown")

def log(level, message, **kwargs):
    request_id = request_id_var.get()
    parts = [f"{level}", f"request_id={request_id}"]
    parts.extend(f"{k}={v}" for k, v in kwargs.items())
    parts.append(message)
    print(" ".join(parts))

async def handle_login(user_id):
    start = time.monotonic()
    await asyncio.sleep(0.01)  # simulate auth check
    elapsed = (time.monotonic() - start) * 1000
    log("INFO", "action=login", user_id=user_id, duration_ms=round(elapsed, 1))
    log("INFO", "status=success")

async def handle_request(req_id, user_id):
    token = request_id_var.set(req_id)
    try:
        await handle_login(user_id)
    finally:
        request_id_var.reset(token)

asyncio.run(handle_request("req-001", 42))
Solution
import asyncio
import contextvars
import time

request_id_var = contextvars.ContextVar("request_id", default="unknown")

def log(level, message, **kwargs):
    request_id = request_id_var.get()
    parts = [level, f"request_id={request_id}"]
    parts.extend(f"{k}={v}" for k, v in kwargs.items())
    parts.append(message)
    print(" ".join(parts))

async def handle_login(user_id):
    start = time.monotonic()
    await asyncio.sleep(0.01)
    elapsed = (time.monotonic() - start) * 1000
    log("INFO", "action=login", user_id=user_id, duration_ms=round(elapsed, 1))
    log("INFO", "status=success")

async def handle_request(req_id, user_id):
    token = request_id_var.set(req_id)
    try:
        await handle_login(user_id)
    finally:
        request_id_var.reset(token)

asyncio.run(handle_request("req-001", 42))

ContextVar for async tracing:

  • ContextVar values are inherited by child coroutines and tasks — set once at the request entry point, available throughout the call tree.
  • var.set(value) returns a Token that allows resetting to the previous value with var.reset(token).
  • Unlike threading.local, ContextVar is safe in async code — each Task gets a copy of the context from when it was created.
  • Production: use Python's logging module with a Filter that adds context vars to every log record automatically.
  • Libraries like structlog integrate with contextvars for zero-config structured logging.
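
The logging-module approach mentioned above can be sketched with a logging.Filter that copies the context variable onto every record. The logger name "app.requests" and the in-memory stream are assumptions made so the demo is easy to inspect.

```python
import asyncio
import contextvars
import io
import logging

request_id_var = contextvars.ContextVar("request_id", default="unknown")

class RequestIdFilter(logging.Filter):
    def filter(self, record):
        # Attach the current request id so the formatter can reference it.
        record.request_id = request_id_var.get()
        return True

stream = io.StringIO()  # capture output in memory for the demo
handler = logging.StreamHandler(stream)
handler.setFormatter(
    logging.Formatter("%(levelname)s request_id=%(request_id)s %(message)s")
)
handler.addFilter(RequestIdFilter())

logger = logging.getLogger("app.requests")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False

async def handle_request(req_id):
    token = request_id_var.set(req_id)
    try:
        await asyncio.sleep(0)  # yield so the two requests interleave
        logger.info("status=success")
    finally:
        request_id_var.reset(token)

async def main():
    # gather() runs each coroutine in its own Task with its own context copy.
    await asyncio.gather(handle_request("req-001"), handle_request("req-002"))

asyncio.run(main())
lines = stream.getvalue().splitlines()
print(lines)
```

Even though the two requests interleave, each log line carries the id set by its own task.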
Expected Output
INFO request_id=req-001 user_id=42 duration_ms=10.2 action=login
INFO request_id=req-001 status=success
(The duration_ms value varies slightly from run to run.)
Hints

Hint 1: Use a contextvars.ContextVar to carry request context through async call stacks.

Hint 2: Include request_id in every log line for distributed tracing.


Medium

#4 Async Middleware Pipeline (Medium)
Tags: middleware, pipeline, async, decorator-chain

Implement an async middleware pipeline where each middleware can inspect/modify the request and response.

import asyncio

async def auth_middleware(request, next_fn):
    print("[auth] checking token")
    response = await next_fn(request)
    print("[auth] done")
    return response

async def rate_limit_middleware(request, next_fn):
    print("[rate_limit] checking rate")
    response = await next_fn(request)
    print("[rate_limit] done")
    return response

async def handler(request):
    print("[handler] processing request")
    return "ok"

def build_pipeline(handler_fn, *middlewares):
    """Compose middlewares around a handler."""
    async def pipeline(request):
        async def run(index, req):
            if index >= len(middlewares):
                return await handler_fn(req)
            mw = middlewares[index]
            return await mw(req, lambda r: run(index + 1, r))
        return await run(0, request)
    return pipeline

async def main():
    pipeline = build_pipeline(handler, auth_middleware, rate_limit_middleware)
    response = await pipeline({"path": "/api/data"})
    print(f"Response: {response}")

asyncio.run(main())
Solution
import asyncio

async def auth_middleware(request, next_fn):
    print("[auth] checking token")
    response = await next_fn(request)
    print("[auth] done")
    return response

async def rate_limit_middleware(request, next_fn):
    print("[rate_limit] checking rate")
    response = await next_fn(request)
    print("[rate_limit] done")
    return response

async def handler(request):
    print("[handler] processing request")
    return "ok"

def build_pipeline(handler_fn, *middlewares):
    async def pipeline(request):
        async def run(index, req):
            if index >= len(middlewares):
                return await handler_fn(req)
            mw = middlewares[index]
            return await mw(req, lambda r: run(index + 1, r))
        return await run(0, request)
    return pipeline

async def main():
    pipeline = build_pipeline(handler, auth_middleware, rate_limit_middleware)
    response = await pipeline({"path": "/api/data"})
    print(f"Response: {response}")

asyncio.run(main())

Middleware pipeline design:

  • Each middleware receives (request, next_fn). Calling await next_fn(request) passes control to the next middleware.
  • Middlewares can run code before and after calling next_fn — enabling request inspection, response modification, logging, and error handling.
  • The recursive run(index, ...) pattern avoids mutating state while building the call chain.
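  • Starlette's BaseHTTPMiddleware (which FastAPI also uses) exposes a similar dispatch(request, call_next) interface, although ASGI middleware stacks are built by wrapping the app rather than with this recursive helper.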
  • Exception propagation: if the handler raises, the exception propagates back through each middleware's await next_fn(request) call — each middleware can catch and handle it.
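
Exception propagation through the pipeline can be illustrated with an outer middleware that converts an error raised further in into a response. The error_middleware, the token check, and the dict-shaped responses are assumptions for this sketch.

```python
import asyncio

def build_pipeline(handler_fn, *middlewares):
    async def pipeline(request):
        async def run(index, req):
            if index >= len(middlewares):
                return await handler_fn(req)
            return await middlewares[index](req, lambda r: run(index + 1, r))
        return await run(0, request)
    return pipeline

async def error_middleware(request, next_fn):
    # Outermost layer: anything raised further in surfaces at this await.
    try:
        return await next_fn(request)
    except PermissionError as exc:
        return {"status": 403, "error": str(exc)}

async def auth_middleware(request, next_fn):
    if request.get("token") != "secret":  # hypothetical token check
        raise PermissionError("invalid token")
    return await next_fn(request)

async def handler(request):
    return {"status": 200, "body": "ok"}

pipeline = build_pipeline(handler, error_middleware, auth_middleware)
denied = asyncio.run(pipeline({"path": "/api/data"}))
allowed = asyncio.run(pipeline({"path": "/api/data", "token": "secret"}))
print(denied)   # {'status': 403, 'error': 'invalid token'}
print(allowed)  # {'status': 200, 'body': 'ok'}
```

Placing the error-handling middleware first means it wraps every later middleware as well as the handler.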
Expected Output
[auth] checking token
[rate_limit] checking rate
[handler] processing request
[rate_limit] done
[auth] done
Response: ok
Hints

Hint 1: Each middleware is an async function that receives (request, next_fn) and calls next_fn(request) to proceed.

Hint 2: Build the pipeline by composing middlewares in order.

#5 Backpressure with asyncio.Queue (Medium)
Tags: backpressure, asyncio.Queue, producer, consumer, flow-control

Demonstrate backpressure: a fast producer is slowed down by a slow consumer through a bounded queue.

import asyncio
import time

async def fast_producer(queue, n):
    start = time.monotonic()
    for i in range(n):
        await queue.put(i)  # suspends when the queue is full: natural backpressure
    elapsed = time.monotonic() - start
    print(f"Producer done in {elapsed:.2f}s (slowed by backpressure)")

async def slow_consumer(queue, n):
    processed = 0
    while processed < n:
        item = await queue.get()
        await asyncio.sleep(0.01)  # slow processing
        queue.task_done()
        processed += 1
    print("All items processed")

async def main():
    queue = asyncio.Queue(maxsize=5)  # small buffer = more backpressure
    await asyncio.gather(
        fast_producer(queue, 20),
        slow_consumer(queue, 20),
    )

asyncio.run(main())
Solution
import asyncio
import time

async def fast_producer(queue, n):
    start = time.monotonic()
    for i in range(n):
        await queue.put(i)
    elapsed = time.monotonic() - start
    print(f"Producer done in {elapsed:.2f}s (slowed by backpressure)")

async def slow_consumer(queue, n):
    processed = 0
    while processed < n:
        item = await queue.get()
        await asyncio.sleep(0.01)
        queue.task_done()
        processed += 1
    print("All items processed")

async def main():
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        fast_producer(queue, 20),
        slow_consumer(queue, 20),
    )

asyncio.run(main())

Backpressure mechanism:

  • The bounded queue (maxsize=5) limits how far ahead the producer can run.
  • queue.put() suspends the producer when the queue is full, giving the consumer time to drain items.
  • Without backpressure (unbounded queue), a fast producer can fill all available memory before the consumer catches up.
  • Queue size is a tuning parameter: larger buffers reduce producer waiting but increase memory usage and latency.
  • In production: monitor queue.qsize() as a metric. A consistently full queue signals the consumer needs scaling.
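
Monitoring queue depth can be sketched by sampling queue.qsize() in the consumer. Here the samples go into a plain list; in a real service they would presumably feed a metrics gauge, which is not shown.

```python
import asyncio

async def producer(queue, n):
    for i in range(n):
        await queue.put(i)  # suspends whenever the queue is full

async def consumer(queue, n, depth_samples):
    for _ in range(n):
        await queue.get()
        depth_samples.append(queue.qsize())  # depth right after taking an item
        await asyncio.sleep(0.001)           # simulate slow processing
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    depth_samples = []
    await asyncio.gather(producer(queue, 20), consumer(queue, 20, depth_samples))
    return depth_samples

samples = asyncio.run(main())
print(f"max depth seen: {max(samples)}, final depth: {samples[-1]}")
```

A depth that sits near maxsize for most samples is the signal that the consumer is the bottleneck.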
Expected Output
Producer done in 0.14s (slowed by backpressure)
All items processed
(The elapsed time varies slightly from run to run.)
Hints

Hint 1: A bounded Queue applies backpressure automatically — put() blocks when the queue is full.

Hint 2: Monitor queue size to detect when the consumer is struggling to keep up.

#6 Retry with Exponential Backoff and Jitter (Medium)
Tags: retry, exponential-backoff, jitter, resilience

Write a retry_with_backoff decorator for async functions that adds exponential backoff with jitter.

import asyncio
import random
import functools

def retry_with_backoff(max_attempts=3, base=0.01, max_delay=1.0, retryable=(Exception,)):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await fn(*args, **kwargs)
                except retryable as e:
                    if attempt == max_attempts:
                        raise
                    delay = min(base * (2 ** (attempt - 1)) + random.uniform(0, base), max_delay)
                    print(f"Attempt {attempt} failed")
                    await asyncio.sleep(delay)
        return wrapper
    return decorator

attempt_count = [0]

@retry_with_backoff(max_attempts=3, base=0.001)
async def flaky_service():
    attempt_count[0] += 1
    if attempt_count[0] < 3:
        raise ConnectionError("transient failure")
    return "ok"

async def main():
    result = await flaky_service()
    print(f"Attempt {attempt_count[0]} succeeded")
    print(f"Result: {result}")

asyncio.run(main())
Solution
import asyncio
import random
import functools

def retry_with_backoff(max_attempts=3, base=0.01, max_delay=1.0, retryable=(Exception,)):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await fn(*args, **kwargs)
                except retryable as e:
                    if attempt == max_attempts:
                        raise
                    delay = min(base * (2 ** (attempt - 1)) + random.uniform(0, base), max_delay)
                    print(f"Attempt {attempt} failed")
                    await asyncio.sleep(delay)
        return wrapper
    return decorator

attempt_count = [0]

@retry_with_backoff(max_attempts=3, base=0.001)
async def flaky_service():
    attempt_count[0] += 1
    if attempt_count[0] < 3:
        raise ConnectionError("transient failure")
    return "ok"

async def main():
    result = await flaky_service()
    print(f"Attempt {attempt_count[0]} succeeded")
    print(f"Result: {result}")

asyncio.run(main())

Jitter and backoff design:

  • base * 2^(attempt - 1) gives exponential growth: 0.01s, 0.02s, 0.04s, ... for base=0.01, with attempts counted from 1 as in the code.
  • + random.uniform(0, base) adds jitter to desynchronize retries from multiple clients hitting the same service simultaneously ("thundering herd" prevention).
  • min(..., max_delay) caps the delay to avoid waiting indefinitely.
  • retryable=(Exception,) is the default — in production, specify retryable=(ConnectionError, TimeoutError) to avoid retrying programming errors.
  • The decorator pattern preserves the function signature via @functools.wraps.
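
To see the delay schedule on its own, the formula from the decorator can be pulled into a small function. The name backoff_delay and the seeded random generator are assumptions for this sketch; the seed only makes the demo repeatable.

```python
import random

def backoff_delay(attempt, base=0.01, max_delay=1.0, rng=random):
    """Delay before the retry that follows failed attempt `attempt` (1-based)."""
    exponential = base * (2 ** (attempt - 1))
    jitter = rng.uniform(0, base)
    return min(exponential + jitter, max_delay)

rng = random.Random(0)  # seeded only so repeated runs print the same values
delays = [backoff_delay(attempt, rng=rng) for attempt in range(1, 9)]
for attempt, delay in enumerate(delays, start=1):
    print(f"attempt {attempt}: sleep {delay:.4f}s")
```

With base=0.01 the eighth delay would be 1.28s before capping, so it comes out as exactly max_delay (1.0s).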
Expected Output
Attempt 1 failed
Attempt 2 failed
Attempt 3 succeeded
Result: ok
Hints

Hint 1: Add random jitter to prevent thundering herd: sleep = base * 2^attempt + random(0, jitter_max).

Hint 2: Only retry on specific transient errors, not on permanent failures like authentication errors.

#7 pytest-asyncio Test Patterns (Medium)
Tags: pytest-asyncio, testing, async-fixtures, mocking

Write pytest-asyncio tests for an async service, including fixtures and mocking async dependencies.

# test_service.py (run with: pytest test_service.py -v)
import asyncio
import pytest
import pytest_asyncio
from unittest.mock import AsyncMock

class UserService:
    def __init__(self, db):
        self._db = db

    async def get_user(self, user_id):
        row = await self._db.fetch_one(f"SELECT * FROM users WHERE id={user_id}")
        if row is None:
            raise ValueError(f"User {user_id} not found")
        return row

# Fixture: mock DB. In pytest-asyncio's strict mode (the default), async
# fixtures must be declared with pytest_asyncio.fixture.
@pytest_asyncio.fixture
async def mock_db():
    db = AsyncMock()
    db.fetch_one.return_value = {"id": 1, "name": "Alice"}
    yield db

@pytest.mark.asyncio
async def test_get_user_success(mock_db):
    service = UserService(mock_db)
    user = await service.get_user(1)
    assert user["name"] == "Alice"
    mock_db.fetch_one.assert_called_once()

@pytest.mark.asyncio
async def test_get_user_not_found(mock_db):
    mock_db.fetch_one.return_value = None
    service = UserService(mock_db)
    with pytest.raises(ValueError, match="User 99 not found"):
        await service.get_user(99)

# Running inline (not via pytest) for demonstration:
async def run_tests():
    db_mock = AsyncMock()
    db_mock.fetch_one.return_value = {"id": 1, "name": "Alice"}
    service = UserService(db_mock)

    user = await service.get_user(1)
    assert user["name"] == "Alice"

    db_mock.fetch_one.return_value = None
    try:
        await service.get_user(99)
        assert False, "Should have raised"
    except ValueError as e:
        assert "99" in str(e)

    print("All tests pass")

# Guarded so the inline run does not execute when pytest imports this module.
if __name__ == "__main__":
    asyncio.run(run_tests())
Solution
import asyncio
from unittest.mock import AsyncMock

class UserService:
    def __init__(self, db):
        self._db = db

    async def get_user(self, user_id):
        row = await self._db.fetch_one(f"SELECT * FROM users WHERE id={user_id}")
        if row is None:
            raise ValueError(f"User {user_id} not found")
        return row

async def run_tests():
    db_mock = AsyncMock()
    db_mock.fetch_one.return_value = {"id": 1, "name": "Alice"}
    service = UserService(db_mock)

    user = await service.get_user(1)
    assert user["name"] == "Alice"
    db_mock.fetch_one.assert_called_once()

    db_mock.fetch_one.return_value = None
    try:
        await service.get_user(99)
        assert False
    except ValueError as e:
        assert "99" in str(e)

    print("All tests pass")

asyncio.run(run_tests())

Async testing patterns:

  • AsyncMock() is the correct mock class for async functions — it returns a coroutine when called, not a plain value.
  • mock.method.return_value = value configures what the mock returns when awaited.
  • mock.method.side_effect = [val1, val2, Exception()] sequences multiple return values or exceptions.
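  • For pytest: @pytest.mark.asyncio marks an async test, and pytest-asyncio runs it inside an event loop that it manages.
  • pytest-asyncio mode: set asyncio_mode = auto in pytest.ini (or asyncio_mode = "auto" under [tool.pytest.ini_options] in pyproject.toml) to avoid marking every test.
  • Async fixtures with yield handle setup/teardown cleanly: code before the yield is setup, code after it is teardown. In strict mode (the default), declare them with @pytest_asyncio.fixture.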
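
The side_effect sequencing described above can be tried without pytest. In this sketch, fetch_with_retry and the client.fetch method are hypothetical names used only to exercise the mock.

```python
import asyncio
from unittest.mock import AsyncMock

async def fetch_with_retry(client, attempts=3):
    """Hypothetical helper: call client.fetch() until it succeeds."""
    for attempt in range(1, attempts + 1):
        try:
            return await client.fetch()
        except ConnectionError:
            if attempt == attempts:
                raise

async def main():
    client = AsyncMock()
    # Each await consumes the next item; exception instances are raised.
    client.fetch.side_effect = [
        ConnectionError("down"),
        ConnectionError("down"),
        {"id": 1},
    ]
    result = await fetch_with_retry(client)
    return result, client.fetch.await_count

outcome = asyncio.run(main())
print(outcome)  # ({'id': 1}, 3)
```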
Expected Output
All tests pass
Hints

Hint 1: Mark async test functions with @pytest.mark.asyncio.

Hint 2: Async fixtures use yield for setup/teardown. Mock async functions with AsyncMock.


Hard

#8 Async Worker Pool with Graceful Drain (Hard)
Tags: worker-pool, graceful-drain, asyncio.Queue, shutdown

Build an async worker pool that processes a job queue and drains gracefully when signaled to stop.

import asyncio

class WorkerPool:
    def __init__(self, num_workers):
        self._queue = asyncio.Queue()
        self._workers = []
        self._num_workers = num_workers

    async def start(self):
        for i in range(self._num_workers):
            self._workers.append(asyncio.create_task(self._worker(i)))

    async def submit(self, job):
        await self._queue.put(job)

    async def drain_and_stop(self):
        # Send sentinel None for each worker to stop
        for _ in range(self._num_workers):
            await self._queue.put(None)
        await asyncio.gather(*self._workers)

    async def _worker(self, worker_id):
        while True:
            job = await self._queue.get()
            if job is None:
                self._queue.task_done()
                break
            await job()
            self._queue.task_done()

async def main():
    pool = WorkerPool(num_workers=3)
    await pool.start()

    completed = []

    # A plain function is enough here: it only builds and returns the job.
    def make_job(job_id):
        async def job():
            await asyncio.sleep(0.005)
            completed.append(job_id)
        return job

    for i in range(10):
        await pool.submit(make_job(i))

    await pool.drain_and_stop()
    print(f"All {len(completed)} jobs processed")
    print("Pool drained gracefully")

asyncio.run(main())
Solution
import asyncio

class WorkerPool:
    def __init__(self, num_workers):
        self._queue = asyncio.Queue()
        self._workers = []
        self._num_workers = num_workers

    async def start(self):
        for i in range(self._num_workers):
            self._workers.append(asyncio.create_task(self._worker(i)))

    async def submit(self, job):
        await self._queue.put(job)

    async def drain_and_stop(self):
        for _ in range(self._num_workers):
            await self._queue.put(None)
        await asyncio.gather(*self._workers)

    async def _worker(self, worker_id):
        while True:
            job = await self._queue.get()
            if job is None:
                self._queue.task_done()
                break
            await job()
            self._queue.task_done()

async def main():
    pool = WorkerPool(num_workers=3)
    await pool.start()

    completed = []

    # A plain function is enough here: it only builds and returns the job.
    def make_job(job_id):
        async def job():
            await asyncio.sleep(0.005)
            completed.append(job_id)
        return job

    for i in range(10):
        await pool.submit(make_job(i))

    await pool.drain_and_stop()
    print(f"All {len(completed)} jobs processed")
    print("Pool drained gracefully")

asyncio.run(main())

Worker pool design:

  • One asyncio.Queue serves as the shared job buffer.
  • N workers run concurrently, each pulling from the queue in a loop.
  • Sentinel None values stop workers cleanly — one sentinel per worker ensures all workers stop.
  • queue.task_done() + queue.join() provides an alternative drain pattern if jobs are submitted by external code.
  • Production extensions: job priority (use PriorityQueue), job timeout (wrap each job in asyncio.wait_for), dead letter queue for failed jobs, dynamic worker scaling.
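
The alternative drain based on task_done() and join() might look like the following sketch. It uses bare worker coroutines instead of the WorkerPool class, and integer job ids instead of callables, purely to keep the example short.

```python
import asyncio

async def worker(queue, results):
    while True:
        job_id = await queue.get()
        try:
            await asyncio.sleep(0.001)  # simulate work
            results.append(job_id)
        finally:
            queue.task_done()           # mark done even if the job raised

async def main():
    queue = asyncio.Queue()
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(3)]
    for job_id in range(10):
        queue.put_nowait(job_id)
    await queue.join()                  # returns once every item is task_done()
    for w in workers:
        w.cancel()                      # workers are idle in get() at this point
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)

processed = asyncio.run(main())
print(f"processed {len(processed)} jobs")  # processed 10 jobs
```

Compared with sentinels, join() does not require knowing how many workers exist, but the idle workers still have to be cancelled afterwards.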
Expected Output
All 10 jobs processed
Pool drained gracefully
Hints

Hint 1: Use a queue for jobs and N worker tasks consuming from it. Graceful drain: send N sentinel values to stop workers.

Hint 2: Wait for queue.join() to ensure all items are task_done before declaring completion.

#9 Distributed Locking with TTL (Hard)
Tags: distributed-lock, TTL, asyncio.Lock, expiry

Implement an in-memory distributed lock with TTL expiry for use across multiple concurrent "clients".

import asyncio
import time

class TTLLock:
    def __init__(self, ttl_seconds):
        self._ttl = ttl_seconds
        self._lock = asyncio.Lock()
        self._holder = None
        self._acquired_at = None

    def _is_expired(self):
        if self._acquired_at is None:
            return False
        return time.monotonic() - self._acquired_at > self._ttl

    async def acquire(self, client_id):
        while True:
            async with self._lock:
                if self._holder is None or self._is_expired():
                    self._holder = client_id
                    self._acquired_at = time.monotonic()
                    return True
            await asyncio.sleep(0.005)

    async def release(self, client_id):
        async with self._lock:
            if self._holder == client_id:
                self._holder = None
                self._acquired_at = None

async def main():
    lock = TTLLock(ttl_seconds=0.05)

    async def client(client_id, sleep_before=0):
        await asyncio.sleep(sleep_before)
        await lock.acquire(client_id)
        print(f"Lock acquired by {client_id}")
        await asyncio.sleep(0.03)  # hold for 30ms
        await lock.release(client_id)

    # client_1 holds the lock for 30ms and releases it before the 50ms TTL;
    # client_2 polls and acquires it right after that release.
    await asyncio.gather(
        client("client_1"),
        client("client_2", sleep_before=0.01),
    )
    print("Both clients processed")

asyncio.run(main())
Solution
import asyncio
import time

class TTLLock:
    def __init__(self, ttl_seconds):
        self._ttl = ttl_seconds
        self._lock = asyncio.Lock()
        self._holder = None
        self._acquired_at = None

    def _is_expired(self):
        if self._acquired_at is None:
            return False
        return time.monotonic() - self._acquired_at > self._ttl

    async def acquire(self, client_id):
        while True:
            async with self._lock:
                if self._holder is None or self._is_expired():
                    self._holder = client_id
                    self._acquired_at = time.monotonic()
                    return True
            await asyncio.sleep(0.005)

    async def release(self, client_id):
        async with self._lock:
            if self._holder == client_id:
                self._holder = None
                self._acquired_at = None

async def main():
    lock = TTLLock(ttl_seconds=0.05)

    async def client(client_id, sleep_before=0):
        await asyncio.sleep(sleep_before)
        await lock.acquire(client_id)
        print(f"Lock acquired by {client_id}")
        await asyncio.sleep(0.03)
        await lock.release(client_id)

    await asyncio.gather(
        client("client_1"),
        client("client_2", sleep_before=0.01),
    )
    print("Both clients processed")

asyncio.run(main())

TTL lock design:

  • TTL prevents a crashed holder from blocking everyone forever: if the holder dies without releasing, the lock expires on its own.
  • The asyncio.Lock protects the lock state itself — preventing TOCTOU races in acquire().
  • The polling while True / asyncio.sleep(0.005) is a simple approach. Production: use asyncio.Condition.wait_for() to avoid busy-polling.
  • Real distributed locks (Redis SET with NX and an expiry, ZooKeeper ephemeral nodes) follow the same idea but coordinate over the network.
  • Production concern: if a client holds the lock and the operation takes longer than TTL, another client may acquire it — the original client's release becomes a no-op.
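
The exercise's demo never actually reaches the TTL, because client_1 releases after 30ms. The sketch below is a variant in which the first holder never releases, so the second client can only get the lock through expiry. As an assumption for illustration, acquire() here returns whether it took over an expired lock and release() returns whether it changed anything; the original methods do not.

```python
import asyncio
import time

class TTLLock:
    def __init__(self, ttl_seconds):
        self._ttl = ttl_seconds
        self._lock = asyncio.Lock()
        self._holder = None
        self._acquired_at = None

    async def acquire(self, client_id, poll_interval=0.005):
        while True:
            async with self._lock:
                expired = (
                    self._acquired_at is not None
                    and time.monotonic() - self._acquired_at > self._ttl
                )
                if self._holder is None or expired:
                    took_over = self._holder is not None  # True only on expiry
                    self._holder = client_id
                    self._acquired_at = time.monotonic()
                    return took_over
            await asyncio.sleep(poll_interval)

    async def release(self, client_id):
        async with self._lock:
            if self._holder != client_id:
                return False  # stale release: someone else holds the lock now
            self._holder = None
            self._acquired_at = None
            return True

async def main():
    lock = TTLLock(ttl_seconds=0.05)
    first = await lock.acquire("client_1")   # client_1 "crashes": never releases
    start = time.monotonic()
    second = await lock.acquire("client_2")  # succeeds only after the TTL lapses
    waited = time.monotonic() - start
    stale_release = await lock.release("client_1")
    return first, second, waited, stale_release

first, second, waited, stale_release = asyncio.run(main())
print(first, second, stale_release)  # False True False
```

The stale release returning False is the no-op behaviour described in the last bullet above.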
Expected Output
Lock acquired by client_1
Lock acquired by client_2
Both clients processed
Hints

Hint 1: Track lock acquisition time. If current time minus acquisition time exceeds TTL, the lock is considered expired.

Hint 2: Implement a background task that checks for and expires stale locks.

#10 Async Event Bus (Hard)
Tags: event-bus, pub-sub, asyncio, observers

Implement an async event bus supporting pub-sub with multiple handlers per event and concurrent dispatch.

import asyncio
from collections import defaultdict

class AsyncEventBus:
    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._handlers[event_type].append(handler)

    def unsubscribe(self, event_type, handler):
        self._handlers[event_type].remove(handler)

    async def publish(self, event_type, payload):
        handlers = self._handlers.get(event_type, []) + self._handlers.get("*", [])
        if handlers:
            await asyncio.gather(*[h(event_type, payload) for h in handlers])

async def main():
    bus = AsyncEventBus()

    async def user_handler_1(event, payload):
        print(f"user.created handler 1: {payload['name']}")

    async def user_handler_2(event, payload):
        print(f"user.created handler 2: {payload['name']}")

    async def order_handler(event, payload):
        print(f"order.placed handler: {payload['order_id']}")

    bus.subscribe("user.created", user_handler_1)
    bus.subscribe("user.created", user_handler_2)
    bus.subscribe("order.placed", order_handler)

    await bus.publish("user.created", {"name": "alice"})
    await bus.publish("order.placed", {"order_id": "order_99"})

asyncio.run(main())
Solution
import asyncio
from collections import defaultdict

class AsyncEventBus:
    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._handlers[event_type].append(handler)

    def unsubscribe(self, event_type, handler):
        self._handlers[event_type].remove(handler)

    async def publish(self, event_type, payload):
        handlers = self._handlers.get(event_type, []) + self._handlers.get("*", [])
        if handlers:
            await asyncio.gather(*[h(event_type, payload) for h in handlers])

async def main():
    bus = AsyncEventBus()

    async def user_handler_1(event, payload):
        print(f"user.created handler 1: {payload['name']}")

    async def user_handler_2(event, payload):
        print(f"user.created handler 2: {payload['name']}")

    async def order_handler(event, payload):
        print(f"order.placed handler: {payload['order_id']}")

    bus.subscribe("user.created", user_handler_1)
    bus.subscribe("user.created", user_handler_2)
    bus.subscribe("order.placed", order_handler)

    await bus.publish("user.created", {"name": "alice"})
    await bus.publish("order.placed", {"order_id": "order_99"})

asyncio.run(main())

Async event bus design:

  • asyncio.gather(*handlers) dispatches all handlers concurrently, so a slow handler doesn't block the others, although publish() itself returns only once the slowest handler has finished.
  • Wildcard "*" handlers receive every event, which is useful for logging and monitoring.
  • The solution calls gather without return_exceptions=True, so the first handler exception propagates to the caller of publish() while the remaining handlers keep running unobserved. Passing return_exceptions=True makes the bus resilient: every handler runs to completion and failures come back as results.
  • Production extensions: async handler queuing per subscriber (decouple publish speed from processing speed), dead letter queue for failed handlers, event persistence, and type-safe event schemas using dataclasses or Pydantic.
  • The same publish/subscribe idea underlies event-driven microservices and is often used alongside CQRS (Command Query Responsibility Segregation).
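
A publish() that tolerates failing handlers can be sketched by passing return_exceptions=True to gather and returning the collected errors. The class name ResilientEventBus and the choice to return a list of exceptions are assumptions for this illustration.

```python
import asyncio
from collections import defaultdict

class ResilientEventBus:
    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._handlers[event_type].append(handler)

    async def publish(self, event_type, payload):
        handlers = self._handlers.get(event_type, []) + self._handlers.get("*", [])
        results = await asyncio.gather(
            *(h(event_type, payload) for h in handlers),
            return_exceptions=True,  # failures come back as values, not raises
        )
        return [r for r in results if isinstance(r, Exception)]

async def main():
    bus = ResilientEventBus()
    seen = []

    async def broken_handler(event, payload):
        raise RuntimeError("handler failed")

    async def audit_handler(event, payload):
        seen.append((event, payload["name"]))

    bus.subscribe("user.created", broken_handler)
    bus.subscribe("*", audit_handler)
    errors = await bus.publish("user.created", {"name": "alice"})
    return seen, [str(e) for e in errors]

seen, errors = asyncio.run(main())
print(seen)    # [('user.created', 'alice')]
print(errors)  # ['handler failed']
```

The wildcard handler still runs even though the first subscriber raised, and the caller decides what to do with the returned errors.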
Expected Output
user.created handler 1: alice
user.created handler 2: alice
order.placed handler: order_99
Hints

Hint 1: Store handlers per event type in a dict. Publishing fires all handlers concurrently with asyncio.gather.

Hint 2: Support wildcard subscriptions with a special "*" key.

#11 Full Async Service with Health, Metrics, and Shutdown (Hard)
Tags: production-service, health-check, metrics, graceful-shutdown, async

Build a minimal production-grade async service that combines request processing, health checks, metrics, and graceful shutdown.

import asyncio
import time

class Service:
    def __init__(self):
        self.shutdown = asyncio.Event()
        self.metrics = {"requests": 0, "errors": 0, "uptime_start": time.monotonic()}
        self._tasks = []

    async def handle_request(self, request_id):
        await asyncio.sleep(0.005)
        self.metrics["requests"] += 1

    async def health_check(self):
        return {
            "status": "healthy",
            "requests": self.metrics["requests"],
            "uptime": round(time.monotonic() - self.metrics["uptime_start"], 2),
        }

    async def run(self):
        for i in range(5):
            if self.shutdown.is_set():
                break
            await self.handle_request(i)
        print(f"Processed {self.metrics['requests']} requests")

    async def drain(self):
        for t in self._tasks:
            if not t.done():
                await t
        print("Graceful shutdown complete")

async def main():
    svc = Service()
    print("Service started")

    await svc.run()

    health = await svc.health_check()
    print(f"Health: {health['status']}")

    svc.shutdown.set()
    await svc.drain()

asyncio.run(main())
Solution
import asyncio
import time

class Service:
    def __init__(self):
        self.shutdown = asyncio.Event()
        self.metrics = {"requests": 0, "errors": 0, "uptime_start": time.monotonic()}
        self._tasks = []

    async def handle_request(self, request_id):
        await asyncio.sleep(0.005)
        self.metrics["requests"] += 1

    async def health_check(self):
        return {
            "status": "healthy",
            "requests": self.metrics["requests"],
            "uptime": round(time.monotonic() - self.metrics["uptime_start"], 2),
        }

    async def run(self):
        for i in range(5):
            if self.shutdown.is_set():
                break
            await self.handle_request(i)
        print(f"Processed {self.metrics['requests']} requests")

    async def drain(self):
        for t in self._tasks:
            if not t.done():
                await t
        print("Graceful shutdown complete")

async def main():
    svc = Service()
    print("Service started")

    await svc.run()

    health = await svc.health_check()
    print(f"Health: {health['status']}")

    svc.shutdown.set()
    await svc.drain()

asyncio.run(main())

Production service architecture:

  • asyncio.Event for shutdown coordination — the service checks shutdown.is_set() before accepting new work.
  • Metrics dict tracks key operational counters — in production, export these to Prometheus or StatsD.
  • Health check returns both status and operational metrics — useful for debugging and alerting.
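  • Drain pattern: await all in-flight tasks before declaring shutdown complete. In this minimal version run() awaits each request inline and never adds anything to _tasks, so drain() has nothing to wait for.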
  • Extension to a real service: add signal handlers (SIGTERM → shutdown.set()), a timeout on drain, connection pool lifecycle management, and async startup hooks for resource initialization.
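
A version in which drain() has real work to do could track each request as a background task. This is a sketch under assumptions: the accept() method, the set-based task registry, and the default timeout are illustrative names and values, not part of the original solution.

```python
import asyncio

class Service:
    def __init__(self):
        self.shutdown = asyncio.Event()
        self.requests = 0
        self._tasks = set()

    def accept(self, request_id):
        """Start a request as a background task unless shutdown has begun."""
        if self.shutdown.is_set():
            return False
        task = asyncio.create_task(self._handle(request_id))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)  # forget finished tasks
        return True

    async def _handle(self, request_id):
        await asyncio.sleep(0.005)  # simulate request processing
        self.requests += 1

    async def drain(self, timeout=1.0):
        self.shutdown.set()
        if self._tasks:
            # Wait on a snapshot: the done-callbacks mutate self._tasks.
            _, pending = await asyncio.wait(set(self._tasks), timeout=timeout)
            for task in pending:
                task.cancel()
        return self.requests

async def main():
    svc = Service()
    accepted = [svc.accept(i) for i in range(5)]
    in_flight = svc.requests       # still 0: nothing has finished yet
    completed = await svc.drain()
    rejected = svc.accept(99)      # refused after shutdown
    return accepted, in_flight, completed, rejected

accepted, in_flight, completed, rejected = asyncio.run(main())
print(accepted, in_flight, completed, rejected)
# [True, True, True, True, True] 0 5 False
```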
Expected Output
Service started
Processed 5 requests
Health: healthy
Graceful shutdown complete
Hints

Hint 1: Combine all production patterns: worker pool, health checks, metrics counters, and graceful shutdown.

Hint 2: Use asyncio.gather to run the server and a background metrics reporter concurrently.

© 2026 EngineersOfAI. All rights reserved.