How to Configure Multiprocessing Pool.map() Chunksize
You can execute tasks in batches using the "chunksize" argument of the Pool.map() method.
In this tutorial you will discover the chunksize argument when executing multiple tasks with the multiprocessing pool in Python.
Let's get started.
Problem With Issuing Many Tasks to the Pool
The multiprocessing pool allows us to issue many tasks to the process pool at once.
This can be achieved by calling a function like Pool.map() to apply the same function to each item in an iterable and wait for the results, or with a function like Pool.map_async() that does the same thing asynchronously.
A problem when executing a large number of tasks in the multiprocessing pool is that each task must be managed individually, which is slow.
Specifically:
- Each task must be issued to the pool and added to an internal queue before being dispatched to a worker process.
- The arguments for each task must be serialized (automatically) using pickle in order to be transmitted to the worker process.
- After completion, the return value of the task must be serialized (automatically) using pickle in order to be transmitted back to the caller process.
- The return value for each task must be stored in an internal queue of results for the caller to retrieve.
These activities are repeated for each task issued to the pool, adding a small computational overhead and a slight delay.
When hundreds, thousands, or millions of tasks are issued to the multiprocessing pool, the computational overhead and associated delay can really add up.
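To make the per-task cost concrete, here is a minimal sketch that counts serialization round trips when arguments are sent one at a time versus in batches. It does not use the pool at all: the helper name count_round_trips() is hypothetical, and pickle is called directly only to mimic the work the pool does automatically.

```python
import pickle

def count_round_trips(items, batch_size):
    """Pickle and unpickle items in batches; return (round_trips, restored_items)."""
    items = list(items)
    round_trips = 0
    restored = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        # one serialize/deserialize cycle per batch, like one message to a worker
        payload = pickle.dumps(batch)
        restored.extend(pickle.loads(payload))
        round_trips += 1
    return round_trips, restored

# 1,000 arguments sent individually versus in batches of 100
one_by_one, _ = count_round_trips(range(1000), batch_size=1)
batched, restored = count_round_trips(range(1000), batch_size=100)
print(one_by_one, batched)
```

The same data arrives either way, but batching cuts the number of round trips from 1,000 to 10 in this sketch.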
Thankfully, the multiprocessing pool provides a way to group tasks into batches or chunks in an attempt to reduce this overhead.
This can be achieved using a "chunksize" argument when issuing tasks.
What is Chunksize
The "chunksize" argument can be specified when issuing many tasks to the multiprocessing pool with a single function call.
It controls how the tasks issued to the pool (e.g. calls to a target function with one or more arguments) are mapped to the internal tasks that are transmitted to child worker processes, executed, and have their results returned.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
-- multiprocessing — Process-based parallelism
The chunksize allows:
- Tasks to be transmitted to child workers in batch.
- A batch of tasks to be executed by a child worker.
- Return values generated by a task to be transmitted back to the caller in batch.
Perhaps a better name for the argument would be "batch size".
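The idea of batching can be sketched without a pool. The helpers below, split_into_chunks() and run_chunk(), are hypothetical names used for illustration only; they are not the pool's internal code, just one plausible way to chop an iterable into chunks and process each chunk as a single unit.

```python
from itertools import islice

def split_into_chunks(iterable, chunksize):
    """Yield successive tuples of up to chunksize items (hypothetical helper)."""
    iterator = iter(iterable)
    while True:
        chunk = tuple(islice(iterator, chunksize))
        if not chunk:
            return
        yield chunk

def run_chunk(func, chunk):
    """What a worker conceptually does with one chunk: apply func to every item."""
    return [func(item) for item in chunk]

# ten tasks with a chunksize of four become three internal tasks
chunks = list(split_into_chunks(range(10), 4))
print(chunks)
# the results of each chunk are flattened back into one list for the caller
results = [value for chunk in chunks for value in run_chunk(abs, chunk)]
print(results)
```

Note that the final chunk is smaller than the others whenever the number of items is not a multiple of the chunksize.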
What Functions Support Chunksize
There are 6 functions in the multiprocessing pool that support the "chunksize" argument when issuing multiple tasks.
They are:
- Pool.map()
- Pool.map_async()
- Pool.imap()
- Pool.imap_unordered()
- Pool.starmap()
- Pool.starmap_async()
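As a quick illustration, the sketch below passes a chunksize to each of the six functions. It makes one loud assumption: it uses multiprocessing.pool.ThreadPool, which exposes the same methods as Pool, purely so that the snippet runs anywhere without an entry-point guard. The square() and add() functions are made up for the example.

```python
from multiprocessing.pool import ThreadPool

def square(x):
    return x * x

def add(a, b):
    return a + b

# ThreadPool is an assumption made for portability; swap in multiprocessing.Pool
# (under an `if __name__ == '__main__':` guard) for process-based workers
with ThreadPool(2) as pool:
    mapped = pool.map(square, range(6), chunksize=2)
    mapped_async = pool.map_async(square, range(6), chunksize=2).get()
    lazy = list(pool.imap(square, range(6), chunksize=2))
    # imap_unordered() may yield results in any order, so sort before comparing
    unordered = sorted(pool.imap_unordered(square, range(6), chunksize=2))
    pairs = [(1, 2), (3, 4), (5, 6)]
    starred = pool.starmap(add, pairs, chunksize=2)
    starred_async = pool.starmap_async(add, pairs, chunksize=2).get()

print(mapped, starred)
```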
What is the Default Chunksize?
The multiprocessing pool "chunksize" has the default value of None.
In the case of Pool.imap() and Pool.imap_unordered(), the default chunksize is 1.
The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
-- multiprocessing — Process-based parallelism
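The declared default for each method can be checked by inspecting its signature. This is a small sketch, assuming a standard CPython installation where multiprocessing.pool.Pool is the class behind multiprocessing.Pool; the variable names are my own.

```python
from inspect import signature
from multiprocessing.pool import Pool

method_names = ['map', 'map_async', 'imap', 'imap_unordered',
                'starmap', 'starmap_async']
# look up the default value declared for the chunksize parameter of each method
defaults = {
    name: signature(getattr(Pool, name)).parameters['chunksize'].default
    for name in method_names
}
for name, default in defaults.items():
    print(f'Pool.{name}(): chunksize={default}')
```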
In the case of Pool.map(), Pool.starmap() and the asynchronous versions of these functions, the default is not 1.
Instead, when chunksize is set to None, a default chunksize is calculated based on the length of the provided iterable.
We can see this if we review the code for the multiprocessing.Pool class.
The snippet of code below is taken from the Pool class where the default chunksize is calculated.
...
if chunksize is None:
    chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
    if extra:
        chunksize += 1
The calculation involves dividing the number of items in the iterable (e.g. the number of tasks issued) by the number of worker child processes multiplied by four.
If there is a remainder, then one is added to the chunksize, to round up for extra tasks.
You might recall that the built-in divmod() function will return the result of dividing one number by another (quotient) as well as the remainder.
We can demonstrate this calculation with a small example.
Consider a multiprocessing pool with 2 child worker processes and a call to map() with an iterable containing 500,000 items.
The chunksize in this situation would be calculated as follows:
- chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
- chunksize, extra = divmod(500000, 2 * 4)
- chunksize, extra = divmod(500000, 8)
- chunksize, extra = 62500, 0
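The same arithmetic can be wrapped in a small function to try other combinations. The function name default_chunksize() is hypothetical; it simply mirrors the calculation quoted above, taking the number of items and the number of workers as plain integers.

```python
def default_chunksize(n_items, n_workers):
    """Mirror the default calculation quoted above (helper name is hypothetical)."""
    chunksize, extra = divmod(n_items, n_workers * 4)
    # round up when the items do not divide evenly
    if extra:
        chunksize += 1
    return chunksize

print(default_chunksize(500_000, 2))
print(default_chunksize(100, 8))
```

With 500,000 items and 2 workers this reports 62500, matching the worked example. With 100 items and 8 workers it reports 4.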
How To Configure Chunksize
We can configure a custom chunksize value.
We must assume that the calculation used for the default chunksize was implemented because it works well in most situations.
This means that any custom chunksize we set should be compared to the default to ensure it improves performance.
In fact, there are no good general guidelines for how to set the "chunksize" argument.
Instead, I recommend using some trial and error in order to discover what works well for your specific applications.
Some values to explore might be:
- No Chunksize: chunksize=1
- Default Chunksize: chunksize=None (default chunksize)
- Even Chunksize: chunksize = ceil(len(iterable) / len(pool._pool))
Alternatively, you could try a log scale (orders of magnitude) between 1 and the number of items in the provided iterable to help quickly find a value that works well.
For example:
- chunksize=1
- chunksize=10
- chunksize=100
- chunksize=1000
- chunksize=10000
- and so on...
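A list of candidate values like this can be generated rather than typed out. The helper below is a sketch with a hypothetical name, log_scale_chunksizes(), that returns the powers of ten from 1 up to the number of items.

```python
def log_scale_chunksizes(n_items):
    """Return powers of ten from 1 up to n_items inclusive (hypothetical helper)."""
    candidates = []
    size = 1
    while size <= n_items:
        candidates.append(size)
        size *= 10
    return candidates

print(log_scale_chunksizes(100_000))
```

Each candidate could then be passed as the chunksize in a timed run and the results compared.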
Now that we are familiar with chunksize, let's look at some worked examples.
Example With No Chunking (chunksize=1)
We can explore the effect of a chunksize of one.
In this example, we will define a simple task that takes an argument, blocks for a moment to simulate work, and returns a value. We can then issue many calls to this function with different arguments and set a chunksize of one and consider how long the example takes to execute. This will provide a point of comparison with other chunksize values later.
Firstly, we can define a function to execute on the multiprocessing pool.
The function will take an integer argument, generate a random value between 0 and 1, block for between 1 and 2 seconds, and then return the generated value added to the provided argument.
The task() function below implements this.
# task to execute in a separate process
def task(arg):
    # generate a value
    value = random()
    # block for between 1 and 2 seconds
    sleep(1 + value)
    # return generated value combined with argument
    return arg + value
Next, in the main process, we will create a multiprocessing pool using the default configuration, which uses all logical CPUs in the system.
We will use the context manager interface to ensure the pool is closed automatically for us once we're finished.
...
# create the multiprocessing pool
with Pool() as pool:
    # ...
Next, we will issue 100 calls to our custom task() function with integer values between 0 and 99.
We will use the synchronous map() function with a chunksize of 1 to issue the tasks, then iterate over the return values once all tasks have completed.
...
# issue tasks and process results
for result in pool.map(task, range(100), chunksize=1):
    # report the result
    print(result)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example with no chunking, e.g. chunksize=1
from time import sleep
from random import random
from multiprocessing import Pool
# task to execute in a separate process
def task(arg):
    # generate a value
    value = random()
    # block for between 1 and 2 seconds
    sleep(1 + value)
    # return generated value combined with argument
    return arg + value
# protect the entry point
if __name__ == '__main__':
    # create the multiprocessing pool
    with Pool() as pool:
        # issue tasks and process results
        for result in pool.map(task, range(100), chunksize=1):
            # report the result
            print(result)
Running the example first creates the multiprocessing pool with a default configuration.
Next, the 100 tasks are issued to the multiprocessing pool with a chunksize of 1.
Tasks are issued to workers one at a time, executed, and the result returned.
Issued tasks are not batched: each one is sent to a worker as its own internal task.
All tasks are completed, then the iterable of return values is returned to the caller and traversed, reporting all values.
A truncated example of the output is provided below. Note, the results will differ each time the example is run because of the use of random numbers.
In this case, the example took about 19.9 seconds to complete on my system.
...
90.00666396577743
91.78748090488585
92.54642553173721
93.89105290250727
94.1142948528356
95.32983425821804
96.57469861761551
97.04100646174298
98.55079918540376
99.46373262845606
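The tutorial does not show how the run time was measured. One way to take such a measurement is sketched below, with a hypothetical timed() helper built on time.perf_counter(). A short sleep stands in for the real workload so the snippet runs on its own.

```python
from time import perf_counter, sleep

def timed(func, *args, **kwargs):
    """Call func and return (result, elapsed_seconds)."""
    start = perf_counter()
    result = func(*args, **kwargs)
    return result, perf_counter() - start

# stand-in workload so the sketch runs on its own; with a pool this would be
# e.g. timed(pool.map, task, range(100), chunksize=1)
result, seconds = timed(sleep, 0.05)
print(f'took {seconds:.3f} seconds')
```

Because timed() forwards its arguments unchanged, the same helper could wrap a pool.map() call with any chunksize.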
Next, let's look at an example of using the default chunksize.
Example With Default Chunksize (chunksize=None)
We can explore using the default chunksize used by map(), starmap() and their asynchronous versions.
In this example, we can update the previous example to set the "chunksize" argument to None, the same as omitting the chunksize argument completely.
For example:
...
# issue tasks and process results
for result in pool.map(task, range(100), chunksize=None):
    # report the result
    print(result)
This will cause the multiprocessing pool to calculate and use a default chunksize.
I have 8 logical CPUs in my system, and therefore 8 child workers by default. On my system, the default chunksize in this case will be calculated as:
- chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
- chunksize, extra = divmod(100, 8 * 4)
- chunksize, extra = divmod(100, 32)
- chunksize, extra = 3, 4
Given that there is a remainder, 1 is added to the chunksize to give:
- chunksize = 3 + 1
- chunksize = 4
That means the 100 issued tasks will be mapped to 25 internal tasks.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example with the default chunksize
from time import sleep
from random import random
from multiprocessing import Pool
# task to execute in a separate process
def task(arg):
    # generate a value
    value = random()
    # block for between 1 and 2 seconds
    sleep(1 + value)
    # return generated value combined with argument
    return arg + value
# protect the entry point
if __name__ == '__main__':
    # create the multiprocessing pool
    with Pool() as pool:
        # issue tasks and process results
        for result in pool.map(task, range(100), chunksize=None):
            # report the result
            print(result)
Running the example first creates the multiprocessing pool with a default configuration.
Next, the 100 tasks are issued to the multiprocessing pool with a default chunksize of 4.
Tasks are issued to workers in batches of four, executed, and the result returned.
All tasks are completed, then the iterable of return values is returned to the caller and traversed, reporting all values.
A truncated example of the output is provided below. Note, the results will differ each time the example is run because of the use of random numbers.
In this case, the example took about 22.0 seconds to complete on my system, worse than using a chunksize of 1.
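The default chunksize does not help in this case. A likely reason is that each task blocks for more than a second, so the per-task overhead that chunking removes is tiny by comparison, while larger chunks leave less room to spread the work evenly across the workers.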
...
90.72061963275682
91.51395925172658
92.44189834307795
93.1294542451108
94.7868929918878
95.19862834343039
96.07772741723383
97.69995047391141
98.31798025450472
99.62925679159557
Next, let's explore the effect of using an even chunksize.
Example With Even Chunksize (chunksize=n_tasks/n_workers)
We can explore the effect of using an even chunksize, that is an even division of the number of tasks to child worker processes.
In this case, we can update the previous example to first calculate a chunksize based on the number of items in the iterable and the number of child worker processes in the pool.
...
# number of tasks to execute
n_tasks = 100
# chunksize to use
n_tasks_per_chunk = ceil(n_tasks / len(pool._pool))
Recall that the math.ceil() function rounds a floating point number up to the next integer.
On my system, I have 8 logical CPUs, therefore 8 child workers by default.
Therefore, the even chunksize in this case will be:
- n_tasks_per_chunk = ceil(n_tasks / len(pool._pool))
- n_tasks_per_chunk = ceil(100 / 8)
- n_tasks_per_chunk = ceil(12.5)
- n_tasks_per_chunk = 13
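The calculation, and the chunks it leads to, can be checked with a short sketch. Both helper names, even_chunksize() and chunk_sizes(), are hypothetical, and the 8 workers are an assumption that matches the system described here.

```python
from math import ceil

def even_chunksize(n_tasks, n_workers):
    """One chunk per worker, rounded up so no task is left over."""
    return ceil(n_tasks / n_workers)

def chunk_sizes(n_tasks, chunksize):
    """Sizes of the consecutive chunks that n_tasks splits into."""
    full, rest = divmod(n_tasks, chunksize)
    return [chunksize] * full + ([rest] if rest else [])

size = even_chunksize(100, 8)
print(size)
print(chunk_sizes(100, size))
```

With these numbers, seven chunks hold 13 tasks each and the eighth holds the remaining 9, so the split is close to even rather than exactly even.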
We will then report the values as confirmation.
...
# report details for reference
print(f'chunksize={n_tasks_per_chunk}, n_workers={len(pool._pool)}')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example with the even chunksize
from math import ceil
from time import sleep
from random import random
from multiprocessing import Pool
# task to execute in a separate process
def task(arg):
    # generate a value
    value = random()
    # block for between 1 and 2 seconds
    sleep(1 + value)
    # return generated value combined with argument
    return arg + value
# protect the entry point
if __name__ == '__main__':
    # create the multiprocessing pool
    with Pool() as pool:
        # number of tasks to execute
        n_tasks = 100
        # chunksize to use
        n_tasks_per_chunk = ceil(n_tasks / len(pool._pool))
        # report details for reference
        print(f'chunksize={n_tasks_per_chunk}, n_workers={len(pool._pool)}')
        # issue tasks and process results
        for result in pool.map(task, range(n_tasks), chunksize=n_tasks_per_chunk):
            # report the result
            print(result)
Running the example first creates the multiprocessing pool with a default configuration.
Next, the 100 tasks are issued to the multiprocessing pool with a specified chunksize of 13.
Tasks are issued to workers in batches of 13, executed, and the result returned.
All tasks are completed, then the iterable of return values is returned to the caller and traversed, reporting all values.
A truncated example of the output is provided below. Note, the results will differ each time the example is run because of the use of random numbers.
In this case, the example took about 20.3 seconds to complete on my system, still worse than using a chunksize of 1, although slightly better than the default chunksize in this case.
chunksize=13, n_workers=8
...
90.9139613348572
91.29627702972807
92.44633424109573
93.71585909166346
94.27121635373051
95.05300459383739
96.76344007460122
97.70486914449164
98.74067104900065
99.05785271152739
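The three timings can be compared against a rough model. The sketch below is a simplification, not how the pool is implemented: it assumes 8 workers, assumes every task takes exactly 1.5 seconds (the middle of the 1 to 2 second range), ignores all overhead, and hands each chunk to whichever worker becomes free first. The function name estimate_makespan() is hypothetical.

```python
import heapq

def estimate_makespan(n_tasks, n_workers, chunksize, task_seconds):
    """Estimate total run time when chunks go to whichever worker is free first."""
    # split the tasks into consecutive chunks; the last may be smaller
    full, rest = divmod(n_tasks, chunksize)
    sizes = [chunksize] * full + ([rest] if rest else [])
    # each heap entry is the time at which one worker next becomes free
    free_at = [0.0] * n_workers
    heapq.heapify(free_at)
    finish = 0.0
    for size in sizes:
        start = heapq.heappop(free_at)
        end = start + size * task_seconds
        heapq.heappush(free_at, end)
        finish = max(finish, end)
    return finish

for chunksize in (1, 4, 13):
    print(chunksize, estimate_makespan(100, 8, chunksize, 1.5))
```

In this model, a chunksize of 4 comes out slowest because the 25th chunk has to run alone after the first 24 have finished. That is the same ordering as the measured timings, although the exact numbers differ because the real task durations are random.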
Extensions
This section gives you some ideas to explore in order to learn more about chunksize.
- Develop an example that issues many more tasks, e.g. 100,000 and try different chunksizes on a log scale between 1 and 100,000. Plot the results.
- Develop an example where there are two types of tasks, short duration tasks and long duration tasks (2x or more of the short duration), then experiment with different chunk sizes.
- Develop an example that experiments with the chunksize for the imap() function.
If you explore any of these examples, let me know. I'd love to see what you come up with.
Takeaways
You now know how to use the chunksize argument when executing multiple tasks with the multiprocessing pool in Python.