Stop All Tasks in the Multiprocessing Pool in Python
You can safely stop tasks in the process pool using a shared multiprocessing.Event object.
In this tutorial you will discover how to safely stop tasks in the process pool in Python.
Let's get started.
Need to Safely Stop All Tasks
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
-- multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
When using the process pool, we often need to safely stop all running tasks.
This may be for many reasons, such as:
- A user requests to close the application.
- An error or fault that negates the tasks.
- A problem accessing a required resource.
How can we safely stop all tasks in the process pool?
How To Safely Stop All Tasks in the Process Pool
The process pool does not provide a mechanism to safely stop all currently running tasks.
It does provide a way to forcefully terminate all running worker processes via the terminate() function. This approach is too aggressive for most use cases as it will mean the process pool can no longer be used.
Instead, we can develop a mechanism to safely stop all running tasks in a process pool using a multiprocessing.Event object.
Firstly, an Event object must be created and shared among all running tasks.
For example:
...
# create a shared event
event = multiprocessing.Event()
Recall that an event provides a process-safe boolean variable.
It is created in the False state and can be checked via the is_set() function and made True via the set() function.
The multiprocessing.Event object cannot be passed as an argument to task functions executed in the process pool. It will result in an error such as:
RuntimeError: Condition objects should only be shared between processes through inheritance
The error is likely triggered because the multiprocessing.Event object uses a multiprocessing.Condition object internally, which cannot be serialized (pickled) in the manner required by the process pool.
Instead, we have two options to share the Event with tasks in the pool.
- Define the Event as a global variable and inherit the global variable in the child process. This will only work for child processes created using the 'fork' start method.
- Define the Event using a Manager, then inherit the global variable or pass it as an argument. This works with both 'spawn' and 'fork' start methods.
This latter approach is more general and therefore preferred.
A multiprocessing.Manager creates a process and is responsible for managing a centralized version of an object. It then provides proxy objects that can be used in other processes and that stay in sync with the single centralized object.
Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
-- multiprocessing — Process-based parallelism
As such, using a Manager is a useful way to centralize a synchronization primitive like an event shared among multiple worker processes.
We can first create a multiprocessing.Manager using the context manager interface.
For example:
...
# create the manager
with Manager() as manager:
    # ...
We can then create a shared Event object using the manager.
This will return a proxy object for the Event object in the manager process that we can share among child worker processes directly or indirectly.
For example:
...
# create a shared event via the manager
event = manager.Event()
It is best to explicitly share state among child processes wherever possible. It's just easier to read and debug.
Therefore, I recommend passing the shared Event as an argument to the target task function.
For example:
# function executed in worker processes
def task(event, ...):
    # ...
The custom function executing the task can check the status of the Event object periodically, such as each iteration of a loop.
If the Event is set, the target task function can then choose to stop, closing any resources if necessary.
# function executed in worker processes
def task(event, ...):
    # ...
    while True:
        # ...
        if event.is_set():
            return
There are some limitations to this approach.
It requires that you have complete control over the target task function or functions executed in the process pool.
This control is required so that the functions can be changed to take the event as an argument and then to check the status of the event periodically.
- Target task function must take the multiprocessing.Event as an argument.
- Target task function must check the status of the multiprocessing.Event periodically.
The first limitation can be removed by inheriting the event from the parent process as a global variable. Nevertheless, all tasks that you want to stop will still need to check the status of the event periodically.
This can be a problem if tasks are performing blocking operations such as reading and/or writing from/to files or sockets.
Now that we know how to safely stop tasks in the process pool, let's look at a worked example.
Example of Safely Stopping All Tasks in the Process Pool
We can explore how to safely stop tasks in the process pool.
In this example, we will define a task function that takes a shared event as an argument, loops a number of times, and sleeps each iteration. It will check the status of the shared event each iteration and stop the task if the event is set. The main process will issue a number of tasks to the process pool, wait a moment, request that all issued tasks safely stop, then wait for the tasks to stop.
Importantly, the process pool is left in a condition where it can be used again to issue more tasks. This is unlike calling the terminate() function to forcefully kill all child worker processes.
Firstly, we can define a custom function to execute tasks in the process pool.
The task function will take a unique integer identifier and a shared event object as arguments. It will report a message that it is starting, then loop 10 times. Each loop it will sleep for one second to simulate computational effort. It will then check the status of the event, and if set it will report a message and break the loop. Finally, a message is reported right before the task is stopped.
The task() function below implements this.
# task executed in a worker process
def task(identifier, event):
    print(f'Task {identifier} running', flush=True)
    # run for a long time
    for i in range(10):
        # block for a moment
        sleep(1)
        # check if the task should stop
        if event.is_set():
            print(f'Task {identifier} stopping...', flush=True)
            # stop the task
            break
    # report all done
    print(f'Task {identifier} Done', flush=True)
Next, in the main process we can create the shared event.
This requires first creating the multiprocessing.Manager that will control the centralized version of the multiprocessing.Event object.
We will use the context manager interface to ensure the Manager is closed once we are finished with it.
...
# create the manager
with Manager() as manager:
    # ...
Next, we will create the multiprocessing.Event using the Manager. This will return a proxy for the centralized Event object that we can safely share with child worker processes directly.
...
# create the shared event
event = manager.Event()
Next, we will create a process pool with a default configuration.
Again, we will use the context manager interface to ensure that the pool is closed automatically once we are finished using it.
...
# create and configure the process pool
with Pool() as pool:
    # ...
We will then issue 4 tasks to the process pool.
In this case, we will use the starmap_async() function that supports multiple arguments and allows tasks to be issued asynchronously, allowing the main process to continue on.
Firstly, we will create a list of arguments, with one tuple per task containing a unique integer and the shared event object. We then issue the tasks and receive an AsyncResult as a handle on the issued tasks.
...
# prepare arguments
items = [(i, event) for i in range(4)]
# issue tasks asynchronously
result = pool.starmap_async(task, items)
Next, the main process will block for a moment to allow the tasks to get started and run for a while.
...
# wait a moment
sleep(2)
The main process will then request that all issued tasks safely stop by setting the shared event.
...
# safely stop the issued tasks
print('Safely stopping all tasks')
event.set()
The main process will then wait for all tasks to stop.
There are many ways to do this, although in this case we will simply wait on the single AsyncResult object returned as a handle on the single batch of tasks issued to the pool.
...
# wait for all tasks to stop
result.wait()
print('All stopped')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of safely stopping all tasks in the process pool
from time import sleep
from multiprocessing import Manager
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, event):
    print(f'Task {identifier} running', flush=True)
    # run for a long time
    for i in range(10):
        # block for a moment
        sleep(1)
        # check if the task should stop
        if event.is_set():
            print(f'Task {identifier} stopping...', flush=True)
            # stop the task
            break
    # report all done
    print(f'Task {identifier} Done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the manager
    with Manager() as manager:
        # create the shared event
        event = manager.Event()
        # create and configure the process pool
        with Pool() as pool:
            # prepare arguments
            items = [(i, event) for i in range(4)]
            # issue tasks asynchronously
            result = pool.starmap_async(task, items)
            # wait a moment
            sleep(2)
            # safely stop the issued tasks
            print('Safely stopping all tasks')
            event.set()
            # wait for all tasks to stop
            result.wait()
            print('All stopped')
Running the example first creates the manager, then creates the shared event object from the manager.
The process pool is then created using the default configuration.
The arguments for the tasks are prepared, then the four tasks are issued asynchronously to the process pool.
The main process then blocks for a moment.
Each task starts, reporting a message then starting its main loop.
The main process wakes up and sets the event, requesting all issued tasks to stop. It then waits on the AsyncResult for the issued tasks to stop.
Each task checks the status of the event each iteration of its main loop. They notice that the event is set, break their main loop, report a final message then return, stopping the task safely.
All tasks stop, allowing the main process to continue on, ending the application.
This highlights how we can safely stop all tasks in the process pool in a controlled manner.
Task 0 running
Task 2 running
Task 1 running
Task 3 running
Safely stopping all tasks
Task 1 stopping...
Task 1 Done
Task 0 stopping...
Task 0 Done
Task 3 stopping...
Task 3 Done
Task 2 stopping...
Task 2 Done
All stopped
Takeaways
You now know how to safely stop tasks in the process pool in Python.
If you enjoyed this tutorial, you will love my book: Python Multiprocessing Pool Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.