Advanced Event Loop
Before reading any explanation, predict the output of this program:
import asyncio
async def main():
    """Show when ``call_soon`` callbacks run relative to ``await asyncio.sleep(0)``.

    Two callbacks are queued with ``call_soon`` before the coroutine appends
    "A" synchronously; the collected order is printed at the end.
    """
    # Inside a coroutine the running loop is the one we want.
    loop = asyncio.get_running_loop()
    results = []
    loop.call_soon(results.append, "C")
    loop.call_soon(results.append, "D")
    results.append("A")
    await asyncio.sleep(0)  # Suspend: the loop runs the queued callbacks
    results.append("B")
    await asyncio.sleep(0)
    print(" ".join(results))
asyncio.run(main())
Think about when call_soon callbacks execute relative to await asyncio.sleep(0).
# Output:
# A C D B
results.append("A") runs immediately (synchronous code). Then await asyncio.sleep(0) suspends the coroutine. The event loop processes the call_soon callbacks (C, then D) before resuming the coroutine. After resuming, results.append("B") runs. The second sleep(0) has no pending callbacks, so B is the last item.
This ordering reveals a key insight: await asyncio.sleep(0) is not a no-op. It yields control to the event loop, which processes all pending callbacks before returning. Understanding the event loop's internal scheduling is essential for writing correct and performant async code.
What You Will Learn
- Event loop architecture: the selector, callback queue, and timer heap
- call_soon, call_later, call_at for scheduling callbacks
- How run_in_executor bridges sync and async worlds
- Custom event loop policies for testing and specialization
- uvloop as a drop-in performance upgrade
- Signal handling in async applications
- Real-world patterns: graceful shutdown, health monitoring, periodic tasks
Prerequisites
- Solid understanding of async/await, Tasks, and Futures from previous lessons
- Familiarity with the __await__ protocol from Lesson 4
- Basic understanding of OS-level I/O multiplexing (select/poll/epoll) is helpful but not required
- Experience with asyncio.run(), create_task(), gather()
Part 1 -- Event Loop Architecture
The event loop is a single-threaded scheduler that multiplexes I/O operations and callbacks. Every asyncio program runs on one event loop (per thread). Understanding its internal phases explains why async code behaves the way it does.
The Event Loop Cycle
Each iteration of the event loop performs these steps:
- Ready callbacks: Execute all callbacks that are ready to run (from call_soon).
- I/O polling: Call the selector (epoll/kqueue/select) to check for I/O events. The timeout for the poll is the time until the nearest scheduled callback.
- I/O callbacks: Execute callbacks for file descriptors that are ready.
- Scheduled callbacks: Execute call_later/call_at callbacks whose deadline has passed.
The Selector
The event loop uses the operating system's I/O multiplexing API:
| Platform | Default Selector | Description |
|---|---|---|
| Linux | epoll | O(1) readiness notification |
| macOS | kqueue | O(1) readiness notification |
| Windows | IOCP (via ProactorEventLoop) | Completion-based I/O |
| Fallback | select | O(n) polling, limited to 1024 fds |
import asyncio
import selectors
# Check which selector is being used
# A fresh loop is created only to be inspected; it is never run.
loop = asyncio.new_event_loop()
# NOTE: _selector has a leading underscore, i.e. it is a private attribute
# rather than public API, so treat this as a debugging aid only.
print(type(loop._selector))
# Linux: <class 'selectors.EpollSelector'>
# macOS: <class 'selectors.KqueueSelector'>
# Close the loop once we are done looking at it.
loop.close()
Callback Queue Internals
import asyncio
async def demonstrate_callback_ordering():
    """Queue three ``call_soon`` callbacks and print the order they ran in."""
    loop = asyncio.get_running_loop()
    order = []

    # call_soon callbacks are FIFO within the same cycle
    loop.call_soon(order.append, 1)
    loop.call_soon(order.append, 2)
    loop.call_soon(order.append, 3)

    # This await gives the loop a chance to run callbacks
    await asyncio.sleep(0)
    print(f"call_soon order: {order}")  # [1, 2, 3] -- always FIFO
asyncio.run(demonstrate_callback_ordering())
Part 2 -- Scheduling Callbacks
call_soon: Run on Next Loop Iteration
call_soon(callback, *args) schedules a callback to run on the next iteration of the event loop, after the current coroutine suspends:
import asyncio
async def main():
    """Print around a ``sleep(0)`` to show when ``call_soon`` callbacks fire."""
    loop = asyncio.get_running_loop()

    def sync_callback(name):
        """Plain (non-async) callback invoked by the event loop."""
        print(f"Callback: {name}")

    loop.call_soon(sync_callback, "first")
    loop.call_soon(sync_callback, "second")

    print("Before sleep")
    await asyncio.sleep(0)  # Yield to event loop
    print("After sleep")
asyncio.run(main())
# Output:
# Before sleep
# Callback: first
# Callback: second
# After sleep
call_soon_threadsafe: Cross-Thread Scheduling
When you need to schedule a callback from a different thread:
import asyncio
import threading
async def main():
    """Append to a loop-owned list from a worker thread, then print the list.

    The worker thread hands its work to the event loop with
    ``call_soon_threadsafe``. Completion is signalled through a future that
    is resolved the same way, so the coroutine waits for the thread's
    callbacks themselves instead of guessing how long the thread will take.
    """
    loop = asyncio.get_running_loop()
    results = []
    thread_done = loop.create_future()

    def background_work():
        """Runs in a separate thread."""
        import time

        time.sleep(0.1)
        # Schedule callbacks on the event loop from this thread. They run in
        # FIFO order, so the append happens before the future is resolved.
        loop.call_soon_threadsafe(results.append, "from thread")
        loop.call_soon_threadsafe(thread_done.set_result, None)

    thread = threading.Thread(target=background_work)
    thread.start()

    await thread_done  # Resumes once the thread's callbacks have run
    thread.join()  # The thread has nothing left to do at this point
    print(results)  # ['from thread']
asyncio.run(main())
call_soon is NOT thread-safe. If you call it from a non-event-loop thread, you get undefined behavior (data corruption, missed callbacks, crashes). Always use call_soon_threadsafe from other threads.
call_later and call_at: Timed Callbacks
import asyncio
async def main():
    """Show that timed callbacks fire in deadline order, not registration order.

    Registers ``call_later`` (relative delay) and then ``call_at`` (absolute
    ``loop.time()`` deadline) callbacks out of order and prints the order in
    which each group actually ran.
    """
    loop = asyncio.get_running_loop()
    results = []

    # call_later: relative time (seconds from now)
    loop.call_later(0.3, results.append, "300ms")
    loop.call_later(0.1, results.append, "100ms")
    loop.call_later(0.2, results.append, "200ms")

    await asyncio.sleep(0.5)
    print(f"call_later order: {results}")
    # call_later order: ['100ms', '200ms', '300ms']

    results.clear()

    # call_at: absolute time (loop.time() based)
    now = loop.time()
    loop.call_at(now + 0.2, results.append, "at+200ms")
    loop.call_at(now + 0.1, results.append, "at+100ms")

    await asyncio.sleep(0.5)
    print(f"call_at order: {results}")
    # call_at order: ['at+100ms', 'at+200ms']
asyncio.run(main())
Cancelling Scheduled Callbacks
call_later and call_at return a TimerHandle that can be cancelled:
async def main():
    """Cancel a ``call_later`` handle and show that its callback never runs."""
    loop = asyncio.get_running_loop()

    handle = loop.call_later(1.0, print, "This will not print")
    print(f"Handle cancelled: {handle.cancelled()}")  # False

    handle.cancel()
    print(f"Handle cancelled: {handle.cancelled()}")  # True

    # Sleep past the original deadline to prove the callback stays silent
    await asyncio.sleep(1.5)
    # Nothing prints -- the callback was cancelled
asyncio.run(main())
Part 3 -- Running Sync Code in Async Context
run_in_executor: Thread and Process Pools
The event loop's run_in_executor runs a synchronous function in a thread or process pool and returns an awaitable Future:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def blocking_io(url: str) -> str:
    """Simulate a blocking I/O operation."""
    time.sleep(1)
    return f"Response from {url}"


def cpu_intensive(n: int) -> int:
    """CPU-bound computation: sum of squares of ``0 .. n-1``."""
    return sum(i * i for i in range(n))


async def main():
    """Run blocking and CPU-bound functions through three kinds of executor."""
    loop = asyncio.get_running_loop()

    # Default executor (ThreadPoolExecutor)
    result = await loop.run_in_executor(None, blocking_io, "https://api.example.com")
    print(result)

    # Custom thread pool
    thread_pool = ThreadPoolExecutor(max_workers=4)
    try:
        results = await asyncio.gather(
            loop.run_in_executor(thread_pool, blocking_io, "url1"),
            loop.run_in_executor(thread_pool, blocking_io, "url2"),
            loop.run_in_executor(thread_pool, blocking_io, "url3"),
        )
        print(f"Parallel results: {len(results)}")
    finally:
        # Released even if one of the calls raises; wait=False keeps the
        # event loop from blocking on worker teardown.
        thread_pool.shutdown(wait=False)

    # Process pool for CPU-bound work
    process_pool = ProcessPoolExecutor(max_workers=4)
    try:
        result = await loop.run_in_executor(process_pool, cpu_intensive, 10_000_000)
        print(f"CPU result: {result}")
    finally:
        process_pool.shutdown(wait=False)


# The guard matters for ProcessPoolExecutor: with the "spawn" start method the
# worker processes re-import this module and must not re-run main().
if __name__ == "__main__":
    asyncio.run(main())
asyncio.to_thread (Python 3.9+)
A simpler API for the common case of running in a thread:
import asyncio
def sync_read_file(path: str) -> str:
    """Read and return the entire text file at *path* (blocking call)."""
    with open(path) as f:
        return f.read()


async def main():
    """Read a file without blocking the event loop by using a worker thread."""
    # Equivalent to loop.run_in_executor(None, sync_read_file, path)
    content = await asyncio.to_thread(sync_read_file, "/etc/hostname")
    print(content)
asyncio.run(main())
Setting the Default Executor
from concurrent.futures import ThreadPoolExecutor
async def main():
    """Install a custom default executor and run a blocking call through it."""
    loop = asyncio.get_running_loop()

    # Set a custom default executor
    executor = ThreadPoolExecutor(
        max_workers=20,
        thread_name_prefix="async-worker",
    )
    loop.set_default_executor(executor)

    # Now run_in_executor(None, ...) uses our custom pool
    result = await loop.run_in_executor(None, blocking_io, "url")
    print(result)
asyncio.run(main())
The default executor has min(32, os.cpu_count() + 4) workers. For I/O-heavy applications, you may want more. For CPU-heavy work, use ProcessPoolExecutor to bypass the GIL. Always size your pools based on measured workload.
Part 4 -- Custom Event Loop Policies
An event loop policy determines which event loop class is created. The default policy creates SelectorEventLoop on Unix and ProactorEventLoop on Windows.
Writing a Custom Policy
import asyncio
class DebugEventLoop(asyncio.SelectorEventLoop):
    """An event loop that logs all callback scheduling."""

    @staticmethod
    def _describe(callback):
        """Return a printable name for *callback*.

        Not every callable exposes ``__qualname__`` (``functools.partial``
        objects do not), so fall back to ``repr`` rather than raising from
        inside the scheduling path.
        """
        return getattr(callback, "__qualname__", None) or repr(callback)

    def call_soon(self, callback, *args, context=None):
        """Log the callback, then schedule it exactly as the base loop would."""
        print(f" [call_soon] {self._describe(callback)}({args})")
        return super().call_soon(callback, *args, context=context)

    def call_later(self, delay, callback, *args, context=None):
        """Log the delayed callback, then schedule it as the base loop would."""
        print(f" [call_later] {delay}s -> {self._describe(callback)}({args})")
        return super().call_later(delay, callback, *args, context=context)

    def _run_once(self):
        """Log a marker before delegating to the base loop's ``_run_once``."""
        print("--- loop iteration ---")
        super()._run_once()


class DebugEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    """Policy whose new loops are ``DebugEventLoop`` instances."""

    def new_event_loop(self):
        return DebugEventLoop()
# Use the custom policy
asyncio.set_event_loop_policy(DebugEventLoopPolicy())
async def main():
await asyncio.sleep(0.1)
print("Done")
asyncio.run(main())
# Shows detailed loop iteration logs
# Reset to default policy
asyncio.set_event_loop_policy(None)
Testing with Custom Loops
Custom policies are useful for testing:
class DeterministicEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    """Event loop that provides deterministic time for testing.

    Each loop it creates gets a ``_time_offset`` attribute (initially 0.0)
    that is added to every value returned by ``loop.time()``, so a test can
    move the loop's clock forward by changing the offset.
    """

    def new_event_loop(self):
        loop = super().new_event_loop()
        loop._time_offset = 0.0
        original_time = loop.time  # keep the real clock to build on

        def controlled_time():
            """Real loop time shifted by the current offset."""
            return original_time() + loop._time_offset

        loop.time = controlled_time
        return loop
Part 5 -- uvloop: Production Performance
uvloop is a drop-in replacement for the default asyncio event loop, built on top of libuv (the same library that powers Node.js). It is typically 2-4x faster than the default loop for I/O-heavy workloads.
Installation and Usage
pip install uvloop
import asyncio
import uvloop
# Option 1: Set as the default policy (affects all loops)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Option 2: Use uvloop.run() (Python 3.12+ / uvloop 0.18+)
# uvloop.run(main())
async def main():
    """Minimal coroutine used to confirm the program runs on the chosen loop."""
    # Your existing async code works unchanged
    await asyncio.sleep(0.1)
    print("Running on uvloop")
asyncio.run(main())
Performance Comparison
import asyncio
import time
async def benchmark_loop(num_tasks=10000):
    """Benchmark: create and await many tasks.

    Returns the elapsed wall-clock time in seconds (``time.perf_counter``).
    """
    async def noop():
        await asyncio.sleep(0)

    start = time.perf_counter()
    tasks = [asyncio.create_task(noop()) for _ in range(num_tasks)]
    await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    return elapsed


async def main():
    """Run the benchmark and report the loop class it ran on."""
    elapsed = await benchmark_loop(10000)
    loop = asyncio.get_running_loop()
    print(f"Loop type: {type(loop).__name__}")
    print(f"10,000 tasks: {elapsed:.3f}s")
# Run with default loop
asyncio.run(main())
# Run with uvloop
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
asyncio.run(main())
asyncio.set_event_loop_policy(None)
# Typical results:
# SelectorEventLoop: 10,000 tasks: 0.180s
# uvloop: 10,000 tasks: 0.065s
When uvloop Helps Most
| Workload | uvloop Benefit |
|---|---|
| Many small I/O operations (HTTP requests, Redis commands) | High (2-4x) |
| Few large I/O operations (file streaming) | Low |
| CPU-bound (computation in Python) | None |
| High connection count (10k+ concurrent) | High |
| Timer-heavy (many call_later/sleep) | Moderate |
Limitations
- No Windows support (libuv supports Windows, but uvloop does not compile there)
- No asyncio.subprocess support on some platforms
- Cannot be used with custom _run_once debugging hooks (it replaces the entire loop)
- Some introspection APIs (loop._selector, etc.) are not available
For production FastAPI and Starlette applications, uvloop is the standard recommendation. Uvicorn uses it by default when available: uvicorn main:app --loop uvloop.
Part 6 -- Signal Handling
Async applications need to handle OS signals for graceful shutdown, configuration reload, and other operations.
Basic Signal Handling
import asyncio
import signal
async def main():
    """Block until SIGTERM or SIGINT arrives, then return.

    The signal handlers are plain functions; the shutdown handler only sets
    an ``asyncio.Event`` that this coroutine is waiting on.
    """
    loop = asyncio.get_running_loop()
    stop_event = asyncio.Event()

    def handle_sigterm():
        print("Received SIGTERM -- initiating shutdown")
        stop_event.set()

    def handle_sighup():
        print("Received SIGHUP -- reloading configuration")
        # Trigger config reload

    # Register signal handlers
    loop.add_signal_handler(signal.SIGTERM, handle_sigterm)
    loop.add_signal_handler(signal.SIGINT, handle_sigterm)  # Ctrl+C

    # On Unix, you can also handle SIGHUP
    if hasattr(signal, 'SIGHUP'):
        loop.add_signal_handler(signal.SIGHUP, handle_sighup)

    print("Server running. Press Ctrl+C to stop.")
    await stop_event.wait()
    print("Shutting down gracefully...")
asyncio.run(main())
Signal Handling with Cleanup
import asyncio
import signal
class AsyncServer:
    """Toy server: three background workers plus signal-driven shutdown."""

    def __init__(self):
        self._shutdown_event = asyncio.Event()
        self._tasks: set[asyncio.Task] = set()

    async def start(self):
        """Register signal handlers, start workers, and run until shutdown."""
        loop = asyncio.get_running_loop()

        # Register signal handlers
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._signal_handler, sig)

        # Start background workers
        for i in range(3):
            task = asyncio.create_task(self._worker(i))
            self._tasks.add(task)
            # Finished workers remove themselves from the tracking set
            task.add_done_callback(self._tasks.discard)

        print("Server started with 3 workers")
        await self._shutdown_event.wait()
        await self._shutdown()

    def _signal_handler(self, sig):
        """Plain (non-async) handler: only flags that shutdown was requested."""
        print(f"\nReceived {signal.Signals(sig).name}")
        self._shutdown_event.set()

    async def _shutdown(self):
        """Cancel every worker and wait until each has finished its cleanup."""
        print("Cancelling workers...")
        # Work on a snapshot: the done-callbacks shrink self._tasks while we
        # are waiting for the workers below.
        pending = list(self._tasks)
        for task in pending:
            task.cancel()

        # Wait for all tasks to handle cancellation
        results = await asyncio.gather(*pending, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
                print(f"Worker error during shutdown: {result}")

        print("Shutdown complete")

    async def _worker(self, worker_id):
        """Loop forever; on cancellation run cleanup and re-raise."""
        try:
            while True:
                print(f"Worker {worker_id}: processing")
                await asyncio.sleep(2)
        except asyncio.CancelledError:
            print(f"Worker {worker_id}: cleaning up")
            await asyncio.sleep(0.1)  # Simulate cleanup
            print(f"Worker {worker_id}: done")
            raise  # Let the task end up in the cancelled state
asyncio.run(AsyncServer().start())
Signal handlers registered with loop.add_signal_handler run in the event loop thread. They cannot be coroutines -- they must be regular functions. To trigger async cleanup from a signal handler, set an asyncio.Event and have a coroutine await it.
Part 7 -- Practical Patterns
Pattern 1: Periodic Task
async def periodic(interval: float, func, *args):
    """Run a function periodically with drift correction.

    ``func(*args)`` is awaited immediately and then once per *interval*
    seconds, measured against the loop clock rather than from the end of the
    previous call. Runs until the surrounding task is cancelled.
    """
    loop = asyncio.get_running_loop()
    next_call = loop.time() + interval

    while True:
        await func(*args)
        now = loop.time()

        # Calculate sleep time to maintain consistent interval
        sleep_time = max(0, next_call - now)
        await asyncio.sleep(sleep_time)
        next_call += interval

        # If we fell behind, skip missed intervals
        if next_call < now:
            next_call = now + interval


async def health_check():
    """Example periodic job: print the current loop time."""
    print(f"Health check at {asyncio.get_running_loop().time():.1f}")


async def main():
    """Run the health check once a second for 5.5 seconds, then stop it."""
    task = asyncio.create_task(periodic(1.0, health_check))
    await asyncio.sleep(5.5)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
asyncio.run(main())
Pattern 2: Running Blocking Libraries
import asyncio
from functools import partial
# Hypothetical blocking library
class BlockingDatabase:
    """Stand-in for a synchronous driver; every method blocks with time.sleep."""

    def connect(self, dsn):
        import time
        time.sleep(0.5)  # Blocking!
        return self

    def query(self, sql, params=None):
        import time
        time.sleep(0.1)  # Blocking!
        return [{"id": 1, "name": "example"}]

    def close(self):
        import time
        time.sleep(0.1)


class AsyncDatabaseAdapter:
    """Wrap a blocking database library for async usage.

    Every blocking call is pushed to ``executor`` (``None`` selects the
    loop's default executor), so the event loop stays responsive.
    """

    def __init__(self, dsn: str, executor=None):
        self._dsn = dsn
        self._executor = executor
        self._db = BlockingDatabase()

    async def __aenter__(self):
        """Connect in the executor and return the adapter itself."""
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            self._executor,
            self._db.connect,
            self._dsn,
        )
        return self

    async def __aexit__(self, *exc):
        """Close the connection in the executor; never suppresses exceptions."""
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(self._executor, self._db.close)
        return False

    async def query(self, sql: str, params=None):
        """Run the blocking ``query`` in the executor and return its rows."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            partial(self._db.query, sql, params),
        )


async def main():
    """Open the adapter, run one query, and report the row count."""
    async with AsyncDatabaseAdapter("postgresql://localhost/db") as db:
        rows = await db.query("SELECT * FROM users WHERE active = %s", [True])
        print(f"Found {len(rows)} users")
asyncio.run(main())
Pattern 3: Event Loop Time for Instrumentation
import asyncio
class LoopMonitor:
    """Monitor event loop health by tracking iteration latency.

    The monitor sleeps for ``interval`` seconds at a time and measures how
    much later than requested it was woken up. That lag is the time the loop
    was unavailable; a warning is printed when it exceeds ``threshold_ms``.
    """

    def __init__(self, threshold_ms: float = 100.0, interval: float = 0.1):
        self.threshold_ms = threshold_ms
        self.interval = interval  # seconds between checks
        self._last_check = 0.0
        self._running = False

    async def start(self):
        """Run the monitoring loop until ``stop()`` is called."""
        self._running = True
        loop = asyncio.get_running_loop()
        self._last_check = loop.time()

        while self._running:
            await asyncio.sleep(self.interval)
            now = loop.time()
            # Subtract the sleep we asked for: only the overshoot counts as
            # time during which the loop was blocked.
            lag_ms = (now - self._last_check - self.interval) * 1000

            if lag_ms > self.threshold_ms:
                print(
                    f"WARNING: Event loop blocked for {lag_ms:.0f}ms "
                    f"(threshold: {self.threshold_ms:.0f}ms)"
                )

            self._last_check = now

    def stop(self):
        """Ask the monitoring loop to exit after its current sleep."""
        self._running = False


async def main():
    """Stall the loop on purpose so the monitor has something to report."""
    monitor = LoopMonitor(threshold_ms=200)
    task = asyncio.create_task(monitor.start())

    # Simulate a blocking call that stalls the event loop
    import time
    await asyncio.sleep(0.5)
    time.sleep(0.5)  # BLOCKS the event loop for 500ms!
    await asyncio.sleep(0.5)

    monitor.stop()
    await task


if __name__ == "__main__":
    asyncio.run(main())
# Output: WARNING: Event loop blocked for ~500ms (threshold: 200ms)
Pattern 4: Custom Protocol with Low-Level APIs
import asyncio
class EchoProtocol(asyncio.Protocol):
    """Low-level protocol for an echo server."""

    def connection_made(self, transport):
        """Remember the transport and log the peer address."""
        self.transport = transport
        peer = transport.get_extra_info('peername')
        print(f"Connection from {peer}")

    def data_received(self, data):
        """Log the received bytes and write them back unchanged."""
        # Decode only for logging, and leniently: a strict decode would raise
        # on non-UTF-8 bytes before the data could be echoed.
        message = data.decode(errors="replace")
        print(f"Received: {message!r}")
        self.transport.write(data)  # Echo back

    def connection_lost(self, exc):
        print("Connection closed")


async def main():
    """Serve the echo protocol on 127.0.0.1:8888 until cancelled."""
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        EchoProtocol,
        '127.0.0.1',
        8888,
    )

    print("Echo server running on 127.0.0.1:8888")
    async with server:
        await server.serve_forever()
# asyncio.run(main()) # Uncomment to run
Key Takeaways
- The event loop is a single-threaded scheduler that cycles through callback execution, I/O polling (via selectors), and timer processing.
- call_soon callbacks run on the next loop iteration (FIFO order). call_later and call_at use a timer heap for time-based scheduling. All return cancellable handles.
- call_soon_threadsafe is the ONLY safe way to schedule from another thread. call_soon from a non-event-loop thread causes undefined behavior.
- run_in_executor and asyncio.to_thread bridge synchronous blocking code into the async world. Use ThreadPoolExecutor for I/O-bound work, ProcessPoolExecutor for CPU-bound work.
- Custom event loop policies let you replace the event loop implementation for debugging, testing, or performance.
- uvloop provides 2-4x performance improvement for I/O-heavy workloads and is the standard choice for production Python web services.
- Signal handlers run synchronously in the event loop thread. Use asyncio.Event to bridge from signal handlers to async cleanup coroutines.
- await asyncio.sleep(0) is not a no-op -- it yields to the event loop, allowing pending callbacks and I/O to be processed.
Graded Practice Challenges
Level 1 -- Predict the Output
Question 1: What does this print?
import asyncio
async def main():
    """Quiz 1: register three timers out of order and print the firing order."""
    loop = asyncio.get_running_loop()
    output = []

    loop.call_later(0.2, output.append, "B")
    loop.call_later(0.1, output.append, "A")
    loop.call_later(0.3, output.append, "C")

    await asyncio.sleep(0.5)
    print(output)
asyncio.run(main())
Answer
['A', 'B', 'C']
call_later schedules based on delay. They execute in time order: 0.1s (A), 0.2s (B), 0.3s (C). Registration order does not matter.
Question 2: What does this print?
import asyncio
async def main():
    """Quiz 2: mix a cancelled timer with ``call_soon`` callbacks."""
    loop = asyncio.get_running_loop()
    output = []

    handle = loop.call_later(0.1, output.append, "cancelled")
    handle.cancel()

    loop.call_soon(output.append, "soon-1")
    loop.call_soon(output.append, "soon-2")

    await asyncio.sleep(0)
    output.append("after-sleep")

    await asyncio.sleep(0.2)
    print(output)
asyncio.run(main())
Answer
['soon-1', 'soon-2', 'after-sleep']
The call_later callback is cancelled, so "cancelled" never appears. The two call_soon callbacks run during the first sleep(0). Then "after-sleep" is appended. The second sleep(0.2) gives time for the (now-cancelled) timer to expire, but it is already cancelled.
Question 3: What is the output order?
import asyncio
async def task(name, delay):
    """Sleep for *delay* seconds, then print *name*."""
    await asyncio.sleep(delay)
    print(name)


async def main():
    """Quiz 3: interleave two sleeping tasks with a timed callback."""
    loop = asyncio.get_running_loop()

    # Hold references to the tasks: the event loop keeps only weak
    # references, so an unreferenced task may be garbage-collected.
    background = [
        asyncio.create_task(task("task-1", 0.2)),
        asyncio.create_task(task("task-2", 0.1)),
    ]
    loop.call_later(0.15, print, "callback")

    await asyncio.sleep(0.3)
    await asyncio.gather(*background)  # both have already finished by now
asyncio.run(main())
Answer
task-2
callback
task-1
task-2 wakes at 0.1s, the callback fires at 0.15s, and task-1 wakes at 0.2s. Tasks and callbacks are interleaved based on their scheduled time.
Level 2 -- Debug Challenge
This code tries to run a blocking function without freezing the event loop, but it still blocks. Find and fix the issue.
import asyncio
import time
def heavy_computation(n):
    """Intentionally CPU-heavy."""
    total = 0
    for i in range(n):
        total += i * i
    return total


async def monitor():
    """Print a heartbeat every half second until cancelled."""
    while True:
        print(f"Monitor alive at {time.monotonic():.1f}")
        await asyncio.sleep(0.5)


async def main():
    task = asyncio.create_task(monitor())

    # This should not block the event loop
    result = await asyncio.create_task(
        asyncio.coroutine(heavy_computation)(10_000_000)
    )
    print(f"Result: {result}")
    task.cancel()
asyncio.run(main())
Answer
The code tries to make heavy_computation a coroutine, but asyncio.coroutine (deprecated in 3.8, removed in 3.11) just wraps a generator-based coroutine -- it does not run the function in a thread. The CPU-bound work still blocks the event loop.
Fix: use asyncio.to_thread or loop.run_in_executor:
async def main():
    """Fixed version: the heavy function runs off the event loop thread."""
    task = asyncio.create_task(monitor())

    # Run blocking code in a thread
    result = await asyncio.to_thread(heavy_computation, 10_000_000)

    # Or for CPU-bound work, use a ProcessPoolExecutor:
    # from concurrent.futures import ProcessPoolExecutor
    # loop = asyncio.get_running_loop()
    # with ProcessPoolExecutor() as pool:
    #     result = await loop.run_in_executor(pool, heavy_computation, 10_000_000)

    print(f"Result: {result}")
    task.cancel()
    # Await the cancelled task so its cancellation is fully processed
    try:
        await task
    except asyncio.CancelledError:
        pass
Note: to_thread uses a ThreadPoolExecutor, which is fine for I/O but does not bypass the GIL for CPU work. For true parallelism, use ProcessPoolExecutor.
Level 3 -- Design Challenge
Design an AsyncScheduler class that:
- Supports scheduling recurring tasks at fixed intervals (like cron)
- Supports one-time delayed tasks
- Handles task failures without crashing the scheduler
- Supports cancelling individual scheduled tasks by name
- Provides a status() method showing all scheduled tasks and their next run time
- Shuts down gracefully when used as an async context manager
Design Hints
import asyncio
from dataclasses import dataclass, field
from typing import Callable, Coroutine
@dataclass
class ScheduledTask:
name: str
func: Callable[[], Coroutine]
interval: float | None # None = one-shot
next_run: float
task: asyncio.Task | None = None
cancelled: bool = False
class AsyncScheduler:
def __init__(self):
self._tasks: dict[str, ScheduledTask] = {}
self._running = False
def schedule_recurring(self, name, func, interval, delay=0):
loop = asyncio.get_event_loop()
self._tasks[name] = ScheduledTask(
name=name, func=func, interval=interval,
next_run=loop.time() + delay
)
def schedule_once(self, name, func, delay):
loop = asyncio.get_event_loop()
self._tasks[name] = ScheduledTask(
name=name, func=func, interval=None,
next_run=loop.time() + delay
)
def cancel(self, name):
if name in self._tasks:
self._tasks[name].cancelled = True
if self._tasks[name].task:
self._tasks[name].task.cancel()
def status(self):
return {
name: {
"next_run": st.next_run,
"interval": st.interval,
"cancelled": st.cancelled
}
for name, st in self._tasks.items()
}
async def __aenter__(self):
self._running = True
self._runner = asyncio.create_task(self._run_loop())
return self
async def __aexit__(self, *exc):
self._running = False
self._runner.cancel()
try:
await self._runner
except asyncio.CancelledError:
pass
async def _run_loop(self):
while self._running:
loop = asyncio.get_event_loop()
now = loop.time()
for st in self._tasks.values():
if st.cancelled or (st.task and not st.task.done()):
continue
if now >= st.next_run:
st.task = asyncio.create_task(self._safe_run(st))
await asyncio.sleep(0.01)
async def _safe_run(self, st):
try:
await st.func()
except Exception as e:
print(f"Task {st.name} failed: {e}")
finally:
if st.interval and not st.cancelled:
st.next_run = asyncio.get_event_loop().time() + st.interval
elif not st.interval:
st.cancelled = True
What's Next
Now that you understand the event loop's scheduling machinery, the next lesson, Async Synchronization Patterns, covers the concurrency primitives built on top of it -- locks, semaphores, events, conditions, and barriers -- and how to use them for rate limiting, bounded concurrency, and circuit breakers.
