Async Synchronization Patterns
Before reading any explanation, predict the output of this program:
import asyncio
async def worker(semaphore, worker_id):
print(f"Worker {worker_id} waiting")
async with semaphore:
print(f"Worker {worker_id} acquired")
await asyncio.sleep(0.1)
print(f"Worker {worker_id} released")
async def main():
sem = asyncio.Semaphore(2)
tasks = [worker(sem, i) for i in range(4)]
await asyncio.gather(*tasks)
asyncio.run(main())
How many workers run concurrently? In what order do they acquire the semaphore?
# Output:
# Worker 0 waiting
# Worker 1 waiting
# Worker 2 waiting
# Worker 3 waiting
# Worker 0 acquired
# Worker 1 acquired
# Worker 0 released
# Worker 1 released
# Worker 2 acquired
# Worker 3 acquired
# Worker 2 released
# Worker 3 released
The semaphore allows exactly 2 concurrent acquisitions. Workers 0 and 1 acquire immediately, workers 2 and 3 wait. When 0 and 1 release, 2 and 3 acquire. The order is FIFO among waiters. This fundamental pattern -- bounded concurrency -- is the basis for rate limiting, connection pooling, and resource protection in async systems.
What You Will Learn
asyncio.Lockfor mutual exclusion in single-threaded async codeasyncio.SemaphoreandBoundedSemaphorefor bounded concurrencyasyncio.Eventfor one-to-many notificationasyncio.Conditionfor complex waiting conditionsasyncio.Barrier(Python 3.11+) for synchronizing groups of tasks- Rate limiting with semaphores and token buckets
- The circuit breaker pattern for fault tolerance
- Real-world patterns: API rate limiting, connection pool management
Prerequisites
- Threading locks and semaphores from the Intermediate course
asyncio.TaskGroupand structured concurrency from Lesson 3- Async context managers from Lesson 2
- Understanding of
awaitand the event loop from Lessons 4-5
Part 1 -- Why Async Code Needs Synchronization
A common misconception: "asyncio is single-threaded, so there are no race conditions." This is wrong. While there are no data races (simultaneous memory access), there are logical race conditions whenever a coroutine suspends at an await point:
import asyncio
balance = 100
async def withdraw(amount):
global balance
current = balance
# Other coroutines can run here!
await asyncio.sleep(0)
if current >= amount:
balance = current - amount
return True
return False
async def main():
# Both coroutines read balance=100 before either writes
results = await asyncio.gather(
withdraw(80),
withdraw(80)
)
print(f"Results: {results}") # [True, True] -- both succeed!
print(f"Balance: {balance}") # 20 -- should be -60 or one should fail
asyncio.run(main())
Both coroutines read balance = 100, then both check 100 >= 80, then both subtract. The await asyncio.sleep(0) is where the interleaving happens. A Lock prevents this.
Part 2 -- asyncio.Lock
asyncio.Lock provides mutual exclusion. Only one coroutine can hold the lock at a time. Others wait (without blocking the event loop) until the lock is released.
import asyncio
balance = 100
lock = asyncio.Lock()
async def safe_withdraw(amount):
global balance
async with lock:
current = balance
await asyncio.sleep(0) # Simulate async I/O
if current >= amount:
balance = current - amount
return True
return False
async def main():
results = await asyncio.gather(
safe_withdraw(80),
safe_withdraw(80)
)
print(f"Results: {results}") # [True, False]
print(f"Balance: {balance}") # 20
asyncio.run(main())
Lock is NOT Reentrant
Unlike threading.RLock, asyncio.Lock is not reentrant. A coroutine that already holds the lock will deadlock if it tries to acquire it again:
async def deadlock_example():
lock = asyncio.Lock()
async with lock:
print("Acquired once")
async with lock: # DEADLOCK -- waits forever
print("Never reached")
If you need reentrant locking in async code, you likely have a design problem. Refactor your code so that the locked section calls functions that do not try to re-acquire the same lock. If you truly need reentrancy, track ownership manually with asyncio.current_task().
Manual Lock Usage
lock = asyncio.Lock()
# Preferred: async with
async with lock:
await do_work()
# Manual: acquire/release
await lock.acquire()
try:
await do_work()
finally:
lock.release()
# Non-blocking check
if lock.locked():
print("Lock is held by another coroutine")
Part 3 -- asyncio.Semaphore
A semaphore allows up to N concurrent acquisitions. This is the fundamental primitive for bounded concurrency.
Basic Usage
import asyncio
import time
async def limited_fetch(sem: asyncio.Semaphore, url: str) -> dict:
async with sem:
print(f"Fetching {url}")
await asyncio.sleep(0.5) # Simulate HTTP request
return {"url": url, "status": 200}
async def main():
sem = asyncio.Semaphore(3) # Max 3 concurrent fetches
urls = [f"https://api.example.com/page/{i}" for i in range(10)]
start = time.perf_counter()
tasks = [limited_fetch(sem, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.perf_counter() - start
print(f"Fetched {len(results)} URLs in {elapsed:.1f}s")
# 10 URLs with 3 concurrent, 0.5s each = ~2.0s total
# Without semaphore: ~0.5s total
asyncio.run(main())
BoundedSemaphore
BoundedSemaphore raises ValueError if you release more times than you acquired. This catches bugs where release is called without a matching acquire:
sem = asyncio.BoundedSemaphore(3)
# This is fine
await sem.acquire()
sem.release()
# This raises ValueError
sem.release() # ValueError: BoundedSemaphore released too many times
Always prefer BoundedSemaphore over Semaphore in production code. The extra check catches accidental double-releases that would silently increase the concurrency limit with a regular Semaphore.
Weighted Semaphore Pattern
Sometimes different operations need different amounts of "capacity":
class WeightedSemaphore:
"""A semaphore where different acquisitions consume different weights."""
def __init__(self, capacity: int):
self._capacity = capacity
self._used = 0
self._condition = asyncio.Condition()
async def acquire(self, weight: int = 1):
async with self._condition:
while self._used + weight > self._capacity:
await self._condition.wait()
self._used += weight
async def release(self, weight: int = 1):
async with self._condition:
self._used -= weight
self._condition.notify_all()
async def main():
sem = WeightedSemaphore(capacity=10)
async def light_task():
await sem.acquire(weight=1)
try:
await asyncio.sleep(0.1)
finally:
await sem.release(weight=1)
async def heavy_task():
await sem.acquire(weight=5)
try:
await asyncio.sleep(0.1)
finally:
await sem.release(weight=5)
# Can run 10 light tasks or 2 heavy tasks concurrently
async with asyncio.TaskGroup() as tg:
for _ in range(10):
tg.create_task(light_task())
for _ in range(3):
tg.create_task(heavy_task())
Part 4 -- asyncio.Event
An Event is a simple flag that coroutines can wait for. One coroutine sets the event, and all waiting coroutines wake up.
import asyncio
async def waiter(event: asyncio.Event, name: str):
print(f"{name}: waiting for event")
await event.wait()
print(f"{name}: event received!")
async def setter(event: asyncio.Event):
print("Setter: doing setup work")
await asyncio.sleep(1)
print("Setter: firing event")
event.set()
async def main():
event = asyncio.Event()
async with asyncio.TaskGroup() as tg:
tg.create_task(waiter(event, "A"))
tg.create_task(waiter(event, "B"))
tg.create_task(waiter(event, "C"))
tg.create_task(setter(event))
asyncio.run(main())
# Output:
# A: waiting for event
# B: waiting for event
# C: waiting for event
# Setter: doing setup work
# Setter: firing event
# A: event received!
# B: event received!
# C: event received!
Event as a Shutdown Signal
class GracefulService:
def __init__(self):
self._shutdown = asyncio.Event()
async def run(self):
async with asyncio.TaskGroup() as tg:
tg.create_task(self._worker("processor"))
tg.create_task(self._worker("indexer"))
tg.create_task(self._shutdown_watcher())
async def _worker(self, name):
while not self._shutdown.is_set():
print(f"{name}: processing")
try:
await asyncio.wait_for(
self._shutdown.wait(),
timeout=2.0 # Process every 2 seconds
)
except asyncio.TimeoutError:
pass # Normal -- continue processing
print(f"{name}: shutting down")
async def _shutdown_watcher(self):
await asyncio.sleep(5) # Run for 5 seconds
print("Initiating shutdown")
self._shutdown.set()
Event.clear() for Repeatable Signals
async def producer(event, data_queue):
for i in range(5):
await asyncio.sleep(0.5)
await data_queue.put(i)
event.set() # Signal that data is available
event.clear() # Reset for next signal
async def consumer(event, data_queue):
for _ in range(5):
await event.wait()
item = await data_queue.get()
print(f"Consumed: {item}")
event.clear() immediately after event.set() can cause waiters to miss the signal if they have not been scheduled yet. For reliable producer-consumer patterns, use asyncio.Queue instead, which handles the synchronization internally.
Part 5 -- asyncio.Condition
A Condition combines a lock with notification, allowing coroutines to wait until a specific condition is true:
import asyncio
class AsyncBoundedBuffer:
"""Thread-safe bounded buffer using Condition."""
def __init__(self, capacity: int):
self._buffer = []
self._capacity = capacity
self._condition = asyncio.Condition()
async def put(self, item):
async with self._condition:
while len(self._buffer) >= self._capacity:
await self._condition.wait() # Wait until space available
self._buffer.append(item)
self._condition.notify() # Wake one waiting consumer
async def get(self):
async with self._condition:
while not self._buffer:
await self._condition.wait() # Wait until item available
item = self._buffer.pop(0)
self._condition.notify() # Wake one waiting producer
return item
async def producer(buffer, producer_id):
for i in range(5):
item = f"P{producer_id}-{i}"
await buffer.put(item)
print(f"Produced: {item}")
async def consumer(buffer, consumer_id):
for _ in range(5):
item = await buffer.get()
print(f"Consumer {consumer_id} got: {item}")
await asyncio.sleep(0.1) # Simulate processing
async def main():
buffer = AsyncBoundedBuffer(capacity=3)
async with asyncio.TaskGroup() as tg:
tg.create_task(producer(buffer, 1))
tg.create_task(producer(buffer, 2))
tg.create_task(consumer(buffer, 1))
tg.create_task(consumer(buffer, 2))
asyncio.run(main())
Condition.wait_for
A convenience method that combines wait() with a predicate:
async def get_when_ready(condition, data, min_items):
async with condition:
# wait_for automatically loops on the predicate
await condition.wait_for(lambda: len(data) >= min_items)
return data[:min_items]
notify vs notify_all
notify(n=1): Wake at mostnwaiting coroutines. Use when only one consumer should process the event.notify_all(): Wake ALL waiting coroutines. Use when all waiters need to re-evaluate their condition (such as shutdown).
Part 6 -- asyncio.Barrier (Python 3.11+)
A Barrier synchronizes a fixed number of tasks at a rendezvous point. All tasks must reach the barrier before any can proceed.
import asyncio
async def phase_worker(barrier: asyncio.Barrier, worker_id: int):
# Phase 1: Initialization
print(f"Worker {worker_id}: initializing")
await asyncio.sleep(worker_id * 0.1) # Different init times
# Wait for all workers to finish initialization
await barrier.wait()
print(f"Worker {worker_id}: all initialized, starting phase 2")
# Phase 2: Processing
await asyncio.sleep(0.1)
await barrier.wait()
print(f"Worker {worker_id}: all done with phase 2")
async def main():
barrier = asyncio.Barrier(3)
async with asyncio.TaskGroup() as tg:
for i in range(3):
tg.create_task(phase_worker(barrier, i))
asyncio.run(main())
# Output:
# Worker 0: initializing
# Worker 1: initializing
# Worker 2: initializing
# Worker 0: all initialized, starting phase 2 (after all 3 init)
# Worker 1: all initialized, starting phase 2
# Worker 2: all initialized, starting phase 2
# Worker 0: all done with phase 2
# Worker 1: all done with phase 2
# Worker 2: all done with phase 2
Barrier with Action
async def summarize():
"""Called once when all parties reach the barrier."""
print("--- All workers synchronized ---")
return "sync_token"
barrier = asyncio.Barrier(3, action=summarize)
Part 7 -- Rate Limiting
Token Bucket Rate Limiter
The token bucket algorithm is the standard approach for API rate limiting:
import asyncio
import time
class TokenBucket:
"""Token bucket rate limiter for async code."""
def __init__(self, rate: float, burst: int):
"""
Args:
rate: Tokens per second (sustained rate)
burst: Maximum tokens (burst capacity)
"""
self.rate = rate
self.burst = burst
self._tokens = float(burst)
self._last_refill = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self, tokens: int = 1):
"""Wait until tokens are available."""
while True:
async with self._lock:
self._refill()
if self._tokens >= tokens:
self._tokens -= tokens
return
# Calculate wait time for tokens to become available
wait_time = (tokens - self._tokens) / self.rate
await asyncio.sleep(max(0.01, wait_time))
def _refill(self):
now = time.monotonic()
elapsed = now - self._last_refill
self._tokens = min(self.burst, self._tokens + elapsed * self.rate)
self._last_refill = now
async def rate_limited_fetch(limiter: TokenBucket, url: str) -> dict:
await limiter.acquire()
# Perform the actual request
await asyncio.sleep(0.05) # Simulate HTTP request
return {"url": url, "status": 200}
async def main():
# 10 requests per second, burst of 5
limiter = TokenBucket(rate=10, burst=5)
start = time.perf_counter()
tasks = [rate_limited_fetch(limiter, f"url/{i}") for i in range(20)]
results = await asyncio.gather(*tasks)
elapsed = time.perf_counter() - start
print(f"Completed {len(results)} requests in {elapsed:.1f}s")
# First 5 requests burst immediately, remaining 15 at 10/s = ~1.5s more
# Total: ~1.5-2.0s
asyncio.run(main())
Sliding Window Rate Limiter
from collections import deque
class SlidingWindowLimiter:
"""Sliding window rate limiter."""
def __init__(self, max_requests: int, window_seconds: float):
self.max_requests = max_requests
self.window = window_seconds
self._timestamps: deque[float] = deque()
self._lock = asyncio.Lock()
async def acquire(self):
while True:
async with self._lock:
now = time.monotonic()
# Remove timestamps outside the window
while self._timestamps and self._timestamps[0] < now - self.window:
self._timestamps.popleft()
if len(self._timestamps) < self.max_requests:
self._timestamps.append(now)
return
# Calculate when the oldest request will leave the window
wait_time = self._timestamps[0] + self.window - now
await asyncio.sleep(max(0.01, wait_time))
Part 8 -- Circuit Breaker Pattern
The circuit breaker prevents cascading failures by stopping requests to a failing service:
import asyncio
import time
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing -- reject requests
HALF_OPEN = "half_open" # Testing if service recovered
class CircuitBreaker:
"""Async circuit breaker for external service calls."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
half_open_max_calls: int = 1,
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self._state = CircuitState.CLOSED
self._failure_count = 0
self._last_failure_time = 0.0
self._half_open_calls = 0
self._lock = asyncio.Lock()
@property
def state(self) -> CircuitState:
if self._state == CircuitState.OPEN:
if time.monotonic() - self._last_failure_time > self.recovery_timeout:
return CircuitState.HALF_OPEN
return self._state
async def call(self, func, *args, **kwargs):
"""Execute a function through the circuit breaker."""
async with self._lock:
current_state = self.state
if current_state == CircuitState.OPEN:
raise CircuitBreakerOpen(
f"Circuit is open. Retry after {self.recovery_timeout}s"
)
if current_state == CircuitState.HALF_OPEN:
if self._half_open_calls >= self.half_open_max_calls:
raise CircuitBreakerOpen("Circuit half-open: max test calls reached")
self._half_open_calls += 1
try:
result = await func(*args, **kwargs)
await self._on_success()
return result
except Exception as e:
await self._on_failure()
raise
async def _on_success(self):
async with self._lock:
if self._state == CircuitState.HALF_OPEN:
# Service recovered -- close the circuit
print("Circuit CLOSED (service recovered)")
self._state = CircuitState.CLOSED
self._failure_count = 0
self._half_open_calls = 0
async def _on_failure(self):
async with self._lock:
self._failure_count += 1
self._last_failure_time = time.monotonic()
if self._state == CircuitState.HALF_OPEN:
# Still failing -- reopen
print("Circuit OPEN (still failing)")
self._state = CircuitState.OPEN
self._half_open_calls = 0
elif self._failure_count >= self.failure_threshold:
print(f"Circuit OPEN (threshold {self.failure_threshold} reached)")
self._state = CircuitState.OPEN
class CircuitBreakerOpen(Exception):
pass
Usage:
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=10.0)
async def fetch_from_service(url: str) -> dict:
# Simulate a flaky service
if time.monotonic() % 2 < 1:
raise ConnectionError("Service unavailable")
return {"status": "ok"}
async def main():
for i in range(10):
try:
result = await breaker.call(fetch_from_service, f"url/{i}")
print(f"Request {i}: {result}")
except CircuitBreakerOpen as e:
print(f"Request {i}: BLOCKED - {e}")
except ConnectionError as e:
print(f"Request {i}: FAILED - {e}")
await asyncio.sleep(0.5)
asyncio.run(main())
Part 9 -- Real-World Connection Pool Management
Combining multiple synchronization primitives for a production-grade connection pool:
import asyncio
from contextlib import asynccontextmanager
class ManagedConnectionPool:
"""Production-ready connection pool with health checking."""
def __init__(self, factory, max_size: int = 10,
max_idle_time: float = 300.0):
self._factory = factory
self._max_size = max_size
self._max_idle_time = max_idle_time
self._available: asyncio.Queue = asyncio.Queue()
self._in_use: set = set()
self._semaphore = asyncio.BoundedSemaphore(max_size)
self._lock = asyncio.Lock()
self._closed = False
@asynccontextmanager
async def connection(self):
"""Acquire a connection from the pool."""
if self._closed:
raise RuntimeError("Pool is closed")
await self._semaphore.acquire()
conn = None
try:
conn = await self._get_or_create()
self._in_use.add(id(conn))
yield conn
finally:
if conn is not None:
self._in_use.discard(id(conn))
if not self._closed:
await self._available.put(conn)
self._semaphore.release()
async def _get_or_create(self):
# Try to get an existing connection
while not self._available.empty():
conn = self._available.get_nowait()
if await self._is_healthy(conn):
return conn
await self._destroy(conn)
# Create a new one
return await self._factory()
async def _is_healthy(self, conn):
try:
await conn.ping()
return True
except Exception:
return False
async def _destroy(self, conn):
try:
await conn.close()
except Exception:
pass
async def close(self):
self._closed = True
while not self._available.empty():
conn = self._available.get_nowait()
await self._destroy(conn)
@property
def stats(self):
return {
"available": self._available.qsize(),
"in_use": len(self._in_use),
"max_size": self._max_size,
}
Key Takeaways
- Async code has logical race conditions at every
awaitpoint, even though it is single-threaded. Synchronization primitives prevent interleaving bugs. asyncio.Lockprovides mutual exclusion. It is NOT reentrant -- attempting to re-acquire a held lock deadlocks.asyncio.Semaphoreallows N concurrent acquisitions. UseBoundedSemaphorein production to catch double-release bugs.asyncio.Eventprovides one-to-many notification.set()wakes all waiters;clear()resets the flag. Preferasyncio.Queuefor producer-consumer patterns.asyncio.Conditioncombines a lock with wait/notify for complex state-dependent waiting.asyncio.Barrier(Python 3.11+) synchronizes N tasks at a rendezvous point. All must arrive before any proceed.- Token bucket and sliding window are the standard rate limiting algorithms. Both compose naturally with async/await.
- The circuit breaker pattern uses state tracking and timers to prevent cascading failures when external services are down.
- Combine primitives for production patterns: semaphore for concurrency limiting, lock for state protection, event for shutdown signals, queue for work distribution.
Graded Practice Challenges
Level 1 -- Predict the Output
Question 1: What does this print?
import asyncio
async def main():
lock = asyncio.Lock()
async def worker(name):
print(f"{name} waiting")
async with lock:
print(f"{name} acquired")
await asyncio.sleep(0.1)
print(f"{name} released")
async with asyncio.TaskGroup() as tg:
tg.create_task(worker("A"))
tg.create_task(worker("B"))
asyncio.run(main())
Answer
A waiting
B waiting
A acquired
A released
B acquired
B released
Task A is created first and acquires the lock. Task B waits. When A releases, B acquires. The lock ensures sequential access despite concurrent tasks.
Question 2: What does this print?
import asyncio
async def main():
event = asyncio.Event()
event.set()
# Event is already set before wait() is called
await event.wait()
print("first wait done")
event.clear()
# Wait with a timeout since event is cleared
try:
await asyncio.wait_for(event.wait(), timeout=0.1)
except asyncio.TimeoutError:
print("timed out")
print(f"is_set: {event.is_set()}")
asyncio.run(main())
Answer
first wait done
timed out
is_set: False
The first wait() returns immediately because the event is already set. After clear(), the second wait() blocks until timeout because nobody sets it. is_set() returns False.
Question 3: What happens with this semaphore code?
import asyncio
async def main():
sem = asyncio.Semaphore(2)
await sem.acquire()
await sem.acquire()
# sem internal counter is now 0
sem.release()
sem.release()
sem.release() # Release 3 times, acquired only 2
print(f"Can acquire: {not sem.locked()}")
asyncio.run(main())
Answer
Can acquire: True
With a regular Semaphore, the extra release() increases the counter beyond its initial value. The semaphore now allows 3 concurrent acquisitions instead of 2. This is a bug. With BoundedSemaphore, the third release() would raise ValueError.
Level 2 -- Debug Challenge
This rate limiter is supposed to allow 5 requests per second, but it allows bursts of unlimited requests. Find the bug.
import asyncio
import time
class BrokenRateLimiter:
def __init__(self, max_per_second: int):
self.max_per_second = max_per_second
self.semaphore = asyncio.Semaphore(max_per_second)
async def acquire(self):
await self.semaphore.acquire()
# Release after 1 second to allow the next request
asyncio.get_event_loop().call_later(1.0, self.semaphore.release)
async def main():
limiter = BrokenRateLimiter(5)
start = time.perf_counter()
for i in range(20):
await limiter.acquire()
elapsed = time.perf_counter() - start
print(f"Request {i} at {elapsed:.2f}s")
asyncio.run(main())
Answer
The semaphore starts with count 5, which correctly allows a burst of 5. But after 1 second, ALL 5 semaphore slots release simultaneously, allowing another burst of 5. This creates bursty behavior (5 requests instantly, wait 1s, 5 more instantly) instead of smooth rate limiting (1 request every 0.2s).
The fix is to release each token independently after its own 1-second window, OR use a token bucket that refills continuously:
class FixedRateLimiter:
def __init__(self, max_per_second: int):
self.interval = 1.0 / max_per_second # Time between requests
self._lock = asyncio.Lock()
self._last_request = 0.0
async def acquire(self):
async with self._lock:
now = time.monotonic()
wait = self._last_request + self.interval - now
if wait > 0:
await asyncio.sleep(wait)
self._last_request = time.monotonic()
This ensures requests are evenly spaced at the desired rate.
Level 3 -- Design Challenge
Design an AsyncResourceManager that:
- Manages a pool of heterogeneous resources (database connections, cache connections, API clients)
- Each resource type has its own concurrency limit
- Supports health checking with automatic removal of unhealthy resources
- Implements circuit breaking per resource type (if too many health checks fail, stop creating new ones)
- Provides metrics: current usage, wait times, failure counts
- Shuts down gracefully, waiting for in-use resources to be returned
Design Hints
from dataclasses import dataclass, field
@dataclass
class ResourceTypeConfig:
name: str
factory: Callable
max_concurrent: int
health_check: Callable | None = None
circuit_threshold: int = 5
class AsyncResourceManager:
def __init__(self):
self._types: dict[str, ResourceTypeConfig] = {}
self._semaphores: dict[str, asyncio.BoundedSemaphore] = {}
self._breakers: dict[str, CircuitBreaker] = {}
self._metrics: dict[str, dict] = {}
def register(self, config: ResourceTypeConfig):
self._types[config.name] = config
self._semaphores[config.name] = asyncio.BoundedSemaphore(
config.max_concurrent
)
self._breakers[config.name] = CircuitBreaker(
failure_threshold=config.circuit_threshold
)
self._metrics[config.name] = {
"acquired": 0, "released": 0,
"failures": 0, "total_wait_ms": 0
}
@asynccontextmanager
async def acquire(self, type_name: str):
config = self._types[type_name]
sem = self._semaphores[type_name]
breaker = self._breakers[type_name]
start = time.monotonic()
await sem.acquire()
wait_ms = (time.monotonic() - start) * 1000
self._metrics[type_name]["total_wait_ms"] += wait_ms
try:
resource = await breaker.call(config.factory)
self._metrics[type_name]["acquired"] += 1
try:
yield resource
finally:
self._metrics[type_name]["released"] += 1
except Exception:
self._metrics[type_name]["failures"] += 1
raise
finally:
sem.release()
What's Next
You now have a complete toolkit of async synchronization primitives. The final lesson in this module, Production Async Architecture, brings everything together -- error handling strategies, graceful shutdown, health checks, backpressure, testing, and structured logging -- into patterns you can deploy in production services.
