Last Updated on September 12, 2022
The multiprocessing.Pool is a flexible and powerful process pool for executing ad hoc CPU-bound tasks in a synchronous or asynchronous manner.
In this tutorial you will discover a multiprocessing.Pool example that you can use as a template for your own project.
Let’s get started.
Multiprocessing Pool Example
Perhaps the most common use case for the multiprocessing.Pool is to perform a repeated calculation in parallel.
Consider a situation where we might want to check if a word is known to the program or not, e.g. whether it is in a dictionary of known words.
If the word is known, that is fine, but if not, we might want to take action for the user, perhaps underline it in red like an automatic spell check.
One approach to implementing this feature would be to load a dictionary of known words and create a hash of each word. We can then hash new words and check if they exist in the set of known hashed words or not.
Recall that a hash function is a mathematical operation that transforms input data into a fixed-size value. This is helpful because it is much faster to check and compare fixed-size hash values than to compare raw strings.
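As a rough sketch of the idea, using a hypothetical set of hashed known words named known_words, a hypothetical new_word string, and the hash_word() helper we will develop below, the spell check reduces to a set membership test:

...
# check whether a new word is in the set of known word hashes
if hash_word(new_word) in known_words:
    print(f'{new_word} is a known word')
else:
    print(f'{new_word} may be misspelled')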
This is a good problem to explore with the multiprocessing.Pool as hashing words can be relatively slow, especially for large dictionaries of hundreds of thousands or millions of known words.
First, let’s develop a serial (non-concurrent) version of the program.
Hash a Dictionary of Words One-By-One
The first step is to select a dictionary of words to use.
On Unix systems, like MacOS and Linux, we have a dictionary already installed, called Unix Words.
It is located in one of the following locations:
- /usr/share/dict/words
- /usr/dict/words
On my system it is located in '/usr/share/dict/words' and contains 235,886 words, a count produced by the command:
cat /usr/share/dict/words | wc -l
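If you prefer to check from Python rather than the shell, a small sketch like the following (not part of the worked example) confirms which of the candidate paths exists on your system and how many words it contains:

# locate the unix words file and count its lines
import os

for path in ['/usr/share/dict/words', '/usr/dict/words']:
    if os.path.exists(path):
        with open(path, encoding='utf-8') as file:
            print(f'{path}: {len(file.readlines())} words')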
We can use this dictionary of words.
Alternatively, if we are on Windows or wish to have a larger dictionary, we can download one of many free lists of words available online.
For example, you can download a list of one million English words from here:
Download this file and unzip the archive to your current working directory with the filename “1m_words.txt“.
Looking in the file, we can see that indeed we have a long list of words, one per line.
aaccf
aalders
aaren
aarika
aaron
aartjan
aasen
ab
abacus
abadines
abagael
abagail
abahri
abasolo
abazari
...
First, we need to load the list of words into memory.
This can be achieved by first opening the file, then calling the readlines() function, which reads all lines of text into a list.
The load_words() function below takes a path to the text file and returns a list of words loaded from the file.
# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()
Next, we need to hash each word.
We will intentionally select a slow hash function in this example, specifically the SHA512 algorithm.
This is available in Python via the hashlib.sha512() function.
You can learn more about the hashlib module here:
First, we can create an instance of the hashing object by calling the sha512() function.
...
# create the hash object
hash_object = sha512()
Next, we can convert a given word to bytes and then hash it using the hash function.
...
# convert the string to bytes
byte_data = word.encode('utf-8')
# hash the word
hash_object.update(byte_data)
Finally, we can get a hex string representation of the hash for the word by calling the hexdigest() method on the hash object.
...
# get the hex hash of the word
h = hash_object.hexdigest()
Tying this together, the hash_word() function below takes a word and returns a hex hash code of the word.
# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()
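For example, calling hash_word() on a single word from the list (an illustrative check, not part of the final program) prints a 128-character hex digest:

...
# hash a single word and report the hex digest
print(hash_word('abacus'))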
That’s about all there is to it.
We can define a function that will drive the program, first loading the list of words by calling our load_words() function, then creating a set of hashes of known words by calling our hash_word() function for each loaded word.
The main() function below implements this.
# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # hash all known words
    known_words = {hash_word(word) for word in words}
    print(f'Done, with {len(known_words)} hashes')
Tying this all together, the complete example of loading a dictionary of words and creating a set of known word hashes is listed below.
# SuperFastPython.com
# example of hashing a word list serially
from hashlib import sha512

# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()

# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()

# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # hash all known words
    known_words = {hash_word(word) for word in words}
    print(f'Done, with {len(known_words)} hashes')

if __name__ == '__main__':
    main()
Running the example first loads the file and reports that a total of 1,049,938 words were loaded.
The list of words is then hashed and the hashes are stored in a set.
The program reports that a total of 979,250 hashes were stored, suggesting thousands of duplicates in the dictionary.
The program takes about 1.6 seconds to run on a modern system.
How long does the example take to run on your system?
Let me know in the comments below.
Loaded 1049938 words from 1m_words.txt
Done, with 979250 hashes
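To measure how long the example takes on your own system, one simple option (an addition of mine, not part of the listing above) is to time the call to main() inside the existing entry point guard:

...
# time the call to main() inside the entry point guard
from time import time
if __name__ == '__main__':
    start = time()
    main()
    duration = time() - start
    print(f'Took {duration:.3f} seconds')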
Next, we can update the program to hash the words in parallel.
Hash a Dictionary of Words Concurrently with map()
Hashing words is relatively slow, but even so, hashing nearly one million words takes under two seconds.
Nevertheless, we can accelerate the process by making use of all CPUs in the system and hashing the words concurrently.
This can be achieved using the multiprocessing.Pool.
Firstly, we can create the process pool and specify the number of concurrent processes to run. I recommend configuring the pool to match the number of physical CPU cores in your system.
I have four physical CPU cores, so the example will use four worker processes, but update this for the number of cores you have available.
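If you are unsure how many CPUs you have, the multiprocessing.cpu_count() function reports the number of logical CPUs, which may be higher than the number of physical cores if hyperthreading is enabled. For example:

...
# report the number of logical CPUs in this system
from multiprocessing import cpu_count
print(cpu_count())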
...
# create the process pool
with Pool(4) as pool:
    # ...
Next, we need to submit the tasks to the process pool, that is, the hashing of each word.
Because the task is simply applying a function for each item in a list, we can use the map() function directly.
For example:
...
# create a set of word hashes
known_words = set(pool.map(hash_word, words))
And that’s it.
For example, the updated version of the main() function to hash words concurrently is listed below.
# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # create the process pool
    with Pool(4) as pool:
        # create a set of word hashes
        known_words = set(pool.map(hash_word, words))
        print(f'Done, with {len(known_words)} hashes')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of hashing a word list in parallel with a process pool
from hashlib import sha512
from multiprocessing import Pool

# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()

# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()

# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # create the process pool
    with Pool(4) as pool:
        # create a set of word hashes
        known_words = set(pool.map(hash_word, words))
        print(f'Done, with {len(known_words)} hashes')

if __name__ == '__main__':
    main()
Running the example loads the words as before, then applies the hash_word() function to each word in the loaded list, except this time the function calls are executed in parallel using the process pool.
This concurrent version does offer a minor speedup, taking about 1.1 seconds on my system, compared to 1.6 seconds for the serial version.
That is about 0.6 seconds faster or a 1.55x speed-up.
Loaded 1049938 words from 1m_words.txt
Done, with 979250 hashes
Hash Words Concurrently with map() and Default chunksize
The example in the previous section applied our hash_word() function to each word in the loaded list.
On the face of it, we are adding nearly one million tasks to the pool to be executed by four processes, and each task would need to be pickled and queued internally. Repeating these operations so many times would add overhead to the overall execution time of the task. This is not quite what happens by default, but we will get to the bottom of it in a moment.
We can reduce this overhead by reducing the number of internal tasks created within the multiprocessing.Pool.
This can be achieved by setting the “chunksize” parameter when calling map().
This argument controls how many items in the iterable map to one task in the process pool.
By default the chunksize is set to None. In this case, the chunksize will not be set to 1 as we might expect, instead, a chunksize is calculated automatically.
We can see this in the source code for the Pool class:
...
if chunksize is None:
    chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
    if extra:
        chunksize += 1
The divmod() function returns the quotient and the remainder of a division.
Here, we are dividing the length of the input by 4 times the number of workers in the pool, e.g.:
- chunksize, extra = divmod(1049938, 4 * 4)
- chunksize, extra = divmod(1049938, 16)
- chunksize, extra = 65621, 2
Given there is an extra (remainder), one is added to the chunksize.
This means that the default chunksize in this application is 65,622, or about 65K.
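You can confirm this arithmetic directly, replicating the default calculation with the word count reported earlier and four worker processes:

...
# replicate the default chunksize calculation used by map()
chunksize, extra = divmod(1049938, 4 * 4)
if extra:
    chunksize += 1
print(chunksize)
# prints 65622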
Maybe we can do better by setting a custom chunksize value.
Perhaps a good first approach would be to split the number of items by the number of processes available, in this case four. This would create four tasks, e.g. four large chunks of words, each to be processed by one process, likely on one CPU core.
This can be achieved by calculating the length of the list of words and dividing it by the number of worker processes. The division might not be clean, therefore we can use the math.ceil() function to round the number of items per task up to the nearest integer.
...
# select a chunk size
chunksize = ceil(len(words) / 4)
We can estimate that this would be ceil(1,049,938 / 4), or 262,485 words per task, e.g. just over a quarter of a million.
We can then use this chunksize when calling the map() function.
...
# create a set of word hashes
known_words = set(pool.map(hash_word, words, chunksize=chunksize))
Tying this together, the complete example of hashing a dictionary of words concurrently using the multiprocessing.Pool is listed below.
# SuperFastPython.com
# example of hashing a word list in parallel with a process pool and chunksize
from math import ceil
from hashlib import sha512
from multiprocessing import Pool

# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()

# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()

# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # create the process pool
    with Pool(4) as pool:
        # select a chunk size
        chunksize = ceil(len(words) / 4)
        # create a set of word hashes
        known_words = set(pool.map(hash_word, words, chunksize=chunksize))
        print(f'Done, with {len(known_words)} hashes')

if __name__ == '__main__':
    main()
Running the example loads the words as before then creates the set of hashed words concurrently by splitting it into four tasks, one for each process in the pool.
This concurrent version does offer a minor speedup, taking about 1.3 seconds on my system, compared to 1.6 seconds for the serial case.
That is about 0.4 seconds faster or a 1.31x speed-up.
Nevertheless, this is slower than the concurrent version above that used the default chunksize. Our choice of splitting the work into four large chunks can be improved upon.
Loaded 1049938 words from 1m_words.txt
Done, with 979250 hashes
Next, let’s see if we can get a further improvement by tuning the chunksize argument.
How to Tune chunksize Values with map()
Splitting items into tasks for the process pool is more art than science.
Getting it wrong, like setting it to one when we have a large number of tasks, can result in much worse performance than the serial case. Setting it naively can result in equivalent or slightly better performance than the serial case.
As such, we can tune the performance of the application by testing different values of the “chunksize” argument.
In the previous section we saw that a chunksize of 262,485 resulted in better performance than the serial case, but worse than the default chunksize of 65,622.
I recommend testing different chunk sizes in order to discover what works well on your specific system, for example, some numbers you could try include:
- 100,000
- 50,000
- 10,000
- 5,000
- 1,000
- 500
It is common to perform this type of tuning when working with distributed systems and multi-process systems as the specific cost of serializing and transmitting data between workers depends on the hardware and specific data.
If the tasks involved were long running or sensitive in some way, you could design a test harness with mock tasks.
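For example, a mock task might simply sleep for a short, fixed time to stand in for the real work; the sketch below is hypothetical and is not used in the benchmark that follows:

# mock task that simulates a fixed amount of work per item
from time import sleep

def mock_task(item):
    # pretend to do a small amount of work
    sleep(0.001)
    return item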
We can define a function to test a given chunksize argument that also calculates how long the task takes to complete, including the fixed cost of setting up the process pool.
The test_chunksize() function below implements this, taking the loaded dictionary of words and chunksize to test, and reports how long it took to execute the task for the given chunksize.
# test a chunksize
def test_chunksize(words, size):
    time1 = time()
    # create the process pool
    with Pool(4) as pool:
        # create a set of word hashes
        _ = set(pool.map(hash_word, words, chunksize=size))
    time2 = time()
    total = time2 - time1
    print(f'{size}: {total:.3f} seconds')
We can call this function from our main() function with a list of different chunk size values to test, for example:
# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # test chunk sizes
    base = ceil(len(words) / 4)
    sizes = [None, base, 100000, 50000, 10000, 5000, 1000, 500]
    for size in sizes:
        test_chunksize(words, size)
Tying this together, the complete example of testing different chunksize values is listed below.
# SuperFastPython.com
# example of testing chunksize when hashing a word list in parallel with the process pool
from math import ceil
from time import time
from hashlib import sha512
from multiprocessing import Pool

# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()

# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()

# test a chunksize
def test_chunksize(words, size):
    time1 = time()
    # create the process pool
    with Pool(4) as pool:
        # create a set of word hashes
        _ = set(pool.map(hash_word, words, chunksize=size))
    time2 = time()
    total = time2 - time1
    print(f'{size}: {total:.3f} seconds')

# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # test chunk sizes
    base = ceil(len(words) / 4)
    sizes = [None, base, 100000, 50000, 10000, 5000, 1000, 500]
    for size in sizes:
        test_chunksize(words, size)

if __name__ == '__main__':
    main()
Running the example, we can first see that the default chunksize of about 65K took about 0.926 seconds compared to 1.1 seconds above. Using a chunksize of about 250K took about 1.13 seconds compared to 1.3 seconds previously.
This is because in this example we are only measuring the time of the task, excluding the time loading and preparing the data file.
Reviewing the results, we can see that a chunksize anywhere between 500 and 10,000 performs well.
The best result was achieved with a chunksize of 5,000, performing the task in about 0.702 seconds as opposed to about 1.6 seconds in the serial case, 1.1 seconds with the default chunksize, and 1.3 seconds with our clumsy chunksize.
This is about 0.998 second faster than the serial case or a 2.42x speed-up.
This highlights the importance of tuning the chunksize for your specific task and computer hardware.
Loaded 1049938 words from 1m_words.txt
None: 0.926 seconds
262485: 1.135 seconds
100000: 0.908 seconds
50000: 0.976 seconds
10000: 0.733 seconds
5000: 0.702 seconds
1000: 0.711 seconds
500: 0.728 seconds
What worked well on your system?
Let me know in the comments below.
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to hash a dictionary of words concurrently with this multiprocessing.Pool example.
Do you have any questions about this example?
Ask your question in the comments below and I will do my best to answer.
Photo by Tobias Nii Kwatei Quartey on Unsplash
eric kischell says
Nice article. I plan to adapt it to a slow Python Mass Spec. problem.
thx e.-
keesh@ieee.org
Jason Brownlee says
Thank you.
Let me know how you go.