
ThreadPoolExecutor and ProcessPoolExecutor

Reading time: ~35 minutes | Level: Intermediate → Engineering

Before reading further, study these two implementations of the same task - fetching data from 10 URLs concurrently:

Before: raw threading.Thread

```python
import threading
import urllib.request

results = {}
errors = {}
lock = threading.Lock()

def fetch(url):
    try:
        with urllib.request.urlopen(url, timeout=5) as r:
            data = r.read()
        with lock:
            results[url] = data
    except Exception as e:
        with lock:
            errors[url] = e

urls = [f"https://httpbin.org/get?n={i}" for i in range(10)]

threads = []
for url in urls:
    t = threading.Thread(target=fetch, args=(url,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# Exceptions are swallowed into the errors dict - easy to miss
print(f"Got {len(results)} results, {len(errors)} errors")
```

After: ThreadPoolExecutor

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request

def fetch(url):
    with urllib.request.urlopen(url, timeout=5) as r:
        return r.read()  # exceptions propagate naturally

urls = [f"https://httpbin.org/get?n={i}" for i in range(10)]

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {executor.submit(fetch, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            data = future.result()
            print(f"{url}: {len(data)} bytes")
        except Exception as e:
            print(f"{url}: failed - {e}")
```

The second version is shorter, re-raises each task's exception exactly where you handle that task's result, needs no manual locking for result collection, and caps the number of threads with a single parameter (max_workers) instead of starting one thread per URL. This is what concurrent.futures is designed for.

What You Will Learn

  • The concurrent.futures unified interface for thread and process pools
  • ThreadPoolExecutor - submit(), map(), context manager, max_workers
  • Future objects - result(), exception(), done(), cancel(), add_done_callback()
  • as_completed() - why processing results as they arrive beats waiting for all
  • executor.map() - clean syntax for uniform workloads
  • Exception handling - how exceptions are stored and re-raised from Future.result()
  • ProcessPoolExecutor - same interface, bypass the GIL for CPU-bound work
  • Choosing pool size - the right heuristic for I/O vs CPU workloads
  • Combining asyncio with ThreadPoolExecutor via loop.run_in_executor()

Prerequisites

  • Threading fundamentals (Module 08, Lesson 01)
  • The GIL (Module 03, Lesson 04)
  • Asyncio (Module 08, Lesson 03)
  • Locks and synchronization (Module 08, Lesson 06)

Part 1 - concurrent.futures: The Unified Interface

The concurrent.futures module (introduced in Python 3.2) provides a high-level, unified API for both thread-based and process-based concurrency. The central concept is the executor: an object that manages a pool of workers and accepts tasks.

concurrent.futures
├── Executor (abstract base)
│ ├── ThreadPoolExecutor - thread pool, shared memory, GIL applies
│ └── ProcessPoolExecutor - process pool, separate memory, GIL bypassed
└── Future - represents the result of an async computation

The same code that runs on a ThreadPoolExecutor runs on a ProcessPoolExecutor - you switch backends by changing the class name, as long as the function, its arguments, and its return value can be pickled (see Part 7). This design makes the I/O vs CPU decision easy to change later.

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def compute(n):
    return n * n

# The __main__ guard is needed when worker processes are started with
# spawn or forkserver (e.g. on Windows and macOS)
if __name__ == "__main__":
    # Thread pool version
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(compute, range(10)))

    # Process pool version - identical code, different backend
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(compute, range(10)))
```
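Because both classes implement the abstract concurrent.futures.Executor interface, a function can take the executor as a parameter and leave the backend choice to its caller. A minimal sketch; square_all is a hypothetical helper, and passing a ProcessPoolExecutor instead would additionally need an `if __name__ == "__main__":` guard on platforms that spawn workers:

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from itertools import repeat

def square_all(executor: Executor, numbers):
    """Square each number using whichever Executor the caller supplies."""
    # pow is a builtin, so it is picklable and would also work in a process pool;
    # map() stops at the shortest iterable, so repeat(2) is safe here
    return list(executor.map(pow, numbers, repeat(2)))

with ThreadPoolExecutor(max_workers=4) as pool:
    print(square_all(pool, range(5)))  # [0, 1, 4, 9, 16]
```

The function only uses map(), which every Executor provides, so swapping backends is purely the caller's decision.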

Part 2 - ThreadPoolExecutor: Core Usage

Creating the Pool

```python
from concurrent.futures import ThreadPoolExecutor

# Explicit max_workers
executor = ThreadPoolExecutor(max_workers=10)

# Default: min(32, os.cpu_count() + 4) in Python 3.8+
executor = ThreadPoolExecutor()

# With a meaningful thread name prefix for debugging
executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="Fetcher")
```

The Context Manager Pattern

Always use ThreadPoolExecutor as a context manager:

```python
with ThreadPoolExecutor(max_workers=5) as executor:
    # Submit work here
    pass
# On exit: waits for all submitted futures to complete, then shuts down
```

The with block's __exit__ calls executor.shutdown(wait=True) - it blocks until every submitted task finishes. This is the safe, correct pattern. Never leave a pool open without shutting it down.
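When you manage a pool by hand instead, shutdown() also accepts cancel_futures=True (Python 3.9+), which cancels every task still waiting in the queue while letting already-running tasks finish. A small sketch; first_task, queued_task and the two events are illustrative names, and the events simply hold the single worker busy so the queued tasks are guaranteed to still be pending:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()

def first_task():
    started.set()            # signal that the worker has picked this task up
    release.wait(timeout=5)  # keep the only worker busy until released
    return "finished"

def queued_task(n):
    return n

executor = ThreadPoolExecutor(max_workers=1)
try:
    running = executor.submit(first_task)
    queued = [executor.submit(queued_task, i) for i in range(4)]
    started.wait(timeout=5)  # make sure first_task is running, not queued
finally:
    # Cancel everything still queued; wait=False so we can release the worker below
    executor.shutdown(wait=False, cancel_futures=True)

release.set()
print(running.result())                 # finished
print([f.cancelled() for f in queued])  # [True, True, True, True]
```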

submit() - Submit a Single Task

submit(fn, *args, **kwargs) schedules fn(*args, **kwargs) to run in a worker thread and immediately returns a Future object:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_square(n):
    time.sleep(1)
    return n * n

with ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns immediately - it does not wait for the result
    future = executor.submit(slow_square, 5)

    print(f"Future done? {future.done()}")  # False - the task is still sleeping
    result = future.result()                # blocks until done
    print(f"Result: {result}")              # 25
    print(f"Future done? {future.done()}")  # True
```

Submitting Many Tasks

```python
from concurrent.futures import ThreadPoolExecutor

def process(item):
    return item.upper()

items = ["alpha", "beta", "gamma", "delta", "epsilon"]

with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit all tasks and keep the futures
    futures = [executor.submit(process, item) for item in items]

    # Collect results in submission order
    results = [f.result() for f in futures]

print(results)  # ['ALPHA', 'BETA', 'GAMMA', 'DELTA', 'EPSILON']
```

Part 3 - Future Objects

A Future represents a computation that may not have completed yet. It is the central object in concurrent.futures.

Full Future API

```python
from concurrent.futures import ThreadPoolExecutor
import time

def task():
    time.sleep(0.5)
    return "done"

def failing_task():
    raise ValueError("something went wrong")

with ThreadPoolExecutor(max_workers=2) as executor:
    f1 = executor.submit(task)
    f2 = executor.submit(failing_task)

    # Check status without blocking
    print(f1.done())       # False (probably)
    print(f1.running())    # True (probably)
    print(f1.cancelled())  # False

    # Block until the result is ready (optionally with a timeout)
    result = f1.result(timeout=5.0)  # raises TimeoutError if not done within 5s
    print(result)                    # done

    # Exception handling
    exc = f2.exception()  # waits for f2 to finish, then returns the exception instead of raising it
    print(type(exc))      # <class 'ValueError'>

    try:
        f2.result()  # re-raises the stored exception
    except ValueError as e:
        print(f"Caught: {e}")  # Caught: something went wrong
```

add_done_callback()

Register a callback to run when the future completes - useful for non-blocking result handling:

```python
from concurrent.futures import ThreadPoolExecutor

def on_complete(future):
    """Called in the thread that completes the future (or the calling thread)."""
    if future.exception():
        print(f"Task failed: {future.exception()}")
    else:
        print(f"Task succeeded: {future.result()}")

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(lambda: 42)
    future.add_done_callback(on_complete)  # called when the future completes
# Output: Task succeeded: 42
```

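:::warning Callbacks Run in Worker Threads
add_done_callback() callbacks run in the thread that resolves the Future - typically a worker thread, not the main thread. If the Future has already finished when you register the callback, it runs immediately in the thread that called add_done_callback(). If your callback accesses shared state, you need synchronization. An exception raised inside a callback is logged and ignored, so keep callbacks short and safe.
:::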
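A sketch that collects callback results into a lock-protected list, and also shows that registering a callback on a future that has already finished runs it synchronously in the caller's thread. The names collected, record and late_ids are illustrative:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

collected = []
collected_lock = threading.Lock()

def record(future):
    # May run in a worker thread, so guard the shared list with a lock
    with collected_lock:
        collected.append(future.result())

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(pow, n, 2) for n in range(8)]
    for f in futures:
        f.add_done_callback(record)

print(sorted(collected))  # [0, 1, 4, 9, 16, 25, 36, 49]

# futures[0] is finished by now, so this callback runs right here, in this thread
caller_id = threading.get_ident()
late_ids = []
futures[0].add_done_callback(lambda f: late_ids.append(threading.get_ident()))
print(late_ids == [caller_id])  # True
```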

cancel() - Cancelling Pending Tasks

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow():
    time.sleep(10)
    return "result"

with ThreadPoolExecutor(max_workers=1) as executor:
    # Submit 2 tasks to a pool with 1 worker
    f1 = executor.submit(slow)  # picked up by the only worker
    f2 = executor.submit(slow)  # queued behind f1, not yet running
    time.sleep(0.1)             # give the worker a moment to start f1

    # Cancel f2 while it's still PENDING - this works
    cancelled = f2.cancel()
    print(f"f2 cancelled: {cancelled}")  # True

    # Cannot cancel f1 - it's already RUNNING
    cancelled = f1.cancel()
    print(f"f1 cancelled: {cancelled}")  # False

    f1.result()  # wait for f1 (about 10 seconds)
```

A Future can only be cancelled if it has not yet started running. There is no mechanism to interrupt a running thread in Python - design your tasks to check a stop signal periodically if you need cancellation of running tasks.
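A minimal sketch of that stop-signal pattern, assuming the work can be split into short steps. The shared threading.Event named stop and the worker() function are illustrative; a running task ends early only because it chooses to check the flag:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

stop = threading.Event()  # shared stop signal

def worker(task_id: int) -> int:
    """Process up to 1000 small steps, checking the stop signal between steps."""
    steps_done = 0
    for _ in range(1000):
        if stop.is_set():  # cooperative cancellation point
            break
        time.sleep(0.01)   # stand-in for one unit of real work
        steps_done += 1
    return steps_done

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(worker, i) for i in range(2)]
    time.sleep(0.1)  # let the tasks get going
    stop.set()       # ask both running tasks to wind down
    counts = [f.result() for f in futures]

print(counts)  # each count is far below 1000
```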

Part 4 - as_completed(): Results as They Arrive

as_completed(futures) takes an iterable of Future objects and returns an iterator that yields futures as they complete, in completion order - not submission order.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def fetch_data(endpoint_id):
    delay = random.uniform(0.1, 2.0)
    time.sleep(delay)
    return {"id": endpoint_id, "data": f"result_{endpoint_id}", "delay": delay}

endpoints = list(range(10))

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {
        executor.submit(fetch_data, ep_id): ep_id
        for ep_id in endpoints
    }

    for future in as_completed(futures):
        ep_id = futures[future]
        try:
            result = future.result()
            print(f"Endpoint {ep_id} done in {result['delay']:.2f}s: {result['data']}")
        except Exception as e:
            print(f"Endpoint {ep_id} failed: {e}")
```

as_completed() vs Ordered Collection - The Latency Difference

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

tasks = [3, 1, 2]  # task durations in seconds

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(time.sleep, t) for t in tasks]

    # WRONG for latency: waits for task 0 (3s) even though task 1 (1s) is already done
    start = time.time()
    results = [f.result() for f in futures]  # ordered - waits for each in turn
    print(f"Ordered collection: {time.time() - start:.1f}s")  # 3.0s total

    # RIGHT for latency: processes each result as soon as it's ready
    futures = [executor.submit(time.sleep, t) for t in tasks]
    start = time.time()
    for f in as_completed(futures):
        f.result()  # available immediately - as_completed only yields finished futures
        print(f"Got result at t={time.time() - start:.1f}s")
    # Output: results at 1.0s, 2.0s, 3.0s - each processed as it arrives
```

The key insight: if you have 20 tasks and 19 complete in 0.1 seconds but the one submitted first takes 30 seconds, ordered collection means you process nothing for 30 seconds. as_completed() lets you handle the 19 fast results immediately.

:::tip Prefer as_completed() When Results Are Independent
If the results of your tasks do not depend on each other, use as_completed(). It minimises latency - you process each result as it arrives rather than leaving finished results untouched while you wait on a slower task submitted earlier. Use ordered collection only when the order of results matters to your downstream logic.
:::

Part 5 - executor.map(): Uniform Workloads

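executor.map(fn, *iterables, timeout=None, chunksize=1) is the clean API for applying one function to many inputs. It behaves like the built-in map() but runs the calls concurrently. The chunksize argument only has an effect with ProcessPoolExecutor, where it batches inputs to cut inter-process overhead; ThreadPoolExecutor ignores it.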

```python
from concurrent.futures import ThreadPoolExecutor
import time

def process(item):
    time.sleep(0.5)
    return item * 2

items = list(range(10))

with ThreadPoolExecutor(max_workers=5) as executor:
    # map() submits all tasks and returns results in INPUT ORDER
    results = list(executor.map(process, items))
    print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

map() with Multiple Iterables

```python
from concurrent.futures import ThreadPoolExecutor

def add(a, b):
    return a + b

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(add, [1, 2, 3, 4], [10, 20, 30, 40]))
    print(results)  # [11, 22, 33, 44]
```

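map() Returns an Iterator

executor.map() submits every task as soon as it is called and returns an iterator. The tasks run concurrently in the background; iterating yields their results in input order, blocking whenever the next result in line is not ready yet: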

```python
# Reuses process() and items from the previous example
with ThreadPoolExecutor(max_workers=4) as executor:
    result_iter = executor.map(process, items)  # all tasks are submitted here

    # Results are yielded as you iterate - in input order
    for result in result_iter:
        print(result)  # blocks if the next result isn't ready yet
```
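To see that submission happens inside the map() call rather than during iteration, this sketch records each task as it runs and inspects the record before consuming the iterator. record and started are illustrative names, and the 0.5s pause is just a generous margin for the workers to finish:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

started = []
started_lock = threading.Lock()

def record(n):
    with started_lock:
        started.append(n)
    return n * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    result_iter = executor.map(record, range(8))  # nothing consumed yet
    time.sleep(0.5)                               # give the workers time to run
    ran_before_iterating = sorted(started)        # every task has already run
    results = list(result_iter)

print(ran_before_iterating)  # [0, 1, 2, 3, 4, 5, 6, 7]
print(results)               # [0, 2, 4, 6, 8, 10, 12, 14]
```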

map() with Timeout

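```python
# Before Python 3.11, concurrent.futures.TimeoutError is not the builtin TimeoutError
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError

with ThreadPoolExecutor(max_workers=4) as executor:
    try:
        # timeout is measured from the original map() call, not per item
        results = list(executor.map(process, items, timeout=5.0))
    except FuturesTimeoutError:
        print("Not all results were ready within 5 seconds")
# Leaving the with block still waits for any tasks that were already running
```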

map() Exception Handling

Unlike submit(), which hands you a Future per task that you can inspect individually, map() re-raises a task's exception when iteration reaches that task's position:

```python
from concurrent.futures import ThreadPoolExecutor

def maybe_fail(n):
    if n == 5:
        raise ValueError(f"Failed on {n}")
    return n * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    results_iter = executor.map(maybe_fail, range(10))
    try:
        results = list(results_iter)  # raises ValueError when n=5 is reached
    except ValueError as e:
        print(f"map() raised: {e}")
        # list() never returned, so the values for n=0..4 are discarded too,
        # and the iterator cannot be resumed to reach n=6..9
```

This is the key limitation of map(): a single exception stops iteration. For robust error handling where you want to continue processing despite some failures, use submit() + as_completed().
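If you do stay with map(), consuming the iterator in an explicit loop at least keeps the results yielded before the failure. A short sketch (maybe_fail() is redefined here so the block runs on its own):

```python
from concurrent.futures import ThreadPoolExecutor

def maybe_fail(n):
    if n == 5:
        raise ValueError(f"Failed on {n}")
    return n * 2

partial = []
error = None
with ThreadPoolExecutor(max_workers=4) as executor:
    try:
        for value in executor.map(maybe_fail, range(10)):
            partial.append(value)  # keep each result as soon as it is yielded
    except ValueError as e:
        error = e  # iteration ends here; results for n=6..9 stay unreachable

print(partial)  # [0, 2, 4, 6, 8]
print(error)    # Failed on 5
```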

Part 6 - Exception Handling Patterns

Pattern 1: Per-Task Exception Handling with as_completed()

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def risky_task(n):
    if n % 3 == 0:
        raise RuntimeError(f"Task {n} failed")
    return n * 10

tasks = list(range(12))
successes = []
failures = []

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(risky_task, n): n for n in tasks}

    for future in as_completed(futures):
        n = futures[future]
        try:
            result = future.result()
            successes.append((n, result))
        except RuntimeError as e:
            failures.append((n, str(e)))

print(f"Succeeded: {len(successes)}, Failed: {len(failures)}")
print(f"Failures: {failures}")
```

Pattern 2: Timeout per Task

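```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError

def slow_task(n):
    time.sleep(n)
    return n

TIME_BUDGET = 4.0  # seconds allowed per task

with ThreadPoolExecutor(max_workers=5) as executor:
    # 5 workers for 5 tasks: every task starts at (almost) the same moment
    start = time.monotonic()
    futures = [executor.submit(slow_task, i) for i in [1, 5, 2, 8, 3]]

    results = []
    for future in futures:
        # result(timeout=...) counts from the moment it is called, so passing 4.0 to
        # every call would let later tasks run well past 4s (the 8s task would
        # succeed). Pass the time left until a shared deadline instead.
        remaining = max(0.0, start + TIME_BUDGET - time.monotonic())
        try:
            results.append(future.result(timeout=remaining))
        except FuturesTimeoutError:
            results.append(None)
            print("Task timed out")
        except Exception as e:
            results.append(None)
            print(f"Task failed: {e}")
    # Timed-out tasks keep running; leaving the with block waits for them (~8s)

print(results)  # [1, None, 2, None, 3] - tasks over 4s are None
```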
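When what you want is one deadline for a whole batch, concurrent.futures.wait() expresses it in a single call: it returns a (done, not_done) pair of sets once every future has finished or the timeout expires, whichever comes first. A scaled-down sketch with sub-second durations (slow_task and the delays are illustrative):

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

def slow_task(seconds):
    time.sleep(seconds)
    return seconds

delays = [0.1, 1.5, 0.2, 2.0, 0.3]

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(slow_task, d) for d in delays]
    # Block until everything finishes or 1 second passes, whichever comes first
    done, not_done = wait(futures, timeout=1.0)
    results = [f.result() if f in done else None for f in futures]

print(results)  # [0.1, None, 0.2, None, 0.3]
```

As with the loop version, the unfinished tasks keep running until the with block exits.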

Pattern 3: Retry on Failure

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import time

def fetch_with_retry(url: str, max_retries: int = 3) -> str:
    """Retry logic lives inside the task function."""
    for attempt in range(max_retries):
        try:
            # Simulated fetch: fails about half the time
            if random.random() < 0.5:
                raise ConnectionError(f"Failed to connect to {url}")
            return f"data from {url}"
        except ConnectionError:
            if attempt == max_retries - 1:
                raise  # re-raise on the final attempt
            time.sleep(2 ** attempt)  # exponential backoff: 1s, then 2s
    raise ValueError("max_retries must be at least 1")

urls = [f"https://api.example.com/endpoint/{i}" for i in range(5)]

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(fetch_with_retry, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            print(f"{url}: {future.result()}")
        except ConnectionError as e:
            print(f"{url}: permanently failed - {e}")
```
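Because random failures make the retry path hard to observe, here is a deterministic sketch: a generic retry() helper plus a hypothetical FlakyService that fails a fixed number of times per key. Both names are illustrative, not a library API, and base_delay is shortened to 0.01s so the example runs quickly:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def retry(fn, *args, max_retries=3, base_delay=0.01, retry_on=(ConnectionError,)):
    """Call fn(*args); on a retry_on error, sleep base_delay * 2**attempt and retry."""
    for attempt in range(max_retries):
        try:
            return fn(*args)
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: let the caller see the error
            time.sleep(base_delay * 2 ** attempt)
    raise ValueError("max_retries must be at least 1")

class FlakyService:
    """Hypothetical service that fails the first failures[key] calls for each key."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = {}
        self._lock = threading.Lock()  # fetch() runs in several worker threads

    def fetch(self, key):
        with self._lock:
            self.calls[key] = self.calls.get(key, 0) + 1
            call_number = self.calls[key]
        if call_number <= self.failures.get(key, 0):
            raise ConnectionError(f"{key}: call {call_number} failed")
        return f"data for {key}"

service = FlakyService({"a": 0, "b": 2, "c": 5})
outcomes = {}
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(retry, service.fetch, key): key for key in "abc"}
    for future in as_completed(futures):
        key = futures[future]
        try:
            outcomes[key] = future.result()
        except ConnectionError as e:
            outcomes[key] = f"permanently failed - {e}"

print(outcomes["a"], outcomes["b"], outcomes["c"], sep="\n")
print(service.calls)  # {'a': 1, 'b': 3, 'c': 3} (key order may vary)
```

"b" recovers on its third call, while "c" exhausts all three attempts and surfaces its last ConnectionError through future.result().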

Part 7 - ProcessPoolExecutor: Same Interface, Bypasses GIL

ProcessPoolExecutor uses separate processes instead of threads. Each process has its own Python interpreter and GIL, so CPU-bound work runs truly in parallel.

```python
from concurrent.futures import ProcessPoolExecutor
import math

def is_prime(n: int) -> bool:
    """CPU-intensive primality check."""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True

if __name__ == "__main__":  # needed when workers are started with spawn/forkserver
    numbers = list(range(10_000_000, 10_001_000))

    # ThreadPoolExecutor - the GIL limits true parallelism for CPU work
    # ProcessPoolExecutor - true parallelism, each process runs independently
    with ProcessPoolExecutor(max_workers=4) as executor:
        # chunksize sends inputs in batches, reducing inter-process round trips
        flags = executor.map(is_prime, numbers, chunksize=100)
        primes = [n for n, is_p in zip(numbers, flags) if is_p]

    print(f"Found {len(primes)} primes")
```

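When to Use Thread Pool vs Process Pool

| | ThreadPoolExecutor | ProcessPoolExecutor |
|---|---|---|
| Best for | I/O-bound work (network, disk, waiting) | CPU-bound work (computation) |
| GIL | Applies - one thread runs Python bytecode at a time | Bypassed - each process has its own interpreter and GIL |
| Memory | Shared between workers | Separate per process |
| Arguments and results | Any Python object | Must be picklable |
| Startup cost | Low | Higher per process - not worth it for tiny tasks |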

Key Constraints of ProcessPoolExecutor

```python
from concurrent.futures import ProcessPoolExecutor

# 1. The function and its arguments must be picklable
def add(a, b):
    return a + b

if __name__ == "__main__":
    # OK: add is a module-level function and (1, 2) is plain data
    with ProcessPoolExecutor() as executor:
        result = executor.submit(add, 1, 2).result()

        # NOT OK: lambdas cannot be pickled. submit() still returns a Future,
        # but calling .result() on it raises a pickling error.
        # executor.submit(lambda x: x * 2, 5).result()

# 2. Results must also be picklable
# 3. Shared state requires multiprocessing primitives (Manager, Queue)
# 4. Each worker process has startup overhead - not worth it for tiny tasks
```

:::warning ProcessPoolExecutor Requires Picklable Arguments and Results
Everything passed to or returned from a process pool worker must be serialisable by pickle. Lambdas, local functions, open file handles, and locks are not picklable. Restructure your tasks to use module-level functions and plain data types.
:::
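One cheap way to catch these problems before they surface inside a worker is to try pickling the object yourself. A minimal sketch; is_picklable is a hypothetical helper, not part of concurrent.futures, and it only checks serialisation in the current process:

```python
import pickle
import threading

def is_picklable(obj) -> bool:
    """Return True if pickle can serialise obj, as a process pool must."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True

print(is_picklable((1, 2)))            # True  - plain data
print(is_picklable(abs))               # True  - builtins pickle by reference
print(is_picklable(lambda x: x * 2))   # False - lambdas have no importable name
print(is_picklable(threading.Lock()))  # False - locks cannot be pickled
```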

Part 8 - Choosing Pool Size

Thread Pool: I/O-Bound Work

For I/O-bound work, threads spend most of their time waiting - so you can run many more threads than CPU cores:

```python
import os
from concurrent.futures import ThreadPoolExecutor

cpu_count = os.cpu_count() or 4

# Common heuristics for I/O-bound thread pools:
# - Python 3.8+ default: min(32, cpu_count + 4)
# - Aggressive: cpu_count * 10 (many short I/O waits)
# - Conservative: cpu_count * 2 (some CPU work mixed in)

# For HTTP API calls (mostly waiting):
io_pool = ThreadPoolExecutor(max_workers=cpu_count * 5)

# For file I/O (disk-speed limited):
file_pool = ThreadPoolExecutor(max_workers=cpu_count * 2)
```

Process Pool: CPU-Bound Work

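For CPU-bound work, start with as many processes as logical CPUs, which is roughly what ProcessPoolExecutor uses when max_workers is omitted. Going beyond that usually adds context-switching and memory overhead without extra throughput: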

```python
import os
from concurrent.futures import ProcessPoolExecutor

cpu_pool = ProcessPoolExecutor(max_workers=os.cpu_count())
```

Empirical Tuning

The best pool size depends on your workload. Benchmark it:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def benchmark_pool_size(task_fn, tasks, worker_counts):
    for workers in worker_counts:
        start = time.perf_counter()
        with ThreadPoolExecutor(max_workers=workers) as executor:
            list(executor.map(task_fn, tasks))
        elapsed = time.perf_counter() - start
        print(f"workers={workers:3d}: {elapsed:.2f}s")

# Run with different worker counts and observe where throughput peaks.
# my_io_task and my_tasks are placeholders for your own task function and a
# re-iterable collection of inputs (e.g. a list).
benchmark_pool_size(my_io_task, my_tasks, [2, 4, 8, 16, 32, 64])
```
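To try the benchmark without a real workload, you can plug in a simulated I/O task. In this sketch, simulated_io_task is a hypothetical stand-in that sleeps for 50ms, and this variant of benchmark_pool_size() returns its timings so they can be compared programmatically:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def simulated_io_task(_):
    """Hypothetical stand-in for a network call: waits 50ms without using the CPU."""
    time.sleep(0.05)

def benchmark_pool_size(task_fn, tasks, worker_counts):
    tasks = list(tasks)  # reused for every run, so materialise once
    timings = {}
    for workers in worker_counts:
        start = time.perf_counter()
        with ThreadPoolExecutor(max_workers=workers) as executor:
            list(executor.map(task_fn, tasks))
        timings[workers] = time.perf_counter() - start
        print(f"workers={workers:3d}: {timings[workers]:.2f}s")
    return timings

# 40 tasks x 50ms: ideally ~1.0s with 2 workers, ~0.25s with 8, ~0.1s with 32
timings = benchmark_pool_size(simulated_io_task, range(40), [2, 8, 32])
```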

Part 9 - Combining asyncio with ThreadPoolExecutor

run_in_executor() bridges the async and sync worlds: it runs a blocking function in a thread pool without blocking the event loop.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def blocking_io(n):
    """A blocking function that cannot be made async."""
    time.sleep(n)
    return f"done in {n}s"

async def main():
    loop = asyncio.get_running_loop()

    # Run blocking_io in a thread pool - the event loop stays free
    with ThreadPoolExecutor(max_workers=5) as pool:
        # asyncio.gather runs all three concurrently
        results = await asyncio.gather(
            loop.run_in_executor(pool, blocking_io, 1),
            loop.run_in_executor(pool, blocking_io, 2),
            loop.run_in_executor(pool, blocking_io, 3),
        )
    print(results)  # ['done in 1s', 'done in 2s', 'done in 3s'] after ~3s total

asyncio.run(main())
```

Setting the Default Executor

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

async def main():
    loop = asyncio.get_running_loop()

    # Set a custom default executor for this event loop
    loop.set_default_executor(ThreadPoolExecutor(max_workers=20))

    # run_in_executor(None, ...) uses the default executor
    result = await loop.run_in_executor(None, time.sleep, 1)

asyncio.run(main())  # asyncio.run() shuts the default executor down on exit
```

Python 3.9+ - asyncio.to_thread()

Python 3.9 introduced a cleaner API for the common case:

```python
import asyncio
import time

def blocking_fetch(url):
    time.sleep(1)  # blocking I/O
    return f"data from {url}"

async def main():
    # asyncio.to_thread() wraps run_in_executor with the default thread pool
    result = await asyncio.to_thread(blocking_fetch, "https://example.com")
    print(result)

    # Concurrent execution
    results = await asyncio.gather(
        asyncio.to_thread(blocking_fetch, "https://api.one.com"),
        asyncio.to_thread(blocking_fetch, "https://api.two.com"),
        asyncio.to_thread(blocking_fetch, "https://api.three.com"),
    )
    print(results)

asyncio.run(main())
```
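asyncio.to_thread() hands its work to the loop's default executor, so a pool installed with set_default_executor() also serves to_thread() calls. A small sketch that checks this through thread names; the io-pool prefix and which_thread() are illustrative:

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

def which_thread():
    """Report the name of the thread this call runs in."""
    return threading.current_thread().name

async def main():
    loop = asyncio.get_running_loop()
    loop.set_default_executor(ThreadPoolExecutor(max_workers=4, thread_name_prefix="io-pool"))
    # to_thread() submits to the default executor installed above
    return await asyncio.gather(*(asyncio.to_thread(which_thread) for _ in range(3)))

names = asyncio.run(main())  # asyncio.run() also shuts the default executor down
print(names)  # e.g. ['io-pool_0', 'io-pool_1', 'io-pool_2'] - exact names depend on scheduling
```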

Full Example: Concurrent API Client with Timeout and Error Handling

A production-grade API client that fetches from 20 endpoints concurrently, handles timeouts, retries transient failures, and reports results as they arrive:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeoutError
import urllib.request
import urllib.error
import time
import json
import logging
from dataclasses import dataclass
from typing import Optional

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(threadName)s: %(message)s",
    datefmt="%H:%M:%S",
)

@dataclass
class FetchResult:
    endpoint_id: int
    url: str
    success: bool
    data: Optional[dict] = None
    error: Optional[str] = None
    duration_ms: float = 0.0
    attempts: int = 0


def fetch_endpoint(
    endpoint_id: int,
    base_url: str,
    timeout: float = 5.0,
    max_retries: int = 3,
) -> FetchResult:
    """
    Fetch a single endpoint with retry logic and timing.
    Retries only on transient errors (5xx responses, timeouts, connection errors).
    """
    url = f"{base_url}/endpoint/{endpoint_id}"
    start = time.monotonic()
    last_error = "no attempts made"

    for attempt in range(1, max_retries + 1):
        try:
            req = urllib.request.Request(
                url,
                headers={"User-Agent": "ConcurrentAPIClient/1.0"},
            )
            with urllib.request.urlopen(req, timeout=timeout) as response:
                raw = response.read()
            data = json.loads(raw)
            duration = (time.monotonic() - start) * 1000
            logging.info(f"Endpoint {endpoint_id}: OK in {duration:.0f}ms")
            return FetchResult(
                endpoint_id=endpoint_id,
                url=url,
                success=True,
                data=data,
                duration_ms=duration,
                attempts=attempt,
            )

        except urllib.error.HTTPError as e:
            # HTTP 4xx/5xx - 4xx are not retryable, 5xx are
            if e.code < 500:
                duration = (time.monotonic() - start) * 1000
                return FetchResult(
                    endpoint_id=endpoint_id,
                    url=url,
                    success=False,
                    error=f"HTTP {e.code}: {e.reason}",
                    duration_ms=duration,
                    attempts=attempt,
                )
            # 5xx: retryable - fall through to the backoff below
            last_error = f"HTTP {e.code}"

        except (urllib.error.URLError, TimeoutError, OSError) as e:
            # Builtin TimeoutError: socket timeouts are OSError subclasses
            last_error = str(e)

        # Exponential backoff before the next attempt
        if attempt < max_retries:
            backoff = 0.5 * (2 ** (attempt - 1))  # 0.5s, then 1s for max_retries=3
            logging.warning(
                f"Endpoint {endpoint_id}: attempt {attempt} failed - "
                f"retrying in {backoff:.1f}s ({last_error})"
            )
            time.sleep(backoff)

    duration = (time.monotonic() - start) * 1000
    logging.error(f"Endpoint {endpoint_id}: all {max_retries} attempts failed")
    return FetchResult(
        endpoint_id=endpoint_id,
        url=url,
        success=False,
        error=last_error,
        duration_ms=duration,
        attempts=max_retries,
    )


def fetch_all_endpoints(
    base_url: str,
    endpoint_ids: list[int],
    max_workers: int = 10,
    overall_timeout: float = 30.0,
) -> list[FetchResult]:
    """
    Fetch all endpoints concurrently.
    Results are collected as they complete - fast endpoints are processed first.
    overall_timeout bounds the whole batch (as_completed's timeout), not each task.
    """
    results: list[FetchResult] = []
    total = len(endpoint_ids)
    completed = 0

    with ThreadPoolExecutor(
        max_workers=max_workers,
        thread_name_prefix="APIFetcher",
    ) as executor:
        # Submit all tasks
        future_to_id = {
            executor.submit(fetch_endpoint, ep_id, base_url): ep_id
            for ep_id in endpoint_ids
        }
        seen = set()

        try:
            # Process results as they arrive - not in submission order
            for future in as_completed(future_to_id, timeout=overall_timeout):
                seen.add(future)
                ep_id = future_to_id[future]
                completed += 1

                try:
                    result = future.result()
                except Exception as e:
                    # Rare: fetch_endpoint returns a FetchResult for expected errors,
                    # but e.g. a non-JSON response body would land here
                    result = FetchResult(
                        endpoint_id=ep_id,
                        url=f"{base_url}/endpoint/{ep_id}",
                        success=False,
                        error=f"Unexpected: {e}",
                    )

                results.append(result)
                status = "OK" if result.success else "FAIL"
                logging.info(
                    f"[{completed}/{total}] Endpoint {ep_id}: {status} "
                    f"({result.duration_ms:.0f}ms, {result.attempts} attempt(s))"
                )
        except FuturesTimeoutError:
            # Record every endpoint that had not been reported when time ran out
            for future, ep_id in future_to_id.items():
                if future not in seen:
                    future.cancel()  # only succeeds if the task has not started
                    results.append(FetchResult(
                        endpoint_id=ep_id,
                        url=f"{base_url}/endpoint/{ep_id}",
                        success=False,
                        error=f"Not finished within {overall_timeout}s",
                    ))
        # Leaving the with block still waits for tasks that were already running

    return results


def print_summary(results: list[FetchResult]) -> None:
    successes = [r for r in results if r.success]
    failures = [r for r in results if not r.success]
    avg_ms = sum(r.duration_ms for r in results) / len(results) if results else 0

    print(f"\n{'=' * 50}")
    print(f"SUMMARY: {len(successes)}/{len(results)} succeeded")
    print(f"Average duration: {avg_ms:.0f}ms")
    if failures:
        print("\nFailed endpoints:")
        for r in failures:
            print(f"  Endpoint {r.endpoint_id}: {r.error}")
    print("=" * 50)


if __name__ == "__main__":
    # httpbin.org has no /endpoint/N route, but its /anything/ route echoes any
    # sub-path back as JSON, so base_url + "/endpoint/N" gets a real response
    BASE_URL = "https://httpbin.org/anything"  # public test API
    ENDPOINT_IDS = list(range(1, 21))  # 20 endpoints

    start = time.monotonic()
    results = fetch_all_endpoints(
        base_url=BASE_URL,
        endpoint_ids=ENDPOINT_IDS,
        max_workers=10,
        overall_timeout=60.0,
    )
    elapsed = time.monotonic() - start

    print_summary(results)
    print(f"Total wall time: {elapsed:.2f}s")
```

submit() vs map() vs as_completed() - When to Use Which

| API | Order | Exception handling | Best for |
|---|---|---|---|
| executor.map(fn, items) | Input order preserved | Raised on iteration, stops all | Uniform tasks, ordered results needed |
| executor.submit() + list comprehension | Input order | Raised on .result() | When you want ordered results but per-task error handling |
| executor.submit() + as_completed() | Completion order | Per-future try/except | Heterogeneous tasks, minimise latency, per-task errors |

Graded Practice Challenges

Beginner - Parallel File Reader

Write a function read_files_concurrently(paths) that reads a list of file paths in parallel using ThreadPoolExecutor and returns a dict mapping path → content. Handle FileNotFoundError gracefully (map missing files to None).

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Optional

def read_files_concurrently(paths: list[str]) -> dict[str, Optional[str]]:
    # Your implementation here
    pass

# Test:
import tempfile, os

with tempfile.TemporaryDirectory() as tmpdir:
    # Create some test files
    for i in range(5):
        Path(os.path.join(tmpdir, f"file_{i}.txt")).write_text(f"content {i}")

    test_paths = [
        os.path.join(tmpdir, f"file_{i}.txt") for i in range(5)
    ] + ["/nonexistent/file.txt"]

    results = read_files_concurrently(test_paths)
    for path, content in results.items():
        print(f"{os.path.basename(path)}: {content!r}")
```

Solution
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Optional

def read_files_concurrently(paths: list[str]) -> dict[str, Optional[str]]:
    results: dict[str, Optional[str]] = {}

    def read_one(path: str) -> tuple[str, Optional[str]]:
        try:
            return path, Path(path).read_text()
        except FileNotFoundError:
            return path, None

    # max(1, ...) because ThreadPoolExecutor rejects max_workers=0 (empty input)
    with ThreadPoolExecutor(max_workers=max(1, min(len(paths), 10))) as executor:
        futures = {executor.submit(read_one, p): p for p in paths}
        for future in as_completed(futures):
            path, content = future.result()
            results[path] = content

    return results
```

Key points:

  • max(1, min(len(paths), 10)) - no point creating more workers than tasks, and max_workers must be at least 1 even for an empty list
  • The inner read_one function returns a tuple so no shared state or locking is needed
  • FileNotFoundError is caught inside the task - future.result() will not raise it

Intermediate - Parallel Data Pipeline with Progress Tracking

Build a function parallel_pipeline(items, transform_fn, max_workers) that:

  1. Processes all items concurrently using ThreadPoolExecutor
  2. Tracks and prints progress as each item completes (e.g., 3/10 complete)
  3. Collects results in completion order (fast items first)
  4. Separates successes from failures without stopping on errors

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Any
import time
import random

def parallel_pipeline(
    items: list,
    transform_fn: Callable,
    max_workers: int = 4,
) -> tuple[list, list]:
    """Returns (successes, failures) where failures are (item, exception) tuples."""
    # Your implementation here
    pass

# Test:
def flaky_transform(x):
    time.sleep(random.uniform(0.1, 0.5))
    if x % 4 == 0:
        raise ValueError(f"Cannot process {x}")
    return x * x

successes, failures = parallel_pipeline(range(16), flaky_transform, max_workers=4)
print(f"Successes: {sorted(successes)}")
print(f"Failures: {[(item, str(err)) for item, err in failures]}")
```

Solution
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable
import time
import random

def parallel_pipeline(
    items: list,
    transform_fn: Callable,
    max_workers: int = 4,
) -> tuple[list, list]:
    items = list(items)
    total = len(items)
    completed = 0
    successes = []
    failures = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_item = {
            executor.submit(transform_fn, item): item
            for item in items
        }

        for future in as_completed(future_to_item):
            item = future_to_item[future]
            completed += 1
            print(f" {completed}/{total} complete", end="\r", flush=True)

            try:
                result = future.result()
                successes.append(result)
            except Exception as e:
                failures.append((item, e))

    print()  # newline after progress line
    return successes, failures
```

The {completed}/{total} complete line uses \r (carriage return) to overwrite itself on each update - giving a live progress display without flooding the terminal.

Advanced - Adaptive Concurrent Fetcher with Circuit Breaker

Implement a ResilientFetcher class that uses ThreadPoolExecutor to fetch URLs concurrently and includes a circuit breaker: after 3 consecutive failures for a domain, stop sending requests to that domain for 30 seconds.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import defaultdict
from urllib.request import urlopen
from urllib.error import URLError
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class CircuitBreaker:
    """Opens after `threshold` consecutive failures; auto-resets after `reset_timeout` seconds."""
    threshold: int = 3
    reset_timeout: float = 30.0
    failure_count: int = 0
    opened_at: Optional[float] = None
    lock: threading.Lock = field(default_factory=threading.Lock)

    def is_open(self) -> bool:
        with self.lock:
            if self.opened_at is None:
                return False
            if time.time() - self.opened_at > self.reset_timeout:
                # Reset timeout elapsed: close the circuit again. (A true half-open
                # state would let a single trial request through first.)
                self.failure_count = 0
                self.opened_at = None
                return False
            return True

    def record_success(self):
        with self.lock:
            self.failure_count = 0
            self.opened_at = None

    def record_failure(self):
        with self.lock:
            self.failure_count += 1
            if self.failure_count >= self.threshold:
                self.opened_at = time.time()


class ResilientFetcher:
    def __init__(self, max_workers: int = 10):
        # Your implementation here
        pass

    def fetch_all(self, urls: list[str]) -> dict[str, Optional[str]]:
        # Your implementation here
        pass


# Test:
# Results are keyed by URL, so repeated URLs appear only once in the output.
# With 5 workers, all five requests may start before the third failure is
# recorded, so the last URL is not guaranteed to be skipped; max_workers=1
# makes the order deterministic.
fetcher = ResilientFetcher(max_workers=5)
urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/status/500",  # will fail
    "https://httpbin.org/status/500",
    "https://httpbin.org/status/500",  # 3rd failure: circuit opens for httpbin.org
    "https://httpbin.org/get",         # skipped if the circuit is open when it starts
]
results = fetcher.fetch_all(urls)
for url, data in results.items():
    print(f"{url}: {'OK' if data else 'BLOCKED/FAILED'}")
```

Solution
```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
from urllib.parse import urlparse
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class CircuitBreaker:
    threshold: int = 3
    reset_timeout: float = 30.0
    failure_count: int = 0
    opened_at: Optional[float] = None
    lock: threading.Lock = field(default_factory=threading.Lock)

    def is_open(self) -> bool:
        with self.lock:
            if self.opened_at is None:
                return False
            if time.time() - self.opened_at > self.reset_timeout:
                self.failure_count = 0
                self.opened_at = None
                return False
            return True

    def record_success(self):
        with self.lock:
            self.failure_count = 0
            self.opened_at = None

    def record_failure(self):
        with self.lock:
            self.failure_count += 1
            if self.failure_count >= self.threshold:
                if self.opened_at is None:
                    self.opened_at = time.time()
                    print(f"Circuit opened after {self.failure_count} failures")


class ResilientFetcher:
    def __init__(self, max_workers: int = 10):
        self._max_workers = max_workers
        self._breakers: dict[str, CircuitBreaker] = {}
        self._lock = threading.Lock()

    def _get_breaker(self, domain: str) -> CircuitBreaker:
        with self._lock:
            if domain not in self._breakers:
                self._breakers[domain] = CircuitBreaker()
            return self._breakers[domain]

    def _fetch_one(self, url: str) -> Optional[str]:
        domain = urlparse(url).netloc
        breaker = self._get_breaker(domain)

        if breaker.is_open():
            print(f"Circuit OPEN for {domain} - skipping {url}")
            return None

        try:
            req = Request(url, headers={"User-Agent": "ResilientFetcher/1.0"})
            with urlopen(req, timeout=5.0) as r:
                data = r.read().decode("utf-8")
            breaker.record_success()
            return data

        except (HTTPError, URLError, OSError) as e:
            breaker.record_failure()
            print(f"Fetch failed: {url} - {e}")
            return None

    def fetch_all(self, urls: list[str]) -> dict[str, Optional[str]]:
        results: dict[str, Optional[str]] = {}

        with ThreadPoolExecutor(
            max_workers=self._max_workers,
            thread_name_prefix="ResilientFetcher"
        ) as executor:
            future_to_url = {
                executor.submit(self._fetch_one, url): url
                for url in urls
            }

            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    results[url] = future.result()
                except Exception as e:
                    print(f"Unexpected error for {url}: {e}")
                    results[url] = None

        return results
```

The circuit breaker is per-domain and uses its own Lock for thread safety. The ResilientFetcher uses a separate Lock to guard the _breakers dict itself (so two threads don't create a breaker for the same domain simultaneously). Each CircuitBreaker instance then manages its own state thread-safely.

Key Takeaways

  • concurrent.futures provides a unified executor API - swap ThreadPoolExecutor for ProcessPoolExecutor by changing one class name
  • Always use ThreadPoolExecutor as a context manager - __exit__ waits for all tasks and shuts down cleanly
  • submit(fn, *args) returns a Future immediately - call future.result() to block and get the value (or re-raise a stored exception)
  • as_completed(futures) yields futures in completion order - use it to process fast results without waiting for slow ones; this is the primary tool for low-latency result handling
  • executor.map(fn, items) preserves input order and raises exceptions on iteration - use it for uniform tasks where order matters and partial failure is unacceptable
  • Exceptions in submitted tasks are stored in the Future, not raised at submit() time - they are raised when you call future.result()
  • ProcessPoolExecutor has the same interface as ThreadPoolExecutor but runs in separate processes - use it for CPU-bound work; requires picklable arguments and return values
  • Pool sizing: I/O-bound thread pools → several times the CPU count (roughly 2-10x; benchmark to find the peak); CPU-bound process pools → about the CPU count
  • loop.run_in_executor(pool, fn) or asyncio.to_thread(fn) integrates blocking functions into asyncio without blocking the event loop

What's Next

Lesson 08 - Async API Service

You now have all the concurrency primitives: threads, processes, asyncio, locks, and pools. The next lesson applies them together in a real production scenario - building a fully async API service with FastAPI that handles concurrent requests, talks to external services with connection pooling, and stays responsive under load.

© 2026 EngineersOfAI. All rights reserved.