Skip to main content

Concurrency Primitives

The Data Loader That Ate the GPU

Your GPU utilization is 35%. The model is fast. The problem is the data pipeline. Every training step, all eight data-loading workers fight over the same preprocessing cache - a Python dict protected by a single lock. While one worker writes a preprocessed batch, the other seven spin, waiting. The GPU sits idle for 65% of each step, burning money and time.

This is not an unusual situation. It is the default situation for engineers who know Python but not concurrency. They reach for a global lock because it is simple and correct, without understanding that correctness is the floor, not the ceiling. The ceiling is knowing which primitive matches the access pattern - and knowing that the wrong primitive at the wrong granularity is just a slow, hard-to-debug sequential program wearing a concurrent costume.

The ML training loop is a concurrency problem at every layer. Multiple CPU workers feed a GPU queue. Multiple GPU streams overlap compute and memory transfers. Multiple nodes synchronize gradients over the network. A model server handles hundreds of concurrent inference requests, each of which might spawn multiple threads for preprocessing, batching, and postprocessing. Get the synchronization wrong and you get either data races (silent corruption) or excessive lock contention (the 35% GPU utilization problem).

This lesson builds the mental model for concurrency primitives from first principles. Every primitive - mutex, condition variable, semaphore, atomic, read-write lock - exists to solve a specific class of problem. Understanding which problem each one solves, and what it costs, is what separates engineers who can design correct and fast concurrent systems from those who produce correct and slow ones.

The examples go from theory to practice: from the C++ primitives that underpin PyTorch's parallel data loading, to the Python threading and concurrent.futures APIs that ML engineers use daily, to production patterns like producer-consumer pipelines, model hot-reload without downtime, and API rate limiting with asyncio semaphores.

Why This Exists

A single-threaded program is a sequence of instructions with a total ordering. Every instruction happens before the next, and the programmer can reason about the state of the program at any point by tracing that sequence. This simplicity comes at a price: modern CPUs have multiple cores, and a single-threaded program uses at most one.

The moment you add a second thread, you lose the total ordering. Two threads executing on two cores can interleave their instructions in any order, and the resulting state depends on timing that varies with CPU load, OS scheduling, and hardware prefetching. This is the root cause of every concurrency bug: two threads accessing shared state without coordination.

Concurrency primitives exist to restore partial ordering where it matters. A mutex says "only one thread may be in this section at a time." A condition variable says "this thread should sleep until another thread signals a specific condition is true." A semaphore says "at most N threads may be doing this operation simultaneously." An atomic says "this read-modify-write operation on a single word happens as one indivisible unit."

The insight that makes concurrent programming tractable is that you do not need a total ordering across all instructions. You only need to coordinate access to shared mutable state. Identify what state is shared, choose the right primitive to protect it, and keep the critical section as small as possible. Everything else can run in parallel.

Historical Context

The theoretical foundations of concurrent programming were established in the 1960s. Edsger Dijkstra introduced the semaphore in 1965 - the first formal synchronization primitive - along with the mutual exclusion problem and the concept of critical sections. Tony Hoare introduced monitors (a higher-level construct that wraps a mutex and condition variables) in 1974. These concepts remain the foundation of all modern synchronization libraries.

The pthread (POSIX Threads) API, standardized in 1995, gave Unix systems a portable threading model with mutexes (pthread_mutex_t), condition variables (pthread_cond_t), and read-write locks (pthread_rwlock_t). This is the API that Python's threading module wraps, and that C++ 11's std::thread, std::mutex, and std::condition_variable formalize.

C++11 (2011) was a watershed moment: it standardized the C++ memory model (defining what "happens-before" means across threads) and added std::thread, std::mutex, std::lock_guard, std::condition_variable, and std::atomic. Before C++11, multi-threaded C++ was technically undefined behavior. C++17 added std::shared_mutex (read-write lock) and parallel STL algorithms. C++20 added std::counting_semaphore, std::latch, std::barrier, and std::jthread.

Python's threading module has been in the standard library since Python 1.5.2 (1999). The GIL (Global Interpreter Lock) means that Python threads cannot execute Python bytecode in parallel on multiple cores - but they do run in parallel for I/O-bound work and for calls into C extensions (including NumPy and PyTorch). concurrent.futures (Python 3.2+) added a higher-level interface. asyncio (Python 3.4+) added cooperative concurrency for I/O-bound workloads.

Core Concepts

The Mutex: Mutual Exclusion

A mutex (mutual exclusion lock) ensures that at most one thread executes a critical section at a time. The thread that acquires the mutex "holds" it; all other threads trying to acquire it block (sleep) until the holder releases it.

// C++: std::mutex
#include <mutex>
#include <iostream>

std::mutex mu;
int shared_counter = 0;

void increment(int n) {
for (int i = 0; i < n; i++) {
std::lock_guard<std::mutex> lock(mu); // RAII: locks on construction
shared_counter++;
// lock released automatically when lock goes out of scope
}
}

std::lock_guard is RAII for mutexes: it locks on construction and unlocks on destruction, even if an exception is thrown. Never use mu.lock() / mu.unlock() directly - a return or throw between them leaves the mutex locked forever.

std::unique_lock is more flexible: it can be unlocked and relocked, passed to condition variables, and locked with a timeout:

#include <mutex>
#include <chrono>

std::mutex mu;

bool try_update(int timeout_ms) {
std::unique_lock<std::mutex> lock(mu,
std::chrono::milliseconds(timeout_ms));
if (!lock.owns_lock()) {
return false; // timeout - did not acquire
}
// ... critical section ...
return true;
}

Condition Variables: Waiting for a Condition

A condition variable lets a thread sleep until another thread signals that a condition has changed. This is how a consumer thread waits for items in a queue without busy-waiting (spinning and burning CPU):

#include <mutex>
#include <condition_variable>
#include <queue>

std::mutex mu;
std::condition_variable cv;
std::queue<int> work_queue;
bool done = false;

// Producer thread
void producer() {
for (int i = 0; i < 100; i++) {
{
std::lock_guard<std::mutex> lock(mu);
work_queue.push(i);
}
cv.notify_one(); // wake one waiting consumer
}
{
std::lock_guard<std::mutex> lock(mu);
done = true;
}
cv.notify_all(); // wake all consumers so they can exit
}

// Consumer thread
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mu);

// wait() atomically: releases lock + sleeps until notified
// then re-acquires lock and checks predicate
cv.wait(lock, [] { return !work_queue.empty() || done; });

if (work_queue.empty() && done) break;

int item = work_queue.front();
work_queue.pop();
lock.unlock(); // unlock before doing work

// ... process item ...
}
}

The predicate in cv.wait(lock, predicate) guards against spurious wakeups - the OS can wake a thread without a notify. Always use the two-argument form of wait with a predicate.

Semaphores: Counting Permits

A semaphore is a counter. acquire() decrements the counter; if the counter would go negative, the thread blocks until another thread calls release(). A binary semaphore (count 0 or 1) is equivalent to a mutex. A counting semaphore with count N limits concurrency to N simultaneous threads.

// C++20: std::counting_semaphore
#include <semaphore>

// Allow at most 4 concurrent GPU preprocessing threads
std::counting_semaphore<4> gpu_slots(4);

void preprocess_on_gpu(int batch_id) {
gpu_slots.acquire(); // blocks if 4 threads already running
// ... use GPU ...
gpu_slots.release(); // release slot for next thread
}

Semaphores are the right primitive when you want to limit concurrency to a resource pool: GPU handles, database connections, API rate limits. A mutex is a semaphore with count 1.

Atomics: Lock-Free Single-Word Operations

std::atomic<T> wraps a type T so that read-modify-write operations are guaranteed to be indivisible (atomic) without a mutex. Operations include load, store, fetch_add, fetch_sub, compare_exchange_weak, and compare_exchange_strong.

#include <atomic>
#include <thread>
#include <vector>

std::atomic<int> counter(0);
std::atomic<bool> stop_flag(false);

// Thread-safe counter - no mutex needed
void worker() {
while (!stop_flag.load(std::memory_order_relaxed)) {
counter.fetch_add(1, std::memory_order_relaxed);
}
}

// Compare-and-swap (CAS): the building block of lock-free algorithms
bool try_increment_to(std::atomic<int>& val, int old_val, int new_val) {
return val.compare_exchange_strong(old_val, new_val);
// Returns true and sets val = new_val if val == old_val
// Returns false (and sets old_val = current value) otherwise
}

The memory_order parameter controls how the compiler and CPU are allowed to reorder memory accesses. memory_order_relaxed allows maximum reordering (fastest, but only safe for counters where ordering does not matter). memory_order_seq_cst (the default) gives full sequential consistency (safe for all uses, but slower).

Spinlocks vs Sleeping Locks

A spinlock is a lock that busy-waits (spins in a loop) instead of sleeping:

#include <atomic>

class Spinlock {
std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
void lock() {
while (flag.test_and_set(std::memory_order_acquire))
; // spin until flag is clear
}
void unlock() {
flag.clear(std::memory_order_release);
}
};

Spinlocks are faster than sleeping locks when the critical section is very short and contention is low - the thread does not pay the OS context switch cost (typically 1-10 microseconds). They are catastrophically bad when the critical section is long or contention is high: spinning threads consume CPU without doing useful work, starving the thread that holds the lock.

Rule of thumb: use spinlocks only when the critical section completes in under a few hundred nanoseconds and you can verify contention is low with a profiler.

Read-Write Locks: Concurrent Reads, Exclusive Writes

A read-write lock (shared mutex) allows multiple threads to hold the lock for reading simultaneously, but gives exclusive access to a single writer:

#include <shared_mutex>

std::shared_mutex rw_lock;
ModelWeights cached_weights;

// Readers - run concurrently
float get_weight(int layer, int idx) {
std::shared_lock<std::shared_mutex> lock(rw_lock); // shared (read) lock
return cached_weights.get(layer, idx);
}

// Writer - exclusive access
void hot_reload_model(const std::string& path) {
ModelWeights new_weights = load_weights(path);
std::unique_lock<std::shared_mutex> lock(rw_lock); // exclusive (write) lock
cached_weights = std::move(new_weights);
// lock released here - readers can proceed
}

This pattern is exactly what you need for model hot-reload in a production inference server: many request threads read the current model weights while a background thread occasionally reloads new weights. A plain mutex would block all readers for the duration of the reload.

Deadlocks

A deadlock occurs when two or more threads each hold a lock and are waiting for a lock held by the other. The system freezes permanently.

Thread A holds lock 1, waiting for lock 2
Thread B holds lock 2, waiting for lock 1

The four conditions for deadlock (Coffman conditions): mutual exclusion, hold-and-wait, no preemption, circular wait. Breaking any one condition prevents deadlock.

The standard prevention technique is lock ordering: always acquire locks in the same global order. If all threads acquire lock 1 before lock 2, the circular wait condition cannot occur.

// WRONG: potential deadlock
void transfer(Account& from, Account& to, float amount) {
std::lock_guard<std::mutex> lock_from(from.mutex);
std::lock_guard<std::mutex> lock_to(to.mutex);
from.balance -= amount;
to.balance += amount;
}
// Thread A: transfer(alice, bob, 10) - locks alice, then bob
// Thread B: transfer(bob, alice, 20) - locks bob, then alice => deadlock

// RIGHT: std::lock acquires both atomically (no partial acquisition)
void transfer_safe(Account& from, Account& to, float amount) {
std::scoped_lock lock(from.mutex, to.mutex); // C++17: deadlock-free
from.balance -= amount;
to.balance += amount;
}

std::scoped_lock (C++17) acquires multiple mutexes using a deadlock avoidance algorithm. Use it whenever you need to hold two locks simultaneously.

Code Examples

Thread Pool in Python

# thread_pool.py
import threading
import queue
import time
from typing import Callable, Any, List

class ThreadPool:
"""
Fixed-size thread pool with a work queue.
Useful for ML preprocessing: submit batch tasks, workers drain the queue.
"""
def __init__(self, n_workers: int, queue_size: int = 0):
self.queue: queue.Queue = queue.Queue(maxsize=queue_size)
self.workers: List[threading.Thread] = []
self._stop_event = threading.Event()

for _ in range(n_workers):
t = threading.Thread(target=self._worker, daemon=True)
t.start()
self.workers.append(t)

def _worker(self):
while not self._stop_event.is_set():
try:
fn, args, kwargs = self.queue.get(timeout=0.1)
try:
fn(*args, **kwargs)
except Exception as e:
print(f"Worker error: {e}")
finally:
self.queue.task_done()
except queue.Empty:
continue

def submit(self, fn: Callable, *args, **kwargs):
"""Submit a task. Blocks if queue is full."""
self.queue.put((fn, args, kwargs))

def join(self):
"""Wait for all submitted tasks to complete."""
self.queue.join()

def shutdown(self):
"""Gracefully stop all workers."""
self._stop_event.set()
for t in self.workers:
t.join()


# Usage: parallel image preprocessing
import numpy as np

def preprocess_image(path: str, results: list, idx: int):
# Simulate preprocessing
time.sleep(0.01)
results[idx] = np.random.rand(224, 224, 3).astype(np.float32)

def preprocess_batch_parallel(paths: List[str]) -> List[np.ndarray]:
results = [None] * len(paths)
pool = ThreadPool(n_workers=8)

for i, path in enumerate(paths):
pool.submit(preprocess_image, path, results, i)

pool.join()
pool.shutdown()
return results

# Test
paths = [f"image_{i}.jpg" for i in range(32)]
batch = preprocess_batch_parallel(paths)
print(f"Preprocessed {len(batch)} images, shape: {batch[0].shape}")

Producer-Consumer ML Data Pipeline

# ml_data_pipeline.py
import threading
import queue
import numpy as np
import time
from typing import Iterator, List

class MLDataPipeline:
"""
Multi-stage pipeline: raw data -> preprocess -> augment -> GPU queue.
Each stage runs in its own thread. Backpressure via bounded queues.
"""
def __init__(self,
data_paths: List[str],
n_preprocess_workers: int = 4,
prefetch_size: int = 16):

self.raw_queue = queue.Queue(maxsize=32)
self.preprocess_queue = queue.Queue(maxsize=prefetch_size)
self.ready_queue = queue.Queue(maxsize=8) # GPU-ready batches
self._stop = threading.Event()

# Stage 1: read raw data
self._loader = threading.Thread(
target=self._load_loop, args=(data_paths,), daemon=True)

# Stage 2: preprocess (multiple workers)
self._preprocessors = [
threading.Thread(target=self._preprocess_loop, daemon=True)
for _ in range(n_preprocess_workers)
]

# Stage 3: batch and send to GPU queue
self._batcher = threading.Thread(
target=self._batch_loop, daemon=True)

self._loader.start()
for w in self._preprocessors: w.start()
self._batcher.start()

def _load_loop(self, paths: List[str]):
for path in paths:
if self._stop.is_set():
break
# Simulate reading from disk
raw = np.random.rand(256, 256, 3).astype(np.float32)
self.raw_queue.put(raw) # blocks if full (backpressure)
# Sentinel values to shut down preprocessors
for _ in self._preprocessors:
self.raw_queue.put(None)

def _preprocess_loop(self):
while True:
raw = self.raw_queue.get()
if raw is None:
self.preprocess_queue.put(None) # pass sentinel forward
return
# Normalize and resize (simplified)
processed = (raw - 0.5) / 0.5
processed = processed[:224, :224, :] # crop to 224x224
self.preprocess_queue.put(processed)

def _batch_loop(self):
batch, sentinel_count = [], 0
n_workers = len(self._preprocessors)

while sentinel_count < n_workers:
item = self.preprocess_queue.get()
if item is None:
sentinel_count += 1
continue
batch.append(item)
if len(batch) == 32: # batch size
stacked = np.stack(batch) # (32, 224, 224, 3)
self.ready_queue.put(stacked)
batch = []

if batch: # flush partial batch
self.ready_queue.put(np.stack(batch))
self.ready_queue.put(None) # signal end

def __iter__(self) -> Iterator[np.ndarray]:
while True:
batch = self.ready_queue.get()
if batch is None:
return
yield batch

def stop(self):
self._stop.set()


# Training loop using the pipeline
def train_one_epoch(data_paths: List[str]):
pipeline = MLDataPipeline(data_paths, n_preprocess_workers=4)

for batch in pipeline:
# batch shape: (32, 224, 224, 3)
print(f"Training on batch shape: {batch.shape}")
time.sleep(0.05) # simulate training step

pipeline.stop()
print("Epoch complete")

# Quick test
train_one_epoch([f"img_{i}.jpg" for i in range(128)])

Read-Write Lock for Model Hot-Reload

# model_hot_reload.py
import threading
import time
import numpy as np
from typing import Optional

class ModelWeights:
def __init__(self, path: str):
self.path = path
# Simulate loading weights
self.weights = {
"layer_0": np.random.rand(256, 512).astype(np.float32),
"layer_1": np.random.rand(512, 256).astype(np.float32),
}
self.version = 1
print(f"Loaded weights v{self.version} from {path}")


class HotReloadableModel:
"""
Thread-safe model wrapper that allows zero-downtime weight reloading.
Multiple inference threads read concurrently; one reload thread writes.
"""
def __init__(self, initial_path: str):
self._weights = ModelWeights(initial_path)
self._lock = threading.RLock() # RLock allows reentrant read
# For true reader-writer semantics, use a custom implementation below

def infer(self, x: np.ndarray) -> np.ndarray:
with self._lock:
w0 = self._weights.weights["layer_0"]
w1 = self._weights.weights["layer_1"]
version = self._weights.version

# Do actual computation outside lock
h = x @ w0
out = h @ w1
return out

def reload(self, new_path: str):
new_weights = ModelWeights(new_path) # load outside lock (slow)
with self._lock:
old_version = self._weights.version
self._weights = new_weights
self._weights.version = old_version + 1
print(f"Hot-reloaded weights, now v{self._weights.version}")


# True readers-writer lock (Python)
class ReadWriteLock:
"""Allows N concurrent readers OR 1 exclusive writer."""
def __init__(self):
self._read_ready = threading.Condition(threading.Lock())
self._readers = 0

def acquire_read(self):
with self._read_ready:
self._readers += 1

def release_read(self):
with self._read_ready:
self._readers -= 1
if self._readers == 0:
self._read_ready.notify_all()

def acquire_write(self):
self._read_ready.acquire()
while self._readers > 0:
self._read_ready.wait()

def release_write(self):
self._read_ready.release()


# Test hot reload under load
model = HotReloadableModel("model_v1.pt")

def inference_worker(worker_id: int, n_requests: int):
x = np.random.rand(4, 256).astype(np.float32)
for _ in range(n_requests):
out = model.infer(x)
time.sleep(0.001)

def reload_worker(n_reloads: int):
for i in range(n_reloads):
time.sleep(0.1)
model.reload(f"model_v{i+2}.pt")

threads = []
for i in range(8):
t = threading.Thread(target=inference_worker, args=(i, 50))
threads.append(t)
threads.append(threading.Thread(target=reload_worker, args=(5,)))

for t in threads: t.start()
for t in threads: t.join()
print("All done - no crashes")

Lock-Free Ring Buffer

# lock_free_ringbuffer.py
"""
Single-producer single-consumer lock-free ring buffer using atomics.
Used in ML systems for low-latency inter-thread communication,
e.g., between a GPU inference thread and a network I/O thread.
"""
import threading
import ctypes
import time
from typing import Optional, Any

class SPSCRingBuffer:
"""
Single Producer Single Consumer ring buffer.
Lock-free: producer advances write_head, consumer advances read_head.
Works because head/tail are only written by one thread each.
"""
def __init__(self, capacity: int):
# Capacity must be power of 2 for efficient modulo with bitmask
assert (capacity & (capacity - 1)) == 0, "Capacity must be power of 2"
self.capacity = capacity
self.mask = capacity - 1
self.buffer = [None] * capacity

# These are written by one thread each - no lock needed in this model
self._write_head = 0 # only written by producer
self._read_head = 0 # only written by consumer

def try_push(self, item: Any) -> bool:
"""Push item. Returns False if buffer is full."""
wh = self._write_head
next_wh = (wh + 1) & self.mask
if next_wh == self._read_head:
return False # full
self.buffer[wh] = item
self._write_head = next_wh # visible to consumer after this
return True

def try_pop(self) -> Optional[Any]:
"""Pop item. Returns None if buffer is empty."""
rh = self._read_head
if rh == self._write_head:
return None # empty
item = self.buffer[rh]
self.buffer[rh] = None # release reference
self._read_head = (rh + 1) & self.mask
return item

def size(self) -> int:
wh = self._write_head
rh = self._read_head
return (wh - rh) & self.mask


# Test SPSC buffer between a producer and consumer thread
ring = SPSCRingBuffer(capacity=64)
results = []

def producer():
for i in range(100):
while not ring.try_push(i):
time.sleep(0) # yield to OS

def consumer():
received = 0
while received < 100:
item = ring.try_pop()
if item is not None:
results.append(item)
received += 1
else:
time.sleep(0)

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start(); c.start()
p.join(); c.join()

assert results == list(range(100)), "Order not preserved"
print(f"Ring buffer passed: received {len(results)} items in order")

concurrent.futures for Parallel Preprocessing

# parallel_preprocessing.py
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import numpy as np
import time
from typing import List, Tuple

def preprocess_image(path: str) -> np.ndarray:
"""CPU-bound preprocessing. ProcessPoolExecutor bypasses GIL."""
time.sleep(0.01) # simulate disk read
img = np.random.rand(256, 256, 3).astype(np.float32)
# Normalize
img = (img - np.array([0.485, 0.456, 0.406])) / \
np.array([0.229, 0.224, 0.225])
return img.transpose(2, 0, 1) # HWC -> CHW


def decode_and_load(path: str) -> np.ndarray:
"""I/O bound task - ThreadPoolExecutor is fine (releases GIL on I/O)."""
time.sleep(0.005) # simulate network / disk I/O
return np.random.rand(256, 256, 3).astype(np.float32)


def benchmark_parallel(paths: List[str]):
# Sequential baseline
t0 = time.perf_counter()
results = [preprocess_image(p) for p in paths]
seq_time = time.perf_counter() - t0
print(f"Sequential: {seq_time:.2f}s for {len(paths)} images")

# ThreadPoolExecutor - good for I/O bound (GIL released on I/O)
t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as executor:
results = list(executor.map(decode_and_load, paths))
thread_time = time.perf_counter() - t0
print(f"ThreadPool (I/O): {thread_time:.2f}s")

# ProcessPoolExecutor - good for CPU bound (true parallelism)
t0 = time.perf_counter()
with ProcessPoolExecutor(max_workers=8) as executor:
results = list(executor.map(preprocess_image, paths))
proc_time = time.perf_counter() - t0
print(f"ProcessPool (CPU): {proc_time:.2f}s")

print(f"CPU speedup: {seq_time / proc_time:.1f}x")


paths = [f"image_{i}.jpg" for i in range(64)]
benchmark_parallel(paths)

asyncio Semaphore for API Rate Limiting

# asyncio_rate_limit.py
import asyncio
import time
import random
from typing import List

# Rate limiting inference API calls to external service
MAX_CONCURRENT_API_CALLS = 5 # API rate limit: 5 concurrent requests

async def call_inference_api(
request_id: int,
semaphore: asyncio.Semaphore
) -> dict:
async with semaphore: # blocks if 5 calls already in flight
# Simulate API latency
latency = random.uniform(0.1, 0.5)
await asyncio.sleep(latency)
return {
"request_id": request_id,
"result": f"prediction_{request_id}",
"latency_ms": int(latency * 1000),
}


async def batch_inference(request_ids: List[int]) -> List[dict]:
semaphore = asyncio.Semaphore(MAX_CONCURRENT_API_CALLS)

t0 = time.perf_counter()
tasks = [call_inference_api(rid, semaphore) for rid in request_ids]
results = await asyncio.gather(*tasks)
elapsed = time.perf_counter() - t0

print(f"Processed {len(results)} requests in {elapsed:.2f}s "
f"(max {MAX_CONCURRENT_API_CALLS} concurrent)")
return list(results)


# asyncio Lock and Event examples
async def demonstrate_async_primitives():
# asyncio.Lock: mutual exclusion in async context
lock = asyncio.Lock()
shared_state = {"count": 0}

async def increment():
async with lock:
val = shared_state["count"]
await asyncio.sleep(0) # yield (simulates I/O)
shared_state["count"] = val + 1

await asyncio.gather(*[increment() for _ in range(10)])
assert shared_state["count"] == 10, "Race condition detected!"
print(f"asyncio Lock: count = {shared_state['count']}")

# asyncio.Event: signal between coroutines
event = asyncio.Event()

async def waiter():
await event.wait()
print("Event received - proceeding")

async def signaler():
await asyncio.sleep(0.05)
event.set()

await asyncio.gather(waiter(), signaler())


# Run the demos
asyncio.run(demonstrate_async_primitives())
request_ids = list(range(20))
results = asyncio.run(batch_inference(request_ids))
print(f"First result: {results[0]}")

Deadlock Detection with Timeouts

# deadlock_detection.py
import threading
import time
import logging

logging.basicConfig(level=logging.INFO)

class DeadlockDetector:
"""
Wraps threading.Lock with timeout-based deadlock detection.
In production: emit a metric, dump a stack trace, alert on-call.
"""
def __init__(self, name: str, timeout: float = 5.0):
self._lock = threading.Lock()
self._name = name
self._timeout = timeout

def __enter__(self):
acquired = self._lock.acquire(timeout=self._timeout)
if not acquired:
# Log all thread stacks for diagnosis
import traceback
import sys
frame_map = sys._current_frames()
for tid, frame in frame_map.items():
logging.error(
f"Thread {tid} stack:\n" +
"".join(traceback.format_stack(frame))
)
raise TimeoutError(
f"Possible deadlock: could not acquire {self._name} "
f"within {self._timeout}s"
)
return self

def __exit__(self, *args):
self._lock.release()


# Demonstrate safe lock ordering to prevent deadlock
lock_a = threading.Lock()
lock_b = threading.Lock()

# WRONG: can deadlock
def wrong_transfer(amount):
with lock_a:
time.sleep(0.001)
with lock_b:
print(f"Wrong path: transferred {amount}")

def wrong_reverse(amount):
with lock_b:
time.sleep(0.001)
with lock_a:
print(f"Wrong reverse: transferred {amount}")

# RIGHT: always acquire in the same order
def safe_op_a(amount):
with lock_a:
with lock_b:
print(f"Safe op A: {amount}")

def safe_op_b(amount):
with lock_a: # same order: a then b
with lock_b:
print(f"Safe op B: {amount}")

# These will never deadlock
t1 = threading.Thread(target=safe_op_a, args=(100,))
t2 = threading.Thread(target=safe_op_b, args=(200,))
t1.start(); t2.start()
t1.join(); t2.join()

Production Engineering Notes

Lock Contention Profiling

When GPU utilization is low despite fast data loading, the bottleneck is often lock contention in the preprocessing pipeline. Profile with Linux perf:

# Profile lock contention (futex waits) for a Python process
perf record -e futex:futex_wait -p $(pgrep -f train.py) -- sleep 10
perf report

# Or use py-spy for pure Python stack profiling
pip install py-spy
py-spy record -o profile.svg --pid $(pgrep -f train.py)

High futex wait counts on a specific lock indicate either a hot critical section (reduce work inside the lock), high contention (use finer-grained locking), or the wrong primitive (replace mutex with RW lock if reads dominate).

The Python GIL and CPU-Bound Concurrency

Python's Global Interpreter Lock (GIL) prevents multiple threads from executing Python bytecode in parallel. For CPU-bound work (pure Python, NumPy operations that do not release the GIL), ThreadPoolExecutor does not give you speedup - use ProcessPoolExecutor or multiprocessing.Pool instead.

NumPy releases the GIL during its C-level computations. PyTorch releases the GIL during C++ dispatch. So ThreadPoolExecutor does work for concurrent PyTorch inference (N threads each running a forward pass) - the GIL is not held during the actual computation, only during Python overhead.

Rule: if the bottleneck is Python code (pure Python loops, list comprehensions, dict lookups), use processes. If the bottleneck is I/O or C extension calls, use threads.

Backpressure Is Not Optional

In a multi-stage pipeline (load -> preprocess -> train), using unbounded queues means a fast producer can accumulate memory indefinitely if the consumer is slower. Always use bounded queues (queue.Queue(maxsize=N)). The producer blocks when the queue is full, naturally slowing down to match the consumer. This is backpressure, and it prevents out-of-memory crashes in long training runs.

Avoiding Lock Granularity Mistakes

A common mistake is using one global lock to protect a large data structure when only a small subset of it is accessed at a time. For example, protecting a dictionary of 1000 model versions with one lock means that looking up version 1 blocks all access while version 2 is being written. Use a separate lock per entry, or a sharded lock (one lock per hash bucket), to reduce contention.

Common Mistakes

:::danger Forgetting the Predicate in condition_variable.wait

The two-argument form of wait is not optional. Without the predicate, spurious wakeups cause the consumer to dequeue nothing, or to dequeue from an empty queue, causing a crash or infinite loop.

# WRONG: no predicate - spurious wakeups cause empty dequeue
import threading
q = []
cv = threading.Condition()

def consumer_wrong():
with cv:
cv.wait() # can wake up spuriously
item = q.pop(0) # crashes if q is empty

# RIGHT: predicate ensures we only proceed when the condition is actually true
def consumer_right():
with cv:
cv.wait_for(lambda: len(q) > 0) # loops until predicate is true
item = q.pop(0)

:::

:::danger Lock Inversion (Acquiring Lock While Holding Another)

Acquiring lock B while holding lock A, while another thread does the reverse, is a deadlock. This is especially easy to trigger in callbacks and event handlers.

# Thread A holds cache_lock, calls model.infer() which acquires model_lock
# Thread B holds model_lock, reads cache which acquires cache_lock
# => deadlock

# Prevention: establish a global lock ordering document and enforce it
# LOCK ORDER: model_lock -> cache_lock -> metrics_lock (always in this order)
# Never acquire an earlier-ordered lock while holding a later one

:::

:::warning Using threading.Lock in asyncio Code

threading.Lock blocks the entire event loop thread. In async code, always use asyncio.Lock, asyncio.Semaphore, etc.

import asyncio
import threading

# WRONG: blocks the event loop
lock = threading.Lock()
async def wrong_handler():
with lock: # blocks entire event loop thread
await some_io() # never reached while lock is held

# RIGHT: asyncio primitives are coroutine-friendly
alock = asyncio.Lock()
async def right_handler():
async with alock: # suspends this coroutine, not the whole loop
await some_io()

:::

:::warning ProcessPoolExecutor Has High Startup Overhead

Each process in a ProcessPoolExecutor imports the entire module tree, initializes Python, and loads shared libraries. For ML workloads that import PyTorch, this can take 5-10 seconds per process at cold start. Use initializer and initargs to pre-load expensive resources once at process start rather than on every task.

from concurrent.futures import ProcessPoolExecutor
import numpy as np

# Pre-load model once per worker process
def init_worker(model_path):
global _model
import torch
_model = torch.jit.load(model_path)

def infer(batch_array):
import torch
with torch.no_grad():
return _model(torch.from_numpy(batch_array)).numpy()

with ProcessPoolExecutor(
max_workers=4,
initializer=init_worker,
initargs=("model.pt",)
) as pool:
results = list(pool.map(infer, batches))

:::

Interview Questions and Answers

Q1: What is the difference between a mutex, a semaphore, and a condition variable? When would you use each in an ML pipeline?

A mutex enforces mutual exclusion: one thread in the critical section at a time. Use it to protect shared state (a preprocessing cache, a metrics counter) from concurrent modification. A semaphore is a counting permit: it limits concurrency to N threads simultaneously. Use it to limit concurrent GPU preprocessors, database connections, or API calls. A condition variable lets a thread sleep until another thread signals that a condition has changed. Use it in a producer-consumer queue where the consumer should sleep when the queue is empty rather than spinning. In a data pipeline: semaphores control how many workers run concurrently; condition variables coordinate the queue between loader and trainer; mutexes protect shared metadata.

Q2: What is a deadlock and how do you prevent it?

A deadlock occurs when two or more threads each hold a resource and are waiting for a resource held by another, forming a cycle with no exit. Prevention requires breaking at least one of Coffman's four conditions: mutual exclusion (unavoidable for most primitives), hold-and-wait (acquire all locks atomically with std::scoped_lock), no preemption (use lock timeouts), or circular wait (the most practical: enforce a global lock ordering and never acquire lock A while holding lock B if B comes after A in the order). In Python, threading.Lock.acquire(timeout=N) with a timeout lets you detect and report deadlocks rather than hanging silently.

Q3: What is the difference between a spinlock and a sleeping lock? Which should you use for a PyTorch custom op's internal state?

A spinlock busy-waits in a loop instead of sleeping. It is faster for very short critical sections (under a few hundred nanoseconds) because it avoids OS context switch overhead (~1-10 microseconds). It is catastrophically bad for longer critical sections because spinning threads consume CPU without doing useful work, starving the lock holder. For a PyTorch custom op's internal state (e.g., a small counter, a flag), a spinlock is appropriate if the critical section is just a few instructions. For anything involving memory allocation, I/O, or calling back into Python, use a sleeping mutex.

Q4: Explain the Python GIL. Does it prevent data races between Python threads?

The GIL (Global Interpreter Lock) ensures that only one thread executes Python bytecode at a time. This means pure Python operations (list append, dict update) are atomic with respect to GIL-holding threads - the GIL prevents most races on Python objects. However, the GIL does NOT prevent: races in C extensions that release the GIL (NumPy, PyTorch), races involving multiple Python operations that need to be atomic as a unit (check-then-act patterns), or races when using multiprocessing (separate processes with no GIL). Never rely on the GIL for correctness - use explicit synchronization. The GIL only prevents some races by accident; it is not a substitute for proper locking.

Q5: What is a read-write lock and when is it the right choice for an ML inference server?

A read-write lock (shared mutex) allows multiple concurrent readers OR one exclusive writer, but not both. It is the right choice when reads are much more frequent than writes and reads are safe to run concurrently. In an ML inference server: many request-handling threads read the current model weights simultaneously (shared lock), while a background thread periodically reloads new weights (exclusive lock). With a plain mutex, all readers would serialize even though they do not modify state, artificially limiting throughput. With a read-write lock, all readers run at full concurrency until a reload begins. Python: threading.RLock is a reentrant mutex, not a RW lock. Use concurrent.futures or implement an explicit RW lock with a Condition.

Q6: What is a lock-free algorithm and what does compare-and-swap do?

A lock-free algorithm guarantees system-wide progress: at least one thread always makes progress, even if individual threads are delayed. It achieves this with atomic operations, primarily compare-and-swap (CAS). CAS atomically reads a memory location, compares it to an expected value, and writes a new value only if the current value matches the expected. It returns whether the swap succeeded. Lock-free algorithms use a "read, compute new value, CAS" retry loop: if the CAS fails (another thread changed the value), retry from scratch. This is safe and non-blocking because the retry always reflects the latest state. Lock-free data structures (ring buffers, queues, stacks) are used in ML systems for very low-latency inter-thread communication where a mutex would introduce unacceptable tail latency.

Q7: How does asyncio concurrency differ from threading, and when is it the right choice for ML workloads?

asyncio uses cooperative concurrency: a single OS thread runs multiple coroutines that explicitly yield control (await) when waiting for I/O. There is no parallelism (no multiple cores used) but there is concurrency (multiple requests can be in flight simultaneously while one is waiting for I/O). Threading uses preemptive concurrency with OS-managed context switches. asyncio is the right choice when the bottleneck is I/O latency (waiting for HTTP responses, database queries, disk reads) and you need to handle thousands of concurrent connections - asyncio scales better than threads at high concurrency because each coroutine uses much less memory than an OS thread. For ML: asyncio is excellent for inference API servers that rate-limit external calls, but wrong for CPU-bound preprocessing (no parallelism) or GPU work (no benefit from concurrency).

© 2026 EngineersOfAI. All rights reserved.