Asyncio and Async/Await
Reading time: ~35 minutes | Level: Intermediate → Engineering
Before reading further, predict the timing:
import requests
import time

URLS = ["https://httpbin.org/delay/0.1"] * 100

# Version A - synchronous
start = time.perf_counter()
responses = [requests.get(url) for url in URLS]
sync_time = time.perf_counter() - start
print(f"Sync: {sync_time:.1f}s")
# ?

# Version B - asyncio (same machine, same network, no extra threads)
import asyncio
import aiohttp

async def fetch(session, url):
    # Read the body inside the context manager so the connection is released
    async with session.get(url) as resp:
        return await resp.text()

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        responses = await asyncio.gather(*tasks)
        return responses

start = time.perf_counter()
asyncio.run(fetch_all(URLS))
async_time = time.perf_counter() - start
print(f"Async: {async_time:.1f}s")
# ?
Answer:
Typical output:
Sync: 10.3s
Async: 0.3s
Why? The synchronous version makes each request and waits for the response before starting the next one. Each request takes ~100ms. 100 requests × 100ms = 10 seconds. The CPU is idle 99% of the time - it is waiting for network data.
The asyncio version starts all 100 requests almost simultaneously. While request 1 is waiting for the server's response, the event loop starts requests 2, 3, 4 ... 100. All 100 are in-flight at once. The total time is dominated by the single longest request (~100ms), not the sum.
No threads, no processes, no GIL tricks. A single Python thread, a single event loop, and roughly 30x faster in this run (10.3s vs 0.3s). This is the power of cooperative concurrency.
The gap between 10 seconds and 0.3 seconds on the same hardware is not magic - it is the insight that most I/O-bound programs spend almost all their time waiting, and asyncio eliminates that wasted waiting by doing something else while each request is in-flight.
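The same effect can be reproduced without a network. The following is a minimal sketch, assuming asyncio.sleep() as a stand-in for one request's latency; fake_request, N_REQUESTS, and LATENCY are illustrative names and values, not part of the example above.

```python
import asyncio
import time

N_REQUESTS = 20   # assumed, kept small so the sketch runs quickly
LATENCY = 0.02    # assumed per-request wait, in seconds


async def fake_request(i: int) -> int:
    """Stand-in for one network round trip."""
    await asyncio.sleep(LATENCY)
    return i


async def run_sequential() -> float:
    # Each request is awaited before the next one starts
    start = time.perf_counter()
    for i in range(N_REQUESTS):
        await fake_request(i)
    return time.perf_counter() - start


async def run_concurrent() -> float:
    # All requests are in flight at once; the waits overlap
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(i) for i in range(N_REQUESTS)))
    return time.perf_counter() - start


sequential_time = asyncio.run(run_sequential())
concurrent_time = asyncio.run(run_concurrent())
print(f"sequential: {sequential_time:.2f}s  concurrent: {concurrent_time:.2f}s")
```

The sequential figure should land near N_REQUESTS × LATENCY, while the concurrent figure should stay close to a single LATENCY.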
What You Will Learn
- What async/await actually is - cooperative multitasking, not parallelism
- Coroutines vs regular functions - what async def creates
- What await does at the event loop level - suspend, yield, resume
- asyncio.run() - the entry point and what it does internally
- asyncio.create_task() - running coroutines concurrently, the Task object
- asyncio.gather() - fan-out and fan-in, exception handling modes
- asyncio.wait() and asyncio.wait_for() - timeouts and first-completed patterns
- Async context managers - async with, __aenter__, __aexit__
- Async iterators - async for, __aiter__, __anext__
- The cardinal sin of asyncio - blocking the event loop with synchronous calls
- How to run synchronous code safely with loop.run_in_executor()
Prerequisites
- Lesson 01 (Threading) and Lesson 02 (Multiprocessing) - understand the concurrency landscape
- Python generators and yield - coroutines are built on the same protocol
- Context managers (with statement and __enter__ / __exit__)
Part 1 - What Async/Await Actually Is
Cooperative Multitasking
asyncio is not parallelism. No two coroutines ever run at the exact same instant on the same thread. asyncio is cooperative concurrency - coroutines take turns on a single thread, voluntarily yielding control at await points.
The key word is cooperative: a coroutine must explicitly await something to give control back to the event loop. If a coroutine runs a tight CPU loop or calls a blocking function without yielding, no other coroutine can run until it finishes. This is the cooperative contract.
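The cooperative contract can be observed directly. This is a small sketch under assumed names (polite, greedy, and events are hypothetical): two coroutines yield at every step, one never does.

```python
import asyncio

events = []


async def polite(name: str) -> None:
    for step in range(3):
        events.append(f"{name}:{step}")
        await asyncio.sleep(0)  # hands control back to the event loop


async def greedy(name: str) -> None:
    for step in range(3):
        events.append(f"{name}:{step}")  # no await: never yields mid-loop


async def main() -> None:
    await asyncio.gather(polite("A"), greedy("G"), polite("B"))


asyncio.run(main())
print(events)
```

In the printed list the three G entries sit back to back, because nothing else could run while greedy held the thread, whereas the A and B entries interleave.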
Concurrency vs Parallelism
| | Threads | Processes | asyncio |
|---|---|---|---|
| Multiple tasks at once? | Yes (OS scheduled) | Yes (true parallel) | Yes (cooperative) |
| True parallelism? | No for Python bytecode (GIL); blocking I/O waits do overlap | Yes | No - single thread |
| Memory overhead | Medium (up to ~8MB of reserved stack per thread on typical Linux defaults) | High (tens of MB per process) | Minimal (a few KB per task) |
| Max concurrent tasks | ~thousands | ~hundreds | ~tens of thousands |
| Switching mechanism | OS preemptive | OS preemptive | Python cooperative (await) |
| Best for | I/O-bound, blocking APIs | CPU-bound Python | I/O-bound, massive concurrency |
asyncio scales where threads cannot - a server handling 50,000 simultaneous WebSocket connections would need 50,000 threads (~400GB of reserved stack space at 8MB each) but only one event loop thread with asyncio.
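As a rough illustration of that scaling claim, the sketch below keeps 10,000 idle "connections" open on a single thread. The task count and the idle_connection helper are assumptions chosen for the demonstration; it simulates waiting only and says nothing about real socket limits.

```python
import asyncio
import threading

N_TASKS = 10_000  # assumed number of simulated connections


async def idle_connection(i: int) -> int:
    """Simulate a mostly idle connection and report which thread served it."""
    await asyncio.sleep(0.1)
    return threading.get_ident()


async def main() -> list:
    return await asyncio.gather(*(idle_connection(i) for i in range(N_TASKS)))


thread_ids = asyncio.run(main())
print(len(thread_ids), "tasks ran on", len(set(thread_ids)), "thread(s)")
```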
Part 2 - async def and Coroutines
What async def Creates
import asyncio
import inspect

def regular_function(x):
    return x * 2

async def coroutine_function(x):
    await asyncio.sleep(0)  # yield control to event loop
    return x * 2

# A regular function returns its value immediately
result = regular_function(5)
print(type(result))  # <class 'int'>
print(result)        # 10

# An async function returns a COROUTINE OBJECT - not the result
coro = coroutine_function(5)
print(type(coro))                # <class 'coroutine'>
print(inspect.iscoroutine(coro)) # True

# The coroutine hasn't run yet - calling it only creates the object
# To run it, you must await it or schedule it as a Task
result = asyncio.run(coro)
print(result)  # 10
:::warning Calling an async def Function Does Not Run It
async def fetch() defines a coroutine function. Calling fetch() returns a coroutine object - the function body has not executed at all. You must await the coroutine (inside another coroutine) or pass it to asyncio.run() / asyncio.create_task(). Forgetting await is an easy bug to miss: the coroutine object is created, never run, and garbage-collected, and the only signal is a RuntimeWarning.

async def save_data(data):
    await asyncio.sleep(0.1)  # simulates DB write
    print("saved!")

async def main():
    save_data({"key": "value"})  # BUG: coroutine created, not awaited
    # "saved!" is never printed for this call
    # Python emits: RuntimeWarning: coroutine 'save_data' was never awaited
    await save_data({"key": "value"})  # CORRECT
:::
What await Does
await expr does three things:
- Evaluates expr to get an awaitable (coroutine, Task, or Future)
- Suspends the current coroutine and yields control back to the event loop, unless the awaitable can finish without waiting
- Resumes the coroutine when the awaitable completes, with the result as the value of the await expression
import asyncio

async def slow_network_call(url: str) -> str:
    print(f"  starting request to {url}")
    await asyncio.sleep(1.0)  # suspend here; event loop runs other tasks
    print(f"  response received from {url}")
    return f"data from {url}"

async def main():
    print("Step 1: start")
    result = await slow_network_call("https://api.example.com")
    # coroutine suspends here for ~1 second
    # event loop could run other tasks during that time
    # then resumes here with result = "data from https://api.example.com"
    print(f"Step 2: got {result}")
    print("Step 3: done")

asyncio.run(main())
Output:
Step 1: start
starting request to https://api.example.com
response received from https://api.example.com
Step 2: got data from https://api.example.com
Step 3: done
With a single await, execution is sequential (no other tasks are running). The power of asyncio comes from running multiple coroutines concurrently - covered in Parts 4 and 5.
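To make the suspend and resume steps concrete, a coroutine can be driven by hand with send(), playing the role the event loop normally plays. This is an illustrative sketch only: the Suspend class and worker function are hypothetical, and real code should let asyncio do the driving.

```python
class Suspend:
    """A tiny awaitable that suspends once and returns whatever is sent back in."""

    def __await__(self):
        received = yield "suspended"  # control goes to whoever drives the coroutine
        return received


async def worker() -> int:
    value = await Suspend()  # suspends here until the driver resumes us
    return value * 2


coro = worker()
signal = coro.send(None)  # run until the first suspension point
try:
    coro.send(21)         # resume; 21 becomes the value of the await expression
except StopIteration as stop:
    result = stop.value   # the coroutine's return value

print(signal, result)     # suspended 42
```

An event loop does essentially this for every task: it calls send(), sees what the coroutine is waiting on, and calls send() again once that thing is ready.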
Part 3 - asyncio.run() - The Entry Point
asyncio.run() is the standard way to launch an asyncio program. It:
- Creates a new event loop
- Runs the given coroutine until it completes
- Cancels any remaining tasks
- Closes the event loop
- Returns the coroutine's result
import asyncio

async def greet(name: str) -> str:
    await asyncio.sleep(0.1)
    return f"Hello, {name}!"

# asyncio.run() - use this in scripts and __main__ blocks
result = asyncio.run(greet("world"))
print(result)  # Hello, world!

# asyncio.run() creates and closes a fresh event loop each time
# Do NOT call asyncio.run() from inside a running event loop
# (e.g., inside Jupyter - await the coroutine directly in a cell instead)
:::note One asyncio.run() Per Program
asyncio.run() is designed to be called once as the entry point. Calling it multiple times creates and destroys a new event loop each time - this is wasteful and usually wrong. Structure your async code as a single async def main() function and call asyncio.run(main()) once.
In Jupyter notebooks, the event loop is already running, so asyncio.run() raises RuntimeError. Use await coroutine() directly in a cell; calling run_until_complete() on the already-running loop fails for the same reason.
:::
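The lifecycle described in this part can be approximated with the lower-level loop API. The sketch below is a simplification and not asyncio's actual implementation: run_simplified is a hypothetical helper, and it omits the cancellation of leftover tasks that asyncio.run() performs.

```python
import asyncio


async def greet(name: str) -> str:
    await asyncio.sleep(0.01)
    return f"Hello, {name}!"


def run_simplified(coro):
    """Roughly what asyncio.run() does: create a loop, run, clean up, close."""
    loop = asyncio.new_event_loop()          # 1. create a new event loop
    try:
        return loop.run_until_complete(coro)  # 2. run the coroutine to completion
    finally:
        # 3. finalize any async generators still open, then 4. close the loop
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


result = run_simplified(greet("world"))
print(result)  # Hello, world!
```

In application code, asyncio.run() remains the better choice; the manual version is only useful for seeing the steps.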
Part 4 - asyncio.create_task() - True Concurrency
A bare await runs coroutines sequentially. To run them concurrently, wrap them in Task objects with asyncio.create_task():
import asyncio
import time

async def fetch_data(name: str, delay: float) -> dict:
    print(f"[{name}] starting (will take {delay}s)")
    await asyncio.sleep(delay)  # simulates network/DB latency
    print(f"[{name}] complete")
    return {"source": name, "value": delay * 100}

async def sequential():
    """Sequential: total time = sum of all delays."""
    start = time.perf_counter()
    r1 = await fetch_data("users", 1.0)
    r2 = await fetch_data("products", 0.8)
    r3 = await fetch_data("orders", 1.2)
    print(f"Sequential: {time.perf_counter() - start:.2f}s")  # ~3.0s

async def concurrent():
    """Concurrent: total time = max of all delays."""
    start = time.perf_counter()
    # create_task() schedules the coroutine immediately
    # The coroutine starts running as soon as the event loop gets control
    task1 = asyncio.create_task(fetch_data("users", 1.0))
    task2 = asyncio.create_task(fetch_data("products", 0.8))
    task3 = asyncio.create_task(fetch_data("orders", 1.2))
    # Await each task - results arrive in input order
    r1 = await task1
    r2 = await task2
    r3 = await task3
    print(f"Concurrent: {time.perf_counter() - start:.2f}s")  # ~1.2s
    return [r1, r2, r3]

asyncio.run(sequential())
asyncio.run(concurrent())
Output (concurrent run only):
[users] starting (will take 1.0s)
[products] starting (will take 0.8s) <- all start almost simultaneously
[orders] starting (will take 1.2s)
[products] complete <- finishes first (0.8s)
[users] complete <- finishes second (1.0s)
[orders] complete <- finishes last (1.2s)
Concurrent: 1.20s <- only as long as the slowest
The Task Object
import asyncio

async def counter(name: str, n: int) -> int:
    total = 0
    for i in range(n):
        total += i
        await asyncio.sleep(0)  # yield to event loop each iteration
    return total

async def main():
    task = asyncio.create_task(counter("my_task", 1000), name="counter-task")
    print(f"Task name: {task.get_name()}")
    print(f"Task running: {not task.done()}")
    result = await task
    print(f"Task done: {task.done()}")
    print(f"Task result: {result}")
    print(f"Task exception: {task.exception()}")  # None - no exception raised

asyncio.run(main())
Task API at a glance:
| Method / Property | What it does |
|---|---|
| task.done() | True if finished (success, error, or cancelled) |
| task.result() | Returns result; raises if exception or cancelled |
| task.exception() | Returns the exception if one was raised; None otherwise |
| task.cancel() | Request cancellation - raises CancelledError in the coroutine |
| task.cancelled() | True if the task was cancelled |
| task.get_name() | The task's name (set via create_task(..., name=...)) |
| task.add_done_callback(fn) | Register a callback called when the task finishes |
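The cancellation-related entries in the table can be exercised together. A minimal sketch, assuming a hypothetical long_job coroutine and a log list used only to record what happened:

```python
import asyncio

log = []


async def long_job() -> str:
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        log.append("cleanup ran")  # a chance to release resources
        raise                      # re-raise so the task is marked cancelled


async def main() -> asyncio.Task:
    task = asyncio.create_task(long_job(), name="long-job")
    task.add_done_callback(lambda t: log.append(f"callback: cancelled={t.cancelled()}"))
    await asyncio.sleep(0.01)  # let the task start and reach its sleep
    task.cancel()              # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        log.append("caller saw CancelledError")
    await asyncio.sleep(0)     # one more loop turn so pending callbacks have run
    return task


task = asyncio.run(main())
print(task.cancelled(), log)
```

Re-raising CancelledError after cleanup matters: swallowing it would leave the task looking like it finished normally.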
Part 5 - asyncio.gather() - Fan-Out and Fan-In
asyncio.gather() is the primary tool for running multiple coroutines concurrently and collecting their results:
import asyncio

async def fetch(url: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Response from {url} (after {delay}s)"

async def main():
    # gather() starts all coroutines concurrently and waits for ALL to finish
    # Returns results in the SAME ORDER as the inputs - regardless of completion order
    results = await asyncio.gather(
        fetch("https://api.users.com", 1.0),
        fetch("https://api.products.com", 0.5),
        fetch("https://api.orders.com", 0.8),
    )
    for r in results:
        print(r)
    # Response from https://api.users.com (after 1.0s) - index 0, even though slowest
    # Response from https://api.products.com (after 0.5s) - index 1
    # Response from https://api.orders.com (after 0.8s) - index 2

asyncio.run(main())
Exception Handling in gather()
import asyncio

async def may_fail(n: int) -> int:
    await asyncio.sleep(0.1 * n)
    if n == 2:
        raise ValueError(f"task {n} failed")
    return n * 100

async def gather_default():
    """By default, gather() raises the FIRST exception."""
    try:
        results = await asyncio.gather(
            may_fail(1),
            may_fail(2),  # raises ValueError
            may_fail(3),
        )
    except ValueError as e:
        print(f"gather default: caught {e}")

async def gather_return_exceptions():
    """return_exceptions=True: exceptions returned as values, not raised."""
    results = await asyncio.gather(
        may_fail(1),
        may_fail(2),
        may_fail(3),
        return_exceptions=True,
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"task {i}: FAILED - {result}")
        else:
            print(f"task {i}: OK - {result}")
    # task 0: OK - 100
    # task 1: FAILED - task 2 failed
    # task 2: OK - 300

asyncio.run(gather_default())
asyncio.run(gather_return_exceptions())
:::tip Use return_exceptions=True When Tasks Are Independent
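If you have a batch of API calls where one failure should not cost you the other results, use return_exceptions=True. With the default, the first exception propagates to the caller and the results of the remaining awaitables are lost, even though those awaitables are not cancelled and keep running. For fan-out operations, collect all results including errors, then inspect each one individually.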
:::
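The default mode's behavior toward sibling tasks is easy to misread, so here is a small sketch of it. The names ok, boom, and finished are hypothetical; the point is only that a sibling is neither cancelled nor finished at the moment gather() raises.

```python
import asyncio

finished = []


async def ok(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    finished.append(name)
    return name


async def boom() -> None:
    await asyncio.sleep(0.01)
    raise ValueError("boom")


async def main():
    slow = asyncio.create_task(ok("slow", 0.1))
    try:
        await asyncio.gather(boom(), slow)
    except ValueError:
        pass  # gather() re-raised the first exception
    # Snapshot the sibling's state right after gather() failed
    state_after_error = (slow.done(), slow.cancelled())
    await slow  # the sibling is still running and can be awaited normally
    return state_after_error


state_after_error = asyncio.run(main())
print(state_after_error, finished)  # (False, False) ['slow']
```

If siblings should be cancelled on the first failure, that has to be done explicitly.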
Part 6 - asyncio.wait() and asyncio.wait_for()
asyncio.wait() - Fine-Grained Control
asyncio.wait() gives you more control than gather() - you can wait for the first completed task, handle results as they arrive, or apply per-batch timeouts:
import asyncio

async def job(n: int, delay: float) -> int:
    await asyncio.sleep(delay)
    return n

async def main():
    tasks = {
        asyncio.create_task(job(1, 0.5)),
        asyncio.create_task(job(2, 1.0)),
        asyncio.create_task(job(3, 1.5)),
        asyncio.create_task(job(4, 0.2)),
    }
    # FIRST_COMPLETED: returns after the first task finishes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"First done: {[t.result() for t in done]}")  # [4] - fastest job
    print(f"Still pending: {len(pending)}")             # 3
    # Wait for the rest with a timeout
    done2, pending2 = await asyncio.wait(pending, timeout=0.8)
    print(f"Done after extra 0.8s: {[t.result() for t in done2]}")
    # Cancel tasks that did not finish in time
    for task in pending2:
        task.cancel()

asyncio.run(main())
return_when options:
| Constant | Behavior |
|---|---|
| asyncio.FIRST_COMPLETED | Return as soon as any task finishes |
| asyncio.FIRST_EXCEPTION | Return when any task raises an exception |
| asyncio.ALL_COMPLETED | Return when all tasks are done (default) |
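For the "handle results as they arrive" case, the standard library also offers asyncio.as_completed(), which is not covered above. A brief sketch, reusing a job coroutine like the one in the wait() example with assumed delays:

```python
import asyncio


async def job(n: int, delay: float) -> int:
    await asyncio.sleep(delay)
    return n


async def main() -> list:
    coros = [job(1, 0.15), job(2, 0.05), job(3, 0.10)]
    order = []
    # as_completed() yields awaitables in the order the jobs finish
    for next_done in asyncio.as_completed(coros):
        order.append(await next_done)
    return order


order = asyncio.run(main())
print(order)  # [2, 3, 1] - completion order, not input order
```

This contrasts with gather(), which always reports results in input order.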
asyncio.wait_for() - Per-Task Timeout
import asyncio

async def slow_api_call() -> str:
    await asyncio.sleep(5.0)  # simulates a slow API
    return "response"

async def main():
    try:
        result = await asyncio.wait_for(slow_api_call(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("API call timed out after 2 seconds")

asyncio.run(main())
# Production pattern: timeout with exponential backoff retry
import asyncio

async def fetch_with_retry(url: str, timeout: float, retries: int = 3) -> str:
    for attempt in range(1, retries + 1):
        try:
            result = await asyncio.wait_for(do_fetch(url), timeout=timeout)
            return result
        except asyncio.TimeoutError:
            print(f"Attempt {attempt}/{retries} timed out")
            if attempt < retries:
                # exponential backoff: 0.5s, 1s, 2s, ...
                await asyncio.sleep(0.5 * 2 ** (attempt - 1))
    raise RuntimeError(f"All {retries} attempts to {url} failed")

async def do_fetch(url: str) -> str:
    await asyncio.sleep(1.0)  # placeholder implementation
    return f"data from {url}"
Part 7 - Async Context Managers
Async context managers allow __aenter__ and __aexit__ to be coroutines, enabling resources that require async setup/teardown - database connections, HTTP sessions, file streams:
import asyncio

class AsyncDatabaseConnection:
    """A simulated async database connection."""

    def __init__(self, dsn: str) -> None:
        self.dsn = dsn
        self._conn = None

    async def __aenter__(self):
        print(f"Connecting to {self.dsn}...")
        await asyncio.sleep(0.1)  # simulates async connection handshake
        self._conn = {"dsn": self.dsn, "connected": True}
        print("Connected.")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection...")
        await asyncio.sleep(0.05)  # simulates async teardown
        self._conn = None
        print("Closed.")
        return False  # do not suppress exceptions

    async def query(self, sql: str) -> list:
        if not self._conn:
            raise RuntimeError("Not connected")
        await asyncio.sleep(0.1)  # simulates query execution time
        return [{"result": sql}]

async def main():
    async with AsyncDatabaseConnection("postgresql://localhost/mydb") as db:
        rows = await db.query("SELECT * FROM users LIMIT 10")
        print(f"Got {len(rows)} rows")
    # Connection is automatically closed here, even if an exception occurred

asyncio.run(main())
asynccontextmanager - The Decorator Shortcut
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_session(base_url: str):
    """Async context manager using the generator-based decorator."""
    print(f"Opening session for {base_url}")
    session = {"base_url": base_url, "active": True}
    try:
        yield session  # caller gets the session object here
    except Exception as exc:
        print(f"Session error: {exc}")
        raise
    finally:
        print("Closing session")
        session["active"] = False

async def main():
    async with managed_session("https://api.example.com") as session:
        print(f"Session active: {session['active']}")
        await asyncio.sleep(0.1)  # simulated request

asyncio.run(main())
Real-world async context managers you will encounter regularly:
- aiohttp.ClientSession() - HTTP session with connection pooling
- asyncpg.create_pool() - PostgreSQL async connection pool
- aiofiles.open() - async file I/O
- asyncio.timeout() (Python 3.11+) - timeout as a context manager
Part 8 - Async Iterators
Async iterators allow iteration over data sources that require async operations per item - paginated APIs, database cursors, message queues, WebSocket streams:
import asyncio

class AsyncPagedAPI:
    """Simulates a paginated API that requires an async request per page."""

    def __init__(self, total_pages: int) -> None:
        self.total_pages = total_pages
        self.current_page = 0

    def __aiter__(self):
        return self  # self is both the iterator and the iterable

    async def __anext__(self) -> list[dict]:
        if self.current_page >= self.total_pages:
            raise StopAsyncIteration  # signals end of iteration
        self.current_page += 1
        await asyncio.sleep(0.1)  # simulates API request per page
        items = [
            {"id": (self.current_page - 1) * 10 + i, "page": self.current_page}
            for i in range(10)
        ]
        return items

async def main():
    api = AsyncPagedAPI(total_pages=5)
    total_items = 0
    async for page in api:
        total_items += len(page)
        print(f"Received page with {len(page)} items (total so far: {total_items})")
    print(f"Done. Total items: {total_items}")  # 50

asyncio.run(main())
Async Generator Functions - The Simpler Way
import asyncio

async def stream_events(source_url: str):
    """Async generator that yields events from a WebSocket-style stream."""
    for event_id in range(10):
        await asyncio.sleep(0.05)  # simulates waiting for the next event
        yield {"event_id": event_id, "source": source_url, "data": f"event-{event_id}"}

async def main():
    async for event in stream_events("wss://stream.example.com/events"):
        print(f"Received: {event['event_id']} - {event['data']}")

asyncio.run(main())
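Async generators also work inside comprehensions, which can replace a short async for loop. A small sketch, assuming a trimmed-down stream_events that takes an event count instead of a URL:

```python
import asyncio


async def stream_events(n: int):
    """Simplified stand-in for the event stream above."""
    for event_id in range(n):
        await asyncio.sleep(0.01)  # simulates waiting for the next event
        yield {"event_id": event_id, "data": f"event-{event_id}"}


async def main() -> list:
    # An async comprehension: iterate with 'async for', filter with 'if'
    return [e["event_id"] async for e in stream_events(6) if e["event_id"] % 2 == 0]


even_ids = asyncio.run(main())
print(even_ids)  # [0, 2, 4]
```

Async comprehensions are only valid inside a coroutine, just like async for itself.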
Part 9 - Blocking the Event Loop
This is the cardinal sin of asyncio. The event loop is single-threaded. If any coroutine calls a blocking function - one that holds the thread without yielding - every other coroutine freezes until it returns.
time.sleep() vs asyncio.sleep()
import asyncio
import time

async def bad_worker(name: str, delay: float):
    """BAD: time.sleep() blocks the entire event loop."""
    print(f"[{name}] starting")
    time.sleep(delay)  # BLOCKS the thread - nothing else can run
    print(f"[{name}] done")

async def good_worker(name: str, delay: float):
    """GOOD: asyncio.sleep() yields control to the event loop."""
    print(f"[{name}] starting")
    await asyncio.sleep(delay)  # suspends; event loop runs other tasks
    print(f"[{name}] done")

async def demo_bad():
    print("--- BAD (blocking) ---")
    start = time.perf_counter()
    await asyncio.gather(
        bad_worker("A", 1.0),
        bad_worker("B", 1.0),
        bad_worker("C", 1.0),
    )
    print(f"Elapsed: {time.perf_counter() - start:.1f}s")  # ~3.0s - sequential!

async def demo_good():
    print("--- GOOD (async) ---")
    start = time.perf_counter()
    await asyncio.gather(
        good_worker("A", 1.0),
        good_worker("B", 1.0),
        good_worker("C", 1.0),
    )
    print(f"Elapsed: {time.perf_counter() - start:.1f}s")  # ~1.0s - concurrent!

asyncio.run(demo_bad())
asyncio.run(demo_good())
:::danger Never Call Blocking Functions in Async Code
time.sleep(), requests.get(), open().read() on large files, any synchronous database driver, subprocess.run() - these block the thread. The event loop stops. All other tasks freeze. Your server stops accepting new connections. Latency spikes for every user. This is the single most common asyncio mistake in production systems.
# WRONG - blocks the event loop while waiting for the database
async def get_user(user_id: int) -> dict:
    import psycopg2  # synchronous driver
    conn = psycopg2.connect("postgresql://localhost/db")  # blocks
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))  # blocks
        row = cursor.fetchone()
        columns = [col[0] for col in cursor.description]
    conn.close()
    return dict(zip(columns, row))

# CORRECT - use an async driver that yields to the event loop
# (pool is assumed to be a connection pool created at startup)
async def get_user(user_id: int) -> dict:
    async with pool.acquire() as conn:  # asyncpg or SQLAlchemy async
        row = await conn.fetchrow(
            "SELECT * FROM users WHERE id = $1", user_id
        )
    return dict(row)
:::
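One way to catch blocking calls during development is asyncio's debug mode, which logs a warning when a single step of a task holds the loop for longer than loop.slow_callback_duration (0.1 seconds by default). The sketch below captures that log output; ListHandler and blocking_handler are hypothetical names used for the demonstration.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects formatted log messages in memory."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


handler = ListHandler()
logging.getLogger("asyncio").addHandler(handler)


async def blocking_handler() -> None:
    time.sleep(0.2)  # deliberately blocks the event loop for 200 ms


# debug=True turns on slow-callback detection, among other checks
asyncio.run(blocking_handler(), debug=True)

slow_reports = [m for m in handler.messages if "took" in m]
print(slow_reports)
```

Debug mode adds overhead, so it is usually enabled in development and test runs rather than in production.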
Running Synchronous Code Safely - loop.run_in_executor()
When you must call a blocking function - a legacy library, a CPU-intensive task, a file operation without async support - offload it to a thread or process pool with run_in_executor():
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import requests  # synchronous - safe only when run via executor

def sync_http_get(url: str) -> str:
    """Synchronous HTTP request - blocks its thread, not the event loop."""
    response = requests.get(url, timeout=10)
    return response.text

def cpu_heavy(n: int) -> int:
    """CPU-bound synchronous computation."""
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()

    # Strategy 1: ThreadPoolExecutor - for I/O-bound blocking calls
    with ThreadPoolExecutor(max_workers=10) as thread_pool:
        result = await loop.run_in_executor(
            thread_pool,
            sync_http_get,
            "https://httpbin.org/get"
        )
        print(f"HTTP result length: {len(result)}")

    # Strategy 2: ProcessPoolExecutor - for CPU-bound blocking calls
    with ProcessPoolExecutor(max_workers=4) as process_pool:
        result = await loop.run_in_executor(
            process_pool,
            cpu_heavy,
            1_000_000
        )
        print(f"CPU result: {result}")

    # Strategy 3: Default thread pool (None = use loop's built-in ThreadPoolExecutor)
    await loop.run_in_executor(None, time.sleep, 0.1)
    print("sleep done via default thread pool")

# The guard is required with ProcessPoolExecutor on platforms that spawn
# worker processes (Windows, macOS): workers re-import this module
if __name__ == "__main__":
    asyncio.run(main())
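On Python 3.9+, asyncio.to_thread() is a shorter spelling of the default-thread-pool strategy. The sketch below assumes a hypothetical blocking_io function and a heartbeat coroutine that shows the event loop staying responsive while the blocking call runs.

```python
import asyncio
import time


def blocking_io(label: str) -> str:
    """A synchronous function that blocks its thread."""
    time.sleep(0.1)
    return f"{label} done"


async def heartbeat(ticks: list) -> None:
    # Keeps ticking while the blocking call runs in a worker thread
    for _ in range(5):
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.02)


async def main():
    ticks = []
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io, "report"),  # runs in the default thread pool
        heartbeat(ticks),
    )
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count)  # report done 5
```

Like run_in_executor(None, ...), this suits blocking I/O; CPU-bound work still needs a process pool to escape the GIL.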
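Decision tree - how to call synchronous code from async:
- An async library exists for the operation (for example an async database driver): use it and await its calls directly.
- The call is blocking I/O with no async alternative: run it in a ThreadPoolExecutor via loop.run_in_executor().
- The call is CPU-bound: run it in a ProcessPoolExecutor via loop.run_in_executor().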
Full Example - Async News Aggregator
A production-quality async news aggregator that fetches headlines from multiple APIs concurrently, applies per-source timeouts, handles failures gracefully, and deduplicates results:
"""
async_news_aggregator.py - Fetch news from multiple sources concurrently.
Demonstrates: asyncio.gather, asyncio.timeout, async context managers,
async generators, and loop.run_in_executor for CPU-bound parsing.
"""
from __future__ import annotations
import asyncio
import hashlib
import time
from dataclasses import dataclass, field
from typing import AsyncIterator
try:
    import aiohttp
except ImportError:
    raise SystemExit("Install aiohttp: pip install aiohttp")
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class Article:
    title: str
    url: str
    source: str
    description: str = ""

    @property
    def fingerprint(self) -> str:
        """Content hash for deduplication - identical articles across sources."""
        content = f"{self.title.lower().strip()}{self.url.lower().strip()}"
        return hashlib.md5(content.encode()).hexdigest()

@dataclass
class FetchResult:
    source: str
    articles: list[Article] = field(default_factory=list)
    error: str | None = None
    elapsed_ms: float = 0.0

    @property
    def success(self) -> bool:
        return self.error is None
# ---------------------------------------------------------------------------
# Source definitions
# ---------------------------------------------------------------------------
NEWS_SOURCES = [
    {
        "name": "HackerNews",
        "url": "https://hacker-news.firebaseio.com/v0/topstories.json",
        "parser": "hackernews",
        "timeout": 5.0,
    },
    {
        "name": "DEV.to",
        "url": "https://dev.to/api/articles?top=10&per_page=10",
        "parser": "devto",
        "timeout": 4.0,
    },
    {
        "name": "GitHub",
        "url": "https://api.github.com/search/repositories?q=stars:>1000&sort=stars&per_page=10",
        "parser": "github",
        "timeout": 6.0,
    },
]
# ---------------------------------------------------------------------------
# Parsers - pure synchronous; offloaded to thread pool for large payloads
# ---------------------------------------------------------------------------
def parse_hackernews(data: list) -> list[Article]:
    return [
        Article(
            title=f"HN Story #{story_id}",
            url=f"https://news.ycombinator.com/item?id={story_id}",
            source="HackerNews",
        )
        for story_id in (data[:10] if isinstance(data, list) else [])
    ]

def parse_devto(data: list) -> list[Article]:
    return [
        Article(
            title=item.get("title", ""),
            url=item.get("url", ""),
            source="DEV.to",
            description=item.get("description", ""),
        )
        for item in (data[:10] if isinstance(data, list) else [])
    ]

def parse_github(data: dict) -> list[Article]:
    return [
        Article(
            title=repo.get("full_name", ""),
            url=repo.get("html_url", ""),
            source="GitHub",
            description=repo.get("description", "") or "",
        )
        for repo in data.get("items", [])[:10]
    ]

PARSERS = {
    "hackernews": parse_hackernews,
    "devto": parse_devto,
    "github": parse_github,
}
# ---------------------------------------------------------------------------
# Async fetcher - one coroutine per source, all exceptions contained
# ---------------------------------------------------------------------------
async def fetch_source(
    session: aiohttp.ClientSession,
    source: dict,
    loop: asyncio.AbstractEventLoop,
) -> FetchResult:
    """Fetch one news source with a per-source timeout. Never raises."""
    name = source["name"]
    start = time.perf_counter()
    try:
        async with asyncio.timeout(source["timeout"]):
            async with session.get(source["url"]) as resp:
                resp.raise_for_status()
                raw = await resp.json(content_type=None)
        # JSON decoding already happened in resp.json(); build the Article
        # objects in the default thread pool so large payloads do not stall the loop
        parser_fn = PARSERS[source["parser"]]
        articles = await loop.run_in_executor(None, parser_fn, raw)
        elapsed_ms = (time.perf_counter() - start) * 1000
        return FetchResult(source=name, articles=articles, elapsed_ms=elapsed_ms)
    except asyncio.TimeoutError:
        elapsed_ms = (time.perf_counter() - start) * 1000
        return FetchResult(
            source=name,
            error=f"Timed out after {source['timeout']}s",
            elapsed_ms=elapsed_ms,
        )
    except aiohttp.ClientError as exc:
        elapsed_ms = (time.perf_counter() - start) * 1000
        return FetchResult(source=name, error=str(exc), elapsed_ms=elapsed_ms)
    except Exception as exc:
        elapsed_ms = (time.perf_counter() - start) * 1000
        return FetchResult(source=name, error=f"Unexpected: {exc}", elapsed_ms=elapsed_ms)
# ---------------------------------------------------------------------------
# Async generator - lazy deduplication with cooperative yielding
# ---------------------------------------------------------------------------
async def stream_deduplicated(results: list[FetchResult]) -> AsyncIterator[Article]:
    """Yield deduplicated articles across all successful sources."""
    seen: set[str] = set()
    successful = sorted(
        (r for r in results if r.success),
        key=lambda r: r.source
    )
    for result in successful:
        for article in result.articles:
            fp = article.fingerprint
            if fp not in seen:
                seen.add(fp)
                yield article
                await asyncio.sleep(0)  # cooperative yield - keeps event loop responsive
# ---------------------------------------------------------------------------
# Main orchestrator
# ---------------------------------------------------------------------------
async def aggregate_news() -> None:
    loop = asyncio.get_running_loop()
    wall_start = time.perf_counter()
    connector = aiohttp.TCPConnector(limit=20, ttl_dns_cache=300)
    session_timeout = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession(connector=connector, timeout=session_timeout) as session:
        print(f"Fetching from {len(NEWS_SOURCES)} sources concurrently...\n")
        # Fan-out: all sources fetch simultaneously
        # fetch_source handles all exceptions internally - no need for return_exceptions
        results: list[FetchResult] = await asyncio.gather(
            *[fetch_source(session, src, loop) for src in NEWS_SOURCES],
        )

    # Per-source report
    print("=== Source Results ===")
    total_articles = 0
    for result in results:
        if result.success:
            status = f"OK - {len(result.articles)} articles"
        else:
            status = f"FAILED - {result.error}"
        print(f"  {result.source:20s} {result.elapsed_ms:6.0f}ms  {status}")
        total_articles += len(result.articles)

    # Fan-in: deduplicate and stream the best articles
    print(f"\n=== Top Articles (deduplicated from {total_articles} total) ===")
    count = 0
    async for article in stream_deduplicated(results):
        count += 1
        print(f"  [{count:2d}] [{article.source:12s}] {article.title[:60]}")
        if count >= 15:
            break

    wall_elapsed = time.perf_counter() - wall_start
    print(f"\nTotal wall time: {wall_elapsed:.2f}s")

if __name__ == "__main__":
    asyncio.run(aggregate_news())
Design decisions in this example:
- asyncio.gather() fans out to all sources simultaneously - total time equals the slowest source, not the sum
- asyncio.timeout() (Python 3.11+) applies per-source deadlines inside fetch_source - each source gets its own independent timeout
- fetch_source returns FetchResult for all outcomes including errors - one failing source does not affect the others
- loop.run_in_executor(None, parser_fn, raw) offloads building the Article objects to the default thread pool - this keeps the event loop responsive for large payloads, although under the GIL it does not make the parsing itself faster
- stream_deduplicated is an async generator with await asyncio.sleep(0) after each yield - cooperatively gives the event loop a chance to process other events between articles
- aiohttp.TCPConnector(limit=20) caps open TCP connections - prevents overwhelming remote servers and exhausting local file descriptors
Graded Practice Challenges
Beginner - Predict the Output
import asyncio

async def say(message: str, delay: float) -> str:
    await asyncio.sleep(delay)
    print(message)
    return message

async def main():
    # Version A - sequential
    await say("hello", 0.2)
    await say("world", 0.1)
    print("A done")
    # Version B - concurrent
    result = await asyncio.gather(
        say("hello", 0.2),
        say("world", 0.1),
    )
    print("B done:", result)

asyncio.run(main())
What is the exact output, and in what order do the prints appear for each version?
Answer:
Version A output - sequential (total ~0.3s):
hello <- after 0.2s
world <- after 0.3s total (0.2 + 0.1)
A done
Version A uses two sequential await calls. say("hello", 0.2) runs and completes first. Then say("world", 0.1) runs. Total: 0.3s.
Version B output - concurrent (total ~0.2s):
world <- after 0.1s (finishes first)
hello <- after 0.2s (finishes second)
B done: ['hello', 'world']
Version B uses gather(). Both coroutines start simultaneously. world (delay 0.1s) finishes first and prints first. hello (delay 0.2s) finishes second and prints second.
Key insight: gather() returns results in input order - ['hello', 'world'], not ['world', 'hello'] - even though world completed first. The print side-effect happens in completion order, but the return value preserves input order.
Intermediate - Fix the Bugs
This async web scraper has four bugs. Find and fix each one:
import asyncio
import time
import aiohttp

URLS = [f"https://httpbin.org/delay/{i}" for i in range(1, 6)]

async def fetch(session, url):
    # Bug 1: synchronous sleep blocks the event loop
    time.sleep(0.1)
    async with session.get(url) as resp:
        return await resp.text()

async def process(text):
    # Bug 2: CPU-heavy loop blocks the event loop
    result = 0
    for i in range(5_000_000):
        result += i
    return result

async def main():
    async with aiohttp.ClientSession() as session:
        # Bug 3: sequential fetching wastes asyncio concurrency
        results = []
        for url in URLS:
            text = await fetch(session, url)
            results.append(text)
        # Bug 4: coroutine not awaited - silent bug, no result computed
        processed = process(results[0])
        print(f"processed: {processed}")

asyncio.run(main())
Show Solution
Bug 1 - time.sleep() blocks the event loop:
async def fetch(session, url):
    await asyncio.sleep(0.1)  # FIXED: cooperative yield
    async with session.get(url) as resp:
        return await resp.text()
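The cost of Bug 1 can be measured without any network access. The sketch below is illustrative only: the names blocking_worker, cooperative_worker, and timed are assumptions, and the 0.05s delay is arbitrary. It runs three copies of each worker through gather() and compares elapsed time.

```python
import asyncio
import time


async def blocking_worker() -> None:
    time.sleep(0.05)  # holds the event loop thread; no other task can run


async def cooperative_worker() -> None:
    await asyncio.sleep(0.05)  # suspends; the loop runs other tasks meanwhile


async def timed(worker, count: int = 3) -> float:
    """Run `count` copies of `worker` via gather() and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(worker() for _ in range(count)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"time.sleep:    {asyncio.run(timed(blocking_worker)):.2f}s")
    print(f"asyncio.sleep: {asyncio.run(timed(cooperative_worker)):.2f}s")
```

With time.sleep() the three workers run back to back even under gather(), so the total is roughly the sum of the delays; with asyncio.sleep() the delays overlap.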
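Bug 2 - CPU-bound work should be offloaded to an executor. Passing None selects the loop's default ThreadPoolExecutor, which keeps the event loop responsive but still shares the GIL with it; pass a ProcessPoolExecutor as the first argument when the work needs real CPU parallelism:
def _process_sync(text: str) -> int:
    """Module-level sync function - safe to run in a thread, and picklable for a process pool."""
    result = 0
    for i in range(5_000_000):
        result += i
    return result


async def process(text: str) -> int:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _process_sync, text)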
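One way to check that the executor really frees the event loop is to run a small "heartbeat" task alongside the offloaded work and count how often it gets to run. This is a rough sketch under stated assumptions: sum_to stands in for the lesson's _process_sync, and run_with_heartbeat and the tick interval are hypothetical names and values chosen for the demo.

```python
import asyncio


def sum_to(n: int) -> int:
    """Pure-Python CPU loop (a stand-in for the lesson's _process_sync)."""
    total = 0
    for i in range(n):
        total += i
    return total


async def run_with_heartbeat(n: int = 3_000_000) -> tuple[int, int]:
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1  # only increments if the event loop is still being served

    beat = asyncio.create_task(heartbeat())
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor.
    total = await loop.run_in_executor(None, sum_to, n)

    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass
    return total, ticks


if __name__ == "__main__":
    total, ticks = asyncio.run(run_with_heartbeat())
    print(f"sum={total}, heartbeat ticks while working: {ticks}")
```

If sum_to were called directly inside a coroutine instead, the heartbeat would not tick at all until the loop finished.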
Bug 3 - Sequential fetching throws away asyncio's benefit:
async def main():
    async with aiohttp.ClientSession() as session:
        # FIXED: fan-out all URLs at once
        results = await asyncio.gather(
            *[fetch(session, url) for url in URLS]
        )
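Bug 4 - Missing await on the coroutine call. Without await, processed is a coroutine object rather than the computed sum, so the print shows something like <coroutine object process at 0x...> and Python emits a "coroutine 'process' was never awaited" RuntimeWarning:
    # FIXED: await the coroutine so it actually runs
    processed = await process(results[0])
    print(f"processed: {processed}")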
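The difference between calling a coroutine function and awaiting the result can be shown in a few lines. In this sketch, process is a trivial stand-in for the lesson's function and main is an illustrative name.

```python
import asyncio
import inspect


async def process(text: str) -> int:
    return len(text)


async def main() -> tuple[bool, int]:
    pending = process("abc")  # no await: the body has not run yet
    was_coroutine = inspect.iscoroutine(pending)
    result = await pending  # awaiting runs the body and produces the return value
    return was_coroutine, result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Until it is awaited (or wrapped in a Task), the object returned by process("abc") is only a suspended computation.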
Advanced - Design Challenge
Design an AsyncRateLimitedFetcher class that:
- Accepts max_rps (max requests per second) and max_concurrent (max simultaneous in-flight requests)
- Has an async fetch(url) method returning the response body as a string
- Enforces the rate limit using asyncio.Semaphore for concurrency and a sliding-window approach for RPS
- Collects metrics: total requests, succeeded, failed, average latency
- Works as an async context manager
Example usage:
async with AsyncRateLimitedFetcher(max_rps=10, max_concurrent=5) as fetcher:
    results = await asyncio.gather(*[fetcher.fetch(url) for url in urls])
    print(fetcher.metrics())
Show Reference Solution
from __future__ import annotations

import asyncio
import time
from collections import deque
from dataclasses import dataclass

try:
    import aiohttp
except ImportError:
    raise SystemExit("pip install aiohttp")


@dataclass
class FetchMetrics:
    total: int = 0
    succeeded: int = 0
    failed: int = 0
    total_latency_ms: float = 0.0

    @property
    def avg_latency_ms(self) -> float:
        # Latency is only recorded for successful requests, so divide by succeeded
        return self.total_latency_ms / self.succeeded if self.succeeded else 0.0

    def __repr__(self) -> str:
        return (
            f"FetchMetrics(total={self.total}, succeeded={self.succeeded}, "
            f"failed={self.failed}, avg_latency={self.avg_latency_ms:.1f}ms)"
        )


class AsyncRateLimitedFetcher:
    """
    Async HTTP fetcher with concurrency cap and RPS rate limiting.

    - asyncio.Semaphore caps the number of in-flight requests
    - Sliding-window algorithm enforces max requests per second
    - Built-in metrics for observability
    - Async context manager for clean session lifecycle
    """

    def __init__(self, max_rps: float = 10.0, max_concurrent: int = 5) -> None:
        self.max_rps = max_rps
        self.max_concurrent = max_concurrent
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._request_times: deque[float] = deque()  # sliding window timestamps
        self._rps_lock = asyncio.Lock()
        self._metrics = FetchMetrics()
        self._session: aiohttp.ClientSession | None = None

    async def __aenter__(self) -> "AsyncRateLimitedFetcher":
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        self._session = aiohttp.ClientSession(connector=connector)
        return self

    async def __aexit__(self, *exc) -> bool:
        if self._session:
            await self._session.close()
        return False  # never suppress exceptions raised inside the with-block

    async def _wait_for_rate_limit(self) -> None:
        """
        Sliding window rate limiter.

        Blocks (cooperatively) until making a new request stays within max_rps.
        """
        # The lock is held while sleeping on purpose: later callers queue behind it
        async with self._rps_lock:
            now = time.monotonic()
            # Evict timestamps older than 1 second
            while self._request_times and now - self._request_times[0] > 1.0:
                self._request_times.popleft()
            # If at the cap, sleep until the oldest timestamp exits the window
            if len(self._request_times) >= self.max_rps:
                sleep_time = 1.0 - (now - self._request_times[0])
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
            self._request_times.append(time.monotonic())

    async def fetch(self, url: str) -> str:
        """
        Fetch a URL. Respects rate and concurrency limits.

        Returns response body as str. Raises on HTTP or network error.
        """
        if not self._session:
            raise RuntimeError("Use 'async with AsyncRateLimitedFetcher() as fetcher'")

        await self._wait_for_rate_limit()
        self._metrics.total += 1
        # Timer starts before the semaphore, so latency includes time spent queued for a slot
        start = time.perf_counter()

        async with self._semaphore:  # waits if max_concurrent requests are in-flight
            try:
                async with self._session.get(
                    url,
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    resp.raise_for_status()
                    body = await resp.text()
                    latency_ms = (time.perf_counter() - start) * 1000
                    self._metrics.succeeded += 1
                    self._metrics.total_latency_ms += latency_ms
                    return body
            except Exception:
                self._metrics.failed += 1
                raise

    def metrics(self) -> FetchMetrics:
        """Return the live metrics object (not a copy)."""
        return self._metrics


# Demo
async def demo() -> None:
    urls = [f"https://httpbin.org/get?id={i}" for i in range(20)]
    async with AsyncRateLimitedFetcher(max_rps=5.0, max_concurrent=3) as fetcher:
        print(f"Fetching {len(urls)} URLs (max 5 RPS, max 3 concurrent)...")
        start = time.perf_counter()
        results = await asyncio.gather(
            *[fetcher.fetch(url) for url in urls],
            return_exceptions=True,
        )
        elapsed = time.perf_counter() - start
        succeeded = sum(1 for r in results if not isinstance(r, Exception))
        print(f"Done in {elapsed:.2f}s")
        print(f"Succeeded: {succeeded}/{len(urls)}")
        print(f"Metrics: {fetcher.metrics()}")


if __name__ == "__main__":
    asyncio.run(demo())
Key design decisions:
- asyncio.Semaphore(max_concurrent) caps in-flight requests without blocking the event loop - async with self._semaphore releases automatically when the request finishes or raises
- Sliding window uses a deque of timestamps and an asyncio.Lock - only one coroutine adjusts the window at a time; others queue cooperatively behind the lock
- __aenter__ creates the aiohttp.ClientSession inside the running event loop - sessions should be created inside a coroutine, not at module level or before asyncio.run()
- Exceptions increment failed and re-raise - the caller decides whether to swallow or propagate; return_exceptions=True in the demo collects them without crashing
- metrics() returns the live FetchMetrics object directly - safe here because every update happens on the event loop thread; if another thread had to read the metrics, you would copy the values under a lock
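The two limiting mechanisms can be exercised without a network by replacing the HTTP call with asyncio.sleep(). The following is a simplified sketch, not the reference solution: SlidingWindowLimiter, demo, and fake_fetch are hypothetical names, the window length is made configurable so the demo runs quickly, and the limiter re-checks the window in a loop after sleeping.

```python
import asyncio
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most `max_calls` acquisitions per `window` seconds (illustrative helper)."""

    def __init__(self, max_calls: int, window: float = 1.0) -> None:
        self.max_calls = max_calls
        self.window = window
        self._stamps: deque = deque()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:  # one coroutine adjusts the window at a time
            while True:
                now = time.monotonic()
                # Drop timestamps that have left the window.
                while self._stamps and now - self._stamps[0] >= self.window:
                    self._stamps.popleft()
                if len(self._stamps) < self.max_calls:
                    break
                # Sleep until the oldest timestamp leaves the window, then re-check.
                await asyncio.sleep(self.window - (now - self._stamps[0]))
            self._stamps.append(time.monotonic())


async def demo(calls: int = 6, max_calls: int = 3, window: float = 0.2) -> tuple[float, int]:
    limiter = SlidingWindowLimiter(max_calls, window)
    semaphore = asyncio.Semaphore(2)  # at most 2 simulated requests in flight
    in_flight = 0
    peak = 0

    async def fake_fetch() -> None:
        nonlocal in_flight, peak
        await limiter.acquire()
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for the HTTP round trip
            in_flight -= 1

    start = time.monotonic()
    await asyncio.gather(*(fake_fetch() for _ in range(calls)))
    return time.monotonic() - start, peak


if __name__ == "__main__":
    elapsed, peak = asyncio.run(demo())
    print(f"elapsed={elapsed:.2f}s, peak in-flight={peak}")
```

With six calls and a cap of three per 0.2s window, the second batch of three has to wait for the window to roll over, while the semaphore independently keeps the number of simultaneous simulated requests at two or fewer.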
Key Takeaways
- asyncio is cooperative concurrency on a single thread - coroutines take turns; no two coroutines ever execute at the exact same instant, and the event loop never preempts a running coroutine: control changes hands only at an await
- async def creates a coroutine factory - calling it returns a coroutine object, not a result; the function body does not execute until the coroutine is awaited or scheduled as a Task; forgetting await is an easy-to-miss bug that Python flags only with a "never awaited" RuntimeWarning
- await expr suspends the current coroutine and yields to the event loop; the coroutine resumes when expr completes - this is how thousands of concurrent I/O operations fit on one thread
- Bare await is always sequential; use asyncio.create_task() before awaiting to schedule coroutines for concurrent execution
- asyncio.gather() is the primary fan-out tool - starts all coroutines concurrently, returns results in input order regardless of completion order; use return_exceptions=True when tasks are independent
- asyncio.wait_for(coro, timeout=N) cancels the coroutine and raises asyncio.TimeoutError (an alias of the built-in TimeoutError since Python 3.11) if it does not complete within N seconds - essential for production reliability
- Async context managers (async with) and async iterators (async for) integrate await into resource management and iteration - __aenter__/__aexit__ and __aiter__/__anext__ are the underlying protocols
- Calling any blocking function - time.sleep, requests.get, synchronous DB drivers, subprocess.run - in async code freezes the entire event loop until the call returns; all other coroutines halt
- Use loop.run_in_executor() with ThreadPoolExecutor for I/O-bound blocking code, and with ProcessPoolExecutor for CPU-bound blocking code - this runs the blocking call off the event loop thread while the event loop continues serving other coroutines
- asyncio scales to tens of thousands of concurrent tasks on a single thread, but provides no CPU parallelism - for CPU-bound work, combine asyncio with ProcessPoolExecutor via run_in_executor()
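Two of the takeaways above, scheduling with create_task() and bounding a wait with wait_for(), can be seen together in a short sketch. The coroutine slow and the delays used here are illustrative assumptions rather than code from the lesson.

```python
import asyncio


async def slow(label: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return label


async def main() -> tuple[list[str], str]:
    # create_task() schedules both coroutines right away, so the sleeps overlap.
    first = asyncio.create_task(slow("a", 0.05))
    second = asyncio.create_task(slow("b", 0.05))
    results = [await first, await second]

    # wait_for() cancels the inner coroutine once the timeout elapses.
    try:
        outcome = await asyncio.wait_for(slow("c", 1.0), timeout=0.05)
    except asyncio.TimeoutError:
        outcome = "timed out"

    return results, outcome


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Awaiting the two tasks one after the other is still concurrent here because both were already scheduled before the first await; awaiting the bare coroutines in the same way would run them back to back.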
What's Next
Lesson 04 - The Event Loop dives into what the event loop actually is under the hood - the selectors module, how asyncio.sleep() integrates with the OS's I/O polling (epoll on Linux, kqueue on macOS, IOCP on Windows), how Future objects connect I/O completion to coroutine resumption, and how to write custom event loop integrations. Understanding the event loop at this depth explains every asyncio behaviour you have encountered in this lesson.
