Last Updated on October 29, 2022
You can issue asynchronous tasks to the ThreadPool which will return an AsyncResult object.
The AsyncResult provides a handle or issued tasks in the ThreadPool and can be used to check on the status of the tasks and to get task results.
In this tutorial you will discover how to use the AsyncResult in Python.
Let’s get started.
What is the AsyncResult
An AsyncResult object is returned when issuing tasks to ThreadPool asynchronously.
This can be achieved via any of the following methods on the ThreadPool:
- apply_async() to issue one task.
- map_async() to issue multiple tasks.
- starmap_async() to issue multiple tasks that take multiple arguments.
A AsyncResult provides a handle on one or more issued tasks.
It allows the caller to check on the status of the issued tasks, to wait for the tasks to complete, and to get the results once tasks are completed.
Run loops using all CPUs, download your FREE book to learn how.
How to Use the AsyncResult
The AsyncResult class is straightforward to use.
First, you must get an AsyncResult object by issuing one or more tasks to the ThreadPool any of the apply_async(), map_async(), or starmap_async() functions.
For example:
1 2 3 |
... # issue a task to the thread pool result = pool.apply_async(...) |
Once you have an AsyncResult object, you can use it to query the status and get results from the task.
Get a Result
We can get the result of an issued task by calling the AsyncResult.get() function.
Return the result when it arrives.
— multiprocessing — Process-based parallelism
This will return the result of the specific function called to issue the task.
- apply_async(): Returns the return value of the target function.
- map_async(): Returns an iterable over the return values of the target function.
- starmap_async(): Returns an iterable over the return values of the target function.
For example:
1 2 3 |
... # get the result of the task or tasks value = result.get() |
If the issued tasks have not yet completed, then get() will block until the tasks are finished.
A “timeout” argument can be specified. If the tasks are still running and do not complete within the specified number of seconds, a multiprocessing.TimeoutError is raised.
If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised.
— multiprocessing — Process-based parallelism
For example:
1 2 3 4 5 6 |
... try: # get the task result with a timeout value = result.get(timeout=10) except multiprocessing.TimeoutError as e: # ... |
If an issued task raises an exception, the exception will be re-raised once the issued tasks are completed.
We may need to handle this case explicitly if we expect a task to raise an exception on failure.
If the remote call raised an exception then that exception will be re-raised by get().
— multiprocessing — Process-based parallelism
For example:
1 2 3 4 5 6 |
... try: # get the task result that might raise an exception value = result.get() except Exception as e: # ... |
Wait For Completion
We can wait for all tasks to complete via the AsyncResult.wait() method.
This will block until all issued tasks are completed.
For example:
1 2 3 |
... # wait for issued task to complete result.wait() |
If the tasks have already completed, then the wait() method will return immediately.
A “timeout” argument can be specified to set a limit in seconds for how long the caller is willing to wait.
Wait until the result is available or until timeout seconds pass.
— multiprocessing — Process-based parallelism
If the timeout expires before the tasks are complete, the wait() function will return.
When using a timeout, the wait() function does not give an indication that it returned because tasks completed or because the timeout elapsed. Therefore, we can check if the tasks completed via the ready() function.
For example:
1 2 3 4 5 6 7 8 9 10 |
... # wait for issued task to complete with a timeout result.wait(timeout=10) # check if the tasks are all done if result.ready() print('All Done') ... else : print('Not Done Yet') ... |
Check if Tasks Are Completed
We can check if the issued tasks are completed via the AsyncResult.ready() method.
Return whether the call has completed.
— multiprocessing — Process-based parallelism
It returns True if the tasks have completed, successfully or otherwise, or False if the tasks are still running.
For example:
1 2 3 4 5 6 |
... # check if tasks are still running if result.ready(): print('Tasks are done') else: print('Tasks are not done') |
Check if Tasks Were Successful
We can check if the issued tasks completed successfully via the AsyncResult.successful() method.
Issued tasks are successful if no tasks raised an exception.
If at least one issued task raised an exception, then the call was not successful and the successful() function will return False.
This function should be called after it is known that the tasks have completed, e.g. ready() returns True.
For example:
1 2 3 4 5 6 7 8 |
... # check if the tasks have completed if result.ready(): # check if the tasks were successful if result.successful(): print('Successful') else: print('Unsuccessful') |
If the issued tasks are still running, a ValueError is raised.
Return whether the call completed without raising an exception. Will raise ValueError if the result is not ready.
— multiprocessing — Process-based parallelism
For example:
1 2 3 4 5 6 7 |
... try: # check if the tasks were successful if result.successful(): print('Successful') except ValueError as e: print('Tasks still running') |
Now that we know how to use the AsyncResult, let’s look at some worked examples.
How to Get a AsyncResult
We can get an AsyncResult object via calling any of the apply_async(), map_async(), or starmap_async() functions to issue tasks to the ThreadPool.
Let’s take a look at each example in turn.
Example of Getting a AsyncResult via apply_async()
We can explore how to get an AsyncResult object by issuing tasks using the apply_async() function.
In this example we will define a task function that takes an integer as an argument, generates a random number, reports the values, blocks for a fraction of a second, then returns the generated value.
Firstly, we can define the task function that takes an argument and returns the generated value.
The task() function below implements this.
1 2 3 4 5 6 7 8 9 10 |
# task executed in a worker thread def task(identifier): # generate a value value = random() # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # return the generated value return value |
Next, in the main thread we can issue a task using the apply_async() function. We can specify the task() function and an argument of 0 via the “args” argument.
This will return an AsyncResult object immediately.
In the main thread, we can create the ThreadPool. We will use the context manager interface to ensure the pool is terminated automatically once we are finished with it, and use the default configuration.
1 2 3 4 |
... # create and configure the thread pool with ThreadPool() as pool: # ... |
You can learn more about creating a ThreadPool using the context manager interface in the tutorial:
1 2 3 |
... # issue tasks to the thread pool result = pool.apply_async(task, args=(0,)) |
You can learn more about how to use the apply_async() function in the tutorial:
Finally, we can close the ThreadPool to prevent any further tasks being issued to the ThreadPool, then join the pool to wait for all issued tasks to complete.
1 2 3 4 5 |
... # close the thread pool pool.close() # wait for all tasks to complete pool.join() |
You can learn more about joining the ThreadPool in the tutorial:
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# SuperFastPython.com # example of getting an AsyncResult via apply_async() from random import random from time import sleep from multiprocessing.pool import ThreadPool # task executed in a worker thread def task(identifier): # generate a value value = random() # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # return the generated value return value # protect the entry point if __name__ == '__main__': # create and configure the thread pool with ThreadPool() as pool: # issue tasks to the thread pool result = pool.apply_async(task, args=(0,)) # close the thread pool pool.close() # wait for all tasks to complete pool.join() |
Running the example first creates the ThreadPool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main thread then closes the ThreadPool and waits for all issued tasks to complete.
The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.
The issued task completes, and the main thread continues on and terminates the program.
1 |
Task 0 executing with 0.2341804974133077 |
Example of Getting a AsyncResult via map_async()
We can explore how to get an AsyncResult object by issuing tasks using the map_async() function.
In this example, we can update the previous example to issue multiple calls to the task() function with different arguments from 0 to 9.
This can be achieved by calling the map_async() and specifying the task() function and a range of arguments.
1 2 3 |
... # issue tasks to the thread pool result = pool.map_async(task, range(10)) |
This call will return immediately with an AsyncResult object.
You can learn more about how to use the map_async() function in the tutorial:
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# SuperFastPython.com # example of getting an AsyncResult via map_async() from random import random from time import sleep from multiprocessing.pool import ThreadPool # task executed in a worker thread def task(identifier): # generate a value value = random() # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # return the generated value return value # protect the entry point if __name__ == '__main__': # create and configure the thread pool with ThreadPool() as pool: # issue tasks to the thread pool result = pool.map_async(task, range(10)) # close the thread pool pool.close() # wait for all tasks to complete pool.join() |
Running the example first creates the ThreadPool with the default configuration.
A total of 10 tasks are issued to the ThreadPool and a single AsyncResult object is returned immediately.
The main thread then closes the ThreadPool and waits for all issued tasks to complete.
The issued tasks are executed by the ThreadPool. Each task generates a random number, reports a message, blocks for a moment, then returns the generated value.
All issued tasks complete and the main thread continues on and terminates the program.
1 2 3 4 5 6 7 8 9 10 |
Task 0 executing with 0.9725518300936189 Task 1 executing with 0.37769018395586973 Task 2 executing with 0.14834504164539097 Task 3 executing with 0.6345103966886227 Task 4 executing with 0.9689599646132412 Task 5 executing with 0.2274484073261115 Task 6 executing with 0.31175106188105683 Task 7 executing with 0.2973257640104593 Task 8 executing with 0.38643518304135394 Task 9 executing with 0.08353457705129097 |
Example of Getting a AsyncResult via starmap_async()
We can explore how to get an AsyncResult object by issuing tasks using the starmap_async() function.
In this example, we can update the previous example so that the task() function takes two arguments and that we issue multiple calls to the task() function with two arguments per function call.
This can be achieved by calling the starmap_async().
First, we must update the task() function to take two arguments. In this case it will take the integer number as before, and generate a floating point value between 0 and 1, instead of generating it itself.
The updated task() function is listed below.
1 2 3 4 5 6 7 8 |
# task executed in a worker thread def task(identifier, value): # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # return the generated value return value |
Next, we can prepare arguments for each call to the task() function.
In this case, we will use a list comprehension to prepare a list of arguments, where each item in the list is a tuple of arguments for a call to the task() function.
1 2 3 |
... # prepare arguments items = [(i, random()) for i in range(10)] |
We can then issue tasks to the ThreadPool by specifying the task() function and the list of arguments for each task.
1 2 3 |
... # issue tasks to the thread pool result = pool.starmap_async(task, items) |
This call will return immediately with an AsyncResult object.
You can learn more about how to use the starmap_async() function in the tutorial:
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# SuperFastPython.com # example of getting an AsyncResult via starmap_async() from random import random from time import sleep from multiprocessing.pool import ThreadPool # task executed in a worker thread def task(identifier, value): # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # return the generated value return value # protect the entry point if __name__ == '__main__': # create and configure the thread pool with ThreadPool() as pool: # prepare arguments items = [(i, random()) for i in range(10)] # issue tasks to the thread pool result = pool.starmap_async(task, items) # close the thread pool pool.close() # wait for all tasks to complete pool.join() |
Running the example first creates the ThreadPool with the default configuration.
A total of 10 tasks are issued to the ThreadPool and a single AsyncResult object is returned immediately.
The main thread then closes the ThreadPool and waits for all issued tasks to complete.
The issued tasks are executed by the ThreadPool. Each task reports a message, blocks for a moment, then returns the floating point value.
All issued tasks complete and the main thread continues on and terminates the program.
1 2 3 4 5 6 7 8 9 10 |
Task 0 executing with 0.6483495161346381 Task 1 executing with 0.06752928130815483 Task 2 executing with 0.24228992400799954 Task 3 executing with 0.5838559409210107 Task 4 executing with 0.06536703787588205 Task 5 executing with 0.19152849834963948 Task 6 executing with 0.5499248833016134 Task 7 executing with 0.5353222309212269 Task 8 executing with 0.8891503883724293 Task 9 executing with 0.6692238800285062 |
Now that we know how to get a AsyncResult, let’s look at how we might get results from completed tasks
Free Python ThreadPool Course
Download your FREE ThreadPool PDF cheat sheet and get BONUS access to my free 7-day crash course on the ThreadPool API.
Discover how to use the ThreadPool including how to configure the number of worker threads and how to execute tasks asynchronously
How to Get a Result From a AsyncResult
In this section we will explore how to get a result for issued tasks via an AsyncResult.
In these examples, we will use the apply_async() function to issue one task as a pretext for exploring how to get a result. Nevertheless, the examples are general and just as applicable to getting results from an AsyncResult created via map_async() and starmap_async().
Example of Getting an AsyncResult Result
We can explore how to get the results of issued tasks from an AsyncResult.
This can be achieved by updating the previous example that gets an AsyncResult by calling apply_async(), then calling the get() function on an AsyncResult to get the result of the issued task.
For example:
1 2 3 |
... # wait for and get the result from the task value = result.get() |
The call will block until the task is complete, then returns the return value from the issued task, in this case from the task() function.
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# SuperFastPython.com # example of getting a result from a AsyncResult from random import random from time import sleep from multiprocessing.pool import ThreadPool # task executed in a worker thread def task(identifier): # generate a value value = random() # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # return the generated value return value # protect the entry point if __name__ == '__main__': # create and configure the thread pool with ThreadPool() as pool: # issue tasks to the thread pool result = pool.apply_async(task, args=(0,)) # wait for and get the result from the task value = result.get() # report the result print(f'Got: {value}') |
Running the example first creates the ThreadPool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main thread then attempts to get the result from the issued task. It blocks until the task is completed.
The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.
The main thread receives the result from the completed task and reports the value with a message.
1 2 |
Task 0 executing with 0.4426396464648831 Got: 0.4426396464648831 |
Example of Getting an AsyncResult Result With a Timeout
We can explore getting a result from an asynchronous task with a timeout.
This can be achieved by updating the previous example and calling the get() function and specifying a “timeout” argument in seconds.
We can use a timeout of half a second, meaning that half the time we will get a result on time, and half the time the timeout will elapse before we get a result.
1 2 3 4 5 |
... # wait for and get the result from the task with a timeout value = result.get(timeout=0.5) # report the result print(f'Got: {value}') |
If the timeout elapses before we get a result, then an exception is raised.
We will handle this exception and report a failure message.
1 2 3 4 5 6 7 8 |
... try: # wait for and get the result from the task with a timeout value = result.get(timeout=0.5) # report the result print(f'Got: {value}') except TimeoutError: print(f'Failed to get the result') |
If the exception is raised the task may not complete because the ThreadPool will be terminated by the context manager.
Therefore we can close the pool and wait for all issued tasks to complete.
1 2 3 4 5 |
... # close the thread pool pool.close() # wait for all tasks to complete pool.join() |
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# SuperFastPython.com # example of getting a result with a timeout from a AsyncResult from random import random from time import sleep from multiprocessing.pool import ThreadPool from multiprocessing import TimeoutError # task executed in a worker thread def task(identifier): # generate a value value = random() # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # return the generated value return value # protect the entry point if __name__ == '__main__': # create and configure the thread pool with ThreadPool() as pool: # issue tasks to the thread pool result = pool.apply_async(task, args=(0,)) try: # wait for and get the result from the task with a timeout value = result.get(timeout=0.5) # report the result print(f'Got: {value}') except TimeoutError: print(f'Failed to get the result') # close the thread pool pool.close() # wait for all tasks to complete pool.join() |
Running the example first creates the ThreadPool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main thread then attempts to get the result from the issued task with a timeout. It blocks until the task is completed or the timeout elapses.
The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.
In this case, the main thread receives the result from the completed task and reports the value with a message.
1 2 |
Task 0 executing with 0.4132529369603417 Got: 0.4132529369603417 |
We can run the example a few times, sometimes the main thread will get a result and sometimes it will not.
Rerunning the example again, we may get the failure case.
In this case, the timeout elapses before the task is completed.
An exception is raised by the ThreadPool and caught by the main thread. A failure message is reported.
The ThreadPool is then closed and the main thread blocks until all issued tasks have completed.
1 2 |
Task 0 executing with 0.6841087458378395 Failed to get the result |
Example of Getting an AsyncResult Result With Exception
We can explore getting a result from issued tasks where at least one task fails with an unhandled exception.
This can be achieved by updating the previous example so that the task function raises an exception. We can then attempt to get the result from the issued task in the main thread and handle the exception that will be re-raised by the ThreadPool.
Firstly, we can update the task() function so that it raises an exception.
1 2 3 4 5 6 7 8 9 10 11 12 |
# task executed in a worker thread def task(identifier): # generate a value value = random() # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # fail raise Exception('Something bad happened') # return the generated value return value |
We can then attempt to get the result from the issued task via the get() function on the AsyncResult.
The ThreadPool will re-raise the exception when we attempt to get the result, therefore we will handle the exception when getting the result.
1 2 3 4 5 6 7 8 |
... try: # wait for and get the result from the task value = result.get() # report the result print(f'Got: {value}') except Exception as e: print(f'Failed to get the result: {e}') |
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# SuperFastPython.com # example of getting a result from a AsyncResult that raises an exception from random import random from time import sleep from multiprocessing.pool import ThreadPool # task executed in a worker thread def task(identifier): # generate a value value = random() # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # fail raise Exception('Something bad happened') # return the generated value return value # protect the entry point if __name__ == '__main__': # create and configure the thread pool with ThreadPool() as pool: # issue tasks to the thread pool result = pool.apply_async(task, args=(0,)) try: # wait for and get the result from the task value = result.get() # report the result print(f'Got: {value}') except Exception as e: print(f'Failed to get the result: {e}') |
Running the example first creates the ThreadPool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main thread then attempts to get the result from the issued task. It blocks until the task is completed.
The issued task is executed, generating a random number, reporting a message, blocking, then raises an exception.
The task is completed, but unsuccessfully.
The main thread handles the re-raised exception from the completed task and reports the failure with a message.
1 2 |
Task 0 executing with 0.5372208163234266 Failed to get the result: Something bad happened |
Next, let’s explore how we might wait for issued tasks to complete.
Overwhelmed by the python concurrency APIs?
Find relief, download my FREE Python Concurrency Mind Maps
How to Wait For Tasks to Complete With AsyncResult
We can wait for all tasks associated with an AsyncResult object to complete.
This can be achieved using the wait() method, which may also take a timeout in seconds.
Let’s explore each case of waiting for tasks.
Example of Waiting for a AsyncResult Results
We can explore how to wait for all issued tasks to complete.
This can be achieved by updating the previous example to call the wait() function instead of getting the result.
The wait() function will only return after all issued tasks associated with the AsyncResult object have completed.
Recall, a task is completed if it finishes normally or raises an exception.
For example:
1 2 3 |
... # wait for tasks to complete result.wait() |
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# SuperFastPython.com # example of waiting for a result from a AsyncResult from random import random from time import sleep from multiprocessing.pool import ThreadPool # task executed in a worker thread def task(identifier): # generate a value value = random() # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # return the generated value return value # protect the entry point if __name__ == '__main__': # create and configure the thread pool with ThreadPool() as pool: # issue tasks to the thread pool result = pool.apply_async(task, args=(0,)) # wait for tasks to complete result.wait() |
Running the example first creates the ThreadPool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main thread then waits for the issued task to complete, blocking until the wait() function returns.
The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.
The main thread unblocks and continues on, terminating the application.
1 |
Task 0 executing with 0.49201873498348836 |
Example of Waiting for AsyncResult Results with a Timeout
We can explore waiting for all issued tasks to complete with a timeout.
This can be achieved by updating the previous example of waiting for all tasks associated with an AsyncResult to complete, then specifying a “timeout” argument.
We will use a timeout of half a second, meaning that half the time the issued task will complete and the wait() function will return normally and the other half of the time the timeout will elapse and the wait() function will return.
1 2 3 |
... # wait for tasks to complete result.wait(timeout=0.5) |
Note, the wait() function does not return a value. This means that it does not give an indication of whether the call returned because the associated tasks completed or because the timeout elapses. Therefore the ready() function should be called to check if the tasks are completed.
Finally, because the wait() function may have returned because the timeout elapsed, we can wait for all issued tasks in the pool to complete by joining the pool.
1 2 3 4 5 |
... # close the thread pool pool.close() # wait for all tasks to complete pool.join() |
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# SuperFastPython.com # example of waiting for a result from a AsyncResult with a timeout from random import random from time import sleep from multiprocessing.pool import ThreadPool # task executed in a worker thread def task(identifier): # generate a value value = random() # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # return the generated value return value # protect the entry point if __name__ == '__main__': # create and configure the thread pool with ThreadPool() as pool: # issue tasks to the thread pool result = pool.apply_async(task, args=(0,)) # wait for tasks to complete result.wait(timeout=0.5) # close the thread pool pool.close() # wait for all tasks to complete pool.join() |
Running the example first creates the ThreadPool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main thread then waits for the issued task to complete with a timeout, blocking until the wait() function returns or the timeout elapses.
The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.
The main thread unblocks after the task completes or the timeout.
The main thread then closes the ThreadPool and waits for all issued tasks to complete.
1 |
Task 0 executing with 0.8160834744441859 |
Next, let’s explore how to check the status of tasks issued by the ThreadPool.
How to Check the Status of Tasks via AsyncResult
We can check the status of all tasks issued to a ThreadPool associated with an AsyncResult.
This can be achieved via the ready() function and the successful() function.
Let’s take examples of each in turn.
Example of Checking if AsyncResult Results are Ready
We can explore checking whether issued tasks have completed via the ready() function.
Recall that a completed task may or may not have completed successfully (e.g. raised an exception).
We can update the previous example to sleep for a fraction of a second after issuing the task, then checking if the tasks were completed. We can then report a message if the task is completed and a different message if the task is not completed.
1 2 3 4 5 6 7 8 |
... # wait a moment sleep(0.5) # check if the tasks are complete if result.ready(): print('Tasks are complete') else: print('Tasks are not complete') |
Because the task itself will block for a random fraction of a second we will get a different result each time the program is run.
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# SuperFastPython.com # example of checking if tasks are finished via a AsyncResult from random import random from time import sleep from multiprocessing.pool import ThreadPool # task executed in a worker thread def task(identifier): # generate a value value = random() # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # return the generated value return value # protect the entry point if __name__ == '__main__': # create and configure the thread pool with ThreadPool() as pool: # issue tasks to the thread pool result = pool.apply_async(task, args=(0,)) # wait a moment sleep(0.5) # check if the tasks are complete if result.ready(): print('Tasks are complete') else: print('Tasks are not complete') # close the thread pool pool.close() # wait for all tasks to complete pool.join() |
Running the example first creates the ThreadPool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main thread then blocks for a fraction of a second.
The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.
It then checks if the issued task is completed or not.
In this case, the task was completed by the time the check was performed.
1 2 |
Task 0 executing with 0.4071953838768907 Tasks are complete |
Note, you will get different results each time the program is run given the use of random numbers.
We can explore a failure case.
Running the program again, the main thread checks if the issued tasks have completed.
In this case the task has not been completed. A different message is then reported.
The ThreadPool is then closed and the main thread blocks until all issued tasks have completed.
1 2 |
Task 0 executing with 0.9709175072663047 Tasks are not complete |
Example of Completed Successfully
We can explore whether issued tasks were completed successfully via the successful() function.
We can update the previous example so that we first wait for the issued task to complete by calling the wait() function.
1 2 3 |
... # wait for tasks to complete result.wait() |
We can then call the successful() function and report a message if the issued task was completed successfully, which we expect it was in this case.
1 2 3 4 5 6 |
... # check if the tasks were successful if result.successful(): print('Tasks were successful') else: print('Tasks were not successful') |
The complete example is listed below
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# SuperFastPython.com # example of checking if tasks were successful via a AsyncResult from random import random from time import sleep from multiprocessing.pool import ThreadPool # task executed in a worker thread def task(identifier): # generate a value value = random() # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # return the generated value return value # protect the entry point if __name__ == '__main__': # create and configure the thread pool with ThreadPool() as pool: # issue tasks to the thread pool result = pool.apply_async(task, args=(0,)) # wait for tasks to complete result.wait() # check if the tasks were successful if result.successful(): print('Tasks were successful') else: print('Tasks were not successful') |
Running the example first creates the ThreadPool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main thread then waits for the issued task to complete, blocking until the wait() function returns.
The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.
The main thread unblocks and continues on. It checks the success of the tasks associated with the AsyncResult object.
The task was completed successfully and an appropriate message was reported.
1 2 |
Task 0 executing with 0.39553548636223324 Tasks were successful |
Example of Completed Unsuccessfully
We can explore checking the status of a completed task via the successful() function that we know was not completed successfully.
This can be achieved by updating the previous example so that the task() function raises an exception.
1 2 3 4 5 6 7 8 9 10 11 12 |
# task executed in a worker thread def task(identifier): # generate a value value = random() # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # fail raise Exception('Something bad happened') # return the generated value return value |
We then expect that checking the success of the issued task will then report a different message.
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# SuperFastPython.com # example of checking if tasks were successful via a AsyncResult and they are not from random import random from time import sleep from multiprocessing.pool import ThreadPool # task executed in a worker thread def task(identifier): # generate a value value = random() # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # fail raise Exception('Something bad happened') # return the generated value return value # protect the entry point if __name__ == '__main__': # create and configure the thread pool with ThreadPool() as pool: # issue tasks to the thread pool result = pool.apply_async(task, args=(0,)) # wait for tasks to complete result.wait() # check if the tasks were successful if result.successful(): print('Tasks were successful') else: print('Tasks were not successful') |
Running the example first creates the ThreadPool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main thread then waits for the issued task to complete, blocking until the wait() function returns.
The issued task is executed, generating a random number, reporting a message, blocking, then raises an exception.
The main thread unblocks and continues on. It checks the success of the tasks associated with the AsyncResult object.
The task was not completed successfully this time and an appropriate message is reported.
1 2 |
Task 0 executing with 0.885207853231017 Tasks were not successful |
Example of Tasks Not Completed
We can explore checking the status of a task that is not yet complete via the successful() function.
The expectation is that the function will raise an exception as the task is not yet completed.
This can be achieved by updating the example in the previous section to check the successful status of the task immediately after it is issued.
1 2 3 4 5 6 7 8 |
... # issue tasks to the thread pool result = pool.apply_async(task, args=(0,)) # check if the tasks were successful if result.successful(): print('Tasks were successful') else: print('Tasks were not successful') |
This will raise an exception as the task is not completed
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# SuperFastPython.com # example of checking if tasks were successful via a AsyncResult and they are not complete from random import random from time import sleep from multiprocessing.pool import ThreadPool # task executed in a worker thread def task(identifier): # generate a value value = random() # report a message print(f'Task {identifier} executing with {value}') # block for a moment sleep(value) # return the generated value return value # protect the entry point if __name__ == '__main__': # create and configure the thread pool with ThreadPool() as pool: # issue tasks to the thread pool result = pool.apply_async(task, args=(0,)) # check if the tasks were successful if result.successful(): print('Tasks were successful') else: print('Tasks were not successful') |
Running the example first creates the ThreadPool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main thread then checks the success of the tasks associated with the AsyncResult object.
The task was not yet complete and an exception was raised.
1 2 3 |
Traceback (most recent call last): ... ValueError: <multiprocessing.pool.ApplyResult object at 0x1013b9b20> not ready |
Further Reading
This section provides additional resources that you may find helpful.
Books
- Python ThreadPool Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- ThreadPool PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python ThreadPool: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
APIs
References
Takeaways
You now know how to use an AsyncResult object associated with tasks issued to the ThreadPool.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Do you have any questions?