How to Add a Callbacks to the ThreadPoolExecutor in Python

November 7, 2021 Python ThreadPoolExecutor

You can add a callback to a task in the ThreadPoolExecutor by calling the add_done_callback() function on the task's Future object.

In this tutorial, you will discover how to add callbacks for tasks submitted to a Python thread pool.

Let's get started.

The Need to Call a Function for Each Task as It Is Completed

The ThreadPoolExecutor class in Python provides a pool of reusable threads for executing ad hoc tasks.

You can submit tasks to the thread pool by calling the submit() function and passing in the name of the function you wish to execute on another thread.

Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.

After submitting tasks to the thread pool, you may want to call a function for each task as it is completed.

This may be for many reasons, such as:

How can you automatically call a function for each task as it is completed?

Call add_done_callback() to Add a Callback for a Task

You can add a callback to a task via the Future object for the task.

Recall that you will get a Future object for each task submitted to the ThreadPoolExecutor via a call to the submit() function.

Once submitted, you can call the add_done_callback() to add a callback function to your task.

...
# add a callback to a task
future.add_done_callback(my_callback)

Your callback function must take one argument, which is the Future object for the task that is completed.

# example callback function
def my_callback(future)
	# do something...

You can add multiple callback functions to a task and they will be called in the order that they were added.

...
# add a callback to a task
future.add_done_callback(my_callback)
# add another callback to the task
future.add_done_callback(my_other_callback)

Callback functions will be called when:

If the task is already completed when the callback is added, the callback function will be executed immediately.

If a callback function raises an exception, it will not prevent any other registered callbacks from being called and executed.

You cannot add a callback function to tasks added to the thread pool via a call to map().

Now that we know how to add callback functions to tasks, let's look at a worked example.

How to Add a Callback to Tasks in the ThreadPoolExecutor

Let's look at a worked example of adding a callback to a task in the Python thread pool.

Task Callback Example

Firstly, let's define a mock task that will sleep for a moment and report when it is completed.

# mock task that will sleep for a moment
def work():
    sleep(1)
    print('The task is done.')

Next, let's define a custom callback function to be called once the task has finished executing.

Recall that the custom callback function must take the future object as an argument.

Forgetting the Future object in the signature of your function will result in an error when the callback is called once the task is done.

# callback function to call when a task is completed
def custom_callback(future):
    print('The custom callback was called.')

Next, we can create a thread pool using the context manager and call submit() to put the task into the thread pool for execution.

This will return a Future object, which we will require in a moment to add our callback function.

...
# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute the task
    future = executor.submit(work)

Next, we can register the custom callback function with the task by calling the add_done_callback() function on the Future object and passing it the name of our custom callback function.

...
# add the custom callback
future.add_done_callback(custom_callback)

That's it.

Once the task is complete, we expect our custom callback function to be called.

Tying this together, the complete example of adding a custom callback to a task in a ThreadPoolExecutor is listed below.

# SuperFastPython.com
# example of adding a callback to a task in the thread pool
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# callback function to call when a task is completed
def custom_callback(future):
    print('The custom callback was called.')

# mock task that will sleep for a moment
def work():
    sleep(1)
    print('The task is done.')

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute the task
    future = executor.submit(work)
    # add the custom callback
    future.add_done_callback(custom_callback)
    # wait for all tasks to complete...

Running the example first creates the thread pool and submits the task.

The custom callback is then added to the task via its Future object.

The task completes then reports a message, then the callback is called, reporting a second message.

The task is done.
The custom callback was called.

Callbacks Triggered on Task Exception

The callback is still called, even if the task raises an exception.

We can demonstrate this by updating the task to raise an Exception instance, for example:

# mock task that will sleep for a moment
def work():
    sleep(1)
    raise Exception('Something bad')
    print('The task is done.')

Tying this together, the complete example of adding a callback for a task known to raise an exception is listed below.

# SuperFastPython.com
# example of adding a callback to a task in the thread pool
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# callback function to call when a task is completed
def custom_callback(future):
    print('The custom callback was called.')

# mock task that will sleep for a moment
def work():
    sleep(1)
    raise Exception('Something bad')
    print('The task is done.')

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute the task
    future = executor.submit(work)
    # add the custom callback
    future.add_done_callback(custom_callback)
    # wait for all tasks to complete...

Running the example, the thread pool is created and the task is submitted as per normal.

The task runs then raises an exception before completion.

The task is marked done internally, and the callback function is called.

The custom callback was called.

Takeaways

You now know how to add a callback to tasks in the ThreadPoolExecutor.