Skip to main content

Python Advanced Event Loop Practice Problems & Exercises

Practice: Advanced Event Loop

11 problems3 Easy4 Medium4 Hard60–90 min
← Back to lesson

Easy

#1call_soon and call_laterEasy
call_sooncall_laterevent-loopcallbacks

Use the event loop's call_soon and call_later scheduling primitives directly.

import asyncio
import time

async def main():
loop = asyncio.get_running_loop()

def soon_callback():
print("call_soon fired")

def later_callback():
print("call_later fired after ~0.01s")

loop.call_soon(soon_callback)
loop.call_later(0.01, later_callback)

await asyncio.sleep(0.05) # give callbacks time to fire

asyncio.run(main())
Solution
import asyncio

async def main():
loop = asyncio.get_running_loop()

def soon_callback():
print("call_soon fired")

def later_callback():
print("call_later fired after ~0.01s")

loop.call_soon(soon_callback)
loop.call_later(0.01, later_callback)

await asyncio.sleep(0.05)

asyncio.run(main())

call_soon vs call_later:

  • call_soon(fn) adds fn to the ready queue. It runs on the next event loop iteration (before I/O polling).
  • call_later(delay, fn) adds fn to the scheduled heap with a timestamp. It runs after delay seconds.
  • Both are synchronous scheduling calls — they return immediately.
  • They accept *args: call_soon(fn, arg1, arg2) calls fn(arg1, arg2).
  • Use call_soon for work that should happen "right after the current step". Use call_later for timers.
  • loop.call_at(when, fn) is similar to call_later but takes an absolute loop.time() timestamp.
Expected Output
call_soon fired\ncall_later fired after ~0.01s
Hints

Hint 1: loop.call_soon(fn) schedules fn for the next event loop iteration.

Hint 2: loop.call_later(delay, fn) schedules fn after delay seconds.

#2run_in_executor for Blocking CodeEasy
run_in_executorThreadPoolExecutorblocking-ioevent-loop

Run a blocking function in a thread pool without blocking the event loop using run_in_executor.

import asyncio
import time

def blocking_io():
time.sleep(0.05) # simulates blocking disk/network I/O
return 42

async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_io)
print(f"Blocking result: {result}")
print("Async code continues")

asyncio.run(main())
Solution
import asyncio
import time

def blocking_io():
time.sleep(0.05)
return 42

async def main():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_io)
print(f"Blocking result: {result}")
print("Async code continues")

asyncio.run(main())

run_in_executor mechanics:

  • None executor uses the default ThreadPoolExecutor (size = min(32, cpu_count + 4) in Python 3.8+).
  • The blocking function runs in a worker thread; the event loop continues processing other callbacks.
  • Returns a Future that resolves when the thread completes.
  • For CPU-bound work, use ProcessPoolExecutor to avoid GIL contention.
  • asyncio.to_thread(fn, *args) (Python 3.9+) is a higher-level wrapper: await asyncio.to_thread(blocking_io).
  • Do NOT call blocking code (file I/O, time.sleep, CPU-intensive loops) directly in async functions — it stalls the entire event loop.
Expected Output
Blocking result: 42\nAsync code continues
Hints

Hint 1: loop.run_in_executor(executor, fn, *args) runs fn in a thread pool without blocking the event loop.

Hint 2: Pass None as executor to use the default thread pool.

#3loop.time() and Scheduling PrecisionEasy
loop.timecall_latermonotonic-clockscheduling

Measure the actual delay of call_later using loop.time() to observe event loop scheduling precision.

import asyncio

async def main():
loop = asyncio.get_running_loop()
start = loop.time()
fired_at = [None]

def record_time():
fired_at[0] = loop.time()

loop.call_later(0.02, record_time)
await asyncio.sleep(0.05)

elapsed = fired_at[0] - start
print(f"Elapsed: approximately {elapsed:.2f}s")

asyncio.run(main())
Solution
import asyncio

async def main():
loop = asyncio.get_running_loop()
start = loop.time()
fired_at = [None]

def record_time():
fired_at[0] = loop.time()

loop.call_later(0.02, record_time)
await asyncio.sleep(0.05)

elapsed = fired_at[0] - start
print(f"Elapsed: approximately {elapsed:.2f}s")

asyncio.run(main())

Scheduling precision:

  • loop.time() uses time.monotonic() internally — it is the event loop's reference clock.
  • call_later precision is limited by: the sleep resolution of the OS (time.sleep granularity ~1ms on Linux, ~15ms on Windows), event loop overhead, and GIL contention.
  • In practice, expect ~1-5ms jitter on a lightly loaded system, up to ~20ms under load.
  • For tight scheduling requirements, uvloop (a drop-in replacement built on libuv) provides significantly better timer precision.
  • call_at(when, fn) takes an absolute loop.time() value — useful for scheduling relative to known reference points.
Expected Output
Elapsed: approximately 0.02s
Hints

Hint 1: loop.time() returns the event loop's monotonic clock as a float in seconds.

Hint 2: Use loop.call_at(when, fn) to schedule at an absolute loop time.


Medium

#4Custom Event Loop PolicyMedium
event-loop-policyasyncio.set_event_loop_policycustom-loop

Create a custom event loop policy that injects a logging event loop subclass.

import asyncio

class LoggingEventLoop(asyncio.SelectorEventLoop):
"""Event loop that logs its class name on creation."""
def __init__(self):
super().__init__()
print(f"Using custom event loop: {type(self).__name__}")

class LoggingEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
def new_event_loop(self):
return LoggingEventLoop()

asyncio.set_event_loop_policy(LoggingEventLoopPolicy())

async def main():
await asyncio.sleep(0.001)
print("Task completed")

asyncio.run(main())

# Reset to default policy
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
Solution
import asyncio

class LoggingEventLoop(asyncio.SelectorEventLoop):
def __init__(self):
super().__init__()
print(f"Using custom event loop: {type(self).__name__}")

class LoggingEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
def new_event_loop(self):
return LoggingEventLoop()

asyncio.set_event_loop_policy(LoggingEventLoopPolicy())

async def main():
await asyncio.sleep(0.001)
print("Task completed")

asyncio.run(main())

asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())

Event loop policy system:

  • asyncio.get_event_loop_policy() returns the current policy. Default is DefaultEventLoopPolicy.
  • The policy controls how asyncio.run(), asyncio.get_event_loop(), and asyncio.new_event_loop() create and manage loops.
  • On Windows, use asyncio.WindowsProactorEventLoopPolicy for better I/O performance (IOCP).
  • uvloop.EventLoopPolicy is the most common custom policy — it replaces the C SelectorEventLoop with a libuv-based implementation that is 2-4x faster.
  • Always reset the policy at the end of tests to avoid polluting other test cases.
Expected Output
Using custom event loop: CustomEventLoop\nTask completed
Hints

Hint 1: Subclass asyncio.DefaultEventLoopPolicy and override new_event_loop() to return your custom loop.

Hint 2: Call asyncio.set_event_loop_policy() before asyncio.run() to activate the policy.

#5Monitoring Event Loop LagMedium
event-loopmonitoringlagcall_soonperformance

Build a loop lag detector: schedule a call_soon callback, measure how long it actually takes to fire, and demonstrate how blocking code inflates lag.

import asyncio
import time

async def measure_lag(loop, label):
scheduled_at = loop.time()
fired_at = [None]

def callback():
fired_at[0] = loop.time()

loop.call_soon(callback)
await asyncio.sleep(0) # allow callback to fire
lag_ms = (fired_at[0] - scheduled_at) * 1000
print(f"{label}: {lag_ms:.1f}ms lag")

async def main():
loop = asyncio.get_running_loop()
await measure_lag(loop, "Normal")

# Simulate blocking
time.sleep(0.05) # blocks the event loop!
await measure_lag(loop, "After blocking")

asyncio.run(main())
Solution
import asyncio
import time

async def measure_lag(loop, label):
scheduled_at = loop.time()
fired_at = [None]

def callback():
fired_at[0] = loop.time()

loop.call_soon(callback)
await asyncio.sleep(0)
lag_ms = (fired_at[0] - scheduled_at) * 1000
print(f"{label}: {lag_ms:.1f}ms lag")

async def main():
loop = asyncio.get_running_loop()
await measure_lag(loop, "Normal")
time.sleep(0.05) # blocks event loop
await measure_lag(loop, "After blocking")

asyncio.run(main())

Event loop lag monitoring:

  • call_soon callbacks should fire within microseconds on an idle loop.
  • time.sleep() in the event loop thread blocks all I/O, timers, and callbacks for the sleep duration.
  • Loop lag is the primary metric for async application health. Libraries like prometheus_client expose this as a gauge.
  • Production monitoring: schedule a repeating call_soon ping every 100ms; if the lag exceeds 50ms, emit a warning.
  • This is how tools like Sentry detect "event loop blocked" issues in async Python applications.
Expected Output
Loop lag: approximately 0.0ms (no blocking)\nLoop lag: approximately 50ms (after blocking)
Hints

Hint 1: Schedule a callback with call_soon, record the time before and after. The difference is event loop lag.

Hint 2: If the event loop is blocked by CPU-intensive or blocking code, the lag increases.

#6run_in_executor with ProcessPoolExecutorMedium
ProcessPoolExecutorrun_in_executorCPU-boundmultiprocessing

Run a CPU-intensive computation in a ProcessPoolExecutor without blocking the event loop.

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive(n):
"""Sum 1..n — deliberately slow for demonstration."""
return sum(range(n + 1))

async def main():
with ProcessPoolExecutor(max_workers=2) as pool:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(pool, cpu_intensive, 1_000_000)
print(f"CPU result: {result}")
print("Time with executor: fast")

asyncio.run(main())
Solution
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive(n):
return sum(range(n + 1))

async def main():
with ProcessPoolExecutor(max_workers=2) as pool:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(pool, cpu_intensive, 1_000_000)
print(f"CPU result: {result}")
print("Time with executor: fast")

asyncio.run(main())

Process pool for CPU-bound work:

  • ThreadPoolExecutor threads share the GIL — for pure Python CPU work, threads do NOT parallelize.
  • ProcessPoolExecutor spawns separate processes with their own GIL — true parallelism for CPU work.
  • Functions must be picklable: module-level functions work; lambdas and closures do not.
  • Process startup overhead (~100ms) makes process pools expensive for small tasks. Use for work that takes at least hundreds of milliseconds.
  • For mixed I/O + CPU, compose: use thread pool for I/O, process pool for CPU-intensive post-processing.
  • Alternative: asyncio.to_thread() for I/O-bound blocking; ProcessPoolExecutor for CPU-bound.
Expected Output
CPU result: 500000500000\nTime with executor: fast
Hints

Hint 1: Use concurrent.futures.ProcessPoolExecutor for CPU-bound work to bypass the GIL.

Hint 2: The function must be picklable — define it at module level, not as a lambda or closure.

#7Signal Handling in the Event LoopMedium
signal-handlingasyncioadd_signal_handlergraceful-shutdown

Register a SIGINT handler in the event loop that triggers graceful shutdown.

import asyncio
import signal

async def main():
loop = asyncio.get_running_loop()
shutdown = asyncio.Event()

def on_signal():
print("SIGINT received — shutting down gracefully")
shutdown.set()

loop.add_signal_handler(signal.SIGINT, on_signal)
print("Server started")

# Simulate receiving SIGINT after 0.05s
loop.call_later(0.05, loop.remove_signal_handler, signal.SIGINT)
loop.call_later(0.05, on_signal)

await shutdown.wait()
print("Server stopped")

asyncio.run(main())
Solution
import asyncio
import signal

async def main():
loop = asyncio.get_running_loop()
shutdown = asyncio.Event()

def on_signal():
print("SIGINT received — shutting down gracefully")
shutdown.set()

loop.add_signal_handler(signal.SIGINT, on_signal)
print("Server started")

loop.call_later(0.05, loop.remove_signal_handler, signal.SIGINT)
loop.call_later(0.05, on_signal)

await shutdown.wait()
print("Server stopped")

asyncio.run(main())

Signal handling in asyncio:

  • loop.add_signal_handler() is POSIX-only (Linux/macOS) — not available on Windows.
  • The callback runs in the event loop thread (not a signal handler thread), making it safe to call asyncio primitives.
  • Common pattern: set an asyncio.Event in the signal callback, then await event.wait() in the main coroutine.
  • loop.remove_signal_handler(sig) deregisters the handler and restores the default behaviour.
  • Production graceful shutdown: on SIGINT/SIGTERM, stop accepting new connections, wait for in-flight requests to complete (with a timeout), then stop the loop.
Expected Output
Server started\nSIGINT received — shutting down gracefully\nServer stopped
Hints

Hint 1: loop.add_signal_handler(signal.SIGINT, callback) registers a callback for SIGINT.

Hint 2: Signal handlers in asyncio run in the event loop thread — safe to call coroutines from them.


Hard

#8Debug Mode and Slow Callback DetectionHard
debug-modeslow-callbackasyncioperformance

Enable asyncio debug mode and trigger a slow callback warning by running a blocking operation in a callback.

import asyncio
import time
import logging

logging.basicConfig(level=logging.WARNING)

async def main():
loop = asyncio.get_running_loop()
loop.set_debug(True)
loop.slow_callback_duration = 0.05 # warn if callback takes > 50ms

def slow_callback():
time.sleep(0.1) # blocks for 100ms — slower than threshold

loop.call_soon(slow_callback)
await asyncio.sleep(0.2) # give callback time to fire and be reported
print("Slow callback detected (>0.1s) in debug mode")

asyncio.run(main())
Solution
import asyncio
import time
import logging

logging.basicConfig(level=logging.WARNING)

async def main():
loop = asyncio.get_running_loop()
loop.set_debug(True)
loop.slow_callback_duration = 0.05

def slow_callback():
time.sleep(0.1)

loop.call_soon(slow_callback)
await asyncio.sleep(0.2)
print("Slow callback detected (>0.1s) in debug mode")

asyncio.run(main())

Debug mode capabilities:

  • loop.set_debug(True) enables: slow callback warnings, unawaited coroutine warnings, and detailed exception tracebacks.
  • loop.slow_callback_duration (default 0.1s) is the threshold for "slow callback" warnings.
  • In debug mode, asyncio tracks every coroutine creation site for better error reporting.
  • Enable globally: PYTHONASYNCIODEBUG=1 environment variable, or asyncio.run(main(), debug=True).
  • Production: run with debug mode in staging to catch blocking code; disable in production for performance.
  • asyncio.get_event_loop().set_debug(True) also enables coroutine creation tracking for the "was never awaited" warning.
Expected Output
Slow callback detected (>0.1s) in debug mode
Hints

Hint 1: Set PYTHONASYNCIODEBUG=1 or loop.set_debug(True) to enable slow callback warnings.

Hint 2: A slow callback is one that takes longer than loop.slow_callback_duration seconds (default 0.1s).

#9Multiple Event Loops in Different ThreadsHard
event-loopthreadingasyncio.runthread-isolation

Run three independent event loops in three separate threads, each running a different async task.

import asyncio
import threading

async def thread_task(thread_id):
await asyncio.sleep(0.01)
return thread_id

def run_in_thread(thread_id, results):
result = asyncio.run(thread_task(thread_id))
results[thread_id] = result

results = {}
threads = [threading.Thread(target=run_in_thread, args=(i, results)) for i in range(3)]

for t in threads:
t.start()
for t in threads:
t.join()

for thread_id in sorted(results):
print(f"Thread {thread_id} loop result: {results[thread_id]}")
Solution
import asyncio
import threading

async def thread_task(thread_id):
await asyncio.sleep(0.01)
return thread_id

def run_in_thread(thread_id, results):
result = asyncio.run(thread_task(thread_id))
results[thread_id] = result

results = {}
threads = [threading.Thread(target=run_in_thread, args=(i, results)) for i in range(3)]

for t in threads:
t.start()
for t in threads:
t.join()

for thread_id in sorted(results):
print(f"Thread {thread_id} loop result: {results[thread_id]}")

Multi-loop threading:

  • Each thread has its own event loop. asyncio.run() creates a new loop, runs the coroutine, then closes the loop.
  • Loops are completely isolated — Futures, Tasks, and Locks from one loop cannot be used in another.
  • Use asyncio.run_coroutine_threadsafe(coro, loop) when you need to submit work to a running loop from a different thread.
  • The dict results is safely shared between threads because writes happen after the threads complete (via join()).
  • Common production pattern: one main async loop for I/O, plus worker threads for CPU-bound tasks that communicate back via asyncio.Queue or Future.set_result.
Expected Output
Thread 0 loop result: 0\nThread 1 loop result: 1\nThread 2 loop result: 2
Hints

Hint 1: Each thread must create its own event loop. asyncio.run() handles this automatically.

Hint 2: asyncio objects (Futures, Tasks) are NOT thread-safe — never share them across loops.

#10Custom I/O WatcherHard
add_readeradd_writerfile-descriptorevent-loopselector

Use loop.add_reader() to register a callback that fires when data becomes available on a file descriptor.

import asyncio
import os

async def main():
loop = asyncio.get_running_loop()
read_fd, write_fd = os.pipe()
received = asyncio.Future()

def on_readable():
data = os.read(read_fd, 1024).decode()
print(f"Data available to read")
print(f"Content: {data}")
loop.remove_reader(read_fd)
os.close(read_fd)
received.set_result(data)

loop.add_reader(read_fd, on_readable)

# Write data from another task
async def writer():
await asyncio.sleep(0.01)
os.write(write_fd, b"hello from pipe")
os.close(write_fd)

asyncio.create_task(writer())
await received

asyncio.run(main())
Solution
import asyncio
import os

async def main():
loop = asyncio.get_running_loop()
read_fd, write_fd = os.pipe()
received = asyncio.Future()

def on_readable():
data = os.read(read_fd, 1024).decode()
print("Data available to read")
print(f"Content: {data}")
loop.remove_reader(read_fd)
os.close(read_fd)
received.set_result(data)

loop.add_reader(read_fd, on_readable)

async def writer():
await asyncio.sleep(0.01)
os.write(write_fd, b"hello from pipe")
os.close(write_fd)

asyncio.create_task(writer())
await received

asyncio.run(main())

add_reader / add_writer:

  • loop.add_reader(fd, callback) tells the event loop's selector to watch fd for readability.
  • When the OS signals that fd is readable, the event loop calls callback on the next iteration.
  • This is the raw primitive underlying all of asyncio's I/O: sockets, pipes, and subprocesses use this internally.
  • Always call loop.remove_reader(fd) when done to prevent spurious callbacks after the fd is closed.
  • add_writer(fd, callback) is the equivalent for write readiness — used for non-blocking writes to avoid blocking on a full send buffer.
  • This level of control is how high-performance protocol implementations (like uvloop) operate.
Expected Output
Data available to read\nContent: hello from pipe
Hints

Hint 1: loop.add_reader(fd, callback) registers a callback to run when fd becomes readable.

Hint 2: Use os.pipe() to create a read/write pair for testing.

#11Event Loop Lifecycle — Full ControlHard
event-looprun_until_completerun_foreverstopclose

Manually manage the event loop lifecycle: create, run tasks, stop, and close the loop.

import asyncio

async def slow_task(name, delay):
await asyncio.sleep(delay)
print(f"{name} done")

async def orchestrator(loop):
await slow_task("Task 1", 0.01)
await slow_task("Task 2", 0.02)
print("Shutting down")
loop.stop()

loop = asyncio.new_event_loop()
print("Loop started")
loop.run_until_complete(orchestrator(loop))
loop.close()
print("Loop closed")
Solution
import asyncio

async def slow_task(name, delay):
await asyncio.sleep(delay)
print(f"{name} done")

async def orchestrator(loop):
await slow_task("Task 1", 0.01)
await slow_task("Task 2", 0.02)
print("Shutting down")
loop.stop()

loop = asyncio.new_event_loop()
print("Loop started")
loop.run_until_complete(orchestrator(loop))
loop.close()
print("Loop closed")

Manual loop lifecycle:

  • asyncio.new_event_loop() creates a loop but does not start it. Useful when you need full control (tests, embedding asyncio in sync frameworks).
  • run_until_complete(coro) runs the loop until the coroutine completes, then stops.
  • run_forever() + loop.stop() (called from inside) gives finer control over the run loop.
  • loop.close() releases the selector, file descriptors, and executor threads. Always call it when done.
  • After close(), the loop is not reusable. Create a new one if needed.
  • asyncio.run(coro) is the recommended high-level API — it does new_event_loop, run_until_complete, close, and some cleanup automatically.
Expected Output
Loop started\nTask 1 done\nTask 2 done\nShutting down\nLoop closed
Hints

Hint 1: loop.run_forever() runs until loop.stop() is called. run_until_complete() is a convenience wrapper.

Hint 2: Always close the loop with loop.close() after stopping to release OS resources.

© 2026 EngineersOfAI. All rights reserved.