Last Updated on September 12, 2022
You can safely stop tasks in the process pool using a shared multiprocessing.Event object.
In this tutorial you will discover how to safely stop tasks in the process pool in Python.
Let’s get started.
Need to Safely Stop All Tasks
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
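For example, a minimal standalone sketch contrasting the two styles might look as follows (this is not drawn from the worked example below; the square() function is illustrative):

# minimal sketch contrasting synchronous and asynchronous task issue
from multiprocessing.pool import Pool

# simple task function (illustrative only)
def square(x):
    return x * x

if __name__ == '__main__':
    with Pool() as pool:
        # synchronous: blocks until the result is ready
        print(pool.apply(square, (3,)))    # 9
        print(pool.map(square, range(4)))  # [0, 1, 4, 9]
        # asynchronous: returns an AsyncResult immediately
        result = pool.map_async(square, range(4))
        print(result.get())                # [0, 1, 4, 9]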
When using the process pool, we often need to safely stop all running tasks.
This may be for many reasons, such as:
- A user requests to close the application.
- An error or fault that negates the tasks.
- A problem accessing a required resource.
How can we safely stop all tasks in the process pool?
How To Safely Stop All Tasks in the Process Pool
The process pool does not provide a mechanism to safely stop all currently running tasks.
It does provide a way to forcefully terminate all running worker processes via the terminate() function. This approach is too aggressive for most use cases as it will mean the process pool can no longer be used.
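For contrast, a minimal sketch of this forceful approach might look as follows (the work() function is illustrative, not part of this tutorial's example):

# sketch: forcefully terminating the pool kills workers immediately,
# abandoning any running tasks (too aggressive for most use cases)
from time import sleep
from multiprocessing.pool import Pool

def work(identifier):
    sleep(10)
    return identifier

if __name__ == '__main__':
    pool = Pool()
    # issue tasks asynchronously
    _ = pool.map_async(work, range(4))
    sleep(1)
    # kill all worker processes immediately; the pool cannot be reused
    pool.terminate()
    pool.join()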
Instead, we can develop a mechanism to safely stop all running tasks in a process pool using a multiprocessing.Event object.
Firstly, an Event object must be created and shared among all running tasks.
For example:
...
# create a shared event
event = multiprocessing.Event()
Recall that an event provides a process-safe boolean variable.
It is created in the False state and can be checked via the is_set() function and made True via the set() function.
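As a quick standalone sketch of this behavior (not part of the worked example below):

# sketch: demonstrate the process-safe boolean behavior of an Event
from multiprocessing import Event

# events start in the not-set (False) state
event = Event()
print(event.is_set())  # False
# set the event
event.set()
print(event.is_set())  # True
# clear the event back to the not-set state
event.clear()
print(event.is_set())  # False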
You can learn more about how to use a multiprocessing.Event in a separate tutorial.
The multiprocessing.Event object cannot be passed as an argument to task functions executed in the process pool. It will result in an error such as:
RuntimeError: Condition objects should only be shared between processes through inheritance
The error is likely triggered because the multiprocessing.Event object uses a multiprocessing.Condition object internally, which cannot be serialized (pickled) in the manner required by the process pool.
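A minimal sketch that reproduces the error might look as follows (the task() function here is illustrative):

# sketch: passing a multiprocessing.Event as a pool task argument fails
from multiprocessing import Event
from multiprocessing.pool import Pool

def task(event):
    event.wait()

if __name__ == '__main__':
    event = Event()
    with Pool() as pool:
        # raises: RuntimeError: Condition objects should only be shared
        # between processes through inheritance
        pool.apply(task, (event,))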
Instead, we have two options to share the Event with tasks in the pool.
- Define the Event as a global variable and inherit it in the child worker processes. This only works for child processes created using the ‘fork’ start method (see the sketch after this list).
- Define the Event using a Manager, then pass the resulting proxy object as an argument or inherit it as a global variable. This works with both the ‘spawn’ and ‘fork’ start methods.
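A sketch of the first option might look as follows (it assumes a platform that supports the ‘fork’ start method, such as Linux):

# sketch of option 1: inherit a global Event via the 'fork' start method
from time import sleep
import multiprocessing
from multiprocessing.pool import Pool

# global event, inherited by forked worker processes
event = multiprocessing.Event()

def task(identifier):
    for _ in range(10):
        sleep(1)
        # check the inherited global event
        if event.is_set():
            break

if __name__ == '__main__':
    multiprocessing.set_start_method('fork')
    with Pool() as pool:
        # issue tasks asynchronously
        result = pool.map_async(task, range(4))
        sleep(2)
        # request that all tasks stop
        event.set()
        result.wait()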
This latter approach is more general and therefore preferred.
A multiprocessing.Manager creates a process and is responsible for managing a centralized version of an object. It then provides proxy objects that can be used in other processes that keep up-to-date with the single centralized object.
Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
— multiprocessing — Process-based parallelism
As such, using a Manager is a useful way to centralize a synchronization primitive like an event shared among multiple worker processes.
We can first create a multiprocessing.Manager using the context manager interface.
For example:
...
# create the manager
with Manager() as manager:
    # ...
We can then create a shared Event object using the manager.
This will return a proxy object for the Event object in the manager process that we can share among child worker processes directly or indirectly.
For example:
...
# create a shared event via the manager
event = manager.Event()
It is best to explicitly share state among child processes wherever possible. It’s just easier to read and debug.
Therefore, I recommend passing the shared Event as an argument to the target task function.
For example:
# function executed in worker processes
def task(event, ...):
    # ...
The custom function executing the task can check the status of the Event object periodically, such as each iteration of a loop.
If the Event is set, the target task function can then choose to stop, closing any resources if necessary.
# function executed in worker processes
def task(event, ...):
    # ...
    while True:
        # ...
        if event.is_set():
            return
There are some limitations to this approach.
It requires that you have complete control over the target task function or functions executed in the process pool.
This control is required so that the functions can be changed to take the event as an argument and then to check the status of the event periodically.
- Target task function must take the multiprocessing.Event as an argument.
- Target task function must check the status of the multiprocessing.Event periodically.
The first limitation can be removed by inheriting the event from the parent process as a global variable. Nevertheless, all tasks that you want to be able to stop must still check the status of the event periodically.
This can be a problem if tasks perform blocking operations, such as reading from or writing to files or sockets.
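Where a task blocks on a simple sleep, as in the example below, one mitigation (an adaptation, not part of the worked example) is to wait on the event itself with a timeout, so the task responds to a stop request almost immediately:

# sketch: wait on the event with a timeout instead of sleeping blindly
def task(identifier, event):
    for i in range(10):
        # block for up to one second; wait() returns True as soon as
        # the event is set, in which case the task stops
        if event.wait(timeout=1):
            return
        # ... otherwise, perform the next unit of work ...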
Now that we know how to safely stop tasks in the process pool, let’s look at a worked example.
Example Safely Stop All Tasks in the Process Pool
We can explore how to safely stop tasks in the process pool.
In this example, we will define a task function that takes a shared event as an argument, loops a number of times, and sleeps each iteration. It will check the status of the shared event each iteration and stop the task if the event is set. The main process will issue a number of tasks to the process pool, wait a moment, request that all issued tasks safely stop, then wait for the tasks to stop.
Importantly, the process pool is left in a condition where it can be used again to issue more tasks. This is unlike calling the terminate() function to forcefully kill all child worker processes.
Firstly, we can define a custom function to execute tasks in the process pool.
The task function will take a unique integer identifier and a shared event object as arguments. It will report a message that it is starting, then loop 10 times. Each iteration it will sleep for one second to simulate computational effort. It will then check the status of the event, and if set, it will report a message and break the loop. Finally, a message is reported right before the task stops.
The task() function below implements this.
# task executed in a worker process
def task(identifier, event):
    print(f'Task {identifier} running', flush=True)
    # run for a long time
    for i in range(10):
        # block for a moment
        sleep(1)
        # check if the task should stop
        if event.is_set():
            print(f'Task {identifier} stopping...', flush=True)
            # stop the task
            break
    # report all done
    print(f'Task {identifier} Done', flush=True)
Next, in the main process we can create the shared event.
This requires first creating the multiprocessing.Manager that will control the centralized version of the multiprocessing.Event object.
We will use the context manager interface to ensure the Manager is closed once we are finished with it.
...
# create the manager
with Manager() as manager:
    # ...
Next, we will create the multiprocessing.Event using the Manager. This will return a proxy for the centralized Event object that we can safely share with child worker processes directly.
...
# create the shared event
event = manager.Event()
Next, we will create a process pool with a default configuration.
Again, we will use the context manager interface to ensure that the pool is closed automatically once we are finished using it.
...
# create and configure the process pool
with Pool() as pool:
    # ...
We will then issue 4 tasks to the process pool.
In this case, we will use the starmap_async() function, which supports multiple arguments and issues tasks asynchronously, allowing the main process to continue on.
Firstly, we will create a list of arguments, with one tuple per task containing a unique integer and the shared event object. We then issue the tasks and receive an AsyncResult as a handle on the issued tasks.
...
# prepare arguments
items = [(i, event) for i in range(4)]
# issue tasks asynchronously
result = pool.starmap_async(task, items)
You can learn more about issuing tasks using the starmap_async() function in a separate tutorial.
Next, the main process will block for a moment to allow the tasks to get started and run for a while.
...
# wait a moment
sleep(2)
The main process will then request that all issued tasks safely stop by setting the shared event.
...
# safely stop the issued tasks
print('Safely stopping all tasks')
event.set()
The main process will then wait for all tasks to stop.
There are many ways to do this, although in this case we will simply wait on the single AsyncResult object returned as a handle on the batch of tasks issued to the pool.
...
# wait for all tasks to stop
result.wait()
print('All stopped')
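For example, an alternative (not used in this example) is to call get() on the AsyncResult, which waits like wait() but also re-raises any exception raised by a task; the timeout below is illustrative:

...
# alternative: get() waits for the tasks and re-raises any task exception
result.get(timeout=30)
print('All stopped')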
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of safely stopping all tasks in the process pool
from time import sleep
from multiprocessing import Manager
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, event):
    print(f'Task {identifier} running', flush=True)
    # run for a long time
    for i in range(10):
        # block for a moment
        sleep(1)
        # check if the task should stop
        if event.is_set():
            print(f'Task {identifier} stopping...', flush=True)
            # stop the task
            break
    # report all done
    print(f'Task {identifier} Done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the manager
    with Manager() as manager:
        # create the shared event
        event = manager.Event()
        # create and configure the process pool
        with Pool() as pool:
            # prepare arguments
            items = [(i, event) for i in range(4)]
            # issue tasks asynchronously
            result = pool.starmap_async(task, items)
            # wait a moment
            sleep(2)
            # safely stop the issued tasks
            print('Safely stopping all tasks')
            event.set()
            # wait for all tasks to stop
            result.wait()
            print('All stopped')
Running the example first creates the manager, then creates the shared event object from the manager.
The process pool is then created using the default configuration.
The arguments for the tasks are prepared, then the four tasks are issued asynchronously to the process pool.
The main process then blocks for a moment.
Each task starts, reporting a message then starting its main loop.
The main process wakes up and sets the event, requesting all issued tasks to stop. It then waits on the AsyncResult for the issued tasks to stop.
Each task checks the status of the event each iteration of its main loop. They notice that the event is set, break their main loop, report a final message then return, stopping the task safely.
All tasks stop, allowing the main process to continue on, ending the application.
This highlights how we can safely stop all tasks in the process pool in a controlled manner.
Task 0 running
Task 2 running
Task 1 running
Task 3 running
Safely stopping all tasks
Task 1 stopping...
Task 1 Done
Task 0 stopping...
Task 0 Done
Task 3 stopping...
Task 3 Done
Task 2 stopping...
Task 2 Done
All stopped
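Because the worker processes were not terminated, the same pool could be reused right away. As a sketch of such an extension (not part of the example above), we might clear the event and issue a new batch of tasks inside the same with Pool() block:

...
# clear the event so newly issued tasks run to completion
event.clear()
# reuse the same pool for another batch of tasks
result = pool.starmap_async(task, [(i, event) for i in range(4, 8)])
result.wait()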
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to safely stop tasks in the process pool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.