Last Updated on November 22, 2022
You can execute tasks in batches using the “chunksize” argument when using the Pool map() method.
In this tutorial you will discover the chunksize argument when executing multiple tasks with the multiprocessing pool in Python.
Let’s get started.
Problem With Issuing Many Tasks to the Pool
The multiprocessing pool allows us to issue many tasks to the process pool at once.
This can be achieved by calling a function like Pool.map() to apply the same function to each item in an iterable and wait for the results, or with a function like Pool.map_async() that does the same thing asynchronously.
A problem when executing a large number of tasks in the multiprocessing pool is that each task must be managed which is slow.
Specifically:
- Each task must be issued to the pool and added to an internal queue before being dispatched to a worker process.
- The arguments for each task must be serialized (automatically) using pickle in order to be transmitted to the worker process.
- After completed, the return value of the task must be serialized (automatically) using pickle in order to be transmitted back to the caller process.
- The return value for each task must be stored in an internal queue of results for the caller to retrieve.
These activities are repeated for each task issued to the pool, adding a small computational overhead and a slight delay.
When hundreds, thousands, or millions of tasks are issued to the multiprocessing pool, the computational overhead and associated delay can really add up.
Thankfully, the multiprocessing pool provides a way to group tasks into batches or chunks in an attempt to reduce this overhead.
This can be achieved using a “chunksize” argument when issuing tasks.
Run loops using all CPUs, download your FREE book to learn how.
What is Chunksize
The “chunksize” is an argument specified in a function to the multiprocessing pool when issuing many tasks.
It controls the mapping of tasks issued to the pool (e.g. calls to a target function with one or more arguments), to internal tasks that are transmitted to child worker processes in the pool to be executed and that return a result.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
— multiprocessing — Process-based parallelism
The chunksize allows:
- Tasks to be transmitted to child workers in batch.
- A batch of tasks to be executed by a child worker.
- Return values generated by a task to be transmitted back to the caller in batch.
Perhaps a better name for the argument would be “batch size“.
What Functions Support Chunksize
There are 6 functions in the multiprocessing pool that support the “chunksize” argument when issuing multiple tasks.
They are:
- Pool.map()
- Pool.map_async()
- Pool.imap()
- Pool.imap_unordered()
- Pool.starmap()
- Pool.starmap_async()
You can learn more about the differences between these functions in the tutorial:
Free Python Multiprocessing Pool Course
Download your FREE Process Pool PDF cheat sheet and get BONUS access to my free 7-day crash course on the Process Pool API.
Discover how to use the Multiprocessing Pool including how to configure the number of workers and how to execute tasks asynchronously.
What is the Default Chunksize?
The multiprocessing pool “chunksize” has the default value of None.
In the case of Pool.imap() and Pool.imap_unordered(), the default chunksize is 1.
The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
— multiprocessing — Process-based parallelism
In the case of Pool.map(), Pool.starmap() and the asynchronous versions of these functions, the default is not 1.
Instead, when chunksize is set to None, a default chunksize is calculated based on the length of the provided iterable.
We can see this if we review the code for the multiprocessing.Pool class.
The snippet of code below is taken from the Pool class where the default chunksize is calculated.
1 2 3 4 5 |
... if chunksize is None: chunksize, extra = divmod(len(iterable), len(self._pool) * 4) if extra: chunksize += 1 |
The calculation involves dividing the number of items in the iterable (e.g. the number of tasks issued) by the number of worker child processes multiplied by four.
If there is a remainder, then one is added to the chunksize, to round up for extra tasks.
You might recall that the built-in divmod() function will return the result of dividing one number by another (quotient) as well as the remainder.
We can demonstrate this calculation with a small example.
Consider a multiprocessing pool with 2 child worker processes and a call to map() with an iterable containing 500,000 items.
The chunksize in this situation would be calculated as follows:
- chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
- chunksize, extra = divmod(500,000, 2 * 4)
- chunksize, extra = divmod(500,000, 8)
- chunksize, extra = 62500, 0
Overwhelmed by the python concurrency APIs?
Find relief, download my FREE Python Concurrency Mind Maps
How To Configure Chunksize
We can configure a custom chunksize value.
We must assume that the calculation used for the default chunksize was implemented because it works well in most situations.
This means that any custom chunksize we set should be compared to the default to ensure it improves performance.
In fact, there are no good general guidelines for how to set the “chunksize” argument.
Instead, I recommend using some trial and error in order to discover what works well for your specific applications.
Some values to explore might be:
- No Chunksize: chunksize=1
- Default Chunksize: chunksize=None (default chunksize)
- Even Chunksize: chunksize = len(iterable) / len(pool._pool)
Alternatively, you could try a log scale (orders of magnitude) between 1 and the number of items in the provided iterable to help quickly flush out a value that works well.
For example:
- chunksize=1
- chunksize=10
- chunksize=100
- chunksize=1000
- chunksize=10000
- and so on…
Now that we are familiar with chunksize, let’s look at some worked examples.
Example With No Chunking (chunksize=1)
We can explore the effect of a chunksize of one.
In this example, we will define a simple task that takes an argument, blocks for a moment to simulate work, and returns a value. We can then issue many calls to this function with different arguments and set a chunksize of one and consider how long the example takes to execute. This will provide a point of comparison with other chunksize values later.
Firstly, we can define a function to execute on the multiprocessing pool.
The function will take an integer argument, will generate a random value between 0 and 1, block for between 1 and 2 seconds, and then returns the generated value added to the provided argument.
The task() function below implements this.
1 2 3 4 5 6 7 8 |
# task to execute in a separate process def task(arg): # generate a value value = random() # block for a fraction of a second sleep(1 + value) # return generated value combined with argument return arg + value |
Next, in the main process will create a multiprocessing pool using the default configuration of all logical CPUs in the system.
You can learn more about the default configuration of the multiprocessing pool in the tutorial:
We will use the context manager interface to ensure the pool is closed automatically for us once we’re finished.
1 2 3 4 |
... # create the multiprocessing pool with Pool() as pool: # ... |
You can learn more about the context manager interface for the pool in the tutorial:
Next, we will issue 100 calls to our custom task() function with integer values between 0 and 99.
We will use the synchronous map() function to issue the tasks and a chunksize of 1 and iterate the return values once all tasks have completed.
1 2 3 4 5 |
... # issue tasks and process results for result in pool.map(task, range(100), chunksize=1): # report the result print(result) |
You can learn more about the Pool.map() function in the tutorial:
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# SuperFastPython.com # example with no chunking, e.g. chunksize=1 from time import sleep from random import random from multiprocessing import Pool # task to execute in a separate process def task(arg): # generate a value value = random() # block for a fraction of a second sleep(1 + value) # return generated value combined with argument return arg + value # protect the entry point if __name__ == '__main__': # create the multiprocessing pool with Pool() as pool: # issue tasks and process results for result in pool.map(task, range(100), chunksize=1): # report the result print(result) |
Running the example first creates the multiprocessing pool with a default configuration.
Next, the 100 tasks are issued to the multiprocessing pool with a chunksize of 1.
Tasks are issued to workers one at a time, executed, and the result returned.
No batching of issued tasks to internal tasks sent to workers is performed.
All tasks are completed, then the iterable of return values is returned to the caller and traversed, reporting all values.
A truncated example of the output is provided below. Note, the results will differ each time the example is run because of the use of random numbers.
In this case, the example took about 19.9 seconds to complete on my system.
1 2 3 4 5 6 7 8 9 10 11 |
... 90.00666396577743 91.78748090488585 92.54642553173721 93.89105290250727 94.1142948528356 95.32983425821804 96.57469861761551 97.04100646174298 98.55079918540376 99.46373262845606 |
Next, let’s look at an example of using the default chunksize.
Example With Default Chunksize (chunksize=None)
We can explore using the default chunksize used by map(), starmap() and their asynchronous versions.
In this example, we can update the previous example to set the “chunksize” argument to None, the same as omitting the chunksize argument completely.
For example:
1 2 3 4 5 |
... # issue tasks and process results for result in pool.map(task, range(100), chunksize=None): # report the result print(result) |
This will cause the multiprocessing pool to calculate and use a default chunksize.
I have 8 logical CPUs in my system, therefore 8 child workers by default. Therefore, on my system the default chunksize in this case will be calculated as:
- chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
- chunksize, extra = divmod(100, 8 * 4)
- chunksize, extra = divmod(100, 32)
- chunksize, extra = 3, 4
Given that there is a remainder, 1 is added to the chunksize to give:
- chunksize = 3 + 1
- chunksize = 4
That means the 100 issued tasks will be mapped to 25 internal tasks.
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# SuperFastPython.com # example with the default chunksize from time import sleep from random import random from multiprocessing import Pool # task to execute in a separate process def task(arg): # generate a value value = random() # block for a fraction of a second sleep(1 + value) # return generated value combined with argument return arg + value # protect the entry point if __name__ == '__main__': # create the multiprocessing pool with Pool() as pool: # issue tasks and process results for result in pool.map(task, range(100), chunksize=None): # report the result print(result) |
Running the example first creates the multiprocessing pool with a default configuration.
Next, the 100 tasks are issued to the multiprocessing pool with a default chunksize of 4.
Tasks are issued to workers in batches of four, executed, and the result returned.
All tasks are completed, then the iterable of return values is returned to the caller and traversed, reporting all values.
A truncated example of the output is provided below. Note, the results will differ each time the example is run because of the use of random numbers.
In this case, the example took about 22.0 seconds to complete on my system, worse than using a chunksize of 1.
The default chunksize does not help in this case.
1 2 3 4 5 6 7 8 9 10 11 |
... 90.72061963275682 91.51395925172658 92.44189834307795 93.1294542451108 94.7868929918878 95.19862834343039 96.07772741723383 97.69995047391141 98.31798025450472 99.62925679159557 |
Next, let’s explore the effect of using an even chunksize.
Example With Even Chunksize (chunksize=n_tasks/n_workers)
We can explore the effect of using an even chunksize, that is an even division of the number of tasks to child worker processes.
In this case, we can update the previous example to first calculate a chunksize based on the number of items in the iterable and the number of child worker processes in the pool.
1 2 3 4 5 |
... # number of tasks to execute n_tasks = 100 # chunksize to use n_tasks_per_chunk = ceil(n_tasks / len(pool._pool)) |
Recall that the math.ceil() function rounds a floating point number up to the next integer.
On my system, I have 8 logical CPUs, therefore 8 child workers by default.
Therefore, the even chunksize in this case will be:
- n_tasks_per_chunk = ceil(n_tasks / len(pool._pool))
- n_tasks_per_chunk = ceil(100 / 8)
- n_tasks_per_chunk = ceil(12.5)
- n_tasks_per_chunk = 13
We will then report the values as confirmation.
1 2 3 |
... # chunksize to use n_tasks_per_chunk = ceil(n_tasks / len(pool._pool)) |
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# SuperFastPython.com # example with the even chunksize from math import ceil from time import sleep from random import random from multiprocessing import Pool # task to execute in a separate process def task(arg): # generate a value value = random() # block for a fraction of a second sleep(1 + value) # return generated value combined with argument return arg + value # protect the entry point if __name__ == '__main__': # create the multiprocessing pool with Pool() as pool: # number of tasks to execute n_tasks = 100 # chunksize to use n_tasks_per_chunk = ceil(n_tasks / len(pool._pool)) # report details for reference print(f'chunksize={n_tasks_per_chunk}, n_workers={len(pool._pool)}') # issue tasks and process results for result in pool.map(task, range(100), chunksize=n_tasks_per_chunk): # report the result print(result) |
Running the example first creates the multiprocessing pool with a default configuration.
Next, the 100 tasks are issued to the multiprocessing pool with a specified chunksize of 13.
Tasks are issued to workers in batches of 13, executed, and the result returned.
All tasks are completed, then the iterable of return values is returned to the caller and traversed, reporting all values.
A truncated example of the output is provided below. Note, the results will differ each time the example is run because of the use of random numbers.
In this case, the example took about 20.3 seconds to complete on my system, still worse than using a chunksize of 1, although slightly better than the default chunksize in this case.
1 2 3 4 5 6 7 8 9 10 11 12 |
chunksize=13, n_workers=8 ... 90.9139613348572 91.29627702972807 92.44633424109573 93.71585909166346 94.27121635373051 95.05300459383739 96.76344007460122 97.70486914449164 98.74067104900065 99.05785271152739 |
Extensions
This section gives you some ideas to explore in order to learn more about chunksize.
- Develop an example that issues many more tasks, e.g. 100,000 and try different chunksizes on a log scale between 1 and 100,000. Plot the results.
- Develop an example where there are two types of tasks, short duration tasks and long duration tasks (2x or more of the short duration), then experiment with different chunk sizes.
- Develop an example that experiments with the chunksize for the imap() function.
If you explore any of these examples, let me know. I’d love to see what you come up with.
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
APIs
References
Takeaways
You now know how to use the chunksize argument when executing multiple tasks with the multiprocessing pool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Austin Neill on Unsplash
A.A. says
This website has been super-helpful in my development of a multi-threaded application I’m writing to translate words and short phrases on the order of thousands of words.
Given the following:
Where thread_inputs is a list of strings to translate (these lists are typically 1000 to 3000 strings long).
The translate function attempts to translate the word using several different methods until it gets a result, and failing that returns the original text.
The problem I’m facing is sometimes the whole application freezes non-deterministically, usually when I set c.THREAD_CONCURRENCY to a number higher than 10. I can use a debugger if I set the concurrency to 1 but the problem doesn’t seem to occur in these cases or I’d have to run it for too long to encounter the problem.
I am guessing that a thread isn’t returning and therefore blocking everything. Is there a way to specify a timeout for threads with a callback to a function that will write the exception to the log?
Jason Brownlee says
I believe that if one of the calls within map() fails, the exception will be propagated to the main thread.
Perhaps test with a mock task? See if you can reproduce the fault without business logic.
Perhaps check the chunksize vs the number of tasks issued, if there is a mismatch, e.g. there are only 20 tasks, it will create a bottleneck.
Let me know how you go.