Custom Awaitables
Before reading any explanation, predict what this program prints:
import asyncio

class Delayed:
    def __init__(self, value, seconds):
        self.value = value
        self.seconds = seconds

    def __await__(self):
        yield from asyncio.sleep(self.seconds).__await__()
        return self.value

async def main():
    result = await Delayed(42, 0.1)
    print(result)
    print(type(Delayed(42, 0.1)))

asyncio.run(main())
Two questions: (1) Does this work? (2) What type is Delayed(42, 0.1)?
# Output:
# 42
# <class '__main__.Delayed'>
Yes, it works. Delayed is not a coroutine, not a Future, and not an async generator. It is a plain class that implements __await__, making it awaitable. The await expression recognizes any object with __await__ and delegates to the iterator it returns. Understanding this protocol is the key to building custom async primitives.
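You can confirm that nothing beyond __await__ is involved. The collections.abc.Awaitable ABC recognizes the class structurally, and asyncio.gather() accepts its instances directly. The sketch below reuses the puzzle's Delayed class. Its timing check assumes only that three concurrent 0.2-second sleeps finish well under the 0.6 seconds a sequential run would need.

```python
import asyncio
import time
from collections.abc import Awaitable

class Delayed:
    """The puzzle's class: awaitable only because it defines __await__."""
    def __init__(self, value, seconds):
        self.value = value
        self.seconds = seconds

    def __await__(self):
        yield from asyncio.sleep(self.seconds).__await__()
        return self.value

# No base class or registration: the ABC just checks for __await__
print(isinstance(Delayed(1, 0), Awaitable))  # True

async def main():
    start = time.perf_counter()
    # gather() accepts any awaitable and wraps non-coroutines for us
    values = await asyncio.gather(
        Delayed(1, 0.2), Delayed(2, 0.2), Delayed(3, 0.2)
    )
    return values, time.perf_counter() - start

values, elapsed = asyncio.run(main())
print(values)  # [1, 2, 3]
```

The three sleeps overlap, so the elapsed time stays close to 0.2 seconds rather than adding up.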
What You Will Learn
- The __await__ protocol and what await actually does at the protocol level
- How coroutines, Futures, and Tasks relate to the __await__ protocol
- Building custom awaitable objects for lazy initialization, caching, and deferred computation
- The generator-based foundation of async/await
- Wrapping synchronous blocking APIs as awaitable objects
- Real-world patterns: lazy async singletons, cached awaitables, retry awaitables
Prerequisites
- Solid understanding of async def, await, and the event loop
- Generators: yield, yield from, send(), StopIteration with values
- Familiarity with asyncio.Future at a usage level
- Async context managers from Lesson 2
Part 1 -- What await Actually Does
The await keyword is syntactic sugar for a specific protocol. When you write result = await expr, Python roughly does:
# result = await expr
# Step 1: Get the __await__ iterator
_iter = expr.__await__()
# Step 2: Drive the iterator (simplified -- the event loop does this)
try:
    while True:
        _yielded = next(_iter)
        # _yielded is sent to the event loop as a "suspension point"
        # The event loop will resume this when the I/O is ready
except StopIteration as e:
    result = e.value  # The return value comes from StopIteration.value
This is the same protocol as yield from on a generator. In fact, before Python 3.5 introduced async def / await, coroutines were written as generators with @asyncio.coroutine and yield from.
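The generator lineage is still visible in the standard library. types.coroutine turns an ordinary generator function into a generator-based coroutine, and inside it yield from does the job that await does in native code. Here is a minimal sketch; the two function names are illustrative.

```python
import asyncio
import types

@types.coroutine
def legacy_style():
    # Generator-based coroutine: `yield from` delegates just like `await`
    value = yield from asyncio.sleep(0, result="done")
    return value

async def modern_style():
    return await asyncio.sleep(0, result="done")

async def main():
    # Native code can await both kinds interchangeably
    return await legacy_style(), await modern_style()

results = asyncio.run(main())
print(results)  # ('done', 'done')
```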
The Three Awaitable Types
Python recognizes three things as awaitable:
- Coroutine objects -- returned by async def functions. Their __await__ returns an iterator (a coroutine_wrapper) that drives the coroutine. The coroutine object itself is not an iterator.
- Objects with __await__ -- any class implementing __await__() that returns an iterator.
- asyncio.Future objects -- implement __await__ to suspend until a result is set. asyncio.Task is a Future subclass, so Tasks are awaitable the same way.
import asyncio
import inspect

async def coro_func():
    return 42

coro = coro_func()
print(inspect.isawaitable(coro))     # True
print(hasattr(coro, '__await__'))    # True

# Create a loop explicitly: calling get_event_loop() when no loop is
# running is deprecated in recent Python versions
loop = asyncio.new_event_loop()
future = loop.create_future()
print(inspect.isawaitable(future))   # True
print(hasattr(future, '__await__'))  # True
loop.close()

class MyAwaitable:
    def __await__(self):
        yield  # At least one yield to create a generator
        return 42

obj = MyAwaitable()
print(inspect.isawaitable(obj))      # True
print(hasattr(obj, '__await__'))     # True

# Close the coroutine to avoid a "never awaited" RuntimeWarning
coro.close()
Part 2 -- The await Protocol in Depth
__await__ must return an iterator. The values it yields are "messages" to the event loop, and StopIteration.value is the final result.
Minimal Awaitable
The simplest possible awaitable yields nothing and returns immediately:
class Immediate:
    """An awaitable that returns a value without suspending."""
    def __init__(self, value):
        self.value = value

    def __await__(self):
        return self.value
    # This makes __await__ a regular function that returns,
    # NOT a generator. But wait -- this won't work!
    # Awaiting it raises, for an int value:
    # TypeError: __await__() returned non-iterator of type 'int'
__await__ must return an iterator (usually a generator). A method whose body contains no yield is an ordinary function, not a generator function, so calling it simply returns the value. You need at least the possibility of yielding:
class Immediate:
    """An awaitable that returns a value without suspending."""
    def __init__(self, value):
        self.value = value

    def __await__(self):
        # This is a generator because it COULD yield (even though it never does)
        if False:
            yield
        return self.value
Returning an existing iterator also satisfies the protocol, but it has a limitation:
class Immediate:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        return iter([])  # Empty iterator -- no suspension
        # But this doesn't let us return a value: the await evaluates to None
Another common pattern suspends exactly once before returning the value:
class Immediate:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        # Generator that suspends once, then returns a value
        yield  # Suspend once (event loop will resume us immediately)
        return self.value
A bare yield (which yields None) creates one suspension point. asyncio's Task treats a yielded None as a request to be rescheduled on the next iteration of the event loop, and asyncio.sleep(0) works the same way. For immediate values with no suspension at all, use if False: yield, which makes __await__ a generator without ever suspending. In practice the cost of a single suspension is negligible, but it does let other ready tasks run before yours resumes.
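The difference between the two generator tricks is observable. In this sketch, a neighbouring task is scheduled before the await, and only the version with a real yield lets it run first. The class and helper names are illustrative, not from any library.

```python
import asyncio

class NoSuspend:
    """Never suspends: `if False: yield` only makes __await__ a generator."""
    def __init__(self, value):
        self.value = value

    def __await__(self):
        if False:
            yield
        return self.value

class SuspendOnce:
    """Yields None once, so the Task is rescheduled on the next loop pass."""
    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield
        return self.value

async def run_with_neighbor(awaitable_cls):
    events = []

    async def neighbor():
        events.append("neighbor ran")

    task = asyncio.create_task(neighbor())  # scheduled, not started yet
    await awaitable_cls(1)
    events.append("after await")
    await task
    return events

no_suspend_order = asyncio.run(run_with_neighbor(NoSuspend))
suspend_once_order = asyncio.run(run_with_neighbor(SuspendOnce))
print(no_suspend_order)    # ['after await', 'neighbor ran']
print(suspend_once_order)  # ['neighbor ran', 'after await']
```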
Delegating to Other Awaitables
The most common pattern is delegating to existing awaitables using yield from:
import asyncio

class DelayedValue:
    """Return a value after a delay."""
    def __init__(self, value, delay: float):
        self.value = value
        self.delay = delay

    def __await__(self):
        # Delegate to asyncio.sleep's __await__
        yield from asyncio.sleep(self.delay).__await__()
        return self.value

async def main():
    result = await DelayedValue("hello", 0.5)
    print(result)  # hello (after 0.5 seconds)

asyncio.run(main())
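The yield from delegates the entire iteration protocol to the iterator returned by asyncio.sleep(...).__await__(). Every value that iterator yields passes straight through to the event loop, and every value sent back in is forwarded to it. When sleep() finishes, execution continues with return self.value.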
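Because yield from evaluates to the delegate's return value, a single __await__ can delegate to several awaitables in turn and combine their results. The class and coroutine names in this sketch are hypothetical, and fetch_number() only stands in for real I/O.

```python
import asyncio

async def fetch_number():
    # Stand-in for real async I/O
    await asyncio.sleep(0.01)
    return 20

class AddTwoSources:
    """Delegates to two awaitables in turn and combines their results."""
    def __await__(self):
        # `yield from` evaluates to the delegate's return value
        a = yield from fetch_number().__await__()
        b = yield from asyncio.sleep(0.01, result=22).__await__()
        return a + b

async def main():
    return await AddTwoSources()

total = asyncio.run(main())
print(total)  # 42
```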
Part 3 -- Future Internals
asyncio.Future is the foundational awaitable in asyncio. Understanding its __await__ implementation reveals how the event loop suspends and resumes coroutines.
Simplified Future Implementation
import asyncio

class SimpleFuture:
    """Simplified version of asyncio.Future to show the protocol.

    Illustration only: a real asyncio Task also expects the yielded future
    to carry an _asyncio_future_blocking attribute, so awaiting this class
    inside asyncio.run() fails with RuntimeError ("Task got bad yield").
    """
    _PENDING = "PENDING"
    _FINISHED = "FINISHED"

    def __init__(self):
        self._state = self._PENDING
        self._result = None
        self._exception = None
        self._callbacks = []

    def set_result(self, result):
        if self._state != self._PENDING:
            raise RuntimeError("Future already resolved")
        self._result = result
        self._state = self._FINISHED
        self._schedule_callbacks()

    def set_exception(self, exception):
        if self._state != self._PENDING:
            raise RuntimeError("Future already resolved")
        self._exception = exception
        self._state = self._FINISHED
        self._schedule_callbacks()

    def result(self):
        if self._state != self._FINISHED:
            raise RuntimeError("Result not yet available")
        if self._exception is not None:
            raise self._exception
        return self._result

    def done(self):
        return self._state == self._FINISHED

    def add_done_callback(self, callback):
        if self._state == self._FINISHED:
            # Already done -- schedule callback immediately
            asyncio.get_running_loop().call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def _schedule_callbacks(self):
        for callback in self._callbacks:
            asyncio.get_running_loop().call_soon(callback, self)
        self._callbacks.clear()

    def __await__(self):
        if not self.done():
            # THIS IS THE KEY: yield self up the await chain.
            # The Task driving the coroutine receives the yielded Future
            # and registers its wake-up method as a done callback.
            # When set_result() is called, that callback resumes the coroutine.
            yield self
        # When we resume, the result is available
        return self.result()
The critical insight: when a Future yields itself during __await__, the Future object travels up the await chain to the asyncio Task that is driving the coroutine. The Task calls future.add_done_callback() with its own wake-up method and hands control back to the event loop. When someone later calls future.set_result(), the event loop runs that callback, and the Task resumes the coroutine that was waiting.
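The mechanism itself needs nothing asyncio-specific. The toy driver below plays the Task's role: it calls send(), receives the yielded future, and requeues the coroutine from a done callback. Every name in it is hypothetical, and it deliberately skips timers, exceptions, and cancellation.

```python
from collections import deque

class ToyFuture:
    """Hypothetical minimal future, independent of asyncio."""
    def __init__(self):
        self.done = False
        self.result = None
        self.callbacks = []

    def set_result(self, value):
        self.done = True
        self.result = value
        for cb in self.callbacks:
            cb(self)

    def __await__(self):
        if not self.done:
            yield self  # hand ourselves to whoever is driving the coroutine
        return self.result

class ToyLoop:
    """Runs coroutines by calling send(); resumes them via future callbacks."""
    def __init__(self):
        self.ready = deque()

    def spawn(self, coro):
        self.ready.append((coro, None))

    def run(self):
        while self.ready:
            coro, value = self.ready.popleft()
            try:
                fut = coro.send(value)
            except StopIteration:
                continue  # coroutine finished
            # Suspended on fut: requeue the coroutine once fut resolves
            fut.callbacks.append(lambda f, c=coro: self.ready.append((c, f.result)))

log = []
fut = ToyFuture()

async def consumer():
    log.append("waiting")
    value = await fut
    log.append(f"got {value}")

async def producer():
    log.append("producing")
    fut.set_result(99)

loop = ToyLoop()
loop.spawn(consumer())
loop.spawn(producer())
loop.run()
print(log)  # ['waiting', 'producing', 'got 99']
```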
How asyncio.sleep() Works
async def sleep(delay):
    """Simplified version of asyncio.sleep."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # Schedule setting the result after the delay
    handle = loop.call_later(delay, future.set_result, None)
    try:
        # Suspend until the future is resolved
        await future
    finally:
        # If we were cancelled, make sure the timer never calls set_result()
        handle.cancel()
This is the entire mechanism:
- Create a Future.
- Schedule something that will set the Future's result.
- await the Future (which yields it up to the Task and, through it, the event loop).
- The Task registers a callback to resume us when the Future resolves.
- When call_later fires, it calls future.set_result(), which schedules the callback, which resumes our coroutine.
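The same recipe bridges callback-style code into await. In this sketch, legacy_timer_api is a hypothetical stand-in for any API that reports completion through a callback. The done() guard covers the case where the awaiting task was cancelled first.

```python
import asyncio

def legacy_timer_api(loop, delay, callback):
    """Hypothetical callback-style API: calls callback(value) after delay."""
    loop.call_later(delay, callback, "timer fired")

async def await_legacy_timer(delay):
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def on_done(value):
        # The awaiting task may have been cancelled, which cancels the future
        if not future.done():
            future.set_result(value)

    legacy_timer_api(loop, delay, on_done)
    return await future  # suspend until on_done resolves the future

result = asyncio.run(await_legacy_timer(0.05))
print(result)  # timer fired
```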
Part 4 -- Building Custom Awaitables
Pattern 1: Lazy Async Singleton
import asyncio

class AsyncSingleton:
    """Awaitable that initializes a resource once and caches it.

    Multiple coroutines can await this concurrently -- only one
    initialization occurs.
    """
    def __init__(self, factory):
        self._factory = factory
        self._instance = None
        self._lock = asyncio.Lock()
        self._initialized = False

    def __await__(self):
        return self._get_instance().__await__()

    async def _get_instance(self):
        if self._initialized:
            return self._instance
        async with self._lock:
            # Double-check after acquiring lock
            if not self._initialized:
                self._instance = await self._factory()
                self._initialized = True
        return self._instance

# Usage
async def create_db_pool():
    print("Creating database pool (expensive)")
    await asyncio.sleep(0.5)
    return {"pool": "connected", "size": 10}

db_pool = AsyncSingleton(create_db_pool)

async def handler_1():
    pool = await db_pool  # First call creates the pool
    print(f"Handler 1: {pool}")

async def handler_2():
    pool = await db_pool  # Concurrent call waits on the lock, then reuses the pool
    print(f"Handler 2: {pool}")

async def main():
    await asyncio.gather(handler_1(), handler_2())

asyncio.run(main())
# Output:
# Creating database pool (expensive)
# Handler 1: {'pool': 'connected', 'size': 10}
# Handler 2: {'pool': 'connected', 'size': 10}
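Another design is worth knowing, though it is not the lesson's approach. Instead of a lock plus a flag, store the Task created by the first awaiter and let every later awaiter await that same Task. Names here are hypothetical. One caveat applies: cancelling any awaiter also cancels the shared Task, which asyncio.shield() can guard against.

```python
import asyncio

class TaskSingleton:
    """Hypothetical alternative: share one Task instead of a Lock + flag."""
    def __init__(self, factory):
        self._factory = factory
        self._task = None

    def __await__(self):
        # __await__ runs inside the awaiting coroutine, so a loop is running
        if self._task is None:
            self._task = asyncio.create_task(self._factory())
        # Every awaiter waits on the same Task and gets the same result
        return self._task.__await__()

calls = 0

async def create_pool():
    global calls
    calls += 1
    await asyncio.sleep(0.05)
    return {"pool": "connected"}

pool = TaskSingleton(create_pool)

async def use_pool():
    return await pool

async def main():
    return await asyncio.gather(*(use_pool() for _ in range(5)))

results = asyncio.run(main())
print(calls, all(r is results[0] for r in results))  # 1 True
```

The Task is bound to the event loop that created it, so a TaskSingleton instance should not be reused across separate asyncio.run() calls.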
Pattern 2: Cached Awaitable with TTL
import asyncio
import time

class CachedAwaitable:
    """An awaitable that caches its result for a given TTL."""
    def __init__(self, coro_factory, ttl: float = 60.0):
        self._factory = coro_factory
        self._ttl = ttl
        self._cached_result = None
        self._expires_at = None  # None means "nothing cached yet"
        self._lock = asyncio.Lock()

    def __await__(self):
        return self._get().__await__()

    def _is_fresh(self):
        return self._expires_at is not None and time.monotonic() < self._expires_at

    async def _get(self):
        if self._is_fresh():
            return self._cached_result
        async with self._lock:
            # Double-check after lock: another coroutine may have refreshed it
            if self._is_fresh():
                return self._cached_result
            self._cached_result = await self._factory()
            # Start the TTL when the fetch completes, not when it started
            self._expires_at = time.monotonic() + self._ttl
            return self._cached_result

    def invalidate(self):
        """Force cache miss on next await."""
        # A sentinel avoids relying on time.monotonic()'s arbitrary start point
        self._expires_at = None

# Usage
async def fetch_config():
    print("Fetching remote config...")
    await asyncio.sleep(0.1)
    return {"feature_flags": {"new_ui": True}, "version": 3}

config = CachedAwaitable(fetch_config, ttl=300.0)

async def main():
    c1 = await config  # Fetches
    c2 = await config  # Cache hit
    print(c1 is c2)    # True
    config.invalidate()
    c3 = await config  # Fetches again
    print(c3)

asyncio.run(main())
# Output:
# Fetching remote config...
# True
# Fetching remote config...
# {'feature_flags': {'new_ui': True}, 'version': 3}
Pattern 3: Retry Awaitable
import asyncio

class RetryAwaitable:
    """An awaitable that retries a coroutine on failure."""
    def __init__(self, coro_factory, max_retries: int = 3,
                 backoff: float = 1.0, exceptions=(Exception,)):
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")
        self._factory = coro_factory     # Called once per attempt
        self._max_retries = max_retries  # Total number of attempts
        self._backoff = backoff
        self._exceptions = exceptions

    def __await__(self):
        return self._execute().__await__()

    async def _execute(self):
        last_error = None
        for attempt in range(self._max_retries):
            try:
                # A fresh coroutine per attempt: coroutines can't be re-awaited
                return await self._factory()
            except self._exceptions as e:
                last_error = e
                if attempt < self._max_retries - 1:
                    # Exponential backoff: backoff, 2*backoff, 4*backoff, ...
                    delay = self._backoff * (2 ** attempt)
                    await asyncio.sleep(delay)
        raise last_error

# Usage
call_count = 0

async def flaky_api_call():
    global call_count
    call_count += 1
    if call_count < 3:
        raise ConnectionError(f"Attempt {call_count} failed")
    return {"status": "ok"}

async def main():
    result = await RetryAwaitable(
        flaky_api_call,
        max_retries=5,
        backoff=0.1,
        exceptions=(ConnectionError,)
    )
    print(result)

asyncio.run(main())
# Output: {'status': 'ok'}
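This is also why RetryAwaitable takes a factory rather than a coroutine object. A coroutine can be awaited only once, whereas a class whose __await__ builds a fresh generator on every call can be awaited repeatedly. The names below are illustrative.

```python
import asyncio

async def compute():
    return 7

class Reusable:
    """Hypothetical awaitable: each await builds a fresh generator."""
    def __await__(self):
        result = yield from compute().__await__()
        return result

async def main():
    r = Reusable()
    first, second = await r, await r  # two independent __await__ calls

    coro = compute()
    await coro
    try:
        await coro  # a coroutine object can only be awaited once
    except RuntimeError as exc:
        error = str(exc)
    return first, second, error

outcome = asyncio.run(main())
print(outcome)  # (7, 7, 'cannot reuse already awaited coroutine')
```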
Part 5 -- Wrapping Synchronous APIs
A common need is wrapping blocking synchronous code so it can be awaited without blocking the event loop:
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

class AsyncWrapper:
    """Make a blocking function awaitable by running it in a thread."""
    _executor = ThreadPoolExecutor(max_workers=4)

    def __init__(self, func, *args, **kwargs):
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def __await__(self):
        return self._run().__await__()

    async def _run(self):
        # Inside a coroutine, get_running_loop() reliably returns the current loop
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            functools.partial(self._func, *self._args, **self._kwargs)
        )

# Usage: wrap a blocking, CPU-heavy call
import hashlib

def compute_hash(data: bytes) -> str:
    """CPU-intensive blocking function."""
    return hashlib.pbkdf2_hmac('sha256', data, b'salt', 100000).hex()

async def main():
    # Without wrapping -- blocks the event loop
    # result = compute_hash(b"password")
    # With wrapping -- runs in thread pool
    result = await AsyncWrapper(compute_hash, b"password")
    print(f"Hash: {result[:16]}...")

asyncio.run(main())
A decorator version:
# Reuses functools and AsyncWrapper from the previous example
def make_async(func):
    """Decorator to make a sync function awaitable."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return AsyncWrapper(func, *args, **kwargs)
    return wrapper

@make_async
def blocking_read(path: str) -> str:
    with open(path) as f:
        return f.read()

async def main():
    content = await blocking_read("/etc/hostname")
    print(content)
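For one-off calls, the standard library already packages this pattern. asyncio.to_thread() (Python 3.9+) runs a function in the loop's default executor and returns an awaitable. Here is a sketch that reuses the lesson's compute_hash and hashes two inputs concurrently.

```python
import asyncio
import hashlib

def compute_hash(data: bytes) -> str:
    """CPU-intensive blocking function."""
    return hashlib.pbkdf2_hmac("sha256", data, b"salt", 100_000).hex()

async def main():
    # Each to_thread() call runs in the default ThreadPoolExecutor
    return await asyncio.gather(
        asyncio.to_thread(compute_hash, b"alpha"),
        asyncio.to_thread(compute_hash, b"beta"),
    )

digests = asyncio.run(main())
print(len(digests), len(digests[0]))  # 2 64
```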
Part 6 -- Coroutines Under the Hood
Generator-Based History
Before Python 3.5, coroutines were generators:
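# Python 3.4 style (historical reference)
# @asyncio.coroutine was deprecated in Python 3.8 and removed in 3.11,
# so this only runs on older interpreters
import asyncio

@asyncio.coroutine
def old_style_coro():
    result = yield from asyncio.sleep(1)
    return result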
Python 3.5 introduced async def / await as dedicated syntax, but the underlying mechanism is the same. Native coroutines are built on CPython's generator machinery and share the same frame and suspension logic. Even so, a coroutine is a distinct type and not itself an iterator.
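A quick check makes the distinction concrete. The coroutine object is not a generator and has no __next__, but the object returned by its __await__ is an iterator that runs the body and delivers the return value through StopIteration.

```python
import inspect

async def coro_func():
    return 1

coro = coro_func()
wrapper = coro.__await__()

is_gen = inspect.isgenerator(coro)               # False: a distinct type
coro_has_next = hasattr(coro, "__next__")        # False: not an iterator itself
wrapper_has_next = hasattr(wrapper, "__next__")  # True: __await__ returns one
try:
    next(wrapper)  # runs the body; the return value arrives via StopIteration
except StopIteration as stop:
    value = stop.value

print(is_gen, coro_has_next, wrapper_has_next, value)  # False False True 1
```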
Inspecting Coroutine State
import asyncio
import inspect

async def example():
    await asyncio.sleep(0)
    return 42

coro = example()
print(inspect.getcoroutinestate(coro))   # CORO_CREATED
print(inspect.getcoroutinelocals(coro))  # {}

# Drive it manually (DO NOT do this in real code)
try:
    coro.send(None)  # Start the coroutine; it stops at sleep(0)'s yield
except StopIteration:
    # Only raised if the coroutine finished without suspending
    pass

print(inspect.getcoroutinestate(coro))   # CORO_SUSPENDED
coro.close()
print(inspect.getcoroutinestate(coro))   # CORO_CLOSED
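Instead of closing the coroutine, you can keep calling send() until it finishes, just as a Task would. asyncio.sleep(0) performs a single bare yield, so the first send() returns None. The second send() resumes the coroutine, which returns, and the result arrives as StopIteration.value.

```python
import asyncio

async def example():
    await asyncio.sleep(0)  # a single bare yield, no event loop needed
    return 42

coro = example()
first = coro.send(None)  # runs until the first suspension point
try:
    coro.send(None)      # resumes; the coroutine returns
except StopIteration as stop:
    result = stop.value

print(first, result)  # None 42
```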
The Coroutine Wrapper Chain
When you write nested await expressions, a chain of __await__ iterators forms:
import asyncio

async def inner():
    await asyncio.sleep(0.1)  # Yields a Future to the event loop
    return "inner result"

async def middle():
    result = await inner()  # yield from inner().__await__()
    return f"middle got: {result}"

async def outer():
    result = await middle()  # yield from middle().__await__()
    return f"outer got: {result}"
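The Future yielded inside asyncio.sleep() bubbles up through inner(), middle(), and outer() to the Task driving outer(), and from there to the event loop. When the timer fires, the Task calls send() on outer(), and that call propagates back down through the chain to the point where inner() suspended.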
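The round trip can be observed directly. This sketch replaces asyncio.sleep() with a hypothetical Marker awaitable and plays the driver's role by hand. The value yielded deep inside inner() surfaces at outer()'s send(), and the value sent in comes back out of the innermost yield.

```python
class Marker:
    """Hypothetical awaitable: yields a tag, returns whatever is sent back."""
    def __await__(self):
        reply = yield "suspended-in-inner"
        return reply

async def inner():
    return await Marker()

async def middle():
    return f"middle got: {await inner()}"

async def outer():
    return f"outer got: {await middle()}"

coro = outer()
yielded = coro.send(None)  # the innermost yield surfaces at the top
try:
    coro.send("hello from driver")  # travels down to Marker.__await__
except StopIteration as stop:
    final = stop.value

print(yielded)  # suspended-in-inner
print(final)    # outer got: middle got: hello from driver
```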
Part 7 -- Advanced Patterns
Awaitable Property Descriptor
import asyncio

class AsyncProperty:
    """A descriptor that makes a property awaitable with caching."""
    def __init__(self, func):
        self.func = func
        self.attr_name = f"_async_prop_{func.__name__}"

    def __set_name__(self, owner, name):
        self.attr_name = f"_async_prop_{name}"

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self

        class Awaitable:
            def __await__(inner_self):
                return self._resolve(obj).__await__()

        return Awaitable()

    async def _resolve(self, obj):
        cached = getattr(obj, self.attr_name, None)
        if cached is not None:
            return cached
        result = await self.func(obj)
        setattr(obj, self.attr_name, result)
        return result

class User:
    def __init__(self, user_id: int):
        self.user_id = user_id

    @AsyncProperty
    async def profile(self):
        """Lazily fetch and cache user profile."""
        print(f"Fetching profile for user {self.user_id}")
        await asyncio.sleep(0.1)
        return {"id": self.user_id, "name": f"User {self.user_id}"}

async def main():
    user = User(42)
    profile1 = await user.profile  # Fetches
    profile2 = await user.profile  # Cache hit
    print(profile1)
    print(profile1 is profile2)  # True

asyncio.run(main())
# Output:
# Fetching profile for user 42
# {'id': 42, 'name': 'User 42'}
# True
Composable Awaitable Chain
import asyncio
import inspect

class AsyncPipe:
    """Chain of async transformations that can be awaited."""
    def __init__(self, coro_or_value):
        if asyncio.iscoroutine(coro_or_value):
            self._coro = coro_or_value
        else:
            async def _wrap():
                return coro_or_value
            self._coro = _wrap()

    def then(self, func):
        """Chain a transformation. func receives the previous result."""
        async def chained():
            result = await self._coro
            # inspect.iscoroutinefunction replaces the deprecated asyncio version
            if inspect.iscoroutinefunction(func):
                return await func(result)
            return func(result)
        return AsyncPipe(chained())

    def __await__(self):
        return self._coro.__await__()

# Usage
async def fetch_data(url):
    await asyncio.sleep(0.1)
    return {"items": [1, 2, 3, 4, 5]}

async def main():
    result = await (
        AsyncPipe(fetch_data("https://api.example.com"))
        .then(lambda data: data["items"])
        .then(lambda items: [x * 2 for x in items])
        .then(lambda items: sum(items))
    )
    print(result)  # 30

asyncio.run(main())
Awaitable with Progress Callback
import asyncio

class ProgressAwaitable:
    """An awaitable that reports progress."""
    def __init__(self, total_steps: int, step_func, on_progress=None):
        self.total_steps = total_steps
        self.step_func = step_func
        self.on_progress = on_progress

    def __await__(self):
        return self._run().__await__()

    async def _run(self):
        results = []
        for i in range(self.total_steps):
            result = await self.step_func(i)
            results.append(result)
            if self.on_progress:
                progress = (i + 1) / self.total_steps * 100
                self.on_progress(progress)
        return results

async def process_step(step_num):
    await asyncio.sleep(0.05)
    return step_num ** 2

async def main():
    results = await ProgressAwaitable(
        total_steps=10,
        step_func=process_step,
        on_progress=lambda p: print(f"\rProgress: {p:.0f}%", end="", flush=True)
    )
    print(f"\nResults: {results}")

asyncio.run(main())
# Output:
# Progress: 100%
# Results: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Key Takeaways
- await works on any object with an __await__ method that returns an iterator. This is the only requirement.
- __await__ typically delegates to another awaitable using yield from other.__await__(). This creates a chain of suspension/resumption through nested coroutines.
- asyncio.Future.__await__ yields the Future itself up the await chain. The Task driving the coroutine registers a callback to resume it when the Future resolves. This is the fundamental mechanism of async I/O in Python.
- Coroutines are built on generator machinery under the hood. async def/await is syntactic sugar over the generator yield from protocol.
- Custom awaitables package patterns such as lazy initialization, cached computation, retry logic, progress tracking, and composable pipelines behind a plain await. When __await__ builds a fresh generator on each call, the same instance can be awaited repeatedly, which a coroutine object cannot.
- When wrapping blocking code, run it in an executor and await the resulting Future. loop.run_in_executor() does exactly this, and asyncio.to_thread() is built on top of it.
- The __await__ method must return an iterator (usually a generator). A plain return statement is not sufficient -- use if False: yield or delegate via yield from.
Graded Practice Challenges
Level 1 -- Predict the Output
Question 1: What does this print?
import asyncio

class Always42:
    def __await__(self):
        if False:
            yield
        return 42

async def main():
    x = await Always42()
    y = await Always42()
    print(x + y)

asyncio.run(main())
Answer
84
Each await Always42() creates a new instance, calls __await__(), gets a generator that never yields, and immediately returns 42 via StopIteration(42). No suspension occurs.
Question 2: What does this print?
import asyncio
class ChainedAwait:
def __init__(self, value):
self.value = value
def __await__(self):
result = yield from asyncio.sleep(0).__await__()
return self.value * 2
async def main():
a = await ChainedAwait(5)
b = await ChainedAwait(a)
print(b)
asyncio.run(main())
Answer
20
The first await returns 5 * 2 = 10. The second creates ChainedAwait(10), and awaiting it returns 10 * 2 = 20.
Question 3: Does this code work? If not, why?
import asyncio

class BrokenAwaitable:
    def __await__(self):
        return 42

async def main():
    result = await BrokenAwaitable()
    print(result)

asyncio.run(main())
Answer
It raises TypeError: __await__() returned non-iterator of type 'int'. The __await__ method must return an iterator (a generator object or any object with __next__). A plain integer is not an iterator. Fix it by adding if False: yield before the return, which turns __await__ into a generator function.
Level 2 -- Debug Challenge
This custom awaitable is supposed to cache the result of an async operation, but it fails when multiple coroutines await it concurrently. Find the bug.
import asyncio

class CachedResult:
    def __init__(self, coro_factory):
        self._factory = coro_factory
        self._result = None
        self._fetched = False

    def __await__(self):
        return self._get().__await__()

    async def _get(self):
        if not self._fetched:
            self._result = await self._factory()
            self._fetched = True
        return self._result

async def expensive_query():
    print("Running expensive query...")
    await asyncio.sleep(1)
    return {"data": [1, 2, 3]}

cached = CachedResult(expensive_query)

async def main():
    # Two coroutines await the same cached result concurrently
    r1, r2 = await asyncio.gather(
        cached.__await__().__asend(None),  # Wrong way to drive it
        cached.__await__().__asend(None),
    )
Answer
There are two bugs:
- Race condition: Without a lock, both coroutines see _fetched = False simultaneously and both call _factory(), running the expensive query twice.
- Incorrect driving: The main() function tries to drive the awaitable manually. The iterator returned by __await__() has no __asend method, so this raises AttributeError. The awaitable should simply be awaited.

The fix is straightforward:
import asyncio

class CachedResult:
    def __init__(self, coro_factory):
        self._factory = coro_factory
        self._result = None
        self._fetched = False
        self._lock = asyncio.Lock()  # Fix 1: Add lock

    def __await__(self):
        return self._get().__await__()

    async def _get(self):
        if self._fetched:
            return self._result
        async with self._lock:  # Fix 1: Serialize initialization
            if not self._fetched:  # Double-check after lock
                self._result = await self._factory()
                self._fetched = True
        return self._result

cached = CachedResult(expensive_query)  # expensive_query from the challenge above

# Fix 2: Await normally
async def get_cached():
    return await cached

async def main():
    r1, r2 = await asyncio.gather(
        get_cached(),
        get_cached(),
    )
    print(r1 is r2)  # True

asyncio.run(main())
Level 3 -- Design Challenge
Design a LazyService class that:
- Wraps an async factory function that creates an expensive resource
- Is awaitable -- service = await lazy_service -- and returns the initialized resource
- Supports async with for lifecycle management (cleanup when done)
- Is safe for concurrent access (multiple coroutines can await simultaneously)
- Supports re-initialization after the resource is closed
- Tracks initialization time and access count
Design Hints
import asyncio

class LazyService:
    def __init__(self, factory, cleanup=None):
        self._factory = factory
        self._cleanup = cleanup
        self._instance = None
        self._lock = asyncio.Lock()
        self._initialized = False
        self._init_time = 0.0
        self._access_count = 0

    def __await__(self):
        return self._get().__await__()

    async def _get(self):
        self._access_count += 1
        if self._initialized:
            return self._instance
        async with self._lock:
            if not self._initialized:
                loop = asyncio.get_running_loop()
                start = loop.time()
                self._instance = await self._factory()
                self._init_time = loop.time() - start
                self._initialized = True
        return self._instance

    async def __aenter__(self):
        return await self

    async def __aexit__(self, *exc):
        if self._initialized and self._cleanup:
            await self._cleanup(self._instance)
        # Reset so the next await re-initializes the resource
        self._instance = None
        self._initialized = False
        return False

    @property
    def stats(self):
        return {
            "initialized": self._initialized,
            "init_time": self._init_time,
            "access_count": self._access_count,
        }
What's Next
You now understand the machinery behind await. The next lesson, Advanced Event Loop, takes you deeper into the event loop itself -- the scheduler that drives all these awaitables. You will learn about selectors, callbacks, timers, custom event loop policies, and high-performance alternatives like uvloop.
