Practice: Multiprocessing in Python

11 problems · 4 Easy · 4 Medium · 3 Hard · 55–75 min

#1 Start a Process and Wait for It (Easy)
Tags: Process, start, join, target

Create, start, and join a subprocess — the multiprocessing equivalent of threading basics.

Solution:

from multiprocessing import Process
import time

def worker(name, duration):
    time.sleep(duration)
    print(f'Process {name} done')

if __name__ == '__main__':
    p = Process(target=worker, args=('Alpha', 0.1))
    p.start()
    p.join()
    print('Main process done')

Starter code:

from multiprocessing import Process
import time

def worker(name, duration):
  time.sleep(duration)
  print(f'Process {name} done')

# TODO: Create a Process with target=worker, start it, and join it.
# Print "Main process done" after the worker finishes.

if __name__ == '__main__':
  p = Process(target=worker, args=('Alpha', 0.1))
  # start, join, then print
Expected Output
Process Alpha done
Main process done
Hints

Hint 1: Process(target=fn, args=(a, b)) mirrors threading.Thread syntax.

Hint 2: p.start() launches the process; p.join() blocks until it exits.

Hint 3: Always guard multiprocessing code with if __name__ == '__main__':.
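To make the start/join lifecycle concrete, here is a minimal sketch of what a Process object reports before it starts and after join() returns. The helpers exit_with and run_and_report are hypothetical names introduced for illustration only; is_alive() and exitcode are standard Process members.

```python
from multiprocessing import Process
import sys

def exit_with(code):
    # Hypothetical helper: end the child process with the given exit status.
    sys.exit(code)

def run_and_report(code):
    p = Process(target=exit_with, args=(code,))
    before = p.exitcode      # None: the process has not been started yet
    p.start()
    p.join()                 # blocks until the child has exited
    return before, p.is_alive(), p.exitcode

if __name__ == '__main__':
    print(run_and_report(0))   # (None, False, 0)
    print(run_and_report(3))   # (None, False, 3)
```

The guard matters here for the same reason as in the exercise: under the spawn and forkserver start methods the child re-imports the main module, and unguarded process creation would run again in the child.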


#2 Pool.map: Parallel CPU Work (Easy)
Tags: Pool, map, CPU-bound

Use Pool.map to apply a CPU-intensive function to a list of inputs in parallel.

Solution:

from multiprocessing import Pool

def cpu_intensive(n):
    total = 0
    for i in range(n * 10000):
        total += i
    return total

if __name__ == '__main__':
    numbers = [100, 200, 150, 300, 250]
    with Pool() as pool:
        results = pool.map(cpu_intensive, numbers)
    print(results)

Starter code:

from multiprocessing import Pool
import time

def cpu_intensive(n):
  # Simulate CPU work
  total = 0
  for i in range(n * 10000):
      total += i
  return total

if __name__ == '__main__':
  numbers = [100, 200, 150, 300, 250]
  # TODO: Use Pool.map to compute cpu_intensive for each number in parallel.
  # Print the results.
  pass
Expected Output
[499999500000, 1999999000000, 1124999250000, 4499998500000, 3124998750000]
Hints

Hint 1: with Pool() as pool: results = pool.map(fn, iterable)

Hint 2: Pool() defaults to os.cpu_count() workers.

Hint 3: map() blocks until all tasks complete and returns results in order.
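Because cpu_intensive(n) only sums the integers below n * 10000, its result has a closed form: m(m - 1)/2 with m = n * 10000. The sketch below uses that to check what Pool.map should return without spawning any processes. closed_form is a hypothetical helper and not part of the exercise.

```python
def cpu_intensive(n):
    # Same loop as in the exercise.
    total = 0
    for i in range(n * 10000):
        total += i
    return total

def closed_form(n):
    # Hypothetical helper: the sum of 0..m-1 equals m*(m-1)/2.
    m = n * 10000
    return m * (m - 1) // 2

numbers = [100, 200, 150, 300, 250]
expected = [closed_form(n) for n in numbers]
print(expected)
# [499999500000, 1999999000000, 1124999250000, 4499998500000, 3124998750000]

# Spot-check the formula against the real loop on small inputs.
print(all(cpu_intensive(n) == closed_form(n) for n in (1, 2, 3)))  # True
```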


#3 Process with Queue: Pass Results Back (Easy)
Tags: Queue, multiprocessing, IPC

Use multiprocessing.Queue to send computed results from a child process back to the parent.

Solution:

from multiprocessing import Process, Queue

def compute_primes(limit, result_queue):
    primes = [n for n in range(2, limit) if all(n % i != 0 for i in range(2, n))]
    result_queue.put(primes)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=compute_primes, args=(50, q))
    p.start()
    result = q.get()
    p.join()
    print(result)

Starter code:

from multiprocessing import Process, Queue

def compute_primes(limit, result_queue):
  primes = [n for n in range(2, limit) if all(n % i != 0 for i in range(2, n))]
  result_queue.put(primes)

if __name__ == '__main__':
  q = Queue()
  # TODO: Start a Process that finds primes up to 50 using compute_primes.
  # Retrieve the result from the queue and print it.
  pass
Expected Output
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47]
Hints

Hint 1: Pass the Queue object to the child process via args.

Hint 2: result_queue.put(data) in the child; result_queue.get() in the parent.

Hint 3: Queue is process-safe; values are serialized via pickle.
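Since values cross the Queue as pickled bytes, the parent receives a copy rather than the child's object, and anything pickle cannot handle cannot be sent. A small sketch of both points using pickle directly, with no processes involved:

```python
import pickle

primes = [2, 3, 5, 7, 11]

# A round trip through pickle yields an equal but distinct object,
# which is what the receiving side of a Queue gets.
payload = pickle.dumps(primes)
restored = pickle.loads(payload)
print(restored == primes, restored is primes)   # True False

# Lambdas are not picklable, so they cannot be put on a Queue either.
try:
    pickle.dumps(lambda x: x * 2)
    lambda_error = None
except (pickle.PicklingError, AttributeError) as exc:
    lambda_error = type(exc).__name__
print(lambda_error)
```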


#4 Pool.starmap: Multiple Arguments (Easy)
Tags: starmap, multiple args, Pool

Use Pool.starmap to apply a function with multiple arguments to a list of argument tuples.

Solution:

from multiprocessing import Pool

def power(base, exp):
    return base ** exp

if __name__ == '__main__':
    pairs = [(2, 10), (3, 5), (5, 4), (10, 3), (7, 2)]
    with Pool() as pool:
        results = pool.starmap(power, pairs)
    print(results)

Starter code:

from multiprocessing import Pool

def power(base, exp):
  return base ** exp

if __name__ == '__main__':
  # TODO: Use Pool.starmap to compute powers for these (base, exp) pairs:
  # (2, 10), (3, 5), (5, 4), (10, 3), (7, 2)
  pairs = [(2, 10), (3, 5), (5, 4), (10, 3), (7, 2)]
  pass
Expected Output
[1024, 243, 625, 1000, 49]
Hints

Hint 1: pool.starmap(fn, list_of_tuples) unpacks each tuple as positional args.

Hint 2: This is the multiprocessing equivalent of itertools.starmap.

Hint 3: Use with Pool() as pool: for automatic cleanup.
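For comparison, the serial counterpart mentioned in Hint 2 can be sketched with itertools.starmap. It unpacks each tuple the same way, so it is a convenient reference result when checking the parallel version.

```python
from itertools import starmap

def power(base, exp):
    return base ** exp

pairs = [(2, 10), (3, 5), (5, 4), (10, 3), (7, 2)]

# itertools.starmap is lazy, so wrap it in list() to materialise the results.
serial = list(starmap(power, pairs))
print(serial)   # [1024, 243, 625, 1000, 49]

# Equivalent spelling with explicit argument unpacking.
print(serial == [power(*pair) for pair in pairs])   # True
```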


#5 Shared Memory with Value and Array (Medium)
Tags: Value, Array, shared memory, multiprocessing

Use multiprocessing.Value and Array to share state between a parent and child process.

Solution:

from multiprocessing import Process, Value, Array

def fill_array(arr, val):
    for i in range(len(arr)):
        arr[i] = val * (i + 1)

def increment_counter(counter, n):
    for _ in range(n):
        with counter.get_lock():
            counter.value += 1

if __name__ == '__main__':
    shared_arr = Array('i', 5)
    counter = Value('i', 0)

    p1 = Process(target=fill_array, args=(shared_arr, 3))
    p2 = Process(target=increment_counter, args=(counter, 100))

    p1.start(); p2.start()
    p1.join(); p2.join()

    print('Array:', list(shared_arr))
    print('Counter:', counter.value)

Starter code:

from multiprocessing import Process, Value, Array
import ctypes

def fill_array(arr, val):
  for i in range(len(arr)):
      arr[i] = val * (i + 1)

def increment_counter(counter, n):
  for _ in range(n):
      counter.value += 1

if __name__ == '__main__':
  # TODO: Create a shared Array of 5 integers and a shared Value (integer).
  # Run fill_array in a process to fill the array with multiples of 3.
  # Run increment_counter in a process to increment the counter 100 times.
  # Print the final array and counter.
  pass
Expected Output
Array: [3, 6, 9, 12, 15]
Counter: 100
Hints

Hint 1: Value('i', 0) creates a shared integer; Array('i', 5) creates a shared int array.

Hint 2: Pass them to processes via args — they live in shared memory.

Hint 3: counter.value += 1 is a read-modify-write, not an atomic operation; wrap it in with counter.get_lock(): whenever more than one process may update the Value.
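The Value and Array objects can be explored in a single process before handing them to children. This is a minimal sketch of the type codes and the lock pattern; the variable names are illustrative, and no child process is started.

```python
from multiprocessing import Value, Array

counter = Value('i', 0)            # 'i' is a signed C int, initial value 0
samples = Array('d', [1.5, 2.5])   # 'd' is a C double; a list sets size and contents
zeros = Array('i', 5)              # an integer size gives a zero-filled array

# += reads, adds, then writes back, so guard it with the Value's own lock.
with counter.get_lock():
    counter.value += 5

samples[0] = 9.0                   # elements are assigned by index

print(counter.value)    # 5
print(list(samples))    # [9.0, 2.5]
print(list(zeros))      # [0, 0, 0, 0, 0]
```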


#6 Pipe: Bidirectional Communication (Medium)
Tags: Pipe, send, recv, duplex

Use a Pipe for bidirectional request-response communication between a parent and child process.

Solution:

from multiprocessing import Process, Pipe

def calculator(conn):
    while True:
        msg = conn.recv()
        if msg == 'stop':
            conn.close()
            break
        op, a, b = msg
        if op == 'add':
            conn.send(a + b)
        elif op == 'mul':
            conn.send(a * b)

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=calculator, args=(child_conn,))
    p.start()

    operations = [('add', 10, 5), ('mul', 4, 7), ('add', 100, 200)]
    for op, a, b in operations:
        parent_conn.send((op, a, b))
        result = parent_conn.recv()
        print(f'{op}({a}, {b}) = {result}')

    parent_conn.send('stop')
    p.join()

Starter code:

from multiprocessing import Process, Pipe

def calculator(conn):
  while True:
      msg = conn.recv()
      if msg == 'stop':
          conn.close()
          break
      op, a, b = msg
      if op == 'add':
          conn.send(a + b)
      elif op == 'mul':
          conn.send(a * b)

if __name__ == '__main__':
  # TODO: Use a Pipe to communicate with the calculator process.
  # Send three operations and receive the results.
  # Then send 'stop' to shut it down.
  pass
Expected Output
add(10, 5) = 15
mul(4, 7) = 28
add(100, 200) = 300
Hints

Hint 1: parent_conn, child_conn = Pipe() creates a duplex pipe.

Hint 2: parent_conn.send(('add', 10, 5)) sends a tuple to the child.

Hint 3: result = parent_conn.recv() blocks until the child sends a response.
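The request-response pattern can be tried out with both ends of a Pipe held in one process, which also shows how a one-way pipe differs from the default duplex one. This is a sketch for experimentation only; the variable names are illustrative.

```python
from multiprocessing import Pipe

# Duplex (the default): either end can send and receive.
left, right = Pipe()
left.send(('add', 10, 5))
op, a, b = right.recv()
right.send(a + b)
reply = left.recv()
print(op, reply)            # add 15

# One-way: the first connection only receives, the second only sends.
recv_end, send_end = Pipe(duplex=False)
send_end.send('hello')
ready = recv_end.poll(1.0)  # True once a message is waiting
message = recv_end.recv()
print(ready, message)       # True hello

# Using the wrong direction raises OSError.
try:
    recv_end.send('nope')
    wrong_way = None
except OSError as exc:
    wrong_way = str(exc)
print(wrong_way)
```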


#7 Pool with Timeout and Error Handling (Medium)
Tags: async_result, get(timeout), error handling, Pool

Use apply_async with get(timeout=...) and exception handling to manage unreliable parallel tasks.

Solution:

from multiprocessing import Pool
import time

def flaky_task(n):
    time.sleep(n * 0.1)
    if n == 3:
        raise ValueError(f"Task {n} failed!")
    return n * 10

if __name__ == '__main__':
    with Pool(4) as pool:
        async_results = [(n, pool.apply_async(flaky_task, args=(n,))) for n in range(1, 6)]

        # Collect inside the with block: leaving it terminates the pool.
        results = []
        for n, ar in async_results:
            try:
                results.append((n, ar.get(timeout=2)))
            except ValueError as e:
                results.append((n, f'ERROR: {e}'))
    print(results)

Starter code:

from multiprocessing import Pool
import time

def flaky_task(n):
  time.sleep(n * 0.1)
  if n == 3:
      raise ValueError(f"Task {n} failed!")
  return n * 10

if __name__ == '__main__':
  # TODO: Submit 5 tasks (n=1..5) using pool.apply_async.
  # Collect results with a 2-second timeout per task.
  # Catch ValueError for failing tasks.
  # Print the list of (n, result_or_error).
  pass
Expected Output
[(1, 10), (2, 20), (3, 'ERROR: Task 3 failed!'), (4, 40), (5, 50)]
Hints

Hint 1: pool.apply_async(fn, args=(n,)) returns an AsyncResult.

Hint 2: result.get(timeout=2) re-raises the worker's exception if the task failed, and raises multiprocessing.TimeoutError if no result arrives within 2 seconds.

Hint 3: Wrap result.get() in try/except ValueError to catch worker errors.
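A timeout and a worker failure surface as different exceptions from get(). The sketch below separates the two cases. To keep it lightweight it swaps in multiprocessing.pool.ThreadPool, which exposes the same apply_async and AsyncResult interface but runs tasks in threads. That substitution, along with the helper names slow and boom, is an assumption made for illustration.

```python
import time
import multiprocessing
from multiprocessing.pool import ThreadPool

def slow(seconds):
    time.sleep(seconds)
    return seconds

def boom():
    raise ValueError('failed')

outcomes = []
with ThreadPool(2) as pool:
    late = pool.apply_async(slow, args=(0.5,))
    bad = pool.apply_async(boom)

    try:
        late.get(timeout=0.05)            # not finished yet
    except multiprocessing.TimeoutError:
        outcomes.append('timeout')

    try:
        bad.get(timeout=1)                # re-raises the task's exception
    except ValueError as exc:
        outcomes.append(f'error: {exc}')

    # A timed-out task keeps running; a later get() still returns its value.
    outcomes.append(late.get(timeout=2))

print(outcomes)   # ['timeout', 'error: failed', 0.5]
```

Note that multiprocessing.TimeoutError is not a ValueError, so an except ValueError clause alone lets a timeout propagate.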


#8 imap_unordered: Stream Results as Ready (Medium)
Tags: imap_unordered, streaming, lazy, Pool

Use imap_unordered to stream results from a process pool as each task completes, not waiting for all.

Solution:

from multiprocessing import Pool
import time
import random

def slow_process(item):
    delay = random.uniform(0.05, 0.2)
    time.sleep(delay)
    return item ** 2, delay

if __name__ == '__main__':
    random.seed(99)
    items = list(range(1, 9))

    received = []
    with Pool(4) as pool:
        for result in pool.imap_unordered(slow_process, items):
            square, delay = result
            received.append(square)

    print(f'All 8 results received: {len(received) == 8}')

Starter code:

from multiprocessing import Pool
import time
import random

def slow_process(item):
  delay = random.uniform(0.05, 0.2)
  time.sleep(delay)
  return item ** 2, delay

if __name__ == '__main__':
  random.seed(99)
  items = list(range(1, 9))

  # TODO: Use pool.imap_unordered to process items as they complete.
  # Print each result as it arrives (not waiting for all to finish).
  # Count results and verify all 8 were processed.
  pass
Expected Output
All 8 results received: True
Hints

Hint 1: pool.imap_unordered(fn, iterable) is a lazy iterator over results as they complete.

Hint 2: Unlike map(), results arrive out of order (whichever finishes first).

Hint 3: Iterate: for result in pool.imap_unordered(fn, items): print(result)
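The difference between ordered and unordered iteration is easiest to see with fixed delays. This sketch again uses multiprocessing.pool.ThreadPool as a lightweight stand-in with the same imap and imap_unordered methods; the task labels and delays are made up for illustration.

```python
import time
from multiprocessing.pool import ThreadPool

def work(task):
    label, delay = task
    time.sleep(delay)
    return label

tasks = [('slow', 0.3), ('fast', 0.1), ('medium', 0.2)]

with ThreadPool(3) as pool:
    in_order = list(pool.imap(work, tasks))               # input order
    as_completed = list(pool.imap_unordered(work, tasks)) # completion order

print(in_order)       # ['slow', 'fast', 'medium']
print(as_completed)   # typically ['fast', 'medium', 'slow']
```

Completion order depends on scheduling, so code consuming imap_unordered should not rely on any particular sequence, only on every result arriving once.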


#9 Parallel Image Processing Simulation (Hard)
Tags: CPU-bound, Pool, chunking, parallel speedup

Parallelize a CPU-intensive image processing pipeline using Pool.map and verify result correctness.

Solution:

from multiprocessing import Pool

def process_images_parallel(images):
    with Pool() as pool:
        return pool.map(process_image, images)

Starter code:

from multiprocessing import Pool
import time
import random
import math

# Simulate an image processing pipeline where each "image" is a list of pixels.
# The operation is CPU-intensive (computing per-pixel transformations).

def process_pixel(val):
  # Simulate expensive per-pixel computation
  return int(math.sqrt(val * 255) * math.sin(val / 255.0 * math.pi))

def process_image(pixels):
  return [process_pixel(p) for p in pixels]

def process_images_serial(images):
  return [process_image(img) for img in images]

def process_images_parallel(images):
  # TODO: Use Pool.map to process images in parallel
  pass

if __name__ == '__main__':
  random.seed(7)
  images = [[random.randint(0, 255) for _ in range(1000)] for _ in range(8)]

  t0 = time.perf_counter()
  serial_results = process_images_serial(images)
  serial_ms = (time.perf_counter() - t0) * 1000

  t0 = time.perf_counter()
  parallel_results = process_images_parallel(images)
  parallel_ms = (time.perf_counter() - t0) * 1000

  print(f'Results match: {serial_results == parallel_results}')
  print(f'Serial: {serial_ms:.1f}ms, Parallel: {parallel_ms:.1f}ms')
Expected Output
Results match: True
Serial: X.Xms, Parallel: X.Xms
Hints

Hint 1: with Pool() as pool: return pool.map(process_image, images)

Hint 2: Pool.map preserves order — results match the serial version.

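Hint 3: The speedup depends on core count and on how much work each task does. With only 8 images of 1000 pixels, process startup and pickling overhead can dominate, so the parallel run may be no faster than the serial one; speedup approaching the core count needs much more work per task.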


#10 Process Pool with Shared Manager Dict (Hard)
Tags: Manager, shared dict, multiprocessing, IPC

Use multiprocessing.Manager().dict() as a shared data structure across pool worker processes.

Solution:

from multiprocessing import Pool, Manager

def analyze_text(args):
    text_id, text, shared_stats = args
    words = text.lower().split()
    shared_stats[text_id] = {'words': len(words), 'chars': len(text)}
    return text_id

if __name__ == '__main__':
    texts = [
        'The quick brown fox jumps over the lazy dog',
        'Python is a great programming language for data science',
        'Multiprocessing bypasses the GIL for CPU bound tasks',
        'Shared memory and queues enable inter process communication',
    ]

    with Manager() as manager:
        shared_stats = manager.dict()
        args = [(i, t, shared_stats) for i, t in enumerate(texts)]
        with Pool(4) as pool:
            pool.map(analyze_text, args)
        for i in range(len(texts)):
            s = shared_stats[i]
            print(f'Text {i}: {s["words"]} words, {s["chars"]} chars')

Starter code:

from multiprocessing import Pool, Manager
import time

def analyze_text(args):
  text_id, text, shared_stats = args
  words = text.lower().split()
  word_count = len(words)
  char_count = len(text)
  shared_stats[text_id] = {'words': word_count, 'chars': char_count}
  return text_id

if __name__ == '__main__':
  texts = [
      'The quick brown fox jumps over the lazy dog',
      'Python is a great programming language for data science',
      'Multiprocessing bypasses the GIL for CPU bound tasks',
      'Shared memory and queues enable inter process communication',
  ]
  # TODO: Use Manager().dict() for shared_stats.
  # Run analyze_text in a Pool.map call.
  # Print the word and char counts for each text.
  pass
Expected Output
Text 0: 9 words, 43 chars
Text 1: 9 words, 55 chars
Text 2: 8 words, 52 chars
Text 3: 8 words, 59 chars
Hints

Hint 1: manager = Manager() then shared_stats = manager.dict() creates a proxy dict.

Hint 2: Pass shared_stats in the args tuple: [(i, t, shared_stats) for i, t in enumerate(texts)]

Hint 3: Manager dict is process-safe but slower than Value/Array — use for complex shared state.


#11 Process Supervisor: Restart on Failure (Hard)
Tags: supervisor, exitcode, restart, process monitoring

Build a process supervisor that automatically restarts a crashing worker process up to a maximum retry count.

Solution:

from multiprocessing import Process
import time

class Supervisor:
    def __init__(self, target, args, max_restarts=5):
        self.target = target
        self.args = args
        self.max_restarts = max_restarts

    def run(self):
        restarts = 0
        while restarts <= self.max_restarts:
            p = Process(target=self.target, args=self.args)
            p.start()
            p.join()
            if p.exitcode == 0:
                return  # Success
            restarts += 1
            if restarts <= self.max_restarts:
                time.sleep(0.01)  # Brief backoff before restart
        print(f'Max restarts ({self.max_restarts}) exceeded')

Starter code:

from multiprocessing import Process
import time
import random

def unreliable_worker(worker_id, max_rounds):
  random.seed()  # fresh seed per attempt; seeding with worker_id would replay the same crash on every restart
  for i in range(max_rounds):
      time.sleep(0.05)
      if random.random() < 0.3:  # 30% chance of crash
          raise RuntimeError(f'Worker {worker_id} crashed at round {i}')
  print(f'Worker {worker_id} completed all rounds')

# TODO: Implement a Supervisor that:
# - Starts a worker process
# - Monitors it (join with a timeout)
# - If it crashes (non-zero exitcode), restarts it
# - Stops after the worker completes successfully OR after max_restarts

class Supervisor:
  def __init__(self, target, args, max_restarts=5):
      pass

  def run(self):
      pass

if __name__ == '__main__':
  sup = Supervisor(target=unreliable_worker, args=(42, 5), max_restarts=10)
  sup.run()
Expected Output (when an attempt survives all five rounds; crash tracebacks from failed attempts go to stderr)
Worker 42 completed all rounds
Hints

Hint 1: p.exitcode == 0 means clean exit; exitcode != 0 means crash or non-zero exit.

Hint 2: p.join() after p.start() blocks until the process ends.

Hint 3: Loop: while restarts <= max_restarts: start, join, check exitcode. That allows one initial run plus up to max_restarts restarts.
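One thing to watch when a supervised worker seeds its random generator with a fixed value such as its worker_id: every restart then replays the same sequence of draws, so a worker that crashes once crashes at the same round on each attempt. The sketch below reproduces the crash test from unreliable_worker without any processes. first_crash_round is a hypothetical helper written for this illustration.

```python
import random

def first_crash_round(seed, max_rounds, crash_probability=0.3):
    # Hypothetical helper: replay the worker's draws for one attempt and
    # return the round at which it would crash, or None if it survives.
    rng = random.Random(seed)
    for i in range(max_rounds):
        if rng.random() < crash_probability:
            return i
    return None

# Three "restarts" with the same fixed seed behave identically.
attempts = [first_crash_round(42, 5) for _ in range(3)]
print(attempts)   # [1, 1, 1]
```

With seed 42 the second draw is roughly 0.025, below the 0.3 threshold, so each attempt fails at round 1. Letting each attempt draw fresh randomness is what gives a restart a chance to succeed.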

© 2026 EngineersOfAI. All rights reserved.