Python Production Async Architecture: Practice Problems & Exercises
Practice: Production Async Architecture
← Back to lessonEasy
Implement a graceful shutdown pattern where a server processes in-flight requests before stopping.
import asyncio
async def handle_request(request_id, shutdown_event):
if shutdown_event.is_set():
return # refuse new work
await asyncio.sleep(0.01) # simulate request processing
return f"request_{request_id}_done"
async def server(shutdown_event):
print("Server running")
tasks = []
for i in range(5):
if not shutdown_event.is_set():
tasks.append(asyncio.create_task(handle_request(i, shutdown_event)))
results = await asyncio.gather(*tasks, return_exceptions=True)
print("In-flight requests drained")
async def main():
shutdown = asyncio.Event()
server_task = asyncio.create_task(server(shutdown))
await asyncio.sleep(0.02) # simulate uptime
print("Shutdown initiated")
shutdown.set()
await server_task
print("Server stopped")
asyncio.run(main())
Solution
import asyncio
async def handle_request(request_id, shutdown_event):
if shutdown_event.is_set():
return
await asyncio.sleep(0.01)
return f"request_{request_id}_done"
async def server(shutdown_event):
print("Server running")
tasks = []
for i in range(5):
if not shutdown_event.is_set():
tasks.append(asyncio.create_task(handle_request(i, shutdown_event)))
await asyncio.gather(*tasks, return_exceptions=True)
print("In-flight requests drained")
async def main():
shutdown = asyncio.Event()
server_task = asyncio.create_task(server(shutdown))
await asyncio.sleep(0.02)
print("Shutdown initiated")
shutdown.set()
await server_task
print("Server stopped")
asyncio.run(main())
Graceful shutdown pattern:
- The
asyncio.Eventis a clean separation between "accept work" and "drain and stop" phases. is_set()check at the start of each request handler prevents new work from starting.asyncio.gather(*tasks)withreturn_exceptions=Trueawaits all in-flight work without aborting on individual failures.- In production, add a drain timeout: if in-flight work does not finish within N seconds, cancel all remaining tasks.
- Signal handler (SIGTERM/SIGINT) calls
shutdown.set()— the main coroutine awaits cleanup naturally.
Expected Output
Server running\nShutdown initiated\nIn-flight requests drained\nServer stoppedHints
Hint 1: Use an asyncio.Event as a shutdown signal. Workers check it periodically or await it.
Hint 2: Allow in-flight work to complete before stopping the loop.
Implement async health check functions for liveness and readiness probes (Kubernetes-style).
import asyncio
class AppState:
def __init__(self):
self.ready = False
self.db_connected = False
async def liveness_check(state):
"""Returns healthy if the process is running."""
return {"status": "healthy"}
async def readiness_check(state):
"""Returns ready only if all dependencies are up."""
if state.ready and state.db_connected:
return {"status": "ready"}
return {"status": "not_ready", "reason": "initializing"}
async def main():
state = AppState()
# Simulate startup
await asyncio.sleep(0.01)
state.db_connected = True
state.ready = True
live = await liveness_check(state)
ready = await readiness_check(state)
print(f"liveness: {live['status']}")
print(f"readiness: {ready['status']}")
asyncio.run(main())
Solution
import asyncio
class AppState:
def __init__(self):
self.ready = False
self.db_connected = False
async def liveness_check(state):
return {"status": "healthy"}
async def readiness_check(state):
if state.ready and state.db_connected:
return {"status": "ready"}
return {"status": "not_ready", "reason": "initializing"}
async def main():
state = AppState()
await asyncio.sleep(0.01)
state.db_connected = True
state.ready = True
live = await liveness_check(state)
ready = await readiness_check(state)
print(f"liveness: {live['status']}")
print(f"readiness: {ready['status']}")
asyncio.run(main())
Liveness vs readiness:
- Liveness: is the process running? Kubernetes restarts the pod if liveness fails. Should almost always return healthy unless in an unrecoverable state.
- Readiness: can the pod serve traffic? Kubernetes removes the pod from the load balancer until readiness returns healthy. This allows graceful startup and shutdown without dropping requests.
- Production checks: ping each downstream dependency (DB, cache, message queue) with a short timeout. Surface individual dependency status in the response body.
- Add a timeout to each check: if the DB is hanging, the health check itself should not hang.
Expected Output
liveness: healthy\nreadiness: readyHints
Hint 1: Liveness checks whether the process is alive. Readiness checks whether it can serve traffic.
Hint 2: Return a dict with status and optional details.
Implement structured logging that automatically includes request context using contextvars.
import asyncio
import contextvars
import time
request_id_var = contextvars.ContextVar("request_id", default="unknown")
def log(level, message, **kwargs):
request_id = request_id_var.get()
parts = [f"{level}", f"request_id={request_id}"]
parts.extend(f"{k}={v}" for k, v in kwargs.items())
parts.append(message)
print(" ".join(parts))
async def handle_login(user_id):
start = time.monotonic()
await asyncio.sleep(0.01) # simulate auth check
elapsed = (time.monotonic() - start) * 1000
log("INFO", "action=login", user_id=user_id, duration_ms=round(elapsed, 1))
log("INFO", "status=success")
async def handle_request(req_id, user_id):
token = request_id_var.set(req_id)
try:
await handle_login(user_id)
finally:
request_id_var.reset(token)
asyncio.run(handle_request("req-001", 42))
Solution
import asyncio
import contextvars
import time
request_id_var = contextvars.ContextVar("request_id", default="unknown")
def log(level, message, **kwargs):
request_id = request_id_var.get()
parts = [level, f"request_id={request_id}"]
parts.extend(f"{k}={v}" for k, v in kwargs.items())
parts.append(message)
print(" ".join(parts))
async def handle_login(user_id):
start = time.monotonic()
await asyncio.sleep(0.01)
elapsed = (time.monotonic() - start) * 1000
log("INFO", "action=login", user_id=user_id, duration_ms=round(elapsed, 1))
log("INFO", "status=success")
async def handle_request(req_id, user_id):
token = request_id_var.set(req_id)
try:
await handle_login(user_id)
finally:
request_id_var.reset(token)
asyncio.run(handle_request("req-001", 42))
ContextVar for async tracing:
ContextVarvalues are inherited by child coroutines and tasks — set once at the request entry point, available throughout the call tree.var.set(value)returns aTokenthat allows resetting to the previous value withvar.reset(token).- Unlike
threading.local,ContextVaris safe in async code — each Task gets a copy of the context from when it was created. - Production: use Python's
loggingmodule with aFilterthat adds context vars to every log record automatically. - Libraries like
structlogintegrate withcontextvarsfor zero-config structured logging.
Expected Output
INFO request_id=req-001 user_id=42 action=login duration_ms=10.0\nINFO request_id=req-001 status=successHints
Hint 1: Use a contextvars.ContextVar to carry request context through async call stacks.
Hint 2: Include request_id in every log line for distributed tracing.
Medium
Implement an async middleware pipeline where each middleware can inspect/modify the request and response.
import asyncio
async def auth_middleware(request, next_fn):
print("[auth] checking token")
response = await next_fn(request)
print("[auth] done")
return response
async def rate_limit_middleware(request, next_fn):
print("[rate_limit] checking rate")
response = await next_fn(request)
print("[rate_limit] done")
return response
async def handler(request):
print("[handler] processing request")
return "ok"
def build_pipeline(handler_fn, *middlewares):
"""Compose middlewares around a handler."""
async def pipeline(request):
async def run(index, req):
if index >= len(middlewares):
return await handler_fn(req)
mw = middlewares[index]
return await mw(req, lambda r: run(index + 1, r))
return await run(0, request)
return pipeline
async def main():
pipeline = build_pipeline(handler, auth_middleware, rate_limit_middleware)
response = await pipeline({"path": "/api/data"})
print(f"Response: {response}")
asyncio.run(main())
Solution
import asyncio
async def auth_middleware(request, next_fn):
print("[auth] checking token")
response = await next_fn(request)
print("[auth] done")
return response
async def rate_limit_middleware(request, next_fn):
print("[rate_limit] checking rate")
response = await next_fn(request)
print("[rate_limit] done")
return response
async def handler(request):
print("[handler] processing request")
return "ok"
def build_pipeline(handler_fn, *middlewares):
async def pipeline(request):
async def run(index, req):
if index >= len(middlewares):
return await handler_fn(req)
mw = middlewares[index]
return await mw(req, lambda r: run(index + 1, r))
return await run(0, request)
return pipeline
async def main():
pipeline = build_pipeline(handler, auth_middleware, rate_limit_middleware)
response = await pipeline({"path": "/api/data"})
print(f"Response: {response}")
asyncio.run(main())
Middleware pipeline design:
- Each middleware receives
(request, next_fn). Callingawait next_fn(request)passes control to the next middleware. - Middlewares can run code before and after calling
next_fn— enabling request inspection, response modification, logging, and error handling. - The recursive
run(index, ...)pattern avoids mutating state while building the call chain. - This is how ASGI frameworks (Starlette, FastAPI) implement middleware stacks.
- Exception propagation: if the handler raises, the exception propagates back through each middleware's
await next_fn(request)call — each middleware can catch and handle it.
Expected Output
[auth] checking token\n[rate_limit] checking rate\n[handler] processing request\n[rate_limit] done\n[auth] done\nResponse: okHints
Hint 1: Each middleware is an async function that receives (request, next_fn) and calls next_fn(request) to proceed.
Hint 2: Build the pipeline by composing middlewares in order.
Demonstrate backpressure: a fast producer is slowed down by a slow consumer through a bounded queue.
import asyncio
import time
async def fast_producer(queue, n):
start = time.monotonic()
for i in range(n):
await queue.put(i) # blocks when queue is full — natural backpressure
elapsed = time.monotonic() - start
print(f"Producer done in {elapsed:.2f}s (slowed by backpressure)")
async def slow_consumer(queue, n):
processed = 0
while processed < n:
item = await queue.get()
await asyncio.sleep(0.01) # slow processing
queue.task_done()
processed += 1
print("All items processed")
async def main():
queue = asyncio.Queue(maxsize=5) # small buffer = more backpressure
await asyncio.gather(
fast_producer(queue, 20),
slow_consumer(queue, 20),
)
asyncio.run(main())
Solution
import asyncio
import time
async def fast_producer(queue, n):
start = time.monotonic()
for i in range(n):
await queue.put(i)
elapsed = time.monotonic() - start
print(f"Producer done in {elapsed:.2f}s (slowed by backpressure)")
async def slow_consumer(queue, n):
processed = 0
while processed < n:
item = await queue.get()
await asyncio.sleep(0.01)
queue.task_done()
processed += 1
print("All items processed")
async def main():
queue = asyncio.Queue(maxsize=5)
await asyncio.gather(
fast_producer(queue, 20),
slow_consumer(queue, 20),
)
asyncio.run(main())
Backpressure mechanism:
- The bounded queue (
maxsize=5) limits how far ahead the producer can run. queue.put()suspends the producer when the queue is full, giving the consumer time to drain items.- Without backpressure (unbounded queue), a fast producer can fill all available memory before the consumer catches up.
- Queue size is a tuning parameter: larger buffers reduce producer waiting but increase memory usage and latency.
- In production: monitor
queue.qsize()as a metric. A consistently full queue signals the consumer needs scaling.
Expected Output
Producer slowed by backpressure\nAll items processedHints
Hint 1: A bounded Queue applies backpressure automatically — put() blocks when the queue is full.
Hint 2: Monitor queue size to detect when the consumer is struggling to keep up.
Write a retry_with_backoff decorator for async functions that adds exponential backoff with jitter.
import asyncio
import random
import functools
def retry_with_backoff(max_attempts=3, base=0.01, max_delay=1.0, retryable=(Exception,)):
def decorator(fn):
@functools.wraps(fn)
async def wrapper(*args, **kwargs):
for attempt in range(1, max_attempts + 1):
try:
return await fn(*args, **kwargs)
except retryable as e:
if attempt == max_attempts:
raise
delay = min(base * (2 ** (attempt - 1)) + random.uniform(0, base), max_delay)
print(f"Attempt {attempt} failed")
await asyncio.sleep(delay)
return wrapper
return decorator
attempt_count = [0]
@retry_with_backoff(max_attempts=3, base=0.001)
async def flaky_service():
attempt_count[0] += 1
if attempt_count[0] < 3:
raise ConnectionError("transient failure")
return "ok"
async def main():
result = await flaky_service()
print(f"Attempt {attempt_count[0]} succeeded")
print(f"Result: {result}")
asyncio.run(main())
Solution
import asyncio
import random
import functools
def retry_with_backoff(max_attempts=3, base=0.01, max_delay=1.0, retryable=(Exception,)):
def decorator(fn):
@functools.wraps(fn)
async def wrapper(*args, **kwargs):
for attempt in range(1, max_attempts + 1):
try:
return await fn(*args, **kwargs)
except retryable as e:
if attempt == max_attempts:
raise
delay = min(base * (2 ** (attempt - 1)) + random.uniform(0, base), max_delay)
print(f"Attempt {attempt} failed")
await asyncio.sleep(delay)
return wrapper
return decorator
attempt_count = [0]
@retry_with_backoff(max_attempts=3, base=0.001)
async def flaky_service():
attempt_count[0] += 1
if attempt_count[0] < 3:
raise ConnectionError("transient failure")
return "ok"
async def main():
result = await flaky_service()
print(f"Attempt {attempt_count[0]} succeeded")
print(f"Result: {result}")
asyncio.run(main())
Jitter and backoff design:
base * 2^attemptgives exponential growth: 0.01s, 0.02s, 0.04s, ...+ random.uniform(0, base)adds jitter to desynchronize retries from multiple clients hitting the same service simultaneously ("thundering herd" prevention).min(..., max_delay)caps the delay to avoid waiting indefinitely.retryable=(Exception,)is the default — in production, specifyretryable=(ConnectionError, TimeoutError)to avoid retrying programming errors.- The decorator pattern preserves the function signature via
@functools.wraps.
Expected Output
Attempt 1 failed\nAttempt 2 failed\nAttempt 3 succeeded\nResult: okHints
Hint 1: Add random jitter to prevent thundering herd: sleep = base * 2^attempt + random(0, jitter_max).
Hint 2: Only retry on specific transient errors, not on permanent failures like authentication errors.
Write pytest-asyncio tests for an async service, including fixtures and mocking async dependencies.
# test_service.py — run with: pytest test_service.py -v
import asyncio
import pytest
from unittest.mock import AsyncMock, patch
class UserService:
def __init__(self, db):
self._db = db
async def get_user(self, user_id):
row = await self._db.fetch_one(f"SELECT * FROM users WHERE id={user_id}")
if row is None:
raise ValueError(f"User {user_id} not found")
return row
# Fixture: mock DB
@pytest.fixture
async def mock_db():
db = AsyncMock()
db.fetch_one.return_value = {"id": 1, "name": "Alice"}
yield db
@pytest.mark.asyncio
async def test_get_user_success(mock_db):
service = UserService(mock_db)
user = await service.get_user(1)
assert user["name"] == "Alice"
mock_db.fetch_one.assert_called_once()
@pytest.mark.asyncio
async def test_get_user_not_found(mock_db):
mock_db.fetch_one.return_value = None
service = UserService(mock_db)
with pytest.raises(ValueError, match="User 99 not found"):
await service.get_user(99)
# Running inline (not via pytest) for demonstration:
async def run_tests():
db_mock = AsyncMock()
db_mock.fetch_one.return_value = {"id": 1, "name": "Alice"}
service = UserService(db_mock)
user = await service.get_user(1)
assert user["name"] == "Alice"
db_mock.fetch_one.return_value = None
try:
await service.get_user(99)
assert False, "Should have raised"
except ValueError as e:
assert "99" in str(e)
print("All tests pass")
asyncio.run(run_tests())
Solution
import asyncio
from unittest.mock import AsyncMock
class UserService:
def __init__(self, db):
self._db = db
async def get_user(self, user_id):
row = await self._db.fetch_one(f"SELECT * FROM users WHERE id={user_id}")
if row is None:
raise ValueError(f"User {user_id} not found")
return row
async def run_tests():
db_mock = AsyncMock()
db_mock.fetch_one.return_value = {"id": 1, "name": "Alice"}
service = UserService(db_mock)
user = await service.get_user(1)
assert user["name"] == "Alice"
db_mock.fetch_one.assert_called_once()
db_mock.fetch_one.return_value = None
try:
await service.get_user(99)
assert False
except ValueError as e:
assert "99" in str(e)
print("All tests pass")
asyncio.run(run_tests())
Async testing patterns:
AsyncMock()is the correct mock class for async functions — it returns a coroutine when called, not a plain value.mock.method.return_value = valueconfigures what the mock returns when awaited.mock.method.side_effect = [val1, val2, Exception()]sequences multiple return values or exceptions.- For pytest:
@pytest.mark.asynciomarks an async test. Theasynciofixture inpytest-asyncioprovides the event loop. pytest-asynciomode: setasyncio_mode = "auto"inpytest.inito avoid marking every test.- Async fixtures with
yieldhandle setup/teardown cleanly — theyieldpoint is the test execution.
Expected Output
All tests passHints
Hint 1: Mark async test functions with @pytest.mark.asyncio.
Hint 2: Async fixtures use yield for setup/teardown. Mock async functions with AsyncMock.
Hard
Build an async worker pool that processes a job queue and drains gracefully when signaled to stop.
import asyncio
class WorkerPool:
def __init__(self, num_workers):
self._queue = asyncio.Queue()
self._workers = []
self._num_workers = num_workers
async def start(self):
for i in range(self._num_workers):
self._workers.append(asyncio.create_task(self._worker(i)))
async def submit(self, job):
await self._queue.put(job)
async def drain_and_stop(self):
# Send sentinel None for each worker to stop
for _ in range(self._num_workers):
await self._queue.put(None)
await asyncio.gather(*self._workers)
async def _worker(self, worker_id):
while True:
job = await self._queue.get()
if job is None:
self._queue.task_done()
break
await job()
self._queue.task_done()
async def main():
pool = WorkerPool(num_workers=3)
await pool.start()
completed = []
async def make_job(job_id):
async def job():
await asyncio.sleep(0.005)
completed.append(job_id)
return job
for i in range(10):
await pool.submit(await make_job(i))
await pool.drain_and_stop()
print(f"All {len(completed)} jobs processed")
print("Pool drained gracefully")
asyncio.run(main())
Solution
import asyncio
class WorkerPool:
def __init__(self, num_workers):
self._queue = asyncio.Queue()
self._workers = []
self._num_workers = num_workers
async def start(self):
for i in range(self._num_workers):
self._workers.append(asyncio.create_task(self._worker(i)))
async def submit(self, job):
await self._queue.put(job)
async def drain_and_stop(self):
for _ in range(self._num_workers):
await self._queue.put(None)
await asyncio.gather(*self._workers)
async def _worker(self, worker_id):
while True:
job = await self._queue.get()
if job is None:
self._queue.task_done()
break
await job()
self._queue.task_done()
async def main():
pool = WorkerPool(num_workers=3)
await pool.start()
completed = []
async def make_job(job_id):
async def job():
await asyncio.sleep(0.005)
completed.append(job_id)
return job
for i in range(10):
await pool.submit(await make_job(i))
await pool.drain_and_stop()
print(f"All {len(completed)} jobs processed")
print("Pool drained gracefully")
asyncio.run(main())
Worker pool design:
- One
asyncio.Queueserves as the shared job buffer. - N workers run concurrently, each pulling from the queue in a loop.
- Sentinel
Nonevalues stop workers cleanly — one sentinel per worker ensures all workers stop. queue.task_done()+queue.join()provides an alternative drain pattern if jobs are submitted by external code.- Production extensions: job priority (use
PriorityQueue), job timeout (wrap each job inasyncio.wait_for), dead letter queue for failed jobs, dynamic worker scaling.
Expected Output
All 10 jobs processed\nPool drained gracefullyHints
Hint 1: Use a queue for jobs and N worker tasks consuming from it. Graceful drain: send N sentinel values to stop workers.
Hint 2: Wait for queue.join() to ensure all items are task_done before declaring completion.
Implement an in-memory distributed lock with TTL expiry for use across multiple concurrent "clients".
import asyncio
import time
class TTLLock:
def __init__(self, ttl_seconds):
self._ttl = ttl_seconds
self._lock = asyncio.Lock()
self._holder = None
self._acquired_at = None
def _is_expired(self):
if self._acquired_at is None:
return False
return time.monotonic() - self._acquired_at > self._ttl
async def acquire(self, client_id):
while True:
async with self._lock:
if self._holder is None or self._is_expired():
self._holder = client_id
self._acquired_at = time.monotonic()
return True
await asyncio.sleep(0.005)
async def release(self, client_id):
async with self._lock:
if self._holder == client_id:
self._holder = None
self._acquired_at = None
async def main():
lock = TTLLock(ttl_seconds=0.05)
async def client(client_id, sleep_before=0):
await asyncio.sleep(sleep_before)
await lock.acquire(client_id)
print(f"Lock acquired by {client_id}")
await asyncio.sleep(0.03) # hold for 30ms
await lock.release(client_id)
# client_1 holds lock; client_2 waits for TTL expiry (50ms)
await asyncio.gather(
client("client_1"),
client("client_2", sleep_before=0.01),
)
print("Both clients processed")
asyncio.run(main())
Solution
import asyncio
import time
class TTLLock:
def __init__(self, ttl_seconds):
self._ttl = ttl_seconds
self._lock = asyncio.Lock()
self._holder = None
self._acquired_at = None
def _is_expired(self):
if self._acquired_at is None:
return False
return time.monotonic() - self._acquired_at > self._ttl
async def acquire(self, client_id):
while True:
async with self._lock:
if self._holder is None or self._is_expired():
self._holder = client_id
self._acquired_at = time.monotonic()
return True
await asyncio.sleep(0.005)
async def release(self, client_id):
async with self._lock:
if self._holder == client_id:
self._holder = None
self._acquired_at = None
async def main():
lock = TTLLock(ttl_seconds=0.05)
async def client(client_id, sleep_before=0):
await asyncio.sleep(sleep_before)
await lock.acquire(client_id)
print(f"Lock acquired by {client_id}")
await asyncio.sleep(0.03)
await lock.release(client_id)
await asyncio.gather(
client("client_1"),
client("client_2", sleep_before=0.01),
)
print("Both clients processed")
asyncio.run(main())
TTL lock design:
- TTL prevents lock starvation: if the holder crashes without releasing, the lock auto-expires.
- The
asyncio.Lockprotects the lock state itself — preventing TOCTOU races inacquire(). - The polling
while True/asyncio.sleep(0.005)is a simple approach. Production: useasyncio.Condition.wait_for()to avoid busy-polling. - Real distributed locks (Redis
SETNX+ TTL, ZooKeeper ephemeral nodes) are similar but with network-based coordination. - Production concern: if a client holds the lock and the operation takes longer than TTL, another client may acquire it — the original client's release becomes a no-op.
Expected Output
Lock acquired by client_1\nLock expired — acquired by client_2\nBoth clients processedHints
Hint 1: Track lock acquisition time. If current time minus acquisition time exceeds TTL, the lock is considered expired.
Hint 2: Implement a background task that checks for and expires stale locks.
Implement an async event bus supporting pub-sub with multiple handlers per event and concurrent dispatch.
import asyncio
from collections import defaultdict
class AsyncEventBus:
def __init__(self):
self._handlers = defaultdict(list)
def subscribe(self, event_type, handler):
self._handlers[event_type].append(handler)
def unsubscribe(self, event_type, handler):
self._handlers[event_type].remove(handler)
async def publish(self, event_type, payload):
handlers = self._handlers.get(event_type, []) + self._handlers.get("*", [])
if handlers:
await asyncio.gather(*[h(event_type, payload) for h in handlers])
async def main():
bus = AsyncEventBus()
async def user_handler_1(event, payload):
print(f"user.created handler 1: {payload['name']}")
async def user_handler_2(event, payload):
print(f"user.created handler 2: {payload['name']}")
async def order_handler(event, payload):
print(f"order.placed handler: {payload['order_id']}")
bus.subscribe("user.created", user_handler_1)
bus.subscribe("user.created", user_handler_2)
bus.subscribe("order.placed", order_handler)
await bus.publish("user.created", {"name": "alice"})
await bus.publish("order.placed", {"order_id": "order_99"})
asyncio.run(main())
Solution
import asyncio
from collections import defaultdict
class AsyncEventBus:
def __init__(self):
self._handlers = defaultdict(list)
def subscribe(self, event_type, handler):
self._handlers[event_type].append(handler)
def unsubscribe(self, event_type, handler):
self._handlers[event_type].remove(handler)
async def publish(self, event_type, payload):
handlers = self._handlers.get(event_type, []) + self._handlers.get("*", [])
if handlers:
await asyncio.gather(*[h(event_type, payload) for h in handlers])
async def main():
bus = AsyncEventBus()
async def user_handler_1(event, payload):
print(f"user.created handler 1: {payload['name']}")
async def user_handler_2(event, payload):
print(f"user.created handler 2: {payload['name']}")
async def order_handler(event, payload):
print(f"order.placed handler: {payload['order_id']}")
bus.subscribe("user.created", user_handler_1)
bus.subscribe("user.created", user_handler_2)
bus.subscribe("order.placed", order_handler)
await bus.publish("user.created", {"name": "alice"})
await bus.publish("order.placed", {"order_id": "order_99"})
asyncio.run(main())
Async event bus design:
asyncio.gather(*handlers)dispatches all handlers concurrently — a slow handler doesn't block others.- Wildcard
"*"handlers receive every event — useful for logging and monitoring. return_exceptions=Truein gather makes the bus resilient: a handler that raises doesn't abort other handlers.- Production extensions: async handler queuing per subscriber (decouple publish speed from processing speed), dead letter queue for failed handlers, event persistence, and type-safe event schemas using dataclasses or Pydantic.
- This pattern is the foundation of CQRS (Command Query Responsibility Segregation) and event-driven microservices.
Expected Output
user.created handler 1: alice\nuser.created handler 2: alice\norder.placed handler: order_99Hints
Hint 1: Store handlers per event type in a dict. Publishing fires all handlers concurrently with asyncio.gather.
Hint 2: Support wildcard subscriptions with a special "*" key.
Build a minimal production-grade async service that combines request processing, health checks, metrics, and graceful shutdown.
import asyncio
import time
class Service:
def __init__(self):
self.shutdown = asyncio.Event()
self.metrics = {"requests": 0, "errors": 0, "uptime_start": time.monotonic()}
self._tasks = []
async def handle_request(self, request_id):
await asyncio.sleep(0.005)
self.metrics["requests"] += 1
async def health_check(self):
return {
"status": "healthy",
"requests": self.metrics["requests"],
"uptime": round(time.monotonic() - self.metrics["uptime_start"], 2),
}
async def run(self):
for i in range(5):
if self.shutdown.is_set():
break
await self.handle_request(i)
print(f"Processed {self.metrics['requests']} requests")
async def drain(self):
for t in self._tasks:
if not t.done():
await t
print("Graceful shutdown complete")
async def main():
svc = Service()
print("Service started")
await svc.run()
health = await svc.health_check()
print(f"Health: {health['status']}")
svc.shutdown.set()
await svc.drain()
asyncio.run(main())
Solution
import asyncio
import time
class Service:
def __init__(self):
self.shutdown = asyncio.Event()
self.metrics = {"requests": 0, "errors": 0, "uptime_start": time.monotonic()}
self._tasks = []
async def handle_request(self, request_id):
await asyncio.sleep(0.005)
self.metrics["requests"] += 1
async def health_check(self):
return {
"status": "healthy",
"requests": self.metrics["requests"],
"uptime": round(time.monotonic() - self.metrics["uptime_start"], 2),
}
async def run(self):
for i in range(5):
if self.shutdown.is_set():
break
await self.handle_request(i)
print(f"Processed {self.metrics['requests']} requests")
async def drain(self):
for t in self._tasks:
if not t.done():
await t
print("Graceful shutdown complete")
async def main():
svc = Service()
print("Service started")
await svc.run()
health = await svc.health_check()
print(f"Health: {health['status']}")
svc.shutdown.set()
await svc.drain()
asyncio.run(main())
Production service architecture:
asyncio.Eventfor shutdown coordination — the service checksshutdown.is_set()before accepting new work.- Metrics dict tracks key operational counters — in production, export these to Prometheus or StatsD.
- Health check returns both status and operational metrics — useful for debugging and alerting.
- Drain pattern: await all in-flight tasks before declaring shutdown complete.
- Extension to a real service: add signal handlers (SIGTERM →
shutdown.set()), a timeout on drain, connection pool lifecycle management, and async startup hooks for resource initialization.
Expected Output
Service started\nProcessed 5 requests\nHealth: healthy\nGraceful shutdown completeHints
Hint 1: Combine all production patterns: worker pool, health checks, metrics counters, and graceful shutdown.
Hint 2: Use asyncio.gather to run the server and a background metrics reporter concurrently.
