Last Updated on September 12, 2022
You can show progress of tasks in the ThreadPoolExecutor by using a callback function.
In this tutorial, you will discover how to show progress of tasks in a Python thread pool.
Let’s get started.
Need to Show Progress of Tasks in the ThreadPoolExecutor
The ThreadPoolExecutor in Python provides a pool of reusable threads for executing ad hoc tasks.
You can submit tasks to the thread pool by calling the submit() function and passing in the name of the function you wish to execute on another thread.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
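As a minimal sketch of this flow, the snippet below submits one task and reads its result from the returned Future. The `square()` function is a hypothetical task made up for illustration and is not part of the tutorial's later examples.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task used only for illustration
def square(value):
    return value * value

with ThreadPoolExecutor(2) as executor:
    # submit() returns immediately with a Future for the task
    future = executor.submit(square, 4)
    # result() blocks until the task has finished
    result = future.result()

# report the result and whether the task is done
print(result, future.done())
```

The Future is the handle we will attach progress callbacks to in the sections that follow.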
You may need to show progress of all or a subset of tasks submitted to the thread pool with the submit() function.
This may be for many reasons, such as reporting progress to the user or internally keeping track of how much time or work remains to be completed.
How can we show progress of completed tasks in the ThreadPoolExecutor in a standardized way?
Use add_done_callback() to Show Progress
You can add a callback function to provide a standard way to track progress for completed tasks.
This can be achieved by calling the add_done_callback() function on the Future object for each task and specifying a custom function to track or report progress.
```python
...
# add a callback function to a task
future.add_done_callback(custom_callback)
```
Recall that we get a Future object when calling the submit() function on the ThreadPoolExecutor when submitting a task.
The callback function must have a single argument, which is the Future object on which it was called.
```python
# task callback function
def custom_callback(future):
    # do something...
    pass
```
Using a callback is not the only way to show progress. For example, you can enumerate tasks as they are completed in the main thread via the as_completed() function.
```python
...
# report progress as tasks are completed
for future in as_completed(futures):
    print('Another task has completed!')
```
The problem with this approach is that it requires that you mix both result processing and task progress indication code together, which may be less clean than using a callback function.
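To make the trade-off concrete, here is a rough sketch of the as_completed() approach with a running count. The `work()` task and the `results` list are assumptions made up for this illustration; note how result handling and progress reporting end up in the same loop.

```python
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor, as_completed

# hypothetical task: sleep briefly, then return its argument
def work(value):
    sleep(random() * 0.1)
    return value

results = []
with ThreadPoolExecutor(2) as executor:
    futures = [executor.submit(work, i) for i in range(5)]
    # progress reporting and result handling share one loop
    for count, future in enumerate(as_completed(futures), start=1):
        results.append(future.result())
        print(f'{count}/{len(futures)} tasks done')
```

Because the loop runs in the main thread only, no lock is needed for the count, but the main thread is tied up until every task has finished.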
The callback function will only be called when the task is completed, i.e. “done”.
A task may be completed in one of three ways:
- The task finishes successfully.
- The task is cancelled.
- The task raises an exception that is not handled.
As such, you may want to check the status of the task to see if it was cancelled or if an exception was raised in the callback function before reporting on the progress.
```python
# task callback function
def custom_callback(future):
    # check if the task was cancelled
    if future.cancelled():
        # the task was cancelled
        pass
    elif future.exception():
        # the task raised an exception
        pass
    else:
        # the task finished successfully
        pass
```
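The three outcomes can be exercised in one small program. This is only a sketch under stated assumptions: `report_status()`, `failing_task()`, the `gate` event and the `outcomes` list are hypothetical names invented here, and a single-worker pool is used so that one task is still queued and can be cancelled.

```python
from threading import Event
from concurrent.futures import ThreadPoolExecutor

# hypothetical list recording what the callback observed
outcomes = []

def report_status(future):
    # check cancelled() first: exception() raises CancelledError on a cancelled task
    if future.cancelled():
        outcomes.append('cancelled')
    elif future.exception() is not None:
        outcomes.append('failed')
    else:
        outcomes.append('ok')

# hypothetical task that always fails
def failing_task():
    raise ValueError('something went wrong')

gate = Event()
with ThreadPoolExecutor(1) as executor:
    # the first task occupies the only worker until the gate is opened
    blocker = executor.submit(gate.wait)
    # these two tasks wait in the queue behind it
    failing = executor.submit(failing_task)
    queued = executor.submit(print, 'never runs')
    for future in (blocker, failing, queued):
        future.add_done_callback(report_status)
    # a task that has not started yet can still be cancelled
    queued.cancel()
    # let the remaining tasks run
    gate.set()

print(outcomes)
```

Checking `cancelled()` before `exception()` matters, because asking a cancelled Future for its exception raises `CancelledError` inside the callback.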
There are many ways to report progress in the callback function.
For example, you can print a character, one for each task that is completed.
```python
...
# show progress for one task
print('.', end='', flush=True)
```
Now that we know how to show the progress of tasks in a standard way, let’s look at a worked example.
Example of Showing Progress of Tasks
Let’s look at how we might show the progress of tasks completed in the ThreadPoolExecutor.
First, let’s define a mock task that sleeps for a fraction of a second.
```python
# mock task that works for a moment
def task(name):
    sleep(random())
```
Next, we can define our callback function that will take a Future object and report progress by printing one character for each task that completes.
This is a generic and scalable way of showing progress for tasks, although it does not provide an indication of how many tasks remain to be executed.
```python
# simple progress indicator callback function
def progress_indicator(future):
    print('.', end='', flush=True)
```
Next, we can create a thread pool with two threads and submit many tasks.
```python
...
# start the thread pool
with ThreadPoolExecutor(2) as executor:
    # send in the tasks
    futures = [executor.submit(task, i) for i in range(20)]
```
We can then register the callback with each task, which will be executed after each task has completed.
```python
...
# register the progress indicator callback
for future in futures:
    future.add_done_callback(progress_indicator)
```
That’s it.
Tying this together, the complete example of showing a progress indicator of completed tasks is listed below.
```python
# SuperFastPython.com
# example of a simple progress indicator for tasks
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# simple progress indicator callback function
def progress_indicator(future):
    print('.', end='', flush=True)

# mock task that works for a moment
def task(name):
    sleep(random())

# start the thread pool
with ThreadPoolExecutor(2) as executor:
    # send in the tasks
    futures = [executor.submit(task, i) for i in range(20)]
    # register the progress indicator callback
    for future in futures:
        future.add_done_callback(progress_indicator)
    # exiting the context manager waits for all tasks to complete
print('\nDone!')
```
Running the example creates the thread pool and submits twenty tasks for execution.
The tasks complete one-by-one, reporting the progress of completed tasks with printed dots.
Once all 20 tasks have completed with all 20 dots printed, we can carry on with our program.
```
....................
Done!
```
How to Count Tasks Completed
We may want to report the total number of tasks that have completed and/or the total tasks that remain to be executed.
This requires a counter of the total number of tasks completed so far.
Multiple threads may try to update or report the counter at the same time, which may leave the counter inconsistent or cause incorrect values to be reported.
```python
...
# update the counter
tasks_completed += 1
# report progress
print(f'{tasks_completed}/{tasks_total} completed, {tasks_total-tasks_completed} remain.')
```
Reading and writing the counter must be made thread-safe, i.e. an operation that only one thread can perform at a time.
This can be achieved using a threading.Lock object, which must be acquired before the counter can be updated or reported.
Once the lock is held by one thread, any other thread must wait to acquire the lock before it can update the counter. This waiting is performed automatically. As soon as the lock is released, another thread may acquire the lock and update the counter.
```python
...
# acquire the lock
lock.acquire()
# update the counter
tasks_completed += 1
# report progress
print(f'{tasks_completed}/{tasks_total} completed, {tasks_total-tasks_completed} remain.')
# release the lock
lock.release()
```
The lock can be acquired using a context manager, ensuring it is released automatically once we are finished with it.
```python
...
# obtain the lock
with lock:
    # update the counter
    tasks_completed += 1
    # report progress
    print(f'{tasks_completed}/{tasks_total} completed, {tasks_total-tasks_completed} remain.')
```
The callback function is called with only the Future object as its argument, so the lock and counters must reach it some other way, such as through global variables or by binding them in advance (e.g. with functools.partial). Alternatively, progress can be tracked in a thread that enumerates tasks as they are completed via the as_completed() function.
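For readers who prefer to avoid globals, one possible alternative is to bind the shared state to the callback ahead of time with `functools.partial`. This is a sketch of that idea and not the approach used in the rest of this tutorial; the `ProgressTracker` class and the two-argument callback are hypothetical names introduced for this illustration.

```python
from threading import Lock
from functools import partial
from concurrent.futures import ThreadPoolExecutor

# hypothetical helper that bundles the lock and counters together
class ProgressTracker:
    def __init__(self, total):
        self.total = total
        self.completed = 0
        self.lock = Lock()

# callback that takes the tracker ahead of the Future
def progress_indicator(tracker, future):
    with tracker.lock:
        tracker.completed += 1
        print(f'{tracker.completed}/{tracker.total} completed')

# mock task that returns immediately
def task(name):
    return name

tracker = ProgressTracker(total=10)
with ThreadPoolExecutor(2) as executor:
    for i in range(tracker.total):
        future = executor.submit(task, i)
        # partial() binds the tracker, leaving the Future as the only remaining argument
        future.add_done_callback(partial(progress_indicator, tracker))
```

The lock is still required here, since callbacks may run in different worker threads.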
Let’s use global variables for the lock, the counter, and the total number of tasks and access them from the callback function called by the ThreadPoolExecutor as each task is completed.
The updated callback function to report the total complete and total remaining tasks in the thread pool is listed below.
```python
# simple progress indicator callback function
def progress_indicator(future):
    global lock, tasks_total, tasks_completed
    # obtain the lock
    with lock:
        # update the counter
        tasks_completed += 1
        # report progress
        print(f'{tasks_completed}/{tasks_total} completed, {tasks_total-tasks_completed} remain.')
```
We then create the lock and counters at the beginning of our program.
```python
...
# create a lock for the counter
lock = Lock()
# total tasks we will execute
tasks_total = 20
# total completed tasks
tasks_completed = 0
```
Tying this together, the complete example of keeping track of the number of complete and remaining tasks is listed below.
```python
# SuperFastPython.com
# example of a simple progress indicator for tasks
from time import sleep
from random import random
from threading import Lock
from concurrent.futures import ThreadPoolExecutor

# simple progress indicator callback function
def progress_indicator(future):
    global lock, tasks_total, tasks_completed
    # obtain the lock
    with lock:
        # update the counter
        tasks_completed += 1
        # report progress
        print(f'{tasks_completed}/{tasks_total} completed, {tasks_total-tasks_completed} remain.')

# mock task that works for a moment
def task(name):
    sleep(random())

# create a lock for the counter
lock = Lock()
# total tasks we will execute
tasks_total = 20
# total completed tasks
tasks_completed = 0
# start the thread pool
with ThreadPoolExecutor(2) as executor:
    # send in the tasks
    futures = [executor.submit(task, i) for i in range(20)]
    # register the progress indicator callback
    for future in futures:
        future.add_done_callback(progress_indicator)
    # exiting the context manager waits for all tasks to complete
print('Done!')
```
Running the example creates the thread pool, submits the tasks, and registers the callback function as before.
As each task completes, the updated callback function is called. It first updates the counter, then reports the total completed and total remaining tasks, all while holding the lock so that the update and report are thread-safe.
```
1/20 completed, 19 remain.
2/20 completed, 18 remain.
3/20 completed, 17 remain.
4/20 completed, 16 remain.
5/20 completed, 15 remain.
6/20 completed, 14 remain.
7/20 completed, 13 remain.
8/20 completed, 12 remain.
9/20 completed, 11 remain.
10/20 completed, 10 remain.
11/20 completed, 9 remain.
12/20 completed, 8 remain.
13/20 completed, 7 remain.
14/20 completed, 6 remain.
15/20 completed, 5 remain.
16/20 completed, 4 remain.
17/20 completed, 3 remain.
18/20 completed, 2 remain.
19/20 completed, 1 remain.
20/20 completed, 0 remain.
Done!
```
Takeaways
You now know how to show progress of tasks in the ThreadPoolExecutor.
Do you have any questions about how to show progress of tasks?
Ask your questions in the comments below and I will do my best to answer.