Last Updated on September 12, 2022
You can get results from tasks in the multiprocessing pool using a callback or by calling AsyncResult.get().
In this tutorial you will discover how to get results from tasks issued asynchronously to the multiprocessing pool in Python.
Let’s get started.
Multiprocessing Pool Asynchronous Tasks
We can execute tasks asynchronously using the multiprocessing pool.
The multiprocessing.Pool class provides three methods for issuing tasks asynchronously:
- Pool.apply_async(): For executing one-off tasks.
- Pool.map_async(): For calling the same function multiple times with different arguments.
- Pool.starmap_async(): For calling the same function many times, where each call takes more than one argument.
Each of these approaches for executing an asynchronous task in the multiprocessing pool returns immediately with an AsyncResult object.
This object provides a handle on the task or tasks and allows us to check on the status of the tasks and to get results.
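As a minimal sketch of how this handle might be used, the example below issues one task with apply_async() and then checks on it with the AsyncResult ready(), wait(), and successful() methods. The work() and check_status() functions and the half-second sleep are hypothetical, chosen only for illustration.

```python
from time import sleep
from multiprocessing import Pool

# hypothetical task: block briefly, then return double the argument
def work(x):
    sleep(0.5)
    return x * 2

# issue one task and query its status via the AsyncResult
def check_status():
    with Pool(1) as pool:
        async_result = pool.apply_async(work, (21,))
        # ready() reports whether the task has finished, without blocking
        ready_at_once = async_result.ready()
        # wait() blocks until the task finishes or the timeout elapses
        async_result.wait(timeout=10)
        # successful() may only be called once the task is done
        return (ready_at_once, async_result.ready(),
                async_result.successful(), async_result.get())

# protect the entry point
if __name__ == '__main__':
    print(check_status())
```

The first value will almost always be False, because the task is still sleeping when ready() is first called.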
How can we get results from asynchronous tasks?
How to Get Results from Asynchronous Tasks
There are two ways that we can get results from asynchronous tasks executed with the multiprocessing pool.
They are:
- Use a result callback.
- Get results from the AsyncResult object.
Let’s take a closer look at each approach.
How to Get Result Via a Callback
We can get results from asynchronous tasks via a result callback.
A result callback can be specified via the “callback” argument.
The argument specifies a custom function to call with the result of the asynchronous task or tasks.
The function may have any name you like, as long as it does not conflict with a function name already in use.
For example, if apply_async() is configured with a callback, then the callback function will be called with the return value of the task function that was executed.
```python
# result callback function
def result_callback(result):
    print(result, flush=True)

...
# issue a single task
result = pool.apply_async(..., callback=result_callback)
```
Alternatively, if map_async() or starmap_async() are configured with a callback, then the callback function will be called with an iterable of return values from all tasks issued to the process pool.
```python
# result callback function
def result_callback(result):
    # iterate all results
    for value in result:
        print(value, flush=True)

...
# issue multiple tasks
result = pool.map_async(..., callback=result_callback)
```
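The two callback styles can be sketched together in one runnable program. This is only an illustration: the square() task, the on_single() and on_many() callbacks, and the shared collected list are hypothetical names. The callbacks are called in the main process, so they can append to an ordinary list.

```python
from multiprocessing import Pool

# hypothetical task: square the argument
def square(x):
    return x * x

# results gathered by the callbacks, which run in the main process
collected = []

# callback for apply_async(): receives a single return value
def on_single(result):
    collected.append(('single', result))

# callback for map_async(): receives all return values at once
def on_many(results):
    collected.append(('many', results))

# issue tasks with callbacks and return what the callbacks gathered
def run_callbacks():
    collected.clear()
    with Pool(2) as pool:
        first = pool.apply_async(square, (3,), callback=on_single)
        second = pool.map_async(square, range(4), callback=on_many)
        # wait for the tasks so the callbacks run before the pool is closed
        first.wait()
        second.wait()
    return sorted(collected)

# protect the entry point
if __name__ == '__main__':
    print(run_callbacks())
```

Waiting on both AsyncResult objects before leaving the context manager matters here, because exiting the block terminates the pool.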
How to Get Result Via AsyncResult
We can get results from asynchronous tasks via the AsyncResult object.
An AsyncResult provides a handle on one or more issued tasks.
It allows the caller to check on the status of the issued tasks, to wait for the tasks to complete, and to get the results once tasks are completed.
We can get the result of an issued task by calling the AsyncResult.get() function.
The value returned depends on which function was used to issue the task or tasks:
- apply_async(): Returns the return value of the target function.
- map_async(): Returns a list of the return values of the target function, in the order the arguments were provided.
- starmap_async(): Returns a list of the return values of the target function, in the order the arguments were provided.
For example:
```python
...
# get the result of the task or tasks
value = result.get()
```
If the issued tasks have not yet completed, then get() will block until the tasks are finished.
A “timeout” argument can be specified. If the tasks are still running and do not complete within the specified number of seconds, a multiprocessing.TimeoutError is raised.
For example:
```python
...
try:
    # get the task result with a timeout
    value = result.get(timeout=10)
except multiprocessing.TimeoutError as e:
    # ...
```
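A runnable sketch of the timeout behavior is shown below. The slow_task() and get_with_timeout() functions and the one-second sleep are hypothetical. Note that a timeout only stops the caller from waiting; it does not cancel the task itself.

```python
import multiprocessing
from time import sleep
from multiprocessing import Pool

# hypothetical slow task
def slow_task():
    sleep(1)
    return 'done'

# try to get the result, giving up after the given number of seconds
def get_with_timeout(timeout):
    with Pool(1) as pool:
        async_result = pool.apply_async(slow_task)
        try:
            return async_result.get(timeout=timeout)
        except multiprocessing.TimeoutError:
            # the task is still running; in this sketch we simply give up
            return None

# protect the entry point
if __name__ == '__main__':
    print(get_with_timeout(0.1))
    print(get_with_timeout(10))
```

In this sketch the unfinished task is discarded when the context manager exits and terminates the pool.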
If an issued task raises an exception, the exception will be re-raised once the issued tasks are completed.
We may need to handle this case explicitly if we expect a task to raise an exception on failure.
For example:
```python
...
try:
    # get the task result that might raise an exception
    value = result.get()
except Exception as e:
    # ...
```
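To make this concrete, the sketch below uses a hypothetical checked_sqrt() task that raises a ValueError for negative input. The exception is raised in the child process and re-raised in the main process when get() is called. The safe_get() helper is also an assumption made for illustration.

```python
from multiprocessing import Pool

# hypothetical task that fails for negative input
def checked_sqrt(x):
    if x < 0:
        raise ValueError(f'negative input: {x}')
    return x ** 0.5

# get the task result, handling an exception raised by the task
def safe_get(x):
    with Pool(1) as pool:
        async_result = pool.apply_async(checked_sqrt, (x,))
        try:
            return async_result.get()
        except ValueError as e:
            # the exception raised in the child is re-raised here by get()
            return f'task failed: {e}'

# protect the entry point
if __name__ == '__main__':
    print(safe_get(9))
    print(safe_get(-1))
```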
Now that we know how to get results from asynchronous tasks issued to the multiprocessing pool, let’s look at some worked examples.
How to Get Result From apply_async()
We can explore how to get results from tasks issued asynchronously with apply_async().
In this example we define a simple task that generates and returns a number. We will then issue a task to execute this function then get the result from the issued task.
Firstly, we can define a target task function.
The function will generate a random number between 0 and 1. It will then block for a fraction of a second to simulate computational effort, then return the number that was generated.
The task() function below implements this.
```python
# task executed in a child process
def task():
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value
    return value
```
Next, in the main process, we will create a multiprocessing pool with the default configuration using the context manager interface.
```python
...
# create the process pool
with Pool() as pool:
    # ...
```
We will then issue a call to the task() function to be executed by the multiprocessing pool asynchronously via the apply_async() function.
```python
...
# issue a task asynchronously
async_result = pool.apply_async(task)
```
This call returns immediately with an AsyncResult object.
We then call the get() function to get the result from the task.
This will block until the task has completed and then return the return value from the task.
```python
...
# wait for the task to complete and get the return value
print('Waiting for result...')
result = async_result.get()
```
Finally, we report the value retrieved from the task.
```python
...
# report the result
print(f'Got: {result}')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of getting a result from an asynchronous task issued with apply_async
from time import sleep
from random import random
from multiprocessing import Pool

# task executed in a child process
def task():
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create the process pool
    with Pool() as pool:
        # issue a task asynchronously
        async_result = pool.apply_async(task)
        # wait for the task to complete and get the return value
        print('Waiting for result...')
        result = async_result.get()
        # report the result
        print(f'Got: {result}')
```
Running the example first creates the multiprocessing pool with a default configuration.
Next, the task is issued to the pool to be executed asynchronously.
An AsyncResult object is returned and the main process blocks attempting to get the result.
The task executes, first generating a random number, blocking for a fraction of a second, then returning the number.
The main process receives the result from the asynchronous task and reports the value.
Note, the output will differ each time the program is run given the use of random numbers.
```
Waiting for result...
Got: 0.30244728699858925
```
This example demonstrated how to get the result from a task executed asynchronously with apply_async().
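The same pattern extends to several one-off tasks. As a rough sketch, we might issue a few calls with apply_async(), keep each AsyncResult in a list, and then call get() on each in turn. The add() task and issue_many() helper are hypothetical, and the task is deterministic so the output is predictable.

```python
from multiprocessing import Pool

# hypothetical task: add two numbers
def add(a, b):
    return a + b

# issue several one-off tasks and gather their results
def issue_many():
    with Pool(2) as pool:
        # one AsyncResult per issued task
        handles = [pool.apply_async(add, (i, 10)) for i in range(3)]
        # get() blocks on each task in turn
        return [handle.get() for handle in handles]

# protect the entry point
if __name__ == '__main__':
    print(issue_many())
```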
Next, let’s look at how we might get the result from tasks executed with map_async().
How to Get Result From map_async()
We can explore how to get results from tasks issued asynchronously with map_async().
In this example we define a simple task that takes an integer as an argument, generates a random number, then returns a combination of the argument with the generated number. We will then have the multiprocessing pool execute the function multiple times asynchronously with different arguments.
Firstly, we can define a target task function.
The function will take an integer as an argument. It will generate a random number between 0 and 1. It will then block for a fraction of a second to simulate computational effort, then return the generated number added to the input argument.
The task() function below implements this.
```python
# task executed in a child process
def task(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg + value
```
Next, in the main process, we will create a multiprocessing pool with the default configuration using the context manager interface.
```python
...
# create the process pool
with Pool() as pool:
    # ...
```
We will then issue a call to the task() function multiple times with values between 0 and 4. The tasks are issued using map_async() to be executed asynchronously.
```python
...
# issue tasks asynchronously
async_result = pool.map_async(task, range(5))
```
This call returns immediately with an AsyncResult object.
We then call the get() function to get the result from the issued tasks, specifically a list of return values from the five function calls.
This call will block until all tasks have completed.
```python
...
# wait for the task to complete and get the iterable of return values
print('Waiting for results...')
results = async_result.get()
```
Finally, we report the values retrieved from the five issued tasks by iterating over the return values and reporting each in turn.
```python
...
# iterate return values and report
for result in results:
    # report the result
    print(f'Got: {result}')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of getting a result from an asynchronous task issued with map_async
from time import sleep
from random import random
from multiprocessing import Pool

# task executed in a child process
def task(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg + value

# protect the entry point
if __name__ == '__main__':
    # create the process pool
    with Pool() as pool:
        # issue tasks asynchronously
        async_result = pool.map_async(task, range(5))
        # wait for the task to complete and get the iterable of return values
        print('Waiting for results...')
        results = async_result.get()
        # iterate return values and report
        for result in results:
            # report the result
            print(f'Got: {result}')
```
Running the example first creates the multiprocessing pool with a default configuration.
Next, five tasks are issued to the pool to be executed asynchronously.
An AsyncResult object is returned and the main process blocks attempting to get the iterable of return values.
The tasks execute in parallel, each task taking an integer argument, generating a random number, blocking for a fraction of a second, then returning the generated number added to the argument.
The main process receives the result from the asynchronous task. It then traverses the returned iterable and reports the return value from each task.
Note, the output will differ each time the program is run given the use of random numbers.
```
Waiting for results...
Got: 0.44714197382220844
Got: 1.8567341012371452
Got: 2.9308971103246804
Got: 3.8640584603527994
Got: 4.387599564850404
```
This example demonstrates how to get results from tasks executed asynchronously with map_async().
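One detail worth noting is that the results from map_async() come back in the order of the input arguments, not the order in which tasks finish. The sketch below illustrates this with a hypothetical slow_scale() task in which smaller arguments sleep longer. The function names and sleep durations are assumptions made for the example.

```python
from time import sleep
from multiprocessing import Pool

# hypothetical task: smaller arguments sleep longer, so they finish later
def slow_scale(x):
    sleep((3 - x) * 0.1)
    return x * 10

# issue the tasks and return the list of results
def ordered_results():
    with Pool(3) as pool:
        async_result = pool.map_async(slow_scale, range(3))
        # results are in the order of the arguments, not completion order
        return async_result.get()

# protect the entry point
if __name__ == '__main__':
    print(ordered_results())
```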
Next, let’s look at how we might get the result from tasks executed with starmap_async().
How to Get Result From starmap_async()
We can explore how to get results from tasks issued asynchronously with starmap_async().
In this example we define a simple task that takes three integers as arguments, generates a random number, then returns a combination of the input arguments with the generated number. We will then have the multiprocessing pool execute the function multiple times asynchronously with different arguments.
Firstly, we can define a target task function.
The function will take three integers as arguments. It will generate a random number between 0 and 1. It then blocks for a fraction of a second to simulate computational effort, then adds the generated number and all arguments and returns the sum.
The task() function below implements this.
```python
# task executed in a child process
def task(arg1, arg2, arg3):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg1 + arg2 + arg3 + value
```
Next, in the main process, we will create a multiprocessing pool with the default configuration using the context manager interface.
```python
...
# create the process pool
with Pool() as pool:
    # ...
```
Next, we prepare arguments for the starmap_async() function.
In this case, we create a list of five tuples. Each tuple has an integer between 0 and 4, followed by double and triple that integer.
```python
...
# prepare arguments
args = [(i, i*2, i*3) for i in range(5)]
```
We will then issue a call to the task() function multiple times with the prepared arguments. The tasks are issued using starmap_async() to be executed asynchronously.
```python
...
# issue tasks asynchronously
async_result = pool.starmap_async(task, args)
```
This call returns immediately with an AsyncResult object.
We then call the get() function to get the result from the issued tasks, specifically a list of return values from the five function calls.
This call will block until all tasks have completed.
```python
...
# wait for the task to complete and get the iterable of return values
print('Waiting for results...')
results = async_result.get()
```
Finally, we report the values retrieved from the five issued tasks by iterating over the return values and reporting each in turn.
```python
...
# iterate return values and report
for result in results:
    # report the result
    print(f'Got: {result}')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of getting a result from an asynchronous task issued with starmap_async
from time import sleep
from random import random
from multiprocessing import Pool

# task executed in a child process
def task(arg1, arg2, arg3):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg1 + arg2 + arg3 + value

# protect the entry point
if __name__ == '__main__':
    # create the process pool
    with Pool() as pool:
        # prepare arguments
        args = [(i, i*2, i*3) for i in range(5)]
        # issue tasks asynchronously
        async_result = pool.starmap_async(task, args)
        # wait for the task to complete and get the iterable of return values
        print('Waiting for results...')
        results = async_result.get()
        # iterate return values and report
        for result in results:
            # report the result
            print(f'Got: {result}')
```
Running the example first creates the multiprocessing pool with a default configuration.
Next, arguments are prepared and five tasks are issued to the pool to be executed asynchronously.
An AsyncResult object is returned and the main process blocks attempting to get the iterable of return values.
The tasks execute in parallel, each task taking three integer arguments, generating a random number, blocking for a fraction of a second, then returning the generated number added to the sum of the arguments.
The main process receives the result from the asynchronous task. It then traverses the returned iterable and reports the return value from each task.
Note, the output will differ each time the program is run given the use of random numbers.
```
Waiting for results...
Got: 0.5694834046113727
Got: 6.664069516764784
Got: 12.564221171514783
Got: 18.33154747747727
Got: 24.998680841635768
```
This example demonstrates how to get results from tasks executed asynchronously with starmap_async().
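To see how each tuple is unpacked into one call, it may help to look at a deterministic variant. The sketch below uses a hypothetical total() task that sums its three arguments without adding a random value, along with the same argument tuples as above. The returned sums match the whole-number parts of the sample output.

```python
from multiprocessing import Pool

# hypothetical deterministic task: sum the three arguments
def total(arg1, arg2, arg3):
    return arg1 + arg2 + arg3

# issue one task per tuple of arguments and return the list of results
def starmap_totals():
    # each tuple is unpacked into one call, e.g. total(1, 2, 3)
    args = [(i, i*2, i*3) for i in range(5)]
    with Pool(2) as pool:
        async_result = pool.starmap_async(total, args)
        return async_result.get()

# protect the entry point
if __name__ == '__main__':
    print(starmap_totals())
```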
Takeaways
You now know how to get results from asynchronous tasks executed with the multiprocessing pool.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.