Structured Concurrency with TaskGroup
Before reading any explanation, predict the output of this program:
```python
import asyncio

async def task_a():
    await asyncio.sleep(0.1)
    raise ValueError("A failed")

async def task_b():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("B was cancelled")
        raise

async def task_c():
    await asyncio.sleep(0.05)
    print("C completed")
    return "C"

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task_a())
            tg.create_task(task_b())
            tg.create_task(task_c())
    except* ValueError as eg:
        print(f"Caught {len(eg.exceptions)} error(s)")
        for e in eg.exceptions:
            print(f"  {e}")

asyncio.run(main())
```
Think carefully. Does task C complete? Does task B get cancelled? What syntax catches the error?
```python
# Output:
# C completed
# B was cancelled
# Caught 1 error(s)
#   A failed
```
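Task C finishes before task A fails (0.05s < 0.1s). When task A raises, the TaskGroup cancels all remaining tasks (task B), waits for them to handle cancellation, then raises an ExceptionGroup containing the ValueError. Because the raised object is an ExceptionGroup rather than a bare ValueError, a plain except ValueError clause would not match it; the except* syntax (Python 3.11+) matches the ValueError inside the group.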
This is structured concurrency: every task has a clear lifetime bounded by its parent scope, and errors propagate predictably.
What You Will Learn
- Why asyncio.gather() has fundamental design problems for error handling
- How asyncio.TaskGroup (Python 3.11+) solves these problems
- ExceptionGroup and the except* syntax for handling multiple simultaneous errors
- Cancellation semantics: what happens when one task fails
- The nursery pattern and its guarantees
- Real-world patterns: parallel API calls, fan-out/fan-in, graceful degradation
Prerequisites
- Comfortable with asyncio.create_task() and asyncio.gather()
- Understanding of task cancellation (CancelledError)
- Async context managers from Lesson 2
- Familiarity with exception handling in async code
Part 1 -- The Problem with gather()
asyncio.gather() was the standard way to run tasks concurrently before Python 3.11. It has three fundamental problems.
Problem 1: Silent Exception Swallowing
```python
import asyncio

async def fetch_user(user_id):
    if user_id == 2:
        raise ConnectionError(f"Failed to fetch user {user_id}")
    await asyncio.sleep(0.1)
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),  # This will fail
        fetch_user(3),
        return_exceptions=True  # "Solution" that creates new problems
    )
    for r in results:
        # You must check EVERY result to see if it's an exception
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(f"Success: {r}")

asyncio.run(main())
# Output:
# Success: {'id': 1, 'name': 'User 1'}
# Error: Failed to fetch user 2
# Success: {'id': 3, 'name': 'User 3'}
```
With return_exceptions=True, exceptions are mixed in with results. If you forget to check, you silently process Exception objects as if they were valid data. Without return_exceptions=True, the first exception propagates to the caller immediately, but the other awaitables are not cancelled: they keep running in the background with nobody awaiting them.
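Here is a minimal sketch of that last point. The names slow_sibling and fails_fast are made up for the demo. Once gather() re-raises the first error, the sibling task is still running, and cleaning it up is left to you.

```python
import asyncio

async def slow_sibling(log: list[str]) -> None:
    try:
        await asyncio.sleep(1)
        log.append("sibling finished")
    except asyncio.CancelledError:
        log.append("sibling cancelled")
        raise

async def fails_fast() -> None:
    await asyncio.sleep(0.01)
    raise ConnectionError("boom")

async def main() -> tuple[bool, list[str]]:
    log: list[str] = []
    sibling = asyncio.create_task(slow_sibling(log))
    try:
        await asyncio.gather(sibling, fails_fast())
    except ConnectionError:
        log.append("gather raised")
    # gather() has already handed the error back, yet the sibling is still running.
    still_running = not sibling.done()
    sibling.cancel()  # nothing else will clean it up for us
    await asyncio.gather(sibling, return_exceptions=True)
    return still_running, log

still_running, log = asyncio.run(main())
print(still_running, log)
```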
Problem 2: Task Leaks
```python
import asyncio

async def leaky_gather():
    async def long_task():
        try:
            await asyncio.sleep(100)
        finally:
            print("long_task cleanup")  # Runs only when something cancels this task

    async def failing_task():
        await asyncio.sleep(0.1)
        raise RuntimeError("fail")

    try:
        await asyncio.gather(long_task(), failing_task())
    except RuntimeError:
        print("Caught error")
    # But long_task is still running as an orphaned task!
    # Nothing holds a reference to it, so nothing will await or cancel it.

asyncio.run(leaky_gather())
# Output:
# Caught error
# long_task cleanup
# (The cleanup line appears only because asyncio.run() cancels leftover tasks
#  when the main coroutine returns; in a long-running program the orphan
#  would keep sleeping for the full 100 seconds.)
```
Problem 3: No Cancellation Propagation
When gather() is cancelled, it cancels all children. But when one child fails, it does not cancel siblings. This leads to wasted work:
```python
import asyncio

async def expensive_computation():
    """Runs for 60 seconds even though the result is useless."""
    await asyncio.sleep(60)
    return "computed"

async def validation():
    """Fails immediately -- the computation is pointless."""
    raise ValueError("invalid input")

async def main():
    # The expensive computation keeps running after validation fails,
    # and gather() waits the full 60 seconds before returning.
    results = await asyncio.gather(
        expensive_computation(),
        validation(),
        return_exceptions=True,
    )
    print(results)  # ['computed', ValueError('invalid input')]

asyncio.run(main())
```
Part 2 -- TaskGroup Fundamentals
asyncio.TaskGroup (Python 3.11+) is an async context manager that owns a set of tasks. When the context exits, it guarantees:
- All tasks have completed (either successfully or via cancellation).
- If any task raised an exception other than CancelledError, all other tasks were cancelled.
- All such exceptions are collected into an ExceptionGroup and raised.
```python
import asyncio

async def fetch(url: str, delay: float) -> dict:
    await asyncio.sleep(delay)
    return {"url": url, "status": 200}

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch("https://api.example.com/users", 0.1))
        task2 = tg.create_task(fetch("https://api.example.com/posts", 0.2))
        task3 = tg.create_task(fetch("https://api.example.com/comments", 0.15))
    # All tasks are guaranteed complete when we reach here
    print(task1.result())  # {"url": "...", "status": 200}
    print(task2.result())  # {"url": "...", "status": 200}
    print(task3.result())  # {"url": "...", "status": 200}

asyncio.run(main())
```
Task Naming
Give tasks descriptive names for debugging:
```python
# Inside an async function:
async with asyncio.TaskGroup() as tg:
    tg.create_task(fetch(url, 0.1), name=f"fetch-{url}")
```
Accessing Results
create_task() returns an asyncio.Task object. After the TaskGroup exits, you can access .result():
```python
async def main():
    async with asyncio.TaskGroup() as tg:
        tasks = [
            tg.create_task(fetch(url, 0.1), name=url)
            for url in ["api/users", "api/posts", "api/comments"]
        ]
    results = [t.result() for t in tasks]
    print(f"Fetched {len(results)} resources")
```
Part 3 -- ExceptionGroup and except*
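When a TaskGroup encounters errors, it raises an ExceptionGroup, or a BaseExceptionGroup if any of the errors is a BaseException that is not an Exception. KeyboardInterrupt and SystemExit are special-cased: the group still cancels and awaits the other tasks, but then re-raises the original exception unwrapped.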
The except* Syntax
Python 3.11 introduced except* to handle ExceptionGroup:
```python
async def flaky_fetch(url):
    if "users" in url:
        raise ConnectionError(f"Cannot reach {url}")
    if "posts" in url:
        raise ValueError(f"Invalid response from {url}")
    await asyncio.sleep(0.1)
    return {"url": url}

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(flaky_fetch("api/users"))
            tg.create_task(flaky_fetch("api/posts"))
            tg.create_task(flaky_fetch("api/comments"))
    except* ConnectionError as eg:
        print(f"Connection errors ({len(eg.exceptions)}):")
        for e in eg.exceptions:
            print(f"  {e}")
    except* ValueError as eg:
        print(f"Value errors ({len(eg.exceptions)}):")
        for e in eg.exceptions:
            print(f"  {e}")

asyncio.run(main())
# Output:
# Connection errors (1):
#   Cannot reach api/users
# Value errors (1):
#   Invalid response from api/posts
```
:::danger Critical Difference from except
except* does NOT prevent other except* clauses from running. Each clause handles a subset of exceptions from the group. All matching clauses execute. This is fundamentally different from except, where only the first matching clause runs.
:::
ExceptionGroup Structure
An ExceptionGroup wraps one or more exceptions and has a message:
```python
# Creating ExceptionGroups manually
eg = ExceptionGroup("multiple errors", [
    ValueError("bad value"),
    TypeError("bad type"),
    ValueError("another bad value"),
])
print(eg)
# multiple errors (3 sub-exceptions)

# Subgroup by type
value_errors = eg.subgroup(ValueError)
print(value_errors)
# multiple errors (2 sub-exceptions)

type_errors = eg.subgroup(TypeError)
print(type_errors)
# multiple errors (1 sub-exception)
```
Nested ExceptionGroups
ExceptionGroups can nest when TaskGroups are nested:
```python
import asyncio

async def fail_with(error):
    raise error

async def inner_work():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(fail_with(ValueError("inner-1")))
        tg.create_task(fail_with(TypeError("inner-2")))

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(inner_work())  # Raises ExceptionGroup
            tg.create_task(fail_with(RuntimeError("outer")))
    except* ValueError as eg:
        # Handles ValueError even though it is nested inside
        # an inner ExceptionGroup. eg keeps that nesting, so
        # eg.exceptions holds the inner group, not the bare ValueError.
        print(f"ValueErrors: {eg.exceptions}")
    except* TypeError as eg:
        print(f"TypeErrors: {eg.exceptions}")
    except* RuntimeError as eg:
        print(f"RuntimeErrors: {eg.exceptions}")

asyncio.run(main())
```
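except* automatically traverses nested ExceptionGroup structures to find matching exceptions. The group it binds with as eg preserves the original nesting, so eg.exceptions may contain inner ExceptionGroup objects rather than bare exceptions.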
Part 4 -- Cancellation Semantics
Understanding exactly what happens when a task fails inside a TaskGroup is critical.
The Cancellation Sequence
- Task 2 raises ValueError.
- TaskGroup calls cancel() on Task 1 and Task 3, and also on the task running the async with body if that body is still executing.
- Cancellation injects CancelledError at the await where each task is suspended.
- Tasks handle CancelledError (or let it propagate) and terminate.
- TaskGroup waits for ALL tasks to finish.
- TaskGroup raises ExceptionGroup containing only the non-cancellation exceptions.
Cleanup During Cancellation
Tasks should use try/finally to clean up when cancelled:
```python
async def managed_task(name, resource_pool):
    # resource_pool, fetch_next_item() and process() are placeholders.
    conn = await resource_pool.acquire()
    try:
        while True:
            data = await fetch_next_item()
            await process(data, conn)
    except asyncio.CancelledError:
        print(f"{name}: cleaning up after cancellation")
        # You CAN await during cancellation cleanup
        await conn.rollback()
        raise  # Always re-raise CancelledError
    finally:
        await resource_pool.release(conn)
```
Multiple Failures
If multiple tasks fail, all their exceptions are collected:
```python
import asyncio

async def fail_after(delay, error):
    await asyncio.sleep(delay)
    raise error

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fail_after(0.1, ValueError("first")))
            tg.create_task(fail_after(0.1, TypeError("second")))
            tg.create_task(fail_after(0.2, RuntimeError("third")))
    except* (ValueError, TypeError, RuntimeError) as eg:
        print(f"Total errors: {len(eg.exceptions)}")
        for e in eg.exceptions:
            print(f"  {type(e).__name__}: {e}")

asyncio.run(main())
# Typical output (task 3 is cancelled before it can raise):
# Total errors: 2
#   ValueError: first
#   TypeError: second
```
Tasks 1 and 2 both fail at about 0.1s. Their timers normally expire in the same event-loop iteration, so both tasks raise before the TaskGroup reacts, and both errors are collected. Task 3 is still inside its 0.2s sleep at that point, so it is cancelled and its RuntimeError is never raised. A task contributes its exception to the group only if it has already raised by the time the cancellation reaches it.
Part 5 -- Advanced TaskGroup Patterns
Pattern 1: Fan-Out / Fan-In
Process a batch of items concurrently while capping how many run at the same time:
```python
import asyncio

async def process_item(item: dict) -> dict:
    """Process a single item (simulated)."""
    await asyncio.sleep(0.1)
    return {**item, "processed": True}

async def fan_out_fan_in(items: list[dict], max_concurrent: int = 10) -> list[dict]:
    """Process items concurrently with bounded parallelism."""
    semaphore = asyncio.Semaphore(max_concurrent)
    results = []

    async def bounded_process(item):
        async with semaphore:  # at most max_concurrent items in flight
            result = await process_item(item)
            results.append(result)  # completion order, not input order

    async with asyncio.TaskGroup() as tg:
        for item in items:
            tg.create_task(bounded_process(item))
    return results

async def main():
    items = [{"id": i} for i in range(50)]
    results = await fan_out_fan_in(items, max_concurrent=10)
    print(f"Processed {len(results)} items")

asyncio.run(main())
```
Pattern 2: Graceful Degradation
Sometimes you want to continue even when some tasks fail. TaskGroup cancels everything on first failure. To work around this, catch exceptions inside the task:
```python
import asyncio
from dataclasses import dataclass
from typing import TypeVar, Generic

T = TypeVar("T")

@dataclass
class Result(Generic[T]):
    value: T | None = None
    error: Exception | None = None

    @property
    def ok(self) -> bool:
        return self.error is None

async def safe_fetch(url: str) -> Result[dict]:
    """Wrap a fetch so it never raises -- returns Result instead."""
    try:
        response = await do_fetch(url)  # do_fetch() is a placeholder for your HTTP call
        return Result(value=response)
    except Exception as e:  # CancelledError is a BaseException, so cancellation still propagates
        return Result(error=e)

async def main():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/invalid",  # Will fail
        "https://api.example.com/posts",
    ]
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(safe_fetch(url)) for url in urls]
    # No ExceptionGroup raised -- errors are in Results
    for task, url in zip(tasks, urls):
        result = task.result()
        if result.ok:
            print(f"{url}: {result.value}")
        else:
            print(f"{url}: FAILED - {result.error}")
```
Pattern 3: Timeout with TaskGroup
Combine asyncio.timeout() with TaskGroup:
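```python
async def main():
    # slow_operation_1..3 and OperationError are placeholders for your own code.
    try:
        async with asyncio.timeout(5.0):
            async with asyncio.TaskGroup() as tg:
                tg.create_task(slow_operation_1())
                tg.create_task(slow_operation_2())
                tg.create_task(slow_operation_3())
    # One try statement cannot mix except and except*, so both clauses use except*.
    # except* TimeoutError still matches the bare TimeoutError from asyncio.timeout().
    except* TimeoutError:
        print("Operations took too long -- all tasks cancelled")
    except* OperationError as eg:
        print(f"Some operations failed: {eg.exceptions}")
```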
Pattern 4: Dynamic Task Creation
You can create tasks dynamically inside the TaskGroup block, including from within tasks themselves:
```python
import asyncio

async def crawl(tg: asyncio.TaskGroup, url: str, visited: set, depth: int):
    """Recursively crawl pages, spawning tasks for each link."""
    if depth <= 0 or url in visited:
        return
    visited.add(url)
    links = await fetch_links(url)  # fetch_links() is a placeholder
    for link in links:
        if link not in visited:
            tg.create_task(
                crawl(tg, link, visited, depth - 1),
                name=f"crawl-{link}"
            )

async def main():
    visited = set()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(crawl(tg, "https://example.com", visited, depth=3))
    print(f"Crawled {len(visited)} pages")
```
Be careful with unbounded dynamic task creation. A recursive crawler without depth limits or rate limiting can spawn thousands of tasks and overwhelm the system. Always bound the total number of concurrent tasks.
Part 6 -- TaskGroup vs. gather() Migration
Direct Replacement
```python
# Inside an async function.

# Before (gather)
results = await asyncio.gather(
    fetch_users(),
    fetch_posts(),
    fetch_comments()
)
users, posts, comments = results

# After (TaskGroup)
async with asyncio.TaskGroup() as tg:
    t1 = tg.create_task(fetch_users())
    t2 = tg.create_task(fetch_posts())
    t3 = tg.create_task(fetch_comments())
users = t1.result()
posts = t2.result()
comments = t3.result()
```
Handling return_exceptions=True
```python
# Before
results = await asyncio.gather(*(do_fetch(url) for url in urls), return_exceptions=True)
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]

# After -- use the safe_fetch wrapper pattern from Part 5
async with asyncio.TaskGroup() as tg:
    result_tasks = [tg.create_task(safe_fetch(url)) for url in urls]
results = [t.result() for t in result_tasks]
successes = [r for r in results if r.ok]
failures = [r for r in results if not r.ok]
```
When gather() is Still Appropriate
gather() is simpler for cases where:
- You trust all tasks to succeed
- You want all tasks to run to completion regardless of individual failures (with return_exceptions=True)
- You are writing a quick script, not production code
For production services, prefer TaskGroup.
Part 7 -- The Nursery Pattern
TaskGroup implements what the structured concurrency literature calls the "nursery pattern" (term coined by the Trio library). The core guarantees are:
- Bounded lifetime: No task outlives its parent scope.
- Error propagation: If a child fails, the parent knows.
- Automatic cleanup: If a child fails or the scope is cancelled, the remaining children are cancelled, and the scope does not exit until all of them have finished.
- No orphans: You cannot create a task that nobody is responsible for.
This contrasts with create_task() used outside a TaskGroup, which creates "fire-and-forget" tasks. Those tasks are orphans -- if they fail, the exception may be silently lost or only show up as a warning when the task is garbage-collected.
```python
# DANGEROUS: orphaned task
async def main():
    task = asyncio.create_task(might_fail())
    await asyncio.sleep(1)
    # If might_fail() raised, we might never know.
    # asyncio only logs "Task exception was never retrieved"
    # when the task object is garbage-collected.

# SAFE: TaskGroup owns the task
async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(might_fail())
    # If might_fail() raised, ExceptionGroup is raised here
```
Key Takeaways
- asyncio.gather() has fundamental problems: it does not cancel siblings on failure, mixes exceptions with results when using return_exceptions=True, and can leak orphaned tasks.
- asyncio.TaskGroup (Python 3.11+) provides structured concurrency: bounded task lifetimes, automatic cancellation, and collected error propagation.
- When any task in a TaskGroup raises, all other tasks are cancelled. The TaskGroup waits for all cancellations to complete, then raises ExceptionGroup.
- Use except* to handle ExceptionGroup. Unlike except, multiple except* clauses can match different exception types from the same group.
- ExceptionGroup can nest when TaskGroups are nested. except* traverses nested groups automatically.
- For graceful degradation (continue despite failures), wrap individual task coroutines in error-catching wrappers that return a Result type.
- Dynamic task creation inside a TaskGroup is safe -- the group waits for all tasks, including those created by other tasks.
- Prefer TaskGroup over gather() in production code. Reserve gather() for simple scripts or trusted, failure-tolerant scenarios.
Graded Practice Challenges
Level 1 -- Predict the Output
Question 1: What does this print?
```python
import asyncio

async def worker(n, delay):
    await asyncio.sleep(delay)
    print(f"worker-{n} done")
    return n * 10

async def main():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(worker(1, 0.3))
        t2 = tg.create_task(worker(2, 0.1))
        t3 = tg.create_task(worker(3, 0.2))
    print(f"results: {t1.result()}, {t2.result()}, {t3.result()}")

asyncio.run(main())
```
Answer
```
worker-2 done
worker-3 done
worker-1 done
results: 10, 20, 30
```
Tasks complete in order of their delay (2, 3, 1), but results are accessed in creation order. The TaskGroup waits for all three before continuing.
Question 2: What does this print?
```python
import asyncio

async def task_a():
    await asyncio.sleep(0.1)
    raise ValueError("A")

async def task_b():
    await asyncio.sleep(0.1)
    raise TypeError("B")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task_a())
            tg.create_task(task_b())
    except* ValueError as eg:
        print(f"ValueError: {eg.exceptions}")
    except* TypeError as eg:
        print(f"TypeError: {eg.exceptions}")

asyncio.run(main())
```
Answer
```
ValueError: (ValueError('A'),)
TypeError: (TypeError('B'),)
```
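Both sleeps end at essentially the same moment, so both tasks raise before the TaskGroup can cancel either one, and the group holds two errors. Both except* clauses then execute because each handles a different exception type from the group. This is the key difference from regular except -- all matching except* clauses run.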
Question 3: What happens here?
```python
import asyncio

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(asyncio.sleep(0.1))
    print("done")

    # Can we reuse the TaskGroup?
    async with tg:
        tg.create_task(asyncio.sleep(0.1))
    print("reused")

asyncio.run(main())
```
Answer
The second async with tg: raises RuntimeError because a TaskGroup can be entered only once. Each async with asyncio.TaskGroup() creates a new group; the variable tg still refers to the finished group from the first block.
```
done
Traceback (most recent call last):
  ...
RuntimeError: TaskGroup ... has already been entered
```
Level 2 -- Debug Challenge
This code runs parallel API fetches but hangs forever when the API is slow. Find and fix the issue.
```python
import asyncio

async def fetch_api(endpoint: str) -> dict:
    """Simulates an API call that may be very slow."""
    delays = {
        "/users": 1.0,
        "/posts": 300.0,  # This endpoint is essentially hung
        "/comments": 0.5,
    }
    await asyncio.sleep(delays.get(endpoint, 1.0))
    return {"endpoint": endpoint, "data": []}

async def main():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(fetch_api("/users"))
        t2 = tg.create_task(fetch_api("/posts"))
        t3 = tg.create_task(fetch_api("/comments"))
    print(t1.result(), t2.result(), t3.result())

asyncio.run(main())
```
Answer
The TaskGroup waits for ALL tasks, including /posts which takes 300 seconds. There is no timeout.
Fix: wrap the TaskGroup in asyncio.timeout():
```python
async def main():
    try:
        async with asyncio.timeout(5.0):
            async with asyncio.TaskGroup() as tg:
                t1 = tg.create_task(fetch_api("/users"))
                t2 = tg.create_task(fetch_api("/posts"))
                t3 = tg.create_task(fetch_api("/comments"))
        print(t1.result(), t2.result(), t3.result())
    except TimeoutError:
        print("Fetch timed out -- not all APIs responded in time")
```
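Or add per-task timeouts. On their own, these make the failure fast but do not give you partial results. The slow task raises TimeoutError inside the group, which cancels any siblings still running and surfaces as an ExceptionGroup.
```python
async def fetch_with_timeout(endpoint: str, timeout: float) -> dict:
    async with asyncio.timeout(timeout):
        return await fetch_api(endpoint)
```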
Level 3 -- Design Challenge
Design a WorkerPool class that:
- Accepts a maximum concurrency limit
- Accepts an async iterable of work items
- Processes each item with a provided async worker function
- Uses TaskGroup for structured concurrency
- Returns results in the same order as inputs
- If any worker fails, cancels remaining work and raises the error
- Supports a configurable timeout per item
Design Hints
```python
import asyncio
from typing import Any

class WorkerPool:
    def __init__(self, worker_fn, max_concurrent: int = 10,
                 per_item_timeout: float | None = None):
        self.worker_fn = worker_fn
        self.max_concurrent = max_concurrent
        self.per_item_timeout = per_item_timeout

    async def map(self, items) -> list:
        """Process items maintaining input order."""
        semaphore = asyncio.Semaphore(self.max_concurrent)
        results: dict[int, Any] = {}

        async def process(index, item):
            async with semaphore:
                # asyncio.timeout(None) means "no deadline", so one path covers both cases
                async with asyncio.timeout(self.per_item_timeout):
                    results[index] = await self.worker_fn(item)

        async with asyncio.TaskGroup() as tg:
            # For an async iterable, replace this loop with `async for`
            # and a manual index counter.
            for i, item in enumerate(items):
                tg.create_task(process(i, item), name=f"worker-{i}")

        # Return results in input order
        return [results[i] for i in range(len(results))]
```
Key considerations:
- Use a dict[int, result] to maintain input order despite concurrent completion order
- The semaphore bounds concurrency without limiting the total number of items
- asyncio.timeout() per item prevents a single slow item from blocking the pool
- TaskGroup ensures cleanup on failure
What's Next
Now that you understand how to manage groups of concurrent tasks safely, the next lesson, Custom Awaitables, takes you inside the await machinery itself. You will learn how coroutines, Futures, and the __await__ protocol work under the hood -- knowledge that lets you build custom async primitives.
