Python The Event Loop Explained Practice Problems & Exercises
Practice: The Event Loop Explained
← Back to lesson
Retrieve the running event loop from inside a coroutine and verify its type.
Solution:
import asyncio


async def inspect_loop():
    """Return True if the loop running this coroutine is an AbstractEventLoop."""
    # get_running_loop() is called from inside the coroutine, i.e. while
    # asyncio.run() has a loop running.
    loop = asyncio.get_running_loop()
    return isinstance(loop, asyncio.AbstractEventLoop)


result = asyncio.run(inspect_loop())
print('Got running loop:', result)
import asyncio


async def inspect_loop():
    """Exercise stub: report whether the running loop is an AbstractEventLoop."""
    # TODO: Get the running event loop inside a coroutine.
    # Return whether it's a running AbstractEventLoop.
    pass


result = asyncio.run(inspect_loop())
print('Got running loop:', result)
Expected Output
Got running loop: True
Hints
Hint 1: asyncio.get_running_loop() returns the current running loop (only valid inside async context).
Hint 2: Check isinstance(loop, asyncio.AbstractEventLoop).
Hint 3: asyncio.get_event_loop() is the older API; prefer get_running_loop() inside coroutines.
Use loop.call_soon() to schedule callbacks for execution on the next event loop iteration.
Solution:
import asyncio

log = []


def log_event(message):
    """Append *message* to the module-level ``log`` list."""
    log.append(message)


async def schedule_callbacks():
    """Queue three callbacks with call_soon() and return ``log`` after they ran."""
    loop = asyncio.get_running_loop()
    loop.call_soon(log_event, 'first')
    loop.call_soon(log_event, 'second')
    loop.call_soon(log_event, 'third')
    # Yield to the event loop once so the queued callbacks get to run
    # before the log is returned.
    await asyncio.sleep(0)
    return log


result = asyncio.run(schedule_callbacks())
print(result)
import asyncio

log = []


def log_event(message):
    """Append *message* to the module-level ``log`` list."""
    log.append(message)


async def schedule_callbacks():
    """Exercise stub: schedule three callbacks and return the log."""
    loop = asyncio.get_running_loop()
    # TODO: Use loop.call_soon() to schedule 3 callbacks:
    # log_event('first'), log_event('second'), log_event('third')
    # Then yield control with await asyncio.sleep(0) to let them run.
    # Return the log.
    pass


result = asyncio.run(schedule_callbacks())
print(result)
Expected Output
['first', 'second', 'third']
Hints
Hint 1: loop.call_soon(callback, arg) schedules callback(arg) for the next event loop iteration.
Hint 2: await asyncio.sleep(0) yields to the event loop, allowing scheduled callbacks to run.
Hint 3: Callbacks run in the order they were scheduled.
Use loop.call_later() to fire callbacks at specific delays and verify they fire in order.
Solution:
import asyncio
import time

events = []


def record(label):
    """Append ``(label, timestamp)`` to the module-level ``events`` list."""
    events.append((label, time.perf_counter()))


async def timed_callbacks():
    """Schedule A, B and C with call_later() and return labels in firing order."""
    loop = asyncio.get_running_loop()
    loop.call_later(0.05, record, 'A')
    loop.call_later(0.10, record, 'B')
    loop.call_later(0.15, record, 'C')
    # Sleep past the last deadline so all three timers fire before reading.
    await asyncio.sleep(0.20)
    # Order by recorded timestamp rather than relying on append order.
    events.sort(key=lambda x: x[1])
    return [e[0] for e in events]


labels = asyncio.run(timed_callbacks())
print('Order:', labels)
import asyncio
import time

events = []


def record(label):
    """Append ``(label, timestamp)`` to the module-level ``events`` list."""
    events.append((label, time.perf_counter()))


async def timed_callbacks():
    """Exercise stub: fire three delayed callbacks and return their order."""
    loop = asyncio.get_running_loop()
    # TODO: Schedule three callbacks:
    # record('A') at 0.05s, record('B') at 0.10s, record('C') at 0.15s
    # Wait 0.20s then return the labels in firing order.
    pass


labels = asyncio.run(timed_callbacks())
print('Order:', labels)
Expected Output
Order: ['A', 'B', 'C']
Hints
Hint 1: loop.call_later(delay, callback, arg) fires callback after 'delay' seconds.
Hint 2: await asyncio.sleep(0.20) lets all three fire before returning.
Hint 3: Events list records both label and timestamp; sort by timestamp to get order.
Use run_in_executor to run blocking I/O in a thread pool without blocking the event loop.
Solution:
import asyncio
import time


def blocking_io(n):
    """Simulate blocking I/O with a 0.1 s sleep and return ``n * n``."""
    time.sleep(0.1)
    return n * n


async def async_wrapper(n):
    """Run blocking_io(n) in the loop's default executor and return its result."""
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default executor, so the sleep does
    # not block the event loop itself.
    return await loop.run_in_executor(None, blocking_io, n)


async def main():
    """Run five wrapped calls concurrently and print the results."""
    results = await asyncio.gather(*[async_wrapper(n) for n in range(1, 6)])
    print(results)


asyncio.run(main())
import asyncio
import time


def blocking_io(n):
    """Simulate blocking I/O with a 0.1 s sleep and return ``n * n``."""
    time.sleep(0.1)
    return n * n


async def async_wrapper(n):
    """Exercise stub: run blocking_io(n) off the event loop."""
    # TODO: Run blocking_io(n) in the default executor so it doesn't block the event loop.
    # Return the result.
    pass


async def main():
    """Run five wrapped calls concurrently and print the results."""
    results = await asyncio.gather(*[async_wrapper(n) for n in range(1, 6)])
    print(results)


asyncio.run(main())
Expected Output
[1, 4, 9, 16, 25]
Hints
Hint 1: loop = asyncio.get_running_loop()
Hint 2: await loop.run_in_executor(None, blocking_io, n) runs in the default ThreadPoolExecutor.
Hint 3: Passing None uses the default executor; pass an Executor instance to customize.
Cancel a long-running task mid-execution and handle CancelledError gracefully.
Solution:
import asyncio


async def long_running(duration):
    """Sleep for *duration* seconds; return a status string either way.

    Returns 'Completed after {duration}s' on normal completion, or
    'Cancelled!' if the task is cancelled while sleeping.
    """
    try:
        await asyncio.sleep(duration)
        return f'Completed after {duration}s'
    except asyncio.CancelledError:
        # Returning here swallows the cancellation, so `await task` yields
        # this string instead of raising CancelledError. That is the point
        # of the exercise; real code usually cleans up and re-raises.
        return 'Cancelled!'


async def main():
    """Start the task, cancel it after 0.1 s, and print what it returned."""
    task = asyncio.create_task(long_running(10))
    # Let the task start and reach its sleep before cancelling it.
    await asyncio.sleep(0.1)
    task.cancel()
    result = await task
    print(result)


asyncio.run(main())
import asyncio


async def long_running(duration):
    """Sleep for *duration* seconds; return a status string either way."""
    try:
        await asyncio.sleep(duration)
        return f'Completed after {duration}s'
    except asyncio.CancelledError:
        # Swallows the cancellation so the awaiting caller gets a string.
        return 'Cancelled!'


async def main():
    """Exercise stub: cancel long_running(10) shortly after starting it."""
    # TODO: Create a task for long_running(10).
    # After 0.1s, cancel it.
    # Print the result.
    pass


asyncio.run(main())
Expected Output
Cancelled!
Hints
Hint 1: task = asyncio.create_task(long_running(10)) starts the task.
Hint 2: await asyncio.sleep(0.1) then task.cancel() cancels it.
Hint 3: await task returns the 'Cancelled!' string (from the except block).
Use asyncio.TaskGroup (Python 3.11+) to run concurrent tasks with structured lifecycle management.
Solution:
import asyncio
import sys


async def fetch_data(source, delay):
    """Wait *delay* seconds, then return 'data from {source}'.

    Raises:
        RuntimeError: if *source* is 'bad_source'.
    """
    await asyncio.sleep(delay)
    if source == 'bad_source':
        raise RuntimeError(f'Source {source} unavailable')
    return f'data from {source}'


async def main():
    """Fetch from three sources concurrently and print the results as a list."""
    # Results are collected in this order, which matches the expected output.
    sources = [('cache', 0.05), ('db', 0.1), ('api', 0.15)]
    if sys.version_info >= (3, 11):
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(fetch_data(name, delay))
                for name, delay in sources
            ]
        # Leaving the `async with` block waits for every task, so result()
        # is safe to call here.
        results = [task.result() for task in tasks]
    else:
        # asyncio.TaskGroup does not exist before Python 3.11.
        results = await asyncio.gather(
            *(fetch_data(name, delay) for name, delay in sources)
        )
    print(list(results))


asyncio.run(main())
import asyncio
import sys


async def fetch_data(source, delay):
    """Wait *delay* seconds, then return 'data from {source}'.

    Raises:
        RuntimeError: if *source* is 'bad_source'.
    """
    await asyncio.sleep(delay)
    if source == 'bad_source':
        raise RuntimeError(f'Source {source} unavailable')
    return f'data from {source}'


async def main():
    """Exercise stub: fetch from three sources with asyncio.TaskGroup."""
    results = []
    # TODO: Use asyncio.TaskGroup to fetch data from 3 sources concurrently.
    # Sources: 'db' (delay 0.1), 'cache' (delay 0.05), 'api' (delay 0.15)
    # Collect results using task.result() after the group exits.
    # If TaskGroup is unavailable (Python < 3.11), fall back to gather.
    pass


asyncio.run(main())
Expected Output
['data from cache', 'data from db', 'data from api']
Hints
Hint 1: async with asyncio.TaskGroup() as tg: t1 = tg.create_task(fetch_data('db', 0.1))
Hint 2: After the 'async with' block, tasks have completed and t1.result() gives the value.
Hint 3: TaskGroup cancels all sibling tasks if one raises an exception.
Enable asyncio debug mode and configure slow callback detection to identify blocking coroutines.
Solution:
import asyncio
import time


async def fast_task():
    """Yield to the loop briefly and return 'fast'."""
    await asyncio.sleep(0.01)
    return 'fast'


async def slow_sync_task():
    """Block the event loop for 0.1 s on purpose, then return a label."""
    # This is BAD: time.sleep() blocks the event loop. It is here so that
    # debug mode has something slower than the threshold to report.
    time.sleep(0.1)
    return 'slow but sync'


async def main():
    """Enable debug mode with a 0.05 s threshold and run both tasks."""
    loop = asyncio.get_running_loop()
    loop.set_debug(True)
    # Steps slower than this many seconds are reported in debug mode.
    loop.slow_callback_duration = 0.05
    r1 = await fast_task()
    r2 = await slow_sync_task()
    return [r1, r2]


results = asyncio.run(main())
print(results)
import asyncio
import time


async def fast_task():
    """Yield to the loop briefly and return 'fast'."""
    await asyncio.sleep(0.01)
    return 'fast'


async def slow_sync_task():
    """Block the event loop for 0.1 s on purpose, then return a label."""
    # This is BAD — blocking the event loop
    time.sleep(0.1)
    return 'slow but sync'


async def main():
    """Exercise stub: enable debug mode and run both tasks."""
    loop = asyncio.get_running_loop()
    # TODO: Enable asyncio debug mode.
    # Set slow_callback_duration to 0.05s (callbacks slower than this trigger warnings).
    # Run both tasks and capture any debug output.
    # Return results.
    pass


results = asyncio.run(main())
print(results)
Expected Output
['fast', 'slow but sync']
Hints
Hint 1: loop.set_debug(True) enables debug mode.
Hint 2: loop.slow_callback_duration = 0.05 sets the threshold.
Hint 3: Debug mode logs warnings for blocking calls exceeding the threshold.
Implement a custom event loop policy that instruments loop creation for monitoring.
Solution:
import asyncio


# NOTE(review): the event loop policy API is, as far as I know, deprecated
# from Python 3.14 — confirm against the asyncio policy documentation before
# using this pattern on newer interpreters.
class CountingPolicy(asyncio.DefaultEventLoopPolicy):
    """Event loop policy that counts how many loops it has created."""

    # Class-level counter, shared by every instance of the policy.
    loop_count = 0

    def new_event_loop(self):
        """Increment the counter and delegate to the default policy."""
        CountingPolicy.loop_count += 1
        return super().new_event_loop()


async def simple_task():
    """Return 42 without awaiting anything."""
    return 42


asyncio.set_event_loop_policy(CountingPolicy())
try:
    result = asyncio.run(simple_task())
finally:
    # Restore the default policy even if the run fails, so the custom
    # policy does not stay installed for later code.
    asyncio.set_event_loop_policy(None)
print(f'Result: {result}')
print(f'Loops created: {CountingPolicy.loop_count}')
import asyncio


# TODO: Create a custom event loop policy that tracks how many
# event loops have been created. Override new_event_loop() to
# increment a counter. Verify it works with asyncio.run().
class CountingPolicy(asyncio.DefaultEventLoopPolicy):
    """Event loop policy that counts how many loops it has created."""

    # Class-level counter, shared by every instance of the policy.
    loop_count = 0

    def new_event_loop(self):
        """Increment the counter and delegate to the default policy."""
        CountingPolicy.loop_count += 1
        return super().new_event_loop()


async def simple_task():
    """Return 42 without awaiting anything."""
    return 42


# Set the custom policy and run
asyncio.set_event_loop_policy(CountingPolicy())
try:
    result = asyncio.run(simple_task())
finally:
    # Restore default policy (even if the run fails)
    asyncio.set_event_loop_policy(None)
print(f'Result: {result}')
print(f'Loops created: {CountingPolicy.loop_count}')
Expected Output
Result: 42
Loops created: 1
Hints
Hint 1: Subclass asyncio.DefaultEventLoopPolicy and override new_event_loop().
Hint 2: asyncio.set_event_loop_policy(policy) installs the custom policy globally.
Hint 3: asyncio.run() calls new_event_loop() internally — the counter increments once per run.
Implement a minimal event loop from scratch to understand how callbacks and timers work internally.
Solution:
import time
import heapq
from collections import deque


class MinimalEventLoop:
    """A tiny synchronous event loop with ready callbacks and timers.

    ``call_soon`` queues a callback on a FIFO deque. ``call_later`` pushes
    it onto a min-heap keyed by its monotonic deadline. ``run_forever``
    drains both until ``stop()`` is called or no work remains.
    """

    def __init__(self):
        self._ready = deque()      # FIFO of (callback, args)
        self._scheduled = []       # heap of (fire_at, seq, callback, args)
        self._running = False
        self._counter = 0          # sequence number used as heap tie-breaker

    def call_soon(self, callback, *args):
        """Queue ``callback(*args)`` to run on an upcoming loop iteration."""
        self._ready.append((callback, args))

    def call_later(self, delay, callback, *args):
        """Schedule ``callback(*args)`` to run after *delay* seconds."""
        fire_at = time.monotonic() + delay
        # The counter makes every heap entry unique, so equal deadlines
        # never fall through to comparing the callbacks themselves.
        self._counter += 1
        heapq.heappush(self._scheduled, (fire_at, self._counter, callback, args))

    def stop(self):
        """Ask run_forever() to exit after the current callback."""
        self._running = False

    def run_forever(self):
        """Process callbacks until stop() is called or no work is left."""
        self._running = True
        try:
            while self._running:
                now = time.monotonic()
                # Move every timer whose deadline has passed to the ready queue.
                while self._scheduled and self._scheduled[0][0] <= now:
                    _, _, callback, args = heapq.heappop(self._scheduled)
                    self._ready.append((callback, args))
                if self._ready:
                    # One callback per iteration, so stop() takes effect
                    # before the next one runs.
                    callback, args = self._ready.popleft()
                    callback(*args)
                elif self._scheduled:
                    sleep_time = self._scheduled[0][0] - time.monotonic()
                    if sleep_time > 0:
                        # Sleep at most 10 ms at a time until the next deadline.
                        time.sleep(min(sleep_time, 0.01))
                else:
                    # Nothing ready and nothing scheduled: exit.
                    break
        finally:
            # Clear the flag on every exit path (idle exit, stop(), or a
            # callback raising) so the loop never looks like it is running.
            self._running = False
import time
import heapq
from collections import deque


# TODO: Implement a minimal synchronous event loop that:
# - Supports call_soon(callback, *args) — run on next iteration
# - Supports call_later(delay, callback, *args) — run after delay
# - run_forever() processes events until stop() is called
# - stop() signals the loop to exit
class MinimalEventLoop:
    """Exercise stub for a minimal synchronous event loop."""

    def __init__(self):
        self._ready = deque()
        # Heap of timer entries ordered by fire_at. Put a unique sequence
        # number after fire_at so equal deadlines never compare callbacks.
        self._scheduled = []
        self._running = False

    def call_soon(self, callback, *args):
        pass

    def call_later(self, delay, callback, *args):
        pass

    def stop(self):
        pass

    def run_forever(self):
        pass


# Test
loop = MinimalEventLoop()
log = []
loop.call_soon(log.append, 'start')
loop.call_later(0.05, log.append, 'after 50ms')
loop.call_later(0.10, log.append, 'after 100ms')
loop.call_later(0.15, loop.stop)
loop.run_forever()
print(log)
Expected Output
['start', 'after 50ms', 'after 100ms']
Hints
Hint 1: call_soon appends (callback, args) to self._ready deque.
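Hint 2: call_later pushes (fire_at, counter, callback, args) onto a min-heap (heapq); the increasing counter breaks ties so two entries with the same fire_at never fall back to comparing the callbacks.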
Hint 3: run_forever: while running: check heap for due callbacks, run ready queue, sleep briefly.
Inspect coroutine state transitions (CREATED → SUSPENDED → CLOSED) using inspect.getcoroutinestate.
Solution:
import asyncio
import inspect


async def stateful_counter(limit):
    """Return ``[0, ..., limit - 1]``, yielding to the loop before each append."""
    results = []
    for i in range(limit):
        await asyncio.sleep(0)
        results.append(i)
    return results


async def main():
    """Return (state_before, result, state_after) for one coroutine object."""
    coro = stateful_counter(5)
    # Read the state before the coroutine has been started.
    state_before = inspect.getcoroutinestate(coro)
    result = await coro
    # Read the state again after the coroutine has run to completion.
    state_after = inspect.getcoroutinestate(coro)
    return state_before, result, state_after


state_before, result, state_after = asyncio.run(main())
print(f'Before: {state_before}')
print(f'Result: {result}')
print(f'After: {state_after}')
import asyncio
import inspect


# A coroutine is a state machine under the hood.
# You can inspect its state with inspect.getcoroutinestate().
async def stateful_counter(limit):
    """Return ``[0, ..., limit - 1]``, yielding to the loop before each append."""
    results = []
    for i in range(limit):
        await asyncio.sleep(0)
        results.append(i)
    return results


async def main():
    """Exercise stub: report a coroutine's state before and after running it."""
    coro = stateful_counter(5)
    # TODO: Before running: get the coroutine state
    # Run it: collect the result
    # After running: get the state again
    # Return (state_before, result, state_after)
    # Placeholder triple so the unpacking below works before the TODO is
    # done; replace it with the real values.
    return None, None, None


state_before, result, state_after = asyncio.run(main())
print(f'Before: {state_before}')
print(f'Result: {result}')
print(f'After: {state_after}')
Expected Output
Before: CORO_CREATED
Result: [0, 1, 2, 3, 4]
After: CORO_CLOSED
Hints
Hint 1: inspect.getcoroutinestate(coro) returns one of the strings 'CORO_CREATED', 'CORO_RUNNING', 'CORO_SUSPENDED', or 'CORO_CLOSED'.
Hint 2: Before awaiting, the state is CORO_CREATED; after completion it is CORO_CLOSED.
Hint 3: await coro runs the coroutine to completion and returns its return value.
Use contextvars.ContextVar to store per-task isolated state that doesn't leak between concurrent tasks.
Solution:
import asyncio
import random
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar('request_id', default='none')


async def handle_request(req_id, delay):
    """Set request_id to *req_id*, wait *delay* seconds, and read it back."""
    token = request_id.set(req_id)
    try:
        await asyncio.sleep(delay)
        # Other tasks set their own value while this one sleeps; this read
        # shows whether any of those writes leaked into this task.
        return request_id.get()
    finally:
        # Undo the set() so the variable returns to its previous value.
        request_id.reset(token)


async def main():
    """Run five requests concurrently and print whether each saw its own id."""
    # A local, seeded generator keeps the delays reproducible without
    # reseeding the process-wide random state.
    rng = random.Random(1)
    req_ids = [f'req-{i:03d}' for i in range(5)]
    delays = [rng.uniform(0.05, 0.15) for _ in range(5)]
    results = await asyncio.gather(*[
        handle_request(req_id, delay)
        for req_id, delay in zip(req_ids, delays)
    ])
    all_isolated = all(got == expected for got, expected in zip(results, req_ids))
    print(f'All tasks isolated: {all_isolated}')


asyncio.run(main())
import asyncio
from contextvars import ContextVar

# ContextVar provides per-task (not per-thread) isolated storage.
# Each asyncio Task runs in its own copy of the context, so a value set
# inside one task is not visible to its sibling tasks.
request_id: ContextVar[str] = ContextVar('request_id', default='none')


async def handle_request(req_id, delay):
    """Set request_id to *req_id*, wait *delay* seconds, and read it back."""
    # Set the context variable for THIS task
    token = request_id.set(req_id)
    try:
        await asyncio.sleep(delay)
        # Read it back — should still be our req_id, not another task's
        return request_id.get()
    finally:
        request_id.reset(token)


async def main():
    """Exercise stub: run five requests and check their isolation."""
    # TODO: Run 5 handle_request tasks concurrently with different req_ids.
    # Verify each task reads back its OWN request_id.
    pass


asyncio.run(main())
Expected Output
All tasks isolated: True
Hints
Hint 1: ContextVar.set(value) returns a token; ContextVar.reset(token) undoes the set.
Hint 2: Each asyncio Task automatically gets a copy of the context — changes don't leak between tasks.
Hint 3: Use asyncio.gather with tasks for all to run concurrently.
