You can wait for tasks issued to the multiprocessing pool to complete by calling AsyncResult.wait(), or by calling Pool.close() followed by Pool.join().
In this tutorial you will discover how to wait for tasks to complete in the process pool in Python.
Let’s get started.
Need to Wait For All Tasks in the Process Pool
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, and creating it starts the child worker processes.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
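As a minimal sketch of this difference, assuming a hypothetical square() task that is not part of the original tutorial: apply() and map() block and return values directly, while apply_async() and map_async() return an AsyncResult immediately, and its get() method retrieves the value later.

```python
from multiprocessing.pool import Pool

# hypothetical task used for illustration only
def square(x):
    return x * x

# protect the entry point
if __name__ == '__main__':
    with Pool(2) as pool:
        # synchronous: blocks until the task finishes and returns the value
        value = pool.apply(square, (3,))
        # asynchronous: returns an AsyncResult immediately
        async_result = pool.apply_async(square, (4,))
        # ... the main process is free to do other work here ...
        later_value = async_result.get()
        # map() and map_async() follow the same pattern for an iterable
        values = pool.map(square, range(5))
        async_values = pool.map_async(square, range(5)).get()
    print(value, later_value, values, async_values)
```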
When using the process pool, we may need to wait for all tasks to complete.
This may be for many reasons, such as:
- Waiting for all tasks to complete before issuing follow-up tasks.
- Waiting for all task results so that they can be combined or used.
- Waiting for all tasks to complete before continuing on with the program.
How can we wait for all tasks to complete in the process pool?
How to Wait For All Tasks to Finish
There are two ways that we can wait for tasks to finish in the multiprocessing.pool.Pool.
They are:
- Wait for an asynchronous set of tasks to complete with the wait() function.
- Wait for all issued tasks to complete after shutdown with the join() function.
Let’s take a closer look at each approach.
How to Wait For All Tasks in a Batch
Tasks may be issued asynchronously to the process pool.
This can be achieved using functions such as apply_async(), map_async(), or starmap_async(). These functions return an AsyncResult object.
We can wait for a single batch of tasks issued asynchronously to the process pool to complete by calling the wait() function on the returned AsyncResult object.
For example:
```python
...
# issue tasks
result = pool.map_async(...)
# wait for issued tasks to complete
result.wait()
```
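The wait() method also accepts an optional timeout in seconds, and the AsyncResult's ready() method reports whether the batch has finished. The sketch below, using a hypothetical slow_task() function, waits briefly, checks ready(), then waits without a timeout. Note that wait() returns None whether or not the timeout expired, so ready() is the way to tell.

```python
from time import sleep
from multiprocessing.pool import Pool

# hypothetical task that takes half a second
def slow_task(identifier):
    sleep(0.5)
    return identifier

# protect the entry point
if __name__ == '__main__':
    with Pool(2) as pool:
        result = pool.map_async(slow_task, range(4))
        # wait at most 0.1 seconds; wait() returns None either way
        result.wait(timeout=0.1)
        # ready() reports whether the whole batch has finished
        finished_early = result.ready()
        # with no timeout, wait() blocks until every task in the batch is done
        result.wait()
        finished_late = result.ready()
    print(finished_early, finished_late)
```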
If multiple batches of asynchronous tasks are issued to the process pool, we can collect the AsyncResult objects that are returned and wait on each in turn.
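A minimal sketch of this pattern, assuming a hypothetical square() task and three small batches: every AsyncResult is kept in a list and waited on in turn. Because the batches run concurrently in the pool, waiting on them one after another still finishes once the slowest batch is done.

```python
from multiprocessing.pool import Pool

# hypothetical task used for illustration only
def square(x):
    return x * x

# protect the entry point
if __name__ == '__main__':
    with Pool(2) as pool:
        # issue several batches and keep every AsyncResult
        batches = [pool.map_async(square, range(start, start + 3))
                   for start in (0, 3, 6)]
        # wait on each batch in turn
        for batch in batches:
            batch.wait()
        # after the loop, every batch has finished
        all_done = all(batch.ready() for batch in batches)
        results = [batch.get() for batch in batches]
    print(all_done, results)
```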
How to Wait For All Tasks After Shutdown
We may issue many batches of asynchronous tasks to the process pool and not hang onto the AsyncResult objects that are returned.
Instead, we can wait for all tasks in the process pool to complete by first shutting down the process pool, then joining it to wait for all issued tasks to be completed.
This can be achieved by first calling the close() function, which prevents any further tasks from being issued to the process pool and shuts down the worker processes once all tasks are complete.
We can then call the join() function. This will block the caller until all tasks in the process pool are completed and the worker child processes in the process pool have closed.
For example:
```python
...
# close the process pool
pool.close()
# block until all tasks are complete and processes close
pool.join()
```
The downside of this approach is that we cannot issue tasks to the pool after it is closed. This approach can only be used once you know that you have no further tasks to issue to the process pool.
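To illustrate that limitation, here is a sketch (with a hypothetical square() task) that tries to issue a task after close(). In CPython the pool rejects it with a ValueError, while tasks issued before close() still run to completion and their results can be retrieved after join().

```python
from multiprocessing.pool import Pool

# hypothetical task used for illustration only
def square(x):
    return x * x

# protect the entry point
if __name__ == '__main__':
    pool = Pool(2)
    # issued before shutdown, so this batch will still complete
    first = pool.map_async(square, range(3))
    # no further tasks may be issued after this call
    pool.close()
    try:
        pool.apply_async(square, (10,))
        rejected = False
    except ValueError as error:
        # the closed pool refuses new work
        rejected = True
        print(f'Rejected: {error}')
    # block until the earlier batch is done and the workers exit
    pool.join()
    print(first.get())
```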
Now that we know how to wait for all tasks in the process pool to complete, let’s look at some worked examples.
Example of Waiting for All Tasks in a Batch
We can explore how to wait for a batch of issued tasks to complete in the process pool.
In this example we will define a task that blocks for a moment and then reports a message. From the main process, we will issue a batch of tasks to the process pool asynchronously. We will then explicitly wait for the batch of tasks to complete by waiting on the returned AsyncResult object.
Firstly, we can define a task to execute in the process pool.
The custom function will take an integer argument, block for a fraction of a second to simulate computational effort, then report a message that the task is done.
The task() function below implements this.
```python
# task executed in a worker process
def task(identifier):
    # block for a moment
    sleep(0.5)
    # report done
    print(f'Task {identifier} done', flush=True)
```
Next, in the main process we will create the process pool with the default configuration.
We will use the context manager interface to ensure that the process pool is closed automatically once we are finished with it.
```python
...
# create and configure the process pool
with Pool() as pool:
    # issue tasks and wait for them here
    ...
```
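One detail worth knowing: when the with block exits, the pool's __exit__() calls terminate(), not close(), so any tasks still running at that point are stopped. The sketch below, with a hypothetical square() task, therefore waits on the AsyncResult inside the block; the completed result can still be read after the pool is gone.

```python
from multiprocessing.pool import Pool

# hypothetical task used for illustration only
def square(x):
    return x * x

# protect the entry point
if __name__ == '__main__':
    with Pool(2) as pool:
        result = pool.map_async(square, range(5))
        # wait inside the block: __exit__() calls terminate(),
        # which would stop any tasks that are still running
        result.wait()
    # the batch finished before the pool was terminated, so results remain
    values = result.get()
    print(values)
```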
We will then issue 10 tasks to the process pool by calling our custom task() function with integers from 0 to 9. This can be achieved via the map_async() function.
```python
...
# issue tasks into the process pool
result = pool.map_async(task, range(10))
```
This returns an AsyncResult object on which we can call the wait() function.
This function call will block until all 10 tasks issued asynchronously are completed.
```python
...
# wait for tasks to complete
result.wait()
```
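Note that wait() only blocks; it returns None and does not re-raise an exception raised by a task. To check for failure you can call successful() once the result is ready, or call get(), which returns the results and re-raises the task's exception. A minimal sketch, assuming a hypothetical risky() task that fails for one input:

```python
from multiprocessing.pool import Pool

# hypothetical task that fails for one input
def risky(x):
    if x == 2:
        raise ValueError(f'bad input {x}')
    return x * 10

# protect the entry point
if __name__ == '__main__':
    with Pool(2) as pool:
        result = pool.map_async(risky, range(4))
        # returns None and never raises the task's error
        result.wait()
        # False because at least one task raised an exception
        succeeded = result.successful()
        try:
            # get() re-raises the exception from the worker
            result.get()
            error_seen = None
        except ValueError as error:
            error_seen = str(error)
    print(succeeded, error_seen)
```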
We can then report a message that the tasks are completed.
```python
...
# report all tasks done
print('All tasks are done', flush=True)
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of waiting for all tasks in a batch to finish
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # block for a moment
    sleep(0.5)
    # report done
    print(f'Task {identifier} done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks into the process pool
        result = pool.map_async(task, range(10))
        # wait for tasks to complete
        result.wait()
        # report all tasks done
        print('All tasks are done', flush=True)
    # process pool is closed automatically
```
Running the example first creates the process pool.
The ten tasks are then issued to the process pool asynchronously. An AsyncResult object is returned and the main process then blocks until the issued tasks are completed.
Each task is executed in the process pool, first blocking for a fraction of a second, then printing a message.
All ten tasks issued as a batch to the process pool complete, then the wait() function returns and the main process continues on.
A final message is reported, then the process pool is closed automatically via the context manager interface.
```
Task 1 done
Task 0 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 9 done
Task 8 done
All tasks are done
```
Next, let’s look at how we might wait for all tasks in the process pool to complete when shutting down the pool.
Example of Waiting for All Tasks After Shutdown
We can wait for all tasks issued to the process pool to complete by shutting down the pool, then joining it.
In this example, we can update the previous example to issue two batches of tasks to the process pool and not hang onto the AsyncResult objects that are returned. We can then close the process pool and join it, blocking until all tasks issued to the process pool have completed.
This can be achieved by first issuing two batches of tasks to the pool via two calls to the map_async() function.
```python
...
# issue tasks into the process pool
_ = pool.map_async(task, range(5))
# issue more tasks into the process pool
_ = pool.map_async(task, range(5, 10))
```
Next, we can close the process pool to prevent any further tasks from being issued to the pool and to shut down the worker processes once they are done.
```python
...
# shutdown the process pool
pool.close()
```
We can then join the process pool which will block until all tasks issued to the pool have completed.
```python
...
# wait for tasks to complete
pool.join()
```
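The order matters: in recent CPython versions, calling join() while the pool is still running (that is, before close() or terminate()) raises a ValueError. The sketch below, again with a hypothetical square() task, shows the rejected call and then the correct close() followed by join().

```python
from multiprocessing.pool import Pool

# hypothetical task used for illustration only
def square(x):
    return x * x

# protect the entry point
if __name__ == '__main__':
    with Pool(2) as pool:
        result = pool.map_async(square, range(4))
        try:
            # join() without a prior close() is rejected
            pool.join()
            join_rejected = False
        except ValueError:
            join_rejected = True
        # correct order: close() first, then join()
        pool.close()
        pool.join()
        values = result.get()
    print(join_rejected, values)
```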
Finally, we can report a message that all tasks are completed.
```python
...
# report all tasks done
print('All tasks are done', flush=True)
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of waiting for all tasks after shutdown
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # block for a moment
    sleep(0.5)
    # report done
    print(f'Task {identifier} done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks into the process pool
        _ = pool.map_async(task, range(5))
        # issue more tasks into the process pool
        _ = pool.map_async(task, range(5, 10))
        # shutdown the process pool
        pool.close()
        # wait for tasks to complete
        pool.join()
        # report all tasks done
        print('All tasks are done', flush=True)
    # process pool is closed automatically
```
Running the example first creates the process pool.
Five tasks are then issued to the process pool asynchronously. An AsyncResult object is returned and ignored.
A further five tasks are then issued to the process pool asynchronously, and again an AsyncResult object is returned and ignored.
The process pool is then closed preventing any further tasks from being issued to the pool and closing the worker processes once all tasks are completed. The main process then blocks waiting for the process pool to close completely.
Each task is executed in the process pool, first blocking for a fraction of a second, then printing a message.
All ten tasks issued to the process pool complete and the child worker processes in the pool terminate. The blocking call to the join() function returns and the main process continues on.
A final message is reported.
```
Task 0 done
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
All tasks are done
```
Takeaways
You now know how to wait for tasks to complete in the process pool in Python.
Photo by miguel sinisterra on Unsplash