Last Updated on September 12, 2022
You can add done callback functions to tasks in the ProcessPoolExecutor by calling the add_done_callback() function on the Future object for each task.
In this tutorial, you will discover how to add callbacks for tasks submitted to a Python process pool.
Let’s get started.
Need to Add Callback Functions
The ProcessPoolExecutor in Python provides a pool of reusable processes for executing ad hoc tasks.
You can submit tasks to the process pool by calling the submit() function and passing in the name of the function you wish to execute on another process.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
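As a minimal sketch of this pattern, the snippet below submits one task and inspects its Future. The square() task is a hypothetical stand-in for your own function and is not part of the tutorial.

```python
from concurrent.futures import ProcessPoolExecutor

# hypothetical task used only for illustration
def square(value):
    return value * value

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        # submit() returns a Future immediately, without waiting for the task
        future = executor.submit(square, 4)
        # done() reports whether the task has finished (it may not have yet)
        print(f'done right after submit: {future.done()}')
        # result() blocks until the task completes, then returns its value
        print(f'result: {future.result()}')
```

Polling done() or blocking on result() works for a single task, but it does not scale well to many tasks, which is where callbacks help.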
After submitting tasks to the process pool, you may want to call a function for each task as it is completed.
This may be for many reasons, such as:
- Handling the result from the task.
- Handling any exceptions from the task.
- Updating and reporting the progress of all tasks.
How can you automatically call a function for each task as it is completed?
How to Add a Done Callback
You can add a callback to a task via the Future object for the task.
Recall that you will get a Future object for each task submitted to the ProcessPoolExecutor via a call to the submit() function.
Once the task is submitted, you can call the add_done_callback() function on its Future object to add a callback function to your task.
    ...
    # add a callback to a task
    future.add_done_callback(my_callback)
Your callback function must take one argument, which is the Future object for the task that is completed.
    # example callback function
    def my_callback(future):
        # do something...
        pass
You can add multiple callback functions to a task and they will be called in the order that they were added.
    ...
    # add a callback to a task
    future.add_done_callback(my_callback)
    # add another callback to the task
    future.add_done_callback(my_other_callback)
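The ordering can be checked with a small sketch like the one below. The calls list and the work() task are assumptions added for illustration; the task sleeps briefly so that both callbacks are registered before it completes.

```python
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# records the order in which the callbacks run (illustrative only)
calls = []

def my_callback(future):
    calls.append('my_callback')

def my_other_callback(future):
    calls.append('my_other_callback')

# hypothetical task that runs long enough for both callbacks to be added
def work():
    sleep(0.5)

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        future = executor.submit(work)
        future.add_done_callback(my_callback)
        future.add_done_callback(my_other_callback)
    # the pool has shut down here, so the task and its callbacks have finished
    print(calls)
```

The callbacks run in the process that added them, not in the worker process, which is why they can append to a list held by the main program.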
Callback functions will be called when:
- The task finishes running and is done.
- The task raises an exception and is done.
- The task is cancelled and is done.
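The three cases above can be sketched with a single callback that inspects the Future it receives. To keep the outcome deterministic, this sketch builds Future objects by hand, which is an assumption for illustration only; in a real program the Future objects come from submit(). The describe() function and outcomes list are hypothetical names.

```python
from concurrent.futures import Future

# collects a description of how each task ended (illustrative only)
outcomes = []

# callback that reports how the task ended
def describe(future):
    # check for cancellation first: exception() and result() raise
    # CancelledError when called on a cancelled future
    if future.cancelled():
        outcome = 'cancelled'
    elif future.exception() is not None:
        outcome = f'failed with {future.exception()!r}'
    else:
        outcome = f'finished with {future.result()!r}'
    outcomes.append(outcome)
    print(outcome)

# futures built by hand for illustration; normally submit() creates them
finished, failed, cancelled = Future(), Future(), Future()
for future in (finished, failed, cancelled):
    future.add_done_callback(describe)

# drive each future into one of the three "done" states
finished.set_result(42)
failed.set_exception(ValueError('bad input'))
cancelled.cancel()
```

In each case the callback fires exactly once, as soon as the future reaches its final state.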
If the task is already completed when the callback is added, the callback function will be executed immediately.
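A minimal sketch of this behavior is shown below, assuming a hypothetical work() task and late_callback() function. The program waits for the result first, so the task is already done when the callback is added.

```python
from concurrent.futures import ProcessPoolExecutor

# records the results seen by the callback (illustrative only)
seen = []

def late_callback(future):
    seen.append(future.result())
    print(f'callback received: {future.result()}')

# hypothetical task that returns a value
def work():
    return 'finished'

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        future = executor.submit(work)
        # block until the task is done
        future.result()
        print('adding the callback')
        # the task is already done, so the callback runs during this call
        future.add_done_callback(late_callback)
        print('add_done_callback() returned')
```

The callback message should appear between the two other messages, because the callback runs before add_done_callback() returns.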
If a callback function raises an exception, it will not prevent any other registered callbacks from being called and executed.
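The sketch below illustrates this with two hypothetical callbacks, one of which always fails. The exception from the first callback is logged (a traceback typically appears on standard error) and the second callback still runs. This applies to subclasses of Exception; the documentation describes the behavior for other BaseException subclasses as undefined.

```python
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# records which callbacks completed (illustrative only)
calls = []

# callback that always fails
def failing_callback(future):
    raise RuntimeError('callback failed')

# callback registered after the failing one
def healthy_callback(future):
    calls.append('healthy_callback')
    print('The healthy callback was still called.')

# hypothetical task that runs long enough for both callbacks to be added
def work():
    sleep(0.5)

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        future = executor.submit(work)
        future.add_done_callback(failing_callback)
        future.add_done_callback(healthy_callback)
```

Because the failure is only logged, it is worth keeping callbacks small and handling expected errors inside them.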
You cannot add a callback function to tasks added to the process pool via a call to map().
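This is because map() returns an iterator over results rather than Future objects. One possible alternative, sketched below with a hypothetical square() task, is to call submit() once per item and add the callback to each returned Future.

```python
from concurrent.futures import ProcessPoolExecutor

# records the results seen by the callback (illustrative only)
results = []

# callback that handles the result of one task
def show_result(future):
    results.append(future.result())
    print(f'Got result: {future.result()}')

# hypothetical task used only for illustration
def square(value):
    return value * value

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        # submit one task per item instead of calling map()
        futures = [executor.submit(square, value) for value in range(5)]
        # register the callback on each task
        for future in futures:
            future.add_done_callback(show_result)
```

Unlike map(), the callbacks fire in completion order, which may differ from the order in which the tasks were submitted.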
Now that we know how to add callback functions to tasks, let’s look at a worked example.
Example of Adding a Callback Function
Let’s look at a worked example of adding a callback to a task in the Python process pool.
Firstly, let’s define a task that will block for a moment and report when it is completed.
    # task that will sleep for a moment
    def work():
        sleep(1)
        print('The task is done.')
Next, let’s define a custom callback function to be called once the task has finished executing.
Recall that the custom callback function must take the Future object as an argument.
Forgetting the Future object in the signature of your function will cause a TypeError when the callback is invoked once the task is done. The error is logged rather than raised in your program, so it is easy to miss.
    # callback function to call when a task is completed
    def custom_callback(future):
        print('The custom callback was called.')
Next, we can create a process pool using the context manager and call submit() to put the task into the process pool for execution.
This will return a Future object, which we will require in a moment to add our callback function.
    ...
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute the task
        future = executor.submit(work)
Next, we can register the custom callback function with the task by calling the add_done_callback() function on the Future object and passing it the name of our custom callback function.
    ...
    # add the custom callback
    future.add_done_callback(custom_callback)
That’s it.
Once the task is complete, we expect our custom callback function to be called.
Tying this together, the complete example of adding a custom callback to a task in a ProcessPoolExecutor is listed below.
    # SuperFastPython.com
    # example of adding a callback to a task in the process pool
    from time import sleep
    from concurrent.futures import ProcessPoolExecutor

    # callback function to call when a task is completed
    def custom_callback(future):
        print('The custom callback was called.')

    # mock task that will sleep for a moment
    def work():
        sleep(1)
        print('The task is done.')

    # entry point
    if __name__ == '__main__':
        # create a process pool
        with ProcessPoolExecutor() as executor:
            # execute the task
            future = executor.submit(work)
            # add the custom callback
            future.add_done_callback(custom_callback)
        # wait for all tasks to complete...
Running the example first creates the process pool and submits the task.
The custom callback is then added to the task via its Future object.
The task reports a message and then completes. The callback is then called and reports a second message.
    The task is done.
    The custom callback was called.
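The same mechanism can be used to report progress across many tasks, one of the motivations listed earlier. The sketch below is one possible approach, not the only one: the ProgressTracker class and the work() task are hypothetical names. A lock protects the counter because callbacks may run in a different thread from the main program.

```python
from threading import Lock
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# hypothetical helper that counts completed tasks
class ProgressTracker:
    def __init__(self, total):
        self.total = total
        self.completed = 0
        self._lock = Lock()

    # callback: called once for each task that is done
    def task_done(self, future):
        with self._lock:
            self.completed += 1
            print(f'{self.completed}/{self.total} tasks done')

# hypothetical task that sleeps for a moment
def work(seconds):
    sleep(seconds)

# entry point
if __name__ == '__main__':
    tracker = ProgressTracker(total=4)
    with ProcessPoolExecutor() as executor:
        for _ in range(4):
            future = executor.submit(work, 0.2)
            # a bound method is a valid callback; it still receives the future
            future.add_done_callback(tracker.task_done)
```

The tracker lives in the main process, so the counter does not need to be shared with the worker processes.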
Example Callback For Tasks That Raise an Exception
The callback is still called, even if the task raises an exception.
We can demonstrate this by updating the task to raise an Exception, for example:
    # task that will sleep for a moment
    def work():
        sleep(1)
        raise Exception('Something bad')
        print('The task is done.')
Tying this together, the complete example of adding a callback for a task known to raise an exception is listed below.
    # SuperFastPython.com
    # example of adding a callback to a task in the process pool
    from time import sleep
    from concurrent.futures import ProcessPoolExecutor

    # callback function to call when a task is completed
    def custom_callback(future):
        print('The custom callback was called.')

    # task that will sleep for a moment
    def work():
        sleep(1)
        raise Exception('Something bad')
        print('The task is done.')

    # entry point
    if __name__ == '__main__':
        # create a process pool
        with ProcessPoolExecutor() as executor:
            # execute the task
            future = executor.submit(work)
            # add the custom callback
            future.add_done_callback(custom_callback)
        # wait for all tasks to complete...
Running the example, the process pool is created and the task is submitted as per normal.
The task runs then raises an exception before completion.
The task is marked done internally, and the callback function is called.
    The custom callback was called.
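Note that the exception itself is not reported in the output above; it is stored on the Future. A callback can retrieve it with the exception() function, as in the sketch below. The describe_outcome() helper and report_outcome() callback are hypothetical names added for illustration.

```python
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# build a message describing how a finished (not cancelled) task ended
def describe_outcome(future):
    # exception() returns the exception raised by the task, or None
    error = future.exception()
    if error is not None:
        return f'Task failed: {error}'
    return f'Task result: {future.result()}'

# callback function to call when a task is completed
def report_outcome(future):
    print(describe_outcome(future))

# task that will sleep for a moment and then fail
def work():
    sleep(1)
    raise Exception('Something bad')

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        future = executor.submit(work)
        future.add_done_callback(report_outcome)
```

This sketch does not handle cancelled tasks; calling exception() on a cancelled Future raises a CancelledError, so check cancelled() first if tasks may be cancelled.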
Takeaways
You now know how to add a callback to tasks in the ProcessPoolExecutor.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.