Last Updated on November 23, 2023
The Python ProcessPoolExecutor provides reusable worker processes in Python.
The ProcessPoolExecutor class is part of the Python standard library. It offers easy-to-use pools of child worker processes via the modern executor design pattern. It is ideal for parallelizing loops of CPU-bound tasks and for issuing tasks asynchronously.
This book-length guide provides a detailed and comprehensive walkthrough of the Python ProcessPoolExecutor API.
Some tips:
- You may want to bookmark this guide and read it over a few sittings.
- You can download a zip of all code used in this guide.
- You can get help, ask a question in the comments or email me.
- You can jump to the topics that interest you via the table of contents (below).
Let’s dive in.
Python Processes and the Need for Process Pools
So, what are processes and why do we care about process pools?
What Are Python Processes
A process refers to an instance of a computer program that is being executed.
Every Python program is a process and has one thread called the main thread used to execute your program instructions. Each process is, in fact, one instance of the Python interpreter that executes Python instructions (Python byte-code), which is a slightly lower level than the code you type into your Python program.
Sometimes we may need to create new processes to run additional tasks concurrently.
Python provides real system-level processes via the Process class in the multiprocessing module.
The underlying operating system controls how new processes are created. On some systems, that may require spawning a new process, and on others, it may require that the process is forked. The operating-system-specific method used for creating new processes in Python is not something we need to worry about, as it is managed by your installed Python interpreter.
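If you are curious, you can check which start method your platform uses by calling the multiprocessing.get_start_method() function. Below is a minimal sketch (an illustration added here, not part of the original listing).

# illustrative sketch: report the default start method for new processes
from multiprocessing import get_start_method

# entry point for the program
if __name__ == '__main__':
    # e.g. 'fork' on Linux, 'spawn' on Windows and recent versions of macOS
    print(get_start_method())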
A task can be run in a new process by creating an instance of the Process class and specifying the function to run in the new process via the “target” argument.
...
# define a task to run in a new process
p = Process(target=task)
Once the process is created, it must be started by calling the start() function.
...
# start the task in a new process
p.start()
We can then wait around for the task to complete by joining the process; for example:
...
# wait for the task to complete
p.join()
Whenever we create new processes, we must protect the entry point of the program.
# entry point for the program
if __name__ == '__main__':
    # do things...
Tying this together, the complete example of creating a Process to run an ad hoc task function is listed below.
# SuperFastPython.com
# example of running a function in a new process
from multiprocessing import Process

# a task to execute in another process
def task():
    print('This is another process', flush=True)

# entry point for the program
if __name__ == '__main__':
    # define a task to run in a new process
    p = Process(target=task)
    # start the task in a new process
    p.start()
    # wait for the task to complete
    p.join()
This is useful for running one-off ad hoc tasks in a separate process, although it becomes cumbersome when you have many tasks to run.
Each process that is created requires the allocation of resources (e.g. an instance of the Python interpreter and memory for the process’s main thread’s stack space). The computational costs of setting up processes can become expensive if we are creating and destroying many processes over and over for ad hoc tasks.
Instead, we would prefer to keep worker processes around for reuse if we expect to run many ad hoc tasks throughout our program.
This can be achieved using a process pool.
What Is a Process Pool
A process pool is a programming pattern for automatically managing a pool of worker processes.
The pool is responsible for a fixed number of processes.
- It controls when they are created, such as when they are needed.
- It also controls what they should do when they are not being used, such as making them wait without consuming computational resources.
The pool can provide a generic interface for executing ad hoc tasks with a variable number of arguments, much like the target property on the Process object, but does not require that we choose a process to run the task, start the process, or wait for the task to complete.
Python provides a process pool via the ProcessPoolExecutor class.
ProcessPoolExecutor for Process Pools in Python
The ProcessPoolExecutor Python class is used to create and manage process pools and is provided in the concurrent.futures module.
The concurrent.futures module was introduced in Python 3.2 and was written by Brian Quinlan. It provides both thread pools and process pools, although we will focus our attention on process pools in this guide.
If you’re interested, you can access the Python source code for the ProcessPoolExecutor class directly via process.py. It may be interesting to dig into how the class works internally, perhaps after you are familiar with how it works from the outside.
The ProcessPoolExecutor extends the Executor class and returns Future objects when tasks are submitted to it.
- Executor: Parent class for the ProcessPoolExecutor that defines basic life-cycle operations for the pool.
- Future: Object returned when submitting tasks to the process pool that may complete later.
Let’s take a closer look at Executors, Futures, and the life-cycle of using the ProcessPoolExecutor class.
What Are Executors
The ProcessPoolExecutor class extends the abstract Executor class.
The Executor class defines three methods used to control our process pool; they are: submit(), map(), and shutdown().
- submit(): Dispatch a function to be executed and return a future object.
- map(): Apply a function to an iterable of elements.
- shutdown(): Shut down the executor.
The Executor is started when the class is created and must be shut down explicitly by calling shutdown(), which will release any resources held by the Executor. We can also have it shut down automatically, but we will look at that a little later.
The submit() and map() functions are used to submit tasks to the Executor for asynchronous execution.
The map() function operates just like the built-in map() function and is used to apply a function to each element in an iterable object, like a list. Unlike the built-in map() function, each application of the function to an element will happen asynchronously instead of sequentially.
The submit() function takes a function as well as any arguments and will execute it asynchronously, although the call returns immediately and provides a Future object.
We will take a closer look at each of these three functions in a moment. Firstly, what is a Future?
What Are Futures
A future is an object that represents a delayed result for an asynchronous task.
It is also sometimes called a promise or a delay. It provides a context for the result of a task that may or may not be executing and a way of getting a result once it is available.
In Python, the Future object is returned from an Executor, such as a ProcessPoolExecutor, when calling the submit() function to dispatch a task to be executed asynchronously.
In general, we do not create Future objects; we only receive them and we may need to call functions on them.
There is always one Future object for each task sent into the ProcessPoolExecutor via a call to submit().
The Future object provides a number of helpful functions for inspecting the status of the task such as: cancelled(), running(), and done() to determine if the task was cancelled, is currently running, or has finished execution.
- cancelled(): Returns True if the task was cancelled before being executed.
- running(): Returns True if the task is currently running.
- done(): Returns True if the task has completed or was cancelled.
A running task cannot be cancelled and a done task could have been cancelled.
A Future object also provides access to the result of the task via the result() function. If an exception was raised while executing the task, it will be re-raised when calling the result() function, or can be accessed via the exception() function.
- result(): Access the result from running the task.
- exception(): Access any exception raised while running the task.
Both the result() and exception() functions allow a timeout to be specified as an argument, which is the number of seconds to wait for a return value if the task is not yet complete. If the timeout expires, then a TimeoutError will be raised.
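To make these functions concrete, below is a minimal sketch (not one of the guide’s worked examples; the work() task is a hypothetical stand-in) that submits a task, inspects its status, and retrieves the result with a timeout.

# illustrative sketch: inspecting a Future returned by submit()
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import TimeoutError

# hypothetical task that sleeps briefly then returns a value
def work():
    sleep(0.5)
    return 'all done'

# entry point for the program
if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        # submit the task and receive a Future immediately
        future = executor.submit(work)
        # check the status of the task
        print(future.cancelled(), future.running(), future.done())
        try:
            # wait up to 5 seconds for the result; re-raises any task exception
            print(future.result(timeout=5))
        except TimeoutError:
            print('Task did not complete within the timeout')
        except Exception as e:
            # the same exception is also available via future.exception()
            print(f'Task failed: {e}')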
Finally, we may want to have the process pool automatically call a function once the task is completed.
This can be achieved by attaching a callback to the Future object for the task via the add_done_callback() function.
- add_done_callback(): Add a callback function to the task to be executed by the process pool once the task is completed.
We can add more than one callback to each task, and they will be executed in the order they were added. If the task has already completed before we add the callback, then the callback is executed immediately.
Any exceptions raised in the callback function will not impact the task or process pool.
We will take a closer look at the Future object in a later section.
Now that we are familiar with the functionality of a ProcessPoolExecutor provided by the Executor class and of Future objects returned by calling submit(), let’s take a closer look at the life-cycle of the ProcessPoolExecutor class.
LifeCycle of the ProcessPoolExecutor
The ProcessPoolExecutor provides a pool of generic worker processes.
The ProcessPoolExecutor was designed to be easy and straightforward to use.
If multiprocessing was like the transmission for changing gears in a car, then using multiprocessing.Process is a manual transmission (e.g. hard to learn and use), whereas concurrent.futures.ProcessPoolExecutor is an automatic transmission (e.g. easy to learn and use).
- multiprocessing.Process: Manual multiprocessing in Python.
- concurrent.futures.ProcessPoolExecutor: Automatic or “just works” mode for multiprocessing in Python.
There are four main steps in the life-cycle of using the ProcessPoolExecutor class; they are: create, submit, wait, and shut down.
- Step 1. Create: Create the process pool by calling the constructor ProcessPoolExecutor().
- Step 2. Submit: Submit tasks and get futures by calling submit() or map().
- Step 3. Wait: Wait and get results as tasks complete (optional).
- Step 4. Shut down: Shut down the process pool by calling shutdown().
The following figure helps to picture the life-cycle of the ProcessPoolExecutor class.
Let’s take a closer look at each life-cycle step in turn.
Step 1. Create the Process Pool
First, a ProcessPoolExecutor instance must be created.
When an instance of a ProcessPoolExecutor is created, it can be configured with the fixed number of processes in the pool, the method used for creating new processes (e.g. spawn or fork), and the name of a function to call when initializing each worker process, along with any arguments for that function.
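For example, a sketch of these constructor arguments might look like the following (the worker_init() function and its argument are hypothetical; max_workers, mp_context, initializer, and initargs are the standard constructor arguments).

# illustrative sketch: configuring the process pool on construction
from multiprocessing import get_context
from concurrent.futures import ProcessPoolExecutor

# hypothetical function run once in each new worker process
def worker_init(name):
    print(f'Initializing worker for {name}', flush=True)

# entry point for the program
if __name__ == '__main__':
    # four workers, created via 'spawn', each initialized with one argument
    executor = ProcessPoolExecutor(max_workers=4,
                                   mp_context=get_context('spawn'),
                                   initializer=worker_init,
                                   initargs=('demo',))
    # ... submit tasks here ...
    executor.shutdown()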
By default, the pool is created with one worker process for each logical CPU in your system. This is good for most purposes.
- Default Total Processes = (Total CPUs)
For example, if you have 4 physical CPU cores, each with hyperthreading (most modern CPUs have this), then Python will see 8 logical CPUs and will allocate 8 processes to the pool by default.
...
# create a process pool with the default number of worker processes
executor = ProcessPoolExecutor()
It is a good idea to test your application in order to determine the number of processes that results in the best performance.
For example, for some computationally intensive tasks, you may achieve the best performance by setting the number of processes to be equal to the number of physical CPU cores (before hyperthreading), instead of the logical number of CPU cores (after hyperthreading).
We’ll discuss tuning the number of processes for your pool more later on.
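As a starting point for such testing, the sketch below (an illustration, not from the guide) reports the number of logical CPUs via os.cpu_count() and, if the third-party psutil package happens to be installed, the number of physical cores, then sizes the pool to the physical core count.

# illustrative sketch: sizing the pool to the physical CPU cores
import os
from concurrent.futures import ProcessPoolExecutor

# entry point for the program
if __name__ == '__main__':
    # logical CPUs reported by the operating system
    workers = os.cpu_count()
    try:
        # physical core count, if the third-party psutil package is available
        import psutil
        workers = psutil.cpu_count(logical=False) or workers
    except ImportError:
        pass
    print(f'Using {workers} workers')
    # create the pool with one worker per physical core
    executor = ProcessPoolExecutor(max_workers=workers)
    executor.shutdown()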
You can specify the number of processes to create in the pool via the max_workers argument; for example:
...
# create a process pool with 4 workers
executor = ProcessPoolExecutor(max_workers=4)
Recall, whenever we use processes, we must protect the entry point of the program.
This can be achieved using an if-statement; for example:
...
# entry point of the program
if __name__ == '__main__':
    # create a process pool with 4 workers
    executor = ProcessPoolExecutor(max_workers=4)
Step 2. Submit Tasks to the Process Pool
Once the process pool has been created, you can submit tasks for asynchronous execution.
As discussed, there are two main approaches for submitting tasks defined on the Executor parent class. They are: map() and submit().
Step 2a. Submit Tasks With map()
The map() function is an asynchronous version of the built-in map() function for applying a function to each element in an iterable, like a list.
You can call the map() function on the pool and pass it the name of your function and the iterable.
You are most likely to use map() when converting a for loop to run using one process per loop iteration.
...
# perform all tasks in parallel
results = executor.map(my_task, my_items) # does not block
Where “my_task” is your function that you wish to execute and “my_items” is your iterable of objects, each to be executed by your “my_task” function.
The tasks will be queued up in the process pool and executed by worker processes in the pool as they become available.
The map() function will return an iterable immediately. This iterable can be used to access the results from the target task function as they are available in the order that the tasks were submitted (e.g. order of the iterable you provided).
...
# iterate over results as they become available
for result in executor.map(my_task, my_items):
    print(result)
You can also set a timeout when calling map() via the “timeout” argument in seconds if you wish to impose a limit on how long you’re willing to wait for each task to complete as you’re iterating, after which a TimeoutError will be raised.
...
# iterate over results as they become available
for result in executor.map(my_task, my_items, timeout=5):
    # wait for task to complete or timeout expires
    print(result)
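If a timeout does expire while iterating, the error is raised from the loop itself, so it can be handled with a try-except block; a small sketch, assuming the same my_task and my_items as above:

...
# handle a timeout while iterating results
from concurrent.futures import TimeoutError
try:
    # iterate over results, waiting at most 5 seconds for each
    for result in executor.map(my_task, my_items, timeout=5):
        print(result)
except TimeoutError:
    # at least one task did not complete within the timeout
    print('Gave up waiting for a task')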
Step 2b. Submit Tasks With submit()
The submit() function submits one task to the process pool for execution.
The function takes the name of the function to call and all arguments to the function, then returns a Future object immediately.
The Future object is a promise to return the results from the task (if any) and provides a way to determine if a specific task has been completed or not.
...
# submit a task with arguments and get a future object
future = executor.submit(my_task, arg1, arg2) # does not block
Where “my_task” is the function you wish to execute and “arg1” and “arg2” are the first and second arguments to pass to the “my_task” function.
You can use the submit() function to submit tasks that do not take any arguments; for example:
...
# submit a task with no arguments and get a future object
future = executor.submit(my_task) # does not block
You can access the result of the task via the result() function on the returned Future object. This call will block until the task is completed.
...
# get the result from a future
result = future.result() # blocks
You can also set a timeout when calling result() via the “timeout” argument in seconds if you wish to impose a limit on how long you’re willing to wait for the task to complete, after which a TimeoutError will be raised.
...
# wait for task to complete or timeout expires
result = future.result(timeout=5) # blocks
Step 3. Wait for Tasks to Complete (Optional)
The concurrent.futures module provides two module utility functions for waiting for tasks via their Future objects.
Recall that Future objects are only created when we call submit() to push tasks into the process pool.
These wait functions are optional to use, as you can wait for results directly after calling map() or submit() or wait for all tasks in the process pool to finish.
These two module functions are wait() for waiting for Future objects to complete and as_completed() for getting Future objects as their tasks complete.
- wait(): Wait on one or more Future objects until they are completed.
- as_completed(): Returns Future objects from a collection as they complete their execution.
You can use both functions with Future objects created by one or more process pools; they are not specific to any given process pool in your application. This is helpful if you want to perform waiting operations across multiple process pools that are executing different types of tasks.
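For example, here is a minimal sketch (the task1() and task2() functions are hypothetical) that collects futures from two separate pools and waits on them together.

# illustrative sketch: waiting on futures from two different pools
from time import sleep
from concurrent.futures import ProcessPoolExecutor, wait

# hypothetical tasks of two different types
def task1():
    sleep(0.5)
    return 'task1 done'

def task2():
    sleep(0.5)
    return 'task2 done'

# entry point for the program
if __name__ == '__main__':
    with ProcessPoolExecutor(2) as pool1, ProcessPoolExecutor(2) as pool2:
        # futures from both pools can be collected together
        futures = [pool1.submit(task1), pool2.submit(task2)]
        # wait for all of them, regardless of which pool runs each task
        done, not_done = wait(futures)
        for future in done:
            print(future.result())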
Both functions are useful to use with an idiom of dispatching multiple tasks into the process pool via submit() in a list comprehension; for example:
...
# dispatch tasks into the process pool and create a list of futures
futures = [executor.submit(my_task, my_data) for my_data in my_datalist]
Here, my_task is our custom target task function, “my_data” is one element of data passed as an argument to “my_task“, and “my_datalist” is our source of my_data objects.
We can then pass the “futures” to wait() or as_completed().
Creating a list of futures in this way is not required, just a common pattern when converting for loops into tasks submitted to a process pool.
Step 3a. Wait for Futures to Complete
The wait() function can take one or more futures and will return when a specified action occurs, such as all tasks completing, one task completing, or one task throwing an exception.
The function returns two sets of Future objects: the first contains the futures that match the condition set via the “return_when” argument, and the second contains the futures for tasks that did not meet the condition. These are called the “done” and “not_done” sets of futures.
It is useful for waiting on a large batch of work and to stop waiting when we get the first result.
This can be achieved via the FIRST_COMPLETED constant passed to the “return_when” argument.
...
# wait until we get the first result
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
Alternatively, we can wait for all tasks to complete via the ALL_COMPLETED constant.
This can be helpful if you are using submit() to dispatch tasks and are looking for an easy way to wait for all work to be completed.
...
# wait for all tasks to complete
done, not_done = wait(futures, return_when=ALL_COMPLETED)
There is also an option to wait for the first exception via the FIRST_EXCEPTION constant.
...
# wait for the first exception
done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
Step 3b. Wait for Futures as Completed
The beauty of performing tasks concurrently is that we can get results as they become available, rather than waiting for all tasks to be completed.
The as_completed() function will return Future objects for tasks as they are completed in the process pool.
We can call the function and provide it a list of Future objects created by calling submit() and it will return Future objects as they are completed in whatever order.
It is common to use the as_completed() function in a loop over the list of Futures created when calling submit(); for example:
...
# iterate over all submitted tasks and get results as they are available
for future in as_completed(futures):
    # get the result for the next completed task
    result = future.result() # blocks
Note: this is different from iterating over the results from calling map() in two ways.
Firstly, map() returns an iterator over objects, not over Futures.
Secondly, map() returns results in the order that the tasks were submitted, not in the order that they are completed.
Step 4. Shut Down the Process Pool
Once all tasks are completed, we can close down the process pool, which will release each process and any resources it may hold.
For example, each process is running an instance of the Python interpreter and at least one thread (the main thread) that has its own stack space.
...
# shutdown the process pool
executor.shutdown() # blocks
The shutdown() function will wait for all tasks in the process pool to complete before returning by default.
This behavior can be changed by setting the “wait” argument to False when calling shutdown(), in which case the function will return immediately. The resources used by the process pool will not be released until all current and queued tasks are completed.
...
# shutdown the process pool
executor.shutdown(wait=False) # does not block
We can also instruct the pool to cancel all queued tasks to prevent their execution. This can be achieved by setting the “cancel_futures” argument to True. By default, queued tasks are not cancelled when calling shutdown().
...
# cancel all queued tasks
executor.shutdown(cancel_futures=True) # blocks
If we forget to close the process pool, it will be closed automatically when the main process exits. If we forget to close the pool and there are still tasks executing, the main process will not exit until all tasks in the pool and all queued tasks have executed.
ProcessPoolExecutor Context Manager
A preferred way to work with the ProcessPoolExecutor class is to use a context manager.
This matches the preferred way to work with other resources, such as files and sockets.
Using the ProcessPoolExecutor with a context manager involves using the “with” keyword to create a block in which you can use the process pool to execute tasks and get results.
Once the block has completed, the process pool is automatically shut down. Internally, the context manager will call the shutdown() function with the default arguments, waiting for all queued and executing tasks to complete before returning and carrying on.
Below is a code snippet to demonstrate creating a process pool using the context manager.
...
# create a process pool
with ProcessPoolExecutor(max_workers=10) as pool:
    # submit tasks and get results
    # ...
    # automatically shutdown the process pool...
# the pool is shutdown at this point
This is a very handy idiom if you are converting a for loop to be executed asynchronously.
It is less useful if you want the process pool to operate in the background while you perform other work in the main thread of your program, or if you wish to reuse the process pool multiple times throughout your program.
Now that we are familiar with how to use the ProcessPoolExecutor, let’s look at some worked examples.
ProcessPoolExecutor Example
In this section, we will look at a more complete example of using the ProcessPoolExecutor.
Consider a situation where we might want to check if a word is known to the program or not, e.g. whether it is in a dictionary of known words.
If the word is known, that is fine, but if not, we might want to take action for the user, perhaps underline it in red like an automatic spell check.
One approach to implementing this feature would be to load a dictionary of known words and create a hash of each word. We can then hash new words and check if they exist in the set of known hashed words or not.
This is a good problem to explore with the ProcessPoolExecutor as hashing words can be relatively slow, especially for large dictionaries of hundreds of thousands or millions of known words.
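Once the set of known hashes has been built, the membership check itself is a one-liner; a small sketch (an illustration using the known_words set constructed in the examples below; note that words loaded via readlines() keep their trailing newline, so the candidate must be hashed in the same form):

...
# check whether a candidate word is known
# (words loaded via readlines() keep the trailing newline)
candidate = 'zebra' + '\n'
if hash_word(candidate) in known_words:
    print('Known word')
else:
    print('Possibly misspelled')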
First, let’s develop a serial (non-concurrent) version of the program.
Hash a Dictionary of Words Serially
The first step is to select a dictionary of words to use.
On Unix systems, like MacOS and Linux, we have a dictionary already installed, called Unix Words.
It is located in one of the following locations:
- /usr/share/dict/words
- /usr/dict/words
On my system, it is located in ‘/usr/share/dict/words‘ and contains 235,886 words, as counted using the command:
cat /usr/share/dict/words | wc -l
We can use this dictionary of words.
Alternatively, if we are on Windows or wish to have a larger dictionary, we can download one of many free lists of words online.
For example, you can download a list of one million English words from here:
Download this file and save it to your current working directory with the filename “1m_words.txt“.
Looking in the file, we can see that indeed we have a long list of words, one per line.
aaccf
aalders
aaren
aarika
aaron
aartjan
aasen
ab
abacus
abadines
abagael
abagail
abahri
abasolo
abazari
...
First, we need to load the list of words into memory.
This can be achieved by first opening the file, then calling the readlines() function that will automatically read ASCII lines of text into a list.
The load_words() function below takes a path to the text file and returns a list of words loaded from the file.
# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()
Next, we need to hash each word.
We will intentionally select a slow hash function in this example, specifically the SHA512 algorithm.
This is available in Python via the hashlib.sha512() function.
First, we can create an instance of the hashing object by calling the sha512() function.
...
# create the hash object
hash_object = sha512()
Next, we can convert a given word to bytes and then hash it using the hash function.
...
# convert the string to bytes
byte_data = word.encode('utf-8')
# hash the word
hash_object.update(byte_data)
Finally, we can get a HEX string representation of the hash for the word by calling the hexdigest() function.
...
# get the hex hash of the word
h = hash_object.hexdigest()
Tying this together, the hash_word() function below takes a word and returns a HEX hash code of the word.
# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()
That’s about all there is to it.
We can define a function that will drive the program, first loading the list of words by calling our load_words(), then creating a set of hashes of known words by calling our hash_word() for each loaded word.
The main() function below implements this.
# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # hash all known words
    known_words = {hash_word(word) for word in words}
    print(f'Done, with {len(known_words)} hashes')
Tying this all together, the complete example of loading a dictionary of words and creating a set of known word hashes is listed below.
# SuperFastPython.com
# example of hashing a word list serially
from hashlib import sha512

# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()

# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()

# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # hash all known words
    known_words = {hash_word(word) for word in words}
    print(f'Done, with {len(known_words)} hashes')

if __name__ == '__main__':
    main()
Running the example first loads the file and reports that a total of 1,049,938 words were loaded.
The list of words is then hashed and the hashes are stored in a set.
The program reports that a total of 979,250 hashes were stored, suggesting tens of thousands of duplicates in the dictionary.
The program takes about 1.4 seconds to run on a modern system.
How long does the example take to run on your system?
Let me know in the comments below.
Loaded 1049938 words from 1m_words.txt
Done, with 979250 hashes
Next, we can update the program to hash the words concurrently.
Hash a Dictionary of Words Concurrently With map()
Hashing words is relatively slow, but even so, hashing nearly one million words takes under two seconds.
Nevertheless, we can accelerate the process by making use of all CPUs in the system and hashing the words concurrently.
This can be achieved using the ProcessPoolExecutor.
Firstly, we can create the process pool and specify the number of concurrent processes to run. I recommend configuring the pool to match the number of physical CPU cores in your system.
I have four cores, so the example will use four cores, but update it for the number of cores you have available.
...
# create the process pool
with ProcessPoolExecutor(4) as executor:
    # ...
Next, we need to submit the tasks to the process pool, that is, the hashing of each word.
Because the task is simply applying a function for each item in a list, we can use the map() function directly.
For example:
...
# create a set of word hashes
known_words = set(executor.map(hash_word, words))
And that’s it.
For example, the updated version of the main() function to hash words concurrently is listed below.
# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # create the process pool
    with ProcessPoolExecutor(4) as executor:
        # create a set of word hashes
        known_words = set(executor.map(hash_word, words))
    print(f'Done, with {len(known_words)} hashes')
Well, not so fast.
This would execute, but it would take a very long time to complete.
The reason is that we would be adding nearly one million tasks to the pool to be executed by four processes, and each task would need to be pickled and queued internally. Repeating these operations so many times results in an overhead that far surpasses the execution time of the task.
We must reduce the overhead by reducing the number of internal tasks within the process pool.
This can be achieved by setting the “chunksize” parameter when calling map().
This controls how many items in the iterable map to one task in the process pool. By default, one item is mapped to one task, meaning we have nearly one million tasks.
Perhaps a good first approach would be to split the number of items by the number of processes available, in this case four. This would create four tasks, e.g. four large chunks of words, each to be processed by one process, likely on one CPU core.
This can be achieved by calculating the length of the list of words and dividing it by the number of worker processes. The division might not be clean, therefore we can use the math.ceil() function to round the number of items per task up to the nearest integer.
...
# select a chunk size
chunksize = ceil(len(words) / 4)
We can estimate that this would be (1049938 / 4) or about 262,485 words per task, e.g. just over a quarter of a million.
We can then use this chunksize when calling the map() function.
...
# create a set of word hashes
known_words = set(executor.map(hash_word, words, chunksize=chunksize))
Tying this together, the complete example of hashing a dictionary of words concurrently using the ProcessPoolExecutor is listed below.
# SuperFastPython.com
# example of hashing a word list concurrently
from math import ceil
from hashlib import sha512
from concurrent.futures import ProcessPoolExecutor

# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()

# load a file of words
def load_words(path):
    # open the file
    with open(path) as file:
        # read all data as lines
        return file.readlines()

# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # create the process pool
    with ProcessPoolExecutor(4) as executor:
        # select a chunk size
        chunksize = ceil(len(words) / 4)
        # create a set of word hashes
        known_words = set(executor.map(hash_word, words, chunksize=chunksize))
    print(f'Done, with {len(known_words)} hashes')

if __name__ == '__main__':
    main()
Running the example loads the words as before, then creates the set of hashed words concurrently by splitting it into four tasks, one for each process in the pool.
This concurrent version offers only a very minor speedup, taking about 1.2 seconds on my system compared to about 1.4 seconds for the serial version.
Loaded 1049938 words from 1m_words.txt
Done, with 979250 hashes
Next, let’s see if we can get a further improvement by tuning the chunksize argument.
Testing chunksize Values When Hashing a Dictionary of Words With map()
Splitting items into tasks for the process pool is more art than science.
Getting it wrong, like setting it to one when we have a large number of tasks, can result in much worse performance than the serial case. Setting it naively can result in equivalent or slightly better performance than the serial case.
As such, we can tune the performance of the application by testing different values of the “chunksize” argument.
In the previous section, we saw that a chunksize of 262485 resulted in similar performance to the serial case.
I recommend testing different chunk sizes in order to discover what works well on your specific system; for example, some numbers you could try include:
- 100,000
- 50,000
- 10,000
- 5,000
- 1,000
- 500
It is common to perform this type of tuning when working with distributed systems and multi-process systems as the specific cost of serializing and transmitting data between workers depends on the hardware and specific data.
If the tasks involved were long running or sensitive in some way, you could design a test harness with mock tasks.
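For example, a mock task might simply sleep for a duration comparable to the real work; a small sketch (the ten-millisecond duration is an arbitrary stand-in, not a measured figure):

...
# hypothetical mock task that stands in for the real work
from time import sleep

def mock_task(item):
    # pretend to do about ten milliseconds of work
    sleep(0.01)
    return item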
We can define a function to test a given chunksize argument that also calculates how long the task takes to complete, including the fixed cost of setting up the process pool.
The test_chunksize() function below implements this, taking the loaded dictionary of words and chunksize to test, and reports how long it took to execute the task for the given chunksize.
# test a chunksize
def test_chunksize(words, size):
    time1 = time()
    # create the process pool
    with ProcessPoolExecutor(4) as executor:
        # create a set of word hashes
        _ = set(executor.map(hash_word, words, chunksize=size))
    time2 = time()
    total = time2 - time1
    print(f'{size}: {total:.3f} seconds')
We can call this function from our main() function with a list of different chunk size values to test; for example:
# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # test chunk sizes
    base = ceil(len(words) / 4)
    sizes = [base, 100000, 50000, 10000, 5000, 1000, 500]
    for size in sizes:
        test_chunksize(words, size)
Tying this together, the complete example of testing different chunksize values is listed below.
# SuperFastPython.com
# example of testing chunksize when hashing a word list concurrently
from math import ceil
from time import time
from hashlib import sha512
from concurrent.futures import ProcessPoolExecutor

# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()

# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()

# test a chunksize
def test_chunksize(words, size):
    time1 = time()
    # create the process pool
    with ProcessPoolExecutor(4) as executor:
        # create a set of word hashes
        _ = set(executor.map(hash_word, words, chunksize=size))
    time2 = time()
    total = time2 - time1
    print(f'{size}: {total:.3f} seconds')

# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # test chunk sizes
    base = ceil(len(words) / 4)
    sizes = [base, 100000, 50000, 10000, 5000, 1000, 500]
    for size in sizes:
        test_chunksize(words, size)

if __name__ == '__main__':
    main()
Running the example, we can see that a chunksize of about 10,000 or 5,000 would work well, performing the task in about 0.8 seconds as opposed to about 1.4 in the serial case and 1.2 for the naive configuration of chunksize, at least on my system.
This highlights the importance of tuning the chunksize for your specific task and computer hardware.
Loaded 1049938 words from 1m_words.txt
262485: 1.242 seconds
100000: 1.122 seconds
50000: 1.157 seconds
10000: 0.871 seconds
5000: 0.842 seconds
1000: 1.036 seconds
500: 1.112 seconds
What worked well on your system?
Let me know in the comments below.
ProcessPoolExecutor Usage Patterns
The ProcessPoolExecutor provides a lot of flexibility for executing concurrent tasks in Python.
Nevertheless, there are a handful of common usage patterns that will fit most program scenarios.
This section lists the common usage patterns with worked examples that you can copy-and-paste into your own project and adapt as needed.
The patterns we will look at are as follows:
- Map and Wait Pattern
- Submit and Use as Completed Pattern
- Submit and Use Sequentially Pattern
- Submit and Use Callback Pattern
- Submit and Wait for All Pattern
- Submit and Wait for First Pattern
We will use a contrived task in each example that will sleep for a random amount of time less than one second. You can easily replace this example task with your own task in each pattern.
Also, recall that each Python program is a process and has one thread by default called the main thread where we do our work. We will create the process pool in the main thread in each example and may reference actions in the main thread in some of the patterns, as opposed to actions in processes in the process pool.
Map and Wait Pattern
Perhaps the most common pattern when using the ProcessPoolExecutor is to convert a for loop that executes a function on each item in a collection to use multiprocessing.
It assumes that the function has no side effects, meaning it does not access any data outside of the function and does not change the data provided to it. It takes data and produces a result.
These types of for loops can be written explicitly in Python; for example:
...
# apply a function to each element in a collection
for item in mylist:
    result = task(item)
A better practice is to use the built-in map() function that applies the function to each item in the iterable for you.
...
# apply the function to each element in the collection
results = map(task, mylist)
This does not apply the task() function to each item until we iterate the results, so-called lazy evaluation:
...
# iterate the results from map
for result in results:
    print(result)
Therefore, it is common to see this operation consolidated to the following:
...
# iterate the results from map
for result in map(task, mylist):
    print(result)
We can perform this same operation using the process pool, except each application of the function to an item in the list is a task that is executed asynchronously. For example:
...
# iterate the results from map
for result in executor.map(task, mylist):
    print(result)
Although the tasks are executed asynchronously, the results are iterated in the order of the iterable provided to the map() function, the same as the built-in map() function.
In this way, we can think of the process pool version of map() as a concurrent version of the map() function and is ideal if you are looking to update your for loop to use processes.
The example below demonstrates using the map and wait pattern with a task that will sleep a random amount of time less than one second and return the provided value.
# SuperFastPython.com
# example of the map and wait pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # execute tasks concurrently and process results in order
        for result in executor.map(task, range(10)):
            # retrieve the result
            print(result)

if __name__ == '__main__':
    main()
Running the example, we can see that the results are reported in the order that the tasks were created and sent into the process pool.
0
1
2
3
4
5
6
7
8
9
The map() function supports target functions that take more than one argument by providing more than one iterable as arguments to the call to map().
For example, we can define a target function for map that takes two arguments, then provide two iterables of the same length to the call to map.
The complete example is listed below.
# SuperFastPython.com
# example of calling map on a process pool with two iterables
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(value1, value2):
    # sleep for less than a second
    sleep(random())
    return (value1, value2)

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit all tasks
        for result in executor.map(task, ['1', '2', '3'], ['a', 'b', 'c']):
            print(result)

if __name__ == '__main__':
    main()
Running the example executes the tasks as expected, providing two arguments to map and reporting a result that combines both arguments.
('1', 'a')
('2', 'b')
('3', 'c')
A call to the map function will issue all tasks to the process pool immediately, even if you do not iterate the iterable of results.
This is unlike the built-in map() function that is lazy and does not compute each call until you ask for the result during iteration.
The example below confirms this by issuing all tasks with a map and not iterating the results.
# SuperFastPython.com
# example of calling map on the process pool and not iterating the results
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(value):
    # sleep for less than a second
    sleep(random())
    print(f'Done: {value}')
    return value

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit all tasks
        executor.map(task, range(5))
    print('All done!')

if __name__ == '__main__':
    main()
Running the example, we can see that the tasks are sent into the process pool and executed without having to explicitly pass over the iterable of results that was returned.
The use of the context manager ensured that the process pool did not shut down until all tasks were complete.
Done: 0
Done: 2
Done: 1
Done: 3
Done: 4
All done!
Submit and Use as Completed
Perhaps the second most common pattern when using the ProcessPoolExecutor is to submit tasks and use the results as they become available.
This can be achieved using the submit() function to push tasks into the process pool, which returns Future objects, then calling the module function as_completed() on the list of Future objects, which will return each Future object as its task is completed.
The example below demonstrates this pattern, submitting the tasks in order from 0 to 9 and showing results in the order that they were completed.
# SuperFastPython.com
# example of the submit and use as completed pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # process task results as they are available
        for future in as_completed(futures):
            # retrieve the result
            print(future.result())

if __name__ == '__main__':
    main()
Running the example, we can see that the results are retrieved and printed in the order that the tasks completed, not the order that the tasks were submitted to the process pool.
9
6
7
0
3
8
1
4
5
2
Submit and Use Sequentially
We may require the results from tasks in the order that the tasks were submitted.
This may be because the tasks have a natural ordering.
We can implement this pattern by calling submit() for each task to get a list of Future objects then iterating over the Future objects in the order that the tasks were submitted and retrieving the results.
The main difference from the “as completed” pattern is that we enumerate the list of futures directly, instead of calling the as_completed() function.
...
# process task results in the order they were submitted
for future in futures:
    # retrieve the result
    print(future.result())
The example below demonstrates this pattern, submitting the tasks in order from 0 to 9 and showing the results in the order that they were submitted.
# SuperFastPython.com
# example of the submit and use sequentially pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # process task results in the order they were submitted
        for future in futures:
            # retrieve the result
            print(future.result())

if __name__ == '__main__':
    main()
Running the example, we can see that the results are retrieved and printed in the order that the tasks were submitted, not the order that the tasks were completed.
0
1
2
3
4
5
6
7
8
9
Submit and Use Callback
We may not want to explicitly process the results once they are available; instead, we want to call a function on the result.
Instead of doing this manually, such as in the as completed pattern above, we can have the process pool call the function for us with the result automatically.
This can be achieved by setting a callback on each future object by calling the add_done_callback() function and passing the name of the function.
The process pool will then call the callback function as each task completes, passing in Future objects for the task.
The example below demonstrates this pattern, registering a custom callback function to be applied to each task as it is completed.
# SuperFastPython.com
# example of the submit and use a callback pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# custom callback function called on tasks when they complete
def custom_callback(future):
    # retrieve the result
    print(future.result())

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # register the callback on all tasks
        for future in futures:
            future.add_done_callback(custom_callback)
        # wait for tasks to complete...

if __name__ == '__main__':
    main()
Running the example, we can see that results are retrieved and printed in the order they are completed, not the order that the tasks were submitted.
8
0
7
1
4
6
5
3
2
9
We can register multiple callbacks on each Future object; it is not limited to a single callback.
The callback functions are called in the order in which they were registered on each Future object.
The following example demonstrates having two callbacks on each Future.
# SuperFastPython.com
# example of the submit and use multiple callbacks for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# custom callback function called on tasks when they complete
def custom_callback1(future):
    # retrieve the result
    print(f'Callback 1: {future.result()}')

# custom callback function called on tasks when they complete
def custom_callback2(future):
    # retrieve the result
    print(f'Callback 2: {future.result()}')

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # register the callbacks on all tasks
        for future in futures:
            future.add_done_callback(custom_callback1)
            future.add_done_callback(custom_callback2)
        # wait for tasks to complete...

if __name__ == '__main__':
    main()
Running the example, we can see that results are reported in the order that tasks were completed and that the two callback functions are called for each task in the order that we registered them with each Future object.
Callback 1: 2
Callback 2: 2
Callback 1: 3
Callback 2: 3
Callback 1: 6
Callback 2: 6
Callback 1: 8
Callback 2: 8
Callback 1: 7
Callback 2: 7
Callback 1: 5
Callback 2: 5
Callback 1: 0
Callback 2: 0
Callback 1: 1
Callback 2: 1
Callback 1: 4
Callback 2: 4
Callback 1: 9
Callback 2: 9
Submit and Wait for All
It is common to submit all tasks and then wait for all tasks in the process pool to complete.
This pattern may be useful when tasks do not return a result directly, such as when each task stores its result directly in a resource like a file.
There are two ways that we can wait for tasks to complete: by calling the wait() module function or by calling shutdown().
The most likely case is you want to explicitly wait for a set or subset of tasks in the process pool to complete.
You can achieve this by passing the list of tasks to the wait() function, which by default will wait for all tasks to complete.
...
# wait for all tasks to complete
wait(futures)
We can explicitly specify to wait for all tasks by setting the “return_when” argument to the ALL_COMPLETED constant; for example:
...
# wait for all tasks to complete
wait(futures, return_when=ALL_COMPLETED)
The example below demonstrates this pattern. Note that we are intentionally ignoring the return from calling wait() as we have no need to inspect it in this case.
# SuperFastPython.com
# example of the submit and wait for all pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    # display the result
    print(name)

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # wait for all tasks to complete
        wait(futures)
        print('All tasks are done!')

if __name__ == '__main__':
    main()
Running the example, we can see that results are handled by each task as the tasks complete. Importantly, we can see that the main process waits until all tasks are completed before carrying on and printing a message.
3
9
0
8
4
6
2
1
5
7
All tasks are done!
An alternative approach would be to shut down the process pool and wait for all executing and queued tasks to complete before moving on.
This might be preferred when we don’t have a list of Future objects or when we only intend to use the process pool once for a set of tasks.
We can implement this pattern using the context manager; for example:
# SuperFastPython.com
# example of the submit and wait for all with shutdown pattern for the process pool
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    # display the result
    print(name)

# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # wait for all tasks to complete
    print('All tasks are done!')

if __name__ == '__main__':
    main()
Running the example, we can see that the main process does not move on and print the message until all tasks are completed, after the process pool has been automatically shut down by the context manager.
1
2
8
4
5
3
9
0
7
6
All tasks are done!
The context manager automatic shutdown pattern might be confusing to developers not used to how process pools work, hence the comment at the end of the context manager block in the previous example.
We can achieve the same effect without the context manager by making an explicit call to shutdown().
...
# wait for all tasks to complete and close the pool
executor.shutdown()
Recall that the shutdown() function will wait for all tasks to complete by default and will not cancel any queued tasks, but we can make this explicit by setting the “wait” argument to True and the “cancel_futures” argument to False; for example:
...
# wait for all tasks to complete and close the pool
executor.shutdown(wait=True, cancel_futures=False)
The example below demonstrates the pattern of waiting for all tasks in the process pool to complete by calling shutdown() before moving on.
# SuperFastPython.com
# example of the submit and wait for all with shutdown pattern for the process pool
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    # display the result
    print(name)

# entry point
def main():
    # start the process pool
    executor = ProcessPoolExecutor()
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # wait for all tasks to complete
    executor.shutdown()
    print('All tasks are done!')

if __name__ == '__main__':
    main()
Running the example, we can see that all tasks report their result as they complete and that the main thread does not move on until all tasks have completed and the process pool has been shut down.
3
5
2
6
8
9
7
1
4
0
All tasks are done!
Submit and Wait for First
It is common to issue many tasks and only be concerned with the first result returned.
That is, not the result of the first task, but a result from any task that happens to be the first to complete its execution.
This may be the case if you are trying to access the same resource from multiple locations, like a file or some data.
This pattern can be achieved using the wait() module function and setting the “return_when” argument to the FIRST_COMPLETED constant.
1 2 3 |
... # wait until any task completes done, not_done = wait(futures, return_when=FIRST_COMPLETED) |
We must also manage the process pool manually by constructing it and calling shutdown() manually so that we can continue on with the execution of the main process without waiting for all of the other tasks to complete.
The example below demonstrates this pattern and will stop waiting as soon as the first task is completed.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# SuperFastPython.com # example of the submit and wait for first the ProcessPoolExecutor from time import sleep from random import random from concurrent.futures import ProcessPoolExecutor from concurrent.futures import wait from concurrent.futures import FIRST_COMPLETED # custom task that will sleep for a variable amount of time def task(name): # sleep for less than a second sleep(random()) return name # entry point def main(): # start the process pool executor = ProcessPoolExecutor() # submit tasks and collect futures futures = [executor.submit(task, i) for i in range(10)] # wait until any task completes done, not_done = wait(futures, return_when=FIRST_COMPLETED) # get the result from the first task to complete print(done.pop().result()) # shutdown without waiting executor.shutdown(wait=False, cancel_futures=True) if __name__ == '__main__': main() |
Running the example will wait for any of the tasks to complete, then retrieve the result of the first completed task and shut down the process pool.
Importantly, any tasks already running will continue to execute in the process pool in the background (the still-queued tasks are cancelled), and the program will not exit until those running tasks have completed.
1 |
2 |
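To make this effect concrete, the short sketch below (my own variation on the example above, not from the original, using a pool of just two workers so that most tasks stay queued) checks the remaining futures after shutdown and shows that the queued tasks are cancelled while the running tasks are allowed to finish.

# hedged sketch: inspect the remaining futures after an early shutdown
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED

# custom task that will sleep for a variable amount of time
def task(name):
    sleep(random())
    return name

if __name__ == '__main__':
    # small pool so that most tasks remain queued
    executor = ProcessPoolExecutor(2)
    futures = [executor.submit(task, i) for i in range(10)]
    # wait until any task completes and report its result
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    print(done.pop().result())
    # cancel the queued tasks, do not wait for the running tasks
    executor.shutdown(wait=False, cancel_futures=True)
    # queued tasks report cancelled=True; tasks already running do not
    print(sum(1 for f in not_done if f.cancelled()), 'queued tasks were cancelled')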
Now that we have seen some common usage patterns for the ProcessPoolExecutor, let’s look at how we might customize the configuration of the process pool.
How to Configure ProcessPoolExecutor
We can customize the configuration of the process pool when constructing a ProcessPoolExecutor instance.
There are three aspects of the process pool we may wish to customize for our application; they are the number of workers, the names of processes in the pool, and the initialization of each process in the pool.
Let’s take a closer look at each in turn.
Configure the Number of Processes
The number of processes in the process pool can be configured by the “max_workers” argument.
It takes a positive integer and defaults to the number of CPUs in your system.
- Total Number of Worker Processes = (Logical CPUs in Your System)
For example, if you had 2 physical CPUs in your system and each CPU has hyperthreading (common in modern CPUs), then you would have 2 physical and 4 logical CPUs. Python would see 4 CPUs. The default number of worker processes on your system would then be 4.
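If you want to confirm how many logical CPUs Python can see on your system, a quick check with the standard library is possible; this is a small aside (not from the original guide) and assumes the default worker count tracks os.cpu_count():

# report the number of logical CPUs visible to Python
import os

if __name__ == '__main__':
    # the ProcessPoolExecutor default worker count is based on this value
    print(os.cpu_count())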
The number of workers must be less than or equal to 61 if Windows is your operating system.
It is common to have more processes than CPUs (physical or logical) in your system, if the target task function is performing IO operations.
The reason for this is that IO-bound tasks spend most of their time waiting rather than using the CPU. Examples include reading or writing from hard drives, DVD drives, printers, and network connections, and much more. We will discuss the best application of processes in a later section.
If you require hundreds of processes for IO-bound tasks, you might want to consider using threads instead and the ThreadPoolExecutor. If you require thousands of processes for IO-bound tasks, you might want to consider using the AsyncIO module.
First, let’s check how many processes are created for process pools on your system.
Looking at the source code for the ProcessPoolExecutor, we can see that the number of worker processes chosen by default is stored in the _max_workers property, which we can access and report after a process pool is created.
Note: “_max_workers” is a protected member and may change in the future.
The example below reports the number of default processes in a process pool on your system.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
# SuperFastPython.com # report the default number of worker processes on your system from concurrent.futures import ProcessPoolExecutor # entry point def main(): # create a process pool with the default number of worker processes pool = ProcessPoolExecutor() # report the number of worker processes chosen by default print(pool._max_workers) if __name__ == '__main__': main() |
Running the example reports the number of worker processes used by default on your system.
I have four physical CPU cores, eight logical cores; therefore, the default is 8 processes.
1 |
8 |
How many worker processes are allocated by default on your system?
Let me know in the comments below.
We can specify the number of worker processes directly, and this is a good idea in most applications.
The example below demonstrates how to configure 60 worker processes.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
# SuperFastPython.com # configure and report the default number of worker processes from concurrent.futures import ProcessPoolExecutor # entry point def main(): # create a process pool with a large number of worker processes pool = ProcessPoolExecutor(60) # report the number of worker processes print(pool._max_workers) if __name__ == '__main__': main() |
Running the example configures the process pool to use 60 processes and confirms that it will create 60 processes.
1 |
60 |
How Many Processes Should You Use?
This is a tough question and depends on the specifics of your program.
Perhaps if you have fewer than 100 IO-bound tasks (or fewer than 61 on Windows), then you might want to set the number of worker processes to the number of tasks.
If you are working with CPU-bound tasks, then you might want to cap the number of workers at the number of logical CPUs in your system, e.g. the default for the ProcessPoolExecutor.
If your application is intended to be executed multiple times in the future, you can test different numbers of processes and compare overall execution time, then choose a number of processes that gives approximately the best performance. You may want to mock the task in these tests with a random sleep or compute operation.
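For example, such a test might look like the rough sketch below; it is not from the original guide, and the mock task (a simple sum of squares) is a stand-in for your real workload:

# hedged sketch: time the same mock workload with different pool sizes
from time import perf_counter
from concurrent.futures import ProcessPoolExecutor

# hypothetical CPU-bound mock task
def mock_task(n):
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    # fixed workload of 100 identical tasks
    work = [50_000] * 100
    for workers in [1, 2, 4, 8]:
        start = perf_counter()
        with ProcessPoolExecutor(max_workers=workers) as executor:
            # consume all results so the timing covers the whole workload
            list(executor.map(mock_task, work))
        duration = perf_counter() - start
        print(f'{workers} workers: {duration:.3f} seconds')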
Configure the Multiprocessing Context
Different operating systems provide different ways to create new processes.
Some operating systems support multiple ways to create processes.
Perhaps the two most common ways to create new processes are spawn and fork.
- spawn: Creates a new instance of the Python interpreter as a process. Available on Windows, Unix, and MacOS.
- fork: Creates a fork of an existing Python interpreter process. Available on Unix.
- forkserver: Creates a server Python interpreter process that is used to create all forked processes for the life of the program. Available on Unix.
Your Python installation will select the most appropriate method for creating a new process for your operating system. Nevertheless, you can specify how new processes are created and this is called a “process context.”
You can get a new process context for a specific method for creating new processes (e.g. fork or spawn) and pass this context to the ProcessPoolExecutor. This will allow all new processes created by the process pool to be created using the provided context and use your preferred method for starting processes.
Firstly, let’s see what process start methods are supported by your operating system and discover the default method that you are using.
We can call the get_all_start_methods() function to get a list of all supported methods and the get_start_method() function to get the currently configured (default) process start method.
The program below will report the process start methods and default start method on your system.
1 2 3 4 5 6 7 8 9 |
# SuperFastPython.com from multiprocessing import get_all_start_methods from multiprocessing import get_start_method # list of all process start methods supported on the os result = get_all_start_methods() print(result) # get the default process start method result = get_start_method() print(result) |
Running the example first reports all of the process start methods supported by your system.
Next, the default process start method is reported.
In this case, running the program on MacOS, we can see that the operating system supports all three process start methods and the default is the “spawn” method.
1 2 |
['spawn', 'fork', 'forkserver'] spawn |
Next, we can check the default context used to start processes in the process pool.
The ProcessPoolExecutor will use the default context unless it is configured to use a different context.
We can check the start process context used by the ProcessPoolExecutor via the “_mp_context” protected property.
The example below creates a process pool and reports the default context used by the process pool.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
# SuperFastPython.com # example of checking the process start context from concurrent.futures import ProcessPoolExecutor # entry point def main(): # create a process pool with ProcessPoolExecutor() as executor: # report the context used print(executor._mp_context) if __name__ == '__main__': main() |
Running the example creates a process pool and reports the default start process context used by the pool.
In this case, we can see that it is the ‘spawn‘ context, denoted by the “SpawnContext” object.
1 |
<multiprocessing.context.SpawnContext object at 0x1034fd4c0> |
Next, we can create a context and pass it to the process pool.
The ProcessPoolExecutor takes an argument named “mp_context” that defines the context used for creating processes in the pool.
By default, it is set to None, in which case the default context is used.
We can set the context by first calling the get_context() function and specifying the preferred method as a string that matches a string returned from calling the get_all_start_methods() function, e.g. ‘fork‘ or ‘spawn‘.
Perhaps we wanted to force all processes to be created using the ‘fork‘ method, regardless of the default.
Note: using ‘fork‘ will not work on Windows. You might want to change it to ‘spawn‘, or report the error message you see in the comments below.
First, we would create a context, then pass this context to the process pool. We can then access and report the context used by the process pool; for example:
1 2 3 4 5 6 7 |
... # create a start process context context = get_context('fork') # create a process pool with ProcessPoolExecutor(mp_context=context) as executor: # report the context used print(executor._mp_context) |
Tying this together, the complete example of setting the start method context for the process pool and then confirming it was changed is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
# SuperFastPython.com # example of setting the process start context from multiprocessing import get_context from concurrent.futures import ProcessPoolExecutor # entry point def main(): # create a start process context context = get_context('fork') # create a process pool with ProcessPoolExecutor(mp_context=context) as executor: # report the context used print(executor._mp_context) if __name__ == '__main__': main() |
Running the example first creates a new start process context, then passes it to the new ProcessPoolExecutor.
After the pool is created, the context used by the pool is reported, which in this case is the ‘fork‘ context, denoted by the ‘ForkContext‘ object.
Configure the Process Initializer
Worker processes can call a function before they start executing tasks.
This is called an initializer function and can be specified via the “initializer” argument when creating a process pool. If the initializer function takes arguments, they can be passed in via the “initargs” argument to the process pool which is a tuple of arguments to pass to the initializer function.
By default, there is no initializer function.
We might choose to set an initializer function for worker processes if we would like each process to set up resources specific to the process.
Examples might include a process-specific log file or a process-specific connection to a remote resource like a server or database. The resource would then be available to all tasks executed by the process, rather than being created and discarded or opened and closed for each task.
These process-specific resources can then be stored somewhere the worker process can access them, such as a global variable or a process-local variable. Care must be taken to correctly close these resources once you are finished with the process pool.
The example below will create a process pool with two workers and use a custom initialization function. In this case, the function does nothing other than print a message. We then complete ten tasks with the process pool.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# SuperFastPython.com # example of a custom worker process initialization function from time import sleep from random import random from concurrent.futures import ProcessPoolExecutor # function for initializing the worker processes def initializer_worker(): # report an initialization message print(f'Initializing worker process.', flush=True) # a mock task that sleeps for a random amount of time less than one second def task(identifier): sleep(random()) # get the unique name return identifier # entry point def main(): # create a process pool with ProcessPoolExecutor(max_workers=2, initializer=initializer_worker) as executor: # execute asks for result in executor.map(task, range(10)): print(result) if __name__ == '__main__': main() |
Running the example, we can see that the two processes are initialized before running any tasks, then all ten tasks are completed successfully.
1 2 3 4 5 6 7 8 9 10 11 12 |
Initializing worker process. Initializing worker process. 0 1 2 3 4 5 6 7 8 9 |
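The example above did not pass any arguments to the initializer. As a hypothetical illustration of the “initargs” argument (the function and variable names here are my own, not from this guide), the sketch below passes a prefix to each worker and stores a process-specific value in a global variable that later tasks can read:

# hedged sketch: pass arguments to the initializer and keep a per-process resource
from os import getpid
from concurrent.futures import ProcessPoolExecutor

# global variable, populated once in each worker process by the initializer
worker_prefix = None

# initializer function that receives its argument via initargs
def init_worker(prefix):
    global worker_prefix
    worker_prefix = f'{prefix}-{getpid()}'

# task that reads the process-specific resource
def task(identifier):
    return f'{worker_prefix}: task {identifier}'

if __name__ == '__main__':
    # create a pool of two workers, each initialized with the same prefix
    with ProcessPoolExecutor(max_workers=2, initializer=init_worker, initargs=('worker',)) as executor:
        for result in executor.map(task, range(4)):
            print(result)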
Now that we are familiar with how to configure the process pools, let’s learn more about how to check and manipulate tasks via Future objects.
How to Use Future Objects in Detail
Future objects are created when we call submit() to send tasks into the ProcessPoolExecutor to be executed asynchronously.
Future objects provide the capability to check the status of a task (e.g. is it running?) and to control the execution of the task (e.g. cancel).
In this section, we will look at some examples of checking and manipulating Future objects created by our process pool.
Specifically, we will look at the following:
- How to Check the Status of Futures
- How to Get Results From Futures
- How to Cancel Futures
- How to Add a Callback to Futures
- How to Get Exceptions from Futures
First, let’s take a closer look at the life-cycle of a Future object.
Life-Cycle of a Future Object
A Future object is created when we call submit() for a task on a ProcessPoolExecutor.
While the task is executing, the Future object has the status “running“.
When the task completes, it has the status “done” and if the target function returns a value, it can be retrieved.
Before a task is running, it will be inserted into a queue of tasks for a worker process to take and start running. In this “pre-running” state, the task can be cancelled; if it is, it moves to the “cancelled” state. A task in the “running” state cannot be cancelled.
A “cancelled” task is always also in the “done” state.
While a task is running, it can raise an uncaught exception, causing the execution of the task to stop. The exception will be stored and can be retrieved directly or will be re-raised if the result is attempted to be retrieved.
The figure below summarizes the life-cycle of a Future object.
Now that we are familiar with the life-cycle of a Future object, let’s look at how we might check and manipulate it.
How to Check the Status of Futures
There are two normal states of a Future object that we might want to check: running and done.
Each has its own function that returns True if the Future object is in that state and False otherwise; for example:
- running(): Returns True if the task is currently running.
- done(): Returns True if the task has completed or was cancelled.
We can develop simple examples to demonstrate how to check the status of a Future object.
In this example, we can start a task and then check that it’s running and not done, wait for it to complete, then check that it is done and not running.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# SuperFastPython.com # check the status of a Future object for task executed by a process pool from time import sleep from concurrent.futures import ProcessPoolExecutor from concurrent.futures import wait # mock task that will sleep for a moment def work(): sleep(0.5) # entry point def main(): # create a process pool with ProcessPoolExecutor() as executor: # start one process future = executor.submit(work) # confirm that the task is running running = future.running() done = future.done() print(f'Future running={running}, done={done}') # wait for the task to complete wait([future]) # confirm that the task is done running = future.running() done = future.done() print(f'Future running={running}, done={done}') if __name__ == '__main__': main() |
Running the example, we can see that immediately after the task is submitted it is marked as running, and that after the task is completed, we can confirm that it is done.
1 2 |
Future running=True, done=False Future running=False, done=True |
How to Get Results From Futures
When a task is completed, we can retrieve the result from the task by calling the result() function on the Future.
This returns the value returned by the task function we executed, or None if the function did not return a value.
The function will block until the task completes and a result can be retrieved. If the task has already been completed, it will return a result immediately.
The example below demonstrates how to retrieve a result from a Future object.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# SuperFastPython.com # get the result from a completed future task from time import sleep from concurrent.futures import ProcessPoolExecutor # mock task that will sleep for a moment def work(): sleep(1) return "all done" # entry point def main(): # create a process pool with ProcessPoolExecutor() as executor: # start one process future = executor.submit(work) # get the result from the task, wait for task to complete result = future.result() print(f'Got Result: {result}') if __name__ == '__main__': main() |
Running the example submits the task, then attempts to retrieve the result, blocking until the result is available, then reports the result that was received.
1 |
Got Result: all done |
We can also set a timeout for how long we wish to wait for a result in seconds.
If the timeout elapses before we get a result, a TimeoutError is raised.
The example below demonstrates the timeout, showing how to give up waiting before the task has completed.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# SuperFastPython.com # set a timeout when getting results from a future from time import sleep from concurrent.futures import ProcessPoolExecutor from concurrent.futures import TimeoutError # mock task that will sleep for a moment def work(): sleep(1) return "all done" # entry point def main(): # create a process pool with ProcessPoolExecutor() as executor: # start one process future = executor.submit(work) # get the result from the task, wait for task to complete try: result = future.result(timeout=0.5) print(f'Got Result: {result}') except TimeoutError: print('Gave up waiting for a result') if __name__ == '__main__': main() |
Running the example shows that we gave up waiting for a result after half a second.
1 |
Gave up waiting for a result |
How to Cancel Futures
We can also cancel a task that has not yet started running.
Recall that when we put tasks into the pool with submit() or map() that the tasks are added to an internal queue of work from which worker processes can remove the tasks and execute them.
While a task is on the queue and before it has been started, we can cancel it by calling cancel() on the Future object associated with the task. The cancel() function will return True if the task was cancelled, False otherwise.
Let’s demonstrate this with a worked example.
We can create a process pool with one worker process, start a long-running task, then submit a second task, request that it be cancelled, and confirm that it was indeed cancelled.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# SuperFastPython.com # example of cancelling a task via it's future from time import sleep from concurrent.futures import ProcessPoolExecutor from concurrent.futures import wait # mock task that will sleep for a moment def work(sleep_time): sleep(sleep_time) # entry point def main(): # create a process pool with ProcessPoolExecutor(1) as executor: # start a long running task future1 = executor.submit(work, 2) running = future1.running() print(f'First task running={running}') # start a second future2 = executor.submit(work, 0.1) running = future2.running() print(f'Second task running={running}') # cancel the second task was_cancelled = future2.cancel() print(f'Second task was cancelled: {was_cancelled}') # wait for the second task to finish, just in case wait([future2]) # confirm it was cancelled running = future2.running() cancelled = future2.cancelled() done = future2.done() print(f'Second task running={running}, cancelled={cancelled}, done={done}') # wait for the long running task to finish wait([future1]) if __name__ == '__main__': main() |
Running the example, we can see that the first task is started and is running normally.
The second task is scheduled and is not yet running because the process pool is occupied with the first task. We then cancel the second task and confirm that it is indeed not running; it was cancelled and is done.
1 2 3 4 |
First task running=True Second task running=False Second task was cancelled: True Second task running=False, cancelled=True, done=True |
Cancel a Running Future
Next, let’s try to cancel a task that has already started running.
The complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# SuperFastPython.com # example of trying to cancel a running task via its future from time import sleep from concurrent.futures import ProcessPoolExecutor from concurrent.futures import wait # mock task that will sleep for a moment def work(sleep_time): sleep(sleep_time) # entry point def main(): # create a process pool with ProcessPoolExecutor(1) as executor: # start a long running task future = executor.submit(work, 2) running = future.running() print(f'Task running={running}') # try to cancel the task was_cancelled = future.cancel() print(f'Task was cancelled: {was_cancelled}') # wait for the task to finish wait([future]) # check if it was cancelled running = future.running() cancelled = future.cancelled() done = future.done() print(f'Task running={running}, cancelled={cancelled}, done={done}') if __name__ == '__main__': main() |
Running the example, we can see that the task was started as per normal.
We then tried to cancel the task, but this was not successful, as we expected since the task was already running.
We then wait for the task to complete and then check its status. We can see that the task is no longer running and was not cancelled, as we expect, and it was marked as done.
1 2 3 |
Task running=True Task was cancelled: False Task running=False, cancelled=False, done=True |
Cancel a Done Future
Consider what would happen if we tried to cancel a task that was already done.
We might expect that canceling a task that is already done has no effect, and this happens to be the case.
This can be demonstrated with a short example.
We start and run a task as per normal, then wait for it to complete and report its status. We then attempt to cancel the task.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# SuperFastPython.com # example of trying to cancel a done task via its future from time import sleep from concurrent.futures import ProcessPoolExecutor from concurrent.futures import wait # mock task that will sleep for a moment def work(sleep_time): sleep(sleep_time) # entry point def main(): # create a process pool with ProcessPoolExecutor(1) as executor: # start a long running task future = executor.submit(work, 2) running = future.running() # wait for the task to finish wait([future]) # check the status running = future.running() cancelled = future.cancelled() done = future.done() print(f'Task running={running}, cancelled={cancelled}, done={done}') # try to cancel the task was_cancelled = future.cancel() print(f'Task was cancelled: {was_cancelled}') # check if it was cancelled running = future.running() cancelled = future.cancelled() done = future.done() print(f'Task running={running}, cancelled={cancelled}, done={done}') if __name__ == '__main__': main() |
Running the example confirms that the task runs and is marked done, as per normal.
The attempt to cancel the task fails, and checking the status afterward confirms that the task was not impacted by the attempt.
1 2 3 |
Task running=False, cancelled=False, done=True Task was cancelled: False Task running=False, cancelled=False, done=True |
How to Add a Callback to Futures
We have already seen above how to add a callback to a Future; nevertheless, let’s look at some more examples for completeness, including some edge cases.
We can register one or more callback functions on a Future object by calling the add_done_callback() function and specifying the name of the function to call.
The callback functions will be called with the Future object as an argument immediately after the completion of the task. If more than one callback function is registered, they will be called in the order they were registered, and any exception raised within a callback function will be caught, logged, and ignored.
The callbacks are executed in the main process, by the process pool’s internal management thread that marks the Future as done, not by the child worker process that executed the task.
The example below demonstrates how to add a callback function to a Future object.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# SuperFastPython.com # add a callback option to a future object from time import sleep from concurrent.futures import ProcessPoolExecutor from concurrent.futures import wait # callback function to call when a task is completed def custom_callback(future): print('Custom callback was called', flush=True) # mock task that will sleep for a moment def work(): sleep(1) print('Task is done') # entry point def main(): # create a process pool with ProcessPoolExecutor() as executor: # execute the task future = executor.submit(work) # add the custom callback future.add_done_callback(custom_callback) # wait for the task to complete wait([future]) if __name__ == '__main__': main() |
Running the example, we can see that the task is completed first, then the callback is executed as we expected.
1 2 |
Task is done Custom callback was called |
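As noted above, more than one callback can be registered, and an exception inside a callback does not interrupt the others. The sketch below (a hypothetical variation, not the original example) registers two callbacks; the first raises an error that is caught and logged internally, and the second still runs:

# hedged sketch: two callbacks called in registration order; a failing callback is logged, not raised
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait

# first callback, deliberately raises an exception
def callback_one(future):
    print('Callback one was called', flush=True)
    raise ValueError('error inside the first callback')

# second callback, still runs despite the failure above
def callback_two(future):
    print('Callback two was called', flush=True)

# mock task that will sleep for a moment
def work():
    sleep(0.5)

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        # execute the task and register both callbacks
        future = executor.submit(work)
        future.add_done_callback(callback_one)
        future.add_done_callback(callback_two)
        # wait for the task to complete
        wait([future])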
Common Error When Using Future Callbacks
A common error is to forget to add the Future object as an argument to the custom callback.
For example:
1 2 3 |
# callback function to call when a task is completed def custom_callback(): print('Custom callback was called') |
If you register this function and try to run the code, you will get a TypeError as follows:
1 2 3 4 5 |
exception calling callback for <Future at 0x10d8e2730 state=finished returned NoneType> Traceback (most recent call last): ... callback(self) TypeError: custom_callback() takes 0 positional arguments but 1 was given |
The message in the TypeError makes it clear how to fix the issue: add a single argument to the function for the Future object, even if you don’t intend to use it in your callback.
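For example, the corrected callback simply accepts the Future object, even if it goes unused:

# callback function with the required Future argument (unused here)
def custom_callback(future):
    print('Custom callback was called')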
Callbacks Execute When Cancelling a Future
We can also see the effect of callbacks on Future objects for tasks that are cancelled.
This behavior is in fact documented in the API: callbacks are called when the future is cancelled or finishes running. So we would expect the callback to always be executed, whether the task runs normally or is cancelled. And this happens to be the case.
The example below demonstrates this.
First, a process pool is created with a single process. A long running task is issued that occupies the entire pool, then we send in a second task, add a callback to the second task, cancel it, and wait for all tasks to finish.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# SuperFastPython.com # example of a callback for a cancelled task via the future object from time import sleep from concurrent.futures import ProcessPoolExecutor from concurrent.futures import wait # callback function to call when a task is completed def custom_callback(future): print('Custom callback was called', flush=True) # mock task that will sleep for a moment def work(sleep_time): sleep(sleep_time) # entry point def main(): # create a process pool with ProcessPoolExecutor(1) as executor: # start a long running task future1 = executor.submit(work, 2) running = future1.running() print(f'First task running={running}') # start a second future2 = executor.submit(work, 0.1) running = future2.running() print(f'Second task running={running}') # add the custom callback future2.add_done_callback(custom_callback) # cancel the second task was_cancelled = future2.cancel() print(f'Second task was cancelled: {was_cancelled}') # explicitly wait for all tasks to complete wait([future1, future2]) if __name__ == '__main__': main() |
Running the example, we can see that the first task is started as we expect.
The second task is scheduled but does not get a chance to run before we cancel it.
The callback is run immediately after we cancel the task, then we report in the main thread that indeed the task was cancelled correctly.
1 2 3 4 |
First task running=True Second task running=False Custom callback was called Second task was cancelled: True |
How to Get Exceptions From Futures
A task may raise an exception during execution.
If we can anticipate the exception, we can wrap parts of our task function in a try-except block and handle the exception within the task.
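For example, a minimal sketch of this approach (with a hypothetical task and error type of my choosing) might handle the anticipated error inside the task and return a sentinel value instead:

# hedged sketch: handle an anticipated exception inside the task itself
def task(value):
    try:
        # hypothetical work that can fail for some inputs
        return 10.0 / value
    except ZeroDivisionError:
        # handle the anticipated error and return a sentinel value instead
        return None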
If an unexpected exception occurs within our task, the task will stop executing.
We cannot know based on the task status whether an exception was raised, but we can check for an exception directly.
We can then access the exception via the exception() function. Alternatively, the exception will be re-raised when we call the result() function to get the result.
We can demonstrate this with an example.
The example below will raise a ValueError within the task that will not be caught but instead will be caught by the process pool for us to access.