Last Updated on October 18, 2022
You can issue asynchronous tasks to the process pool, which will return a multiprocessing.pool.AsyncResult object.
The AsyncResult provides a handle on issued tasks in the process pool and can be used to check the status of the tasks and to get task results.
In this tutorial you will discover how to use the AsyncResult in Python.
Let’s get started.
What is the AsyncResult
A multiprocessing.pool.AsyncResult object is returned when issuing tasks asynchronously to the multiprocessing.pool.Pool process pool.
This can be achieved via any of the following methods on the process pool:
- Pool.apply_async() to issue one task.
- Pool.map_async() to issue multiple tasks.
- Pool.starmap_async() to issue multiple tasks that take multiple arguments.
An AsyncResult provides a handle on one or more issued tasks.
It allows the caller to check on the status of the issued tasks, to wait for the tasks to complete, and to get the results once tasks are completed.
How to Use the AsyncResult
The AsyncResult class is straightforward to use.
First, you must get an AsyncResult object by issuing one or more tasks to the process pool using any of the apply_async(), map_async(), or starmap_async() functions.
For example:
...
# issue a task to the process pool
result = pool.apply_async(...)
Once you have an AsyncResult object, you can use it to query the status and get results from the task.
How to Get a Result
We can get the result of an issued task by calling the AsyncResult.get() function.
Return the result when it arrives.
— multiprocessing — Process-based parallelism
The value returned by get() depends on which function was used to issue the task or tasks:
- apply_async(): Returns the return value of the target function.
- map_async(): Returns a list of the return values of the target function, one per issued task.
- starmap_async(): Returns a list of the return values of the target function, one per issued task.
For example:
...
# get the result of the task or tasks
value = result.get()
If the issued tasks have not yet completed, then get() will block until the tasks are finished.
A “timeout” argument can be specified. If the tasks are still running and do not complete within the specified number of seconds, a multiprocessing.TimeoutError is raised.
If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised.
— multiprocessing — Process-based parallelism
For example:
...
try:
    # get the task result with a timeout
    value = result.get(timeout=10)
except multiprocessing.TimeoutError as e:
    # ...
If an issued task raises an exception, the exception will be re-raised by get() once the issued tasks are completed.
We may need to handle this case explicitly if we expect a task to raise an exception on failure.
If the remote call raised an exception then that exception will be re-raised by get().
— multiprocessing — Process-based parallelism
For example:
...
try:
    # get the task result that might raise an exception
    value = result.get()
except Exception as e:
    # ...
How to Wait For Tasks to Finish
We can wait for all tasks to complete via the AsyncResult.wait() function.
This will block until all issued tasks are completed.
For example:
...
# wait for issued task to complete
result.wait()
If the tasks have already completed, then the wait() function will return immediately.
A “timeout” argument can be specified to set a limit in seconds for how long the caller is willing to wait.
Wait until the result is available or until timeout seconds pass.
— multiprocessing — Process-based parallelism
If the timeout expires before the tasks are complete, the wait() function will return.
When using a timeout, the wait() function does not give an indication that it returned because tasks completed or because the timeout elapsed. Therefore, we can check if the tasks completed via the ready() function.
For example:
...
# wait for issued task to complete with a timeout
result.wait(timeout=10)
# check if the tasks are all done
if result.ready():
    print('All Done')
    ...
else:
    print('Not Done Yet')
    ...
How to Check if Tasks Are Completed
We can check if the issued tasks are completed via the AsyncResult.ready() function.
Return whether the call has completed.
— multiprocessing — Process-based parallelism
It returns True if the tasks have completed, successfully or otherwise, or False if the tasks are still running.
For example:
...
# check if tasks are still running
if result.ready():
    print('Tasks are done')
else:
    print('Tasks are not done')
How to Check if Tasks Were Successful
We can check if the issued tasks completed successfully via the AsyncResult.successful() function.
Issued tasks are successful if no tasks raised an exception.
If at least one issued task raised an exception, then the call was not successful and the successful() function will return False.
This function should be called after it is known that the tasks have completed, e.g. ready() returns True.
For example:
...
# check if the tasks have completed
if result.ready():
    # check if the tasks were successful
    if result.successful():
        print('Successful')
    else:
        print('Unsuccessful')
If the issued tasks are still running, a ValueError is raised.
Return whether the call completed without raising an exception. Will raise ValueError if the result is not ready.
— multiprocessing — Process-based parallelism
For example:
...
try:
    # check if the tasks were successful
    if result.successful():
        print('Successful')
except ValueError as e:
    print('Tasks still running')
Now that we know how to use the AsyncResult, let’s look at some worked examples.
How to Get an AsyncResult
We can get an AsyncResult object by calling any of the apply_async(), map_async(), or starmap_async() functions to issue tasks to the process pool.
Let’s take a look at each example in turn.
Example of Getting an AsyncResult via apply_async()
We can explore how to get an AsyncResult object by issuing tasks using the apply_async() function.
In this example we will define a task function that takes an integer as an argument, generates a random number, reports the values, blocks for a fraction of a second, then returns the generated value.
Firstly, we can define the task function that takes an argument and returns the generated value.
The task() function below implements this.
# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value
Next, in the main process, we can issue a task using the apply_async() function. We can specify the task() function and an argument of 0 via the “args” argument.
This will return an AsyncResult object immediately.
Before issuing the task, we must create the process pool. We will use the context manager interface to ensure the pool is terminated automatically once we are finished with it, and use the default configuration.
...
# create and configure the process pool
with Pool() as pool:
    # ...
...
# issue tasks to the process pool
result = pool.apply_async(task, args=(0,))
Finally, we can close the process pool to prevent any further tasks being issued to the process pool, then join the pool to wait for all issued tasks to complete.
...
# close the process pool
pool.close()
# wait for all tasks to complete
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of getting an asyncresult via apply_async()
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,))
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first creates the process pool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main process then closes the process pool and waits for all issued tasks to complete.
The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.
The issued task completes, and the main process continues on and terminates the program.
Task 0 executing with 0.1920938036089912
Example of Getting an AsyncResult via map_async()
We can explore how to get an AsyncResult object by issuing tasks using the map_async() function.
In this example, we can update the previous example to issue multiple calls to the task() function with different arguments from 0 to 9.
This can be achieved by calling the map_async() and specifying the task() function and a range of arguments.
...
# issue tasks to the process pool
result = pool.map_async(task, range(10))
This call will return immediately with an AsyncResult object.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of getting an asyncresult via map_async()
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.map_async(task, range(10))
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first creates the process pool with the default configuration.
A total of 10 tasks are issued to the process pool and a single AsyncResult object is returned immediately.
The main process then closes the process pool and waits for all issued tasks to complete.
The issued tasks are executed by the process pool. Each task generates a random number, reports a message, blocks for a moment, then returns the generated value.
All issued tasks complete and the main process continues on and terminates the program.
Task 0 executing with 0.06293210850179787
Task 1 executing with 0.7078155808857848
Task 2 executing with 0.27007672662037396
Task 3 executing with 0.12754897420511724
Task 4 executing with 0.5141327108756916
Task 5 executing with 0.8290227657532473
Task 6 executing with 0.8402138244392698
Task 7 executing with 0.9160303048960673
Task 8 executing with 0.20362210990940766
Task 9 executing with 0.5226266581935975
Example of Getting an AsyncResult via starmap_async()
We can explore how to get an AsyncResult object by issuing tasks using the starmap_async() function.
In this example, we can update the previous example so that the task() function takes two arguments, and issue multiple calls to the task() function with two arguments per call.
This can be achieved by calling the starmap_async() function.
First, we must update the task() function to take two arguments. It will take the integer identifier as before, plus a floating point value between 0 and 1, instead of generating the value itself.
The updated task() function is listed below.
# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value
Next, we can prepare arguments for each call to the task() function.
In this case, we will use a list comprehension to prepare a list of arguments, where each item in the list is a tuple of arguments for a call to the task() function.
...
# prepare arguments
items = [(i, random()) for i in range(10)]
We can then issue tasks to the process pool by specifying the task() function and the list of arguments for each task.
...
# issue tasks to the process pool
result = pool.starmap_async(task, items)
This call will return immediately with an AsyncResult object.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of getting an asyncresult via starmap_async()
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        result = pool.starmap_async(task, items)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first creates the process pool with the default configuration.
A total of 10 tasks are issued to the process pool and a single AsyncResult object is returned immediately.
The main process then closes the process pool and waits for all issued tasks to complete.
The issued tasks are executed by the process pool. Each task reports a message, blocks for a moment, then returns the floating point value.
All issued tasks complete and the main process continues on and terminates the program.
Task 0 executing with 0.6483495161346381
Task 1 executing with 0.06752928130815483
Task 2 executing with 0.24228992400799954
Task 3 executing with 0.5838559409210107
Task 4 executing with 0.06536703787588205
Task 5 executing with 0.19152849834963948
Task 6 executing with 0.5499248833016134
Task 7 executing with 0.5353222309212269
Task 8 executing with 0.8891503883724293
Task 9 executing with 0.6692238800285062
Now that we know how to get an AsyncResult, let’s look at how we might get results from completed tasks.
How to Get a Result From an AsyncResult
In this section we will explore how to get a result for issued tasks via an AsyncResult.
In these examples, we will use the apply_async() function to issue one task as the basis for exploring how to get a result. Nevertheless, the examples are general and apply just as well to getting results from an AsyncResult created via map_async() or starmap_async().
Example of Getting an AsyncResult Result
We can explore how to get the results of issued tasks from an AsyncResult.
This can be achieved by updating the previous example that gets an AsyncResult by calling apply_async(), then calling the get() function on an AsyncResult to get the result of the issued task.
For example:
...
# wait for and get the result from the task
value = result.get()
The call will block until the task is complete, then return the value from the issued task, in this case the return value of the task() function.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of getting a result from an asyncresult
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,))
        # wait for and get the result from the task
        value = result.get()
        # report the result
        print(f'Got: {value}')
Running the example first creates the process pool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main process then attempts to get the result from the issued task. It blocks until the task is completed.
The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.
The main process receives the result from the completed task and reports the value with a message.
Task 0 executing with 0.8386366803150449
Got: 0.8386366803150449
Example of Getting an AsyncResult Result With a Timeout
We can explore getting a result from an asynchronous task with a timeout.
This can be achieved by updating the previous example and calling the get() function and specifying a “timeout” argument in seconds.
We can use a timeout of half a second, meaning that about half the time we will get a result in time, and about half the time the timeout will elapse first.
...
# wait for and get the result from the task with a timeout
value = result.get(timeout=0.5)
# report the result
print(f'Got: {value}')
If the timeout elapses before we get a result, then an exception is raised.
We will handle this exception and report a failure message.
...
try:
    # wait for and get the result from the task with a timeout
    value = result.get(timeout=0.5)
    # report the result
    print(f'Got: {value}')
except TimeoutError:
    print('Failed to get the result')
If the timeout exception is raised, the issued task may still be running, and it would be killed when the process pool is terminated by the context manager.
Therefore, we first close the pool and then join it, waiting for all issued tasks to complete.
...
# close the process pool
pool.close()
# wait for all tasks to complete
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of getting a result with a timeout from an asyncresult
from random import random
from time import sleep
from multiprocessing.pool import Pool
from multiprocessing import TimeoutError

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,))
        try:
            # wait for and get the result from the task with a timeout
            value = result.get(timeout=0.5)
            # report the result
            print(f'Got: {value}')
        except TimeoutError:
            print('Failed to get the result')
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first creates the process pool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main process then attempts to get the result from the issued task with a timeout. It blocks until the task is completed or the timeout elapses.
The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.
In this case, the main process receives the result from the completed task and reports the value with a message.
Task 0 executing with 0.09356907131129166
Got: 0.09356907131129166
We can run the example a few times; sometimes the main process will get a result and sometimes it will not.
Rerunning the example again, we may get the failure case.
In this case, the timeout elapses before the task is completed.
An exception is raised by the process pool and caught by the main process. A failure message is reported.
The process pool is then closed and the main process blocks until all issued tasks have completed.
Task 0 executing with 0.9196687642240333
Failed to get the result
Example of Getting an AsyncResult Result With Exception
We can explore getting a result from issued tasks where at least one task fails with an unhandled exception.
This can be achieved by updating the previous example so that the task function raises an exception. We can then attempt to get the result from the issued task in the main process and handle the exception that will be re-raised by the process pool.
Firstly, we can update the task() function so that it raises an exception.
# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # fail
    raise Exception('Something bad happened')
    # return the generated value
    return value
We can then attempt to get the result from the issued task via the get() function on the AsyncResult.
The process pool will re-raise the exception when we attempt to get the result, therefore we will handle the exception when getting the result.
...
try:
    # wait for and get the result from the task
    value = result.get()
    # report the result
    print(f'Got: {value}')
except Exception as e:
    print(f'Failed to get the result: {e}')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of getting a result from an asyncresult that raises an exception
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # fail
    raise Exception('Something bad happened')
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,))
        try:
            # wait for and get the result from the task
            value = result.get()
            # report the result
            print(f'Got: {value}')
        except Exception as e:
            print(f'Failed to get the result: {e}')
Running the example first creates the process pool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main process then attempts to get the result from the issued task. It blocks until the task is completed.
The issued task is executed, generating a random number, reporting a message, blocking, then raising an exception.
The task is completed, but unsuccessfully.
The main process handles the re-raised exception from the completed task and reports the failure with a message.
Task 0 executing with 0.295456453333674
Failed to get the result: Something bad happened
Next, let’s explore how we might wait for issued tasks to complete.
How to Wait For Tasks to Complete With AsyncResult
We can wait for all tasks associated with an AsyncResult object to complete.
This can be achieved using the wait() function, which may also take a timeout in seconds.
Let’s explore each case of waiting for tasks.
Example of Waiting for AsyncResult Results
We can explore how to wait for all issued tasks to complete.
This can be achieved by updating the previous example to call the wait() function instead of getting the result.
The wait() function will only return after all issued tasks associated with the AsyncResult object have completed.
Recall, a task is completed if it finishes normally or raises an exception.
For example:
...
# wait for tasks to complete
result.wait()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of waiting for a result from an asyncresult
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,))
        # wait for tasks to complete
        result.wait()
Running the example first creates the process pool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main process then waits for the issued task to complete, blocking until the wait() function returns.
The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.
The main process unblocks and continues on, terminating the application.
Task 0 executing with 0.2583807813645548
Example of Waiting for AsyncResult Results with a Timeout
We can explore waiting for all issued tasks to complete with a timeout.
This can be achieved by updating the previous example of waiting for all tasks associated with an AsyncResult to complete, then specifying a “timeout” argument.
We will use a timeout of half a second, meaning that about half the time the issued task will complete before wait() returns normally, and the other half of the time the timeout will elapse and wait() will return anyway.
...
# wait for tasks to complete
result.wait(timeout=0.5)
Note, the wait() function does not return a value. This means it gives no indication of whether it returned because the associated tasks completed or because the timeout elapsed. Therefore, the ready() function should be called to check whether the tasks completed.
Finally, because the wait() function may have returned because the timeout elapsed, we can wait for all issued tasks in the pool to complete by joining the pool.
...
# close the process pool
pool.close()
# wait for all tasks to complete
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of waiting for a result from an asyncresult with a timeout
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,))
        # wait for tasks to complete
        result.wait(timeout=0.5)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first creates the process pool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main process then waits for the issued task to complete with a timeout, blocking until the wait() function returns or the timeout elapses.
The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.
The main process unblocks after the task completes or the timeout elapses.
The main process then closes the process pool and waits for all issued tasks to complete.
Task 0 executing with 0.50649006909379
Next, let’s explore how to check the status of tasks issued by the process pool.
How to Check the Status of Tasks via AsyncResult
We can check the status of all tasks issued to a process pool associated with an AsyncResult.
This can be achieved via the ready() function and the successful() function.
Let’s take examples of each in turn.
Example of Checking if AsyncResult Results are Ready
We can explore checking whether issued tasks have completed via the ready() function.
Recall that a completed task may or may not have completed successfully (e.g. raised an exception).
We can update the previous example to sleep for a fraction of a second after issuing the task, then check if the task has completed. We can then report one message if the task is complete and a different message if it is not.
...
# wait a moment
sleep(0.5)
# check if the tasks are complete
if result.ready():
    print('Tasks are complete')
else:
    print('Tasks are not complete')
Because the task itself will block for a random fraction of a second we will get a different result each time the program is run.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of checking if tasks are finished via an asyncresult
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,))
        # wait a moment
        sleep(0.5)
        # check if the tasks are complete
        if result.ready():
            print('Tasks are complete')
        else:
            print('Tasks are not complete')
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first creates the process pool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main process then blocks for a fraction of a second.
The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.
The main process then checks whether the issued task has completed.
In this case, the task was completed by the time the check was performed.
Task 0 executing with 0.07379032454197942
Tasks are complete
Note, you will get different results each time the program is run given the use of random numbers.
We can explore a failure case.
Running the program again, the main process checks if the issued tasks have completed.
In this case the task has not been completed. A different message is then reported.
The process pool is then closed and the main process blocks until all issued tasks have completed.
Task 0 executing with 0.607946567005606
Tasks are not complete
Example of Checking if Tasks are Completed Successfully
We can explore whether issued tasks were completed successfully via the successful() function.
We can update the previous example so that we first wait for the issued task to complete by calling the wait() function.
```python
...
# wait for tasks to complete
result.wait()
```
We can then call the successful() function and report a message if the issued task was completed successfully, which we expect it was in this case.
```python
...
# check if the tasks were successful
if result.successful():
    print('Tasks were successful')
else:
    print('Tasks were not successful')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of checking if tasks were successful via an AsyncResult
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,))
        # wait for tasks to complete
        result.wait()
        # check if the tasks were successful
        if result.successful():
            print('Tasks were successful')
        else:
            print('Tasks were not successful')
```
Running the example first creates the process pool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main process then waits for the issued task to complete, blocking until the wait() function returns.
The issued task is executed, generating a random number, reporting a message, blocking, then returning the generated value.
The main process unblocks and continues on. It checks the success of the tasks associated with the AsyncResult object.
The task was completed successfully and an appropriate message was reported.
```
Task 0 executing with 0.8043870405211134
Tasks were successful
```
Example of Tasks Completed Unsuccessfully
Next, we can use the successful() function to check the status of a completed task that we know failed.
This can be achieved by updating the previous example so that the task() function raises an exception.
```python
# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # fail
    raise Exception('Something bad happened')
    # return the generated value
    return value
```
We then expect that checking the success of the issued task will report a different message.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of checking if tasks were successful via an AsyncResult when they are not
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # fail
    raise Exception('Something bad happened')
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,))
        # wait for tasks to complete
        result.wait()
        # check if the tasks were successful
        if result.successful():
            print('Tasks were successful')
        else:
            print('Tasks were not successful')
```
Running the example first creates the process pool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main process then waits for the issued task to complete, blocking until the wait() function returns.
The issued task is executed, generating a random number, reporting a message, blocking, then raising an exception.
The main process unblocks and continues on. It checks the success of the tasks associated with the AsyncResult object.
The task was not completed successfully this time and an appropriate message is reported.
```
Task 0 executing with 0.2618913820370421
Tasks were not successful
```
Example of Tasks Not Completed
We can explore checking the status of a task that is not yet complete via the successful() function.
The expectation is that the function will raise an exception as the task is not yet completed.
This can be achieved by updating the example in the previous section to check the successful status of the task immediately after it is issued.
```python
...
# issue tasks to the process pool
result = pool.apply_async(task, args=(0,))
# check if the tasks were successful
if result.successful():
    print('Tasks were successful')
else:
    print('Tasks were not successful')
```
This will raise an exception because the task has not yet completed.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of checking if tasks were successful via an AsyncResult when they are not complete
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,))
        # check if the tasks were successful
        if result.successful():
            print('Tasks were successful')
        else:
            print('Tasks were not successful')
```
Running the example first creates the process pool with the default configuration.
A task is issued which returns an AsyncResult object immediately.
The main process then checks the success of the tasks associated with the AsyncResult object.
The task was not yet complete and an exception was raised.
```
Traceback (most recent call last):
...
ValueError: <multiprocessing.pool.ApplyResult object at 0x1015077c0> not ready
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to use an AsyncResult object associated with tasks issued to the process pool.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Vidar Nordli-Mathisen on Unsplash