Last Updated on September 12, 2022
You can report the number of remaining tasks in the multiprocessing pool with Pool.apply_async() and a busy-wait loop, or via the Pool.imap_unordered() function.
In this tutorial you will discover how to report the number of remaining tasks in the multiprocessing pool.
Let’s get started.
Need to Report Remaining Tasks in the Pool
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A multiprocessing pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as Pool.apply() or we can apply the same function to an iterable of items using functions such as Pool.map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as Pool.apply_async() and Pool.map_async().
When using the multiprocessing pool, we may issue many tasks to the pool and require that we keep the user updated about how many tasks remain to be completed.
The multiprocessing pool does not provide this capability; instead, we must develop it ourselves.
How can we report the number of remaining tasks in the multiprocessing pool?
How to Report Remaining Tasks in the Pool
We can report the number of remaining tasks in the multiprocessing pool by keeping track of the number of tasks issued and the number of tasks completed.
This can be achieved in different ways, depending on the specific method used to issue the tasks.
We will look at two approaches:
- Issue tasks asynchronously with Pool.apply_async()
- Issue tasks synchronously with Pool.imap_unordered()
Remaining Tasks with Pool.apply_async()
The Pool.apply_async() function will issue a one-off task asynchronously and return immediately with an AsyncResult object.
You can learn more about how to use the apply_async() function in a separate tutorial.
For example:
```python
...
# issue task asynchronously
result = pool.apply_async(task)
```
We can later check if the task has completed via the AsyncResult.ready() function, which will return True if the associated task is done.
For example:
```python
...
# check if the task is done
if result.ready():
    # done...
```
You can learn more about how to use the AsyncResult object in a separate tutorial.
We can issue a large number of tasks with apply_async(), then check the status of the AsyncResult objects to determine how many tasks remain in the multiprocessing pool.
This can be performed in a loop that runs until all tasks are completed.
```python
...
# issue many tasks
results = [pool.apply_async(task) for _ in range(50)]
# report remaining tasks for a while
count = len(results)
while count:
    # ...
```
Each iteration of the loop, we can count the number of AsyncResult objects whose ready() function returns False, meaning those tasks are still running.
This can be achieved in a list comprehension, where the resulting list can be summed to give the count.
For example:
```python
...
# check all tasks and count the number that are not done
count = sum([not r.ready() for r in results])
```
The number of remaining tasks can then be reported and the caller can block for a moment in a busy-wait loop.
```python
...
# report the number of remaining tasks
print(f'>{count}/{len(results)} tasks remain')
# wait a moment
sleep(0.5)
```
You can learn more about busy-wait loops in a separate tutorial.
The complete loop may look as follows:
```python
...
# issue many tasks
results = [pool.apply_async(task) for _ in range(50)]
# report remaining tasks for a while
count = len(results)
while count:
    # check all tasks and count the number that are not done
    count = sum([not r.ready() for r in results])
    # report the number of remaining tasks
    print(f'>{count}/{len(results)} tasks remain')
    # wait a moment
    sleep(0.5)
```
This approach could be expanded to track the AsyncResult objects returned from other functions used to issue tasks, such as map_async() and starmap_async(). The AsyncResult objects returned could be added to the same list of results and their status checked each iteration of the loop.
The limitation of this approach is that although a function like map_async() may issue many tasks into the pool, it returns only a single AsyncResult object, meaning the entire batch of tasks must be treated as a single task when reporting the number of remaining tasks in the pool.
Another limitation is the use of a busy-wait loop, which may be wasteful of resources if the wait time is not well calibrated to the typical task duration.
Remaining Tasks with Pool.imap_unordered()
The Pool.imap_unordered() function will call a target function for each argument in a provided iterable.
Each call is issued as a task in the multiprocessing pool, one-by-one as workers become available. The function returns an iterable of return values. Return values are yielded in the order that tasks are completed, allowing the caller to be responsive to tasks completed in the pool.
For example:
```python
...
# issue tasks and respond each time a task is completed
for result in pool.imap_unordered(task, range(50)):
    # ...
```
You can learn more about how to use the imap_unordered() function in a separate tutorial.
This function provides an effective way to dynamically report the number of tasks that remain in the multiprocessing pool.
We can issue tasks using imap_unordered() and as each task completes, a result is yielded, allowing the caller to respond and report the number of tasks that remain.
It requires that the number of tasks that were issued is known and that we keep track of the number of tasks that have completed.
For example:
```python
...
# number of tasks
n_tasks = 50
# keep track of the number of tasks completed
completed = 0
# issue tasks and respond each time a task is completed
for _ in pool.imap_unordered(task, range(n_tasks)):
    # update completed tasks
    completed += 1
    # report the number of remaining tasks
    print(f'>{(n_tasks-completed)}/{n_tasks} tasks remain')
```
This approach will be effective, even if the target function does not return a value explicitly, e.g. returns None.
This approach is more responsive and more efficient than the previous approach, only reporting changes to the number of remaining tasks when a task finishes, rather than at ad hoc times.
A limitation of this approach is that it is limited to tracking the number of remaining tasks issued in a single batch. It does not allow the general status of the multiprocessing pool to be reported, e.g. across batches of tasks.
Now that we have seen a few ways to report the number of remaining tasks, let’s look at some worked examples.
Example of Remaining Tasks with apply_async()
We can explore how to report the number of remaining tasks in the multiprocessing pool issued with apply_async().
In this example, we will define a task that blocks for a random fraction of a second. We will then issue a large number of these tasks to the pool asynchronously and gather the AsyncResult objects. In the main process, we will then loop until the count of running tasks is zero. Each iteration of the loop we will count the number of running tasks via their AsyncResult objects, report the value and block for a fraction of a second.
Firstly, we can define a task function to simulate work by blocking for a random fraction of a second.
The task() function below implements this.
```python
# task executed in a worker process
def task():
    # block for a fraction of a second
    sleep(random())
```
In the main process we can first create the multiprocessing pool with a default configuration.
We will use the context manager interface to ensure that the pool is closed once we are finished with it.
```python
...
# create the pool
with Pool() as pool:
    # ...
```
We can then issue 50 calls to our custom task() function and gather the AsyncResult objects in a list.
This can be achieved in a list comprehension.
```python
...
# issue many tasks
results = [pool.apply_async(task) for _ in range(50)]
```
Next, we can define a count variable for the number of remaining tasks and loop until this count is zero. Initially the number of remaining tasks equals the number of tasks that were issued.
```python
...
# report remaining tasks for a while
count = len(results)
while count:
    # ...
```
Each iteration, we can first count the number of remaining tasks. This involves counting the number of tasks where the AsyncResult.ready() function returns False, indicating that the task is still running.
This can be achieved in a list comprehension that collects a True value for each task that is still running; summing these True values gives the count.
```python
...
# check all tasks and count the number that are not done
count = sum([not r.ready() for r in results])
```
We can then report the number of tasks that remain running in the multiprocessing pool.
```python
...
# report the number of remaining tasks
print(f'>{count}/{len(results)} tasks remain')
```
Finally, we can block for a moment before checking the tasks again.
This is called a busy-wait loop.
```python
...
# wait a moment
sleep(0.5)
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of reporting the number of remaining tasks with apply_async()
from time import sleep
from random import random
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # block for a fraction of a second
    sleep(random())

# protect the entry point
if __name__ == '__main__':
    # create the pool
    with Pool() as pool:
        # issue many tasks
        results = [pool.apply_async(task) for _ in range(50)]
        # report remaining tasks for a while
        count = len(results)
        while count:
            # check all tasks and count the number that are not done
            count = sum([not r.ready() for r in results])
            # report the number of remaining tasks
            print(f'>{count}/{len(results)} tasks remain')
            # wait a moment
            sleep(0.5)
```
Running the example first starts the multiprocessing pool, then issues 50 tasks.
The tasks begin executing in the pool.
The main process then checks the status of all tasks, reports the value, then blocks.
This process is repeated until all tasks complete, e.g. are “ready”.
All tasks complete and the busy-wait loop exits. The pool is then closed automatically via the context manager interface.
An example output is listed below.
```
>50/50 tasks remain
>46/50 tasks remain
>41/50 tasks remain
>29/50 tasks remain
>21/50 tasks remain
>15/50 tasks remain
>6/50 tasks remain
>2/50 tasks remain
>0/50 tasks remain
```
Next, let’s explore how we might report the number of remaining tasks with the imap_unordered() function.
Example of Remaining Tasks with imap_unordered()
We can explore how to report the number of remaining tasks via the imap_unordered() function.
In this example, we will again define a custom task that blocks for a fraction of a second to simulate work. We will then issue a large number of tasks via imap_unordered() and iterate over the return values in the order that tasks are completed. As our task does not return a value, each yielded result is ignored and instead used as a prompt to dynamically report the number of tasks that remain running in the pool.
Firstly, we can define the task to be executed in the pool.
The task blocks for a fraction of a second and does not return a value.
```python
# task executed in a worker process
def task(arg):
    # block for a fraction of a second
    sleep(random())
```
Next, we can create the multiprocessing pool using the default configuration.
We will use the context manager interface to ensure that the pool is closed correctly once we are finished with it.
```python
...
# create the pool
with Pool() as pool:
    # ...
```
Next, we will record the number of tasks that we will issue, which will help with reporting the number of remaining tasks later.
We will also initialize a variable to keep track of the number of tasks that have completed.
```python
...
# number of tasks
n_tasks = 50
# keep track of the number of tasks completed
completed = 0
```
Next, we will issue the tasks via imap_unordered().
This function returns an iterable that yields results in the order that tasks are completed, e.g. out of order compared to how they were issued.
We will loop over the return values, None in this case, and use each completed task as a trigger to update the count of completed tasks. We will also report the number of remaining tasks as the total number of tasks issued minus the number that have completed.
```python
...
# issue tasks and respond each time a task is completed
for _ in pool.imap_unordered(task, range(n_tasks)):
    # update completed tasks
    completed += 1
    # report the number of remaining tasks
    print(f'>{(n_tasks-completed)}/{n_tasks} tasks remain')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of reporting the number of remaining tasks with imap_unordered()
from time import sleep
from random import random
from multiprocessing.pool import Pool

# task executed in a worker process
def task(arg):
    # block for a fraction of a second
    sleep(random())

# protect the entry point
if __name__ == '__main__':
    # create the pool
    with Pool() as pool:
        # number of tasks
        n_tasks = 50
        # keep track of the number of tasks completed
        completed = 0
        # issue tasks and respond each time a task is completed
        for _ in pool.imap_unordered(task, range(n_tasks)):
            # update completed tasks
            completed += 1
            # report the number of remaining tasks
            print(f'>{(n_tasks-completed)}/{n_tasks} tasks remain')
```
Running the example first starts the multiprocessing pool, then issues tasks one-by-one as workers become available in the pool.
The tasks begin executing in the pool.
The main process loops over return values as tasks complete and updates the count of completed tasks and the total tasks remaining.
Unlike the previous example, in this case the number of remaining tasks is updated only as tasks complete, after each individual task finishes, giving a fine-grained indication of the tasks that remain.
Once all tasks have completed, the loop exits and the multiprocessing pool is closed automatically by the context manager.
```
>49/50 tasks remain
>48/50 tasks remain
>47/50 tasks remain
>46/50 tasks remain
>45/50 tasks remain
>44/50 tasks remain
>43/50 tasks remain
>42/50 tasks remain
>41/50 tasks remain
>40/50 tasks remain
>39/50 tasks remain
>38/50 tasks remain
>37/50 tasks remain
>36/50 tasks remain
>35/50 tasks remain
>34/50 tasks remain
>33/50 tasks remain
>32/50 tasks remain
>31/50 tasks remain
>30/50 tasks remain
>29/50 tasks remain
>28/50 tasks remain
>27/50 tasks remain
>26/50 tasks remain
>25/50 tasks remain
>24/50 tasks remain
>23/50 tasks remain
>22/50 tasks remain
>21/50 tasks remain
>20/50 tasks remain
>19/50 tasks remain
>18/50 tasks remain
>17/50 tasks remain
>16/50 tasks remain
>15/50 tasks remain
>14/50 tasks remain
>13/50 tasks remain
>12/50 tasks remain
>11/50 tasks remain
>10/50 tasks remain
>9/50 tasks remain
>8/50 tasks remain
>7/50 tasks remain
>6/50 tasks remain
>5/50 tasks remain
>4/50 tasks remain
>3/50 tasks remain
>2/50 tasks remain
>1/50 tasks remain
>0/50 tasks remain
```
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to report the number of remaining tasks in the multiprocessing pool.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Jeremy Bishop on Unsplash