Python Async Generators Practice Problems & Exercises

Practice: Async Generators and Iterators

11 problems · 3 Easy · 4 Medium · 4 Hard · 60–90 min
Easy

#1 Your First Async Generator (Easy)
async-generator, async-for, yield

Write an async generator count_up(n) that yields integers from 0 to n-1, then consume it with async for.

import asyncio

async def count_up(n):
    # yield integers from 0 to n-1
    pass

async def main():
    async for value in count_up(5):
        print(value)

asyncio.run(main())
Solution
import asyncio

async def count_up(n):
    for i in range(n):
        yield i

async def main():
    async for value in count_up(5):
        print(value)

asyncio.run(main())

Key points:

  • async def + yield = async generator. That combination is all it takes.
  • async for desugars to repeated calls to __anext__() on the async generator object, awaiting each one.
  • Unlike sync generators, async generators can contain await expressions — they can pause at both yield and await points, giving the event loop a chance to run other tasks.
  • Calling an async generator function returns an async generator object (types.AsyncGeneratorType), not a coroutine, so the result cannot be awaited directly.
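
As a rough illustration of the desugaring described above, the sketch below drives the generator by hand. The helper name consume_manually is made up for this example; it mirrors what async for does and is not how the loop would normally be written.

```python
import asyncio
import inspect

async def count_up(n):
    for i in range(n):
        yield i

async def consume_manually(n):
    # Roughly what `async for value in count_up(n)` expands to.
    iterator = count_up(n).__aiter__()
    seen = []
    while True:
        try:
            value = await iterator.__anext__()
        except StopAsyncIteration:
            break  # the generator is exhausted
        seen.append(value)
    return seen

gen = count_up(3)
print(inspect.isasyncgen(gen))   # True: an async generator object
print(inspect.iscoroutine(gen))  # False: it cannot be awaited directly
print(asyncio.run(consume_manually(3)))  # [0, 1, 2]
```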
Expected Output
0
1
2
3
4
Hints

Hint 1: An async generator is an async def function that contains at least one yield statement.

Hint 2: Use "async for" to consume an async generator — you cannot use a regular for loop.

#2 Implement __aiter__ and __anext__ (Easy)
__aiter__, __anext__, async-iterator-protocol

Implement the async iterator protocol manually on a class AsyncRange that yields a fixed list of values.

import asyncio

class AsyncRange:
    def __init__(self, values):
        self._values = values
        self._index = 0

    def __aiter__(self):
        # return self
        pass

    async def __anext__(self):
        # return next value or raise StopAsyncIteration
        pass

async def main():
    async for v in AsyncRange([10, 20, 30]):
        print(v)

asyncio.run(main())
Solution
import asyncio

class AsyncRange:
    def __init__(self, values):
        self._values = values
        self._index = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._index >= len(self._values):
            raise StopAsyncIteration
        value = self._values[self._index]
        self._index += 1
        return value

async def main():
    async for v in AsyncRange([10, 20, 30]):
        print(v)

asyncio.run(main())

Protocol breakdown:

  • __aiter__ is a regular (sync) method — it is called once to get the iterator object. It should return self.
  • __anext__ is an async method. Each call must either return the next value or raise StopAsyncIteration.
  • async for calls __aiter__() once, then calls __anext__() and awaits it in a loop until StopAsyncIteration is raised.
  • Compare with the sync protocol: __iter__ / __next__ / StopIteration. The async version renames the methods to __aiter__ / __anext__, makes __anext__ a coroutine, and signals exhaustion with StopAsyncIteration.
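
One consequence of returning self from __aiter__ is that the object is a single-use iterator. The sketch below contrasts it with a reusable async iterable; the class name AsyncValues and the helper collect_twice are hypothetical and only serve this comparison. Writing __aiter__ as an async generator method is one possible way to get a fresh iterator per loop, not the only one.

```python
import asyncio

class AsyncRange:
    # Same single-use iterator as in the solution above.
    def __init__(self, values):
        self._values = values
        self._index = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._index >= len(self._values):
            raise StopAsyncIteration
        value = self._values[self._index]
        self._index += 1
        return value

class AsyncValues:
    # Hypothetical reusable variant: __aiter__ is an async generator method,
    # so every `async for` gets a brand-new iterator.
    def __init__(self, values):
        self._values = list(values)

    async def __aiter__(self):
        for value in self._values:
            yield value

async def collect_twice(iterable):
    first = [v async for v in iterable]
    second = [v async for v in iterable]
    return first, second

print(asyncio.run(collect_twice(AsyncRange([10, 20, 30]))))   # ([10, 20, 30], [])
print(asyncio.run(collect_twice(AsyncValues([10, 20, 30]))))  # ([10, 20, 30], [10, 20, 30])
```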
Expected Output
10
20
30
Hints

Hint 1: __aiter__ must return self (the iterator object itself).

Hint 2: __anext__ must be an async method. Raise StopAsyncIteration when exhausted.

#3 Async Generator with Delay (Easy)
async-generator, asyncio.sleep, streaming

Write an async generator stream_items(items) that yields each item after a brief asyncio.sleep(0) to simulate async I/O, then collect all results into a list.

import asyncio

async def stream_items(items):
    # yield each item after asyncio.sleep(0)
    pass

async def main():
    results = []
    async for item in stream_items(["item_0", "item_1", "item_2"]):
        results.append(item)
    print("\n".join(results))

asyncio.run(main())
Solution
import asyncio

async def stream_items(items):
    for item in items:
        await asyncio.sleep(0)
        yield item

async def main():
    results = []
    async for item in stream_items(["item_0", "item_1", "item_2"]):
        results.append(item)
    print("\n".join(results))

asyncio.run(main())

Why asyncio.sleep(0) matters:

  • It inserts a checkpoint — a point where the event loop can run other coroutines.
  • In production, this await would be a real I/O call: reading from a socket, querying a database, etc.
  • Without an await, an async generator that only yields never actually relinquishes the event loop, and you lose the concurrency benefit of async.
  • asyncio.sleep(0) is the idiomatic "yield to event loop" pattern in asyncio code.
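
To make the checkpoint effect visible, the sketch below runs two consumers side by side, once without and once with the asyncio.sleep(0) call. The cooperative flag and the helper names are invented for this demonstration.

```python
import asyncio

async def stream(items, cooperative):
    for item in items:
        if cooperative:
            await asyncio.sleep(0)  # checkpoint: lets other tasks run
        yield item

async def consume(name, cooperative, log):
    async for item in stream([0, 1], cooperative):
        log.append(f"{name}{item}")

async def run(cooperative):
    log = []
    await asyncio.gather(
        consume("A", cooperative, log),
        consume("B", cooperative, log),
    )
    return log

print(asyncio.run(run(False)))  # ['A0', 'A1', 'B0', 'B1']
print(asyncio.run(run(True)))   # ['A0', 'B0', 'A1', 'B1']
```

Without the await, consumer A finishes its whole stream before B gets a turn; with it, the two consumers alternate.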
Expected Output
item_0
item_1
item_2
Hints

Hint 1: async generators can contain await expressions — this is what makes them useful for real streaming.

Hint 2: asyncio.sleep(0) yields control to the event loop without actually sleeping.


Medium

#4 Async Generator Pipeline (Medium)
async-pipeline, generator-chaining, async-for

Build a two-stage async pipeline: async_source yields strings, async_upper takes an async iterable and yields uppercased strings.

import asyncio

async def async_source(items):
    # yield each item after asyncio.sleep(0)
    pass

async def async_upper(source):
    # async for item in source, yield item.upper()
    pass

async def main():
    source = async_source(["hello", "world", "foo", "bar"])
    pipeline = async_upper(source)
    async for item in pipeline:
        print(item)

asyncio.run(main())
Solution
import asyncio

async def async_source(items):
    for item in items:
        await asyncio.sleep(0)
        yield item

async def async_upper(source):
    async for item in source:
        yield item.upper()

async def main():
    source = async_source(["hello", "world", "foo", "bar"])
    pipeline = async_upper(source)
    async for item in pipeline:
        print(item)

asyncio.run(main())

Pipeline pattern:

  • Each stage is an async generator that consumes an async iterable and yields transformed values.
  • Stages are lazy: no data flows until the consumer (async for in main) starts pulling.
  • This is the async equivalent of generator-based pipelines in sync Python (itertools chaining).
  • Real-world use: streaming HTTP response bytes → decode chunks → parse JSON lines → filter records — all without loading the entire dataset into memory.
  • The key insight: the consumer drives the pipeline by calling __anext__, which propagates back through each stage.
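
The real-world chain mentioned above (bytes, then lines, then JSON records, then a filter) could look roughly like the following. Everything here is a stand-in: byte_chunks fakes a network stream with hard-coded chunks, and the stage names and the "ok" field are assumptions made for the sketch.

```python
import asyncio
import json

async def byte_chunks():
    # Hypothetical source: chunk boundaries deliberately split lines.
    chunks = [
        b'{"id": 1, "ok": tr',
        b'ue}\n{"id": 2, "ok": false}\n{"id"',
        b': 3, "ok": true}\n',
    ]
    for chunk in chunks:
        await asyncio.sleep(0)
        yield chunk

async def lines(chunks):
    # Reassemble complete lines from arbitrary chunk boundaries.
    buffer = b""
    async for chunk in chunks:
        buffer += chunk
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            yield line.decode("utf-8")
    if buffer:
        yield buffer.decode("utf-8")  # trailing line without a newline

async def parse_json(text_lines):
    async for line in text_lines:
        if line.strip():
            yield json.loads(line)

async def only_ok(records):
    async for record in records:
        if record["ok"]:
            yield record

async def main():
    pipeline = only_ok(parse_json(lines(byte_chunks())))
    return [record["id"] async for record in pipeline]

print(asyncio.run(main()))  # [1, 3]
```

Each stage holds at most one chunk or record at a time, so memory use stays flat regardless of how long the stream is.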
Expected Output
HELLO
WORLD
FOO
BAR
Hints

Hint 1: Each stage is an async generator that takes an async iterable as input and yields transformed values.

Hint 2: You can pass one async generator as the argument to the next — they chain naturally.

#5 Async Comprehension vs async for (Medium)
async-comprehension, async-for, list-comprehension

Compare two ways to collect values from an async generator: an explicit async for loop and an async list comprehension. Verify they produce the same result.

import asyncio

async def squares(n):
    for i in range(n):
        await asyncio.sleep(0)
        yield i * i

async def main():
    # Method 1: explicit async for loop
    result_loop = []
    async for val in squares(5):
        result_loop.append(val)

    # Method 2: async list comprehension
    result_comp = []  # replace this with an async comprehension

    print(result_loop)
    print(result_comp)
    print(f"Results match: {result_loop == result_comp}")

asyncio.run(main())
Solution
import asyncio

async def squares(n):
    for i in range(n):
        await asyncio.sleep(0)
        yield i * i

async def main():
    result_loop = []
    async for val in squares(5):
        result_loop.append(val)

    result_comp = [val async for val in squares(5)]

    print(result_loop)
    print(result_comp)
    print(f"Results match: {result_loop == result_comp}")

asyncio.run(main())

Async comprehension rules:

  • Syntax: [expr async for var in async_iterable] or with filter: [expr async for var in async_iterable if condition]
  • Also works for {k: v async for ...} (dict), {v async for ...} (set), and (v async for ...) (async generator expression).
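  • List, set, and dict async comprehensions must appear inside an async def function, because they await each item internally. Async generator expressions are the exception: since Python 3.7 they can be created in any function, although they can only be consumed from async code.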
  • await can also appear inside the expression: [await coro(x) async for x in source]
  • Performance is similar to the loop version; the comprehension is just more readable and Pythonic.
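
The forms listed above can be tried side by side. This is only a small sketch; the helper coroutine double is made up to show await inside the expression.

```python
import asyncio

async def squares(n):
    for i in range(n):
        await asyncio.sleep(0)
        yield i * i

async def double(x):
    # Hypothetical coroutine used inside a comprehension expression.
    await asyncio.sleep(0)
    return 2 * x

async def main():
    evens = [v async for v in squares(5) if v % 2 == 0]    # with a filter
    as_set = {v % 3 async for v in squares(5)}             # set comprehension
    as_dict = {v: v + 1 async for v in squares(3)}         # dict comprehension
    doubled = [await double(v) async for v in squares(3)]  # await in expression
    lazy = (v + 100 async for v in squares(2))             # async generator expression
    from_lazy = [v async for v in lazy]
    return evens, as_set, as_dict, doubled, from_lazy

print(asyncio.run(main()))
# ([0, 4, 16], {0, 1}, {0: 1, 1: 2, 4: 5}, [0, 2, 8], [100, 101])
```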
Expected Output
[0, 1, 4, 9, 16]
[0, 1, 4, 9, 16]
Results match: True
Hints

Hint 1: Async list comprehensions use the syntax: [expr async for item in async_gen]

Hint 2: They must be inside an async function — you cannot use them at module level.

#6 Finalization with aclose() (Medium)
aclose, finalization, try-finally, async-generator

Write an async generator that prints a cleanup message in its finally block. Break early from the consumer and verify that cleanup still runs.

import asyncio

async def guarded_counter(n):
    try:
        for i in range(n):
            await asyncio.sleep(0)
            print(f"yielding {i}")
            yield i
    finally:
        # print cleanup message
        pass

async def main():
    gen = guarded_counter(10)
    results = []
    async for val in gen:
        results.append(val)
        if val >= 1:
            break
    await gen.aclose()
    print(f"Consumed: {results}")

asyncio.run(main())
Solution
import asyncio

async def guarded_counter(n):
    try:
        for i in range(n):
            await asyncio.sleep(0)
            print(f"yielding {i}")
            yield i
    finally:
        print("Generator cleanup ran")

async def main():
    gen = guarded_counter(10)
    results = []
    async for val in gen:
        results.append(val)
        if val >= 1:
            break
    await gen.aclose()
    print(f"Consumed: {results}")

asyncio.run(main())

Finalization protocol:

  • aclose() is a coroutine that throws GeneratorExit into the generator at its current suspension point.
  • The finally block runs regardless of whether the generator exhausted naturally or was closed early.
  • This is the async equivalent of the sync generator's .close() method.
  • Critical production use case: releasing resources (database connections, file handles, HTTP sessions) when a streaming consumer stops early.
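  • async for does not call aclose() when you break out of the loop. A generator abandoned mid-iteration is only finalized later: when it is garbage collected (the event loop schedules its aclose() through the hooks installed with sys.set_asyncgen_hooks) or when asyncio.run() shuts down outstanding async generators on exit. To run cleanup at a predictable point, call aclose() explicitly as shown, or wrap the generator in contextlib.aclosing() (Python 3.10+).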
Expected Output
yielding 0
yielding 1
Generator cleanup ran
Consumed: [0, 1]
Hints

Hint 1: Place cleanup logic in a try/finally block inside the async generator — the finally runs when the generator is closed.

Hint 2: Calling aclose() on an async generator throws GeneratorExit into it, triggering the finally block.

#7 Batching Async Stream (Medium)
async-generator, batching, streaming, pipeline

Write an async generator batch(source, size) that groups items from an async iterable into fixed-size batches.

import asyncio

async def number_source(n):
    for i in range(n):
        await asyncio.sleep(0)
        yield i

async def batch(source, size):
    # collect items into batches of `size` and yield each batch
    pass

async def main():
    async for b in batch(number_source(10), 3):
        print(b)

asyncio.run(main())
Solution
import asyncio

async def number_source(n):
    for i in range(n):
        await asyncio.sleep(0)
        yield i

async def batch(source, size):
    current = []
    async for item in source:
        current.append(item)
        if len(current) == size:
            yield current
            current = []
    if current:
        yield current

async def main():
    async for b in batch(number_source(10), 3):
        print(b)

asyncio.run(main())

Batching pattern analysis:

  • Dropping the trailing partial batch ([9] in this case) is a common bug. Always check for leftover items after the source is exhausted.
  • In production, batching is critical for database bulk inserts, API calls with rate limits, and message queue publishing.
  • This pattern extends naturally: add a timeout parameter to yield a partial batch if no new items arrive within N seconds — a common pattern in real-time stream processing.
  • Note that current = [] after each yield creates a new list — do not use current.clear() as that would mutate the list that was just yielded, corrupting the consumer's data.
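
The aliasing problem in the last point is easy to reproduce when the consumer keeps the batches around. In this sketch, batch_rebinding is the solution's approach and batch_clearing is the buggy variant; both names are invented for the comparison.

```python
import asyncio

async def number_source(n):
    for i in range(n):
        await asyncio.sleep(0)
        yield i

async def batch_rebinding(source, size):
    current = []
    async for item in source:
        current.append(item)
        if len(current) == size:
            yield current
            current = []      # fresh list; the yielded one is untouched
    if current:
        yield current

async def batch_clearing(source, size):
    current = []
    async for item in source:
        current.append(item)
        if len(current) == size:
            yield current
            current.clear()   # empties the list the consumer still holds
    if current:
        yield current

async def main():
    good = [b async for b in batch_rebinding(number_source(10), 3)]
    bad = [b async for b in batch_clearing(number_source(10), 3)]
    return good, bad

good, bad = asyncio.run(main())
print(good)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(bad)   # [[9], [9], [9], [9]]
```

The buggy version yields the same list object every time, so every stored batch ends up showing whatever that list last contained.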
Expected Output
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9]
Hints

Hint 1: Collect items into a list until the batch is full, then yield the batch and reset.

Hint 2: Do not forget to yield a final partial batch after the source is exhausted.


Hard

#8 Async Generator with asend() (Hard)
asend, async-generator, bidirectional, coroutine-protocol

Write an async generator echo_gen() that yields received values back to the caller. Drive it manually using asend().

import asyncio

async def echo_gen():
    received = None
    while True:
        received = yield received
        if received is None:
            break

async def main():
    gen = echo_gen()
    await gen.asend(None)  # prime the generator
    val1 = await gen.asend("hello")
    print(f"Received: {val1}")
    val2 = await gen.asend("world")
    print(f"Received: {val2}")
    await gen.aclose()
    print("Done")

asyncio.run(main())
Solution
import asyncio

async def echo_gen():
    received = None
    while True:
        received = yield received
        if received is None:
            break

async def main():
    gen = echo_gen()
    await gen.asend(None)  # prime: advances to first yield
    val1 = await gen.asend("hello")
    print(f"Received: {val1}")
    val2 = await gen.asend("world")
    print(f"Received: {val2}")
    await gen.aclose()
    print("Done")

asyncio.run(main())

How asend() works:

  • asend(value) resumes the generator and injects value as the result of the yield expression.
  • __anext__() is equivalent to asend(None).
  • The first call must always send None (or use __anext__()) because the generator has not yet reached a yield — there is nowhere to inject a value yet.
  • This bidirectional protocol powers coroutine-style communication patterns, custom schedulers, and reactive stream processors.
  • Compare: the sync version uses generator.send(value). The async version uses await gen.asend(value).
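
A slightly more stateful use of the same protocol is an accumulator that reports a running result for every value sent in. The generator running_average below is a hypothetical example, not part of the exercise; the second half shows what happens when the priming step is skipped.

```python
import asyncio

async def running_average():
    total = 0.0
    count = 0
    average = None
    while True:
        value = yield average   # hand back the latest average, wait for input
        total += value
        count += 1
        average = total / count

async def main():
    gen = running_average()
    primed = await gen.asend(None)   # advance to the first yield
    first = await gen.asend(10)
    second = await gen.asend(20)
    await gen.aclose()

    # Sending a non-None value before priming is rejected with TypeError.
    fresh = running_average()
    try:
        await fresh.asend(10)
    except TypeError:
        rejected = True
    else:
        rejected = False
    await fresh.aclose()
    return primed, first, second, rejected

print(asyncio.run(main()))  # (None, 10.0, 15.0, True)
```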
Expected Output
Received: hello
Received: world
Done
Hints

Hint 1: Use asend() instead of __anext__() to send a value into the generator. The sent value becomes the result of the yield expression.

Hint 2: The first call must be asend(None) or __anext__() to advance to the first yield.

#9 Merge Multiple Async Generators (Hard)
async-generator, asyncio.Queue, fan-in, concurrency

Write a merge_generators(*sources) async generator that interleaves values from multiple async generators concurrently.

import asyncio

async def slow_source(name, values, delay):
    for v in values:
        await asyncio.sleep(delay)
        yield f"{name}:{v}"

async def merge_generators(*sources):
    # Use a queue; launch a task per source; yield from queue
    pass

async def main():
    s1 = slow_source("A", [1, 2], 0.01)
    s2 = slow_source("B", [3, 4], 0.015)
    results = []
    async for item in merge_generators(s1, s2):
        results.append(item)
    print(sorted(results))

asyncio.run(main())
Solution
import asyncio

async def slow_source(name, values, delay):
    for v in values:
        await asyncio.sleep(delay)
        yield f"{name}:{v}"

async def merge_generators(*sources):
    queue = asyncio.Queue()
    active = len(sources)

    async def drain(source):
        nonlocal active
        async for item in source:
            await queue.put(item)
        active -= 1
        await queue.put(None)  # sentinel

    tasks = [asyncio.create_task(drain(src)) for src in sources]

    finished = 0
    while finished < len(sources):
        item = await queue.get()
        if item is None:
            finished += 1
        else:
            yield item

    for t in tasks:
        await t

async def main():
    s1 = slow_source("A", [1, 2], 0.01)
    s2 = slow_source("B", [3, 4], 0.015)
    results = []
    async for item in merge_generators(s1, s2):
        results.append(item)
    print(sorted(results))

asyncio.run(main())

Fan-in pattern breakdown:

  • One task per source drains the source into a shared queue — this enables true concurrent reading.
  • A None sentinel per completed source lets the merger count completions without a separate signaling mechanism. This assumes no source ever yields None itself; a private sentinel object avoids that ambiguity.
  • The nonlocal active counter is optional here since we use sentinel counting — but it can be useful for health monitoring.
  • Real-world application: merging WebSocket streams, combining results from parallel API calls, fan-in from multiple Kafka partitions.
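  • Edge case to consider: if a source raises an exception, its drain task stops before putting its sentinel, so the merger waits on queue.get() forever while the exception sits unretrieved on the task. Production code wraps drain in a try/except, forwards the exception through the queue, and cancels the remaining tasks if the consumer stops early.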
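
One possible hardening of the merger, sketched under the assumption that the first source error should end the whole merged stream. The name merge_safe, the tagged tuples, and the good/bad test sources are all made up for this example.

```python
import asyncio

async def merge_safe(*sources):
    queue = asyncio.Queue()

    async def drain(source):
        # Tag every message so items, completion and errors cannot be confused.
        try:
            async for item in source:
                await queue.put(("item", item))
        except Exception as exc:
            queue.put_nowait(("error", exc))
        else:
            queue.put_nowait(("done", None))

    tasks = [asyncio.create_task(drain(src)) for src in sources]
    remaining = len(tasks)
    try:
        while remaining:
            kind, payload = await queue.get()
            if kind == "item":
                yield payload
            elif kind == "done":
                remaining -= 1
            else:
                raise payload  # re-raise the source's exception in the consumer
    finally:
        # Runs on normal exit, on error, and when the consumer stops early.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

async def good():
    for v in (1, 2):
        await asyncio.sleep(0)
        yield v

async def bad():
    await asyncio.sleep(0)
    yield "x"
    raise RuntimeError("boom")

async def main():
    seen = []
    try:
        async for item in merge_safe(good(), bad()):
            seen.append(item)
    except RuntimeError as exc:
        return seen, str(exc)
    return seen, None

seen, error = asyncio.run(main())
print(error)  # boom
```

Instead of hanging, the consumer sees the RuntimeError, and the remaining drain tasks are cancelled before the generator finishes.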
Expected Output
['A:1', 'A:2', 'B:3', 'B:4']
(Items arrive interleaved across the two sources; main sorts them before printing.)
Hints

Hint 1: Use an asyncio.Queue as a shared channel. Each source pushes into the queue; the merger pulls from it.

Hint 2: Track how many sources are active. When a source finishes, decrement the count. Stop when count reaches zero.

#10 Rate-Limited Async Generator (Hard)
rate-limiting, async-generator, asyncio.sleep, token-bucket

Write an async generator rate_limit(source, rps) that wraps another async iterable and yields its items at most rps per second.

import asyncio
import time

async def fast_source(n):
    for i in range(n):
        yield i

async def rate_limit(source, rps):
    # ensure at most rps items per second
    pass

async def main():
    start = time.monotonic()
    count = 0
    async for item in rate_limit(fast_source(6), rps=3):
        count += 1
    elapsed = time.monotonic() - start
    print(f"Items: {count}, Elapsed: {elapsed:.2f}s, Rate: {count/elapsed:.1f}/s")

asyncio.run(main())
Solution
import asyncio
import time

async def fast_source(n):
    for i in range(n):
        yield i

async def rate_limit(source, rps):
    interval = 1.0 / rps
    last = time.monotonic() - interval  # allow first item immediately
    async for item in source:
        now = time.monotonic()
        wait = interval - (now - last)
        if wait > 0:
            await asyncio.sleep(wait)
        last = time.monotonic()
        yield item

async def main():
    start = time.monotonic()
    count = 0
    async for item in rate_limit(fast_source(6), rps=3):
        count += 1
    elapsed = time.monotonic() - start
    print(f"Items: {count}, Elapsed: {elapsed:.2f}s, Rate: {count/elapsed:.1f}/s")

asyncio.run(main())

Rate limiter design:

  • last = time.monotonic() - interval pre-primes the timer so the first item is emitted immediately.
  • wait = interval - (now - last) computes the remaining time in the current window.
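  • We reset last to the actual time after sleeping, so consecutive items are always at least interval apart and the spacing never drops below 1/rps. The trade-off is that any overshoot in asyncio.sleep accumulates, so long-run throughput ends up slightly below rps; scheduling against fixed deadlines (last += interval) would avoid that drift.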
  • This is a "leaky bucket" approach — it enforces steady throughput. A token bucket approach would allow bursting up to a maximum.
  • Production extensions: add burst allowance, per-key rate limiting (using a dict of last-emission times), and adaptive rate adjustment based on downstream backpressure signals.
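
For contrast with the fixed-interval limiter, here is a rough token bucket sketch that allows an initial burst. The function name token_bucket and its rate and burst parameters are assumptions for this example, and the refill arithmetic is one simple way to do it rather than a reference implementation.

```python
import asyncio
import time

async def fast_source(n):
    for i in range(n):
        yield i

async def token_bucket(source, rate, burst):
    # `rate` tokens are added per second, capped at `burst`; each item costs one.
    tokens = float(burst)
    last = time.monotonic()
    async for item in source:
        now = time.monotonic()
        tokens = min(float(burst), tokens + (now - last) * rate)
        last = now
        if tokens < 1.0:
            # Sleep just long enough for one full token to accumulate.
            await asyncio.sleep((1.0 - tokens) / rate)
            now = time.monotonic()
            tokens = min(float(burst), tokens + (now - last) * rate)
            last = now
        tokens -= 1.0
        yield item

async def main():
    start = time.monotonic()
    stamps = []
    async for _ in token_bucket(fast_source(5), rate=20, burst=3):
        stamps.append(time.monotonic() - start)
    return stamps

stamps = asyncio.run(main())
print([f"{s:.2f}" for s in stamps])
```

With burst=3, the first three items come out almost immediately and the remaining two are spaced roughly 1/rate seconds apart; exact timestamps depend on the machine.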
Expected Output
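Items: 6, Elapsed: 1.67s, Rate: 3.6/s
(Approximate; timings vary slightly between runs. The first item is emitted immediately, so six items span five intervals of 1/3 s, which is why the rate measured over this short run is above 3/s.)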
Hints

Hint 1: Track the time of the last emission. If items arrive faster than the limit, sleep for the remaining interval.

Hint 2: Use time.monotonic() for accurate timing — it is not affected by system clock changes.

#11 Typed Async Pipeline with Error Handling (Hard)
async-generator, error-handling, pipeline, typing

Build a resilient async pipeline where errors in individual items are caught, reported, and skipped rather than crashing the entire stream.

import asyncio

async def number_source(values):
    for v in values:
        await asyncio.sleep(0)
        yield v

async def resilient_processor(source):
    # process each item; on ValueError, print error and continue
    async for item in source:
        try:
            if item < 0:
                raise ValueError(f"cannot process {item}")
            yield f"processed: {item}"
        except ValueError as e:
            print(f"ERROR: {e}")

async def main():
    data = [1, 2, -1, 4, 5]
    async for result in resilient_processor(number_source(data)):
        print(result)

asyncio.run(main())
Solution
import asyncio

async def number_source(values):
    for v in values:
        await asyncio.sleep(0)
        yield v

async def resilient_processor(source):
    async for item in source:
        try:
            if item < 0:
                raise ValueError(f"cannot process {item}")
            yield f"processed: {item}"
        except ValueError as e:
            print(f"ERROR: {e}")
            # continue to next item — no yield for errors

async def main():
    data = [1, 2, -1, 4, 5]
    async for result in resilient_processor(number_source(data)):
        print(result)

asyncio.run(main())

Error handling in pipelines:

  • The try/except is inside the async for body, so errors are caught per-item.
  • When an exception is caught, the generator does not yield — effectively dropping the bad item.
  • Alternative strategies: yield an Either-style object (success or error), send errors to a dead-letter queue, or emit a structured error event downstream.
  • Critical insight: do NOT put the try/except around the async for itself — that would abort the entire pipeline on the first error. The per-item try/except is what makes the pipeline resilient.
  • This pattern is the foundation of production data pipelines where partial failures are expected and must not halt processing.
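
The Either-style alternative mentioned above, combined with type annotations, might look like the following. The Ok and Err dataclasses, the Result alias, and either_processor are hypothetical names for this sketch; the stage yields one result per input item and leaves the decision about failures to the consumer.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Iterable, List, Union

@dataclass
class Ok:
    value: str

@dataclass
class Err:
    item: int
    error: Exception

Result = Union[Ok, Err]

async def number_source(values: Iterable[int]) -> AsyncIterator[int]:
    for v in values:
        await asyncio.sleep(0)
        yield v

async def either_processor(source: AsyncIterator[int]) -> AsyncIterator[Result]:
    async for item in source:
        try:
            if item < 0:
                raise ValueError(f"cannot process {item}")
            result: Result = Ok(f"processed: {item}")
        except ValueError as exc:
            result = Err(item, exc)
        yield result  # yield outside the try so only processing errors are caught

async def main() -> List[str]:
    lines = []
    async for result in either_processor(number_source([1, 2, -1, 4, 5])):
        if isinstance(result, Ok):
            lines.append(result.value)
        else:
            lines.append(f"skipped {result.item}: {result.error}")
    return lines

print(asyncio.run(main()))
# ['processed: 1', 'processed: 2', 'skipped -1: cannot process -1',
#  'processed: 4', 'processed: 5']
```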
Expected Output
processed: 1
processed: 2
ERROR: cannot process -1
processed: 4
processed: 5
Hints

Hint 1: Wrap the processing in a try/except inside the generator. On error, yield an error sentinel or handle inline.

Hint 2: Design the pipeline to continue processing remaining items even when one fails.

© 2026 EngineersOfAI. All rights reserved.