Last Updated on September 12, 2022
You can issue follow-up tasks with the multiprocessing pool either manually by checking the results of tasks, or automatically using a result callback function.
In this tutorial, you will discover how to issue follow-up tasks to the process pool in Python.
Let’s get started.
Need to Issue Follow-Up Tasks
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
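For instance, a pool's worker count and per-worker setup can be specified at creation time. The sketch below is illustrative (the `init_worker` and `work` functions are not part of this tutorial); it configures a pool with four workers, each prepared by an initializer function before accepting tasks:

```python
from multiprocessing.pool import Pool

# run once in each child worker before it accepts tasks
def init_worker():
    print('worker ready', flush=True)

# a trivial task for demonstration
def work(x):
    return x * x

if __name__ == '__main__':
    # configure the pool at creation: 4 workers, each prepared by init_worker
    with Pool(processes=4, initializer=init_worker) as pool:
        print(pool.map(work, range(5)))  # [0, 1, 4, 9, 16]
```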
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
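To make the distinction concrete, here is a minimal sketch (the `task` function is illustrative) comparing a synchronous apply() call with an asynchronous apply_async() call whose result is retrieved later via AsyncResult.get():

```python
from multiprocessing.pool import Pool

# a trivial task for demonstration
def task(x):
    return x + 1

if __name__ == '__main__':
    with Pool() as pool:
        # synchronous: blocks until the result is available
        print(pool.apply(task, args=(2,)))  # 3
        # asynchronous: returns an AsyncResult immediately
        async_result = pool.apply_async(task, args=(3,))
        # ...do other work, then retrieve the result later
        print(async_result.get())  # 4
```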
When using the process pool, we may need to execute follow-up tasks, such as tasks issued conditionally based on the results of tasks already completed in the pool.
How can we execute follow-up tasks in the process pool?
How to Issue Follow-Up Tasks to the Process Pool
We can execute follow-up tasks in the process pool.
There are two main approaches we can use:
- Manually issue follow-up tasks.
- Automatically issue follow-up tasks via a callback.
We will also consider an approach that does not work:
- Issue a task from a task already running in the process pool.
Let’s take a closer look at each approach to executing a follow-up task.
Manually Execute a Follow-Up Task
We can manually issue a follow-up task based on the results of a first-round task.
For example, we may issue a task asynchronously using the apply_async() function, which returns an AsyncResult.
```python
...
# issue a task
result = pool.apply_async(...)
```
We can then get the result of the issued task, once available, and conditionally issue a follow-up task.
For example:
```python
...
# check the result of an issued task
if result.get() > 1.0:
    # issue a follow-up task
    pool.apply_async(...)
```
Automatically Execute a Follow-Up Task with a Callback
We can automatically issue follow-up tasks to the process pool.
This can be achieved by configuring issued tasks to have a result callback function.
The callback function is a custom function that takes the result of the function call used to issue tasks, e.g. a single return value for apply_async(), or a list of return values if multiple tasks are issued via map_async().
The callback function is executed by a helper thread within the main process, not in a worker process.
If the process pool is created and used within the main process, then it may be available as a global variable to the result callback function.
As such, we can directly issue follow-up tasks from the callback function.
For example:
```python
# result callback function
def result_callback(result):
    # check the result of an issued task
    if result > 1.0:
        # issue a follow-up task
        pool.apply_async(...)
```
The callback function can be specified when issuing tasks in the main process via the “callback” argument.
For example:
```python
...
# issue a task with a result callback
result = pool.apply_async(..., callback=result_callback)
```
Issue a Follow-Up Task From A Running Task
We might be tempted to try and issue a task to the process pool from a task already running in the process pool.
This approach does not work.
It requires that we share the process pool with the task, which fails with an error because the process pool cannot be serialized.
For example:
```
NotImplementedError: pool objects cannot be passed between processes or pickled
```
Instead, we must signal to another process to issue a follow-up task on our behalf.
This can easily be achieved by returning values to the main process, as in the two approaches above.
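We can see this limitation directly: a Pool object refuses to be serialized. The sketch below attempts to pickle a pool and reports the resulting error:

```python
import pickle
from multiprocessing.pool import Pool

if __name__ == '__main__':
    with Pool() as pool:
        try:
            # any attempt to serialize the pool (e.g. to send it to a worker) fails
            pickle.dumps(pool)
        except NotImplementedError as e:
            print(e)  # pool objects cannot be passed between processes or pickled
```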
Now that we know how to issue follow-up tasks to the process pool, let’s look at some worked examples.
Example of Manually Issuing Follow-Up Tasks to the Process Pool
We can explore how to manually issue follow-up tasks to the process pool.
In this example, we will define a task that takes an integer argument, generates a random number, blocks for a moment, reports the value, then returns the integer argument and generated value. The main process will issue many of these tasks asynchronously and then check each result. If a result is above a threshold, a follow-up task will be issued that does much the same as the first task: it generates a random number, blocks for a moment, and reports the values.
Firstly, we can define the function to execute for the follow-up or second-round task.
The function takes an integer identifier for the task and a floating point value generated by the first round task.
The function then generates a random number between 0 and 1 and then blocks for this many seconds to simulate computational effort. It then reports the value and returns a tuple of the two arguments as well as the generated value.
The task2() function below implements this.
```python
# task executed in a worker process
def task2(identifier, result):
    # generate a random number
    value = random()
    # block for a moment
    sleep(value)
    # report result
    print(f'>>{identifier} with {result}, generated {value}', flush=True)
    # return result
    return (identifier, result, value)
```
Next, we can define the function to execute first, the first-round task.
This function takes an integer identifier for the task. It then generates a random value between 0 and 1 and blocks for a fraction of a second. It then reports the value and returns a tuple of the integer argument and the generated value.
The task1() function below implements this.
```python
# task executed in a worker process
def task1(identifier):
    # generate a random number
    value = random()
    # block for a moment
    sleep(value)
    # report result
    print(f'>{identifier} generated {value}', flush=True)
    # return result
    return (identifier, value)
```
We then define a process pool with the default number of worker processes. In this case we use the context manager interface to ensure the process pool closes automatically once we are finished with it.
```python
...
# create and configure the process pool
with Pool() as pool:
    # ...
```
We can then issue 10 calls to task1() asynchronously with integer values from 0 to 9. We will issue the tasks using the imap_unordered() function as it will return results as tasks are completed out of order.
We will iterate the results of task1() directly in a for-loop.
```python
...
# issue tasks to the process pool and process results
for i,v in pool.imap_unordered(task1, range(10)):
    # ...
```
For each result from the first-round task, we will check if the generated value is above 0.5, and if so, we will issue a single follow-up task using the apply_async() function to call task2().
```python
...
# issue a follow-up task
_ = pool.apply_async(task2, args=(i, v))
```
Once the results from all first-round tasks have been processed and any follow-up tasks issued, we will close the process pool to prevent any further tasks from being issued, then join the pool to wait for all issued tasks to complete.
```python
...
# close the pool
pool.close()
# wait for all issued tasks to complete
pool.join()
```
Finally, an “all done” message is reported once all worker processes are closed.
```python
...
# all done
print('All done.')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of manually issuing a follow-up task to the process pool
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task2(identifier, result):
    # generate a random number
    value = random()
    # block for a moment
    sleep(value)
    # report result
    print(f'>>{identifier} with {result}, generated {value}', flush=True)
    # return result
    return (identifier, result, value)

# task executed in a worker process
def task1(identifier):
    # generate a random number
    value = random()
    # block for a moment
    sleep(value)
    # report result
    print(f'>{identifier} generated {value}', flush=True)
    # return result
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool and process results
        for i,v in pool.imap_unordered(task1, range(10)):
            # check result
            if v > 0.5:
                # issue a follow-up task
                _ = pool.apply_async(task2, args=(i, v))
        # close the pool
        pool.close()
        # wait for all issued tasks to complete
        pool.join()
        # all done
        print('All done.')
```
Running the example first creates and starts the process pool with a default configuration.
Next, 10 calls to task1() are issued as tasks to the process pool and the results of the tasks are processed in the order that the tasks are completed.
Each first-round task generates a random number, blocks, reports a message and returns a tuple.
The main process receives results from first-round tasks as they complete and checks if the generated number was above 0.5. If so, it issues a follow-up task to call task2() asynchronously.
The second-round tasks generate another random number, report the value, and return a tuple. The return results from this task are not considered.
Once the results from all first-round tasks are processed, the main process closes the pool and waits for all remaining tasks to complete, e.g. any remaining task2() tasks.
Once all tasks complete, the main process unblocks, continues on, and terminates the application.
Note, the results will differ each time the program is run given the use of random numbers. Try running the example a few times.
This highlights how we can manually issue follow-up tasks to the process pool from the main process.
```
>3 generated 0.2605878019636232
>6 generated 0.27359550984496794
>0 generated 0.40999955327713244
>4 generated 0.4519214058440285
>1 generated 0.5499098580952393
>2 generated 0.6852001923395571
>8 generated 0.4681425515972236
>7 generated 0.9082498918840236
>>1 with 0.5499098580952393, generated 0.4148749468523081
>5 generated 0.9607853615700284
>9 generated 0.8586973247929102
>>2 with 0.6852001923395571, generated 0.4512218980620065
>>5 with 0.9607853615700284, generated 0.6103151596080635
>>7 with 0.9082498918840236, generated 0.8080954535903131
>>9 with 0.8586973247929102, generated 0.6552476786576102
```
Next, let’s look at how we might issue follow-up tasks automatically using a callback function.
Example of Issuing Follow-Up Tasks From a Callback
We can explore how to issue follow-up tasks automatically using a callback function.
This can be achieved by updating the previous example to define a callback function that processes the results from first-round tasks and issues follow-up tasks to the process pool. The main process is only responsible for issuing the first-round tasks.
This makes the code much cleaner to read and manage.
Importantly, this is only possible because the callback function is executed in the main process, which means it has access to the process pool directly as a global variable.
Firstly, we can define a callback function. This function is called only once all first-round tasks have completed and is passed a list of all results from the first-round tasks.

This is a downside of using a callback with map_async(): it is not called until all tasks have completed, unlike the previous approach of issuing follow-up tasks manually as each first-round task completes.
The callback function iterates over the return values from each first-round task, checks the floating point value, and issues a follow-up task call to task2() as needed.
The result_callback() function below implements this.
```python
# handle results of the task (in the main process)
def result_callback(result_iterator):
    # unpack result
    for i,v in result_iterator:
        # check result
        if v > 0.5:
            # issue a follow-up task
            _ = pool.apply_async(task2, args=(i, v))
```
Next, in the main process we can issue all 10 first-round tasks asynchronously using the map_async() function and specify the result callback function.
This returns an AsyncResult object.
```python
...
# issue tasks asynchronously to the process pool
result = pool.map_async(task1, range(10), callback=result_callback)
```
We can then wait for all first-round tasks to complete by waiting on the AsyncResult object.
```python
...
# wait for issued tasks to complete
result.wait()
```
After we know that all first-round tasks are completed and any follow-up tasks have been issued, we can then close the pool and wait for all second-round tasks to complete.
```python
...
# close the pool
pool.close()
# wait for all issued tasks to complete
pool.join()
```
If we did not first wait for the first-round tasks to complete and instead closed the pool immediately, we would not be able to issue any follow-up tasks from the callback, and an error would be raised.
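We can demonstrate this failure mode in isolation. In the sketch below (the trivial task function is illustrative), a task is issued after the pool has been closed, which raises a ValueError:

```python
from multiprocessing.pool import Pool

# a trivial task for demonstration
def task(x):
    return x

if __name__ == '__main__':
    pool = Pool()
    # close the pool: no further tasks may be issued
    pool.close()
    try:
        pool.apply_async(task, args=(1,))
    except ValueError as e:
        print(e)  # Pool not running
    pool.join()
```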
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of issuing a follow-up task automatically with a result callback
from random import random
from time import sleep
from multiprocessing.pool import Pool

# handle results of the task (in the main process)
def result_callback(result_iterator):
    # unpack result
    for i,v in result_iterator:
        # check result
        if v > 0.5:
            # issue a follow-up task
            _ = pool.apply_async(task2, args=(i, v))

# task executed in a worker process
def task2(identifier, result):
    # generate a random number
    value = random()
    # block for a moment
    sleep(value)
    # report result
    print(f'>>{identifier} with {result}, generated {value}', flush=True)
    # return result
    return (identifier, result, value)

# task executed in a worker process
def task1(identifier):
    # generate a random number
    value = random()
    # block for a moment
    sleep(value)
    # report result
    print(f'>{identifier} generated {value}', flush=True)
    # return result
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks asynchronously to the process pool
        result = pool.map_async(task1, range(10), callback=result_callback)
        # wait for issued tasks to complete
        result.wait()
        # close the pool
        pool.close()
        # wait for all issued tasks to complete
        pool.join()
        # all done
        print('All done.')
```
Running the example first creates and starts the process pool with a default configuration.
Next, 10 calls to task1() are issued as tasks to the process pool and an AsyncResult is returned. The main process then blocks waiting for all first-round tasks to complete.
Each first-round task generates a random number, blocks, reports a message and returns a tuple.
Once all first-round tasks finish, the result callback is called with an iterable of the return values from the first-round tasks. The iterable is traversed and any second-round tasks are issued to the process pool.
The main process carries on, closing the pool and blocking until all second round tasks complete.
The second-round tasks generate another random number, report the value, and return a tuple. The return results from this task are not considered.
Once all second-round tasks complete, the main process unblocks, continues on, and terminates the application.
Note, the results will differ each time the program is run given the use of random numbers. Try running the example a few times.
This highlights how we can automatically issue follow-up tasks to the process pool from the main process.
```
>3 generated 0.026053470123407307
>5 generated 0.10861107021623373
>1 generated 0.283776825109049
>2 generated 0.3812563597749301
>0 generated 0.47683270636655106
>4 generated 0.5622089438427635
>6 generated 0.62617584793209
>7 generated 0.7745782124217487
>8 generated 0.9132687159744634
>9 generated 0.9283828682129988
>>7 with 0.7745782124217487, generated 0.07060930439614588
>>6 with 0.62617584793209, generated 0.155765787897853
>>4 with 0.5622089438427635, generated 0.7745894701213474
>>8 with 0.9132687159744634, generated 0.8280977095949135
>>9 with 0.9283828682129988, generated 0.9937548669689297
All done.
```
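As an aside, the downside noted earlier (the map_async() callback only fires once all results are available) can be sidestepped by issuing each first-round task individually with apply_async() and a per-task callback, so each callback fires as its result arrives. The sketch below uses a simplified, deterministic task, so it is not part of the tutorial's worked example:

```python
from multiprocessing.pool import Pool

# simplified first-round task: returns its identifier and a value
def task1(identifier):
    return (identifier, identifier / 10)

# collected in the main process by the pool's callback thread
results = []

# called once per task, as each result becomes available
def on_result(result):
    results.append(result)

if __name__ == '__main__':
    with Pool() as pool:
        # issue each first-round task with its own callback
        for i in range(10):
            pool.apply_async(task1, args=(i,), callback=on_result)
        pool.close()
        # join also waits for the callback thread to drain all results
        pool.join()
    print(sorted(results))
```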
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to execute follow-up tasks to the process pool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.