
Production Async Architecture

Before reading any explanation, study this production startup sequence and predict what happens when Ctrl+C is pressed during the "Processing" phase:

import asyncio
import signal

class Service:
    def __init__(self):
        self._shutdown = asyncio.Event()
        self._tasks: set[asyncio.Task] = set()

    async def run(self):
        loop = asyncio.get_running_loop()
        loop.add_signal_handler(signal.SIGINT, self._shutdown.set)
        loop.add_signal_handler(signal.SIGTERM, self._shutdown.set)

        print("Starting workers...")
        async with asyncio.TaskGroup() as tg:
            for i in range(3):
                task = tg.create_task(self._worker(i))
                self._tasks.add(task)
            tg.create_task(self._shutdown_monitor())

    async def _worker(self, wid):
        while not self._shutdown.is_set():
            print(f"Worker {wid}: Processing")
            await asyncio.sleep(1)
        print(f"Worker {wid}: Draining...")
        await asyncio.sleep(0.5)
        print(f"Worker {wid}: Done")

    async def _shutdown_monitor(self):
        await self._shutdown.wait()
        print("Shutdown signal received")
        # Cancel the TaskGroup by raising
        raise SystemExit(0)

asyncio.run(Service().run())

What happens when you press Ctrl+C?

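The signal handler calls self._shutdown.set(). The shutdown monitor wakes, prints "Shutdown signal received", and raises SystemExit. asyncio treats SystemExit and KeyboardInterrupt specially. The TaskGroup documentation states that when a child task fails with one of them, the group still cancels the remaining tasks, but then re-raises the original SystemExit itself, not a BaseExceptionGroup. In practice the exception also propagates straight out of the event loop, and asyncio.run cancels whatever tasks are still pending before letting it through.

The catch is what happens to the workers. When the monitor raises, each worker is almost certainly suspended inside await asyncio.sleep(1). Cancellation is delivered there as CancelledError, so the worker never gets back to the self._shutdown.is_set() check: "Draining..." and "Done" are normally never printed before the process exits.

So the code looks graceful but is not. Raising an exception to stop a TaskGroup turns a cooperative shutdown into an abrupt cancellation, and the drain logic the workers were written to run is skipped. This lesson covers how to build these patterns correctly.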

What You Will Learn

  • Error handling strategies for async production services
  • Graceful shutdown with signal handlers and cleanup sequences
  • Health check endpoints and liveness monitoring
  • Backpressure with bounded queues and flow control
  • Testing async code with pytest-asyncio
  • Structured logging in concurrent async code
  • Real-world FastAPI/Starlette production patterns

Prerequisites

  • All previous lessons in this module (async generators, context managers, TaskGroup, awaitables, event loop, synchronization)
  • Familiarity with FastAPI or Starlette at a basic level
  • Experience writing tests with pytest

Part 1 -- Error Handling Strategies

Per-Task Error Isolation

In production, you rarely want one failed task to bring down the entire service. Isolate errors at the task level:

import asyncio
import logging
import traceback

logger = logging.getLogger(__name__)

# process_item() and dead_letter_queue are application-specific and
# assumed to be defined elsewhere.

async def resilient_worker(name: str, work_queue: asyncio.Queue):
    """Worker that handles errors without crashing."""
    while True:
        item = await work_queue.get()
        try:
            await process_item(item)
        except asyncio.CancelledError:
            # Put the item back for another worker on shutdown. put_nowait()
            # never blocks, so a full queue cannot stall shutdown here
            # (it raises QueueFull instead).
            work_queue.put_nowait(item)
            raise
        except Exception:
            logger.exception(f"Worker {name} failed to process {item}")
            # Optionally: send to a dead letter queue for later inspection
            await dead_letter_queue.put((item, traceback.format_exc()))
        finally:
            work_queue.task_done()
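To see the isolation in action, the sketch below runs one worker of the same shape against a queue in which one item fails. The process_item() rule (negative numbers fail) and the use of plain lists for results and dead letters are assumptions made for the demo.

```python
import asyncio
import traceback


async def process_item(item: int) -> int:
    # Hypothetical work: negative items are "bad input"
    await asyncio.sleep(0)
    if item < 0:
        raise ValueError(f"cannot process {item}")
    return item * 2


async def resilient_worker(queue: asyncio.Queue, results: list, dead_letters: list):
    while True:
        item = await queue.get()
        try:
            results.append(await process_item(item))
        except Exception:
            # Record the failure and keep going instead of crashing the worker
            dead_letters.append((item, traceback.format_exc()))
        finally:
            queue.task_done()


async def main():
    queue: asyncio.Queue = asyncio.Queue()
    results: list[int] = []
    dead_letters: list[tuple[int, str]] = []
    for item in (1, -2, 3):
        queue.put_nowait(item)

    worker = asyncio.create_task(resilient_worker(queue, results, dead_letters))
    await queue.join()  # all three items handled, good or bad
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return results, dead_letters


results, dead_letters = asyncio.run(main())
```

The bad item ends up in dead_letters with its traceback, and the worker goes on to process the item after it.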

Exception Group Handling in Production

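With TaskGroup, an unhandled exception in any task cancels its siblings and surfaces as an ExceptionGroup, so you need a deliberate strategy. One option is to catch errors inside each task and collect partial results: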

import asyncio
import logging

logger = logging.getLogger(__name__)

async def fetch_all_data(endpoints: list[str]) -> dict:
    """Fetch data from multiple endpoints, collecting partial results."""
    results = {}
    errors = []

    async def safe_fetch(endpoint):
        # fetch_endpoint() is assumed to be defined elsewhere
        try:
            data = await fetch_endpoint(endpoint)
            results[endpoint] = data
        except Exception as e:
            # Catching inside the task keeps the TaskGroup from cancelling siblings
            errors.append((endpoint, e))

    async with asyncio.TaskGroup() as tg:
        for endpoint in endpoints:
            tg.create_task(safe_fetch(endpoint))

    if errors:
        failed = [ep for ep, _ in errors]
        logger.warning(f"Partial failure: {len(errors)} endpoints failed: {failed}")

    return results

The Error Boundary Pattern

Create explicit boundaries where errors are caught and logged, then convert them into a result object at the outermost layer:

import asyncio
import logging
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Generic, TypeVar

logger = logging.getLogger(__name__)

T = TypeVar("T")

@dataclass
class ServiceResult(Generic[T]):
    data: T | None = None
    error: str | None = None
    error_type: str | None = None

    @property
    def ok(self) -> bool:
        return self.error is None


@asynccontextmanager
async def error_boundary(operation_name: str):
    """Log exceptions at a service boundary, then re-raise them for conversion."""
    try:
        yield
    except asyncio.CancelledError:
        raise  # Never swallow cancellation
    except asyncio.TimeoutError:
        logger.warning(f"{operation_name}: timed out")
        raise
    except ConnectionError as e:
        logger.error(f"{operation_name}: connection failed: {e}")
        raise
    except Exception:
        logger.exception(f"{operation_name}: unexpected error")
        raise


# fetch_user() and fetch_permissions() are assumed to be defined elsewhere
async def handle_user_request(user_id: int) -> ServiceResult[dict]:
    try:
        async with error_boundary("fetch_user"):
            user = await fetch_user(user_id)
        async with error_boundary("fetch_permissions"):
            perms = await fetch_permissions(user_id)
        return ServiceResult(data={"user": user, "permissions": perms})
    except asyncio.TimeoutError:
        return ServiceResult(error="Service temporarily unavailable",
                             error_type="timeout")
    except ConnectionError:
        return ServiceResult(error="Could not reach backend service",
                             error_type="connection")
    except Exception:
        return ServiceResult(error="Internal error",
                             error_type="internal")

Part 2 -- Graceful Shutdown

A production async service must shut down cleanly: stop accepting new work, finish in-progress tasks, close connections, and flush buffers.

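The Shutdown Sequence

  1. Receive SIGTERM or SIGINT and set a shutdown event.
  2. Stop accepting new work.
  3. Let in-progress tasks drain, bounded by a timeout.
  4. Force-cancel any tasks still running after the timeout.
  5. Close resources: connection pools, clients, log buffers.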

Implementation

import asyncio
import signal
import logging

logger = logging.getLogger(__name__)


class GracefulService:
    """Production service with proper shutdown handling."""

    def __init__(self, shutdown_timeout: float = 30.0):
        self.shutdown_timeout = shutdown_timeout
        self._shutdown_event = asyncio.Event()
        self._worker_tasks: set[asyncio.Task] = set()
        self._accepting_work = True

    async def start(self):
        """Main entry point."""
        loop = asyncio.get_running_loop()

        # Register signal handlers (Unix only; main thread only)
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._request_shutdown, sig)

        try:
            # Initialize resources
            await self._startup()

            # Run workers
            await self._run_workers()
        except asyncio.CancelledError:
            logger.info("Service cancelled")
            raise  # Log it, but never swallow cancellation
        finally:
            await self._cleanup()

    def _request_shutdown(self, sig):
        sig_name = signal.Signals(sig).name
        logger.info(f"Received {sig_name}, initiating graceful shutdown")
        self._accepting_work = False
        self._shutdown_event.set()

    async def _startup(self):
        logger.info("Initializing resources...")
        # Initialize database pool, cache, etc.
        await asyncio.sleep(0.1)
        logger.info("Resources initialized")

    async def _run_workers(self):
        # Start workers
        for i in range(4):
            task = asyncio.create_task(
                self._worker(f"worker-{i}"),
                name=f"worker-{i}"
            )
            self._worker_tasks.add(task)
            task.add_done_callback(self._worker_tasks.discard)

        # Wait for shutdown signal
        await self._shutdown_event.wait()

        # Stop accepting new work and drain
        logger.info(f"Draining workers (timeout: {self.shutdown_timeout}s)")
        await self._drain_workers()

    async def _drain_workers(self):
        """Wait for workers to finish, then force-cancel if needed."""
        if not self._worker_tasks:
            return

        # Wait for workers to finish naturally
        done, pending = await asyncio.wait(
            self._worker_tasks,
            timeout=self.shutdown_timeout
        )

        if pending:
            logger.warning(
                f"{len(pending)} workers did not finish in time, cancelling"
            )
            for task in pending:
                task.cancel()

            # Wait for cancellation to complete
            _, still_pending = await asyncio.wait(
                pending, timeout=5.0
            )

            if still_pending:
                logger.error(
                    f"{len(still_pending)} workers did not respond to cancellation"
                )

        # Check for unexpected errors. task.exception() raises CancelledError
        # on a cancelled task, so test cancelled() first.
        for task in done:
            if not task.cancelled() and task.exception() is not None:
                logger.error(
                    f"Worker {task.get_name()} failed: {task.exception()}"
                )

    async def _cleanup(self):
        logger.info("Closing resources...")
        # Close database pools, flush logs, etc.
        await asyncio.sleep(0.1)
        logger.info("Shutdown complete")

    async def _worker(self, name: str):
        logger.info(f"{name}: started")
        try:
            while not self._shutdown_event.is_set():
                if self._accepting_work:
                    await self._process_next(name)
                else:
                    # Not accepting new work, but finish current
                    break
        except asyncio.CancelledError:
            logger.info(f"{name}: cancelled, running cleanup")
            raise
        finally:
            logger.info(f"{name}: stopped")

    async def _process_next(self, name):
        try:
            # Simulated idle wait that wakes early if shutdown is requested
            await asyncio.wait_for(
                self._shutdown_event.wait(),
                timeout=2.0
            )
        except asyncio.TimeoutError:
            # Normal: process one unit of work
            logger.debug(f"{name}: processing item")
            await asyncio.sleep(0.1)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
    asyncio.run(GracefulService(shutdown_timeout=10.0).start())
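The drain-then-cancel step in _drain_workers generalizes into a small reusable helper. A sketch, with drain() as a hypothetical name and a 1-second cancellation grace period chosen arbitrarily:

```python
import asyncio


async def drain(tasks: set[asyncio.Task], timeout: float, cancel_grace: float = 1.0):
    """Wait up to `timeout` for tasks to finish, then cancel the stragglers.

    Returns (finished, cancelled) as sets of tasks.
    """
    if not tasks:
        return set(), set()
    finished, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    if pending:
        # Give cancelled tasks a bounded window to run their cleanup code
        await asyncio.wait(pending, timeout=cancel_grace)
    return finished, pending


async def main():
    fast = asyncio.create_task(asyncio.sleep(0.01, result="fast"))
    slow = asyncio.create_task(asyncio.sleep(10, result="slow"))
    finished, cancelled = await drain({fast, slow}, timeout=0.1)
    return fast, slow, finished, cancelled


fast, slow, finished, cancelled = asyncio.run(main())
```

The second, shorter wait matters: cancel() only requests cancellation, and a task's except/finally blocks still need loop time to run.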

Part 3 -- Health Checks

Production services need health endpoints that report whether the service can handle traffic.

import asyncio
import time
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from enum import Enum


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


@dataclass
class ComponentHealth:
    name: str
    status: HealthStatus
    latency_ms: float = 0.0
    message: str = ""
    last_check: float = 0.0


class HealthChecker:
    """Async health checker for service dependencies."""

    def __init__(self, check_interval: float = 10.0):
        self.check_interval = check_interval
        self._checks: dict[str, Callable[[], Awaitable[object]]] = {}
        self._results: dict[str, ComponentHealth] = {}
        self._running = False

    def register(self, name: str, check_func: Callable[[], Awaitable[object]]):
        """Register a health check function.

        check_func should be an async function that either:
        - Returns normally (healthy)
        - Raises an exception (unhealthy)
        """
        self._checks[name] = check_func

    async def start(self):
        """Start periodic health checking."""
        self._running = True
        while self._running:
            await self._run_all_checks()
            await asyncio.sleep(self.check_interval)

    def stop(self):
        # Takes effect after the current sleep; cancel the task to stop immediately
        self._running = False

    async def _run_all_checks(self):
        # _run_check catches every Exception, so one failing check
        # cannot make the TaskGroup cancel the others
        async with asyncio.TaskGroup() as tg:
            for name, check in self._checks.items():
                tg.create_task(self._run_check(name, check))

    async def _run_check(self, name: str, check_func):
        start = time.perf_counter()
        try:
            await asyncio.wait_for(check_func(), timeout=5.0)
            latency = (time.perf_counter() - start) * 1000
            self._results[name] = ComponentHealth(
                name=name,
                status=HealthStatus.HEALTHY,
                latency_ms=latency,
                last_check=time.time()
            )
        except asyncio.TimeoutError:
            self._results[name] = ComponentHealth(
                name=name,
                status=HealthStatus.UNHEALTHY,
                message="Health check timed out",
                last_check=time.time()
            )
        except Exception as e:
            self._results[name] = ComponentHealth(
                name=name,
                status=HealthStatus.UNHEALTHY,
                message=str(e),
                last_check=time.time()
            )

    @property
    def overall_status(self) -> HealthStatus:
        if not self._results:
            return HealthStatus.UNHEALTHY
        statuses = [r.status for r in self._results.values()]
        if all(s == HealthStatus.HEALTHY for s in statuses):
            return HealthStatus.HEALTHY
        if any(s == HealthStatus.UNHEALTHY for s in statuses):
            return HealthStatus.UNHEALTHY
        return HealthStatus.DEGRADED

    def to_dict(self) -> dict:
        return {
            "status": self.overall_status.value,
            "components": {
                name: {
                    "status": r.status.value,
                    "latency_ms": round(r.latency_ms, 2),
                    "message": r.message,
                }
                for name, r in self._results.items()
            }
        }
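As written, _run_check records only HEALTHY or UNHEALTHY, so overall_status never actually returns DEGRADED. One possible extension (an assumption, not part of the class above) is to treat a check that succeeds but exceeds a latency budget as degraded. The classify() and overall() helpers and the 500 ms budget are hypothetical:

```python
from enum import Enum


class HealthStatus(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


def classify(succeeded: bool, latency_ms: float, slow_ms: float = 500.0) -> HealthStatus:
    """Map one check outcome to a status; slow_ms is a hypothetical latency budget."""
    if not succeeded:
        return HealthStatus.UNHEALTHY
    if latency_ms > slow_ms:
        return HealthStatus.DEGRADED
    return HealthStatus.HEALTHY


def overall(statuses: list[HealthStatus]) -> HealthStatus:
    # Same aggregation rule as HealthChecker.overall_status
    if not statuses or HealthStatus.UNHEALTHY in statuses:
        return HealthStatus.UNHEALTHY
    if all(s is HealthStatus.HEALTHY for s in statuses):
        return HealthStatus.HEALTHY
    return HealthStatus.DEGRADED


print(overall([classify(True, 12.0), classify(True, 900.0)]))
```

A degraded dependency still lets /health return 200 under the endpoint logic below, which is usually what you want: slow is not the same as down.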

Usage with FastAPI:

import asyncio

from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()
health = HealthChecker(check_interval=15.0)

# Register checks (check_* are application-specific async functions)
health.register("database", check_database_connection)
health.register("redis", check_redis_connection)
health.register("external_api", check_external_api)

_background_tasks: set[asyncio.Task] = set()

# on_event still works but is deprecated in recent FastAPI releases in
# favor of the lifespan handler shown in Part 7
@app.on_event("startup")
async def startup():
    # Keep a reference: the event loop holds only weak references to tasks
    task = asyncio.create_task(health.start())
    _background_tasks.add(task)

@app.get("/health")
async def health_endpoint():
    result = health.to_dict()
    status_code = 200 if result["status"] != "unhealthy" else 503
    return JSONResponse(result, status_code=status_code)

@app.get("/health/live")
async def liveness():
    """Kubernetes liveness probe -- is the process alive?"""
    return {"status": "alive"}

@app.get("/health/ready")
async def readiness():
    """Kubernetes readiness probe -- can we serve traffic?"""
    if health.overall_status == HealthStatus.UNHEALTHY:
        return JSONResponse({"status": "not ready"}, status_code=503)
    return {"status": "ready"}

Part 4 -- Backpressure

Backpressure prevents fast producers from overwhelming slow consumers. Without it, an unbounded queue keeps growing, memory usage climbs, and the process can eventually run out of memory and crash.

Bounded Queues

import asyncio
import logging

logger = logging.getLogger(__name__)

async def producer(queue: asyncio.Queue, name: str):
    """Produces items. Blocks when queue is full (backpressure)."""
    for i in range(100):
        item = f"{name}-{i}"
        await queue.put(item)  # Blocks if queue is full
        logger.debug(f"Produced: {item}, queue size: {queue.qsize()}")

async def consumer(queue: asyncio.Queue, name: str):
    """Consumes items at its own pace."""
    while True:
        item = await queue.get()
        try:
            if item is None:  # Poison pill
                break
            # Simulate slow processing
            await asyncio.sleep(0.1)
            logger.debug(f"{name} processed: {item}")
        finally:
            queue.task_done()


async def main():
    # Bounded queue: max 10 items
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)

    async with asyncio.TaskGroup() as tg:
        # Slow consumers
        consumers = [
            tg.create_task(consumer(queue, f"consumer-{i}"))
            for i in range(3)
        ]
        # Fast producer. Awaiting it here matters: join() on a still-empty
        # queue returns immediately, which would send the poison pills
        # before any real work and leave the producer blocked forever.
        await producer(queue, "producer")

        # Wait for all items to be processed
        await queue.join()

        # Send poison pills to stop consumers
        for _ in consumers:
            await queue.put(None)

asyncio.run(main())

Flow Control with Watermarks

For streaming pipelines, use high and low watermarks to pause and resume producers:

import asyncio
import logging

logger = logging.getLogger(__name__)

class BackpressureChannel:
    """Channel with watermark-based flow control."""

    def __init__(self, high_water: int = 100, low_water: int = 25):
        self.high_water = high_water
        self.low_water = low_water
        self._queue: asyncio.Queue = asyncio.Queue()
        # Set = producers may send; cleared = backpressure active
        self._can_send = asyncio.Event()
        self._can_send.set()  # Start unpaused

    async def send(self, item):
        """Send an item, blocking if backpressure is active."""
        await self._can_send.wait()  # Block while paused
        # Producers already past wait() can still enqueue, so
        # high_water is a soft limit
        await self._queue.put(item)

        if self._queue.qsize() >= self.high_water:
            self._can_send.clear()  # Pause producers
            logger.warning(f"Backpressure activated: queue size {self._queue.qsize()}")

    async def receive(self):
        """Receive an item."""
        item = await self._queue.get()

        # Resume only after draining to the low watermark; the gap between
        # the two thresholds avoids rapid pause/resume flapping
        if not self._can_send.is_set() and self._queue.qsize() <= self.low_water:
            self._can_send.set()  # Resume producers
            logger.info(f"Backpressure released: queue size {self._queue.qsize()}")

        return item

    @property
    def stats(self):
        return {
            "queue_size": self._queue.qsize(),
            "paused": not self._can_send.is_set(),
            "high_water": self.high_water,
            "low_water": self.low_water,
        }

Dropping Strategies

When backpressure is not acceptable (real-time systems), drop items instead:

class DroppingChannel:
    """Channel that drops oldest items when full."""

    def __init__(self, maxsize: int = 100):
        self._queue = asyncio.Queue(maxsize=maxsize)

    async def send(self, item):
        if self._queue.full():
            dropped = self._queue.get_nowait()
            self._queue.task_done()  # Keep join() accounting correct for the dropped item
            logger.warning(f"Dropped oldest item: {dropped}")
        # No await between freeing a slot and refilling it, so this never blocks
        self._queue.put_nowait(item)

    async def receive(self):
        return await self._queue.get()
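Besides blocking and dropping, a third option is to reject: try to enqueue without waiting and report failure when the buffer is full, so the caller can decide what to do (for example, answer HTTP 503 or retry later). A minimal sketch; the RejectingChannel name and its boolean return convention are assumptions:

```python
import asyncio


class RejectingChannel:
    """Channel that refuses new items when full instead of waiting or dropping."""

    def __init__(self, maxsize: int = 100) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.rejected = 0

    def try_send(self, item) -> bool:
        """Return True if the item was accepted, False if the channel is full."""
        try:
            self._queue.put_nowait(item)
        except asyncio.QueueFull:
            self.rejected += 1
            return False
        return True

    async def receive(self):
        return await self._queue.get()


async def main():
    channel = RejectingChannel(maxsize=2)
    accepted = [channel.try_send(i) for i in range(4)]
    first = await channel.receive()
    return accepted, channel.rejected, first


accepted, rejected, first = asyncio.run(main())
```

Unlike dropping, rejecting keeps the oldest items and makes the overload visible to the sender.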

Part 5 -- Async Testing with pytest-asyncio

Setup

pip install pytest pytest-asyncio
# conftest.py
import asyncio
import pytest

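# Configure pytest-asyncio mode
# Options: "strict" (the default: async tests need @pytest.mark.asyncio and
# async fixtures need @pytest_asyncio.fixture) or "auto" (every async test
# and async fixture is handled automatically)
# In pyproject.toml: [tool.pytest.ini_options] asyncio_mode = "auto"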

Basic Async Tests

# test_service.py
import asyncio
import pytest


@pytest.mark.asyncio
async def test_basic_async():
    """Test an async function."""
    result = await asyncio.sleep(0.01, result=42)
    assert result == 42


@pytest.mark.asyncio
async def test_async_function():
    async def add(a, b):
        await asyncio.sleep(0)
        return a + b

    result = await add(2, 3)
    assert result == 5

Testing with Fixtures

import pytest
import pytest_asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def create_test_pool():
    """Create a test database pool."""
    pool = {"connected": True, "queries": []}
    yield pool
    pool["connected"] = False


# In strict mode, async fixtures must use @pytest_asyncio.fixture;
# a plain @pytest.fixture is not set up by pytest-asyncio in that mode
@pytest_asyncio.fixture
async def db_pool():
    """Async fixture for database pool."""
    async with create_test_pool() as pool:
        yield pool


@pytest.mark.asyncio
async def test_database_query(db_pool):
    assert db_pool["connected"] is True
    db_pool["queries"].append("SELECT 1")
    assert len(db_pool["queries"]) == 1


@pytest_asyncio.fixture
async def service(db_pool):
    """Compose fixtures (Service is an application class, not defined here)."""
    svc = Service(db_pool)
    await svc.start()
    yield svc
    await svc.stop()

Testing Timeouts and Cancellation

@pytest.mark.asyncio
async def test_timeout_handling():
    """Verify function handles timeout correctly."""
    async def slow_operation():
        await asyncio.sleep(10)
        return "never reached"

    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(slow_operation(), timeout=0.1)


@pytest.mark.asyncio
async def test_cancellation_cleanup():
    """Verify cleanup runs on cancellation."""
    cleaned_up = False

    async def task_with_cleanup():
        nonlocal cleaned_up
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            cleaned_up = True
            raise

    task = asyncio.create_task(task_with_cleanup())
    await asyncio.sleep(0.01)
    task.cancel()

    with pytest.raises(asyncio.CancelledError):
        await task

    assert cleaned_up is True

Testing TaskGroups and ExceptionGroups

@pytest.mark.asyncio
async def test_taskgroup_error_handling():
    """Verify TaskGroup collects all errors."""
    async def fail_with(error):
        raise error

    with pytest.raises(ExceptionGroup) as exc_info:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fail_with(ValueError("val")))
            tg.create_task(fail_with(TypeError("type")))

    eg = exc_info.value
    assert len(eg.exceptions) == 2
    types = {type(e) for e in eg.exceptions}
    assert types == {ValueError, TypeError}


@pytest.mark.asyncio
async def test_except_star():
    """Test except* handling."""
    caught_values = []
    caught_types = []

    try:
        async with asyncio.TaskGroup() as tg:
            # Cancelled before it ever runs, because the body raises immediately
            tg.create_task(asyncio.sleep(0))
            # An exception raised in the TaskGroup body is also wrapped
            # in an ExceptionGroup
            raise ValueError("direct error")
    except* ValueError as eg:
        caught_values.extend(eg.exceptions)
    except* TypeError as eg:
        caught_types.extend(eg.exceptions)

    assert len(caught_values) == 1
    assert len(caught_types) == 0

Testing Rate Limiters

import time

# TokenBucket is assumed to be defined elsewhere (rate = tokens per second,
# burst = bucket capacity). Wall-clock assertions use generous tolerances
# but can still be flaky on heavily loaded CI machines.
@pytest.mark.asyncio
async def test_rate_limiter():
    limiter = TokenBucket(rate=10, burst=5)

    # Burst: first 5 should be instant
    start = time.perf_counter()
    for _ in range(5):
        await limiter.acquire()
    burst_time = time.perf_counter() - start
    assert burst_time < 0.1  # Burst should be near-instant

    # Next request should wait ~0.1s (1/10 rate)
    start = time.perf_counter()
    await limiter.acquire()
    wait_time = time.perf_counter() - start
    assert 0.05 < wait_time < 0.2  # ~0.1s with tolerance
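TokenBucket is not defined in this lesson. If you need something to run the test against, here is one minimal implementation that matches its expectations (a burst of `burst` tokens, then one token every 1/rate seconds). Treat it as a sketch under those assumptions, not a reference implementation:

```python
import asyncio
import time


class TokenBucket:
    """Sketch of a token bucket: refills `rate` tokens/second, holds at most `burst`."""

    def __init__(self, rate: float, burst: int) -> None:
        self.rate = rate
        self.burst = burst
        self._tokens = float(burst)  # start full so the first `burst` calls are instant
        self._last = time.monotonic()
        self._lock = asyncio.Lock()  # waiters take tokens one at a time, in FIFO order

    def _refill(self) -> None:
        now = time.monotonic()
        self._tokens = min(self.burst, self._tokens + (now - self._last) * self.rate)
        self._last = now

    async def acquire(self) -> None:
        async with self._lock:
            self._refill()
            if self._tokens < 1:
                # Sleep just long enough for the missing fraction of a token
                await asyncio.sleep((1 - self._tokens) / self.rate)
                self._refill()
            self._tokens -= 1


async def measure() -> tuple[float, float]:
    limiter = TokenBucket(rate=10, burst=5)
    start = time.perf_counter()
    for _ in range(5):
        await limiter.acquire()
    burst_time = time.perf_counter() - start
    start = time.perf_counter()
    await limiter.acquire()
    wait_time = time.perf_counter() - start
    return burst_time, wait_time


burst_time, wait_time = asyncio.run(measure())
```

Holding the lock while sleeping is deliberate here: it queues later callers behind the one currently waiting for a token, instead of letting them all wake at once and compete.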

Mocking Async Functions

from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_with_mock():
    """Mock an async dependency."""
    mock_fetch = AsyncMock(return_value={"id": 1, "name": "Test"})

    result = await mock_fetch("https://api.example.com/user/1")

    mock_fetch.assert_called_once_with("https://api.example.com/user/1")
    assert result["name"] == "Test"


@pytest.mark.asyncio
async def test_mock_side_effect():
    """Mock with different responses per call."""
    mock_fetch = AsyncMock(side_effect=[
        {"id": 1},
        ConnectionError("network down"),
        {"id": 2},
    ])

    result1 = await mock_fetch("url1")
    assert result1 == {"id": 1}

    with pytest.raises(ConnectionError):
        await mock_fetch("url2")

    result3 = await mock_fetch("url3")
    assert result3 == {"id": 2}

Part 6 -- Structured Logging in Async Code

The standard logging module works in async code, but plain log lines do not tell you which request or task produced them. Structured logging with context propagation solves this.

Context Variables for Task Tracking

import asyncio
import contextvars
import logging
import json

# Context variable for request tracking
request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    'request_id', default='no-request'
)
task_name_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    'task_name', default='main'
)


class AsyncContextFilter(logging.Filter):
    """Inject async context into log records."""

    def filter(self, record):
        record.request_id = request_id_var.get('no-request')
        record.task_name = task_name_var.get('main')

        # Add current task name if available. current_task() raises
        # RuntimeError when no event loop is running (e.g. startup logs).
        try:
            task = asyncio.current_task()
        except RuntimeError:
            task = None
        record.async_task = task.get_name() if task else 'no-task'

        return True


class JSONFormatter(logging.Formatter):
    """Format logs as JSON for structured logging systems."""

    def format(self, record):
        log_data = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "request_id": getattr(record, 'request_id', None),
            "task": getattr(record, 'async_task', None),
        }
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_data)


def setup_logging():
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    handler.addFilter(AsyncContextFilter())

    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.INFO)
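Context variables work here because each asyncio Task runs in its own copy of the context captured when the task was created, so a value set inside one request's task is invisible to the others. A self-contained sketch; RequestIdFilter mirrors AsyncContextFilter, and the in-memory ListHandler exists only to make the output checkable:

```python
import asyncio
import contextvars
import logging

request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    "request_id", default="no-request"
)


class RequestIdFilter(logging.Filter):
    """Copy the current request ID onto each record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True


class ListHandler(logging.Handler):
    """Collect (request_id, message) pairs in memory for inspection."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[tuple[str, str]] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append((record.request_id, record.getMessage()))


logger = logging.getLogger("context_demo")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep demo output out of the root logger
handler = ListHandler()
handler.addFilter(RequestIdFilter())
logger.addHandler(handler)


async def handle(req_id: str, delay: float) -> None:
    request_id_var.set(req_id)  # affects only this task's context copy
    logger.info("start")
    await asyncio.sleep(delay)
    logger.info("end")


async def main() -> None:
    # gather() wraps each coroutine in its own Task, each with its own context
    await asyncio.gather(handle("req-a", 0.02), handle("req-b", 0.01))


asyncio.run(main())
```

Even though the two requests interleave, every record carries the ID of the request that logged it, and the caller's context is left untouched.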

Using Context in Request Handlers

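import uuid

async def handle_request(request):
    """FastAPI/Starlette-style request handler with context."""
    # Set request context (visible to every log call made in this task)
    req_id = str(uuid.uuid4())[:8]
    request_id_var.set(req_id)

    # Starlette exposes the path as request.url.path. JSONFormatter above
    # ignores `extra` fields unless you extend it to emit them.
    logger.info("Request started", extra={"path": request.url.path})

    try:
        # process_request() is an application-specific coroutine
        result = await process_request(request)
        logger.info("Request completed")
        return result
    except Exception:
        logger.exception("Request failed")
        raise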

Async-Safe Log Flushing

class AsyncLogBuffer:
    """Buffer log entries and flush asynchronously."""

    def __init__(self, flush_interval: float = 5.0, max_buffer: int = 100):
        self._buffer: list[dict] = []
        self._lock = asyncio.Lock()
        self._flush_interval = flush_interval
        self._max_buffer = max_buffer

    async def log(self, entry: dict):
        async with self._lock:
            self._buffer.append(entry)
            full = len(self._buffer) >= self._max_buffer
        if full:
            await self._flush()

    async def start_periodic_flush(self):
        while True:
            await asyncio.sleep(self._flush_interval)
            await self._flush()

    async def _flush(self):
        # Swap the buffer out under the lock, then send without holding it,
        # so a slow logging backend does not block every log() call
        async with self._lock:
            entries, self._buffer = self._buffer, []
        if entries:
            # send_to_logging_service() is assumed (Elasticsearch, CloudWatch, etc.)
            await send_to_logging_service(entries)

Part 7 -- Production FastAPI/Starlette Patterns

Application Lifespan

import asyncio
import contextlib
import logging
from contextlib import asynccontextmanager

import aiohttp
import asyncpg
from fastapi import FastAPI

logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifecycle."""
    # Startup
    app.state.db_pool = await asyncpg.create_pool(
        "postgresql://localhost/mydb",
        min_size=5,
        max_size=20,
    )
    app.state.http_client = aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=30)
    )
    app.state.health = HealthChecker()
    # check_db() is an application-specific async check
    app.state.health.register("database", lambda: check_db(app.state.db_pool))

    health_task = asyncio.create_task(app.state.health.start())

    logger.info("Application started")

    yield  # Application serves requests here

    # Shutdown
    logger.info("Application shutting down")
    app.state.health.stop()
    health_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await health_task  # Let cancellation finish before closing resources

    await app.state.http_client.close()
    await app.state.db_pool.close()

    logger.info("Application shutdown complete")


app = FastAPI(lifespan=lifespan)

Middleware for Request Tracking

import time
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class RequestTrackingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8])
        request_id_var.set(request_id)

        start = time.perf_counter()
        logger.info(f"Request start: {request.method} {request.url.path}")

        try:
            response = await call_next(request)
            elapsed = (time.perf_counter() - start) * 1000
            logger.info(
                f"Request end: {request.method} {request.url.path} "
                f"status={response.status_code} duration={elapsed:.1f}ms"
            )
            response.headers["X-Request-ID"] = request_id
            response.headers["X-Response-Time"] = f"{elapsed:.1f}ms"
            return response
        except Exception:
            elapsed = (time.perf_counter() - start) * 1000
            logger.exception(
                f"Request error: {request.method} {request.url.path} "
                f"duration={elapsed:.1f}ms"
            )
            raise

app.add_middleware(RequestTrackingMiddleware)

Dependency Injection for Async Resources

import asyncio

import aiohttp
from fastapi import Depends, HTTPException, Request

async def get_db(request: Request):
    """Provide a database connection from the pool."""
    async with request.app.state.db_pool.acquire() as conn:
        yield conn

async def get_http_client(request: Request) -> aiohttp.ClientSession:
    return request.app.state.http_client

@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    db=Depends(get_db),
    http=Depends(get_http_client),
):
    user = await db.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
    if not user:
        raise HTTPException(404, "User not found")

    # Enrich with external data
    try:
        async with asyncio.timeout(5.0):
            # "async with" releases the connection back to the session's pool
            async with http.get(f"https://profiles.example.com/{user_id}") as resp:
                resp.raise_for_status()
                profile_data = await resp.json()
    except (asyncio.TimeoutError, aiohttp.ClientError):
        profile_data = None  # Graceful degradation

    return {"user": dict(user), "profile": profile_data}

Background Task Management

class BackgroundTaskManager:
    """Manage background tasks with monitoring and cleanup."""

    def __init__(self):
        self._tasks: dict[str, asyncio.Task] = {}
        self._lock = asyncio.Lock()

    async def start_task(self, name: str, coro):
        async with self._lock:
            if name in self._tasks and not self._tasks[name].done():
                coro.close()  # Avoid a "coroutine was never awaited" warning
                raise ValueError(f"Task {name} is already running")

            task = asyncio.create_task(coro, name=name)
            self._tasks[name] = task
            task.add_done_callback(lambda t: self._on_task_done(name, t))
            logger.info(f"Background task started: {name}")

    def _on_task_done(self, name: str, task: asyncio.Task):
        if task.cancelled():
            logger.info(f"Background task cancelled: {name}")
        elif task.exception():
            logger.error(
                f"Background task failed: {name}",
                exc_info=task.exception()
            )
        else:
            logger.info(f"Background task completed: {name}")

    async def stop_all(self, timeout: float = 10.0):
        async with self._lock:
            for name, task in self._tasks.items():
                if not task.done():
                    logger.info(f"Cancelling background task: {name}")
                    task.cancel()

            if self._tasks:
                await asyncio.wait(
                    self._tasks.values(),
                    timeout=timeout
                )

    @property
    def status(self) -> dict:
        return {
            name: {
                "running": not task.done(),
                "cancelled": task.cancelled(),
                # exception() raises CancelledError on a cancelled task
                "error": (
                    str(task.exception())
                    if task.done() and not task.cancelled() and task.exception()
                    else None
                ),
            }
            for name, task in self._tasks.items()
        }

Part 8 -- Architecture Summary

Key Takeaways

  • Error isolation: Wrap individual task logic in try/except. Use the error boundary pattern to catch, log, and convert exceptions at service boundaries. Never let one failing task crash the entire service.
  • Graceful shutdown: Register signal handlers (SIGTERM, SIGINT), set a shutdown event, stop accepting work, drain with a timeout, force-cancel if needed, then close resources.
  • Health checks: Run periodic async health checks against all dependencies. Expose /health, /health/live, and /health/ready endpoints for load balancers and orchestrators.
  • Backpressure: Use bounded asyncio.Queue to prevent memory exhaustion. Implement watermark-based flow control for streaming pipelines. Choose between blocking, dropping, or rejecting when buffers are full.
  • Testing: Use pytest-asyncio with @pytest.mark.asyncio (and @pytest_asyncio.fixture for async fixtures in strict mode). Test timeouts, cancellation, and ExceptionGroups explicitly. Use AsyncMock for async dependencies.
  • Structured logging: Use contextvars to propagate request IDs across async tasks. Format logs as JSON for structured logging systems. Include task names and request context in every log entry.
  • FastAPI patterns: Use the lifespan context manager for resource lifecycle. Dependency injection provides per-request resources from shared pools. Background task managers track and clean up long-running work.
  • Defense in depth: Combine rate limiting, circuit breaking, timeouts, and health checks. No single mechanism is sufficient. Layer them for resilient services.

Graded Practice Challenges

Level 1 -- Predict the Output

Question 1: What does this print when SIGINT is received after 2 seconds?

import asyncio
import signal

async def main():
    shutdown = asyncio.Event()
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, shutdown.set)

    print("waiting")
    await shutdown.wait()
    print("shutting down")
    await asyncio.sleep(0.1)
    print("done")

asyncio.run(main())
Answer
waiting
shutting down
done

SIGINT triggers shutdown.set(), which wakes shutdown.wait(). The remaining code executes normally: "shutting down", sleep 0.1s, "done". The program exits cleanly. The signal handler does NOT raise KeyboardInterrupt because add_signal_handler replaces the default handler.

Question 2: What does this test assertion check?

@pytest.mark.asyncio
async def test_bounded_queue_backpressure():
    queue = asyncio.Queue(maxsize=2)
    await queue.put("a")
    await queue.put("b")

    put_task = asyncio.create_task(queue.put("c"))
    await asyncio.sleep(0)  # Give task a chance to run

    assert not put_task.done()  # What does this assert?

    await queue.get()
    await asyncio.sleep(0)

    assert put_task.done()  # And this?
Answer

The first assertion checks that put("c") is blocked because the queue is full (maxsize=2, already holds "a" and "b"). The task is not done -- it is waiting.

The second assertion checks that after removing one item with get(), the put task completes because there is now space in the queue. Backpressure is released.

Question 3: Will this test pass or fail?

from unittest.mock import AsyncMock

@pytest.mark.asyncio
async def test_mock_async():
    mock = AsyncMock(return_value=42)
    result = mock("arg1", key="val")
    assert result == 42
Answer

The test FAILS. Calling an AsyncMock returns a coroutine, not the return value directly. You must await it:

result = await mock("arg1", key="val")
assert result == 42

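Without await, result is a coroutine object, which is not equal to 42 (and Python emits a "coroutine ... was never awaited" RuntimeWarning when that coroutine is garbage-collected).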
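You can observe the difference between calling and awaiting directly: call_count increases as soon as the mock is called, while await_count increases only once the returned coroutine actually runs.

```python
import asyncio
import inspect
from unittest.mock import AsyncMock

mock = AsyncMock(return_value=42)
pending = mock("arg1", key="val")

print(inspect.iscoroutine(pending))       # the un-awaited call is a coroutine
print(mock.call_count, mock.await_count)  # called once, not yet awaited

result = asyncio.run(pending)             # awaiting it produces the return value
print(result, mock.await_count)
```

This is why assert_awaited_once_with() is the stricter check for async dependencies: assert_called_once_with() would also pass if the code under test forgot the await.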

Level 2 -- Debug Challenge

This graceful shutdown implementation has a subtle bug that can cause data loss. Find it.

import asyncio
import signal

class WorkerService:
    def __init__(self):
        self._queue = asyncio.Queue()
        self._shutdown = asyncio.Event()

    async def run(self):
        loop = asyncio.get_running_loop()
        loop.add_signal_handler(signal.SIGTERM, self._shutdown.set)

        workers = [
            asyncio.create_task(self._worker(i))
            for i in range(3)
        ]

        # Feed work
        for i in range(100):
            await self._queue.put(f"item-{i}")

        await self._shutdown.wait()

        # Cancel workers
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)

        print(f"Items remaining in queue: {self._queue.qsize()}")

    async def _worker(self, wid):
        while True:
            item = await self._queue.get()
            await self._process(item)
            self._queue.task_done()

    async def _process(self, item):
        await asyncio.sleep(0.1)
        print(f"Processed: {item}")
Answer

The bug: when workers are cancelled, any item they have already dequeued with get() but not yet finished processing is lost. The cancellation happens at the await self._process(item) line, and the item disappears.

Fix: handle CancelledError in the worker and re-queue the item:

async def _worker(self, wid):
    while True:
        item = await self._queue.get()
        try:
            await self._process(item)
        except asyncio.CancelledError:
            # Put the item back so it can be processed later
            # or persisted to a recovery queue
            await self._queue.put(item)
            raise
        finally:
            self._queue.task_done()

Additionally, instead of cancelling workers, signal them to stop and drain:

await self._shutdown.wait()

# Stop workers gracefully -- let them finish current items
# Send poison pills
for _ in workers:
    await self._queue.put(None)

# Wait for workers to finish
await asyncio.gather(*workers)

With the worker checking:

async def _worker(self, wid):
    while True:
        item = await self._queue.get()
        if item is None:
            self._queue.task_done()
            break
        await self._process(item)
        self._queue.task_done()

Level 3 -- Design Challenge

Design a complete AsyncServiceFramework that an engineer can subclass to build a production async service. The framework should provide:

  1. Startup and shutdown lifecycle with resource management
  2. Signal handling (SIGTERM, SIGINT) with graceful shutdown
  3. Health check registration and endpoint
  4. Background task management with monitoring
  5. Structured logging with request context propagation
  6. Configuration from environment variables
  7. Metrics collection (request count, latency histogram, error rate)
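Item 7 is the least specified. A minimal in-process starting point might look like the sketch below. The MetricsCollector name matches the design hints, but the bucket boundaries and the observe()/track() methods are assumptions:

```python
import bisect
import time
from contextlib import contextmanager


class MetricsCollector:
    """Hypothetical in-process metrics: request count, latency histogram, error rate."""

    # Upper bounds of the latency buckets, in milliseconds (assumed values)
    BUCKETS_MS = (5, 10, 25, 50, 100, 250, 500, 1000)

    def __init__(self) -> None:
        self.requests = 0
        self.errors = 0
        # One extra bucket for anything above the largest bound
        self.histogram = [0] * (len(self.BUCKETS_MS) + 1)

    def observe(self, latency_ms: float, error: bool = False) -> None:
        self.requests += 1
        self.errors += int(error)
        self.histogram[bisect.bisect_left(self.BUCKETS_MS, latency_ms)] += 1

    @contextmanager
    def track(self):
        """Time a block of code; exceptions are counted as errors and re-raised."""
        start = time.perf_counter()
        error = False
        try:
            yield
        except Exception:
            error = True
            raise
        finally:
            self.observe((time.perf_counter() - start) * 1000, error)

    @property
    def error_rate(self) -> float:
        return self.errors / self.requests if self.requests else 0.0


metrics = MetricsCollector()
metrics.observe(3.0)
metrics.observe(2000.0, error=True)
with metrics.track():
    pass
```

A plain (synchronous) context manager is enough for timing async handlers as long as the awaits happen inside the with block; nothing in observe() needs to suspend.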
Design Hints
import asyncio
from abc import ABC, abstractmethod
from contextlib import AsyncExitStack

# HealthChecker and BackgroundTaskManager are the classes from this lesson;
# MetricsCollector is part of the challenge.

class AsyncServiceFramework(ABC):
    def __init__(self, config: dict | None = None):
        self.config = config or self._load_config()
        self._shutdown_event = asyncio.Event()
        self._health = HealthChecker()
        self._bg_tasks = BackgroundTaskManager()
        self._metrics = MetricsCollector()

    async def run(self):
        """Main entry point -- do not override."""
        self._setup_signals()
        self._setup_logging()

        try:
            async with AsyncExitStack() as stack:
                try:
                    # Let subclass initialize resources
                    await self.on_startup(stack)

                    # Start health checks
                    await self._bg_tasks.start_task(
                        "health", self._health.start()
                    )

                    # Let subclass run
                    await self.on_run()

                    # Wait for shutdown
                    await self._shutdown_event.wait()
                    await self.on_drain()
                finally:
                    # Stop background tasks while the resources they use are
                    # still open; the exit stack closes those afterwards
                    await self._bg_tasks.stop_all()
        finally:
            await self.on_shutdown()

    @abstractmethod
    async def on_startup(self, stack: AsyncExitStack):
        """Initialize resources. Use stack for lifecycle management."""

    @abstractmethod
    async def on_run(self):
        """Start serving / processing."""

    async def on_drain(self):
        """Override to drain work queues before shutdown."""

    async def on_shutdown(self):
        """Override for final cleanup."""

    def register_health_check(self, name, check):
        self._health.register(name, check)

    async def start_background(self, name, coro):
        await self._bg_tasks.start_task(name, coro)

    # ... signal handling, logging setup, config loading

The subclass would look like:

class MyService(AsyncServiceFramework):
    async def on_startup(self, stack):
        # create_db_pool() is a hypothetical async context manager
        self.db = await stack.enter_async_context(create_db_pool())
        self.register_health_check("db", self.db.ping)

    async def on_run(self):
        await self.start_background("processor", self.process_loop())

    async def process_loop(self):
        while True:
            await asyncio.sleep(1)
            # process work

if __name__ == "__main__":
    # run() is a coroutine, so it needs an event loop
    asyncio.run(MyService().run())

What's Next

Congratulations -- you have completed Module 3: Advanced Async & Concurrency. You now have the knowledge to build production-grade async systems in Python, from low-level protocol understanding (__await__, event loop internals) to high-level architecture (graceful shutdown, backpressure, testing).

Next, move on to Module 4: Performance Engineering, where you will learn how to profile, measure, and optimize Python code -- including async code -- for production performance targets.

© 2026 EngineersOfAI. All rights reserved.