ThreadPoolExecutor Concurrent List Comprehension

August 27, 2023 Python ThreadPoolExecutor

You can execute a list comprehension concurrently with threads by using the ThreadPoolExecutor with either the submit() or map() methods.

In this tutorial, you will discover how to execute a list comprehension concurrently using the ThreadPoolExecutor.

Let's get started.

Need a Concurrent List Comprehension

A list comprehension is a Python syntax for creating a list object with minimal code.

List comprehensions provide a concise way to create lists. Common applications are to make new lists where each element is the result of some operations applied to each member of another sequence or iterable, or to create a subsequence of those elements that satisfy a certain condition.

-- Python API Docs: List Comprehensions

For example, we can traverse a range of values from range() and apply an operation to each, then put the results into a list.

...
# create the list
results = list()
# traverse the range
for value in range(100):
	# calculate the new value
	new_value = value * 1000
	# add to list
	results.append(new_value)

This loop to construct the list can be performed in one line using a list comprehension.

For example:

...
# create list using a list comprehension
results = [value*1000 for value in range(100)]

This is more compact and more readable.

We can also call a function for each item that is being traversed in the list comprehension.

For example:

...
# create list using a list comprehension with a function
results = [calculate(value) for value in range(100)]

The function we are calling may be very slow, for example, because it performs an IO operation such as reading from a file or a network connection.

Alternatively, it may perform heavy computation that releases the global interpreter lock (GIL), such as many NumPy or SciPy functions.

In this case, we may want to execute all of the calls to the function concurrently.

Without concurrency, a slow function that takes 1 second per call would take about 1000 seconds for a list comprehension with 1000 elements.

With concurrency, we may be able to execute all 1000 function calls at the same time, allowing the list comprehension to complete in about one second, plus some overhead for managing the threads.

How can we execute a list comprehension concurrently using threads in Python?

Concurrent List Comprehension with ThreadPoolExecutor

We can make a list comprehension concurrent using the ThreadPoolExecutor class.

What is the ThreadPoolExecutor

A thread pool is a programming pattern for automatically managing a pool of worker threads.

The ThreadPoolExecutor class in the concurrent.futures module provides a thread pool with helpful methods, such as submit() and map(), for executing tasks concurrently, e.g. the iterations of a for loop.

An instance of the ThreadPoolExecutor class can be created and used to issue IO-bound tasks that will run concurrently. Once we are finished with it, we can shut it down by calling shutdown() to release the worker threads.

For example:

...
# create a thread pool
tpe = ThreadPoolExecutor()
# ...
# shut down the thread pool
tpe.shutdown()

We can use the context manager interface if we only need the pool for a single block of work, such as one for-loop. On exiting the with block, the pool is shut down automatically, waiting for all issued tasks to complete, even if an exception is raised inside the block.

For example:

...
# create a thread pool
with ThreadPoolExecutor() as tpe:
	# ...

This is the recommended usage of the ThreadPoolExecutor.
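To make the context manager behavior concrete, here is a minimal sketch of roughly what the with statement does for us: it calls shutdown(wait=True) when the block exits, so the pool is shut down even if the block raises. The task() function here is a hypothetical stand-in that squares its argument.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task used only for illustration
def task(value):
    return value * value

def run_with_context_manager():
    # the pool is shut down automatically when the with block exits
    with ThreadPoolExecutor() as tpe:
        futures = [tpe.submit(task, i) for i in range(5)]
        return [future.result() for future in futures]

def run_with_explicit_shutdown():
    # roughly equivalent explicit form: always shut down, even on error
    tpe = ThreadPoolExecutor()
    try:
        futures = [tpe.submit(task, i) for i in range(5)]
        return [future.result() for future in futures]
    finally:
        # wait for issued tasks to finish, then release the worker threads
        tpe.shutdown(wait=True)

if __name__ == '__main__':
    print(run_with_context_manager())
    print(run_with_explicit_shutdown())
```

Both functions return the same list; the with form simply removes the need to remember the try/finally.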

There are two ways we can make a list comprehension concurrent using the ThreadPoolExecutor:

1. Use ThreadPoolExecutor.submit()
2. Use ThreadPoolExecutor.map()

Let's take a closer look at each in turn.

Method 1: Use ThreadPoolExecutor.submit()

The ThreadPoolExecutor provides the submit() method.

This method can be used to issue tasks asynchronously to the thread pool. This means that the call requests that the ThreadPoolExecutor run the task as soon as it is able and returns immediately.

The task will execute sometime in the future.

The submit() method takes the function to execute, followed by any arguments for the function, and returns a Future object.

We can then get the return value of the issued task by calling the result() method on the Future object. This blocks until the task is done, then returns its return value, or re-raises the exception if the task failed.

For example:

...
# create a thread pool
with ThreadPoolExecutor() as tpe:
	# issue one task
	future = tpe.submit(task, 1)
	# get the return value as soon as the task is done
	value = future.result()
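The fact that result() re-raises a task's exception is easy to miss. A minimal sketch, assuming a hypothetical task() that raises ValueError for negative input: result() returns the value of the successful task and re-raises the error of the failed one, while exception() hands back the exception object without raising it.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# hypothetical task that fails for negative input
def task(value):
    sleep(0.1)
    if value < 0:
        raise ValueError(f'negative value: {value}')
    return value * 10

def run_demo():
    with ThreadPoolExecutor() as tpe:
        good = tpe.submit(task, 1)
        bad = tpe.submit(task, -1)
        # result() blocks until the task is done, then returns its value
        value = good.result()
        # result() re-raises the exception that was raised inside the task
        try:
            bad.result()
            message = 'no error'
        except ValueError as e:
            message = f'Task failed: {e}'
        # exception() returns the exception object instead of raising it
        error = bad.exception()
    return value, message, error

if __name__ == '__main__':
    print(run_demo())
```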

We can use the submit() method to create a concurrent list comprehension by first setting the thread pool to have one worker per task we wish to issue.

...
# create a thread pool
with ThreadPoolExecutor(1000) as tpe:
	# ...

We can then issue all tasks to the thread pool using submit() in a list comprehension and gather the Future objects.

...
# issue all tasks to the thread pool
futures = [tpe.submit(task, i) for i in range(1000)]

We can then traverse the list of Future objects and get the return value for each task, blocking until the tasks are completed.

The results are gathered in the same order that tasks were issued.

...
# retrieve all return values in order
values_list = [future.result() for future in futures]
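A minimal runnable sketch of these steps, assuming a hypothetical task() whose sleep time shrinks as its argument grows, so later-issued tasks tend to finish first. Because the results are read from the list of Future objects in issue order, the returned list still matches the input order. The function name concurrent_list_comprehension() and its n_tasks parameter are illustrative, not part of any library.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# hypothetical task: higher values sleep for less time, so they finish first
def task(value):
    sleep(0.25 / (value + 1))
    return value * 1000

def concurrent_list_comprehension(n_tasks=5):
    # one worker per task so all tasks can run at the same time
    with ThreadPoolExecutor(n_tasks) as tpe:
        # issue all tasks and keep the Future objects in issue order
        futures = [tpe.submit(task, i) for i in range(n_tasks)]
        # gather results in issue order, blocking on each future in turn
        return [future.result() for future in futures]

if __name__ == '__main__':
    print(concurrent_list_comprehension())
```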

Method 2: Use ThreadPoolExecutor.map()

The ThreadPoolExecutor provides a concurrent version of the built-in map() function that will call the same function for each item in a provided iterable, e.g. the same function with different data.

For example:

...
# create a thread pool
with ThreadPoolExecutor() as tpe:
	# call the function for each item concurrently
	tpe.map(task, items)

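This submits one call to the task() function for each item in the "items" iterable and returns an iterator straight away. Traversing the iterator yields the return values in the same order as the items, blocking until each result is available. In the snippet above the return values are discarded, and the with block waits for all tasks to complete when the pool is shut down on exit.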
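A minimal sketch that shows when map() returns, assuming a hypothetical task() that sleeps for 0.2 seconds. The call to map() itself comes back almost immediately with an iterator; the waiting happens while the iterator is consumed.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep, perf_counter

# hypothetical task that sleeps briefly and returns a computed value
def task(value):
    sleep(0.2)
    return value * 2

def demo_map():
    with ThreadPoolExecutor(4) as tpe:
        start = perf_counter()
        # map() submits all tasks and returns an iterator straight away
        results = tpe.map(task, [1, 2, 3, 4])
        issue_time = perf_counter() - start
        # consuming the iterator blocks until each result, in input order, is ready
        values = list(results)
    return issue_time, values

if __name__ == '__main__':
    issue_time, values = demo_map()
    print(f'map() returned after {issue_time:.4f} seconds with {values}')
```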

We can use the ThreadPoolExecutor.map() method to issue all tasks to the thread pool concurrently and gather the return values into a list.

This can be achieved by setting the number of threads in the thread pool to be equal to the number of tasks and using a call to the map() method in a list comprehension directly.

For example:

...
# define the number of tasks
n_tasks = 1000
# create a thread pool
with ThreadPoolExecutor(n_tasks) as tpe:
	# execute tasks concurrently and gather the results in order
	results_list = [result for result in tpe.map(task, range(n_tasks))]

Now that we know how to make a list comprehension concurrent using the ThreadPoolExecutor, let's look at some worked examples.

Example of a List Comprehension (slow)

Before we dive into concurrent list comprehensions, let's look at a traditional list comprehension without using threads.

In this example, we will first develop a task that takes an argument, blocks for a variable amount of time, and returns a value. We will call this task many times sequentially to build a list of return values, and time the overall duration of the program.

Firstly, we can develop a task that takes a variable amount of time.

In this case, the task takes an integer value, then generates a random value between 0 and 1. It then blocks for that many seconds to simulate effort, adds the generated value to the input argument, and returns the result.

The task() function below implements this.

# task to execute in a list comprehension
def task(value):
    # generate a random value between 0 and 1
    rand = random()
    # block to simulate work
    sleep(rand)
    # construct a return value
    return value + rand

Next, in the main thread, we can call the task() function 20 times with arguments 0 to 19 and gather the return values into a list using a list comprehension.

We can then report the list of return values for reference.

...
# execute the list comprehension
values_list = [task(i) for i in range(20)]
# report results
print(values_list)

We can then surround this statement with some benchmarking code so that we can report the overall duration of the program.

Firstly, we can record the time before running the statement.

...
# record the start time
time_start = time()

After the statement, we can calculate and report the duration.

...
# calculate the duration
time_duration = time() - time_start
# report the duration
print(f'Took: {time_duration:1f} seconds')
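The examples in this tutorial use time.time(), which is adequate for durations of a second or more. As an alternative that the examples below do not use, time.perf_counter() is a monotonic, high-resolution clock intended for measuring intervals. A minimal sketch of a reusable timer built on it, where the helper name timed() is hypothetical:

```python
from contextlib import contextmanager
from time import perf_counter, sleep

@contextmanager
def timed(label='Took'):
    # hypothetical helper: report how long the with block took to run
    timings = {}
    start = perf_counter()
    try:
        yield timings
    finally:
        # record and report the duration even if the block raised
        timings['duration'] = perf_counter() - start
        print(f'{label}: {timings["duration"]:.6f} seconds')

if __name__ == '__main__':
    with timed() as t:
        sleep(0.1)
```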

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of a list comprehension
from random import random
from time import sleep
from time import time

# task to execute in a list comprehension
def task(value):
    # generate a random value between 0 and 1
    rand = random()
    # block to simulate work
    sleep(rand)
    # construct a return value
    return value + rand

# protect the entry point
if __name__ == '__main__':
    # record the start time
    time_start = time()
    # execute the list comprehension
    values_list = [task(i) for i in range(20)]
    # report results
    print(values_list)
    # calculate the duration
    time_duration = time() - time_start
    # report the duration
    print(f'Took: {time_duration:1f} seconds')

Running the example calls the task() function 20 times in a list comprehension, building a list of the return values.

The return values are then reported.

Finally, the overall duration is reported. In this case, it is about 9.2 seconds; your results will differ because each task sleeps for a random interval. This provides a baseline for comparison with the concurrent versions, which we expect to be much faster.

[0.44468888035830945, 1.072323122689519, 2.1867678556950043, 3.1717982473609867, 4.863004434947132, 5.569760659167664, 6.842064461325271, 7.239165158376318, 8.268862066877738, 9.080432796976408, 10.821335643710782, 11.504750567400274, 12.938205076623351, 13.66868180373165, 14.016819181226216, 15.425157363723386, 16.420069045157874, 17.86282739985222, 18.089233287190517, 19.672617008799865]
Took: 9.207543 seconds

Next, let's look at how we can execute the list comprehension concurrently.

Example Concurrent List Comprehension with submit()

We can perform a list comprehension concurrently using the ThreadPoolExecutor with the submit() method.

This involves first creating a ThreadPoolExecutor with enough worker threads to handle all tasks concurrently, in this case, 20.

...
# create the thread pool
with ThreadPoolExecutor(20) as tpe:
	# ...

Next, all 20 calls to the task() function can be issued to the ThreadPoolExecutor using the submit() method. This can be performed in a list comprehension, providing a list of Future objects, one for each task that was issued.

...
# issue all tasks to the thread pool
futures = [tpe.submit(task, i) for i in range(20)]

The main thread can then traverse the list of Future objects and get the return value result from each. This blocks until the task is complete and the result is available.

...
# retrieve all return values in order
values_list = [future.result() for future in futures]

Finally, the return values can be reported.

...
# report results
print(values_list)

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of a concurrent list comprehension with submit()
from random import random
from time import sleep
from time import time
from concurrent.futures import ThreadPoolExecutor

# task to execute in a list comprehension
def task(value):
    # generate a random value between 0 and 1
    rand = random()
    # block to simulate work
    sleep(rand)
    # construct a return value
    return value + rand

# protect the entry point
if __name__ == '__main__':
    # record the start time
    time_start = time()
    # create the thread pool
    with ThreadPoolExecutor(20) as tpe:
        # issue all tasks to the thread pool
        futures = [tpe.submit(task, i) for i in range(20)]
        # retrieve all return values in order
        values_list = [future.result() for future in futures]
        # report results
        print(values_list)
    # calculate the duration
    time_duration = time() - time_start
    # report the duration
    print(f'Took: {time_duration:1f} seconds')

Running the example first issues all 20 calls to the task() function to the ThreadPoolExecutor, providing a list of Future objects.

The main thread then gathers the return values from each call to the task() function in order, blocking until each task is complete and the result is available.

The tasks execute, block, and return their values.

Once all results are available, the list of return values is reported in the main thread.

The duration is reported, in this case showing that the program takes about 0.9 seconds to complete, which is about 10x faster than the sequential version.

This makes sense. If each task sleeps for about 0.5 seconds on average, then 20 sequential tasks should take about 10 seconds in total. If all tasks execute at once, the overall duration is set by the slowest task, which sleeps for less than one second.
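The arithmetic behind this estimate can be made explicit. Assuming each task's sleep time U_i is an independent uniform draw from [0, 1), as returned by random(), and ignoring thread start-up and scheduling overhead, the expected sequential duration is the sum of the sleeps and the expected concurrent duration is the largest single sleep:

```latex
\mathbb{E}\!\left[\sum_{i=1}^{n} U_i\right] = \frac{n}{2} = 10\ \text{s},
\qquad
\mathbb{E}\!\left[\max_{1 \le i \le n} U_i\right] = \frac{n}{n+1} = \frac{20}{21} \approx 0.95\ \text{s},
\qquad U_i \sim \mathcal{U}[0, 1),\ n = 20
```

Each run is a single random sample, so measured durations such as 9.2 and 0.9 seconds scatter around these expected values.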

This highlights how we can execute a list comprehension concurrently in two steps: issue all tasks with submit(), then gather the results from the Future objects in order.

[0.8891772267573991, 1.0807393576786208, 2.719593233914207, 3.8036783311116187, 4.078326443423349, 5.020742460659824, 6.218544200388753, 7.4432661940982765, 8.832731057292051, 9.83096619614077, 10.121994808543738, 11.051216868947039, 12.579995554922753, 13.509059436363483, 14.378772584154657, 15.382430334834421, 16.899475552461503, 17.79445320306152, 18.31953097696884, 19.66183983354772]
Took: 0.901804 seconds

Next, let's look at how we can execute the list comprehension concurrently using the map() method.

Example Concurrent List Comprehension with map()

We can explore how to execute a list comprehension concurrently using the ThreadPoolExecutor and the map() method.

In this case, we will update the sequential version of the program to issue each call to the task() function to the ThreadPoolExecutor using the map() method. This submits all tasks right away and returns an iterator over their return values, which we traverse in a list comprehension to gather the results in the order the tasks were issued.

...
# issue all tasks to the thread pool and gather results
values_list = [result for result in tpe.map(task, range(20))]

This is closer to the original sequential version of the list comprehension, as the construction of the list is limited to one line of code.
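Because map() already returns an iterator of results in issue order, wrapping it in list() is an equivalent one-liner. A minimal sketch comparing the two forms, with a hypothetical deterministic task() so the results can be checked exactly; the helper name gather_both() is also illustrative.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

# hypothetical deterministic task so both forms give identical results
def task(value):
    sleep(0.01)
    return value + 0.5

def gather_both(n_tasks=20):
    with ThreadPoolExecutor(n_tasks) as tpe:
        # list comprehension over the iterator returned by map()
        from_comprehension = [result for result in tpe.map(task, range(n_tasks))]
        # equivalent: let list() consume the iterator
        from_list = list(tpe.map(task, range(n_tasks)))
    return from_comprehension, from_list

if __name__ == '__main__':
    print(gather_both())
```

The list comprehension form is handy when you also want to transform or filter each result as it is gathered.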

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of a concurrent list comprehension with map()
from random import random
from time import sleep
from time import time
from concurrent.futures import ThreadPoolExecutor

# task to execute in a list comprehension
def task(value):
    # generate a random value between 0 and 1
    rand = random()
    # block to simulate work
    sleep(rand)
    # construct a return value
    return value + rand

# protect the entry point
if __name__ == '__main__':
    # record the start time
    time_start = time()
    # create the thread pool
    with ThreadPoolExecutor(20) as tpe:
        # issue all tasks to the thread pool and gather results
        values_list = [result for result in tpe.map(task, range(20))]
        # report results
        print(values_list)
    # calculate the duration
    time_duration = time() - time_start
    # report the duration
    print(f'Took: {time_duration:1f} seconds')

Running the example first creates the ThreadPoolExecutor.

All 20 tasks are then issued to the ThreadPoolExecutor via the map() method, which returns an iterator of return values straight away.

Each task executes, generating a random value, blocking, then returning the combined input and generated value.

The main thread traverses the iterator in the list comprehension, blocking until each return value is available in turn, and constructs the list, which is then reported.

In this case, the example takes about 0.9 seconds, similar to the above example using submit(), and about 10x faster than the sequential version.

This highlights how we can execute a list comprehension concurrently using the ThreadPoolExecutor and the map() method.

[0.2873758280190196, 1.2305885042865983, 2.340634400782975, 3.2257637271960826, 4.63660869182357, 5.942457996172246, 6.092516687004832, 7.036296477714468, 8.0108431156771, 9.42627610742479, 10.340301127580133, 11.237674724004842, 12.309415284107502, 13.914252080278235, 14.085799826244386, 15.298127949026954, 16.92919123304112, 17.3373992964576, 18.03936827033344, 19.018877202530124]
Took: 0.948253 seconds

Takeaways

You now know how to execute a list comprehension concurrently using the ThreadPoolExecutor.


