Concurrent For-Loop With a ThreadPool in Python
You can convert a for-loop to be concurrent using the ThreadPool class.
In this tutorial, you will discover how to convert a for-loop to be concurrent using the ThreadPool.
Let's get started.
Need to Make For-Loop Concurrent
You have a for-loop and you want to execute each iteration concurrently.
This is a common situation.
The loop involves performing the same operation multiple times with different data.
For example:
...
# perform the same operation on each item
for item in items:
# perform operation on item...
It most commonly involves calling the same function each iteration with different arguments.
For example:
...
# call the same function each iteration with different data
for item in items:
# call function with one data item
task(item)
Alternatively, we might want the results from calling the same function multiple times with different data.
This is often achieved using a list comprehension.
For example:
...
# get results from applying the same function to different data
results = [task(item) for item in items]
Each iteration of the for-loop is performed sequentially. One after the other on one CPU core.
This is a problem because we may have many CPU cores, such as 2, 4, 8 or even 32.
How can we convert a for-loop to be concurrent?
Prerequisite (prepare your code before we make it concurrent)
Before we dive into discovering how to make for-loops concurrently, there is a prerequisite.
You will need to refactor your code so that each iteration of your loop calls the same target function each iteration with different arguments.
This means you will need to create a new target function that takes one (or more) arguments with the specific data to operate on each iteration.
For example, your for-loop may look as follows:
# slow for loop executed sequentially
for item in items:
# do one thing using item
# do another thing using item
...
You can convert it into a loop that calls a target function each iteration that takes the unique data each iteration as an argument.
For example:
# task that operates on an item
def task(item):
# do one thing using item
# do another thing using item
# slow for loop executed sequentially
for item in items:
task(item)
Generally, the function should be self-contained.
It should not use any shared resources to avoid possible concurrency failure modes like race conditions.
It should be a function with no side effects, taking data as arguments and returning any results via return values.
If this is not the case, you must refactor your code so that this is the case, otherwise, you may have to contend with mutex locks and such which will make everything a whole lot harder.
Next, let's look at how we can execute for loops concurrently using a ThreadPool.
How To Make For-Loop Concurrent With ThreadPool
We can make a for-loop concurrent using the ThreadPool class.
A thread pool is a programming pattern for automatically managing a pool of worker threads.
The ThreadPool class provides a thread pool with helpful functions for executing for loops concurrently.
An instance of the ThreadPool class can be created and used to issue IO-bound tasks that will run concurrently. Once we are finished with it, we can close it to release the worker threads.
For example:
...
# create a thread pool
pool = ThreadPool()
# ...
# close the thread pool
pool.close()
We can use the context manager interface if we only need to use the pool for one for-loop. This will ensure the pool is closed for us automatically once we are finished using it, even if a task raises an exception.
For example:
...
# create a thread pool
with ThreadPool() as pool:
# ...
This is the recommended usage of the ThreadPool.
You can learn more about how to use the ThreadPool context manager in the tutorial:
The ThreadPool provides a concurrent version of the built-in map() function that will call the same function for each item in a provided iterable, e.g. same function with different data.
For example:
...
# create a thread pool
with ThreadPool() as pool:
# call the function for each item concurrently
pool.map(task, items)
This will traverse the "items" iterable and issue one call to the task() function for each item and return once all tasks have been completed.
The map() function returns an iterable of return values from the target function which can be stored or iterated directly if our target function has a return value.
For example:
...
# create a thread pool
with ThreadPool() as pool:
# call the function for each item concurrently
for result in pool.map(task, items):
print(result)
You can learn more about how to use the map() method in the tutorial:
The map() method on the ThreadPool only takes a single argument.
If our target function takes more than one argument, we can use the starmap() method instead.
It takes an iterable of iterable, where each nested iterable provides arguments for one call to the target task function.
For example:
...
# create a thread pool
with ThreadPool() as pool:
# prepare arguments for reach call to target function
items = [(1,2), (3,4), (5,6)]
# call the function for each item concurrently with multiple arguments
for result in pool.starmap(task, items):
print(result)
You can learn more about how to use the starmap() method in the tutorial:
A problem with both map() and starmap() is they do not return their iterable or return values until all tasks have been completed.
Instead, we can use the imap() method. This will only issue tasks to the ThreadPool once a worker becomes available and will yield a return value as tasks are completed.
This is helpful in order to save memory when issuing tasks if we have thousands or millions of tasks to issue. It is also more responsive as we can handle or show results as tasks finish rather than after all tasks have been completed.
For example:
...
# create a thread pool
with ThreadPool() as pool:
# call the function for each item concurrently, get results as tasks complete
for result in pool.imap(task, items):
print(result)
You can learn more about how to use the imap() function in the tutorial:
This was a crash course in using the Python ThreadPool class.
For more information on how to use the ThreadPool, see the tutorial:
Now that we know how to convert our for-loops to be concurrent, let's look at some worked examples that you can use as templates for your own project.
Example of Concurrent For-Loop with map()
We can explore how to use map() to execute for-loops concurrently.
In this example, we will use a simple task function as a placeholder that you can replace with your own task function to be called for each loop iteration.
In this case, our task will take one integer argument to the task. It will then generate a random number between 0 and 1, then sleep for a fraction of a second. Finally, it will either return the generated value as a result of the task.
The task() function below implements this.
# task to execute in another thread
def task(arg):
# generate a value between 0 and 1
value = random()
# block for a fraction of a second to simulate work
sleep(value)
# return the generated value
return value
Let's dive in.
Serial For-Loop with map()
Before we dive into concurrent for-loops with map(), let's start with a sequential for-loop.
We can use the built-in map() function to call our task function each iteration.
Each function call will be executed sequentially, one after the other on one CPU core.
The complete example is listed below.
# SuperFastPython.com
# example of a sequential for loop
from time import sleep
from random import random
# a task to execute in another thread
def task(arg):
# generate a value between 0 and 1
value = random()
# block for a fraction of a second to simulate work
sleep(value)
# return the generated value
return value
# entry point for the program
if __name__ == '__main__':
# call the same function with different data sequentially
for result in map(task, range(10)):
# report the value to show progress
print(result)
Running the example calls the same task() function for each value in the iterable from 0 to 9.
Different random numbers are generated each time the code is run.
Each call to task() happens sequentially, and the entire example takes about 4-5 seconds, depending on the specific numbers generated.
0.5630244862672978
0.2836364390877101
0.9745080925600337
0.29594912197249446
0.5231057287082254
0.37694481629598386
0.43459465386042007
0.46378398936169984
0.8335450323832597
0.18162938199256062
Next, let's look at converting the example to execute concurrently using the ThreadPool.
Concurrent For-Loop with map()
We can update the previous example so that each call to the task() function is executed concurrently.
First, we can create the ThreadPool configured by default to use all available CPU cores.
...
# create the thread pool
with ThreadPool() as pool:
# ...
Next, we can call the map() function as before and iterate the result values, although this time the map() function is a method on the ThreadPool instance.
...
# call the same function with different data concurrently
for result in pool.map(task, range(10)):
# report the value to show progress
print(result)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of a concurrent for loop
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool
# task to execute in another thread
def task(arg):
# generate a value between 0 and 1
value = random()
# block for a fraction of a second to simulate work
sleep(value)
# return the generated value
return value
# entry point for the program
if __name__ == '__main__':
# create the thread pool
with ThreadPool() as pool:
# call the same function with different data concurrently
for result in pool.map(task, range(10)):
# report the value to show progress
print(result)
Running the example first creates the ThreadPool with one worker per logical CPU in the system.
All ten calls to the task() function are issued to the ThreadPool and it completes them as fast as it is able.
Once all tasks are completed, an iterable of return values are returned from map() and traversed, reporting the values that were generated.
The loop completes in about one second, depending on the specific random numbers generated.
0.38921437480525056
0.00274209022910743
0.47380078292456784
0.48057613702571955
0.3369157832526938
0.8032938130252197
0.054295951305564416
0.5250676587109085
0.15348015617252353
0.760021963316182
Next, let's look at the same example if we did not have return values.
Concurrent For-Loop with map() and No Return Values
We can update the task() function to no longer return a result.
In that case, we do not need to traverse the iterable of return values from the map() function in the caller.
First, we can update the task function to not return the value and instead to print the generated value directly.
# task to execute in another thread
def task(arg):
# generate a value between 0 and 1
value = random()
# block for a fraction of a second to simulate work
sleep(value)
# # report the value to show progress
print(f'{arg} got {value}')
We can then call map() with our task function and arguments and not iterate the results.
...
# create the thread pool
with ThreadPool() as pool:
# call the same function with different data concurrently
pool.map(task, range(10))
Tying this together, a complete example of this change is listed below.
# SuperFastPython.com
# example of a concurrent for loop with no return values
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool
# task to execute in another thread
def task(arg):
# generate a value between 0 and 1
value = random()
# block for a fraction of a second to simulate work
sleep(value)
# # report the value to show progress
print(f'{arg} got {value}')
# entry point for the program
if __name__ == '__main__':
# create the thread pool
with ThreadPool() as pool:
# call the same function with different data concurrently
pool.map(task, range(10))
Running the example creates the ThreadPool and issues all ten tasks.
The tasks execute and report their messages.
Once all tasks have been completed, the map() function returns and the caller continues on closing the ThreadPool automatically.
7 got 0.013600947003163277
5 got 0.10815249596520748
4 got 0.15002452030422675
1 got 0.15964744042261703
8 got 0.33930654014931927
0 got 0.38503085630695577
3 got 0.5141336760744062
9 got 0.533113522908684
6 got 0.7963439065904523
2 got 0.8736759497247216
Next, let's explore how to make a for-loop concurrent where the target function has multiple arguments.
Example of Concurrent For-Loop with starmap()
A limitation of the map() function is that it only takes one argument.
If our target task function requires multiple arguments, we can use the starmap() function instead.
The iterable passed to the starmap() function must have an iterable for each call to the task function containing the arguments for the function call.
That is, an iterable of iterables, such as a list of tuples, where each tuple is a set of arguments for one function call.
Let's demonstrate this with a worked example.
We can update the task() function to take three arguments.
# task to execute in another thread
def task(arg1, arg2, arg3):
# generate a value between 0 and 1
value = random()
# block for a fraction of a second to simulate work
sleep(value)
# return the generated value
return value
We can then prepare an iterable of iterables as arguments for each function call.
In this case, we will create a list of tuples, where each tuple has 3 arguments for a single call to task().
This can be achieved using a list comprehension.
...
# prepare arguments
items = [(i, i*2, i*3) for i in range(10)]
We can then call the starmap() function to execute the for-loop concurrently and report results.
...
# call the same function with different data concurrently
for result in pool.starmap(task, items):
# report the value to show progress
print(result)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of a concurrent for loop with multiple arguments
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool
# task to execute in another thread
def task(arg1, arg2, arg3):
# generate a value between 0 and 1
value = random()
# block for a fraction of a second to simulate work
sleep(value)
# return the generated value
return value
# entry point for the program
if __name__ == '__main__':
# create the thread pool
with ThreadPool() as pool:
# prepare arguments
items = [(i, i*2, i*3) for i in range(10)]
# call the same function with different data concurrently
for result in pool.starmap(task, items):
# report the value to show progress
print(result)
Running the example first creates the ThreadPool with one worker per logical CPU in the system.
All ten calls to the task() function are issued to the ThreadPool, each with three arguments.
Once all tasks are completed, an iterable of return values are returned from starmap() and traversed, reporting the values that were generated.
The loop completes in about one second, depending on the specific random numbers generated.
0.2968352724495821
0.6546567643971847
0.8943971105900977
0.08974919320932084
0.6555904739275706
0.041439037787545985
0.2792081868739754
0.3145043760268833
0.5954887178593287
0.6360420800934208
Next, let's look at how we can use imap() to make our programs more responsive.
Example of Concurrent For-Loop with imap()
A limitation of map() and starmap() is that they traverse the provided iterable immediately and issue all function calls to the ThreadPool.
This may use a lot of memory if there are thousands or millions of tasks.
Additionally, the iterable of return values is not returned until all tasks have been completed. This can be problematic if we would prefer to show progress as tasks are completed.
The imap() function overcomes these limitations by only issuing tasks one at a time as workers become available and yielding return values as tasks are completed.
The example below demonstrates a concurrent for-loop using the more responsive imap() function.
# SuperFastPython.com
# example of a concurrent for loop that is more responsive
from time import sleep
from random import random
from multiprocessing.pool import ThreadPool
# task to execute in another thread
def task(arg):
# generate a value between 0 and 1
value = random()
# block for a fraction of a second to simulate work
sleep(value)
# return the generated value
return value
# entry point for the program
if __name__ == '__main__':
# create the thread pool
with ThreadPool() as pool:
# call the same function with different data concurrently
for result in pool.imap(task, range(10)):
# report the value to show progress
print(result)
Running the example first creates the ThreadPool with one worker per logical CPU in the system.
Tasks are issued to the ThreadPool until all workers are occupied. The remaining tasks are only issued once workers become available.
The imap() function returns an iterable immediately and begins yielding results as soon as tasks are completed. Return values are returned in the order that tasks were issued to the ThreadPool.
0.9274582295093903
0.7546060428458197
0.21040423129818286
0.0698291809212942
0.38748299352458526
0.5058055735770848
0.26369837423203313
0.431736504005662
0.9105708852888462
0.6894030278339277
Note, if the order of return values does not matter, the imap_unordered() function can be used instead, which may be even more responsive.
You can learn more about how to use the imap_unordered() function in the tutorial:
Common Questions
This section lists common questions about making for-loops concurrently using the ThreadPool.
Do you have any questions?
Share your question in the comments and I may add it to this section.
How To Set The Number of Workers
You can set the number of workers via the "processes" argument to the ThreadPool class.
Although it is called "processes", it refers to the number of worker threads.
For example:
...
# create a thread pool with 4 workers
pool = ThreadPool(processes=4)
Because it is the first argument, you can omit the argument name.
For example:
...
# create a thread pool with 4 workers
pool = ThreadPool(4)
You can learn more about how to set the number of workers in the tutorial:
How Many Workers Should I Use?
Use the default, which is one worker per logical CPU.
For example:
...
# create a thread pool with one worker per logical CPU
pool = ThreadPool()
You can have many more workers than you have physical or logical CPU cores.
Perhaps experiment and discover what works well for your system and your specific tasks.
For example, you could try one worker per task, or hundreds or thousands of workers, if you have more than ten thousand tasks.
What is a Logical vs Physical CPU?
Modern CPUs typically make use of a technology called hyperthreading.
Hyperthreading does not refer to a program using threads. Instead, it refers to technology within the CPU cores themselves that allows each physical core or CPU to act as if it were two logical cores or two CPUs.
- Physical Cores: The number of CPU cores provided in the hardware, e.g. the chips.
- Logical Cores: The number of CPU cores after hyperthreading is taken into account.
It provides automatic in-core parallelism that can offer up to a 30% speed-up over CPU cores that do not offer the technology.
As such, when we count CPU cores in a system, we typically count the number of logical CPU cores, not the number of physical CPU cores.
If you know your system uses hyperthreading (it probably does), then you can get the number of physical CPUs in your system by dividing the number of logical CPUs by two.
- Count Physical Cores = Count Logical Cores / 2
How Many CPUs Do I Have?
There are a number of ways to get the number of CPUs in Python.
The two pure Python approaches provided by the standard library are:
- multiprocessing.cpu_count() function
- os.cpu_count() function.
For example:
...
# get the number of logical cpu cores
n_cores = multiprocessing.cpu_count()
This will report the number of logical CPUs in your system.
You can learn more about getting the number of CPUs in your system in the tutorial:
What If I Need to Access a Shared Resource?
You may need to access a shared resource.
Worker threads can share data directly via arguments or global variables.
If your task functions are just reading data, then it should be fine.
If your task functions are writing data, then you may need a mutex lock to ensure that only one thread can change the shared resource at a time.
This is to avoid race conditions.
You can learn more about how to use a mutex lock with threads in the tutorial:
What Types of Tasks Can Be Made Concurrent?
The ThreadPool is probably well suited to IO-bound tasks.
These are tasks that perform blocking IO, such as reading or writing from a file, socket, or device.
Takeaways
You now know how to convert a for-loop to be concurrent using the ThreadPool.
If you enjoyed this tutorial, you will love my book: Python ThreadPool Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.