Python Race Conditions Practice Problems & Exercises
Practice: Race Conditions and Thread Safety
← Back to lessonIdentify the read-modify-write race condition in a shared counter and fix it with a lock.
Solution:
import threading
count = 0
lock = threading.Lock()
def increment_safe():
global count
for _ in range(1000):
with lock:
count += 1
def run_safe():
global count
count = 0
threads = [threading.Thread(target=increment_safe) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
return count
print('Safe count:', run_safe())
import threading
# This code has a race condition. The final count is NOT always 10000.
# TODO: Identify the race, fix it, and verify the count is always 10000.
count = 0
def increment_unsafe():
global count
for _ in range(1000):
count += 1 # This is NOT atomic: read -> add -> write
def run_unsafe():
threads = [threading.Thread(target=increment_unsafe) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
return count
def increment_safe():
# TODO: Fix the race condition
pass
def run_safe():
global count
count = 0
threads = [threading.Thread(target=increment_safe) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
return count
print('Safe count:', run_safe())
Expected Output
Safe count: 10000Hints
Hint 1: count += 1 is three operations: LOAD count, ADD 1, STORE count.
Hint 2: Between LOAD and STORE, another thread can modify count.
Hint 3: Fix: use a threading.Lock() to make the read-modify-write atomic.
Fix a Time-of-Check-to-Time-of-Use (TOCTOU) race in a bank withdrawal by atomizing check and act.
Solution:
import threading
lock = threading.Lock()
def safe_withdraw(amount):
global balance
with lock:
if balance >= amount:
balance -= amount
return True
return False
import threading
import time
balance = 100
def unsafe_withdraw(amount):
global balance
if balance >= amount: # CHECK
time.sleep(0.001) # Simulate processing delay
balance -= amount # ACT — race window here!
return True
return False
# Run two concurrent withdrawals of 80 each — balance is only 100!
# This can result in negative balance.
def safe_withdraw(amount):
# TODO: Fix the TOCTOU vulnerability
pass
def test_safe():
global balance
balance = 100
results = []
lock = threading.Lock() # hint: you'll need this
def worker(amt, results_list):
result = safe_withdraw(amt)
results_list.append(result)
threads = [threading.Thread(target=worker, args=(80, results)) for _ in range(2)]
for t in threads: t.start()
for t in threads: t.join()
return balance, sorted(results, reverse=True)
final_balance, results = test_safe()
print(f'Balance: {final_balance}') # Should be 20, not -60
print(f'Results: {results}') # One True, one False
Expected Output
Balance: 20
Results: [True, False]Hints
Hint 1: The check (balance >= amount) and act (balance -= amount) must be atomic.
Hint 2: Use a lock to wrap both operations together.
Hint 3: After acquiring the lock, re-check the condition — by then another thread may have withdrawn.
Fix a lazy initialization race condition to ensure a singleton is created exactly once.
Solution:
import threading
_singleton_lock = threading.Lock()
def get_instance_safe():
global instance
if instance is None:
with _singleton_lock:
if instance is None:
instance = ExpensiveResource()
return instance
import threading
import time
instance = None
init_count = [0]
class ExpensiveResource:
def __init__(self):
init_count[0] += 1
time.sleep(0.05) # Simulate expensive setup
def get_instance_unsafe():
global instance
if instance is None: # Multiple threads can pass this check
instance = ExpensiveResource()
return instance
# TODO: Implement a thread-safe singleton using a lock
def get_instance_safe():
pass
def test_safe():
global instance, init_count
instance = None
init_count[0] = 0
threads = [threading.Thread(target=get_instance_safe) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
return init_count[0]
count = test_safe()
print(f'Init count: {count}') # Should be exactly 1
Expected Output
Init count: 1Hints
Hint 1: Use a module-level lock: _lock = threading.Lock()
Hint 2: Double-checked locking: check outside lock (fast path), check again inside (safe path).
Hint 3: with _lock: if instance is None: instance = ExpensiveResource()
Verify that list.append() is GIL-protected and understand which list operations are and aren't thread-safe.
Solution:
import threading
results_safe = []
def safe_appender(val):
results_safe.append(val)
def test_safe_append():
results_safe.clear()
threads = [threading.Thread(target=safe_appender, args=(i,)) for i in range(100)]
for t in threads: t.start()
for t in threads: t.join()
return len(results_safe)
count = test_safe_append()
print(f'Safe append count: {count}')
print(f'All values present: {len(set(results_safe)) == 100}')
import threading
# Python's list.append() is GIL-protected — it IS thread-safe.
# But building a list with += is NOT atomic.
results_safe = []
results_unsafe = []
def safe_appender(val):
results_safe.append(val) # Atomic under GIL
def unsafe_adder(val):
# This is NOT atomic: reads list, creates new list, assigns back
# results_unsafe += [val] # Unsafe!
pass
# TODO: Verify that 100 threads each appending once gives 100 items.
# Also demonstrate why += is dangerous (show it with a local copy trick).
def test_safe_append():
results_safe.clear()
threads = [threading.Thread(target=safe_appender, args=(i,)) for i in range(100)]
for t in threads: t.start()
for t in threads: t.join()
return len(results_safe)
count = test_safe_append()
print(f'Safe append count: {count}')
print(f'All values present: {len(set(results_safe)) == 100}')
Expected Output
Safe append count: 100
All values present: TrueHints
Hint 1: list.append() is atomic under the GIL — no explicit lock needed for appending.
Hint 2: list += [item] is NOT atomic: it calls __iadd__ which involves multiple bytecode ops.
Hint 3: For dict operations, .update() and direct assignment are GIL-safe but compound ops are not.
Implement a readers-writer lock that allows concurrent reads but serializes writes.
Solution:
class RWLock:
def __init__(self):
self._read_count = 0
self._read_lock = threading.Lock()
self._write_lock = threading.Lock()
def acquire_read(self):
with self._read_lock:
self._read_count += 1
if self._read_count == 1:
self._write_lock.acquire()
def release_read(self):
with self._read_lock:
self._read_count -= 1
if self._read_count == 0:
self._write_lock.release()
def acquire_write(self):
self._write_lock.acquire()
def release_write(self):
self._write_lock.release()
import threading
import time
cache = {}
def update_cache(key, value):
cache[key] = value
def read_cache(key):
return cache.get(key)
# TODO: Simulate a race condition in a shared cache:
# - 3 writer threads each update 100 keys
# - 5 reader threads each read 100 keys repeatedly
# Implement a RWLock (readers-writer lock) that allows
# concurrent reads but exclusive writes.
class RWLock:
def __init__(self):
self._read_count = 0
self._read_lock = threading.Lock()
self._write_lock = threading.Lock()
def acquire_read(self):
pass
def release_read(self):
pass
def acquire_write(self):
pass
def release_write(self):
pass
# Test: verify no RuntimeError during concurrent access
def test_rwlock():
rw = RWLock()
errors = []
def writer(n):
for i in range(50):
rw.acquire_write()
try:
cache[f'key{n}'] = i
except Exception as e:
errors.append(str(e))
finally:
rw.release_write()
def reader():
for _ in range(50):
rw.acquire_read()
try:
_ = dict(cache)
except Exception as e:
errors.append(str(e))
finally:
rw.release_read()
threads = [threading.Thread(target=writer, args=(n,)) for n in range(3)]
threads += [threading.Thread(target=reader) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
return errors
errors = test_rwlock()
print(f'Errors: {len(errors)}')
print(f'Cache size: {len(cache)}')
Expected Output
Errors: 0
Cache size: 3Hints
Hint 1: acquire_read: lock read_lock, increment read_count, if first reader acquire write_lock, unlock read_lock.
Hint 2: release_read: lock read_lock, decrement read_count, if last reader release write_lock, unlock read_lock.
Hint 3: acquire_write: acquire write_lock directly.
Fix an asyncio race condition by wrapping a check-then-act sequence in an asyncio.Lock.
Solution:
import asyncio
async_lock = asyncio.Lock()
async def withdraw_safe(amount, name):
global balance
async with async_lock:
if balance >= amount:
await asyncio.sleep(0.01)
balance -= amount
return f'{name}: withdrew {amount}'
return f'{name}: insufficient funds'
import asyncio
# In asyncio, race conditions happen between yield points (await).
# Operations between awaits are atomic — no other coroutine can run.
balance = 1000
async def withdraw_unsafe(amount, name):
global balance
if balance >= amount:
await asyncio.sleep(0.01) # Yield point — another coroutine can run here!
balance -= amount
return f'{name}: withdrew {amount}'
return f'{name}: insufficient funds'
# This is vulnerable: both tasks check balance, both see 1000, both withdraw.
async def test_race():
global balance
balance = 1000
results = await asyncio.gather(
withdraw_unsafe(800, 'Alice'),
withdraw_unsafe(800, 'Bob'),
)
return results, balance
# TODO: Fix withdraw_unsafe to be race-free using asyncio.Lock
async_lock = asyncio.Lock()
async def withdraw_safe(amount, name):
pass
async def test_safe():
global balance
balance = 1000
results = await asyncio.gather(
withdraw_safe(800, 'Alice'),
withdraw_safe(800, 'Bob'),
)
return results, balance
results, final = asyncio.run(test_safe())
print(f'Results: {sorted(results)}')
print(f'Final balance: {final}')
Expected Output
Results: ['Alice: withdrew 800', 'Bob: insufficient funds']
Final balance: 200Hints
Hint 1: asyncio.Lock() is the async equivalent of threading.Lock().
Hint 2: async with async_lock: wraps the check-and-act atomically.
Hint 3: Between the check and the deduction, no other coroutine can run.
Fix a race condition in a shared dictionary word counter using a lock.
Solution:
import threading
word_counts = {}
lock = threading.Lock()
def count_word_safe(word):
with lock:
word_counts[word] = word_counts.get(word, 0) + 1
import threading
word_counts = {}
def count_word_unsafe(word):
if word not in word_counts:
word_counts[word] = 0
word_counts[word] += 1
# TODO: Fix count_word to be thread-safe.
# Use dict.setdefault() or a lock — or both.
lock = threading.Lock()
def count_word_safe(word):
pass
def test_safe_count():
word_counts.clear()
words = ['python'] * 500 + ['java'] * 300 + ['rust'] * 200
import random
random.shuffle(words)
threads = [threading.Thread(target=count_word_safe, args=(w,)) for w in words]
for t in threads: t.start()
for t in threads: t.join()
return dict(word_counts)
result = test_safe_count()
print(f'python: {result.get("python")}')
print(f'java: {result.get("java")}')
print(f'rust: {result.get("rust")}')
Expected Output
python: 500
java: 300
rust: 200Hints
Hint 1: The check-then-set (if word not in dict: dict[word] = 0; dict[word] += 1) is a race.
Hint 2: Solution 1: wrap with a lock. Solution 2: use collections.Counter.
Hint 3: dict.setdefault(key, 0) is atomic under the GIL but += is still a separate operation.
Use O_CREAT | O_EXCL flags to atomically create a file, eliminating the TOCTOU race in file creation.
Solution:
import os
def safe_create_file(path):
try:
fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
os.write(fd, b'created')
os.close(fd)
return 'created'
except FileExistsError:
return 'exists'
import os
import tempfile
import threading
# Race: check-then-create leaves a window where two threads both
# "see" the file doesn't exist and both try to create it.
def unsafe_create_file(path):
if not os.path.exists(path):
with open(path, 'w') as f:
f.write('created')
else:
return 'exists'
# TODO: Implement safe_create_file using O_CREAT | O_EXCL flags
# which create the file atomically (fails if it already exists).
# Return 'created' on success, 'exists' on failure.
def safe_create_file(path):
pass
def test_exclusive_create():
tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, 'test.lock')
results = []
lock = threading.Lock()
def worker():
result = safe_create_file(path)
with lock:
results.append(result)
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
created = results.count('created')
exists = results.count('exists')
os.unlink(path)
os.rmdir(tmpdir)
return created, exists
created, exists = test_exclusive_create()
print(f'Created: {created} (should be 1)')
print(f'Already existed: {exists} (should be 9)')
Expected Output
Created: 1 (should be 1)
Already existed: 9 (should be 9)Hints
Hint 1: Use os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644).
Hint 2: O_EXCL combined with O_CREAT makes the operation atomic — raises FileExistsError if file exists.
Hint 3: Catch FileExistsError and return 'exists'.
Use a canary-based race detector to empirically demonstrate a race condition and verify the fix.
Solution:
def safe_critical_section(detector, lock, n):
for _ in range(n):
with lock:
detector.enter()
detector.exit()
import threading
import time
import random
# A "canary" detects races: set a flag inside a critical section.
# If two threads enter simultaneously, the canary catches it.
class RaceDetector:
def __init__(self):
self.inside = False
self.violations = 0
def enter(self):
if self.inside:
self.violations += 1
self.inside = True
time.sleep(random.uniform(0, 0.001)) # Widen the race window
def exit(self):
self.inside = False
# TODO: Write an unsafe function using the detector,
# then write a safe version. Verify the unsafe version has violations
# and the safe version has zero.
def unsafe_critical_section(detector, n):
for _ in range(n):
detector.enter()
# critical work here
detector.exit()
def safe_critical_section(detector, lock, n):
pass
def test_unsafe():
detector = RaceDetector()
threads = [threading.Thread(target=unsafe_critical_section, args=(detector, 20)) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
return detector.violations
def test_safe():
detector = RaceDetector()
lock = threading.Lock()
threads = [threading.Thread(target=safe_critical_section, args=(detector, lock, 20)) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
return detector.violations
unsafe_v = test_unsafe()
safe_v = test_safe()
print(f'Unsafe violations: {unsafe_v} (expected > 0)')
print(f'Safe violations: {safe_v} (expected 0)')
Expected Output
Unsafe violations: X (expected > 0)
Safe violations: 0 (expected 0)Hints
Hint 1: safe_critical_section: acquire lock, call detector.enter(), do work, detector.exit(), release lock.
Hint 2: The lock ensures only one thread is inside the critical section at a time.
Hint 3: The canary (detector.inside flag) catches overlapping entries.
Demonstrate the ABA problem and fix it using a stamped atomic reference with a version counter.
Solution:
The StampedRef class above is the solution — it uses a stamp (version counter) alongside the value so that an A→B→A sequence is detected because the stamp will be 2, not 0.
# The key insight: StampedRef.compare_and_set requires BOTH
# the expected value AND the expected stamp to match.
# After thread2 does A->B->A, the stamp is 2.
# Thread1 has stamp=0 from before the changes, so CAS fails.
import threading
import time
# The ABA problem: a value changes from A to B then back to A.
# A compare-and-swap (CAS) sees A and incorrectly thinks nothing changed.
class AtomicRef:
def __init__(self, value):
self._value = value
self._lock = threading.Lock()
def get(self):
return self._value
def compare_and_set(self, expected, new_value):
with self._lock:
if self._value == expected:
self._value = new_value
return True
return False
# TODO: Simulate the ABA problem:
# Thread 1 reads value A, gets preempted.
# Thread 2 changes A -> B -> A.
# Thread 1 resumes, sees A, CAS succeeds -- but the world changed.
# Demonstrate and then fix using a version counter (stamped reference).
class StampedRef:
def __init__(self, value):
self._value = value
self._stamp = 0
self._lock = threading.Lock()
def get(self):
with self._lock:
return self._value, self._stamp
def compare_and_set(self, expected_value, expected_stamp, new_value):
with self._lock:
if self._value == expected_value and self._stamp == expected_stamp:
self._value = new_value
self._stamp += 1
return True
return False
def demonstrate_aba():
ref = AtomicRef('A')
results = []
def thread1():
old = ref.get() # sees 'A'
time.sleep(0.05) # gets preempted
# After sleep, world changed A->B->A, but CAS still succeeds!
success = ref.compare_and_set(old, 'C')
results.append(('thread1', success))
def thread2():
time.sleep(0.01)
ref.compare_and_set('A', 'B') # A -> B
ref.compare_and_set('B', 'A') # B -> A (ABA!)
results.append(('thread2', 'done'))
t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)
t1.start(); t2.start()
t1.join(); t2.join()
return results, ref.get()
results, final = demonstrate_aba()
print('ABA demonstration:', results)
print('Final value:', final) # Thread1's CAS succeeded despite the ABA!
# Now demonstrate StampedRef prevents this
def demonstrate_stamped():
ref = StampedRef('A')
results = []
def thread1():
val, stamp = ref.get() # sees ('A', 0)
time.sleep(0.05)
success = ref.compare_and_set(val, stamp, 'C')
results.append(('thread1', success))
def thread2():
time.sleep(0.01)
v, s = ref.get()
ref.compare_and_set(v, s, 'B')
v, s = ref.get()
ref.compare_and_set(v, s, 'A')
results.append(('thread2', 'done'))
t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)
t1.start(); t2.start()
t1.join(); t2.join()
val, stamp = ref.get()
return results, val, stamp
results2, val, stamp = demonstrate_stamped()
print('Stamped ref:', results2)
print('Thread1 CAS succeeded:', results2[0][1] if results2 else None)
Expected Output
ABA demonstration: [('thread2', 'done'), ('thread1', True)]
Final value: C
Stamped ref: [('thread2', 'done'), ('thread1', False)]
Thread1 CAS succeeded: FalseHints
Hint 1: The ABA problem: value is A, changes to B, changes back to A — CAS sees A and succeeds incorrectly.
Hint 2: StampedRef adds a monotonically increasing stamp — even if value returns to A, the stamp differs.
Hint 3: CAS with stamp: both value AND stamp must match for the operation to succeed.
Implement safe data publishing with a lock to ensure full visibility of published data to readers.
Solution:
import copy
class DataPublisher:
def __init__(self):
self._data = None
self._ready = False
self._lock = threading.Lock()
def publish(self, data):
with self._lock:
self._data = data
self._ready = True
def read_if_ready(self):
with self._lock:
if self._ready:
return copy.copy(self._data)
return None
import threading
import time
# In Python, the GIL prevents true CPU-level races, but logical races remain.
# This problem explores happens-before relationships and visibility.
# The "published before read" pattern: a writer publishes data,
# a reader must see the latest published version.
class DataPublisher:
def __init__(self):
self._data = None
self._ready = False
self._lock = threading.Lock()
def publish(self, data):
pass
def read_if_ready(self):
pass
def test_publisher():
pub = DataPublisher()
read_results = []
errors = []
def writer():
for i in range(10):
pub.publish({'iteration': i, 'value': i * i})
time.sleep(0.01)
def reader():
for _ in range(30):
result = pub.read_if_ready()
if result is not None:
read_results.append(result)
time.sleep(0.005)
wt = threading.Thread(target=writer)
rt = threading.Thread(target=reader)
wt.start(); rt.start()
wt.join(); rt.join()
return len(read_results) > 0, all(isinstance(r, dict) for r in read_results)
has_results, all_valid = test_publisher()
print(f'Got results: {has_results}')
print(f'All valid dicts: {all_valid}')
Expected Output
Got results: True
All valid dicts: TrueHints
Hint 1: publish(): acquire lock, set self._data, set self._ready = True, release lock.
Hint 2: read_if_ready(): acquire lock, check _ready, return copy of _data if ready, release lock.
Hint 3: The lock ensures visibility: the reader always sees a fully-constructed data dict.
