Last Updated on September 12, 2022
You can wait for the first task to finish in the ThreadPoolExecutor by calling the wait() function with return_when set to FIRST_COMPLETED.
In this tutorial you will discover how to wait for the first result from a thread pool in Python.
Let’s get started.
Need to Wait For The First Result From ThreadPoolExecutor
The ThreadPoolExecutor in Python provides a pool of reusable threads for executing ad hoc tasks.
You can submit tasks to the thread pool by calling the submit() function and passing in the function you wish to execute on another thread, along with any arguments it needs.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
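Here is a minimal sketch of that round trip, using a hypothetical `square()` task. You pass the callable itself and its arguments to submit() and get a Future back immediately. Calling result() on that Future blocks until the value is ready.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task used only for illustration
def square(x):
    return x * x

with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() takes the callable plus its arguments and returns a Future at once
    future = executor.submit(square, 7)
    # result() blocks until the task finishes, then returns its return value
    value = future.result()
    # once result() has returned, the Future reports that it is done
    finished = future.done()

print(value, finished)  # 49 True
```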
There may be cases where you submit many tasks but are only interested in the first result.
This may be the case for many reasons, such as:
- Downloading the same file from multiple servers.
- Accessing the same data from multiple sources.
How can you submit multiple tasks to the ThreadPoolExecutor and stop when you get the first result?
How to Wait For the First Result
You can get the first result from a collection of tasks by calling the wait() module function.
The wait() function will take a collection of Future objects as an argument.
Recall that a Future object is returned from the ThreadPoolExecutor when you call submit() to start executing a task. You can collect the Future objects for all tasks submitted to the thread pool and provide them to the wait() function.
You must also set the return_when argument to the constant FIRST_COMPLETED.
This ensures that the wait() function will return as soon as any task in the provided collection is completed.
The wait() function returns two sets of Future objects. The first set contains the Future objects for tasks that are done (in this case at least the first task to finish, possibly more if several finished at about the same time), and the second contains the Future objects for tasks that are not yet done.
```python
...
# get the first result from a collection of tasks
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
```
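The done set can hold more than one Future if several tasks have already finished when wait() checks them. This small sketch confirms it using a hypothetical `quick()` task. Every task is forced to finish first by calling result() on each one, and after that a FIRST_COMPLETED wait returns all of them in done. The sketch also shows that wait() returns a named tuple with the fields `done` and `not_done`.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

# hypothetical task that finishes almost instantly
def quick(x):
    return x * 10

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(quick, i) for i in range(4)]
    # force every task to finish before calling wait()
    for f in futures:
        f.result()
    # wait() returns a named tuple; both fields are sets of Future objects
    outcome = wait(futures, return_when=FIRST_COMPLETED)

print(len(outcome.done), len(outcome.not_done))  # 4 0
print(sorted(f.result() for f in outcome.done))  # [0, 10, 20, 30]
```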
We can also set a timeout in seconds for how long we are willing to wait for the first result.
This can be set via the timeout argument when calling the wait() function. If the timeout expires before any task completes, wait() does not raise an exception. Instead it returns an empty done set, which you must check for and handle.
```python
...
# get the first result from a collection of tasks with a timeout
done, not_done = wait(futures, timeout=5, return_when=FIRST_COMPLETED)
```
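A quick way to see the timeout behavior is to use a task that cannot finish in time. This sketch assumes a hypothetical `blocked()` task that waits on a threading.Event until the main thread releases it. When the timeout expires, wait() returns an empty done set and the task stays in not_done. No exception is raised.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

release = threading.Event()

# hypothetical task that blocks until the main thread releases it
def blocked():
    release.wait()
    return 'finished'

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(blocked)
    # no task can finish within 0.1 seconds, so wait() times out
    done, not_done = wait([future], timeout=0.1, return_when=FIRST_COMPLETED)
    # no exception is raised; the done set is simply empty
    timed_out = len(done) == 0
    # unblock the task so the context manager can shut the pool down
    release.set()

print(timed_out, len(not_done))  # True 1
```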
Once wait() returns, we can retrieve a Future object from the done set by calling the set's pop() method. We can then call the result() function on the Future object to get the value returned by the target task function.
```python
...
# get the first future from the done set
future = done.pop()
# get the result from the future
result = future.result()
```
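pop() gives you a Future but does not tell you which input produced it. A common idiom is to keep a dict that maps each Future back to its argument. wait() accepts the dict directly, because iterating a dict yields its keys. The sketch below assumes a hypothetical `task()` that sleeps for the delay it is given. The delays are chosen so that the 0.05-second task should finish first.

```python
from time import sleep
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

# hypothetical task: sleep for the given number of seconds, then return it
def task(delay):
    sleep(delay)
    return delay

delays = [0.3, 0.05, 0.2]
with ThreadPoolExecutor(max_workers=len(delays)) as executor:
    # map each Future back to the argument that produced it
    future_to_delay = {executor.submit(task, d): d for d in delays}
    # iterating a dict yields its keys, so wait() sees only the Future objects
    done, not_done = wait(future_to_delay, return_when=FIRST_COMPLETED)
    first = done.pop()
    first_delay = future_to_delay[first]

print(f'first task had delay={first_delay}')
```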
Note, the first task may or may not have been completed successfully.
The wait() function returns as soon as any one task is done, which may be for one of three reasons:
- The task was completed successfully.
- The task was cancelled.
- The task raised an exception that was not handled.
As such, you must be careful to check that the task was not cancelled and that an exception was not raised before accessing and using the result.
```python
...
# get the first future from the done set
future = done.pop()
# check that the task completed successfully
if future.cancelled() or future.exception() is not None:
    print('Task did not complete successfully')
else:
    # get the result from the future
    result = future.result()
    # do something...
```
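Sometimes what you need is the first task that *succeeds*. One option is to keep calling wait() on the remaining futures until one of them finishes without being cancelled or raising an exception. The sketch below shows one way to do this. Other approaches are possible. The `fetch()` task, the mirror names and the delays are all hypothetical, standing in for the download-from-several-servers case: the fast mirror fails and the slower one succeeds.

```python
from time import sleep
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

# hypothetical source: fails if ok is False, otherwise returns some data
def fetch(source, delay, ok):
    sleep(delay)
    if not ok:
        raise ConnectionError(f'{source} failed')
    return f'data from {source}'

def first_successful(futures):
    """Return the result of the first future to succeed, or None if none do."""
    pending = set(futures)
    while pending:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            # check cancelled() first: exception() raises on a cancelled future
            if not future.cancelled() and future.exception() is None:
                return future.result()
    return None

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [
        executor.submit(fetch, 'mirror-a', 0.05, False),
        executor.submit(fetch, 'mirror-b', 0.3, True),
    ]
    result = first_successful(futures)

print(result)  # data from mirror-b
```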
Also note that getting the result of the first task to complete does not stop the other tasks from running.
You may want to cancel all tasks that have not yet started running, for example:
```python
...
# cancel all tasks that have not yet started running
# (cancel() returns False and has no effect on running or finished tasks)
for future in futures:
    future.cancel()
```
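cancel() only succeeds for tasks still waiting in the queue. It returns False for a task that is already running. The sketch below makes this deterministic by using a single-worker pool and a hypothetical `worker()` task that blocks on a threading.Event.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()

# hypothetical task: signal that it is running, then block until released
def worker(i):
    started.set()
    release.wait()
    return i

with ThreadPoolExecutor(max_workers=1) as executor:
    futures = [executor.submit(worker, i) for i in range(3)]
    # make sure the first task has actually started running
    started.wait()
    # cancel() returns True only for tasks still waiting in the queue
    cancelled = [f.cancel() for f in futures]
    # let the running task finish so the pool can shut down
    release.set()

print(cancelled)  # [False, True, True]
```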
You may also want to shut down the thread pool manually, instead of using the context manager, without waiting for the remaining tasks to complete.
This can be achieved by setting the wait argument of the shutdown() function to False. You can also set the cancel_futures argument of shutdown() to True to cancel all tasks that are not yet running (this argument was added in Python 3.9).
```python
...
# shut down the thread pool, cancel scheduled tasks and don't wait for any tasks still running
executor.shutdown(wait=False, cancel_futures=True)
```
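You can check the effect of cancel_futures=True with a similar setup. This sketch requires Python 3.9 or later, and the `worker()` task is hypothetical. It blocks the only worker, shuts the pool down without waiting, and then checks which futures were cancelled. The task that was already running is not affected.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()

# hypothetical task: signal that it is running, then block until released
def worker(i):
    started.set()
    release.wait()
    return i

executor = ThreadPoolExecutor(max_workers=1)
futures = [executor.submit(worker, i) for i in range(3)]
# make sure the first task is running before shutting down
started.wait()
# Python 3.9+: cancel queued tasks and return without waiting
executor.shutdown(wait=False, cancel_futures=True)
states = [f.cancelled() for f in futures]
# the running task is unaffected; release it so the program can exit
release.set()

print(states)               # [False, True, True]
print(futures[0].result())  # 0
```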
Now that we know how to wait for the first result from a collection of tasks, let’s look at a worked example.
Example of Waiting for the First Task to Finish
Let’s develop an example of getting the first result from a collection of tasks.
First, let’s develop a mock task that will sleep for a random fraction of ten seconds.
```python
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for up to ten seconds
    value = random()
    sleep(value * 10)
    return f'Task={name}: {value:.2f}'
```
Next, we can start the thread pool using the context manager, then submit 10 tasks and collect their Future objects into a list.
```python
...
# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
```
We can then pass the collection of Future objects to the wait() function and set the return_when argument to the constant FIRST_COMPLETED so that we get the Future for the first task to be marked done.
```python
...
# wait until any task completes
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
```
Finally, we can retrieve the Future from the set of done tasks and the value from the first completed task.
```python
...
# get the future from the done set
future = done.pop()
# get the result from the first task to complete
result = future.result()
print(result)
```
Tying this all together, the complete example of submitting a collection of tasks to the ThreadPoolExecutor and waiting for the first result is listed below.
```python
# SuperFastPython.com
# example of waiting for the first result
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
from concurrent.futures import FIRST_COMPLETED

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for up to ten seconds
    value = random()
    sleep(value * 10)
    return f'Task={name}: {value:.2f}'

# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # wait until any task completes
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    # get the future from the done set
    future = done.pop()
    # get the result from the first task to complete
    result = future.result()
    print(result)
```
Running the example first starts the thread pool and submits the ten tasks.
We then wait for the first result from the collection of tasks.
A task completes and we first retrieve the Future object from the set of done tasks, then the result from the Future object.
In this case, the task with id=7 was the first to complete, having slept for about a tenth of a second. Your results will vary given the use of random numbers in the task.
Finally, exiting the context manager shuts down the thread pool, which waits for all other tasks to complete before moving on.
```
Task=7: 0.01
```
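Because the example uses random sleeps, a different task wins on every run. For testing, fixed delays can make the first result predictable. The sketch below assumes a modified, hypothetical `task()` in which task `name` sleeps for 0.2 * (name + 1) seconds. The gaps between the delays are wide enough that task 0 should reliably finish first.

```python
from time import sleep
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

# variant of task() with a fixed, name-dependent delay instead of a random one
def task(name):
    # task 0 sleeps 0.2s, task 1 sleeps 0.4s, and so on
    delay = 0.2 * (name + 1)
    sleep(delay)
    return f'Task={name}: {delay:.2f}'

with ThreadPoolExecutor(5) as executor:
    futures = [executor.submit(task, i) for i in range(5)]
    # wait until any task completes
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    # take the result of the first task to complete
    result = done.pop().result()

print(result)  # Task=0: 0.20
```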
Example of Waiting for the First Task to Finish With a Timeout
We can also add a timeout to the wait().
This can be helpful if we require a result but only have a limited number of seconds in which we can wait for the first result.
This can be achieved by passing a timeout argument to the wait() function.
For example, we may want to limit our wait time to a fraction of a second.
```python
...
# wait until any task completes
done, not_done = wait(futures, timeout=0.05, return_when=FIRST_COMPLETED)
```
If the timeout is exceeded before a task is completed, then the done set will be empty and we must check for this condition.
```python
...
# check if no results were received
if len(done) == 0:
    print('No result!')
```
Tying this together, the complete example of waiting for the first result with a timeout is listed below.
```python
# SuperFastPython.com
# example of waiting for the first result with a timeout
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
from concurrent.futures import FIRST_COMPLETED

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for up to ten seconds
    value = random()
    sleep(value * 10)
    return f'Task={name}: {value:.2f}'

# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # wait until any task completes or the timeout expires
    done, not_done = wait(futures, timeout=0.05, return_when=FIRST_COMPLETED)
    # check if no results were received
    if len(done) == 0:
        print('No result!')
    else:
        # get the future from the done set
        future = done.pop()
        # get the result from the first task to complete
        result = future.result()
        print(result)
```
In this case, the timeout is too short and no tasks were able to complete in time.
```
No result!
```
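Note that a timed-out wait() only stops the waiting. The tasks keep running. When the `with` block exits, it calls shutdown(wait=True), so the program still blocks at that point until every task has finished. The sketch below measures this using a hypothetical task that always sleeps for 0.5 seconds.

```python
from time import sleep, perf_counter
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

# hypothetical task with a fixed duration
def task(name):
    sleep(0.5)
    return name

start = perf_counter()
with ThreadPoolExecutor(4) as executor:
    futures = [executor.submit(task, i) for i in range(4)]
    # times out long before any task can finish
    done, not_done = wait(futures, timeout=0.05, return_when=FIRST_COMPLETED)
    # roughly the timeout
    waited = perf_counter() - start
# leaving the with block waits for all four tasks to finish
total = perf_counter() - start

print(len(done), f'waited={waited:.2f}s', f'total={total:.2f}s')
```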
Example of Waiting for the First Task Then Cancel Remaining Tasks
We can update the first example to cancel any scheduled tasks and shut down the thread pool without waiting for the running tasks to complete.
This requires that we create and shut down the thread pool manually instead of using the context manager, for example:
```python
...
# start the thread pool
executor = ThreadPoolExecutor(10)
...
print('Got result, shutting down pool')
executor.shutdown(wait=False, cancel_futures=True)
```
The updated version of this example that cancels all scheduled tasks and shuts down the pool after we get the first result is listed below.
```python
# SuperFastPython.com
# example of waiting for the first result then shutting down the thread pool
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
from concurrent.futures import FIRST_COMPLETED

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for up to ten seconds
    value = random()
    sleep(value * 10)
    return f'Task={name}: {value:.2f}'

# start the thread pool
executor = ThreadPoolExecutor(10)
# submit tasks and collect futures
futures = [executor.submit(task, i) for i in range(10)]
# wait until any task completes
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
# shut down the pool, cancel scheduled tasks and don't wait for running tasks
print('Got result, shutting down pool')
executor.shutdown(wait=False, cancel_futures=True)
# get the future from the done set
future = done.pop()
# get the result from the first task to complete
result = future.result()
print(result)
# do other things...
```
Running the example creates the thread pool and submits the tasks as before.
This time, the thread pool is shut down as soon as we get the first result and we carry on with our program, processing the result.
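Note that the program will not exit until all tasks that are already running in the thread pool have completed. Because the pool has ten workers for ten tasks, all tasks are likely already running when we shut down, so cancel_futures has little or nothing to cancel here. Our small example does not have much work to do after getting the result, so we have to wait a moment for the program to end.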
```
Got result, shutting down pool
Task=6: 0.09
```
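This sketch shows that shutdown(wait=False) returns immediately while a running task carries on in the background. It uses a hypothetical `slow()` task that signals when it has started. The Future still delivers its result once the worker thread finishes.

```python
import threading
from time import sleep, perf_counter
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()

# hypothetical slow task that signals once it has started
def slow():
    started.set()
    sleep(0.5)
    return 'slow done'

executor = ThreadPoolExecutor(1)
future = executor.submit(slow)
# make sure the task is running before shutting down
started.wait()

start = perf_counter()
executor.shutdown(wait=False, cancel_futures=True)
# returns without waiting for the running task
shutdown_time = perf_counter() - start
# the running task was not cancelled
still_running = future.running()

print(f'shutdown returned after {shutdown_time:.3f}s, running={still_running}')
# result() waits for the worker thread to finish the task
print(future.result())  # slow done
```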
Further Reading
This section provides additional resources that you may find helpful.
Books
- ThreadPoolExecutor Jump-Start, Jason Brownlee, (my book!)
- Concurrent Futures API Interview Questions
- ThreadPoolExecutor Class API Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ThreadPoolExecutor: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to wait for the first task to finish in the ThreadPoolExecutor in Python.
Do you have any questions about how to wait for the first task result?
Ask your question in the comments below and I will do my best to answer.
raidgar98 says
Here is a snippet for continuous joining:
```python
import concurrent.futures
import logging
import os
import time
from typing import Any, Callable

logger = logging.getLogger(__name__)

def run_in_thread_pool_executor(predicate: Callable[[Any], Any], iterable_args: list[Any], *, max_threads=(os.cpu_count() or 24)) -> None:
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures: list[concurrent.futures.Future] = []
        for args in iterable_args:
            futures.append(executor.submit(predicate, args))
        start = time.perf_counter()
        # repeatedly wait for the next task to finish and remove it from the list
        while len((tasks := concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED))[0]) > 0:
            futures.pop(futures.index(tasks[0].pop()))
            logger.debug(f"Joined next item after {(time.perf_counter() - start):.4f} seconds...")
```
Jason Brownlee says
Great, thanks for sharing!