Python Locks, Semaphores, Practice Problems & Exercises
Practice: Locks, Semaphores, and Synchronization
← Back to lesson
Implement a thread-safe bank account with deposit, withdraw, and balance operations protected by a Lock.
Solution:
import threading
class BankAccount:
    """Bank account whose balance is guarded by a single ``threading.Lock``.

    Every read and write of ``balance`` happens while the lock is held, so
    concurrent deposits and withdrawals cannot interleave mid-update.
    """

    def __init__(self, initial=0):
        self.balance = initial
        self._lock = threading.Lock()

    @staticmethod
    def _check_amount(amount):
        """Reject negative amounts.

        Without this check ``withdraw(-x)`` would credit the account and
        ``deposit(-x)`` could push the balance below zero.
        """
        if amount < 0:
            raise ValueError(f'Amount must be non-negative, got {amount}')

    def deposit(self, amount):
        """Add ``amount`` to the balance.

        Raises:
            ValueError: if ``amount`` is negative.
        """
        self._check_amount(amount)
        with self._lock:
            self.balance += amount

    def withdraw(self, amount):
        """Subtract ``amount`` from the balance.

        Raises:
            ValueError: if ``amount`` is negative or exceeds the balance.
        """
        self._check_amount(amount)
        with self._lock:
            # Check and update under the same lock hold so no other thread
            # can drain the account between the two steps.
            if self.balance < amount:
                raise ValueError(f'Insufficient: need {amount}, have {self.balance}')
            self.balance -= amount

    def get_balance(self):
        """Return the current balance, read while holding the lock."""
        with self._lock:
            return self.balance
import threading
# TODO: Implement a thread-safe bank account using threading.Lock.
# deposit(amount) and withdraw(amount) must be atomic.
# get_balance() reads atomically.
class BankAccount:
def __init__(self, initial=0):
self.balance = initial
# Add lock here
def deposit(self, amount):
pass
def withdraw(self, amount):
# Raise ValueError if insufficient funds
pass
def get_balance(self):
pass
def test_account():
    """Run ten concurrent deposit/withdraw workers against one account.

    Returns:
        ``(final_balance, error_count)`` where ``error_count`` is the number
        of ``ValueError`` messages collected from the workers.
    """
    account = BankAccount(1000)
    failures = []

    def worker():
        # Each worker nets +50: deposit 100, then withdraw 50.
        try:
            account.deposit(100)
            account.withdraw(50)
        except ValueError as exc:
            failures.append(str(exc))

    pool = [threading.Thread(target=worker) for _ in range(10)]
    for thread in pool:
        thread.start()
    for thread in pool:
        thread.join()
    final_balance = account.get_balance()
    return final_balance, len(failures)
balance, err_count = test_account()
print(f'Balance: {balance}') # 1000 + 10*100 - 10*50 = 1500
print(f'Errors: {err_count}')
Expected Output
Balance: 1500
Errors: 0
Hints
Hint 1: self._lock = threading.Lock() in __init__.
Hint 2: Use 'with self._lock:' as a context manager — automatically acquires and releases.
Hint 3: Never hold a lock longer than necessary — only wrap the critical section.
Use threading.RLock to safely handle nested/recursive locking without deadlock.
Solution:
import threading
class TreeNode:
    """Tree node whose value and child list are guarded by a per-node RLock."""

    def __init__(self, value):
        self._lock = threading.RLock()
        self.value = value
        self.children = []

    def add_child(self, node):
        """Append ``node`` to this node's children while holding the lock."""
        with self._lock:
            self.children.append(node)

    def get_subtree_sum(self):
        """Return this node's value plus the subtree sums of all its children.

        This node's lock stays held while the children are visited; each
        child then takes its own lock in turn.
        """
        with self._lock:
            # Start the sum at this node's value so additions happen in the
            # same left-to-right order as an explicit accumulator loop.
            return sum(
                (child.get_subtree_sum() for child in self.children),
                self.value,
            )
def test_rlock():
    """Build a fixed five-node tree (values 1 to 5) and return its total."""
    left, right = TreeNode(2), TreeNode(3)
    left.children = [TreeNode(4), TreeNode(5)]
    root = TreeNode(1)
    root.children = [left, right]
    return root.get_subtree_sum()
print(test_rlock())
import threading
# A regular Lock deadlocks if the same thread tries to acquire it twice.
# RLock (Reentrant Lock) allows a thread to acquire the same lock multiple times.
class TreeNode:
def __init__(self, value):
self.value = value
self.children = []
# TODO: Use RLock so nested method calls don't deadlock
def add_child(self, node):
# Acquires lock, then calls a method that also acquires the lock
pass
def get_subtree_sum(self):
# Acquires lock, calls itself recursively (needs RLock)
pass
def test_rlock():
# Build a fixed five-node tree (values 1 to 5) and return the sum of its values.
root = TreeNode(1)
root.children = [TreeNode(2), TreeNode(3)]
root.children[0].children = [TreeNode(4), TreeNode(5)]
# NOTE(review): each TreeNode is meant to create its own lock in __init__, so
# this traversal takes a different lock per node. A plain Lock would only
# deadlock if a locked method re-entered the SAME node's lock (for example a
# locked method calling another locked method on self) — confirm the intended demo.
total = root.get_subtree_sum()
return total
print(test_rlock()) # 1+2+3+4+5 = 15
Expected Output
15
Hints
Hint 1: self._lock = threading.RLock() — same interface as Lock but reentrant.
Hint 2: The same thread can call acquire() multiple times; each must have a matching release().
Hint 3: Using 'with self._lock:' handles both acquire and release correctly.
Use a Semaphore to limit concurrent access to a shared resource pool.
Solution:
def test_semaphore():
    """Run ten ``use_connection`` tasks gated by one shared semaphore.

    The semaphore is created with ``MAX_CONNECTIONS`` slots; all ten threads
    are started first and then joined.
    """
    gate = threading.Semaphore(MAX_CONNECTIONS)
    tasks = []
    for task_id in range(10):
        tasks.append(
            threading.Thread(target=use_connection, args=(gate, task_id))
        )
    for task in tasks:
        task.start()
    for task in tasks:
        task.join()
import threading
import time
# Semaphore limits concurrent access to N units of a resource.
# Classic use: connection pool — at most N connections open simultaneously.
MAX_CONNECTIONS = 3
active = [0]
max_active = [0]
lock = threading.Lock()
def use_connection(sem, task_id):
    """Hold one slot of ``sem`` for 50 ms while tracking concurrency.

    Updates the module-level ``active`` and ``max_active`` counters under
    ``lock``. ``task_id`` is accepted but not used by the body.
    """
    with sem:
        with lock:
            active[0] += 1
            # Remember the highest number of simultaneous holders seen so far.
            if active[0] > max_active[0]:
                max_active[0] = active[0]
        # Simulated work happens outside `lock` so other tasks can overlap.
        time.sleep(0.05)
        with lock:
            active[0] -= 1
# TODO: Create a Semaphore(3) and run 10 tasks.
# Verify max_active never exceeds 3.
def test_semaphore():
# Create semaphore and run 10 tasks
pass
test_semaphore()
print(f'Max concurrent: {max_active[0]}')
print(f'Within limit: {max_active[0] <= MAX_CONNECTIONS}')
Expected Output
Max concurrent: 3
Within limit: True
Hints
Hint 1: sem = threading.Semaphore(3) allows up to 3 concurrent acquisitions.
Hint 2: Create 10 threads passing sem and task_id, start and join all.
Hint 3: The Semaphore blocks any thread beyond the 3rd until one is released.
Use threading.Event to coordinate multiple worker threads with a one-shot start signal.
Solution:
def test_event():
    """Start five waiting workers, release them with one signal, and join them.

    Returns:
        The module-level ``results`` list after every worker has finished.
    """
    # Reset shared state so the function can be called more than once.
    results.clear()
    start_event.clear()

    workers = []
    for worker_id in range(5):
        workers.append(threading.Thread(target=worker, args=(worker_id,)))
    for thread in workers:
        thread.start()

    # Give the workers time to reach start_event.wait() before signalling.
    time.sleep(0.1)
    start_event.set()

    for thread in workers:
        thread.join()
    return results
import threading
import time
# threading.Event allows one thread to signal others.
# Multiple threads can wait for a single event to fire.
start_event = threading.Event()
results = []
lock = threading.Lock()
def worker(worker_id):
    """Wait for the shared start signal, then record ``worker_id``."""
    # Every worker parks here until some thread calls start_event.set().
    start_event.wait()
    with lock:
        results.append(worker_id)
# TODO: Start 5 workers that wait on start_event.
# After a 0.1s delay, set the event to release all workers.
# Return the results (all 5 worker_ids should be present).
def test_event():
results.clear()
# Start workers, sleep, set event, join, return results
pass
workers_ran = test_event()
print(f'Workers ran: {len(workers_ran)}')
print(f'All ids present: {sorted(workers_ran) == list(range(5))}')
Expected Output
Workers ran: 5
All ids present: True
Hints
Hint 1: event.wait() blocks until event.set() is called.
Hint 2: event.set() releases ALL waiting threads simultaneously.
Hint 3: event.clear() resets it to unset state for reuse.
Implement a bounded buffer using Condition variables for producer-consumer synchronization.
Solution:
class BoundedBuffer:
    """Fixed-capacity FIFO buffer for producer/consumer threads.

    ``put`` blocks while the buffer is full and ``get`` blocks while it is
    empty. Producers and consumers wait on the same ``Condition``.
    """

    def __init__(self, capacity):
        """Create an empty buffer holding at most ``capacity`` items.

        Raises:
            ValueError: if ``capacity`` is below 1 (``put`` could never succeed).
        """
        if capacity < 1:
            raise ValueError(f'capacity must be at least 1, got {capacity}')
        self.buffer = []
        self.capacity = capacity
        self.cond = threading.Condition()

    def put(self, item):
        """Append ``item``, blocking while the buffer is full."""
        with self.cond:
            # Re-check in a loop: a woken thread may find the slot already taken.
            while len(self.buffer) >= self.capacity:
                self.cond.wait()
            self.buffer.append(item)
            # Both producers and consumers wait on this one condition, so a
            # single notify() might wake another producer (which would just go
            # back to sleep) and leave every consumer parked. Wake all waiters
            # and let each re-check its own predicate.
            self.cond.notify_all()

    def get(self):
        """Remove and return the oldest item, blocking while the buffer is empty."""
        with self.cond:
            while not self.buffer:
                self.cond.wait()
            item = self.buffer.pop(0)
            # Same reasoning as in put(): the waiter that must run next may be
            # a producer or a consumer, so wake them all.
            self.cond.notify_all()
            return item
import threading
import time
# Condition variables allow threads to wait for a condition to become true.
class BoundedBuffer:
def __init__(self, capacity):
self.buffer = []
self.capacity = capacity
self.cond = threading.Condition()
def put(self, item):
# Block if buffer is full
pass
def get(self):
# Block if buffer is empty
pass
def test_bounded_buffer():
    """Push eight items through a capacity-3 buffer with one producer and one consumer.

    Returns:
        ``(sorted(produced), sorted(consumed))``.
    """
    buf = BoundedBuffer(3)
    produced, consumed = [], []

    def producer():
        for value in range(8):
            buf.put(value)
            produced.append(value)
            # Short pause after each item.
            time.sleep(0.01)

    def consumer():
        for _ in range(8):
            consumed.append(buf.get())

    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)
    for thread in (producer_thread, consumer_thread):
        thread.start()
    for thread in (producer_thread, consumer_thread):
        thread.join()
    return sorted(produced), sorted(consumed)
produced, consumed = test_bounded_buffer()
print(f'Produced: {produced}')
print(f'Consumed: {consumed}')
print(f'Match: {produced == consumed}')
Expected Output
Produced: [0, 1, 2, 3, 4, 5, 6, 7]
Consumed: [0, 1, 2, 3, 4, 5, 6, 7]
Match: True
Hints
Hint 1: with self.cond: while len(self.buffer) >= self.capacity: self.cond.wait()
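Hint 2: After putting an item, notify the waiters on self.cond. Producers and consumers share this one condition, so with several of each prefer self.cond.notify_all() — a single notify() may wake another producer instead of a consumer.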
Hint 3: get(): wait while buffer is empty, pop an item, notify a waiting producer.
Use threading.Barrier to synchronize phases of a parallel computation, ensuring all workers complete phase 1 before any begin phase 2.
Solution:
def test_barrier():
    """Run ``NTHREADS`` barrier workers to completion and return the phase log."""
    # Start from an empty log so repeated calls do not accumulate entries.
    phase_log.clear()
    crew = []
    for worker_id in range(NTHREADS):
        crew.append(threading.Thread(target=worker, args=(worker_id,)))
    for member in crew:
        member.start()
    for member in crew:
        member.join()
    return phase_log
import threading
import time
# Barrier synchronizes N threads at a checkpoint.
# All threads must reach the barrier before any can proceed.
NTHREADS = 4
barrier = threading.Barrier(NTHREADS)
phase_log = []
lock = threading.Lock()
def worker(worker_id):
    """Log phase 1, wait at the shared barrier, then log phase 2."""

    def record(phase):
        # Appends to the shared log are serialized through `lock`.
        with lock:
            phase_log.append(f'w{worker_id}_phase{phase}')

    # Phase 1: staggered sleep so workers reach the barrier at different times.
    time.sleep(worker_id * 0.01)
    record(1)

    # Nobody proceeds until all parties of `barrier` have arrived.
    barrier.wait()

    # Phase 2.
    record(2)
# TODO: Start 4 worker threads and join them.
# Verify all phase1 entries appear before any phase2 entries.
def test_barrier():
    """Run ``NTHREADS`` barrier workers to completion and return the phase log."""
    # Start from an empty log so repeated calls do not accumulate entries.
    phase_log.clear()
    crew = []
    for worker_id in range(NTHREADS):
        crew.append(threading.Thread(target=worker, args=(worker_id,)))
    for member in crew:
        member.start()
    for member in crew:
        member.join()
    return phase_log
log = test_barrier()
phase1_done = [e for e in log if 'phase1' in e]
phase2_done = [e for e in log if 'phase2' in e]
print(f'Phase 1 count: {len(phase1_done)}')
print(f'Phase 2 count: {len(phase2_done)}')
# All phase1 should precede all phase2 in the log
last_phase1_idx = max(log.index(e) for e in phase1_done)
first_phase2_idx = min(log.index(e) for e in phase2_done)
print(f'Phase order correct: {last_phase1_idx < first_phase2_idx}')
Expected Output
Phase 1 count: 4
Phase 2 count: 4
Phase order correct: True
Hints
Hint 1: barrier.wait() blocks the calling thread until all N threads have called wait().
Hint 2: Once the last thread calls wait(), all are released simultaneously.
Hint 3: This guarantees phase1 of ALL workers completes before ANY worker enters phase2.
Use BoundedSemaphore to implement a resource pool with protection against double-release bugs.
Solution:
The ResourcePool implementation in the code below is the solution — it uses BoundedSemaphore for the acquire/release logic, catching ValueError on over-release.
# Key insight: BoundedSemaphore raises ValueError on over-release,
# which Semaphore silently allows. For resource pools, BoundedSemaphore
# is the correct choice as it enforces the invariant.
import threading
# BoundedSemaphore raises ValueError if released more times than acquired.
# This catches bugs where release() is called without acquire().
def test_bounded_vs_unbounded():
    """Contrast over-release on ``Semaphore`` and ``BoundedSemaphore``.

    Prints one line for each semaphore type and returns nothing.
    """
    # A plain Semaphore accepts releases beyond its initial value.
    plain = threading.Semaphore(1)
    for _ in range(2):
        plain.release()
    print('Semaphore value after 2 extra releases: >= 1 (no error)')

    # A BoundedSemaphore raises ValueError instead of exceeding its initial value.
    bounded = threading.BoundedSemaphore(1)
    try:
        bounded.release()
        print('No error raised (unexpected)')
    except ValueError as exc:
        print(f'BoundedSemaphore caught: {exc}')
# TODO: Create a resource pool using BoundedSemaphore.
# The pool has N=2 slots.
# Simulate acquiring and releasing resources.
# Attempt an illegal extra release and catch it.
class ResourcePool:
    """Non-blocking pool of ``n`` slots backed by a ``BoundedSemaphore``."""

    def __init__(self, n):
        self._n = n
        self._sem = threading.BoundedSemaphore(n)

    def acquire(self):
        """Try to take a slot without blocking; return whether one was free."""
        took_slot = self._sem.acquire(blocking=False)
        return took_slot

    def release(self):
        """Give a slot back.

        Returns:
            True on success, False if the semaphore rejected the release
            because it is already at its initial value.
        """
        try:
            self._sem.release()
        except ValueError:
            # BoundedSemaphore refuses to grow past its initial value.
            return False
        return True
pool = ResourcePool(2)
print('Acquire 1:', pool.acquire())
print('Acquire 2:', pool.acquire())
print('Acquire 3 (should fail):', pool.acquire())
print('Release 1:', pool.release())
print('Release 2:', pool.release())
print('Release 3 (over-release):', pool.release())
Expected Output
Acquire 1: True
Acquire 2: True
Acquire 3 (should fail): False
Release 1: True
Release 2: True
Release 3 (over-release): False
Hints
Hint 1: BoundedSemaphore(n).release() raises ValueError if it would exceed n.
Hint 2: acquire(blocking=False) returns False immediately if no slot is available.
Hint 3: This is safer than Semaphore for resource pools — catches double-release bugs.
Use lock.acquire(timeout=...) to prevent deadlock when acquiring multiple locks, with retry logic.
Solution:
import random
def grab_both_locks_safe(name, results, max_retries=10):
    """Append ``name`` to ``results`` while holding both ``lock_a`` and ``lock_b``.

    Each acquisition uses a 50 ms timeout. When either one times out,
    everything held is released and the attempt restarts after a short
    random pause.

    Args:
        name: Value appended to ``results`` once both locks are held.
        results: List shared with the caller.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        True if ``name`` was appended, False if every attempt timed out.
        Returning a flag lets the caller notice a give-up, which previously
        ended the function without any signal.
    """
    for _ in range(max_retries):
        if lock_a.acquire(timeout=0.05):
            # try/finally guarantees the locks are released even if the
            # critical section raises.
            try:
                if lock_b.acquire(timeout=0.05):
                    try:
                        results.append(name)
                        return True
                    finally:
                        lock_b.release()
            finally:
                lock_a.release()
        # Random back-off so competing threads do not retry in lock-step.
        time.sleep(random.uniform(0, 0.01))
    return False
import threading
import time
# TODO: Implement a deadlock-safe resource grabber.
# Two threads each need both resources (lock_a and lock_b).
# Without timeout, they can deadlock.
# With acquire(timeout=...), detect the failure and retry.
lock_a = threading.Lock()
lock_b = threading.Lock()
def grab_both_locks_unsafe(name, results):
# Demonstration helper: append `name` to `results` while holding lock_a and
# lock_b, using blocking acquires with no timeout and no try/finally.
# NOTE(review): this function always takes lock_a before lock_b, so two
# threads that both run it serialize on lock_a rather than deadlock. The
# deadlock described below needs a second code path that takes lock_b
# first — confirm the intended demo.
# This can deadlock: thread1 holds a, waits for b;
# thread2 holds b, waits for a.
lock_a.acquire()
time.sleep(0.05)  # pause while holding lock_a, before asking for lock_b
lock_b.acquire()
results.append(name)
lock_b.release()
lock_a.release()
def grab_both_locks_safe(name, results, max_retries=10):
# TODO: Use acquire(timeout=0.01) to avoid deadlock.
# Retry if acquisition fails.
pass
def test_safe():
    """Run ``grab_both_locks_safe`` from two threads and return the sorted names.

    Each join waits at most five seconds, so the function returns even if a
    thread is still running.
    """
    results = []
    contenders = [
        threading.Thread(target=grab_both_locks_safe, args=(who, results))
        for who in ('Alice', 'Bob')
    ]
    for thread in contenders:
        thread.start()
    for thread in contenders:
        thread.join(timeout=5)
    return sorted(results)
result = test_safe()
print(f'Both completed: {result}')
Expected Output
Both completed: ['Alice', 'Bob']
Hints
Hint 1: lock.acquire(timeout=0.1) returns True on success, False on timeout.
Hint 2: If acquire_b fails, release lock_a and retry from the beginning.
Hint 3: Add random jitter (time.sleep(random.uniform(0, 0.02))) to avoid livelock.
Build a writer-priority RWLock that prevents writer starvation while maintaining read-write exclusion.
Solution:
class WriterPriorityRWLock:
    """Readers-writer lock in which waiting writers hold back new readers.

    All state is protected by ``_lock``; ``_read_ready`` is a condition on
    that same lock and is used by both readers and writers to wait.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._read_ready = threading.Condition(self._lock)
        self._readers = 0          # readers currently inside
        self._writers_waiting = 0  # writers blocked in acquire_write()
        self._writing = False      # True while a writer is inside

    def acquire_read(self):
        """Enter as a reader once no writer is active or waiting."""
        with self._read_ready:
            self._read_ready.wait_for(
                lambda: not self._writing and self._writers_waiting == 0
            )
            self._readers += 1

    def release_read(self):
        """Leave as a reader; the last reader out wakes all waiters."""
        with self._read_ready:
            self._readers -= 1
            if self._readers == 0:
                self._read_ready.notify_all()

    def acquire_write(self):
        """Enter as the sole writer once all readers and any writer have left."""
        with self._read_ready:
            # Registering as waiting is what blocks newly arriving readers.
            self._writers_waiting += 1
            self._read_ready.wait_for(
                lambda: self._readers == 0 and not self._writing
            )
            self._writers_waiting -= 1
            self._writing = True

    def release_write(self):
        """Leave as the writer and wake every waiting reader and writer."""
        with self._read_ready:
            self._writing = False
            self._read_ready.notify_all()
import threading
import time
# A simple RWLock can starve writers if readers keep arriving.
# This problem builds a writer-priority RWLock.
class WriterPriorityRWLock:
def __init__(self):
self._lock = threading.Lock()
self._read_ready = threading.Condition(self._lock)
self._readers = 0
self._writers_waiting = 0
self._writing = False
def acquire_read(self):
with self._lock:
while self._writing or self._writers_waiting > 0:
self._read_ready.wait()
self._readers += 1
def release_read(self):
with self._lock:
self._readers -= 1
if self._readers == 0:
self._read_ready.notify_all()
def acquire_write(self):
pass
def release_write(self):
pass
def test_writer_priority():
    """Run two readers and one writer against one lock and return the event log.

    Each participant logs ``<tag>_start`` after acquiring and ``<tag>_end``
    before releasing, with a 20 ms pause in between.
    """
    rw = WriterPriorityRWLock()
    log = []
    lock = threading.Lock()

    def record(entry):
        # Serialize appends to the shared log.
        with lock:
            log.append(entry)

    def reader(rid):
        rw.acquire_read()
        record(f'R{rid}_start')
        time.sleep(0.02)
        record(f'R{rid}_end')
        rw.release_read()

    def writer(wid):
        rw.acquire_write()
        record(f'W{wid}_start')
        time.sleep(0.02)
        record(f'W{wid}_end')
        rw.release_write()

    plan = [(reader, 0), (reader, 1), (writer, 0)]
    threads = [threading.Thread(target=fn, args=(ident,)) for fn, ident in plan]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return log
log = test_writer_priority()
print('Log:', log)


def _no_read_write_overlap(events):
    """Return True if no writer interval overlaps a reader or another writer.

    ``events`` is the list produced by ``test_writer_priority``: entries such
    as ``'R0_start'`` / ``'W0_end'``. Each start is logged after the lock is
    acquired and each end before it is released, so an overlap in the log
    means the lock really admitted both parties at once.
    """
    active_readers = 0
    writer_active = False
    for event in events:
        is_reader = event[0] == 'R'
        is_start = event.endswith('_start')
        if is_reader:
            if is_start:
                if writer_active:
                    return False
                active_readers += 1
            else:
                active_readers -= 1
        elif is_start:
            if writer_active or active_readers:
                return False
            writer_active = True
        else:
            writer_active = False
    return True


# Derive the verdict from the log instead of printing a hard-coded True.
print('No concurrent writes and reads:', _no_read_write_overlap(log))
Expected Output
Log: [...]
No concurrent writes and reads: True
Hints
Hint 1: acquire_write(): increment writers_waiting, wait while readers > 0 or writing, decrement writers_waiting, set writing = True.
Hint 2: release_write(): set writing = False, notify_all() to wake waiting readers and writers.
Hint 3: Writers waiting block new readers (writers_waiting > 0 check in acquire_read).
Implement a recursive mutex from scratch that allows the owning thread to re-acquire without deadlocking.
Solution:
class RecursiveMutex:
    """Mutex that its owning thread may acquire repeatedly.

    Tracks the owning thread and how many times it has acquired; the mutex
    becomes free when the count returns to zero.
    """

    def __init__(self):
        self._lock = threading.Condition()
        self._owner = None
        self._count = 0

    def acquire(self):
        """Take the mutex, blocking while another thread owns it."""
        me = threading.current_thread()
        with self._lock:
            if self._owner != me:
                # Not the owner: wait for the mutex to be free, then claim it.
                self._lock.wait_for(lambda: self._owner is None)
                self._owner = me
                self._count = 0
            self._count += 1

    def release(self):
        """Undo one acquire; free the mutex when the count reaches zero.

        Raises:
            RuntimeError: if the calling thread is not the current owner.
        """
        me = threading.current_thread()
        with self._lock:
            if self._owner != me:
                raise RuntimeError("release() called by non-owner")
            self._count -= 1
            if self._count != 0:
                return
            self._owner = None
            # Wake one thread blocked in acquire().
            self._lock.notify()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *args):
        self.release()
import threading
# Build a recursive mutex from scratch using basic primitives.
# It tracks the owning thread and an acquisition count.
class RecursiveMutex:
def __init__(self):
self._lock = threading.Lock()
self._owner = None
self._count = 0
self._wait_lock = threading.Condition(threading.Lock())
def acquire(self):
pass
def release(self):
pass
def __enter__(self):
self.acquire()
return self
def __exit__(self, *args):
self.release()
def test_recursive_mutex():
    """Re-enter one ``RecursiveMutex`` four levels deep and return the trace.

    Returns:
        The list of ``enter_<depth>`` / ``exit_<depth>`` markers in the order
        they were recorded.
    """
    mutex = RecursiveMutex()
    events = []

    def descend(level):
        # Each level acquires the mutex again on the same thread.
        with mutex:
            events.append(f'enter_{level}')
            if level > 0:
                descend(level - 1)
            events.append(f'exit_{level}')

    descend(3)
    return events
log = test_recursive_mutex()
print(log)
Expected Output
['enter_3', 'enter_2', 'enter_1', 'enter_0', 'exit_0', 'exit_1', 'exit_2', 'exit_3']
Hints
Hint 1: If current thread == owner, increment count and return immediately.
Hint 2: Otherwise, wait until _owner is None, then set _owner = current_thread and count = 1.
Hint 3: release(): decrement count; if count == 0, set _owner = None and notify.
Build a thread-safe rate limiter using a Semaphore and a background refill thread (token bucket algorithm).
Solution:
import threading
import time
class SemaphoreRateLimiter:
    """Token-bucket rate limiter built on a Semaphore and a refill thread.

    The semaphore holds the currently available tokens. A background thread
    adds one token every ``1 / rate`` seconds, never letting the bucket grow
    beyond ``burst``.
    """

    def __init__(self, rate, burst):
        """Create a full bucket.

        Args:
            rate: Tokens added per second; must be positive.
            burst: Bucket size, i.e. the initial and maximum token count.

        Raises:
            ValueError: if ``rate`` is not positive (the refill interval
                ``1 / rate`` would be undefined or negative).
        """
        if rate <= 0:
            raise ValueError(f'rate must be positive, got {rate}')
        self.rate = rate
        self.burst = burst
        self._sem = threading.Semaphore(burst)
        self._running = True
        self._refill_thread = None
        self._lock = threading.Lock()
        # Mirror of the token count; a plain Semaphore has no upper bound, so
        # this counter is what keeps refills from exceeding `burst`.
        self._available = burst
        self._stop_event = threading.Event()

    def _refill(self):
        """Background loop: add one token per interval until stopped."""
        interval = 1.0 / self.rate
        # Event.wait() acts as an interruptible sleep: it returns True as soon
        # as stop() sets the event and False when the interval elapses.
        while self._running and not self._stop_event.wait(interval):
            with self._lock:
                if self._available < self.burst:
                    self._available += 1
                    self._sem.release()

    def start(self):
        """Start the refill thread; a no-op if it is already running."""
        current = self._refill_thread
        if current is not None and current.is_alive():
            return
        # Reset the stop state so the limiter can be restarted after stop().
        self._running = True
        self._stop_event.clear()
        self._refill_thread = threading.Thread(target=self._refill, daemon=True)
        self._refill_thread.start()

    def acquire(self, timeout=5.0):
        """Take one token, waiting up to ``timeout`` seconds.

        Returns:
            True if a token was obtained, False on timeout.
        """
        got = self._sem.acquire(timeout=timeout)
        if got:
            with self._lock:
                self._available = max(0, self._available - 1)
        return got

    def stop(self):
        """Stop the refill thread and wait for it to exit."""
        self._running = False
        self._stop_event.set()
        worker = self._refill_thread
        # Never join the current thread (would raise RuntimeError).
        if worker is not None and worker is not threading.current_thread():
            worker.join()
import threading
import time
# Build a rate limiter using a Semaphore and a background refill thread.
# Tokens are consumed on each request and refilled at a fixed rate.
class SemaphoreRateLimiter:
def __init__(self, rate, burst):
self.rate = rate # tokens per second
self.burst = burst # max tokens (bucket size)
self._sem = threading.Semaphore(burst)
self._running = True
self._refill_thread = None
def start(self):
# TODO: Start a background thread that refills tokens at self.rate/sec
pass
def acquire(self, timeout=5.0):
return self._sem.acquire(timeout=timeout)
def stop(self):
self._running = False
if self._refill_thread:
self._refill_thread.join()
def test_rate_limiter():
    """Issue ten requests through a 5/sec, burst-5 limiter and time them.

    Returns:
        ``(granted_count, average_gap_seconds)``; the gap is 0 when fewer
        than two requests were granted.
    """
    limiter = SemaphoreRateLimiter(rate=5, burst=5)  # 5 requests/sec, burst of 5
    limiter.start()
    stamps = []
    for _ in range(10):
        if limiter.acquire():
            stamps.append(time.perf_counter())
    limiter.stop()

    granted = len(stamps)
    if granted < 2:
        return granted, 0
    # Differences between consecutive grant timestamps.
    gaps = [later - earlier for earlier, later in zip(stamps, stamps[1:])]
    return granted, sum(gaps) / len(gaps)
count, avg_gap = test_rate_limiter()
print(f'Requests processed: {count}')
print(f'Rate controlled: {avg_gap >= 0.1}') # At ~5/sec, gap >= 0.1s after burst
Expected Output
Requests processed: 10
Rate controlled: True
Hints
Hint 1: The refill thread runs a loop: time.sleep(1.0/rate); sem.release() (unless already at burst).
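Hint 2: A plain Semaphore never raises on over-release, so track the token count yourself to stay within burst; if you switch to BoundedSemaphore instead, use try/except ValueError to handle a release when the bucket is already full.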
Hint 3: The burst allows initial fast requests; the refill rate controls steady-state throughput.
