ThreadPoolExecutor Thread That Runs Done Callback Functions

August 8, 2023 Python ThreadPoolExecutor

The worker thread that executes the task is typically the same thread that executes the done callback function for the task.

In this tutorial, you will discover which thread runs the done callback functions in the ThreadPoolExecutor.

Let's get started.

What Thread Runs Done Callback Functions

The ThreadPoolExecutor provides a pool of reusable worker threads using the executor design pattern.

Tasks executed in new threads are executed concurrently in Python, making the ThreadPoolExecutor appropriate for I/O-bound tasks.

A ThreadPoolExecutor can be created directly or via the context manager interface, and tasks can be issued one by one via the submit() method or in batches via the map() method.

For example:

...
# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue a task
    future = tpe.submit(task)
    # get the task result once the task is done
    result = future.result()
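
As a minimal sketch of the two ways of issuing tasks described above, the snippet below submits one task and then issues a batch via map(). The square() function is a hypothetical task used only for illustration; it is not part of the original example.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task function used only for illustration
def square(value):
    return value * value

# create a thread pool using the context manager interface
with ThreadPoolExecutor() as tpe:
    # issue a single task and get its result via the Future
    future = tpe.submit(square, 3)
    single = future.result()
    # issue a batch of tasks, results are yielded in argument order
    batch = list(tpe.map(square, range(5)))

print(single)
print(batch)
```

Note that submit() returns a Future object, whereas map() returns an iterator over results, so done callbacks can only be added to tasks issued via submit().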

Done callback functions can be added to a Future object for a task issued to the ThreadPoolExecutor. This can be achieved by calling the add_done_callback() method on the Future object and specifying the name of a function to call when the task is completed.

For example:

...
# add a callback to a task
future.add_done_callback(my_callback)
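
The callback is called with the Future object as its only argument, so it can retrieve the result of the task. The sketch below assumes a hypothetical work() task and a results list, neither of which appears in the original example.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical shared list that collects results from the callback
results = []

# hypothetical task used only for illustration
def work():
    return 'done'

# done callback, receives the Future for the completed task
def my_callback(future):
    results.append(future.result())

# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue the task
    future = tpe.submit(work)
    # add a callback to the task
    future.add_done_callback(my_callback)

# the pool has shut down, so the callback has already been called
print(results)
```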

One question we might consider is what thread runs done callback functions.

This is important to know if we need to access a resource shared between threads from within the callback function. It will determine if any locks may be needed.

For example, is the done callback function executed in the main thread that manages the ThreadPoolExecutor?

Alternatively, is the done callback function executed by the worker thread that executes the task itself, or perhaps another worker thread or helper thread within the ThreadPoolExecutor?

What thread executes the done callback function?

Thread That Runs The Done Callback Functions

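The concurrent.futures API documentation states only that done callbacks are called in a thread belonging to the process that added them. It does not specify which thread executes the done callback function.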

This means that the answer to which thread executes the done callback function must be determined empirically. It also means that the answer may change in the future if the internals of the ThreadPoolExecutor class change.

After some investigation of the code and testing, it has been determined that the worker thread that executes the task also executes all done callback functions for the task.

This appears to be true in most situations.

The main case where this does not hold is when the done callback function is added after the task is already done. In this case, the callback is called immediately by the thread that calls add_done_callback().
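
A related case is a task that is cancelled while it is still waiting in the queue. This is an observation of current CPython behavior rather than a documented guarantee. A cancelled task never runs in a worker thread, and in testing the callback is run by the thread that calls cancel(). The sketch below assumes a pool with a single worker and a hypothetical Event named release that keeps the first task busy so that the second task stays pending.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Event, current_thread

# cancel a queued task and report which thread ran its done callback
def cancel_pending_task():
    # hypothetical event used to keep the only worker busy
    release = Event()
    callback_threads = []
    with ThreadPoolExecutor(max_workers=1) as tpe:
        # occupy the only worker so the next task stays in the queue
        tpe.submit(release.wait)
        # this task stays pending and is never executed
        pending = tpe.submit(print, 'never printed')
        pending.add_done_callback(
            lambda future: callback_threads.append(current_thread().name))
        # cancel the queued task from the calling thread
        was_cancelled = pending.cancel()
        # let the first task finish so the pool can shut down
        release.set()
    return was_cancelled, callback_threads, current_thread().name

was_cancelled, callback_threads, caller = cancel_pending_task()
print(f'cancelled: {was_cancelled}, callback ran in: {callback_threads}, caller: {caller}')
```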

Now that we know which thread executes the done callback functions, let's look at a worked example.

Example of Reporting the Thread That Runs the Done Callback Function

We can explore an example to discover which thread runs done callback functions for tasks executed in the ThreadPoolExecutor.

In this example, we will define a task that blocks for a moment and then reports the details of the thread that is executing it. We will add a done callback function to the task that reports the details of the thread that is executing the callback. We expect that the same thread that executes the task will also execute the done callback function.

Firstly, we can define a task that sleeps and then reports the details of the worker thread that is executing it.

The task() function below implements this.

# target function run in the thread pool
def task():
    # block for a moment
    sleep(1)
    # retrieve access to the worker thread
    thread = current_thread()
    # report the details of the worker thread
    print(f'Task: {thread}')

Next, we can define a callback function that reports the details of the thread that is executing it.

The callback() function below implements this.

# custom done callback function
def callback(future):
    # retrieve access to the current thread
    thread = current_thread()
    # report the details of the worker thread
    print(f'Callback: {thread}')

Next, we can create the ThreadPoolExecutor with the default number of worker threads, issue the task() function to the thread pool for execution, and add the done callback function to the Future object for the task.

...
# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue the task to the thread pool
    future = tpe.submit(task)
    # add the done callback function
    future.add_done_callback(callback)
# shutdown and wait for the task to complete

Tying this together, the complete example is listed below.

# SuperFastPython.com
# report the thread that runs the done callback function
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from threading import current_thread

# target function run in the thread pool
def task():
    # block for a moment
    sleep(1)
    # retrieve access to the worker thread
    thread = current_thread()
    # report the details of the worker thread
    print(f'Task: {thread}')

# custom done callback function
def callback(future):
    # retrieve access to the current thread
    thread = current_thread()
    # report the details of the worker thread
    print(f'Callback: {thread}')

# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue the task to the thread pool
    future = tpe.submit(task)
    # add the done callback function
    future.add_done_callback(callback)
# shutdown and wait for the task to complete

Running the example first creates the ThreadPoolExecutor using the context manager interface.

Next, the task is issued to the thread pool and a Future object is returned. The main thread then adds the done callback to the Future and blocks, waiting for the thread pool to shut down.

The task executes and reports the details of the thread that executes it.

The task completes and the done callback is executed. It too reports the details of the thread that executes it.

In this case, we can see that the task and the done callback function are both executed by the same worker thread in the ThreadPoolExecutor.

Task: <Thread(ThreadPoolExecutor-0_0, started 123145400930304)>
Callback: <Thread(ThreadPoolExecutor-0_0, started 123145400930304)>
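
The example above relies on the one-second sleep to ensure the callback is added before the task finishes. As a sketch of a variation that does not depend on timing, the task below waits on a threading.Event that the main thread sets only after the callback has been added. The run_check() function, the registered event and the names dictionary are hypothetical names introduced for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Event, current_thread

# run one task with a done callback and return the thread names seen
def run_check():
    # hypothetical event, set once the callback has been added
    registered = Event()
    names = {}

    # task waits until the callback is registered, then records its thread
    def task():
        registered.wait()
        names['task'] = current_thread().name

    # done callback records the thread that runs it
    def callback(future):
        names['callback'] = current_thread().name

    with ThreadPoolExecutor() as tpe:
        future = tpe.submit(task)
        future.add_done_callback(callback)
        # allow the task to finish now that the callback is in place
        registered.set()
    return names

names = run_check()
print(names)
```

Both entries are expected to hold the same worker thread name, matching the result of the example above.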

Example of Many Tasks With a Done Callback Function

We can update the above example so that the ThreadPoolExecutor executes many tasks concurrently, all of which have a registered done callback function.

If we limit the number of workers in the ThreadPoolExecutor and have many more tasks than workers, it will cause tasks to be queued and may expose whether the ThreadPoolExecutor will always use the same worker thread for the task and done callback, or the next available worker thread.

We can achieve this by updating the above example.

Firstly, we can change the task function so that it returns the name of the worker thread that is executing it.

The updated task() function with this change is listed below.

# target function run in the thread pool
def task():
    # block for a moment
    sleep(1)
    # retrieve access to the worker thread
    thread = current_thread()
    # return the worker thread name
    return thread.name

Next, we can change the callback() function so that it retrieves the name of the thread that executed the task and compares it to the name of the thread that is executing the callback, then reports both details.

This will highlight whether a worker thread other than the worker thread that executes the task ever executes the done callback function.

The updated callback() function is listed below.

# custom done callback function
def callback(future):
    # get the task worker thread name
    name = future.result()
    # retrieve access to the current thread
    thread = current_thread()
    # check if different
    diff = name != thread.name
    # report the details of the worker thread
    print(f'task: {name}, callback {thread.name}, different: {diff}')

Finally, we will update the main thread so that a ThreadPoolExecutor with 4 workers is created, then 16 tasks are issued to the pool, ensuring tasks are queued and that the ThreadPoolExecutor is under pressure to execute tasks and done callbacks as fast as it is able.

...
# create a thread pool
with ThreadPoolExecutor(4) as tpe:
    # add many tasks
    for _ in range(16):
        # issue the task to the thread pool
        future = tpe.submit(task)
        # add the done callback function
        future.add_done_callback(callback)
# shutdown and wait for the task to complete

Tying this together, the complete example is listed below.

# SuperFastPython.com
# report if the worker and callback threads are different when running many tasks
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from threading import current_thread

# target function run in the thread pool
def task():
    # block for a moment
    sleep(1)
    # retrieve access to the worker thread
    thread = current_thread()
    # return the worker thread name
    return thread.name

# custom done callback function
def callback(future):
    # get the task worker thread name
    name = future.result()
    # retrieve access to the current thread
    thread = current_thread()
    # check if different
    diff = name != thread.name
    # report the details of the worker thread
    print(f'task: {name}, callback {thread.name}, different: {diff}')

# create a thread pool
with ThreadPoolExecutor(4) as tpe:
    # add many tasks
    for _ in range(16):
        # issue the task to the thread pool
        future = tpe.submit(task)
        # add the done callback function
        future.add_done_callback(callback)
# shutdown and wait for the task to complete

Running the example first creates the ThreadPoolExecutor with 4 worker threads.

Next, 16 tasks are issued to the thread pool and a done callback is added to each.

The main thread then blocks and waits for all tasks to complete.

Tasks are executed in the thread pool, retrieving their worker thread name and returning it.

The callback functions are executed, retrieving the worker thread name from their associated task, comparing it to the name of the thread that is executing the callback, and reporting it.

We can see that in all cases the same thread executes both the task and its callback. They never differ.

task: ThreadPoolExecutor-0_0, callback ThreadPoolExecutor-0_0, different: False
task: ThreadPoolExecutor-0_1, callback ThreadPoolExecutor-0_1, different: False
task: ThreadPoolExecutor-0_3, callback ThreadPoolExecutor-0_3, different: False
task: ThreadPoolExecutor-0_2, callback ThreadPoolExecutor-0_2, different: False
task: ThreadPoolExecutor-0_3, callback ThreadPoolExecutor-0_3, different: False
task: ThreadPoolExecutor-0_1, callback ThreadPoolExecutor-0_1, different: False
task: ThreadPoolExecutor-0_0, callback ThreadPoolExecutor-0_0, different: False
task: ThreadPoolExecutor-0_2, callback ThreadPoolExecutor-0_2, different: False
task: ThreadPoolExecutor-0_3, callback ThreadPoolExecutor-0_3, different: False
task: ThreadPoolExecutor-0_0, callback ThreadPoolExecutor-0_0, different: False
task: ThreadPoolExecutor-0_1, callback ThreadPoolExecutor-0_1, different: False
task: ThreadPoolExecutor-0_2, callback ThreadPoolExecutor-0_2, different: False
task: ThreadPoolExecutor-0_0, callback ThreadPoolExecutor-0_0, different: False
task: ThreadPoolExecutor-0_2, callback ThreadPoolExecutor-0_2, different: False
task: ThreadPoolExecutor-0_3, callback ThreadPoolExecutor-0_3, different: False
task: ThreadPoolExecutor-0_1, callback ThreadPoolExecutor-0_1, different: False
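
Because the callbacks for different tasks are run by different worker threads, any state shared between callbacks may need protecting, which is the concern about locks raised earlier. The sketch below assumes a hypothetical shared state dictionary that every callback updates while holding a threading.Lock.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, current_thread
from time import sleep

# hypothetical shared state updated by every callback
lock = Lock()
state = {'completed': 0, 'threads': set()}

# target function run in the thread pool
def task():
    # block for a moment
    sleep(0.1)

# done callback that updates shared state under a lock
def callback(future):
    with lock:
        state['completed'] += 1
        state['threads'].add(current_thread().name)

# create a thread pool with a limited number of workers
with ThreadPoolExecutor(4) as tpe:
    for _ in range(16):
        future = tpe.submit(task)
        future.add_done_callback(callback)

print(f"completed: {state['completed']}, callback threads: {sorted(state['threads'])}")
```

The lock ensures that the read-modify-write on the counter cannot interleave between callbacks running in different worker threads.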

Example of Adding a Callback After Task is Done

A done callback function can be added to a task via the Future object after the task is already completed.

This is an interesting case, because the worker thread that executed the task has already finished with it and moved on by the time the callback is added.

As such, the done callback must be executed by a different thread.

We can update the first example where a single task is issued with a done callback and both the task and callback report their thread.

In this case, we will simply add a delay between issuing the task and adding the done callback in the main thread. The delay is long enough to ensure that the task is done before the callback is added.

...
# wait for the task to complete entirely
sleep(2)

Tying this together, the complete example is listed below.

# SuperFastPython.com
# report the thread that runs the done callback function added after the task is done
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from threading import current_thread

# target function run in the thread pool
def task():
    # block for a moment
    sleep(1)
    # retrieve access to the worker thread
    thread = current_thread()
    # report the details of the worker thread
    print(f'Task: {thread}')

# custom done callback function
def callback(future):
    # retrieve access to the current thread
    thread = current_thread()
    # report the details of the current thread
    print(f'Callback: {thread}')

# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue the task to the thread pool
    future = tpe.submit(task)
    # wait for the task to complete entirely
    sleep(2)
    # add the done callback function
    future.add_done_callback(callback)
# shutdown and wait for the task to complete

Running the example first creates the ThreadPoolExecutor.

Next, the task is issued to the thread pool and a Future object is returned.

The main thread then blocks for two seconds.

The task is executed and reports the details of the thread that executes it.

The main thread resumes and adds the done callback to the Future.

Because the task is already completed, the callback executes immediately.

In this case, we can see that the calling thread, the main thread, executes the done callback function.

Task: <Thread(ThreadPoolExecutor-0_0, started 123145377968128)>
Callback: <_MainThread(MainThread, started 4383403520)>
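
The fixed two-second delay is one way to make sure the task is done first. As a sketch of an alternative, the calling thread can block on future.result() before adding the callback, which removes the dependence on timing. The add_callback_after_done() function is a hypothetical helper introduced for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread

# add a done callback only after the task has finished
def add_callback_after_done():
    callback_threads = []
    with ThreadPoolExecutor() as tpe:
        # the task returns the name of the worker thread that runs it
        future = tpe.submit(lambda: current_thread().name)
        # block until the task is done
        task_thread = future.result()
        # the task is already done, so the callback is called immediately
        future.add_done_callback(
            lambda f: callback_threads.append(current_thread().name))
    return task_thread, callback_threads[0], current_thread().name

task_thread, callback_thread, caller = add_callback_after_done()
print(f'task: {task_thread}, callback: {callback_thread}, caller: {caller}')
```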

Takeaways

You now know which thread runs the done callback functions in the ThreadPoolExecutor.
