Multiprocessing Pool Get Result from Asynchronous Tasks

August 17, 2022 Python Multiprocessing Pool

You can get results from tasks in the multiprocessing pool using a callback or by calling AsyncResult.get().

In this tutorial you will discover how to get results from tasks issued asynchronously to the multiprocessing pool in Python.

Let's get started.

Multiprocessing Pool Asynchronous Tasks

We can execute tasks asynchronously using the multiprocessing pool.

The multiprocessing.Pool class provides three functions for issuing tasks asynchronously. They are:

- apply_async()
- map_async()
- starmap_async()

Each of these approaches for executing an asynchronous task in the multiprocessing pool returns immediately with an AsyncResult object.

This object provides a handle on the task or tasks and allows us to check on the status of the tasks and to get results.

How can we get results from asynchronous tasks?

How to Get Results from Asynchronous Tasks

There are two ways that we can get results from asynchronous tasks executed with the multiprocessing pool.

They are:

- Via a result callback.
- Via the AsyncResult.get() function.

Let's take a closer look at each approach.

How to Get Result Via a Callback

We can get results from asynchronous tasks via a result callback.

A result callback can be specified via the "callback" argument.

The argument specifies a custom function to call with the result of the asynchronous task or tasks.

The function may have any name you like, as long as it does not conflict with a function name already in use.

For example, if apply_async() is configured with a callback, then the callback function will be called with the return value of the task function that was executed.

# result callback function
def result_callback(result):
	print(result, flush=True)

...
# issue a single task
result = pool.apply_async(..., callback=result_callback)

Alternatively, if map_async() or starmap_async() is configured with a callback, then the callback function will be called with an iterable of return values from all tasks issued to the process pool.

# result callback function
def result_callback(result):
	# iterate all results
	for value in result:
		print(value, flush=True)

...
# issue many tasks
result = pool.map_async(..., callback=result_callback)

How to Get Result Via AsyncResult

We can get results from asynchronous tasks via the AsyncResult object.

An AsyncResult provides a handle on one or more issued tasks.

It allows the caller to check on the status of the issued tasks, to wait for the tasks to complete, and to get the results once tasks are completed.

We can get the result of an issued task by calling the AsyncResult.get() function.

This will return the return value of the task, or an iterable of return values if multiple tasks were issued.

For example:

...
# get the result of the task or tasks
value = result.get()

If the issued tasks have not yet completed, then get() will block until the tasks are finished.

A "timeout" argument can be specified. If the tasks are still running and do not complete within the specified number of seconds, a multiprocessing.TimeoutError is raised.

For example:

...
try:
	# get the task result with a timeout
	value = result.get(timeout=10)
except multiprocessing.TimeoutError:
	# handle the timeout
	print('Gave up waiting for the result')

If an issued task raises an exception, the exception will be re-raised in the caller when get() is called.

We may need to handle this case explicitly if we expect a task to raise an exception on failure.

For example:

...
try:
	# get the task result that might raise an exception
	value = result.get()
except Exception as e:
	# handle the task failure
	print(f'Task failed with: {e}')

Now that we know how to get results from asynchronous tasks issued to the multiprocessing pool, let's look at some worked examples.

How to Get Result From apply_async()

We can explore how to get results from tasks issued asynchronously with apply_async().

In this example we define a simple task that generates and returns a number. We will then issue a task to execute this function then get the result from the issued task.

Firstly, we can define a target task function.

The function will generate a random number between 0 and 1. It will then block for a fraction of a second to simulate computational effort, then return the number that was generated.

The task() function below implements this.

# task executed in a child process
def task():
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value
    return value

Next, in the main process, we will create a multiprocessing pool with the default configuration using the context manager interface.

...
# create the process pool
with Pool() as pool:
	# ...

We will then issue a call to the task() function to be executed by the multiprocessing pool asynchronously via the apply_async() function.

...
# issue a task asynchronously
async_result = pool.apply_async(task)

This call returns immediately with an AsyncResult object.

We then call the get() function to get the result from the task.

This will block until the task has completed and then returns the return value from the task.

...
# wait for the task to complete and get the return value
print('Waiting for result...')
result = async_result.get()

Finally, we report the value retrieved from the task.

...
# report the result
print(f'Got: {result}')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting a result from an asynchronous task issued with apply_async
from time import sleep
from random import random
from multiprocessing import Pool

# task executed in a child process
def task():
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create the process pool
    with Pool() as pool:
        # issue a task asynchronously
        async_result = pool.apply_async(task)
        # wait for the task to complete and get the return value
        print('Waiting for result...')
        result = async_result.get()
        # report the result
        print(f'Got: {result}')

Running the example first creates the multiprocessing pool with a default configuration.

Next, the task is issued to the pool to be executed asynchronously.

An AsyncResult object is returned and the main process blocks attempting to get the result.

The task executes, first generating a random number, blocking for a fraction of a second, then returning the number.

The main process receives the result from the asynchronous task and reports the value.

Note, the output will differ each time the program is run given the use of random numbers.

Waiting for result...
Got: 0.30244728699858925

This example demonstrated how to get the result from a task executed asynchronously with apply_async().

Next, let's look at how we might get the result from tasks executed with map_async().

How to Get Result From map_async()

We can explore how to get results from tasks issued asynchronously with map_async().

In this example we define a simple task that takes an integer as an argument, generates a random number, then returns a combination of the argument with the generated number. We will then have the multiprocessing pool execute the function multiple times asynchronously with different arguments.

Firstly, we can define a target task function.

The function will take an integer as an argument. It will generate a random number between 0 and 1. It will then block for a fraction of a second to simulate computational effort, then return the generated number added to the input argument.

The task() function below implements this.

# task executed in a child process
def task(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg + value

Next, in the main process, we will create a multiprocessing pool with the default configuration using the context manager interface.

...
# create the process pool
with Pool() as pool:
	# ...

We will then issue a call to the task() function multiple times with values between 0 and 4. The tasks are issued using map_async() to be executed asynchronously.

...
# issue tasks asynchronously
async_result = pool.map_async(task, range(5))

This call returns immediately with an AsyncResult object.

We then call the get() function to get the result from the issued tasks, specifically an iterator over return values from the five function calls.

This call will block until all tasks have completed.

...
# wait for the task to complete and get the iterable of return values
print('Waiting for results...')
results = async_result.get()

Finally, we report the values retrieved from the five issued tasks by iterating over the return values and reporting each in turn.

...
# iterate return values and report
for result in results:
    # report the result
    print(f'Got: {result}')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting a result from an asynchronous task issued with map_async
from time import sleep
from random import random
from multiprocessing import Pool

# task executed in a child process
def task(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg + value

# protect the entry point
if __name__ == '__main__':
    # create the process pool
    with Pool() as pool:
        # issue tasks asynchronously
        async_result = pool.map_async(task, range(5))
        # wait for the task to complete and get the iterable of return values
        print('Waiting for results...')
        results = async_result.get()
        # iterate return values and report
        for result in results:
            # report the result
            print(f'Got: {result}')

Running the example first creates the multiprocessing pool with a default configuration.

Next, five tasks are issued to the pool to be executed asynchronously.

An AsyncResult object is returned and the main process blocks attempting to get the iterable of return values.

The tasks execute in parallel, each task taking an integer argument, generating a random number, blocking for a fraction of a second, then returning the generated number added to the argument.

The main process receives the result from the asynchronous task. It then traverses the returned iterable and reports the return value from each task.

Note, the output will differ each time the program is run given the use of random numbers.

Waiting for results...
Got: 0.44714197382220844
Got: 1.8567341012371452
Got: 2.9308971103246804
Got: 3.8640584603527994
Got: 4.387599564850404

This example demonstrates how to get results from tasks executed asynchronously with map_async().

Next, let's look at how we might get the result from tasks executed with starmap_async().

How to Get Result From starmap_async()

We can explore how to get results from tasks issued asynchronously with starmap_async().

In this example we define a simple task that takes three integers as an argument, generates a random number, then returns a combination of the input arguments with the generated number. We will then have the multiprocessing pool execute the function multiple times asynchronously with different arguments.

Firstly, we can define a target task function.

The function will take three integers as arguments. It will generate a random number between 0 and 1. It then blocks for a fraction of a second to simulate computational effort, then adds the generated number and all arguments and returns the sum.

The task() function below implements this.

# task executed in a child process
def task(arg1, arg2, arg3):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg1 + arg2 + arg3 + value

Next, in the main process, we will create a multiprocessing pool with the default configuration using the context manager interface.

...
# create the process pool
with Pool() as pool:
	# ...

Next, we prepare arguments for the starmap_async() function.

In this case, we create a list of five tuples. Each tuple has an integer between 0 and 4, along with its double and triple.

...
# prepare arguments
args = [(i, i*2, i*3) for i in range(5)]

We will then issue a call to the task() function multiple times with the prepared arguments. The tasks are issued using starmap_async() to be executed asynchronously.

...
# issue tasks asynchronously
async_result = pool.starmap_async(task, args)

This call returns immediately with an AsyncResult object.

We then call the get() function to get the result from the issued tasks, specifically an iterator over return values from the five function calls.

This call will block until all tasks have completed.

...
# wait for the task to complete and get the iterable of return values
print('Waiting for results...')
results = async_result.get()

Finally, we report the values retrieved from the five issued tasks by iterating over the return values and reporting each in turn.

...
# iterate return values and report
for result in results:
    # report the result
    print(f'Got: {result}')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting a result from an asynchronous task issued with starmap_async
from time import sleep
from random import random
from multiprocessing import Pool

# task executed in a child process
def task(arg1, arg2, arg3):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg1 + arg2 + arg3 + value

# protect the entry point
if __name__ == '__main__':
    # create the process pool
    with Pool() as pool:
        # prepare arguments
        args = [(i, i*2, i*3) for i in range(5)]
        # issue tasks asynchronously
        async_result = pool.starmap_async(task, args)
        # wait for the task to complete and get the iterable of return values
        print('Waiting for results...')
        results = async_result.get()
        # iterate return values and report
        for result in results:
            # report the result
            print(f'Got: {result}')

Running the example first creates the multiprocessing pool with a default configuration.

Next, arguments are prepared and five tasks are issued to the pool to be executed asynchronously.

An AsyncResult object is returned and the main process blocks attempting to get the iterable of return values.

The tasks execute in parallel, each task taking three integer arguments, generating a random number, blocking for a fraction of a second, then returning the sum of the generated number and the arguments.

The main process receives the result from the asynchronous task. It then traverses the returned iterable and reports the return value from each task.

Note, the output will differ each time the program is run given the use of random numbers.

Waiting for results...
Got: 0.5694834046113727
Got: 6.664069516764784
Got: 12.564221171514783
Got: 18.33154747747727
Got: 24.998680841635768

This example demonstrates how to get results from tasks executed asynchronously with starmap_async().

Takeaways

You now know how to get results from asynchronous tasks executed with the multiprocessing pool.



If you enjoyed this tutorial, you will love my book: Python Multiprocessing Pool Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.