The Event Loop Explained

Reading time: ~35 minutes | Level: Intermediate → Engineering

Before reading further, consider this:

import asyncio

loop = asyncio.get_event_loop()
# DeprecationWarning: There is no current event loop

In Python 3.10 and 3.11, calling asyncio.get_event_loop() in code that is not already inside a running loop can emit a DeprecationWarning. In Python 3.12 and 3.13, the warning is emitted whenever no current event loop is set, and in Python 3.14 that case raises RuntimeError. Why did the Python core team change an API that millions of programs use? And what should you call instead?

The answer reveals something fundamental about how the event loop actually works - and why the old API was always subtly wrong. This lesson gives you the engineering-depth answer.

What You Will Learn

  • What the event loop physically is - a loop that polls for I/O readiness and runs callbacks
  • How the OS tells Python which file descriptors are ready: select, epoll, kqueue
  • The exact cycle the event loop runs - from scheduling callbacks to polling I/O and back
  • asyncio.get_event_loop() vs asyncio.get_running_loop() vs asyncio.run() - which to use and when
  • Handles and callbacks: loop.call_soon(), loop.call_later(), loop.call_at()
  • Debugging the event loop in production - slow callback detection, loop.set_debug(True)
  • uvloop - the drop-in replacement that is 2–4x faster and when to use it
  • Why you cannot run multiple event loops in one thread, and how frameworks work around it

Prerequisites

  • Lesson 03 of this module: Asyncio - you should be comfortable with async def, await, asyncio.gather, and coroutines
  • Basic familiarity with what a file descriptor is (a number the OS uses to refer to an open file, socket, or pipe)

Part 1 - What the Event Loop Actually Is

Not Magic - a Loop

Strip away all the Python abstractions and the asyncio event loop is this:

while True:
    run all callbacks that are ready to run
    ask the OS: which file descriptors are ready for I/O?
    for each ready file descriptor:
        run the callback registered for it
    check for scheduled (time-based) callbacks and run any that are due

That is the entire event loop. It is not threading. It is not parallelism. It is a single-threaded program that repeatedly asks the operating system "is any I/O ready?" and then runs registered Python functions in response.

The Key Insight: Cooperative Scheduling

When your coroutine does await asyncio.sleep(1), it does not block the thread. It registers a callback with the event loop to be called after 1 second, then returns control to the event loop. The event loop then runs other ready callbacks - other coroutines that are waiting for different things. After 1 second, your callback fires and your coroutine resumes where it left off.

This is cooperative multitasking. Every await expression is a potential yield point: whenever the awaited operation has to wait, your coroutine voluntarily hands control back to the event loop. If you never await, you never yield, and you block the entire event loop for every other coroutine.

import asyncio

async def cooperative():
    print("Starting cooperative task")
    await asyncio.sleep(0)  # yield to event loop - other tasks can run
    print("Resumed after yield")

async def blocking():
    print("Starting blocking task")
    # No await - runs to completion without yielding
    for i in range(10_000_000):  # blocks the event loop until the loop finishes (typically hundreds of ms)
        pass
    print("Done (the event loop was frozen the whole time)")
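To make the effect of a yield point visible, here is a minimal sketch (the worker and run names are illustrative, not part of asyncio). Two workers append to a shared log; when each step ends with await asyncio.sleep(0) their steps interleave, and without it the first worker finishes before the second starts.

```python
import asyncio


async def worker(name: str, log: list, yield_each_step: bool) -> None:
    for step in range(3):
        log.append(f"{name}{step}")
        if yield_each_step:
            # Hand control back to the event loop so the other worker can run.
            await asyncio.sleep(0)


async def run(yield_each_step: bool) -> list:
    log: list = []
    await asyncio.gather(
        worker("A", log, yield_each_step),
        worker("B", log, yield_each_step),
    )
    return log


interleaved = asyncio.run(run(yield_each_step=True))
sequential = asyncio.run(run(yield_each_step=False))

print(interleaved)  # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
print(sequential)   # ['A0', 'A1', 'A2', 'B0', 'B1', 'B2']
```

Both runs use the same event loop machinery; the only difference is whether the workers give the loop a chance to switch.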

Part 2 - Selectors: How the OS Tells Python Which I/O Is Ready

The Problem the Event Loop Solves

A naive I/O approach would be: read from socket A, wait for data, then read from socket B, wait for data, and so on. If you have 1,000 sockets, you would spend most of your time waiting, sequentially, for each one.

The event loop's solution: register all 1,000 sockets with the operating system and ask "let me know when ANY of them has data". The OS does this efficiently with system calls designed for this purpose.

The Three Selector Mechanisms

Python's selectors module provides a platform-independent interface. Underneath, it uses the most efficient mechanism available:

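| Platform | Mechanism | Max FDs | Complexity |
|---|---|---|---|
| Linux | epoll | Limited only by the process FD limit | O(1) per event |
| macOS / BSD | kqueue | Limited only by the process FD limit | O(1) per event |
| Windows | IOCP (used by asyncio's ProactorEventLoop; the selectors module itself falls back to select on Windows) | Limited only by system resources | O(1) per event |
| All (fallback) | select | Typically 1024 (FD_SETSIZE) | O(n) per call |

import selectors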
import socket

# This is the abstraction asyncio uses internally
sel = selectors.DefaultSelector() # picks the best available mechanism

server = socket.socket()
server.setblocking(False) # non-blocking - will not sleep waiting for data
server.bind(("localhost", 8080))
server.listen()

def accept_callback(sock, mask):
    conn, addr = sock.accept()
    print(f"Connection from {addr}")

# Register: "call accept_callback when server socket is readable"
sel.register(server, selectors.EVENT_READ, data=accept_callback)

while True:
    # Block here until at least one registered fd is ready
    # timeout=0.1 means: wait at most 100ms, then return even if nothing is ready
    events = sel.select(timeout=0.1)

    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)
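The server above runs forever, so it is hard to experiment with. As a smaller sketch of the same register-then-select pattern, the snippet below uses socket.socketpair() to create two connected sockets in one process; the on_readable name is illustrative. It polls once before anything is written and once after.

```python
import selectors
import socket

sel = selectors.DefaultSelector()  # epoll, kqueue, or select depending on platform

# Two connected sockets in the same process: writing to one makes the other readable.
left, right = socket.socketpair()
left.setblocking(False)
right.setblocking(False)

received = []


def on_readable(sock: socket.socket) -> None:
    received.append(sock.recv(1024))


sel.register(left, selectors.EVENT_READ, data=on_readable)

# Nothing has been written yet, so a non-blocking poll reports no ready sockets.
before = sel.select(timeout=0)

right.send(b"hello")

# Now the poll returns a (key, mask) pair for `left`; dispatch to its callback.
after = sel.select(timeout=1.0)
for key, _mask in after:
    key.data(key.fileobj)

sel.unregister(left)
sel.close()
left.close()
right.close()

print(type(sel).__name__, len(before), received)
```

The selector never reads data itself. It only reports which registered sockets can be read without blocking, and the callback stored in data does the actual recv.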

How asyncio Uses Selectors

The asyncio event loop wraps exactly this pattern. Every time you do await reader.read(1024) on an asyncio stream, the event loop registers the underlying socket's file descriptor with the selector. It then calls selector.select() to wait for I/O. When data arrives, the selector returns the file descriptor, the event loop finds the coroutine waiting on it, and schedules it to resume.

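# What asyncio does internally (simplified):
async def _read_from_socket(sock, n_bytes):
    loop = asyncio.get_running_loop()

    # Create a Future that will be resolved when data is available
    future = loop.create_future()

    def _read_callback():
        # Unregister first so the callback cannot fire a second time
        loop.remove_reader(sock.fileno())
        if future.done():  # e.g. the waiting coroutine was cancelled
            return
        try:
            future.set_result(sock.recv(n_bytes))  # wake up the awaiting coroutine
        except OSError as exc:
            future.set_exception(exc)

    loop.add_reader(sock.fileno(), _read_callback)

    try:
        return await future  # yield to event loop here
    finally:
        loop.remove_reader(sock.fileno())  # harmless if already removed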
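To try the add_reader pattern end to end, the following sketch drives it with a socketpair and a timer that writes after 50ms. The read_once helper is illustrative. It assumes a selector-based event loop (the default on Linux and macOS); the Windows proactor loop does not implement add_reader.

```python
import asyncio
import socket


async def read_once(sock: socket.socket, n_bytes: int) -> bytes:
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def on_readable() -> None:
        # One-shot: stop watching the fd, then hand the data to the waiter.
        loop.remove_reader(sock.fileno())
        if not future.done():
            future.set_result(sock.recv(n_bytes))

    loop.add_reader(sock.fileno(), on_readable)
    return await future  # suspended here until on_readable runs


async def main() -> bytes:
    left, right = socket.socketpair()
    left.setblocking(False)
    right.setblocking(False)
    try:
        loop = asyncio.get_running_loop()
        # Write from a timer callback, so the read really has to wait.
        loop.call_later(0.05, right.send, b"ping")
        return await read_once(left, 1024)
    finally:
        left.close()
        right.close()


received = asyncio.run(main())
print(received)  # b'ping'
```

While read_once is suspended, the loop is free to run the timer callback that performs the write, which is what makes the socket readable.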

Part 3 - The Event Loop Cycle

The Full Loop Iteration

Why the Order Matters

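The order within a single loop iteration is not arbitrary. Callbacks scheduled with call_soon() are queued in FIFO order and run on the next iteration, ahead of anything queued after them. In the example below, both callbacks are queued before the coroutine yields, so they run before the coroutine resumes: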

import asyncio

async def demonstrate_ordering():
    loop = asyncio.get_running_loop()

    results = []

    # call_soon appends to the ready queue; the callbacks run on the next iteration
    loop.call_soon(lambda: results.append("call_soon 1"))
    loop.call_soon(lambda: results.append("call_soon 2"))

    # Yield - let the loop iteration complete
    await asyncio.sleep(0)

    # call_soon callbacks ran before we got back here
    print(results)  # ['call_soon 1', 'call_soon 2']

asyncio.run(demonstrate_ordering())

The Poll Timeout Calculation

When the event loop calls selector.select(timeout), the timeout is not arbitrary. The loop calculates: "when is the next time-based callback due?" If loop.call_later(5.0, callback) was registered and 3 seconds have passed, the timeout is 2 seconds. This ensures the loop wakes up exactly when the scheduled callback is due - not earlier (wasting CPU), not later (missing the deadline).

If there are ready callbacks in the queue, the timeout is 0 - the loop polls the selector but does not block, processes any ready I/O, and immediately runs the queued callbacks.
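The iteration order and the timeout rule can be sketched as a toy loop. This is an illustration only, not asyncio's real implementation: MiniLoop and its methods are made-up names, and the sketch sleeps instead of polling when no file descriptors are registered.

```python
import heapq
import selectors
import time
from collections import deque


class MiniLoop:
    """Toy event loop: ready queue + timer heap + selector."""

    def __init__(self) -> None:
        self._ready = deque()   # callbacks to run on the next iteration
        self._timers = []       # heap of (when, sequence, callback)
        self._seq = 0           # tie-breaker so callbacks are never compared
        self._selector = selectors.DefaultSelector()

    def call_soon(self, callback) -> None:
        self._ready.append(callback)

    def call_later(self, delay: float, callback) -> None:
        self._seq += 1
        heapq.heappush(self._timers, (time.monotonic() + delay, self._seq, callback))

    def _poll_timeout(self):
        if self._ready:
            return 0            # work is waiting: poll without blocking
        if self._timers:
            return max(0.0, self._timers[0][0] - time.monotonic())
        return None             # nothing scheduled: would block until I/O

    def run_once(self) -> None:
        timeout = self._poll_timeout()
        if self._selector.get_map():
            for key, _mask in self._selector.select(timeout):
                self._ready.append(key.data)
        elif timeout:
            time.sleep(timeout)  # no fds registered in this demo
        now = time.monotonic()
        while self._timers and self._timers[0][0] <= now:
            _when, _seq, callback = heapq.heappop(self._timers)
            self._ready.append(callback)
        # Run only what was ready at this point; new callbacks wait a turn.
        for _ in range(len(self._ready)):
            self._ready.popleft()()

    def run_until_idle(self) -> None:
        while self._ready or self._timers:
            self.run_once()


order = []
loop = MiniLoop()


def soon_2() -> None:
    order.append("soon 2")
    loop.call_soon(lambda: order.append("soon 3"))  # runs one iteration later


loop.call_later(0.05, lambda: order.append("timer 50ms"))
loop.call_soon(lambda: order.append("soon 1"))
loop.call_soon(soon_2)
loop.call_later(0.01, lambda: order.append("timer 10ms"))
loop.run_until_idle()
print(order)
```

The poll timeout is 0 while the ready queue is non-empty and otherwise the time until the earliest timer, which is the rule described above.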

Part 4 - get_event_loop vs get_running_loop vs asyncio.run

Why get_event_loop() Was a Problem

asyncio.get_event_loop() was designed in Python 3.4 when the asyncio API was new. Its behavior was: return the running event loop if one exists, otherwise create a new one and set it as the current loop for this thread.

The problem: implicitly creating an event loop without the caller knowing about it led to bugs where code created an event loop, ran some coroutines, then a different part of the code created another event loop in the same thread - leaving orphaned loops and leaked resources.

In Python 3.10, the implicit creation behavior was deprecated. In Python 3.12, get_event_loop() emits a DeprecationWarning whenever no current loop is set and it has to create one. In Python 3.14, that case raises RuntimeError instead of creating a loop.

The Three Functions, Explained

import asyncio

# 1. asyncio.run() - the right way to start an event loop from synchronous code.
# Creates a new event loop, runs the coroutine, closes the loop when done.
# Always use this as the top-level entry point.
asyncio.run(my_coroutine())

# 2. asyncio.get_running_loop() - use this INSIDE async code.
# Returns the currently running event loop.
# Raises RuntimeError if no loop is running (never returns None).
# This is the correct way to get the loop inside a coroutine.
async def my_coroutine():
    loop = asyncio.get_running_loop()  # always correct inside async def
    future = loop.create_future()
    # ...

# 3. asyncio.get_event_loop() - deprecated for most uses.
# Only use it if you are writing library code that must support
# being called both inside and outside a running loop.
# Prefer get_running_loop() inside async code.
# Prefer asyncio.run() as the entry point.

Decision Table

| Situation | Use This |
|---|---|
| Top-level entry point (main script) | asyncio.run(main()) |
| Inside a coroutine - need the loop | asyncio.get_running_loop() |
| Need a Future inside a coroutine | loop = asyncio.get_running_loop(); loop.create_future() |
| Need to schedule work from a callback | asyncio.get_running_loop() |
| Legacy code, must not break existing API | asyncio.get_event_loop() (with caution) |

tip

Inside any async def function, always use asyncio.get_running_loop() to get the current event loop. It never creates a loop implicitly, it raises RuntimeError if called outside a running loop (which catches bugs immediately), and it is the approach the asyncio documentation recommends over get_event_loop() in coroutines and callbacks.
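The fail-fast behavior is easy to check. In this small sketch, loop_or_none is a hypothetical helper that turns the RuntimeError into None, so the same function can be called from synchronous and asynchronous code.

```python
import asyncio


def loop_or_none():
    """Return the running loop, or None when called from synchronous code."""
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        return None


async def main() -> bool:
    # Inside a coroutine there is always a running loop.
    return loop_or_none() is asyncio.get_running_loop()


outside = loop_or_none()            # no loop is running at module level
inside_matches = asyncio.run(main())

print(outside, inside_matches)  # None True
```

Library code that must work in both contexts can branch on this result instead of relying on get_event_loop() creating a loop behind the caller's back.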

Part 5 - Handles and Callbacks

The Three Scheduling Functions

The event loop exposes three low-level callback scheduling functions. These are the primitives that asyncio.sleep(), asyncio.wait_for(), and asyncio.create_task() are all built on.

import asyncio

async def demonstrate_handles():
    loop = asyncio.get_running_loop()
    now = loop.time()  # monotonic clock used by the event loop

    results = []

    # call_soon: run on the next iteration of the event loop.
    # Returns a Handle object - call handle.cancel() to cancel.
    handle1 = loop.call_soon(lambda: results.append("call_soon"))

    # call_later: run after a delay in seconds.
    # Returns a TimerHandle - also cancellable.
    handle2 = loop.call_later(0.1, lambda: results.append("call_later 100ms"))

    # call_at: run at an absolute event loop time.
    # loop.time() is a monotonic clock with an arbitrary reference point,
    # so only differences between readings are meaningful.
    handle3 = loop.call_at(now + 0.2, lambda: results.append("call_at 200ms"))

    await asyncio.sleep(0.3)  # wait for all three to fire

    print(results)
    # ['call_soon', 'call_later 100ms', 'call_at 200ms']

asyncio.run(demonstrate_handles())

Handles Are Cancellable

async def cancellable_callback():
    loop = asyncio.get_running_loop()

    def expensive_operation():
        print("This would do something expensive")

    # Schedule it for 5 seconds from now
    handle = loop.call_later(5.0, expensive_operation)

    # Changed our mind - cancel before it fires
    await asyncio.sleep(1.0)
    handle.cancel()
    print(f"Cancelled: {handle.cancelled()}")  # True

    await asyncio.sleep(5.0)  # expensive_operation never runs

asyncio.run(cancellable_callback())

The Difference Between call_soon and asyncio.create_task

call_soon schedules a plain Python callable (not a coroutine). asyncio.create_task wraps a coroutine in a Task object which the event loop drives to completion, handling await expressions internally.

async def demo():
    loop = asyncio.get_running_loop()

    # call_soon: run a plain function
    loop.call_soon(lambda: print("Plain callback - runs once, no await support"))

    # create_task: run a coroutine that can await
    async def my_coro():
        await asyncio.sleep(0.1)
        print("Coroutine - can suspend and resume")

    task = asyncio.create_task(my_coro())
    await task

asyncio.run(demo())

Thread-Safe Scheduling: call_soon_threadsafe

Regular call_soon is not thread-safe. If you are running code in a different thread (for example, a callback from a thread pool) and need to schedule work on the event loop, use call_soon_threadsafe:

import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()
    result_future = loop.create_future()

    def thread_worker():
        # This runs in a separate OS thread
        value = expensive_blocking_computation()
        # Schedule setting the result back on the event loop thread
        loop.call_soon_threadsafe(result_future.set_result, value)

    def expensive_blocking_computation():
        import time
        time.sleep(1)  # simulates blocking work in a thread
        return 42

    t = threading.Thread(target=thread_worker)
    t.start()

    # Await the result in the event loop
    result = await result_future
    print(f"Thread result: {result}")  # 42
    t.join()

asyncio.run(main())
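call_soon_threadsafe schedules a plain callable. When a worker thread needs to run a coroutine on the loop and get its result back, the standard library also offers asyncio.run_coroutine_threadsafe, which returns a concurrent.futures.Future. A minimal sketch, with double and worker as illustrative names:

```python
import asyncio
import threading


async def double(x: int) -> int:
    await asyncio.sleep(0.01)  # runs on the event loop thread
    return x * 2


async def main() -> list:
    loop = asyncio.get_running_loop()
    results = []

    def worker() -> None:
        # Runs in a separate OS thread: submit a coroutine to the loop,
        # then block this thread (not the loop) until it finishes.
        future = asyncio.run_coroutine_threadsafe(double(21), loop)
        results.append(future.result(timeout=5))

    thread = threading.Thread(target=worker)
    thread.start()

    # Wait for the thread without blocking the loop. Calling thread.join()
    # directly here would freeze the loop that the worker is waiting on.
    await loop.run_in_executor(None, thread.join)
    return results


results = asyncio.run(main())
print(results)  # [42]
```

The join is offloaded to the default executor for the same reason any blocking call is: the worker cannot finish unless the loop stays free to run double.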

Part 6 - Debugging the Event Loop

Slow Callback Detection

The most important production debugging feature: in debug mode, the event loop detects when a callback runs for too long. Any callback that takes more than 100ms (the default value of loop.slow_callback_duration) is logged as a warning.

import asyncio
import logging
import time

logging.basicConfig(level=logging.DEBUG)

async def main():
    loop = asyncio.get_running_loop()

    # Enable debug mode - detects slow callbacks, logs coroutine origins
    loop.set_debug(True)

    # Lower the threshold for testing (default is 0.1 seconds = 100ms)
    loop.slow_callback_duration = 0.05  # 50ms

    async def fast_task():
        await asyncio.sleep(0.01)  # 10ms - fine
        return "fast"

    async def slow_task():
        time.sleep(0.2)  # 200ms blocking - will trigger warning
        return "slow"

    await asyncio.gather(fast_task(), slow_task())
    # WARNING: asyncio: Executing <Task ...> took 0.200 seconds

asyncio.run(main())

What loop.set_debug(True) Enables

import asyncio

# Enable via environment variable (no code change needed):
# PYTHONASYNCIODEBUG=1 python my_script.py

# Or programmatically:
async def main():
    loop = asyncio.get_running_loop()
    loop.set_debug(True)

    # Debug mode enables:
    # 1. Slow callback detection (>100ms callbacks logged as WARNING)
    # 2. Creation tracebacks in "coroutine ... was never awaited" warnings
    # 3. Creation tracebacks in "Task exception was never retrieved" logs
    # 4. Detailed repr for asyncio objects
    # 5. Errors when non-thread-safe loop APIs (such as call_soon) are
    #    called from the wrong thread
    # 6. Logging of slow I/O selector polls

    async def forgotten():
        return 42

    coro = forgotten()  # created but not awaited
    # RuntimeWarning: coroutine 'forgotten' was never awaited
    # (emitted in any mode; debug mode adds where the coroutine was created)
    del coro

asyncio.run(main())

Using asyncio.set_event_loop_policy

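Event loop policies control how the event loop is created. The default policy is asyncio.DefaultEventLoopPolicy. You can replace it globally (note that the policy system is deprecated as of Python 3.14 in favor of passing a loop_factory to asyncio.run() or asyncio.Runner):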

import asyncio

# Switch to uvloop globally (affects all asyncio.run() calls)
try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass  # fallback to default if uvloop is not installed

async def main():
    loop = asyncio.get_running_loop()
    print(type(loop).__name__)  # "Loop" (uvloop's loop class) if uvloop is installed

asyncio.run(main())
warning

Never block the event loop - not even for 100ms. A single blocking call in a coroutine freezes every other coroutine, every pending network request, and every scheduled callback for the duration of the block. time.sleep(0.1) in a coroutine is not "just 100ms" - it is 100ms of complete application freeze for all concurrent work.

Use await asyncio.sleep() instead of time.sleep(). Use loop.run_in_executor() to offload blocking I/O or CPU work to a thread pool. Never call synchronous database drivers, requests.get(), or any other blocking library directly inside a coroutine.
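One way to notice blocking calls without enabling debug mode is to measure loop lag: a background task sleeps for a short interval and records how late it wakes up. The sketch below is an assumption about how you might build such a monitor (measure_lag is an illustrative name), with a deliberate time.sleep standing in for the bug.

```python
import asyncio
import time


async def measure_lag(interval: float, samples: list) -> None:
    """Record how much later than `interval` each wake-up happens."""
    loop = asyncio.get_running_loop()
    while True:
        expected = loop.time() + interval
        await asyncio.sleep(interval)
        samples.append(max(0.0, loop.time() - expected))


async def main() -> list:
    samples: list = []
    monitor = asyncio.create_task(measure_lag(0.01, samples))

    await asyncio.sleep(0.05)  # healthy period: lag stays near zero
    time.sleep(0.2)            # deliberate blocking call: the loop is frozen
    await asyncio.sleep(0.05)  # give the monitor a chance to record the damage

    monitor.cancel()
    try:
        await monitor
    except asyncio.CancelledError:
        pass
    return samples


samples = asyncio.run(main())
worst = max(samples)
print(f"worst lag: {worst * 1000:.0f}ms over {len(samples)} samples")
```

The worst sample is close to the full 200ms of the blocking call, even though the monitor itself only ever asked to sleep for 10ms.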

Part 7 - uvloop: The Drop-In Replacement

What uvloop Is

uvloop is a reimplementation of the asyncio event loop built on top of libuv - the same C event loop library that powers Node.js. It replaces Python's default event loop with a highly optimized C implementation.

pip install uvloop

Installing uvloop

import asyncio

# Option 1: Global policy replacement (affects all asyncio.run() calls)
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Option 2: Per-call loop factory (asyncio.run gained loop_factory in Python 3.12)
asyncio.run(my_coroutine(), loop_factory=uvloop.new_event_loop)

# Option 3: Using the asyncio.Runner context manager (Python 3.11+)
with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
    runner.run(my_coroutine())

Performance Characteristics

uvloop's own benchmarks report roughly 2–4x higher throughput than the default asyncio event loop for I/O-heavy workloads. The figures below are illustrative; measure with your own workload and hardware:

| Workload | Default asyncio | uvloop | Speedup |
|---|---|---|---|
| Echo server (small messages) | ~60k req/s | ~180k req/s | ~3x |
| HTTP server throughput | ~25k req/s | ~80k req/s | ~3.2x |
| SSL/TLS connections | ~8k conn/s | ~22k conn/s | ~2.75x |
| Timer precision | ±5ms | ±1ms | ~5x more precise |

The speedup comes from libuv's optimized C code paths, fewer Python-level object allocations per I/O event, and more efficient timer management (libuv keeps its timers in a min-heap).

When to Use uvloop

# USE uvloop when:
# - Building a high-throughput network service (HTTP server, WebSocket server)
# - Running FastAPI, aiohttp, or any asyncio-based web framework in production
# - Processing thousands of concurrent connections
# - Timer precision matters (scheduled tasks, rate limiters)

# DO NOT bother with uvloop when:
# - Your bottleneck is CPU computation (uvloop only helps with I/O event handling)
# - You have fewer than ~100 concurrent connections (overhead is negligible)
# - You are on Windows (uvloop does not support Windows)
# - You are using Python < 3.8
tip

Use uvloop in production for any network-heavy asyncio application. The installation is one line, the API change is one line, and the throughput improvement can be substantial for I/O-bound workloads. Uvicorn, the ASGI server commonly used to run FastAPI, picks up uvloop automatically when it is installed. The only caveat: uvloop does not support Windows - add a try/except ImportError fallback for cross-platform development environments.

import asyncio

try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass  # Windows or uvloop not installed - default loop is fine

Part 8 - Multiple Event Loops and Framework Patterns

Why You Cannot Run Two Event Loops in One Thread

The event loop is a thread-local object. Each thread can have at most one running event loop. Attempting to start a second event loop in the same thread raises a RuntimeError:

import asyncio

async def inner():
    return 42

async def outer():
    # WRONG: calling asyncio.run() inside a running event loop
    result = asyncio.run(inner())
    # RuntimeError: asyncio.run() cannot be called from a running event loop

asyncio.run(outer())

This error is common when trying to use asyncio inside Jupyter notebooks (which run their own event loop) or inside synchronous callbacks that are called from async code.
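When synchronous code may be called both with and without a running loop, one workaround is to give the coroutine its own loop in a second thread, since each thread can run its own loop. The run_sync helper below is a hypothetical sketch, not a standard API, and it has a real cost: the calling thread blocks until the coroutine finishes, so an outer loop is frozen for that time.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def inner() -> int:
    await asyncio.sleep(0.01)
    return 42


def run_sync(coro):
    """Run `coro` to completion from synchronous code (illustrative helper)."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: asyncio.run() is safe.
        return asyncio.run(coro)
    # A loop is already running here, so start a fresh loop in another thread.
    # .result() blocks this thread, which also blocks the outer loop.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


async def outer() -> int:
    # Called from inside a running loop: takes the second-thread path.
    return run_sync(inner())


from_sync = run_sync(inner())
from_async = asyncio.run(outer())
print(from_sync, from_async)  # 42 42
```

Inside async code, awaiting the coroutine directly is always the better option; a helper like this only makes sense at a boundary you cannot make async.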

The Jupyter / IPython Pattern

Jupyter runs an event loop in the background. Two solutions exist:

# Option 1: Use nest_asyncio (patches asyncio to allow nested loops)
import nest_asyncio
nest_asyncio.apply()

import asyncio
asyncio.run(my_coroutine()) # works after nest_asyncio.apply()

# Option 2: Use await directly (IPython 7.0+ and current Jupyter kernels support top-level await)
result = await my_coroutine()  # works without nest_asyncio

How Web Frameworks Manage the Event Loop

Frameworks like FastAPI, Starlette, and aiohttp manage the event loop lifecycle for you:

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator

@asynccontextmanager
async def lifespan(app) -> AsyncGenerator[None, None]:
    # Startup: runs when the event loop starts.
    # The framework (uvicorn) has already created and started the loop.
    loop = asyncio.get_running_loop()
    print(f"App starting on loop: {type(loop).__name__}")

    # Set up shared resources
    # (create_db_pool stands in for your own pool-creation coroutine)
    db_pool = await create_db_pool()
    app.state.db_pool = db_pool

    yield  # application runs here

    # Shutdown: cleanup
    await db_pool.close()

# The framework handles asyncio.run(serve(...))
# You never call asyncio.run() yourself - the framework owns the loop lifecycle

Running Blocking Code in the Event Loop

When you must call a blocking library from async code, use run_in_executor to offload to a thread pool:

import asyncio
import zlib
from concurrent.futures import ProcessPoolExecutor

# Both helpers live at module level: ProcessPoolExecutor pickles the function
# it runs, and functions nested inside a coroutine cannot be pickled.

def read_large_file(path: str) -> bytes:
    with open(path, "rb") as f:
        return f.read()

def compress_data(data: bytes) -> bytes:
    return zlib.compress(data, level=9)

async def mixed_workload():
    loop = asyncio.get_running_loop()

    # Blocking I/O (file read, legacy library) - offload to thread pool.
    # None uses the default ThreadPoolExecutor
    content = await loop.run_in_executor(None, read_large_file, "/var/log/syslog")

    # CPU-bound work - offload to process pool to bypass the GIL
    with ProcessPoolExecutor() as pool:
        compressed = await loop.run_in_executor(pool, compress_data, content)

    return compressed
note

loop.run_in_executor(None, func, *args) runs func(*args) in the default thread pool (a ThreadPoolExecutor). The event loop continues processing other coroutines while func runs in the thread. The result is awaitable - your coroutine resumes when func completes. This is the correct pattern for integrating any blocking code into an asyncio application.
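Since Python 3.9, asyncio.to_thread offers a shorter spelling of the same idea for the default thread pool. In this sketch, blocking_lookup is a made-up stand-in for a synchronous driver call; three lookups run in worker threads while the event loop stays free.

```python
import asyncio
import time


def blocking_lookup(key: str) -> str:
    time.sleep(0.1)  # stands in for a synchronous driver or HTTP call
    return key.upper()


async def main():
    start = time.perf_counter()
    # Each call runs in a worker thread; gather awaits all three together.
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_lookup, key) for key in ("a", "b", "c"))
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

Because the three sleeps overlap in separate threads, the elapsed time is close to one lookup rather than the sum of three.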

Full Example - Event Loop Debugging Toolkit

A self-contained debugging tool you can drop into any asyncio application during development to detect slow callbacks and measure event loop health:

"""
event_loop_debugger.py

Drop-in asyncio event loop debugging toolkit.
Detects slow callbacks, unclosed resources, and loop health metrics.

Usage:
    from event_loop_debugger import DebugEventLoop

    async def main():
        async with DebugEventLoop.monitor(threshold=0.05) as monitor:
            await your_application_code()
        print(monitor.summary())

    asyncio.run(main())
"""

import asyncio
import time
import functools
import logging
import sys
from dataclasses import dataclass, field
from contextlib import asynccontextmanager

logger = logging.getLogger("event_loop_debugger")


@dataclass
class CallbackRecord:
    name: str
    duration: float
    threshold: float
    timestamp: float = field(default_factory=time.time)

    def is_slow(self) -> bool:
        return self.duration > self.threshold

    def __str__(self) -> str:
        status = "SLOW" if self.is_slow() else "ok"
        return f"[{status}] {self.name}: {self.duration * 1000:.1f}ms"


class DebugEventLoop:
    """
    Monitors the asyncio event loop for blocking callbacks.

    Wraps call_soon to measure callback duration and log warnings
    when a callback exceeds the configured threshold.

    Does NOT replace the event loop - wraps call_soon to instrument callbacks.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, threshold: float = 0.1):
        self.loop = loop
        self.threshold = threshold
        self._records: list[CallbackRecord] = []
        self._iteration_count = 0
        self._start_time = time.perf_counter()
        self._original_call_soon = loop.call_soon
        self._installed = False

    @staticmethod
    def _callback_name(callback) -> str:
        """Best-effort readable name; Task steps are reported by coroutine name."""
        owner = getattr(callback, "__self__", None)
        if isinstance(owner, asyncio.Task):
            coro = owner.get_coro()
            return getattr(coro, "__name__", repr(coro))
        return getattr(callback, "__qualname__", repr(callback))

    def install(self) -> None:
        """Patch the event loop's call_soon to measure callback duration."""
        if self._installed:
            return
        original = self._original_call_soon

        def instrumented_call_soon(callback, *args, context=None):
            name = self._callback_name(callback)

            @functools.wraps(callback)
            def timed(*cb_args):
                self._iteration_count += 1
                t0 = time.perf_counter()
                try:
                    return callback(*cb_args)
                finally:
                    elapsed = time.perf_counter() - t0
                    record = CallbackRecord(
                        name=name,
                        duration=elapsed,
                        threshold=self.threshold,
                    )
                    self._records.append(record)
                    if record.is_slow():
                        logger.warning(
                            "Slow callback: %s took %.3fs (threshold: %.3fs)",
                            name, elapsed, self.threshold,
                        )

            return original(timed, *args, context=context)

        self.loop.call_soon = instrumented_call_soon
        self._installed = True
        logger.debug("EventLoop monitor installed (threshold=%.3fs)", self.threshold)

    def uninstall(self) -> None:
        """Restore the original call_soon."""
        if not self._installed:
            return
        self.loop.call_soon = self._original_call_soon
        self._installed = False

    def slow_callbacks(self) -> list[CallbackRecord]:
        return [r for r in self._records if r.is_slow()]

    def summary(self) -> str:
        elapsed = time.perf_counter() - self._start_time
        slow = self.slow_callbacks()
        throughput = self._iteration_count / elapsed if elapsed > 0 else 0

        lines = [
            "=" * 60,
            "Event Loop Debug Summary",
            "=" * 60,
            f"Total time: {elapsed:.3f}s",
            f"Callbacks executed: {self._iteration_count}",
            f"Callback throughput: {throughput:.0f}/s",
            f"Slow callbacks: {len(slow)} (>{self.threshold * 1000:.0f}ms)",
        ]
        if slow:
            lines.append("\nTop slow callbacks:")
            for r in sorted(slow, key=lambda r: r.duration, reverse=True)[:5]:
                lines.append(f" {r}")
        lines.append("=" * 60)
        return "\n".join(lines)

    @classmethod
    @asynccontextmanager
    async def monitor(cls, threshold: float = 0.1):
        """Context manager that instruments the running event loop."""
        loop = asyncio.get_running_loop()
        instance = cls(loop, threshold=threshold)
        instance.install()
        try:
            yield instance
        finally:
            instance.uninstall()


# ─── Demo ─────────────────────────────────────────────────────────────────────

async def simulate_application():
    """Simulates an application with a mix of fast and slow callbacks."""

    async def fast_api_call():
        await asyncio.sleep(0.01)  # 10ms - normal async I/O
        return "fast"

    async def slow_db_query():
        # Simulates forgetting to use run_in_executor for a blocking DB driver
        time.sleep(0.15)  # 150ms blocking - should be offloaded
        return "slow"

    async def background_poller():
        for _ in range(5):
            await asyncio.sleep(0.02)  # 20ms polling interval

    await asyncio.gather(
        fast_api_call(),
        slow_db_query(),
        background_poller(),
    )


async def main():
    logging.basicConfig(
        level=logging.WARNING,
        format="%(levelname)s %(name)s: %(message)s",
        stream=sys.stderr,
    )

    print("Running with event loop monitoring...\n")

    async with DebugEventLoop.monitor(threshold=0.05) as monitor:
        await simulate_application()

    print(monitor.summary())


if __name__ == "__main__":
    asyncio.run(main())

Expected Output (approximate - timings, callback counts, and callback names vary by machine and Python version)
WARNING event_loop_debugger: Slow callback: slow_db_query took 0.150s (threshold: 0.050s)

============================================================
Event Loop Debug Summary
============================================================
Total time: 0.272s
Callbacks executed: 47
Callback throughput: 173/s
Slow callbacks: 1 (>50ms)

Top slow callbacks:
[SLOW] slow_db_query: 150.3ms
============================================================

The fast_api_call and background_poller coroutines complete normally. Only slow_db_query triggers the warning because time.sleep(0.15) blocks the event loop for 150ms - well above the 50ms threshold. The fix is to wrap the blocking call in loop.run_in_executor(None, blocking_func).

Graded Practice

Beginner - Understand the Fundamentals

Challenge: Predict what happens when you run the following code, and explain why the total time is approximately 1 second and not 3 seconds.

import asyncio

async def task(name: str, delay: float):
    print(f"{name} starting")
    await asyncio.sleep(delay)
    print(f"{name} done")

async def main():
    await asyncio.gather(
        task("A", 1.0),
        task("B", 1.0),
        task("C", 1.0),
    )

asyncio.run(main())

Answer

Output:

A starting
B starting
C starting
A done
B done
C done

Total time: approximately 1 second (not 3 seconds).

When asyncio.gather() is called, all three tasks are scheduled on the event loop simultaneously. Each task runs until its first await asyncio.sleep(delay) and then yields control. All three asyncio.sleep(1.0) timers are in flight at the same time - they are not sequential.

The event loop registers three timer callbacks, all due in approximately 1 second. After approximately 1 second, all three timers fire, and each task resumes. Because no task ever blocks the event loop (they all await asyncio.sleep, not time.sleep), the total wall-clock time is the duration of the longest single task - 1 second.

This is the core value of the event loop: concurrent I/O and timer waiting with zero thread overhead.

Intermediate - Debug a Blocking Event Loop

The following code is supposed to handle 5 "requests" concurrently, but it runs sequentially and takes approximately 5 seconds instead of approximately 1 second. Find the bug and fix it.

import asyncio
import time

async def handle_request(request_id: int) -> str:
    print(f"Handling request {request_id}")
    time.sleep(1)  # simulate a database query
    return f"Response {request_id}"

async def main():
    tasks = [handle_request(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())

Answer

The bug: time.sleep(1) is a blocking call. It blocks the entire event loop for 1 second per request - no other coroutines can run during that time. Even though asyncio.gather() schedules all 5 tasks, they execute one-at-a-time because each one freezes the event loop.

Fix 1 - Use asyncio.sleep when simulating async I/O:

async def handle_request(request_id: int) -> str:
    print(f"Handling request {request_id}")
    await asyncio.sleep(1)  # yields to event loop - other tasks run concurrently
    return f"Response {request_id}"

Fix 2 - Use run_in_executor for real blocking code (for example, a synchronous database driver):

async def handle_request(request_id: int) -> str:
    loop = asyncio.get_running_loop()
    print(f"Handling request {request_id}")

    def blocking_db_query():
        time.sleep(1)  # runs in a thread - does not block the event loop
        return f"Response {request_id}"

    return await loop.run_in_executor(None, blocking_db_query)

With either fix, total time drops from approximately 5 seconds to approximately 1 second because all 5 tasks are genuinely concurrent.

Advanced - Implement a Rate Limiter Using the Event Loop

Implement an async rate limiter that allows at most N calls per second using the token bucket algorithm. The limiter must be usable as an async context manager and must be safe for concurrent use by multiple coroutines.

import asyncio

class RateLimiter:
    """Token bucket rate limiter. Allows at most `rate` calls per second."""

    def __init__(self, rate: float):
        ...  # your implementation

    async def __aenter__(self):
        ...  # wait until a token is available

    async def __aexit__(self, *exc):
        ...

# Usage:
async def main():
    limiter = RateLimiter(rate=2.0)  # 2 calls per second
    start = asyncio.get_running_loop().time()

    async def make_call(i: int):
        async with limiter:
            elapsed = asyncio.get_running_loop().time() - start
            print(f"Call {i} at t={elapsed:.3f}s")

    await asyncio.gather(*[make_call(i) for i in range(6)])
    # With a bucket that starts with 2 tokens: calls 0 and 1 fire immediately,
    # then one call fires every ~0.5s (calls 2-5 at ~0.5s, ~1.0s, ~1.5s, ~2.0s)

asyncio.run(main())

Reference Solution

import asyncio
import time


class RateLimiter:
    """
    Token bucket rate limiter.

    Allows `rate` calls per second on average.
    Tokens accumulate up to `burst` (defaults to rate).
    Each call consumes one token. If no token is available, waits.
    """

    def __init__(self, rate: float, burst: float | None = None):
        self.rate = rate  # tokens per second
        self.burst = burst if burst is not None else rate
        self._tokens = self.burst  # start full
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self) -> None:
        """Add tokens based on elapsed time since last refill."""
        now = time.monotonic()
        elapsed = now - self._last_refill
        self._tokens = min(self.burst, self._tokens + elapsed * self.rate)
        self._last_refill = now

    async def acquire(self) -> None:
        """Wait until a token is available, then consume one."""
        async with self._lock:
            while True:
                self._refill()
                if self._tokens >= 1.0:
                    self._tokens -= 1.0
                    return
                # Calculate how long to wait for the next token
                deficit = 1.0 - self._tokens
                wait_time = deficit / self.rate
                await asyncio.sleep(wait_time)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *exc):
        pass  # nothing to release in a token bucket


# Demo
async def main():
    limiter = RateLimiter(rate=2.0)  # 2 calls per second

    # Inside a coroutine, get_running_loop() is the right way to reach the loop
    start = asyncio.get_running_loop().time()

    async def make_call(i: int):
        async with limiter:
            elapsed = asyncio.get_running_loop().time() - start
            print(f"Call {i:2d} at t={elapsed:.3f}s")

    await asyncio.gather(*[make_call(i) for i in range(6)])


asyncio.run(main())

# Expected output (approximate):
# Call 0 at t=0.000s - immediate (bucket starts with 2 tokens)
# Call 1 at t=0.000s - immediate (second token)
# Call 2 at t=0.500s - waited 0.5s for the next token (rate=2/s)
# Call 3 at t=1.000s - one new token arrives every 0.5s
# Call 4 at t=1.500s
# Call 5 at t=2.000s

Key design decisions:

  • asyncio.Lock ensures that token accounting is not corrupted when multiple coroutines compete for the limiter simultaneously. Because a waiter holds the lock while it sleeps, callers are also served in arrival order
  • The while True loop with await asyncio.sleep(wait_time) ensures correctness even if the sleep is slightly imprecise - token availability is always re-checked after waking
  • The _refill method is called inside the lock so refill and consume are atomic with respect to other coroutines
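The token arithmetic can be checked without any sleeping. The sketch below is a simplified model, not part of the reference solution: admission_times is a hypothetical helper that assumes all calls arrive at t=0 and are served one at a time, and it replays the refill and consume steps on a simulated clock.

```python
def admission_times(rate: float, burst: float, n_calls: int) -> list:
    """Simulated admission time of each call when all arrive at t=0."""
    tokens = burst   # the bucket starts full
    now = 0.0        # simulated clock, in seconds
    times = []
    for _ in range(n_calls):
        if tokens < 1.0:
            # Wait just long enough for the deficit to refill at `rate` tokens/s.
            wait = (1.0 - tokens) / rate
            now += wait
            tokens += wait * rate
        tokens -= 1.0  # consume one token for this call
        times.append(round(now, 3))
    return times


schedule = admission_times(rate=2.0, burst=2.0, n_calls=6)
print(schedule)  # [0.0, 0.0, 0.5, 1.0, 1.5, 2.0]
```

The first two calls spend the initial burst, and every later call waits for one fresh token, so admissions settle into one call per 1/rate seconds.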

Key Takeaways

  • The asyncio event loop is a loop that alternates between running ready callbacks and calling selector.select() to wait for I/O readiness. It is not threading, not parallelism, and not magic.
  • selector.select() wraps epoll (Linux) or kqueue (macOS/BSD), with select() as the portable fallback; on Windows, asyncio's default loop uses IOCP instead of a selector. These mechanisms provide O(1) I/O readiness notification that scales to tens of thousands of concurrent connections without creating threads.
  • Every await expression is a potential yield point: control returns to the event loop whenever the awaited operation has to wait. Coroutines that never await never yield, blocking the event loop and all other concurrent work for the entire duration of their execution.
  • Use asyncio.get_running_loop() inside async code and asyncio.run(main()) as the entry point. asyncio.get_event_loop() is deprecated for most uses since Python 3.10 and raises RuntimeError when no loop is set as of Python 3.14.
  • loop.call_soon(), loop.call_later(), and loop.call_at() are the primitives for scheduling plain callbacks. asyncio.create_task() wraps these for coroutines that need to await.
  • loop.call_soon_threadsafe() is required when scheduling work on the event loop from a non-event-loop thread. Regular call_soon is not thread-safe.
  • loop.set_debug(True) enables slow callback detection, coroutine leak warnings, and detailed logging. Always use it during development; disable in production to avoid overhead.
  • Never block the event loop - even 100ms of time.sleep() freezes all concurrent coroutines. Use await asyncio.sleep() or loop.run_in_executor() to keep the event loop responsive.
  • uvloop is a drop-in replacement for the default asyncio event loop that is 2–4x faster for I/O-heavy workloads. Install with pip install uvloop and enable with one line. Does not support Windows.
  • You cannot run two event loops simultaneously in the same thread. Frameworks like FastAPI and aiohttp own the event loop lifecycle - you never call asyncio.run() inside framework application code.

What's Next

Lesson 05 covers Race Conditions and Thread Safety - a deeper look at why concurrent code fails, what the GIL actually guarantees (less than you think), and how to detect and fix data races in both threaded and async code. You will see how x += 1 is secretly four bytecode instructions, why even asyncio code can have race conditions at await boundaries, and how to build data structures that behave correctly under real concurrency.

© 2026 EngineersOfAI. All rights reserved.