The Event Loop Explained
Reading time: ~35 minutes | Level: Intermediate → Engineering
Before reading further, consider this:
import asyncio
loop = asyncio.get_event_loop()
# DeprecationWarning: There is no current event loop
In Python 3.10+, calling asyncio.get_event_loop() in code that is not already inside a running loop emits a DeprecationWarning. In Python 3.12+, this warning is strengthened and is planned to become an error in a future release. Why did the Python core team change an API that millions of programs use? And what should you call instead?
The answer reveals something fundamental about how the event loop actually works - and why the old API was always subtly wrong. This lesson gives you the engineering-depth answer.
What You Will Learn
- What the event loop physically is - a loop that polls for I/O readiness and runs callbacks
- How the OS tells Python which file descriptors are ready:
select,epoll,kqueue - The exact cycle the event loop runs - from scheduling callbacks to polling I/O and back
asyncio.get_event_loop()vsasyncio.get_running_loop()vsasyncio.run()- which to use and when- Handles and callbacks:
loop.call_soon(),loop.call_later(),loop.call_at() - Debugging the event loop in production - slow callback detection,
loop.set_debug(True) - uvloop - the drop-in replacement that is 2–4x faster and when to use it
- Why you cannot run multiple event loops in one thread, and how frameworks work around it
Prerequisites
- Lesson 03 of this module: Asyncio - you should be comfortable with
async def,await,asyncio.gather, and coroutines - Basic familiarity with what a file descriptor is (a number the OS uses to refer to an open file, socket, or pipe)
Part 1 - What the Event Loop Actually Is
Not Magic - a Loop
Strip away all the Python abstractions and the asyncio event loop is this:
while True:
run all callbacks that are ready to run
ask the OS: which file descriptors are ready for I/O?
for each ready file descriptor:
run the callback registered for it
check for scheduled (time-based) callbacks and run any that are due
That is the entire event loop. It is not threading. It is not parallelism. It is a single-threaded program that repeatedly asks the operating system "is any I/O ready?" and then runs registered Python functions in response.
The Key Insight: Cooperative Scheduling
When your coroutine does await asyncio.sleep(1), it does not block the thread. It registers a callback with the event loop to be called after 1 second, then returns control to the event loop. The event loop then runs other ready callbacks - other coroutines that are waiting for different things. After 1 second, your callback fires and your coroutine resumes where it left off.
This is cooperative multitasking. Every await expression is a yield point where your coroutine voluntarily yields control back to the event loop. If you never await, you never yield, and you block the entire event loop for every other coroutine.
import asyncio
async def cooperative():
print("Starting cooperative task")
await asyncio.sleep(0) # yield to event loop - other tasks can run
print("Resumed after yield")
async def blocking():
print("Starting blocking task")
# No await - runs to completion without yielding
for i in range(10_000_000): # blocks the event loop for ~100ms
pass
print("Done (the event loop was frozen the whole time)")
Part 2 - Selectors: How the OS Tells Python Which I/O Is Ready
The Problem the Event Loop Solves
A naive I/O approach would be: read from socket A, wait for data, then read from socket B, wait for data, and so on. If you have 1,000 sockets, you would spend most of your time waiting, sequentially, for each one.
The event loop's solution: register all 1,000 sockets with the operating system and ask "let me know when ANY of them has data". The OS does this efficiently with system calls designed for this purpose.
The Three Selector Mechanisms
Python's selectors module provides a platform-independent interface. Underneath, it uses the most efficient mechanism available:
| Platform | Mechanism | Max FDs | Complexity |
|---|---|---|---|
| Linux | epoll | Unlimited | O(1) per event |
| macOS / BSD | kqueue | Unlimited | O(1) per event |
| Windows | IOCP | Unlimited | O(1) per event |
| All (fallback) | select | 1024 | O(n) per call |
import selectors
import socket
# This is the abstraction asyncio uses internally
sel = selectors.DefaultSelector() # picks the best available mechanism
server = socket.socket()
server.setblocking(False) # non-blocking - will not sleep waiting for data
server.bind(("localhost", 8080))
server.listen()
def accept_callback(sock, mask):
conn, addr = sock.accept()
print(f"Connection from {addr}")
# Register: "call accept_callback when server socket is readable"
sel.register(server, selectors.EVENT_READ, data=accept_callback)
while True:
# Block here until at least one registered fd is ready
# timeout=0.1 means: wait at most 100ms, then return even if nothing is ready
events = sel.select(timeout=0.1)
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
How asyncio Uses Selectors
The asyncio event loop wraps exactly this pattern. Every time you do await reader.read(1024) on an asyncio stream, the event loop registers the underlying socket's file descriptor with the selector. It then calls selector.select() to wait for I/O. When data arrives, the selector returns the file descriptor, the event loop finds the coroutine waiting on it, and schedules it to resume.
# What asyncio does internally (simplified):
async def _read_from_socket(sock, n_bytes):
loop = asyncio.get_running_loop()
# Create a Future that will be resolved when data is available
future = loop.create_future()
def _read_callback():
data = sock.recv(n_bytes)
future.set_result(data) # wake up the awaiting coroutine
loop.add_reader(sock.fileno(), _read_callback)
try:
return await future # yield to event loop here
finally:
loop.remove_reader(sock.fileno())
Part 3 - The Event Loop Cycle
The Full Loop Iteration
Why the Order Matters
The order within a single loop iteration is not arbitrary. Callbacks scheduled with call_soon() run before the selector is polled. This means:
import asyncio
async def demonstrate_ordering():
loop = asyncio.get_running_loop()
results = []
# call_soon schedules for the current or very next iteration
loop.call_soon(lambda: results.append("call_soon 1"))
loop.call_soon(lambda: results.append("call_soon 2"))
# Yield - let the loop iteration complete
await asyncio.sleep(0)
# call_soon callbacks ran before we got back here
print(results) # ['call_soon 1', 'call_soon 2']
asyncio.run(demonstrate_ordering())
The Poll Timeout Calculation
When the event loop calls selector.select(timeout), the timeout is not arbitrary. The loop calculates: "when is the next time-based callback due?" If loop.call_later(5.0, callback) was registered and 3 seconds have passed, the timeout is 2 seconds. This ensures the loop wakes up exactly when the scheduled callback is due - not earlier (wasting CPU), not later (missing the deadline).
If there are ready callbacks in the queue, the timeout is 0 - the loop polls the selector but does not block, processes any ready I/O, and immediately runs the queued callbacks.
Part 4 - get_event_loop vs get_running_loop vs asyncio.run
Why get_event_loop() Was a Problem
asyncio.get_event_loop() was designed in Python 3.4 when the asyncio API was new. Its behavior was: return the running event loop if one exists, otherwise create a new one and set it as the current loop for this thread.
The problem: implicitly creating an event loop without the caller knowing about it led to bugs where code created an event loop, ran some coroutines, then a different part of the code created another event loop in the same thread - leaving orphaned loops and leaked resources.
In Python 3.10, the implicit creation behavior was deprecated. In Python 3.12, get_event_loop() outside a running loop emits a DeprecationWarning every time it creates a new loop.
The Three Functions, Explained
import asyncio
# 1. asyncio.run() - the right way to start an event loop from synchronous code.
# Creates a new event loop, runs the coroutine, closes the loop when done.
# Always use this as the top-level entry point.
asyncio.run(my_coroutine())
# 2. asyncio.get_running_loop() - use this INSIDE async code.
# Returns the currently running event loop.
# Raises RuntimeError if no loop is running (never returns None).
# This is the correct way to get the loop inside a coroutine.
async def my_coroutine():
loop = asyncio.get_running_loop() # always correct inside async def
future = loop.create_future()
# ...
# 3. asyncio.get_event_loop() - deprecated for most uses.
# Only use it if you are writing library code that must support
# being called both inside and outside a running loop.
# Prefer get_running_loop() inside async code.
# Prefer asyncio.run() as the entry point.
Decision Table
| Situation | Use This |
|---|---|
| Top-level entry point (main script) | asyncio.run(main()) |
| Inside a coroutine - need the loop | asyncio.get_running_loop() |
| Need a Future inside a coroutine | loop = asyncio.get_running_loop(); loop.create_future() |
| Need to schedule work from a callback | asyncio.get_running_loop() |
| Legacy code, must not break existing API | asyncio.get_event_loop() (with caution) |
Inside any async def function, always use asyncio.get_running_loop() to get the current event loop. It is faster (no thread-local lookup overhead), raises RuntimeError if called outside a loop (which catches bugs immediately), and is the officially endorsed approach since Python 3.10.
Part 5 - Handles and Callbacks
The Three Scheduling Functions
The event loop exposes three low-level callback scheduling functions. These are the primitives that asyncio.sleep(), asyncio.wait_for(), and asyncio.create_task() are all built on.
import asyncio
async def demonstrate_handles():
loop = asyncio.get_running_loop()
now = loop.time() # monotonic clock used by the event loop
results = []
# call_soon: run on the next iteration of the event loop.
# Returns a Handle object - call handle.cancel() to cancel.
handle1 = loop.call_soon(lambda: results.append("call_soon"))
# call_later: run after a delay in seconds.
# Returns a TimerHandle - also cancellable.
handle2 = loop.call_later(0.1, lambda: results.append("call_later 100ms"))
# call_at: run at an absolute event loop time.
# loop.time() returns seconds since the loop started (monotonic).
handle3 = loop.call_at(now + 0.2, lambda: results.append("call_at 200ms"))
await asyncio.sleep(0.3) # wait for all three to fire
print(results)
# ['call_soon', 'call_later 100ms', 'call_at 200ms']
asyncio.run(demonstrate_handles())
Handles Are Cancellable
async def cancellable_callback():
loop = asyncio.get_running_loop()
def expensive_operation():
print("This would do something expensive")
# Schedule it for 5 seconds from now
handle = loop.call_later(5.0, expensive_operation)
# Changed our mind - cancel before it fires
await asyncio.sleep(1.0)
handle.cancel()
print(f"Cancelled: {handle.cancelled()}") # True
await asyncio.sleep(5.0) # expensive_operation never runs
asyncio.run(cancellable_callback())
The Difference Between call_soon and asyncio.create_task
call_soon schedules a plain Python callable (not a coroutine). asyncio.create_task wraps a coroutine in a Task object which the event loop drives to completion, handling await expressions internally.
async def demo():
loop = asyncio.get_running_loop()
# call_soon: run a plain function
loop.call_soon(lambda: print("Plain callback - runs once, no await support"))
# create_task: run a coroutine that can await
async def my_coro():
await asyncio.sleep(0.1)
print("Coroutine - can suspend and resume")
task = asyncio.create_task(my_coro())
await task
asyncio.run(demo())
Thread-Safe Scheduling: call_soon_threadsafe
Regular call_soon is not thread-safe. If you are running code in a different thread (for example, a callback from a thread pool) and need to schedule work on the event loop, use call_soon_threadsafe:
import asyncio
import threading
async def main():
loop = asyncio.get_running_loop()
result_future = loop.create_future()
def thread_worker():
# This runs in a separate OS thread
value = expensive_blocking_computation()
# Schedule setting the result back on the event loop thread
loop.call_soon_threadsafe(result_future.set_result, value)
def expensive_blocking_computation():
import time
time.sleep(1) # simulates blocking work in a thread
return 42
t = threading.Thread(target=thread_worker)
t.start()
# Await the result in the event loop
result = await result_future
print(f"Thread result: {result}") # 42
t.join()
asyncio.run(main())
Part 6 - Debugging the Event Loop
Slow Callback Detection
The most important production debugging feature: the event loop can detect when a callback runs for too long. Any callback that takes more than 100ms (by default) is logged as a warning.
import asyncio
import logging
import time
logging.basicConfig(level=logging.DEBUG)
async def main():
loop = asyncio.get_running_loop()
# Enable debug mode - detects slow callbacks, logs coroutine origins
loop.set_debug(True)
# Lower the threshold for testing (default is 0.1 seconds = 100ms)
loop.slow_callback_duration = 0.05 # 50ms
async def fast_task():
await asyncio.sleep(0.01) # 10ms - fine
return "fast"
async def slow_task():
time.sleep(0.2) # 200ms blocking - will trigger warning
return "slow"
await asyncio.gather(fast_task(), slow_task())
# WARNING: asyncio: Executing <Task ...> took 0.200 seconds
asyncio.run(main())
What loop.set_debug(True) Enables
import asyncio
import os
# Enable via environment variable (no code change needed):
# PYTHONASYNCIODEBUG=1 python my_script.py
# Or programmatically:
async def main():
loop = asyncio.get_running_loop()
loop.set_debug(True)
# Debug mode enables:
# 1. Slow callback detection (>100ms callbacks logged as WARNING)
# 2. Coroutine never-awaited detection
# 3. Exception logging for Tasks garbage collected with unhandled exceptions
# 4. Detailed repr for asyncio objects
# 5. ResourceWarning for unclosed transports and event loops
async def forgotten():
return 42
coro = forgotten() # created but not awaited
# In debug mode: RuntimeWarning: coroutine 'forgotten' was never awaited
del coro
asyncio.run(main())
Using asyncio.set_event_loop_policy
Event loop policies control how the event loop is created. The default policy uses asyncio.DefaultEventLoopPolicy. You can replace it globally:
import asyncio
# Switch to uvloop globally (affects all asyncio.run() calls)
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
pass # fallback to default if uvloop is not installed
async def main():
loop = asyncio.get_running_loop()
print(type(loop).__name__) # uvloop.Loop if uvloop is installed
asyncio.run(main())
Never block the event loop - not even for 100ms. A single blocking call in a coroutine freezes every other coroutine, every pending network request, and every scheduled callback for the duration of the block. time.sleep(0.1) in a coroutine is not "just 100ms" - it is 100ms of complete application freeze for all concurrent work.
Use await asyncio.sleep() instead of time.sleep(). Use loop.run_in_executor() to offload blocking I/O or CPU work to a thread pool. Never call synchronous database drivers, requests.get(), or any other blocking library directly inside a coroutine.
Part 7 - uvloop: The Drop-In Replacement
What uvloop Is
uvloop is a reimplementation of the asyncio event loop built on top of libuv - the same C event loop library that powers Node.js. It replaces Python's default event loop with a highly optimized C implementation.
pip install uvloop
Installing uvloop
import asyncio
# Option 1: Global policy replacement (affects all asyncio.run() calls)
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Option 2: Per-runner (Python 3.11+)
asyncio.run(my_coroutine(), loop_factory=uvloop.new_event_loop)
# Option 3: Using asyncio.Runner context manager
with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
runner.run(my_coroutine())
Performance Characteristics
uvloop is consistently 2–4x faster than the default asyncio event loop for I/O-heavy workloads:
| Workload | Default asyncio | uvloop | Speedup |
|---|---|---|---|
| Echo server (small messages) | ~60k req/s | ~180k req/s | ~3x |
| HTTP server throughput | ~25k req/s | ~80k req/s | ~3.2x |
| SSL/TLS connections | ~8k conn/s | ~22k conn/s | ~2.75x |
| Timer precision | ±5ms | ±1ms | ~5x more precise |
The speedup comes from libuv's optimized C code paths, fewer Python-level object allocations per I/O event, and more efficient timer management via a heap-based timer wheel.
When to Use uvloop
# USE uvloop when:
# - Building a high-throughput network service (HTTP server, WebSocket server)
# - Running FastAPI, aiohttp, or any asyncio-based web framework in production
# - Processing thousands of concurrent connections
# - Timer precision matters (scheduled tasks, rate limiters)
# DO NOT bother with uvloop when:
# - Your bottleneck is CPU computation (uvloop only helps with I/O event handling)
# - You have fewer than ~100 concurrent connections (overhead is negligible)
# - You are on Windows (uvloop does not support Windows)
# - You are using Python < 3.8
Use uvloop in production for any network-heavy asyncio application. The installation is one line, the API change is one line, and the throughput improvement is typically 2–3x for real workloads. Major Python web frameworks like FastAPI explicitly recommend it. The only caveat: uvloop does not support Windows - add a try/except ImportError fallback for cross-platform development environments.
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
pass # Windows or uvloop not installed - default loop is fine
Part 8 - Multiple Event Loops and Framework Patterns
Why You Cannot Run Two Event Loops in One Thread
The event loop is a thread-local object. Each thread can have at most one running event loop. Attempting to start a second event loop in the same thread raises a RuntimeError:
import asyncio
async def inner():
return 42
async def outer():
# WRONG: calling asyncio.run() inside a running event loop
result = asyncio.run(inner())
# RuntimeError: This event loop is already running
asyncio.run(outer())
This error is common when trying to use asyncio inside Jupyter notebooks (which run their own event loop) or inside synchronous callbacks that are called from async code.
The Jupyter / IPython Pattern
Jupyter runs an event loop in the background. Two solutions exist:
# Option 1: Use nest_asyncio (patches asyncio to allow nested loops)
import nest_asyncio
nest_asyncio.apply()
import asyncio
asyncio.run(my_coroutine()) # works after nest_asyncio.apply()
# Option 2: Use await directly (Jupyter supports top-level await natively)
result = await my_coroutine() # works in Jupyter 6.0+ without nest_asyncio
How Web Frameworks Manage the Event Loop
Frameworks like FastAPI, Starlette, and aiohttp manage the event loop lifecycle for you:
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator
@asynccontextmanager
async def lifespan(app) -> AsyncGenerator[None, None]:
# Startup: runs when the event loop starts.
# The framework (uvicorn) has already created and started the loop.
loop = asyncio.get_running_loop()
print(f"App starting on loop: {type(loop).__name__}")
# Set up shared resources
db_pool = await create_db_pool()
app.state.db_pool = db_pool
yield # application runs here
# Shutdown: cleanup
await db_pool.close()
# The framework handles asyncio.run(serve(...))
# You never call asyncio.run() yourself - the framework owns the loop lifecycle
Running Blocking Code in the Event Loop
When you must call a blocking library from async code, use run_in_executor to offload to a thread pool:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
async def mixed_workload():
loop = asyncio.get_running_loop()
# Blocking I/O (file read, legacy library) - offload to thread pool
def read_large_file(path: str) -> bytes:
with open(path, "rb") as f:
return f.read()
# None uses the default ThreadPoolExecutor
content = await loop.run_in_executor(None, read_large_file, "/var/log/syslog")
# CPU-bound work - offload to process pool to bypass the GIL
def compress_data(data: bytes) -> bytes:
import zlib
return zlib.compress(data, level=9)
with ProcessPoolExecutor() as pool:
compressed = await loop.run_in_executor(pool, compress_data, content)
return compressed
loop.run_in_executor(None, func, *args) runs func(*args) in the default thread pool (a ThreadPoolExecutor). The event loop continues processing other coroutines while func runs in the thread. The result is awaitable - your coroutine resumes when func completes. This is the correct pattern for integrating any blocking code into an asyncio application.
Full Example - Event Loop Debugging Toolkit
A self-contained debugging tool you can drop into any asyncio application during development to detect slow callbacks and measure event loop health:
"""
event_loop_debugger.py
Drop-in asyncio event loop debugging toolkit.
Detects slow callbacks, unclosed resources, and loop health metrics.
Usage:
from event_loop_debugger import DebugEventLoop
async def main():
async with DebugEventLoop.monitor(threshold=0.05) as monitor:
await your_application_code()
print(monitor.summary())
asyncio.run(main())
"""
import asyncio
import time
import functools
import logging
import sys
from dataclasses import dataclass, field
from contextlib import asynccontextmanager
logger = logging.getLogger("event_loop_debugger")
@dataclass
class CallbackRecord:
name: str
duration: float
threshold: float
timestamp: float = field(default_factory=time.time)
def is_slow(self) -> bool:
return self.duration > self.threshold
def __str__(self) -> str:
status = "SLOW" if self.is_slow() else "ok"
return f"[{status}] {self.name}: {self.duration * 1000:.1f}ms"
class DebugEventLoop:
"""
Monitors the asyncio event loop for blocking callbacks.
Wraps call_soon to measure callback duration and log warnings
when a callback exceeds the configured threshold.
Does NOT replace the event loop - wraps call_soon to instrument callbacks.
"""
def __init__(self, loop: asyncio.AbstractEventLoop, threshold: float = 0.1):
self.loop = loop
self.threshold = threshold
self._records: list[CallbackRecord] = []
self._iteration_count = 0
self._start_time = time.perf_counter()
self._original_call_soon = loop.call_soon
self._installed = False
def install(self) -> None:
"""Patch the event loop's call_soon to measure callback duration."""
if self._installed:
return
original = self._original_call_soon
def instrumented_call_soon(callback, *args, context=None):
name = getattr(callback, "__qualname__", repr(callback))
@functools.wraps(callback)
def timed(*cb_args):
self._iteration_count += 1
t0 = time.perf_counter()
try:
return callback(*cb_args)
finally:
elapsed = time.perf_counter() - t0
record = CallbackRecord(
name=name,
duration=elapsed,
threshold=self.threshold,
)
self._records.append(record)
if record.is_slow():
logger.warning(
"Slow callback: %s took %.3fs (threshold: %.3fs)",
name, elapsed, self.threshold,
)
return original(timed, *args, context=context)
self.loop.call_soon = instrumented_call_soon
self._installed = True
logger.debug("EventLoop monitor installed (threshold=%.3fs)", self.threshold)
def uninstall(self) -> None:
"""Restore the original call_soon."""
if not self._installed:
return
self.loop.call_soon = self._original_call_soon
self._installed = False
def slow_callbacks(self) -> list[CallbackRecord]:
return [r for r in self._records if r.is_slow()]
def summary(self) -> str:
elapsed = time.perf_counter() - self._start_time
slow = self.slow_callbacks()
throughput = self._iteration_count / elapsed if elapsed > 0 else 0
lines = [
"=" * 60,
"Event Loop Debug Summary",
"=" * 60,
f"Total time: {elapsed:.3f}s",
f"Callbacks executed: {self._iteration_count}",
f"Callback throughput: {throughput:.0f}/s",
f"Slow callbacks: {len(slow)} (>{self.threshold * 1000:.0f}ms)",
]
if slow:
lines.append("\nTop slow callbacks:")
for r in sorted(slow, key=lambda r: r.duration, reverse=True)[:5]:
lines.append(f" {r}")
lines.append("=" * 60)
return "\n".join(lines)
@classmethod
@asynccontextmanager
async def monitor(cls, threshold: float = 0.1):
"""Context manager that instruments the running event loop."""
loop = asyncio.get_running_loop()
instance = cls(loop, threshold=threshold)
instance.install()
try:
yield instance
finally:
instance.uninstall()
# ─── Demo ─────────────────────────────────────────────────────────────────────
async def simulate_application():
"""Simulates an application with a mix of fast and slow callbacks."""
async def fast_api_call():
await asyncio.sleep(0.01) # 10ms - normal async I/O
return "fast"
async def slow_db_query():
# Simulates forgetting to use run_in_executor for a blocking DB driver
time.sleep(0.15) # 150ms blocking - should be offloaded
return "slow"
async def background_poller():
for _ in range(5):
await asyncio.sleep(0.02) # 20ms polling interval
await asyncio.gather(
fast_api_call(),
slow_db_query(),
background_poller(),
)
async def main():
logging.basicConfig(
level=logging.WARNING,
format="%(levelname)s %(name)s: %(message)s",
stream=sys.stderr,
)
print("Running with event loop monitoring...\n")
async with DebugEventLoop.monitor(threshold=0.05) as monitor:
await simulate_application()
print(monitor.summary())
if __name__ == "__main__":
asyncio.run(main())
Expected Output
WARNING event_loop_debugger: Slow callback: slow_db_query took 0.150s (threshold: 0.050s)
============================================================
Event Loop Debug Summary
============================================================
Total time: 0.272s
Callbacks executed: 47
Callback throughput: 173/s
Slow callbacks: 1 (>50ms)
Top slow callbacks:
[SLOW] slow_db_query: 150.3ms
============================================================
The fast_api_call and background_poller coroutines complete normally. Only slow_db_query triggers the warning because time.sleep(0.15) blocks the event loop for 150ms - well above the 50ms threshold. The fix is to wrap the blocking call in loop.run_in_executor(None, blocking_func).
Graded Practice
Beginner - Understand the Fundamentals
Challenge: Predict what happens when you run the following code, and explain why the total time is approximately 1 second and not 3 seconds.
import asyncio
async def task(name: str, delay: float):
print(f"{name} starting")
await asyncio.sleep(delay)
print(f"{name} done")
async def main():
await asyncio.gather(
task("A", 1.0),
task("B", 1.0),
task("C", 1.0),
)
asyncio.run(main())
Show Answer
Output:
A starting
B starting
C starting
A done
B done
C done
Total time: approximately 1 second (not 3 seconds).
When asyncio.gather() is called, all three tasks are scheduled on the event loop simultaneously. Each task runs until its first await asyncio.sleep(delay) and then yields control. All three asyncio.sleep(1.0) timers are in flight at the same time - they are not sequential.
The event loop registers three timer callbacks, all due in approximately 1 second. After approximately 1 second, all three timers fire, and each task resumes. Because no task ever blocks the event loop (they all await asyncio.sleep, not time.sleep), the total wall-clock time is the duration of the longest single task - 1 second.
This is the core value of the event loop: concurrent I/O and timer waiting with zero thread overhead.
Intermediate - Debug a Blocking Event Loop
The following code is supposed to handle 5 "requests" concurrently, but it runs sequentially and takes approximately 5 seconds instead of approximately 1 second. Find the bug and fix it.
import asyncio
import time
async def handle_request(request_id: int) -> str:
print(f"Handling request {request_id}")
time.sleep(1) # simulate a database query
return f"Response {request_id}"
async def main():
tasks = [handle_request(i) for i in range(5)]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
Show Answer
The bug: time.sleep(1) is a blocking call. It blocks the entire event loop for 1 second per request - no other coroutines can run during that time. Even though asyncio.gather() schedules all 5 tasks, they execute one-at-a-time because each one freezes the event loop.
Fix 1 - Use asyncio.sleep when simulating async I/O:
async def handle_request(request_id: int) -> str:
print(f"Handling request {request_id}")
await asyncio.sleep(1) # yields to event loop - other tasks run concurrently
return f"Response {request_id}"
Fix 2 - Use run_in_executor for real blocking code (for example, a synchronous database driver):
async def handle_request(request_id: int) -> str:
loop = asyncio.get_running_loop()
print(f"Handling request {request_id}")
def blocking_db_query():
time.sleep(1) # runs in a thread - does not block the event loop
return f"Response {request_id}"
return await loop.run_in_executor(None, blocking_db_query)
With either fix, total time drops from approximately 5 seconds to approximately 1 second because all 5 tasks are genuinely concurrent.
Advanced - Implement a Rate Limiter Using the Event Loop
Implement an async rate limiter that allows at most N calls per second using the token bucket algorithm. The limiter must be usable as an async context manager and must be safe for concurrent use by multiple coroutines.
import asyncio
class RateLimiter:
"""Token bucket rate limiter. Allows at most `rate` calls per second."""
def __init__(self, rate: float):
... # your implementation
async def __aenter__(self):
... # wait until a token is available
async def __aexit__(self, *exc):
...
# Usage:
async def main():
limiter = RateLimiter(rate=2.0) # 2 calls per second
async def make_call(i: int):
async with limiter:
elapsed = asyncio.get_event_loop().time()
print(f"Call {i} at t={elapsed:.3f}s")
await asyncio.gather(*[make_call(i) for i in range(6)])
# Calls 0,1 fire immediately; calls 2,3 fire ~0.5s later; calls 4,5 fire ~1.0s later
asyncio.run(main())
Show Reference Solution
import asyncio
import time
class RateLimiter:
"""
Token bucket rate limiter.
Allows `rate` calls per second on average.
Tokens accumulate up to `burst` (defaults to rate).
Each call consumes one token. If no token is available, waits.
"""
def __init__(self, rate: float, burst: float | None = None):
self.rate = rate # tokens per second
self.burst = burst if burst is not None else rate
self._tokens = self.burst # start full
self._last_refill = time.monotonic()
self._lock = asyncio.Lock()
def _refill(self) -> None:
"""Add tokens based on elapsed time since last refill."""
now = time.monotonic()
elapsed = now - self._last_refill
self._tokens = min(self.burst, self._tokens + elapsed * self.rate)
self._last_refill = now
async def acquire(self) -> None:
"""Wait until a token is available, then consume one."""
async with self._lock:
while True:
self._refill()
if self._tokens >= 1.0:
self._tokens -= 1.0
return
# Calculate how long to wait for the next token
deficit = 1.0 - self._tokens
wait_time = deficit / self.rate
await asyncio.sleep(wait_time)
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, *exc):
pass # nothing to release in a token bucket
# Demo
async def main():
limiter = RateLimiter(rate=2.0) # 2 calls per second
start = asyncio.get_event_loop().time()
async def make_call(i: int):
async with limiter:
elapsed = asyncio.get_event_loop().time() - start
print(f"Call {i:2d} at t={elapsed:.3f}s")
await asyncio.gather(*[make_call(i) for i in range(6)])
asyncio.run(main())
# Expected output (approximate):
# Call 0 at t=0.000s - immediate (token available)
# Call 1 at t=0.000s - immediate (second token)
# Call 2 at t=0.500s - waited 0.5s for next token (rate=2/s)
# Call 3 at t=0.500s - both tokens refilled in that window
# Call 4 at t=1.000s - waited another 0.5s
# Call 5 at t=1.000s
Key design decisions:
asyncio.Lockensures that token accounting is not corrupted when multiple coroutines compete for the limiter simultaneously - this is an async race condition prevention- The
while Trueloop withawait asyncio.sleep(wait_time)ensures correctness even if the sleep is slightly imprecise - token availability is always re-checked after waking - The
_refillmethod is called inside the lock so refill and consume are atomic with respect to other coroutines
Key Takeaways
- The asyncio event loop is a loop that alternates between running ready callbacks and calling
selector.select()to wait for I/O readiness. It is not threading, not parallelism, and not magic. selector.select()wrapsepoll(Linux),kqueue(macOS/BSD), orIOCP(Windows) - O(1) I/O readiness notification that scales to tens of thousands of concurrent connections without creating threads.- Every
awaitexpression is a yield point. Coroutines that neverawaitnever yield, blocking the event loop and all other concurrent work for the entire duration of their execution. - Use
asyncio.get_running_loop()inside async code andasyncio.run(main())as the entry point.asyncio.get_event_loop()is deprecated for most uses since Python 3.10 and will become an error in a future release. loop.call_soon(),loop.call_later(), andloop.call_at()are the primitives for scheduling plain callbacks.asyncio.create_task()wraps these for coroutines that need toawait.loop.call_soon_threadsafe()is required when scheduling work on the event loop from a non-event-loop thread. Regularcall_soonis not thread-safe.loop.set_debug(True)enables slow callback detection, coroutine leak warnings, and detailed logging. Always use it during development; disable in production to avoid overhead.- Never block the event loop - even 100ms of
time.sleep()freezes all concurrent coroutines. Useawait asyncio.sleep()orloop.run_in_executor()to keep the event loop responsive. - uvloop is a drop-in replacement for the default asyncio event loop that is 2–4x faster for I/O-heavy workloads. Install with
pip install uvloopand enable with one line. Does not support Windows. - You cannot run two event loops simultaneously in the same thread. Frameworks like FastAPI and aiohttp own the event loop lifecycle - you never call
asyncio.run()inside framework application code.
What's Next
Lesson 05 covers Race Conditions and Thread Safety - a deeper look at why concurrent code fails, what the GIL actually guarantees (less than you think), and how to detect and fix data races in both threaded and async code. You will see how x += 1 is secretly four bytecode instructions, why even asyncio code can have race conditions at await boundaries, and how to build data structures that behave correctly under real concurrency.
