Last Updated on September 12, 2022
You can get results for tasks as they are completed with the ProcessPoolExecutor by calling the as_completed() module function.
In this tutorial you will discover how to get results from tasks as they are completed in the Python process pool.
Let’s get started.
Need to Get Results As Tasks Are Completed
The ProcessPoolExecutor in Python provides a pool of reusable processes for executing ad hoc tasks.
You can submit tasks to the process pool by calling the submit() function and passing in the name of the function you wish to execute in another process.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
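For example, here is a minimal sketch of polling a Future returned by submit(); the work() function and its sleep duration are hypothetical, used only for illustration:

```python
# demonstrate checking task status and getting a result via a Future
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# hypothetical task used for illustration
def work(value):
    sleep(0.5)
    return value * 2

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        # submit() returns a Future immediately, before the task finishes
        future = executor.submit(work, 21)
        # check the status of the task (typically still running at this point)
        print(future.done())
        # result() blocks until the task completes, then returns its value
        print(future.result())  # 42
```

Calling result() again after the task has completed returns the stored value immediately without blocking.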
In some applications you may need to process results from tasks in the order that tasks finish their execution, and not in the order that tasks were submitted to the process pool.
This can make your program more responsive, showing progress and results to the user sooner than if results were processed in the order that tasks were submitted.
How do you get results from the ProcessPoolExecutor in the order that tasks are completed?
How to Get Results As Tasks Are Completed
You can get results from the ProcessPoolExecutor in the order that tasks are completed by calling the as_completed() module function.
The function takes a collection of Future objects and will return the same Future objects in the order that their associated tasks are completed.
Recall that when you submit tasks to the ProcessPoolExecutor via a call to the submit() function, you will receive a Future object for each task in return. These Future objects can be added to a list or similar collection and provided to the as_completed() function.
The common idiom for using the as_completed() function is in a for-loop, for example:
```python
...
# iterate over future objects as they are completed
for future in as_completed(futures):
    # do something with the completed future
    ...
```
You can also set a timeout for how long you are willing to wait for the first result.
This can be achieved by setting the “timeout” argument and specifying a number of seconds.
If the first task takes more than the specified number of seconds to complete after as_completed() is called, a TimeoutError will be raised, which may need to be handled.
```python
...
try:
    # iterate future objects with a timeout on the first task completing
    for future in as_completed(futures, timeout=5):
        # do something with the completed future
        ...
except TimeoutError:
    # handle the timeout
    ...
```
Note: if duplicate Future objects are provided to as_completed(), only distinct instances are considered.
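This short sketch demonstrates the de-duplication; the ident() task is hypothetical, used only for illustration:

```python
# demonstrate that duplicate futures are yielded only once
from concurrent.futures import ProcessPoolExecutor, as_completed

# hypothetical task used for illustration
def ident(value):
    return value

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor(1) as executor:
        future = executor.submit(ident, 1)
        # pass the same future twice; as_completed() removes duplicates
        yielded = list(as_completed([future, future]))
        print(len(yielded))  # 1
```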
Also note that a task is “completed” for one of three reasons:
- The task was completed successfully.
- The task was cancelled.
- The task raised an exception that was not handled.
This means that you may need to check the status of each task via its Future object as they are iterated.
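A sketch of performing these checks inside the loop is below; the succeed() and fail() task functions are illustrative, not part of the worked example that follows:

```python
# classify how each task finished as its future is iterated
from concurrent.futures import ProcessPoolExecutor, as_completed

# illustrative task that completes normally
def succeed(value):
    return value + 1

# illustrative task that raises an unhandled exception
def fail(value):
    raise ValueError('bad input')

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor(2) as executor:
        futures = [executor.submit(succeed, 1), executor.submit(fail, 1)]
        for future in as_completed(futures):
            if future.cancelled():
                print('task was cancelled')
            elif future.exception() is not None:
                # exception() returns the raised exception without re-raising it
                print(f'task failed: {future.exception()}')
            else:
                # safe to call result() as the task completed normally
                print(f'task result: {future.result()}')
```

Calling result() on a future whose task raised an exception would re-raise that exception, so checking exception() first avoids the need for a try-except around the retrieval.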
Now that we know how to get results in the order that tasks are completed in the ProcessPoolExecutor, let’s look at a worked example.
Example of Getting Results As Tasks Are Completed
Let’s explore how we can get results from tasks as they are completed in the ProcessPoolExecutor with a worked example.
First, let’s define a task that blocks for a variable amount of time.
The task takes a unique name as an argument, then generates a random number between 0 and 1. The task then sleeps for ten times that number in seconds and returns a custom string result containing the task name and the generated value.
```python
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for up to ten seconds
    value = random()
    sleep(value * 10)
    return f'Task={name}: {value:.2f}'
```
Next, we can start the process pool using the context manager so that it is closed automatically for us when we are finished with it.
We will create a pool of ten worker processes, one for each task we wish to execute. We can then add ten tasks to the process pool.
This is achieved by calling the submit() function, specifying the name of the target task function and an argument.
Each call to submit() returns a Future object that we will need later to pass to the as_completed() function. As such, we submit the tasks in a list comprehension which gives us a list of ten Future objects.
```python
...
# start the process pool
with ProcessPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
```
Next, we can wait for the tasks to complete and get the Future objects in the order that the tasks are completed, not the order that we submitted the tasks.
We do this in a for-loop, getting each Future object as it becomes available, retrieving the result, and reporting it with a print statement.
```python
...
# process task results as they are available
for future in as_completed(futures):
    # retrieve the result
    result = future.result()
    print(result)
```
Tying this together, the complete example of getting results from the ProcessPoolExecutor in the order that tasks are completed is listed below.
```python
# SuperFastPython.com
# example of getting results for tasks as they are completed
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for up to ten seconds
    value = random()
    sleep(value * 10)
    return f'Task={name}: {value:.2f}'

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # process task results as they are available
        for future in as_completed(futures):
            # retrieve the result
            result = future.result()
            print(result)
```
Running the example first starts the process pool and submits the ten tasks.
The tasks start executing and we iterate over the results in the order that tasks are completed.
We can see from the results that tasks are completed in a random order, different from the order that we submitted them.
```
Task=4: 0.10
Task=7: 0.17
Task=6: 0.35
Task=1: 0.37
Task=9: 0.48
Task=8: 0.70
Task=5: 0.72
Task=3: 0.73
Task=0: 0.80
Task=2: 0.98
```
Example of Getting Results With A Timeout
We might also like to add a timeout in seconds to wait before getting the first result.
This can be achieved by setting the “timeout” argument on the call to the as_completed() function to a number of seconds.
```python
...
# process task results as they are available
for future in as_completed(futures, timeout=0.05):
    # retrieve the result
    result = future.result()
    print(result)
```
We’d like to handle the timeout if it occurs.
This can be achieved by wrapping the call to the as_completed() function with a try-except block and catching a TimeoutError.
```python
...
# handle a timeout waiting for the first result
try:
    # process task results as they are available
    for future in as_completed(futures, timeout=0.05):
        # retrieve the result
        result = future.result()
        print(result)
except TimeoutError:
    print('Timed out waiting for a result')
```
Tying this together, the complete example of processing task results as they are completed with a timeout is listed below.
```python
# SuperFastPython.com
# example of getting results for tasks as they are completed with a timeout
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
from concurrent.futures import TimeoutError

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for up to ten seconds
    value = random()
    sleep(value * 10)
    return f'Task={name}: {value:.2f}'

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # handle a timeout waiting for the first result
        try:
            # process task results as they are available
            for future in as_completed(futures, timeout=0.05):
                # retrieve the result
                result = future.result()
                print(result)
        except TimeoutError:
            print('Timed out waiting for a result')
```
Running the example, in this case we can see that we time out before any of the tasks completes and provides a result.
Timed out waiting for a result
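One follow-up worth noting: the TimeoutError does not stop the submitted tasks, and the context manager will still wait for any running tasks to finish before exiting. One option is to cancel tasks that have not yet started. The sketch below illustrates this; the slow() task, worker count, and timeout value are illustrative assumptions, not part of the example above:

```python
# sketch: cancel not-yet-started tasks after a timeout on the first result
from time import sleep
from concurrent.futures import ProcessPoolExecutor, as_completed
from concurrent.futures import TimeoutError

# illustrative slow task
def slow(value):
    sleep(2)
    return value

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor(2) as executor:
        futures = [executor.submit(slow, i) for i in range(10)]
        try:
            for future in as_completed(futures, timeout=0.05):
                print(future.result())
        except TimeoutError:
            print('Timed out waiting for a result')
            # cancel() only succeeds for tasks that have not started running
            cancelled = sum(future.cancel() for future in futures)
            print(f'Cancelled {cancelled} pending tasks')
```

With two workers, at most two tasks can be running when the timeout fires, so most of the ten tasks are still pending and can be cancelled; the pool then only waits for the tasks already in progress.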
Further Reading
This section provides additional resources that you may find helpful.
Books
- ProcessPoolExecutor Jump-Start, Jason Brownlee (my book!)
- Concurrent Futures API Interview Questions
- ProcessPoolExecutor PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ProcessPoolExecutor: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python Pool: The Complete Guide
References
- Thread (computing), Wikipedia.
- Process (computing), Wikipedia.
- Thread Pool, Wikipedia.
- Futures and promises, Wikipedia.
Takeaways
You now know how to get results from the ProcessPoolExecutor as tasks are completed.
Do you have any questions?
Ask your question in the comments below and I will do my best to answer.