ThreadPool Get Results from Asynchronous Tasks

October 7, 2022 Python ThreadPool

You can get results from tasks in the ThreadPool using a callback or by calling AsyncResult.get().

In this tutorial, you will discover how to get results from tasks issued asynchronously to the ThreadPool in Python.

Let's get started.

ThreadPool Asynchronous Tasks

We can execute tasks asynchronously using the ThreadPool.

The ThreadPool class provides three functions for issuing tasks asynchronously, they are:

Each of these approaches for executing an asynchronous task in the ThreadPool returns immediately with an AsyncResult object.

This object provides a handle on the task or tasks and allows us to check on the status of the tasks and to get results.

How can we get results from asynchronous tasks?

How to Get Results from Asynchronous Tasks

There are two ways that we can get results from asynchronous tasks executed with the ThreadPool.

They are:

  1. Use a result callback.
  2. Get results from the AsyncResult object.

Let's take a closer look at each approach.

How to Get Results Via a Callback

We can get results from asynchronous tasks via a result callback.

A result callback can be specified via the "callback" argument.

The argument specifies the name of a custom function to call with the result from an asynchronous task or tasks.

The function may have any name you like, as long as it does not conflict with a function name already in use.

For example, if apply_async() is configured with a callback, then the callback function will be called with the return value of the task function that was executed.

# result callback function
def result_callback(result):
	print(result)

...
# issue a single task
result = apply_async(..., callback=result_callback)

Alternatively, if map_async() or starmap_async() are configured with a callback, then the callback function will be called with an iterable of return values from all tasks issued to the ThreadPool.

# result callback function
def result_callback(result):
	# iterate all results
	for value in result:
		print(value)

...
# issue a single task
result = map_async(..., callback=result_callback)

How to Get Results Via AsyncResult

We can get results from asynchronous tasks via the AsyncResult object.

An AsyncResult provides a handle on one or more issued tasks.

It allows the caller to check on the status of the issued tasks, wait for the tasks to complete, and get the results once tasks are completed.

We can get the result of an issued task by calling the AsyncResult.get() function.

This will return the result of the specific function called to issue the task.

For example:

...
# get the result of the task or tasks
value = result.get()

If the issued tasks have not yet been completed, then get() will block until the tasks are finished.

A "timeout" argument can be specified. If the tasks are still running and do not completed within the specified number of seconds, a multiprocessing.TimeoutError is raised.

For example:

...
try:
	# get the task result with a timeout
	value = result.get(timeout=10)
except multiprocessing.TimeoutError as e:
	# ...

If an issued task raises an exception, the exception will be re-raised once the issued tasks are completed.

We may need to handle this case explicitly if we expect a task to raise an exception on failure.

For example:

...
try:
	# get the task result that might raise an exception
	value = result.get()
except Exception as e:
	# ...

You can learn more about the AsyncResult object in the tutorial:

Now that we know how to get results from asynchronous tasks issued to the ThreadPool, let's look at some worked examples.

How to Get Results From apply_async()

We can explore how to get results from tasks issued asynchronously with apply_async().

In this example, we define a simple task that generates and returns a number. We will then issue a task to execute this function and then get the result from the issued task.

Firstly, we can define a target task function.

The function will generate a random number between 0 and 1. It will then block for a fraction of a second to simulate computational effort, then return the number that was generated.

The task() function below implements this.

# task executed by thread worker
def task():
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value
    return value

Next, in the main thread, we will create a ThreadPool with the default configuration using the context manager interface.

...
# create the thread pool
with ThreadPool() as pool:
	# ...

We will then issue a call to the task() function to be executed by the ThreadPool asynchronously via the apply_async() function.

...
# issue a task asynchronously
async_result = pool.apply_async(task)

You can learn more about how to use the apply_async() function in the tutorial:

This call returns immediately with an AsyncResult object.

We then call the get() function to get the result from the task.

This will block until the task has been completed and then returns the return value from the task.

...
# wait for the task to complete and get the return value
print('Waiting for result...')
result = async_result.get()

Finally, we report the value retrieved from the task.

...
# report the result
print(f'Got: {result}')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting a result from an asynchronous task issued with apply_async
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool

# task executed by thread worker
def task():
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create the thread pool
    with ThreadPool() as pool:
        # issue a task asynchronously
        async_result = pool.apply_async(task)
        # wait for the task to complete and get the return value
        print('Waiting for result...')
        result = async_result.get()
        # report the result
        print(f'Got: {result}')

Running the example first creates the ThreadPool with a default configuration.

Next, the task is issued to the pool to be executed asynchronously.

An AsyncResult object is returned and the main thread blocks attempting to get the result.

The task executes, first generating a random number, blocking for a fraction of a second, then returning the number.

The main thread receives the result from the asynchronous task and reports the value.

Note, that the output will differ each time the program is run given the use of random numbers.

Waiting for result...
Got: 0.45583546811729037

This example demonstrated how to get the result from a task executed asynchronously with apply_async().

Next, let's look at how we might get the result from tasks executed with map_async().

How to Get Results From map_async()

We can explore how to get results from tasks issued asynchronously with map_async().

In this example, we define a simple task that takes an integer as an argument, generates a random number, then returns a combination of the argument with the generated number. We will then have the ThreadPool execute the function multiple times asynchronously with different arguments.

Firstly, we can define a target task function.

The function will take an integer as an argument. It will generate a random number between 0 and 1. It will then block for a fraction of a second to simulate computational effort, then return the generated number added to the input argument.

The task() function below implements this.

# task executed in a thread worker
def task(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg + value

Next, in the main thread, we will create a ThreadPool with the default configuration using the context manager interface.

...
# create the thread pool
with ThreadPool() as pool:
	# ...

We will then issue a call to the task() function multiple times with values between 0 and 4. The tasks are issued using map_async() to be executed asynchronously.

...
# issue tasks asynchronously
async_result = pool.map_async(task, range(5))

You can learn more about how to use the map_async() function in the tutorial:

This call returns immediately with an AsyncResult object.

We then call the get() function to get the result from the issued tasks, specifically an iterator over return values from the five function calls.

This call will block until all tasks have been completed.

...
# wait for the task to complete and get the iterable of return values
print('Waiting for results...')
results = async_result.get()

Finally, we report the values retrieved from the five issued tasks by iterating over the return values and reporting each in turn.

...
# iterate return values and report
for result in results:
    # report the result
    print(f'Got: {result}')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting a result from an asynchronous task issued with map_async
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool

# task executed in a thread worker
def task(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg + value

# protect the entry point
if __name__ == '__main__':
    # create the thread pool
    with ThreadPool() as pool:
        # issue tasks asynchronously
        async_result = pool.map_async(task, range(5))
        # wait for the task to complete and get the iterable of return values
        print('Waiting for results...')
        results = async_result.get()
        # iterate return values and report
        for result in results:
            # report the result
            print(f'Got: {result}')

Running the example first creates the ThreadPool with a default configuration.

Next, five tasks are issued to the pool to be executed asynchronously.

An AsyncResult object is returned and the main thread blocks attempting to get the iterable of return values.

The tasks execute in parallel, each task taking an integer argument, generating a random number, blocking for a fraction of a second, then returning the generated number added to the argument.

The main thread receives the result from the asynchronous task. It then traverses the returned iterable and reports the return value from each task.

Note, that the output will differ each time the program is run given the use of random numbers.

Waiting for results...
Got: 0.8595136493881548
Got: 1.8697912864375383
Got: 2.5849814054642755
Got: 3.9306465445033094
Got: 4.084973998527668

This example demonstrates how to get results from tasks executed asynchronously with map_async().

Next, let's look at how we might get the result from tasks executed with starmap_async().

How to Get Result From starmap_async()

We can explore how to get results from tasks issued asynchronously with starmap_async().

In this example, we define a simple task that takes three integers as an argument, generates a random number, then returns a combination of the input arguments with the generated number. We will then have the ThreadPool execute the function multiple times asynchronously with different arguments.

Firstly, we can define a target task function.

The function will take three integers as arguments. It will generate a random number between 0 and 1. It then blocks for a fraction of a second to simulate computational effort, then adds the generated number and all arguments and returns the sum.

The task() function below implements this.

# task executed in a thread worker
def task(arg1, arg2, arg3):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg1 + arg2 + arg3 + value

Next, in the main thread, we will create a ThreadPool with the default configuration using the context manager interface.

...
# create the thread pool
with ThreadPool() as pool:
	# ...

Next, we prepare arguments for the starmap() function.

In this case, we create a list of five tuples. Each tuple has an integer argument between 0 and 4, then the square and the cube of each integer.

...
# prepare arguments
args = [(i,i*2, i*3) for i in range(5)]

We will then issue a call to the task() function multiple times with the prepared arguments. The tasks are issued using starmap_async() to be executed asynchronously.

...
# issue tasks asynchronously
async_result = pool.starmap_async(task, args)

You can learn more about how to use the starmap_async() function in the tutorial:

This call returns immediately with an AsyncResult object.

We then call the get() function to get the result from the issued tasks, specifically an iterator over return values from the five function calls.

This call will block until all tasks have been completed.

...
# wait for the task to complete and get the iterable of return values
print('Waiting for results...')
results = async_result.get()

Finally, we report the values retrieved from the five issued tasks by iterating over the return values and reporting each in turn.

...
# iterate return values and report
for result in results:
    # report the result
    print(f'Got: {result}')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting a result from an asynchronous task issued with starmap_async
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool

# task executed in a thread worker
def task(arg1, arg2, arg3):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    sleep(value)
    # return the generated value combined with the argument
    return arg1 + arg2 + arg3 + value

# protect the entry point
if __name__ == '__main__':
    # create the thread pool
    with ThreadPool() as pool:
        # prepare arguments
        args = [(i,i*2, i*3) for i in range(5)]
        # issue tasks asynchronously
        async_result = pool.starmap_async(task, args)
        # wait for the task to complete and get the iterable of return values
        print('Waiting for results...')
        results = async_result.get()
        # iterate return values and report
        for result in results:
            # report the result
            print(f'Got: {result}')

Running the example first creates the ThreadPool with a default configuration.

Next, arguments are prepared and five tasks are issued to the pool to be executed asynchronously.

An AsyncResult object is returned and the main thread blocks attempting to get the iterable of return values.

The tasks execute in parallel, each task taking an integer argument, generating a random number, blocking for a fraction of a second, then returning the generated number added to the argument.

The main thread receives the result from the asynchronous task. It then traverses the returned iterable and reports the return value from each task.

Note, the output will differ each time the program is run given the use of random numbers.

Waiting for results...
Got: 0.68185718775623
Got: 6.432202267794842
Got: 12.449108841533565
Got: 18.551857634870373
Got: 24.583687905402517

This example demonstrates how to get results from tasks executed asynchronously with starmap_async().

Takeaways

You now know how to get results from asynchronous tasks executed with the ThreadPool.