Async Synchronization Patterns

Before reading any explanation, predict the output of this program:

import asyncio

async def worker(semaphore, worker_id):
    print(f"Worker {worker_id} waiting")
    async with semaphore:
        print(f"Worker {worker_id} acquired")
        await asyncio.sleep(0.1)
    print(f"Worker {worker_id} released")

async def main():
    sem = asyncio.Semaphore(2)

    tasks = [worker(sem, i) for i in range(4)]
    await asyncio.gather(*tasks)

asyncio.run(main())

How many workers run concurrently? In what order do they acquire the semaphore?

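# Output:
# Worker 0 waiting
# Worker 0 acquired
# Worker 1 waiting
# Worker 1 acquired
# Worker 2 waiting
# Worker 3 waiting
# Worker 0 released
# Worker 1 released
# Worker 2 acquired
# Worker 3 acquired
# Worker 2 released
# Worker 3 released

The semaphore allows at most 2 concurrent acquisitions. Workers 0 and 1 acquire immediately: acquiring a free semaphore does not suspend the coroutine, so each prints "acquired" right after "waiting". Workers 2 and 3 find the semaphore exhausted and wait. When 0 and 1 release, 2 and 3 acquire; the order is FIFO among waiters. Because the two sleeps expire almost together, the exact interleaving of the "released" and "acquired" lines in the middle can vary slightly between runs. This fundamental pattern, bounded concurrency, is the basis for rate limiting, connection pooling, and resource protection in async systems.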
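
One way to check the bound empirically is to count how many workers are inside the semaphore at the same time. The following is a minimal sketch; `peak_concurrency` and its counters are illustrative names, not part of asyncio.

```python
import asyncio

async def peak_concurrency(limit: int, workers: int) -> int:
    """Run `workers` tasks behind a semaphore and report the highest overlap seen."""
    sem = asyncio.Semaphore(limit)
    active = 0  # workers currently inside the semaphore
    peak = 0    # highest value `active` ever reached

    async def worker() -> None:
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # hold the slot across a suspension point
            active -= 1

    await asyncio.gather(*(worker() for _ in range(workers)))
    return peak

observed = asyncio.run(peak_concurrency(limit=2, workers=4))
print(observed)
```

With a limit of 2 and 4 workers, the observed peak should never exceed the limit, whatever order the workers run in.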

What You Will Learn

  • asyncio.Lock for mutual exclusion in single-threaded async code
  • asyncio.Semaphore and BoundedSemaphore for bounded concurrency
  • asyncio.Event for one-to-many notification
  • asyncio.Condition for complex waiting conditions
  • asyncio.Barrier (Python 3.11+) for synchronizing groups of tasks
  • Rate limiting with semaphores and token buckets
  • The circuit breaker pattern for fault tolerance
  • Real-world patterns: API rate limiting, connection pool management

Prerequisites

  • Threading locks and semaphores from the Intermediate course
  • asyncio.TaskGroup and structured concurrency from Lesson 3
  • Async context managers from Lesson 2
  • Understanding of await and the event loop from Lessons 4-5

Part 1 -- Why Async Code Needs Synchronization

A common misconception: "asyncio is single-threaded, so there are no race conditions." This is wrong. While there are no data races (simultaneous memory access), there are logical race conditions whenever a coroutine suspends at an await point:

import asyncio

balance = 100

async def withdraw(amount):
    global balance
    current = balance
    # Other coroutines can run here!
    await asyncio.sleep(0)
    if current >= amount:
        balance = current - amount
        return True
    return False

async def main():
    # Both coroutines read balance=100 before either writes
    results = await asyncio.gather(
        withdraw(80),
        withdraw(80)
    )
    print(f"Results: {results}")  # [True, True] -- both succeed!
    print(f"Balance: {balance}")  # 20 -- 160 was withdrawn from 100; the second call should have failed

asyncio.run(main())

Both coroutines read balance = 100, then both check 100 >= 80, then both subtract. The await asyncio.sleep(0) is where the interleaving happens. A Lock prevents this.
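
The interleaving depends entirely on the suspension point. As a sketch of that claim, the hypothetical `withdraw_atomic` below performs the same check and update with no await in between, so the event loop never gets a chance to switch tasks in the middle of the operation.

```python
import asyncio

balance = 100

async def withdraw_atomic(amount: int) -> bool:
    """Check and update with no await in between: nothing can interleave."""
    global balance
    if balance >= amount:
        balance -= amount
        return True
    return False

async def main() -> list[bool]:
    return await asyncio.gather(withdraw_atomic(80), withdraw_atomic(80))

results = asyncio.run(main())
print(results, balance)
```

This only helps when the whole critical section is synchronous. As soon as real I/O sits between the read and the write, the section has to be protected explicitly.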

Part 2 -- asyncio.Lock

asyncio.Lock provides mutual exclusion. Only one coroutine can hold the lock at a time. Others wait (without blocking the event loop) until the lock is released.

import asyncio

balance = 100
lock = asyncio.Lock()

async def safe_withdraw(amount):
    global balance
    async with lock:
        current = balance
        await asyncio.sleep(0)  # Simulate async I/O
        if current >= amount:
            balance = current - amount
            return True
        return False

async def main():
    results = await asyncio.gather(
        safe_withdraw(80),
        safe_withdraw(80)
    )
    print(f"Results: {results}")  # [True, False]
    print(f"Balance: {balance}")  # 20

asyncio.run(main())

Lock is NOT Reentrant

Unlike threading.RLock, asyncio.Lock is not reentrant. A coroutine that already holds the lock will deadlock if it tries to acquire it again:

async def deadlock_example():
    lock = asyncio.Lock()

    async with lock:
        print("Acquired once")
        async with lock:  # DEADLOCK -- waits forever
            print("Never reached")

Danger: If you need reentrant locking in async code, you likely have a design problem. Refactor your code so that the locked section calls functions that do not try to re-acquire the same lock. If you truly need reentrancy, track ownership manually with asyncio.current_task().
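
For the rare case where reentrancy is unavoidable, ownership tracking with asyncio.current_task() could look roughly like this. `ReentrantLock` is a hypothetical helper written for illustration, not an asyncio class, and it assumes the lock is always released by the task that acquired it.

```python
import asyncio

class ReentrantLock:
    """Sketch of a task-reentrant lock built on asyncio.Lock (hypothetical helper)."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._owner = None  # task currently holding the lock, if any
        self._depth = 0     # how many times the owner has entered

    async def __aenter__(self) -> "ReentrantLock":
        me = asyncio.current_task()
        if self._owner is me:
            self._depth += 1  # re-entry by the owner: no waiting
            return self
        await self._lock.acquire()
        self._owner = me
        self._depth = 1
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self._depth -= 1
        if self._depth == 0:  # outermost exit releases the real lock
            self._owner = None
            self._lock.release()

async def main() -> list[str]:
    lock = ReentrantLock()
    log: list[str] = []
    async with lock:
        log.append("outer")
        async with lock:  # would deadlock with a plain asyncio.Lock
            log.append("inner")
    return log

log = asyncio.run(main())
print(log)
```

Ownership here is per task, so a child task spawned inside the locked section does not inherit it and will wait like any other task.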

Manual Lock Usage

lock = asyncio.Lock()

# Preferred: async with
async with lock:
    await do_work()

# Manual: acquire/release
await lock.acquire()
try:
    await do_work()
finally:
    lock.release()

# Non-blocking check
if lock.locked():
    print("Lock is held by another coroutine")
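
lock.locked() only reports the current state. To give up after a deadline instead of waiting indefinitely, the acquire call can be wrapped in asyncio.wait_for. A minimal sketch, where `try_acquire` is a hypothetical helper name:

```python
import asyncio

async def try_acquire(lock: asyncio.Lock, timeout: float) -> bool:
    """Return True if the lock was acquired within `timeout` seconds."""
    try:
        await asyncio.wait_for(lock.acquire(), timeout)
        return True
    except asyncio.TimeoutError:
        return False

async def main() -> tuple[bool, bool]:
    lock = asyncio.Lock()
    first = await try_acquire(lock, 0.05)   # lock is free
    second = await try_acquire(lock, 0.05)  # lock is already held, so this times out
    lock.release()                          # release the one successful acquisition
    return first, second

outcome = asyncio.run(main())
print(outcome)
```

A caller that gets True is responsible for calling lock.release() afterwards, exactly as with a manual acquire.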

Part 3 -- asyncio.Semaphore

A semaphore allows up to N concurrent acquisitions. This is the fundamental primitive for bounded concurrency.

Basic Usage

import asyncio
import time

async def limited_fetch(sem: asyncio.Semaphore, url: str) -> dict:
    async with sem:
        print(f"Fetching {url}")
        await asyncio.sleep(0.5)  # Simulate HTTP request
        return {"url": url, "status": 200}

async def main():
    sem = asyncio.Semaphore(3)  # Max 3 concurrent fetches
    urls = [f"https://api.example.com/page/{i}" for i in range(10)]

    start = time.perf_counter()
    tasks = [limited_fetch(sem, url) for url in urls]
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start

    print(f"Fetched {len(results)} URLs in {elapsed:.1f}s")
    # 10 URLs with 3 concurrent, 0.5s each = 4 batches = ~2.0s total
    # Without semaphore: ~0.5s total

asyncio.run(main())

BoundedSemaphore

BoundedSemaphore raises ValueError if you release more times than you acquired. This catches bugs where release is called without a matching acquire:

sem = asyncio.BoundedSemaphore(3)

# This is fine
await sem.acquire()
sem.release()

# This raises ValueError
sem.release() # ValueError: BoundedSemaphore released too many times

Tip: Always prefer BoundedSemaphore over Semaphore in production code. The extra check catches accidental double-releases that would silently increase the concurrency limit with a regular Semaphore.
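
The difference is easy to observe side by side. In this sketch, `over_release` is a hypothetical helper that acquires once and releases twice on whichever semaphore it is given.

```python
import asyncio

async def over_release(sem: asyncio.Semaphore) -> str:
    """Acquire once, release twice, and report how the semaphore reacts."""
    await sem.acquire()
    sem.release()
    try:
        sem.release()  # one release too many
    except ValueError:
        return "ValueError"
    return "accepted silently"

async def main() -> tuple[str, str]:
    plain = await over_release(asyncio.Semaphore(1))
    bounded = await over_release(asyncio.BoundedSemaphore(1))
    return plain, bounded

outcomes = asyncio.run(main())
print(outcomes)
```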

Weighted Semaphore Pattern

Sometimes different operations need different amounts of "capacity":

import asyncio

class WeightedSemaphore:
    """A semaphore where different acquisitions consume different weights."""

    def __init__(self, capacity: int):
        self._capacity = capacity
        self._used = 0
        self._condition = asyncio.Condition()

    async def acquire(self, weight: int = 1):
        if weight > self._capacity:
            # Such a request could never be satisfied and would wait forever
            raise ValueError("weight exceeds capacity")
        async with self._condition:
            while self._used + weight > self._capacity:
                await self._condition.wait()
            self._used += weight

    async def release(self, weight: int = 1):
        async with self._condition:
            self._used -= weight
            self._condition.notify_all()


async def main():
    sem = WeightedSemaphore(capacity=10)

    async def light_task():
        await sem.acquire(weight=1)
        try:
            await asyncio.sleep(0.1)
        finally:
            await sem.release(weight=1)

    async def heavy_task():
        await sem.acquire(weight=5)
        try:
            await asyncio.sleep(0.1)
        finally:
            await sem.release(weight=5)

    # Can run 10 light tasks or 2 heavy tasks concurrently
    async with asyncio.TaskGroup() as tg:
        for _ in range(10):
            tg.create_task(light_task())
        for _ in range(3):
            tg.create_task(heavy_task())

asyncio.run(main())

Part 4 -- asyncio.Event

An Event is a simple flag that coroutines can wait for. One coroutine sets the event, and all waiting coroutines wake up.

import asyncio

async def waiter(event: asyncio.Event, name: str):
    print(f"{name}: waiting for event")
    await event.wait()
    print(f"{name}: event received!")

async def setter(event: asyncio.Event):
    print("Setter: doing setup work")
    await asyncio.sleep(1)
    print("Setter: firing event")
    event.set()

async def main():
    event = asyncio.Event()

    async with asyncio.TaskGroup() as tg:
        tg.create_task(waiter(event, "A"))
        tg.create_task(waiter(event, "B"))
        tg.create_task(waiter(event, "C"))
        tg.create_task(setter(event))

asyncio.run(main())
# Output:
# A: waiting for event
# B: waiting for event
# C: waiting for event
# Setter: doing setup work
# Setter: firing event
# A: event received!
# B: event received!
# C: event received!

Event as a Shutdown Signal

import asyncio

class GracefulService:
    def __init__(self):
        self._shutdown = asyncio.Event()

    async def run(self):
        async with asyncio.TaskGroup() as tg:
            tg.create_task(self._worker("processor"))
            tg.create_task(self._worker("indexer"))
            tg.create_task(self._shutdown_watcher())

    async def _worker(self, name):
        while not self._shutdown.is_set():
            print(f"{name}: processing")
            try:
                await asyncio.wait_for(
                    self._shutdown.wait(),
                    timeout=2.0  # Process every 2 seconds
                )
            except asyncio.TimeoutError:
                pass  # Normal -- continue processing
        print(f"{name}: shutting down")

    async def _shutdown_watcher(self):
        await asyncio.sleep(5)  # Run for 5 seconds
        print("Initiating shutdown")
        self._shutdown.set()

Event.clear() for Repeatable Signals

async def producer(event, data_queue):
    for i in range(5):
        await asyncio.sleep(0.5)
        await data_queue.put(i)
        event.set()    # Signal that data is available
        event.clear()  # Reset for next signal

async def consumer(event, data_queue):
    for _ in range(5):
        await event.wait()
        item = await data_queue.get()
        print(f"Consumed: {item}")

Note: Calling event.clear() immediately after event.set() still wakes every coroutine that is already suspended in event.wait(), but a consumer that is busy elsewhere at that moment finds the flag cleared when it next calls wait() and misses the signal. For reliable producer-consumer patterns, use asyncio.Queue instead, which handles the synchronization internally.
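
The missed-signal case can be reproduced with two waiters, one already waiting when the pulse fires and one that starts waiting afterwards. This is a sketch with arbitrary delays chosen for the demonstration; `waiter` and its timeout are illustrative.

```python
import asyncio

async def main() -> tuple[bool, bool]:
    event = asyncio.Event()

    async def waiter(delay: float) -> bool:
        await asyncio.sleep(delay)  # time spent busy before waiting
        try:
            await asyncio.wait_for(event.wait(), timeout=0.2)
            return True   # saw the signal
        except asyncio.TimeoutError:
            return False  # signal was missed

    early = asyncio.create_task(waiter(0.0))  # already waiting when the pulse fires
    late = asyncio.create_task(waiter(0.1))   # starts waiting after the pulse

    await asyncio.sleep(0.05)
    event.set()
    event.clear()  # pulse: the flag is only set momentarily

    return await early, await late

seen = asyncio.run(main())
print(seen)
```

The early waiter is woken by set() even though the flag is cleared right away, while the late waiter never observes the flag as set and times out.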

Part 5 -- asyncio.Condition

A Condition combines a lock with notification, allowing coroutines to wait until a specific condition is true:

import asyncio

class AsyncBoundedBuffer:
    """Coroutine-safe bounded buffer using Condition (not thread-safe)."""

    def __init__(self, capacity: int):
        self._buffer = []
        self._capacity = capacity
        self._condition = asyncio.Condition()

    async def put(self, item):
        async with self._condition:
            while len(self._buffer) >= self._capacity:
                await self._condition.wait()  # Wait until space available
            self._buffer.append(item)
            self._condition.notify()  # Wake one waiting consumer

    async def get(self):
        async with self._condition:
            while not self._buffer:
                await self._condition.wait()  # Wait until item available
            item = self._buffer.pop(0)
            self._condition.notify()  # Wake one waiting producer
            return item


async def producer(buffer, producer_id):
    for i in range(5):
        item = f"P{producer_id}-{i}"
        await buffer.put(item)
        print(f"Produced: {item}")

async def consumer(buffer, consumer_id):
    for _ in range(5):
        item = await buffer.get()
        print(f"Consumer {consumer_id} got: {item}")
        await asyncio.sleep(0.1)  # Simulate processing

async def main():
    buffer = AsyncBoundedBuffer(capacity=3)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(buffer, 1))
        tg.create_task(producer(buffer, 2))
        tg.create_task(consumer(buffer, 1))
        tg.create_task(consumer(buffer, 2))

asyncio.run(main())

Condition.wait_for

A convenience method that combines wait() with a predicate:

async def get_when_ready(condition, data, min_items):
    async with condition:
        # wait_for automatically loops on the predicate
        await condition.wait_for(lambda: len(data) >= min_items)
        return data[:min_items]
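
To see wait_for in action, the function needs something that changes the data and notifies the condition. The sketch below adds a hypothetical `feed` coroutine for that purpose; the item count and delays are arbitrary.

```python
import asyncio

async def get_when_ready(condition: asyncio.Condition, data: list[int], min_items: int) -> list[int]:
    async with condition:
        await condition.wait_for(lambda: len(data) >= min_items)
        return data[:min_items]

async def feed(condition: asyncio.Condition, data: list[int], count: int) -> None:
    """Append one item at a time and notify waiters after each change."""
    for i in range(count):
        await asyncio.sleep(0.01)
        async with condition:
            data.append(i)
            condition.notify_all()  # let waiters re-check their predicate

async def main() -> list[int]:
    condition = asyncio.Condition()
    data: list[int] = []
    reader = asyncio.create_task(get_when_ready(condition, data, 3))
    await feed(condition, data, 5)
    return await reader

batch = asyncio.run(main())
print(batch)
```

The reader is woken after every append but only returns once the predicate holds, which is the loop that wait_for hides.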

notify vs notify_all

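  • notify(n=1): Wake at most n waiting coroutines. Use when any single waiter can handle the change, such as one new item for one consumer. If waiters with different predicates share the Condition, the woken coroutine may find its predicate still false and go back to waiting.
  • notify_all(): Wake ALL waiting coroutines. Use when every waiter needs to re-evaluate its condition (such as shutdown) or when waiters wait on different predicates.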
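
The difference shows up directly in how many waiters resume after one call. A small sketch, where `count_woken` is a hypothetical helper and the short sleeps only give the tasks time to run:

```python
import asyncio

async def count_woken(use_notify_all: bool, waiters: int = 3) -> int:
    """Return how many waiters wake up after a single notification call."""
    condition = asyncio.Condition()
    woken = 0

    async def waiter() -> None:
        nonlocal woken
        async with condition:
            await condition.wait()
            woken += 1

    tasks = [asyncio.create_task(waiter()) for _ in range(waiters)]
    await asyncio.sleep(0.01)  # let every waiter reach wait()

    async with condition:
        if use_notify_all:
            condition.notify_all()
        else:
            condition.notify()  # n defaults to 1
    await asyncio.sleep(0.01)  # give woken waiters time to run

    result = woken
    for task in tasks:  # clean up waiters that were never notified
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return result

woken_by_notify = asyncio.run(count_woken(use_notify_all=False))
woken_by_notify_all = asyncio.run(count_woken(use_notify_all=True))
print(woken_by_notify, woken_by_notify_all)
```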

Part 6 -- asyncio.Barrier (Python 3.11+)

A Barrier synchronizes a fixed number of tasks at a rendezvous point. All tasks must reach the barrier before any can proceed.

import asyncio

async def phase_worker(barrier: asyncio.Barrier, worker_id: int):
    # Phase 1: Initialization
    print(f"Worker {worker_id}: initializing")
    await asyncio.sleep(worker_id * 0.1)  # Different init times

    # Wait for all workers to finish initialization
    await barrier.wait()
    print(f"Worker {worker_id}: all initialized, starting phase 2")

    # Phase 2: Processing
    await asyncio.sleep(0.1)
    await barrier.wait()
    print(f"Worker {worker_id}: all done with phase 2")

async def main():
    barrier = asyncio.Barrier(3)

    async with asyncio.TaskGroup() as tg:
        for i in range(3):
            tg.create_task(phase_worker(barrier, i))

asyncio.run(main())
# Output (the order of workers within a phase is not guaranteed):
# Worker 0: initializing
# Worker 1: initializing
# Worker 2: initializing
# Worker 0: all initialized, starting phase 2 (printed only after all 3 have initialized)
# Worker 1: all initialized, starting phase 2
# Worker 2: all initialized, starting phase 2
# Worker 0: all done with phase 2
# Worker 1: all done with phase 2
# Worker 2: all done with phase 2

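Barrier with Action

Unlike threading.Barrier, asyncio.Barrier takes no action argument. To run something once per cycle, use the index that wait() returns: each party receives a unique value from 0 to parties - 1, so exactly one task can be elected to run the action.

async def summarize():
    """Called by a single task once all parties have reached the barrier."""
    print("--- All workers synchronized ---")

async def synced_worker(barrier: asyncio.Barrier):
    index = await barrier.wait()
    if index == 0:  # Exactly one party receives index 0
        await summarize()

barrier = asyncio.Barrier(3)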

Part 7 -- Rate Limiting

Token Bucket Rate Limiter

The token bucket algorithm is the standard approach for API rate limiting:

import asyncio
import time

class TokenBucket:
    """Token bucket rate limiter for async code."""

    def __init__(self, rate: float, burst: int):
        """
        Args:
            rate: Tokens per second (sustained rate)
            burst: Maximum tokens (burst capacity)
        """
        self.rate = rate
        self.burst = burst
        self._tokens = float(burst)
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1):
        """Wait until tokens are available."""
        while True:
            async with self._lock:
                self._refill()
                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return

            # Calculate wait time for tokens to become available
            wait_time = (tokens - self._tokens) / self.rate
            await asyncio.sleep(max(0.01, wait_time))

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self._last_refill
        self._tokens = min(self.burst, self._tokens + elapsed * self.rate)
        self._last_refill = now


async def rate_limited_fetch(limiter: TokenBucket, url: str) -> dict:
    await limiter.acquire()
    # Perform the actual request
    await asyncio.sleep(0.05)  # Simulate HTTP request
    return {"url": url, "status": 200}


async def main():
    # 10 requests per second, burst of 5
    limiter = TokenBucket(rate=10, burst=5)

    start = time.perf_counter()
    tasks = [rate_limited_fetch(limiter, f"url/{i}") for i in range(20)]
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start

    print(f"Completed {len(results)} requests in {elapsed:.1f}s")
    # First 5 requests burst immediately, remaining 15 at 10/s = ~1.5s more
    # Total: ~1.5-2.0s

asyncio.run(main())
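
The timing in the final comment follows from simple arithmetic: the first `burst` requests are served from the full bucket, and every remaining request needs one token refilled at `rate` tokens per second. As a rough sketch, with `estimated_duration` as a hypothetical helper that assumes the bucket starts full and ignores request latency and scheduling overhead:

```python
def estimated_duration(requests: int, rate: float, burst: int) -> float:
    """Approximate seconds needed to admit `requests` through a token bucket."""
    waiting = max(0, requests - burst)  # requests that must wait for a refill
    return waiting / rate

estimate = estimated_duration(requests=20, rate=10, burst=5)
print(estimate)  # 1.5
```

The measured total is somewhat higher because each simulated request also takes 0.05s and waiting tasks wake up at least 0.01s apart.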

Sliding Window Rate Limiter

import asyncio
import time
from collections import deque

class SlidingWindowLimiter:
    """Sliding window rate limiter."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window = window_seconds
        self._timestamps: deque[float] = deque()
        self._lock = asyncio.Lock()

    async def acquire(self):
        while True:
            async with self._lock:
                now = time.monotonic()
                # Remove timestamps outside the window
                while self._timestamps and self._timestamps[0] < now - self.window:
                    self._timestamps.popleft()

                if len(self._timestamps) < self.max_requests:
                    self._timestamps.append(now)
                    return

                # Calculate when the oldest request will leave the window
                wait_time = self._timestamps[0] + self.window - now

            # Sleep outside the lock so other callers are not blocked
            await asyncio.sleep(max(0.01, wait_time))
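
The pruning logic is easier to follow with explicit timestamps instead of the real clock. The sketch below isolates the window check in a hypothetical pure function, `admit`, and feeds it hand-picked times with a limit of 2 requests per 1.0s window.

```python
from collections import deque

def admit(timestamps: deque, now: float, max_requests: int, window: float) -> bool:
    """Pure version of the window check: prune old entries, then admit if there is room."""
    while timestamps and timestamps[0] < now - window:
        timestamps.popleft()  # this request has left the window
    if len(timestamps) < max_requests:
        timestamps.append(now)
        return True
    return False

history: deque = deque()
arrival_times = (0.0, 0.1, 0.5, 1.05, 1.2)
decisions = [admit(history, t, max_requests=2, window=1.0) for t in arrival_times]
print(decisions)
```

The request at 0.5 is rejected because two requests already sit inside the window; by 1.05 the request from 0.0 has aged out, so capacity is available again.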

Part 8 -- Circuit Breaker Pattern

The circuit breaker prevents cascading failures by stopping requests to a failing service:

import asyncio
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing -- reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered


class CircuitBreakerOpen(Exception):
    """Raised when the breaker rejects a call without running it."""


class CircuitBreaker:
    """Async circuit breaker for external service calls."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_max_calls: int = 1,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls

        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._last_failure_time = 0.0
        self._half_open_calls = 0
        self._lock = asyncio.Lock()

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._last_failure_time > self.recovery_timeout:
                return CircuitState.HALF_OPEN
        return self._state

    async def call(self, func, *args, **kwargs):
        """Execute a function through the circuit breaker."""
        async with self._lock:
            current_state = self.state

            if current_state == CircuitState.OPEN:
                raise CircuitBreakerOpen(
                    f"Circuit is open. Retry after {self.recovery_timeout}s"
                )

            if current_state == CircuitState.HALF_OPEN:
                # Persist the transition; otherwise _on_success and _on_failure
                # still see OPEN and the circuit can never close again
                self._state = CircuitState.HALF_OPEN
                if self._half_open_calls >= self.half_open_max_calls:
                    raise CircuitBreakerOpen("Circuit half-open: max test calls reached")
                self._half_open_calls += 1

        try:
            result = await func(*args, **kwargs)
            await self._on_success()
            return result
        except Exception:
            await self._on_failure()
            raise

    async def _on_success(self):
        async with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                # Service recovered -- close the circuit
                print("Circuit CLOSED (service recovered)")
                self._state = CircuitState.CLOSED
                self._failure_count = 0
                self._half_open_calls = 0

    async def _on_failure(self):
        async with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.monotonic()

            if self._state == CircuitState.HALF_OPEN:
                # Still failing -- reopen
                print("Circuit OPEN (still failing)")
                self._state = CircuitState.OPEN
                self._half_open_calls = 0
            elif self._failure_count >= self.failure_threshold:
                print(f"Circuit OPEN (threshold {self.failure_threshold} reached)")
                self._state = CircuitState.OPEN

Usage:

breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=10.0)

async def fetch_from_service(url: str) -> dict:
    # Simulate a flaky service
    if time.monotonic() % 2 < 1:
        raise ConnectionError("Service unavailable")
    return {"status": "ok"}

async def main():
    for i in range(10):
        try:
            result = await breaker.call(fetch_from_service, f"url/{i}")
            print(f"Request {i}: {result}")
        except CircuitBreakerOpen as e:
            print(f"Request {i}: BLOCKED - {e}")
        except ConnectionError as e:
            print(f"Request {i}: FAILED - {e}")
        await asyncio.sleep(0.5)

asyncio.run(main())

Part 9 -- Real-World Connection Pool Management

Combining multiple synchronization primitives for a production-grade connection pool:

import asyncio
from contextlib import asynccontextmanager

class ManagedConnectionPool:
    """Production-ready connection pool with health checking."""

    def __init__(self, factory, max_size: int = 10,
                 max_idle_time: float = 300.0):
        self._factory = factory
        self._max_size = max_size
        # Stored for idle eviction; not enforced in this example
        self._max_idle_time = max_idle_time

        self._available: asyncio.Queue = asyncio.Queue()
        self._in_use: set = set()
        self._semaphore = asyncio.BoundedSemaphore(max_size)
        self._lock = asyncio.Lock()
        self._closed = False

    @asynccontextmanager
    async def connection(self):
        """Acquire a connection from the pool."""
        if self._closed:
            raise RuntimeError("Pool is closed")

        await self._semaphore.acquire()
        conn = None
        try:
            conn = await self._get_or_create()
            self._in_use.add(id(conn))
            yield conn
        finally:
            if conn is not None:
                self._in_use.discard(id(conn))
                if not self._closed:
                    await self._available.put(conn)
                else:
                    # Pool was closed while this connection was in use
                    await self._destroy(conn)
            self._semaphore.release()

    async def _get_or_create(self):
        # Try to get an existing connection
        while not self._available.empty():
            conn = self._available.get_nowait()
            if await self._is_healthy(conn):
                return conn
            await self._destroy(conn)

        # Create a new one
        return await self._factory()

    async def _is_healthy(self, conn):
        try:
            await conn.ping()
            return True
        except Exception:
            return False

    async def _destroy(self, conn):
        try:
            await conn.close()
        except Exception:
            pass

    async def close(self):
        self._closed = True
        while not self._available.empty():
            conn = self._available.get_nowait()
            await self._destroy(conn)

    @property
    def stats(self):
        return {
            "available": self._available.qsize(),
            "in_use": len(self._in_use),
            "max_size": self._max_size,
        }

Key Takeaways

  • Async code can have logical race conditions at any await point that sits between reading shared state and acting on it, even though it is single-threaded. Synchronization primitives prevent these interleaving bugs.
  • asyncio.Lock provides mutual exclusion. It is NOT reentrant -- attempting to re-acquire a held lock deadlocks.
  • asyncio.Semaphore allows N concurrent acquisitions. Use BoundedSemaphore in production to catch double-release bugs.
  • asyncio.Event provides one-to-many notification. set() wakes all waiters; clear() resets the flag. Prefer asyncio.Queue for producer-consumer patterns.
  • asyncio.Condition combines a lock with wait/notify for complex state-dependent waiting.
  • asyncio.Barrier (Python 3.11+) synchronizes N tasks at a rendezvous point. All must arrive before any proceed.
  • Token bucket and sliding window are the standard rate limiting algorithms. Both compose naturally with async/await.
  • The circuit breaker pattern uses state tracking and timers to prevent cascading failures when external services are down.
  • Combine primitives for production patterns: semaphore for concurrency limiting, lock for state protection, event for shutdown signals, queue for work distribution.

Graded Practice Challenges

Level 1 -- Predict the Output

Question 1: What does this print?

import asyncio

async def main():
    lock = asyncio.Lock()

    async def worker(name):
        print(f"{name} waiting")
        async with lock:
            print(f"{name} acquired")
            await asyncio.sleep(0.1)
        print(f"{name} released")

    async with asyncio.TaskGroup() as tg:
        tg.create_task(worker("A"))
        tg.create_task(worker("B"))

asyncio.run(main())
Answer
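A waiting
A acquired
B waiting
A released
B acquired
B released

Task A starts first and acquires the free lock without suspending, so "A acquired" prints before B has run at all. B then starts, prints "B waiting", and blocks on the lock. When A releases, B acquires. The lock ensures sequential access despite concurrent tasks.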

Question 2: What does this print?

import asyncio

async def main():
    event = asyncio.Event()
    event.set()

    # Event is already set before wait() is called
    await event.wait()
    print("first wait done")

    event.clear()

    # Wait with a timeout since event is cleared
    try:
        await asyncio.wait_for(event.wait(), timeout=0.1)
    except asyncio.TimeoutError:
        print("timed out")

    print(f"is_set: {event.is_set()}")

asyncio.run(main())
Answer
first wait done
timed out
is_set: False

The first wait() returns immediately because the event is already set. After clear(), the second wait() blocks until timeout because nobody sets it. is_set() returns False.

Question 3: What happens with this semaphore code?

import asyncio

async def main():
    sem = asyncio.Semaphore(2)

    await sem.acquire()
    await sem.acquire()
    # sem internal counter is now 0

    sem.release()
    sem.release()
    sem.release()  # Release 3 times, acquired only 2

    print(f"Can acquire: {not sem.locked()}")

asyncio.run(main())
Answer
Can acquire: True

With a regular Semaphore, the extra release() increases the counter beyond its initial value. The semaphore now allows 3 concurrent acquisitions instead of 2. This is a bug. With BoundedSemaphore, the third release() would raise ValueError.

Level 2 -- Debug Challenge

This rate limiter is supposed to allow a steady 5 requests per second, but it lets requests through in bursts of 5 instead of spacing them evenly. Find the bug.

import asyncio
import time

class BrokenRateLimiter:
    def __init__(self, max_per_second: int):
        self.max_per_second = max_per_second
        self.semaphore = asyncio.Semaphore(max_per_second)

    async def acquire(self):
        await self.semaphore.acquire()
        # Release after 1 second to allow the next request
        asyncio.get_running_loop().call_later(1.0, self.semaphore.release)


async def main():
    limiter = BrokenRateLimiter(5)

    start = time.perf_counter()
    for i in range(20):
        await limiter.acquire()
        elapsed = time.perf_counter() - start
        print(f"Request {i} at {elapsed:.2f}s")

asyncio.run(main())
Answer

The semaphore starts with count 5, which correctly allows a burst of 5. Each slot is released 1 second after it was acquired, but because all 5 were acquired at the same instant, all 5 are released at the same instant too, allowing another burst of 5. This creates bursty behavior (5 requests instantly, wait 1s, 5 more instantly) instead of smooth rate limiting (1 request every 0.2s).

The fix is to space requests evenly, either by enforcing a minimum interval between consecutive requests or by using a token bucket that refills continuously:

class FixedRateLimiter:
    def __init__(self, max_per_second: int):
        self.interval = 1.0 / max_per_second  # Time between requests
        self._lock = asyncio.Lock()
        self._last_request = 0.0

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            wait = self._last_request + self.interval - now
            if wait > 0:
                await asyncio.sleep(wait)
            self._last_request = time.monotonic()

This ensures requests are evenly spaced at the desired rate.

Level 3 -- Design Challenge

Design an AsyncResourceManager that:

  1. Manages a pool of heterogeneous resources (database connections, cache connections, API clients)
  2. Each resource type has its own concurrency limit
  3. Supports health checking with automatic removal of unhealthy resources
  4. Implements circuit breaking per resource type (if too many health checks fail, stop creating new ones)
  5. Provides metrics: current usage, wait times, failure counts
  6. Shuts down gracefully, waiting for in-use resources to be returned
Design Hints
import asyncio
import time
from collections.abc import Callable
from contextlib import asynccontextmanager
from dataclasses import dataclass

# CircuitBreaker is the class defined in Part 8

@dataclass
class ResourceTypeConfig:
    name: str
    factory: Callable
    max_concurrent: int
    health_check: Callable | None = None
    circuit_threshold: int = 5

class AsyncResourceManager:
    def __init__(self):
        self._types: dict[str, ResourceTypeConfig] = {}
        self._semaphores: dict[str, asyncio.BoundedSemaphore] = {}
        self._breakers: dict[str, CircuitBreaker] = {}
        self._metrics: dict[str, dict] = {}

    def register(self, config: ResourceTypeConfig):
        self._types[config.name] = config
        self._semaphores[config.name] = asyncio.BoundedSemaphore(
            config.max_concurrent
        )
        self._breakers[config.name] = CircuitBreaker(
            failure_threshold=config.circuit_threshold
        )
        self._metrics[config.name] = {
            "acquired": 0, "released": 0,
            "failures": 0, "total_wait_ms": 0
        }

    @asynccontextmanager
    async def acquire(self, type_name: str):
        config = self._types[type_name]
        sem = self._semaphores[type_name]
        breaker = self._breakers[type_name]

        start = time.monotonic()
        await sem.acquire()
        wait_ms = (time.monotonic() - start) * 1000
        self._metrics[type_name]["total_wait_ms"] += wait_ms

        try:
            resource = await breaker.call(config.factory)
            self._metrics[type_name]["acquired"] += 1
            try:
                yield resource
            finally:
                self._metrics[type_name]["released"] += 1
        except Exception:
            self._metrics[type_name]["failures"] += 1
            raise
        finally:
            sem.release()

What's Next

You now have a complete toolkit of async synchronization primitives. The final lesson in this module, Production Async Architecture, brings everything together -- error handling strategies, graceful shutdown, health checks, backpressure, testing, and structured logging -- into patterns you can deploy in production services.

© 2026 EngineersOfAI. All rights reserved.