Python Advanced Event Loop Practice Problems & Exercises
Practice: Advanced Event Loop
Easy
Use the event loop's call_soon and call_later scheduling primitives directly.
import asyncio

async def main():
    loop = asyncio.get_running_loop()

    def soon_callback():
        print("call_soon fired")

    def later_callback():
        print("call_later fired after ~0.01s")

    loop.call_soon(soon_callback)
    loop.call_later(0.01, later_callback)
    await asyncio.sleep(0.05)  # give callbacks time to fire

asyncio.run(main())
Solution
import asyncio

async def main():
    loop = asyncio.get_running_loop()

    def soon_callback():
        print("call_soon fired")

    def later_callback():
        print("call_later fired after ~0.01s")

    loop.call_soon(soon_callback)
    loop.call_later(0.01, later_callback)
    await asyncio.sleep(0.05)

asyncio.run(main())
call_soon vs call_later:
- call_soon(fn) adds fn to the ready queue. It runs on the next event loop iteration, in FIFO order with the other ready callbacks.
- call_later(delay, fn) adds fn to the scheduled heap with a timestamp. It runs once at least delay seconds have passed.
- Both are synchronous scheduling calls: they return immediately, handing back a handle that can be cancelled.
- They accept *args: call_soon(fn, arg1, arg2) calls fn(arg1, arg2).
- Use call_soon for work that should happen "right after the current step". Use call_later for timers.
- loop.call_at(when, fn) is similar to call_later but takes an absolute loop.time() timestamp.
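The three scheduling calls and their *args forwarding can be tried side by side. This is a minimal sketch; the demo function and the order list are illustrative names, not part of the exercise.

```python
import asyncio

async def demo():
    loop = asyncio.get_running_loop()
    order = []  # hypothetical log of which callback fired when

    # Extra positional arguments are forwarded to the callback.
    loop.call_later(0.02, order.append, "later")         # relative delay
    loop.call_at(loop.time() + 0.01, order.append, "at")  # absolute loop time
    loop.call_soon(order.append, "soon")                  # next iteration

    await asyncio.sleep(0.05)  # give all three callbacks time to fire
    return order

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Although call_soon is registered last, it fires first because the two timers are not yet due.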
Expected Output
call_soon fired
call_later fired after ~0.01s
Hints
Hint 1: loop.call_soon(fn) schedules fn for the next event loop iteration.
Hint 2: loop.call_later(delay, fn) schedules fn after delay seconds.
Run a blocking function in a thread pool without blocking the event loop using run_in_executor.
import asyncio
import time

def blocking_io():
    time.sleep(0.05)  # simulates blocking disk/network I/O
    return 42

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_io)
    print(f"Blocking result: {result}")
    print("Async code continues")

asyncio.run(main())
Solution
import asyncio
import time

def blocking_io():
    time.sleep(0.05)
    return 42

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_io)
    print(f"Blocking result: {result}")
    print("Async code continues")

asyncio.run(main())
run_in_executor mechanics:
- A None executor uses the default ThreadPoolExecutor (size = min(32, cpu_count + 4) in Python 3.8+).
- The blocking function runs in a worker thread; the event loop continues processing other callbacks.
- run_in_executor returns a Future that resolves when the thread completes.
- For CPU-bound work, use ProcessPoolExecutor to avoid GIL contention.
- asyncio.to_thread(fn, *args) (Python 3.9+) is a higher-level wrapper: await asyncio.to_thread(blocking_io).
- Do NOT call blocking code (file I/O, time.sleep, CPU-intensive loops) directly in async functions: it stalls the entire event loop.
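The asyncio.to_thread wrapper mentioned above can be sketched as follows (Python 3.9+ assumed). The ticker coroutine is a hypothetical stand-in for "other work" that keeps running while the blocking call sits in a worker thread.

```python
import asyncio
import time

def blocking_io():
    time.sleep(0.05)  # simulates blocking disk/network I/O
    return 42

async def ticker(ticks):
    # Hypothetical background work: keeps getting loop time while the thread sleeps.
    for i in range(3):
        await asyncio.sleep(0.01)
        ticks.append(i)

async def demo():
    ticks = []
    # to_thread runs blocking_io in the default thread pool and returns an awaitable.
    result, _ = await asyncio.gather(asyncio.to_thread(blocking_io), ticker(ticks))
    return result, ticks

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Compared with run_in_executor, to_thread always uses the default executor and also accepts keyword arguments for the target function.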
Expected Output
Blocking result: 42
Async code continues
Hints
Hint 1: loop.run_in_executor(executor, fn, *args) runs fn in a thread pool without blocking the event loop.
Hint 2: Pass None as executor to use the default thread pool.
Measure the actual delay of call_later using loop.time() to observe event loop scheduling precision.
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    start = loop.time()
    fired_at = [None]

    def record_time():
        fired_at[0] = loop.time()

    loop.call_later(0.02, record_time)
    await asyncio.sleep(0.05)
    elapsed = fired_at[0] - start
    print(f"Elapsed: approximately {elapsed:.2f}s")

asyncio.run(main())
Solution
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    start = loop.time()
    fired_at = [None]

    def record_time():
        fired_at[0] = loop.time()

    loop.call_later(0.02, record_time)
    await asyncio.sleep(0.05)
    elapsed = fired_at[0] - start
    print(f"Elapsed: approximately {elapsed:.2f}s")

asyncio.run(main())
Scheduling precision:
- loop.time() uses time.monotonic() internally; it is the event loop's reference clock.
- call_later precision is limited by the sleep resolution of the OS (time.sleep granularity is roughly 1ms on Linux and 15ms on Windows), event loop overhead, and GIL contention.
- In practice, expect ~1-5ms jitter on a lightly loaded system, up to ~20ms under load.
- For tight scheduling requirements, uvloop (a drop-in replacement built on libuv) may provide better timer precision; measure on your own workload.
- call_at(when, fn) takes an absolute loop.time() value, which is useful for scheduling relative to known reference points.
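One use of call_at is to pin a series of ticks to a fixed reference point so that per-tick jitter does not accumulate. The sketch below assumes a 10ms interval and three ticks; demo, tick, and offsets are illustrative names.

```python
import asyncio

async def demo(interval=0.01, count=3):
    loop = asyncio.get_running_loop()
    start = loop.time()          # the shared reference point
    offsets = []                 # when each tick fired, relative to start
    done = loop.create_future()

    def tick(k):
        offsets.append(loop.time() - start)
        if k == count:
            done.set_result(None)

    # Every deadline is computed from start, not from the previous tick,
    # so a late tick does not push the following ones back.
    for k in range(1, count + 1):
        loop.call_at(start + k * interval, tick, k)

    await done
    return offsets

if __name__ == "__main__":
    for k, offset in enumerate(asyncio.run(demo()), start=1):
        print(f"tick {k} fired at +{offset * 1000:.1f}ms")
```

The printed offsets also give a rough feel for the jitter described above.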
Expected Output
Elapsed: approximately 0.02s
Hints
Hint 1: loop.time() returns the event loop's monotonic clock as a float in seconds.
Hint 2: Use loop.call_at(when, fn) to schedule at an absolute loop time.
Medium
Create a custom event loop policy that injects a logging event loop subclass.
import asyncio

class LoggingEventLoop(asyncio.SelectorEventLoop):
    """Event loop that logs its class name on creation."""

    def __init__(self):
        super().__init__()
        print(f"Using custom event loop: {type(self).__name__}")

class LoggingEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    def new_event_loop(self):
        return LoggingEventLoop()

asyncio.set_event_loop_policy(LoggingEventLoopPolicy())

async def main():
    await asyncio.sleep(0.001)
    print("Task completed")

asyncio.run(main())

# Reset to default policy
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
Solution
import asyncio

class LoggingEventLoop(asyncio.SelectorEventLoop):
    def __init__(self):
        super().__init__()
        print(f"Using custom event loop: {type(self).__name__}")

class LoggingEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    def new_event_loop(self):
        return LoggingEventLoop()

asyncio.set_event_loop_policy(LoggingEventLoopPolicy())

async def main():
    await asyncio.sleep(0.001)
    print("Task completed")

asyncio.run(main())
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
Event loop policy system:
- asyncio.get_event_loop_policy() returns the current policy. Default is DefaultEventLoopPolicy.
- The policy controls how asyncio.run(), asyncio.get_event_loop(), and asyncio.new_event_loop() create and manage loops.
- On Windows, asyncio.WindowsProactorEventLoopPolicy (IOCP-based, and the default since Python 3.8) gives better I/O performance than the selector loop.
- uvloop.EventLoopPolicy is the most common custom policy: it replaces the default SelectorEventLoop with a libuv-based implementation that its authors report as 2-4x faster.
- Always reset the policy at the end of tests to avoid polluting other test cases.
- The policy system is deprecated as of Python 3.14; newer code can pass a loop_factory to asyncio.Runner (3.11+) or asyncio.run() (3.12+) instead.
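The same custom loop can be injected without touching the global policy. The sketch below uses asyncio.Runner with its loop_factory parameter on Python 3.11+ and falls back to driving the loop by hand on older versions; run_with_factory is a hypothetical helper name.

```python
import asyncio
import sys

class LoggingEventLoop(asyncio.SelectorEventLoop):
    def __init__(self):
        super().__init__()
        print(f"Using custom event loop: {type(self).__name__}")

async def main():
    await asyncio.sleep(0.001)
    # Report which loop class is actually running this coroutine.
    return type(asyncio.get_running_loop()).__name__

def run_with_factory():
    if sys.version_info >= (3, 11):
        # Runner calls the factory to build the loop; no global state changes.
        with asyncio.Runner(loop_factory=LoggingEventLoop) as runner:
            return runner.run(main())
    # Fallback for older Pythons: create, run, and close the loop manually.
    loop = LoggingEventLoop()
    try:
        return loop.run_until_complete(main())
    finally:
        loop.close()

if __name__ == "__main__":
    print(run_with_factory())
```

Because nothing global is modified, there is no policy to reset afterwards.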
Expected Output
Using custom event loop: LoggingEventLoop
Task completed
Hints
Hint 1: Subclass asyncio.DefaultEventLoopPolicy and override new_event_loop() to return your custom loop.
Hint 2: Call asyncio.set_event_loop_policy() before asyncio.run() to activate the policy.
Build a loop lag detector: schedule a call_soon callback, measure how long it actually takes to fire, and demonstrate how blocking code inflates lag.
import asyncio
import time

async def measure_lag(loop, label, block_for=0.0):
    scheduled_at = loop.time()
    fired_at = [None]

    def callback():
        fired_at[0] = loop.time()

    loop.call_soon(callback)
    if block_for:
        # Block while the callback is already queued, so the delay shows up as lag.
        time.sleep(block_for)  # blocks the event loop!
    await asyncio.sleep(0)  # allow callback to fire
    lag_ms = (fired_at[0] - scheduled_at) * 1000
    print(f"{label}: {lag_ms:.1f}ms lag")

async def main():
    loop = asyncio.get_running_loop()
    await measure_lag(loop, "Normal")
    # Simulate blocking
    await measure_lag(loop, "After blocking", block_for=0.05)

asyncio.run(main())
Solution
import asyncio
import time

async def measure_lag(loop, label, block_for=0.0):
    scheduled_at = loop.time()
    fired_at = [None]

    def callback():
        fired_at[0] = loop.time()

    loop.call_soon(callback)
    if block_for:
        # The blocking call must come after scheduling; otherwise the measured lag stays near zero.
        time.sleep(block_for)  # blocks event loop
    await asyncio.sleep(0)
    lag_ms = (fired_at[0] - scheduled_at) * 1000
    print(f"{label}: {lag_ms:.1f}ms lag")

async def main():
    loop = asyncio.get_running_loop()
    await measure_lag(loop, "Normal")
    await measure_lag(loop, "After blocking", block_for=0.05)

asyncio.run(main())
Event loop lag monitoring:
- call_soon callbacks should fire within microseconds on an idle loop.
- time.sleep() in the event loop thread blocks all I/O, timers, and callbacks for the sleep duration.
- Loop lag is a key health metric for async applications. It can be exported as a gauge with a metrics library such as prometheus_client.
- Production monitoring: schedule a repeating call_later ping every 100ms; if the lag exceeds 50ms, emit a warning.
- Monitoring tools that report "event loop blocked" issues in async Python applications rely on the same idea.
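The repeating-ping idea can be sketched as a small monitor class. LagMonitor, its 10ms interval, and the samples list are assumptions made for illustration; a production version would export the samples to a metrics system instead of storing them.

```python
import asyncio
import time

class LagMonitor:
    """Hypothetical helper: records how late each scheduled ping fires."""

    def __init__(self, loop, interval=0.01):
        self.loop = loop
        self.interval = interval
        self.samples = []      # lag of each ping, in seconds
        self._expected = None
        self._handle = None

    def start(self):
        self._expected = self.loop.time() + self.interval
        self._handle = self.loop.call_at(self._expected, self._ping)

    def _ping(self):
        now = self.loop.time()
        # Timers may fire marginally early, so clamp at zero.
        self.samples.append(max(0.0, now - self._expected))
        self._expected = now + self.interval
        self._handle = self.loop.call_at(self._expected, self._ping)

    def stop(self):
        if self._handle is not None:
            self._handle.cancel()

async def demo():
    monitor = LagMonitor(asyncio.get_running_loop())
    monitor.start()
    await asyncio.sleep(0.05)  # healthy period
    time.sleep(0.1)            # deliberately block the loop
    await asyncio.sleep(0.05)  # let the delayed ping fire
    monitor.stop()
    return monitor.samples

if __name__ == "__main__":
    samples = asyncio.run(demo())
    print(f"max lag: {max(samples) * 1000:.1f}ms over {len(samples)} pings")
```

The ping that was due during the blocking call fires late, so the maximum sample reflects most of the 100ms stall.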
Expected Output
Normal: ~0.0ms lag
After blocking: ~50.0ms lag (exact values vary)
Hints
Hint 1: Schedule a callback with call_soon, record the time before and after. The difference is event loop lag.
Hint 2: If the event loop is blocked by CPU-intensive or blocking code, the lag increases.
Run a CPU-intensive computation in a ProcessPoolExecutor without blocking the event loop.
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive(n):
    """Sum 1..n (deliberately slow for demonstration)."""
    return sum(range(n + 1))

async def main():
    with ProcessPoolExecutor(max_workers=2) as pool:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(pool, cpu_intensive, 1_000_000)
        print(f"CPU result: {result}")
    print("Time with executor: fast")

# The guard is required on platforms that spawn worker processes (Windows, macOS).
if __name__ == "__main__":
    asyncio.run(main())
Solution
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive(n):
    return sum(range(n + 1))

async def main():
    with ProcessPoolExecutor(max_workers=2) as pool:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(pool, cpu_intensive, 1_000_000)
        print(f"CPU result: {result}")
    print("Time with executor: fast")

if __name__ == "__main__":
    asyncio.run(main())
Process pool for CPU-bound work:
- ThreadPoolExecutor threads share the GIL: for pure Python CPU work, threads do NOT parallelize.
- ProcessPoolExecutor spawns separate processes with their own GIL, giving true parallelism for CPU work.
- Functions must be picklable: module-level functions work; lambdas and closures do not.
- Process startup overhead (~100ms) makes process pools expensive for small tasks. Use them for work that takes at least hundreds of milliseconds.
- For mixed I/O + CPU, compose: use a thread pool for I/O and a process pool for CPU-intensive post-processing.
- Rule of thumb: asyncio.to_thread() for I/O-bound blocking calls; ProcessPoolExecutor for CPU-bound work.
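The picklability requirement can be checked up front, before a task is handed to a process pool. The is_picklable helper below is a hypothetical convenience, not a library function; it simply attempts pickle.dumps and reports whether that succeeded.

```python
import math
import pickle

def is_picklable(obj):
    """Hypothetical helper: True if obj can be sent to a worker process."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True

def make_closure(offset):
    # A nested function captures local state and cannot be pickled by reference.
    def add(n):
        return n + offset
    return add

if __name__ == "__main__":
    print("math.factorial:", is_picklable(math.factorial))
    print("lambda:", is_picklable(lambda n: n))
    print("closure:", is_picklable(make_closure(1)))
```

Functions are pickled by qualified name, which is why an importable module-level function works while a lambda or a nested function does not.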
Expected Output
CPU result: 500000500000
Time with executor: fast
Hints
Hint 1: Use concurrent.futures.ProcessPoolExecutor for CPU-bound work to bypass the GIL.
Hint 2: The function must be picklable — define it at module level, not as a lambda or closure.
Register a SIGINT handler in the event loop that triggers graceful shutdown.
import asyncio
import signal

async def main():
    loop = asyncio.get_running_loop()
    shutdown = asyncio.Event()

    def on_signal():
        print("SIGINT received — shutting down gracefully")
        shutdown.set()

    loop.add_signal_handler(signal.SIGINT, on_signal)
    print("Server started")
    # Simulate receiving SIGINT after 0.05s
    loop.call_later(0.05, loop.remove_signal_handler, signal.SIGINT)
    loop.call_later(0.05, on_signal)
    await shutdown.wait()
    print("Server stopped")

asyncio.run(main())
Solution
import asyncio
import signal

async def main():
    loop = asyncio.get_running_loop()
    shutdown = asyncio.Event()

    def on_signal():
        print("SIGINT received — shutting down gracefully")
        shutdown.set()

    loop.add_signal_handler(signal.SIGINT, on_signal)
    print("Server started")
    loop.call_later(0.05, loop.remove_signal_handler, signal.SIGINT)
    loop.call_later(0.05, on_signal)
    await shutdown.wait()
    print("Server stopped")

asyncio.run(main())
Signal handling in asyncio:
- loop.add_signal_handler() is POSIX-only (Linux/macOS); it is not available on Windows.
- The callback runs in the event loop thread as an ordinary loop callback (not inside the low-level signal handler), making it safe to call asyncio primitives.
- Common pattern: set an asyncio.Event in the signal callback, then await event.wait() in the main coroutine.
- loop.remove_signal_handler(sig) deregisters the handler and restores the default behaviour.
- Production graceful shutdown: on SIGINT/SIGTERM, stop accepting new connections, wait for in-flight requests to complete (with a timeout), then stop the loop.
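The production shutdown sequence can be sketched without real signals, which also keeps the example runnable on Windows. Here loop.call_later stands in for the signal callback, and handle_request, serve, and the 50ms grace period are illustrative assumptions.

```python
import asyncio

async def handle_request(name, duration, log):
    # Hypothetical in-flight request.
    await asyncio.sleep(duration)
    log.append(f"{name} finished")

async def serve(grace_period=0.05):
    loop = asyncio.get_running_loop()
    shutdown = asyncio.Event()
    log = []
    in_flight = [
        asyncio.create_task(handle_request("fast", 0.02, log)),
        asyncio.create_task(handle_request("slow", 10, log)),
    ]

    # Stand-in for a signal: on POSIX this would be
    # loop.add_signal_handler(signal.SIGTERM, shutdown.set).
    loop.call_later(0.01, shutdown.set)
    await shutdown.wait()

    # Give in-flight work a bounded amount of time to finish.
    done, pending = await asyncio.wait(in_flight, timeout=grace_period)
    for task in pending:
        task.cancel()  # stragglers are cancelled after the grace period
    await asyncio.gather(*pending, return_exceptions=True)
    return log, len(done), len(pending)

if __name__ == "__main__":
    print(asyncio.run(serve()))
```

The fast request completes inside the grace period; the slow one is cancelled so that shutdown does not hang.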
Expected Output
Server started
SIGINT received — shutting down gracefully
Server stopped
Hints
Hint 1: loop.add_signal_handler(signal.SIGINT, callback) registers a callback for SIGINT.
Hint 2: Signal handlers in asyncio run in the event loop thread, so it is safe to use asyncio primitives or schedule coroutines (for example with loop.create_task) from them.
Hard
Enable asyncio debug mode and trigger a slow callback warning by running a blocking operation in a callback.
import asyncio
import time
import logging

logging.basicConfig(level=logging.WARNING)

async def main():
    loop = asyncio.get_running_loop()
    loop.set_debug(True)
    loop.slow_callback_duration = 0.05  # warn if callback takes > 50ms

    def slow_callback():
        time.sleep(0.1)  # blocks for 100ms, longer than the threshold

    loop.call_soon(slow_callback)
    await asyncio.sleep(0.2)  # give callback time to fire and be reported
    print("Slow callback detected (>0.1s) in debug mode")

asyncio.run(main())
Solution
import asyncio
import time
import logging

logging.basicConfig(level=logging.WARNING)

async def main():
    loop = asyncio.get_running_loop()
    loop.set_debug(True)
    loop.slow_callback_duration = 0.05

    def slow_callback():
        time.sleep(0.1)

    loop.call_soon(slow_callback)
    await asyncio.sleep(0.2)
    print("Slow callback detected (>0.1s) in debug mode")

asyncio.run(main())
Debug mode capabilities:
- loop.set_debug(True) enables slow callback warnings, unawaited coroutine warnings, and detailed exception tracebacks.
- loop.slow_callback_duration (default 0.1s) is the threshold for "slow callback" warnings.
- In debug mode, asyncio tracks every coroutine creation site for better error reporting; this is what lets the "was never awaited" warning point at where the coroutine was created.
- Enable globally: PYTHONASYNCIODEBUG=1 environment variable, or asyncio.run(main(), debug=True).
- Production: run with debug mode in staging to catch blocking code; disable in production for performance.
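The slow-callback warning is emitted through the "asyncio" logger, so it can be captured programmatically, for instance in a test. ListHandler and capture_slow_callback_warnings are hypothetical names; matching on the word "took" relies on the current wording of the warning and is an assumption that may not hold across versions.

```python
import asyncio
import logging
import time

class ListHandler(logging.Handler):
    """Hypothetical handler that keeps formatted messages in memory."""

    def __init__(self):
        super().__init__(level=logging.WARNING)
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())

async def main():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05  # warn above 50ms
    loop.call_soon(time.sleep, 0.1)     # a callback that blocks for 100ms
    await asyncio.sleep(0.2)

def capture_slow_callback_warnings():
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        asyncio.run(main(), debug=True)  # debug mode switched on via asyncio.run
    finally:
        logger.removeHandler(handler)
    # Assumption: the warning text contains "took", as in "Executing ... took 0.100 seconds".
    return [m for m in handler.messages if "took" in m]

if __name__ == "__main__":
    for message in capture_slow_callback_warnings():
        print(message)
```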
Expected Output
Slow callback detected (>0.1s) in debug mode
Hints
Hint 1: Set PYTHONASYNCIODEBUG=1 or loop.set_debug(True) to enable slow callback warnings.
Hint 2: A slow callback is one that takes longer than loop.slow_callback_duration seconds (default 0.1s).
Run three independent event loops in three separate threads, each running a different async task.
import asyncio
import threading

async def thread_task(thread_id):
    await asyncio.sleep(0.01)
    return thread_id

def run_in_thread(thread_id, results):
    result = asyncio.run(thread_task(thread_id))
    results[thread_id] = result

results = {}
threads = [threading.Thread(target=run_in_thread, args=(i, results)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
for thread_id in sorted(results):
    print(f"Thread {thread_id} loop result: {results[thread_id]}")
Solution
import asyncio
import threading

async def thread_task(thread_id):
    await asyncio.sleep(0.01)
    return thread_id

def run_in_thread(thread_id, results):
    result = asyncio.run(thread_task(thread_id))
    results[thread_id] = result

results = {}
threads = [threading.Thread(target=run_in_thread, args=(i, results)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
for thread_id in sorted(results):
    print(f"Thread {thread_id} loop result: {results[thread_id]}")
Multi-loop threading:
- Each thread can run its own event loop. asyncio.run() creates a new loop, runs the coroutine, then closes the loop.
- Loops are completely isolated: Futures, Tasks, and Locks from one loop cannot be used in another.
- Use asyncio.run_coroutine_threadsafe(coro, loop) when you need to submit work to a running loop from a different thread.
- The dict results is safely shared here because each thread writes only its own key, and the main thread reads the dict only after every thread has finished (via join()).
- Common production pattern: one main async loop for I/O, plus worker threads for CPU-bound tasks that report back through loop.call_soon_threadsafe (for example to call queue.put_nowait or future.set_result), since asyncio.Queue and Future are not thread-safe on their own.
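Submitting work to a loop that runs in another thread can be sketched as follows. start_background_loop, double, and demo are illustrative names; the pattern relies only on asyncio.run_coroutine_threadsafe and loop.call_soon_threadsafe.

```python
import asyncio
import threading

def start_background_loop():
    # Hypothetical helper: a loop that runs forever in a daemon thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread

async def double(x):
    await asyncio.sleep(0.01)
    return 2 * x

def demo():
    loop, thread = start_background_loop()
    try:
        # Returns a concurrent.futures.Future that is safe to wait on from this thread.
        future = asyncio.run_coroutine_threadsafe(double(21), loop)
        return future.result(timeout=5)
    finally:
        # stop() must be requested thread-safely, then the thread joined before close().
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()

if __name__ == "__main__":
    print(demo())
```

The calling thread blocks on future.result(), while the coroutine itself runs entirely inside the background loop.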
Expected Output
Thread 0 loop result: 0
Thread 1 loop result: 1
Thread 2 loop result: 2
Hints
Hint 1: Each thread must create its own event loop. asyncio.run() handles this automatically.
Hint 2: asyncio objects (Futures, Tasks) are NOT thread-safe — never share them across loops.
Use loop.add_reader() to register a callback that fires when data becomes available on a file descriptor.
import asyncio
import os

async def main():
    loop = asyncio.get_running_loop()
    read_fd, write_fd = os.pipe()
    received = loop.create_future()

    def on_readable():
        data = os.read(read_fd, 1024).decode()
        print("Data available to read")
        print(f"Content: {data}")
        loop.remove_reader(read_fd)
        os.close(read_fd)
        received.set_result(data)

    loop.add_reader(read_fd, on_readable)

    # Write data from another task
    async def writer():
        await asyncio.sleep(0.01)
        os.write(write_fd, b"hello from pipe")
        os.close(write_fd)

    # Keep a reference so the task cannot be garbage-collected mid-flight.
    writer_task = asyncio.create_task(writer())
    await received
    await writer_task

asyncio.run(main())
Solution
import asyncio
import os

async def main():
    loop = asyncio.get_running_loop()
    read_fd, write_fd = os.pipe()
    received = loop.create_future()

    def on_readable():
        data = os.read(read_fd, 1024).decode()
        print("Data available to read")
        print(f"Content: {data}")
        loop.remove_reader(read_fd)
        os.close(read_fd)
        received.set_result(data)

    loop.add_reader(read_fd, on_readable)

    async def writer():
        await asyncio.sleep(0.01)
        os.write(write_fd, b"hello from pipe")
        os.close(write_fd)

    writer_task = asyncio.create_task(writer())
    await received
    await writer_task

asyncio.run(main())
add_reader / add_writer:
- loop.add_reader(fd, callback) tells the event loop's selector to watch fd for readability.
- When the OS signals that fd is readable, the event loop calls callback on the next iteration.
- This is the raw primitive underlying asyncio's I/O on selector-based loops: sockets, pipes, and subprocesses use it internally. The Windows ProactorEventLoop does not support it.
- Always call loop.remove_reader(fd) when done, before closing the fd, to prevent spurious callbacks.
- add_writer(fd, callback) is the equivalent for write readiness; it is used for non-blocking writes to avoid blocking on a full send buffer.
- High-performance protocol implementations work at this level of readiness-based control.
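add_writer can be exercised together with add_reader on a connected socket pair. This is a sketch for selector-based loops (the default on Linux and macOS); the callback names and the b"ping" payload are illustrative.

```python
import asyncio
import socket

async def demo():
    loop = asyncio.get_running_loop()
    left, right = socket.socketpair()
    left.setblocking(False)
    right.setblocking(False)
    received = loop.create_future()

    def on_writable():
        # Fires once the send buffer has room; send, then stop watching.
        left.send(b"ping")
        loop.remove_writer(left.fileno())

    def on_readable():
        data = right.recv(1024)
        loop.remove_reader(right.fileno())
        received.set_result(data)

    loop.add_reader(right.fileno(), on_readable)
    loop.add_writer(left.fileno(), on_writable)
    try:
        return await asyncio.wait_for(received, timeout=1)
    finally:
        # Both watchers were removed in the callbacks, so closing is safe here.
        left.close()
        right.close()

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Removing the writer right after the send matters: a socket with buffer space stays writable, so the callback would otherwise fire on every loop iteration.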
Expected Output
Data available to read
Content: hello from pipe
Hints
Hint 1: loop.add_reader(fd, callback) registers a callback to run when fd becomes readable.
Hint 2: Use os.pipe() to create a read/write pair for testing.
Manually manage the event loop lifecycle: create, run tasks, stop, and close the loop.
import asyncio

async def slow_task(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} done")

async def orchestrator(loop):
    await slow_task("Task 1", 0.01)
    await slow_task("Task 2", 0.02)
    print("Shutting down")
    # Redundant under run_until_complete(), which stops the loop itself;
    # the explicit stop() is what ends a loop started with run_forever().
    loop.stop()

loop = asyncio.new_event_loop()
print("Loop started")
loop.run_until_complete(orchestrator(loop))
loop.close()
print("Loop closed")
Solution
import asyncio

async def slow_task(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} done")

async def orchestrator(loop):
    await slow_task("Task 1", 0.01)
    await slow_task("Task 2", 0.02)
    print("Shutting down")
    loop.stop()

loop = asyncio.new_event_loop()
print("Loop started")
loop.run_until_complete(orchestrator(loop))
loop.close()
print("Loop closed")
Manual loop lifecycle:
- asyncio.new_event_loop() creates a loop but does not start it. Useful when you need full control (tests, embedding asyncio in sync frameworks).
- run_until_complete(coro) runs the loop until the coroutine completes, then stops.
- run_forever() + loop.stop() (called from inside) gives finer control over the run loop.
- loop.close() releases the selector, file descriptors, and executor threads. Always call it when done.
- After close(), the loop is not reusable. Create a new one if needed.
- asyncio.run(coro) is the recommended high-level API: it does new_event_loop, run_until_complete, close, and some cleanup automatically.
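The run_forever() plus loop.stop() variant can be sketched like this. run_with_run_forever and the events list are illustrative names; the list records the order in which each lifecycle stage happens.

```python
import asyncio

def run_with_run_forever():
    loop = asyncio.new_event_loop()
    events = []  # hypothetical lifecycle log

    async def job():
        await asyncio.sleep(0.01)
        events.append("job done")
        loop.stop()  # without this, run_forever() would never return

    try:
        task = loop.create_task(job())  # scheduled now, runs once the loop starts
        loop.run_forever()
        events.append("loop stopped")
    finally:
        loop.close()
        events.append("loop closed")
    return events

if __name__ == "__main__":
    print(run_with_run_forever())
```

Unlike run_until_complete(), run_forever() is not tied to one coroutine, so something inside the loop has to decide when to stop it.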
Expected Output
Loop started
Task 1 done
Task 2 done
Shutting down
Loop closed
Hints
Hint 1: loop.run_forever() runs until loop.stop() is called. run_until_complete() is a convenience wrapper.
Hint 2: Always close the loop with loop.close() after stopping to release OS resources.
