Last Updated on October 29, 2022
You can get results from tasks in the ThreadPool using a callback or by calling AsyncResult.get().
In this tutorial, you will discover how to get results from tasks issued asynchronously to the ThreadPool in Python.
Let’s get started.
ThreadPool Asynchronous Tasks
We can execute tasks asynchronously using the ThreadPool.
The ThreadPool class provides three methods for issuing tasks asynchronously:
- apply_async(): For executing one-off tasks.
- map_async(): For calling the same function multiple times with different arguments.
- starmap_async(): For calling the same function many times with more than one argument.
Each of these approaches for executing an asynchronous task in the ThreadPool returns immediately with an AsyncResult object.
This object provides a handle on the task or tasks and allows us to check on the status of the tasks and to get results.
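As a minimal sketch of this behavior, the snippet below issues one call with each method and confirms that all three hand back an AsyncResult before any result is requested. The add() task, the issue_all() helper, and the pool size of 2 are assumptions made for illustration and are not part of the original examples.

```python
# sketch: each asynchronous method returns an AsyncResult handle
from time import sleep
from multiprocessing.pool import ThreadPool, AsyncResult

# hypothetical task used only for illustration
def add(a, b=0):
    # block briefly to simulate work
    sleep(0.1)
    return a + b

# hypothetical helper that issues one call with each method
def issue_all():
    with ThreadPool(2) as pool:
        handles = [
            pool.apply_async(add, args=(1, 2)),
            pool.map_async(add, [1, 2, 3]),
            pool.starmap_async(add, [(1, 2), (3, 4)]),
        ]
        # check the type of each handle before asking for any result
        kinds = [isinstance(handle, AsyncResult) for handle in handles]
        # gather the results while the pool is still open
        values = [handle.get() for handle in handles]
    return kinds, values

# protect the entry point
if __name__ == '__main__':
    kinds, values = issue_all()
    print(kinds)
    print(values)
```

The results are gathered inside the with block because leaving the block shuts the pool down.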
How can we get results from asynchronous tasks?
How to Get Results from Asynchronous Tasks
There are two ways that we can get results from asynchronous tasks executed with the ThreadPool.
They are:
- Use a result callback.
- Get results from the AsyncResult object.
Let’s take a closer look at each approach.
How to Get Results Via a Callback
We can get results from asynchronous tasks via a result callback.
A result callback can be specified via the “callback” argument.
The argument specifies the name of a custom function to call with the result from an asynchronous task or tasks.
The function may have any name you like, as long as it does not conflict with a function name already in use.
For example, if apply_async() is configured with a callback, then the callback function will be called with the return value of the task function that was executed.
```python
# result callback function
def result_callback(result):
    print(result)

...
# issue a single task
result = pool.apply_async(..., callback=result_callback)
```
Alternatively, if map_async() or starmap_async() is configured with a callback, then the callback function will be called once, with a list of the return values from all tasks issued to the ThreadPool.
```python
# result callback function
def result_callback(result):
    # iterate all results
    for value in result:
        print(value)

...
# issue multiple tasks
result = pool.map_async(..., callback=result_callback)
```
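The two callback styles can be combined into one runnable sketch. The square() task, the callback names, the collected list, and the pool size are all hypothetical choices made for illustration. The sketch waits on each AsyncResult before leaving the pool so that the callbacks have a chance to run.

```python
# sketch: result callbacks with apply_async() and map_async()
from multiprocessing.pool import ThreadPool

# hypothetical container for values received by the callbacks
collected = []

# hypothetical task used only for illustration
def square(x):
    return x * x

# callback for apply_async(): receives a single return value
def on_single(result):
    collected.append(('single', result))

# callback for map_async(): receives all return values at once
def on_many(results):
    collected.append(('many', list(results)))

# hypothetical helper that issues the tasks and waits for them
def run():
    collected.clear()
    with ThreadPool(2) as pool:
        first = pool.apply_async(square, (3,), callback=on_single)
        second = pool.map_async(square, range(4), callback=on_many)
        # wait for the tasks, and therefore the callbacks, to finish
        first.wait()
        second.wait()
    return sorted(collected)

# protect the entry point
if __name__ == '__main__':
    for kind, value in run():
        print(kind, value)
```

Callbacks are invoked by a helper thread inside the pool rather than by the main thread, so they are best kept short.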
How to Get Results Via AsyncResult
We can get results from asynchronous tasks via the AsyncResult object.
An AsyncResult provides a handle on one or more issued tasks.
It allows the caller to check on the status of the issued tasks, wait for the tasks to complete, and get the results once tasks are completed.
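A short sketch of the status checks is given below, using the ready(), wait(), and successful() methods of AsyncResult. The slow_task() function, the check_status() helper, and the half-second sleep are assumptions chosen so that the task is still running when it is first checked.

```python
# sketch: checking the status of an issued task
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task that takes about half a second
def slow_task():
    sleep(0.5)
    return 'done'

# hypothetical helper that inspects the task before and after waiting
def check_status():
    with ThreadPool(1) as pool:
        async_result = pool.apply_async(slow_task)
        # the task has only just been issued, so it is not finished yet
        ready_before = async_result.ready()
        # block until the task is finished
        async_result.wait()
        ready_after = async_result.ready()
        # True if the task finished without raising an exception
        succeeded = async_result.successful()
    return ready_before, ready_after, succeeded

# protect the entry point
if __name__ == '__main__':
    print(check_status())
```

Note that successful() should only be called once the task is finished, which is why the sketch calls wait() first.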
We can get the result of an issued task by calling the AsyncResult.get() function.
The value returned depends on the function that was used to issue the task or tasks:
- apply_async(): Returns the return value of the target function.
- map_async(): Returns a list of the return values of the target function, in the order the arguments were provided.
- starmap_async(): Returns a list of the return values of the target function, in the order the arguments were provided.
For example:
```python
...
# get the result of the task or tasks
value = result.get()
```
If the issued tasks have not yet been completed, then get() will block until the tasks are finished.
A “timeout” argument can be specified. If the tasks are still running and do not complete within the specified number of seconds, a multiprocessing.TimeoutError is raised.
For example:
```python
...
try:
    # get the task result with a timeout
    value = result.get(timeout=10)
except multiprocessing.TimeoutError as e:
    # ...
```
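For a version that can be run as-is, the sketch below issues a task that takes about one second and asks for its result with a much shorter timeout. The slow_task() function, the get_with_timeout() helper, and the specific durations are assumptions for illustration only.

```python
# sketch: getting a result with a timeout
import multiprocessing
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task that takes about one second
def slow_task():
    sleep(1.0)
    return 'finished'

# hypothetical helper that tries a short timeout, then waits fully
def get_with_timeout():
    with ThreadPool(1) as pool:
        async_result = pool.apply_async(slow_task)
        try:
            # give up waiting after a tenth of a second
            first = async_result.get(timeout=0.1)
        except multiprocessing.TimeoutError:
            first = 'timed out'
        # the timeout did not cancel the task, so a later get() still works
        second = async_result.get()
    return first, second

# protect the entry point
if __name__ == '__main__':
    print(get_with_timeout())
```

The timeout only limits how long the caller waits. The task itself keeps running in the pool.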
If an issued task raises an exception, the exception is stored and then re-raised in the caller when get() is called.
We may need to handle this case explicitly if we expect a task to raise an exception on failure.
For example:
```python
...
try:
    # get the task result that might raise an exception
    value = result.get()
except Exception as e:
    # ...
```
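The sketch below shows this in a self-contained form. The failing_task() function, its error message, and the get_or_report() helper are hypothetical and exist only to trigger the re-raise.

```python
# sketch: an exception raised in a task is re-raised by get()
from multiprocessing.pool import ThreadPool

# hypothetical task that always fails
def failing_task():
    raise ValueError('something went wrong')

# hypothetical helper that returns the result or a failure message
def get_or_report():
    with ThreadPool(1) as pool:
        async_result = pool.apply_async(failing_task)
        try:
            # the exception from the task surfaces here
            return async_result.get()
        except ValueError as e:
            return f'Failed: {e}'

# protect the entry point
if __name__ == '__main__':
    print(get_or_report())
```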
Now that we know how to get results from asynchronous tasks issued to the ThreadPool, let’s look at some worked examples.
How to Get Results From apply_async()
We can explore how to get results from tasks issued asynchronously with apply_async().
In this example, we define a simple task that generates and returns a number. We will then issue a task to execute this function and then get the result from the issued task.
Firstly, we can define a target task function.
The function will generate a random number between 0 and 1. It will then block for a fraction of a second to simulate computational effort, then return the number that was generated.
The task() function below implements this.
```python
# task executed by thread worker
def task():
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value
    return value
```
Next, in the main thread, we will create a ThreadPool with the default configuration using the context manager interface.
```python
...
# create the thread pool
with ThreadPool() as pool:
    # ...
```
We will then issue a call to the task() function to be executed by the ThreadPool asynchronously via the apply_async() function.
```python
...
# issue a task asynchronously
async_result = pool.apply_async(task)
```
This call returns immediately with an AsyncResult object.
We then call the get() function to get the result from the task.
This will block until the task has been completed and then returns the return value from the task.
```python
...
# wait for the task to complete and get the return value
print('Waiting for result...')
result = async_result.get()
```
Finally, we report the value retrieved from the task.
```python
...
# report the result
print(f'Got: {result}')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of getting a result from an asynchronous task issued with apply_async
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool

# task executed by thread worker
def task():
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create the thread pool
    with ThreadPool() as pool:
        # issue a task asynchronously
        async_result = pool.apply_async(task)
        # wait for the task to complete and get the return value
        print('Waiting for result...')
        result = async_result.get()
        # report the result
        print(f'Got: {result}')
```
Running the example first creates the ThreadPool with a default configuration.
Next, the task is issued to the pool to be executed asynchronously.
An AsyncResult object is returned and the main thread blocks attempting to get the result.
The task executes, first generating a random number, blocking for a fraction of a second, then returning the number.
The main thread receives the result from the asynchronous task and reports the value.
Note that the output will differ each time the program is run given the use of random numbers.

```
Waiting for result...
Got: 0.45583546811729037
```
This example demonstrated how to get the result from a task executed asynchronously with apply_async().
Next, let’s look at how we might get the result from tasks executed with map_async().
How to Get Results From map_async()
We can explore how to get results from tasks issued asynchronously with map_async().
In this example, we define a simple task that takes an integer as an argument, generates a random number, then returns a combination of the argument with the generated number. We will then have the ThreadPool execute the function multiple times asynchronously with different arguments.
Firstly, we can define a target task function.
The function will take an integer as an argument. It will generate a random number between 0 and 1. It will then block for a fraction of a second to simulate computational effort, then return the generated number added to the input argument.
The task() function below implements this.
```python
# task executed in a thread worker
def task(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg + value
```
Next, in the main thread, we will create a ThreadPool with the default configuration using the context manager interface.
```python
...
# create the thread pool
with ThreadPool() as pool:
    # ...
```
We will then issue a call to the task() function multiple times with values between 0 and 4. The tasks are issued using map_async() to be executed asynchronously.
```python
...
# issue tasks asynchronously
async_result = pool.map_async(task, range(5))
```
This call returns immediately with an AsyncResult object.
We then call the get() function to get the result from the issued tasks, specifically a list of the return values from the five function calls.
This call will block until all tasks have been completed.
```python
...
# wait for the task to complete and get the iterable of return values
print('Waiting for results...')
results = async_result.get()
```
Finally, we report the values retrieved from the five issued tasks by iterating over the return values and reporting each in turn.
```python
...
# iterate return values and report
for result in results:
    # report the result
    print(f'Got: {result}')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of getting a result from an asynchronous task issued with map_async
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool

# task executed in a thread worker
def task(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg + value

# protect the entry point
if __name__ == '__main__':
    # create the thread pool
    with ThreadPool() as pool:
        # issue tasks asynchronously
        async_result = pool.map_async(task, range(5))
        # wait for the task to complete and get the iterable of return values
        print('Waiting for results...')
        results = async_result.get()
        # iterate return values and report
        for result in results:
            # report the result
            print(f'Got: {result}')
```
Running the example first creates the ThreadPool with a default configuration.
Next, five tasks are issued to the pool to be executed asynchronously.
An AsyncResult object is returned and the main thread blocks attempting to get the iterable of return values.
The tasks execute concurrently, each task taking an integer argument, generating a random number, blocking for a fraction of a second, then returning the generated number added to the argument.
The main thread receives the result from the asynchronous task. It then traverses the returned iterable and reports the return value from each task.
Note that the output will differ each time the program is run given the use of random numbers.

```
Waiting for results...
Got: 0.8595136493881548
Got: 1.8697912864375383
Got: 2.5849814054642755
Got: 3.9306465445033094
Got: 4.084973998527668
```
This example demonstrates how to get results from tasks executed asynchronously with map_async().
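One detail visible in the output above is that the results are reported in the order of the arguments, not the order in which the tasks finished. The sketch below illustrates this under some assumptions: a hypothetical task() that sleeps longer for smaller arguments, a finish_order list that records completion order, and a pool of five workers so that all tasks can run at once.

```python
# sketch: map_async() results keep the order of the arguments
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical record of the order in which tasks finish
finish_order = []

# hypothetical task: smaller arguments sleep for longer
def task(arg):
    sleep((4 - arg) * 0.1)
    finish_order.append(arg)
    return arg * 10

# hypothetical helper that runs the tasks and returns both orderings
def run():
    finish_order.clear()
    with ThreadPool(5) as pool:
        results = pool.map_async(task, range(5)).get()
    return results, list(finish_order)

# protect the entry point
if __name__ == '__main__':
    results, order = run()
    print(f'Results: {results}')
    print(f'Finish order: {order}')
```

The finish order depends on thread scheduling and may vary between runs, whereas the results list follows the input order.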
Next, let’s look at how we might get the result from tasks executed with starmap_async().
How to Get Results From starmap_async()
We can explore how to get results from tasks issued asynchronously with starmap_async().
In this example, we define a simple task that takes three integers as arguments, generates a random number, then returns a combination of the input arguments with the generated number. We will then have the ThreadPool execute the function multiple times asynchronously with different arguments.
Firstly, we can define a target task function.
The function will take three integers as arguments. It will generate a random number between 0 and 1. It then blocks for a fraction of a second to simulate computational effort, then adds the generated number and all arguments and returns the sum.
The task() function below implements this.
```python
# task executed in a thread worker
def task(arg1, arg2, arg3):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg1 + arg2 + arg3 + value
```
Next, in the main thread, we will create a ThreadPool with the default configuration using the context manager interface.
```python
...
# create the thread pool
with ThreadPool() as pool:
    # ...
```
Next, we prepare arguments for the starmap_async() function.
In this case, we create a list of five tuples. Each tuple has an integer between 0 and 4, followed by double and triple that integer.
```python
...
# prepare arguments
args = [(i, i*2, i*3) for i in range(5)]
```
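To make the prepared arguments concrete, the sketch below builds the same list and applies a function to each tuple sequentially with itertools.starmap(), which unpacks each tuple into positional arguments in the same way. The add_three() function is a hypothetical stand-in for the task without the random component.

```python
# sketch: what the prepared arguments look like and how they are unpacked
from itertools import starmap

# hypothetical stand-in for the task, without the random value
def add_three(arg1, arg2, arg3):
    return arg1 + arg2 + arg3

# the same argument list as used in the example
args = [(i, i * 2, i * 3) for i in range(5)]
# each tuple is unpacked into the three parameters of the function
totals = list(starmap(add_three, args))

# protect the entry point
if __name__ == '__main__':
    print(args)
    print(totals)
```

The totals are the whole-number parts that the real task will return before the random value is added.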
We will then issue a call to the task() function multiple times with the prepared arguments. The tasks are issued using starmap_async() to be executed asynchronously.
```python
...
# issue tasks asynchronously
async_result = pool.starmap_async(task, args)
```
This call returns immediately with an AsyncResult object.
We then call the get() function to get the result from the issued tasks, specifically a list of the return values from the five function calls.
This call will block until all tasks have been completed.
```python
...
# wait for the task to complete and get the iterable of return values
print('Waiting for results...')
results = async_result.get()
```
Finally, we report the values retrieved from the five issued tasks by iterating over the return values and reporting each in turn.
```python
...
# iterate return values and report
for result in results:
    # report the result
    print(f'Got: {result}')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of getting a result from an asynchronous task issued with starmap_async
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool

# task executed in a thread worker
def task(arg1, arg2, arg3):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg1 + arg2 + arg3 + value

# protect the entry point
if __name__ == '__main__':
    # create the thread pool
    with ThreadPool() as pool:
        # prepare arguments
        args = [(i, i*2, i*3) for i in range(5)]
        # issue tasks asynchronously
        async_result = pool.starmap_async(task, args)
        # wait for the task to complete and get the iterable of return values
        print('Waiting for results...')
        results = async_result.get()
        # iterate return values and report
        for result in results:
            # report the result
            print(f'Got: {result}')
```
Running the example first creates the ThreadPool with a default configuration.
Next, arguments are prepared and five tasks are issued to the pool to be executed asynchronously.
An AsyncResult object is returned and the main thread blocks attempting to get the iterable of return values.
The tasks execute concurrently, each task taking three integer arguments, generating a random number, blocking for a fraction of a second, then returning the sum of the generated number and the arguments.
The main thread receives the result from the asynchronous task. It then traverses the returned iterable and reports the return value from each task.
Note, the output will differ each time the program is run given the use of random numbers.
```
Waiting for results...
Got: 0.68185718775623
Got: 6.432202267794842
Got: 12.449108841533565
Got: 18.551857634870373
Got: 24.583687905402517
```
This example demonstrates how to get results from tasks executed asynchronously with starmap_async().
Further Reading
This section provides additional resources that you may find helpful.
Books
- Python ThreadPool Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- ThreadPool PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python ThreadPool: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
Takeaways
You now know how to get results from asynchronous tasks executed with the ThreadPool.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.