Skip to main content

Race Conditions and Thread Safety

Reading time: ~35 minutes | Level: Intermediate → Engineering

Before reading further, predict the output:

import dis

x = 0

def increment():
global x
x += 1

dis.dis(increment)

Output:

4 0 LOAD_GLOBAL 0 (x)
2 LOAD_CONST 1 (1)
4 BINARY_OP 0 (+)
6 STORE_GLOBAL 0 (x)
8 RETURN_CONST 0 (None)

x += 1 is not one instruction. It is four bytecode operations: load the current value, load the constant 1, add them, store the result. The GIL can release between any two of these. Thread 1 can load x = 500, lose the GIL to Thread 2, which also loads x = 500, increments to 501, and stores it. Thread 1 then resumes with its stale 500, computes 501, and overwrites Thread 2's update. One increment is silently lost.

Most engineers see "GIL" and assume thread safety. This lesson explains precisely why that assumption is wrong - and what you must do about it.

What You Will Learn

  • What a race condition is - interleaved execution, non-deterministic outcomes, why they are hard to reproduce
  • What operations ARE atomic in CPython and which are NOT
  • Check-then-act races - the bank balance and file existence patterns
  • Read-modify-write races - the counter example with measured non-determinism
  • How to detect race conditions - stress testing, threading.settrace, careful review
  • Thread-safe data structures - queue.Queue, collections.deque, and their guarantees
  • Making code thread-safe - immutability, thread-local state, synchronization primitives
  • Async race conditions - yes, asyncio has them too - at await yield points

Prerequisites

  • Lesson 01 of this module: Threading - familiarity with threading.Thread
  • Lesson 04 of this module: The Event Loop - understanding of await as a yield point
  • Module 03, Lesson 03: Disassembly with dis - reading bytecode output

Part 1 - What a Race Condition Is

The Definition

A race condition occurs when the correctness of a program depends on the relative timing or ordering of two or more threads (or coroutines). If the threads happen to execute in a particular order, the program is correct. If they execute in a different order - which can happen at any time - the program produces wrong results.

The defining characteristics of race conditions:

  1. Non-deterministic - the bug does not happen every run; it depends on timing
  2. Hard to reproduce - it often disappears under a debugger (which slows execution and changes timing)
  3. Intermittent - may appear only under load, on certain hardware, or at specific times of day
  4. Silent - no exception is raised; the program continues with incorrect state
import threading
import time

# Shared mutable state - the source of the problem
balance = 1000

def withdraw(amount: int):
global balance
# This check-then-act is not atomic - a race lives between lines
if balance >= amount: # check: is there enough?
time.sleep(0.001) # simulate processing delay
balance -= amount # act: deduct the amount
print(f"Withdrew {amount}. Balance: {balance}")
else:
print(f"Insufficient funds for {amount}")

# Two threads both try to withdraw 800 from an account with 1000
t1 = threading.Thread(target=withdraw, args=(800,))
t2 = threading.Thread(target=withdraw, args=(800,))

t1.start()
t2.start()
t1.join()
t2.join()

print(f"Final balance: {balance}")
# Possible outputs:
# Withdrew 800. Balance: 200
# Withdrew 800. Balance: -600 ← both passed the check, both deducted
# Final balance: -600

Both threads pass the if balance >= amount check (both read 1000). Both proceed to deduct 800. The final balance is -600 - a classic race condition. In banking systems this is called a double-spend.

Why Race Conditions Are Hard to Reproduce

The race only manifests when Thread 1 is preempted between the if check and the -= operation. This depends on:

  • OS scheduler decisions (non-deterministic)
  • CPU load at the time of execution
  • Number of CPUs and their current cache state
  • Python's GIL switch interval (5ms by default)

Under a debugger, the timing changes, and the race disappears. Under a test suite, it might appear 1 in 10,000 runs. In production under load, it appears every minute.

Part 2 - The GIL and Atomicity

What the GIL Guarantees

The GIL (Global Interpreter Lock) ensures that only one thread executes Python bytecodes at any given instant. It protects CPython's internal reference counts and memory allocator. It does NOT make your application logic atomic.

The GIL releases:

  • Every 5ms (the sys.getswitchinterval() check interval)
  • During any I/O operation (socket.recv, file.read, time.sleep)
  • When a C extension explicitly releases it

This means the GIL can release between ANY two bytecode instructions, including between the LOAD and STORE of a single Python statement.

Operations That ARE Atomic in CPython

These operations happen in a single C-level step and are safe under the GIL:

# ATOMIC (single C-level operation, GIL held throughout):
my_list.append(item) # list_append in C
my_dict[key] = value # dict_setitem in C - single operation
x = some_object # simple name binding (STORE_NAME)
i = 0 # LOAD_CONST + STORE_NAME - fast enough to be safe in practice

# Reading a simple variable (LOAD_GLOBAL, LOAD_NAME)
current = my_list[:] # list copy - atomic in CPython (but not guaranteed by spec)
danger

The atomicity of list.append and dict[key] = value is a CPython implementation detail, not a Python language guarantee. Code relying on this is fragile. It may not be true on PyPy, Jython, or future CPython builds. Always use explicit synchronization for shared mutable state. Relying on the GIL for thread safety is a common and serious mistake - the GIL protects CPython internals, not your application logic.

Operations That Are NOT Atomic

# NOT ATOMIC (multiple bytecodes, GIL can release between them):
x += 1 # LOAD_GLOBAL + LOAD_CONST + BINARY_OP + STORE_GLOBAL (4 instructions)
d[key] += 1 # LOAD + BINARY_SUBSCR + LOAD_CONST + BINARY_OP + STORE_SUBSCR (5+)
if key in d: # check
d[key] = val # ...then act - not atomic as a unit

my_list[0] = my_list[0] + 1 # read + compute + write - 3 separate operations

Let us verify with dis:

import dis

def not_atomic_dict_update(d, key):
d[key] += 1

dis.dis(not_atomic_dict_update)
2 0 LOAD_FAST 0 (d)
2 LOAD_FAST 1 (key)
4 COPY 2
6 COPY 2
8 BINARY_SUBSCR # read d[key]
10 LOAD_CONST 1 (1)
12 BINARY_OP 0 (+) # add 1
14 STORE_SUBSCR # write d[key] back
16 RETURN_CONST 0 (None)

Five separate bytecodes. The GIL can release after any of them. Two threads executing this on the same dict key can both read the same value, both add 1, and both write the same result - one increment is lost.

Part 3 - Check-Then-Act Race Conditions

The Classic Pattern

A check-then-act race occurs when a thread reads state to make a decision, then acts on that decision - but the state can change between the read and the act.

import threading
import time

# Classic bank balance race
class BankAccount:
def __init__(self, balance: float):
self.balance = balance

def withdraw(self, amount: float) -> bool:
# CHECK: is there enough?
if self.balance >= amount:
time.sleep(0.001) # simulate network/DB call
# ACT: deduct
self.balance -= amount
return True
return False


account = BankAccount(1000.0)

results = []

def try_withdraw(amount):
success = account.withdraw(amount)
results.append((amount, success, account.balance))

# Two concurrent withdrawals of 800 from a 1000-balance account
t1 = threading.Thread(target=try_withdraw, args=(800.0,))
t2 = threading.Thread(target=try_withdraw, args=(800.0,))

t1.start()
t2.start()
t1.join()
t2.join()

print(f"Final balance: {account.balance}") # May be -600.0

The File Existence Race (TOCTOU)

A time-of-check to time-of-use (TOCTOU) race is the file system variant:

import os
import threading

def unsafe_write(filepath: str, content: str):
# CHECK: does file exist?
if not os.path.exists(filepath): # Thread A passes this check
time.sleep(0.001) # Thread B also passes this check
# ACT: create and write file
with open(filepath, "w") as f: # Thread A creates file
f.write(content) # Thread B also creates/overwrites it

# Both threads think they are the first - one overwrites the other
t1 = threading.Thread(target=unsafe_write, args=("/tmp/output.txt", "from thread 1"))
t2 = threading.Thread(target=unsafe_write, args=("/tmp/output.txt", "from thread 2"))
t1.start(); t2.start()
t1.join(); t2.join()
# File content is non-deterministic - "from thread 1" or "from thread 2"

# SAFE version: use O_EXCL flag (atomic at the OS level)
import os

def safe_write(filepath: str, content: str) -> bool:
try:
# O_CREAT | O_EXCL: create file, fail if it already exists - atomic OS call
fd = os.open(filepath, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
with os.fdopen(fd, "w") as f:
f.write(content)
return True
except FileExistsError:
return False # another thread got there first

The Correct Fix: Make Check and Act Atomic

import threading

class BankAccount:
def __init__(self, balance: float):
self.balance = balance
self._lock = threading.Lock()

def withdraw(self, amount: float) -> bool:
with self._lock: # lock covers BOTH check and act
if self.balance >= amount:
self.balance -= amount
return True
return False

def deposit(self, amount: float) -> None:
with self._lock:
self.balance += amount

The lock ensures that check and act are atomic as a unit - no other thread can read or modify balance between the two steps.

Part 4 - Read-Modify-Write Races

The Counter Example: Measuring Non-Determinism

The classic read-modify-write race produces measurably wrong results:

import threading

counter = 0

def increment_unsafe(n: int) -> None:
global counter
for _ in range(n):
counter += 1 # 4 bytecodes - race lives here

def run_race(n_threads: int = 2, n_increments: int = 100_000) -> int:
global counter
counter = 0

threads = [
threading.Thread(target=increment_unsafe, args=(n_increments,))
for _ in range(n_threads)
]
for t in threads: t.start()
for t in threads: t.join()

return counter

# Run the race multiple times - result varies each run
expected = 2 * 100_000 # 200,000
for i in range(5):
result = run_race()
lost = expected - result
print(f"Run {i+1}: got {result:,} (lost {lost:,} increments)")

# Typical output:
# Run 1: got 147,832 (lost 52,168 increments)
# Run 2: got 163,091 (lost 36,909 increments)
# Run 3: got 155,441 (lost 44,559 increments)
# Run 4: got 172,008 (lost 27,992 increments)
# Run 5: got 159,233 (lost 40,767 increments)

Each run loses a different number of increments. The loss is non-deterministic - it depends on exactly when the GIL releases during the 200,000 increment operations.

The Race Illustrated: Two Threads Interleaving on x += 1

Fixing Read-Modify-Write Races

import threading
from threading import Lock

# Fix 1: Use a Lock
counter = 0
counter_lock = Lock()

def increment_safe_lock(n: int) -> None:
global counter
for _ in range(n):
with counter_lock:
counter += 1 # only one thread at a time

# Fix 2: Use threading.local for per-thread state (avoid sharing)
local_data = threading.local()

def increment_local(n: int) -> None:
if not hasattr(local_data, "counter"):
local_data.counter = 0
for _ in range(n):
local_data.counter += 1 # each thread has its own counter - no sharing

# Fix 3: Use immutable values and aggregate at the end
def increment_with_return(n: int) -> int:
"""Return the count rather than mutating shared state."""
count = 0
for _ in range(n):
count += 1
return count # combine results in the caller after all threads finish

# Using Fix 1 - verify correctness
counter = 0
threads = [threading.Thread(target=increment_safe_lock, args=(100_000,)) for _ in range(2)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # always 200,000

Part 5 - Detecting Race Conditions

Stress Testing: Make the Race Appear

The most reliable way to expose race conditions is to run the concurrent code many times with high contention:

import threading
import time
from typing import Callable

def stress_test(
setup: Callable,
worker: Callable,
n_threads: int = 50,
n_iterations: int = 1000,
n_runs: int = 20,
) -> list:
"""
Run worker in n_threads threads simultaneously, n_runs times.
Returns list of results - look for non-deterministic variation.
"""
results = []

for run in range(n_runs):
state = setup()
barrier = threading.Barrier(n_threads) # sync all threads to start simultaneously

def synchronized_worker():
barrier.wait() # all threads start at exactly the same moment
worker(state, n_iterations)

threads = [threading.Thread(target=synchronized_worker) for _ in range(n_threads)]
for t in threads: t.start()
for t in threads: t.join()

results.append(state)

return results


# Test the bank account withdrawal race
def account_setup():
return {"balance": 1000, "withdrawals": 0}

def account_worker(state: dict, n: int):
for _ in range(n):
if state["balance"] > 10:
state["balance"] -= 10
state["withdrawals"] += 1

results = stress_test(account_setup, account_worker, n_threads=20, n_iterations=10, n_runs=10)

# If race-free: balance should always equal 1000 - (withdrawals * 10)
for i, state in enumerate(results):
expected_balance = 1000 - (state["withdrawals"] * 10)
ok = state["balance"] == expected_balance
print(f"Run {i+1}: balance={state['balance']}, withdrawals={state['withdrawals']}, consistent={ok}")

Using threading.Barrier for Deterministic Race Triggering

threading.Barrier lets all threads start executing at exactly the same instant, maximizing the chance of interleaving:

import threading

def demonstrate_race_with_barrier():
shared = {"value": 0}
n_threads = 100
barrier = threading.Barrier(n_threads)

def worker():
barrier.wait() # all 100 threads released simultaneously
shared["value"] += 1 # maximum contention

threads = [threading.Thread(target=worker) for _ in range(n_threads)]
for t in threads: t.start()
for t in threads: t.join()

return shared["value"]

# Run 10 times - value varies, proving the race
for _ in range(10):
result = demonstrate_race_with_barrier()
print(f"Result: {result} (expected {100})")

# Typical output:
# Result: 97 (expected 100)
# Result: 95 (expected 100)
# Result: 98 (expected 100)

Using threading.settrace for Race Hunting

threading.settrace installs a trace function on every new thread. You can use it to log every bytecode step and replay the execution order - but this is primarily a research tool, not a production debugger:

import threading
import sys

execution_log = []

def trace_func(frame, event, arg):
if event == "line":
thread_name = threading.current_thread().name
lineno = frame.f_lineno
execution_log.append(f"{thread_name}: line {lineno}")
return trace_func

# Install on all threads (very slow - only for debugging)
threading.settrace(trace_func)

shared = {"x": 0}

def worker():
shared["x"] += 1

t1 = threading.Thread(target=worker, name="T1")
t2 = threading.Thread(target=worker, name="T2")
t1.start(); t2.start()
t1.join(); t2.join()

threading.settrace(None) # remove trace

for entry in execution_log[:20]:
print(entry)
note

threading.settrace is extremely slow - it runs a Python function on every bytecode instruction. Use it only during debugging in a controlled environment, never in production. For production race detection, use stress testing with barriers (shown above) or purpose-built tools like ThreadSanitizer via Cython or C extensions.

Part 6 - Thread-Safe Data Structures

queue.Queue: Designed for Concurrency

queue.Queue is the correct tool for passing data between threads. It is fully thread-safe because all internal locking is handled by the implementation:

import queue
import threading
import time

# queue.Queue is thread-safe - use it instead of a shared list
task_queue = queue.Queue(maxsize=100) # bounded queue - blocks producers when full
result_queue = queue.Queue()

def producer(n_items: int):
for i in range(n_items):
task_queue.put(i) # blocks if queue is full (maxsize reached)
time.sleep(0.001)
task_queue.put(None) # sentinel: signal consumers to stop

def consumer(consumer_id: int):
while True:
item = task_queue.get() # blocks if queue is empty - no busy waiting
if item is None:
task_queue.put(None) # pass sentinel to next consumer
break
result = item * item
result_queue.put((consumer_id, item, result))
task_queue.task_done() # signal that item processing is complete

n_consumers = 3
threads = [threading.Thread(target=consumer, args=(i,)) for i in range(n_consumers)]
producer_thread = threading.Thread(target=producer, args=(10,))

for t in threads: t.start()
producer_thread.start()
producer_thread.join()
for t in threads: t.join()

results = []
while not result_queue.empty():
results.append(result_queue.get())
print(sorted(results))

Why queue.Queue Is Thread-Safe

queue.Queue uses threading.Lock internally for all mutations plus threading.Condition for the blocking get() and put() operations. The lock is held for the entire duration of each put() or get() call - the check and the mutation are atomic as a unit.

# queue.Queue internal structure (simplified):
class Queue:
def __init__(self):
self._queue = []
self._mutex = threading.Lock()
self._not_empty = threading.Condition(self._mutex)
self._not_full = threading.Condition(self._mutex)

def put(self, item):
with self._not_full:
# Lock held here - check AND append are atomic
self._queue.append(item)
self._not_empty.notify()

def get(self):
with self._not_empty:
# Lock held here - check AND pop are atomic
while not self._queue:
self._not_empty.wait() # releases lock and waits
return self._queue.pop(0)

collections.deque: Atomic Append and Popleft

collections.deque has thread-safe append (right end) and popleft (left end) operations in CPython. These are atomic because they are single C-level operations:

from collections import deque
import threading

# Safe: append and popleft are individually atomic in CPython
shared_deque = deque()

def producer():
for i in range(1000):
shared_deque.append(i) # atomic - safe without a lock

def consumer():
results = []
while True:
try:
item = shared_deque.popleft() # atomic - safe without a lock
results.append(item)
except IndexError:
break # deque is empty
return results

Important caveat: while individual append and popleft are atomic, combinations are not:

# NOT safe - check and pop are not atomic as a unit
if len(shared_deque) > 0: # check
item = shared_deque.popleft() # pop - deque could be empty by now

# SAFE - use try/except instead
try:
item = shared_deque.popleft() # atomic single operation
except IndexError:
pass # empty - handled cleanly

Part 7 - Making Code Thread-Safe

Strategy 1: Immutability - Avoid Shared Mutable State

The safest code has no shared mutable state. If threads only read data or work on their own copies, there are no races:

from dataclasses import dataclass, field
from typing import FrozenSet
import threading

# Immutable configuration - safe to share between threads
@dataclass(frozen=True)
class Config:
host: str
port: int
max_connections: int
allowed_paths: FrozenSet[str] = field(default_factory=frozenset)

config = Config(host="localhost", port=8080, max_connections=100,
allowed_paths=frozenset(["/api", "/health"]))

# Multiple threads can read config simultaneously - no locking needed
def worker(worker_id: int):
# Each thread has its own local state
local_counter = 0 # not shared
for path in config.allowed_paths:
local_counter += 1
return local_counter

threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()

Strategy 2: Thread-Local State

threading.local() creates an object where each thread sees its own copy of every attribute. No sharing, no races:

import threading

# Each thread gets its own instance of thread_local
thread_local = threading.local()

def process_request(request_id: int):
# Thread-local: only visible to this thread
thread_local.request_id = request_id
thread_local.start_time = time.time()
thread_local.errors = []

# ... do work ...

# Safe to read - no other thread can modify our thread_local
duration = time.time() - thread_local.start_time
return {
"request_id": thread_local.request_id,
"duration": duration,
"errors": thread_local.errors,
}

threads = [threading.Thread(target=process_request, args=(i,)) for i in range(5)]
for t in threads: t.start()
for t in threads: t.join()
# Each thread's thread_local.request_id is independent - no races

Strategy 3: Synchronization Primitives (Preview)

When shared mutable state is unavoidable, use synchronization primitives. This is covered in depth in Lesson 06, but the core pattern:

import threading

class ThreadSafeCache:
"""A thread-safe in-memory cache using a read-write lock pattern."""

def __init__(self):
self._data: dict = {}
self._lock = threading.RLock() # reentrant lock - safe for recursive locking

def get(self, key: str, default=None):
with self._lock:
return self._data.get(key, default)

def set(self, key: str, value) -> None:
with self._lock:
self._data[key] = value

def get_or_set(self, key: str, factory) -> object:
"""Atomic check-then-set - the whole operation is locked."""
with self._lock:
if key not in self._data:
self._data[key] = factory()
return self._data[key]

def delete(self, key: str) -> bool:
with self._lock:
if key in self._data:
del self._data[key]
return True
return False

Part 8 - Async Race Conditions

Yes, asyncio Has Race Conditions Too

asyncio is single-threaded but it still has concurrency. When a coroutine awaits, it yields control to the event loop, which can then run other coroutines. If two coroutines share mutable state and a race exists between their await expressions, the result is a race condition - just without OS-level thread preemption.

import asyncio

# Shared async cache - looks safe but has a race
shared_cache: dict = {}

async def get_user_unsafe(user_id: int) -> dict:
# Race: two coroutines can both pass this check before either sets the value
if user_id not in shared_cache:
# YIELD POINT: event loop can run another coroutine here
user_data = await fetch_from_database(user_id)
# Both coroutines may now try to set the same key
shared_cache[user_id] = user_data

return shared_cache[user_id]

async def fetch_from_database(user_id: int) -> dict:
await asyncio.sleep(0.1) # simulates database latency
return {"id": user_id, "name": f"User {user_id}"}

Demonstrating the Async Race

import asyncio

fetch_count = 0

async def fetch_from_db(user_id: int) -> dict:
global fetch_count
fetch_count += 1 # count how many times we hit the "database"
await asyncio.sleep(0.05)
return {"id": user_id}

cache: dict = {}

async def get_user_unsafe(user_id: int) -> dict:
if user_id not in cache:
# yield point - another coroutine can run before we set the cache
data = await fetch_from_db(user_id)
cache[user_id] = data
return cache[user_id]

async def main():
global fetch_count
fetch_count = 0
cache.clear()

# 10 coroutines all request user 42 simultaneously
results = await asyncio.gather(*[get_user_unsafe(42) for _ in range(10)])

print(f"Cache hits expected: 9, Database fetches: {fetch_count}")
# Without fix: Database fetches: 10 (all 10 miss the cache and fetch)
# The race: all 10 coroutines pass `if user_id not in cache` before any sets it

asyncio.run(main())

The Async Race: Timeline

Fixing Async Race Conditions

import asyncio
from typing import Any

class AsyncCache:
"""Thread-safe (coroutine-safe) cache with deduplication of in-flight requests."""

def __init__(self):
self._cache: dict = {}
self._in_flight: dict[Any, asyncio.Future] = {}

async def get_or_fetch(self, key, fetch_coro):
"""
Return cached value if available.
If a fetch is already in flight for this key, await that - do not start another.
If nothing is in flight, start a fetch and cache the result.
"""
# Return immediately if cached
if key in self._cache:
return self._cache[key]

# Another coroutine is already fetching this key - piggyback on it
if key in self._in_flight:
return await self._in_flight[key]

# We are the first - create a Future that others can await
loop = asyncio.get_running_loop()
future: asyncio.Future = loop.create_future()
self._in_flight[key] = future

try:
result = await fetch_coro
self._cache[key] = result
future.set_result(result)
return result
except Exception as exc:
future.set_exception(exc)
raise
finally:
# Remove from in-flight regardless of success or failure
self._in_flight.pop(key, None)


# Demo
async def fetch_from_db(user_id: int) -> dict:
global fetch_count
fetch_count += 1
await asyncio.sleep(0.05)
return {"id": user_id}


fetch_count = 0
safe_cache = AsyncCache()

async def get_user_safe(user_id: int) -> dict:
return await safe_cache.get_or_fetch(
user_id,
fetch_from_db(user_id),
)

async def main():
global fetch_count
fetch_count = 0

results = await asyncio.gather(*[get_user_safe(42) for _ in range(10)])
print(f"Database fetches: {fetch_count}") # 1 - exactly one fetch
print(f"Results consistent: {len(set(str(r) for r in results)) == 1}") # True

asyncio.run(main())

Full Example - Demonstrating and Fixing a Race Condition in a Shared Cache

A complete example showing the race, measuring its impact, and applying a fix:

"""
shared_cache_race.py

Demonstrates a classic race condition in a shared cache,
measures how often it occurs, and shows the fixed version.
"""
import threading
import time
import random
from threading import Lock


class RacyCache:
"""
A cache with a race condition in get_or_compute.

Race: two threads can both see a cache miss and both compute the value,
wasting work and potentially causing correctness issues if compute has
side effects (like writing to a database or sending a notification).
"""

def __init__(self):
self._data: dict = {}
self._compute_count = 0

def get_or_compute(self, key: str, compute_fn) -> object:
if key not in self._data: # CHECK: cache miss?
time.sleep(0.001) # simulate compute latency
value = compute_fn(key)
self._compute_count += 1
self._data[key] = value # ACT: store result
return self._data[key]

@property
def compute_count(self) -> int:
return self._compute_count


class SafeCache:
"""
Thread-safe version: lock covers the entire check-then-compute-then-store.
"""

def __init__(self):
self._data: dict = {}
self._lock = Lock()
self._compute_count = 0

def get_or_compute(self, key: str, compute_fn) -> object:
# Fast path: check without lock (read-only check is safe)
if key in self._data:
return self._data[key]

# Slow path: full lock for check-then-store
with self._lock:
# Re-check after acquiring lock (double-checked locking pattern)
if key not in self._data:
value = compute_fn(key)
self._compute_count += 1
self._data[key] = value
return self._data[key]

@property
def compute_count(self) -> int:
return self._compute_count


def simulate_workload(cache, n_threads: int = 20, keys: list = None):
"""
Simulate n_threads threads all requesting the same keys simultaneously.
Returns compute_count - ideally equals len(keys) (one compute per unique key).
"""
if keys is None:
keys = ["user:1", "user:2", "user:3"]

def compute(key: str) -> str:
return f"value_for_{key}_{random.randint(1000, 9999)}"

barrier = threading.Barrier(n_threads)
results = []

def worker():
barrier.wait() # all threads start simultaneously
for key in keys:
value = cache.get_or_compute(key, compute)
results.append((key, value))

threads = [threading.Thread(target=worker) for _ in range(n_threads)]
for t in threads: t.start()
for t in threads: t.join()

return results, cache.compute_count


# Run the race
print("=== Racy Cache ===")
for trial in range(5):
cache = RacyCache()
results, computes = simulate_workload(cache, n_threads=30, keys=["k1", "k2", "k3"])
expected = 3 # 3 unique keys, should compute each exactly once
print(f"Trial {trial+1}: computed {computes} times (expected {expected}) "
f"{'OK' if computes == expected else 'RACE DETECTED'}")

print()

print("=== Safe Cache ===")
for trial in range(5):
cache = SafeCache()
results, computes = simulate_workload(cache, n_threads=30, keys=["k1", "k2", "k3"])
expected = 3
print(f"Trial {trial+1}: computed {computes} times (expected {expected}) "
f"{'OK' if computes == expected else 'BUG'}")
Expected Output
=== Racy Cache ===
Trial 1: computed 47 times (expected 3) RACE DETECTED
Trial 2: computed 38 times (expected 3) RACE DETECTED
Trial 3: computed 52 times (expected 3) RACE DETECTED
Trial 4: computed 41 times (expected 3) RACE DETECTED
Trial 5: computed 44 times (expected 3) RACE DETECTED

=== Safe Cache ===
Trial 1: computed 3 times (expected 3) OK
Trial 2: computed 3 times (expected 3) OK
Trial 3: computed 3 times (expected 3) OK
Trial 4: computed 3 times (expected 3) OK
Trial 5: computed 3 times (expected 3) OK

The racy cache computes each value dozens of times instead of once. The safe cache computes each value exactly once. The time.sleep(0.001) in the racy version amplifies the race by holding the check-to-store window open long enough for all 30 threads to miss the cache before any stores the result.

The safe cache uses the double-checked locking pattern: a fast lock-free read for the common case (cache hit), and a full lock for the slow path (cache miss). The second if key not in self._data check inside the lock handles the case where another thread stored the value between the first check and lock acquisition.

Graded Practice

Beginner - Identify the Race

For each code snippet below, identify whether a race condition exists and explain why.

Snippet A:

import threading

results = []

def worker(value):
results.append(value * value)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(sorted(results))

Snippet B:

import threading

seen = set()

def process(item):
if item not in seen:
seen.add(item)
do_work(item)

Snippet C:

import threading

total = 0
lock = threading.Lock()

def add(n):
global total
with lock:
total += n
Show Answer

Snippet A - No race condition (for CPython).

list.append() is a single C-level operation that is atomic under the GIL. Each append completes without interruption. The order of appends is non-deterministic, but sorted(results) always produces the correct sorted list. Note: this relies on CPython implementation detail - strictly speaking, you should use a lock or queue.Queue for portable code.

Snippet B - Race condition exists.

if item not in seen and seen.add(item) are two separate operations. Two threads can both pass the if item not in seen check before either calls seen.add(item). Both will then call do_work(item) - which might send a notification, write to a database, or charge a credit card twice.

Fix:

lock = threading.Lock()

def process(item):
with lock:
if item not in seen:
seen.add(item)
# do_work outside the lock if it is safe to do so
do_work(item) # or: move inside lock if do_work must be atomic with the check

Snippet C - No race condition.

The with lock: context manager ensures total += n is protected. Only one thread executes the += at a time. total will always have the correct final value. This is the correct pattern for shared numeric accumulators.

Intermediate - Fix the Cache Race

The following shared cache has a race condition in get_or_fetch. The function is called from multiple threads simultaneously. Find the exact race, explain the worst-case outcome, and fix it.

import threading
import time

class UserCache:
def __init__(self):
self._cache: dict[int, dict] = {}
self._fetch_count = 0

def get_or_fetch(self, user_id: int) -> dict:
if user_id in self._cache:
return self._cache[user_id]

# Simulate slow API call
time.sleep(0.01)
user = self._fetch_from_api(user_id)
self._cache[user_id] = user
return user

def _fetch_from_api(self, user_id: int) -> dict:
self._fetch_count += 1
return {"id": user_id, "name": f"User {user_id}"}

cache = UserCache()

# 50 threads all request user 1 at the same time
threads = [threading.Thread(target=cache.get_or_fetch, args=(1,)) for _ in range(50)]
for t in threads: t.start()
for t in threads: t.join()

print(f"API fetched {cache._fetch_count} times (expected 1)")
Show Answer

The race: All 50 threads simultaneously check if user_id in self._cache and get False (cache miss). All 50 then call self._fetch_from_api() - 50 duplicate API calls for the same user. The cache was supposed to prevent this.

There is a second race: self._fetch_count += 1 is itself a read-modify-write race - the fetch count itself may be inaccurate (though in this case, the bigger problem is the duplicate fetches).

Worst-case outcome: In a real system, this causes N API calls (one per thread) instead of 1. If the API has rate limits, the service is throttled. If _fetch_from_api has side effects (writes to a database, charges a customer), those effects happen N times.

Fixed version (double-checked locking):

import threading
import time

class UserCache:
def __init__(self):
self._cache: dict[int, dict] = {}
self._lock = threading.Lock()
self._fetch_count = 0

def get_or_fetch(self, user_id: int) -> dict:
# Fast path: no lock needed for read if key is present
# (dict reads are safe in CPython - only one value can exist at a key)
if user_id in self._cache:
return self._cache[user_id]

# Slow path: acquire lock, re-check, then fetch if still missing
with self._lock:
# Re-check after lock acquisition - another thread may have stored it
if user_id in self._cache:
return self._cache[user_id]

# Only one thread reaches here per user_id
user = self._fetch_from_api(user_id)
self._cache[user_id] = user
return user

def _fetch_from_api(self, user_id: int) -> dict:
self._fetch_count += 1 # safe - called only inside the lock
time.sleep(0.01)
return {"id": user_id, "name": f"User {user_id}"}


cache = UserCache()
threads = [threading.Thread(target=cache.get_or_fetch, args=(1,)) for _ in range(50)]
for t in threads: t.start()
for t in threads: t.join()
print(f"API fetched {cache._fetch_count} times (expected 1)") # always 1

The double-checked locking pattern provides both safety and performance: the fast path (cache hit) requires no lock, and the slow path (cache miss) uses a lock with a re-check inside to handle the race correctly.

Advanced - Detect and Fix an Async Race Condition

The following asyncio code has a race condition in get_config. Multiple coroutines call it simultaneously, but each one should only trigger one database fetch per unique key. Identify the exact yield point where the race occurs, then fix it using asyncio.Lock.

import asyncio

config_cache: dict[str, dict] = {}
db_fetch_count = 0

async def fetch_config_from_db(key: str) -> dict:
global db_fetch_count
db_fetch_count += 1
await asyncio.sleep(0.05) # simulate DB latency
return {"key": key, "value": f"config_{key}"}

async def get_config(key: str) -> dict:
if key not in config_cache:
# Is there a yield point here that creates a race?
config = await fetch_config_from_db(key)
config_cache[key] = config
return config_cache[key]

async def main():
global db_fetch_count
db_fetch_count = 0
config_cache.clear()

# 20 coroutines all request the same config key
results = await asyncio.gather(*[get_config("db_host") for _ in range(20)])
print(f"DB fetched {db_fetch_count} times (expected 1)")

asyncio.run(main())
Show Reference Solution

The race: The yield point is await fetch_config_from_db(key). When Coroutine 1 hits this await, it yields to the event loop. Coroutines 2 through 20 then run, all pass the if key not in config_cache check (the cache is still empty), and all also await fetch_config_from_db(key). When the fetches resolve, all 20 coroutines store the result. The DB is queried 20 times instead of 1.

Fixed version using asyncio.Lock plus in-flight tracking:

import asyncio

config_cache: dict[str, dict] = {}
in_flight: dict[str, asyncio.Future] = {}
config_lock = asyncio.Lock()
db_fetch_count = 0

async def fetch_config_from_db(key: str) -> dict:
global db_fetch_count
db_fetch_count += 1
await asyncio.sleep(0.05)
return {"key": key, "value": f"config_{key}"}

async def get_config(key: str) -> dict:
# Fast path: cache hit
if key in config_cache:
return config_cache[key]

# Check if another coroutine is already fetching this key
async with config_lock:
# Re-check after acquiring lock
if key in config_cache:
return config_cache[key]

if key in in_flight:
# Another coroutine started a fetch - wait for it
future = in_flight[key]
else:
# We are the first - register a future and start the fetch
loop = asyncio.get_running_loop()
future = loop.create_future()
in_flight[key] = future

# Only ONE coroutine actually fetches (the one that created the future)
# Others await the same future
if not future.done():
try:
result = await fetch_config_from_db(key)
config_cache[key] = result
future.set_result(result)
except Exception as exc:
future.set_exception(exc)
raise
finally:
in_flight.pop(key, None)

return await future

async def main():
global db_fetch_count
db_fetch_count = 0
config_cache.clear()
in_flight.clear()

results = await asyncio.gather(*[get_config("db_host") for _ in range(20)])
print(f"DB fetched {db_fetch_count} times (expected 1)")
print(f"All results consistent: {len(set(str(r) for r in results)) == 1}")

asyncio.run(main())
# DB fetched 1 times (expected 1)
# All results consistent: True

Key insight: asyncio race conditions occur at await points. Between if key not in config_cache and the next await, other coroutines can run and observe the same cache-miss state. The fix uses asyncio.Lock to make the check-and-register atomic across await points, then uses Future objects to let multiple coroutines wait on a single in-flight fetch.

Key Takeaways

  • A race condition occurs when the correctness of a program depends on the timing of two or more concurrent operations. Race conditions are non-deterministic - they appear inconsistently and disappear under debuggers.
  • x += 1 compiles to four bytecodes: LOAD, LOAD_CONST, BINARY_OP, STORE. The GIL can release between any two of them. Two threads can both load the same stale value and both store the same result, silently losing one update.
  • The GIL protects CPython internals (reference counts, memory allocator). It does NOT make your application logic atomic. counter += 1, check-then-act patterns, and any multi-step operation are all races without explicit locking.
  • list.append() and dict[key] = value are atomic in CPython as single C-level operations. Combinations (check-then-append, check-then-store) are not atomic. Relying on these as thread-safety guarantees is fragile.
  • The two main race condition patterns are: check-then-act (a thread reads state to decide what to do, but state can change before it acts) and read-modify-write (x += 1 style, where read and store are separate).
  • Use queue.Queue to pass data between threads - it is fully thread-safe by design. Use collections.deque.append and popleft for single-ended atomic operations, but not for combined check-and-pop patterns.
  • Stress testing with threading.Barrier (releasing all threads simultaneously) is the most reliable way to expose race conditions in development.
  • asyncio code has race conditions at await yield points. Between if key not in cache and await fetch(key), other coroutines can run and observe the same cache-miss, triggering duplicate work. Use asyncio.Lock and Future objects to prevent async races.
  • The three strategies for thread-safe code: immutability (no shared mutable state), thread-local state (threading.local()), and explicit synchronization (threading.Lock, asyncio.Lock, queue.Queue).

What's Next

Lesson 06 covers Locks and Semaphores - the full toolkit of Python synchronization primitives. You will learn the difference between Lock, RLock, Condition, Event, Semaphore, and BoundedSemaphore; how to avoid deadlocks by establishing lock acquisition ordering; and the correct patterns for producer-consumer, throttling, and event signaling in both threaded and async code.

© 2026 EngineersOfAI. All rights reserved.