Locks, Semaphores, and Synchronization
Reading time: ~35 minutes | Level: Intermediate → Engineering
Before reading further, predict what happens when this code runs:
import threading

lock = threading.Lock()

def transfer(amount):
    with lock:
        # ... do some work ...
        log_transfer(amount)  # tries to acquire the same lock again

def log_transfer(amount):
    with lock:  # same lock, same thread
        print(f"Transferred: {amount}")

t = threading.Thread(target=transfer, args=(100,))
t.start()
t.join()
Most developers expect this to print Transferred: 100. Instead, the program hangs forever.
transfer acquires the lock. Then it calls log_transfer, which tries to acquire the same lock from the same thread. A threading.Lock cannot be acquired twice from the same thread - the second with lock: blocks waiting for the lock to be released, but the lock will never be released because the thread is stuck waiting for itself.
This is a self-deadlock - one of the most common synchronization bugs in threaded Python code.
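The hang can be observed safely by giving the second acquisition a timeout. The following is a minimal sketch; the helper name reacquire_with_timeout and the 0.1 second timeout are illustrative choices, not part of the original puzzle.

```python
import threading

lock = threading.Lock()

def reacquire_with_timeout(timeout: float = 0.1) -> bool:
    """Try to take `lock` a second time from the thread that already holds it."""
    with lock:
        # A plain Lock does not track its owner, so this second acquire just
        # waits. The timeout turns the would-be hang into a False return value.
        got_it = lock.acquire(timeout=timeout)
        if got_it:
            lock.release()
        return got_it

second_acquire_succeeded = reacquire_with_timeout()
print(second_acquire_succeeded)  # False: the same thread cannot re-acquire a Lock
```

Without the timeout, the inner acquire() would wait forever, which is exactly what happens in the puzzle.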
What You Will Learn
- How threading.Lock works and why you should always use the context manager
- What threading.RLock (reentrant lock) solves and when you need it
- What deadlock is, how it forms, and how to prevent it with lock ordering and timeouts
- threading.Semaphore and BoundedSemaphore for limiting concurrent access
- threading.Event for signaling between threads
- threading.Condition for the producer-consumer pattern done correctly
- threading.Barrier for synchronizing N threads at a checkpoint
- Asyncio equivalents: asyncio.Lock, asyncio.Semaphore, asyncio.Event
Prerequisites
- Threading fundamentals (Module 08, Lesson 01)
- Race conditions (Module 08, Lesson 05)
- Comfortable reading with statements and context managers
Part 1 - threading.Lock: The Fundamental Primitive
A Lock is the most basic synchronization primitive. At any moment, at most one thread can hold it. All other threads trying to acquire it block until the holder releases it.
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        lock.acquire()
        counter += 1
        lock.release()

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # Always 500000 - race condition eliminated
Why Always Use the Context Manager
The manual acquire() / release() pattern above is dangerous. If an exception is raised between acquire and release, the lock is never released - every other thread that needs it will block forever.
# WRONG - lock leaked if exception occurs
lock.acquire()
result = risky_operation()  # raises ValueError
lock.release()              # never reached

# RIGHT - context manager guarantees release
with lock:
    result = risky_operation()  # exception? lock still released on exit
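The with lock: statement calls lock.__enter__() (acquire) on entry and lock.__exit__() (release) on exit - even if an exception is raised. Prefer it whenever the critical section is a single block; the manual equivalent is acquire() followed by try/finally, which is only worth writing when you need something with cannot express, such as a timeout.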
:::tip Always Use with lock: - Never Acquire/Release Manually
The context manager is not a convenience shorthand. It is the correct and safe API. Manual acquire()/release() exists for very specific low-level use cases. In production code, always write with lock:.
:::
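To check the release-on-exception guarantee directly, a short sketch can inspect lock.locked() after a failing critical section. Here risky_operation is a hypothetical stand-in that always raises, and guarded_call is an illustrative helper name.

```python
import threading

lock = threading.Lock()

def risky_operation():
    # Hypothetical stand-in for real work that fails.
    raise ValueError("simulated failure")

def guarded_call() -> bool:
    """Run risky_operation under the lock and report whether the lock is still held afterwards."""
    try:
        with lock:
            risky_operation()
    except ValueError:
        pass  # the exception left the with block, which released the lock on the way out
    return lock.locked()

still_locked = guarded_call()
print(still_locked)  # False: the context manager released the lock
```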
Acquire with Timeout
Sometimes you want to attempt lock acquisition without blocking indefinitely:
import threading

lock = threading.Lock()

def try_update():
    acquired = lock.acquire(timeout=2.0)  # wait at most 2 seconds
    if not acquired:
        print("Could not acquire lock - giving up")
        return
    try:
        # critical section
        pass
    finally:
        lock.release()
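# Do NOT follow a successful acquire(timeout=...) with `with lock:`.
# Lock is not reentrant, so the nested with blocks forever:
def try_update_broken():
    if lock.acquire(timeout=2.0):
        with lock:  # BUG: self-deadlock - this thread already holds the lock
            pass
# Note: use try/finally with manual acquire for the timeout pattern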
The cleaner timeout pattern:
def try_update_clean():
    acquired = lock.acquire(timeout=2.0)
    if not acquired:
        return False
    try:
        # critical section
        return True
    finally:
        lock.release()  # always release
Part 2 - threading.RLock: Reentrant Lock
An RLock (reentrant lock) can be acquired multiple times by the same thread without deadlocking. Internally it tracks which thread owns it and a recursion count. The lock is only fully released when the owning thread calls release() the same number of times it called acquire().
import threading

rlock = threading.RLock()

def transfer(amount):
    with rlock:
        # critical section
        log_transfer(amount)  # safe - same thread can re-enter

def log_transfer(amount):
    with rlock:  # same thread, same lock - no deadlock
        print(f"Transferred: {amount}")

t = threading.Thread(target=transfer, args=(100,))
t.start()
t.join()
# Output: Transferred: 100
The recursion count inside the opening puzzle's scenario:
Thread calls transfer → rlock count = 1
transfer calls log_transfer → rlock count = 2
log_transfer exits → rlock count = 1
transfer exits → rlock count = 0 → lock released
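The count is not exposed through a public attribute, but its effect can be observed from a second thread. The sketch below is illustrative only (the probe helper and the labels are made up for this example): a non-blocking acquire from another thread fails until the owner has released as many times as it acquired.

```python
import threading

rlock = threading.RLock()
observations = []

def try_from_other_thread(label: str) -> None:
    # Non-blocking attempt from a thread that does not own the RLock.
    got_it = rlock.acquire(blocking=False)
    observations.append((label, got_it))
    if got_it:
        rlock.release()

def probe(label: str) -> None:
    t = threading.Thread(target=try_from_other_thread, args=(label,))
    t.start()
    t.join()

rlock.acquire()            # count = 1
rlock.acquire()            # count = 2 (same thread, no deadlock)
probe("held twice")
rlock.release()            # count = 1, still owned by this thread
probe("released once")
rlock.release()            # count = 0, fully released
probe("released twice")

print(observations)
# [('held twice', False), ('released once', False), ('released twice', True)]
```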
When to Use RLock vs Lock
| Scenario | Use |
|---|---|
| Simple mutual exclusion, no nested calls | Lock - lower overhead |
| A method calls another method that needs the same lock | RLock |
| Recursive functions that need the lock at each level | RLock |
| You're not sure | RLock is safer, Lock is faster |
:::note Lock vs RLock Performance
Lock is slightly faster than RLock because it does not track ownership or count. For hot paths with millions of lock operations, prefer Lock. For most application code, the difference is negligible - correctness matters more.
:::
Part 3 - Deadlock: What It Is and How to Prevent It
A deadlock occurs when two or more threads are each waiting for a lock held by the other - a circular dependency with no resolution.
The Classic Two-Thread Deadlock
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

def thread_one():
    with lock_a:
        print("Thread 1: acquired lock_a")
        time.sleep(0.1)  # gives Thread 2 time to acquire lock_b
        print("Thread 1: waiting for lock_b ...")
        with lock_b:  # BLOCKS - Thread 2 holds lock_b
            print("Thread 1: acquired lock_b")

def thread_two():
    with lock_b:
        print("Thread 2: acquired lock_b")
        time.sleep(0.1)  # gives Thread 1 time to acquire lock_a
        print("Thread 2: waiting for lock_a ...")
        with lock_a:  # BLOCKS - Thread 1 holds lock_a
            print("Thread 2: acquired lock_a")

t1 = threading.Thread(target=thread_one)
t2 = threading.Thread(target=thread_two)
t1.start()
t2.start()
t1.join()  # hangs forever
t2.join()  # hangs forever
The deadlock cycle: Thread 1 holds lock_a and waits for lock_b, while Thread 2 holds lock_b and waits for lock_a.
Both threads are blocked. Neither can proceed. The program hangs until killed.
Prevention Strategy 1: Consistent Lock Ordering
If every thread always acquires locks in the same global order, a circular dependency cannot form.
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

# Establish a global order: always acquire lock_a before lock_b
def thread_one():
    with lock_a:  # order: A then B
        with lock_b:
            print("Thread 1 working")

def thread_two():
    with lock_a:  # same order: A then B - no circular dependency
        with lock_b:
            print("Thread 2 working")
Thread 2 blocks waiting for lock_a while Thread 1 holds it. When Thread 1 finishes and releases both locks, Thread 2 proceeds. No deadlock.
For programmatic lock ordering with arbitrary objects:
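from contextlib import contextmanager

@contextmanager  # required: a bare generator function is not a context manager
def acquire_both(lock1, lock2):
    """Acquire two distinct locks in a consistent order based on id()."""
    first, second = sorted([lock1, lock2], key=id)
    with first:
        with second:
            yield  # use as: with acquire_both(a, b): ...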
Prevention Strategy 2: Timeouts
Use acquire(timeout=...) to detect deadlocks rather than hang:
import threading
import logging

lock_a = threading.Lock()
lock_b = threading.Lock()

def safe_transfer():
    if not lock_a.acquire(timeout=5.0):
        logging.error("Could not acquire lock_a - possible deadlock")
        return False
    try:
        if not lock_b.acquire(timeout=5.0):
            logging.error("Could not acquire lock_b - possible deadlock")
            return False
        try:
            # critical section
            return True
        finally:
            lock_b.release()
    finally:
        lock_a.release()
:::danger Nested Locks Without Consistent Ordering = Deadlock
Whenever you hold one lock and try to acquire another, you are creating a potential deadlock if any other code path does the same in a different order. Either enforce a strict global acquisition order, or use timeouts with retry/abort logic. Never hold multiple locks without a plan.
:::
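One possible shape for the timeout-with-retry approach is sketched below. The helper name acquire_both_with_retry, the attempt count, and the jitter values are assumptions chosen for illustration; the key idea is that a thread which cannot get the second lock releases the first before retrying, so it never waits while holding a lock.

```python
import random
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

def acquire_both_with_retry(first, second, attempts=5, timeout=0.05):
    """Take both locks or neither. Returns True if both are held on return."""
    for attempt in range(attempts):
        if first.acquire(timeout=timeout):
            if second.acquire(timeout=timeout):
                return True
            # Back off: give up the first lock so the other thread can progress.
            first.release()
        # Random jitter makes it unlikely that both threads retry in lockstep.
        time.sleep(random.uniform(0, 0.01 * (attempt + 1)))
    return False

completed = []

def worker(name, first, second):
    if acquire_both_with_retry(first, second):
        try:
            completed.append(name)  # critical section
        finally:
            second.release()
            first.release()

# The two workers deliberately request the locks in opposite orders.
t1 = threading.Thread(target=worker, args=("one", lock_a, lock_b))
t2 = threading.Thread(target=worker, args=("two", lock_b, lock_a))
t1.start(); t2.start()
t1.join(); t2.join()
print(sorted(completed))
```

Retrying trades the risk of deadlock for a small risk of giving up after all attempts, so callers need to handle the False case (abort, log, or escalate).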
Part 4 - threading.Semaphore: Counting Concurrent Access
A Semaphore generalises a lock. Instead of allowing exactly one thread at a time, it allows up to N threads at a time. Internally it maintains a counter: acquire() decrements it, release() increments it. When the counter reaches zero, further acquire() calls block.
import threading
import time

# Allow at most 3 concurrent "workers"
semaphore = threading.Semaphore(3)

def worker(worker_id):
    with semaphore:
        print(f"Worker {worker_id}: started")
        time.sleep(1)
        print(f"Worker {worker_id}: done")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# At most 3 workers run simultaneously despite 8 threads being created
Real-World Use Case: Database Connection Pool
import threading
import time
import contextlib

class ConnectionPool:
    """Limits concurrent database connections using a Semaphore."""

    def __init__(self, max_connections: int = 5):
        self._semaphore = threading.BoundedSemaphore(max_connections)
        self._max = max_connections
        self._active = 0
        self._lock = threading.Lock()

    @contextlib.contextmanager
    def connection(self):
        self._semaphore.acquire()
        with self._lock:
            self._active += 1
            print(f"Connection acquired - active: {self._active}/{self._max}")
        try:
            yield self._get_connection()
        finally:
            with self._lock:
                self._active -= 1
                print(f"Connection released - active: {self._active}/{self._max}")
            self._semaphore.release()

    def _get_connection(self):
        # In production: return actual db connection from pool
        return {"conn": "db_connection_object"}

pool = ConnectionPool(max_connections=5)

def query_database(thread_id):
    with pool.connection() as conn:
        # Simulate query
        time.sleep(0.5)
        print(f"Thread {thread_id}: query complete")

# Launch 20 threads - at most 5 will have a connection at any moment
threads = [threading.Thread(target=query_database, args=(i,)) for i in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
BoundedSemaphore vs Semaphore
| | Semaphore | BoundedSemaphore |
|---|---|---|
| Initial value | N | N |
| Can exceed initial value? | Yes - extra release() calls increment past N | No - raises ValueError |
| Use case | General counting | Resource pool (catch bugs where release is called too many times) |
:::tip Prefer BoundedSemaphore for Resource Pools
If you are using a semaphore to guard a fixed resource pool (connections, file handles, API slots), use BoundedSemaphore. An extra release() call - often a bug - will raise ValueError immediately rather than silently corrupting your pool count.
:::
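The difference is easy to see with one stray release() on each type. This is a small sketch with illustrative variable names (plain, bounded, extra_slots):

```python
import threading

plain = threading.Semaphore(2)
bounded = threading.BoundedSemaphore(2)

# A stray release() on a plain Semaphore silently raises the counter to 3.
plain.release()
extra_slots = 0
while plain.acquire(blocking=False):
    extra_slots += 1
print(extra_slots)  # 3: one more slot than the pool was meant to have

# The same mistake on a BoundedSemaphore is reported immediately.
try:
    bounded.release()
    over_release_detected = False
except ValueError:
    over_release_detected = True
print(over_release_detected)  # True
```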
Part 5 - threading.Event: Simple Signaling
An Event is a simple flag that threads can wait on. One thread sets the flag (set()), other threads blocking on wait() are woken up. The flag can be cleared with clear() and checked non-blocking with is_set().
import threading
import time

ready_event = threading.Event()

def producer():
    print("Producer: preparing data...")
    time.sleep(2)  # simulate data preparation
    print("Producer: data ready - signaling consumers")
    ready_event.set()  # wake up all waiting consumers

def consumer(consumer_id):
    print(f"Consumer {consumer_id}: waiting for data...")
    ready_event.wait()  # blocks until set() is called
    print(f"Consumer {consumer_id}: processing data")

# Start consumers first - they will wait
consumers = [threading.Thread(target=consumer, args=(i,)) for i in range(3)]
for c in consumers:
    c.start()

# Start producer - will signal after 2 seconds
p = threading.Thread(target=producer)
p.start()
p.join()
for c in consumers:
    c.join()
Event API Reference
event = threading.Event()
event.set() # set the flag - wakes all threads in wait()
event.clear() # clear the flag - future wait() calls will block again
event.is_set() # returns True if flag is set - non-blocking check
event.wait(timeout=5.0) # block until flag is set OR timeout seconds pass
# returns True if flag was set, False if timed out
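The return value of wait(timeout=...) is what lets a caller tell a signal apart from a timeout. A brief sketch, using threading.Timer to set the flag from another thread after a short, arbitrary delay:

```python
import threading

event = threading.Event()

# Nobody sets the flag, so wait() gives up after the timeout.
timed_out_result = event.wait(timeout=0.05)
print(timed_out_result)  # False

# A Timer thread sets the flag shortly afterwards; wait() returns early with True.
timer = threading.Timer(0.05, event.set)
timer.start()
signalled_result = event.wait(timeout=2.0)
timer.join()
print(signalled_result)  # True
```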
One-Shot vs Reusable Events
# One-shot: signal once, all consumers proceed, never reset
startup_complete = threading.Event()

# Reusable: acts like a toggle - set then clear then set ...
data_available = threading.Event()

def producer_loop():
    while True:
        data = fetch_data()
        data_available.set()    # signal consumers
        time.sleep(0)           # yield to let consumers run (not a guarantee)
        data_available.clear()  # reset for next round
        # Caution: a consumer that was not already in wait() can miss this
        # brief pulse. Use a Condition or a queue when no signal may be lost.
        time.sleep(1)
Part 6 - threading.Condition: Coordinated Waiting
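An Event is a bare flag: one thread sets it and every waiter wakes up, with no shared state attached. A Condition pairs a lock with a wait queue, so waiters can check a predicate over shared state while holding the lock, and a notifier can wake one waiter (notify()) or all of them (notify_all()). It cannot target a particular waiter.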
The canonical use case is the producer-consumer pattern:
import threading
import time
from collections import deque

buffer = deque()
MAX_SIZE = 5
condition = threading.Condition()

def producer(items):
    for item in items:
        with condition:
            while len(buffer) >= MAX_SIZE:
                print(f"Producer: buffer full, waiting...")
                condition.wait()  # releases lock and waits
            buffer.append(item)
            print(f"Producer: added {item} - buffer size: {len(buffer)}")
            condition.notify_all()  # wake up consumers

def consumer(consumer_id, count):
    consumed = 0
    while consumed < count:
        with condition:
            while not buffer:
                condition.wait()  # releases lock and waits
            item = buffer.popleft()
            consumed += 1
            print(f"Consumer {consumer_id}: got {item}")
            condition.notify_all()  # wake up producer (may be full)

# Producer creates 20 items, two consumers each take 10
items = list(range(20))
p = threading.Thread(target=producer, args=(items,))
c1 = threading.Thread(target=consumer, args=(1, 10))
c2 = threading.Thread(target=consumer, args=(2, 10))
c1.start(); c2.start(); p.start()
p.join(); c1.join(); c2.join()
The Critical Pattern: Always Check in a while Loop
# WRONG - spurious wakeups and race conditions
with condition:
    if not buffer:           # check once
        condition.wait()
    item = buffer.popleft()  # buffer might be empty again!

# RIGHT - re-check the predicate after every wakeup
with condition:
    while not buffer:        # check in loop
        condition.wait()
    item = buffer.popleft()  # guaranteed: buffer is non-empty here
condition.wait() can return due to:
- A notify() or notify_all() call
- A timeout expiration
- A spurious wakeup (rare but possible on some platforms)
Always re-check the condition in a while loop.
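threading.Condition also provides wait_for(predicate, timeout=None), which wraps this re-check loop and returns the predicate's final value. A minimal sketch follows; the buffer contents and the 2 second timeout are illustrative.

```python
import threading
from collections import deque

buffer = deque()
condition = threading.Condition()
received = []

def consumer():
    with condition:
        # wait_for re-evaluates the predicate after every wakeup and returns
        # its last value, so a False result here means the wait timed out.
        has_item = condition.wait_for(lambda: len(buffer) > 0, timeout=2.0)
        if has_item:
            received.append(buffer.popleft())

def producer():
    with condition:
        buffer.append("item")
        condition.notify()

c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)
c.start(); p.start()
c.join(); p.join()
print(received)  # ['item']
```

The result is the same whichever thread runs first: if the producer wins the race, the predicate is already true and wait_for returns without waiting.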
notify() vs notify_all()
condition.notify() # wakes ONE waiting thread (arbitrary which one)
condition.notify_all() # wakes ALL waiting threads
# Use notify() when: only one waiter can productively proceed
# Use notify_all() when: multiple waiters might be able to proceed
# or when you don't know which waiter to wake
Condition Wrapping an Existing Lock
rlock = threading.RLock()
condition = threading.Condition(rlock) # Condition wraps your lock
# Now condition.acquire() / release() use the RLock
# Useful when you need reentrant behavior with condition variables
Part 7 - threading.Barrier: Rendezvous Point for N Threads
A Barrier is used when you want N threads to all reach a certain point before any of them proceeds. Each thread calls barrier.wait() and blocks until all N threads have called it - then all are released simultaneously.
import threading
import time
import random

NUM_WORKERS = 4
barrier = threading.Barrier(NUM_WORKERS)

def worker(worker_id):
    # Phase 1: each worker does its own preparation independently
    prep_time = random.uniform(0.5, 2.0)
    time.sleep(prep_time)
    print(f"Worker {worker_id}: preparation done in {prep_time:.2f}s")
    barrier.wait()  # wait for ALL workers to finish preparation
    print(f"Worker {worker_id}: all workers ready - starting phase 2")
    # Phase 2: all workers start simultaneously
    time.sleep(0.1)
    print(f"Worker {worker_id}: phase 2 complete")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
Barrier with Action
The Barrier constructor accepts an optional action - a callable that one of the threads runs once all have arrived, just before they are released:
def log_phase_transition():
    print("=== All workers reached barrier - releasing ===")

barrier = threading.Barrier(NUM_WORKERS, action=log_phase_transition)
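Two further details of the Barrier API are worth seeing in code: wait() returns a distinct integer in range(parties) to each thread, and a wait() that times out breaks the barrier and raises threading.BrokenBarrierError. The sketch below uses illustrative names (on_release, lonely) and arbitrary timeouts.

```python
import threading

NUM_WORKERS = 3
action_calls = []
arrival_indices = []
results_lock = threading.Lock()

def on_release():
    # Runs exactly once per barrier cycle, in one of the waiting threads.
    action_calls.append("released")

barrier = threading.Barrier(NUM_WORKERS, action=on_release)

def worker():
    index = barrier.wait(timeout=5.0)  # distinct int in range(NUM_WORKERS) per thread
    with results_lock:
        arrival_indices.append(index)

threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(arrival_indices), action_calls)  # [0, 1, 2] ['released']

# A barrier that never fills: the timeout breaks it instead of hanging forever.
lonely = threading.Barrier(2)
try:
    lonely.wait(timeout=0.05)
    barrier_broke = False
except threading.BrokenBarrierError:
    barrier_broke = True
print(barrier_broke)  # True
```

The returned index is handy for electing one thread to do per-phase housekeeping, for example `if index == 0: ...`.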
Barrier for Iterative Algorithms
Barriers shine in iterative parallel algorithms where each iteration requires all threads to complete before the next begins:
def parallel_iteration(worker_id, data_chunk, shared_results, barrier):
    for iteration in range(10):
        # Each thread processes its chunk
        result = process(data_chunk)
        shared_results[worker_id] = result
        barrier.wait()  # all threads finish this iteration before next starts
        # Now it's safe to read other workers' results from shared_results
        global_agg = aggregate(shared_results)
        barrier.wait()  # synchronize before starting next iteration
Part 8 - Asyncio Equivalents
Most threading synchronization primitives have an asyncio counterpart with similar semantics but designed for coroutines. The key differences: you await instead of blocking, and the asyncio primitives are not thread-safe, so they must not be shared across threads.
import asyncio

async def asyncio_lock_demo():
    lock = asyncio.Lock()

    async def worker(worker_id):
        async with lock:  # await-based acquire/release
            print(f"Worker {worker_id}: inside critical section")
            await asyncio.sleep(0.1)

    await asyncio.gather(*[worker(i) for i in range(5)])

async def asyncio_semaphore_demo():
    # Limit to 3 concurrent operations
    semaphore = asyncio.Semaphore(3)

    async def fetch(url):
        async with semaphore:
            await asyncio.sleep(0.5)  # simulate HTTP request
            return f"data from {url}"

    urls = [f"https://api.example.com/{i}" for i in range(10)]
    results = await asyncio.gather(*[fetch(url) for url in urls])
    return results

async def asyncio_event_demo():
    event = asyncio.Event()

    async def producer():
        await asyncio.sleep(1)
        event.set()  # signal consumers

    async def consumer(cid):
        await event.wait()  # suspends this coroutine; the event loop keeps running
        print(f"Consumer {cid}: event received")

    await asyncio.gather(
        producer(),
        consumer(1),
        consumer(2),
        consumer(3),
    )
Asyncio Condition
async def asyncio_condition_demo():
    condition = asyncio.Condition()
    buffer = []

    async def producer():
        for i in range(5):
            async with condition:
                buffer.append(i)
                condition.notify_all()
            await asyncio.sleep(0.1)

    async def consumer():
        for _ in range(5):  # consume everything the producer emits
            async with condition:
                while not buffer:
                    await condition.wait()
                item = buffer.pop(0)
            print(f"Consumed: {item}")

    # Start both coroutines; defining them alone runs nothing
    await asyncio.gather(producer(), consumer())
Asyncio vs Threading Primitives
| Threading | Asyncio | Notes |
|---|---|---|
| threading.Lock | asyncio.Lock | async with instead of with |
| threading.RLock | No direct equivalent | Rarely needed in async code |
| threading.Semaphore | asyncio.Semaphore | Same counting semantics |
| threading.Event | asyncio.Event | await event.wait() |
| threading.Condition | asyncio.Condition | await condition.wait() |
| threading.Barrier | asyncio.Barrier (3.11+) | Added in Python 3.11 |
:::warning Do Not Mix Threading and Asyncio Primitives
threading.Lock is not safe to use inside an asyncio coroutine - it blocks the entire event loop, preventing all other coroutines from running. Inside async code, always use asyncio.Lock, asyncio.Semaphore, etc.
:::
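When a coroutine must touch state that ordinary threads protect with a threading.Lock, one option is to keep the blocking acquire out of the event loop thread entirely. The sketch below assumes Python 3.9+ for asyncio.to_thread; the names counter, counter_lock, and locked_increment are illustrative.

```python
import asyncio
import threading

counter = 0
counter_lock = threading.Lock()  # protects state shared with ordinary threads

def locked_increment() -> None:
    """Blocking function: takes the threading.Lock, so it must not run on the event loop."""
    global counter
    with counter_lock:
        counter += 1

async def main() -> int:
    # to_thread runs each blocking call in a worker thread, so a contended
    # lock stalls only that worker and the event loop stays responsive.
    await asyncio.gather(*(asyncio.to_thread(locked_increment) for _ in range(20)))
    return counter

final_count = asyncio.run(main())
print(final_count)  # 20
```

For state that only coroutines touch, asyncio.Lock remains the simpler choice; this pattern is for the boundary between async code and existing threaded code.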
Full Example: Connection Pool with Semaphore and Event
This example combines BoundedSemaphore, Lock, and Event into a thread-safe database connection pool. The connections themselves are simulated with plain dicts:
import threading
import time
import contextlib
import logging
from typing import Optional
logging.basicConfig(level=logging.INFO, format="%(threadName)s: %(message)s")
class DatabaseConnectionPool:
    """
    Thread-safe connection pool backed by BoundedSemaphore.
    - BoundedSemaphore limits concurrent connections
    - Lock protects the free-connection list
    - Event records that the pool has been shut down
    - BoundedSemaphore raises ValueError on over-release (bug detection)
    """

    def __init__(self, dsn: str, max_connections: int = 5):
        self._dsn = dsn
        self._max = max_connections
        self._semaphore = threading.BoundedSemaphore(max_connections)
        self._lock = threading.Lock()
        self._free_connections: list = []
        self._all_connections: list = []
        self._closed = False
        self._shutdown_event = threading.Event()
        # Pre-create all connections
        for i in range(max_connections):
            conn = self._create_connection(i)
            self._free_connections.append(conn)
            self._all_connections.append(conn)
        logging.info(f"Pool ready: {max_connections} connections to {dsn}")

    def _create_connection(self, conn_id: int) -> dict:
        """Simulate creating a database connection."""
        return {"id": conn_id, "dsn": self._dsn, "queries": 0}

    @contextlib.contextmanager
    def acquire(self, timeout: float = 30.0):
        """Acquire a connection from the pool."""
        if self._closed:
            raise RuntimeError("Connection pool is closed")
        acquired = self._semaphore.acquire(timeout=timeout)
        if not acquired:
            raise TimeoutError(
                f"Could not acquire connection within {timeout}s "
                f"(max_connections={self._max})"
            )
        conn: Optional[dict] = None
        try:
            with self._lock:
                conn = self._free_connections.pop()
            logging.info(f"Acquired connection {conn['id']}")
            yield conn
            conn["queries"] += 1  # counted only when the caller's block succeeds
        finally:
            if conn is not None:
                with self._lock:
                    self._free_connections.append(conn)
                logging.info(f"Released connection {conn['id']}")
            self._semaphore.release()  # BoundedSemaphore: raises if over-released

    def close(self):
        """Shut down the pool gracefully."""
        self._closed = True
        self._shutdown_event.set()
        logging.info(f"Pool closed. Total queries: "
                     f"{sum(c['queries'] for c in self._all_connections)}")
def run_query(pool: DatabaseConnectionPool, query_id: int):
    try:
        with pool.acquire(timeout=10.0) as conn:
            logging.info(f"Running query {query_id} on connection {conn['id']}")
            time.sleep(0.3)  # simulate query time
    except TimeoutError as e:
        logging.error(f"Query {query_id} failed: {e}")

if __name__ == "__main__":
    pool = DatabaseConnectionPool("postgresql://localhost/mydb", max_connections=5)
    # Launch 15 concurrent queries - pool limits to 5 at a time
    threads = [
        threading.Thread(
            target=run_query,
            args=(pool, i),
            name=f"Query-{i}"
        )
        for i in range(15)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    pool.close()
Synchronization Primitive Decision Guide
Graded Practice Challenges
Beginner - Thread-Safe Counter
Implement a SafeCounter class that multiple threads can increment and decrement without race conditions. It must also support a reset() method.
import threading

class SafeCounter:
    def __init__(self):
        # Your implementation here
        pass

    def increment(self):
        pass

    def decrement(self):
        pass

    def reset(self):
        pass

    @property
    def value(self):
        pass

# Test:
counter = SafeCounter()
threads = [threading.Thread(target=counter.increment) for _ in range(1000)]
threads += [threading.Thread(target=counter.decrement) for _ in range(500)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value)  # Should reliably print 500
Show Solution
import threading

class SafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self._value += 1

    def decrement(self):
        with self._lock:
            self._value -= 1

    def reset(self):
        with self._lock:
            self._value = 0

    @property
    def value(self):
        with self._lock:
            return self._value

counter = SafeCounter()
threads = [threading.Thread(target=counter.increment) for _ in range(1000)]
threads += [threading.Thread(target=counter.decrement) for _ in range(500)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value)  # 500
The value property also acquires the lock. In CPython a single attribute read is unlikely to observe a half-updated value, but relying on that ties the code to an implementation detail. Taking the lock for reads keeps the class correct if it later guards more than one field, so correct code acquires the lock for reads too.
Intermediate - Rate Limiter Using Semaphore and Event
Build a RateLimiter that allows at most N operations per second. Use a BoundedSemaphore to track available "tokens" and a background thread to replenish them every second.
import threading
import time

class RateLimiter:
    """Token bucket rate limiter: allows up to `rate` operations per second."""

    def __init__(self, rate: int):
        # Your implementation here
        pass

    def acquire(self):
        """Block until a token is available."""
        pass

    def close(self):
        """Stop the background replenishment thread."""
        pass

# Test:
limiter = RateLimiter(rate=3)  # 3 ops/sec
start = time.time()
for i in range(9):
    limiter.acquire()
    print(f"Op {i} at t={time.time() - start:.2f}s")
limiter.close()
# Ops 0-2 should happen near t=0, ops 3-5 near t=1, ops 6-8 near t=2
Show Solution
import threading
import time

class RateLimiter:
    def __init__(self, rate: int):
        self._rate = rate
        self._tokens = threading.BoundedSemaphore(rate)
        self._stop = threading.Event()
        # Background thread replenishes tokens every second
        self._replenisher = threading.Thread(
            target=self._replenish,
            daemon=True,
            name="RateLimiter-Replenisher"
        )
        self._replenisher.start()

    def _replenish(self):
        while not self._stop.wait(timeout=1.0):
            for _ in range(self._rate):
                try:
                    self._tokens.release()
                except ValueError:
                    pass  # semaphore already at max - tokens unused last second

    def acquire(self):
        self._tokens.acquire()  # blocks until a token is available

    def close(self):
        self._stop.set()
        self._replenisher.join(timeout=2.0)

# Test
limiter = RateLimiter(rate=3)
start = time.time()
for i in range(9):
    limiter.acquire()
    print(f"Op {i} at t={time.time() - start:.2f}s")
limiter.close()
Key insight: BoundedSemaphore(rate) starts with rate tokens. The replenisher calls release() up to rate times per second, but BoundedSemaphore prevents the count from exceeding the initial value - so unused tokens from one second do not accumulate into a burst in the next.
Advanced - Deadlock-Free Multi-Lock Context Manager
Implement a context manager multi_lock(*locks) that acquires any number of locks in a consistent order (by id()) to prevent deadlocks. It must handle exceptions and release all locks on exit.
import threading
import time
from contextlib import contextmanager

@contextmanager
def multi_lock(*locks):
    """Acquire multiple locks in a consistent global order to prevent deadlocks."""
    # Your implementation here
    pass

# Test: these two threads should NOT deadlock
lock_a = threading.Lock()
lock_b = threading.Lock()

def operation_one():
    with multi_lock(lock_a, lock_b):  # acquires in id order
        time.sleep(0.1)
        print("Operation 1 complete")

def operation_two():
    with multi_lock(lock_b, lock_a):  # same id order despite different arg order
        time.sleep(0.1)
        print("Operation 2 complete")

t1 = threading.Thread(target=operation_one)
t2 = threading.Thread(target=operation_two)
t1.start(); t2.start()
t1.join(); t2.join()
print("Both operations completed - no deadlock")
Show Solution
import threading
import time
from contextlib import contextmanager

@contextmanager
def multi_lock(*locks):
    """
    Acquire multiple locks in a consistent global order (by id) to prevent deadlocks.
    Releases all locks in reverse order on exit, even if an exception occurs.
    """
    # Sort by id() to establish a global consistent acquisition order
    sorted_locks = sorted(set(locks), key=id)
    acquired = []
    try:
        for lock in sorted_locks:
            lock.acquire()
            acquired.append(lock)
        yield
    finally:
        # Release in reverse acquisition order
        for lock in reversed(acquired):
            lock.release()

# Test
lock_a = threading.Lock()
lock_b = threading.Lock()

def operation_one():
    with multi_lock(lock_a, lock_b):
        time.sleep(0.1)
        print("Operation 1 complete")

def operation_two():
    with multi_lock(lock_b, lock_a):  # args reversed - but id order is the same
        time.sleep(0.1)
        print("Operation 2 complete")

t1 = threading.Thread(target=operation_one)
t2 = threading.Thread(target=operation_two)
t1.start(); t2.start()
t1.join(); t2.join()
print("Both operations completed - no deadlock")
Why it works: sorting by id() assigns a total order to the lock objects (in CPython, id() is the object's memory address, and it stays fixed for the object's lifetime). Both multi_lock(lock_a, lock_b) in operation_one and multi_lock(lock_b, lock_a) in operation_two resolve to the same acquisition sequence after sorting. A circular wait is impossible because both threads always request locks in the same order.
The set(locks) call deduplicates: if the same lock appears twice in the arguments, it is only acquired once (avoids self-deadlock with Lock; use RLock if true re-entrancy is needed).
Key Takeaways
- threading.Lock allows exactly one thread at a time - use with lock: by default, and reserve manual acquire/release (with try/finally) for cases such as timeouts
- threading.RLock (reentrant lock) allows the same thread to acquire the lock multiple times - use it when a method calls another method that needs the same lock
- Deadlock occurs when threads form a circular lock dependency - prevent it with consistent lock acquisition ordering or timeouts
- threading.Semaphore(N) allows up to N threads concurrently - use it for resource pools (DB connections, API rate limits)
- BoundedSemaphore raises ValueError on over-release - prefer it over Semaphore for resource pools as a bug-detection mechanism
- threading.Event is a simple flag: set() wakes all waiters, clear() resets it, wait() blocks until set - use for one-way signaling
- threading.Condition wraps a lock and adds wait()/notify()/notify_all() - always check the condition in a while loop, not an if
- threading.Barrier(N) makes N threads wait for each other at a checkpoint before any proceeds - use for iterative parallel algorithms
- Most threading primitives have an asyncio counterpart (asyncio.Lock, asyncio.Semaphore, asyncio.Event) - never mix threading and asyncio primitives
What's Next
Lesson 07 - ThreadPoolExecutor and ProcessPoolExecutor
Writing raw threading.Thread code requires you to manage creation, starting, joining, and exception propagation for every single thread. concurrent.futures.ThreadPoolExecutor handles all of that in 3 lines - and adds Future objects, as_completed() for latency-optimal result handling, and a unified interface that switches between thread and process pools by changing one class name.
