Skip to main content

Python Structured Concurrency with TaskGroup: Practice Problems & Exercises

Practice: Structured Concurrency with TaskGroup

11 problems3 Easy4 Medium4 Hard60–90 min
← Back to lesson

Easy

#1Basic TaskGroupEasy
TaskGroupasyncioconcurrent-tasks

Run three coroutines concurrently using asyncio.TaskGroup and print a done message from each.

import asyncio

async def task(name, delay):
await asyncio.sleep(delay)
print(f"{name} done")

async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(task("task_a", 0.01))
tg.create_task(task("task_b", 0.02))
tg.create_task(task("task_c", 0.005))

asyncio.run(main())
Solution
import asyncio

async def task(name, delay):
await asyncio.sleep(delay)
print(f"{name} done")

async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(task("task_a", 0.01))
tg.create_task(task("task_b", 0.02))
tg.create_task(task("task_c", 0.005))

asyncio.run(main())

TaskGroup fundamentals:

  • All tasks created inside the async with block are awaited when the block exits.
  • Tasks run concurrently — task_c (0.005s) finishes first, then task_a (0.01s), then task_b (0.02s).
  • If any task raises an exception, all other tasks are cancelled and the exception propagates as an ExceptionGroup.
  • Compare with asyncio.gather(): gather can silently leak tasks if cancelled; TaskGroup always cleans up.
  • Python 3.11+ only — use anyio.create_task_group() for 3.9/3.10 compatibility.
Expected Output
task_a done\ntask_b done\ntask_c done
Hints

Hint 1: Use async with asyncio.TaskGroup() as tg: to create a group, then tg.create_task(coro) to add tasks.

Hint 2: The async with block waits for ALL tasks to complete before continuing.

#2Collecting Task ResultsEasy
TaskGrouptask-resultasyncio.Task

Use TaskGroup to compute squares concurrently and collect the results in order.

import asyncio

async def square(n):
await asyncio.sleep(0.001)
return n * n

async def main():
tasks = []
async with asyncio.TaskGroup() as tg:
for n in range(1, 6):
tasks.append(tg.create_task(square(n)))
results = [t.result() for t in tasks]
print(f"Results: {results}")

asyncio.run(main())
Solution
import asyncio

async def square(n):
await asyncio.sleep(0.001)
return n * n

async def main():
tasks = []
async with asyncio.TaskGroup() as tg:
for n in range(1, 6):
tasks.append(tg.create_task(square(n)))
# After the block: all tasks are done
results = [t.result() for t in tasks]
print(f"Results: {results}")

asyncio.run(main())

Result collection pattern:

  • tg.create_task() returns an asyncio.Task — store references before the block exits.
  • After the async with block, all tasks are complete. Call .result() to get return values.
  • Results are ordered by how you added the tasks (not by completion order) — predictable unlike asyncio.gather with return_exceptions=True.
  • If a task raised an exception, .result() re-raises it — but with TaskGroup, if any task failed, the entire group failed, so you would not normally reach this line.
  • For result + error handling, check t.exception() before calling t.result().
Expected Output
Results: [1, 4, 9, 16, 25]
Hints

Hint 1: tg.create_task() returns an asyncio.Task object. Call .result() on it after the TaskGroup block exits.

Hint 2: Tasks are guaranteed to be complete (or failed) once the async with block exits.

#3TaskGroup vs gather() — Spot the DifferenceEasy
TaskGroupasyncio.gathercancellationcomparison

Predict the output of both approaches when one task fails. Understand how error propagation differs.

import asyncio

async def good_task():
await asyncio.sleep(0.02)
return "Task B result"

async def bad_task():
await asyncio.sleep(0.01)
raise ValueError("Task A failed")

async def with_gather():
try:
results = await asyncio.gather(bad_task(), good_task(), return_exceptions=True)
for r in results:
if isinstance(r, Exception):
print(f"gather error: {r}")
else:
print(f"gather: {r}")
except Exception as e:
print(f"gather raised: {e}")

async def with_taskgroup():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(bad_task())
tg.create_task(good_task())
except* ValueError as eg:
print(f"TaskGroup raised ExceptionGroup")

async def main():
await with_gather()
await with_taskgroup()

asyncio.run(main())
Solution
import asyncio

async def good_task():
await asyncio.sleep(0.02)
return "Task B result"

async def bad_task():
await asyncio.sleep(0.01)
raise ValueError("Task A failed")

async def with_gather():
results = await asyncio.gather(bad_task(), good_task(), return_exceptions=True)
for r in results:
if isinstance(r, Exception):
print(f"gather error: {r}")
else:
print(f"gather: {r}")

async def with_taskgroup():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(bad_task())
tg.create_task(good_task())
except* ValueError as eg:
print(f"TaskGroup raised ExceptionGroup")

async def main():
await with_gather()
await with_taskgroup()

asyncio.run(main())

Key behavioral difference:

  • gather(return_exceptions=True) collects all results including exceptions — both tasks complete; you inspect results manually.
  • gather(return_exceptions=False) raises the first exception immediately — remaining tasks may still be running in the background (the leak problem).
  • TaskGroup always cancels all sibling tasks when any task fails, then raises an ExceptionGroup containing ALL exceptions from failed tasks.
  • except* is the new Python 3.11 syntax for handling ExceptionGroup — it matches specific exception types within the group.
  • The cleanup guarantee is the primary reason to prefer TaskGroup in production.
Expected Output
gather: Task B result\nTaskGroup raised ExceptionGroup
Hints

Hint 1: asyncio.gather() with return_exceptions=False raises the first exception and cancels remaining tasks, but may not await their cancellation.

Hint 2: TaskGroup always awaits cancellation of all sibling tasks before re-raising.


Medium

#4Handle ExceptionGroupMedium
ExceptionGroupexcept*error-handlingTaskGroup

Handle mixed exception types from a TaskGroup using except* with multiple clauses.

import asyncio

async def failing_task(task_id, exc_type, message):
await asyncio.sleep(0.001)
raise exc_type(message)

async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(failing_task(1, ValueError, "error in task 1"))
tg.create_task(failing_task(2, RuntimeError, "resource unavailable"))
tg.create_task(failing_task(3, ValueError, "error in task 3"))
except* ValueError as eg:
msgs = [str(e) for e in eg.exceptions]
print(f"Caught {len(eg.exceptions)} ValueErrors: {msgs}")
except* RuntimeError as eg:
msgs = [str(e) for e in eg.exceptions]
print(f"Caught 1 RuntimeError: {msgs}")

asyncio.run(main())
Solution
import asyncio

async def failing_task(task_id, exc_type, message):
await asyncio.sleep(0.001)
raise exc_type(message)

async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(failing_task(1, ValueError, "error in task 1"))
tg.create_task(failing_task(2, RuntimeError, "resource unavailable"))
tg.create_task(failing_task(3, ValueError, "error in task 3"))
except* ValueError as eg:
msgs = [str(e) for e in eg.exceptions]
print(f"Caught {len(eg.exceptions)} ValueErrors: {msgs}")
except* RuntimeError as eg:
msgs = [str(e) for e in eg.exceptions]
print(f"Caught 1 RuntimeError: {msgs}")

asyncio.run(main())

ExceptionGroup and except mechanics:*

  • ExceptionGroup wraps multiple exceptions that occurred concurrently. It is the result of a TaskGroup where multiple tasks failed.
  • except* ExcType filters the group, extracting only matching exceptions. The remaining (unmatched) exceptions are re-raised in a new ExceptionGroup.
  • If all exceptions are matched by except* clauses, the ExceptionGroup is fully handled.
  • eg.exceptions is a tuple of the matched exceptions.
  • except* cannot be mixed with regular except in the same try block.
  • ExceptionGroup can be nested: a task group containing sub-groups produces nested ExceptionGroup objects.
Expected Output
Caught 2 ValueErrors: ['error in task 1', 'error in task 3']\nCaught 1 RuntimeError: ['resource unavailable']
Hints

Hint 1: except* ValueError as eg catches only ValueError instances from the group. eg.exceptions is the list.

Hint 2: Multiple except* clauses can handle different exception types from the same ExceptionGroup.

#5Timeout with TaskGroupMedium
asyncio.timeoutTaskGroupcancellationTimeoutError

Add a timeout to a TaskGroup so that if the group takes too long, all tasks are cancelled.

import asyncio

async def fast_query():
await asyncio.sleep(0.01)
print("fast_query completed")
return "fast result"

async def slow_query():
await asyncio.sleep(0.1)
return "slow result"

async def main():
try:
async with asyncio.timeout(0.03):
async with asyncio.TaskGroup() as tg:
tg.create_task(fast_query())
tg.create_task(slow_query())
except TimeoutError:
print("Operation timed out after 0.03s")

asyncio.run(main())
Solution
import asyncio

async def fast_query():
await asyncio.sleep(0.01)
print("fast_query completed")
return "fast result"

async def slow_query():
await asyncio.sleep(0.1)
return "slow result"

async def main():
try:
async with asyncio.timeout(0.03):
async with asyncio.TaskGroup() as tg:
tg.create_task(fast_query())
tg.create_task(slow_query())
except TimeoutError:
print("Operation timed out after 0.03s")

asyncio.run(main())

Timeout + TaskGroup interaction:

  • asyncio.timeout() is a context manager (Python 3.11+) that raises TimeoutError when the deadline is exceeded.
  • When the timeout fires, it cancels the current coroutine, which propagates cancellation into the TaskGroup.
  • TaskGroup receives the cancellation and cancels all its running tasks before allowing the cancellation to propagate.
  • This combination guarantees no tasks are orphaned after a timeout.
  • For Python 3.10 and earlier, use asyncio.wait_for() or the third-party anyio library.
  • The fast_query task may print its message because it completed before the timeout — this depends on timing.
Expected Output
fast_query completed\nOperation timed out after 0.03s
Hints

Hint 1: Wrap the TaskGroup in asyncio.timeout() (Python 3.11+) or asyncio.wait_for() for timeout control.

Hint 2: When the timeout fires, TaskGroup cancels all running tasks.

#6Nested TaskGroupsMedium
TaskGroupnestedhierarchical-concurrency

Build a two-level task hierarchy: an outer group spawns "fetch" tasks, each of which uses its own inner TaskGroup to perform sub-tasks in parallel.

import asyncio

async def process_record(shard_id, record_id):
await asyncio.sleep(0.001)
return f"shard_{shard_id}_record_{record_id}"

async def fetch_shard(shard_id, record_count):
results = []
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(process_record(shard_id, r)) for r in range(record_count)]
results = [t.result() for t in tasks]
print(f"Shard {shard_id}: {len(results)} records processed")
return results

async def main():
all_results = []
shard_tasks = []
async with asyncio.TaskGroup() as outer:
for shard_id in range(3):
shard_tasks.append(outer.create_task(fetch_shard(shard_id, 2)))
for t in shard_tasks:
all_results.extend(t.result())
print(f"Total records: {len(all_results)}")

asyncio.run(main())
Solution
import asyncio

async def process_record(shard_id, record_id):
await asyncio.sleep(0.001)
return f"shard_{shard_id}_record_{record_id}"

async def fetch_shard(shard_id, record_count):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(process_record(shard_id, r)) for r in range(record_count)]
results = [t.result() for t in tasks]
print(f"Shard {shard_id}: {len(results)} records processed")
return results

async def main():
shard_tasks = []
async with asyncio.TaskGroup() as outer:
for shard_id in range(3):
shard_tasks.append(outer.create_task(fetch_shard(shard_id, 2)))
all_results = []
for t in shard_tasks:
all_results.extend(t.result())
print(f"Total records: {len(all_results)}")

asyncio.run(main())

Hierarchical concurrency:

  • Three shards run concurrently in the outer group. Within each shard, records run concurrently in the inner group.
  • This gives a 3x2 concurrency matrix: 6 record tasks running potentially in parallel.
  • Error propagation: if any inner group task fails, the inner group fails, which fails the shard task, which the outer group sees as a task failure, cancelling all other shards.
  • This is structured concurrency's key property: the failure scope is well-defined and predictable.
  • Real-world mapping: outer group = shards / partitions; inner group = parallel reads per shard.
Expected Output
See solution — outer and inner groups complete in correct order
Hints

Hint 1: Nest TaskGroup blocks inside task coroutines to create hierarchical concurrency.

Hint 2: Each TaskGroup scope is independent — failures in an inner group propagate up to the outer group.

#7Fan-Out Fan-In with TaskGroupMedium
fan-outfan-inTaskGrouppipeline

Implement a fan-out/fan-in pattern: fetch multiple URLs concurrently with TaskGroup, then aggregate results.

import asyncio

async def fetch_url(url):
await asyncio.sleep(0.01) # simulate I/O
return f"content_of_{url}"

async def main():
urls = [f"url_{i}" for i in range(5)]
tasks = []
async with asyncio.TaskGroup() as tg:
for url in urls:
tasks.append(tg.create_task(fetch_url(url)))
results = [t.result() for t in tasks]
print(f"Processed {len(results)} URLs in parallel")
print("All results collected")

asyncio.run(main())
Solution
import asyncio

async def fetch_url(url):
await asyncio.sleep(0.01) # simulate network I/O
return f"content_of_{url}"

async def main():
urls = [f"url_{i}" for i in range(5)]
tasks = []
async with asyncio.TaskGroup() as tg:
for url in urls:
tasks.append(tg.create_task(fetch_url(url)))
results = [t.result() for t in tasks]
print(f"Processed {len(results)} URLs in parallel")
print("All results collected")

asyncio.run(main())

Fan-out/fan-in pattern:

  • Fan-out: distribute work across N tasks created inside the TaskGroup.
  • Fan-in: collect results after the TaskGroup block exits (all tasks are complete).
  • Results are in input order because we preserve task references in order of creation.
  • This replaces asyncio.gather() with better error semantics and no risk of task leaks.
  • Production pattern: limit concurrency by using this pattern inside a semaphore context, or by batching the URL list into chunks processed serially.
Expected Output
Processed 5 URLs in parallel\nAll results collected
Hints

Hint 1: Fan-out: create one task per URL in the TaskGroup. Fan-in: collect results after the block.

Hint 2: Store tasks as a list to collect results in input order.


Hard

#8Cancellation PropagationHard
cancellationCancelledErrorTaskGroupcleanup

Demonstrate that cancellation of a TaskGroup propagates to all tasks and that per-task cleanup runs correctly.

import asyncio

async def task_with_cleanup(name):
try:
print(f"{name} started")
await asyncio.sleep(10) # long sleep — will be cancelled
finally:
print(f"{name} cleanup ran")

async def main():
try:
async with asyncio.timeout(0.02):
async with asyncio.TaskGroup() as tg:
tg.create_task(task_with_cleanup("task_1"))
tg.create_task(task_with_cleanup("task_2"))
except TimeoutError:
print("CancelledError propagated")

asyncio.run(main())
Solution
import asyncio

async def task_with_cleanup(name):
try:
print(f"{name} started")
await asyncio.sleep(10) # will be cancelled by timeout
finally:
print(f"{name} cleanup ran")

async def main():
try:
async with asyncio.timeout(0.02):
async with asyncio.TaskGroup() as tg:
tg.create_task(task_with_cleanup("task_1"))
tg.create_task(task_with_cleanup("task_2"))
except TimeoutError:
print("CancelledError propagated")

asyncio.run(main())

Cancellation mechanics:

  • asyncio.timeout fires and cancels the enclosing scope, which cancels the TaskGroup, which cancels all tasks.
  • Each task receives CancelledError at its await asyncio.sleep(10) point.
  • The finally block in each task runs, guaranteeing cleanup.
  • CancelledError is NOT caught by except Exception (it inherits from BaseException), so cleanup code must not accidentally suppress it.
  • After all tasks finish their cleanup, the TimeoutError propagates out of the asyncio.timeout context manager.
  • The finally pattern is how you implement resource release (close DB connections, file handles, network sockets) even when cancelled.
Expected Output
task_1 started\ntask_2 started\ntask_1 cleanup ran\ntask_2 cleanup ran\nCancelledError propagated
Hints

Hint 1: CancelledError is raised at await points inside a cancelled task. Use try/finally to guarantee cleanup.

Hint 2: Do NOT suppress CancelledError — re-raise it after cleanup. Suppressing it breaks the cancellation protocol.

#9Task Scoping — Preventing Task LeaksHard
task-leakasyncio.create_taskTaskGroupstructured-concurrency

Write a demonstration showing how bare asyncio.create_task() can leak tasks, and how TaskGroup prevents it.

import asyncio

async def background_task(label):
try:
await asyncio.sleep(0.1)
print(f"{label}: completed (potential leak)")
except asyncio.CancelledError:
print(f"{label}: was cancelled (no leak)")
raise

async def leaky_approach():
"""Tasks created here may outlive this coroutine."""
t = asyncio.create_task(background_task("leaky"))
await asyncio.sleep(0.01) # creator finishes early
# t is still running — leaked task
return t

async def safe_approach():
"""TaskGroup ensures all tasks complete (or are cancelled) before exit."""
async with asyncio.TaskGroup() as tg:
tg.create_task(background_task("safe"))
await asyncio.sleep(0.01) # still inside the group
# Here, background_task("safe") has finished

async def main():
# Leaky: task runs past its creator's lifetime
leaked_task = await leaky_approach()
await asyncio.sleep(0) # yield to let the task keep running
leaked_task.cancel()
await asyncio.sleep(0)

# Safe: TaskGroup waits for all tasks
await safe_approach()
print("TaskGroup: all tasks complete")

asyncio.run(main())
Solution
import asyncio

async def background_task(label):
try:
await asyncio.sleep(0.1)
print(f"{label}: completed (potential leak)")
except asyncio.CancelledError:
print(f"{label}: was cancelled (no leak)")
raise

async def leaky_approach():
t = asyncio.create_task(background_task("leaky"))
await asyncio.sleep(0.01)
return t # creator is done, task still running

async def safe_approach():
async with asyncio.TaskGroup() as tg:
tg.create_task(background_task("safe"))
await asyncio.sleep(0.01)

async def main():
leaked_task = await leaky_approach()
await asyncio.sleep(0)
leaked_task.cancel()
await asyncio.sleep(0.001)

await safe_approach()
print("TaskGroup: all tasks complete")

asyncio.run(main())

Task leak explained:

  • asyncio.create_task() attaches a task to the running event loop, not to the calling coroutine.
  • When leaky_approach() returns, the task continues running independently.
  • If the event loop shuts down while the task is running, you get warnings about "Task was destroyed but it is pending!"
  • TaskGroup solves this: the async with block acts as a scope boundary. You cannot exit the scope until all tasks inside it are done (or cancelled).
  • This is the core principle of structured concurrency: task lifetime is bounded by its creating scope.
Expected Output
Demonstrates that fire-and-forget tasks outlive their creator; TaskGroup prevents this
Hints

Hint 1: A task created with asyncio.create_task() outside a TaskGroup is attached to the event loop, not to any scope. It can outlive its creator coroutine.

Hint 2: Prove the leak by cancelling the parent and observing the orphan task still running.

#10Parallel Data Pipeline with Error RecoveryHard
TaskGrouppipelineerror-recoveryExceptionGroup

Build a parallel processing pipeline that processes a list of records, collects per-record errors without aborting the group, and reports a summary.

import asyncio

async def process_record(record_id, data):
await asyncio.sleep(0.001)
if data < 0:
raise ValueError(f"Invalid data {data} for record {record_id}")
return record_id * data

async def main():
records = [(1, 10), (2, -5), (3, 20), (4, -3), (5, 15)]
successes = []
errors = []

async def safe_process(rec_id, val):
try:
result = await process_record(rec_id, val)
successes.append((rec_id, result))
except ValueError as e:
errors.append((rec_id, str(e)))

async with asyncio.TaskGroup() as tg:
for rec_id, val in records:
tg.create_task(safe_process(rec_id, val))

print(f"Successes: {successes}")
print(f"Errors: {errors}")

asyncio.run(main())
Solution
import asyncio

async def process_record(record_id, data):
await asyncio.sleep(0.001)
if data < 0:
raise ValueError(f"Invalid data {data} for record {record_id}")
return record_id * data

async def main():
records = [(1, 10), (2, -5), (3, 20), (4, -3), (5, 15)]
successes = []
errors = []

async def safe_process(rec_id, val):
try:
result = await process_record(rec_id, val)
successes.append((rec_id, result))
except ValueError as e:
errors.append((rec_id, str(e)))

async with asyncio.TaskGroup() as tg:
for rec_id, val in records:
tg.create_task(safe_process(rec_id, val))

print(f"Successes: {successes}")
print(f"Errors: {errors}")

asyncio.run(main())

Error isolation pattern:

  • The safe_process wrapper catches per-record errors before they reach the TaskGroup, so the group itself never sees a failed task.
  • Shared lists (successes, errors) are safe here because the asyncio event loop is single-threaded — there is no concurrent mutation.
  • This pattern implements "partial success" semantics: process all records, collect all errors, report at the end.
  • Alternative: use a result queue (asyncio.Queue) instead of shared lists if you need streaming results.
  • For CPU-bound processing, replace asyncio.sleep with loop.run_in_executor() to avoid blocking the event loop.
Expected Output
See solution — processes records, collects errors, continues partial success
Hints

Hint 1: Use a shared list to collect successful results and a separate list for errors.

Hint 2: Wrap individual task logic in try/except so a single record failure does not fail the whole group.

#11Dynamic Task Spawning Inside a TaskGroupHard
TaskGroupdynamic-tasksrecursive-concurrencywork-discovery

Demonstrate that a task running inside a TaskGroup can spawn additional tasks into the same group, enabling dynamic work discovery (like a web crawler).

import asyncio
import random

async def crawl(url, depth, tg, results, visited):
if url in visited or depth == 0:
return
visited.add(url)
await asyncio.sleep(0.001) # simulate fetch
results.append(url)
# "discover" sub-URLs
sub_urls = [f"{url}/child_{i}" for i in range(2)]
for sub in sub_urls:
if sub not in visited:
tg.create_task(crawl(sub, depth - 1, tg, results, visited))

async def main():
results = []
visited = set()
async with asyncio.TaskGroup() as tg:
tg.create_task(crawl("root", depth=2, tg=tg, results=results, visited=visited))
print(f"Crawled {len(results)} pages")
print(sorted(results))

asyncio.run(main())
Solution
import asyncio

async def crawl(url, depth, tg, results, visited):
if url in visited or depth == 0:
return
visited.add(url)
await asyncio.sleep(0.001)
results.append(url)
sub_urls = [f"{url}/child_{i}" for i in range(2)]
for sub in sub_urls:
if sub not in visited:
tg.create_task(crawl(sub, depth - 1, tg=tg, results=results, visited=visited))

async def main():
results = []
visited = set()
async with asyncio.TaskGroup() as tg:
tg.create_task(crawl("root", depth=2, tg=tg, results=results, visited=visited))
print(f"Crawled {len(results)} pages")
print(sorted(results))

asyncio.run(main())

Dynamic task spawning:

  • TaskGroup tracks all tasks created via tg.create_task(), even if they are spawned from within running tasks.
  • The group does not exit until ALL tasks (including dynamically added ones) complete.
  • This is the structured concurrency solution to recursive fan-out: no tasks leak, no manual join needed.
  • Expected crawl: root (depth 2) spawns root/child_0 and root/child_1 (depth 1), each of which spawns 2 more (depth 0, no-ops). Total crawled = 3 pages.
  • Production crawler adds: semaphore for rate limiting, deduplication with a persistent store, error handling per URL, and depth-first vs breadth-first strategy.
Expected Output
See solution — tasks spawn subtasks dynamically; all complete before group exits
Hints

Hint 1: Tasks created inside a TaskGroup (via tg.create_task) from within a running task are tracked by the same group.

Hint 2: You can pass the tg reference into subtasks to allow recursive task spawning.

© 2026 EngineersOfAI. All rights reserved.