Number of Remaining Tasks in the ThreadPoolExecutor

November 26, 2021 Python ThreadPoolExecutor

You can check the number of remaining tasks in the ThreadPoolExecutor using the size of its _work_queue protected member.

In this tutorial, you will discover how to check the number of remaining tasks in the ThreadPoolExecutor.

Let's get started.

Need to Check the Number of Remaining Tasks

The ThreadPoolExecutor provides a flexible way to execute ad hoc tasks using a pool of worker threads.

You can submit tasks to the thread pool by calling the submit() function and passing in the function you wish to execute on another thread, followed by any arguments for it.

Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
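As a brief illustration of this API (the pow() task is an arbitrary choice for the demonstration), submit() returns a Future immediately, and result() blocks until the task has finished:

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(2) as executor:
    # submit pow(2, 10) to run on a worker thread; a Future comes back at once
    future = executor.submit(pow, 2, 10)
    # result() blocks until the task has completed, then returns its value
    value = future.result()
    # after result() returns, the Future reports that the task is done
    finished = future.done()

print(value, finished)  # 1024 True
```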

We may need to check the number of remaining tasks in the ThreadPoolExecutor.

This may be for many reasons, such as submitting tasks and not retaining or having access to the associated Future objects.

How can we check the number of tasks that remain in the ThreadPoolExecutor?

How to Check the Number of Tasks Remaining in the ThreadPoolExecutor

There are a number of ways that we can check the number of tasks that remain in the ThreadPoolExecutor.

One approach is to report the number of pending tasks. That is, tasks that have been submitted but have not started executing.

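This can be achieved by checking the size of the _work_queue protected member of the ThreadPoolExecutor class. Note that this member is an implementation detail rather than part of the public API, so it may change between Python versions.

This is an instance of the queue.SimpleQueue class, and we can check the approximate number of items in it via its qsize() method.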

For example:

...
# check the number of scheduled tasks
size = executor._work_queue.qsize()
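To see what qsize() measures, the following self-contained sketch keeps every worker busy so that later submissions have to wait in the queue. The gate Event, the started Semaphore and the blocking_task() function are hypothetical helpers for the demonstration, and the code still relies on the protected _work_queue member, so treat it as a sketch of current CPython behavior rather than a guaranteed API.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()          # hypothetical: keeps tasks running until set
started = threading.Semaphore(0)  # released once by every task that starts

def blocking_task():
    started.release()  # signal that a worker has picked up this task
    gate.wait()        # hold the worker until the gate opens

n_workers = 2
executor = ThreadPoolExecutor(n_workers)
# give every worker a task that blocks
for _ in range(n_workers):
    executor.submit(blocking_task)
# wait until both workers are actually running their tasks
for _ in range(n_workers):
    started.acquire()
# no worker is free, so these tasks wait in the work queue
for _ in range(5):
    executor.submit(blocking_task)
# check the number of scheduled (not yet started) tasks
pending = executor._work_queue.qsize()
print(f'{pending} tasks pending')  # 5 tasks pending
# open the gate so that every task can finish, then clean up
gate.set()
executor.shutdown(wait=True)
```

The size is read before shutdown() is called, since shutting down the pool places extra items in the work queue to wake the workers.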


Adding the number of worker threads in the thread pool to this value gives a reasonable approximation of the number of remaining tasks, assuming every worker is busy executing a task.
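One way to make this approximation explicit is to report a range rather than a single number: every queued task is unfinished, and each worker may be running at most one more. The remaining_bounds() helper and the gate events below are hypothetical, used only to produce a predictable state in which one worker is busy and two tasks are queued.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def remaining_bounds(executor, n_workers):
    """Hypothetical helper: (lower, upper) bounds on the number of unfinished tasks.

    Relies on the protected _work_queue member, an implementation detail.
    """
    queued = executor._work_queue.qsize()
    # every queued task is unfinished, and each worker may be running one more
    return queued, queued + n_workers

gate = threading.Event()           # hypothetical: holds tasks open until set
first_started = threading.Event()  # set when a worker begins running a task

def gated_task():
    first_started.set()
    gate.wait()

with ThreadPoolExecutor(1) as executor:
    for _ in range(3):
        executor.submit(gated_task)
    first_started.wait()  # the single worker is now busy with the first task
    bounds = remaining_bounds(executor, 1)
    print(bounds)  # (2, 3)
    gate.set()     # let all tasks finish so the pool can shut down
```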

If we know how many tasks were submitted to the thread pool, we can also check the number of remaining tasks by counting the number of tasks that have been completed.

One approach would be to collect all of the Future objects for the tasks in the thread pool, then use as_completed() in a for loop and update a counter as each task is done.

For example:

...
# the number of tasks that are completed
completed = 0
# update each time a task finishes
for _ in as_completed(futures):
    # check the number of remaining tasks
    completed += 1
    size = TOTAL_TASKS - completed

A similar approach to this would be to update and report the number of remaining tasks using a done callback for each Future.

This can be achieved by defining a function that takes a Future object as its only argument, then registering it on each task's Future object by calling the add_done_callback() function shortly after the task is submitted.


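The callback function can update a counter for the number of completed tasks and then report the number of tasks that remain. A lock is required because the callback is normally called by the worker thread that completed the task, so several worker threads may run it at the same time. If the task has already finished when the callback is registered, the callback is instead called immediately by the thread that registered it.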
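The following sketch shows which thread runs a done callback in each case. It uses a single worker and the thread_name_prefix argument so that the worker's name is predictable; the make_callback() helper and the gate Event are hypothetical names for this demonstration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

calls = []  # (label, thread name) pairs recorded by the callbacks

def make_callback(label):
    # hypothetical helper: builds a callback that records which thread ran it
    def callback(future):
        calls.append((label, threading.current_thread().name))
    return callback

gate = threading.Event()
registering_thread = threading.current_thread().name

with ThreadPoolExecutor(1, thread_name_prefix='worker') as executor:
    # registered while the task is still blocked, so the worker runs the callback
    running = executor.submit(gate.wait)
    running.add_done_callback(make_callback('registered early'))
    gate.set()
    # registered after the task has finished, so it runs right here, immediately
    finished = executor.submit(pow, 2, 10)
    finished.result()
    finished.add_done_callback(make_callback('registered late'))

for label, name in calls:
    print(f'{label}: {name}')
```

The first callback should report a thread name starting with worker, and the second the name of the thread that called add_done_callback().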

An example of a callback function to check and report the number of remaining tasks is listed below.

# callback that keeps track of the number of completed tasks
def completed_callback(future):
    global completed, lock
    with lock:
        completed += 1
        # check the number of remaining tasks
        size = TOTAL_TASKS - completed
        # report the total number of tasks that remain
        print(f'About {size} tasks remain')

This function can be registered on Future objects as follows:

...
# add callback for checking the number of remaining tasks
future.add_done_callback(completed_callback)

The downside of these last two approaches is that they require you to know how many tasks were submitted to the thread pool and to have access to the Future object for each submitted task.

Now that we know how to check the number of remaining tasks in the thread pool, let's look at some worked examples.

Example of Checking the Number of Remaining Tasks Using Queue Size

Let's explore how we can report the number of remaining tasks using the size of the queue within the ThreadPoolExecutor.

Firstly, let's define a mock task that will sleep for a moment.

# mock task that works for a moment
def task():
    sleep(random())

Next, we can create a thread pool with four worker threads and submit 50 tasks.

...
# number of worker threads
n_workers = 4
# start the thread pool
with ThreadPoolExecutor(n_workers) as executor:
    # submit many tasks
    futures = [executor.submit(task) for _ in range(50)]

In this example, we want to report an estimate of the number of remaining tasks frequently. One approach is to report the number of remaining tasks as each task in the pool is completed.

This can be achieved by passing a list of all Future objects to the as_completed() function.

...
# update each time a task finishes
for _ in as_completed(futures):
    ...  # report progress here

On each iteration, we can then access the _work_queue member of the ThreadPoolExecutor and report the size of this queue via its qsize() method.

This will report the total number of tasks that are queued up and ready for execution. If we add the number of worker threads in the pool to this number, we will get a rough estimate of the number of remaining tasks.

...
# report the number of remaining tasks
size = executor._work_queue.qsize() + n_workers
print(f'About {size} tasks remain')

Tying this together, the complete example of checking and reporting the number of remaining tasks in the thread pool is listed below.

# SuperFastPython.com
# example of estimating the number of remaining tasks using a protected member
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# mock task that works for a moment
def task():
    sleep(random())

# number of worker threads
n_workers = 4
# start the thread pool
with ThreadPoolExecutor(n_workers) as executor:
    # submit many tasks
    futures = [executor.submit(task) for _ in range(50)]
    # update each time a task finishes
    for _ in as_completed(futures):
        # report the number of remaining tasks
        size = executor._work_queue.qsize() + n_workers
        print(f'About {size} tasks remain')

Running the example creates the thread pool and submits the tasks as per normal.

Then, as tasks are completed, the count of the number of remaining tasks is reported.

We can see that the results are quite reasonable until the queue of tasks is empty, after which the estimate stays stuck at four, the number of worker threads. We could handle this by reporting something like "four or fewer tasks remain" at this point.

...
About 10 tasks remain
About 9 tasks remain
About 8 tasks remain
About 7 tasks remain
About 6 tasks remain
About 5 tasks remain
About 4 tasks remain
About 4 tasks remain
About 4 tasks remain
About 4 tasks remain
About 4 tasks remain
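A minimal sketch of that fix, assuming a hypothetical describe_remaining() helper that takes the queue size and the number of workers and chooses the message:

```python
def describe_remaining(queued, n_workers):
    """Hypothetical helper: turn the work-queue size into a progress message."""
    if queued > 0:
        # every worker is presumably busy, so count them as remaining tasks
        return f'About {queued + n_workers} tasks remain'
    # the queue is empty: only tasks already running (if any) are left
    return f'{n_workers} or fewer tasks remain'

print(describe_remaining(6, 4))  # About 10 tasks remain
print(describe_remaining(0, 4))  # 4 or fewer tasks remain
```

Inside the as_completed() loop of the complete example, the two reporting lines could then be replaced by print(describe_remaining(executor._work_queue.qsize(), n_workers)).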

We might not have access to the Future objects for tasks submitted to the thread pool.

In this case, we can spin and check the _work_queue each iteration until the queue of tasks is empty.

For example:

...
# report the number of remaining tasks
while executor._work_queue.qsize() > 0:
    size = executor._work_queue.qsize() + n_workers
    print(f'About {size} tasks remain')
    sleep(0.1)
print('4 or fewer tasks remain...')

This is not ideal, as the main thread must keep waking up to poll the queue, but it provides a simple way of checking and reporting the progress of the thread pool without access to the Future objects.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# estimate remaining tasks using a protected member and no futures
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# mock task that works for a moment
def task():
    sleep(random())

# number of worker threads
n_workers = 4
# start the thread pool
with ThreadPoolExecutor(n_workers) as executor:
    # submit many tasks
    _ = [executor.submit(task) for _ in range(50)]
    # report the number of remaining tasks
    while executor._work_queue.qsize() > 0:
        size = executor._work_queue.qsize() + n_workers
        print(f'About {size} tasks remain')
        sleep(0.1)
    print('4 or fewer tasks remain...')

Running the example creates the thread pool and submits the fifty tasks as before.

Because this approach polls the queue on a sleep timer rather than responding to the actual completion of tasks in the pool, it occasionally reports the same number of tasks more than once and sometimes skips over numbers in the count.

...
About 20 tasks remain
About 19 tasks remain
About 17 tasks remain
About 14 tasks remain
About 14 tasks remain
About 14 tasks remain
About 13 tasks remain
About 13 tasks remain
About 11 tasks remain
About 11 tasks remain
About 10 tasks remain
About 10 tasks remain
About 8 tasks remain
About 8 tasks remain
About 7 tasks remain
About 7 tasks remain
About 6 tasks remain
About 5 tasks remain
4 or fewer tasks remain...

Example of Checking the Number of Remaining Tasks With a Count

A simpler approach to checking and reporting the number of remaining tasks is to count the tasks as they complete.

This requires access to the Future objects, in order to update the count as tasks finish, and knowledge of how many tasks were submitted in the first place.

First, we can define a constant that specifies the number of tasks that will be submitted.

...
# total tasks
TOTAL_TASKS = 50

Next, we can define a variable that will keep track of the number of tasks that have been completed.

...
# the number of tasks that are completed
completed = 0

Finally, we can use the as_completed() function to iterate over the Future objects as their tasks complete, updating the count and reporting the current number of tasks that remain.

...
# update each time a task finishes
for _ in as_completed(futures):
    # report the number of remaining tasks
    completed += 1
    size = TOTAL_TASKS - completed
    print(f'About {size} tasks remain to be completed')

Tying this together, the complete example of using a count in the main thread to report the number of remaining tasks in the thread pool is listed below.

# SuperFastPython.com
# example of estimating the number of remaining tasks using a count
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# mock task that works for a moment
def task():
    sleep(random())

# total tasks
TOTAL_TASKS = 50
# start the thread pool
with ThreadPoolExecutor(4) as executor:
    # submit many tasks
    futures = [executor.submit(task) for _ in range(TOTAL_TASKS)]
    # the number of tasks that are completed
    completed = 0
    # update each time a task finishes
    for _ in as_completed(futures):
        # report the number of remaining tasks
        completed += 1
        size = TOTAL_TASKS - completed
        print(f'About {size} tasks remain to be completed')

Running the example creates the thread pool and submits fifty tasks.

As tasks are completed, the count of completed tasks is updated, and we are able to report the exact number of tasks that have not yet completed.

This is a reliable approach but does require access to the Future objects and the count of tasks that were submitted.

...
About 10 tasks remain to be completed
About 9 tasks remain to be completed
About 8 tasks remain to be completed
About 7 tasks remain to be completed
About 6 tasks remain to be completed
About 5 tasks remain to be completed
About 4 tasks remain to be completed
About 3 tasks remain to be completed
About 2 tasks remain to be completed
About 1 tasks remain to be completed
About 0 tasks remain to be completed
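If you do keep the Future objects, a variation on this approach is to count the unfinished ones directly with each Future's done() method, which removes the need to track the total separately. In this sketch, count_remaining() is a hypothetical helper and the gate Event simply keeps every task unfinished until we are ready.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def count_remaining(futures):
    """Hypothetical helper: number of futures not yet done (pending or running)."""
    return sum(1 for future in futures if not future.done())

gate = threading.Event()  # keeps every task unfinished until it is set

with ThreadPoolExecutor(2) as executor:
    futures = [executor.submit(gate.wait) for _ in range(5)]
    before = count_remaining(futures)  # no task can finish while the gate is closed
    gate.set()
# leaving the with block waited for every task to finish
after = count_remaining(futures)
print(before, after)  # 5 0
```

This scans every Future on each call, which is cheap for a few thousand tasks but does more work than the running counter used above.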

Example of Checking the Number of Remaining Tasks With a Callback

Another approach to checking and reporting the number of remaining tasks is to use a done callback.

This requires that we know how many tasks were submitted, but the Future objects are needed only to register the callback; the main thread does not need to keep or iterate over them afterward.

As before, we must define the total number of tasks that were submitted. In this case, we will use a global constant.

...
# total tasks
TOTAL_TASKS = 50

Next, we need to define a count variable for the number of tasks that were completed and a mutual exclusion lock.

A Lock is needed to update the count of completed tasks in the callback. This is because the callback function increments the count and may be executed concurrently by several worker threads in the thread pool, each running the callback for the task it has just finished. The increment is a read-modify-write operation, so unprotected concurrent updates could be lost.

The Lock protects the critical section of updating the count by ensuring only one thread can execute the block at the same time.

...
# lock for protecting the completed count
lock = Lock()
# the number of tasks that are completed
completed = 0

Next, we can define our callback function.

The function will need access to the count of completed tasks and the lock, both of which are global variables.

The function can then acquire the lock using the context manager, a preferred approach so that we don’t have to explicitly call acquire() and release() on the lock.

Once we have the lock, the function can update the count and report the current progress in terms of the number of remaining tasks.

The complete callback function is listed below.

# callback that keeps track of the number of completed tasks
def completed_callback(future):
    global completed, lock
    with lock:
        completed += 1
        # check the number of remaining tasks
        size = TOTAL_TASKS - completed
        # report the total number of tasks that remain
        print(f'About {size} tasks remain')

Next, all that remains is to register the callback function on the Future object for each submitted task.

This can be achieved by calling the add_done_callback() function and specifying the callback function name.

...
# add callbacks
for future in futures:
    future.add_done_callback(completed_callback)

And that's it.

Tying this together, the complete example of checking and reporting the number of remaining tasks in the ThreadPoolExecutor using a callback is listed below.

# SuperFastPython.com
# example of estimating the number of remaining tasks with a callback
from time import sleep
from random import random
from threading import Lock
from concurrent.futures import ThreadPoolExecutor

# mock task that works for a moment
def task():
    sleep(random())

# callback that keeps track of the number of completed tasks
def completed_callback(future):
    global completed, lock
    with lock:
        completed += 1
        # check the number of remaining tasks
        size = TOTAL_TASKS - completed
        # report the total number of tasks that remain
        print(f'About {size} tasks remain')

# total tasks
TOTAL_TASKS = 50
# lock for protecting the completed count
lock = Lock()
# the number of tasks that are completed
completed = 0
# start the thread pool
with ThreadPoolExecutor(4) as executor:
    # submit many tasks
    futures = [executor.submit(task) for _ in range(TOTAL_TASKS)]
    # add callbacks
    for future in futures:
        future.add_done_callback(completed_callback)
    print('Waiting for tasks to complete...')

Running the example creates the thread pool and submits the tasks as before.

The progress is then reported as each task is completed, giving an exact count of the remaining tasks without the main thread having to iterate over or wait on the Future objects.

...
About 10 tasks remain
About 9 tasks remain
About 8 tasks remain
About 7 tasks remain
About 6 tasks remain
About 5 tasks remain
About 4 tasks remain
About 3 tasks remain
About 2 tasks remain
About 1 tasks remain
About 0 tasks remain
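As a variation on the complete example above, the count and the lock can be bundled into a small object instead of global variables, and each callback can be registered as soon as its task is submitted. ProgressCounter is a hypothetical class written for this sketch, and sum() stands in for real work.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class ProgressCounter:
    """Hypothetical helper: tracks finished tasks behind its own lock."""

    def __init__(self, total):
        self.total = total
        self.completed = 0
        self._lock = threading.Lock()

    def callback(self, future):
        # may be called by several worker threads at once, so guard the update
        with self._lock:
            self.completed += 1
            remaining = self.total - self.completed
            print(f'About {remaining} tasks remain')

TOTAL_TASKS = 20
progress = ProgressCounter(TOTAL_TASKS)
with ThreadPoolExecutor(4) as executor:
    for _ in range(TOTAL_TASKS):
        # register the callback as soon as each task is submitted
        future = executor.submit(sum, [1, 2, 3])
        future.add_done_callback(progress.callback)
print(progress.completed)  # 20
```

Passing the bound method progress.callback works because add_done_callback() only needs a callable that accepts the Future.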

Takeaways

You now know how to check the number of tasks remaining in the ThreadPoolExecutor.
