Python Structured Concurrency with TaskGroup: Practice Problems & Exercises
Easy
Run three coroutines concurrently using asyncio.TaskGroup and print a done message from each.
import asyncio
async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} done")
async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task("task_a", 0.01))
        tg.create_task(task("task_b", 0.02))
        tg.create_task(task("task_c", 0.005))
asyncio.run(main())
Solution
import asyncio
async def task(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} done")
async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(task("task_a", 0.01))
        tg.create_task(task("task_b", 0.02))
        tg.create_task(task("task_c", 0.005))
asyncio.run(main())
TaskGroup fundamentals:
- All tasks created inside the async with block are awaited when the block exits.
- Tasks run concurrently: task_c (0.005s) finishes first, then task_a (0.01s), then task_b (0.02s).
- If any task raises an exception, the group cancels the remaining tasks, waits for them to finish, and then raises an ExceptionGroup containing the failures.
- Compare with asyncio.gather(): by default, when one awaitable raises, gather() propagates that exception immediately but does not cancel the others, so they keep running unsupervised. TaskGroup always cancels and awaits its remaining tasks.
- Python 3.11+ only; for 3.9/3.10, the third-party anyio library offers anyio.create_task_group().
Expected Output
task_c done
task_a done
task_b done
Hints
Hint 1: Use async with asyncio.TaskGroup() as tg: to create a group, then tg.create_task(coro) to add tasks.
Hint 2: The async with block waits for ALL tasks to complete before continuing.
Use TaskGroup to compute squares concurrently and collect the results in order.
import asyncio
async def square(n):
    await asyncio.sleep(0.001)
    return n * n
async def main():
    tasks = []
    async with asyncio.TaskGroup() as tg:
        for n in range(1, 6):
            tasks.append(tg.create_task(square(n)))
    results = [t.result() for t in tasks]
    print(f"Results: {results}")
asyncio.run(main())
Solution
import asyncio
async def square(n):
    await asyncio.sleep(0.001)
    return n * n
async def main():
    tasks = []
    async with asyncio.TaskGroup() as tg:
        for n in range(1, 6):
            tasks.append(tg.create_task(square(n)))
    # After the block: all tasks are done
    results = [t.result() for t in tasks]
    print(f"Results: {results}")
asyncio.run(main())
Result collection pattern:
- tg.create_task() returns an asyncio.Task; store the references before the block exits.
- After the async with block, all tasks are complete. Call .result() to get their return values.
- Results follow the order in which you created the tasks, not the order in which they finished. asyncio.gather() also returns results in argument order; the real difference between the two is failure handling, not ordering.
- If a task raised an exception, .result() re-raises it. With TaskGroup, however, any task failure makes the whole async with block raise an ExceptionGroup, so you would not normally reach this line in that case.
- When tasks are not managed by a TaskGroup (for example, with asyncio.wait()), check t.cancelled() and t.exception() before calling t.result() to handle failures per task.
Expected Output
Results: [1, 4, 9, 16, 25]
Hints
Hint 1: tg.create_task() returns an asyncio.Task object. Call .result() on it after the TaskGroup block exits.
Hint 2: Tasks are guaranteed to be complete (or failed) once the async with block exits.
Predict the output of both approaches when one task fails. Understand how error propagation differs.
import asyncio
async def good_task():
    await asyncio.sleep(0.02)
    return "Task B result"
async def bad_task():
    await asyncio.sleep(0.01)
    raise ValueError("Task A failed")
async def with_gather():
    try:
        results = await asyncio.gather(bad_task(), good_task(), return_exceptions=True)
        for r in results:
            if isinstance(r, Exception):
                print(f"gather error: {r}")
            else:
                print(f"gather: {r}")
    except Exception as e:
        print(f"gather raised: {e}")
async def with_taskgroup():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(bad_task())
            tg.create_task(good_task())
    except* ValueError as eg:
        print("TaskGroup raised ExceptionGroup")
async def main():
    await with_gather()
    await with_taskgroup()
asyncio.run(main())
Solution
import asyncio
async def good_task():
    await asyncio.sleep(0.02)
    return "Task B result"
async def bad_task():
    await asyncio.sleep(0.01)
    raise ValueError("Task A failed")
async def with_gather():
    results = await asyncio.gather(bad_task(), good_task(), return_exceptions=True)
    for r in results:
        if isinstance(r, Exception):
            print(f"gather error: {r}")
        else:
            print(f"gather: {r}")
async def with_taskgroup():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(bad_task())
            tg.create_task(good_task())
    except* ValueError as eg:
        print("TaskGroup raised ExceptionGroup")
async def main():
    await with_gather()
    await with_taskgroup()
asyncio.run(main())
Key behavioral difference:
- gather(return_exceptions=True) collects all results, including exceptions, as values: both tasks run to completion and you inspect the results manually.
- gather(return_exceptions=False) propagates the first exception immediately, but the remaining awaitables are not cancelled and keep running in the background (the leak problem).
- TaskGroup cancels all sibling tasks when any task fails, waits for them, and then raises an ExceptionGroup containing the exceptions from every failed task.
- except* is Python 3.11 syntax for handling an ExceptionGroup; each clause matches specific exception types within the group.
- This cleanup guarantee is the main reason to prefer TaskGroup in production code.
Expected Output
gather error: Task A failed
gather: Task B result
TaskGroup raised ExceptionGroup
Hints
Hint 1: asyncio.gather() with return_exceptions=False raises the first exception immediately but does not cancel the remaining tasks; they keep running after control returns to the caller.
Hint 2: TaskGroup always awaits cancellation of all sibling tasks before re-raising.
Medium
Handle mixed exception types from a TaskGroup using except* with multiple clauses.
import asyncio
async def failing_task(task_id, exc_type, message):
    await asyncio.sleep(0.001)
    raise exc_type(message)
async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(failing_task(1, ValueError, "error in task 1"))
            tg.create_task(failing_task(2, RuntimeError, "resource unavailable"))
            tg.create_task(failing_task(3, ValueError, "error in task 3"))
    except* ValueError as eg:
        msgs = [str(e) for e in eg.exceptions]
        print(f"Caught {len(eg.exceptions)} ValueErrors: {msgs}")
    except* RuntimeError as eg:
        msgs = [str(e) for e in eg.exceptions]
        print(f"Caught {len(eg.exceptions)} RuntimeError: {msgs}")
asyncio.run(main())
Solution
import asyncio
async def failing_task(task_id, exc_type, message):
    await asyncio.sleep(0.001)
    raise exc_type(message)
async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(failing_task(1, ValueError, "error in task 1"))
            tg.create_task(failing_task(2, RuntimeError, "resource unavailable"))
            tg.create_task(failing_task(3, ValueError, "error in task 3"))
    except* ValueError as eg:
        msgs = [str(e) for e in eg.exceptions]
        print(f"Caught {len(eg.exceptions)} ValueErrors: {msgs}")
    except* RuntimeError as eg:
        msgs = [str(e) for e in eg.exceptions]
        print(f"Caught {len(eg.exceptions)} RuntimeError: {msgs}")
asyncio.run(main())
ExceptionGroup and except* mechanics:
- An ExceptionGroup wraps multiple exceptions that occurred concurrently; a TaskGroup raises one when its tasks fail.
- except* ExcType splits the group and binds only the matching exceptions, as a new ExceptionGroup, to the clause variable. Unmatched exceptions are passed on to the following except* clauses, and anything still unmatched after the last clause is re-raised in a new ExceptionGroup.
- If every exception is matched by some except* clause, the ExceptionGroup is fully handled.
- eg.exceptions is a tuple of the matched exceptions.
- except* cannot be mixed with a plain except in the same try statement.
- ExceptionGroups can be nested: a TaskGroup whose tasks themselves run TaskGroups produces nested ExceptionGroup objects.
Expected Output
Caught 2 ValueErrors: ['error in task 1', 'error in task 3']
Caught 1 RuntimeError: ['resource unavailable']
Hints
Hint 1: except* ValueError as eg catches only the ValueError instances from the group. eg.exceptions is a tuple of them.
Hint 2: Multiple except* clauses can handle different exception types from the same ExceptionGroup.
Add a timeout to a TaskGroup so that if the group takes too long, all tasks are cancelled.
import asyncio
async def fast_query():
    await asyncio.sleep(0.01)
    print("fast_query completed")
    return "fast result"
async def slow_query():
    await asyncio.sleep(0.1)
    return "slow result"
async def main():
    try:
        async with asyncio.timeout(0.03):
            async with asyncio.TaskGroup() as tg:
                tg.create_task(fast_query())
                tg.create_task(slow_query())
    except TimeoutError:
        print("Operation timed out after 0.03s")
asyncio.run(main())
Solution
import asyncio
async def fast_query():
    await asyncio.sleep(0.01)
    print("fast_query completed")
    return "fast result"
async def slow_query():
    await asyncio.sleep(0.1)
    return "slow result"
async def main():
    try:
        async with asyncio.timeout(0.03):
            async with asyncio.TaskGroup() as tg:
                tg.create_task(fast_query())
                tg.create_task(slow_query())
    except TimeoutError:
        print("Operation timed out after 0.03s")
asyncio.run(main())
Timeout + TaskGroup interaction:
- asyncio.timeout() is an async context manager (Python 3.11+) that raises TimeoutError when its deadline is exceeded.
- When the deadline passes, it cancels the current task, and that cancellation propagates into the TaskGroup.
- The TaskGroup then cancels all of its running tasks and waits for them before letting the cancellation propagate; asyncio.timeout() converts it into TimeoutError.
- This combination guarantees that no tasks are orphaned after a timeout.
- Python 3.10 and earlier have neither TaskGroup nor asyncio.timeout(); use asyncio.wait_for() or the third-party anyio library instead.
- fast_query prints its message because it finishes at about 0.01s, well before the 0.03s deadline; only slow_query is cancelled.
Expected Output
fast_query completed
Operation timed out after 0.03s
Hints
Hint 1: Wrap the TaskGroup in asyncio.timeout() (Python 3.11+) or asyncio.wait_for() for timeout control.
Hint 2: When the timeout fires, TaskGroup cancels all running tasks.
Build a two-level task hierarchy: an outer group spawns "fetch" tasks, each of which uses its own inner TaskGroup to perform sub-tasks in parallel.
import asyncio
async def process_record(shard_id, record_id):
    await asyncio.sleep(0.001)
    return f"shard_{shard_id}_record_{record_id}"
async def fetch_shard(shard_id, record_count):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(process_record(shard_id, r)) for r in range(record_count)]
    results = [t.result() for t in tasks]
    print(f"Shard {shard_id}: {len(results)} records processed")
    return results
async def main():
    all_results = []
    shard_tasks = []
    async with asyncio.TaskGroup() as outer:
        for shard_id in range(3):
            shard_tasks.append(outer.create_task(fetch_shard(shard_id, 2)))
    for t in shard_tasks:
        all_results.extend(t.result())
    print(f"Total records: {len(all_results)}")
asyncio.run(main())
Solution
import asyncio
async def process_record(shard_id, record_id):
    await asyncio.sleep(0.001)
    return f"shard_{shard_id}_record_{record_id}"
async def fetch_shard(shard_id, record_count):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(process_record(shard_id, r)) for r in range(record_count)]
    results = [t.result() for t in tasks]
    print(f"Shard {shard_id}: {len(results)} records processed")
    return results
async def main():
    shard_tasks = []
    async with asyncio.TaskGroup() as outer:
        for shard_id in range(3):
            shard_tasks.append(outer.create_task(fetch_shard(shard_id, 2)))
    all_results = []
    for t in shard_tasks:
        all_results.extend(t.result())
    print(f"Total records: {len(all_results)}")
asyncio.run(main())
Hierarchical concurrency:
- Three shards run concurrently in the outer group. Within each shard, records run concurrently in the inner group.
- This gives a 3x2 grid of 6 record tasks, all running concurrently (interleaved on a single event loop thread rather than in parallel).
- Error propagation: if any inner group task fails, the inner group fails, which fails the shard task, which the outer group sees as a task failure, cancelling all other shards.
- This is structured concurrency's key property: the failure scope is well-defined and predictable.
- Real-world mapping: outer group = shards / partitions; inner group = parallel reads per shard.
Expected Output
Shard 0: 2 records processed
Shard 1: 2 records processed
Shard 2: 2 records processed
Total records: 6
Hints
Hint 1: Nest TaskGroup blocks inside task coroutines to create hierarchical concurrency.
Hint 2: Each TaskGroup scope is independent — failures in an inner group propagate up to the outer group.
Implement a fan-out/fan-in pattern: fetch multiple URLs concurrently with TaskGroup, then aggregate results.
import asyncio
async def fetch_url(url):
    await asyncio.sleep(0.01)  # simulate I/O
    return f"content_of_{url}"
async def main():
    urls = [f"url_{i}" for i in range(5)]
    tasks = []
    async with asyncio.TaskGroup() as tg:
        for url in urls:
            tasks.append(tg.create_task(fetch_url(url)))
    results = [t.result() for t in tasks]
    print(f"Processed {len(results)} URLs in parallel")
    print("All results collected")
asyncio.run(main())
Solution
import asyncio
async def fetch_url(url):
    await asyncio.sleep(0.01)  # simulate network I/O
    return f"content_of_{url}"
async def main():
    urls = [f"url_{i}" for i in range(5)]
    tasks = []
    async with asyncio.TaskGroup() as tg:
        for url in urls:
            tasks.append(tg.create_task(fetch_url(url)))
    results = [t.result() for t in tasks]
    print(f"Processed {len(results)} URLs in parallel")
    print("All results collected")
asyncio.run(main())
Fan-out/fan-in pattern:
- Fan-out: distribute work across N tasks created inside the TaskGroup.
- Fan-in: collect results after the TaskGroup block exits (all tasks are complete).
- Results are in input order because we preserve task references in order of creation.
- This replaces asyncio.gather() with stricter error semantics: if one fetch fails, the others are cancelled instead of being left running.
- Production pattern: limit concurrency by having each task acquire a shared asyncio.Semaphore around its I/O, or batch the URL list into chunks and run one TaskGroup per chunk.
Expected Output
Processed 5 URLs in parallel
All results collected
Hints
Hint 1: Fan-out: create one task per URL in the TaskGroup. Fan-in: collect results after the block.
Hint 2: Store tasks as a list to collect results in input order.
Hard
Demonstrate that cancellation of a TaskGroup propagates to all tasks and that per-task cleanup runs correctly.
import asyncio
async def task_with_cleanup(name):
    try:
        print(f"{name} started")
        await asyncio.sleep(10)  # long sleep: will be cancelled
    finally:
        print(f"{name} cleanup ran")
async def main():
    try:
        async with asyncio.timeout(0.02):
            async with asyncio.TaskGroup() as tg:
                tg.create_task(task_with_cleanup("task_1"))
                tg.create_task(task_with_cleanup("task_2"))
    except TimeoutError:
        print("CancelledError propagated")
asyncio.run(main())
Solution
import asyncio
async def task_with_cleanup(name):
    try:
        print(f"{name} started")
        await asyncio.sleep(10)  # will be cancelled by timeout
    finally:
        print(f"{name} cleanup ran")
async def main():
    try:
        async with asyncio.timeout(0.02):
            async with asyncio.TaskGroup() as tg:
                tg.create_task(task_with_cleanup("task_1"))
                tg.create_task(task_with_cleanup("task_2"))
    except TimeoutError:
        print("CancelledError propagated")
asyncio.run(main())
Cancellation mechanics:
- asyncio.timeout fires and cancels the enclosing task; the TaskGroup sees that cancellation and cancels all of its tasks.
- Each task receives CancelledError at its await asyncio.sleep(10) point.
- The finally block in each task runs, guaranteeing cleanup.
- CancelledError inherits from BaseException, not Exception, so except Exception does not catch it. A bare except: or except BaseException: does, and such a handler must re-raise it.
- After all tasks have finished their cleanup, asyncio.timeout converts the cancellation into TimeoutError, which is what main() actually catches; the printed "CancelledError propagated" message refers to the cancellation that travelled through the tasks first.
- The finally pattern is how you release resources (database connections, file handles, network sockets) even when a task is cancelled.
Expected Output
task_1 started
task_2 started
task_1 cleanup ran
task_2 cleanup ran
CancelledError propagated
Hints
Hint 1: CancelledError is raised at await points inside a cancelled task. Use try/finally to guarantee cleanup.
Hint 2: Do NOT suppress CancelledError — re-raise it after cleanup. Suppressing it breaks the cancellation protocol.
Write a demonstration showing how bare asyncio.create_task() can leak tasks, and how TaskGroup prevents it.
import asyncio
async def background_task(label):
    try:
        await asyncio.sleep(0.1)
        print(f"{label}: completed")
    except asyncio.CancelledError:
        print(f"{label}: cancelled")
        raise
async def leaky_approach():
    """Tasks created here may outlive this coroutine."""
    t = asyncio.create_task(background_task("leaky"))
    await asyncio.sleep(0.01)  # creator finishes early
    # t is still running: a leaked task
    return t
async def safe_approach():
    """TaskGroup ensures all tasks complete (or are cancelled) before exit."""
    async with asyncio.TaskGroup() as tg:
        tg.create_task(background_task("safe"))
        await asyncio.sleep(0.01)  # still inside the group
    # Here, background_task("safe") has finished
async def main():
    # Leaky: task runs past its creator's lifetime
    leaked_task = await leaky_approach()
    print(f"leaky task still running after its creator returned: {not leaked_task.done()}")
    await asyncio.sleep(0)  # yield to let the task keep running
    leaked_task.cancel()  # manual cleanup; nothing else would stop it
    await asyncio.sleep(0)
    # Safe: TaskGroup waits for all tasks
    await safe_approach()
    print("TaskGroup: all tasks complete")
asyncio.run(main())
Solution
import asyncio
async def background_task(label):
    try:
        await asyncio.sleep(0.1)
        print(f"{label}: completed")
    except asyncio.CancelledError:
        print(f"{label}: cancelled")
        raise
async def leaky_approach():
    t = asyncio.create_task(background_task("leaky"))
    await asyncio.sleep(0.01)
    return t  # creator is done, task still running
async def safe_approach():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(background_task("safe"))
        await asyncio.sleep(0.01)
async def main():
    leaked_task = await leaky_approach()
    print(f"leaky task still running after its creator returned: {not leaked_task.done()}")
    await asyncio.sleep(0)
    leaked_task.cancel()  # manual cleanup; nothing else would stop it
    await asyncio.sleep(0.001)
    await safe_approach()
    print("TaskGroup: all tasks complete")
asyncio.run(main())
Task leak explained:
- asyncio.create_task() attaches a task to the running event loop, not to the calling coroutine.
- When leaky_approach() returns, the task keeps running independently; only the manual cancel() in main() stops it.
- If a task is still pending when the loop is closed or the task is garbage-collected, asyncio logs "Task was destroyed but it is pending!". asyncio.run() cancels leftover tasks at shutdown, but whatever work they were doing is silently abandoned.
- TaskGroup solves this: the async with block is a scope boundary, and control cannot leave it until every task inside is done (or cancelled).
- This is the core principle of structured concurrency: a task's lifetime is bounded by the scope that created it.
Expected Output
Demonstrates that fire-and-forget tasks outlive their creator; TaskGroup prevents this.
Hints
Hint 1: A task created with asyncio.create_task() outside a TaskGroup is attached to the event loop, not to any scope. It can outlive its creator coroutine.
Hint 2: Prove the leak by cancelling the parent and observing the orphan task still running.
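When a fire-and-forget task outside a TaskGroup is unavoidable, the asyncio.create_task() documentation recommends keeping a strong reference to it, because the event loop only holds weak references to tasks. The sketch below follows that pattern, assuming Python 3.11+; job() is a hypothetical coroutine. Note that the caller still has to remember to wait for the tasks, which is exactly the bookkeeping a TaskGroup does for you.

```python
import asyncio

# Strong references to fire-and-forget tasks: the event loop itself only keeps weak ones.
background_tasks = set()

async def job(n, results):
    await asyncio.sleep(0.01)
    results.append(n)

async def main():
    results = []
    for n in range(3):
        task = asyncio.create_task(job(n, results))
        background_tasks.add(task)
        # Drop the reference once the task finishes so the set does not grow forever.
        task.add_done_callback(background_tasks.discard)
    # Nothing bounds these tasks' lifetime, so we must remember to wait for them.
    await asyncio.gather(*background_tasks)
    return results

results = asyncio.run(main())
print(sorted(results))  # [0, 1, 2]
```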
Build a parallel processing pipeline that processes a list of records, collects per-record errors without aborting the group, and reports a summary.
import asyncio
async def process_record(record_id, data):
    await asyncio.sleep(0.001)
    if data < 0:
        raise ValueError(f"Invalid data {data} for record {record_id}")
    return record_id * data
async def main():
    records = [(1, 10), (2, -5), (3, 20), (4, -3), (5, 15)]
    successes = []
    errors = []
    async def safe_process(rec_id, val):
        try:
            result = await process_record(rec_id, val)
            successes.append((rec_id, result))
        except ValueError as e:
            errors.append((rec_id, str(e)))
    async with asyncio.TaskGroup() as tg:
        for rec_id, val in records:
            tg.create_task(safe_process(rec_id, val))
    print(f"Successes: {successes}")
    print(f"Errors: {errors}")
asyncio.run(main())
Solution
import asyncio
async def process_record(record_id, data):
    await asyncio.sleep(0.001)
    if data < 0:
        raise ValueError(f"Invalid data {data} for record {record_id}")
    return record_id * data
async def main():
    records = [(1, 10), (2, -5), (3, 20), (4, -3), (5, 15)]
    successes = []
    errors = []
    async def safe_process(rec_id, val):
        try:
            result = await process_record(rec_id, val)
            successes.append((rec_id, result))
        except ValueError as e:
            errors.append((rec_id, str(e)))
    async with asyncio.TaskGroup() as tg:
        for rec_id, val in records:
            tg.create_task(safe_process(rec_id, val))
    print(f"Successes: {successes}")
    print(f"Errors: {errors}")
asyncio.run(main())
Error isolation pattern:
- The safe_process wrapper catches per-record errors before they reach the TaskGroup, so the group itself never sees a failed task.
- The shared lists (successes, errors) are safe because the event loop runs on a single thread and each append happens between await points, so no two tasks mutate a list at the same time. Entries appear in completion order.
- This pattern implements "partial success" semantics: process every record, collect every error, report at the end.
- Alternative: use an asyncio.Queue instead of shared lists if you need to stream results as they arrive.
- For CPU-bound processing, offload the work with loop.run_in_executor() (for example, to a ProcessPoolExecutor) so it does not block the event loop.
Expected Output
Successes: [(1, 10), (3, 60), (5, 75)]
Errors: [(2, 'Invalid data -5 for record 2'), (4, 'Invalid data -3 for record 4')]
Hints
Hint 1: Use a shared list to collect successful results and a separate list for errors.
Hint 2: Wrap individual task logic in try/except so a single record failure does not fail the whole group.
Demonstrate that a task running inside a TaskGroup can spawn additional tasks into the same group, enabling dynamic work discovery (like a web crawler).
import asyncio
async def crawl(url, depth, tg, results, visited):
    if url in visited or depth == 0:
        return
    visited.add(url)
    await asyncio.sleep(0.001)  # simulate fetch
    results.append(url)
    # "discover" sub-URLs
    sub_urls = [f"{url}/child_{i}" for i in range(2)]
    for sub in sub_urls:
        if sub not in visited:
            tg.create_task(crawl(sub, depth - 1, tg, results, visited))
async def main():
    results = []
    visited = set()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(crawl("root", depth=2, tg=tg, results=results, visited=visited))
    print(f"Crawled {len(results)} pages")
    print(sorted(results))
asyncio.run(main())
Solution
import asyncio
async def crawl(url, depth, tg, results, visited):
    if url in visited or depth == 0:
        return
    visited.add(url)
    await asyncio.sleep(0.001)
    results.append(url)
    sub_urls = [f"{url}/child_{i}" for i in range(2)]
    for sub in sub_urls:
        if sub not in visited:
            tg.create_task(crawl(sub, depth - 1, tg=tg, results=results, visited=visited))
async def main():
    results = []
    visited = set()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(crawl("root", depth=2, tg=tg, results=results, visited=visited))
    print(f"Crawled {len(results)} pages")
    print(sorted(results))
asyncio.run(main())
Dynamic task spawning:
- TaskGroup tracks all tasks created via tg.create_task(), even if they are spawned from within running tasks.
- The group does not exit until ALL tasks (including dynamically added ones) complete.
- This is the structured concurrency solution to recursive fan-out: no tasks leak, no manual join needed.
- Expected crawl: root (depth 2) spawns root/child_0 and root/child_1 (depth 1), each of which spawns 2 more (depth 0, no-ops). Total crawled = 3 pages.
- Production crawler adds: semaphore for rate limiting, deduplication with a persistent store, error handling per URL, and depth-first vs breadth-first strategy.
Expected Output
Crawled 3 pages
['root', 'root/child_0', 'root/child_1']
Hints
Hint 1: Tasks created inside a TaskGroup (via tg.create_task) from within a running task are tracked by the same group.
Hint 2: You can pass the tg reference into subtasks to allow recursive task spawning.
