
Processes, Threads, and Coroutines

The Night the Training Pipeline Fell Apart

It was 2 AM on a Sunday when the on-call engineer at a large language model startup got paged. Their data preprocessing pipeline - the one that fed 400 GPU machines - had ground to a halt. The GPUs were at 0% utilization. The CPUs were pegged at 100%. Something was deeply wrong with the Python workers that were supposed to tokenize and batch incoming training text.

The engineer pulled up the monitoring dashboard and saw something baffling. There were 32 Python processes running. All of them showed 100% CPU. But the throughput had collapsed to nearly zero. The logs showed millions of messages per second being processed - but nothing was making it to the GPUs. The pipeline was busy doing nothing.

An hour of digging revealed the problem: someone had refactored the preprocessing workers from multiprocessing.Pool to Python threading.Thread, thinking threads would be "lighter weight" and share memory more efficiently. They were right about memory sharing. They were catastrophically wrong about the outcome. Every "parallel" thread was actually serialized by Python's Global Interpreter Lock. Thirty-two "parallel" workers were taking turns - one at a time - tokenizing text. The CPUs were pegged because all 32 threads were constantly fighting to acquire the GIL, spending more time on lock contention than actual work.

The fix took 20 minutes: switching back to multiprocessing.Pool with proper worker counts, the spawn start method on Linux, and shared-memory NumPy arrays for data passing. Throughput went from near-zero to saturating the network. The GPUs spun back up. By 3 AM, training had resumed.

This incident illustrates something that nearly every ML engineer learns the hard way: the choice between processes, threads, and coroutines is not a performance micro-optimization. It is a fundamental architectural decision. Get it wrong and your pipeline either deadlocks, starves your GPUs, leaks memory, or silently produces wrong results. Get it right and your training runs at the speed of your hardware, not the speed of Python's internals.

This lesson is the deep-dive you need to never make that 2 AM mistake. We cover the OS-level mechanics, the Python-specific quirks (GIL, fork vs spawn), and concrete patterns for the three most common ML use cases: parallel data preprocessing, concurrent LLM API calls, and inference serving.


Why This Exists - The Problem of Doing Multiple Things at Once

Modern computers have multiple CPU cores. A training job that uses only one core leaves roughly 99% of the hardware idle on a 96-core machine. The problem is: how do you structure software to use multiple cores without turning your codebase into an unmaintainable tangle of race conditions, deadlocks, and non-determinism?

Before we had the abstractions we have today, programmers had two options. Fork a new child process - expensive in memory, but isolated. Or write assembly-level cooperative multitasking - fragile and machine-specific. Neither scaled to the kinds of concurrent workloads ML systems demand.

Processes, threads, and coroutines are three different answers to this question, each with a different trade-off between isolation, overhead, and ease of programming. Understanding why each exists requires understanding what problem the previous solution failed to solve.


Historical Context - From Batch Jobs to Async Futures

1960s - Processes. Early operating systems ran one program at a time. Time-sharing systems of the early and mid-1960s, such as MIT's CTSS, introduced the idea that the OS could rapidly switch between multiple programs, giving each the illusion of exclusive CPU access. Each running program became a "process" - an isolated unit of execution with its own memory, file descriptors, and CPU state. The Unix process model, developed by Thompson and Ritchie at Bell Labs starting in 1969, became the foundation of modern operating systems.

1967 - Threads. Dijkstra and colleagues at Eindhoven explored the idea of multiple flows of execution within a single process - sharing the same memory space but maintaining separate stacks and CPU registers. Threads were lighter than processes (no memory duplication) but required careful synchronization. The POSIX threads standard (pthreads) arrived in 1995 and made threading portable across Unix systems.

1958 and then 1968 - Coroutines. Melvin Conway coined the term "coroutine" in 1958 to describe subroutines that could yield control to each other cooperatively. The idea reached a wider audience through Donald Knuth's description in the first volume of "The Art of Computer Programming" (1968), but stayed a niche technique in mainstream languages for decades. Python 3.4 added the asyncio module in 2014 (PEP 3156), and Python 3.5 added the async/await syntax (PEP 492) in 2015, finally making coroutines a first-class language feature.

2008 - The GIL problem becomes critical. As multi-core CPUs went mainstream, Python's Global Interpreter Lock (introduced in 1992, when CPython first gained thread support) became a significant bottleneck. Guido van Rossum's 2007 response to proposals to remove the GIL explained why it could not simply be removed without rewriting the entire CPython memory model. The GIL remains the default in CPython today; Python 3.13 shipped an experimental free-threaded build (PEP 703) that makes it optional.


Core Concepts

What Is a Process?

A process is the OS's fundamental unit of isolation. When you run python train.py, the OS creates a process with:

  • Its own virtual address space (typically 128 TB on x86-64 Linux)
  • Its own file descriptor table (open files, sockets)
  • Its own CPU register state
  • Its own signal handlers and user/group IDs
  • A Process Control Block (PCB) maintained by the kernel

The PCB is a kernel data structure that contains everything the OS needs to pause and resume a process. It includes the saved CPU registers (program counter, stack pointer, general-purpose registers), memory mappings, open file descriptors, scheduling metadata (priority, time slice used), and the process state (running, sleeping, zombie, etc.).

Process Control Block (PCB)
+---------------------------------+
| PID, PPID, process group |
| CPU registers (saved on switch) |
| Program counter |
| Memory maps (page table ptr) |
| File descriptor table |
| Signal handlers |
| Scheduling info (priority, CFS) |
| Resource limits (ulimit) |
+---------------------------------+
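Some of these kernel-tracked fields are visible from user space. The following is a minimal sketch, assuming a Unix-like system (the `resource` module is not available on Windows); the helper name `describe_current_process` is illustrative and not part of any library.

```python
import os
import resource  # Unix-only standard library module


def describe_current_process() -> dict:
    """Collect a few attributes the kernel tracks for the running process."""
    # RLIMIT_NOFILE is the per-process cap on open file descriptors (ulimit -n)
    soft_nofile, hard_nofile = resource.getrlimit(resource.RLIMIT_NOFILE)
    return {
        "pid": os.getpid(),        # process ID
        "ppid": os.getppid(),      # parent process ID
        "cwd": os.getcwd(),        # current working directory
        "max_open_files_soft": soft_nofile,
        "max_open_files_hard": hard_nofile,
    }


if __name__ == "__main__":
    for field, value in describe_current_process().items():
        print(f"{field}: {value}")
```

Each worker created by multiprocessing gets its own copy of these attributes, which is why a file-descriptor limit or working directory changed in one worker does not affect the others.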

What Is a Thread?

A thread is an execution unit within a process. Multiple threads share the same address space, file descriptors, and heap. Each thread has its own:

  • Stack (default 8 MB on Linux)
  • CPU register state
  • Thread-local storage (TLS)

Creating a thread is much cheaper than creating a process: no new address space, no page table duplication, no copying of file descriptor tables. A pthread_create() call on Linux is roughly 10-50 microseconds. A fork() call can be 100-500 microseconds even with copy-on-write.

The trade-off: threads share memory, which means they can corrupt each other's state. A bug in one thread can crash the entire process. This requires synchronization primitives: mutexes, semaphores, condition variables, read-write locks.
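To make the sharing concrete, here is a small sketch using only the standard library. The function name `run_workers` is illustrative. It shows that a list on the heap is visible to every thread (and therefore needs a lock), while `threading.local()` gives each thread a private slot.

```python
import threading
from typing import List, Tuple


def run_workers(n: int = 4) -> Tuple[List[str], bool]:
    shared_results: List[str] = []      # on the heap: visible to every thread
    per_thread = threading.local()      # thread-local storage: one value per thread
    lock = threading.Lock()

    def worker(worker_id: int) -> None:
        per_thread.name = f"worker-{worker_id}"  # private to this thread
        with lock:                                # the shared list needs a lock
            shared_results.append(per_thread.name)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # The calling thread never set per_thread.name, so it has no such attribute
    caller_has_name = hasattr(per_thread, "name")
    return sorted(shared_results), caller_has_name


if __name__ == "__main__":
    names, caller_has_name = run_workers()
    print(names, caller_has_name)
```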

What Is a Coroutine?

A coroutine is a function that can suspend its own execution and resume later - without blocking the OS thread it runs on. When a coroutine hits an await expression, it voluntarily yields control back to an event loop, which can then run another coroutine. No kernel involvement, no context switch overhead, no scheduler intervention.

The key insight: coroutines are cooperative, not preemptive. The OS does not interrupt them. They interrupt themselves. This makes them extremely lightweight (a Python coroutine frame is roughly 200 bytes) but also means one misbehaving coroutine that never yields can starve all others.
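Cooperative scheduling can be sketched in a few lines with plain generators. This is a toy round-robin scheduler, not how asyncio is implemented; the names `task` and `run_round_robin` are made up for the illustration. Each `yield` plays the role of an `await` that hands control back.

```python
from collections import deque
from typing import Generator, Iterable, List


def task(name: str, steps: int, log: List[str]) -> Generator[None, None, None]:
    for i in range(steps):
        log.append(f"{name}:{i}")
        yield  # voluntarily hand control back to the scheduler


def run_round_robin(tasks: Iterable[Generator[None, None, None]]) -> None:
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)          # run until the task's next yield
            ready.append(current)  # still has work: back of the queue
        except StopIteration:
            pass                   # task finished, drop it


def demo() -> List[str]:
    log: List[str] = []
    run_round_robin([task("a", 2, log), task("b", 3, log)])
    return log


if __name__ == "__main__":
    print(demo())  # ['a:0', 'b:0', 'a:1', 'b:1', 'b:2']
```

If a task looped forever without reaching `yield`, the scheduler would never regain control, which is the starvation problem described above.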

Context Switching Cost

When the OS switches between two processes or threads, it must:

  1. Save the current process's CPU registers to its PCB
  2. Update TLB (Translation Lookaside Buffer) entries
  3. Load the new process's registers from its PCB
  4. Potentially invalidate CPU caches if switching to a different NUMA domain

The cost of a context switch between processes is typically 1-10 microseconds on modern hardware. Thread context switches within the same process are usually cheaper (often 2-5 microseconds, against a process switch that can reach 10) because the address space does not change, so TLB entries stay valid. Coroutine switches happen entirely in userspace: they save and restore a small frame instead of going through the kernel scheduler, and cost roughly 100-500 nanoseconds.

For a training pipeline processing 10,000 batches per second, even 5 microseconds per context switch across 32 threads adds up to significant overhead:

$$
\text{Context Switch Overhead} = N_{\text{switches/s}} \times T_{\text{switch}} \times N_{\text{threads}}
$$

$$
= 10{,}000 \times 5\,\mu\text{s} \times 32 = 1.6 \text{ seconds wasted per second}
$$
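The same arithmetic as a small helper, in case you want to plug in your own numbers. The function name is illustrative. Note that the result is an aggregate across all threads, so a value above 1.0 means more than one core's worth of time is lost in total, spread over the cores those threads run on.

```python
def context_switch_overhead(
    switches_per_second: float,
    switch_cost_seconds: float,
    n_threads: int,
) -> float:
    """CPU-seconds lost to context switching per second of wall-clock time."""
    return switches_per_second * switch_cost_seconds * n_threads


overhead = context_switch_overhead(10_000, 5e-6, 32)
print(f"{overhead:.2f} CPU-seconds lost per wall-clock second")  # 1.60
```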

The Global Interpreter Lock (GIL)

CPython (the reference Python implementation) has a mutex called the Global Interpreter Lock. This lock must be held to execute any Python bytecode. Only one thread can hold the GIL at a time.

Why does it exist? CPython uses reference counting for memory management. Every Python object has an ob_refcnt field. When two threads simultaneously increment or decrement the same object's reference count without synchronization, the count can become corrupt, leading to use-after-free bugs or memory leaks. The GIL is the simplest solution: make reference count operations safe by only allowing one thread to run Python bytecode at a time.
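The reference count is observable from Python. The sketch below is CPython-specific and uses `sys.getrefcount`, whose absolute values include temporary references and vary between versions; only the difference between the two readings is meaningful here. The helper name is illustrative.

```python
import sys


def refcount_delta() -> int:
    """Return how much the refcount grows when one more container holds obj."""
    obj = object()
    before = sys.getrefcount(obj)
    holder = [obj]                 # the list stores one additional strong reference
    after = sys.getrefcount(obj)
    del holder                     # dropping the list releases that reference again
    return after - before


if __name__ == "__main__":
    print(refcount_delta())  # 1
```

Every such increment and decrement is a read-modify-write on `ob_refcnt`. Without the GIL, or the atomic operations used by the free-threaded build, two threads doing this at the same time could lose an update.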

What the GIL actually does: The GIL is released when a thread performs I/O (network, disk), calls certain C extensions (NumPy, PyTorch operations), or executes time.sleep(). It is held during pure Python bytecode execution.

This creates an important division:

| Work Type | GIL Behavior | Threading Works? |
|---|---|---|
| Pure Python computation | GIL held | No - threads serialized |
| NumPy/PyTorch C operations | GIL released | Yes - C-level parallelism |
| Network I/O | GIL released | Yes - blocked threads yield |
| Disk I/O | GIL released | Yes - asyncio has no native file I/O, so it falls back to threads anyway |
| time.sleep() | GIL released | Yes - but use asyncio |
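The "GIL released" rows in the table can be checked with a quick experiment. This sketch uses `time.sleep` as a stand-in for a blocking network or disk call; the function names are illustrative. If the threads were serialized, four 0.2-second waits would take about 0.8 seconds in total.

```python
import threading
import time


def fake_io(duration: float) -> None:
    time.sleep(duration)  # the GIL is released while the thread sleeps


def timed_run(n_threads: int, duration: float) -> float:
    """Run n_threads simulated I/O waits concurrently and return elapsed seconds."""
    threads = [
        threading.Thread(target=fake_io, args=(duration,)) for _ in range(n_threads)
    ]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    elapsed = timed_run(n_threads=4, duration=0.2)
    print(f"4 overlapping waits took {elapsed:.2f}s")  # close to 0.2s, not 0.8s
```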

Demonstration of GIL serialization:

import multiprocessing
import threading
import time

def cpu_bound_work(n: int) -> int:
    """Pure Python computation - the GIL is held while this bytecode runs."""
    count = 0
    for _ in range(n):
        count += 1
    return count

def benchmark_threads_vs_processes(n: int = 50_000_000) -> None:
    half = n // 2  # every variant does the same total work: n iterations

    # Single-thread baseline: all n iterations in one thread
    start = time.perf_counter()
    cpu_bound_work(n)
    single_time = time.perf_counter() - start
    print(f"Single thread: {single_time:.2f}s")

    # Two threads, n/2 iterations each - would be ~2x faster if truly parallel
    start = time.perf_counter()
    t1 = threading.Thread(target=cpu_bound_work, args=(half,))
    t2 = threading.Thread(target=cpu_bound_work, args=(half,))
    t1.start(); t2.start()
    t1.join(); t2.join()
    thread_time = time.perf_counter() - start
    print(f"Two threads:   {thread_time:.2f}s (no speedup - serialized by the GIL)")

    # Two processes, n/2 iterations each - actually parallel
    start = time.perf_counter()
    p1 = multiprocessing.Process(target=cpu_bound_work, args=(half,))
    p2 = multiprocessing.Process(target=cpu_bound_work, args=(half,))
    p1.start(); p2.start()
    p1.join(); p2.join()
    proc_time = time.perf_counter() - start
    print(f"Two processes: {proc_time:.2f}s (should be ~0.5x the baseline)")

if __name__ == "__main__":
    benchmark_threads_vs_processes()

# Expected pattern (exact timings depend on hardware and Python version):
# Single thread: baseline
# Two threads:   no faster than the baseline, often slightly slower (GIL handoffs)
# Two processes: close to half the baseline, plus process startup cost

The two-thread version is no faster than the single-threaded one, and is often slower. Only one thread can execute bytecode at a time, so the threads take turns, and every GIL handoff adds overhead on top of the actual work.


Concurrency Architecture Diagram


Fork vs Spawn vs Forkserver

When Python creates a new process, it uses one of three start methods. The choice matters enormously for ML workloads.

fork (default on Linux through Python 3.13; Python 3.14 changed the default on Linux to forkserver): The child process is an exact copy of the parent at the moment of the fork call. All memory is copied (lazily, via copy-on-write). All file descriptors are duplicated. The child inherits the parent's entire state - including any loaded PyTorch models, CUDA contexts, and open connections.

Problem with fork: CUDA is not fork-safe. Forking after CUDA initialization causes mysterious crashes and GPU corruption. OpenSSL state can also misbehave across a fork. Background threads in the parent (for example, thread pools started by PyTorch or OpenMP) are not copied: only the thread that called fork() exists in the child, so the child starts with inconsistent state if any of the missing threads held a lock.

spawn (default on macOS and Windows, recommended for PyTorch): A fresh Python interpreter is started. The child imports the module from scratch and re-executes only the worker function. No state is inherited. Safe for CUDA, OpenSSL, and any library that uses background threads.

Problem with spawn: Startup overhead. Each worker must re-import PyTorch, NumPy, and your model code. For short tasks, this overhead dominates. Shared objects (large datasets, model weights) must be explicitly passed through queues or shared memory.

forkserver: A dedicated "server" process is forked once at startup, before any CUDA/OpenSSL initialization. When a new worker is needed, the forkserver forks itself (not the main process) to create the worker. Safer than fork, faster than spawn for recurring task patterns.
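One way to encode the trade-offs above is a small helper that picks a start method and returns a context object. This is a sketch of one possible policy, not a rule from the multiprocessing documentation; the function name `choose_start_method` and its parameter are hypothetical. `mp.get_all_start_methods()` and `mp.get_context()` are standard library calls.

```python
import multiprocessing as mp


def choose_start_method(uses_cuda_or_threads: bool) -> str:
    """Pick a start method from those the current platform supports."""
    available = mp.get_all_start_methods()  # e.g. ['fork', 'spawn', 'forkserver']
    # spawn exists on every platform and inherits no state from the parent
    if uses_cuda_or_threads or "forkserver" not in available:
        return "spawn"
    # forkserver avoids forking the (possibly multi-threaded) main process
    return "forkserver"


if __name__ == "__main__":
    method = choose_start_method(uses_cuda_or_threads=True)
    # A context has the same API as the multiprocessing module (Pool, Process,
    # Queue) but does not change the global default start method.
    ctx = mp.get_context(method)
    print(method, ctx.get_start_method())
```

Using `mp.get_context(...)` instead of `mp.set_start_method(...)` keeps the choice local to your code, which matters when a library you import also uses multiprocessing.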

import multiprocessing as mp
from typing import List

import numpy as np

def setup_worker(model_path: str) -> None:
    """Called once per worker process to load the model."""
    import torch
    global _model
    # Assumes model.pt holds a full pickled nn.Module from a trusted source.
    # Newer PyTorch versions default to weights_only=True, in which case
    # loading a full module may require passing weights_only=False.
    _model = torch.load(model_path, map_location="cpu")
    _model.eval()

def inference_worker(batch: np.ndarray) -> np.ndarray:
    """Runs inference using the per-process model."""
    import torch
    with torch.no_grad():
        tensor = torch.from_numpy(batch).float()
        output = _model(tensor)
    return output.numpy()

if __name__ == "__main__":
    # Critical: set the start method once, inside the __main__ guard,
    # before any CUDA or multiprocessing usage
    mp.set_start_method("spawn", force=True)

    model_path = "model.pt"
    n_workers = 4

    with mp.Pool(
        processes=n_workers,
        initializer=setup_worker,
        initargs=(model_path,)
    ) as pool:
        batches: List[np.ndarray] = [np.random.randn(32, 512) for _ in range(20)]
        results: List[np.ndarray] = pool.map(inference_worker, batches)

    print(f"Processed {len(results)} batches")

Python Multiprocessing for Parallel Data Preprocessing

The most common ML use case for multiprocessing is the DataLoader pattern: transform raw data items in parallel before feeding them to the model.

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Any, Dict, Iterator, List, Optional

import numpy as np

# Module-level transform function (must be picklable - no lambdas!)
def preprocess_sample(sample_path: str) -> Dict[str, Any]:
    """
    CPU-intensive preprocessing: load, normalize, augment.
    This runs in a worker process with its own interpreter and its own GIL.
    """
    import cv2  # import inside worker - cv2 is not fork-safe on some systems
    img = cv2.imread(sample_path)
    if img is None:
        raise ValueError(f"Could not read: {sample_path}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    # Normalize to [0, 1]
    img = img.astype(np.float32) / 255.0
    # Transpose HWC -> CHW for PyTorch
    img = img.transpose(2, 0, 1)
    return {
        "path": sample_path,
        "tensor": img,
        "shape": img.shape
    }

def parallel_preprocess_pool(
    sample_paths: List[str],
    n_workers: Optional[int] = None,
    chunksize: int = 4
) -> List[Dict[str, Any]]:
    """
    Use multiprocessing.Pool for homogeneous task distribution.
    chunksize batches tasks to reduce IPC overhead.
    """
    if n_workers is None:
        # Leave one CPU for the main process, but never drop below one worker
        n_workers = max(1, mp.cpu_count() - 1)

    with mp.Pool(processes=n_workers) as pool:
        # pool.map blocks until all results are ready
        results = pool.map(preprocess_sample, sample_paths, chunksize=chunksize)
    return results

def parallel_preprocess_futures(
    sample_paths: List[str],
    n_workers: int = 8
) -> Iterator[Dict[str, Any]]:
    """
    Use ProcessPoolExecutor for heterogeneous tasks or streaming results.
    as_completed() yields results as they finish, not in submission order.
    """
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        futures = {
            executor.submit(preprocess_sample, path): path
            for path in sample_paths
        }
        for future in as_completed(futures):
            try:
                yield future.result()
            except Exception as e:
                # A failed sample is logged and skipped, not re-raised
                print(f"Failed to process {futures[future]}: {e}")

Shared Memory for Zero-Copy IPC

Passing large NumPy arrays through queues requires pickling and unpickling - expensive for large arrays. Python 3.8+ provides multiprocessing.shared_memory for true zero-copy sharing.

from multiprocessing import shared_memory, Pool
import numpy as np

def worker_with_shared_memory(
    shm_name: str,
    shape: tuple,
    dtype: str,
    start_idx: int,
    end_idx: int
):
    """
    Access shared memory directly - no copy, no pickle overhead.
    Worker modifies its slice of the shared array in-place.
    """
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    # Process the slice [start_idx:end_idx] in-place
    arr[start_idx:end_idx] = arr[start_idx:end_idx] * 2.0 + 1.0
    existing_shm.close()  # do not unlink - only the creator should unlink

def process_large_array_shared(data: np.ndarray, n_workers: int = 4) -> np.ndarray:
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    try:
        shared_arr = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
        shared_arr[:] = data  # copy once into shared memory

        chunk_size = len(data) // n_workers
        tasks = []
        for i in range(n_workers):
            start = i * chunk_size
            # The last worker also takes the remainder rows, so nothing is
            # skipped when len(data) is not divisible by n_workers
            end = len(data) if i == n_workers - 1 else start + chunk_size
            tasks.append((shm.name, data.shape, str(data.dtype), start, end))

        with Pool(n_workers) as pool:
            pool.starmap(worker_with_shared_memory, tasks)

        result = shared_arr.copy()  # copy result out before cleanup
    finally:
        shm.close()
        shm.unlink()  # only creator unlinks
    return result

asyncio for Concurrent LLM API Calls

When calling external LLM APIs (OpenAI, Anthropic, Cohere), the bottleneck is network I/O, not CPU. Using asyncio lets you send hundreds of requests concurrently from a single thread.

import asyncio
import time
from typing import List
from dataclasses import dataclass
import aiohttp

@dataclass
class LLMRequest:
    prompt: str
    max_tokens: int = 256
    temperature: float = 0.0

@dataclass
class LLMResponse:
    prompt: str
    completion: str
    latency_ms: float

async def call_llm_api(
    session: aiohttp.ClientSession,
    request: LLMRequest,
    api_key: str,
    semaphore: asyncio.Semaphore
) -> LLMResponse:
    """
    Single async API call. The semaphore limits concurrency
    to avoid hitting rate limits.
    """
    async with semaphore:
        start = time.perf_counter()
        payload = {
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": request.prompt}],
            "max_tokens": request.max_tokens,
            "temperature": request.temperature,
        }
        headers = {"Authorization": f"Bearer {api_key}"}

        async with session.post(
            "https://api.openai.com/v1/chat/completions",
            json=payload,
            headers=headers
        ) as response:
            # Turn HTTP errors (429, 5xx, ...) into exceptions instead of
            # failing later with a KeyError on the error payload
            response.raise_for_status()
            data = await response.json()
            latency = (time.perf_counter() - start) * 1000

        completion = data["choices"][0]["message"]["content"]
        return LLMResponse(
            prompt=request.prompt,
            completion=completion,
            latency_ms=latency
        )

async def batch_llm_calls(
    requests: List[LLMRequest],
    api_key: str,
    max_concurrent: int = 20,
    timeout_seconds: float = 30.0
) -> List[LLMResponse]:
    """
    Execute all LLM API calls concurrently.
    max_concurrent caps the number of in-flight requests to stay under rate limits.
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    timeout = aiohttp.ClientTimeout(total=timeout_seconds)

    async with aiohttp.ClientSession(timeout=timeout) as session:
        tasks = [
            call_llm_api(session, req, api_key, semaphore)
            for req in requests
        ]
        # asyncio.gather preserves order (unlike as_completed)
        responses = await asyncio.gather(*tasks, return_exceptions=True)

    # Failures are dropped here, so the returned list can be shorter than
    # the input and is no longer index-aligned with `requests`
    successful = [r for r in responses if isinstance(r, LLMResponse)]
    failed = [r for r in responses if isinstance(r, BaseException)]
    print(f"Completed: {len(successful)}, Failed: {len(failed)}")
    return successful

def run_batch_evaluation(prompts: List[str], api_key: str) -> List[LLMResponse]:
    requests = [LLMRequest(prompt=p) for p in prompts]
    return asyncio.run(batch_llm_calls(requests, api_key))

asyncio Event Loop Internals

The event loop runs in a tight cycle: take the next task from the ready queue, run it until it reaches an await that actually suspends, then poll for completed I/O and expired timers and move the tasks waiting on them back to the ready queue. Every such await is a potential switch point. Between two suspension points, a coroutine runs without interruption from any other coroutine on the same loop.
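The cycle can be observed directly. In this sketch (function names are illustrative), two tasks append to a shared log and suspend with `await asyncio.sleep(0)`, which yields to the event loop once without any real delay. The resulting order shows the loop alternating between the tasks at each suspension point.

```python
import asyncio
from typing import List


async def worker(name: str, log: List[str]) -> None:
    for i in range(2):
        log.append(f"{name}{i}")
        await asyncio.sleep(0)  # suspension point: the loop runs the other task


async def main() -> List[str]:
    log: List[str] = []
    await asyncio.gather(worker("A", log), worker("B", log))
    return log


def run_demo() -> List[str]:
    return asyncio.run(main())


if __name__ == "__main__":
    print(run_demo())  # ['A0', 'B0', 'A1', 'B1']
```

Removing the `await asyncio.sleep(0)` line would let worker A run to completion before B starts, because nothing in the loop body would suspend.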


Cooperative vs Preemptive Scheduling

Preemptive scheduling (threads, processes): The OS timer fires every 1-10ms and forcibly interrupts the running thread, saving its state and running another. The running code has no say in when it is interrupted. This guarantees fairness but introduces non-determinism - any two lines of code can be separated by a context switch.

Cooperative scheduling (asyncio coroutines): The running coroutine runs until it explicitly yields with await. The event loop then picks the next ready coroutine. The code knows exactly where context switches can occur: only at await points.

This distinction matters for correctness:

import threading

# THREAD SAFETY PROBLEM: preemptive scheduling
class UnsafeCounter:
    def __init__(self):
        self.value = 0

    def increment(self):
        # A thread switch can happen between the read and the write!
        current = self.value          # read
        # <--- CONTEXT SWITCH CAN HAPPEN HERE --->
        self.value = current + 1      # write: stale current, lost update!

counter = UnsafeCounter()

def thread_worker():
    for _ in range(100_000):
        counter.increment()

threads = [threading.Thread(target=thread_worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Expected: 1,000,000. The actual value can be lower because of lost updates.
# How often the race shows up depends on the CPython version and on timing.
print(f"Result: {counter.value}")  # Not guaranteed to be correct!

import asyncio

# ASYNCIO SAFE: cooperative scheduling
class SafeAsyncCounter:
    def __init__(self):
        self.value = 0

    async def increment(self):
        # No await inside means no other coroutine can run in the middle
        self.value += 1

async def async_worker(counter: SafeAsyncCounter):
    for _ in range(100_000):
        # increment() never suspends, so this await does not yield to the loop
        await counter.increment()

async def main():
    counter = SafeAsyncCounter()
    workers = [async_worker(counter) for _ in range(10)]
    await asyncio.gather(*workers)
    print(f"Result: {counter.value}")  # Always exactly 1,000,000

asyncio.run(main())

Thread Safety with Locks

When threads must share mutable state, use synchronization primitives:

import queue
import threading
import time
from typing import List

class ThreadSafeResultCollector:
    """Thread-safe container for collecting results from worker threads."""

    def __init__(self):
        self._lock = threading.Lock()
        self._results: List = []
        self._error_count = 0

    def add_result(self, result):
        with self._lock:  # context manager ensures lock is always released
            self._results.append(result)

    def add_error(self):
        with self._lock:
            self._error_count += 1

    @property
    def results(self) -> List:
        with self._lock:
            return self._results.copy()  # return copy to avoid external mutation

# Producer-consumer with thread-safe Queue
def producer_consumer_pattern(items: List, n_workers: int = 4):
    """
    queue.Queue is safe for use across threads.
    Use for I/O-bound work where GIL is released during waits.
    """
    work_queue: queue.Queue = queue.Queue(maxsize=n_workers * 2)
    result_queue: queue.Queue = queue.Queue()

    def producer():
        for item in items:
            work_queue.put(item)  # blocks if queue is full (backpressure)
        for _ in range(n_workers):
            work_queue.put(None)  # sentinel to stop workers

    def worker():
        while True:
            item = work_queue.get()
            if item is None:
                break
            time.sleep(0.001)  # simulate I/O-bound work (GIL released)
            result_queue.put(item * 2)
            work_queue.task_done()

    prod_thread = threading.Thread(target=producer)
    worker_threads = [threading.Thread(target=worker) for _ in range(n_workers)]

    prod_thread.start()
    for t in worker_threads:
        t.start()
    prod_thread.join()
    for t in worker_threads:
        t.join()

    # All threads have finished, so draining with empty() is safe here
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    return results

ML Workload Patterns: When to Use What

PyTorch DataLoader Worker Configuration

import torch
from torch.utils.data import Dataset, DataLoader
import multiprocessing as mp
import numpy as np
from pathlib import Path

class ImageDataset(Dataset):
    def __init__(self, paths, transform=None):
        self.paths = paths
        self.transform = transform

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        # This runs in a worker process (if num_workers > 0)
        # Import here to avoid issues with fork safety
        import cv2
        img = cv2.imread(str(self.paths[idx]))
        if img is None:  # cv2.imread returns None instead of raising
            raise ValueError(f"Could not read: {self.paths[idx]}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
        tensor = torch.from_numpy(img.transpose(2, 0, 1))
        if self.transform:
            tensor = self.transform(tensor)
        return tensor

def create_optimal_dataloader(dataset: Dataset, batch_size: int = 32) -> DataLoader:
    # Leave 1 CPU for the training loop, but keep at least one worker:
    # prefetch_factor and persistent_workers both require num_workers > 0
    n_workers = max(1, mp.cpu_count() - 1)

    return DataLoader(
        dataset,
        batch_size=batch_size,
        num_workers=n_workers,
        prefetch_factor=2,                 # prefetch 2 batches per worker
        persistent_workers=True,           # keep workers alive between epochs
        pin_memory=True,                   # page-lock memory for faster GPU transfer
        multiprocessing_context="spawn",   # CRITICAL: safe with CUDA
        drop_last=True,                    # avoid last partial batch size issues
    )

# WRONG: num_workers > 0 with default fork start method after CUDA init
# This causes: RuntimeError: Cannot re-initialize CUDA in forked subprocess
#
# RIGHT: always set spawn before any CUDA usage
if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)
    dataset = ImageDataset(paths=list(Path("data/").glob("*.jpg")))
    loader = create_optimal_dataloader(dataset, batch_size=64)
    for batch in loader:
        # Each batch is prefetched in worker processes, pinned in CPU memory
        batch = batch.cuda(non_blocking=True)  # async transfer to GPU

Green Threads and gevent

Python's gevent library provides "green threads" - coroutines that look like threads to the programmer but run cooperatively under the hood. gevent monkey-patches the standard library so that blocking calls such as socket.recv and time.sleep automatically yield to the gevent hub. Reads and writes on regular files are not made cooperative by this patching.

# gevent: threads that are actually coroutines
# Patching must come first - before any module that uses sockets is imported
from gevent import monkey
monkey.patch_all()

from gevent.pool import Pool

import requests  # socket calls inside requests now yield to the gevent hub

def fetch_url(url: str) -> str:
    response = requests.get(url)  # yields to gevent hub while waiting
    return response.text[:100]

urls = [
    "https://api.example.com/data/1",
    "https://api.example.com/data/2",
    "https://api.example.com/data/3",
]

# gevent.pool.Pool limits concurrency
pool = Pool(10)
results = pool.map(fetch_url, urls)

# gevent is mostly legacy today - prefer asyncio + aiohttp
# Use gevent when integrating with legacy code that uses requests/urllib
# and cannot be refactored to use async/await

gevent has the advantage of working transparently with synchronous code - useful for gradually migrating a legacy codebase. The disadvantage is that monkey.patch_all() is a global mutation of the stdlib that can cause surprising behavior in libraries that do not expect it.


asyncio Pitfalls in Practice

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

# WRONG: blocking call inside coroutine blocks entire event loop
async def bad_coroutine_blocking():
    time.sleep(1)  # Blocks the entire event loop - no other coroutine runs!
    return "done"

# RIGHT: use asyncio.sleep for async waiting
async def good_coroutine_async():
    await asyncio.sleep(1)  # Yields to event loop - other coroutines can run
    return "done"

# WRONG: CPU-bound work in asyncio starves other coroutines
async def bad_cpu_work():
    result = sum(range(10_000_000))  # Runs to completion without yielding
    return result

# BETTER: offload CPU work to a thread so the event loop stays responsive
async def good_cpu_work_thread():
    loop = asyncio.get_running_loop()
    # run_in_executor(None, ...) uses the default ThreadPoolExecutor.
    # Pure-Python work in that thread still holds the GIL, so this keeps the
    # loop responsive but does not give CPU parallelism.
    result = await loop.run_in_executor(None, sum, range(10_000_000))
    return result

# For true CPU parallelism in asyncio (separate processes):
async def cpu_work_in_process():
    loop = asyncio.get_running_loop()
    # Creating a pool per call is expensive; reuse one in real code
    with ProcessPoolExecutor(max_workers=4) as executor:
        # This actually runs in a separate Python process - true parallelism
        result = await loop.run_in_executor(executor, sum, range(100_000_000))
    return result

# Pipeline: async S3 fetcher + process pool inference
async def async_ml_pipeline(s3_keys: list, api_key: str):
    # api_key is unused in this simulation; a real fetcher would need it
    queue = asyncio.Queue(maxsize=32)  # bounded - backpressure prevents OOM
    loop = asyncio.get_running_loop()
    executor = ProcessPoolExecutor(max_workers=4)
    n_drivers = 4
    results = []

    async def s3_fetcher(key: str):
        # Simulate async S3 download
        await asyncio.sleep(0.05)
        data = f"data_for_{key}".encode()
        await queue.put(data)

    async def inference_driver():
        while True:
            batch = await queue.get()
            if batch is None:  # sentinel: no more work
                break
            # Offload CPU-intensive inference to process pool (len is a stand-in)
            results.append(await loop.run_in_executor(executor, len, batch))

    try:
        # Start the consumers first so the bounded queue keeps draining
        drivers = [asyncio.create_task(inference_driver()) for _ in range(n_drivers)]
        await asyncio.gather(*(s3_fetcher(key) for key in s3_keys))
        # One sentinel per driver; without these the drivers wait forever
        for _ in range(n_drivers):
            await queue.put(None)
        await asyncio.gather(*drivers)
    finally:
        executor.shutdown(wait=True)
    return results

Production Engineering Notes

CPU-Bound vs I/O-Bound Decision Tree

Before choosing a concurrency model, profile your workload:

import cProfile
import pstats
import io

def profile_function(func, *args, **kwargs):
    pr = cProfile.Profile()
    pr.enable()
    result = func(*args, **kwargs)
    pr.disable()

    s = io.StringIO()
    ps = pstats.Stats(pr, stream=s).sort_stats("cumulative")
    ps.print_stats(20)
    print(s.getvalue())
    return result

# If top entries show socket/ssl/file ops -> asyncio or threading
# If top entries show numpy/cv2/tokenizers -> multiprocessing
# If time is spent in torch C ops -> already parallel (CUDA or OpenBLAS)
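A cheaper first check than a full profile is to compare CPU time against wall-clock time. The sketch below is a rough heuristic under stated assumptions, not a standard tool: the name `cpu_fraction` is made up, and `time.process_time()` only counts CPU time of the current process, so work done in child processes or on the GPU is not included.

```python
import time
from typing import Callable


def cpu_fraction(func: Callable[[], object]) -> float:
    """Fraction of wall-clock time that this process spent on the CPU."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    func()
    cpu = time.process_time() - cpu_start
    wall = time.perf_counter() - wall_start
    return cpu / wall if wall > 0 else 0.0


if __name__ == "__main__":
    waiting = cpu_fraction(lambda: time.sleep(0.2))           # mostly waiting
    computing = cpu_fraction(lambda: sum(range(3_000_000)))   # mostly computing
    print(f"sleep:   {waiting:.2f}")
    print(f"compute: {computing:.2f}")
```

A fraction near 0 suggests the function mostly waits (a candidate for asyncio or threads). A fraction near 1 suggests it is CPU-bound in this process (a candidate for multiprocessing).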

Avoiding the Process Explosion on Import

import multiprocessing as mp
from typing import List
import os

# Module-level function - picklable
def square(x: int) -> int:
    return x * x

# ALWAYS guard Pool creation under __main__ on Windows and macOS
if __name__ == "__main__":
    mp.set_start_method("spawn")  # or "fork" on Linux only

    # os.cpu_count() can return None; keep at least one worker
    n_workers = max(1, (os.cpu_count() or 2) - 1)
    with mp.Pool(processes=n_workers) as pool:
        results = pool.map(square, range(100))

# WRONG: lambda not picklable
# pool.map(lambda x: x*x, items)  -> PicklingError

# WRONG: no __main__ guard on Windows
# -> Each spawned worker imports module -> spawns more workers -> explosion

# PATTERN: Explicit timeout prevents workers from hanging forever
def robust_pool_map(tasks: List, timeout: float = 300.0) -> List:
    with mp.Pool() as pool:
        async_result = pool.map_async(square, tasks)
        try:
            return async_result.get(timeout=timeout)
        except mp.TimeoutError:
            pool.terminate()
            pool.join()
            raise RuntimeError(f"Workers timed out after {timeout}s")

Environment Variables for Controlling Internal Thread Pools

NumPy and PyTorch use internal thread pools (OpenBLAS, MKL, oneDNN) for operations like matrix multiply. When you spawn multiple worker processes, each spawns its own thread pool, potentially oversubscribing your CPUs:

# Set before starting Python - limits internal thread pools
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1
export VECLIB_MAXIMUM_THREADS=1
export NUMEXPR_NUM_THREADS=1

import os

# Or set programmatically before importing numpy/torch
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"

import numpy as np  # import AFTER setting env vars

:::danger Fatal Mistakes to Avoid

Using the fork start method after CUDA initialization. Once any CUDA operation has run in the parent process (for example, calling .cuda() on a tensor), forking is unsafe. CUDA state is not fork-safe. The symptom is often a hang or segfault in the child process, not a clear error message. Always call mp.set_start_method("spawn") before any CUDA usage, or use multiprocessing_context="spawn" in your DataLoader constructor.

Passing large objects through multiprocessing queues. When you pass a 10 GB dataset through mp.Queue or as an argument to pool.map(), Python pickles the entire object and sends it through a pipe. This can take longer than the actual computation. Use multiprocessing.shared_memory or torch.multiprocessing.spawn with shared tensors instead.

Calling a blocking function inside an asyncio coroutine. requests.get(), time.sleep(), open() with standard I/O - any of these inside an async def function will block the entire event loop. All other coroutines freeze until the call returns. Use await asyncio.sleep(), aiohttp, and aiofiles as the async equivalents, and hand any remaining blocking call to loop.run_in_executor().

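Not guarding process creation with if __name__ == "__main__". On Windows and macOS (spawn start method), every new process re-imports the main module. Without the guard, that import re-runs the process-creation code inside each child. In the worst case this recurses into runaway worker spawning; current CPython versions usually stop it with a RuntimeError about starting a process before the bootstrapping phase has finished.

:::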
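
The blocking-call mistake above can be sketched with the standard library alone. In this minimal example, `blocking_io` is a hypothetical stand-in for something like `requests.get()`, and `heartbeat` is an illustrative coroutine that only makes progress while the event loop is free. Handing the blocking call to `loop.run_in_executor()` moves it to a worker thread, so the heartbeat keeps ticking while it runs.

```python
import asyncio
import time


def blocking_io(seconds: float) -> str:
    """Hypothetical stand-in for requests.get() or a slow file read."""
    time.sleep(seconds)  # blocks whichever thread runs it
    return "done"


async def heartbeat(ticks: list, count: int, interval: float) -> None:
    """Appends a timestamp every `interval` seconds while the loop is free."""
    for _ in range(count):
        await asyncio.sleep(interval)
        ticks.append(time.perf_counter())


async def demo() -> tuple:
    loop = asyncio.get_running_loop()
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks, count=5, interval=0.01))
    # None selects the loop's default ThreadPoolExecutor. The coroutine
    # suspends here while a worker thread runs the blocking call.
    result = await loop.run_in_executor(None, blocking_io, 0.1)
    await beat
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Calling `blocking_io(0.1)` directly inside `demo()` would return the same value, but no heartbeat could run until the sleep finished.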

:::warning Threading with NumPy and PyTorch

NumPy and PyTorch release the GIL during C-level operations. Python threads can run NumPy code in parallel - the GIL is released while numpy executes its C kernels. But this parallelism is unpredictable and depends on which operations release the GIL and for how long. Do not rely on this for CPU-bound training pipelines. Use multiprocessing for guaranteed parallelism on CPU-bound ML workloads.

Also: NumPy has its own internal thread pool (OpenBLAS/MKL) for operations like np.dot. If you spawn multiple processes that each call np.dot, each process spawns its own thread pool. On a 16-core machine with 8 worker processes each using 8 OpenBLAS threads, you end up with 64 threads competing for 16 cores. Set OMP_NUM_THREADS=1 and MKL_NUM_THREADS=1 in your worker environment.

:::


Concurrency Model Comparison


Interview Questions and Answers

Q1: Explain the GIL. Why does it exist? What are the practical consequences for ML workloads?

The GIL (Global Interpreter Lock) is a mutex in CPython that ensures only one thread executes Python bytecode at a time. It exists because CPython uses reference counting for garbage collection: every Python object has an ob_refcnt integer. If two threads concurrently increment or decrement the same reference count, the count can become corrupted (non-atomic read-modify-write), leading to use-after-free bugs or memory leaks. The GIL is the simplest solution: make reference count operations atomic by only allowing one thread to run at a time.

For ML workloads: the GIL means Python threads cannot parallelize CPU-bound Python code. Training loops, tokenization, and pure Python preprocessing are serialized even across multiple threads. The fix is multiprocessing (separate Python interpreters, no shared GIL). The GIL is released during I/O operations and during many C extension calls (NumPy, PyTorch), so threads do provide parallelism for I/O-bound work. Python 3.13 introduced an experimental free-threaded build (PEP 703); as of 2025 it remains an opt-in build rather than the default, and much of the C-extension ecosystem is still adapting to it.
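
A minimal sketch of that consequence, assuming a standard (GIL-enabled) CPython build: the same pure-Python, CPU-bound function is run sequentially and then across four threads. `count_primes` is an illustrative workload, not part of any pipeline described here. On a GIL build the two timings tend to come out close to each other; on a free-threaded build the threaded run may be faster.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def count_primes(limit: int) -> int:
    """Pure-Python CPU-bound work: count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def run_sequential(limits: list) -> list:
    return [count_primes(x) for x in limits]


def run_threaded(limits: list, workers: int = 4) -> list:
    # Threads share one interpreter, so bytecode execution is serialized by the GIL.
    with ThreadPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(count_primes, limits))


if __name__ == "__main__":
    limits = [30_000] * 4
    for label, fn in (("sequential", run_sequential), ("threaded", run_threaded)):
        start = time.perf_counter()
        fn(limits)
        print(f"{label}: {time.perf_counter() - start:.2f}s")
```

Swapping `ThreadPoolExecutor` for `ProcessPoolExecutor` (under an `if __name__ == "__main__"` guard) is the usual way to get real parallelism for this kind of function.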

Q2: A PyTorch DataLoader with num_workers=8 is crashing with "RuntimeError: Cannot re-initialize CUDA in forked subprocess." What is happening and how do you fix it?

The main training script has already initialized CUDA before the DataLoader started its workers, and the workers were created with fork (the usual Linux default). A forked child inherits a copy of the parent's CUDA state, but CUDA contexts are not fork-safe. PyTorch tracks this: when code in the forked worker touches CUDA (for example, a Dataset that moves tensors to the GPU in __getitem__), it refuses to re-initialize and raises this error.

The fix is to set multiprocessing_context="spawn" in the DataLoader constructor, or call mp.set_start_method("spawn") at the top of the if __name__ == "__main__" block. With spawn, each worker starts a fresh Python interpreter without any inherited CUDA state. This adds worker startup overhead at the start of each epoch (persistent_workers=True keeps workers alive across epochs) but eliminates the CUDA corruption risk entirely.

Q3: You need to call an LLM API 10,000 times to evaluate a model. The API allows 50 concurrent requests. Should you use multiprocessing, threading, or asyncio? Why?

asyncio is the right choice. The bottleneck is network I/O: each API call spends 95%+ of its time waiting for network bytes. The actual computation (serializing the request, deserializing JSON) is negligible.

With asyncio, you create 10,000 coroutines and use asyncio.Semaphore(50) to limit concurrency to 50 simultaneous requests. All coroutines run in a single OS thread, interleaved at each await. A suspended coroutine costs on the order of hundreds of bytes to a few kilobytes depending on its local state, so 10,000 of them need tens of megabytes at most.

Threading would also work (the GIL is released during network I/O), and a ThreadPoolExecutor capped at 50 workers is a reasonable alternative. What does not scale is one thread per request: 10,000 threads at the default 8 MB stack reservation on Linux is 80 GB of virtual address space, plus kernel scheduling overhead. Multiprocessing is completely wrong for this - process startup and IPC overhead dominate, and the parallelism provides no benefit since we are waiting on network, not computing.
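
The semaphore pattern from this answer can be sketched as follows. `fake_api_call` is a hypothetical stand-in that sleeps instead of making a network request, and the `peak` counter exists only to show that the limit holds. A real client would replace the sleep with an aiohttp (or similar) request.

```python
import asyncio


async def fake_api_call(i: int, delay: float = 0.01) -> int:
    """Hypothetical stand-in for one LLM API request."""
    await asyncio.sleep(delay)  # simulates waiting on the network
    return i * 2


async def evaluate(n_requests: int, max_concurrency: int) -> tuple:
    sem = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def bounded(i: int) -> int:
        nonlocal in_flight, peak
        async with sem:  # at most max_concurrency coroutines pass this point
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fake_api_call(i)
            finally:
                in_flight -= 1

    # gather returns results in submission order, regardless of completion order.
    results = await asyncio.gather(*(bounded(i) for i in range(n_requests)))
    return results, peak


if __name__ == "__main__":
    # 1,000 requests keeps the demo short; the pattern is the same for 10,000.
    results, peak = asyncio.run(evaluate(1_000, 50))
    print(len(results), "results, peak concurrency", peak)
```

The counters need no lock because no `await` sits between reading and updating them.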

Q4: What is the difference between fork, spawn, and forkserver start methods? When would you use each?

fork: The child is a copy of the parent at fork time. Fast (copy-on-write memory), but inherits all parent state including locks, file descriptors, and library state. Unsafe with CUDA, OpenSSL, and any library that uses background threads. Use only for pure Python workloads with no native library initialization in the parent. Default on Linux through Python 3.13; Python 3.14 changed the default on POSIX platforms other than macOS to forkserver.

spawn: A fresh Python interpreter starts and imports the module from scratch. Safe for CUDA and all native libraries because the child has no inherited state. Slower startup (must re-import all modules). Requires all arguments passed to workers to be picklable. Default on macOS and Windows. Always use this for PyTorch DataLoader workers.

forkserver: A dedicated, single-threaded server process is spawned once with a clean interpreter, so it carries no native library initialization from the parent. When a new worker is needed, the forkserver forks itself (not the main process). Safer than fork (no CUDA contamination), faster than spawn (no full re-import). Unix only. Useful as a middle ground for patterns with many short-lived worker processes.
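
A minimal sketch of choosing a start method explicitly with `multiprocessing.get_context`, which leaves the process-wide default untouched. The helper name `map_abs` is illustrative. The built-in `abs` is used as the worker function because it pickles by reference, so the example does not depend on how the script was launched.

```python
import multiprocessing as mp


def map_abs(values: list, start_method: str = "spawn") -> list:
    """Run a two-worker pool under an explicit start method."""
    # get_context returns an object with the same API as the multiprocessing
    # module, bound to one start method ("fork", "spawn", or "forkserver").
    ctx = mp.get_context(start_method)
    with ctx.Pool(processes=2) as pool:
        return pool.map(abs, values)


if __name__ == "__main__":
    # The guard matters under spawn: children re-import this module.
    print("available start methods:", mp.get_all_start_methods())
    print(map_abs([-3, 2, -1]))
```

A context object is usually easier to reason about than `mp.set_start_method()`, which can only be called once per program and affects every library that uses multiprocessing.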

Q5: What is the difference between cooperative and preemptive scheduling? Why does it matter for correctness in ML serving systems?

In preemptive scheduling (OS threads), the kernel can interrupt any running thread at any point between any two machine instructions. This means any shared mutable state requires explicit locking. Even counter += 1 (which compiles to load, increment, store) can be interrupted between load and store, causing a lost update. Race conditions are non-deterministic and hard to reproduce under load.

In cooperative scheduling (asyncio coroutines), a coroutine runs until it explicitly yields with await. Context switches only happen at await points. Code between two await statements is atomic from the scheduler's perspective.

For ML inference servers: using asyncio means you can maintain an in-memory cache of recent predictions without locks, provided no await sits between a cache read and the write that depends on it, while threading requires a lock around every cache read/write. For a high-throughput serving system handling 10,000 requests per second, lock contention in a threaded server can add measurable latency per request (on the order of 10-100 microseconds under heavy contention). asyncio removes this class of problem for the request routing and caching code.
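
The "atomic between awaits" property has one sharp edge: it only holds while no `await` separates a read from the write that depends on it. The sketch below uses two illustrative coroutines on a shared counter. The first updates in one step; the second yields to the event loop between the read and the write, which reintroduces lost updates even though everything runs on a single thread.

```python
import asyncio


async def increment_atomic(state: dict) -> None:
    # No await between read and write: other coroutines cannot interleave here.
    state["count"] += 1


async def increment_with_yield(state: dict) -> None:
    current = state["count"]
    await asyncio.sleep(0)  # yields to the event loop; other coroutines run
    state["count"] = current + 1  # writes back a stale value


async def run(worker, n: int = 100) -> int:
    state = {"count": 0}
    await asyncio.gather(*(worker(state) for _ in range(n)))
    return state["count"]


if __name__ == "__main__":
    print("atomic:", asyncio.run(run(increment_atomic)))
    print("with yield:", asyncio.run(run(increment_with_yield)))
```

The atomic version always ends at `n`. The yielding version ends well below it, because the tasks read the counter before earlier tasks have written it back. For a prediction cache, this means a check-then-compute-then-store sequence that awaits the model call still needs an `asyncio.Lock` or a per-key future.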

Q6: A data pipeline uses 16 multiprocessing workers for image preprocessing. On a 16-core machine, CPU utilization is at 200% (16 cores x 12.5% each). What are two likely causes and how would you diagnose them?

First likely cause: each worker spawned its own OpenBLAS/MKL thread pool. If the image preprocessing uses any NumPy matrix operations, each of the 16 workers launches 16 OpenBLAS threads by default, creating 256 threads competing for 16 CPU cores. Diagnose by running ps -eLf | grep python | wc -l and comparing the thread count with the 16 workers you expect. Fix: set OMP_NUM_THREADS=1 and MKL_NUM_THREADS=1 before importing NumPy.

Second likely cause: IPC serialization overhead. If preprocessed images are large (e.g., 224x224x3 float32 = 600 KB per sample), pickling and unpickling them through mp.Queue/pool.map pipes adds significant CPU overhead independent of the actual processing work. Diagnose by profiling with py-spy and looking for time in pickle.dumps/pickle.loads. Fix: use multiprocessing.shared_memory for zero-copy IPC or convert to HDF5/memory-mapped files that workers can read directly without pickling.
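
A minimal, standard-library sketch of the shared-memory fix. The helper names `write_floats` and `read_floats` are illustrative, and `struct` stands in for NumPy so the example has no third-party dependency. The point is that only the block's name (a short string) would need to cross a process boundary; the payload itself is never pickled.

```python
import struct
from multiprocessing import shared_memory


def write_floats(values: list) -> shared_memory.SharedMemory:
    """Producer side: create a block and pack float32 values into it."""
    fmt = f"{len(values)}f"
    shm = shared_memory.SharedMemory(create=True, size=struct.calcsize(fmt))
    struct.pack_into(fmt, shm.buf, 0, *values)
    return shm  # caller owns the block and must close() and unlink() it


def read_floats(name: str, count: int) -> list:
    """Consumer side: attach to an existing block by name and unpack it."""
    shm = shared_memory.SharedMemory(name=name)
    try:
        return list(struct.unpack_from(f"{count}f", shm.buf, 0))
    finally:
        shm.close()  # detach only; the producer unlinks


if __name__ == "__main__":
    block = write_floats([1.0, 2.5, -3.0])
    try:
        # In a real pipeline a worker process would call read_floats()
        # after receiving block.name through a queue or as a task argument.
        print(read_floats(block.name, 3))
    finally:
        block.close()
        block.unlink()
```

With NumPy, the same block can be wrapped as an array via `np.ndarray(shape, dtype=..., buffer=shm.buf)`, so workers operate on the image data in place.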


Quick Reference: Choosing the Right Concurrency Model

| Scenario | Model | Reason |
| --- | --- | --- |
| Image preprocessing for DataLoader | multiprocessing.Pool with spawn | CPU-bound, needs true parallelism, CUDA-safe |
| Calling 10K LLM API endpoints | asyncio + aiohttp + Semaphore | Pure I/O, needs high concurrency, low overhead |
| Simple parallel file download | ThreadPoolExecutor | I/O-bound, GIL released, simple to implement |
| Multi-GPU inference serving | One Process per GPU | CUDA contexts not shareable across processes |
| Background log shipping | asyncio task or daemon thread | I/O-bound, low priority |
| CPU feature engineering | ProcessPoolExecutor | CPU-bound, needs true parallelism |
| Streaming LLM responses to 1000 clients | asyncio with generator | I/O-bound, connection-per-coroutine is cheap |

The rule that covers 90% of cases: multiprocessing for CPU-bound work, asyncio for high-concurrency I/O, threading for simple low-concurrency I/O.

© 2026 EngineersOfAI. All rights reserved.