Asyncio and Async/Await

Reading time: ~35 minutes | Level: Intermediate → Engineering

Before reading further, predict the timing:

import requests
import time

URLS = ["https://httpbin.org/delay/0.1" for _ in range(100)]

# Version A - synchronous
start = time.perf_counter()
responses = [requests.get(url) for url in URLS]
sync_time = time.perf_counter() - start
print(f"Sync: {sync_time:.1f}s")
# ?

# Version B - asyncio (same machine, same network, no extra threads)
import asyncio
import aiohttp

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        # session.get() returns an awaitable; gather() starts every request at once
        tasks = [session.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        return responses

start = time.perf_counter()
asyncio.run(fetch_all(URLS))
async_time = time.perf_counter() - start
print(f"Async: {async_time:.1f}s")
# ?

Typical output:

Sync: 10.3s
Async: 0.3s

Why? The synchronous version makes each request and waits for the response before starting the next one. Each request takes ~100ms. 100 requests × 100ms = 10 seconds. The CPU is idle 99% of the time - it is waiting for network data.

The asyncio version starts all 100 requests almost simultaneously. While request 1 is waiting for the server's response, the event loop starts requests 2, 3, 4 ... 100. All 100 are in-flight at once. The total time is dominated by the single longest request (~100ms) plus connection setup, not the sum.

No threads, no processes, no GIL tricks. A single Python thread, a single event loop, and roughly 30x faster in the run above (10.3s vs 0.3s). This is the power of cooperative concurrency.

The gap between 10 seconds and 0.3 seconds on the same hardware is not magic - it is the insight that most I/O-bound programs spend almost all their time waiting, and asyncio eliminates that wasted waiting by doing something else while each request is in-flight.
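The same effect can be reproduced without a network. The sketch below is a stand-in, not the benchmark above: fake_request, DELAY and N_REQUESTS are hypothetical names, and asyncio.sleep() plays the role of server latency.

```python
import asyncio
import time

DELAY = 0.05      # assumed per-request latency, standing in for the network
N_REQUESTS = 20   # kept small so the one-at-a-time run finishes quickly


async def fake_request(i: int) -> int:
    """Hypothetical stand-in for an HTTP call: just waits DELAY seconds."""
    await asyncio.sleep(DELAY)
    return i


async def one_at_a_time() -> float:
    start = time.perf_counter()
    for i in range(N_REQUESTS):
        await fake_request(i)  # each wait finishes before the next one begins
    return time.perf_counter() - start


async def all_at_once() -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(i) for i in range(N_REQUESTS)))
    return time.perf_counter() - start


sequential_time = asyncio.run(one_at_a_time())
concurrent_time = asyncio.run(all_at_once())
print(f"one at a time: {sequential_time:.2f}s, all at once: {concurrent_time:.2f}s")
```

The first figure should land near N_REQUESTS × DELAY and the second near a single DELAY, which is the same sum-versus-max relationship as in the HTTP example.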

What You Will Learn

  • What async/await actually is - cooperative multitasking, not parallelism
  • Coroutines vs regular functions - what async def creates
  • What await does at the event loop level - suspend, yield, resume
  • asyncio.run() - the entry point and what it does internally
  • asyncio.create_task() - running coroutines concurrently, the Task object
  • asyncio.gather() - fan-out and fan-in, exception handling modes
  • asyncio.wait() and asyncio.wait_for() - timeouts and first-completed patterns
  • Async context managers - async with, __aenter__, __aexit__
  • Async iterators - async for, __aiter__, __anext__
  • The cardinal sin of asyncio - blocking the event loop with synchronous calls
  • How to run synchronous code safely with loop.run_in_executor()

Prerequisites

  • Lesson 01 (Threading) and Lesson 02 (Multiprocessing) - understand the concurrency landscape
  • Python generators and yield - coroutines are built on the same protocol
  • Context managers (with statement and __enter__/__exit__)

Part 1 - What Async/Await Actually Is

Cooperative Multitasking

asyncio is not parallelism. No two coroutines ever run at the exact same instant on the same thread. asyncio is cooperative concurrency - coroutines take turns on a single thread, voluntarily yielding control at await points.

The key word is cooperative: a coroutine must explicitly await something to give control back to the event loop. If a coroutine runs a tight CPU loop or calls a blocking function without yielding, no other coroutine can run until it finishes. This is the cooperative contract.
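A small sketch of the cooperative contract, under the assumption that two coroutines are started together with gather(). The names polite, greedy and log are hypothetical. The polite version yields at every step, so the two workers interleave; the greedy version never awaits, so the first worker finishes entirely before the second gets a turn.

```python
import asyncio

log: list[str] = []


async def polite(name: str, steps: int) -> None:
    for step in range(steps):
        log.append(f"{name}{step}")
        await asyncio.sleep(0)  # hand control back to the event loop


async def greedy(name: str, steps: int) -> None:
    for step in range(steps):
        log.append(f"{name}{step}")  # no await: nothing else can run meanwhile


async def main() -> tuple[list[str], list[str]]:
    await asyncio.gather(polite("A", 3), polite("B", 3))
    interleaved = list(log)
    log.clear()
    await asyncio.gather(greedy("A", 3), greedy("B", 3))
    return interleaved, list(log)


interleaved, starved = asyncio.run(main())
print(interleaved)  # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
print(starved)      # ['A0', 'A1', 'A2', 'B0', 'B1', 'B2']
```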

Concurrency vs Parallelism

| | Threads | Processes | asyncio |
|---|---|---|---|
| Multiple tasks at once? | Yes (OS scheduled) | Yes (true parallel) | Yes (cooperative) |
| True parallelism? | No (GIL) / Yes (I/O) | Yes | No - single thread |
| Memory overhead | Medium (~8MB per thread) | High (~50MB per process) | Minimal (~a few KB per task) |
| Max concurrent tasks | ~thousands | ~hundreds | ~tens of thousands |
| Switching mechanism | OS preemptive | OS preemptive | Python cooperative (await) |
| Best for | I/O-bound, blocking APIs | CPU-bound Python | I/O-bound, massive concurrency |

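asyncio scales where threads struggle - a server handling 50,000 simultaneous WebSocket connections would need 50,000 threads (~400GB of stack space reserved at 8MB per thread), whereas asyncio handles the same connections as 50,000 lightweight tasks on a single event loop thread.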
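To get a feel for that scale, the sketch below starts 10,000 tasks on one thread. It is only an illustration: idle_connection is a hypothetical stand-in for a mostly-idle connection, and the timing will vary by machine.

```python
import asyncio
import time


async def idle_connection(i: int) -> int:
    """Hypothetical stand-in for a connection that mostly waits."""
    await asyncio.sleep(0.1)
    return i


async def main(n: int) -> tuple[int, float]:
    start = time.perf_counter()
    results = await asyncio.gather(*(idle_connection(i) for i in range(n)))
    return len(results), time.perf_counter() - start


count, elapsed = asyncio.run(main(10_000))
print(f"{count} tasks finished in {elapsed:.2f}s on a single thread")
```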

Part 2 - async def and Coroutines

What async def Creates

import asyncio
import inspect


def regular_function(x):
    return x * 2


async def coroutine_function(x):
    await asyncio.sleep(0)  # yield control to event loop
    return x * 2


# A regular function returns its value immediately
result = regular_function(5)
print(type(result)) # <class 'int'>
print(result) # 10

# An async function returns a COROUTINE OBJECT - not the result
coro = coroutine_function(5)
print(type(coro)) # <class 'coroutine'>
print(inspect.iscoroutine(coro)) # True

# The coroutine hasn't run yet - calling it only creates the object
# To run it, you must await it or schedule it as a Task

result = asyncio.run(coro)
print(result) # 10

:::warning Calling an async def Function Does Not Run It
async def fetch() creates a coroutine factory. Calling fetch() returns a coroutine object - the function body has not executed at all. You must await the coroutine (inside another coroutine) or pass it to asyncio.run() / asyncio.create_task(). Forgetting await is an easy bug to miss: the coroutine object is created and garbage-collected without ever running, and the only symptom is a RuntimeWarning.

async def save_data(data):
    await asyncio.sleep(0.1)  # simulates DB write
    print("saved!")

async def main():
    save_data({"key": "value"})  # BUG: coroutine created, not awaited
    # "saved!" is never printed
    # Python emits: RuntimeWarning: coroutine 'save_data' was never awaited

    await save_data({"key": "value"})  # CORRECT

:::

What await Does

await expr does three things:

  1. Evaluates expr to get an awaitable (coroutine, Task, or Future)
  2. Suspends the current coroutine and yields control back to the event loop
  3. Resumes the coroutine when the awaitable completes, with the result as the value of the await expression
import asyncio


async def slow_network_call(url: str) -> str:
    print(f" starting request to {url}")
    await asyncio.sleep(1.0)  # suspend here; event loop runs other tasks
    print(f" response received from {url}")
    return f"data from {url}"


async def main():
    print("Step 1: start")
    result = await slow_network_call("https://api.example.com")
    # coroutine suspends here for ~1 second
    # event loop could run other tasks during that time
    # then resumes here with result = "data from https://api.example.com"
    print(f"Step 2: got {result}")
    print("Step 3: done")


asyncio.run(main())

Output:

Step 1: start
starting request to https://api.example.com
response received from https://api.example.com
Step 2: got data from https://api.example.com
Step 3: done

With a single await, execution is sequential (no other tasks are running). The power of asyncio comes from running multiple coroutines concurrently - covered in Parts 4 and 5.
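The suspend/resume steps can also be observed by driving a coroutine by hand, which is roughly what an event loop does on your behalf. This sketch uses the generator-style protocol that coroutines are built on; Suspend is a hypothetical awaitable written only for this illustration, not something asyncio provides.

```python
class Suspend:
    """Hypothetical awaitable: pauses once, then resumes with whatever is sent in."""

    def __await__(self):
        received = yield "paused"  # suspend; the driver sees the string "paused"
        return received            # becomes the value of the await expression


async def demo() -> int:
    value = await Suspend()  # steps 1 and 2: evaluate, then suspend
    return value * 2         # step 3: resumed with the result


coro = demo()
signal = coro.send(None)  # run until the first suspension point
try:
    coro.send(21)         # resume; the await expression evaluates to 21
except StopIteration as stop:
    result = stop.value   # a finished coroutine reports its return value this way

print(signal, result)  # paused 42
```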

Part 3 - asyncio.run() - The Entry Point

asyncio.run() is the standard way to launch an asyncio program. It:

  1. Creates a new event loop
  2. Runs the given coroutine until it completes
  3. Cancels any remaining tasks
  4. Closes the event loop
  5. Returns the coroutine's result
import asyncio


async def greet(name: str) -> str:
    await asyncio.sleep(0.1)
    return f"Hello, {name}!"


# asyncio.run() - use this in scripts and __main__ blocks
result = asyncio.run(greet("world"))
print(result)  # Hello, world!

# asyncio.run() creates and closes a fresh event loop each time
# Do NOT call asyncio.run() from inside a running event loop
# (e.g., inside Jupyter - use await directly in a cell instead)

:::note One asyncio.run() Per Program
asyncio.run() is designed to be called once as the entry point. Calling it multiple times creates and destroys a new event loop each time - this is wasteful and usually wrong. Structure your async code as a single async def main() function and call asyncio.run(main()) once.

In Jupyter notebooks, the event loop is already running. Use await coroutine() directly in cells. Calling asyncio.run() or loop.run_until_complete() there fails with a RuntimeError, because a running loop cannot be started again.
:::
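The five steps listed for asyncio.run() can be approximated with lower-level calls. This is a simplified sketch, not the real implementation (which also handles debug mode, executor shutdown and more); run_sketch is a hypothetical name.

```python
import asyncio


async def greet(name: str) -> str:
    await asyncio.sleep(0.01)
    return f"Hello, {name}!"


def run_sketch(coro):
    """Simplified stand-in for asyncio.run(); for illustration only."""
    loop = asyncio.new_event_loop()           # 1. create a new event loop
    try:
        return loop.run_until_complete(coro)  # 2. run it, 5. return its result
    finally:
        try:
            leftover = asyncio.all_tasks(loop)
            for task in leftover:             # 3. cancel any remaining tasks
                task.cancel()
            if leftover:
                loop.run_until_complete(
                    asyncio.gather(*leftover, return_exceptions=True)
                )
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()                      # 4. close the event loop


result = run_sketch(greet("world"))
print(result)  # Hello, world!
```

In real programs asyncio.run() remains the right call; the sketch only makes the lifecycle visible.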

Part 4 - asyncio.create_task() - True Concurrency

A bare await runs coroutines sequentially. To run them concurrently, wrap them in Task objects with asyncio.create_task():

import asyncio
import time


async def fetch_data(name: str, delay: float) -> dict:
    print(f"[{name}] starting (will take {delay}s)")
    await asyncio.sleep(delay)  # simulates network/DB latency
    print(f"[{name}] complete")
    return {"source": name, "value": delay * 100}


async def sequential():
    """Sequential: total time = sum of all delays."""
    start = time.perf_counter()
    r1 = await fetch_data("users", 1.0)
    r2 = await fetch_data("products", 0.8)
    r3 = await fetch_data("orders", 1.2)
    print(f"Sequential: {time.perf_counter() - start:.2f}s")  # ~3.0s


async def concurrent():
    """Concurrent: total time = max of all delays."""
    start = time.perf_counter()

    # create_task() schedules the coroutine immediately
    # The coroutine starts running as soon as the event loop gets control
    task1 = asyncio.create_task(fetch_data("users", 1.0))
    task2 = asyncio.create_task(fetch_data("products", 0.8))
    task3 = asyncio.create_task(fetch_data("orders", 1.2))

    # Await each task - results arrive in input order
    r1 = await task1
    r2 = await task2
    r3 = await task3

    print(f"Concurrent: {time.perf_counter() - start:.2f}s")  # ~1.2s
    return [r1, r2, r3]


asyncio.run(sequential())
asyncio.run(concurrent())

Output:

[users] starting (will take 1.0s)
[products] starting (will take 0.8s) <- all start almost simultaneously
[orders] starting (will take 1.2s)
[products] complete <- finishes first (0.8s)
[users] complete <- finishes second (1.0s)
[orders] complete <- finishes last (1.2s)
Concurrent: 1.20s <- only as long as the slowest

The Task Object

import asyncio


async def counter(name: str, n: int) -> int:
    total = 0
    for i in range(n):
        total += i
        await asyncio.sleep(0)  # yield to event loop each iteration
    return total


async def main():
    task = asyncio.create_task(counter("my_task", 1000), name="counter-task")

    print(f"Task name: {task.get_name()}")
    print(f"Task running: {not task.done()}")

    result = await task
    print(f"Task done: {task.done()}")
    print(f"Task result: {result}")
    print(f"Task exception: {task.exception()}")  # None - no exception raised


asyncio.run(main())

Task API at a glance:

| Method / Property | What it does |
|---|---|
| task.done() | True if finished (success, error, or cancelled) |
| task.result() | Returns result; raises if exception or cancelled |
| task.exception() | Returns the exception if one was raised; None otherwise |
| task.cancel() | Request cancellation - raises CancelledError in the coroutine |
| task.cancelled() | True if the task was cancelled |
| task.get_name() | The task's name (set via create_task(..., name=...)) |
| task.add_done_callback(fn) | Register a callback called when the task finishes |
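The cancellation side of this API is easiest to see in a short run. A minimal sketch, assuming a worker that would otherwise sleep for ten seconds; worker and events are hypothetical names.

```python
import asyncio

events: list[str] = []


async def worker() -> None:
    try:
        await asyncio.sleep(10)  # would run for 10s if left alone
    except asyncio.CancelledError:
        events.append("worker cleaned up")
        raise                    # re-raise so the task ends up cancelled


async def main() -> bool:
    task = asyncio.create_task(worker(), name="worker-task")
    task.add_done_callback(lambda t: events.append(f"{t.get_name()} finished"))

    await asyncio.sleep(0)       # let the worker start and reach its await
    task.cancel()                # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        events.append("main saw CancelledError")
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(was_cancelled, events)
```

Re-raising CancelledError after cleanup matters: swallowing it would leave the task finished normally, and task.cancelled() would report False.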

Part 5 - asyncio.gather() - Fan-Out and Fan-In

asyncio.gather() is the primary tool for running multiple coroutines concurrently and collecting their results:

import asyncio


async def fetch(url: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Response from {url} (after {delay}s)"


async def main():
    # gather() starts all coroutines concurrently and waits for ALL to finish
    # Returns results in the SAME ORDER as the inputs - regardless of completion order
    results = await asyncio.gather(
        fetch("https://api.users.com", 1.0),
        fetch("https://api.products.com", 0.5),
        fetch("https://api.orders.com", 0.8),
    )

    for r in results:
        print(r)
    # Response from https://api.users.com (after 1.0s) - index 0, even though slowest
    # Response from https://api.products.com (after 0.5s) - index 1
    # Response from https://api.orders.com (after 0.8s) - index 2


asyncio.run(main())

Exception Handling in gather()

import asyncio


async def may_fail(n: int) -> int:
    await asyncio.sleep(0.1 * n)
    if n == 2:
        raise ValueError(f"task {n} failed")
    return n * 100


async def gather_default():
    """By default, gather() raises the FIRST exception to the caller.

    The other awaitables are not cancelled; they keep running in the background.
    """
    try:
        results = await asyncio.gather(
            may_fail(1),
            may_fail(2),  # raises ValueError
            may_fail(3),
        )
    except ValueError as e:
        print(f"gather default: caught {e}")


async def gather_return_exceptions():
    """return_exceptions=True: exceptions returned as values, not raised."""
    results = await asyncio.gather(
        may_fail(1),
        may_fail(2),
        may_fail(3),
        return_exceptions=True,
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"task {i}: FAILED - {result}")
        else:
            print(f"task {i}: OK - {result}")

    # task 0: OK - 100
    # task 1: FAILED - task 2 failed
    # task 2: OK - 300


asyncio.run(gather_default())
asyncio.run(gather_return_exceptions())

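:::tip Use return_exceptions=True When Tasks Are Independent
If you have a batch of API calls where one failure should not hide the results of the others, use return_exceptions=True. With the default, gather() raises the first exception to the caller, so the remaining results never reach it, even though the other awaitables are not cancelled and keep running. For fan-out operations the usual approach is to collect all results including errors, then inspect each one individually.
:::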
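One detail worth verifying for yourself: with the default settings, gather() propagates the first exception but does not cancel the other awaitables. The sketch below records which jobs have finished at the moment of failure and again a little later; job and finished are hypothetical names.

```python
import asyncio

finished: list[int] = []


async def job(n: int, fail: bool = False) -> int:
    await asyncio.sleep(0.05 * n)
    if fail:
        raise ValueError(f"job {n} failed")
    finished.append(n)
    return n


async def main() -> tuple[list[int], list[int]]:
    try:
        await asyncio.gather(job(1, fail=True), job(2), job(3))
    except ValueError:
        pass                    # raised as soon as job 1 fails
    at_failure = list(finished)
    await asyncio.sleep(0.2)    # jobs 2 and 3 are still running; let them finish
    return at_failure, list(finished)


at_failure, later = asyncio.run(main())
print(at_failure, later)  # [] [2, 3]
```

If the remaining work should stop on the first failure, the tasks have to be cancelled explicitly.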

Part 6 - asyncio.wait() and asyncio.wait_for()

asyncio.wait() - Fine-Grained Control

asyncio.wait() gives you more control than gather() - you can wait for the first completed task, handle results as they arrive, or apply per-batch timeouts:

import asyncio


async def job(n: int, delay: float) -> int:
    await asyncio.sleep(delay)
    return n


async def main():
    tasks = {
        asyncio.create_task(job(1, 0.5)),
        asyncio.create_task(job(2, 1.0)),
        asyncio.create_task(job(3, 1.5)),
        asyncio.create_task(job(4, 0.2)),
    }

    # FIRST_COMPLETED: returns after the first task finishes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    print(f"First done: {[t.result() for t in done]}")  # [4] - fastest job
    print(f"Still pending: {len(pending)}")  # 3

    # Wait for the rest with a timeout
    # Job 1 finishes well inside the window; job 2 finishes right at the
    # deadline (0.2s + 0.8s = 1.0s), so it may land in either set
    done2, pending2 = await asyncio.wait(pending, timeout=0.8)
    print(f"Done after extra 0.8s: {[t.result() for t in done2]}")

    # Cancel tasks that did not finish in time
    for task in pending2:
        task.cancel()


asyncio.run(main())

return_when options:

| Constant | Behavior |
|---|---|
| asyncio.FIRST_COMPLETED | Return as soon as any task finishes |
| asyncio.FIRST_EXCEPTION | Return when any task raises an exception |
| asyncio.ALL_COMPLETED | Return when all tasks are done (default) |
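For the "handle results as they arrive" case specifically, asyncio.as_completed() is often a lighter alternative to looping over asyncio.wait(). A minimal sketch, with job as a hypothetical coroutine:

```python
import asyncio


async def job(n: int, delay: float) -> int:
    await asyncio.sleep(delay)
    return n


async def main() -> list[int]:
    coros = [job(1, 0.15), job(2, 0.05), job(3, 0.10)]
    order = []
    # as_completed() yields awaitables in the order the jobs finish
    for next_done in asyncio.as_completed(coros):
        order.append(await next_done)
    return order


completion_order = asyncio.run(main())
print(completion_order)  # [2, 3, 1]
```

Unlike gather(), the results come back in completion order, so the caller has to carry any identifying information in the result itself.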

asyncio.wait_for() - Per-Task Timeout

import asyncio


async def slow_api_call() -> str:
    await asyncio.sleep(5.0)  # simulates a slow API
    return "response"


async def main():
    try:
        result = await asyncio.wait_for(slow_api_call(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("API call timed out after 2 seconds")


asyncio.run(main())

# Production pattern: timeout with exponential backoff retry
import asyncio


async def do_fetch(url: str) -> str:
    await asyncio.sleep(1.0)  # placeholder implementation
    return f"data from {url}"


async def fetch_with_retry(url: str, timeout: float, retries: int = 3) -> str:
    for attempt in range(1, retries + 1):
        try:
            return await asyncio.wait_for(do_fetch(url), timeout=timeout)
        except asyncio.TimeoutError:
            print(f"Attempt {attempt}/{retries} timed out")
            if attempt < retries:
                # exponential backoff: 0.5s, 1s, 2s, ...
                await asyncio.sleep(0.5 * 2 ** (attempt - 1))
    raise RuntimeError(f"All {retries} attempts to {url} failed")
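To watch a retry loop of this kind succeed, it helps to have an endpoint that fails predictably. The sketch below assumes a hypothetical flaky_fetch that hangs on its first two calls and answers on the third; the delays are shortened so it runs quickly, and fetch_with_backoff and base_delay are names made up for this example.

```python
import asyncio

attempts_seen: list[int] = []


async def flaky_fetch(url: str) -> str:
    """Hypothetical endpoint: hangs on the first two calls, then answers."""
    attempts_seen.append(len(attempts_seen) + 1)
    if len(attempts_seen) < 3:
        await asyncio.sleep(1.0)  # far longer than the timeout used below
    return f"data from {url}"


async def fetch_with_backoff(
    url: str, timeout: float, retries: int = 3, base_delay: float = 0.01
) -> str:
    for attempt in range(1, retries + 1):
        try:
            # wait_for() cancels the inner call when the timeout expires
            return await asyncio.wait_for(flaky_fetch(url), timeout=timeout)
        except asyncio.TimeoutError:
            if attempt < retries:
                # delay doubles on every retry: base, 2*base, 4*base, ...
                await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError(f"All {retries} attempts to {url} failed")


result = asyncio.run(fetch_with_backoff("https://api.example.com", timeout=0.05))
print(result, attempts_seen)  # data from https://api.example.com [1, 2, 3]
```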

Part 7 - Async Context Managers

Async context managers allow __aenter__ and __aexit__ to be coroutines, enabling resources that require async setup/teardown - database connections, HTTP sessions, file streams:

import asyncio


class AsyncDatabaseConnection:
    """A simulated async database connection."""

    def __init__(self, dsn: str) -> None:
        self.dsn = dsn
        self._conn = None

    async def __aenter__(self):
        print(f"Connecting to {self.dsn}...")
        await asyncio.sleep(0.1)  # simulates async connection handshake
        self._conn = {"dsn": self.dsn, "connected": True}
        print("Connected.")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection...")
        await asyncio.sleep(0.05)  # simulates async teardown
        self._conn = None
        print("Closed.")
        return False  # do not suppress exceptions

    async def query(self, sql: str) -> list:
        if not self._conn:
            raise RuntimeError("Not connected")
        await asyncio.sleep(0.1)  # simulates query execution time
        return [{"result": sql}]


async def main():
    async with AsyncDatabaseConnection("postgresql://localhost/mydb") as db:
        rows = await db.query("SELECT * FROM users LIMIT 10")
        print(f"Got {len(rows)} rows")
    # Connection is automatically closed here, even if an exception occurred


asyncio.run(main())

asynccontextmanager - The Decorator Shortcut

import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def managed_session(base_url: str):
    """Async context manager using the generator-based decorator."""
    print(f"Opening session for {base_url}")
    session = {"base_url": base_url, "active": True}
    try:
        yield session  # caller gets the session object here
    except Exception as exc:
        print(f"Session error: {exc}")
        raise
    finally:
        print("Closing session")
        session["active"] = False


async def main():
    async with managed_session("https://api.example.com") as session:
        print(f"Session active: {session['active']}")
        await asyncio.sleep(0.1)  # simulated request


asyncio.run(main())

Real-world async context managers you will encounter regularly:

  • aiohttp.ClientSession() - HTTP session with connection pooling
  • asyncpg.create_pool() - PostgreSQL async connection pool
  • aiofiles.open() - async file I/O
  • asyncio.timeout() (Python 3.11+) - timeout as a context manager

Part 8 - Async Iterators

Async iterators allow iteration over data sources that require async operations per item - paginated APIs, database cursors, message queues, WebSocket streams:

import asyncio


class AsyncPagedAPI:
    """Simulates a paginated API that requires an async request per page."""

    def __init__(self, total_pages: int) -> None:
        self.total_pages = total_pages
        self.current_page = 0

    def __aiter__(self):
        return self  # self is both the iterator and the iterable

    async def __anext__(self) -> list[dict]:
        if self.current_page >= self.total_pages:
            raise StopAsyncIteration  # signals end of iteration

        self.current_page += 1
        await asyncio.sleep(0.1)  # simulates API request per page
        items = [
            {"id": (self.current_page - 1) * 10 + i, "page": self.current_page}
            for i in range(10)
        ]
        return items


async def main():
    api = AsyncPagedAPI(total_pages=5)
    total_items = 0

    async for page in api:
        total_items += len(page)
        print(f"Received page with {len(page)} items (total so far: {total_items})")

    print(f"Done. Total items: {total_items}")  # 50


asyncio.run(main())

Async Generator Functions - The Simpler Way

import asyncio


async def stream_events(source_url: str):
    """Async generator that yields events from a WebSocket-style stream."""
    for event_id in range(10):
        await asyncio.sleep(0.05)  # simulates waiting for the next event
        yield {"event_id": event_id, "source": source_url, "data": f"event-{event_id}"}


async def main():
    async for event in stream_events("wss://stream.example.com/events"):
        print(f"Received: {event['event_id']} - {event['data']}")


asyncio.run(main())

Part 9 - Blocking the Event Loop

This is the cardinal sin of asyncio. The event loop is single-threaded. If any coroutine calls a blocking function - one that holds the thread without yielding - every other coroutine freezes until it returns.

time.sleep() vs asyncio.sleep()

import asyncio
import time


async def bad_worker(name: str, delay: float):
    """BAD: time.sleep() blocks the entire event loop."""
    print(f"[{name}] starting")
    time.sleep(delay)  # BLOCKS the thread - nothing else can run
    print(f"[{name}] done")


async def good_worker(name: str, delay: float):
    """GOOD: asyncio.sleep() yields control to the event loop."""
    print(f"[{name}] starting")
    await asyncio.sleep(delay)  # suspends; event loop runs other tasks
    print(f"[{name}] done")


async def demo_bad():
    print("--- BAD (blocking) ---")
    start = time.perf_counter()
    await asyncio.gather(
        bad_worker("A", 1.0),
        bad_worker("B", 1.0),
        bad_worker("C", 1.0),
    )
    print(f"Elapsed: {time.perf_counter() - start:.1f}s")  # ~3.0s - sequential!


async def demo_good():
    print("--- GOOD (async) ---")
    start = time.perf_counter()
    await asyncio.gather(
        good_worker("A", 1.0),
        good_worker("B", 1.0),
        good_worker("C", 1.0),
    )
    print(f"Elapsed: {time.perf_counter() - start:.1f}s")  # ~1.0s - concurrent!


asyncio.run(demo_bad())
asyncio.run(demo_good())

:::danger Never Call Blocking Functions in Async Code
time.sleep(), requests.get(), open().read() on large files, any synchronous database driver, subprocess.run() - these block the thread. The event loop stops. All other tasks freeze. Your server stops accepting new connections. Latency spikes for every user. This is one of the most common asyncio mistakes in production systems.

# WRONG - blocks the event loop while waiting for the database
async def get_user(user_id: int) -> dict:
    import psycopg2  # synchronous driver
    import psycopg2.extras
    conn = psycopg2.connect("postgresql://localhost/db")
    cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))  # blocks here
    return dict(cursor.fetchone())

# CORRECT - use an async driver that yields to the event loop
async def get_user(user_id: int) -> dict:
    # pool is assumed to be an asyncpg pool created elsewhere
    async with pool.acquire() as conn:  # asyncpg or SQLAlchemy async
        row = await conn.fetchrow(
            "SELECT * FROM users WHERE id = $1", user_id
        )
        return dict(row)

:::
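One way to notice a blocked loop is to measure how late a periodic task wakes up. The sketch below is a rough diagnostic rather than a production monitor; measure_lag, blocker and polite are hypothetical names, and the delays are chosen only to make the effect visible.

```python
import asyncio
import time


async def measure_lag(interval: float, samples: int) -> float:
    """Return the worst delay beyond `interval` between wake-ups."""
    worst = 0.0
    for _ in range(samples):
        start = time.perf_counter()
        await asyncio.sleep(interval)
        worst = max(worst, time.perf_counter() - start - interval)
    return worst


async def blocker() -> None:
    await asyncio.sleep(0.02)
    time.sleep(0.2)            # deliberately blocks the event loop


async def polite() -> None:
    await asyncio.sleep(0.02)
    await asyncio.sleep(0.2)   # same duration, but yields to the loop


async def run_with(work) -> float:
    lag, _ = await asyncio.gather(measure_lag(0.01, 10), work())
    return lag


lag_blocking = asyncio.run(run_with(blocker))
lag_polite = asyncio.run(run_with(polite))
print(f"worst lag with blocking call: {lag_blocking * 1000:.0f}ms")
print(f"worst lag with awaited sleep: {lag_polite * 1000:.0f}ms")
```

asyncio's debug mode (asyncio.run(main(), debug=True)) offers a built-in variant of this check by logging callbacks that run for too long.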

Running Synchronous Code Safely - loop.run_in_executor()

When you must call a blocking function - a legacy library, a CPU-intensive task, a file operation without async support - offload it to a thread or process pool with run_in_executor():

import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import requests # synchronous - safe only when run via executor


def sync_http_get(url: str) -> str:
    """Synchronous HTTP request - blocks its thread, not the event loop."""
    response = requests.get(url, timeout=10)
    return response.text


def cpu_heavy(n: int) -> int:
    """CPU-bound synchronous computation."""
    return sum(i * i for i in range(n))


async def main():
    loop = asyncio.get_running_loop()

    # Strategy 1: ThreadPoolExecutor - for I/O-bound blocking calls
    with ThreadPoolExecutor(max_workers=10) as thread_pool:
        result = await loop.run_in_executor(
            thread_pool,
            sync_http_get,
            "https://httpbin.org/get"
        )
        print(f"HTTP result length: {len(result)}")

    # Strategy 2: ProcessPoolExecutor - for CPU-bound blocking calls
    with ProcessPoolExecutor(max_workers=4) as process_pool:
        result = await loop.run_in_executor(
            process_pool,
            cpu_heavy,
            1_000_000
        )
        print(f"CPU result: {result}")

    # Strategy 3: Default thread pool (None = use loop's built-in ThreadPoolExecutor)
    await loop.run_in_executor(None, time.sleep, 0.1)
    print("sleep done via default thread pool")


# The guard is required with ProcessPoolExecutor on platforms that spawn
# worker processes (Windows, macOS): workers re-import this module
if __name__ == "__main__":
    asyncio.run(main())

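Decision tree - how to call synchronous code from async:

  • An async library exists for the job (aiohttp, asyncpg, aiofiles) - use it and await it directly
  • The call blocks on I/O and has no async equivalent - run_in_executor() with a ThreadPoolExecutor (or None for the default pool)
  • The call is CPU-bound - run_in_executor() with a ProcessPoolExecutor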
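For the common case of pushing a blocking call onto the default thread pool, Python 3.9+ also offers asyncio.to_thread(), a shorter spelling of that pattern. A minimal sketch, with blocking_io as a hypothetical stand-in for a legacy synchronous function:

```python
import asyncio
import threading
import time


def blocking_io(label: str) -> str:
    """Hypothetical blocking call; time.sleep() stands in for slow I/O."""
    time.sleep(0.1)
    return f"{label} ran in {threading.current_thread().name}"


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # Each call runs in a worker thread, so the three waits overlap
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, "a"),
        asyncio.to_thread(blocking_io, "b"),
        asyncio.to_thread(blocking_io, "c"),
    )
    return list(results), time.perf_counter() - start


results, elapsed = asyncio.run(main())
for line in results:
    print(line)
print(f"elapsed: {elapsed:.2f}s")
```

Because these are threads, this helps with blocking I/O; CPU-bound pure-Python work still contends for the GIL and is better sent to a process pool.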

Full Example - Async News Aggregator

A production-quality async news aggregator that fetches headlines from multiple APIs concurrently, applies per-source timeouts, handles failures gracefully, and deduplicates results:

"""
async_news_aggregator.py - Fetch news from multiple sources concurrently.

Demonstrates: asyncio.gather, asyncio.timeout, async context managers,
async generators, and loop.run_in_executor for CPU-bound parsing.
"""
from __future__ import annotations

import asyncio
import hashlib
import time
from dataclasses import dataclass, field
from typing import AsyncIterator

try:
    import aiohttp
except ImportError:
    raise SystemExit("Install aiohttp: pip install aiohttp")


# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------

@dataclass
class Article:
    title: str
    url: str
    source: str
    description: str = ""

    @property
    def fingerprint(self) -> str:
        """Content hash for deduplication - identical articles across sources."""
        content = f"{self.title.lower().strip()}{self.url.lower().strip()}"
        return hashlib.md5(content.encode()).hexdigest()


@dataclass
class FetchResult:
    source: str
    articles: list[Article] = field(default_factory=list)
    error: str | None = None
    elapsed_ms: float = 0.0

    @property
    def success(self) -> bool:
        return self.error is None


# ---------------------------------------------------------------------------
# Source definitions
# ---------------------------------------------------------------------------

NEWS_SOURCES = [
    {
        "name": "HackerNews",
        "url": "https://hacker-news.firebaseio.com/v0/topstories.json",
        "parser": "hackernews",
        "timeout": 5.0,
    },
    {
        "name": "DEV.to",
        "url": "https://dev.to/api/articles?top=10&per_page=10",
        "parser": "devto",
        "timeout": 4.0,
    },
    {
        "name": "GitHub",
        "url": "https://api.github.com/search/repositories?q=stars:>1000&sort=stars&per_page=10",
        "parser": "github",
        "timeout": 6.0,
    },
]


# ---------------------------------------------------------------------------
# Parsers - pure synchronous; offloaded to thread pool for large payloads
# ---------------------------------------------------------------------------

def parse_hackernews(data: list) -> list[Article]:
    return [
        Article(
            title=f"HN Story #{story_id}",
            url=f"https://news.ycombinator.com/item?id={story_id}",
            source="HackerNews",
        )
        for story_id in (data[:10] if isinstance(data, list) else [])
    ]


def parse_devto(data: list) -> list[Article]:
    return [
        Article(
            title=item.get("title", ""),
            url=item.get("url", ""),
            source="DEV.to",
            description=item.get("description", ""),
        )
        for item in (data[:10] if isinstance(data, list) else [])
    ]


def parse_github(data: dict) -> list[Article]:
    return [
        Article(
            title=repo.get("full_name", ""),
            url=repo.get("html_url", ""),
            source="GitHub",
            description=repo.get("description", "") or "",
        )
        for repo in data.get("items", [])[:10]
    ]


PARSERS = {
    "hackernews": parse_hackernews,
    "devto": parse_devto,
    "github": parse_github,
}


# ---------------------------------------------------------------------------
# Async fetcher - one coroutine per source, all exceptions contained
# ---------------------------------------------------------------------------

async def fetch_source(
    session: aiohttp.ClientSession,
    source: dict,
    loop: asyncio.AbstractEventLoop,
) -> FetchResult:
    """Fetch one news source with a per-source timeout. Never raises."""
    name = source["name"]
    start = time.perf_counter()

    try:
        async with asyncio.timeout(source["timeout"]):
            async with session.get(source["url"]) as resp:
                resp.raise_for_status()
                raw = await resp.json(content_type=None)

        # raw is already-decoded JSON; build the Article objects in the default
        # thread pool so a large payload does not stall the event loop for long
        parser_fn = PARSERS[source["parser"]]
        articles = await loop.run_in_executor(None, parser_fn, raw)

        elapsed_ms = (time.perf_counter() - start) * 1000
        return FetchResult(source=name, articles=articles, elapsed_ms=elapsed_ms)

    except asyncio.TimeoutError:
        elapsed_ms = (time.perf_counter() - start) * 1000
        return FetchResult(
            source=name,
            error=f"Timed out after {source['timeout']}s",
            elapsed_ms=elapsed_ms,
        )
    except aiohttp.ClientError as exc:
        elapsed_ms = (time.perf_counter() - start) * 1000
        return FetchResult(source=name, error=str(exc), elapsed_ms=elapsed_ms)
    except Exception as exc:
        elapsed_ms = (time.perf_counter() - start) * 1000
        return FetchResult(source=name, error=f"Unexpected: {exc}", elapsed_ms=elapsed_ms)


# ---------------------------------------------------------------------------
# Async generator - lazy deduplication with cooperative yielding
# ---------------------------------------------------------------------------

async def stream_deduplicated(results: list[FetchResult]) -> AsyncIterator[Article]:
    """Yield deduplicated articles across all successful sources."""
    seen: set[str] = set()
    successful = sorted(
        (r for r in results if r.success),
        key=lambda r: r.source
    )
    for result in successful:
        for article in result.articles:
            fp = article.fingerprint
            if fp not in seen:
                seen.add(fp)
                yield article
                await asyncio.sleep(0)  # cooperative yield - keeps event loop responsive


# ---------------------------------------------------------------------------
# Main orchestrator
# ---------------------------------------------------------------------------

async def aggregate_news() -> None:
    loop = asyncio.get_running_loop()
    wall_start = time.perf_counter()

    connector = aiohttp.TCPConnector(limit=20, ttl_dns_cache=300)
    session_timeout = aiohttp.ClientTimeout(total=30)

    async with aiohttp.ClientSession(connector=connector, timeout=session_timeout) as session:
        print(f"Fetching from {len(NEWS_SOURCES)} sources concurrently...\n")

        # Fan-out: all sources fetch simultaneously
        # fetch_source handles all exceptions internally - no need for return_exceptions
        results: list[FetchResult] = await asyncio.gather(
            *[fetch_source(session, src, loop) for src in NEWS_SOURCES],
        )

    # Per-source report
    print("=== Source Results ===")
    total_articles = 0
    for result in results:
        if result.success:
            status = f"OK - {len(result.articles)} articles"
        else:
            status = f"FAILED - {result.error}"
        print(f" {result.source:20s} {result.elapsed_ms:6.0f}ms {status}")
        total_articles += len(result.articles)

    # Fan-in: deduplicate and stream the best articles
    print(f"\n=== Top Articles (deduplicated from {total_articles} total) ===")
    count = 0
    async for article in stream_deduplicated(results):
        count += 1
        print(f" [{count:2d}] [{article.source:12s}] {article.title[:60]}")
        if count >= 15:
            break

    wall_elapsed = time.perf_counter() - wall_start
    print(f"\nTotal wall time: {wall_elapsed:.2f}s")


if __name__ == "__main__":
    asyncio.run(aggregate_news())

Design decisions in this example:

  • asyncio.gather() fans out to all sources simultaneously - total time equals the slowest source, not the sum
  • asyncio.timeout() (Python 3.11+) applies per-source deadlines inside fetch_source - each source gets its own independent timeout
  • fetch_source returns FetchResult for all outcomes including errors - one failing source does not affect the others
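  • loop.run_in_executor(None, parser_fn, raw) offloads the conversion of the decoded JSON into Article objects to the default thread pool - the event loop stays responsive while it runs, though pure-Python work in a thread still competes for the GIL, so genuinely heavy parsing belongs in a ProcessPoolExecutor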
  • stream_deduplicated is an async generator with await asyncio.sleep(0) after each yield - cooperatively gives the event loop a chance to process other events between articles
  • aiohttp.TCPConnector(limit=20) caps open TCP connections - prevents overwhelming remote servers and exhausting local file descriptors

Graded Practice Challenges

Beginner - Predict the Output

import asyncio

async def say(message: str, delay: float) -> str:
    await asyncio.sleep(delay)
    print(message)
    return message

async def main():
    # Version A - sequential
    await say("hello", 0.2)
    await say("world", 0.1)
    print("A done")

    # Version B - concurrent
    result = await asyncio.gather(
        say("hello", 0.2),
        say("world", 0.1),
    )
    print("B done:", result)

asyncio.run(main())

What is the exact output, and in what order do the prints appear for each version?


Version A output - sequential (total ~0.3s):

hello <- after 0.2s
world <- after 0.3s total (0.2 + 0.1)
A done

Version A uses two sequential await calls. say("hello", 0.2) runs and completes first. Then say("world", 0.1) runs. Total: 0.3s.

Version B output - concurrent (total ~0.2s):

world <- after 0.1s (finishes first)
hello <- after 0.2s (finishes second)
B done: ['hello', 'world']

Version B uses gather(). Both coroutines start simultaneously. world (delay 0.1s) finishes first and prints first. hello (delay 0.2s) finishes second and prints second.

Key insight: gather() returns results in input order - ['hello', 'world'], not ['world', 'hello'] - even though world completed first. The print side-effect happens in completion order, but the return value preserves input order.

Intermediate - Fix the Bugs

This async web scraper has four bugs. Find and fix each one:

import asyncio
import time
import aiohttp

URLS = [f"https://httpbin.org/delay/{i}" for i in range(1, 6)]

async def fetch(session, url):
    # Bug 1: synchronous sleep blocks the event loop
    time.sleep(0.1)
    async with session.get(url) as resp:
        return await resp.text()

async def process(text):
    # Bug 2: CPU-heavy loop blocks the event loop
    result = 0
    for i in range(5_000_000):
        result += i
    return result

async def main():
    async with aiohttp.ClientSession() as session:
        # Bug 3: sequential fetching wastes asyncio concurrency
        results = []
        for url in URLS:
            text = await fetch(session, url)
            results.append(text)

    # Bug 4: coroutine not awaited - silent bug, no result computed
    processed = process(results[0])
    print(f"processed: {processed}")

asyncio.run(main())

Bug 1 - time.sleep() blocks the event loop:

async def fetch(session, url):
    await asyncio.sleep(0.1)  # FIXED: cooperative yield
    async with session.get(url) as resp:
        return await resp.text()

Bug 2 - CPU-bound work should be offloaded to an executor:

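def _process_sync(text: str) -> int:
    """Module-level sync function - safe to run in a thread."""
    result = 0
    for i in range(5_000_000):
        result += i
    return result

async def process(text: str) -> int:
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor: the event loop stays
    # responsive, but a pure-Python loop still competes for the GIL. Pass a
    # ProcessPoolExecutor instead of None for true CPU parallelism.
    return await loop.run_in_executor(None, _process_sync, text)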
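The effect of offloading can be observed directly. The sketch below is an illustration under stated assumptions, not part of the scraper: blocking_work uses time.sleep as a stand-in for any blocking call, and heartbeat and ticks_during_call are hypothetical helpers that count how often a background task gets to run while the call is in progress.

```python
import asyncio
import contextlib
import time


def blocking_work(seconds: float) -> str:
    time.sleep(seconds)  # stand-in for any blocking call
    return "done"


async def heartbeat(ticks: list) -> None:
    # Records a timestamp roughly every 10 ms whenever the event loop is free
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def ticks_during_call(offload: bool) -> int:
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    before = len(ticks)

    if offload:
        loop = asyncio.get_running_loop()
        # Runs in the default thread pool; the event loop keeps running
        await loop.run_in_executor(None, blocking_work, 0.2)
    else:
        # Runs on the event loop thread; nothing else can run meanwhile
        blocking_work(0.2)

    during = len(ticks) - before
    beat.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await beat
    return during


if __name__ == "__main__":
    print("ticks while blocking inline:", asyncio.run(ticks_during_call(False)))
    print("ticks while offloaded:      ", asyncio.run(ticks_during_call(True)))
```

The inline call should report zero heartbeat ticks, while the offloaded call lets the heartbeat keep firing for the full 0.2 seconds.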

Bug 3 - Sequential fetching throws away asyncio's benefit:

async def main():
    async with aiohttp.ClientSession() as session:
        # FIXED: fan-out all URLs at once
        results = await asyncio.gather(
            *[fetch(session, url) for url in URLS]
        )

Bug 4 - Missing await on the coroutine call:

# FIXED: await the coroutine so it actually runs
processed = await process(results[0])
print(f"processed: {processed}")
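A short sketch of what the missing await actually produces. Here process is simplified to a hypothetical stand-in that returns the text length, and inspect_unawaited is an illustrative helper name.

```python
import asyncio


async def process(text: str) -> int:
    return len(text)


async def inspect_unawaited():
    pending = process("abc")  # no await: the body has not run yet
    is_coroutine = asyncio.iscoroutine(pending)
    # Close it explicitly so Python does not warn that it was never awaited
    pending.close()

    result = await process("abc")  # with await: the body runs and returns
    return is_coroutine, result


if __name__ == "__main__":
    print(asyncio.run(inspect_unawaited()))  # (True, 3)
```

In the buggy scraper the un-awaited object is simply formatted into the f-string, and Python later reports a RuntimeWarning that the coroutine was never awaited.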

Advanced - Design Challenge

Design an AsyncRateLimitedFetcher class that:

  1. Accepts max_rps (max requests per second) and max_concurrent (max simultaneous in-flight requests)
  2. Has an async fetch(url) method returning the response body as a string
  3. Enforces the rate limit using asyncio.Semaphore for concurrency and a sliding-window approach for RPS
  4. Collects metrics: total requests, succeeded, failed, average latency
  5. Works as an async context manager

Example usage:

async with AsyncRateLimitedFetcher(max_rps=10, max_concurrent=5) as fetcher:
    results = await asyncio.gather(*[fetcher.fetch(url) for url in urls])
    print(fetcher.metrics())

Reference solution:

from __future__ import annotations

import asyncio
import time
from collections import deque
from dataclasses import dataclass

try:
    import aiohttp
except ImportError:
    raise SystemExit("pip install aiohttp")


@dataclass
class FetchMetrics:
    total: int = 0
    succeeded: int = 0
    failed: int = 0
    total_latency_ms: float = 0.0

    @property
    def avg_latency_ms(self) -> float:
        return self.total_latency_ms / self.succeeded if self.succeeded else 0.0

    def __repr__(self) -> str:
        return (
            f"FetchMetrics(total={self.total}, succeeded={self.succeeded}, "
            f"failed={self.failed}, avg_latency={self.avg_latency_ms:.1f}ms)"
        )


class AsyncRateLimitedFetcher:
    """
    Async HTTP fetcher with concurrency cap and RPS rate limiting.

    - asyncio.Semaphore caps the number of in-flight requests
    - Sliding-window algorithm enforces max requests per second
    - Built-in metrics for observability
    - Async context manager for clean session lifecycle
    """

    def __init__(self, max_rps: float = 10.0, max_concurrent: int = 5) -> None:
        self.max_rps = max_rps
        self.max_concurrent = max_concurrent

        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._request_times: deque[float] = deque()  # sliding window timestamps
        self._rps_lock = asyncio.Lock()
        self._metrics = FetchMetrics()
        self._session: aiohttp.ClientSession | None = None

    async def __aenter__(self) -> "AsyncRateLimitedFetcher":
        connector = aiohttp.TCPConnector(limit=self.max_concurrent)
        self._session = aiohttp.ClientSession(connector=connector)
        return self

    async def __aexit__(self, *exc) -> bool:
        if self._session:
            await self._session.close()
        return False

    async def _wait_for_rate_limit(self) -> None:
        """
        Sliding window rate limiter.
        Blocks (cooperatively) until making a new request stays within max_rps.
        """
        async with self._rps_lock:
            now = time.monotonic()

            # Evict timestamps older than 1 second
            while self._request_times and now - self._request_times[0] > 1.0:
                self._request_times.popleft()

            # If at the cap, sleep until the oldest timestamp exits the window
            if len(self._request_times) >= self.max_rps:
                sleep_time = 1.0 - (now - self._request_times[0])
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)

            self._request_times.append(time.monotonic())

    async def fetch(self, url: str) -> str:
        """
        Fetch a URL. Respects rate and concurrency limits.
        Returns response body as str. Raises on HTTP or network error.
        """
        if not self._session:
            raise RuntimeError("Use 'async with AsyncRateLimitedFetcher() as fetcher'")

        await self._wait_for_rate_limit()

        self._metrics.total += 1
        # The clock starts before the semaphore is acquired, so the recorded
        # latency includes any time spent waiting for a free slot
        start = time.perf_counter()

        async with self._semaphore:  # waits cooperatively if max_concurrent requests are in-flight
            try:
                async with self._session.get(
                    url,
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    resp.raise_for_status()
                    body = await resp.text()

                latency_ms = (time.perf_counter() - start) * 1000
                self._metrics.succeeded += 1
                self._metrics.total_latency_ms += latency_ms
                return body

            except Exception:
                self._metrics.failed += 1
                raise

    def metrics(self) -> FetchMetrics:
        """Return the live metrics object (not a copy)."""
        return self._metrics


# Demo
async def demo() -> None:
    urls = [f"https://httpbin.org/get?id={i}" for i in range(20)]

    async with AsyncRateLimitedFetcher(max_rps=5.0, max_concurrent=3) as fetcher:
        print(f"Fetching {len(urls)} URLs (max 5 RPS, max 3 concurrent)...")
        start = time.perf_counter()

        results = await asyncio.gather(
            *[fetcher.fetch(url) for url in urls],
            return_exceptions=True,
        )

        elapsed = time.perf_counter() - start
        succeeded = sum(1 for r in results if not isinstance(r, Exception))
        print(f"Done in {elapsed:.2f}s")
        print(f"Succeeded: {succeeded}/{len(urls)}")
        print(f"Metrics: {fetcher.metrics()}")


if __name__ == "__main__":
    asyncio.run(demo())

Key design decisions:

  • asyncio.Semaphore(max_concurrent) caps in-flight requests without blocking the event loop - async with self._semaphore releases automatically when the request finishes or raises
  • Sliding window uses a deque of timestamps and an asyncio.Lock - only one coroutine adjusts the window at a time; others queue cooperatively behind the lock
  • __aenter__ creates the aiohttp.ClientSession inside the running event loop - sessions must not be created at module level or before asyncio.run()
  • Exceptions increment failed and re-raise - the caller decides whether to swallow or propagate; return_exceptions=True in the demo collects them without crashing
  • metrics() returns the live FetchMetrics object rather than a copy - every update happens on the event loop thread, so coroutines need no lock; if another thread reads the metrics, or you need a frozen snapshot, copy the values first (under a lock in the multi-thread case)
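The sliding-window idea can also be exercised without any network access. The following is a minimal sketch, not the reference solution itself: SlidingWindowLimiter and timed_run are hypothetical names, the window parameter is an added assumption so the demo finishes quickly, and this variant re-checks the window after every sleep instead of appending unconditionally.

```python
import asyncio
import time
from collections import deque


class SlidingWindowLimiter:
    """Hypothetical helper: allow at most max_calls per `window` seconds."""

    def __init__(self, max_calls: int, window: float = 1.0) -> None:
        self.max_calls = max_calls
        self.window = window
        self._stamps = deque()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:  # one coroutine adjusts the window at a time
            while True:
                now = time.monotonic()
                # Evict timestamps that have left the window
                while self._stamps and now - self._stamps[0] >= self.window:
                    self._stamps.popleft()
                if len(self._stamps) < self.max_calls:
                    self._stamps.append(now)
                    return
                # Sleep until the oldest timestamp expires, then re-check
                await asyncio.sleep(self.window - (now - self._stamps[0]))


async def timed_run(n_calls: int, max_calls: int, window: float) -> float:
    limiter = SlidingWindowLimiter(max_calls, window)
    start = time.monotonic()
    await asyncio.gather(*(limiter.acquire() for _ in range(n_calls)))
    return time.monotonic() - start


if __name__ == "__main__":
    # 6 calls at 2 per 0.1 s need three windows, so at least ~0.2 s in total
    elapsed = asyncio.run(timed_run(n_calls=6, max_calls=2, window=0.1))
    print(f"6 calls, 2 per 0.1s window: {elapsed:.2f}s")
```

Holding the lock while sleeping is deliberate in this sketch: later callers queue behind the sleeper, which keeps admissions in arrival order at the cost of serialising the wait.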

Key Takeaways

  • asyncio is cooperative concurrency on a single thread - coroutines take turns; no two coroutines ever execute at the same instant, and control passes from one to another only at an await, never by preemption
  • async def creates a coroutine factory - calling it returns a coroutine object, not a result; the function body does not execute until the coroutine is awaited or scheduled as a Task; forgetting await is an easy-to-miss bug, because Python reports it only as a RuntimeWarning ("coroutine ... was never awaited")
  • await expr suspends the current coroutine whenever expr has to wait and yields control to the event loop; the coroutine resumes when expr completes - this is how thousands of concurrent I/O operations fit on one thread
  • Bare await is always sequential; use asyncio.create_task() before awaiting to schedule coroutines for concurrent execution
  • asyncio.gather() is the primary fan-out tool - starts all coroutines concurrently, returns results in input order regardless of completion order; use return_exceptions=True when tasks are independent
  • asyncio.wait_for(coro, timeout=N) cancels the coroutine and raises asyncio.TimeoutError (an alias of the built-in TimeoutError since Python 3.11) if it does not complete within N seconds - essential for production reliability
  • Async context managers (async with) and async iterators (async for) integrate await into resource management and iteration - __aenter__/__aexit__ and __aiter__/__anext__ are the underlying protocols
  • Calling any blocking function - time.sleep, requests.get, synchronous DB drivers, subprocess.run - in async code freezes the entire event loop until the call returns; all other coroutines halt
  • Use loop.run_in_executor() with ThreadPoolExecutor for I/O-bound blocking code, and with ProcessPoolExecutor for CPU-bound blocking code - this runs the blocking call off the event loop thread while the event loop continues serving other coroutines
  • asyncio scales to tens of thousands of concurrent tasks on a single thread, but provides no CPU parallelism - for CPU-bound work, combine asyncio with ProcessPoolExecutor via run_in_executor()
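Two of the takeaways above, bare await versus create_task() and wait_for() timeouts, can be checked with a short timing sketch. The helper names work, sequential, concurrent and with_timeout are illustrative, and asyncio.sleep stands in for real I/O.

```python
import asyncio
import time


async def work(delay: float) -> float:
    await asyncio.sleep(delay)  # stand-in for an I/O wait
    return delay


async def sequential() -> float:
    start = time.perf_counter()
    await work(0.1)  # the second call does not start until the first finishes
    await work(0.1)
    return time.perf_counter() - start


async def concurrent() -> float:
    start = time.perf_counter()
    first = asyncio.create_task(work(0.1))   # scheduled immediately
    second = asyncio.create_task(work(0.1))  # scheduled immediately
    await first
    await second
    return time.perf_counter() - start


async def with_timeout() -> str:
    try:
        await asyncio.wait_for(work(1.0), timeout=0.05)
        return "finished"
    except asyncio.TimeoutError:
        return "timed out"


if __name__ == "__main__":
    print(f"sequential: {asyncio.run(sequential()):.2f}s")  # roughly 0.2s
    print(f"concurrent: {asyncio.run(concurrent()):.2f}s")  # roughly 0.1s
    print(asyncio.run(with_timeout()))                      # timed out
```

The concurrent version finishes in about the time of one sleep because both tasks are already running before the first await.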

What's Next

Lesson 04 - The Event Loop dives into what the event loop actually is under the hood - the selectors module, how asyncio.sleep() integrates with the OS's I/O polling (epoll on Linux, kqueue on macOS, IOCP on Windows), how Future objects connect I/O completion to coroutine resumption, and how to write custom event loop integrations. Understanding the event loop at this depth explains every asyncio behaviour you have encountered in this lesson.

© 2026 EngineersOfAI. All rights reserved.