Python Custom Awaitables Practice Problems & Exercises
Practice: Custom Awaitables
Easy
Use inspect.isawaitable() to classify a set of objects and identify which are awaitable.
import asyncio
import inspect

async def my_coro():
    return 42

class MyAwaitable:
    def __await__(self):
        yield
        return 99

async def main():
    coro = my_coro()
    future = asyncio.Future()
    for label, obj in [
        ("coroutine", coro),
        ("Future", future),
        ("int", 42),
        ("Awaitable class", MyAwaitable()),
    ]:
        print(f"{label} {inspect.isawaitable(obj)}")
    coro.close()  # avoid the "never awaited" warning

asyncio.run(main())
Solution
import asyncio
import inspect

async def my_coro():
    return 42

class MyAwaitable:
    def __await__(self):
        yield
        return 99

async def main():
    coro = my_coro()
    future = asyncio.Future()
    for label, obj in [
        ("coroutine", coro),
        ("Future", future),
        ("int", 42),
        ("Awaitable class", MyAwaitable()),
    ]:
        print(f"{label} {inspect.isawaitable(obj)}")
    coro.close()  # avoid the "never awaited" warning

asyncio.run(main())
Awaitable taxonomy:
- A coroutine object (the result of calling an async def function) is awaitable.
- asyncio.Future and asyncio.Task are awaitable.
- Any object implementing __await__ is awaitable: Python looks the method up on the object's type.
- inspect.isawaitable() is the clean runtime check; it returns True for coroutines, generator-based coroutines, futures, and objects with __await__.
- Alternatively, isinstance(obj, collections.abc.Awaitable) works for any class that defines __await__, with or without explicit registration; unlike inspect.isawaitable(), it returns False for generator-based coroutines.
- Plain callables, ordinary generators, and synchronous objects are NOT awaitable.
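The difference between the two checks can be seen in a small sketch. The names HasAwait, legacy_coro, and classify are hypothetical helpers introduced only for this illustration.

```python
import collections.abc
import inspect
import types

class HasAwait:
    """Hypothetical class: awaitable only because it defines __await__."""
    def __await__(self):
        return iter(())  # any iterator is acceptable here

@types.coroutine
def legacy_coro():
    """Hypothetical generator-based coroutine."""
    yield

def classify(obj):
    # Returns (inspect.isawaitable result, ABC isinstance result).
    return inspect.isawaitable(obj), isinstance(obj, collections.abc.Awaitable)

results = {
    "HasAwait": classify(HasAwait()),
    "generator-based coroutine": classify(legacy_coro()),
    "plain generator": classify(x for x in ()),
    "int": classify(42),
}
for label, pair in results.items():
    print(label, pair)
```

Only the generator-based coroutine is treated differently by the two checks, which is why inspect.isawaitable() is the safer default.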
Expected Output
coroutine True
Future True
int False
Awaitable class True
Hints
Hint 1: Use inspect.isawaitable(obj) to check if an object can be awaited.
Hint 2: Coroutines, Futures, and Tasks are all awaitables. Plain objects are not unless they implement __await__.
Implement the simplest possible custom awaitable class that immediately returns a value when awaited.
import asyncio

class ImmediateValue:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        # yield nothing (complete immediately), return self._value
        return self._value
        yield  # make this a generator

async def main():
    result = await ImmediateValue(42)
    print(f"Result: {result}")

asyncio.run(main())
Solution
import asyncio

class ImmediateValue:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        return self._value
        yield  # unreachable, but makes this a generator function

async def main():
    result = await ImmediateValue(42)
    print(f"Result: {result}")

asyncio.run(main())
How await works:
- __await__ must return an iterator. A generator is the usual choice, but any iterator works.
- The presence of yield anywhere in the function body makes it a generator function, even if the yield is unreachable.
- When the generator returns (by falling off the end or via return value), it raises StopIteration(value). The await machinery catches this and uses the value as the result of the await expression.
- An immediate return with no yield reached means no suspension: the coroutine continues synchronously.
- Contrast this with asyncio.sleep(0), which yields once to the event loop and then returns.
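Because nothing is ever yielded, a coroutine that awaits ImmediateValue can be completed with a single send(None) and no event loop at all. The sketch below assumes that setup; use_it and run_without_loop are hypothetical names introduced for this illustration.

```python
class ImmediateValue:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        return self._value
        yield  # unreachable; only makes this a generator function

async def use_it():
    return (await ImmediateValue(42)) + 1

def run_without_loop(coro):
    """Drive a coroutine that is expected to finish without suspending."""
    try:
        coro.send(None)
    except StopIteration as stop:
        return stop.value  # the coroutine's return value
    coro.close()
    raise RuntimeError("coroutine suspended unexpectedly")

outcome = run_without_loop(use_it())
print(outcome)
```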
Expected Output
Result: 42
Hints
Hint 1: __await__ must return an iterator. Use yield to suspend and return the final value via StopIteration.
Hint 2: A generator function that returns a value raises StopIteration with that value when exhausted.
Create a custom awaitable YieldOnce that suspends the coroutine for one event loop tick, then resumes.
import asyncio

class YieldOnce:
    def __await__(self):
        yield  # suspend for one tick
        # return None implicitly

async def main():
    print("Before await")
    await YieldOnce()
    print("After await — yielded once to event loop")

asyncio.run(main())
Solution
import asyncio

class YieldOnce:
    def __await__(self):
        yield  # yield None to event loop; resumes next iteration

async def main():
    print("Before await")
    await YieldOnce()
    print("After await — yielded once to event loop")

asyncio.run(main())
Suspension mechanics:
- A bare yield inside __await__ passes None up to the Task that is driving the coroutine (its internal step method).
- The Task sees the None, schedules another step with the event loop to resume this coroutine, and lets other ready callbacks run first.
- On the next iteration, the coroutine is resumed at the yield point.
- This is functionally equivalent to await asyncio.sleep(0): both yield to the event loop for exactly one tick.
- Understanding this is the foundation for building custom schedulers, profilers that intercept yield points, and low-level async primitives.
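The effect of the one-tick suspension is easiest to see with two tasks sharing a log. This is a minimal sketch; worker and demo are hypothetical names.

```python
import asyncio

class YieldOnce:
    def __await__(self):
        yield  # hand control back to the event loop for one tick

async def worker(name, log):
    log.append(f"{name}: before")
    await YieldOnce()  # the other task gets its turn here
    log.append(f"{name}: after")

async def demo():
    log = []
    await asyncio.gather(worker("A", log), worker("B", log))
    return log

events = asyncio.run(demo())
print(events)
```

Each worker runs up to its yield before either one resumes, so the "before" entries precede the "after" entries.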
Expected Output
Before await
After await — yielded once to event loop
Hints
Hint 1: yield None inside __await__ tells the event loop to schedule a callback and suspend the current coroutine.
Hint 2: The coroutine resumes on the next event loop iteration.
Medium
Implement a simplified ManualFuture class that can be awaited and resolved from outside.
import asyncio

class ManualFuture:
    def __init__(self):
        self._result = None
        self._done = False
        self._callbacks = []

    def set_result(self, value):
        self._result = value
        self._done = True
        for cb in self._callbacks:
            cb()

    def __await__(self):
        if self._done:
            return self._result
        # Not done yet: keep yielding until the result is set.
        # This is a simplified (polling) version.
        while not self._done:
            yield
        return self._result

async def main():
    future = ManualFuture()

    async def resolver():
        await asyncio.sleep(0.01)
        future.set_result("hello from the future")

    asyncio.create_task(resolver())
    result = await future
    print(f"Future resolved with: {result}")

asyncio.run(main())
Solution
import asyncio

class ManualFuture:
    def __init__(self):
        self._result = None
        self._done = False

    def set_result(self, value):
        self._result = value
        self._done = True

    def __await__(self):
        while not self._done:
            yield  # keep suspending until done
        return self._result

async def main():
    future = ManualFuture()

    async def resolver():
        await asyncio.sleep(0.01)
        future.set_result("hello from the future")

    asyncio.create_task(resolver())
    result = await future
    print(f"Future resolved with: {result}")

asyncio.run(main())
Custom Future design:
- The while not self._done: yield loop keeps suspending the coroutine (returning to the event loop each tick) until the future is resolved.
- This is a polling approach. The real asyncio.Future uses a callback-based wake-up to avoid busy-looping.
- In the real implementation, Future.__await__ yields the future object itself; the Task driving the coroutine then registers a done callback on it, and that callback resumes the coroutine once set_result() is called.
- This exercise demonstrates that Futures are just cleverly designed awaitables with external resolution.
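To make the cost of polling visible, the sketch below counts how many times the loop wakes the coroutine before the result arrives. CountingFuture and its polls attribute are hypothetical additions for this illustration; the exact count depends on the machine.

```python
import asyncio

class CountingFuture:
    """Polling future that records how often it was resumed."""
    def __init__(self):
        self._result = None
        self._done = False
        self.polls = 0

    def set_result(self, value):
        self._result = value
        self._done = True

    def __await__(self):
        while not self._done:
            self.polls += 1  # one wake-up per event loop tick
            yield
        return self._result

async def demo():
    fut = CountingFuture()
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, fut.set_result, "ready")
    value = await fut
    return value, fut.polls

value, polls = asyncio.run(demo())
print(f"{value} after {polls} polls")
```

A callback-based asyncio.Future awaited under the same conditions would be resumed once, when set_result() fires.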
Expected Output
Future resolved with: hello from the future
Hints
Hint 1: A Future-like awaitable needs to suspend until an external event sets its result.
Hint 2: Use a callback registered on the event loop to set the result after a delay.
Drive a coroutine manually step-by-step using .send() to understand how the event loop runs coroutines.
import asyncio

async def two_step_coro():
    await asyncio.sleep(0)  # step 1: yields to event loop
    await asyncio.sleep(0)  # step 2: yields again
    return 100

# Most asyncio awaitables expect a running loop, so driving them by hand is awkward.
# Instead, use a simple generator-based coroutine to demonstrate the protocol.
def simple_coro():
    yield "step1"
    yield "step2"
    return 100

coro = simple_coro()
try:
    val1 = coro.send(None)
    print(f"Step 1: {val1}")
    val2 = coro.send(None)
    print(f"Step 2: {val2}")
    coro.send(None)  # raises StopIteration
except StopIteration as e:
    print(f"Coroutine returned: {e.value}")
Solution
def simple_coro():
    yield "step1"
    yield "step2"
    return 100

coro = simple_coro()
try:
    val1 = coro.send(None)
    print(f"Step 1: {val1}")
    val2 = coro.send(None)
    print(f"Step 2: {val2}")
    coro.send(None)  # generator returns here, raising StopIteration
except StopIteration as e:
    print(f"Coroutine returned: {e.value}")
Coroutine as enhanced generator:
- send(None) advances the generator to the next yield. The yielded value is returned from send().
- When the generator returns, send() raises StopIteration with e.value set to the return value.
- This is essentially how asyncio drives coroutines: a Task calls coro.send(None) once per step and treats StopIteration as the signal that the coroutine is done.
- The difference with async coroutines: they yield futures or None to tell the event loop what they are waiting for, not arbitrary values.
- The first send() must always pass None: sending any other value to a just-started generator raises TypeError.
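The send() rules above, including the TypeError on an early non-None send, can be checked with a small generator. echo is a hypothetical example function, not part of the exercise.

```python
def echo():
    """Yield a greeting, then echo back whatever is sent in."""
    received = yield "ready"
    while received is not None:
        received = yield f"got {received}"
    return "finished"

# Sending a non-None value before the first yield is rejected.
early_send_failed = False
try:
    echo().send("too early")
except TypeError:
    early_send_failed = True

gen = echo()
first = gen.send(None)      # runs to the first yield
second = gen.send("hello")  # value becomes the result of the paused yield
try:
    gen.send(None)          # lets the loop end, so the generator returns
except StopIteration as stop:
    final = stop.value

print(early_send_failed, first, second, final)
```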
Expected Output
Step 1: step1
Step 2: step2
Coroutine returned: 100
Hints
Hint 1: A coroutine is a generator under the hood. Drive it manually with .send(None) to advance past each yield.
Hint 2: When the coroutine returns, .send() raises StopIteration with the return value.
Create a LoggedFuture that wraps asyncio.Future and logs when it is awaited, by delegating via yield from.
import asyncio

class LoggedFuture:
    def __init__(self, future):
        self._future = future

    def __await__(self):
        print("LoggedFuture: starting await")
        result = yield from self._future.__await__()
        print("LoggedFuture: await complete")
        return result

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    async def resolver():
        await asyncio.sleep(0.01)
        fut.set_result(99)

    asyncio.create_task(resolver())
    result = await LoggedFuture(fut)
    print(f"LoggedFuture resolved: {result}")

asyncio.run(main())
Solution
import asyncio

class LoggedFuture:
    def __init__(self, future):
        self._future = future

    def __await__(self):
        print("LoggedFuture: starting await")
        # Delegate every suspension to the wrapped future.
        result = yield from self._future.__await__()
        print("LoggedFuture: await complete")
        return result

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    async def resolver():
        await asyncio.sleep(0.01)
        fut.set_result(99)

    asyncio.create_task(resolver())
    result = await LoggedFuture(fut)
    print(f"LoggedFuture resolved: {result}")

asyncio.run(main())
Delegation with yield from:
- yield from iterable in a generator delegates to the sub-iterator completely: it forwards send() calls and throw() calls, and receives the final StopIteration value.
- yield from self._future.__await__() means: run the future's awaiting logic inside this generator, suspending as needed, and return the final result.
- This is roughly how await works internally: await obj behaves like yield from obj.__await__(), with an extra check that obj is awaitable.
- Delegation allows wrapping and decorating awaitables without rewriting their suspension logic.
- The print statements run at entry and exit, showing that the wrapper adds behaviour around the actual suspension.
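Delegation also carries exceptions: if the wrapped future fails, the error surfaces at the yield from line, where the wrapper can observe it. A minimal sketch, with TracedAwaitable and demo as hypothetical names:

```python
import asyncio

class TracedAwaitable:
    """Wrapper that records start, failure, and end of the inner await."""
    def __init__(self, inner, log):
        self._inner = inner
        self._log = log

    def __await__(self):
        self._log.append("start")
        try:
            return (yield from self._inner.__await__())
        except Exception as exc:
            self._log.append(f"error: {type(exc).__name__}")
            raise  # let the caller see the original exception
        finally:
            self._log.append("end")

async def demo():
    loop = asyncio.get_running_loop()
    log = []
    fut = loop.create_future()
    loop.call_soon(fut.set_exception, ValueError("boom"))
    try:
        await TracedAwaitable(fut, log)
    except ValueError:
        log.append("caller saw ValueError")
    return log

trace = asyncio.run(demo())
print(trace)
```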
Expected Output
LoggedFuture: starting await
LoggedFuture: await complete
LoggedFuture resolved: 99
Hints
Hint 1: Use yield from inside __await__ to delegate to another awaitable's __await__ iterator.
Hint 2: yield from automatically handles the full iteration protocol including sending values and propagating exceptions.
Create an awaitable that validates it is being awaited inside an active event loop, raising a clear error otherwise.
import asyncio

class LoopGuardedAwaitable:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        # check asyncio.get_running_loop(); raise RuntimeError if no loop
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            raise RuntimeError("LoopGuardedAwaitable must be awaited inside an async context")
        print("Loop context validated")
        return self._value
        yield

async def main():
    result = await LoopGuardedAwaitable("done")
    print(f"Result: {result}")

asyncio.run(main())
Solution
import asyncio

class LoopGuardedAwaitable:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            raise RuntimeError("LoopGuardedAwaitable must be awaited inside an async context")
        print("Loop context validated")
        return self._value
        yield  # makes this a generator

async def main():
    result = await LoopGuardedAwaitable("done")
    print(f"Result: {result}")

asyncio.run(main())
Loop validation pattern:
- asyncio.get_running_loop() raises RuntimeError if called from a thread with no running event loop.
- Inside an async def coroutine being driven by an event loop, it always succeeds.
- This guard gives a helpful error message when users mistakenly drive the awaitable from a sync context. Because __await__ is a generator function, the check runs when the awaitable is first driven, not when it is created.
- Production use: async libraries can use this kind of guard to provide clear "you must use async" error messages.
- asyncio.get_event_loop() is a poor fit for validation: in older Python versions it could silently create a new loop instead of raising, and its behaviour without a running loop has changed across releases. Prefer get_running_loop().
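The failure path can be exercised by driving the coroutine by hand, with no event loop running. This is a sketch under that assumption; use_guard, drive_without_loop, and the MESSAGE constant are hypothetical names.

```python
import asyncio

MESSAGE = "LoopGuardedAwaitable must be awaited inside an async context"

class LoopGuardedAwaitable:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            raise RuntimeError(MESSAGE) from None
        return self._value
        yield  # unreachable; makes this a generator function

async def use_guard():
    return await LoopGuardedAwaitable("done")

def drive_without_loop():
    """Step the coroutine manually, outside any event loop."""
    coro = use_guard()
    try:
        coro.send(None)
    except RuntimeError as exc:
        return str(exc)
    except StopIteration as stop:
        return f"unexpected success: {stop.value}"
    finally:
        coro.close()

outside = drive_without_loop()      # guard rejects the call
inside = asyncio.run(use_guard())   # guard passes under a running loop
print(outside)
print(inside)
```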
Expected Output
Loop context validated
Result: done
Hints
Hint 1: Call asyncio.get_running_loop() inside __await__ to check if there is an active event loop.
Hint 2: Raise RuntimeError if there is no running loop to give a clear error message.
Hard
Implement a minimal cooperative scheduler that runs multiple coroutines concurrently by interleaving them.
from collections import deque

def scheduler(*coros):
    """Run coroutines cooperatively. Each yield point gives others a turn."""
    queue = deque(coros)
    while queue:
        coro = queue.popleft()
        try:
            coro.send(None)  # advance to next yield
            queue.append(coro)  # put back if not done
        except StopIteration:
            pass  # coroutine finished

def task_a():
    print("A: step 1")
    yield
    print("A: step 2")
    yield
    print("A done")

def task_b():
    print("B: step 1")
    yield
    print("B: step 2")
    yield
    print("B done")

scheduler(task_a(), task_b())
Solution
from collections import deque

def scheduler(*coros):
    queue = deque(coros)
    while queue:
        coro = queue.popleft()
        try:
            coro.send(None)
            queue.append(coro)
        except StopIteration:
            pass

def task_a():
    print("A: step 1")
    yield
    print("A: step 2")
    yield
    print("A done")

def task_b():
    print("B: step 1")
    yield
    print("B: step 2")
    yield
    print("B done")

scheduler(task_a(), task_b())
Minimal event loop:
- This 10-line scheduler is the conceptual core of any event loop, including asyncio's.
- Each yield is a suspension point: a voluntary handoff to the scheduler.
- The scheduler round-robins through ready coroutines: dequeue, advance one step, re-enqueue.
- A production event loop extends this with: I/O readiness (via select/epoll), timers (call_later), callbacks, thread executors, and signal handling.
- The key insight: "async" in Python is cooperative multitasking. Coroutines voluntarily yield control; the scheduler never preempts them.
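As one illustration of such an extension, the sketch below adds timers to the round-robin scheduler. It assumes a convention invented for this example: a task yields a number of seconds to sleep, or a bare yield to simply give up its turn. The names run and napper are hypothetical.

```python
import heapq
import time
from collections import deque

def run(*coros):
    """Round-robin scheduler with a timer heap (illustrative only)."""
    ready = deque(coros)
    sleeping = []  # heap of (wake_time, sequence, coro)
    sequence = 0   # tie-breaker so coroutines are never compared
    while ready or sleeping:
        now = time.monotonic()
        # Move every sleeper whose deadline has passed to the ready queue.
        while sleeping and sleeping[0][0] <= now:
            ready.append(heapq.heappop(sleeping)[2])
        if not ready:
            # Nothing runnable: block until the earliest deadline.
            time.sleep(max(0.0, sleeping[0][0] - time.monotonic()))
            continue
        coro = ready.popleft()
        try:
            delay = coro.send(None)
        except StopIteration:
            continue  # task finished
        if delay:
            sequence += 1
            heapq.heappush(sleeping, (time.monotonic() + delay, sequence, coro))
        else:
            ready.append(coro)

def napper(name, delay, log):
    log.append(f"{name} start")
    yield delay  # ask the scheduler to sleep
    log.append(f"{name} done")

log = []
run(napper("slow", 0.02, log), napper("fast", 0.01, log))
print(log)
```

The task with the shorter delay finishes first even though it was started second, because wake-ups are ordered by deadline rather than by queue position.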
Expected Output
A: step 1
B: step 1
A: step 2
B: step 2
A done
B done
Hints
Hint 1: A scheduler is a loop that drives coroutines by calling .send(None) until they raise StopIteration.
Hint 2: Use a deque as a ready queue. When a coroutine yields, add it back to the queue.
Build AwaitableWithTimeout(value, delay, timeout) that resolves with value after delay seconds, but raises TimeoutError if timeout is exceeded first.
import asyncio

class AwaitableWithTimeout:
    def __init__(self, value, delay, timeout):
        self._value = value
        self._delay = delay
        self._timeout = timeout

    def __await__(self):
        loop = asyncio.get_running_loop()
        fut = loop.create_future()

        def on_result():
            if not fut.done():
                fut.set_result(self._value)

        def on_timeout():
            if not fut.done():
                fut.set_exception(TimeoutError(f"Timeout after {self._timeout}s"))

        loop.call_later(self._delay, on_result)
        loop.call_later(self._timeout, on_timeout)
        return (yield from fut.__await__())

async def main():
    # Case 1: result arrives before timeout
    result = await AwaitableWithTimeout("data", delay=0.01, timeout=0.05)
    print(f"Result arrived: {result}")
    # Case 2: timeout fires first
    try:
        await AwaitableWithTimeout("data", delay=0.1, timeout=0.05)
    except TimeoutError:
        print("Timeout hit after 0.05s")

asyncio.run(main())
Solution
import asyncio

class AwaitableWithTimeout:
    def __init__(self, value, delay, timeout):
        self._value = value
        self._delay = delay
        self._timeout = timeout

    def __await__(self):
        loop = asyncio.get_running_loop()
        fut = loop.create_future()

        def on_result():
            if not fut.done():
                fut.set_result(self._value)

        def on_timeout():
            if not fut.done():
                fut.set_exception(TimeoutError(f"Timeout after {self._timeout}s"))

        loop.call_later(self._delay, on_result)
        loop.call_later(self._timeout, on_timeout)
        return (yield from fut.__await__())

async def main():
    result = await AwaitableWithTimeout("data", delay=0.01, timeout=0.05)
    print(f"Result arrived: {result}")
    try:
        await AwaitableWithTimeout("data", delay=0.1, timeout=0.05)
    except TimeoutError:
        print("Timeout hit after 0.05s")

asyncio.run(main())
Custom timeout awaitable:
- loop.call_later(delay, fn) schedules fn to run after delay seconds; it is the primitive underlying asyncio.sleep.
- The first callback to fire wins: the fut.done() check prevents double-setting.
- yield from fut.__await__() delegates suspension to the real Future, which wakes up when set_result or set_exception is called.
- This demonstrates the composability of the awaitable protocol: we build higher-level behaviour by wrapping lower-level awaitables.
- Production version: also cancel the unused callback to avoid resource leaks.
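One possible shape for that clean-up is sketched below: keep the TimerHandle objects returned by call_later and cancel them in a finally block. CancellingTimeout and its handles attribute are hypothetical names; the attribute exists only so the example can inspect the timers afterwards.

```python
import asyncio

class CancellingTimeout:
    """Variant of the exercise class that cancels its timers on exit."""
    def __init__(self, value, delay, timeout):
        self._value = value
        self._delay = delay
        self._timeout = timeout
        self.handles = ()

    def __await__(self):
        loop = asyncio.get_running_loop()
        fut = loop.create_future()

        def on_result():
            if not fut.done():
                fut.set_result(self._value)

        def on_timeout():
            if not fut.done():
                fut.set_exception(TimeoutError(f"Timeout after {self._timeout}s"))

        self.handles = (
            loop.call_later(self._delay, on_result),
            loop.call_later(self._timeout, on_timeout),
        )
        try:
            return (yield from fut.__await__())
        finally:
            # Whichever timer has not fired yet is no longer needed.
            for handle in self.handles:
                handle.cancel()

async def demo():
    ok = CancellingTimeout("data", delay=0.01, timeout=0.05)
    value = await ok
    try:
        await CancellingTimeout("data", delay=0.05, timeout=0.01)
        timed_out = False
    except TimeoutError:
        timed_out = True
    return value, timed_out, all(h.cancelled() for h in ok.handles)

value, timed_out, timers_cancelled = asyncio.run(demo())
print(value, timed_out, timers_cancelled)
```

The finally block also runs if the awaiting task is cancelled, so neither timer outlives the await.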
Expected Output
Result arrived: data
Timeout hit after 0.05s
Hints
Hint 1: Use loop.call_later() to schedule a timeout callback that sets a cancel flag or cancels a Future.
Hint 2: The awaitable should yield to the event loop while waiting, and stop when either the result or timeout fires.
Build a middleware wrapper for any awaitable that logs before and after the await completes.
import asyncio

class MiddlewareAwaitable:
    def __init__(self, inner, name):
        self._inner = inner
        self._name = name

    def __await__(self):
        print(f"[before] calling {self._name}")
        result = yield from self._inner.__await__()
        print(f"[after] result: {result}")
        return result

class DelayedValue:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        yield  # one tick
        return self._value

async def add_one(n):
    val = await DelayedValue(n)
    return val + 1

async def main():
    coro = add_one(42)
    wrapped = MiddlewareAwaitable(coro, "add_one")
    result = await wrapped
    print(f"Final: {result}")

asyncio.run(main())
Solution
import asyncio

class MiddlewareAwaitable:
    def __init__(self, inner, name):
        self._inner = inner
        self._name = name

    def __await__(self):
        print(f"[before] calling {self._name}")
        # Delegate all suspension points to the wrapped awaitable.
        result = yield from self._inner.__await__()
        print(f"[after] result: {result}")
        return result

class DelayedValue:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        yield
        return self._value

async def add_one(n):
    val = await DelayedValue(n)
    return val + 1

async def main():
    coro = add_one(42)
    wrapped = MiddlewareAwaitable(coro, "add_one")
    result = await wrapped
    print(f"Final: {result}")

asyncio.run(main())
Middleware pattern:
- The middleware awaitable wraps any awaitable without modifying it.
- yield from inner.__await__() transparently handles all suspension points inside the inner awaitable.
- Code before yield from runs before any suspension. Code after runs after the inner awaitable resolves.
- This pattern enables tracing, timing, retrying, and rate-limiting at the awaitable protocol level.
- Production use: observability tooling can wrap awaitables in this way to create tracing spans around them.
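As an example of the timing use case, the sketch below measures how long the inner awaitable takes. Timed, slow_double, and the elapsed attribute are hypothetical names; the wrapper assumes the inner object exposes __await__, which coroutine objects and asyncio futures do.

```python
import asyncio
import time

class Timed:
    """Wrapper that records wall-clock time spent in the inner awaitable."""
    def __init__(self, inner):
        self._inner = inner
        self.elapsed = None

    def __await__(self):
        start = time.perf_counter()
        try:
            return (yield from self._inner.__await__())
        finally:
            # Recorded on success, failure, and cancellation alike.
            self.elapsed = time.perf_counter() - start

async def slow_double(n):
    await asyncio.sleep(0.01)
    return n * 2

async def demo():
    timed = Timed(slow_double(21))
    value = await timed
    return value, timed.elapsed

value, elapsed = asyncio.run(demo())
print(f"{value} in {elapsed:.4f}s")
```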
Expected Output
[before] calling add_one
[after] result: 43
Final: 43
Hints
Hint 1: Wrap an awaitable in another awaitable to add before/after logging.
Hint 2: yield from inner.__await__() in the outer __await__ handles the full awaiting delegation.
Implement a semaphore from first principles using asyncio.Future and the __await__ protocol.
import asyncio

class ManualSemaphore:
    def __init__(self, value):
        self._value = value
        self._waiters = []

    def acquire(self):
        return _SemaphoreAcquire(self)

    def release(self):
        self._value += 1
        # wake up next waiter if any
        while self._waiters:
            fut = self._waiters.pop(0)
            if not fut.done():
                self._value -= 1
                fut.set_result(None)
                break

class _SemaphoreAcquire:
    def __init__(self, sem):
        self._sem = sem

    def __await__(self):
        if self._sem._value > 0:
            self._sem._value -= 1
            return
        # No permits available: create a Future and wait
        fut = asyncio.get_running_loop().create_future()
        self._sem._waiters.append(fut)
        yield from fut.__await__()

async def worker(sem, worker_id):
    await sem.acquire()
    try:
        print(f"Worker {worker_id} running")
        await asyncio.sleep(0.01)
    finally:
        sem.release()

async def main():
    sem = ManualSemaphore(2)  # max 2 concurrent
    await asyncio.gather(*[worker(sem, i) for i in range(5)])
    print("All workers done")

asyncio.run(main())
Solution
import asyncio

class ManualSemaphore:
    def __init__(self, value):
        self._value = value
        self._waiters = []

    def acquire(self):
        return _SemaphoreAcquire(self)

    def release(self):
        self._value += 1
        # Hand the permit to the first waiter that is still pending.
        while self._waiters:
            fut = self._waiters.pop(0)
            if not fut.done():
                self._value -= 1
                fut.set_result(None)
                break

class _SemaphoreAcquire:
    def __init__(self, sem):
        self._sem = sem

    def __await__(self):
        if self._sem._value > 0:
            self._sem._value -= 1
            return
        # The yield from below already makes this a generator function.
        fut = asyncio.get_running_loop().create_future()
        self._sem._waiters.append(fut)
        yield from fut.__await__()

async def worker(sem, worker_id):
    await sem.acquire()
    try:
        print(f"Worker {worker_id} running")
        await asyncio.sleep(0.01)
    finally:
        sem.release()

async def main():
    sem = ManualSemaphore(2)
    await asyncio.gather(*[worker(sem, i) for i in range(5)])
    print("All workers done")

asyncio.run(main())
Semaphore internals:
- When _value > 0, acquire decrements and returns immediately; no suspension is needed.
- When _value == 0, a Future is created and stored in the waiters queue. The coroutine suspends via yield from fut.__await__().
- On release(), increment _value, then wake the next waiter by calling fut.set_result(None), taking the permit back on that waiter's behalf. The waiting coroutine resumes.
- The fut.done() check handles cancellation: if a waiting coroutine was cancelled, its Future is already done (cancelled), so we skip it and try the next waiter.
- This is close in spirit to asyncio.Semaphore's implementation, minus edge case handling. Studying it demystifies all asyncio synchronization primitives.
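A simple way to check the concurrency limit is to track how many workers are inside the critical section at once. The sketch below uses the standard asyncio.Semaphore as a reference; the same measurement should apply to ManualSemaphore with explicit acquire() and release() calls. measure_peak and the state dictionary are hypothetical names.

```python
import asyncio

async def worker(sem, state):
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # hold the permit across a suspension
        state["active"] -= 1

async def measure_peak(limit, workers):
    """Return the highest number of workers that held a permit at once."""
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, state) for _ in range(workers)))
    return state["peak"]

peak = asyncio.run(measure_peak(limit=2, workers=5))
print(f"peak concurrency: {peak}")
```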
Expected Output
See solution: concurrent tasks are limited to N at a time
Hints
Hint 1: Track a count of available permits. When count is 0, waiters queue up as Futures.
Hint 2: On release, if there are waiters, set_result on the next one to wake it up.
