Python Async Generators Practice Problems & Exercises
Practice: Async Generators and Iterators
← Back to lessonEasy
Write an async generator count_up(n) that yields integers from 0 to n-1, then consume it with async for.
import asyncio
async def count_up(n):
# yield integers from 0 to n-1
pass
async def main():
async for value in count_up(5):
print(value)
asyncio.run(main())
Solution
import asyncio
async def count_up(n):
for i in range(n):
yield i
async def main():
async for value in count_up(5):
print(value)
asyncio.run(main())
Key points:
async def+yield= async generator. That combination is all it takes.async fordesugars to repeated calls to__anext__()on the async generator object, awaiting each one.- Unlike sync generators, async generators can contain
awaitexpressions — they can pause at bothyieldandawaitpoints, giving the event loop a chance to run other tasks. - The return type of calling an async generator function is
AsyncGeneratorType, not a coroutine.
Expected Output
0\n1\n2\n3\n4Hints
Hint 1: An async generator is an async def function that contains at least one yield statement.
Hint 2: Use "async for" to consume an async generator — you cannot use a regular for loop.
Implement the async iterator protocol manually on a class AsyncRange that yields a fixed list of values.
import asyncio
class AsyncRange:
def __init__(self, values):
self._values = values
self._index = 0
def __aiter__(self):
# return self
pass
async def __anext__(self):
# return next value or raise StopAsyncIteration
pass
async def main():
async for v in AsyncRange([10, 20, 30]):
print(v)
asyncio.run(main())
Solution
import asyncio
class AsyncRange:
def __init__(self, values):
self._values = values
self._index = 0
def __aiter__(self):
return self
async def __anext__(self):
if self._index >= len(self._values):
raise StopAsyncIteration
value = self._values[self._index]
self._index += 1
return value
async def main():
async for v in AsyncRange([10, 20, 30]):
print(v)
asyncio.run(main())
Protocol breakdown:
__aiter__is a regular (sync) method — it is called once to get the iterator object. It should returnself.__anext__is anasyncmethod. Each call must either return the next value or raiseStopAsyncIteration.async forcalls__aiter__()once, then calls__anext__()and awaits it in a loop untilStopAsyncIterationis raised.- Compare with the sync protocol:
__iter__/__next__/StopIteration. The async version just addsasyncandAsyncprefixes.
Expected Output
10\n20\n30Hints
Hint 1: __aiter__ must return self (the iterator object itself).
Hint 2: __anext__ must be an async method. Raise StopAsyncIteration when exhausted.
Write an async generator stream_items(items) that yields each item after a brief asyncio.sleep(0) to simulate async I/O, then collect all results into a list.
import asyncio
async def stream_items(items):
# yield each item after asyncio.sleep(0)
pass
async def main():
results = []
async for item in stream_items(["item_0", "item_1", "item_2"]):
results.append(item)
print("\n".join(results))
asyncio.run(main())
Solution
import asyncio
async def stream_items(items):
for item in items:
await asyncio.sleep(0)
yield item
async def main():
results = []
async for item in stream_items(["item_0", "item_1", "item_2"]):
results.append(item)
print("\n".join(results))
asyncio.run(main())
Why asyncio.sleep(0) matters:
- It inserts a checkpoint — a point where the event loop can run other coroutines.
- In production, this
awaitwould be a real I/O call: reading from a socket, querying a database, etc. - Without an
await, an async generator that only yields never actually relinquishes the event loop, and you lose the concurrency benefit of async. asyncio.sleep(0)is the idiomatic "yield to event loop" pattern in asyncio code.
Expected Output
item_0\nitem_1\nitem_2Hints
Hint 1: async generators can contain await expressions — this is what makes them useful for real streaming.
Hint 2: asyncio.sleep(0) yields control to the event loop without actually sleeping.
Medium
Build a two-stage async pipeline: async_source yields strings, async_upper takes an async iterable and yields uppercased strings.
import asyncio
async def async_source(items):
# yield each item after asyncio.sleep(0)
pass
async def async_upper(source):
# async for item in source, yield item.upper()
pass
async def main():
source = async_source(["hello", "world", "foo", "bar"])
pipeline = async_upper(source)
async for item in pipeline:
print(item)
asyncio.run(main())
Solution
import asyncio
async def async_source(items):
for item in items:
await asyncio.sleep(0)
yield item
async def async_upper(source):
async for item in source:
yield item.upper()
async def main():
source = async_source(["hello", "world", "foo", "bar"])
pipeline = async_upper(source)
async for item in pipeline:
print(item)
asyncio.run(main())
Pipeline pattern:
- Each stage is an async generator that consumes an async iterable and yields transformed values.
- Stages are lazy: no data flows until the consumer (
async forinmain) starts pulling. - This is the async equivalent of generator-based pipelines in sync Python (
itertoolschaining). - Real-world use: streaming HTTP response bytes → decode chunks → parse JSON lines → filter records — all without loading the entire dataset into memory.
- The key insight: the consumer drives the pipeline by calling
__anext__, which propagates back through each stage.
Expected Output
HELLO\nWORLD\nFOO\nBARHints
Hint 1: Each stage is an async generator that takes an async iterable as input and yields transformed values.
Hint 2: You can pass one async generator as the argument to the next — they chain naturally.
Compare two ways to collect values from an async generator: an explicit async for loop and an async list comprehension. Verify they produce the same result.
import asyncio
async def squares(n):
for i in range(n):
await asyncio.sleep(0)
yield i * i
async def main():
# Method 1: explicit async for loop
result_loop = []
async for val in squares(5):
result_loop.append(val)
# Method 2: async list comprehension
result_comp = [] # replace this with an async comprehension
print(result_loop)
print(result_comp)
print(f"Results match: {result_loop == result_comp}")
asyncio.run(main())
Solution
import asyncio
async def squares(n):
for i in range(n):
await asyncio.sleep(0)
yield i * i
async def main():
result_loop = []
async for val in squares(5):
result_loop.append(val)
result_comp = [val async for val in squares(5)]
print(result_loop)
print(result_comp)
print(f"Results match: {result_loop == result_comp}")
asyncio.run(main())
Async comprehension rules:
- Syntax:
[expr async for var in async_iterable]or with filter:[expr async for var in async_iterable if condition] - Also works for
{k: v async for ...}(dict),{v async for ...}(set), and(v async for ...)(async generator expression). - Must be inside an
async deffunction — they require an event loop context. awaitcan also appear inside the expression:[await coro(x) async for x in source]- Performance is similar to the loop version; the comprehension is just more readable and Pythonic.
Expected Output
[0, 1, 4, 9, 16]\n[0, 1, 4, 9, 16]\nResults match: TrueHints
Hint 1: Async list comprehensions use the syntax: [expr async for item in async_gen]
Hint 2: They must be inside an async function — you cannot use them at module level.
Write an async generator that prints a cleanup message in its finally block. Break early from the consumer and verify that cleanup still runs.
import asyncio
async def guarded_counter(n):
try:
for i in range(n):
await asyncio.sleep(0)
print(f"yielding {i}")
yield i
finally:
# print cleanup message
pass
async def main():
gen = guarded_counter(10)
results = []
async for val in gen:
results.append(val)
if val >= 1:
break
await gen.aclose()
print(f"Consumed: {results}")
asyncio.run(main())
Solution
import asyncio
async def guarded_counter(n):
try:
for i in range(n):
await asyncio.sleep(0)
print(f"yielding {i}")
yield i
finally:
print("Generator cleanup ran")
async def main():
gen = guarded_counter(10)
results = []
async for val in gen:
results.append(val)
if val >= 1:
break
await gen.aclose()
print(f"Consumed: {results}")
asyncio.run(main())
Finalization protocol:
aclose()is a coroutine that throwsGeneratorExitinto the generator at its current suspension point.- The
finallyblock runs regardless of whether the generator exhausted naturally or was closed early. - This is the async equivalent of the sync generator's
.close()method. - Critical production use case: releasing resources (database connections, file handles, HTTP sessions) when a streaming consumer stops early.
async forcallsaclose()automatically when the loop exits normally or viabreak— but calling it explicitly (as shown) is the safe pattern when you hold a reference to the generator outside the loop.
Expected Output
yielding 0\nyielding 1\nGenerator cleanup ran\nConsumed: [0, 1]Hints
Hint 1: Place cleanup logic in a try/finally block inside the async generator — the finally runs when the generator is closed.
Hint 2: Calling aclose() on an async generator throws GeneratorExit into it, triggering the finally block.
Write an async generator batch(source, size) that groups items from an async iterable into fixed-size batches.
import asyncio
async def number_source(n):
for i in range(n):
await asyncio.sleep(0)
yield i
async def batch(source, size):
# collect items into batches of `size` and yield each batch
pass
async def main():
async for b in batch(number_source(10), 3):
print(b)
asyncio.run(main())
Solution
import asyncio
async def number_source(n):
for i in range(n):
await asyncio.sleep(0)
yield i
async def batch(source, size):
current = []
async for item in source:
current.append(item)
if len(current) == size:
yield current
current = []
if current:
yield current
async def main():
async for b in batch(number_source(10), 3):
print(b)
asyncio.run(main())
Batching pattern analysis:
- The trailing partial batch (
[9]in this case) is a common bug — always check for leftover items after the source is exhausted. - In production, batching is critical for database bulk inserts, API calls with rate limits, and message queue publishing.
- This pattern extends naturally: add a
timeoutparameter to yield a partial batch if no new items arrive within N seconds — a common pattern in real-time stream processing. - Note that
current = []after each yield creates a new list — do not usecurrent.clear()as that would mutate the list that was just yielded, corrupting the consumer's data.
Expected Output
[0, 1, 2]\n[3, 4, 5]\n[6, 7, 8]\n[9]Hints
Hint 1: Collect items into a list until the batch is full, then yield the batch and reset.
Hint 2: Do not forget to yield a final partial batch after the source is exhausted.
Hard
Write an async generator echo_gen() that yields received values back to the caller. Drive it manually using asend().
import asyncio
async def echo_gen():
received = None
while True:
received = yield received
if received is None:
break
async def main():
gen = echo_gen()
await gen.asend(None) # prime the generator
val1 = await gen.asend("hello")
print(f"Received: {val1}")
val2 = await gen.asend("world")
print(f"Received: {val2}")
await gen.aclose()
print("Done")
asyncio.run(main())
Solution
import asyncio
async def echo_gen():
received = None
while True:
received = yield received
if received is None:
break
async def main():
gen = echo_gen()
await gen.asend(None) # prime: advances to first yield
val1 = await gen.asend("hello")
print(f"Received: {val1}")
val2 = await gen.asend("world")
print(f"Received: {val2}")
await gen.aclose()
print("Done")
asyncio.run(main())
How asend() works:
asend(value)resumes the generator and injectsvalueas the result of theyieldexpression.__anext__()is equivalent toasend(None).- The first call must always send
None(or use__anext__()) because the generator has not yet reached ayield— there is nowhere to inject a value yet. - This bidirectional protocol powers coroutine-style communication patterns, custom schedulers, and reactive stream processors.
- Compare: the sync version uses
generator.send(value). The async version usesawait gen.asend(value).
Expected Output
Received: hello\nReceived: world\nDoneHints
Hint 1: Use asend() instead of __anext__() to send a value into the generator. The sent value becomes the result of the yield expression.
Hint 2: The first call must be asend(None) or __anext__() to advance to the first yield.
Write a merge_generators(*sources) async generator that interleaves values from multiple async generators concurrently.
import asyncio
async def slow_source(name, values, delay):
for v in values:
await asyncio.sleep(delay)
yield f"{name}:{v}"
async def merge_generators(*sources):
# Use a queue; launch a task per source; yield from queue
pass
async def main():
s1 = slow_source("A", [1, 2], 0.01)
s2 = slow_source("B", [3, 4], 0.015)
results = []
async for item in merge_generators(s1, s2):
results.append(item)
print(sorted(results))
asyncio.run(main())
Solution
import asyncio
async def slow_source(name, values, delay):
for v in values:
await asyncio.sleep(delay)
yield f"{name}:{v}"
async def merge_generators(*sources):
queue = asyncio.Queue()
active = len(sources)
async def drain(source):
nonlocal active
async for item in source:
await queue.put(item)
active -= 1
await queue.put(None) # sentinel
tasks = [asyncio.create_task(drain(src)) for src in sources]
finished = 0
while finished < len(sources):
item = await queue.get()
if item is None:
finished += 1
else:
yield item
for t in tasks:
await t
async def main():
s1 = slow_source("A", [1, 2], 0.01)
s2 = slow_source("B", [3, 4], 0.015)
results = []
async for item in merge_generators(s1, s2):
results.append(item)
print(sorted(results))
asyncio.run(main())
Fan-in pattern breakdown:
- One task per source drains the source into a shared queue — this enables true concurrent reading.
- A
Nonesentinel per completed source lets the merger count completions without a separate signaling mechanism. - The
nonlocal activecounter is optional here since we use sentinel counting — but it can be useful for health monitoring. - Real-world application: merging WebSocket streams, combining results from parallel API calls, fan-in from multiple Kafka partitions.
- Edge case to consider: if a source raises an exception, the task will die silently. Production code wraps
drainin a try/except and puts the exception object into the queue.
Expected Output
See solution — interleaved values from all sourcesHints
Hint 1: Use an asyncio.Queue as a shared channel. Each source pushes into the queue; the merger pulls from it.
Hint 2: Track how many sources are active. When a source finishes, decrement the count. Stop when count reaches zero.
Wrap an async generator with a rate_limit(source, rps) decorator that ensures at most rps items are yielded per second.
import asyncio
import time
async def fast_source(n):
for i in range(n):
yield i
async def rate_limit(source, rps):
# ensure at most rps items per second
pass
async def main():
start = time.monotonic()
count = 0
async for item in rate_limit(fast_source(6), rps=3):
count += 1
elapsed = time.monotonic() - start
print(f"Items: {count}, Elapsed: {elapsed:.2f}s, Rate: {count/elapsed:.1f}/s")
asyncio.run(main())
Solution
import asyncio
import time
async def fast_source(n):
for i in range(n):
yield i
async def rate_limit(source, rps):
interval = 1.0 / rps
last = time.monotonic() - interval # allow first item immediately
async for item in source:
now = time.monotonic()
wait = interval - (now - last)
if wait > 0:
await asyncio.sleep(wait)
last = time.monotonic()
yield item
async def main():
start = time.monotonic()
count = 0
async for item in rate_limit(fast_source(6), rps=3):
count += 1
elapsed = time.monotonic() - start
print(f"Items: {count}, Elapsed: {elapsed:.2f}s, Rate: {count/elapsed:.1f}/s")
asyncio.run(main())
Rate limiter design:
last = time.monotonic() - intervalpre-primes the timer so the first item is emitted immediately.wait = interval - (now - last)computes the remaining time in the current window.- We update
lastafter sleeping (not before) to prevent drift accumulation over many items. - This is a "leaky bucket" approach — it enforces steady throughput. A token bucket approach would allow bursting up to a maximum.
- Production extensions: add burst allowance, per-key rate limiting (using a dict of last-emission times), and adaptive rate adjustment based on downstream backpressure signals.
Expected Output
See solution — items emitted at a max rate of N per secondHints
Hint 1: Track the time of the last emission. If items arrive faster than the limit, sleep for the remaining interval.
Hint 2: Use time.monotonic() for accurate timing — it is not affected by system clock changes.
Build a resilient async pipeline where errors in individual items are caught, reported, and skipped rather than crashing the entire stream.
import asyncio
async def number_source(values):
for v in values:
await asyncio.sleep(0)
yield v
async def resilient_processor(source):
# process each item; on ValueError, print error and continue
async for item in source:
try:
if item < 0:
raise ValueError(f"cannot process {item}")
yield f"processed: {item}"
except ValueError as e:
print(f"ERROR: {e}")
async def main():
data = [1, 2, -1, 4, 5]
async for result in resilient_processor(number_source(data)):
print(result)
asyncio.run(main())
Solution
import asyncio
async def number_source(values):
for v in values:
await asyncio.sleep(0)
yield v
async def resilient_processor(source):
async for item in source:
try:
if item < 0:
raise ValueError(f"cannot process {item}")
yield f"processed: {item}"
except ValueError as e:
print(f"ERROR: {e}")
# continue to next item — no yield for errors
async def main():
data = [1, 2, -1, 4, 5]
async for result in resilient_processor(number_source(data)):
print(result)
asyncio.run(main())
Error handling in pipelines:
- The
try/exceptis inside theasync forbody, so errors are caught per-item. - When an exception is caught, the generator does not yield — effectively dropping the bad item.
- Alternative strategies: yield an
Either-style object (success or error), send errors to a dead-letter queue, or emit a structured error event downstream. - Critical insight: do NOT put the
try/exceptaround theasync foritself — that would abort the entire pipeline on the first error. The per-item try/except is what makes the pipeline resilient. - This pattern is the foundation of production data pipelines where partial failures are expected and must not halt processing.
Expected Output
processed: 1\nprocessed: 2\nERROR: cannot process -1\nprocessed: 4\nprocessed: 5Hints
Hint 1: Wrap the processing in a try/except inside the generator. On error, yield an error sentinel or handle inline.
Hint 2: Design the pipeline to continue processing remaining items even when one fails.
