Async Context Managers

Before reading any explanation, predict the output of this program:

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def resource(name):
    print(f"{name}: acquire")
    try:
        yield name
    finally:
        print(f"{name}: release")

async def main():
    async with resource("A") as a, resource("B") as b:
        print(f"Using {a} and {b}")
        raise ValueError("boom")

asyncio.run(main())

Think about the order of acquire and release calls, and whether both resources get cleaned up despite the exception.

# Output:
# A: acquire
# B: acquire
# Using A and B
# B: release
# A: release
# Traceback: ValueError: boom

Both resources are released in reverse acquisition order, even though an exception was raised. This is the same LIFO (last-in, first-out) cleanup guarantee you get with synchronous context managers, but extended to coroutines. Getting this right in production async code -- especially with connection pools, database transactions, and HTTP sessions -- is what this lesson is about.

What You Will Learn

  • The __aenter__ / __aexit__ protocol and how async with desugars
  • Writing async context managers as classes and with @asynccontextmanager
  • Resource management patterns for connections, pools, and sessions
  • AsyncExitStack for dynamic, runtime-determined resource stacking
  • Error handling and cleanup guarantees in async contexts
  • Real-world patterns: aiohttp sessions, database connection pools, distributed locks

Prerequisites

  • Synchronous context managers (__enter__ / __exit__, @contextmanager)
  • async def, await, asyncio.run()
  • Async generators (yield inside async def) from Lesson 1
  • Basic familiarity with aiohttp or asyncpg (conceptual level)

Part 1 -- The Async Context Manager Protocol

An async context manager is any object that implements __aenter__ and __aexit__ as coroutines:

import asyncio

class AsyncTimer:
    """Measure wall-clock time of an async block."""

    def __init__(self, label: str):
        self.label = label
        self.elapsed: float = 0.0

    async def __aenter__(self):
        # get_running_loop() is the recommended call inside a coroutine
        self._start = asyncio.get_running_loop().time()
        print(f"[{self.label}] Starting")
        return self  # The value bound by `as`

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.elapsed = asyncio.get_running_loop().time() - self._start
        print(f"[{self.label}] Elapsed: {self.elapsed:.3f}s")
        # Return False (or None) to propagate exceptions
        # Return True to suppress the exception
        return False


async def main():
    async with AsyncTimer("fetch") as timer:
        await asyncio.sleep(0.5)
    print(f"Recorded: {timer.elapsed:.3f}s")

asyncio.run(main())
# Output (timings are approximate):
# [fetch] Starting
# [fetch] Elapsed: 0.500s
# Recorded: 0.500s

Desugaring async with

The async with statement expands to:

# async with expr as var:
#     body

# Desugars to:
manager = expr
var = await manager.__aenter__()
try:
    body
except BaseException as exc:
    if not await manager.__aexit__(type(exc), exc, exc.__traceback__):
        raise
else:
    await manager.__aexit__(None, None, None)

Danger: __aexit__ is always awaited, even during exception handling. If __aexit__ itself raises, the new exception propagates instead of the original, which then survives only as the new exception's __context__. Wrap any cleanup logic that can fail in try/except inside __aexit__.
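
A minimal sketch of that replacement behavior. FailingCleanup and GuardedCleanup are hypothetical names made up for this illustration, and the RuntimeError stands in for a failing close() call:

```python
import asyncio


class FailingCleanup:
    """Hypothetical manager whose cleanup always fails."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        raise RuntimeError("cleanup failed")


class GuardedCleanup:
    """Same failing cleanup, but contained inside __aexit__."""

    def __init__(self):
        self.cleanup_error = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            raise RuntimeError("cleanup failed")  # stands in for a failing close()
        except Exception as err:
            self.cleanup_error = err  # record (or log) instead of raising
        return False  # let the body's exception propagate unchanged


async def run(manager):
    """Return whichever exception escapes the async with block."""
    try:
        async with manager:
            raise ValueError("body failed")
    except Exception as exc:
        return exc


unguarded = asyncio.run(run(FailingCleanup()))
guarded = asyncio.run(run(GuardedCleanup()))

print(type(unguarded).__name__, "| context:", type(unguarded.__context__).__name__)
# RuntimeError | context: ValueError
print(type(guarded).__name__)
# ValueError
```

With the guard in place the caller sees the body's ValueError, and the cleanup failure is still available on the manager for logging.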

Exception Suppression

Returning True from __aexit__ suppresses the exception:

class SuppressErrors:
    """Suppress specific exception types in async blocks."""

    def __init__(self, *exceptions):
        self.exceptions = exceptions
        self.suppressed = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None and issubclass(exc_type, self.exceptions):
            self.suppressed = exc_val
            return True  # Suppress
        return False  # Propagate


async def main():
    ctx = SuppressErrors(ConnectionError, TimeoutError)
    async with ctx:
        raise ConnectionError("server down")

    print(f"Suppressed: {ctx.suppressed}")
    # Suppressed: server down

asyncio.run(main())

Part 2 -- The asynccontextmanager Decorator

Writing a class for every context manager is verbose. The contextlib.asynccontextmanager decorator converts an async generator function into a context manager:

from contextlib import asynccontextmanager

@asynccontextmanager
async def database_transaction(connection):
    """Manage a database transaction with auto-commit/rollback."""
    tx = await connection.begin()
    try:
        yield tx
        await tx.commit()
    except Exception:
        await tx.rollback()
        raise

The function must yield exactly once. Everything before yield is __aenter__. Everything after yield (including the finally or except blocks) is __aexit__. The yielded value becomes the as target.
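
To make the split around yield visible, here is a small trace. The `traced` function and the `events` list are hypothetical names used only for this sketch:

```python
import asyncio
from contextlib import asynccontextmanager

events = []


@asynccontextmanager
async def traced(name):
    events.append("setup")         # runs inside __aenter__
    try:
        yield name.upper()         # value bound by `as`
    finally:
        events.append("teardown")  # runs inside __aexit__


async def main():
    cm = traced("db")
    events.append("created")       # calling the function has not run its body yet
    async with cm as value:
        events.append(f"body:{value}")


asyncio.run(main())
print(events)
# ['created', 'setup', 'body:DB', 'teardown']
```

Note that "created" is logged before "setup": the generator body only starts running when async with awaits __aenter__.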

Multiple Yield Prevention

If your generator yields more than once, asynccontextmanager raises RuntimeError:

@asynccontextmanager
async def broken():
    yield 1
    yield 2  # RuntimeError: generator didn't stop

async def main():
    async with broken() as v:
        print(v)

# asyncio.run(main())
# RuntimeError: generator didn't stop

Reusable vs. Single-Use

Each call to the decorated function creates a new context manager. The context manager itself is single-use:

@asynccontextmanager
async def temp_resource():
    print("setup")
    yield "resource"
    print("teardown")

async def main():
    # Each call creates a new context manager -- this is fine
    async with temp_resource() as r1:
        print(r1)
    async with temp_resource() as r2:
        print(r2)

    # But reusing the same instance would fail
    # cm = temp_resource()
    # async with cm as r:
    #     pass
    # async with cm as r:  # Raises: the generator is exhausted and cannot be re-entered
    #     pass

asyncio.run(main())
# Output:
# setup
# resource
# teardown
# setup
# resource
# teardown

Exception Handling in asynccontextmanager

The exception from the body is thrown into the generator at the yield point. You must handle it correctly:

@asynccontextmanager
async def safe_resource():
    resource = await acquire()
    try:
        yield resource
    except Exception:
        await resource.rollback()
        raise  # Re-raise so the caller sees the error
    else:
        await resource.commit()
    finally:
        await resource.close()  # Always runs

Tip: The pattern try/except/else/finally around yield gives you full control: except for error recovery, else for success-only logic, finally for unconditional cleanup.
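
The two paths through that pattern can be checked with a stand-in resource. RecordingResource and managed are hypothetical names for this sketch; the resource only records which lifecycle methods were called:

```python
import asyncio
from contextlib import asynccontextmanager


class RecordingResource:
    """Hypothetical resource that records which lifecycle methods ran."""

    def __init__(self):
        self.calls = []

    async def commit(self):
        self.calls.append("commit")

    async def rollback(self):
        self.calls.append("rollback")

    async def close(self):
        self.calls.append("close")


@asynccontextmanager
async def managed(resource):
    try:
        yield resource
    except Exception:
        await resource.rollback()  # error path only
        raise
    else:
        await resource.commit()    # success path only
    finally:
        await resource.close()     # both paths


async def main():
    ok, failed = RecordingResource(), RecordingResource()
    async with managed(ok):
        pass
    try:
        async with managed(failed):
            raise ValueError("boom")
    except ValueError:
        pass
    return ok.calls, failed.calls


ok_calls, failed_calls = asyncio.run(main())
print(ok_calls)      # ['commit', 'close']
print(failed_calls)  # ['rollback', 'close']
```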

Part 3 -- Resource Management Patterns

Pattern 1: Connection Pool

A connection pool is the canonical use case for async context managers. The pool itself is a context manager (setup/teardown), and acquiring a connection from the pool is also a context manager:

import asyncio
from collections import deque
from contextlib import asynccontextmanager


class AsyncConnectionPool:
    """A simple async connection pool demonstrating the pattern."""

    def __init__(self, dsn: str, min_size: int = 2, max_size: int = 10):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self._pool: deque = deque()
        self._size = 0
        self._semaphore: asyncio.Semaphore | None = None

    async def __aenter__(self):
        """Initialize the pool with min_size connections."""
        self._semaphore = asyncio.Semaphore(self.max_size)
        for _ in range(self.min_size):
            conn = await self._create_connection()
            self._pool.append(conn)
        print(f"Pool initialized with {self.min_size} connections")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close all connections in the pool."""
        while self._pool:
            conn = self._pool.pop()
            await self._close_connection(conn)
        print("Pool closed")
        return False

    @asynccontextmanager
    async def acquire(self):
        """Acquire a connection from the pool."""
        await self._semaphore.acquire()
        try:
            if self._pool:
                conn = self._pool.popleft()
            else:
                conn = await self._create_connection()
            try:
                yield conn
            finally:
                # Return connection to pool instead of closing
                self._pool.append(conn)
        finally:
            self._semaphore.release()

    async def _create_connection(self):
        self._size += 1
        conn_id = self._size
        await asyncio.sleep(0.01)  # Simulate connection setup
        print(f" Created connection #{conn_id}")
        return {"id": conn_id, "dsn": self.dsn}

    async def _close_connection(self, conn):
        await asyncio.sleep(0.01)  # Simulate connection teardown
        print(f" Closed connection #{conn['id']}")


async def worker(pool, worker_id):
    async with pool.acquire() as conn:
        print(f"Worker {worker_id} using connection #{conn['id']}")
        await asyncio.sleep(0.05)  # Simulate query


async def main():
    async with AsyncConnectionPool("postgresql://localhost/db",
                                   min_size=2, max_size=3) as pool:
        # Run 5 workers with max 3 concurrent connections
        tasks = [worker(pool, i) for i in range(5)]
        await asyncio.gather(*tasks)

asyncio.run(main())
# Output (order may vary):
# Created connection #1
# Created connection #2
# Pool initialized with 2 connections
# Worker 0 using connection #1
# Worker 1 using connection #2
# Created connection #3
# Worker 2 using connection #3
# Worker 3 using connection #1
# Worker 4 using connection #2
# Closed connection #2
# Closed connection #1
# Closed connection #3
# Pool closed

Pattern 2: Scoped Temporary Directory

import shutil
import tempfile
from pathlib import Path

@asynccontextmanager
async def async_temp_directory(prefix: str = "async_"):
    """Create a temporary directory that is cleaned up on exit."""
    path = Path(tempfile.mkdtemp(prefix=prefix))
    try:
        yield path
    finally:
        # Run blocking cleanup in executor to avoid blocking event loop
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, shutil.rmtree, str(path))


async def main():
    async with async_temp_directory("work_") as tmpdir:
        # Create files in the temporary directory
        data_file = tmpdir / "data.json"
        data_file.write_text('{"key": "value"}')
        print(f"Working in: {tmpdir}")
        print(f"File exists: {data_file.exists()}")
    # Directory is cleaned up here
    print(f"Directory exists after exit: {tmpdir.exists()}")

asyncio.run(main())
# Output:
# Working in: /tmp/work_xxxxx
# File exists: True
# Directory exists after exit: False

Pattern 3: Timeout Context Manager

@asynccontextmanager
async def timeout_context(seconds: float, message: str = "Operation timed out"):
    """Raise TimeoutError if the block takes longer than `seconds`."""
    task = asyncio.current_task()
    loop = asyncio.get_running_loop()
    timed_out = False

    def _cancel():
        nonlocal timed_out
        timed_out = True
        task.cancel(msg=message)

    handle = loop.call_later(seconds, _cancel)
    try:
        yield
    except asyncio.CancelledError as e:
        if not timed_out:
            raise  # Cancelled from outside: do not disguise it as a timeout
        raise TimeoutError(message) from e
    finally:
        handle.cancel()  # Cancel the timer if we finished in time


async def main():
    try:
        async with timeout_context(1.0, "API call timed out"):
            await asyncio.sleep(5.0)  # Will be cancelled after 1 second
    except TimeoutError as e:
        print(f"Caught: {e}")

asyncio.run(main())
# Output: Caught: API call timed out

Note: Python 3.11+ provides asyncio.timeout() and asyncio.timeout_at() as built-in async context managers. Prefer those in new code. The example above shows the underlying mechanism.

Part 4 -- AsyncExitStack

When you do not know at coding time how many resources you need, you cannot write a fixed set of async with statements. AsyncExitStack manages a dynamic collection of context managers:

from contextlib import AsyncExitStack, asynccontextmanager
import asyncio


@asynccontextmanager
async def open_connection(host: str):
    print(f"Connecting to {host}")
    yield {"host": host, "status": "connected"}
    print(f"Disconnecting from {host}")


async def main():
    hosts = ["db-primary", "db-replica-1", "db-replica-2", "cache-01"]

    async with AsyncExitStack() as stack:
        # Dynamically enter context managers
        connections = []
        for host in hosts:
            conn = await stack.enter_async_context(open_connection(host))
            connections.append(conn)

        print(f"Connected to {len(connections)} services")
        for c in connections:
            print(f" {c['host']}: {c['status']}")

    # All connections are closed in reverse order when the stack exits

asyncio.run(main())
# Output:
# Connecting to db-primary
# Connecting to db-replica-1
# Connecting to db-replica-2
# Connecting to cache-01
# Connected to 4 services
# db-primary: connected
# db-replica-1: connected
# db-replica-2: connected
# cache-01: connected
# Disconnecting from cache-01
# Disconnecting from db-replica-2
# Disconnecting from db-replica-1
# Disconnecting from db-primary

Registering Callbacks

AsyncExitStack can register arbitrary async and sync cleanup callbacks:

async def cleanup_temp_files(path):
    print(f"Cleaning up {path}")
    await asyncio.sleep(0.01)


async def main():
    async with AsyncExitStack() as stack:
        # Register an async cleanup callback
        stack.push_async_callback(cleanup_temp_files, "/tmp/session-123")

        # Register a sync cleanup callback
        stack.callback(print, "Sync cleanup done")

        # Enter a context manager
        conn = await stack.enter_async_context(open_connection("db-primary"))

        print("Working...")

asyncio.run(main())
# Output:
# Connecting to db-primary
# Working...
# Disconnecting from db-primary
# Sync cleanup done
# Cleaning up /tmp/session-123

Cleanup order is LIFO: the last registered item (the connection) is cleaned up first, then the sync callback, then the async callback.

Transferring Ownership with pop_all

AsyncExitStack supports transferring resource ownership with pop_all(). This is essential when a factory function creates resources and hands them to the caller:

import sys

async def create_service_connections():
    """Create connections and transfer ownership to the caller."""
    stack = AsyncExitStack()
    try:
        await stack.__aenter__()
        db = await stack.enter_async_context(open_connection("db"))
        cache = await stack.enter_async_context(open_connection("cache"))
        # Transfer ownership -- caller is now responsible for cleanup
        new_stack = stack.pop_all()
        return new_stack, db, cache
    except BaseException:
        # If anything fails (including cancellation), clean up what we created
        await stack.__aexit__(*sys.exc_info())
        raise


async def main():
    stack, db, cache = await create_service_connections()
    try:
        print(f"Using {db['host']} and {cache['host']}")
    finally:
        await stack.aclose()

asyncio.run(main())
# Output:
# Connecting to db
# Connecting to cache
# Using db and cache
# Disconnecting from cache
# Disconnecting from db

The pop_all() call moves all registered context managers and callbacks from the current stack to a new stack. The original stack becomes empty. If the function raises before pop_all(), the except block cleans up via the original stack.
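
The failure path is easy to get wrong, so here is a sketch that exercises both outcomes. It uses an alternative shape for the factory, an async with block around the stack, so the cleanup on failure is implicit. The `connect` helper and its `fail` flag are hypothetical stand-ins:

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events = []


@asynccontextmanager
async def connect(host, fail=False):
    # Hypothetical stand-in for a real connection factory
    if fail:
        raise ConnectionError(f"cannot reach {host}")
    events.append(f"open {host}")
    try:
        yield host
    finally:
        events.append(f"close {host}")


async def create_connections(fail_cache):
    async with AsyncExitStack() as stack:
        db = await stack.enter_async_context(connect("db"))
        cache = await stack.enter_async_context(connect("cache", fail=fail_cache))
        # Success: move everything onto a new stack owned by the caller.
        # The original stack is now empty, so leaving this block closes nothing.
        return stack.pop_all(), db, cache


async def main():
    try:
        await create_connections(fail_cache=True)
    except ConnectionError:
        events.append("factory failed")  # "db" was already closed by the stack

    stack, db, cache = await create_connections(fail_cache=False)
    events.append("caller owns resources")
    await stack.aclose()


asyncio.run(main())
print(events)
# ['open db', 'close db', 'factory failed',
#  'open db', 'open cache', 'caller owns resources', 'close cache', 'close db']
```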

Part 5 -- Real-World Patterns

aiohttp Session Management

import asyncio

import aiohttp
from contextlib import asynccontextmanager


class APIClient:
    """HTTP client with proper session lifecycle management."""

    def __init__(self, base_url: str, timeout: float = 30.0):
        self.base_url = base_url
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self._session: aiohttp.ClientSession | None = None

    async def __aenter__(self):
        self._session = aiohttp.ClientSession(
            base_url=self.base_url,
            timeout=self.timeout,
            headers={"User-Agent": "EngineersOfAI/1.0"},
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()
            # Allow underlying SSL connections to close gracefully
            await asyncio.sleep(0.25)
        return False

    def _check_session(self):
        if not self._session:
            raise RuntimeError("Client not initialized. Use 'async with'.")

    async def get(self, path: str) -> dict:
        self._check_session()
        async with self._session.get(path) as response:
            response.raise_for_status()
            return await response.json()

    async def post(self, path: str, data: dict) -> dict:
        self._check_session()
        async with self._session.post(path, json=data) as response:
            response.raise_for_status()
            return await response.json()


# Usage
async def fetch_users():
    async with APIClient("https://api.example.com") as client:
        users = await client.get("/users")
        for user in users:
            details = await client.get(f"/users/{user['id']}")
            print(details["name"])

Tip: aiohttp.ClientSession should be created once and reused across requests within a scope. Creating a new session per request defeats connection pooling. Wrapping it in an async context manager at the application level ensures a single session with proper cleanup.

Database Connection with Transaction Management

@asynccontextmanager
async def managed_transaction(pool):
    """Provide a transactional scope with auto-commit/rollback."""
    conn = await pool.acquire()
    try:
        # Start the transaction inside the try so a failure here
        # still returns the connection to the pool
        tx = conn.transaction()
        await tx.start()
        try:
            yield conn
        except BaseException:
            # BaseException also covers asyncio.CancelledError
            await tx.rollback()
            raise
        else:
            await tx.commit()
    finally:
        await pool.release(conn)


async def transfer_funds(pool, from_id: int, to_id: int, amount: float):
    async with managed_transaction(pool) as conn:
        balance = await conn.fetchval(
            "SELECT balance FROM accounts WHERE id = $1 FOR UPDATE",
            from_id
        )
        if balance < amount:
            raise ValueError(f"Insufficient funds: {balance} < {amount}")

        await conn.execute(
            "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
            amount, from_id
        )
        await conn.execute(
            "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
            amount, to_id
        )
    # Transaction commits automatically when the block exits without error

Distributed Lock with Redis

@asynccontextmanager
async def distributed_lock(redis, key: str, timeout: int = 30):
    """Acquire a distributed lock using Redis with auto-release."""
    import uuid
    lock_value = str(uuid.uuid4())
    lock_key = f"lock:{key}"

    # Try to acquire with NX (set if not exists) and EX (expiry)
    acquired = await redis.set(lock_key, lock_value, nx=True, ex=timeout)
    if not acquired:
        raise RuntimeError(f"Could not acquire lock: {key}")

    try:
        yield lock_key
    finally:
        # Release only if we still own the lock (atomic compare-and-delete)
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        await redis.eval(lua_script, 1, lock_key, lock_value)


async def process_payment(redis, payment_id: str):
    async with distributed_lock(redis, f"payment:{payment_id}", timeout=60):
        # Only one worker can process this payment at a time
        await execute_payment(payment_id)

Part 6 -- Application Lifecycle Pattern

In production applications, async context managers compose into an application lifecycle:

from contextlib import AsyncExitStack, asynccontextmanager

@asynccontextmanager
async def application_lifespan(config: dict):
    """Manage the full lifecycle of application dependencies."""
    async with AsyncExitStack() as stack:
        # Database pool
        db_pool = await stack.enter_async_context(
            create_db_pool(
                config["database_url"],
                min_size=5,
                max_size=20
            )
        )

        # Redis connection
        redis = await stack.enter_async_context(
            create_redis_pool(config["redis_url"])
        )

        # HTTP client for external APIs
        http_client = await stack.enter_async_context(
            APIClient(config["external_api_url"])
        )

        # Register a cleanup callback for temporary files
        stack.push_async_callback(cleanup_session_files)

        app_state = {
            "db": db_pool,
            "redis": redis,
            "http": http_client,
        }

        yield app_state
    # Everything cleaned up in reverse order by AsyncExitStack

This pattern is used by frameworks like FastAPI (via the lifespan parameter) and Starlette.
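
A runnable miniature of the same idea, with stand-in services instead of real pools. The names `fake_service`, `lifespan`, and the `use_cache` config key are assumptions made for this sketch. It also shows a dependency that is only started when the configuration asks for it:

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

log = []


@asynccontextmanager
async def fake_service(name):
    # Hypothetical stand-in for a pool or client
    log.append(f"start {name}")
    try:
        yield name
    finally:
        log.append(f"stop {name}")


@asynccontextmanager
async def lifespan(config):
    async with AsyncExitStack() as stack:
        state = {"db": await stack.enter_async_context(fake_service("db"))}
        if config.get("use_cache"):  # optional dependency decided at runtime
            state["cache"] = await stack.enter_async_context(fake_service("cache"))
        yield state
    # Leaving the stack stops services in reverse start order


async def main():
    async with lifespan({"use_cache": True}) as state:
        log.append(f"serving with {sorted(state)}")


asyncio.run(main())
print(log)
# ['start db', 'start cache', "serving with ['cache', 'db']", 'stop cache', 'stop db']
```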

Part 7 -- Common Mistakes and Debugging

Mistake 1: Forgetting to await in __aexit__

class BrokenManager:
    async def __aenter__(self):
        self.conn = await create_connection()
        return self.conn

    async def __aexit__(self, *exc):
        self.conn.close()  # BUG: close() is a coroutine!
        # Creates an unawaited coroutine object
        # Python warns: RuntimeWarning: coroutine 'close' was never awaited
        return False

Fix: await self.conn.close().

Mistake 2: Using sync context managers for async resources

# BAD: blocks the event loop during file I/O
async def bad_read():
    with open("large_file.csv") as f:
        data = f.read()  # Blocks!
    return data

# GOOD: use aiofiles for async file I/O
import aiofiles

async def good_read():
    async with aiofiles.open("large_file.csv") as f:
        data = await f.read()
    return data

Mistake 3: Losing the original exception in cleanup

@asynccontextmanager
async def lossy_resource():
    resource = await acquire()
    try:
        yield resource
    finally:
        await resource.close()  # If this raises, the original exception is LOST

# BETTER: protect cleanup from its own failures
@asynccontextmanager
async def safe_resource():
    resource = await acquire()
    try:
        yield resource
    finally:
        try:
            await resource.close()
        except Exception:
            import logging
            logging.exception("Failed to close resource")

Mistake 4: Not handling cancellation

@asynccontextmanager
async def cancellation_safe():
    resource = await acquire()
    try:
        yield resource
    except asyncio.CancelledError:
        # Cleanup must still happen on cancellation
        await resource.close()
        raise  # Always re-raise CancelledError
    except Exception:
        await resource.rollback()
        raise
    else:
        await resource.commit()
    finally:
        await resource.release()

Danger: Never swallow asyncio.CancelledError. It is the mechanism by which tasks are cancelled. Catching it for cleanup is fine, but always re-raise it.
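
A small sketch of a task being cancelled while it sits inside a context manager. The `tracked` manager and the `events` list are hypothetical names for this illustration:

```python
import asyncio
from contextlib import asynccontextmanager

events = []


@asynccontextmanager
async def tracked():
    events.append("acquire")
    try:
        yield
    except asyncio.CancelledError:
        events.append("cancelled: cleaning up")
        raise                     # re-raise so the task really ends as cancelled
    finally:
        events.append("release")  # runs on every exit path


async def worker():
    async with tracked():
        await asyncio.sleep(10)


async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)        # let the worker enter the context manager
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("task cancelled")
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(events)
# ['acquire', 'cancelled: cleaning up', 'release', 'task cancelled']
print(was_cancelled)  # True
```

If the except branch returned instead of re-raising, the task would finish normally and task.cancelled() would be False, which is exactly the failure mode the warning describes.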

Key Takeaways

  • Async context managers implement __aenter__() and __aexit__() as coroutines. Both are awaited by the async with statement.
  • @asynccontextmanager from contextlib converts an async generator (exactly one yield) into a context manager -- the preferred approach for simple cases.
  • Resources are cleaned up in LIFO order. __aexit__ runs even when exceptions occur. This is the fundamental guarantee.
  • Return True from __aexit__ to suppress an exception. Return False (or None) to propagate it. Be explicit.
  • AsyncExitStack manages dynamic numbers of context managers, supports callbacks, and enables ownership transfer with pop_all().
  • Always use try/finally around yield in @asynccontextmanager to ensure cleanup runs on both normal exit and exceptions.
  • Never perform blocking I/O in a coroutine, including inside an async with block. Use async-compatible libraries (aiofiles, aiohttp, asyncpg).
  • Protect cleanup code from its own exceptions to avoid losing the original error.
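
On the blocking I/O point: when no async-compatible library is available, one standard-library option is asyncio.to_thread (Python 3.9+), which runs a blocking function in a worker thread. A minimal sketch, in which `read_text_blocking` and `heartbeat` are hypothetical helpers:

```python
import asyncio
import tempfile
from pathlib import Path


def read_text_blocking(path: Path) -> str:
    """Ordinary blocking file read."""
    return path.read_text()


async def heartbeat(ticks: list):
    # Makes progress only while the event loop is free to run it
    for i in range(3):
        ticks.append(i)
        await asyncio.sleep(0)


async def main():
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "data.txt"
        path.write_text("hello")  # tiny setup write, kept synchronous for brevity
        ticks = []
        content, _ = await asyncio.gather(
            asyncio.to_thread(read_text_blocking, path),  # runs in a worker thread
            heartbeat(ticks),
        )
        return content, ticks


content, ticks = asyncio.run(main())
print(content, ticks)  # hello [0, 1, 2]
```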

Graded Practice Challenges

Level 1 -- Predict the Output

Question 1: What does this print?

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def ctx(name):
    print(f"{name} enter")
    yield
    print(f"{name} exit")

async def main():
    async with ctx("outer"):
        async with ctx("inner"):
            print("body")

asyncio.run(main())
Answer
outer enter
inner enter
body
inner exit
outer exit

Context managers nest in LIFO order. Inner enters after outer, and exits before outer.

Question 2: What happens here?

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def faulty():
    yield "value"
    raise RuntimeError("cleanup failed")

async def main():
    try:
        async with faulty() as v:
            print(v)
            raise ValueError("body failed")
    except Exception as e:
        print(f"Caught: {type(e).__name__}: {e}")

asyncio.run(main())
Answer
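value
Caught: ValueError: body failed

The ValueError from the body is thrown into the generator at the yield point. Nothing there catches it, so it propagates straight back out of the generator: the line after yield never runs, and the RuntimeError is never raised. The caller sees the original ValueError, but the cleanup code was skipped entirely. This is why you should always use try/finally around yield in @asynccontextmanager generators. Note the flip side: if that raise sat inside a finally block, it would run, and the RuntimeError would replace the ValueError (which would remain reachable only as __context__).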

Question 3: What does this print?

import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

@asynccontextmanager
async def numbered(n):
    print(f"enter {n}")
    yield n
    print(f"exit {n}")

async def main():
    async with AsyncExitStack() as stack:
        values = []
        for i in range(3):
            v = await stack.enter_async_context(numbered(i))
            values.append(v)
        print(f"values: {values}")

asyncio.run(main())
Answer
enter 0
enter 1
enter 2
values: [0, 1, 2]
exit 2
exit 1
exit 0

AsyncExitStack enters context managers in registration order and exits them in reverse (LIFO).

Level 2 -- Debug Challenge

This code manages a database connection pool and runs queries, but connections leak when queries fail. Find and fix the bug.

import asyncio

class ConnectionPool:
    def __init__(self, size):
        self.size = size
        self.connections = []
        self.available = asyncio.Queue()

    async def __aenter__(self):
        for i in range(self.size):
            conn = {"id": i, "in_use": False}
            self.connections.append(conn)
            await self.available.put(conn)
        return self

    async def __aexit__(self, *exc):
        for conn in self.connections:
            print(f"Closing connection {conn['id']}")
        return False

    async def acquire(self):
        conn = await self.available.get()
        conn["in_use"] = True
        return conn

    async def release(self, conn):
        conn["in_use"] = False
        await self.available.put(conn)

async def run_query(pool, query_id):
    conn = await pool.acquire()
    print(f"Query {query_id} on connection {conn['id']}")
    await asyncio.sleep(0.1)
    if query_id == 2:
        raise ValueError("Query failed!")
    await pool.release(conn)

async def main():
    async with ConnectionPool(2) as pool:
        tasks = [run_query(pool, i) for i in range(5)]
        await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main())
Answer

When query_id == 2 raises ValueError, pool.release(conn) never executes. The connection is never returned to the queue, so the pool permanently loses capacity; once every connection has leaked, any further acquire() waits forever.

Fix: wrap the query in try/finally:

async def run_query(pool, query_id):
    conn = await pool.acquire()
    try:
        print(f"Query {query_id} on connection {conn['id']}")
        await asyncio.sleep(0.1)
        if query_id == 2:
            raise ValueError("Query failed!")
    finally:
        await pool.release(conn)

Better yet, make acquire() return an async context manager:

from contextlib import asynccontextmanager

class ConnectionPool:
    # __init__, __aenter__ and __aexit__ as before; this acquire()
    # replaces the separate acquire() and release() methods

    @asynccontextmanager
    async def acquire(self):
        conn = await self.available.get()
        conn["in_use"] = True
        try:
            yield conn
        finally:
            conn["in_use"] = False
            await self.available.put(conn)

# Usage:
async def run_query(pool, query_id):
    async with pool.acquire() as conn:
        print(f"Query {query_id} on connection {conn['id']}")
        await asyncio.sleep(0.1)
        if query_id == 2:
            raise ValueError("Query failed!")
    # Connection automatically returned to pool

Level 3 -- Design Challenge

Design an AsyncResourceGroup class that manages a collection of named async resources. Requirements:

  1. Resources are registered by name with a factory coroutine: group.register("db", create_db_pool)
  2. Resources are created lazily -- only when first accessed via await group.get("db")
  3. All created resources are cleaned up on exit (they are async context managers themselves)
  4. If any resource fails during cleanup, all remaining resources are still cleaned up, and all errors are collected
  5. The group itself is an async context manager
Design Hints

Use AsyncExitStack internally for LIFO cleanup:

from contextlib import AsyncExitStack
from typing import Any, Callable

class AsyncResourceGroup:
    def __init__(self):
        self._factories: dict[str, Callable] = {}
        self._resources: dict[str, Any] = {}
        self._stack: AsyncExitStack | None = None

    def register(self, name: str, factory: Callable):
        """Register a factory. factory() must return an async context manager."""
        self._factories[name] = factory

    async def __aenter__(self):
        self._stack = AsyncExitStack()
        await self._stack.__aenter__()
        return self

    async def get(self, name: str) -> Any:
        """Lazily create and return a named resource."""
        if name in self._resources:
            return self._resources[name]
        if name not in self._factories:
            raise KeyError(f"Unknown resource: {name}")
        resource = await self._stack.enter_async_context(
            self._factories[name]()
        )
        self._resources[name] = resource
        return resource

    async def __aexit__(self, *exc):
        return await self._stack.__aexit__(*exc)

Key design decisions:

  • AsyncExitStack handles LIFO cleanup and keeps unwinding after a cleanup failure; the errors are chained through __context__ rather than gathered into a list
  • Lazy creation means unused resources never allocate
  • get() is async because it may need to create (and __aenter__) the resource
  • __getitem__ cannot be async, so use an explicit get() method
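
Requirement 4 asks that all cleanup errors be collected, so it is worth checking what AsyncExitStack does when several cleanups fail. The sketch below uses a hypothetical `flaky` context manager whose cleanup can be told to raise:

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

closed = []


@asynccontextmanager
async def flaky(name, fail):
    # Hypothetical resource whose cleanup optionally raises
    try:
        yield name
    finally:
        closed.append(name)
        if fail:
            raise RuntimeError(f"{name} cleanup failed")


async def main():
    try:
        async with AsyncExitStack() as stack:
            await stack.enter_async_context(flaky("a", fail=True))
            await stack.enter_async_context(flaky("b", fail=False))
            await stack.enter_async_context(flaky("c", fail=True))
    except RuntimeError as exc:
        return exc


exc = asyncio.run(main())
print(closed)                    # ['c', 'b', 'a']
print(exc)                       # a cleanup failed
print(repr(exc.__context__))     # RuntimeError('c cleanup failed')
```

Every resource is still closed, but only the last failure is raised, with the earlier one hanging off __context__. A design that must report all errors as a list would need to walk that chain or run its own cleanup loop.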

What's Next

You now understand how to manage async resource lifecycles reliably. The next lesson, Structured Concurrency with TaskGroup, addresses the problem of running multiple concurrent tasks with proper error handling and cancellation -- building on the cleanup guarantees you learned here.

© 2026 EngineersOfAI. All rights reserved.