Building an Async API Service
Reading time: ~40 minutes | Level: Intermediate → Engineering
Before reading further, consider this puzzle:
```python
from fastapi import FastAPI
import asyncio
import httpx

app = FastAPI()

# Version A
@app.get("/news/sync")
def get_news_sync():
    import requests
    r1 = requests.get("https://api.source1.com/headlines")
    r2 = requests.get("https://api.source2.com/headlines")
    r3 = requests.get("https://api.source3.com/headlines")
    return r1.json() + r2.json() + r3.json()

# Version B
@app.get("/news/async")
async def get_news_async():
    async with httpx.AsyncClient() as client:
        r1, r2, r3 = await asyncio.gather(
            client.get("https://api.source1.com/headlines"),
            client.get("https://api.source2.com/headlines"),
            client.get("https://api.source3.com/headlines"),
        )
    return r1.json() + r2.json() + r3.json()
```
Both routes return the same data. If each external API takes 300 ms to respond, how long does each version take under 100 concurrent users?
Version A takes approximately 3 × 300 ms = 900 ms per request, because the three calls run one after another. FastAPI runs a def handler in a worker thread, and that thread sits idle for the full 900 ms. Once the thread pool (40 threads per process by default) or the server's worker count is saturated, the remaining users wait in a queue. Version B completes all three calls in ~300 ms because asyncio.gather runs the three awaits concurrently. Under 100 concurrent users, a single Uvicorn worker can serve them all without queuing: while one request awaits its three API calls, the event loop processes other requests.
This is not a marginal improvement. It is the difference between an API that struggles at moderate load and one that handles thousands of concurrent requests on a single process.
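You can check the puzzle's arithmetic without any network access. The sketch below is a simulation under stated assumptions. fake_upstream is a hypothetical stand-in for a headline API, and asyncio.sleep replaces the network wait, scaled down from 300 ms to 100 ms so the example runs quickly. It compares awaiting the three calls one after another with running them through asyncio.gather.

```python
import asyncio
import time

LATENCY = 0.1  # simulated upstream latency in seconds (scaled down from 300 ms)

async def fake_upstream(name: str) -> list[str]:
    """Hypothetical stand-in for one headline API: wait, then return data."""
    await asyncio.sleep(LATENCY)
    return [f"{name}-headline"]

async def sequential() -> list[str]:
    # Like Version A: each call starts only after the previous one finishes
    r1 = await fake_upstream("s1")
    r2 = await fake_upstream("s2")
    r3 = await fake_upstream("s3")
    return r1 + r2 + r3

async def concurrent() -> list[str]:
    # Like Version B: the three waits overlap on the event loop
    r1, r2, r3 = await asyncio.gather(
        fake_upstream("s1"), fake_upstream("s2"), fake_upstream("s3")
    )
    return r1 + r2 + r3

def timed(coro_fn) -> tuple[list[str], float]:
    """Run one of the coroutines above and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro_fn())
    return result, time.perf_counter() - start

if __name__ == "__main__":
    for fn in (sequential, concurrent):
        data, elapsed = timed(fn)
        print(f"{fn.__name__}: {elapsed:.2f}s -> {data}")
```

The sequential run should take roughly 3 × LATENCY and the gathered run roughly 1 × LATENCY. Exact figures vary with scheduling overhead.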
What You Will Learn
- How FastAPI's sync vs async routing decisions affect event loop health
- Why blocking database drivers destroy async API performance
- Using asyncio.Semaphore for rate-limiting outbound calls
- FastAPI BackgroundTasks vs Celery - when each is the right tool
- Lifespan context managers for startup/shutdown resource management
- asyncpg connection pools and why pool reuse matters
- asyncio.wait_for() for timeouts, and the circuit breaker pattern
- run_in_executor() to safely offload CPU-bound work from async routes
- A complete async news aggregation API tying all patterns together
Prerequisites
- Module 08, Lessons 03–07 - asyncio fundamentals, the event loop, semaphores, and ThreadPoolExecutor
- Module 06, Lesson 04 - FastAPI routing, dependency injection, and Pydantic models
- Module 07 - database fundamentals; async DB access builds on sync concepts
Part 1 - FastAPI is Async-First: Sync vs Async Routes
FastAPI accepts both def and async def route handlers. The choice matters:
```python
from fastapi import FastAPI
import time
import asyncio

app = FastAPI()

# Synchronous handler - FastAPI runs this in a thread pool
# (via Starlette's run_in_threadpool, which uses AnyIO worker threads)
@app.get("/sync-work")
def sync_handler():
    time.sleep(1)  # blocks, but only its thread - not the event loop
    return {"status": "done"}

# Async handler - runs directly in the event loop
@app.get("/async-work")
async def async_handler():
    await asyncio.sleep(1)  # yields to event loop - other coroutines run
    return {"status": "done"}

# THIS IS DANGEROUS - sync blocking inside async def
@app.get("/broken")
async def broken_handler():
    time.sleep(1)  # blocks the ENTIRE event loop for 1 second
    return {"status": "done"}
```
The rule is clear but frequently violated:
| Handler type | Call inside the handler | Result |
|---|---|---|
| def | time.sleep(), requests.get() | Safe - runs in thread pool |
| async def | await asyncio.sleep(), await httpx... | Safe - yields to event loop |
| async def | time.sleep(), requests.get() | Dangerous - blocks event loop |
async def does not make blocking calls non-blocking. It only allows you to await coroutines. If you call requests.get() or time.sleep() inside async def, you block the single thread that runs the event loop - no other request is processed until that call returns. This is the most common async performance bug in production FastAPI services.
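The third row of the table is easy to observe outside FastAPI. This sketch simulates five concurrent "requests" on one event loop using three hypothetical handlers:

- one calls time.sleep() directly;
- one awaits asyncio.sleep();
- one moves the blocking call to a worker thread with asyncio.to_thread() (Python 3.9+).

Inside a FastAPI app, Starlette's run_in_threadpool serves the same purpose as asyncio.to_thread.

```python
import asyncio
import time

async def blocking_handler() -> str:
    time.sleep(0.2)  # blocks the whole event loop thread
    return "done"

async def cooperative_handler() -> str:
    await asyncio.sleep(0.2)  # suspends only this coroutine
    return "done"

async def offloaded_handler() -> str:
    # The blocking call runs in the default thread pool; the loop stays free
    await asyncio.to_thread(time.sleep, 0.2)
    return "done"

async def serve_concurrently(handler, n: int = 5) -> float:
    """Start n simulated requests at once; return total wall-clock seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start

if __name__ == "__main__":
    for h in (blocking_handler, cooperative_handler, offloaded_handler):
        print(f"{h.__name__}: {asyncio.run(serve_concurrently(h)):.2f}s")
```

Expect about 1 s for the blocking handler, since its five 0.2 s sleeps are fully serialized. The other two should finish in about 0.2 s.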
When to Use def vs async def
Use async def when your handler:
- Makes HTTP calls with httpx, aiohttp, or similar async clients
- Queries a database with an async driver (asyncpg, databases, SQLAlchemy async)
- Reads or writes files with aiofiles
- Calls other async def functions
Use def when your handler:
- Does CPU-bound work (data transformation, image processing, PDF generation)
- Must call a library that has no async API and cannot be easily wrapped
- Is a simple in-memory operation where the overhead of thread pool dispatch is acceptable
If you are unsure whether a dependency uses blocking I/O, profile it. Run the endpoint under load with tools like locust or wrk and watch CPU usage and event loop lag. A healthy async service shows low event loop latency even under high request volume. Unexplained latency spikes often trace to a single blocking call in an async def handler.
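Event loop lag can be measured from inside the loop itself: request a short sleep and record how late the wake-up arrives. The sketch below is a minimal monitor; measure_loop_lag is a hypothetical helper, not a library function. A blocking time.sleep() inside a coroutine shows up directly as lag.

```python
import asyncio
import time

async def measure_loop_lag(interval: float = 0.05, samples: int = 10) -> float:
    """Return the worst delay (seconds) beyond the requested sleep interval.

    On a healthy loop this stays near zero; a blocking call anywhere on the
    loop delays the wake-up by roughly the length of that call.
    """
    worst = 0.0
    for _ in range(samples):
        start = time.perf_counter()
        await asyncio.sleep(interval)
        lag = time.perf_counter() - start - interval
        worst = max(worst, lag)
    return worst

async def blocking_work() -> None:
    await asyncio.sleep(0.1)  # give the monitor time to start
    time.sleep(0.3)  # a blocking call hiding in async code

async def main() -> tuple[float, float]:
    healthy = await measure_loop_lag()  # nothing else is running
    monitor = asyncio.create_task(measure_loop_lag())
    await blocking_work()  # runs while the monitor is sampling
    degraded = await monitor
    return healthy, degraded

if __name__ == "__main__":
    print(asyncio.run(main()))
```

In a service, you could start a monitor like this as a background task in the lifespan and export the worst lag as a metric. asyncio's debug mode (PYTHONASYNCIODEBUG=1) also logs any callback that runs longer than loop.slow_callback_duration, which defaults to 100 ms.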
Part 2 - Async Database Access with asyncpg and SQLAlchemy Async
The most common mistake when migrating a synchronous API to async is keeping a synchronous database driver. Here is why this is catastrophic:
```python
# DO NOT DO THIS - psycopg2 (sync) inside async handler
import psycopg2
from fastapi import FastAPI

app = FastAPI()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    conn = psycopg2.connect("postgresql://...")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
    row = cursor.fetchone()
    conn.close()
    return {"id": row[0], "name": row[1]}
```
psycopg2.connect() is a blocking call: it opens the TCP connection and performs the PostgreSQL handshake synchronously. cursor.execute() is a blocking network round-trip. Every one of these calls parks the event loop thread. With 100 concurrent requests, you serialize 100 connection setups and database round-trips through a single thread, which eliminates all async benefit.
asyncpg - The Correct Approach
asyncpg is a PostgreSQL driver built from scratch for asyncio. Every operation is a proper coroutine:
```python
import asyncpg
from fastapi import FastAPI, Depends, HTTPException
from contextlib import asynccontextmanager
from typing import AsyncGenerator

# Module-level pool - created once at startup, shared across all requests
_pool: asyncpg.Pool | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global _pool
    # Startup: create connection pool
    _pool = await asyncpg.create_pool(
        dsn="postgresql://user:password@localhost/mydb",
        min_size=5,  # always keep 5 connections ready
        max_size=20,  # allow bursts up to 20 concurrent connections
        max_queries=50_000,  # recycle connection after 50k queries
        max_inactive_connection_lifetime=300,  # 5 minutes
    )
    yield
    # Shutdown: close all pooled connections cleanly
    await _pool.close()

app = FastAPI(lifespan=lifespan)

# Dependency: yields a connection from the pool for one request
async def get_db() -> AsyncGenerator[asyncpg.Connection, None]:
    async with _pool.acquire() as conn:
        yield conn

@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    conn: asyncpg.Connection = Depends(get_db),
) -> dict:
    row = await conn.fetchrow(
        "SELECT id, name, email FROM users WHERE id = $1",
        user_id,
    )
    if row is None:
        raise HTTPException(status_code=404, detail="User not found")
    return dict(row)
```
Key properties of asyncpg:
- pool.acquire() is awaitable: while waiting for a free connection, the event loop runs other coroutines
- conn.fetchrow(), conn.fetch(), and conn.execute() are all coroutines
- The pool manages connection health, reconnection, and lifetime automatically
- Connections are returned to the pool when the async with block exits, even on exception
SQLAlchemy Async - For ORM-Based Projects
If your project uses SQLAlchemy models, use the async engine:
```python
from typing import AsyncGenerator

from fastapi import Depends, FastAPI, HTTPException
from pydantic import BaseModel, ConfigDict
from sqlalchemy import String, select
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

# Note the driver: postgresql+asyncpg (uses asyncpg under the hood)
engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost/mydb",
    pool_size=10,
    max_overflow=20,
    echo=False,  # set True during development to log SQL
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # prevent lazy-load errors after commit
)

app = FastAPI()

# Minimal model and response schema for illustration; your project defines its own
class Base(DeclarativeBase):
    pass

class Article(Base):
    __tablename__ = "articles"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(300))

class ArticleResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)  # required to validate ORM objects
    id: int
    title: str

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

@app.get("/articles/{article_id}")
async def get_article(
    article_id: int,
    db: AsyncSession = Depends(get_db),
) -> dict:
    result = await db.execute(
        select(Article).where(Article.id == article_id)
    )
    article = result.scalar_one_or_none()
    if article is None:
        raise HTTPException(status_code=404, detail="Article not found")
    return ArticleResponse.model_validate(article).model_dump()
```
With SQLAlchemy async, never use lazy loading. Lazy relationships trigger synchronous I/O when accessed, which raises MissingGreenlet errors in async context. Declare all needed relationships as lazy="selectin" or use selectinload() / joinedload() in your query to load them eagerly.
Part 3 - asyncio.Semaphore for Rate Limiting Outbound Calls
When your API calls external services, you must control how many concurrent outbound calls you make. Without limits, 1000 concurrent requests to your API become 1000 simultaneous calls to an external service that may rate-limit you, become overwhelmed, or charge per-request.
```python
import asyncio
import logging

import httpx
from fastapi import FastAPI

app = FastAPI()
logger = logging.getLogger(__name__)

# Global semaphore - at most 10 concurrent outbound calls at any time
_external_api_semaphore = asyncio.Semaphore(10)

async def fetch_with_rate_limit(client: httpx.AsyncClient, url: str) -> dict:
    """Fetch a URL, but only when a semaphore slot is available."""
    async with _external_api_semaphore:
        # At most 10 coroutines can be inside here simultaneously
        # Additional callers wait here (but yield to event loop while waiting)
        response = await client.get(url, timeout=5.0)
        response.raise_for_status()
        return response.json()

@app.get("/aggregate")
async def aggregate_data() -> dict:
    urls = [
        "https://api.source1.com/data",
        "https://api.source2.com/data",
        "https://api.source3.com/data",
        # ... potentially many more
    ]
    # (Part 5 explains why a shared, long-lived client is preferable)
    async with httpx.AsyncClient() as client:
        results = await asyncio.gather(
            *[fetch_with_rate_limit(client, url) for url in urls],
            return_exceptions=True,
        )
    # Log failures and return partial results
    successes = []
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            logger.warning("Fetch from %s failed: %r", url, result)
        else:
            successes.append(result)
    return {"results": successes, "total": len(successes)}
```
The semaphore pattern has a critical property: asyncio.Semaphore is not a thread lock. It is a counter that cooperative coroutines decrement and increment. When a coroutine waits to acquire a semaphore, it yields to the event loop, so other requests proceed normally. There is no thread blocking.
Set semaphore limits based on the external service's documented rate limits and your measured latency. A common starting point: (rate_limit_per_second) × (expected_avg_response_time_in_seconds). For example, a service that allows 50 requests/second and responds in 200 ms → 50 × 0.2 = 10 concurrent connections is a reasonable ceiling. Monitor and tune.
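The sizing rule is Little's law (concurrency = arrival rate × time in system), and you can check the semaphore's effect directly. This sketch uses two hypothetical helpers:

- semaphore_limit applies the formula above.
- run_with_limit launches many fake outbound calls behind a semaphore and records the peak number in flight. asyncio.sleep stands in for client.get.

```python
import asyncio
import math

def semaphore_limit(rate_per_second: float, avg_latency_ms: float) -> int:
    """Little's-law estimate: concurrency = arrival rate x time in system."""
    return max(1, math.ceil(rate_per_second * avg_latency_ms / 1000))

async def run_with_limit(limit: int, n_calls: int, latency: float = 0.01) -> int:
    """Launch n_calls fake outbound calls behind a Semaphore and return the
    peak number that were in flight at the same time."""
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_call() -> None:
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(latency)  # stand-in for client.get(...)
            in_flight -= 1

    await asyncio.gather(*(fake_call() for _ in range(n_calls)))
    return peak

if __name__ == "__main__":
    limit = semaphore_limit(50, 200)  # the worked example from the text
    print(limit, asyncio.run(run_with_limit(limit, 100)))
```

With the text's example (50 requests/second, 200 ms), the limit comes out to 10. One hundred simultaneous calls then never exceed 10 in flight.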
Part 4 - Background Tasks in FastAPI
BackgroundTasks schedules work to run after the HTTP response has been sent to the client. The client gets a fast response; the server does the slow work afterward:
```python
import asyncio
import logging

import asyncpg
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException

app = FastAPI()  # in the full app: FastAPI(lifespan=lifespan) from Part 2
logger = logging.getLogger(__name__)

# _pool and get_db are the pool and dependency defined in Part 2

async def send_notification_email(user_email: str, article_title: str) -> None:
    """Runs after the response is sent. Client does not wait."""
    logger.info("Sending notification to %s for article: %s", user_email, article_title)
    # async email send via SMTP or email service API
    await asyncio.sleep(0)  # placeholder for actual async email logic

async def update_view_counter(article_id: int) -> None:
    """Increment view count asynchronously after serving the article."""
    await _pool.execute(
        "UPDATE articles SET view_count = view_count + 1 WHERE id = $1",
        article_id,
    )

@app.get("/articles/{article_id}")
async def get_article_with_tracking(
    article_id: int,
    background_tasks: BackgroundTasks,
    conn: asyncpg.Connection = Depends(get_db),
) -> dict:
    row = await conn.fetchrow(
        "SELECT id, title, content, author_email FROM articles WHERE id = $1",
        article_id,
    )
    if row is None:
        raise HTTPException(status_code=404)
    # Schedule background work - runs AFTER the response is sent
    background_tasks.add_task(update_view_counter, article_id)
    background_tasks.add_task(
        send_notification_email,
        row["author_email"],
        row["title"],
    )
    # Client receives this response immediately
    return {"id": row["id"], "title": row["title"], "content": row["content"]}
```
BackgroundTasks vs Celery - When to Use Each
| Criterion | FastAPI BackgroundTasks | Celery / ARQ |
|---|---|---|
| Execution location | Same process, same event loop | Separate worker process(es) |
| Survives server restart | No - in-memory tasks are lost | Yes - tasks persisted in broker |
| Visibility/monitoring | None (only logs) | Full - Flower, task state, retries |
| Suitable for | Short async work (< 1–2 s) | Long jobs, batch work, scheduled tasks |
| External dependencies | None - built into FastAPI | Redis or RabbitMQ broker required |
| Retry on failure | Manual | Built-in with configurable policy |
| Horizontal scaling | No | Yes - add more workers |
BackgroundTasks run in the same process as your request handlers, and async background tasks share the event loop with them. A background task that blocks or performs CPU-intensive work degrades response time for all active requests while it runs. Any task still pending when the process restarts is lost. For anything longer than a second or two (file processing, ML inference, bulk email), use Celery, ARQ, or another external task queue.
Part 5 - Lifespan Events for Resource Management
FastAPI's lifespan context manager is the correct place to create and destroy shared resources: database pools, HTTP client sessions, caches, and connection managers. Resources created in lifespan are available for the entire application lifetime and are cleaned up gracefully on shutdown.
```python
import httpx
import asyncpg
from contextlib import asynccontextmanager
from fastapi import FastAPI

# Module-level singletons - set during lifespan startup
_db_pool: asyncpg.Pool | None = None
_http_client: httpx.AsyncClient | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global _db_pool, _http_client
    # ---- STARTUP ----
    _db_pool = await asyncpg.create_pool(
        dsn="postgresql://user:pass@localhost/newsdb",
        min_size=5,
        max_size=25,
    )
    _http_client = httpx.AsyncClient(
        timeout=httpx.Timeout(connect=2.0, read=8.0, write=2.0, pool=None),
        limits=httpx.Limits(max_connections=50, max_keepalive_connections=20),
    )
    print("Startup complete: DB pool and HTTP client ready")
    yield  # Application runs here
    # ---- SHUTDOWN ----
    await _http_client.aclose()
    await _db_pool.close()
    print("Shutdown complete: resources released")

app = FastAPI(lifespan=lifespan)
```
Before lifespan was introduced, developers used @app.on_event("startup") and @app.on_event("shutdown"). The lifespan pattern is superior because:
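- Resources created before yield and cleaned up after yield live in one function, paired like a context manager
- Exceptions during startup propagate and abort application startup; if resources are acquired with async with (or an AsyncExitStack), any that were already opened are released
- Testing is simpler: TestClient runs the lifespan on enter and exit when used as a context manager (with TestClient(app) as client:)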
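Here is what safe startup looks like in practice. If resources are entered through contextlib.AsyncExitStack, a failure partway through startup still closes whatever was already opened, in reverse order. The sketch uses a hypothetical FakeResource in place of an asyncpg pool or httpx client, both of which support async with. The body of managed_resources is what you would place inside lifespan(app).

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events: list[str] = []  # records open/close order for the demo

class FakeResource:
    """Hypothetical stand-in for an asyncpg pool or an httpx.AsyncClient."""

    def __init__(self, name: str, fail: bool = False):
        self.name = name
        self.fail = fail

    async def __aenter__(self) -> "FakeResource":
        if self.fail:
            raise ConnectionError(f"{self.name} unavailable")
        events.append(f"open {self.name}")
        return self

    async def __aexit__(self, *exc_info) -> None:
        events.append(f"close {self.name}")  # returns None: never swallows errors

@asynccontextmanager
async def managed_resources(fail_second: bool = False):
    # Same shape as a FastAPI lifespan(app) body
    async with AsyncExitStack() as stack:
        db = await stack.enter_async_context(FakeResource("db"))
        http = await stack.enter_async_context(FakeResource("http", fail=fail_second))
        yield db, http  # application runs here
    # Leaving the stack closes everything that was opened, in reverse order

async def run_app(fail_second: bool) -> list[str]:
    """Simulate one application start/stop cycle and return the event log."""
    events.clear()
    try:
        async with managed_resources(fail_second):
            events.append("serving")
    except ConnectionError:
        events.append("startup failed")
    return list(events)

if __name__ == "__main__":
    print(asyncio.run(run_app(False)))
    print(asyncio.run(run_app(True)))
```

When the second resource fails, the log shows "open db", "close db", "startup failed". The database resource is not leaked.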
Never create a new httpx.AsyncClient() inside a request handler. Creating a client opens a connection pool and is expensive. Creating one per request and immediately closing it defeats HTTP/1.1 keep-alive and TLS session reuse. Create a single client in lifespan and share it across all requests.
Part 6 - Connection Pooling in Async Context
Connection pooling in an async API is not just an optimization; it is a correctness requirement. Here is what happens without a pool:
Request 1 arrives → open DB connection → query → close connection
Request 2 arrives → open DB connection → query → close connection
...
1000 concurrent requests → 1000 simultaneous connection opens
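PostgreSQL's default max_connections is 100. Once that limit is reached, the server rejects further connection attempts with a "too many clients" error, so most of those 1000 requests fail. Every connection the server does accept also costs a dedicated backend process and its memory. With a pool: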
Pool initialized at startup with min=5, max=20 connections
Request 1 arrives → acquire from pool (instant) → query → return to pool
Request 2 arrives → acquire from pool (instant) → query → return to pool
...
1000 concurrent requests → at most 20 DB connections open simultaneously
Requests 21-1000 wait (without blocking) for a connection to be returned
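```python
import asyncpg
from typing import AsyncGenerator
from fastapi import Depends, FastAPI
from pydantic import BaseModel

app = FastAPI()  # in the full app: FastAPI(lifespan=lifespan)

# Pool created in lifespan (see Part 5); _pool is the module-level Pool
_pool: asyncpg.Pool | None = None

async def get_conn() -> AsyncGenerator[asyncpg.Connection, None]:
    """
    Dependency that acquires one connection from the pool.
    The connection is returned to the pool when the request ends,
    even if an exception was raised.
    """
    async with _pool.acquire() as conn:
        yield conn

# Pool-level operations (no per-request connection needed for simple queries)
async def get_article_count() -> int:
    return await _pool.fetchval("SELECT COUNT(*) FROM articles")

# Use a transaction when multiple queries must be atomic.
# A plain helper takes the connection explicitly: Depends() is only resolved
# in the parameters of route handlers and other dependencies.
async def create_article_with_tags(
    conn: asyncpg.Connection,
    title: str,
    content: str,
    tag_ids: list[int],
) -> int:
    async with conn.transaction():
        article_id = await conn.fetchval(
            "INSERT INTO articles (title, content) VALUES ($1, $2) RETURNING id",
            title, content,
        )
        await conn.executemany(
            "INSERT INTO article_tags (article_id, tag_id) VALUES ($1, $2)",
            [(article_id, tag_id) for tag_id in tag_ids],
        )
    return article_id

# Hypothetical request body for illustration
class ArticleIn(BaseModel):
    title: str
    content: str
    tag_ids: list[int] = []

@app.post("/articles")
async def create_article(
    payload: ArticleIn,
    conn: asyncpg.Connection = Depends(get_conn),
) -> dict:
    article_id = await create_article_with_tags(
        conn, payload.title, payload.content, payload.tag_ids
    )
    return {"id": article_id}
```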
Pool Sizing Guidelines
The correct pool size is not "as large as possible." Oversized pools exhaust database connections and increase memory usage without performance benefit:
optimal_pool_size ≈ (number_of_cpu_cores_on_DB_server × 2) + number_of_effective_spindles
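For a 4-core PostgreSQL server with SSD storage (1 effective spindle): 4 × 2 + 1 = 9. A pool of 10–15 connections is typically optimal for such a server. More connections add queueing overhead at the database level. This budget covers every client of the database. Each application process (for example, each Uvicorn worker) creates its own pool, so the database sees the sum of every pool's max_size.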
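Each application process (for example, each Uvicorn worker) creates its own pool, so the budget from the formula has to be shared among them. The two hypothetical helpers below encode that arithmetic. The four-worker deployment in the usage line is an assumed example, not a recommendation.

```python
def total_pool_budget(db_cores: int, effective_spindles: int) -> int:
    """Connections the database can serve efficiently: (cores x 2) + spindles."""
    return db_cores * 2 + effective_spindles

def per_process_max_size(budget: int, app_processes: int) -> int:
    """Share one database-wide budget among processes that each own a pool."""
    return max(1, budget // app_processes)

if __name__ == "__main__":
    budget = total_pool_budget(db_cores=4, effective_spindles=1)
    # Assumed deployment: 4 Uvicorn workers, each creating its own pool
    print(budget, per_process_max_size(budget, app_processes=4))
```

For the 4-core SSD server the budget is 9. Four worker processes would each get a max_size of 2, while a single process could use all 9.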
Part 7 - Timeout Handling and Circuit Breaker
Every external call in an async service must have a timeout. Without timeouts, a slow or unresponsive upstream can hold all your semaphore slots indefinitely, eventually making your entire API unresponsive.
asyncio.wait_for for Timeouts
```python
import asyncio
import httpx

async def fetch_with_timeout(client: httpx.AsyncClient, url: str, timeout: float = 5.0) -> dict:
    """Fetch a URL and raise TimeoutError if it doesn't respond within timeout seconds."""
    try:
        return await asyncio.wait_for(
            _fetch(client, url),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        raise TimeoutError(f"Request to {url} timed out after {timeout}s")

async def _fetch(client: httpx.AsyncClient, url: str) -> dict:
    response = await client.get(url)
    response.raise_for_status()
    return response.json()
```
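httpx.AsyncClient has its own timeout parameter (timeout=httpx.Timeout(...)), but those limits apply to individual phases: connect, read, write, and waiting for a pooled connection. A response that trickles in slowly can therefore take far longer in total than any single limit. asyncio.wait_for caps the total time for the whole call, including connection queueing, which the pool=None setting in Part 5 leaves unbounded. Both are useful; the asyncio.wait_for timeout is the outer bound the caller experiences.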
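When the deadline passes, asyncio.wait_for cancels the inner call. This cancellation is what frees whatever the slow call was holding, such as a semaphore slot or a pooled connection. In this sketch, slow_upstream is a hypothetical coroutine standing in for an HTTP call. When the deadline passes, the inner coroutine receives CancelledError before TimeoutError reaches the caller.

```python
import asyncio

cancelled: list[str] = []  # records which upstream calls were cancelled

async def slow_upstream(name: str, delay: float) -> dict:
    """Hypothetical upstream call that takes `delay` seconds."""
    try:
        await asyncio.sleep(delay)
        return {"source": name, "status": "ok"}
    except asyncio.CancelledError:
        cancelled.append(name)  # cleanup (releasing slots, connections) runs here
        raise

async def call_with_deadline(name: str, delay: float, deadline: float) -> dict:
    try:
        return await asyncio.wait_for(slow_upstream(name, delay), timeout=deadline)
    except asyncio.TimeoutError:
        # By now wait_for has already cancelled slow_upstream
        return {"source": name, "status": "timeout"}
```

On Python 3.11+, `async with asyncio.timeout(deadline):` expresses the same deadline as a context manager.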
Circuit Breaker Pattern
A circuit breaker prevents your service from hammering a failing upstream. After a threshold of consecutive failures, it "opens" and returns a cached or default response immediately, without even attempting the call:
```python
import time
from enum import Enum

import httpx

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation - calls pass through
    OPEN = "open"  # Failing - calls blocked, return fallback
    HALF_OPEN = "half_open"  # Testing - one call allowed to check recovery

class CircuitBreaker:
    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
    ):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._last_failure_time: float | None = None
        self._probe_in_flight = False  # True while the half-open test call runs

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._last_failure_time > self.recovery_timeout:
                self._state = CircuitState.HALF_OPEN
        return self._state

    def record_success(self):
        self._failure_count = 0
        self._state = CircuitState.CLOSED

    def record_failure(self):
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if self._failure_count >= self.failure_threshold:
            self._state = CircuitState.OPEN

    async def call(self, coro):
        """Await coro if the circuit is closed (or half-open with no probe running); raise if open."""
        state = self.state
        if state == CircuitState.OPEN or (
            state == CircuitState.HALF_OPEN and self._probe_in_flight
        ):
            coro.close()  # never awaited; close it to avoid a RuntimeWarning
            raise RuntimeError(f"Circuit {self.name} is OPEN - upstream is down")
        is_probe = state == CircuitState.HALF_OPEN
        if is_probe:
            self._probe_in_flight = True
        try:
            result = await coro
        except Exception:
            self.record_failure()  # in half-open, this re-opens the circuit
            raise
        finally:
            if is_probe:
                self._probe_in_flight = False
        self.record_success()
        return result

# One circuit breaker per upstream service
_source1_breaker = CircuitBreaker("source1", failure_threshold=3, recovery_timeout=60.0)
_source2_breaker = CircuitBreaker("source2", failure_threshold=3, recovery_timeout=60.0)

async def fetch_source1(client: httpx.AsyncClient) -> list[dict]:
    # fetch_with_timeout is defined in the previous snippet
    return await _source1_breaker.call(
        fetch_with_timeout(client, "https://api.source1.com/headlines")
    )
```
Part 8 - Mixing Sync and Async: run_in_executor
Some operations are inherently CPU-bound and cannot be made async. Parsing a large HTML document, computing a hash, and running a regex over megabytes of text all keep the CPU busy and cannot yield. If you call them directly in async def, they block the event loop.
The solution: asyncio.get_running_loop().run_in_executor() offloads the work to an executor (a thread pool or a process pool). This frees the event loop to handle other requests while the CPU work runs elsewhere:
```python
import asyncio
import hashlib
from concurrent.futures import ProcessPoolExecutor

import httpx

# Default thread pool: blocking I/O, or C code that releases the GIL
# Process pool: pure-Python CPU-bound work (bypasses the GIL)
_process_pool = ProcessPoolExecutor(max_workers=4)

def _compute_checksum(content: bytes) -> str:
    """CPU-bound, but hashlib releases the GIL on large inputs - a thread suffices."""
    return hashlib.sha256(content).hexdigest()

def _parse_html_sync(html: str) -> dict:
    """Pure-Python HTML parsing holds the GIL - runs in the process pool."""
    # ... parsing logic (e.g. an html.parser.HTMLParser subclass) ...
    return {"title": "Extracted title", "word_count": 1234}

async def process_article(url: str, client: httpx.AsyncClient) -> dict:
    # Network call - async, no blocking
    response = await client.get(url)
    html_bytes = response.content
    loop = asyncio.get_running_loop()
    # Hashing in the default thread pool (hashlib releases the GIL)
    checksum = await loop.run_in_executor(
        None,  # None = default thread pool executor
        _compute_checksum,
        html_bytes,
    )
    # Pure-Python parsing in the process pool (true parallelism)
    parsed = await loop.run_in_executor(
        _process_pool,
        _parse_html_sync,
        html_bytes.decode("utf-8", errors="replace"),
    )
    return {"url": url, "checksum": checksum, **parsed}
```
run_in_executor with the default thread pool is subject to the GIL. CPU-intensive pure-Python code in threads gains no parallelism; C extensions that release the GIL, such as hashlib on large inputs, are the exception. For true CPU parallelism, use ProcessPoolExecutor, which spawns separate processes. Be mindful of serialization overhead: arguments and return values are pickled when crossing process boundaries, which can negate the benefit for small inputs.
Part 9 \text{---} Full Example: Async News Aggregation API
The following is a complete, production-style async news aggregation service that applies every concept from this lesson.
Implementation
```python
# news_aggregator/main.py
import asyncio
import json
import logging
import time
from contextlib import asynccontextmanager

import asyncpg
import httpx
from fastapi import FastAPI, BackgroundTasks, HTTPException, Query
from pydantic import BaseModel

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Application state (set during lifespan)
# ---------------------------------------------------------------------------
_db_pool: asyncpg.Pool | None = None
_http_client: httpx.AsyncClient | None = None
_semaphore = asyncio.Semaphore(10)  # max 10 concurrent external calls

# ---------------------------------------------------------------------------
# Circuit breakers - one per upstream
# ---------------------------------------------------------------------------
class CircuitBreaker:
    def __init__(self, name: str, threshold: int = 5, timeout: float = 30.0):
        self.name = name
        self.threshold = threshold
        self.timeout = timeout
        self._failures = 0
        self._opened_at: float | None = None
        self._open = False

    @property
    def is_open(self) -> bool:
        if self._open:
            if time.monotonic() - self._opened_at > self.timeout:
                self._open = False  # half-open: let calls through; one more failure re-opens
                return False
        return self._open

    def success(self):
        self._failures = 0
        self._open = False

    def failure(self):
        self._failures += 1
        if self._failures >= self.threshold:
            self._open = True
            self._opened_at = time.monotonic()
            logger.warning("Circuit breaker OPENED for %s", self.name)

_breakers: dict[str, CircuitBreaker] = {
    "bbc": CircuitBreaker("bbc"),
    "ap": CircuitBreaker("ap"),
    "reuters": CircuitBreaker("reuters"),
}

NEWS_API_URLS: dict[str, str] = {
    "bbc": "https://newsapi.org/v2/top-headlines?sources=bbc-news",
    "ap": "https://newsapi.org/v2/top-headlines?sources=associated-press",
    "reuters": "https://newsapi.org/v2/top-headlines?sources=reuters",
}

# ---------------------------------------------------------------------------
# Lifespan - startup and shutdown
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
    global _db_pool, _http_client
    _db_pool = await asyncpg.create_pool(
        "postgresql://user:password@localhost/newsdb",
        min_size=5,
        max_size=20,
    )
    await _db_pool.execute("""
        CREATE TABLE IF NOT EXISTS news_cache (
            cache_key TEXT PRIMARY KEY,
            data JSONB NOT NULL,
            cached_at DOUBLE PRECISION NOT NULL
        )
    """)
    await _db_pool.execute("""
        CREATE TABLE IF NOT EXISTS fetch_stats (
            id BIGSERIAL PRIMARY KEY,
            sources TEXT NOT NULL,
            duration_ms INTEGER NOT NULL,
            fetched_at TIMESTAMPTZ DEFAULT NOW()
        )
    """)
    _http_client = httpx.AsyncClient(
        timeout=httpx.Timeout(connect=2.0, read=6.0, write=2.0, pool=None),
    )
    logger.info("Startup complete")
    yield
    await _http_client.aclose()
    await _db_pool.close()
    logger.info("Shutdown complete")

app = FastAPI(title="Async News Aggregator", lifespan=lifespan)

# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------
class Article(BaseModel):
    source: str
    title: str
    url: str
    published_at: str | None = None

class NewsResponse(BaseModel):
    sources_requested: list[str]
    sources_succeeded: list[str]
    sources_failed: list[str]
    articles: list[Article]
    cached: bool
    fetched_in_ms: int

# ---------------------------------------------------------------------------
# Core fetch logic
# ---------------------------------------------------------------------------
async def fetch_source(source: str) -> list[dict]:
    """Fetch one news source, respecting circuit breaker and semaphore."""
    breaker = _breakers.get(source)
    if breaker and breaker.is_open:
        raise RuntimeError(f"Circuit breaker OPEN for {source}")
    url = NEWS_API_URLS[source]
    async with _semaphore:
        try:
            resp = await asyncio.wait_for(
                _http_client.get(url),
                timeout=5.0,
            )
            resp.raise_for_status()
            data = resp.json()
            if breaker:
                breaker.success()
            return data.get("articles", [])
        except Exception:
            if breaker:
                breaker.failure()
            raise

async def get_cached_news(cache_key: str, ttl_seconds: int = 120) -> list[dict] | None:
    """Return cached articles if fresh, else None."""
    row = await _db_pool.fetchrow(
        "SELECT data, cached_at FROM news_cache WHERE cache_key = $1",
        cache_key,
    )
    if row and (time.time() - row["cached_at"]) < ttl_seconds:
        return json.loads(row["data"])  # asyncpg returns JSONB as str by default
    return None

async def set_cached_news(cache_key: str, articles: list[dict]) -> None:
    await _db_pool.execute(
        """
        INSERT INTO news_cache (cache_key, data, cached_at)
        VALUES ($1, $2, $3)
        ON CONFLICT (cache_key) DO UPDATE
        SET data = EXCLUDED.data, cached_at = EXCLUDED.cached_at
        """,
        cache_key, json.dumps(articles), time.time(),
    )

async def log_fetch_stats(sources: str, duration_ms: int) -> None:
    """Background task: write fetch metrics to DB."""
    await _db_pool.execute(
        "INSERT INTO fetch_stats (sources, duration_ms) VALUES ($1, $2)",
        sources, duration_ms,
    )

# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.get("/news", response_model=NewsResponse)
async def get_news(
    background_tasks: BackgroundTasks,
    sources: list[str] = Query(
        default=["bbc", "ap", "reuters"],
        description="News sources to aggregate",
    ),
) -> NewsResponse:
    valid_sources = [s for s in sources if s in NEWS_API_URLS]
    if not valid_sources:
        raise HTTPException(status_code=400, detail="No valid sources specified")
    cache_key = ",".join(sorted(valid_sources))
    start_ms = int(time.time() * 1000)

    # Check cache first
    cached = await get_cached_news(cache_key)
    if cached is not None:
        return NewsResponse(
            sources_requested=valid_sources,
            sources_succeeded=valid_sources,
            sources_failed=[],
            articles=[Article(**a) for a in cached],
            cached=True,
            fetched_in_ms=int(time.time() * 1000) - start_ms,
        )

    # Fetch all sources concurrently
    results = await asyncio.gather(
        *[fetch_source(s) for s in valid_sources],
        return_exceptions=True,
    )
    succeeded: list[str] = []
    failed: list[str] = []
    all_articles: list[dict] = []
    for source, result in zip(valid_sources, results):
        if isinstance(result, Exception):
            logger.warning("Source %s failed: %s", source, result)
            failed.append(source)
        else:
            succeeded.append(source)
            for article in result:
                all_articles.append({
                    "source": source,
                    "title": article.get("title") or "",  # upstream may send null
                    "url": article.get("url") or "",
                    "published_at": article.get("publishedAt"),
                })

    # Cache only complete results: a cache hit reports every source as succeeded
    if all_articles and not failed:
        await set_cached_news(cache_key, all_articles)

    duration_ms = int(time.time() * 1000) - start_ms
    background_tasks.add_task(log_fetch_stats, cache_key, duration_ms)
    return NewsResponse(
        sources_requested=valid_sources,
        sources_succeeded=succeeded,
        sources_failed=failed,
        articles=[Article(**a) for a in all_articles],
        cached=False,
        fetched_in_ms=duration_ms,
    )

@app.get("/health")
async def health_check() -> dict:
    """Health endpoint - shows DB connectivity and circuit breaker states."""
    try:
        await asyncio.wait_for(_db_pool.fetchval("SELECT 1"), timeout=2.0)
        db_status = "ok"
    except Exception as e:
        db_status = f"error: {e}"
    return {
        "db": db_status,
        "circuit_breakers": {
            name: "OPEN" if cb.is_open else "CLOSED"
            for name, cb in _breakers.items()
        },
    }
```
Graded Practice
Level 1 - Predict the Behavior
Given this route:
```python
@app.get("/articles")
async def list_articles(page: int = 1, size: int = 20):
    import time
    time.sleep(2)  # simulating "slow database query"
    return {"page": page, "size": size}
```
Under 50 concurrent requests, what is the approximate p50 response time? What about p99?
Answer:
Despite async def, this handler blocks the event loop for 2 seconds each time it runs. Because time.sleep() is synchronous, only one request can execute at a time, so all 50 concurrent requests are serialized through the single event loop thread.
- p50: approximately 50 seconds (the median request waits about 48 s for the ~24 requests ahead of it, then runs for its own 2 s)
- p99: approximately 100 seconds (the last requests wait about 49 × 2 s = 98 s, then run for their own 2 s)
The fix: replace time.sleep(2) with await asyncio.sleep(2) (or with a real async DB call). With asyncio.sleep, all 50 requests run concurrently and complete in approximately 2 seconds each - p50 and p99 are both ~2 seconds.
Level 2 - Debug This Code
A developer builds this news endpoint:
```python
_cache: dict = {}

@app.get("/news")
async def get_news(source: str):
    if source in _cache:
        return _cache[source]
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.newssite.com/{source}")
        _cache[source] = response.json()
    return _cache[source]
```
Identify three production problems with this code.
Answer
Problem 1: New httpx.AsyncClient per request.
Creating an AsyncClient inside a request handler builds a new connection pool on every request. Each request therefore pays for fresh TCP and TLS handshakes, and the client and its connections are thrown away as soon as the handler returns, wasting all that setup cost. Fix: create a single client in lifespan and reuse it across requests.
Problem 2: No TTL on the cache.
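Entries in _cache live forever, so news from 6 hours ago is served as if it were current. The cache also grows without bound: if source can be any string, malicious or misconfigured callers can exhaust memory. Fix: store (data, timestamp) tuples and check freshness on read, validate source against the sources you actually support, and cap the number of entries. functools.lru_cache is not a drop-in answer here. Applied to an async def, it caches the coroutine object, which cannot be awaited a second time. Use a small size-bounded structure (for example an OrderedDict evicted in least-recently-used order) or a library cache such as cachetools.TTLCache.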
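Here is a minimal sketch of the TTL-plus-size-bound fix. It assumes a single event loop (so no locking) and treats None as a cache miss. BoundedTTLCache is a hypothetical helper, not a library class. In the route, you would call get before fetching and set after a successful fetch.

```python
import time
from collections import OrderedDict
from typing import Any, Optional


class BoundedTTLCache:
    """Hypothetical size-bounded cache with per-entry expiry; None means a miss."""

    def __init__(self, maxsize: int, ttl: float, clock=time.monotonic) -> None:
        self.maxsize = maxsize
        self.ttl = ttl
        self._clock = clock  # injectable so expiry can be tested without waiting
        self._data: OrderedDict = OrderedDict()  # key -> (value, stored_at)

    def get(self, key: str) -> Optional[Any]:
        item = self._data.get(key)
        if item is None:
            return None
        value, stored_at = item
        if self._clock() - stored_at > self.ttl:
            del self._data[key]  # stale: drop it and report a miss
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key: str, value: Any) -> None:
        self._data[key] = (value, self._clock())
        self._data.move_to_end(key)
        while len(self._data) > self.maxsize:
            self._data.popitem(last=False)  # evict the least recently used entry

    def __len__(self) -> int:
        return len(self._data)
```

The clock parameter exists only to make the expiry testable. In the service, the time.monotonic default is the right choice because it is unaffected by wall-clock adjustments.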
Problem 3: No timeout on the external call.
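If api.newssite.com becomes slow or unresponsive, nothing bounds how long await client.get(...) takes as a whole. httpx applies 5-second timeouts by default, but they limit inactivity per network operation, not total request time. A struggling upstream can therefore still tie up each handler for seconds at a time. Under load, request handlers pile up waiting on this call, and the service becomes unresponsive. Fix: set an explicit deadline with await asyncio.wait_for(client.get(...), timeout=5.0) (or configure httpx.Timeout on the shared client) and handle asyncio.TimeoutError.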
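Here is a minimal sketch of the deadline fix that runs without network access. slow_upstream is a hypothetical stand-in for client.get(...). Returning None on timeout is only one possible policy; a real route might serve stale cached data or raise a 504 instead.

```python
import asyncio
from typing import Optional


async def slow_upstream(delay: float) -> dict:
    """Hypothetical stand-in for client.get(...) against a slow news API."""
    await asyncio.sleep(delay)
    return {"headlines": ["example headline"]}


async def fetch_with_deadline(delay: float, timeout: float) -> Optional[dict]:
    try:
        # wait_for cancels the inner coroutine once the deadline passes.
        return await asyncio.wait_for(slow_upstream(delay), timeout=timeout)
    except asyncio.TimeoutError:
        return None  # caller decides: stale cache, 504, empty result, ...


async def main() -> tuple:
    fast = await fetch_with_deadline(delay=0.01, timeout=0.5)
    slow = await fetch_with_deadline(delay=5.0, timeout=0.05)
    return fast, slow


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The slow call gives up after 0.05 s instead of waiting the full 5 s. That is what keeps a degraded upstream from stalling every handler.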
Level 3 - Design Challenge
You are building an async API that aggregates data from five external partners. Two of them (Partners A and B) are critical - their data must always be present in the response. Three (C, D, E) are optional - if they fail, return partial data. The combined SLA for your API is 500 ms p99.
Design the fetch strategy: which calls are concurrent, which have timeouts, and how do you differentiate critical vs optional failures? Sketch the code structure.
Answer
```python
import asyncio

import httpx
from fastapi import FastAPI, HTTPException

app = FastAPI()

CRITICAL_PARTNERS = {"A": "https://api.a.com/data", "B": "https://api.b.com/data"}
OPTIONAL_PARTNERS = {
    "C": "https://api.c.com/data",
    "D": "https://api.d.com/data",
    "E": "https://api.e.com/data",
}


async def fetch_one(client: httpx.AsyncClient, name: str, url: str, timeout: float) -> dict:
    """Fetch one partner; never raises, reports success or failure in the result."""
    try:
        resp = await asyncio.wait_for(client.get(url), timeout=timeout)
        resp.raise_for_status()
        return {"partner": name, "data": resp.json(), "ok": True}
    except Exception as exc:
        # repr() keeps the type visible; str(TimeoutError()) would be empty
        return {"partner": name, "error": repr(exc), "ok": False}


@app.get("/aggregate")
async def aggregate():
    # For brevity this creates a client per request; in production, reuse one
    # client created in lifespan (see Level 2, Problem 1).
    async with httpx.AsyncClient() as client:
        # Critical partners: 250 ms timeout. The optional step below adds at most
        # 150 ms, so the worst case is ~400 ms, leaving ~100 ms of the 500 ms SLA.
        critical_results = await asyncio.gather(
            *[fetch_one(client, n, u, timeout=0.25) for n, u in CRITICAL_PARTNERS.items()]
        )
        # Check critical failures before spending time on optional calls
        failed_critical = [r for r in critical_results if not r["ok"]]
        if failed_critical:
            names = [r["partner"] for r in failed_critical]
            raise HTTPException(
                status_code=503,
                detail=f"Critical partners unavailable: {names}",
            )
        # Optional partners: tighter timeout, failures are tolerated
        optional_results = await asyncio.gather(
            *[fetch_one(client, n, u, timeout=0.15) for n, u in OPTIONAL_PARTNERS.items()],
            return_exceptions=True,
        )
    return {
        "critical": {r["partner"]: r["data"] for r in critical_results},
        "optional": {
            r["partner"]: r.get("data") if r["ok"] else None
            for r in optional_results
            if isinstance(r, dict)
        },
    }
```
The key design decisions:
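- Critical calls run first (or concurrently with the optional calls if the SLA cannot absorb two sequential steps). If either critical partner fails, the route immediately raises 503; there is no point calling optional partners.
- Optional calls run with return_exceptions=True, so any exception is captured as a value instead of propagating. This is a safety net: fetch_one already converts ordinary errors into ok=False results.
- Timeouts are set conservatively below the SLA boundary. Because the critical and optional steps run one after the other, their timeouts together must fit inside the SLA with room left for processing.
- The optional batch runs concurrently. Even if C, D, and E all take their full timeout, the optional step costs one timeout, not three.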
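The fully concurrent variant mentioned above can be sketched without network access. In this hypothetical simulation, each partner is an asyncio.sleep with a made-up latency. All five calls start together under one timeout, and the critical check runs once everything has settled. Partner latencies, the 0.3 s timeout, and raising RuntimeError in place of a 503 are illustrative assumptions.

```python
import asyncio
import time
from typing import Optional

# Hypothetical partner latencies in seconds; "E" is deliberately too slow.
LATENCIES = {"A": 0.05, "B": 0.08, "C": 0.02, "D": 0.06, "E": 2.0}
CRITICAL = {"A", "B"}


async def call_partner(name: str) -> dict:
    await asyncio.sleep(LATENCIES[name])  # stands in for an HTTP call
    return {"partner": name, "value": name.lower()}


async def fetch_one(name: str, timeout: float) -> Optional[dict]:
    try:
        return await asyncio.wait_for(call_partner(name), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def aggregate_all(timeout: float = 0.3) -> dict:
    names = sorted(LATENCIES)
    # All five calls start at once, so the whole fan-out costs about one timeout.
    results = await asyncio.gather(*(fetch_one(n, timeout) for n in names))
    by_name = dict(zip(names, results))
    missing = sorted(n for n in CRITICAL if by_name[n] is None)
    if missing:
        raise RuntimeError(f"critical partners unavailable: {missing}")  # 503 in the API
    return {
        "critical": {n: by_name[n] for n in sorted(CRITICAL)},
        "optional": {n: by_name[n] for n in names if n not in CRITICAL},
    }


if __name__ == "__main__":
    start = time.perf_counter()
    result = asyncio.run(aggregate_all())
    print(result["optional"])
    print(f"took {time.perf_counter() - start:.2f} s")  # about the 0.3 s timeout
```

This version trades the sequential version's early exit for latency. Optional calls are made even when a critical partner fails, but the worst case is one timeout instead of two.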
Key Takeaways
- async def is a contract, not magic. Declaring a handler async def only means it can use await. Any blocking call inside an async handler blocks the event loop for every concurrent request. Profile your handlers and eliminate every time.sleep, requests.get, and synchronous database call.
- Database drivers must match your concurrency model. psycopg2 and sqlite3 block. In an async FastAPI service, use asyncpg, databases, or SQLAlchemy async (create_async_engine + AsyncSession). The driver choice is the most impactful async decision you make.
- asyncio.Semaphore is your rate limiter. Use it to bound concurrent outbound calls, whether to external APIs or internal services. Strictly, a semaphore caps how many calls are in flight rather than how many are made per second. Still, a correctly sized one keeps you from overwhelming upstream dependencies and helps avoid rate-limit errors.
- Create shared resources in lifespan and reuse them across requests. asyncpg.create_pool() and httpx.AsyncClient() are expensive to construct. One pool or client shared across thousands of requests is the correct pattern.
- asyncio.gather(..., return_exceptions=True) is the correct way to run concurrent fallible operations. It collects exceptions as values rather than propagating the first failure. This gives you fine-grained control over which failures are critical and which are acceptable.
- asyncio.wait_for() is mandatory for all external calls. Without timeouts, a slow upstream holds your coroutine suspended indefinitely, consuming a semaphore slot and contributing to cascading failure.
- Circuit breakers stop the cascade. After N consecutive failures, open the circuit and return a fast fallback instead of attempting the call. A half-open state allows periodic recovery probes.
- BackgroundTasks for fast async fire-and-forget; Celery or ARQ for everything else. BackgroundTasks run in the same process after the response is sent, with no persistence or retries. Logging a metric, sending a lightweight webhook, or incrementing a counter fits that model. Processing a video, generating a report, or sending 10,000 emails needs a proper task queue.
- run_in_executor() bridges sync and async safely. CPU-bound work belongs in a ProcessPoolExecutor, not in the event loop thread. The executor call itself is awaitable, so the event loop continues serving other requests while the process pool works.
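The circuit-breaker state machine described in the takeaways above can be sketched in a few lines. The three states are closed, open after N consecutive failures, and half-open once a cooldown has passed. This is a hypothetical minimal version, not necessarily the class behind _breakers in the service. The names failure_threshold, reset_after, and allow_request are assumptions, and the clock is injectable for testing. For simplicity it admits every request while half-open; a stricter version would admit exactly one probe.

```python
import time


class CircuitBreaker:
    """Hypothetical breaker: CLOSED -> OPEN after N failures -> HALF_OPEN after a cooldown."""

    def __init__(self, failure_threshold: int = 3, reset_after: float = 30.0,
                 clock=time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "CLOSED"
        if self._clock() - self._opened_at >= self.reset_after:
            return "HALF_OPEN"  # cooldown elapsed: let a recovery probe through
        return "OPEN"

    @property
    def is_open(self) -> bool:
        return self.state == "OPEN"

    def allow_request(self) -> bool:
        """True if the caller should attempt the real call; False means use the fallback."""
        return self.state != "OPEN"

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None  # a successful call (or probe) closes the circuit

    def record_failure(self) -> None:
        self._failures += 1
        if self.state == "HALF_OPEN" or self._failures >= self.failure_threshold:
            self._opened_at = self._clock()  # (re)open and restart the cooldown
```

In a route, check allow_request() before the outbound call and return the fallback when it is False. Afterwards, call record_success() or record_failure() depending on the outcome.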
What's Next
You have now built the foundational async engineering skills. The Module 08 projects apply everything together at production scale:
- Project 01 - Concurrent Web Scraper: apply ThreadPoolExecutor and asyncio with semaphores, retry, and structured output to scrape URLs in parallel
- Project 02 - Async Data Aggregation API: build the full FastAPI async aggregation service with caching, rate limiting, circuit breaking, and background refresh
