Last Updated on September 12, 2022
You can stop running tasks in a ProcessPoolExecutor using a multiprocessing.Event.
In this tutorial you will discover how you can update your tasks running in a ProcessPoolExecutor so that they stop running whenever you want.
Let’s get started.
Calling cancel() Does Not Stop Running Tasks
The ProcessPoolExecutor in Python provides a process pool that lets you run tasks concurrently.
You can add tasks to the pool by calling submit() with your function name, which will return a Future object.
You can call the cancel() function on the Future object to cancel the task before it has started running.
1 2 3 4 5 6 |
... # execute one task future = executor.submit(work) # cancel the task print('Cancel the task...') future.cancel() |
If your task has already started running, then calling cancel() will have no effect and you must wait for the task to complete.
The example below demonstrates this by starting a task that takes ten seconds, then attempting to stop the task by calling cancel.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# SuperFastPython.com # example of calling cancel not stopping the task from time import sleep from concurrent.futures import ProcessPoolExecutor # target task function def work(): # pretend read data for a long time for _ in range(10): # pretend to read some data sleep(1) # entry point if __name__ == '__main__': # create a process pool with ProcessPoolExecutor() as executor: # execute one task future = executor.submit(work) # wait a moment print('The task is running...') sleep(2) # try to stop by calling the cancel function print('Cancel the task...') future.cancel() # waiting for all tasks to complete print('Waiting...') print('All done.') |
Running the example starts the task as per normal.
After two seconds, we attempt to stop the task by calling the cancel() function on the Future object. This does not stop the task because it is already running.
We must wait for the task to complete before we can close the process pool and continue on.
1 2 3 4 |
The task is running... Cancel the task... Waiting... All done. |
Next, let’s look at how we might update a target task function to check a process-safe flag to indicate whether it should stop running.
Run loops using all CPUs, download your FREE book to learn how.
How to Stop a Running Task
Although the ProcessPoolExecutor does not provide a facility to stop running tasks, we can update our target task functions to stop running when a process-safe flag is set.
This can be implemented using multiprocessing.Event.
First, it requires that you first create a multiprocessing.Event to control when running tasks should stop.
We cannot create Event objects directly and pass them around when working with processes, as we can when working with threads. Instead, we must use a multiprocessing.Manager to create the Event object for us.
Managers are used to create and manage shared state between processes, like data and variables.
You can learn more about Events and Managers here:
Managers have to be created and closed which can be achieved seamlessly using the context manager, for example:
1 2 3 4 |
... # create the manager used for sharing state between processes with Manager() as manager: # ... |
Next, we can use the Manager to create the event:
1 2 3 |
... # create an event to shut down all running tasks event = manager.Event() |
This event can then be passed to each target task function as an argument to be checked within separate processes.
1 2 3 |
... # execute the task, passing the event future = executor.submit(work, event) |
Once the task is running, it can be stopped from the main process by setting the flag on the Event.
1 2 3 |
... # stop running tasks via the event event.set() |
Each target task function must check the status of the event frequently, such as within each iteration of a loop within the task.
If the event is set, the task can then stop running, perhaps by returning immediately.
1 2 3 4 |
... # check the status of the flag if event.is_set(): return |
If your target task function has some resources open, it may need to clean up before returning.
This approach to stop running tasks may require that you change the structure of your task so that you have a loop structure allowing you to check the value of the flag.
For example, if your task reads data from a file or a socket, you may need to change the read operation to be performed in blocks of data in a loop so that each iteration of the loop you can check the status of the flag.
Now that we know how to stop a task running, let’s look at a worked example.
Example of Stopping a Running Task
We can update our previous example to check the status of the process-safe flag and stop when the flag is set.
First, we must create the Manager and then create the Event that will be passed to all tasks executed in the process pool.
1 2 3 4 5 |
... # create a manager for sharing events with Manager() as manager: # create an event used to stop running tasks event = manager.Event() |
We can then pass this event to our target function when we submit the task to the pool for execution.
1 2 3 |
... # execute one task future = executor.submit(work, event) |
Next, we can update our target task function to take the event as an argument and to check the status of the event each iteration via the is_set() function on the event.
If the event is set, the target task function can return immediately.
1 2 3 4 5 6 7 8 9 |
# target task function def work(event): # pretend read data for a long time for _ in range(10): # pretend to read some data sleep(1) # check if the task should stop if event.is_set(): return |
It is a good practice to cancel tasks in the process pool, just in case they have not yet started running.
1 2 3 |
... # cancel the task, just in case it is not yet running future.cancel() |
Finally, we can stop the running task by calling the set() function on the event any time.
1 2 3 4 |
... # stop the running task using the flag print('Stopping the task...') event.set() |
And that’s it.
Tying this together, the example below provides a template you can use for adding an event flag to your target task function to check for a stop condition to shutdown all currently running tasks.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# SuperFastPython.com # example of stopping a running task in a process pool from time import sleep from multiprocessing import Manager from multiprocessing import Event from concurrent.futures import ProcessPoolExecutor # target task function def work(event): # pretend read data for a long time for _ in range(10): # pretend to read some data sleep(1) # check if the task should stop if event.is_set(): return # entry point if __name__ == '__main__': with Manager() as manager: # create an event used to stop running tasks event = manager.Event() # create a process pool with ProcessPoolExecutor() as executor: # execute one task future = executor.submit(work, event) # wait a moment print('The task is running...') sleep(2) # cancel the task, just in case it is not yet running future.cancel() # stop the running task using the flag print('Stopping the task...') event.set() # waiting for all tasks to complete print('Waiting...') print('All done.') |
Running the example first creates a process pool and schedules a task.
An event object is created and passed to the task where it is checked each iteration to see if it has been set and if so to bail out of the task.
The task starts executing as per normal for two seconds. First, we cancel all tasks in the pool, just in case they have not yet started executing.
We then set the event to trigger the running task to stop. The task checks the status of the event each second, and stops executing on the next iteration after the event has been set.
1 2 3 4 |
The task is running... Stopping the task... Waiting... All done. |
Free Python ProcessPoolExecutor Course
Download your FREE ProcessPoolExecutor PDF cheat sheet and get BONUS access to my free 7-day crash course on the ProcessPoolExecutor API.
Discover how to use the ProcessPoolExecutor class including how to configure the number of workers and how to execute tasks asynchronously.
Further Reading
This section provides additional resources that you may find helpful.
Books
- ProcessPoolExecutor Jump-Start, Jason Brownlee (my book!)
- Concurrent Futures API Interview Questions
- ProcessPoolExecutor PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python ProcessPoolExecutor: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python Pool: The Complete Guide
APIs
References
- Thread (computing), Wikipedia.
- Process (computing), Wikipedia.
- Thread Pool, Wikipedia.
- Futures and promises, Wikipedia.
Overwhelmed by the python concurrency APIs?
Find relief, download my FREE Python Concurrency Mind Maps
Takeaways
You now know how to use a multiprocessing.Event to stop tasks executed by a ProcessPoolExecutor.
Do you have any questions?
Ask your questions in the comments below and I will do my best to help.
Photo by paul jespers on Unsplash
Grzegorz says
Hi Jason,
This solution is fantastic, worked a charm. Just what I was looking for!
Jason Brownlee says
I’m happy to hear that!