Skip to main content

Python Locks, Semaphores, Practice Problems & Exercises

Practice: Locks, Semaphores, and Synchronization

11 problems4 Easy4 Medium3 Hard55–75 min
← Back to lesson

#1Basic Lock UsageEasy
Lockacquirereleasecontext manager

Implement a thread-safe bank account with deposit, withdraw, and balance operations protected by a Lock.

Solution:

import threading

class BankAccount:
def __init__(self, initial=0):
self.balance = initial
self._lock = threading.Lock()

def deposit(self, amount):
with self._lock:
self.balance += amount

def withdraw(self, amount):
with self._lock:
if self.balance < amount:
raise ValueError(f'Insufficient: need {amount}, have {self.balance}')
self.balance -= amount

def get_balance(self):
with self._lock:
return self.balance
import threading

# TODO: Implement a thread-safe bank account using threading.Lock.
# deposit(amount) and withdraw(amount) must be atomic.
# get_balance() reads atomically.

class BankAccount:
  def __init__(self, initial=0):
      self.balance = initial
      # Add lock here

  def deposit(self, amount):
      pass

  def withdraw(self, amount):
      # Raise ValueError if insufficient funds
      pass

  def get_balance(self):
      pass

def test_account():
  account = BankAccount(1000)
  errors = []

  def worker():
      try:
          account.deposit(100)
          account.withdraw(50)
      except ValueError as e:
          errors.append(str(e))

  threads = [threading.Thread(target=worker) for _ in range(10)]
  for t in threads: t.start()
  for t in threads: t.join()

  return account.get_balance(), len(errors)

balance, err_count = test_account()
print(f'Balance: {balance}')  # 1000 + 10*100 - 10*50 = 1500
print(f'Errors: {err_count}')
Expected Output
Balance: 1500
Errors: 0
Hints

Hint 1: self._lock = threading.Lock() in __init__.

Hint 2: Use 'with self._lock:' as a context manager — automatically acquires and releases.

Hint 3: Never hold a lock longer than necessary — only wrap the critical section.


#2RLock — Reentrant LockEasy
RLockreentrantnested locking

Use threading.RLock to safely handle nested/recursive locking without deadlock.

Solution:

import threading

class TreeNode:
def __init__(self, value):
self.value = value
self.children = []
self._lock = threading.RLock()

def add_child(self, node):
with self._lock:
self.children.append(node)

def get_subtree_sum(self):
with self._lock:
total = self.value
for child in self.children:
total += child.get_subtree_sum()
return total

def test_rlock():
root = TreeNode(1)
root.children = [TreeNode(2), TreeNode(3)]
root.children[0].children = [TreeNode(4), TreeNode(5)]
return root.get_subtree_sum()

print(test_rlock())
import threading

# A regular Lock deadlocks if the same thread tries to acquire it twice.
# RLock (Reentrant Lock) allows a thread to acquire the same lock multiple times.

class TreeNode:
  def __init__(self, value):
      self.value = value
      self.children = []
      # TODO: Use RLock so nested method calls don't deadlock

  def add_child(self, node):
      # Acquires lock, then calls a method that also acquires the lock
      pass

  def get_subtree_sum(self):
      # Acquires lock, calls itself recursively (needs RLock)
      pass

def test_rlock():
  root = TreeNode(1)
  root.children = [TreeNode(2), TreeNode(3)]
  root.children[0].children = [TreeNode(4), TreeNode(5)]

  # This would deadlock with a regular Lock
  total = root.get_subtree_sum()
  return total

print(test_rlock())  # 1+2+3+4+5 = 15
Expected Output
15
Hints

Hint 1: self._lock = threading.RLock() — same interface as Lock but reentrant.

Hint 2: The same thread can call acquire() multiple times; each must have a matching release().

Hint 3: Using 'with self._lock:' handles both acquire and release correctly.


#3Semaphore — Resource PoolEasy
Semaphoreresource poolbounded

Use a Semaphore to limit concurrent access to a shared resource pool.

Solution:

def test_semaphore():
sem = threading.Semaphore(MAX_CONNECTIONS)
threads = [
threading.Thread(target=use_connection, args=(sem, i))
for i in range(10)
]
for t in threads: t.start()
for t in threads: t.join()
import threading
import time

# Semaphore limits concurrent access to N units of a resource.
# Classic use: connection pool — at most N connections open simultaneously.

MAX_CONNECTIONS = 3
active = [0]
max_active = [0]
lock = threading.Lock()

def use_connection(sem, task_id):
  with sem:
      with lock:
          active[0] += 1
          max_active[0] = max(max_active[0], active[0])
      time.sleep(0.05)
      with lock:
          active[0] -= 1

# TODO: Create a Semaphore(3) and run 10 tasks.
# Verify max_active never exceeds 3.

def test_semaphore():
  # Create semaphore and run 10 tasks
  pass

test_semaphore()
print(f'Max concurrent: {max_active[0]}')
print(f'Within limit: {max_active[0] <= MAX_CONNECTIONS}')
Expected Output
Max concurrent: 3
Within limit: True
Hints

Hint 1: sem = threading.Semaphore(3) allows up to 3 concurrent acquisitions.

Hint 2: Create 10 threads passing sem and task_id, start and join all.

Hint 3: The Semaphore blocks any thread beyond the 3rd until one is released.


#4Event — One-Shot SignalEasy
Eventsetwaitis_set

Use threading.Event to coordinate multiple worker threads with a one-shot start signal.

Solution:

def test_event():
results.clear()
start_event.clear()
threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
t.start()
time.sleep(0.1)
start_event.set()
for t in threads:
t.join()
return results
import threading
import time

# threading.Event allows one thread to signal others.
# Multiple threads can wait for a single event to fire.

start_event = threading.Event()
results = []
lock = threading.Lock()

def worker(worker_id):
  start_event.wait()  # Block until event is set
  with lock:
      results.append(worker_id)

# TODO: Start 5 workers that wait on start_event.
# After a 0.1s delay, set the event to release all workers.
# Return the results (all 5 worker_ids should be present).

def test_event():
  results.clear()
  # Start workers, sleep, set event, join, return results
  pass

workers_ran = test_event()
print(f'Workers ran: {len(workers_ran)}')
print(f'All ids present: {sorted(workers_ran) == list(range(5))}')
Expected Output
Workers ran: 5
All ids present: True
Hints

Hint 1: event.wait() blocks until event.set() is called.

Hint 2: event.set() releases ALL waiting threads simultaneously.

Hint 3: event.clear() resets it to unset state for reuse.


#5Condition Variable — Producer/ConsumerMedium
Conditionwaitnotifynotify_all

Implement a bounded buffer using Condition variables for producer-consumer synchronization.

Solution:

class BoundedBuffer:
def __init__(self, capacity):
self.buffer = []
self.capacity = capacity
self.cond = threading.Condition()

def put(self, item):
with self.cond:
while len(self.buffer) >= self.capacity:
self.cond.wait()
self.buffer.append(item)
self.cond.notify()

def get(self):
with self.cond:
while len(self.buffer) == 0:
self.cond.wait()
item = self.buffer.pop(0)
self.cond.notify()
return item
import threading
import time

# Condition variables allow threads to wait for a condition to become true.

class BoundedBuffer:
  def __init__(self, capacity):
      self.buffer = []
      self.capacity = capacity
      self.cond = threading.Condition()

  def put(self, item):
      # Block if buffer is full
      pass

  def get(self):
      # Block if buffer is empty
      pass

def test_bounded_buffer():
  buf = BoundedBuffer(3)
  produced = []
  consumed = []

  def producer():
      for i in range(8):
          buf.put(i)
          produced.append(i)
          time.sleep(0.01)

  def consumer():
      for _ in range(8):
          item = buf.get()
          consumed.append(item)

  p = threading.Thread(target=producer)
  c = threading.Thread(target=consumer)
  p.start(); c.start()
  p.join(); c.join()

  return sorted(produced), sorted(consumed)

produced, consumed = test_bounded_buffer()
print(f'Produced: {produced}')
print(f'Consumed: {consumed}')
print(f'Match: {produced == consumed}')
Expected Output
Produced: [0, 1, 2, 3, 4, 5, 6, 7]
Consumed: [0, 1, 2, 3, 4, 5, 6, 7]
Match: True
Hints

Hint 1: with self.cond: while len(self.buffer) >= self.capacity: self.cond.wait()

Hint 2: After putting an item: self.cond.notify() wakes one waiting consumer.

Hint 3: get(): wait while buffer is empty, pop an item, notify a waiting producer.


#6Barrier — Phased ComputationMedium
Barrierwaitphasessynchronization point

Use threading.Barrier to synchronize phases of a parallel computation, ensuring all workers complete phase 1 before any begin phase 2.

Solution:

def test_barrier():
phase_log.clear()
threads = [threading.Thread(target=worker, args=(i,)) for i in range(NTHREADS)]
for t in threads: t.start()
for t in threads: t.join()
return phase_log
import threading
import time

# Barrier synchronizes N threads at a checkpoint.
# All threads must reach the barrier before any can proceed.

NTHREADS = 4
barrier = threading.Barrier(NTHREADS)
phase_log = []
lock = threading.Lock()

def worker(worker_id):
  # Phase 1: parallel work
  time.sleep(worker_id * 0.01)
  with lock:
      phase_log.append(f'w{worker_id}_phase1')

  barrier.wait()  # Wait for all workers to finish phase 1

  # Phase 2: all workers proceed together
  with lock:
      phase_log.append(f'w{worker_id}_phase2')

# TODO: Start 4 worker threads and join them.
# Verify all phase1 entries appear before any phase2 entries.

def test_barrier():
  phase_log.clear()
  threads = [threading.Thread(target=worker, args=(i,)) for i in range(NTHREADS)]
  for t in threads: t.start()
  for t in threads: t.join()
  return phase_log

log = test_barrier()
phase1_done = [e for e in log if 'phase1' in e]
phase2_done = [e for e in log if 'phase2' in e]
print(f'Phase 1 count: {len(phase1_done)}')
print(f'Phase 2 count: {len(phase2_done)}')
# All phase1 should precede all phase2 in the log
last_phase1_idx = max(log.index(e) for e in phase1_done)
first_phase2_idx = min(log.index(e) for e in phase2_done)
print(f'Phase order correct: {last_phase1_idx < first_phase2_idx}')
Expected Output
Phase 1 count: 4
Phase 2 count: 4
Phase order correct: True
Hints

Hint 1: barrier.wait() blocks the calling thread until all N threads have called wait().

Hint 2: Once the last thread calls wait(), all are released simultaneously.

Hint 3: This guarantees phase1 of ALL workers completes before ANY worker enters phase2.


#7BoundedSemaphore — Prevent Over-ReleaseMedium
BoundedSemaphoreover-releasesafety

Use BoundedSemaphore to implement a resource pool with protection against double-release bugs.

Solution:

The ResourcePool implementation above is the solution — it uses BoundedSemaphore for the acquire/release logic, catching ValueError on over-release.

# Key insight: BoundedSemaphore raises ValueError on over-release,
# which Semaphore silently allows. For resource pools, BoundedSemaphore
# is the correct choice as it enforces the invariant.
import threading

# BoundedSemaphore raises ValueError if released more times than acquired.
# This catches bugs where release() is called without acquire().

def test_bounded_vs_unbounded():
  # Regular Semaphore — can be released more times than acquired
  sem = threading.Semaphore(1)
  sem.release()  # Over-release — goes to 2, no error!
  sem.release()  # Goes to 3
  print(f'Semaphore value after 2 extra releases: >= 1 (no error)')

  # BoundedSemaphore — raises ValueError on over-release
  bsem = threading.BoundedSemaphore(1)
  try:
      bsem.release()  # This should raise ValueError
      print('No error raised (unexpected)')
  except ValueError as e:
      print(f'BoundedSemaphore caught: {e}')

# TODO: Create a resource pool using BoundedSemaphore.
# The pool has N=2 slots.
# Simulate acquiring and releasing resources.
# Attempt an illegal extra release and catch it.

class ResourcePool:
  def __init__(self, n):
      self._sem = threading.BoundedSemaphore(n)
      self._n = n

  def acquire(self):
      return self._sem.acquire(blocking=False)

  def release(self):
      try:
          self._sem.release()
          return True
      except ValueError:
          return False  # Already at max

pool = ResourcePool(2)
print('Acquire 1:', pool.acquire())
print('Acquire 2:', pool.acquire())
print('Acquire 3 (should fail):', pool.acquire())
print('Release 1:', pool.release())
print('Release 2:', pool.release())
print('Release 3 (over-release):', pool.release())
Expected Output
Acquire 1: True
Acquire 2: True
Acquire 3 (should fail): False
Release 1: True
Release 2: True
Release 3 (over-release): False
Hints

Hint 1: BoundedSemaphore(n).release() raises ValueError if it would exceed n.

Hint 2: acquire(blocking=False) returns False immediately if no slot is available.

Hint 3: This is safer than Semaphore for resource pools — catches double-release bugs.


#8Timed Lock AcquisitionMedium
Lockacquire(timeout)deadlock avoidance

Use lock.acquire(timeout=...) to prevent deadlock when acquiring multiple locks, with retry logic.

Solution:

import random

def grab_both_locks_safe(name, results, max_retries=10):
for _ in range(max_retries):
got_a = lock_a.acquire(timeout=0.05)
if not got_a:
time.sleep(random.uniform(0, 0.01))
continue
got_b = lock_b.acquire(timeout=0.05)
if not got_b:
lock_a.release()
time.sleep(random.uniform(0, 0.01))
continue
results.append(name)
lock_b.release()
lock_a.release()
return
import threading
import time

# TODO: Implement a deadlock-safe resource grabber.
# Two threads each need both resources (lock_a and lock_b).
# Without timeout, they can deadlock.
# With acquire(timeout=...), detect the failure and retry.

lock_a = threading.Lock()
lock_b = threading.Lock()

def grab_both_locks_unsafe(name, results):
  # This can deadlock: thread1 holds a, waits for b;
  # thread2 holds b, waits for a.
  lock_a.acquire()
  time.sleep(0.05)
  lock_b.acquire()
  results.append(name)
  lock_b.release()
  lock_a.release()

def grab_both_locks_safe(name, results, max_retries=10):
  # TODO: Use acquire(timeout=0.01) to avoid deadlock.
  # Retry if acquisition fails.
  pass

def test_safe():
  results = []
  t1 = threading.Thread(target=grab_both_locks_safe, args=('Alice', results))
  t2 = threading.Thread(target=grab_both_locks_safe, args=('Bob', results))
  t1.start(); t2.start()
  t1.join(timeout=5); t2.join(timeout=5)
  return sorted(results)

result = test_safe()
print(f'Both completed: {result}')
Expected Output
Both completed: ['Alice', 'Bob']
Hints

Hint 1: lock.acquire(timeout=0.1) returns True on success, False on timeout.

Hint 2: If acquire_b fails, release lock_a and retry from the beginning.

Hint 3: Add random jitter (time.sleep(random.uniform(0, 0.02))) to avoid livelock.


#9Read-Write Lock with PriorityHard
RWLockwriter prioritystarvationfairness

Build a writer-priority RWLock that prevents writer starvation while maintaining read-write exclusion.

Solution:

class WriterPriorityRWLock:
def __init__(self):
self._lock = threading.Lock()
self._read_ready = threading.Condition(self._lock)
self._readers = 0
self._writers_waiting = 0
self._writing = False

def acquire_read(self):
with self._lock:
while self._writing or self._writers_waiting > 0:
self._read_ready.wait()
self._readers += 1

def release_read(self):
with self._lock:
self._readers -= 1
if self._readers == 0:
self._read_ready.notify_all()

def acquire_write(self):
with self._lock:
self._writers_waiting += 1
while self._readers > 0 or self._writing:
self._read_ready.wait()
self._writers_waiting -= 1
self._writing = True

def release_write(self):
with self._lock:
self._writing = False
self._read_ready.notify_all()
import threading
import time

# A simple RWLock can starve writers if readers keep arriving.
# This problem builds a writer-priority RWLock.

class WriterPriorityRWLock:
  def __init__(self):
      self._lock = threading.Lock()
      self._read_ready = threading.Condition(self._lock)
      self._readers = 0
      self._writers_waiting = 0
      self._writing = False

  def acquire_read(self):
      with self._lock:
          while self._writing or self._writers_waiting > 0:
              self._read_ready.wait()
          self._readers += 1

  def release_read(self):
      with self._lock:
          self._readers -= 1
          if self._readers == 0:
              self._read_ready.notify_all()

  def acquire_write(self):
      pass

  def release_write(self):
      pass

def test_writer_priority():
  rw = WriterPriorityRWLock()
  log = []
  lock = threading.Lock()

  def reader(rid):
      rw.acquire_read()
      with lock:
          log.append(f'R{rid}_start')
      time.sleep(0.02)
      with lock:
          log.append(f'R{rid}_end')
      rw.release_read()

  def writer(wid):
      rw.acquire_write()
      with lock:
          log.append(f'W{wid}_start')
      time.sleep(0.02)
      with lock:
          log.append(f'W{wid}_end')
      rw.release_write()

  threads = [
      threading.Thread(target=reader, args=(0,)),
      threading.Thread(target=reader, args=(1,)),
      threading.Thread(target=writer, args=(0,)),
  ]
  for t in threads: t.start()
  for t in threads: t.join()
  return log

log = test_writer_priority()
print('Log:', log)
print('No concurrent writes and reads:', True)
Expected Output
Log: [...]
No concurrent writes and reads: True
Hints

Hint 1: acquire_write(): increment writers_waiting, wait while readers > 0 or writing, decrement writers_waiting, set writing = True.

Hint 2: release_write(): set writing = False, notify_all() to wake waiting readers and writers.

Hint 3: Writers waiting block new readers (writers_waiting > 0 check in acquire_read).


#10Recursive Mutex ImplementationHard
recursive mutexthread identityreentrant

Implement a recursive mutex from scratch that allows the owning thread to re-acquire without deadlocking.

Solution:

class RecursiveMutex:
def __init__(self):
self._lock = threading.Condition()
self._owner = None
self._count = 0

def acquire(self):
current = threading.current_thread()
with self._lock:
if self._owner == current:
self._count += 1
return
while self._owner is not None:
self._lock.wait()
self._owner = current
self._count = 1

def release(self):
current = threading.current_thread()
with self._lock:
if self._owner != current:
raise RuntimeError("release() called by non-owner")
self._count -= 1
if self._count == 0:
self._owner = None
self._lock.notify()

def __enter__(self):
self.acquire()
return self

def __exit__(self, *args):
self.release()
import threading

# Build a recursive mutex from scratch using basic primitives.
# It tracks the owning thread and an acquisition count.

class RecursiveMutex:
  def __init__(self):
      self._lock = threading.Lock()
      self._owner = None
      self._count = 0
      self._wait_lock = threading.Condition(threading.Lock())

  def acquire(self):
      pass

  def release(self):
      pass

  def __enter__(self):
      self.acquire()
      return self

  def __exit__(self, *args):
      self.release()

def test_recursive_mutex():
  mutex = RecursiveMutex()
  results = []

  def recursive_work(depth):
      with mutex:
          results.append(f'enter_{depth}')
          if depth > 0:
              recursive_work(depth - 1)
          results.append(f'exit_{depth}')

  recursive_work(3)
  return results

log = test_recursive_mutex()
print(log)
Expected Output
['enter_3', 'enter_2', 'enter_1', 'enter_0', 'exit_0', 'exit_1', 'exit_2', 'exit_3']
Hints

Hint 1: If current thread == owner, increment count and return immediately.

Hint 2: Otherwise, wait until _owner is None, then set _owner = current_thread and count = 1.

Hint 3: release(): decrement count; if count == 0, set _owner = None and notify.


#11Semaphore-Based Rate LimiterHard
Semaphorerate limitertoken bucketthreading.Timer

Build a thread-safe rate limiter using a Semaphore and a background refill thread (token bucket algorithm).

Solution:

import threading
import time

class SemaphoreRateLimiter:
def __init__(self, rate, burst):
self.rate = rate
self.burst = burst
self._sem = threading.Semaphore(burst)
self._running = True
self._refill_thread = None
self._lock = threading.Lock()
self._available = burst

def _refill(self):
interval = 1.0 / self.rate
while self._running:
time.sleep(interval)
with self._lock:
if self._available < self.burst:
self._available += 1
self._sem.release()

def start(self):
self._refill_thread = threading.Thread(target=self._refill, daemon=True)
self._refill_thread.start()

def acquire(self, timeout=5.0):
got = self._sem.acquire(timeout=timeout)
if got:
with self._lock:
self._available = max(0, self._available - 1)
return got

def stop(self):
self._running = False
import threading
import time

# Build a rate limiter using Semaphore + Timer.
# Tokens are consumed on each request and refilled at a fixed rate.

class SemaphoreRateLimiter:
  def __init__(self, rate, burst):
      self.rate = rate    # tokens per second
      self.burst = burst  # max tokens (bucket size)
      self._sem = threading.Semaphore(burst)
      self._running = True
      self._refill_thread = None

  def start(self):
      # TODO: Start a background thread that refills tokens at self.rate/sec
      pass

  def acquire(self, timeout=5.0):
      return self._sem.acquire(timeout=timeout)

  def stop(self):
      self._running = False
      if self._refill_thread:
          self._refill_thread.join()

def test_rate_limiter():
  limiter = SemaphoreRateLimiter(rate=5, burst=5)  # 5 requests/sec, burst of 5
  limiter.start()

  times = []
  for _ in range(10):
      if limiter.acquire():
          times.append(time.perf_counter())

  limiter.stop()

  if len(times) >= 2:
      gaps = [times[i+1] - times[i] for i in range(len(times)-1)]
      avg_gap = sum(gaps) / len(gaps)
      return len(times), avg_gap
  return len(times), 0

count, avg_gap = test_rate_limiter()
print(f'Requests processed: {count}')
print(f'Rate controlled: {avg_gap >= 0.1}')  # At ~5/sec, gap >= 0.1s after burst
Expected Output
Requests processed: 10
Rate controlled: True
Hints

Hint 1: The refill thread runs a loop: time.sleep(1.0/rate); sem.release() (unless already at burst).

Hint 2: Use try/except ValueError to handle BoundedSemaphore over-release when already full.

Hint 3: The burst allows initial fast requests; the refill rate controls steady-state throughput.

© 2026 EngineersOfAI. All rights reserved.