Skip to main content

Context Managers - The with Statement and Resource Management

Reading time: ~17 minutes | Level: Foundation → Engineering

Here is something most Python developers miss about context managers:

class SuppressAndLog:
def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
print(f"Suppressed: {exc_type.__name__}: {exc_val}")
return True # This return value changes everything

with SuppressAndLog():
x = 1 / 0 # ZeroDivisionError

print("Program continues!") # This line executes!

# Output:
# Suppressed: ZeroDivisionError: division by zero
# Program continues!

Returning True from __exit__ suppresses the exception. The with block raises, __exit__ swallows it, and execution continues normally after the with statement.

Most developers think of context managers as "only for files." In reality, they are a general protocol for guaranteed cleanup, exception handling, and resource lifecycle management - one of the most powerful patterns in Python.

What You Will Learn

  • The __enter__ / __exit__ protocol - exactly what Python calls and when
  • The with statement desugared: the real sequence of operations step by step
  • The full __exit__ signature and the critical meaning of its return value
  • Writing your own context manager class with a real database connection example
  • The @contextmanager decorator: generator-based context managers with yield
  • Five key tools from contextlib: suppress, ExitStack, nullcontext, redirect_stdout, closing
  • Nested with statements and the combined with A() as a, B() as b: syntax
  • Exception handling strategies in __exit__: re-raise, suppress, or replace
  • Real-world patterns: database transactions, lock acquisition, timers, temp directories

Prerequisites

  • Python 3.8+ and understanding of classes and dunder methods
  • Familiarity with try/except/finally blocks
  • Basic understanding of generators and yield (for @contextmanager section)

Mental Model: The Lifecycle of a with Block

The diagram below shows the exact execution order when Python encounters a with statement - from evaluating the expression to the final exception decision in __exit__:

with statement execution flow - showing __enter__, body execution, and __exit__ with exception handling paths

And here is how the __enter__ / __exit__ protocol maps to Python runtime internals - what each call returns, what data flows where:

Context manager protocol memory view - Your Code, Python Runtime, and Context Manager Object columns

Part 1 - The Protocol: __enter__ and __exit__

__enter__ - Setup

__enter__ is called when the with block begins. Its return value is what gets bound to the as variable:

import time


class Timer:
def __enter__(self):
self.start = time.perf_counter()
return self # `as` variable will be this Timer instance

def __exit__(self, exc_type, exc_val, exc_tb):
self.elapsed = time.perf_counter() - self.start
print(f"Elapsed: {self.elapsed:.4f}s")
return False # do not suppress exceptions

with Timer() as t:
result = sum(range(10_000_000))

print(f"Sum: {result}")
print(f"Time: {t.elapsed:.4f}s")
# Elapsed: 0.1823s
# Sum: 49999995000000
# Time: 0.1823s

Key point: __enter__ can return anything. File objects return self. Database connections often return a cursor. Locks return the lock itself. It is common but not required to return self.

__exit__ - Teardown

__exit__ is called when the with block exits - whether by normal completion, return, break, continue, or exception.

class DatabaseTransaction:
def __init__(self, connection):
self.conn = connection

def __enter__(self):
self.conn.begin()
return self.conn.cursor() # return cursor, not self

def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
# No exception - commit the transaction
self.conn.commit()
print("Transaction committed")
else:
# Exception occurred - roll back
self.conn.rollback()
print(f"Transaction rolled back due to: {exc_type.__name__}")

return False # never suppress exceptions - let them propagate

The __exit__ Signature in Full Detail

def __exit__(self, exc_type, exc_val, exc_tb):
# ↑ ↑ ↑
# exception type exception traceback object
# e.g. ValueError instance (for debugging)
# None if no exc None None
pass
class DetailedExit:
def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
print("No exception occurred")
else:
import traceback
print(f"Exception type: {exc_type}")
print(f"Exception value: {exc_val}")
print("Traceback:")
traceback.print_tb(exc_tb)
return False

with DetailedExit():
raise ValueError("something went wrong")

# Exception type: <class 'ValueError'>
# Exception value: something went wrong
# Traceback:
# File "...", line N, in <module>
# raise ValueError("something went wrong")

The Return Value of __exit__

This is the most commonly misunderstood part:

# Return False (or None) - exception propagates normally
def __exit__(self, exc_type, exc_val, exc_tb):
cleanup()
return False # exception continues to propagate

# Return True - exception is suppressed (swallowed)
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is FileNotFoundError:
return True # suppress FileNotFoundError only
return False # propagate everything else
# Selective suppression example
import os


class IgnoreNotFound:
"""Suppress FileNotFoundError, let everything else propagate."""
def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
return exc_type is FileNotFoundError

# Safe delete that does not raise if file is missing
with IgnoreNotFound():
os.remove("might_not_exist.txt")

# Equivalent to:
try:
os.remove("might_not_exist.txt")
except FileNotFoundError:
pass

Part 2 - The with Statement Desugared

Understanding the exact Python translation of with removes all the magic:

# Source:
with EXPR as VAR:
BODY

# Desugared (approximate Python equivalent):
_cm = EXPR # evaluate the expression once
VAR = _cm.__enter__() # call __enter__, bind result
_exc_occurred = False
try:
try:
BODY
except:
_exc_occurred = True
if not _cm.__exit__(*sys.exc_info()):
raise
# if __exit__ returns True, exception is swallowed here
finally:
if not _exc_occurred:
_cm.__exit__(None, None, None)
# Without `as`:
with EXPR:
BODY

# Desugared:
_cm = EXPR
_cm.__enter__() # return value discarded
try:
BODY
except:
if not _cm.__exit__(*sys.exc_info()):
raise
else:
_cm.__exit__(None, None, None)

This desugaring explains several behaviors:

  1. EXPR is evaluated exactly once - you cannot modify it inside the with block
  2. __exit__ is always called if __enter__ succeeded
  3. If __exit__ raises, that exception replaces the original exception

:::warning If __enter__ raises If __enter__() raises an exception, __exit__() is not called. The exception just propagates. This means if your __enter__ acquires multiple resources and the second acquisition fails, you must clean up the first acquisition inside __enter__ itself (in a try/except), because __exit__ will not be called. :::

Part 3 - Writing a Context Manager Class

Let us build a realistic database connection context manager:

import sqlite3


class ManagedConnection:
"""
A context manager for SQLite database connections.
Commits on success, rolls back on exception.
"""

def __init__(self, database_path, isolation_level="DEFERRED"):
self.database_path = database_path
self.isolation_level = isolation_level
self.conn = None

def __enter__(self):
self.conn = sqlite3.connect(self.database_path)
self.conn.isolation_level = self.isolation_level
self.conn.execute("BEGIN")
return self.conn # caller gets the connection object

def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.conn.commit()
else:
self.conn.rollback()

self.conn.close()
self.conn = None

return False # always propagate exceptions after cleanup

def __repr__(self):
status = "open" if self.conn else "closed"
return f"ManagedConnection({self.database_path!r}, status={status})"


# Usage
with ManagedConnection("/tmp/app.db") as conn:
conn.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'Alice')")
conn.execute("INSERT INTO users VALUES (2, 'Bob')")
# commit happens automatically on exit

# Transaction rollback example
try:
with ManagedConnection("/tmp/app.db") as conn:
conn.execute("INSERT INTO users VALUES (3, 'Charlie')")
raise ValueError("Something went wrong!")
conn.execute("INSERT INTO users VALUES (4, 'Dave')")
except ValueError:
print("Transaction was rolled back - Charlie was not saved")

A Lock Context Manager with Timeout

import threading


class TimedLock:
"""
Acquire a lock with a timeout. Raises TimeoutError if lock
cannot be acquired within the given time.
"""

def __init__(self, lock, timeout=5.0):
self.lock = lock
self.timeout = timeout
self._acquired = False

def __enter__(self):
acquired = self.lock.acquire(timeout=self.timeout)
if not acquired:
raise TimeoutError(
f"Could not acquire lock within {self.timeout}s"
)
self._acquired = True
return self.lock

def __exit__(self, exc_type, exc_val, exc_tb):
if self._acquired:
self.lock.release()
self._acquired = False
return False


# Usage
shared_lock = threading.Lock()

def update_shared_state():
pass # placeholder

with TimedLock(shared_lock, timeout=2.0):
# Critical section - lock is held here
update_shared_state()
# Lock is always released here

Part 4 - @contextmanager: Generator-Based Context Managers

The @contextmanager decorator from contextlib lets you write context managers as generator functions. This is often simpler than writing a class:

from contextlib import contextmanager
import time


@contextmanager
def timer(label=""):
"""Context manager that prints elapsed time."""
start = time.perf_counter()
try:
yield # execution transfers to the `with` block body here
finally:
elapsed = time.perf_counter() - start
name = f"[{label}] " if label else ""
print(f"{name}Elapsed: {elapsed:.4f}s")


with timer("sorting"):
data = sorted(range(1_000_000), reverse=True)
# [sorting] Elapsed: 0.0823s

How @contextmanager Works

The generator is paused at yield. When the with block exits (normally or via exception), the generator is resumed from the yield point. If the body raised, the exception is thrown into the generator at the yield expression.

from contextlib import contextmanager
import sqlite3


@contextmanager
def transaction(database_path):
"""Generator-based transaction context manager."""
conn = sqlite3.connect(database_path)
try:
conn.execute("BEGIN")
yield conn # caller receives the connection
conn.commit() # reached only if no exception
except Exception:
conn.rollback()
raise # re-raise after rollback
finally:
conn.close() # always close


with transaction("/tmp/app.db") as conn:
conn.execute("INSERT INTO events VALUES (?, ?)", (1, "login"))
conn.execute("INSERT INTO events VALUES (?, ?)", (2, "purchase"))
# Committed

try:
with transaction("/tmp/app.db") as conn:
conn.execute("INSERT INTO events VALUES (?, ?)", (3, "logout"))
raise RuntimeError("Unexpected failure")
except RuntimeError:
pass
# Rolled back

Temp Directory with @contextmanager

from contextlib import contextmanager
import tempfile, os, shutil


@contextmanager
def temp_directory(prefix="tmp_"):
"""Create a temp dir, yield it, delete it on exit."""
tmpdir = tempfile.mkdtemp(prefix=prefix)
try:
yield tmpdir # caller gets the path string
finally:
shutil.rmtree(tmpdir, ignore_errors=True)


with temp_directory(prefix="build_") as workspace:
config_path = os.path.join(workspace, "config.json")
with open(config_path, "w") as f:
f.write('{"setting": true}')
print(f"Files in workspace: {os.listdir(workspace)}")
# Files in workspace: ['config.json']

# workspace and all its contents are deleted here
print(f"Workspace exists: {os.path.exists(workspace)}") # False

:::tip @contextmanager vs class: which to use? Use @contextmanager for:

  • Simple setup/teardown without complex state
  • One-off patterns used only in one place
  • When the generator form reads more clearly

Use a class when:

  • The context manager has significant state (multiple instance variables)
  • It needs additional methods beyond context manager protocol
  • It must be reusable across multiple modules (a named class is easier to import)
  • You need __repr__ for debugging :::

Part 5 - contextlib Tools

contextlib.suppress(*exceptions) - Clean Exception Suppression

from contextlib import suppress
import os

# The Pythonic way to delete a file if it exists:
with suppress(FileNotFoundError):
os.remove("cache.tmp")

# Equivalent to:
try:
os.remove("cache.tmp")
except FileNotFoundError:
pass

# Suppress multiple exception types:
with suppress(FileNotFoundError, PermissionError):
os.remove("/protected/file.txt")

suppress is cleaner than try/except/pass and signals intent explicitly - this is expected to sometimes fail, and that is acceptable.

contextlib.ExitStack - Dynamic Number of Context Managers

ExitStack is for situations where you do not know at write-time how many context managers you need. Context managers are entered in order and exited in reverse (LIFO) - guaranteeing cleanup even when the number of resources is determined at runtime:

ExitStack LIFO visualization - context managers entered in order 1, 2, 3 and exited in reverse order 3, 2, 1
from contextlib import ExitStack


def merge_files(input_paths, output_path):
"""Merge multiple files into one, all opened simultaneously."""
with ExitStack() as stack:
# Register each file with the stack
input_files = [
stack.enter_context(open(path, "r", encoding="utf-8"))
for path in input_paths
]
output = stack.enter_context(open(output_path, "w", encoding="utf-8"))

for f in input_files:
for line in f:
output.write(line)

# All files closed here, even if an exception occurred

merge_files(["part1.txt", "part2.txt", "part3.txt"], "merged.txt")
# ExitStack as a reusable defer mechanism
from contextlib import ExitStack
import os

stack = ExitStack()

# Register cleanup functions - called in LIFO order on stack.close()
stack.callback(lambda: print("cleanup 1 ran"))
stack.callback(lambda: print("cleanup 2 ran"))
tmp_path = "/tmp/work.tmp"
open(tmp_path, "w").close()
stack.callback(os.unlink, tmp_path)

try:
do_work()
finally:
stack.close()
# cleanup 2 ran
# cleanup 1 ran
# (and /tmp/work.tmp is deleted)
# ExitStack for conditional resource registration
from contextlib import ExitStack


def process_with_optional_backup(source, create_backup=False):
with ExitStack() as stack:
src = stack.enter_context(open(source, "r", encoding="utf-8"))

if create_backup:
backup_path = source + ".bak"
bak = stack.enter_context(open(backup_path, "w", encoding="utf-8"))
else:
bak = None

for line in src:
if bak:
bak.write(line)
# process line

contextlib.nullcontext - Do-Nothing Placeholder

from contextlib import nullcontext
import sys


def process_data(data, output=None):
"""
Process data, writing to output if provided.
output can be a file path (str), file object, or None.
"""
if output is None:
ctx = nullcontext(None) # no-op context manager
elif isinstance(output, str):
ctx = open(output, "w", encoding="utf-8")
else:
ctx = nullcontext(output) # wrap existing file object (no-op)

with ctx as f:
results = []
for item in data:
result = str(item).upper()
results.append(result)
if f is not None:
f.write(f"{result}\n")
return results

# All three work without duplicating the processing logic:
process_data(["a", "b", "c"]) # no file output
process_data(["a", "b", "c"], "output.txt") # write to file
process_data(["a", "b", "c"], sys.stdout) # write to stdout

contextlib.redirect_stdout and redirect_stderr

from contextlib import redirect_stdout, redirect_stderr
import io


# Capture print() output without modifying the function
def noisy_function():
print("Debug output A")
print("Debug output B")
return 42

captured = io.StringIO()
with redirect_stdout(captured):
result = noisy_function()

output = captured.getvalue() # "Debug output A\nDebug output B\n"
assert result == 42
assert "Debug output A" in output

# Redirect stderr to a file
with open("errors.log", "a") as f, redirect_stderr(f):
# any writes to stderr from code inside here go to errors.log
import warnings
warnings.warn("this is a warning")

contextlib.closing - Add Context Manager Protocol to Any Object

from contextlib import closing
import urllib.request


# closing() wraps any object that has a close() method
# Adds __enter__ (returns self) and __exit__ (calls self.close())
with closing(urllib.request.urlopen("https://httpbin.org/get")) as response:
content = response.read(200)

# Useful for database cursors that do not implement __enter__/__exit__:
# with closing(db_connection.cursor()) as cursor:
# cursor.execute("SELECT * FROM users")
# rows = cursor.fetchall()

Part 6 - Nested with Statements

Traditional Nesting

# Nested with statements - valid, but increases indentation:
with open("input.txt", "r", encoding="utf-8") as infile:
with open("output.txt", "w", encoding="utf-8") as outfile:
for line in infile:
outfile.write(line.upper())

Combined Syntax (Python 2.7+)

# Multiple context managers in one line - same as nesting:
with open("input.txt", "r", encoding="utf-8") as infile, \
open("output.txt", "w", encoding="utf-8") as outfile:
for line in infile:
outfile.write(line.upper())

The combined form is exactly equivalent to nesting - they are closed in LIFO order (outfile first, then infile).

Exception Propagation in Nested Managers

class CM:
def __init__(self, name):
self.name = name

def __enter__(self):
print(f"{self.name}: __enter__")
return self

def __exit__(self, exc_type, exc_val, exc_tb):
print(f"{self.name}: __exit__ (exc={exc_type})")
return False # do not suppress

with CM("outer"), CM("inner"):
raise ValueError("test")

# Output:
# outer: __enter__
# inner: __enter__
# inner: __exit__ (exc=<class 'ValueError'>)
# outer: __exit__ (exc=<class 'ValueError'>)
# ValueError: test

Both __exit__ methods receive the original exception. The inner one is called first (LIFO). If the inner __exit__ suppresses the exception (returns True), the outer __exit__ is called with (None, None, None).

Part 7 - Exception Handling Strategies in __exit__

Strategy 1: Re-raise (Default)

def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup()
return False # or return None - same effect

# Use when: the exception is meaningful to the caller and should propagate

Strategy 2: Suppress Specific Exceptions

def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is FileNotFoundError:
log.warning(f"File not found: {exc_val}")
return True # suppress
return False # propagate everything else

Strategy 3: Replace the Exception

import sqlite3


def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is sqlite3.IntegrityError:
# Translate low-level DB error to domain error
raise DuplicateUserError(str(exc_val)) from exc_val
return False

# Use when: lower-level exceptions should be wrapped in domain-specific ones
# The `from exc_val` preserves the original exception as __cause__

Strategy 4: Log and Re-raise

import logging

logger = logging.getLogger(__name__)


def __exit__(self, exc_type, exc_val, exc_tb):
self.conn.close()
if exc_type is not None:
logger.error(
"Exception during database operation",
exc_info=(exc_type, exc_val, exc_tb)
)
return False # always re-raise after logging

Part 8 - Real-World Patterns

Pattern 1: Database Transaction Manager

from contextlib import contextmanager
import logging

logger = logging.getLogger(__name__)


@contextmanager
def db_transaction(session):
"""
SQLAlchemy session transaction context manager.
Commits on success, rolls back and re-raises on failure.
Always logs transaction outcome.
"""
try:
yield session
session.commit()
logger.debug("Transaction committed")
except Exception as exc:
session.rollback()
logger.warning(f"Transaction rolled back: {exc}")
raise
finally:
session.close()


# Usage in an API route handler:
# with db_transaction(get_session()) as session:
# order = Order(**order_data.dict())
# session.add(order)

Pattern 2: Timer Context Manager with Statistics

from contextlib import contextmanager
import time
from dataclasses import dataclass, field
from typing import List


@dataclass
class TimingStats:
label: str
durations: List[float] = field(default_factory=list)

@property
def count(self): return len(self.durations)
@property
def total(self): return sum(self.durations)
@property
def mean(self): return self.total / self.count if self.count else 0
@property
def minimum(self): return min(self.durations) if self.durations else 0
@property
def maximum(self): return max(self.durations) if self.durations else 0

def report(self):
print(f"[{self.label}] n={self.count} "
f"mean={self.mean*1000:.2f}ms "
f"min={self.minimum*1000:.2f}ms "
f"max={self.maximum*1000:.2f}ms")


@contextmanager
def measure(stats: TimingStats):
"""Accumulate timing measurements into a TimingStats object."""
start = time.perf_counter()
try:
yield stats
finally:
duration = time.perf_counter() - start
stats.durations.append(duration)


# Measure multiple iterations
db_stats = TimingStats("database_query")
for _ in range(100):
with measure(db_stats):
time.sleep(0.001) # simulate DB call

db_stats.report()
# [database_query] n=100 mean=1.05ms min=0.98ms max=3.21ms

Pattern 3: Managed Build Workspace

from contextlib import contextmanager
import os, shutil, tempfile, sys, logging

logger = logging.getLogger(__name__)


@contextmanager
def build_workspace(project_name, preserve_on_error=False):
"""
Create a temporary build workspace.
On success: clean up automatically.
On failure: optionally preserve for debugging.
"""
tmpdir = tempfile.mkdtemp(prefix=f"build_{project_name}_")
logger.info(f"Build workspace: {tmpdir}")
failed = False

try:
yield tmpdir
logger.info("Build succeeded, cleaning workspace")
except Exception as exc:
failed = True
if preserve_on_error:
logger.error(
f"Build failed - workspace preserved at {tmpdir}"
)
else:
logger.error(f"Build failed: {exc}")
raise
finally:
if os.path.exists(tmpdir):
if not (preserve_on_error and failed):
shutil.rmtree(tmpdir, ignore_errors=True)


# Usage
with build_workspace("myproject", preserve_on_error=True) as workspace:
src_dir = os.path.join(workspace, "src")
os.makedirs(src_dir)
# compile_sources(src_dir)
# run_tests(workspace)

Pattern 4: Rate Limiter Context Manager

import time
import threading


class RateLimiter:
"""Token bucket rate limiter as a context manager."""

def __init__(self, calls_per_second):
self.min_interval = 1.0 / calls_per_second
self.last_call = 0.0
self._lock = threading.Lock()

def __enter__(self):
with self._lock:
now = time.monotonic()
wait_time = self.min_interval - (now - self.last_call)
if wait_time > 0:
time.sleep(wait_time)
self.last_call = time.monotonic()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
return False


# Allow max 10 API calls per second
api_limiter = RateLimiter(calls_per_second=10)

def make_api_call(url):
pass # placeholder

endpoints = ["/api/users", "/api/orders", "/api/items"]
for endpoint in endpoints:
with api_limiter:
response = make_api_call(endpoint)

Interview Questions

Q1: What is a context manager and what protocol does it implement?

Answer: A context manager is an object that implements the context management protocol: __enter__(self) and __exit__(self, exc_type, exc_val, exc_tb). When used with the with statement, Python calls __enter__() before the block and __exit__() after the block exits - regardless of whether the exit was normal or due to an exception. The with statement is syntactic sugar for a try/finally pattern that guarantees __exit__() is called for cleanup. Context managers are used to manage any resource with a well-defined lifecycle: file handles, database connections, locks, network sockets, timers.

Q2: What does __exit__ returning True mean? Give a concrete use case.

Answer: When __exit__ returns a truthy value, the exception that caused the with block to exit is suppressed - it does not propagate to the caller. Execution continues at the statement after the with block. Returning False or None (the default) allows the exception to propagate normally.

Concrete use case: contextlib.suppress(FileNotFoundError) returns a context manager whose __exit__ returns True if exc_type is FileNotFoundError. This lets you write:

with suppress(FileNotFoundError):
os.remove("cache.tmp")

instead of a try/except/pass block. Only use exception suppression when you intentionally expect and accept a specific exception type - suppressing all exceptions masks bugs.

Q3: Explain the difference between @contextmanager and writing a context manager class. When would you use each?

Answer: Both achieve the same result. @contextmanager wraps a generator function: code before yield is __enter__, the yield value is the as variable, and code after yield (typically in finally) is __exit__. It is concise for simple patterns.

A class is better when: (1) the context manager has significant state (multiple instance variables); (2) it needs additional methods; (3) it must be reusable across multiple modules (a named class is easier to import and inspect); (4) you need __repr__ for debugging; (5) the cleanup logic is complex with multiple exception branches.

@contextmanager is better when: (1) the setup and teardown are simple; (2) the pattern is used only in one place; (3) the generator form reads more clearly than a class.

Q4: What is contextlib.ExitStack and when is it necessary?

Answer: ExitStack is a context manager that maintains a stack of context managers and cleanup callbacks. It allows you to dynamically compose context managers at runtime - when you don't know at write-time how many resources you need to manage.

It is necessary when: (1) you need to open a variable number of files based on runtime input; (2) you want to conditionally add context managers; (3) you want to register plain cleanup callbacks alongside context managers; (4) you want defer-style cleanup (register stack.callback(fn, *args) throughout the function).

Example: with ExitStack() as stack: files = [stack.enter_context(open(p)) for p in paths] opens all files and guarantees all are closed even if one raises.

Q5: What happens if __enter__ raises an exception?

Answer: If __enter__() raises, __exit__() is not called. There is no resource to clean up from the context manager's perspective - setup never completed. The exception propagates from the with statement. This means if your __enter__ acquires multiple resources (e.g., opens two files), and the second acquisition fails, you must clean up the first acquisition inside __enter__ itself (in a try/except), because __exit__ will not be called.

For this reason, ExitStack is the preferred approach when acquiring multiple resources: each successful acquisition is registered on the stack, and the stack's __exit__ handles cleanup of whatever was successfully acquired.

Q6: How would you implement a context manager that retries the body on specific exceptions?

Answer:

The @contextmanager approach cannot re-execute the with body because the body is not a callable from the context manager's perspective. A context manager wraps one execution of the body - it cannot loop around it.

The correct pattern for retry is a decorator or explicit loop:

import time
from functools import wraps


def retry_on(exc_type, max_attempts=3, delay=1.0):
"""Decorator that retries the wrapped function on specific exceptions."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except exc_type as e:
if attempt == max_attempts:
raise
print(f"Attempt {attempt} failed: {e}. Retrying in {delay}s...")
time.sleep(delay)
return wrapper
return decorator


@retry_on(ConnectionError, max_attempts=3, delay=0.5)
def fetch_data(url):
import urllib.request
with urllib.request.urlopen(url) as response:
return response.read()

Context managers are suitable for setup/teardown but not for retry loops - that distinction is an important design signal.

Practice Challenges

Beginner - Build a Simple Timer

Write a context manager class Timer that measures the elapsed time of the with block. The as variable should expose the result via t.elapsed after the block.

Solution
import time


class Timer:
"""
Context manager that measures elapsed time.

Usage:
with Timer() as t:
do_work()
print(f"Took {t.elapsed:.3f}s")
"""

def __init__(self, label=""):
self.label = label
self.elapsed = 0.0
self._start = None

def __enter__(self):
self._start = time.perf_counter()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.elapsed = time.perf_counter() - self._start
if self.label:
print(f"[{self.label}] {self.elapsed:.4f}s")
return False # never suppress exceptions

def __repr__(self):
return f"Timer(label={self.label!r}, elapsed={self.elapsed:.4f}s)"


# Test
with Timer("sorting") as t:
data = sorted(range(1_000_000), reverse=True)

print(f"Sorted in {t.elapsed:.4f}s") # Sorted in 0.0823s

# Works even when an exception occurs
try:
with Timer("will fail") as t:
time.sleep(0.1)
raise ValueError("oops")
except ValueError:
print(f"Failed after {t.elapsed:.4f}s") # Failed after 0.1003s

Intermediate - Database Transaction with Savepoints

Write a @contextmanager function savepoint(conn, name) that creates a SQLite savepoint. On success, release the savepoint. On exception, roll back to the savepoint and re-raise.

Savepoints let you nest transactions - rolling back to a savepoint undoes work since the savepoint without rolling back the entire transaction.

Solution
import sqlite3
from contextlib import contextmanager


@contextmanager
def savepoint(conn, name):
"""
Create a SQLite savepoint within an existing transaction.
On success: release the savepoint (work is kept).
On exception: rollback to the savepoint, then re-raise.

Savepoints allow nested rollback without losing the outer transaction.
"""
safe_name = name.replace("-", "_").replace(" ", "_")
conn.execute(f"SAVEPOINT {safe_name}")
try:
yield conn
conn.execute(f"RELEASE SAVEPOINT {safe_name}")
except Exception:
conn.execute(f"ROLLBACK TO SAVEPOINT {safe_name}")
conn.execute(f"RELEASE SAVEPOINT {safe_name}")
raise


# Test
def demo():
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE log (msg TEXT)")
conn.execute("BEGIN")

try:
conn.execute("INSERT INTO log VALUES ('step 1')")

# This savepoint will succeed
with savepoint(conn, "sp1"):
conn.execute("INSERT INTO log VALUES ('step 2')")

# This savepoint will fail and roll back - but steps 1 and 2 survive
try:
with savepoint(conn, "sp2"):
conn.execute("INSERT INTO log VALUES ('step 3 - will rollback')")
raise ValueError("Step 3 failed!")
except ValueError:
print("Step 3 rolled back to savepoint")

conn.execute("INSERT INTO log VALUES ('step 4')")
conn.commit()

except Exception:
conn.rollback()
raise

rows = conn.execute("SELECT msg FROM log ORDER BY rowid").fetchall()
print("Final records:")
for row in rows:
print(f" {row[0]}")
conn.close()

demo()
# Step 3 rolled back to savepoint
# Final records:
# step 1
# step 2
# step 4

Advanced - ExitStack-Based Resource Pool

Implement a ConnectionPool context manager that manages multiple database connections. When used as a context manager, it opens N connections, provides them, and closes all of them on exit - even if some fail.

Solution
import sqlite3
from contextlib import ExitStack
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Connection:
"""A database connection with context manager support."""
conn_id: int
db_path: str
_conn: Optional[sqlite3.Connection] = field(default=None, repr=False)

def open(self):
self._conn = sqlite3.connect(self.db_path)
print(f"Connection {self.conn_id}: opened")
return self

def execute(self, sql, params=()):
return self._conn.execute(sql, params)

def commit(self):
self._conn.commit()

def close(self):
if self._conn:
self._conn.close()
self._conn = None
print(f"Connection {self.conn_id}: closed")

def __enter__(self):
return self.open()

def __exit__(self, *args):
self.close()
return False


class ConnectionPool:
"""
A connection pool that uses ExitStack to manage multiple connections.
Guarantees all acquired connections are closed, even on partial failure.
"""

def __init__(self, db_path, pool_size=3):
self.db_path = db_path
self.pool_size = pool_size
self._connections: List[Connection] = []
self._stack: Optional[ExitStack] = None

def __enter__(self):
self._stack = ExitStack()
try:
for i in range(self.pool_size):
conn = Connection(conn_id=i, db_path=self.db_path)
# enter_context registers conn.__exit__ on the stack
# If this fails for connection i, connections 0..i-1 are still closed
self._stack.enter_context(conn)
self._connections.append(conn)
except Exception:
# If any connection fails, close all that were opened
self._stack.close()
raise

return self

def __exit__(self, exc_type, exc_val, exc_tb):
self._connections.clear()
return self._stack.__exit__(exc_type, exc_val, exc_tb)

def get(self, index: int) -> Connection:
"""Get a specific connection from the pool."""
if index >= len(self._connections):
raise IndexError(f"No connection at index {index}")
return self._connections[index]

def execute_all(self, sql, params=()):
"""Execute SQL on all connections and collect results."""
return [conn.execute(sql, params).fetchall() for conn in self._connections]


# Demo
import tempfile, os

with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test.db")

# Set up the schema
with sqlite3.connect(db_path) as setup:
setup.execute("CREATE TABLE data (conn_id INT, value TEXT)")

print("Opening pool:")
with ConnectionPool(db_path, pool_size=3) as pool:
# Each connection inserts its own records
for i in range(pool.pool_size):
conn = pool.get(i)
conn.execute("INSERT INTO data VALUES (?, ?)", (i, f"from_conn_{i}"))
conn.commit()

# Query all connections
all_results = pool.execute_all("SELECT * FROM data ORDER BY conn_id")
print("\nData visible to all connections:")
for rows in all_results:
for row in rows:
print(f" conn_id={row[0]}, value={row[1]}")

print("\nPool closed.")

# Output:
# Opening pool:
# Connection 0: opened
# Connection 1: opened
# Connection 2: opened
#
# Data visible to all connections:
# conn_id=0, value=from_conn_0
# conn_id=1, value=from_conn_1
# conn_id=2, value=from_conn_2
# (x3, once per connection's view)
#
# Connection 2: closed
# Connection 1: closed
# Connection 0: closed
# Pool closed.

Quick Reference

ToolUsagePurpose
with obj as x:Standard usagex = obj.__enter__()
with A() as a, B() as b:Multi-managerEquivalent to nested with
__enter__(self)Returns value for as varSetup; commonly returns self
__exit__(self, et, ev, tb)return True to suppressTeardown; handles exceptions
@contextmanagerDecorator on generatorCode before yield = enter, after = exit
yield valueIn @contextmanagervalue becomes the as variable
contextlib.suppress(*excs)with suppress(IOError):Suppress listed exception types
contextlib.ExitStack()Dynamic resource mgmtVariable number of context managers
stack.enter_context(cm)Inside ExitStackRegister a context manager
stack.callback(fn, *args)Inside ExitStackRegister a plain cleanup function
contextlib.nullcontext(v)Conditional managerNo-op; returns v as as var
redirect_stdout(f)with redirect_stdout(buf):Capture print() output
contextlib.closing(obj)Wraps objects with close()Adds context manager protocol

Key Takeaways

  • A context manager implements __enter__ (setup, returns the as value) and __exit__ (teardown, receives exception info). __exit__ is always called if __enter__ succeeded, even if the body raises.
  • Returning True from __exit__ suppresses the exception. Returning False or None lets it propagate. Use suppression only for explicitly expected, acceptable exceptions.
  • The with statement is a structured try/finally - it guarantees cleanup code runs. Use it for any resource with a lifecycle: files, connections, locks, timers.
  • @contextmanager lets you write a context manager as a generator: code before yield is __enter__, the yielded value is the as variable, code after yield (in finally) is __exit__.
  • contextlib.suppress is the idiomatic way to ignore expected exceptions. contextlib.ExitStack is the tool for managing a dynamic number of context managers. contextlib.nullcontext is a no-op placeholder for conditional context managers.
  • In the combined with A() as a, B() as b: syntax, managers are exited in LIFO order (B then A) - same as nested with blocks.
  • The canonical database pattern: yield session; session.commit() in the success path; session.rollback(); raise in the exception path - always with session.close() in finally.
© 2026 EngineersOfAI. All rights reserved.