ThreadPool Stop All Tasks If One Task Fails in Python
You can cancel all tasks in the ThreadPool if one task fails using a shared Event object.
In this tutorial, you will discover how to cancel all tasks in the Python ThreadPool if one task fails.
Let's get started.
Need To Stop All Tasks on Exception
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
-- multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading or writing from sockets or files.
A ThreadPool can be configured when it is created, which will prepare the new threads.
We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using methods such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the methods such as apply_async() and map_async().
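As a quick illustration of the difference between the blocking and asynchronous methods, here is a minimal sketch. The square() function and the run_examples() wrapper are hypothetical names used only for this illustration.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task used only for illustration
def square(value):
    return value * value

def run_examples():
    with ThreadPool(4) as pool:
        # apply() blocks until the single task returns
        one = pool.apply(square, (3,))
        # map() blocks until every item is processed, results keep input order
        many = pool.map(square, range(5))
        # the async versions return an AsyncResult immediately
        async_one = pool.apply_async(square, (4,))
        async_many = pool.map_async(square, range(5))
        # get() blocks until the result is available
        return one, many, async_one.get(), async_many.get()

if __name__ == '__main__':
    print(run_examples())
```

The asynchronous versions are the ones that matter for this tutorial, because the main thread stays free while the tasks run.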
When using the ThreadPool, we may need to cancel or stop all tasks if one task fails with an exception.
The ThreadPool does not provide this capability.
How can we cancel all issued tasks if one task fails with an exception?
How to Cancel All Tasks if One Task Fails
We can cancel all tasks if one task fails using a shared Event.
This approach assumes a few things, such as:
- Tasks know when they have failed, e.g. catch an exception or check state.
- Tasks are able to check the status of a shared event periodically, e.g. each iteration of a loop.
Firstly, a shared event must be prepared in the main thread.
...
# create a shared event
shared_event = threading.Event()
The Event instance can then be shared with each worker thread.
It could be passed via an argument to each task.
For example:
# task executed in a worker thread
def task(event):
    # ...
Tasks can then interact with the shared Event.
A task that fails can set the event via the set() method.
For example:
...
# cancel all tasks
event.set()
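In practice, a task often learns that it has failed by catching an exception. The following is a minimal sketch of that pattern, assuming a hypothetical do_work() helper that stands in for real work and raises for one input.

```python
from threading import Event

# hypothetical helper that may raise, standing in for real work
def do_work(identifier):
    if identifier == 5:
        raise ValueError('bad input')
    return identifier * 2

# task executed in a worker thread
def task(event, identifier):
    try:
        return do_work(identifier)
    except Exception:
        # signal all other tasks to stop
        event.set()
        # re-raise so the caller can still see the original error
        raise
```

Re-raising is a design choice in this sketch: it keeps the failure visible to whoever retrieves the result, rather than silently swallowing it.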
All other tasks can then periodically check whether the event has been set, such as within each iteration of a loop.
For example:
...
# check for stop
if event.is_set():
    return
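If a task blocks between checks, one possible variation is to block on the event itself using Event.wait() with a timeout, so the task wakes as soon as the event is set rather than after a full sleep. This is a sketch under assumptions: the steps and delay parameters and the returned status strings are hypothetical and are not used elsewhere in this tutorial.

```python
from threading import Event

# task executed in a worker thread
def task(event, steps=4, delay=0.05):
    for _ in range(steps):
        # wait() returns True as soon as the event is set,
        # or False once the timeout expires
        if event.wait(timeout=delay):
            return 'stopped'
    return 'done'
```

A task written this way responds to cancellation almost immediately, even when the delay is long.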
Now that we know how to cancel all tasks if one task fails in the ThreadPool, we can look at a worked example.
Example of Canceling All Tasks If One Task Fails
We can explore how to cancel all tasks in the ThreadPool if one task fails.
In this example, we will define a custom task function that loops a number of times. One of the issued tasks will fail, report a message, and signal all other tasks to stop. Each task checks on every iteration whether it should cancel and, if so, stops as soon as possible.
Firstly, we can define a target task function that will take the shared event and a unique integer argument to identify the task.
# task executed in a worker thread
def task(event, identifier):
    # ...
The task will loop four times. Each iteration, it will first check the status of the shared event to see if the task should be canceled or not. If so, it will report a message and return from the task function immediately. It then blocks for a random fraction of a second to simulate computational effort.
...
# loop for a while
for i in range(4):
    # check for stop
    if event.is_set():
        print(f'Task {identifier} stopped')
        return
    # block for a moment
    sleep(random())
One of the tasks will conditionally fail. Specifically, the task with identifier 5 fails once the loop index is above 2, which is its final iteration. When this happens, it reports a message and signals all tasks to cancel via the shared event.
...
# conditionally fail
if i > 2 and identifier == 5:
    print('Something bad happened!')
    # cancel all tasks
    event.set()
    return
Finally, if the task completes normally, it will report a message.
...
# report done
print(f'Task {identifier} done')
Tying this together, the complete task() function is listed below.
# task executed in a worker thread
def task(event, identifier):
    # loop for a while
    for i in range(4):
        # check for stop
        if event.is_set():
            print(f'Task {identifier} stopped')
            return
        # block for a moment
        sleep(random())
        # conditionally fail
        if i > 2 and identifier == 5:
            print('Something bad happened!')
            # cancel all tasks
            event.set()
            return
    # report done
    print(f'Task {identifier} done')
Next, the main thread will create the shared Event.
...
# create a shared event
shared_event = Event()
We can then create a ThreadPool.
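The ThreadPool will be created using the context manager interface so that the pool is shut down automatically once we are finished with it. On exit, the context manager calls terminate(), so we must wait for the issued tasks to finish inside the block.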
...
# create and configure the thread pool
with ThreadPool() as pool:
    # ...
We will then issue 10 tasks to the ThreadPool, calling our custom task() function with the shared event and the numbers 0 through 9. We will use the starmap_async() method to issue the tasks asynchronously, which returns an AsyncResult object.
We use the starmap_async() method because it supports a target function that takes more than one argument, as opposed to the map_async() method that does not.
...
# prepare arguments for tasks
args = [(shared_event,i) for i in range(10)]
# issue tasks into the thread pool
result = pool.starmap_async(task, args)
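To make the difference between the two methods concrete, here is a small sketch. The label() function is hypothetical, and binding the first argument with functools.partial is one possible workaround for map_async(), not something the example in this tutorial relies on.

```python
from functools import partial
from multiprocessing.pool import ThreadPool

# hypothetical two-argument task used only for illustration
def label(prefix, number):
    return f'{prefix}-{number}'

def run():
    with ThreadPool(2) as pool:
        # starmap_async() unpacks each tuple into positional arguments
        args = [('job', i) for i in range(3)]
        starred = pool.starmap_async(label, args).get()
        # map_async() passes one argument per call,
        # so the first argument is bound ahead of time with partial()
        mapped = pool.map_async(partial(label, 'job'), range(3)).get()
    return starred, mapped

if __name__ == '__main__':
    print(run())
```

Both calls produce the same list of results; starmap_async() simply avoids the extra wrapping when every task needs several arguments.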
Finally, the main thread will wait on the AsyncResult object for the issued tasks to complete, successfully or otherwise.
...
# wait for tasks to complete
result.wait()
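Note that wait() only blocks; it does not report whether the tasks succeeded. If tasks raise exceptions rather than returning early, the AsyncResult can be inspected afterwards. The following sketch assumes a hypothetical failing_task() function that raises for one input.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task that raises for one input
def failing_task(number):
    if number == 2:
        raise ValueError(f'task {number} failed')
    return number

def run():
    with ThreadPool(2) as pool:
        result = pool.map_async(failing_task, range(4))
        # wait() blocks until all tasks finish but does not raise
        result.wait()
        # successful() is False if any task raised an exception
        ok = result.successful()
        try:
            # get() re-raises the exception from a failed task
            result.get()
            error = None
        except ValueError as exc:
            error = str(exc)
    return ok, error

if __name__ == '__main__':
    print(run())
```

In the example in this tutorial the failing task returns normally after setting the event, so wait() is sufficient there.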
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of canceling all tasks if one task fails with an exception
from random import random
from time import sleep
from threading import Event
from multiprocessing.pool import ThreadPool
# task executed in a worker thread
def task(event, identifier):
    # loop for a while
    for i in range(4):
        # check for stop
        if event.is_set():
            print(f'Task {identifier} stopped')
            return
        # block for a moment
        sleep(random())
        # conditionally fail
        if i > 2 and identifier == 5:
            print('Something bad happened!')
            # cancel all tasks
            event.set()
            return
    # report done
    print(f'Task {identifier} done')
# protect the entry point
if __name__ == '__main__':
    # create a shared event
    shared_event = Event()
    # create and configure the thread pool
    with ThreadPool() as pool:
        # prepare arguments for tasks
        args = [(shared_event,i) for i in range(10)]
        # issue tasks into the thread pool
        result = pool.starmap_async(task, args)
        # wait for tasks to complete
        result.wait()
    # thread pool is closed automatically
Running the example first creates the shared event.
The main thread then creates the ThreadPool with the default configuration.
The main thread then issues ten tasks to the ThreadPool and then blocks until the tasks are done.
Tasks begin running in the ThreadPool. Each task loops, checking the status of the shared event each iteration, then blocking for a random fraction of a second. Some tasks finish and report a message.
One task conditionally fails. It reports a message, then signals all other tasks to cancel by setting the shared event.
The running tasks check the status of the event on their next iteration. They notice that the shared event is set and then return immediately, canceling their tasks.
All tasks are finished and the main thread continues on and closes the ThreadPool.
Note that which tasks finish and which are canceled will differ each time the code is run, given the use of random numbers.
Task 4 done
Task 7 done
Task 6 done
Task 2 done
Task 1 done
Something bad happened!
Task 8 stopped
Task 0 done
Task 9 stopped
Task 3 done
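The output above reports each outcome only through printed messages. One possible variation, not part of the original example, is to have each task return a status string so the main thread can summarize the outcomes. This is a sketch under assumptions: the status strings and the run() wrapper are hypothetical, and the sleep is shortened to keep the sketch quick.

```python
from collections import Counter
from random import random
from time import sleep
from threading import Event
from multiprocessing.pool import ThreadPool

# task executed in a worker thread, returns a status instead of printing
def task(event, identifier):
    for i in range(4):
        # check for stop
        if event.is_set():
            return 'stopped'
        # shorter block than the tutorial (an assumption, to run quickly)
        sleep(random() * 0.01)
        # conditionally fail
        if i > 2 and identifier == 5:
            # cancel all tasks
            event.set()
            return 'failed'
    return 'done'

def run():
    event = Event()
    with ThreadPool() as pool:
        args = [(event, i) for i in range(10)]
        # get() blocks and returns the statuses in task order
        statuses = pool.starmap_async(task, args).get()
    return statuses

if __name__ == '__main__':
    print(Counter(run()))
```

As with the printed version, the split between finished and stopped tasks varies from run to run; only task 5 reliably reports a failure.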
Takeaways
You now know how to cancel all tasks in the Python ThreadPool if one task fails.