Last Updated on September 12, 2022
You can adopt one of the common usage patterns to get the most out of the ProcessPoolExecutor in Python.
In this tutorial, you will discover the common usage patterns for Python process pools.
Let’s get started.
ProcessPoolExecutor Usage Patterns
The ProcessPoolExecutor provides a lot of flexibility for executing concurrent tasks in Python.
Nevertheless, there are a handful of common usage patterns that will fit most program scenarios.
This section lists the common usage patterns with worked examples that you can copy and paste into your own project and adapt as needed.
The patterns we will look at are as follows:
- Map and Wait Pattern
- Submit and Use as Completed Pattern
- Submit and Use Sequentially Pattern
- Submit and Use Callback Pattern
- Submit and Wait for All Pattern
- Submit and Wait for First Pattern
We will use a contrived task in each example that sleeps for a random amount of time less than one second. You can easily replace this example task with your own task in each pattern.
Also, recall that each Python program has one thread by default called the main thread, where we do our work. We will create the process pool in the main thread in each example and may reference actions in the main thread in some of the patterns, as opposed to actions in processes in the process pool.
Let’s start with the first usage pattern: map and wait.
Map and Wait Pattern
Perhaps the most common pattern when using the ProcessPoolExecutor is to convert a for loop that executes a function on each item in a collection to use processes.
It assumes that the function has no side effects, meaning it does not access any data outside of the function and does not change the data provided to it. It takes data and produces a result.
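For instance, a task suited to this pattern might look like the following minimal sketch (the doubling logic is just a placeholder):

```python
# a pure task: it reads only its argument and returns a new value,
# without touching any shared or global state
def task(item):
    return item * 2
```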
These types of for loops can be written explicitly in Python; for example:
```python
...
# apply a function to each element in a collection
for item in mylist:
    result = task(item)
```
A better practice is to use the built-in map() function that applies the function to each item in the iterable for you.
```python
...
# apply the function to each element in the collection
results = map(task, mylist)
```
This does not apply the task() function to each item until we iterate the results, so-called lazy evaluation:
```python
...
# iterate the results from map
for result in results:
    print(result)
```
Therefore, it is common to see this operation consolidated to the following:
```python
...
# iterate the results from map
for result in map(task, mylist):
    print(result)
```
We can perform this same operation using the process pool, except each application of the function to an item in the list is a task that is executed asynchronously. For example:
```python
...
# iterate the results from map
for result in executor.map(task, mylist):
    print(result)
```
Although the tasks are executed concurrently, the results are iterated in the order of the iterable provided to the map() function, the same as the built-in map() function.
In this way, we can think of the process pool version of map() as a concurrent version of the built-in map() function, which is ideal if you are looking to update your for loop to use processes.
The example below demonstrates the map and wait pattern with a task that sleeps for a random amount of time less than one second and returns the provided value.
```python
# SuperFastPython.com
# example of the map and wait pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # execute tasks concurrently and process results in order
        for result in executor.map(task, range(10)):
            # retrieve the result
            print(result)

if __name__ == '__main__':
    main()
```
Running the example, we can see that the results are reported in the order that the tasks were created and sent into the process pool.
```
0
1
2
3
4
5
6
7
8
9
```
The map() function supports target functions that take more than one argument by providing more than one iterable as arguments to the call to map().
For example, we can define a target function for map that takes two arguments, then provide two iterables of the same length to the call to map.
The complete example is listed below.
```python
# SuperFastPython.com
# example of calling map on a process pool with two iterables
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(value1, value2):
    # sleep for less than a second
    sleep(random())
    return (value1, value2)

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit all tasks
        for result in executor.map(task, ['1', '2', '3'], ['a', 'b', 'c']):
            print(result)

if __name__ == '__main__':
    main()
```
Running the example executes the tasks as expected, providing two arguments to map and reporting a result that combines both arguments.
```
('1', 'a')
('2', 'b')
('3', 'c')
```
A call to the map() function will issue all tasks to the process pool immediately, even if you do not iterate the returned iterable of results.
This is unlike the built-in map() function that is lazy and does not compute each call until you ask for the result during iteration.
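For comparison, here is a minimal sketch of that laziness (the loud() function is contrived for illustration):

```python
# demonstrate the laziness of the built-in map()
def loud(value):
    print(f'Computing {value}')
    return value * 2

# no calls to loud() happen here
results = map(loud, [1, 2, 3])
print('Nothing computed yet')
# each call to loud() happens only as we iterate
for result in results:
    print(result)
```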
The example below confirms this by issuing all tasks with a call to the map() function and not iterating the results.
```python
# SuperFastPython.com
# example of calling map on the process pool and not iterating the results
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(value):
    # sleep for less than a second
    sleep(random())
    print(f'Done: {value}')
    return value

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit all tasks
        executor.map(task, range(5))
    print('All done!')

if __name__ == '__main__':
    main()
```
Running the example, we can see that the tasks are sent into the process pool and executed without having to explicitly pass over the iterable of results that was returned.
The use of the context manager ensured that the process pool did not shut down until all tasks were complete.
```
Done: 0
Done: 2
Done: 1
Done: 3
Done: 4
All done!
```
Submit and Use as Completed
Perhaps the second most common pattern when using the ProcessPoolExecutor is to submit tasks and use the results as they become available.
This can be achieved by calling the submit() function to push each task into the process pool, which returns a Future object, then calling the module function as_completed() on the list of Future objects, which will yield each Future object as its task is completed.
The example below demonstrates this pattern, submitting the tasks in order from 0 to 9 and showing results in the order that they were completed.
```python
# SuperFastPython.com
# example of the submit and use as completed pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # process task results as they are available
        for future in as_completed(futures):
            # retrieve the result
            print(future.result())

if __name__ == '__main__':
    main()
```
Running the example, we can see that the results are retrieved and printed in the order that the tasks completed, not the order that the tasks were submitted to the process pool.
```
5
9
6
1
0
7
3
8
4
2
```
Submit and Use Sequentially
We may require the results from tasks in the order that the tasks were submitted.
This may be because the tasks have a natural ordering.
We can implement this pattern by calling submit() for each task to get a list of Future objects, then iterating over the Future objects in the order that the tasks were submitted and retrieving the results. Note that each call to result() will block until the corresponding task is complete.
The main difference from the “as completed” pattern is that we enumerate the list of futures directly, instead of calling the as_completed() function.
```python
...
# process task results in the order they were submitted
for future in futures:
    # retrieve the result
    print(future.result())
```
The example below demonstrates this pattern, submitting the tasks in order from 0 to 9 and showing the results in the order that they were submitted.
```python
# SuperFastPython.com
# example of the submit and use sequentially pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # process task results in the order they were submitted
        for future in futures:
            # retrieve the result
            print(future.result())

if __name__ == '__main__':
    main()
```
Running the example, we can see that the results are retrieved and printed in the order that the tasks were submitted, not the order that the tasks were completed.
```
0
1
2
3
4
5
6
7
8
9
```
Submit and Use Callback
We may not want to explicitly process the results once they are available; instead, we want to call a function on the result.
Rather than doing this manually, as in the as-completed pattern above, we can have the process pool call the function on the result for us automatically.
This can be achieved by setting a callback on each Future object by calling the add_done_callback() method and passing in the callback function.
The process pool will then call the callback function as each task completes, passing in the Future object for the task.
The example below demonstrates this pattern, registering a custom callback function to be applied to each task as it is completed.
```python
# SuperFastPython.com
# example of the submit and use a callback pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# custom callback function called on tasks when they complete
def custom_callback(future):
    # retrieve the result
    print(future.result())

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # register the callback on all tasks
        for future in futures:
            future.add_done_callback(custom_callback)
        # wait for tasks to complete...

if __name__ == '__main__':
    main()
```
Running the example, we can see that results are retrieved and printed in the order they are completed, not the order that tasks were submitted.
```
8
0
7
1
4
6
5
3
2
9
```
We can register multiple callbacks on each Future object; it is not limited to a single callback.
The callback functions are called in the order in which they were registered on each Future object.
The following example demonstrates having two callbacks on each Future object.
```python
# SuperFastPython.com
# example of the submit and use multiple callbacks for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# custom callback function called on tasks when they complete
def custom_callback1(future):
    # retrieve the result
    print(f'Callback 1: {future.result()}')

# custom callback function called on tasks when they complete
def custom_callback2(future):
    # retrieve the result
    print(f'Callback 2: {future.result()}')

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # register the callbacks on all tasks
        for future in futures:
            future.add_done_callback(custom_callback1)
            future.add_done_callback(custom_callback2)
        # wait for tasks to complete...

if __name__ == '__main__':
    main()
```
Running the example, we can see that results are reported in the order that tasks were completed and that the two callback functions are called for each task in the order that we registered them with each Future object.
```
Callback 1: 3
Callback 2: 3
Callback 1: 9
Callback 2: 9
Callback 1: 7
Callback 2: 7
Callback 1: 2
Callback 2: 2
Callback 1: 0
Callback 2: 0
Callback 1: 5
Callback 2: 5
Callback 1: 1
Callback 2: 1
Callback 1: 8
Callback 2: 8
Callback 1: 4
Callback 2: 4
Callback 1: 6
Callback 2: 6
```
Submit and Wait for All
It is common to submit all tasks and then wait for all tasks in the process pool to complete.
This pattern may be useful when tasks do not return a result directly, such as when each task stores its result directly in a resource, like a file.
There are two ways that we can wait for tasks to complete: by calling the wait() module function or by calling shutdown() on the process pool.
The most likely case is that you want to explicitly wait for a set or subset of tasks in the process pool to complete.
You can achieve this by passing the list of Future objects to the wait() function, which by default will wait for all tasks to complete.
```python
...
# wait for all tasks to complete
wait(futures)
```
We can explicitly specify to wait for all tasks by setting the “return_when” argument to the ALL_COMPLETED constant; for example:
```python
...
# wait for all tasks to complete
wait(futures, return_when=ALL_COMPLETED)
```
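For reference, wait() returns two sets of Future objects: the completed tasks and the not-yet-completed tasks. A minimal sketch of inspecting them:

```python
...
# wait() returns the set of done futures and the set of unfinished futures
done, not_done = wait(futures, return_when=ALL_COMPLETED)
# with ALL_COMPLETED, we expect the not_done set to be empty
print(f'{len(done)} tasks done, {len(not_done)} not done')
```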
The example below demonstrates this pattern. Note that we are intentionally ignoring the return from calling wait() as we have no need to inspect it in this case.
```python
# SuperFastPython.com
# example of the submit and wait for all pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    # display the result
    print(name)

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # wait for all tasks to complete
        wait(futures)
        print('All tasks are done!')

if __name__ == '__main__':
    main()
```
Running the example, we can see that results are handled by each task as the tasks complete. Importantly, we can see that the main process waits until all tasks are completed before carrying on and printing a message.
```
3
9
0
8
4
6
2
1
5
7
All tasks are done!
```
An alternative approach would be to shut down the process pool and wait for all executing and queued tasks to complete before moving on.
This might be preferred when we don’t have a list of Future objects or when we only intend to use the process pool once for a set of tasks.
We can implement this pattern using the context manager; for example:
```python
# SuperFastPython.com
# example of the submit and wait for all with shutdown pattern for the process pool
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    # display the result
    print(name)

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # wait for all tasks to complete
    print('All tasks are done!')

if __name__ == '__main__':
    main()
```
Running the example, we can see that the main process does not move on and print the message until all tasks are completed, after the process pool has been automatically shut down by the context manager.
```
1
2
8
4
5
3
9
0
7
6
All tasks are done!
```
The context manager's automatic shutdown might be confusing to developers unfamiliar with how process pools work, hence the comment at the end of the context manager block in the previous example.
We can achieve the same effect without the context manager by making an explicit call to the shutdown() function.
```python
...
# wait for all tasks to complete and close the pool
executor.shutdown()
```
Recall that the shutdown() function will wait for all tasks to complete by default and will not cancel any queued tasks, but we can make this explicit by setting the “wait” argument to True and the “cancel_futures” argument to False; for example:
```python
...
# wait for all tasks to complete and close the pool
executor.shutdown(wait=True, cancel_futures=False)
```
The example below demonstrates the pattern of waiting for all tasks in the process pool to complete by calling shutdown() before moving on.
```python
# SuperFastPython.com
# example of the submit and wait for all with shutdown pattern for the process pool
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    # display the result
    print(name)

# entry point
def main():
    # start the process pool
    executor = ProcessPoolExecutor()
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # wait for all tasks to complete
    executor.shutdown()
    print('All tasks are done!')

if __name__ == '__main__':
    main()
```
Running the example, we can see that all tasks report their result as they complete and that the main process does not move on until all tasks have completed and the process pool has been shut down.
```
3
5
2
6
8
9
7
1
4
0
All tasks are done!
```
Submit and Wait for First
It is common to issue many tasks and only be concerned with the first result returned.
That is, not the result of the first task, but a result from any task that happens to be the first to complete its execution.
This may be the case if you are trying to access the same resource from multiple locations, like a file or some data.
This pattern can be achieved using the wait() module function and setting the “return_when” argument to the FIRST_COMPLETED constant.
```python
...
# wait until any task completes
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
```
We must also manage the process pool manually, constructing it ourselves and calling shutdown() explicitly, so that we can continue with the execution of the main process without waiting for all of the other tasks to complete.
The example below demonstrates this pattern and will stop waiting as soon as the first task is completed.
```python
# SuperFastPython.com
# example of the submit and wait for first pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
from concurrent.futures import FIRST_COMPLETED

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# entry point
def main():
    # start the process pool
    executor = ProcessPoolExecutor()
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # wait until any task completes
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    # get the result from the first task to complete
    print(done.pop().result())
    # shutdown without waiting
    executor.shutdown(wait=False, cancel_futures=True)

if __name__ == '__main__':
    main()
```
Running the example will wait for any of the tasks to complete, then retrieve the result of the first completed task and shut down the process pool.
Importantly, any tasks already executing will continue to run in the background (queued tasks that have not yet started are cancelled), and the main process will not exit until they have completed.
```
9
```
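Note that a similar effect can be achieved with the as_completed() function used earlier; the following is a minimal sketch of that alternative (assuming futures holds the submitted Future objects and as_completed has been imported from concurrent.futures):

```python
...
# as_completed() yields futures in completion order,
# so next() gives the first task to finish
first = next(as_completed(futures))
print(first.result())
```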
Further Reading
This section provides additional resources that you may find helpful.
Books
- ProcessPoolExecutor Jump-Start, Jason Brownlee (my book!)
- Concurrent Futures API Interview Questions
- ProcessPoolExecutor PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ProcessPoolExecutor: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python Pool: The Complete Guide
APIs
- concurrent.futures - Launching parallel tasks (Python standard library documentation)
References
- Thread (computing), Wikipedia.
- Process (computing), Wikipedia.
- Thread Pool, Wikipedia.
- Futures and promises, Wikipedia.
Takeaways
You now know the common usage patterns for the ProcessPoolExecutor in Python.
Do you have any questions?
Ask your question in the comments below and I will do my best to answer.