Last Updated on September 12, 2022
You can specify a custom error callback function when using the apply_async(), map_async(), and starmap_async() functions in the multiprocessing pool via the “error_callback” argument.
In this tutorial you will discover how to use error callback functions with the process pool in Python.
Let’s get started.
Need to Use Error Callbacks with the Process Pool
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
When issuing tasks to the process pool asynchronously, we may need to configure an error callback function. That is, we may need to have a custom function called automatically if an error occurs within an issued task.
How can we use error callback functions with the process pool in Python?
How to Configure an Error Callback Function
The multiprocessing.pool.Pool supports custom callback functions.
Callback functions are called in two situations:
- With the results of a task.
- When an error is raised in a task.
Here, we are only considering error callbacks, which are called when an error is raised in a task.
Error callbacks are supported in the process pool when issuing tasks asynchronously with any of the following functions:
- apply_async(): For issuing a single task asynchronously.
- map_async(): For issuing multiple tasks with a single argument asynchronously.
- starmap_async(): For issuing multiple tasks with multiple arguments asynchronously.
An error callback can be specified via the “error_callback” argument.
The argument accepts a custom function that will be called with the error raised in an asynchronous task.
Note, when issuing multiple tasks, the error callback will be called only once, with the first error raised, not once for every task that raises an error.
The function may have any name you like, as long as it does not conflict with a function name already in use.
If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.
— multiprocessing — Process-based parallelism
For example, if apply_async() is configured with an error callback, then the callback function will be called with the error raised in the task.
# error callback function
def custom_callback(error):
    print(error, flush=True)

...
# issue a single task
result = pool.apply_async(..., error_callback=custom_callback)
Error callbacks should be used to perform a quick action with the error raised by a task in the process pool.
They should not block or execute for an extended period as they will occupy the resources of the process pool while running.
Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.
— multiprocessing — Process-based parallelism
Now that we know how to configure an error callback function, let’s look at some worked examples.
Example of Error Callback Function for apply_async()
We can explore how to use an error callback with the process pool when issuing tasks via the apply_async() function.
In this example we will define a task that generates a random number, reports the number, blocks for a moment, then raises an exception. An error callback function will be defined that receives the error from the task function and reports the details.
Firstly, we can define the error callback function.
The function takes a single error from a target task function and reports it directly.
The handler() function below implements this.
# error callback function
def handler(error):
    print(f'Error: {error}', flush=True)
Next, we can define a target task function.
The function takes a unique integer identifier for the task. It then generates a random number between 0 and 1 and sleeps for a fraction of a second to simulate computational effort. Finally, it raises an exception before it has a chance to return the random value that was generated.
The task() function below implements this.
# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # raise an exception
    raise Exception('Something bad happened')
    # return the generated value
    return value
We then define a process pool with the default number of worker processes. In this case we use the context manager interface to ensure the process pool closes automatically once we are finished with it.
...
# create and configure the process pool
with Pool() as pool:
    # ...
We will then issue the task to the process pool using the apply_async() function, specifying the error callback function to be called with any error raised in the task.
...
# issue tasks to the process pool
result = pool.apply_async(task, args=(0,), error_callback=handler)
Finally, the main process will close the process pool and wait for the issued task to complete.
...
# close the process pool
pool.close()
# wait for all tasks to complete
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of an error callback function for apply_async()
from random import random
from time import sleep
from multiprocessing.pool import Pool

# error callback function
def handler(error):
    print(f'Error: {error}', flush=True)

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # raise an exception
    raise Exception('Something bad happened')
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,), error_callback=handler)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first starts the process pool with the default configuration.
Then the task is issued to the process pool. The main process then closes the pool and then waits for the issued task to complete.
The task function executes, generating a random number, reporting a message, blocking then raising an exception.
The error callback function is then called with the exception instance, and the details are reported.
The task ends and the main process wakes up and continues on, closing the program.
Task 0 executing with 0.5733818665399869
Error: Something bad happened
Next, let’s look at an example of using an error callback function with the map_async() function.
Example of Error Callback Function for map_async()
We can explore how to use an error callback with the process pool when issuing tasks via the map_async() function.
In this example, we will update the previous example to issue multiple calls to the task() function. All tasks will fail, but the error callback function will be called only once, with the first error raised, and its details will be reported.
The error callback function and task function do not need any change and can be used as-is.
In the main process, we will call the map_async() function to issue five calls to the task function with integer values from 0 to 4, configured with the error callback function.
...
# issue tasks to the process pool
result = pool.map_async(task, range(5), error_callback=handler)
And that’s it.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of an error callback function for map_async()
from random import random
from time import sleep
from multiprocessing.pool import Pool

# error callback function
def handler(error):
    print(f'Error: {error}', flush=True)

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # raise an exception
    raise Exception('Something bad happened')
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.map_async(task, range(5), error_callback=handler)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first starts the process pool with the default configuration.
Then the 5 tasks are issued to the process pool. The main process then closes the pool and then waits for the issued tasks to complete.
The tasks execute in the process pool. Each task runs, generating a random number, reporting a message, blocking and then raising an exception.
The error callback function is then called once, with the error raised by the first task that failed. The details of the exception passed in as an argument are then reported.
All tasks end, and the main process wakes up and continues on, closing the program.
Task 0 executing with 0.34212481988546195
Task 1 executing with 0.6263747401615434
Task 2 executing with 0.6189520747536078
Task 3 executing with 0.9796725071490143
Task 4 executing with 0.5299654531937743
Error: Something bad happened
Next, let’s look at an example of using an error callback function with the starmap_async() function.
Example of Error Callback Function for starmap_async()
We can explore how to use an error callback with the process pool when issuing tasks via the starmap_async() function.
In this example, we will update the previous example so that the target task function takes two arguments instead of one. The second argument will be a generated random value that will be passed in instead of generated in the task. A list of arguments will be prepared in the main process that contain the integer identifiers and random values, then tasks with multiple arguments will be issued via the starmap_async() function.
Firstly, we must update the task() function to take the random number as a second argument and to not generate a number as part of the task.
The updated task() function with these changes is listed below.
# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # raise an exception
    raise Exception('Something bad happened')
    # return the generated value
    return value
Next, in the main process we will first create a list of arguments. Each item in the list will be a tuple of arguments for one call to the target task() function, containing an integer and a generated random floating point value.
...
# prepare arguments
items = [(i, random()) for i in range(5)]
We can then issue the 5 tasks via the starmap_async() function, configured with the error callback function to be called if a task fails.
...
# issue tasks to the process pool
result = pool.starmap_async(task, items, error_callback=handler)
And that’s it.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of an error callback function for starmap_async()
from random import random
from time import sleep
from multiprocessing.pool import Pool

# error callback function
def handler(error):
    print(f'Error: {error}', flush=True)

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # raise an exception
    raise Exception('Something bad happened')
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(5)]
        # issue tasks to the process pool
        result = pool.starmap_async(task, items, error_callback=handler)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first starts the process pool with the default configuration.
Then the 5 tasks are issued to the process pool. The main process then closes the pool and then waits for the issued tasks to complete.
The tasks execute in the process pool. Each task runs, reporting a message, blocking and then raising an exception.
All tasks fail by raising an exception.
The error callback function is then called once, with the error raised by the first task that failed, and the details of the exception are reported.
All tasks end, and the main process wakes up and continues on, closing the program.
Task 0 executing with 0.07242882273875217
Task 1 executing with 0.4517291952736481
Task 2 executing with 0.24844159488291695
Task 3 executing with 0.365882805037431
Task 4 executing with 0.0457172378448234
Error: Something bad happened
Next, let’s take a look at the specific process and thread used to execute the error callback function.
Which Process and Thread Executes the Error Callback
We can explore the specific process and thread used to execute the error callback function.
In this example we will define a task that gets and reports the current process and current thread then raises an exception. We expect this to be the main thread of a child worker process. We will also get and report the current process and current thread in the main process, which we expect to be the main thread of the main process. Finally, we will get and report the current process and thread used to execute the error callback.
Firstly, we can define an error callback function that gets and reports the current process and current thread.
Recall that we can access the current process via the multiprocessing.current_process() module function that returns a multiprocessing.Process instance. Similarly, we can access the current thread via the threading.current_thread() function that returns a threading.Thread instance.
This will report the details of the process and thread executing the error callback function, most notably the names and the ids.
The handler() function below implements this.
# error callback function
def handler(error):
    # get the current process
    process = current_process()
    # report the details of the current process
    print(f'Callback Process: {process}', flush=True)
    # get the current thread
    thread = current_thread()
    # report the details of the current thread
    print(f'Callback Thread: {thread}', flush=True)
Next, we can define a target task function that does the same thing. It gets and reports the current process and thread used to execute the task, then raises an exception.
The task() function below implements this.
# task executed in a worker process
def task(identifier):
    # get the current process
    process = current_process()
    # report the details of the current process
    print(f'Task Process: {process}', flush=True)
    # get the current thread
    thread = current_thread()
    # report the details of the current thread
    print(f'Task Thread: {thread}', flush=True)
    # raise an exception
    raise Exception('Something bad happened')
Finally, in the main process, we will create and configure a process pool using the context manager, then issue a single task to the process pool via the apply_async() function, configured with an error callback function.
...
# create and configure the process pool
with Pool() as pool:
    # issue tasks to the process pool
    result = pool.apply_async(task, args=(0,), error_callback=handler)
We will then get and report the details of the current process and thread.
...
# get the current process
process = current_process()
# report the details of the current process
print(f'Main Process: {process}', flush=True)
# get the current thread
thread = current_thread()
# report the details of the current thread
print(f'Main Thread: {thread}', flush=True)
We will then close the process pool and wait for the issued task to complete.
...
# close the process pool
pool.close()
# wait for all tasks to complete
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of reporting the process and thread that executes the error callback function
from threading import current_thread
from multiprocessing import current_process
from multiprocessing.pool import Pool

# error callback function
def handler(error):
    # get the current process
    process = current_process()
    # report the details of the current process
    print(f'Callback Process: {process}', flush=True)
    # get the current thread
    thread = current_thread()
    # report the details of the current thread
    print(f'Callback Thread: {thread}', flush=True)

# task executed in a worker process
def task(identifier):
    # get the current process
    process = current_process()
    # report the details of the current process
    print(f'Task Process: {process}', flush=True)
    # get the current thread
    thread = current_thread()
    # report the details of the current thread
    print(f'Task Thread: {thread}', flush=True)
    # raise an exception
    raise Exception('Something bad happened')

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,), error_callback=handler)
        # get the current process
        process = current_process()
        # report the details of the current process
        print(f'Main Process: {process}', flush=True)
        # get the current thread
        thread = current_thread()
        # report the details of the current thread
        print(f'Main Thread: {thread}', flush=True)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first starts the process pool.
The main process then issues a task to the process pool. It then reports the current process and thread.
In this case, we can see that the program is executed by the “MainProcess” and the “MainThread” as we might expect.
Next, the task is executed.
In this case, we can see that the task is executed by the main thread in a child worker process created with the spawn start method (the default on my system), with the names “SpawnProcess” and “MainThread“.
Finally, the error callback function is executed. In this case, we can see that the callback is executed by a helper thread named “Thread-3” in the main process named “MainProcess”.
This highlights that the error callback is executed in the main process by a helper thread provided by the process pool.
Note, the specific process ids and some of the process names may be different, depending on the specifics of your system.
Main Process: <_MainProcess name='MainProcess' parent=None started>
Main Thread: <_MainThread(MainThread, started 4765982208)>
Task Process: <SpawnProcess name='SpawnPoolWorker-1' parent=42021 started daemon>
Task Thread: <_MainThread(MainThread, started 4638854656)>
Callback Process: <_MainProcess name='MainProcess' parent=None started>
Callback Thread: <Thread(Thread-3, started daemon 123145570770944)>
Next, let’s look at how we might share data with the error callback function.
How to Share Data With the Error Callback Function
We can explore how to share data with the error callback function.
As we discovered in the previous section, both the code where we create the process pool and issue tasks and the error callback function are executed in the main process of the application, at least in these examples.
Therefore, we can define a global variable in the program and have it shared and available to the error callback function.
We will define a global variable with a random floating point value between 0 and 1. We will then access this global variable in the error callback function, then change it. Once all tasks are finished, we will report the global variable again and confirm that the change was reflected.
This will demonstrate how the error callback function can both access and change global variables from the program.
Firstly, we can define an error callback function that declares the global variable named “data“, reports the value, changes it, then reports the changed value.
The handler() function below implements this.
# error callback function
def handler(error):
    global data
    # report shared global data from main process
    print(f'Callback data: {data}', flush=True)
    # change it
    data = random()
    # report changed global data
    print(f'Callback data now: {data}', flush=True)
Next, we can define a target task function that blocks for a moment to simulate computational effort, then raises an exception.
# task executed in a worker process
def task(identifier):
    # block for a moment
    sleep(1)
    # raise an exception
    raise Exception('Something bad happened')
Finally, in the main process we can define a global variable and assign it a random number.
...
# prepare shared global data
data = random()
We can then create the process pool, issue the task configured with the error callback, then wait for the task to complete.
...
# create and configure the process pool
with Pool() as pool:
    # issue tasks to the process pool
    result = pool.apply_async(task, args=(0,), error_callback=handler)
    # close the process pool
    pool.close()
    # wait for all tasks to complete
    pool.join()
We will then report the global variable in order to confirm the changes made in the error callback function are reflected in the main thread of the main process.
...
# report shared global data again
print(f'Main data: {data}', flush=True)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of sharing data with the error callback function
from random import random
from time import sleep
from multiprocessing.pool import Pool

# error callback function
def handler(error):
    global data
    # report shared global data from main process
    print(f'Callback data: {data}', flush=True)
    # change it
    data = random()
    # report changed global data
    print(f'Callback data now: {data}', flush=True)

# task executed in a worker process
def task(identifier):
    # block for a moment
    sleep(1)
    # raise an exception
    raise Exception('Something bad happened')

# protect the entry point
if __name__ == '__main__':
    # prepare shared global data
    data = random()
    print(f'Main data: {data}', flush=True)
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,), error_callback=handler)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
    # report shared global data again
    print(f'Main data: {data}', flush=True)
Running the example first creates the global variable and assigns it a random number, then reports the value.
Next, the process pool is created and the task is issued. The main thread then waits for the task in the process pool to finish.
The task executes, blocking for a second, then raises an exception.
The error callback function is called. It declares the global variable, then reports its value. We can see that the reported value matches the value reported from the main thread. It then assigns a new random value to the global variable and reports its value.
The main thread continues on and reports the current value of the global variable. We can see that the main thread sees the changed value, matching the value that was set in the error callback function.
This highlights how data can be made available to the error callback function and how the callback function may make data available to the main thread.
Note, the results will differ each time the program is run given the use of random numbers.
Main data: 0.4109063218234106
Callback data: 0.4109063218234106
Callback data now: 0.5713657283355823
Main data: 0.5713657283355823
Next, let’s take a closer look at when an error callback function is executed.
When is the Error Callback Executed
We can explore when exactly the error callback function is called.
In this example we will report a message in the main process, in the task, and in the error callback function, then after the task is completed. The order of the reported messages will give an idea of when exactly the error callback function is executed.
Firstly, we can define an error callback function that just reports a message that it is finished.
# error callback function
def handler(error):
    print(f'Error callback done.', flush=True)
Similarly, we can define a task function that reports a message that it is finished, then raises an exception.
# task executed in a worker process
def task(identifier):
    print(f'Task {identifier} done.', flush=True)
    # raise an exception
    raise Exception('Something bad happened')
We can create a process pool and issue five tasks to the pool, configured with the custom error callback function.
...
# create and configure the process pool
with Pool() as pool:
    # issue tasks to the process pool
    result = pool.map_async(task, range(5), error_callback=handler)
We can then report a message, wait for the tasks to complete, then report another message.
...
# report tasks are issued
print(f'Main tasks issued.', flush=True)
# wait for tasks to complete
result.wait()
print(f'Main tasks done.', flush=True)
Finally, the pool can be closed.
...
# close the process pool
pool.close()
# wait for all tasks to complete
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of determining when the error callback is called
from multiprocessing.pool import Pool

# error callback function
def handler(error):
    print(f'Error callback done.', flush=True)

# task executed in a worker process
def task(identifier):
    print(f'Task {identifier} done.', flush=True)
    # raise an exception
    raise Exception('Something bad happened')

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.map_async(task, range(5), error_callback=handler)
        # report tasks are issued
        print(f'Main tasks issued.', flush=True)
        # wait for tasks to complete
        result.wait()
        print(f'Main tasks done.', flush=True)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first creates the process pool, then issues five tasks.
The main process then reports a message that the tasks are issued, then blocks until the tasks are done.
Next, each task executes and reports a message that it is done, then raises an exception.
The error callback function is then called and reports that it is done.
Finally, the main process unblocks, reports a message that all tasks are done, and closes the process pool.
This highlights that the error callback is called after all issued tasks are completed, but before the caller is made aware that the tasks are done. That is, the error callback is a part of the task from the perspective of the caller waiting on the task to complete.
Main tasks issued.
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
Task 4 done.
Error callback done.
Main tasks done.
Next, let’s take a look at what happens if there is an error in a callback function.
What Happens if an Error Callback Raises an Exception
We can explore what happens if an error occurs in the error callback function.
In this example, we will define an error callback function that raises an exception. We will then issue the task as per normal and wait for it to complete.
Firstly, we can define the custom error callback function that reports a message then raises an exception.
# error callback function
def handler(error):
    print(f'Error callback running.', flush=True)
    # failure
    raise Exception('Something bad happened')
Next, we will define a target task function that simply reports a message and raises an exception ensuring the error callback is called.
# task executed in a worker process
def task(identifier):
    # report a message
    print(f'Task {identifier} done.', flush=True)
    # raise an exception
    raise Exception('Something bad happened')
In the main process, we will create the process pool, then issue five tasks to the pool with the configured error callback.
...
# create and configure the process pool
with Pool() as pool:
    # issue tasks to the process pool
    result = pool.map_async(task, range(5), error_callback=handler)
The main process will then wait for the tasks to complete, then close the process pool.
...
# report tasks are issued
print(f'Main tasks issued.', flush=True)
# wait for tasks to complete
result.wait()
print(f'Main tasks done.', flush=True)
# close the process pool
pool.close()
# wait for all tasks to complete
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of determining what happens if an exception is raised in the error callback
from multiprocessing.pool import Pool

# error callback function
def handler(error):
    print(f'Error callback running.', flush=True)
    # failure
    raise Exception('Something bad happened')

# task executed in a worker process
def task(identifier):
    # report a message
    print(f'Task {identifier} done.', flush=True)
    # raise an exception
    raise Exception('Something bad happened')

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.map_async(task, range(5), error_callback=handler)
        # report tasks are issued
        print(f'Main tasks issued.', flush=True)
        # wait for tasks to complete
        result.wait()
        print(f'Main tasks done.', flush=True)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first creates the process pool.
It then issues 5 tasks and waits for them to complete.
Each task executes, reporting a message as per normal, then raising an exception.
The error callback function is then called after all tasks are finished. A message is reported, then an exception is raised.
The exception then unwinds the helper thread (the pool's internal result handler) in the process pool. This very likely breaks the process pool.
The main thread in the main process blocks forever waiting for the issued tasks to complete. The tasks never complete because the error callback function never completed successfully.
This highlights that care must be taken in the error callback function as an error in the callback may bring down the application.
Note, you will have to manually terminate the program, e.g. Control-C.
Main tasks issued.
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
Task 4 done.
Error callback running.
Exception in thread Thread-3:
Traceback (most recent call last):
...
Exception: Something bad happened
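One way to avoid this failure mode is to make the error callback defensive. The sketch below is not part of the original example; the safe_handler name is illustrative. It wraps the handler body in a try/except so that a secondary failure inside the callback cannot unwind the pool's helper thread:

```python
# sketch: a defensive error callback that cannot itself crash the pool
from multiprocessing.pool import Pool

# error callback wrapped so a secondary failure is contained
def safe_handler(error):
    try:
        # handling logic that might itself fail goes here
        print(f'Handling: {error}', flush=True)
    except Exception as ex:
        # swallow secondary failures so the pool's helper thread survives
        print(f'Error callback failed: {ex}', flush=True)

# task that always fails
def task(identifier):
    raise Exception('Something bad happened')

# protect the entry point
if __name__ == '__main__':
    with Pool() as pool:
        result = pool.map_async(task, range(5), error_callback=safe_handler)
        # this wait now returns, because the callback completes cleanly
        result.wait()
    print('Main all done.', flush=True)
```

With this structure, the wait() call returns normally because the error callback always completes, and the pool shuts down cleanly.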
Error Callback Common Questions
This section lists common questions about error callbacks used with asynchronous tasks in the process pool.
Do you have any questions?
Let me know below.
What is an Error Callback Function?
An error callback is a function called automatically if an asynchronous task issued to the process pool raises an error or exception.
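As a minimal sketch (the on_error and fail_task names are illustrative, not part of the API), an error callback is simply a function passed via the error_callback argument:

```python
# sketch: an error callback fires automatically when an async task raises
from multiprocessing.pool import Pool

# callback invoked with the exception raised in the task
def on_error(error):
    print(f'Task failed with: {error}', flush=True)

# task that always fails
def fail_task():
    raise ValueError('boom')

# protect the entry point
if __name__ == '__main__':
    with Pool() as pool:
        # register the error callback when issuing the task
        result = pool.apply_async(fail_task, error_callback=on_error)
        # wait for the task (and the callback) to complete
        result.wait()
```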
When Should I Use an Error Callback?
You can use an error callback function when issuing asynchronous tasks to the process pool.
An error callback function can be used to handle an unexpected error in a task, such as reporting or logging the failure.
Error callback functions should not take a long time to execute as they will occupy resources of the process pool.
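For example, a short-running error callback that logs the failure might look like the sketch below (the log_error and risky_task names, and the logging configuration, are assumptions for illustration):

```python
# sketch: logging task failures from an error callback
import logging
from multiprocessing.pool import Pool

# assumed minimal logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# error callback that records the failure and returns quickly
def log_error(error):
    logger.error('Task failed: %s', error)

# task that fails for one input
def risky_task(value):
    if value == 2:
        raise RuntimeError(f'bad value: {value}')
    return value * 10

# protect the entry point
if __name__ == '__main__':
    with Pool() as pool:
        result = pool.map_async(risky_task, range(4), error_callback=log_error)
        result.wait()
```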
What Argument Does the Error Callback Receive?
The error callback function will receive the exception or error raised in the task executed by the process pool.
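The sketch below (with an illustrative report_error callback) shows that the argument is the exception instance itself, so its type and message can be inspected:

```python
# sketch: the error callback receives the raised exception object
from multiprocessing.pool import Pool

# callback that inspects the exception instance it receives
def report_error(error):
    print(f'Got {type(error).__name__}: {error}', flush=True)

# task that raises a specific exception type
def task():
    raise KeyError('missing')

# protect the entry point
if __name__ == '__main__':
    with Pool() as pool:
        pool.apply_async(task, error_callback=report_error).wait()
```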
Which Process and Thread Executes the Error Callback Function?
The error callback function is executed in a helper thread of the process pool, within the process that created the pool and issued the tasks (typically the main process). It is not executed in a worker process.
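This can be confirmed with a small sketch (illustrative names) that reports the process id and thread name from inside the callback; the pid matches the main process, while the thread is one of the pool's helper threads:

```python
# sketch: report which process and thread runs the error callback
import os
import threading
from multiprocessing.pool import Pool

# callback that reports its own process id and thread name
def where_am_i(error):
    name = threading.current_thread().name
    print(f'callback in pid={os.getpid()}, thread={name}', flush=True)

# task that always fails
def task():
    raise Exception('fail')

# protect the entry point
if __name__ == '__main__':
    # report the main process id for comparison
    print(f'main pid={os.getpid()}', flush=True)
    with Pool() as pool:
        pool.apply_async(task, error_callback=where_am_i).wait()
```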
Will The Result Callback Be Called Before The Error Callback?
No.
If at least one task in the batch raises an error, then the result callback will not be called; the error callback function will be called instead.
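This can be demonstrated with a sketch that registers both callbacks on the same map_async() call (the on_result and on_error names are illustrative); because one task fails, only the error callback fires:

```python
# sketch: with map_async, one failure means only the error callback runs
from multiprocessing.pool import Pool

# result callback: never called if any task fails
def on_result(result):
    print(f'Result callback: {result}', flush=True)

# error callback: called instead with the exception
def on_error(error):
    print(f'Error callback: {error}', flush=True)

# task that fails for one input only
def task(identifier):
    if identifier == 3:
        raise ValueError('task 3 failed')
    return identifier

# protect the entry point
if __name__ == '__main__':
    with Pool() as pool:
        result = pool.map_async(task, range(5),
                                callback=on_result,
                                error_callback=on_error)
        result.wait()
        # only 'Error callback: ...' is printed; on_result never fires
```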
You can learn more about the result callback in the tutorial:
Can I Issue Follow-Up Tasks From the Error Callback?
Yes.
The process pool can be made accessible via a global variable and follow-up tasks may be issued directly from within the error callback function.
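As a sketch of this pattern (the followup and on_error names are illustrative), the pool created at the top level of the entry point is a module-level global, so the error callback, which runs in the same process, can issue new tasks on it:

```python
# sketch: issue a follow-up task from within the error callback
from multiprocessing.pool import Pool

# follow-up task issued after a failure
def followup(identifier):
    print(f'Follow-up for {identifier}', flush=True)

# error callback: the global pool is visible because the
# callback runs in the process that created the pool
def on_error(error):
    print(f'Error: {error}, issuing follow-up', flush=True)
    pool.apply_async(followup, args=(0,))

# task that always fails
def task():
    raise Exception('fail')

# protect the entry point
if __name__ == '__main__':
    # 'pool' is a module-level global, accessible from the callback
    with Pool() as pool:
        pool.apply_async(task, error_callback=on_error).wait()
        # allow the follow-up task to run before the pool terminates
        pool.close()
        pool.join()
```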
For an example of issuing follow-up tasks automatically from a callback function, see the tutorial:
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
APIs
References
Takeaways
You now know how to use error callback functions with the process pool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Bas van den Eijkhof on Unsplash