Last Updated on September 12, 2022
When using the ProcessPoolExecutor in Python, use wait() when waiting for one or all tasks to complete, and use as_completed() when you need results as they become available.
In this tutorial, you will discover when to use wait() and as_completed() for asynchronous tasks executed by process pools in Python.
Let’s get started.
Use wait() to Wait for Tasks To Complete with the ProcessPoolExecutor
Use the wait() module function to wait for one or all tasks to complete.
The ProcessPoolExecutor in Python provides a pool of reusable processes for executing ad hoc tasks.
You can submit tasks to the process pool by calling the submit() function and passing in the name of the function you wish to execute on another process.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
```python
...
# submit a task and get a future
future = executor.submit(task)
```
If you call submit multiple times for different tasks or different arguments to the same task, you can do so in a loop and store all of the Future objects in a collection for later use.
For example, it is common to use a list comprehension.
```python
...
# create many tasks and store the future objects in a list
futures = [executor.submit(work) for _ in range(100)]
```
You can call the wait() module function and pass it a collection of Future objects created with calls to the submit() function.
By default, wait() will block until all tasks in the collection of Future objects have completed before returning.
```python
...
# wait for all tasks to complete before returning
wait(futures) # blocks
```
The “return_when” argument to the wait() function controls the condition to wait for. By default, it is set to the constant ALL_COMPLETED which waits for all tasks related to the provided Future objects to complete before returning.
```python
...
# wait for all tasks to complete before returning
wait(futures, return_when=ALL_COMPLETED)
```
It can also be set to the value FIRST_COMPLETED to return when any task completes, and to FIRST_EXCEPTION to return when any task raises an exception or if all tasks are completed without raising an exception.
```python
...
# wait until any task is completed
wait(futures, return_when=FIRST_COMPLETED)
```
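There is no FIRST_EXCEPTION example in the API snippets, so here is a minimal sketch of its behavior. It uses a ThreadPoolExecutor so the snippet is self-contained and deterministic; wait() behaves the same way with a ProcessPoolExecutor. The work() function and the choice of which task fails are invented for illustration.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_EXCEPTION

# hypothetical task: the third task raises, the rest succeed
def work(i):
    if i == 2:
        raise ValueError('task 2 failed')
    return i

with ThreadPoolExecutor(4) as executor:
    futures = [executor.submit(work, i) for i in range(4)]
    # returns as soon as any task raises an exception,
    # or when all tasks finish without one
    done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
    # the failed future is guaranteed to be in the done set
    failed = [f for f in done if f.exception() is not None]
    print(len(failed))  # → 1
```

Note that the done set may also contain any other futures that happened to finish before the exception was raised.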
The wait() function returns a named tuple of two set objects. The first set contains the Future objects for tasks that are done; the second contains the Future objects for tasks that are not done.
If "return_when" is set to ALL_COMPLETED, then the "done" set will contain all Future objects. Whereas if "return_when" is set to FIRST_COMPLETED, the "done" set will contain at least the Future object for the first task to complete, along with any other tasks that happened to finish in the meantime.
```python
...
# wait for all tasks to complete before returning
done, not_done = wait(futures)
```
We may want to limit how long we are willing to wait for one or all tasks to be done.
This can be achieved by setting the "timeout" argument to a number of seconds. If the time elapses before the wait condition is met, the function returns anyway: the "done" set contains the futures that completed within the timeout and the "not_done" set contains the rest.
```python
...
# wait for a fixed time for tasks to be done
done, not_done = wait(futures, timeout=5)
```
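As a sketch of this timeout behavior, the snippet below uses a ThreadPoolExecutor so it is self-contained and quick to run; wait() works identically with a ProcessPoolExecutor. The work() helper and the delays are invented for illustration.

```python
from time import sleep
from concurrent.futures import ThreadPoolExecutor, wait

# hypothetical task that sleeps for a given number of seconds
def work(delay):
    sleep(delay)
    return delay

with ThreadPoolExecutor(2) as executor:
    fast = executor.submit(work, 0.0)
    slow = executor.submit(work, 3.0)
    # give up after one second; the slow task is still running
    done, not_done = wait([fast, slow], timeout=1)
    print(len(done), len(not_done))  # → 1 1
```

The fast task lands in the done set and the slow task in not_done; no exception is raised on timeout.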
Now that we know how to use the wait() function, let’s look at a concrete example.
The example below demonstrates how to use the wait() function to wait for all tasks to complete.
Note that we are intentionally ignoring the return from calling wait() as we have no need to inspect it in this case.
```python
# SuperFastPython.com
# example of waiting for all tasks in a collection of tasks to be done
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    # display the result
    print(name)

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # wait for all tasks to complete
        wait(futures)
        print('All tasks are done!')
```
Running the example, we can see that each task prints its result as it completes.
Importantly, we can see that the main process waits until all tasks are completed before carrying on and printing a message.
```
3
9
0
8
4
6
2
1
5
7
All tasks are done!
```
Let’s take a look at another example.
The example below demonstrates the pattern of calling wait() and waiting for the first task (any task) in the provided collection to be done.
```python
# SuperFastPython.com
# example of waiting for the first task in a collection of tasks to be done
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
from concurrent.futures import FIRST_COMPLETED

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # wait until any task completes
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        # get the first task to complete
        future = done.pop()
        # get the result from the first task to complete
        result = future.result()
        print(result)
```
Running the example will wait for any of the tasks to complete, then retrieve the result of the first completed task and shut down the process pool.
```
9
```
Now that we are familiar with how to use the wait() function, let’s look at how to use the as_completed() function.
Use as_completed() to Get Results As Tasks Complete with the ProcessPoolExecutor
Use the as_completed() module function to get results for tasks as they complete.
Recall that we receive a Future object for each call to the submit() function on the ProcessPoolExecutor.
```python
...
# submit a task and get a future
future = executor.submit(task)
```
We can call the submit() function repeatedly and store all of the Future objects in a collection. If we are calling the same target function multiple times, it is common to use a list comprehension to build the collection of Future objects.
```python
...
# create many tasks and store the future objects in a list
futures = [executor.submit(work) for _ in range(100)]
```
We can call the as_completed() function and pass it a collection of Future objects in order to wait for tasks to be done.
The as_completed() function returns an iterator over the same Future objects that were provided, yielding them in the order that the tasks complete, not the order in which they were submitted to the ProcessPoolExecutor.
```python
...
# get an iterator for futures in the order they are completed
iterator = as_completed(futures)
```
This iterator can then be used in a for-loop to get the Future and in turn the result of the associated task in the order that they are completed.
This allows the program to be more responsive than if it processed results in the order that tasks were submitted, or waited until all tasks were completed before processing any results.
```python
...
# process tasks in the order they are completed
for future in as_completed(futures):
    # get the result
    result = future.result()
```
The example below demonstrates the as_completed() function, submitting the tasks in order from 0 to 9 and showing results in the order that they were completed.
```python
# SuperFastPython.com
# example of waiting for tasks using the as_completed function
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # process task results as they are available
        for future in as_completed(futures):
            # retrieve the result
            print(future.result())
```
Running the example we can see that the results are retrieved and printed in the order that the tasks completed, not the order that the tasks were submitted to the process pool.
```
6
9
0
1
3
8
4
7
2
5
```
We may want to limit how long we are willing to wait for a result.
This can be achieved by setting the “timeout” argument to a number of seconds.
```python
...
# set a timeout while waiting for results in completion order
iterator = as_completed(futures, timeout=5)
```
If the timeout elapses before all results are received, then a TimeoutError is raised while iterating that may need to be handled.
```python
...
# handle a timeout waiting for results
try:
    # get results in completion order with a timeout
    iterator = as_completed(futures, timeout=5)
    # do things...
except TimeoutError:
    # handle the timeout
    ...
```
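As a runnable sketch of the timeout being raised, the snippet below uses a ThreadPoolExecutor so it is self-contained; as_completed() behaves the same with a ProcessPoolExecutor. The work() helper and the delays are invented for illustration.

```python
from time import sleep
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError

# hypothetical task that sleeps longer than we are willing to wait
def work():
    sleep(3)
    return 'done'

with ThreadPoolExecutor(1) as executor:
    futures = [executor.submit(work)]
    try:
        # iterating triggers the timeout, not the as_completed() call itself
        for future in as_completed(futures, timeout=1):
            print(future.result())
        timed_out = False
    except TimeoutError:
        timed_out = True
print(timed_out)  # → True
```

Note that the exception is raised by the iterator while waiting, which is why the for-loop itself sits inside the try block.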
If we only require the result from the first task to complete, we can call next() on the iterator directly.
```python
...
# get an iterator for futures in the order they are completed
iterator = as_completed(futures)
# get the first task to complete
future = next(iterator)
```
We can update the previous example to get the first task to complete and report its result.
```python
# SuperFastPython.com
# example of waiting for the first task using the as_completed function
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # get an iterator for tasks in completion order
        iterator = as_completed(futures)
        # get the first task to complete
        future = next(iterator)
        # retrieve the result
        print(future.result())
```
Running the example creates the process pool and submits all tasks as we did before.
We then get an iterator over the Future objects in the order of their completion, then retrieve just the first task to complete and report its result.
```
8
```
Now that we are familiar with the wait() and as_completed() module functions, let’s take a closer look at when we might use each.
When to Use wait() vs as_completed() with the ProcessPoolExecutor
Let’s compare the wait() and as_completed() functions when using the ProcessPoolExecutor.
Both functions have a lot in common, such as:
- Both help you wait for tasks to be done.
- Both can help you wait for all tasks to be done, or the first task to be done.
- Both require you to have called submit() to create Future objects.
- Both functions are optional, e.g. you don’t have to use them when executing asynchronous tasks.
The as_completed() function is simpler.
It is designed specifically for processing tasks in the order of their completion.
You should use it if:
- You need task results in the order that tasks are completed.
You should not use it if:
- You need to wait for all tasks to be done.
- You need to wait for any task to be done.
- You need to wait for any task to raise an exception.
Although you can achieve these outcomes with a call to the as_completed() function, it requires more code and is more prone to bugs.
Additionally, the as_completed() function does not let you process task results in the order that the tasks were submitted to the process pool.
To do this, you can use the map() function instead of the submit() function to issue tasks to the process pool. Alternatively, you can iterate over the collection of Future objects in the order you submitted them to the pool, for example:
```python
...
# iterate future objects in the order they were submitted
for future in futures:
    # get the result
    result = future.result()
```
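A minimal sketch of the map() alternative is shown below. It uses a ThreadPoolExecutor so the example is self-contained; map() has the same ordering behavior on a ProcessPoolExecutor. The task() function and its delay are invented for illustration.

```python
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# hypothetical task with a small variable delay
def task(name):
    sleep(random() / 10)
    return name

with ThreadPoolExecutor(4) as executor:
    # map() yields results in the order tasks were issued,
    # regardless of the order in which they complete
    results = list(executor.map(task, range(8)))
    print(results)  # → [0, 1, 2, 3, 4, 5, 6, 7]
```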
The wait() function offers more flexibility.
It is designed for you to wait for a specified condition in a collection of tasks, such as any task being done, any task raising an exception or all tasks being done.
You should use it if:
- You need to wait for all tasks to be done.
- You need to wait for any task to be done.
- You need to wait for any task to raise an exception.
You should not use it if:
- You need task results in the order that tasks are completed.
The wait() function will not wait for all tasks in the process pool, only those tasks whose Future objects you have passed to the function.
If you need to wait for all tasks in the process pool to complete, you can call the shutdown() function on the process pool, which by default will not cancel scheduled tasks and will return once all scheduled and running tasks have completed.
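A short sketch of waiting on the pool itself via shutdown() is given below, using a ThreadPoolExecutor to keep it self-contained; shutdown() has the same semantics on a ProcessPoolExecutor.

```python
from time import sleep
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(4)
# issue tasks without setting up any wait condition
futures = [executor.submit(sleep, 0.1) for _ in range(8)]
# blocks until every scheduled and running task has finished
executor.shutdown(wait=True)
print(all(f.done() for f in futures))  # → True
```

This is also what happens implicitly when the executor is used as a context manager and the with block exits.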
Now that we have compared and contrasted the wait() and as_completed() functions, which will you use?
Let me know in the comments below.
Free Python ProcessPoolExecutor Course
Download your FREE ProcessPoolExecutor PDF cheat sheet and get BONUS access to my free 7-day crash course on the ProcessPoolExecutor API.
Discover how to use the ProcessPoolExecutor class including how to configure the number of workers and how to execute tasks asynchronously.
Further Reading
This section provides additional resources that you may find helpful.
Books
- ProcessPoolExecutor Jump-Start, Jason Brownlee (my book!)
- Concurrent Futures API Interview Questions
- ProcessPoolExecutor PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python ProcessPoolExecutor: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python Pool: The Complete Guide
APIs
- concurrent.futures — Launching parallel tasks
References
- Thread (computing), Wikipedia.
- Process (computing), Wikipedia.
- Thread Pool, Wikipedia.
- Futures and promises, Wikipedia.
Takeaways
You now know when to use wait() and as_completed() when executing tasks with the ProcessPoolExecutor.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Jason Rosewell on Unsplash
Ruslan says
what if i modify the list of futures i am waiting on either using the as_completed function or the wait?
will added futures, for example, be waited upon or just those that were there when the function was called?
what is the better way around such case?
Jason Brownlee says
Great question, I’ll investigate and get back to you.