Python Async Synchronization Patterns: Practice Problems & Exercises
Practice: Async Synchronization Patterns
Easy
Use asyncio.Lock to protect a shared counter from concurrent modification by multiple tasks.
import asyncio


async def increment(counter, lock, n):
    for _ in range(n):
        async with lock:
            counter[0] += 1


async def main():
    counter = [0]
    lock = asyncio.Lock()
    await asyncio.gather(
        increment(counter, lock, 500),
        increment(counter, lock, 500),
    )
    print(f"Final counter: {counter[0]}")


asyncio.run(main())
Solution
import asyncio


async def increment(counter, lock, n):
    for _ in range(n):
        async with lock:
            counter[0] += 1


async def main():
    counter = [0]
    lock = asyncio.Lock()
    await asyncio.gather(
        increment(counter, lock, 500),
        increment(counter, lock, 500),
    )
    print(f"Final counter: {counter[0]}")


asyncio.run(main())
Lock mechanics:
- asyncio.Lock is a mutual exclusion primitive for async code. Only one coroutine holds it at a time.
- async with lock: acquires on entry and releases on exit, even if an exception occurs.
- In this example, the counter update is safe because only one coroutine executes the counter[0] += 1 line at a time.
- Asyncio is single-threaded, so technically the counter would be correct without a lock in this specific case (no await inside the critical section). But as soon as the critical section includes an await, interleaving can corrupt state; the lock prevents that.
- Always add a lock when a critical section contains an await.
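The point about an await inside the critical section can be seen in a small sketch. The function names and the use of asyncio.sleep(0) as the suspension point are assumptions made for illustration; they are not part of the exercise.

```python
import asyncio


async def unsafe_increment(counter, n):
    for _ in range(n):
        current = counter[0]        # read
        await asyncio.sleep(0)      # yield to the event loop mid-update
        counter[0] = current + 1    # write back a possibly stale value


async def safe_increment(counter, lock, n):
    for _ in range(n):
        async with lock:            # the read-await-write sequence is now exclusive
            current = counter[0]
            await asyncio.sleep(0)
            counter[0] = current + 1


async def run_demo(n=100):
    unsafe = [0]
    await asyncio.gather(unsafe_increment(unsafe, n), unsafe_increment(unsafe, n))
    safe = [0]
    lock = asyncio.Lock()
    await asyncio.gather(safe_increment(safe, lock, n), safe_increment(safe, lock, n))
    return unsafe[0], safe[0]


unsafe_total, safe_total = asyncio.run(run_demo())
print(f"without lock: {unsafe_total}, with lock: {safe_total}")
```

Without the lock, both tasks read the same value before either writes it back, so updates are lost and the total falls short of 200. With the lock the total is 200.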
Expected Output
Final counter: 1000
Hints
Hint 1: Without a lock, tasks that read a shared variable, await, and then write it back can interleave, causing lost updates.
Hint 2: Use async with lock: to protect the critical section.
Use a Semaphore(3) to limit concurrent "API calls" to at most 3 at a time.
import asyncio


async def api_call(task_id, semaphore, active):
    async with semaphore:
        active.add(task_id)
        assert len(active) <= 3, f"Too many concurrent tasks: {len(active)}"
        await asyncio.sleep(0.01)
        active.discard(task_id)


async def main():
    semaphore = asyncio.Semaphore(3)
    active = set()
    await asyncio.gather(*[api_call(i, semaphore, active) for i in range(10)])
    print("At most 3 tasks running concurrently")


asyncio.run(main())
Solution
import asyncio


async def api_call(task_id, semaphore, active):
    async with semaphore:
        active.add(task_id)
        assert len(active) <= 3, f"Too many concurrent tasks: {len(active)}"
        await asyncio.sleep(0.01)
        active.discard(task_id)


async def main():
    semaphore = asyncio.Semaphore(3)
    active = set()
    await asyncio.gather(*[api_call(i, semaphore, active) for i in range(10)])
    print("At most 3 tasks running concurrently")


asyncio.run(main())
Semaphore use cases:
- Rate limiting API calls: limit to N concurrent requests to avoid overloading a downstream service.
- Connection pool: limit to N active database connections.
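- Semaphore(1) behaves much like a Lock: at most one coroutine is inside the guarded section at a time.
- Neither asyncio.Lock nor asyncio.Semaphore tracks ownership, so any coroutine can call release(). They differ on over-release: releasing an unlocked Lock raises RuntimeError, while a plain Semaphore silently increments its counter past the initial value.
- asyncio.BoundedSemaphore(n) raises ValueError if release() would push the counter above its initial value, which catches accidental over-release bugs.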
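The over-release behavior can be checked directly. This is a minimal sketch; the function name over_release_demo and the variable names are illustrative only.

```python
import asyncio


async def over_release_demo():
    plain = asyncio.Semaphore(1)
    plain.release()             # no acquire first: the counter silently grows to 2
    await plain.acquire()
    await plain.acquire()       # succeeds immediately, so the limit of 1 is broken
    plain_allowed_two = True

    bounded = asyncio.BoundedSemaphore(1)
    try:
        bounded.release()       # would exceed the initial value of 1
        bounded_raised = False
    except ValueError:
        bounded_raised = True
    return plain_allowed_two, bounded_raised


plain_allowed_two, bounded_raised = asyncio.run(over_release_demo())
print(plain_allowed_two, bounded_raised)
```

When the semaphore guards a fixed number of real resources, BoundedSemaphore is usually the safer default because a stray release() fails loudly instead of quietly raising the limit.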
Expected Output
At most 3 tasks running concurrently
Hints
Hint 1: asyncio.Semaphore(n) allows at most n coroutines to proceed past the acquire point simultaneously.
Hint 2: Use async with semaphore: to automatically acquire and release.
Use asyncio.Event to signal a worker to start processing after an initialization step completes.
import asyncio


async def worker(ready_event):
    print("Worker waiting for signal")
    await ready_event.wait()
    print("Worker received signal and proceeding")


async def initializer(ready_event):
    await asyncio.sleep(0.02)  # simulate startup
    print("Signal sent")
    ready_event.set()


async def main():
    ready = asyncio.Event()
    await asyncio.gather(
        worker(ready),
        initializer(ready),
    )


asyncio.run(main())
Solution
import asyncio


async def worker(ready_event):
    print("Worker waiting for signal")
    await ready_event.wait()
    print("Worker received signal and proceeding")


async def initializer(ready_event):
    await asyncio.sleep(0.02)
    print("Signal sent")
    ready_event.set()


async def main():
    ready = asyncio.Event()
    await asyncio.gather(
        worker(ready),
        initializer(ready),
    )


asyncio.run(main())
asyncio.Event uses:
- One-shot broadcast: one set() wakes ALL waiters (unlike releasing a Lock, which wakes one).
- Startup sequencing: wait for a service to be ready before processing requests.
- Shutdown signaling: set the event on SIGINT to trigger graceful shutdown.
- After set(), future wait() calls return immediately without suspending.
- event.is_set() is a non-blocking check, useful for guard conditions.
- event.clear() resets the event for reuse in the next cycle.
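The broadcast and reset behavior listed above can be sketched with three waiters. The names waiter and broadcast_demo are hypothetical helpers for this illustration.

```python
import asyncio


async def waiter(name, event, log):
    await event.wait()
    log.append(name)


async def broadcast_demo():
    event = asyncio.Event()
    log = []
    tasks = [asyncio.create_task(waiter(f"w{i}", event, log)) for i in range(3)]
    await asyncio.sleep(0)      # let every waiter reach event.wait()
    event.set()                 # one call releases all three waiters
    await asyncio.gather(*tasks)

    await event.wait()          # already set: returns without suspending
    event.clear()               # reset for the next cycle
    return sorted(log), event.is_set()


woken, still_set = asyncio.run(broadcast_demo())
print(woken, still_set)
```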
Expected Output
Worker waiting for signal
Signal sent
Worker received signal and proceeding
Hints
Hint 1: event.set() signals all waiters. event.wait() suspends until the event is set.
Hint 2: Once set, an Event stays set. Use event.clear() to reset it.
Medium
Use asyncio.Condition to implement a producer-consumer pattern where the consumer waits until items are available.
import asyncio
from collections import deque


async def producer(queue, condition, items):
    for item in items:
        await asyncio.sleep(0.01)
        async with condition:
            queue.append(item)
            print(f"Producer added item: {item}")
            condition.notify()


async def consumer(queue, condition, n):
    for _ in range(n):
        async with condition:
            await condition.wait_for(lambda: len(queue) > 0)
            item = queue.popleft()
            print(f"Consumer got: {item}")


async def main():
    queue = deque()
    condition = asyncio.Condition()
    print("Consumer waiting for items")
    await asyncio.gather(
        producer(queue, condition, ["A", "B"]),
        consumer(queue, condition, 2),
    )


asyncio.run(main())
Solution
import asyncio
from collections import deque


async def producer(queue, condition, items):
    for item in items:
        await asyncio.sleep(0.01)
        async with condition:
            queue.append(item)
            print(f"Producer added item: {item}")
            condition.notify()


async def consumer(queue, condition, n):
    for _ in range(n):
        async with condition:
            await condition.wait_for(lambda: len(queue) > 0)
            item = queue.popleft()
            print(f"Consumer got: {item}")


async def main():
    queue = deque()
    condition = asyncio.Condition()
    # Printed once here (not inside the consumer loop) so the output
    # matches the expected output below.
    print("Consumer waiting for items")
    await asyncio.gather(
        producer(queue, condition, ["A", "B"]),
        consumer(queue, condition, 2),
    )


asyncio.run(main())
Condition variable mechanics:
- Condition combines a Lock with a wait/notify mechanism.
- async with condition: acquires the underlying lock.
- condition.wait_for(pred) releases the lock, suspends until pred() is True, then reacquires the lock before returning.
- condition.notify() wakes ONE waiter. notify_all() wakes all waiters.
- Always check the predicate in a loop (or use wait_for) to handle spurious wakeups and the case where another consumer took the item before this one reacquired the lock.
- Use case: coroutine-safe queues, bounded buffers, pipeline flow control. asyncio.Condition is not thread-safe.
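The "check the predicate in a loop" rule can be written out by hand with condition.wait(). The sketch below uses hypothetical helpers (consume_one, produce_one) and two consumers competing for a single item, so that one of them wakes up, finds the queue empty again, and goes back to waiting.

```python
import asyncio
from collections import deque


async def consume_one(queue, condition):
    async with condition:
        while not queue:                # re-check after every wakeup
            await condition.wait()      # releases the lock while suspended
        return queue.popleft()


async def produce_one(queue, condition, item):
    async with condition:
        queue.append(item)
        condition.notify_all()          # wake every waiter; each re-checks the queue


async def loop_demo():
    queue = deque()
    condition = asyncio.Condition()
    consumers = [asyncio.create_task(consume_one(queue, condition)) for _ in range(2)]
    await asyncio.sleep(0)              # both consumers are now waiting
    await produce_one(queue, condition, "A")   # one item, two woken consumers
    await asyncio.sleep(0.01)
    await produce_one(queue, condition, "B")
    return sorted(await asyncio.gather(*consumers))


received = asyncio.run(loop_demo())
print(received)
```

The while loop here does the same job as condition.wait_for(lambda: len(queue) > 0) in the exercise; wait_for is simply the shorter spelling.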
Expected Output
Consumer waiting for items
Producer added item: A
Consumer got: A
Producer added item: B
Consumer got: B
Hints
Hint 1: condition.wait_for(predicate) suspends until predicate() returns True.
Hint 2: Always acquire the condition before calling wait_for or notify.
Implement an async token bucket rate limiter that allows bursting up to max_tokens and refills at rate tokens per second.
import asyncio
import time


class TokenBucket:
    def __init__(self, rate, max_tokens):
        self._rate = rate
        self._max = max_tokens
        self._tokens = max_tokens
        self._last = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self._last
            self._tokens = min(self._max, self._tokens + elapsed * self._rate)
            self._last = now
            if self._tokens < 1:
                wait = (1 - self._tokens) / self._rate
                await asyncio.sleep(wait)
                # The token earned during the sleep is spent right away, so
                # restart the refill clock here to avoid counting it twice.
                self._tokens = 0
                self._last = time.monotonic()
            else:
                self._tokens -= 1


async def main():
    limiter = TokenBucket(rate=5, max_tokens=10)

    async def request(i):
        await limiter.acquire()

    await asyncio.gather(*[request(i) for i in range(15)])
    print("All requests completed — rate limited to 5/s with burst of 10")


asyncio.run(main())
Solution
import asyncio
import time


class TokenBucket:
    def __init__(self, rate, max_tokens):
        self._rate = rate
        self._max = max_tokens
        self._tokens = max_tokens
        self._last = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self._last
            self._tokens = min(self._max, self._tokens + elapsed * self._rate)
            self._last = now
            if self._tokens < 1:
                wait = (1 - self._tokens) / self._rate
                await asyncio.sleep(wait)
                # The token earned during the sleep is spent right away, so
                # restart the refill clock here to avoid counting it twice.
                self._tokens = 0
                self._last = time.monotonic()
            else:
                self._tokens -= 1


async def main():
    limiter = TokenBucket(rate=5, max_tokens=10)

    async def request(i):
        await limiter.acquire()

    await asyncio.gather(*[request(i) for i in range(15)])
    print("All requests completed — rate limited to 5/s with burst of 10")


asyncio.run(main())
Token bucket algorithm:
- Tokens accumulate at rate per second, up to max_tokens, which is the burst capacity.
- Each request consumes one token. If the bucket is empty, we sleep until a token is available.
- The Lock ensures token accounting is atomic: no two coroutines compute the refill at the same time.
- await asyncio.sleep(wait) while holding the lock means other requesters queue up. This is intentional (serialized waiting).
- For high-throughput scenarios, release the lock before sleeping and re-check after waking.
- Token bucket vs leaky bucket: token bucket allows bursting (idle time accumulates capacity); leaky bucket enforces a strictly constant rate.
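The "release the lock before sleeping and re-check after waking" variant might look like the sketch below. The class name NonBlockingTokenBucket, the _refill helper, and the demo parameters are assumptions for illustration, not part of the exercise.

```python
import asyncio
import time


class NonBlockingTokenBucket:
    """Hypothetical variant: the lock covers bookkeeping only, never the sleep."""

    def __init__(self, rate, max_tokens):
        self._rate = rate
        self._max = max_tokens
        self._tokens = max_tokens
        self._last = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self):
        now = time.monotonic()
        self._tokens = min(self._max, self._tokens + (now - self._last) * self._rate)
        self._last = now

    async def acquire(self):
        while True:
            async with self._lock:
                self._refill()
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                wait = (1 - self._tokens) / self._rate
            # Lock released: other coroutines can do their own accounting.
            await asyncio.sleep(wait)
            # Loop and re-check, since another waiter may have taken the token.


async def timed_run():
    limiter = NonBlockingTokenBucket(rate=50, max_tokens=5)
    start = time.monotonic()
    await asyncio.gather(*[limiter.acquire() for _ in range(10)])
    return time.monotonic() - start


elapsed = asyncio.run(timed_run())
print(f"10 acquisitions took {elapsed:.3f}s")
```

The trade-off is fairness: waiters are no longer served strictly in arrival order, because whoever wakes first after the sleep takes the next token.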
Expected Output
All requests completed — rate limited to 5/s with burst of 10
Hints
Hint 1: A token bucket fills at a constant rate up to a maximum. Each request consumes one token.
Hint 2: If no tokens are available, sleep until the next refill time.
Implement a bounded producer-consumer pipeline using asyncio.Queue with backpressure.
import asyncio


async def producer(queue, n):
    for i in range(n):
        await queue.put(i)  # blocks if queue is full
    print("Producer done")


async def consumer(queue):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.005)  # simulate processing
        queue.task_done()
        if item == 9:  # last item
            break
    print("Consumer done")


async def main():
    queue = asyncio.Queue(maxsize=3)  # at most 3 items buffered
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue),
    )
    print("All 10 items processed")


asyncio.run(main())
Solution
import asyncio


async def producer(queue, n):
    for i in range(n):
        await queue.put(i)
    print("Producer done")


async def consumer(queue):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.005)
        queue.task_done()
        if item == 9:
            break
    print("Consumer done")


async def main():
    queue = asyncio.Queue(maxsize=3)
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue),
    )
    print("All 10 items processed")


asyncio.run(main())
asyncio.Queue backpressure:
- Queue(maxsize=3) suspends queue.put() when the queue has 3 items, so the producer backs off automatically.
- This is natural backpressure: if the consumer is slow, the producer slows down to match.
- queue.task_done() signals that an item has been processed. queue.join() waits until all items are processed.
- queue.put_nowait() / queue.get_nowait() raise QueueFull / QueueEmpty instead of blocking; use them for non-blocking checks.
- Production pattern: multiple workers call queue.get() concurrently to process items side by side. queue.join() in the main coroutine waits until all items are processed.
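The production pattern in the last bullet can be sketched as follows. The worker count, the doubling "work", and the function names are placeholders chosen for this example.

```python
import asyncio


async def worker(queue, results):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.001)      # simulate processing
            results.append(item * 2)
        finally:
            queue.task_done()               # always mark the item as handled


async def run_pool(n_items=10, n_workers=3):
    queue = asyncio.Queue(maxsize=3)
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(n_workers)]
    for i in range(n_items):
        await queue.put(i)                  # suspends while the queue is full
    await queue.join()                      # every item has had task_done() called
    for w in workers:
        w.cancel()                          # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)


processed = asyncio.run(run_pool())
print(processed)
```

Using queue.join() plus cancellation avoids hard-coding a "last item" check in the consumer, as the exercise does with item == 9.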
Expected Output
Producer done
Consumer done
All 10 items processed
Hints
Hint 1: asyncio.Queue(maxsize) blocks the producer when full — this is natural backpressure.
Hint 2: queue.join() waits until all items have been processed (task_done() called for each).
Use asyncio.Barrier to synchronize three tasks at a checkpoint — all must arrive before any can continue.
import asyncio


async def worker(worker_id, barrier):
    # Phase 1: setup
    await asyncio.sleep(worker_id * 0.005)
    print(f"Worker {worker_id} reached barrier")
    await barrier.wait()  # block until all workers arrive
    print(f"Worker {worker_id} continuing")


async def main():
    barrier = asyncio.Barrier(3)
    await asyncio.gather(*[worker(i, barrier) for i in range(3)])
    print("All workers continuing")


asyncio.run(main())
Solution
import asyncio


async def worker(worker_id, barrier):
    await asyncio.sleep(worker_id * 0.005)
    print(f"Worker {worker_id} reached barrier")
    await barrier.wait()
    print(f"Worker {worker_id} continuing")


async def main():
    barrier = asyncio.Barrier(3)
    await asyncio.gather(*[worker(i, barrier) for i in range(3)])
    print("All workers continuing")


asyncio.run(main())
Barrier use cases:
- Phased computation: all workers complete phase 1, synchronize, then all proceed to phase 2.
- Distributed testing: all test processes reach a checkpoint before measuring results.
- MapReduce-style: all mappers finish before any reducer starts.
- barrier.wait() returns an integer from 0 to parties - 1, different for each task in the round. If the barrier is broken or reset while a task is waiting, wait() raises asyncio.BrokenBarrierError.
- barrier.broken is True if the barrier is in the broken state, which happens after barrier.abort() is called.
- Barriers are reusable: after all parties pass, the counter resets for the next round.
Expected Output
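Worker 0 reached barrier
Worker 1 reached barrier
Worker 2 reached barrier
(then one "Worker N continuing" line per worker; their order depends on scheduling)
All workers continuing
Hints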
Hint 1: asyncio.Barrier(n) blocks until n coroutines have called barrier.wait(). Then all are released simultaneously.
Hint 2: Available in Python 3.11+.
Hard
Implement an async circuit breaker that opens after a failure threshold and resets after a timeout.
import asyncio
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half-open"


class CircuitBreaker:
    def __init__(self, threshold=3, timeout=0.1):
        self._threshold = threshold
        self._timeout = timeout
        self._failures = 0
        self._state = State.CLOSED
        self._opened_at = None

    async def call(self, coro_fn, *args):
        if self._state == State.OPEN:
            elapsed = time.monotonic() - self._opened_at
            if elapsed >= self._timeout:
                self._state = State.HALF_OPEN
            else:
                raise RuntimeError("Circuit is OPEN")
        try:
            result = await coro_fn(*args)
            if self._state == State.HALF_OPEN:
                self._state = State.CLOSED
                self._failures = 0
            return result
        except Exception:
            self._failures += 1
            if self._failures >= self._threshold:
                self._state = State.OPEN
                self._opened_at = time.monotonic()
                print(f"Circuit OPEN after {self._failures} failures")
            raise


async def main():
    cb = CircuitBreaker(threshold=3, timeout=0.05)
    call_count = [0]

    async def flaky_service():
        call_count[0] += 1
        if call_count[0] <= 3:
            raise ConnectionError("service down")
        return "ok"

    # Trigger failures
    for _ in range(3):
        try:
            await cb.call(flaky_service)
        except Exception:
            pass
    # Now circuit is open
    try:
        await cb.call(flaky_service)
    except RuntimeError:
        print("Circuit OPEN after 3 failures")
    # Wait for timeout
    await asyncio.sleep(0.06)
    print("Circuit HALF-OPEN after timeout")
    # Recovery call
    result = await cb.call(flaky_service)
    print("Success after recovery")


asyncio.run(main())
Solution
import asyncio
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half-open"


class CircuitBreaker:
    def __init__(self, threshold=3, timeout=0.1):
        self._threshold = threshold
        self._timeout = timeout
        self._failures = 0
        self._state = State.CLOSED
        self._opened_at = None

    async def call(self, coro_fn, *args):
        if self._state == State.OPEN:
            elapsed = time.monotonic() - self._opened_at
            if elapsed >= self._timeout:
                self._state = State.HALF_OPEN
            else:
                raise RuntimeError("Circuit is OPEN")
        try:
            result = await coro_fn(*args)
            if self._state == State.HALF_OPEN:
                self._state = State.CLOSED
                self._failures = 0
            return result
        except Exception:
            self._failures += 1
            if self._failures >= self._threshold:
                self._state = State.OPEN
                self._opened_at = time.monotonic()
                print(f"Circuit OPEN after {self._failures} failures")
            raise


async def main():
    cb = CircuitBreaker(threshold=3, timeout=0.05)
    call_count = [0]

    async def flaky_service():
        call_count[0] += 1
        if call_count[0] <= 3:
            raise ConnectionError("service down")
        return "ok"

    for _ in range(3):
        try:
            await cb.call(flaky_service)
        except Exception:
            pass
    try:
        await cb.call(flaky_service)
    except RuntimeError:
        print("Circuit OPEN after 3 failures")
    await asyncio.sleep(0.06)
    print("Circuit HALF-OPEN after timeout")
    result = await cb.call(flaky_service)
    print("Success after recovery")


asyncio.run(main())
Circuit breaker state machine:
- CLOSED: normal operation. Failures are counted; at threshold, transition to OPEN.
- OPEN: reject all requests fast. After timeout, transition to HALF_OPEN.
- HALF_OPEN: allow a test request. Success → CLOSED (reset failure count). Failure → back to OPEN. This implementation does not limit HALF_OPEN to a single in-flight request: concurrent callers all pass through until one of them fails.
- This pattern protects a struggling or recovering service from a "thundering herd": failing fast stops callers from piling up behind a slow downstream, which would otherwise overwhelm it.
- Production extensions: per-exception-type failure counting, success threshold in HALF_OPEN before resetting, metrics export, and a thread-safe lock if used in a multi-threaded context.
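One way to enforce the "one test request" rule for HALF_OPEN is to track whether a probe is already in flight. The sketch below is a hypothetical variant (ProbeLimitedBreaker), not the exercise's class: it also resets the failure count on any success, and it represents the open state with a timestamp instead of an enum.

```python
import asyncio
import time


class ProbeLimitedBreaker:
    """Hypothetical variant of the exercise's breaker, not a drop-in replacement."""

    def __init__(self, threshold=3, timeout=0.05):
        self._threshold = threshold
        self._timeout = timeout
        self._failures = 0
        self._opened_at = None      # None means the circuit is closed
        self._probing = False       # True while the half-open probe is in flight

    async def call(self, coro_fn, *args):
        is_probe = False
        if self._opened_at is not None:
            timed_out = time.monotonic() - self._opened_at >= self._timeout
            if not timed_out or self._probing:
                raise RuntimeError("Circuit is OPEN")
            self._probing = is_probe = True     # this caller becomes the single probe
        try:
            result = await coro_fn(*args)
        except Exception:
            self._failures += 1
            if is_probe or self._failures >= self._threshold:
                self._opened_at = time.monotonic()  # open, or re-open after a failed probe
            raise
        else:
            self._failures = 0                      # any success resets the count
            if is_probe:
                self._opened_at = None              # probe succeeded: close the circuit
            return result
        finally:
            if is_probe:
                self._probing = False


async def breaker_demo():
    breaker = ProbeLimitedBreaker(threshold=2, timeout=0.02)
    healthy = False

    async def service():
        await asyncio.sleep(0.005)
        if not healthy:
            raise ConnectionError("service down")
        return "ok"

    for _ in range(2):                  # two failures trip the breaker
        try:
            await breaker.call(service)
        except ConnectionError:
            pass
    await asyncio.sleep(0.03)           # wait past the timeout
    healthy = True
    outcomes = await asyncio.gather(
        *[breaker.call(service) for _ in range(3)], return_exceptions=True
    )
    return [o if isinstance(o, str) else type(o).__name__ for o in outcomes]


outcomes = asyncio.run(breaker_demo())
print(outcomes)
```

Of the three concurrent calls made after the timeout, only the first reaches the service; the other two are rejected while the probe is in flight.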
Expected Output
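Circuit OPEN after 3 failures
Circuit OPEN after 3 failures
Circuit HALF-OPEN after timeout
Success after recovery
(The first line is printed by the breaker when it trips; the second is printed by main() when the fourth call is rejected.)
Hints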
Hint 1: Circuit breaker states: CLOSED (normal), OPEN (rejecting requests), HALF-OPEN (testing recovery).
Hint 2: Count failures. After threshold failures, open the circuit. After timeout, move to HALF-OPEN to test.
Implement an async read-write lock that allows concurrent readers but exclusive writers.
import asyncio


class AsyncRWLock:
    def __init__(self):
        self._cond = asyncio.Condition()
        self._readers = 0
        self._writing = False

    async def acquire_read(self):
        async with self._cond:
            await self._cond.wait_for(lambda: not self._writing)
            self._readers += 1

    async def release_read(self):
        async with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()

    async def acquire_write(self):
        async with self._cond:
            await self._cond.wait_for(lambda: self._readers == 0 and not self._writing)
            self._writing = True

    async def release_write(self):
        async with self._cond:
            self._writing = False
            self._cond.notify_all()


async def reader_task(lock, reader_id, active_readers):
    await lock.acquire_read()
    active_readers.add(reader_id)
    await asyncio.sleep(0.02)
    active_readers.discard(reader_id)
    await lock.release_read()


async def writer_task(lock, writer_id, active_readers):
    await lock.acquire_write()
    assert len(active_readers) == 0, "Writer has exclusive access"
    await asyncio.sleep(0.01)
    await lock.release_write()


async def main():
    lock = AsyncRWLock()
    active_readers = set()
    await asyncio.gather(
        reader_task(lock, 1, active_readers),
        reader_task(lock, 2, active_readers),
        reader_task(lock, 3, active_readers),
        writer_task(lock, 1, active_readers),
    )
    print("Multiple concurrent readers; writer gets exclusive access")


asyncio.run(main())
Solution
import asyncio


class AsyncRWLock:
    def __init__(self):
        self._cond = asyncio.Condition()
        self._readers = 0
        self._writing = False

    async def acquire_read(self):
        async with self._cond:
            await self._cond.wait_for(lambda: not self._writing)
            self._readers += 1

    async def release_read(self):
        async with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()

    async def acquire_write(self):
        async with self._cond:
            await self._cond.wait_for(lambda: self._readers == 0 and not self._writing)
            self._writing = True

    async def release_write(self):
        async with self._cond:
            self._writing = False
            self._cond.notify_all()


async def main():
    lock = AsyncRWLock()
    active_readers = set()

    async def reader_task(reader_id):
        await lock.acquire_read()
        active_readers.add(reader_id)
        await asyncio.sleep(0.02)
        active_readers.discard(reader_id)
        await lock.release_read()

    async def writer_task(writer_id):
        await lock.acquire_write()
        assert len(active_readers) == 0
        await asyncio.sleep(0.01)
        await lock.release_write()

    await asyncio.gather(
        reader_task(1), reader_task(2), reader_task(3),
        writer_task(1),
    )
    print("Multiple concurrent readers; writer gets exclusive access")


asyncio.run(main())
Read-write lock design:
- Readers can proceed concurrently as long as no writer holds the lock.
- A writer must wait until all readers finish and no other writer is active.
- notify_all() after reader release and writer release wakes all waiters to re-check the predicate.
- This implementation is "readers-preferring": writers can starve if readers arrive continuously. A "writers-preferring" variant tracks waiting writers and blocks new readers when a writer is waiting.
- Real use case: read-heavy cache with occasional invalidation writes.
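A writers-preferring variant, as described above, could be sketched like this. The class name WriterPreferringRWLock and the _waiting_writers counter are assumptions for illustration; the timings in the demo are arbitrary.

```python
import asyncio


class WriterPreferringRWLock:
    def __init__(self):
        self._cond = asyncio.Condition()
        self._readers = 0
        self._writing = False
        self._waiting_writers = 0

    async def acquire_read(self):
        async with self._cond:
            # New readers also yield to writers that are already queued.
            await self._cond.wait_for(
                lambda: not self._writing and self._waiting_writers == 0
            )
            self._readers += 1

    async def release_read(self):
        async with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()

    async def acquire_write(self):
        async with self._cond:
            self._waiting_writers += 1
            try:
                await self._cond.wait_for(
                    lambda: self._readers == 0 and not self._writing
                )
            finally:
                self._waiting_writers -= 1
                self._cond.notify_all()     # let blocked readers re-check, e.g. after a cancel
            self._writing = True

    async def release_write(self):
        async with self._cond:
            self._writing = False
            self._cond.notify_all()


async def ordering_demo():
    lock = WriterPreferringRWLock()
    order = []

    async def reader(name, delay):
        await asyncio.sleep(delay)
        await lock.acquire_read()
        order.append(name)
        await asyncio.sleep(0.02)
        await lock.release_read()

    async def writer(delay):
        await asyncio.sleep(delay)
        await lock.acquire_write()
        order.append("w")
        await asyncio.sleep(0.005)
        await lock.release_write()

    # r2 arrives while r1 still holds the lock, but after the writer queued up.
    await asyncio.gather(reader("r1", 0), writer(0.005), reader("r2", 0.01))
    return order


order = asyncio.run(ordering_demo())
print(order)
```

With the readers-preferring lock from the exercise, r2 would join r1 immediately and the writer would go last. Here the queued writer goes ahead of r2, at the cost of reduced read concurrency.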
Expected Output
Multiple concurrent readers; writer gets exclusive access
Hints
Hint 1: Multiple readers can hold the lock simultaneously. A writer needs exclusive access (no readers or other writers).
Hint 2: Use a Condition and a reader count. Writers wait until reader count is 0.
Implement an async priority queue where higher-priority items are dequeued first.
import asyncio
import heapq


class AsyncPriorityQueue:
    def __init__(self):
        self._heap = []
        self._counter = 0
        self._cond = asyncio.Condition()

    async def put(self, priority, item):
        async with self._cond:
            heapq.heappush(self._heap, (priority, self._counter, item))
            self._counter += 1
            self._cond.notify()

    async def get(self):
        async with self._cond:
            await self._cond.wait_for(lambda: len(self._heap) > 0)
            _, _, item = heapq.heappop(self._heap)
            return item


async def main():
    pq = AsyncPriorityQueue()

    async def producer():
        await pq.put(3, "LOW")
        await pq.put(1, "CRITICAL")
        await pq.put(2, "HIGH")
        await pq.put(3, "NORMAL")

    async def consumer():
        results = []
        for _ in range(4):
            item = await pq.get()
            results.append(item)
        print(f"Items dequeued by priority: {', '.join(results)}")

    await asyncio.gather(producer(), consumer())


asyncio.run(main())
Solution
import asyncio
import heapq


class AsyncPriorityQueue:
    def __init__(self):
        self._heap = []
        self._counter = 0
        self._cond = asyncio.Condition()

    async def put(self, priority, item):
        async with self._cond:
            heapq.heappush(self._heap, (priority, self._counter, item))
            self._counter += 1
            self._cond.notify()

    async def get(self):
        async with self._cond:
            await self._cond.wait_for(lambda: len(self._heap) > 0)
            _, _, item = heapq.heappop(self._heap)
            return item


async def main():
    pq = AsyncPriorityQueue()

    async def producer():
        await pq.put(3, "LOW")
        await pq.put(1, "CRITICAL")
        await pq.put(2, "HIGH")
        await pq.put(3, "NORMAL")

    async def consumer():
        results = []
        for _ in range(4):
            item = await pq.get()
            results.append(item)
        print(f"Items dequeued by priority: {', '.join(results)}")

    await asyncio.gather(producer(), consumer())


asyncio.run(main())
Priority queue design:
- (priority, counter, item) tuples ensure that items with equal priority are dequeued FIFO (insertion order).
- heapq is a min-heap: lower numeric priority values are dequeued first. Negate the priority on insertion if larger numbers should mean higher priority.
- The Condition provides both the mutual exclusion (for heap access) and the wait/notify mechanism (for blocking consumers).
- notify() wakes one consumer, which is enough here because each put() adds exactly one item. Use notify_all() when a single update can satisfy several waiters.
- asyncio.PriorityQueue in the stdlib gives the same ordering for (priority, item) tuples, retrieving the lowest entry first. It does not add a tie-breaking counter, so entries with equal priority fall back to comparing the items themselves.
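For comparison, the same ordering can be obtained from the stdlib asyncio.PriorityQueue by putting tuples with an explicit sequence number. The function name and the use of itertools.count as the tie-breaker are choices made for this sketch.

```python
import asyncio
import itertools


async def stdlib_priority_demo():
    pq = asyncio.PriorityQueue()
    sequence = itertools.count()        # tie-breaker for equal priorities
    for priority, label in [(3, "LOW"), (1, "CRITICAL"), (2, "HIGH"), (3, "NORMAL")]:
        pq.put_nowait((priority, next(sequence), label))
    drained = []
    while not pq.empty():
        _, _, label = await pq.get()
        drained.append(label)
    return drained


drained = asyncio.run(stdlib_priority_demo())
print(drained)
```

LOW comes out before NORMAL because both have priority 3 and LOW was inserted first.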
Expected Output
Items dequeued by priority: CRITICAL, HIGH, LOW, NORMAL
(LOW precedes NORMAL because both have priority 3 and LOW was inserted first.)
Hints
Hint 1: Use heapq internally. Wrap items as (priority, sequence, value) tuples for stable ordering.
Hint 2: Use a Condition to notify waiting consumers when an item is added.
Build a production-grade async resource pool with health checking and automatic eviction of stale resources.
import asyncio
import time


class Resource:
    def __init__(self, resource_id):
        self.id = resource_id
        self.created_at = time.monotonic()
        self.healthy = True

    def check_health(self):
        # Resources expire after 0.1s in this demo
        return time.monotonic() - self.created_at < 0.1


class HealthyPool:
    def __init__(self, size, factory):
        self._size = size
        self._factory = factory
        self._available = asyncio.Queue()
        self._sem = asyncio.Semaphore(size)
        for i in range(size):
            self._available.put_nowait(factory(i))

    async def checkout(self):
        await self._sem.acquire()
        resource = await self._available.get()
        if not resource.check_health():
            resource = self._factory(resource.id)
        return resource

    async def checkin(self, resource):
        await self._available.put(resource)
        self._sem.release()


async def main():
    pool = HealthyPool(size=2, factory=Resource)

    async def worker(worker_id):
        resource = await pool.checkout()
        await asyncio.sleep(0.05)  # use resource for 50ms
        await pool.checkin(resource)
        return f"worker_{worker_id}_done"

    results = await asyncio.gather(*[worker(i) for i in range(4)])
    print(f"Pool managed {len(results)} requests with health checks")


asyncio.run(main())
Solution
import asyncio
import time


class Resource:
    def __init__(self, resource_id):
        self.id = resource_id
        self.created_at = time.monotonic()
        self.healthy = True

    def check_health(self):
        return time.monotonic() - self.created_at < 0.1


class HealthyPool:
    def __init__(self, size, factory):
        self._size = size
        self._factory = factory
        self._available = asyncio.Queue()
        self._sem = asyncio.Semaphore(size)
        for i in range(size):
            self._available.put_nowait(factory(i))

    async def checkout(self):
        await self._sem.acquire()
        resource = await self._available.get()
        if not resource.check_health():
            resource = self._factory(resource.id)
        return resource

    async def checkin(self, resource):
        await self._available.put(resource)
        self._sem.release()


async def main():
    pool = HealthyPool(size=2, factory=Resource)

    async def worker(worker_id):
        resource = await pool.checkout()
        await asyncio.sleep(0.05)
        await pool.checkin(resource)
        return f"worker_{worker_id}_done"

    results = await asyncio.gather(*[worker(i) for i in range(4)])
    print(f"Pool managed {len(results)} requests with health checks")


asyncio.run(main())
Healthy pool design:
- Semaphore bounds total checkouts; queue holds available resources.
- Health check on checkout: evict stale resources before returning them to the caller.
- Production extensions: async health check (e.g., await conn.ping()), background eviction loop, min/max pool sizes, connection retry on eviction, metrics (pool size, wait time, eviction rate).
- The checkin always returns the resource to the pool, even unhealthy ones (the next checkout evicts them). Alternatively, close unhealthy resources immediately and create a fresh one.
- Thread safety: this implementation is safe for asyncio (single-threaded) but not for multi-threaded access.
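One further extension worth considering is making sure checkin runs even when the caller raises. The sketch below wraps checkout/checkin in an async context manager; TinyPool, lease, and idle_count are hypothetical names, and the pool is reduced to a bare queue to keep the example short.

```python
import asyncio
import contextlib


class TinyPool:
    """Hypothetical stand-in for the exercise's pool, reduced to a queue of resources."""

    def __init__(self, resources):
        self._available = asyncio.Queue()
        for resource in resources:
            self._available.put_nowait(resource)

    async def checkout(self):
        return await self._available.get()

    async def checkin(self, resource):
        await self._available.put(resource)

    @contextlib.asynccontextmanager
    async def lease(self):
        resource = await self.checkout()
        try:
            yield resource
        finally:
            await self.checkin(resource)    # runs even if the caller raises

    def idle_count(self):
        return self._available.qsize()


async def lease_demo():
    pool = TinyPool(["conn-0", "conn-1"])
    try:
        async with pool.lease() as resource:
            raise ValueError(f"query failed on {resource}")
    except ValueError:
        pass
    return pool.idle_count()


idle_after_error = asyncio.run(lease_demo())
print(idle_after_error)
```

Without the try/finally, an exception between checkout and checkin would leak the resource and permanently shrink the pool.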
Expected Output
Pool managed 4 requests with health checks
Hints
Hint 1: Track each resource's last use time. On checkout, validate health; evict and replace if stale.
Hint 2: Use a background task to periodically check all idle resources.
