Multiprocessing in Python: Practice Problems & Exercises
Practice: Multiprocessing in Python
← Back to lesson
Create, start, and join a subprocess — the multiprocessing equivalent of threading basics.
Solution:
from multiprocessing import Process
import time


def worker(name, duration):
    """Sleep for ``duration`` seconds, then announce that ``name`` finished."""
    time.sleep(duration)
    print(f'Process {name} done')


if __name__ == '__main__':
    # Guarded so that start methods which re-import this module in the child
    # (spawn/forkserver) do not launch processes recursively.
    child = Process(target=worker, args=('Alpha', 0.1))
    child.start()
    child.join()  # wait for the child so its line is printed before ours
    print('Main process done')
from multiprocessing import Process
import time


def worker(name, duration):
    """Pause for ``duration`` seconds and report that process ``name`` is done."""
    time.sleep(duration)
    message = f'Process {name} done'
    print(message)


# TODO: Create a Process with target=worker, start it, and join it.
# Print "Main process done" after the worker finishes.
if __name__ == '__main__':
    p = Process(target=worker, args=('Alpha', 0.1))
    # start, join, then print
Expected Output
Process Alpha done
Main process done
Hints
Hint 1: Process(target=fn, args=(a, b)) mirrors threading.Thread syntax.
Hint 2: p.start() launches the process; p.join() blocks until it exits.
Hint 3: Always guard multiprocessing code with if __name__ == '__main__':.
Use Pool.map to apply a CPU-intensive function to a list of inputs in parallel.
Solution:
from multiprocessing import Pool


def cpu_intensive(n):
    """Return the sum of 0 .. n*10000 - 1, computed with a deliberately slow loop."""
    upper = n * 10000
    acc = 0
    for value in range(upper):
        acc += value
    return acc


if __name__ == '__main__':
    numbers = [100, 200, 150, 300, 250]
    with Pool() as pool:
        # map() blocks until every task finishes and preserves input order.
        results = pool.map(cpu_intensive, numbers)
    print(results)
from multiprocessing import Pool
import time


def cpu_intensive(n):
    """Simulate CPU work by adding up 0 .. n*10000 - 1 one integer at a time."""
    running_total = 0
    for i in range(n * 10000):
        running_total += i
    return running_total


if __name__ == '__main__':
    numbers = [100, 200, 150, 300, 250]
    # TODO: Use Pool.map to compute cpu_intensive for each number in parallel.
    # Print the results.
    pass
Expected Output
[499999500000, 1999999000000, 1124999250000, 4499998500000, 3124998750000]
Hints
Hint 1: with Pool() as pool: results = pool.map(fn, iterable)
Hint 2: Pool() defaults to os.cpu_count() workers.
Hint 3: map() blocks until all tasks complete and returns results in order.
Use multiprocessing.Queue to send computed results from a child process back to the parent.
Solution:
from multiprocessing import Process, Queue


def compute_primes(limit, result_queue):
    """Put the list of primes below ``limit`` onto ``result_queue``.

    Uses naive trial division by every smaller integer.
    """
    primes = []
    for candidate in range(2, limit):
        if all(candidate % divisor != 0 for divisor in range(2, candidate)):
            primes.append(candidate)
    result_queue.put(primes)


if __name__ == '__main__':
    q = Queue()
    p = Process(target=compute_primes, args=(50, q))
    p.start()
    # Read from the queue before join(): a child that still has queued data
    # to flush may not exit, so joining first can deadlock.
    result = q.get()
    p.join()
    print(result)
from multiprocessing import Process, Queue


def compute_primes(limit, result_queue):
    """Send the primes in [2, limit) to ``result_queue`` as a single list."""
    primes = [
        candidate
        for candidate in range(2, limit)
        if not any(candidate % d == 0 for d in range(2, candidate))
    ]
    result_queue.put(primes)


if __name__ == '__main__':
    q = Queue()
    # TODO: Start a Process that finds primes up to 50 using compute_primes.
    # Retrieve the result from the queue and print it.
    pass
Expected Output
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47]
Hints
Hint 1: Pass the Queue object to the child process via args.
Hint 2: result_queue.put(data) in the child; result_queue.get() in the parent.
Hint 3: Queue is process-safe; values are serialized via pickle.
Use Pool.starmap to apply a function with multiple arguments to a list of argument tuples.
Solution:
from multiprocessing import Pool


def power(base, exp):
    """Return ``base`` raised to the power ``exp``."""
    return pow(base, exp)


if __name__ == '__main__':
    pairs = [(2, 10), (3, 5), (5, 4), (10, 3), (7, 2)]
    with Pool() as pool:
        # starmap unpacks each tuple: power(2, 10), power(3, 5), ...
        results = pool.starmap(power, pairs)
    print(results)
from multiprocessing import Pool


def power(base, exp):
    """Compute ``base ** exp``."""
    result = base ** exp
    return result


if __name__ == '__main__':
    # TODO: Use Pool.starmap to compute powers for these (base, exp) pairs:
    # (2, 10), (3, 5), (5, 4), (10, 3), (7, 2)
    pairs = [(2, 10), (3, 5), (5, 4), (10, 3), (7, 2)]
    pass
Expected Output
[1024, 243, 625, 1000, 49]
Hints
Hint 1: pool.starmap(fn, list_of_tuples) unpacks each tuple as positional args.
Hint 2: This is the multiprocessing equivalent of itertools.starmap.
Hint 3: Use with Pool() as pool: for automatic cleanup.
Use multiprocessing.Value and Array to share state between a parent and child process.
Solution:
from multiprocessing import Process, Value, Array


def fill_array(arr, val):
    """Overwrite ``arr`` in place with val, 2*val, 3*val, ..."""
    arr[:] = [val * k for k in range(1, len(arr) + 1)]


def increment_counter(counter, n):
    """Add 1 to ``counter.value`` ``n`` times, holding its lock for each step."""
    lock = counter.get_lock()
    for _ in range(n):
        with lock:
            counter.value += 1


if __name__ == '__main__':
    shared_arr = Array('i', 5)  # five C ints, zero-initialised
    counter = Value('i', 0)
    workers = [
        Process(target=fill_array, args=(shared_arr, 3)),
        Process(target=increment_counter, args=(counter, 100)),
    ]
    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()
    print('Array:', list(shared_arr))
    print('Counter:', counter.value)
from multiprocessing import Process, Value, Array
import ctypes


def fill_array(arr, val):
    """Fill ``arr`` in place with multiples of ``val``: val, 2*val, 3*val, ..."""
    for i in range(len(arr)):
        arr[i] = val * (i + 1)


def increment_counter(counter, n):
    """Increment ``counter.value`` by 1, ``n`` times.

    ``counter.value += 1`` is a separate read and write, so concurrent
    incrementers could lose updates without the Value's lock; hold it
    around each step.
    """
    for _ in range(n):
        with counter.get_lock():
            counter.value += 1


if __name__ == '__main__':
    # TODO: Create a shared Array of 5 integers and a shared Value (integer).
    # Run fill_array in a process to fill the array with multiples of 3.
    # Run increment_counter in a process to increment the counter 100 times.
    # Print the final array and counter.
    pass
Expected Output
Array: [3, 6, 9, 12, 15]
Counter: 100
Hints
Hint 1: Value('i', 0) creates a shared integer; Array('i', 5) creates a shared int array.
Hint 2: Pass them to processes via args — they live in shared memory.
Hint 3: For safe concurrent access to Value, use value.get_lock().
Use a Pipe for bidirectional request-response communication between a parent and child process.
Solution:
from multiprocessing import Process, Pipe


def calculator(conn):
    """Serve ``(op, a, b)`` requests over ``conn`` until the string 'stop' arrives.

    Supported ops are 'add' and 'mul'. Every request gets exactly one reply:
    the result, or a ValueError instance for an unknown op. Always replying
    matters because the parent blocks in ``recv()`` waiting for it. The
    connection is closed however the loop ends.
    """
    operations = {
        'add': lambda a, b: a + b,
        'mul': lambda a, b: a * b,
    }
    try:
        while True:
            msg = conn.recv()
            if msg == 'stop':
                break
            op, a, b = msg
            handler = operations.get(op)
            if handler is None:
                conn.send(ValueError(f'unknown operation: {op!r}'))
            else:
                conn.send(handler(a, b))
    finally:
        conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=calculator, args=(child_conn,))
    p.start()
    # The child has its own end now; closing ours means recv() raises
    # EOFError instead of hanging forever if the child dies.
    child_conn.close()
    operations = [('add', 10, 5), ('mul', 4, 7), ('add', 100, 200)]
    for op, a, b in operations:
        parent_conn.send((op, a, b))
        result = parent_conn.recv()  # blocks until the child replies
        print(f'{op}({a}, {b}) = {result}')
    parent_conn.send('stop')
    p.join()
    parent_conn.close()
from multiprocessing import Process, Pipe


def calculator(conn):
    """Serve ``(op, a, b)`` requests over ``conn`` until the string 'stop' arrives.

    Supported ops are 'add' and 'mul'. Every request gets exactly one reply:
    the result, or a ValueError instance for an unknown op. Always replying
    matters because the parent blocks in ``recv()`` waiting for it. The
    connection is closed however the loop ends.
    """
    operations = {
        'add': lambda a, b: a + b,
        'mul': lambda a, b: a * b,
    }
    try:
        while True:
            msg = conn.recv()
            if msg == 'stop':
                break
            op, a, b = msg
            handler = operations.get(op)
            if handler is None:
                conn.send(ValueError(f'unknown operation: {op!r}'))
            else:
                conn.send(handler(a, b))
    finally:
        conn.close()


if __name__ == '__main__':
    # TODO: Use a Pipe to communicate with the calculator process.
    # Send three operations and receive the results.
    # Then send 'stop' to shut it down.
    pass
Expected Output
add(10, 5) = 15
mul(4, 7) = 28
add(100, 200) = 300
Hints
Hint 1: parent_conn, child_conn = Pipe() creates a duplex pipe.
Hint 2: parent_conn.send(('add', 10, 5)) sends a tuple to the child.
Hint 3: result = parent_conn.recv() blocks until the child sends a response.
Use apply_async with get(timeout=...) and exception handling to manage unreliable parallel tasks.
Solution:
import multiprocessing
from multiprocessing import Pool
import time


def flaky_task(n):
    """Sleep ``n * 0.1`` s and return ``n * 10``.

    Raises:
        ValueError: always for ``n == 3``, to simulate a failing task.
    """
    time.sleep(n * 0.1)
    if n == 3:
        raise ValueError(f"Task {n} failed!")
    return n * 10


def collect_results(async_results, timeout=2):
    """Turn ``(n, AsyncResult)`` pairs into ``(n, result_or_error)`` pairs.

    Each ``get`` waits at most ``timeout`` seconds. A ValueError raised in
    the worker, or a timeout, is recorded as an ``'ERROR: ...'`` string so
    one bad task does not abort collection of the others.
    """
    results = []
    for n, ar in async_results:
        try:
            results.append((n, ar.get(timeout=timeout)))
        except ValueError as e:
            results.append((n, f'ERROR: {e}'))
        except multiprocessing.TimeoutError:
            results.append((n, f'ERROR: Task {n} timed out after {timeout}s'))
    return results


if __name__ == '__main__':
    with Pool(4) as pool:
        async_results = [(n, pool.apply_async(flaky_task, args=(n,))) for n in range(1, 6)]
        # Collect inside the with-block: leaving it terminates the pool.
        results = collect_results(async_results, timeout=2)
    print(results)
from multiprocessing import Pool
import time


def flaky_task(n):
    """Sleep ``n * 0.1`` s and return ``n * 10``; task 3 always fails.

    Raises:
        ValueError: when ``n == 3``.
    """
    time.sleep(n * 0.1)
    if n != 3:
        return n * 10
    raise ValueError(f"Task {n} failed!")


if __name__ == '__main__':
    # TODO: Submit 5 tasks (n=1..5) using pool.apply_async.
    # Collect results with a 2-second timeout per task.
    # Catch ValueError for failing tasks.
    # Print the list of (n, result_or_error) pairs.
    pass
Expected Output
[(1, 10), (2, 20), (3, 'ERROR: Task 3 failed!'), (4, 40), (5, 50)]
Hints
Hint 1: pool.apply_async(fn, args=(n,)) returns an AsyncResult.
Hint 2: result.get(timeout=2) raises the exception from the worker if it failed.
Hint 3: Wrap result.get() in try/except ValueError to catch worker errors.
Use imap_unordered to stream results from a process pool as each task completes, not waiting for all.
Solution:
from multiprocessing import Pool
import time
import random


def slow_process(item):
    """Sleep a random 0.05-0.2 s, then return ``(item ** 2, delay_used)``."""
    delay = random.uniform(0.05, 0.2)
    time.sleep(delay)
    return item ** 2, delay


if __name__ == '__main__':
    random.seed(99)  # NOTE(review): seeds this process's RNG; workers likely draw their own
    items = list(range(1, 9))
    received = []
    with Pool(4) as pool:
        # Results are yielded in completion order, not submission order.
        for square, _delay in pool.imap_unordered(slow_process, items):
            received.append(square)
    print(f'All 8 results received: {len(received) == 8}')
from multiprocessing import Pool
import time
import random


def slow_process(item):
    """Simulate uneven work: sleep 0.05-0.2 s and return ``(item ** 2, delay)``."""
    pause = random.uniform(0.05, 0.2)
    time.sleep(pause)
    squared = item ** 2
    return squared, pause


if __name__ == '__main__':
    random.seed(99)
    items = list(range(1, 9))
    # TODO: Use pool.imap_unordered to process items as they complete.
    # Print each result as it arrives (not waiting for all to finish).
    # Count results and verify all 8 were processed.
    pass
Expected Output
All 8 results received: True
Hints
Hint 1: pool.imap_unordered(fn, iterable) is a lazy iterator over results as they complete.
Hint 2: Unlike map(), results arrive out of order (whichever finishes first).
Hint 3: Iterate: for result in pool.imap_unordered(fn, items): print(result)
Parallelize a CPU-intensive image processing pipeline using Pool.map and verify result correctness.
Solution:
from multiprocessing import Pool


def process_images_parallel(images):
    """Apply ``process_image`` to every image using a pool of worker processes.

    ``Pool.map`` keeps input order, so the output lines up with ``images``.
    """
    with Pool() as workers:
        processed = workers.map(process_image, images)
    return processed
from multiprocessing import Pool
import time
import random
import math

# Simulate an image processing pipeline where each "image" is a list of pixels.
# The operation is CPU-intensive (computing per-pixel transformations).


def process_pixel(val):
    """Transform one pixel intensity (presumably 0-255, as generated below)."""
    scaled_root = math.sqrt(val * 255)
    phase = math.sin(val / 255.0 * math.pi)
    return int(scaled_root * phase)


def process_image(pixels):
    """Transform every pixel of a single image."""
    return list(map(process_pixel, pixels))


def process_images_serial(images):
    """Baseline: process images one after another in this process."""
    return list(map(process_image, images))


def process_images_parallel(images):
    """Process images across a pool of worker processes (exercise stub)."""
    # TODO: Use Pool.map to process images in parallel
    pass


if __name__ == '__main__':
    random.seed(7)
    images = [[random.randint(0, 255) for _ in range(1000)] for _ in range(8)]

    start = time.perf_counter()
    serial_results = process_images_serial(images)
    serial_ms = (time.perf_counter() - start) * 1000

    start = time.perf_counter()
    parallel_results = process_images_parallel(images)
    parallel_ms = (time.perf_counter() - start) * 1000

    print(f'Results match: {serial_results == parallel_results}')
    print(f'Serial: {serial_ms:.1f}ms, Parallel: {parallel_ms:.1f}ms')
Expected Output
Results match: True
Serial: X.Xms, Parallel: X.Xms
Hints
Hint 1: with Pool() as pool: return pool.map(process_image, images)
Hint 2: Pool.map preserves order — results match the serial version.
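Hint 3: The speedup depends on CPU count and on how much work each task carries. With only 8 small images, process start-up and pickling overhead can outweigh the gain, so the parallel time may even be slower here; larger workloads approach the ideal N-core speedup.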
Use multiprocessing.Manager().dict() as a shared data structure across pool worker processes.
Solution:
from multiprocessing import Pool, Manager


def analyze_text(args):
    """Record word and character counts for one text in a shared mapping.

    ``args`` is a ``(text_id, text, shared_stats)`` tuple so the function can
    be passed straight to ``Pool.map``. Returns ``text_id``.
    """
    text_id, text, shared_stats = args
    stats = {
        'words': len(text.lower().split()),
        'chars': len(text),
    }
    shared_stats[text_id] = stats
    return text_id


if __name__ == '__main__':
    texts = [
        'The quick brown fox jumps over the lazy dog',
        'Python is a great programming language for data science',
        'Multiprocessing bypasses the GIL for CPU bound tasks',
        'Shared memory and queues enable inter process communication',
    ]
    with Manager() as manager:
        shared_stats = manager.dict()
        jobs = [(i, t, shared_stats) for i, t in enumerate(texts)]
        with Pool(4) as pool:
            pool.map(analyze_text, jobs)
        # Read back while the manager is still running; the proxy needs it.
        for i, _text in enumerate(texts):
            s = shared_stats[i]
            print(f'Text {i}: {s["words"]} words, {s["chars"]} chars')
from multiprocessing import Pool, Manager
import time


def analyze_text(args):
    """Store ``{'words': ..., 'chars': ...}`` for one text under its id.

    ``args`` unpacks to ``(text_id, text, shared_stats)``; the counts are
    written into ``shared_stats[text_id]`` and ``text_id`` is returned.
    """
    text_id, text, shared_stats = args
    word_count = len(text.lower().split())
    char_count = len(text)
    shared_stats[text_id] = {'words': word_count, 'chars': char_count}
    return text_id


if __name__ == '__main__':
    texts = [
        'The quick brown fox jumps over the lazy dog',
        'Python is a great programming language for data science',
        'Multiprocessing bypasses the GIL for CPU bound tasks',
        'Shared memory and queues enable inter process communication',
    ]
    # TODO: Use Manager().dict() for shared_stats.
    # Run analyze_text in a Pool.map call.
    # Print the word and char counts for each text.
    pass
Expected Output
Text 0: 9 words, 43 chars
Text 1: 9 words, 55 chars
Text 2: 8 words, 52 chars
Text 3: 8 words, 59 chars
Hints
Hint 1: manager = Manager() then shared_stats = manager.dict() creates a proxy dict.
Hint 2: Pass shared_stats in the args tuple: [(i, t, shared_stats) for i, t in enumerate(texts)]
Hint 3: Manager dict is process-safe but slower than Value/Array — use for complex shared state.
Build a process supervisor that automatically restarts a crashing worker process up to a maximum retry count.
Solution:
from multiprocessing import Process
import time


class Supervisor:
    """Run ``target(*args)`` in a child process, restarting it when it fails.

    The target is started once and then restarted up to ``max_restarts``
    times whenever an attempt ends with a non-zero exit code.
    """

    def __init__(self, target, args, max_restarts=5, join_timeout=None, backoff=0.01):
        """
        Args:
            target: Callable run in each child process.
            args: Tuple of positional arguments passed to ``target``.
            max_restarts: Maximum number of restarts after the first attempt.
            join_timeout: Seconds to wait for each attempt; ``None`` waits
                indefinitely. An attempt still running after the timeout is
                terminated and counted as a crash.
            backoff: Seconds to sleep before each restart.
        """
        self.target = target
        self.args = args
        self.max_restarts = max_restarts
        self.join_timeout = join_timeout
        self.backoff = backoff

    def _run_once(self):
        """Run one attempt to completion (or timeout) and return its exit code."""
        p = Process(target=self.target, args=self.args)
        p.start()
        p.join(self.join_timeout)
        if p.is_alive():
            # Hung worker: kill it so supervision can move on.
            p.terminate()
            p.join()
        exitcode = p.exitcode
        p.close()  # release the process's resources promptly
        return exitcode

    def run(self):
        """Supervise the target until it succeeds or restarts are used up.

        Returns:
            True if some attempt exited with code 0; False once the first run
            plus ``max_restarts`` restarts have all failed.
        """
        for attempt in range(self.max_restarts + 1):
            if attempt:
                time.sleep(self.backoff)  # brief backoff before restart
            if self._run_once() == 0:
                return True
        print(f'Max restarts ({self.max_restarts}) exceeded')
        return False
from multiprocessing import Process
import time
import random


def unreliable_worker(worker_id, max_rounds):
    """Run ``max_rounds`` short rounds, each with a 30% chance of crashing.

    Prints a completion message if every round succeeds.

    Raises:
        RuntimeError: when a round crashes, so the process exits non-zero.
    """
    # Use a private RNG seeded from OS entropy so every (re)started process
    # rolls independently. Seeding with worker_id made each restart replay
    # the identical rolls (seed 42 crashes at round 1 every time), so
    # restarting could never help.
    rng = random.Random()
    for i in range(max_rounds):
        time.sleep(0.05)
        if rng.random() < 0.3:  # 30% chance of crash
            raise RuntimeError(f'Worker {worker_id} crashed at round {i}')
    print(f'Worker {worker_id} completed all rounds')


# TODO: Implement a Supervisor that:
# - Starts a worker process
# - Monitors it (join with a timeout)
# - If it crashes (non-zero exitcode), restarts it
# - Stops after the worker completes successfully OR after max_restarts
class Supervisor:
    def __init__(self, target, args, max_restarts=5):
        pass

    def run(self):
        pass


if __name__ == '__main__':
    # Each attempt survives 5 rounds with probability 0.7**5 (about 17%), so
    # 1 + 10 attempts succeed roughly 87% of the time — likely, not certain.
    sup = Supervisor(target=unreliable_worker, args=(42, 5), max_restarts=10)
    sup.run()
Expected Output
Worker 42 completed all rounds
Hints
Hint 1: p.exitcode == 0 means clean exit; exitcode != 0 means crash or non-zero exit.
Hint 2: p.join() after p.start() blocks until the process ends.
Hint 3: Loop: while restarts <= max_restarts: start, join, check exitcode — this allows the first run plus up to max_restarts restarts.
