Python Asyncio and Async/Await Practice Problems & Exercises
Practice: Asyncio and Async/Await
← Back to lesson
Write your first coroutine using async def and await, then run it with asyncio.run.
Solution:
import asyncio


async def greet(name):
    """Return a greeting for *name* after a 0.1s non-blocking pause."""
    await asyncio.sleep(0.1)
    return f'Hello, {name}!'


# asyncio.run() drives the coroutine to completion and returns its result.
result = asyncio.run(greet('World'))
print(result)
import asyncio

# TODO: Define an async function greet(name) that:
#   - Awaits asyncio.sleep(0.1)
#   - Returns "Hello, {name}!"
# Run it with asyncio.run() and print the result.


async def greet(name):
    pass  # exercise placeholder: returns None until the TODO is implemented


result = asyncio.run(greet('World'))
print(result)
Expected Output
Hello, World!
Hints
Hint 1: async def defines a coroutine — a function that can be paused.
Hint 2: await asyncio.sleep(0.1) suspends for 100ms without blocking the thread.
Hint 3: asyncio.run(coroutine) runs the coroutine in a new event loop.
Demonstrate that asyncio.gather runs coroutines concurrently, slashing total time versus sequential await.
Solution:
async def concurrent():
    """Run the three fetches together and return ``(results, elapsed_seconds)``."""
    t0 = time.perf_counter()
    # gather() awaits all three coroutines at once and returns their results
    # in argument order.
    results = await asyncio.gather(
        fetch('url1', 0.1),
        fetch('url2', 0.1),
        fetch('url3', 0.1),
    )
    elapsed = time.perf_counter() - t0
    return list(results), elapsed
import asyncio
import time
async def fetch(url, delay):
    """Simulate a request: pause for *delay* seconds, then return a response string."""
    await asyncio.sleep(delay)
    return f'Response from {url}'
async def sequential():
    """Await the three fetches one after another; return ``(results, elapsed_seconds)``."""
    t0 = time.perf_counter()
    # Each await finishes before the next fetch starts, so the delays add up.
    r1 = await fetch('url1', 0.1)
    r2 = await fetch('url2', 0.1)
    r3 = await fetch('url3', 0.1)
    elapsed = time.perf_counter() - t0
    return [r1, r2, r3], elapsed
async def concurrent():
    """Exercise stub: time the concurrent fetches and return ``(results, elapsed)``."""
    t0 = time.perf_counter()
    # TODO: Run all three fetches concurrently using asyncio.gather
    results = None  # placeholder until the TODO is implemented
    elapsed = time.perf_counter() - t0
    return results, elapsed
# Run both variants; each returns a (results, elapsed_seconds) tuple.
seq_results, seq_time = asyncio.run(sequential())
con_results, con_time = asyncio.run(concurrent())
# Report the two timings and check that both variants produced the same results.
print(f'Sequential: {seq_time:.2f}s')
print(f'Concurrent: {con_time:.2f}s')
print(f'Same results: {seq_results == con_results}')
Expected Output
Sequential: 0.30s
Concurrent: 0.10s
Same results: True
Hints
Hint 1: asyncio.gather(coro1, coro2, coro3) runs all concurrently and returns results in order.
Hint 2: Sequential awaits each one-by-one: 0.1 + 0.1 + 0.1 = 0.3s total.
Hint 3: Concurrent gather runs them simultaneously: max(0.1, 0.1, 0.1) = 0.1s total.
Use asyncio.create_task() to schedule coroutines for concurrent execution.
Solution:
import asyncio


async def slow_multiply(a, b, delay=0.1):
    """Return ``a * b`` after pausing for *delay* seconds."""
    await asyncio.sleep(delay)
    return a * b


async def main():
    """Multiply two pairs as concurrent tasks and print both products."""
    # create_task() schedules each coroutine on the running loop, so the two
    # sleeps overlap instead of running back to back.
    t1 = asyncio.create_task(slow_multiply(3, 4))
    t2 = asyncio.create_task(slow_multiply(5, 6))
    result1 = await t1
    result2 = await t2
    print(result1)
    print(result2)


asyncio.run(main())
import asyncio


async def slow_multiply(a, b, delay=0.1):
    """Return ``a * b`` after pausing for *delay* seconds."""
    await asyncio.sleep(delay)
    return a * b


async def main():
    # TODO: Create tasks for slow_multiply(3, 4) and slow_multiply(5, 6).
    # Await both tasks and print the results.
    # asyncio.create_task() schedules them to run concurrently.
    pass


asyncio.run(main())
Expected Output
12
30
Hints
Hint 1: t1 = asyncio.create_task(slow_multiply(3, 4)) schedules the coroutine.
Hint 2: result1 = await t1 gets the result once the task completes.
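Hint 3: Tasks created with create_task() are scheduled right away and begin running the next time the event loop gets control (for example at the next await), not only when you await the task.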
Write an async generator that yields values with delays, and consume it with async for.
Solution:
import asyncio


async def countdown(n):
    """Asynchronously yield n, n-1, ..., 1, pausing 0.05s before each value."""
    for i in range(n, 0, -1):
        await asyncio.sleep(0.05)
        yield i


async def main():
    """Collect the countdown with ``async for`` and print the resulting list."""
    values = []
    async for val in countdown(5):
        values.append(val)
    print(values)


asyncio.run(main())
import asyncio
# TODO: Define an async generator countdown(n) that yields
# n, n-1, ... 1 with a 0.05s delay between each.
# Use async for to iterate it and collect the values.
async def countdown(n):
    pass  # exercise placeholder: add the loop, the awaited sleep and the yield
async def main():
    """Collect everything countdown(5) yields and print the list."""
    values = []
    async for val in countdown(5):
        values.append(val)
    print(values)


asyncio.run(main())
Expected Output
[5, 4, 3, 2, 1]
Hints
Hint 1: async def countdown(n): ... yield n makes it an async generator.
Hint 2: await asyncio.sleep(0.05) inside the generator suspends without blocking.
Hint 3: async for val in countdown(5): iterates over the async generator.
Use asyncio.wait_for to add a timeout to a coroutine, returning a fallback value on timeout.
Solution:
import asyncio
async def run_with_timeout(coro, timeout):
    """Await *coro* for at most *timeout* seconds.

    Returns the coroutine's result, or the string 'TIMEOUT' when
    asyncio.wait_for gives up first.
    """
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return 'TIMEOUT'
import asyncio
async def slow_operation(duration):
    """Pause for *duration* seconds, then return a completion message."""
    await asyncio.sleep(duration)
    return f'Completed in {duration}s'
async def run_with_timeout(coro, timeout):
    # TODO: Run coro with the given timeout.
    # Return the result if it completes in time.
    # Return "TIMEOUT" (string) if it exceeds the timeout.
    pass
async def main():
    """Run one operation that fits its timeout and one that does not; print both outcomes."""
    r1 = await run_with_timeout(slow_operation(0.1), timeout=0.5)
    r2 = await run_with_timeout(slow_operation(1.0), timeout=0.3)
    print(r1)
    print(r2)


asyncio.run(main())
Expected Output
Completed in 0.1s
TIMEOUT
Hints
Hint 1: asyncio.wait_for(coro, timeout=N) cancels the coroutine after N seconds.
Hint 2: It raises asyncio.TimeoutError when the timeout elapses.
Hint 3: Catch asyncio.TimeoutError and return 'TIMEOUT' string.
Use gather(return_exceptions=True) to collect both results and exceptions from concurrent coroutines.
Solution:
import asyncio
async def main():
    """Run risky_fetch for n = 1..6 concurrently and print successes and errors."""
    coros = [risky_fetch(n) for n in range(1, 7)]
    # return_exceptions=True delivers raised exceptions as ordinary result
    # values, so a single gather call reports every outcome.
    results = await asyncio.gather(*coros, return_exceptions=True)
    successes = [r for r in results if not isinstance(r, Exception)]
    errors = [str(r) for r in results if isinstance(r, Exception)]
    print(f'Successes: {successes}')
    print(f'Errors: {errors}')
import asyncio


async def risky_fetch(n):
    """Return ``n * 10`` after a 0.05s pause; raise ValueError when n is a multiple of 3."""
    await asyncio.sleep(0.05)
    if n % 3 == 0:
        raise ValueError(f'Failed for n={n}')
    return n * 10


async def main():
    # TODO: Run risky_fetch for n in 1..6 concurrently using gather.
    # Use return_exceptions=True so failures come back as result values
    # instead of the first one being raised out of gather.
    # Print successes and errors separately.
    pass


asyncio.run(main())
Expected Output
Successes: [10, 20, 40, 50]
Errors: ['Failed for n=3', 'Failed for n=6']
Hints
Hint 1: asyncio.gather(*coros, return_exceptions=True) returns exceptions as values instead of raising.
Hint 2: Iterate results and check isinstance(result, Exception) to separate errors from successes.
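Hint 3: Without return_exceptions=True, gather re-raises the first exception to the caller; the remaining coroutines are not cancelled and keep running, but the caller no longer receives their results.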
Implement an async context manager using __aenter__ and __aexit__ to time async blocks.
Solution:
import asyncio
import time
class AsyncTimer:
    """Async context manager that measures how long its ``async with`` block ran.

    After the block exits, ``elapsed_ms`` holds the duration in milliseconds.
    """

    def __init__(self):
        self.elapsed_ms = 0.0
        self._start = 0.0

    async def __aenter__(self):
        self._start = time.perf_counter()
        return self

    async def __aexit__(self, *args):
        # Returns None, so an exception raised inside the block still propagates.
        self.elapsed_ms = (time.perf_counter() - self._start) * 1000
import asyncio
# TODO: Build an AsyncTimer context manager that:
#   - Records the start time on __aenter__
#   - Records the end time on __aexit__
#   - Exposes elapsed_ms after the block
# Use it to time an async operation.
class AsyncTimer:
    def __init__(self):
        self.elapsed_ms = 0.0

    async def __aenter__(self):
        pass

    async def __aexit__(self, *args):
        pass
async def main():
    """Time a 0.15s sleep with AsyncTimer and print the measurement."""
    timer = AsyncTimer()
    async with timer:
        await asyncio.sleep(0.15)
    print(f'Elapsed: {timer.elapsed_ms:.0f}ms (expected ~150ms)')
    # The 140-300ms window leaves room for scheduling jitter.
    print(f'In range: {140 <= timer.elapsed_ms <= 300}')


asyncio.run(main())
Expected Output
Elapsed: 150ms (expected ~150ms)
In range: True
Hints
Hint 1: __aenter__ must be async def and return self.
Hint 2: __aexit__(self, exc_type, exc_val, exc_tb) captures the end time.
Hint 3: import time; time.perf_counter() gives high-resolution timestamps.
Use asyncio.Semaphore to limit concurrent coroutine execution to at most N at a time.
Solution:
import asyncio
async def main():
    """Run 10 limited_fetch tasks that share a 3-slot semaphore; return their results."""
    sem = asyncio.Semaphore(3)
    tasks = [asyncio.create_task(limited_fetch(sem, i)) for i in range(10)]
    results = await asyncio.gather(*tasks)
    return results


asyncio.run(main())
print(f'Max concurrent: {max_active[0]} (should be <= 3)')
import asyncio
import time
# Without a semaphore, all tasks run at once.
# With a semaphore(N), at most N tasks run concurrently.
active = [0]
max_active = [0]
async def limited_fetch(sem, item):
async with sem:
active[0] += 1
max_active[0] = max(max_active[0], active[0])
await asyncio.sleep(0.1)
active[0] -= 1
return item * 2
async def main():
    # TODO: Create a Semaphore(3) and run 10 limited_fetch tasks concurrently.
    # Verify max_active never exceeds 3.
    pass


asyncio.run(main())
print(f'Max concurrent: {max_active[0]} (should be <= 3)')
Expected Output
Max concurrent: 3 (should be <= 3)
Hints
Hint 1: sem = asyncio.Semaphore(3) limits to 3 concurrent executions.
Hint 2: async with sem: inside the coroutine acquires/releases automatically.
Hint 3: Create all tasks with asyncio.gather — the semaphore throttles concurrency.
Use asyncio.as_completed to process task results in the order they finish, not submission order.
Solution:
import asyncio
import random
async def variable_fetch(item, seed):
    """Sleep for a seed-determined delay in [0.05, 0.25]s; return ``(item, delay)``."""
    # A private Random seeded per call makes each item's delay reproducible.
    rng = random.Random(seed)
    delay = rng.uniform(0.05, 0.25)
    await asyncio.sleep(delay)
    return item, delay
async def main():
    """Start 8 fetches and handle each result as soon as it is ready.

    Returns the items in the order they completed.
    """
    tasks = [asyncio.create_task(variable_fetch(i, i)) for i in range(8)]
    order = []
    # as_completed() hands back awaitables in finish order, not submission order.
    for coro in asyncio.as_completed(tasks):
        item, delay = await coro
        print(f' Item {item} completed in {delay:.3f}s')
        order.append(item)
    return order


order = asyncio.run(main())
print(f'Completed in order: {order}')
print(f'All 8 processed: {len(order) == 8}')
import asyncio
import random
async def variable_fetch(item, seed):
    """Sleep for a seed-determined delay in [0.05, 0.25]s; return ``(item, delay)``."""
    # A private Random seeded per call makes each item's delay reproducible.
    rng = random.Random(seed)
    delay = rng.uniform(0.05, 0.25)
    await asyncio.sleep(delay)
    return item, delay
async def main():
    # TODO: Create 8 tasks with variable_fetch.
    # Use asyncio.as_completed to process each result as it arrives.
    # Print (item, delay) as they complete (not in submission order).
    # Return the order of items as they completed.
    pass
# Run the exercise and report what main() returned.
order = asyncio.run(main())
print(f'Completed in order: {order}')
# Sanity check: every one of the 8 items should appear in the returned list.
print(f'All 8 processed: {len(order) == 8}')
Expected Output
Completed in order: [...] (fastest first)
All 8 processed: True
Hints
Hint 1: tasks = [asyncio.create_task(variable_fetch(i, i)) for i in range(8)]
Hint 2: for coro in asyncio.as_completed(tasks): result = await coro
Hint 3: as_completed yields futures in completion order, not submission order.
Build an async multi-consumer pipeline where multiple coroutines drain a shared asyncio.Queue.
Solution:
import asyncio
import random
async def producer(queue, n_items, n_consumers):
    """Put 0..n_items-1 on *queue*, then one ``None`` sentinel per consumer."""
    for i in range(n_items):
        await asyncio.sleep(random.uniform(0.01, 0.05))
        await queue.put(i)
    # Each consumer stops on the first sentinel it reads, so every consumer
    # needs its own.
    for _ in range(n_consumers):
        await queue.put(None)
async def consumer(queue, results):
    """Append the square of each queued item to *results* until a ``None`` sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            # Acknowledge the sentinel too, so task_done() calls match get() calls.
            queue.task_done()
            break
        await asyncio.sleep(0.02)
        results.append(item * item)
        queue.task_done()
async def main():
    """Run one producer and three consumers on a shared queue; return the sorted squares."""
    queue = asyncio.Queue()
    results = []
    n_consumers = 3
    # Build the consumers from n_consumers so their number always matches
    # the sentinel count passed to the producer.
    await asyncio.gather(
        producer(queue, 10, n_consumers),
        *(consumer(queue, results) for _ in range(n_consumers)),
    )
    return sorted(results)


results = asyncio.run(main())
print('Count:', len(results))
print('Sum:', sum(results))
import asyncio
import random
async def producer(queue, n_items):
    """Put 0..n_items-1 on *queue*, followed by a single ``None`` sentinel."""
    for i in range(n_items):
        await asyncio.sleep(random.uniform(0.01, 0.05))
        await queue.put(i)
    await queue.put(None)  # sentinel
async def consumer(queue, results):
    """Append the square of each queued item to *results* until a ``None`` sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            # Acknowledge the sentinel too, so task_done() calls match get() calls.
            queue.task_done()
            break
        await asyncio.sleep(0.02)  # simulate processing
        results.append(item * item)
        queue.task_done()
async def main():
    # TODO: Run 1 producer and 3 consumers concurrently on the same asyncio.Queue.
    # Each consumer reads until it sees the sentinel.
    # Handle distributing sentinels correctly (one per consumer).
    # Return sorted results.
    pass
# Run the pipeline and summarise the list main() returns.
results = asyncio.run(main())
print('Count:', len(results))
print('Sum:', sum(results))
Expected Output
Count: 10
Sum: 285
Hints
Hint 1: queue = asyncio.Queue() is async-safe; use await queue.put() and await queue.get().
Hint 2: With 3 consumers, put 3 sentinel Nones to stop all of them.
Hint 3: asyncio.gather(producer(...), consumer(...), consumer(...), consumer(...)) runs all concurrently.
Implement an async rate limiter using a sliding window to throttle coroutines to N per second.
Solution:
import asyncio
import time
import collections
class RateLimiter:
    """Sliding-window limiter: at most *max_rate* acquisitions per *period* seconds.

    Timestamps of recent acquisitions are kept in a deque. While fewer than
    ``max_rate`` of them fall inside the window, ``acquire()`` returns at
    once, so the first ``max_rate`` calls go through as a burst. After that
    a caller sleeps until the oldest timestamp leaves the window.

    Raises:
        ValueError: if *max_rate* is not positive.
    """

    def __init__(self, max_rate, period=1.0):
        if max_rate <= 0:
            # A non-positive limit would make acquire() index an empty deque.
            raise ValueError('max_rate must be positive')
        self.max_rate = max_rate
        self.period = period
        self._slots = collections.deque()
        self._lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a slot is free in the current window, then claim it."""
        # The lock is held across the sleep, so waiting callers are served
        # one at a time.
        async with self._lock:
            now = time.monotonic()
            # Remove expired slots
            while self._slots and now - self._slots[0] >= self.period:
                self._slots.popleft()
            if len(self._slots) >= self.max_rate:
                # Wait until the oldest slot expires
                wait = self.period - (now - self._slots[0])
                if wait > 0:
                    await asyncio.sleep(wait)
                self._slots.popleft()
            # Stamp with a fresh clock reading: time may have passed in the sleep.
            self._slots.append(time.monotonic())
import asyncio
import time
import collections
# Rate limiting: process at most N requests per second.
class RateLimiter:
    def __init__(self, max_rate, period=1.0):
        self.max_rate = max_rate
        self.period = period
        # TODO: implement token bucket or sliding window

    async def acquire(self):
        # Wait until a slot is available
        pass
async def fetch(rate_limiter, item):
    """Wait for a slot from *rate_limiter*, then return ``item * 2``."""
    await rate_limiter.acquire()
    return item * 2
async def main():
    """Push 10 fetches through a 5-per-second limiter and report the timing."""
    limiter = RateLimiter(max_rate=5, period=1.0)  # 5 per second
    t0 = time.perf_counter()
    tasks = [asyncio.create_task(fetch(limiter, i)) for i in range(10)]
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - t0
    # 10 requests at 5/sec takes ~2 seconds
    # NOTE(review): that estimate only holds if the limiter makes every
    # request wait its turn. A limiter that lets the first 5 calls through
    # at once finishes in about one period (~1s) and fails the 1.5s check
    # below - confirm which behaviour is intended.
    print(f'Results: {sorted(results)}')
    print(f'Elapsed: {elapsed:.1f}s (expected ~2s)')
    print(f'Rate limited: {elapsed >= 1.5}')


asyncio.run(main())
Expected Output
Results: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Elapsed: 2.0s (expected ~2s)
Rate limited: True
Hints
Hint 1: Track request timestamps in a deque. If deque is full, wait until the oldest timestamp is old enough.
Hint 2: asyncio.sleep(wait_time) yields control while waiting for the next slot.
Hint 3: deque(maxlen=max_rate) automatically evicts old entries when full.
