ThreadPool Exception Handling in Python

October 15, 2022 Python ThreadPool

You must handle exceptions when using the ThreadPool in Python.

Exceptions may be raised when initializing worker threads, in target task threads, and in callback functions once tasks are completed.

In this tutorial, you will discover how to handle exceptions in a Python ThreadPool.

Let's get started.

ThreadPool Exception Handling

Exception handling is an important consideration when using threads.

Code may raise an exception when something unexpected happens, and your application should deal with the exception explicitly, even if that only means logging it and moving on.

Python threads are well suited to IO-bound tasks, and operations within these tasks often raise exceptions, for example if a server cannot be reached, the network goes down, or a file cannot be found.

There are three points you may need to consider regarding exception handling when using the ThreadPool:

  1. Worker Initialization
  2. Task Execution
  3. Task Completion Callbacks

Let's take a closer look at each point in turn.

Exception Handling in Worker Initialization

You can specify a custom initialization function when configuring your ThreadPool.

This can be set via the "initializer" argument to specify the function name and "initargs" to specify a tuple of arguments to the function.

Each worker thread started by the ThreadPool will call your initialization function once, before it begins executing tasks.

For example:

# worker thread initialization function
def worker_init():
    # ...
    pass

...
# create a thread pool and initialize workers
pool = ThreadPool(initializer=worker_init)

If your initialization function raises an exception, it will break your ThreadPool.

We can demonstrate this with an example of a contrived initializer function that raises an exception.

# SuperFastPython.com
# example of an exception raised in the worker initializer function
from time import sleep
from multiprocessing.pool import ThreadPool

# function for initializing the worker thread
def init():
    # raise an exception
    raise Exception('Something bad happened!')

# task executed in a worker thread
def task():
    # block for a moment
    sleep(1)

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool(initializer=init) as pool:
        # issue a task
        pool.apply(task)

Running the example fails with an exception, as we expected.

The ThreadPool is created and, nearly immediately, the internal worker threads are created and initialized.

Each worker thread fails to initialize because the initialization function raises an exception.

The ThreadPool then attempts to start a replacement worker thread for each thread that was started and failed. These too fail with exceptions.

The problem repeats many times until some internal limit is reached and the program exits.

A truncated example of the output is listed below.

Exception in thread Exception in thread Thread-2:
Traceback (most recent call last):
  ...
Exception in thread Thread-3:
Traceback (most recent call last):
  ...
Thread-1:
Traceback (most recent call last):
  ...
...

This highlights that if you use a custom initializer function, you must carefully consider the exceptions that may be raised and perhaps handle them, otherwise you put at risk all tasks that depend on the ThreadPool.
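One way to act on this advice is to catch exceptions inside the initializer itself. The listing below is a minimal sketch, not a definitive pattern: risky_setup() is a hypothetical stand-in for real initialization work, and the pool size of 2 is an arbitrary choice for the example.

```python
# sketch: a worker initializer that handles its own exceptions
from multiprocessing.pool import ThreadPool

# hypothetical setup step that fails (stands in for real initialization work)
def risky_setup():
    raise Exception('Something bad happened!')

# worker initializer that catches and reports the failure instead of propagating it
def init():
    try:
        risky_setup()
    except Exception as e:
        print(f'Worker initialization failed: {e}')

# task executed in a worker thread
def task():
    return 'Task done'

# create the pool, issue a task, and return the result
def main():
    with ThreadPool(2, initializer=init) as pool:
        return pool.apply(task)

# protect the entry point
if __name__ == '__main__':
    print(main())
```

Because the exception never leaves the initializer, the worker threads start normally and the task runs. The trade-off is that workers continue with whatever state the failed setup left behind, so tasks may need to check that initialization actually succeeded.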

Exception Handling in Task Execution

An exception may occur while executing your task.

This will cause the task to stop executing, but will not break the ThreadPool.

If tasks are issued with a synchronous function, such as apply(), map(), or starmap(), the exception will be re-raised in the caller.

If tasks are issued with an asynchronous function, such as apply_async(), map_async(), or starmap_async(), an AsyncResult object will be returned. If a task issued asynchronously raises an exception, it will be caught by the ThreadPool and re-raised when you call the get() function on the AsyncResult object to get the result.

This means that you have two options for handling exceptions in tasks:

  1. Handle exceptions within the task function.
  2. Handle exceptions when getting results from tasks.

Let's take a closer look at each approach in turn.

Exception Handling Within the Task

Handling the exception within the task means that you need some mechanism to let the recipient of the result know that something unexpected happened.

This could be via the return value from the function, e.g. None.

Alternatively, you can re-raise an exception and have the recipient handle it directly. A third option might be to use some broader state or global state, perhaps passed by reference into the call to the function.

The example below defines a work task that will raise an exception but will catch the exception and return a result indicating a failure case.

# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task():
    # block for a moment
    sleep(1)
    try:
        raise Exception('Something bad happened!')
    except Exception:
        return 'Unable to get the result'
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        # issue a task
        result = pool.apply_async(task)
        # get the result
        value = result.get()
        # report the result
        print(value)

Running the example starts the ThreadPool as per normal, issues the task, then blocks waiting for the result.

The task raises an exception and the result received is an error message.

This approach is reasonably clean for the recipient code and would be appropriate for tasks issued by both synchronous and asynchronous functions like apply(), apply_async() and map().

It may require special handling of a custom return value for the failure case.

Unable to get the result

Exception Handling Outside the Task

An alternative to handling the exception in the task is to leave the responsibility to the recipient of the result.

This may feel like a more natural solution, as it matches the synchronous version of the same operation, e.g. if we were performing the function call in a for-loop.

It means that the recipient must be aware of the types of errors that the task may raise and handle them explicitly.

The example below defines a simple task that raises an Exception, which is then handled by the recipient when issuing the task asynchronously and then attempting to get the result from the returned AsyncResult object.

# SuperFastPython.com
# example of handling an exception raised within a task in the caller
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task():
    # block for a moment
    sleep(1)
    # fail with an exception
    raise Exception('Something bad happened!')
    # unreachable return value
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        # issue a task
        result = pool.apply_async(task)
        # get the result
        try:
            value = result.get()
            # report the result
            print(value)
        except Exception:
            print('Unable to get the result')

Running the example creates the ThreadPool and submits the work as per normal.

The task fails with an exception. The ThreadPool catches the exception, stores it, then re-raises it when we call the get() function on the AsyncResult object.

The recipient of the result accepts the exception and catches it, reporting a failure case.

Unable to get the result

This approach will also work for any task issued synchronously to the ThreadPool.

In this case, the exception raised by the task is caught by the ThreadPool and re-raised in the caller when getting the result.

The example below demonstrates handling an exception in the caller for a task issued synchronously.

# SuperFastPython.com
# example of handling an exception raised within a task in the caller
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task():
    # block for a moment
    sleep(1)
    # fail with an exception
    raise Exception('Something bad happened!')
    # unreachable return value
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        try:
            # issue a task and get the result
            value = pool.apply(task)
            # report the result
            print(value)
        except Exception:
            print('Unable to get the result')

Running the example creates the ThreadPool and issues the work as per normal.

The task fails with an exception. The ThreadPool catches the exception, stores it, then re-raises it in the caller rather than returning the value.

The recipient of the result accepts the exception and catches it, reporting a failure case.

Unable to get the result
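Because the recipient must know which error types a task may raise, it can help to catch specific exception types before falling back to a general handler. The listing below is a sketch under assumptions: the file-reading task, the load() helper, and the file path are all hypothetical and chosen only so that a FileNotFoundError is raised.

```python
# sketch: handling specific exception types in the caller
from multiprocessing.pool import ThreadPool

# task executed in a worker thread, fails if the file does not exist
def task(path):
    with open(path) as handle:
        return handle.read()

# hypothetical helper that issues the task and maps failures to messages
def load(pool, path):
    try:
        return pool.apply(task, (path,))
    except FileNotFoundError:
        # the specific failure we anticipated
        return 'File not found'
    except Exception as e:
        # anything else the task may have raised
        return f'Unexpected failure: {e}'

# create the pool and try to load a file that is assumed not to exist
def main():
    with ThreadPool(2) as pool:
        return load(pool, '/no/such/dir/missing.txt')

# protect the entry point
if __name__ == '__main__':
    print(main())
```

The exception raised in the worker thread arrives in the caller with its original type, so the ordering of the except clauses works just as it would for a direct function call.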

Check for a Task Exception

We can also check for the exception directly via a call to the successful() function on the AsyncResult object for tasks issued asynchronously to the ThreadPool.

This function must be called after the task has finished and indicates whether the task finished normally (True) or whether it failed with an Exception or similar (False).

We can demonstrate the explicit checking for an exceptional case in the task in the example below.

# SuperFastPython.com
# example of checking for an exception raised in the task
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task():
    # block for a moment
    sleep(1)
    # fail with an exception
    raise Exception('Something bad happened!')
    # unreachable return value
    return 'Never gets here'

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        # issue a task
        result = pool.apply_async(task)
        # wait for the task to finish
        result.wait()
        # check for a failure
        if result.successful():
            # get the result
            value = result.get()
            # report the result
            print(value)
        else:
            # report the failure case
            print('Unable to get the result')

Running the example creates and submits the task as per normal.

The recipient waits for the task to complete and then checks for an unsuccessful case.

The failure of the task is identified and an appropriate message is reported.

Unable to get the result
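The requirement that successful() is only called on a finished task can be seen directly. The sketch below calls it too early on purpose; it assumes Python 3.7 or later, where a ValueError is raised in that case (earlier versions raise an AssertionError instead).

```python
# sketch: calling successful() before and after the task has finished
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task():
    # block for a moment, then fail
    sleep(1)
    raise Exception('Something bad happened!')

# issue the task and check its status before and after it finishes
def main():
    with ThreadPool(2) as pool:
        result = pool.apply_async(task)
        try:
            # the task is still sleeping, so the result is not ready
            result.successful()
            early = 'finished'
        except ValueError:
            # raised on Python 3.7+ when the result is not ready
            early = 'not ready'
        # wait for the task to finish, then check again
        result.wait()
        return early, result.ready(), result.successful()

# protect the entry point
if __name__ == '__main__':
    print(main())
```

Calling wait() first, or checking ready(), avoids the early-call error.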

Exception Handling When Calling map()

We may issue many tasks to the ThreadPool using the synchronous version of the map() function or starmap().

If one or more of the issued tasks fails, the results of all issued tasks are effectively lost, because the exception is re-raised in place of the return values.

We can demonstrate this with an example, listed below.

# SuperFastPython.com
# exception in one of many tasks issued to the thread pool synchronously
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(value):
    # block for a moment
    sleep(1)
    # check for failure case
    if value == 2:
        raise Exception('Something bad happened!')
    # report a value
    return value

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        # issues tasks to the thread pool
        for result in pool.map(task, range(5)):
            print(result)

Running the example creates the ThreadPool and issues 5 tasks using map().

One of the 5 tasks fails with an exception.

The exception is then re-raised in the caller instead of returning the list of return values.

Traceback (most recent call last):
  ...
Exception: Something bad happened!

This also happens when issuing tasks using the asynchronous versions of map(), such as map_async().

The example below demonstrates this.

# SuperFastPython.com
# exception in one of many tasks issued to the thread pool asynchronously
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(value):
    # block for a moment
    sleep(1)
    # check for failure case
    if value == 2:
        raise Exception('Something bad happened!')
    # report a value
    return value

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        # issues tasks to the thread pool
        result = pool.map_async(task, range(5))
        # iterate over the results
        for value in result.get():
            print(value)

Running the example creates the ThreadPool and issues 5 tasks using map_async().

One of the 5 tasks fails with an exception.

The exception is then re-raised in the caller when get() is called, instead of returning the list of return values.

Traceback (most recent call last):
  ...
Exception: Something bad happened!

If we issue tasks with imap() and imap_unordered(), the exception is not re-raised in the caller until the return value for the specific task that failed is requested from the returned iterator.

The example below demonstrates this.

# SuperFastPython.com
# exception in one of many tasks issued to the thread pool synchronously
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(value):
    # block for a moment
    sleep(1)
    # check for failure case
    if value == 2:
        raise Exception('Something bad happened!')
    # report a value
    return value

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        # issues tasks to the thread pool
        for result in pool.imap(task, range(5)):
            print(result)

Running the example creates the ThreadPool and issues 5 tasks using imap().

One of the 5 tasks fails with an exception.

We see return values for the first two tasks that complete successfully.

Then, when we access the result for the third task that failed, the exception is re-raised in the caller and the program is terminated.

0
1
Traceback (most recent call last):
  ...
Exception: Something bad happened!
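Since imap() only re-raises the exception when the failed task's result is requested, the caller can step through the iterator manually and keep going after a failure. The listing below is a sketch that relies on the observed behaviour of the iterator in CPython with the default chunksize; treat it as an assumption rather than a documented guarantee.

```python
# sketch: continuing past a failed task when iterating imap() results
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(value):
    # check for failure case
    if value == 2:
        raise Exception('Something bad happened!')
    return value

# step through the results one at a time, recording failures
def main():
    outcomes = []
    with ThreadPool(2) as pool:
        iterator = pool.imap(task, range(5))
        while True:
            try:
                outcomes.append(next(iterator))
            except StopIteration:
                # no more results
                break
            except Exception as e:
                # the task for this position failed, record it and move on
                outcomes.append(f'Failed: {e}')
    return outcomes

# protect the entry point
if __name__ == '__main__':
    print(main())
```

The StopIteration handler must come first, because StopIteration is itself a subclass of Exception.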

These examples highlight that if map() or equivalents are used to issue tasks to the ThreadPool, then the tasks should handle their own exceptions or be simple enough that exceptions are not expected.
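One way for tasks to handle their own exceptions is to wrap the task in a function that catches the exception and returns it as a value. The listing below is a minimal sketch; safe_task() is a hypothetical helper name, and returning the exception object is only one of several possible conventions.

```python
# sketch: a wrapper that returns exceptions so map() keeps every result
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(value):
    # check for failure case
    if value == 2:
        raise Exception('Something bad happened!')
    return value

# hypothetical wrapper that returns the exception instead of raising it
def safe_task(value):
    try:
        return task(value)
    except Exception as e:
        return e

# issue all tasks and collect every outcome
def main():
    with ThreadPool(2) as pool:
        return pool.map(safe_task, range(5))

# protect the entry point
if __name__ == '__main__':
    for outcome in main():
        if isinstance(outcome, Exception):
            print(f'Failed: {outcome}')
        else:
            print(outcome)
```

The caller then checks each item with isinstance() to separate failures from normal return values, and the results of the successful tasks are no longer lost.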

Exception Handling in Task Completion Callbacks

A final case we must consider for exception handling when using the ThreadPool is in callback functions.

When issuing tasks to the ThreadPool asynchronously with a call to apply_async() or map_async(), we can add a callback function to be called with the result of the task (the "callback" argument) or a callback function to be called if there was an error in the task (the "error_callback" argument).

For example:

# result callback function
def result_callback(result):
    print(result)

...
# issue a single task
result = pool.apply_async(..., callback=result_callback)
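The error case works the same way via the error_callback argument, which receives the exception raised by the task. The listing below is a short sketch; the function names and the list used to collect failures are illustrative choices, not part of the ThreadPool API.

```python
# sketch: using an error callback alongside a result callback
from multiprocessing.pool import ThreadPool

# task executed in a worker thread that always fails
def task():
    raise Exception('Something bad happened!')

# issue a task with both callbacks and return the recorded failures
def main():
    failures = []

    # result callback, only called if the task succeeds
    def result_callback(result):
        print(f'Got result {result}')

    # error callback, only called if the task raises an exception
    def error_callback(error):
        failures.append(str(error))

    with ThreadPool(2) as pool:
        result = pool.apply_async(task, callback=result_callback,
                                  error_callback=error_callback)
        # wait for the task and its callback to finish
        result.wait()
    return failures

# protect the entry point
if __name__ == '__main__':
    print(main())
```

Only one of the two callbacks is called for a given task, depending on whether it returned or raised.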

The callback function is executed in a helper thread owned by the ThreadPool (the thread that handles results), not in the worker thread that executed the task.

If an exception is raised in the callback function, it will break the helper thread and in turn break the ThreadPool.

Any caller waiting for a result from the ThreadPool will wait forever, and the program will have to be killed manually.

We can demonstrate this with a worked example.

# SuperFastPython.com
# example of an exception raised in a callback function for the thread pool
from time import sleep
from multiprocessing.pool import ThreadPool

# callback function
def handler(result):
    # report result
    print(f'Got result {result}')
    # fail with an exception
    raise Exception('Something bad happened!')

# task executed in a worker thread
def task():
    # block for a moment
    sleep(1)
    # return a value
    return 22

# protect the entry point
if __name__ == '__main__':
    # create a thread pool
    with ThreadPool() as pool:
        # issue a task to the thread pool
        result = pool.apply_async(task, callback=handler)
        # wait for the task to finish
        result.wait()

Running the example starts the ThreadPool as per normal and issues the task.

When the task completes, the callback function is called which fails with a raised exception.

The helper thread (Thread-11 in this case) unwinds and breaks the ThreadPool.

The caller in the main thread then waits forever for the result.

Note that you must terminate the program forcefully by pressing Control-C.

Got result 22
Exception in thread Thread-11:
Traceback (most recent call last):
  ...
Exception: Something bad happened!

This highlights that if a callback may raise an exception, the exception must be handled explicitly within the callback, otherwise it puts the entire thread pool at risk.
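A callback can protect the pool by catching its own exceptions. The listing below is a minimal sketch of that idea; the raised exception is contrived, and the 10 second timeouts are arbitrary safety margins added for the example.

```python
# sketch: a callback that handles its own exceptions
from multiprocessing.pool import ThreadPool

# callback function that contains any failure
def handler(result):
    try:
        # contrived failure while processing the result
        raise Exception('Something bad happened!')
    except Exception as e:
        print(f'Callback failed for result {result}: {e}')

# task executed in a worker thread
def task():
    return 22

# issue the task and confirm the caller is not left waiting
def main():
    with ThreadPool(2) as pool:
        result = pool.apply_async(task, callback=handler)
        # returns once the task and callback have completed
        result.wait(timeout=10)
        return result.ready(), result.get(timeout=10)

# protect the entry point
if __name__ == '__main__':
    print(main())
```

Because the exception never escapes the callback, the helper thread keeps running, the result is marked as ready, and the caller is released from wait().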

Takeaways

You now know how to handle exceptions when using the ThreadPool.


