Python Async Context Managers Practice Problems & Exercises
Practice: Async Context Managers
← Back to lessonEasy
Implement a simple async context manager class AsyncResource that prints acquisition and release messages.
import asyncio
class AsyncResource:
async def __aenter__(self):
# print "Acquiring resource" and return self
pass
async def __aexit__(self, exc_type, exc_val, exc_tb):
# print "Releasing resource"
pass
async def main():
async with AsyncResource() as resource:
print("Using resource")
asyncio.run(main())
Solution
import asyncio
class AsyncResource:
async def __aenter__(self):
print("Acquiring resource")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Releasing resource")
return None # do not suppress exceptions
async def main():
async with AsyncResource() as resource:
print("Using resource")
asyncio.run(main())
Protocol details:
__aenter__is awaited when entering theasync withblock. Its return value is bound to theasvariable.__aexit__is awaited when leaving the block — whether normally or due to an exception.- Returning
NoneorFalsefrom__aexit__lets exceptions propagate. ReturningTruesuppresses them. - The three parameters to
__aexit__are the exception type, value, and traceback. All areNoneif no exception occurred. - This is identical to the sync
__enter__/__exit__protocol, just withasyncadded andawaitused at the call site.
Expected Output
Acquiring resource\nUsing resource\nReleasing resourceHints
Hint 1: __aenter__ must be an async method that returns the resource to bind to the "as" variable.
Hint 2: __aexit__ receives (exc_type, exc_val, exc_tb). Return True to suppress exceptions, None/False to propagate.
Rewrite the database connection context manager using @asynccontextmanager instead of a class.
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def db_connection(host):
# setup: print "Opening connection to {host}"
# yield the connection object (use the string "conn" as a mock)
# teardown: print "Closing connection"
pass
async def main():
async with db_connection("db.example.com") as conn:
print("Running query")
asyncio.run(main())
Solution
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def db_connection(host):
print(f"Opening connection to {host}")
conn = f"connection_to_{host}"
try:
yield conn
finally:
print("Closing connection")
async def main():
async with db_connection("db.example.com") as conn:
print("Running query")
asyncio.run(main())
@asynccontextmanager pattern:
- The decorator wraps the generator function, handling the
__aenter__/__aexit__protocol automatically. - Code before
yield= setup (__aenter__). The yielded value is what theasclause receives. - Code after
yield= teardown (__aexit__). - Always use
try/finallyaround theyield— thefinallyblock runs even if the consumer raises an exception, ensuring cleanup always happens. - This is far more concise than writing a full class for simple resource management.
Expected Output
Opening connection to db.example.com\nRunning query\nClosing connectionHints
Hint 1: Use @asynccontextmanager from contextlib. The function must be async and contain exactly one yield.
Hint 2: Code before yield is __aenter__; code after yield is __aexit__.
Write an @asynccontextmanager called timer(label) that prints how long the wrapped block took.
import asyncio
import time
from contextlib import asynccontextmanager
@asynccontextmanager
async def timer(label):
# record start time, yield, print elapsed
pass
async def main():
async with timer("Section"):
await asyncio.sleep(0.01)
asyncio.run(main())
Solution
import asyncio
import time
from contextlib import asynccontextmanager
@asynccontextmanager
async def timer(label):
start = time.monotonic()
try:
yield
finally:
elapsed = time.monotonic() - start
print(f"{label} ran in approximately {elapsed:.2f}s")
async def main():
async with timer("Section"):
await asyncio.sleep(0.01)
asyncio.run(main())
Design notes:
- No value is yielded here (just bare
yield) — theasclause is optional for context managers that exist purely for side effects. time.monotonic()is the correct choice for measuring durations: it always increases and is unaffected by NTP adjustments or daylight saving time changes.- The
finallyguarantees the timing message prints even if the block raises an exception, which is the expected behavior for a profiling tool. - Production extension: add a threshold and only log if elapsed exceeds it — a simple "slow operation" detector.
Expected Output
Section ran in approximately 0.01sHints
Hint 1: Record time.monotonic() before yield and after yield.
Hint 2: Use time.monotonic() not time.time() — monotonic is guaranteed to never go backwards.
Medium
Write SuppressErrors(error_types) that suppresses specific exception types and logs the suppressed error.
import asyncio
class SuppressErrors:
def __init__(self, *error_types):
self._error_types = error_types
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# if exc_type is in self._error_types, log and return True
pass
async def main():
async with SuppressErrors(ValueError):
raise ValueError("intentional error")
print("Execution continued after suppressed block")
asyncio.run(main())
Solution
import asyncio
class SuppressErrors:
def __init__(self, *error_types):
self._error_types = error_types
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None and issubclass(exc_type, self._error_types):
print(f"Caught and suppressed: {exc_val}")
return True # suppress
return None # propagate
async def main():
async with SuppressErrors(ValueError):
raise ValueError("intentional error")
print("Execution continued after suppressed block")
asyncio.run(main())
Exception suppression mechanics:
__aexit__receives(exc_type, exc_val, exc_tb)when an exception escapes the block.issubclass(exc_type, self._error_types)handles subclass matching — aKeyErrorwould be suppressed ifLookupErroris in the list.- Returning
Truefrom__aexit__tells Python to suppress the exception — execution continues after theasync withblock as if no exception occurred. - This is how
contextlib.suppressworks internally. - Production use: ignoring
asyncio.CancelledErrorin cleanup code (though carefully — cancellation propagation is usually important).
Expected Output
Caught and suppressed: intentional error\nExecution continued after suppressed blockHints
Hint 1: Return True from __aexit__ to suppress the exception.
Hint 2: Check exc_type is not None to detect whether an exception occurred.
Implement a reentrant async lock context manager that allows the same task to acquire it multiple times without deadlocking.
import asyncio
class ReentrantAsyncLock:
def __init__(self):
self._lock = asyncio.Lock()
self._owner = None
self._count = 0
async def __aenter__(self):
# if current task already owns the lock, increment count
# otherwise, acquire the underlying lock
pass
async def __aexit__(self, *args):
# decrement count; release underlying lock when count hits zero
pass
async def main():
rlock = ReentrantAsyncLock()
async with rlock:
print("Outer lock acquired")
async with rlock:
print("Inner lock acquired")
print("Inner released")
print("Outer released")
asyncio.run(main())
Solution
import asyncio
class ReentrantAsyncLock:
def __init__(self):
self._lock = asyncio.Lock()
self._owner = None
self._count = 0
async def __aenter__(self):
current = asyncio.current_task()
if self._owner is current:
self._count += 1
else:
await self._lock.acquire()
self._owner = current
self._count = 1
return self
async def __aexit__(self, *args):
self._count -= 1
if self._count == 0:
self._owner = None
self._lock.release()
async def main():
rlock = ReentrantAsyncLock()
async with rlock:
print("Outer lock acquired")
async with rlock:
print("Inner lock acquired")
print("Inner released")
print("Outer released")
asyncio.run(main())
Reentrant lock design:
asyncio.current_task()returns the Task object running the current coroutine — this is the "owner" identifier.- On re-entry, we just increment
_countwithout trying to acquire the lock (which would deadlock). - On exit, decrement
_count. Only release the real lock when_countreaches zero. - This mirrors
threading.RLockbehavior but for the async world. - Important caveat: this does NOT work across different coroutines called from the same task via
await— they share the same task, so nestedawaitcalls could bypass the lock. Use with awareness of your call graph.
Expected Output
Outer lock acquired\nInner lock acquired\nInner released\nOuter releasedHints
Hint 1: A reentrant lock tracks how many times the current task has acquired it. Only release the underlying lock when the count drops to zero.
Hint 2: Use asyncio.current_task() to identify the current task.
Use AsyncExitStack to manage a dynamic number of database connections — the count is not known until runtime.
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
@asynccontextmanager
async def open_connection(conn_id):
print(f"Opened: conn_{conn_id}")
try:
yield f"conn_{conn_id}"
finally:
print(f"Closed: conn_{conn_id}")
async def main():
connection_ids = [0, 1, 2]
async with AsyncExitStack() as stack:
# enter each connection context dynamically
connections = []
for cid in connection_ids:
conn = await stack.enter_async_context(open_connection(cid))
connections.append(conn)
print("All queries done")
asyncio.run(main())
Solution
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
@asynccontextmanager
async def open_connection(conn_id):
print(f"Opened: conn_{conn_id}")
try:
yield f"conn_{conn_id}"
finally:
print(f"Closed: conn_{conn_id}")
async def main():
connection_ids = [0, 1, 2]
async with AsyncExitStack() as stack:
connections = []
for cid in connection_ids:
conn = await stack.enter_async_context(open_connection(cid))
connections.append(conn)
print("All queries done")
# All connections closed here in LIFO order
asyncio.run(main())
AsyncExitStack use cases:
- When you don't know at write time how many context managers you need (dynamic list from config, command-line args, etc.).
- LIFO exit order ensures correct reverse-order cleanup (last opened, first closed).
stack.callback(fn)registers a plain async callback (not a context manager) for cleanup.stack.push_async_exit(fn)registers an exit callback with(exc_type, exc_val, exc_tb)signature.- The stack itself is a context manager — all registrations are cleaned up when the stack's
__aexit__runs. - This is the production pattern for managing a pool of connections or a set of locks acquired based on runtime conditions.
Expected Output
Opened: conn_0\nOpened: conn_1\nOpened: conn_2\nAll queries done\nClosed: conn_2\nClosed: conn_1\nClosed: conn_0Hints
Hint 1: AsyncExitStack.enter_async_context() registers a context manager dynamically.
Hint 2: All registered context managers are exited in LIFO order when the stack exits.
Write a transaction(shard_id) async context manager that prints COMMIT on success and ROLLBACK on exception.
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def transaction(shard_id):
# print BEGIN, yield, then COMMIT or ROLLBACK
pass
async def main():
async with transaction("shard_0"):
print("INSERT INTO users")
try:
async with transaction("shard_1"):
print("INSERT INTO orders")
raise RuntimeError("DB constraint violated")
except RuntimeError:
pass # already rolled back
asyncio.run(main())
Solution
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def transaction(shard_id):
print(f"Transaction BEGIN on {shard_id}")
try:
yield
print(f"Transaction COMMIT on {shard_id}")
except Exception:
print(f"Transaction ROLLBACK on {shard_id}")
raise # re-raise so the caller sees the exception
async def main():
async with transaction("shard_0"):
print("INSERT INTO users")
try:
async with transaction("shard_1"):
print("INSERT INTO orders")
raise RuntimeError("DB constraint violated")
except RuntimeError:
pass
asyncio.run(main())
Transaction pattern details:
- The
except Exceptioncatches any exception from theyield(i.e., from inside theasync withblock). raisere-raises it so the caller can handle it — the context manager handles cleanup but does not suppress the error.- If no exception occurs, execution falls through to the
COMMITline. - In a real database driver,
yieldwould be replaced byyield connand COMMIT/ROLLBACK would call actual DB methods. - Note the difference from
try/finally:finallyalways runs both paths; usingtry/except/raiselets you differentiate between the success and failure paths.
Expected Output
Transaction BEGIN on shard_0\nINSERT INTO users\nTransaction COMMIT on shard_0\nTransaction BEGIN on shard_1\nINSERT INTO orders\nTransaction ROLLBACK on shard_1Hints
Hint 1: The context manager should accept shard_id. Track whether the block raised an exception to decide commit vs rollback.
Hint 2: Check exc_type in __aexit__ to determine success or failure.
Hard
Implement a simple async connection pool that limits concurrent connections to a fixed size using a semaphore and queue.
import asyncio
class ConnectionPool:
def __init__(self, pool_size):
self._semaphore = asyncio.Semaphore(pool_size)
self._pool = asyncio.Queue()
for i in range(pool_size):
self._pool.put_nowait(f"conn_{i}")
async def acquire(self):
await self._semaphore.acquire()
conn = await self._pool.get()
return conn
async def release(self, conn):
await self._pool.put(conn)
self._semaphore.release()
def connection(self):
return _PooledConnection(self)
class _PooledConnection:
def __init__(self, pool):
self._pool = pool
self._conn = None
async def __aenter__(self):
self._conn = await self._pool.acquire()
return self._conn
async def __aexit__(self, *args):
await self._pool.release(self._conn)
async def main():
pool = ConnectionPool(pool_size=2)
async def worker(worker_id):
async with pool.connection() as conn:
print(f"Worker {worker_id} using {conn}")
await asyncio.sleep(0.01)
await asyncio.gather(*[worker(i) for i in range(5)])
print("All workers done")
asyncio.run(main())
Solution
import asyncio
class ConnectionPool:
def __init__(self, pool_size):
self._semaphore = asyncio.Semaphore(pool_size)
self._pool = asyncio.Queue()
for i in range(pool_size):
self._pool.put_nowait(f"conn_{i}")
async def acquire(self):
await self._semaphore.acquire()
conn = await self._pool.get()
return conn
async def release(self, conn):
await self._pool.put(conn)
self._semaphore.release()
def connection(self):
return _PooledConnection(self)
class _PooledConnection:
def __init__(self, pool):
self._pool = pool
self._conn = None
async def __aenter__(self):
self._conn = await self._pool.acquire()
return self._conn
async def __aexit__(self, *args):
await self._pool.release(self._conn)
async def main():
pool = ConnectionPool(pool_size=2)
async def worker(worker_id):
async with pool.connection() as conn:
print(f"Worker {worker_id} using {conn}")
await asyncio.sleep(0.01)
await asyncio.gather(*[worker(i) for i in range(5)])
print("All workers done")
asyncio.run(main())
Pool design analysis:
- The semaphore limits the number of concurrent checkouts to
pool_size. - The queue holds the actual connection objects — callers get a specific connection, not just a permit.
releaseputs the connection back AND releases the semaphore, in that order, to prevent a race where a waiting task acquires the semaphore but finds the queue empty.- Real connection pools (asyncpg, aioredis) add: connection health checks, max lifetime, timeout on acquire, and connection factory callbacks.
- The
_PooledConnectioncontext manager is a separate object sopool.connection()returns a new context each time without being a coroutine itself.
Expected Output
See solution — connections checked out and returned to poolHints
Hint 1: Use asyncio.Semaphore to bound concurrency to pool_size. Track available connections in a queue.
Hint 2: On __aexit__, return the connection back to the pool.
Implement a Scope class using AsyncExitStack that supports registering defer callbacks (Go-style) and handles cleanup in reverse registration order.
import asyncio
from contextlib import AsyncExitStack
class Scope:
def __init__(self):
self._stack = AsyncExitStack()
async def __aenter__(self):
await self._stack.__aenter__()
return self
async def __aexit__(self, *args):
return await self._stack.__aexit__(*args)
def defer(self, coro_fn, *args, **kwargs):
"""Register an async cleanup callback."""
self._stack.push_async_callback(coro_fn, *args, **kwargs)
async def main():
async with Scope() as scope:
# Open file
print("Opened file: data.csv")
scope.defer(print, "Closed file: data.csv")
# Start transaction
print("Started transaction")
scope.defer(print, "Rolled back transaction")
print("Work done")
raise RuntimeError("Simulated failure")
asyncio.run(main())
Solution
import asyncio
from contextlib import AsyncExitStack
class Scope:
def __init__(self):
self._stack = AsyncExitStack()
async def __aenter__(self):
await self._stack.__aenter__()
return self
async def __aexit__(self, *args):
return await self._stack.__aexit__(*args)
def defer(self, coro_fn, *args, **kwargs):
self._stack.push_async_callback(coro_fn, *args, **kwargs)
async def main():
try:
async with Scope() as scope:
print("Opened file: data.csv")
scope.defer(print, "Closed file: data.csv")
print("Started transaction")
scope.defer(print, "Rolled back transaction")
print("Work done")
raise RuntimeError("Simulated failure")
except RuntimeError:
pass # cleanup already ran
asyncio.run(main())
Defer / scoped cleanup pattern:
push_async_callbackregisters a plain async callable (not a context manager) for cleanup.- Callbacks run in LIFO order — the last registered runs first, mirroring how stack frames unwind.
- This is the Python equivalent of Go's
deferstatement or C++'s RAII destructors. - The key advantage over explicit try/finally chains: cleanup is registered at the point of resource acquisition, keeping the code linear and readable.
AsyncExitStackhandles exceptions correctly: if a cleanup callback raises, the exception is chained with any existing exception, and remaining cleanups still run.
Expected Output
Opened file: data.csv\nStarted transaction\nWork done\nRolled back transaction\nClosed file: data.csvHints
Hint 1: Use AsyncExitStack and register cleanup callbacks with stack.push_async_callback().
Hint 2: The defer pattern: register cleanup immediately after acquiring each resource so cleanup order matches acquisition order reversed.
Implement a retry(max_attempts, backoff_base) context manager that automatically retries the block on failure with exponential backoff.
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def retry(max_attempts=3, backoff_base=0.01):
# attempt the block up to max_attempts times with exponential backoff
pass
async def main():
attempt_counter = [0]
async with retry(max_attempts=3, backoff_base=0.001):
attempt_counter[0] += 1
if attempt_counter[0] < 3:
print(f"Attempt {attempt_counter[0]} failed: transient error")
raise ConnectionError("transient error")
print(f"Attempt {attempt_counter[0]} succeeded")
asyncio.run(main())
Solution
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def retry(max_attempts=3, backoff_base=0.01):
last_exc = None
for attempt in range(1, max_attempts + 1):
try:
yield
return # success — exit the generator
except Exception as e:
last_exc = e
if attempt < max_attempts:
await asyncio.sleep(backoff_base * (2 ** (attempt - 1)))
raise last_exc
async def main():
attempt_counter = [0]
async with retry(max_attempts=3, backoff_base=0.001):
attempt_counter[0] += 1
if attempt_counter[0] < 3:
print(f"Attempt {attempt_counter[0]} failed: transient error")
raise ConnectionError("transient error")
print(f"Attempt {attempt_counter[0]} succeeded")
asyncio.run(main())
Retry context manager design:
- Each iteration of the loop
yields once. If the block succeeds,returnexits the generator cleanly. - If the block raises, the exception is caught, backoff sleep runs, and the next iteration yields again.
- After
max_attemptsfailures, the last exception is re-raised. - Exponential backoff: attempt 1 sleeps
base, attempt 2 sleeps2*base, attempt 3 sleeps4*base. - Production extension: add a
retryable_exceptionstuple to only retry specific error types; add jitter (random.uniform(0, 1) * backoff) to prevent thundering herd on a shared service.
Expected Output
Attempt 1 failed: transient error\nAttempt 2 failed: transient error\nAttempt 3 succeededHints
Hint 1: Use a loop inside the async generator. After each failed attempt, sleep for backoff_base * 2^attempt seconds.
Hint 2: The consumer re-enters the context manager on each retry iteration.
Build an observe(operation_name) context manager that logs structured span data: name, duration, outcome, and any exception details.
import asyncio
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class Span:
name: str
start: float = field(default_factory=time.monotonic)
duration: float = 0.0
success: bool = True
error: Optional[str] = None
tags: dict = field(default_factory=dict)
@asynccontextmanager
async def observe(operation_name):
# create Span, yield it, record duration and outcome on exit
pass
async def main():
async with observe("fetch_user") as span:
span.tags["user_id"] = 42
await asyncio.sleep(0.01)
try:
async with observe("update_order") as span:
span.tags["order_id"] = 99
raise ValueError("Stock depleted")
except ValueError:
pass
asyncio.run(main())
Solution
import asyncio
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class Span:
name: str
start: float = field(default_factory=time.monotonic)
duration: float = 0.0
success: bool = True
error: Optional[str] = None
tags: dict = field(default_factory=dict)
@asynccontextmanager
async def observe(operation_name):
span = Span(name=operation_name)
try:
yield span
span.success = True
except Exception as e:
span.success = False
span.error = f"{type(e).__name__}: {e}"
raise
finally:
span.duration = time.monotonic() - span.start
status = "OK" if span.success else "ERROR"
print(
f"[SPAN] {span.name} | {status} | {span.duration*1000:.1f}ms"
+ (f" | {span.error}" if span.error else "")
+ (f" | tags={span.tags}" if span.tags else "")
)
async def main():
async with observe("fetch_user") as span:
span.tags["user_id"] = 42
await asyncio.sleep(0.01)
try:
async with observe("update_order") as span:
span.tags["order_id"] = 99
raise ValueError("Stock depleted")
except ValueError:
pass
asyncio.run(main())
Observability pattern design:
- The
Spandataclass is a mutable object yielded to the consumer, allowing them to attach metadata without coupling to the context manager's internal state. try/except/raiseseparates success and failure paths;finallyalways records duration.- This pattern is the foundation of distributed tracing (OpenTelemetry spans work similarly).
- In production, replace
printwith a structured logger that emits JSON to a log aggregator. - The
tagsdict is the equivalent of OpenTelemetry span attributes — arbitrary key-value metadata attached at the call site.
Expected Output
See solution — span logs with timing, success/failure status, and exception detailsHints
Hint 1: Capture start time in setup. In teardown, determine success or failure from exc_type.
Hint 2: Yield a mutable object (like a dict or dataclass) so the consumer can attach metadata.
