We can cancel all asyncio tasks if one task fails.
It is common to group similar tasks together, then execute the group of tasks concurrently and wait for them to complete. If one task in the group fails with an unhandled exception, we can then cancel all other tasks in the group.
This can be achieved in many ways, although it is straightforward when using the asyncio.TaskGroup structure, and the asyncio.gather() and asyncio.wait() functions.
In this tutorial, you will discover how to cancel all tasks in asyncio if one task fails.
Specifically, you will learn:
- About the requirement to cancel all tasks if one task fails with an unhandled exception.
- How to cancel all tasks if one task fails with asyncio.gather() and asyncio.wait().
- How to automatically cancel all tasks if one task fails with the asyncio.TaskGroup.
Let’s get started.
Need to Cancel All Tasks If One Task Fails
We typically group tasks in asyncio.
For example, we may have a list of items and issue one task per item so that the items are processed concurrently.
When working with a group of tasks, one task may fail and we need to cancel all remaining tasks.
When we say that a task “fails” we mean that it raises an exception that propagates up and causes the task to be “done”.
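To make this definition concrete, here is a minimal sketch of a failed task. The failing_coro() name and the short sleep are assumptions made for illustration. It shows that a task that raises an exception is done, was not canceled, and holds the exception that was raised.

```python
import asyncio

# hypothetical task body that always fails
async def failing_coro():
    await asyncio.sleep(0.1)
    raise ValueError('Something bad happened')

async def main():
    task = asyncio.create_task(failing_coro())
    # wait for the task without re-raising its exception
    await asyncio.wait([task])
    # the task is done, was not canceled, and holds the exception
    return task.done(), task.cancelled(), task.exception()

done, cancelled, error = asyncio.run(main())
print(done, cancelled, repr(error))
```

Calling task.exception() retrieves the exception without raising it, which is handy when we only want to inspect the outcome.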
We may need to cancel all tasks in a group for many reasons, such as:
- Preventing Resource Leakage: When one task in a group encounters an error or fails, there might be associated resources allocated or ongoing processes. Canceling all tasks ensures that these resources are released and prevents any potential resource leakage, maintaining system efficiency.
- Ensuring Consistent State: In scenarios where the tasks in a group are interdependent or contribute to a larger process, allowing one task to fail while others continue could lead to an inconsistent or invalid state. Canceling all tasks ensures the application remains in a consistent state by preventing further execution that might rely on the failed task.
- Error Propagation and Cleanup: By canceling all tasks, especially in cases where the failure might impact subsequent tasks or the overall application flow, it allows for appropriate error handling and cleanup. This ensures that any necessary cleanup actions or error-handling procedures are uniformly applied across all tasks in response to the failure.
Canceling all remaining tasks if one task fails is a common requirement in many real-world scenarios.
For example:
- Batch Processing: Imagine a scenario where a series of tasks represents batch jobs or data processing tasks. If one task fails due to invalid data or an error, canceling all other tasks prevents further processing, ensuring data consistency and integrity.
- Parallel Requests: In applications dealing with concurrent HTTP requests or multiple API calls, if one request fails due to a server error or connectivity issue, canceling other pending requests prevents overloading the system or creating a cascade of subsequent failures.
- Data Pipelines: Data processing pipelines or ETL (Extract, Transform, Load) processes often involve a sequence of tasks. Canceling all tasks if one fails prevents incomplete or inconsistent data from being propagated further down the pipeline, maintaining data quality.
- Distributed Systems: In distributed systems where tasks may span across different nodes or services, canceling all tasks can prevent unnecessary resource consumption or avoid inconsistent states when a component failure occurs.
- Service Orchestration: In microservices architectures where one service’s failure might impact others, canceling downstream tasks upon a failure ensures the system’s stability and avoids unnecessary processing.
In asyncio, how can we cancel all tasks if one task in a group fails?
What approaches are available?
How to Cancel All Tasks if One Task Fails
There are 3 ways that we can cancel all tasks in a group of tasks if one task in the group fails.
They are:
- Use asyncio.gather()
- Use asyncio.wait()
- Use TaskGroup
We will take a closer look at each in turn.
There are other approaches we could use, for example:
- Exit the main coroutine, which causes asyncio.run() to cancel all other running tasks.
- Monitor all tasks in a loop and cancel all if one fails with an exception.
- Set an event if one task fails and have all other tasks check on the event in a loop.
These less direct approaches are left as an exercise.
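As a starting point for the event-based approach, here is a minimal sketch. The worker() coroutine, the failed_event name, and the choice of which task fails are all assumptions made for illustration. Note that this is cooperative stopping rather than true cancellation: each task must check the event itself and return early.

```python
import asyncio

# hypothetical worker that checks a shared event between steps
async def worker(value, failed_event):
    try:
        for _ in range(5):
            # stop early if another task has signalled a failure
            if failed_event.is_set():
                return f'task {value} stopped early'
            await asyncio.sleep(0.05)
            # task 2 fails after its first step
            if value == 2:
                raise RuntimeError(f'task {value} failed')
        return f'task {value} finished'
    except Exception:
        # signal the failure to the other tasks, then re-raise
        failed_event.set()
        raise

async def main():
    failed_event = asyncio.Event()
    tasks = [asyncio.create_task(worker(i, failed_event)) for i in range(4)]
    # collect exceptions as results instead of raising the first one
    return await asyncio.gather(*tasks, return_exceptions=True)

results = asyncio.run(main())
for result in results:
    print(repr(result))
```

The trade-off is that tasks only stop at the points where they check the event, so a task suspended in a long await will not notice the failure until it resumes.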
Cancel All Tasks If One Task Fails with asyncio.gather()
We can cancel all tasks if one task fails using the asyncio.gather() function.
Recall that the asyncio.gather() function takes one or more awaitables, such as asyncio tasks, suspends until all of them are done, and then returns a list of their return values in the order they were provided.
For example:
```python
...
# execute a group of tasks concurrently
results = await asyncio.gather(task1, task2, task3)
```
When used with a list of tasks, the list must be unpacked first using the star (*) operator, for example:
```python
...
# execute a group of tasks concurrently
results = await asyncio.gather(*tasks)
```
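As a small runnable illustration of unpacking a list of tasks, consider the sketch below. The square() coroutine and its delays are assumptions made for the example. The later tasks finish first, yet the results come back in the order the tasks were passed to asyncio.gather().

```python
import asyncio

# hypothetical coroutine that returns a value after a short delay
async def square(value):
    # larger arguments sleep for less time and so finish first
    await asyncio.sleep(0.03 - value * 0.01)
    return value * value

async def main():
    tasks = [asyncio.create_task(square(i)) for i in range(3)]
    # results follow the order of the arguments, not completion order
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
print(results)
```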
We can use asyncio.gather() to cancel all tasks if one task fails.
This can be achieved by wrapping the call to asyncio.gather() in a try-except structure.
For example:
```python
try:
    # execute a group of tasks concurrently
    results = await asyncio.gather(*tasks)
except Exception:
    ...
```
If one task in the group fails, its exception propagates to the caller that is awaiting asyncio.gather().
We can catch and handle that exception.
Although the group task has terminated, the tasks in the group have not terminated.
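We can confirm this behavior with a small sketch. The work() coroutine and its delays are assumptions made for illustration. After asyncio.gather() raises the exception from the failed task, the other tasks are checked and found to be still running.

```python
import asyncio

# hypothetical task: fails quickly for value 0, otherwise sleeps longer
async def work(value):
    if value == 0:
        await asyncio.sleep(0.01)
        raise RuntimeError('Something bad happened')
    await asyncio.sleep(0.2)
    return value

async def main():
    tasks = [asyncio.create_task(work(i)) for i in range(3)]
    still_running = []
    try:
        await asyncio.gather(*tasks)
    except RuntimeError:
        # the other tasks have not been canceled at this point
        still_running = [not task.done() for task in tasks[1:]]
    # tidy up so that no task outlives the example
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return still_running

still_running = asyncio.run(main())
print(still_running)
```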
We can iterate over all tasks and cancel each in turn.
```python
...
# cancel all tasks
for task in tasks:
    task.cancel()
```
We can then wait for all canceled tasks to finish, using the “cancel and await” idiom.
For example:
```python
...
# wait for all tasks to cancel
for task in tasks:
    try:
        await task
    except Exception:
        pass
    except asyncio.CancelledError:
        pass
```
The whole asyncio.gather() approach would look as follows:
```python
...
# handle the case that a task fails
try:
    # wait for the group of tasks to complete
    await asyncio.gather(*tasks)
except Exception as e:
    # report failure
    print(f'A task failed with: {e}, canceling all tasks')
    # cancel all tasks
    for task in tasks:
        task.cancel()
    # wait for all tasks to cancel
    for task in tasks:
        try:
            await task
        except Exception:
            pass
        except asyncio.CancelledError:
            pass
```
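If this pattern is needed in several places, one option is to wrap it in a helper coroutine. The sketch below is one possible packaging, not a standard asyncio function: the gather_or_cancel() name and the work() coroutine are assumptions. Unlike the snippet above, it re-raises the original exception after the cleanup so the caller can decide how to respond.

```python
import asyncio

# hypothetical helper: gather tasks and cancel them all on failure
async def gather_or_cancel(*tasks):
    try:
        return await asyncio.gather(*tasks)
    except Exception:
        # cancel all tasks (a no-op for tasks that are already done)
        for task in tasks:
            task.cancel()
        # wait for every task to finish, collecting exceptions silently
        await asyncio.gather(*tasks, return_exceptions=True)
        # re-raise the exception from the failed task
        raise

# hypothetical task that fails for value 1
async def work(value):
    await asyncio.sleep(0.01)
    if value == 1:
        raise RuntimeError('Something bad happened')
    await asyncio.sleep(0.2)
    return value

async def main():
    tasks = [asyncio.create_task(work(i)) for i in range(3)]
    message = None
    try:
        await gather_or_cancel(*tasks)
    except RuntimeError as e:
        message = str(e)
    return message, [task.cancelled() for task in tasks]

message, cancelled = asyncio.run(main())
print(message, cancelled)
```

The failed task itself is reported as not canceled, because it had already finished with an exception before cancel() was called.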
Cancel All Tasks If One Task Fails with asyncio.wait()
We can cancel all tasks if one task fails using the asyncio.wait() function.
Recall that asyncio.wait() takes a collection of asyncio tasks and will suspend until a condition is met, such as all tasks are done.
For example:
```python
...
# wait for all tasks to be done
_ = await asyncio.wait(tasks)
```
We can configure the asyncio.wait() function to wait for the first task to fail.
This can be achieved by setting the “return_when” argument to asyncio.FIRST_EXCEPTION.
For example:
```python
...
# wait for the first task to fail, or for all tasks to complete
failed, rest = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
```
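The asyncio.wait() function returns two sets: the tasks that are done and the tasks that are not yet done. If at least one task fails, it will be returned in the “failed” set, along with any tasks that completed normally before the failure, whereas any tasks still running will be in the “rest” set. The size of the “failed” set is therefore an upper bound on the number of failures.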
If no tasks fail, then the “failed” set will have all tasks and the “rest” set will be empty.
Therefore, when asyncio.wait() returns, we can check if we have any tasks still running in the “rest” set and if so, cancel them all, then wait for them to be done.
```python
...
# check for at least one failure
if len(rest) > 0:
    print(f'{len(failed)} task(s) failed, cancelling the rest')
    # cancel all tasks
    for task in rest:
        task.cancel()
    # wait for all tasks to cancel
    for task in tasks:
        try:
            await task
        except Exception:
            pass
        except asyncio.CancelledError:
            pass
```
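Because the first set returned by asyncio.wait() holds every task that is done, it can include tasks that finished normally before the failure. The sketch below shows one way to pick out the tasks that actually failed. The work() coroutine, its delays, and the done and pending variable names are assumptions made for illustration.

```python
import asyncio

# hypothetical task with a configurable delay and outcome
async def work(value, delay, fail=False):
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f'task {value} failed')
    return value

async def main():
    tasks = [
        asyncio.create_task(work(0, 0.01)),             # finishes normally first
        asyncio.create_task(work(1, 0.05, fail=True)),  # then this one fails
        asyncio.create_task(work(2, 0.5)),              # still running
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    # keep only the done tasks that actually raised an exception
    failures = [t for t in done if not t.cancelled() and t.exception() is not None]
    # cancel whatever is still running and wait for it to finish
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(failures), len(pending)

counts = asyncio.run(main())
print(counts)
```

Checking task.exception() on the done tasks also covers the case where the failing task happens to finish last, leaving no tasks still running.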
Cancel All Tasks If One Task Fails with TaskGroup
We can cancel all tasks if one task fails in asyncio using an asyncio.TaskGroup.
Recall that the asyncio.TaskGroup, available in Python 3.11 and later, can be used to create and manage a group of tasks in asyncio.
It is used via the context manager interface and can be used to wait for all tasks to complete.
For example:
```python
...
# create task group
async with asyncio.TaskGroup() as group:
    # create many coroutines
    tasks = [group.create_task(task_coro(i)) for i in range(10)]
    # wait for tasks...
```
Helpfully, the asyncio.TaskGroup will automatically cancel all tasks in the group if one task fails with an unhandled exception.
This means that we can wrap the usage of the asyncio.TaskGroup in a try-except structure and be confident that all tasks in the group will be canceled on failure.
For example:
```python
...
try:
    # create task group
    async with asyncio.TaskGroup() as group:
        # create many coroutines
        tasks = [group.create_task(task_coro(i)) for i in range(10)]
        # wait for tasks...
except Exception as e:
    # report a message
    print('At least one task failed, rest were cancelled')
    # confirm no other running tasks
    print(f'There are only {len(asyncio.all_tasks())} task(s) currently running')
```
Now that we know how to cancel all tasks if one task fails, let’s look at some worked examples.
Example of Canceling All Tasks If One Task Fails With asyncio.gather()
We can explore an example of how to cancel all tasks if one task fails using asyncio.gather().
In this example, we can define a task that conditionally fails. We will then create and schedule 10 versions of the task and wait for them to complete concurrently using asyncio.gather(). If one task fails, we will handle the exception and cancel all tasks in the group.
Firstly, we can define the task that conditionally fails.
The task takes an integer task identifier as an argument. It reports its argument value, sleeps for one second, and if the argument value is equal to 5, then an exception is raised. Otherwise, the task sleeps again for one second and reports a final message.
The task_coro() coroutine below implements this.
```python
# coroutine used for a task
async def task_coro(value):
    # report a message
    print(f'>task {value} executing')
    # sleep for a moment
    await asyncio.sleep(1)
    # check if this task should fail
    if value == 5:
        print(f'>task {value} failing')
        raise Exception('Something bad happened')
    # otherwise, block again
    await asyncio.sleep(1)
    print(f'>task {value} done')
```
Next, we can define the main coroutine.
Firstly, we will create and schedule 10 tasks with arguments from 0 to 9.
```python
...
# create many coroutines
tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
```
We then await all tasks in the group using asyncio.gather() and cancel all tasks if one task fails with an exception.
```python
...
# handle the case that a task fails
try:
    # wait for the group of tasks to complete
    await asyncio.gather(*tasks)
except Exception as e:
    # report failure
    print(f'A task failed with: {e}, canceling all tasks')
    # cancel all tasks
    for task in tasks:
        task.cancel()
    # wait for all tasks to cancel
    for task in tasks:
        try:
            await task
        except Exception:
            pass
        except asyncio.CancelledError:
            pass
```
Tying this together, the complete main() coroutine is listed below.
```python
# coroutine used for the entry point
async def main():
    # create many coroutines
    tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
    # handle the case that a task fails
    try:
        # wait for the group of tasks to complete
        await asyncio.gather(*tasks)
    except Exception as e:
        # report failure
        print(f'A task failed with: {e}, canceling all tasks')
        # cancel all tasks
        for task in tasks:
            task.cancel()
        # wait for all tasks to cancel
        for task in tasks:
            try:
                await task
            except Exception:
                pass
            except asyncio.CancelledError:
                pass
```
Finally, we can start the asyncio event loop and run our main() coroutine.
```python
...
# start the asyncio program
asyncio.run(main())
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of canceling all tasks if one task fails with gather()
import asyncio

# coroutine used for a task
async def task_coro(value):
    # report a message
    print(f'>task {value} executing')
    # sleep for a moment
    await asyncio.sleep(1)
    # check if this task should fail
    if value == 5:
        print(f'>task {value} failing')
        raise Exception('Something bad happened')
    # otherwise, block again
    await asyncio.sleep(1)
    print(f'>task {value} done')

# coroutine used for the entry point
async def main():
    # create many coroutines
    tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
    # handle the case that a task fails
    try:
        # wait for the group of tasks to complete
        await asyncio.gather(*tasks)
    except Exception as e:
        # report failure
        print(f'A task failed with: {e}, canceling all tasks')
        # cancel all tasks
        for task in tasks:
            task.cancel()
        # wait for all tasks to cancel
        for task in tasks:
            try:
                await task
            except Exception:
                pass
            except asyncio.CancelledError:
                pass

# start the asyncio program
asyncio.run(main())
```
Running the example starts the asyncio event loop and runs our main() coroutine.
The main() coroutine runs and creates and schedules 10 tasks to run concurrently, with arguments from 0 to 9.
The main() coroutine then suspends and awaits all tasks via the asyncio.gather() function.
Each task runs, reporting a message and suspending with a sleep. The tasks then resume. The task with an argument of 5 then fails, while the other tasks suspend with an additional sleep.
The exception propagates up and terminates the task. It then propagates back to the main() coroutine where it is caught.
A message is reported and all tasks in the group are canceled.
The main() coroutine then awaits all tasks in the group, careful to handle any exceptions and CancelledError that may be re-raised.
This highlights how we can cancel all tasks in the group if one task fails using asyncio.gather().
```
>task 0 executing
>task 1 executing
>task 2 executing
>task 3 executing
>task 4 executing
>task 5 executing
>task 6 executing
>task 7 executing
>task 8 executing
>task 9 executing
>task 5 failing
A task failed with: Something bad happened, canceling all tasks
```
Example of Canceling All Tasks If One Task Fails With asyncio.wait()
We can explore an example of how to cancel all tasks if one task fails using asyncio.wait().
In this case, we can update the above example to use asyncio.wait() to suspend the main() coroutine and wait for all tasks to complete, or for at least one task to fail with an exception.
```python
...
# wait for the first task to fail, or for all tasks to complete
failed, rest = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
```
We can then check if asyncio.wait() returns early before all tasks are completed, and if so cancel all tasks and wait for the tasks to be done.
```python
...
# check for at least one failure
if len(rest) > 0:
    print(f'{len(failed)} task(s) failed, cancelling the rest')
    # cancel all tasks
    for task in rest:
        task.cancel()
    # wait for all tasks to cancel, also retrieving the failed task's exception
    for task in tasks:
        try:
            await task
        except Exception:
            pass
        except asyncio.CancelledError:
            pass
else:
    print('No tasks failed')
```
Tying this together, the updated main() coroutine with this change is listed below.
```python
# coroutine used for the entry point
async def main():
    # create many coroutines
    tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
    # wait for the first task to fail, or for all tasks to complete
    failed, rest = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    # check for at least one failure
    if len(rest) > 0:
        print(f'{len(failed)} task(s) failed, cancelling the rest')
        # cancel all tasks
        for task in rest:
            task.cancel()
        # wait for all tasks to cancel, also retrieving the failed task's exception
        for task in tasks:
            try:
                await task
            except Exception:
                pass
            except asyncio.CancelledError:
                pass
    else:
        print('No tasks failed')
```
Tying all of this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of canceling all tasks if one task fails with wait()
import asyncio

# coroutine used for a task
async def task_coro(value):
    # report a message
    print(f'>task {value} executing')
    # sleep for a moment
    await asyncio.sleep(1)
    # check if this task should fail
    if value == 5:
        print(f'>task {value} failing')
        raise Exception('Something bad happened')
    # otherwise, block again
    await asyncio.sleep(1)
    print(f'>task {value} done')

# coroutine used for the entry point
async def main():
    # create many coroutines
    tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
    # wait for all tasks
    failed, rest = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    # check for at least one failure
    if len(rest) > 0:
        print(f'{len(failed)} task(s) failed, cancelling the rest')
        # cancel all tasks
        for task in rest:
            task.cancel()
        # wait for all tasks to cancel
        for task in tasks:
            try:
                await task
            except Exception:
                pass
            except asyncio.CancelledError:
                pass
    else:
        print('No tasks failed')

# start the asyncio program
asyncio.run(main())
```
Running the example starts the asyncio event loop and runs our main() coroutine.
The main() coroutine runs and creates and schedules 10 tasks to run concurrently, with arguments from 0 to 9.
The main() coroutine then suspends and awaits all tasks via the asyncio.wait() function.
Each task runs, reporting a message and suspending with a sleep. The tasks then resume. The task with an argument of 5 then fails, while the other tasks suspend with an additional sleep.
The exception propagates up and terminates the task. The asyncio.wait() function does not re-raise the exception. Instead, it returns one set containing the failed task and another set containing all tasks that are still running.
The main() coroutine resumes and checks if there are any tasks in the “rest” set. There are, so the message is reported and all tasks in the group are canceled.
The main() coroutine then awaits all tasks in the group, careful to handle any exceptions and CancelledError that may be re-raised.
This highlights how we can cancel all tasks in the group if one task fails using asyncio.wait().
```
>task 0 executing
>task 1 executing
>task 2 executing
>task 3 executing
>task 4 executing
>task 5 executing
>task 6 executing
>task 7 executing
>task 8 executing
>task 9 executing
>task 5 failing
1 task(s) failed, cancelling the rest
```
Example of Canceling All Tasks If One Task Fails With TaskGroup
We can explore an example of how to cancel all tasks if one task fails using asyncio.TaskGroup.
In this case, we can update the above example to create and use the asyncio.TaskGroup to create all tasks.
This can then be wrapped in a try-except structure to handle exceptions raised by tasks in the group.
The updated main() coroutine with this change is listed below.
```python
# coroutine used for the entry point
async def main():
    try:
        # create task group
        async with asyncio.TaskGroup() as group:
            # create many coroutines
            tasks = [group.create_task(task_coro(i)) for i in range(10)]
            # wait for tasks...
    except Exception as e:
        # report a message
        print('At least one task failed, rest were cancelled')
        # confirm no other running tasks
        print(f'There are only {len(asyncio.all_tasks())} task(s) currently running')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of canceling all tasks if one task fails with TaskGroup
import asyncio

# coroutine used for a task
async def task_coro(value):
    # report a message
    print(f'>task {value} executing')
    # sleep for a moment
    await asyncio.sleep(1)
    # check if this task should fail
    if value == 5:
        print(f'>task {value} failing')
        raise Exception('Something bad happened')
    # otherwise, block again
    await asyncio.sleep(1)
    print(f'>task {value} done')

# coroutine used for the entry point
async def main():
    try:
        # create task group
        async with asyncio.TaskGroup() as group:
            # create many coroutines
            tasks = [group.create_task(task_coro(i)) for i in range(10)]
            # wait for tasks...
    except Exception as e:
        # report a message
        print('At least one task failed, rest were cancelled')
        # confirm no other running tasks
        print(f'There are only {len(asyncio.all_tasks())} task(s) currently running')

# start the asyncio program
asyncio.run(main())
```
Running the example starts the asyncio event loop and runs our main() coroutine.
The main() coroutine runs and creates a TaskGroup, then uses it to create and schedule 10 tasks to run concurrently, with arguments from 0 to 9.
On exit from the async with block, the TaskGroup suspends, awaiting all tasks in the group.
Each task runs, reporting a message and suspending with a sleep. The tasks then resume. The task with an argument of 5 then fails, while the other tasks suspend with an additional sleep.
The exception propagates up and terminates the task. It then propagates back to the TaskGroup, which cancels all remaining tasks in the group.
The exception is handled and messages are reported, showing that only one task remains running, the current task.
This highlights how we can cancel all tasks in the group if one task fails using TaskGroup.
```
>task 0 executing
>task 1 executing
>task 2 executing
>task 3 executing
>task 4 executing
>task 5 executing
>task 6 executing
>task 7 executing
>task 8 executing
>task 9 executing
>task 5 failing
At least one task failed, rest were cancelled
There are only 1 task(s) currently running
```
Takeaways
You now know how to cancel all tasks in asyncio if one task fails.