Skip to main content

Python Race Conditions Practice Problems & Exercises

Practice: Race Conditions and Thread Safety

11 problems4 Easy4 Medium3 Hard55–75 min
← Back to lesson

#1Spot the Race ConditionEasy
race conditionread-modify-writeidentify

Identify the read-modify-write race condition in a shared counter and fix it with a lock.

Solution:

import threading

count = 0
lock = threading.Lock()

def increment_safe():
global count
for _ in range(1000):
with lock:
count += 1

def run_safe():
global count
count = 0
threads = [threading.Thread(target=increment_safe) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
return count

print('Safe count:', run_safe())
import threading

# This code has a race condition. The final count is NOT always 10000.
# TODO: Identify the race, fix it, and verify the count is always 10000.

count = 0

def increment_unsafe():
  global count
  for _ in range(1000):
      count += 1  # This is NOT atomic: read -> add -> write

def run_unsafe():
  threads = [threading.Thread(target=increment_unsafe) for _ in range(10)]
  for t in threads: t.start()
  for t in threads: t.join()
  return count

def increment_safe():
  # TODO: Fix the race condition
  pass

def run_safe():
  global count
  count = 0
  threads = [threading.Thread(target=increment_safe) for _ in range(10)]
  for t in threads: t.start()
  for t in threads: t.join()
  return count

print('Safe count:', run_safe())
Expected Output
Safe count: 10000
Hints

Hint 1: count += 1 is three operations: LOAD count, ADD 1, STORE count.

Hint 2: Between LOAD and STORE, another thread can modify count.

Hint 3: Fix: use a threading.Lock() to make the read-modify-write atomic.


#2Check-Then-Act Race (TOCTOU)Easy
TOCTOUcheck-then-actrace

Fix a Time-of-Check-to-Time-of-Use (TOCTOU) race in a bank withdrawal by atomizing check and act.

Solution:

import threading

lock = threading.Lock()

def safe_withdraw(amount):
global balance
with lock:
if balance >= amount:
balance -= amount
return True
return False
import threading
import time

balance = 100

def unsafe_withdraw(amount):
  global balance
  if balance >= amount:  # CHECK
      time.sleep(0.001)  # Simulate processing delay
      balance -= amount  # ACT — race window here!
      return True
  return False

# Run two concurrent withdrawals of 80 each — balance is only 100!
# This can result in negative balance.

def safe_withdraw(amount):
  # TODO: Fix the TOCTOU vulnerability
  pass

def test_safe():
  global balance
  balance = 100
  results = []
  lock = threading.Lock()  # hint: you'll need this

  def worker(amt, results_list):
      result = safe_withdraw(amt)
      results_list.append(result)

  threads = [threading.Thread(target=worker, args=(80, results)) for _ in range(2)]
  for t in threads: t.start()
  for t in threads: t.join()

  return balance, sorted(results, reverse=True)

final_balance, results = test_safe()
print(f'Balance: {final_balance}')  # Should be 20, not -60
print(f'Results: {results}')       # One True, one False
Expected Output
Balance: 20
Results: [True, False]
Hints

Hint 1: The check (balance >= amount) and act (balance -= amount) must be atomic.

Hint 2: Use a lock to wrap both operations together.

Hint 3: After acquiring the lock, re-check the condition — by then another thread may have withdrawn.


#3Lazy Initialization RaceEasy
lazy initsingletondouble-checked locking

Fix a lazy initialization race condition to ensure a singleton is created exactly once.

Solution:

import threading

_singleton_lock = threading.Lock()

def get_instance_safe():
global instance
if instance is None:
with _singleton_lock:
if instance is None:
instance = ExpensiveResource()
return instance
import threading
import time

instance = None
init_count = [0]

class ExpensiveResource:
  def __init__(self):
      init_count[0] += 1
      time.sleep(0.05)  # Simulate expensive setup

def get_instance_unsafe():
  global instance
  if instance is None:  # Multiple threads can pass this check
      instance = ExpensiveResource()
  return instance

# TODO: Implement a thread-safe singleton using a lock
def get_instance_safe():
  pass

def test_safe():
  global instance, init_count
  instance = None
  init_count[0] = 0

  threads = [threading.Thread(target=get_instance_safe) for _ in range(5)]
  for t in threads: t.start()
  for t in threads: t.join()

  return init_count[0]

count = test_safe()
print(f'Init count: {count}')  # Should be exactly 1
Expected Output
Init count: 1
Hints

Hint 1: Use a module-level lock: _lock = threading.Lock()

Hint 2: Double-checked locking: check outside lock (fast path), check again inside (safe path).

Hint 3: with _lock: if instance is None: instance = ExpensiveResource()


#4List Append — Is It Thread-Safe?Easy
GILlist.appendthread-safe operations

Verify that list.append() is GIL-protected and understand which list operations are and aren't thread-safe.

Solution:

import threading

results_safe = []

def safe_appender(val):
results_safe.append(val)

def test_safe_append():
results_safe.clear()
threads = [threading.Thread(target=safe_appender, args=(i,)) for i in range(100)]
for t in threads: t.start()
for t in threads: t.join()
return len(results_safe)

count = test_safe_append()
print(f'Safe append count: {count}')
print(f'All values present: {len(set(results_safe)) == 100}')
import threading

# Python's list.append() is GIL-protected — it IS thread-safe.
# But building a list with += is NOT atomic.

results_safe = []
results_unsafe = []

def safe_appender(val):
  results_safe.append(val)  # Atomic under GIL

def unsafe_adder(val):
  # This is NOT atomic: reads list, creates new list, assigns back
  # results_unsafe += [val]  # Unsafe!
  pass

# TODO: Verify that 100 threads each appending once gives 100 items.
# Also demonstrate why += is dangerous (show it with a local copy trick).

def test_safe_append():
  results_safe.clear()
  threads = [threading.Thread(target=safe_appender, args=(i,)) for i in range(100)]
  for t in threads: t.start()
  for t in threads: t.join()
  return len(results_safe)

count = test_safe_append()
print(f'Safe append count: {count}')
print(f'All values present: {len(set(results_safe)) == 100}')
Expected Output
Safe append count: 100
All values present: True
Hints

Hint 1: list.append() is atomic under the GIL — no explicit lock needed for appending.

Hint 2: list += [item] is NOT atomic: it calls __iadd__ which involves multiple bytecode ops.

Hint 3: For dict operations, .update() and direct assignment are GIL-safe but compound ops are not.


#5Dict Race — Concurrent Reads and WritesMedium
dictconcurrent modificationraceRWLock

Implement a readers-writer lock that allows concurrent reads but serializes writes.

Solution:

class RWLock:
def __init__(self):
self._read_count = 0
self._read_lock = threading.Lock()
self._write_lock = threading.Lock()

def acquire_read(self):
with self._read_lock:
self._read_count += 1
if self._read_count == 1:
self._write_lock.acquire()

def release_read(self):
with self._read_lock:
self._read_count -= 1
if self._read_count == 0:
self._write_lock.release()

def acquire_write(self):
self._write_lock.acquire()

def release_write(self):
self._write_lock.release()
import threading
import time

cache = {}

def update_cache(key, value):
  cache[key] = value

def read_cache(key):
  return cache.get(key)

# TODO: Simulate a race condition in a shared cache:
# - 3 writer threads each update 100 keys
# - 5 reader threads each read 100 keys repeatedly
# Implement a RWLock (readers-writer lock) that allows
# concurrent reads but exclusive writes.

class RWLock:
  def __init__(self):
      self._read_count = 0
      self._read_lock = threading.Lock()
      self._write_lock = threading.Lock()

  def acquire_read(self):
      pass

  def release_read(self):
      pass

  def acquire_write(self):
      pass

  def release_write(self):
      pass

# Test: verify no RuntimeError during concurrent access
def test_rwlock():
  rw = RWLock()
  errors = []

  def writer(n):
      for i in range(50):
          rw.acquire_write()
          try:
              cache[f'key{n}'] = i
          except Exception as e:
              errors.append(str(e))
          finally:
              rw.release_write()

  def reader():
      for _ in range(50):
          rw.acquire_read()
          try:
              _ = dict(cache)
          except Exception as e:
              errors.append(str(e))
          finally:
              rw.release_read()

  threads = [threading.Thread(target=writer, args=(n,)) for n in range(3)]
  threads += [threading.Thread(target=reader) for _ in range(5)]
  for t in threads: t.start()
  for t in threads: t.join()
  return errors

errors = test_rwlock()
print(f'Errors: {len(errors)}')
print(f'Cache size: {len(cache)}')
Expected Output
Errors: 0
Cache size: 3
Hints

Hint 1: acquire_read: lock read_lock, increment read_count, if first reader acquire write_lock, unlock read_lock.

Hint 2: release_read: lock read_lock, decrement read_count, if last reader release write_lock, unlock read_lock.

Hint 3: acquire_write: acquire write_lock directly.


#6Async Race Condition — Missing awaitMedium
asyncioasync racemissing awaitnon-atomic

Fix an asyncio race condition by wrapping a check-then-act sequence in an asyncio.Lock.

Solution:

import asyncio

async_lock = asyncio.Lock()

async def withdraw_safe(amount, name):
global balance
async with async_lock:
if balance >= amount:
await asyncio.sleep(0.01)
balance -= amount
return f'{name}: withdrew {amount}'
return f'{name}: insufficient funds'
import asyncio

# In asyncio, race conditions happen between yield points (await).
# Operations between awaits are atomic — no other coroutine can run.

balance = 1000

async def withdraw_unsafe(amount, name):
  global balance
  if balance >= amount:
      await asyncio.sleep(0.01)  # Yield point — another coroutine can run here!
      balance -= amount
      return f'{name}: withdrew {amount}'
  return f'{name}: insufficient funds'

# This is vulnerable: both tasks check balance, both see 1000, both withdraw.
async def test_race():
  global balance
  balance = 1000
  results = await asyncio.gather(
      withdraw_unsafe(800, 'Alice'),
      withdraw_unsafe(800, 'Bob'),
  )
  return results, balance

# TODO: Fix withdraw_unsafe to be race-free using asyncio.Lock

async_lock = asyncio.Lock()

async def withdraw_safe(amount, name):
  pass

async def test_safe():
  global balance
  balance = 1000
  results = await asyncio.gather(
      withdraw_safe(800, 'Alice'),
      withdraw_safe(800, 'Bob'),
  )
  return results, balance

results, final = asyncio.run(test_safe())
print(f'Results: {sorted(results)}')
print(f'Final balance: {final}')
Expected Output
Results: ['Alice: withdrew 800', 'Bob: insufficient funds']
Final balance: 200
Hints

Hint 1: asyncio.Lock() is the async equivalent of threading.Lock().

Hint 2: async with async_lock: wraps the check-and-act atomically.

Hint 3: Between the check and the deduction, no other coroutine can run.


#7Compound Operation Race on a DictMedium
dict setdefaultcompound operationthread-safe pattern

Fix a race condition in a shared dictionary word counter using a lock.

Solution:

import threading

word_counts = {}
lock = threading.Lock()

def count_word_safe(word):
with lock:
word_counts[word] = word_counts.get(word, 0) + 1
import threading

word_counts = {}

def count_word_unsafe(word):
  if word not in word_counts:
      word_counts[word] = 0
  word_counts[word] += 1

# TODO: Fix count_word to be thread-safe.
# Use dict.setdefault() or a lock — or both.

lock = threading.Lock()

def count_word_safe(word):
  pass

def test_safe_count():
  word_counts.clear()
  words = ['python'] * 500 + ['java'] * 300 + ['rust'] * 200
  import random
  random.shuffle(words)

  threads = [threading.Thread(target=count_word_safe, args=(w,)) for w in words]
  for t in threads: t.start()
  for t in threads: t.join()

  return dict(word_counts)

result = test_safe_count()
print(f'python: {result.get("python")}')
print(f'java: {result.get("java")}')
print(f'rust: {result.get("rust")}')
Expected Output
python: 500
java: 300
rust: 200
Hints

Hint 1: The check-then-set (if word not in dict: dict[word] = 0; dict[word] += 1) is a race.

Hint 2: Solution 1: wrap with a lock. Solution 2: use collections.Counter.

Hint 3: dict.setdefault(key, 0) is atomic under the GIL but += is still a separate operation.


#8File-System Race — Exclusive File CreationMedium
file raceTOCTOUO_EXCLexclusive create

Use O_CREAT | O_EXCL flags to atomically create a file, eliminating the TOCTOU race in file creation.

Solution:

import os

def safe_create_file(path):
try:
fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
os.write(fd, b'created')
os.close(fd)
return 'created'
except FileExistsError:
return 'exists'
import os
import tempfile
import threading

# Race: check-then-create leaves a window where two threads both
# "see" the file doesn't exist and both try to create it.

def unsafe_create_file(path):
  if not os.path.exists(path):
      with open(path, 'w') as f:
          f.write('created')
  else:
      return 'exists'

# TODO: Implement safe_create_file using O_CREAT | O_EXCL flags
# which create the file atomically (fails if it already exists).
# Return 'created' on success, 'exists' on failure.

def safe_create_file(path):
  pass

def test_exclusive_create():
  tmpdir = tempfile.mkdtemp()
  path = os.path.join(tmpdir, 'test.lock')
  results = []
  lock = threading.Lock()

  def worker():
      result = safe_create_file(path)
      with lock:
          results.append(result)

  threads = [threading.Thread(target=worker) for _ in range(10)]
  for t in threads: t.start()
  for t in threads: t.join()

  created = results.count('created')
  exists = results.count('exists')
  os.unlink(path)
  os.rmdir(tmpdir)
  return created, exists

created, exists = test_exclusive_create()
print(f'Created: {created} (should be 1)')
print(f'Already existed: {exists} (should be 9)')
Expected Output
Created: 1 (should be 1)
Already existed: 9 (should be 9)
Hints

Hint 1: Use os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644).

Hint 2: O_EXCL combined with O_CREAT makes the operation atomic — raises FileExistsError if file exists.

Hint 3: Catch FileExistsError and return 'exists'.


#9Detecting Races with a CanaryHard
race detectorcanarytesting concurrency

Use a canary-based race detector to empirically demonstrate a race condition and verify the fix.

Solution:

def safe_critical_section(detector, lock, n):
for _ in range(n):
with lock:
detector.enter()
detector.exit()
import threading
import time
import random

# A "canary" detects races: set a flag inside a critical section.
# If two threads enter simultaneously, the canary catches it.

class RaceDetector:
  def __init__(self):
      self.inside = False
      self.violations = 0

  def enter(self):
      if self.inside:
          self.violations += 1
      self.inside = True
      time.sleep(random.uniform(0, 0.001))  # Widen the race window

  def exit(self):
      self.inside = False

# TODO: Write an unsafe function using the detector,
# then write a safe version. Verify the unsafe version has violations
# and the safe version has zero.

def unsafe_critical_section(detector, n):
  for _ in range(n):
      detector.enter()
      # critical work here
      detector.exit()

def safe_critical_section(detector, lock, n):
  pass

def test_unsafe():
  detector = RaceDetector()
  threads = [threading.Thread(target=unsafe_critical_section, args=(detector, 20)) for _ in range(5)]
  for t in threads: t.start()
  for t in threads: t.join()
  return detector.violations

def test_safe():
  detector = RaceDetector()
  lock = threading.Lock()
  threads = [threading.Thread(target=safe_critical_section, args=(detector, lock, 20)) for _ in range(5)]
  for t in threads: t.start()
  for t in threads: t.join()
  return detector.violations

unsafe_v = test_unsafe()
safe_v = test_safe()
print(f'Unsafe violations: {unsafe_v} (expected > 0)')
print(f'Safe violations: {safe_v} (expected 0)')
Expected Output
Unsafe violations: X (expected > 0)
Safe violations: 0 (expected 0)
Hints

Hint 1: safe_critical_section: acquire lock, call detector.enter(), do work, detector.exit(), release lock.

Hint 2: The lock ensures only one thread is inside the critical section at a time.

Hint 3: The canary (detector.inside flag) catches overlapping entries.


#10ABA Problem SimulationHard
ABA problemcompare-and-swaplock-free

Demonstrate the ABA problem and fix it using a stamped atomic reference with a version counter.

Solution:

The StampedRef class above is the solution — it uses a stamp (version counter) alongside the value so that an A→B→A sequence is detected because the stamp will be 2, not 0.

# The key insight: StampedRef.compare_and_set requires BOTH
# the expected value AND the expected stamp to match.
# After thread2 does A->B->A, the stamp is 2.
# Thread1 has stamp=0 from before the changes, so CAS fails.
import threading
import time

# The ABA problem: a value changes from A to B then back to A.
# A compare-and-swap (CAS) sees A and incorrectly thinks nothing changed.

class AtomicRef:
  def __init__(self, value):
      self._value = value
      self._lock = threading.Lock()

  def get(self):
      return self._value

  def compare_and_set(self, expected, new_value):
      with self._lock:
          if self._value == expected:
              self._value = new_value
              return True
          return False

# TODO: Simulate the ABA problem:
# Thread 1 reads value A, gets preempted.
# Thread 2 changes A -> B -> A.
# Thread 1 resumes, sees A, CAS succeeds -- but the world changed.
# Demonstrate and then fix using a version counter (stamped reference).

class StampedRef:
  def __init__(self, value):
      self._value = value
      self._stamp = 0
      self._lock = threading.Lock()

  def get(self):
      with self._lock:
          return self._value, self._stamp

  def compare_and_set(self, expected_value, expected_stamp, new_value):
      with self._lock:
          if self._value == expected_value and self._stamp == expected_stamp:
              self._value = new_value
              self._stamp += 1
              return True
          return False

def demonstrate_aba():
  ref = AtomicRef('A')
  results = []

  def thread1():
      old = ref.get()  # sees 'A'
      time.sleep(0.05)  # gets preempted
      # After sleep, world changed A->B->A, but CAS still succeeds!
      success = ref.compare_and_set(old, 'C')
      results.append(('thread1', success))

  def thread2():
      time.sleep(0.01)
      ref.compare_and_set('A', 'B')  # A -> B
      ref.compare_and_set('B', 'A')  # B -> A (ABA!)
      results.append(('thread2', 'done'))

  t1 = threading.Thread(target=thread1)
  t2 = threading.Thread(target=thread2)
  t1.start(); t2.start()
  t1.join(); t2.join()
  return results, ref.get()

results, final = demonstrate_aba()
print('ABA demonstration:', results)
print('Final value:', final)  # Thread1's CAS succeeded despite the ABA!

# Now demonstrate StampedRef prevents this
def demonstrate_stamped():
  ref = StampedRef('A')
  results = []

  def thread1():
      val, stamp = ref.get()  # sees ('A', 0)
      time.sleep(0.05)
      success = ref.compare_and_set(val, stamp, 'C')
      results.append(('thread1', success))

  def thread2():
      time.sleep(0.01)
      v, s = ref.get()
      ref.compare_and_set(v, s, 'B')
      v, s = ref.get()
      ref.compare_and_set(v, s, 'A')
      results.append(('thread2', 'done'))

  t1 = threading.Thread(target=thread1)
  t2 = threading.Thread(target=thread2)
  t1.start(); t2.start()
  t1.join(); t2.join()
  val, stamp = ref.get()
  return results, val, stamp

results2, val, stamp = demonstrate_stamped()
print('Stamped ref:', results2)
print('Thread1 CAS succeeded:', results2[0][1] if results2 else None)
Expected Output
ABA demonstration: [('thread2', 'done'), ('thread1', True)]
Final value: C
Stamped ref: [('thread2', 'done'), ('thread1', False)]
Thread1 CAS succeeded: False
Hints

Hint 1: The ABA problem: value is A, changes to B, changes back to A — CAS sees A and succeeds incorrectly.

Hint 2: StampedRef adds a monotonically increasing stamp — even if value returns to A, the stamp differs.

Hint 3: CAS with stamp: both value AND stamp must match for the operation to succeed.


#11Memory Visibility and Memory OrderingHard
memory visibilityGILmemory orderinghappens-before

Implement safe data publishing with a lock to ensure full visibility of published data to readers.

Solution:

import copy

class DataPublisher:
def __init__(self):
self._data = None
self._ready = False
self._lock = threading.Lock()

def publish(self, data):
with self._lock:
self._data = data
self._ready = True

def read_if_ready(self):
with self._lock:
if self._ready:
return copy.copy(self._data)
return None
import threading
import time

# In Python, the GIL prevents true CPU-level races, but logical races remain.
# This problem explores happens-before relationships and visibility.

# The "published before read" pattern: a writer publishes data,
# a reader must see the latest published version.

class DataPublisher:
  def __init__(self):
      self._data = None
      self._ready = False
      self._lock = threading.Lock()

  def publish(self, data):
      pass

  def read_if_ready(self):
      pass

def test_publisher():
  pub = DataPublisher()
  read_results = []
  errors = []

  def writer():
      for i in range(10):
          pub.publish({'iteration': i, 'value': i * i})
          time.sleep(0.01)

  def reader():
      for _ in range(30):
          result = pub.read_if_ready()
          if result is not None:
              read_results.append(result)
          time.sleep(0.005)

  wt = threading.Thread(target=writer)
  rt = threading.Thread(target=reader)
  wt.start(); rt.start()
  wt.join(); rt.join()

  return len(read_results) > 0, all(isinstance(r, dict) for r in read_results)

has_results, all_valid = test_publisher()
print(f'Got results: {has_results}')
print(f'All valid dicts: {all_valid}')
Expected Output
Got results: True
All valid dicts: True
Hints

Hint 1: publish(): acquire lock, set self._data, set self._ready = True, release lock.

Hint 2: read_if_ready(): acquire lock, check _ready, return copy of _data if ready, release lock.

Hint 3: The lock ensures visibility: the reader always sees a fully-constructed data dict.

© 2026 EngineersOfAI. All rights reserved.