Last Updated on September 12, 2022
You must handle exceptions when using the ProcessPoolExecutor in Python.
Exceptions may be raised when initializing worker processes, in target task processes, and in callback functions once tasks are completed.
In this tutorial you will discover how to handle exceptions in a Python process pool.
Let’s get started.
ProcessPoolExecutor Exception Handling
Exception handling is an important consideration when using processes.
Code may raise an exception when something unexpected happens and the exception should be dealt with by your application explicitly, even if it means logging it and moving on.
Although processes are best suited to CPU-bound tasks, operations within tasks can still raise exceptions, such as when a server cannot be reached, the network goes down, or a file cannot be found.
There are three points where you may need to consider exception handling when using the ProcessPoolExecutor:
- Process Initialization
- Task Execution
- Task Completion Callbacks
Let’s take a closer look at each point in turn.
Exception Handling in Process Initialization
You can specify a custom initialization function when configuring your ProcessPoolExecutor.
This can be set via the “initializer” argument to specify the function name and “initargs” to specify a tuple of arguments to the function.
Each worker process started by the process pool will call your initialization function before it executes any tasks.
If your initialization function raises an exception it will break your process pool.
All current tasks and any future tasks executed by the process pool will not run and will raise a BrokenProcessPool exception.
We can demonstrate this with an example of a contrived initializer function that raises an exception.
```python
# SuperFastPython.com
# example of an exception in a process pool initializer function
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# function for initializing the worker process
def initializer_worker():
    # raise an exception
    raise Exception('Something bad happened!', flush=True)

# a task that sleeps for a random amount of time less than one second
def task(identifier):
    sleep(random())
    # get the unique name
    return identifier

# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
        # execute tasks
        for result in executor.map(task, range(10)):
            print(result)

if __name__ == '__main__':
    main()
```
Running the example fails with an exception, as we expected.
The process pool is created as per normal, but as soon as we try to execute tasks, new worker processes are created, the custom worker process initialization function is called and raises an exception.
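Multiple processes attempt to start, and each one fails in the initializer. Note that the error reported is a TypeError rather than the intended Exception, because Exception() does not accept the flush keyword argument passed in the example. Either way, the initializer fails. Finally, the process pool itself reports that the pool is broken and cannot be used any longer.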
```
Exception in initializer:
Traceback (most recent call last):
...
raise Exception('Something bad happened!', flush=True)
TypeError: Exception() takes no keyword arguments
Exception in initializer:
Traceback (most recent call last):
...
raise Exception('Something bad happened!', flush=True)
TypeError: Exception() takes no keyword arguments
Traceback (most recent call last):
...
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
```
This highlights that if you use a custom initializer function, you must carefully consider the exceptions that it may raise and perhaps handle them. Otherwise, you put at risk all tasks that depend on the process pool.
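One way to reduce this risk, sketched here under stated assumptions, is to catch expected errors inside the initializer and fall back to a default, and to catch BrokenProcessPool in the caller as a safety net. The names load_resource() and worker_resource, and the file path, are hypothetical and used only for illustration.

```python
# sketch: a defensive initializer plus a caller that handles a broken pool
import logging
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

# per-process state populated by the initializer (hypothetical name)
worker_resource = None

# hypothetical setup step that may fail, e.g. because the file is missing
def load_resource(path):
    with open(path) as handle:
        return handle.read()

# initializer that handles its own expected errors
def initializer_worker(path):
    global worker_resource
    try:
        worker_resource = load_resource(path)
    except OSError as e:
        # log and fall back to a default instead of breaking the pool
        logging.warning('Initializer could not load %s: %s', path, e)
        worker_resource = ''

# task that uses the per-process state
def task(identifier):
    return identifier, len(worker_resource)

# entry point
def main():
    try:
        with ProcessPoolExecutor(max_workers=2, initializer=initializer_worker,
                initargs=('missing-file.txt',)) as executor:
            for result in executor.map(task, range(3)):
                print(result)
    except BrokenProcessPool as e:
        # reached only if a worker or initializer still fails unexpectedly
        print(f'Process pool is broken: {e}')

if __name__ == '__main__':
    main()
```

Whether a fallback value is acceptable depends on the application. If the tasks cannot run without the resource, letting the pool break and handling BrokenProcessPool in the caller may be the better choice.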
Exception Handling in Task Execution
An exception may occur while executing your task.
This will cause the task to stop executing, but will not break the process pool. Instead, the exception will be caught by the process pool and made available via the exception() function on the Future object associated with the task.
Alternatively, the exception will be re-raised if you call result() on the Future object in order to get the result. This applies to tasks issued with both submit() and map().
This means that you have two options for handling exceptions in tasks:
- Handle exceptions within the task function.
- Handle exceptions when getting results from tasks.
Exception Handling Within the Task
Handling the exception within the task means that you need some mechanism to let the recipient of the result know that something unexpected happened.
This could be via the return value from the function, e.g. None.
Alternatively, you can re-raise an exception and have the recipient handle it directly. A third option might be to use some broader state or global state, perhaps passed by reference into the call to the function.
The example below defines a work task that will raise an exception, but will catch the exception and return a result indicating a failure case.
```python
# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# task that will sleep for a moment
def work():
    sleep(1)
    try:
        raise Exception('Something bad happened!')
    except Exception:
        return 'Unable to get the result'
    return "never gets here"

# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute our task
        future = executor.submit(work)
        # get the result from the task
        result = future.result()
        print(result)

if __name__ == '__main__':
    main()
```
Running the example starts the process pool as per normal, issues the task, then blocks waiting for the result.
The task raises an exception and the result received is an error message.
This approach is reasonably clean for the recipient code and would be appropriate for tasks issued by both submit() and map(). It may require special handling of a custom return value for the failure case.
```
Unable to get the result
```
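A plain string return value can be hard to tell apart from a real result. As a minimal sketch of one alternative (an assumption for illustration, not something the example above requires), the task can return a (success flag, payload) pair that the recipient unpacks. The squaring logic and the negative-input failure are invented for this sketch.

```python
# sketch: a task that reports failure via a structured return value
from concurrent.futures import ProcessPoolExecutor

# task that handles its own exception and returns (ok, payload)
def work(value):
    try:
        if value < 0:
            raise ValueError(f'negative input: {value}')
        return True, value * value
    except ValueError as e:
        # report the failure as data rather than raising
        return False, str(e)

# entry point
def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # map() is safe here because the task never raises ValueError
        for ok, payload in executor.map(work, [1, -2, 3]):
            if ok:
                print(f'Result: {payload}')
            else:
                print(f'Failed: {payload}')

if __name__ == '__main__':
    main()
```

The flag makes the failure case explicit, at the cost of every recipient having to unpack and check it.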
Exception Handling Outside the Task
An alternative to handling the exception in the task is to leave the responsibility to the recipient of the result.
This may feel like a more natural solution, as it matches the synchronous version of the same operation, e.g. if we were performing the function call in a for-loop.
It means that the recipient must be aware of the types of errors that the task may raise and handle them explicitly.
The example below defines a simple task that raises an Exception, which is then handled by the recipient when attempting to get the result from the function call.
```python
# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# task that will sleep for a moment
def work():
    sleep(1)
    raise Exception('Something bad happened!')
    return "never gets here"

# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute our task
        future = executor.submit(work)
        # get the result from the task
        try:
            result = future.result()
        except Exception:
            print('Unable to get the result')

if __name__ == '__main__':
    main()
```
Running the example creates the process pool and submits the work as per normal. The task fails with an error, the process pool catches the exception, stores it, then re-raises it when we call the result() function on the Future object.
The recipient of the result accepts the exception and catches it, reporting a failure case.
```
Unable to get the result
```
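The same idea extends to many tasks. The sketch below, which assumes a hypothetical helper named gather_results() and a contrived task that fails on even inputs, submits several tasks and handles each failure separately as the futures complete.

```python
# sketch: handling exceptions per task when many tasks are submitted
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# contrived task that fails for even inputs
def work(value):
    if value % 2 == 0:
        raise ValueError(f'cannot process {value}')
    return value * 10

# hypothetical helper: split completed futures into results and errors
def gather_results(future_to_arg):
    results, errors = {}, {}
    for future in as_completed(future_to_arg):
        arg = future_to_arg[future]
        try:
            # re-raises the exception from the task, if any
            results[arg] = future.result()
        except ValueError as e:
            errors[arg] = e
    return results, errors

# entry point
def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # map each future back to the argument it was given
        future_to_arg = {executor.submit(work, v): v for v in range(4)}
        results, errors = gather_results(future_to_arg)
    print(f'Results: {results}')
    print(f'Errors: {errors}')

if __name__ == '__main__':
    main()
```

Catching a specific exception type, rather than Exception, means that unexpected errors still surface in the recipient.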
Check for a Task Exception
We can also check for the exception directly via a call to the exception() function on the Future object. This function blocks until the task is done and takes a timeout argument, just like a call to result().
If the task completes successfully without raising an exception, then exception() will return a value of None. If the task was cancelled, a CancelledError is raised instead.
We can demonstrate the explicit checking for an exceptional case in the task in the example below.
```python
# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# mock task that will sleep for a moment
def work():
    sleep(1)
    raise Exception('Something bad happened!')
    return "never gets here"

# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute our task
        future = executor.submit(work)
        # get the result from the task
        exception = future.exception()
        # handle exceptional case
        if exception:
            print(exception)
        else:
            result = future.result()
            print(result)

if __name__ == '__main__':
    main()
```
Running the example creates and submits the work per normal.
The recipient checks for the exceptional case which blocks until an exception is raised or the task is completed. An exception is received and is handled by reporting it.
```
Something bad happened!
```
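A call to exception() can itself raise: a TimeoutError if the timeout elapses before the task is done, and a CancelledError if the task was cancelled. The sketch below wraps these cases in a hypothetical helper named describe_outcome(); the helper and its return strings are assumptions for illustration.

```python
# sketch: checking a future for an exception with a timeout
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import CancelledError
from concurrent.futures import TimeoutError

# mock task that will sleep for a moment and then fail
def work():
    sleep(1)
    raise Exception('Something bad happened!')

# hypothetical helper: summarize the state of a future as a string
def describe_outcome(future, timeout=None):
    try:
        exception = future.exception(timeout=timeout)
    except CancelledError:
        return 'cancelled'
    except TimeoutError:
        return 'still running'
    if exception is not None:
        return f'failed: {exception}'
    return f'ok: {future.result()}'

# entry point
def main():
    with ProcessPoolExecutor() as executor:
        future = executor.submit(work)
        # check without waiting for long, then wait until the task is done
        print(describe_outcome(future, timeout=0.1))
        print(describe_outcome(future))

if __name__ == '__main__':
    main()
```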
Exception Handling When Calling map()
We cannot check the exception() function of the Future object for each task, as map() does not provide access to Future objects.
Worse still, the approach of handling the exception in the recipient cannot be used when using map() to submit tasks, unless you wrap the entire iteration.
We can demonstrate this by submitting one task with map() that happens to raise an Exception.
The complete example is listed below.
```python
# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ProcessPoolExecutor

# mock task that will sleep for a moment
def work(value):
    sleep(1)
    raise Exception('Something bad happened!')
    return f'Never gets here {value}'

# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute our task
        for result in executor.map(work, [1]):
            print(result)

if __name__ == '__main__':
    main()
```
Running the example submits the single task (a bad use for map()) and waits for the first result.
The task raises an exception and the main process exits, as we expected.
```
concurrent.futures.process._RemoteTraceback:
...
Exception: Something bad happened!
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
...
Exception: Something bad happened!
```
This highlights that if map() is used to submit tasks to the process pool, then the tasks should handle their own exceptions or be simple enough that exceptions are not expected.
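If the task function cannot be changed, one possible workaround is to wrap it. The sketch below assumes a hypothetical module-level wrapper named guarded() that calls the task and returns a (result, error) pair, bound to the task with functools.partial so that it can be sent to the worker processes. The failure on the value 2 is contrived.

```python
# sketch: wrapping a task so that map() never sees a raised exception
from functools import partial
from concurrent.futures import ProcessPoolExecutor

# task that fails for one specific input
def work(value):
    if value == 2:
        raise Exception('Something bad happened!')
    return f'Result {value}'

# hypothetical wrapper: call the function and return (result, error)
def guarded(function, value):
    try:
        return function(value), None
    except Exception as e:
        # hand the exception back as data instead of raising it
        return None, e

# entry point
def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # every item yields a pair, so the iteration is not cut short
        for result, error in executor.map(partial(guarded, work), [1, 2, 3]):
            if error is None:
                print(result)
            else:
                print(f'Failed: {error}')

if __name__ == '__main__':
    main()
```

With this approach, one failed task no longer ends the iteration over the remaining results.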
Exception Handling in Task Completion Callbacks
A final case we must consider for exception handling when using the ProcessPoolExecutor is in callback functions.
When issuing tasks to the process pool with a call to submit() we receive a Future object in return on which we can register callback functions to call when the task completes via the add_done_callback() function.
This allows one or more callback functions to be registered that will be executed in the order in which they are registered.
These callbacks are always called, even if the task is cancelled or fails itself with an exception.
A callback can fail with an exception and it will not impact other callback functions that have been registered or the task.
The exception is caught, logged, and otherwise ignored, and the remaining callbacks are still called. In a sense, callbacks are able to fail silently.
We can demonstrate this with a worked example with multiple callback functions, the first of which will raise an exception.
```python
# SuperFastPython.com
# add callbacks to a future, one of which raises an exception
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait

# callback function to call when a task is completed
def custom_callback1(future):
    raise Exception('Something bad happened!')
    # never gets here
    print('Callback 1 called.', flush=True)

# callback function to call when a task is completed
def custom_callback2(future):
    print('Callback 2 called.', flush=True)

# task that will sleep for a moment
def work():
    sleep(1)
    return 'Task is done'

# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute the task
        future = executor.submit(work)
        # add the custom callbacks
        future.add_done_callback(custom_callback1)
        future.add_done_callback(custom_callback2)
        # wait for the task to complete and get the result
        result = future.result()
        # wait for callbacks to finish
        sleep(0.1)
        print(result)

if __name__ == '__main__':
    main()
```
Running the example starts the process pool as per normal and executes the task.
When the task completes, the first callback is called which fails with a raised exception. The exception is logged and reported on the console (the default behavior for logging).
The process pool is not broken and carries on.
The second callback is called successfully, and finally the main process gets the result of the task.
```
exception calling callback for <Future at 0x10dc8a9a0 state=finished returned str>
...
raise Exception('Something bad happened!')
Exception: Something bad happened!
Callback 2 called.
Task is done
```
This highlights that if a callback may raise an exception, the exception must be handled explicitly within the callback, because it will not be propagated to the task or to the code that submitted it.
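As a minimal sketch of a callback that handles its own failures, the example below records the outcome of each task in a list held by the main process. The names outcomes and record_outcome(), and the contrived task, are assumptions for illustration.

```python
# sketch: a callback that handles exceptions explicitly
from concurrent.futures import ProcessPoolExecutor

# outcomes recorded by the callback in the main process (hypothetical name)
outcomes = []

# contrived task that fails for negative inputs
def work(value):
    if value < 0:
        raise ValueError(f'negative input: {value}')
    return value * 2

# callback that inspects the future and never lets an exception escape
def record_outcome(future):
    try:
        if future.cancelled():
            outcomes.append('cancelled')
            return
        exception = future.exception()
        if exception is not None:
            outcomes.append(f'failed: {exception}')
        else:
            outcomes.append(f'ok: {future.result()}')
    except Exception as e:
        # report a failure in the callback itself instead of relying on logging
        outcomes.append(f'callback error: {e!r}')

# entry point
def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        for value in [1, -1]:
            future = executor.submit(work, value)
            future.add_done_callback(record_outcome)
    # the pool has shut down, so both tasks have finished
    print(outcomes)

if __name__ == '__main__':
    main()
```

The order of the recorded outcomes depends on the order in which the tasks complete.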
Takeaways
You now know how to handle exceptions in the ProcessPoolExecutor in Python.
Do you have any questions?
Ask your question in the comments below and I will do my best to answer.