
Python Async Synchronization Patterns: Practice Problems & Exercises

Practice: Async Synchronization Patterns

11 problems · 3 Easy · 4 Medium · 4 Hard · 60–90 min

Easy

#1 asyncio.Lock: Mutual Exclusion (Easy)
Tags: asyncio.Lock, mutual-exclusion, critical-section

Use asyncio.Lock to protect a shared counter from concurrent modification by multiple tasks.

import asyncio

async def increment(counter, lock, n):
    for _ in range(n):
        async with lock:
            counter[0] += 1

async def main():
    counter = [0]
    lock = asyncio.Lock()
    await asyncio.gather(
        increment(counter, lock, 500),
        increment(counter, lock, 500),
    )
    print(f"Final counter: {counter[0]}")

asyncio.run(main())
Solution
import asyncio

async def increment(counter, lock, n):
    for _ in range(n):
        async with lock:  # only one task at a time runs the update
            counter[0] += 1

async def main():
    counter = [0]  # one-element list so both tasks share the same object
    lock = asyncio.Lock()
    await asyncio.gather(
        increment(counter, lock, 500),
        increment(counter, lock, 500),
    )
    print(f"Final counter: {counter[0]}")

asyncio.run(main())

Lock mechanics:

  • asyncio.Lock is a mutual exclusion primitive for async code. Only one coroutine holds it at a time.
  • async with lock: acquires on entry, releases on exit — even if an exception occurs.
  • In this example, the counter update is safe because only one coroutine executes the counter[0] += 1 line at a time.
  • Asyncio is single-threaded, so technically the counter would be correct without a lock in this specific case (no await inside the critical section). But as soon as the critical section includes an await, interleaving can corrupt state — the lock prevents that.
  • Always add a lock when a critical section contains an await.
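
The point about an await inside the critical section can be seen in a small sketch. The helper names `unsafe_increment` and `safe_increment` are hypothetical, and `asyncio.sleep(0)` stands in for any real await such as a network call:

```python
import asyncio

async def unsafe_increment(counter, n):
    for _ in range(n):
        current = counter[0]       # read
        await asyncio.sleep(0)     # yield to the event loop mid-update
        counter[0] = current + 1   # write back a possibly stale value

async def safe_increment(counter, lock, n):
    for _ in range(n):
        async with lock:           # read-await-write is now one critical section
            current = counter[0]
            await asyncio.sleep(0)
            counter[0] = current + 1

async def run_demo():
    unsafe = [0]
    await asyncio.gather(unsafe_increment(unsafe, 100), unsafe_increment(unsafe, 100))
    safe = [0]
    lock = asyncio.Lock()
    await asyncio.gather(safe_increment(safe, lock, 100), safe_increment(safe, lock, 100))
    return unsafe[0], safe[0]

unsafe_total, safe_total = asyncio.run(run_demo())
print(f"Without lock: {unsafe_total}, with lock: {safe_total}")
```

The unlocked version loses updates because both tasks read the same value before either writes it back; the locked version always reaches 200.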
Expected Output
Final counter: 1000
Hints

Hint 1: Without a lock, tasks that read a shared variable, await, and then write it back can interleave at the await point and lose updates.

Hint 2: Use async with lock: to protect the critical section.

#2 asyncio.Semaphore: Bounded Concurrency (Easy)
Tags: asyncio.Semaphore, bounded-concurrency, rate-limiting

Use a Semaphore(3) to limit concurrent "API calls" to at most 3 at a time.

import asyncio

async def api_call(task_id, semaphore, active):
    async with semaphore:
        active.add(task_id)
        assert len(active) <= 3, f"Too many concurrent tasks: {len(active)}"
        await asyncio.sleep(0.01)
        active.discard(task_id)

async def main():
    semaphore = asyncio.Semaphore(3)
    active = set()
    await asyncio.gather(*[api_call(i, semaphore, active) for i in range(10)])
    print("At most 3 tasks running concurrently")

asyncio.run(main())
Solution
import asyncio

async def api_call(task_id, semaphore, active):
    async with semaphore:  # at most 3 tasks are inside this block at once
        active.add(task_id)
        assert len(active) <= 3, f"Too many concurrent tasks: {len(active)}"
        await asyncio.sleep(0.01)  # simulated API latency
        active.discard(task_id)

async def main():
    semaphore = asyncio.Semaphore(3)
    active = set()  # IDs of tasks currently holding a permit
    await asyncio.gather(*[api_call(i, semaphore, active) for i in range(10)])
    print("At most 3 tasks running concurrently")

asyncio.run(main())

Semaphore use cases:

  • Rate limiting API calls: limit to N concurrent requests to avoid overloading a downstream service.
  • Connection pool: limit to N active database connections.
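  • Semaphore(1) behaves much like a Lock: at most one coroutine gets past the acquire point at a time.
  • Neither asyncio.Semaphore nor asyncio.Lock tracks ownership, so any coroutine can call release(). Releasing an unlocked Lock raises RuntimeError, whereas a plain Semaphore accepts the extra release and silently grows its counter.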
  • asyncio.BoundedSemaphore(n) raises ValueError if you release more times than you acquired, preventing accidental over-release bugs.
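
A short sketch of the over-release difference described above. The function name `over_release_demo` is hypothetical; the calls themselves are standard asyncio:

```python
import asyncio

async def over_release_demo():
    plain = asyncio.Semaphore(1)
    plain.release()                   # no error: the limit is now effectively 2
    await plain.acquire()
    still_free = not plain.locked()   # a second permit is still available

    bounded = asyncio.BoundedSemaphore(1)
    try:
        bounded.release()             # release without a matching acquire
        bounded_raised = False
    except ValueError:
        bounded_raised = True
    return still_free, bounded_raised

still_free, bounded_raised = asyncio.run(over_release_demo())
print(still_free, bounded_raised)
```

With the plain semaphore the stray release quietly raises the concurrency limit; the bounded variant surfaces the bug at the call site.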
Expected Output
At most 3 tasks running concurrently
Hints

Hint 1: asyncio.Semaphore(n) allows at most n coroutines to proceed past the acquire point simultaneously.

Hint 2: Use async with semaphore: to automatically acquire and release.

#3 asyncio.Event: One-Shot Signal (Easy)
Tags: asyncio.Event, signal, wait-notify

Use asyncio.Event to signal a worker to start processing after an initialization step completes.

import asyncio

async def worker(ready_event):
    print("Worker waiting for signal")
    await ready_event.wait()
    print("Worker received signal and proceeding")

async def initializer(ready_event):
    await asyncio.sleep(0.02)  # simulate startup
    print("Signal sent")
    ready_event.set()

async def main():
    ready = asyncio.Event()
    await asyncio.gather(
        worker(ready),
        initializer(ready),
    )

asyncio.run(main())
Solution
import asyncio

async def worker(ready_event):
    print("Worker waiting for signal")
    await ready_event.wait()  # suspends until set() is called
    print("Worker received signal and proceeding")

async def initializer(ready_event):
    await asyncio.sleep(0.02)  # simulate startup
    print("Signal sent")
    ready_event.set()  # wakes every coroutine blocked in wait()

async def main():
    ready = asyncio.Event()
    await asyncio.gather(
        worker(ready),
        initializer(ready),
    )

asyncio.run(main())

asyncio.Event uses:

  • One-shot broadcast: one set() wakes ALL waiters simultaneously (unlike a Lock, which wakes one).
  • Startup sequencing: wait for a service to be ready before processing requests.
  • Shutdown signaling: set the event on SIGINT to trigger graceful shutdown.
  • After set(), future wait() calls return immediately without suspending.
  • event.is_set() is a non-blocking check — useful for guard conditions.
  • event.clear() resets the event for reuse in the next cycle.
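
The broadcast and reset behavior listed above can be checked with a small sketch. The `waiter` helper and the `w0`..`w2` names are made up for illustration:

```python
import asyncio

async def waiter(name, event, log):
    await event.wait()
    log.append(name)

async def run_demo():
    event = asyncio.Event()
    log = []
    tasks = [asyncio.create_task(waiter(f"w{i}", event, log)) for i in range(3)]
    await asyncio.sleep(0)   # let all three waiters start and block on wait()
    event.set()              # a single set() releases every waiter
    await asyncio.gather(*tasks)
    await event.wait()       # already set, so this returns without suspending
    event.clear()            # reset the event for the next cycle
    return sorted(log), event.is_set()

woken, still_set = asyncio.run(run_demo())
print(woken, still_set)
```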
Expected Output
Worker waiting for signal
Signal sent
Worker received signal and proceeding
Hints

Hint 1: event.set() signals all waiters. event.wait() suspends until the event is set.

Hint 2: Once set, an Event stays set. Use event.clear() to reset it.


Medium

#4 asyncio.Condition: Wait-For-Predicate (Medium)
Tags: asyncio.Condition, wait_for, producer-consumer

Use asyncio.Condition to implement a producer-consumer pattern where the consumer waits until items are available.

import asyncio
from collections import deque

async def producer(queue, condition, items):
    for item in items:
        await asyncio.sleep(0.01)
        async with condition:
            queue.append(item)
            print(f"Producer added item: {item}")
            condition.notify()

async def consumer(queue, condition, n):
    for _ in range(n):
        async with condition:
            await condition.wait_for(lambda: len(queue) > 0)
            item = queue.popleft()
            print(f"Consumer got: {item}")

async def main():
    queue = deque()
    condition = asyncio.Condition()
    print("Consumer waiting for items")
    await asyncio.gather(
        producer(queue, condition, ["A", "B"]),
        consumer(queue, condition, 2),
    )

asyncio.run(main())
Solution
import asyncio
from collections import deque

async def producer(queue, condition, items):
    for item in items:
        await asyncio.sleep(0.01)
        async with condition:
            queue.append(item)
            print(f"Producer added item: {item}")
            condition.notify()  # wake one waiting consumer

async def consumer(queue, condition, n):
    for _ in range(n):
        async with condition:
            # Releases the lock while waiting, reacquires it before returning
            await condition.wait_for(lambda: len(queue) > 0)
            item = queue.popleft()
            print(f"Consumer got: {item}")

async def main():
    queue = deque()
    condition = asyncio.Condition()
    # Printed once here so the output matches the expected output below
    print("Consumer waiting for items")
    await asyncio.gather(
        producer(queue, condition, ["A", "B"]),
        consumer(queue, condition, 2),
    )

asyncio.run(main())

Condition variable mechanics:

  • Condition combines a Lock with a wait/notify mechanism.
  • async with condition: acquires the underlying lock.
  • condition.wait_for(pred) atomically releases the lock, suspends until pred() is True, then reacquires the lock.
  • condition.notify() wakes ONE waiter. notify_all() wakes all waiters.
  • Always check the predicate in a loop (or use wait_for) to handle spurious wakeups.
  • Use case: coroutine-safe queues, bounded buffers, pipeline flow control. (asyncio.Condition is not thread-safe.)
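
As a rough illustration of the "predicate in a loop" advice, the sketch below writes out by hand what wait_for does. The helper names `consume_one` and `produce_one` are hypothetical:

```python
import asyncio
from collections import deque

async def consume_one(queue, condition):
    async with condition:
        # Hand-written equivalent of:
        #   await condition.wait_for(lambda: len(queue) > 0)
        while not queue:
            await condition.wait()   # releases the lock while suspended
        return queue.popleft()

async def produce_one(queue, condition, item):
    async with condition:
        queue.append(item)
        condition.notify()

async def run_demo():
    queue, condition = deque(), asyncio.Condition()
    consumer = asyncio.create_task(consume_one(queue, condition))
    await asyncio.sleep(0)           # let the consumer start waiting
    await produce_one(queue, condition, "A")
    return await consumer

item = asyncio.run(run_demo())
print(item)
```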
Expected Output
Consumer waiting for items
Producer added item: A
Consumer got: A
Producer added item: B
Consumer got: B
Hints

Hint 1: condition.wait_for(predicate) suspends until predicate() returns True.

Hint 2: Always acquire the condition before calling wait_for or notify.

#5 Token Bucket Rate Limiter (Medium)
Tags: rate-limiter, token-bucket, asyncio, Lock

Implement an async token bucket rate limiter that allows bursting up to max_tokens and refills at rate tokens per second.

import asyncio
import time

class TokenBucket:
    def __init__(self, rate, max_tokens):
        self._rate = rate
        self._max = max_tokens
        self._tokens = max_tokens
        self._last = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self._last
            self._tokens = min(self._max, self._tokens + elapsed * self._rate)
            self._last = now
            if self._tokens < 1:
                wait = (1 - self._tokens) / self._rate
                await asyncio.sleep(wait)
                self._tokens = 0
                # Restart the refill clock so the sleep is not counted twice
                self._last = time.monotonic()
            else:
                self._tokens -= 1

async def main():
    limiter = TokenBucket(rate=5, max_tokens=10)
    async def request(i):
        await limiter.acquire()
    await asyncio.gather(*[request(i) for i in range(15)])
    print("All requests completed — rate limited to 5/s with burst of 10")

asyncio.run(main())
Solution
import asyncio
import time

class TokenBucket:
    def __init__(self, rate, max_tokens):
        self._rate = rate            # tokens added per second
        self._max = max_tokens       # burst capacity
        self._tokens = max_tokens    # start with a full bucket
        self._last = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            # Refill according to the time elapsed since the last update
            now = time.monotonic()
            elapsed = now - self._last
            self._tokens = min(self._max, self._tokens + elapsed * self._rate)
            self._last = now
            if self._tokens < 1:
                # Sleep until one whole token has accumulated, then spend it
                wait = (1 - self._tokens) / self._rate
                await asyncio.sleep(wait)
                self._tokens = 0
                # Restart the refill clock so the sleep is not counted twice
                self._last = time.monotonic()
            else:
                self._tokens -= 1

async def main():
    limiter = TokenBucket(rate=5, max_tokens=10)
    async def request(i):
        await limiter.acquire()
    await asyncio.gather(*[request(i) for i in range(15)])
    print("All requests completed — rate limited to 5/s with burst of 10")

asyncio.run(main())

Token bucket algorithm:

  • Tokens accumulate at rate per second, up to max_tokens — this is the burst capacity.
  • Each request consumes one token. If the bucket is empty, we sleep until a token is available.
  • The Lock ensures token accounting is atomic — no two coroutines compute the refill simultaneously.
  • await asyncio.sleep(wait) while holding the lock means other requesters queue up — this is intentional (serialized waiting).
  • For high-throughput scenarios, release the lock before sleeping and re-check after waking.
  • Token bucket vs leaky bucket: token bucket allows bursting (idle time accumulates capacity); leaky bucket enforces strictly constant rate.
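
The "release the lock before sleeping and re-check after waking" variant might look like the following sketch. The class name `NonBlockingTokenBucket` and the `_refill` helper are hypothetical, and the rate and burst values are chosen only to keep the demo short:

```python
import asyncio
import time

class NonBlockingTokenBucket:  # hypothetical name for this variant
    def __init__(self, rate, max_tokens):
        self._rate = rate
        self._max = max_tokens
        self._tokens = max_tokens
        self._last = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self):
        now = time.monotonic()
        self._tokens = min(self._max, self._tokens + (now - self._last) * self._rate)
        self._last = now

    async def acquire(self):
        while True:
            async with self._lock:
                self._refill()
                if self._tokens >= 1:
                    self._tokens -= 1
                    return
                wait = (1 - self._tokens) / self._rate
            # The lock is released here, so other coroutines can inspect the
            # bucket while this one sleeps; the loop re-checks after waking.
            await asyncio.sleep(wait)

async def run_demo():
    bucket = NonBlockingTokenBucket(rate=50, max_tokens=5)
    start = time.monotonic()
    await asyncio.gather(*[bucket.acquire() for _ in range(10)])
    return time.monotonic() - start

elapsed = asyncio.run(run_demo())
print(f"10 requests took {elapsed:.3f}s")
```

Five requests are served from the initial burst and the remaining five need roughly 0.1 s of refill at 50 tokens per second. The trade-off is that waiters are no longer served in strict arrival order.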
Expected Output
All requests completed — rate limited to 5/s with burst of 10
Hints

Hint 1: A token bucket fills at a constant rate up to a maximum. Each request consumes one token.

Hint 2: If no tokens are available, sleep until the next refill time.

#6 asyncio.Queue: Bounded Producer-Consumer (Medium)
Tags: asyncio.Queue, bounded-queue, producer-consumer, backpressure

Implement a bounded producer-consumer pipeline using asyncio.Queue with backpressure.

import asyncio

async def producer(queue, n):
    for i in range(n):
        await queue.put(i)  # blocks if queue is full
    print("Producer done")

async def consumer(queue):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.005)  # simulate processing
        queue.task_done()
        if item == 9:  # last item
            break
    print("Consumer done")

async def main():
    queue = asyncio.Queue(maxsize=3)  # at most 3 items buffered
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue),
    )
    print("All 10 items processed")

asyncio.run(main())
Solution
import asyncio

async def producer(queue, n):
    for i in range(n):
        await queue.put(i)  # suspends while the queue is full
    print("Producer done")

async def consumer(queue):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.005)  # simulate processing
        queue.task_done()
        if item == 9:  # last item
            break
    print("Consumer done")

async def main():
    queue = asyncio.Queue(maxsize=3)  # at most 3 items buffered
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue),
    )
    print("All 10 items processed")

asyncio.run(main())

asyncio.Queue backpressure:

  • Queue(maxsize=3) blocks queue.put() when the queue has 3 items — the producer backs off automatically.
  • This is natural backpressure: if the consumer is slow, the producer slows down to match.
  • queue.task_done() signals that an item has been processed. queue.join() waits until all items are processed.
  • queue.put_nowait() / queue.get_nowait() raise QueueFull / QueueEmpty instead of blocking — use for non-blocking checks.
  • Production pattern: multiple workers call queue.get() concurrently for parallel processing. queue.join() in the main coroutine waits until all items are processed.
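
The multi-worker pattern from the last bullet could be sketched as follows. The worker count, the doubling step, and the `results` list are illustrative assumptions:

```python
import asyncio

async def worker(queue, results):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.001)   # simulate processing
            results.append(item * 2)
        finally:
            queue.task_done()            # always mark the item as handled

async def run_demo():
    queue = asyncio.Queue(maxsize=3)
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(3)]
    for i in range(10):
        await queue.put(i)               # backpressure: waits when 3 items are buffered
    await queue.join()                   # returns once every item has been task_done()
    for w in workers:
        w.cancel()                       # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)

processed = asyncio.run(run_demo())
print(processed)
```

Using queue.join() avoids the sentinel check (`item == 9`) from the exercise, which would not work with more than one consumer.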
Expected Output
Producer done
Consumer done
All 10 items processed
Hints

Hint 1: asyncio.Queue(maxsize) blocks the producer when full — this is natural backpressure.

Hint 2: queue.join() waits until all items have been processed (task_done() called for each).

#7 asyncio.Barrier: Synchronization Point (Medium)
Tags: asyncio.Barrier, rendezvous, synchronization-point, phases

Use asyncio.Barrier to synchronize three tasks at a checkpoint — all must arrive before any can continue.

import asyncio

async def worker(worker_id, barrier):
    # Phase 1: setup
    await asyncio.sleep(worker_id * 0.005)
    print(f"Worker {worker_id} reached barrier")
    await barrier.wait()  # block until all workers arrive
    print(f"Worker {worker_id} continuing")

async def main():
    barrier = asyncio.Barrier(3)
    await asyncio.gather(*[worker(i, barrier) for i in range(3)])
    print("All workers continuing")

asyncio.run(main())
Solution
import asyncio

async def worker(worker_id, barrier):
    # Phase 1: setup
    await asyncio.sleep(worker_id * 0.005)
    print(f"Worker {worker_id} reached barrier")
    await barrier.wait()  # block until all workers arrive
    print(f"Worker {worker_id} continuing")

async def main():
    barrier = asyncio.Barrier(3)  # requires Python 3.11+
    await asyncio.gather(*[worker(i, barrier) for i in range(3)])
    print("All workers continuing")

asyncio.run(main())

Barrier use cases:

  • Phased computation: all workers complete phase 1, synchronize, then all proceed to phase 2.
  • Distributed testing: all test processes reach a checkpoint before measuring results.
  • MapReduce-style: all mappers finish before any reducer starts.
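  • barrier.wait() returns a unique index from 0 to parties - 1 for each task in the round. If the barrier is broken or reset while a task is waiting, wait() raises asyncio.BrokenBarrierError instead.
  • barrier.broken is True while the barrier is in the broken state, for example after barrier.abort() is called.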
  • Barriers are reusable — after all parties pass, the counter resets for the next round.
Expected Output
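Worker 0 reached barrier
Worker 1 reached barrier
Worker 2 reached barrier
Worker 0 continuing
Worker 1 continuing
Worker 2 continuing
All workers continuing
(The three "continuing" lines may appear in a different order.)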
Hints

Hint 1: asyncio.Barrier(n) blocks until n coroutines have called barrier.wait(). Then all are released simultaneously.

Hint 2: Available in Python 3.11+.


Hard

#8 Circuit Breaker Pattern (Hard)
Tags: circuit-breaker, asyncio, error-handling, resilience

Implement an async circuit breaker that opens after a failure threshold and resets after a timeout.

import asyncio
import time
from enum import Enum

class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half-open"

class CircuitBreaker:
    def __init__(self, threshold=3, timeout=0.1):
        self._threshold = threshold
        self._timeout = timeout
        self._failures = 0
        self._state = State.CLOSED
        self._opened_at = None

    async def call(self, coro_fn, *args):
        if self._state == State.OPEN:
            elapsed = time.monotonic() - self._opened_at
            if elapsed >= self._timeout:
                self._state = State.HALF_OPEN
            else:
                raise RuntimeError("Circuit is OPEN")

        try:
            result = await coro_fn(*args)
            if self._state == State.HALF_OPEN:
                self._state = State.CLOSED
                self._failures = 0
            return result
        except Exception:
            self._failures += 1
            if self._failures >= self._threshold:
                self._state = State.OPEN
                self._opened_at = time.monotonic()
                print(f"Circuit OPEN after {self._failures} failures")
            raise

async def main():
    cb = CircuitBreaker(threshold=3, timeout=0.05)
    call_count = [0]

    async def flaky_service():
        call_count[0] += 1
        if call_count[0] <= 3:
            raise ConnectionError("service down")
        return "ok"

    # Trigger failures
    for _ in range(3):
        try:
            await cb.call(flaky_service)
        except Exception:
            pass

    # Now circuit is open
    try:
        await cb.call(flaky_service)
    except RuntimeError:
        print("Circuit OPEN after 3 failures")

    # Wait for timeout
    await asyncio.sleep(0.06)
    print("Circuit HALF-OPEN after timeout")

    # Recovery call
    await cb.call(flaky_service)
    print("Success after recovery")

asyncio.run(main())
Solution
import asyncio
import time
from enum import Enum

class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half-open"

class CircuitBreaker:
    def __init__(self, threshold=3, timeout=0.1):
        self._threshold = threshold   # failures before the circuit opens
        self._timeout = timeout       # seconds to stay OPEN before probing
        self._failures = 0
        self._state = State.CLOSED
        self._opened_at = None

    async def call(self, coro_fn, *args):
        if self._state == State.OPEN:
            elapsed = time.monotonic() - self._opened_at
            if elapsed >= self._timeout:
                self._state = State.HALF_OPEN   # allow one probe request
            else:
                raise RuntimeError("Circuit is OPEN")  # fail fast

        try:
            result = await coro_fn(*args)
            if self._state == State.HALF_OPEN:
                self._state = State.CLOSED
                self._failures = 0
            return result
        except Exception:
            self._failures += 1
            if self._failures >= self._threshold:
                self._state = State.OPEN
                self._opened_at = time.monotonic()
                print(f"Circuit OPEN after {self._failures} failures")
            raise

async def main():
    cb = CircuitBreaker(threshold=3, timeout=0.05)
    call_count = [0]

    async def flaky_service():
        call_count[0] += 1
        if call_count[0] <= 3:
            raise ConnectionError("service down")
        return "ok"

    # Trigger failures; the third one opens the circuit
    for _ in range(3):
        try:
            await cb.call(flaky_service)
        except Exception:
            pass

    # The circuit is open, so this call is rejected without reaching the service
    try:
        await cb.call(flaky_service)
    except RuntimeError:
        print("Circuit OPEN after 3 failures")

    # Wait for the timeout to elapse
    await asyncio.sleep(0.06)
    print("Circuit HALF-OPEN after timeout")

    # Recovery call: the probe succeeds and closes the circuit
    await cb.call(flaky_service)
    print("Success after recovery")

asyncio.run(main())

Circuit breaker state machine:

  • CLOSED: normal operation. Failures are counted; at threshold, transition to OPEN.
  • OPEN: reject all requests fast. After timeout, transition to HALF_OPEN.
  • HALF_OPEN: allow one test request. Success → CLOSED (reset failure count). Failure → back to OPEN.
  • Failing fast while OPEN keeps callers from piling up behind a slow or failing downstream service and gives that service room to recover instead of being overwhelmed by retries.
  • Production extensions: per-exception-type failure counting, success threshold in HALF_OPEN before resetting, metrics export, and thread-safe lock if used in multi-threaded context.
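
The HALF_OPEN failure path (probe fails, circuit reopens) is not exercised by the exercise. The sketch below uses a condensed, hypothetical `MiniBreaker` with string states in place of the enum to show it:

```python
import asyncio
import time

class MiniBreaker:  # hypothetical condensed version of the breaker above
    def __init__(self, threshold, timeout):
        self.threshold, self.timeout = threshold, timeout
        self.failures, self.state, self.opened_at = 0, "closed", None

    async def call(self, coro_fn):
        if self.state == "open":
            if time.monotonic() - self.opened_at < self.timeout:
                raise RuntimeError("Circuit is OPEN")
            self.state = "half-open"      # timeout elapsed: allow one probe
        try:
            result = await coro_fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.state, self.opened_at = "open", time.monotonic()
            raise
        if self.state == "half-open":
            self.state, self.failures = "closed", 0
        return result

async def always_fails():
    raise ConnectionError("service down")

async def run_demo():
    cb = MiniBreaker(threshold=2, timeout=0.02)
    states = []
    for _ in range(2):                    # two failures open the circuit
        try:
            await cb.call(always_fails)
        except ConnectionError:
            pass
    states.append(cb.state)
    await asyncio.sleep(0.03)             # wait past the timeout
    try:
        await cb.call(always_fails)       # the probe request fails
    except ConnectionError:
        pass
    states.append(cb.state)               # circuit is open again
    try:
        await cb.call(always_fails)
    except RuntimeError:
        states.append("rejected")         # rejected without calling the service
    return states

states = asyncio.run(run_demo())
print(states)
```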
Expected Output
Circuit OPEN after 3 failures
Circuit OPEN after 3 failures
Circuit HALF-OPEN after timeout
Success after recovery
Hints

Hint 1: Circuit breaker states: CLOSED (normal), OPEN (rejecting requests), HALF-OPEN (testing recovery).

Hint 2: Count failures. After threshold failures, open the circuit. After timeout, move to HALF-OPEN to test.

#9 Read-Write Lock (Hard)
Tags: read-write-lock, asyncio.Condition, concurrent-reads, exclusive-writes

Implement an async read-write lock that allows concurrent readers but exclusive writers.

import asyncio

class AsyncRWLock:
    def __init__(self):
        self._cond = asyncio.Condition()
        self._readers = 0
        self._writing = False

    async def acquire_read(self):
        async with self._cond:
            await self._cond.wait_for(lambda: not self._writing)
            self._readers += 1

    async def release_read(self):
        async with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()

    async def acquire_write(self):
        async with self._cond:
            await self._cond.wait_for(lambda: self._readers == 0 and not self._writing)
            self._writing = True

    async def release_write(self):
        async with self._cond:
            self._writing = False
            self._cond.notify_all()

async def reader_task(lock, reader_id, active_readers):
    await lock.acquire_read()
    active_readers.add(reader_id)
    await asyncio.sleep(0.02)
    active_readers.discard(reader_id)
    await lock.release_read()

async def writer_task(lock, writer_id, active_readers):
    await lock.acquire_write()
    assert len(active_readers) == 0, "Writer has exclusive access"
    await asyncio.sleep(0.01)
    await lock.release_write()

async def main():
    lock = AsyncRWLock()
    active_readers = set()
    await asyncio.gather(
        reader_task(lock, 1, active_readers),
        reader_task(lock, 2, active_readers),
        reader_task(lock, 3, active_readers),
        writer_task(lock, 1, active_readers),
    )
    print("Multiple concurrent readers; writer gets exclusive access")

asyncio.run(main())
Solution
import asyncio

class AsyncRWLock:
    def __init__(self):
        self._cond = asyncio.Condition()
        self._readers = 0        # number of readers currently holding the lock
        self._writing = False    # True while a writer holds the lock

    async def acquire_read(self):
        async with self._cond:
            await self._cond.wait_for(lambda: not self._writing)
            self._readers += 1

    async def release_read(self):
        async with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()  # a waiting writer may proceed now

    async def acquire_write(self):
        async with self._cond:
            await self._cond.wait_for(lambda: self._readers == 0 and not self._writing)
            self._writing = True

    async def release_write(self):
        async with self._cond:
            self._writing = False
            self._cond.notify_all()  # wake readers and writers to re-check

async def main():
    lock = AsyncRWLock()
    active_readers = set()

    async def reader_task(reader_id):
        await lock.acquire_read()
        active_readers.add(reader_id)
        await asyncio.sleep(0.02)
        active_readers.discard(reader_id)
        await lock.release_read()

    async def writer_task(writer_id):
        await lock.acquire_write()
        assert len(active_readers) == 0
        await asyncio.sleep(0.01)
        await lock.release_write()

    await asyncio.gather(
        reader_task(1), reader_task(2), reader_task(3),
        writer_task(1),
    )
    print("Multiple concurrent readers; writer gets exclusive access")

asyncio.run(main())

Read-write lock design:

  • Readers can proceed concurrently as long as no writer holds the lock.
  • A writer must wait until all readers finish and no other writer is active.
  • notify_all() after reader release and writer release wakes all waiters to re-check the predicate.
  • This implementation is "readers-preferring" — writers can starve if readers arrive continuously. A "writers-preferring" variant tracks waiting writers and blocks new readers when a writer is waiting.
  • Real use case: read-heavy cache with occasional invalidation writes.
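
One possible shape for the "writers-preferring" variant mentioned above is sketched here. The class name `WriterPreferringRWLock` and the `_waiting_writers` counter are assumptions for illustration, not part of the exercise:

```python
import asyncio

class WriterPreferringRWLock:  # hypothetical name
    def __init__(self):
        self._cond = asyncio.Condition()
        self._readers = 0
        self._writing = False
        self._waiting_writers = 0

    async def acquire_read(self):
        async with self._cond:
            # New readers also wait while any writer is queued
            await self._cond.wait_for(
                lambda: not self._writing and self._waiting_writers == 0
            )
            self._readers += 1

    async def release_read(self):
        async with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()

    async def acquire_write(self):
        async with self._cond:
            self._waiting_writers += 1
            try:
                await self._cond.wait_for(
                    lambda: self._readers == 0 and not self._writing
                )
            finally:
                self._waiting_writers -= 1
                # Lets blocked readers re-check, which matters if this
                # writer was cancelled while waiting
                self._cond.notify_all()
            self._writing = True

    async def release_write(self):
        async with self._cond:
            self._writing = False
            self._cond.notify_all()

async def run_demo():
    lock = WriterPreferringRWLock()
    order = []

    async def reader(name, hold):
        await lock.acquire_read()
        order.append(f"{name} in")
        await asyncio.sleep(hold)
        await lock.release_read()

    async def writer():
        await lock.acquire_write()
        order.append("writer in")
        await lock.release_write()

    r1 = asyncio.create_task(reader("r1", 0.02))
    await asyncio.sleep(0.005)            # r1 now holds the read lock
    w = asyncio.create_task(writer())
    await asyncio.sleep(0.005)            # give the writer time to queue
    r2 = asyncio.create_task(reader("r2", 0))
    await asyncio.gather(r1, w, r2)
    return order

order = asyncio.run(run_demo())
print(order)
```

The late reader r2 enters only after the writer, whereas the readers-preferring lock from the exercise would let r2 join r1 immediately. The cost is that a steady stream of writers can now starve readers.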
Expected Output
Multiple concurrent readers; writer gets exclusive access
Hints

Hint 1: Multiple readers can hold the lock simultaneously. A writer needs exclusive access (no readers or other writers).

Hint 2: Use a Condition and a reader count. Writers wait until reader count is 0.

#10 Async Priority Queue (Hard)
Tags: priority-queue, asyncio.Condition, heapq, scheduling

Implement an async priority queue where higher-priority items are dequeued first.

import asyncio
import heapq

class AsyncPriorityQueue:
    def __init__(self):
        self._heap = []
        self._counter = 0
        self._cond = asyncio.Condition()

    async def put(self, priority, item):
        async with self._cond:
            heapq.heappush(self._heap, (priority, self._counter, item))
            self._counter += 1
            self._cond.notify()

    async def get(self):
        async with self._cond:
            await self._cond.wait_for(lambda: len(self._heap) > 0)
            _, _, item = heapq.heappop(self._heap)
            return item

async def main():
    pq = AsyncPriorityQueue()

    async def producer():
        await pq.put(3, "LOW")
        await pq.put(1, "CRITICAL")
        await pq.put(2, "HIGH")
        await pq.put(3, "NORMAL")

    async def consumer():
        results = []
        for _ in range(4):
            item = await pq.get()
            results.append(item)
        print(f"Items dequeued by priority: {', '.join(results)}")

    await asyncio.gather(producer(), consumer())

asyncio.run(main())
Solution
import asyncio
import heapq

class AsyncPriorityQueue:
    def __init__(self):
        self._heap = []
        self._counter = 0   # insertion sequence, used as a tie-breaker
        self._cond = asyncio.Condition()

    async def put(self, priority, item):
        async with self._cond:
            heapq.heappush(self._heap, (priority, self._counter, item))
            self._counter += 1
            self._cond.notify()  # wake one waiting consumer

    async def get(self):
        async with self._cond:
            await self._cond.wait_for(lambda: len(self._heap) > 0)
            _, _, item = heapq.heappop(self._heap)
            return item

async def main():
    pq = AsyncPriorityQueue()

    async def producer():
        await pq.put(3, "LOW")
        await pq.put(1, "CRITICAL")
        await pq.put(2, "HIGH")
        await pq.put(3, "NORMAL")

    async def consumer():
        results = []
        for _ in range(4):
            item = await pq.get()
            results.append(item)
        print(f"Items dequeued by priority: {', '.join(results)}")

    await asyncio.gather(producer(), consumer())

asyncio.run(main())

Priority queue design:

  • (priority, counter, item) tuples ensure that items with equal priority are dequeued FIFO (insertion order).
  • heapq is a min-heap — lower numeric priority values are dequeued first. Use negative values for "max priority = max number" convention.
  • The Condition provides both the mutual exclusion (for heap access) and the wait/notify mechanism (for blocking consumers).
  • notify() wakes one consumer. Use notify_all() if multiple consumers might be waiting.
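  • asyncio.PriorityQueue in the stdlib provides the same ordering: put (priority, data) tuples and the lowest entry is retrieved first. It does not add a tie-breaking counter for you, so entries with equal priority fall back to comparing the data itself.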
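
For comparison, here is a sketch of the same ordering using the stdlib asyncio.PriorityQueue, with an itertools.count() sequence number added by hand as the tie-breaker. The item names mirror the exercise:

```python
import asyncio
import itertools

async def run_demo():
    pq = asyncio.PriorityQueue()
    counter = itertools.count()   # tie-breaker so equal priorities stay FIFO
    for priority, item in [(3, "LOW"), (1, "CRITICAL"), (2, "HIGH"), (3, "NORMAL")]:
        pq.put_nowait((priority, next(counter), item))
    results = []
    while not pq.empty():
        _, _, item = await pq.get()
        results.append(item)
    return results

order = asyncio.run(run_demo())
print(order)  # ['CRITICAL', 'HIGH', 'LOW', 'NORMAL']
```

"LOW" and "NORMAL" share priority 3, so the counter decides between them and "LOW", inserted first, comes out first.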
Expected Output
Items dequeued by priority: CRITICAL, HIGH, LOW, NORMAL
(LOW and NORMAL share priority 3, so they are dequeued in insertion order.)
Hints

Hint 1: Use heapq internally. Wrap items as (priority, sequence, value) tuples for stable ordering.

Hint 2: Use a Condition to notify waiting consumers when an item is added.

#11 Async Resource Pool with Health Checks (Hard)
Tags: resource-pool, health-check, asyncio.Semaphore, asyncio.Queue

Build a production-grade async resource pool with health checking and automatic eviction of stale resources.

import asyncio
import time

class Resource:
    def __init__(self, resource_id):
        self.id = resource_id
        self.created_at = time.monotonic()
        self.healthy = True

    def check_health(self):
        # Resources expire after 0.1s in this demo
        return time.monotonic() - self.created_at < 0.1

class HealthyPool:
    def __init__(self, size, factory):
        self._size = size
        self._factory = factory
        self._available = asyncio.Queue()
        self._sem = asyncio.Semaphore(size)
        for i in range(size):
            self._available.put_nowait(factory(i))

    async def checkout(self):
        await self._sem.acquire()
        resource = await self._available.get()
        if not resource.check_health():
            resource = self._factory(resource.id)
        return resource

    async def checkin(self, resource):
        await self._available.put(resource)
        self._sem.release()

async def main():
    pool = HealthyPool(size=2, factory=Resource)

    async def worker(worker_id):
        resource = await pool.checkout()
        await asyncio.sleep(0.05)  # use resource for 50ms
        await pool.checkin(resource)
        return f"worker_{worker_id}_done"

    results = await asyncio.gather(*[worker(i) for i in range(4)])
    print(f"Pool managed {len(results)} requests with health checks")

asyncio.run(main())
Solution
import asyncio
import time

class Resource:
    def __init__(self, resource_id):
        self.id = resource_id
        self.created_at = time.monotonic()
        self.healthy = True

    def check_health(self):
        # Resources expire after 0.1s in this demo
        return time.monotonic() - self.created_at < 0.1

class HealthyPool:
    def __init__(self, size, factory):
        self._size = size
        self._factory = factory
        self._available = asyncio.Queue()       # idle resources
        self._sem = asyncio.Semaphore(size)     # bounds concurrent checkouts
        for i in range(size):
            self._available.put_nowait(factory(i))

    async def checkout(self):
        await self._sem.acquire()
        resource = await self._available.get()
        if not resource.check_health():
            # Evict the stale resource and hand out a fresh one with the same id
            resource = self._factory(resource.id)
        return resource

    async def checkin(self, resource):
        await self._available.put(resource)
        self._sem.release()

async def main():
    pool = HealthyPool(size=2, factory=Resource)

    async def worker(worker_id):
        resource = await pool.checkout()
        await asyncio.sleep(0.05)  # use resource for 50ms
        await pool.checkin(resource)
        return f"worker_{worker_id}_done"

    results = await asyncio.gather(*[worker(i) for i in range(4)])
    print(f"Pool managed {len(results)} requests with health checks")

asyncio.run(main())

Healthy pool design:

  • Semaphore bounds total checkouts; queue holds available resources.
  • Health check on checkout: evict stale resources before returning them to the caller.
  • Production extensions: async health check (e.g., await conn.ping()), background eviction loop, min/max pool sizes, connection retry on eviction, metrics (pool size, wait time, eviction rate).
  • The checkin always returns the resource to the pool — even unhealthy ones (the next checkout evicts them). Alternatively, close unhealthy resources immediately and create a fresh one.
  • Thread safety: this implementation is safe for asyncio (single-threaded) but not for multi-threaded access.
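
The alternative of replacing unhealthy resources at checkin rather than at the next checkout might be sketched as below. `EagerEvictPool`, the `ttl` parameter, the `_fresh` helper, and the `evictions` counter are hypothetical; the sketch also drops the semaphore, relying on the queue alone to bound checkouts:

```python
import asyncio
import time

class Resource:
    def __init__(self, resource_id, ttl=0.02):   # ttl is a demo assumption
        self.id = resource_id
        self.created_at = time.monotonic()
        self.ttl = ttl

    def check_health(self):
        return time.monotonic() - self.created_at < self.ttl

class EagerEvictPool:  # hypothetical variant of HealthyPool
    def __init__(self, size, factory):
        self._factory = factory
        self._available = asyncio.Queue()
        self.evictions = 0
        for i in range(size):
            self._available.put_nowait(factory(i))

    def _fresh(self, resource):
        # Replace a stale resource with a new one carrying the same id
        if resource.check_health():
            return resource
        self.evictions += 1
        return self._factory(resource.id)

    async def checkout(self):
        # Still checked here: a resource can go stale while sitting idle
        return self._fresh(await self._available.get())

    async def checkin(self, resource):
        # Stale resources are replaced before they re-enter the pool
        await self._available.put(self._fresh(resource))

async def run_demo():
    pool = EagerEvictPool(size=1, factory=Resource)
    first = await pool.checkout()
    await asyncio.sleep(0.03)          # hold it past its 0.02s lifetime
    await pool.checkin(first)          # replaced at checkin
    second = await pool.checkout()
    return first is second, second.check_health(), pool.evictions

same_object, healthy, evictions = asyncio.run(run_demo())
print(same_object, healthy, evictions)
```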
Expected Output
Pool managed 4 requests with health checks
Hints

Hint 1: Track each resource's last use time. On checkout, validate health; evict and replace if stale.

Hint 2: Use a background task to periodically check all idle resources.

© 2026 EngineersOfAI. All rights reserved.