The worker thread that executes the task is typically the same thread that executes the done callback function for the task.
In this tutorial, you will discover which thread runs the done callback functions in the ThreadPoolExecutor.
Let’s get started.
What Thread Runs Done Callback Functions
The ThreadPoolExecutor provides a pool of reusable worker threads using the executor design pattern.
Tasks executed by the worker threads run concurrently in Python, making the ThreadPoolExecutor appropriate for I/O-bound tasks.
A ThreadPoolExecutor can be created directly or via the context manager interface, and tasks can be issued one at a time via the submit() method or in batch via the map() method.
For example:
```python
...
# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue a task
    future = tpe.submit(task)
    # get the task result once the task is done
    result = future.result()
```
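The snippet above issues a single task with submit(). For comparison, here is a minimal sketch of issuing a batch of tasks with map(); the task() function that squares its argument is a hypothetical stand-in. Note that map() yields task results rather than Future objects, so done callbacks can only be attached to tasks issued via submit().

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task: square the argument
def task(value):
    return value * value

# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue all tasks in one call; results are yielded in input order
    results = list(tpe.map(task, range(5)))

print(results)  # [0, 1, 4, 9, 16]
```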
Done callback functions can be added to the Future object for a task issued to the ThreadPoolExecutor. This is achieved by calling the add_done_callback() method on the Future object and passing the function to call when the task is completed. The function is called with the Future as its only argument.
For example:
```python
...
# add a callback to a task
future.add_done_callback(my_callback)
```
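Because the callback receives the Future, it can retrieve the task's result. If several callbacks are added to one Future, the concurrent.futures documentation states that they are called in the order they were added. The sketch below is a complete, runnable version of the snippet under some assumptions: the task() body, the registered event, and the names my_callback and my_other_callback are hypothetical, and the event simply holds the task until both callbacks are registered so the run is deterministic.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Event

# record what each callback saw, in the order the callbacks ran
calls = []
# hypothetical event that holds the task until both callbacks are registered
registered = Event()

# hypothetical task that doubles its argument
def task(value):
    # wait so the callbacks are added before the task finishes
    registered.wait()
    return value * 2

# first done callback: receives the Future for the completed task
def my_callback(future):
    calls.append(('first', future.result()))

# second done callback
def my_other_callback(future):
    calls.append(('second', future.result()))

# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue the task
    future = tpe.submit(task, 21)
    # add two callbacks; they run in the order they were added
    future.add_done_callback(my_callback)
    future.add_done_callback(my_other_callback)
    # allow the task to finish
    registered.set()

print(calls)  # [('first', 42), ('second', 42)]
```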
One question we might consider is which thread runs done callback functions.
This is important to know if we need to access a resource shared between threads from within the callback function, because the answer determines whether locks are needed.
For example, is the done callback function executed in the main thread that manages the ThreadPoolExecutor?
Alternatively, is the done callback function executed by the worker thread that executes the task itself, or perhaps by another worker thread or helper thread within the ThreadPoolExecutor?
What thread executes the done callback function?
Thread That Runs The Done Callback Functions
The concurrent.futures API documentation does not specify which thread executes a done callback function. It states only that added callables are always called in a thread belonging to the process that added them, and that if the future has already completed or been cancelled, the callable is called immediately.
This means that the answer must be determined by reading the source code or by experiment. It also means that the answer may change in the future if the internals of the ThreadPoolExecutor class change.
Reading the code and testing shows that the worker thread that executes a task also executes all done callback functions that were added to the task before it finished.
- The worker thread that runs the task runs the done callback function.
This appears to be true in most situations.
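The main exception is when the done callback function is added after the task is already done. In this case, the thread that calls add_done_callback() executes the callback immediately. Similarly, if a task is cancelled before it starts running, its done callbacks are executed by the thread that calls cancel().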
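A minimal sketch of the cancellation case, in which Future.cancel() runs the done callbacks directly in the calling thread. It assumes a pool with a single worker that is kept busy by a blocking task, so the second task is still pending when it is cancelled; the names release, blocker, queued, and callback_threads are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Event, current_thread
from time import sleep

# names of the threads that ran the callback
callback_threads = []

# done callback that records which thread is running it
def callback(future):
    callback_threads.append(current_thread().name)

# event used to keep the only worker thread busy
release = Event()

with ThreadPoolExecutor(1) as tpe:
    # occupy the single worker so the next task stays in the queue
    blocker = tpe.submit(release.wait)
    # this task cannot start until the blocker finishes
    queued = tpe.submit(sleep, 0)
    queued.add_done_callback(callback)
    # cancel the pending task; its callback runs here, in the calling thread
    cancelled = queued.cancel()
    # let the blocking task finish so the pool can shut down
    release.set()

print(cancelled, callback_threads)  # True ['MainThread'] when run as a script
```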
Now that we know which thread executes the done callback functions, let’s look at a worked example.
Example of Reporting the Thread That Runs the Done Callback Function
We can explore an example to discover which thread runs done callback functions for tasks executed in the ThreadPoolExecutor.
In this example, we will define a task that blocks for a moment and then reports the details of the thread that is executing it. We will add a done callback function to the task that reports the details of the thread that is executing it. We expect that the same thread that executes the task will also execute the done callback function.
Firstly, we can define a task that sleeps and then reports the details of the worker thread that is executing it.
The task() function below implements this.
```python
# target function run in the thread pool
def task():
    # block for a moment
    sleep(1)
    # retrieve access to the worker thread
    thread = current_thread()
    # report the details of the worker thread
    print(f'Task: {thread}')
```
Next, we can define a callback function that reports the details of the thread that is executing it.
The callback() function below implements this.
```python
# custom done callback function
def callback(future):
    # retrieve access to the current thread
    thread = current_thread()
    # report the details of the worker thread
    print(f'Callback: {thread}')
```
Next, we can create the ThreadPoolExecutor with the default number of worker threads, issue the task() function to the thread pool for execution, and add the done callback function to the Future object for the task.
```python
...
# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue the task to the thread pool
    future = tpe.submit(task)
    # add the done callback function
    future.add_done_callback(callback)
    # shutdown and wait for the task to complete
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# report the thread that runs the done callback function
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from threading import current_thread

# target function run in the thread pool
def task():
    # block for a moment
    sleep(1)
    # retrieve access to the worker thread
    thread = current_thread()
    # report the details of the worker thread
    print(f'Task: {thread}')

# custom done callback function
def callback(future):
    # retrieve access to the current thread
    thread = current_thread()
    # report the details of the worker thread
    print(f'Callback: {thread}')

# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue the task to the thread pool
    future = tpe.submit(task)
    # add the done callback function
    future.add_done_callback(callback)
    # shutdown and wait for the task to complete
```
Running the example first creates the ThreadPoolExecutor using the context manager interface.
Next, the task is issued to the thread pool and a Future object is returned. The main thread then adds the done callback to the Future and blocks, waiting for the thread pool to shut down.
The task executes and reports the details of the thread that executes it.
The task completes and the done callback is executed. It too reports the details of the thread that executes it.
In this case, we can see that the task and the done callback function are both executed by the same worker thread in the ThreadPoolExecutor.
```
Task: <Thread(ThreadPoolExecutor-0_0, started 123145400930304)>
Callback: <Thread(ThreadPoolExecutor-0_0, started 123145400930304)>
```
Example of Many Tasks With a Done Callback Function
We can update the above example so that the ThreadPoolExecutor executes many tasks concurrently, all of which have a registered done callback function.
If we limit the number of workers in the ThreadPoolExecutor and issue many more tasks than there are workers, tasks will be queued. This may reveal whether the ThreadPoolExecutor always uses the same worker thread for a task and its done callback, or instead uses whichever worker thread is available next.
We can achieve this by updating the above example.
Firstly, we can change the task function so that it returns the name of the worker thread that is executing it.
The updated task() function with this change is listed below.
```python
# target function run in the thread pool
def task():
    # block for a moment
    sleep(1)
    # retrieve access to the worker thread
    thread = current_thread()
    # return the worker thread name
    return thread.name
```
Next, we can change the callback() function so that it retrieves the name of the thread that executed the task and compares it to the name of the thread that is executing it, then reports both details.
This will highlight whether a worker thread other than the worker thread that executes the task ever executes the done callback function.
The updated callback() function is listed below.
```python
# custom done callback function
def callback(future):
    # get the task worker thread name
    name = future.result()
    # retrieve access to the current thread
    thread = current_thread()
    # check if different
    diff = name != thread.name
    # report the details of the worker thread
    print(f'task: {name}, callback {thread.name}, different: {diff}')
```
Finally, we will update the main thread so that a ThreadPoolExecutor with 4 workers is created, then 16 tasks are issued to the pool, ensuring tasks are queued and that the ThreadPoolExecutor is under pressure to execute tasks and done callbacks as fast as it is able.
```python
...
# create a thread pool
with ThreadPoolExecutor(4) as tpe:
    # add many tasks
    for _ in range(16):
        # issue the task to the thread pool
        future = tpe.submit(task)
        # add the done callback function
        future.add_done_callback(callback)
    # shutdown and wait for the task to complete
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# report if the worker and callback threads are different when running many tasks
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from threading import current_thread

# target function run in the thread pool
def task():
    # block for a moment
    sleep(1)
    # retrieve access to the worker thread
    thread = current_thread()
    # return the worker thread name
    return thread.name

# custom done callback function
def callback(future):
    # get the task worker thread name
    name = future.result()
    # retrieve access to the current thread
    thread = current_thread()
    # check if different
    diff = name != thread.name
    # report the details of the worker thread
    print(f'task: {name}, callback {thread.name}, different: {diff}')

# create a thread pool
with ThreadPoolExecutor(4) as tpe:
    # add many tasks
    for _ in range(16):
        # issue the task to the thread pool
        future = tpe.submit(task)
        # add the done callback function
        future.add_done_callback(callback)
    # shutdown and wait for the task to complete
```
Running the example first creates the ThreadPoolExecutor with 4 worker threads.
Next, 16 tasks are issued to the thread pool and a done callback is added to each.
The main thread then blocks and waits for all tasks to complete.
Tasks are executed in the thread pool, retrieving their worker thread name and returning it.
The callback functions are executed, retrieving the worker thread name from their associated task, comparing it to the name of the thread that is executing the callback, and reporting it.
We can see that in all cases the same thread executes the task and its callback; they never differ.
```
task: ThreadPoolExecutor-0_0, callback ThreadPoolExecutor-0_0, different: False
task: ThreadPoolExecutor-0_1, callback ThreadPoolExecutor-0_1, different: False
task: ThreadPoolExecutor-0_3, callback ThreadPoolExecutor-0_3, different: False
task: ThreadPoolExecutor-0_2, callback ThreadPoolExecutor-0_2, different: False
task: ThreadPoolExecutor-0_3, callback ThreadPoolExecutor-0_3, different: False
task: ThreadPoolExecutor-0_1, callback ThreadPoolExecutor-0_1, different: False
task: ThreadPoolExecutor-0_0, callback ThreadPoolExecutor-0_0, different: False
task: ThreadPoolExecutor-0_2, callback ThreadPoolExecutor-0_2, different: False
task: ThreadPoolExecutor-0_3, callback ThreadPoolExecutor-0_3, different: False
task: ThreadPoolExecutor-0_0, callback ThreadPoolExecutor-0_0, different: False
task: ThreadPoolExecutor-0_1, callback ThreadPoolExecutor-0_1, different: False
task: ThreadPoolExecutor-0_2, callback ThreadPoolExecutor-0_2, different: False
task: ThreadPoolExecutor-0_0, callback ThreadPoolExecutor-0_0, different: False
task: ThreadPoolExecutor-0_2, callback ThreadPoolExecutor-0_2, different: False
task: ThreadPoolExecutor-0_3, callback ThreadPoolExecutor-0_3, different: False
task: ThreadPoolExecutor-0_1, callback ThreadPoolExecutor-0_1, different: False
```
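Because the callbacks in this example run in four different worker threads, two callbacks may execute at the same time. If a callback modifies state shared between threads, that state should be protected, for example with a threading.Lock. A minimal sketch, assuming a hypothetical shared counter named completed that each callback increments:

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Lock

# hypothetical shared state updated from done callbacks
completed = 0
# lock guarding the shared counter
completed_lock = Lock()

# trivial task that stands in for real work
def task(value):
    return value

# done callback that usually runs in a worker thread
# (or in the main thread if the task finished before the callback was added)
def callback(future):
    global completed
    # the read-modify-write of shared state must not interleave between threads
    with completed_lock:
        completed += 1

# create a thread pool
with ThreadPoolExecutor(4) as tpe:
    for i in range(100):
        future = tpe.submit(task, i)
        future.add_done_callback(callback)

print(completed)  # 100
```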
Example of Adding a Callback After Task is Done
A done callback function can be added to a task via the Future object after the task is already completed.
This is an interesting case, because the worker thread has already finished the task and moved on, so it has no opportunity to run a callback that is added later.
As such, we would expect the done callback to be executed by a different thread.
We can update the first example where a single task is issued with a done callback and both the task and callback report their thread.
In this case, we will simply add a delay in the main thread between issuing the task and adding the done callback. The two-second delay is long enough that the one-second task will almost certainly be done before the callback is added.
```python
...
# wait for the task to complete entirely
sleep(2)
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# report the thread that runs the done callback function added after the task is done
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from threading import current_thread

# target function run in the thread pool
def task():
    # block for a moment
    sleep(1)
    # retrieve access to the worker thread
    thread = current_thread()
    # report the details of the worker thread
    print(f'Task: {thread}')

# custom done callback function
def callback(future):
    # retrieve access to the current thread
    thread = current_thread()
    # report the details of the worker thread
    print(f'Callback: {thread}')

# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue the task to the thread pool
    future = tpe.submit(task)
    # wait for the task to complete entirely
    sleep(2)
    # add the done callback function
    future.add_done_callback(callback)
    # shutdown and wait for the task to complete
```
Running the example first creates the ThreadPoolExecutor.
Next, the task is issued to the thread pool and a Future object is returned.
The main thread then blocks for two seconds.
The task is executed and reports the details of the thread that executes it.
The main thread resumes and adds the done callback to the Future.
Because the task is already completed, the callback executes immediately.
In this case, we can see that the calling thread, the main thread, executes the done callback function.
```
Task: <Thread(ThreadPoolExecutor-0_0, started 123145377968128)>
Callback: <_MainThread(MainThread, started 4383403520)>
```
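Sleeping for two seconds makes it very likely, but does not guarantee, that the task is done before the callback is added. A sketch that removes this timing assumption waits on the Future itself with concurrent.futures.wait() before adding the callback. Here the task returns its worker thread name instead of printing it, and the callback_threads list is a hypothetical addition used to record which thread ran the callback.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from threading import current_thread
from time import sleep

# names of the threads that ran the callback
callback_threads = []

# target function run in the thread pool, returns the worker thread name
def task():
    sleep(1)
    return current_thread().name

# done callback that records which thread is running it
def callback(future):
    callback_threads.append(current_thread().name)

# create a thread pool
with ThreadPoolExecutor() as tpe:
    # issue the task to the thread pool
    future = tpe.submit(task)
    # block until the task is done, rather than guessing with sleep()
    wait([future])
    # the task is already done, so the callback runs immediately in this thread
    future.add_done_callback(callback)

print(f'Task: {future.result()}')
print(f'Callback: {callback_threads[0]}')
```

Run as a script, this should report a ThreadPoolExecutor worker thread for the task and the main thread for the callback, matching the output above.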
Takeaways
You now know which thread runs the done callback functions in the ThreadPoolExecutor.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.