
Multiprocessing in Python

Reading time: ~30 minutes | Level: Intermediate → Engineering

Before reading further, predict what happens here:

```python
import io
import multiprocessing
import threading
import time

from PIL import Image


def resize_image(data: bytes) -> bytes:
    img = Image.open(io.BytesIO(data))
    img = img.resize((128, 128))
    return img.tobytes()


if __name__ == "__main__":  # required when creating processes (see Part 8)
    images = []
    for i in range(4):
        with open(f"photo_{i}.jpg", "rb") as f:
            images.append(f.read())

    # Version A - 4 threads
    start = time.perf_counter()
    threads = [threading.Thread(target=resize_image, args=(img,)) for img in images]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    thread_time = time.perf_counter() - start

    # Version B - 4 processes
    start = time.perf_counter()
    with multiprocessing.Pool(4) as pool:
        results = pool.map(resize_image, images)
    process_time = time.perf_counter() - start

    print(f"Threads: {thread_time:.2f}s")
    print(f"Processes: {process_time:.2f}s")
    # ?
```
Illustrative output on a 4-core machine (your numbers will vary):

Threads: 0.95s
Processes: 0.26s

Why? Resizing is CPU-bound. Pillow does the pixel work in C and releases the GIL during some of those operations, so threads can overlap part of the work. Everything that still runs as Python bytecode (Pillow's Python-level wrapper code, your orchestration, argument handling) holds the GIL and runs one thread at a time, which caps the threaded speedup.

Processes sidestep the GIL entirely. Each worker process runs its own CPython interpreter with its own GIL, so the OS can schedule all 4 on separate cores without contention. On a 4-core machine the compute portion can approach a 4x speedup (4 tasks in roughly the time of 1), minus the cost of starting the workers and pickling the image bytes to them.

The exact ratio depends on your hardware, Pillow version, image size, and process start method. The pattern holds for CPU-bound work that holds the GIL: processes scale with core count; threads do not.

This gap, threads giving little or no speedup while processes give a 3–4x speedup, is the defining signature of the GIL's impact on CPU-bound work. Knowing when to reach for multiprocessing, and how to use it correctly, separates engineers who write scalable Python from those who wonder why their "parallel" code is slow.

What You Will Learn

  • How multiprocessing.Process works - spawning, joining, exit codes, PIDs
  • The process vs thread memory model - why processes are isolated and why that matters
  • Pool.map(), Pool.starmap(), Pool.apply_async(), Pool.imap() - the full toolkit
  • Inter-process communication with Queue and Pipe - the producer/consumer pattern
  • Shared memory with Value, Array, and multiprocessing.shared_memory
  • ProcessPoolExecutor - the modern concurrent.futures approach
  • Serialization constraints - why everything crossing process boundaries must be picklable
  • Why if __name__ == "__main__" is non-negotiable on Windows and macOS

Prerequisites

  • Lesson 01 (Threading) - thread vs process mental model
  • GIL Explained (Module 03) - why the GIL prevents CPU parallelism in threads
  • Basic familiarity with concurrent.futures.ThreadPoolExecutor

Part 1 - multiprocessing.Process

Spawning a Process

multiprocessing.Process is the lowest-level API. It mirrors threading.Thread almost exactly:

```python
import multiprocessing
import os
import time


def worker(name: str, delay: float) -> None:
    """This function runs in a separate OS process."""
    print(f"[{name}] PID={os.getpid()}, parent PID={os.getppid()}")
    time.sleep(delay)
    print(f"[{name}] done")


if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker, args=("alpha", 1.0))
    p2 = multiprocessing.Process(target=worker, args=("beta", 0.5))

    p1.start()  # spawn OS process, does NOT block
    p2.start()

    print(f"Main PID={os.getpid()}")
    print(f"p1 pid={p1.pid}, alive={p1.is_alive()}")

    p2.join()  # wait for p2 to finish
    p1.join()  # wait for p1 to finish

    print(f"p1 exitcode={p1.exitcode}")  # 0 = success
    print(f"p2 exitcode={p2.exitcode}")  # 0 = success
```

Key attributes and methods:

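| Attribute / Method | What it does |
|---|---|
| p.start() | Spawn the OS process; returns immediately |
| p.join(timeout=None) | Block until the process finishes (or the timeout expires) |
| p.is_alive() | True if the process is still running |
| p.pid | OS process ID (None until start() is called) |
| p.exitcode | None while running; 0 on success; positive = exit status (1 if the target raised); -N = killed by signal N |
| p.terminate() | Send SIGTERM (POSIX) or call TerminateProcess (Windows) |
| p.kill() | Send SIGKILL on POSIX, which cannot be caught or ignored; same as terminate() on Windows |
| p.daemon | If True (set before start()), the parent terminates the process when the parent exits; daemonic processes cannot create child processes |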
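A minimal sketch of the lifecycle attributes from the table, assuming a POSIX system (Linux or macOS) for the signal-based exit code: the parent starts a child that would sleep for 10 seconds, terminates it early, and inspects exitcode before and after. The helper name terminate_early is hypothetical, not part of the multiprocessing API.

```python
import multiprocessing
import signal
import time


def sleeper(seconds: float) -> None:
    """Child target: sleeps long enough for the parent to terminate it first."""
    time.sleep(seconds)


def terminate_early(run_for: float = 10.0):
    """Hypothetical helper: start a child, terminate it, return (exitcode before, exitcode after)."""
    p = multiprocessing.Process(target=sleeper, args=(run_for,))
    before = p.exitcode   # None: the process has not been started yet
    p.start()
    time.sleep(0.2)       # give the child a moment to start
    p.terminate()         # SIGTERM on POSIX, TerminateProcess() on Windows
    p.join(timeout=5)     # always join after terminate() to reap the child
    return before, p.exitcode


if __name__ == "__main__":
    before, after = terminate_early()
    print(before)                    # None
    print(after == -signal.SIGTERM)  # True on POSIX: killed by signal 15 -> exitcode -15
```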

Process Lifecycle

```python
import multiprocessing
import time


def crasher():
    """Simulates a process that fails."""
    time.sleep(0.2)
    raise RuntimeError("something went wrong")


if __name__ == "__main__":
    p = multiprocessing.Process(target=crasher)
    p.start()
    p.join()

    print(f"exitcode: {p.exitcode}")
    # exitcode: 1 - an uncaught exception in the child gives exit code 1
    # The child prints the traceback to stderr; no exception is raised in the parent
```

:::warning Process Exceptions Are Silent in the Parent
If a child process raises an exception, the parent does not receive it automatically. The traceback prints to stderr in the child, but p.exitcode will be non-zero. When using bare Process, always check p.exitcode after join(). The Pool and ProcessPoolExecutor APIs propagate exceptions back properly.
:::
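Because the parent only ever sees an exit code, a common pattern is to wrap join() in a check that turns a non-zero exitcode into an exception in the parent. A minimal sketch; run_checked is a hypothetical helper name, and the child's own traceback still goes to stderr.

```python
import multiprocessing


def ok_task() -> None:
    pass


def failing_task() -> None:
    raise RuntimeError("something went wrong")


def run_checked(target, *args, timeout: float = 30.0) -> None:
    """Hypothetical helper: run target in a child process and raise if it failed."""
    p = multiprocessing.Process(target=target, args=args)
    p.start()
    p.join(timeout)
    if p.is_alive():  # still running after the timeout
        p.terminate()
        p.join()
        raise TimeoutError(f"{target.__name__} did not finish in {timeout}s")
    if p.exitcode != 0:
        raise RuntimeError(f"{target.__name__} exited with code {p.exitcode}")


if __name__ == "__main__":
    run_checked(ok_task)  # returns normally
    try:
        run_checked(failing_task)  # the child's traceback prints to stderr first
    except RuntimeError as exc:
        print(exc)  # failing_task exited with code 1
```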

Part 2 - Process vs Thread - The Memory Model

Why Processes Are Isolated

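Threads share the same memory space - one thread can read or write any variable owned by another. Processes have separate memory spaces: a child started with fork begins with a copy-on-write copy of the parent's memory, while a child started with spawn begins from a fresh interpreter that re-imports your module. Either way, once the child is running, changes one process makes to its variables are invisible to the other.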

Decision Table - Process vs Thread

| Factor | Thread | Process |
|---|---|---|
| Memory | Shared - easy to communicate, easy to corrupt | Isolated - safe by default, IPC required |
| Startup cost | Very low (well under 1 ms) | Much higher: milliseconds with fork, tens to hundreds of milliseconds with spawn, depending on what the module imports |
| GIL impact | Only one thread runs Python bytecode at a time | Each process has its own GIL - true parallelism |
| CPU-bound Python | No speedup (often slower) | Near-linear speedup up to core count |
| I/O-bound | Excellent - GIL released during blocking I/O | Works, but costs more memory and startup time |
| Shared state | Easy (but needs locks) | Hard (Queue, Pipe, shared_memory, Manager) |
| Crash isolation | A hard crash (e.g., a segfault in a C extension) kills every thread in the process | A crashed child does not take down the parent |
| Memory usage | Low - threads share one heap | High - each process has its own interpreter and heap |
| Best for | I/O-bound: HTTP, DB, file, sockets | CPU-bound: image processing, parsing, math |

Rule of thumb: If your bottleneck is waiting (network, disk, database) - use threads or asyncio. If your bottleneck is computing (Python loops, data transformation, parsing) - use processes.

Part 3 - Pool - Distributing Work Across Processes

multiprocessing.Pool manages a fixed pool of worker processes and distributes tasks across them. It is far more ergonomic than managing Process objects manually.

Pool.map() - The Workhorse

```python
import math
import multiprocessing


def is_prime(n: int) -> bool:
    """CPU-bound primality check - pure Python."""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True


if __name__ == "__main__":
    numbers = list(range(1_000_000, 1_100_000))

    # Sequential
    sequential_primes = [n for n in numbers if is_prime(n)]

    # Parallel - Pool.map blocks until all results are ready
    # Returns results in the SAME ORDER as input - guaranteed
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(is_prime, numbers)
    parallel_primes = [n for n, prime in zip(numbers, results) if prime]

    assert sequential_primes == parallel_primes  # same results, different order of computation
    print(f"Found {len(parallel_primes)} primes")
```

pool.map(func, iterable) - equivalent to list(map(func, iterable)) but distributed across processes. Results are in input order, regardless of which process finished first.
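To see the ordering guarantee in action, the sketch below makes early items sleep longer than later ones, so workers tend to finish them last; map() still returns results in input order. The sleep durations are arbitrary values chosen for the demonstration.

```python
import multiprocessing
import time


def slow_square(n: int) -> int:
    # Smaller n sleeps longer, so later inputs tend to finish first
    time.sleep(0.05 * (5 - n))
    return n * n


def ordered_squares() -> list[int]:
    with multiprocessing.Pool(processes=5) as pool:
        # chunksize=1 so each item is its own task and the five run concurrently
        return pool.map(slow_square, range(5), chunksize=1)


if __name__ == "__main__":
    print(ordered_squares())  # [0, 1, 4, 9, 16] - input order, not completion order
```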

Pool.starmap() - Multiple Arguments

```python
import multiprocessing


def power(base: int, exponent: int) -> int:
    return base ** exponent


if __name__ == "__main__":
    args = [(2, 10), (3, 8), (5, 6), (7, 4), (11, 3)]

    with multiprocessing.Pool() as pool:
        # starmap unpacks each tuple as separate arguments
        results = pool.starmap(power, args)
    # [1024, 6561, 15625, 2401, 1331]
    print(results)
```

Pool.apply_async() - Non-Blocking Submission

```python
import multiprocessing
import time


def slow_compute(n: int) -> int:
    time.sleep(0.5)
    return n * n


if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        # Submit all tasks immediately - don't wait (each call returns an AsyncResult)
        futures = [pool.apply_async(slow_compute, (i,)) for i in range(8)]

        # Do other work in the main process while workers compute...
        print("Tasks submitted, doing other work...")
        time.sleep(0.1)

        # Collect results - .get() blocks until the specific task is done
        results = [f.get(timeout=5.0) for f in futures]
        print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
```

apply_async also accepts callback and error_callback:

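```python
import multiprocessing
import time


def slow_compute(n: int) -> int:  # same worker as above
    time.sleep(0.5)
    return n * n


results = []
errors = []


def on_success(result):
    # Called in the MAIN process (on the pool's result-handler thread) - keep it fast
    results.append(result)


def on_error(exc):
    # Called in the MAIN process when a task raises
    errors.append(exc)


if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        for i in range(10):
            pool.apply_async(slow_compute, (i,), callback=on_success, error_callback=on_error)
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for all tasks; leaving the with-block would otherwise terminate() the pool

    print(f"Success: {len(results)}, Errors: {len(errors)}")
```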

Pool.imap() - Lazy Evaluation

```python
import multiprocessing


def process_line(line: str) -> str:
    return line.strip().upper()


if __name__ == "__main__":
    # imap returns an iterator: results come back in INPUT order, each one as soon as
    # it (and every result before it) is ready. You consume results one at a time
    # instead of materializing a full list (the pool still reads ahead on the input).
    with multiprocessing.Pool(4) as pool, open("large_file.txt") as f:
        # chunksize=100: sends 100 lines at a time to each worker (less IPC overhead)
        result_iter = pool.imap(process_line, f, chunksize=100)

        for processed in result_iter:
            print(processed)  # printed in file order
```

map vs imap - choose based on memory:

| Method | Returns | Result order | Memory |
|---|---|---|---|
| pool.map() | list - waits for all | Input order | All results in RAM |
| pool.imap() | Iterator - lazy | Input order (buffers internally) | Streaming |
| pool.imap_unordered() | Iterator - lazy | Completion order | Streaming |
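imap_unordered() is the one to reach for when you want each result as soon as it exists and do not care about order. A small sketch with illustrative inputs and sleeps: since results arrive in completion order, the worker returns its input alongside the result so the caller can tell which is which.

```python
import multiprocessing
import time


def tagged_square(n: int) -> tuple[int, int]:
    # Return the input with the result, because completion order is unpredictable
    time.sleep(0.02 * (n % 3))
    return n, n * n


def squares_as_completed(values: list[int]) -> dict[int, int]:
    results = {}
    with multiprocessing.Pool(4) as pool:
        for n, sq in pool.imap_unordered(tagged_square, values):
            results[n] = sq  # arrives in completion order
    return results


if __name__ == "__main__":
    print(squares_as_completed(list(range(10))))
```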

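:::tip Use chunksize to Reduce IPC Overhead
Every task sent to a worker costs a pickle plus an IPC (inter-process communication) round-trip, so for small, fast tasks that overhead dominates. Pool.map() picks a chunksize automatically when you don't pass one, but imap(), imap_unordered(), and ProcessPoolExecutor.map() default to chunksize=1, one item per round-trip. Pass chunksize to batch items: pool.imap(func, items, chunksize=100) sends 100 items per batch, cutting the number of IPC round-trips by about 100x.
:::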
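For reference, this is how Pool.map() chooses a chunksize when you pass none, re-implemented as a standalone function for illustration. It mirrors the logic in CPython's multiprocessing/pool.py, which is an implementation detail and could change: the iterable is split into roughly four chunks per worker, rounded up.

```python
def default_map_chunksize(n_items: int, n_workers: int) -> int:
    """Mirror of Pool.map()'s default: about 4 chunks per worker, rounded up."""
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


if __name__ == "__main__":
    print(default_map_chunksize(100_000, 4))  # 6250 items per task
    print(default_map_chunksize(1_000, 4))    # 63
```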

Part 4 - Inter-Process Communication - Queue and Pipe

Processes cannot share Python objects directly. Communication happens through explicit IPC mechanisms.

multiprocessing.Queue - Thread and Process Safe

```python
import multiprocessing
import random
import time


def producer(queue: multiprocessing.Queue, n_items: int) -> None:
    """Generates work items and puts them on the queue."""
    for i in range(n_items):
        item = {"id": i, "value": random.randint(1, 100)}
        queue.put(item)
        print(f"[producer] sent item {i}")
        time.sleep(0.05)

    # Sentinel value signals consumers to stop
    queue.put(None)
    print("[producer] done")


def consumer(queue: multiprocessing.Queue, worker_id: int) -> None:
    """Reads work items from the queue and processes them."""
    while True:
        item = queue.get()  # blocks until an item is available
        if item is None:
            queue.put(None)  # pass the sentinel to the next consumer
            print(f"[consumer-{worker_id}] done")
            break
        result = item["value"] ** 2
        print(f"[consumer-{worker_id}] processed item {item['id']}: {result}")


if __name__ == "__main__":
    q = multiprocessing.Queue(maxsize=10)  # maxsize=10: producer blocks if queue is full

    prod = multiprocessing.Process(target=producer, args=(q, 20))
    cons1 = multiprocessing.Process(target=consumer, args=(q, 1))
    cons2 = multiprocessing.Process(target=consumer, args=(q, 2))

    prod.start()
    cons1.start()
    cons2.start()

    prod.join()
    cons1.join()
    cons2.join()
```
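A common variation of this pattern sends one sentinel per consumer instead of re-queueing a single one, and returns results on a second queue. A minimal sketch; the names square_worker and run_workers are hypothetical. The parent drains the result queue before calling join(), because the multiprocessing docs warn that a process which has put items on a queue may not exit until those items are consumed.

```python
import multiprocessing

SENTINEL = None  # any picklable value the producer never sends as real work


def square_worker(tasks, results) -> None:
    # iter(callable, sentinel) keeps calling tasks.get() until it returns SENTINEL
    for item in iter(tasks.get, SENTINEL):
        results.put(item * item)


def run_workers(n_items: int = 20, n_workers: int = 3) -> list[int]:
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=square_worker, args=(tasks, results))
        for _ in range(n_workers)
    ]
    for w in workers:
        w.start()

    for i in range(n_items):
        tasks.put(i)
    for _ in workers:
        tasks.put(SENTINEL)  # exactly one stop signal per consumer

    # Drain results BEFORE join() to avoid deadlocking on unflushed queue data
    collected = [results.get() for _ in range(n_items)]
    for w in workers:
        w.join()
    return sorted(collected)  # completion order is arbitrary, so sort for display


if __name__ == "__main__":
    print(run_workers())  # squares of 0..19 in ascending order
```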

multiprocessing.Pipe - Bidirectional Channel

```python
import multiprocessing


def child_process(conn):
    """Receives a number, returns its square."""
    while True:
        data = conn.recv()  # blocks until parent sends
        if data is None:
            conn.close()
            break
        conn.send(data ** 2)  # send result back to parent


if __name__ == "__main__":
    # Pipe() returns two Connection objects - one for each end
    parent_conn, child_conn = multiprocessing.Pipe()

    p = multiprocessing.Process(target=child_process, args=(child_conn,))
    p.start()
    child_conn.close()  # parent doesn't use the child end

    for n in [3, 7, 12, 25]:
        parent_conn.send(n)
        result = parent_conn.recv()
        print(f"{n}^2 = {result}")

    parent_conn.send(None)  # shutdown signal
    parent_conn.close()
    p.join()
```

Queue vs Pipe - when to use each:

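| | Queue | Pipe |
|---|---|---|
| Direction | Many producers → many consumers | Point-to-point (2 ends, duplex by default) |
| Processes | Any number can use it | Two endpoints, typically one process at each end |
| Thread-safe | Yes | No - two processes or threads using the same end at once can corrupt data |
| Overhead | Higher (a pipe plus locks and a feeder thread internally) | Lower |
| Best for | Worker pools, task distribution | Single child ↔ parent communication |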

Part 5 - Shared Memory

Value and Array - Low-Level Shared Memory

```python
import ctypes
import multiprocessing


def increment_counter(counter, n: int) -> None:
    """Increment a shared counter n times."""
    for _ in range(n):
        with counter.get_lock():  # Value has a built-in lock
            counter.value += 1


def fill_array(shared_array, start: int) -> None:
    """Fill a shared array with computed values."""
    for i in range(len(shared_array)):
        shared_array[i] = start + i


if __name__ == "__main__":
    # Value: single shared value
    # ctypes.c_int: underlying C type (c_int, c_double, c_long, etc.)
    counter = multiprocessing.Value(ctypes.c_int, 0)

    p1 = multiprocessing.Process(target=increment_counter, args=(counter, 100_000))
    p2 = multiprocessing.Process(target=increment_counter, args=(counter, 100_000))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print(f"counter = {counter.value}")  # 200000

    # Array: fixed-size shared array
    shared_arr = multiprocessing.Array(ctypes.c_double, 10)  # 10 doubles, init to 0.0
    p3 = multiprocessing.Process(target=fill_array, args=(shared_arr, 100))
    p3.start()
    p3.join()
    print(list(shared_arr))  # [100.0, 101.0, ..., 109.0]
```

:::danger Shared Mutable State Without Synchronization Loses Updates
A Value created with the default lock=True guards each individual read or write of .value, but counter.value += 1 is a read followed by a separate write. Two processes can both read the same old value and both write back old + 1, so increments are lost and the final count is nondeterministic (usually too low). Hold counter.get_lock() across the whole read-modify-write, or use an external multiprocessing.Lock, whenever multiple processes modify a Value or Array.

```python
# WRONG - race condition, counter will usually end up too low
def unsafe_increment(counter, n):
    for _ in range(n):
        counter.value += 1  # read-modify-write without holding the lock

# CORRECT
def safe_increment(counter, n):
    for _ in range(n):
        with counter.get_lock():
            counter.value += 1
```

:::
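To observe the lost updates yourself, the sketch below runs both functions from the block above side by side (redefined so the snippet is self-contained; count_with and N are hypothetical names). The unsafe total varies from run to run and is typically below the expected value, though on a lightly loaded machine it can occasionally come out exact; the locked total is always exact.

```python
import ctypes
import multiprocessing

N = 50_000  # increments per process (arbitrary)


def unsafe_increment(counter, n):
    for _ in range(n):
        counter.value += 1  # separately locked read and write: updates can be lost


def safe_increment(counter, n):
    for _ in range(n):
        with counter.get_lock():  # lock held across the whole read-modify-write
            counter.value += 1


def count_with(worker, n_procs: int = 2) -> int:
    counter = multiprocessing.Value(ctypes.c_int, 0)
    procs = [multiprocessing.Process(target=worker, args=(counter, N)) for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(f"unsafe: {count_with(unsafe_increment)} (expected {2 * N})")
    print(f"safe:   {count_with(safe_increment)} (expected {2 * N})")
```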

multiprocessing.shared_memory - Modern High-Performance Shared Memory

Python 3.8 introduced shared_memory for sharing large buffers (e.g., NumPy arrays) between processes with zero-copy:

```python
import multiprocessing
import multiprocessing.shared_memory as shm

import numpy as np


def worker_process(shm_name: str, shape: tuple, dtype: str) -> None:
    """Attach to existing shared memory and modify the array in-place."""
    # Attach to the existing shared block - no copying
    existing_shm = shm.SharedMemory(name=shm_name)
    arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)

    # Modify the array in-place - visible to the parent immediately
    arr *= 2
    arr += 1

    del arr  # drop the view first: close() can raise BufferError while views of .buf exist
    existing_shm.close()  # detach - do NOT unlink here, parent owns it


if __name__ == "__main__":
    # Create a NumPy array in shared memory
    original = np.arange(100, dtype=np.float64)
    shared_block = shm.SharedMemory(create=True, size=original.nbytes)
    shared_arr = np.ndarray(original.shape, dtype=original.dtype, buffer=shared_block.buf)
    shared_arr[:] = original  # copy data into shared memory

    print(f"Before: {shared_arr[:5]}")  # [0. 1. 2. 3. 4.]

    p = multiprocessing.Process(
        target=worker_process,
        args=(shared_block.name, original.shape, str(original.dtype)),
    )
    p.start()
    p.join()

    print(f"After: {shared_arr[:5]}")  # [1. 3. 5. 7. 9.] - modified by child

    del shared_arr  # release the NumPy view before closing the block
    shared_block.close()
    shared_block.unlink()  # ALWAYS unlink (once, in the owner) - releases the shared memory block
```
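NumPy is not required: SharedMemory.buf is a plain memoryview. A minimal sketch using only the standard library, where the child writes bytes that the parent reads back; the block size and message are arbitrary, and try/finally makes sure the owner unlinks the block even if something fails along the way.

```python
import multiprocessing
from multiprocessing import shared_memory

MESSAGE = b"hello from the child"


def writer(shm_name: str) -> None:
    block = shared_memory.SharedMemory(name=shm_name)  # attach by name, no copy
    try:
        block.buf[: len(MESSAGE)] = MESSAGE  # write straight into shared memory
    finally:
        block.close()  # detach; only the creator unlinks


def round_trip() -> bytes:
    block = shared_memory.SharedMemory(create=True, size=64)
    try:
        p = multiprocessing.Process(target=writer, args=(block.name,))
        p.start()
        p.join()
        return bytes(block.buf[: len(MESSAGE)])  # copy the data out before closing
    finally:
        block.close()
        block.unlink()  # release the block exactly once, in the owner


if __name__ == "__main__":
    print(round_trip())  # b'hello from the child'
```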

Value/Array vs shared_memory:

| | Value / Array | shared_memory |
|---|---|---|
| Size | Small (single value / fixed array) | Arbitrary - gigabytes of data |
| Built-in lock | Yes (Value.get_lock()) | No - you manage synchronization |
| NumPy support | Limited | Excellent - wrap with np.ndarray |
| Python version | All versions | 3.8+ |
| Zero-copy | Yes (both use shared memory) | Yes |

Part 6 - ProcessPoolExecutor - The Modern Way

concurrent.futures.ProcessPoolExecutor is the recommended API for new code. It offers a cleaner interface and Future-based exception propagation, and it integrates with asyncio.

```python
import time
from concurrent.futures import ProcessPoolExecutor, as_completed


def factorize(n: int) -> list[int]:
    """Return all prime factors of n. CPU-bound pure Python."""
    factors = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            factors.append(d)
            n //= d
        d += 1
    if n > 1:
        factors.append(n)
    return factors


if __name__ == "__main__":
    numbers = [2**31 - 1, 2**29 - 1, 2**27 - 1, 2**25 - 1,
               999_999_937, 999_961_003, 999_961_007, 987_654_321]

    # map(): ordered results, blocks until all done
    print("=== executor.map() ===")
    with ProcessPoolExecutor(max_workers=4) as executor:
        start = time.perf_counter()
        results = list(executor.map(factorize, numbers))
        elapsed = time.perf_counter() - start

    for n, factors in zip(numbers, results):
        print(f"{n:>15,} → {factors}")
    print(f"Elapsed: {elapsed:.3f}s")

    print("\n=== submit() + as_completed() ===")
    # submit(): process results as they finish (completion order)
    with ProcessPoolExecutor(max_workers=4) as executor:
        future_to_n = {executor.submit(factorize, n): n for n in numbers}

        for future in as_completed(future_to_n):
            n = future_to_n[future]
            try:
                factors = future.result()
                print(f"{n:>15,} → {factors}")
            except Exception as exc:
                print(f"{n} raised: {exc}")
```

Exception Propagation - ProcessPoolExecutor vs Pool

```python
from concurrent.futures import ProcessPoolExecutor


def may_fail(n: int) -> int:
    if n == 3:
        raise ValueError(f"cannot process {n}")
    return n * n


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(may_fail, i) for i in range(5)]

        for i, future in enumerate(futures):
            try:
                print(f"result[{i}] = {future.result()}")
            except ValueError as e:
                print(f"result[{i}] FAILED: {e}")
    # result[0] = 0
    # result[1] = 1
    # result[2] = 4
    # result[3] FAILED: cannot process 3   ← exception re-raised in parent
    # result[4] = 16
```

:::tip ProcessPoolExecutor Over multiprocessing.Pool for New Code
ProcessPoolExecutor propagates exceptions cleanly to the parent, integrates with asyncio via loop.run_in_executor(), and has a consistent Future-based API shared with ThreadPoolExecutor. Reserve multiprocessing.Pool for cases where you need imap(), imap_unordered(), or apply_async() with callbacks.
:::
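A minimal sketch of the asyncio integration mentioned in the tip: loop.run_in_executor() hands a CPU-bound call to a ProcessPoolExecutor and returns an awaitable, so the event loop stays free while worker processes compute. cpu_heavy and the input sizes are illustrative.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def cpu_heavy(n: int) -> int:
    """Pure-Python CPU work: sum of squares below n."""
    return sum(i * i for i in range(n))


async def compute_all(sizes: list[int]) -> list[int]:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Each call returns an awaitable backed by a process-pool future
        tasks = [loop.run_in_executor(executor, cpu_heavy, n) for n in sizes]
        return await asyncio.gather(*tasks)  # results in the same order as sizes


if __name__ == "__main__":
    print(asyncio.run(compute_all([10, 100, 1_000])))  # [285, 328350, 332833500]
```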

Part 7 - Serialization - The Pickle Constraint

Everything passed between processes - function arguments, return values, task functions themselves - must be picklable. Python's pickle module serializes the data, sends it through a Pipe, and deserializes it in the target process.

What Is and Is Not Picklable

```python
import math
import pickle
import threading


# Picklable - safe to pass between processes
pickle.dumps(42)         # int
pickle.dumps("hello")    # str
pickle.dumps([1, 2, 3])  # list
pickle.dumps({"a": 1})   # dict
pickle.dumps((1, 2))     # tuple

pickle.dumps(math.sqrt)  # built-in functions - OK (pickled by reference)
pickle.dumps(math.pi)    # module-level constants - OK


# NOT picklable - pickle.dumps raises PicklingError, AttributeError, or TypeError
# depending on the object (and, for some objects, the Python version)

# Lambdas - defined inline, no importable name
fn = lambda x: x * 2
# pickle.dumps(fn)  # PicklingError: Can't pickle <function <lambda>>

# Local functions - defined inside another function
def make_multiplier(factor):
    def multiply(x):  # this function cannot be pickled
        return x * factor
    return multiply

fn2 = make_multiplier(3)
# pickle.dumps(fn2)  # AttributeError: Can't pickle local object

# File handles, sockets, database connections, locks
lock = threading.Lock()
# pickle.dumps(lock)  # TypeError: cannot pickle '_thread.lock' object
```

:::danger Lambdas and Local Functions Cannot Cross Process Boundaries
The most common multiprocessing mistake is passing a lambda or locally-defined function to Pool.map() or ProcessPoolExecutor.submit(). Pickle serializes functions by name - it stores the module and qualified name, then reimports on the other side. Lambdas have no importable name. Define worker functions at module level.

```python
from concurrent.futures import ProcessPoolExecutor


def double(x):  # module-level: picklable by name
    return x * 2


if __name__ == "__main__":
    # WRONG - fails at runtime: the lambda cannot be pickled
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(lambda x: x * 2, range(10)))

    # CORRECT - module-level function
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(double, range(10)))
```

:::
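Before handing an object to a pool, you can check whether it will survive the trip. A minimal sketch; is_picklable is a hypothetical helper, and it catches all three exception types shown above because which one you get depends on the object and the Python version.

```python
import pickle
import threading


def is_picklable(obj) -> bool:
    """Hypothetical helper: True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def module_level(x):
    return x * 2


def make_local():
    def local(x):
        return x * 2
    return local


if __name__ == "__main__":
    print(is_picklable(module_level))      # True  - importable by name
    print(is_picklable(lambda x: x * 2))   # False - no importable name
    print(is_picklable(make_local()))      # False - local object
    print(is_picklable(threading.Lock()))  # False - wraps an OS-level lock
```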

Working Around Pickle Limitations

```python
# Strategy 1: functools.partial for parameterized functions
import multiprocessing
from functools import partial


def multiply_by(factor: int, x: int) -> int:
    return x * factor


if __name__ == "__main__":
    # partial objects ARE picklable, as long as the wrapped function
    # and its bound arguments are picklable too
    triple = partial(multiply_by, 3)
    with multiprocessing.Pool(4) as pool:
        results = pool.map(triple, range(10))
    print(results)  # [0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
```

```python
# Strategy 2: initializer - create non-picklable state inside each pool worker at startup
import multiprocessing
import sqlite3

db_conn = None  # set once per worker process by init_worker


def init_worker(db_path: str) -> None:
    global db_conn
    db_conn = sqlite3.connect(db_path)  # each worker gets its own connection


def query_db(user_id: int) -> dict:
    cursor = db_conn.execute("SELECT * FROM users WHERE id = ?", (user_id,))
    row = cursor.fetchone()
    return {"id": row[0], "name": row[1]} if row else {}


if __name__ == "__main__":
    # Assumes users.db already exists with a users(id, name, ...) table
    with multiprocessing.Pool(
        processes=4,
        initializer=init_worker,
        initargs=("users.db",),
    ) as pool:
        users = pool.map(query_db, range(1, 101))
```

Part 8 - if __name__ == "__main__" - Why It Is Required

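On Windows and macOS, the default process start method is spawn: Python starts a fresh interpreter, which imports your main script (as a module named __mp_main__, so __name__ is not "__main__") to find the worker function. If your script creates a Pool or Process at the top level, not guarded by if __name__ == "__main__", that import runs the Pool-creating code again inside the child. multiprocessing refuses to start a new process from a child that is still bootstrapping, so the child fails with a RuntimeError, and a Pool that keeps replacing its dead workers can repeat that error over and over instead of finishing.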

```python
# WRONG - fails on spawn platforms (Windows/macOS): the workers raise RuntimeError
from multiprocessing import Pool

def work(n):
    return n * n

with Pool(4) as pool:  # this line also runs when each worker imports the module
    results = pool.map(work, range(10))  # → children fail while bootstrapping


# CORRECT - the Pool is only created in the original script invocation
from multiprocessing import Pool

def work(n):
    return n * n

if __name__ == "__main__":  # False when a worker imports the module as __mp_main__
    with Pool(4) as pool:
        results = pool.map(work, range(10))
    print(results)
```

Start methods by platform:

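| Method | Platforms | How it works | Startup speed |
|---|---|---|---|
| fork | POSIX only (Linux default through Python 3.13) | Copy the parent process via fork() | Fast (milliseconds) |
| spawn | All platforms; default on Windows and on macOS (3.8+) | Start a fresh interpreter, import the main module | Slow (tens to hundreds of ms) |
| forkserver | POSIX only (Linux default from Python 3.14) | A dedicated server process forks new workers on request | Medium |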
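
```python
# On Linux, you can request fork explicitly for faster startup
import multiprocessing


def work(n):
    return n * n


if __name__ == "__main__":
    # Call once, before creating any Pool/Process; a second call raises RuntimeError.
    # fork is unavailable on Windows and considered unsafe on macOS.
    multiprocessing.set_start_method("fork")
    with multiprocessing.Pool(4) as pool:
        results = pool.map(work, range(10))
```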
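set_start_method() changes the start method for the whole program and can only be called once. When you want a specific method for just one pool (for example, spawn on Linux to reproduce what Windows and macOS users will see), multiprocessing.get_context() returns a context object with the same Pool/Process/Queue API. A minimal sketch; squares_with is a hypothetical name.

```python
import multiprocessing


def work(n: int) -> int:
    return n * n


def squares_with(method: str) -> list[int]:
    ctx = multiprocessing.get_context(method)  # does not change the global default
    with ctx.Pool(2) as pool:
        return pool.map(work, range(5))


if __name__ == "__main__":
    print(multiprocessing.get_all_start_methods())  # the methods this platform supports
    print(squares_with("spawn"))  # [0, 1, 4, 9, 16] - spawn is available everywhere
```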

Full Example - Parallel Image Thumbnail Generator

A production-quality parallel thumbnail generator demonstrating ProcessPoolExecutor, proper error handling, progress tracking, and pickling-safe design:

```python
"""
parallel_thumbnailer.py - Generate thumbnails from a directory of images.

Usage:
    python parallel_thumbnailer.py ./photos ./thumbs --size 256 --workers 8
"""
from __future__ import annotations

import argparse
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path

try:
    from PIL import Image
except ImportError:
    raise SystemExit("Install Pillow: pip install Pillow")


# --- Data classes (picklable - no lambdas, no file handles) ---

@dataclass
class ThumbnailTask:
    source_path: str
    dest_path: str
    size: int


@dataclass
class ThumbnailResult:
    source_path: str
    dest_path: str
    success: bool
    error: str | None
    elapsed_ms: float


# --- Worker function at MODULE LEVEL - required for pickling ---

def generate_thumbnail(task: ThumbnailTask) -> ThumbnailResult:
    """
    Shrink one image to fit within a size x size box (aspect ratio preserved).
    Runs in a worker process - must be picklable (module-level function).
    """
    start = time.perf_counter()
    try:
        with Image.open(task.source_path) as img:
            # LANCZOS is high-quality downsampling
            img.thumbnail((task.size, task.size), Image.LANCZOS)
            # JPEG cannot store alpha or palette images (e.g., RGBA PNGs) - convert first
            if img.mode not in ("RGB", "L"):
                img = img.convert("RGB")
            os.makedirs(os.path.dirname(task.dest_path), exist_ok=True)
            img.save(task.dest_path, optimize=True, quality=85)

        elapsed_ms = (time.perf_counter() - start) * 1000
        return ThumbnailResult(
            source_path=task.source_path,
            dest_path=task.dest_path,
            success=True,
            error=None,
            elapsed_ms=elapsed_ms,
        )
    except Exception as exc:
        elapsed_ms = (time.perf_counter() - start) * 1000
        return ThumbnailResult(
            source_path=task.source_path,
            dest_path=task.dest_path,
            success=False,
            error=str(exc),
            elapsed_ms=elapsed_ms,
        )


def build_tasks(source_dir: Path, dest_dir: Path, size: int) -> list[ThumbnailTask]:
    """Scan source directory and build a list of ThumbnailTask objects."""
    extensions = {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".webp"}
    tasks = []
    for src in source_dir.rglob("*"):
        if src.suffix.lower() not in extensions:
            continue
        rel = src.relative_to(source_dir)
        dest = dest_dir / rel.with_suffix(".jpg")
        tasks.append(ThumbnailTask(
            source_path=str(src),
            dest_path=str(dest),
            size=size,
        ))
    return tasks


def run_parallel(tasks: list[ThumbnailTask], max_workers: int) -> None:
    total = len(tasks)
    completed = 0
    failed = 0
    wall_start = time.perf_counter()

    print(f"Processing {total} images with {max_workers} workers...")

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        future_map = {executor.submit(generate_thumbnail, t): t for t in tasks}

        for future in as_completed(future_map):
            result: ThumbnailResult = future.result()
            completed += 1

            if result.success:
                status = f"OK ({result.elapsed_ms:.0f}ms)"
            else:
                failed += 1
                status = f"ERR {result.error}"

            # Progress indicator
            pct = (completed / total) * 100
            print(f"[{pct:5.1f}%] {os.path.basename(result.source_path):30s} {status}")

    wall_elapsed = time.perf_counter() - wall_start
    print(f"\nDone: {completed - failed}/{total} succeeded, "
          f"{failed} failed, {wall_elapsed:.2f}s total")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Parallel thumbnail generator")
    parser.add_argument("source", type=Path, help="Source image directory")
    parser.add_argument("dest", type=Path, help="Destination thumbnail directory")
    parser.add_argument("--size", type=int, default=256, help="Max thumbnail dimension (px)")
    parser.add_argument("--workers", type=int, default=os.cpu_count(),
                        help="Number of worker processes")
    args = parser.parse_args()

    tasks = build_tasks(args.source, args.dest, args.size)
    if not tasks:
        print(f"No images found in {args.source}")
    else:
        run_parallel(tasks, args.workers)
```

Design decisions in this example:

  • ThumbnailTask and ThumbnailResult are dataclass objects - picklable, no hidden state
  • generate_thumbnail is at module level - picklable by name
  • Errors are returned as ThumbnailResult(success=False) rather than raised, so one bad image is reported and counted instead of raising out of the result loop and aborting the run
  • as_completed() reports progress in real time rather than waiting for all tasks
  • if __name__ == "__main__" guards all executor-creating code

Graded Practice Challenges

Beginner - Predict the Output

```python
import multiprocessing

counter = 0  # module-level in the parent

def increment():
    global counter
    for _ in range(1000):
        counter += 1
    print(f"child counter = {counter}")

if __name__ == "__main__":
    p = multiprocessing.Process(target=increment)
    p.start()
    p.join()
    print(f"parent counter = {counter}")
```

What do the two print statements output, and why?


Output:

child counter = 1000
parent counter = 0

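The child process gets its own copy of the module's globals: under fork it inherits a copy of the parent's memory, and under spawn it re-imports the module, which runs counter = 0 again. Either way, the child increments its own copy to 1000. The parent's counter is untouched - it remains 0.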

This is the fundamental process isolation property. Changes in child processes are never visible to the parent (unless you use Queue, Pipe, Value, Array, or shared_memory).

Intermediate - Fix the Bugs

This code has three bugs. Identify and fix each one:

```python
from multiprocessing import Pool
import time

# Bug 1: worker is a lambda
square = lambda x: x * x

# Bug 2: missing __name__ guard
with Pool(4) as pool:
    results = pool.map(square, range(10))

# Bug 3: shared state modified from workers without synchronization
shared_results = []

def collect(n):
    time.sleep(0.01)
    shared_results.append(n * n)  # wrong approach for multiprocessing

with Pool(4) as pool:
    pool.map(collect, range(20))

print(len(shared_results))  # will print 0, not 20
```

Bug 1 - Lambda is not picklable. Replace with a module-level function:

```python
def square(x):
    return x * x
```

Bug 2 - Missing __name__ == "__main__" guard. On Windows/macOS (spawn), each worker process re-imports the module and tries to run Pool(4) again at import time; multiprocessing aborts that with a RuntimeError instead of running the tasks:

```python
if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(square, range(10))
    print(results)
```

Bug 3 - Shared list not shared between processes. Each worker gets a copy of shared_results. Appends in the child are invisible to the parent. Use pool.map() return values instead:

```python
def compute(n):
    time.sleep(0.01)
    return n * n

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(compute, range(20))
    print(len(results))  # 20
```

Advanced - Design Challenge

Design a ParallelPipeline class that:

  1. Accepts a list of processing stages, each a Callable[[T], T]
  2. Has a run(items) method that applies each stage to all items using ProcessPoolExecutor
  3. After each stage completes, passes results as input to the next stage
  4. Tracks per-stage timing
  5. Returns the final results and a timing report

Example usage:

```python
def normalize(x): return x / 255.0
def augment(x): return x * 1.1
def threshold(x): return 1.0 if x > 0.5 else 0.0

pipeline = ParallelPipeline(stages=[normalize, augment, threshold], workers=4)
results, timing = pipeline.run(list(range(256)))
# timing maps stage labels to seconds, e.g. {"stage_0": 0.12, "stage_1": 0.09, "stage_2": 0.08}
```
Reference solution:
```python
from __future__ import annotations

import time
from concurrent.futures import ProcessPoolExecutor
from typing import Any, Callable


class ParallelPipeline:
    """
    Applies a sequence of transformation stages to a list of items,
    running each stage in parallel via ProcessPoolExecutor.

    All stage functions must be picklable (module-level definitions).
    """

    def __init__(
        self,
        stages: list[Callable[[Any], Any]],
        workers: int | None = None,
    ) -> None:
        self.stages = stages
        self.workers = workers

    def run(self, items: list[Any]) -> tuple[list[Any], dict[str, float]]:
        """
        Run all pipeline stages on items.

        Returns:
            (final_results, timing_report)
            timing_report maps each stage label to elapsed seconds
        """
        timing: dict[str, float] = {}
        current_items = list(items)

        for stage_idx, stage_fn in enumerate(self.stages):
            # functools.partial objects have no __name__, so fall back to repr()
            name = getattr(stage_fn, "__name__", repr(stage_fn))
            stage_key = f"stage_{stage_idx} ({name})"
            stage_start = time.perf_counter()

            with ProcessPoolExecutor(max_workers=self.workers) as executor:
                current_items = list(executor.map(stage_fn, current_items))

            timing[stage_key] = time.perf_counter() - stage_start
            print(f"  {stage_key}: {timing[stage_key]:.3f}s → {len(current_items)} items")

        return current_items, timing


# Module-level functions - picklable
def normalize(x: float) -> float:
    return x / 255.0

def augment(x: float) -> float:
    return min(x * 1.1, 1.0)

def threshold(x: float) -> float:
    return 1.0 if x > 0.5 else 0.0


if __name__ == "__main__":
    pipeline = ParallelPipeline(stages=[normalize, augment, threshold], workers=4)
    results, timing = pipeline.run(list(range(256)))

    print(f"\nResults sample: {results[:8]}")
    print("\nTiming report:")
    for stage, elapsed in timing.items():
        print(f"  {stage}: {elapsed:.3f}s")
    print(f"  Total: {sum(timing.values()):.3f}s")
```

Key design decisions:

  • Each stage gets its own fresh ProcessPoolExecutor context manager - simple, and allows different worker counts per stage if extended, at the cost of paying worker startup once per stage
  • current_items is reassigned after each stage - simple and correct; avoids shared mutable state
  • Stage functions must be module-level (not nested) - explicitly stated in the docstring
  • timing keys include the function name for readability in reports

Key Takeaways

  • multiprocessing.Process spawns a true OS process with an isolated memory space and its own GIL - changes in one process are invisible to others unless you use IPC
  • The GIL has no effect across processes - each process runs Python bytecodes independently on its own CPU core, enabling true CPU parallelism
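  • Process startup is expensive (milliseconds with fork, tens to hundreds of milliseconds with spawn) and every task pays pickling and IPC overhead - don't parallelize tasks that complete in milliseconds unless you batch them (e.g., with chunksize); otherwise the overhead dominates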
  • Pool.map() distributes work and returns ordered results; Pool.imap() is lazy and memory-efficient for large datasets; use chunksize to reduce IPC overhead for small tasks
  • Queue and Pipe are the correct tools for passing data between processes - never use plain Python variables (each process has its own copy)
  • Value and Array provide low-overhead shared memory for primitive types; multiprocessing.shared_memory enables zero-copy sharing of large buffers (e.g., NumPy arrays)
  • Everything sent between processes must be picklable - lambdas, local functions, file handles, locks, and sockets are not picklable
  • Use functools.partial or module-level functions to work around pickle constraints; use Pool(initializer=...) to set up non-picklable per-worker state at startup
  • if __name__ == "__main__" is mandatory on Windows and macOS (spawn start method) - without it, each worker's import re-runs your process-creating code and fails with RuntimeError
  • Prefer ProcessPoolExecutor from concurrent.futures for new code - it propagates exceptions cleanly and integrates with asyncio via loop.run_in_executor()

What's Next

Lesson 03 - Asyncio and Async/Await introduces Python's third concurrency model: cooperative multitasking on a single thread. Where threads and processes shine for blocking I/O and CPU work respectively, asyncio handles massive concurrency - tens of thousands of simultaneous connections - with minimal memory overhead. You will learn coroutines, await, asyncio.gather(), async context managers, and how to build high-performance I/O applications.

© 2026 EngineersOfAI. All rights reserved.