Last Updated on October 29, 2022
You must handle exceptions when using the ThreadPool in Python.
Exceptions may be raised when initializing worker threads, in target task threads, and in callback functions once tasks are completed.
In this tutorial, you will discover how to handle exceptions in a Python ThreadPool.
Let’s get started.
ThreadPool Exception Handling
Exception handling is an important consideration when using threads.
Code may raise an exception when something unexpected happens and the exception should be dealt with by your application explicitly, even if it means logging it and moving on.
Python threads are well suited for use with IO-bound tasks, and operations within these tasks often raise exceptions, such as if a server cannot be reached, if the network goes down, if a file cannot be found, and so on.
There are three points you may need to consider regarding exception handling when using the ThreadPool:
- Worker Initialization
- Task Execution
- Task Completion Callbacks
Let’s take a closer look at each point in turn.
Exception Handling in Worker Initialization
You can specify a custom initialization function when configuring your ThreadPool.
This can be set via the “initializer” argument to specify the function name and “initargs” to specify a tuple of arguments to the function.
Each worker thread started by the ThreadPool will call your initialization function once, before it begins executing tasks.
For example:
```python
# worker thread initialization function
def worker_init():
    # ...

...
# create a thread pool and initialize workers
pool = ThreadPool(initializer=worker_init)
```
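As a minimal sketch of how "initializer" and "initargs" fit together, the example below passes one argument to the initialization function and stores per-thread state that tasks read later. The names local, prefix, and run() are assumptions made for illustration and are not part of the ThreadPool API.

```python
# sketch: passing arguments to a worker initializer via "initargs"
import threading
from multiprocessing.pool import ThreadPool

# thread-local storage, one independent copy per worker thread (hypothetical name)
local = threading.local()

# worker thread initialization function, called once in each new worker thread
def worker_init(prefix):
    # store per-thread state for use by tasks later
    local.label = f'{prefix}-{threading.current_thread().name}'

# task executed in a worker thread
def task(value):
    # use the state prepared by the initializer
    return f'{local.label} processed {value}'

# helper that runs the sketch and returns the task results
def run():
    # create a thread pool and initialize each worker with one argument
    with ThreadPool(2, initializer=worker_init, initargs=('worker',)) as pool:
        return pool.map(task, range(4))

# protect the entry point
if __name__ == '__main__':
    for line in run():
        print(line)
```

Note that "initargs" must be a tuple, so a single argument needs a trailing comma.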
If your initialization function raises an exception, it will break your ThreadPool.
We can demonstrate this with an example of a contrived initializer function that raises an exception.
```python
# SuperFastPython.com
# example of an exception raised in the worker initializer function
from time import sleep
from multiprocessing.pool import ThreadPool

# function for initializing the worker thread
def init():
    # raise an exception
    raise Exception('Something bad happened!')

# task executed in a worker thread
def task():
    # block for a moment
    sleep(1)

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool(initializer=init) as pool:
        # issue a task
        pool.apply(task)
```
Running the example fails with an exception, as we expected.
The ThreadPool is created and, almost immediately, the internal worker threads are started and initialized.
Each worker thread fails to initialize because the initialization function raises an exception.
The ThreadPool then attempts to start a replacement worker thread for each thread that failed. These too fail with exceptions.
The problem repeats many times until some internal limit is reached and the program exits.
A truncated example of the output is listed below.
```
Exception in thread Exception in thread Thread-2:
Traceback (most recent call last):
...
Exception in thread Thread-3:
Traceback (most recent call last):
...
Thread-1:
Traceback (most recent call last):
...
...
```
This highlights that if you use a custom initializer function, you must carefully consider the exceptions that may be raised and perhaps handle them, otherwise you put at risk all tasks that depend on the ThreadPool.
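One way to guard the initializer, sketched below, is to catch the exception inside the initialization function and record the failure so that tasks can check for it. The connect() function, the local storage object, and the run() helper are hypothetical names used only for illustration.

```python
# sketch: guarding a worker initializer so a failure cannot break the pool
import threading
from multiprocessing.pool import ThreadPool

# thread-local storage for per-worker state (hypothetical name)
local = threading.local()

# hypothetical setup step that may fail, e.g. opening a connection
def connect():
    raise Exception('Something bad happened!')

# worker thread initialization function
def init():
    try:
        local.resource = connect()
        local.error = None
    except Exception as e:
        # record the failure instead of letting it escape the initializer
        local.resource = None
        local.error = e

# task executed in a worker thread
def task():
    # report the failure recorded during initialization, if any
    if local.error is not None:
        return f'Worker not initialized: {local.error}'
    return 'Task completed'

# helper that runs the sketch and returns the task result
def run():
    with ThreadPool(2, initializer=init) as pool:
        return pool.apply(task)

# protect the entry point
if __name__ == '__main__':
    print(run())
```

With this design the worker threads start normally, and the decision about what to do with a failed setup moves into the tasks or the caller.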
Exception Handling in Task Execution
An exception may occur while executing your task.
This will cause the task to stop executing, but will not break the ThreadPool.
If tasks are issued with a synchronous function, such as apply(), map(), or starmap(), the exception will be re-raised in the caller.
If tasks are issued with an asynchronous function such as apply_async(), map_async(), or starmap_async(), an AsyncResult object will be returned. If a task issued asynchronously raises an exception, it will be caught by the ThreadPool and re-raised when you call the get() function on the AsyncResult object to retrieve the result.
This means that you have two options for handling exceptions in tasks:
- Handle exceptions within the task function.
- Handle exceptions when getting results from tasks.
Let’s take a closer look at each approach in turn.
Exception Handling Within the Task
Handling the exception within the task means that you need some mechanism to let the recipient of the result know that something unexpected happened.
This could be via the return value from the function, e.g. None.
Alternatively, you can re-raise an exception and have the recipient handle it directly. A third option might be to use some broader state or global state, perhaps passed by reference into the call to the function.
The example below defines a work task that will raise an exception but will catch the exception and return a result indicating a failure case.
```python
# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task():
    # block for a moment
    sleep(1)
    try:
        raise Exception('Something bad happened!')
    except Exception:
        return 'Unable to get the result'
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        # issue a task
        result = pool.apply_async(task)
        # get the result
        value = result.get()
        # report the result
        print(value)
```
Running the example starts the ThreadPool as per normal, issues the task, then blocks waiting for the result.
The task raises an exception and the result received is an error message.
This approach is reasonably clean for the recipient code and would be appropriate for tasks issued by both synchronous and asynchronous functions like apply(), apply_async() and map().
It may require special handling of a custom return value for the failure case.
```
Unable to get the result
```
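The third option mentioned earlier, sharing state by reference, might look like the sketch below. Each task receives a shared list and a lock, records any failure in the list, and returns None. The errors list, the lock, and the run() helper are assumptions made for illustration.

```python
# sketch: reporting task failures through shared state passed by reference
from threading import Lock
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(value, errors, lock):
    try:
        # contrived failure case
        if value == 2:
            raise Exception('Something bad happened!')
        return value
    except Exception as e:
        # record the failure in the shared list, protected by the lock
        with lock:
            errors.append((value, e))
        return None

# helper that runs the sketch and returns the results and recorded errors
def run():
    errors = []
    lock = Lock()
    with ThreadPool() as pool:
        # each task gets its value plus references to the shared state
        args = [(i, errors, lock) for i in range(4)]
        results = pool.starmap(task, args)
    return results, errors

# protect the entry point
if __name__ == '__main__':
    results, errors = run()
    print(results)
    for value, error in errors:
        print(f'Task {value} failed: {error}')
```

This keeps the details of each failure available to the caller, at the cost of passing extra arguments to every task.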
Exception Handling Outside the Task
An alternative to handling the exception in the task is to leave the responsibility to the recipient of the result.
This may feel like a more natural solution, as it matches the synchronous version of the same operation, e.g. if we were performing the function call in a for-loop.
It means that the recipient must be aware of the types of errors that the task may raise and handle them explicitly.
The example below defines a simple task that raises an Exception, which is then handled by the recipient when issuing the task asynchronously and then attempting to get the result from the returned AsyncResult object.
```python
# SuperFastPython.com
# example of handling an exception raised within a task in the caller
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task():
    # block for a moment
    sleep(1)
    # fail with an exception
    raise Exception('Something bad happened!')
    # unreachable return value
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        # issue a task
        result = pool.apply_async(task)
        # get the result
        try:
            value = result.get()
            # report the result
            print(value)
        except Exception:
            print('Unable to get the result')
```
Running the example creates the ThreadPool and submits the work as per normal.
The task fails with an exception. The ThreadPool catches the exception, stores it, then re-raises it when we call the get() function on the AsyncResult object.
The recipient of the result accepts the exception and catches it, reporting a failure case.
```
Unable to get the result
```
This approach will also work for any task issued synchronously to the ThreadPool.
In this case, the exception raised by the task is caught by the ThreadPool and re-raised in the caller when getting the result.
The example below demonstrates handling an exception in the caller for a task issued synchronously.
```python
# SuperFastPython.com
# example of handling an exception raised within a task in the caller
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task():
    # block for a moment
    sleep(1)
    # fail with an exception
    raise Exception('Something bad happened!')
    # unreachable return value
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        try:
            # issue a task and get the result
            value = pool.apply(task)
            # report the result
            print(value)
        except Exception:
            print('Unable to get the result')
```
Running the example creates the ThreadPool and issues the work as per normal.
The task fails with an error, the ThreadPool catches the exception, stores it, then re-raises it in the caller rather than returning the value.
The recipient of the result accepts the exception and catches it, reporting a failure case.
```
Unable to get the result
```
Check for a Task Exception
We can also check for the exception directly via a call to the successful() function on the AsyncResult object for tasks issued asynchronously to the ThreadPool.
This function must be called after the task has finished, otherwise it raises an error itself (a ValueError in recent Python versions). It indicates whether the task finished normally (True) or failed with an exception (False).
We can demonstrate the explicit checking for an exceptional case in the task in the example below.
```python
# SuperFastPython.com
# example of checking for an exception raised in the task
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task():
    # block for a moment
    sleep(1)
    # fail with an exception
    raise Exception('Something bad happened!')
    # unreachable return value
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        # issue a task
        result = pool.apply_async(task)
        # wait for the task to finish
        result.wait()
        # check for a failure
        if result.successful():
            # get the result
            value = result.get()
            # report the result
            print(value)
        else:
            # report the failure case
            print('Unable to get the result')
```
Running the example creates and submits the task as per normal.
The recipient waits for the task to complete and then checks for an unsuccessful case.
The failure of the task is identified and an appropriate message is reported.
```
Unable to get the result
```
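The same check can be applied when many tasks are issued one at a time with apply_async(). The sketch below, which is one possible arrangement rather than a prescribed pattern, waits for every task and then uses successful() to separate the results that are safe to retrieve from the tasks that failed. The run() helper is a name introduced for illustration.

```python
# sketch: using successful() to separate completed tasks from failed tasks
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(value):
    # contrived failure case
    if value == 2:
        raise Exception('Something bad happened!')
    return value

# helper that runs the sketch and returns the values and the failure count
def run():
    with ThreadPool() as pool:
        # issue each task separately so each has its own AsyncResult
        results = [pool.apply_async(task, args=(i,)) for i in range(5)]
        # wait for all tasks to finish before checking their status
        for result in results:
            result.wait()
        # only call get() on tasks that finished normally
        values = [r.get() for r in results if r.successful()]
        failed = sum(1 for r in results if not r.successful())
    return values, failed

# protect the entry point
if __name__ == '__main__':
    values, failed = run()
    print(values)
    print(f'{failed} task(s) failed')
```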
Exception Handling When Calling map()
We may issue many tasks to the ThreadPool using the synchronous version of the map() function or starmap().
One or more of the issued tasks may fail, which will effectively cause all issued tasks to fail as the results will not be accessible.
We can demonstrate this with an example, listed below.
```python
# SuperFastPython.com
# exception in one of many tasks issued to the thread pool synchronously
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(value):
    # block for a moment
    sleep(1)
    # check for failure case
    if value == 2:
        raise Exception('Something bad happened!')
    # report a value
    return value

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        # issues tasks to the thread pool
        for result in pool.map(task, range(5)):
            print(result)
```
Running the example creates the ThreadPool and issues 5 tasks using map().
One of the 5 tasks fails with an exception.
The exception is then re-raised in the caller instead of returning the list of return values.
```
Traceback (most recent call last):
...
Exception: Something bad happened!
```
This also happens when issuing tasks using the asynchronous versions of map(), such as map_async().
The example below demonstrates this.
```python
# SuperFastPython.com
# exception in one of many tasks issued to the thread pool asynchronously
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(value):
    # block for a moment
    sleep(1)
    # check for failure case
    if value == 2:
        raise Exception('Something bad happened!')
    # report a value
    return value

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        # issues tasks to the thread pool
        result = pool.map_async(task, range(5))
        # iterate over the results
        for value in result.get():
            print(value)
```
Running the example creates the ThreadPool and issues 5 tasks using map_async().
One of the 5 tasks fails with an exception.
The exception is then re-raised in the caller when get() is called, instead of returning the list of return values.
```
Traceback (most recent call last):
...
Exception: Something bad happened!
```
If we issue tasks with imap() or imap_unordered(), the exception is not re-raised in the caller until the return value for the specific task that failed is requested from the returned iterator.
The example below demonstrates this.
```python
# SuperFastPython.com
# exception in one of many tasks issued to the thread pool synchronously
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(value):
    # block for a moment
    sleep(1)
    # check for failure case
    if value == 2:
        raise Exception('Something bad happened!')
    # report a value
    return value

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        # issues tasks to the thread pool
        for result in pool.imap(task, range(5)):
            print(result)
```
Running the example creates the ThreadPool and issues 5 tasks using imap().
One of the 5 tasks fails with an exception.
We see return values for the first two tasks that complete successfully.
Then, when we access the result for the third task that failed, the exception is re-raised in the caller and the program is terminated.
```
0
1
Traceback (most recent call last):
...
Exception: Something bad happened!
```
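Because imap() only re-raises the exception when the failed result is requested, it may be possible to keep consuming the remaining results by calling next() on the iterator manually. The sketch below assumes the iterator remains usable after it raises, which matches the behavior of the standard CPython implementation as far as I am aware, but treat it as an assumption rather than a documented guarantee. The run() helper and the outcomes list are names introduced for illustration.

```python
# sketch: continuing to consume imap() results after one task fails
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(value):
    # contrived failure case
    if value == 2:
        raise Exception('Something bad happened!')
    return value

# helper that runs the sketch and returns one outcome per task
def run():
    outcomes = []
    with ThreadPool() as pool:
        iterator = pool.imap(task, range(5))
        while True:
            try:
                # request the next result, re-raising any task exception
                outcomes.append(next(iterator))
            except StopIteration:
                # no more results
                break
            except Exception as e:
                # record the failure and keep going
                outcomes.append(f'failed: {e}')
    return outcomes

# protect the entry point
if __name__ == '__main__':
    for outcome in run():
        print(outcome)
```

The StopIteration handler must come first, because StopIteration is itself a subclass of Exception.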
These examples highlight that if map() or equivalents are used to issue tasks to the ThreadPool, then the tasks should handle their own exceptions or be simple enough that exceptions are not expected.
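One way for tasks to handle their own exceptions without editing each task function is to wrap the function before passing it to map(). The sketch below uses a hypothetical guarded() wrapper that returns the exception object instead of raising it, so every result stays accessible. This works with the ThreadPool because tasks run in threads of the same process, so the wrapped function does not need to be pickled.

```python
# sketch: wrapping a task so that map() returns exceptions instead of raising
from functools import wraps
from multiprocessing.pool import ThreadPool

# hypothetical wrapper that converts a raised exception into a return value
def guarded(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # hand the exception back to the caller as a normal result
            return e
    return wrapper

# task executed in a worker thread
def task(value):
    # contrived failure case
    if value == 2:
        raise Exception('Something bad happened!')
    return value

# helper that runs the sketch and returns the list of results
def run():
    with ThreadPool() as pool:
        return pool.map(guarded(task), range(5))

# protect the entry point
if __name__ == '__main__':
    for result in run():
        if isinstance(result, Exception):
            print(f'Task failed: {result}')
        else:
            print(result)
```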
Exception Handling in Task Completion Callbacks
A final case we must consider for exception handling when using the ThreadPool is in callback functions.
When issuing tasks to the ThreadPool asynchronously with a call to apply_async() or map_async(), we can add a callback function to be called with the result of the task, via the "callback" argument, or a callback function to be called if there was an error in the task, via the "error_callback" argument.
For example:
```python
# result callback function
def result_callback(result):
    print(result)

...
# issue a single task
result = pool.apply_async(..., callback=result_callback)
```
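The error callback works the same way, via the "error_callback" argument, and receives the exception raised by the task. The following is a minimal sketch; the run() helper and the errors list are names introduced for illustration.

```python
# sketch: using an error callback to be notified when a task fails
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task():
    # fail with an exception
    raise Exception('Something bad happened!')

# helper that runs the sketch and returns the exceptions seen by the callback
def run():
    errors = []

    # error callback function, called with the exception raised by the task
    def error_callback(error):
        errors.append(error)

    with ThreadPool() as pool:
        # issue a single task with an error callback
        result = pool.apply_async(task, error_callback=error_callback)
        # wait for the task to finish
        result.wait()
    return errors

# protect the entry point
if __name__ == '__main__':
    for error in run():
        print(f'Task failed: {error}')
```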
The callback function is executed in a helper thread of the ThreadPool that handles results, not in the worker thread that executed the task.
If an exception is raised in the callback function, it will break the helper thread and in turn break the ThreadPool.
Any caller waiting for a result from the ThreadPool will wait forever and the program will have to be killed manually.
We can demonstrate this with a worked example.
```python
# SuperFastPython.com
# example of an exception in a callback function for the thread pool
from time import sleep
from multiprocessing.pool import ThreadPool

# callback function
def handler(result):
    # report result
    print(f'Got result {result}')
    # fail with an exception
    raise Exception('Something bad happened!')

# task executed in a worker thread
def task():
    # block for a moment
    sleep(1)
    # return a value
    return 22

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        # issue a task to the thread pool
        result = pool.apply_async(task, callback=handler)
        # wait for the task to finish
        result.wait()
```
Running the example starts the ThreadPool as per normal and issues the task.
When the task completes, the callback function is called which fails with a raised exception.
The helper thread (Thread-11 in this case) unwinds and breaks the ThreadPool.
The caller in the main thread then waits forever for the result.
Note that you must terminate the program forcefully by pressing Control-C.
```
Got result 22
Exception in thread Thread-11:
Traceback (most recent call last):
...
Exception: Something bad happened!
```
This highlights that if a callback function may raise an exception, the exception must be handled explicitly within the callback, otherwise it puts the entire thread pool at risk.
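A minimal sketch of a guarded callback is shown below. The body of the callback is wrapped in a try-except block so the exception never reaches the helper thread, and the caller's wait() returns normally. The run() helper is a name introduced for illustration.

```python
# sketch: handling an exception inside the callback so the pool keeps working
from multiprocessing.pool import ThreadPool

# callback function
def handler(result):
    try:
        # report result
        print(f'Got result {result}')
        # contrived failure within the callback
        raise Exception('Something bad happened!')
    except Exception as e:
        # handle the failure here so it does not escape the callback
        print(f'Callback failed: {e}')

# task executed in a worker thread
def task():
    # return a value
    return 22

# helper that runs the sketch and returns the task result
def run():
    with ThreadPool() as pool:
        # issue a task to the thread pool
        result = pool.apply_async(task, callback=handler)
        # wait for the task to finish, which no longer blocks forever
        result.wait()
        return result.get()

# protect the entry point
if __name__ == '__main__':
    print(run())
```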
Takeaways
You now know how to handle exceptions when using the ThreadPool.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.