How to Use ThreadPool AsyncResult

September 29, 2022 Python ThreadPool

You can issue asynchronous tasks to the ThreadPool which will return an AsyncResult object.

The AsyncResult provides a handle on issued tasks in the ThreadPool and can be used to check on the status of the tasks and to get task results.

In this tutorial you will discover how to use the AsyncResult in Python.

Let's get started.

What is the AsyncResult

An AsyncResult object is returned when issuing tasks to ThreadPool asynchronously.

This can be achieved via any of these methods on the ThreadPool: apply_async(), map_async(), or starmap_async().

An AsyncResult provides a handle on one or more issued tasks.

It allows the caller to check on the status of the issued tasks, to wait for the tasks to complete, and to get the results once tasks are completed.
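
As a quick check of the point above, the sketch below issues tasks with all three methods and confirms that each returned handle is an AsyncResult. The double() and add() task functions and the issue_all() helper are hypothetical names introduced only for illustration.

```python
from multiprocessing.pool import ThreadPool, AsyncResult

# hypothetical task functions used only for illustration
def double(x):
    return x * 2

def add(x, y):
    return x + y

def issue_all():
    with ThreadPool(2) as pool:
        results = [
            pool.apply_async(double, args=(1,)),
            pool.map_async(double, [1, 2, 3]),
            pool.starmap_async(add, [(1, 2), (3, 4)]),
        ]
        # wait for every handle before the context manager terminates the pool
        for result in results:
            result.wait()
    return results

if __name__ == '__main__':
    for result in issue_all():
        print(isinstance(result, AsyncResult))
```

In current CPython the handles returned by map_async() and starmap_async() are instances of a subclass, which is why the sketch uses isinstance() rather than comparing types directly.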

How to Use the AsyncResult

The AsyncResult class is straightforward to use.

First, you must get an AsyncResult object by issuing one or more tasks to the ThreadPool using any of the apply_async(), map_async(), or starmap_async() functions.

For example:

...
# issue a task to the thread pool
result = pool.apply_async(...)

Once you have an AsyncResult object, you can use it to query the status and get results from the task.

Get a Result

We can get the result of an issued task by calling the AsyncResult.get() function.

Return the result when it arrives.

-- multiprocessing — Process-based parallelism

This will return the result of the specific function called to issue the task.

For example:

...
# get the result of the task or tasks
value = result.get()

If the issued tasks have not yet completed, then get() will block until the tasks are finished.
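
What get() returns depends on how the tasks were issued. A minimal sketch, assuming hypothetical square() and add() task functions: apply_async() yields a single value, whereas map_async() and starmap_async() yield a list with one entry per task, in the order the arguments were supplied.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task functions used only for illustration
def square(x):
    return x * x

def add(x, y):
    return x + y

def collect():
    with ThreadPool(4) as pool:
        single = pool.apply_async(square, args=(3,))
        mapped = pool.map_async(square, range(5))
        starred = pool.starmap_async(add, [(1, 2), (3, 4)])
        # each get() blocks until the associated tasks are finished
        return single.get(), mapped.get(), starred.get()

if __name__ == '__main__':
    one, many, pairs = collect()
    print(one)    # 9
    print(many)   # [0, 1, 4, 9, 16]
    print(pairs)  # [3, 7]
```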

A "timeout" argument can be specified. If the tasks are still running and do not complete within the specified number of seconds, a multiprocessing.TimeoutError is raised.

If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised.

-- multiprocessing — Process-based parallelism

For example:

...
try:
	# get the task result with a timeout
	value = result.get(timeout=10)
except multiprocessing.TimeoutError as e:
	# ...
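
A timeout on get() does not cancel the task. It keeps running in the pool, and a later call to get() can still retrieve its result. The sketch below makes the timing deterministic by having a hypothetical blocked_task() wait on a threading.Event; the Event and the get_with_retry() helper are assumptions made for illustration and are not part of the AsyncResult API.

```python
from multiprocessing import TimeoutError
from multiprocessing.pool import ThreadPool
from threading import Event

def get_with_retry():
    release = Event()

    # hypothetical task that blocks until the main thread releases it
    def blocked_task():
        release.wait()
        return 'done'

    with ThreadPool(1) as pool:
        result = pool.apply_async(blocked_task)
        try:
            # the task is still blocked, so this times out
            result.get(timeout=0.1)
            timed_out = False
        except TimeoutError:
            timed_out = True
        # let the task finish, then get the result without a timeout
        release.set()
        value = result.get()
    return timed_out, value

if __name__ == '__main__':
    print(get_with_retry())  # (True, 'done')
```

A closure can be used as the task here because ThreadPool workers are threads in the same process.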

If an issued task raises an exception, the exception will be re-raised by get() once the issued tasks are completed.

We may need to handle this case explicitly if we expect a task to raise an exception on failure.

If the remote call raised an exception then that exception will be re-raised by get().

-- multiprocessing — Process-based parallelism

For example:

...
try:
	# get the task result that might raise an exception
	value = result.get()
except Exception as e:
	# ...
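
The same applies when many tasks share one AsyncResult. In the sketch below, which assumes a hypothetical parse() task and parse_all() helper, a single failing task issued via map_async() causes get() to re-raise that task's exception instead of returning a list.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task: raises ValueError for non-numeric text
def parse(text):
    return int(text)

def parse_all(items):
    with ThreadPool(2) as pool:
        result = pool.map_async(parse, items)
        try:
            # returns a list only if every task succeeded
            return result.get(), None
        except ValueError as e:
            # the exception raised in the worker thread is re-raised here
            return None, e

if __name__ == '__main__':
    print(parse_all(['1', '2', '3']))
    print(parse_all(['1', 'x', '3']))
```

Catching the specific exception type the task is expected to raise, rather than Exception, keeps unrelated failures visible.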

Wait For Completion

We can wait for all tasks to complete via the AsyncResult.wait() method.

This will block until all issued tasks are completed.

For example:

...
# wait for issued task to complete
result.wait()

If the tasks have already completed, then the wait() method will return immediately.

A "timeout" argument can be specified to set a limit in seconds for how long the caller is willing to wait.

Wait until the result is available or until timeout seconds pass.

-- multiprocessing — Process-based parallelism

If the timeout expires before the tasks are complete, the wait() function will return.

When using a timeout, the wait() function does not give an indication of whether it returned because the tasks completed or because the timeout elapsed. Therefore, we can check if the tasks completed via the ready() function.

For example:

...
# wait for issued task to complete with a timeout
result.wait(timeout=10)
# check if the tasks are all done
if result.ready():
	print('All Done')
	...
else:
	print('Not Done Yet')
	...

Check if Tasks Are Completed

We can check if the issued tasks are completed via the AsyncResult.ready() method.

Return whether the call has completed.

-- multiprocessing — Process-based parallelism

It returns True if the tasks have completed, successfully or otherwise, or False if the tasks are still running.

For example:

...
# check if tasks are still running
if result.ready():
	print('Tasks are done')
else:
	print('Tasks are not done')
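
Because ready() never blocks, it can be used to poll for completion while the caller does other work. A minimal sketch, assuming a hypothetical slow_square() task and poll() helper; the polling interval is an arbitrary choice.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task that takes a moment to finish
def slow_square(x):
    sleep(0.2)
    return x * x

def poll(result, interval=0.05):
    # count how many times we checked before the tasks were done
    polls = 0
    while not result.ready():
        polls += 1
        # other work could be done here instead of sleeping
        sleep(interval)
    return polls

if __name__ == '__main__':
    with ThreadPool(1) as pool:
        result = pool.apply_async(slow_square, args=(4,))
        polls = poll(result)
        print(f'Ready after {polls} polls, value={result.get()}')
```

If there is nothing else to do in the meantime, calling wait() or get() is simpler than polling.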

Check if Tasks Were Successful

We can check if the issued tasks completed successfully via the AsyncResult.successful() method.

Issued tasks are successful if no tasks raised an exception.

If at least one issued task raised an exception, then the call was not successful and the successful() function will return False.

This function should be called after it is known that the tasks have completed, e.g. ready() returns True.

For example:

...
# check if the tasks have completed
if result.ready():
	# check if the tasks were successful
	if result.successful():
		print('Successful')
	else:
		print('Unsuccessful')

If the issued tasks are still running, a ValueError is raised.

Return whether the call completed without raising an exception. Will raise ValueError if the result is not ready.

-- multiprocessing — Process-based parallelism

For example:

...
try:
	# check if the tasks were successful
	if result.successful():
		print('Successful')
except ValueError as e:
	print('Tasks still running')
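
The three possible outcomes of calling successful() can be folded into one helper. The sketch below assumes Python 3.7 or later, where a result that is not ready raises ValueError. The status() helper, the waits() and fails() tasks, and the threading.Event used to hold one task open are all hypothetical and exist only to make the demonstration deterministic.

```python
from multiprocessing.pool import ThreadPool
from threading import Event

def status(result):
    try:
        return 'successful' if result.successful() else 'failed'
    except ValueError:
        # raised when the result is not ready yet
        return 'running'

def demo():
    release = Event()

    # hypothetical task that blocks until released
    def waits():
        release.wait()
        return 1

    # hypothetical task that always fails
    def fails():
        raise RuntimeError('Something bad happened')

    # two workers so the failing task can run while the other is blocked
    with ThreadPool(2) as pool:
        pending = pool.apply_async(waits)
        broken = pool.apply_async(fails)
        broken.wait()
        states = [status(pending), status(broken)]
        release.set()
        pending.wait()
        states.append(status(pending))
    return states

if __name__ == '__main__':
    print(demo())  # ['running', 'failed', 'successful']
```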

Now that we know how to use the AsyncResult, let's look at some worked examples.

How to Get an AsyncResult

We can get an AsyncResult object via calling any of the apply_async(), map_async(), or starmap_async() functions to issue tasks to the ThreadPool.

Let's take a look at each example in turn.

Example of Getting an AsyncResult via apply_async()

We can explore how to get an AsyncResult object by issuing tasks using the apply_async() function.

In this example we will define a task function that takes an integer as an argument, generates a random number, reports the values, blocks for a fraction of a second, then returns the generated value.

Firstly, we can define the task function that takes an argument and returns the generated value.

The task() function below implements this.

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

Next, in the main thread, we can create the ThreadPool. We will use the context manager interface to ensure the pool is terminated automatically once we are finished with it, and use the default configuration.

...
# create and configure the thread pool
with ThreadPool() as pool:
	# ...

We can then issue a task using the apply_async() function. We can specify the task() function and an argument of 0 via the "args" argument. This will return an AsyncResult object immediately.

...
# issue tasks to the thread pool
result = pool.apply_async(task, args=(0,))

Finally, we can close the ThreadPool to prevent any further tasks being issued to the ThreadPool, then join the pool to wait for all issued tasks to complete.

...
# close the thread pool
pool.close()
# wait for all tasks to complete
pool.join()

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting an AsyncResult via apply_async()
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks to the thread pool
        result = pool.apply_async(task, args=(0,))
        # close the thread pool
        pool.close()
        # wait for all tasks to complete
        pool.join()

Running the example first creates the ThreadPool with the default configuration.

A task is issued which returns an AsyncResult object immediately.

The main thread then closes the ThreadPool and waits for all issued tasks to complete.

The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.

The issued task completes, and the main thread continues on and terminates the program.

Task 0 executing with 0.2341804974133077

Example of Getting an AsyncResult via map_async()

We can explore how to get an AsyncResult object by issuing tasks using the map_async() function.

In this example, we can update the previous example to issue multiple calls to the task() function with different arguments from 0 to 9.

This can be achieved by calling the map_async() and specifying the task() function and a range of arguments.

...
# issue tasks to the thread pool
result = pool.map_async(task, range(10))

This call will return immediately with an AsyncResult object.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting an AsyncResult via map_async()
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks to the thread pool
        result = pool.map_async(task, range(10))
        # close the thread pool
        pool.close()
        # wait for all tasks to complete
        pool.join()

Running the example first creates the ThreadPool with the default configuration.

A total of 10 tasks are issued to the ThreadPool and a single AsyncResult object is returned immediately.

The main thread then closes the ThreadPool and waits for all issued tasks to complete.

The issued tasks are executed by the ThreadPool. Each task generates a random number, reports a message, blocks for a moment, then returns the generated value.

All issued tasks complete and the main thread continues on and terminates the program.

Task 0 executing with 0.9725518300936189
Task 1 executing with 0.37769018395586973
Task 2 executing with 0.14834504164539097
Task 3 executing with 0.6345103966886227
Task 4 executing with 0.9689599646132412
Task 5 executing with 0.2274484073261115
Task 6 executing with 0.31175106188105683
Task 7 executing with 0.2973257640104593
Task 8 executing with 0.38643518304135394
Task 9 executing with 0.08353457705129097

Example of Getting an AsyncResult via starmap_async()

We can explore how to get an AsyncResult object by issuing tasks using the starmap_async() function.

In this example, we can update the previous example so that the task() function takes two arguments and that we issue multiple calls to the task() function with two arguments per function call.

This can be achieved by calling the starmap_async().

First, we must update the task() function to take two arguments. In this case it will take the integer identifier as before, plus a floating point value between 0 and 1 that is passed in rather than generated inside the task.

The updated task() function is listed below.

# task executed in a worker thread
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

Next, we can prepare arguments for each call to the task() function.

In this case, we will use a list comprehension to prepare a list of arguments, where each item in the list is a tuple of arguments for a call to the task() function.

...
# prepare arguments
items = [(i, random()) for i in range(10)]

We can then issue tasks to the ThreadPool by specifying the task() function and the list of arguments for each task.

...
# issue tasks to the thread pool
result = pool.starmap_async(task, items)

This call will return immediately with an AsyncResult object.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting an AsyncResult via starmap_async()
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the thread pool
        result = pool.starmap_async(task, items)
        # close the thread pool
        pool.close()
        # wait for all tasks to complete
        pool.join()

Running the example first creates the ThreadPool with the default configuration.

A total of 10 tasks are issued to the ThreadPool and a single AsyncResult object is returned immediately.

The main thread then closes the ThreadPool and waits for all issued tasks to complete.

The issued tasks are executed by the ThreadPool. Each task reports a message, blocks for a moment, then returns the floating point value.

All issued tasks complete and the main thread continues on and terminates the program.

Task 0 executing with 0.6483495161346381
Task 1 executing with 0.06752928130815483
Task 2 executing with 0.24228992400799954
Task 3 executing with 0.5838559409210107
Task 4 executing with 0.06536703787588205
Task 5 executing with 0.19152849834963948
Task 6 executing with 0.5499248833016134
Task 7 executing with 0.5353222309212269
Task 8 executing with 0.8891503883724293
Task 9 executing with 0.6692238800285062

Now that we know how to get an AsyncResult, let's look at how we might get results from completed tasks.

How to Get a Result From an AsyncResult

In this section we will explore how to get a result for issued tasks via an AsyncResult.

In these examples, we will use the apply_async() function to issue one task as a pretext for exploring how to get a result. Nevertheless, the examples are general and just as applicable to getting results from an AsyncResult created via map_async() and starmap_async().

Example of Getting an AsyncResult Result

We can explore how to get the results of issued tasks from an AsyncResult.

This can be achieved by updating the previous example that gets an AsyncResult by calling apply_async(), then calling the get() function on an AsyncResult to get the result of the issued task.

For example:

...
# wait for and get the result from the task
value = result.get()

The call will block until the task is complete, then return the return value from the issued task, in this case from the task() function.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting a result from an AsyncResult
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks to the thread pool
        result = pool.apply_async(task, args=(0,))
        # wait for and get the result from the task
        value = result.get()
        # report the result
        print(f'Got: {value}')

Running the example first creates the ThreadPool with the default configuration.

A task is issued which returns an AsyncResult object immediately.

The main thread then attempts to get the result from the issued task. It blocks until the task is completed.

The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.

The main thread receives the result from the completed task and reports the value with a message.

Task 0 executing with 0.4426396464648831
Got: 0.4426396464648831

Example of Getting an AsyncResult Result With a Timeout

We can explore getting a result from an asynchronous task with a timeout.

This can be achieved by updating the previous example and calling the get() function and specifying a "timeout" argument in seconds.

We can use a timeout of half a second. Because the task sleeps for a random interval between 0 and 1 seconds, about half the time we will get a result in time, and about half the time the timeout will elapse before we get a result.

...
# wait for and get the result from the task with a timeout
value = result.get(timeout=0.5)
# report the result
print(f'Got: {value}')

If the timeout elapses before we get a result, then an exception is raised.

We will handle this exception and report a failure message.

...
try:
    # wait for and get the result from the task with a timeout
    value = result.get(timeout=0.5)
    # report the result
    print(f'Got: {value}')
except TimeoutError:
    print(f'Failed to get the result')

If the exception is raised the task may not complete because the ThreadPool will be terminated by the context manager.

Therefore we can close the pool and wait for all issued tasks to complete.

...
# close the thread pool
pool.close()
# wait for all tasks to complete
pool.join()

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting a result with a timeout from an AsyncResult
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool
from multiprocessing import TimeoutError

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks to the thread pool
        result = pool.apply_async(task, args=(0,))
        try:
            # wait for and get the result from the task with a timeout
            value = result.get(timeout=0.5)
            # report the result
            print(f'Got: {value}')
        except TimeoutError:
            print(f'Failed to get the result')
        # close the thread pool
        pool.close()
        # wait for all tasks to complete
        pool.join()

Running the example first creates the ThreadPool with the default configuration.

A task is issued which returns an AsyncResult object immediately.

The main thread then attempts to get the result from the issued task with a timeout. It blocks until the task is completed or the timeout elapses.

The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.

In this case, the main thread receives the result from the completed task and reports the value with a message.

Task 0 executing with 0.4132529369603417
Got: 0.4132529369603417

We can run the example a few times. Sometimes the main thread will get a result and sometimes it will not.

Rerunning the example, we may get the failure case.

In this case, the timeout elapses before the task is completed.

A TimeoutError is raised by the call to get() and caught by the main thread. A failure message is reported.

The ThreadPool is then closed and the main thread blocks until all issued tasks have completed.

Task 0 executing with 0.6841087458378395
Failed to get the result

Example of Getting an AsyncResult Result With Exception

We can explore getting a result from issued tasks where at least one task fails with an unhandled exception.

This can be achieved by updating the previous example so that the task function raises an exception. We can then attempt to get the result from the issued task in the main thread and handle the exception that will be re-raised by the ThreadPool.

Firstly, we can update the task() function so that it raises an exception.

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # fail
    raise Exception('Something bad happened')
    # return the generated value
    return value

We can then attempt to get the result from the issued task via the get() function on the AsyncResult.

The ThreadPool will re-raise the exception when we attempt to get the result, therefore we will handle the exception when getting the result.

...
try:
    # wait for and get the result from the task
    value = result.get()
    # report the result
    print(f'Got: {value}')
except Exception as e:
    print(f'Failed to get the result: {e}')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of getting a result from an AsyncResult that raises an exception
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # fail
    raise Exception('Something bad happened')
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks to the thread pool
        result = pool.apply_async(task, args=(0,))
        try:
            # wait for and get the result from the task
            value = result.get()
            # report the result
            print(f'Got: {value}')
        except Exception as e:
            print(f'Failed to get the result: {e}')

Running the example first creates the ThreadPool with the default configuration.

A task is issued which returns an AsyncResult object immediately.

The main thread then attempts to get the result from the issued task. It blocks until the task is completed.

The issued task is executed, generating a random number, reporting a message, blocking, then raising an exception.

The task is completed, but unsuccessfully.

The main thread handles the re-raised exception from the completed task and reports the failure with a message.

Task 0 executing with 0.5372208163234266
Failed to get the result: Something bad happened

Next, let's explore how we might wait for issued tasks to complete.

How to Wait For Tasks to Complete With AsyncResult

We can wait for all tasks associated with an AsyncResult object to complete.

This can be achieved using the wait() method, which may also take a timeout in seconds.

Let's explore each case of waiting for tasks.

Example of Waiting for AsyncResult Results

We can explore how to wait for all issued tasks to complete.

This can be achieved by updating the previous example to call the wait() function instead of getting the result.

The wait() function will only return after all issued tasks associated with the AsyncResult object have completed.

Recall, a task is completed if it finishes normally or raises an exception.

For example:

...
# wait for tasks to complete
result.wait()
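
Unlike get(), wait() does not re-raise an exception from a failed task. It simply returns once the task is complete. A minimal sketch, assuming a hypothetical fails() task and wait_on_failure() helper:

```python
from multiprocessing.pool import ThreadPool

# hypothetical task that always fails
def fails():
    raise RuntimeError('Something bad happened')

def wait_on_failure():
    with ThreadPool(1) as pool:
        result = pool.apply_async(fails)
        # returns normally even though the task raised an exception
        result.wait()
        # the task is complete, but it was not successful
        return result.ready(), result.successful()

if __name__ == '__main__':
    print(wait_on_failure())  # (True, False)
```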

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of waiting for a result from an AsyncResult
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks to the thread pool
        result = pool.apply_async(task, args=(0,))
        # wait for tasks to complete
        result.wait()

Running the example first creates the ThreadPool with the default configuration.

A task is issued which returns an AsyncResult object immediately.

The main thread then waits for the issued task to complete, blocking until the wait() function returns.

The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.

The main thread unblocks and continues on, terminating the application.

Task 0 executing with 0.49201873498348836

Example of Waiting for AsyncResult Results with a Timeout

We can explore waiting for all issued tasks to complete with a timeout.

This can be achieved by updating the previous example of waiting for all tasks associated with an AsyncResult to complete, then specifying a "timeout" argument.

We will use a timeout of half a second. Because the task sleeps for a random interval between 0 and 1 seconds, about half the time the issued task will complete and the wait() function will return before the timeout, and the other half of the time the timeout will elapse first and the wait() function will return anyway.

...
# wait for tasks to complete
result.wait(timeout=0.5)

Note, the wait() function does not return a value. This means that it does not give an indication of whether the call returned because the associated tasks completed or because the timeout elapsed. Therefore the ready() function should be called to check if the tasks are completed.

Finally, because the wait() function may have returned because the timeout elapsed, we can wait for all issued tasks in the pool to complete by joining the pool.

...
# close the thread pool
pool.close()
# wait for all tasks to complete
pool.join()

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of waiting for a result from an AsyncResult with a timeout
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks to the thread pool
        result = pool.apply_async(task, args=(0,))
        # wait for tasks to complete
        result.wait(timeout=0.5)
        # close the thread pool
        pool.close()
        # wait for all tasks to complete
        pool.join()

Running the example first creates the ThreadPool with the default configuration.

A task is issued which returns an AsyncResult object immediately.

The main thread then waits for the issued task to complete with a timeout, blocking until the wait() function returns or the timeout elapses.

The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.

The main thread unblocks after the task completes or the timeout elapses.

The main thread then closes the ThreadPool and waits for all issued tasks to complete.

Task 0 executing with 0.8160834744441859
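
The output above does not reveal whether wait() returned because the task finished or because the timeout elapsed. A sketch of pairing wait() with ready() to tell the two cases apart is shown below. The wait_and_report() helper is hypothetical, and a threading.Event is used in place of the random sleep (an assumption for illustration) so that both cases occur in a predictable order.

```python
from multiprocessing.pool import ThreadPool
from threading import Event

def wait_and_report(result, timeout):
    # wait() returns None whether or not the timeout elapsed
    result.wait(timeout=timeout)
    # ready() tells us which case occurred
    return 'done' if result.ready() else 'timed out'

def demo():
    release = Event()
    with ThreadPool(1) as pool:
        # the task blocks until the event is set
        result = pool.apply_async(release.wait)
        first = wait_and_report(result, timeout=0.1)
        # allow the task to finish, then wait again
        release.set()
        second = wait_and_report(result, timeout=5)
        # close the pool and wait for all tasks to complete
        pool.close()
        pool.join()
    return first, second

if __name__ == '__main__':
    print(demo())  # ('timed out', 'done')
```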

Next, let's explore how to check the status of tasks issued by the ThreadPool.

How to Check the Status of Tasks via AsyncResult

We can check the status of all tasks issued to a ThreadPool associated with an AsyncResult.

This can be achieved via the ready() function and the successful() function.

Let's take examples of each in turn.

Example of Checking if AsyncResult Results are Ready

We can explore checking whether issued tasks have completed via the ready() function.

Recall that a completed task may or may not have completed successfully (e.g. it may have raised an exception).

We can update the previous example to sleep for a fraction of a second after issuing the task, then check if the task has completed. We can then report a message if the task is completed and a different message if the task is not completed.

...
# wait a moment
sleep(0.5)
# check if the tasks are complete
if result.ready():
    print('Tasks are complete')
else:
    print('Tasks are not complete')

Because the task itself will block for a random fraction of a second we will get a different result each time the program is run.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of checking if tasks are finished via an AsyncResult
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks to the thread pool
        result = pool.apply_async(task, args=(0,))
        # wait a moment
        sleep(0.5)
        # check if the tasks are complete
        if result.ready():
            print('Tasks are complete')
        else:
            print('Tasks are not complete')
        # close the thread pool
        pool.close()
        # wait for all tasks to complete
        pool.join()

Running the example first creates the ThreadPool with the default configuration.

A task is issued which returns an AsyncResult object immediately.

The main thread then blocks for a fraction of a second.

The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.

The main thread then checks if the issued task is completed or not.

In this case, the task was completed by the time the check was performed.

Task 0 executing with 0.4071953838768907
Tasks are complete

Note, you will get different results each time the program is run given the use of random numbers.

We can explore a failure case.

Running the program again, the main thread checks if the issued tasks have completed.

In this case the task has not been completed. A different message is then reported.

The ThreadPool is then closed and the main thread blocks until all issued tasks have completed.

Task 0 executing with 0.9709175072663047
Tasks are not complete

Example of Completed Successfully

We can explore whether issued tasks were completed successfully via the successful() function.

We can update the previous example so that we first wait for the issued task to complete by calling the wait() function.

...
# wait for tasks to complete
result.wait()

We can then call the successful() function and report a message if the issued task was completed successfully, which we expect it was in this case.

...
# check if the tasks were successful
if result.successful():
    print('Tasks were successful')
else:
    print('Tasks were not successful')

The complete example is listed below.

# SuperFastPython.com
# example of checking if tasks were successful via an AsyncResult
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks to the thread pool
        result = pool.apply_async(task, args=(0,))
        # wait for tasks to complete
        result.wait()
        # check if the tasks were successful
        if result.successful():
            print('Tasks were successful')
        else:
            print('Tasks were not successful')

Running the example first creates the ThreadPool with the default configuration.

A task is issued which returns an AsyncResult object immediately.

The main thread then waits for the issued task to complete, blocking until the wait() function returns.

The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.

The main thread unblocks and continues on. It checks the success of the tasks associated with the AsyncResult object.

The task was completed successfully and an appropriate message was reported.

Task 0 executing with 0.39553548636223324
Tasks were successful
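
An AsyncResult returned by map_async() covers many tasks at once, and in that case successful() reports on the batch as a whole. The sketch below illustrates this under some assumptions: half(), reciprocal(), and all_successful() are hypothetical names introduced here, and the input values are arbitrary.

```python
# sketch: successful() on an AsyncResult that covers several tasks
from multiprocessing.pool import ThreadPool

# hypothetical task that always succeeds
def half(value):
    return value / 2

# hypothetical task that raises ZeroDivisionError when value is 0
def reciprocal(value):
    return 1 / value

# hypothetical helper: issue one task per item and report overall success
def all_successful(function, items):
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue all tasks, returning a single AsyncResult
        result = pool.map_async(function, items)
        # wait for every task to complete
        result.wait()
        # True only if none of the tasks raised an exception
        return result.successful()

# protect the entry point
if __name__ == '__main__':
    print(all_successful(half, [1, 2, 3]))
    print(all_successful(reciprocal, [1, 0, 2]))
```

The first call is expected to report True, and the second False, because one of the three tasks divides by zero.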

Example of Completed Unsuccessfully

We can explore using the successful() function to check the status of a completed task that we know did not complete successfully.

This can be achieved by updating the previous example so that the task() function raises an exception.

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # fail, the task never returns a value
    raise Exception('Something bad happened')

We expect that checking the success of the issued task will now report a different message.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of checking if tasks were successful via an AsyncResult when they are not
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # fail, the task never returns a value
    raise Exception('Something bad happened')

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks to the thread pool
        result = pool.apply_async(task, args=(0,))
        # wait for tasks to complete
        result.wait()
        # check if the tasks were successful
        if result.successful():
            print('Tasks were successful')
        else:
            print('Tasks were not successful')

Running the example first creates the ThreadPool with the default configuration.

A task is issued which returns an AsyncResult object immediately.

The main thread then waits for the issued task to complete, blocking until the wait() function returns.

The issued task is executed, generating a random number, reporting a message, blocking, then raising an exception.

The main thread unblocks and continues on. It checks the success of the tasks associated with the AsyncResult object.

The task was not completed successfully this time and an appropriate message was reported.

Task 0 executing with 0.885207853231017
Tasks were not successful
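
The successful() function only tells us that the task failed, not why. As a minimal sketch, we can call get() on the same AsyncResult, which re-raises the exception from the task in the caller where it can be handled. The failing_task() and run_and_report() names are hypothetical and used only for illustration.

```python
# sketch: retrieve the exception from a task that did not complete successfully
from multiprocessing.pool import ThreadPool

# hypothetical task that always fails
def failing_task(identifier):
    raise Exception('Something bad happened')

# hypothetical helper that issues the task and describes the outcome
def run_and_report():
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue the task to the thread pool
        result = pool.apply_async(failing_task, args=(0,))
        # wait for the task to complete
        result.wait()
        # on success, get() returns the value from the task
        if result.successful():
            return f'Task returned: {result.get()}'
        # on failure, get() re-raises the exception raised in the task
        try:
            result.get()
        except Exception as e:
            return f'Task failed with: {e}'

# protect the entry point
if __name__ == '__main__':
    print(run_and_report())
```

Running the sketch is expected to report the message carried by the exception that was raised in the worker thread.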

Example of Tasks Not Completed

We can explore checking the status of a task that is not yet complete via the successful() function.

The expectation is that the function will raise a ValueError, as the task is not yet completed.

This can be achieved by updating the example in the previous section to check the successful status of the task immediately after it is issued.

...
# issue tasks to the thread pool
result = pool.apply_async(task, args=(0,))
# check if the tasks were successful
if result.successful():
    print('Tasks were successful')
else:
    print('Tasks were not successful')

This will raise an exception as the task is not completed.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of checking if tasks were successful via an AsyncResult when they are not complete
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}')
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool() as pool:
        # issue tasks to the thread pool
        result = pool.apply_async(task, args=(0,))
        # check if the tasks were successful
        if result.successful():
            print('Tasks were successful')
        else:
            print('Tasks were not successful')

Running the example first creates the ThreadPool with the default configuration.

A task is issued which returns an AsyncResult object immediately.

The main thread then checks the success of the tasks associated with the AsyncResult object.

The task was not yet complete and an exception was raised.

Traceback (most recent call last):
  ...
ValueError: <multiprocessing.pool.ApplyResult object at 0x1013b9b20> not ready
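
One way to avoid this exception is to call ready() first and only call successful() once the task is done. The following is a sketch of that approach, not the only option (catching the ValueError would also work). The slow_task(), describe(), and check_immediately_then_after_wait() names are hypothetical, and the fixed 0.5 second sleep is an arbitrary assumption that keeps the outcome repeatable.

```python
# sketch: guard successful() with ready() so it is never called too early
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task that blocks for a fixed duration
def slow_task(duration):
    sleep(duration)
    return duration

# hypothetical helper that summarizes the state of an AsyncResult
def describe(result):
    # successful() raises an exception if the task is not done, so check first
    if not result.ready():
        return 'not complete'
    return 'successful' if result.successful() else 'not successful'

# hypothetical helper that checks the status before and after waiting
def check_immediately_then_after_wait():
    # create a thread pool with a single worker
    with ThreadPool(1) as pool:
        # issue a task that takes half a second
        result = pool.apply_async(slow_task, args=(0.5,))
        # status straight after issuing the task
        first = describe(result)
        # wait for the task to complete
        result.wait()
        # status once the task is done
        second = describe(result)
    return first, second

# protect the entry point
if __name__ == '__main__':
    first, second = check_immediately_then_after_wait()
    print(f'Tasks are {first}')
    print(f'Tasks are {second}')
```

The first status is expected to be "not complete" because the task is still sleeping, and no exception is raised because successful() is skipped until ready() returns True.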

Takeaways

You now know how to use an AsyncResult object associated with tasks issued to the ThreadPool.