Last Updated on October 29, 2022
You can call the apply_async() method to issue asynchronous tasks to the ThreadPool.
In this tutorial you will discover how to issue one-off asynchronous tasks to the ThreadPool in Python.
Let’s get started.
Need to Issue Tasks To The ThreadPool
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
— multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading or writing from sockets or files.
A ThreadPool can be configured when it is created, which will prepare the new threads.
We can issue one-off tasks to the ThreadPool using methods such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
The ThreadPool allows you to issue tasks in the form of target functions to be executed by the worker threads.
How can we issue one-off asynchronous tasks to the ThreadPool?
How to Use apply_async()
We can issue one-off tasks to the ThreadPool using the apply_async() method.
Asynchronous means that the call to the ThreadPool does not block, allowing the caller that issued the task to carry on.
The apply_async() method takes the function to execute in a worker thread and returns immediately with an AsyncResult object for the task.
For example:
    ...
    # issue a task asynchronously to the thread pool
    result = pool.apply_async(task)
A variant of the apply() method which returns a AsyncResult object.
— multiprocessing — Process-based parallelism
If the target function takes arguments they can be specified as a tuple to the “args” argument or as a dictionary to the “kwds” argument.
For example:
    ...
    # issue a task asynchronously to the thread pool with arguments
    result = pool.apply_async(task, args=(arg1, arg2, arg3))
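The "kwds" argument works the same way for keyword arguments. The sketch below is a minimal illustration, assuming a hypothetical greet() task and a hypothetical run_kwds_demo() helper that are not part of the tutorial's examples.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task that takes one positional and two keyword arguments
def greet(name, greeting='Hello', punctuation='!'):
    return f'{greeting} {name}{punctuation}'

# hypothetical helper that issues the task and returns its result
def run_kwds_demo():
    pool = ThreadPool()
    # positional arguments go in "args", keyword arguments go in "kwds"
    result = pool.apply_async(greet, args=('world',),
        kwds={'greeting': 'Hi', 'punctuation': '?'})
    # block until the task is done and retrieve the return value
    value = result.get()
    pool.close()
    pool.join()
    return value

# protect the entry point
if __name__ == '__main__':
    print(run_kwds_demo())
```

Both arguments can be combined in one call, as shown here, or used on their own.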
A callback function can be called automatically if the task was successful, e.g. no error or exception.
The callback function must take one argument, the result of the target function.
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed …
— multiprocessing — Process-based parallelism
The function is specified via the “callback” argument to the apply_async() function.
For example:
    # callback function
    def custom_callback(result):
        print(f'Got result: {result}')

    ...
    # issue a task asynchronously to the thread pool with a callback
    result = pool.apply_async(task, callback=custom_callback)
Similarly, an error callback function can be specified via the “error_callback” argument that is called only when an unexpected error or exception is raised.
If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.
— multiprocessing — Process-based parallelism
The error callback function must take one argument, which is the instance of the error or exception that was raised.
For example:
    # error callback function
    def custom_error_callback(error):
        print(f'Got error: {error}')

    ...
    # issue a task asynchronously to the thread pool with an error callback
    result = pool.apply_async(task, error_callback=custom_error_callback)
Difference Between apply_async() and apply()
How does the apply_async() method compare to the apply() method for issuing tasks?
Both apply_async() and apply() may be used to issue one-off tasks to the ThreadPool.
The following summarizes the key differences between these two methods:
- The apply_async() method does not block, whereas the apply() method does block.
- The apply_async() method returns an AsyncResult, whereas the apply() method returns the result of the target function.
- The apply_async() method can execute callback functions when the task is complete, whereas the apply() method cannot execute callback functions.
The apply_async() method should be used for issuing target task functions to the ThreadPool where the caller cannot or must not block while the task is executing.
The apply() method should be used for issuing target task functions to the ThreadPool where the caller can or must block until the task is complete.
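The difference in blocking behavior can be observed by timing both calls. The sketch below is one way to do this, assuming a hypothetical slow_task() function that sleeps for half a second and a hypothetical compare_apply_methods() helper. The exact timings will vary from run to run.

```python
from time import sleep, perf_counter
from multiprocessing.pool import ThreadPool, AsyncResult

# hypothetical task used only for this comparison
def slow_task():
    sleep(0.5)
    return 'done'

# hypothetical helper that times a blocking and a non-blocking call
def compare_apply_methods():
    pool = ThreadPool()
    # apply() blocks the caller until the task returns its value
    start = perf_counter()
    value = pool.apply(slow_task)
    blocking_seconds = perf_counter() - start
    # apply_async() returns an AsyncResult without waiting for the task
    start = perf_counter()
    result = pool.apply_async(slow_task)
    async_seconds = perf_counter() - start
    is_async_result = isinstance(result, AsyncResult)
    # get() blocks until the asynchronous task has finished
    async_value = result.get()
    pool.close()
    pool.join()
    return value, blocking_seconds, is_async_result, async_seconds, async_value

# protect the entry point
if __name__ == '__main__':
    value, blocking, is_async, non_blocking, async_value = compare_apply_methods()
    print(f'apply() returned {value!r} after {blocking:.2f} seconds')
    print(f'apply_async() returned after {non_blocking:.4f} seconds')
    print(f'AsyncResult returned: {is_async}, get() gave {async_value!r}')
```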
Now that we know how to issue one-off tasks to the ThreadPool, let’s look at some worked examples.
Example of Calling apply_async()
The apply_async() method can be called directly to execute a target function in the ThreadPool.
The call will not block, but will instead immediately return an AsyncResult object that we can ignore if our function does not return a value.
The example below demonstrates this by defining a task that reports a message and blocks for one second.
The task() function implements this.
    # task executed in a worker thread
    def task():
        # report a message
        print(f'Task executing')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done')
We can then create and configure a ThreadPool with the default configuration.
    ...
    # create and configure the thread pool
    pool = ThreadPool()
Next, we can issue the task() function to the ThreadPool.
    ...
    # issue tasks to the thread pool
    pool.apply_async(task)
Finally, we can close the thread pool and release the resources and wait until all tasks in the pool are completed.
    ...
    # close the thread pool
    pool.close()
    # wait for all tasks to finish
    pool.join()
Note, if we did not call join() to wait for the tasks to complete, then it is possible that the main thread would exit and terminate the thread pool without completing the tasks.
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply_async() to the thread pool
    from time import sleep
    from multiprocessing.pool import ThreadPool

    # task executed in a worker thread
    def task():
        # report a message
        print(f'Task executing')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done')

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the thread pool
        pool = ThreadPool()
        # issue tasks to the thread pool
        pool.apply_async(task)
        # close the thread pool
        pool.close()
        # wait for all tasks to finish
        pool.join()
Running the example first creates and configures the ThreadPool.
Next, the task() function is issued to the ThreadPool.
The main thread then closes the ThreadPool and blocks until all tasks complete and all threads in the ThreadPool close.
A worker thread executes the task() function, reporting messages and sleeping for a second. The task is finished and returns.
The main thread continues on and the program exits.
    Task executing
    Task done
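As an alternative to calling close() and join() by hand, the ThreadPool can also be used as a context manager. Exiting the block calls terminate() on the pool, so the sketch below waits for the task inside the block. The run_context_manager_demo() helper is a hypothetical name and the shorter sleep is an assumption made to keep the demonstration quick.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task():
    print('Task executing')
    sleep(0.5)
    print('Task done')

# hypothetical helper that runs the task inside a with block
def run_context_manager_demo():
    with ThreadPool() as pool:
        result = pool.apply_async(task)
        # wait here: leaving the block terminates the pool immediately
        result.wait()
        # report whether the task finished without raising an exception
        return result.successful()

# protect the entry point
if __name__ == '__main__':
    print(f'Task successful: {run_context_manager_demo()}')
```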
Next, let’s look at an example of issuing a task to the ThreadPool that has arguments.
Example of Calling apply_async() with Arguments
We can call apply_async() to issue a task to the ThreadPool that takes arguments.
This can be achieved by passing a tuple of arguments to the “args” argument or a dictionary of arguments to the “kwds” argument.
In this example, we can update the previous example so that our task() function takes one argument that is then reported in printed messages.
The updated task() function with this change is listed below.
    # task executed in a worker thread
    def task(message):
        # report a message
        print(f'Task executing: {message}')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done: {message}')
We can then update the call to apply_async() to issue the task() function and specify a tuple containing one argument.
    ...
    # issue tasks to the thread pool
    pool.apply_async(task, args=('Hello world',))
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply_async() to the thread pool with arguments
    from time import sleep
    from multiprocessing.pool import ThreadPool

    # task executed in a worker thread
    def task(message):
        # report a message
        print(f'Task executing: {message}')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done: {message}')

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the thread pool
        pool = ThreadPool()
        # issue tasks to the thread pool
        pool.apply_async(task, args=('Hello world',))
        # close the thread pool
        pool.close()
        # wait for all tasks to finish
        pool.join()
Running the example first creates and configures the ThreadPool.
Next, the task() function is issued to the ThreadPool with an argument.
The main thread then closes the ThreadPool and blocks until all tasks complete and all threads in the ThreadPool close.
A worker thread executes the task() function, reporting messages with the provided argument, then sleeping for a second. The task is finished and returns.
The main thread continues on and the program exits.
    Task executing: Hello world
    Task done: Hello world
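Because each call to apply_async() returns immediately, the same approach can be used to issue several one-off tasks with different arguments and collect their results afterwards. The sketch below assumes a hypothetical variant of task() that returns its message instead of printing it, and a hypothetical run_many_tasks() helper.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical variant of task() that returns a message
def task(message):
    # block for a moment
    sleep(0.1)
    return f'Task done: {message}'

# hypothetical helper that issues one task per message
def run_many_tasks():
    pool = ThreadPool()
    # each call returns its own AsyncResult without blocking
    results = [pool.apply_async(task, args=(f'Hello {i}',)) for i in range(3)]
    # close the thread pool and wait for all tasks to finish
    pool.close()
    pool.join()
    # the tasks are complete, so get() returns each value straight away
    return [result.get() for result in results]

# protect the entry point
if __name__ == '__main__':
    for value in run_many_tasks():
        print(value)
```

The results are collected in the order the tasks were issued, regardless of the order in which the worker threads finished them.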
Next, let’s look at an example of issuing a task to the pool and handling the result with a callback.
Example of Calling apply_async() with a Callback Function
We can issue a task to the ThreadPool that returns a value and specify a callback function to handle the returned value.
This can be achieved via the “callback” argument.
In this example, we can update the above example so that the task() function generates a value and returns it. We can then define a function to handle the return value, in this case to simply report the value.
Firstly, we can define the function to call to handle the value returned from the function.
The return_callback() below implements this, taking one argument which is the value returned from the target task function.
    # handle the return value callback
    def return_callback(result):
        print(f'Callback received: {result}')
Next, we can update the task function to generate a random value between 0 and 1. It then reports the value, sleeps, then returns the value that was generated.
The updated task() function with these changes is listed below.
    # task executed in a worker thread
    def task():
        # generate a random value
        value = random()
        # report a message
        print(f'Task generated {value}')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done with {value}')
        # return the generated value
        return value
Finally, we can update the call to apply_async() to specify the callback function via the “callback” argument, giving the name of our custom function.
    ...
    # issue tasks to the thread pool
    pool.apply_async(task, callback=return_callback)
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply_async() to the thread pool with a callback
    from random import random
    from time import sleep
    from multiprocessing.pool import ThreadPool

    # handle the return value callback
    def return_callback(result):
        print(f'Callback received: {result}')

    # task executed in a worker thread
    def task():
        # generate a random value
        value = random()
        # report a message
        print(f'Task generated {value}')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done with {value}')
        # return the generated value
        return value

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the thread pool
        pool = ThreadPool()
        # issue tasks to the thread pool
        pool.apply_async(task, callback=return_callback)
        # close the thread pool
        pool.close()
        # wait for all tasks to finish
        pool.join()
Running the example first creates and configures the ThreadPool.
Next, the task() function is issued to the ThreadPool with the custom callback function.
The main thread then closes the ThreadPool and blocks until all tasks complete and all threads in the ThreadPool close.
A worker thread executes the task() function, reporting messages and sleeping for a second. The task is finished and returns.
The custom callback function is then called and passed the value returned from the target task function. The value is reported in a message by the callback function.
Finally, the main thread continues on and the program exits.
    Task generated 0.6357624807921874
    Task done with 0.6357624807921874
    Callback received: 0.6357624807921874
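It can help to know where the callback runs. The multiprocessing documentation notes that callbacks should complete immediately because otherwise the thread that handles the results will be blocked. The sketch below records thread names to show that, in CPython, the callback is not run by the thread that issued the task. The run_callback_thread_demo() helper and the fixed return value of 42 are assumptions for illustration.

```python
import threading
from multiprocessing.pool import ThreadPool

# hypothetical helper that records which thread runs each step
def run_callback_thread_demo():
    names = {}

    # task executed in a worker thread
    def task():
        names['task'] = threading.current_thread().name
        return 42

    # callback executed by a helper thread inside the pool
    def return_callback(result):
        names['callback'] = threading.current_thread().name
        names['result'] = result

    # record the thread that issues the task
    names['caller'] = threading.current_thread().name
    pool = ThreadPool()
    pool.apply_async(task, callback=return_callback)
    # close the pool and wait for the task and its callback to finish
    pool.close()
    pool.join()
    return names

# protect the entry point
if __name__ == '__main__':
    for step, value in run_callback_thread_demo().items():
        print(f'{step}: {value}')
```

A practical consequence is that a slow callback can delay the delivery of results for other tasks, so callbacks are best kept short.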
Next, let’s look at an example of issuing a task to the pool with an error callback function.
Example of Calling apply_async() with an Error Callback Function
We can issue a task to the ThreadPool that may raise an unhandled exception and specify an error callback function to handle the exception.
This can be achieved via the “error_callback” argument.
In this example, we can update the above example so that the task() function raises an exception. We can then define a function to handle the raised exception, in this case to simply report the details of the exception.
Firstly, we can define the function to call to handle an exception raised by the function.
The custom_error_callback() below implements this, taking one argument which is the exception raised by the target task function.
    # handle any errors in the task function
    def custom_error_callback(error):
        print(f'Got an Error: {error}')
Next, we can update the task() function so that it raises an exception that is not handled.
    # task executed in a worker thread
    def task():
        # report a message
        print(f'Task executing')
        # block for a moment
        sleep(1)
        # raise an exception
        raise Exception('Something bad happened')
        # report a message
        print(f'Task done')
Finally, we can update the call to apply_async() to specify the error callback function via the “error_callback” argument, giving the name of our custom function.
    ...
    # issue tasks to the thread pool
    pool.apply_async(task, error_callback=custom_error_callback)
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply_async() to the thread pool with an error callback
    from time import sleep
    from multiprocessing.pool import ThreadPool

    # handle any errors in the task function
    def custom_error_callback(error):
        print(f'Got an Error: {error}')

    # task executed in a worker thread
    def task():
        # report a message
        print(f'Task executing')
        # block for a moment
        sleep(1)
        # raise an exception
        raise Exception('Something bad happened')
        # report a message (never reached)
        print(f'Task done')

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the thread pool
        pool = ThreadPool()
        # issue tasks to the thread pool
        pool.apply_async(task, error_callback=custom_error_callback)
        # close the thread pool
        pool.close()
        # wait for all tasks to finish
        pool.join()
Running the example first creates and configures the ThreadPool.
Next, the task() function is issued to the ThreadPool with the custom error callback function.
The main thread then closes the ThreadPool and blocks until all tasks complete and all threads in the ThreadPool close.
A worker thread executes the task() function, reporting messages and sleeping for a second. The task then raises an exception.
The exception is trapped by the ThreadPool. The custom error callback function is then called and passed the raised exception. The details of the raised exception are then reported by the callback function.
Finally, the main thread continues on and the program exits.
    Task executing
    Got an Error: Something bad happened
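Both callbacks can be given in the same call, in which case only one of them is called for a given task. The sketch below illustrates this with a hypothetical double() task that fails for negative input and a hypothetical run_both_callbacks_demo() helper that collects what each callback receives.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task that fails for negative input
def double(value):
    if value < 0:
        raise ValueError(f'negative value: {value}')
    return value * 2

# hypothetical helper that issues one successful and one failing task
def run_both_callbacks_demo():
    successes = []
    failures = []
    pool = ThreadPool()
    for value in (1, -1):
        # the result goes to callback, a raised exception to error_callback
        pool.apply_async(double, args=(value,),
            callback=successes.append, error_callback=failures.append)
    # close the thread pool and wait for all tasks to finish
    pool.close()
    pool.join()
    return successes, [str(error) for error in failures]

# protect the entry point
if __name__ == '__main__':
    successes, failures = run_both_callbacks_demo()
    print(f'Results: {successes}')
    print(f'Errors: {failures}')
```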
Next, let’s look at an example of waiting for an issued task to complete.
Example of Calling apply_async() and Waiting for the Task to Complete
The apply_async() function returns an AsyncResult object that provides a handle on the asynchronous task.
We can wait for the issued task to complete via the wait() function on the AsyncResult object.
In this example, we can update the above example so that the AsyncResult object is assigned when the task is issued, then the main thread waits for the task to complete explicitly.
    ...
    # issue tasks to the thread pool
    result = pool.apply_async(task)
    # wait for the task to complete
    result.wait()
The wait() function returns once the task is complete.
At this time, there are no further tasks in the ThreadPool. Therefore the main thread can close the pool and exit, without calling the join() function.
    ...
    # close the thread pool
    pool.close()
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply_async() to the thread pool and waiting for the task to complete
    from time import sleep
    from multiprocessing.pool import ThreadPool

    # task executed in a worker thread
    def task():
        # report a message
        print(f'Task executing')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done')

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the thread pool
        pool = ThreadPool()
        # issue tasks to the thread pool
        result = pool.apply_async(task)
        # wait for the task to complete
        result.wait()
        # close the thread pool
        pool.close()
Running the example first creates and configures the ThreadPool.
Next, the task() function is issued to the ThreadPool and the returned AsyncResult object is assigned.
The main thread then waits on the returned AsyncResult object for the issued task to complete.
A worker thread executes the task() function, reporting messages and sleeping for a second. The task is finished and returns.
The main thread then continues on and closes the ThreadPool.
    Task executing
    Task done
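The wait() function also accepts an optional timeout in seconds, and the AsyncResult object offers ready() and successful() to check the status of the task. The sketch below shows how they might be combined, assuming a hypothetical task that sleeps for half a second and a hypothetical run_wait_timeout_demo() helper.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task that takes about half a second
def task():
    sleep(0.5)

# hypothetical helper that checks the task status before and after it finishes
def run_wait_timeout_demo():
    pool = ThreadPool()
    result = pool.apply_async(task)
    # wait at most 0.1 seconds, which is less than the task needs
    result.wait(timeout=0.1)
    ready_early = result.ready()
    # wait without a timeout until the task is complete
    result.wait()
    ready_late = result.ready()
    # successful() reports whether the task completed without an exception
    was_successful = result.successful()
    pool.close()
    pool.join()
    return ready_early, ready_late, was_successful

# protect the entry point
if __name__ == '__main__':
    ready_early, ready_late, was_successful = run_wait_timeout_demo()
    print(f'Ready after short wait: {ready_early}')
    print(f'Ready after full wait: {ready_late}')
    print(f'Successful: {was_successful}')
```

Note that wait() does not raise an exception when the timeout elapses, which is why ready() is checked afterwards.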
Next, let’s look at an example where we wait for the result of an issued task.
Example of Calling apply_async() and Waiting for the Result
The AsyncResult object returned when issuing a task via apply_async() provides a way to access the value returned from the target task function.
This can be achieved by calling the get() function that returns the value from the function issued to the ThreadPool.
In this example, we can update the target task() function to generate a random value between 0 and 1 and return the value. We can then assign the AsyncResult object returned from apply_async() and call get() to retrieve the result. This will block until the task is completed and the result is returned.
Firstly, we can update the task() function to generate a random value, report the value, then return it.
The updated task() function with these changes is listed below.
    # task executed in a worker thread
    def task():
        # generate a random value
        value = random()
        # report a message
        print(f'Task generated {value}')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done with {value}')
        # return the generated value
        return value
Next, we can issue the task and assign the AsyncResult object returned.
    ...
    # issue tasks to the thread pool
    result = pool.apply_async(task)
We can then get the result from the AsyncResult object and report the value.
This will block until the task is complete and the result is returned.
    ...
    # wait for the return value
    value = result.get()
    # report the return value
    print(f'Got: {value}')
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply_async() to the thread pool and wait for the result
    from random import random
    from time import sleep
    from multiprocessing.pool import ThreadPool

    # task executed in a worker thread
    def task():
        # generate a random value
        value = random()
        # report a message
        print(f'Task generated {value}')
        # block for a moment
        sleep(1)
        # report a message
        print(f'Task done with {value}')
        # return the generated value
        return value

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the thread pool
        pool = ThreadPool()
        # issue tasks to the thread pool
        result = pool.apply_async(task)
        # wait for the return value
        value = result.get()
        # report the return value
        print(f'Got: {value}')
        # close the thread pool
        pool.close()
Running the example first creates and configures the ThreadPool.
Next, the task() function is issued to the ThreadPool and the returned AsyncResult object is assigned.
The main thread then gets the result from the AsyncResult object, blocking until the issued task is finished.
A worker thread executes the task() function, generating a value, reporting messages and sleeping for a second. The task is finished and returns the generated value.
The main thread then receives the value, reports it then continues on and closes the ThreadPool.
    Task generated 0.0378375302457159
    Task done with 0.0378375302457159
    Got: 0.0378375302457159
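The get() function also takes an optional timeout in seconds. If the result does not arrive in time, a multiprocessing.TimeoutError is raised and the task keeps running, so get() can be called again later. The sketch below illustrates this, assuming a hypothetical task that sleeps for half a second and a hypothetical run_get_timeout_demo() helper.

```python
from time import sleep
from multiprocessing import TimeoutError as PoolTimeoutError
from multiprocessing.pool import ThreadPool

# hypothetical task that takes about half a second
def task():
    sleep(0.5)
    return 'finished'

# hypothetical helper that first gives up early, then waits for the value
def run_get_timeout_demo():
    pool = ThreadPool()
    result = pool.apply_async(task)
    try:
        # wait at most 0.1 seconds, which is less than the task needs
        result.get(timeout=0.1)
        timed_out = False
    except PoolTimeoutError:
        timed_out = True
    # the task is still running, so block until the value is available
    value = result.get()
    pool.close()
    pool.join()
    return timed_out, value

# protect the entry point
if __name__ == '__main__':
    timed_out, value = run_get_timeout_demo()
    print(f'Timed out on first attempt: {timed_out}')
    print(f'Got: {value}')
```

The import is given an alias here so that it does not shadow Python's built-in TimeoutError.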
Next, let’s look at an example of getting a return value from a function that raised an exception.
Example of Calling apply_async() with Exception
We can get a return value from a target task function issued to the ThreadPool via the AsyncResult.get().
If the target task function raises an exception that was not handled, then the get() function will re-raise the same exception.
This means, we may need to explicitly handle exceptions that may be raised in the target task function when getting a returned value from an issued task.
In this example, we will update the above example so that the target task() function raises an unhandled exception. We will then get the return value from the target task function via the returned AsyncResult object and handle the possible exception with a try-except pattern.
Firstly, we can update the target task() function that returns a value to raise an exception.
    # task executed in a worker thread
    def task():
        # report a message
        print(f'Task executing')
        # block for a moment
        sleep(1)
        # raise an exception
        raise Exception('Something bad happened')
        # report a message
        print(f'Task done')
        # return a value
        return "DONE!"
We can then issue the task as before and assign the returned AsyncResult object.
    ...
    # issue tasks to the thread pool
    result = pool.apply_async(task)
Finally, we can attempt to get the result from the target task function via the AsyncResult.get() function, and handle the exception with a try-except block.
    ...
    # wait for the return value
    try:
        value = result.get()
    except Exception as e:
        print(f'Failed with: {e}')
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of issuing a task with apply_async() to the thread pool and handle exception
    from time import sleep
    from multiprocessing.pool import ThreadPool

    # task executed in a worker thread
    def task():
        # report a message
        print(f'Task executing')
        # block for a moment
        sleep(1)
        # raise an exception
        raise Exception('Something bad happened')
        # report a message (never reached)
        print(f'Task done')
        # return a value (never reached)
        return "DONE!"

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the thread pool
        pool = ThreadPool()
        # issue tasks to the thread pool
        result = pool.apply_async(task)
        # wait for the return value
        try:
            value = result.get()
        except Exception as e:
            print(f'Failed with: {e}')
        # close the thread pool
        pool.close()
Running the example first creates and configures the ThreadPool.
Next, the task() function is issued to the ThreadPool and the returned AsyncResult object is assigned.
The main thread then gets the result from the AsyncResult object, blocking until the issued task is finished.
A worker thread executes the task() function, reporting messages and sleeping for a second. The task then raises an exception.
The main thread then catches the raised exception and reports the message. It then continues on and closes the ThreadPool.
    Task executing
    Failed with: Something bad happened
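Because get() re-raises the same exception, the caller can catch a specific exception type rather than the general Exception class. The sketch below assumes a hypothetical task that raises a ValueError and a hypothetical run_specific_exception_demo() helper. It also checks successful(), which returns False for a task that failed.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task that always fails with a specific exception type
def task():
    raise ValueError('bad input')

# hypothetical helper that handles only the expected exception type
def run_specific_exception_demo():
    pool = ThreadPool()
    result = pool.apply_async(task)
    try:
        # block until the task is done, re-raising its exception if any
        result.get()
        outcome = 'no error'
    except ValueError as e:
        outcome = f'ValueError: {e}'
    # the task is finished, so successful() can be checked safely
    was_successful = result.successful()
    pool.close()
    pool.join()
    return outcome, was_successful

# protect the entry point
if __name__ == '__main__':
    outcome, was_successful = run_specific_exception_demo()
    print(outcome)
    print(f'Successful: {was_successful}')
```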
Further Reading
This section provides additional resources that you may find helpful.
Books
- Python ThreadPool Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- ThreadPool PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python ThreadPool: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
Takeaways
You now know how to issue one-off tasks to the ThreadPool using the apply_async() method.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.