Last Updated on September 12, 2022
You can stop running tasks in a ThreadPoolExecutor using a threading.Event.
In this tutorial you will discover how you can update your tasks running in a ThreadPoolExecutor so that they stop running whenever you want.
Let’s get started.
cancel() Won’t Stop Running Tasks
The ThreadPoolExecutor in Python provides a thread pool that lets you run tasks concurrently.
You can add tasks to the pool by calling submit() with your function name, which will return a Future object.
You can call the cancel() function on the Future object to cancel the task before it has started running.
```python
...
# execute one task
future = executor.submit(work)
# cancel the task
print('Cancel the task...')
future.cancel()
```
If your task has already started running, then calling cancel() will have no effect and you must wait for the task to complete.
The example below demonstrates this by starting a task that takes ten seconds, then attempting to stop the task by calling cancel.
```python
# SuperFastPython.com
# example of calling cancel not stopping the task
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# mock target task function
def work():
    # pretend read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute one task
    future = executor.submit(work)
    # wait a moment
    print('The task is running...')
    sleep(2)
    # try to stop by calling the cancel function
    print('Cancel the task...')
    future.cancel()
    # waiting for all tasks to complete
    print('Waiting...')
print('All done.')
```
Running the example starts the task as per normal.
After two seconds, we attempt to stop the task by calling the cancel() function on the Future object. This does not stop the task because it is already running.
We must wait for the task to complete before we can close the thread pool and continue on.
```
The task is running...
Cancel the task...
Waiting...
All done.
```
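You can also confirm this from the return value of cancel(), which is True only if the task was cancelled. The sketch below is a minimal illustration, assuming a pool with a single worker so that a second task is forced to wait in the queue. The short task durations and the variable names are assumptions made for the example.

```python
# sketch: cancel() reports whether the task was actually cancelled
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# hypothetical short task standing in for real work
def work():
    sleep(0.5)
    return 'done'

# a single worker, so the second task must wait in the queue
with ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(work)
    queued = executor.submit(work)
    # give the first task a moment to start
    sleep(0.1)
    # False: the task is already running and cannot be cancelled
    running_cancelled = running.cancel()
    # True: the task was still waiting for a worker
    queued_cancelled = queued.cancel()

print(running_cancelled, queued_cancelled)
```

The running task carries on to completion, while the queued task never starts.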
Next, let’s look at how we might update a target task function to check a thread-safe flag to indicate whether it should stop running.
Stop Running Tasks Using threading.Event
Although the ThreadPoolExecutor does not provide a facility to stop running tasks, we can update our target task functions to stop running when a thread-safe flag is set.
This can be implemented using threading.Event.
First, create a threading.Event to control when running tasks should stop.
```python
...
# create an event to shut down all running tasks
event = Event()
```
This event can then be passed to each target task function as an argument.
```python
...
# execute the task, passing the event
future = executor.submit(work, event)
```
Once the task is running, it can be stopped from the main thread by setting the flag on the Event.
```python
...
# stop running tasks via the event
event.set()
```
Each target task function must check the status of the event frequently, such as within each iteration of a loop within the task.
If the event is set, the task can then stop running, perhaps by returning immediately.
```python
...
# check the status of the flag
if event.is_set():
    return
```
If your target task function has some resources open, it may need to clean up before returning.
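One way to handle the clean-up is a try/finally block, so the same clean-up code runs whether the task finishes normally or returns early. The sketch below is only an illustration: the `log` list is a hypothetical stand-in for a real resource such as a file or socket, and the short sleeps are assumptions to keep the example quick.

```python
# sketch: clean up with try/finally when stopping early
from time import sleep
from threading import Event
from concurrent.futures import ThreadPoolExecutor

def work(event, log):
    # stand-in for opening a file, socket, or connection
    log.append('opened')
    try:
        for _ in range(10):
            # pretend to do some work
            sleep(0.1)
            # check if the task should stop
            if event.is_set():
                return 'stopped early'
        return 'finished'
    finally:
        # runs on every exit path, including the early return
        log.append('closed')

event = Event()
log = []
with ThreadPoolExecutor() as executor:
    future = executor.submit(work, event, log)
    sleep(0.25)
    # ask the task to stop
    event.set()

print(future.result(), log)
```

If the resource supports the context manager protocol, a with statement inside the task would achieve the same thing.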
This approach to stopping running tasks may require that you restructure your task around a loop, so that it can check the value of the flag regularly.
For example, if your task reads data from a file or a socket, you may need to change the read operation to read blocks of data in a loop, so that you can check the status of the flag on each iteration.
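A block-wise read might look something like the sketch below. It is an illustration under stated assumptions: `read_in_blocks` and `block_size` are hypothetical names, and an in-memory `io.BytesIO` stands in for a real file or socket.

```python
# sketch: read data in blocks, checking the flag between blocks
import io
from threading import Event

def read_in_blocks(stream, event, block_size=1024):
    chunks = []
    # check the flag before reading each block
    while not event.is_set():
        block = stream.read(block_size)
        if not block:
            # no more data to read
            break
        chunks.append(block)
    return b''.join(chunks)

event = Event()
# in-memory stand-in for a file or socket
stream = io.BytesIO(b'x' * 10_000)
data = read_in_blocks(stream, event)
print(len(data))

# once the event is set, the function returns before reading anything
event.set()
stream.seek(0)
partial = read_in_blocks(stream, event)
print(len(partial))
```

Keep in mind that the flag is only checked between reads. If a single read blocks for a long time, the task will not notice the event until that read returns.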
Now that we know how to stop a task running, let’s look at a worked example.
How to Stop Running Tasks in the ThreadPoolExecutor
We can update our previous example to check the status of the thread-safe flag and stop when the flag is set.
First, we must create the event that will be passed to all tasks executed in the thread pool.
```python
...
# create an event used to stop running tasks
event = Event()
```
We can then pass this event to our target function when we submit the task to the pool for execution.
```python
...
# execute one task
future = executor.submit(work, event)
```
Next, we can update our target task function to take the event as an argument and to check the status of the event each iteration via the is_set() function on the event.
If the event is set, the target task function can return immediately.
```python
# mock target task function
def work(event):
    # pretend read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)
        # check if the task should stop
        if event.is_set():
            return
```
It is a good practice to cancel the task in the thread pool, just in case it has not yet started executing.
```python
...
# cancel the task, just in case it is not yet running
future.cancel()
```
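Cancelling matters more when the pool has more tasks than workers. The sketch below assumes a pool with two workers and four tasks: calling cancel() removes the two tasks still waiting in the queue, and the shared event stops the two that are running. The worker count, task count, and return values are assumptions made for the illustration.

```python
# sketch: cancel queued tasks, then stop running tasks with the event
from time import sleep
from threading import Event
from concurrent.futures import ThreadPoolExecutor

def work(event):
    for _ in range(10):
        # pretend to do some work
        sleep(0.1)
        # check if the task should stop
        if event.is_set():
            return 'stopped'
    return 'finished'

event = Event()
# two workers, four tasks: two run and two wait in the queue
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(work, event) for _ in range(4)]
    sleep(0.25)
    # cancel whatever is still queued; running tasks are unaffected
    for future in futures:
        future.cancel()
    # ask the running tasks to stop
    event.set()

cancelled = [f.cancelled() for f in futures]
results = [f.result() for f in futures if not f.cancelled()]
print(cancelled)
print(results)
```

Without the calls to cancel(), the two queued tasks would still start once workers became free, only to see the event and return on their first check.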
Finally, we can stop the running task by calling the set() function on the event at any time.
```python
...
# stop the running task using the flag
print('Stopping the task...')
event.set()
```
And that’s it.
Tying this together, the example below provides a template for adding an event flag to your target task function. The task checks the flag as its stop condition, which lets you shut down all currently running tasks.
```python
# SuperFastPython.com
# example of stopping a running task in a thread pool
from time import sleep
from threading import Event
from concurrent.futures import ThreadPoolExecutor

# mock target task function
def work(event):
    # pretend read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)
        # check if the task should stop
        if event.is_set():
            return

# create an event used to stop running tasks
event = Event()
# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute one task
    future = executor.submit(work, event)
    # wait a moment
    print('The task is running...')
    sleep(2)
    # cancel the task, just in case it is not yet running
    future.cancel()
    # stop the running task using the flag
    print('Stopping the task...')
    event.set()
    # waiting for all tasks to complete
    print('Waiting...')
print('All done.')
```
Running the example first creates the event, then creates a thread pool and submits a task, passing it the event.
The task checks the event on each iteration and bails out if it has been set.
The task executes as per normal for two seconds. First, we cancel the task, just in case it has not yet started executing.
We then set the event to trigger the running task to stop. The task checks the status of the event each second, and stops executing on the next iteration after the event has been set.
```
The task is running...
Stopping the task...
Waiting...
All done.
```
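If your task pauses between units of work, you may be able to respond faster by waiting on the event itself instead of calling sleep(). The wait() function on threading.Event takes an optional timeout and returns True as soon as the event is set. The sketch below is a variation on the example above, not a replacement for it. The timing code and the return values are assumptions added so the effect can be observed.

```python
# sketch: wait on the event instead of sleeping
from time import sleep, perf_counter
from threading import Event
from concurrent.futures import ThreadPoolExecutor

def work(event):
    for _ in range(10):
        # pause for up to one second, waking early if the event is set
        if event.wait(timeout=1):
            return 'stopped'
    return 'finished'

event = Event()
with ThreadPoolExecutor() as executor:
    future = executor.submit(work, event)
    sleep(0.2)
    start = perf_counter()
    # stop the running task using the flag
    event.set()
    result = future.result()
    elapsed = perf_counter() - start

print(result)
print(f'stopped after {elapsed:.3f} seconds')
```

This only helps where the task is idle or polling. If the time is spent in real work, such as a blocking read, the task will still only see the flag at its next check.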
Takeaways
You now know how to use a threading.Event to stop tasks executed by a ThreadPoolExecutor.
Do you have any questions about how to stop tasks running in a thread pool?
Ask your questions in the comments below and I will do my best to help.