Last Updated on September 12, 2022
You can submit a follow-up task to a ProcessPoolExecutor by calling the submit() function.
In this tutorial you will discover how to submit follow-up tasks to a process pool in Python.
Let’s get started.
Need to Submit Follow-up Tasks
The ProcessPoolExecutor in Python provides a pool of reusable processes for executing ad hoc tasks.
You can submit tasks to the process pool by calling the submit() function and passing in the name of the function you wish to execute on another process.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
You can also submit tasks by calling the map() function, specifying the name of the function to execute and an iterable of items to which the function will be applied.
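As a minimal sketch of the two submission styles described above, the snippet below issues one task with submit() and several with map(). The square() task and the run_both() helper are hypothetical names used only for illustration.

```python
from concurrent.futures import ProcessPoolExecutor

# hypothetical task used for illustration: square a number
def square(x):
    return x * x

# hypothetical helper that issues tasks using both approaches
def run_both(executor):
    # submit() schedules a single call and returns a Future immediately
    future = executor.submit(square, 3)
    # result() blocks until the task is done, then returns its value
    single = future.result()
    # map() applies the function to each item and yields results in input order
    many = list(executor.map(square, [1, 2, 3]))
    return single, many

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor(2) as executor:
        print(run_both(executor))
```

The helper takes the executor as an argument so the same code could be tried with a different executor from concurrent.futures.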
Some tasks require that a second task be executed that makes use of the result from the first task in some way.
We might call this the need to execute a follow-up task for each task that is submitted, which might be conditional on the result in some way.
How can you submit a follow-up task to a ProcessPoolExecutor?
How to Submit Follow-Up Tasks
You can submit a follow-up task to a process pool in Python by calling submit() on the process pool while processing the results of your first-round tasks.
The need for a subsequent or follow-up task may be determined within another task.
You may be tempted to submit a child or follow-up task directly from the first round task. This would require that the task needs access to the process pool to be able to submit a new task. Although possible, this would break encapsulation, requiring a component (task) to have knowledge of the executor of tasks (process pool).
Generally, interacting with the process pool directly from target task functions can lead to deadlocks, especially if you start waiting on results from tasks within tasks.
An alternative approach is to return values from tasks that indicate whether a follow-up task is required, and if so, any details required for the follow-up.
This allows the process that is processing the outcomes from tasks to interpret the results of each task and trigger a follow-up task as needed.
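One way to sketch this idea is to have each first-round task return a tuple holding a flag and the details needed by the follow-up. The names first_round(), follow_up() and run(), and the even-number rule, are assumptions made for illustration and are not part of the ProcessPoolExecutor API.

```python
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# hypothetical first-round task: returns (needs_followup, payload)
def first_round(n):
    # assumption for illustration: only even inputs need a follow-up
    return (n % 2 == 0, n * 10)

# hypothetical follow-up task that uses the payload from the first round
def follow_up(payload):
    return payload + 1

# the caller interprets each result and decides whether to submit more work
def run(executor, items):
    futures = [executor.submit(first_round, n) for n in items]
    followups = []
    for future in as_completed(futures):
        needs_followup, payload = future.result()
        if needs_followup:
            followups.append(executor.submit(follow_up, payload))
    # gather follow-up results; sorted because completion order may vary
    return sorted(f.result() for f in followups)

# entry point
if __name__ == '__main__':
    with ProcessPoolExecutor(2) as executor:
        print(run(executor, range(5)))
```

The task functions never touch the executor here. Only the code that processes results decides what is submitted next.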
As such, follow-up tasks are submitted to the ProcessPoolExecutor just like first-round tasks, specifically by calling the submit() function.
This could be achieved when processing the results from calling map(), for example:
```python
...
# process the results from first-round tasks
for result in executor.map(work, items):
    # check if the task requires a follow-up by calling a custom function
    if requires_followup(result):
        # submit a follow-up task
        future = executor.submit(work)
        # ...
```
This could also be achieved when processing Future objects directly returned from calling submit(), for example:
```python
...
# process the results from first-round tasks
for future in as_completed(futures):
    # get the result
    result = future.result()
    # check if the task requires a follow-up by calling a custom function
    if requires_followup(result):
        # submit a follow-up task
        future2 = executor.submit(work)
        # ...
```
Now that we know how to submit a follow-up task to a ProcessPoolExecutor, let’s look at a worked example.
Example of Submitting Follow-Up Tasks
Let’s develop an example of submitting a follow-up task to a ProcessPoolExecutor.
First, we need to define a mock task that sleeps for a moment and returns a value.
Our task will generate a random value between 0 and 1, sleep for that fraction of a second, then return the value for interpretation to see if a subsequent task is required.
```python
# first-round task that sleeps for a moment
def task1():
    value = random()
    sleep(value)
    print(f'Task 1: {value}')
    return value
```
Next, we can define a follow-up task to be called under some condition based on the result of the first round task.
This task takes the value returned from the first round task, generates a second random value and sleeps for that fraction of a second before reporting both values and returning the newly generated value.
```python
# follow-up task that sleeps for a moment
def task2(value1):
    value2 = random()
    sleep(value2)
    print(f'Task 2: value1={value1}, value2={value2}')
    return value2
```
Next, we can start a process pool with five worker processes using the context manager, then call submit() ten times to issue the first-round calls to task1().
```python
...
# start the process pool
with ProcessPoolExecutor(5) as executor:
    # send in the first tasks
    futures1 = [executor.submit(task1) for _ in range(10)]
```
Recall that each call to submit() returns a Future object.
We store all Future objects in a list so that we can get the results later.
We will use the as_completed() module function to process the tasks, returning each Future as its task is completed.
```python
...
# process results in the order they are completed
for future1 in as_completed(futures1):
    # ...
```
For each Future, we will retrieve the result and check the return value from the task.
If the value is above 0.5 (recall that task1() will return random values between 0 and 1), then we will submit a second task.
This is achieved by calling submit() with the name of the second task function which is task2() and passing in the value returned from the first task.
```python
...
# get the result
result = future1.result()
# check if we should trigger a follow-up task
if result > 0.5:
    _ = executor.submit(task2, result)
```
We can then let the context manager close the process pool automatically for us, which will wait for all second round tasks in the process pool to complete.
Tying this together, the complete example of submitting follow-up tasks to the ProcessPoolExecutor is listed below.
```python
# SuperFastPython.com
# example of submitting follow-up tasks to the process pool
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# first-round task that sleeps for a moment
def task1():
    value = random()
    sleep(value)
    print(f'Task 1: {value}')
    return value

# follow-up task that sleeps for a moment
def task2(value1):
    value2 = random()
    sleep(value2)
    print(f'Task 2: value1={value1}, value2={value2}')
    return value2

# entry point
if __name__ == '__main__':
    # start the process pool
    with ProcessPoolExecutor(5) as executor:
        # send in the first tasks
        futures1 = [executor.submit(task1) for _ in range(10)]
        # process results in the order they are completed
        for future1 in as_completed(futures1):
            # get the result
            result = future1.result()
            # check if we should trigger a follow-up task
            if result > 0.5:
                _ = executor.submit(task2, result)
    # exiting the context manager waits for all follow-up tasks to complete
```
Running the example creates the process pool and submits ten calls to task1() to the process pool.
We then process results as they become available and submit follow-up calls to task2().
In this case, we can see that five of the ten calls to task1() returned a value above 0.5, so five follow-up calls to task2() were submitted to the process pool.
Your results may vary given the use of random numbers.
```
Task 1: 0.6815888998456426
Task 1: 0.4990513169281877
Task 1: 0.7644185045411436
Task 2: value1=0.6815888998456426, value2=0.06649497959119754
Task 2: value1=0.8046358214353102, value2=0.6284180100537056
Task 1: 0.043775434252316114
Task 1: 0.8223726377063809
Task 2: value1=0.8223726377063809, value2=0.8621622158328371
Task 1: 0.3324119487420051
Task 1: 0.39657906247854857
Task 1: 0.011057417763156496
Task 1: 0.9961686025166375
Task 1: 0.8046358214353102
Task 2: value1=0.7644185045411436, value2=0.3128083257978407
Task 2: value1=0.9961686025166375, value2=0.5579914795664156
```
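The worked example discards the Future objects returned for the follow-up tasks. If their return values are needed, one option is to keep those futures and read them once the pool has shut down. The sketch below assumes simplified, deterministic stand-ins for task1() and task2() (no random values or sleeping) and a hypothetical run() helper, so the outcome is predictable.

```python
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# simplified stand-in for the first-round task (deterministic for illustration)
def task1(n):
    return n / 10

# simplified stand-in for the follow-up task
def task2(value1):
    return value1 * 2

# hypothetical helper: run both rounds and return {value1: value2}
def run(executor_cls, workers=2):
    followups = {}
    with executor_cls(workers) as executor:
        futures1 = [executor.submit(task1, n) for n in range(10)]
        for future1 in as_completed(futures1):
            result = future1.result()
            if result > 0.5:
                # remember which first-round value each follow-up belongs to
                followups[executor.submit(task2, result)] = result
    # leaving the with-block waits for outstanding tasks, so all are done here
    assert all(future2.done() for future2 in followups)
    return {value1: future2.result() for future2, value1 in followups.items()}

# entry point
if __name__ == '__main__':
    for value1, value2 in sorted(run(ProcessPoolExecutor).items()):
        print(f'value1={value1}, value2={value2}')
```

Keeping the follow-up futures in a dictionary keyed by Future is one design choice among several. A plain list would also work if the link back to the first-round value is not needed.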
Takeaways
You now know how to submit follow-up tasks to a ProcessPoolExecutor.
Do you have any questions about how to submit follow-up tasks?
Ask your questions in the comments below and I will do my best to answer.