You can execute a list comprehension concurrently with threads by using the ThreadPoolExecutor with either the submit() or map() methods.
In this tutorial, you will discover how to execute a list comprehension concurrently using the ThreadPoolExecutor.
Let’s get started.
Need a Concurrent List Comprehension
A list comprehension is a Python syntax for creating a list object with minimal code.
List comprehensions provide a concise way to create lists. Common applications are to make new lists where each element is the result of some operations applied to each member of another sequence or iterable, or to create a subsequence of those elements that satisfy a certain condition.
— Python API Docs: List Comprehensions
For example, we can traverse a range of values from range() and apply an operation to each, then put the results into a list.
    ...
    # create the list
    results = list()
    # traverse the range
    for value in range(100):
        # calculate the new value
        new_value = value * 1000
        # add to list
        results.append(new_value)
This loop to construct the list can be performed in one line using a list comprehension.
For example:
    ...
    # create list using a list comprehension
    results = [value*1000 for value in range(100)]
This is more compact and more readable.
We can also call a function for each item that is being traversed in the list comprehension.
For example:
    ...
    # create list using a list comprehension with a function
    results = [calculate(value) for value in range(100)]
The function we are calling may be very slow as it may be performing some IO operation, such as reading from a file, or a network connection.
Alternatively, it may be performing heavy computation that releases the global interpreter lock, such as calling a NumPy or SciPy function.
In this case, we may want to execute all of the calls to the custom function concurrently.
Without concurrency, a slow function that takes 1 second per call would take 1000 seconds for a list comprehension with 1000 elements.
With concurrency, we may be able to execute all 1000 function calls at the same time, allowing the list comprehension to complete in about one second.
How can we execute a list comprehension concurrently using threads in Python?
Concurrent List Comprehension with ThreadPoolExecutor
We can make a list comprehension concurrent using the ThreadPoolExecutor class.
What is the ThreadPoolExecutor
A thread pool is a programming pattern for automatically managing a pool of worker threads.
The ThreadPoolExecutor class provides a thread pool with helpful functions for executing for loops concurrently.
An instance of the ThreadPoolExecutor class can be created and used to issue IO-bound tasks that will run concurrently. Once we are finished with it, we can close it to release the worker threads.
For example:
    ...
    # create a thread pool
    tpe = ThreadPoolExecutor()
    # ...
    # shut down the thread pool
    tpe.shutdown()
We can use the context manager interface if we only need to use the pool for one for-loop. This will ensure the pool is closed for us automatically once we are finished using it, even if a task raises an exception.
For example:
    ...
    # create a thread pool
    with ThreadPoolExecutor() as tpe:
        # ...
This is the recommended usage of the ThreadPoolExecutor.
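As a minimal sketch of what the context manager does for us, the example below submits one task inside the block and then tries to submit another after the block has exited. The double() function and the run_with_context_manager() helper are hypothetical names used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical stand-in for a slow IO-bound task
def double(value):
    return value * 2

def run_with_context_manager():
    with ThreadPoolExecutor(max_workers=4) as tpe:
        # issue one task and wait for its return value
        result = tpe.submit(double, 21).result()
    # the pool was shut down when the block exited,
    # so a new submission is rejected with a RuntimeError
    try:
        tpe.submit(double, 1)
        accepted = True
    except RuntimeError:
        accepted = False
    return result, accepted

if __name__ == '__main__':
    print(run_with_context_manager())
```

The rejected second submission suggests the pool really was shut down on exit, without an explicit call to shutdown().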
There are two ways we can make a list comprehension concurrent using the ThreadPoolExecutor:
- Method 1: Use ThreadPoolExecutor.submit()
- Method 2: Use ThreadPoolExecutor.map()
Let’s take a closer look at each in turn.
Method 1: Use ThreadPoolExecutor.submit()
The ThreadPoolExecutor provides the submit() method.
This method can be used to issue tasks asynchronously to the thread pool. This means that the call requests that the ThreadPoolExecutor run the task as soon as it is able and returns immediately.
The task will execute sometime in the future.
The submit() method takes the function to execute and any arguments for the function, and returns a Future object.
We can then get the return value from the issued task by calling the result() method on the Future object. This will block and return the return value as soon as it is able.
For example:
    ...
    # create a thread pool
    with ThreadPoolExecutor() as tpe:
        # issue one task
        future = tpe.submit(task, 1)
        # get the return value as soon as the task is done
        value = future.result()
We can use the submit() method to create a concurrent list comprehension by first setting the thread pool to have one worker per task we wish to issue.
    ...
    # create a thread pool
    with ThreadPoolExecutor(1000) as tpe:
        # ...
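To see why the pool size matters, the sketch below times the same four tasks with two workers and then with four workers. The short_task() function, the time_pool() helper, and the 0.2 second delay are assumptions made for illustration and are not part of this tutorial's worked examples.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep, perf_counter

# hypothetical task that blocks for a fixed 0.2 seconds
def short_task(value):
    sleep(0.2)
    return value

# time how long n_tasks take with a pool of n_workers threads
def time_pool(n_workers, n_tasks=4):
    start = perf_counter()
    with ThreadPoolExecutor(n_workers) as tpe:
        futures = [tpe.submit(short_task, i) for i in range(n_tasks)]
        values = [future.result() for future in futures]
    return perf_counter() - start, values

if __name__ == '__main__':
    few, _ = time_pool(2)
    many, _ = time_pool(4)
    print(f'2 workers: {few:.2f}s, 4 workers: {many:.2f}s')
```

With two workers the tasks run in two waves, so the run should take roughly twice as long as with one worker per task. Very large pools carry their own memory and scheduling costs, so one thread per task is a choice that suits modest task counts.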
We can then issue all tasks to the thread pool using submit() in a list comprehension and gather the Future objects.
    ...
    # issue all tasks to the thread pool
    futures = [tpe.submit(task, i) for i in range(1000)]
We can then traverse the list of Future objects and get the return value for each task, blocking until the tasks are completed.
The results are gathered in the same order that tasks were issued.
    ...
    # retrieve all return values in order
    values_list = [future.result() for future in futures]
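The ordering claim can be checked with a small sketch in which later tasks finish earlier. The delayed() function, the gather_in_order() helper, and the chosen delays are hypothetical. The done callback is used here only to record the order in which tasks actually complete.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# hypothetical task: wait for `delay` seconds, then return `value`
def delayed(value, delay):
    sleep(delay)
    return value

def gather_in_order():
    # later tasks get shorter delays, so they tend to finish first
    delays = [0.3, 0.2, 0.1]
    finished = []  # values in the order the tasks actually completed
    with ThreadPoolExecutor(len(delays)) as tpe:
        futures = [tpe.submit(delayed, i, d) for i, d in enumerate(delays)]
        for future in futures:
            # record each value at the moment its task completes
            future.add_done_callback(lambda f: finished.append(f.result()))
        # gather return values in the order the tasks were issued
        values = [future.result() for future in futures]
    return values, finished

if __name__ == '__main__':
    values, finished = gather_in_order()
    print('issued order:    ', values)
    print('completion order:', finished)
```

The gathered list follows the order of the futures list, even though the completion order will typically be reversed in this setup.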
Method 2: Use ThreadPoolExecutor.map()
The ThreadPoolExecutor provides a concurrent version of the built-in map() function that will call the same function for each item in a provided iterable, e.g. the same function with different data.
For example:
    ...
    # create a thread pool
    with ThreadPoolExecutor() as tpe:
        # call the function for each item concurrently
        tpe.map(task, items)
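This traverses the “items” iterable and issues one call to the task() function for each item. The call to map() returns immediately with an iterator over the return values. Iterating it yields results in the order the tasks were issued, blocking until each result is available.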
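The following sketch separates the time spent in the call to map() from the time spent consuming its results. The slow_square() function, the demo_map() helper, and the 0.2 second delay are assumptions made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep, perf_counter

# hypothetical task that blocks briefly, then returns the square
def slow_square(value):
    sleep(0.2)
    return value * value

def demo_map():
    with ThreadPoolExecutor(4) as tpe:
        start = perf_counter()
        # issue all tasks; this call does not wait for them to finish
        results_iter = tpe.map(slow_square, range(4))
        issue_time = perf_counter() - start
        # consuming the iterator blocks until each result is ready
        values = list(results_iter)
        total_time = perf_counter() - start
    return issue_time, total_time, values

if __name__ == '__main__':
    issue_time, total_time, values = demo_map()
    print(f'map() returned after {issue_time:.3f}s')
    print(f'results gathered after {total_time:.3f}s: {values}')
```

The call to map() should return almost at once, while gathering the results takes about as long as the tasks themselves.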
We can use the ThreadPoolExecutor.map() method to issue all tasks to the thread pool concurrently and gather the return values into a list.
This can be achieved by setting the number of threads in the thread pool to be equal to the number of tasks and using a call to the map() method in a list comprehension directly.
For example:
    ...
    # create a thread pool with one worker per task
    with ThreadPoolExecutor(1000) as tpe:
        # execute tasks concurrently and gather the results
        results_list = [result for result in tpe.map(task, range(1000))]
Now that we know how to make a list comprehension concurrent using the ThreadPoolExecutor, let’s look at some worked examples.
Example of a List Comprehension (slow)
Before we dive into concurrent list comprehensions, let’s look at a traditional list comprehension without using threads.
In this example, we will first develop a task that takes an argument, blocks for a variable amount of time, and returns a value. We will execute many examples of this task sequentially in order to build a list of return values and time the overall duration of the program.
Firstly, we can develop a task that takes a variable amount of time.
In this case, the task takes an integer value, then generates a random value between 0 and 1. It then blocks for a fraction of a second to simulate effort and adds the generated value to the input argument and returns the result.
The task() function below implements this.
    # task to execute in a list comprehension
    def task(value):
        # generate a random value between 0 and 1
        rand = random()
        # block to simulate work
        sleep(rand)
        # construct a return value
        return value + rand
Next, in the main thread, we can call the task() function 20 times with arguments 0 to 19 and gather the return values into a list using a list comprehension.
We can then report the list of return values for reference.
    ...
    # execute the list comprehension
    values_list = [task(i) for i in range(20)]
    # report results
    print(values_list)
We can then surround this statement with some benchmarking code so that we can report the overall duration of the program.
Firstly we can record the time before running the statement.
    ...
    # record the start time
    time_start = time()
After the statement, we can calculate and report the duration.
    ...
    # calculate the duration
    time_duration = time() - time_start
    # report the duration
    print(f'Took: {time_duration:1f} seconds')
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of a list comprehension
    from random import random
    from time import sleep
    from time import time

    # task to execute in a list comprehension
    def task(value):
        # generate a random value between 0 and 1
        rand = random()
        # block to simulate work
        sleep(rand)
        # construct a return value
        return value + rand

    # protect the entry point
    if __name__ == '__main__':
        # record the start time
        time_start = time()
        # execute the list comprehension
        values_list = [task(i) for i in range(20)]
        # report results
        print(values_list)
        # calculate the duration
        time_duration = time() - time_start
        # report the duration
        print(f'Took: {time_duration:1f} seconds')
Running the example calls the task() function 20 times in a list comprehension, building a list of the return values.
The return values are then reported.
The duration that the program took is reported. In this case, it is about 9.2 seconds. This provides a baseline for comparison with the concurrent versions that we expect to be much faster.
    [0.44468888035830945, 1.072323122689519, 2.1867678556950043, 3.1717982473609867, 4.863004434947132, 5.569760659167664, 6.842064461325271, 7.239165158376318, 8.268862066877738, 9.080432796976408, 10.821335643710782, 11.504750567400274, 12.938205076623351, 13.66868180373165, 14.016819181226216, 15.425157363723386, 16.420069045157874, 17.86282739985222, 18.089233287190517, 19.672617008799865]
    Took: 9.207543 seconds
Next, let’s look at how we can execute the list comprehension concurrently.
Example Concurrent List Comprehension with submit()
We can perform a list comprehension concurrently using the ThreadPoolExecutor with the submit() method.
This involves first creating a ThreadPoolExecutor with enough worker threads to handle all tasks concurrently, in this case, 20.
    ...
    # create the thread pool
    with ThreadPoolExecutor(20) as tpe:
        # ...
Next, all 20 calls to the task() function can be issued to the ThreadPoolExecutor using the submit() method. This can be performed in a list comprehension, providing a list of Future objects, one for each task that was issued.
    ...
    # issue all tasks to the thread pool
    futures = [tpe.submit(task, i) for i in range(20)]
The main thread can then traverse the list of Future objects and get the return value result from each. This blocks until the task is complete and the result is available.
    ...
    # retrieve all return values in order
    values_list = [future.result() for future in futures]
Finally, the return values can be reported.
    ...
    # report results
    print(values_list)
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of a concurrent list comprehension with submit()
    from random import random
    from time import sleep
    from time import time
    from concurrent.futures import ThreadPoolExecutor

    # task to execute in a list comprehension
    def task(value):
        # generate a random value between 0 and 1
        rand = random()
        # block to simulate work
        sleep(rand)
        # construct a return value
        return value + rand

    # protect the entry point
    if __name__ == '__main__':
        # record the start time
        time_start = time()
        # create the thread pool
        with ThreadPoolExecutor(20) as tpe:
            # issue all tasks to the thread pool
            futures = [tpe.submit(task, i) for i in range(20)]
            # retrieve all return values in order
            values_list = [future.result() for future in futures]
            # report results
            print(values_list)
        # calculate the duration
        time_duration = time() - time_start
        # report the duration
        print(f'Took: {time_duration:1f} seconds')
Running the example first issues all 20 calls to the task() function to the ThreadPoolExecutor, providing a list of Future objects.
The main thread then gathers the return values from each call to the task() function in order, blocking until each task is complete and the result is available.
The tasks execute, block, and return their values.
Once all results are available, the list of return values is reported in the main thread.
The duration is reported, in this case showing that the program takes about 0.9 seconds to complete, which is about 10x faster than the sequential version.
This makes sense. If each task takes about 0.5 seconds on average, then 20 sequential tasks should take about 10 seconds. If all tasks are executed at once, then all tasks should complete in under one second.
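The arithmetic behind these estimates can be sketched in a few lines. The sketch assumes each task sleeps for a uniform random time between 0 and 1 seconds, that the pool has one worker per task, and that thread start-up overhead is negligible. The function names are hypothetical. Under those assumptions the concurrent run lasts as long as the slowest task, and the expected maximum of n independent uniform values is n / (n + 1).

```python
from random import Random

# expected total time when n tasks run one after another,
# each sleeping for a uniform random time in [0, 1)
def expected_sequential(n):
    return n * 0.5

# expected total time when all n tasks run at once: the expected
# maximum of n independent uniform values, which is n / (n + 1)
def expected_concurrent(n):
    return n / (n + 1)

# check the concurrent estimate by simulation (no sleeping involved)
def simulated_concurrent(n, trials=10000, seed=1):
    rng = Random(seed)
    total = sum(max(rng.random() for _ in range(n)) for _ in range(trials))
    return total / trials

if __name__ == '__main__':
    print(f'sequential: {expected_sequential(20):.2f} seconds')
    print(f'concurrent: {expected_concurrent(20):.3f} seconds')
    print(f'simulated:  {simulated_concurrent(20):.3f} seconds')
```

For 20 tasks this gives about 10 seconds sequentially and about 0.95 seconds concurrently, in line with the measured durations.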
This highlights how we can execute a list comprehension concurrently in two steps.
    [0.8891772267573991, 1.0807393576786208, 2.719593233914207, 3.8036783311116187, 4.078326443423349, 5.020742460659824, 6.218544200388753, 7.4432661940982765, 8.832731057292051, 9.83096619614077, 10.121994808543738, 11.051216868947039, 12.579995554922753, 13.509059436363483, 14.378772584154657, 15.382430334834421, 16.899475552461503, 17.79445320306152, 18.31953097696884, 19.66183983354772]
    Took: 0.901804 seconds
Next, let’s look at how we can execute the list comprehension concurrently using the map() method.
Example Concurrent List Comprehension with map()
We can explore how to execute a list comprehension concurrently using the ThreadPoolExecutor and the map() method.
In this case, we will update the sequential version of the program to issue each call to the task() function to the ThreadPoolExecutor using the map() method. This returns an iterator over the results, which the list comprehension traverses in order to gather the list of results, blocking as needed until each result is available.
    ...
    # issue all tasks to the thread pool and gather results
    values_list = [result for result in tpe.map(task, range(20))]
This is closer to the original sequential version of the list comprehension, as the construction of the list is limited to one line of code.
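Because the results of map() are consumed by an ordinary list comprehension, the usual comprehension features still apply. The sketch below filters the results with a condition while the calls themselves run in the pool. The square() function and the even_squares() helper are hypothetical names used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task, standing in for a slow function
def square(value):
    return value * value

def even_squares(n):
    with ThreadPoolExecutor() as tpe:
        # the calls run in the pool; the condition is applied
        # in the main thread as each result is retrieved
        return [r for r in tpe.map(square, range(n)) if r % 2 == 0]

def all_squares(n):
    with ThreadPoolExecutor() as tpe:
        # without a condition, list() gathers the same values
        return list(tpe.map(square, range(n)))

if __name__ == '__main__':
    print(even_squares(6))
    print(all_squares(6))
```

When no condition or further expression is needed, passing the iterator to list() is an equivalent and slightly shorter way to gather the results.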
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of a concurrent list comprehension with map()
    from random import random
    from time import sleep
    from time import time
    from concurrent.futures import ThreadPoolExecutor

    # task to execute in a list comprehension
    def task(value):
        # generate a random value between 0 and 1
        rand = random()
        # block to simulate work
        sleep(rand)
        # construct a return value
        return value + rand

    # protect the entry point
    if __name__ == '__main__':
        # record the start time
        time_start = time()
        # create the thread pool
        with ThreadPoolExecutor(20) as tpe:
            # issue all tasks to the thread pool and gather results
            values_list = [result for result in tpe.map(task, range(20))]
            # report results
            print(values_list)
        # calculate the duration
        time_duration = time() - time_start
        # report the duration
        print(f'Took: {time_duration:1f} seconds')
Running the example first creates the ThreadPoolExecutor.
All 20 tasks are then issued to the ThreadPoolExecutor via the map() method, which returns an iterator over the return values without waiting for the tasks to finish.
Each task executes, generating a random value, blocking, then returning the combined input and generated value.
The main thread traverses the iterator returned from the map() method, blocking until each result is available in turn, and constructs the list, which is then reported.
In this case, the example takes about 0.9 seconds, similar to the above example using submit(), and about 10x faster than the sequential version.
This highlights how we can execute a list comprehension concurrently using the ThreadPoolExecutor and the map() method.
    [0.2873758280190196, 1.2305885042865983, 2.340634400782975, 3.2257637271960826, 4.63660869182357, 5.942457996172246, 6.092516687004832, 7.036296477714468, 8.0108431156771, 9.42627610742479, 10.340301127580133, 11.237674724004842, 12.309415284107502, 13.914252080278235, 14.085799826244386, 15.298127949026954, 16.92919123304112, 17.3373992964576, 18.03936827033344, 19.018877202530124]
    Took: 0.948253 seconds
Further Reading
This section provides additional resources that you may find helpful.
Books
- ThreadPoolExecutor Jump-Start, Jason Brownlee, (my book!)
- Concurrent Futures API Interview Questions
- ThreadPoolExecutor Class API Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ThreadPoolExecutor: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to execute a list comprehension concurrently using the ThreadPoolExecutor.