Last Updated on September 12, 2022
You can cancel all tasks in the multiprocessing pool if one task fails using a shared multiprocessing.Event object.
In this tutorial you will discover how to cancel all tasks in the Python process pool if one task fails.
Let’s get started.
Need To Stop All Tasks on Exception
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can be retrieved synchronously, or we can retrieve them later by using the asynchronous versions of these functions, such as apply_async() and map_async().
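For example, a minimal sketch of both styles might look like the following, where the double() function is a hypothetical task used only for illustration:

# sketch of issuing tasks synchronously and asynchronously (hypothetical double() task)
from multiprocessing.pool import Pool

# task executed in a worker process
def double(value):
    return value * 2

# protect the entry point
if __name__ == '__main__':
    # create the process pool
    with Pool() as pool:
        # issue tasks and retrieve results synchronously
        results = pool.map(double, range(5))
        print(results)
        # issue tasks asynchronously, then retrieve results later
        async_result = pool.map_async(double, range(5))
        print(async_result.get())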
When using the process pool, we may need to cancel or stop all tasks if one task fails with an exception.
The process pool does not provide this capability.
How can we cancel all issued tasks if one task fails with an exception?
How to Cancel All Tasks if One Task Fails
We can cancel all tasks if one task fails using a shared multiprocessing.Event.
This approach assumes a few things, such as:
- Tasks know when they have failed, e.g. catch an exception or check state.
- Tasks are able to check the status of a shared event periodically, e.g. each iteration of a loop.
Firstly, a shared event must be prepared in the main process.
We cannot just create an Event object and share it with worker processes. This will result in an exception, such as:
RuntimeError: Condition objects should only be shared between processes through inheritance
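For example, a sketch of the kind of code that can trigger this error is shown below, where a regular multiprocessing.Event is passed directly as a task argument (the task() function here is only for illustration):

# sketch of the failing case: passing a regular Event as a task argument
from multiprocessing import Event
from multiprocessing.pool import Pool

# task executed in a worker process (for illustration only)
def task(event):
    event.wait()

# protect the entry point
if __name__ == '__main__':
    # create a regular (non-manager) event
    event = Event()
    # create the process pool
    with Pool() as pool:
        # issuing the task requires pickling the event, which fails
        result = pool.apply_async(task, (event,))
        # the RuntimeError is re-raised here
        result.get()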
Instead, we must create the event using a multiprocessing.Manager.
A Manager allows us to create a single Event centralized in a process, then share a proxy object for the event that interacts with the single centralized event object in the manager behind the scenes.
First, we can create a Manager instance. This can be achieved using the context manager interface to ensure the manager is closed once we are finished using it.
...
# create a manager
with Manager() as manager:
    # ...
We can then create the shared multiprocessing.Event instance using the manager.
...
# create a shared event
shared_event = manager.Event()
The Event instance can then be shared with each worker process.
This can be achieved by defining a custom initialization function that takes the shared Event instance as an argument and stores it in a global variable for the worker process.
# initialize worker processes
def init_worker(shared_event):
    # store the event as a global in the worker process
    global event
    event = shared_event
We can then create a multiprocessing.pool.Pool instance and configure it to call our custom initialization function in each child worker process, passing the shared event object as an argument.
...
# create the process pool
pool = Pool(initializer=init_worker, initargs=(shared_event,))
Finally, each custom task function executed by the process pool can check or set the status of the event.
A task that fails can set the event via the set() function.
For example:
...
# cancel all tasks
event.set()
Then all tasks that can be canceled can periodically check whether the event has been set, such as within each iteration of a loop.
For example:
...
# check for stop
if event.is_set():
    return
You can learn more about using a shared multiprocessing.Event in the tutorial:
Now that we know how to cancel all tasks if one task fails in the process pool, we can look at a worked example.
Example of Canceling All Tasks If One Task Fails
We can explore how to cancel all tasks in the process pool if one task fails.
In this example, we will define a custom task function that loops some number of times. One of the issued tasks will fail, report a message, and signal all other tasks to stop. Each task will check whether it should cancel on each iteration and, once canceled, will stop as soon as possible.
Firstly, we can define a custom function used to initialize each child worker process.
The initialization function will take a shared event object as an argument. It will then define a global variable and store the provided shared event object in the global variable.
Importantly, the shared event stored in the global variable will be accessible to all tasks executed by that worker process, and because every worker is initialized the same way, to all tasks in the process pool.
The init_worker() function below implements this.
# initialize worker processes
def init_worker(shared_event):
    # store the event as a global in the worker process
    global event
    event = shared_event
Next, we can define a custom function for executing tasks in the process pool.
The task function will take a unique integer argument to identify the task.
# task executed in a worker process
def task(identifier):
    # ...
The task will loop four times. On each iteration, it will first check the status of the shared event to see whether the task should be canceled. If so, it will report a message and return from the task function immediately. Otherwise, it will block for a random fraction of a second to simulate computational effort.
...
# loop for a while
for i in range(4):
    # check for stop
    if event.is_set():
        print(f'Task {identifier} stopped', flush=True)
        return
    # block for a moment
    sleep(random())
One of the tasks will conditionally fail. Specifically, task number 5 will fail on its final iteration, when the loop counter is above 2. When it fails, it will report a message and signal all other tasks to cancel via the shared event.
...
# conditionally fail
if i > 2 and identifier == 5:
    print('Something bad happened!', flush=True)
    # cancel all tasks
    event.set()
    return
Finally, if the task completes normally, it will report a message.
...
# report done
print(f'Task {identifier} done', flush=True)
Tying this together, the complete task() function is listed below.
# task executed in a worker process
def task(identifier):
    # loop for a while
    for i in range(4):
        # check for stop
        if event.is_set():
            print(f'Task {identifier} stopped', flush=True)
            return
        # block for a moment
        sleep(random())
        # conditionally fail
        if i > 2 and identifier == 5:
            print('Something bad happened!', flush=True)
            # cancel all tasks
            event.set()
            return
    # report done
    print(f'Task {identifier} done', flush=True)
Next, the main process will create a manager used to create and manage centralized objects, in this case a centralized Event object.
...
# create a manager
with Manager() as manager:
    # ...
We then use the manager to create the shared event. This returns a proxy object that can be shared safely among all child worker processes and used to interact with the centralized event object.
...
# create a shared event
shared_event = manager.Event()
We can then create a process pool. We will configure it to initialize each child worker process with our custom initialization function, passing the shared event as an argument.
The process pool will be created using the context manager interface to ensure the pool is closed automatically for us once we are finished with it.
...
# create and configure the process pool
with Pool(initializer=init_worker, initargs=(shared_event,)) as pool:
    # ...
You can learn more about the context manager interface in the tutorial:
We will then issue 10 tasks to the process pool, calling our custom task() function with numbers 0 through 9. We will use the map_async() function to issue the tasks asynchronously, which will return an AsyncResult object.
...
# issue tasks into the process pool
result = pool.map_async(task, range(10))
You can learn more about issuing asynchronous tasks to the process pool with the map_async() function in the tutorial:
Finally, the main process will wait on the AsyncResult object for the issued tasks to complete, successfully or otherwise.
...
# wait for tasks to complete
result.wait()
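Alternatively, if we also wanted to retrieve the return values of the tasks, or to surface any exception raised inside a task, we could call get() on the AsyncResult object instead of wait(). For example:

...
# wait for tasks to complete and retrieve their return values
# (get() re-raises an exception if any task raised one)
values = result.get()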
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of canceling all tasks if one task fails with an exception
from random import random
from time import sleep
from multiprocessing.pool import Pool
from multiprocessing import Manager

# initialize worker processes
def init_worker(shared_event):
    # store the event as a global in the worker process
    global event
    event = shared_event

# task executed in a worker process
def task(identifier):
    # loop for a while
    for i in range(4):
        # check for stop
        if event.is_set():
            print(f'Task {identifier} stopped', flush=True)
            return
        # block for a moment
        sleep(random())
        # conditionally fail
        if i > 2 and identifier == 5:
            print('Something bad happened!', flush=True)
            # cancel all tasks
            event.set()
            return
    # report done
    print(f'Task {identifier} done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create a manager
    with Manager() as manager:
        # create a shared event
        shared_event = manager.Event()
        # create and configure the process pool
        with Pool(initializer=init_worker, initargs=(shared_event,)) as pool:
            # issue tasks into the process pool
            result = pool.map_async(task, range(10))
            # wait for tasks to complete
            result.wait()
        # process pool is closed automatically
Running the example first creates the manager. The manager is then used to create the shared event, returning a proxy object that can be shared safely with the child worker processes.
The main process then creates the process pool and configures it to initialize each child worker process.
The pool is started and each child worker process is initialized with the custom initialization function. Each child worker process stores the proxy for the shared event in a global variable, making it accessible to all tasks executed by that process, and in turn to tasks in every process in the pool.
The main process then issues ten tasks to the process pool and then blocks until the tasks are done.
Tasks begin running in the process pool. Each task loops, checking the status of the shared event each iteration, then blocking for a random fraction of a second. Some tasks finish and report a message.
One task conditionally fails. It reports a message, then signals all other tasks to cancel by setting the shared event.
The running tasks check the status of the event on their next iteration. They notice that the shared event is set and then return immediately, canceling their tasks.
All tasks are finished and the main process continues on and closes the process pool.
Note, the specific tasks that finish and are canceled will differ each time the code is run given the use of random numbers.
Task 3 done
Task 4 done
Task 0 done
Something bad happened!
Task 8 stopped
Task 6 done
Task 2 stopped
Task 1 done
Task 9 stopped
Task 7 done
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to cancel all tasks in the Python process pool if one task fails.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Jeremy Bezanger on Unsplash
Pavel says
Hi Jason!
Can’t this example be done without an initializer?
Jason Brownlee says
Yes, two other ways I can think of:
An event created by a manager can be passed to all tasks as an argument, for example:
https://superfastpython.com/multiprocessing-pool-event/
It may also be inherited as a global variable when using the ‘fork’ start method, for example:
https://superfastpython.com/multiprocessing-inherit-global-variables-in-python/
The approach I’ve proposed does not require that the task is modified and will work with the ‘spawn’ start method.
I believe your code example fails because the event is not made available to workers in the pool.
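For example, a minimal sketch of the second approach, where worker processes inherit a global event under the 'fork' start method (available on Unix-like platforms), might look like this; the task() function here is hypothetical:

# sketch: inherit a global event via the 'fork' start method (Unix only)
from time import sleep
from multiprocessing import get_context, Event

# global event, inherited by forked worker processes
event = Event()

# task executed in a worker process (for illustration only)
def task(identifier):
    # one task fails and signals the others via the inherited event
    if identifier == 0:
        event.set()
        return f'Task {identifier} failed and set the event'
    # simulate some work
    sleep(0.2)
    # other tasks check the inherited event
    if event.is_set():
        return f'Task {identifier} stopped'
    return f'Task {identifier} done'

# protect the entry point
if __name__ == '__main__':
    # create a process pool that uses the fork start method
    with get_context('fork').Pool() as pool:
        print(pool.map(task, range(5)))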