Multiprocessing Pool Stop All Tasks If One Task Fails in Python

July 23, 2022 Python Multiprocessing Pool

You can cancel all tasks in the multiprocessing pool if one task fails using a shared multiprocessing.Event object.

In this tutorial you will discover how to cancel all tasks in the Python process pool if one task fails.

Let's get started.

Need To Stop All Tasks on Exception

The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.

A process pool can be configured when it is created, which will prepare the child workers.

A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.

-- multiprocessing — Process-based parallelism

We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().

Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
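As a quick illustration of these functions (a minimal sketch, separate from the worked example in this tutorial):

```python
from multiprocessing.pool import Pool

# simple task function
def square(x):
    return x * x

# protect the entry point
if __name__ == '__main__':
    with Pool() as pool:
        # issue a one-off task synchronously
        value = pool.apply(square, (3,))
        print(value)  # 9
        # issue many tasks asynchronously, returning an AsyncResult
        async_result = pool.map_async(square, range(5))
        # retrieve the results later
        print(async_result.get())  # [0, 1, 4, 9, 16]
```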

When using the process pool, we may need to cancel or stop all tasks if one task fails with an exception.

The process pool does not provide this capability.

How can we cancel all issued tasks if one task fails with an exception?

How to Cancel All Tasks if One Task Fails

We can cancel all tasks if one task fails using a shared multiprocessing.Event.

This approach requires a few pieces to be put in place.

Firstly, a shared event must be prepared in the main process.

We cannot simply create a multiprocessing.Event object and pass it to the worker processes. This will result in an exception, such as:

RuntimeError: Condition objects should only be shared between processes through inheritance

The message mentions a Condition because a multiprocessing.Event is implemented internally using a Condition, and it is this internal object that cannot be transferred between processes. Instead, we must create the event using a multiprocessing.Manager.
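To see why, note that a plain multiprocessing.Event cannot be pickled, which is what happens when it is sent to another process. A minimal sketch reproducing the error directly with the pickle module (an illustration of the mechanism, not part of the worked example):

```python
import pickle
from multiprocessing import Event

# a plain multiprocessing.Event created directly
event = Event()
try:
    # pickling is what happens when the event is sent to a worker
    pickle.dumps(event)
except RuntimeError as e:
    # the same RuntimeError as shown above
    print(e)
```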

A Manager allows us to create a single Event centralized in a process, then share a proxy object for the event that interacts with the single centralized event object in the manager behind the scenes.

First, we can create a Manager instance. This can be achieved using the context manager interface to ensure the manager is closed once we are finished using it.

...
# create a manager
with Manager() as manager:
    # ...

We can then create the shared multiprocessing.Event instance using the manager.

...
# create a shared event
shared_event = manager.Event()

The Event instance can then be shared with each worker process.

This can be achieved by defining a custom initialization function that takes the shared Event instance as an argument and stores it in a global variable for the worker process.

# initialize worker processes
def init_worker(shared_event):
    # store the event as a global in the worker process
    global event
    event = shared_event

We can then create a multiprocessing.pool.Pool instance and configure it to call the custom worker child process initialization function and pass the shared event object as an argument.

...
# create the process pool
pool = Pool(initializer=init_worker, initargs=(shared_event,))

Finally, each custom task function executed by the process pool can check or set the status of the event.

A task that fails can set the event via the set() function.

For example:

...
# cancel all tasks
event.set()

All tasks that can be canceled can then periodically check whether the event has been set, such as on each iteration of a loop.

For example:

...
# check for stop
if event.is_set():
    return

Now that we know how to cancel all tasks if one task fails in the process pool, we can look at a worked example.

Example of Canceling All Tasks If One Task Fails

We can explore how to cancel all tasks in the process pool if one task fails.

In this example, we will define a custom task function that loops some number of times. One of the issued tasks will fail, report a message, and signal all other tasks to stop. Each task will check on each iteration whether it should cancel and, once canceled, stop as soon as possible.

Firstly, we can define a custom function used to initialize each child worker process.

The initialization function will take a shared event object as an argument. It will then define a global variable and store the provided shared event object in the global variable.

Importantly, the shared event stored in the global variable will be accessible to every task executed by that worker process, and because all workers are initialized the same way, to every task in the process pool.

The init_worker() function below implements this.

# initialize worker processes
def init_worker(shared_event):
    # store the event as a global in the worker process
    global event
    event = shared_event

Next, we can define a custom function for executing tasks in the process pool.

The task function will take a unique integer argument to identify the task.

# task executed in a worker process
def task(identifier):
    # ...

The task will loop four times. On each iteration, it will first check the status of the shared event to see whether the task should be canceled. If so, it will report a message and return from the task function immediately. Otherwise, it will block for a random fraction of a second to simulate computational effort.

...
# loop for a while
for i in range(4):
    # check for stop
    if event.is_set():
        print(f'Task {identifier} stopped', flush=True)
        return
    # block for a moment
    sleep(random())

One of the tasks will conditionally fail. Specifically, after sleeping, if the loop counter is above 2 (i.e. the final iteration) and the task identifier is 5, the task will report a message and signal all other tasks to cancel via the shared event.

...
# conditionally fail
if i > 2 and identifier == 5:
    print('Something bad happened!', flush=True)
    # cancel all tasks
    event.set()
    return

Finally, if the task completes normally, it will report a message.

...
# report done
print(f'Task {identifier} done', flush=True)

Tying this together, the complete task() function is listed below.

# task executed in a worker process
def task(identifier):
    # loop for a while
    for i in range(4):
        # check for stop
        if event.is_set():
            print(f'Task {identifier} stopped', flush=True)
            return
        # block for a moment
        sleep(random())
        # conditionally fail
        if i > 2 and identifier == 5:
            print('Something bad happened!', flush=True)
            # cancel all tasks
            event.set()
            return
    # report done
    print(f'Task {identifier} done', flush=True)

Next, the main process will create a manager used to create and manage centralized objects, in this case a centralized Event object.

...
# create a manager
with Manager() as manager:
    # ...

We then use the manager to create a shared event that returns a proxy object that can be shared among all child worker processes safely and used to interact with the centralized event object.

...
# create a shared event
shared_event = manager.Event()

We can then create a process pool. We will configure it to initialize each child worker process with our custom initialization function and pass the shared event as an argument.

The process pool will be created using the context manager interface to ensure the pool is closed automatically for us once we are finished with it.

...
# create and configure the process pool
with Pool(initializer=init_worker, initargs=(shared_event,)) as pool:
    # ...

We will then issue 10 tasks to the process pool, calling our custom task() function with numbers 0 through 9. We will use the map_async() function to issue the tasks asynchronously, which will return an AsyncResult object.

...
# issue tasks into the process pool
result = pool.map_async(task, range(10))

Finally, the main process will wait on the AsyncResult object for the issued tasks to complete, successfully or otherwise.

...
# wait for tasks to complete
result.wait()

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of canceling all tasks if one task fails with an exception
from random import random
from time import sleep
from multiprocessing.pool import Pool
from multiprocessing import Manager

# initialize worker processes
def init_worker(shared_event):
    # store the event as a global in the worker process
    global event
    event = shared_event

# task executed in a worker process
def task(identifier):
    # loop for a while
    for i in range(4):
        # check for stop
        if event.is_set():
            print(f'Task {identifier} stopped', flush=True)
            return
        # block for a moment
        sleep(random())
        # conditionally fail
        if i > 2 and identifier == 5:
            print('Something bad happened!', flush=True)
            # cancel all tasks
            event.set()
            return
    # report done
    print(f'Task {identifier} done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create a manager
    with Manager() as manager:
        # create a shared event
        shared_event = manager.Event()
        # create and configure the process pool
        with Pool(initializer=init_worker, initargs=(shared_event,)) as pool:
            # issue tasks into the process pool
            result = pool.map_async(task, range(10))
            # wait for tasks to complete
            result.wait()
        # process pool is closed automatically

Running the example first creates the manager. The manager is then used to create the shared event, returning a proxy object that can be shared safely with the child worker processes.

The main process then creates the process pool and configures it to initialize each child worker process.

The pool is started and each child worker process is initialized with the custom initialization function. Each child worker process stores the proxy for the shared event in a global variable, making it accessible to all tasks executed by that process, and because every worker is initialized the same way, to all tasks in the pool.

The main process then issues ten tasks to the process pool and then blocks until the tasks are done.

Tasks begin running in the process pool. Each task loops, checking the status of the shared event each iteration, then blocking for a random fraction of a second. Some tasks finish and report a message.

One task conditionally fails. It reports a message, then signals all other tasks to cancel by setting the shared event.

The running tasks check the status of the event on their next iteration, notice that it is set, and return immediately, canceling themselves.

All tasks are finished and the main process continues on and closes the process pool.

Note, the specific tasks that finish and are canceled will differ each time the code is run given the use of random numbers.

Task 3 done
Task 4 done
Task 0 done
Something bad happened!
Task 8 stopped
Task 6 done
Task 2 stopped
Task 1 done
Task 9 stopped
Task 7 done

Takeaways

You now know how to cancel all tasks in the Python process pool if one task fails.



If you enjoyed this tutorial, you will love my book: Python Multiprocessing Pool Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.