How To Stop Running Tasks in the ProcessPoolExecutor in Python

January 31, 2022 Python ProcessPoolExecutor

You can stop running tasks in a ProcessPoolExecutor using a multiprocessing.Event.

In this tutorial you will discover how you can update your tasks running in a ProcessPoolExecutor so that they stop running whenever you want.

Let's get started.

Calling cancel() Does Not Stop Running Tasks

The ProcessPoolExecutor in Python provides a process pool that lets you run tasks concurrently.

You can add tasks to the pool by calling submit() with your function and any arguments it needs, which returns a Future object.

You can call the cancel() method on the Future object to cancel the task before it has started running.

...
# execute one task
future = executor.submit(work)
# cancel the task
print('Cancel the task...')
future.cancel()

If your task has already started running, then calling cancel() will have no effect and you must wait for the task to complete.

The example below demonstrates this by starting a task that takes ten seconds, then attempting to stop the task by calling cancel().

# SuperFastPython.com
# example of calling cancel not stopping the task
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# target task function
def work():
    # pretend to read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)

# entry point
if __name__ == '__main__':
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute one task
        future = executor.submit(work)
        # wait a moment
        print('The task is running...')
        sleep(2)
        # try to stop by calling the cancel function
        print('Cancel the task...')
        future.cancel()
        # waiting for all tasks to complete
        print('Waiting...')
    print('All done.')

Running the example starts the task as per normal.

After two seconds, we attempt to stop the task by calling the cancel() function on the Future object. This does not stop the task because it is already running.

We must wait for the task to complete before we can close the process pool and continue on.

The task is running...
Cancel the task...
Waiting...
All done.
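The return value of cancel() tells us whether the request worked. The sketch below is one way to report the outcome; try_cancel() is a hypothetical helper written for this illustration and is not part of the concurrent.futures API.

```python
# sketch: report whether cancel() succeeded (try_cancel is a hypothetical helper)
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# target task function
def work():
    # pretend to read data for a moment
    sleep(1)

# attempt to cancel a task and describe what happened
def try_cancel(future):
    # cancel() returns True only if the task had not started
    if future.cancel():
        return 'cancelled before it started'
    # otherwise the task is either running or already done
    if future.running():
        return 'not cancelled: already running'
    return 'not cancelled: already finished'

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(work)
        # give the task a moment to start
        sleep(0.5)
        print(try_cancel(future))
```

Checking the return value is a cheap way to decide whether a task still needs to be stopped some other way.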

Next, let's look at how we might update a target task function to check a process-safe flag to indicate whether it should stop running.

How to Stop a Running Task

Although the ProcessPoolExecutor does not provide a facility to stop running tasks, we can update our target task functions to stop running when a process-safe flag is set.

This can be implemented using multiprocessing.Event.

First, we must create a multiprocessing.Event to control when running tasks should stop.

Unlike a threading.Event shared between threads, a multiprocessing.Event cannot be passed directly as an argument to a task in the process pool. Task arguments are pickled to be sent to the worker process, and the Event does not support pickling. Instead, we can use a multiprocessing.Manager to create the Event for us. The manager returns a proxy object that can be passed to tasks.

Managers are used to create and manage shared state between processes, like data and variables.
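The difference can be checked directly by trying to pickle each kind of event. This is a minimal sketch; is_picklable() is a hypothetical helper written for this illustration.

```python
# sketch: compare pickling a plain Event with a manager-created Event
import pickle
from multiprocessing import Event
from multiprocessing import Manager

# hypothetical helper: report whether an object can be pickled
def is_picklable(obj):
    try:
        pickle.dumps(obj)
        return True
    except (RuntimeError, TypeError, pickle.PicklingError):
        return False

# entry point
if __name__ == '__main__':
    # a plain multiprocessing.Event refuses to be pickled
    print('multiprocessing.Event:', is_picklable(Event()))
    with Manager() as manager:
        # the manager hands back a proxy object instead
        print('manager.Event():', is_picklable(manager.Event()))
```

The plain Event should report that it cannot be pickled, whereas the proxy returned by the manager is designed to be sent between processes.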

A Manager must be started and later shut down, which is handled automatically when it is used as a context manager, for example:

...
# create the manager used for sharing state between processes
with Manager() as manager:
    # ...

Next, we can use the Manager to create the event:

...
# create an event to shut down all running tasks
event = manager.Event()

This event can then be passed to each target task function as an argument to be checked within separate processes.

...
# execute the task, passing the event
future = executor.submit(work, event)

Once the task is running, it can be stopped from the main process by setting the flag on the Event.

...
# stop running tasks via the event
event.set()

Each target task function must check the status of the event frequently, such as within each iteration of a loop within the task.

If the event is set, the task can then stop running, perhaps by returning immediately.

...
# check the status of the flag
if event.is_set():
    return

If your target task function has some resources open, it may need to clean up before returning.
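One way to guarantee the clean-up is a try/finally block around the loop. The sketch below assumes the resource is a scratch file; the steps and delay parameters and the returned tuple are made up for this illustration.

```python
# sketch: clean up a resource when the task is asked to stop early
import os
import tempfile
from time import sleep
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor

# target task function, returns (steps completed, scratch file still exists)
def work(event, steps=10, delay=1.0):
    # open a scratch file that must not be left behind
    fd, path = tempfile.mkstemp()
    completed = 0
    try:
        with os.fdopen(fd, 'w') as handle:
            for _ in range(steps):
                # check if the task should stop
                if event.is_set():
                    break
                handle.write('block\n')
                completed += 1
                sleep(delay)
    finally:
        # runs on an early stop, on normal completion and on error
        os.remove(path)
    return completed, os.path.exists(path)

# entry point
if __name__ == '__main__':
    with Manager() as manager:
        event = manager.Event()
        with ProcessPoolExecutor() as executor:
            future = executor.submit(work, event, 10, 0.2)
            sleep(0.5)
            event.set()
            print(future.result())
```

Breaking out of the loop rather than returning from inside it keeps a single exit path, which makes it easier to see that the clean-up always runs.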

This approach to stopping running tasks may require that you restructure your task around a loop, so that the flag can be checked on each iteration.

For example, if your task reads data from a file or a socket, you may need to change the read operation so that it reads blocks of data in a loop, checking the status of the flag on each iteration.
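A block-wise read might look like the sketch below. The read_in_blocks() function, its block_size parameter and the scratch file are assumptions made for this illustration.

```python
# sketch: read a file in blocks, checking the flag before each read
import os
import tempfile
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor

# target task function, returns the number of bytes read before stopping
def read_in_blocks(path, event, block_size=1024):
    total = 0
    with open(path, 'rb') as handle:
        # check if the task should stop before each block
        while not event.is_set():
            block = handle.read(block_size)
            if not block:
                # reached the end of the file
                break
            total += len(block)
    return total

# entry point
if __name__ == '__main__':
    # write a small scratch file to read back
    with tempfile.NamedTemporaryFile(delete=False) as scratch:
        scratch.write(b'x' * 10_000)
    try:
        with Manager() as manager:
            event = manager.Event()
            with ProcessPoolExecutor() as executor:
                future = executor.submit(read_in_blocks, scratch.name, event)
                print('bytes read:', future.result())
    finally:
        os.remove(scratch.name)
```

Each is_set() call on a manager-created event is a request to the manager process, so very small blocks may add noticeable overhead. The block size is a trade-off between responsiveness and cost.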

Now that we know how to stop a task running, let's look at a worked example.

Example of Stopping a Running Task

We can update our previous example to check the status of the process-safe flag and stop when the flag is set.

First, we must create the Manager and then create the Event that will be passed to all tasks executed in the process pool.

...
# create a manager for sharing events
with Manager() as manager:
    # create an event used to stop running tasks
    event = manager.Event()

We can then pass this event to our target function when we submit the task to the pool for execution.

...
# execute one task
future = executor.submit(work, event)

Next, we can update our target task function to take the event as an argument and to check the status of the event on each iteration via the is_set() method on the event.

If the event is set, the target task function can return immediately.

# target task function
def work(event):
    # pretend to read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)
        # check if the task should stop
        if event.is_set():
            return

It is a good practice to cancel tasks in the process pool, just in case they have not yet started running.

...
# cancel the task, just in case it is not yet running
future.cancel()

Finally, we can stop the running task by calling the set() method on the event at any time.

...
# stop the running task using the flag
print('Stopping the task...')
event.set()
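When many tasks have been submitted, the same two steps apply to all of them: cancel whatever has not started, then set the event for the rest. The sketch below wraps this in a hypothetical stop_all() helper; the shorter sleep is only there to keep the demonstration quick.

```python
# sketch: cancel pending tasks and signal running tasks in one step
from time import sleep
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor

# target task function
def work(event):
    for _ in range(10):
        # pretend to read some data
        sleep(0.2)
        # check if the task should stop
        if event.is_set():
            return

# hypothetical helper, returns how many tasks were cancelled before starting
def stop_all(futures, event):
    # cancel() only succeeds for tasks that have not started
    cancelled = sum(1 for future in futures if future.cancel())
    # tell the tasks that are already running to stop
    event.set()
    return cancelled

# entry point
if __name__ == '__main__':
    with Manager() as manager:
        event = manager.Event()
        with ProcessPoolExecutor(max_workers=2) as executor:
            futures = [executor.submit(work, event) for _ in range(6)]
            sleep(0.5)
            print('Cancelled before starting:', stop_all(futures, event))
    print('All done.')
```

On Python 3.9 and later, executor.shutdown(cancel_futures=True) is another way to cancel the tasks that have not started. The event is still needed for the tasks that are already running.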

And that's it.

Tying this together, the example below provides a template for adding an event flag to your target task function. The task checks the flag as its stop condition, so setting it shuts down all currently running tasks.

# SuperFastPython.com
# example of stopping a running task in a process pool
from time import sleep
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor

# target task function
def work(event):
    # pretend to read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)
        # check if the task should stop
        if event.is_set():
            return

# entry point
if __name__ == '__main__':
    with Manager() as manager:
        # create an event used to stop running tasks
        event = manager.Event()
        # create a process pool
        with ProcessPoolExecutor() as executor:
            # execute one task
            future = executor.submit(work, event)
            # wait a moment
            print('The task is running...')
            sleep(2)
            # cancel the task, just in case it is not yet running
            future.cancel()
            # stop the running task using the flag
            print('Stopping the task...')
            event.set()
            # waiting for all tasks to complete
            print('Waiting...')
    print('All done.')

Running the example first creates the manager and the event, then creates the process pool and submits the task, passing it the event.

The task checks the event on each iteration and returns early if it has been set.

The task executes normally for two seconds. We then cancel the task, just in case it has not yet started executing.

We then set the event to trigger the running task to stop. The task checks the status of the event each second, and stops executing on the next iteration after the event has been set.

The task is running...
Stopping the task...
Waiting...
All done.
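If a task spends its time waiting rather than computing, the sleep and the check can be merged into one call to wait() on the event, which returns as soon as the event is set. This is a sketch of a variation rather than a replacement for the template above; the steps and delay parameters and the returned step count are assumptions made for this illustration.

```python
# sketch: respond to the stop request without waiting out the full sleep
from time import sleep
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor

# target task function, returns the number of steps completed
def work(event, steps=10, delay=1.0):
    for step in range(steps):
        # pause for up to delay seconds, or less if the event is set
        if event.wait(timeout=delay):
            return step
    return steps

# entry point
if __name__ == '__main__':
    with Manager() as manager:
        event = manager.Event()
        with ProcessPoolExecutor() as executor:
            future = executor.submit(work, event)
            print('The task is running...')
            sleep(2)
            print('Stopping the task...')
            event.set()
            print('Steps completed before stopping:', future.result())
    print('All done.')
```

This only helps when the pause is the slow part of each iteration. A task that is busy reading or computing still needs an explicit is_set() check between units of work.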

Takeaways

You now know how to use a multiprocessing.Event to stop tasks executed by a ProcessPoolExecutor.


