Last Updated on September 12, 2022
You can wait for the first result from the ProcessPoolExecutor by calling the wait() function with return_when set to FIRST_COMPLETED.
In this tutorial you will discover how to wait for the first result from a process pool in Python.
Let’s get started.
Need to Wait For The First Result
The ProcessPoolExecutor in Python provides a pool of reusable processes for executing ad hoc tasks.
You can submit tasks to the process pool by calling the submit() function and passing in the function you wish to execute in another process.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
There may be cases where we submit many tasks but are only interested in the first result.
This may be the case for many reasons, such as:
- Downloading the same file from multiple servers.
- Accessing the same data from multiple sources.
How can you submit multiple tasks to the ProcessPoolExecutor and stop when you get the first result?
How to Wait for the First Result
You can get the first result from many tasks by calling the wait() module function.
The wait() function will take a collection of Future objects as an argument.
Recall that a Future object is returned from the ProcessPoolExecutor when calling submit() to start executing a task. You can collect the Future objects from all tasks submitted to the process pool and provide them to the wait() function.
You must also set the “return_when” argument to the constant FIRST_COMPLETED.
This ensures that the wait() function will return as soon as any task in the provided collection is completed.
The wait() function will return two sets of Future objects. The first set will contain those Future objects for tasks that are done (e.g. the first task in this case) and the second will contain Future objects for those tasks that are not yet completed.
...
# get the first result from a collection of tasks
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
We can also set a timeout in seconds for how long we are willing to wait for the first result.
This can be set via the “timeout” argument when calling the wait() function. If the timeout is exceeded while waiting, an empty done set is returned, which must be checked for and handled.
...
# get the first result from a collection of tasks with a timeout
done, not_done = wait(futures, timeout=5, return_when=FIRST_COMPLETED)
Once we have the first result from a collection of Future objects, we can retrieve it from the done set by calling the pop() function. We can then call the result() function on the Future object to get the value returned by the target task function.
...
# get the first future from the done set
future = done.pop()
# get the result from the future
result = future.result()
Note, the first task may or may not have been completed successfully.
Calling the wait() function will return as soon as one task is “done”, which may be for one of three reasons:
- The task was completed successfully.
- The task was cancelled.
- The task raised an exception that was not handled.
As such, you must be careful to check that the task was not cancelled and that an exception was not raised before accessing and using the result.
...
# get the first future from the done set
future = done.pop()
# check that the task completed successfully
if future.cancelled() or future.exception():
    print('Task did not complete successfully')
else:
    # get the result from the future
    result = future.result()
    # do something...
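The three “done” states can be demonstrated deterministically with Future objects created by hand (a contrived use for illustration only; normally the executor creates Future objects via submit()):

```python
from concurrent.futures import Future

# create three futures by hand to simulate each "done" state
# (contrived: normally the executor creates these via submit())
ok, failed, stopped = Future(), Future(), Future()
ok.set_result('value')                    # completed successfully
failed.set_exception(ValueError('boom'))  # raised an unhandled exception
stopped.cancel()                          # cancelled before it started running

for future in (ok, failed, stopped):
    # check cancelled() first: exception() raises on a cancelled future
    if future.cancelled() or future.exception() is not None:
        print('Task did not complete successfully')
    else:
        print('Result:', future.result())
```

Only the first future yields a result; the other two take the failure branch, which is why the order of the checks matters.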
Also note that getting the result for the first task to complete does not stop the other tasks from running.
You may want to cancel all tasks that have not yet started running, for example:
...
# cancel all tasks that have not yet started running
for future in futures:
    future.cancel()
You may also want to shut down the process pool manually, instead of using the context manager, without waiting for the remaining tasks to complete.
This can be achieved by setting the “wait” argument of the shutdown() function to False. You can also set the “cancel_futures” argument of the shutdown() function to True to cancel all tasks that have not yet started running.
...
# shutdown the process pool, cancel scheduled tasks, and do not wait for running tasks
executor.shutdown(wait=False, cancel_futures=True)
Now that we know how to wait for the first result from a collection of tasks, let’s look at a worked example.
Example of Waiting for the First Result
Let’s develop an example of getting the first result from a collection of tasks.
First, let’s develop a task that will block for a random fraction of ten seconds.
# custom task that will block for a variable amount of time
def task(name):
    # sleep for a random fraction of ten seconds
    value = random()
    sleep(value * 10)
    return f'Task={name}: {value:.2f}'
Next, we can start the process pool using the context manager then submit 10 tasks and collect their Future objects into a list.
...
# start the process pool
with ProcessPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
We can then pass the collection of Future objects to the wait() function and set the return_when argument to the constant FIRST_COMPLETED so that we get the Future object for the first task to be marked “done”.
...
# wait until any task completes
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
Finally, we can retrieve the Future from the set of done tasks and the value from the first completed task.
...
# get the future from the done set
future = done.pop()
# get the result from the first task to complete
result = future.result()
print(result)
Tying this all together, the complete example of submitting a collection of tasks to the ProcessPoolExecutor and waiting for the first result is listed below.
# SuperFastPython.com
# example of waiting for the first result
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
from concurrent.futures import FIRST_COMPLETED

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for a random fraction of ten seconds
    value = random()
    sleep(value * 10)
    return f'Task={name}: {value:.2f}'

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # wait until any task completes
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        # get the future from the done set
        future = done.pop()
        # get the result from the first task to complete
        result = future.result()
        print(result)
Running the example first starts the process pool and submits the ten tasks.
We then wait for the first result from the collection of tasks.
A task completes and we first retrieve the Future object from the set of done tasks, then the result from the Future object.
In this case, the task with id=7 was the first to complete, having slept for about a tenth of a second. Your results will vary given the use of random numbers in the task.
We then close the process pool and wait for all other tasks to complete before moving on.
Task=7: 0.01
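As an aside, the same first-result behavior is available from the as_completed() function, which yields futures in completion order, so calling next() on it returns the first future to finish. A minimal sketch of that alternative (using ThreadPoolExecutor with short delays so the snippet is quick and self-contained; the pattern is identical for ProcessPoolExecutor):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

# toy task: sleep for the given delay, then return it
def work(delay):
    sleep(delay)
    return delay

with ThreadPoolExecutor(3) as executor:
    # submit tasks with different delays
    futures = [executor.submit(work, d) for d in (0.3, 0.1, 0.2)]
    # as_completed() yields futures as they finish; next() gives the first
    first = next(as_completed(futures))
    print(first.result())  # the 0.1-second task finishes first
```

Unlike wait(), as_completed() can also be iterated further to consume the remaining results in completion order.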
Example of Waiting for the First Result with a Timeout
We can also add a timeout when calling the wait() function.
This can be helpful if we require a result but only have a limited number of seconds in which we can wait for the first result.
This can be achieved by passing a “timeout” argument to the wait() function.
For example, we may want to limit our wait time to a fraction of a second.
...
# wait until any task completes
done, not_done = wait(futures, timeout=0.05, return_when=FIRST_COMPLETED)
If the timeout is exceeded before a task is completed, then the done set will be empty and we must check for this condition.
...
# check if no results were received
if len(done) == 0:
    print('No result!')
Tying this together, the complete example of waiting for the first result with a timeout is listed below.
# SuperFastPython.com
# example of waiting for the first result with a timeout
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
from concurrent.futures import FIRST_COMPLETED

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for a random fraction of ten seconds
    value = random()
    sleep(value * 10)
    return f'Task={name}: {value:.2f}'

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(10) as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # wait until any task completes
        done, not_done = wait(futures, timeout=0.05, return_when=FIRST_COMPLETED)
        # check if no results were received
        if len(done) == 0:
            print('No result!')
        else:
            # get the future from the done set
            future = done.pop()
            # get the result from the first task to complete
            result = future.result()
            print(result)
In this case, the timeout is too short and no tasks were able to complete in time.
No result!
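One sensible follow-up when the timeout expires is to call wait() again on the not_done set, since it still holds all the pending futures. A sketch of that retry pattern (using ThreadPoolExecutor with short, fixed delays so it is self-contained; the same logic applies to ProcessPoolExecutor):

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
from time import sleep

# toy task: sleep for the given delay, then return it
def work(delay):
    sleep(delay)
    return delay

with ThreadPoolExecutor(2) as executor:
    futures = [executor.submit(work, d) for d in (0.2, 0.4)]
    # a timeout too short for any task to finish: done comes back empty
    done, not_done = wait(futures, timeout=0.01, return_when=FIRST_COMPLETED)
    if not done:
        # retry on the remaining futures with a longer timeout
        done, not_done = wait(not_done, timeout=2, return_when=FIRST_COMPLETED)
    result = done.pop().result()
    print(result)
```

Because wait() never removes futures from the collection you pass in, re-calling it on not_done is safe and picks up exactly where the first call left off.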
We can update the example to cancel any scheduled tasks and shutdown the process pool without waiting for the running tasks to complete.
This requires that we create and shutdown the process pool manually instead of using the context manager, for example:
...
# start the process pool
executor = ProcessPoolExecutor(10)
...
print('Got result, shutting down pool')
executor.shutdown(wait=False, cancel_futures=True)
The updated version of this example that cancels all scheduled tasks and shuts down the pool after we get the first result is listed below.
# SuperFastPython.com
# example of waiting for the first result then shutting down the process pool
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
from concurrent.futures import FIRST_COMPLETED

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for a random fraction of ten seconds
    value = random()
    sleep(value * 10)
    return f'Task={name}: {value:.2f}'

# entry point
if __name__ == '__main__':
    # start the process pool
    executor = ProcessPoolExecutor(10)
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # wait until any task completes
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    # shutdown the pool, cancel scheduled tasks, and do not wait for running tasks
    print('Got result, shutting down pool')
    executor.shutdown(wait=False, cancel_futures=True)
    # get the future from the done set
    future = done.pop()
    # get the result from the first task to complete
    result = future.result()
    print(result)
    # do other things...
Running the example creates the process pool and submits the tasks as before.
This time, the process pool is shut down as soon as we get the first result and we carry on with our program, processing the result.
Note, the main process will not exit until all worker processes have completed. It just so happens that our small example does not have much work to do after we get the result, so we have to wait a moment for the program to end.
Got result, shutting down pool
Task=6: 0.09
Further Reading
This section provides additional resources that you may find helpful.
Books
- ProcessPoolExecutor Jump-Start, Jason Brownlee (my book!)
- Concurrent Futures API Interview Questions
- ProcessPoolExecutor PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ProcessPoolExecutor: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python Pool: The Complete Guide
APIs
- concurrent.futures — Launching parallel tasks, Python documentation.
References
- Thread (computing), Wikipedia.
- Process (computing), Wikipedia.
- Thread Pool, Wikipedia.
- Futures and promises, Wikipedia.
Takeaways
You now know how to wait for the first results from the ProcessPoolExecutor in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Marcus Zymmer on Unsplash