Python Threading in Python Practice Problems & Exercises
Practice: Threading in Python
← Back to lessonCreate a thread, pass it arguments via args, start it, and wait for completion with join.
Solution:
import threading
import time
def greet(name):
time.sleep(0.1)
print(f'Hello from thread {name}')
t = threading.Thread(target=greet, args=('Worker',))
t.start()
t.join()
print('Main done')
import threading
import time
# TODO: Create a thread that prints "Hello from thread {name}" after sleeping 0.1s.
# Start it and wait for it to finish before printing "Main done".
def greet(name):
pass
t = threading.Thread(target=greet, args=('Worker',))
# Start and join the thread
# ...
print('Main done')
Expected Output
Hello from thread Worker
Main doneHints
Hint 1: threading.Thread(target=fn, args=(arg,)) creates a thread.
Hint 2: t.start() launches the thread; t.join() blocks until it finishes.
Hint 3: Without t.join(), 'Main done' might print before the thread runs.
Spawn multiple threads, collect results, and join all before printing.
Solution:
import threading
import time
results = []
lock = threading.Lock()
def compute(n, delay=0.05):
time.sleep(delay)
with lock:
results.append(n * n)
threads = [threading.Thread(target=compute, args=(n,)) for n in range(1, 6)]
for t in threads:
t.start()
for t in threads:
t.join()
print(sorted(results))
import threading
import time
results = []
def compute(n, delay=0.05):
time.sleep(delay)
results.append(n * n)
# TODO: Spawn 5 threads that compute squares of 1..5.
# Wait for ALL threads to finish, then print sorted results.
Expected Output
[1, 4, 9, 16, 25]Hints
Hint 1: Create a list of threads: threads = [threading.Thread(...) for n in range(1,6)]
Hint 2: Start all threads: for t in threads: t.start()
Hint 3: Then join all: for t in threads: t.join()
Start a heartbeat thread as a daemon so it automatically terminates when the main program ends.
Solution:
import threading
import time
beat_count = [0]
def heartbeat(interval=0.1):
while True:
beat_count[0] += 1
time.sleep(interval)
t = threading.Thread(target=heartbeat, kwargs={'interval': 0.1}, daemon=True)
t.start()
time.sleep(0.35)
print(f'Beats: {beat_count[0]}')
import threading
import time
beat_count = [0]
def heartbeat(interval=0.1):
while True:
beat_count[0] += 1
time.sleep(interval)
# TODO: Start the heartbeat as a DAEMON thread.
# Let the main thread sleep for 0.35 seconds.
# After waking, print the beat count.
# The daemon should stop automatically when main exits.
Expected Output
Beats: 3Hints
Hint 1: Set t.daemon = True before t.start(), or pass daemon=True to Thread().
Hint 2: A daemon thread is killed automatically when all non-daemon threads finish.
Hint 3: After 0.35s sleep, about 3 beats should have occurred (0.1s interval).
Collect return values from threads by writing results into a pre-allocated shared list.
Solution:
import threading
import time
def slow_square(n, results, index):
time.sleep(0.1)
results[index] = n * n
def parallel_sum(values):
results = [None] * len(values)
threads = [
threading.Thread(target=slow_square, args=(v, results, i))
for i, v in enumerate(values)
]
for t in threads:
t.start()
for t in threads:
t.join()
return sum(results)
print(parallel_sum([3, 4, 5]))
import threading
import time
# Python threads don't have a return value.
# TODO: Run two computation tasks concurrently using threads,
# collecting results into a shared list.
# Return the sum of the results.
def slow_square(n, results, index):
time.sleep(0.1)
# Store result at index
pass
def parallel_sum(values):
# Run each slow_square in a thread, then return sum of results
pass
print(parallel_sum([3, 4, 5])) # 9 + 16 + 25 = 50
Expected Output
50Hints
Hint 1: Pre-allocate a results list: results = [None] * len(values)
Hint 2: Pass the list and an index to each thread function.
Hint 3: After joining all threads, sum(results) gives the total.
Implement a thread-safe counter using a Lock to prevent race conditions on concurrent increments.
Solution:
import threading
class ThreadSafeCounter:
def __init__(self):
self.count = 0
self._lock = threading.Lock()
def increment(self):
with self._lock:
self.count += 1
def value(self):
return self.count
import threading
# Without a lock, concurrent increments cause data races.
# TODO: Implement ThreadSafeCounter using a Lock.
# 10 threads each increment 1000 times.
# Final count must always be exactly 10000.
class ThreadSafeCounter:
def __init__(self):
self.count = 0
def increment(self):
pass
def value(self):
return self.count
def test_thread_safe_counter():
counter = ThreadSafeCounter()
threads = [
threading.Thread(target=lambda: [counter.increment() for _ in range(1000)])
for _ in range(10)
]
for t in threads:
t.start()
for t in threads:
t.join()
return counter.value()
print(test_thread_safe_counter()) # Must be exactly 10000
Expected Output
10000Hints
Hint 1: Add self._lock = threading.Lock() in __init__.
Hint 2: In increment: with self._lock: self.count += 1
Hint 3: Without the lock, multiple threads read the same value before any writes it back.
Build a thread-safe producer-consumer pipeline using queue.Queue and a sentinel value to signal completion.
Solution:
import threading
import queue
import time
def producer(q, items):
for item in items:
time.sleep(0.02)
q.put(item)
q.put(None) # sentinel
def consumer(q, results):
while True:
item = q.get()
if item is None:
break
results.append(item * 2)
def run_pipeline(items):
q = queue.Queue()
results = []
p = threading.Thread(target=producer, args=(q, items))
c = threading.Thread(target=consumer, args=(q, results))
p.start()
c.start()
p.join()
c.join()
return results
import threading
import queue
import time
# TODO: Implement a producer that puts 10 items into a queue (one every 0.02s),
# and a consumer that reads items and doubles them.
# Use a sentinel value (None) to signal the consumer to stop.
# Return the list of doubled values in order.
def producer(q, items):
pass
def consumer(q, results):
pass
def run_pipeline(items):
pass
print(run_pipeline(list(range(1, 6))))
Expected Output
[2, 4, 6, 8, 10]Hints
Hint 1: Producer: q.put(item); after all items, q.put(None) as sentinel.
Hint 2: Consumer: while True: item = q.get(); if item is None: break; results.append(item*2)
Hint 3: queue.Queue() is thread-safe; no explicit locking needed.
Use threading.local() to give each thread its own isolated storage, preventing cross-thread contamination.
Solution:
import threading
import time
local_data = threading.local()
def worker(thread_id, results, lock):
local_data.value = thread_id
time.sleep(0.05)
my_value = local_data.value
with lock:
results.append(my_value == thread_id)
def test_thread_local():
results = []
lock = threading.Lock()
threads = [
threading.Thread(target=worker, args=(i, results, lock))
for i in range(5)
]
for t in threads:
t.start()
for t in threads:
t.join()
return all(results)
import threading
import time
# Thread-local storage gives each thread its own isolated copy of a variable.
# This is critical for per-request context in web servers.
local_data = threading.local()
# TODO: Each of 5 threads sets local_data.value = thread_id,
# sleeps briefly, then reads it back.
# Verify each thread reads its OWN value (no cross-thread contamination).
# Return True if all threads read back their own value.
def worker(thread_id, results, lock):
pass
def test_thread_local():
results = []
lock = threading.Lock()
threads = [
threading.Thread(target=worker, args=(i, results, lock))
for i in range(5)
]
for t in threads:
t.start()
for t in threads:
t.join()
return all(results)
print(test_thread_local())
Expected Output
TrueHints
Hint 1: local_data.value = thread_id sets the thread-local value.
Hint 2: After sleep, assert local_data.value == thread_id — each thread sees only its own value.
Hint 3: threading.local() is like a dict keyed by thread identity.
Subclass threading.Thread to create a timed task that records execution time and result.
Solution:
import threading
import time
class TimedTask(threading.Thread):
def __init__(self, name, fn, *args):
super().__init__(name=name)
self.fn = fn
self.args = args
self.result = None
self.elapsed_ms = 0.0
def run(self):
t0 = time.perf_counter()
self.result = self.fn(*self.args)
self.elapsed_ms = (time.perf_counter() - t0) * 1000
import threading
import time
# TODO: Create a TimedTask class that subclasses threading.Thread.
# It receives a name, a callable, and args.
# Override run() to time the callable execution.
# After join(), the result and elapsed_ms should be accessible.
class TimedTask(threading.Thread):
def __init__(self, name, fn, *args):
pass
def run(self):
pass
def slow_sum(n):
time.sleep(0.1)
return sum(range(n))
task = TimedTask('sum_task', slow_sum, 100)
task.start()
task.join()
print(f'Result: {task.result}')
print(f'Elapsed > 100ms: {task.elapsed_ms > 100}')
Expected Output
Result: 4950
Elapsed > 100ms: TrueHints
Hint 1: Call super().__init__() in __init__ to properly initialize Thread.
Hint 2: In run(): t0 = time.perf_counter(); self.result = self.fn(*self.args); self.elapsed_ms = ...
Hint 3: Store result and elapsed_ms as instance attributes so they're readable after join().
Simulate concurrent I/O-bound downloads with threads and measure the wall-clock speedup versus serial execution.
Solution:
import threading
import time
import random
def simulate_download(url, results, lock):
duration = random.uniform(0.05, 0.15)
time.sleep(duration)
with lock:
results.append((url, duration))
def parallel_downloads(urls):
results = []
lock = threading.Lock()
threads = [
threading.Thread(target=simulate_download, args=(url, results, lock))
for url in urls
]
for t in threads:
t.start()
for t in threads:
t.join()
return results
import threading
import time
import random
# The GIL doesn't hurt I/O-bound work — threads can run concurrently
# while waiting for I/O (simulated here with time.sleep).
# TODO: Simulate downloading N files concurrently using threads.
# Each "download" sleeps for a random duration between 0.05 and 0.15s.
# Return the total wall-clock time and verify it's much less than serial time.
def simulate_download(url, results, lock):
duration = random.uniform(0.05, 0.15)
time.sleep(duration)
with lock:
results.append((url, duration))
def parallel_downloads(urls):
pass
random.seed(42)
urls = [f'https://example.com/file{i}.dat' for i in range(8)]
import time
t0 = time.perf_counter()
results = parallel_downloads(urls)
total_ms = (time.perf_counter() - t0) * 1000
serial_ms = sum(d for _, d in results) * 1000
print(f'Parallel: {total_ms:.0f}ms, Serial would be: {serial_ms:.0f}ms')
print(f'Speedup: {serial_ms/total_ms:.1f}x')
print(f'All {len(results)} files downloaded: {len(results) == 8}')
Expected Output
Parallel: XXXms, Serial would be: XXXms
Speedup: X.Xx
All 8 files downloaded: TrueHints
Hint 1: Spawn one thread per URL; all sleep concurrently.
Hint 2: The parallel time should be approximately max(durations), not sum(durations).
Hint 3: Use a threading.Lock to safely append to the shared results list.
Implement a gracefully-stoppable background worker using threading.Event for clean shutdown signaling.
Solution:
import threading
import queue
import time
class BackgroundWorker:
def __init__(self):
self._queue = queue.Queue()
self._stop_event = threading.Event()
self._processed = 0
self._thread = None
def _run(self):
while not self._stop_event.is_set():
try:
item = self._queue.get(timeout=0.05)
self._processed += 1
self._queue.task_done()
except queue.Empty:
continue
def start(self):
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def submit(self, item):
self._queue.put(item)
def stop(self, timeout=2.0):
self._queue.join()
self._stop_event.set()
if self._thread:
self._thread.join(timeout=timeout)
return self._processed
def processed_count(self):
return self._processed
import threading
import time
# TODO: Implement a background worker that runs until signaled to stop.
# Use threading.Event for clean shutdown.
# The worker should process items from a queue.
# Return the number of items processed when stopped.
import queue
class BackgroundWorker:
def __init__(self):
self._queue = queue.Queue()
self._stop_event = threading.Event()
self._processed = 0
self._thread = None
def start(self):
pass
def submit(self, item):
pass
def stop(self, timeout=2.0):
pass
def processed_count(self):
return self._processed
worker = BackgroundWorker()
worker.start()
for i in range(10):
worker.submit(i)
time.sleep(0.02)
count = worker.stop()
print(f'Processed: {count}')
print(f'All items handled: {count == 10}')
Expected Output
Processed: 10
All items handled: TrueHints
Hint 1: The worker loop: while not self._stop_event.is_set(): try: item = self._queue.get(timeout=0.05) ...
Hint 2: stop() sets the event AND joins the thread, returning processed count.
Hint 3: Use queue.get(timeout=...) so the loop checks the stop event periodically.
Implement a fixed-size thread pool with a shared work queue, returning Future objects for asynchronous results.
Solution:
import threading
import queue
import time
class _Future:
def __init__(self):
self._result = None
self._done = threading.Event()
def set_result(self, value):
self._result = value
self._done.set()
def result(self, timeout=5.0):
self._done.wait(timeout=timeout)
return self._result
class SimpleThreadPool:
def __init__(self, n_workers):
self._queue = queue.Queue()
self._workers = []
for _ in range(n_workers):
t = threading.Thread(target=self._worker, daemon=True)
t.start()
self._workers.append(t)
def _worker(self):
while True:
item = self._queue.get()
if item is None:
break
fn, args, future = item
try:
future.set_result(fn(*args))
except Exception as e:
future.set_result(e)
self._queue.task_done()
def submit(self, fn, *args):
future = _Future()
self._queue.put((fn, args, future))
return future
def shutdown(self, wait=True):
for _ in self._workers:
self._queue.put(None)
if wait:
for t in self._workers:
t.join()
import threading
import queue
import time
from typing import Callable, Any
# TODO: Implement a simple fixed-size thread pool.
# - __init__(n_workers): create n_workers threads
# - submit(fn, *args) -> Future-like object with .result() blocking
# - shutdown(wait=True): stop all workers
# Workers pull tasks from a shared queue (this is "work stealing").
class SimpleThreadPool:
def __init__(self, n_workers):
pass
def submit(self, fn, *args):
pass
def shutdown(self, wait=True):
pass
class _Future:
def __init__(self):
self._result = None
self._done = threading.Event()
def set_result(self, value):
self._result = value
self._done.set()
def result(self, timeout=5.0):
self._done.wait(timeout=timeout)
return self._result
# Test
pool = SimpleThreadPool(4)
futures = [pool.submit(lambda x=i: x ** 2, ) for i in range(8)]
# Use named arg to avoid closure issues
futures2 = []
for i in range(8):
def task(n=i):
time.sleep(0.05)
return n ** 2
futures2.append(pool.submit(task))
results = sorted(f.result() for f in futures2)
pool.shutdown()
print(results)
Expected Output
[0, 1, 4, 9, 16, 25, 36, 49]Hints
Hint 1: Use queue.Queue() for tasks; each task is a (fn, args, future) tuple.
Hint 2: Worker loop: while True: task = q.get(); if task is None: break; future.set_result(fn(*args))
Hint 3: shutdown() puts n_workers sentinel Nones into the queue to stop each worker.
