Threading in Python: Practice Problems & Exercises

Practice: Threading in Python

11 problems · 4 Easy · 4 Medium · 3 Hard · 50–70 min

#1 Create and Start a Thread (Easy)
Tags: Thread, start, join, target

Create a thread, pass it arguments via args, start it, and wait for completion with join.

Solution:

import threading
import time

def greet(name):
    time.sleep(0.1)
    print(f'Hello from thread {name}')

t = threading.Thread(target=greet, args=('Worker',))
t.start()
t.join()
print('Main done')

Starter code:

import threading
import time

# TODO: Create a thread that prints "Hello from thread {name}" after sleeping 0.1s.
# Start it and wait for it to finish before printing "Main done".

def greet(name):
  pass

t = threading.Thread(target=greet, args=('Worker',))
# Start and join the thread
# ...
print('Main done')
Expected Output
Hello from thread Worker
Main done
Hints

Hint 1: threading.Thread(target=fn, args=(arg,)) creates a thread.

Hint 2: t.start() launches the thread; t.join() blocks until it finishes.

Hint 3: Without t.join(), 'Main done' might print before the thread runs.
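
Hint 3 can be checked directly. The sketch below is not part of the exercise: it uses a hypothetical `run` helper and an `events` list to record who finishes first, with and without `join()`.

```python
import threading
import time

def run(use_join):
    """Return the order in which the worker and the main thread record events."""
    events = []

    def greet(name):
        time.sleep(0.1)
        events.append(f'thread {name}')

    t = threading.Thread(target=greet, args=('Worker',))
    t.start()
    if use_join:
        t.join()           # block until greet() has returned
    events.append('main')
    t.join()               # always wait before returning the list
    return events

print(run(use_join=True))   # the worker is recorded first
print(run(use_join=False))  # main is usually recorded first
```

With `join()` the order is guaranteed. Without it the order depends on scheduling, although the 0.1s sleep makes "main first" the likely outcome.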


#2 Spawn Multiple Threads and Join All (Easy)
Tags: multiple threads, join, list

Spawn multiple threads, collect results, and join all before printing.

Solution:

import threading
import time

results = []
lock = threading.Lock()

def compute(n, delay=0.05):
    time.sleep(delay)
    with lock:
        results.append(n * n)

threads = [threading.Thread(target=compute, args=(n,)) for n in range(1, 6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results))

Starter code:

import threading
import time

results = []

def compute(n, delay=0.05):
  time.sleep(delay)
  results.append(n * n)

# TODO: Spawn 5 threads that compute squares of 1..5.
# Wait for ALL threads to finish, then print sorted results.
Expected Output
[1, 4, 9, 16, 25]
Hints

Hint 1: Create a list of threads: threads = [threading.Thread(...) for n in range(1,6)]

Hint 2: Start all threads: for t in threads: t.start()

Hint 3: Then join all: for t in threads: t.join()


#3 Daemon Thread: Background Heartbeat (Easy)
Tags: daemon, daemon thread, background

Start a heartbeat thread as a daemon so it automatically terminates when the main program ends.

Solution:

import threading
import time

beat_count = [0]

def heartbeat(interval=0.1):
    while True:
        beat_count[0] += 1
        time.sleep(interval)

t = threading.Thread(target=heartbeat, kwargs={'interval': 0.1}, daemon=True)
t.start()
time.sleep(0.35)
print(f'Beats: {beat_count[0]}')

Starter code:

import threading
import time

beat_count = [0]

def heartbeat(interval=0.1):
  while True:
      beat_count[0] += 1
      time.sleep(interval)

# TODO: Start the heartbeat as a DAEMON thread.
# Let the main thread sleep for 0.35 seconds.
# After waking, print the beat count.
# The daemon should stop automatically when main exits.
Expected Output
Beats: 4
Hints

Hint 1: Set t.daemon = True before t.start(), or pass daemon=True to Thread().

Hint 2: A daemon thread is killed automatically when all non-daemon threads finish.

Hint 3: After the 0.35s sleep, about 4 beats should have occurred: the first fires immediately, then one every 0.1s (at roughly 0, 0.1, 0.2, and 0.3s).
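
Hint 1 says the daemon flag must be set before `start()`. A minimal sketch of what happens otherwise follows. The `stop` event and the `changed` flag are illustrative additions so the thread can be shut down cleanly.

```python
import threading
import time

def heartbeat(stop, interval=0.01):
    # Loop until asked to stop, so the example can finish cleanly.
    while not stop.is_set():
        time.sleep(interval)

stop = threading.Event()
t = threading.Thread(target=heartbeat, args=(stop,))
t.daemon = True            # allowed: the thread has not been started yet
t.start()

try:
    t.daemon = False       # too late: the thread is already running
    changed = True
except RuntimeError:
    changed = False

stop.set()
t.join()
print(t.daemon, changed)   # True False
```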


#4 Thread with Return Value via List (Easy)
Tags: return value, shared list, Thread

Collect return values from threads by writing results into a pre-allocated shared list.

Solution:

import threading
import time

def slow_square(n, results, index):
    time.sleep(0.1)
    results[index] = n * n

def parallel_sum(values):
    results = [None] * len(values)
    threads = [
        threading.Thread(target=slow_square, args=(v, results, i))
        for i, v in enumerate(values)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(results)

print(parallel_sum([3, 4, 5]))

Starter code:

import threading
import time

# Python threads don't have a return value.
# TODO: Run two computation tasks concurrently using threads,
# collecting results into a shared list.
# Return the sum of the results.

def slow_square(n, results, index):
  time.sleep(0.1)
  # Store result at index
  pass

def parallel_sum(values):
  # Run each slow_square in a thread, then return sum of results
  pass

print(parallel_sum([3, 4, 5]))  # 9 + 16 + 25 = 50
Expected Output
50
Hints

Hint 1: Pre-allocate a results list: results = [None] * len(values)

Hint 2: Pass the list and an index to each thread function.

Hint 3: After joining all threads, sum(results) gives the total.
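
As an alternative to the shared list, the standard library's `concurrent.futures` module provides futures that carry a return value. The sketch below is one possible variant, not the exercise's intended solution. It changes `slow_square` to a plain one-argument function that returns its result.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_square(n):
    time.sleep(0.1)
    return n * n          # a plain return value; no shared list needed

def parallel_sum(values):
    values = list(values)
    # One worker per value so all sleeps overlap; max_workers must be >= 1.
    with ThreadPoolExecutor(max_workers=max(1, len(values))) as pool:
        futures = [pool.submit(slow_square, v) for v in values]
        return sum(f.result() for f in futures)

print(parallel_sum([3, 4, 5]))  # 50
```

Each `f.result()` blocks until that task has finished, so no explicit `join()` calls are needed.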


#5 Thread-Safe Counter (Medium)
Tags: Lock, thread-safe, counter, race condition

Implement a thread-safe counter using a Lock to prevent race conditions on concurrent increments.

Solution:

import threading

class ThreadSafeCounter:
    def __init__(self):
        self.count = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self.count += 1

    def value(self):
        return self.count

Starter code:

import threading

# Without a lock, concurrent increments cause data races.
# TODO: Implement ThreadSafeCounter using a Lock.
# 10 threads each increment 1000 times.
# Final count must always be exactly 10000.

class ThreadSafeCounter:
  def __init__(self):
      self.count = 0

  def increment(self):
      pass

  def value(self):
      return self.count

def test_thread_safe_counter():
  counter = ThreadSafeCounter()
  threads = [
      threading.Thread(target=lambda: [counter.increment() for _ in range(1000)])
      for _ in range(10)
  ]
  for t in threads:
      t.start()
  for t in threads:
      t.join()
  return counter.value()

print(test_thread_safe_counter())  # Must be exactly 10000
Expected Output
10000
Hints

Hint 1: Add self._lock = threading.Lock() in __init__.

Hint 2: In increment: with self._lock: self.count += 1

Hint 3: Without the lock, multiple threads read the same value before any writes it back.
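
The lost update described in Hint 3 is hard to observe by chance, so the sketch below forces it. A `threading.Barrier` (an addition for illustration only) makes two threads both read the counter before either writes it back.

```python
import threading

count = 0
barrier = threading.Barrier(2)

def unsafe_increment():
    global count
    seen = count          # 1. read the current value
    barrier.wait()        # both threads have now read the same value
    count = seen + 1      # 2. write back a result based on stale data

threads = [threading.Thread(target=unsafe_increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(count)  # 1, not 2: one increment was lost
```

Holding a lock around the whole read-modify-write, as in the solution, rules out this interleaving.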


#6 Producer-Consumer with Queue (Medium)
Tags: Queue, producer, consumer, sentinel

Build a thread-safe producer-consumer pipeline using queue.Queue and a sentinel value to signal completion.

Solution:

import threading
import queue
import time

def producer(q, items):
    for item in items:
        time.sleep(0.02)
        q.put(item)
    q.put(None)  # sentinel

def consumer(q, results):
    while True:
        item = q.get()
        if item is None:
            break
        results.append(item * 2)

def run_pipeline(items):
    q = queue.Queue()
    results = []
    p = threading.Thread(target=producer, args=(q, items))
    c = threading.Thread(target=consumer, args=(q, results))
    p.start()
    c.start()
    p.join()
    c.join()
    return results

Starter code:

import threading
import queue
import time

# TODO: Implement a producer that puts each given item into a queue (one every 0.02s),
# and a consumer that reads items and doubles them.
# Use a sentinel value (None) to signal the consumer to stop.
# Return the list of doubled values in order.

def producer(q, items):
  pass

def consumer(q, results):
  pass

def run_pipeline(items):
  pass

print(run_pipeline(list(range(1, 6))))
Expected Output
[2, 4, 6, 8, 10]
Hints

Hint 1: Producer: q.put(item); after all items, q.put(None) as sentinel.

Hint 2: Consumer: while True: item = q.get(); if item is None: break; results.append(item*2)

Hint 3: queue.Queue() is thread-safe; no explicit locking needed.
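
The single sentinel works because there is exactly one consumer. A possible extension to several consumers, sketched below, puts one sentinel per consumer on the queue. The `n_consumers` parameter and the lock around `results` are assumptions of this sketch, not part of the exercise.

```python
import queue
import threading

def consumer(q, results, lock):
    while True:
        item = q.get()
        if item is None:       # each consumer swallows exactly one sentinel
            break
        with lock:
            results.append(item * 2)

def run_pipeline(items, n_consumers=3):
    q = queue.Queue()
    results = []
    lock = threading.Lock()
    consumers = [
        threading.Thread(target=consumer, args=(q, results, lock))
        for _ in range(n_consumers)
    ]
    for c in consumers:
        c.start()
    for item in items:
        q.put(item)
    for _ in consumers:        # one sentinel per consumer
        q.put(None)
    for c in consumers:
        c.join()
    return sorted(results)     # arrival order is no longer guaranteed

print(run_pipeline(range(1, 6)))  # [2, 4, 6, 8, 10]
```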


#7 Thread-Local Storage (Medium)
Tags: threading.local, thread-local, per-thread state

Use threading.local() to give each thread its own isolated storage, preventing cross-thread contamination.

Solution:

import threading
import time

local_data = threading.local()

def worker(thread_id, results, lock):
    local_data.value = thread_id
    time.sleep(0.05)
    my_value = local_data.value
    with lock:
        results.append(my_value == thread_id)

def test_thread_local():
    results = []
    lock = threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(i, results, lock))
        for i in range(5)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return all(results)

Starter code:

import threading
import time

# Thread-local storage gives each thread its own isolated copy of a variable.
# This is critical for per-request context in web servers.

local_data = threading.local()

# TODO: Each of 5 threads sets local_data.value = thread_id,
# sleeps briefly, then reads it back.
# Verify each thread reads its OWN value (no cross-thread contamination).
# Return True if all threads read back their own value.

def worker(thread_id, results, lock):
  pass

def test_thread_local():
  results = []
  lock = threading.Lock()
  threads = [
      threading.Thread(target=worker, args=(i, results, lock))
      for i in range(5)
  ]
  for t in threads:
      t.start()
  for t in threads:
      t.join()
  return all(results)

print(test_thread_local())
Expected Output
True
Hints

Hint 1: local_data.value = thread_id sets the thread-local value.

Hint 2: After sleep, assert local_data.value == thread_id — each thread sees only its own value.

Hint 3: threading.local() is like a dict keyed by thread identity.
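
The isolation also holds between the main thread and a worker. In this small sketch (the `seen_in_worker` list is only there to carry the observation back), an attribute set in the main thread is invisible to a new thread, and the worker's assignment does not leak back.

```python
import threading

local_data = threading.local()
local_data.value = 'main'

seen_in_worker = []

def worker():
    # A fresh thread starts with no attributes on local_data.
    seen_in_worker.append(hasattr(local_data, 'value'))
    local_data.value = 'worker'   # visible only inside this thread

t = threading.Thread(target=worker)
t.start()
t.join()

print(seen_in_worker[0])   # False
print(local_data.value)    # main
```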


#8 Thread Subclassing (Medium)
Tags: Thread subclass, run(), custom thread

Subclass threading.Thread to create a timed task that records execution time and result.

Solution:

import threading
import time

class TimedTask(threading.Thread):
    def __init__(self, name, fn, *args):
        super().__init__(name=name)
        self.fn = fn
        self.args = args
        self.result = None
        self.elapsed_ms = 0.0

    def run(self):
        t0 = time.perf_counter()
        self.result = self.fn(*self.args)
        self.elapsed_ms = (time.perf_counter() - t0) * 1000

Starter code:

import threading
import time

# TODO: Create a TimedTask class that subclasses threading.Thread.
# It receives a name, a callable, and args.
# Override run() to time the callable execution.
# After join(), the result and elapsed_ms should be accessible.

class TimedTask(threading.Thread):
  def __init__(self, name, fn, *args):
      pass

  def run(self):
      pass

def slow_sum(n):
  time.sleep(0.1)
  return sum(range(n))

task = TimedTask('sum_task', slow_sum, 100)
task.start()
task.join()
print(f'Result: {task.result}')
print(f'Elapsed > 100ms: {task.elapsed_ms > 100}')
Expected Output
Result: 4950
Elapsed > 100ms: True
Hints

Hint 1: Call super().__init__() in __init__ to properly initialize Thread.

Hint 2: In run(): t0 = time.perf_counter(); self.result = self.fn(*self.args); self.elapsed_ms = ...

Hint 3: Store result and elapsed_ms as instance attributes so they're readable after join().


#9 Parallel File Download Simulation (Hard)
Tags: I/O-bound, concurrent threads, ThreadPoolExecutor

Simulate concurrent I/O-bound downloads with threads and measure the wall-clock speedup versus serial execution.

Solution:

import threading
import time
import random

def simulate_download(url, results, lock):
    duration = random.uniform(0.05, 0.15)
    time.sleep(duration)
    with lock:
        results.append((url, duration))

def parallel_downloads(urls):
    results = []
    lock = threading.Lock()
    threads = [
        threading.Thread(target=simulate_download, args=(url, results, lock))
        for url in urls
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

Starter code:

import threading
import time
import random

# The GIL doesn't hurt I/O-bound work — threads can run concurrently
# while waiting for I/O (simulated here with time.sleep).

# TODO: Simulate downloading N files concurrently using threads.
# Each "download" sleeps for a random duration between 0.05 and 0.15s.
# Return the list of (url, duration) results; the code below measures the
# wall-clock time and checks that it is much less than the serial time.

def simulate_download(url, results, lock):
  duration = random.uniform(0.05, 0.15)
  time.sleep(duration)
  with lock:
      results.append((url, duration))

def parallel_downloads(urls):
  pass

random.seed(42)
urls = [f'https://example.com/file{i}.dat' for i in range(8)]

t0 = time.perf_counter()
results = parallel_downloads(urls)
total_ms = (time.perf_counter() - t0) * 1000

serial_ms = sum(d for _, d in results) * 1000
print(f'Parallel: {total_ms:.0f}ms, Serial would be: {serial_ms:.0f}ms')
print(f'Speedup: {serial_ms/total_ms:.1f}x')
print(f'All {len(results)} files downloaded: {len(results) == 8}')
Expected Output
Parallel: XXXms, Serial would be: XXXms
Speedup: X.Xx
All 8 files downloaded: True
Hints

Hint 1: Spawn one thread per URL; all sleep concurrently.

Hint 2: The parallel time should be approximately max(durations), not sum(durations).

Hint 3: Use a threading.Lock to safely append to the shared results list.
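
Hint 2 is easier to verify with fixed durations than with random ones. The sketch below uses three illustrative sleep times and a hypothetical `fake_download` helper, then prints the measured wall-clock time next to the maximum and the sum.

```python
import threading
import time

durations = [0.05, 0.10, 0.15]   # fixed, illustrative values

def fake_download(d):
    time.sleep(d)                # stands in for waiting on the network

t0 = time.perf_counter()
threads = [threading.Thread(target=fake_download, args=(d,)) for d in durations]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - t0

print(f'elapsed={elapsed:.2f}s  max={max(durations):.2f}s  sum={sum(durations):.2f}s')
```

The elapsed time should land close to the longest single sleep (0.15s) plus a small thread start-up overhead, and well below the 0.30s a serial run would take.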


#10 Graceful Thread Shutdown with Event (Hard)
Tags: Event, stop_event, graceful shutdown

Implement a gracefully-stoppable background worker using threading.Event for clean shutdown signaling.

Solution:

import threading
import queue
import time

class BackgroundWorker:
    def __init__(self):
        self._queue = queue.Queue()
        self._stop_event = threading.Event()
        self._processed = 0
        self._thread = None

    def _run(self):
        while not self._stop_event.is_set():
            try:
                item = self._queue.get(timeout=0.05)
                self._processed += 1
                self._queue.task_done()
            except queue.Empty:
                continue

    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, item):
        self._queue.put(item)

    def stop(self, timeout=2.0):
        self._queue.join()
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=timeout)
        return self._processed

    def processed_count(self):
        return self._processed

Starter code:

import threading
import queue
import time

# TODO: Implement a background worker that runs until signaled to stop.
# Use threading.Event for clean shutdown.
# The worker should process items from a queue.
# Return the number of items processed when stopped.

class BackgroundWorker:
  def __init__(self):
      self._queue = queue.Queue()
      self._stop_event = threading.Event()
      self._processed = 0
      self._thread = None

  def start(self):
      pass

  def submit(self, item):
      pass

  def stop(self, timeout=2.0):
      pass

  def processed_count(self):
      return self._processed

worker = BackgroundWorker()
worker.start()

for i in range(10):
  worker.submit(i)
  time.sleep(0.02)

count = worker.stop()
print(f'Processed: {count}')
print(f'All items handled: {count == 10}')
Expected Output
Processed: 10
All items handled: True
Hints

Hint 1: The worker loop: while not self._stop_event.is_set(): try: item = self._queue.get(timeout=0.05) ...

Hint 2: stop() waits for the queue to drain (self._queue.join()), sets the event, joins the thread, and returns the processed count.

Hint 3: Use queue.get(timeout=...) so the loop checks the stop event periodically.
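
The same idea applies to workers that sleep between iterations rather than block on a queue: `Event.wait(timeout=...)` can serve as an interruptible sleep. The `ticker` function and its deliberately long 10s interval below are illustrative assumptions.

```python
import threading
import time

stop_event = threading.Event()
ticks = []

def ticker(interval=10.0):
    # wait() returns True as soon as the event is set, False on timeout.
    while not stop_event.wait(timeout=interval):
        ticks.append(time.perf_counter())

t = threading.Thread(target=ticker)
t.start()
t0 = time.perf_counter()
stop_event.set()          # wakes the thread immediately, mid-wait
t.join()
print(f'stopped in {time.perf_counter() - t0:.3f}s, ticks={len(ticks)}')
```

A `time.sleep(10)` in the loop would have delayed shutdown by up to 10 seconds. Here the thread exits as soon as the event is set.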


#11 Thread Pool with Work Stealing (Hard)
Tags: thread pool, work stealing, queue, dynamic dispatch

Implement a fixed-size thread pool with a shared work queue, returning Future objects for asynchronous results.

Solution:

import threading
import queue
import time

class _Future:
    def __init__(self):
        self._result = None
        self._done = threading.Event()

    def set_result(self, value):
        self._result = value
        self._done.set()

    def result(self, timeout=5.0):
        self._done.wait(timeout=timeout)
        return self._result

class SimpleThreadPool:
    def __init__(self, n_workers):
        self._queue = queue.Queue()
        self._workers = []
        for _ in range(n_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self._workers.append(t)

    def _worker(self):
        while True:
            item = self._queue.get()
            if item is None:
                break
            fn, args, future = item
            try:
                future.set_result(fn(*args))
            except Exception as e:
                future.set_result(e)
            self._queue.task_done()

    def submit(self, fn, *args):
        future = _Future()
        self._queue.put((fn, args, future))
        return future

    def shutdown(self, wait=True):
        for _ in self._workers:
            self._queue.put(None)
        if wait:
            for t in self._workers:
                t.join()

Starter code:

import threading
import queue
import time
from typing import Callable, Any

# TODO: Implement a simple fixed-size thread pool.
# - __init__(n_workers): create n_workers threads
# - submit(fn, *args) -> Future-like object with .result() blocking
# - shutdown(wait=True): stop all workers
# Workers pull tasks from a shared queue, so idle workers pick up the next task
# (dynamic dispatch; strict work stealing would use per-worker queues).

class SimpleThreadPool:
  def __init__(self, n_workers):
      pass

  def submit(self, fn, *args):
      pass

  def shutdown(self, wait=True):
      pass

class _Future:
  def __init__(self):
      self._result = None
      self._done = threading.Event()

  def set_result(self, value):
      self._result = value
      self._done.set()

  def result(self, timeout=5.0):
      self._done.wait(timeout=timeout)
      return self._result

# Test
pool = SimpleThreadPool(4)

futures = [pool.submit(lambda x=i: x ** 2) for i in range(8)]
# Bind i as a default argument (n=i) to avoid late-binding closure issues
futures2 = []
for i in range(8):
  def task(n=i):
      time.sleep(0.05)
      return n ** 2
  futures2.append(pool.submit(task))

results = sorted(f.result() for f in futures2)
pool.shutdown()
print(results)
Expected Output
[0, 1, 4, 9, 16, 25, 36, 49]
Hints

Hint 1: Use queue.Queue() for tasks; each task is a (fn, args, future) tuple.

Hint 2: Worker loop: while True: task = q.get(); if task is None: break; future.set_result(fn(*args))

Hint 3: shutdown() puts n_workers sentinel Nones into the queue to stop each worker.
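
The solution stores a raised exception as the future's result, so a caller cannot tell a failure from a task that returned an exception object. One possible refinement is sketched below. `ResultFuture` and `run_task` are hypothetical names introduced here. The sketch keeps errors separate, re-raises them in `result()`, and raises `TimeoutError` instead of silently returning `None`.

```python
import threading

class ResultFuture:
    """Hypothetical variant of _Future that keeps errors apart from results."""
    def __init__(self):
        self._result = None
        self._error = None
        self._done = threading.Event()

    def set_result(self, value):
        self._result = value
        self._done.set()

    def set_exception(self, exc):
        self._error = exc
        self._done.set()

    def result(self, timeout=5.0):
        if not self._done.wait(timeout=timeout):
            raise TimeoutError('task did not finish in time')
        if self._error is not None:
            raise self._error      # surface the task's failure to the caller
        return self._result

def run_task(fn, *args):
    """Run fn(*args) in its own thread and return a future for the outcome."""
    future = ResultFuture()

    def target():
        try:
            future.set_result(fn(*args))
        except Exception as exc:
            future.set_exception(exc)

    threading.Thread(target=target).start()
    return future

print(run_task(pow, 2, 5).result())        # 32
try:
    run_task(int, 'not a number').result()
except ValueError as exc:
    print('raised:', type(exc).__name__)   # raised: ValueError
```

The same `set_exception` call could replace `future.set_result(e)` in a pool worker loop.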

© 2026 EngineersOfAI. All rights reserved.