Project 02 - Async Data Aggregation API
Objective
Build an async FastAPI service that calls three or more external APIs concurrently, aggregates their responses, and returns a combined result - with a TTL cache, background refresh, circuit breakers for failing upstreams, and a health check endpoint. The service must remain responsive even when one or more upstreams are slow or failing.
This project is the capstone of the concurrency module. It requires you to combine every async pattern you have studied (asyncio.gather, Semaphore, wait_for, lifespan events, BackgroundTasks, and circuit breakers) into a coherent, production-quality service.
What to Build
Service Overview
Your API aggregates data from three upstream sources (you choose what they represent - weather, news, financial data, sports scores, etc.). For each request to your /aggregate endpoint, the service:
- Checks an in-memory TTL cache - if fresh data exists, returns it immediately
- If stale or missing, calls all three upstreams concurrently via asyncio.gather
- Applies a Semaphore to cap concurrent outbound connections
- Applies asyncio.wait_for to enforce per-upstream timeouts
- Checks circuit breakers - if an upstream has failed N times recently, skips it and returns a fallback
- Stores the result in cache and schedules a background refresh task
- Returns the aggregated result - partial data from working upstreams is acceptable
API Endpoints
| Method | Path | Description |
|---|---|---|
| GET | /aggregate | Return aggregated data from all upstreams |
| GET | /aggregate/{source} | Return data from a single named upstream |
| POST | /cache/invalidate | Clear the cache for all keys or for a specific key |
| GET | /health | Show per-upstream status, circuit breaker states, cache stats, and DB connectivity |
Technical Requirements
Requirement 1 - All Routes Must Be Async
Every route handler must be declared async def. Any utility function that performs I/O must be a coroutine awaited with await. No synchronous blocking calls (time.sleep, requests.get, synchronous DB drivers) anywhere in the hot path.
# CORRECT
@app.get("/aggregate")
async def aggregate() -> AggregateResponse:
    data = await fetch_all_sources()
    return AggregateResponse(**data, cached=False, cache_age_seconds=None)

# WRONG - blocks the event loop
@app.get("/aggregate")
async def aggregate() -> AggregateResponse:
    import requests
    data = requests.get("https://upstream.example.com/api").json()  # BLOCKS
    return AggregateResponse(**data, cached=False, cache_age_seconds=None)
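A cheap way to check the "no blocking calls" rule, and the acceptance criterion that the event loop stays responsive, is to measure event-loop lag while a workload runs. The sketch below is one possible approach and is not part of the required API. max_loop_lag, non_blocking, and blocking are hypothetical names. time.sleep and asyncio.sleep stand in for requests.get and an awaited httpx call.

```python
import asyncio
import time


async def max_loop_lag(workload, interval: float = 0.01) -> float:
    """Run `workload` while a ticker records the largest gap between its wake-ups.

    The ticker asks to sleep `interval` seconds, so it should wake roughly on time.
    A much larger gap means something blocked the event loop.
    """
    gaps: list[float] = []
    done = asyncio.Event()

    async def ticker() -> None:
        last = time.monotonic()
        while not done.is_set():
            await asyncio.sleep(interval)
            now = time.monotonic()
            gaps.append(now - last)
            last = now

    tick_task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # let the ticker start before the workload runs
    try:
        await workload()
    finally:
        done.set()
        await tick_task
    return max(gaps) - interval if gaps else 0.0


async def non_blocking() -> None:
    await asyncio.sleep(0.2)  # yields to the loop, like an awaited httpx call


async def blocking() -> None:
    time.sleep(0.2)  # WRONG: freezes the loop, like requests.get()


async def main() -> None:
    print(f"await asyncio.sleep: max lag {await max_loop_lag(non_blocking):.3f}s")
    print(f"time.sleep:          max lag {await max_loop_lag(blocking):.3f}s")


if __name__ == "__main__":
    asyncio.run(main())
```

A lag close to the blocking duration means every other request on the same loop was frozen for that long.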
Requirement 2 - Concurrent Upstream Calls with asyncio.gather
Call all upstream APIs concurrently. Use asyncio.gather(..., return_exceptions=True) so a failure in one upstream does not prevent the others from completing:
results = await asyncio.gather(
    fetch_source_a(client),
    fetch_source_b(client),
    fetch_source_c(client),
    return_exceptions=True,
)
Process the results list - separate successes from exceptions, log failures, and include only successful data in the response.
Requirement 3 - Semaphore for Rate Limiting
A module-level asyncio.Semaphore must cap the number of concurrent outbound HTTP connections. The limit must be configurable (e.g., SEMAPHORE_LIMIT=10 from environment or config). Every fetch_source_* call must acquire the semaphore before making the HTTP request:
_semaphore = asyncio.Semaphore(SEMAPHORE_LIMIT)  # from config, not hardcoded

async def fetch_source_a(client: httpx.AsyncClient) -> dict:
    async with _semaphore:
        response = await asyncio.wait_for(
            client.get(SOURCE_A_URL),
            timeout=SOURCE_A_TIMEOUT,  # per-upstream timeout from config
        )
        response.raise_for_status()
        return response.json()
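To see the cap in action, the sketch below launches more fake requests than the semaphore allows and records the peak number in flight. run_with_cap is a hypothetical helper, and asyncio.sleep stands in for the HTTP call. In the service the limit would come from SEMAPHORE_LIMIT.

```python
import asyncio


async def run_with_cap(n_calls: int, limit: int) -> int:
    """Launch n_calls fake upstream requests behind a Semaphore(limit).

    Returns the highest number of requests that were ever in flight at once.
    """
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_request() -> None:
        nonlocal in_flight, peak
        async with sem:  # waits here once `limit` requests are already running
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for the HTTP round trip
            in_flight -= 1

    await asyncio.gather(*(fake_request() for _ in range(n_calls)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_with_cap(n_calls=20, limit=3)))  # 3
```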
Requirement 4 - Per-Upstream Timeouts
Every upstream call must have a timeout enforced with asyncio.wait_for. The timeout must be configurable per upstream (some may be consistently slower). An asyncio.TimeoutError must be caught and converted to a structured error, not allowed to propagate as an unhandled exception:
try:
    data = await asyncio.wait_for(fetch_source_a(client), timeout=SOURCE_A_TIMEOUT)
except asyncio.TimeoutError:
    logger.warning("Source A timed out after %ss", SOURCE_A_TIMEOUT)
    data = None  # or a cached fallback value
Requirement 5 - In-Memory TTL Cache
Implement a simple TTL cache that stores the last aggregated result per cache key. A request within the TTL receives the cached response without calling any upstream:
# cache.py
import time
from dataclasses import dataclass, field
from typing import Any

@dataclass
class CacheEntry:
    data: Any
    cached_at: float = field(default_factory=time.monotonic)

    def is_fresh(self, ttl_seconds: float) -> bool:
        return (time.monotonic() - self.cached_at) < ttl_seconds

class TTLCache:
    def __init__(self, ttl_seconds: float = 60.0):
        self.ttl = ttl_seconds
        self._store: dict[str, CacheEntry] = {}

    def get(self, key: str) -> Any | None:
        entry = self._store.get(key)
        if entry and entry.is_fresh(self.ttl):
            return entry.data
        return None  # expired or absent

    def set(self, key: str, data: Any) -> None:
        self._store[key] = CacheEntry(data=data)

    def invalidate(self, key: str | None = None) -> None:
        if key:
            self._store.pop(key, None)
        else:
            self._store.clear()

    def stats(self) -> dict:
        total = len(self._store)
        fresh = sum(1 for e in self._store.values() if e.is_fresh(self.ttl))
        return {"total_entries": total, "fresh_entries": fresh, "stale_entries": total - fresh}
The default TTL is 60 seconds but must be configurable.
Requirement 6 - Background Cache Refresh
When a cache hit is served but the entry is older than half the TTL, schedule a background task to refresh it proactively. The client receives the fast cached response; the refresh happens after the response is sent:
@app.get("/aggregate")
async def aggregate(background_tasks: BackgroundTasks) -> AggregateResponse:
    cache_key = "aggregate_all"
    cached = _cache.get(cache_key)
    if cached is not None:
        age = time.monotonic() - _cache._store[cache_key].cached_at
        if age > (_cache.ttl / 2):
            # Cache is aging - refresh proactively in background
            background_tasks.add_task(refresh_cache, cache_key)
        return AggregateResponse(**cached, cached=True, cache_age_seconds=age)
    # Cache miss - fetch now
    data = await fetch_all_sources()
    _cache.set(cache_key, data)
    return AggregateResponse(**data, cached=False, cache_age_seconds=None)

async def refresh_cache(cache_key: str) -> None:
    """Runs after the response is sent. Refreshes the cache entry."""
    try:
        data = await fetch_all_sources()
        _cache.set(cache_key, data)
        logger.info("Background cache refresh complete for key: %s", cache_key)
    except Exception as exc:
        logger.error("Background cache refresh failed: %s", exc)
Requirement 7 - Circuit Breaker per Upstream
Each upstream must have its own circuit breaker. After FAILURE_THRESHOLD consecutive failures (configurable, default 5), the circuit opens. While open, calls to that upstream are skipped immediately and a fallback is returned. After RECOVERY_TIMEOUT seconds (default 30), the circuit enters half-open state and allows one probe call:
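import logging
import time
from enum import Enum
from typing import Any, Coroutine

logger = logging.getLogger(__name__)

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitOpenError(RuntimeError):
    """Raised instead of calling an upstream whose circuit is open."""

class CircuitBreaker:
    def __init__(self, name: str, threshold: int = 5, recovery_timeout: float = 30.0):
        self.name = name
        self.threshold = threshold
        self.recovery_timeout = recovery_timeout
        self._failures = 0
        self._state = CircuitState.CLOSED
        self._opened_at: float | None = None
        self._probe_in_flight = False

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._opened_at >= self.recovery_timeout:
                return CircuitState.HALF_OPEN
        return self._state

    def record_success(self) -> None:
        self._failures = 0
        self._state = CircuitState.CLOSED

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self.threshold:
            if self._state != CircuitState.OPEN:
                logger.warning("Circuit OPENED for upstream: %s", self.name)
            # Opening, or re-opening after a failed half-open probe, restarts the recovery timer
            self._state = CircuitState.OPEN
            self._opened_at = time.monotonic()

    async def call(self, coro: Coroutine[Any, Any, Any]) -> Any:
        state = self.state
        # Skip while OPEN; while HALF_OPEN, let only one probe through at a time
        if state == CircuitState.OPEN or (
            state == CircuitState.HALF_OPEN and self._probe_in_flight
        ):
            coro.close()  # discard the un-awaited coroutine (avoids a "never awaited" warning)
            raise CircuitOpenError(f"Circuit OPEN for {self.name} - skipping call")
        is_probe = state == CircuitState.HALF_OPEN
        if is_probe:
            self._probe_in_flight = True
        try:
            result = await coro
        except Exception:
            self.record_failure()
            raise
        finally:
            if is_probe:
                self._probe_in_flight = False
        self.record_success()
        return result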
Requirement 8 - Health Check Endpoint
GET /health must return:
{
  "status": "degraded",
  "db": "ok",
  "cache": {
    "total_entries": 3,
    "fresh_entries": 2,
    "stale_entries": 1
  },
  "upstreams": {
    "source_a": {
      "circuit_state": "closed",
      "consecutive_failures": 0,
      "last_success_ago_seconds": 42.1
    },
    "source_b": {
      "circuit_state": "open",
      "consecutive_failures": 6,
      "opened_ago_seconds": 12.4,
      "recovers_in_seconds": 17.6
    },
    "source_c": {
      "circuit_state": "closed",
      "consecutive_failures": 0,
      "last_success_ago_seconds": 42.1
    }
  }
}
The top-level "status" is "error" if the DB is unreachable. If the DB is reachable, the status is "degraded" when at least one upstream circuit is open (as for source_b above) and "ok" when every circuit is closed.
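The status rule can live in a small pure function that is easy to unit test. This sketch follows the reading used in the acceptance criteria: "error" whenever the DB is unreachable, otherwise "degraded" if any circuit is not closed, otherwise "ok". Treating half-open as degraded is an extra assumption. overall_status is a hypothetical name.

```python
def overall_status(db_ok: bool, circuit_states: dict[str, str]) -> str:
    """Combine DB reachability and per-upstream circuit states into one status."""
    if not db_ok:
        return "error"  # DB failure outranks everything else
    if any(state != "closed" for state in circuit_states.values()):
        return "degraded"  # at least one upstream is open or being probed
    return "ok"


if __name__ == "__main__":
    print(overall_status(True, {"source_a": "closed", "source_b": "open"}))  # degraded
```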
Requirement 9 - Lifespan Resource Management
All shared resources - the async HTTP client and database connection pool - must be created in a lifespan context manager and destroyed on shutdown:
from contextlib import asynccontextmanager

import asyncpg
import httpx
from fastapi import FastAPI

# settings is the object defined in config.py (see Configuration below)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    app.state.http_client = httpx.AsyncClient(
        timeout=httpx.Timeout(connect=2.0, read=10.0, write=2.0, pool=None),
        limits=httpx.Limits(max_connections=50),
    )
    app.state.db_pool = await asyncpg.create_pool(
        dsn=settings.DATABASE_URL,
        min_size=3,
        max_size=15,
    )
    try:
        yield
    finally:
        # Shutdown - runs even if the app exits with an error
        await app.state.http_client.aclose()
        await app.state.db_pool.close()

app = FastAPI(lifespan=lifespan)
Aggregation Flow
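The examples in this document call fetch_all_sources() but never define it. The sketch below shows one way the steps from the Service Overview could fit together. It skips upstreams whose circuit is open, fans out with asyncio.gather(..., return_exceptions=True), holds a semaphore slot for each call, and applies a per-upstream timeout with asyncio.wait_for. To stay self-contained, it makes three simplifying assumptions:

- Open circuits are passed in as a set of names instead of CircuitBreaker objects.
- Each fetcher is a zero-argument coroutine function.
- The signature of fetch_all_sources shown here is illustrative, not prescribed.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable

# Hypothetical shape: in the service these would be fetch_source_a/b/c
# with the shared httpx client bound in (e.g. via functools.partial).
Fetcher = Callable[[], Awaitable[Any]]


async def _call_upstream(
    fetch: Fetcher, timeout: float, sem: asyncio.Semaphore
) -> tuple[Any, int]:
    """One upstream call: wait for a semaphore slot, then enforce the timeout."""
    async with sem:
        start = time.monotonic()
        data = await asyncio.wait_for(fetch(), timeout=timeout)
        return data, int((time.monotonic() - start) * 1000)


async def fetch_all_sources(
    fetchers: dict[str, Fetcher],
    timeouts: dict[str, float],
    open_circuits: set[str],
    sem: asyncio.Semaphore,
) -> dict[str, Any]:
    """Fan out to every upstream whose circuit is not open and merge the outcomes."""
    skipped = [name for name in fetchers if name in open_circuits]
    active = [name for name in fetchers if name not in open_circuits]
    outcomes = await asyncio.gather(
        *(_call_upstream(fetchers[name], timeouts[name], sem) for name in active),
        return_exceptions=True,  # one failure must not cancel the others
    )
    results: list[dict[str, Any]] = []
    succeeded: list[str] = []
    failed: list[str] = []
    for name, outcome in zip(active, outcomes):
        if isinstance(outcome, BaseException):
            failed.append(name)
            results.append({"source": name, "data": None,
                            "error": type(outcome).__name__, "duration_ms": 0})
        else:
            data, duration_ms = outcome
            succeeded.append(name)
            results.append({"source": name, "data": data,
                            "error": None, "duration_ms": duration_ms})
    # `cached` and `cache_age_seconds` are filled in by the route
    return {"results": results, "sources_succeeded": succeeded,
            "sources_failed": failed, "sources_skipped": skipped}


async def demo() -> dict[str, Any]:
    async def ok() -> dict:
        return {"value": 1}

    async def fails() -> dict:
        raise ConnectionError("HTTP 500")

    async def hangs() -> dict:
        await asyncio.sleep(10)
        return {}

    fetchers = {"source_a": ok, "source_b": fails, "source_c": hangs, "source_d": ok}
    timeouts = {name: 0.1 for name in fetchers}
    return await fetch_all_sources(fetchers, timeouts, {"source_d"}, asyncio.Semaphore(10))


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The returned dict has the keys the route unpacks into AggregateResponse(**data, ...). In the real service, each outcome would also be reported to that upstream's circuit breaker.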
Data Models
# models.py
from pydantic import BaseModel
from typing import Any

class UpstreamResult(BaseModel):
    source: str
    data: Any | None
    error: str | None = None
    duration_ms: int

class AggregateResponse(BaseModel):
    results: list[UpstreamResult]
    sources_succeeded: list[str]
    sources_failed: list[str]
    sources_skipped: list[str]  # circuit breaker open
    cached: bool
    cache_age_seconds: float | None = None  # None on a cache miss

class HealthResponse(BaseModel):
    status: str  # "ok" | "degraded" | "error"
    db: str
    cache: dict
    upstreams: dict[str, dict]

class CacheInvalidateRequest(BaseModel):
    key: str | None = None  # None means invalidate all
Configuration
Use environment variables or a config.py module. Do not hardcode URLs, timeouts, or limits:
# config.py
import os

class Settings:
    # Upstream URLs
    SOURCE_A_URL: str = os.getenv("SOURCE_A_URL", "https://api.source-a.example.com/data")
    SOURCE_B_URL: str = os.getenv("SOURCE_B_URL", "https://api.source-b.example.com/data")
    SOURCE_C_URL: str = os.getenv("SOURCE_C_URL", "https://api.source-c.example.com/data")
    # Timeouts (seconds)
    SOURCE_A_TIMEOUT: float = float(os.getenv("SOURCE_A_TIMEOUT", "5.0"))
    SOURCE_B_TIMEOUT: float = float(os.getenv("SOURCE_B_TIMEOUT", "5.0"))
    SOURCE_C_TIMEOUT: float = float(os.getenv("SOURCE_C_TIMEOUT", "5.0"))
    # Concurrency
    SEMAPHORE_LIMIT: int = int(os.getenv("SEMAPHORE_LIMIT", "10"))
    # Cache
    CACHE_TTL_SECONDS: float = float(os.getenv("CACHE_TTL_SECONDS", "60.0"))
    # Circuit breaker
    CIRCUIT_FAILURE_THRESHOLD: int = int(os.getenv("CIRCUIT_FAILURE_THRESHOLD", "5"))
    CIRCUIT_RECOVERY_TIMEOUT: float = float(os.getenv("CIRCUIT_RECOVERY_TIMEOUT", "30.0"))
    # Database
    DATABASE_URL: str = os.getenv("DATABASE_URL", "postgresql://user:pass@localhost/aggdb")

settings = Settings()
Acceptance Criteria
Your implementation passes when all of the following are true:
- GET /aggregate returns data from all working upstreams in a single response, with all three HTTP calls made concurrently (verify by checking that total response time is close to max(individual_upstream_latencies), not their sum).
- GET /aggregate called twice within the TTL returns cached: true on the second call without making any outbound HTTP requests. (Verify by mocking the HTTP client and asserting it was not called.)
- When one upstream is configured to always fail (return 500 or raise an exception), GET /aggregate still returns a 200 response with data from the other two upstreams.
- When one upstream is configured to always time out, the route returns within (timeout + 0.5) seconds rather than hanging indefinitely. (Verify with asyncio.wait_for on the test request itself.)
- After CIRCUIT_FAILURE_THRESHOLD consecutive failures for one upstream, the circuit opens and subsequent calls to /aggregate do not attempt to contact that upstream (verify by asserting the HTTP client is not called for that URL while the circuit is open).
- After CIRCUIT_RECOVERY_TIMEOUT seconds, the circuit enters half-open state and the next request probes the upstream. If the probe succeeds, the circuit closes.
- GET /health returns 200 with the correct circuit_state for each upstream.
- GET /health returns "status": "degraded" when at least one circuit is open but the DB is reachable.
- POST /cache/invalidate with no body clears all cache entries. A subsequent GET /aggregate call fetches fresh data.
- POST /cache/invalidate with {"key": "aggregate_all"} clears only that key.
- All route handlers are async def - no def handlers except for Pydantic validators.
- No blocking calls (time.sleep, requests.get, synchronous DB calls) anywhere in the async call path. (Verify with a slow upstream - the event loop should remain responsive to other requests.)
- The service starts up and shuts down cleanly - lifespan creates and closes the DB pool and HTTP client without errors.
- All outbound calls use the shared httpx.AsyncClient from app.state, not per-request client instances.
Hints
How do I test that upstream calls are truly concurrent and not sequential?
Mock all three upstreams to each sleep for 1 second before responding. If they are called sequentially, the total time is ~3 seconds. If concurrent, ~1 second.
import asyncio
import time
import pytest
from httpx import AsyncClient, ASGITransport
from unittest.mock import patch
from aggregator.cache import _cache
from aggregator.main import app

@pytest.mark.asyncio
async def test_upstream_calls_are_concurrent():
    _cache.invalidate()  # make sure /aggregate is a cache miss
    call_times = []

    def slow_fetch(source: str):
        # patch() replaces an async function with an AsyncMock, which awaits an
        # async side_effect and passes it the fetcher's arguments (e.g. the client)
        async def fetch(*args, **kwargs):
            call_times.append(time.monotonic())
            await asyncio.sleep(1.0)
            return {"source": source, "data": {}}
        return fetch

    with patch("aggregator.fetcher.fetch_source_a", side_effect=slow_fetch("a")), \
         patch("aggregator.fetcher.fetch_source_b", side_effect=slow_fetch("b")), \
         patch("aggregator.fetcher.fetch_source_c", side_effect=slow_fetch("c")):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            start = time.monotonic()
            response = await client.get("/aggregate")
            elapsed = time.monotonic() - start
    assert response.status_code == 200
    assert len(call_times) == 3
    # If concurrent, all three start within ~100ms of each other
    assert max(call_times) - min(call_times) < 0.1
    # Total time is ~1s, not ~3s
    assert elapsed < 1.5
How do I verify that the circuit breaker prevents calls to a failing upstream?
@pytest.mark.asyncio
async def test_circuit_breaker_stops_calls_after_threshold():
    # Assumes CircuitState, settings, _cache, and app are imported at module level
    from aggregator.fetcher import _breakers
    from unittest.mock import patch
    # Reset the circuit breaker
    _breakers["source_a"]._failures = 0
    _breakers["source_a"]._state = CircuitState.CLOSED
    call_count = 0

    async def always_fail(*args, **kwargs):  # *args absorbs the client argument
        nonlocal call_count
        call_count += 1
        raise RuntimeError("upstream down")

    with patch("aggregator.fetcher.fetch_source_a", side_effect=always_fail):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            # Trigger failures up to threshold; clear the cache each time so the
            # partial result from the previous call is not served instead
            for _ in range(settings.CIRCUIT_FAILURE_THRESHOLD):
                _cache.invalidate()
                await client.get("/aggregate")
            calls_before_open = call_count
            assert _breakers["source_a"].state == CircuitState.OPEN
            # Now the circuit is open - subsequent calls should NOT invoke fetch_source_a
            _cache.invalidate()
            await client.get("/aggregate")
            _cache.invalidate()
            await client.get("/aggregate")
            assert call_count == calls_before_open  # no new calls after circuit opened
How do I test the background cache refresh without waiting for it?
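Starlette runs BackgroundTasks after the response has been sent, but inside the same ASGI call. Both TestClient and httpx.AsyncClient with ASGITransport wait for that call to finish, so by the time the request returns, the background task has already run and you can assert on its side effects directly: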
@pytest.mark.asyncio
async def test_background_refresh_scheduled_when_cache_aging():
    from aggregator.cache import CacheEntry, _cache
    import time
    # Prime the cache with an entry that is 55s old: still fresh with the default
    # 60s TTL, but beyond TTL/2 (30s). Include every field AggregateResponse requires.
    _cache._store["aggregate_all"] = CacheEntry(
        data={"results": [], "sources_succeeded": [], "sources_failed": [], "sources_skipped": []},
        cached_at=time.monotonic() - 55,
    )
    refresh_called = False

    async def mock_refresh(key: str):
        nonlocal refresh_called
        refresh_called = True

    with patch("aggregator.main.refresh_cache", side_effect=mock_refresh):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            response = await client.get("/aggregate")
    assert response.json()["cached"] is True
    assert refresh_called is True
How do I simulate a slow upstream without actually waiting in tests?
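Use asyncio.sleep in your mock so the upstream never answers, and bound the test request itself with asyncio.wait_for. The test still waits for the configured upstream timeout, so set a small SOURCE_B_TIMEOUT in the test environment (before config.py is imported) to keep it fast: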
@pytest.mark.asyncio
async def test_timeout_does_not_hang_the_route():
    _cache.invalidate()  # make sure /aggregate is a cache miss

    async def hanging_fetch(*args, **kwargs):  # *args absorbs the client argument
        await asyncio.sleep(100)  # simulates an unresponsive upstream

    # Relies on the asyncio.wait_for living in the caller (Requirement 4),
    # because the patch replaces fetch_source_b itself
    with patch("aggregator.fetcher.fetch_source_b", side_effect=hanging_fetch):
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            # Route should complete within timeout + 1s grace, not hang indefinitely
            response = await asyncio.wait_for(
                client.get("/aggregate"),
                timeout=settings.SOURCE_B_TIMEOUT + 1.0,
            )
    assert response.status_code == 200
    # Source B should appear in sources_failed
    assert "source_b" in response.json()["sources_failed"]
How do I store aggregated results in the database for history and auditing?
Add an AggregateLog table and write to it in a background task (so it does not affect response latency):
import asyncpg

async def log_aggregate_result(
    db_pool: asyncpg.Pool,
    sources_succeeded: list[str],
    sources_failed: list[str],
    duration_ms: int,
    cached: bool,
) -> None:
    """Background task: persist aggregation metadata for monitoring."""
    await db_pool.execute(
        """
        INSERT INTO aggregate_log
            (sources_succeeded, sources_failed, duration_ms, cached, logged_at)
        VALUES ($1, $2, $3, $4, NOW())
        """,
        sources_succeeded,
        sources_failed,
        duration_ms,
        cached,
    )

# In your route (duration_ms is measured around the fetch_all_sources() call;
# AggregateResponse itself has no timing field):
background_tasks.add_task(
    log_aggregate_result,
    app.state.db_pool,
    response.sources_succeeded,
    response.sources_failed,
    duration_ms,
    response.cached,
)
This keeps your route fast (DB write happens after response) while maintaining a complete audit trail for debugging and SLA monitoring.
How do I configure and run the service with uvicorn?
# run.py
import uvicorn
from aggregator.main import app

if __name__ == "__main__":
    uvicorn.run(
        "aggregator.main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,  # hot-reload during development
        workers=1,  # single worker - async handles concurrency
        log_level="info",
    )
In production, run with multiple workers only if your application state (cache, circuit breakers) is stored externally (Redis, PostgreSQL). In-memory state is not shared between worker processes. For this project, a single Uvicorn worker is correct.
Extension Challenges
These are optional. Attempt them only after all acceptance criteria pass.
Extension A - Redis Cache
Replace the in-memory TTLCache with a Redis-backed cache using redis.asyncio:
import json

import redis.asyncio as redis

class RedisCache:
    def __init__(self, client: redis.Redis, ttl_seconds: int = 60):
        self._client = client
        self.ttl = int(ttl_seconds)  # SETEX needs whole seconds; CACHE_TTL_SECONDS is a float

    async def get(self, key: str) -> dict | None:
        raw = await self._client.get(key)
        return json.loads(raw) if raw else None

    async def set(self, key: str, data: dict) -> None:
        await self._client.setex(key, self.ttl, json.dumps(data))

    async def invalidate(self, key: str | None = None) -> None:
        if key:
            await self._client.delete(key)
        else:
            # FLUSHDB wipes the whole Redis database, not just this service's keys
            await self._client.flushdb()
The Redis cache survives service restarts and is shared across multiple Uvicorn workers, enabling horizontal scaling. Make the Redis URL configurable through a REDIS_URL environment variable (default redis://localhost:6379) and create the Redis client in lifespan.
Extension B - Webhook Notifications on Upstream Failures
When a circuit breaker opens (an upstream transitions from CLOSED to OPEN), fire a webhook to a configurable URL with details about the failure:
import asyncio
import logging
import time

import httpx

logger = logging.getLogger(__name__)

async def notify_circuit_opened(
    http_client: httpx.AsyncClient,
    upstream_name: str,
    failure_count: int,
    webhook_url: str,
) -> None:
    payload = {
        "event": "circuit_opened",
        "upstream": upstream_name,
        "failure_count": failure_count,
        "opened_at": time.time(),  # wall-clock time for the receiver
    }
    try:
        await asyncio.wait_for(
            http_client.post(webhook_url, json=payload),
            timeout=3.0,
        )
    except Exception as exc:
        logger.error("Webhook notification failed: %s", exc)
Wire this into CircuitBreaker.record_failure when the state transitions to OPEN. Use FastAPI BackgroundTasks to send the webhook without blocking the request path.
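CircuitBreaker.record_failure is synchronous and has no access to the request's BackgroundTasks, so it cannot schedule the webhook itself. One option is to have record_failure return True only on the failure that opens the circuit; the route then acts on that signal with background_tasks.add_task(notify_circuit_opened, ...). The sketch below uses a hypothetical NotifyingBreaker reduced to the relevant logic.

```python
class NotifyingBreaker:
    """Hypothetical minimal breaker that reports the CLOSED -> OPEN transition.

    record_failure() stays synchronous and returns True exactly once per opening,
    so the caller decides how to deliver the webhook, for example:
        if breaker.record_failure():
            background_tasks.add_task(notify_circuit_opened, client, breaker.name,
                                      breaker.failures, webhook_url)
    """

    def __init__(self, name: str, threshold: int = 5) -> None:
        self.name = name
        self.threshold = threshold
        self.failures = 0
        self.is_open = False

    def record_success(self) -> None:
        self.failures = 0
        self.is_open = False  # a later re-open will notify again

    def record_failure(self) -> bool:
        self.failures += 1
        if not self.is_open and self.failures >= self.threshold:
            self.is_open = True
            return True  # the transition happened on this call
        return False


if __name__ == "__main__":
    breaker = NotifyingBreaker("source_b", threshold=3)
    print([breaker.record_failure() for _ in range(5)])  # [False, False, True, False, False]
```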
Extension C - Per-Source Endpoint with Passthrough
Add GET /aggregate/{source} that calls a single named upstream directly, bypasses the global cache, and returns raw data. This is useful for debugging a specific upstream in isolation:
- The circuit breaker must still apply - if the circuit is open for the requested source, return 503 Service Unavailable with a Retry-After header indicating when the circuit will half-open.
- The per-source call must still acquire the semaphore (do not bypass rate limiting).
- Successful responses from this endpoint should be recorded on the circuit breaker (record_success), so a successful half-open probe closes the circuit.
