Last Updated on September 12, 2022
You can stop all tasks if one task fails with the ThreadPoolExecutor in Python by using a threading.Event and waiting for tasks with the wait() module function.
In this tutorial, you will discover how to stop all asynchronous tasks if one task fails in the thread pool.
Let’s get started.
Need to Stop All Tasks if One Task Fails
The ThreadPoolExecutor in Python provides a pool of reusable threads for executing ad hoc tasks.
You can submit tasks to the thread pool by calling the submit() function and passing in the name of the function you wish to execute on another thread.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
The most common pattern for task failure is for a target task function to raise an error or exception.
The exception is then caught by the thread pool so that it does not affect the worker thread.
The caught exception can then be accessed by calling the exception() function on the Future object, or it will be re-raised when getting the result from the task via the result() function on the Future object.
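As a small illustration of the two access paths described above, the sketch below submits a task that always fails. The function name failing_task is hypothetical and exists only for this demonstration.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task that always fails (for illustration only)
def failing_task():
    raise ValueError('Something bad happened')

with ThreadPoolExecutor(1) as executor:
    future = executor.submit(failing_task)
    # exception() waits for the task to finish, then returns the exception (or None)
    error = future.exception()
    print(f'exception() returned: {error!r}')
    # result() re-raises the same exception in the calling thread
    try:
        future.result()
    except ValueError as e:
        reraised = e
        print(f'result() raised: {e!r}')
```

Note that the worker thread itself survives the failure and remains available for other tasks.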
When a task fails, we may wish to stop executing all tasks running in the thread pool.
The thread pool does not provide a facility to stop running tasks. Instead, it only provides a way to cancel tasks via their Future objects, and only for those tasks that are not yet running and are not done.
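The limitation can be seen in a minimal sketch, assuming a pool with a single worker so that the second task is forced to wait in the queue. The names blocked, started, and release are hypothetical helpers for this demonstration.

```python
from threading import Event
from concurrent.futures import ThreadPoolExecutor

# hypothetical task that signals it has started, then blocks until released
def blocked(started, release):
    started.set()
    release.wait()

started = Event()
release = Event()
with ThreadPoolExecutor(1) as executor:
    # the first task occupies the only worker, the second waits in the queue
    running = executor.submit(blocked, started, release)
    queued = executor.submit(blocked, started, release)
    # make sure the first task is actually running
    started.wait()
    # a running task cannot be cancelled
    running_cancelled = running.cancel()
    # a task that has not started can be cancelled
    queued_cancelled = queued.cancel()
    # let the running task finish
    release.set()
print(running_cancelled, queued_cancelled)
```

The call to cancel() on the running task returns False and has no effect, which is why a separate stop signal is needed.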
As such, how do we stop all running tasks when one task fails by raising an exception?
How to Stop All Running Tasks If One Task Fails
The ThreadPoolExecutor does not provide a facility to stop running tasks.
Nevertheless, we can update our target task functions to stop running when a thread-safe flag is set.
This can be implemented using threading.Event.
First, create a threading.Event to control when running tasks should stop.
```python
...
# create an event to shut down all running tasks
event = Event()
```
This event can then be passed to each target task function as an argument.
```python
...
# execute the task, passing the event
future = executor.submit(work, event)
```
Once the tasks are running, they can be stopped at any time by setting the value of the Event flag.
```python
...
# stop running tasks via the event
event.set()
```
Each target task function must check the status of the event frequently, such as within each iteration of a loop within the task.
If the event is set, the task can then stop running, perhaps by returning immediately.
```python
...
# check the status of the flag
if event.is_set():
    return
```
If your target task function has some resources open, it may need to clean up before returning.
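One way to handle cleanup is a try/finally block around the loop, sketched below. The Resource class is a hypothetical stand-in for a file, socket, or similar object, and the task is called directly here rather than through a thread pool to keep the example short.

```python
from threading import Event

# hypothetical stand-in for a file, socket, or other open resource
class Resource:
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

def work(event, resource):
    try:
        for step in range(10):
            # stop early if a shutdown was requested
            if event.is_set():
                return f'stopped at step {step}'
            # ... one unit of real work would go here ...
        return 'finished'
    finally:
        # runs on a normal return, an early return, and an exception alike
        resource.close()

event = Event()
event.set()
resource = Resource()
outcome = work(event, resource)
print(outcome, resource.closed)
```

Because the cleanup lives in the finally clause, the early return triggered by the event does not skip it.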
This approach to stopping running tasks may require that you change the structure of your task so that you have a loop structure, allowing you to check the value of the flag.
For example, if your task reads data from a file or a socket, you may need to change the read operation so that data is read in blocks within a loop, allowing you to check the status of the flag on each iteration.
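A block-wise read of this kind might look like the following sketch. The function name read_in_blocks and the tiny block size are assumptions made for the demonstration, and an in-memory io.BytesIO stream stands in for a real file or socket.

```python
import io
from threading import Event

# hypothetical helper: read a stream in blocks, checking the event between blocks
def read_in_blocks(stream, event, block_size=4):
    blocks = []
    # check the flag before each read
    while not event.is_set():
        block = stream.read(block_size)
        # an empty block means the end of the stream
        if not block:
            break
        blocks.append(block)
    return b''.join(blocks)

event = Event()
# with the event clear, the whole stream is read
data = read_in_blocks(io.BytesIO(b'0123456789'), event)
# with the event set, the loop exits before reading anything
event.set()
partial = read_in_blocks(io.BytesIO(b'0123456789'), event)
print(data, partial)
```

A smaller block size means the flag is checked more often, at the cost of more read calls.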
Now that we know how to stop all running tasks, we need to consider the trigger for stopping the tasks.
We want to stop all running tasks if any task raises an exception.
This can be achieved using the wait() module function.
This function takes a collection of Future objects and returns a tuple of two sets, one containing all done tasks and a second that contains all non-done tasks.
Recall that we get one Future object for each call to the submit() function when submitting tasks to the thread pool to be executed asynchronously.
By default, the wait() function will return when all passed in tasks have completed. This is controlled via the return_when argument that is set to the ALL_COMPLETED constant. We can change the value to the FIRST_EXCEPTION constant, which will return when any task raises an exception or when all tasks are completed.
```python
...
# wait until any task raises an exception or all tasks complete
done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
```
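To see what wait() returns in this mode, here is a minimal sketch with one task that fails immediately and two tasks that block until released. The function names fail and block and the release event are hypothetical and only serve to make the outcome predictable.

```python
from threading import Event
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
from concurrent.futures import FIRST_EXCEPTION

# hypothetical task that fails right away
def fail():
    raise RuntimeError('failed')

# hypothetical task that blocks until the release event is set
def block(release):
    release.wait()
    return 'released'

release = Event()
with ThreadPoolExecutor(3) as executor:
    futures = [executor.submit(fail),
               executor.submit(block, release),
               executor.submit(block, release)]
    # returns as soon as the failing task is done
    done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
    counts = (len(done), len(not_done))
    # let the blocked tasks finish so the pool can shut down
    release.set()
print(counts)
```

Here the done set holds only the failed task, while the two blocked tasks are reported in the not_done set.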
Once the call to wait() returns, we can inspect the done set. Other tasks may have finished normally before the failure, so the set may contain more than one Future object. If any Future object in the set has an exception, that is, a call to its exception() function does not return None, then we can trigger all running tasks to stop.

```python
...
# check if any completed task raised an exception
if any(future.exception() is not None for future in done):
    # stop all running tasks
    event.set()
```
We may also want to cancel any tasks that have not yet started running.
```python
...
# cancel any tasks that are not yet running
for future in futures:
    future.cancel()
```
Now that we know how to stop all running tasks when one task fails, let’s look at a worked example.
Example of Stopping All Running Tasks When One Task Fails
Let’s develop an example in which a problem occurs in one task and triggers all running tasks to stop when using the ThreadPoolExecutor.
First, we must create the event that will be passed to all tasks executed in the thread pool.
```python
...
# create an event used to stop running tasks
event = Event()
```
We can then pass this event to our target function when we submit the task to the pool for execution.
```python
...
# execute one task
future = executor.submit(work, event, ...)
```
Next, we can define a task function that does work in a loop, including a sleep of one second to simulate an I/O-bound task. The loop checks the status of the event on each iteration and returns if the event is set.
Each task has a unique name and we use this to keep track of what is going on.
If the task has a name of 0 (the first task), then it will fail by raising an exception.
```python
# mock target task function
def work(event, name):
    # pretend read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)
        # check if this task should fail
        if name == 0:
            print(f'Task has failed, name={name}')
            raise Exception('Something bad happened')
        # check if the task should stop
        if event.is_set():
            print(f'Stopping, name={name}')
            return
```
So far, so good.
Next, we can start a thread pool with ten worker threads and submit ten tasks to the thread pool with names from 0 to 9.
```python
...
# create a thread pool
with ThreadPoolExecutor(10) as executor:
    # execute many tasks
    futures = [executor.submit(work, event, i) for i in range(10)]
```
We can then wait for all tasks to complete or for any of the tasks to fail by raising an exception.
```python
...
# wait for all tasks to complete, or one task to fail
print('Waiting for tasks to complete, or fail...')
done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
```
When we finish waiting, we can check whether any of the completed tasks raised an exception.
If an exception did occur, then we can cancel any scheduled tasks and trigger the event, which will cause all running tasks to stop executing.

```python
...
# collect any completed tasks that raised an exception
failed = [future for future in done if future.exception() is not None]
if failed:
    print(f'One task failed with: {failed[0].exception()}, shutting down')
    # cancel any scheduled tasks
    for future in futures:
        future.cancel()
    # stop all running tasks
    event.set()
```
And that’s it.
Tying this all together, the complete example of stopping all tasks if one task fails in the ThreadPoolExecutor is listed below.
```python
# SuperFastPython.com
# example of stopping all running tasks when one task fails
from time import sleep
from threading import Event
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
from concurrent.futures import FIRST_EXCEPTION

# mock target task function
def work(event, name):
    # pretend to read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)
        # check if this task should fail
        if name == 0:
            print(f'Task has failed, name={name}')
            raise Exception('Something bad happened')
        # check if the task should stop
        if event.is_set():
            print(f'Stopping, name={name}')
            return

# create an event used to stop running tasks
event = Event()
# create a thread pool
with ThreadPoolExecutor(10) as executor:
    # execute many tasks
    futures = [executor.submit(work, event, i) for i in range(10)]
    # wait for all tasks to complete, or one task to fail
    print('Waiting for tasks to complete, or fail...')
    done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
    # collect any completed tasks that raised an exception
    failed = [future for future in done if future.exception() is not None]
    if failed:
        print(f'One task failed with: {failed[0].exception()}, shutting down')
        # cancel any scheduled tasks
        for future in futures:
            future.cancel()
        # stop all running tasks
        event.set()
print('All done.')
```
Running the example first starts the thread pool with ten worker threads, then submits ten tasks into the pool and collects the Future objects.
We then wait for all tasks to complete or for one task to fail during execution.
A failure occurs in one task.
The failure causes the wait() function to return. We inspect the results, find that one task raised an exception, cancel any scheduled tasks, and signal all running tasks to stop.
Each task detects the request to stop via the event and stops its execution.
```
Waiting for tasks to complete, or fail...
Task has failed, name=0
One task failed with: Something bad happened, shutting down
Stopping, name=9
Stopping, name=2
Stopping, name=8
Stopping, name=5
Stopping, name=7
Stopping, name=6
Stopping, name=3
Stopping, name=4
Stopping, name=1
All done.
```
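In the example above, a task only notices the stop request after its current one-second sleep ends. One possible variation, not used in this tutorial's example, is to replace sleep(1) with a timed wait on the event itself, because Event.wait(timeout) returns True as soon as the event is set. The sketch below assumes this substitution and calls the task directly rather than through the thread pool. The return values 'stopped' and 'finished' are added only for illustration.

```python
from threading import Event
from time import perf_counter

# variation of the task function: the event doubles as an interruptible sleep
def work(event, name):
    for _ in range(10):
        # wait up to one second, returning True early if the event is set
        if event.wait(timeout=1):
            print(f'Stopping, name={name}')
            return 'stopped'
    return 'finished'

event = Event()
# request a stop before the task starts, to show that it returns at once
event.set()
start = perf_counter()
result = work(event, 1)
elapsed = perf_counter() - start
print(result)
```

With this change a running task reacts to the stop request almost immediately instead of up to a second later.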
Takeaways
You now know how to stop all tasks if one asynchronous task fails in the ThreadPoolExecutor.
Do you have any questions about how to stop all tasks if one fails?
Ask your question in the comments below and I will do my best to answer.