Multiprocessing Pool Exception Handling in Python
You must handle exceptions when using the multiprocessing.pool.Pool in Python.
Exceptions may be raised when initializing worker processes, when executing task functions, and in callback functions once tasks are completed.
In this tutorial you will discover how to handle exceptions in a Python multiprocessing pool.
Let's get started.
Multiprocessing Pool Exception Handling
Exception handling is an important consideration when using processes.
Code may raise an exception when something unexpected happens, and the exception should be dealt with explicitly by your application, even if that means logging it and moving on.
There are three points where you may need to consider exception handling when using the multiprocessing.pool.Pool:
- Process Initialization
- Task Execution
- Task Completion Callbacks
Let's take a closer look at each point in turn.
Exception Handling in Worker Initialization
You can specify a custom initialization function when configuring your multiprocessing.pool.Pool.
This can be set via the “initializer” argument to specify the function name and “initargs” to specify a tuple of arguments to the function.
Each worker process started by the process pool will call your initialization function before it begins executing tasks.
For example:
# worker process initialization function
def worker_init():
    # ...
    ...

# create a process pool and initialize workers
pool = multiprocessing.pool.Pool(initializer=worker_init)
You can learn more about configuring the pool with worker initializer functions in a separate tutorial.
If your initialization function raises an exception it will break your process pool.
We can demonstrate this with an example of a contrived initializer function that raises an exception.
# SuperFastPython.com
# example of an exception raised in the worker initializer function
from time import sleep
from multiprocessing.pool import Pool

# function for initializing the worker process
def init():
    # raise an exception
    raise Exception('Something bad happened!')

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool(initializer=init) as pool:
        # issue a task
        pool.apply(task)
Running the example fails with an exception, as we expected.
The process pool is created and, almost immediately, the internal child worker processes are created and initialized.
Each worker process fails to initialize because the initialization function raises an exception.
The process pool then starts new replacement child workers for each process that started and failed. These too fail with exceptions.
This cycle repeats many times until some internal limit is reached and the program exits.
A truncated example of the output is listed below.
Process SpawnPoolWorker-1:
Traceback (most recent call last):
...
raise Exception('Something bad happened!')
Exception: Something bad happened!
This highlights that if you use a custom initializer function, you must carefully consider the exceptions that may be raised and perhaps handle them, otherwise you put at risk all tasks that depend on the process pool.
Exception Handling in Task Execution
An exception may occur while executing your task.
This will cause the task to stop executing, but will not break the process pool.
If tasks were issued with a synchronous function, such as apply(), map(), or starmap(), the exception will be re-raised in the caller.
If tasks are issued with an asynchronous function, such as apply_async(), map_async(), or starmap_async(), an AsyncResult object is returned. If a task issued asynchronously raises an exception, the process pool will catch the exception and re-raise it when you call the get() function on the AsyncResult object to get the result.
This means that you have two options for handling exceptions in tasks:
- Handle exceptions within the task function.
- Handle exceptions when getting results from tasks.
Let's take a closer look at each approach in turn.
Exception Handling Within the Task
Handling the exception within the task means that you need some mechanism to let the recipient of the result know that something unexpected happened.
This could be via the return value from the function, e.g. None.
Alternatively, you can raise an exception and have the recipient handle it directly. A third option is to use some broader shared or global state, perhaps passed by reference into the call to the function.
The example below defines a task that will raise an exception, but catches the exception and returns a result indicating the failure case.
# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    try:
        raise Exception('Something bad happened!')
    except Exception:
        return 'Unable to get the result'
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issue a task
        result = pool.apply_async(task)
        # get the result
        value = result.get()
        # report the result
        print(value)
Running the example starts the process pool as per normal, issues the task, then blocks waiting for the result.
The task raises an exception and the result received is an error message.
This approach is reasonably clean for the recipient code and would be appropriate for tasks issued by both synchronous and asynchronous functions like apply(), apply_async() and map().
It may require special handling of a custom return value for the failure case.
Unable to get the result
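A common variant of this approach is to return None as a sentinel value rather than a custom message string, and have the recipient check for it. A minimal sketch, using a made-up task that multiplies its argument and fails for one specific input:

```python
# a sketch of using None as a sentinel for a failed task
from multiprocessing.pool import Pool

# task that catches its own exception and returns a sentinel
def task(value):
    try:
        if value == 2:
            raise Exception('Something bad happened!')
        return value * 10
    except Exception:
        # None signals the failure case to the recipient
        return None

# protect the entry point
if __name__ == '__main__':
    with Pool() as pool:
        for result in pool.map(task, range(4)):
            # the recipient must explicitly check for the sentinel
            if result is None:
                print('Task failed')
            else:
                print(result)
```

Note that this only works if None is not a legitimate return value of the task; otherwise a distinct sentinel object is needed.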
Exception Handling Outside the Task
An alternative to handling the exception in the task is to leave the responsibility to the recipient of the result.
This may feel like a more natural solution, as it matches the synchronous version of the same operation, e.g. if we were performing the function call in a for-loop.
It means that the recipient must be aware of the types of errors that the task may raise and handle them explicitly.
The example below defines a simple task that raises an Exception, which is then handled by the recipient when issuing the task asynchronously and then attempting to get the result from the returned AsyncResult object.
# SuperFastPython.com
# example of handling an exception raised within a task in the caller
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    # fail with an exception
    raise Exception('Something bad happened!')
    # unreachable return value
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issue a task
        result = pool.apply_async(task)
        # get the result
        try:
            value = result.get()
            # report the result
            print(value)
        except Exception:
            print('Unable to get the result')
Running the example creates the process pool and submits the work as per normal.
The task fails with an exception; the process pool catches the exception, stores it, then re-raises it when we call the get() function on the AsyncResult object.
The recipient of the result catches the exception and reports the failure case.
Unable to get the result
This approach will also work for any task issued synchronously to the process pool.
In this case, the exception raised by the task is caught by the process pool and re-raised in the caller when getting the result.
The example below demonstrates handling an exception in the caller for a task issued synchronously.
# SuperFastPython.com
# example of handling an exception raised within a task in the caller
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    # fail with an exception
    raise Exception('Something bad happened!')
    # unreachable return value
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        try:
            # issue a task and get the result
            value = pool.apply(task)
            # report the result
            print(value)
        except Exception:
            print('Unable to get the result')
Running the example creates the process pool and issues the work as per normal.
The task fails with an exception; the process pool catches the exception, stores it, then re-raises it in the caller rather than returning a value.
The recipient of the result catches the exception and reports the failure case.
Unable to get the result
Check for a Task Exception
We can also check for the exception directly via a call to the successful() function on the AsyncResult object for tasks issued asynchronously to the process pool.
This function must be called after the task has finished and indicates whether the task finished normally (True) or whether it failed with an Exception or similar (False).
We can demonstrate the explicit checking for an exceptional case in the task in the example below.
# SuperFastPython.com
# example of checking for an exception raised in the task
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    # fail with an exception
    raise Exception('Something bad happened!')
    # unreachable return value
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issue a task
        result = pool.apply_async(task)
        # wait for the task to finish
        result.wait()
        # check for a failure
        if result.successful():
            # get the result
            value = result.get()
            # report the result
            print(value)
        else:
            # report the failure case
            print('Unable to get the result')
Running the example creates and submits the task as per normal.
The recipient waits for the task to complete then checks for an unsuccessful case.
The failure of the task is identified and an appropriate message is reported.
Unable to get the result
Exception Handling When Calling map()
We may issue many tasks to the process pool using the synchronous version of the map() function or starmap().
One or more of the issued tasks may fail, which will effectively cause all issued tasks to fail as the results will not be accessible.
We can demonstrate this with an example, listed below.
# SuperFastPython.com
# exception in one of many tasks issued to the process pool synchronously
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(value):
    # block for a moment
    sleep(1)
    # check for failure case
    if value == 2:
        raise Exception('Something bad happened!')
    # report a value
    return value

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issue tasks to the process pool
        for result in pool.map(task, range(5)):
            print(result)
Running the example creates the process pool and issues 5 tasks using map().
One of the 5 tasks fails with an exception.
The exception is then re-raised in the caller instead of returning the list of return values.
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
...
Exception: Something bad happened!
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
...
Exception: Something bad happened!
This also happens when issuing tasks using the asynchronous versions of map(), such as map_async().
The example below demonstrates this.
# SuperFastPython.com
# exception in one of many tasks issued to the process pool asynchronously
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(value):
    # block for a moment
    sleep(1)
    # check for failure case
    if value == 2:
        raise Exception('Something bad happened!')
    # report a value
    return value

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.map_async(task, range(5))
        # iterate over the results
        for value in result.get():
            print(value)
Running the example creates the process pool and issues 5 tasks using map_async().
One of the 5 tasks fails with an exception.
The exception is then re-raised in the caller instead of returning the list of return values.
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
...
Exception: Something bad happened!
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
...
Exception: Something bad happened!
If we issue tasks with imap() and imap_unordered(), the exception is not re-raised in the caller until the return value for the specific task that failed is requested from the returned iterator.
The example below demonstrates this.
# SuperFastPython.com
# exception in one of many tasks issued to the process pool with imap()
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(value):
    # block for a moment
    sleep(1)
    # check for failure case
    if value == 2:
        raise Exception('Something bad happened!')
    # report a value
    return value

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issue tasks to the process pool
        for result in pool.imap(task, range(5)):
            print(result)
Running the example creates the process pool and issues 5 tasks using imap().
One of the 5 tasks fails with an exception.
We see return values for the first two tasks that complete successfully.
Then, when we access the result for the third task that failed, the exception is re-raised in the caller and the program is terminated.
0
1
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
...
Exception: Something bad happened!
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
...
Exception: Something bad happened!
These examples highlight that if map() or equivalents are used to issue tasks to the process pool, then the tasks should handle their own exceptions or be simple enough that exceptions are not expected.
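One way to make map() robust, assuming the task functions can be adapted, is to have each task catch its own exception and return the exception object as the result; the caller can then distinguish failures from values with isinstance(). A sketch under that assumption:

```python
# a sketch: return the exception object so one failure does not
# hide the results of the other tasks issued via map()
from multiprocessing.pool import Pool

# task that returns its own exception instead of raising it
def task(value):
    try:
        if value == 2:
            raise Exception('Something bad happened!')
        return value
    except Exception as e:
        return e

# protect the entry point
if __name__ == '__main__':
    with Pool() as pool:
        for result in pool.map(task, range(5)):
            # check each result for the failure case
            if isinstance(result, Exception):
                print(f'Failed: {result}')
            else:
                print(result)
```

This way all five results come back, and only the one failed entry needs special handling by the caller.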
Exception Handling in Task Completion Callbacks
A final case we must consider for exception handling when using the multiprocessing.Pool is in callback functions.
When issuing tasks to the process pool asynchronously with a call to apply_async() or map_async() we can add a callback function to be called with the result of the task or a callback function to call if there was an error in the task.
For example:
# result callback function
def result_callback(result):
    print(result, flush=True)

...
# issue a single task
result = pool.apply_async(..., callback=result_callback)
You can learn more about using callback functions with asynchronous tasks in a separate tutorial.
The callback function is executed in a helper thread in the main process, the same process that creates the process pool.
If an exception is raised in the callback function, it will break the helper thread and in turn break the process pool.
Any tasks waiting for a result from the process pool will wait forever and will have to be killed manually.
We can demonstrate this with a worked example.
# SuperFastPython.com
# example of an exception raised in a callback function for the process pool
from time import sleep
from multiprocessing.pool import Pool

# callback function
def handler(result):
    # report result
    print(f'Got result {result}', flush=True)
    # fail with an exception
    raise Exception('Something bad happened!')

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    # return a value
    return 22

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issue a task to the process pool
        result = pool.apply_async(task, callback=handler)
        # wait for the task to finish
        result.wait()
Running the example starts the process pool as per normal and issues the task.
When the task completes, the callback function is called which fails with a raised exception.
The helper thread (Thread-3 in this case) unwinds and breaks the process pool.
The caller in the main thread of the main process then waits forever for the result.
Note, you must terminate the program forcefully by pressing Control-C.
Got result 22
Exception in thread Thread-3:
Traceback (most recent call last):
...
Exception: Something bad happened!
This highlights that if callbacks are expected to raise an exception, it must be handled explicitly, otherwise it puts the entire process pool at risk.
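One way to handle this explicitly, sketched below rather than prescribed, is to wrap the callback body in a try-except so the helper thread survives. Note that apply_async() also accepts an error_callback argument, but that is invoked for exceptions raised in the task, not for exceptions raised in the result callback itself.

```python
# a sketch of a guarded callback: an exception inside the callback
# is handled locally and cannot break the pool's helper thread
from multiprocessing.pool import Pool

# callback function that handles its own exceptions
def handler(result):
    try:
        print(f'Got result {result}', flush=True)
        # hypothetical post-processing step that fails
        raise Exception('Something bad happened!')
    except Exception as e:
        # report and carry on; the helper thread survives
        print(f'Callback failed: {e}', flush=True)

# task executed in a worker process
def task():
    return 22

# protect the entry point
if __name__ == '__main__':
    with Pool() as pool:
        # issue the task with the guarded callback
        result = pool.apply_async(task, callback=handler)
        # the result is still retrievable and the pool still works
        print(result.get())
```

Running this reports the callback failure and then prints the task's return value, instead of hanging forever as in the example above.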
Takeaways
You now know how to handle exceptions when using the multiprocessing.Pool.
If you enjoyed this tutorial, you will love my book: Python Multiprocessing Pool Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.