Last Updated on September 12, 2022
You can adopt one of the common usage patterns to get the most out of the ThreadPoolExecutor in Python.
In this tutorial, you will discover the common usage patterns for Python thread pools.
Let’s get started.
ThreadPoolExecutor Usage Patterns
The ThreadPoolExecutor class provides a lot of flexibility for executing concurrent tasks in Python.
Nevertheless, there are a handful of common usage patterns that will fit most program scenarios.
This section lists the common usage patterns with worked examples that you can copy and paste into your own project and adapt as needed.
The patterns we will look at are as follows:
- Map and Wait Pattern
- Submit and Use as Completed Pattern
- Submit and Use Sequentially Pattern
- Submit and Use Callback Pattern
- Submit and Wait for All Pattern
- Submit and Wait for First Pattern
We will use a contrived task in each example that sleeps for a random amount of time less than one second. You can easily replace this example task with your own task in each pattern.
Also, recall that each Python program has one thread by default called the main thread, where we do our work. We will create the thread pool in the main thread in each example and may reference actions in the main thread in some of the patterns, as opposed to actions in threads in the thread pool.
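For example, this minimal sketch reports the name of the default thread using the standard threading module:

...
# report the name of the default (main) thread
from threading import current_thread
print(current_thread().name)
# prints: MainThread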
Let’s start with the first usage pattern: map and wait.
Map and Wait Pattern
Perhaps the most common pattern when using the ThreadPoolExecutor is to convert a for loop that executes a function on each item in a collection to use threads.
It assumes that the function has no side effects, meaning it does not access any data outside of the function and does not change the data provided to it. It takes data and produces a result.
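For example, a side-effect-free task might look like the following hypothetical sketch; it reads only its argument and communicates only via its return value:

# a side-effect-free task: no access to shared state
def task(item):
    # compute and return a new value from the argument only
    return item * 2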
These types of for loops can be written explicitly in Python; for example:
...
# apply a function to each element in a collection
for item in mylist:
    result = task(item)
A better practice is to use the built-in map() function that applies the function to each item in the iterable for you.
...
# apply the function to each element in the collection
results = map(task, mylist)
This does not apply the task() function to each item until we iterate the results, so-called lazy evaluation:
...
# iterate the results from map
for result in results:
    print(result)
Therefore, it is common to see this operation consolidated to the following:
...
# iterate the results from map
for result in map(task, mylist):
    print(result)
We can perform this same operation using the thread pool, except each application of the function to an item in the list is a task that is executed asynchronously.
For example:
...
# iterate the results from map
for result in executor.map(task, mylist):
    print(result)
Although the tasks are executed concurrently, the results are iterated in the order of the iterable provided to the map() function, the same as the built-in map() function.
In this way, we can think of the thread pool version of map() as a concurrent version of the built-in map() function; it is ideal if you are looking to update your for loop to use threads.
The example below demonstrates using the map and wait pattern with a task that will sleep a random amount of time less than one second and return the provided value.
# SuperFastPython.com
# example of the map and wait pattern for the ThreadPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # execute tasks concurrently and process results in order
    for result in executor.map(task, range(10)):
        # retrieve the result
        print(result)
Running the example, we can see that the results are reported in the order that the tasks were created and sent into the thread pool.
0
1
2
3
4
5
6
7
8
9
The map() function supports target functions that take more than one argument by providing more than one iterable as arguments to the call to map().
For example, we can define a target function for map that takes two arguments, then provide two iterables of the same length to the call to map.
The complete example is listed below.
# SuperFastPython.com
# example of calling map with two iterables
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# custom task that will sleep for a variable amount of time
def task(value1, value2):
    # sleep for less than a second
    sleep(random())
    return (value1, value2)

# start the thread pool
with ThreadPoolExecutor() as executor:
    # submit all tasks
    for result in executor.map(task, ['1', '2', '3'], ['a', 'b', 'c']):
        print(result)
Running the example executes the tasks as expected, providing two arguments to map and reporting a result that combines both arguments.
('1', 'a')
('2', 'b')
('3', 'c')
A call to the map() function will issue all tasks to the thread pool immediately, even if you do not iterate the iterable of results.
This is unlike the built-in map() function that is lazy and does not compute each call until you ask for the result during iteration.
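We can see this lazy behavior in a small sketch; the built-in map() does not call the target function (a hypothetical report() function here) until we iterate the results:

# demonstrate that the built-in map() is lazy
def report(value):
    print(f'called with {value}')
    return value

# no calls to report() are made here
results = map(report, [1, 2, 3])
print('map() returned, nothing executed yet')
# report() is only called as we iterate
for result in results:
    print(result)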
The example below confirms this by issuing all tasks with a call to map() and not iterating the results.
# SuperFastPython.com
# example of calling map and not iterating the results
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# custom task that will sleep for a variable amount of time
def task(value):
    # sleep for less than a second
    sleep(random())
    print(f'Done: {value}')
    return value

# start the thread pool
with ThreadPoolExecutor() as executor:
    # submit all tasks
    executor.map(task, range(5))
print('All done!')
Running the example, we can see that the tasks are sent into the thread pool and executed without having to explicitly pass over the iterable of results that was returned.
The use of the context manager ensured that the thread pool did not shut down until all tasks were complete.
Done: 0
Done: 2
Done: 1
Done: 3
Done: 4
All done!
Submit and Use as Completed
Perhaps the second most common pattern when using the ThreadPoolExecutor is to submit tasks and use the results as they become available.
This can be achieved by using the submit() function to push tasks into the thread pool, which returns Future objects, then calling the as_completed() module function on the list of Future objects; it will yield each Future object as its task completes.
The example below demonstrates this pattern, submitting the tasks in order from 0 to 9 and showing results in the order that they were completed.
# SuperFastPython.com
# example of the submit and use as completed pattern for the ThreadPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # process task results as they are available
    for future in as_completed(futures):
        # retrieve the result
        print(future.result())
Running the example, we can see that the results are retrieved and printed in the order that the tasks completed, not the order that the tasks were submitted to the thread pool.
5
9
6
1
0
7
3
8
4
2
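Note that as_completed() also accepts an optional timeout (in seconds); if not all tasks complete within the timeout, iteration raises a TimeoutError. A minimal sketch, assuming futures is the list from the example above:

...
from concurrent.futures import as_completed, TimeoutError
# process results, waiting no more than 5 seconds in total
try:
    for future in as_completed(futures, timeout=5):
        print(future.result())
except TimeoutError:
    # not all tasks completed in time
    print('Gave up waiting after 5 seconds')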
Submit and Use Sequentially
We may require the results from tasks in the order that the tasks were submitted.
This may be because the tasks have a natural ordering.
We can implement this pattern by calling submit() for each task to get a list of Future objects, then iterating over the Future objects in the order that the tasks were submitted and retrieving the results.
The main difference from the “as completed” pattern is that we enumerate the list of futures directly, instead of calling the as_completed() function.
...
# process task results in the order they were submitted
for future in futures:
    # retrieve the result
    print(future.result())
The example below demonstrates this pattern, submitting the tasks in order from 0 to 9 and showing the results in the order that they were submitted.
# SuperFastPython.com
# example of the submit and use sequentially pattern for the ThreadPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # process task results in the order they were submitted
    for future in futures:
        # retrieve the result
        print(future.result())
Running the example, we can see that the results are retrieved and printed in the order that the tasks were submitted, not the order that the tasks were completed.
0
1
2
3
4
5
6
7
8
9
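We can also bound how long we are willing to block on each result by passing a timeout (in seconds) to the result() method, which raises a TimeoutError if the result is not available in time. A minimal sketch, assuming futures is the list from the example above:

...
from concurrent.futures import TimeoutError
# process results in submit order, waiting at most 2 seconds each
for future in futures:
    try:
        print(future.result(timeout=2))
    except TimeoutError:
        # the task did not finish within 2 seconds
        print('Task took too long')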
Submit and Use Callback
We may not want to explicitly process the results once they are available; instead, we want to call a function on the result.
Rather than doing this manually, such as in the as completed pattern above, we can have the thread pool call the function for us with the result automatically.
This can be achieved by setting a callback on each Future object by calling the add_done_callback() method and passing the name of the callback function.
The thread pool will then call the callback function as each task completes, passing in the Future object for the task.
The example below demonstrates this pattern, registering a custom callback function to be applied to each task as it is completed.
# SuperFastPython.com
# example of the submit and use a callback pattern for the ThreadPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# custom callback function called on tasks when they complete
def custom_callback(fut):
    # retrieve the result
    print(fut.result())

# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # register the callback on all tasks
    for future in futures:
        future.add_done_callback(custom_callback)
    # wait for tasks to complete...
Running the example, we can see that results are retrieved and printed in the order they are completed, not the order that tasks were submitted.
8
0
7
1
4
6
5
3
2
9
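Note that the callback is invoked whenever the Future is done, including when the task raises an exception or is cancelled; calling result() in those cases will re-raise the exception. A more defensive callback might check the state of the Future first; a minimal sketch:

# custom callback that handles failed and cancelled tasks
def custom_callback(fut):
    if fut.cancelled():
        print('Task was cancelled')
    elif fut.exception() is not None:
        # report the exception without re-raising it
        print(f'Task failed: {fut.exception()}')
    else:
        # safe to retrieve the result
        print(fut.result())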
We can register multiple callbacks on each Future object; it is not limited to a single callback.
The callback functions are called in the order in which they were registered on each Future object.
The following example demonstrates having two callbacks on each Future.
# SuperFastPython.com
# example of the submit and use multiple callbacks for the ThreadPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# custom callback function called on tasks when they complete
def custom_callback1(fut):
    # retrieve the result
    print(f'Callback 1: {fut.result()}')

# custom callback function called on tasks when they complete
def custom_callback2(fut):
    # retrieve the result
    print(f'Callback 2: {fut.result()}')

# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # register the callbacks on all tasks
    for future in futures:
        future.add_done_callback(custom_callback1)
        future.add_done_callback(custom_callback2)
    # wait for tasks to complete...
Running the example, we can see that results are reported in the order that tasks were completed and that the two callback functions are called for each task in the order that we registered them with each Future object.
Callback 1: 3
Callback 2: 3
Callback 1: 9
Callback 2: 9
Callback 1: 7
Callback 2: 7
Callback 1: 2
Callback 2: 2
Callback 1: 0
Callback 2: 0
Callback 1: 5
Callback 2: 5
Callback 1: 1
Callback 2: 1
Callback 1: 8
Callback 2: 8
Callback 1: 4
Callback 2: 4
Callback 1: 6
Callback 2: 6
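The add_done_callback() method passes only the Future object to the callback. If the callback needs additional arguments, one approach is to bind them with functools.partial; a minimal sketch:

...
from functools import partial

# callback that takes an extra argument before the future
def custom_callback(prefix, fut):
    print(f'{prefix}: {fut.result()}')

# bind the extra argument when registering the callback
for future in futures:
    future.add_done_callback(partial(custom_callback, 'Result'))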
Submit and Wait for All
It is common to submit all tasks and then wait for all tasks in the thread pool to complete.
This pattern may be useful when tasks do not return a result directly, such as when each task stores its result directly in a resource like a file.
There are two ways that we can wait for tasks to complete: by calling the wait() module function or by calling shutdown().
The most likely case is that you want to explicitly wait for a set or subset of tasks in the thread pool to complete.
You can achieve this by passing the list of Future objects to the wait() function, which by default will wait for all tasks to complete.
...
# wait for all tasks to complete
wait(futures)
We can explicitly specify to wait for all tasks by setting the return_when argument to the ALL_COMPLETED constant; for example:
...
# wait for all tasks to complete
wait(futures, return_when=ALL_COMPLETED)
The example below demonstrates this pattern. Note that we are intentionally ignoring the return from calling wait() as we have no need to inspect it in this case.
# SuperFastPython.com
# example of the submit and wait for all pattern for the ThreadPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    # display the result
    print(name)

# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # wait for all tasks to complete
    wait(futures)
    print('All tasks are done!')
Running the example, we can see that results are handled by each task as the tasks complete. Importantly, we can see that the main thread waits until all tasks are completed before carrying on and printing a message.
3
9
0
8
4
6
2
1
5
7
All tasks are done!
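If we did want to inspect the return value, wait() returns a named 2-tuple of sets of Future objects: those that are done and those that are not; for example:

...
# wait for all tasks to complete and inspect the return value
done, not_done = wait(futures)
print(f'{len(done)} done, {len(not_done)} not done')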
An alternative approach would be to shut down the thread pool and wait for all executing and queued tasks to complete before moving on.
This might be preferred when we don’t have a list of Future objects or when we only intend to use the thread pool once for a set of tasks.
We can implement this pattern using the context manager; for example:
# SuperFastPython.com
# example of the submit and wait for all with shutdown pattern for the ThreadPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    # display the result
    print(name)

# start the thread pool
with ThreadPoolExecutor(10) as executor:
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # wait for all tasks to complete
print('All tasks are done!')
Running the example, we can see that the main thread does not move on and print the message until all tasks are completed, after the thread pool has been automatically shut down by the context manager.
1
2
8
4
5
3
9
0
7
6
All tasks are done!
The context manager automatic shutdown pattern might be confusing to developers not used to how thread pools work, hence the comment at the end of the context manager block in the previous example.
We can achieve the same effect without the context manager by calling shutdown() explicitly.
...
# wait for all tasks to complete and close the pool
executor.shutdown()
Recall that the shutdown() function will wait for all tasks to complete by default and will not cancel any queued tasks, but we can make this explicit by setting the wait argument to True and the cancel_futures argument to False; for example:
...
# wait for all tasks to complete and close the pool
executor.shutdown(wait=True, cancel_futures=False)
The example below demonstrates the pattern of waiting for all tasks in the thread pool to complete by calling shutdown() before moving on.
# SuperFastPython.com
# example of the submit and wait for all with shutdown pattern for the ThreadPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    # display the result
    print(name)

# start the thread pool
executor = ThreadPoolExecutor(10)
# submit tasks and collect futures
futures = [executor.submit(task, i) for i in range(10)]
# wait for all tasks to complete
executor.shutdown()
print('All tasks are done!')
Running the example, we can see that all tasks report their result as they complete and that the main thread does not move on until all tasks have completed and the thread pool has been shut down.
3
5
2
6
8
9
7
1
4
0
All tasks are done!
Submit and Wait for First
It is common to issue many tasks and only be concerned with the first result returned.
That is, not the result of the first task, but a result from any task that happens to be the first to complete its execution.
This may be the case if you are trying to access the same resource from multiple locations, like a file or some data.
This pattern can be achieved using the wait() module function and setting the return_when argument to the FIRST_COMPLETED constant.
...
# wait until any task completes
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
We must also manage the thread pool manually, constructing it directly and calling shutdown() ourselves, so that we can continue on with the execution of the main thread without waiting for all of the other tasks to complete.
The example below demonstrates this pattern and will stop waiting as soon as the first task is completed.
# SuperFastPython.com
# example of the submit and wait for first pattern for the ThreadPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
from concurrent.futures import FIRST_COMPLETED

# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name

# start the thread pool
executor = ThreadPoolExecutor(10)
# submit tasks and collect futures
futures = [executor.submit(task, i) for i in range(10)]
# wait until any task completes
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
# get the result from the first task to complete
print(done.pop().result())
# shutdown without waiting
executor.shutdown(wait=False, cancel_futures=True)
Running the example will wait for any of the tasks to complete, then retrieve the result of the first completed task and shut down the thread pool.
Importantly, tasks that are already running will continue to execute in the thread pool in the background (any queued tasks are cancelled), and the program will not exit until those running tasks have completed.
9
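If any tasks had still been queued when shutdown() was called, they would have been cancelled; we could confirm this by checking each Future, as in this minimal sketch (here no tasks are queued, as there is one worker per task):

...
# report how many tasks were cancelled by the shutdown
cancelled = [future for future in futures if future.cancelled()]
print(f'{len(cancelled)} tasks were cancelled')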
Further Reading
This section provides additional resources that you may find helpful.
Books
- ThreadPoolExecutor Jump-Start, Jason Brownlee, (my book!)
- Concurrent Futures API Interview Questions
- ThreadPoolExecutor Class API Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See Chapter 14: Threads and Processes
Guides
- Python ThreadPoolExecutor: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know the common usage patterns for the ThreadPoolExecutor in Python.
Do you have any questions about the common usage patterns?
Ask your question in the comments below and I will do my best to answer.
Paul Chamberlain says
Thanks for the good helpful examples. Here’s some constructive feedback:
I saw a note somewhere that if you pass an iterator to the “map” technique, it will read the entire list before it begins any of the work. So, for big lists of work, that might not be the right pattern.
It’s been a bit of a challenge to pass multiple parameters to map and return multiple results from the worker. It turns out that you can do it with tuples.
If the “master” wants to do something with the results from a lot of workers, then some cleverness is needed to correlate the results with each worker’s parameters.
Jason Brownlee says
Thanks for sharing Paul!
Yes, map will consume the iterator first and issue all tasks to the thread pool. No good for massive lists. An alternative may be to use submit and limit the maximum number of pending tasks, e.g.:
https://superfastpython.com/threadpoolexecutor-limit-pending-tasks/
With multiple arguments to map, you can use a compound data structure like a tuple as you suggest, or iterate over multiple lists if appropriate. For example:
https://superfastpython.com/threadpoolexecutor-map/#Example_of_Using_map_With_Multiple_Arguments