Skip to main content

Python ThreadPoolExecutor Practice Problems & Exercises

Practice: ThreadPoolExecutor and ProcessPoolExecutor

11 problems4 Easy4 Medium3 Hard55–75 min
← Back to lesson

#1executor.map — Parallel ProcessingEasy
ThreadPoolExecutormapconcurrent

Use ThreadPoolExecutor.map() to apply a function to a list in parallel, preserving order.

Solution:

from concurrent.futures import ThreadPoolExecutor

def parallel_double(numbers):
with ThreadPoolExecutor(max_workers=5) as executor:
return list(executor.map(slow_double, numbers))
from concurrent.futures import ThreadPoolExecutor
import time

def slow_double(n):
  time.sleep(0.1)
  return n * 2

# TODO: Use ThreadPoolExecutor.map() to double all numbers in parallel.
# Return the results as a list.

def parallel_double(numbers):
  pass

result = parallel_double([1, 2, 3, 4, 5])
print(result)
Expected Output
[2, 4, 6, 8, 10]
Hints

Hint 1: with ThreadPoolExecutor(max_workers=5) as executor: results = list(executor.map(fn, items))

Hint 2: executor.map preserves the order of results (unlike as_completed).

Hint 3: The context manager automatically shuts down the pool on exit.


#2submit and Future.result()Easy
submitFutureresult

Use executor.submit() to submit tasks and Future.result() to retrieve their outputs.

Solution:

from concurrent.futures import ThreadPoolExecutor

def parallel_compute():
tasks = [(2, 10), (3, 5), (5, 4), (10, 3)]
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(compute, b, p) for b, p in tasks]
return [f.result() for f in futures]
from concurrent.futures import ThreadPoolExecutor
import time

def compute(base, power):
  time.sleep(0.05)
  return base ** power

# TODO: Submit 4 computations using executor.submit().
# Collect the futures and retrieve results.
# Return results as a list.

def parallel_compute():
  tasks = [(2, 10), (3, 5), (5, 4), (10, 3)]
  pass

print(parallel_compute())
Expected Output
[1024, 243, 625, 1000]
Hints

Hint 1: futures = [executor.submit(compute, b, p) for b, p in tasks]

Hint 2: future.result() blocks until the task completes and returns the value.

Hint 3: Results from submit() are NOT in order unless you retrieve them in submission order.


#3Exception Handling in FuturesEasy
Futureexceptionresult()exception()

Handle exceptions raised in submitted tasks using Future.exception() and Future.result().

Solution:

from concurrent.futures import ThreadPoolExecutor

def run_with_error_handling():
successes = []
errors = []
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(risky_task, n): n for n in range(1, 7)}
for future in futures:
exc = future.exception()
if exc:
errors.append(str(exc))
else:
successes.append(future.result())
return successes, errors
from concurrent.futures import ThreadPoolExecutor

def risky_task(n):
  if n % 3 == 0:
      raise ValueError(f'Bad value: {n}')
  return n * 10

# TODO: Submit tasks for n in 1..6.
# Separate successes from failures.
# Return (successes, errors) as two lists.

def run_with_error_handling():
  pass

successes, errors = run_with_error_handling()
print('Successes:', sorted(successes))
print('Errors:', sorted(errors))
Expected Output
Successes: [10, 20, 40, 50]
Errors: ['Bad value: 3', 'Bad value: 6']
Hints

Hint 1: future.exception() returns the exception if one was raised, or None on success.

Hint 2: future.result() raises the exception — wrap in try/except.

Hint 3: Check future.exception() to determine success/failure before calling result().


#4Future Callbacks — add_done_callbackEasy
add_done_callbackcallbackFuture

Attach completion callbacks to futures using add_done_callback for fire-and-forget notification.

Solution:

from concurrent.futures import ThreadPoolExecutor

def test_callbacks():
completed.clear()
with ThreadPoolExecutor(max_workers=5) as executor:
for n in range(1, 6):
f = executor.submit(work, n)
f.add_done_callback(on_done)
return sorted(completed)
from concurrent.futures import ThreadPoolExecutor
import time
import threading

completed = []
lock = threading.Lock()

def work(n):
  time.sleep(0.05)
  return n * n

def on_done(future):
  with lock:
      completed.append(future.result())

# TODO: Submit 5 tasks and attach on_done as a callback using add_done_callback.
# Wait for all tasks, then print sorted completed results.

def test_callbacks():
  completed.clear()
  with ThreadPoolExecutor(max_workers=5) as executor:
      for n in range(1, 6):
          f = executor.submit(work, n)
          f.add_done_callback(on_done)
  return sorted(completed)

print(test_callbacks())
Expected Output
[1, 4, 9, 16, 25]
Hints

Hint 1: future.add_done_callback(fn) calls fn(future) when the task completes.

Hint 2: The callback runs in the thread that completed the work (or the calling thread if already done).

Hint 3: The executor context manager waits for all pending futures before exiting.


#5as_completed — First Results FirstMedium
as_completedfirst resultstreaming

Use as_completed to stream results from a pool as tasks finish, regardless of submission order.

Solution:

from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import time

def process_as_completed():
items = list(range(1, 9))
order = []
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {executor.submit(variable_work, i, i): i for i in items}
for future in as_completed(futures):
item, square = future.result()
order.append(item)
return order
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def variable_work(item, seed):
  rng = random.Random(seed)
  time.sleep(rng.uniform(0.05, 0.25))
  return item, item ** 2

# TODO: Submit 8 tasks using executor.submit.
# Use as_completed to process results as they arrive (fastest first).
# Return the items in completion order.

def process_as_completed():
  items = list(range(1, 9))
  order = []
  with ThreadPoolExecutor(max_workers=8) as executor:
      futures = {executor.submit(variable_work, i, i): i for i in items}
      for future in as_completed(futures):
          item, square = future.result()
          order.append(item)
  return order

order = process_as_completed()
print(f'Completion order: {order}')
print(f'All 8 processed: {sorted(order) == list(range(1, 9))}')
Expected Output
Completion order: [...] (fastest first)
All 8 processed: True
Hints

Hint 1: as_completed(futures_dict) yields futures as they complete, not in submission order.

Hint 2: Use futures = {executor.submit(fn, arg): arg for arg in items} to track which arg produced which future.

Hint 3: futures_dict[future] gives the original argument for a completed future.


#6wait() — Wait for First CompletionMedium
waitFIRST_COMPLETEDcancelfutures

Use wait(FIRST_COMPLETED) to take the first available result and cancel the remaining tasks.

Solution:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def first_result_wins():
searches = [
(['a', 'b', 'c', 'x', 'y'], 'x', 0.2),
(['p', 'q', 'r'], 'r', 0.1),
(['m', 'n', 'o', 'x'], 'z', 0.3),
(['d', 'e', 'f'], 'f', 0.4),
]
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(slow_search, h, n, d) for h, n, d in searches]
done, not_done = wait(futures, return_when=FIRST_COMPLETED)

first = list(done)[0].result()
cancelled = sum(1 for f in not_done if f.cancel())
return first, cancelled
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time

def slow_search(haystack, needle, delay):
  time.sleep(delay)
  return needle in haystack, haystack[:20]

# TODO: Submit 4 search tasks with different datasets and delays.
# Use wait(FIRST_COMPLETED) to get the first result.
# Cancel remaining tasks.
# Return (first_result, cancelled_count).

def first_result_wins():
  searches = [
      (['a', 'b', 'c', 'x', 'y'], 'x', 0.2),
      (['p', 'q', 'r'], 'r', 0.1),   # fastest
      (['m', 'n', 'o', 'x'], 'z', 0.3),
      (['d', 'e', 'f'], 'f', 0.4),
  ]
  pass

first, cancelled = first_result_wins()
print(f'First result: {first}')
print(f'Cancelled: {cancelled}')
Expected Output
First result: (True, ['p', 'q', 'r'])
Cancelled: X
Hints

Hint 1: done, not_done = wait(futures, return_when=FIRST_COMPLETED)

Hint 2: For each future in not_done: future.cancel() attempts cancellation.

Hint 3: list(done)[0].result() gives the first completed result.


#7ProcessPoolExecutor — CPU-Bound WorkMedium
ProcessPoolExecutorCPU-boundmap

Compare ThreadPoolExecutor vs ProcessPoolExecutor for CPU-bound work and observe the GIL's impact.

Solution:

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def compare_executors():
numbers = [500, 1000, 1500, 2000]

t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as ex:
thread_results = list(ex.map(cpu_work, numbers))
t_ms = (time.perf_counter() - t0) * 1000

t0 = time.perf_counter()
with ProcessPoolExecutor(max_workers=4) as ex:
process_results = list(ex.map(cpu_work, numbers))
p_ms = (time.perf_counter() - t0) * 1000

return t_ms, p_ms, thread_results == process_results
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import math

def cpu_work(n):
  # CPU-intensive: compute sum of primes up to n
  primes = [i for i in range(2, n) if all(i % j != 0 for j in range(2, int(math.sqrt(i)) + 1))]
  return len(primes)

# TODO: Compare ThreadPoolExecutor vs ProcessPoolExecutor for this CPU-bound task.
# Run cpu_work for [500, 1000, 1500, 2000] using both.
# Return (thread_time_ms, process_time_ms, results_match).

def compare_executors():
  numbers = [500, 1000, 1500, 2000]
  pass

if __name__ == '__main__':
  t_ms, p_ms, match = compare_executors()
  print(f'Thread pool: {t_ms:.0f}ms')
  print(f'Process pool: {p_ms:.0f}ms')
  print(f'Results match: {match}')
Expected Output
Thread pool: X.Xms
Process pool: X.Xms
Results match: True
Hints

Hint 1: ProcessPoolExecutor bypasses the GIL — true parallelism for CPU-bound work.

Hint 2: Must be inside if __name__ == '__main__': for multiprocessing to work.

Hint 3: time.perf_counter() before and after the pool.map() call measures wall time.


#8Chained Futures PipelineMedium
pipelinechained futuressubmit chain

Build a two-stage parallel pipeline where stage 2 receives the output of stage 1.

Solution:

from concurrent.futures import ThreadPoolExecutor

def run_pipeline(items):
with ThreadPoolExecutor(max_workers=5) as executor:
stage1_futures = [executor.submit(stage1_fetch, i) for i in items]
stage1_results = [f.result() for f in stage1_futures]

stage2_futures = [executor.submit(stage2_process, r) for r in stage1_results]
return [f.result() for f in stage2_futures]
from concurrent.futures import ThreadPoolExecutor
import time

# Build a two-stage pipeline where stage 2 depends on stage 1 results.
# Stage 1: fetch data (simulate I/O)
# Stage 2: process data (CPU work)

def stage1_fetch(item):
  time.sleep(0.05)
  return item * 2

def stage2_process(fetched):
  time.sleep(0.03)
  return fetched ** 2

# TODO: For each item in 1..5:
# Submit stage1_fetch, then when it completes, submit stage2_process.
# Run all stage1 concurrently, then all stage2 concurrently.
# Return final results sorted.

def run_pipeline(items):
  pass

results = run_pipeline(list(range(1, 6)))
print(sorted(results))
Expected Output
[4, 16, 36, 64, 100]
Hints

Hint 1: First run all stage1 tasks: stage1_futures = [executor.submit(stage1_fetch, i) for i in items]

Hint 2: Collect stage1 results: s1_results = [f.result() for f in stage1_futures]

Hint 3: Then run all stage2 tasks on the stage1 outputs.


#9Timeout on map()Hard
map timeoutTimeoutErrorpartial results

Use executor.map(timeout=...) to collect partial results and count how many tasks exceed the deadline.

Solution:

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import random

def run_with_timeout():
tasks = list(range(10))
results_got = 0
timed_out = 0
with ThreadPoolExecutor(max_workers=10) as executor:
gen = executor.map(variable_task, tasks, tasks, timeout=0.2)
while True:
try:
result = next(gen)
results_got += 1
except StopIteration:
break
except TimeoutError:
timed_out += 1
break
# Remaining tasks count as timed out
timed_out += 10 - results_got - timed_out
return results_got, timed_out
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
import random

def variable_task(n, seed):
  rng = random.Random(seed)
  delay = rng.uniform(0.05, 0.5)
  time.sleep(delay)
  return n, delay

# TODO: Run 10 tasks with executor.map(timeout=0.2).
# Collect as many results as complete within the timeout.
# Catch TimeoutError for tasks that don't finish in time.
# Return (results_got, timed_out_count).

def run_with_timeout():
  tasks = list(range(10))
  pass

got, timed_out = run_with_timeout()
print(f'Got: {got}, Timed out: {timed_out}')
print(f'Total: {got + timed_out}')
Expected Output
Got: X, Timed out: X
Total: 10
Hints

Hint 1: executor.map(fn, items, timeout=0.2) raises TimeoutError when iterating past the deadline.

Hint 2: Wrap the iteration in try/except TimeoutError.

Hint 3: Count results collected before the timeout and tasks that raised TimeoutError.


#10Executor Context Manager — Graceful ShutdownHard
shutdownwaitcancel_futuresgraceful shutdown

Demonstrate graceful executor shutdown using shutdown(wait=True) to ensure in-flight tasks complete.

Solution:

def test_graceful_shutdown():
submitted.clear()
completed.clear()
executor = ThreadPoolExecutor(max_workers=4)

futures = []
for i in range(10):
f = executor.submit(slow_task, i, 0.05 * (i % 4 + 1))
with lock:
submitted.append(i)
futures.append(f)

executor.shutdown(wait=True)
return len(submitted), len(completed)
from concurrent.futures import ThreadPoolExecutor
import time
import threading

# shutdown(wait=True) blocks until all futures complete.
# shutdown(wait=False) returns immediately (tasks may still run).
# shutdown(cancel_futures=True) (Python 3.9+) cancels pending tasks.

submitted = []
completed = []
lock = threading.Lock()

def slow_task(task_id, duration):
  time.sleep(duration)
  with lock:
      completed.append(task_id)
  return task_id

# TODO: Create an executor, submit 10 slow tasks (0.1-0.5s each).
# After submitting, call shutdown(wait=True).
# Verify all submitted tasks that were running completed.
# Return (submitted_count, completed_count).

def test_graceful_shutdown():
  submitted.clear()
  completed.clear()
  executor = ThreadPoolExecutor(max_workers=4)

  # Submit tasks
  futures = []
  for i in range(10):
      f = executor.submit(slow_task, i, 0.05 * (i % 4 + 1))
      with lock:
          submitted.append(i)
      futures.append(f)

  # TODO: Shutdown gracefully
  pass

s, c = test_graceful_shutdown()
print(f'Submitted: {s}, Completed: {c}')
print(f'All completed: {s == c}')
Expected Output
Submitted: 10, Completed: 10
All completed: True
Hints

Hint 1: executor.shutdown(wait=True) waits for ALL running futures to finish.

Hint 2: After shutdown(wait=True), all futures that were running will have completed.

Hint 3: Futures that were PENDING (not yet started) may be cancelled with cancel_futures=True.


#11Retry Executor — Resilient Task RunnerHard
retryexecutorresiliencewrapper

Build a RetryExecutor wrapper around ThreadPoolExecutor that automatically retries failing tasks with exponential backoff.

Solution:

The RetryExecutor and _retry_wrapper implementation above is the full solution. The key is that _retry_wrapper catches exceptions, sleeps with exponential backoff, and re-raises after exhausting retries.

# Key implementation detail:
# _retry_wrapper runs in the thread pool — it handles the retry logic
# transparently. The caller just awaits future.result() and gets either
# a successful value or the final exception after all retries.
from concurrent.futures import ThreadPoolExecutor, Future
import threading
import time
import random

# Build a RetryExecutor that wraps ThreadPoolExecutor.
# Each submitted task retries up to max_retries times on exception.
# Uses exponential backoff between retries.

class RetryExecutor:
  def __init__(self, max_workers=4, max_retries=3, base_delay=0.01):
      self._executor = ThreadPoolExecutor(max_workers=max_workers)
      self.max_retries = max_retries
      self.base_delay = base_delay

  def submit(self, fn, *args, **kwargs):
      return self._executor.submit(self._retry_wrapper, fn, args, kwargs)

  def _retry_wrapper(self, fn, args, kwargs):
      last_exc = None
      for attempt in range(self.max_retries + 1):
          try:
              return fn(*args, **kwargs)
          except Exception as e:
              last_exc = e
              if attempt < self.max_retries:
                  time.sleep(self.base_delay * (2 ** attempt))
      raise last_exc

  def shutdown(self, wait=True):
      self._executor.shutdown(wait=wait)

  def __enter__(self):
      return self

  def __exit__(self, *args):
      self.shutdown()

# Test: flaky function that fails on first 2 attempts
attempt_counters = {}
lock = threading.Lock()

def flaky(task_id):
  with lock:
      attempt_counters[task_id] = attempt_counters.get(task_id, 0) + 1
      attempt = attempt_counters[task_id]
  if attempt < 3:
      raise RuntimeError(f'Task {task_id} attempt {attempt} failed')
  return task_id * 10

if __name__ == '__main__':
  with RetryExecutor(max_workers=4, max_retries=3) as executor:
      futures = [executor.submit(flaky, i) for i in range(5)]
      results = [f.result() for f in futures]

  print(f'Results: {sorted(results)}')
  print(f'All succeeded after retries: {sorted(results) == [0, 10, 20, 30, 40]}')
Expected Output
Results: [0, 10, 20, 30, 40]
All succeeded after retries: [0, 10, 20, 30, 40]
Hints

Hint 1: _retry_wrapper loops up to max_retries+1 times, catching exceptions and sleeping between attempts.

Hint 2: time.sleep(base_delay * 2**attempt) provides exponential backoff.

Hint 3: After exhausting retries, re-raise the last exception.

© 2026 EngineersOfAI. All rights reserved.