Last Updated on September 12, 2022
You can call Pool.apply_async() to issue asynchronous tasks to the multiprocessing.pool.Pool process pool.
In this tutorial, you will discover how to issue one-off asynchronous tasks to the process pool in Python.
Let’s get started.
Need to Issue Tasks To The Process Pool
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
The process pool allows you to issue tasks in the form of target functions to be executed by the worker processes.
How can we issue one-off asynchronous tasks to the process pool?
How to Use apply_async()
We can issue one-off tasks to the process pool using the apply_async() function.
Asynchronous means that the call to the process pool does not block, allowing the caller that issued the task to carry on.
The apply_async() function takes the target function to execute in a worker process and returns immediately with an AsyncResult object for the task.
For example:
```python
...
# issue a task asynchronously to the process pool
result = pool.apply_async(task)
```
A variant of the apply() method which returns a AsyncResult object.
— multiprocessing — Process-based parallelism
If the target function takes arguments, they can be specified as a tuple via the “args” argument or as a dictionary via the “kwds” argument.
For example:
```python
...
# issue a task asynchronously to the process pool with arguments
result = pool.apply_async(task, args=(arg1, arg2, arg3))
```
A callback function can be called automatically if the task was successful, that is, if no error or exception was raised.
The callback function must take one argument, the result of the target function.
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed …
— multiprocessing — Process-based parallelism
The function is specified via the “callback” argument to the apply_async() function.
For example:
```python
# callback function
def custom_callback(result):
    print(f'Got result: {result}')

...
# issue a task asynchronously to the process pool with a callback
result = pool.apply_async(task, callback=custom_callback)
```
Similarly, an error callback function can be specified via the “error_callback” argument; it is called only if an unhandled error or exception is raised.
If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.
— multiprocessing — Process-based parallelism
The error callback function must take one argument, which is the instance of the error or exception that was raised.
For example:
```python
# error callback function
def custom_error_callback(error):
    print(f'Got error: {error}')

...
# issue a task asynchronously to the process pool with an error callback
result = pool.apply_async(task, error_callback=custom_error_callback)
```
Difference Between apply_async() and apply()
How does the apply_async() function compare to the apply() function for issuing tasks?
Both the apply_async() and apply() functions may be used to issue one-off tasks to the process pool.
The following summarizes the key differences between these two functions:
- The apply_async() function does not block, whereas the apply() function does block.
- The apply_async() function returns an AsyncResult, whereas the apply() function returns the result of the target function.
- The apply_async() function can execute callback functions when the task is complete, whereas the apply() function cannot execute callback functions.
The apply_async() function should be used for issuing target task functions to the process pool where the caller cannot or must not block while the task is executing.
The apply() function should be used for issuing target task functions to the process pool where the caller can or must block until the task is complete.
You can learn more about the Pool.apply() function in the tutorial:
Now that we know how to issue one-off tasks to the process pool, let’s look at some worked examples.
Example of Pool.apply_async()
The apply_async() function can be called directly to execute a target function in the process pool.
The call will not block, but will instead immediately return an AsyncResult object that we can ignore if our function does not return a value.
The example below demonstrates this by defining a task that reports a message and blocks for one second.
The task() function implements this.
```python
# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print('Task done', flush=True)
```
We can then create and configure a process pool with the default configuration.
```python
...
# create and configure the process pool
pool = Pool()
```
Next, we can issue the task() function to the process pool.
```python
...
# issue tasks to the process pool
pool.apply_async(task)
```
Finally, we can close the process pool to release its resources, then join it to wait until all tasks in the pool are completed.
```python
...
# close the process pool
pool.close()
# wait for all tasks to finish
pool.join()
```
Note, if we did not call join() to wait for the tasks to complete, then it is possible that the main process would exit and terminate the process pool without completing the tasks.
You can learn more about joining the process pool in the tutorial:
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of issuing a task with apply_async() to the process pool
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print('Task done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    pool.apply_async(task)
    # close the process pool
    pool.close()
    # wait for all tasks to finish
    pool.join()
```
Running the example first creates and configures the process pool.
Next, the task() function is issued to the process pool.
The main process then closes the process pool and blocks until all tasks complete and all processes in the process pool close.
A worker process executes the task() function, reporting messages and sleeping for a second. The task is finished and returns.
The main process continues on and the program exits.
```
Task executing
Task done
```
Next, let’s look at an example of issuing a task to the process pool that has arguments.
Example of Pool.apply_async() with Arguments
We can call apply_async() to issue a task to the process pool that takes arguments.
This can be achieved by passing a tuple of arguments to the “args” argument or a dictionary of arguments to the “kwds” argument.
In this example, we can update the previous examples so that our task() function takes one argument that is then reported in printed messages.
The updated task() function with this change is listed below.
```python
# task executed in a worker process
def task(message):
    # report a message
    print(f'Task executing: {message}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print(f'Task done: {message}', flush=True)
```
We can then update the call to apply_async() to issue the task() function and specify a tuple containing one argument.
```python
...
# issue tasks to the process pool
pool.apply_async(task, args=('Hello world',))
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of issuing a task with apply_async() to the process pool with arguments
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(message):
    # report a message
    print(f'Task executing: {message}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print(f'Task done: {message}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    pool.apply_async(task, args=('Hello world',))
    # close the process pool
    pool.close()
    # wait for all tasks to finish
    pool.join()
```
Running the example first creates and configures the process pool.
Next, the task() function is issued to the process pool with an argument.
The main process then closes the process pool and blocks until all tasks complete and all processes in the process pool close.
A worker process executes the task() function, reporting messages with the provided argument, then sleeping for a second. The task is finished and returns.
The main process continues on and the program exits.
```
Task executing: Hello world
Task done: Hello world
```
Next, let’s look at an example of issuing a task to the pool and handling the result with a callback.
Example of Pool.apply_async() with a Callback Function
We can issue a task to the process pool that returns a value and specify a callback function to handle the returned value.
This can be achieved via the “callback” argument.
In this example, we can update the above example so that the task() function generates a value and returns it. We can then define a function to handle the return value, in this case to simply report the value.
Firstly, we can define the function to call to handle the value returned from the function.
The return_callback() below implements this, taking one argument which is the value returned from the target task function.
```python
# handle the return value callback
def return_callback(result):
    print(f'Callback received: {result}', flush=True)
```
Next, we can update the task function to generate a random value between 0 and 1. It then reports the value, sleeps, then returns the value that was generated.
The updated task() function with these changes is listed below.
```python
# task executed in a worker process
def task():
    # generate a random value
    value = random()
    # report a message
    print(f'Task generated {value}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print(f'Task done with {value}', flush=True)
    # return the generated value
    return value
```
Finally, we can update the call to apply_async() to specify the callback function via the “callback” argument, giving the name of our custom function.
```python
...
# issue tasks to the process pool
pool.apply_async(task, callback=return_callback)
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of issuing a task with apply_async() to the process pool with a callback
from random import random
from time import sleep
from multiprocessing.pool import Pool

# handle the return value callback
def return_callback(result):
    print(f'Callback received: {result}', flush=True)

# task executed in a worker process
def task():
    # generate a random value
    value = random()
    # report a message
    print(f'Task generated {value}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print(f'Task done with {value}', flush=True)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    pool.apply_async(task, callback=return_callback)
    # close the process pool
    pool.close()
    # wait for all tasks to finish
    pool.join()
```
Running the example first creates and configures the process pool.
Next, the task() function is issued to the process pool with the custom callback function.
The main process then closes the process pool and blocks until all tasks complete and all processes in the process pool close.
A worker process executes the task() function, reporting messages and sleeping for a second. The task is finished and returns.
The custom callback function is then called and passed the value returned from the target task function. The value is reported in a message by the callback function.
Finally, the main process continues on and the program exits.
```
Task generated 0.9289777262614618
Task done with 0.9289777262614618
Callback received: 0.9289777262614618
```
Next, let’s look at an example of issuing a task to the pool with an error callback function.
Example of Pool.apply_async() with an Error Callback Function
We can issue a task to the process pool that may raise an unhandled exception and specify an error callback function to handle the exception.
This can be achieved via the “error_callback” argument.
In this example, we can update the above example so that the task() function raises an exception. We can then define a function to handle the raised exception, in this case to simply report the details of the exception.
Firstly, we can define the function to call to handle an exception raised by the function.
The custom_error_callback() below implements this, taking one argument which is the exception raised by the target task function.
```python
# handle any errors in the task function
def custom_error_callback(error):
    print(f'Got an Error: {error}', flush=True)
```
Next, we can update the task() function so that it raises an exception that is not handled.
```python
# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # raise an exception
    raise Exception('Something bad happened')
    # report a message
    print('Task done', flush=True)
```
Finally, we can update the call to apply_async() to specify the error callback function via the “error_callback” argument, giving the name of our custom function.
```python
...
# issue tasks to the process pool
pool.apply_async(task, error_callback=custom_error_callback)
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of issuing a task with apply_async() to the process pool with an error callback
from time import sleep
from multiprocessing.pool import Pool

# handle any errors in the task function
def custom_error_callback(error):
    print(f'Got an Error: {error}', flush=True)

# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # raise an exception
    raise Exception('Something bad happened')
    # report a message
    print('Task done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    pool.apply_async(task, error_callback=custom_error_callback)
    # close the process pool
    pool.close()
    # wait for all tasks to finish
    pool.join()
```
Running the example first creates and configures the process pool.
Next, the task() function is issued to the process pool with the custom error callback function.
The main process then closes the process pool and blocks until all tasks complete and all processes in the process pool close.
A worker process executes the task() function, reporting messages and sleeping for a second. The task then raises an exception.
The exception is trapped by the process pool. The custom error callback function is then called and passed the raised exception. The details of the raised exception are then reported by the callback function.
Finally, the main process continues on and the program exits.
```
Task executing
Got an Error: Something bad happened
```
Next, let’s look at an example of waiting for an issued task to complete.
Example of Pool.apply_async() and Wait For Task to Complete
The apply_async() function returns an AsyncResult object that provides a handle on the asynchronous task.
We can wait for the issued task to complete via the wait() function on the AsyncResult object.
In this example, we can update the above example so that the AsyncResult object is assigned when the task is issued, then the main process explicitly waits for the task to complete.
```python
...
# issue tasks to the process pool
result = pool.apply_async(task)
# wait for the task to complete
result.wait()
```
The wait() function returns once the task is complete.
At this time, there are no further tasks in the process pool. Therefore the main process can close the pool and exit, without calling the join() function.
```python
...
# close the process pool
pool.close()
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of issuing a task with apply_async() to the process pool and waiting for the task to complete
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print('Task done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    result = pool.apply_async(task)
    # wait for the task to complete
    result.wait()
    # close the process pool
    pool.close()
```
Running the example first creates and configures the process pool.
Next, the task() function is issued to the process pool and the returned AsyncResult object is assigned.
The main process then waits on the returned AsyncResult object for the issued task to complete.
A worker process executes the task() function, reporting messages and sleeping for a second. The task is finished and returns.
The main process then continues on and closes the process pool.
```
Task executing
Task done
```
Next, let’s look at an example where we wait for the result of an issued task.
Example of Pool.apply_async() and Wait For Result
The AsyncResult object returned when issuing a task via apply_async() provides a way to access the value returned from the target task function.
This can be achieved by calling the get() function that returns the value from the function issued to the process pool.
In this example, we can update the target task() function to generate a random value between 0 and 1 and return the value. We can then assign the AsyncResult object returned from apply_async() and call get() to retrieve the result. This will block until the task is completed and the result is returned.
Firstly, we can update the task() function to generate a random value, report the value, then return it.
The updated task() function with these changes is listed below.
```python
# task executed in a worker process
def task():
    # generate a random value
    value = random()
    # report a message
    print(f'Task generated {value}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print(f'Task done with {value}', flush=True)
    # return the generated value
    return value
```
Next, we can issue the task and assign the AsyncResult object returned.
```python
...
# issue tasks to the process pool
result = pool.apply_async(task)
```
We can then get the result from the AsyncResult object and report the value.
This will block until the task is complete and the result is returned.
```python
...
# wait for the return value
value = result.get()
# report the return value
print(f'Got: {value}')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of issuing a task with apply_async() to the process pool and wait for the result
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # generate a random value
    value = random()
    # report a message
    print(f'Task generated {value}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print(f'Task done with {value}', flush=True)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    result = pool.apply_async(task)
    # wait for the return value
    value = result.get()
    # report the return value
    print(f'Got: {value}')
    # close the process pool
    pool.close()
```
Running the example first creates and configures the process pool.
Next, the task() function is issued to the process pool and the returned AsyncResult object is assigned.
The main process then gets the result from the AsyncResult object, blocking until the issued task is finished.
A worker process executes the task() function, generating a value, reporting messages, and sleeping for a second. The task is finished and returns the generated value.
The main process then receives the value, reports it, then continues on and closes the process pool.
```
Task generated 0.5761970564452767
Task done with 0.5761970564452767
Got: 0.5761970564452767
```
Next, let’s look at an example of getting a return value from a function that raised an exception.
Example of Pool.apply_async() with Exception
We can get a return value from a target task function issued to the process pool via the AsyncResult.get().
If the target task function raises an exception that was not handled, then the get() function will re-raise the same exception.
This means we may need to explicitly handle exceptions that may be raised in the target task function when getting a returned value from an issued task.
In this example, we will update the above example so that the target task() function raises an unhandled exception. We will then get the return value from the target task function via the returned AsyncResult object and handle the possible exception with a try-except pattern.
Firstly, we can update the target task() function so that it raises an exception before returning a value.
```python
# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # raise an exception
    raise Exception('Something bad happened')
    # report a message
    print('Task done', flush=True)
    # return a value
    return "DONE!"
```
We can then issue the task as before and assign the returned AsyncResult object.
```python
...
# issue tasks to the process pool
result = pool.apply_async(task)
```
Finally, we can attempt to get the result from the target task function via the AsyncResult.get() function, and handle the exception with a try-except block.
```python
...
# wait for the return value
try:
    value = result.get()
except Exception as e:
    print(f'Failed with: {e}')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of issuing a task with apply_async() to the process pool and handle exception
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # raise an exception
    raise Exception('Something bad happened')
    # report a message
    print('Task done', flush=True)
    # return a value
    return "DONE!"

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    result = pool.apply_async(task)
    # wait for the return value
    try:
        value = result.get()
    except Exception as e:
        print(f'Failed with: {e}')
    # close the process pool
    pool.close()
```
Running the example first creates and configures the process pool.
Next, the task() function is issued to the process pool and the returned AsyncResult object is assigned.
The main process then gets the result from the AsyncResult object, blocking until the issued task is finished.
A worker process executes the task() function, reporting messages and sleeping for a second. The task then raises an exception.
The main process then catches the raised exception and reports the message. It then continues on and closes the process pool.
```
Task executing
Failed with: Something bad happened
```
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to issue one-off tasks to the process pool using the apply_async() function.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.