Skip to main content

Advanced Event Loop

Before reading any explanation, predict the output of this program:

import asyncio

async def main():
loop = asyncio.get_event_loop()

results = []

loop.call_soon(results.append, "C")
loop.call_soon(results.append, "D")

results.append("A")
await asyncio.sleep(0)
results.append("B")
await asyncio.sleep(0)

print(" ".join(results))

asyncio.run(main())

Think about when call_soon callbacks execute relative to await asyncio.sleep(0).

# Output:
# A C D B

results.append("A") runs immediately (synchronous code). Then await asyncio.sleep(0) suspends the coroutine. The event loop processes the call_soon callbacks (C, then D) before resuming the coroutine. After resuming, results.append("B") runs. The second sleep(0) has no pending callbacks, so B is the last item.

This ordering reveals a key insight: await asyncio.sleep(0) is not a no-op. It yields control to the event loop, which processes all pending callbacks before returning. Understanding the event loop's internal scheduling is essential for writing correct and performant async code.

What You Will Learn

  • Event loop architecture: the selector, callback queue, and timer heap
  • call_soon, call_later, call_at for scheduling callbacks
  • How run_in_executor bridges sync and async worlds
  • Custom event loop policies for testing and specialization
  • uvloop as a drop-in performance upgrade
  • Signal handling in async applications
  • Real-world patterns: graceful shutdown, health monitoring, periodic tasks

Prerequisites

  • Solid understanding of async/await, Tasks, and Futures from previous lessons
  • Familiarity with the __await__ protocol from Lesson 4
  • Basic understanding of OS-level I/O multiplexing (select/poll/epoll) is helpful but not required
  • Experience with asyncio.run(), create_task(), gather()

Part 1 -- Event Loop Architecture

The event loop is a single-threaded scheduler that multiplexes I/O operations and callbacks. Every asyncio program runs on one event loop (per thread). Understanding its internal phases explains why async code behaves the way it does.

The Event Loop Cycle

Each iteration of the event loop performs these steps:

  1. Ready callbacks: Execute all callbacks that are ready to run (from call_soon).
  2. I/O polling: Call the selector (epoll/kqueue/select) to check for I/O events. The timeout for the poll is the time until the nearest scheduled callback.
  3. I/O callbacks: Execute callbacks for file descriptors that are ready.
  4. Scheduled callbacks: Execute call_later / call_at callbacks whose deadline has passed.

The Selector

The event loop uses the operating system's I/O multiplexing API:

PlatformDefault SelectorDescription
LinuxepollO(1) readiness notification
macOSkqueueO(1) readiness notification
WindowsIOCP (via ProactorEventLoop)Completion-based I/O
FallbackselectO(n) polling, limited to 1024 fds
import asyncio
import selectors

# Check which selector is being used
loop = asyncio.new_event_loop()
print(type(loop._selector))
# Linux: <class 'selectors.EpollSelector'>
# macOS: <class 'selectors.KqueueSelector'>
loop.close()

Callback Queue Internals

import asyncio

async def demonstrate_callback_ordering():
loop = asyncio.get_event_loop()
order = []

# call_soon callbacks are FIFO within the same cycle
loop.call_soon(order.append, 1)
loop.call_soon(order.append, 2)
loop.call_soon(order.append, 3)

# This await gives the loop a chance to run callbacks
await asyncio.sleep(0)

print(f"call_soon order: {order}") # [1, 2, 3] -- always FIFO

asyncio.run(demonstrate_callback_ordering())

Part 2 -- Scheduling Callbacks

call_soon: Run on Next Loop Iteration

call_soon(callback, *args) schedules a callback to run on the next iteration of the event loop, after the current coroutine suspends:

import asyncio

async def main():
loop = asyncio.get_event_loop()

def sync_callback(name):
print(f"Callback: {name}")

loop.call_soon(sync_callback, "first")
loop.call_soon(sync_callback, "second")

print("Before sleep")
await asyncio.sleep(0) # Yield to event loop
print("After sleep")

asyncio.run(main())
# Output:
# Before sleep
# Callback: first
# Callback: second
# After sleep

call_soon_threadsafe: Cross-Thread Scheduling

When you need to schedule a callback from a different thread:

import asyncio
import threading

async def main():
loop = asyncio.get_event_loop()
results = []

def background_work():
"""Runs in a separate thread."""
import time
time.sleep(0.1)
# Schedule callback on the event loop from this thread
loop.call_soon_threadsafe(results.append, "from thread")

thread = threading.Thread(target=background_work)
thread.start()

await asyncio.sleep(0.2) # Wait for thread to schedule
thread.join()

print(results) # ['from thread']

asyncio.run(main())
danger

call_soon is NOT thread-safe. If you call it from a non-event-loop thread, you get undefined behavior (data corruption, missed callbacks, crashes). Always use call_soon_threadsafe from other threads.

call_later and call_at: Timed Callbacks

import asyncio

async def main():
loop = asyncio.get_event_loop()
results = []

# call_later: relative time (seconds from now)
loop.call_later(0.3, results.append, "300ms")
loop.call_later(0.1, results.append, "100ms")
loop.call_later(0.2, results.append, "200ms")

await asyncio.sleep(0.5)
print(f"call_later order: {results}")
# call_later order: ['100ms', '200ms', '300ms']

results.clear()

# call_at: absolute time (loop.time() based)
now = loop.time()
loop.call_at(now + 0.2, results.append, "at+200ms")
loop.call_at(now + 0.1, results.append, "at+100ms")

await asyncio.sleep(0.5)
print(f"call_at order: {results}")
# call_at order: ['at+100ms', 'at+200ms']

asyncio.run(main())

Cancelling Scheduled Callbacks

call_later and call_at return a TimerHandle that can be cancelled:

async def main():
loop = asyncio.get_event_loop()

handle = loop.call_later(1.0, print, "This will not print")
print(f"Handle cancelled: {handle.cancelled()}") # False

handle.cancel()
print(f"Handle cancelled: {handle.cancelled()}") # True

await asyncio.sleep(1.5)
# Nothing prints -- the callback was cancelled

asyncio.run(main())

Part 3 -- Running Sync Code in Async Context

run_in_executor: Thread and Process Pools

The event loop's run_in_executor runs a synchronous function in a thread or process pool and returns an awaitable Future:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def blocking_io(url: str) -> str:
"""Simulate a blocking I/O operation."""
time.sleep(1)
return f"Response from {url}"

def cpu_intensive(n: int) -> int:
"""CPU-bound computation."""
return sum(i * i for i in range(n))


async def main():
loop = asyncio.get_event_loop()

# Default executor (ThreadPoolExecutor)
result = await loop.run_in_executor(None, blocking_io, "https://api.example.com")
print(result)

# Custom thread pool
thread_pool = ThreadPoolExecutor(max_workers=4)
results = await asyncio.gather(
loop.run_in_executor(thread_pool, blocking_io, "url1"),
loop.run_in_executor(thread_pool, blocking_io, "url2"),
loop.run_in_executor(thread_pool, blocking_io, "url3"),
)
print(f"Parallel results: {len(results)}")
thread_pool.shutdown(wait=False)

# Process pool for CPU-bound work
process_pool = ProcessPoolExecutor(max_workers=4)
result = await loop.run_in_executor(process_pool, cpu_intensive, 10_000_000)
print(f"CPU result: {result}")
process_pool.shutdown(wait=False)

asyncio.run(main())

asyncio.to_thread (Python 3.9+)

A simpler API for the common case of running in a thread:

import asyncio

def sync_read_file(path: str) -> str:
with open(path) as f:
return f.read()

async def main():
# Equivalent to loop.run_in_executor(None, sync_read_file, path)
content = await asyncio.to_thread(sync_read_file, "/etc/hostname")
print(content)

asyncio.run(main())

Setting the Default Executor

from concurrent.futures import ThreadPoolExecutor

async def main():
loop = asyncio.get_event_loop()

# Set a custom default executor
executor = ThreadPoolExecutor(
max_workers=20,
thread_name_prefix="async-worker"
)
loop.set_default_executor(executor)

# Now run_in_executor(None, ...) uses our custom pool
result = await loop.run_in_executor(None, blocking_io, "url")
print(result)

asyncio.run(main())
tip

The default executor has min(32, os.cpu_count() + 4) workers. For I/O-heavy applications, you may want more. For CPU-heavy work, use ProcessPoolExecutor to bypass the GIL. Always size your pools based on measured workload.

Part 4 -- Custom Event Loop Policies

An event loop policy determines which event loop class is created. The default policy creates SelectorEventLoop on Unix and ProactorEventLoop on Windows.

Writing a Custom Policy

import asyncio

class DebugEventLoop(asyncio.SelectorEventLoop):
"""An event loop that logs all callback scheduling."""

def call_soon(self, callback, *args, context=None):
print(f" [call_soon] {callback.__qualname__}({args})")
return super().call_soon(callback, *args, context=context)

def call_later(self, delay, callback, *args, context=None):
print(f" [call_later] {delay}s -> {callback.__qualname__}({args})")
return super().call_later(delay, callback, *args, context=context)

def _run_once(self):
print("--- loop iteration ---")
super()._run_once()


class DebugEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
def new_event_loop(self):
return DebugEventLoop()


# Use the custom policy
asyncio.set_event_loop_policy(DebugEventLoopPolicy())

async def main():
await asyncio.sleep(0.1)
print("Done")

asyncio.run(main())
# Shows detailed loop iteration logs

# Reset to default policy
asyncio.set_event_loop_policy(None)

Testing with Custom Loops

Custom policies are useful for testing:

class DeterministicEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
"""Event loop that provides deterministic time for testing."""

def new_event_loop(self):
loop = super().new_event_loop()
loop._time_offset = 0.0
original_time = loop.time

def controlled_time():
return original_time() + loop._time_offset

loop.time = controlled_time
return loop

Part 5 -- uvloop: Production Performance

uvloop is a drop-in replacement for the default asyncio event loop, built on top of libuv (the same library that powers Node.js). It is typically 2-4x faster than the default loop for I/O-heavy workloads.

Installation and Usage

pip install uvloop
import asyncio
import uvloop

# Option 1: Set as the default policy (affects all loops)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Option 2: Use uvloop.run() (Python 3.12+ / uvloop 0.18+)
# uvloop.run(main())

async def main():
# Your existing async code works unchanged
await asyncio.sleep(0.1)
print("Running on uvloop")

asyncio.run(main())

Performance Comparison

import asyncio
import time

async def benchmark_loop(num_tasks=10000):
"""Benchmark: create and await many tasks."""
async def noop():
await asyncio.sleep(0)

start = time.perf_counter()
tasks = [asyncio.create_task(noop()) for _ in range(num_tasks)]
await asyncio.gather(*tasks)
elapsed = time.perf_counter() - start
return elapsed

async def main():
elapsed = await benchmark_loop(10000)
loop = asyncio.get_event_loop()
print(f"Loop type: {type(loop).__name__}")
print(f"10,000 tasks: {elapsed:.3f}s")

# Run with default loop
asyncio.run(main())

# Run with uvloop
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
asyncio.run(main())
asyncio.set_event_loop_policy(None)

# Typical results:
# SelectorEventLoop: 10,000 tasks: 0.180s
# uvloop: 10,000 tasks: 0.065s

When uvloop Helps Most

Workloaduvloop Benefit
Many small I/O operations (HTTP requests, Redis commands)High (2-4x)
Few large I/O operations (file streaming)Low
CPU-bound (computation in Python)None
High connection count (10k+ concurrent)High
Timer-heavy (many call_later/sleep)Moderate

Limitations

  • No Windows support (libuv supports Windows, but uvloop does not compile there)
  • No asyncio.subprocess support on some platforms
  • Cannot be used with custom _run_once debugging hooks (it replaces the entire loop)
  • Some introspection APIs (loop._selector, etc.) are not available
note

For production FastAPI and Starlette applications, uvloop is the standard recommendation. Uvicorn uses it by default when available: uvicorn main:app --loop uvloop.

Part 6 -- Signal Handling

Async applications need to handle OS signals for graceful shutdown, configuration reload, and other operations.

Basic Signal Handling

import asyncio
import signal

async def main():
loop = asyncio.get_event_loop()
stop_event = asyncio.Event()

def handle_sigterm():
print("Received SIGTERM -- initiating shutdown")
stop_event.set()

def handle_sighup():
print("Received SIGHUP -- reloading configuration")
# Trigger config reload

# Register signal handlers
loop.add_signal_handler(signal.SIGTERM, handle_sigterm)
loop.add_signal_handler(signal.SIGINT, handle_sigterm) # Ctrl+C

# On Unix, you can also handle SIGHUP
if hasattr(signal, 'SIGHUP'):
loop.add_signal_handler(signal.SIGHUP, handle_sighup)

print("Server running. Press Ctrl+C to stop.")
await stop_event.wait()
print("Shutting down gracefully...")

asyncio.run(main())

Signal Handling with Cleanup

import asyncio
import signal

class AsyncServer:
def __init__(self):
self._shutdown_event = asyncio.Event()
self._tasks: set = set()

async def start(self):
loop = asyncio.get_event_loop()

# Register signal handlers
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, self._signal_handler, sig)

# Start background workers
for i in range(3):
task = asyncio.create_task(self._worker(i))
self._tasks.add(task)
task.add_done_callback(self._tasks.discard)

print("Server started with 3 workers")
await self._shutdown_event.wait()
await self._shutdown()

def _signal_handler(self, sig):
print(f"\nReceived {signal.Signals(sig).name}")
self._shutdown_event.set()

async def _shutdown(self):
print("Cancelling workers...")
for task in self._tasks:
task.cancel()

# Wait for all tasks to handle cancellation
results = await asyncio.gather(*self._tasks, return_exceptions=True)

for result in results:
if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
print(f"Worker error during shutdown: {result}")

print("Shutdown complete")

async def _worker(self, worker_id):
try:
while True:
print(f"Worker {worker_id}: processing")
await asyncio.sleep(2)
except asyncio.CancelledError:
print(f"Worker {worker_id}: cleaning up")
await asyncio.sleep(0.1) # Simulate cleanup
print(f"Worker {worker_id}: done")
raise

asyncio.run(AsyncServer().start())
danger

Signal handlers registered with loop.add_signal_handler run in the event loop thread. They cannot be coroutines -- they must be regular functions. To trigger async cleanup from a signal handler, set an asyncio.Event and have a coroutine await it.

Part 7 -- Practical Patterns

Pattern 1: Periodic Task

async def periodic(interval: float, func, *args):
"""Run a function periodically with drift correction."""
loop = asyncio.get_event_loop()
next_call = loop.time() + interval

while True:
await func(*args)
now = loop.time()
# Calculate sleep time to maintain consistent interval
sleep_time = max(0, next_call - now)
await asyncio.sleep(sleep_time)
next_call += interval
# If we fell behind, skip missed intervals
if next_call < now:
next_call = now + interval


async def health_check():
print(f"Health check at {asyncio.get_event_loop().time():.1f}")

async def main():
task = asyncio.create_task(periodic(1.0, health_check))
await asyncio.sleep(5.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass

asyncio.run(main())

Pattern 2: Running Blocking Libraries

import asyncio
from functools import partial

# Hypothetical blocking library
class BlockingDatabase:
def connect(self, dsn):
import time
time.sleep(0.5) # Blocking!
return self

def query(self, sql, params=None):
import time
time.sleep(0.1) # Blocking!
return [{"id": 1, "name": "example"}]

def close(self):
import time
time.sleep(0.1)


class AsyncDatabaseAdapter:
"""Wrap a blocking database library for async usage."""

def __init__(self, dsn: str, executor=None):
self._dsn = dsn
self._executor = executor
self._db = BlockingDatabase()

async def __aenter__(self):
loop = asyncio.get_event_loop()
await loop.run_in_executor(
self._executor,
self._db.connect,
self._dsn
)
return self

async def __aexit__(self, *exc):
loop = asyncio.get_event_loop()
await loop.run_in_executor(self._executor, self._db.close)
return False

async def query(self, sql: str, params=None):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self._executor,
partial(self._db.query, sql, params)
)


async def main():
async with AsyncDatabaseAdapter("postgresql://localhost/db") as db:
rows = await db.query("SELECT * FROM users WHERE active = %s", [True])
print(f"Found {len(rows)} users")

asyncio.run(main())

Pattern 3: Event Loop Time for Instrumentation

import asyncio

class LoopMonitor:
"""Monitor event loop health by tracking iteration latency."""

def __init__(self, threshold_ms: float = 100.0):
self.threshold_ms = threshold_ms
self._last_check = 0.0
self._running = False

async def start(self):
self._running = True
loop = asyncio.get_event_loop()
self._last_check = loop.time()

while self._running:
await asyncio.sleep(0.1) # Check every 100ms
now = loop.time()
elapsed_ms = (now - self._last_check) * 1000

if elapsed_ms > self.threshold_ms + 50: # 50ms grace
print(
f"WARNING: Event loop blocked for {elapsed_ms:.0f}ms "
f"(threshold: {self.threshold_ms:.0f}ms)"
)

self._last_check = now

def stop(self):
self._running = False


async def main():
monitor = LoopMonitor(threshold_ms=200)
task = asyncio.create_task(monitor.start())

# Simulate a blocking call that stalls the event loop
import time
await asyncio.sleep(0.5)
time.sleep(0.5) # BLOCKS the event loop for 500ms!
await asyncio.sleep(0.5)

monitor.stop()
await task

asyncio.run(main())
# Output: WARNING: Event loop blocked for ~600ms (threshold: 200ms)

Pattern 4: Custom Protocol with Low-Level APIs

import asyncio

class EchoProtocol(asyncio.Protocol):
"""Low-level protocol for an echo server."""

def connection_made(self, transport):
self.transport = transport
peer = transport.get_extra_info('peername')
print(f"Connection from {peer}")

def data_received(self, data):
message = data.decode()
print(f"Received: {message!r}")
self.transport.write(data) # Echo back

def connection_lost(self, exc):
print("Connection closed")


async def main():
loop = asyncio.get_event_loop()
server = await loop.create_server(
EchoProtocol,
'127.0.0.1',
8888
)

print("Echo server running on 127.0.0.1:8888")
async with server:
await server.serve_forever()

# asyncio.run(main()) # Uncomment to run

Key Takeaways

  • The event loop is a single-threaded scheduler that cycles through callback execution, I/O polling (via selectors), and timer processing.
  • call_soon callbacks run on the next loop iteration (FIFO order). call_later and call_at use a timer heap for time-based scheduling. All return cancellable handles.
  • call_soon_threadsafe is the ONLY safe way to schedule from another thread. call_soon from a non-event-loop thread causes undefined behavior.
  • run_in_executor and asyncio.to_thread bridge synchronous blocking code into the async world. Use ThreadPoolExecutor for I/O-bound work, ProcessPoolExecutor for CPU-bound work.
  • Custom event loop policies let you replace the event loop implementation for debugging, testing, or performance.
  • uvloop provides 2-4x performance improvement for I/O-heavy workloads and is the standard choice for production Python web services.
  • Signal handlers run synchronously in the event loop thread. Use asyncio.Event to bridge from signal handlers to async cleanup coroutines.
  • await asyncio.sleep(0) is not a no-op -- it yields to the event loop, allowing pending callbacks and I/O to be processed.

Graded Practice Challenges

Level 1 -- Predict the Output

Question 1: What does this print?

import asyncio

async def main():
loop = asyncio.get_event_loop()
output = []

loop.call_later(0.2, output.append, "B")
loop.call_later(0.1, output.append, "A")
loop.call_later(0.3, output.append, "C")

await asyncio.sleep(0.5)
print(output)

asyncio.run(main())
Answer
['A', 'B', 'C']

call_later schedules based on delay. They execute in time order: 0.1s (A), 0.2s (B), 0.3s (C). Registration order does not matter.

Question 2: What does this print?

import asyncio

async def main():
loop = asyncio.get_event_loop()
output = []

handle = loop.call_later(0.1, output.append, "cancelled")
handle.cancel()

loop.call_soon(output.append, "soon-1")
loop.call_soon(output.append, "soon-2")

await asyncio.sleep(0)
output.append("after-sleep")
await asyncio.sleep(0.2)

print(output)

asyncio.run(main())
Answer
['soon-1', 'soon-2', 'after-sleep']

The call_later callback is cancelled, so "cancelled" never appears. The two call_soon callbacks run during the first sleep(0). Then "after-sleep" is appended. The second sleep(0.2) gives time for the (now-cancelled) timer to expire, but it is already cancelled.

Question 3: What is the output order?

import asyncio

async def task(name, delay):
await asyncio.sleep(delay)
print(name)

async def main():
loop = asyncio.get_event_loop()

asyncio.create_task(task("task-1", 0.2))
asyncio.create_task(task("task-2", 0.1))
loop.call_later(0.15, print, "callback")

await asyncio.sleep(0.3)

asyncio.run(main())
Answer
task-2
callback
task-1

task-2 wakes at 0.1s, the callback fires at 0.15s, and task-1 wakes at 0.2s. Tasks and callbacks are interleaved based on their scheduled time.

Level 2 -- Debug Challenge

This code tries to run a blocking function without freezing the event loop, but it still blocks. Find and fix the issue.

import asyncio
import time

def heavy_computation(n):
"""Intentionally CPU-heavy."""
total = 0
for i in range(n):
total += i * i
return total

async def monitor():
while True:
print(f"Monitor alive at {time.monotonic():.1f}")
await asyncio.sleep(0.5)

async def main():
task = asyncio.create_task(monitor())

# This should not block the event loop
result = await asyncio.create_task(
asyncio.coroutine(heavy_computation)(10_000_000)
)
print(f"Result: {result}")

task.cancel()

asyncio.run(main())
Answer

The code tries to make heavy_computation a coroutine, but asyncio.coroutine (deprecated in 3.8, removed in 3.11) just wraps a generator-based coroutine -- it does not run the function in a thread. The CPU-bound work still blocks the event loop.

Fix: use asyncio.to_thread or loop.run_in_executor:

async def main():
task = asyncio.create_task(monitor())

# Run blocking code in a thread
result = await asyncio.to_thread(heavy_computation, 10_000_000)

# Or for CPU-bound work, use a ProcessPoolExecutor:
# from concurrent.futures import ProcessPoolExecutor
# loop = asyncio.get_event_loop()
# with ProcessPoolExecutor() as pool:
# result = await loop.run_in_executor(pool, heavy_computation, 10_000_000)

print(f"Result: {result}")
task.cancel()
try:
await task
except asyncio.CancelledError:
pass

Note: to_thread uses a ThreadPoolExecutor, which is fine for I/O but does not bypass the GIL for CPU work. For true parallelism, use ProcessPoolExecutor.

Level 3 -- Design Challenge

Design an AsyncScheduler class that:

  1. Supports scheduling recurring tasks at fixed intervals (like cron)
  2. Supports one-time delayed tasks
  3. Handles task failures without crashing the scheduler
  4. Supports cancelling individual scheduled tasks by name
  5. Provides a status() method showing all scheduled tasks and their next run time
  6. Shuts down gracefully when used as an async context manager
Design Hints
import asyncio
from dataclasses import dataclass, field
from typing import Callable, Coroutine

@dataclass
class ScheduledTask:
name: str
func: Callable[[], Coroutine]
interval: float | None # None = one-shot
next_run: float
task: asyncio.Task | None = None
cancelled: bool = False

class AsyncScheduler:
def __init__(self):
self._tasks: dict[str, ScheduledTask] = {}
self._running = False

def schedule_recurring(self, name, func, interval, delay=0):
loop = asyncio.get_event_loop()
self._tasks[name] = ScheduledTask(
name=name, func=func, interval=interval,
next_run=loop.time() + delay
)

def schedule_once(self, name, func, delay):
loop = asyncio.get_event_loop()
self._tasks[name] = ScheduledTask(
name=name, func=func, interval=None,
next_run=loop.time() + delay
)

def cancel(self, name):
if name in self._tasks:
self._tasks[name].cancelled = True
if self._tasks[name].task:
self._tasks[name].task.cancel()

def status(self):
return {
name: {
"next_run": st.next_run,
"interval": st.interval,
"cancelled": st.cancelled
}
for name, st in self._tasks.items()
}

async def __aenter__(self):
self._running = True
self._runner = asyncio.create_task(self._run_loop())
return self

async def __aexit__(self, *exc):
self._running = False
self._runner.cancel()
try:
await self._runner
except asyncio.CancelledError:
pass

async def _run_loop(self):
while self._running:
loop = asyncio.get_event_loop()
now = loop.time()
for st in self._tasks.values():
if st.cancelled or (st.task and not st.task.done()):
continue
if now >= st.next_run:
st.task = asyncio.create_task(self._safe_run(st))
await asyncio.sleep(0.01)

async def _safe_run(self, st):
try:
await st.func()
except Exception as e:
print(f"Task {st.name} failed: {e}")
finally:
if st.interval and not st.cancelled:
st.next_run = asyncio.get_event_loop().time() + st.interval
elif not st.interval:
st.cancelled = True

What's Next

Now that you understand the event loop's scheduling machinery, the next lesson, Async Synchronization Patterns, covers the concurrency primitives built on top of it -- locks, semaphores, events, conditions, and barriers -- and how to use them for rate limiting, bounded concurrency, and circuit breakers.

© 2026 EngineersOfAI. All rights reserved.