Practice: The Event Loop Explained

11 problems · 4 Easy · 4 Medium · 3 Hard · 55–75 min

#1 Get the Running Event Loop (Easy)
get_event_loop, get_running_loop, loop reference

Retrieve the running event loop from inside a coroutine and verify its type.

Solution:

import asyncio

async def inspect_loop():
    loop = asyncio.get_running_loop()
    return isinstance(loop, asyncio.AbstractEventLoop)

result = asyncio.run(inspect_loop())
print('Got running loop:', result)

Starter code:

import asyncio

async def inspect_loop():
  # TODO: Get the running event loop inside a coroutine.
  # Return whether it's a running AbstractEventLoop.
  pass

result = asyncio.run(inspect_loop())
print('Got running loop:', result)
Expected Output
Got running loop: True
Hints

Hint 1: asyncio.get_running_loop() returns the loop running in the current thread; it raises RuntimeError when no loop is running, so call it from inside a coroutine or callback.

Hint 2: Check isinstance(loop, asyncio.AbstractEventLoop).

Hint 3: asyncio.get_event_loop() is the older API; prefer get_running_loop() inside coroutines.
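
To make Hint 1 concrete, here is a minimal sketch of what happens when get_running_loop() is called with and without a running loop. The helper names probe_outside and probe_inside are illustrative only and are not part of the exercise.

```python
import asyncio

def probe_outside():
    """Call get_running_loop() from plain synchronous code."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio refuses the call
        return 'no running loop'
    return 'unexpected running loop'

async def probe_inside():
    """Call get_running_loop() from inside a coroutine."""
    loop = asyncio.get_running_loop()
    return loop.is_running()

outside = probe_outside()
inside = asyncio.run(probe_inside())
print(outside, inside)
```

The synchronous call fails fast instead of silently creating a loop, which is the main reason to prefer it inside coroutines.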


#2 call_soon: Schedule a Callback (Easy)
call_soon, callback, event loop

Use loop.call_soon() to schedule callbacks for execution on the next event loop iteration.

Solution:

import asyncio

log = []

def log_event(message):
    log.append(message)

async def schedule_callbacks():
    loop = asyncio.get_running_loop()
    loop.call_soon(log_event, 'first')
    loop.call_soon(log_event, 'second')
    loop.call_soon(log_event, 'third')
    await asyncio.sleep(0)
    return log

result = asyncio.run(schedule_callbacks())
print(result)

Starter code:

import asyncio

log = []

def log_event(message):
  log.append(message)

async def schedule_callbacks():
  loop = asyncio.get_running_loop()
  # TODO: Use loop.call_soon() to schedule 3 callbacks:
  # log_event('first'), log_event('second'), log_event('third')
  # Then yield control with await asyncio.sleep(0) to let them run.
  # Return the log.
  pass

result = asyncio.run(schedule_callbacks())
print(result)
Expected Output
['first', 'second', 'third']
Hints

Hint 1: loop.call_soon(callback, arg) schedules callback(arg) for the next event loop iteration.

Hint 2: await asyncio.sleep(0) yields to the event loop, allowing scheduled callbacks to run.

Hint 3: Callbacks run in the order they were scheduled.
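
A related detail the hints do not cover: call_soon() returns a Handle, and cancelling that handle before the loop reaches it prevents the callback from running. The sketch below assumes a local list named calls and a function named demo_cancel_handle, both made up for illustration.

```python
import asyncio

async def demo_cancel_handle():
    calls = []
    loop = asyncio.get_running_loop()
    loop.call_soon(calls.append, 'kept')
    handle = loop.call_soon(calls.append, 'dropped')
    handle.cancel()          # cancelled before the loop gets a chance to run it
    await asyncio.sleep(0)   # yield once so the remaining callback runs
    return calls

call_soon_result = asyncio.run(demo_cancel_handle())
print(call_soon_result)
```

Only the callback whose handle was left alone ends up in the list.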


#3 call_later: Delayed Callback (Easy)
call_later, delayed callback, timer

Use loop.call_later() to fire callbacks at specific delays and verify they fire in order.

Solution:

import asyncio
import time

events = []

def record(label):
    events.append((label, time.perf_counter()))

async def timed_callbacks():
    loop = asyncio.get_running_loop()
    loop.call_later(0.05, record, 'A')
    loop.call_later(0.10, record, 'B')
    loop.call_later(0.15, record, 'C')
    await asyncio.sleep(0.20)
    events.sort(key=lambda x: x[1])
    return [e[0] for e in events]

labels = asyncio.run(timed_callbacks())
print('Order:', labels)

Starter code:

import asyncio
import time

events = []

def record(label):
  events.append((label, time.perf_counter()))

async def timed_callbacks():
  loop = asyncio.get_running_loop()
  t0 = time.perf_counter()

  # TODO: Schedule three callbacks:
  # record('A') at 0.05s, record('B') at 0.10s, record('C') at 0.15s
  # Wait 0.20s then return the labels in firing order.
  pass

labels = asyncio.run(timed_callbacks())
print('Order:', labels)
Expected Output
Order: ['A', 'B', 'C']
Hints

Hint 1: loop.call_later(delay, callback, arg) fires callback after 'delay' seconds.

Hint 2: await asyncio.sleep(0.20) lets all three fire before returning.

Hint 3: Events list records both label and timestamp; sort by timestamp to get order.
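
As a small variation on the exercise, the sketch below schedules the timers out of order to show that firing order follows the delay, not the order of the call_later() calls. The function name demo_timer_order and the specific delays are assumptions chosen for the example.

```python
import asyncio

async def demo_timer_order():
    fired = []
    loop = asyncio.get_running_loop()
    # Scheduled as C, A, B, but the delays say A, B, C
    loop.call_later(0.06, fired.append, 'C')
    loop.call_later(0.02, fired.append, 'A')
    loop.call_later(0.04, fired.append, 'B')
    await asyncio.sleep(0.12)  # long enough for all three timers to fire
    return fired

timer_order = asyncio.run(demo_timer_order())
print(timer_order)
```

Because the callbacks append in firing order, no timestamp sort is needed here.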


#4 run_in_executor: Blocking Code in Async Context (Easy)
run_in_executor, ThreadPoolExecutor, blocking

Use run_in_executor to run blocking I/O in a thread pool without blocking the event loop.

Solution:

import asyncio
import time

def blocking_io(n):
    time.sleep(0.1)
    return n * n

async def async_wrapper(n):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_io, n)

async def main():
    results = await asyncio.gather(*[async_wrapper(n) for n in range(1, 6)])
    print(results)

asyncio.run(main())

Starter code:

import asyncio
import time

def blocking_io(n):
  time.sleep(0.1)
  return n * n

async def async_wrapper(n):
  # TODO: Run blocking_io(n) in the default executor so it doesn't block the event loop.
  # Return the result.
  pass

async def main():
  results = await asyncio.gather(*[async_wrapper(n) for n in range(1, 6)])
  print(results)

asyncio.run(main())
Expected Output
[1, 4, 9, 16, 25]
Hints

Hint 1: loop = asyncio.get_running_loop()

Hint 2: await loop.run_in_executor(None, blocking_io, n) runs in the default ThreadPoolExecutor.

Hint 3: Passing None uses the default executor; pass an Executor instance to customize.
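
Hint 3 mentions passing an Executor instance. One possible shape for that is sketched below, assuming a pool of two threads and the made-up names blocking_square and demo_custom_executor; the thread name prefix is only there to make it visible which pool did the work.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

def blocking_square(n):
    # Return the result together with the name of the worker thread
    return n * n, threading.current_thread().name

async def demo_custom_executor():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix='io-worker') as pool:
        return await asyncio.gather(
            *(loop.run_in_executor(pool, blocking_square, n) for n in range(1, 4))
        )

executor_results = asyncio.run(demo_custom_executor())
print(executor_results)
```

A dedicated pool like this lets you cap concurrency for one kind of blocking work without affecting the default executor.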


#5 Task Cancellation (Medium)
cancel, CancelledError, task cancellation

Cancel a long-running task mid-execution and handle CancelledError gracefully.

Solution:

import asyncio

async def long_running(duration):
    try:
        await asyncio.sleep(duration)
        return f'Completed after {duration}s'
    except asyncio.CancelledError:
        return 'Cancelled!'

async def main():
    task = asyncio.create_task(long_running(10))
    await asyncio.sleep(0.1)
    task.cancel()
    result = await task
    print(result)

asyncio.run(main())

Starter code:

import asyncio

async def long_running(duration):
  try:
      await asyncio.sleep(duration)
      return f'Completed after {duration}s'
  except asyncio.CancelledError:
      return 'Cancelled!'

async def main():
  # TODO: Create a task for long_running(10).
  # After 0.1s, cancel it.
  # Print the result.
  pass

asyncio.run(main())
Expected Output
Cancelled!
Hints

Hint 1: task = asyncio.create_task(long_running(10)) starts the task.

Hint 2: await asyncio.sleep(0.1) then task.cancel() cancels it.

Hint 3: await task returns the 'Cancelled!' string (from the except block).
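
The exercise swallows CancelledError and returns a string, which makes the task finish normally. A common alternative is to clean up and re-raise, so the task is actually marked as cancelled. The sketch below illustrates that variant; worker, demo_reraise, and cleanup_log are hypothetical names, not part of the exercise.

```python
import asyncio

async def worker(cleanup_log):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        cleanup_log.append('cleaned up')  # release resources here
        raise                             # let the cancellation propagate

async def demo_reraise():
    cleanup_log = []
    task = asyncio.create_task(worker(cleanup_log))
    await asyncio.sleep(0)   # let the task start and reach its sleep
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass                 # the awaiting side sees the cancellation
    return cleanup_log, task.cancelled()

reraise_result = asyncio.run(demo_reraise())
print(reraise_result)
```

With the re-raise in place, task.cancelled() reports True, whereas in the exercise's version it would report False because the coroutine returned a value.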


#6 Task Groups: Python 3.11+ (Medium)
TaskGroup, task group, Python 3.11

Use asyncio.TaskGroup (Python 3.11+) to run concurrent tasks with structured lifecycle management.

Solution:

import asyncio
import sys

async def fetch_data(source, delay):
    await asyncio.sleep(delay)
    if source == 'bad_source':
        raise RuntimeError(f'Source {source} unavailable')
    return f'data from {source}'

async def main():
    sources = [('cache', 0.05), ('db', 0.1), ('api', 0.15)]
    if sys.version_info >= (3, 11):
        # All tasks have finished by the time the 'async with' block exits
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_data(s, d)) for s, d in sources]
        results = [t.result() for t in tasks]
    else:
        # Fallback for Python < 3.11: gather preserves argument order
        results = await asyncio.gather(*(fetch_data(s, d) for s, d in sources))
    print(list(results))

asyncio.run(main())

Starter code:

import asyncio
import sys

async def fetch_data(source, delay):
  await asyncio.sleep(delay)
  if source == 'bad_source':
      raise RuntimeError(f'Source {source} unavailable')
  return f'data from {source}'

async def main():
  results = []
  # TODO: Use asyncio.TaskGroup to fetch data from 3 sources concurrently.
  # Sources, in this order: 'cache' (delay 0.05), 'db' (delay 0.1), 'api' (delay 0.15)
  # Collect results in that same order using task.result() after the group exits.
  # If TaskGroup is unavailable (Python < 3.11), fall back to gather.
  pass

asyncio.run(main())
Expected Output
['data from cache', 'data from db', 'data from api']
Hints

Hint 1: async with asyncio.TaskGroup() as tg: t1 = tg.create_task(fetch_data('db', 0.1))

Hint 2: After the 'async with' block, tasks have completed and t1.result() gives the value.

Hint 3: If one task raises an exception, TaskGroup cancels the remaining sibling tasks and re-raises the failure wrapped in an ExceptionGroup when the block exits.
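
The failure behaviour from Hint 3 can be observed directly. The following is a rough sketch, assuming Python 3.11 or newer (it returns None on older versions); fetch is a stand-in for the exercise's fetch_data, and demo_group_failure is a made-up name.

```python
import asyncio
import sys

async def fetch(source, delay):
    # Hypothetical stand-in for fetch_data from the exercise
    await asyncio.sleep(delay)
    if source == 'bad_source':
        raise RuntimeError(f'Source {source} unavailable')
    return f'data from {source}'

async def demo_group_failure():
    if sys.version_info < (3, 11):
        return None  # TaskGroup and ExceptionGroup do not exist before 3.11
    slow = None
    try:
        async with asyncio.TaskGroup() as tg:
            slow = tg.create_task(fetch('db', 5))
            tg.create_task(fetch('bad_source', 0.01))
    except ExceptionGroup as group:
        # The group carries every exception raised by the failed tasks
        messages = [str(exc) for exc in group.exceptions]
        return messages, slow.cancelled()
    return None

group_failure = asyncio.run(demo_group_failure())
print(group_failure)
```

The slow 'db' task never gets to finish its five-second sleep: it is cancelled as soon as the sibling fails.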


#7 Event Loop Monitoring: Slow Callback Detection (Medium)
slow_callback_duration, debug mode, set_debug

Enable asyncio debug mode and configure slow callback detection to identify blocking coroutines.

Solution:

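import asyncio
import time

async def fast_task():
    await asyncio.sleep(0.01)
    return 'fast'

async def slow_sync_task():
    # Deliberately blocks the event loop so the debug warning fires
    time.sleep(0.1)
    return 'slow but sync'

async def main():
    loop = asyncio.get_running_loop()
    loop.set_debug(True)
    loop.slow_callback_duration = 0.05

    r1 = await fast_task()
    r2 = await slow_sync_task()
    return [r1, r2]

results = asyncio.run(main())
print(results)

Starter code:
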
import asyncio
import time

async def fast_task():
  await asyncio.sleep(0.01)
  return 'fast'

async def slow_sync_task():
  # This is BAD — blocking the event loop
  time.sleep(0.1)
  return 'slow but sync'

async def main():
  loop = asyncio.get_running_loop()
  # TODO: Enable asyncio debug mode.
  # Set slow_callback_duration to 0.05s (callbacks slower than this trigger warnings).
  # Run both tasks and capture any debug output.
  # Return results.
  pass

results = asyncio.run(main())
print(results)
Expected Output
['fast', 'slow but sync']
Hints

Hint 1: loop.set_debug(True) enables debug mode.

Hint 2: loop.slow_callback_duration = 0.05 sets the threshold.

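Hint 3: In debug mode, asyncio logs a warning through the 'asyncio' logger for any callback or task step that runs longer than the threshold. The warning goes to the log, not into the returned results.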
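
If you want to see the slow-callback warning programmatically, one option is to attach a logging handler to the 'asyncio' logger. The sketch below is an illustration under assumptions: ListHandler and blocking_step are made-up names, and debug mode is switched on through asyncio.run(..., debug=True) instead of loop.set_debug(True).

```python
import asyncio
import logging
import time

class ListHandler(logging.Handler):
    """Collects warning-level log messages in a list."""
    def __init__(self):
        super().__init__(level=logging.WARNING)
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())

async def blocking_step():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05
    time.sleep(0.1)  # blocks the loop for longer than the threshold
    return 'done'

handler = ListHandler()
asyncio_logger = logging.getLogger('asyncio')
asyncio_logger.addHandler(handler)
try:
    outcome = asyncio.run(blocking_step(), debug=True)
finally:
    asyncio_logger.removeHandler(handler)

print(outcome, len(handler.messages))
```

Each collected message names the task step that overran and how long it took.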


#8 Custom Event Loop Policy (Medium)
EventLoopPolicy, set_event_loop_policy, custom policy

Implement a custom event loop policy that instruments loop creation for monitoring.

Solution:

import asyncio

class CountingPolicy(asyncio.DefaultEventLoopPolicy):
    loop_count = 0

    def new_event_loop(self):
        CountingPolicy.loop_count += 1
        return super().new_event_loop()

async def simple_task():
    return 42

asyncio.set_event_loop_policy(CountingPolicy())
result = asyncio.run(simple_task())
print(f'Result: {result}')
print(f'Loops created: {CountingPolicy.loop_count}')
asyncio.set_event_loop_policy(None)

Starter code:

import asyncio

# TODO: Create a custom event loop policy that tracks how many
# event loops have been created. Override new_event_loop() to
# increment a counter. Verify it works with asyncio.run().

class CountingPolicy(asyncio.DefaultEventLoopPolicy):
  loop_count = 0

  def new_event_loop(self):
      CountingPolicy.loop_count += 1
      return super().new_event_loop()

async def simple_task():
  return 42

# Set the custom policy and run
asyncio.set_event_loop_policy(CountingPolicy())
result = asyncio.run(simple_task())
print(f'Result: {result}')
print(f'Loops created: {CountingPolicy.loop_count}')

# Restore default policy
asyncio.set_event_loop_policy(None)
Expected Output
Result: 42
Loops created: 1
Hints

Hint 1: Subclass asyncio.DefaultEventLoopPolicy and override new_event_loop().

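Hint 2: asyncio.set_event_loop_policy(policy) installs the custom policy globally. Note that the policy API is deprecated since Python 3.14, so treat this exercise as a look at how loop creation works and not as a pattern for new code.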

Hint 3: asyncio.run() calls new_event_loop() internally — the counter increments once per run.


#9 Implementing a Simple Event Loop (Hard)
event loop internals, callbacks, manual loop

Implement a minimal event loop from scratch to understand how callbacks and timers work internally.

Solution:

import time
import heapq
from collections import deque

class MinimalEventLoop:
    def __init__(self):
        self._ready = deque()
        self._scheduled = []
        self._running = False
        self._counter = 0

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def call_later(self, delay, callback, *args):
        fire_at = time.monotonic() + delay
        # The counter breaks ties so the heap never has to compare callbacks
        self._counter += 1
        heapq.heappush(self._scheduled, (fire_at, self._counter, callback, args))

    def stop(self):
        self._running = False

    def run_forever(self):
        self._running = True
        while self._running:
            now = time.monotonic()
            # Move every timer that is due onto the ready queue
            while self._scheduled and self._scheduled[0][0] <= now:
                fire_at, _, callback, args = heapq.heappop(self._scheduled)
                self._ready.append((callback, args))
            if self._ready:
                callback, args = self._ready.popleft()
                callback(*args)
            else:
                if self._scheduled:
                    sleep_time = self._scheduled[0][0] - time.monotonic()
                    if sleep_time > 0:
                        time.sleep(min(sleep_time, 0.01))
                else:
                    # Nothing ready and nothing scheduled: no work left
                    break

Starter code:

import time
import heapq
from collections import deque

# TODO: Implement a minimal synchronous event loop that:
# - Supports call_soon(callback, *args) — run on next iteration
# - Supports call_later(delay, callback, *args) — run after delay
# - run_forever() processes events until stop() is called
# - stop() signals the loop to exit

class MinimalEventLoop:
  def __init__(self):
      self._ready = deque()
      self._scheduled = []  # heap of (fire_at, counter, callback, args)
      self._running = False
      self._counter = 0  # tie-breaker so equal fire_at values never compare callbacks

  def call_soon(self, callback, *args):
      pass

  def call_later(self, delay, callback, *args):
      pass

  def stop(self):
      pass

  def run_forever(self):
      pass

# Test
loop = MinimalEventLoop()
log = []

loop.call_soon(log.append, 'start')
loop.call_later(0.05, log.append, 'after 50ms')
loop.call_later(0.10, log.append, 'after 100ms')
loop.call_later(0.15, loop.stop)

loop.run_forever()
print(log)
Expected Output
['start', 'after 50ms', 'after 100ms']
Hints

Hint 1: call_soon appends (callback, args) to self._ready deque.

Hint 2: call_later pushes (fire_at, counter, callback, args) onto a min-heap (heapq); the increasing counter breaks ties when two entries share the same fire_at.

Hint 3: run_forever: while running: check heap for due callbacks, run ready queue, sleep briefly.
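
Why a tie-breaking counter belongs in the heap entries can be shown in isolation. This is a small sketch with two placeholder callbacks, first and second, and a fixed fire time of 1.0; none of these names come from the exercise.

```python
import heapq

def first():
    return 'first'

def second():
    return 'second'

# Naive entries: (fire_at, callback, args). Equal fire_at values force
# Python to compare the callbacks, and functions do not support '<'.
naive = []
heapq.heappush(naive, (1.0, first, ()))
try:
    heapq.heappush(naive, (1.0, second, ()))
    naive_error = None
except TypeError as exc:
    naive_error = type(exc).__name__

# With a counter in second position, ties are settled before the
# comparison ever reaches the callback.
safe = []
for counter, callback in enumerate([first, second]):
    heapq.heappush(safe, (1.0, counter, callback, ()))
order = [heapq.heappop(safe)[2]() for _ in range(2)]

print(naive_error, order)
```

The counter also keeps callbacks with identical deadlines in the order they were scheduled.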


#10 Coroutine Introspection and State Machine (Hard)
coroutine, send, throw, state machine

Inspect coroutine state transitions (CORO_CREATED → CORO_SUSPENDED → CORO_CLOSED) using inspect.getcoroutinestate.

Solution:

import asyncio
import inspect

async def stateful_counter(limit):
    results = []
    for i in range(limit):
        await asyncio.sleep(0)
        results.append(i)
    return results

async def main():
    coro = stateful_counter(5)
    state_before = inspect.getcoroutinestate(coro)
    result = await coro
    state_after = inspect.getcoroutinestate(coro)
    return state_before, result, state_after

state_before, result, state_after = asyncio.run(main())
print(f'Before: {state_before}')
print(f'Result: {result}')
print(f'After: {state_after}')

Starter code:

import asyncio
import inspect

# A coroutine is a state machine under the hood.
# You can inspect its state with inspect.getcoroutinestate().

async def stateful_counter(limit):
  results = []
  for i in range(limit):
      await asyncio.sleep(0)
      results.append(i)
  return results

async def main():
  coro = stateful_counter(5)

  # TODO: Before running: get the coroutine state
  # Run it: collect the result
  # After running: get the state again
  # Return (state_before, result, state_after)
  pass

state_before, result, state_after = asyncio.run(main())
print(f'Before: {state_before}')
print(f'Result: {result}')
print(f'After: {state_after}')
Expected Output
Before: CORO_CREATED
Result: [0, 1, 2, 3, 4]
After: CORO_CLOSED
Hints

Hint 1: inspect.getcoroutinestate(coro) returns 'CORO_CREATED', 'CORO_RUNNING', 'CORO_SUSPENDED', or 'CORO_CLOSED'.

Hint 2: Before awaiting: CORO_CREATED. After completion: CORO_CLOSED.

Hint 3: await coro runs the coroutine to completion and returns its return value.
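
The exercise only sees the first and last states, because await drives the coroutine to completion in one go. To observe the suspended state in between, a coroutine can be stepped by hand with send(). The sketch below does this without asyncio; Pause and two_step are illustrative names.

```python
import inspect

class Pause:
    """Minimal awaitable that suspends the coroutine exactly once."""
    def __await__(self):
        yield 'paused'

async def two_step():
    await Pause()
    return 'finished'

coro = two_step()
states = [inspect.getcoroutinestate(coro)]      # not started yet

yielded = coro.send(None)                       # run up to the first suspension
states.append(inspect.getcoroutinestate(coro))

try:
    coro.send(None)                             # resume; the coroutine returns
except StopIteration as stop:
    final_value = stop.value                    # return value travels on StopIteration
states.append(inspect.getcoroutinestate(coro))

print(states, yielded, final_value)
```

This is roughly the job an event loop's Task does for you: call send() repeatedly until StopIteration arrives.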


#11 Async Context Variable: per-task Storage (Hard)
contextvars, ContextVar, per-task context

Use contextvars.ContextVar to store per-task isolated state that doesn't leak between concurrent tasks.

Solution:

import asyncio
import random
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar('request_id', default='none')

async def handle_request(req_id, delay):
    token = request_id.set(req_id)
    try:
        await asyncio.sleep(delay)
        return request_id.get()
    finally:
        request_id.reset(token)

async def main():
    random.seed(1)
    req_ids = [f'req-{i:03d}' for i in range(5)]
    delays = [random.uniform(0.05, 0.15) for _ in range(5)]

    results = await asyncio.gather(*[
        handle_request(req_id, delay)
        for req_id, delay in zip(req_ids, delays)
    ])

    all_isolated = all(got == expected for got, expected in zip(results, req_ids))
    print(f'All tasks isolated: {all_isolated}')

asyncio.run(main())

Starter code:

import asyncio
from contextvars import ContextVar

# ContextVar provides per-task (not per-thread) isolated storage.
# Each task runs in its own copy of the current context, so a set() in one task is invisible to the others.

request_id: ContextVar[str] = ContextVar('request_id', default='none')

async def handle_request(req_id, delay):
  # Set the context variable for THIS task
  token = request_id.set(req_id)
  try:
      await asyncio.sleep(delay)
      # Read it back — should still be our req_id, not another task's
      return request_id.get()
  finally:
      request_id.reset(token)

async def main():
  # TODO: Run 5 handle_request tasks concurrently with different req_ids.
  # Verify each task reads back its OWN request_id.
  pass

asyncio.run(main())
Expected Output
All tasks isolated: True
Hints

Hint 1: ContextVar.set(value) returns a token; ContextVar.reset(token) undoes the set.

Hint 2: Each asyncio Task automatically gets a copy of the context — changes don't leak between tasks.

Hint 3: Pass all five coroutines to asyncio.gather(); it wraps each one in its own Task, so they run concurrently.
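
Hint 2's "copy of the context" has a direction worth spelling out: a child task starts with the values its parent had at creation time, but changes made in the child do not flow back. A minimal sketch, using the made-up names current_user, child, and demo_context_copy:

```python
import asyncio
from contextvars import ContextVar

current_user = ContextVar('current_user', default='anonymous')

async def child():
    seen = current_user.get()        # inherited from the parent's context
    current_user.set('child-value')  # only affects the child's own copy
    return seen

async def demo_context_copy():
    current_user.set('parent-value')
    seen_by_child = await asyncio.create_task(child())
    return seen_by_child, current_user.get()

context_result = asyncio.run(demo_context_copy())
print(context_result)
```

Both values come back as the parent's, which shows the child read the inherited value and that its own set() stayed local.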

© 2026 EngineersOfAI. All rights reserved.