Last Updated on September 12, 2022
You can learn more about asynchronous tasks that fail silently in the multiprocessing pool by adding an error callback, by explicitly getting the task return value, or by issuing the task synchronously.
In this tutorial you will discover how to learn more about asynchronous tasks that fail silently in the Python process pool.
Let’s get started.
Need To Fix Tasks Failing Silently in the Process Pool
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
When issuing tasks to the process pool, it is possible for asynchronous tasks to fail silently.
How can we learn more about and fix tasks that fail silently in the process pool?
How to Learn More About Asynchronous Tasks That Fail Silently
Tasks issued asynchronously to the process pool may fail silently.
For this to happen, the following conditions must be met:
- Those tasks are issued using apply_async(), map_async(), or starmap_async().
- No error callback is being used.
- Results from issued tasks are not being retrieved.
This is a problem because if a task fails silently in the process pool, we may have no way of knowing that it failed.
There are three ways we can learn more about asynchronous tasks that fail silently in the process pool.
They are:
- Add an error callback.
- Explicitly get task results.
- Issue the task synchronously.
Let’s take a closer look at each in turn.
Add An Error Callback
An error callback can be added when issuing tasks asynchronously.
This can be achieved by defining a function that takes a raised error or exception as an argument and handles it in some way.
For example:
```python
# handle raised errors
def handle_error(error):
    print(error, flush=True)
```
We can then configure asynchronous tasks issued with apply_async(), map_async(), or starmap_async() to call the custom function when an error is raised in a task and not handled.
This requires that we set the “error_callback” argument to the name of the custom function.
For example:
```python
...
# issue a task with an error callback
result = pool.apply_async(..., error_callback=handle_error)
```
Now, when an error is raised in an issued task, the custom function will be called, providing more information about the error instead of failing silently.
Explicitly Get Task Results
A way to expose errors raised in asynchronous tasks is to explicitly get the results of issued tasks.
If a task raises an error or exception while executing that was not handled, it will be re-raised when attempting to get the return value from the task.
Recall that when we issue an asynchronous task, we receive an AsyncResult object in return.
We can use the get() function to retrieve the return value of the task in the case of apply_async(), or an iterable of return values for all issued tasks in the case of map_async() and starmap_async().
For example:
```python
...
# issue a task to the process pool
result = pool.apply_async(...)
# get the result of the task
result.get()
```
We can then choose to handle the error, if desired.
This will work, even if your custom task function does not return a value, e.g. it returns None. If it raises an error or exception, it will still be re-raised.
Issue Tasks Synchronously
A final way of exposing an error in an asynchronous task that has failed silently is to change it to be synchronous.
That is, we can use the synchronous version of the function to issue the task instead of the asynchronous function.
For example:
- Use apply() instead of apply_async().
- Use map() instead of map_async().
- Use starmap() instead of starmap_async().
If a task issued synchronously raises an exception, the exception will be re-raised when getting the return value from the function, which happens automatically when using a synchronous method to issue tasks to the process pool.
For example, if a task was issued asynchronously with apply_async(), we can change it to apply() and have the exception re-raised in the calling process.
```python
...
# issue a task synchronously
pool.apply(...)
```
This will work, even if your custom task function does not return a value, e.g. it returns None. If it raises an error or exception, it will still be re-raised.
Now that we know how to learn more about tasks that fail silently in the process pool, let’s look at some worked examples.
Example of Asynchronous Tasks Failing Silently
Before we look at examples of learning more about failed tasks in the process pool, let’s look at an example of a task issued asynchronously that fails silently.
In this example, we will define a custom function to execute a task in the process pool. The task will do some work, then raise an exception. We will issue the task to the process pool asynchronously and wait for it to complete. This will give no indication of whether the task completed successfully or not.
Firstly, we can define a custom task function.
The task will first block for a moment to simulate computational effort, then raise an exception. Finally, it prints a message, but this line of code is never reached.
The task() function below implements this.
```python
# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    # fail
    raise Exception('Something bad happened')
    # report a message
    print(f'Task done', flush=True)
```
We then define a process pool with the default number of worker processes. In this case we use the context manager interface to ensure the process pool closes automatically once we are finished with it.
```python
...
# create and configure the process pool
with Pool() as pool:
    # ...
```
Next, we will issue a single task to the process pool asynchronously via the apply_async() function. The function takes the name of our custom task function and returns an AsyncResult object.
```python
...
# issue an asynchronous task into the process pool
result = pool.apply_async(task)
```
Next, we will wait for the issued task to complete by calling the wait() function on the returned AsyncResult object.
```python
...
# wait for all tasks to finish
result.wait()
```
Finally, we will report a message that all issued tasks are completed.
```python
...
# report done message
print(f'Tasks done.', flush=True)
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of asynchronous tasks failing silently in the process pool
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    # fail
    raise Exception('Something bad happened')
    # report a message
    print(f'Task done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue an asynchronous task into the process pool
        result = pool.apply_async(task)
        # wait for all tasks to finish
        result.wait()
        # report done message
        print(f'Tasks done.', flush=True)
```
Running the example first creates the process pool with the default configuration.
Next, the task is issued to the process pool. The main process then blocks until the task is complete, successfully or otherwise.
The task begins executing, blocks for a moment, then fails by raising an exception.
No indication of the failure is reported.
The task finished unsuccessfully. The main process unblocks and continues on, reporting that all tasks are done.
```
Tasks done.
```
Next, let’s look at how we might learn more about the task that failed silently.
Details of Tasks Failing Silently With an Error Callback
We can learn more about a task that fails silently in the process pool by adding an error callback function.
In this example, we will update the above example and add an error callback.
Firstly, we must define a function that is called when an issued task raises an error or exception.
The function must take an error object and can handle it in some way, such as reporting the error details to standard out.
The callback() function below implements this.
```python
# error callback function
def callback(error):
    print(f'Error: {error}', flush=True)
```
Next, we need to configure the method that issues tasks to use an error callback if needed.
This can be achieved by setting the “error_callback” argument to the name of our custom callback function.
```python
...
# issue an asynchronous task into the process pool
result = pool.apply_async(task, error_callback=callback)
```
And that’s it.
The complete example is listed below.
```python
# SuperFastPython.com
# example of adding an error callback to prevent an asynchronous task from failing silently
from time import sleep
from multiprocessing.pool import Pool

# error callback function
def callback(error):
    print(f'Error: {error}', flush=True)

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    # fail
    raise Exception('Something bad happened')
    # report a message
    print(f'Task done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue an asynchronous task into the process pool
        result = pool.apply_async(task, error_callback=callback)
        # wait for all tasks to finish
        result.wait()
        # report done message
        print(f'Tasks done.', flush=True)
```
Running the example first creates the process pool with the default configuration.
Next, the task is issued to the process pool. The main process then blocks until the task is complete, successfully or otherwise.
The task begins executing, blocks for a moment, then fails by raising an exception.
The error callback function is called, reporting the details of the exception that was raised.
The task finished unsuccessfully. The main process unblocks and continues on, reporting that all tasks are done.
This highlights how we can learn more about a task that fails silently in the process pool.
```
Error: Something bad happened
Tasks done.
```
Next, let’s look at an alternate way of learning more about a task that fails silently by getting the return value of the issued task.
Details of Tasks Failing Silently By Getting Results
We can explore how to learn more about a task that failed silently in the process pool by getting a return value from the task.
This approach is effective even if your target task function does not return a value, e.g. returns None.
We can update the above example where a task fails silently to explicitly get the result from the task which will in turn re-raise any exceptions raised in the task.
This requires that we call the get() function on the AsyncResult object returned when issuing the task asynchronously.
```python
...
# get task result
result.get()
```
This call will re-raise any exception in the target task function.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of getting task results to prevent an asynchronous task from failing silently
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    # fail
    raise Exception('Something bad happened')
    # report a message
    print(f'Task done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue an asynchronous task into the process pool
        result = pool.apply_async(task)
        # wait for all tasks to finish
        result.wait()
        # get task result
        result.get()
        # report done message
        print(f'Tasks done.', flush=True)
```
Running the example first creates the process pool with the default configuration.
Next, the task is issued to the process pool. The main process then blocks until the task is complete, successfully or otherwise.
The task begins executing, blocks for a moment, then fails by raising an exception.
No indication of the failure is reported.
The task finished unsuccessfully. The main process unblocks and continues on.
It then attempts to get the result from the issued task, which re-raises the exception raised in the target task function.
This highlights an alternate way of learning more about a task that failed silently in the process pool.
```
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  ...
Exception: Something bad happened
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  ...
Exception: Something bad happened
```
Next, we will look at learning more about a silently failing task in the process pool by issuing it synchronously.
Details of Tasks Failing Silently By Making Them Synchronous
We can explore how to learn more about a task that fails silently in the process pool by issuing it synchronously instead of asynchronously.
In this example, we can update the above example where the task fails silently to issue the task synchronously. This requires changing the call to apply_async() to apply().
The task will be issued and the call will block until the result of the task is returned.
```python
...
# issue task into the process pool
pool.apply(task)
# report done message
print(f'Tasks done.', flush=True)
```
Importantly, because the result is returned as part of the call, any exceptions raised in the target task function will be re-raised in the calling process.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of making a task synchronous to prevent it from failing silently
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    # fail
    raise Exception('Something bad happened')
    # report a message
    print(f'Task done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue task into the process pool
        pool.apply(task)
        # report done message
        print(f'Tasks done.', flush=True)
```
Running the example first creates the process pool with the default configuration.
Next, the task is issued to the process pool and the main process blocks while the task is executing.
The task begins executing, blocks for a moment, then fails by raising an exception.
The exception is then re-raised in the main process.
This highlights a final alternate way of learning more about a task that failed silently in the process pool.
```
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  ...
Exception: Something bad happened
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  ...
Exception: Something bad happened
```
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to learn more about asynchronous tasks that fail silently in the Python process pool.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.