Multiprocessing For-Loop in Python
You can execute a for-loop that calls a function in parallel by creating a new multiprocessing.Process instance for each iteration.
In this tutorial you will discover how to execute a for-loop in parallel using multiprocessing in Python.
Let's get started.
Need a Concurrent For-Loop
Perhaps one of the most common constructs in programming is the loop, such as the for-loop.
That is, execute the same task with different data until some condition is met, such as a fixed number of iterations.
...
# execute a task in a for-loop
for i in range(20):
    # execute some task
Each iteration of the loop is executed sequentially, one after the other.
This is fine for small loops with very little work to perform each iteration, but it becomes a problem when there is a large amount of work to perform each iteration, such as:
- Encoding a video.
- Zipping a file.
- Parsing a document.
When the tasks are independent (or mostly independent) and slow to execute, we would prefer to execute them in parallel and make use of the multiple CPU cores available in modern systems.
How can we update a for-loop to execute in parallel?
How to Use Multiprocessing For-Loop
We can use the multiprocessing.Process to create a parallel-for loop.
The multiprocessing.Process class allows us to create and manage a new child process in Python.
This can be achieved by creating a Process instance and specifying the function to execute using the "target" argument in the class constructor.
For example:
...
# create a new process to execute a function
process = Process(target=task)
We can then call the start() method to start the new child process and begin executing the target function in the child process.
For example:
...
# start the child process
process.start()
If you are new to executing a function in a new process, this tutorial will help:
This approach can be used to execute a for-loop in parallel.
It requires that each iteration of the loop involves executing a function. If this is not the case, we can refactor the code so the content of the loop is in a new custom function, e.g. called task().
The target function may or may not take arguments required for each step in the loop, such as an integer value in a range.
For example:
...
# execute a function in a for-loop
for i in range(20):
    # execute target function
    task(i)
Next, we can update the example to create and start a new child process to execute our task() function each iteration.
Any arguments to the target function can be specified as a tuple to the "args" argument of the multiprocessing.Process constructor.
For example:
...
# execute a function in a for-loop
for i in range(20):
    # create a new process instance
    process = Process(target=task, args=(i,))
    # start the process
    process.start()
This will execute the tasks in the loop in parallel, with the operating system scheduling the child processes across the available CPU cores.
One concern may be how to get results out of the target function.
There are many approaches that can be used, such as storing results in a process-safe shared data structure or sending results back to the main process using a queue.
You can learn more about how to return values from a new process in the tutorial:
Now that we know how to execute a for loop in parallel using child processes, let's look at a worked example.
Example of a Multiprocessing For-Loop
In this section we will explore an example of how we can use the multiprocessing.Process to execute a for-loop in parallel.
This will involve first developing an example that executes the tasks sequentially, just as your program may at the moment, then updating the sequential example to execute the tasks in the for-loop in parallel using all CPU cores.
Firstly, let's develop the slower sequential version of the program.
Sequential For-Loop Version (slower)
We can develop a program that executes a task in a for-loop sequentially.
In this example the task simulates computational work by first generating a random number between 0 and 1, blocking for a fraction of a second and reporting a message. This task is then executed many times in a loop, providing an integer argument each iteration.
Firstly, we can develop the task function.
The function takes an integer argument, generates a random value, blocks for that fraction of a second, then reports a message containing the provided argument and the generated value.
The task() function below implements this.
# execute a task
def task(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a fraction of a second
    sleep(value)
    # report a message
    print(f'.done {arg}, generated {value}', flush=True)
Note, we will explicitly flush the output buffer when calling print() by setting flush=True.
This is required when calling print in child processes, as we will in the next section.
You can learn more about correctly flushing the output buffer when calling print in child processes in the tutorial:
Next, we can call this function in a loop with a unique integer from 0 to 19.
# protect the entry point
if __name__ == '__main__':
    # run tasks sequentially
    for i in range(20):
        task(i)
    print('Done', flush=True)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# execute tasks sequentially in a for loop
from time import sleep
from random import random
# execute a task
def task(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a fraction of a second
    sleep(value)
    # report a message
    print(f'.done {arg}, generated {value}', flush=True)
# protect the entry point
if __name__ == '__main__':
    # run tasks sequentially
    for i in range(20):
        task(i)
    print('Done', flush=True)
Running the example executes the target function 20 times in a loop.
Each iteration, a message is reported that includes the task number and the random number that was generated.
On my system, the example took about 9.7 seconds to execute.
Note, the output of the program will differ each time the program is run given the use of random numbers.
.done 0, generated 0.2592232667178327
.done 1, generated 0.38789035599597854
.done 2, generated 0.4429238296998014
.done 3, generated 0.25616373394379677
.done 4, generated 0.9385552994424995
.done 5, generated 0.4467664868139203
.done 6, generated 0.7133623556037814
.done 7, generated 0.8103247486692265
.done 8, generated 0.4715292009220228
.done 9, generated 0.09560488430855196
.done 10, generated 0.5105330898226743
.done 11, generated 0.1563419600953151
.done 12, generated 0.16424534769120125
.done 13, generated 0.0654927844143921
.done 14, generated 0.4368437770498672
.done 15, generated 0.19936852905258873
.done 16, generated 0.27891923055048506
.done 17, generated 0.9927804528392498
.done 18, generated 0.9978627179624324
.done 19, generated 0.9280287164497075
Done
Next, let's look at how we might execute the for-loop in parallel.
Parallel For-Loop Version (faster)
We can update the sequential for-loop version above to execute each iteration of the loop in a new child process.
This can be achieved by first creating a multiprocessing.Process for each iteration of the loop to execute, specifying the function to execute and passing any arguments required. The processes can then be started to begin executing the tasks in parallel. The main process can then wait for all tasks to complete.
No change to the target function is required.
Firstly, we can create a new multiprocessing.Process instance for each iteration of the loop.
This can be performed in a list comprehension to give us a list of Process instances.
...
# create all tasks
processes = [Process(target=task, args=(i,)) for i in range(20)]
Next, we can iterate our list of Process instances and start each in turn.
This will create a new child process in the underlying operating system and execute the target function with the given argument.
...
# start all processes
for process in processes:
    process.start()
Finally, the main process can explicitly wait for all new processes to finish.
This can be achieved by joining each process in turn by calling the join() function on each Process instance.
This will block the main process until the given process has terminated. Once we have joined every process in the list, we know that all child processes have terminated and all of our tasks are complete.
...
# wait for all processes to complete
for process in processes:
    process.join()
If you are new to joining processes, this tutorial will help:
We can then signal that all work was completed successfully.
...
# report that all tasks are completed
print('Done', flush=True)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# execute tasks in parallel in a for loop
from time import sleep
from random import random
from multiprocessing import Process
# execute a task
def task(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a fraction of a second
    sleep(value)
    # report a message
    print(f'.done {arg}, generated {value}', flush=True)
# protect the entry point
if __name__ == '__main__':
    # create all tasks
    processes = [Process(target=task, args=(i,)) for i in range(20)]
    # start all processes
    for process in processes:
        process.start()
    # wait for all processes to complete
    for process in processes:
        process.join()
    # report that all tasks are completed
    print('Done', flush=True)
Running the example first creates one multiprocessing.Process instance for each call to the task() function we wish to execute.
The main process then starts each Process instance, creating new child processes in the underlying operating system that each execute the task() function.
The main process then blocks until all of the issued tasks are complete.
Each task generates a random number, blocks for a fraction of a second, then reports a message.
Once all tasks have completed and their child processes have exited, the main process reports a final message and terminates.
We can see from the program output that because all tasks are executed in parallel, they report their message as soon as each task is finished. This means that the messages are out of order compared to the previous example.
On my system, the example completed in about 1.2 seconds.
That is about 8.5 seconds, or about 8.08x, faster than the sequential version in the previous section.
Note, the output of the program will differ each time the program is run given the use of random numbers.
.done 11, generated 0.01107997576551567
.done 8, generated 0.041118656735705694
.done 19, generated 0.07910007022893262
.done 7, generated 0.28244029644892266
.done 2, generated 0.31037342796826595
.done 1, generated 0.42856475955180384
.done 17, generated 0.34081913523263097
.done 0, generated 0.5519368862510039
.done 15, generated 0.5125772350948196
.done 16, generated 0.5014851525864039
.done 4, generated 0.5932907711735493
.done 13, generated 0.5442291501534473
.done 14, generated 0.7127111612776107
.done 12, generated 0.7724266976056693
.done 18, generated 0.7939959478751412
.done 9, generated 0.8472348164199222
.done 3, generated 0.9571907534245616
.done 5, generated 0.9616059861410663
.done 10, generated 0.9016934830388904
.done 6, generated 0.9916324870990662
Done
Limitations of Creating Processes Each Iteration
In this example we created 20 processes, one for each task, even though we may have far fewer CPU cores in our system.
Ideally, we would create one process per CPU core in our system, or perhaps two per core given that modern CPUs offer hyperthreading.
This could be achieved by executing tasks in batches until all tasks are complete.
For example:
...
# protect the entry point
if __name__ == '__main__':
    # define batch size
    batch_size = 5
    # execute in batches
    for i in range(0, 20, batch_size):
        # execute all tasks in a batch
        processes = [Process(target=task, args=(j,)) for j in range(i, i+batch_size)]
        # start all processes
        for process in processes:
            process.start()
        # wait for all processes to complete
        for process in processes:
            process.join()
    # report that all tasks are completed
    print('Done', flush=True)
Another approach would be to use a multiprocessing.Semaphore to ensure that only a fixed number of tasks could execute at a time.
For example, a multiprocessing.Semaphore instance can be created in the main process, then passed to each task() function via the "args" argument. Each task must acquire the semaphore before doing its work, ensuring that only a fixed number of tasks execute in parallel while the remainder wait for a position to become available.
# execute a task
def task(arg, shared_semaphore):
    # acquire the semaphore
    with shared_semaphore:
        # generate a random value between 0 and 1
        value = random()
        # block for a fraction of a second
        sleep(value)
        # report a message
        print(f'.done {arg}, generated {value}', flush=True)
You can learn more about how to use a semaphore in the tutorial:
Another limitation is that we have to create a new Process instance each iteration.
A better approach would be to reuse Process instances for each task by maintaining a pool of running worker processes, then providing tasks to the workers as they become available. This is more efficient as we don't have to continually create and destroy Process instances, and it allows the number of concurrent workers to be limited to match the number of physical or logical CPU cores.
This is called a process pool and is provided via the multiprocessing.Pool class which you can learn more about in this tutorial:
Takeaways
You now know how to update a for-loop to execute in parallel using a multiprocessing.Process instance.