Python Custom Awaitables Practice Problems & Exercises

Practice: Custom Awaitables

11 problems · 3 Easy · 4 Medium · 4 Hard · 60–90 min

Easy

#1 Identify Awaitables (Easy)
Tags: awaitable, inspect, coroutine, Future

Use inspect.isawaitable() to classify a set of objects and identify which are awaitable.

import asyncio
import inspect

async def my_coro():
    return 42

class MyAwaitable:
    def __await__(self):
        yield
        return 99

async def main():
    coro = my_coro()
    future = asyncio.Future()

    for label, obj in [
        ("coroutine", coro),
        ("Future", future),
        ("int", 42),
        ("Awaitable class", MyAwaitable()),
    ]:
        print(f"{label} {inspect.isawaitable(obj)}")

    coro.close()  # avoid the "coroutine was never awaited" warning

asyncio.run(main())
Solution
import asyncio
import inspect

async def my_coro():
    return 42

class MyAwaitable:
    def __await__(self):
        yield
        return 99

async def main():
    coro = my_coro()
    future = asyncio.Future()

    for label, obj in [
        ("coroutine", coro),
        ("Future", future),
        ("int", 42),
        ("Awaitable class", MyAwaitable()),
    ]:
        print(f"{label} {inspect.isawaitable(obj)}")

    coro.close()  # avoid the "coroutine was never awaited" warning

asyncio.run(main())

Awaitable taxonomy:

  • A coroutine object (result of calling an async def function) is awaitable.
  • asyncio.Future and asyncio.Task are awaitable.
  • Any object implementing __await__ is awaitable — Python checks for this method.
  • inspect.isawaitable() is the clean runtime check; it returns True for coroutines, futures, and objects with __await__.
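  • Alternatively, isinstance(obj, collections.abc.Awaitable) returns True for any object whose class defines __await__ (no explicit registration is needed), but unlike inspect.isawaitable() it misses generator-based coroutines created with types.coroutine.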
  • Plain callables, generators, and synchronous objects are NOT awaitable.
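The two runtime checks named above can disagree. As a small sketch (the names legacy_coro and checks are made up for this illustration), the following compares inspect.isawaitable() with the collections.abc.Awaitable ABC on a custom class, a generator-based coroutine, and a plain generator:

```python
import inspect
import types
from collections.abc import Awaitable

class MyAwaitable:
    def __await__(self):
        yield
        return 99

@types.coroutine
def legacy_coro():
    # Generator-based coroutine: awaitable, but it has no __await__ method.
    yield

legacy = legacy_coro()
plain_gen = (x for x in range(3))

# Each entry maps a label to (inspect.isawaitable result, ABC isinstance result).
checks = {
    "custom class": (inspect.isawaitable(MyAwaitable()), isinstance(MyAwaitable(), Awaitable)),
    "generator-based coroutine": (inspect.isawaitable(legacy), isinstance(legacy, Awaitable)),
    "plain generator": (inspect.isawaitable(plain_gen), isinstance(plain_gen, Awaitable)),
}
for label, (by_inspect, by_abc) in checks.items():
    print(f"{label}: isawaitable={by_inspect}, ABC={by_abc}")

legacy.close()
```

Only the generator-based coroutine splits the two checks, which is why inspect.isawaitable() is usually the safer choice for a general-purpose test.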
Expected Output
coroutine True
Future True
int False
Awaitable class True
Hints

Hint 1: Use inspect.isawaitable(obj) to check if an object can be awaited.

Hint 2: Coroutines, Futures, and Tasks are all awaitables. Plain objects are not unless they implement __await__.

#2 Simplest Custom Awaitable (Easy)
Tags: __await__, generator, StopIteration

Implement the simplest possible custom awaitable class that immediately returns a value when awaited.

import asyncio

class ImmediateValue:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        # yield nothing (complete immediately), return self._value
        return self._value
        yield  # make this a generator

async def main():
    result = await ImmediateValue(42)
    print(f"Result: {result}")

asyncio.run(main())
Solution
import asyncio

class ImmediateValue:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        return self._value
        yield  # unreachable, but makes this a generator function

async def main():
    result = await ImmediateValue(42)
    print(f"Result: {result}")

asyncio.run(main())

How await works:

  • __await__ must return an iterator. Writing it as a generator function is the simplest way to produce one, but any iterator object works.
  • The presence of yield anywhere in the function body makes it a generator function, even if the yield is unreachable.
  • When the generator returns (either by exhaustion or return value), it raises StopIteration(value). The await machinery catches this and uses the value as the result of the await expression.
  • An immediate return with no yield reached = no suspension = the coroutine continues synchronously.
  • By contrast, asyncio.sleep(0) does reach a yield: it suspends once to the event loop, then returns.
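The StopIteration hand-off described above can be observed without any event loop. This sketch drives the iterator returned by __await__ by hand; the variable names iterator and result are only for illustration:

```python
class ImmediateValue:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        return self._value
        yield  # unreachable; only makes this a generator function

iterator = ImmediateValue(42).__await__()
try:
    next(iterator)  # runs the body, which returns before reaching any yield
except StopIteration as stop:
    result = stop.value  # the return value travels inside StopIteration
else:
    result = None  # would mean the generator suspended instead of returning

print(result)
```

The await expression performs the same extraction, which is why `await ImmediateValue(42)` evaluates to 42.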
Expected Output
Result: 42
Hints

Hint 1: __await__ must return an iterator. Use yield to suspend and return the final value via StopIteration.

Hint 2: A generator function that returns a value raises StopIteration with that value when exhausted.

#3 Awaitable That Suspends Once (Easy)
Tags: __await__, yield, event-loop, suspension

Create a custom awaitable YieldOnce that suspends the coroutine for one event loop tick, then resumes.

import asyncio

class YieldOnce:
    def __await__(self):
        yield  # suspend for one tick
        # return None implicitly

async def main():
    print("Before await")
    await YieldOnce()
    print("After await — yielded once to event loop")

asyncio.run(main())
Solution
import asyncio

class YieldOnce:
    def __await__(self):
        yield  # yield None to event loop; resumes next iteration

async def main():
    print("Before await")
    await YieldOnce()
    print("After await — yielded once to event loop")

asyncio.run(main())

Suspension mechanics:

  • yield inside __await__ sends a None to the event loop's _step machinery.
  • The event loop sees the None, schedules a _step callback to resume this coroutine, and moves on to other ready callbacks.
  • On the next iteration, the coroutine is resumed at the yield point.
  • This is functionally equivalent to await asyncio.sleep(0) — both yield to the event loop for exactly one tick.
  • Understanding this is the foundation for building custom schedulers, profilers that intercept yield points, and low-level async primitives.
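One way to see the one-tick suspension is to run two tasks that each await YieldOnce between steps. This is a minimal sketch; the worker function and the log list are illustrative names, not part of the exercise:

```python
import asyncio

class YieldOnce:
    def __await__(self):
        yield  # hand control back to the event loop for one iteration

async def worker(name, log):
    for step in range(2):
        log.append(f"{name}{step}")
        await YieldOnce()  # lets the other task run before the next step

async def main():
    log = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log

log = asyncio.run(main())
print(log)
```

Each task records one entry and then yields, so the entries alternate between the two workers instead of one worker finishing first.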
Expected Output
Before await
After await — yielded once to event loop
Hints

Hint 1: yield None inside __await__ tells the event loop to schedule a callback and suspend the current coroutine.

Hint 2: The coroutine resumes on the next event loop iteration.


Medium

#4 Custom Future-Like Awaitable (Medium)
Tags: __await__, Future, callbacks, event-loop

Implement a simplified ManualFuture class that can be awaited and resolved from outside.

import asyncio

class ManualFuture:
    def __init__(self):
        self._result = None
        self._done = False
        self._callbacks = []

    def set_result(self, value):
        self._result = value
        self._done = True
        for cb in self._callbacks:
            cb()

    def __await__(self):
        if self._done:
            return self._result
            yield  # make generator

        # Not done yet: keep yielding until resolved.
        # This is a simplified (polling) version.
        while not self._done:
            yield
        return self._result

async def main():
    future = ManualFuture()

    async def resolver():
        await asyncio.sleep(0.01)
        future.set_result("hello from the future")

    asyncio.create_task(resolver())
    result = await future
    print(f"Future resolved with: {result}")

asyncio.run(main())
Solution
import asyncio

class ManualFuture:
    def __init__(self):
        self._result = None
        self._done = False

    def set_result(self, value):
        self._result = value
        self._done = True

    def __await__(self):
        while not self._done:
            yield  # keep suspending until done
        return self._result

async def main():
    future = ManualFuture()

    async def resolver():
        await asyncio.sleep(0.01)
        future.set_result("hello from the future")

    asyncio.create_task(resolver())
    result = await future
    print(f"Future resolved with: {result}")

asyncio.run(main())

Custom Future design:

  • The while not self._done: yield loop keeps suspending the coroutine (returning to the event loop each tick) until the future is resolved.
  • This is a polling approach — the real asyncio.Future uses a smarter callback-based wake-up to avoid busy-looping.
  • In the real implementation, Future.__await__ yields the future object itself to the event loop; the loop then registers a callback to resume the coroutine when set_result() is called.
  • This exercise demonstrates that Futures are just cleverly designed awaitables with external resolution.
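The claim that a pending asyncio.Future yields itself can be checked by stepping its __await__ iterator by hand. This pokes at behaviour that normal code never touches directly, so treat it as an illustration only; the names iterator, yielded, and yields_itself are made up for the sketch:

```python
import asyncio

async def main():
    fut = asyncio.get_running_loop().create_future()
    iterator = fut.__await__()

    yielded = next(iterator)  # pending future: suspends by yielding itself
    fut.set_result("ready")   # resolve it from "outside"

    try:
        next(iterator)        # resumed after resolution: returns the result
    except StopIteration as stop:
        return yielded is fut, stop.value

yields_itself, value = asyncio.run(main())
print(yields_itself, value)
```

In real use the Task that drives the coroutine receives the yielded future and attaches a wake-up callback to it, which is what removes the need for polling.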
Expected Output
Future resolved with: hello from the future
Hints

Hint 1: A Future-like awaitable needs to suspend until an external event sets its result.

Hint 2: Use a callback registered on the event loop to set the result after a delay.

#5 Coroutine Step-by-Step (Medium)
Tags: coroutine, send, StopIteration, low-level

Drive a coroutine manually step-by-step using .send() to understand how the event loop runs coroutines.

import asyncio

async def two_step_coro():
    await asyncio.sleep(0)  # step 1: yields to event loop
    await asyncio.sleep(0)  # step 2: yields again
    return 100

# asyncio awaitables generally expect a running event loop (asyncio.sleep with a
# positive delay, for example), so driving them by hand is fragile.
# Instead, use a simple generator-based coroutine to demonstrate the protocol.
def simple_coro():
    yield "step1"
    yield "step2"
    return 100

coro = simple_coro()
try:
    val1 = coro.send(None)
    print(f"Step 1: {val1}")
    val2 = coro.send(None)
    print(f"Step 2: {val2}")
    coro.send(None)  # raises StopIteration
except StopIteration as e:
    print(f"Coroutine returned: {e.value}")
Solution
def simple_coro():
    yield "step1"
    yield "step2"
    return 100

coro = simple_coro()
try:
    val1 = coro.send(None)
    print(f"Step 1: {val1}")
    val2 = coro.send(None)
    print(f"Step 2: {val2}")
    coro.send(None)  # generator returns here, raising StopIteration
except StopIteration as e:
    print(f"Coroutine returned: {e.value}")

Coroutine as enhanced generator:

  • send(None) advances the generator to the next yield. The yielded value is returned from send().
  • When the generator returns, send() raises StopIteration with e.value = the return value.
  • This is exactly how asyncio's event loop drives coroutines: it calls coro.send(None) in a loop, handling StopIteration to know the coroutine is done.
  • The difference with async coroutines: they yield futures/Nones to signal the event loop about what they are waiting for, not arbitrary values.
  • The first send() must always pass None — passing any other value before the first yield raises TypeError.
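The same protocol applies to native async def coroutines, as long as what they await does not need an event loop. In this sketch, Pause is a hypothetical awaitable that yields a tag outward and returns whatever value the driver sends back in:

```python
class Pause:
    """Hypothetical awaitable: yields a tag to the driver, returns what is sent in."""
    def __init__(self, tag):
        self._tag = tag

    def __await__(self):
        received = yield self._tag
        return received

async def add_two_inputs():
    first = await Pause("need first")
    second = await Pause("need second")
    return first + second

coro = add_two_inputs()
steps = [coro.send(None)]    # first send must be None; runs to the first yield
steps.append(coro.send(10))  # 10 becomes the result of the first await
try:
    coro.send(32)            # 32 becomes the result of the second await
except StopIteration as stop:
    total = stop.value

print(steps, total)
```

Here the driver plays the role of the event loop: it decides what each await evaluates to by choosing the value passed to send().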
Expected Output
Step 1: step1
Step 2: step2
Coroutine returned: 100
Hints

Hint 1: A coroutine is a generator under the hood. Drive it manually with .send(None) to advance past each yield.

Hint 2: When the coroutine returns, .send() raises StopIteration with the return value.

#6 Delegating __await__ to Another Awaitable (Medium)
Tags: __await__, yield-from, delegation, composing-awaitables

Create a LoggedFuture that wraps asyncio.Future and logs when it is awaited, by delegating via yield from.

import asyncio

class LoggedFuture:
    def __init__(self, future):
        self._future = future

    def __await__(self):
        print("LoggedFuture: starting await")
        result = yield from self._future.__await__()
        print("LoggedFuture: await complete")
        return result

async def main():
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside a coroutine
    fut = loop.create_future()

    async def resolver():
        await asyncio.sleep(0.01)
        fut.set_result(99)

    asyncio.create_task(resolver())
    result = await LoggedFuture(fut)
    print(f"LoggedFuture resolved: {result}")

asyncio.run(main())
Solution
import asyncio

class LoggedFuture:
    def __init__(self, future):
        self._future = future

    def __await__(self):
        print("LoggedFuture: starting await")
        result = yield from self._future.__await__()
        print("LoggedFuture: await complete")
        return result

async def main():
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside a coroutine
    fut = loop.create_future()

    async def resolver():
        await asyncio.sleep(0.01)
        fut.set_result(99)

    asyncio.create_task(resolver())
    result = await LoggedFuture(fut)
    print(f"LoggedFuture resolved: {result}")

asyncio.run(main())

Delegation with yield from:

  • yield from iterable in a generator delegates to the sub-iterator completely: it forwards send() calls, throw() calls, and receives the final StopIteration value.
  • yield from self._future.__await__() means: run the future's awaiting logic inside this generator, suspending as needed, and return the final result.
  • await behaves much like yield from obj.__await__(): the interpreter obtains the iterator from __await__ and delegates to it with the same send()/throw() forwarding.
  • Delegation allows wrapping and decorating awaitables without rewriting their suspension logic.
  • The print statements run at entry and exit, proving that the wrapper adds behaviour around the actual suspension.
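Because yield from also forwards exceptions, a wrapper can intercept a failure of the inner awaitable. This is a sketch under that assumption; the Recovering class and its fallback string are invented for the example:

```python
import asyncio

class Recovering:
    """Hypothetical wrapper that converts a ValueError from the inner awaitable into a value."""
    def __init__(self, inner):
        self._inner = inner

    def __await__(self):
        try:
            return (yield from self._inner.__await__())
        except ValueError as exc:
            # The inner future's exception surfaces at the yield from expression.
            return f"recovered: {exc}"

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    loop.call_soon(fut.set_exception, ValueError("boom"))  # fail the future shortly
    return await Recovering(fut)

outcome = asyncio.run(main())
print(outcome)
```

The wrapper never touches the future's suspension logic; it only reacts to how the delegation ends.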
Expected Output
LoggedFuture: starting await
LoggedFuture: await complete
LoggedFuture resolved: 99
Hints

Hint 1: Use yield from inside __await__ to delegate to another awaitable's __await__ iterator.

Hint 2: yield from automatically handles the full iteration protocol including sending values and propagating exceptions.

#7 Awaitable That Rejects Awaiting Outside a Loop (Medium)
Tags: __await__, asyncio.get_running_loop, validation, guard

Create an awaitable that validates it is being awaited inside an active event loop, raising a clear error otherwise.

import asyncio

class LoopGuardedAwaitable:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        # check asyncio.get_running_loop(); raise RuntimeError if no loop
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            raise RuntimeError("LoopGuardedAwaitable must be awaited inside an async context")
        print("Loop context validated")
        return self._value
        yield

async def main():
    result = await LoopGuardedAwaitable("done")
    print(f"Result: {result}")

asyncio.run(main())
Solution
import asyncio

class LoopGuardedAwaitable:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            raise RuntimeError("LoopGuardedAwaitable must be awaited inside an async context")
        print("Loop context validated")
        return self._value
        yield  # makes this a generator

async def main():
    result = await LoopGuardedAwaitable("done")
    print(f"Result: {result}")

asyncio.run(main())

Loop validation pattern:

  • asyncio.get_running_loop() raises RuntimeError if called from a thread with no running event loop.
  • Inside an async def coroutine being driven by an event loop, it always succeeds.
  • This guard gives a helpful error message when users mistakenly call awaitable-creating code from sync context.
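  • Production use: an async library can apply the same guard to fail fast with a clear "you must call this from async code" message instead of an obscure error later.
  • asyncio.get_event_loop() is not a reliable check: outside a running loop, older Python versions silently create a new loop, while newer versions deprecate that behaviour and may raise instead. Prefer get_running_loop() for validation.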
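To see the guard fire, the awaitable has to be stepped with no loop running. This sketch does that by calling next() on the __await__ iterator from plain synchronous code; the variables outside and inside are illustrative only:

```python
import asyncio

class LoopGuardedAwaitable:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            raise RuntimeError(
                "LoopGuardedAwaitable must be awaited inside an async context"
            ) from None
        return self._value
        yield  # unreachable; makes this a generator function

# Stepped from synchronous code: no loop is running, so the guard raises.
try:
    next(LoopGuardedAwaitable("done").__await__())
except RuntimeError as exc:
    outside = str(exc)

# Awaited inside asyncio.run(): the guard passes and the value comes back.
async def main():
    return await LoopGuardedAwaitable("done")

inside = asyncio.run(main())
print(outside)
print(inside)
```

Note that the check runs when the iterator is first advanced, not when the object is constructed, so building the awaitable in synchronous code is still allowed.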
Expected Output
Loop context validated
Result: done
Hints

Hint 1: Call asyncio.get_running_loop() inside __await__ to check if there is an active event loop.

Hint 2: Raise RuntimeError if there is no running loop to give a clear error message.


Hard

#8 Build a Minimal Scheduler (Hard)
Tags: scheduler, __await__, event-loop, coroutine-driver

Implement a minimal cooperative scheduler that runs multiple coroutines concurrently by interleaving them.

from collections import deque

def scheduler(*coros):
    """Run coroutines cooperatively. Each yield point gives others a turn."""
    queue = deque(coros)
    while queue:
        coro = queue.popleft()
        try:
            coro.send(None)  # advance to next yield
            queue.append(coro)  # put back if not done
        except StopIteration:
            pass  # coroutine finished

def task_a():
    print("A: step 1")
    yield
    print("A: step 2")
    yield
    print("A done")

def task_b():
    print("B: step 1")
    yield
    print("B: step 2")
    yield
    print("B done")

scheduler(task_a(), task_b())
Solution
from collections import deque

def scheduler(*coros):
    queue = deque(coros)
    while queue:
        coro = queue.popleft()
        try:
            coro.send(None)  # advance to next yield
            queue.append(coro)  # still running: back of the queue
        except StopIteration:
            pass  # finished: drop it

def task_a():
    print("A: step 1")
    yield
    print("A: step 2")
    yield
    print("A done")

def task_b():
    print("B: step 1")
    yield
    print("B: step 2")
    yield
    print("B done")

scheduler(task_a(), task_b())

Minimal event loop:

  • This 10-line scheduler is the conceptual core of any event loop, including asyncio's.
  • Each yield is a suspension point — a voluntary handoff to the scheduler.
  • The scheduler round-robins through ready coroutines: dequeue, advance one step, re-enqueue.
  • A production event loop extends this with: I/O readiness (via select/epoll), timers (call_later), callbacks, thread executors, and signal handling.
  • The key insight: "async" in Python is cooperative multitasking — coroutines voluntarily yield control; the scheduler never preempts them.
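The same round-robin idea also drives native async def coroutines, provided their awaitables only yield and do not depend on asyncio. A sketch, where run_all, count, and YieldOnce are names chosen for this example, that additionally collects return values:

```python
from collections import deque

class YieldOnce:
    def __await__(self):
        yield  # suspension point: hands control back to the scheduler

async def count(name, n, log):
    for i in range(n):
        log.append((name, i))
        await YieldOnce()
    return name

def run_all(*coros):
    """Round-robin the coroutines; return their results in completion order."""
    queue = deque(coros)
    results = []
    while queue:
        coro = queue.popleft()
        try:
            coro.send(None)
        except StopIteration as stop:
            results.append(stop.value)  # finished: keep its return value
        else:
            queue.append(coro)          # suspended: back of the queue
    return results

log = []
results = run_all(count("a", 2, log), count("b", 1, log))
print(log)
print(results)
```

The shorter coroutine finishes first, so results are ordered by completion rather than by submission.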
Expected Output
A: step 1
B: step 1
A: step 2
B: step 2
A done
B done
Hints

Hint 1: A scheduler is a loop that drives coroutines by calling .send(None) until they raise StopIteration.

Hint 2: Use a deque as a ready queue. When a coroutine yields, add it back to the queue.

#9 Custom Awaitable with Timeout (Hard)
Tags: __await__, asyncio.Future, timeout, cancellation

Build AwaitableWithTimeout(value, delay, timeout) that resolves with value after delay seconds, but raises TimeoutError if timeout is exceeded first.

import asyncio

class AwaitableWithTimeout:
    def __init__(self, value, delay, timeout):
        self._value = value
        self._delay = delay
        self._timeout = timeout

    def __await__(self):
        loop = asyncio.get_running_loop()
        fut = loop.create_future()

        def on_result():
            if not fut.done():
                fut.set_result(self._value)

        def on_timeout():
            if not fut.done():
                fut.set_exception(TimeoutError(f"Timeout after {self._timeout}s"))

        loop.call_later(self._delay, on_result)
        loop.call_later(self._timeout, on_timeout)
        return (yield from fut.__await__())

async def main():
    # Case 1: result arrives before timeout
    result = await AwaitableWithTimeout("data", delay=0.01, timeout=0.05)
    print(f"Result arrived: {result}")

    # Case 2: timeout fires first
    try:
        await AwaitableWithTimeout("data", delay=0.1, timeout=0.05)
    except TimeoutError:
        print("Timeout hit after 0.05s")

asyncio.run(main())
Solution
import asyncio

class AwaitableWithTimeout:
    def __init__(self, value, delay, timeout):
        self._value = value
        self._delay = delay
        self._timeout = timeout

    def __await__(self):
        loop = asyncio.get_running_loop()
        fut = loop.create_future()

        def on_result():
            if not fut.done():
                fut.set_result(self._value)

        def on_timeout():
            if not fut.done():
                fut.set_exception(TimeoutError(f"Timeout after {self._timeout}s"))

        loop.call_later(self._delay, on_result)
        loop.call_later(self._timeout, on_timeout)
        return (yield from fut.__await__())

async def main():
    result = await AwaitableWithTimeout("data", delay=0.01, timeout=0.05)
    print(f"Result arrived: {result}")

    try:
        await AwaitableWithTimeout("data", delay=0.1, timeout=0.05)
    except TimeoutError:
        print("Timeout hit after 0.05s")

asyncio.run(main())

Custom timeout awaitable:

  • loop.call_later(delay, fn) schedules fn to run after delay seconds — it is the primitive underlying asyncio.sleep.
  • The first callback to fire wins: fut.done() check prevents double-setting.
  • yield from fut.__await__() delegates suspension to the real Future, which wakes up when set_result/set_exception is called.
  • This demonstrates the composability of the awaitable protocol — we build higher-level behaviour by wrapping lower-level awaitables.
  • Production version: also cancel the unused callback to avoid resource leaks.
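The clean-up mentioned in the last point could look like the sketch below. It relies on loop.call_later() returning a handle with a cancel() method; the class name CancellingTimeout is invented here to distinguish it from the exercise's version:

```python
import asyncio

class CancellingTimeout:
    """Hypothetical variant of AwaitableWithTimeout that cancels its leftover timer."""
    def __init__(self, value, delay, timeout):
        self._value = value
        self._delay = delay
        self._timeout = timeout

    def __await__(self):
        loop = asyncio.get_running_loop()
        fut = loop.create_future()

        def on_result():
            if not fut.done():
                fut.set_result(self._value)

        def on_timeout():
            if not fut.done():
                fut.set_exception(TimeoutError(f"Timeout after {self._timeout}s"))

        handles = [
            loop.call_later(self._delay, on_result),
            loop.call_later(self._timeout, on_timeout),
        ]
        try:
            return (yield from fut.__await__())
        finally:
            # Cancelling a handle that already ran is harmless, so cancel both.
            for handle in handles:
                handle.cancel()

async def main():
    on_time = await CancellingTimeout("data", delay=0.01, timeout=0.05)
    try:
        await CancellingTimeout("data", delay=0.1, timeout=0.02)
    except TimeoutError:
        timed_out = True
    else:
        timed_out = False
    return on_time, timed_out

on_time, timed_out = asyncio.run(main())
print(on_time, timed_out)
```

Putting the cancellation in a finally clause also covers the case where the awaiting task itself is cancelled while suspended.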
Expected Output
Result arrived: data
Timeout hit after 0.05s
Hints

Hint 1: Use loop.call_later() to schedule a timeout callback that sets a cancel flag or cancels a Future.

Hint 2: The awaitable should yield to the event loop while waiting, and stop when either the result or timeout fires.

#10 Awaitable Middleware Chain (Hard)
Tags: __await__, middleware, composable-awaitables, decorator-pattern

Build a middleware wrapper for any awaitable that logs before and after the await completes.

import asyncio

class MiddlewareAwaitable:
    def __init__(self, inner, name):
        self._inner = inner
        self._name = name

    def __await__(self):
        print(f"[before] calling {self._name}")
        result = yield from self._inner.__await__()
        print(f"[after] result: {result}")
        return result

class DelayedValue:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        yield  # one tick
        return self._value

async def add_one(n):
    val = await DelayedValue(n)
    return val + 1

async def main():
    coro = add_one(42)
    wrapped = MiddlewareAwaitable(coro, "add_one")
    result = await wrapped
    print(f"Final: {result}")

asyncio.run(main())
Solution
import asyncio

class MiddlewareAwaitable:
    def __init__(self, inner, name):
        self._inner = inner
        self._name = name

    def __await__(self):
        print(f"[before] calling {self._name}")
        result = yield from self._inner.__await__()
        print(f"[after] result: {result}")
        return result

class DelayedValue:
    def __init__(self, value):
        self._value = value

    def __await__(self):
        yield  # one tick
        return self._value

async def add_one(n):
    val = await DelayedValue(n)
    return val + 1

async def main():
    coro = add_one(42)
    wrapped = MiddlewareAwaitable(coro, "add_one")
    result = await wrapped
    print(f"Final: {result}")

asyncio.run(main())

Middleware pattern:

  • The middleware awaitable wraps any awaitable without modifying it.
  • yield from inner.__await__() transparently handles all suspension points inside the inner awaitable.
  • Code before yield from runs before any suspension. Code after runs after the inner awaitable resolves.
  • This pattern enables: tracing, timing, retrying, and rate-limiting at the awaitable protocol level.
  • Production use: tracing and observability tooling can apply the same wrapping idea to open a span before the await and close it afterwards.
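Since each wrapper is itself awaitable, wrappers can be stacked to form a chain. In this sketch, Tagged is a hypothetical middleware that records entry and exit in a shared list, which makes the nesting order visible:

```python
import asyncio

class Tagged:
    """Hypothetical middleware: logs entry and exit around any inner awaitable."""
    def __init__(self, inner, name, log):
        self._inner = inner
        self._name = name
        self._log = log

    def __await__(self):
        self._log.append(f"enter {self._name}")
        try:
            return (yield from self._inner.__await__())
        finally:
            # Runs on success, on error, and on cancellation alike.
            self._log.append(f"exit {self._name}")

async def compute():
    await asyncio.sleep(0)  # one real suspension inside the wrapped coroutine
    return 7

async def main():
    log = []
    chain = Tagged(Tagged(compute(), "inner", log), "outer", log)
    result = await chain
    return result, log

result, log = asyncio.run(main())
print(result)
print(log)
```

The entries nest like function calls: the outer wrapper is entered first and exited last.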
Expected Output
[before] calling add_one
[after] result: 43
Final: 43
Hints

Hint 1: Wrap an awaitable in another awaitable to add before/after logging.

Hint 2: yield from inner.__await__() in the outer __await__ handles the full awaiting delegation.

#11 Async Semaphore from First Principles (Hard)
Tags: semaphore, __await__, asyncio.Future, synchronization-primitive

Implement a semaphore from first principles using asyncio.Future and the __await__ protocol.

import asyncio

class ManualSemaphore:
    def __init__(self, value):
        self._value = value
        self._waiters = []

    def acquire(self):
        return _SemaphoreAcquire(self)

    def release(self):
        self._value += 1
        # wake up next waiter if any
        while self._waiters:
            fut = self._waiters.pop(0)
            if not fut.done():
                self._value -= 1
                fut.set_result(None)
                break

class _SemaphoreAcquire:
    def __init__(self, sem):
        self._sem = sem

    def __await__(self):
        if self._sem._value > 0:
            self._sem._value -= 1
            return
            yield  # make generator
        # No permits available: create a Future and wait
        fut = asyncio.get_running_loop().create_future()
        self._sem._waiters.append(fut)
        yield from fut.__await__()

async def worker(sem, worker_id):
    await sem.acquire()
    try:
        print(f"Worker {worker_id} running")
        await asyncio.sleep(0.01)
    finally:
        sem.release()

async def main():
    sem = ManualSemaphore(2)  # max 2 concurrent
    await asyncio.gather(*[worker(sem, i) for i in range(5)])
    print("All workers done")

asyncio.run(main())
Solution
import asyncio

class ManualSemaphore:
    def __init__(self, value):
        self._value = value
        self._waiters = []

    def acquire(self):
        return _SemaphoreAcquire(self)

    def release(self):
        self._value += 1
        while self._waiters:
            fut = self._waiters.pop(0)
            if not fut.done():
                self._value -= 1  # hand the permit straight to the waiter
                fut.set_result(None)
                break

class _SemaphoreAcquire:
    def __init__(self, sem):
        self._sem = sem

    def __await__(self):
        if self._sem._value > 0:
            self._sem._value -= 1
            return
            yield  # unreachable; the yield from below already makes this a generator
        fut = asyncio.get_running_loop().create_future()
        self._sem._waiters.append(fut)
        yield from fut.__await__()

async def worker(sem, worker_id):
    await sem.acquire()
    try:
        print(f"Worker {worker_id} running")
        await asyncio.sleep(0.01)
    finally:
        sem.release()

async def main():
    sem = ManualSemaphore(2)
    await asyncio.gather(*[worker(sem, i) for i in range(5)])
    print("All workers done")

asyncio.run(main())

Semaphore internals:

  • When _value > 0, acquire decrements and returns immediately — no suspension needed.
  • When _value == 0, a Future is created and stored in the waiters queue. The coroutine suspends via yield from fut.__await__().
  • On release(), increment _value, then wake the next waiter by calling fut.set_result(None). The waiting coroutine resumes.
  • The fut.done() check handles cancellation: if a waiting coroutine was cancelled, its Future is already done (cancelled), and calling set_result() on it would raise InvalidStateError, so we skip it and try the next waiter.
  • This is essentially asyncio.Semaphore's implementation, minus edge case handling. Studying it demystifies all asyncio synchronization primitives.
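A quick way to check that the limit holds is to track how many workers are inside the guarded section at once. This sketch repeats the semaphore in compact form (with _Acquire as a shortened name) and uses a hypothetical stats dictionary to record the peak:

```python
import asyncio

class ManualSemaphore:
    def __init__(self, value):
        self._value = value
        self._waiters = []

    def acquire(self):
        return _Acquire(self)

    def release(self):
        self._value += 1
        while self._waiters:
            fut = self._waiters.pop(0)
            if not fut.done():
                self._value -= 1  # hand the permit straight to the waiter
                fut.set_result(None)
                break

class _Acquire:
    def __init__(self, sem):
        self._sem = sem

    def __await__(self):
        if self._sem._value > 0:
            self._sem._value -= 1
            return
        fut = asyncio.get_running_loop().create_future()
        self._sem._waiters.append(fut)
        yield from fut.__await__()

stats = {"active": 0, "peak": 0}

async def worker(sem):
    await sem.acquire()
    try:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)
    finally:
        stats["active"] -= 1
        sem.release()

async def main():
    sem = ManualSemaphore(2)
    await asyncio.gather(*(worker(sem) for _ in range(5)))

asyncio.run(main())
print(stats)
```

With five workers and two permits, the peak never exceeds two, because release() transfers the permit to the next waiter before any newcomer can take it.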
Expected Output
See solution — concurrent tasks limited to N at a time
Hints

Hint 1: Track a count of available permits. When count is 0, waiters queue up as Futures.

Hint 2: On release, if there are waiters, set_result on the next one to wake it up.

© 2026 EngineersOfAI. All rights reserved.