Last Updated on October 29, 2022
You can cancel all tasks in the ThreadPool if one task fails using a shared Event object.
In this tutorial, you will discover how to cancel all tasks in the Python ThreadPool if one task fails.
Let’s get started.
Need To Stop All Tasks on Exception
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
— multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module, it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading or writing from sockets or files.
A ThreadPool can be configured when it is created, for example by setting the number of worker threads. The worker threads are started at that point.
We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using methods such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the methods such as apply_async() and map_async().
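As a rough illustration of the difference between these methods, the sketch below issues the same work four ways. The square() function, the demo() helper, and the pool size of 4 are assumptions made only for this example.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task used only for illustration
def square(value):
    return value * value

def demo():
    with ThreadPool(4) as pool:
        # apply() blocks until the single task returns its result
        single = pool.apply(square, args=(3,))
        # map() blocks until every item is processed; results keep input order
        many = pool.map(square, range(5))
        # apply_async() returns an AsyncResult at once; get() blocks for the value
        async_single = pool.apply_async(square, args=(4,)).get()
        # map_async() also returns an AsyncResult; get() returns the list of results
        async_many = pool.map_async(square, range(5)).get()
    return single, many, async_single, async_many

if __name__ == '__main__':
    print(demo())
```

The asynchronous versions are the ones that matter for cancellation, because the main thread stays free while the tasks run.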
When using the ThreadPool, we may need to cancel or stop all tasks if one task fails with an exception.
The ThreadPool does not provide this capability.
How can we cancel all issued tasks if one task fails with an exception?
How to Cancel All Tasks if One Task Fails
We can cancel all tasks if one task fails using a shared Event.
This approach assumes a few things, such as:
- Tasks know when they have failed, e.g. catch an exception or check state.
- Tasks are able to check the status of a shared event periodically, e.g. each iteration of a loop.
Firstly, a shared event must be prepared in the main thread.
...
# create a shared event
shared_event = threading.Event()
The Event instance can then be shared with each worker thread.
It could be passed via an argument to each task.
For example:
# task executed in a worker thread
def task(event):
    # ...
Tasks can then interact with the shared Event.
A task that fails can set the event via the set() method.
For example:
...
# cancel all tasks
event.set()
Every task that supports cancellation can then check periodically whether the event has been set, such as at the start of each iteration of a loop.
For example:
...
# check for stop
if event.is_set():
    return
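Putting these pieces together, a task that fails with a real exception might look something like the sketch below. The do_work() helper, its failure condition, and the returned status strings are hypothetical and stand in for whatever work a real task performs.

```python
from threading import Event

# hypothetical unit of work that raises for one specific input
def do_work(identifier, step):
    if identifier == 2 and step == 1:
        raise ValueError(f'task {identifier} failed at step {step}')

# task that checks the shared event and sets it on failure
def task(event, identifier):
    for step in range(3):
        # check for stop before doing any more work
        if event.is_set():
            return 'stopped'
        try:
            do_work(identifier, step)
        except Exception:
            # signal all other tasks to cancel
            event.set()
            return 'failed'
    return 'done'

if __name__ == '__main__':
    shared_event = Event()
    print(task(shared_event, 2))
    print(task(shared_event, 0))
```

Catching the exception inside the task is a design choice: it gives the failing task a chance to set the event before it exits.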
Now that we know how to cancel all tasks if one task fails in the ThreadPool, we can look at a worked example.
Example of Canceling All Tasks If One Task Fails
We can explore how to cancel all tasks in the ThreadPool if one task fails.
In this example, we will define a custom task function that loops a number of times. One of the issued tasks will fail, report a message, and signal all other tasks to stop. Each task will check on each iteration whether it should cancel and, once canceled, will stop as soon as possible.
Firstly, we can define a target task function that will take the shared event and a unique integer argument to identify the task.
# task executed in a worker thread
def task(event, identifier):
    # ...
The task will loop four times. Each iteration, it will first check the status of the shared event to see if the task should be canceled or not. If so, it will report a message and return from the task function immediately. It then blocks for a random fraction of a second to simulate computational effort.
...
# loop for a while
for i in range(4):
    # check for stop
    if event.is_set():
        print(f'Task {identifier} stopped')
        return
    # block for a moment
    sleep(random())
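One possible refinement, which is not used in the rest of this tutorial, is to block on the event itself rather than calling sleep(). Event.wait() accepts a timeout and returns True as soon as the event is set, so a blocked task can react without finishing its full delay. The sketch below assumes a hypothetical delay argument and hypothetical status strings.

```python
from threading import Event

# variation of the task that uses the event as an interruptible sleep
def task(event, identifier, delay):
    for i in range(4):
        # wait() returns True if the event is set, or False once the timeout expires
        if event.wait(timeout=delay):
            return 'stopped'
    return 'done'

if __name__ == '__main__':
    shared_event = Event()
    # event is not set, so the task runs to completion
    print(task(shared_event, 0, 0.01))
    shared_event.set()
    # event is set, so the task returns without waiting out the delay
    print(task(shared_event, 1, 5.0))
```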
One of the tasks will conditionally fail. Specifically, the task with identifier 5 fails when the loop index is greater than 2, which happens on its final iteration. When it fails, it reports a message and signals all tasks to cancel via the shared event.
...
# conditionally fail
if i > 2 and identifier == 5:
    print('Something bad happened!')
    # cancel all tasks
    event.set()
    return
Finally, if the task completes normally, it will report a message.
...
# report done
print(f'Task {identifier} done')
Tying this together, the complete task() function is listed below.
# task executed in a worker thread
def task(event, identifier):
    # loop for a while
    for i in range(4):
        # check for stop
        if event.is_set():
            print(f'Task {identifier} stopped')
            return
        # block for a moment
        sleep(random())
        # conditionally fail
        if i > 2 and identifier == 5:
            print('Something bad happened!')
            # cancel all tasks
            event.set()
            return
    # report done
    print(f'Task {identifier} done')
Next, the main thread will create the shared Event.
...
# create a shared event
shared_event = Event()
We can then create a ThreadPool.
The ThreadPool will be created using the context manager interface so that the pool is shut down automatically once we are finished with it. Exiting the with block calls terminate() on the pool.
...
# create and configure the thread pool
with ThreadPool() as pool:
    # ...
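For reference, the with statement behaves roughly like the try/finally sketch below, because exiting the block calls terminate() on the pool. The helper name run_with_explicit_shutdown() is made up for illustration.

```python
from multiprocessing.pool import ThreadPool

# hypothetical helper showing the manual equivalent of the context manager
def run_with_explicit_shutdown(values):
    pool = ThreadPool(2)
    try:
        # issue the work and collect results while the pool is running
        results = pool.map(abs, values)
    finally:
        # the context manager performs this call on exit from the with block
        pool.terminate()
    return results

if __name__ == '__main__':
    print(run_with_explicit_shutdown([-1, -2, 3]))
```

Because terminate() does not wait for outstanding work, results should be collected before leaving the block.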
We will then issue 10 tasks to the ThreadPool, calling our custom task() function with the shared event and the numbers 0 through 9. We will use the starmap_async() method to issue the tasks asynchronously, which will return an AsyncResult object.
We use starmap_async() because it supports a target function that takes more than one argument, whereas map_async() passes only a single argument to the target function.
...
# prepare arguments for tasks
args = [(shared_event, i) for i in range(10)]
# issue tasks into the thread pool
result = pool.starmap_async(task, args)
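The difference between the two methods can be seen in the small sketch below. The add() function and the argument values are assumptions chosen only to make the unpacking visible.

```python
from multiprocessing.pool import ThreadPool

# hypothetical two-argument task
def add(a, b):
    return a + b

def demo():
    args = [(i, 10) for i in range(3)]
    with ThreadPool(2) as pool:
        # starmap_async() unpacks each tuple into positional arguments: add(0, 10), ...
        starred = pool.starmap_async(add, args).get()
        # map_async() passes each tuple as one argument, so a wrapper must unpack it
        mapped = pool.map_async(lambda pair: add(*pair), args).get()
    return starred, mapped

if __name__ == '__main__':
    print(demo())
```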
Finally, the main thread will wait on the AsyncResult object for the issued tasks to complete, successfully or otherwise.
...
# wait for tasks to complete
result.wait()
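Note that wait() only blocks; it does not report failures. If a task raises an exception instead of returning, the main thread can find out using successful() or get() on the AsyncResult. The sketch below assumes a hypothetical failing_task() function that raises for one input.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task that raises for one specific input
def failing_task(value):
    if value == 1:
        raise ValueError('task failed')
    return value

def demo():
    with ThreadPool(2) as pool:
        result = pool.map_async(failing_task, range(3))
        # wait() blocks until all tasks are finished but never raises
        result.wait()
        finished = result.ready()
        # successful() reports whether the tasks completed without an exception
        succeeded = result.successful()
        try:
            # get() re-raises the exception raised in the task
            result.get()
            raised = None
        except ValueError as e:
            raised = str(e)
    return finished, succeeded, raised

if __name__ == '__main__':
    print(demo())
```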
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of canceling all tasks if one task fails with an exception
from random import random
from time import sleep
from threading import Event
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(event, identifier):
    # loop for a while
    for i in range(4):
        # check for stop
        if event.is_set():
            print(f'Task {identifier} stopped')
            return
        # block for a moment
        sleep(random())
        # conditionally fail
        if i > 2 and identifier == 5:
            print('Something bad happened!')
            # cancel all tasks
            event.set()
            return
    # report done
    print(f'Task {identifier} done')

# protect the entry point
if __name__ == '__main__':
    # create a shared event
    shared_event = Event()
    # create and configure the thread pool
    with ThreadPool() as pool:
        # prepare arguments for tasks
        args = [(shared_event, i) for i in range(10)]
        # issue tasks into the thread pool
        result = pool.starmap_async(task, args)
        # wait for tasks to complete
        result.wait()
    # thread pool is terminated automatically on exit from the with block
Running the example first creates the shared event.
The main thread then creates the ThreadPool with the default configuration.
The main thread then issues ten tasks to the ThreadPool and then blocks until the tasks are done.
Tasks begin running in the ThreadPool. Each task loops, checking the status of the shared event each iteration, then blocking for a random fraction of a second. Some tasks finish and report a message.
One task conditionally fails. It reports a message, then signals all other tasks to cancel by setting the shared event.
The running tasks check the status of the event on their next iteration. They notice that the shared event is set and then return immediately, canceling their tasks.
Once all tasks have finished, the main thread continues on and exits the context manager, which terminates the ThreadPool.
Note that the specific tasks that finish or are canceled will differ each time the code is run, given the use of random numbers.
Task 4 done
Task 7 done
Task 6 done
Task 2 done
Task 1 done
Something bad happened!
Task 8 stopped
Task 0 done
Task 9 stopped
Task 3 done
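If the main thread needs to know how each task ended rather than reading printed messages, one option is to have each task return a status and collect the statuses with get(). The sketch below is a variation on the example above, not the original code: the status strings, the run_tasks() helper, and the shortened sleep are assumptions.

```python
from collections import Counter
from random import random
from time import sleep
from threading import Event
from multiprocessing.pool import ThreadPool

# variation of the task that returns a status instead of printing
def task(event, identifier):
    for i in range(4):
        # check for stop
        if event.is_set():
            return 'stopped'
        # block for a moment (shortened here so the sketch runs quickly)
        sleep(random() * 0.1)
        # conditionally fail and cancel all tasks
        if i > 2 and identifier == 5:
            event.set()
            return 'failed'
    return 'done'

# issue the tasks and return one status per task, in task order
def run_tasks():
    shared_event = Event()
    with ThreadPool() as pool:
        args = [(shared_event, i) for i in range(10)]
        # get() blocks until all tasks finish and returns their return values
        statuses = pool.starmap_async(task, args).get()
    return statuses

if __name__ == '__main__':
    # tally how many tasks finished, stopped, or failed
    print(Counter(run_tasks()))
```

As with the original example, the split between finished and stopped tasks varies from run to run.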
Takeaways
You now know how to cancel all tasks in the Python ThreadPool if one task fails.