Skip to main content

Python Asyncio and Async/Await Practice Problems & Exercises

Practice: Asyncio and Async/Await

11 problems4 Easy4 Medium3 Hard55–75 min
← Back to lesson

#1Your First CoroutineEasy
async defawaitasyncio.run

Write your first coroutine using async def and await, then run it with asyncio.run.

Solution:

import asyncio

async def greet(name):
await asyncio.sleep(0.1)
return f'Hello, {name}!'

result = asyncio.run(greet('World'))
print(result)
import asyncio

# TODO: Define an async function greet(name) that:
# - Awaits asyncio.sleep(0.1)
# - Returns "Hello, {name}!"
# Run it with asyncio.run() and print the result.

async def greet(name):
  pass

result = asyncio.run(greet('World'))
print(result)
Expected Output
Hello, World!
Hints

Hint 1: async def defines a coroutine — a function that can be paused.

Hint 2: await asyncio.sleep(0.1) suspends for 100ms without blocking the thread.

Hint 3: asyncio.run(coroutine) runs the coroutine in a new event loop.


#2Sequential vs Concurrent CoroutinesEasy
gathersequentialconcurrenttiming

Demonstrate that asyncio.gather runs coroutines concurrently, slashing total time versus sequential await.

Solution:

async def concurrent():
t0 = time.perf_counter()
results = await asyncio.gather(
fetch('url1', 0.1),
fetch('url2', 0.1),
fetch('url3', 0.1),
)
elapsed = time.perf_counter() - t0
return list(results), elapsed
import asyncio
import time

async def fetch(url, delay):
  await asyncio.sleep(delay)
  return f'Response from {url}'

async def sequential():
  t0 = time.perf_counter()
  r1 = await fetch('url1', 0.1)
  r2 = await fetch('url2', 0.1)
  r3 = await fetch('url3', 0.1)
  elapsed = time.perf_counter() - t0
  return [r1, r2, r3], elapsed

async def concurrent():
  t0 = time.perf_counter()
  # TODO: Run all three fetches concurrently using asyncio.gather
  results = None
  elapsed = time.perf_counter() - t0
  return results, elapsed

seq_results, seq_time = asyncio.run(sequential())
con_results, con_time = asyncio.run(concurrent())

print(f'Sequential: {seq_time:.2f}s')
print(f'Concurrent: {con_time:.2f}s')
print(f'Same results: {seq_results == con_results}')
Expected Output
Sequential: 0.30s
Concurrent: 0.10s
Same results: True
Hints

Hint 1: asyncio.gather(coro1, coro2, coro3) runs all concurrently and returns results in order.

Hint 2: Sequential awaits each one-by-one: 0.1 + 0.1 + 0.1 = 0.3s total.

Hint 3: Concurrent gather runs them simultaneously: max(0.1, 0.1, 0.1) = 0.1s total.


#3Create and Await a TaskEasy
create_taskTaskawait

Use asyncio.create_task() to schedule coroutines for concurrent execution.

Solution:

import asyncio

async def slow_multiply(a, b, delay=0.1):
await asyncio.sleep(delay)
return a * b

async def main():
t1 = asyncio.create_task(slow_multiply(3, 4))
t2 = asyncio.create_task(slow_multiply(5, 6))
result1 = await t1
result2 = await t2
print(result1)
print(result2)

asyncio.run(main())
import asyncio

async def slow_multiply(a, b, delay=0.1):
  await asyncio.sleep(delay)
  return a * b

async def main():
  # TODO: Create tasks for slow_multiply(3, 4) and slow_multiply(5, 6).
  # Await both tasks and print the results.
  # asyncio.create_task() schedules them to run concurrently.
  pass

asyncio.run(main())
Expected Output
12
30
Hints

Hint 1: t1 = asyncio.create_task(slow_multiply(3, 4)) schedules the coroutine.

Hint 2: result1 = await t1 gets the result once the task completes.

Hint 3: Tasks created with create_task() start immediately (not when awaited).


#4Async GeneratorEasy
async generatorasync foryield

Write an async generator that yields values with delays, and consume it with async for.

Solution:

import asyncio

async def countdown(n):
for i in range(n, 0, -1):
await asyncio.sleep(0.05)
yield i

async def main():
values = []
async for val in countdown(5):
values.append(val)
print(values)

asyncio.run(main())
import asyncio

# TODO: Define an async generator countdown(n) that yields
# n, n-1, ... 1 with a 0.05s delay between each.
# Use async for to iterate it and collect the values.

async def countdown(n):
  pass

async def main():
  values = []
  async for val in countdown(5):
      values.append(val)
  print(values)

asyncio.run(main())
Expected Output
[5, 4, 3, 2, 1]
Hints

Hint 1: async def countdown(n): ... yield n makes it an async generator.

Hint 2: await asyncio.sleep(0.05) inside the generator suspends without blocking.

Hint 3: async for val in countdown(5): iterates over the async generator.


#5asyncio.wait_for — TimeoutMedium
wait_fortimeoutasyncio.TimeoutError

Use asyncio.wait_for to add a timeout to a coroutine, returning a fallback value on timeout.

Solution:

import asyncio

async def run_with_timeout(coro, timeout):
try:
return await asyncio.wait_for(coro, timeout=timeout)
except asyncio.TimeoutError:
return 'TIMEOUT'
import asyncio

async def slow_operation(duration):
  await asyncio.sleep(duration)
  return f'Completed in {duration}s'

async def run_with_timeout(coro, timeout):
  # TODO: Run coro with the given timeout.
  # Return the result if it completes in time.
  # Return "TIMEOUT" (string) if it exceeds the timeout.
  pass

async def main():
  r1 = await run_with_timeout(slow_operation(0.1), timeout=0.5)
  r2 = await run_with_timeout(slow_operation(1.0), timeout=0.3)
  print(r1)
  print(r2)

asyncio.run(main())
Expected Output
Completed in 0.1s
TIMEOUT
Hints

Hint 1: asyncio.wait_for(coro, timeout=N) cancels the coroutine after N seconds.

Hint 2: It raises asyncio.TimeoutError when the timeout elapses.

Hint 3: Catch asyncio.TimeoutError and return 'TIMEOUT' string.


#6gather with return_exceptionsMedium
gatherreturn_exceptionserror handling

Use gather(return_exceptions=True) to collect both results and exceptions from concurrent coroutines.

Solution:

import asyncio

async def main():
coros = [risky_fetch(n) for n in range(1, 7)]
results = await asyncio.gather(*coros, return_exceptions=True)

successes = [r for r in results if not isinstance(r, Exception)]
errors = [str(e) for r in results if isinstance(r, Exception) for e in [r]]

print(f'Successes: {successes}')
print(f'Errors: {errors}')
import asyncio

async def risky_fetch(n):
  await asyncio.sleep(0.05)
  if n % 3 == 0:
      raise ValueError(f'Failed for n={n}')
  return n * 10

async def main():
  # TODO: Run risky_fetch for n in 1..6 concurrently using gather.
  # Use return_exceptions=True so failures don't cancel other tasks.
  # Print successes and errors separately.
  pass

asyncio.run(main())
Expected Output
Successes: [10, 20, 40, 50]
Errors: ['Failed for n=3', 'Failed for n=6']
Hints

Hint 1: asyncio.gather(*coros, return_exceptions=True) returns exceptions as values instead of raising.

Hint 2: Iterate results and check isinstance(result, Exception) to separate errors from successes.

Hint 3: Without return_exceptions=True, the first exception cancels all remaining tasks.


#7Async Context ManagerMedium
async context manager__aenter____aexit__async with

Implement an async context manager using __aenter__ and __aexit__ to time async blocks.

Solution:

import asyncio
import time

class AsyncTimer:
def __init__(self):
self.elapsed_ms = 0.0
self._start = 0.0

async def __aenter__(self):
self._start = time.perf_counter()
return self

async def __aexit__(self, *args):
self.elapsed_ms = (time.perf_counter() - self._start) * 1000
import asyncio

# TODO: Build an AsyncTimer context manager that:
# - Records the start time on __aenter__
# - Records the end time on __aexit__
# - Exposes elapsed_ms after the block
# Use it to time an async operation.

class AsyncTimer:
  def __init__(self):
      self.elapsed_ms = 0.0

  async def __aenter__(self):
      pass

  async def __aexit__(self, *args):
      pass

async def main():
  timer = AsyncTimer()
  async with timer:
      await asyncio.sleep(0.15)
  print(f'Elapsed: {timer.elapsed_ms:.0f}ms (expected ~150ms)')
  print(f'In range: {140 <= timer.elapsed_ms <= 300}')

asyncio.run(main())
Expected Output
Elapsed: 150ms (expected ~150ms)
In range: True
Hints

Hint 1: __aenter__ must be async def and return self.

Hint 2: __aexit__(self, exc_type, exc_val, exc_tb) captures the end time.

Hint 3: import time; time.perf_counter() gives high-resolution timestamps.


#8Semaphore — Limit ConcurrencyMedium
Semaphorelimit concurrencyasyncio.Semaphore

Use asyncio.Semaphore to limit concurrent coroutine execution to at most N at a time.

Solution:

import asyncio

async def main():
sem = asyncio.Semaphore(3)
tasks = [asyncio.create_task(limited_fetch(sem, i)) for i in range(10)]
results = await asyncio.gather(*tasks)
return results

asyncio.run(main())
print(f'Max concurrent: {max_active[0]} (should be <= 3)')
import asyncio
import time

# Without a semaphore, all tasks run at once.
# With a semaphore(N), at most N tasks run concurrently.

active = [0]
max_active = [0]

async def limited_fetch(sem, item):
  async with sem:
      active[0] += 1
      max_active[0] = max(max_active[0], active[0])
      await asyncio.sleep(0.1)
      active[0] -= 1
      return item * 2

async def main():
  # TODO: Create a Semaphore(3) and run 10 limited_fetch tasks concurrently.
  # Verify max_active never exceeds 3.
  pass

asyncio.run(main())
print(f'Max concurrent: {max_active[0]} (should be <= 3)')
Expected Output
Max concurrent: 3 (should be <= 3)
Hints

Hint 1: sem = asyncio.Semaphore(3) limits to 3 concurrent executions.

Hint 2: async with sem: inside the coroutine acquires/releases automatically.

Hint 3: Create all tasks with asyncio.gather — the semaphore throttles concurrency.


#9as_completed — Process Results as They ArriveHard
as_completedstreaming resultsordering

Use asyncio.as_completed to process task results in the order they finish, not submission order.

Solution:

import asyncio
import random

async def variable_fetch(item, seed):
rng = random.Random(seed)
delay = rng.uniform(0.05, 0.25)
await asyncio.sleep(delay)
return item, delay

async def main():
tasks = [asyncio.create_task(variable_fetch(i, i)) for i in range(8)]
order = []
for coro in asyncio.as_completed(tasks):
item, delay = await coro
print(f' Item {item} completed in {delay:.3f}s')
order.append(item)
return order

order = asyncio.run(main())
print(f'Completed in order: {order}')
print(f'All 8 processed: {len(order) == 8}')
import asyncio
import random

async def variable_fetch(item, seed):
  rng = random.Random(seed)
  delay = rng.uniform(0.05, 0.25)
  await asyncio.sleep(delay)
  return item, delay

async def main():
  # TODO: Create 8 tasks with variable_fetch.
  # Use asyncio.as_completed to process each result as it arrives.
  # Print (item, delay) as they complete (not in submission order).
  # Return the order of items as they completed.
  pass

order = asyncio.run(main())
print(f'Completed in order: {order}')
print(f'All 8 processed: {len(order) == 8}')
Expected Output
Completed in order: [...]  (fastest first)
All 8 processed: True
Hints

Hint 1: tasks = [asyncio.create_task(variable_fetch(i, i)) for i in range(8)]

Hint 2: for coro in asyncio.as_completed(tasks): result = await coro

Hint 3: as_completed yields futures in completion order, not submission order.


#10Async Queue — Producer/Consumer PipelineHard
asyncio.Queueproducerconsumerpipeline

Build an async multi-consumer pipeline where multiple coroutines drain a shared asyncio.Queue.

Solution:

import asyncio
import random

async def producer(queue, n_items, n_consumers):
for i in range(n_items):
await asyncio.sleep(random.uniform(0.01, 0.05))
await queue.put(i)
for _ in range(n_consumers):
await queue.put(None)

async def consumer(queue, results):
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
await asyncio.sleep(0.02)
results.append(item * item)
queue.task_done()

async def main():
queue = asyncio.Queue()
results = []
n_consumers = 3
await asyncio.gather(
producer(queue, 10, n_consumers),
consumer(queue, results),
consumer(queue, results),
consumer(queue, results),
)
return sorted(results)

results = asyncio.run(main())
print('Count:', len(results))
print('Sum:', sum(results))
import asyncio
import random

async def producer(queue, n_items):
  for i in range(n_items):
      await asyncio.sleep(random.uniform(0.01, 0.05))
      await queue.put(i)
  await queue.put(None)  # sentinel

async def consumer(queue, results):
  while True:
      item = await queue.get()
      if item is None:
          queue.task_done()
          break
      await asyncio.sleep(0.02)  # simulate processing
      results.append(item * item)
      queue.task_done()

async def main():
  # TODO: Run 1 producer and 3 consumers concurrently on the same asyncio.Queue.
  # Each consumer reads until it sees the sentinel.
  # Handle distributing sentinels correctly (one per consumer).
  # Return sorted results.
  pass

results = asyncio.run(main())
print('Count:', len(results))
print('Sum:', sum(results))
Expected Output
Count: 10
Sum: 285
Hints

Hint 1: queue = asyncio.Queue() is async-safe; use await queue.put() and await queue.get().

Hint 2: With 3 consumers, put 3 sentinel Nones to stop all of them.

Hint 3: asyncio.gather(producer(...), consumer(...), consumer(...), consumer(...)) runs all concurrently.


#11Rate-Limited Async FetcherHard
rate limitingSemaphoreasynciotoken bucket

Implement an async rate limiter using a sliding window to throttle coroutines to N per second.

Solution:

import asyncio
import time
import collections

class RateLimiter:
def __init__(self, max_rate, period=1.0):
self.max_rate = max_rate
self.period = period
self._slots = collections.deque()
self._lock = asyncio.Lock()

async def acquire(self):
async with self._lock:
now = time.monotonic()
# Remove expired slots
while self._slots and now - self._slots[0] >= self.period:
self._slots.popleft()

if len(self._slots) >= self.max_rate:
# Wait until the oldest slot expires
wait = self.period - (now - self._slots[0])
if wait > 0:
await asyncio.sleep(wait)
self._slots.popleft()

self._slots.append(time.monotonic())
import asyncio
import time
import collections

# Rate limiting: process at most N requests per second.

class RateLimiter:
  def __init__(self, max_rate, period=1.0):
      self.max_rate = max_rate
      self.period = period
      # TODO: implement token bucket or sliding window

  async def acquire(self):
      # Wait until a slot is available
      pass

async def fetch(rate_limiter, item):
  await rate_limiter.acquire()
  return item * 2

async def main():
  limiter = RateLimiter(max_rate=5, period=1.0)  # 5 per second
  t0 = time.perf_counter()
  tasks = [asyncio.create_task(fetch(limiter, i)) for i in range(10)]
  results = await asyncio.gather(*tasks)
  elapsed = time.perf_counter() - t0
  # 10 requests at 5/sec takes ~2 seconds
  print(f'Results: {sorted(results)}')
  print(f'Elapsed: {elapsed:.1f}s (expected ~2s)')
  print(f'Rate limited: {elapsed >= 1.5}')

asyncio.run(main())
Expected Output
Results: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Elapsed: 2.0s (expected ~2s)
Rate limited: True
Hints

Hint 1: Track request timestamps in a deque. If deque is full, wait until the oldest timestamp is old enough.

Hint 2: asyncio.sleep(wait_time) yields control while waiting for the next slot.

Hint 3: deque(maxlen=max_rate) automatically evicts old entries when full.

© 2026 EngineersOfAI. All rights reserved.