Parallel For-Loop With a Multiprocessing Pool

August 15, 2022 Python Multiprocessing Pool

You can convert a for-loop to be parallel using the multiprocessing.Pool class.

In this tutorial you will discover how to convert a for-loop to be parallel using the multiprocessing pool.

Let's get started.

Need to Make For-Loop Parallel

You have a for-loop and you want to execute each iteration in parallel using a separate CPU core.

This is a common situation.

The loop involves performing the same operation multiple times with different data.

For example:

...
# perform the same operation on each item
for item in items:
	# perform operation on item...

It most commonly involves calling the same function each iteration with different arguments.

For example:

...
# call the same function each iteration with different data
for item in items:
	# call function with one data item
	task(item)

Alternatively, we might want the results from calling the same function multiple times with different data.

This is often achieved using a list comprehension.

For example:

...
# get results from applying the same function to different data
results = [task(item) for item in items]

Each iteration of the for-loop is performed sequentially, one after the other on one CPU core.

This is a problem because we may have many CPU cores, such as 2, 4, 8 or even 32.

How can we convert a for-loop to be parallel?

Prerequisite (prepare your code before we make it parallel)

Before we dive into discovering how to make for-loops parallel, there is a prerequisite.

You will need to refactor your code so that each iteration of your loop calls the same target function with different arguments.

This means you will need to create a new target function that takes one (or more) arguments with the specific data to operate on each iteration.

For example, your for-loop may look as follows:

...
# slow for loop executed sequentially
for item in items:
	# do one thing using item
	# do another thing using item
	...

You can convert it into a loop that calls a target function each iteration, passing the data that is unique to that iteration as an argument.

For example:

# task that operates on an item
def task(item):
	# do one thing using item
	# do another thing using item
	...

# slow for loop executed sequentially
for item in items:
	task(item)

Generally, the function should be self-contained.

To avoid possible concurrency failure modes like race conditions, it should not use any shared resources.

It should be a function with no side effects, taking data as arguments and returning any results via return values.

If this is not the case, you should refactor your code so that it is. Otherwise, you may have to contend with mutex locks and similar tools, which will make everything a whole lot harder.
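
As a rough illustration of why this matters, the sketch below contrasts a task that appends to a module-level list with a task that only uses its argument and return value. The names results_log, task_with_side_effect() and task_pure() are hypothetical and exist only for this example. Each worker process has its own copy of module-level variables, so changes made inside a worker are not seen by the caller.

```python
from multiprocessing import Pool

# hypothetical module-level state, for illustration only
results_log = []

# avoid: relies on a side effect that stays inside the worker process
def task_with_side_effect(item):
    results_log.append(item * 2)

# prefer: takes data as an argument and returns the result
def task_pure(item):
    return item * 2

# entry point for the program
if __name__ == '__main__':
    with Pool(2) as pool:
        pool.map(task_with_side_effect, range(5))
        doubled = pool.map(task_pure, range(5))
    # the list in the calling process was never changed by the workers
    print(results_log)  # prints []
    # the return values are collected in full
    print(doubled)  # prints [0, 2, 4, 6, 8]
```

The self-contained version behaves the same whether it is called in a plain for-loop or via the pool, which is what makes the conversion straightforward.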

Next, let's look at how we can execute for loops in parallel using all CPU cores.

How To Make For-Loop Parallel With Pool

We can make a for-loop parallel using the multiprocessing pool.

A process pool is a programming pattern for automatically managing a pool of worker processes.

The multiprocessing.Pool class provides a process pool with helpful functions for executing for loops in parallel.

An instance of the Pool class can be created and by default it will use all available CPU cores. Once we are finished with it, we can close it to release the worker processes.

For example:

...
# create a process pool that uses all cpus
pool = multiprocessing.Pool()
# ...
# close the process pool
pool.close()
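
A slightly fuller sketch of this manual approach is shown below. It assumes a hypothetical task() function that squares its argument. It pairs close() with join() so that the caller waits for the worker processes to exit, and uses try/finally so that the pool is released even if a task fails.

```python
from multiprocessing import Pool

# hypothetical placeholder task
def task(item):
    return item * item

# entry point for the program
if __name__ == '__main__':
    # create a process pool that uses all cpus
    pool = Pool()
    try:
        results = pool.map(task, range(5))
        print(results)
    finally:
        # stop accepting new tasks
        pool.close()
        # wait for the worker processes to exit
        pool.join()
```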

We can use the context manager interface if we only need to use the pool for one for-loop. This will ensure the pool is shut down for us automatically once the block exits (the context manager calls terminate() on the pool), even if a task raises an exception.

For example:

...
# create a process pool that uses all cpus
with multiprocessing.Pool() as pool:
	# ...

This is the recommended usage of the process pool.

The multiprocessing pool provides a parallel version of the built-in map() function that will call the same function for each item in a provided iterable, e.g. same function with different data.

For example:

...
# create a process pool that uses all cpus
with multiprocessing.Pool() as pool:
	# call the function for each item in parallel
	pool.map(task, items)

This will traverse the "items" iterable and issue one call to the task() function for each item and return once all tasks have completed.

The map() function returns a list of return values from the target function, in the same order as the items, which can be stored or iterated directly if our target function has a return value.

For example:

...
# create a process pool that uses all cpus
with multiprocessing.Pool() as pool:
	# call the function for each item in parallel
	for result in pool.map(task, items):
		print(result)
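
To make the ordering concrete, here is a small sketch that assumes a hypothetical task() in which earlier items sleep longer and therefore finish later. The return values from map() still line up with the input items.

```python
from time import sleep
from multiprocessing import Pool

# hypothetical task: earlier items sleep longer, so they finish later
def task(item):
    sleep((5 - item) * 0.01)
    return item * 10

# entry point for the program
if __name__ == '__main__':
    with Pool(5) as pool:
        results = pool.map(task, range(5))
    # results match the order of the inputs, not the order of completion
    print(results)  # prints [0, 10, 20, 30, 40]
```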

The map() function on the multiprocessing pool only supports target functions that take a single argument.

If our target function takes more than one argument, we can use the starmap() function instead.

It takes an iterable of iterables, where each nested iterable provides arguments for one call to the target task function.

For example:

...
# create a process pool that uses all cpus
with multiprocessing.Pool() as pool:
	# prepare arguments for each call to target function
	items = [(1,2), (3,4), (5,6)]
	# call the function for each item in parallel with multiple arguments
	for result in pool.starmap(task, items):
		print(result)
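
A complete, minimal version of this idea might look as follows. The add() function is a hypothetical two-argument task chosen so the results are easy to check by hand.

```python
from multiprocessing import Pool

# hypothetical two-argument task
def add(a, b):
    return a + b

# entry point for the program
if __name__ == '__main__':
    # prepare arguments for each call to the target function
    items = [(1, 2), (3, 4), (5, 6)]
    with Pool() as pool:
        # each tuple is unpacked into one call: add(1, 2), add(3, 4), add(5, 6)
        for result in pool.starmap(add, items):
            print(result)  # prints 3, then 7, then 11
```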

A problem with both map() and starmap() is they do not return their iterable of return values until all tasks have completed.

Instead we can use the imap() function. This will only issue tasks to the multiprocessing pool once a worker becomes available, and will yield a return value as tasks are completed.

This is helpful in order to save memory when issuing tasks, if we have thousands or millions of tasks to issue. It is also more responsive as we can process or show results as tasks finish rather than after all tasks have completed.

For example:

...
# create a process pool that uses all cpus
with multiprocessing.Pool() as pool:
	# call the function for each item in parallel, get results as tasks complete
	for result in pool.imap(task, items):
		print(result)
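
As a sketch of the memory-saving case, the items can come from a generator so that they are never all held in a list at once. The generate_items() and task() functions below are hypothetical placeholders, and only every 250th result is reported to keep the output short.

```python
from multiprocessing import Pool

# hypothetical generator that produces items one at a time
def generate_items(n):
    for i in range(n):
        yield i

# hypothetical placeholder task
def task(item):
    return item + 1

# entry point for the program
if __name__ == '__main__':
    with Pool() as pool:
        # results are yielded in the order the tasks were issued
        for result in pool.imap(task, generate_items(1000)):
            if result % 250 == 0:
                print(result)
```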

This was a crash course in using the multiprocessing pool.

Now that we know how to convert our for-loops to be parallel, let's look at some worked examples that you can use as templates for your own project.

Example of Parallel For-Loop with map()

We can explore how to use map() to execute for-loops in parallel.

In this example we will use a simple task function as a placeholder that you can replace with your own task function to be called each loop iteration.

In this case, our task will take one integer argument. It will then generate a random number between 0 and 1, then sleep for a fraction of a second to simulate work. Finally, it will return the generated value as the result of the task.

The task() function below implements this.

# task to execute in another process
def task(arg):
    # generate a value between 0 and 1
    value = random()
    # block for a fraction of a second to simulate work
    sleep(value)
    # return the generated value
    return value

Let's dive in.

Serial For-Loop with map()

Before we dive into parallel for-loops with map(), let's start with a sequential for-loop.

We can use the built-in map() function to call our task function each iteration.

Each function call will be executed sequentially, one after the other on one CPU core.

The complete example is listed below.

# SuperFastPython.com
# example of a sequential for loop
from time import sleep
from random import random

# a task to execute in another process
def task(arg):
    # generate a value between 0 and 1
    value = random()
    # block for a fraction of a second to simulate work
    sleep(value)
    # return the generated value
    return value

# entry point for the program
if __name__ == '__main__':
    # call the same function with different data sequentially
    for result in map(task, range(10)):
        # report the value to show progress
        print(result)

Running the example calls the same task() function for each value in the iterable from 0 to 9.

Different random numbers are generated each time the code is run.

Each call to task() happens sequentially, and the entire example takes about 4-5 seconds, depending on the specific numbers generated.

0.12199618835333303
0.5212357710994685
0.7417626978125494
0.0450874955439563
0.4708361362483452
0.46524023376998846
0.32858199011122213
0.468263433349433
0.3856435694592504
0.995716230169814

Next, let's look at converting the example to execute in parallel using the multiprocessing pool.

Parallel For-Loop with map()

We can update the previous example so that each call to the task() function is executed in parallel using all available CPU cores.

For example, if we have 4 CPU cores, this might be seen as 8 logical CPU cores after hyperthreading. This means our system would be able to execute 8 calls to task() in parallel, then as calls complete the remaining calls to task() will be executed.

First, we can create the multiprocessing pool configured by default to use all available CPU cores.

...
# create the process pool
with Pool() as pool:
	# ...

Next, we can call the map() function as before and iterate the result values, although this time the map() function is a method on the multiprocessing pool.

...
# call the same function with different data in parallel
for result in pool.map(task, range(10)):
    # report the value to show progress
    print(result)

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of a parallel for loop
from time import sleep
from random import random
from multiprocessing import Pool

# task to execute in another process
def task(arg):
    # generate a value between 0 and 1
    value = random()
    # block for a fraction of a second to simulate work
    sleep(value)
    # return the generated value
    return value

# entry point for the program
if __name__ == '__main__':
    # create the process pool
    with Pool() as pool:
        # call the same function with different data in parallel
        for result in pool.map(task, range(10)):
            # report the value to show progress
            print(result)

Running the example first creates the multiprocessing pool with one worker per logical CPU in the system.

All ten calls to the task() function are issued to the process pool and it completes them as fast as it is able.

Once all tasks are completed, an iterable of return values is returned from map() and traversed, reporting the values that were generated.

The loop completes in about one second, depending on the specific random numbers generated.

0.5884491047270914
0.6946893602558559
0.7169680024270103
0.9433207432796479
0.5374063478473085
0.49870576951245515
0.5229616564751852
0.250982204375086
0.670282828432953
0.44766416300786493
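
If you want to check the speed-up on your own system, one way is to time both versions with time.perf_counter(). The sketch below is an illustration only: it assumes a fixed sleep of 0.05 seconds per task (rather than a random one) so that runs are comparable, and the helper names timed(), run_sequential() and run_parallel() are made up for this example.

```python
from time import sleep, perf_counter
from multiprocessing import Pool

# task with a fixed delay, an assumption made for repeatable timings
def task(arg):
    sleep(0.05)
    return arg

# return how long a callable takes to run, in seconds
def timed(run):
    start = perf_counter()
    run()
    return perf_counter() - start

# call the task for each item, one after the other
def run_sequential():
    return list(map(task, range(8)))

# call the task for each item using four worker processes
def run_parallel():
    with Pool(4) as pool:
        return pool.map(task, range(8))

# entry point for the program
if __name__ == '__main__':
    print(f'sequential: {timed(run_sequential):.2f} seconds')
    print(f'parallel:   {timed(run_parallel):.2f} seconds')
```

The measured times will vary between systems, in part because starting the worker processes has a cost of its own.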

Next, let's look at the same example if we did not have return values.

Parallel For-Loop with map() and No Return Values

We can update the task() function to no longer return a result.

In that case, we do not need to traverse the iterable of return values from the map() function in the caller.

First, we can update the task function to not return the value and instead to print the generated value directly.

# task to execute in another process
def task(arg):
    # generate a value between 0 and 1
    value = random()
    # block for a fraction of a second to simulate work
    sleep(value)
    # report the value to show progress
    print(f'{arg} got {value}', flush=True)

Note, print messages from child worker processes do not appear immediately by default, but are instead buffered until the child process is terminated.

We can force messages to show immediately, by setting the "flush" argument to True.

We can then call map() with our task function and arguments and not iterate the results.

...
# create the process pool
with Pool() as pool:
    # call the same function with different data in parallel
    pool.map(task, range(10))

Tying this together, a complete example with this change is listed below.

# SuperFastPython.com
# example of a parallel for loop with no return values
from time import sleep
from random import random
from multiprocessing import Pool

# task to execute in another process
def task(arg):
    # generate a value between 0 and 1
    value = random()
    # block for a fraction of a second to simulate work
    sleep(value)
    # report the value to show progress
    print(f'{arg} got {value}', flush=True)

# entry point for the program
if __name__ == '__main__':
    # create the process pool
    with Pool() as pool:
        # call the same function with different data in parallel
        pool.map(task, range(10))

Running the example creates the multiprocessing pool and issues all ten tasks.

The tasks execute and report their messages.

Once all tasks have completed, the map() function returns and the caller continues on, shutting down the process pool automatically.

3 got 0.07495290707222657
1 got 0.3551532315681727
5 got 0.5775819151509843
0 got 0.6205673200232464
7 got 0.6050404433157823
8 got 0.5876738799350407
2 got 0.6776107991833116
4 got 0.7459149152853299
6 got 0.8483482745906772
9 got 0.7278120326654482

Next, let's explore how to make a for-loop parallel where the target function has multiple arguments.

Example of Parallel For-Loop with starmap()

A limitation of the map() function is that it only supports target functions that take one argument.

If our target task function requires multiple arguments, we can use the starmap() function instead.

The iterable passed to the starmap() function must contain one nested iterable per call to the task function, holding the arguments for that call.

That is, an iterable of iterables, such as a list of tuples, where each tuple is a set of arguments for one function call.

Let's demonstrate this with a worked example.

We can update the task() function to take three arguments.

# task to execute in another process
def task(arg1, arg2, arg3):
    # generate a value between 0 and 1
    value = random()
    # block for a fraction of a second to simulate work
    sleep(value)
    # return the generated value
    return value

We can then prepare an iterable of iterables as arguments for each function call.

In this case, we will create a list of tuples, where each tuple has 3 arguments for a single call to task().

This can be achieved using a list comprehension.

...
# prepare arguments
items = [(i, i*2, i*3) for i in range(10)]

We can then call the starmap() function to execute the for-loop in parallel and report results.

...
# call the same function with different data in parallel
for result in pool.starmap(task, items):
    # report the value to show progress
    print(result)

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of a parallel for loop with multiple arguments
from time import sleep
from random import random
from multiprocessing import Pool

# task to execute in another process
def task(arg1, arg2, arg3):
    # generate a value between 0 and 1
    value = random()
    # block for a fraction of a second to simulate work
    sleep(value)
    # return the generated value
    return value

# entry point for the program
if __name__ == '__main__':
    # create the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, i*2, i*3) for i in range(10)]
        # call the same function with different data in parallel
        for result in pool.starmap(task, items):
            # report the value to show progress
            print(result)

Running the example first creates the multiprocessing pool with one worker per logical CPU in the system.

All ten calls to the task() function are issued to the process pool, each with three arguments.

Once all tasks are completed, an iterable of return values is returned from starmap() and traversed, reporting the values that were generated.

The loop completes in about one second, depending on the specific random numbers generated.

0.3649786159984052
0.48469386498153644
0.3880300260728501
0.11865254960654414
0.42234122971676635
0.8348296520018135
0.1768696329327919
0.043473834132026656
0.4244687958601574
0.2971373822352351

Next, let's look at how we can use imap() to make our programs more responsive.

Example of Parallel For-Loop with imap()

A limitation of map() and starmap() is that they traverse the provided iterable immediately and issue all function calls to the multiprocessing pool.

This may use a lot of memory if there are thousands or millions of tasks.

Additionally, the iterable of return values is not returned until all tasks have completed. This can be problematic if we would prefer to show progress as tasks are completed.

The imap() function overcomes these limitations by only issuing tasks one at a time as workers become available and yielding return values as tasks complete.

The example below demonstrates a parallel for-loop using the more responsive imap() function.

# SuperFastPython.com
# example of a parallel for loop that is more responsive
from time import sleep
from random import random
from multiprocessing import Pool

# task to execute in another process
def task(arg):
    # generate a value between 0 and 1
    value = random()
    # block for a fraction of a second to simulate work
    sleep(value)
    # return the generated value
    return value

# entry point for the program
if __name__ == '__main__':
    # create the process pool
    with Pool() as pool:
        # call the same function with different data in parallel
        for result in pool.imap(task, range(10)):
            # report the value to show progress
            print(result)

Running the example first creates the multiprocessing pool with one worker per logical CPU in the system.

Tasks are issued to the process pool until all workers are occupied. Remaining tasks are only issued once workers become available.

The imap() function returns an iterable immediately and begins yielding results as soon as tasks are completed. Return values are returned in the order that tasks were issued to the process pool.

0.835257932028165
0.23239313640799752
0.9100656574752977
0.541963403413606
0.771756987041196
0.26518666284928805
0.7566356594123003
0.5955443214316828
0.8208877725107132
0.018073556089019838

Note, if the order of return values does not matter, the imap_unordered() function can be used instead, which may be even more responsive.
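
A minimal sketch of this variation is shown below. It assumes a hypothetical task() in which later items finish sooner, so the results will typically be reported in a different order from the one in which the tasks were issued. The exact order is not guaranteed.

```python
from time import sleep
from multiprocessing import Pool

# hypothetical task: later items sleep for less time and finish sooner
def task(item):
    sleep((5 - item) * 0.05)
    return item

# entry point for the program
if __name__ == '__main__':
    with Pool(5) as pool:
        # results are yielded in the order that tasks complete
        for result in pool.imap_unordered(task, range(5)):
            print(result)
```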

Common Questions

This section lists common questions about making for-loops parallel using the multiprocessing pool.

How To Set The Number of Workers

You can set the number of workers via the "processes" argument to the multiprocessing.Pool class.

For example:

...
# create a process pool with 4 workers
pool = multiprocessing.Pool(processes=4)

Because it is the first argument, you can omit the argument name.

For example:

...
# create a process pool with 4 workers
pool = multiprocessing.Pool(4)
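
One way to see the effect of this setting is to have each task report the process id of the worker that ran it. This is a sketch with a hypothetical task() function. The number of distinct ids will be at most the number of workers requested.

```python
import os
from time import sleep
from multiprocessing import Pool

# hypothetical task that reports which worker process executed it
def task(item):
    sleep(0.05)
    return os.getpid()

# entry point for the program
if __name__ == '__main__':
    # create a process pool with 4 workers
    with Pool(processes=4) as pool:
        pids = pool.map(task, range(20))
    print(f'{len(set(pids))} distinct worker processes handled the tasks')
```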

How Many Workers Should I Use?

Use the default, which is one worker per logical CPU.

For example:

...
# create a process pool with one worker per logical CPU
pool = multiprocessing.Pool()

Shouldn't I Set Number of Workers to Be 1 Less Than The Number of CPUs?

You can if you want, but probably not.

It only makes sense to do this if you have issued tasks to the multiprocessing pool asynchronously (e.g. using map_async()) and then intend to perform computationally intensive work in the calling process.
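
A sketch of that situation is shown below. The task() function is a hypothetical CPU-bound placeholder. The pool is given one fewer worker than there are logical CPUs, tasks are issued with map_async(), and the calling process does its own work before collecting the results.

```python
from multiprocessing import Pool, cpu_count

# hypothetical CPU-bound task
def task(item):
    return sum(i * i for i in range(item))

# entry point for the program
if __name__ == '__main__':
    # leave one logical CPU free for the calling process
    n_workers = max(1, cpu_count() - 1)
    with Pool(n_workers) as pool:
        # issue tasks without blocking the caller
        async_result = pool.map_async(task, range(1000, 1010))
        # the caller performs its own CPU-bound work in the meantime
        local_total = task(100000)
        # block until the results from the pool are ready
        results = async_result.get()
    print(local_total, len(results))
```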

What is a Logical vs Physical CPU?

Modern CPUs typically make use of a technology called hyperthreading.

Hyperthreading does not refer to a program using threads. Instead, it refers to a technology within the CPU cores themselves that allows each physical core or CPU to act as if it were two logical cores or two CPUs.

It provides automatic in-core parallelism that can offer up to a 30% speed-up over CPU cores that do not offer the technology.

As such, when we count CPU cores in a system, we typically count the number of logical CPU cores, not the number of physical CPU cores.

If you know your system uses hyperthreading (it probably does), then you can get the number of physical CPUs in your system by dividing the number of logical CPUs by two.

How Many CPUs Do I Have?

There are a number of ways to get the number of CPUs in Python.

The two pure Python approaches provided by the standard library are the multiprocessing.cpu_count() function and the os.cpu_count() function.

For example:

...
# get the number of logical cpu cores
n_cores = multiprocessing.cpu_count()

This will report the number of logical CPUs in your system.
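
A short sketch using both standard library functions is shown below, together with the rough physical-core estimate described in the previous section. The estimate assumes two logical CPUs per physical core, which will not hold on every system.

```python
import os
import multiprocessing

# get the number of logical cpu cores
logical = multiprocessing.cpu_count()
# os.cpu_count() reports the same figure, or None if it cannot be determined
also_logical = os.cpu_count()
# rough estimate, assuming two logical CPUs per physical core
physical_estimate = max(1, logical // 2)
print(logical, also_logical, physical_estimate)
```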

What If I Need to Access a Shared Resource?

You may need to access a shared resource.

If your task functions are just reading data, then it should be fine.

If your task functions are writing data, then you may need a mutex lock to ensure that only one process can change the shared resource at a time.

This is to avoid a race condition.
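
One common pattern, sketched below under a few assumptions, is to create the lock and the shared data in the calling process and hand them to each worker via the pool's "initializer" and "initargs" arguments. The names init_worker(), lock and counter are hypothetical, and a shared integer from multiprocessing.Value stands in for the shared resource.

```python
from multiprocessing import Pool, Lock, Value

# set in each worker by init_worker(); hypothetical names for this sketch
lock = None
counter = None

# store the shared lock and counter as globals in the worker process
def init_worker(shared_lock, shared_counter):
    global lock, counter
    lock = shared_lock
    counter = shared_counter

# add the item to the shared counter
def task(item):
    # only one process at a time may update the shared counter
    with lock:
        counter.value += item
        return counter.value

# entry point for the program
if __name__ == '__main__':
    shared_lock = Lock()
    shared_counter = Value('i', 0)
    with Pool(4, initializer=init_worker,
              initargs=(shared_lock, shared_counter)) as pool:
        pool.map(task, range(100))
    # the sum of 0 to 99
    print(shared_counter.value)  # prints 4950
```

Without the lock, two processes could read the same old value and one update would be lost.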

What Types of Tasks Can Be Made Parallel?

All types.

The multiprocessing pool is probably well suited to computationally intensive tasks that receive or return only very small amounts of data.

For IO-bound tasks, use thread-based concurrency instead, such as the ThreadPoolExecutor or the ThreadPool.
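
As a sketch of the thread-based alternative, the ThreadPool class in multiprocessing.pool offers the same map() interface as the process pool. The fetch() function below is a hypothetical IO-bound task in which sleep() stands in for waiting on a network or disk.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical IO-bound task; sleep() stands in for a network or disk wait
def fetch(item):
    sleep(0.05)
    return f'response {item}'

# call fetch() for each item using a pool of four worker threads
def fetch_all(items):
    with ThreadPool(4) as pool:
        return pool.map(fetch, items)

# entry point for the program
if __name__ == '__main__':
    print(fetch_all(range(3)))
```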

For tasks that receive or return a lot of data, consider having the processes store the data somewhere for the caller to retrieve later, such as in a file, database, or similar. This is to avoid the computational cost of serializing (pickling) the data for transmission between Python processes.
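
A minimal sketch of this approach is shown below, assuming the result is raw bytes that can be written straight to a temporary file. The task() and load_result() functions are hypothetical. Only the short file path is sent back to the caller, and whether this is faster in practice depends on the data and the storage involved.

```python
import os
import tempfile
from multiprocessing import Pool

# hypothetical task that produces a large result and stores it in a file
def task(item):
    data = bytes([item]) * 1_000_000
    fd, path = tempfile.mkstemp(suffix='.bin')
    with os.fdopen(fd, 'wb') as handle:
        handle.write(data)
    # return only the small path string to the caller
    return path

# read a stored result back in the caller and delete the file
def load_result(path):
    with open(path, 'rb') as handle:
        data = handle.read()
    os.remove(path)
    return data

# entry point for the program
if __name__ == '__main__':
    with Pool() as pool:
        paths = pool.map(task, range(4))
    for path in paths:
        print(len(load_result(path)))
```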

Takeaways

You now know how to convert a for-loop to be parallel using the multiprocessing pool.


