Multiprocessing in Python
Reading time: ~30 minutes | Level: Intermediate → Engineering
Before reading further, predict what happens here:
from PIL import Image
import threading, multiprocessing, time, io

def resize_image(data):
    img = Image.open(io.BytesIO(data))
    img = img.resize((128, 128))
    return img.tobytes()

if __name__ == "__main__":
    images = [open(f"photo_{i}.jpg", "rb").read() for i in range(4)]

    # Version A - 4 threads
    start = time.perf_counter()
    threads = [threading.Thread(target=resize_image, args=(img,)) for img in images]
    for t in threads: t.start()
    for t in threads: t.join()
    thread_time = time.perf_counter() - start

    # Version B - 4 processes
    start = time.perf_counter()
    with multiprocessing.Pool(4) as pool:
        results = pool.map(resize_image, images)
    process_time = time.perf_counter() - start

    print(f"Threads: {thread_time:.2f}s")
    print(f"Processes: {process_time:.2f}s")
    # ?
Answer:
Typical output on a 4-core machine:
Threads: 0.95s
Processes: 0.26s
Why? Decoding and resizing in Pillow is CPU-bound work done mostly in C. Pillow releases the GIL during many pixel operations, so threads can overlap to some degree. But the Python-level parts of each call (object creation, argument handling, copying the result out with tobytes()) still run one thread at a time, so the gain from threads is limited and varies with image size and Pillow version.
Processes sidestep the GIL entirely. Each of the 4 processes runs its own CPython interpreter, and the operating system can schedule them on separate cores. When each task runs long enough to amortize process startup and the pickling of inputs and results, a 4-core machine approaches 4 tasks in roughly the time of 1.
The exact ratio depends on your hardware, Pillow version, and image size. For pure-Python CPU-bound work the pattern is consistent: processes scale with core count; threads do not.
This gap - threads giving little or no speedup, processes giving up to 3–4x - is the characteristic signature of the GIL on CPU-bound work. Knowing when to reach for multiprocessing, and how to use it correctly, separates engineers who write scalable Python from those who wonder why their "parallel" code is slow.
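The same comparison can be reproduced without Pillow. The sketch below is a minimal, hedged illustration: `count_primes`, `timed`, and the workload size are hypothetical names and values chosen for this example, and the timings you see will depend on your machine and Python build.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def count_primes(limit: int) -> int:
    """Count primes below `limit` by trial division (deliberately slow pure Python)."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def timed(executor_cls, jobs: list[int]) -> float:
    """Run count_primes over `jobs` with 4 workers and return the wall time in seconds."""
    start = time.perf_counter()
    with executor_cls(max_workers=4) as executor:
        list(executor.map(count_primes, jobs))
    return time.perf_counter() - start


if __name__ == "__main__":
    jobs = [60_000] * 4  # assumed workload size; tune it for your machine
    print(f"Threads:   {timed(ThreadPoolExecutor, jobs):.2f}s")
    print(f"Processes: {timed(ProcessPoolExecutor, jobs):.2f}s")
```

Because `count_primes` never leaves the interpreter, the threaded run on a standard GIL build of CPython should take about as long as running the four jobs back to back, while the process run can use up to four cores.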
What You Will Learn
- How multiprocessing.Process works - spawning, joining, exit codes, PIDs
- The process vs thread memory model - why processes are isolated and why that matters
- Pool.map(), Pool.starmap(), Pool.apply_async(), Pool.imap() - the full toolkit
- Inter-process communication with Queue and Pipe - the producer/consumer pattern
- Shared memory with Value, Array, and multiprocessing.shared_memory
- ProcessPoolExecutor - the modern concurrent.futures approach
- Serialization constraints - why everything crossing process boundaries must be picklable
- Why if __name__ == "__main__" is non-negotiable on Windows and macOS
Prerequisites
- Lesson 01 (Threading) - thread vs process mental model
- GIL Explained (Module 03) - why the GIL prevents CPU parallelism in threads
- Basic familiarity with concurrent.futures.ThreadPoolExecutor
Part 1 - multiprocessing.Process
Spawning a Process
multiprocessing.Process is the lowest-level API. It mirrors threading.Thread almost exactly:
import multiprocessing
import os
import time

def worker(name: str, delay: float) -> None:
    """This function runs in a separate OS process."""
    print(f"[{name}] PID={os.getpid()}, parent PID={os.getppid()}")
    time.sleep(delay)
    print(f"[{name}] done")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker, args=("alpha", 1.0))
    p2 = multiprocessing.Process(target=worker, args=("beta", 0.5))
    p1.start()  # spawn OS process, does NOT block
    p2.start()
    print(f"Main PID={os.getpid()}")
    print(f"p1 pid={p1.pid}, alive={p1.is_alive()}")
    p2.join()  # wait for p2 to finish
    p1.join()  # wait for p1 to finish
    print(f"p1 exitcode={p1.exitcode}")  # 0 = success
    print(f"p2 exitcode={p2.exitcode}")  # 0 = success
Key attributes and methods:
| Attribute / Method | What it does |
|---|---|
| p.start() | Spawn the OS process - returns immediately |
| p.join(timeout=None) | Block until the process finishes (or the timeout expires) |
| p.is_alive() | True if the process is still running |
| p.pid | OS process ID (None before start()) |
| p.exitcode | None while running; 0 on success; 1 after an uncaught exception; -N if killed by signal N |
| p.terminate() | Send SIGTERM (Unix) or call TerminateProcess (Windows) |
| p.kill() | Send SIGKILL on Unix, which cannot be caught or ignored; same as terminate() on Windows |
| p.daemon | If True, the process is terminated when the parent exits; must be set before start() |
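To make the exitcode conventions concrete, here is a small sketch that translates an exit code into a readable message and then terminates a sleeping child. `describe_exitcode` and `sleeper` are hypothetical helpers written for this example, not part of the multiprocessing API.

```python
import multiprocessing
import signal
import time


def describe_exitcode(code) -> str:
    """Translate Process.exitcode into a short human-readable string."""
    if code is None:
        return "not finished (still running or never started)"
    if code == 0:
        return "success"
    if code < 0:
        # A negative exit code -N means the process was killed by signal N.
        try:
            name = signal.Signals(-code).name
        except ValueError:
            name = f"signal {-code}"
        return f"killed by {name}"
    return f"failed with exit code {code}"


def sleeper() -> None:
    time.sleep(60)


if __name__ == "__main__":
    p = multiprocessing.Process(target=sleeper)
    print(describe_exitcode(p.exitcode))  # exitcode is None before start()
    p.start()
    p.terminate()  # SIGTERM on Unix, TerminateProcess on Windows
    p.join()
    print(describe_exitcode(p.exitcode))
```

On Unix the second line should report SIGTERM; on Windows the exit code after terminate() is not a negated signal number, so the message will differ.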
Process Lifecycle
import multiprocessing
import time

def crasher():
    """Simulates a process that fails."""
    time.sleep(0.2)
    raise RuntimeError("something went wrong")

if __name__ == "__main__":
    p = multiprocessing.Process(target=crasher)
    p.start()
    p.join()
    print(f"exitcode: {p.exitcode}")
    # exitcode: 1 - an uncaught exception ends the child with exit code 1
    # The child prints the traceback to stderr; no exception is raised in the parent
:::warning Process Exceptions Are Silent in the Parent
If a child process raises an exception, the parent does not receive it. The child prints the traceback to its own stderr (usually the same terminal as the parent's), and the only signal the parent gets is a non-zero p.exitcode. When using bare Process, always check p.exitcode after join(). The Pool and ProcessPoolExecutor APIs propagate exceptions back to the caller.
:::
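One way to get error details back from a bare Process is to have the child report its outcome on a Queue. The following is a minimal sketch under that assumption; `run_and_report`, `child_entry`, and the tuple layout are hypothetical names and conventions chosen for this example.

```python
import multiprocessing
import traceback


def run_and_report(target, args, reports) -> bool:
    """Call target(*args) and put exactly one status tuple on `reports`."""
    try:
        target(*args)
    except Exception as exc:
        reports.put(("error", type(exc).__name__, str(exc), traceback.format_exc()))
        return False
    reports.put(("ok", None, None, None))
    return True


def child_entry(target, args, reports) -> None:
    """Process target: report the outcome, then exit non-zero on failure."""
    if not run_and_report(target, args, reports):
        raise SystemExit(1)


def crasher() -> None:
    raise RuntimeError("something went wrong")


if __name__ == "__main__":
    reports = multiprocessing.Queue()
    p = multiprocessing.Process(target=child_entry, args=(crasher, (), reports))
    p.start()
    # Read the report before join() so the child is never blocked on a full pipe.
    status, exc_type, message, tb_text = reports.get(timeout=10)
    p.join()
    print(f"status={status} type={exc_type} message={message} exitcode={p.exitcode}")
```

The exception object itself is not sent, only strings, so the report stays picklable even when the original exception is not.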
Part 2 - Process vs Thread - The Memory Model
Why Processes Are Isolated
Threads share the same memory space - one thread can read or write any variable owned by another. Processes have completely separate memory spaces. Each process gets its own copy of everything.
Decision Table - Process vs Thread
| Factor | Thread | Process |
|---|---|---|
| Memory | Shared - easy to communicate, easy to corrupt | Isolated - safe by default, IPC required |
| Startup cost | ~0.1ms | 50–500ms (spawn) / ~1ms (fork on Linux) |
| GIL impact | Serialized for Python bytecodes | Each process has its own GIL - true parallelism |
| CPU-bound Python | No speedup (often slower) | Near-linear speedup up to core count |
| I/O-bound | Excellent - GIL released during I/O | Works but wastes memory and startup cost |
| Shared state | Easy (but needs locks) | Hard (Queue, Pipe, shared_memory, Manager) |
| Crash isolation | One thread crash can kill the process | Child process crash doesn't affect parent |
| Memory usage | Low - threads share heap | High - each process has its own heap |
| Best for | I/O-bound: HTTP, DB, file, sockets | CPU-bound: image processing, parsing, math |
Rule of thumb: If your bottleneck is waiting (network, disk, database) - use threads or asyncio. If your bottleneck is computing (Python loops, data transformation, parsing) - use processes.
Part 3 - Pool - Distributing Work Across Processes
multiprocessing.Pool manages a fixed pool of worker processes and distributes tasks across them. It is far more ergonomic than managing Process objects manually.
Pool.map() - The Workhorse
import multiprocessing
import math

def is_prime(n: int) -> bool:
    """CPU-bound primality check - pure Python."""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True

if __name__ == "__main__":
    numbers = list(range(1_000_000, 1_100_000))

    # Sequential
    sequential_primes = [n for n in numbers if is_prime(n)]

    # Parallel - Pool.map blocks until all results are ready
    # Returns results in the SAME ORDER as input - guaranteed
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(is_prime, numbers)
    parallel_primes = [n for n, prime in zip(numbers, results) if prime]

    assert sequential_primes == parallel_primes  # same results, different order of computation
    print(f"Found {len(parallel_primes)} primes")
pool.map(func, iterable) - equivalent to list(map(func, iterable)) but distributed across processes. Results are in input order, regardless of which process finished first.
Pool.starmap() - Multiple Arguments
import multiprocessing

def power(base: int, exponent: int) -> int:
    return base ** exponent

if __name__ == "__main__":
    args = [(2, 10), (3, 8), (5, 6), (7, 4), (11, 3)]
    with multiprocessing.Pool() as pool:
        # starmap unpacks each tuple as separate arguments
        results = pool.starmap(power, args)
    # [1024, 6561, 15625, 2401, 1331]
    print(results)
Pool.apply_async() - Non-Blocking Submission
import multiprocessing
import time

def slow_compute(n: int) -> int:
    time.sleep(0.5)
    return n * n

if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        # Submit all tasks immediately - don't wait
        futures = [pool.apply_async(slow_compute, (i,)) for i in range(8)]

        # Do other work in the main process while workers compute...
        print("Tasks submitted, doing other work...")
        time.sleep(0.1)

        # Collect results - .get() blocks until the specific task is done
        results = [f.get(timeout=5.0) for f in futures]
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
apply_async also accepts callback and error_callback:
results = []
errors = []

def on_success(result):
    results.append(result)  # runs in the MAIN process (on the pool's result-handler thread)

def on_error(exc):
    errors.append(exc)  # runs in the MAIN process when the task raises

if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        for i in range(10):
            pool.apply_async(slow_compute, (i,), callback=on_success, error_callback=on_error)
        pool.close()
        pool.join()  # wait for all tasks before the with-block exits and terminates the pool
    print(f"Success: {len(results)}, Errors: {len(errors)}")
Pool.imap() - Lazy Evaluation
import multiprocessing

def process_line(line: str) -> str:
    return line.strip().upper()

if __name__ == "__main__":
    # imap returns a lazy iterator that yields results in INPUT order as they become available
    # Memory-efficient for large iterables: the full result list is never built in RAM
    with multiprocessing.Pool(4) as pool, open("large_file.txt") as lines:
        # chunksize=100: sends 100 lines at a time to each worker (less IPC overhead)
        result_iter = pool.imap(process_line, lines, chunksize=100)
        for processed in result_iter:
            print(processed)  # printed as soon as this result and all earlier ones are ready
map vs imap - choose based on memory:
| Method | Returns | Result order | Memory |
|---|---|---|---|
| pool.map() | list - waits for all | Input order | All results in RAM |
| pool.imap() | Iterator - lazy | Input order (buffers internally) | Streaming |
| pool.imap_unordered() | Iterator - lazy | Completion order | Streaming |
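The difference in result order is easiest to see when later inputs finish first. The sketch below assumes a hypothetical `slow_square` task whose sleep time shrinks as its input grows; the exact order from imap_unordered() can vary between runs.

```python
import multiprocessing
import time


def slow_square(n: int) -> int:
    """Sleep longer for smaller n so that later inputs tend to finish first."""
    time.sleep(0.05 * (4 - n))
    return n * n


if __name__ == "__main__":
    inputs = [0, 1, 2, 3]
    with multiprocessing.Pool(4) as pool:
        ordered = list(pool.imap(slow_square, inputs))
        unordered = list(pool.imap_unordered(slow_square, inputs))
    print(f"imap:           {ordered}")    # always follows the input order
    print(f"imap_unordered: {unordered}")  # follows completion order
    assert sorted(unordered) == ordered    # same values either way
```

imap_unordered() is the better fit when each result can be handled independently, because a slow early item does not hold back results that are already finished.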
:::tip Use chunksize to Reduce IPC Overhead
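Pool.map() splits the iterable into chunks automatically (by default roughly len(iterable) / (4 × number of workers) items per chunk), but imap() and imap_unordered() default to chunksize=1, which means one IPC (inter-process communication) round-trip per item. For small, fast tasks this IPC overhead dominates. Pass chunksize explicitly to batch items: pool.imap(func, items, chunksize=100) sends 100 items per batch, greatly reducing the number of round-trips.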
:::
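A quick way to see the effect is to time the same cheap task at several chunk sizes. This is a rough sketch, not a benchmark: `add_one`, `timed_imap`, the item count, and the chosen chunk sizes are all assumptions made for the example. It uses imap(), whose default chunksize is 1.

```python
import multiprocessing
import time


def add_one(n: int) -> int:
    """A task so cheap that per-item IPC cost dominates the runtime."""
    return n + 1


def timed_imap(pool, items, chunksize: int) -> float:
    """Run add_one over items via pool.imap and return the wall time in seconds."""
    start = time.perf_counter()
    result = list(pool.imap(add_one, items, chunksize=chunksize))
    assert result[-1] == items[-1] + 1
    return time.perf_counter() - start


if __name__ == "__main__":
    items = list(range(50_000))
    with multiprocessing.Pool(4) as pool:
        for chunksize in (1, 100, 1_000):
            elapsed = timed_imap(pool, items, chunksize)
            print(f"chunksize={chunksize:>5}: {elapsed:.3f}s")
```

Larger chunks reduce round-trips but also reduce load balancing, so for tasks with uneven durations a moderate chunk size is usually the safer choice.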
Part 4 - Inter-Process Communication - Queue and Pipe
Processes cannot share Python objects directly. Communication happens through explicit IPC mechanisms.
multiprocessing.Queue - Thread and Process Safe
import multiprocessing
import time
import random

def producer(queue: multiprocessing.Queue, n_items: int) -> None:
    """Generates work items and puts them on the queue."""
    for i in range(n_items):
        item = {"id": i, "value": random.randint(1, 100)}
        queue.put(item)
        print(f"[producer] sent item {i}")
        time.sleep(0.05)
    # Sentinel value signals consumers to stop
    queue.put(None)
    print("[producer] done")

def consumer(queue: multiprocessing.Queue, worker_id: int) -> None:
    """Reads work items from the queue and processes them."""
    while True:
        item = queue.get()  # blocks until an item is available
        if item is None:
            queue.put(None)  # pass the sentinel to the next consumer
            print(f"[consumer-{worker_id}] done")
            break
        result = item["value"] ** 2
        print(f"[consumer-{worker_id}] processed item {item['id']}: {result}")

if __name__ == "__main__":
    q = multiprocessing.Queue(maxsize=10)  # maxsize=10: producer blocks if queue is full
    prod = multiprocessing.Process(target=producer, args=(q, 20))
    cons1 = multiprocessing.Process(target=consumer, args=(q, 1))
    cons2 = multiprocessing.Process(target=consumer, args=(q, 2))
    prod.start()
    cons1.start()
    cons2.start()
    prod.join()
    cons1.join()
    cons2.join()
multiprocessing.Pipe - Bidirectional Channel
import multiprocessing

def child_process(conn):
    """Receives a number, returns its square."""
    while True:
        data = conn.recv()  # blocks until parent sends
        if data is None:
            conn.close()
            break
        conn.send(data ** 2)  # send result back to parent

if __name__ == "__main__":
    # Pipe() returns two Connection objects - one for each end
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=child_process, args=(child_conn,))
    p.start()
    child_conn.close()  # parent doesn't use the child end

    for n in [3, 7, 12, 25]:
        parent_conn.send(n)
        result = parent_conn.recv()
        print(f"{n}^2 = {result}")

    parent_conn.send(None)  # shutdown signal
    parent_conn.close()
    p.join()
Queue vs Pipe - when to use each:
| | Queue | Pipe |
|---|---|---|
| Direction | Many producers → many consumers | Point-to-point (2 ends) |
| Processes | Any number can use it | Exactly 2 processes |
| Thread-safe | Yes | No (not safe for multiple threads on same end) |
| Overhead | Higher (uses a Pipe + thread internally) | Lower |
| Best for | Worker pools, task distribution | Single child ↔ parent communication |
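A common variant of the Queue pattern sends one sentinel per consumer and returns results on a second queue, so nothing is left behind when the workers exit. The sketch below assumes None is never a valid work item; `consume` and `SENTINEL` are hypothetical names for this example.

```python
import multiprocessing

SENTINEL = None  # assumption: None is never a valid work item


def consume(tasks, results) -> int:
    """Square items from `tasks` until a sentinel arrives; return the count handled."""
    handled = 0
    while True:
        item = tasks.get()
        if item is SENTINEL:
            return handled
        results.put(item * item)
        handled += 1


if __name__ == "__main__":
    n_items, n_consumers = 10, 2
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=consume, args=(tasks, results))
               for _ in range(n_consumers)]
    for w in workers:
        w.start()
    for n in range(n_items):
        tasks.put(n)
    for _ in workers:
        tasks.put(SENTINEL)  # one sentinel per consumer
    # Drain the results before join() so no worker blocks on a full pipe.
    squares = sorted(results.get() for _ in range(n_items))
    for w in workers:
        w.join()
    print(squares)
```

Because `consume` only needs get() and put(), the same function also works with a plain queue.Queue, which makes the loop easy to unit-test without starting processes.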
Part 5 - Shared Memory
Value and Array - Low-Level Shared Memory
import multiprocessing
import ctypes
import time

def increment_counter(counter, n: int) -> None:
    """Increment a shared counter n times."""
    for _ in range(n):
        with counter.get_lock():  # Value has a built-in lock
            counter.value += 1

def fill_array(shared_array, start: int) -> None:
    """Fill a shared array with computed values."""
    for i in range(len(shared_array)):
        shared_array[i] = start + i

if __name__ == "__main__":
    # Value: single shared value
    # ctypes.c_int: underlying C type (c_int, c_double, c_long, etc.)
    counter = multiprocessing.Value(ctypes.c_int, 0)
    p1 = multiprocessing.Process(target=increment_counter, args=(counter, 100_000))
    p2 = multiprocessing.Process(target=increment_counter, args=(counter, 100_000))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print(f"counter = {counter.value}")  # 200,000

    # Array: fixed-size shared array
    shared_arr = multiprocessing.Array(ctypes.c_double, 10)  # 10 doubles, init to 0.0
    p3 = multiprocessing.Process(target=fill_array, args=(shared_arr, 100))
    p3.start()
    p3.join()
    print(list(shared_arr))  # [100.0, 101.0, ..., 109.0]
:::danger Unsynchronized Read-Modify-Write on Shared State Loses Updates
counter.value += 1 is a read-modify-write: it reads the value, adds one, and writes it back as separate steps. The lock built into Value protects each individual read or write, not the whole sequence, so two processes can read the same old value and one of the increments is lost. The final count is non-deterministic and usually too low. Hold the built-in lock (value.get_lock()) or an external multiprocessing.Lock around the whole update whenever multiple processes modify a Value or Array.
# WRONG - race condition, counter will be wrong
def unsafe_increment(counter, n):
    for _ in range(n):
        counter.value += 1  # read-modify-write without lock

# CORRECT
def safe_increment(counter, n):
    for _ in range(n):
        with counter.get_lock():
            counter.value += 1
:::
multiprocessing.shared_memory - Modern High-Performance Shared Memory
Python 3.8 introduced shared_memory for sharing large buffers (e.g., NumPy arrays) between processes with zero-copy:
import multiprocessing.shared_memory as shm
import multiprocessing
import numpy as np

def worker_process(shm_name: str, shape: tuple, dtype: str) -> None:
    """Attach to existing shared memory and modify the array in-place."""
    # Attach to the existing shared block - no copying
    existing_shm = shm.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    # Modify the array in-place - visible to the parent immediately
    arr *= 2
    arr += 1
    existing_shm.close()  # detach - do NOT unlink here, parent owns it

if __name__ == "__main__":
    # Create a NumPy array in shared memory
    original = np.arange(100, dtype=np.float64)
    shared_block = shm.SharedMemory(create=True, size=original.nbytes)
    shared_arr = np.ndarray(original.shape, dtype=original.dtype, buffer=shared_block.buf)
    shared_arr[:] = original  # copy data into shared memory
    print(f"Before: {shared_arr[:5]}")  # [0. 1. 2. 3. 4.]

    p = multiprocessing.Process(
        target=worker_process,
        args=(shared_block.name, original.shape, str(original.dtype))
    )
    p.start()
    p.join()
    print(f"After: {shared_arr[:5]}")  # [1. 3. 5. 7. 9.] - modified by child

    shared_block.close()
    shared_block.unlink()  # ALWAYS unlink - releases the shared memory block
Value/Array vs shared_memory:
| | Value / Array | shared_memory |
|---|---|---|
| Size | Small (single value / fixed array) | Arbitrary - gigabytes of data |
| Built-in lock | Yes (Value.get_lock()) | No - you manage synchronization |
| NumPy support | Limited | Excellent - wrap with np.ndarray |
| Python version | All versions | 3.8+ |
| Zero-copy | Yes (both use shared memory) | Yes |
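shared_memory does not require NumPy. As a standard-library-only sketch, the block below stores C doubles in a shared block with struct; `write_doubles` and `read_doubles` are hypothetical helpers for this example, and the layout (doubles packed from offset 0) is an assumption of the sketch.

```python
import multiprocessing
import struct
from multiprocessing import shared_memory


def write_doubles(name: str, values) -> None:
    """Attach to an existing block by name and store `values` as C doubles."""
    block = shared_memory.SharedMemory(name=name)
    try:
        struct.pack_into(f"{len(values)}d", block.buf, 0, *values)
    finally:
        block.close()  # detach only; the creator is responsible for unlink()


def read_doubles(name: str, count: int) -> list:
    """Attach to an existing block by name and read `count` C doubles."""
    block = shared_memory.SharedMemory(name=name)
    try:
        return list(struct.unpack_from(f"{count}d", block.buf, 0))
    finally:
        block.close()


if __name__ == "__main__":
    owner = shared_memory.SharedMemory(create=True, size=3 * struct.calcsize("d"))
    try:
        p = multiprocessing.Process(target=write_doubles,
                                    args=(owner.name, [1.5, 2.5, 3.5]))
        p.start()
        p.join()
        print(read_doubles(owner.name, 3))  # values written by the child
    finally:
        owner.close()
        owner.unlink()
```

Only the block name crosses the process boundary; the data itself is never pickled. The count is passed explicitly because the allocated block may be larger than the requested size on some platforms.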
Part 6 - ProcessPoolExecutor - The Modern Way
concurrent.futures.ProcessPoolExecutor is the recommended API for new code. It provides a cleaner interface, better exception propagation, and integrates with asyncio.
from concurrent.futures import ProcessPoolExecutor, as_completed
import math
import time

def factorize(n: int) -> list[int]:
    """Return all prime factors of n. CPU-bound pure Python."""
    factors = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            factors.append(d)
            n //= d
        d += 1
    if n > 1:
        factors.append(n)
    return factors

if __name__ == "__main__":
    numbers = [2**31 - 1, 2**29 - 1, 2**27 - 1, 2**25 - 1,
               999_999_937, 999_961_003, 999_961_007, 987_654_321]

    # map(): ordered results, blocks until all done
    print("=== executor.map() ===")
    with ProcessPoolExecutor(max_workers=4) as executor:
        start = time.perf_counter()
        results = list(executor.map(factorize, numbers))
        elapsed = time.perf_counter() - start
    for n, factors in zip(numbers, results):
        print(f"{n:>15,} → {factors}")
    print(f"Elapsed: {elapsed:.3f}s")

    print("\n=== submit() + as_completed() ===")
    # submit(): process results as they finish (completion order, not input order)
    with ProcessPoolExecutor(max_workers=4) as executor:
        future_to_n = {executor.submit(factorize, n): n for n in numbers}
        for future in as_completed(future_to_n):
            n = future_to_n[future]
            try:
                factors = future.result()
                print(f"{n:>15,} → {factors}")
            except Exception as exc:
                print(f"{n} raised: {exc}")
Exception Propagation - ProcessPoolExecutor vs Pool
from concurrent.futures import ProcessPoolExecutor

def may_fail(n: int) -> int:
    if n == 3:
        raise ValueError(f"cannot process {n}")
    return n * n

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(may_fail, i) for i in range(5)]
        for i, future in enumerate(futures):
            try:
                print(f"result[{i}] = {future.result()}")
            except ValueError as e:
                print(f"result[{i}] FAILED: {e}")
# result[0] = 0
# result[1] = 1
# result[2] = 4
# result[3] FAILED: cannot process 3 ← exception re-raised in parent
# result[4] = 16
:::tip ProcessPoolExecutor Over multiprocessing.Pool for New Code
ProcessPoolExecutor propagates exceptions cleanly to the parent, integrates with asyncio via loop.run_in_executor(), and has a consistent Future-based API shared with ThreadPoolExecutor. Reserve multiprocessing.Pool for cases where you need imap(), imap_unordered(), or apply_async() with callbacks.
:::
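The asyncio integration mentioned above can be sketched as follows. `cpu_heavy` and the input sizes are hypothetical; the point is only that loop.run_in_executor() hands blocking CPU work to worker processes while the event loop stays free.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def cpu_heavy(n: int) -> int:
    """Stand-in for CPU-bound work: sum of squares below n."""
    return sum(i * i for i in range(n))


async def main() -> list:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=2) as pool:
        # Each call returns an awaitable; the work itself runs in a worker process.
        pending = [loop.run_in_executor(pool, cpu_heavy, n)
                   for n in (100_000, 200_000)]
        return await asyncio.gather(*pending)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The function passed to run_in_executor() is subject to the same pickling rules as any other task sent to a process pool, so it must be defined at module level.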
Part 7 - Serialization - The Pickle Constraint
Everything passed between processes - function arguments, return values, task functions themselves - must be picklable. Python's pickle module serializes the data, sends it through a Pipe, and deserializes it in the target process.
What Is and Is Not Picklable
import pickle
import multiprocessing
# Picklable - safe to pass between processes
pickle.dumps(42) # int
pickle.dumps("hello") # str
pickle.dumps([1, 2, 3]) # list
pickle.dumps({"a": 1}) # dict
pickle.dumps((1, 2)) # tuple
import math
pickle.dumps(math.sqrt) # built-in functions - OK
pickle.dumps(math.pi) # module-level constants - OK
# NOT picklable - pickle.dumps() raises (the exact exception type can vary by Python version)
# Lambdas - defined inline, no importable name
fn = lambda x: x * 2
# pickle.dumps(fn)  # PicklingError: Can't pickle <function <lambda>>

# Local functions - defined inside another function
def make_multiplier(factor):
    def multiply(x):  # this function cannot be pickled
        return x * factor
    return multiply

fn2 = make_multiplier(3)
# pickle.dumps(fn2)  # AttributeError: Can't pickle local object
# File handles, sockets, database connections, locks
import threading
lock = threading.Lock()
# pickle.dumps(lock) # TypeError: cannot pickle '_thread.lock' object
:::danger Lambdas and Local Functions Cannot Cross Process Boundaries
The most common multiprocessing mistake is passing a lambda or locally-defined function to Pool.map() or ProcessPoolExecutor.submit(). Pickle serializes functions by name - it stores the module and qualified name, then reimports on the other side. Lambdas have no importable name. Define worker functions at module level.
# WRONG - PicklingError at runtime
with ProcessPoolExecutor() as executor:
    results = list(executor.map(lambda x: x * 2, range(10)))

# CORRECT - module-level function
def double(x):
    return x * 2

with ProcessPoolExecutor() as executor:
    results = list(executor.map(double, range(10)))
:::
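A quick pre-flight check can catch these failures before work is submitted. `is_picklable` below is a hypothetical helper for this example; it only confirms that the parent can serialize an object, so the worker must still be able to import whatever the pickle refers to.

```python
import pickle
import threading


def is_picklable(obj) -> bool:
    """Return True if pickle.dumps(obj) succeeds in this process."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


if __name__ == "__main__":
    candidates = {
        "built-in function": len,
        "list of ints": [1, 2, 3],
        "lambda": lambda x: x * 2,
        "threading.Lock": threading.Lock(),
    }
    for label, obj in candidates.items():
        verdict = "ok" if is_picklable(obj) else "NOT picklable"
        print(f"{label:>18}: {verdict}")
```

Running a check like this in a unit test for every worker function and task object is cheaper than discovering the problem inside a pool at runtime.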
Working Around Pickle Limitations
# Strategy 1: functools.partial for parameterized functions
from functools import partial
import multiprocessing

def multiply_by(factor: int, x: int) -> int:
    return x * factor

if __name__ == "__main__":
    triple = partial(multiply_by, 3)  # partial objects ARE picklable
    with multiprocessing.Pool(4) as pool:
        results = pool.map(triple, range(10))
    print(results)  # [0, 3, 6, 9, 12, 15, 18, 21, 24, 27]

# Strategy 2: initializer - pass non-picklable state to pool workers at startup
import multiprocessing
import sqlite3

db_conn = None  # will be initialized per worker

def init_worker(db_path: str) -> None:
    global db_conn
    db_conn = sqlite3.connect(db_path)  # each worker gets its own connection

def query_db(user_id: int) -> dict:
    cursor = db_conn.execute("SELECT * FROM users WHERE id = ?", (user_id,))
    row = cursor.fetchone()
    return {"id": row[0], "name": row[1]} if row else {}

if __name__ == "__main__":
    with multiprocessing.Pool(
        processes=4,
        initializer=init_worker,
        initargs=("users.db",)
    ) as pool:
        users = pool.map(query_db, range(1, 101))
Part 8 - if __name__ == "__main__" - Why It Is Required
On Windows and macOS, the default process start method is spawn: Python starts a fresh interpreter and imports your script to initialize the worker. If your script creates a Pool or Process at the top level (not guarded by __name__ == "__main__"), every worker re-runs that code on import and tries to create its own Pool while it is still starting up. Modern CPython detects this and aborts with a RuntimeError ("An attempt has been made to start a new process before the current process has finished its bootstrapping phase"); without that check, the spawning would recurse without end.
# WRONG - fails on Windows/macOS with RuntimeError
from multiprocessing import Pool

def work(n):
    return n * n

with Pool(4) as pool:  # this line runs again when workers import the module
    results = pool.map(work, range(10))  # → each worker tries to start its own pool

# CORRECT - the Pool is only created in the original script invocation
from multiprocessing import Pool

def work(n):
    return n * n

if __name__ == "__main__":  # guards against re-execution on worker import
    with Pool(4) as pool:
        results = pool.map(work, range(10))
    print(results)
Start methods by platform:
| Method | Platforms | How it works | Speed |
|---|---|---|---|
| fork | Linux (default through Python 3.13) | Copy parent process memory via fork() | Fast (~1ms) |
| spawn | Windows (default), macOS (default 3.8+) | Start fresh interpreter, import module | Slow (50–500ms) |
| forkserver | Unix (opt-in; default on Linux from Python 3.14) | Dedicated server process handles forking | Medium |
# On Linux, you can request fork explicitly for speed:
import multiprocessing

def work(n):
    return n * n

if __name__ == "__main__":
    multiprocessing.set_start_method("fork")  # call once, before creating any Pool/Process
    with multiprocessing.Pool(4) as pool:
        results = pool.map(work, range(10))
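set_start_method() changes the default for the whole program and may only be called once. A more local alternative is multiprocessing.get_context(), sketched below. `pick_start_method` and its preference order are hypothetical choices for this example.

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def square(n: int) -> int:
    return n * n


def pick_start_method(preferred=("forkserver", "spawn")) -> str:
    """Return the first preferred start method that this platform supports."""
    available = multiprocessing.get_all_start_methods()
    for method in preferred:
        if method in available:
            return method
    return available[0]


if __name__ == "__main__":
    method = pick_start_method()
    ctx = multiprocessing.get_context(method)  # does not change the global default
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as executor:
        print(method, list(executor.map(square, range(5))))
```

A context object exposes the same API as the multiprocessing module (ctx.Pool, ctx.Process, ctx.Queue), so a library can choose its own start method without affecting the application that imports it.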
Full Example - Parallel Image Thumbnail Generator
A production-quality parallel thumbnail generator demonstrating ProcessPoolExecutor, proper error handling, progress tracking, and pickling-safe design:
"""
parallel_thumbnailer.py - Generate thumbnails from a directory of images.

Usage:
    python parallel_thumbnailer.py ./photos ./thumbs --size 256 --workers 8
"""
from __future__ import annotations

import argparse
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path

try:
    from PIL import Image
except ImportError:
    raise SystemExit("Install Pillow: pip install Pillow")

# --- Data classes (picklable - no lambdas, no file handles) ---
@dataclass
class ThumbnailTask:
    source_path: str
    dest_path: str
    size: int

@dataclass
class ThumbnailResult:
    source_path: str
    dest_path: str
    success: bool
    error: str | None
    elapsed_ms: float

# --- Worker function at MODULE LEVEL - required for pickling ---
def generate_thumbnail(task: ThumbnailTask) -> ThumbnailResult:
    """
    Shrink one image to fit within a size x size box (aspect ratio preserved).
    Runs in a worker process - must be picklable (module-level function).
    """
    start = time.perf_counter()
    try:
        with Image.open(task.source_path) as img:
            # LANCZOS is high-quality downsampling
            img.thumbnail((task.size, task.size), Image.LANCZOS)
            # JPEG cannot store alpha or palette modes, so normalize to RGB first
            if img.mode not in ("RGB", "L"):
                img = img.convert("RGB")
            os.makedirs(os.path.dirname(task.dest_path), exist_ok=True)
            img.save(task.dest_path, optimize=True, quality=85)
        elapsed_ms = (time.perf_counter() - start) * 1000
        return ThumbnailResult(
            source_path=task.source_path,
            dest_path=task.dest_path,
            success=True,
            error=None,
            elapsed_ms=elapsed_ms,
        )
    except Exception as exc:
        elapsed_ms = (time.perf_counter() - start) * 1000
        return ThumbnailResult(
            source_path=task.source_path,
            dest_path=task.dest_path,
            success=False,
            error=str(exc),
            elapsed_ms=elapsed_ms,
        )

def build_tasks(source_dir: Path, dest_dir: Path, size: int) -> list[ThumbnailTask]:
    """Scan source directory and build a list of ThumbnailTask objects."""
    extensions = {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".webp"}
    tasks = []
    for src in source_dir.rglob("*"):
        # Skip directories and anything that is not a supported image type
        if not src.is_file() or src.suffix.lower() not in extensions:
            continue
        rel = src.relative_to(source_dir)
        dest = dest_dir / rel.with_suffix(".jpg")
        tasks.append(ThumbnailTask(
            source_path=str(src),
            dest_path=str(dest),
            size=size,
        ))
    return tasks

def run_parallel(tasks: list[ThumbnailTask], max_workers: int) -> None:
    total = len(tasks)
    completed = 0
    failed = 0
    wall_start = time.perf_counter()
    print(f"Processing {total} images with {max_workers} workers...")
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        future_map = {executor.submit(generate_thumbnail, t): t for t in tasks}
        for future in as_completed(future_map):
            result: ThumbnailResult = future.result()
            completed += 1
            if result.success:
                status = f"OK ({result.elapsed_ms:.0f}ms)"
            else:
                failed += 1
                status = f"ERR {result.error}"
            # Progress indicator
            pct = (completed / total) * 100
            print(f"[{pct:5.1f}%] {os.path.basename(result.source_path):30s} {status}")
    wall_elapsed = time.perf_counter() - wall_start
    print(f"\nDone: {completed - failed}/{total} succeeded, "
          f"{failed} failed, {wall_elapsed:.2f}s total")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Parallel thumbnail generator")
    parser.add_argument("source", type=Path, help="Source image directory")
    parser.add_argument("dest", type=Path, help="Destination thumbnail directory")
    parser.add_argument("--size", type=int, default=256, help="Max thumbnail dimension (px)")
    parser.add_argument("--workers", type=int, default=os.cpu_count(),
                        help="Number of worker processes")
    args = parser.parse_args()
    tasks = build_tasks(args.source, args.dest, args.size)
    if not tasks:
        print(f"No images found in {args.source}")
    else:
        run_parallel(tasks, args.workers)
Design decisions in this example:
- ThumbnailTask and ThumbnailResult are dataclass objects - picklable, no hidden state
- generate_thumbnail is at module level - picklable by name
- Errors are returned as ThumbnailResult(success=False) rather than raised - prevents one bad image from killing other futures
- as_completed() reports progress in real time rather than waiting for all tasks
- if __name__ == "__main__" guards all Pool-creating code
Graded Practice Challenges
Beginner - Predict the Output
import multiprocessing

counter = 0  # module-level in the parent

def increment():
    global counter
    for _ in range(1000):
        counter += 1
    print(f"child counter = {counter}")

if __name__ == "__main__":
    p = multiprocessing.Process(target=increment)
    p.start()
    p.join()
    print(f"parent counter = {counter}")
What do the two print statements output, and why?
Answer:
Output:
child counter = 1000
parent counter = 0
The child works on its own copy of counter. With fork it inherits a copy of the parent's memory; with spawn it re-imports the module, which runs counter = 0 again. Either way the child increments its own copy to 1000, and the parent's counter is untouched - it remains 0.
This is the fundamental process isolation property. Changes in child processes are never visible to the parent (unless you use Queue, Pipe, Value, Array, or shared_memory).
Intermediate - Fix the Bugs
This code has three bugs. Identify and fix each one:
from multiprocessing import Pool
import time

# Bug 1: worker is a lambda
square = lambda x: x * x

# Bug 2: missing __name__ guard
with Pool(4) as pool:
    results = pool.map(square, range(10))

# Bug 3: shared state modified from workers without synchronization
shared_results = []

def collect(n):
    time.sleep(0.01)
    shared_results.append(n * n)  # wrong approach for multiprocessing

with Pool(4) as pool:
    pool.map(collect, range(20))
print(len(shared_results))  # will print 0, not 20
Solution:
Bug 1 - Lambda is not picklable. Replace with a module-level function:
def square(x):
    return x * x

Bug 2 - Missing __name__ == "__main__" guard. On Windows/macOS, worker processes re-import the module and re-execute Pool(4) at import time, which makes startup fail with a RuntimeError:
if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(square, range(10))
    print(results)

Bug 3 - Shared list not shared between processes. Each worker gets a copy of shared_results. Appends in the child are invisible to the parent. Use pool.map() return values instead:
def compute(n):
    time.sleep(0.01)
    return n * n

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(compute, range(20))
    print(len(results))  # 20
Advanced - Design Challenge
Design a ParallelPipeline class that:
- Accepts a list of processing stages, each a Callable[[T], T]
- Has a run(items) method that applies each stage to all items using ProcessPoolExecutor
- After each stage completes, passes results as input to the next stage
- Tracks per-stage timing
- Returns the final results and a timing report
Example usage:
def normalize(x): return x / 255.0
def augment(x): return x * 1.1
def threshold(x): return 1.0 if x > 0.5 else 0.0
pipeline = ParallelPipeline(stages=[normalize, augment, threshold], workers=4)
results, timing = pipeline.run(list(range(256)))
# timing maps each stage to its elapsed seconds, e.g. {"stage_0": 0.12, "stage_1": 0.09, "stage_2": 0.08}
Reference solution:
from __future__ import annotations

from concurrent.futures import ProcessPoolExecutor
from typing import Callable, TypeVar, Any
import time

T = TypeVar("T")

class ParallelPipeline:
    """
    Applies a sequence of transformation stages to a list of items,
    running each stage in parallel via ProcessPoolExecutor.
    All stage functions must be picklable (module-level definitions).
    """

    def __init__(
        self,
        stages: list[Callable],
        workers: int | None = None,
    ) -> None:
        self.stages = stages
        self.workers = workers

    def run(self, items: list[Any]) -> tuple[list[Any], dict[str, float]]:
        """
        Run all pipeline stages on items.

        Returns:
            (final_results, timing_report)
            timing_report maps stage index to elapsed seconds
        """
        timing: dict[str, float] = {}
        current_items = list(items)
        for stage_idx, stage_fn in enumerate(self.stages):
            stage_key = f"stage_{stage_idx} ({stage_fn.__name__})"
            stage_start = time.perf_counter()
            with ProcessPoolExecutor(max_workers=self.workers) as executor:
                current_items = list(executor.map(stage_fn, current_items))
            timing[stage_key] = time.perf_counter() - stage_start
            print(f" {stage_key}: {timing[stage_key]:.3f}s → {len(current_items)} items")
        return current_items, timing

# Module-level functions - picklable
def normalize(x: float) -> float:
    return x / 255.0

def augment(x: float) -> float:
    return min(x * 1.1, 1.0)

def threshold(x: float) -> float:
    return 1.0 if x > 0.5 else 0.0

if __name__ == "__main__":
    pipeline = ParallelPipeline(stages=[normalize, augment, threshold], workers=4)
    results, timing = pipeline.run(list(range(256)))
    print(f"\nResults sample: {results[:8]}")
    print(f"\nTiming report:")
    for stage, elapsed in timing.items():
        print(f" {stage}: {elapsed:.3f}s")
    print(f" Total: {sum(timing.values()):.3f}s")
Key design decisions:
- Each ProcessPoolExecutor is a fresh context manager per stage - allows different worker counts per stage if extended
- current_items is reassigned after each stage - simple and correct; avoids shared mutable state
- Stage functions must be module-level (not nested) - explicitly stated in the docstring
- timing keys include the function name for readability in reports
Key Takeaways
- multiprocessing.Process spawns a true OS process with an isolated memory space and its own GIL - changes in one process are invisible to others unless you use IPC
- The GIL has no effect across processes - each process runs Python bytecode independently and can be scheduled on its own CPU core, enabling true CPU parallelism
- Process startup cost is significant (50–500ms on spawn, ~1ms on fork) - don't use multiprocessing for tasks that complete in milliseconds; the overhead dominates
- Pool.map() distributes work and returns ordered results; Pool.imap() is lazy and memory-efficient for large datasets; use chunksize to reduce IPC overhead for small tasks
- Queue and Pipe are the correct tools for passing data between processes - never use plain Python variables (each process has its own copy)
- Value and Array provide low-overhead shared memory for primitive types; multiprocessing.shared_memory enables zero-copy sharing of large buffers (e.g., NumPy arrays)
- Everything sent between processes must be picklable - lambdas, local functions, file handles, locks, and sockets are not picklable
- Use functools.partial or module-level functions to work around pickle constraints; use Pool(initializer=...) to set up non-picklable per-worker state at startup
- if __name__ == "__main__" is mandatory on Windows and macOS (spawn start method) - without it, every worker re-runs the process-creating code on import and startup fails
- Prefer ProcessPoolExecutor from concurrent.futures for new code - it propagates exceptions cleanly and integrates with asyncio via loop.run_in_executor()
What's Next
Lesson 03 - Asyncio and Async/Await introduces Python's third concurrency model: cooperative multitasking on a single thread. Where threads and processes shine for blocking I/O and CPU work respectively, asyncio handles massive concurrency - tens of thousands of simultaneous connections - with minimal memory overhead. You will learn coroutines, await, asyncio.gather(), async context managers, and how to build high-performance I/O applications.
