Last Updated on June 29, 2023
You can get results for tasks as they are completed with the ThreadPoolExecutor by calling the as_completed() module function.
In this tutorial, you will discover how to get results from tasks as they are completed in the Python thread pool.
Let’s get started.
Need to Get Results as Tasks Are Completed in ThreadPoolExecutor
The ThreadPoolExecutor in Python provides a pool of reusable threads for executing ad hoc tasks.
You can submit tasks to the thread pool by calling the submit() function and passing in the name of the function you wish to execute on another thread.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
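As a minimal sketch of this behavior (the work() task and its argument here are hypothetical, used only for illustration):

```python
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# hypothetical task used for illustration
def work(x):
    sleep(0.1)
    return x * 2

with ThreadPoolExecutor() as executor:
    # submit() returns a Future object immediately
    future = executor.submit(work, 21)
    # check the status of the task; typically False, the task is still sleeping
    print(future.done())
    # block until the task completes, then get its return value
    print(future.result())
```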
In some applications, you may need to process results from tasks in the order that tasks finish their execution, and not in the order that tasks were submitted to the thread pool.
This allows your program to be more responsive, perhaps showing progress and results to the user sooner than if results were processed in the order that tasks were submitted.
How do you process results from the ThreadPoolExecutor in the order that tasks are completed?
How to Get Results as Tasks Are Completed
You can get results from the ThreadPoolExecutor in the order that tasks are completed by calling the as_completed() module function.
The function takes a collection of Future objects and will return the same Future objects in the order that their associated tasks are completed.
Recall that when you submit tasks to the ThreadPoolExecutor via a call to the submit() function, you will receive a Future object for the task in return. These Future objects can be added to a list or similar collection and provided to the as_completed() function.
The common idiom for using the as_completed() function is in a for-loop; for example:
```python
...
# iterate future objects as they are completed
for future in as_completed(futures):
    # do something...
```
You can also set a timeout for how long you are willing to wait for results.
This can be achieved by setting the timeout argument and specifying a number of seconds.
The timeout is measured from the original call to as_completed(): if the next result is not available within that many seconds of the call, a TimeoutError is raised that may need to be handled.
```python
...
try:
    # iterate future objects with a timeout
    for future in as_completed(futures, timeout=5):
        # do something...
except TimeoutError:
    # handle error
```
Note: if duplicate Future objects are provided to as_completed(), then only distinct instances are considered.
Also, note that a task is “completed” for one of three reasons; they are:
- The task was completed successfully.
- The task was cancelled.
- The task raised an exception that was not handled.
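A sketch of how these three outcomes might be distinguished for each completed Future (the succeed() and fail() tasks here are illustrative, not part of the example that follows):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# hypothetical tasks used for illustration
def succeed():
    return 'ok'

def fail():
    raise ValueError('boom')

with ThreadPoolExecutor(1) as executor:
    futures = [executor.submit(succeed), executor.submit(fail)]
    for future in as_completed(futures):
        if future.cancelled():
            # the task was cancelled before it could run
            print('task was cancelled')
        elif future.exception() is not None:
            # the task raised an exception that was not handled
            print(f'task raised: {future.exception()}')
        else:
            # the task completed successfully
            print(f'task result: {future.result()}')
```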
Now that we know how to get results in the order that tasks are completed in the ThreadPoolExecutor, let’s look at a worked example.
Example of Getting Results as Tasks Are Completed
Let’s explore how we can get results from tasks as they are completed in the ThreadPoolExecutor with a worked example.
First, let’s define a task that sleeps for a random fraction of a second.
The task takes a unique name as an argument, then generates a random number between 0 and 1. The task then sleeps for the generated random number of seconds, then returns a custom string result with the task name and variable sleep amount.
```python
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for a random fraction of a second
    value = random()
    sleep(value)
    return f'Task={name}: {value:.2f}'
```
Next, we can start the thread pool using the context manager so that it is closed automatically for us when we are finished with it.
We will create a pool of ten worker threads, one for each task we wish to execute. We can then add ten tasks to the thread pool.
This is achieved by calling the submit() function, specifying the name of the target task function and an argument. Each call to submit() returns a Future object that we will need later to pass to the as_completed() function. As such, we submit the tasks in a list comprehension, which gives us a list of ten Future objects.
```python
...
# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
```
Next, we can wait for the tasks to complete and get the Future objects in the order that the tasks are completed, not the order that we submitted the tasks.
We do this in a for-loop, getting each Future object as they are available, retrieving the result, and reporting it with a print statement.
```python
...
# process task results as they are available
for future in as_completed(futures):
    # retrieve the result
    result = future.result()
    print(result)
```
Tying this together, the complete example of getting results from the ThreadPoolExecutor in the order that tasks are completed is listed below.
```python
# SuperFastPython.com
# example of getting results for tasks as they are completed
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for a random fraction of a second
    value = random()
    sleep(value)
    return f'Task={name}: {value:.2f}'

# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # process task results as they are available
    for future in as_completed(futures):
        # retrieve the result
        result = future.result()
        print(result)
```
Running the example first starts the thread pool and submits the ten tasks.
The tasks start executing and we iterate over the results in the order that tasks are completed.
We can see from the results that tasks are completed in a random order, different from the order that we submitted them.
```
Task=3: 0.07
Task=2: 0.11
Task=5: 0.14
Task=4: 0.51
Task=0: 0.56
Task=9: 0.67
Task=1: 0.80
Task=7: 0.90
Task=8: 0.91
Task=6: 0.92
```
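For contrast, iterating over the futures list directly (rather than via as_completed()) reports results in submission order, blocking on each task in turn. A sketch reusing the same task() function:

```python
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# same task as above: sleep for a random fraction of a second
def task(name):
    value = random()
    sleep(value)
    return f'Task={name}: {value:.2f}'

with ThreadPoolExecutor(10) as executor:
    futures = [executor.submit(task, i) for i in range(10)]
    # iterating the list directly reports results in submission order
    for future in futures:
        # result() blocks until this particular task is done
        print(future.result())
```

Here, Task=0 is always reported first even if it was the slowest, which is exactly the behavior as_completed() avoids.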
Example of Getting Results as Tasks Are Completed With Timeout
We might also like to add a timeout, limiting how long we are willing to wait for results.
This can be achieved by setting the timeout argument on the call to as_completed() to a number of seconds.
```python
...
# process task results as they are available
for future in as_completed(futures, timeout=0.05):
    # retrieve the result
    result = future.result()
    print(result)
```
We’d like to handle the timeout if it occurs.
This can be achieved by wrapping the call to the as_completed() function with a try-except block and catching a TimeoutError.
```python
# handle a timeout waiting for a result
try:
    # process task results as they are available
    for future in as_completed(futures, timeout=0.05):
        # retrieve the result
        result = future.result()
        print(result)
except TimeoutError:
    print('Timed out waiting for a result')
```
Tying this together, the complete example of processing task results as they are completed with a timeout is listed below.
```python
# SuperFastPython.com
# example of getting results for tasks as they are completed with a timeout
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from concurrent.futures import TimeoutError

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for a random fraction of a second
    value = random()
    sleep(value)
    return f'Task={name}: {value:.2f}'

# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # handle a timeout waiting for a result
    try:
        # process task results as they are available
        for future in as_completed(futures, timeout=0.05):
            # retrieve the result
            result = future.result()
            print(result)
    except TimeoutError:
        print('Timed out waiting for a result')
```
Running the example, in this case, we can see that we time out before any of the tasks completed and provided a result.
```
Timed out waiting for a result
```
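Note that after a TimeoutError, any unfinished tasks keep running in the pool. One way to salvage partial progress is to check done() on each Future after catching the error; a sketch under that assumption:

```python
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from concurrent.futures import TimeoutError

# same task as above: sleep for a random fraction of a second
def task(name):
    value = random()
    sleep(value)
    return f'Task={name}: {value:.2f}'

with ThreadPoolExecutor(10) as executor:
    futures = [executor.submit(task, i) for i in range(10)]
    try:
        for future in as_completed(futures, timeout=0.05):
            print(future.result())
    except TimeoutError:
        # report how many tasks had finished when we gave up waiting
        done = [f for f in futures if f.done()]
        print(f'Timed out with {len(done)} of {len(futures)} tasks done')
```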
Further Reading
This section provides additional resources that you may find helpful.
Books
- ThreadPoolExecutor Jump-Start, Jason Brownlee, (my book!)
- Concurrent Futures API Interview Questions
- ThreadPoolExecutor Class API Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ThreadPoolExecutor: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to get results from the ThreadPoolExecutor as tasks are completed.
Do you have any questions about how to get results as tasks are completed?
Ask your question in the comments below and I will do my best to answer.