How To Stop Running Tasks in the ThreadPoolExecutor in Python

November 12, 2021 Python ThreadPoolExecutor

You can stop running tasks in a ThreadPoolExecutor using a threading.Event.

In this tutorial you will discover how you can update your tasks running in a ThreadPoolExecutor so that they stop running whenever you want.

Let's get started.

cancel() Won't Stop Running Tasks

The ThreadPoolExecutor in Python provides a thread pool that lets you run tasks concurrently.

You can add a task to the pool by calling submit() with your function and any arguments for it; submit() returns a Future object.

You can call the cancel() method on the Future object to cancel the task before it has started running.

...
# execute one task
future = executor.submit(work)
# cancel the task
print('Cancel the task...')
future.cancel()

If your task has already started running, then calling cancel() will have no effect: it returns False, the task keeps running, and you must wait for it to complete.

The example below demonstrates this by starting a task that takes ten seconds, then attempting to stop the task by calling cancel().

# SuperFastPython.com
# example of calling cancel not stopping the task
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# mock target task function
def work():
    # pretend to read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute one task
    future = executor.submit(work)
    # wait a moment
    print('The task is running...')
    sleep(2)
    # try to stop by calling the cancel function
    print('Cancel the task...')
    future.cancel()
    # waiting for all tasks to complete
    print('Waiting...')
print('All done.')

Running the example starts the task as per normal.

After two seconds, we attempt to stop the task by calling the cancel() function on the Future object. This does not stop the task because it is already running.

We must wait for the task to complete before we can close the thread pool and continue on.

The task is running...
Cancel the task...
Waiting...
All done.
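
The return value of cancel() tells you which case you hit. The sketch below is a minimal illustration, not part of the original example: it assumes a pool limited to one worker so that the second task is guaranteed to be waiting in the queue, and the names started and release are hypothetical helpers used only to make the timing deterministic.

```python
# sketch: cancel() returns False for a running task, True for a pending one
from threading import Event
from concurrent.futures import ThreadPoolExecutor

# mock task that signals when it starts, then blocks until released
def work(started, release):
    started.set()
    release.wait()
    return 'done'

# hypothetical helper events, used only to control the timing of this demo
started = Event()
release = Event()
# a single worker, so the second task must wait in the queue
with ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(work, started, release)
    pending = executor.submit(work, started, release)
    # wait until the first task is definitely running
    started.wait()
    # the running task cannot be cancelled
    running_cancelled = running.cancel()
    # the queued task can be cancelled
    pending_cancelled = pending.cancel()
    # allow the running task to finish
    release.set()
print(running_cancelled, pending_cancelled)
```

Checking the return value is a cheap way to find out whether you still need another mechanism to stop the task.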

Next, let's look at how we might update a target task function to check a thread-safe flag to indicate whether it should stop running.

Stop Running Tasks Using threading.Event

Although the ThreadPoolExecutor does not provide a facility to stop running tasks, we can update our target task functions to stop running when a thread-safe flag is set.

This can be implemented using threading.Event.

First, create a threading.Event to control when running tasks should stop.

...
# create an event to shut down all running tasks
event = Event()

This event can then be passed to each target task function as an argument.

...
# execute the task, passing the event
future = executor.submit(work, event)

Once the task is running, it can be stopped from the main thread by setting the flag on the Event.

...
# stop running tasks via the event
event.set()

Each target task function must check the status of the event frequently, such as within each iteration of a loop within the task.

If the event is set, the task can then stop running, perhaps by returning immediately.

...
# check the status of the flag
if event.is_set():
    return

If your target task function has some resources open, it may need to clean up before returning.
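
One way to handle clean-up is a try/finally block, so the resource is released whether the task finishes normally or stops early. The following is a sketch under stated assumptions: an io.StringIO object stands in for a real resource such as a file or connection, and the event is set before the task is submitted purely so that the outcome is predictable.

```python
# sketch: release a resource even when the task stops early
import io
from threading import Event
from concurrent.futures import ThreadPoolExecutor

# mock task that uses a resource and always closes it
def work(event, resource):
    try:
        for _ in range(10):
            # check if the task should stop
            if event.is_set():
                return 'stopped'
            # pretend to do some work with the resource
            resource.write('x')
        return 'finished'
    finally:
        # runs on both the early return and the normal return
        resource.close()

event = Event()
# stand-in for a real resource (an assumption for this sketch)
resource = io.StringIO()
# request a stop up front so the result of the demo is deterministic
event.set()
with ThreadPoolExecutor() as executor:
    future = executor.submit(work, event, resource)
result = future.result()
print(result, resource.closed)
```

If the resource supports the context manager protocol, a with statement inside the task would achieve the same effect.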

This approach may require that you restructure your task around a loop so that it can check the flag regularly.

For example, if your task reads data from a file or a socket, you may need to read the data in blocks within a loop so that the flag can be checked on each iteration.
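
A block-wise read might look like the sketch below. The function name read_in_blocks and the block_size parameter are hypothetical, and an in-memory io.BytesIO stream stands in for a real file or socket.

```python
# sketch: read a stream in blocks, checking the event between blocks
import io
from threading import Event

# hypothetical helper: returns whatever was read before the stop request
def read_in_blocks(stream, event, block_size=1024):
    blocks = []
    # check the flag before each read
    while not event.is_set():
        block = stream.read(block_size)
        # an empty block means the end of the stream
        if not block:
            break
        blocks.append(block)
    return b''.join(blocks)

event = Event()
# in-memory stand-in for a file or socket (an assumption for this sketch)
data = read_in_blocks(io.BytesIO(b'x' * 4096), event)
print(len(data))
# once the event is set, no further blocks are read
event.set()
partial = read_in_blocks(io.BytesIO(b'x' * 4096), event)
print(len(partial))
```

Note that the flag is only checked between reads. With a real socket, a single read can still block until data arrives, so you may also want a timeout on the socket.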

Now that we know how to stop a task running, let's look at a worked example.

How to Stop Running Tasks in the ThreadPoolExecutor

We can update our previous example to check the status of the thread-safe flag and stop when the flag is set.

First, we must create the event that will be passed to all tasks executed in the thread pool.

...
# create an event used to stop running tasks
event = Event()

We can then pass this event to our target function when we submit the task to the pool for execution.

...
# execute one task
future = executor.submit(work, event)

Next, we can update our target task function to take the event as an argument and check its status on each iteration via the event's is_set() method.

If the event is set, the target task function can return immediately.

# mock target task function
def work(event):
    # pretend to read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)
        # check if the task should stop
        if event.is_set():
            return

It is a good practice to cancel the task in the thread pool, just in case it has not yet started executing.

...
# cancel the task, just in case it is not yet running
future.cancel()

Finally, we can stop the running task at any time by calling the set() method on the event.

...
# stop the running task using the flag
print('Stopping the task...')
event.set()

And that's it.

Tying this together, the example below provides a template for adding an event flag to your target task function so that all currently running tasks can be shut down on demand.

# SuperFastPython.com
# example of stopping a running task in a thread pool
from time import sleep
from threading import Event
from concurrent.futures import ThreadPoolExecutor

# mock target task function
def work(event):
    # pretend to read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)
        # check if the task should stop
        if event.is_set():
            return

# create an event used to stop running tasks
event = Event()
# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute one task
    future = executor.submit(work, event)
    # wait a moment
    print('The task is running...')
    sleep(2)
    # cancel the task, just in case it is not yet running
    future.cancel()
    # stop the running task using the flag
    print('Stopping the task...')
    event.set()
    # waiting for all tasks to complete
    print('Waiting...')
print('All done.')

Running the example first creates the event, then creates the thread pool and submits the task, passing the event as an argument.

The task checks the event at the end of each iteration and returns if it has been set.

The task runs normally for two seconds. We then call cancel() on the Future, just in case the task has not yet started executing.

Finally, we set the event to signal the running task to stop. The task checks the event once per second, after each sleep, so it stops within about one second of the event being set.

The task is running...
Stopping the task...
Waiting...
All done.
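
The same event can be shared by many tasks, and a task that would otherwise sleep can wait on the event instead so that it reacts as soon as the flag is set. The sketch below is a variation on the example above, not a replacement for it: it assumes three tasks and uses Event.wait() with a timeout, which returns True once the event is set.

```python
# sketch: several tasks share one event and wake as soon as it is set
from threading import Event
from concurrent.futures import ThreadPoolExecutor

# mock task that waits on the event instead of sleeping
def work(event):
    for _ in range(10):
        # block for up to one second, returning True once the event is set
        if event.wait(timeout=1):
            return 'stopped'
    return 'finished'

# one event shared by all tasks
event = Event()
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(work, event) for _ in range(3)]
    # stop all tasks with a single call
    event.set()
results = [future.result() for future in futures]
print(results)
```

Compared with sleep(1) followed by is_set(), waiting on the event removes the delay of up to one second before a task notices the stop request.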

Takeaways

You now know how to use a threading.Event to stop tasks executed by a ThreadPoolExecutor.


