ThreadPool Wait For All Tasks To Finish in Python
You can wait for tasks issued to the ThreadPool to complete by calling wait() on the AsyncResult object, or by calling close() and then join() on the ThreadPool.
In this tutorial, you will discover how to wait for tasks to complete in the ThreadPool in Python.
Let's get started.
Need to Wait For All Tasks in the ThreadPool
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
-- multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module, it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading from or writing to sockets or files.
A ThreadPool can be configured when it is created, which will prepare the new threads.
We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using methods such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the methods such as apply_async() and map_async().
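As a rough illustration of the difference between the synchronous and asynchronous methods, the sketch below issues the same kind of work four ways. The square() function, the issue_examples() helper, and the pool size of 4 are hypothetical choices made only for this example.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task used only for illustration
def square(value):
    return value * value

# hypothetical helper that issues tasks in four different ways
def issue_examples():
    with ThreadPool(4) as pool:
        # synchronous: blocks until the single task returns its result
        single = pool.apply(square, args=(3,))
        # synchronous: blocks until every item is processed
        many = pool.map(square, range(5))
        # asynchronous: each call returns an AsyncResult immediately
        async_single = pool.apply_async(square, args=(4,))
        async_many = pool.map_async(square, range(5, 8))
        # get() blocks until the result is available
        return single, many, async_single.get(), async_many.get()

if __name__ == '__main__':
    print(issue_examples())
```

The asynchronous calls return straight away, which is why a separate step is needed to wait for the issued tasks.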
When using the ThreadPool, we may need to wait for all tasks to complete.
This may be for many reasons, such as:
- Waiting for all tasks to complete before issuing follow-up tasks.
- Waiting for all task results so that they can be combined or used.
- Waiting for all tasks to complete before continuing on with the program.
How can we wait for all tasks to complete in the ThreadPool?
How to Wait For All Tasks to Finish
There are two ways that we can wait for tasks to finish in the ThreadPool.
They are:
- Wait for an asynchronous set of tasks to complete with the wait() method.
- Wait for all issued tasks to complete after shutdown with the join() method.
Let's take a closer look at each approach.
How to Wait For All Tasks in a Batch
Tasks may be issued asynchronously to the ThreadPool.
This can be achieved using methods such as apply_async(), map_async(), or starmap_async(). Each of these methods returns an AsyncResult object.
We can wait for a single batch of tasks issued asynchronously to the ThreadPool to complete by calling the wait() method on the returned AsyncResult object.
For example:
...
# issue tasks
result = pool.map_async(...)
# wait for issued tasks to complete
result.wait()
If multiple batches of asynchronous tasks are issued to the ThreadPool, we can collect the AsyncResult objects that are returned and wait on each in turn.
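Waiting on several batches in turn might look like the following minimal sketch. The task() function, the wait_for_batches() helper, and the batch sizes are assumptions made for illustration.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task that blocks briefly
def task(identifier):
    sleep(0.1)
    return identifier

# hypothetical helper that issues two batches and waits on both
def wait_for_batches():
    with ThreadPool() as pool:
        # issue two batches and keep each returned AsyncResult
        results = [
            pool.map_async(task, range(5)),
            pool.map_async(task, range(5, 10)),
        ]
        # wait on each batch in turn
        for result in results:
            result.wait()
        # every batch has finished at this point
        return [result.ready() for result in results]

if __name__ == '__main__':
    print(wait_for_batches())
```

The order in which we wait does not matter here, because the loop only finishes once every batch is done.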
How to Wait For All Tasks After Shutdown
We may issue many batches of asynchronous tasks to the ThreadPool and not hang onto the AsyncResult objects that are returned.
Instead, we can wait for all tasks in the ThreadPool to complete by first shutting down the ThreadPool, then joining it to wait for all issued tasks to be completed.
This can be achieved by first calling the close() method, which prevents any further tasks from being issued to the ThreadPool and closes down the worker threads once all tasks are complete.
We can then call the join() method. This will block the caller until all tasks in the ThreadPool are completed and the worker threads in the ThreadPool have closed.
For example:
...
# close the thread pool
pool.close()
# block until all tasks are complete and threads close
pool.join()
The downside of this approach is that we cannot issue tasks to the pool after it is closed. This approach can only be used once you know that you have no further tasks to issue to the ThreadPool.
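To make the limitation concrete, the sketch below tries to issue a task after the pool has been closed. It assumes a recent Python 3 release, where the pool rejects the task with a ValueError. The task() function and the issue_after_close() helper are hypothetical names.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task used only for illustration
def task(identifier):
    return identifier

# hypothetical helper that tries to issue a task to a closed pool
def issue_after_close():
    pool = ThreadPool(2)
    # no further tasks may be issued after this call
    pool.close()
    try:
        pool.apply_async(task, args=(1,))
        rejected = False
    except ValueError:
        # assumption: recent Python 3 versions raise ValueError here
        rejected = True
    # wait for the worker threads to exit
    pool.join()
    return rejected

if __name__ == '__main__':
    print(issue_after_close())
```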
Now that we know how to wait for all tasks in the ThreadPool to complete, let's look at some worked examples.
Example of Waiting for All Tasks in a Batch
We can explore how to wait for a batch of issued tasks to complete in the ThreadPool.
In this example, we will define a task that blocks for a moment and then reports a message. From the main thread, we will issue a batch of tasks to the ThreadPool asynchronously. We will then explicitly wait on the batch of tasks to complete by waiting on the returned AsyncResult object.
Firstly, we can define a task to execute in the ThreadPool.
The custom function will take an integer argument, block for a fraction of a second to simulate effort, then report a message that the task is done.
The task() function below implements this.
# task executed in a worker thread
def task(identifier):
    # block for a moment
    sleep(0.5)
    # report done
    print(f'Task {identifier} done')
Next, in the main thread, we will create the ThreadPool with the default configuration.
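We will use the context manager interface to ensure that the ThreadPool is shut down automatically once we are finished with it. Note that exiting the block calls terminate() rather than close(), so any waiting for tasks must happen inside the block.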
...
# create and configure the thread pool
with ThreadPool() as pool:
    # ...
We will then issue 10 tasks to the ThreadPool by calling our custom task() function with integers from 0 to 9. This can be achieved via the map_async() method.
...
# issue tasks into the thread pool
result = pool.map_async(task, range(10))
This returns an AsyncResult object on which we can call the wait() method.
This method call will block until all 10 tasks issued asynchronously are completed.
...
# wait for tasks to complete
result.wait()
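The wait() method also accepts an optional timeout in seconds. The sketch below is one way this might be used to check on progress before blocking fully. The wait_with_timeout() helper and the timeout value are assumptions chosen for illustration.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    sleep(0.5)
    return identifier

# hypothetical helper that waits briefly, then waits fully
def wait_with_timeout():
    with ThreadPool() as pool:
        result = pool.map_async(task, range(4))
        # wait at most 0.05 seconds; the tasks need about 0.5 seconds
        result.wait(timeout=0.05)
        finished_early = result.ready()
        # now block until the whole batch is done
        result.wait()
        return finished_early, result.ready()

if __name__ == '__main__':
    print(wait_with_timeout())
```

A timed wait returns whether or not the tasks have finished, so ready() is used to check the outcome.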
We can then report a message that the tasks are completed.
...
# report all tasks done
print('All tasks are done')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of waiting for all tasks in a batch to finish
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # block for a moment
    sleep(0.5)
    # report done
    print(f'Task {identifier} done')

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks into the thread pool
        result = pool.map_async(task, range(10))
        # wait for tasks to complete
        result.wait()
        # report all tasks done
        print('All tasks are done')
    # thread pool is shut down automatically
Running the example first creates the ThreadPool.
The ten tasks are then issued to the ThreadPool asynchronously. An AsyncResult object is returned and the main thread then blocks until the issued tasks are completed.
Each task is executed in the ThreadPool, first blocking for a fraction of a second, then printing a message.
Once all ten tasks issued as a batch to the ThreadPool are completed, the wait() method returns and the main thread continues on.
A final message is reported, then the ThreadPool is shut down automatically when the context manager block exits. The order of the task messages may vary between runs.
Task 0 done
Task 5 done
Task 2 done
Task 3 done
Task 6 done
Task 7 done
Task 1 done
Task 4 done
Task 8 done
Task 9 done
All tasks are done
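One point worth keeping in mind is that wait() only blocks; it does not return results or re-raise exceptions from failed tasks. The sketch below shows one way to check the outcome after waiting, using successful() and get(). The flaky_task() function and the wait_then_check() helper are hypothetical names made up for this illustration.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task that fails for one specific input
def flaky_task(identifier):
    if identifier == 3:
        raise ValueError(f'Task {identifier} failed')
    return identifier

# hypothetical helper that waits, then inspects the outcome
def wait_then_check():
    with ThreadPool() as pool:
        result = pool.map_async(flaky_task, range(5))
        # returns normally even though one task raised an exception
        result.wait()
        if result.successful():
            return result.get()
        try:
            # get() re-raises the exception from the failed task
            result.get()
        except ValueError as error:
            return str(error)

if __name__ == '__main__':
    print(wait_then_check())
```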
Next, let's look at how we might wait for all tasks in the ThreadPool to be completed when shutting down the pool.
Example of Waiting for All Tasks After Shutdown
We can wait for all tasks issued to the ThreadPool to complete by shutting down the pool, then joining it.
In this example, we can update the previous example to issue two batches of tasks to the ThreadPool and not hang onto the AsyncResult objects that are returned. We can then close the ThreadPool and join it, blocking until all tasks issued to the ThreadPool have been completed.
This can be achieved by first issuing two batches of tasks to the pool via two calls to the map_async() method.
...
# issue tasks into the thread pool
_ = pool.map_async(task, range(5))
# issue more tasks into the thread pool
_ = pool.map_async(task, range(5, 10))
Next, we can close the ThreadPool to prevent any further tasks from being issued to the pool and to shut down the worker threads once they are done.
...
# shutdown the thread pool
pool.close()
We can then join the thread pool which will block until all tasks issued to the pool have been completed.
...
# wait for tasks to complete
pool.join()
Finally, we can report a message that all tasks are completed.
...
# report all tasks done
print('All tasks are done')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of waiting for all tasks after shutdown
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # block for a moment
    sleep(0.5)
    # report done
    print(f'Task {identifier} done')

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks into the thread pool
        _ = pool.map_async(task, range(5))
        # issue more tasks into the thread pool
        _ = pool.map_async(task, range(5, 10))
        # shutdown the thread pool
        pool.close()
        # wait for tasks to complete
        pool.join()
        # report all tasks done
        print('All tasks are done')
    # thread pool is already closed and joined at this point
Running the example first creates the ThreadPool.
The first five tasks are then issued to the ThreadPool asynchronously. An AsyncResult object is returned and ignored.
A further five tasks are then issued to the ThreadPool asynchronously, and again an AsyncResult object is returned and ignored.
The ThreadPool is then closed, preventing any further tasks from being issued to the pool and closing the worker threads once all tasks are completed. The main thread then blocks waiting for the ThreadPool to close completely.
Each task is executed in the ThreadPool, first blocking for a fraction of a second, then printing a message.
All ten tasks issued to the ThreadPool are completed and the worker threads in the pool terminate. The blocking call to the join() method returns and the main thread continues on.
A final message is reported.
Task 1 done
Task 2 done
Task 0 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 3 done
Task 9 done
Task 8 done
All tasks are done
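The order of the two calls matters: join() is only valid once the pool has been closed or terminated. The sketch below assumes a recent Python 3 release, where joining a pool that is still running raises a ValueError. The join_without_close() helper is a hypothetical name used for illustration.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task used only for illustration
def task(identifier):
    return identifier

# hypothetical helper that calls join() before close()
def join_without_close():
    pool = ThreadPool(2)
    _ = pool.map_async(task, range(5))
    try:
        # the pool is still running, so this call is rejected
        pool.join()
        raised = False
    except ValueError:
        # assumption: recent Python 3 versions raise ValueError here
        raised = True
    # the correct order: close first, then join
    pool.close()
    pool.join()
    return raised

if __name__ == '__main__':
    print(join_without_close())
```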
Takeaways
You now know how to wait for tasks to complete in the ThreadPool in Python.