Last Updated on September 12, 2022
The ProcessPoolExecutor is a flexible and powerful process pool for executing ad hoc CPU-bound tasks in an asynchronous manner.
In this tutorial you will discover a ProcessPoolExecutor example that you can use as a template for your own project.
Let’s get started.
ProcessPoolExecutor Example
Perhaps the most common use case for the ProcessPoolExecutor is to perform a repeated calculation in parallel.
Consider a situation where we might want to check if a word is known to the program or not, e.g. whether it is in a dictionary of known words.
If the word is known, that is fine, but if not, we might want to take action for the user, perhaps underlining it in red like an automatic spell checker.
One approach to implementing this feature would be to load a dictionary of known words and create a hash of each word. We can then hash new words and check if they exist in the set of known hashed words or not.
Recall that a hash function is a mathematical operation that maps input data, such as a string, to a fixed-size value, often represented as an integer or a hex string. This is helpful because it can be faster to check a fixed-size value than to compare string values.
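As a rough sketch of the lookup idea described above, the snippet below builds a set of hashes and checks whether new words are in it. The names known_hashes and is_known() and the tiny word list are illustrative assumptions, not part of the program developed in this tutorial.

```python
# sketch: check whether a word is known via a set of hashes
from hashlib import sha512

# hash a single word and return its hex digest
def hash_word(word):
    return sha512(word.encode('utf-8')).hexdigest()

# a tiny stand-in for a real dictionary of known words (assumed data)
known_hashes = {hash_word(word) for word in ['apple', 'banana', 'cherry']}

# report whether the hash of a word is in the set of known hashes
def is_known(word):
    return hash_word(word) in known_hashes

print(is_known('apple'))  # True
print(is_known('aple'))   # False
```

A word that is not known (here the misspelling 'aple') is the case where a spell checker would take action.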
This is a good problem to explore with the ProcessPoolExecutor as hashing words can be relatively slow, especially for large dictionaries of hundreds of thousands or millions of known words.
First, let’s develop a serial (non-concurrent) version of the program.
Hash a Dictionary of Words One-By-One
The first step is to select a dictionary of words to use.
On Unix-like systems, such as macOS and Linux, we have a dictionary already installed, called Unix Words.
It is located in one of the following locations:
- /usr/share/dict/words
- /usr/dict/words
On my system it is located in ‘/usr/share/dict/words’ and contains 235,886 words, calculated using the command:
```
cat /usr/share/dict/words | wc -l
```
We can use this dictionary of words.
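If the wc command is not available, for example on Windows, a similar count can be made in Python. This is a minimal sketch: the helper name count_lines() is an assumption, and the path should be changed to wherever your word list lives.

```python
# sketch: count the number of lines in a text file
import os

# count lines by iterating over the file lazily
def count_lines(path):
    with open(path, encoding='utf-8') as file:
        return sum(1 for _ in file)

if __name__ == '__main__':
    path = '/usr/share/dict/words'
    # only report a count if the file exists on this system
    if os.path.exists(path):
        print(count_lines(path))
```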
Alternatively, if we are on Windows or wish to have a larger dictionary, we can download one of many free lists of words online.
For example, you can download a list of one million English words from here:
Download this file and unzip the archive to your current working directory with the filename “1m_words.txt”.
Looking in the file, we can see that indeed we have a long list of words, one per line.
```
aaccf
aalders
aaren
aarika
aaron
aartjan
aasen
ab
abacus
abadines
abagael
abagail
abahri
abasolo
abazari
...
```
First, we need to load the list of words into memory.
This can be achieved by first opening the file, then calling the readlines() method, which reads the lines of text into a list.
The load_words() function below takes a path to the text file and returns a list of words loaded from the file.
```
# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()
```
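Note that readlines() keeps the trailing newline on each line, so the strings passed to later steps include it. If you would rather work with bare words, one possible variation is sketched below. The load_words_stripped() helper is hypothetical and is not the function used in the rest of this tutorial, so results may differ slightly from the counts reported later.

```python
# sketch: load words without their trailing line endings
def load_words_stripped(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data and split it into lines, dropping line endings
        return file.read().splitlines()
```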
Next, we need to hash each word.
We will intentionally select a slow hash function in this example, specifically the SHA512 algorithm.
This is available in Python via the hashlib.sha512() function.
You can learn more about the hashlib module here:
First, we can create an instance of the hashing object by calling the sha512() function.
```
...
# create the hash object
hash_object = sha512()
```
Next, we can convert a given word to bytes and then hash it using the hash function.
```
...
# convert the string to bytes
byte_data = word.encode('utf-8')
# hash the word
hash_object.update(byte_data)
```
Finally, we can get a hex string representation of the hash for the word by calling the hexdigest() method on the hash object.
```
...
# get the hex hash of the word
h = hash_object.hexdigest()
```
Tying this together, the hash_word() function below takes a word and returns a hex hash code of the word.
```
# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()
```
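For reference, the same steps can be collapsed into one expression, because the sha512() constructor accepts the initial bytes to hash. The sketch below uses a hypothetical name, hash_word_compact(), and also shows that a SHA-512 hex digest is always 128 characters long (512 bits at 4 bits per hex character).

```python
# sketch: a compact equivalent of the step-by-step hashing
from hashlib import sha512

# hash one word in a single expression
def hash_word_compact(word):
    return sha512(word.encode('utf-8')).hexdigest()

digest = hash_word_compact('abacus')
print(len(digest))  # 128
```

The longer version in the tutorial is arguably easier to read when learning the API; the compact form is just a convenience.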
That’s about all there is to it.
We can define a function that will drive the program, first loading the list of words by calling our load_words() then creating a set of hashes of known words by calling our hash_word() for each loaded word.
The main() function below implements this.
```
# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # hash all known words
    known_words = {hash_word(word) for word in words}
    print(f'Done, with {len(known_words)} hashes')
```
Tying this all together, the complete example of loading a dictionary of words and creating a set of known word hashes is listed below.
```
# example of hashing a word list serially
from hashlib import sha512

# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()

# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()

# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # hash all known words
    known_words = {hash_word(word) for word in words}
    print(f'Done, with {len(known_words)} hashes')

if __name__ == '__main__':
    main()
```
Running the example first loads the file and reports that a total of 1,049,938 words were loaded.
The list of words is then hashed and the hashes are stored in a set.
The program reports that a total of 979,250 hashes were stored, suggesting about 70,000 duplicates in the dictionary.
The program takes about 1.4 seconds to run on a modern system.
How long does the example take to run on your system?
Let me know in the comments below.
```
Loaded 1049938 words from 1m_words.txt
Done, with 979250 hashes
```
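To measure the run time on your own machine, one option is to time the hashing step directly. The sketch below is an assumption about how you might do this, using time.perf_counter() from the standard library and a hypothetical time_hashing() helper. It uses a small generated word list so that it runs without the 1m_words.txt file.

```python
# sketch: time the serial hashing of a list of words
from hashlib import sha512
from time import perf_counter

# hash one word using the SHA algorithm
def hash_word(word):
    return sha512(word.encode('utf-8')).hexdigest()

# hash all words serially and return the hashes and the elapsed seconds
def time_hashing(words):
    start = perf_counter()
    hashes = {hash_word(word) for word in words}
    duration = perf_counter() - start
    return hashes, duration

if __name__ == '__main__':
    # generated stand-in data (an assumption); swap in the real word list to compare
    words = [f'word{i}\n' for i in range(100000)]
    hashes, duration = time_hashing(words)
    print(f'Hashed {len(hashes)} unique words in {duration:.3f} seconds')
```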
Next, we can update the program to hash the words concurrently.
Hash a Dictionary of Words Concurrently with map()
Hashing words is relatively slow, but even so, hashing nearly one million words takes under two seconds.
Nevertheless, we can accelerate the process by making use of all CPUs in the system and hashing the words concurrently.
This can be achieved using the ProcessPoolExecutor.
Firstly, we can create the process pool and specify the number of concurrent processes to run. I recommend configuring the pool to match the number of physical CPU cores in your system.
I have four cores, so the example will use four cores, but update it for the number of cores you have available.
```
...
# create the process pool
with ProcessPoolExecutor(4) as executor:
    # ...
```
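If you are not sure how many cores you have, the standard library can report a count. Note that os.cpu_count() returns the number of logical CPUs, which may be higher than the number of physical cores on systems with hyperthreading, and it can return None if the count cannot be determined. The choose_workers() helper below is a hypothetical sketch, not part of the tutorial's program.

```python
# sketch: pick a worker count from the number of logical CPUs
from os import cpu_count

# return the logical CPU count, or a fallback if it is unknown
def choose_workers(default=1):
    count = cpu_count()
    return count if count is not None else default

print(choose_workers())
```

The returned value could be passed to ProcessPoolExecutor in place of the hard-coded 4, although you may prefer to set it to your physical core count by hand.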
Next, we need to submit the tasks to the process pool, that is, the hashing of each word.
Because the task is simply applying a function for each item in a list, we can use the map() function directly.
For example:
```
...
# create a set of word hashes
known_words = set(executor.map(hash_word, words))
```
And that’s it.
For example, the updated version of the main() function to hash words concurrently is listed below.
```
# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # create the process pool
    with ProcessPoolExecutor(4) as executor:
        # create a set of word hashes
        known_words = set(executor.map(hash_word, words))
    print(f'Done, with {len(known_words)} hashes')
```
Well, not so fast.
This would execute, but it would take a very long time to complete.
The reason is that we would be adding nearly one million tasks to the pool to be executed by four processes, and each task would need to be pickled and queued internally. Repeating these operations so many times results in an overhead that far surpasses the execution time of the task.
We must reduce the overhead by reducing the number of internal tasks within the process pool.
This can be achieved by setting the “chunksize” parameter when calling map().
This controls how many items in the iterable map to one task in the process pool. By default, one item is mapped to one task, meaning we have nearly one million tasks.
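To make the effect of chunksize concrete, the sketch below splits a list into chunks in plain Python and counts them. It is a simplified illustration of the grouping idea rather than the library's internal code, and chunked() is a hypothetical helper.

```python
# sketch: how a chunk size groups items into tasks
from itertools import islice

# yield successive lists of at most chunksize items
def chunked(items, chunksize):
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, chunksize))
        if not chunk:
            return
        yield chunk

items = list(range(10))
# one item per chunk gives one task per item
print(len(list(chunked(items, 1))))  # 10
# four items per chunk gives far fewer tasks
print(len(list(chunked(items, 4))))  # 3
# the final chunk holds whatever is left over
print(list(chunked(items, 4))[-1])   # [8, 9]
```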
Perhaps a good first approach would be to split the number of items by the number of processes available, in this case four. This would create four tasks, e.g. four large chunks of words, each to be processed by one process, likely on one CPU core.
This can be achieved by calculating the length of the list of words and dividing it by the number of worker processes. The division might not be clean, therefore we can use the math.ceil() function to round the number of items per task up to the nearest integer.
```
...
# select a chunk size
chunksize = ceil(len(words) / 4)
```
We can estimate that this would be (1,049,938 / 4) or 262,484.5, rounded up to 262,485 words per task, e.g. just over a quarter million.
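The arithmetic can be checked directly. This sketch uses the word count reported earlier (1,049,938) and four workers; the variable names are illustrative.

```python
# sketch: check the chunk size arithmetic
from math import ceil

n_words = 1049938
n_workers = 4
# items per task, rounded up
chunksize = ceil(n_words / n_workers)
print(chunksize)  # 262485
# number of tasks this chunk size produces
n_tasks = ceil(n_words / chunksize)
print(n_tasks)  # 4
# the last task is slightly smaller than the others
last_chunk = n_words - (n_tasks - 1) * chunksize
print(last_chunk)  # 262483
```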
We can then use this chunksize when calling the map() function.
```
...
# create a set of word hashes
known_words = set(executor.map(hash_word, words, chunksize=chunksize))
```
Tying this together, the complete example of hashing a dictionary of words concurrently using the ProcessPoolExecutor is listed below.
```
# example of hashing a word list concurrently
from math import ceil
from hashlib import sha512
from concurrent.futures import ProcessPoolExecutor

# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()

# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()

# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # create the process pool
    with ProcessPoolExecutor(4) as executor:
        # select a chunk size
        chunksize = ceil(len(words) / 4)
        # create a set of word hashes
        known_words = set(executor.map(hash_word, words, chunksize=chunksize))
    print(f'Done, with {len(known_words)} hashes')

if __name__ == '__main__':
    main()
```
Running the example loads the words as before then creates the set of hashed words concurrently by splitting it into four tasks, one for each process in the pool.
This concurrent version offers only a very minor speedup, taking about 1.2 seconds on my system.
```
Loaded 1049938 words from 1m_words.txt
Done, with 979250 hashes
```
Next, let’s see if we can get a further improvement by tuning the chunksize argument.
How to Tune chunksize Values with map()
Splitting items into tasks for the process pool is more art than science.
Getting it wrong, like setting it to one when we have a large number of tasks, can result in much worse performance than the serial case. Setting it naively can result in equivalent or slightly better performance than the serial case.
As such, we can tune the performance of the application by testing different values of the “chunksize” argument.
In the previous section we saw that a chunksize of 262,485 resulted in similar performance to the serial case.
I recommend testing different chunk sizes in order to discover what works well on your specific system, for example, some numbers you could try include:
- 100,000
- 50,000
- 10,000
- 5,000
- 1,000
- 500
It is common to perform this type of tuning when working with distributed systems and multi-process systems as the specific cost of serializing and transmitting data between workers depends on the hardware and specific data.
If the tasks involved were long running or sensitive in some way, you could design a test harness with mock tasks.
We can define a function to test a given chunksize argument that also calculates how long the task takes to complete, including the fixed cost of setting up the process pool.
The test_chunksize() function below implements this, taking the loaded dictionary of words and chunksize to test, and reports how long it took to execute the task for the given chunksize.
```
# test a chunksize
def test_chunksize(words, size):
    time1 = time()
    # create the process pool
    with ProcessPoolExecutor(4) as executor:
        # create a set of word hashes
        _ = set(executor.map(hash_word, words, chunksize=size))
    time2 = time()
    total = time2 - time1
    print(f'{size}: {total:.3f} seconds')
```
We can call this function from our main() function with a list of different chunk size values to test, for example:
```
# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # test chunk sizes
    base = ceil(len(words) / 4)
    sizes = [base, 100000, 50000, 10000, 5000, 1000, 500]
    for size in sizes:
        test_chunksize(words, size)
```
Tying this together, the complete example of testing different chunksize values is listed below.
```
# example of testing chunksize when hashing a word list concurrently
from math import ceil
from time import time
from hashlib import sha512
from concurrent.futures import ProcessPoolExecutor

# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()

# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()

# test a chunksize
def test_chunksize(words, size):
    time1 = time()
    # create the process pool
    with ProcessPoolExecutor(4) as executor:
        # create a set of word hashes
        _ = set(executor.map(hash_word, words, chunksize=size))
    time2 = time()
    total = time2 - time1
    print(f'{size}: {total:.3f} seconds')

# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # test chunk sizes
    base = ceil(len(words) / 4)
    sizes = [base, 100000, 50000, 10000, 5000, 1000, 500]
    for size in sizes:
        test_chunksize(words, size)

if __name__ == '__main__':
    main()
```
Running the example, we can see that a chunksize of about 10,000 or 5,000 would work well, performing the task in about 0.8 seconds as opposed to about 1.4 seconds in the serial case and 1.2 seconds for the naive configuration of chunksize, at least on my system.
That is a reduction in run time of about 42% compared to the serial version and about 33% compared to the unoptimized version.
This highlights the importance of tuning the chunksize for your specific task and computer hardware.
```
Loaded 1049938 words from 1m_words.txt
262485: 1.242 seconds
100000: 1.122 seconds
50000: 1.157 seconds
10000: 0.871 seconds
5000: 0.842 seconds
1000: 1.036 seconds
500: 1.112 seconds
```
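The timings above can also be compared programmatically. The sketch below copies the reported results into a dictionary (the numbers come from the run shown above and will differ on your system) and picks the fastest chunksize. The variable names are illustrative.

```python
# sketch: pick the fastest chunksize from a table of timings
# chunksize -> seconds, copied from the reported run
timings = {
    262485: 1.242,
    100000: 1.122,
    50000: 1.157,
    10000: 0.871,
    5000: 0.842,
    1000: 1.036,
    500: 1.112,
}
# the chunksize with the smallest run time
best = min(timings, key=timings.get)
print(best)  # 5000
# relative reduction in run time versus the naive chunksize
reduction = (timings[262485] - timings[best]) / timings[262485]
print(f'{reduction:.0%}')  # 32%
```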
What worked well on your system?
Let me know in the comments below.
Further Reading
This section provides additional resources that you may find helpful.
- ProcessPoolExecutor Jump-Start, Jason Brownlee (my book!)
- Concurrent Futures API Interview Questions
- ProcessPoolExecutor PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
- Python ProcessPoolExecutor: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python Pool: The Complete Guide
- Thread (computing), Wikipedia.
- Process (computing), Wikipedia.
- Thread Pool, Wikipedia.
- Futures and promises, Wikipedia.
You now know how to hash a dictionary of words concurrently with this ProcessPoolExecutor example.
Do you have any questions about this example?
Ask your question in the comments below and I will do my best to answer.
What about the order of words hashed: is it preserved?
What if we need to make sure the words are processed in the exact order they are in the initial dictionary?
The order is preserved if using map(), but not if using submit() and as_completed().
You could also return the word with the hash in a tuple, then search the list of results by words and get the ordered hashes.
Does that help?
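As a small sketch of the tuple idea, the task function below returns each word together with its hash, and map() yields results in the same order as the input. The names hash_with_word() and ordered_hashes() are hypothetical, and the three-word list is made-up data. The results are collected in a list rather than a set, because a set does not keep order.

```python
# sketch: keep hashes in input order by returning (word, hash) tuples
from hashlib import sha512
from concurrent.futures import ProcessPoolExecutor

# return the word along with its hash so results can be matched up later
def hash_with_word(word):
    return word, sha512(word.encode('utf-8')).hexdigest()

# hash words in a process pool, keeping results in input order
def ordered_hashes(words, workers=2, chunksize=1):
    with ProcessPoolExecutor(workers) as executor:
        # map() yields results in the order of the input iterable
        return list(executor.map(hash_with_word, words, chunksize=chunksize))

if __name__ == '__main__':
    results = ordered_hashes(['cherry', 'apple', 'banana'])
    print([word for word, _ in results])  # ['cherry', 'apple', 'banana']
```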
I’m experimenting with the code above, but I noticed that my CPU (4 logical cores/ 8 hyperthreaded) is never going above around 30% load. Not even when I raise the amount of workers. Is this expected behavior?
Interesting. Perhaps try increasing the number of workers and see if that has an effect on load?
Try extending the duration of the task and see if that has an effect?