Skip to main content

Threading in Python

Reading time: ~40 minutes | Level: Intermediate → Engineering

Before reading further, predict the output of this benchmark:

import threading
import time

def cpu_crunch():
"""Pure CPU work - no I/O, no sleep."""
x = 0
for _ in range(25_000_000):
x += 1
return x

# Sequential: run the function twice, back to back
start = time.perf_counter()
cpu_crunch()
cpu_crunch()
sequential_time = time.perf_counter() - start

# Parallel: run the function in 2 threads simultaneously
start = time.perf_counter()
t1 = threading.Thread(target=cpu_crunch)
t2 = threading.Thread(target=cpu_crunch)
t1.start(); t2.start()
t1.join(); t2.join()
threaded_time = time.perf_counter() - start

print(f"Sequential : {sequential_time:.3f}s")
print(f"2 Threads : {threaded_time:.3f}s")
print(f"Speedup : {sequential_time / threaded_time:.2f}x")

A developer adds threading expecting a 2x speedup. They have 4 physical cores available. What does the output actually show?

Show Answer

Typical output:

Sequential : 1.421s
2 Threads : 1.687s
Speedup : 0.84x

The threaded version is slower, not faster. The speedup is less than 1x.

This is the GIL in action. Because cpu_crunch() is pure Python computation, the GIL prevents both threads from running Python bytecode at the same time. They do not run in parallel - they take turns, each waiting for the other to release the GIL. The overhead of thread creation, context switching, and GIL contention makes the threaded version measurably slower than sequential execution.

The developer made the single most common threading mistake in Python: adding threads to CPU-bound code. Threading does not speed up CPU work in CPython. It never has. The correct tool for CPU-bound parallelism is multiprocessing - each process has its own GIL.

Threading does provide real, measurable speedup for I/O-bound work - network requests, file I/O, database calls. When a thread is waiting for the operating system to return data, it releases the GIL, and another thread can run. This is the only scenario where threading in Python improves throughput.

Every part of this lesson is designed to make that distinction viscerally clear - not just understood intellectually, but felt through code you can run and measure.

What You Will Learn

  • threading.Thread - target, args, kwargs, start(), join(), is_alive()
  • The thread lifecycle - New, Runnable, Running, Blocked, Terminated
  • Daemon threads - what they are, when they help, why they can lose data
  • The GIL - what it actually locks, why CPU-bound threads don't benefit, why I/O-bound threads do
  • Sharing state between threads - the race condition hazard with full diagnosis
  • threading.local() - per-thread storage for state that must not be shared
  • When threading is the right tool and when it is the wrong one
  • Common threading mistakes and how to recognize them
  • A full concurrent file downloader using ThreadPoolExecutor

Prerequisites

  • Module 08 Overview - read the GIL section and the concurrency decision table
  • Comfortable with Python functions, closures, and with statements
  • Basic understanding of what a process is (Module 03 - Python Internals recommended)

Part 1 - threading.Thread

The threading.Thread class is Python's basic unit of concurrent execution. Each Thread object represents an operating-system thread managed by the Python runtime.

Creating and Starting a Thread

import threading
import time

def worker(name: str, delay: float) -> None:
"""A simple task that simulates work with a sleep."""
print(f"[{name}] Starting")
time.sleep(delay)
print(f"[{name}] Done after {delay}s")

# Create a Thread - does NOT start it
t = threading.Thread(
target=worker, # the callable to run in the new thread
args=("Thread-A",), # positional arguments to target - note the trailing comma
kwargs={"delay": 1.5}, # keyword arguments to target
name="my-worker", # optional human-readable name (appears in tracebacks)
)

print(f"Thread alive before start: {t.is_alive()}") # False
t.start() # create the OS thread and begin execution
print(f"Thread alive after start: {t.is_alive()}") # True (probably)
t.join() # block until the thread finishes
print(f"Thread alive after join: {t.is_alive()}") # False
Thread alive before start: False
[Thread-A] Starting
Thread alive after start: True
[Thread-A] Done after 1.5s
Thread alive after join: False

Key Methods

Method / AttributeWhat it does
t.start()Start the thread - can only be called once
t.join(timeout=None)Block the calling thread until t finishes (or timeout expires)
t.is_alive()Returns True if the thread has been started and has not yet finished
t.nameHuman-readable name, used in logging and tracebacks
t.identOS-assigned thread identifier (integer), available after start()
t.daemonWhether this thread is a daemon thread (see Part 3)
threading.current_thread()Returns the Thread object for the thread calling this
threading.main_thread()Returns the Thread object for the main thread
threading.enumerate()Returns a list of all currently alive Thread objects
threading.active_count()Returns the number of currently alive threads

Running Multiple Threads

import threading
import time

def download_file(url: str, idx: int) -> None:
"""Simulate downloading a file from a URL."""
print(f"[{idx}] Downloading {url}")
time.sleep(1.0) # simulate network latency
print(f"[{idx}] Finished {url}")

urls = [
"https://example.com/file1.csv",
"https://example.com/file2.csv",
"https://example.com/file3.csv",
"https://example.com/file4.csv",
]

start = time.perf_counter()

# Create all threads
threads = [
threading.Thread(target=download_file, args=(url, i))
for i, url in enumerate(urls)
]

# Start all threads
for t in threads:
t.start()

# Wait for all threads to complete
for t in threads:
t.join()

elapsed = time.perf_counter() - start
print(f"\nAll downloads complete in {elapsed:.2f}s")
# Sequential would take 4.0s; concurrent takes ~1.0s

:::tip Always Call join() on Every Thread You Start A thread that you start but never join() is a thread you have lost control of. If it raises an exception, you will not see it. If it is still running when your program tries to exit, behavior depends on whether it is a daemon thread. Make it a habit to collect all threads and join them before your program proceeds past the concurrent section. :::

Part 2 - Thread Lifecycle

Every thread passes through a defined set of states from creation to termination.

State Descriptions

New - The Thread object has been created with Thread(target=...) but start() has not been called. No OS thread exists yet. is_alive() returns False.

Runnable - start() has been called. The OS thread exists and is ready to run. It may or may not be executing at any given moment - the OS scheduler decides which runnable thread gets CPU time. is_alive() returns True.

Running - The thread is actively executing Python bytecode on a CPU core. In CPython, only one thread can be in this state at a time (the GIL enforces this). is_alive() returns True.

Blocked - The thread is waiting for something external: an I/O operation to complete, a lock to be released, a queue.get() to return data, or a time.sleep() timer to expire. While blocked, the thread releases the GIL, allowing other threads to run. is_alive() returns True.

Terminated - The target function has returned (or raised an uncaught exception). The OS thread is gone. is_alive() returns False. join() on a terminated thread returns immediately.

The GIL and State Transitions

The GIL interacts with the Runnable ↔ Running ↔ Blocked cycle:

  • A thread in the Running state holds the GIL
  • When a thread enters Blocked (waiting on I/O, a lock, or sleep), it releases the GIL - immediately allowing another thread to transition from Runnable to Running
  • When the blocking operation completes, the thread re-acquires the GIL and transitions back to Runnable (not directly to Running - it must compete for the GIL again)

This is why I/O-bound threads benefit from concurrency: every thread that blocks on I/O frees the GIL for another thread's computation. Multiple threads can be in I/O simultaneously - the OS handles the I/O, and Python threads overlap.

Part 3 - Daemon Threads

A daemon thread is a thread that does not prevent the Python interpreter from exiting. When all non-daemon threads have finished, the interpreter shuts down - and any daemon threads still running are killed immediately, without cleanup.

import threading
import time

def background_monitor():
"""A long-running background task."""
while True:
print("[monitor] checking system health...")
time.sleep(2.0)

def foreground_work():
"""The main application work."""
print("[main] Starting work")
time.sleep(3.0)
print("[main] Work complete")

# daemon=True means this thread dies with the main thread
monitor = threading.Thread(
target=background_monitor,
daemon=True, # ← key argument
name="health-monitor",
)

worker = threading.Thread(target=foreground_work)

monitor.start()
worker.start()

worker.join() # wait for foreground work to finish
# The interpreter exits here - monitor is killed immediately
print("[main] All done, exiting")
[monitor] checking system health...
[main] Starting work
[monitor] checking system health...
[main] Work complete
[main] All done, exiting

The monitor thread was still running its infinite loop when the program exited - it was killed because it was a daemon thread.

When to Use Daemon Threads

Use caseDaemon?Reason
Background metrics collectorYesIt is acceptable to lose an in-flight metric on exit
Background cache warmerYesCache will be rebuilt on next startup
Log file rotatorNoLosing the final log lines is dangerous
Database connection closerNoOpen connections should be closed gracefully
Task queue consumerNoIn-progress task must complete before exit

:::warning Daemon Threads Cannot Clean Up When a daemon thread is killed by interpreter shutdown, its finally blocks do not run, its context managers do not __exit__, and any data it was writing may be partially committed. Daemon threads are appropriate for work that is purely advisory (logging, monitoring, caching) and never appropriate for work that modifies persistent state (writing files, writing to databases, sending API calls). :::

Checking and Setting the Daemon Property

import threading

t = threading.Thread(target=lambda: None)

# daemon must be set BEFORE start() - setting it after raises RuntimeError
t.daemon = True # property setter
# equivalently: threading.Thread(target=..., daemon=True)

print(threading.main_thread().daemon) # False - main thread is never a daemon

Threads inherit the daemon status of the thread that creates them. If a daemon thread creates a child thread, that child is also a daemon by default. Main thread is never a daemon.

Part 4 - The GIL in Depth

The Global Interpreter Lock is a mutex (mutual exclusion lock) inside the CPython interpreter. It ensures that only one thread at a time can execute Python bytecode.

What the GIL Actually Protects

The GIL protects CPython's internal state - primarily the reference counts on every Python object. CPython uses reference counting for memory management: every object has a ob_refcnt field. Incrementing and decrementing that field without the GIL would require every single object to have its own lock. The GIL is a single coarse-grained lock that protects all reference counts simultaneously.

The GIL does not protect your data structures. It does not make list.append() thread-safe. It does not make dictionary updates atomic. It does not protect any state that lives in your application code.

GIL Check Interval

The GIL is not held forever. CPython releases it every 5ms (configurable via sys.setswitchinterval()) to give other threads a chance to run. For I/O operations, the GIL is released immediately at the start of the system call - not after 5ms.

import sys

print(sys.getswitchinterval()) # 0.005 - default 5ms

# Increase to reduce GIL contention in thread-heavy applications
# (reduces switching overhead at the cost of thread fairness)
sys.setswitchinterval(0.020) # 20ms intervals

# Decrease to improve thread fairness in latency-sensitive code
sys.setswitchinterval(0.001) # 1ms intervals

CPU-Bound vs I/O-Bound: A Measured Comparison

import threading
import time
import urllib.request

def cpu_task():
x = 0
for _ in range(10_000_000):
x += 1

def io_task():
# urllib.request.urlopen releases the GIL during the network wait
try:
urllib.request.urlopen("https://httpbin.org/delay/1", timeout=10)
except Exception:
time.sleep(1) # fallback if no network

# --- CPU-bound benchmark ---
start = time.perf_counter()
cpu_task(); cpu_task()
print(f"CPU sequential : {time.perf_counter() - start:.3f}s")

start = time.perf_counter()
t1 = threading.Thread(target=cpu_task)
t2 = threading.Thread(target=cpu_task)
t1.start(); t2.start(); t1.join(); t2.join()
print(f"CPU 2 threads : {time.perf_counter() - start:.3f}s")
# 2 threads ≈ sequential or slower - GIL prevents overlap

# --- I/O-bound benchmark ---
start = time.perf_counter()
io_task(); io_task()
print(f"I/O sequential : {time.perf_counter() - start:.3f}s")

start = time.perf_counter()
t1 = threading.Thread(target=io_task)
t2 = threading.Thread(target=io_task)
t1.start(); t2.start(); t1.join(); t2.join()
print(f"I/O 2 threads : {time.perf_counter() - start:.3f}s")
# 2 threads ≈ 1x I/O time - both waited concurrently

Part 5 - Sharing State Between Threads

All threads in a process share the same memory. This is both threading's main advantage (easy communication between threads) and its main hazard (concurrent modification of shared data).

The Race Condition - Full Diagnosis

import threading

counter = 0 # shared mutable state

def increment(n: int) -> None:
global counter
for _ in range(n):
counter += 1 # NOT atomic - read-modify-write

t1 = threading.Thread(target=increment, args=(500_000,))
t2 = threading.Thread(target=increment, args=(500_000,))
t1.start(); t2.start()
t1.join(); t2.join()

print(f"Final: {counter}") # Should be 1,000,000 - rarely is

Why counter += 1 is not atomic - disassemble it:

import dis

def f():
counter += 1

dis.dis(f)
2 LOAD_FAST 0 (counter) # 1. read current value
LOAD_CONST 1 (1)
BINARY_OP 0 (+) # 2. compute + 1
STORE_FAST 0 (counter) # 3. write new value

The GIL can be released between any two bytecode instructions. The window between instruction 1 (read) and instruction 3 (write) is wide enough for another thread to read the same value, compute, and write - causing one thread's update to overwrite the other's.

:::danger Shared Mutable State Is Dangerous Without Explicit Protection A race condition with a counter produces a wrong number. A race condition in a bank transfer produces lost money. A race condition in a web session store produces a user seeing another user's data. The GIL does not protect your application data - it only protects CPython's internal reference counts. Every shared mutable data structure that multiple threads read and write must be explicitly protected with a synchronization primitive (a Lock, a Queue, or an atomic type). There are no exceptions. :::

Fixing the Race with a Lock

import threading

counter = 0
lock = threading.Lock() # one lock protects the counter

def increment_safe(n: int) -> None:
global counter
for _ in range(n):
with lock: # acquire lock → do work → release lock
counter += 1

t1 = threading.Thread(target=increment_safe, args=(500_000,))
t2 = threading.Thread(target=increment_safe, args=(500_000,))
t1.start(); t2.start()
t1.join(); t2.join()

print(f"Final: {counter}") # Always 1,000,000

The with lock: block is a critical section - only one thread can be inside it at a time. When Thread 1 holds the lock, Thread 2 blocks at with lock: until Thread 1 exits the with block and releases the lock.

:::note Locks Are Covered in Full Depth in Lesson 06 This lesson introduces the race condition problem and shows the minimal lock-based fix. Lesson 06 - Locks and Semaphores covers Lock, RLock, Semaphore, Event, Condition, and Barrier in engineering depth, including deadlock patterns and production-safe usage. :::

Thread-Safe Data Structures

Python's queue.Queue is thread-safe by design - it uses internal locks to protect all operations:

import threading
import queue

# queue.Queue is the right way to pass data between threads
work_queue: queue.Queue = queue.Queue()
results: queue.Queue = queue.Queue()

def producer(items: list) -> None:
for item in items:
work_queue.put(item) # thread-safe put
work_queue.put(None) # sentinel to signal completion

def consumer() -> None:
while True:
item = work_queue.get() # thread-safe get - blocks until item available
if item is None:
break
results.put(item * item) # thread-safe put

items = list(range(20))

p = threading.Thread(target=producer, args=(items,))
c = threading.Thread(target=consumer)

p.start(); c.start()
p.join(); c.join()

all_results = []
while not results.empty():
all_results.append(results.get())

print(sorted(all_results))

Python built-in operations that are thread-safe (protected by the GIL for the duration of the single bytecode instruction):

OperationThread-safe?Notes
list.append(x)YesSingle bytecode instruction
list[i] = xYesSingle bytecode instruction
dict[key] = valueYesSingle bytecode instruction
dict.update(other)NoMultiple steps internally
counter += 1NoThree bytecode instructions (load, add, store)
if x: y = xNoRead-then-write; another thread can change x between check and use
list.sort()NoMutates list across many steps

:::warning "Thread-Safe Operations" Does Not Mean "Thread-Safe Logic" Even though list.append() is internally thread-safe, the logic around it may not be. If one thread checks if len(lst) < MAX: and then another thread appends before the first thread appends, both threads have now appended - violating the intent. Use locks to protect logical invariants, not just individual operations. :::

Part 6 - threading.local() - Per-Thread State

threading.local() creates an object where each thread sees its own isolated copy of the attributes. Threads do not share each other's local storage.

import threading

# Thread-local storage - each thread has its own 'state.user_id'
state = threading.local()

def handle_request(user_id: int) -> None:
# Setting state.user_id in Thread A has no effect on Thread B's state.user_id
state.user_id = user_id
process_request()

def process_request() -> None:
# Safe to read state.user_id - it is this thread's own copy
user = state.user_id
print(f"Processing for user {user} in thread {threading.current_thread().name}")

threads = [
threading.Thread(target=handle_request, args=(i,), name=f"worker-{i}")
for i in range(5)
]

for t in threads:
t.start()
for t in threads:
t.join()
Processing for user 0 in thread worker-0
Processing for user 2 in thread worker-2
Processing for user 1 in thread worker-1
Processing for user 3 in thread worker-3
Processing for user 4 in thread worker-4

Each thread sees only its own state.user_id. There is no race condition because there is no sharing.

Real-World Use Cases for threading.local()

  • Database connection per thread - each worker thread holds its own connection from the pool; no locks needed
  • Request context in web frameworks - Flask uses threading.local() to store the current request object so that flask.request works without passing the request object everywhere
  • Per-thread logging context - store a request ID in thread-local storage, then read it in the logging formatter without passing it through every function call
import threading
import logging

_log_context = threading.local()

def set_request_id(request_id: str) -> None:
_log_context.request_id = request_id

def get_request_id() -> str:
return getattr(_log_context, "request_id", "no-request")

class RequestIdFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
record.request_id = get_request_id()
return True

# Now every log line emitted by any function in this thread
# will include the request_id without any parameter passing

Part 7 - When to Use Threading

Threading is the right tool when your bottleneck is waiting, not computing.

Threading Helps

import threading
import urllib.request
import time

URLS = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]

results = {}
lock = threading.Lock()

def fetch(url: str, idx: int) -> None:
start = time.perf_counter()
try:
with urllib.request.urlopen(url, timeout=10) as resp:
data = resp.read()
elapsed = time.perf_counter() - start
with lock:
results[idx] = {"url": url, "bytes": len(data), "elapsed": elapsed}
except Exception as exc:
with lock:
results[idx] = {"url": url, "error": str(exc)}

# Sequential: 5 × 1s = ~5 seconds
# Threaded: all 5 wait concurrently = ~1 second
threads = [threading.Thread(target=fetch, args=(url, i)) for i, url in enumerate(URLS)]
start = time.perf_counter()
for t in threads: t.start()
for t in threads: t.join()
print(f"Fetched {len(URLS)} URLs in {time.perf_counter() - start:.2f}s")

Threading Does Not Help (Use Multiprocessing Instead)

import threading
import multiprocessing
import time

def compute_heavy(n: int) -> int:
"""Pure CPU computation - no I/O."""
return sum(i * i for i in range(n))

N = 5_000_000

# Threading: both threads fight for the GIL - no actual parallelism
start = time.perf_counter()
t1 = threading.Thread(target=compute_heavy, args=(N,))
t2 = threading.Thread(target=compute_heavy, args=(N,))
t1.start(); t2.start(); t1.join(); t2.join()
thread_time = time.perf_counter() - start

# Multiprocessing: each process has its own GIL - true parallelism
start = time.perf_counter()
p1 = multiprocessing.Process(target=compute_heavy, args=(N,))
p2 = multiprocessing.Process(target=compute_heavy, args=(N,))
p1.start(); p2.start(); p1.join(); p2.join()
process_time = time.perf_counter() - start

print(f"Threading: {thread_time:.3f}s")
print(f"Multiprocessing: {process_time:.3f}s")
# Multiprocessing is typically ~2x faster on a dual-core machine

The Decision: Thread vs Process vs Async

Part 8 - Common Threading Mistakes

Mistake 1 - Forgetting join()

import threading
import time

results = []

def worker(idx: int) -> None:
time.sleep(0.5)
results.append(idx * idx)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
t.start()

# BUG: forgot to join - reading results before threads finish
print(results) # [] or partially filled - threads still running!

# FIX: join all threads before accessing shared results
for t in threads:
t.join()
print(results) # [0, 1, 4, 9, 16] - all results present

Mistake 2 - Passing a Mutable Default Argument

import threading

# BUG: the default list is shared across all calls - classic Python footgun
def append_to(item, collection=[]):
collection.append(item)
return collection

t1 = threading.Thread(target=lambda: print(append_to("a")))
t2 = threading.Thread(target=lambda: print(append_to("b")))
t1.start(); t1.join()
t2.start(); t2.join()
# Output: ['a'], then ['a', 'b'] - the list is shared!

# FIX: use None as default and create a new list inside the function
def append_to_safe(item, collection=None):
if collection is None:
collection = []
collection.append(item)
return collection

Mistake 3 - Daemon Thread Data Loss

import threading
import time

data_written = []

def write_data() -> None:
for i in range(10):
time.sleep(0.1)
data_written.append(i)
print(f"Wrote item {i}")

# BUG: daemon thread is killed when main thread exits
t = threading.Thread(target=write_data, daemon=True)
t.start()
time.sleep(0.35) # main thread sleeps briefly then exits
# Program exits here - daemon thread is killed mid-loop
# Only items 0, 1, 2 may have been written
print(f"Items written: {data_written}")

# FIX: either make it non-daemon (join it) or don't use daemon for stateful work
t2 = threading.Thread(target=write_data, daemon=False)
t2.start()
t2.join() # program waits for all writes to complete
print(f"Items written (safe): {data_written}")

Mistake 4 - Catching Exceptions in Threads

import threading

def risky_worker() -> None:
raise ValueError("Something went wrong in the thread")

t = threading.Thread(target=risky_worker)
t.start()
t.join()
# The exception was printed to stderr but did NOT propagate to the main thread
# Your main thread continues unaware that the worker failed

# FIX: use a results container to capture exceptions
import traceback

class Result:
def __init__(self):
self.value = None
self.exception = None

def safe_worker(result: Result) -> None:
try:
raise ValueError("Something went wrong")
except Exception as e:
result.exception = e

r = Result()
t = threading.Thread(target=safe_worker, args=(r,))
t.start()
t.join()

if r.exception:
raise r.exception # now the main thread knows the worker failed

:::tip Use ThreadPoolExecutor Instead of Raw Thread for Most Cases concurrent.futures.ThreadPoolExecutor handles thread creation, exception propagation, result collection, and shutdown automatically. It is easier to use correctly than raw Thread objects. Lesson 07 covers ThreadPoolExecutor in full - it is the recommended API for any code that creates more than one or two threads.

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(url: str) -> str:
import urllib.request
with urllib.request.urlopen(url, timeout=10) as resp:
return resp.read().decode()

urls = ["https://httpbin.org/get"] * 5

with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(fetch, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
data = future.result() # re-raises exceptions from the worker
print(f"{url}: {len(data)} bytes")
except Exception as exc:
print(f"{url}: failed with {exc}")

:::

Full Example - Concurrent File Downloader

This example combines everything from this lesson: ThreadPoolExecutor for thread management, threading.Lock for protecting shared state, exception handling, and timing.

"""
Concurrent file downloader using ThreadPoolExecutor.

Demonstrates:
- ThreadPoolExecutor for managed thread pools
- threading.Lock for protecting shared results dict
- Exception handling per future (workers can fail independently)
- as_completed() for processing results as they arrive
- time.perf_counter() for accurate benchmarking
"""

from __future__ import annotations

import threading
import time
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class DownloadResult:
url: str
bytes_downloaded: int = 0
elapsed_seconds: float = 0.0
error: Optional[str] = None

@property
def success(self) -> bool:
return self.error is None


def download_url(url: str, timeout: int = 30) -> DownloadResult:
"""Download a single URL. Raises on network error."""
start = time.perf_counter()
try:
with urllib.request.urlopen(url, timeout=timeout) as response:
data = response.read()
return DownloadResult(
url=url,
bytes_downloaded=len(data),
elapsed_seconds=time.perf_counter() - start,
)
except Exception as exc:
return DownloadResult(
url=url,
elapsed_seconds=time.perf_counter() - start,
error=str(exc),
)


def download_all(
urls: list[str],
max_workers: int = 10,
timeout: int = 30,
) -> list[DownloadResult]:
"""
Download all URLs concurrently using a thread pool.

Args:
urls: List of URLs to download.
max_workers: Maximum number of concurrent threads.
timeout: Per-request timeout in seconds.

Returns:
List of DownloadResult objects in completion order.
"""
results: list[DownloadResult] = []
lock = threading.Lock()

# ThreadPoolExecutor as context manager - automatically joins all threads on exit
with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="downloader") as pool:
# Map each URL to a Future
future_to_url: dict[Future[DownloadResult], str] = {
pool.submit(download_url, url, timeout): url
for url in urls
}

# Process results as they complete - not in submission order
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result() # re-raises exceptions from download_url
except Exception as exc:
result = DownloadResult(url=url, error=f"Unexpected: {exc}")

with lock:
results.append(result)

status = "OK" if result.success else f"FAILED: {result.error}"
print(f" [{result.elapsed_seconds:.2f}s] {url[:60]:<60} {status}")

return results


def print_summary(results: list[DownloadResult], total_elapsed: float) -> None:
successful = [r for r in results if r.success]
failed = [r for r in results if not r.success]
total_bytes = sum(r.bytes_downloaded for r in successful)

print(f"\n{'=' * 60}")
print(f"Downloads : {len(results)} total, {len(successful)} succeeded, {len(failed)} failed")
print(f"Total data : {total_bytes:,} bytes ({total_bytes / 1024:.1f} KB)")
print(f"Wall time : {total_elapsed:.2f}s")
if successful:
avg_time = sum(r.elapsed_seconds for r in successful) / len(successful)
print(f"Avg per URL : {avg_time:.2f}s")
if failed:
print("\nFailed URLs:")
for r in failed:
print(f" {r.url}: {r.error}")


if __name__ == "__main__":
# httpbin.org delay endpoints simulate real network latency
URLS = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
"https://httpbin.org/get",
"https://httpbin.org/uuid",
"https://httpbin.org/headers",
"https://httpbin.org/ip",
]

print(f"Downloading {len(URLS)} URLs with max_workers=5\n")

wall_start = time.perf_counter()
results = download_all(URLS, max_workers=5)
wall_elapsed = time.perf_counter() - wall_start

print_summary(results, wall_elapsed)
# Sequential would take: sum of all delays (~9s)
# Concurrent (5 workers): ~2-3s - the I/O waits overlap

Sample output:

Downloading 8 URLs with max_workers=5

[0.31s] https://httpbin.org/get OK
[0.29s] https://httpbin.org/uuid OK
[0.30s] https://httpbin.org/headers OK
[0.33s] https://httpbin.org/ip OK
[1.28s] https://httpbin.org/delay/1 OK
[1.31s] https://httpbin.org/delay/1 OK
[1.30s] https://httpbin.org/delay/1 OK
[2.35s] https://httpbin.org/delay/2 OK

============================================================
Downloads : 8 total, 8 succeeded, 0 failed
Total data : 12,480 bytes (12.2 KB)
Wall time : 2.41s
Avg per URL : 0.96s

Sequential execution of these 8 URLs would take approximately 9 seconds. The concurrent version completes in 2.4 seconds because all I/O waits overlap.

Graded Practice Challenges

Beginner - Parallel Ping

Challenge: Write a function ping_hosts(hosts: list[str]) -> dict[str, bool] that checks whether each hostname in the list is reachable (returns HTTP 200) concurrently using threading.Thread. Return a dictionary mapping each host to True (reachable) or False (unreachable). Measure and print the wall time.

Requirements:

  • Use raw threading.Thread (not ThreadPoolExecutor)
  • Use threading.Lock to protect the shared results dict
  • Use a 3-second timeout per request
  • Print how long the concurrent version takes vs sequential
Show Solution
import threading
import time
import urllib.request

def ping_hosts(hosts: list[str]) -> dict[str, bool]:
results: dict[str, bool] = {}
lock = threading.Lock()

def check(host: str) -> None:
url = f"https://{host}"
try:
with urllib.request.urlopen(url, timeout=3) as resp:
reachable = resp.status == 200
except Exception:
reachable = False
with lock:
results[host] = reachable

threads = [threading.Thread(target=check, args=(h,)) for h in hosts]
start = time.perf_counter()
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Checked {len(hosts)} hosts in {time.perf_counter() - start:.2f}s")
return results

if __name__ == "__main__":
hosts = ["httpbin.org", "python.org", "github.com", "nonexistent.invalid"]
print(ping_hosts(hosts))

Key points:

  • The lock protects results - even though dict.__setitem__ is internally atomic, protecting it with a lock here is a good habit and becomes important the moment you do a read-modify-write (e.g., results[host] += 1).
  • All threads are started before any join() - starting and immediately joining each thread would serialize them.
  • Threads that fail (bad hostname, timeout) do not crash the loop - exceptions are caught inside check().

Intermediate - Thread-Safe Counter Class

Challenge: Implement a ThreadSafeCounter class that can be incremented and decremented by multiple threads concurrently without race conditions. The class must support:

  • increment(amount: int = 1) - add to the counter
  • decrement(amount: int = 1) - subtract from the counter
  • reset() - set counter to zero
  • value property - return the current count
  • __enter__ / __exit__ - so it can be used as a context manager that resets on exit

Prove it is race-condition-free by running 10 threads each incrementing 100,000 times and asserting the final value equals 1,000,000.

Show Solution
import threading

class ThreadSafeCounter:
def __init__(self, initial: int = 0) -> None:
self._value = initial
self._lock = threading.Lock()

def increment(self, amount: int = 1) -> None:
with self._lock:
self._value += amount

def decrement(self, amount: int = 1) -> None:
with self._lock:
self._value -= amount

def reset(self) -> None:
with self._lock:
self._value = 0

@property
def value(self) -> int:
with self._lock:
return self._value

def __enter__(self) -> "ThreadSafeCounter":
return self

def __exit__(self, *_) -> None:
self.reset()

def __repr__(self) -> str:
return f"ThreadSafeCounter(value={self.value})"


# Proof of correctness
counter = ThreadSafeCounter()
N_THREADS = 10
N_PER_THREAD = 100_000

def worker():
for _ in range(N_PER_THREAD):
counter.increment()

threads = [threading.Thread(target=worker) for _ in range(N_THREADS)]
for t in threads: t.start()
for t in threads: t.join()

expected = N_THREADS * N_PER_THREAD
actual = counter.value
assert actual == expected, f"Race condition! Expected {expected}, got {actual}"
print(f"PASSED: counter = {actual}")

# Context manager usage
with counter:
counter.increment(500)
print(f"Inside context: {counter.value}")
print(f"After context (reset): {counter.value}") # 0

Design notes:

  • The value property acquires the lock even for a read. This prevents a stale read if another thread is mid-increment. In practice, a read of a Python integer is atomic at the bytecode level, but the lock is a good habit that scales correctly to non-atomic reads.
  • __exit__ calls reset() which acquires the lock internally - no double-locking because reset() acquires, not __exit__.
  • The _lock is an instance attribute - each ThreadSafeCounter instance has its own lock. Shared locks across instances would create unnecessary contention.

Advanced - Worker Pool with Backpressure

Challenge: Implement a WorkerPool class that:

  • Maintains a fixed number of worker threads (passed as n_workers)
  • Accepts work items via a submit(task: callable) method that blocks if the internal queue is full (backpressure)
  • Workers process tasks from the queue and store results
  • Provides results() which blocks until all submitted work is complete and returns all results
  • Provides shutdown(wait=True) for graceful shutdown
  • Handles task exceptions - a task that raises should not crash the worker thread; the exception should be captured and included in the results

Prove it works correctly by submitting 100 tasks where 10 of them raise exceptions, and verifying that all 100 results are collected (90 success + 10 exceptions).

Show Solution
import queue
import threading
import time
from dataclasses import dataclass
from typing import Any, Callable, Optional

_SENTINEL = object() # unique object used to signal worker shutdown

@dataclass
class TaskResult:
task_id: int
value: Any = None
exception: Optional[Exception] = None

@property
def success(self) -> bool:
return self.exception is None


class WorkerPool:
"""
Fixed-size thread pool with a bounded work queue (backpressure).

Args:
n_workers: Number of worker threads to maintain.
max_queue_size: Maximum items in the work queue before submit() blocks.
"""

def __init__(self, n_workers: int, max_queue_size: int = 0) -> None:
self._n_workers = n_workers
self._work_queue: queue.Queue = queue.Queue(maxsize=max_queue_size)
self._results: list[TaskResult] = []
self._results_lock = threading.Lock()
self._task_counter = 0
self._counter_lock = threading.Lock()

self._workers = [
threading.Thread(
target=self._worker_loop,
name=f"pool-worker-{i}",
daemon=False,
)
for i in range(n_workers)
]
for w in self._workers:
w.start()

def _next_task_id(self) -> int:
with self._counter_lock:
self._task_counter += 1
return self._task_counter

def _worker_loop(self) -> None:
while True:
item = self._work_queue.get()
if item is _SENTINEL:
self._work_queue.task_done()
break

task_id, task = item
try:
value = task()
result = TaskResult(task_id=task_id, value=value)
except Exception as exc:
result = TaskResult(task_id=task_id, exception=exc)
finally:
with self._results_lock:
self._results.append(result)
self._work_queue.task_done()

def submit(self, task: Callable[[], Any]) -> int:
"""
Submit a task. Blocks if the queue is full (backpressure).
Returns the task_id assigned to this task.
"""
task_id = self._next_task_id()
self._work_queue.put((task_id, task)) # blocks if maxsize reached
return task_id

def results(self) -> list[TaskResult]:
"""Block until all submitted tasks complete, then return all results."""
self._work_queue.join() # blocks until all items have been task_done()
with self._results_lock:
return list(self._results)

def shutdown(self, wait: bool = True) -> None:
"""Send sentinel to each worker, optionally wait for them to finish."""
for _ in self._workers:
self._work_queue.put(_SENTINEL)
if wait:
for w in self._workers:
w.join()


# --- Correctness proof ---
if __name__ == "__main__":
pool = WorkerPool(n_workers=4, max_queue_size=20)

N_TASKS = 100
FAIL_IDS = set(range(0, 100, 10)) # tasks 0, 10, 20, ... will raise

for i in range(N_TASKS):
task_index = i # capture for closure
if task_index in FAIL_IDS:
def failing_task(idx=task_index):
raise ValueError(f"Task {idx} failed intentionally")
pool.submit(failing_task)
else:
def success_task(idx=task_index):
time.sleep(0.01) # simulate work
return idx * idx
pool.submit(success_task)

all_results = pool.results()
pool.shutdown(wait=True)

successes = [r for r in all_results if r.success]
failures = [r for r in all_results if not r.success]

assert len(all_results) == N_TASKS, f"Expected {N_TASKS} results, got {len(all_results)}"
assert len(successes) == 90, f"Expected 90 successes, got {len(successes)}"
assert len(failures) == 10, f"Expected 10 failures, got {len(failures)}"

print(f"PASSED: {len(successes)} successes, {len(failures)} failures, {len(all_results)} total")
print(f"Sample failure: {failures[0].exception}")

Design notes:

  • queue.Queue(maxsize=20) provides backpressure - submit() blocks when the queue is full, preventing unbounded memory growth. This is a production-essential pattern.
  • queue.Queue.join() + task_done() is the standard way to wait for all work items to be processed. Every get() must be paired with task_done() - the finally block ensures this even when the task raises.
  • Sentinel pattern - a unique sentinel object (_SENTINEL) tells workers to shut down. One sentinel per worker ensures exactly one worker exits per sentinel.
  • Task IDs allow correlating results to submissions, even when results arrive out of order.
  • Exception capture - the worker never crashes on a task exception; it captures it in TaskResult and continues. A crashing worker would reduce the pool size silently.

Key Takeaways

  • threading.Thread is started with start() and joined with join() - always join every thread you start, or use ThreadPoolExecutor which handles joining automatically
  • The thread lifecycle is: New → Runnable → Running → Blocked → Terminated. The GIL governs which Runnable thread transitions to Running at any moment
  • Daemon threads are killed when all non-daemon threads exit - never use daemon threads for stateful work that must complete; use them only for advisory background tasks
  • The GIL prevents multiple threads from executing Python bytecode simultaneously - threading does not speed up CPU-bound code; it actually adds overhead. For CPU parallelism, use multiprocessing
  • Threading provides real speedup for I/O-bound work - when a thread blocks on I/O it releases the GIL, and other threads run during the wait. Multiple I/O waits overlap, reducing wall time
  • Shared mutable state is dangerous without a lock - counter += 1 is three bytecode instructions, and the GIL can be released between any two. Protect every shared read-modify-write with threading.Lock
  • threading.local() provides per-thread storage - attributes are isolated per thread with no sharing or locks required; the right tool for per-request context in server frameworks
  • ThreadPoolExecutor is preferred over raw Thread - it handles thread creation, exception propagation, result collection, and clean shutdown; use raw Thread only when you need daemon behavior or fine-grained lifecycle control
  • queue.Queue is the thread-safe communication channel - do not use lists or dicts without locks to pass data between threads; Queue handles all synchronization internally

What's Next

Lesson 02 covers Multiprocessing - the tool that actually bypasses the GIL. You will learn how Python spawns separate interpreter processes, how data moves between them (pickling, Pipe, Queue, shared_memory), how to use ProcessPoolExecutor for parallel CPU computation, and why multiprocessing comes with a different set of tradeoffs: process startup cost, serialization overhead, and the impossibility of sharing in-memory state directly. If threading is concurrency, multiprocessing is true parallelism.

© 2026 EngineersOfAI. All rights reserved.