Custom Awaitables

Before reading any explanation, predict what this program prints:

import asyncio

class Delayed:
    def __init__(self, value, seconds):
        self.value = value
        self.seconds = seconds

    def __await__(self):
        yield from asyncio.sleep(self.seconds).__await__()
        return self.value

async def main():
    result = await Delayed(42, 0.1)
    print(result)
    print(type(Delayed(42, 0.1)))

asyncio.run(main())

Two questions: (1) Does this work? (2) What type is Delayed(42, 0.1)?

# Output:
# 42
# <class '__main__.Delayed'>

Yes, it works. Delayed is not a coroutine, not a Future, and not an async generator. It is a plain class that implements __await__, making it awaitable. The await expression recognizes any object with __await__ and delegates to the iterator it returns. Understanding this protocol is the key to building custom async primitives.

What You Will Learn

  • The __await__ protocol and what await actually does under the hood
  • How coroutines, Futures, and Tasks relate to the __await__ protocol
  • Building custom awaitable objects for lazy initialization, caching, and deferred computation
  • The generator-based foundation of async/await
  • Wrapping synchronous blocking APIs as awaitable objects
  • Real-world patterns: lazy async singletons, cached awaitables, retry awaitables

Prerequisites

  • Solid understanding of async def, await, and the event loop
  • Generators: yield, yield from, send(), StopIteration with values
  • Familiarity with asyncio.Future at a usage level
  • Async context managers from Lesson 2

Part 1 -- What await Actually Does

The await keyword is syntactic sugar for a specific protocol. When you write result = await expr, Python roughly does:

# result = await expr

# Step 1: Get the __await__ iterator
_iter = expr.__await__()

# Step 2: Drive the iterator (simplified -- in practice the Task that
# wraps the coroutine does this via send())
try:
    while True:
        _yielded = next(_iter)
        # _yielded is passed up to whoever drives the coroutine as a
        # "suspension point"; the driver resumes the iterator later,
        # for example when the I/O is ready
except StopIteration as e:
    result = e.value  # The return value comes from StopIteration.value

This is the same protocol as yield from on a generator. In fact, before Python 3.5 introduced async def / await, coroutines were written as generators with @asyncio.coroutine and yield from.
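To make the driving loop concrete, here is a minimal sketch that drives an awaitable by hand, with no event loop involved. Countdown and drive are hypothetical names introduced for illustration; real code should let asyncio do the driving.

```python
class Countdown:
    """Hypothetical awaitable that suspends n times, then returns a string."""

    def __init__(self, n):
        self.n = n

    def __await__(self):
        for i in range(self.n, 0, -1):
            yield i  # Each yield is one suspension point
        return "liftoff"  # Delivered to the driver as StopIteration.value


def drive(awaitable):
    """Run an awaitable to completion by hand, collecting what it yields."""
    iterator = awaitable.__await__()
    yielded = []
    try:
        while True:
            yielded.append(next(iterator))
    except StopIteration as stop:
        return yielded, stop.value


yielded, result = drive(Countdown(3))
print(yielded)  # [3, 2, 1]
print(result)   # liftoff
```

The yielded values are what an event loop would see as suspension points; the return value only appears once the iterator is exhausted.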

The Three Awaitable Types

Python recognizes three things as awaitable:

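  1. Coroutine objects -- returned by async def functions. Their __await__ returns a small wrapper iterator that drives the coroutine. Coroutine objects themselves are not iterators: they have send(), throw(), and close(), but no __iter__ or __next__.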

  2. Objects with __await__ -- any class implementing __await__() that returns an iterator.

  3. asyncio.Future objects (and Tasks, which subclass Future) -- strictly a special case of the previous item: they implement __await__ to suspend until a result is set.

import asyncio
import inspect

async def coro_func():
    return 42

coro = coro_func()
print(inspect.isawaitable(coro))       # True
print(hasattr(coro, '__await__'))      # True

# Create a loop explicitly: calling asyncio.get_event_loop() with no
# running loop is deprecated and raises an error on newer Python versions
loop = asyncio.new_event_loop()
future = loop.create_future()
print(inspect.isawaitable(future))     # True
print(hasattr(future, '__await__'))    # True

class MyAwaitable:
    def __await__(self):
        yield  # At least one yield to create a generator
        return 42

obj = MyAwaitable()
print(inspect.isawaitable(obj))        # True
print(hasattr(obj, '__await__'))       # True

# Close the coroutine to avoid a "never awaited" warning, then the loop
coro.close()
loop.close()
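A quick way to see what a coroutine's __await__ actually returns is to call it directly. This is a small sketch for CPython; the function name answer is made up for the example.

```python
async def answer():
    return 42

coro = answer()
iterator = coro.__await__()

print(iterator is coro)               # False: a separate wrapper object
print(hasattr(coro, "__next__"))      # False: coroutine objects are not iterators
print(hasattr(iterator, "__next__"))  # True

# Driving the wrapper runs the coroutine, which finishes without suspending
try:
    next(iterator)
except StopIteration as stop:
    value = stop.value

print(value)  # 42
```

Because the wrapper is driven to completion, the coroutine finishes normally and no "never awaited" warning is emitted.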

Part 2 -- The await Protocol in Depth

__await__ must return an iterator. The iterator's yield values are "messages" to the event loop, and the StopIteration.value is the final result.

Minimal Awaitable

The simplest possible awaitable yields nothing and returns immediately:

class Immediate:
    """An awaitable that returns a value without suspending."""
    def __init__(self, value):
        self.value = value

    def __await__(self):
        return self.value
        # This makes __await__ a regular function that returns the value
        # itself, NOT an iterator, so awaiting it fails.

# await Immediate(42) raises:
# TypeError: __await__() returned non-iterator of type 'int'

__await__ must return an iterator, and a generator is the most convenient way to produce one. A plain function that returns a non-iterator value does not qualify. For __await__ to be a generator function, its body needs at least one yield, even an unreachable one:

class Immediate:
    """An awaitable that returns a value without suspending."""
    def __init__(self, value):
        self.value = value

    def __await__(self):
        # This is a generator because it COULD yield (even though it never does)
        if False:
            yield
        return self.value

Returning an empty iterator also avoids suspension, but then the await expression always evaluates to None:

class Immediate:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        return iter([])  # Empty iterator -- no suspension
        # But the await expression evaluates to None, not self.value

If one suspension is acceptable, a single bare yield followed by return is the simplest generator form:

class Immediate:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        # Generator that suspends exactly once, then returns the value
        yield  # Suspend once (the event loop resumes us on its next iteration)
        return self.value

Note

A single yield without a value causes one suspension point. The event loop resumes the coroutine on the next iteration of its loop. For truly zero-cost immediate values, use if False: yield which makes __await__ a generator without actually suspending. However, in practice the single suspension is negligible.
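Since __await__ only has to return an iterator, a hand-written iterator class is another way to build a zero-suspension awaitable that still returns a value. The sketch below relies on the fact that a StopIteration raised from __next__ carries the result in its value attribute; _Finished is a hypothetical helper name, not part of asyncio.

```python
import asyncio


class _Finished:
    """Iterator that is exhausted immediately and carries a return value."""

    def __init__(self, value):
        self._value = value

    def __iter__(self):
        return self

    def __next__(self):
        # Nothing is ever yielded; StopIteration.value becomes the
        # result of the await expression
        raise StopIteration(self._value)


class Immediate:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        return _Finished(self.value)  # A plain function returning an iterator


async def main():
    return await Immediate(42)


result = asyncio.run(main())
print(result)  # 42
```

In practice the if False: yield generator is shorter; this version mainly shows that "iterator" and "generator" are not synonyms here.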

Delegating to Other Awaitables

The most common pattern is delegating to existing awaitables using yield from:

class DelayedValue:
    """Return a value after a delay."""

    def __init__(self, value, delay: float):
        self.value = value
        self.delay = delay

    def __await__(self):
        # Delegate to asyncio.sleep's __await__
        yield from asyncio.sleep(self.delay).__await__()
        return self.value


async def main():
    result = await DelayedValue("hello", 0.5)
    print(result)  # hello (after 0.5 seconds)

asyncio.run(main())

The yield from delegates the entire iteration protocol to asyncio.sleep()'s iterator. Every yield from sleep() passes through, and when sleep() finishes, execution continues with return self.value.
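Delegation also carries exceptions: anything raised inside the delegate surfaces at the yield from, so __await__ can handle it. The following sketch wraps asyncio.wait_for to return a fallback on timeout. WithTimeout, fast, and slow are hypothetical names chosen for the example.

```python
import asyncio


class WithTimeout:
    """Hypothetical awaitable: run factory() but fall back after a timeout."""

    def __init__(self, factory, seconds, default=None):
        self._factory = factory
        self._seconds = seconds
        self._default = default

    def __await__(self):
        try:
            # Exceptions raised by the delegate surface at the yield from
            result = yield from asyncio.wait_for(
                self._factory(), self._seconds
            ).__await__()
        except asyncio.TimeoutError:
            return self._default
        return result


async def fast():
    await asyncio.sleep(0.01)
    return "fast result"


async def slow():
    await asyncio.sleep(1)
    return "slow result"


async def main():
    first = await WithTimeout(fast, seconds=0.5, default="fallback")
    second = await WithTimeout(slow, seconds=0.05, default="fallback")
    return first, second


first, second = asyncio.run(main())
print(first)   # fast result
print(second)  # fallback
```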

Part 3 -- Future Internals

asyncio.Future is the foundational awaitable in asyncio. Understanding its __await__ implementation reveals how the event loop suspends and resumes coroutines.

Simplified Future Implementation

import asyncio

class SimpleFuture:
    """Simplified version of asyncio.Future to show the protocol."""

    _PENDING = "PENDING"
    _FINISHED = "FINISHED"

    def __init__(self):
        self._state = self._PENDING
        self._result = None
        self._exception = None
        self._callbacks = []

    def set_result(self, result):
        if self._state != self._PENDING:
            raise RuntimeError("Future already resolved")
        self._result = result
        self._state = self._FINISHED
        self._schedule_callbacks()

    def set_exception(self, exception):
        if self._state != self._PENDING:
            raise RuntimeError("Future already resolved")
        self._exception = exception
        self._state = self._FINISHED
        self._schedule_callbacks()

    def result(self):
        if self._state != self._FINISHED:
            raise RuntimeError("Result not yet available")
        if self._exception is not None:
            raise self._exception
        return self._result

    def done(self):
        return self._state == self._FINISHED

    def add_done_callback(self, callback):
        if self._state == self._FINISHED:
            # Already done -- schedule callback immediately
            asyncio.get_event_loop().call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def _schedule_callbacks(self):
        for callback in self._callbacks:
            asyncio.get_event_loop().call_soon(callback, self)
        self._callbacks.clear()

    def __await__(self):
        if not self.done():
            # THIS IS THE KEY: yield self to whoever drives the coroutine.
            # The driver sees the yielded Future and registers a
            # callback on it. When set_result() is called,
            # the callback resumes the coroutine.
            yield self
        # When we resume, the result is available
        return self.result()

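The critical insight: when a Future yields itself during __await__, the object travels up to the Task that is driving the coroutine. The Task then calls future.add_done_callback(...) with its own wakeup method. When someone later calls future.set_result(), the callback is scheduled on the event loop, and the Task resumes the coroutine that was waiting. Note that asyncio's Task only accepts real Future objects (it checks an internal _asyncio_future_blocking flag), so SimpleFuture illustrates the protocol but cannot be awaited under asyncio.run() as written.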
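The yield-then-callback handshake can be reproduced without asyncio at all. The sketch below is a toy model under simplifying assumptions (no exceptions, no cancellation, a single task). ToyFuture, ToyTask, and run are hypothetical names and are not how asyncio is actually structured internally.

```python
from collections import deque


class ToyFuture:
    """Minimal future: holds a result and callbacks (no error handling)."""

    def __init__(self):
        self._done = False
        self._result = None
        self._callbacks = []

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            callback(self)
        self._callbacks.clear()

    def add_done_callback(self, callback):
        if self._done:
            callback(self)
        else:
            self._callbacks.append(callback)

    def __await__(self):
        if not self._done:
            yield self  # Hand the future to whoever drives the coroutine
        return self._result


class ToyTask:
    """Drives one coroutine, re-queuing itself when a yielded future resolves."""

    def __init__(self, coro, ready):
        self._coro = coro
        self._ready = ready
        self.done = False
        self.result = None
        ready.append(self._step)

    def _step(self):
        try:
            future = self._coro.send(None)  # Run until the next suspension
        except StopIteration as stop:
            self.done = True
            self.result = stop.value
        else:
            # The coroutine is parked on `future`; wake it when it resolves
            future.add_done_callback(lambda _fut: self._ready.append(self._step))


def run(coro, external_events):
    """Tiny loop: run ready steps, else fire the next simulated external event."""
    ready = deque()
    events = deque(external_events)
    task = ToyTask(coro, ready)
    while not task.done:
        if ready:
            ready.popleft()()
        elif events:
            events.popleft()()
        else:
            raise RuntimeError("nothing left to run, but the task is not done")
    return task.result


fut = ToyFuture()


async def waiter():
    value = await fut
    return value + 1


result = run(waiter(), [lambda: fut.set_result(41)])
print(result)  # 42
```

The external event stands in for I/O readiness or a timer: it resolves the future, the callback re-queues the task's step, and the next step resumes the coroutine past its await.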

How asyncio.sleep() Works

async def sleep(delay):
    """Simplified version of asyncio.sleep."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Schedule setting the result after the delay
    loop.call_later(delay, future.set_result, None)

    # Suspend until the future is resolved
    await future

This is the entire mechanism:

  1. Create a Future.
  2. Schedule something that will set the Future's result.
  3. await the Future (which yields it to the event loop).
  4. The event loop registers a callback to resume us when the Future resolves.
  5. When call_later fires, it calls future.set_result(), which triggers the callback, which resumes our coroutine.
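The same five steps work for any trigger, not only timers. As a sketch, here is a one-shot signal built directly on a real asyncio Future. OneShot and fire are hypothetical names, and the timer in demo only simulates some external event.

```python
import asyncio


class OneShot:
    """Hypothetical awaitable resolved by an explicit call to fire()."""

    def __init__(self):
        self._future = None

    def _ensure_future(self):
        # Step 1: create the Future lazily, on the loop that is running
        if self._future is None:
            self._future = asyncio.get_running_loop().create_future()
        return self._future

    def fire(self, value):
        # Step 5: resolving the Future wakes up every waiting coroutine
        future = self._ensure_future()
        if not future.done():
            future.set_result(value)

    def __await__(self):
        # Steps 3 and 4: delegate to the Future, which yields itself
        return self._ensure_future().__await__()


async def demo():
    signal = OneShot()
    # Step 2: schedule something that will resolve the Future later
    asyncio.get_running_loop().call_later(0.01, signal.fire, "ready")
    return await signal


result = asyncio.run(demo())
print(result)  # ready
```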

Part 4 -- Building Custom Awaitables

Pattern 1: Lazy Async Singleton

import asyncio

class AsyncSingleton:
    """Awaitable that initializes a resource once and caches it.

    Multiple coroutines can await this concurrently -- only one
    initialization occurs.
    """

    def __init__(self, factory):
        self._factory = factory
        self._instance = None
        self._lock = asyncio.Lock()
        self._initialized = False

    def __await__(self):
        return self._get_instance().__await__()

    async def _get_instance(self):
        if self._initialized:
            return self._instance

        async with self._lock:
            # Double-check after acquiring lock
            if not self._initialized:
                self._instance = await self._factory()
                self._initialized = True

        return self._instance


# Usage
async def create_db_pool():
    print("Creating database pool (expensive)")
    await asyncio.sleep(0.5)
    return {"pool": "connected", "size": 10}

db_pool = AsyncSingleton(create_db_pool)

async def handler_1():
    pool = await db_pool  # First call creates the pool
    print(f"Handler 1: {pool}")

async def handler_2():
    pool = await db_pool  # Second call reuses the cached pool
    print(f"Handler 2: {pool}")

async def main():
    await asyncio.gather(handler_1(), handler_2())

asyncio.run(main())
# Output:
# Creating database pool (expensive)
# Handler 1: {'pool': 'connected', 'size': 10}
# Handler 2: {'pool': 'connected', 'size': 10}

Pattern 2: Cached Awaitable with TTL

import asyncio
import time

class CachedAwaitable:
    """An awaitable that caches its result for a given TTL."""

    def __init__(self, coro_factory, ttl: float = 60.0):
        self._factory = coro_factory
        self._ttl = ttl
        self._cached_result = None
        self._cached_at = 0.0
        # Explicit flag, so a None result can be cached and invalidate()
        # does not depend on the absolute value of time.monotonic()
        self._has_value = False
        self._lock = asyncio.Lock()

    def __await__(self):
        return self._get().__await__()

    def _is_fresh(self) -> bool:
        return self._has_value and time.monotonic() - self._cached_at < self._ttl

    async def _get(self):
        if self._is_fresh():
            return self._cached_result

        async with self._lock:
            # Double-check after lock
            if self._is_fresh():
                return self._cached_result

            self._cached_result = await self._factory()
            self._cached_at = time.monotonic()  # Timestamp after the fetch completes
            self._has_value = True
            return self._cached_result

    def invalidate(self):
        """Force cache miss on next await."""
        self._has_value = False


# Usage
async def fetch_config():
    print("Fetching remote config...")
    await asyncio.sleep(0.1)
    return {"feature_flags": {"new_ui": True}, "version": 3}

config = CachedAwaitable(fetch_config, ttl=300.0)

async def main():
    c1 = await config  # Fetches
    c2 = await config  # Cache hit
    print(c1 is c2)    # True

    config.invalidate()
    c3 = await config  # Fetches again
    print(c3)

asyncio.run(main())
# Output:
# Fetching remote config...
# True
# Fetching remote config...
# {'feature_flags': {'new_ui': True}, 'version': 3}

Pattern 3: Retry Awaitable

import asyncio

class RetryAwaitable:
    """An awaitable that retries a coroutine on failure."""

    def __init__(self, coro_factory, max_retries: int = 3,
                 backoff: float = 1.0, exceptions=(Exception,)):
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")
        self._factory = coro_factory
        self._max_retries = max_retries
        self._backoff = backoff
        self._exceptions = exceptions

    def __await__(self):
        return self._execute().__await__()

    async def _execute(self):
        last_error = None
        for attempt in range(self._max_retries):
            try:
                return await self._factory()
            except self._exceptions as e:
                last_error = e
                if attempt < self._max_retries - 1:
                    # Exponential backoff: backoff, 2*backoff, 4*backoff, ...
                    delay = self._backoff * (2 ** attempt)
                    await asyncio.sleep(delay)
        raise last_error


# Usage
call_count = 0

async def flaky_api_call():
    global call_count
    call_count += 1
    if call_count < 3:
        raise ConnectionError(f"Attempt {call_count} failed")
    return {"status": "ok"}

async def main():
    result = await RetryAwaitable(
        flaky_api_call,
        max_retries=5,
        backoff=0.1,
        exceptions=(ConnectionError,)
    )
    print(result)

asyncio.run(main())
# Output: {'status': 'ok'}

Part 5 -- Wrapping Synchronous APIs

A common need is wrapping blocking synchronous code so it can be awaited without blocking the event loop:

import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

class AsyncWrapper:
    """Make a blocking function awaitable by running it in a thread."""

    _executor = ThreadPoolExecutor(max_workers=4)

    def __init__(self, func, *args, **kwargs):
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def __await__(self):
        return self._run().__await__()

    async def _run(self):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self._executor,
            functools.partial(self._func, *self._args, **self._kwargs)
        )


# Usage: wrap a blocking, CPU-heavy call
import hashlib

def compute_hash(data: bytes) -> str:
    """CPU-intensive blocking function."""
    return hashlib.pbkdf2_hmac('sha256', data, b'salt', 100000).hex()

async def main():
    # Without wrapping -- blocks the event loop
    # result = compute_hash(b"password")

    # With wrapping -- runs in thread pool
    result = await AsyncWrapper(compute_hash, b"password")
    print(f"Hash: {result[:16]}...")

asyncio.run(main())

A decorator version:

def make_async(func):
    """Decorator to make a sync function awaitable."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return AsyncWrapper(func, *args, **kwargs)
    return wrapper

@make_async
def blocking_read(path: str) -> str:
    with open(path) as f:
        return f.read()

async def main():
    content = await blocking_read("/etc/hostname")
    print(content)
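For simple cases on Python 3.9 and later, the standard library's asyncio.to_thread covers the same ground without a custom class. A minimal sketch, where blocking_square is a made-up stand-in for a blocking call:

```python
import asyncio
import time


def blocking_square(n: int) -> int:
    """Stand-in for a blocking call (hypothetical example function)."""
    time.sleep(0.05)
    return n * n


async def main():
    # Each call runs in the default thread pool; gather preserves order
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_square, n) for n in range(4))
    )


results = asyncio.run(main())
print(results)  # [0, 1, 4, 9]
```

A custom wrapper still earns its place when you need a dedicated executor, as AsyncWrapper does with its own ThreadPoolExecutor.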

Part 6 -- Coroutines Under the Hood

Generator-Based History

Before Python 3.5, coroutines were generators:

# Python 3.4 style (historical reference only: @asyncio.coroutine
# was deprecated in Python 3.8 and removed in Python 3.11)
import asyncio

@asyncio.coroutine
def old_style_coro():
    result = yield from asyncio.sleep(1)
    return result

Python 3.5 introduced async def / await as dedicated syntax, but the underlying mechanism is the same: coroutines are specialized generators. The CPython implementation uses the same frame and generator machinery.

Inspecting Coroutine State

import asyncio
import inspect

async def example():
    await asyncio.sleep(0)
    return 42

coro = example()

print(inspect.getcoroutinestate(coro))   # CORO_CREATED
print(inspect.getcoroutinelocals(coro))  # {}

# Drive it manually (DO NOT do this in real code)
try:
    coro.send(None)  # Start the coroutine; it runs to the first suspension
except StopIteration as e:
    # Only reached if the coroutine returns without ever suspending
    pass

print(inspect.getcoroutinestate(coro)) # CORO_SUSPENDED

coro.close()
print(inspect.getcoroutinestate(coro)) # CORO_CLOSED

The Coroutine Wrapper Chain

When you write nested await expressions, a chain of __await__ iterators forms:

async def inner():
    await asyncio.sleep(0.1)  # Yields a Future to the event loop
    return "inner result"

async def middle():
    result = await inner()  # yield from inner().__await__()
    return f"middle got: {result}"

async def outer():
    result = await middle()  # yield from middle().__await__()
    return f"outer got: {result}"

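The Future yielded inside asyncio.sleep() bubbles all the way up through the chain to the Task driving outer(). When the timer fires, the Task calls send() on the outermost coroutine, and that call propagates back down through the chain to the innermost suspension point.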
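The bubbling is easy to observe by driving a chain by hand. In this sketch a hypothetical Leaf awaitable stands in for asyncio.sleep() and yields a plain string instead of a Future, so no event loop is needed.

```python
class Leaf:
    """Hypothetical innermost awaitable; yields a marker instead of a Future."""

    def __await__(self):
        reply = yield "leaf is waiting"  # Travels up to the driver
        return reply                     # Whatever the driver sends back


async def inner():
    return await Leaf()


async def middle():
    value = await inner()
    return f"middle({value})"


async def outer():
    value = await middle()
    return f"outer({value})"


coro = outer()

# The value yielded three levels down arrives at the outermost send()
message = coro.send(None)
print(message)  # leaf is waiting

# The value sent in at the top is delivered to the innermost yield
try:
    coro.send("data")
except StopIteration as stop:
    final = stop.value

print(final)  # outer(middle(data))
```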

Part 7 -- Advanced Patterns

Awaitable Property Descriptor

class AsyncProperty:
    """A descriptor that makes a property awaitable with caching."""

    def __init__(self, func):
        self.func = func
        self.attr_name = f"_async_prop_{func.__name__}"

    def __set_name__(self, owner, name):
        self.attr_name = f"_async_prop_{name}"

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self

        class Awaitable:
            def __await__(inner_self):
                return self._resolve(obj).__await__()

        return Awaitable()

    async def _resolve(self, obj):
        cached = getattr(obj, self.attr_name, None)
        if cached is not None:
            return cached
        result = await self.func(obj)
        setattr(obj, self.attr_name, result)
        return result


class User:
    def __init__(self, user_id: int):
        self.user_id = user_id

    @AsyncProperty
    async def profile(self):
        """Lazily fetch and cache user profile."""
        print(f"Fetching profile for user {self.user_id}")
        await asyncio.sleep(0.1)
        return {"id": self.user_id, "name": f"User {self.user_id}"}


async def main():
    user = User(42)
    profile1 = await user.profile  # Fetches
    profile2 = await user.profile  # Cache hit
    print(profile1)
    print(profile1 is profile2)    # True

asyncio.run(main())
# Output:
# Fetching profile for user 42
# {'id': 42, 'name': 'User 42'}
# True

Composable Awaitable Chain

import asyncio
import inspect

class AsyncPipe:
    """Chain of async transformations that can be awaited (once)."""

    def __init__(self, coro_or_value):
        if asyncio.iscoroutine(coro_or_value):
            self._coro = coro_or_value
        else:
            async def _wrap():
                return coro_or_value
            self._coro = _wrap()

    def then(self, func):
        """Chain a transformation. func receives the previous result."""
        async def chained():
            result = await self._coro
            if inspect.iscoroutinefunction(func):
                return await func(result)
            return func(result)
        return AsyncPipe(chained())

    def __await__(self):
        # The pipe wraps a single coroutine object, so it can only be awaited once
        return self._coro.__await__()


# Usage
async def fetch_data(url):
    await asyncio.sleep(0.1)
    return {"items": [1, 2, 3, 4, 5]}

async def main():
    result = await (
        AsyncPipe(fetch_data("https://api.example.com"))
        .then(lambda data: data["items"])
        .then(lambda items: [x * 2 for x in items])
        .then(lambda items: sum(items))
    )
    print(result)  # 30

asyncio.run(main())

Awaitable with Progress Callback

class ProgressAwaitable:
    """An awaitable that reports progress."""

    def __init__(self, total_steps: int, step_func, on_progress=None):
        self.total_steps = total_steps
        self.step_func = step_func
        self.on_progress = on_progress

    def __await__(self):
        return self._run().__await__()

    async def _run(self):
        results = []
        for i in range(self.total_steps):
            result = await self.step_func(i)
            results.append(result)
            if self.on_progress:
                progress = (i + 1) / self.total_steps * 100
                self.on_progress(progress)
        return results


async def process_step(step_num):
    await asyncio.sleep(0.05)
    return step_num ** 2

async def main():
    results = await ProgressAwaitable(
        total_steps=10,
        step_func=process_step,
        on_progress=lambda p: print(f"\rProgress: {p:.0f}%", end="", flush=True)
    )
    print(f"\nResults: {results}")

asyncio.run(main())
# Output:
# Progress: 100%
# Results: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Key Takeaways

  • await works on any object with an __await__ method that returns an iterator. This is the only requirement.
  • __await__ typically delegates to another awaitable using yield from other.__await__(). This creates a chain of suspension/resumption through nested coroutines.
  • asyncio.Future.__await__ yields the Future itself. The Task driving the coroutine receives it and registers a callback to resume the coroutine when the Future resolves. This is the fundamental mechanism of async I/O in Python.
  • Coroutines are specialized generators under the hood. async def / await is syntactic sugar over the generator yield from protocol.
  • Custom awaitables enable patterns impossible with plain coroutines: lazy initialization, cached computation, retry logic, progress tracking, and composable pipelines.
  • When wrapping blocking code, run it in an executor and await the resulting Future. This is how asyncio.to_thread() and loop.run_in_executor() work.
  • The __await__ method must return an iterator, most conveniently a generator. Returning a plain value is not sufficient -- use if False: yield, delegate via yield from, or return another awaitable's __await__().

Graded Practice Challenges

Level 1 -- Predict the Output

Question 1: What does this print?

import asyncio

class Always42:
    def __await__(self):
        if False:
            yield
        return 42

async def main():
    x = await Always42()
    y = await Always42()
    print(x + y)

asyncio.run(main())
Answer
84

Each await Always42() creates a new instance, calls __await__(), gets a generator that never yields, and immediately returns 42 via StopIteration(42). No suspension occurs.

Question 2: What does this print?

import asyncio

class ChainedAwait:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        result = yield from asyncio.sleep(0).__await__()
        return self.value * 2

async def main():
    a = await ChainedAwait(5)
    b = await ChainedAwait(a)
    print(b)

asyncio.run(main())
Answer
20

First await returns 5 * 2 = 10. Second await receives 10 and returns 10 * 2 = 20.

Question 3: Does this code work? If not, why?

import asyncio

class BrokenAwaitable:
    def __await__(self):
        return 42

async def main():
    result = await BrokenAwaitable()
    print(result)

asyncio.run(main())
Answer

It raises TypeError: __await__() returned non-iterator of type 'int'. The __await__ method must return an iterator (a generator object or any object with __next__). A plain integer is not an iterator. Fix by adding if False: yield before the return, which turns __await__ into a generator function.

Level 2 -- Debug Challenge

This custom awaitable is supposed to cache the result of an async operation, but it fails when multiple coroutines await it concurrently. Find the bug.

import asyncio

class CachedResult:
    def __init__(self, coro_factory):
        self._factory = coro_factory
        self._result = None
        self._fetched = False

    def __await__(self):
        return self._get().__await__()

    async def _get(self):
        if not self._fetched:
            self._result = await self._factory()
            self._fetched = True
        return self._result


async def expensive_query():
    print("Running expensive query...")
    await asyncio.sleep(1)
    return {"data": [1, 2, 3]}

cached = CachedResult(expensive_query)

async def main():
    # Two coroutines await the same cached result concurrently
    r1, r2 = await asyncio.gather(
        cached.__await__().__asend(None),  # Wrong way to drive it
        cached.__await__().__asend(None),
    )
Answer

There are two bugs:

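  1. Race condition: _fetched is only set after await self._factory() completes. The first coroutine suspends at that await, so the second one still sees _fetched = False and calls _factory() as well, running the expensive query twice.

  2. Incorrect driving: The main() function tries to drive the awaitable manually, which is wrong (the iterator returned by __await__() has no __asend method at all). The fix is straightforward: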

class CachedResult:
    def __init__(self, coro_factory):
        self._factory = coro_factory
        self._result = None
        self._fetched = False
        self._lock = asyncio.Lock()  # Fix 1: Add lock

    def __await__(self):
        return self._get().__await__()

    async def _get(self):
        if self._fetched:
            return self._result

        async with self._lock:  # Fix 1: Serialize initialization
            if not self._fetched:  # Double-check after lock
                self._result = await self._factory()
                self._fetched = True
        return self._result

# Fix 2: Await normally
async def main():
    r1, r2 = await asyncio.gather(
        get_cached(),
        get_cached(),
    )
    print(r1 is r2)  # True

async def get_cached():
    return await cached

Level 3 -- Design Challenge

Design a LazyService class that:

  1. Wraps an async factory function that creates an expensive resource
  2. Is awaitable -- service = await lazy_service -- and returns the initialized resource
  3. Supports async with for lifecycle management (cleanup when done)
  4. Is safe for concurrent access (multiple coroutines can await simultaneously)
  5. Supports re-initialization after the resource is closed
  6. Tracks initialization time and access count
Design Hints
class LazyService:
    def __init__(self, factory, cleanup=None):
        self._factory = factory
        self._cleanup = cleanup
        self._instance = None
        self._lock = asyncio.Lock()
        self._initialized = False
        self._init_time = 0.0
        self._access_count = 0

    def __await__(self):
        return self._get().__await__()

    async def _get(self):
        self._access_count += 1
        if self._initialized:
            return self._instance

        async with self._lock:
            if not self._initialized:
                loop = asyncio.get_running_loop()
                start = loop.time()
                self._instance = await self._factory()
                self._init_time = loop.time() - start
                self._initialized = True
        return self._instance

    async def __aenter__(self):
        return await self

    async def __aexit__(self, *exc):
        if self._initialized and self._cleanup:
            await self._cleanup(self._instance)
        # Reset state so the next await re-initializes the resource
        self._instance = None
        self._initialized = False
        return False

    @property
    def stats(self):
        return {
            "initialized": self._initialized,
            "init_time": self._init_time,
            "access_count": self._access_count,
        }

What's Next

You now understand the machinery behind await. The next lesson, Advanced Event Loop, takes you deeper into the event loop itself -- the scheduler that drives all these awaitables. You will learn about selectors, callbacks, timers, custom event loop policies, and high-performance alternatives like uvloop.

© 2026 EngineersOfAI. All rights reserved.