Last Updated on October 29, 2022
You can safely stop tasks in the ThreadPool using a shared Event object.
In this tutorial, you will discover how to safely stop tasks in the ThreadPool in Python.
Let’s get started.
Need to Safely Stop All Tasks
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
— multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module, it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading from or writing to sockets or files.
A ThreadPool is configured when it is created, at which point its worker threads are started.
We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using methods such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the methods such as apply_async() and map_async().
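As a rough illustration of the difference between the blocking and asynchronous methods mentioned above, here is a minimal sketch. The square() function, the demo_issue_tasks() helper, and the pool size of 4 are assumptions made for this example only.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task used only for illustration
def square(value):
    return value * value

def demo_issue_tasks():
    # pool with 4 worker threads (an arbitrary choice for this sketch)
    with ThreadPool(4) as pool:
        # apply() issues one task and blocks until its result is available
        single = pool.apply(square, args=(3,))
        # map() applies the function to each item and blocks for all results
        mapped = pool.map(square, range(5))
        # apply_async() returns an AsyncResult immediately
        async_single = pool.apply_async(square, args=(4,))
        # map_async() does the same for an iterable of items
        async_mapped = pool.map_async(square, range(5))
        # get() blocks until the result is ready
        return single, mapped, async_single.get(), async_mapped.get()

if __name__ == '__main__':
    print(demo_issue_tasks())
```

Note that the results are retrieved while the pool is still open, before the with block exits.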
When using the ThreadPool, we often need to safely stop all running tasks.
This may be for many reasons, such as:
- A user requests to close the application.
- An error or fault that negates the tasks.
- A problem accessing a required resource.
How can we safely stop all tasks in the ThreadPool?
How To Safely Stop All Tasks in the ThreadPool
The ThreadPool does not provide a mechanism to safely stop all currently running tasks.
Instead, we can develop a mechanism to safely stop all running tasks in a ThreadPool using an Event object.
Firstly, an Event object must be created and shared among all running tasks.
For example:
    ...
    # create a shared event
    event = Event()
Recall that an event provides a thread-safe boolean variable.
It is created in the False (not set) state. Its status can be checked via the is_set() method, and it can be made True via the set() method.
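The states of an event can be seen in a small sketch like the one below. The demo_event_states() helper is a name invented for this illustration, and clear(), which resets the event to False, is part of the threading.Event API but is not otherwise used in this tutorial.

```python
from threading import Event

def demo_event_states():
    # a new event starts in the False (not set) state
    event = Event()
    states = [event.is_set()]
    # set() makes the event True for every thread that checks it
    event.set()
    states.append(event.is_set())
    # clear() resets the event back to False
    event.clear()
    states.append(event.is_set())
    return states

if __name__ == '__main__':
    print(demo_event_states())
```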
We can create a shared Event object in the main thread, then pass it as an argument to any task that needs to be stopped.
For example:
    # function executed in worker threads
    def task(event, ...):
        # ...
The custom function executing the task can check the status of the Event object periodically, such as each iteration of a loop.
If the Event is set, the target task function can then choose to stop, closing any resources if necessary.
    # function executed in worker threads
    def task(event, ...):
        # ...
        while True:
            # ...
            if event.is_set():
                return
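Closing resources when a task stops might look something like the following sketch. It assumes a temporary file stands in for whatever resource the task holds. The run_demo() helper, the pool size, and the short sleep intervals are illustrative choices, not part of the tutorial's example.

```python
import tempfile
from time import sleep
from threading import Event
from multiprocessing.pool import ThreadPool

# hypothetical task that holds a resource (a temporary file) while it works
def task(identifier, event):
    handle = tempfile.TemporaryFile(mode='w+')
    try:
        while True:
            handle.write(f'task {identifier} working\n')
            sleep(0.05)
            # stop as soon as the shared event is set
            if event.is_set():
                break
    finally:
        # runs whether the loop ends normally or with an exception
        handle.close()
    return handle.closed

def run_demo():
    event = Event()
    with ThreadPool(2) as pool:
        items = [(i, event) for i in range(2)]
        result = pool.starmap_async(task, items)
        # let the tasks run briefly, then request that they stop
        sleep(0.2)
        event.set()
        return result.get()

if __name__ == '__main__':
    print(run_demo())
```

Using try/finally means the resource is released on every exit path, including the early stop requested via the event.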
There are some limitations to this approach.
It requires that you have complete control over the target task function or functions executed in the ThreadPool.
This control is required so that the functions can be changed to take the event as an argument and then check the status of the event periodically.
- The target task function must take the Event as an argument.
- The target task function must check the status of the Event periodically.
The first limitation can be removed by making the shared Event a global variable, which all worker threads can access directly. Nevertheless, every task that you want to be able to stop must still check the status of the event periodically.
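A minimal sketch of the global variable approach is shown below, assuming a module-level event named stop_event. That name, the run_demo() helper, and the timing values are assumptions made for this illustration.

```python
from time import sleep
from threading import Event
from multiprocessing.pool import ThreadPool

# module-level event, visible to every worker thread in this process
stop_event = Event()

# the task no longer takes the event as an argument
def task(identifier):
    completed = 0
    for _ in range(50):
        sleep(0.05)
        completed += 1
        # check the global event on each iteration
        if stop_event.is_set():
            break
    # number of iterations completed before stopping
    return completed

def run_demo():
    # make sure the event starts in the False state
    stop_event.clear()
    with ThreadPool(4) as pool:
        result = pool.map_async(task, range(4))
        sleep(0.2)
        stop_event.set()
        return result.get()

if __name__ == '__main__':
    print(run_demo())
```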
Periodic checks can be a problem if a task performs blocking operations, such as reading from or writing to files or sockets, because the task cannot check the event until the blocking call returns.
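One possible way to reduce the delay caused by blocking calls is to give them a timeout. As a sketch of the idea, the task below replaces a plain sleep with the event's own wait() method, which returns True as soon as the event is set or False once the timeout expires. The run_demo() helper and the timing values are assumptions made for this illustration.

```python
from time import sleep, perf_counter
from threading import Event
from multiprocessing.pool import ThreadPool

def task(identifier, event):
    for _ in range(10):
        # block for up to one second, but wake immediately if the event is set
        if event.wait(timeout=1.0):
            return f'Task {identifier} stopped early'
    return f'Task {identifier} finished'

def run_demo():
    event = Event()
    with ThreadPool(2) as pool:
        items = [(i, event) for i in range(2)]
        result = pool.starmap_async(task, items)
        sleep(0.2)
        # measure how long the tasks take to respond to the stop request
        start = perf_counter()
        event.set()
        messages = result.get()
        elapsed = perf_counter() - start
    return messages, elapsed

if __name__ == '__main__':
    print(run_demo())
```

With a plain sleep(1), each task could take up to a second to notice the request. With wait(), the tasks should respond almost immediately.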
Now that we know how to safely stop tasks in the ThreadPool, let’s look at a worked example.
Example of Safely Stopping All Tasks in the ThreadPool
We can explore how to stop tasks in the ThreadPool safely.
In this example, we will define a task function that takes a shared event as an argument, loops a number of times, and sleeps on each iteration. It will check the status of the shared event each iteration and stop the task if the event is set.
The main thread will issue a number of tasks to the ThreadPool, wait a moment, request that all issued tasks safely stop, then wait for the tasks to stop.
Importantly, the ThreadPool is left in a condition where it can be used again to issue more tasks.
Firstly, we can define a custom function to execute tasks in the ThreadPool.
The task function will take a unique integer identifier and a shared event object as arguments. It will report a message that it is starting, then loop 10 times. In each iteration, it will sleep for one second to simulate work, then check the status of the event. If the event is set, it will report a message and break out of the loop. Finally, a message is reported right before the task returns.
The task() function below implements this.
    # task executed in a worker thread
    def task(identifier, event):
        print(f'Task {identifier} running')
        # run for a long time
        for i in range(10):
            # block for a moment
            sleep(1)
            # check if the task should stop
            if event.is_set():
                print(f'Task {identifier} stopping...')
                # stop the task
                break
        # report all done
        print(f'Task {identifier} Done')
Next, in the main thread we can create the shared event.
    ...
    # create the shared event
    event = Event()
Next, we will create a ThreadPool with a default configuration.
We will use the context manager interface to ensure that the pool is shut down automatically once we are finished using it. Exiting the block calls terminate() on the pool.
    ...
    # create and configure the thread pool
    with ThreadPool() as pool:
        # ...
We will then issue 4 tasks to the ThreadPool.
In this case, we will use the starmap_async() method, which supports multiple arguments per task and issues the tasks asynchronously, allowing the main thread to continue on.
Firstly, we will create a list of arguments, with one tuple per task containing a unique integer and the shared event object. We then issue the tasks and receive an AsyncResult as a handle on the issued tasks.
    ...
    # prepare arguments
    items = [(i, event) for i in range(4)]
    # issue tasks asynchronously
    result = pool.starmap_async(task, items)
Next, the main thread will block for a moment to allow the tasks to get started and run for a while.
    ...
    # wait a moment
    sleep(2)
The main thread will then request that all issued tasks safely stop by setting the shared event.
    ...
    # safely stop the issued tasks
    print('Safely stopping all tasks')
    event.set()
The main thread will then wait for all tasks to stop.
There are many ways to do this, although in this case we will simply wait on the single AsyncResult object returned as a handle on the single batch of tasks issued to the pool.
    ...
    # wait for all tasks to stop
    result.wait()
    print('All stopped')
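As one alternative way of waiting, shown here only as a sketch, the main thread could close the pool and join it, which blocks until every issued task has finished. The run_demo() helper, the pool size, and the timing values are assumptions made for this illustration. Unlike waiting on the AsyncResult, this approach means the pool cannot accept further tasks afterwards.

```python
from time import sleep
from threading import Event
from multiprocessing.pool import ThreadPool

def task(identifier, event):
    # loop until asked to stop
    while not event.is_set():
        sleep(0.05)
    return identifier

def run_demo():
    event = Event()
    pool = ThreadPool(4)
    # issue tasks one at a time, keeping a handle on each
    results = [pool.apply_async(task, args=(i, event)) for i in range(4)]
    sleep(0.2)
    # request that all tasks stop
    event.set()
    # prevent new tasks from being issued, then wait for running tasks
    pool.close()
    pool.join()
    # all tasks are finished, so get() returns without blocking
    return [r.get() for r in results]

if __name__ == '__main__':
    print(run_demo())
```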
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of safely stopping all tasks in the thread pool
    from time import sleep
    from threading import Event
    from multiprocessing.pool import ThreadPool

    # task executed in a worker thread
    def task(identifier, event):
        print(f'Task {identifier} running')
        # run for a long time
        for i in range(10):
            # block for a moment
            sleep(1)
            # check if the task should stop
            if event.is_set():
                print(f'Task {identifier} stopping...')
                # stop the task
                break
        # report all done
        print(f'Task {identifier} Done')

    # protect the entry point
    if __name__ == '__main__':
        # create the shared event
        event = Event()
        # create and configure the thread pool
        with ThreadPool() as pool:
            # prepare arguments
            items = [(i, event) for i in range(4)]
            # issue tasks asynchronously
            result = pool.starmap_async(task, items)
            # wait a moment
            sleep(2)
            # safely stop the issued tasks
            print('Safely stopping all tasks')
            event.set()
            # wait for all tasks to stop
            result.wait()
            print('All stopped')
Running the example first creates the shared Event object.
The ThreadPool is then created using the default configuration.
The arguments for the tasks are prepared, then the four tasks are issued asynchronously to the ThreadPool.
The main thread then blocks for a moment.
Each task starts, reporting a message and then starting its main loop.
The main thread wakes up and sets the event, requesting all issued tasks to stop. It then waits on the AsyncResult for the issued tasks to stop.
Each task checks the status of the event in each iteration of its main loop. They notice that the event is set, break their main loop, report a final message then return, stopping the task safely.
All tasks stop, allowing the main thread to continue on, ending the application.
This highlights how we can safely stop all tasks in the ThreadPool in a controlled manner.
    Task 0 running
    Task 1 running
    Task 2 running
    Task 3 running
    Safely stopping all tasks
    Task 0 stopping...
    Task 2 stopping...
    Task 2 DoneTask 0 Done
    Task 1 stopping...
    Task 1 Done
    Task 3 stopping...
    Task 3 Done
    All stopped
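The earlier point that the ThreadPool remains usable after the tasks stop can be sketched as follows. This variation is an assumption-laden illustration rather than part of the example above: the task returns a string instead of printing, the sleeps are shortened, and the event is reset with clear() so that the second batch of tasks does not stop immediately.

```python
from time import sleep
from threading import Event
from multiprocessing.pool import ThreadPool

def task(identifier, event):
    for _ in range(10):
        sleep(0.05)
        # check if the task should stop
        if event.is_set():
            return f'Task {identifier} stopped'
    return f'Task {identifier} completed'

def run_demo():
    event = Event()
    with ThreadPool(4) as pool:
        items = [(i, event) for i in range(4)]
        # first batch: stopped early via the event
        first = pool.starmap_async(task, items)
        sleep(0.15)
        event.set()
        first_results = first.get()
        # reset the event, otherwise new tasks would stop straight away
        event.clear()
        # second batch: the same pool runs the tasks to completion
        second_results = pool.starmap(task, items)
    return first_results, second_results

if __name__ == '__main__':
    for batch in run_demo():
        print(batch)
```

An alternative to calling clear() would be to create a fresh Event for each batch of tasks.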
Takeaways
You now know how to safely stop tasks in the ThreadPool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.