
Memory Models and Concurrency

The Production Scenario

A production ML serving team deploys a model that runs inference in multiple threads to maximize GPU utilization. Each thread handles a separate request, and they share a Python dictionary that caches tokenized inputs for recently seen texts. The system runs in production for three weeks before a subtle bug surfaces: occasionally, the cache returns a tokenized result that belongs to a different request. The victim user sees model output that is coherent but completely irrelevant to their input - as if the model answered someone else's question.

The root cause is a race condition in the shared cache. Python's GIL (Global Interpreter Lock) protects individual bytecode operations but not compound operations. The cache lookup-and-insert sequence - "if key not in cache, compute result and insert" - is not atomic. Between the check and the insert, another thread can run the same sequence for a different request; when the two interleave, one thread's insert can associate a key with the result of the other thread's computation, and later lookups for that key return the wrong request's output.

The fix is six lines: a threading.Lock() protecting the cache operations. But finding the bug required reproducing it (difficult - it only happens at high concurrency under specific timing), understanding why the GIL does not protect this case (non-obvious), and tracing the corrupted output back to a cache collision (requires good logging infrastructure). The team spent two weeks on this. Six lines of code.
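
A minimal sketch of that kind of fix (illustrative only - the cache and tokenizer names here are hypothetical, not the team's actual code):

import threading

_tokenize_cache = {}               # shared across request threads
_cache_lock = threading.Lock()     # the essence of the six-line fix

def cached_tokenize(text, tokenizer):
    with _cache_lock:              # check-and-insert is now one atomic unit
        if text not in _tokenize_cache:
            _tokenize_cache[text] = tokenizer(text)
        return _tokenize_cache[text]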

This lesson is about memory models: the rules that govern what value a read operation returns when multiple threads are writing. Without understanding memory models, concurrent code is a minefield of bugs that only appear in production. With that understanding, you can write concurrent code that is correct by design rather than correct by accident.


Why This Exists

Modern CPUs do not execute instructions in the order you wrote them. Compilers reorder instructions for optimization. CPUs execute instructions out-of-order for performance. Memory writes are buffered in store buffers before reaching main memory. Caches on different CPU cores are not automatically synchronized.

All of this is invisible when your program runs on a single thread: the CPU provides the illusion of sequential execution. When multiple threads share memory, the illusion breaks. Two threads can observe each other's memory writes in a different order than the writes actually occurred. This is not a hardware bug - it is the hardware behaving exactly as specified by the memory model.

A memory model defines the ordering guarantees that a hardware architecture or programming language provides for memory operations in concurrent programs. Different architectures have different memory models:

  • x86: "Total Store Order" - loads are not reordered with other loads, and stores are not reordered with other stores, but a load may be reordered ahead of an older store to a different address (the store-buffer effect). Relatively strong.
  • ARM/POWER: "Relaxed" - almost any reordering is permitted. The programmer must explicitly insert barriers to prevent reordering.
  • Java/C# memory model: defined in terms of "happens-before" relationships - a language-level abstraction over hardware.
  • Python memory model: the GIL provides strong guarantees for Python object operations, but not for NumPy/C extension operations.

Understanding which operations are safe without explicit synchronization - and which are not - is the foundation of writing correct concurrent code for ML systems.


Historical Context

The foundations of memory consistency models were laid in the 1979 paper "How to Make a Multiprocessor Computer That Correctly Executes Multiprocess Programs" by Leslie Lamport, which defined sequential consistency (SC): the result of any execution is the same as if the operations of all processors were executed in some sequential order, with operations of each processor appearing in the order specified by its program. Sequential consistency is the strongest useful model - easy to reason about, expensive to implement in hardware.

The MESI cache coherence protocol (Modified, Exclusive, Shared, Invalid) was developed in the 1980s to maintain cache consistency across multiple cores. It guarantees that at most one cache at a time holds a modified (dirty) copy of any given cache line - preventing the situation where two caches have different values for the same address. MESI is about coherence (no stale reads from a single variable), not consistency (ordering of operations on multiple variables).

The x86 memory model was formalized in Intel's and AMD's architecture manuals in the 2000s after inconsistencies in documentation caused confusion. The formal model (Total Store Order, TSO) was described by Sewell et al. (2010) in the x86-TSO paper.

The C++11 standard (2011) was the first major systems language to include a formal memory model in its specification, with atomic types and explicit memory ordering (seq_cst, acquire, release, relaxed). Before C++11, concurrent C++ programs relied on compiler-specific primitives and had undefined behavior per the standard.

In the Python ecosystem, the GIL was a deliberate design decision by Guido van Rossum in early CPython (early 1990s, when thread support was added) to simplify memory management. The GIL prevents multiple native threads from executing Python bytecodes simultaneously, making CPython's reference counting and object manipulation thread-safe by construction. This trades parallelism for simplicity. PEP 703, first shipped as an experimental "free-threaded" build option in Python 3.13, introduced the option to build CPython without the GIL, fundamentally changing Python's concurrency model.


Core Concepts

Hardware Memory Model: Sequential Consistency vs Relaxed

Sequential consistency is the model programmers intuitively assume: operations happen in program order, and all threads see the same global order of operations.

In practice, hardware provides weaker guarantees:

Store buffers: when a CPU writes to memory, the write first enters the core's store buffer. The store buffer drains to cache asynchronously. Until a write drains, a different core reading the same address will see the old value.

Load/store reordering: while an earlier load waits on a cache miss, the CPU can speculatively execute later, independent loads and stores, so they may complete first. On ARM, almost any reordering is legal if there is no data dependency.

Write combining: the CPU can merge multiple writes to adjacent addresses into a single cache write. This is a performance optimization for memory-mapped I/O and framebuffer writes, but it means writes may not be individually visible to other cores.

The classic example that breaks on relaxed hardware:

// Thread 1          // Thread 2
x = 1;               y = 1;
r1 = y;              r2 = x;

On a sequentially consistent system, at least one thread must see the other's write: if Thread 1 runs entirely first the result is r1=0, r2=1; if Thread 2 runs entirely first it is r1=1, r2=0; if the writes interleave before the reads it is r1=1, r2=1. The combination r1=0, r2=0 is impossible under sequential consistency.

On a relaxed memory model (ARM, POWER), both threads might have their writes in store buffers when the reads execute. Thread 1 reads y=0 (Thread 2's write is buffered), Thread 2 reads x=0 (Thread 1's write is buffered). Result: r1=0, r2=0. This is legal on ARM.

CPU1["CPU Core 1<br/>executes Thread 1"]:::blue
CPU2["CPU Core 2<br/>executes Thread 2"]:::blue
SB1["Store Buffer 1<br/>x=1 (draining...)"]:::orange
SB2["Store Buffer 2<br/>y=1 (draining...)"]:::orange
L1C1["L1 Cache<br/>Core 1"]:::teal
L1C2["L1 Cache<br/>Core 2"]:::teal
L3["Shared L3 Cache<br/>+ Main Memory"]:::green

CPU1 -->|"write x=1"| SB1
CPU2 -->|"write y=1"| SB2
SB1 -->|"drains eventually"| L1C1
SB2 -->|"drains eventually"| L1C2
L1C1 <-->|"cache coherence (MESI)"| L3
L1C2 <-->|"cache coherence (MESI)"| L3

classDef blue fill:#dbeafe,color:#1e293b,stroke:#2563eb
classDef teal fill:#ccfbf1,color:#134e4a,stroke:#14b8a6
classDef orange fill:#ffedd5,color:#7c2d12,stroke:#ea580c
classDef green fill:#dcfce7,color:#14532d,stroke:#16a34a
classDef purple fill:#ede9fe,color:#4c1d95,stroke:#7c3aed
classDef red fill:#fee2e2,color:#7f1d1d,stroke:#dc2626

Memory Barriers and Fence Instructions

A memory barrier (fence) is an instruction that prevents certain types of memory operation reordering across it. Different barriers prevent different reorderings.

Types of barriers:

  • Full fence (mfence on x86, dmb ish on ARM): no memory operation before the fence can be reordered with any operation after it. The strongest guarantee.
  • Store fence (sfence on x86): all stores before the fence are visible before any store after the fence.
  • Load fence (lfence on x86): all loads before the fence complete before any load after the fence.
  • Acquire barrier: all reads/writes after this point are not reordered before it. Used when "acquiring" a lock or reading from a shared flag.
  • Release barrier: all reads/writes before this point are not reordered after it. Used when "releasing" a lock or writing to a shared flag.

The acquire/release pair is the building block of lock-free programming. The pattern:

Producer:                        Consumer:
  write data                       acquire fence (wait for flag)
  release fence                    read data
  write flag = 1                   (after acquire, sees data writes)
Without the barriers, the consumer might see flag = 1 but still read stale data because the data writes are buffered.

// Memory barriers in C (portable via C11 atomics or GCC builtins)
#include <stdatomic.h>
#include <stdbool.h>

// Shared variables
int data = 0;
atomic_bool ready = false;

// Producer thread
void producer() {
data = 42; // write data
// release fence: all writes before this point are visible
// before the write to `ready`
atomic_store_explicit(&ready, true, memory_order_release);
}

// Consumer thread
void consumer() {
// acquire fence: all reads after this point see writes
// that happened before the corresponding release
while (!atomic_load_explicit(&ready, memory_order_acquire)) {
// spin wait
}
// Guaranteed: data == 42 here
int val = data;
(void)val;
}

// Calling from a Python C extension
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <pthread.h>
#include <stdatomic.h>

// Thread-safe flag using acquire/release semantics
typedef struct {
int result;
atomic_int flag; // 0 = not ready, 1 = ready
} SharedResult;

static void *compute_worker(void *arg) {
SharedResult *shared = (SharedResult *)arg;
shared->result = 12345; // write result
atomic_store_explicit(&shared->flag, 1, memory_order_release); // signal ready
return NULL;
}

static PyObject *run_with_barrier(PyObject *self, PyObject *args) {
SharedResult shared = {0, 0};
pthread_t tid;
pthread_create(&tid, NULL, compute_worker, &shared);
pthread_join(tid, NULL);

// After pthread_join, all writes from the thread are visible
// (pthread_join includes an implicit full barrier)
return PyLong_FromLong(shared.result);
}

Python's Memory Model and the GIL

The Python Global Interpreter Lock (GIL) is a mutex that prevents multiple Python threads from executing Python bytecodes simultaneously. One thread at a time holds the GIL. This means:

  • All reference count operations (incrementing/decrementing ob_refcnt) are atomic from the perspective of Python threads
  • All Python object attribute reads/writes (via __setattr__/__getattr__) are serialized
  • Built-in operations that execute as a single C-level call while the GIL is held (like list.append() or a single dict key assignment) are effectively atomic

What the GIL does NOT protect:

  • Multi-step operations: reading a variable, computing a new value, writing it back is three steps. Another thread can execute between any two of them.
  • NumPy/C extension code: C extensions release the GIL during long computations. While a C extension is running, other Python threads can modify Python objects that the extension references.
  • multiprocessing processes: different processes have completely separate memory spaces.
import threading
import time

# Demonstrating that the GIL does NOT protect compound operations

counter = 0 # shared mutable state

def increment_naive(n_times: int):
global counter
for _ in range(n_times):
# This is NOT atomic: read counter, add 1, write counter
# Three bytecode instructions - another thread can interrupt between them
counter += 1 # LOAD_GLOBAL, BINARY_ADD, STORE_GLOBAL

def run_race_condition_demo():
global counter
counter = 0
n_threads = 4
n_increments = 100_000
expected = n_threads * n_increments

threads = [threading.Thread(target=increment_naive, args=(n_increments,))
for _ in range(n_threads)]

for t in threads:
t.start()
for t in threads:
t.join()

print(f"Expected: {expected:,}")
print(f"Got: {counter:,}")
print(f"Lost: {expected - counter:,} increments")
    # On GIL-enabled CPython the loss may not show up every run (thread switches rarely
    # land inside the short read-add-store sequence), but nothing guarantees that;
    # on free-threaded CPython or other implementations, counter < expected is common

# The correct approach: use a Lock
counter_safe = 0
lock = threading.Lock()

def increment_safe(n_times: int):
global counter_safe
for _ in range(n_times):
with lock:
counter_safe += 1 # entire read-modify-write is protected

# The standard library has no atomic integer type for threads; for cross-process
# counters, use multiprocessing.Value together with its lock
import multiprocessing
import ctypes

def demonstrate_atomic_counter():
# multiprocessing.Value with a lock
counter = multiprocessing.Value(ctypes.c_int64, 0)

def worker(counter, n):
for _ in range(n):
with counter.get_lock():
counter.value += 1

processes = [multiprocessing.Process(target=worker, args=(counter, 25000))
for _ in range(4)]
for p in processes:
p.start()
for p in processes:
p.join()

print(f"Atomic counter result: {counter.value}") # Always 100000

Python Multiprocessing: fork vs spawn

When you create a child process with multiprocessing, Python offers different "start methods":

fork (the default on Linux; macOS also used fork before Python 3.8): the child process is created by duplicating the parent process's memory using fork(). The child inherits all parent state, including open file descriptors, CUDA context, and Python objects. Copy-on-Write (CoW) means this is initially cheap - pages are shared between parent and child until either modifies them.

spawn (the default on Windows and, since Python 3.8, on macOS; optional on Linux): a fresh Python interpreter is started. The child has no inherited state except what is explicitly passed. The if __name__ == '__main__': guard is required to prevent infinite spawning.

forkserver: a dedicated "server" process handles forking. Workers are forked from this clean server, not from the main process with its potentially dirty state.

The critical issue with fork in ML: CUDA contexts are not fork-safe. If the parent process has initialized CUDA (by creating any CUDA tensor), forking will create child processes with a corrupt CUDA context. PyTorch explicitly warns against using fork with CUDA.

import multiprocessing as mp
import torch
import numpy as np

# WRONG: fork after CUDA initialization
def wrong_approach():
model = torch.nn.Linear(100, 10).cuda() # initializes CUDA
# Forking now is unsafe - CUDA context in child is corrupt
pool = mp.Pool(4) # uses fork by default on Linux
# Workers will fail with mysterious CUDA errors

# CORRECT: use spawn for any CUDA-using code
def correct_approach():
ctx = mp.get_context('spawn')
pool = ctx.Pool(4)
# Each worker starts fresh, initializes its own CUDA context

# Copy-on-Write (CoW) behavior with numpy arrays
# In fork(), large numpy arrays are shared (CoW) - very efficient
# When workers modify arrays, CoW triggers and each gets their own copy

def worker_reads_array(shared_array, worker_id):
"""Worker that only reads the shared array - no CoW triggered."""
result = shared_array.sum()
print(f"Worker {worker_id}: sum = {result:.2f}")

def worker_modifies_array(shared_array, worker_id):
"""Worker that modifies - CoW triggers, each worker gets private copy."""
shared_array[0] = worker_id # triggers CoW for this page
print(f"Worker {worker_id}: modified first element to {worker_id}")

# Using shared memory to explicitly share arrays without CoW
from multiprocessing import shared_memory
import numpy as np

def create_shared_numpy_array(shape, dtype=np.float32):
"""Create a numpy array in shared memory accessible by all processes."""
shm = shared_memory.SharedMemory(create=True,
size=np.prod(shape) * np.dtype(dtype).itemsize)
arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
return arr, shm

def attach_shared_array(name: str, shape, dtype=np.float32):
"""Attach to an existing shared memory array."""
shm = shared_memory.SharedMemory(name=name)
arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
return arr, shm

# Example: shared feature matrix across worker processes.
# Note: with the spawn start method, pool workers must be module-level functions
# (closures cannot be pickled), so the worker lives outside the demo function.
def shared_row_sum_worker(shm_name, shape, worker_id):
    arr, shm = attach_shared_array(shm_name, shape)
    # This reads from shared memory - no copy
    row_sum = arr[worker_id * 1000].sum()
    shm.close()
    return row_sum

def shared_memory_demo():
    shape = (100000, 128)
    features, shm = create_shared_numpy_array(shape)
    features[:] = np.random.randn(*shape).astype(np.float32)

    print(f"Shared memory name: {shm.name}")
    print(f"Array shape: {features.shape}, dtype: {features.dtype}")

    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as pool:
        results = pool.starmap(shared_row_sum_worker,
                               [(shm.name, shape, i) for i in range(4)])
    print(f"Results: {results}")

    shm.close()
    shm.unlink()  # free the shared memory

Threading Race Conditions in Python

The GIL makes many Python operations safe from races that would be dangerous in other languages. But not all operations:

import threading
import time
from collections import defaultdict

# RACE CONDITION 1: check-then-act
cache = {}
cache_lock = threading.Lock()

def unsafe_cache_get(key, compute_fn):
"""BUG: check and insert are not atomic even with GIL."""
if key not in cache: # Step 1: check
# DANGER ZONE: another thread can run here
result = compute_fn(key) # Step 2: compute
cache[key] = result # Step 3: insert
return cache[key]

def safe_cache_get(key, compute_fn):
"""CORRECT: use double-checked locking."""
if key not in cache: # Fast path: check without lock
with cache_lock:
if key not in cache: # Re-check with lock held
cache[key] = compute_fn(key)
return cache[key]

# RACE CONDITION 2: list append vs iteration
shared_list = []
list_lock = threading.Lock()

def unsafe_append(item):
    shared_list.append(item)  # list.append is atomic in CPython: the C-level
                              # call runs to completion while holding the GIL
# But iterating while appending is not safe

def unsafe_iterate():
    for item in shared_list:  # BUG: the list may grow or shrink while we iterate -
        process(item)         # items can be skipped or seen twice (lists fail silently;
                              # dicts and sets would raise RuntimeError instead)

def safe_iterate():
with list_lock:
snapshot = list(shared_list) # take a snapshot with lock held
for item in snapshot: # iterate the snapshot safely
process(item)

# RACE CONDITION 3: dict update vs read in multiple threads
# A single key assignment is atomic in CPython, but dict.update() with several keys
# is NOT atomic as a whole - a reader may observe a partially-updated state
stats = {}
stats_lock = threading.Lock()

def update_stats(new_values: dict):
"""WRONG: not atomic"""
stats.update(new_values) # BUG: reader may see incomplete update

def update_stats_safe(new_values: dict):
"""CORRECT: atomic replacement"""
with stats_lock:
stats.update(new_values)

def read_stats_safe():
with stats_lock:
return dict(stats) # return a copy while holding the lock

def process(item):
pass # placeholder

Atomic Operations and Thread-Safe Primitives

Python's threading module provides synchronization primitives built on OS-level mutexes and condition variables.

import threading
import queue
import time
from typing import Optional, Callable, Any

# threading.Event: a simple binary flag
# set() and wait() are the acquire/release pair
event = threading.Event()

def producer_thread(data_list: list, event: threading.Event):
for i in range(10):
data_list.append(i * i)
time.sleep(0.01)
event.set() # signal that production is complete

def consumer_thread(data_list: list, event: threading.Event):
event.wait() # block until event is set
# After event.wait() returns, we are guaranteed to see
# all writes that happened before event.set()
print(f"Received {len(data_list)} items: {data_list}")

# threading.Condition: more powerful - supports wait/notify pattern
class ThreadSafeQueue:
"""Thread-safe FIFO queue using Condition variable."""

def __init__(self, maxsize: int = 0):
self._queue = []
self._maxsize = maxsize
self._cond = threading.Condition()

    def put(self, item, block: bool = True, timeout: Optional[float] = None):
        with self._cond:
            if self._maxsize > 0:
                deadline = None if timeout is None else time.monotonic() + timeout
                while len(self._queue) >= self._maxsize:
                    if not block:
                        raise queue.Full
                    remaining = None if deadline is None else deadline - time.monotonic()
                    if remaining is not None and remaining <= 0:
                        raise queue.Full
                    self._cond.wait(timeout=remaining)  # wait(None) blocks until notified
            self._queue.append(item)
            self._cond.notify()  # wake one waiting consumer

    def get(self, block: bool = True, timeout: Optional[float] = None):
        with self._cond:
            deadline = None if timeout is None else time.monotonic() + timeout
            while not self._queue:
                if not block:
                    raise queue.Empty
                remaining = None if deadline is None else deadline - time.monotonic()
                if remaining is not None and remaining <= 0:
                    raise queue.Empty
                self._cond.wait(timeout=remaining)  # wait(None) blocks until notified
            item = self._queue.pop(0)
            self._cond.notify()  # wake one waiting producer
            return item

# multiprocessing.Value with a lock: cross-process atomic integer
import multiprocessing
import ctypes

class AtomicCounter:
"""Cross-process atomic counter using multiprocessing.Value."""

def __init__(self, initial: int = 0):
self._val = multiprocessing.Value(ctypes.c_int64, initial)

def increment(self) -> int:
with self._val.get_lock():
self._val.value += 1
return self._val.value

def decrement(self) -> int:
with self._val.get_lock():
self._val.value -= 1
return self._val.value

def get(self) -> int:
return self._val.value # int reads are atomic on x86-64

def reset(self):
with self._val.get_lock():
self._val.value = 0

Lock-Free Ring Buffer

A ring buffer (circular buffer) is the standard data structure for producer-consumer communication in ML data pipelines. It allows one producer and one consumer to communicate without locks - the single-producer single-consumer (SPSC) ring buffer is provably lock-free if the index updates are atomic.

The key insight: the producer writes to head, the consumer reads from tail. If only one thread writes head and only one thread writes tail, and reads/writes of pointer-sized integers are atomic on the target architecture (they are on x86-64 and ARM64), then no explicit locking is needed.

import threading
import time
import numpy as np
from multiprocessing import shared_memory, Value
import ctypes

class SPSCRingBuffer:
"""
Single-Producer Single-Consumer lock-free ring buffer.
One thread calls put(), one thread calls get().
Do NOT use with multiple producers or multiple consumers.
"""

def __init__(self, capacity: int, item_size: int):
self.capacity = capacity
self.item_size = item_size

# Allocate buffer in shared memory for cross-process use
self._shm = shared_memory.SharedMemory(
create=True,
size=capacity * item_size
)
self._buf = np.ndarray((capacity, item_size), dtype=np.uint8,
buffer=self._shm.buf)

# Head: index where producer writes next (owned by producer)
# Tail: index where consumer reads next (owned by consumer)
# Using Value for cross-process atomic access
self._head = Value(ctypes.c_uint64, 0)
self._tail = Value(ctypes.c_uint64, 0)

def put(self, item: bytes, block: bool = True) -> bool:
"""Write item to buffer. Returns False if full (when block=False)."""
assert len(item) == self.item_size

head = self._head.value
next_head = (head + 1) % self.capacity

# Check if buffer is full
if next_head == self._tail.value:
if not block:
return False
while next_head == self._tail.value:
time.sleep(0.0001) # spin with yield

# Write item
self._buf[head] = np.frombuffer(item, dtype=np.uint8)

# Publish: update head AFTER writing data
# On x86-64, this store is visible to the consumer after
# all previous stores (TSO guarantees this)
self._head.value = next_head
return True

def get(self, block: bool = True) -> bytes:
"""Read item from buffer. Returns None if empty (when block=False)."""
tail = self._tail.value

# Check if buffer is empty
if tail == self._head.value:
if not block:
return None
while tail == self._head.value:
time.sleep(0.0001)

# Read item
item = bytes(self._buf[tail])

# Update tail AFTER reading
self._tail.value = (tail + 1) % self.capacity
return item

def size(self) -> int:
head = self._head.value
tail = self._tail.value
return (head - tail) % self.capacity

def is_empty(self) -> bool:
return self._head.value == self._tail.value

def is_full(self) -> bool:
return (self._head.value + 1) % self.capacity == self._tail.value

def close(self):
self._shm.close()
self._shm.unlink()


# Usage: DataLoader-style producer-consumer
import pickle

def tensor_ring_buffer_demo():
"""
    Demonstrate producer-consumer batch prefetching for ML training
    (queue.Queue stands in for the ring buffer and handles the locking).
Producer thread loads and preprocesses batches.
Consumer thread (training) reads batches.
"""
BATCH_SIZE = 32
FEATURE_DIM = 128
BUFFER_CAPACITY = 8 # 8 prefetched batches

# Serialize a batch as bytes for the ring buffer
def serialize_batch(features: np.ndarray, labels: np.ndarray) -> bytes:
return pickle.dumps((features, labels))

def deserialize_batch(data: bytes):
return pickle.loads(data)

# For production, use a fixed-size serialization (no pickle) for better performance
# e.g., struct.pack for fixed-size numeric arrays

# Use Python's built-in queue.Queue which handles all the locking
import queue

batch_queue = queue.Queue(maxsize=BUFFER_CAPACITY)

stop_signal = threading.Event()

def loader_worker():
"""Simulates a data loading worker."""
batch_idx = 0
while not stop_signal.is_set():
# Simulate loading and preprocessing
features = np.random.randn(BATCH_SIZE, FEATURE_DIM).astype(np.float32)
labels = np.random.randint(0, 10, size=BATCH_SIZE).astype(np.int64)

# put() blocks if queue is full (backpressure)
batch_queue.put((features, labels), block=True)
batch_idx += 1

loader_thread = threading.Thread(target=loader_worker, daemon=True)
loader_thread.start()

# Consume 20 batches
for i in range(20):
features, labels = batch_queue.get()
# Simulate training step
time.sleep(0.01)
print(f"Consumed batch {i}: features {features.shape}")

stop_signal.set()

Thread-Safe Gradient Accumulation

In distributed training, multiple workers compute gradients simultaneously and must accumulate them correctly. The naive approach - each worker writes directly to a shared gradient buffer - has race conditions: two workers reading the same gradient value and adding to it can produce old_val + grad1 or old_val + grad2 instead of old_val + grad1 + grad2.

import torch
import threading
from typing import List, Dict

class ThreadSafeGradientAccumulator:
"""
Thread-safe gradient accumulator for simulated data parallelism.
In production, use torch.distributed or NCCL for multi-GPU;
this demonstrates the concurrency patterns.
"""

def __init__(self, model: torch.nn.Module):
self.model = model
self._lock = threading.Lock()
self._grad_buffers: Dict[str, torch.Tensor] = {}
self._n_accumulated = 0
self._reset_buffers()

def _reset_buffers(self):
"""Initialize zero gradient buffers."""
with self._lock:
self._grad_buffers = {
name: torch.zeros_like(param)
for name, param in self.model.named_parameters()
if param.requires_grad
}
self._n_accumulated = 0

def accumulate(self, gradients: Dict[str, torch.Tensor]):
"""
Add a set of gradients to the accumulation buffers.
Thread-safe: multiple workers can call this concurrently.
"""
with self._lock:
for name, grad in gradients.items():
if name in self._grad_buffers:
self._grad_buffers[name].add_(grad)
self._n_accumulated += 1

def apply_and_reset(self, optimizer: torch.optim.Optimizer,
normalize: bool = True):
"""
Apply accumulated gradients to the model and reset.
Should be called by one thread after all workers have accumulated.
"""
with self._lock:
if self._n_accumulated == 0:
return

scale = 1.0 / self._n_accumulated if normalize else 1.0

# Copy accumulated gradients to model parameters
for name, param in self.model.named_parameters():
if param.requires_grad and name in self._grad_buffers:
param.grad = self._grad_buffers[name] * scale

self._n_accumulated_backup = self._n_accumulated

# Optimizer step outside the lock (it only accesses model parameters)
optimizer.step()
optimizer.zero_grad()

# Reset accumulation buffers
self._reset_buffers()


# Simulated multi-worker gradient accumulation
def simulate_data_parallel_training():
import torch.nn as nn

model = nn.Linear(128, 10)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    accumulator = ThreadSafeGradientAccumulator(model)
    # The model (and each param.grad) is shared by all worker threads, so the
    # forward/backward pass and gradient collection must be serialized as well.
    backward_lock = threading.Lock()

    def worker_fn(worker_id: int, n_steps: int):
        for step in range(n_steps):
            # Each worker computes gradients on its own mini-batch
            x = torch.randn(16, 128)
            y = torch.randint(0, 10, (16,))

            with backward_lock:
                # Local forward-backward (serialized: param.grad is shared state)
                output = model(x)
                loss = nn.CrossEntropyLoss()(output, y)
                loss.backward()

                # Collect gradients
                grads = {
                    name: param.grad.clone()
                    for name, param in model.named_parameters()
                    if param.grad is not None
                }
                model.zero_grad()

            # Thread-safe accumulation
            accumulator.accumulate(grads)

# Run 4 workers simultaneously
barrier = threading.Barrier(4)
n_rounds = 10

def worker_with_sync(worker_id):
for round_idx in range(n_rounds):
worker_fn(worker_id, n_steps=1)
barrier.wait() # all workers finish their step
if worker_id == 0:
# Only worker 0 applies gradients
accumulator.apply_and_reset(optimizer)
barrier.wait() # wait for apply before next round

threads = [threading.Thread(target=worker_with_sync, args=(i,))
for i in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()

print("Simulated data-parallel training complete")

Distributed Training Memory Model: AllReduce and NCCL

In distributed training across multiple GPUs, parameter servers and AllReduce are the two primary gradient aggregation patterns. Each has different memory model implications.

Parameter Server: one server process holds the model parameters. Workers send gradients to the server, which applies the update and broadcasts the new parameters. Memory model: the server acts as the sequentializing point. All gradient updates are totally ordered.
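
A minimal in-process sketch of the parameter-server idea (illustrative only; a real parameter server runs as a separate process or service and communicates over RPC, and the class and method names here are invented for the example):

import threading
from typing import Dict

import torch

class ToyParameterServer:
    """All updates pass through one lock: the server is the sequentializing point."""

    def __init__(self, params: Dict[str, torch.Tensor], lr: float = 0.01):
        self._params = {name: p.detach().clone() for name, p in params.items()}
        self._lock = threading.Lock()
        self._lr = lr

    def push_and_pull(self, grads: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        with self._lock:
            # Apply this worker's gradients; the lock totally orders all updates.
            for name, grad in grads.items():
                self._params[name] -= self._lr * grad
            # Hand back a snapshot of the updated parameters.
            return {name: p.clone() for name, p in self._params.items()}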

AllReduce (used by NCCL, the NVIDIA Collective Communications Library): all workers participate in a ring-allreduce or tree-allreduce. Each worker's gradient contribution is merged with all others' contributions simultaneously. No single sequentializing point. The memory model guarantee: after dist.all_reduce() returns, every process sees the fully reduced gradient. The NCCL barrier embedded in the allreduce is the synchronization point.

import torch
import torch.distributed as dist
import os

def init_distributed():
"""Initialize the distributed process group."""
dist.init_process_group(
backend="nccl", # NCCL for GPU-to-GPU communication
init_method="env://",
)
rank = dist.get_rank()
world_size = dist.get_world_size()
torch.cuda.set_device(rank)
return rank, world_size


def allreduce_gradients(model: torch.nn.Module, world_size: int):
"""
AllReduce all parameter gradients.
After this call, all processes have the average gradient.
NCCL provides the memory ordering guarantee:
after dist.all_reduce() returns, the result is fully visible.
"""
for param in model.parameters():
if param.grad is not None:
# SUM all gradients across processes
dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
# Divide by world_size to get the mean gradient
param.grad.div_(world_size)


def distributed_training_step(
model: torch.nn.Module,
optimizer: torch.optim.Optimizer,
inputs: torch.Tensor,
targets: torch.Tensor,
world_size: int,
):
"""
Single training step in a distributed setting.
Each process computes gradients on its local batch,
then AllReduce synchronizes gradients across all processes.
"""
optimizer.zero_grad()

with torch.autocast(device_type='cuda', dtype=torch.bfloat16):
outputs = model(inputs)
loss = torch.nn.CrossEntropyLoss()(outputs, targets)

loss.backward()

# AllReduce: after this, all processes have the mean gradient
# Memory model guarantee: writes from backward() on all processes
# are visible to all processes after all_reduce() returns
allreduce_gradients(model, world_size)

optimizer.step()
return loss.item()


# NCCL barrier: explicit synchronization point
def synchronized_checkpoint_save(model: torch.nn.Module, path: str, rank: int):
"""Save checkpoint from rank 0 only, after all ranks finish training."""
# All ranks must reach this barrier before any rank proceeds
dist.barrier()
# Memory model: after barrier() returns on all ranks,
# all ranks see all writes from all other ranks

if rank == 0:
# Safe to save: all ranks have finished their last training step
torch.save(model.state_dict(), path)
print(f"Checkpoint saved to {path}")

# Wait for rank 0 to finish saving before other ranks proceed
dist.barrier()

Memory Ordering in ML Infrastructure

WRITE["Gradient Computed<br/>(backward pass)"]:::green
ACCUM["Gradient Accumulated<br/>(local buffer)"]:::blue
BARRIER["Synchronization Barrier<br/>(AllReduce / Lock.release())"]:::orange
READ["Optimizer Step<br/>(reads mean gradient)"]:::purple
NEXT["Next Training Step<br/>(uses updated params)"]:::teal

WRITE -->|"thread-local, no sync needed"| ACCUM
ACCUM -->|"all-reduce or lock"| BARRIER
BARRIER -->|"acquire semantics:<br/>all prior writes visible"| READ
READ -->|"parameter update"| NEXT

classDef blue fill:#dbeafe,color:#1e293b,stroke:#2563eb
classDef teal fill:#ccfbf1,color:#134e4a,stroke:#14b8a6
classDef orange fill:#ffedd5,color:#7c2d12,stroke:#ea580c
classDef green fill:#dcfce7,color:#14532d,stroke:#16a34a
classDef purple fill:#ede9fe,color:#4c1d95,stroke:#7c3aed
classDef red fill:#fee2e2,color:#7f1d1d,stroke:#dc2626

Production Engineering Notes

Debugging Race Conditions in Python

Race conditions are non-deterministic: they depend on thread scheduling, system load, and timing. A test that passes 999 times and fails once in a thousand runs usually indicates a race condition.

Tools and techniques for finding them:

import threading
import logging
import traceback
import functools
from typing import Callable

# Technique 1: Logging with thread ID to trace interleaving
def logged(fn: Callable) -> Callable:
@functools.wraps(fn)
def wrapper(*args, **kwargs):
tid = threading.current_thread().name
logging.debug(f"[{tid}] ENTER {fn.__name__}")
result = fn(*args, **kwargs)
logging.debug(f"[{tid}] EXIT {fn.__name__}")
return result
return wrapper

# Technique 2: Thread Sanitizer (requires C extension or special Python build)
# For C extension code, compile with -fsanitize=thread
# Then run: TSAN_OPTIONS=halt_on_error=1 python script.py

# Technique 3: Deterministic replay (increase time under the race window)
import time

class RaceDetector:
"""Artificially slow down operations to make races more likely to manifest."""

def __init__(self, delay_ms: float = 1.0):
self._delay = delay_ms / 1000.0

def before_critical_section(self):
time.sleep(self._delay) # widen the race window

# In testing:
race_detector = RaceDetector(delay_ms=5.0)

def buggy_cache_get_with_detector(cache, key, compute_fn):
if key not in cache:
race_detector.before_critical_section() # force the race to happen
cache[key] = compute_fn(key)
return cache[key]

The GIL Release and NumPy Gotchas

NumPy operations release the GIL during computation. This means two threads can run NumPy operations simultaneously, which is great for throughput but means NumPy arrays can be mutated by one thread while another is reading them.

import numpy as np
import threading

# BUG: thread A reads array while thread B writes it
# Both operations are NumPy (GIL released) - they run concurrently
shared_array = np.zeros(1000000, dtype=np.float64)

def writer():
for i in range(10):
shared_array[:] = i # writes while GIL is released

def reader():
    for i in range(10):
        val = shared_array[500000]  # reads while GIL is released
        print(f"Read: {val}")  # may observe a value from any writer iteration; reads
                               # of larger slices can see a half-updated, inconsistent array

# In practice: protect NumPy arrays with threading.Lock or
# use thread-local arrays (each thread has its own copy)

# Thread-local storage: no sharing, no races
thread_local = threading.local()

def get_thread_local_workspace(size: int) -> np.ndarray:
"""Each thread gets its own workspace array."""
if not hasattr(thread_local, 'workspace') or len(thread_local.workspace) < size:
thread_local.workspace = np.zeros(size, dtype=np.float32)
return thread_local.workspace[:size]

Common Mistakes

danger

Using a mutable default argument shared across threads: def f(cache={}) in Python creates one dictionary shared across all calls to f. In a multi-threaded context, this is hidden shared mutable state and a reliable source of race conditions. Use None as the default and create the object inside the function, or pass an explicitly shared object protected by a lock, as in the sketch below.
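
A short sketch of the anti-pattern and the fix (the function names, and str.split as a stand-in for real tokenization, are hypothetical):

import threading

def tokenize_bad(text, cache={}):        # BUG: one dict shared by every call and every thread
    if text not in cache:
        cache[text] = text.split()
    return cache[text]

_cache = {}                              # explicit shared state...
_cache_lock = threading.Lock()           # ...protected by a lock

def tokenize_good(text):
    with _cache_lock:
        if text not in _cache:
            _cache[text] = text.split()
        return _cache[text]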

danger

Assuming dict/list operations are atomic beyond single-item operations: d[key] = value for a single key assignment is atomic in CPython (single STORE_SUBSCR bytecode). But d.update({'a': 1, 'b': 2}) is NOT atomic - another thread can observe the dict with only 'a' updated. For compound updates that must be seen atomically, create a new dict and atomically replace the reference: d = {**d, 'a': 1, 'b': 2}.
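
A sketch of the atomic-replacement idea (names are illustrative; note that concurrent writers would still need a lock around the copy-and-rebind):

stats = {"seen": 0, "errors": 0}

def update_stats_atomic(new_values: dict):
    global stats
    # Build a complete new dict, then rebind the name. The rebind is a single
    # reference store, so a reader sees either the old dict or the new one,
    # never a half-applied update.
    stats = {**stats, **new_values}

def read_stats():
    return stats   # whichever complete snapshot is current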

warning

Using fork() after CUDA initialization in PyTorch: PyTorch explicitly warns that CUDA contexts are not fork-safe. If you initialize any CUDA tensor (including by loading a model on GPU) and then use multiprocessing.Pool (which uses fork by default on Linux), workers will have a corrupted CUDA context. Always use multiprocessing.get_context('spawn') or 'forkserver' for any code that touches CUDA.

warning

Using threading.Event.wait() without a timeout in production code: event.wait() without a timeout will block forever if the event is never set (due to a bug in the producer, a crash, or a deadlock). Always pass a timeout: event.wait(timeout=30.0). Check the return value: True means the event was set, False means timeout. Never leave indefinite waits in production services.
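
A minimal sketch of the recommended pattern (the 30-second timeout is an arbitrary example value):

import threading

ready = threading.Event()

def wait_for_producer() -> None:
    # wait() returns True if the event was set, False if the timeout expired.
    if not ready.wait(timeout=30.0):
        # Timed out: log, raise, or fall back - but never hang forever.
        raise TimeoutError("producer did not signal readiness within 30 s")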


Interview Q&A

Q: Explain why the GIL does not prevent all race conditions in Python, and give a concrete example.

A: The GIL prevents two threads from executing Python bytecodes simultaneously. It does not prevent race conditions on compound operations - operations that require multiple bytecodes to complete, where another thread can execute between any two of them.

The classic example: counter += 1 compiles to several bytecodes: load the current value of counter, load the constant 1, add them in place, and store the result back (LOAD_GLOBAL, LOAD_CONST, an in-place add, STORE_GLOBAL). The interpreter can switch threads between any two of these. Thread A reads counter=5, Thread B reads counter=5, Thread A writes counter=6, Thread B writes counter=6. Expected: 7. Actual: 6. One increment was lost.
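
You can inspect the individual bytecodes with the dis module (a quick illustration; exact opcode names vary across CPython versions):

import dis

counter = 0

def bump():
    global counter
    counter += 1   # read, add, store: several bytecodes, not one

dis.dis(bump)
# Expect output along the lines of LOAD_GLOBAL / LOAD_CONST / an in-place add / STORE_GLOBAL
# (the opcode names differ between CPython versions)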

In CPython, this specific case is often protected in practice because the GIL switch interval (sys.getswitchinterval(), default 5 ms) is long enough that the handful of bytecodes usually completes before a context switch. But this is an implementation detail, not a guarantee.

The more dangerous case is check-then-act: if key not in d: d[key] = compute(). This is two separate atomic operations. Between the check and the insert, another thread can insert the same key with a different value. After both threads complete, one thread's result has overwritten the other's. The cache serving a wrong result to the wrong user is exactly this pattern.

NumPy operations release the GIL entirely - two threads running NumPy code simultaneously can corrupt an array without any protection from the GIL. This is the most dangerous case for ML code, where heavy use of NumPy is ubiquitous.


Q: What is the difference between cache coherence and memory consistency?

A: These are related but distinct concepts at different levels of the memory hierarchy.

Cache coherence is about the semantics of a single memory location across multiple caches. The MESI protocol (and variants like MESIF, MOESI) ensures that at any time, at most one cache holds a "modified" (dirty, writable) copy of any cache line. If Cache 1 wants to write to a line that Cache 2 currently holds as modified, Cache 2 must first "invalidate" or "write back" its copy. Coherence guarantees that reads always see the latest write to any specific address - there are no stale reads from a single variable. Without coherence, thread A's writes to x might never be visible to thread B.

Memory consistency is about the ordering of operations on multiple memory locations. Even with perfect cache coherence (no stale reads), the order in which one thread's writes to different locations become visible to another thread can be surprising. Thread A writes to data, then writes to flag. Thread B reads flag, then reads data. Coherence guarantees B sees A's writes to flag and to data. But consistency determines whether B might see A's write to flag before seeing A's write to data (i.e., whether the write order is preserved).

Sequential consistency (the strong model) says the global execution order is consistent with each thread's program order. x86 TSO (Total Store Order) relaxes this: stores can sit in the store buffer, so a later load can complete before an earlier store becomes visible to other cores; stores still become visible to other cores in program order, and each core sees its own buffered stores immediately via store forwarding. ARM has an even more relaxed model where almost any reordering is possible.

For ML systems: cache coherence is handled by the hardware automatically. Memory consistency is your responsibility - you must use explicit barriers, locks, or atomic operations with the appropriate memory ordering to get the consistency guarantees you need.


Q: How does Python's multiprocessing fork() work with Copy-on-Write, and what are the ML-specific pitfalls?

A: When fork() creates a child process, the OS does not immediately copy the parent's memory. Instead, both processes share the same physical pages mapped as read-only. When either process writes to a page, the OS intercepts the write (via a page fault on the write-protected page), copies the page to a new physical location, and maps the write to the private copy. This is Copy-on-Write (CoW).

The benefit: forking a process with 10 GB of model weights is nearly instantaneous, and if workers only read the weights (inference without modification), they never trigger CoW - all workers share the same physical pages. Loading a 10 GB model once and forking 8 workers costs 10 GB of RAM, not 80 GB.

The first problem: Python's reference counting constantly writes to the ob_refcnt field of every Python object. Every time a worker accesses a Python object (even with read-only intent), the reference count write triggers CoW on that page. Over time, the "shared" data becomes private to each worker. Workers that were supposed to share model weights end up with private copies. This is why workers that use Python objects (rather than raw numpy buffers) tend to grow their RSS over time even when doing read-only work.

Solution: access shared data through numpy arrays (backed by raw memory pages without Python reference counting) rather than Python objects.

The second problem: CUDA contexts are not fork-safe. CUDA uses Unix file descriptors, shared memory, and kernel driver state that are not designed to be duplicated via fork(). Forking after CUDA initialization creates children with corrupt CUDA state. PyTorch's DataLoader with num_workers > 0 uses fork on Linux by default, so its workers must be started before any CUDA initialization, or it must use the spawn or forkserver start method.

Solution: always initialize DataLoader before loading any model to GPU, or use multiprocessing.set_start_method('spawn') to force spawn.
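
One way to arrange this, as a hedged sketch (the dataset here is a synthetic placeholder; real code would also shuffle, pin memory, and so on):

import multiprocessing as mp

import torch
from torch.utils.data import DataLoader, TensorDataset

def main():
    # Force the spawn start method before any CUDA work, so DataLoader
    # workers never inherit a CUDA context from the parent.
    mp.set_start_method('spawn', force=True)

    dataset = TensorDataset(torch.randn(1024, 128), torch.randint(0, 10, (1024,)))
    loader = DataLoader(dataset, batch_size=32, num_workers=4)

    model = torch.nn.Linear(128, 10).cuda()   # CUDA touched only after the start method is set
    for x, y in loader:
        _ = model(x.cuda())
        break

if __name__ == '__main__':
    main()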


Q: Explain AllReduce in distributed training and what memory ordering guarantees NCCL provides.

A: AllReduce is a collective communication operation where every process contributes a tensor and receives, as a result, the element-wise sum (or mean) of all processes' contributions. It is the core primitive in data-parallel training: each worker computes gradients on its local batch, then AllReduce produces the mean gradient across all workers, and each worker applies the same optimizer update. The result is identical to training on a single machine with a batch size equal to the sum of all workers' batch sizes.

The ring-AllReduce algorithm divides each tensor into N chunks (for N workers). In the first phase (reduce-scatter), each worker sends its chunk to the next worker in a ring, accumulating partial sums. After N-1 steps, each worker has a fully reduced sum for one chunk. In the second phase (all-gather), each worker broadcasts its fully reduced chunk to all others. After N-1 more steps, every worker has the full reduced tensor. Total communication per worker: 2 × (N-1)/N × tensor_size, approaching 2 × tensor_size for large N - optimal for dense tensors.
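
A toy, single-process simulation of the two phases with NumPy (it mimics the chunk flow only; real implementations such as NCCL pipeline the steps and run over NVLink or InfiniBand):

import numpy as np

def ring_allreduce(tensors):
    """Simulate ring-AllReduce over a list of N equal-shape arrays (one per 'worker')."""
    n = len(tensors)
    chunks = [np.array_split(t.copy(), n) for t in tensors]   # each worker splits its tensor into N chunks

    # Phase 1: reduce-scatter. After N-1 steps, worker i holds the full sum of chunk (i+1) % n.
    for step in range(n - 1):
        for i in range(n):
            c = (i - step) % n                       # chunk that worker i passes along the ring
            chunks[(i + 1) % n][c] += chunks[i][c]

    # Phase 2: all-gather. Each fully reduced chunk circulates around the ring.
    for step in range(n - 1):
        for i in range(n):
            c = (i + 1 - step) % n                   # chunk worker i already holds in final form
            chunks[(i + 1) % n][c] = chunks[i][c].copy()

    return [np.concatenate(c) for c in chunks]

# Usage: 4 simulated workers, each contributing one gradient vector
grads = [np.full(8, float(i)) for i in range(4)]
reduced = ring_allreduce(grads)
assert all(np.allclose(r, sum(grads)) for r in reduced)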

NCCL's memory ordering guarantee: after dist.all_reduce() returns on a process, that process is guaranteed to see the fully reduced result in the output tensor. All writes by all processes that went into the reduction (their backward pass gradients) are visible to all processes after the call returns. This is a full barrier in terms of the writes that contributed to the reduction.

What NCCL does NOT guarantee: ordering between an AllReduce and subsequent local computation is determined by CUDA stream ordering. NCCL operations execute on CUDA streams; subsequent GPU kernels on the same stream will see the AllReduce result. Operations on different streams need explicit synchronization. PyTorch's DDP (DistributedDataParallel) manages stream synchronization correctly; custom distributed code must handle it explicitly with torch.cuda.synchronize() or stream dependencies.
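
A small sketch of the stream-ordering concern (illustrative; the function and variable names are invented, and it assumes torch.distributed has already been initialized with the NCCL backend):

import torch
import torch.distributed as dist

def consume_allreduce_on_side_stream(grad: torch.Tensor, side_stream: torch.cuda.Stream):
    # With async_op=False, PyTorch makes the current stream wait for NCCL's
    # internal stream, so later kernels on the current stream see the result.
    dist.all_reduce(grad, op=dist.ReduceOp.SUM)

    # A different stream must wait explicitly before consuming the result,
    # or it may read the tensor before the reduction has finished.
    side_stream.wait_stream(torch.cuda.current_stream())
    with torch.cuda.stream(side_stream):
        grad.div_(dist.get_world_size())   # consumer kernel, now correctly ordered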


Q: What is a lock-free data structure, and when is it appropriate to use one in ML infrastructure?

A: A lock-free data structure provides concurrent access to shared data without using mutual exclusion locks (mutexes). Instead of blocking threads that cannot immediately proceed, lock-free structures use atomic operations (compare-and-swap, fetch-and-add) that complete in a bounded number of steps.

The key distinction: lock-free guarantees that at least one thread makes progress in a finite number of steps, even if other threads are suspended. Lock-based structures can cause priority inversion (a high-priority thread blocked on a lock held by a low-priority thread) and deadlocks (two threads each holding a lock the other needs). Lock-free structures avoid both.

For ML infrastructure, the SPSC (single-producer single-consumer) ring buffer is the most practically useful lock-free structure. It is used in DataLoader pipelines, model serving request queues, and profiler event buffers. On x86-64, SPSC queues need no atomic read-modify-write instructions at all: plain aligned loads and stores of the head and tail indices are sufficient at the hardware level because of x86's Total Store Order guarantee (the compiler must still be prevented from reordering them, for example with relaxed or acquire/release atomics).

MPSC (multiple-producer single-consumer) queues are used in logging systems and metric collection, where multiple workers emit events to a central consumer. These require atomic operations (compare-and-swap for the head pointer) but avoid global locks.

When NOT to use lock-free structures: if contention is low and the protected section is short, a mutex is simpler, easier to reason about, and often as fast. Lock-free structures are complex - the correctness arguments are subtle, and bugs are hard to find. Use them specifically when: (1) lock acquisition latency is measurable and matters, (2) the access pattern matches a well-studied lock-free design (SPSC, MPSC), or (3) lock-based alternatives cause priority inversion in a real-time or near-real-time context.


Q: How do you make gradient accumulation thread-safe, and what synchronization primitives are appropriate?

A: Gradient accumulation across threads requires three guarantees: (1) each worker's gradient contribution is fully computed before accumulation, (2) all accumulations are atomic (no torn reads or writes), and (3) the optimizer step sees a consistent accumulated gradient (not a gradient being modified by another thread).

For guarantee (1): in Python threading, each worker thread independently calls loss.backward() on its own computation graph. This is thread-safe because each worker has its own graph - they do not share intermediate tensors (only model parameters are shared). The backward pass writes to param.grad in the model, which is shared. This is the dangerous step.

For guarantee (2): the accumulation param.grad += worker_grad must be protected by a lock because it is a read-modify-write operation. The appropriate primitive is threading.Lock() wrapped around the accumulation loop. For GPU tensors, add_() calls launched onto the same CUDA stream are serialized by that stream, so concurrent accumulation from multiple host threads is ordered as long as they share a stream; CPU tensor add_() carries no such guarantee.

For guarantee (3): before the optimizer step, all workers must have completed their accumulation. The appropriate primitive is threading.Barrier(n_workers): all threads call barrier.wait(), and execution proceeds only when all threads have reached the barrier. After the barrier, the accumulated gradient is stable and the optimizer step can proceed.

In production, avoid thread-based gradient accumulation entirely for multi-GPU training - use torch.distributed with NCCL's AllReduce instead. NCCL handles all the memory ordering and synchronization through its own protocol, and it operates at PCIe and NVLink bandwidth rather than over CPU memory. Thread-based accumulation is appropriate for CPU-based model training or for accumulating across micro-batches within a single process (a common technique for simulating large batch sizes with limited memory).

© 2026 EngineersOfAI. All rights reserved.