Last Updated on September 12, 2022
You must handle exceptions when using the ThreadPoolExecutor in Python.
Exceptions may be raised when initializing worker threads, in target task threads, and in callback functions once tasks are completed.
In this tutorial, you will discover how to handle exceptions in a Python thread pool.
Let’s get started.
ThreadPoolExecutor Exception Handling
Exception handling is an important consideration when using threads.
Code will raise an exception when something unexpected happens and the exception should be dealt with by your application explicitly, even if it means logging it and moving on.
Python threads are well suited for use with IO-bound tasks, and operations within these tasks often raise exceptions, such as if a server cannot be reached, if the network goes down, if a file cannot be found, and so on.
There are three points you may need to consider with regard to exception handling when using the ThreadPoolExecutor; they are:
- Thread Initialization
- Task Execution
- Task Completion Callbacks
Let’s take a closer look at each point in turn.
Exception Handling in Thread Initialization
You can specify a custom initialization function when configuring your ThreadPoolExecutor.
This is set via the initializer argument, which takes the function itself, and the initargs argument, which takes a tuple of arguments to pass to that function.
Each worker thread started by the thread pool will call your initialization function once, before it begins executing tasks.
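As a minimal sketch of how these two arguments fit together, the example below passes a single value through initargs and stores it in thread-local storage. The names init_worker, worker_state, and the 'job' prefix are hypothetical and chosen only for illustration.

```python
# sketch: a worker initializer that receives its arguments via initargs
from threading import local
from concurrent.futures import ThreadPoolExecutor

# thread-local storage: each worker thread sees its own attributes
worker_state = local()

# hypothetical initializer, called once in each new worker thread
def init_worker(prefix):
    worker_state.prefix = prefix

# hypothetical task that reads the state prepared by the initializer
def task(identifier):
    return f'{worker_state.prefix}-{identifier}'

# create a thread pool and pass one argument to the initializer
with ThreadPoolExecutor(max_workers=2, initializer=init_worker, initargs=('job',)) as executor:
    results = list(executor.map(task, range(3)))
print(results)
```

Note that initargs must be a tuple, so a single argument needs a trailing comma.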
If your initialization function raises an exception, it will break your thread pool.
Tasks that are waiting in the pool will not run, and their Future objects will raise a BrokenThreadPool exception when you ask for the result. Any later attempt to submit a task to the pool will also raise a BrokenThreadPool exception.
We can demonstrate this with an example of a contrived initializer function that raises an exception.
```python
# SuperFastPython.com
# example of an exception in a thread pool initializer function
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# function for initializing the worker thread
def initializer_worker():
    # raises an exception
    raise Exception('Something bad happened!')

# a mock task that sleeps for a random amount of time less than one second
def task(identifier):
    sleep(random())
    # return the identifier of the task
    return identifier

# create a thread pool
with ThreadPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
    # execute tasks
    for result in executor.map(task, range(10)):
        print(result)
```
Running the example fails with an exception, as we expected.
The thread pool is created as per normal, but as soon as we try to execute tasks, new worker threads are created and the custom worker thread initialization function is called and raises an exception.
Multiple threads attempted to start, and in turn, multiple threads failed with an Exception, each of which was logged. Finally, a BrokenThreadPool exception was raised in the main thread when it attempted to retrieve the results, reporting that the pool is broken and cannot be used any longer.
```
Exception in initializer:
Traceback (most recent call last):
...
    raise Exception('Something bad happened!')
Exception: Something bad happened!
Exception in initializer:
Traceback (most recent call last):
...
    raise Exception('Something bad happened!')
Exception: Something bad happened!
Traceback (most recent call last):
...
concurrent.futures.thread.BrokenThreadPool: A thread initializer failed, the thread pool is not usable anymore
```
This highlights that if you use a custom initializer function, you must carefully consider the exceptions that may be raised and perhaps handle them; otherwise, you put at risk every task that depends on the thread pool.
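One way to protect the pool, sketched below, is to catch exceptions inside the initializer and record the failure so that tasks can react to it. This is only an illustration under assumptions: open_resource(), safe_initializer(), and worker_state are hypothetical names, and whether skipping the work is acceptable depends on your application.

```python
# sketch: an initializer that handles its own exceptions
from threading import local
from concurrent.futures import ThreadPoolExecutor

# thread-local storage for per-worker state
worker_state = local()

# hypothetical setup step that always fails in this sketch
def open_resource():
    raise ConnectionError('Resource unavailable')

# initializer that never lets an exception escape
def safe_initializer():
    try:
        worker_state.resource = open_resource()
    except Exception as e:
        # record the failure instead of breaking the thread pool
        worker_state.resource = None
        print(f'Initializer failed: {e}')

# task that checks whether initialization succeeded
def task(identifier):
    if worker_state.resource is None:
        return f'task {identifier} skipped'
    return f'task {identifier} done'

# the pool remains usable even though initialization failed
with ThreadPoolExecutor(max_workers=2, initializer=safe_initializer) as executor:
    results = list(executor.map(task, range(4)))
print(results)
```

The trade-off is that every task must now check the recorded state, so the failure is moved rather than removed.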
Exception Handling in Task Execution
An exception may occur while executing your task.
This will cause the task to stop executing, but will not break the thread pool. Instead, the exception will be caught by the thread pool and will be available via the exception() method of the Future object associated with the task.
Alternatively, the exception will be re-raised if you call result() on the Future in order to get the result. This applies to tasks issued with both submit() and map(); with map(), the exception is re-raised when the result of the failed task is retrieved from the returned iterator.
It means that you have two options for handling exceptions in tasks; they are:
- Handle exceptions within the task function.
- Handle exceptions when getting results from tasks.
Exception Handling Within the Task
Handling the exception within the task means that you need some mechanism to let the recipient of the result know that something unexpected happened.
This could be via the return value from the function, e.g. None.
Alternatively, you can re-raise an exception and have the recipient handle it directly. A third option might be to use some broader state or global state, perhaps passed by reference into the call to the function.
The example below defines a work task that will raise an exception but will catch the exception and return a result indicating a failure case.
```python
# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# mock task that will sleep for a moment
def work():
    sleep(1)
    try:
        raise Exception('Something bad happened!')
    except Exception:
        return 'Unable to get the result'
    return "never gets here"

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute our task
    future = executor.submit(work)
    # get the result from the task
    result = future.result()
    print(result)
```
Running the example starts the thread pool as per normal, issues the task, then blocks waiting for the result.
The task raises an exception and the result received is an error message.
This approach is reasonably clean for the recipient code and would be appropriate for tasks issued by both submit() and map(). It may require special handling of a custom return value for the failure case.
```
Unable to get the result
```
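The special handling of a custom return value mentioned above could take many forms. One possible convention, sketched here as an assumption rather than a standard API, is for the task to return an (ok, payload) tuple so that the recipient can tell a failure from a genuine result, even when using map().

```python
# sketch: report success or failure from a task via an (ok, payload) tuple
from concurrent.futures import ThreadPoolExecutor

# hypothetical task that fails for even values
def work(value):
    try:
        if value % 2 == 0:
            raise ValueError(f'Cannot process {value}')
        return (True, value * 10)
    except Exception as e:
        # return the exception as data rather than raising it
        return (False, e)

# issue all tasks with map(), which never sees a raised exception
with ThreadPoolExecutor() as executor:
    outcomes = list(executor.map(work, range(4)))

# the recipient inspects the flag before using the payload
for ok, payload in outcomes:
    if ok:
        print(f'Result: {payload}')
    else:
        print(f'Failed: {payload}')
```

Returning the exception object itself preserves its type and message for the recipient, which a bare None would lose.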
Exception Handling Outside the Task with submit()
An alternative to handling the exception in the task is to leave the responsibility to the recipient of the result.
This may feel like a more natural solution, as it matches the synchronous version of the same operation, e.g. if we were performing the function call in a for loop.
It means that the recipient must be aware of the types of errors that the task may raise and handle them explicitly.
The example below defines a simple task that raises an Exception, which is then handled by the recipient when attempting to get the result from the function call.
```python
# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# mock task that will sleep for a moment
def work():
    sleep(1)
    raise Exception('Something bad happened!')
    return "never gets here"

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute our task
    future = executor.submit(work)
    # get the result from the task
    try:
        result = future.result()
    except Exception:
        print('Unable to get the result')
```
Running the example creates the thread pool and submits the work as per normal. The task fails with an Exception, the thread pool catches the exception, stores it, then re-raises it when we call the result() method on the Future.
The recipient of the result accepts the exception and catches it, reporting a failure case.
```
Unable to get the result
```
We can also check for the exception directly via a call to the exception() method on the Future object. This method blocks until the task is done and takes an optional timeout, just like a call to result().
If the task completes successfully, then exception() will return a value of None. If the task was cancelled, then exception() will raise a CancelledError instead.
We can demonstrate the explicit checking for an exceptional case in the task in the example below.
```python
# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# mock task that will sleep for a moment
def work():
    sleep(1)
    raise Exception('Something bad happened!')
    return "never gets here"

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute our task
    future = executor.submit(work)
    # get the result from the task
    exception = future.exception()
    # handle exceptional case
    if exception:
        print(exception)
    else:
        result = future.result()
        print(result)
```
Running the example creates the thread pool and submits the work as per normal.
The recipient checks for the exceptional case, which blocks until the task is done. An exception is received and is handled by reporting it.
```
Something bad happened!
```
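The other outcomes of a call to exception() can be sketched in the same way. The example below is a contrived illustration: it uses a pool with a single worker and a threading.Event (named gate here, a hypothetical name) so that a second task is still waiting in the queue and can be cancelled reliably.

```python
# sketch: exception() for a successful task and for a cancelled task
from threading import Event
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import CancelledError

# event used to hold the first task until we are ready
gate = Event()

# task that occupies the only worker until the gate is opened
def blocker():
    gate.wait()
    return 'ok'

# task that is cancelled before it gets a chance to run
def never_runs():
    return 'unreachable'

outcomes = []
with ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(blocker)
    queued = executor.submit(never_runs)
    # the second task is still pending, so it can be cancelled
    cancelled = queued.cancel()
    # release the first task so it can complete successfully
    gate.set()
    # a task that succeeded has no exception
    outcomes.append(running.exception())
    # a task that was cancelled raises CancelledError
    try:
        queued.exception()
    except CancelledError:
        outcomes.append('cancelled')
print(cancelled, outcomes)
```

A task that is already running cannot be cancelled, which is why the sketch keeps the only worker busy while the second task is cancelled.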
Exception Handling Outside the Task with map()
What about tasks submitted via map()?
We cannot check the exception() method of the Future object for each task, as map() does not provide access to Future objects.
Worse still, handling the exception in the recipient is of limited use with map(). The exception is re-raised while iterating the results, which ends the iteration, so the results of any remaining tasks cannot be retrieved. The only option is to wrap the entire iteration in a try-except block.
We can demonstrate this by submitting one task with map() that happens to raise an Exception.
The complete example is listed below.
```python
# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# mock task that will sleep for a moment
def work(value):
    sleep(1)
    raise Exception('Something bad happened!')
    return f'Never gets here {value}'

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute our task
    for result in executor.map(work, [1]):
        print(result)
```
Running the example submits the single task (a bad use for map()) and waits for the first result.
The task raises an exception and the main thread exits, as we expected.
```
Traceback (most recent call last):
...
    raise Exception('Something bad happened!')
Exception: Something bad happened!
```
This highlights that if map() is used to submit tasks to the thread pool, then the tasks should handle their own exceptions or be simple enough that exceptions are not expected.
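If the tasks cannot handle their own exceptions and you still need a result for every task in order, one alternative is to issue the tasks with submit() and keep the Future objects in a list. The sketch below assumes a hypothetical work() function that fails for one input; it is an illustration of the idea rather than a drop-in replacement for map().

```python
# sketch: replace map() with submit() to handle exceptions per task
from concurrent.futures import ThreadPoolExecutor

# hypothetical task that fails for one specific value
def work(value):
    if value == 2:
        raise ValueError(f'Cannot process {value}')
    return value * 10

results = []
with ThreadPoolExecutor() as executor:
    # issue one task per value, keeping futures in submission order
    futures = [executor.submit(work, value) for value in range(4)]
    # retrieve results in the same order, handling each failure separately
    for future in futures:
        try:
            results.append(future.result())
        except ValueError as e:
            results.append(f'failed: {e}')
print(results)
```

Unlike the map() example, a failure in one task does not prevent the results of later tasks from being retrieved.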
Exception Handling in Callbacks
A final case we must consider for exception handling when using the ThreadPoolExecutor is in callback functions.
When issuing tasks to the thread pool with a call to submit(), we receive a Future object in return on which we can register callback functions to call when the task completes via the add_done_callback() function.
This allows one or more callback functions to be registered that will be executed in the order in which they are registered.
These callbacks are always called, even if the task is cancelled or fails with an exception.
A callback can fail with an exception and it will not impact the task or the other callback functions that have been registered.
An exception raised by a callback is caught, logged, and ignored, and then the next callback is called. In a sense, callbacks are able to fail silently.
We can demonstrate this with a worked example with multiple callback functions, the first of which will raise an exception.
```python
# SuperFastPython.com
# add callbacks to a future, one of which raises an exception
from time import sleep
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait

# callback function to call when a task is completed
def custom_callback1(future):
    raise Exception('Something bad happened!')
    # never gets here
    print('Callback 1 called.')

# callback function to call when a task is completed
def custom_callback2(future):
    print('Callback 2 called.')

# mock task that will sleep for a moment
def work():
    sleep(1)
    return 'Task is done'

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute the task
    future = executor.submit(work)
    # add the custom callbacks
    future.add_done_callback(custom_callback1)
    future.add_done_callback(custom_callback2)
    # wait for the task to complete and get the result
    result = future.result()
    # wait for callbacks to finish
    sleep(0.1)
    print(result)
```
Running the example starts the thread pool as per normal and executes the task.
When the task completes, the first callback is called, which fails with an exception. The exception is logged and reported on the console (the default behavior for logging).
The thread pool is not broken and carries on.
The second callback is called successfully, and finally, the main thread gets the result of the task.
```
exception calling callback for <Future at 0x101d76730 state=finished returned str>
Traceback (most recent call last):
...
    raise Exception('Something bad happened!')
Exception: Something bad happened!
Callback 2 called.
Task is done
```
This highlights that if a callback may raise an exception, the exception must be handled explicitly within the callback, and its outcome recorded somewhere the rest of your program can check, if you wish the failure to have any effect beyond a log message.
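A minimal sketch of a callback that handles exceptions explicitly is listed below. It records what happened in a shared list (callback_log, a hypothetical name) so that the main thread can inspect the outcome afterwards; your own application might log or retry instead.

```python
# sketch: a callback that checks the task outcome and handles its own errors
from concurrent.futures import ThreadPoolExecutor

# shared record of what each callback observed
callback_log = []

# mock task that fails with an exception
def work():
    raise ValueError('Something bad happened!')

# callback that never lets an exception escape
def checked_callback(future):
    try:
        if future.cancelled():
            callback_log.append('task cancelled')
        elif future.exception() is not None:
            callback_log.append(f'task failed: {future.exception()}')
        else:
            callback_log.append(f'task result: {future.result()}')
    except Exception as e:
        # handle any failure in the callback itself
        callback_log.append(f'callback failed: {e}')

# create a thread pool, issue the task and register the callback
with ThreadPoolExecutor() as executor:
    future = executor.submit(work)
    future.add_done_callback(checked_callback)
# the pool has shut down here, so the callback has already been called
print(callback_log)
```

The cancelled() check comes first because calling exception() or result() on a cancelled Future raises a CancelledError.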
Takeaways
You now know how to handle exceptions in the ThreadPoolExecutor in Python.
Do you have any questions about how to handle exceptions?
Ask your question in the comments below and I will do my best to answer.
Artem Orlov says
Hello. Very helpful article. I’m using ThreadPoolExecutor in Databricks notebooks, running them in parallel. Question: how can we get more details from a future, like a description of the work? e.g. Feature N started function X and failed.
Jason Brownlee says
Thanks!
Good question.
The Future API does not provide insight into the details of the task, which I agree is a pain.
A common approach is to create a map of future objects to details of the task they are executing, then look up details in the map using the future object when needed, such as when reporting results or handling errors.
Does that help?
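The approach described in the reply above might look something like the following sketch. The dictionary name future_to_details and the description strings are assumptions made for illustration; any hashable or descriptive detail could be stored against each Future.

```python
# sketch: map each future to a description of the task it is executing
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# hypothetical task that fails for one specific value
def work(value):
    if value == 2:
        raise ValueError('Something bad happened!')
    return value * 10

report = {}
with ThreadPoolExecutor() as executor:
    # build the map of future objects to task details
    future_to_details = {executor.submit(work, v): f'work({v})' for v in range(4)}
    # handle tasks in the order they complete
    for future in as_completed(future_to_details):
        # look up the details for this task
        details = future_to_details[future]
        error = future.exception()
        if error is not None:
            report[details] = f'failed: {error}'
        else:
            report[details] = f'returned {future.result()}'

# report the outcome of each task alongside its details
for details in sorted(report):
    print(f'{details} {report[details]}')
```

Future objects are hashable, which is what allows them to be used as dictionary keys here.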