Stop All Tasks if One Fails with the ProcessPoolExecutor in Python
You can stop all tasks if one task fails with the ProcessPoolExecutor using a multiprocessing.Event.
In this tutorial you will discover how to stop all asynchronous tasks if one task fails in the process pool.
Let's get started.
Need to Stop All Tasks if One Task Fails
The ProcessPoolExecutor in Python provides a pool of reusable processes for executing ad hoc tasks.
You can submit tasks to the process pool by calling the submit() function and passing in the name of the function you wish to execute on another process.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
Tasks executed in the process pool can fail.
The most common pattern for task failure is for a target task function to raise an error or exception.
The exception is then caught by the process pool so that it does not affect the worker process.
The caught exception can then be retrieved by calling the exception() function on the Future object, or it will be re-raised when getting the result from the task via the result() function on the Future object.
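As a minimal sketch of how a caught exception surfaces through the Future, the example below submits a task that always fails and then inspects it. The names failing_task() and inspect_failure() are hypothetical helpers introduced only for illustration.

```python
from concurrent.futures import ProcessPoolExecutor


# hypothetical task that always fails
def failing_task():
    raise ValueError('Something bad happened')


# hypothetical helper: report how a failed task surfaces via its Future
def inspect_failure(future):
    # exception() blocks until the task is done, then returns the exception (or None)
    error = future.exception()
    # result() re-raises the task's exception in the caller, if there was one
    try:
        future.result()
        reraised = False
    except Exception:
        reraised = True
    return error, reraised


# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor(1) as executor:
        future = executor.submit(failing_task)
        error, reraised = inspect_failure(future)
        print(f'exception() returned: {error!r}')
        print(f'result() re-raised: {reraised}')
```

Note that the worker process survives the failure and remains available for further tasks.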
When a task fails, we may wish to stop executing all tasks running in the process pool.
The process pool does not provide a facility to stop running tasks. Instead, it only provides a way to cancel tasks via their Future objects, and only for those tasks that are not yet running and are not done.
As such, how do we stop all running tasks when one task fails by raising an exception?
How to Stop All Running Tasks If One Fails
The ProcessPoolExecutor does not provide a facility to stop running tasks.
Nevertheless, we can update our target task functions to stop running when a process-safe flag is set.
This can be implemented using multiprocessing.Event.
We cannot create a multiprocessing.Event directly and pass it to a task as an argument, as we can with a threading.Event when working with threads, because the Event cannot be pickled for delivery to a worker process. Instead, we must use a multiprocessing.Manager to create the Event object for us.
Managers are used to create and manage shared state between processes, like data and variables.
Managers have to be created and closed, which can be achieved seamlessly using the context manager interface, for example:
...
# create the manager used for sharing state between processes
with Manager() as manager:
    # ...
Next, we can use the Manager to create the event:
...
# create an event to shut down all running tasks
event = manager.Event()
This Event can then be passed to each target task function as an argument to be checked within separate worker processes.
...
# execute the task, passing the event
future = executor.submit(work, event)
Once the tasks are running, they can be stopped at any time by setting the event flag.
...
# stop running tasks via the event
event.set()
Each target task function must check the status of the event frequently, such as within each iteration of a loop within the task.
If the event is set, the task can then stop running, perhaps by returning immediately.
...
# check the status of the flag
if event.is_set():
    return
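The steps above (create the Manager, create the Event, pass it to tasks, set it, and check it in the task) can be sketched together as follows. The work() function here, with its steps and delay parameters, is a hypothetical stand-in for a real task, and the half-second pause before setting the event is an arbitrary choice.

```python
from time import sleep
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor


# hypothetical task: loop until the shared event is set or the steps run out
def work(event, steps=50, delay=0.1):
    for step in range(steps):
        # check the status of the flag on every iteration
        if event.is_set():
            return f'stopped at step {step}'
        # pretend to do some work
        sleep(delay)
    return 'finished'


# entry point
if __name__ == '__main__':
    # create the manager used for sharing state between processes
    with Manager() as manager:
        # create an event to shut down all running tasks
        event = manager.Event()
        with ProcessPoolExecutor(2) as executor:
            # execute the tasks, passing the event
            futures = [executor.submit(work, event) for _ in range(2)]
            # let the tasks run briefly, then ask them to stop
            sleep(0.5)
            event.set()
            for future in futures:
                print(future.result())
```

How quickly a task stops depends on how often it checks the event, so the delay between checks sets the worst-case response time.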
If your target task function has some resources open, it may need to clean up before returning.
This approach to stopping running tasks may require that you change the structure of your task so that you have a loop allowing you to check the value of the flag.
For example, if your task reads data from a file or a socket, you may need to change the read operation to be performed in blocks of data in a loop so that you can check the status of the flag on each iteration.
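A minimal sketch of such a restructured read is shown below. The read_in_blocks() helper, its block_size parameter, and its (bytes read, completed) return value are assumptions made for illustration. A threading.Event stands in for the manager's Event in the demo because both expose the same is_set() and set() methods.

```python
import os
import tempfile
import threading


# hypothetical helper: read a file in blocks, checking a stop flag between blocks
def read_in_blocks(path, event, block_size=1024):
    total = 0
    # the with-statement closes the file even if we return early
    with open(path, 'rb') as handle:
        while True:
            # stop early if a shutdown has been requested
            if event.is_set():
                return total, False
            block = handle.read(block_size)
            if not block:
                # end of file, the whole file was read
                return total, True
            total += len(block)


# entry point
if __name__ == '__main__':
    # write a small temporary file to read back
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b'x' * 4096)
    try:
        event = threading.Event()
        # flag not set: the whole file is read
        print(read_in_blocks(tmp.name, event))
        # flag set: the read stops before the first block
        event.set()
        print(read_in_blocks(tmp.name, event))
    finally:
        os.remove(tmp.name)
```

Opening the file in a with-statement handles the clean-up mentioned above, since the file is closed on every return path.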
Now that we know how to stop all running tasks, we need to consider the trigger for stopping the tasks.
We want to stop all running tasks if any task raises an exception.
This can be achieved using the wait() module function.
This function takes a collection of Future objects and returns a tuple of two sets, one containing all done tasks and a second that contains all non-done tasks.
Recall that we get one Future object for each call to the submit() function when issuing tasks to the process pool to be executed asynchronously.
By default, the wait() function will return when all passed in tasks have completed. This is controlled via the "return_when" argument that is set to the ALL_COMPLETED constant. We can change the value to the FIRST_EXCEPTION constant which will return when any task raises an exception or when all tasks are completed.
...
# wait until any task raises an exception or all tasks complete
done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
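To illustrate how wait() behaves with FIRST_EXCEPTION, here is a small sketch in which one task fails immediately while the others are still sleeping. The task() function and the wait_for_first_failure() helper are hypothetical names, and the one-second sleep is arbitrary.

```python
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
from concurrent.futures import FIRST_EXCEPTION


# hypothetical task: task 0 fails right away, the others sleep first
def task(name):
    if name == 0:
        raise RuntimeError(f'task {name} failed')
    sleep(1)
    return name


# hypothetical helper: block until the first failure or until all tasks are done
def wait_for_first_failure(futures):
    result = wait(futures, return_when=FIRST_EXCEPTION)
    # wait() returns a named tuple with "done" and "not_done" sets
    return result.done, result.not_done


# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor(3) as executor:
        futures = [executor.submit(task, i) for i in range(3)]
        done, not_done = wait_for_first_failure(futures)
        print(f'done={len(done)}, not_done={len(not_done)}')
```

If no task raises an exception, the call behaves like ALL_COMPLETED and returns with an empty not_done set.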
Once the call to wait() returns, we can inspect the "done" set. If it contains one Future object and that Future object has an exception via a call to the exception() function that does not return None, then we can trigger all running tasks to stop.
...
# check if a task raised an exception
if len(done) == 1 and done.pop().exception() is not None:
    # stop all running tasks
    event.set()
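The check above assumes the failed task is the only member of the "done" set. If other tasks may already have finished by the time wait() returns, a more defensive variant is to scan every done Future. The sketch below shows one way to do that; find_failures() is a hypothetical helper, and the Future objects are created by hand purely for demonstration (normally they come from submit()).

```python
from concurrent.futures import Future


# hypothetical helper: collect the exception of every failed task in "done"
def find_failures(done):
    failures = []
    for future in done:
        # exception() raises CancelledError on a cancelled future, so skip those
        if future.cancelled():
            continue
        error = future.exception()
        if error is not None:
            failures.append(error)
    return failures


# entry point
if __name__ == '__main__':
    # hand-made futures standing in for those returned by submit()
    ok, failed, cancelled = Future(), Future(), Future()
    ok.set_result('fine')
    failed.set_exception(ValueError('Something bad happened'))
    cancelled.cancel()
    failures = find_failures({ok, failed, cancelled})
    if failures:
        print(f'{len(failures)} task(s) failed, first error: {failures[0]}')
```

With this helper, the trigger becomes "if find_failures(done) is non-empty, set the event", regardless of how many tasks have finished.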
We may also want to cancel any tasks that have not yet started running.
...
# cancel any tasks that are not yet running
for future in futures:
    future.cancel()
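Because cancel() only succeeds for tasks that have not started, it can be useful to know how many tasks were actually cancelled. The following sketch counts them; cancel_pending() is a hypothetical helper, and the hand-made Future objects only simulate pending, running, and finished tasks.

```python
from concurrent.futures import Future


# hypothetical helper: try to cancel every task, report how many were cancelled
def cancel_pending(futures):
    cancelled = 0
    for future in futures:
        # cancel() returns False for tasks that are running or already done
        if future.cancel():
            cancelled += 1
    return cancelled


# entry point
if __name__ == '__main__':
    # hand-made futures standing in for those returned by submit()
    pending, running, finished = Future(), Future(), Future()
    running.set_running_or_notify_cancel()
    finished.set_result(None)
    # only the pending task can be cancelled
    print(cancel_pending([pending, running, finished]))
```

Calling cancel() on every Future is safe, which is why the loop does not need to filter out running or finished tasks first.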
Now that we know how to stop all running tasks when one task fails, let's look at a worked example.
Example of Stopping All Running Tasks When One Task Fails
Let's develop an example in which a problem occurs in one task and triggers all running tasks to stop when using the ProcessPoolExecutor.
First, we must create the Manager and then create the Event that will be passed to all tasks executed in the process pool.
...
# create a manager for sharing events
with Manager() as manager:
    # create an event used to stop running tasks
    event = manager.Event()
We can then pass this event to our target function when we submit the task to the pool for execution.
...
# execute one task
future = executor.submit(work, event, ...)
Next, we can define a task function that does work in a loop, including a sleep for one second to simulate an IO-bound task. The loop checks the status of the event each iteration and returns if the event is set.
Each task has a unique name and we use this to keep track of what is going on.
If the task has a name of 0 (the first task) then it will fail by raising an exception.
# target task function
def work(event, name):
    # pretend to read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)
        # check if this task should fail
        if name == 0:
            print(f'Task has failed, name={name}')
            raise Exception('Something bad happened')
        # check if the task should stop
        if event.is_set():
            print(f'Stopping, name={name}')
            return
So far so good.
Next, we can start a process pool with ten worker processes and submit ten tasks to the process pool with names from 0 to 9.
...
# create a process pool
with ProcessPoolExecutor(10) as executor:
    # execute many tasks
    futures = [executor.submit(work, event, i) for i in range(10)]
We can then wait for all tasks to complete or for any of the tasks to fail by raising an exception.
...
# wait for all tasks to complete, or one task to fail
print('Waiting for tasks to complete, or fail...')
done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
When we finish waiting, we can check whether some tasks are not yet done and, if so, whether an exception occurred.
If an exception did occur in the first (and only) done Future, then we can cancel any scheduled tasks and trigger the event which will cause all running tasks to stop executing.
...
# check if not all tasks are done
if len(done) > 0 and len(done) != len(futures):
    # check if an exception was raised
    future = done.pop()
    if future.exception() is not None:
        print(f'One task failed with: {future.exception()}, shutting down')
        # cancel any scheduled tasks
        for future in futures:
            future.cancel()
        # stop all running tasks
        event.set()
And that's it.
Tying this all together, the complete example of stopping all tasks if one task fails in the ProcessPoolExecutor is listed below.
# SuperFastPython.com
# example of stopping all running tasks when one task fails
from time import sleep
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
from concurrent.futures import FIRST_EXCEPTION

# target task function
def work(event, name):
    # pretend to read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)
        # check if this task should fail
        if name == 0:
            print(f'Task has failed, name={name}', flush=True)
            raise Exception('Something bad happened')
        # check if the task should stop
        if event.is_set():
            print(f'Stopping, name={name}', flush=True)
            return

# entry point
if __name__ == '__main__':
    # create a manager for sharing events
    with Manager() as manager:
        # create an event used to stop running tasks
        event = manager.Event()
        # create a process pool
        with ProcessPoolExecutor(10) as executor:
            # execute many tasks
            futures = [executor.submit(work, event, i) for i in range(10)]
            # wait for all tasks to complete, or one task to fail
            print('Waiting for tasks to complete, or fail...')
            done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
            # check if not all tasks are done
            if len(done) > 0 and len(done) != len(futures):
                # check if an exception was raised
                future = done.pop()
                if future.exception() is not None:
                    print(f'One task failed with: {future.exception()}, shutting down')
                    # cancel any scheduled tasks
                    for future in futures:
                        future.cancel()
                    # stop all running tasks
                    event.set()
    print('All done.')
Running the example first starts the process pool with ten worker processes then submits ten tasks into the pool and collects the Future objects.
We then wait for all tasks to complete or for one task to fail during execution.
A failure occurs in one task.
The failure triggers the wait() function to stop waiting. The results are interpreted and it is determined that one task raised an exception, therefore any scheduled tasks are cancelled and all running tasks are stopped.
Each task detects the request to stop via the event and stops its execution.
Waiting for tasks to complete, or fail...
Task has failed, name=0
One task failed with: Something bad happened, shutting down
Stopping, name=9
Stopping, name=2
Stopping, name=8
Stopping, name=5
Stopping, name=7
Stopping, name=6
Stopping, name=3
Stopping, name=4
Stopping, name=1
All done.
Takeaways
You now know how to stop all tasks if one asynchronous task fails in the ProcessPoolExecutor.