Python Async Context Managers Practice Problems & Exercises

Practice: Async Context Managers

11 problems · 3 Easy · 4 Medium · 4 Hard · 60–90 min

Easy

#1 Implement __aenter__ and __aexit__ (Easy)
Tags: __aenter__, __aexit__, async-with

Implement a simple async context manager class AsyncResource that prints acquisition and release messages.

import asyncio

class AsyncResource:
    async def __aenter__(self):
        # print "Acquiring resource" and return self
        pass

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # print "Releasing resource"
        pass

async def main():
    async with AsyncResource() as resource:
        print("Using resource")

asyncio.run(main())
Solution
import asyncio

class AsyncResource:
    async def __aenter__(self):
        print("Acquiring resource")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Releasing resource")
        return None  # do not suppress exceptions

async def main():
    async with AsyncResource() as resource:
        print("Using resource")

asyncio.run(main())

Protocol details:

  • __aenter__ is awaited when entering the async with block. Its return value is bound to the as variable.
  • __aexit__ is awaited when leaving the block — whether normally or due to an exception.
  • Returning None or False from __aexit__ lets exceptions propagate. Returning True suppresses them.
  • The three parameters to __aexit__ are the exception type, value, and traceback. All are None if no exception occurred.
  • This mirrors the synchronous __enter__/__exit__ protocol: the two methods are coroutines, and the async with statement awaits them for you.
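The protocol bullets above can be made concrete by expanding async with by hand. The sketch below is only an approximation of what the statement does (the real statement looks the methods up on the type), and Recorder and run_by_hand are hypothetical names introduced for illustration.

```python
import asyncio
import sys

class Recorder:
    """Hypothetical context manager that records each protocol call."""
    def __init__(self):
        self.events = []

    async def __aenter__(self):
        self.events.append("enter")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.events.append(("exit", exc_type))
        return False  # falsy: let any exception propagate

async def run_by_hand(manager, fail):
    # Roughly what "async with manager as value: <body>" expands to.
    value = await manager.__aenter__()
    try:
        if fail:
            raise ValueError("boom")  # stands in for the block body
    except BaseException:
        # Hand the active exception to __aexit__; re-raise unless it returns a truthy value.
        if not await manager.__aexit__(*sys.exc_info()):
            raise
    else:
        await manager.__aexit__(None, None, None)
    return value

async def demo():
    ok = Recorder()
    await run_by_hand(ok, fail=False)

    bad = Recorder()
    try:
        await run_by_hand(bad, fail=True)
    except ValueError:
        pass  # propagated because __aexit__ returned False
    return ok.events, bad.events

print(asyncio.run(demo()))
```

On the success path __aexit__ receives three None values; on the failure path it receives the exception type, and the exception still reaches the caller.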
Expected Output
Acquiring resource
Using resource
Releasing resource
Hints

Hint 1: __aenter__ must be an async method that returns the resource to bind to the "as" variable.

Hint 2: __aexit__ receives (exc_type, exc_val, exc_tb). Return True to suppress exceptions, None/False to propagate.

#2 asynccontextmanager Decorator (Easy)
Tags: asynccontextmanager, contextlib, yield

Rewrite the database connection context manager using @asynccontextmanager instead of a class.

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def db_connection(host):
    # setup: print "Opening connection to {host}"
    # yield the connection object (use the string "conn" as a mock)
    # teardown: print "Closing connection"
    pass

async def main():
    async with db_connection("db.example.com") as conn:
        print("Running query")

asyncio.run(main())
Solution
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def db_connection(host):
    print(f"Opening connection to {host}")
    conn = f"connection_to_{host}"
    try:
        yield conn
    finally:
        print("Closing connection")

async def main():
    async with db_connection("db.example.com") as conn:
        print("Running query")

asyncio.run(main())

@asynccontextmanager pattern:

  • The decorator wraps the generator function, handling the __aenter__/__aexit__ protocol automatically.
  • Code before yield = setup (__aenter__). The yielded value is what the as clause receives.
  • Code after yield = teardown (__aexit__).
  • Always use try/finally around the yield — the finally block runs even if the consumer raises an exception, ensuring cleanup always happens.
  • This is far more concise than writing a full class for simple resource management.
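To see why the try/finally matters, here is a small sketch comparing a guarded generator with an unguarded one when the block raises. The names guarded, unguarded, and run_failing are hypothetical and exist only for this illustration.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def guarded(events):
    events.append("setup")
    try:
        yield
    finally:
        events.append("teardown")  # runs on success and on failure

@asynccontextmanager
async def unguarded(events):
    events.append("setup")
    yield
    events.append("teardown")  # skipped when the block raises

async def run_failing(manager_factory):
    events = []
    try:
        async with manager_factory(events):
            raise RuntimeError("consumer failed")
    except RuntimeError:
        events.append("caller saw RuntimeError")
    return events

print(asyncio.run(run_failing(guarded)))
print(asyncio.run(run_failing(unguarded)))
```

The exception raised in the block is re-raised inside the generator at the yield, so any code after a bare yield is never reached on the failure path.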
Expected Output
Opening connection to db.example.com
Running query
Closing connection
Hints

Hint 1: Use @asynccontextmanager from contextlib. The function must be async and contain exactly one yield.

Hint 2: Code before yield is __aenter__; code after yield is __aexit__.

#3 Timing Context Manager (Easy)
Tags: asynccontextmanager, timing, time.monotonic

Write an @asynccontextmanager called timer(label) that prints how long the wrapped block took.

import asyncio
import time
from contextlib import asynccontextmanager

@asynccontextmanager
async def timer(label):
    # record start time, yield, print elapsed
    pass

async def main():
    async with timer("Section"):
        await asyncio.sleep(0.01)

asyncio.run(main())
Solution
import asyncio
import time
from contextlib import asynccontextmanager

@asynccontextmanager
async def timer(label):
    start = time.monotonic()
    try:
        yield
    finally:
        elapsed = time.monotonic() - start
        print(f"{label} ran in approximately {elapsed:.2f}s")

async def main():
    async with timer("Section"):
        await asyncio.sleep(0.01)

asyncio.run(main())

Design notes:

  • No value is yielded here (just bare yield) — the as clause is optional for context managers that exist purely for side effects.
  • time.monotonic() is the correct choice for measuring durations: it never goes backwards and is unaffected by system clock updates such as manual changes or NTP step adjustments.
  • The finally guarantees the timing message prints even if the block raises an exception, which is the expected behavior for a profiling tool.
  • Production extension: add a threshold and only log if elapsed exceeds it — a simple "slow operation" detector.
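The threshold extension from the last bullet might look like the sketch below. The name slow_timer and the threshold and report parameters are assumptions made for this example, not part of the exercise.

```python
import asyncio
import time
from contextlib import asynccontextmanager

@asynccontextmanager
async def slow_timer(label, threshold, report=print):
    """Report the block's duration only when it reaches `threshold` seconds."""
    start = time.monotonic()
    try:
        yield
    finally:
        elapsed = time.monotonic() - start
        if elapsed >= threshold:
            report(f"SLOW {label}: {elapsed:.3f}s (threshold {threshold:.3f}s)")

async def demo():
    messages = []
    # Fast block: stays under the threshold, so nothing is reported.
    async with slow_timer("fast", threshold=0.5, report=messages.append):
        await asyncio.sleep(0)
    # Slow block: exceeds the threshold, so one message is reported.
    async with slow_timer("slow", threshold=0.01, report=messages.append):
        await asyncio.sleep(0.05)
    return messages

print(asyncio.run(demo()))
```

Passing the reporting function as a parameter keeps the example testable; in practice it could be a logger method.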
Expected Output
Section ran in approximately 0.01s
Hints

Hint 1: Record time.monotonic() before yield and after yield.

Hint 2: Use time.monotonic() not time.time() — monotonic is guaranteed to never go backwards.


Medium

#4 Context Manager with Exception Suppression (Medium)
Tags: __aexit__, exception-suppression, error-handling

Write SuppressErrors(error_types) that suppresses specific exception types and logs the suppressed error.

import asyncio

class SuppressErrors:
    def __init__(self, *error_types):
        self._error_types = error_types

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # if exc_type is in self._error_types, log and return True
        pass

async def main():
    async with SuppressErrors(ValueError):
        raise ValueError("intentional error")
    print("Execution continued after suppressed block")

asyncio.run(main())
Solution
import asyncio

class SuppressErrors:
    def __init__(self, *error_types):
        self._error_types = error_types

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None and issubclass(exc_type, self._error_types):
            print(f"Caught and suppressed: {exc_val}")
            return True  # suppress
        return None  # propagate

async def main():
    async with SuppressErrors(ValueError):
        raise ValueError("intentional error")
    print("Execution continued after suppressed block")

asyncio.run(main())

Exception suppression mechanics:

  • __aexit__ receives (exc_type, exc_val, exc_tb) when an exception escapes the block.
  • issubclass(exc_type, self._error_types) handles subclass matching — a KeyError would be suppressed if LookupError is in the list.
  • Returning True from __aexit__ tells Python to suppress the exception — execution continues after the async with block as if no exception occurred.
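  • contextlib.suppress applies the same issubclass check in its synchronous __exit__.
  • Production use: ignoring asyncio.CancelledError in cleanup code, though only with care, since cancellation propagation is usually important. CancelledError derives from BaseException since Python 3.8, so it must be listed explicitly; SuppressErrors(Exception) does not match it.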
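The subclass matching described above can be checked with a short sketch. It repeats the SuppressErrors class (without the print) so that it runs on its own; the demo function is a hypothetical helper.

```python
import asyncio

class SuppressErrors:
    def __init__(self, *error_types):
        self._error_types = error_types

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # issubclass accepts a tuple, so subclasses of any listed type match.
        return exc_type is not None and issubclass(exc_type, self._error_types)

async def demo():
    outcomes = []

    async with SuppressErrors(LookupError):
        {}["missing"]  # raises KeyError, a subclass of LookupError
    outcomes.append("KeyError suppressed")

    try:
        async with SuppressErrors(LookupError):
            raise TypeError("not a lookup error")
    except TypeError:
        outcomes.append("TypeError propagated")

    return outcomes

print(asyncio.run(demo()))
```

Because matching goes through issubclass, listing a broad base class such as Exception suppresses nearly everything, which is rarely what cleanup code wants.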
Expected Output
Caught and suppressed: intentional error
Execution continued after suppressed block
Hints

Hint 1: Return True from __aexit__ to suppress the exception.

Hint 2: Check exc_type is not None to detect whether an exception occurred.

#5 Reentrant Async Lock (Medium)
Tags: asynccontextmanager, asyncio.Lock, reentrant-lock

Implement a reentrant async lock context manager that allows the same task to acquire it multiple times without deadlocking.

import asyncio

class ReentrantAsyncLock:
    def __init__(self):
        self._lock = asyncio.Lock()
        self._owner = None
        self._count = 0

    async def __aenter__(self):
        # if current task already owns the lock, increment count
        # otherwise, acquire the underlying lock
        pass

    async def __aexit__(self, *args):
        # decrement count; release underlying lock when count hits zero
        pass

async def main():
    rlock = ReentrantAsyncLock()
    async with rlock:
        print("Outer lock acquired")
        async with rlock:
            print("Inner lock acquired")
        print("Inner released")
    print("Outer released")

asyncio.run(main())
Solution
import asyncio

class ReentrantAsyncLock:
    def __init__(self):
        self._lock = asyncio.Lock()
        self._owner = None
        self._count = 0

    async def __aenter__(self):
        current = asyncio.current_task()
        if self._owner is current:
            self._count += 1
        else:
            await self._lock.acquire()
            self._owner = current
            self._count = 1
        return self

    async def __aexit__(self, *args):
        self._count -= 1
        if self._count == 0:
            self._owner = None
            self._lock.release()

async def main():
    rlock = ReentrantAsyncLock()
    async with rlock:
        print("Outer lock acquired")
        async with rlock:
            print("Inner lock acquired")
        print("Inner released")
    print("Outer released")

asyncio.run(main())

Reentrant lock design:

  • asyncio.current_task() returns the Task object running the current coroutine — this is the "owner" identifier.
  • On re-entry, we just increment _count without trying to acquire the lock (which would deadlock).
  • On exit, decrement _count. Only release the real lock when _count reaches zero.
  • This mirrors threading.RLock behavior but for the async world.
  • Important caveat: ownership is tracked per task, not per coroutine. Every coroutine awaited directly inside the owning task shares that task, so a nested call re-enters the lock instead of waiting for it. Only other tasks are excluded, so use it with awareness of your call graph.
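The per-task ownership can be observed with the sketch below, which repeats the lock class so that it runs on its own. The helper and other coroutines are hypothetical: helper is awaited inside the owning task, while other runs as a separate task.

```python
import asyncio

class ReentrantAsyncLock:
    def __init__(self):
        self._lock = asyncio.Lock()
        self._owner = None
        self._count = 0

    async def __aenter__(self):
        current = asyncio.current_task()
        if self._owner is current:
            self._count += 1
        else:
            await self._lock.acquire()
            self._owner = current
            self._count = 1
        return self

    async def __aexit__(self, *args):
        self._count -= 1
        if self._count == 0:
            self._owner = None
            self._lock.release()

async def demo():
    rlock = ReentrantAsyncLock()
    events = []

    async def helper():
        # Awaited directly, so it runs in the owner's task and re-enters.
        async with rlock:
            events.append("helper re-entered")

    async def other():
        # Runs as its own task, so it must wait for the owner to release.
        async with rlock:
            events.append("other task acquired")

    async with rlock:
        task = asyncio.create_task(other())
        await asyncio.sleep(0)  # let the other task start and block on the lock
        await helper()
        events.append("owner releasing")
    await task
    return events

print(asyncio.run(demo()))
```

The helper gets in immediately even though the lock is held, while the separate task only proceeds after the owner has fully released.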
Expected Output
Outer lock acquired
Inner lock acquired
Inner released
Outer released
Hints

Hint 1: A reentrant lock tracks how many times the current task has acquired it. Only release the underlying lock when the count drops to zero.

Hint 2: Use asyncio.current_task() to identify the current task.

#6 AsyncExitStack for Dynamic Cleanup (Medium)
Tags: AsyncExitStack, dynamic-context, contextlib

Use AsyncExitStack to manage a dynamic number of database connections — the count is not known until runtime.

import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

@asynccontextmanager
async def open_connection(conn_id):
    print(f"Opened: conn_{conn_id}")
    try:
        yield f"conn_{conn_id}"
    finally:
        print(f"Closed: conn_{conn_id}")

async def main():
    connection_ids = [0, 1, 2]
    async with AsyncExitStack() as stack:
        # enter each connection context dynamically
        connections = []
        for cid in connection_ids:
            conn = await stack.enter_async_context(open_connection(cid))
            connections.append(conn)
        print("All queries done")

asyncio.run(main())
Solution
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

@asynccontextmanager
async def open_connection(conn_id):
    print(f"Opened: conn_{conn_id}")
    try:
        yield f"conn_{conn_id}"
    finally:
        print(f"Closed: conn_{conn_id}")

async def main():
    connection_ids = [0, 1, 2]
    async with AsyncExitStack() as stack:
        connections = []
        for cid in connection_ids:
            conn = await stack.enter_async_context(open_connection(cid))
            connections.append(conn)
        print("All queries done")
    # All connections closed here in LIFO order

asyncio.run(main())

AsyncExitStack use cases:

  • When you don't know at write time how many context managers you need (dynamic list from config, command-line args, etc.).
  • LIFO exit order ensures correct reverse-order cleanup (last opened, first closed).
  • stack.callback(fn) registers a plain synchronous callback (not a context manager) for cleanup; stack.push_async_callback(fn) is the counterpart for coroutine functions.
  • stack.push_async_exit(fn) registers an exit callback with (exc_type, exc_val, exc_tb) signature.
  • The stack itself is a context manager — all registrations are cleaned up when the stack's __aexit__ runs.
  • This is the production pattern for managing a pool of connections or a set of locks acquired based on runtime conditions.
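As a rough illustration of the registration methods, the sketch below registers one callback of each kind on the same stack and records the order in which they run. All function names inside demo are hypothetical.

```python
import asyncio
from contextlib import AsyncExitStack

async def demo():
    events = []

    def sync_cleanup(name):
        events.append(f"sync cleanup: {name}")

    async def async_cleanup(name):
        await asyncio.sleep(0)
        events.append(f"async cleanup: {name}")

    async def exit_hook(exc_type, exc_val, exc_tb):
        # Same signature as __aexit__; a truthy return value would suppress.
        events.append(f"exit hook saw: {exc_type.__name__ if exc_type else None}")
        return False

    try:
        async with AsyncExitStack() as stack:
            stack.push_async_exit(exit_hook)               # receives exception details
            stack.callback(sync_cleanup, "a")              # plain function
            stack.push_async_callback(async_cleanup, "b")  # coroutine function
            raise ValueError("boom")
    except ValueError:
        events.append("caller saw ValueError")
    return events

print(asyncio.run(demo()))
```

The callbacks run in reverse registration order. Only the exit hook sees the exception; the two plain callbacks are called with just the arguments they were registered with.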
Expected Output
Opened: conn_0
Opened: conn_1
Opened: conn_2
All queries done
Closed: conn_2
Closed: conn_1
Closed: conn_0
Hints

Hint 1: AsyncExitStack.enter_async_context() registers a context manager dynamically.

Hint 2: All registered context managers are exited in LIFO order when the stack exits.

#7 Context Manager Factory Pattern (Medium)
Tags: asynccontextmanager, factory, parameterized

Write a transaction(shard_id) async context manager that prints COMMIT on success and ROLLBACK on exception.

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def transaction(shard_id):
    # print BEGIN, yield, then COMMIT or ROLLBACK
    pass

async def main():
    async with transaction("shard_0"):
        print("INSERT INTO users")

    try:
        async with transaction("shard_1"):
            print("INSERT INTO orders")
            raise RuntimeError("DB constraint violated")
    except RuntimeError:
        pass  # already rolled back

asyncio.run(main())
Solution
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def transaction(shard_id):
    print(f"Transaction BEGIN on {shard_id}")
    try:
        yield
        print(f"Transaction COMMIT on {shard_id}")
    except Exception:
        print(f"Transaction ROLLBACK on {shard_id}")
        raise  # re-raise so the caller sees the exception

async def main():
    async with transaction("shard_0"):
        print("INSERT INTO users")

    try:
        async with transaction("shard_1"):
            print("INSERT INTO orders")
            raise RuntimeError("DB constraint violated")
    except RuntimeError:
        pass

asyncio.run(main())

Transaction pattern details:

  • The except Exception clause catches ordinary exceptions raised at the yield, i.e. from inside the async with block. asyncio.CancelledError derives from BaseException (Python 3.8+), so a cancelled block reaches neither the COMMIT nor the ROLLBACK line.
  • raise re-raises it so the caller can handle it: the context manager handles cleanup but does not suppress the error.
  • If no exception occurs, execution falls through to the COMMIT line.
  • In a real database driver, yield would be replaced by yield conn and COMMIT/ROLLBACK would call actual DB methods.
  • Note the difference from try/finally: a finally block runs on both paths, whereas try/except/raise lets you differentiate between the success and failure paths.
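A variation that yields a connection object might look like the sketch below. FakeConnection is a hypothetical stand-in, not a real driver API. This version also catches BaseException instead of Exception, which is a deliberate change from the solution so that a cancelled task still rolls back.

```python
import asyncio
from contextlib import asynccontextmanager

class FakeConnection:
    """Hypothetical stand-in for a driver connection; records calls."""
    def __init__(self):
        self.calls = []

    async def begin(self):
        self.calls.append("BEGIN")

    async def commit(self):
        self.calls.append("COMMIT")

    async def rollback(self):
        self.calls.append("ROLLBACK")

@asynccontextmanager
async def transaction(conn):
    await conn.begin()
    try:
        yield conn
    except BaseException:
        # BaseException also covers asyncio.CancelledError.
        await conn.rollback()
        raise
    else:
        await conn.commit()

async def success_case():
    conn = FakeConnection()
    async with transaction(conn):
        pass
    return conn.calls

async def failure_case():
    conn = FakeConnection()
    try:
        async with transaction(conn):
            raise RuntimeError("DB constraint violated")
    except RuntimeError:
        pass
    return conn.calls

async def cancelled_case():
    conn = FakeConnection()

    async def work():
        async with transaction(conn):
            await asyncio.sleep(10)

    task = asyncio.create_task(work())
    await asyncio.sleep(0)  # let work() enter the transaction and start sleeping
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return conn.calls

print(asyncio.run(success_case()))
print(asyncio.run(failure_case()))
print(asyncio.run(cancelled_case()))
```

The else clause keeps the commit out of the try body, so a failing commit is not mistaken for a failure inside the block.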
Expected Output
Transaction BEGIN on shard_0
INSERT INTO users
Transaction COMMIT on shard_0
Transaction BEGIN on shard_1
INSERT INTO orders
Transaction ROLLBACK on shard_1
Hints

Hint 1: The context manager should accept shard_id. Track whether the block raised an exception to decide commit vs rollback.

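Hint 2: With @asynccontextmanager there is no exc_type argument: an exception from the block is re-raised at the yield, so wrap the yield in try/except. (In a class-based manager you would check exc_type in __aexit__ instead.)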


Hard

#8 Connection Pool Context Manager (Hard)
Tags: connection-pool, asyncio.Semaphore, asynccontextmanager

Implement a simple async connection pool that limits concurrent connections to a fixed size using a semaphore and queue.

import asyncio

class ConnectionPool:
    def __init__(self, pool_size):
        self._semaphore = asyncio.Semaphore(pool_size)
        self._pool = asyncio.Queue()
        for i in range(pool_size):
            self._pool.put_nowait(f"conn_{i}")

    async def acquire(self):
        await self._semaphore.acquire()
        conn = await self._pool.get()
        return conn

    async def release(self, conn):
        await self._pool.put(conn)
        self._semaphore.release()

    def connection(self):
        return _PooledConnection(self)

class _PooledConnection:
    def __init__(self, pool):
        self._pool = pool
        self._conn = None

    async def __aenter__(self):
        self._conn = await self._pool.acquire()
        return self._conn

    async def __aexit__(self, *args):
        await self._pool.release(self._conn)

async def main():
    pool = ConnectionPool(pool_size=2)

    async def worker(worker_id):
        async with pool.connection() as conn:
            print(f"Worker {worker_id} using {conn}")
            await asyncio.sleep(0.01)

    await asyncio.gather(*[worker(i) for i in range(5)])
    print("All workers done")

asyncio.run(main())
Solution
import asyncio

class ConnectionPool:
    def __init__(self, pool_size):
        self._semaphore = asyncio.Semaphore(pool_size)
        self._pool = asyncio.Queue()
        for i in range(pool_size):
            self._pool.put_nowait(f"conn_{i}")

    async def acquire(self):
        await self._semaphore.acquire()
        conn = await self._pool.get()
        return conn

    async def release(self, conn):
        await self._pool.put(conn)
        self._semaphore.release()

    def connection(self):
        return _PooledConnection(self)

class _PooledConnection:
    def __init__(self, pool):
        self._pool = pool
        self._conn = None

    async def __aenter__(self):
        self._conn = await self._pool.acquire()
        return self._conn

    async def __aexit__(self, *args):
        await self._pool.release(self._conn)

async def main():
    pool = ConnectionPool(pool_size=2)

    async def worker(worker_id):
        async with pool.connection() as conn:
            print(f"Worker {worker_id} using {conn}")
            await asyncio.sleep(0.01)

    await asyncio.gather(*[worker(i) for i in range(5)])
    print("All workers done")

asyncio.run(main())

Pool design analysis:

  • The semaphore limits the number of concurrent checkouts to pool_size.
  • The queue holds the actual connection objects — callers get a specific connection, not just a permit.
  • release puts the connection back AND releases the semaphore, in that order, to prevent a race where a waiting task acquires the semaphore but finds the queue empty.
  • Real connection pools (asyncpg, aioredis) add: connection health checks, max lifetime, timeout on acquire, and connection factory callbacks.
  • The _PooledConnection context manager is a separate object so pool.connection() returns a new context each time without being a coroutine itself.
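One of the production features listed above, a timeout on acquire, can be sketched with asyncio.wait_for. TinyPool is a hypothetical, deliberately minimal pool (a queue only, no semaphore) written for this example; it is not how asyncpg or any other library implements timeouts.

```python
import asyncio
from contextlib import asynccontextmanager

class TinyPool:
    """Hypothetical minimal pool: a queue of pre-created connection names."""
    def __init__(self, size):
        self._idle = asyncio.Queue()
        for i in range(size):
            self._idle.put_nowait(f"conn_{i}")

    @asynccontextmanager
    async def connection(self, timeout):
        # wait_for cancels the pending get() and raises a timeout error on expiry.
        conn = await asyncio.wait_for(self._idle.get(), timeout)
        try:
            yield conn
        finally:
            self._idle.put_nowait(conn)  # always return the connection

async def demo():
    pool = TinyPool(size=1)
    results = []
    async with pool.connection(timeout=0.1) as conn:
        results.append(conn)
        try:
            # The only connection is checked out, so this acquire times out.
            async with pool.connection(timeout=0.01):
                results.append("unexpected")
        except asyncio.TimeoutError:
            results.append("timed out")
    # The connection is back in the pool and can be acquired again.
    async with pool.connection(timeout=0.1) as conn:
        results.append(conn)
    return results

print(asyncio.run(demo()))
```

A bounded wait turns pool exhaustion into an explicit error that the caller can handle, rather than a task that hangs indefinitely.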
Expected Output
Five "Worker N using conn_M" lines (at most two workers hold a connection at any time, so conn_0 and conn_1 are reused), followed by:
All workers done
Hints

Hint 1: Use asyncio.Semaphore to bound concurrency to pool_size. Track available connections in a queue.

Hint 2: On __aexit__, return the connection back to the pool.

#9 Scoped Resource Manager with Callbacks (Hard)
Tags: AsyncExitStack, callbacks, defer-pattern, scoped-cleanup

Implement a Scope class using AsyncExitStack that supports registering defer callbacks (Go-style) and handles cleanup in reverse registration order.

import asyncio
from contextlib import AsyncExitStack

class Scope:
    def __init__(self):
        self._stack = AsyncExitStack()

    async def __aenter__(self):
        await self._stack.__aenter__()
        return self

    async def __aexit__(self, *args):
        return await self._stack.__aexit__(*args)

    def defer(self, coro_fn, *args, **kwargs):
        """Register an async cleanup callback."""
        self._stack.push_async_callback(coro_fn, *args, **kwargs)

async def log(message):
    # push_async_callback awaits the callback's result, so the callback must be
    # a coroutine function; passing plain print would fail with a TypeError.
    print(message)

async def main():
    async with Scope() as scope:
        # Open file
        print("Opened file: data.csv")
        scope.defer(log, "Closed file: data.csv")

        # Start transaction
        print("Started transaction")
        scope.defer(log, "Rolled back transaction")

        print("Work done")
        raise RuntimeError("Simulated failure")

asyncio.run(main())
Solution
import asyncio
from contextlib import AsyncExitStack

class Scope:
    def __init__(self):
        self._stack = AsyncExitStack()

    async def __aenter__(self):
        await self._stack.__aenter__()
        return self

    async def __aexit__(self, *args):
        return await self._stack.__aexit__(*args)

    def defer(self, coro_fn, *args, **kwargs):
        self._stack.push_async_callback(coro_fn, *args, **kwargs)

async def log(message):
    # push_async_callback awaits the callback's result, so the callback must be
    # a coroutine function; passing plain print would fail with a TypeError.
    print(message)

async def main():
    try:
        async with Scope() as scope:
            print("Opened file: data.csv")
            scope.defer(log, "Closed file: data.csv")

            print("Started transaction")
            scope.defer(log, "Rolled back transaction")

            print("Work done")
            raise RuntimeError("Simulated failure")
    except RuntimeError:
        pass  # cleanup already ran

asyncio.run(main())

Defer / scoped cleanup pattern:

  • push_async_callback registers a plain async callable (not a context manager) for cleanup.
  • Callbacks run in LIFO order — the last registered runs first, mirroring how stack frames unwind.
  • This is the Python equivalent of Go's defer statement or C++'s RAII destructors.
  • The key advantage over explicit try/finally chains: cleanup is registered at the point of resource acquisition, keeping the code linear and readable.
  • AsyncExitStack handles exceptions correctly: if a cleanup callback raises, the exception is chained with any existing exception, and remaining cleanups still run.
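The last bullet can be checked with a small sketch in which one cleanup callback fails. The callback names are hypothetical; the point is that the remaining callback still runs and the two exceptions end up chained.

```python
import asyncio
from contextlib import AsyncExitStack

async def demo():
    events = []

    async def close_file():
        events.append("closed file")

    async def failing_cleanup():
        events.append("cleanup failed")
        raise OSError("flush failed")

    caught = None
    try:
        async with AsyncExitStack() as stack:
            stack.push_async_callback(close_file)
            stack.push_async_callback(failing_cleanup)  # runs first (LIFO)
            raise RuntimeError("body failed")
    except OSError as exc:
        caught = exc
    return events, caught

events, caught = asyncio.run(demo())
print(events)
print(repr(caught), "raised while handling", repr(caught.__context__))
```

The caller sees the OSError from the cleanup, with the original RuntimeError available as its __context__, so neither failure is lost.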
Expected Output
Opened file: data.csv
Started transaction
Work done
Rolled back transaction
Closed file: data.csv
Hints

Hint 1: Use AsyncExitStack and register cleanup callbacks with stack.push_async_callback().

Hint 2: The defer pattern: register cleanup immediately after acquiring each resource, so that cleanup runs in reverse acquisition order.

#10 Retry Context Manager (Hard)
Tags: retry, asynccontextmanager, exponential-backoff, error-handling

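Implement retry(max_attempts, backoff_base) so that a block is retried on failure with exponential backoff. A single async with block runs its body only once, and an @asynccontextmanager generator that yields a second time fails with a RuntimeError. The retry loop therefore has to live outside the block: retry is an async generator that yields one attempt context manager per try, and the caller iterates over it with async for.

import asyncio

class _Attempt:
    def __init__(self, is_last):
        self._is_last = is_last
        self.failed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # record a failure; suppress it unless this is the last attempt
        pass

async def retry(max_attempts=3, backoff_base=0.01):
    # yield one _Attempt per try; stop after a success,
    # sleep with exponential backoff after a failure
    pass

async def main():
    attempt_counter = [0]

    async for attempt in retry(max_attempts=3, backoff_base=0.001):
        async with attempt:
            attempt_counter[0] += 1
            if attempt_counter[0] < 3:
                print(f"Attempt {attempt_counter[0]} failed: transient error")
                raise ConnectionError("transient error")
            print(f"Attempt {attempt_counter[0]} succeeded")

asyncio.run(main())
Solution
import asyncio

class _Attempt:
    """Context manager for one attempt."""
    def __init__(self, is_last):
        self._is_last = is_last
        self.failed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None or not issubclass(exc_type, Exception):
            return False  # success, or a BaseException such as CancelledError
        self.failed = True
        return not self._is_last  # suppress so the loop can retry; propagate on the last attempt

async def retry(max_attempts=3, backoff_base=0.01):
    for attempt in range(1, max_attempts + 1):
        ctx = _Attempt(is_last=(attempt == max_attempts))
        yield ctx
        if not ctx.failed:
            return  # success: stop iterating
        await asyncio.sleep(backoff_base * (2 ** (attempt - 1)))

async def main():
    attempt_counter = [0]

    async for attempt in retry(max_attempts=3, backoff_base=0.001):
        async with attempt:
            attempt_counter[0] += 1
            if attempt_counter[0] < 3:
                print(f"Attempt {attempt_counter[0]} failed: transient error")
                raise ConnectionError("transient error")
            print(f"Attempt {attempt_counter[0]} succeeded")

asyncio.run(main())

Retry context manager design:

  • Each iteration of the loop yields one fresh _Attempt. If the block succeeds, return ends the generator and the async for loop stops.
  • If the block raises, _Attempt.__aexit__ records the failure and suppresses the exception, the backoff sleep runs, and the next iteration yields a new attempt.
  • On the last attempt the exception is not suppressed, so it propagates to the caller.
  • Exponential backoff: the sleep after failed attempt 1 is base, after attempt 2 it is 2*base, after attempt 3 it is 4*base. No sleep follows the final attempt.
  • Production extension: add a retryable_exceptions tuple to only retry specific error types; add jitter (random.uniform(0, 1) * backoff) to prevent thundering herd on a shared service.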
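The jitter extension mentioned above could be factored into a small pure function. The name backoff_delay and its jitter and rng parameters are assumptions for this sketch, and the additive jitter shown here is one of several common variants.

```python
import random

def backoff_delay(attempt, base, jitter=0.0, rng=random):
    """Delay to sleep after failed attempt number `attempt` (1-based).

    The deterministic part doubles each time: base, 2*base, 4*base, ...
    `jitter` is the maximum extra delay, as a fraction of that value.
    """
    delay = base * (2 ** (attempt - 1))
    return delay + rng.uniform(0, jitter * delay)

# Without jitter the schedule is exactly exponential.
print([backoff_delay(n, base=1.0) for n in (1, 2, 3)])

# With 50% jitter, each delay falls between delay and 1.5 * delay.
print(backoff_delay(3, base=1.0, jitter=0.5))
```

Randomizing the delay spreads out retries from many clients that failed at the same moment, which is the thundering-herd problem the bullet refers to.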
Expected Output
Attempt 1 failed: transient error
Attempt 2 failed: transient error
Attempt 3 succeeded
Hints

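Hint 1: Use a loop inside the async generator. After failed attempt n (counting from 1), sleep for backoff_base * 2^(n-1) seconds.

Hint 2: A single async with block runs its body only once, so the loop must live outside the block: the consumer enters a fresh context manager on each retry iteration.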

#11 Observability Context Manager (Hard)
Tags: observability, asynccontextmanager, structured-logging, metrics

Build an observe(operation_name) context manager that logs structured span data: name, duration, outcome, and any exception details.

import asyncio
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Span:
    name: str
    start: float = field(default_factory=time.monotonic)
    duration: float = 0.0
    success: bool = True
    error: Optional[str] = None
    tags: dict = field(default_factory=dict)

@asynccontextmanager
async def observe(operation_name):
    # create Span, yield it, record duration and outcome on exit
    pass

async def main():
    async with observe("fetch_user") as span:
        span.tags["user_id"] = 42
        await asyncio.sleep(0.01)

    try:
        async with observe("update_order") as span:
            span.tags["order_id"] = 99
            raise ValueError("Stock depleted")
    except ValueError:
        pass

asyncio.run(main())
Solution
import asyncio
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Span:
    name: str
    start: float = field(default_factory=time.monotonic)
    duration: float = 0.0
    success: bool = True
    error: Optional[str] = None
    tags: dict = field(default_factory=dict)

@asynccontextmanager
async def observe(operation_name):
    span = Span(name=operation_name)
    try:
        yield span
        span.success = True
    except Exception as e:
        span.success = False
        span.error = f"{type(e).__name__}: {e}"
        raise
    finally:
        span.duration = time.monotonic() - span.start
        status = "OK" if span.success else "ERROR"
        print(
            f"[SPAN] {span.name} | {status} | {span.duration*1000:.1f}ms"
            + (f" | {span.error}" if span.error else "")
            + (f" | tags={span.tags}" if span.tags else "")
        )

async def main():
    async with observe("fetch_user") as span:
        span.tags["user_id"] = 42
        await asyncio.sleep(0.01)

    try:
        async with observe("update_order") as span:
            span.tags["order_id"] = 99
            raise ValueError("Stock depleted")
    except ValueError:
        pass

asyncio.run(main())

Observability pattern design:

  • The Span dataclass is a mutable object yielded to the consumer, allowing them to attach metadata without coupling to the context manager's internal state.
  • try/except/raise separates success and failure paths; finally always records duration.
  • This pattern is the foundation of distributed tracing (OpenTelemetry spans work similarly).
  • In production, replace print with a structured logger that emits JSON to a log aggregator.
  • The tags dict is the equivalent of OpenTelemetry span attributes — arbitrary key-value metadata attached at the call site.
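Replacing print with structured output, as suggested above, could start from a helper like the one sketched here. The span_to_json function and the duration_ms field name are hypothetical choices for this example, and the Span dataclass is repeated so that the block runs on its own.

```python
import json
import time
from dataclasses import asdict, dataclass, field
from typing import Optional

@dataclass
class Span:
    name: str
    start: float = field(default_factory=time.monotonic)
    duration: float = 0.0
    success: bool = True
    error: Optional[str] = None
    tags: dict = field(default_factory=dict)

def span_to_json(span):
    """Serialize a finished span as one JSON line for a log aggregator."""
    record = asdict(span)
    # A monotonic start value is only meaningful inside this process, so drop it.
    record.pop("start")
    record["duration_ms"] = round(record.pop("duration") * 1000, 1)
    return json.dumps(record, sort_keys=True)

span = Span(name="fetch_user", duration=0.25, tags={"user_id": 42})
print(span_to_json(span))
```

Emitting one JSON object per span keeps the log machine-readable; the tags dict passes through unchanged as nested key-value metadata.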
Expected Output
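Two span lines; the durations vary from run to run:
[SPAN] fetch_user | OK | <duration>ms | tags={'user_id': 42}
[SPAN] update_order | ERROR | <duration>ms | ValueError: Stock depleted | tags={'order_id': 99}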
Hints

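Hint 1: Capture the start time in setup. In teardown, determine success or failure from whether an exception was raised at the yield (the @asynccontextmanager counterpart of checking exc_type in __aexit__).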

Hint 2: Yield a mutable object (like a dict or dataclass) so the consumer can attach metadata.

© 2026 EngineersOfAI. All rights reserved.