Threading in Python
Reading time: ~40 minutes | Level: Intermediate → Engineering
Before reading further, predict the output of this benchmark:
import threading
import time
def cpu_crunch():
    """Pure CPU work - no I/O, no sleep."""
    x = 0
    for _ in range(25_000_000):
        x += 1
    return x
# Sequential: run the function twice, back to back
start = time.perf_counter()
cpu_crunch()
cpu_crunch()
sequential_time = time.perf_counter() - start
# Parallel: run the function in 2 threads simultaneously
start = time.perf_counter()
t1 = threading.Thread(target=cpu_crunch)
t2 = threading.Thread(target=cpu_crunch)
t1.start(); t2.start()
t1.join(); t2.join()
threaded_time = time.perf_counter() - start
print(f"Sequential : {sequential_time:.3f}s")
print(f"2 Threads : {threaded_time:.3f}s")
print(f"Speedup : {sequential_time / threaded_time:.2f}x")
A developer adds threading expecting a 2x speedup. They have 4 physical cores available. What does the output actually show?
Answer
Typical output:
Sequential : 1.421s
2 Threads : 1.687s
Speedup : 0.84x
The threaded version is slower, not faster. The speedup is less than 1x.
This is the GIL in action. Because cpu_crunch() is pure Python computation, the GIL prevents both threads from running Python bytecode at the same time. They do not run in parallel - they take turns, each waiting for the other to release the GIL. The overhead of thread creation, context switching, and GIL contention makes the threaded version measurably slower than sequential execution.
The developer made the single most common threading mistake in Python: adding threads to CPU-bound code. Threading does not speed up pure-Python CPU work in standard (GIL-enabled) CPython. The usual tool for CPU-bound parallelism is multiprocessing, where each process has its own interpreter and its own GIL.
Threading does provide real, measurable speedup for I/O-bound work - network requests, file I/O, database calls. When a thread is waiting for the operating system to return data, it releases the GIL, and another thread can run. This is the main scenario where threading in Python improves throughput; the other is C extension code that explicitly releases the GIL while it computes.
Every part of this lesson is designed to make that distinction viscerally clear - not just understood intellectually, but felt through code you can run and measure.
What You Will Learn
- threading.Thread - target, args, kwargs, start(), join(), is_alive()
- The thread lifecycle - New, Runnable, Running, Blocked, Terminated
- Daemon threads - what they are, when they help, why they can lose data
- The GIL - what it actually locks, why CPU-bound threads don't benefit, why I/O-bound threads do
- Sharing state between threads - the race condition hazard with full diagnosis
- threading.local() - per-thread storage for state that must not be shared
- When threading is the right tool and when it is the wrong one
- Common threading mistakes and how to recognize them
- A full concurrent file downloader using ThreadPoolExecutor
Prerequisites
- Module 08 Overview - read the GIL section and the concurrency decision table
- Comfortable with Python functions, closures, and with statements
- Basic understanding of what a process is (Module 03 - Python Internals recommended)
Part 1 - threading.Thread
The threading.Thread class is Python's basic unit of concurrent execution. Each Thread object represents an operating-system thread managed by the Python runtime.
Creating and Starting a Thread
import threading
import time
def worker(name: str, delay: float) -> None:
    """A simple task that simulates work with a sleep."""
    print(f"[{name}] Starting")
    time.sleep(delay)
    print(f"[{name}] Done after {delay}s")
# Create a Thread - does NOT start it
t = threading.Thread(
    target=worker,          # the callable to run in the new thread
    args=("Thread-A",),     # positional arguments to target - note the trailing comma
    kwargs={"delay": 1.5},  # keyword arguments to target
    name="my-worker",       # optional human-readable name (appears in tracebacks)
)
print(f"Thread alive before start: {t.is_alive()}") # False
t.start() # create the OS thread and begin execution
print(f"Thread alive after start: {t.is_alive()}") # True (probably)
t.join() # block until the thread finishes
print(f"Thread alive after join: {t.is_alive()}") # False
Thread alive before start: False
[Thread-A] Starting
Thread alive after start: True
[Thread-A] Done after 1.5s
Thread alive after join: False
Key Methods
| Method / Attribute | What it does |
|---|---|
| t.start() | Start the thread - can only be called once |
| t.join(timeout=None) | Block the calling thread until t finishes (or timeout expires) |
| t.is_alive() | Returns True if the thread has been started and has not yet finished |
| t.name | Human-readable name, used in logging and tracebacks |
| t.ident | Thread identifier (a nonzero integer), or None before start(); t.native_id (Python 3.8+) is the ID assigned by the OS |
| t.daemon | Whether this thread is a daemon thread (see Part 3) |
| threading.current_thread() | Returns the Thread object for the thread calling this |
| threading.main_thread() | Returns the Thread object for the main thread |
| threading.enumerate() | Returns a list of all currently alive Thread objects |
| threading.active_count() | Returns the number of currently alive threads |
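The helpers in the table can be tried together in a short sketch. The names `napper`, `release`, and `inspector-demo` are made up for illustration, and the Event is only there to keep the worker alive long enough to be inspected.

```python
import threading

release = threading.Event()  # hypothetical helper: holds the worker until we are done inspecting

def napper() -> None:
    release.wait(timeout=5)  # block until the main thread signals (or 5s pass)

t = threading.Thread(target=napper, name="inspector-demo")
ident_before = t.ident                      # None: the thread has not been started
t.start()

alive_names = [th.name for th in threading.enumerate()]  # includes "inspector-demo"
count_while_running = threading.active_count()           # at least 2: caller + worker
ident_after = t.ident                       # an integer once the thread exists

release.set()                               # let the worker finish
t.join()

print(f"ident before start : {ident_before}")
print(f"ident after start  : {ident_after}")
print(f"alive threads      : {alive_names}")
print(f"alive after join   : {t.is_alive()}")
```

The exact integers and the full list of names depend on the environment, so the sketch only relies on what the table promises: no identifier before start(), one afterwards, and the worker listed while it is alive.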
Running Multiple Threads
import threading
import time
def download_file(url: str, idx: int) -> None:
    """Simulate downloading a file from a URL."""
    print(f"[{idx}] Downloading {url}")
    time.sleep(1.0)  # simulate network latency
    print(f"[{idx}] Finished {url}")
urls = [
    "https://example.com/file1.csv",
    "https://example.com/file2.csv",
    "https://example.com/file3.csv",
    "https://example.com/file4.csv",
]
start = time.perf_counter()
# Create all threads
threads = [
    threading.Thread(target=download_file, args=(url, i))
    for i, url in enumerate(urls)
]
# Start all threads
for t in threads:
    t.start()
# Wait for all threads to complete
for t in threads:
    t.join()
elapsed = time.perf_counter() - start
print(f"\nAll downloads complete in {elapsed:.2f}s")
# Sequential would take 4.0s; concurrent takes ~1.0s
:::tip Always Call join() on Every Thread You Start
A thread that you start but never join() is a thread you have lost control of. You do not know when it finishes, and if it is still running when your program tries to exit, behavior depends on whether it is a daemon thread. Note that join() does not report failures: an uncaught exception in a thread is printed to stderr and never reaches the caller (see Mistake 4 in Part 8). Make it a habit to collect all threads and join them before your program proceeds past the concurrent section.
:::
Part 2 - Thread Lifecycle
Every thread passes through a defined set of states from creation to termination.
State Descriptions
New - The Thread object has been created with Thread(target=...) but start() has not been called. No OS thread exists yet. is_alive() returns False.
Runnable - start() has been called. The OS thread exists and is ready to run. It may or may not be executing at any given moment - the OS scheduler decides which runnable thread gets CPU time. is_alive() returns True.
Running - The thread is actively executing Python bytecode on a CPU core. In CPython, only one thread can be in this state at a time (the GIL enforces this). is_alive() returns True.
Blocked - The thread is waiting for something external: an I/O operation to complete, a lock to be released, a queue.get() to return data, or a time.sleep() timer to expire. While blocked, the thread releases the GIL, allowing other threads to run. is_alive() returns True.
Terminated - The target function has returned (or raised an uncaught exception). The OS thread is gone. is_alive() returns False. join() on a terminated thread returns immediately.
The GIL and State Transitions
The GIL interacts with the Runnable ↔ Running ↔ Blocked cycle:
- A thread in the Running state holds the GIL
- When a thread enters Blocked (waiting on I/O, a lock, or sleep), it releases the GIL - immediately allowing another thread to transition from Runnable to Running
- When the blocking operation completes, the thread transitions back to Runnable (not directly to Running) - it must compete for the GIL and re-acquire it before it can execute bytecode again
This is why I/O-bound threads benefit from concurrency: every thread that blocks on I/O frees the GIL for another thread's computation. Multiple threads can be in I/O simultaneously - the OS handles the I/O, and Python threads overlap.
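The lifecycle can be observed from the outside with is_alive(), as in the sketch below. Python does not expose the Runnable, Running, and Blocked states separately, so is_alive() reports True for all three; the `gate` Event and the `wait_for_gate` function are hypothetical names used only to hold the worker in the Blocked state.

```python
import threading

gate = threading.Event()  # hypothetical helper: the worker blocks on this

def wait_for_gate() -> None:
    gate.wait(timeout=5)  # Blocked: the thread waits here without holding the GIL

t = threading.Thread(target=wait_for_gate)
states = [("new", t.is_alive())]                      # created, not started

t.start()
states.append(("started", t.is_alive()))              # Runnable / Running / Blocked

t.join(timeout=0.05)                                  # gives up after 50ms; worker still blocked
states.append(("after join timeout", t.is_alive()))

gate.set()                                            # unblock the worker
t.join()                                              # now returns once the target has returned
states.append(("terminated", t.is_alive()))

for label, alive in states:
    print(f"{label:<20} is_alive() = {alive}")
```

A join() with a timeout returns None either way, so checking is_alive() afterwards is the way to tell whether the thread actually finished.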
Part 3 - Daemon Threads
A daemon thread is a thread that does not prevent the Python interpreter from exiting. When all non-daemon threads have finished, the interpreter shuts down - and any daemon threads still running are killed immediately, without cleanup.
import threading
import time
def background_monitor():
    """A long-running background task."""
    while True:
        print("[monitor] checking system health...")
        time.sleep(2.0)
def foreground_work():
    """The main application work."""
    print("[main] Starting work")
    time.sleep(3.0)
    print("[main] Work complete")
# daemon=True means this thread dies with the main thread
monitor = threading.Thread(
    target=background_monitor,
    daemon=True,  # ← key argument
    name="health-monitor",
)
worker = threading.Thread(target=foreground_work)
monitor.start()
worker.start()
worker.join() # wait for foreground work to finish
print("[main] All done, exiting")
# The interpreter exits after this line - monitor is killed immediately
[monitor] checking system health...
[main] Starting work
[monitor] checking system health...
[main] Work complete
[main] All done, exiting
The monitor thread was still running its infinite loop when the program exited - it was killed because it was a daemon thread.
When to Use Daemon Threads
| Use case | Daemon? | Reason |
|---|---|---|
| Background metrics collector | Yes | It is acceptable to lose an in-flight metric on exit |
| Background cache warmer | Yes | Cache will be rebuilt on next startup |
| Log file rotator | No | Losing the final log lines is dangerous |
| Database connection closer | No | Open connections should be closed gracefully |
| Task queue consumer | No | In-progress task must complete before exit |
:::warning Daemon Threads Cannot Clean Up
When a daemon thread is killed by interpreter shutdown, its finally blocks do not run, its context managers do not __exit__, and any data it was writing may be partially committed. Daemon threads are appropriate for work that is purely advisory and can be lost without harm (monitoring, metrics, cache warming) and never appropriate for work that modifies persistent state (writing files, writing to databases, sending API calls).
:::
Checking and Setting the Daemon Property
import threading
t = threading.Thread(target=lambda: None)
# daemon must be set BEFORE start() - setting it after raises RuntimeError
t.daemon = True # property setter
# equivalently: threading.Thread(target=..., daemon=True)
print(threading.main_thread().daemon) # False - main thread is never a daemon
Threads inherit the daemon status of the thread that creates them. If a daemon thread creates a child thread, that child is also a daemon by default. The main thread is never a daemon, so threads created from it are non-daemon unless you say otherwise.
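The inheritance rule is easy to confirm with a small experiment. In this sketch (the names `parent`, `child`, and `inherited` are illustrative), each parent thread creates a child without passing daemon= and records what the child ended up with.

```python
import threading

inherited: dict[str, bool] = {}  # parent thread name -> daemon flag of its child

def child() -> None:
    pass

def parent() -> None:
    # No daemon argument: the child copies the flag of the thread creating it
    c = threading.Thread(target=child)
    inherited[threading.current_thread().name] = c.daemon
    c.start()
    c.join()

daemon_parent = threading.Thread(target=parent, name="daemon-parent", daemon=True)
normal_parent = threading.Thread(target=parent, name="normal-parent", daemon=False)

for t in (daemon_parent, normal_parent):
    t.start()
for t in (daemon_parent, normal_parent):
    t.join()

print(inherited)
```

Passing daemon=True or daemon=False explicitly always overrides the inherited value, which is the safer habit when a helper function may be called from either kind of thread.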
Part 4 - The GIL in Depth
The Global Interpreter Lock is a mutex (mutual exclusion lock) inside the CPython interpreter. It ensures that only one thread at a time can execute Python bytecode.
What the GIL Actually Protects
The GIL protects CPython's internal state - primarily the reference counts on every Python object. CPython uses reference counting for memory management: every object has an ob_refcnt field. Updating that field safely without the GIL would require atomic operations or a lock on every single object. The GIL is a single coarse-grained lock that protects all reference counts at once.
The GIL does not protect the invariants of your data structures. A single built-in operation such as list.append() or dict[key] = value happens to be atomic in CPython, but any sequence that spans several operations (check-then-append, read-modify-write, updating two related structures) can be interleaved with another thread. State that lives in your application code needs its own protection.
GIL Check Interval
The GIL is not held forever. When other threads are waiting, CPython asks the running thread to release the GIL after a switch interval of 5ms by default (configurable via sys.setswitchinterval()). For blocking I/O, the GIL is released around the system call itself - the thread does not wait for the interval to expire.
import sys
print(sys.getswitchinterval()) # 0.005 - default 5ms
# Increase to reduce GIL contention in thread-heavy applications
# (reduces switching overhead at the cost of thread fairness)
sys.setswitchinterval(0.020) # 20ms intervals
# Decrease to improve thread fairness in latency-sensitive code
sys.setswitchinterval(0.001) # 1ms intervals
CPU-Bound vs I/O-Bound: A Measured Comparison
import threading
import time
import urllib.request
def cpu_task():
    x = 0
    for _ in range(10_000_000):
        x += 1
def io_task():
    # urllib.request.urlopen releases the GIL during the network wait
    try:
        urllib.request.urlopen("https://httpbin.org/delay/1", timeout=10)
    except Exception:
        time.sleep(1)  # fallback if no network
# --- CPU-bound benchmark ---
start = time.perf_counter()
cpu_task(); cpu_task()
print(f"CPU sequential : {time.perf_counter() - start:.3f}s")
start = time.perf_counter()
t1 = threading.Thread(target=cpu_task)
t2 = threading.Thread(target=cpu_task)
t1.start(); t2.start(); t1.join(); t2.join()
print(f"CPU 2 threads : {time.perf_counter() - start:.3f}s")
# 2 threads ≈ sequential or slower - GIL prevents overlap
# --- I/O-bound benchmark ---
start = time.perf_counter()
io_task(); io_task()
print(f"I/O sequential : {time.perf_counter() - start:.3f}s")
start = time.perf_counter()
t1 = threading.Thread(target=io_task)
t2 = threading.Thread(target=io_task)
t1.start(); t2.start(); t1.join(); t2.join()
print(f"I/O 2 threads : {time.perf_counter() - start:.3f}s")
# 2 threads ≈ 1x I/O time - both waited concurrently
Part 5 - Sharing State Between Threads
All threads in a process share the same memory. This is both threading's main advantage (easy communication between threads) and its main hazard (concurrent modification of shared data).
The Race Condition - Full Diagnosis
import threading
counter = 0 # shared mutable state
def increment(n: int) -> None:
    global counter
    for _ in range(n):
        counter += 1  # NOT atomic - read-modify-write
t1 = threading.Thread(target=increment, args=(500_000,))
t2 = threading.Thread(target=increment, args=(500_000,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"Final: {counter}")  # Should be 1,000,000 - can come out lower; how often varies by CPython version
Why counter += 1 is not atomic - disassemble it:
import dis
counter = 0
def f():
    global counter  # without this, counter would be a local name and f() would raise UnboundLocalError
    counter += 1
dis.dis(f)
Abridged output on CPython 3.11 and later (older versions show INPLACE_ADD instead of BINARY_OP; exact arguments vary):
LOAD_GLOBAL (counter) # 1. read current value
LOAD_CONST (1)
BINARY_OP (+=) # 2. compute + 1
STORE_GLOBAL (counter) # 3. write new value
A thread switch can happen between bytecode instructions. If it lands in the window between instruction 1 (read) and instruction 3 (write), another thread can read the same value, compute, and write - causing one thread's update to overwrite the other's.
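Because the real race depends on timing, it can be hard to reproduce on demand. The sketch below forces the bad interleaving deterministically: a threading.Barrier (an assumption of this example, not part of the original benchmark) makes both threads finish the read step before either performs the write step.

```python
import threading

forced_counter = 0
both_have_read = threading.Barrier(2)  # releases only when both threads arrive

def racy_increment() -> None:
    global forced_counter
    snapshot = forced_counter          # step 1: read the current value
    both_have_read.wait(timeout=5)     # force the other thread to read before anyone writes
    forced_counter = snapshot + 1      # step 3: write back a value based on a stale read

threads = [threading.Thread(target=racy_increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Two increments ran, counter = {forced_counter}")  # 1, not 2: one update was lost
```

Both threads read 0, both write 1, and one increment disappears. This is the same lost update that counter += 1 can suffer, only with the window held open on purpose.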
:::danger Shared Mutable State Is Dangerous Without Explicit Protection
A race condition with a counter produces a wrong number. A race condition in a bank transfer produces lost money. A race condition in a web session store produces a user seeing another user's data. The GIL does not protect your application data - it only protects CPython's internal state. Every shared mutable data structure that multiple threads read and write must be explicitly protected with a synchronization primitive (a Lock, a Queue, or another primitive from the threading module). There are no exceptions.
:::
Fixing the Race with a Lock
import threading
counter = 0
lock = threading.Lock() # one lock protects the counter
def increment_safe(n: int) -> None:
    global counter
    for _ in range(n):
        with lock:  # acquire lock → do work → release lock
            counter += 1
t1 = threading.Thread(target=increment_safe, args=(500_000,))
t2 = threading.Thread(target=increment_safe, args=(500_000,))
t1.start(); t2.start()
t1.join(); t2.join()
print(f"Final: {counter}") # Always 1,000,000
The with lock: block is a critical section - only one thread can be inside it at a time. When Thread 1 holds the lock, Thread 2 blocks at with lock: until Thread 1 exits the with block and releases the lock.
:::note Locks Are Covered in Full Depth in Lesson 06
This lesson introduces the race condition problem and shows the minimal lock-based fix. Lesson 06 - Locks and Semaphores covers Lock, RLock, Semaphore, Event, Condition, and Barrier in engineering depth, including deadlock patterns and production-safe usage.
:::
Thread-Safe Data Structures
Python's queue.Queue is thread-safe by design - it uses internal locks to protect all operations:
import threading
import queue
# queue.Queue is the right way to pass data between threads
work_queue: queue.Queue = queue.Queue()
results: queue.Queue = queue.Queue()
def producer(items: list) -> None:
    for item in items:
        work_queue.put(item)  # thread-safe put
    work_queue.put(None)  # sentinel to signal completion
def consumer() -> None:
    while True:
        item = work_queue.get()  # thread-safe get - blocks until item available
        if item is None:
            break
        results.put(item * item)  # thread-safe put
items = list(range(20))
p = threading.Thread(target=producer, args=(items,))
c = threading.Thread(target=consumer)
p.start(); c.start()
p.join(); c.join()
all_results = []
while not results.empty():
    all_results.append(results.get())
print(sorted(all_results))
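Which built-in operations are atomic in CPython (the mutation happens inside a single C-level call while the GIL is held). This is an implementation detail of CPython, not a language guarantee:
| Operation | Thread-safe? | Notes |
|---|---|---|
| list.append(x) | Yes | The append itself is one C-level operation |
| list[i] = x | Yes | Single store operation |
| dict[key] = value | Yes | Single store operation for built-in key types |
| dict.update(other) | Usually | The CPython FAQ lists it as atomic for built-in types; keys with Python-level __hash__ or __eq__ let other threads run mid-update |
| counter += 1 | No | Separate load, add, and store instructions |
| if x: y = x | No | Read-then-write; another thread can change x between check and use |
| list.sort() | Usually | The CPython FAQ lists it as atomic with built-in comparisons; a Python-level key function or __lt__ runs bytecode during the sort |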
:::warning "Thread-Safe Operations" Does Not Mean "Thread-Safe Logic"
Even though list.append() is internally thread-safe, the logic around it may not be. If one thread checks if len(lst) < MAX: and then another thread appends before the first thread appends, both threads have now appended - violating the intent. Use locks to protect logical invariants, not just individual operations.
:::
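The check-then-append case from the warning above can be made safe by putting the check and the append inside the same critical section. This is a minimal sketch; `MAX_ITEMS`, `try_add`, and the other names are invented for the example.

```python
import threading

MAX_ITEMS = 3
items: list[int] = []
items_lock = threading.Lock()
accepted: list[int] = []          # values whose append was allowed
accepted_lock = threading.Lock()

def try_add(value: int) -> bool:
    """Append value only if the list is below its limit. Check and act are one unit."""
    with items_lock:
        if len(items) < MAX_ITEMS:   # check ...
            items.append(value)      # ... and act, with no gap for another thread
            return True
        return False

def worker(value: int) -> None:
    if try_add(value):
        with accepted_lock:
            accepted.append(value)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"{len(items)} items stored, limit was {MAX_ITEMS}")
```

Which three values get in depends on scheduling, but the invariant len(items) <= MAX_ITEMS holds on every run because no thread can slip between the check and the append.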
Part 6 - threading.local() - Per-Thread State
threading.local() creates an object where each thread sees its own isolated copy of the attributes. Threads do not share each other's local storage.
import threading
# Thread-local storage - each thread has its own 'state.user_id'
state = threading.local()
def handle_request(user_id: int) -> None:
    # Setting state.user_id in Thread A has no effect on Thread B's state.user_id
    state.user_id = user_id
    process_request()
def process_request() -> None:
    # Safe to read state.user_id - it is this thread's own copy
    user = state.user_id
    print(f"Processing for user {user} in thread {threading.current_thread().name}")
threads = [
    threading.Thread(target=handle_request, args=(i,), name=f"worker-{i}")
    for i in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
Processing for user 0 in thread worker-0
Processing for user 2 in thread worker-2
Processing for user 1 in thread worker-1
Processing for user 3 in thread worker-3
Processing for user 4 in thread worker-4
Each thread sees only its own state.user_id. There is no race condition because there is no sharing.
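One consequence worth seeing directly: an attribute set in one thread does not exist at all in another. In the sketch below (the names `ctx` and `seen_in_worker` are illustrative), the worker cannot see the value set by the creating thread, and the value it sets does not leak back.

```python
import threading

ctx = threading.local()
ctx.user_id = "main-user"            # set in the thread that runs this script

seen_in_worker = []

def worker() -> None:
    # The attribute set above is invisible here, so getattr falls back to None
    seen_in_worker.append(getattr(ctx, "user_id", None))
    ctx.user_id = "worker-user"      # affects only this thread's copy
    seen_in_worker.append(ctx.user_id)

t = threading.Thread(target=worker)
t.start()
t.join()

print(seen_in_worker)   # [None, 'worker-user']
print(ctx.user_id)      # main-user
```

This is why code that reads thread-local state should use getattr with a default (or set the attribute at the top of each thread's entry point): a fresh thread starts with an empty namespace.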
Real-World Use Cases for threading.local()
- Database connection per thread - each worker thread holds its own connection from the pool; no locks needed
- Request context in web frameworks - Flask has historically relied on thread-local-style storage for the current request object, so that flask.request works without passing the request object everywhere (recent versions build on contextvars instead)
- Per-thread logging context - store a request ID in thread-local storage, then read it in the logging formatter without passing it through every function call
import threading
import logging
_log_context = threading.local()
def set_request_id(request_id: str) -> None:
    _log_context.request_id = request_id
def get_request_id() -> str:
    return getattr(_log_context, "request_id", "no-request")
class RequestIdFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = get_request_id()
        return True
# Now every log line emitted by any function in this thread
# will include the request_id without any parameter passing
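The filter only takes effect once it is attached to a handler (or logger) and the format string references the new field. The following self-contained sketch shows one possible wiring; the logger name `request_demo`, the in-memory stream, and the `handle` function are assumptions made for the example.

```python
import io
import logging
import threading

_ctx = threading.local()

class RequestIdFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        # Runs in the thread that emitted the record, so it sees that thread's ID
        record.request_id = getattr(_ctx, "request_id", "no-request")
        return True

stream = io.StringIO()                       # capture output in memory for the demo
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("[%(request_id)s] %(message)s"))
handler.addFilter(RequestIdFilter())

logger = logging.getLogger("request_demo")   # hypothetical logger name
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False                     # keep the demo output out of the root logger

def handle(request_id: str) -> None:
    _ctx.request_id = request_id
    logger.info("handling request")          # no request_id argument needed

threads = [threading.Thread(target=handle, args=(f"req-{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
logger.info("outside any request")           # this thread never set an ID

lines = sorted(stream.getvalue().splitlines())
print("\n".join(lines))
```

Attaching the filter to the handler rather than the logger means records that reach the handler from child loggers are tagged as well.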
Part 7 - When to Use Threading
Threading is the right tool when your bottleneck is waiting, not computing.
Threading Helps
import threading
import urllib.request
import time
URLS = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]
results = {}
lock = threading.Lock()
def fetch(url: str, idx: int) -> None:
    start = time.perf_counter()
    try:
        with urllib.request.urlopen(url, timeout=10) as resp:
            data = resp.read()
        elapsed = time.perf_counter() - start
        with lock:
            results[idx] = {"url": url, "bytes": len(data), "elapsed": elapsed}
    except Exception as exc:
        with lock:
            results[idx] = {"url": url, "error": str(exc)}
# Sequential: 5 × 1s = ~5 seconds
# Threaded: all 5 wait concurrently = ~1 second
threads = [threading.Thread(target=fetch, args=(url, i)) for i, url in enumerate(URLS)]
start = time.perf_counter()
for t in threads: t.start()
for t in threads: t.join()
print(f"Fetched {len(URLS)} URLs in {time.perf_counter() - start:.2f}s")
Threading Does Not Help (Use Multiprocessing Instead)
import threading
import multiprocessing
import time
def compute_heavy(n: int) -> int:
    """Pure CPU computation - no I/O."""
    return sum(i * i for i in range(n))
N = 5_000_000
# The guard is required on platforms where multiprocessing starts fresh
# interpreters (Windows, macOS): child processes re-import this module
if __name__ == "__main__":
    # Threading: both threads fight for the GIL - no actual parallelism
    start = time.perf_counter()
    t1 = threading.Thread(target=compute_heavy, args=(N,))
    t2 = threading.Thread(target=compute_heavy, args=(N,))
    t1.start(); t2.start(); t1.join(); t2.join()
    thread_time = time.perf_counter() - start
    # Multiprocessing: each process has its own GIL - true parallelism
    start = time.perf_counter()
    p1 = multiprocessing.Process(target=compute_heavy, args=(N,))
    p2 = multiprocessing.Process(target=compute_heavy, args=(N,))
    p1.start(); p2.start(); p1.join(); p2.join()
    process_time = time.perf_counter() - start
    print(f"Threading: {thread_time:.3f}s")
    print(f"Multiprocessing: {process_time:.3f}s")
    # Multiprocessing is typically ~2x faster when at least two cores are free
The Decision: Thread vs Process vs Async
Part 8 - Common Threading Mistakes
Mistake 1 - Forgetting join()
import threading
import time
results = []
def worker(idx: int) -> None:
    time.sleep(0.5)
    results.append(idx * idx)
threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
# BUG: forgot to join - reading results before threads finish
print(results)  # [] or partially filled - threads still running!
# FIX: join all threads before accessing shared results
for t in threads:
    t.join()
print(results)  # all five results present, e.g. [0, 1, 4, 9, 16] - order may vary
Mistake 2 - Passing a Mutable Default Argument
import threading
# BUG: the default list is shared across all calls - classic Python footgun
def append_to(item, collection=[]):
    collection.append(item)
    return collection
t1 = threading.Thread(target=lambda: print(append_to("a")))
t2 = threading.Thread(target=lambda: print(append_to("b")))
t1.start(); t1.join()
t2.start(); t2.join()
# Output: ['a'], then ['a', 'b'] - the list is shared!
# FIX: use None as default and create a new list inside the function
def append_to_safe(item, collection=None):
    if collection is None:
        collection = []
    collection.append(item)
    return collection
Mistake 3 - Daemon Thread Data Loss
import threading
import time
data_written = []
def write_data() -> None:
    for i in range(10):
        time.sleep(0.1)
        data_written.append(i)
        print(f"Wrote item {i}")
# BUG: daemon thread is killed when main thread exits
t = threading.Thread(target=write_data, daemon=True)
t.start()
time.sleep(0.35)  # main thread sleeps briefly
# If the program ended here, the daemon thread would be killed mid-loop
# with only items 0, 1, 2 written
print(f"Items written: {data_written}")
# FIX (run as a separate script, otherwise the daemon above keeps appending too):
# make the thread non-daemon and join it, or don't use daemon threads for stateful work
t2 = threading.Thread(target=write_data, daemon=False)
t2.start()
t2.join()  # program waits for all writes to complete
print(f"Items written (safe): {data_written}")
Mistake 4 - Assuming Exceptions Propagate Out of Threads
import threading
def risky_worker() -> None:
    raise ValueError("Something went wrong in the thread")
t = threading.Thread(target=risky_worker)
t.start()
t.join()
# The exception was printed to stderr but did NOT propagate to the main thread
# Your main thread continues unaware that the worker failed
# FIX: use a results container to capture exceptions
class Result:
    def __init__(self):
        self.value = None
        self.exception = None
def safe_worker(result: Result) -> None:
    try:
        raise ValueError("Something went wrong")
    except Exception as e:
        result.exception = e
r = Result()
t = threading.Thread(target=safe_worker, args=(r,))
t.start()
t.join()
if r.exception:
    raise r.exception  # now the main thread knows the worker failed
:::tip Use ThreadPoolExecutor Instead of Raw Thread for Most Cases
concurrent.futures.ThreadPoolExecutor handles thread creation, exception propagation, result collection, and shutdown automatically. It is easier to use correctly than raw Thread objects. Lesson 07 covers ThreadPoolExecutor in full - it is the recommended API for any code that creates more than one or two threads.
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch(url: str) -> str:
    import urllib.request
    with urllib.request.urlopen(url, timeout=10) as resp:
        return resp.read().decode()
urls = ["https://httpbin.org/get"] * 5
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(fetch, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            data = future.result()  # re-raises exceptions from the worker
            print(f"{url}: {len(data)} bytes")
        except Exception as exc:
            print(f"{url}: failed with {exc}")
:::
Full Example - Concurrent File Downloader
This example combines everything from this lesson: ThreadPoolExecutor for thread management, threading.Lock for protecting shared state, exception handling, and timing.
"""
Concurrent file downloader using ThreadPoolExecutor.
Demonstrates:
- ThreadPoolExecutor for managed thread pools
- threading.Lock for protecting shared results dict
- Exception handling per future (workers can fail independently)
- as_completed() for processing results as they arrive
- time.perf_counter() for accurate benchmarking
"""
from __future__ import annotations
import threading
import time
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
from dataclasses import dataclass
from typing import Optional
@dataclass
class DownloadResult:
    url: str
    bytes_downloaded: int = 0
    elapsed_seconds: float = 0.0
    error: Optional[str] = None

    @property
    def success(self) -> bool:
        return self.error is None
def download_url(url: str, timeout: int = 30) -> DownloadResult:
    """Download a single URL. Errors are captured in the result, not raised."""
    start = time.perf_counter()
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            data = response.read()
        return DownloadResult(
            url=url,
            bytes_downloaded=len(data),
            elapsed_seconds=time.perf_counter() - start,
        )
    except Exception as exc:
        return DownloadResult(
            url=url,
            elapsed_seconds=time.perf_counter() - start,
            error=str(exc),
        )
def download_all(
    urls: list[str],
    max_workers: int = 10,
    timeout: int = 30,
) -> list[DownloadResult]:
    """
    Download all URLs concurrently using a thread pool.

    Args:
        urls: List of URLs to download.
        max_workers: Maximum number of concurrent threads.
        timeout: Per-request timeout in seconds.

    Returns:
        List of DownloadResult objects in completion order.
    """
    results: list[DownloadResult] = []
    lock = threading.Lock()
    # ThreadPoolExecutor as context manager - automatically joins all threads on exit
    with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="downloader") as pool:
        # Map each Future to the URL it was submitted for
        future_to_url: dict[Future[DownloadResult], str] = {
            pool.submit(download_url, url, timeout): url
            for url in urls
        }
        # Process results as they complete - not in submission order
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                # download_url captures its own errors; this guards against anything unexpected
                result = future.result()
            except Exception as exc:
                result = DownloadResult(url=url, error=f"Unexpected: {exc}")
            # Only the calling thread appends here, so the lock is defensive;
            # it becomes necessary if workers ever write to results directly
            with lock:
                results.append(result)
            status = "OK" if result.success else f"FAILED: {result.error}"
            print(f" [{result.elapsed_seconds:.2f}s] {url[:60]:<60} {status}")
    return results
def print_summary(results: list[DownloadResult], total_elapsed: float) -> None:
    successful = [r for r in results if r.success]
    failed = [r for r in results if not r.success]
    total_bytes = sum(r.bytes_downloaded for r in successful)
    print(f"\n{'=' * 60}")
    print(f"Downloads : {len(results)} total, {len(successful)} succeeded, {len(failed)} failed")
    print(f"Total data : {total_bytes:,} bytes ({total_bytes / 1024:.1f} KB)")
    print(f"Wall time : {total_elapsed:.2f}s")
    if successful:
        avg_time = sum(r.elapsed_seconds for r in successful) / len(successful)
        print(f"Avg per URL : {avg_time:.2f}s")
    if failed:
        print("\nFailed URLs:")
        for r in failed:
            print(f" {r.url}: {r.error}")
if __name__ == "__main__":
    # httpbin.org delay endpoints simulate real network latency
    URLS = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/get",
        "https://httpbin.org/uuid",
        "https://httpbin.org/headers",
        "https://httpbin.org/ip",
    ]
    print(f"Downloading {len(URLS)} URLs with max_workers=5\n")
    wall_start = time.perf_counter()
    results = download_all(URLS, max_workers=5)
    wall_elapsed = time.perf_counter() - wall_start
    print_summary(results, wall_elapsed)
    # Sequential would take the sum of all per-URL times (5s of server delay plus latency)
    # Concurrent (5 workers): ~2-3s - the I/O waits overlap
Sample output:
Downloading 8 URLs with max_workers=5
[0.31s] https://httpbin.org/get OK
[0.29s] https://httpbin.org/uuid OK
[0.30s] https://httpbin.org/headers OK
[0.33s] https://httpbin.org/ip OK
[1.28s] https://httpbin.org/delay/1 OK
[1.31s] https://httpbin.org/delay/1 OK
[1.30s] https://httpbin.org/delay/1 OK
[2.35s] https://httpbin.org/delay/2 OK
============================================================
Downloads : 8 total, 8 succeeded, 0 failed
Total data : 12,480 bytes (12.2 KB)
Wall time : 2.41s
Avg per URL : 0.93s
Sequential execution of these 8 URLs would take roughly 7.5 seconds, the sum of the per-URL times shown above. The concurrent version completes in 2.4 seconds because the I/O waits overlap.
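The downloader returns results in completion order because it uses as_completed(). When the caller needs results in the same order as the input, executor.map() is the alternative. The sketch below contrasts the two without touching the network; `fake_download` and its sleep-based delays are stand-ins for real requests.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_download(delay: float) -> float:
    """Hypothetical stand-in for a network call: sleep, then report the delay."""
    time.sleep(delay)
    return delay

delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as pool:
    # map() yields results in the order of the inputs, regardless of finish time
    in_input_order = list(pool.map(fake_download, delays))

    # as_completed() yields futures as they finish, typically shortest delay first
    futures = [pool.submit(fake_download, d) for d in delays]
    in_completion_order = [f.result() for f in as_completed(futures)]

print(f"map()          : {in_input_order}")
print(f"as_completed() : {in_completion_order}")
```

One trade-off to keep in mind: with map(), an exception from a worker is raised when its result is reached during iteration, so per-item error handling is more natural with submit() and as_completed().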
Graded Practice Challenges
Beginner - Parallel Ping
Challenge: Write a function ping_hosts(hosts: list[str]) -> dict[str, bool] that checks whether each hostname in the list is reachable (returns HTTP 200) concurrently using threading.Thread. Return a dictionary mapping each host to True (reachable) or False (unreachable). Measure and print the wall time.
Requirements:
- Use raw threading.Thread (not ThreadPoolExecutor)
- Use threading.Lock to protect the shared results dict
- Use a 3-second timeout per request
- Print how long the concurrent version takes vs sequential
Solution
import threading
import time
import urllib.request
def ping_hosts(hosts: list[str]) -> dict[str, bool]:
    results: dict[str, bool] = {}
    lock = threading.Lock()

    def check(host: str) -> None:
        url = f"https://{host}"
        try:
            with urllib.request.urlopen(url, timeout=3) as resp:
                reachable = resp.status == 200
        except Exception:
            reachable = False
        with lock:
            results[host] = reachable

    threads = [threading.Thread(target=check, args=(h,)) for h in hosts]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"Checked {len(hosts)} hosts in {time.perf_counter() - start:.2f}s")
    return results
if __name__ == "__main__":
    hosts = ["httpbin.org", "python.org", "github.com", "nonexistent.invalid"]
    print(ping_hosts(hosts))
Key points:
- The lock protects results - even though a single dict.__setitem__ is effectively atomic in CPython under the GIL, protecting it with a lock here is a good habit and becomes important the moment you do a read-modify-write (e.g., results[host] += 1).
- All threads are started before any join() - starting and immediately joining each thread would serialize them.
- Threads that fail (bad hostname, timeout) do not crash the loop - exceptions are caught inside check().
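The second point is easy to measure. This is a small sketch under the assumption that `time.sleep(0.1)` stands in for a blocking request; the helper names `wait_briefly`, `start_all_then_join`, and `start_and_join_each` are illustrative only.

```python
import threading
import time


def wait_briefly() -> None:
    time.sleep(0.1)  # stand-in for a blocking network call


def start_all_then_join(n: int) -> float:
    """Start every thread first, then join: the waits overlap."""
    threads = [threading.Thread(target=wait_briefly) for _ in range(n)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


def start_and_join_each(n: int) -> float:
    """Join right after each start: only one thread is ever alive at a time."""
    start = time.perf_counter()
    for _ in range(n):
        t = threading.Thread(target=wait_briefly)
        t.start()
        t.join()  # blocks here, so the next thread cannot start yet
    return time.perf_counter() - start


overlapped = start_all_then_join(5)
serialized = start_and_join_each(5)
print(f"start all, then join : {overlapped:.2f}s")
print(f"start + join in loop : {serialized:.2f}s")
```

With five 0.1-second waits, the first pattern takes about as long as one wait, while the second takes about as long as all five added together.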
Intermediate - Thread-Safe Counter Class
Challenge: Implement a ThreadSafeCounter class that can be incremented and decremented by multiple threads concurrently without race conditions. The class must support:
- increment(amount: int = 1) - add to the counter
- decrement(amount: int = 1) - subtract from the counter
- reset() - set counter to zero
- value property - return the current count
- __enter__ / __exit__ - so it can be used as a context manager that resets on exit
Prove it is race-condition-free by running 10 threads each incrementing 100,000 times and asserting the final value equals 1,000,000.
Solution:
import threading


class ThreadSafeCounter:
    def __init__(self, initial: int = 0) -> None:
        self._value = initial
        self._lock = threading.Lock()

    def increment(self, amount: int = 1) -> None:
        with self._lock:
            self._value += amount

    def decrement(self, amount: int = 1) -> None:
        with self._lock:
            self._value -= amount

    def reset(self) -> None:
        with self._lock:
            self._value = 0

    @property
    def value(self) -> int:
        with self._lock:
            return self._value

    def __enter__(self) -> "ThreadSafeCounter":
        return self

    def __exit__(self, *_) -> None:
        self.reset()

    def __repr__(self) -> str:
        return f"ThreadSafeCounter(value={self.value})"


# Proof of correctness
counter = ThreadSafeCounter()
N_THREADS = 10
N_PER_THREAD = 100_000


def worker():
    for _ in range(N_PER_THREAD):
        counter.increment()


threads = [threading.Thread(target=worker) for _ in range(N_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

expected = N_THREADS * N_PER_THREAD
actual = counter.value
assert actual == expected, f"Race condition! Expected {expected}, got {actual}"
print(f"PASSED: counter = {actual}")

# Context manager usage
with counter:
    counter.increment(500)
    print(f"Inside context: {counter.value}")
print(f"After context (reset): {counter.value}")  # 0
Design notes:
- The value property acquires the lock even for a read. This prevents a stale read if another thread is mid-increment. In practice, a read of a Python integer is atomic at the bytecode level, but the lock is a good habit that scales correctly to non-atomic reads.
- __exit__ calls reset() which acquires the lock internally - no double-locking because reset() acquires, not __exit__.
- The _lock is an instance attribute - each ThreadSafeCounter instance has its own lock. Shared locks across instances would create unnecessary contention.
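Double-locking matters because threading.Lock is not reentrant: a thread that already holds it cannot acquire it again. The sketch below probes this with non-blocking acquires so nothing deadlocks, and contrasts it with threading.RLock. It is an illustration only; the variable names are not part of the counter class.

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

with plain:
    # A blocking acquire here would hang forever. blocking=False lets us
    # observe the refusal instead of deadlocking.
    second_plain = plain.acquire(blocking=False)

with reentrant:
    # The owning thread may acquire an RLock again.
    second_reentrant = reentrant.acquire(blocking=False)
    if second_reentrant:
        reentrant.release()  # balance the extra acquire

print(f"Lock  re-acquired by the same thread: {second_plain}")      # False
print(f"RLock re-acquired by the same thread: {second_reentrant}")  # True
```

If a locked method ever needs to call another locked method on the same object, the usual options are to switch to RLock or to move the shared logic into an unlocked private helper.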
Advanced - Worker Pool with Backpressure
Challenge: Implement a WorkerPool class that:
- Maintains a fixed number of worker threads (passed as n_workers)
- Accepts work items via a submit(task: callable) method that blocks if the internal queue is full (backpressure)
- Workers process tasks from the queue and store results
- Provides results() which blocks until all submitted work is complete and returns all results
- Provides shutdown(wait=True) for graceful shutdown
- Handles task exceptions - a task that raises should not crash the worker thread; the exception should be captured and included in the results
Prove it works correctly by submitting 100 tasks where 10 of them raise exceptions, and verifying that all 100 results are collected (90 success + 10 exceptions).
Solution:
import queue
import threading
import time
from dataclasses import dataclass
from typing import Any, Callable, Optional

_SENTINEL = object()  # unique object used to signal worker shutdown


@dataclass
class TaskResult:
    task_id: int
    value: Any = None
    exception: Optional[Exception] = None

    @property
    def success(self) -> bool:
        return self.exception is None


class WorkerPool:
    """
    Fixed-size thread pool with a bounded work queue (backpressure).

    Args:
        n_workers: Number of worker threads to maintain.
        max_queue_size: Maximum items in the work queue before submit() blocks.
            0 (the default) means the queue is unbounded and submit() never blocks.
    """

    def __init__(self, n_workers: int, max_queue_size: int = 0) -> None:
        self._n_workers = n_workers
        self._work_queue: queue.Queue = queue.Queue(maxsize=max_queue_size)
        self._results: list[TaskResult] = []
        self._results_lock = threading.Lock()
        self._task_counter = 0
        self._counter_lock = threading.Lock()
        self._workers = [
            threading.Thread(
                target=self._worker_loop,
                name=f"pool-worker-{i}",
                daemon=False,
            )
            for i in range(n_workers)
        ]
        for w in self._workers:
            w.start()

    def _next_task_id(self) -> int:
        with self._counter_lock:
            self._task_counter += 1
            return self._task_counter

    def _worker_loop(self) -> None:
        while True:
            item = self._work_queue.get()
            if item is _SENTINEL:
                self._work_queue.task_done()
                break
            task_id, task = item
            try:
                value = task()
                result = TaskResult(task_id=task_id, value=value)
            except Exception as exc:
                result = TaskResult(task_id=task_id, exception=exc)
            finally:
                with self._results_lock:
                    self._results.append(result)
                self._work_queue.task_done()

    def submit(self, task: Callable[[], Any]) -> int:
        """
        Submit a task. Blocks if the queue is full (backpressure).
        Returns the task_id assigned to this task.
        """
        task_id = self._next_task_id()
        self._work_queue.put((task_id, task))  # blocks if maxsize reached
        return task_id

    def results(self) -> list[TaskResult]:
        """Block until all submitted tasks complete, then return all results."""
        self._work_queue.join()  # blocks until all items have been task_done()
        with self._results_lock:
            return list(self._results)

    def shutdown(self, wait: bool = True) -> None:
        """Send sentinel to each worker, optionally wait for them to finish."""
        for _ in self._workers:
            self._work_queue.put(_SENTINEL)
        if wait:
            for w in self._workers:
                w.join()


# --- Correctness proof ---
if __name__ == "__main__":
    pool = WorkerPool(n_workers=4, max_queue_size=20)
    N_TASKS = 100
    FAIL_IDS = set(range(0, 100, 10))  # tasks 0, 10, 20, ... will raise

    for i in range(N_TASKS):
        task_index = i  # bound to each task through the idx default argument
        if task_index in FAIL_IDS:
            def failing_task(idx=task_index):
                raise ValueError(f"Task {idx} failed intentionally")
            pool.submit(failing_task)
        else:
            def success_task(idx=task_index):
                time.sleep(0.01)  # simulate work
                return idx * idx
            pool.submit(success_task)

    all_results = pool.results()
    pool.shutdown(wait=True)

    successes = [r for r in all_results if r.success]
    failures = [r for r in all_results if not r.success]
    assert len(all_results) == N_TASKS, f"Expected {N_TASKS} results, got {len(all_results)}"
    assert len(successes) == 90, f"Expected 90 successes, got {len(successes)}"
    assert len(failures) == 10, f"Expected 10 failures, got {len(failures)}"
    print(f"PASSED: {len(successes)} successes, {len(failures)} failures, {len(all_results)} total")
    print(f"Sample failure: {failures[0].exception}")
Design notes:
- queue.Queue(maxsize=20) provides backpressure - submit() blocks when the queue is full, preventing unbounded memory growth. This is a production-essential pattern.
- queue.Queue.join() + task_done() is the standard way to wait for all work items to be processed. Every get() must be paired with task_done() - the finally block ensures this even when the task raises.
- Sentinel pattern - a unique sentinel object (_SENTINEL) tells workers to shut down. One sentinel per worker ensures exactly one worker exits per sentinel.
- Task IDs allow correlating results to submissions, even when results arrive out of order.
- Exception capture - the worker never crashes on a task exception; it captures it in TaskResult and continues. A crashing worker would reduce the pool size silently.
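Backpressure can be observed in isolation with a bounded queue and no consumer. This is a minimal sketch with a deliberately tiny maxsize of 2. It uses put() with a timeout so that a full queue surfaces as queue.Full instead of blocking forever, which is an assumption of the demo rather than how submit() above behaves.

```python
import queue

bounded: queue.Queue = queue.Queue(maxsize=2)
bounded.put("a")
bounded.put("b")

try:
    # The queue is full and nobody is consuming, so this put() waits
    # for 0.1s and then gives up with queue.Full.
    bounded.put("c", timeout=0.1)
    accepted = True
except queue.Full:
    accepted = False

first = bounded.get()           # a consumer takes one item...
bounded.task_done()
bounded.put("c", timeout=0.1)   # ...and now there is room again

print(f"third put accepted while full: {accepted}")  # False
print(f"first item consumed: {first}")               # a
print(f"items queued now: {bounded.qsize()}")        # 2
```

A producer that calls put() without a timeout simply pauses at the full queue until a worker frees a slot, which is what keeps memory use bounded.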
Key Takeaways
- threading.Thread is started with start() and joined with join() - always join every thread you start, or use ThreadPoolExecutor which handles joining automatically
- The thread lifecycle is: New → Runnable → Running → Blocked → Terminated. The GIL governs which Runnable thread transitions to Running at any moment
- Daemon threads are killed when all non-daemon threads exit - never use daemon threads for stateful work that must complete; use them only for advisory background tasks
- The GIL prevents multiple threads from executing Python bytecode simultaneously - threading does not speed up CPU-bound code; it actually adds overhead. For CPU parallelism, use multiprocessing
- Threading provides real speedup for I/O-bound work - when a thread blocks on I/O it releases the GIL, and other threads run during the wait. Multiple I/O waits overlap, reducing wall time
- Shared mutable state is dangerous without a lock - counter += 1 is a separate load, add, and store at the bytecode level, and the GIL can be released between any two of them. Protect every shared read-modify-write with threading.Lock
- threading.local() provides per-thread storage - attributes are isolated per thread with no sharing or locks required; the right tool for per-request context in server frameworks
- ThreadPoolExecutor is preferred over raw Thread - it handles thread creation, exception propagation, result collection, and clean shutdown; use raw Thread only when you need daemon behavior or fine-grained lifecycle control
- queue.Queue is the thread-safe communication channel - do not use lists or dicts without locks to pass data between threads; Queue handles all synchronization internally
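As a quick refresher on the threading.local() takeaway, here is a small sketch. The names `context`, `handle_request`, and `seen` are hypothetical. The lock guards only the shared `seen` dict used to report results, not the thread-local object itself.

```python
import threading

context = threading.local()      # each thread sees its own attributes
seen: dict[str, str] = {}        # shared dict, used only to report results
seen_lock = threading.Lock()


def handle_request(request_id: str) -> None:
    context.request_id = request_id  # visible only to the current thread
    # Deeper call layers could read context.request_id without it being
    # passed down as an argument.
    with seen_lock:
        seen[threading.current_thread().name] = context.request_id


threads = [
    threading.Thread(target=handle_request, args=(f"req-{i}",), name=f"worker-{i}")
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never set request_id, so it does not see the attribute.
main_has_attr = hasattr(context, "request_id")
print(seen)
print(f"main thread sees request_id: {main_has_attr}")  # False
```

Each worker reads back exactly the value it wrote, and the main thread sees no attribute at all, because every thread gets its own namespace on the same `context` object.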
What's Next
Lesson 02 covers Multiprocessing - the tool that actually bypasses the GIL. You will learn how Python spawns separate interpreter processes, how data moves between them (pickling, Pipe, Queue, shared_memory), how to use ProcessPoolExecutor for parallel CPU computation, and why multiprocessing comes with a different set of tradeoffs: process startup cost, serialization overhead, and the impossibility of sharing in-memory state directly. If threading is concurrency, multiprocessing is true parallelism.
