How to Handle Exceptions With the ThreadPoolExecutor in Python

November 18, 2021 Python ThreadPoolExecutor

You must handle exceptions when using the ThreadPoolExecutor in Python.

Exceptions may be raised when initializing worker threads, when executing tasks, and in callback functions once tasks are completed.

In this tutorial, you will discover how to handle exceptions in a Python thread pool.

Let's get started.

ThreadPoolExecutor Exception Handling

Exception handling is an important consideration when using threads.

Code raises an exception when something unexpected happens, and your application should deal with that exception explicitly, even if that only means logging it and moving on.

Python threads are well suited for use with IO-bound tasks, and operations within these tasks often raise exceptions, such as if a server cannot be reached, if the network goes down, if a file cannot be found, and so on.

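There are three points you may need to consider with regard to exception handling when using the ThreadPoolExecutor; they are:

  1. Exception handling in thread initialization.
  2. Exception handling in task execution.
  3. Exception handling in callbacks.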

Let's take a closer look at each point in turn.

Exception Handling in Thread Initialization

You can specify a custom initialization function when configuring your ThreadPoolExecutor.

This can be set via the initializer argument, which specifies the function to call, and the initargs argument, which specifies a tuple of arguments to pass to that function.
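As a minimal sketch of how initializer and initargs fit together, the example below stores a value in thread-local storage once per worker thread, and each task then reads it. The names init_worker, thread_data and the 'worker' prefix are hypothetical, chosen only for illustration.

```python
# a minimal sketch: passing arguments to a worker initializer via initargs
import threading
from concurrent.futures import ThreadPoolExecutor

# hypothetical per-thread storage populated by the initializer
thread_data = threading.local()

# the initializer receives the values in initargs as positional arguments
def init_worker(prefix):
    thread_data.prefix = prefix

# each task reads the value stored by its worker thread's initializer
def task(identifier):
    return f'{thread_data.prefix}-{identifier}'

with ThreadPoolExecutor(max_workers=2, initializer=init_worker,
                        initargs=('worker',)) as executor:
    results = list(executor.map(task, range(3)))

print(results)  # ['worker-0', 'worker-1', 'worker-2']
```

Note that initargs must be a tuple, so a single argument needs a trailing comma.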

Each worker thread created by the thread pool will call your initialization function once, before it executes any tasks.

If your initialization function raises an exception, it will break your thread pool.

Tasks still waiting in the queue will not run; their Future objects will instead hold a BrokenThreadPool exception, and any attempt to submit new tasks to the pool will raise BrokenThreadPool immediately.
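To see how a broken pool surfaces in the calling code, here is a minimal sketch that catches BrokenThreadPool, which can be imported from concurrent.futures.thread (Python 3.7 and later). The failing_initializer function is hypothetical. With a single worker, the queued task is failed by the pool rather than by the task itself, and a later submit() is rejected. The pool still logs the "Exception in initializer" traceback to the console.

```python
# a minimal sketch: detecting a broken pool via BrokenThreadPool
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures.thread import BrokenThreadPool

# hypothetical initializer that always fails
def failing_initializer():
    raise RuntimeError('initializer failed')

def task():
    return 'done'

outcomes = []
with ThreadPoolExecutor(max_workers=1, initializer=failing_initializer) as executor:
    future = executor.submit(task)
    try:
        future.result()
    except BrokenThreadPool:
        # the queued task was failed by the pool, task() never ran
        outcomes.append('pending task failed')
    try:
        # the pool now rejects new work immediately
        executor.submit(task)
    except BrokenThreadPool:
        outcomes.append('submit rejected')

print(outcomes)  # ['pending task failed', 'submit rejected']
```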

We can demonstrate this with an example of a contrived initializer function that raises an exception.

# SuperFastPython.com
# example of an exception in a thread pool initializer function
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# function for initializing the worker thread
def initializer_worker():
    # raises an exception
    raise Exception('Something bad happened!')

# a mock task that sleeps for a random amount of time less than one second
def task(identifier):
    sleep(random())
    # return the identifier unchanged
    return identifier

# create a thread pool
with ThreadPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
    # execute tasks
    for result in executor.map(task, range(10)):
        print(result)

Running the example fails with an exception, as we expected.

The thread pool is created as normal, but as soon as we try to execute tasks, new worker threads are created, and each one calls the custom initialization function, which raises an exception.

Multiple worker threads attempted to start and, in turn, each failed with an Exception that the pool logged as "Exception in initializer". Finally, when the main thread tried to retrieve a result, a BrokenThreadPool exception was raised, reporting that the thread pool is not usable anymore.

Exception in initializer:
Traceback (most recent call last):
...
    raise Exception('Something bad happened!')
Exception: Something bad happened!
Exception in initializer:
Traceback (most recent call last):
...
    raise Exception('Something bad happened!')
Exception: Something bad happened!
Traceback (most recent call last):
...
concurrent.futures.thread.BrokenThreadPool: A thread initializer failed, the thread pool is not usable anymore

This highlights that if you use a custom initializer function, you must carefully consider the exceptions it may raise and perhaps handle them; otherwise, you risk breaking the thread pool and every task that depends on it.
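One way to handle them is to catch expected exceptions inside the initializer itself, so the pool is never marked as broken. The sketch below assumes a hypothetical risky_setup() step that fails with a ConnectionError; the initializer logs the failure and returns normally, and tasks still run.

```python
# a minimal sketch: an initializer that handles its own failures
import logging
from concurrent.futures import ThreadPoolExecutor

# hypothetical setup step that may fail, e.g. connecting to a server
def risky_setup():
    raise ConnectionError('server unreachable')

def safe_initializer():
    try:
        risky_setup()
    except ConnectionError:
        # log and return normally so the pool is not marked as broken
        logging.warning('worker setup failed; continuing without it')

def task(identifier):
    return identifier * 2

with ThreadPoolExecutor(max_workers=2, initializer=safe_initializer) as executor:
    results = list(executor.map(task, range(5)))

print(results)  # [0, 2, 4, 6, 8]
```

The trade-off is that tasks must then cope with a worker whose setup did not complete.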

Exception Handling in Task Execution

An exception may occur while executing your task.

This will cause the task to stop executing, but will not break the thread pool. Instead, the thread pool catches the exception and stores it in the Future object associated with the task, where it can be retrieved via the exception() method.

Alternatively, the exception will be re-raised if you call result() on the Future in order to get the result. This applies to tasks issued with both submit() and map().

It means that you have two options for handling exceptions in tasks; they are:

  1. Handle exceptions within the task function.
  2. Handle exceptions when getting results from tasks.

Exception Handling Within the Task

Handling the exception within the task means that you need some mechanism to let the recipient of the result know that something unexpected happened.

This could be via the return value from the function, e.g. None.

Alternatively, you can re-raise an exception and have the recipient handle it directly. A third option might be to use some broader state or global state, perhaps passed by reference into the call to the function.
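The shared-state option can be sketched as follows, assuming a hypothetical work() task that fails on odd inputs. The error list and a lock are passed into each call with functools.partial, and each task records its own failure instead of raising.

```python
# a minimal sketch: tasks record failures in a shared, lock-protected list
import threading
from functools import partial
from concurrent.futures import ThreadPoolExecutor

def work(value, errors, lock):
    try:
        if value % 2:
            # hypothetical failure for odd inputs
            raise ValueError(f'bad value {value}')
        return value
    except ValueError as e:
        # record the failure in the shared list instead of raising it
        with lock:
            errors.append((value, str(e)))
        return None

errors = []
lock = threading.Lock()
with ThreadPoolExecutor() as executor:
    results = list(executor.map(partial(work, errors=errors, lock=lock), range(4)))

print(results)         # [0, None, 2, None]
print(sorted(errors))  # [(1, 'bad value 1'), (3, 'bad value 3')]
```

The lock makes the intent explicit; it also keeps the code correct if the shared state later grows beyond a single append.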

The example below defines a work task that will raise an exception but will catch the exception and return a result indicating a failure case.

# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# mock task that will sleep for a moment
def work():
    sleep(1)
    try:
        raise Exception('Something bad happened!')
    except Exception:
        return 'Unable to get the result'
    return "never gets here"

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute our task
    future = executor.submit(work)
    # get the result from the task
    result = future.result()
    print(result)

Running the example starts the thread pool as normal, issues the task, then blocks waiting for the result.

The task raises an exception and the result received is an error message.

This approach is reasonably clean for the recipient code and would be appropriate for tasks issued by both submit() and map(). It may require special handling of a custom return value for the failure case.

Unable to get the result
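The same return-value approach carries over to map(). In the sketch below, a hypothetical work() task returns None as its failure marker, and the recipient then locates failures by position. This only works if None is never a valid result, which is the kind of special handling mentioned above.

```python
# a minimal sketch: using None as a failure marker with map()
from concurrent.futures import ThreadPoolExecutor

def work(value):
    try:
        # hypothetical failure: dividing by zero when value is 0
        return 10 / value
    except ZeroDivisionError:
        return None

with ThreadPoolExecutor() as executor:
    results = list(executor.map(work, [5, 0, 2]))

print(results)  # [2.0, None, 5.0]
failures = [i for i, r in enumerate(results) if r is None]
print(failures)  # [1]
```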

Exception Handling Outside the Task with submit()

An alternative to handling the exception in the task is to leave the responsibility to the recipient of the result.

This may feel like a more natural solution, as it matches the synchronous version of the same operation, e.g. if we were performing the function call in a for loop.

It means that the recipient must be aware of the types of errors that the task may raise and handle them explicitly.

The example below defines a simple task that raises an Exception, which is then handled by the recipient when attempting to get the result from the function call.

# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# mock task that will sleep for a moment
def work():
    sleep(1)
    raise Exception('Something bad happened!')
    return "never gets here"

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute our task
    future = executor.submit(work)
    # get the result from the task
    try:
        result = future.result()
    except Exception:
        print('Unable to get the result')

Running the example creates the thread pool and submits the work as normal. The task fails with an Exception; the thread pool catches the exception, stores it in the Future, then re-raises it when we call result() on the Future.

The recipient of the result catches the exception and reports a failure case.

Unable to get the result
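When many tasks are submitted, the recipient can apply the same try/except to each Future. The sketch below uses as_completed() and a dict that maps each Future back to its input, so failures can be reported per input. The work() task and the choice of ValueError as the expected error type are assumptions for illustration.

```python
# a minimal sketch: handling per-task exceptions for many submitted tasks
from concurrent.futures import ThreadPoolExecutor, as_completed

def work(value):
    if value == 2:
        # hypothetical expected failure
        raise ValueError(f'invalid value {value}')
    return value * 10

results = {}
failures = {}
with ThreadPoolExecutor() as executor:
    # map each Future back to its input so failures can be reported
    futures = {executor.submit(work, v): v for v in range(4)}
    for future in as_completed(futures):
        value = futures[future]
        try:
            results[value] = future.result()
        except ValueError as e:
            # only the error type we expect is handled here
            failures[value] = str(e)

print(sorted(results.items()))  # [(0, 0), (1, 10), (3, 30)]
print(failures)                 # {2: 'invalid value 2'}
```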

We can also check for the exception directly by calling the exception() method on the Future object. Like result(), this method blocks until the task is done and accepts an optional timeout.

If the task completes successfully without raising an exception, exception() returns None. If the task was cancelled, exception() raises a CancelledError rather than returning a value.

We can demonstrate the explicit checking for an exceptional case in the task in the example below.

# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# mock task that will sleep for a moment
def work():
    sleep(1)
    raise Exception('Something bad happened!')
    return "never gets here"

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute our task
    future = executor.submit(work)
    # get the result from the task
    exception = future.exception()
    # handle exceptional case
    if exception is not None:
        print(exception)
    else:
        result = future.result()
        print(result)

Running the example creates the thread pool and submits the work as normal.

The recipient checks for the exceptional case, which blocks until the task is done. An exception is received and is handled by reporting it.

Something bad happened!
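exception() also accepts a timeout, and on a cancelled Future it raises CancelledError instead of returning. The sketch below shows both cases with a single worker: the hypothetical blocking_work() task waits on an Event, so the first call times out, and the second task is still queued, so it can be cancelled.

```python
# a minimal sketch: exception() with a timeout, and on a cancelled task
import threading
from concurrent.futures import CancelledError, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError

release = threading.Event()

def blocking_work():
    # block until the main thread sets the event
    release.wait()
    return 'done'

outcomes = []
with ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(blocking_work)
    # with a single worker, this task waits in the queue behind the first
    pending = executor.submit(blocking_work)
    try:
        # the first task is not done yet, so this times out
        running.exception(timeout=0.1)
    except FutureTimeoutError:
        outcomes.append('timed out')
    # cancel() succeeds because the second task has not started
    if pending.cancel():
        try:
            pending.exception()
        except CancelledError:
            outcomes.append('cancelled')
    # let the first task finish; it completes without an exception
    release.set()
    outcomes.append(running.exception())

print(outcomes)  # ['timed out', 'cancelled', None]
```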

Exception Handling Outside the Task with map()

What about tasks submitted via map()?

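We cannot call the exception() method on the Future object for each task, as map() does not provide access to the Future objects.

Worse still, handling the exception in the recipient does not work well with map(). The exception is re-raised by the result iterator when the failed task's result is reached, so it can only be caught by wrapping the entire loop, and once it is raised, the iterator stops and the results of any remaining tasks can no longer be retrieved from it.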
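Wrapping the entire iteration can be sketched as follows, assuming a hypothetical work() task that fails for one input. The results yielded before the failure are kept, but the loop ends at the failed task.

```python
# a minimal sketch: wrapping the whole map() iteration in try/except
from concurrent.futures import ThreadPoolExecutor

def work(value):
    # hypothetical failure for one input
    if value == 2:
        raise Exception('Something bad happened!')
    return value

collected = []
with ThreadPoolExecutor() as executor:
    try:
        for result in executor.map(work, range(5)):
            collected.append(result)
    except Exception as e:
        # the iteration ends here; results after the failed task are not yielded
        print(f'Failed: {e}')

print(collected)  # [0, 1]
```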

We can demonstrate this by submitting one task with map() that happens to raise an Exception.

The complete example is listed below.

# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# mock task that will sleep for a moment
def work(value):
    sleep(1)
    raise Exception('Something bad happened!')
    return f'Never gets here {value}'

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute our task
    for result in executor.map(work, [1]):
        print(result)

Running the example submits the single task (a bad use for map()) and waits for the first result.

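The task raises an exception, which is re-raised in the main thread when its result is retrieved from the iterator. The exception is not caught, so the program terminates with a traceback, as we expected.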

Traceback (most recent call last):
...
    raise Exception('Something bad happened!')
Exception: Something bad happened!

This highlights that if map() is used to submit tasks to the thread pool, then the tasks should handle their own exceptions or be simple enough that exceptions are not expected.
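To make existing tasks handle their own exceptions without editing them, one option is a small wrapper. In the sketch below, the hypothetical catch_exceptions() helper returns the exception object in place of a result, so map() never raises and the recipient checks each result with isinstance(). This is ambiguous if a task could legitimately return an exception object.

```python
# a minimal sketch: a wrapper that makes any task handle its own exceptions
from concurrent.futures import ThreadPoolExecutor

def catch_exceptions(task):
    # hypothetical helper: return the exception object instead of raising it
    def wrapper(*args):
        try:
            return task(*args)
        except Exception as e:
            return e
    return wrapper

def work(value):
    if value == 1:
        raise Exception('Something bad happened!')
    return f'Result {value}'

with ThreadPoolExecutor() as executor:
    results = list(executor.map(catch_exceptions(work), range(3)))

for value, result in zip(range(3), results):
    if isinstance(result, Exception):
        print(f'{value}: failed with {result}')
    else:
        print(f'{value}: {result}')
```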

Exception Handling in Callbacks

A final case we must consider for exception handling when using the ThreadPoolExecutor is in callback functions.

When issuing tasks to the thread pool with a call to submit(), we receive a Future object in return on which we can register callback functions to call when the task completes via the add_done_callback() function.

This allows one or more callback functions to be registered that will be executed in the order in which they are registered.


These callbacks are always called, even if the task is cancelled or fails itself with an exception.
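Because callbacks also run for failed tasks, a callback can inspect the Future to report the outcome. In this sketch, the report_outcome() callback checks cancelled() first, since exception() raises CancelledError on a cancelled Future, and an Event is used to wait for the callback instead of a fixed sleep. The names report_outcome, report and callback_done are hypothetical.

```python
# a minimal sketch: a done callback that inspects a failed task
import threading
from concurrent.futures import ThreadPoolExecutor

callback_done = threading.Event()
report = []

def report_outcome(future):
    # check cancelled() first: exception() raises on a cancelled Future
    if future.cancelled():
        report.append('cancelled')
    elif future.exception() is not None:
        report.append(f'failed: {future.exception()}')
    else:
        report.append(f'succeeded: {future.result()}')
    callback_done.set()

def work():
    raise Exception('Something bad happened!')

with ThreadPoolExecutor() as executor:
    future = executor.submit(work)
    future.add_done_callback(report_outcome)
    # wait for the callback rather than sleeping for a fixed time
    callback_done.wait(timeout=5)

print(report)  # ['failed: Something bad happened!']
```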

A callback can fail with an exception and it will not impact other callback functions that have been registered or the task.

The exception is caught by the Future object, logged via the concurrent.futures logger, and the remaining callbacks are still called. In a sense, as far as the task and the caller are concerned, callbacks are able to fail silently.
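Since these failures go through the standard logging module, one option is to attach your own handler to the concurrent.futures logger. The sketch below uses a hypothetical in-memory ListHandler to capture the record. The task waits on an Event until the callback is registered, so the callback runs in the worker thread when the task completes. Because a handler is now present, the traceback is not also printed by logging's fallback handler.

```python
# a minimal sketch: capturing callback errors from the concurrent.futures logger
import logging
import threading
from concurrent.futures import ThreadPoolExecutor

class ListHandler(logging.Handler):
    # hypothetical handler that keeps log records in memory
    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)

handler = ListHandler()
logger = logging.getLogger('concurrent.futures')
logger.addHandler(handler)

registered = threading.Event()

def bad_callback(future):
    raise Exception('Something bad happened!')

def work():
    # wait until the callback has been registered
    registered.wait()
    return 'Task is done'

with ThreadPoolExecutor() as executor:
    future = executor.submit(work)
    future.add_done_callback(bad_callback)
    registered.set()
    result = future.result()

# the with block joined the worker, so the callback has already run
logger.removeHandler(handler)
print(result)
print(handler.records[0].getMessage())
```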

We can demonstrate this with a worked example with multiple callback functions, the first of which will raise an exception.

# SuperFastPython.com
# add callbacks to a future, one of which raises an exception
from time import sleep
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait

# callback function to call when a task is completed
def custom_callback1(future):
    raise Exception('Something bad happened!')
    # never gets here
    print('Callback 1 called.')

# callback function to call when a task is completed
def custom_callback2(future):
    print('Callback 2 called.')

# mock task that will sleep for a moment
def work():
    sleep(1)
    return 'Task is done'

# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute the task
    future = executor.submit(work)
    # add the custom callbacks
    future.add_done_callback(custom_callback1)
    future.add_done_callback(custom_callback2)
    # wait for the task to complete and get the result
    result = future.result()
    # wait for callbacks to finish
    sleep(0.1)
    print(result)

Running the example starts the thread pool as per normal and executes the task.

When the task completes, the first callback is called, which fails with an exception. The exception is logged and reported on the console (by default, when no logging handlers are configured, such messages are written to standard error).

The thread pool is not broken and carries on.

The second callback is called successfully, and finally, the main thread gets the result of the task.

exception calling callback for <Future at 0x101d76730 state=finished returned str>
Traceback (most recent call last):
...
    raise Exception('Something bad happened!')
Exception: Something bad happened!
Callback 2 called.
Task is done

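This highlights that if callbacks may raise an exception, they should handle it themselves. A failure in a callback is only logged; it does not affect the task, its result, or the other callbacks, so you must check for it explicitly if you want the failure to have any further effect.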
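A minimal sketch of a callback that handles its own exceptions: the hypothetical guarded() wrapper runs the real callback and records anything it raises in a shared list, which the main thread can inspect once the pool has shut down.

```python
# a minimal sketch: a callback that handles its own exceptions
import threading
from concurrent.futures import ThreadPoolExecutor

callback_errors = []
lock = threading.Lock()

def guarded(callback):
    # hypothetical wrapper: run a callback and record any exception it raises
    def wrapper(future):
        try:
            callback(future)
        except Exception as e:
            with lock:
                callback_errors.append(e)
    return wrapper

def custom_callback1(future):
    raise Exception('Something bad happened!')

def work():
    return 'Task is done'

with ThreadPoolExecutor() as executor:
    future = executor.submit(work)
    future.add_done_callback(guarded(custom_callback1))
    result = future.result()

# the with block has exited, so all worker threads (and callbacks) have finished
print(result)
print([str(e) for e in callback_errors])  # ['Something bad happened!']
```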

Takeaways

You now know how to handle exceptions in the ThreadPoolExecutor in Python.


