Async Generators and Async Iterators
Before reading any explanation, predict the output of this program:
```python
import asyncio

async def stream():
    for i in range(3):
        await asyncio.sleep(0)
        yield i

async def main():
    gen = stream()
    print(type(gen))
    value = await gen.__anext__()
    print(f"First: {value}")
    async for v in gen:
        print(f"Loop: {v}")
    # What happens here?
    async for v in gen:
        print(f"Second loop: {v}")

asyncio.run(main())
```
If you expected the second async for loop to produce values, look again. Like synchronous generators, async generators are single-use iterators: once exhausted, they yield nothing on later iterations. The type(gen) is <class 'async_generator'>, which is a different type from regular generators. The first __anext__() call consumes the value 0, so the first async for loop picks up at 1 and 2, and the second loop produces no output.

```
# Output:
# <class 'async_generator'>
# First: 0
# Loop: 1
# Loop: 2
```
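The following side-by-side sketch (the function names are illustrative) shows that ordinary generators are single-use in exactly the same way. Each generator object is exhausted after one pass, and calling the generator function again is what gives you a fresh iterator.

```python
import asyncio

def sync_stream():
    for i in range(3):
        yield i

async def async_stream():
    for i in range(3):
        await asyncio.sleep(0)
        yield i

async def compare():
    sgen = sync_stream()
    sync_passes = (list(sgen), list(sgen))  # The second pass is empty

    agen = async_stream()
    first = [v async for v in agen]
    second = [v async for v in agen]  # Also empty: this object is exhausted

    fresh = [v async for v in async_stream()]  # New object, new iteration
    return sync_passes, (first, second), fresh

print(asyncio.run(compare()))
# (([0, 1, 2], []), ([0, 1, 2], []), [0, 1, 2])
```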
This lesson takes you deep into the async iteration protocol -- the machinery that powers streaming data in modern Python applications.
What You Will Learn
- The __aiter__/__anext__ protocol and how it differs from synchronous iteration
- Writing async generators with async def + yield
- Async comprehensions (async for in list/set/dict comprehensions)
- Async generator finalization and the aclose()/athrow() methods
- Building streaming data pipelines that process items as they arrive
- Real-world patterns: streaming API responses, database cursor iteration, file streaming
Prerequisites
- Solid understanding of synchronous generators and iterators (yield, __iter__, __next__)
- Comfort with async def, await, asyncio.run()
- Familiarity with the event loop from the Intermediate course
- Understanding of async for syntax at a basic level
Part 1 -- The Async Iteration Protocol
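Synchronous iteration relies on two dunder methods: __iter__() returns the iterator, and __next__() returns the next value or raises StopIteration. The async counterpart mirrors this with one twist. __aiter__() is still a regular method that returns the iterator, but __anext__() is a coroutine (strictly, it must return an awaitable) that produces the next value or raises StopAsyncIteration.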
```python
import asyncio

class AsyncCounter:
    """An async iterator that counts from start to stop."""

    def __init__(self, start: int, stop: int):
        self.current = start
        self.stop = stop

    def __aiter__(self):
        # Returns self -- just like synchronous iterators.
        # This method is NOT a coroutine. It returns the iterator object.
        return self

    async def __anext__(self):
        # This IS a coroutine. It can await other coroutines.
        if self.current >= self.stop:
            raise StopAsyncIteration  # Not StopIteration!
        value = self.current
        self.current += 1
        # Simulate async work (database query, network call, etc.)
        await asyncio.sleep(0.01)
        return value

async def main():
    counter = AsyncCounter(0, 5)
    async for value in counter:
        print(value, end=" ")
    # Output: 0 1 2 3 4

asyncio.run(main())
```
:::danger Critical Distinction
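__aiter__ is a regular method that returns the iterator object. It must not be a coroutine. __anext__ must return an awaitable, which is why it is normally written with async def. StopAsyncIteration terminates the loop, not StopIteration. Mixing these up produces errors that do not point straight at the mistake. An async def __aiter__ causes a TypeError when the loop starts, and a StopIteration raised inside a coroutine is converted into a RuntimeError.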
:::
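You can reproduce both failure modes with two deliberately broken classes (hypothetical names, written only for this demonstration). The helper reports the type of exception that escapes an async for loop. For the second case, Python also emits a "coroutine ... was never awaited" RuntimeWarning, because the coroutine returned by __aiter__ is never awaited.

```python
import asyncio

class RaisesStopIteration:
    def __aiter__(self):
        return self

    async def __anext__(self):
        raise StopIteration  # Wrong: async iterators must raise StopAsyncIteration

class CoroutineAiter:
    async def __aiter__(self):  # Wrong: __aiter__ must be a regular method
        return self

    async def __anext__(self):
        raise StopAsyncIteration

async def failure_of(obj) -> str:
    """Run an async for loop over obj and name the exception it raises."""
    try:
        async for _ in obj:
            pass
    except (TypeError, RuntimeError) as exc:
        return type(exc).__name__
    return "no error"

async def main():
    print(await failure_of(RaisesStopIteration()))  # RuntimeError
    print(await failure_of(CoroutineAiter()))       # TypeError

asyncio.run(main())
```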
The async for statement desugars to:
```python
# async for item in iterable:
#     process(item)
# Desugars to:
_iter = aiter(iterable)  # calls iterable.__aiter__()
while True:
    try:
        item = await anext(_iter)  # awaits _iter.__anext__()
    except StopAsyncIteration:
        break
    process(item)
```
Python 3.10+ added the built-in aiter() and anext() functions, which mirror iter() and next() for async iterators:
```python
async def main():
    counter = AsyncCounter(0, 3)
    it = aiter(counter)
    print(await anext(it))      # 0
    print(await anext(it))      # 1
    print(await anext(it, -1))  # 2
    print(await anext(it, -1))  # -1 (default, exhausted)

asyncio.run(main())
```
Separating Iterable from Iterator
Just as with synchronous code, you can separate the iterable (the container) from the iterator (the cursor). This allows multiple independent iterations over the same data source.
```python
import asyncio

class AsyncRange:
    """Async iterable (not iterator) -- supports multiple iteration."""

    def __init__(self, stop: int):
        self.stop = stop

    def __aiter__(self):
        # Return a NEW iterator each time
        return AsyncRangeIterator(self.stop)

class AsyncRangeIterator:
    """The actual async iterator with cursor state."""

    def __init__(self, stop: int):
        self.current = 0
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        value = self.current
        self.current += 1
        await asyncio.sleep(0)
        return value

async def main():
    r = AsyncRange(3)
    # First iteration
    async for v in r:
        print(v, end=" ")  # 0 1 2
    print()
    # Second iteration -- works because __aiter__ returns a new iterator
    async for v in r:
        print(v, end=" ")  # 0 1 2

asyncio.run(main())
```
Part 2 -- Async Generators
Writing a full class with __aiter__ and __anext__ is verbose. Async generators provide the same capability with far less boilerplate:
```python
import asyncio

async def async_range(stop: int):
    """Equivalent to AsyncRange, but as an async generator."""
    for i in range(stop):
        await asyncio.sleep(0)  # Yield control to event loop
        yield i
```
When you call async_range(5), Python returns an async_generator object. It implements both __aiter__ and __anext__ automatically. Each yield suspends the generator and produces a value. Each await inside the generator suspends it and allows the event loop to run other tasks.
```python
async def main():
    async for value in async_range(5):
        print(value, end=" ")
    # Output: 0 1 2 3 4

asyncio.run(main())
```
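You can check the claim that the generator implements the protocol automatically. An async generator object passes isinstance checks against the async ABCs in collections.abc, and its __aiter__() returns the object itself, just like the hand-written AsyncCounter. The drain() helper below is only for this illustration.

```python
import asyncio
from collections.abc import AsyncGenerator, AsyncIterator

async def async_range(stop: int):
    for i in range(stop):
        await asyncio.sleep(0)
        yield i

gen = async_range(3)
print(isinstance(gen, AsyncIterator))   # True
print(isinstance(gen, AsyncGenerator))  # True
print(gen.__aiter__() is gen)           # True

async def drain(agen):
    """Collect every value from an async iterator into a list."""
    return [value async for value in agen]

print(asyncio.run(drain(gen)))  # [0, 1, 2]
```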
Async Generators with External Data Sources
The real power appears when the generator wraps I/O operations:
```python
import asyncio
import json

async def stream_json_lines(reader: asyncio.StreamReader):
    """Yield parsed JSON objects from a newline-delimited stream."""
    buffer = b""
    while True:
        chunk = await reader.read(4096)
        if not chunk:
            # End of stream -- yield any remaining data
            if buffer.strip():
                yield json.loads(buffer)
            return
        buffer += chunk
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():
                yield json.loads(line)
```
This generator encapsulates all the complexity of buffered reading, line splitting, and JSON parsing. The consumer sees a clean stream of dictionaries:
```python
async def process_stream(reader):
    async for record in stream_json_lines(reader):
        print(f"Received: {record['event']}")
```
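A generator like this can be exercised without a network connection. You can build an asyncio.StreamReader inside a coroutine and feed it bytes by hand with feed_data() and feed_eof(). The sketch below uses a hypothetical variant, stream_json_lines_readline, that lets StreamReader.readline() do the line splitting. One thing to keep in mind: readline() raises ValueError for a line longer than the reader's limit, which is 64 KiB by default.

```python
import asyncio
import json

async def stream_json_lines_readline(reader: asyncio.StreamReader):
    """Variant of stream_json_lines that relies on StreamReader.readline()."""
    while True:
        line = await reader.readline()  # b"" only at EOF; last line may lack "\n"
        if not line:
            return
        if line.strip():  # Skip blank lines
            yield json.loads(line)

async def demo():
    reader = asyncio.StreamReader()
    # Feed newline-delimited JSON by hand instead of opening a real connection
    reader.feed_data(b'{"event": "login"}\n\n{"event": "click"}\n{"event": "logout"}')
    reader.feed_eof()
    return [record["event"] async for record in stream_json_lines_readline(reader)]

print(asyncio.run(demo()))  # ['login', 'click', 'logout']
```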
Yield From Does Not Exist in Async
There is no async yield from. If you need to delegate to another async generator, iterate explicitly:
```python
async def combined_streams(*streams):
    """Merge multiple async generators sequentially."""
    for stream in streams:
        async for item in stream:
            yield item
```
PEP 525 (Python 3.6) introduced async generators without yield from support. The PEP explains that implementing it would require a serious redesign of the generator implementation. Writing yield from inside an async def is a SyntaxError, so use explicit async for delegation instead.
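To confirm that yield from is rejected when the code is compiled, not when it runs, you can compile a tiny async generator that uses it. The explicit async for delegation compiles and runs normally. The names delegate and letters are illustrative only.

```python
import asyncio

bad_source = """
async def delegate(inner):
    yield from inner
"""

try:
    compile(bad_source, "<example>", "exec")
except SyntaxError as exc:
    print("SyntaxError:", exc.msg)

async def delegate(inner):
    async for item in inner:  # The supported way to delegate
        yield item

async def letters():
    for ch in "ab":
        yield ch

async def main():
    return [x async for x in delegate(letters())]

print(asyncio.run(main()))  # ['a', 'b']
```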
Part 3 -- Async Comprehensions
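Python supports async for inside list, set, and dict comprehensions, but only within an async def function. Since Python 3.7, an async generator expression can be created in any function, but it can only be consumed from async code.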
```python
import asyncio

async def fetch_page(url: str) -> str:
    """Simulate fetching a page."""
    await asyncio.sleep(0.01)
    return f"content of {url}"

async def generate_urls():
    """Async generator of URLs."""
    for i in range(5):
        await asyncio.sleep(0)
        yield f"https://api.example.com/page/{i}"

async def main():
    # Async list comprehension
    pages = [page async for page in generate_urls()]
    print(pages)
    # ['https://api.example.com/page/0', ..., 'https://api.example.com/page/4']

    # Async list comprehension with await
    contents = [await fetch_page(url) async for url in generate_urls()]
    print(contents[0])
    # content of https://api.example.com/page/0

    # Async set comprehension
    unique_domains = {url.split("/")[2] async for url in generate_urls()}
    print(unique_domains)
    # {'api.example.com'}

    # Async dict comprehension (aenumerate is the helper defined below;
    # it must be defined before asyncio.run(main()) executes)
    url_map = {i: url async for i, url in aenumerate(generate_urls())}
    print(url_map[0])
    # https://api.example.com/page/0

    # Async generator expression (lazy)
    lazy = (url async for url in generate_urls())
    print(type(lazy))
    # <class 'async_generator'>

asyncio.run(main())
```
Python has no built-in async enumerate, so the dict comprehension above relies on this small helper:
```python
async def aenumerate(aiterable, start=0):
    """Async version of enumerate()."""
    counter = start
    async for item in aiterable:
        yield counter, item
        counter += 1
```
Async list, set, and dict comprehensions collect all items into memory before returning. For large streams, keep the async for loop and process items one at a time. The async generator expression (x async for x in ...) is lazy and does not buffer.
Part 4 -- Async Generator Finalization
Async generators have a lifecycle problem that synchronous generators do not. When a synchronous generator is garbage-collected, Python calls its close() method, which throws GeneratorExit into the generator. The generator runs its finally blocks and terminates.
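Async generators also have aclose(), but their cleanup code may itself need to await. The garbage collector runs synchronously and cannot drive a coroutine to completion, so it cannot simply call aclose(). Python solves this with async generator hooks (sys.set_asyncgen_hooks()). The event loop registers a finalizer, and when an unfinished async generator is garbage-collected, that finalizer schedules the generator's aclose() on the loop.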
```python
import asyncio

async def resource_stream():
    """An async generator that holds a resource."""
    print("Opening resource")
    try:
        for i in range(100):
            await asyncio.sleep(0)
            yield i
    finally:
        # This cleanup code might need to await!
        print("Closing resource")
        await asyncio.sleep(0)  # Simulate async cleanup
        print("Resource closed")

async def main():
    async for value in resource_stream():
        if value == 3:
            break  # What happens to the generator?
    # break does NOT close the generator. Once it becomes unreachable, the
    # finalizer hook schedules aclose() as a task, so the finally block
    # runs later, when the event loop regains control.
    print("After break")
    await asyncio.sleep(0.1)  # Give the scheduled aclose() time to run

asyncio.run(main())
# Output:
# Opening resource
# After break
# Closing resource
# Resource closed
```
Explicit Finalization with aclose()
Relying on the finalizer hook is fragile. Prefer explicit cleanup:
```python
async def main():
    gen = resource_stream()
    async for value in gen:
        if value == 3:
            break
    # Explicitly close the generator.
    # Note: if the loop body raised, this line would never be reached.
    await gen.aclose()
    print("Generator explicitly closed")
```
The athrow() Method
You can throw exceptions into an async generator, just like synchronous generators:
```python
import asyncio

async def resilient_stream():
    """Generator that handles injected exceptions."""
    i = 0
    while True:
        try:
            await asyncio.sleep(0)
            yield i
            i += 1
        except ValueError as e:
            print(f"Caught injected error: {e}")
            # Continue yielding from where we left off
            i += 10

async def main():
    gen = resilient_stream()
    print(await anext(gen))  # 0
    print(await anext(gen))  # 1
    # Throw an exception instance into the generator
    # (the athrow(type, value) form is deprecated since Python 3.12)
    result = await gen.athrow(ValueError("skip ahead"))
    # Caught injected error: skip ahead
    print(result)  # 11 (i was 1, then += 10, then yielded)
    print(await anext(gen))  # 12
    await gen.aclose()

asyncio.run(main())
```
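Never leave async generators that hold resources (database connections, file handles, network sockets) to the garbage collector. Cleanup through the finalizer hook happens at an unpredictable time. A generator that stays referenced is only closed by loop.shutdown_asyncgens() during event loop shutdown (asyncio.run() calls it for you), which may be far too late. Use an async with wrapper such as contextlib.aclosing() (Python 3.10+), or call aclose() explicitly.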
Setting the Finalization Hook
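Python 3.6+ lets you install async generator hooks with sys.set_asyncgen_hooks(). The hooks apply per thread, not per event loop:

```python
import asyncio
import sys

def custom_finalizer(agen):
    """Called when a not-yet-closed async generator is garbage-collected."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return  # No loop running in this thread, so nothing can drive aclose()
    # The GC may call this at any point, so schedule the close instead of
    # running it inline (asyncio's own finalizer does the same)
    loop.call_soon_threadsafe(loop.create_task, agen.aclose())

# Installs the hook for the current thread
sys.set_asyncgen_hooks(finalizer=custom_finalizer)
```

In practice, asyncio installs its own firstiter and finalizer hooks whenever an event loop runs, temporarily replacing any hooks you set. You rarely need to set them yourself.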
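While an asyncio loop is running, sys.get_asyncgen_hooks() returns the loop's own hooks, so you can check that asyncio has installed its finalizer. This check assumes the default asyncio event loop. With that loop, the finalizer is a method bound to the running loop.

```python
import asyncio
import sys

async def main():
    hooks = sys.get_asyncgen_hooks()
    # While the loop runs, the finalizer is a bound method of that loop
    return hooks.finalizer is not None and hooks.finalizer.__self__ is asyncio.get_running_loop()

print(asyncio.run(main()))       # True
print(sys.get_asyncgen_hooks())  # The hooks that were in place before asyncio.run()
```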
Part 5 -- Streaming Data Pipelines
Async generators compose naturally into processing pipelines:
```python
import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")

# Stage 1: Data Source
async def fetch_events(api_url: str) -> AsyncIterator[dict]:
    """Simulate streaming events from an API."""
    events = [
        {"type": "click", "page": "/home", "ts": 1000},
        {"type": "view", "page": "/about", "ts": 1001},
        {"type": "click", "page": "/pricing", "ts": 1002},
        {"type": "error", "page": "/checkout", "ts": 1003},
        {"type": "click", "page": "/home", "ts": 1004},
        {"type": "view", "page": "/docs", "ts": 1005},
    ]
    for event in events:
        await asyncio.sleep(0.01)  # Simulate network delay
        yield event

# Stage 2: Filter
async def filter_events(
    source: AsyncIterator[dict], event_type: str
) -> AsyncIterator[dict]:
    """Pass through only events of the specified type."""
    async for event in source:
        if event["type"] == event_type:
            yield event

# Stage 3: Transform
async def enrich_events(source: AsyncIterator[dict]) -> AsyncIterator[dict]:
    """Add computed fields to each event."""
    async for event in source:
        event["processed"] = True
        event["page_section"] = event["page"].strip("/").split("/")[0] or "root"
        yield event

# Stage 4: Batch
async def batch_events(
    source: AsyncIterator[dict], size: int
) -> AsyncIterator[list[dict]]:
    """Collect events into fixed-size batches."""
    batch: list[dict] = []
    async for event in source:
        batch.append(event)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch  # Yield remaining partial batch

# Compose the pipeline
async def main():
    pipeline = batch_events(
        enrich_events(
            filter_events(
                fetch_events("https://api.example.com/events"),
                event_type="click"
            )
        ),
        size=2
    )
    async for batch in pipeline:
        print(f"Batch of {len(batch)}: {[e['page'] for e in batch]}")

asyncio.run(main())
# Output:
# Batch of 2: ['/home', '/pricing']
# Batch of 1: ['/home']
```
Each stage processes items lazily. No buffering occurs except in the batch_events stage. Back-pressure is natural: the sink controls how fast the pipeline runs because each anext() call propagates through the chain.
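You can see this pull-based flow directly in a two-stage trace (the stage names are illustrative). Each item travels through the whole chain and is consumed before the source produces the next one.

```python
import asyncio

async def source(log):
    for i in range(3):
        log.append(f"produce {i}")
        await asyncio.sleep(0)
        yield i

async def double(items, log):
    async for x in items:
        log.append(f"double {x}")
        yield x * 2

async def main():
    log = []
    async for y in double(source(log), log):
        log.append(f"consume {y}")
    return log

for entry in asyncio.run(main()):
    print(entry)
# produce 0, double 0, consume 0, produce 1, double 1, consume 2, ...
```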
Part 6 -- Real-World Patterns
Pattern 1: Streaming HTTP Responses with aiohttp
```python
import aiohttp

async def stream_response(url: str, chunk_size: int = 8192):
    """Yield chunks from a streaming HTTP response."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            async for chunk in response.content.iter_chunked(chunk_size):
                yield chunk

async def download_file(url: str, dest: str):
    """Stream a large file to disk without loading it all into memory."""
    total = 0
    next_report = 1024 * 1024  # Report progress roughly every MiB
    with open(dest, "wb") as f:  # Plain file writes briefly block the event loop
        async for chunk in stream_response(url):
            f.write(chunk)
            total += len(chunk)
            # Chunk sizes vary, so compare against a threshold instead of
            # testing total % (1024 * 1024) == 0, which is rarely true
            if total >= next_report:
                print(f"Downloaded {total / 1024 / 1024:.1f} MB")
                next_report += 1024 * 1024
    print(f"Complete: {total} bytes")
```
Pattern 2: Database Cursor Iteration
```python
import asyncpg

async def stream_rows(
    pool: asyncpg.Pool,
    query: str,
    *args,
    batch_size: int = 1000
):
    """Yield rows from a database query using a server-side cursor."""
    async with pool.acquire() as conn:
        async with conn.transaction():
            # Use a server-side cursor to avoid loading all rows
            async for record in conn.cursor(query, *args, prefetch=batch_size):
                yield dict(record)

async def export_users(pool: asyncpg.Pool):
    """Process all users without loading them into memory."""
    query = "SELECT id, name, email FROM users WHERE active = $1"
    count = 0
    async for user in stream_rows(pool, query, True, batch_size=500):
        # Process each user
        count += 1
        if count % 10000 == 0:
            print(f"Processed {count} users")
    print(f"Total: {count} users")
```
Pattern 3: Server-Sent Events (SSE) Client
```python
import aiohttp

async def sse_stream(url: str):
    """Parse a Server-Sent Events stream into event dicts."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            event_data = {}
            async for line in response.content:
                # SSE allows CRLF or LF line endings, so strip both
                line = line.decode("utf-8").rstrip("\r\n")
                if line.startswith("event:"):
                    event_data["event"] = line[6:].strip()
                elif line.startswith("data:"):
                    data = line[5:].strip()
                    event_data.setdefault("data", []).append(data)
                elif line.startswith("id:"):
                    event_data["id"] = line[3:].strip()
                elif line == "":
                    # Empty line = end of event
                    if event_data:
                        if "data" in event_data:
                            event_data["data"] = "\n".join(event_data["data"])
                        yield event_data
                        event_data = {}

async def monitor_events():
    async for event in sse_stream("https://api.example.com/stream"):
        print(f"Event: {event.get('event', 'message')}")
        print(f"Data: {event.get('data', '')}")
```
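Because the parsing only needs an async iterable of byte lines, you can separate it from aiohttp and test it against canned input. In the sketch below, parse_sse is a hypothetical refactoring that uses the same rules as sse_stream(), and fake_lines stands in for response.content.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator

async def parse_sse(lines: AsyncIterable[bytes]) -> AsyncIterator[dict]:
    """Same parsing rules as sse_stream(), decoupled from the HTTP client."""
    event_data: dict = {}
    async for raw in lines:
        line = raw.decode("utf-8").rstrip("\r\n")
        if line.startswith("event:"):
            event_data["event"] = line[6:].strip()
        elif line.startswith("data:"):
            event_data.setdefault("data", []).append(line[5:].strip())
        elif line.startswith("id:"):
            event_data["id"] = line[3:].strip()
        elif line == "":  # Blank line ends the event
            if event_data:
                if "data" in event_data:
                    event_data["data"] = "\n".join(event_data["data"])
                yield event_data
                event_data = {}

async def fake_lines():
    # Canned input standing in for response.content
    for raw in [b"event: update\n", b"data: line one\n", b"data: line two\n",
                b"\n", b"data: plain\n", b"\n"]:
        yield raw

async def main():
    return [event async for event in parse_sse(fake_lines())]

print(asyncio.run(main()))
# [{'event': 'update', 'data': 'line one\nline two'}, {'data': 'plain'}]
```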
Pattern 4: Timeout-Aware Async Iterator
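```python
import asyncio
from typing import Any, AsyncIterator, TypeVar

T = TypeVar("T")
_NO_DEFAULT: Any = object()  # Sentinel, so that None can be a real default value

async def with_timeout(
    source: AsyncIterator[T],
    timeout: float,
    default: Any = _NO_DEFAULT,
) -> AsyncIterator[T]:
    """Wrap an async iterator with per-item timeouts.

    The pending anext() runs in its own task and is shielded, so a timeout
    does not cancel it (cancelling usually terminates a generator source).
    """
    ait = aiter(source)
    pending = None
    try:
        while True:
            if pending is None:
                pending = asyncio.ensure_future(anext(ait))
            try:
                value = await asyncio.wait_for(asyncio.shield(pending), timeout)
            except StopAsyncIteration:
                return
            except asyncio.TimeoutError:
                if default is _NO_DEFAULT:
                    raise
                yield default  # Then keep waiting for the same item
                continue
            pending = None
            yield value
    finally:
        if pending is not None and not pending.done():
            pending.cancel()
```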
Part 7 -- Performance Considerations
Avoid Unnecessary Awaits
Every await in an async generator is a suspension point. If your generator does not perform I/O, it does not benefit from being async:
```python
# Bad: async generator with no real async work
async def bad_range(n):
    for i in range(n):
        yield i  # No await -- this is pointless as an async generator

# Good: regular generator is faster
def good_range(n):
    for i in range(n):
        yield i
```
Buffer for Throughput
Individual item processing has overhead. Batch when throughput matters:
```python
async def buffered_read(reader, buffer_size=64 * 1024):
    """Read large chunks and yield individual lines."""
    leftover = b""
    while True:
        chunk = await reader.read(buffer_size)
        if not chunk:
            if leftover:
                yield leftover.decode()
            return
        data = leftover + chunk
        lines = data.split(b"\n")
        leftover = lines.pop()  # May be incomplete
        for line in lines:
            yield line.decode()
```
Memory Profile of Pipelines
Each pipeline stage holds at most one item in memory (except batching stages). This is the key advantage over collecting everything into a list first.
Key Takeaways
- The async iteration protocol uses __aiter__() (a regular method that returns the iterator, often self) and __anext__() (a coroutine that returns the next value or raises StopAsyncIteration).
- Async generators (async def + yield) are the preferred way to create async iterators: they cut boilerplate and provide aclose()/athrow() for finalization.
- Async list, set, and dict comprehensions (async for x in ... inside [] or {}) work only inside async def functions and are eager. Async generator expressions (inside ()) are lazy.
- Always explicitly aclose() async generators that hold resources. The garbage collector's finalization hook is a safety net, not a primary cleanup mechanism.
- Async generators compose into streaming pipelines with natural back-pressure and O(1) memory per stage.
- The aiter() and anext() built-ins (Python 3.10+) mirror iter() and next().
- Use athrow() to inject exceptions into suspended async generators for error-recovery protocols.
Graded Practice Challenges
Level 1 -- Predict the Output
Question 1: What does this print?
```python
import asyncio

async def gen():
    yield 1
    yield 2
    yield 3

async def main():
    g = gen()
    print(await anext(g))
    print(await anext(g))
    await g.aclose()
    print(await anext(g, "done"))

asyncio.run(main())
```
Answer
```
1
2
done
```

After aclose(), the generator is finalized. Calling anext() on a closed async generator raises StopAsyncIteration, so anext() returns the default value "done" instead.
Question 2: What does this print?
```python
import asyncio

async def counter():
    i = 0
    while True:
        await asyncio.sleep(0)
        yield i
        i += 1

async def main():
    results = [v async for v in counter() if v < 5]
    print(results)

asyncio.run(main())
```
Answer
This runs forever and never prints. The comprehension filters with if v < 5, but the filter does not stop the iteration once v >= 5. The comprehension keeps calling __anext__() on the infinite generator, waiting for more values that pass the filter. None ever arrive, so main() never finishes. Python has no built-in async islice, so either write a small helper that stops after a fixed number of items, or use an explicit async for loop with break.
Question 3: What does this print?
```python
import asyncio

async def gen():
    try:
        yield 1
        yield 2
        yield 3
    finally:
        print("cleanup")

async def main():
    async for v in gen():
        print(v)
        if v == 2:
            break
    print("after loop")

asyncio.run(main())
```
Answer
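```
1
2
after loop
cleanup
```

break does not close the generator. When the generator object becomes unreachable, asyncio's finalizer hook schedules aclose() as a task, and that task only runs after the event loop regains control. As a result, "after loop" is printed first. Here main() never yields to the loop again, so the finally block runs while asyncio.run() is shutting down. To guarantee that cleanup runs before "after loop", wrap the generator in contextlib.aclosing() (Python 3.10+) or call aclose() explicitly.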
Question 4: What is the type of result?
```python
import asyncio

async def gen():
    for i in range(3):
        yield i

async def main():
    result = (x * 2 async for x in gen())
    print(type(result).__name__)

asyncio.run(main())
```
Answer
async_generator
Parentheses around an async for comprehension create an async generator expression, not a tuple. It is lazy and produces values on demand.
Level 2 -- Debug Challenge
This async pipeline is supposed to read events, filter them, and collect the results, but it never prints them. Find and fix the bug.
```python
import asyncio

async def event_source():
    events = [
        {"type": "click", "value": 1},
        {"type": "view", "value": 2},
        {"type": "click", "value": 3},
        {"type": "view", "value": 4},
    ]
    for e in events:
        await asyncio.sleep(0.01)
        yield e

async def filter_clicks(source):
    async for event in source:
        if event["type"] == "click":
            yield event

async def main():
    source = event_source()
    clicks = filter_clicks(source)
    results = []
    while True:
        event = await anext(clicks)
        results.append(event)
    print(results)

asyncio.run(main())
```
Answer
The while True loop calls anext(clicks) with no default and without catching StopAsyncIteration. When the source is exhausted, anext() raises StopAsyncIteration. Outside an async for statement nothing handles it, so it propagates out of main() and asyncio.run() re-raises it. The program crashes with a traceback before print(results) runs.
Fix: use a default sentinel or async for:
```python
async def main():
    source = event_source()
    clicks = filter_clicks(source)

    # Option 1: Use async for
    results = [event async for event in clicks]

    # Option 2: Use sentinel
    # SENTINEL = object()
    # results = []
    # while (event := await anext(clicks, SENTINEL)) is not SENTINEL:
    #     results.append(event)

    print(results)
```
Level 3 -- Design Challenge
Design an async_merge function that takes multiple async iterators and yields items from all of them as they become available (interleaved, not sequential). Items should be yielded as soon as any source produces one, regardless of order. Handle the case where some sources are faster than others. Ensure all sources are properly closed when the merge generator is closed.
Design Hints
- Create an asyncio.Queue as the central collection point.
- Spawn one task per source that reads from the source and puts items into the queue.
- The main generator reads from the queue and yields items.
- Track how many sources are exhausted. When all are done, stop.
- On aclose(), cancel all spawned tasks and drain the queue.
Key considerations:
- Use a sentinel value to signal that a source is exhausted (not None, since None might be a valid item).
- Handle exceptions from individual sources without crashing the entire merge.
- Use asyncio.TaskGroup (Lesson 3) or manual task management for cleanup.
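```python
import asyncio

async def async_merge(*sources):
    """Yield items from several async iterators as soon as any produces one."""
    queue = asyncio.Queue()  # Unbounded: a fast source can run far ahead
    DONE = object()  # Sentinel, since None might be a valid item
    active = len(sources)

    async def reader(source):
        try:
            async for item in source:
                await queue.put(item)
        finally:
            # Signal exhaustion even if the source raised or was cancelled.
            # A source's exception stays in its task and is discarded by
            # gather(return_exceptions=True) below.
            await queue.put(DONE)

    tasks = [asyncio.create_task(reader(s)) for s in sources]
    try:
        while active > 0:
            item = await queue.get()
            if item is DONE:
                active -= 1
            else:
                yield item
    finally:
        # Runs on exhaustion and on aclose(): cancelling a reader delivers
        # CancelledError into its source, which ends that source
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
```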
What's Next
You now know how to build streaming async pipelines with generators and iterators. The next lesson, Async Context Managers, covers how to manage the lifecycle of async resources (connections, sessions, pools) with __aenter__ / __aexit__ and asynccontextmanager -- the glue that makes those database cursors and HTTP sessions in this lesson's examples work correctly.
