Last Updated on October 29, 2022
You can wait for tasks issued to the ThreadPool to complete by calling wait() on the AsyncResult object or calling join() on the ThreadPool.
In this tutorial, you will discover how to wait for tasks to complete in the ThreadPool in Python.
Let’s get started.
Need Wait For All Tasks in the ThreadPool
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
— multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading or writing from sockets or files.
A ThreadPool can be configured when it is created, which will prepare the new threads.
We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using methods such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the methods such as apply_async() and map_async().
When using the ThreadPool, we may need to wait for all tasks to complete.
This may be for many reasons, such as:
- Waiting for all tasks to complete before issuing follow-up tasks.
- Waiting for all task results so that they can be combined or used.
- Waiting for all tasks to complete before continuing on with the program.
How can we wait for all tasks to complete in the ThreadPool?
Run loops using all CPUs, download your FREE book to learn how.
How to Wait For All Tasks to Finish
There are two ways that we can wait for tasks to finish in the ThreadPool.
They are:
- Wait for an asynchronous set of tasks to complete with the wait() method.
- Wait for all issued tasks to complete after shutdown with the join() method.
Let’s take a closer look at each approach.
How to Wait For All Tasks in a Batch
Tasks may be issued asynchronously to the ThreadPool.
This can be achieved using a method such as apply_async(), map_async(), and starmap_async(). These methods return an AsyncResult object.
We can wait for a single batch of tasks issued asynchronously to the ThreadPool to complete by calling the wait() method on the returned AsyncResult object.
For example:
1 2 3 4 5 |
... # issue tasks result = pool.map_async(...) # wait for issued tasks to complete result.wait() |
If multiple batches of asynchronous tasks are issued to the ThreadPool, we can collect the AsyncResult objects that are returned and wait on each in turn.
You can learn more about how to wait on the AsyncResult object in the tutorial:
How to Wait For All Tasks After Shutdown
We may issue many batches of asynchronous tasks to the ThreadPool and not hang onto the AsyncResult objects that are returned.
Instead, we can wait for all tasks in the ThreadPool to complete by first shutting down the ThreadPool, then joining it to wait for all issued tasks to be completed.
This can be achieved by first calling the close() method that will prevent any further tasks to be issued to the ThreadPool and close down the worker threads once all tasks are complete.
We can then call the join() method. This will block the caller until all tasks in the ThreadPool are completed and the worker threads in the ThreadPool have closed.
For example:
1 2 3 4 5 |
... # close the thread pool pool.close() # block until all tasks are complete and threads close pool.join() |
The downside of this approach is that we cannot issue tasks to the pool after it is closed. This approach can only be used once you know that you have no further tasks to issue to the ThreadPool.
You can learn more about joining the ThreadPool after shutdown in the tutorial:
Now that we know how to wait for all tasks in the ThreadPool to complete, let’s look at some worked examples.
Example of Waiting for All Tasks in a Batch
We can explore how to wait for a batch of issued tasks to complete in the ThreadPool.
In this example, we will define a task that blocks for a moment and then reports a message. From the main thread, we will issue a batch of tasks to the ThreadPool asynchronously. We will then explicitly wait on the batch of tasks to complete by waiting on the returned AsyncResult object.
Firstly, we can define a task to execute in the ThreadPool.
The custom function will take an integer argument, will block for a fraction of a second to simulate computational effort, then reports a message that the task is done.
The task() function below implements this.
1 2 3 4 5 6 |
# task executed in a worker thread def task(identifier): # block for a moment sleep(0.5) # report done print(f'Task {identifier} done') |
Next, in the main thread, we will create the ThreadPool with the default configuration.
We will use the context manager interface to ensure that the ThreadPool is closed automatically once we are finished with it.
1 2 3 4 |
... # create and configure the thread pool with ThreadPool() as pool: # ... |
You can learn more about the context manager interface in the tutorial:
We will then issue 10 tasks to the ThreadPool by calling our custom task() function with integers from 0 to 9. This can be achieved via the map_async() method.
1 2 3 |
... # issue tasks into the thread pool result = pool.map_async(task, range(10)) |
You can learn more about issuing asynchronous tasks to the ThreadPool with the map_async() method in the tutorial:
This returns an AsyncResult object on which we can call the wait() method.
This method call will block until all 10 tasks issued asynchronously are completed.
1 2 3 |
... # wait for tasks to complete result.wait() |
We can then report a message that the tasks are completed.
1 2 3 |
... # report all tasks done print('All tasks are done') |
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# SuperFastPython.com # example of waiting for all tasks in a batch to finish from time import sleep from multiprocessing.pool import ThreadPool # task executed in a worker thread def task(identifier): # block for a moment sleep(0.5) # report done print(f'Task {identifier} done') # protect the entry point if __name__ == '__main__': # create and configure the thread pool with ThreadPool() as pool: # issue tasks into the thread pool result = pool.map_async(task, range(10)) # wait for tasks to complete result.wait() # report all tasks done print('All tasks are done') # thread pool is closed automatically |
Running the example first creates the ThreadPool.
The ten tasks are then issued to the ThreadPool asynchronously. An AsyncResult object is returned and the main thread then blocks until the issued tasks are completed.
Each task is issued in the ThreadPool, first blocking for a fraction of a second, then printing a message.
All ten tasks are issued as a batch to the ThreadPool are completed, then the wait() method returns and the main thread continues on.
A final message is reported, then the ThreadPool is closed automatically via the context manager interface.
1 2 3 4 5 6 7 8 9 10 11 |
Task 0 done Task 5 done Task 2 done Task 3 done Task 6 done Task 7 done Task 1 done Task 4 done Task 8 done Task 9 done All tasks are done |
Next, let’s look at how we might wait for all tasks in the ThreadPool to be completed when shutting down the pool.
Free Python ThreadPool Course
Download your FREE ThreadPool PDF cheat sheet and get BONUS access to my free 7-day crash course on the ThreadPool API.
Discover how to use the ThreadPool including how to configure the number of worker threads and how to execute tasks asynchronously
Example of Waiting for All Tasks After Shutdown
We can wait for all tasks issued to the ThreadPool to complete by shutting down the pool, then joining it.
In this example, we can update the previous example to issue two batches of tasks to the ThreadPool and not hang onto the AsyncResult objects that are returned. We can then close the ThreadPool and join it, blocking until all tasks issued to the ThreadPool have been completed.
This can be achieved by first issuing two batches of tasks to the pool via two calls to the map_async() method.
1 2 3 4 5 |
... # issue tasks into the thread pool _ = pool.map_async(task, range(5)) # issue more tasks into the thread pool _ = pool.map_async(task, range(5, 10)) |
Next, we can close the ThreadPool to prevent any further tasks to be issued to the pool and to shut down the worker threads once they are done.
1 2 3 |
... # shutdown the thread pool pool.close() |
We can then join the thread pool which will block until all tasks issued to the pool have been completed.
1 2 3 |
... # wait for tasks to complete pool.join() |
Finally, we can report a message that all tasks are completed.
1 2 3 |
... # report all tasks done print('All tasks are done') |
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# SuperFastPython.com # example of waiting for all tasks after shutdown from time import sleep from multiprocessing.pool import ThreadPool # task executed in a worker thread def task(identifier): # block for a moment sleep(0.5) # report done print(f'Task {identifier} done') # protect the entry point if __name__ == '__main__': # create and configure the thread pool with ThreadPool() as pool: # issue tasks into the thread pool _ = pool.map_async(task, range(5)) # issue more tasks into the thread pool _ = pool.map_async(task, range(5, 10)) # shutdown the thread pool pool.close() # wait for tasks to complete pool.join() # report all tasks done print('All tasks are done') # thread pool is closed automatically |
Running the example first creates the ThreadPool.
The five tasks are then issued to the ThreadPool asynchronously. An AsyncResult object is returned and ignored.
A further five tasks are then issued to the ThreadPool asynchronously, and again an AsyncResult object is returned and ignored.
The ThreadPool is then closed, preventing any further tasks from being issued to the pool and closing the worker threads once all tasks are completed. The main thread then blocks waiting for the ThreadPool to close completely.
Each task is issued in the ThreadPool, first blocking for a fraction of a second, then printing a message.
All ten tasks issued to the ThreadPool are completed and the worker threads in the pool terminate. The blocking call to the join() method returns and the main thread continues on.
A final message is reported.
1 2 3 4 5 6 7 8 9 10 11 |
Task 1 done Task 2 done Task 0 done Task 4 done Task 5 done Task 6 done Task 7 done Task 3 done Task 9 done Task 8 done All tasks are done |
Overwhelmed by the python concurrency APIs?
Find relief, download my FREE Python Concurrency Mind Maps
Further Reading
This section provides additional resources that you may find helpful.
Books
- Python ThreadPool Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- ThreadPool PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python ThreadPool: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
APIs
References
Takeaways
You now know how to wait for tasks to complete in the ThreadPool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Grant Durr on Unsplash
Do you have any questions?