Python ThreadPoolExecutor Practice Problems & Exercises
Practice: ThreadPoolExecutor and ProcessPoolExecutor
← Back to lesson
Use ThreadPoolExecutor.map() to apply a function to a list in parallel, preserving order.
Solution:
from concurrent.futures import ThreadPoolExecutor
def parallel_double(numbers):
    """Double every value in *numbers* concurrently, keeping input order.

    ``Executor.map`` yields results in the order of its input iterable,
    so the returned list lines up with *numbers* even though the
    ``slow_double`` calls overlap on up to five worker threads.
    """
    with ThreadPoolExecutor(max_workers=5) as pool:
        doubled = pool.map(slow_double, numbers)
        return list(doubled)
from concurrent.futures import ThreadPoolExecutor
import time
def slow_double(n):
    """Return ``n * 2`` after sleeping 0.1 s to simulate a slow task."""
    time.sleep(0.1)
    doubled = n * 2
    return doubled
# TODO: Use ThreadPoolExecutor.map() to double all numbers in parallel.
# Return the results as a list.
def parallel_double(numbers):
    """Exercise stub: double *numbers* in parallel with ThreadPoolExecutor.map().

    Replace the body so it returns the doubled values as a list in input
    order; until then this placeholder returns None.
    """
    return None
# Exercise driver: once parallel_double is implemented this prints [2, 4, 6, 8, 10].
result = parallel_double([1, 2, 3, 4, 5])
print(result)
Expected Output
[2, 4, 6, 8, 10]
Hints
Hint 1: with ThreadPoolExecutor(max_workers=5) as executor: results = list(executor.map(fn, items))
Hint 2: executor.map preserves the order of results (unlike as_completed).
Hint 3: The context manager automatically shuts down the pool on exit.
Use executor.submit() to submit tasks and Future.result() to retrieve their outputs.
Solution:
from concurrent.futures import ThreadPoolExecutor
def parallel_compute():
    """Raise each base to its power on a thread pool and return the results.

    One Future is created per ``(base, power)`` pair. Reading the results
    back in submission order keeps the output aligned with ``tasks``.
    """
    tasks = [(2, 10), (3, 5), (5, 4), (10, 3)]
    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = []
        for base, power in tasks:
            pending.append(pool.submit(compute, base, power))
        return [fut.result() for fut in pending]
from concurrent.futures import ThreadPoolExecutor
import time
def compute(base, power):
    """Return ``base ** power`` after a 0.05 s simulated delay."""
    time.sleep(0.05)
    result = base ** power
    return result
# TODO: Submit 4 computations using executor.submit().
# Collect the futures and retrieve results.
# Return results as a list.
def parallel_compute():
    """Exercise stub: submit one compute() call per (base, power) pair.

    Fill in the body to submit each task with executor.submit() and return
    the results in submission order; as written it returns None.
    """
    tasks = [(2, 10), (3, 5), (5, 4), (10, 3)]
    return None
# Exercise driver: prints [1024, 243, 625, 1000] once parallel_compute is implemented.
print(parallel_compute())
Expected Output
[1024, 243, 625, 1000]
Hints
Hint 1: futures = [executor.submit(compute, b, p) for b, p in tasks]
Hint 2: future.result() blocks until the task completes and returns the value.
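Hint 3: submit() gives you one Future per task; calling result() on the futures in the order you submitted them keeps the outputs in input order, even if the tasks finish in a different order.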
Handle exceptions raised in submitted tasks using Future.exception() and Future.result().
Solution:
from concurrent.futures import ThreadPoolExecutor
def run_with_error_handling():
    """Run ``risky_task(n)`` for n = 1..6 and split the outcomes.

    Returns:
        A ``(successes, errors)`` tuple. ``successes`` holds the values
        returned by tasks that finished normally. ``errors`` holds
        ``str(exc)`` for every task that raised. Both lists follow
        submission order.
    """
    successes = []
    errors = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(risky_task, n) for n in range(1, 7)]
        for future in futures:
            # exception() blocks until the task finishes and returns None on
            # success. Compare against None explicitly instead of relying on
            # the exception object's truthiness.
            exc = future.exception()
            if exc is not None:
                errors.append(str(exc))
            else:
                successes.append(future.result())
    return successes, errors
from concurrent.futures import ThreadPoolExecutor
def risky_task(n):
    """Return ``n * 10``, or raise ValueError when *n* is a multiple of 3."""
    if n % 3 != 0:
        return n * 10
    raise ValueError(f'Bad value: {n}')
# TODO: Submit tasks for n in 1..6.
# Separate successes from failures.
# Return (successes, errors) as two lists.
def run_with_error_handling():
    """Exercise stub: run risky_task for n in 1..6 and split the outcomes.

    The finished version returns ``(successes, errors)``; this placeholder
    returns None.
    """
    return None
# Exercise driver: sorted() makes the printed lists independent of the order
# in which the implementation collects its results.
successes, errors = run_with_error_handling()
print('Successes:', sorted(successes))
print('Errors:', sorted(errors))
Expected Output
Successes: [10, 20, 40, 50]
Errors: ['Bad value: 3', 'Bad value: 6']
Hints
Hint 1: future.exception() returns the exception if one was raised, or None on success.
Hint 2: future.result() raises the exception — wrap in try/except.
Hint 3: Check future.exception() to determine success/failure before calling result().
Attach completion callbacks to futures using add_done_callback for fire-and-forget notification.
Solution:
from concurrent.futures import ThreadPoolExecutor
def test_callbacks():
    """Square 1..5 on a pool, recording each result via a done-callback.

    ``on_done`` appends to the shared ``completed`` list. Leaving the
    ``with`` block shuts the pool down and waits for every task, so the
    list is fully populated before it is sorted and returned.
    """
    completed.clear()
    with ThreadPoolExecutor(max_workers=5) as pool:
        for value in range(1, 6):
            pool.submit(work, value).add_done_callback(on_done)
    return sorted(completed)
from concurrent.futures import ThreadPoolExecutor
import time
import threading
completed = []           # results recorded by on_done
lock = threading.Lock()  # serializes appends coming from worker threads


def work(n):
    """Return ``n * n`` after a 0.05 s simulated delay."""
    time.sleep(0.05)
    squared = n * n
    return squared


def on_done(future):
    """Done-callback: append the finished future's result to ``completed``."""
    with lock:
        value = future.result()
        completed.append(value)
# TODO: Submit 5 tasks and attach on_done as a callback using add_done_callback.
# Wait for all tasks, then print sorted completed results.
def test_callbacks():
    """Square 1..5 on a pool and collect results through on_done callbacks.

    Each future gets ``on_done`` attached right after submission. The
    ``with`` block's shutdown waits for all tasks, so ``completed`` is
    complete when it is sorted and returned.
    """
    completed.clear()
    with ThreadPoolExecutor(max_workers=5) as pool:
        for value in range(1, 6):
            pool.submit(work, value).add_done_callback(on_done)
    return sorted(completed)
# Exercise driver: prints [1, 4, 9, 16, 25].
print(test_callbacks())
Expected Output
[1, 4, 9, 16, 25]
Hints
Hint 1: future.add_done_callback(fn) calls fn(future) when the task completes.
Hint 2: The callback runs in the thread that completed the work (or the calling thread if already done).
Hint 3: The executor context manager waits for all pending futures before exiting.
Use as_completed to stream results from a pool as tasks finish, regardless of submission order.
Solution:
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import time
def process_as_completed():
    """Square 1..8 on a pool and return the items in completion order.

    Each task sleeps for a seeded random delay, so ``as_completed`` yields
    the futures as they finish rather than in submission order. The
    ``futures`` dict maps each Future back to the item it was submitted
    with.
    """
    items = list(range(1, 9))
    order = []
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(variable_work, i, i): i for i in items}
        for future in as_completed(futures):
            # result() re-raises any exception from the task; the computed
            # square is not needed, only which item finished.
            future.result()
            order.append(futures[future])
    return order
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
def variable_work(item, seed):
    """Sleep 0.05-0.25 s (delay seeded by *seed*) and return ``(item, item ** 2)``."""
    delay = random.Random(seed).uniform(0.05, 0.25)
    time.sleep(delay)
    return (item, item ** 2)
# TODO: Submit 8 tasks using executor.submit.
# Use as_completed to process results as they arrive (fastest first).
# Return the items in completion order.
def process_as_completed():
    """Square 1..8 on a pool and return the items in completion order.

    ``as_completed`` yields each future as it finishes. The ``futures``
    dict maps a finished Future back to the item it was submitted with.
    """
    items = list(range(1, 9))
    order = []
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(variable_work, i, i): i for i in items}
        for future in as_completed(futures):
            # result() re-raises any task exception; only the item is recorded.
            future.result()
            order.append(futures[future])
    return order
# Exercise driver: the order follows task completion, not submission, but
# every item 1..8 must appear exactly once.
order = process_as_completed()
print(f'Completion order: {order}')
print(f'All 8 processed: {sorted(order) == list(range(1, 9))}')
Expected Output
Completion order: [...] (fastest first)
All 8 processed: True
Hints
Hint 1: as_completed(futures_dict) yields futures as they complete, not in submission order.
Hint 2: Use futures = {executor.submit(fn, arg): arg for arg in items} to track which arg produced which future.
Hint 3: futures_dict[future] gives the original argument for a completed future.
Use wait(FIRST_COMPLETED) to take the first available result and cancel the remaining tasks.
Solution:
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
def first_result_wins():
    """Run four delayed searches and keep whichever finishes first.

    ``wait(..., return_when=FIRST_COMPLETED)`` returns as soon as one
    future is done, and cancellation is then attempted on the rest.
    ``Future.cancel()`` only succeeds for tasks that have not started yet,
    so with four workers for four tasks the count is typically 0. Leaving
    the ``with`` block still waits for the searches that are running.

    Returns:
        ``(first_result, cancelled_count)``.
    """
    searches = [
        (['a', 'b', 'c', 'x', 'y'], 'x', 0.2),
        (['p', 'q', 'r'], 'r', 0.1),
        (['m', 'n', 'o', 'x'], 'z', 0.3),
        (['d', 'e', 'f'], 'f', 0.4),
    ]
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(slow_search, hay, needle, delay)
                   for hay, needle, delay in searches]
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        first = next(iter(done)).result()
        cancelled = 0
        for straggler in not_done:
            if straggler.cancel():
                cancelled += 1
        return first, cancelled
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time
def slow_search(haystack, needle, delay):
    """After sleeping *delay* seconds, report whether *needle* is in *haystack*.

    Returns ``(found, haystack[:20])``.
    """
    time.sleep(delay)
    found = needle in haystack
    preview = haystack[:20]
    return found, preview
# TODO: Submit 4 search tasks with different datasets and delays.
# Use wait(FIRST_COMPLETED) to get the first result.
# Cancel remaining tasks.
# Return (first_result, cancelled_count).
def first_result_wins():
    """Exercise stub: race four searches and keep the first to finish.

    Each entry in ``searches`` is ``(haystack, needle, delay)``. The
    finished version returns ``(first_result, cancelled_count)``; this
    placeholder returns None.
    """
    searches = [
        (['a', 'b', 'c', 'x', 'y'], 'x', 0.2),
        (['p', 'q', 'r'], 'r', 0.1),  # fastest
        (['m', 'n', 'o', 'x'], 'z', 0.3),
        (['d', 'e', 'f'], 'f', 0.4),
    ]
    return None
# Exercise driver: the 0.1 s search over ['p', 'q', 'r'] is expected to finish first.
first, cancelled = first_result_wins()
print(f'First result: {first}')
print(f'Cancelled: {cancelled}')
Expected Output
First result: (True, ['p', 'q', 'r'])
Cancelled: 0
Hints
Hint 1: done, not_done = wait(futures, return_when=FIRST_COMPLETED)
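Hint 2: For each future in not_done, future.cancel() attempts cancellation. It returns False for a task that is already running, so with max_workers=4 and four tasks, none of them can actually be cancelled.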
Hint 3: list(done)[0].result() gives the first completed result.
Compare ThreadPoolExecutor vs ProcessPoolExecutor for CPU-bound work and observe the GIL's impact.
Solution:
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def _time_pool(pool_cls, numbers):
    """Map cpu_work over *numbers* on a 4-worker *pool_cls*; return (ms, results)."""
    started = time.perf_counter()
    with pool_cls(max_workers=4) as pool:
        results = list(pool.map(cpu_work, numbers))
    elapsed_ms = (time.perf_counter() - started) * 1000
    return elapsed_ms, results


def compare_executors():
    """Time the same CPU-bound job on a thread pool and on a process pool.

    Threads contend for the GIL on pure-Python work, while the process pool
    runs the ``cpu_work`` calls in worker processes. The process timing
    also includes starting and stopping those workers. Call this from a
    ``__main__``-guarded entry point because it starts processes.

    Returns:
        ``(thread_ms, process_ms, results_match)``.
    """
    numbers = [500, 1000, 1500, 2000]
    t_ms, thread_results = _time_pool(ThreadPoolExecutor, numbers)
    p_ms, process_results = _time_pool(ProcessPoolExecutor, numbers)
    return t_ms, p_ms, thread_results == process_results
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import math
def cpu_work(n):
    """Return how many primes are strictly less than *n*.

    Uses deliberately naive trial division so the job is pure-Python CPU
    work, which is what makes the thread-vs-process comparison meaningful.
    Returns 0 when *n* <= 2.
    """
    # math.isqrt gives the exact integer square root (no float rounding).
    return sum(
        1 for i in range(2, n)
        if all(i % j != 0 for j in range(2, math.isqrt(i) + 1))
    )
# TODO: Compare ThreadPoolExecutor vs ProcessPoolExecutor for this CPU-bound task.
# Run cpu_work for [500, 1000, 1500, 2000] using both.
# Return (thread_time_ms, process_time_ms, results_match).
def compare_executors():
    """Exercise stub: time cpu_work over ``numbers`` with both pool types.

    The finished version returns ``(thread_time_ms, process_time_ms,
    results_match)``; this placeholder returns None.
    """
    numbers = [500, 1000, 1500, 2000]
    return None
# The __main__ guard keeps worker processes, which import this module under
# the spawn start method, from re-running the driver and starting pools
# recursively.
if __name__ == '__main__':
    t_ms, p_ms, match = compare_executors()
    print(f'Thread pool: {t_ms:.0f}ms')
    print(f'Process pool: {p_ms:.0f}ms')
    print(f'Results match: {match}')
Expected Output
Thread pool: Xms
Process pool: Xms
Results match: True
Hints
Hint 1: ProcessPoolExecutor bypasses the GIL — true parallelism for CPU-bound work.
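Hint 2: Put the code that creates the process pool under if __name__ == '__main__':. With the spawn start method (the default on Windows and macOS), each worker process imports the main module, and an unguarded pool creation would be re-run there and fail.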
Hint 3: time.perf_counter() before and after the pool.map() call measures wall time.
Build a two-stage parallel pipeline where stage 2 receives the output of stage 1.
Solution:
from concurrent.futures import ThreadPoolExecutor
def run_pipeline(items):
    """Run stage1_fetch on every item, then stage2_process on each output.

    Both stages fan out over the same 5-worker pool. Stage 2 starts only
    after every stage-1 result has been collected, and the results keep
    the order of *items*.
    """
    with ThreadPoolExecutor(max_workers=5) as pool:
        first_stage = [pool.submit(stage1_fetch, item) for item in items]
        fetched = [fut.result() for fut in first_stage]
        second_stage = [pool.submit(stage2_process, value) for value in fetched]
        return [fut.result() for fut in second_stage]
from concurrent.futures import ThreadPoolExecutor
import time
# Build a two-stage pipeline where stage 2 depends on stage 1 results.
# Stage 1: fetch data (simulate I/O)
# Stage 2: process data (CPU work)
def stage1_fetch(item):
    """Stage 1: simulate a 0.05 s fetch, then return ``item * 2``."""
    time.sleep(0.05)
    fetched = item * 2
    return fetched
def stage2_process(fetched):
    """Stage 2: after a 0.03 s simulated delay, return ``fetched ** 2``."""
    time.sleep(0.03)
    squared = fetched ** 2
    return squared
# TODO: For each item in 1..5:
# Submit stage1_fetch, then when it completes, submit stage2_process.
# Run all stage1 concurrently, then all stage2 concurrently.
# Return final results sorted.
def run_pipeline(items):
    """Exercise stub: run stage1_fetch then stage2_process for each item.

    The finished version returns the stage-2 results; this placeholder
    returns None.
    """
    return None
# Exercise driver: stage 1 doubles 1..5 and stage 2 squares, giving [4, 16, 36, 64, 100].
results = run_pipeline(list(range(1, 6)))
print(sorted(results))
Expected Output
[4, 16, 36, 64, 100]
Hints
Hint 1: First run all stage1 tasks: stage1_futures = [executor.submit(stage1_fetch, i) for i in items]
Hint 2: Collect stage1 results: s1_results = [f.result() for f in stage1_futures]
Hint 3: Then run all stage2 tasks on the stage1 outputs.
Use executor.map(timeout=...) to collect partial results and count how many tasks exceed the deadline.
Solution:
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import random
def run_with_timeout():
    """Run ten variable-length tasks under a shared 0.2 s map() deadline.

    ``Executor.map`` measures *timeout* from the original ``map()`` call
    and yields results in submission order. Iteration therefore stops at
    the first task (in submission order) that is not finished by the
    deadline. Every task from that point on is counted as timed out, even
    if some later task had already finished.

    Returns:
        ``(results_got, timed_out)``, which always sum to the number of tasks.
    """
    tasks = list(range(10))
    results_got = 0
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = executor.map(variable_task, tasks, tasks, timeout=0.2)
        try:
            for _ in results:
                results_got += 1
        except TimeoutError:
            pass  # deadline reached; the unfinished tasks are counted below
    timed_out = len(tasks) - results_got
    return results_got, timed_out
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
import random
def variable_task(n, seed):
    """Sleep a seeded random 0.05-0.5 s and return ``(n, delay)``."""
    delay = random.Random(seed).uniform(0.05, 0.5)
    time.sleep(delay)
    return (n, delay)
# TODO: Run 10 tasks with executor.map(timeout=0.2).
# Collect as many results as complete within the timeout.
# Catch TimeoutError for tasks that don't finish in time.
# Return (results_got, timed_out_count).
def run_with_timeout():
    """Exercise stub: map variable_task over ``tasks`` with a 0.2 s timeout.

    The finished version returns ``(results_got, timed_out_count)``; this
    placeholder returns None.
    """
    tasks = list(range(10))
    return None
# Exercise driver: got + timed_out should always equal the 10 submitted tasks.
got, timed_out = run_with_timeout()
print(f'Got: {got}, Timed out: {timed_out}')
print(f'Total: {got + timed_out}')
Expected Output
Got: X, Timed out: X
Total: 10
Hints
Hint 1: The iterator returned by executor.map(fn, items, timeout=0.2) raises TimeoutError from next() if the next result (in submission order) is not ready within 0.2 s of the original map() call.
Hint 2: Wrap the iteration in try/except TimeoutError.
Hint 3: Count results collected before the timeout and tasks that raised TimeoutError.
Demonstrate graceful executor shutdown using shutdown(wait=True) to ensure in-flight tasks complete.
Solution:
def test_graceful_shutdown():
    """Submit ten short tasks, then block in shutdown(wait=True).

    ``shutdown(wait=True)`` does not return until every submitted task has
    finished, so afterwards ``completed`` holds as many ids as ``submitted``.

    Returns:
        ``(submitted_count, completed_count)``.
    """
    submitted.clear()
    completed.clear()
    executor = ThreadPoolExecutor(max_workers=4)
    futures = []
    for task_id in range(10):
        duration = 0.05 * (task_id % 4 + 1)  # 0.05, 0.10, 0.15 or 0.20 s
        future = executor.submit(slow_task, task_id, duration)
        with lock:
            submitted.append(task_id)
        futures.append(future)
    executor.shutdown(wait=True)
    return len(submitted), len(completed)
from concurrent.futures import ThreadPoolExecutor
import time
import threading
# shutdown(wait=True) blocks until all futures complete.
# shutdown(wait=False) returns immediately (tasks may still run).
# shutdown(cancel_futures=True) (Python 3.9+) cancels pending tasks.
submitted = []           # task ids handed to the executor
completed = []           # task ids whose slow_task call has finished
lock = threading.Lock()  # guards both lists


def slow_task(task_id, duration):
    """Sleep *duration* seconds, record *task_id* in ``completed``, and return it."""
    time.sleep(duration)
    with lock:
        completed.append(task_id)
    return task_id
# TODO: Create an executor, submit 10 slow tasks (0.05-0.2s each).
# After submitting, call shutdown(wait=True).
# Verify all submitted tasks that were running completed.
# Return (submitted_count, completed_count).
def test_graceful_shutdown():
    """Exercise stub: submit ten slow tasks, then shut the pool down.

    The submission loop is provided. The shutdown step is still a TODO,
    so this placeholder returns None without shutting the executor down.
    """
    submitted.clear()
    completed.clear()
    executor = ThreadPoolExecutor(max_workers=4)
    # Submit tasks
    futures = []
    for task_id in range(10):
        future = executor.submit(slow_task, task_id, 0.05 * (task_id % 4 + 1))
        with lock:
            submitted.append(task_id)
        futures.append(future)
    # TODO: Shutdown gracefully
    return None
# Exercise driver: after a graceful shutdown every submitted task has completed.
s, c = test_graceful_shutdown()
print(f'Submitted: {s}, Completed: {c}')
print(f'All completed: {s == c}')
Expected Output
Submitted: 10, Completed: 10
All completed: True
Hints
Hint 1: executor.shutdown(wait=True) waits until ALL submitted futures, both running and still pending, have finished.
Hint 2: After shutdown(wait=True) returns, every submitted task has completed unless it was cancelled (for example with cancel_futures=True).
Hint 3: Futures that were PENDING (not yet started) may be cancelled with cancel_futures=True.
Build a RetryExecutor wrapper around ThreadPoolExecutor that automatically retries failing tasks with exponential backoff.
Solution:
The RetryExecutor class and its _retry_wrapper method, shown in the code below, are the full solution. The key is that _retry_wrapper catches exceptions, sleeps with exponential backoff between attempts, and re-raises the last exception once the retries are exhausted.
# Key implementation detail:
# _retry_wrapper runs in the thread pool — it handles the retry logic
# transparently. The caller just calls future.result() and gets either
# the successful value or the final exception raised after all retries.
from concurrent.futures import ThreadPoolExecutor, Future
import threading
import time
import random
# Build a RetryExecutor that wraps ThreadPoolExecutor.
# Each submitted task retries up to max_retries times on exception.
# Uses exponential backoff between retries.
class RetryExecutor:
    """ThreadPoolExecutor wrapper that retries failing tasks with backoff.

    Each submitted callable is attempted up to ``max_retries + 1`` times
    (at least once, even if ``max_retries`` is negative). After failed
    attempt *k* (0-based) the worker thread sleeps ``base_delay * 2**k``
    seconds before trying again. Once the attempts are used up, the last
    exception is re-raised, so it surfaces from ``Future.result()``.

    Args:
        max_workers: Size of the underlying thread pool.
        max_retries: Extra attempts allowed after the first failure.
        base_delay: Backoff in seconds before the first retry; doubles each time.
    """

    def __init__(self, max_workers=4, max_retries=3, base_delay=0.01):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self.max_retries = max_retries
        self.base_delay = base_delay

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` with retries and return its Future."""
        return self._executor.submit(self._retry_wrapper, fn, args, kwargs)

    def _retry_wrapper(self, fn, args, kwargs):
        """Call *fn* until it succeeds or the attempts run out (runs on a worker)."""
        # Always make at least one attempt. With a negative max_retries the
        # loop would otherwise never run, and the old fall-through
        # ``raise last_exc`` became ``raise None`` (an unrelated TypeError).
        retries = max(self.max_retries, 0)
        for attempt in range(retries + 1):
            try:
                return fn(*args, **kwargs)
            except Exception:
                if attempt == retries:
                    raise  # out of attempts: propagate the final failure
                time.sleep(self.base_delay * (2 ** attempt))

    def shutdown(self, wait=True):
        """Shut down the underlying thread pool (see ThreadPoolExecutor.shutdown)."""
        self._executor.shutdown(wait=wait)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Block until every submitted task, including its retries, has finished.
        self.shutdown()
# Test helper: flaky() fails on its first two calls for each task_id.
attempt_counters = {}    # task_id -> number of calls made so far
lock = threading.Lock()  # guards attempt_counters across worker threads


def flaky(task_id):
    """Fail on the first two calls for *task_id*, then return ``task_id * 10``."""
    with lock:
        attempt = attempt_counters.get(task_id, 0) + 1
        attempt_counters[task_id] = attempt
    if attempt >= 3:
        return task_id * 10
    raise RuntimeError(f'Task {task_id} attempt {attempt} failed')
# Demo: each flaky task fails twice before succeeding, so max_retries=3
# (up to four attempts) lets every task succeed.
if __name__ == '__main__':
    with RetryExecutor(max_workers=4, max_retries=3) as executor:
        futures = [executor.submit(flaky, i) for i in range(5)]
        results = [f.result() for f in futures]
    print(f'Results: {sorted(results)}')
    print(f'All succeeded after retries: {sorted(results) == [0, 10, 20, 30, 40]}')
Expected Output
Results: [0, 10, 20, 30, 40]
All succeeded after retries: True
Hints
Hint 1: _retry_wrapper loops up to max_retries+1 times, catching exceptions and sleeping between attempts.
Hint 2: time.sleep(base_delay * 2**attempt) provides exponential backoff.
Hint 3: After exhausting retries, re-raise the last exception.
