
Async Generators and Async Iterators

Before reading any explanation, predict the output of this program:

```python
import asyncio

async def stream():
    for i in range(3):
        await asyncio.sleep(0)
        yield i

async def main():
    gen = stream()
    print(type(gen))

    value = await gen.__anext__()
    print(f"First: {value}")

    async for v in gen:
        print(f"Loop: {v}")

    # What happens here?
    async for v in gen:
        print(f"Second loop: {v}")

asyncio.run(main())
```

If you expected the second async for loop to produce values, it does not. Just like synchronous generators, async generators are single-use iterators: once exhausted, they yield nothing on subsequent iterations. type(gen) is <class 'async_generator'>, a distinct type from regular generators. The first __anext__() call consumes 0, so the first async for loop picks up with 1 and 2, and the second loop produces no output.

```text
# Output:
# <class 'async_generator'>
# First: 0
# Loop: 1
# Loop: 2
```

This lesson takes you deep into the async iteration protocol -- the machinery that powers streaming data in modern Python applications.

What You Will Learn

  • The __aiter__ / __anext__ protocol and how it differs from synchronous iteration
  • Writing async generators with async def + yield
  • Async comprehensions (async for in list/set/dict comprehensions)
  • Async generator finalization and the aclose() / athrow() methods
  • Building streaming data pipelines that process items as they arrive
  • Real-world patterns: streaming API responses, database cursor iteration, file streaming

Prerequisites

  • Solid understanding of synchronous generators and iterators (yield, __iter__, __next__)
  • Comfort with async def, await, asyncio.run()
  • Familiarity with the event loop from the Intermediate course
  • Understanding of async for syntax at a basic level

Part 1 -- The Async Iteration Protocol

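Synchronous iteration relies on two dunder methods: __iter__() returns the iterator, and __next__() returns the next value or raises StopIteration. The async counterpart mirrors this with one twist: __aiter__() is still a regular method that returns the iterator, while __anext__() must return an awaitable (usually by being a coroutine) that produces the next value or raises StopAsyncIteration.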

```python
import asyncio

class AsyncCounter:
    """An async iterator that counts from start to stop."""

    def __init__(self, start: int, stop: int):
        self.current = start
        self.stop = stop

    def __aiter__(self):
        # Returns self -- just like synchronous iterators.
        # This method is NOT a coroutine. It returns the iterator object.
        return self

    async def __anext__(self):
        # This IS a coroutine. It can await other coroutines.
        if self.current >= self.stop:
            raise StopAsyncIteration  # Not StopIteration!

        value = self.current
        self.current += 1
        # Simulate async work (database query, network call, etc.)
        await asyncio.sleep(0.01)
        return value


async def main():
    counter = AsyncCounter(0, 5)

    async for value in counter:
        print(value, end=" ")
    # Output: 0 1 2 3 4

asyncio.run(main())
```

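:::danger Critical Distinction
__aiter__ returns a regular object (not a coroutine). __anext__ returns a coroutine (or another awaitable). StopAsyncIteration terminates the loop, not StopIteration. Mixing these up produces confusing errors that point away from the real mistake: an async def __aiter__ (without yield) makes async for raise a TypeError saying the object returned by __aiter__ does not implement __anext__, and raising StopIteration inside __anext__ surfaces as RuntimeError: coroutine raised StopIteration.
:::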
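To see the StopIteration mistake in action, here is a minimal sketch using a hypothetical WrongStop class whose __anext__ raises StopIteration by accident. Python converts a StopIteration escaping a coroutine into a RuntimeError (the PEP 479 rule), so the loop fails loudly instead of ending:

```python
import asyncio

class WrongStop:
    """Hypothetical buggy iterator: raises StopIteration instead of StopAsyncIteration."""

    def __aiter__(self):
        return self

    async def __anext__(self):
        raise StopIteration  # Bug: should be StopAsyncIteration


async def main() -> str:
    try:
        async for _ in WrongStop():
            pass
    except RuntimeError as exc:
        # The stray StopIteration was converted when it left the coroutine.
        return str(exc)
    return "loop ended normally"


message = asyncio.run(main())
print(message)  # coroutine raised StopIteration
```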

The async for statement desugars to:

```python
# async for item in iterable:
#     process(item)

# Desugars (roughly) to:
_iter = aiter(iterable)  # calls iterable.__aiter__()
while True:
    try:
        item = await anext(_iter)  # awaits _iter.__anext__()
    except StopAsyncIteration:
        break
    process(item)
```
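The expansion above is not runnable as written, since iterable and process are placeholders. The sketch below fills them in under stated assumptions: a small numbers() async generator stands in for iterable, and list.append stands in for process. It performs the same special-method lookups on the type (so it does not need the 3.10+ built-ins) and checks that it collects exactly what async for collects:

```python
import asyncio

async def numbers():
    for n in range(3):
        await asyncio.sleep(0)
        yield n

async def manual_loop(iterable) -> list:
    """Hand-written expansion of `async for item in iterable: items.append(item)`."""
    items = []
    it = type(iterable).__aiter__(iterable)  # the lookup aiter() performs
    while True:
        try:
            item = await type(it).__anext__(it)  # what anext() awaits
        except StopAsyncIteration:
            break
        items.append(item)  # stands in for process(item)
    return items

async def main():
    manual = await manual_loop(numbers())
    builtin = [item async for item in numbers()]
    return manual, builtin

manual, builtin = asyncio.run(main())
print(manual, builtin)  # [0, 1, 2] [0, 1, 2]
```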

Python 3.10+ added the built-in aiter() and anext() functions, which mirror iter() and next() for async iterators:

```python
async def main():
    counter = AsyncCounter(0, 3)

    it = aiter(counter)
    print(await anext(it))      # 0
    print(await anext(it))      # 1
    print(await anext(it, -1))  # 2
    print(await anext(it, -1))  # -1 (default, exhausted)

asyncio.run(main())
```

Separating Iterable from Iterator

Just as with synchronous code, you can separate the iterable (the container) from the iterator (the cursor). This allows multiple independent iterations over the same data source.

```python
import asyncio

class AsyncRange:
    """Async iterable (not iterator) -- supports multiple iteration."""

    def __init__(self, stop: int):
        self.stop = stop

    def __aiter__(self):
        # Return a NEW iterator each time
        return AsyncRangeIterator(self.stop)


class AsyncRangeIterator:
    """The actual async iterator with cursor state."""

    def __init__(self, stop: int):
        self.current = 0
        self.stop = stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.stop:
            raise StopAsyncIteration
        value = self.current
        self.current += 1
        await asyncio.sleep(0)
        return value


async def main():
    r = AsyncRange(3)

    # First iteration
    async for v in r:
        print(v, end=" ")  # 0 1 2
    print()

    # Second iteration -- works because __aiter__ returns a new iterator
    async for v in r:
        print(v, end=" ")  # 0 1 2

asyncio.run(main())
```
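A more compact way to get the same iterable/iterator split, sketched here as a hypothetical variant of AsyncRange, is to write __aiter__ as an async generator method. Despite the async def, it is not a coroutine: the yield makes it an async generator function, so each call returns a fresh async generator, which is the independent cursor the two-class version builds by hand. asyncio.gather runs two consumers concurrently to show they do not interfere:

```python
import asyncio

class AsyncRange:
    """Hypothetical compact AsyncRange: __aiter__ is an async generator method."""

    def __init__(self, stop: int):
        self.stop = stop

    async def __aiter__(self):
        # Because of the yield, calling this returns a new async generator
        # (not a coroutine), so every `async for` gets its own cursor.
        for i in range(self.stop):
            await asyncio.sleep(0)
            yield i


async def consume(r: AsyncRange) -> list:
    return [v async for v in r]


async def main():
    r = AsyncRange(3)
    # Two consumers interleave on the event loop without sharing state.
    return await asyncio.gather(consume(r), consume(r))

first, second = asyncio.run(main())
print(first, second)  # [0, 1, 2] [0, 1, 2]
```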

Part 2 -- Async Generators

Writing a full class with __aiter__ and __anext__ is verbose. Async generators provide the same capability with far less boilerplate:

```python
import asyncio

async def async_range(stop: int):
    """Equivalent to AsyncRange, but as an async generator."""
    for i in range(stop):
        await asyncio.sleep(0)  # Yield control to event loop
        yield i
```

When you call async_range(5), Python returns an async_generator object. It implements both __aiter__ and __anext__ automatically. Each yield suspends the generator and produces a value. Each await inside the generator suspends it and allows the event loop to run other tasks.

```python
async def main():
    async for value in async_range(5):
        print(value, end=" ")
    # Output: 0 1 2 3 4

asyncio.run(main())
```

Async Generators with External Data Sources

The real power appears when the generator wraps I/O operations:

```python
import asyncio
import json

async def stream_json_lines(reader: asyncio.StreamReader):
    """Yield parsed JSON objects from a newline-delimited stream."""
    buffer = b""
    while True:
        chunk = await reader.read(4096)
        if not chunk:
            # End of stream -- yield any remaining data
            if buffer.strip():
                yield json.loads(buffer)
            return

        buffer += chunk
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():
                yield json.loads(line)
```

This generator encapsulates all the complexity of buffered reading, line splitting, and JSON parsing. The consumer sees a clean stream of dictionaries:

```python
async def process_stream(reader):
    async for record in stream_json_lines(reader):
        print(f"Received: {record['event']}")
```
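To try the generator without a network connection, you can feed an asyncio.StreamReader by hand. The asyncio docs discourage instantiating StreamReader directly for real I/O, and feed_data()/feed_eof() are the internal methods a protocol normally calls, so treat this purely as a test harness. The data includes a blank line and a final record with no trailing newline, which exercises the end-of-stream branch:

```python
import asyncio
import json

async def stream_json_lines(reader: asyncio.StreamReader):
    """Same generator as above, repeated so this block runs on its own."""
    buffer = b""
    while True:
        chunk = await reader.read(4096)
        if not chunk:
            if buffer.strip():
                yield json.loads(buffer)
            return
        buffer += chunk
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line.strip():
                yield json.loads(line)

async def main():
    # Test harness: an in-memory StreamReader instead of a socket.
    reader = asyncio.StreamReader()
    reader.feed_data(b'{"event": "login"}\n\n{"event": "click"}\n')
    reader.feed_data(b'{"event": "logout"}')  # final record lacks a newline
    reader.feed_eof()
    return [record async for record in stream_json_lines(reader)]

records = asyncio.run(main())
print([r["event"] for r in records])  # ['login', 'click', 'logout']
```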

Yield From Does Not Exist in Async

There is no async yield from. If you need to delegate to another async generator, iterate explicitly:

```python
async def combined_streams(*streams):
    """Merge multiple async generators sequentially."""
    for stream in streams:
        async for item in stream:
            yield item
```
:::note
PEP 525 (Python 3.6) introduced async generators but left out yield from: inside an async def it is a SyntaxError. The PEP defers the feature, noting that supporting it would require a serious redesign of the generator implementation. Use explicit async for delegation instead.
:::
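A quick check of the delegation pattern, using a hypothetical letters() source: items come out strictly one source after another. The second half compiles a tiny snippet to confirm that yield from is rejected inside async def:

```python
import asyncio

async def combined_streams(*streams):
    """Merge multiple async generators sequentially (same as above)."""
    for stream in streams:
        async for item in stream:
            yield item

async def letters(prefix: str, n: int):
    for i in range(n):
        await asyncio.sleep(0)
        yield f"{prefix}{i}"

async def main():
    return [x async for x in combined_streams(letters("a", 2), letters("b", 2))]

merged = asyncio.run(main())
print(merged)  # ['a0', 'a1', 'b0', 'b1']

# `yield from` inside async def fails at compile time.
rejected = False
try:
    compile("async def f():\n    yield from g()\n", "<demo>", "exec")
except SyntaxError:
    rejected = True
print(rejected)  # True
```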

Part 3 -- Async Comprehensions

Python supports async for inside list, set, and dict comprehensions, but only within an async def function (since Python 3.8, an async generator expression may appear in any function):

```python
import asyncio

async def fetch_page(url: str) -> str:
    """Simulate fetching a page."""
    await asyncio.sleep(0.01)
    return f"content of {url}"

async def generate_urls():
    """Async generator of URLs."""
    for i in range(5):
        await asyncio.sleep(0)
        yield f"https://api.example.com/page/{i}"

async def main():
    # Async list comprehension
    urls = [url async for url in generate_urls()]
    print(urls)
    # ['https://api.example.com/page/0', ..., 'https://api.example.com/page/4']

    # Async list comprehension with await (the fetches run one after another)
    contents = [await fetch_page(url) async for url in generate_urls()]
    print(contents[0])
    # 'content of https://api.example.com/page/0'

    # Async set comprehension
    unique_domains = {url.split("/")[2] async for url in generate_urls()}
    print(unique_domains)
    # {'api.example.com'}

    # Async dict comprehension (aenumerate is the helper defined below;
    # define it before calling main())
    url_map = {i: url async for i, url in aenumerate(generate_urls())}

    # Async generator expression (lazy)
    lazy = (url async for url in generate_urls())
    print(type(lazy))
    # <class 'async_generator'>

asyncio.run(main())
```

Python has no built-in aenumerate, so the dict comprehension above relies on this small helper:

```python
async def aenumerate(aiterable, start=0):
    """Async version of enumerate()."""
    counter = start
    async for item in aiterable:
        yield counter, item
        counter += 1
```
:::tip
Async list, set, and dict comprehensions collect ALL items into memory before returning. For large streams, keep the async for loop and process items one at a time. The async generator expression (x async for x in ...) is lazy and does not buffer.
:::
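The await-inside-comprehension example fetches pages one at a time, because each await finishes before the next URL is pulled. When the fetches are independent, a common alternative is to collect the URLs first and run the fetches concurrently with asyncio.gather(). This sketch reuses the simulated fetch_page and generate_urls, with the latency raised to 0.05 s (an assumption, to make the difference visible), and times both approaches. Note that gather() gives up streaming for concurrency, since it needs the full URL list up front:

```python
import asyncio
import time

async def fetch_page(url: str) -> str:
    """Simulated fetch, as above, with latency raised to 0.05 s."""
    await asyncio.sleep(0.05)
    return f"content of {url}"

async def generate_urls():
    for i in range(5):
        await asyncio.sleep(0)
        yield f"https://api.example.com/page/{i}"

async def main():
    start = time.perf_counter()
    # One fetch at a time: each await completes before the next URL is pulled.
    sequential = [await fetch_page(url) async for url in generate_urls()]
    t_seq = time.perf_counter() - start

    start = time.perf_counter()
    # Collect the URLs first, then run every fetch concurrently.
    urls = [url async for url in generate_urls()]
    concurrent = await asyncio.gather(*(fetch_page(url) for url in urls))
    t_con = time.perf_counter() - start
    return sequential, concurrent, t_seq, t_con

sequential, concurrent, t_seq, t_con = asyncio.run(main())
print(f"sequential: {t_seq:.2f}s, gather: {t_con:.2f}s")
```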

Part 4 -- Async Generator Finalization

Async generators have a lifecycle problem that synchronous generators do not. When a synchronous generator is garbage-collected, Python calls its close() method, which throws GeneratorExit into the generator. The generator runs its finally blocks and terminates.

Async generators have aclose() instead, but their cleanup may need to await coroutines, and garbage collection runs synchronously: it cannot drive an await. Python solves this with async generator hooks: the running event loop registers a finalizer that, when an unclosed async generator is collected, schedules aclose() on the loop.

```python
import asyncio

async def resource_stream():
    """An async generator that holds a resource."""
    print("Opening resource")
    try:
        for i in range(100):
            await asyncio.sleep(0)
            yield i
    finally:
        # This cleanup code might need to await!
        print("Closing resource")
        await asyncio.sleep(0)  # Simulate async cleanup
        print("Resource closed")


async def main():
    async for value in resource_stream():
        if value == 3:
            break  # What happens to the generator?

    # The break does not close the generator. On CPython it drops the last
    # reference, and asyncio's finalizer hook schedules aclose() as a task,
    # so the finally block runs later, during the sleep below.
    print("After break")
    await asyncio.sleep(0.1)  # Give the scheduled aclose() time to run

asyncio.run(main())
# Output:
# Opening resource
# After break
# Closing resource
# Resource closed
```

Explicit Finalization with aclose()

Relying on the finalizer hook is fragile. Prefer explicit cleanup:

```python
async def main():
    gen = resource_stream()
    async for value in gen:
        if value == 3:
            break

    # Explicitly close the generator
    await gen.aclose()
    print("Generator explicitly closed")
```

The athrow() Method

You can throw exceptions into an async generator, just like synchronous generators:

```python
import asyncio

async def resilient_stream():
    """Generator that handles injected exceptions."""
    i = 0
    while True:
        try:
            await asyncio.sleep(0)
            yield i
            i += 1
        except ValueError as e:
            print(f"Caught injected error: {e}")
            # Continue yielding from where we left off
            i += 10


async def main():
    gen = resilient_stream()

    print(await anext(gen))  # 0
    print(await anext(gen))  # 1

    # Throw an exception into the generator. Pass an instance: the
    # athrow(type, value) form is deprecated since Python 3.12.
    result = await gen.athrow(ValueError("skip ahead"))
    # Caught injected error: skip ahead
    print(result)  # 11 (i was 1, then += 10, then yielded)

    print(await anext(gen))  # 12

    await gen.aclose()

asyncio.run(main())
```
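:::danger
Never leave async generators unclosed when they hold resources (database connections, file handles, network sockets). Without an explicit close, cleanup happens only when the generator is garbage-collected (and the finalizer hook schedules aclose()) or when asyncio.run() calls loop.shutdown_asyncgens() on exit, which may be too late. Always use async with wrappers (such as contextlib.aclosing() in Python 3.10+) or explicit aclose().
:::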

Setting the Finalization Hook

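Python 3.6+ lets you install custom async generator hooks with sys.set_asyncgen_hooks(). The hooks are stored per thread, not per event loop:

```python
import asyncio
import sys

def custom_finalizer(agen):
    """Called when a started, unclosed async generator is garbage-collected."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so nothing can await aclose();
        # returning without closing means the generator's finally blocks never run.
        return
    # Schedule cleanup; it runs on a later iteration of the loop.
    loop.create_task(agen.aclose())

# Set it for the current thread
sys.set_asyncgen_hooks(finalizer=custom_finalizer)
```

In practice, asyncio's event loop installs its own hooks whenever it runs (including under asyncio.run()) and restores the previous ones when it stops, so a hook set like this is not used while an asyncio loop is running. You rarely need to set this yourself.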
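You can observe the loop's hooks with sys.get_asyncgen_hooks(). This sketch reads them before, during, and after asyncio.run(). Inside the loop the finalizer is a method of the running event loop (on CPython, the private BaseEventLoop._asyncgen_finalizer_hook), and afterwards the previous hooks are back:

```python
import asyncio
import sys

async def main():
    # While the loop runs, its own hooks are installed for this thread.
    return sys.get_asyncgen_hooks()

before = sys.get_asyncgen_hooks()
during = asyncio.run(main())
after = sys.get_asyncgen_hooks()

print(before.finalizer)  # None in a fresh interpreter
print(during.finalizer)  # a bound method of the running event loop
print(after.finalizer == before.finalizer)  # True
```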

Part 5 -- Streaming Data Pipelines

Async generators compose naturally into processing pipelines:

```python
import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")


# Stage 1: Data Source
async def fetch_events(api_url: str) -> AsyncIterator[dict]:
    """Simulate streaming events from an API."""
    events = [
        {"type": "click", "page": "/home", "ts": 1000},
        {"type": "view", "page": "/about", "ts": 1001},
        {"type": "click", "page": "/pricing", "ts": 1002},
        {"type": "error", "page": "/checkout", "ts": 1003},
        {"type": "click", "page": "/home", "ts": 1004},
        {"type": "view", "page": "/docs", "ts": 1005},
    ]
    for event in events:
        await asyncio.sleep(0.01)  # Simulate network delay
        yield event


# Stage 2: Filter
async def filter_events(
    source: AsyncIterator[dict], event_type: str
) -> AsyncIterator[dict]:
    """Pass through only events of the specified type."""
    async for event in source:
        if event["type"] == event_type:
            yield event


# Stage 3: Transform
async def enrich_events(source: AsyncIterator[dict]) -> AsyncIterator[dict]:
    """Add computed fields to each event."""
    async for event in source:
        event["processed"] = True
        event["page_section"] = event["page"].strip("/").split("/")[0] or "root"
        yield event


# Stage 4: Batch
async def batch_events(
    source: AsyncIterator[dict], size: int
) -> AsyncIterator[list[dict]]:
    """Collect events into fixed-size batches."""
    batch: list[dict] = []
    async for event in source:
        batch.append(event)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch  # Yield remaining partial batch


# Compose the pipeline
async def main():
    pipeline = batch_events(
        enrich_events(
            filter_events(
                fetch_events("https://api.example.com/events"),
                event_type="click"
            )
        ),
        size=2
    )

    async for batch in pipeline:
        print(f"Batch of {len(batch)}: {[e['page'] for e in batch]}")

asyncio.run(main())
# Output:
# Batch of 2: ['/home', '/pricing']
# Batch of 1: ['/home']
```

Each stage processes items lazily. No buffering occurs except in the batch_events stage. Back-pressure is natural: the sink controls how fast the pipeline runs because each anext() call propagates through the chain.
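A small trace makes the lazy, pull-driven order visible. In this hypothetical two-stage pipeline each step is logged; every item travels the whole chain before the source produces the next one:

```python
import asyncio

log: list = []

async def source(n: int):
    for i in range(n):
        await asyncio.sleep(0)
        log.append(f"produce {i}")
        yield i

async def double(stream):
    async for x in stream:
        log.append(f"transform {x}")
        yield x * 2

async def main():
    async for y in double(source(2)):
        log.append(f"consume {y}")

asyncio.run(main())
print(log)
# ['produce 0', 'transform 0', 'consume 0', 'produce 1', 'transform 1', 'consume 2']
```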

Part 6 -- Real-World Patterns

Pattern 1: Streaming HTTP Responses with aiohttp

```python
import aiohttp

async def stream_response(url: str, chunk_size: int = 8192):
    """Yield chunks from a streaming HTTP response."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            async for chunk in response.content.iter_chunked(chunk_size):
                yield chunk


async def download_file(url: str, dest: str):
    """Stream a large file to disk without loading it all into memory."""
    mib = 1024 * 1024
    total = 0
    next_report = mib
    # Plain file writes block the event loop briefly; acceptable for a sketch.
    with open(dest, "wb") as f:
        async for chunk in stream_response(url):
            f.write(chunk)
            total += len(chunk)
            # Chunks rarely end exactly on a MiB boundary, so compare against
            # a moving threshold instead of testing total % mib == 0.
            if total >= next_report:
                print(f"Downloaded {total / mib:.1f} MB")
                next_report += mib
    print(f"Complete: {total} bytes")
```

Pattern 2: Database Cursor Iteration

```python
import asyncpg

async def stream_rows(
    pool: asyncpg.Pool,
    query: str,
    *args,
    batch_size: int = 1000
):
    """Yield rows from a database query using a server-side cursor."""
    async with pool.acquire() as conn:
        async with conn.transaction():
            # Use a server-side cursor to avoid loading all rows
            async for record in conn.cursor(query, *args, prefetch=batch_size):
                yield dict(record)


async def export_users(pool: asyncpg.Pool):
    """Process all users without loading them into memory."""
    query = "SELECT id, name, email FROM users WHERE active = $1"

    count = 0
    async for user in stream_rows(pool, query, True, batch_size=500):
        # Process each user
        count += 1
        if count % 10000 == 0:
            print(f"Processed {count} users")

    print(f"Total: {count} users")
```

Pattern 3: Server-Sent Events (SSE) Client

```python
import aiohttp

async def sse_stream(url: str):
    """Parse a Server-Sent Events stream into event dicts."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            event_data = {}
            # Iterating response.content yields one line (bytes) at a time.
            async for raw in response.content:
                # Servers may terminate lines with \r\n as well as \n.
                line = raw.decode("utf-8").rstrip("\r\n")

                if line.startswith("event:"):
                    event_data["event"] = line[6:].strip()
                elif line.startswith("data:"):
                    # The SSE format strips only one optional leading space.
                    data = line[5:].removeprefix(" ")
                    event_data.setdefault("data", []).append(data)
                elif line.startswith("id:"):
                    event_data["id"] = line[3:].strip()
                elif line == "":
                    # Empty line = end of event
                    if event_data:
                        if "data" in event_data:
                            event_data["data"] = "\n".join(event_data["data"])
                        yield event_data
                        event_data = {}


async def monitor_events():
    async for event in sse_stream("https://api.example.com/stream"):
        print(f"Event: {event.get('event', 'message')}")
        print(f"Data: {event.get('data', '')}")
```

Pattern 4: Timeout-Aware Async Iterator

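```python
import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")

async def with_timeout(
    source: AsyncIterator[T],
    timeout: float,
    default: T | None = None,
) -> AsyncIterator[T]:
    """Wrap an async iterator with per-item timeouts.

    On timeout, yield `default` (or re-raise if it is None) and keep waiting
    for the same item. asyncio.shield() keeps wait_for() from cancelling the
    pending __anext__(), which would otherwise terminate a generator source.
    """
    ait = aiter(source)
    pending = None
    try:
        while True:
            if pending is None:
                pending = asyncio.ensure_future(anext(ait))
            try:
                value = await asyncio.wait_for(asyncio.shield(pending), timeout)
            except StopAsyncIteration:
                return
            except asyncio.TimeoutError:
                if default is None:
                    raise
                value = default
            else:
                pending = None  # item delivered; fetch a new one next time
            yield value
    finally:
        if pending is not None:
            pending.cancel()
```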

Part 7 -- Performance Considerations

Avoid Unnecessary Awaits

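Every item an async generator produces passes through an awaitable (__anext__()), and every await inside it is a potential suspension point. If your generator does not perform I/O, it pays that overhead without gaining anything from being async: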

```python
# Bad: async generator with no real async work
async def bad_range(n):
    for i in range(n):
        yield i  # No await: async overhead, and it never yields to the event loop

# Good: regular generator is faster
def good_range(n):
    for i in range(n):
        yield i
```
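Besides overhead, an async generator that never awaits also never hands control back to the event loop, so other tasks cannot run while it is being consumed. A sketch comparing a hypothetical no_await_range() with a sleepy_range() that awaits asyncio.sleep(0) per item, each consumed while a one-shot ticker task is waiting to run:

```python
import asyncio

async def no_await_range(n):
    for i in range(n):
        yield i  # never suspends

async def sleepy_range(n):
    for i in range(n):
        await asyncio.sleep(0)  # hands control to the event loop
        yield i

async def consume_with_ticker(make_gen) -> list:
    events = []

    async def ticker():
        events.append("tick")

    task = asyncio.create_task(ticker())  # runs at the next opportunity
    async for i in make_gen(3):
        events.append(i)
    await task
    return events

async def main():
    starved = await consume_with_ticker(no_await_range)
    shared = await consume_with_ticker(sleepy_range)
    return starved, shared

starved, shared = asyncio.run(main())
print(starved)  # [0, 1, 2, 'tick']
print(shared)   # ['tick', 0, 1, 2]
```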

Buffer for Throughput

Individual item processing has overhead. Batch when throughput matters:

```python
async def buffered_read(reader, buffer_size=64 * 1024):
    """Read large chunks and yield individual lines."""
    leftover = b""
    while True:
        chunk = await reader.read(buffer_size)
        if not chunk:
            if leftover:
                yield leftover.decode()
            return

        data = leftover + chunk
        lines = data.split(b"\n")
        leftover = lines.pop()  # May be incomplete

        for line in lines:
            yield line.decode()
```

Memory Profile of Pipelines

Each pipeline stage holds at most one item in memory (except batching stages). This is the key advantage over collecting everything into a list first.

Key Takeaways

  • The async iteration protocol uses __aiter__() (a regular method returning the iterator, often self) and __anext__() (a coroutine returning the next value or raising StopAsyncIteration).
  • Async generators (async def + yield) are the preferred way to create async iterators -- they reduce boilerplate and handle finalization.
  • Async list, set, and dict comprehensions (async for x in ... inside [] or {}) work only inside async def functions and are eager; async generator expressions (inside ()) are lazy and, since Python 3.8, can be created in any function.
  • Always explicitly aclose() async generators that hold resources. The garbage collector's finalization hook is a safety net, not a primary cleanup mechanism.
  • Async generators compose into streaming pipelines with natural back-pressure and O(1) memory per stage (a batching stage holds one batch).
  • aiter() and anext() built-ins (Python 3.10+) mirror iter() and next().
  • Use athrow() to inject exceptions into running async generators for error-recovery protocols.

Graded Practice Challenges

Level 1 -- Predict the Output

Question 1: What does this print?

```python
import asyncio

async def gen():
    yield 1
    yield 2
    yield 3

async def main():
    g = gen()
    print(await anext(g))
    print(await anext(g))
    await g.aclose()
    print(await anext(g, "done"))

asyncio.run(main())
```
Answer
```text
1
2
done
```

After aclose(), the generator is finalized. Calling anext() on a closed async generator raises StopAsyncIteration, and the two-argument form anext(g, "done") returns the default instead.

Question 2: What does this print?

```python
import asyncio

async def counter():
    i = 0
    while True:
        await asyncio.sleep(0)
        yield i
        i += 1

async def main():
    results = [v async for v in counter() if v < 5]
    print(results)

asyncio.run(main())
```
Answer

This runs forever and never prints. The if v < 5 clause only filters; it does not stop the iteration. Once v reaches 5, the comprehension keeps calling __anext__() on the infinite generator and discards every value, so it never finishes. Python has no built-in async islice or takewhile, so write a small helper or use an explicit async for loop with break.
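One way to stop early, sketched with a hypothetical atakewhile() helper (an async counterpart of itertools.takewhile that you write yourself; it is not in the standard library). It stops at the first value that fails the predicate, and the infinite source is closed explicitly afterwards:

```python
import asyncio

async def counter():
    i = 0
    while True:
        await asyncio.sleep(0)
        yield i
        i += 1

async def atakewhile(predicate, aiterable):
    """Hypothetical helper: async counterpart of itertools.takewhile."""
    async for item in aiterable:
        if not predicate(item):
            break
        yield item

async def main():
    source = counter()
    try:
        return [v async for v in atakewhile(lambda v: v < 5, source)]
    finally:
        await source.aclose()  # close the infinite generator explicitly

results = asyncio.run(main())
print(results)  # [0, 1, 2, 3, 4]
```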

Question 3: What does this print?

```python
import asyncio

async def gen():
    try:
        yield 1
        yield 2
        yield 3
    finally:
        print("cleanup")

async def main():
    async for v in gen():
        print(v)
        if v == 2:
            break
    print("after loop")

asyncio.run(main())
```
Answer
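```text
1
2
after loop
cleanup
```

break does not call aclose(). On CPython, leaving the loop drops the last reference to the generator, and asyncio's finalizer hook schedules aclose() as a separate task instead of running it on the spot. So "after loop" is printed first, and the finally block runs afterwards, while asyncio.run() winds down (the exact moment is an implementation detail). To run the cleanup before "after loop", wrap the generator in contextlib.aclosing() (Python 3.10+) or await gen.aclose() yourself.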

Question 4: What is the type of result?

```python
import asyncio

async def gen():
    for i in range(3):
        yield i

async def main():
    result = (x * 2 async for x in gen())
    print(type(result).__name__)

asyncio.run(main())
```
Answer
```text
async_generator
```

Parentheses around an async for comprehension create an async generator expression, not a tuple. It is lazy and produces values on demand.

Level 2 -- Debug Challenge

This async pipeline is supposed to read events, filter them, and collect results. Instead of printing the results, it crashes with an unhandled StopAsyncIteration. Find and fix the bug.

```python
import asyncio

async def event_source():
    events = [
        {"type": "click", "value": 1},
        {"type": "view", "value": 2},
        {"type": "click", "value": 3},
        {"type": "view", "value": 4},
    ]
    for e in events:
        await asyncio.sleep(0.01)
        yield e

async def filter_clicks(source):
    async for event in source:
        if event["type"] == "click":
            yield event

async def main():
    source = event_source()
    clicks = filter_clicks(source)

    results = []
    while True:
        event = await anext(clicks)
        results.append(event)

    print(results)

asyncio.run(main())
```
Answer

The while True loop calls anext(clicks) without catching StopAsyncIteration. When the source is exhausted, anext() raises StopAsyncIteration, and because this is a bare while True loop (not an async for), nothing handles it: the exception propagates out of main(), asyncio.run() re-raises it, and the program crashes before print(results) ever runs.

Fix: use a default sentinel or async for:

```python
async def main():
    source = event_source()
    clicks = filter_clicks(source)

    # Option 1: Use async for
    results = [event async for event in clicks]

    # Option 2: Use sentinel
    # SENTINEL = object()
    # results = []
    # while (event := await anext(clicks, SENTINEL)) is not SENTINEL:
    #     results.append(event)

    print(results)
```

Level 3 -- Design Challenge

Design an async_merge function that takes multiple async iterators and yields items from all of them as they become available (interleaved, not sequential). Items should be yielded as soon as any source produces one, regardless of order. Handle the case where some sources are faster than others. Ensure all sources are properly closed when the merge generator is closed.

Design Hints
  1. Create an asyncio.Queue as the central collection point.
  2. Spawn one task per source that reads from the source and puts items into the queue.
  3. The main generator reads from the queue and yields items.
  4. Track how many sources are exhausted. When all are done, stop.
  5. On aclose(), cancel all spawned tasks and drain the queue.

Key considerations:

  • Use a sentinel value to signal that a source is exhausted (not None, since None might be a valid item).
  • Handle exceptions from individual sources without crashing the entire merge.
  • Use asyncio.TaskGroup (Lesson 3) or manual task management for cleanup.
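Reference solution:

```python
import asyncio

async def async_merge(*sources):
    queue = asyncio.Queue()  # unbounded, so put_nowait() never fails
    DONE = object()  # per-source "finished" sentinel (None may be a real item)
    active = len(sources)

    async def reader(source):
        try:
            async for item in source:
                await queue.put(item)
        finally:
            # Signal completion even if the source raised or was cancelled.
            queue.put_nowait(DONE)

    tasks = [asyncio.create_task(reader(s)) for s in sources]

    try:
        while active > 0:
            item = await queue.get()
            if item is DONE:
                active -= 1
            else:
                yield item
    finally:
        # Runs on exhaustion, aclose(), or a consumer error: stop every reader.
        for task in tasks:
            task.cancel()
        # Wait for the readers (and their sources) to wind down; source
        # exceptions are collected here instead of crashing the merge.
        await asyncio.gather(*tasks, return_exceptions=True)
```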

What's Next

You now know how to build streaming async pipelines with generators and iterators. The next lesson, Async Context Managers, covers how to manage the lifecycle of async resources (connections, sessions, pools) with __aenter__ / __aexit__ and asynccontextmanager -- the glue that makes those database cursors and HTTP sessions in this lesson's examples work correctly.

© 2026 EngineersOfAI. All rights reserved.