You can convert a for-loop to be concurrent using the ThreadPoolExecutor class.
In this tutorial, you will discover how to convert a for-loop to be concurrent using the ThreadPoolExecutor.
Let’s get started.
Need a Concurrent For-Loop
You have a for-loop and you want to execute each iteration concurrently.
This is a common situation.
The loop involves performing the same operation multiple times with different data.
For example:
...
# perform the same operation on each item
for item in items:
    # perform operation on item
    ...
It most commonly involves calling the same function each iteration with different arguments.
For example:
...
# call the same function each iteration with different data
for item in items:
    # call function with one data item
    task(item)
Alternatively, we might want the results from calling the same function multiple times with different data.
This is often achieved using a list comprehension.
For example:
...
# get results from applying the same function to different data
results = [task(item) for item in items]
Each iteration of the for-loop is performed sequentially, one after the other, on a single CPU core.
This is a problem because we may have many CPU cores, such as 2, 4, 8, or even 32.
How can we convert a for-loop to be concurrent?
First Step: Prepare Your Code For Concurrency
Before we dive into how to make for-loops concurrent, there is a prerequisite.
You will need to refactor your code so that each iteration of your loop calls the same target function with different arguments.
This means you will need to create a new target function that takes one (or more) arguments with the specific data to operate on each iteration.
For example, your for-loop may look as follows:
# slow for loop executed sequentially
for item in items:
    # do one thing using item
    # do another thing using item
    ...
You can convert it into a loop that calls a target function each iteration that takes the unique data each iteration as an argument.
For example:
# task that operates on an item
def task(item):
    # do one thing using item
    # do another thing using item
    ...

# slow for loop executed sequentially
for item in items:
    task(item)
Generally, the function should be self-contained.
It should not use any shared resources to avoid possible concurrency failure modes like race conditions.
It should be a function with no side effects, taking data as arguments and returning any results via return values.
If this is not the case, you must refactor your code so that this is the case, otherwise, you may have to contend with mutex locks and such which will make everything a whole lot harder.
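For example, a minimal sketch of this kind of refactoring (the load_and_count() function and the filenames are hypothetical placeholders) keeps all state local to the function and returns results via return values rather than writing to shared variables:

# hypothetical self-contained task: takes its data as an argument,
# uses no shared state, and returns its result via a return value
def load_and_count(filename):
    # read the file; all state is local to this call
    with open(filename) as handle:
        text = handle.read()
    # return the result instead of appending to a shared list
    return len(text.split())

# sequential loop that collects results via return values only
filenames = ['a.txt', 'b.txt', 'c.txt']
counts = [load_and_count(name) for name in filenames]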
Next, let’s look at how we can execute for loops concurrently using a ThreadPoolExecutor.
How To Make For-Loop Concurrent With ThreadPoolExecutor
We can make a for-loop concurrent using the ThreadPoolExecutor class.
A thread pool is a programming pattern for automatically managing a pool of worker threads.
The ThreadPoolExecutor class provides a thread pool with helpful functions for executing for loops concurrently.
An instance of the ThreadPoolExecutor class can be created and used to issue IO-bound tasks that will run concurrently. Once we are finished with it, we can shut it down to release the worker threads.
For example:
...
# create a thread pool
tpe = ThreadPoolExecutor()
# ...
# shutdown the thread pool
tpe.shutdown()
We can use the context manager interface if we only need to use the pool for one for-loop. This will ensure the pool is closed for us automatically once we are finished using it, even if a task raises an exception.
For example:
...
# create a thread pool
with ThreadPoolExecutor() as tpe:
    # ...
This is the recommended usage of the ThreadPoolExecutor.
You can learn more about how to use the ThreadPoolExecutor context manager in the tutorial:
The ThreadPoolExecutor provides a concurrent version of the built-in map() function that will call the same function for each item in a provided iterable, e.g. the same function with different data.
For example:
...
# create a thread pool
with ThreadPoolExecutor() as tpe:
    # call the function for each item concurrently
    tpe.map(task, items)
This will traverse the “items” iterable and issue one call to the task() function for each item, executing the calls concurrently using the worker threads. Exiting the context manager then waits for all issued tasks to complete.
If the target function takes multiple arguments, then one iterable can be provided to the map() method for each argument.
For example:
...
# create a thread pool
with ThreadPoolExecutor() as tpe:
    # call the function for each item concurrently
    tpe.map(task, items1, items2, items3)
If our target function has a return value, the map() method returns an iterator of return values that can be stored or iterated directly, yielding results in the order the tasks were issued.
For example:
...
# create a thread pool
with ThreadPoolExecutor() as tpe:
    # call the function for each item concurrently
    for result in tpe.map(task, items):
        print(result)
You can learn more about how to use the map() method in the tutorial:
Now that we know how to convert our for-loops to be concurrent, let’s look at some worked examples that you can use as templates for your own project.
Example of Concurrent For-Loop with map()
We can explore how to use map() to execute for-loops concurrently.
In this example, we will use a simple task function as a placeholder that you can replace with your own task function to be called for each loop iteration.
In this case, our task will take one integer argument. It will then generate a random number between 0 and 1 and sleep for that fraction of a second to simulate work. Finally, it will return the generated value as the result of the task.
The task() function below implements this.
# task to execute in another thread
def task(arg):
    # generate a value between 0 and 1
    value = random()
    # block for a fraction of a second to simulate work
    sleep(value)
    # return the generated value
    return value
Let’s dive in.
Serial For-Loop with map()
Before we dive into concurrent for-loops with map(), let’s start with a sequential for-loop.
We can use the built-in map() function to call our task function each iteration.
Each function call will be executed sequentially, one after the other on one CPU core.
The complete example is listed below.
# SuperFastPython.com
# example of a sequential for loop
from time import sleep
from random import random

# a task to execute in another thread
def task(arg):
    # generate a value between 0 and 1
    value = random()
    # block for a fraction of a second to simulate work
    sleep(value)
    # return the generated value
    return value

# entry point for the program
if __name__ == '__main__':
    # call the same function with different data sequentially
    for result in map(task, range(10)):
        # report the value to show progress
        print(result)
Running the example calls the same task() function for each value in the iterable from 0 to 9.
Different random numbers are generated each time the code is run.
Each call to task() happens sequentially, and the entire example takes about 4-5 seconds, depending on the specific numbers generated.
0.4243133760805442
0.045707298020461584
0.5935655464316753
0.7663974921875926
0.246644383852908
0.7426194244409888
0.5884364546911707
0.7430522698968164
0.1931857147844338
0.3468595028527215
Next, let’s look at converting the example to execute concurrently using the ThreadPoolExecutor.
Concurrent For-Loop with map()
We can update the previous example so that each call to the task() function is executed concurrently.
First, we can create the ThreadPoolExecutor configured with the default number of worker threads.
...
# create the thread pool
with ThreadPoolExecutor() as tpe:
    # ...
Next, we can call the map() function as before and iterate the result values, although this time the map() function is a method on the ThreadPoolExecutor instance.
...
# call the same function with different data concurrently
for result in tpe.map(task, range(10)):
    # report the value to show progress
    print(result)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of a concurrent for loop
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# task to execute in another thread
def task(arg):
    # generate a value between 0 and 1
    value = random()
    # block for a fraction of a second to simulate work
    sleep(value)
    # return the generated value
    return value

# entry point for the program
if __name__ == '__main__':
    # create the thread pool
    with ThreadPoolExecutor() as tpe:
        # call the same function with different data concurrently
        for result in tpe.map(task, range(10)):
            # report the value to show progress
            print(result)
Running the example first creates the ThreadPoolExecutor with the default number of worker threads.
All ten calls to the task() function are issued to the ThreadPoolExecutor and it completes them as fast as it is able.
The iterator of return values returned from map() is traversed, reporting each generated value in the order the tasks were issued.
The loop completes in about 1 second, depending on the specific random numbers generated.
0.9876407209459578
0.6759433823870357
0.8143074378181886
0.9216241923184098
0.5894315828217366
0.6238674603877243
0.27974960559550655
0.8604062115277111
0.6586268213993277
0.328717597775
Common Questions
This section lists common questions about making for-loops concurrent using the ThreadPoolExecutor.
Do you have any questions?
Share your question in the comments and I may add it to this section.
How To Set The Number of Workers
You can set the number of workers via the “max_workers” argument to the ThreadPoolExecutor class constructor.
For example:
...
# create a thread pool and set the number of worker threads
executor = ThreadPoolExecutor(max_workers=100)
# ...
# shutdown the thread pool
executor.shutdown()
The max_workers argument is the first argument in the constructor and does not need to be specified by name to be set; for example:
...
# create a thread pool and set the number of worker threads
executor = ThreadPoolExecutor(100)
You can learn more about how to configure the number of thread workers in the tutorial:
What Is the Default Number of Threads in the ThreadPoolExecutor?
The default number of threads in the ThreadPoolExecutor (since Python 3.8) is calculated as follows:
- Total Number of Worker Threads = min(32, (CPUs in Your System) + 4)
Where the number of CPUs in your system is determined by Python and will take hyperthreading into account.
For example, if you have two CPU cores each with hyperthreading (which is common), then Python will “see” four CPUs in your system.
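As a quick sanity check, a small sketch (assuming Python 3.8 or later, where this formula applies) can compute the expected default for your machine:

# estimate the default number of worker threads (Python 3.8+)
import os
default_workers = min(32, (os.cpu_count() or 1) + 4)
print(default_workers)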
How Many Thread Workers Should I Use?
You can use one thread worker per task to be executed, up to hundreds or thousands of workers if needed.
For example:
...
# create a thread pool with many workers
executor = ThreadPoolExecutor(1000)
Experiment and discover the best number of workers for your use case.
How Many CPUs or CPU Cores Do I Have?
You can check the number of CPUs or CPU cores that are visible to Python via the cpu_count() function in the os module.
For example, the following program will report the number of CPU cores in your system that are visible to Python:
# report the number of CPUs in your system visible to Python
import os
print(os.cpu_count())
You can learn more about checking the number of CPU cores in the tutorial:
Does the Number of Threads in the ThreadPoolExecutor Match the Number of CPUs or Cores?
The number of worker threads in the ThreadPoolExecutor is not related to the number of CPUs or CPU cores in your system.
You can configure the number of threads based on the number of tasks you need to execute, the amount of local system resources you have available (e.g. memory), and the limitations of resources you intend to access within your tasks (e.g. connections to remote servers).
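For example, a hedged sketch of this kind of sizing, where the fetch() task, the placeholder URLs, and the cap of 100 concurrent connections are all assumptions for illustration:

from concurrent.futures import ThreadPoolExecutor

# placeholder task that pretends to process one url
def fetch(url):
    return len(url)

# placeholder list of urls to process
urls = [f'https://example.com/page{i}' for i in range(250)]

# one worker per task, capped at an assumed limit of 100 connections
n_workers = min(len(urls), 100)
with ThreadPoolExecutor(max_workers=n_workers) as tpe:
    results = list(tpe.map(fetch, urls))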
What Is the Maximum Number of Worker Threads in the ThreadPoolExecutor?
There is no maximum number of worker threads in the ThreadPoolExecutor.
Nevertheless, your system will have an upper limit on the number of threads you can create based on how much main memory (RAM) you have available.
Long before you exhaust main memory, you will reach a point of diminishing returns from adding new threads. This is because your operating system must switch between the threads, called context switching. With too many threads active at once, your program may spend more time context switching than actually executing tasks.
A sensible upper limit for many applications is hundreds of threads to perhaps a few thousand threads. More than a few thousand threads on a modern system may result in too much context switching, depending on your system and on the types of tasks that are being executed.
What If I Need to Access a Shared Resource?
You may need to access a shared resource in the task executed by worker threads.
Worker threads can share data directly via arguments or global variables.
If your task functions only read the shared data, then this is generally safe.
If your task functions are writing data, then you may need a mutex lock to ensure that only one thread can change the shared resource at a time.
This is to avoid race conditions.
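For example, a minimal sketch of protecting a shared counter with a threading.Lock (the counter and the work done per task are placeholders):

from concurrent.futures import ThreadPoolExecutor
from threading import Lock

# shared resource and a lock to protect it
counter = 0
lock = Lock()

# task that updates the shared counter safely
def task(item):
    global counter
    # only one thread may update the counter at a time
    with lock:
        counter += item

# run the tasks and wait for them all to finish
with ThreadPoolExecutor() as tpe:
    tpe.map(task, range(10))
print(counter)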
You can learn more about how to use a mutex lock with threads in the tutorial:
What Types of Tasks Can Be Made Concurrent?
The ThreadPoolExecutor is well suited to IO-bound tasks.
These are tasks that perform blocking IO, such as reading or writing from a file, socket, or device.
You can also execute tasks that release the global interpreter lock (GIL), such as many NumPy and SciPy functions.
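For example, a hedged sketch of a typical IO-bound loop that downloads a few placeholder URLs concurrently using urllib.request (swap in your own URLs and add error handling as needed):

from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen

# io-bound task: download one url and return its size in bytes
def download(url):
    with urlopen(url, timeout=10) as response:
        return url, len(response.read())

# placeholder urls, replace with your own
urls = ['https://www.python.org/', 'https://pypi.org/']

# download all urls concurrently, one worker per url
with ThreadPoolExecutor(max_workers=len(urls)) as tpe:
    for url, size in tpe.map(download, urls):
        print(f'{url}: {size} bytes')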
Further Reading
This section provides additional resources that you may find helpful.
Books
- ThreadPoolExecutor Jump-Start, Jason Brownlee, (my book!)
- Concurrent Futures API Interview Questions
- ThreadPoolExecutor Class API Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python ThreadPoolExecutor: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to convert a for-loop to be concurrent using the ThreadPoolExecutor.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Jan Kopřiva on Unsplash