Last Updated on September 12, 2022
You must handle exceptions when using the multiprocessing.pool.Pool in Python.
Exceptions may be raised when initializing worker processes, in target task functions, and in callback functions once tasks are completed.
In this tutorial you will discover how to handle exceptions in a Python multiprocessing pool.
Let’s get started.
Multiprocessing Pool Exception Handling
Exception handling is an important consideration when using processes.
Code may raise an exception when something unexpected happens and the exception should be dealt with by your application explicitly, even if it means logging it and moving on.
There are three points at which you may need to consider exception handling when using the multiprocessing.pool.Pool:
- Process Initialization
- Task Execution
- Task Completion Callbacks
Let’s take a closer look at each point in turn.
Exception Handling in Worker Initialization
You can specify a custom initialization function when configuring your multiprocessing.pool.Pool.
This can be set via the “initializer” argument to specify the function name and “initargs” to specify a tuple of arguments to the function.
Each worker process started by the process pool will call your initialization function before it begins executing tasks.
For example:
# worker process initialization function
def worker_init():
    # ...
    ...

# create a process pool and initialize workers
pool = multiprocessing.pool.Pool(initializer=worker_init)
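For example, a minimal sketch of passing arguments to the initializer via the "initargs" argument is listed below; the worker_init() function and its level argument are illustrative, not taken from the example above.

# SuperFastPython.com
# minimal sketch of passing arguments to a worker initializer via "initargs"
# (the worker_init() function and the level argument are illustrative)
from multiprocessing.pool import Pool

# worker process initialization function that takes an argument
def worker_init(level):
    # use the argument, e.g. to configure logging in the worker
    print(f'Initializing worker with level={level}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create a process pool and pass a tuple of arguments to the initializer
    with Pool(initializer=worker_init, initargs=('DEBUG',)) as pool:
        pass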
You can learn more about configuring the pool with worker initializer functions in a separate tutorial.
If your initialization function raises an exception it will break your process pool.
We can demonstrate this with an example of a contrived initializer function that raises an exception.
# SuperFastPython.com
# example of an exception raised in the worker initializer function
from time import sleep
from multiprocessing.pool import Pool

# function for initializing the worker process
def init():
    # raise an exception
    raise Exception('Something bad happened!')

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool(initializer=init) as pool:
        # issue a task
        pool.apply(task)
Running the example fails with an exception, as we expected.
The process pool is created and, almost immediately, the internal child worker processes are created and initialized.
Each worker process fails to be initialized, given that the initialization function raises an exception.
The process pool then attempts to start new replacement child workers for each process that failed. These too fail with exceptions.
This process repeats many times until some internal limit is reached and the program exits.
A truncated example of the output is listed below.
Process SpawnPoolWorker-1:
Traceback (most recent call last):
...
    raise Exception('Something bad happened!')
Exception: Something bad happened!
This highlights that if you use a custom initializer function, you must carefully consider the exceptions that may be raised and perhaps handle them, otherwise you put at risk all tasks that depend on the process pool.
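One option, sketched below, is for the initializer to catch and report its own exceptions so the worker process can still start. The setup code shown and the decision to log-and-continue are illustrative assumptions; whether a partially initialized worker is acceptable depends on your application.

# SuperFastPython.com
# minimal sketch of a worker initializer that handles its own exceptions
# (the setup that fails and the log-and-continue policy are illustrative)
from multiprocessing.pool import Pool

# worker process initialization function that catches its own exceptions
def worker_init():
    try:
        # perform some setup that might fail
        raise Exception('Something bad happened!')
    except Exception as e:
        # report the failure and continue so the worker can still run tasks
        print(f'Worker initialization failed: {e}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create a process pool with the defensive initializer
    with Pool(initializer=worker_init) as pool:
        pass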
Exception Handling in Task Execution
An exception may occur while executing your task.
This will cause the task to stop executing, but will not break the process pool.
If tasks were issued with a synchronous function, such as apply(), map(), or starmap(), the exception will be re-raised in the caller.
If tasks are issued with an asynchronous function such as apply_async(), map_async(), or starmap_async(), an AsyncResult object will be returned. If a task issued asynchronously raises an exception, it will be caught by the process pool and re-raised when you call the get() function on the AsyncResult object to retrieve the result.
This means that you have two options for handling exceptions in tasks:
- Handle exceptions within the task function.
- Handle exceptions when getting results from tasks.
Let’s take a closer look at each approach in turn.
Exception Handling Within the Task
Handling the exception within the task means that you need some mechanism to let the recipient of the result know that something unexpected happened.
This could be via the return value from the function, e.g. None.
Alternatively, you can re-raise an exception and have the recipient handle it directly. A third option might be to use some broader state or global state, perhaps passed by reference into the call to the function.
The example below defines a work task that will raise an exception, but will catch the exception and return a result indicating a failure case.
# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    try:
        raise Exception('Something bad happened!')
    except Exception:
        return 'Unable to get the result'
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issue a task
        result = pool.apply_async(task)
        # get the result
        value = result.get()
        # report the result
        print(value)
Running the example starts the process pool as per normal, issues the task, then blocks waiting for the result.
The task raises an exception and the result received is an error message.
This approach is reasonably clean for the recipient code and would be appropriate for tasks issued by both synchronous and asynchronous functions like apply(), apply_async() and map().
It may require special handling of a custom return value for the failure case.
Unable to get the result
Exception Handling Outside the Task
An alternative to handling the exception in the task is to leave the responsibility to the recipient of the result.
This may feel like a more natural solution, as it matches the synchronous version of the same operation, e.g. if we were performing the function call in a for-loop.
It means that the recipient must be aware of the types of errors that the task may raise and handle them explicitly.
The example below defines a simple task that raises an Exception, which is then handled by the recipient when issuing the task asynchronously and then attempting to get the result from the returned AsyncResult object.
# SuperFastPython.com
# example of handling an exception raised within a task in the caller
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    # fail with an exception
    raise Exception('Something bad happened!')
    # unreachable return value
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issue a task
        result = pool.apply_async(task)
        # get the result
        try:
            value = result.get()
            # report the result
            print(value)
        except Exception:
            print('Unable to get the result')
Running the example creates the process pool and submits the work as per normal.
The task fails with an exception, the process pool catches the exception, stores it, then re-raises it when we call the get() function on the AsyncResult object.
The recipient of the result catches the exception and reports a failure case.
Unable to get the result
This approach will also work for any task issued synchronously to the process pool.
In this case, the exception raised by the task is caught by the process pool and re-raised in the caller when getting the result.
The example below demonstrates handling an exception in the caller for a task issued synchronously.
# SuperFastPython.com
# example of handling an exception raised within a task in the caller
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    # fail with an exception
    raise Exception('Something bad happened!')
    # unreachable return value
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        try:
            # issue a task and get the result
            value = pool.apply(task)
            # report the result
            print(value)
        except Exception:
            print('Unable to get the result')
Running the example creates the process pool and issues the work as per normal.
The task fails with an error, the process pool catches the exception, stores it, then re-raises it in the caller rather than returning the value.
The recipient of the result catches the exception and reports a failure case.
Unable to get the result
Check for a Task Exception
We can also check for the exception directly via a call to the successful() function on the AsyncResult object for tasks issued asynchronously to the process pool.
This function must be called after the task has finished and indicates whether the task finished normally (True) or whether it failed with an Exception or similar (False).
We can demonstrate the explicit checking for an exceptional case in the task in the example below.
# SuperFastPython.com
# example of checking for an exception raised in the task
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    # fail with an exception
    raise Exception('Something bad happened!')
    # unreachable return value
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issue a task
        result = pool.apply_async(task)
        # wait for the task to finish
        result.wait()
        # check for a failure
        if result.successful():
            # get the result
            value = result.get()
            # report the result
            print(value)
        else:
            # report the failure case
            print('Unable to get the result')
Running the example creates and submits the task as per normal.
The recipient waits for the task to complete then checks for an unsuccessful case.
The failure of the task is identified and an appropriate message is reported.
Unable to get the result
Exception Handling When Calling map()
We may issue many tasks to the process pool using the synchronous version of the map() function or starmap().
One or more of the issued tasks may fail, which will effectively cause all issued tasks to fail as the results will not be accessible.
We can demonstrate this with an example, listed below.
# SuperFastPython.com
# exception in one of many tasks issued to the process pool synchronously
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(value):
    # block for a moment
    sleep(1)
    # check for failure case
    if value == 2:
        raise Exception('Something bad happened!')
    # report a value
    return value

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issues tasks to the process pool
        for result in pool.map(task, range(5)):
            print(result)
Running the example creates the process pool and issues 5 tasks using map().
One of the 5 tasks fails with an exception.
The exception is then re-raised in the caller instead of returning the list of return values.
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
...
Exception: Something bad happened!
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
...
Exception: Something bad happened!
This also happens when issuing tasks using the asynchronous versions of map(), such as map_async().
The example below demonstrates this.
# SuperFastPython.com
# exception in one of many tasks issued to the process pool asynchronously
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(value):
    # block for a moment
    sleep(1)
    # check for failure case
    if value == 2:
        raise Exception('Something bad happened!')
    # report a value
    return value

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issues tasks to the process pool
        result = pool.map_async(task, range(5))
        # iterate over the results
        for value in result.get():
            print(value)
Running the example creates the process pool and issues 5 tasks using map_async().
One of the 5 tasks fails with an exception.
The exception is then re-raised in the caller when get() is called on the AsyncResult object, instead of returning the list of return values.
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
...
Exception: Something bad happened!
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
...
Exception: Something bad happened!
If we issue tasks with imap() and imap_unordered(), the exception is not re-raised in the caller until the return value for the specific task that failed is requested from the returned iterator.
The example below demonstrates this.
# SuperFastPython.com
# exception in one of many tasks issued to the process pool with imap()
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(value):
    # block for a moment
    sleep(1)
    # check for failure case
    if value == 2:
        raise Exception('Something bad happened!')
    # report a value
    return value

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issues tasks to the process pool
        for result in pool.imap(task, range(5)):
            print(result)
Running the example creates the process pool and issues 5 tasks using imap().
One of the 5 tasks fails with an exception.
We see return values for the first two tasks that complete successfully.
Then, when we access the result for the third task that failed, the exception is re-raised in the caller and the program is terminated.
0
1
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
...
Exception: Something bad happened!
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
...
Exception: Something bad happened!
These examples highlight that if map() or equivalents are used to issue tasks to the process pool, then the tasks should handle their own exceptions or be simple enough that exceptions are not expected.
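One common pattern for handling exceptions inside the task, sketched below, is to catch them and return the exception object as the result, so that a single failure does not make the other map() results inaccessible. The type check in the caller is an assumption of this sketch, not code from the examples above.

# SuperFastPython.com
# minimal sketch of a task that returns exceptions as values so one
# failure does not make the other map() results inaccessible
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process that catches its own exceptions
def task(value):
    try:
        # block for a moment
        sleep(1)
        # check for failure case
        if value == 2:
            raise Exception('Something bad happened!')
        # report a value
        return value
    except Exception as e:
        # return the exception object instead of raising it
        return e

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issue tasks and check each result for a failure
        for result in pool.map(task, range(5)):
            if isinstance(result, Exception):
                print(f'Task failed: {result}')
            else:
                print(result)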
Exception Handling in Task Completion Callbacks
A final case we must consider for exception handling when using the multiprocessing.Pool is in callback functions.
When issuing tasks to the process pool asynchronously with a call to apply_async() or map_async() we can add a callback function to be called with the result of the task or a callback function to call if there was an error in the task.
For example:
# result callback function
def result_callback(result):
    print(result, flush=True)

...
# issue a single task
result = pool.apply_async(..., callback=result_callback)
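A related option is the "error_callback" argument to apply_async() and map_async(), which registers a function to be called with the exception raised by a failed task. The sketch below uses illustrative task and handler functions.

# SuperFastPython.com
# minimal sketch of registering an error callback
# (the work() and handle_error() functions are illustrative)
from multiprocessing.pool import Pool

# error callback function, called with the exception raised by the task
def handle_error(error):
    print(f'Task failed: {error}', flush=True)

# task executed in a worker process
def work():
    # fail with an exception
    raise Exception('Something bad happened!')

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issue a task with an error callback
        result = pool.apply_async(work, error_callback=handle_error)
        # wait for the task to finish
        result.wait()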
You can learn more about using callback functions with asynchronous tasks in a separate tutorial.
The callback function is executed in a helper thread in the main process, the same process that creates the process pool.
If an exception is raised in the callback function, it will break the helper thread and in turn break the process pool.
Any tasks waiting for a result from the process pool will wait forever and will have to be killed manually.
We can demonstrate this with a worked example.
# SuperFastPython.com
# example of an exception raised in a callback function for the process pool
from time import sleep
from multiprocessing.pool import Pool

# callback function
def handler(result):
    # report result
    print(f'Got result {result}', flush=True)
    # fail with an exception
    raise Exception('Something bad happened!')

# task executed in a worker process
def task():
    # block for a moment
    sleep(1)
    # return a value
    return 22

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issue a task to the process pool
        result = pool.apply_async(task, callback=handler)
        # wait for the task to finish
        result.wait()
Running the example starts the process pool as per normal and issues the task.
When the task completes, the callback function is called, which fails with a raised exception.
The helper thread (Thread-3 in this case) unwinds and breaks the process pool.
The caller in the main thread of the main process then waits forever for the result.
Note, you must terminate the program forcefully by pressing Control-C.
Got result 22
Exception in thread Thread-3:
Traceback (most recent call last):
...
Exception: Something bad happened!
This highlights that if callbacks are expected to raise an exception, they must be handled explicitly, otherwise you put the entire process pool at risk.
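One defensive approach, sketched below with an illustrative log-and-continue handler, is to wrap the body of the callback in a try/except so that no exception can escape and break the helper thread.

# SuperFastPython.com
# minimal sketch of a callback that handles its own exceptions so it
# cannot break the process pool (the handler body is illustrative)
from multiprocessing.pool import Pool

# callback function that never lets an exception escape
def handler(result):
    try:
        # report result, then fail with an exception for demonstration
        print(f'Got result {result}', flush=True)
        raise Exception('Something bad happened!')
    except Exception as e:
        # report the failure and continue
        print(f'Callback failed: {e}', flush=True)

# task executed in a worker process
def task():
    # return a value
    return 22

# protect the entry point
if __name__ == '__main__':
    # create a process pool
    with Pool() as pool:
        # issue a task with the defensive callback
        result = pool.apply_async(task, callback=handler)
        # wait for the task to finish
        result.wait()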
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to handle exceptions when using the multiprocessing.Pool.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by engin akyurt on Unsplash