Last Updated on October 29, 2022
You can report the number of remaining tasks in the ThreadPool with apply_async() and a busy-wait loop, or via the imap_unordered() function.
In this tutorial, you will discover how to report the number of remaining tasks in the ThreadPool.
Let’s get started.
Need to Report Remaining Tasks in the Pool
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
— multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module, it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading from or writing to sockets or files.
A ThreadPool can be configured when it is created, which will prepare the new threads.
We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using methods such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the methods such as apply_async() and map_async().
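As a rough sketch of these four methods, the snippet below issues the same task synchronously with apply() and map(), then asynchronously with apply_async() and map_async(), retrieving the deferred results with get(). The square() function and the pool size of 4 are illustrative assumptions.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task used only for illustration
def square(x):
    return x * x

with ThreadPool(4) as pool:
    # synchronous: each call blocks until its result is available
    one = pool.apply(square, (3,))
    many = pool.map(square, range(5))
    # asynchronous: each call returns an AsyncResult immediately
    one_async = pool.apply_async(square, (3,))
    many_async = pool.map_async(square, range(5))
    # retrieve the deferred results later with get()
    one_later = one_async.get()
    many_later = many_async.get()

print(one, many)              # 9 [0, 1, 4, 9, 16]
print(one_later, many_later)  # 9 [0, 1, 4, 9, 16]
```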
When using the ThreadPool, we may issue many tasks to the pool and require that we keep the user updated about how many tasks remain to be completed.
The ThreadPool does not provide this capability; instead, we must develop it ourselves.
How can we report the number of remaining tasks in the ThreadPool?
How to Report Remaining Tasks in the Pool
We can report the number of remaining tasks in the ThreadPool by keeping track of the number of tasks issued and the number of tasks completed.
This can be achieved in different ways, depending on the specific method used to issue the tasks.
We will look at two approaches:
- Issue tasks asynchronously with apply_async().
- Issue tasks synchronously with imap_unordered().
Let’s take a closer look at each approach in turn.
Remaining Tasks with apply_async()
The apply_async() method will issue a one-off task asynchronously and return immediately with an AsyncResult object.
For example:
```python
...
# issue task asynchronously
result = pool.apply_async(task)
```
We can later check if the task has been completed via the AsyncResult.ready() method, which will return True if the associated task is done.
For example:
```python
...
# check if the task is done
if result.ready():
    # done...
    ...
```
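A small runnable sketch of ready(): a task that sleeps briefly is issued, ready() is checked straight away (it should report False), then AsyncResult.wait() blocks until the task finishes, after which ready() reports True. The half-second sleep and the single-worker pool are arbitrary choices for the demonstration.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# task that blocks briefly to simulate work
def task():
    sleep(0.5)

with ThreadPool(1) as pool:
    result = pool.apply_async(task)
    # checked straight away, the task is almost certainly still running
    before = result.ready()
    # block until the task has finished
    result.wait()
    # the task is now done
    after = result.ready()

print(before, after)
```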
We can issue a large number of tasks with apply_async(), then check the status of the returned AsyncResult objects to determine how many tasks remain in the ThreadPool.
This can be performed in a loop that runs until all tasks are completed.
```python
...
# issue many tasks
results = [pool.apply_async(task) for _ in range(50)]
# report remaining tasks for a while
count = len(results)
while count:
    ...
```
In each iteration of the loop we can count the number of AsyncResult objects whose ready() method returns False, meaning the associated task has not yet finished (it is either still running or still waiting for a worker).
This can be achieved in a list comprehension, where the resulting list can be summed to give the count.
For example:
```python
...
# check all tasks and count the number that are not done
count = sum([not r.ready() for r in results])
```
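The sum works because bool is a subclass of int in Python, so True adds as 1 and False as 0. The sketch below wraps the expression in a hypothetical helper, count_remaining(), using a generator expression instead of a list, and checks it against a pool where two tasks finish immediately and two take a second. The task durations and the 0.3-second pause are arbitrary assumptions.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical helper: count AsyncResult objects whose task is not yet done
def count_remaining(results):
    # True counts as 1 and False as 0, so the sum is the number of pending tasks
    return sum(not r.ready() for r in results)

# booleans sum like integers
print(sum([True, False, True]))  # 2

# task that blocks for the given number of seconds
def task(duration):
    sleep(duration)

with ThreadPool(4) as pool:
    # two short tasks and two long tasks
    results = [pool.apply_async(task, (d,)) for d in (0.0, 0.0, 1.0, 1.0)]
    sleep(0.3)
    # the short tasks should be done by now, the long ones still running
    midway = count_remaining(results)
    # wait for everything to finish
    for r in results:
        r.wait()
    final = count_remaining(results)

print(midway, final)
```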
The number of remaining tasks can then be reported and the caller can block for a moment in a busy-wait loop.
```python
...
# report the number of remaining tasks
print(f'>{count}/{len(results)} tasks remain')
# wait a moment
sleep(0.5)
```
The complete loop may look as follows:
```python
...
# issue many tasks
results = [pool.apply_async(task) for _ in range(50)]
# report remaining tasks for a while
count = len(results)
while count:
    # check all tasks and count the number that are not done
    count = sum([not r.ready() for r in results])
    # report the number of remaining tasks
    print(f'>{count}/{len(results)} tasks remain')
    # wait a moment
    sleep(0.5)
```
This approach could be expanded to track the AsyncResult objects returned from other functions used to issue tasks, such as map_async() and starmap_async(). The AsyncResult objects returned could be added to the same list of results and their status checked each iteration of the loop.
The limitation of this approach is that although a function like map_async() may issue many tasks into the pool, it returns only a single AsyncResult object, so the whole batch is treated as a single task when reporting the number of remaining tasks in the pool.
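To make this limitation concrete, the sketch below mixes three apply_async() calls with one map_async() call over ten items. Thirteen function calls are issued, yet the list holds only four AsyncResult objects, so a counting loop like the one above would start by reporting 4 remaining tasks. The task body and pool size are illustrative assumptions.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# task that accepts an optional argument so it works with both methods
def task(arg=None):
    sleep(0.1)

with ThreadPool(4) as pool:
    results = [pool.apply_async(task) for _ in range(3)]
    # map_async() issues ten calls but returns one AsyncResult for the batch
    results.append(pool.map_async(task, range(10)))
    # only four objects are tracked, even though 13 calls were issued
    tracked = len(results)
    for r in results:
        r.wait()

print(tracked)  # 4
```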
Another limitation is the use of a busy wait loop which may be wasteful of resources if the wait time is not calibrated well with the typical task duration.
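One possible mitigation, which is a variation and not the approach used in this tutorial, is to replace the fixed sleep() with AsyncResult.wait(timeout) on the oldest unfinished task. The loop still polls, but each wait ends early as soon as that task finishes, so the timeout value matters less. The report_remaining() helper, the 0.5-second timeout, and the shortened task duration are assumptions for illustration.

```python
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task():
    # block for up to half a second
    sleep(random() * 0.5)

# hypothetical helper: report remaining tasks until none are left
def report_remaining(results, timeout=0.5):
    reported = []
    while True:
        # collect the AsyncResult objects whose task has not finished
        pending = [r for r in results if not r.ready()]
        reported.append(len(pending))
        print(f'>{len(pending)}/{len(results)} tasks remain')
        if not pending:
            return reported
        # block on the oldest pending task for at most `timeout` seconds,
        # returning early if that task finishes first
        pending[0].wait(timeout)

with ThreadPool() as pool:
    results = [pool.apply_async(task) for _ in range(20)]
    counts = report_remaining(results)
```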
Remaining Tasks with imap_unordered()
The imap_unordered() function will call a target function for each argument in a provided iterable.
Each call is issued as a task in the ThreadPool and executed as worker threads become available. The function returns an iterable of return values. Return values are yielded in the order that tasks are completed, allowing the caller to respond to tasks as they complete in the pool.
For example:
```python
...
# issue tasks and respond each time a task is completed
for result in pool.imap_unordered(task, range(50)):
    ...
```
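The difference in ordering can be seen with a small sketch in which later tasks finish sooner: map() returns results in the order the tasks were issued, while imap_unordered() yields them as they finish, which should typically be [2, 1, 0] here. The task durations and the three-worker pool are assumptions chosen so that all three tasks run at once.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# task that takes less time the larger its argument is
def task(arg):
    sleep((3 - arg) * 0.2)
    return arg

with ThreadPool(3) as pool:
    # map() returns results in the order the tasks were issued
    issue_order = pool.map(task, range(3))
    # imap_unordered() yields results in the order the tasks finish
    finish_order = list(pool.imap_unordered(task, range(3)))

print(issue_order)  # [0, 1, 2]
print(finish_order)
```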
This function provides an effective way to dynamically report the number of tasks that remain in the ThreadPool.
We can issue tasks using imap_unordered() and as each task completes, a result is yielded, allowing the caller to respond and report the number of tasks that remain.
It requires that the number of tasks that were issued is known and that we keep track of the number of tasks that have been completed.
For example:
```python
...
# number of tasks
n_tasks = 50
# keep track of the number of tasks completed
completed = 0
# issue tasks and respond each time a task is completed
for _ in pool.imap_unordered(task, range(n_tasks)):
    # update the count of completed tasks
    completed += 1
    # report the number of remaining tasks
    print(f'>{(n_tasks-completed)}/{n_tasks} tasks remain')
```
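The manual counter can also be folded into enumerate(). The sketch below wraps the pattern in a hypothetical generator, track_remaining(), that yields each result together with the number of tasks still outstanding. It assumes the items are a sized collection such as a list, so that len() works; the task body and the ten items are illustrative.

```python
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(arg):
    sleep(random() * 0.2)
    return arg

# hypothetical helper: yield (remaining, result) pairs as tasks complete
def track_remaining(pool, func, items):
    # assumes items is a sized collection such as a list
    n_tasks = len(items)
    # start counting at 1 so the count includes the result just received
    for completed, result in enumerate(pool.imap_unordered(func, items), start=1):
        yield n_tasks - completed, result

with ThreadPool() as pool:
    remaining_log = []
    for remaining, _ in track_remaining(pool, task, list(range(10))):
        print(f'>{remaining}/10 tasks remain')
        remaining_log.append(remaining)
```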
This approach is effective even if the target function does not return a value explicitly, i.e. it returns None.
This approach is more responsive and more efficient than the previous approach, as it reports a change in the number of remaining tasks only when a task finishes, rather than at fixed polling intervals.
A limitation of this approach is that it is limited to tracking the number of remaining tasks issued in a single batch. It does not allow the general status of the ThreadPool to be reported, e.g. across batches of tasks.
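One way to lift this limitation, not covered in this tutorial, is to count tasks across batches with the callback and error_callback arguments of apply_async(), which the pool invokes once each task has finished. The TaskTracker class below is a hypothetical helper, not part of the ThreadPool API. It guards its counters with a threading.Lock because the callbacks run in a pool helper thread while remaining() is read from the main thread.

```python
import threading
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(arg):
    sleep(random() * 0.1)
    return arg

# hypothetical helper, not part of the ThreadPool API: counts tasks across batches
class TaskTracker:
    def __init__(self):
        self._lock = threading.Lock()
        self.issued = 0
        self.completed = 0

    def _done(self, _):
        # called by the pool when a task finishes, successfully or with an error
        with self._lock:
            self.completed += 1

    def submit(self, pool, func, *args):
        with self._lock:
            self.issued += 1
        return pool.apply_async(func, args, callback=self._done,
                                error_callback=self._done)

    def remaining(self):
        with self._lock:
            return self.issued - self.completed

with ThreadPool() as pool:
    tracker = TaskTracker()
    # two separate batches, both counted by the same tracker
    first = [tracker.submit(pool, task, i) for i in range(5)]
    second = [tracker.submit(pool, task, i) for i in range(5, 10)]
    print(f'>{tracker.remaining()}/{tracker.issued} tasks remain')
    for r in first + second:
        r.wait()
    print(f'>{tracker.remaining()}/{tracker.issued} tasks remain')
```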
Now that we have seen a few ways to report the number of remaining tasks, let’s look at some worked examples.
Example of Remaining Tasks with apply_async()
We can explore how to report the number of remaining tasks in the ThreadPool issued with apply_async().
In this example, we will define a task that blocks for a random fraction of a second. We will then issue a large number of these tasks to the pool asynchronously and gather the AsyncResult objects. In the main thread, we will then loop until the count of running tasks is zero. For each iteration of the loop, we will count the number of running tasks via their AsyncResult objects, report the value, and block for a fraction of a second.
Firstly, we can define a task function to simulate work by blocking for a random fraction of a second.
The task() function below implements this.
```python
# task executed in a worker thread
def task():
    # block for a fraction of a second
    sleep(random())
```
In the main thread, we can first create the ThreadPool with a default configuration.
We will use the context manager interface to ensure that the pool is closed once we are finished with it.
```python
...
# create the pool
with ThreadPool() as pool:
    ...
```
We can then issue 50 calls to our custom task() function and gather the AsyncResult objects in a list.
This can be achieved in a list comprehension.
```python
...
# issue many tasks
results = [pool.apply_async(task) for _ in range(50)]
```
Next, we can define a count variable for the number of remaining tasks and loop until this count is zero. Initially, the number of remaining tasks equals the number of tasks that were issued.
```python
...
# report remaining tasks for a while
count = len(results)
while count:
    ...
```
For each iteration, we can first count the number of remaining tasks. This involves counting the number of tasks where the AsyncResult.ready() method returns False, indicating that the task has not yet finished.
This can be achieved in a list comprehension that collects a True value for each task that has not finished, then sums these True values into a count.
```python
...
# check all tasks and count the number that are not done
count = sum([not r.ready() for r in results])
```
We can then report the number of tasks that remain running in the ThreadPool.
```python
...
# report the number of remaining tasks
print(f'>{count}/{len(results)} tasks remain')
```
Finally, we can block for a moment before checking the tasks again.
This is called a busy-wait loop.
```python
...
# wait a moment
sleep(0.5)
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of reporting the number of remaining tasks with apply_async()
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task():
    # block for a fraction of a second
    sleep(random())

# protect the entry point
if __name__ == '__main__':
    # create the pool
    with ThreadPool() as pool:
        # issue many tasks
        results = [pool.apply_async(task) for _ in range(50)]
        # report remaining tasks for a while
        count = len(results)
        while count:
            # check all tasks and count the number that are not done
            count = sum([not r.ready() for r in results])
            # report the number of remaining tasks
            print(f'>{count}/{len(results)} tasks remain')
            # wait a moment
            sleep(0.5)
```
Running the example first starts the ThreadPool, then issues 50 tasks.
The tasks begin executing in the pool.
The main thread then checks the status of all tasks, reports the value, then blocks.
This is repeated until all tasks are complete, i.e. are “ready”.
All tasks are completed and the busy-wait loop exits. The pool is then closed automatically via the context manager interface.
An example output is listed below.
```
>50/50 tasks remain
>44/50 tasks remain
>39/50 tasks remain
>31/50 tasks remain
>23/50 tasks remain
>17/50 tasks remain
>8/50 tasks remain
>4/50 tasks remain
>0/50 tasks remain
```
Next, let’s explore how we might report the number of remaining tasks with the imap_unordered() function.
Example of Remaining Tasks with imap_unordered()
We can explore how to report the number of remaining tasks via the imap_unordered() function.
In this example, we will again define a custom task that blocks for a fraction of a second to simulate work. We will then issue a large number of tasks via imap_unordered() and iterate the return values in the order that tasks are completed. As our task does not return a value, each yielded value (None) is ignored and instead used as a prompt to report the number of tasks that remain in the pool.
Firstly, we can define the task to be executed in the pool.
The task blocks for a fraction of a second and does not return a value.
```python
# task executed in a worker thread
def task(arg):
    # block for a fraction of a second
    sleep(random())
```
Next, we can create the ThreadPool using the default configuration.
We will use the context manager interface to ensure that the pool is closed correctly once we are finished with it.
```python
...
# create the pool
with ThreadPool() as pool:
    ...
```
Next, we will record the number of tasks that we will issue, which will help with reporting the number of remaining tasks later.
We will also initialize a variable to keep track of the number of tasks that have been completed.
```python
...
# number of tasks
n_tasks = 50
# keep track of the number of tasks completed
completed = 0
```
Next, we will issue the tasks via imap_unordered().
This function returns an iterable that yields results in the order that tasks are completed, i.e. possibly out of order compared to how they were issued.
We will loop over the return values, None in this case, and use each completed task as a trigger to update the count of tasks completed. We will also report the number of tasks remaining, calculated as the total tasks issued minus the number of tasks that have been completed.
```python
...
# issue tasks and respond each time a task is completed
for _ in pool.imap_unordered(task, range(n_tasks)):
    # update the count of completed tasks
    completed += 1
    # report the number of remaining tasks
    print(f'>{(n_tasks-completed)}/{n_tasks} tasks remain')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of reporting the number of remaining tasks with imap_unordered()
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(arg):
    # block for a fraction of a second
    sleep(random())

# protect the entry point
if __name__ == '__main__':
    # create the pool
    with ThreadPool() as pool:
        # number of tasks
        n_tasks = 50
        # keep track of the number of tasks completed
        completed = 0
        # issue tasks and respond each time a task is completed
        for _ in pool.imap_unordered(task, range(n_tasks)):
            # update the count of completed tasks
            completed += 1
            # report the number of remaining tasks
            print(f'>{(n_tasks-completed)}/{n_tasks} tasks remain')
```
Running the example first starts the ThreadPool, then issues the tasks, which are executed as worker threads become available in the pool.
The tasks begin executing in the pool.
The main thread loops over return values as tasks complete and updates the count of completed tasks and the number of tasks remaining.
Unlike the previous example, the number of remaining tasks is reported only when a task completes, once after each individual task, giving a fine-grained indication of the tasks that remain.
Once all tasks have been completed, the loop exits, and the ThreadPool is closed automatically by the context manager.
```
>49/50 tasks remain
>48/50 tasks remain
>47/50 tasks remain
>46/50 tasks remain
>45/50 tasks remain
>44/50 tasks remain
>43/50 tasks remain
>42/50 tasks remain
>41/50 tasks remain
>40/50 tasks remain
>39/50 tasks remain
>38/50 tasks remain
>37/50 tasks remain
>36/50 tasks remain
>35/50 tasks remain
>34/50 tasks remain
>33/50 tasks remain
>32/50 tasks remain
>31/50 tasks remain
>30/50 tasks remain
>29/50 tasks remain
>28/50 tasks remain
>27/50 tasks remain
>26/50 tasks remain
>25/50 tasks remain
>24/50 tasks remain
>23/50 tasks remain
>22/50 tasks remain
>21/50 tasks remain
>20/50 tasks remain
>19/50 tasks remain
>18/50 tasks remain
>17/50 tasks remain
>16/50 tasks remain
>15/50 tasks remain
>14/50 tasks remain
>13/50 tasks remain
>12/50 tasks remain
>11/50 tasks remain
>10/50 tasks remain
>9/50 tasks remain
>8/50 tasks remain
>7/50 tasks remain
>6/50 tasks remain
>5/50 tasks remain
>4/50 tasks remain
>3/50 tasks remain
>2/50 tasks remain
>1/50 tasks remain
>0/50 tasks remain
```
Takeaways
You now know how to report the number of remaining tasks in the ThreadPool.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.