You can cancel all tasks when one task fails when using asyncio.gather() by manually traversing the list of tasks and canceling each one.
Calling the cancel() method on the Future object returned from the gather() function does not work in this case because, by the time the exception is propagated to the caller, the Future is already done.
In this tutorial, you will discover how to cancel all asyncio tasks in a gather() group if one task fails.
Let’s get started.
What is Asyncio gather()
The asyncio.gather() module function allows the caller to group multiple awaitables together.
Once grouped, the awaitables can be executed concurrently, awaited, and canceled.
Run awaitable objects in the aws sequence concurrently.
— Coroutines and Tasks
It is a helpful utility function for both grouping and executing multiple coroutines or multiple tasks.
For example:
...
# run a collection of awaitables
results = await asyncio.gather(coro1(), asyncio.create_task(coro2()))
The gather() function does not block.
Instead, it returns an asyncio.Future object that represents the group of awaitables.
For example:
...
# get a future that represents multiple awaitables
group = asyncio.gather(coro1(), coro2())
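The non-blocking behavior can be checked directly. The sketch below is a minimal illustration, not part of the original examples: work() is a hypothetical stand-in coroutine, and the short sleep is only there to give it something to wait on.

```python
import asyncio

# hypothetical stand-in for real work
async def work(value):
    await asyncio.sleep(0.1)
    return value * 2

async def main():
    # gather() returns immediately with a Future; nothing has run yet
    group = asyncio.gather(work(1), work(2))
    is_future = asyncio.isfuture(group)
    done_before = group.done()
    # awaiting the Future suspends main() until every awaitable has finished
    results = await group
    print(is_future, done_before, group.done(), results)
    return is_future, done_before, group.done(), results

asyncio.run(main())
```

The Future reports that it is not done straight after the call to gather() and only becomes done once it has been awaited to completion.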
We can use the Future object returned from a call to gather() to cancel all tasks in the group given some condition, such as when one task in the group fails.
How to Cancel All Tasks if One Task Fails (the wrong way)
The gather() function allows us to group many tasks into one task.
If one task fails with an unhandled exception, we may wish to cancel all remaining tasks.
The Future returned from the gather() function supports this capability.
Specifically, calling cancel() on the returned Future will be propagated to all tasks in the group.
For example:
...
# cancel the group, cancels all tasks in the group as well
group.cancel()
It even says so in the documentation.
For example:
If gather() is cancelled, all submitted awaitables (that have not completed yet) are also cancelled.
— Coroutines and Tasks
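The documented behavior can be seen when the group is canceled while its tasks are still running. This is a minimal sketch under assumed names: work() is a hypothetical long-running coroutine and the 0.1 second pause simply gives the tasks time to start.

```python
import asyncio

# hypothetical long-running coroutine
async def work():
    await asyncio.sleep(10)

async def main():
    tasks = [asyncio.create_task(work()) for _ in range(3)]
    group = asyncio.gather(*tasks)
    # give the tasks a chance to start; the group is not done yet
    await asyncio.sleep(0.1)
    # cancel the group while it is still pending
    accepted = group.cancel()
    try:
        await group
    except asyncio.CancelledError:
        print('group cancelled')
    # check whether the cancellation reached each task
    states = [task.cancelled() for task in tasks]
    print(accepted, states)
    return accepted, states

asyncio.run(main())
```

Here the cancel request is made before the group Future is done, so it is passed on to every task in the group.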
Therefore if a task in the group fails with an unhandled exception, we can cancel all tasks.
Recall that if a task in the group fails with an unhandled exception, the exception is propagated back to the caller that is awaiting the group.
Therefore we can await the Future returned from the call to gather() and wrap the call with a try-except block. If an exception is raised, we can call the cancel() method on the group.
For example:
...
# execute all coroutines as a group
group = asyncio.gather(*coros)
# handle the case that a task fails
try:
    # wait for the group of tasks to complete
    await group
except Exception as e:
    # report failure
    print(f'A task failed with: {e}, canceling all tasks')
    # cancel the group
    group.cancel()
Seems logical.
Let’s try it out.
Example of Canceling All Tasks on Failure (the wrong way)
The example below defines a task coroutine that reports a message, sleeps for a moment, then sleeps for a moment again. One out of the group will fail part way through by raising an exception. All other tasks are expected to be completed normally.
We will catch the exception raised by any task in the group and cancel all tasks.
The complete example is listed below.
# SuperFastPython.com
# example of canceling all tasks if one task fails (the wrong way)
import asyncio

# coroutine used for a task
async def task_coro(value):
    # report a message
    print(f'>task {value} executing')
    # sleep for a moment
    await asyncio.sleep(1)
    # check if this task should fail
    if value == 5:
        print(f'>task {value} failing')
        raise Exception('Something bad happened')
    # otherwise, block again
    await asyncio.sleep(1)
    print(f'>task {value} done')

# coroutine used for the entry point
async def main():
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    # execute all coroutines as a group
    group = asyncio.gather(*coros)
    # handle the case that a task fails
    try:
        # wait for the group of tasks to complete
        await group
    except Exception as e:
        # report failure
        print(f'A task failed with: {e}, canceling all tasks')
        # cancel the group
        group.cancel()
    # wait for all tasks to be canceled
    await asyncio.sleep(2)

# start the asyncio program
asyncio.run(main())
Running the example first creates the main() coroutine and uses it to start the asyncio program.
The main() coroutine then creates a list of 10 coroutines, each passing a separate integer value from 0 to 9.
The list of coroutines is then passed to the gather() module function, unpacked into separate arguments using the star-operator, a requirement of the gather() function.
This immediately schedules all of the coroutines as tasks in the asyncio event loop and returns a Future object. The tasks have not yet started running because the main() coroutine has not yet been suspended.
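One way to confirm that gather() schedules the coroutines as tasks straight away is to count the tasks known to the event loop before and after the call. This is a small sketch under assumed names: work() is a hypothetical placeholder coroutine.

```python
import asyncio

# hypothetical placeholder coroutine
async def work():
    await asyncio.sleep(0.1)

async def main():
    # only the main() task exists at this point
    before = len(asyncio.all_tasks())
    # gather() wraps each coroutine in a task as soon as it is called
    group = asyncio.gather(work(), work(), work())
    after = len(asyncio.all_tasks())
    print(f'tasks before: {before}, tasks after: {after}')
    # wait for the group so no tasks are left pending
    await group
    return before, after

asyncio.run(main())
```

The count rises by three as soon as gather() returns, even though none of the new tasks has had a chance to run yet.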
The main() coroutine then awaits the group Future object within a try-except expression. This suspends the main() coroutine and allows the tasks to execute.
Each task reports its message in turn and sleeps.
The tasks awake and then sleep again. One task, the one that receives a value of “5” as an argument, raises an exception.
The exception is propagated back to the main() coroutine.
The main() coroutine resumes and handles the exception. It reports a message and cancels the group Future object.
The expectation is that this will cancel all tasks within the group.
This does not happen.
All remaining tasks in the group continue to execute and complete normally.
Huh?
>task 0 executing
>task 1 executing
>task 2 executing
>task 3 executing
>task 4 executing
>task 5 executing
>task 6 executing
>task 7 executing
>task 8 executing
>task 9 executing
>task 5 failing
A task failed with: Something bad happened, canceling all tasks
>task 0 done
>task 1 done
>task 2 done
>task 3 done
>task 4 done
>task 6 done
>task 7 done
>task 8 done
>task 9 done
Let’s take a look at why it didn’t work.
Why Didn’t It Work
Our approach was logical, at least from a high level.
One key point we overlooked, and that almost everyone overlooks, is the life-cycle of Task and Future objects.
When a Task or Future object raises an exception it is transitioned from the “running” state into a “done” state.
By the time the group Future object in our example re-raises the exception in the main() coroutine, our group Future object is “done”.
We then call the cancel() method on the Future object. But this does nothing because the Future is done.
And in turn, the cancel operation is not propagated to all tasks in the group and we do not achieve the desired effect.
We can see this if we report the done status of the group Future object in the except block.
For example:
...
# handle the case that a task fails
try:
    # wait for the group of tasks to complete
    await group
except Exception as e:
    # report failure
    print(f'A task failed with: {e}, canceling all tasks')
    # check if the group future is done
    print(f'group future done: {group.done()}')
    # cancel the group
    group.cancel()
Running the example reports that indeed, as we expected, the group Future object is done.
We cannot cancel a Future after it is done.
...
group future done: True
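The same rule can be seen in isolation with a plain Future. The sketch below is illustrative only: it marks a Future as done by setting an exception on it by hand, which stands in for a failed group, and then checks the return value of cancel().

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # mark the future done with an exception, standing in for a failed group
    future.set_exception(RuntimeError('Something bad happened'))
    # cancel() on a done future has no effect and returns False
    was_cancelled = future.cancel()
    print(f'done={future.done()}, cancel() returned {was_cancelled}')
    # retrieve the exception so asyncio does not warn that it was never retrieved
    future.exception()
    return future.done(), was_cancelled

asyncio.run(main())
```

A return value of False from cancel() is the signal that the request was ignored.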
In fact, this specific case is mentioned in the API documentation, although it is easy to skim past.
For example:
If return_exceptions is False, cancelling gather() after it has been marked done won’t cancel any submitted awaitables. For instance, gather can be marked done after propagating an exception to the caller, therefore, calling gather.cancel() after catching an exception (raised by one of the awaitables) from gather won’t cancel any other awaitables.
— Coroutines and Tasks
Next, let’s look at how we might achieve the desired effect.
How to Cancel All Tasks if One Task Fails (the right way)
There are probably better ways to cancel all tasks if one task fails other than using the gather() function.
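One possible alternative, offered here as an assumption rather than as the approach this tutorial develops, is asyncio.wait() with return_when=asyncio.FIRST_EXCEPTION, which hands back the tasks that are still pending so they can be canceled. The task_coro() below is a shortened, hypothetical version of the one used in the examples.

```python
import asyncio

# shortened, hypothetical version of the task coroutine
async def task_coro(value):
    await asyncio.sleep(0.1)
    if value == 5:
        raise Exception('Something bad happened')
    await asyncio.sleep(0.1)
    return value

async def main():
    tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
    # return as soon as any task raises an exception (or all tasks finish)
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    # cancel everything that has not finished yet
    for task in pending:
        task.cancel()
    # wait for the canceled tasks to finish unwinding
    await asyncio.gather(*pending, return_exceptions=True)
    failed = [t for t in done if not t.cancelled() and t.exception() is not None]
    print(f'{len(failed)} failed, {len(pending)} cancelled')
    return len(failed), len(pending)

asyncio.run(main())
```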
Nevertheless, it is a common approach, and the API makes canceling the group look like the obvious solution, which is misleading.
Therefore, it is a good practice to see how we might achieve the desired result.
The solution involves simply canceling all tasks in the group manually.
There are a few ways we can achieve this.
One way is to create the Task objects in a list, pass them to gather, then cancel the tasks if one fails.
We can create a list of Tasks instead of a list of coroutines as follows:
...
# create and schedule many tasks
tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
When a task fails, the exception is propagated back to the main() coroutine that can catch the exception and manually traverse the list of tasks and cancel each in turn.
For example:
...
# cancel all tasks
for task in tasks:
    task.cancel()
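Calling cancel() only requests cancellation, so the caller may also want to wait until the tasks have actually stopped. A minimal sketch of one way to do this, assuming a shortened task_coro(), is to await the same tasks again with return_exceptions=True, which collects each CancelledError as a result instead of raising it.

```python
import asyncio

# shortened, hypothetical version of the task coroutine
async def task_coro(value):
    await asyncio.sleep(0.1)
    if value == 5:
        raise Exception('Something bad happened')
    await asyncio.sleep(0.1)
    return value

async def main():
    tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
    try:
        await asyncio.gather(*tasks)
    except Exception as e:
        print(f'A task failed with: {e}, canceling all tasks')
        # request cancellation of every task (a no-op for the failed one)
        for task in tasks:
            task.cancel()
        # wait until every task has finished unwinding
        await asyncio.gather(*tasks, return_exceptions=True)
    # count how many tasks ended up canceled
    cancelled = sum(task.cancelled() for task in tasks)
    print(f'{cancelled} tasks cancelled')
    return cancelled

asyncio.run(main())
```

This avoids guessing how long to sleep while the canceled tasks respond.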
Perhaps we would prefer to stick with using coroutines instead of tasks.
In this case, we cancel all running tasks by first getting a list of all tasks running in the event loop, removing the current task from this list, then canceling all remaining tasks.
For example:
...
# get all running tasks
tasks = asyncio.all_tasks()
# get the current task
current = asyncio.current_task()
# remove current task from all tasks
tasks.remove(current)
# cancel all remaining running tasks
for task in tasks:
    task.cancel()
A downside of this approach is that we may need to differentiate between running tasks if tasks other than those in the call to gather() are running at the same time.
This could be achieved using task names or similar.
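As a rough sketch of the task-name idea, the tasks in the group can be given a common name prefix and only those tasks canceled. The helper cancel_named_tasks(), the 'group-' prefix, and the 'background' task are all hypothetical names chosen for illustration, and this variant creates the tasks explicitly with create_task() so that they can be named.

```python
import asyncio

# hypothetical long-running coroutine
async def work():
    await asyncio.sleep(10)

# hypothetical helper: cancel tasks whose name starts with the given prefix
def cancel_named_tasks(prefix):
    current = asyncio.current_task()
    cancelled = []
    for task in asyncio.all_tasks():
        if task is not current and task.get_name().startswith(prefix):
            task.cancel()
            cancelled.append(task)
    return cancelled

async def main():
    # tasks that belong to the group share a name prefix
    group_tasks = [asyncio.create_task(work(), name=f'group-{i}') for i in range(3)]
    # an unrelated task that should be left alone
    other = asyncio.create_task(work(), name='background')
    await asyncio.sleep(0.1)
    # cancel only the tasks in the group and wait for them to stop
    cancelled = cancel_named_tasks('group-')
    await asyncio.gather(*cancelled, return_exceptions=True)
    states = [t.cancelled() for t in group_tasks], other.cancelled()
    print(states)
    # tidy up the unrelated task before exiting
    other.cancel()
    await asyncio.gather(other, return_exceptions=True)
    return states

asyncio.run(main())
```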
Now that we know the right way to cancel all tasks if one task fails when using asyncio.gather(), let’s look at some worked examples.
Example of Canceling All Tasks on Failure (Tasks)
We can explore how to cancel all tasks if one task fails when using gather().
In this example, we will create a list of Task objects to pass to gather(), then directly cancel the Task objects if one fails.
The complete example is listed below.
# SuperFastPython.com
# example of canceling all tasks if one task fails
import asyncio

# coroutine used for a task
async def task_coro(value):
    # report a message
    print(f'>task {value} executing')
    # sleep for a moment
    await asyncio.sleep(1)
    # check if this task should fail
    if value == 5:
        print(f'>task {value} failing')
        raise Exception('Something bad happened')
    # otherwise, block again
    await asyncio.sleep(1)
    print(f'>task {value} done')

# coroutine used for the entry point
async def main():
    # create and schedule many tasks
    tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
    # group all tasks together
    group = asyncio.gather(*tasks)
    # handle the case that a task fails
    try:
        # wait for the group of tasks to complete
        await group
    except Exception as e:
        # report failure
        print(f'A task failed with: {e}, canceling all tasks')
        # cancel all tasks
        for task in tasks:
            task.cancel()
    # wait a while
    await asyncio.sleep(2)

# start the asyncio program
asyncio.run(main())
Running the example first creates the main() coroutine and uses it to start the asyncio program.
The main() coroutine then creates a list of 10 Task objects, each passing a separate integer value from 0 to 9.
The task objects are scheduled in the asyncio event loop as part of being created.
The list of tasks is then passed to the gather() module function, unpacked into separate arguments using the star-operator, a requirement of the gather() function.
This returns a Future object that represents the group of awaitables.
The main() coroutine then awaits the group Future object within a try-except expression. This suspends the main() coroutine and allows the tasks to execute.
Each task reports its message in turn and sleeps.
The tasks awake and then sleep again. One task, the one that received a value of “5” as an argument, raises an exception.
The exception is propagated back to the main() coroutine.
The main() coroutine resumes and handles the exception. It reports a message and then manually cancels each Task object in the list.
This has the desired effect and a CancelledError exception is raised in each Task, causing them to terminate.
The main() coroutine suspends for a moment to allow all canceled tasks to respond to their request for cancellation.
We achieved the desired effect of canceling all tasks if one task failed when using gather().
>task 0 executing
>task 1 executing
>task 2 executing
>task 3 executing
>task 4 executing
>task 5 executing
>task 6 executing
>task 7 executing
>task 8 executing
>task 9 executing
>task 5 failing
A task failed with: Something bad happened, canceling all tasks
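Because cancellation arrives inside each task as a CancelledError, a task can catch it to clean up before it terminates. The sketch below is an illustration under assumed names: task_coro() here is a simplified, hypothetical variant that records its value in a shared list when it is canceled and then re-raises the exception so that the task still ends up canceled.

```python
import asyncio

# simplified, hypothetical task that records when it is canceled
async def task_coro(value, log):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # clean-up work would go here
        log.append(value)
        # re-raise so the task is still marked as canceled
        raise

async def main():
    log = []
    tasks = [asyncio.create_task(task_coro(i, log)) for i in range(3)]
    # let the tasks start and reach their sleep
    await asyncio.sleep(0.1)
    # request cancellation and wait for each task to respond
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    print(f'tasks that saw CancelledError: {sorted(log)}')
    return sorted(log), [task.cancelled() for task in tasks]

asyncio.run(main())
```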
Next, let’s look at using the same solution although with coroutines instead of Task objects.
Example of Canceling All Tasks on Failure (Coroutines)
We can explore how to cancel all tasks if one task fails when using gather() with coroutines.
In this example, we will create a list of coroutines to pass to gather() as per normal. Then if a task fails, we will get a list of all running tasks, remove the currently running task, and then cancel all remaining tasks in the list.
The complete example is listed below.
# SuperFastPython.com
# example of canceling all tasks if one task fails
import asyncio

# cancel all tasks except the current task
def cancel_all_tasks():
    # get all running tasks
    tasks = asyncio.all_tasks()
    # get the current task
    current = asyncio.current_task()
    # remove current task from all tasks
    tasks.remove(current)
    # cancel all remaining running tasks
    for task in tasks:
        task.cancel()

# coroutine used for a task
async def task_coro(value):
    # report a message
    print(f'>task {value} executing')
    # sleep for a moment
    await asyncio.sleep(1)
    # check if this task should fail
    if value == 5:
        print(f'>task {value} failing')
        raise Exception('Something bad happened')
    # otherwise, block again
    await asyncio.sleep(1)
    print(f'>task {value} done')

# coroutine used for the entry point
async def main():
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    # execute all coroutines as a group
    group = asyncio.gather(*coros)
    # handle the case that a task fails
    try:
        # wait for the group of tasks to complete
        await group
    except Exception as e:
        # report failure
        print(f'A task failed with: {e}, canceling all tasks')
        # cancel all tasks
        cancel_all_tasks()
    # wait a while
    await asyncio.sleep(2)

# start the asyncio program
asyncio.run(main())
Running the example first creates the main() coroutine and uses it to start the asyncio program.
The main() coroutine then creates a list of 10 coroutines, each passed a separate integer value from 0 to 9.
The list of coroutines is then passed to the gather() module function, unpacked into separate arguments using the star-operator, a requirement of the gather() function.
Calling gather() wraps each coroutine in a task and schedules it in the asyncio event loop.
This returns a Future object that represents the group of awaitables.
The main() coroutine then awaits the group Future object within a try-except expression. This suspends the main() coroutine and allows the tasks to execute.
Each task reports its message in turn and sleeps.
The tasks awake and then sleep again. One task, the one that received a value of “5” as an argument, raises an exception.
The exception is propagated back to the main() coroutine.
The main() coroutine resumes and handles the exception. It reports a message and then manually cancels each running task.
This is achieved using a custom function that first gets a list of all running tasks, gets the Task instance for the current task, removes the current task from the list, then cancels all those tasks that remain in the list.
This has the desired effect and a CancelledError exception is raised in each Task, causing them to terminate.
The main() coroutine suspends for a moment to allow all canceled tasks to respond to their request for cancellation.
This highlights an alternate, although perhaps less robust, approach to canceling all tasks when one task fails when using the gather() function.
>task 0 executing
>task 1 executing
>task 2 executing
>task 3 executing
>task 4 executing
>task 5 executing
>task 6 executing
>task 7 executing
>task 8 executing
>task 9 executing
>task 5 failing
A task failed with: Something bad happened, canceling all tasks
Further Reading
This section provides additional resources that you may find helpful.
Python Asyncio Books
- Python Asyncio Mastery, Jason Brownlee (my book!)
- Python Asyncio Jump-Start, Jason Brownlee.
- Python Asyncio Interview Questions, Jason Brownlee.
- Asyncio Module API Cheat Sheet
I also recommend the following books:
- Python Concurrency with asyncio, Matthew Fowler, 2022.
- Using Asyncio in Python, Caleb Hattingh, 2020.
- asyncio Recipes, Mohamed Mustapha Tahrioui, 2019.
APIs
- asyncio — Asynchronous I/O
- Asyncio Coroutines and Tasks
- Asyncio Streams
- Asyncio Subprocesses
- Asyncio Queues
- Asyncio Synchronization Primitives
Takeaways
You now know how to cancel all asyncio tasks in a gather() group if one task fails.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Brad Pearson on Unsplash