Last Updated on September 12, 2022
You can check the number of remaining tasks in the ThreadPoolExecutor by checking the size of its protected _work_queue member.
In this tutorial, you will discover how to check the number of remaining tasks in the ThreadPoolExecutor.
Let’s get started.
Need to Check the Number of Remaining Tasks
The ThreadPoolExecutor provides a flexible way to execute ad hoc tasks using a pool of worker threads.
You can submit tasks to the thread pool by calling the submit() method and passing in the name of the function you wish to execute on another thread.
Calling submit() returns a Future object that allows you to check on the status of the task and get its result once the task completes.
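As a minimal sketch of this workflow, the snippet below submits one task and queries its Future. The work() function and its argument are made up purely for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task used only for illustration
def work(value):
    return value * 2

# create a small thread pool and submit a single task
with ThreadPoolExecutor(2) as executor:
    future = executor.submit(work, 21)
    # block until the task finishes and retrieve its return value
    result = future.result()
    # the task is reported as done once result() has returned
    finished = future.done()

print(result, finished)
```

Holding on to the Future like this is what makes status checks easy. The rest of this tutorial is concerned with cases where we want a count across many tasks.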
We may need to check the number of remaining tasks in the ThreadPoolExecutor.
This may be for many reasons, such as submitting tasks and not retaining or having access to the associated Future objects.
How can we check the number of tasks that remain in the ThreadPoolExecutor?
How to Check the Number of Tasks Remaining in the ThreadPoolExecutor
There are a number of ways that we can check the number of tasks that remain in the ThreadPoolExecutor.
One approach is to report the number of pending tasks. That is, tasks that have been submitted but have not started executing.
This can be achieved by checking the size of the _work_queue protected member of the ThreadPoolExecutor class.
This is an instance of the SimpleQueue class and we can check the number of items in the queue via the qsize() function.
For example:
```python
...
# check the number of scheduled tasks
size = executor._work_queue.qsize()
```
Adding this value to the number of worker threads in the thread pool gives a reasonable approximation of the number of remaining tasks, because each worker may be busy executing a task that has already been taken off the queue.
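The sketch below illustrates why the worker count is added. The estimate_remaining() helper and the blocking_task() function are hypothetical names introduced for this illustration, and _work_queue is a private attribute that could change between Python versions. The tasks are held open with an Event so that the state of the pool is predictable when we inspect it.

```python
from threading import Event, Semaphore
from concurrent.futures import ThreadPoolExecutor

# hypothetical helper: queued tasks plus (at most) one running task per worker
def estimate_remaining(executor, n_workers):
    # _work_queue is a private attribute and may change between Python versions
    return executor._work_queue.qsize() + n_workers

started = Semaphore(0)  # released once by each task when it begins running
release = Event()       # set by the main thread to let the tasks finish

# task that signals that it has started, then waits to be released
def blocking_task():
    started.release()
    release.wait()

n_workers = 2
with ThreadPoolExecutor(n_workers) as executor:
    futures = [executor.submit(blocking_task) for _ in range(5)]
    # wait until both workers have each picked up a task
    for _ in range(n_workers):
        started.acquire()
    # two tasks are running, so only the other three are still in the queue
    queued = executor._work_queue.qsize()
    estimate = estimate_remaining(executor, n_workers)
    print(f'{queued} queued, about {estimate} remaining')
    # let all tasks run to completion
    release.set()
```

With five tasks and two busy workers, the queue alone under-reports the work left, and adding the worker count recovers the full figure. Once the queue is empty the same sum becomes an upper bound rather than an exact count.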
If we know how many tasks were submitted to the thread pool, we can also check the number of remaining tasks by counting the number of tasks that have been completed.
One approach would be to collect all of the Future objects for the tasks in the thread pool, then use as_completed() in a for loop and update a counter as each task is done.
For example:
```python
...
# the number of tasks that are completed
completed = 0
# update each time a task finishes
for _ in as_completed(futures):
    # check the number of remaining tasks
    completed += 1
    size = TOTAL_TASKS - completed
```
A similar approach to this would be to update and report the number of remaining tasks using a done callback for each Future.
This can be achieved by defining a function that takes a Future object as an argument and registering it on the Future for each task, shortly after the task is submitted, by calling the add_done_callback() method.
The callback function can update a counter for the number of completed tasks and then report the number of tasks that remain. A lock is required as the callback function is called by the worker thread for the task and may be called concurrently by multiple worker threads.
An example of a callback function to check and report the number of remaining tasks is listed below.
```python
# callback that keeps track of the number of completed tasks
def completed_callback(future):
    global completed, lock
    with lock:
        completed += 1
        # check the number of remaining tasks
        size = TOTAL_TASKS - completed
        # report the total number of tasks that remain
        print(f'About {size} tasks remain')
```
This function can be registered on Future objects as follows:
```python
...
# add callback for checking the number of remaining tasks
future.add_done_callback(completed_callback)
```
The downside of these last two approaches is that they require that you know how many tasks were submitted to the thread pool and that you have access to the Future object for each submitted task, at least at the time it is submitted.
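One way to soften this downside is to do the bookkeeping at the point of submission. The sketch below is only an illustration: TrackingExecutor is a hypothetical wrapper class, not part of the standard library, that counts each task as it is submitted and uses a done callback to decrement the count as tasks finish.

```python
from threading import Lock
from concurrent.futures import ThreadPoolExecutor

# hypothetical wrapper that counts unfinished tasks on behalf of the caller
class TrackingExecutor:
    def __init__(self, max_workers):
        self._executor = ThreadPoolExecutor(max_workers)
        self._lock = Lock()
        self._remaining = 0

    def submit(self, fn, *args, **kwargs):
        # count the task before it is handed to the pool
        with self._lock:
            self._remaining += 1
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except Exception:
            # the task was never accepted, so undo the count
            with self._lock:
                self._remaining -= 1
            raise
        future.add_done_callback(self._task_done)
        return future

    def _task_done(self, future):
        # called when a task finishes, fails, or is cancelled
        with self._lock:
            self._remaining -= 1

    def remaining(self):
        # number of submitted tasks that are not yet done
        with self._lock:
            return self._remaining

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)

# usage: submit some tasks, then check the count once the pool has drained
pool = TrackingExecutor(4)
for i in range(10):
    pool.submit(pow, i, 2)
pool.shutdown()
print(pool.remaining())
```

Code that calls remaining() does not need to know the total number of tasks or hold any Future objects, although all tasks must then be submitted through the wrapper.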
Now that we know how to check the number of remaining tasks in the thread pool, let’s look at some worked examples.
Example of Checking the Number of Remaining Tasks Using Queue Size
Let’s explore how we can report the number of remaining tasks using the size of the queue within the ThreadPoolExecutor.
Firstly, let’s define a mock task that will sleep for a moment.
```python
# mock task that works for a moment
def task():
    sleep(random())
```
Next, we can create a thread pool with four worker threads and submit 50 tasks.
```python
...
# number of worker threads
n_workers = 4
# start the thread pool
with ThreadPoolExecutor(n_workers) as executor:
    # submit many tasks
    futures = [executor.submit(task) for _ in range(50)]
```
In this example, we want to report an estimate of the number of remaining tasks frequently. One approach is to report the number of remaining tasks as each task in the pool is completed.
This can be achieved by passing a list of all Future objects to the as_completed() function.
```python
...
# update each time a task finishes
for _ in as_completed(futures):
    # ...
```
Each iteration, we can then access the _work_queue member of the ThreadPoolExecutor class and report the size of this queue via the qsize() function.
This will report the total number of tasks that are queued up and ready for execution. If we add the number of worker threads in the pool to this number, we will get a rough estimate of the number of remaining tasks.
```python
...
# report the number of remaining tasks
size = executor._work_queue.qsize() + n_workers
print(f'About {size} tasks remain')
```
Tying this together, the complete example of checking and reporting the number of remaining tasks in the thread pool is listed below.
```python
# SuperFastPython.com
# example of estimating the number of remaining tasks using a protected member
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# mock task that works for a moment
def task():
    sleep(random())

# number of worker threads
n_workers = 4
# start the thread pool
with ThreadPoolExecutor(n_workers) as executor:
    # submit many tasks
    futures = [executor.submit(task) for _ in range(50)]
    # update each time a task finishes
    for _ in as_completed(futures):
        # report the number of remaining tasks
        size = executor._work_queue.qsize() + n_workers
        print(f'About {size} tasks remain')
```
Running the example creates the thread pool and submits the tasks as per normal.
Then, as tasks are completed, the count of the number of remaining tasks is reported.
We can see that the results are quite reasonable until the queue of tasks is empty, and then the estimate is fixed at a value of four. We could fix this by reporting something like “four or fewer tasks remain” at this point.
```
...
About 10 tasks remain
About 9 tasks remain
About 8 tasks remain
About 7 tasks remain
About 6 tasks remain
About 5 tasks remain
About 4 tasks remain
About 4 tasks remain
About 4 tasks remain
About 4 tasks remain
About 4 tasks remain
```
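The suggested fix for the stuck estimate could be sketched as a small formatting helper. The remaining_message() function below is a hypothetical name for illustration. Inside the reporting loop it would be called with executor._work_queue.qsize() and n_workers.

```python
# hypothetical helper that turns a queue size into a progress message
def remaining_message(queue_size, n_workers):
    if queue_size > 0:
        # queued tasks plus roughly one running task per worker
        return f'About {queue_size + n_workers} tasks remain'
    # the queue is empty, so only tasks that are already running can remain
    return f'{n_workers} or fewer tasks remain'

# example messages for a pool with four workers
print(remaining_message(6, 4))
print(remaining_message(0, 4))
```

Keeping the wording separate from the measurement makes it clear to the reader of the output when the figure switches from an estimate to an upper bound.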
We might not have access to the Future objects for tasks submitted to the thread pool.
In this case, we can spin and check the _work_queue each iteration until the queue of tasks is empty.
For example:
```python
...
# report the number of remaining tasks
while executor._work_queue.qsize() > 0:
    size = executor._work_queue.qsize() + n_workers
    print(f'About {size} tasks remain')
    sleep(0.1)
print('4 or fewer tasks remain...')
```
This is not ideal as the main thread will perform unnecessary computation, but it will provide a simple way of checking and reporting the progress of the thread pool without access to the Future objects.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# estimate remaining tasks using a protected member and no futures
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# mock task that works for a moment
def task():
    sleep(random())

# number of worker threads
n_workers = 4
# start the thread pool
with ThreadPoolExecutor(n_workers) as executor:
    # submit many tasks
    _ = [executor.submit(task) for _ in range(50)]
    # report the number of remaining tasks
    while executor._work_queue.qsize() > 0:
        size = executor._work_queue.qsize() + n_workers
        print(f'About {size} tasks remain')
        sleep(0.1)
    print('4 or fewer tasks remain...')
```
Running the example creates the thread pool and submits the fifty tasks as before.
We can see that this approach will report the same number of tasks occasionally and skip over numbers in the count as it is operating based on a sleep timer rather than responding to the actual completion of tasks in the pool.
```
...
About 20 tasks remain
About 19 tasks remain
About 17 tasks remain
About 14 tasks remain
About 14 tasks remain
About 14 tasks remain
About 13 tasks remain
About 13 tasks remain
About 11 tasks remain
About 11 tasks remain
About 10 tasks remain
About 10 tasks remain
About 8 tasks remain
About 8 tasks remain
About 7 tasks remain
About 7 tasks remain
About 6 tasks remain
About 5 tasks remain
4 or fewer tasks remain...
```
Example of Checking the Number of Remaining Tasks With a Count
A simpler approach to checking and reporting the number of remaining tasks is to simply count.
This requires both access to the Future objects, in order to update the count as tasks finish, and a record of the number of tasks that were submitted in the first place.
First, we can define a constant that specifies the number of tasks that will be submitted.
```python
...
# total tasks
TOTAL_TASKS = 50
```
Next, we can define a variable that will keep track of the number of tasks that have been completed.
```python
...
# the number of tasks that are completed
completed = 0
```
Finally, we can use the as_completed() function to iterate Future objects as tasks are completed and update the count and report the current number of tasks that remain.
```python
...
# update each time a task finishes
for _ in as_completed(futures):
    # report the number of remaining tasks
    completed += 1
    size = TOTAL_TASKS - completed
    print(f'About {size} tasks remain to be completed')
```
Tying this together, the complete example of using a count in the main thread to report the number of remaining tasks in the thread pool is listed below.
```python
# SuperFastPython.com
# example of estimating the number of remaining tasks using a count
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# mock task that works for a moment
def task():
    sleep(random())

# total tasks
TOTAL_TASKS = 50
# start the thread pool
with ThreadPoolExecutor(4) as executor:
    # submit many tasks
    futures = [executor.submit(task) for _ in range(TOTAL_TASKS)]
    # the number of tasks that are completed
    completed = 0
    # update each time a task finishes
    for _ in as_completed(futures):
        # report the number of remaining tasks
        completed += 1
        size = TOTAL_TASKS - completed
        print(f'About {size} tasks remain to be completed')
```
Running the example creates the thread pool and submits fifty tasks.
As tasks are completed, the count of completed tasks is updated and we are able to report an accurate estimate of the number of tasks that remain.
This is a reliable approach but does require access to the Future objects and the count of tasks that were submitted.
```
...
About 10 tasks remain to be completed
About 9 tasks remain to be completed
About 8 tasks remain to be completed
About 7 tasks remain to be completed
About 6 tasks remain to be completed
About 5 tasks remain to be completed
About 4 tasks remain to be completed
About 3 tasks remain to be completed
About 2 tasks remain to be completed
About 1 tasks remain to be completed
About 0 tasks remain to be completed
```
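A variation on counting, sketched here as one possible alternative rather than the approach used in the example above, is to let the wait() function from concurrent.futures track the unfinished Future objects. It returns the done and not-done sets, so the size of the not-done set is the number of remaining tasks and no separate total is needed. The shortened sleep and the counts list are assumptions added to keep the sketch quick and easy to inspect.

```python
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

# mock task that works for a moment (shortened for this sketch)
def task():
    sleep(random() * 0.1)

with ThreadPoolExecutor(4) as executor:
    # keep the futures that have not finished yet in a set
    pending = {executor.submit(task) for _ in range(20)}
    # record of each reported count, kept only for inspection
    counts = []
    while pending:
        # block until at least one of the pending tasks has finished
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        counts.append(len(pending))
        print(f'{len(pending)} tasks remain')
```

Because several tasks may finish between calls to wait(), the reported count can drop by more than one at a time.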
Example of Checking the Number of Remaining Tasks With a Callback
Another approach to checking and reporting the number of remaining tasks is to use a done callback.
This requires that we know how many tasks were submitted, but does not require the Future objects, beyond registering the callback.
As before, we must define the total number of tasks that were submitted. In this case, we will use a global constant.
```python
...
# total tasks
TOTAL_TASKS = 50
```
Next, we need to define a count variable for the number of tasks that were completed and a mutual exclusion lock.
A Lock is needed to update the count of completed tasks in the callback. This is because the callback function is executed by the worker thread that completed the task, so multiple worker threads may attempt to increment the count at the same time.
The Lock protects the critical section of updating the count by ensuring only one thread can execute the block at the same time.
```python
...
# lock for protecting the completed count
lock = Lock()
# the number of tasks that are completed
completed = 0
```
Next, we can define our callback function.
The function will need access to the count of completed tasks and the lock, both of which are global variables.
The function can then acquire the lock using the context manager, a preferred approach so that we don’t have to explicitly call acquire() and release() on the lock.
Once we have the lock, the function can update the count and report the current progress in terms of the number of remaining tasks.
The complete callback function is listed below.
```python
# callback that keeps track of the number of completed tasks
def completed_callback(future):
    global completed, lock
    with lock:
        completed += 1
        # check the number of remaining tasks
        size = TOTAL_TASKS - completed
        # report the total number of tasks that remain
        print(f'About {size} tasks remain')
```
Next, all that remains is to register the callback function on the Future object for each submitted task.
This can be achieved by calling the add_done_callback() function and specifying the callback function name.
```python
...
# add callbacks
for future in futures:
    future.add_done_callback(completed_callback)
```
And that’s it.
Tying this together, the complete example of checking and reporting the number of remaining tasks in the ThreadPoolExecutor using a callback is listed below.
```python
# SuperFastPython.com
# example of estimating the number of remaining tasks with a callback
from time import sleep
from random import random
from threading import Lock
from concurrent.futures import ThreadPoolExecutor

# mock task that works for a moment
def task():
    sleep(random())

# callback that keeps track of the number of completed tasks
def completed_callback(future):
    global completed, lock
    with lock:
        completed += 1
        # check the number of remaining tasks
        size = TOTAL_TASKS - completed
        # report the total number of tasks that remain
        print(f'About {size} tasks remain')

# total tasks
TOTAL_TASKS = 50
# lock for protecting the completed count
lock = Lock()
# the number of tasks that are completed
completed = 0
# start the thread pool
with ThreadPoolExecutor(4) as executor:
    # submit many tasks
    futures = [executor.submit(task) for _ in range(TOTAL_TASKS)]
    # add callbacks
    for future in futures:
        future.add_done_callback(completed_callback)
    print('Waiting for tasks to complete...')
```
Running the example creates the thread pool and submits the tasks as before.
The progress is then reported as each task is completed, providing an accurate count of the number of remaining tasks without the main thread having to iterate over the Future objects as they complete.
```
...
About 10 tasks remain
About 9 tasks remain
About 8 tasks remain
About 7 tasks remain
About 6 tasks remain
About 5 tasks remain
About 4 tasks remain
About 3 tasks remain
About 2 tasks remain
About 1 tasks remain
About 0 tasks remain
```
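One detail of the callback approach is worth keeping in mind. If a task has already finished by the time add_done_callback() is called, the callback is executed immediately in the thread that registers it, rather than in a worker thread. The sketch below demonstrates this with a hypothetical record_thread() callback. The count remains correct in this case too, since the lock protects it regardless of which thread runs the callback.

```python
from threading import current_thread
from concurrent.futures import ThreadPoolExecutor

# threads that executed the callback, recorded for inspection
callback_threads = []

# hypothetical callback that remembers which thread executed it
def record_thread(future):
    callback_threads.append(current_thread())

# the thread that will register the callback
caller = current_thread()
with ThreadPoolExecutor(1) as executor:
    future = executor.submit(sum, [1, 2, 3])
    # wait for the task to finish before registering the callback
    future.result()
    # the task is already done, so the callback runs right away in this thread
    future.add_done_callback(record_thread)

print(callback_threads[0] is caller)
```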
Takeaways
You now know how to check the number of tasks remaining in the ThreadPoolExecutor.
Do you have any questions about how to check the number of remaining tasks?
Ask your questions in the comments below and I will do my best to answer.