Last Updated on September 12, 2022
You can stop all tasks if one task fails with the ProcessPoolExecutor using a multiprocessing.Event.
In this tutorial you will discover how to stop all asynchronous tasks if one task fails in the process pool.
Let’s get started.
Need to Stop All Tasks if One Task Fails
The ProcessPoolExecutor in Python provides a pool of reusable processes for executing ad hoc tasks.
You can submit tasks to the process pool by calling the submit() function and passing in the name of the function you wish to execute on another process.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
Tasks executed in the process pool may fail.
The most common pattern for task failure is for a target task function to raise an error or exception.
The exception is then caught by the process pool so that it does not affect the worker process.
The caught exception can then be retrieved by calling the exception() function on the Future object, or it will be re-raised when getting the result from the task via the result() function on the Future object.
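As a minimal sketch of these two access paths, the snippet below submits one task that succeeds and one that fails, then reports each outcome. The names risky_task() and describe_outcome() are hypothetical helpers invented for this illustration, not part of the concurrent.futures API.

```python
from concurrent.futures import ProcessPoolExecutor


def risky_task(value):
    # hypothetical task: fails for negative input
    if value < 0:
        raise ValueError(f'negative input: {value}')
    return value * 2


def describe_outcome(future):
    # exception() blocks until the task is done and returns None on success
    error = future.exception()
    if error is not None:
        return f'failed: {error!r}'
    # result() would re-raise the exception if the task had failed
    return f'result: {future.result()}'


if __name__ == '__main__':
    with ProcessPoolExecutor(2) as executor:
        futures = [executor.submit(risky_task, v) for v in (1, -1)]
        for future in futures:
            print(describe_outcome(future))
```

Checking exception() first avoids the need for a try-except block around result(), although either style works.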
When a task fails, we may wish to stop executing all tasks running in the process pool.
The process pool does not provide a facility to stop running tasks. It only allows tasks to be cancelled via their Future objects, and only if those tasks have not yet started running.
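This limitation can be seen without starting a pool at all. The sketch below builds Future objects by hand to stand in for a pending, a running, and a finished task. Calling set_running_or_notify_cancel() directly is normally the executor's job and is done here only for illustration, and cancel_pending() is a hypothetical helper.

```python
from concurrent.futures import Future


def cancel_pending(futures):
    # cancel() returns True only if the task had not started running
    return [future for future in futures if future.cancel()]


# build three futures by hand to stand in for tasks in different states
pending = Future()
running = Future()
running.set_running_or_notify_cancel()  # mark as started, as a worker would
finished = Future()
finished.set_result('ok')

cancelled = cancel_pending([pending, running, finished])
print(len(cancelled))        # 1, only the pending task was cancelled
print(pending.cancelled())   # True
print(running.cancelled())   # False, a running task cannot be cancelled
```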
As such, how do we stop all running tasks when one task fails by raising an exception?
How to Stop All Running Tasks If One Fails
The ProcessPoolExecutor does not provide a facility to stop running tasks.
Nevertheless, we can update our target task functions to stop running when a process-safe flag is set.
This can be implemented using multiprocessing.Event.
We cannot create a multiprocessing.Event directly and pass it to tasks as an argument, as we can with a threading.Event when working with threads, because the event cannot be pickled for transmission to the worker processes. Instead, we must use a multiprocessing.Manager to create the Event object for us.
Managers are used to create and manage shared state between processes, like data and variables.
Managers have to be created and closed which can be achieved seamlessly using the context manager, for example:
    ...
    # create the manager used for sharing state between processes
    with Manager() as manager:
        # ...
Next, we can use the Manager to create the event:
    ...
    # create an event to shut down all running tasks
    event = manager.Event()
This Event can then be passed to each target task function as an argument to be checked within separate worker processes.
    ...
    # execute the task, passing the event
    future = executor.submit(work, event)
Once the tasks are running, they can be stopped at any time by setting the event flag.
    ...
    # stop running tasks via the event
    event.set()
Each target task function must check the status of the event frequently, such as within each iteration of a loop within the task.
If the event is set, the task can then stop running, perhaps by returning immediately.
    ...
    # check the status of the flag
    if event.is_set():
        return
If your target task function has some resources open, it may need to clean up before returning.
This approach to stopping running tasks may require that you change the structure of your task so that you have a loop allowing you to check the value of the flag.
For example, if your task reads data from a file or a socket, you may need to change the read operation to be performed in blocks of data in a loop so that you can check the status of the flag on each iteration.
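The sketch below shows one way such a restructured task might look. It reads in small blocks, checks the flag before each read, and closes the stream in a finally block so that cleanup happens whether the task finishes or stops early. The function name read_until_stopped() and the chunk size are assumptions made for this illustration, and a threading.Event is used only as an in-process stand-in for the manager's event, which offers the same is_set() function.

```python
import io
import threading


def read_until_stopped(stream, event, chunk_size=4):
    # read the stream in small blocks so the flag can be checked between reads
    chunks = []
    try:
        while True:
            # stop early if another task has requested a shutdown
            if event.is_set():
                break
            chunk = stream.read(chunk_size)
            if not chunk:
                break  # end of data
            chunks.append(chunk)
    finally:
        # release the resource whether we finished or stopped early
        stream.close()
    return b''.join(chunks)


# threading.Event is used here only as an in-process stand-in
event = threading.Event()
print(read_until_stopped(io.BytesIO(b'abcdefghij'), event))  # b'abcdefghij'

# once the flag is set, the task stops before reading anything
event.set()
print(read_until_stopped(io.BytesIO(b'abcdefghij'), event))  # b''
```

A smaller block size makes the task respond to the flag sooner, at the cost of more read calls.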
Now that we know how to stop all running tasks, we need to consider the trigger for stopping the tasks.
We want to stop all running tasks if any task raises an exception.
This can be achieved using the wait() function from the concurrent.futures module.
This function takes a collection of Future objects and returns a tuple of two sets, one containing all done tasks and a second that contains all non-done tasks.
Recall that we get one Future object for each call to the submit() function when issuing tasks to the process pool to be executed asynchronously.
By default, the wait() function will return when all passed in tasks have completed. This is controlled via the “return_when” argument that is set to the ALL_COMPLETED constant. We can change the value to the FIRST_EXCEPTION constant which will return when any task raises an exception or when all tasks are completed.
    ...
    # wait until any task raises an exception or all tasks complete
    done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
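To see what wait() hands back, the sketch below uses hand-built Future objects in place of tasks submitted to a pool: one already holds an exception and one is marked as still running. Setting the state of a Future directly is done here only for illustration.

```python
from concurrent.futures import FIRST_EXCEPTION, Future, wait

# hand-built futures stand in for tasks submitted to a pool
failed = Future()
failed.set_exception(RuntimeError('Something bad happened'))
still_running = Future()
still_running.set_running_or_notify_cancel()

# returns at once because one future already holds an exception
result = wait([failed, still_running], return_when=FIRST_EXCEPTION)
print(failed in result.done)             # True
print(still_running in result.not_done)  # True
```

The returned value is a named tuple, so it can be unpacked into two sets or accessed via its done and not_done attributes.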
Once the call to wait() returns, we can inspect the “done” set. If it contains one Future object and that Future object has an exception via a call to the exception() function that does not return None, then we can trigger all running tasks to stop.
    ...
    # check if a task raised an exception
    if len(done) == 1 and done.pop().exception() is not None:
        # stop all running tasks
        event.set()
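The "done" set may hold more than one Future if other tasks happened to finish before the failure was noticed, so a check that does not depend on the size of the set may be more robust. The sketch below is one possible variation, where find_failures() is a hypothetical helper and the futures are built by hand to stand in for the set returned by wait().

```python
from concurrent.futures import Future


def find_failures(done):
    # collect every finished task that raised, skipping cancelled ones
    # (calling exception() on a cancelled future raises CancelledError)
    return [f for f in done if not f.cancelled() and f.exception() is not None]


# hand-built futures stand in for the "done" set returned by wait()
succeeded = Future()
succeeded.set_result(42)
failed = Future()
failed.set_exception(ValueError('bad input'))

failures = find_failures({succeeded, failed})
print(len(failures))                   # 1
print(repr(failures[0].exception()))   # ValueError('bad input')
```

If the returned list is not empty, the event can be set to stop the remaining tasks.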
We may also want to cancel any tasks that have not yet started running.
    ...
    # cancel any tasks that are not yet running
    for future in futures:
        future.cancel()
Now that we know how to stop all running tasks when one task fails, let’s look at a worked example.
Example of Stopping All Running Tasks When One Task Fails
Let’s develop an example in which a problem occurs in one task and triggers all running tasks to stop when using the ProcessPoolExecutor.
First, we must create the Manager and then create the Event that will be passed to all tasks executed in the process pool.
    ...
    # create a manager for sharing events
    with Manager() as manager:
        # create an event used to stop running tasks
        event = manager.Event()
We can then pass this event to our target function when we submit the task to the pool for execution.
    ...
    # execute one task
    future = executor.submit(work, event, ...)
Next, we can define a task function that does work in a loop, including a sleep for one second to simulate an IO-bound task. The loop checks the status of the event on each iteration and returns if the event is set.
Each task has a unique name and we use this to keep track of what is going on.
If the task has a name of 0 (the first task) then it will fail by raising an exception.
    # target task function
    def work(event, name):
        # pretend read data for a long time
        for _ in range(10):
            # pretend to read some data
            sleep(1)
            # check if this task should fail
            if name == 0:
                print(f'Task has failed, name={name}')
                raise Exception('Something bad happened')
            # check if the task should stop
            if event.is_set():
                print(f'Stopping, name={name}')
                return
So far so good.
Next, we can start a process pool with ten worker processes and submit ten tasks to the process pool with names from 0 to 9.
    ...
    # create a process pool
    with ProcessPoolExecutor(10) as executor:
        # execute many tasks
        futures = [executor.submit(work, event, i) for i in range(10)]
We can then wait for all tasks to complete or for any of the tasks to fail by raising an exception.
    ...
    # wait for all tasks to complete, or one task to fail
    print('Waiting for tasks to complete, or fail...')
    done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
When we finish waiting, we can check whether some tasks are not yet done and, if so, whether an exception occurred.
If an exception did occur in the first (and only) done Future, then we can cancel any scheduled tasks and trigger the event which will cause all running tasks to stop executing.
    ...
    # check if not all tasks are done
    if len(done) > 0 and len(done) != len(futures):
        # check if an exception was raised
        future = done.pop()
        if future.exception() is not None:
            print(f'One task failed with: {future.exception()}, shutting down')
            # cancel any scheduled tasks
            for future in futures:
                future.cancel()
            # stop all running tasks
            event.set()
And that’s it.
Tying this all together, the complete example of stopping all tasks if one task fails in the ProcessPoolExecutor is listed below.
    # SuperFastPython.com
    # example of stopping all running tasks when one task fails
    from time import sleep
    from multiprocessing import Manager
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import wait
    from concurrent.futures import FIRST_EXCEPTION

    # target task function
    def work(event, name):
        # pretend read data for a long time
        for _ in range(10):
            # pretend to read some data
            sleep(1)
            # check if this task should fail
            if name == 0:
                print(f'Task has failed, name={name}', flush=True)
                raise Exception('Something bad happened')
            # check if the task should stop
            if event.is_set():
                print(f'Stopping, name={name}', flush=True)
                return

    # entry point
    if __name__ == '__main__':
        # create a manager for sharing events
        with Manager() as manager:
            # create an event used to stop running tasks
            event = manager.Event()
            # create a process pool
            with ProcessPoolExecutor(10) as executor:
                # execute many tasks
                futures = [executor.submit(work, event, i) for i in range(10)]
                # wait for all tasks to complete, or one task to fail
                print('Waiting for tasks to complete, or fail...')
                done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
                # check if not all tasks are done
                if len(done) > 0 and len(done) != len(futures):
                    # check if an exception was raised
                    future = done.pop()
                    if future.exception() is not None:
                        print(f'One task failed with: {future.exception()}, shutting down')
                        # cancel any scheduled tasks
                        for future in futures:
                            future.cancel()
                        # stop all running tasks
                        event.set()
        print('All done.')
Running the example first starts the process pool with ten worker processes then submits ten tasks into the pool and collects the Future objects.
We then wait for all tasks to complete or for one task to fail during execution.
A failure occurs in one task.
The failure triggers the wait() function to stop waiting. The results are interpreted and it is determined that one task raised an exception, therefore any scheduled tasks are cancelled and all running tasks are stopped.
Each task detects the request to stop via the event and stops its execution.
    Waiting for tasks to complete, or fail...
    Task has failed, name=0
    One task failed with: Something bad happened, shutting down
    Stopping, name=9
    Stopping, name=2
    Stopping, name=8
    Stopping, name=5
    Stopping, name=7
    Stopping, name=6
    Stopping, name=3
    Stopping, name=4
    Stopping, name=1
    All done.
Takeaways
You now know how to stop all tasks if one asynchronous task fails in the ProcessPoolExecutor.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.