Last Updated on September 12, 2022
You can specify a custom callback function when using the apply_async(), map_async(), and starmap_async() functions in the multiprocessing pool class via the “callback” argument.
In this tutorial you will discover how to use callback functions with the multiprocessing pool in Python.
Let’s get started.
Need to Use Callbacks with the Process Pool
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
When issuing tasks to the process pool asynchronously, we may need to configure a callback function. That is, we may need to have a custom function called automatically with the results of each task.
How can we use callback functions with the process pool in Python?
How to Configure a Callback Function
The multiprocessing.pool.Pool supports custom callback functions.
Callback functions are called in two situations:
- With the results of a task.
- When an error is raised in a task.
We are only considering callbacks called with the results of a task, so-called result callbacks.
Result callbacks are supported in the process pool when issuing tasks asynchronously with any of the following functions:
- apply_async(): For issuing a single task asynchronously.
- map_async(): For issuing multiple tasks with a single argument asynchronously.
- starmap_async(): For issuing multiple tasks with multiple arguments asynchronously.
A result callback can be specified via the “callback” argument.
The argument specifies the name of a custom function to call with the result of the asynchronous task or tasks.
Note, a configured callback function will be called, even if your task function does not have a return value. In that case, a default return value of None will be passed as an argument to the callback function.
The function may have any name you like, as long as it does not conflict with a function name already in use.
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it
— multiprocessing — Process-based parallelism
For example, if apply_async() is configured with a callback, then the callback function will be called with the return value of the task function that was executed.
# result callback function
def result_callback(result):
    print(result, flush=True)

...
# issue a single task
result = apply_async(..., callback=result_callback)
Alternatively, if map_async() or starmap_async() are configured with a callback, then the callback function will be called with an iterable of return values from all tasks issued to the process pool.
# result callback function
def result_callback(result):
    # iterate all results
    for value in result:
        print(value, flush=True)

...
# issue multiple tasks
result = map_async(..., callback=result_callback)
Result callbacks should be used to perform a quick action with the result or results of issued tasks from the process pool.
They should not block or execute for an extended period as they will occupy the resources of the process pool while running.
Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.
— multiprocessing — Process-based parallelism
Now that we know how to configure a result callback function, let’s look at some worked examples.
Example Callback Function for apply_async()
We can explore how to use a result callback with the process pool when issuing tasks via the apply_async() function.
In this example we will define a task that generates a random number, reports the number, blocks for a moment, then returns the value that was generated. A callback function will be defined that receives the return value from the task function and reports the value.
Firstly, we can define the result callback function.
The function takes a single return value from a target task function and reports it directly.
The result_callback() function below implements this.
# result callback function
def result_callback(result):
    print(f'Callback got: {result}', flush=True)
Next, we can define a target task function.
The function takes a unique integer identifier for the task. It then generates a random number between 0 and 1 and sleeps for a fraction of a second to simulate computational effort. Finally, it returns the random value that was generated.
The task() function below implements this.
# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value
We then define a process pool with the default number of worker processes. In this case we use the context manager interface to ensure the process pool closes automatically once we are finished with it.
...
# create and configure the process pool
with Pool() as pool:
    # ...
We will then issue the task to the process pool using the apply_async() function and specify the result callback function to execute with the result of the task.
...
# issue tasks to the process pool
result = pool.apply_async(task, args=(0,), callback=result_callback)
Finally, the main process will close the process pool and wait for the issued task to complete.
...
# close the process pool
pool.close()
# wait for all tasks to complete
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of a callback function for apply_async()
from random import random
from time import sleep
from multiprocessing.pool import Pool

# result callback function
def result_callback(result):
    print(f'Callback got: {result}', flush=True)

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,), callback=result_callback)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first starts the process pool with the default configuration.
Then the task is issued to the process pool. The main process then closes the pool and then waits for the issued task to complete.
The task function executes, generating a random number, reporting a message, blocking and returning a value.
The result callback function is then called with the generated value, which is then reported.
The task ends and the main process wakes up and continues on, closing the program.
Task 0 executing with 0.2998712545246843
Callback got: 0.2998712545246843
Next, let’s look at an example of using a callback function with the map_async() function.
Example Callback Function for map_async()
We can explore how to use a result callback with the process pool when issuing tasks via the map_async() function.
In this example, we can update the previous example to issue multiple calls to the task() function. The callback function will then be called with an iterable of return values from all tasks, that will be iterated and each value reported.
Firstly, we must update the callback function to receive an iterable of return values instead of a single return value. It traverses the return values and reports each value.
The updated result_callback() function is listed below.
# result callback function
def result_callback(result):
    # iterate over all results
    for item in result:
        print(f'Callback got: {item}', flush=True)
The task function does not need any change and can be used as-is.
Next, in the main process, we will call the map_async() function to issue five calls to the task function with integer values from 0 to 4, and to call the callback function once all tasks have completed.
...
# issue tasks to the process pool
result = pool.map_async(task, range(5), callback=result_callback)
And that’s it.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of a callback function for map_async()
from random import random
from time import sleep
from multiprocessing.pool import Pool

# result callback function
def result_callback(result):
    # iterate over all results
    for item in result:
        print(f'Callback got: {item}', flush=True)

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.map_async(task, range(5), callback=result_callback)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first starts the process pool with the default configuration.
Then the 5 tasks are issued to the process pool. The main process then closes the pool and then waits for the issued tasks to complete.
The tasks execute in the process pool. Each task runs, generating a random number, reporting a message, blocking and returning a value.
All tasks complete and their return values are gathered.
The result callback function is then called with an iterable of the return values. The iterable is traversed and each value is then reported.
All tasks end, and the main process wakes up and continues on, closing the program.
Task 0 executing with 0.28084761901957966
Task 1 executing with 0.7827992273822049
Task 2 executing with 0.08924616372428673
Task 3 executing with 0.829206520017164
Task 4 executing with 0.4014550685167869
Callback got: 0.28084761901957966
Callback got: 0.7827992273822049
Callback got: 0.08924616372428673
Callback got: 0.829206520017164
Callback got: 0.4014550685167869
Next, let’s look at an example of using a callback function with the starmap_async() function.
Example Callback Function for starmap_async()
We can explore how to use a result callback with the process pool when issuing tasks via the starmap_async() function.
In this example, we will update the previous example so that the target task function takes two arguments instead of one. The second argument will be a generated random value that is passed in rather than generated inside the task. A list of argument tuples, each containing an integer identifier and a random value, will be prepared in the main process, and tasks with multiple arguments will then be issued via the starmap_async() function.
Firstly, we must update the task() function to take the random number as a second argument and to not generate a number as part of the task.
The updated task() function with these changes is listed below.
# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value
Next, in the main process we will first create a list of arguments. Each item in the list will be a tuple of arguments for one call to the target task() function, containing an integer and a generated random floating point value.
...
# prepare arguments
items = [(i, random()) for i in range(5)]
We can then issue the 5 tasks via the starmap_async(), configured to call a callback function once all of the issued tasks have completed.
...
# issue tasks to the process pool
result = pool.starmap_async(task, items, callback=result_callback)
And that’s it.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of a callback function for starmap_async()
from random import random
from time import sleep
from multiprocessing.pool import Pool

# result callback function
def result_callback(result):
    # iterate over all results
    for item in result:
        print(f'Callback got: {item}', flush=True)

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(5)]
        # issue tasks to the process pool
        result = pool.starmap_async(task, items, callback=result_callback)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first starts the process pool with the default configuration.
Then the 5 tasks are issued to the process pool. The main process then closes the pool and then waits for the issued tasks to complete.
The tasks execute in the process pool. Each task runs, reporting a message, blocking and returning a value.
All tasks complete and their return values are gathered.
The result callback function is then called with an iterable of the return values. The iterable is traversed and each value is then reported.
All tasks end, and the main process wakes up and continues on, closing the program.
Task 0 executing with 0.834942465014341
Task 1 executing with 0.7453534961538749
Task 2 executing with 0.07411023076351042
Task 3 executing with 0.1270685080190611
Task 4 executing with 0.6265983893158643
Callback got: 0.834942465014341
Callback got: 0.7453534961538749
Callback got: 0.07411023076351042
Callback got: 0.1270685080190611
Callback got: 0.6265983893158643
Next, let’s take a look at the specific process and thread used to execute the callback function.
Which Process and Thread Executes the Callback
We can explore the specific process and thread used to execute the result callback function.
In this example we will define a task that gets and reports the current process and current thread. We expect this to be the main thread of a child worker process. We will also get and report the current process and current thread in the main process, which we expect to be the main thread of the main process. Finally, we will get and report the current process and thread used to execute the result callback.
Firstly, we can define a result callback function that gets and reports the current process and current thread.
Recall that we can access the current process via the multiprocessing.current_process() module function that returns a multiprocessing.Process instance. Similarly, we can access the current thread via the threading.current_thread() function that returns a threading.Thread instance.
This will report the details of the process and thread executing the callback function, most notably the names and the ids.
The result_callback() function below implements this.
# result callback function
def result_callback(result):
    # get the current process
    process = current_process()
    # report the details of the current process
    print(f'Callback Process: {process}', flush=True)
    # get the current thread
    thread = current_thread()
    # report the details of the current thread
    print(f'Callback Thread: {thread}', flush=True)
Next, we can define a target task function that does the same thing. It gets and reports the current process and thread used to execute the task.
The task() function below implements this.
# task executed in a worker process
def task(identifier):
    # get the current process
    process = current_process()
    # report the details of the current process
    print(f'Task Process: {process}', flush=True)
    # get the current thread
    thread = current_thread()
    # report the details of the current thread
    print(f'Task Thread: {thread}', flush=True)
Finally, in the main process, we will create and configure a process pool using the context manager, then issue a single task to the process pool via the apply_async() function, configured with a result callback function.
...
# create and configure the process pool
with Pool() as pool:
    # issue tasks to the process pool
    result = pool.apply_async(task, args=(0,), callback=result_callback)
We will then get and report the details of the current process and thread.
...
# get the current process
process = current_process()
# report the details of the current process
print(f'Main Process: {process}', flush=True)
# get the current thread
thread = current_thread()
# report the details of the current thread
print(f'Main Thread: {thread}', flush=True)
We will then close the process pool and wait for the issued task to complete.
...
# close the process pool
pool.close()
# wait for all tasks to complete
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of reporting the process and thread that executes the callback function
from random import random
from time import sleep
from threading import current_thread
from multiprocessing import current_process
from multiprocessing.pool import Pool

# result callback function
def result_callback(result):
    # get the current process
    process = current_process()
    # report the details of the current process
    print(f'Callback Process: {process}', flush=True)
    # get the current thread
    thread = current_thread()
    # report the details of the current thread
    print(f'Callback Thread: {thread}', flush=True)

# task executed in a worker process
def task(identifier):
    # get the current process
    process = current_process()
    # report the details of the current process
    print(f'Task Process: {process}', flush=True)
    # get the current thread
    thread = current_thread()
    # report the details of the current thread
    print(f'Task Thread: {thread}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,), callback=result_callback)
        # get the current process
        process = current_process()
        # report the details of the current process
        print(f'Main Process: {process}', flush=True)
        # get the current thread
        thread = current_thread()
        # report the details of the current thread
        print(f'Main Thread: {thread}', flush=True)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first starts the process pool.
The main process then issues a task to the process pool. It then reports the current process and thread.
In this case, we can see that the program is executed by the “MainProcess” and the “MainThread” as we might expect.
Next, the task is executed.
In this case, we can see that the task is executed by the main thread in a child worker process created with the spawn start method (the default on my system), with the names “SpawnProcess” and “_MainThread”.
Finally, the result callback function is executed. In this case, we can see that the callback is executed by a helper thread named “Thread-3” in the main process named “MainProcess”.
This highlights that the callback is executed in the main process by a helper thread provided by the process pool.
Note, the specific process ids and some of the process names may be different, depending on the specifics of your system.
Main Process: <_MainProcess name='MainProcess' parent=None started>
Main Thread: <_MainThread(MainThread, started 4795883008)>
Task Process: <SpawnProcess name='SpawnPoolWorker-3' parent=38978 started daemon>
Task Thread: <_MainThread(MainThread, started 4422417920)>
Callback Process: <_MainProcess name='MainProcess' parent=None started>
Callback Thread: <Thread(Thread-3, started daemon 123145557532672)>
Next, let’s look at how we might share data with the callback function.
How to Share Data With the Callback Function
We can explore how to share data with the callback function.
As we discovered in the previous section, both the code where we create the process pool and issue tasks and the callback function are executed in the main process of the application, at least in these examples.
Therefore, we can define a global variable in the program and have it shared and available to the callback function.
We will define a global variable with a random floating point value between 0 and 1. We will then access this global variable in the callback function, then change it. Once all tasks are finished, we will then report the global variable again and confirm that change was reflected.
This will demonstrate how the callback function can both access and change global variables from the program.
Firstly, we can define a callback function that declares the global variable named “data“, reports the value, changes it, then reports the changed value.
The result_callback() function below implements this.
# result callback function
def result_callback(result):
    global data
    # report shared global data from main process
    print(f'Callback data: {data}', flush=True)
    # change it
    data = random()
    # report changed global data
    print(f'Callback data now: {data}', flush=True)
Next, we can define a target task function that blocks for a moment to simulate computational effort.
# task executed in a worker process
def task(identifier):
    sleep(1)
Finally, in the main process we can define a global variable and assign it a random number.
...
# prepare shared global data
data = random()
We can then create the process pool, issue the task configured with the result callback, then wait for the task to complete.
...
# create and configure the process pool
with Pool() as pool:
    # issue tasks to the process pool
    result = pool.apply_async(task, args=(0,), callback=result_callback)
    # close the process pool
    pool.close()
    # wait for all tasks to complete
    pool.join()
We will then report the global variable in order to confirm the changes made in the callback function are reflected in the main thread of the main process.
...
# report shared global data again
print(f'Main data: {data}', flush=True)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of sharing data with the callback function
from random import random
from time import sleep
from multiprocessing.pool import Pool

# result callback function
def result_callback(result):
    global data
    # report shared global data from main process
    print(f'Callback data: {data}', flush=True)
    # change it
    data = random()
    # report changed global data
    print(f'Callback data now: {data}', flush=True)

# task executed in a worker process
def task(identifier):
    sleep(1)

# protect the entry point
if __name__ == '__main__':
    # prepare shared global data
    data = random()
    print(f'Main data: {data}', flush=True)
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.apply_async(task, args=(0,), callback=result_callback)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
    # report shared global data again
    print(f'Main data: {data}', flush=True)
Running the example first creates the global variable and assigns it a random number, then reports the value.
Next, the process pool is created and the task is issued. The main thread then waits for the task in the process pool to finish.
The task executes, blocking for a second, then the task finishes.
The callback function is called, even though the task has no return value. In this case, the argument passed to the callback function will be None.
The callback function declares the global variable, then reports its value. We can see that the reported value matches the value reported from the main thread. It then assigns a new random value to the global variable and reports its value.
The main thread continues on and reports the current value of the global variable. We can see that the main thread sees the changed value, matching the value that was set in the callback function.
This highlights how data can be made available to the callback function and how the callback function may make data available to the main thread.
Note, the results will differ each time the program is run given the use of random numbers.
Main data: 0.9727166653031785
Callback data: 0.9727166653031785
Callback data now: 0.2009902690070836
Main data: 0.2009902690070836
Next, let’s take a closer look at when a callback function is executed.
When is the Callback Executed
We can explore when exactly the callback function is called.
In this example, we will report a message in the main process, in the task, in the callback function, and after the task is completed. The order of the reported messages will give an idea of when exactly the callback function is executed.
Firstly, we can define a callback function that just reports a message that it is finished.
# result callback function
def result_callback(result):
    print(f'Callback done.', flush=True)
Similarly, we can define a task function that reports a message that it is finished.
# task executed in a worker process
def task(identifier):
    print(f'Task {identifier} done.', flush=True)
We can create a process pool and issue five tasks to the pool, configured with the custom callback function.
...
# create and configure the process pool
with Pool() as pool:
    # issue tasks to the process pool
    result = pool.map_async(task, range(5), callback=result_callback)
We can then report a message, wait for the tasks to complete, then report another message.
...
# report tasks are issued
print(f'Main tasks issued.', flush=True)
# wait for tasks to complete
result.wait()
print(f'Main tasks done.', flush=True)
Finally, the pool can be closed.
...
# close the process pool
pool.close()
# wait for all tasks to complete
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of determining when the result callback is called
from random import random
from time import sleep
from multiprocessing.pool import Pool

# result callback function
def result_callback(result):
    print(f'Callback done.', flush=True)

# task executed in a worker process
def task(identifier):
    print(f'Task {identifier} done.', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.map_async(task, range(5), callback=result_callback)
        # report tasks are issued
        print(f'Main tasks issued.', flush=True)
        # wait for tasks to complete
        result.wait()
        print(f'Main tasks done.', flush=True)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first creates the process pool, then issues five tasks.
The main process then reports a message that the tasks are issued, then blocks until the tasks are done.
Next, each task executes and reports a message that they are done.
The callback function is then called and reports that it is done.
Finally, the main process unblocks, reports a message that all tasks are done, and closes the process pool.
This highlights that the callback is called after all issued tasks are completed, but before the caller is made aware that the tasks are done. That is, the callback is a part of the task from the perspective of the caller waiting on the task to complete.
Main tasks issued.
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
Task 4 done.
Callback done.
Main tasks done.
Next, let’s take a look at what happens if there is an error in a callback function.
What Happens if a Callback Raises an Exception
We can explore what happens if an error occurs in the result callback function.
In this example, we will define a result callback function that raises an exception. We will then issue the task as per normal and wait for it to complete.
Firstly, we can define the custom callback function that reports a message then raises an exception.
# result callback function
def result_callback(result):
    print(f'Callback running.', flush=True)
    # failure
    raise Exception('Something bad happened')
Next, we will define a target task function that simply reports a message.
# task executed in a worker process
def task(identifier):
    print(f'Task {identifier} done.', flush=True)
In the main process, we will create the process pool, then issue five tasks to the pool with the configured callback.
...
# create and configure the process pool
with Pool() as pool:
    # issue tasks to the process pool
    result = pool.map_async(task, range(5), callback=result_callback)
The main process will then wait for the tasks to complete, then close the process pool.
...
# report tasks are issued
print(f'Main tasks issued.', flush=True)
# wait for tasks to complete
result.wait()
print(f'Main tasks done.', flush=True)
# close the process pool
pool.close()
# wait for all tasks to complete
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of determining what happens if an exception is raised in the result callback
from time import sleep
from multiprocessing.pool import Pool

# result callback function
def result_callback(result):
    print(f'Callback running.', flush=True)
    # failure
    raise Exception('Something bad happened')

# task executed in a worker process
def task(identifier):
    print(f'Task {identifier} done.', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        result = pool.map_async(task, range(5), callback=result_callback)
        # report tasks are issued
        print(f'Main tasks issued.', flush=True)
        # wait for tasks to complete
        result.wait()
        print(f'Main tasks done.', flush=True)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first creates the process pool.
It then issues 5 tasks and waits for them to complete.
Each task executes, reporting a message as per normal.
The callback function is then called after all tasks are finished. A message is reported, then an exception is raised.
The exception unwinds the result-handler thread in the process pool, which breaks the pool.
The main process then blocks forever in result.wait(). Although the tasks themselves finished, the AsyncResult is never marked ready because the callback raised before the result could be set.
This highlights that care must be taken in the callback function as an error in the callback may bring down the application.
Note: you will have to terminate the program manually, e.g. with Control-C.
Main tasks issued.
Task 0 done.
Task 1 done.
Task 2 done.
Task 3 done.
Task 4 done.
Callback running.
Exception in thread Thread-3:
Traceback (most recent call last):
...
Exception: Something bad happened
Common Questions
This section lists common questions about result callbacks used with asynchronous tasks in the process pool.
Do you have any questions?
Let me know below.
What is a Callback?
A callback is a function that the process pool calls automatically with the result once an asynchronous task, or batch of tasks, has completed.
When Should I Use a Callback?
You can use a callback function when issuing asynchronous tasks to the process pool.
A callback function can be used to handle the results of issued tasks, such as reporting, storing, or gathering the results of issued tasks.
Callback functions should not take a long time to execute, as they run on the pool's result-handler thread and will block the handling of subsequent results while they run.
What Argument Does the Callback Receive?
The callback function will receive the return value from the task or tasks issued to the process pool.
In the case of apply_async(), the argument will be a single value.
In the case of map_async() and starmap_async(), the argument will be an iterable of return values.
What if the Task Function Does Not Return a Value?
A configured callback function will be called even if your target task function does not return a value.
In that case, the argument to the callback function will be the default return value of None.
Which Process and Thread Executes the Callback Function?
The callback function is executed by a helper thread of the process pool, running inside the process that created the pool and issued the tasks, typically the main process.
Will The Result Callback Be Called Before The Error Callback?
No.
If at least one task raises an error, then the result callback will not be called and the error callback function will be called instead.
Can I Issue Follow-Up Tasks From the Result Callback?
Yes.
The process pool can be made accessible via a global variable and follow-up tasks may be issued directly from within the callback function.
For an example of issuing follow-up tasks automatically from the callback function, see the tutorial:
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to use callback functions with the process pool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Ludovic Gauthier on Unsplash