How to Configure ThreadPool map() Chunksize
You can execute tasks in batches using the "chunksize" argument when using the ThreadPool map() method.
In this tutorial you will discover the chunksize argument when executing multiple tasks with the ThreadPool in Python.
Let's get started.
Problem With Issuing Many Tasks to the Pool
The ThreadPool allows us to issue many tasks at once.
This can be achieved by calling a method like map() to apply the same function to each item in an iterable and wait for the results, or a method like map_async() that does the same thing asynchronously.
A problem when executing a large number of tasks in the ThreadPool is that each task must be managed individually, which adds overhead.
Specifically:
- Each task must be issued to the pool and added to an internal queue before being dispatched to a worker thread.
- The arguments for each task must be placed on a thread-safe queue in order to be handed to the worker thread. Unlike the process-based Pool, no pickling is needed because threads share memory.
- After the task completes, its return value must be placed on a second thread-safe queue in order to be handed back to the caller thread.
- The return value for each task must be stored in an internal queue of results for the caller to retrieve.
These activities are repeated for each task issued to the ThreadPool, adding a small computational overhead and a slight delay.
When hundreds, thousands, or millions of tasks are issued to the ThreadPool, the computational overhead and associated delay can really add up.
Thankfully, the ThreadPool provides a way to group tasks into batches or chunks in an attempt to reduce this overhead.
This can be achieved using a "chunksize" argument when issuing tasks.
What is Chunksize
The "chunksize" is an argument to the ThreadPool methods that issue many tasks at once.
It controls the mapping of tasks issued to the pool (e.g. calls to a target function with one or more arguments), to internal tasks that are transmitted to worker threads in the pool to be executed and that return a result.
Recall that the ThreadPool class extends the Pool class, and the methods that support the "chunksize" argument are defined on the Pool class.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
-- multiprocessing — Process-based parallelism
The chunksize allows:
- Tasks to be transmitted to workers in batch.
- A batch of tasks to be executed by a thread worker.
- Return values generated by a task to be transmitted back to the caller in batch.
Perhaps a better name for the argument would be "batch size".
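To make the idea of batching concrete, the sketch below splits an iterable into chunks of a given size. The split_into_chunks() helper is a hypothetical name used for illustration only. It is not the code used inside the pool, although it follows the same general approach of slicing the iterable into fixed-size groups.

```python
# illustrative sketch: how an iterable can be grouped into chunks
from itertools import islice

# hypothetical helper, not part of the multiprocessing API
def split_into_chunks(iterable, chunksize):
    # consume the iterable one chunk at a time
    iterator = iter(iterable)
    while True:
        chunk = tuple(islice(iterator, chunksize))
        # stop once the iterable is exhausted
        if not chunk:
            return
        yield chunk

# ten items with a chunksize of four gives three chunks
chunks = list(split_into_chunks(range(10), 4))
print(chunks)
# prints [(0, 1, 2, 3), (4, 5, 6, 7), (8, 9)]
```

Note that the last chunk may be smaller than the others, which is why the chunksize is described as approximate.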
What Functions Support Chunksize
There are 6 methods in the ThreadPool that support the "chunksize" argument when issuing multiple tasks.
They are:
- map()
- map_async()
- imap()
- imap_unordered()
- starmap()
- starmap_async()
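As a quick illustration, the sketch below passes a chunksize to each of the six methods. The square() and add() functions, the demo() wrapper, the pool size of 4 and the chunksize of 3 are all arbitrary choices made for this example.

```python
# illustrative sketch: passing chunksize to each supporting method
from multiprocessing.pool import ThreadPool

# simple single-argument task (assumed for this example)
def square(x):
    return x * x

# simple two-argument task for the starmap() variants (assumed)
def add(x, y):
    return x + y

# issue the same work via each method and collect the results
def demo():
    pairs = [(i, i) for i in range(10)]
    with ThreadPool(4) as pool:
        results = {
            'map': pool.map(square, range(10), chunksize=3),
            'map_async': pool.map_async(square, range(10), chunksize=3).get(),
            # imap() results must be consumed before the pool is closed
            'imap': list(pool.imap(square, range(10), chunksize=3)),
            # imap_unordered() may yield results in any order, so sort them
            'imap_unordered': sorted(pool.imap_unordered(square, range(10), chunksize=3)),
            'starmap': pool.starmap(add, pairs, chunksize=3),
            'starmap_async': pool.starmap_async(add, pairs, chunksize=3).get(),
        }
    return results

if __name__ == '__main__':
    for name, values in demo().items():
        print(name, values)
```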
What is the Default Chunksize?
For map(), starmap() and their asynchronous versions, the "chunksize" argument has a default value of None.
In the case of imap() and imap_unordered() methods, the default chunksize is 1.
The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
-- multiprocessing — Process-based parallelism
In the case of the map() and starmap() methods and their asynchronous versions, the default is not 1.
Instead, when chunksize is set to None, a default chunksize is calculated based on the length of the provided iterable.
We can see this if we review the code for the multiprocessing.Pool class, the parent of the ThreadPool class.
The snippet of code below is taken from the Pool class where the default chunksize is calculated.
...
if chunksize is None:
    chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
    if extra:
        chunksize += 1
The calculation involves dividing the number of items in the iterable (e.g. the number of tasks issued) by the number of worker threads multiplied by four.
If there is a remainder, then one is added to the chunksize, to round up for extra tasks.
You might recall that the built-in divmod() function will return the result of dividing one number by another (quotient) as well as the remainder.
We can demonstrate this calculation with a small example.
Consider a ThreadPool with 2 worker threads and a call to map() with an iterable containing 500,000 items.
The chunksize in this situation would be calculated as follows:
- chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
- chunksize, extra = divmod(500,000, 2 * 4)
- chunksize, extra = divmod(500,000, 8)
- chunksize, extra = 62500, 0
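We can reproduce this arithmetic with a small standalone function. The default_chunksize() name and its two parameters are assumptions made for this sketch. It mirrors the calculation shown above without needing a pool.

```python
# illustrative sketch: the default chunksize calculation as a function

# hypothetical helper that mirrors the calculation used for chunksize=None
def default_chunksize(n_items, n_workers):
    # divide the items by four times the number of workers
    chunksize, extra = divmod(n_items, n_workers * 4)
    # round up if there is a remainder
    if extra:
        chunksize += 1
    return chunksize

# 500,000 items and 2 workers
print(default_chunksize(500_000, 2))
# prints 62500

# 100 items and 8 workers
print(default_chunksize(100, 8))
# prints 4
```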
How To Configure Chunksize
We can configure a custom chunksize value.
We must assume that the calculation used for the default chunksize was implemented because it works well in most situations.
This means that any custom chunksize we set should be compared to the default to ensure it improves performance.
In fact, there are no good general guidelines for how to set the "chunksize" argument.
Instead, I recommend using some trial and error in order to discover what works well for your specific applications.
Some values to explore might be:
- No Chunksize: chunksize=1
- Default Chunksize: chunksize=None (default chunksize)
- Even Chunksize: chunksize = ceil(len(iterable) / len(pool._pool))
Alternatively, you could try a log scale (orders of magnitude) between 1 and the number of items in the provided iterable to help quickly find a value that works well.
For example:
- chunksize=1
- chunksize=10
- chunksize=100
- chunksize=1000
- chunksize=10000
- and so on...
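A trial like this can be automated with a small timing loop. The sketch below is one way to do it, assuming a placeholder task() that sleeps for one millisecond, a pool of 4 workers, and a hypothetical time_chunksizes() helper. Swap in your own task and iterable to get meaningful numbers.

```python
# illustrative sketch: timing map() with different chunksize values
from time import perf_counter, sleep
from multiprocessing.pool import ThreadPool

# placeholder task that blocks briefly to simulate work (assumed)
def task(arg):
    sleep(0.001)
    return arg

# hypothetical helper: time one map() call per chunksize value
def time_chunksizes(n_tasks, chunksizes, n_workers=4):
    timings = {}
    with ThreadPool(n_workers) as pool:
        for chunksize in chunksizes:
            start = perf_counter()
            pool.map(task, range(n_tasks), chunksize=chunksize)
            timings[chunksize] = perf_counter() - start
    return timings

if __name__ == '__main__':
    # compare the default (None) against a log scale of values
    timings = time_chunksizes(1000, [None, 1, 10, 100, 1000])
    for chunksize, duration in timings.items():
        print(f'chunksize={chunksize}: {duration:.3f} seconds')
```

Timings vary from run to run, so it is a good idea to repeat each measurement a few times and compare the averages.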
Now that we are familiar with chunksize, let's look at some worked examples.
Example With No Chunking (chunksize=1)
We can explore the effect of a chunksize of one.
In this example, we will define a simple task that takes an argument, blocks for a moment to simulate work, and returns a value. We can then issue many calls to this function with different arguments and set a chunksize of one and consider how long the example takes to execute. This will provide a point of comparison with other chunksize values later.
Firstly, we can define a function to execute on the ThreadPool.
The function will take an integer argument, generate a random value between 0 and 1, block for between 1 and 2 seconds, and then return the generated value added to the provided argument.
The task() function below implements this.
# task to execute in a separate thread
def task(arg):
    # generate a value
    value = random()
    # block for between 1 and 2 seconds
    sleep(1 + value)
    # return generated value combined with argument
    return arg + value
Next, in the main thread, we will create a ThreadPool using the default configuration, which has one worker thread per logical CPU in the system.
We will use the context manager interface to ensure the pool is closed automatically for us once we're finished.
...
# create the thread pool
with ThreadPool() as pool:
    # ...
Next, we will issue 100 calls to our custom task() function with integer values between 0 and 99.
We will use the synchronous map() method to issue the tasks with a chunksize of 1, then iterate over the return values once all tasks have completed.
...
# issue tasks and handle results
for result in pool.map(task, range(100), chunksize=1):
    # report the result
    print(result)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example with no chunking, e.g. chunksize=1
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool

# task to execute in a separate thread
def task(arg):
    # generate a value
    value = random()
    # block for between 1 and 2 seconds
    sleep(1 + value)
    # return generated value combined with argument
    return arg + value

# protect the entry point
if __name__ == '__main__':
    # create the thread pool
    with ThreadPool() as pool:
        # issue tasks and handle results
        for result in pool.map(task, range(100), chunksize=1):
            # report the result
            print(result)
Running the example first creates the ThreadPool with a default configuration.
Next, the 100 tasks are issued to the ThreadPool with a chunksize of 1.
Tasks are issued to workers one at a time, executed, and the result returned.
Issued tasks are not batched into larger internal tasks before being sent to workers.
All tasks are completed, then the iterable of return values is returned to the caller and traversed, reporting all values.
A truncated example of the output is provided below. Note, the results will differ each time the example is run because of the use of random numbers.
In this case, the example took about 19.8 seconds to complete on my system.
...
87.75213213041215
88.77941875387141
89.39870765701268
90.84941587881116
91.63501318380227
92.21817634886247
93.18482472789132
94.22651614448517
95.4592651817853
96.07769894628859
97.12184212447335
98.95054587257876
99.65877159758986
Next, let's look at an example of using the default chunksize.
Example With Default Chunksize (chunksize=None)
We can explore using the default chunksize used by map(), starmap() and their asynchronous versions.
In this example, we can update the previous example to set the "chunksize" argument to None, the same as omitting the chunksize argument completely.
For example:
...
# issue tasks and handle results
for result in pool.map(task, range(100), chunksize=None):
    # report the result
    print(result)
This will cause the ThreadPool to calculate and use a default chunksize.
I have 8 logical CPUs in my system and therefore 8 worker threads by default. On my system, the default chunksize in this case will be calculated as:
- chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
- chunksize, extra = divmod(100, 8 * 4)
- chunksize, extra = divmod(100, 32)
- chunksize, extra = 3, 4
Given that there is a remainder, 1 is added to the chunksize to give:
- chunksize = 3 + 1
- chunksize = 4
That means the 100 issued tasks will be mapped to 25 internal tasks.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example with the default chunksize
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool

# task to execute in a separate thread
def task(arg):
    # generate a value
    value = random()
    # block for between 1 and 2 seconds
    sleep(1 + value)
    # return generated value combined with argument
    return arg + value

# protect the entry point
if __name__ == '__main__':
    # create the thread pool
    with ThreadPool() as pool:
        # issue tasks and handle results
        for result in pool.map(task, range(100), chunksize=None):
            # report the result
            print(result)
Running the example first creates the ThreadPool with a default configuration.
Next, the 100 tasks are issued to the ThreadPool with a default chunksize of 4.
Tasks are issued to workers in batches of four, executed, and the result returned.
All tasks are completed, then the iterable of return values is returned to the caller and traversed, reporting all values.
A truncated example of the output is provided below. Note, the results will differ each time the example is run because of the use of random numbers.
In this case, the example took about 22.2 seconds to complete on my system, worse than using a chunksize of 1.
The default chunksize does not help in this case.
...
88.24822285708896
89.21641752734365
90.08068496930139
91.5811526207494
92.48400618616967
93.01525835935198
94.77618153107508
95.39102670384972
96.2953574433547
97.17086167858272
98.1802522847596
99.85580788682849
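We can observe this batching directly by recording which worker thread handles each item. The sketch below is a minimal illustration, assuming a hypothetical threads_per_chunk() helper, a pool of 4 workers and a task that returns immediately. All items that belong to the same chunk should report the same thread identifier, because each chunk is executed as a single internal task.

```python
# illustrative sketch: confirm each chunk is executed by a single worker
from threading import get_ident
from multiprocessing.pool import ThreadPool

# task that reports the thread that executed it
def task(arg):
    return arg, get_ident()

# hypothetical helper: map chunk index to the thread ids that ran its items
def threads_per_chunk(n_tasks, chunksize, n_workers=4):
    with ThreadPool(n_workers) as pool:
        results = pool.map(task, range(n_tasks), chunksize=chunksize)
    chunks = {}
    for arg, ident in results:
        # items are chunked in order, so integer division gives the chunk index
        chunks.setdefault(arg // chunksize, set()).add(ident)
    return chunks

if __name__ == '__main__':
    # 20 items with a chunksize of 4 gives 5 chunks
    for index, idents in threads_per_chunk(20, 4).items():
        print(f'chunk {index}: executed by {len(idents)} thread(s)')
```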
Next, let's explore the effect of using an even chunksize.
Example With Even Chunksize (chunksize=n_tasks/n_workers)
We can explore the effect of using an even chunksize, that is, an even division of the tasks among the worker threads.
In this case, we can update the previous example to first calculate a chunksize based on the number of items in the iterable and the number of worker threads in the pool.
...
# number of tasks to execute
n_tasks = 100
# chunksize to use
n_tasks_per_chunk = ceil(n_tasks / len(pool._pool))
Recall that the math.ceil() function rounds a floating point number up to the next integer.
On my system, I have 8 logical CPUs, therefore 8 thread workers by default.
Therefore, the even chunksize in this case will be:
- n_tasks_per_chunk = ceil(n_tasks / len(pool._pool))
- n_tasks_per_chunk = ceil(100 / 8)
- n_tasks_per_chunk = ceil(12.5)
- n_tasks_per_chunk = 13
We will then report the values as confirmation.
...
# report details for reference
print(f'chunksize={n_tasks_per_chunk}, n_workers={len(pool._pool)}')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example with the even chunksize
from math import ceil
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool

# task to execute in a separate thread
def task(arg):
    # generate a value
    value = random()
    # block for between 1 and 2 seconds
    sleep(1 + value)
    # return generated value combined with argument
    return arg + value

# protect the entry point
if __name__ == '__main__':
    # create the thread pool
    with ThreadPool() as pool:
        # number of tasks to execute
        n_tasks = 100
        # chunksize to use
        n_tasks_per_chunk = ceil(n_tasks / len(pool._pool))
        # report details for reference
        print(f'chunksize={n_tasks_per_chunk}, n_workers={len(pool._pool)}')
        # issue tasks and handle results
        for result in pool.map(task, range(n_tasks), chunksize=n_tasks_per_chunk):
            # report the result
            print(result)
Running the example first creates the ThreadPool with a default configuration.
Next, the 100 tasks are issued to the ThreadPool with a specified chunksize of 13.
Tasks are issued to workers in batches of 13, executed, and the result returned.
All tasks are completed, then the iterable of return values is returned to the caller and traversed, reporting all values.
A truncated example of the output is provided below. Note, the results will differ each time the example is run because of the use of random numbers.
In this case, the example took about 20.1 seconds to complete on my system, still worse than using a chunksize of 1, although slightly better than the default chunksize in this case.
chunksize=13, n_workers=8
...
88.62561237774375
89.38040892006248
90.29703505596949
91.7007948403023
92.09543932128561
93.21707173734491
94.41084787812804
95.25364149256991
96.79837460354642
97.99080993034889
98.96586922969918
99.63253431106968
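A rough back-of-the-envelope model may help explain the three timings (about 19.8, 22.2 and 20.1 seconds). The sketch below is a crude approximation rather than a description of how the pool schedules work. It assumes 8 workers, a mean task time of 1.5 seconds (the average of sleeping for between 1 and 2 seconds), and that chunks are processed in lockstep rounds. The estimate_duration() function is a hypothetical helper.

```python
# illustrative sketch: crude estimate of run time for a given chunksize
from math import ceil

# hypothetical helper, assumes chunks are processed in lockstep rounds
def estimate_duration(n_tasks, chunksize, n_workers, mean_task_time):
    # number of internal tasks (chunks) created
    n_chunks = ceil(n_tasks / chunksize)
    # number of rounds needed for all workers to finish all chunks
    rounds = ceil(n_chunks / n_workers)
    # each round takes as long as one full chunk
    return rounds * chunksize * mean_task_time

# compare the three chunksize values used in the examples above
for chunksize in (1, 4, 13):
    estimate = estimate_duration(100, chunksize, 8, 1.5)
    print(f'chunksize={chunksize}: about {estimate:.1f} seconds')
# prints:
# chunksize=1: about 19.5 seconds
# chunksize=4: about 24.0 seconds
# chunksize=13: about 19.5 seconds
```

Under these assumptions, a chunksize of 4 needs a fourth round in which only one of the eight workers is busy, which is consistent with the default chunksize being the slowest of the three. The model overestimates in that case because it treats the final, mostly idle round as a full one.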
Extensions
This section gives you some ideas to explore in order to learn more about chunksize.
- Develop an example that issues many more tasks, e.g. 100,000 and try different chunk sizes on a log scale between 1 and 100,000. Plot the results.
- Develop an example where there are two types of tasks, short duration tasks and long duration tasks (2x or more of the short duration), then experiment with different chunk sizes.
- Develop an example that experiments with the chunksize for the imap() function.
If you explore any of these examples, let me know. I'd love to see what you come up with.
Takeaways
You now know how to use the chunksize argument when executing multiple tasks with the ThreadPool in Python.