Last Updated on September 12, 2022
You can execute tasks asynchronously with the ThreadPoolExecutor by calling the map() function.
In this tutorial, you will discover how to use the map() function to execute tasks with the thread pool in Python.
Let’s get started.
Need to Call Functions in Separate Threads
You may have a for loop that calls a function for each item in an iterable like a list.
    ...
    # apply a function to each item in a list
    for item in items:
        result = task(item)
How can you make each function call in a separate thread?
Or put another way:
How can you make the for-loop concurrent?
Alternatively, you may be using the built-in map() function to apply the function to each item in an iterable for you.
    ...
    # apply the function to each element in the collection
    results = map(task, items)
This does not apply the task() function to each item until we iterate the results, so-called lazy evaluation:
    ...
    # iterate the results from map
    for result in results:
        print(result)
Therefore, it is common to see this operation consolidated to the following:
    ...
    # iterate the results from map
    for result in map(task, items):
        print(result)
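The lazy evaluation of the built-in map() can be seen directly with a small sketch. The task() function and the calls list here are hypothetical stand-ins used only to record when each call happens.

```python
# record each call so we can see when the function actually runs
calls = []

# hypothetical task used only for illustration
def task(item):
    calls.append(item)
    return item * 2

items = [1, 2, 3]
# creating the map object does not call task() yet
results = map(task, items)
calls_before = list(calls)
# iterating the map object triggers the calls, one item at a time
doubled = list(results)

print(calls_before)  # []
print(calls)         # [1, 2, 3]
print(doubled)       # [2, 4, 6]
```

No call is made until the map object is consumed, which is why the loop and the call to map() are so often written together.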
How can you execute each call by map() with threads?
How to Apply a Function to an Iterable in Separate Threads
The ThreadPoolExecutor in Python provides a pool of reusable threads for executing ad hoc tasks.
You can specify the number of threads to create in the thread pool as an argument, which does not have to be related to the number of CPUs or CPU cores in your system.
    ...
    # create a thread pool
    executor = ThreadPoolExecutor(20)
You can also submit tasks by calling the map() function and specifying the name of the function to execute and the iterable of items to which your function will be applied.
    ...
    # execute each function call in a separate thread
    results = executor.map(task, items)
Each call to the target function with one item from the iterable will be executed in a separate thread.
You can then iterate over the results as we would with map(), except we may have to wait for the results as the tasks complete in separate threads. This waiting is called “blocking” and happens automatically.
    ...
    # iterate the results from map
    for result in results:
        print(result)
Although the tasks are executed concurrently, the results are iterated in the order of the iterable provided to the map() function, the same as the built-in map() function.
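A minimal sketch of this ordering behavior is shown below. The task() function and the finished list are hypothetical; the delays are chosen so that the first item submitted is the last to finish.

```python
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# completion order is recorded here (list.append is thread-safe in CPython)
finished = []

# hypothetical task: sleeps for the given delay, then returns it
def task(delay):
    sleep(delay)
    finished.append(delay)
    return delay

# the first item takes the longest to complete
delays = [0.3, 0.2, 0.1]

with ThreadPoolExecutor(3) as executor:
    # results come back in the order of the input iterable
    ordered = list(executor.map(task, delays))

print(finished)  # completion order, typically shortest delay first
print(ordered)   # [0.3, 0.2, 0.1]
```

Iterating the results blocks on the first task even though the later tasks have already finished, so results are never reported out of order.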
In this way, we can think of the thread pool version of map() as a concurrent version of the built-in map() function. It is ideal if you are looking to update your for-loop to use threads.
We can call the map() function to execute the functions in separate threads and process the results using the common for-loop idiom, as follows:
    ...
    # iterate the results from map performed in separate threads
    for result in executor.map(task, items):
        print(result)
The map() function also takes a chunksize argument, but this has no effect with the ThreadPoolExecutor and can be safely ignored.
How to Wait for Results With Timeout
We may want to limit how long we are willing to wait for a task to complete and return a result when iterating results.
This can be achieved by setting the timeout argument when calling map() and specifying how long we are willing to wait in seconds.
    ...
    # iterate the results from map performed in separate threads, wait a limited time
    for result in executor.map(task, items, timeout=5):
        print(result)
If the next result is not available within the timeout, measured in seconds from the original call to map(), then a TimeoutError is raised that may need to be handled.
    # handle a timeout error when getting results
    # (before Python 3.11 this exception is distinct from the built-in TimeoutError)
    from concurrent.futures import TimeoutError
    try:
        # iterate the results from map performed in separate threads, wait a limited time
        for result in executor.map(task, items, timeout=5):
            print(result)
    except TimeoutError:
        print('Waited too long')
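One detail worth seeing in action is that the timeout applies to the whole iteration, counted from the call to map(), rather than to each task individually. The sketch below assumes a hypothetical task that always takes 0.3 seconds and a pool with a single worker, so the tasks run one after another.

```python
from time import sleep
from concurrent.futures import ThreadPoolExecutor, TimeoutError

# hypothetical task that always takes 0.3 seconds
def task(name):
    sleep(0.3)
    return name

received = []
timed_out = False

# a single worker runs the tasks one after another
with ThreadPoolExecutor(1) as executor:
    try:
        # each task is well under the timeout, but three in a row are not
        for result in executor.map(task, range(3), timeout=0.75):
            received.append(result)
    except TimeoutError:
        timed_out = True

print(received)   # typically [0, 1]
print(timed_out)  # typically True
```

The first two results arrive at roughly 0.3 and 0.6 seconds, and the third would arrive at about 0.9 seconds, which is past the 0.75 second limit.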
How to Wait for All Results to Complete
We don’t have to wait for the results from the call to the map() function in order to continue.
For example, we can call map() with a target task function and an iterable and then carry on with other operations.
The call to map() does not block, meaning that we do not have to wait for the results unless we explicitly iterate the results returned from the call to map().
This might be helpful if our target task function does not return a result, although you may want to use the submit() function on the ThreadPoolExecutor instead.
    ...
    # perform the tasks in separate threads
    executor.map(task, items)  # does not block
    # do other things...
Alternatively, we may want the map() function to call the target function in separate threads and wait for all tasks to complete.
This can be achieved by calling the shutdown() function of the ThreadPoolExecutor, which, by default, will wait for all scheduled and running tasks to complete before returning.
    ...
    # wait for all tasks to complete
    executor.shutdown()
Alternatively, we can use the context manager of the thread pool, which will automatically shut down the thread pool for us when we are finished and will block until all tasks have completed.
    ...
    # start the thread pool
    with ThreadPoolExecutor(20) as executor:
        # perform the tasks in separate threads
        executor.map(task, items)  # does not block
    # shutdown automatically, wait for all tasks to complete
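The difference between the non-blocking call to map() and the blocking call to shutdown() can be sketched as follows. The task() function and the done list are hypothetical and exist only to count completed tasks.

```python
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# completed task values are recorded here
done = []

# hypothetical task that returns nothing
def task(value):
    sleep(0.1)
    done.append(value)

executor = ThreadPoolExecutor(4)
# returns immediately, the tasks run in the background
executor.map(task, range(4))
count_right_after_map = len(done)
# blocks until all tasks have finished (wait=True is the default)
executor.shutdown()
count_after_shutdown = len(done)

print(count_right_after_map)  # typically 0
print(count_after_shutdown)   # 4
```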
How to Call map() With Multiple Arguments
The ThreadPoolExecutor map() function supports target functions that take more than one argument by providing more than one iterable as arguments to the call to map().
For example, we can define a target function for map that takes two arguments, then provide two iterables to the call to map(). The map() function will then apply the function with both sets of arguments and stop when the shortest iterable is exhausted.
    ...
    # call map with a target function that takes more than one argument
    executor.map(task, items1, items2)
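As a small illustration of stopping at the shortest iterable, the sketch below passes iterables of different lengths. The task() function and the data are made up for the example.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical two-argument task
def task(number, letter):
    return f'{number}{letter}'

numbers = [1, 2, 3, 4]
letters = ['a', 'b']

with ThreadPoolExecutor(2) as executor:
    # only two calls are made because letters has two items
    pairs = list(executor.map(task, numbers, letters))

print(pairs)  # ['1a', '2b']
```

The extra items in the longer iterable are silently ignored, so it is worth checking that the iterables have the lengths you expect.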
If you want to provide ad hoc arguments to your target task function, e.g. variable numbers of arguments or the same argument to multiple function calls, then you may be better off using the submit() function on the ThreadPoolExecutor.
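A minimal sketch of the submit() alternative is given below, assuming a hypothetical task() that takes a fixed prefix and an optional suffix. Each call to submit() returns a Future, and calling result() on it blocks until that task is done.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task with a shared argument and an optional one
def task(item, prefix, suffix=''):
    return f'{prefix}{item}{suffix}'

with ThreadPoolExecutor(3) as executor:
    # each call can be given its own positional and keyword arguments
    futures = [
        executor.submit(task, 1, 'id-'),
        executor.submit(task, 2, 'id-', suffix='!'),
        executor.submit(task, 3, 'id-'),
    ]
    # collect the results in submission order
    results = [future.result() for future in futures]

print(results)  # ['id-1', 'id-2!', 'id-3']
```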
Now that we are familiar with how to use the map() function to call a function using threads in Python, let’s look at some worked examples.
Example of Using map() With the ThreadPoolExecutor
Let’s explore how to use the map() function on the ThreadPoolExecutor with a worked example.
First, we can define a simple task that will sleep for a variable amount of time less than one second.
The task takes a unique name and will return a message that it has completed.
    # custom task that will sleep for a variable amount of time
    def task(name):
        # sleep for less than a second
        sleep(random())
        return f'Task: {name} done.'
This task is a good candidate for calling the built-in map() function and the ThreadPoolExecutor map() function because it is a pure function that does not have any side effects and does not access or change state outside of the function itself.
Next, we can define a thread pool with ten worker threads using the context manager.
    ...
    # start the thread pool
    with ThreadPoolExecutor(10) as executor:
        # ...
Next, we can call the map() function to apply the task() function to a range of integers from 0 to 9.
Finally, we can report the results as they are made available in the order that the tasks were submitted to the thread pool for execution.
    ...
    # execute tasks concurrently and process results in order
    for result in executor.map(task, range(10)):
        # report the result
        print(result)
Tying this together, the complete example of calling map() to apply a function to a range of data using worker threads in the ThreadPoolExecutor is listed below.
    # SuperFastPython.com
    # example of calling map and processing results
    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor

    # custom task that will sleep for a variable amount of time
    def task(name):
        # sleep for less than a second
        sleep(random())
        return f'Task: {name} done.'

    # start the thread pool
    with ThreadPoolExecutor(10) as executor:
        # execute tasks concurrently and process results in order
        for result in executor.map(task, range(10)):
            # report the result
            print(result)
Running the example first creates the thread pool, then submits the tasks to the thread pool for execution.
The tasks are completed and results are retrieved and reported as they become available in the order that tasks were submitted.
    Task: 0 done.
    Task: 1 done.
    Task: 2 done.
    Task: 3 done.
    Task: 4 done.
    Task: 5 done.
    Task: 6 done.
    Task: 7 done.
    Task: 8 done.
    Task: 9 done.
Example of Using map() With a Time Limit
We may want the results from the function calls, but are unwilling to wait an extended period.
This can be achieved by setting the timeout argument when calling the map() function.
As we process the results by iterating the value returned from calling map(), if a result is not available within the timeout (measured from the call to map()), then a TimeoutError is raised, which we can choose to handle.
The listing below updates the example to use a timeout of a small fraction of a second, and if the timeout elapses, a message is reported.
    # SuperFastPython.com
    # example of calling map and processing results with a timeout
    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import TimeoutError

    # custom task that will sleep for a variable amount of time
    def task(name):
        # sleep for less than a second
        sleep(random())
        return f'Task: {name} done.'

    # start the thread pool
    with ThreadPoolExecutor(10) as executor:
        # handle a timeout error when getting results
        try:
            # iterate the results from map performed in separate threads, wait a limited time
            for result in executor.map(task, range(10), timeout=0.05):
                print(result)
        except TimeoutError:
            print('Waited too long')
Running the example starts the thread pool, then submits ten function call tasks to the pool.
We wait a fraction of a second for the first result before a TimeoutError is raised and caught, reporting a message that we gave up waiting.
Note that if a TimeoutError is raised, it does not stop tasks that are already running in the thread pool. For example, although we were impatient for a result, the running tasks will continue and the thread pool will not close until they have completed.
    Waited too long
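To check that running tasks really do carry on after a timeout, we can record each completion in a list. This is a sketch under assumptions: the completed list and the fixed 0.2 second sleep are hypothetical choices that make the timing predictable, and there is one worker per task so every task is already running when the timeout occurs.

```python
from time import sleep
from concurrent.futures import ThreadPoolExecutor, TimeoutError

# completed task names are recorded here
completed = []

# hypothetical task that takes longer than the timeout
def task(name):
    sleep(0.2)
    completed.append(name)
    return name

timed_out = False
with ThreadPoolExecutor(3) as executor:
    try:
        for result in executor.map(task, range(3), timeout=0.05):
            pass
    except TimeoutError:
        timed_out = True
    # the tasks are still sleeping at this point
    completed_at_timeout = len(completed)
# leaving the with-block waits for the running tasks to finish

print(timed_out)             # True
print(completed_at_timeout)  # typically 0
print(sorted(completed))     # [0, 1, 2]
```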
Example of Using map() and Waiting for All Tasks to Complete
We may have a target task function that does not return a value, and yet we may want to call the function for an iterable of data and wait for all tasks to complete.
We can do this by calling map, not iterating the results and using the context manager to close the thread pool, which will wait for all tasks to complete.
First, let’s update our target task function to not return a value, but instead report a value directly via a print statement.
    # custom task that will sleep for a variable amount of time
    def task(value):
        # sleep for less than a second
        sleep(random())
        print(f'Done: {value}')
We can then submit the tasks to the thread pool via a call to map() without iterating the results.
    ...
    # submit all tasks
    executor.map(task, range(5))
The call to map() does not block, so we are able to carry on with other tasks in our program.
We can then wait for all tasks in the thread pool to complete by allowing the context manager to call shutdown() for us and return once all tasks have finished.
To make this clear, we will add a print statement that we are waiting for tasks to complete.
    ...
    # shutdown, wait for all tasks to complete
    print('Waiting...')
Once all tasks have completed, we can carry on with our program.
    ...
    print('All done.')
Tying this together, the complete example of waiting for all map() tasks to complete in the ThreadPoolExecutor is listed below.
    # SuperFastPython.com
    # example of calling map and waiting for all tasks to complete
    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor

    # custom task that will sleep for a variable amount of time
    def task(value):
        # sleep for less than a second
        sleep(random())
        print(f'Done: {value}')

    # start the thread pool
    with ThreadPoolExecutor() as executor:
        # submit all tasks
        executor.map(task, range(5))
        # shutdown, wait for all tasks to complete
        print('Waiting...')
    print('All done.')
Running the example starts the thread pool and submits five tasks by calling the map() function.
We do not iterate the results of the tasks; instead, we start waiting for the tasks to complete immediately with an implicit call to shutdown() by the context manager.
The tasks complete one by one and we then carry on with our program once all tasks have completed.
    Waiting...
    Done: 1
    Done: 3
    Done: 4
    Done: 0
    Done: 2
    All done.
Example of Using map() With Multiple Arguments
We may want to call a target task function that takes more than one argument.
This can be done with the built-in map() function and with the ThreadPoolExecutor map() function.
Both map() functions take one or more iterables and the map function will stop calling the target task function with values from each iterable when the shortest iterable is exhausted.
We can update our target task function to take two values and to return a tuple that combines the values after a short sleep.
    # custom task that will sleep for a variable amount of time
    def task(value1, value2):
        # sleep for less than a second
        sleep(random())
        return (value1, value2)
We can define two lists of data that we will use as iterables for our call to map.
    ...
    # define our data
    data1 = ['1', '2', '3']
    data2 = ['a', 'b', 'c']
Finally, we can update our call to map() to take both iterables of data.
    ...
    # submit all tasks
    for result in executor.map(task, data1, data2):
        print(result)
Tying this together, the complete example of calling map to execute function calls with two arguments asynchronously with the ThreadPoolExecutor is listed below.
    # SuperFastPython.com
    # example of calling map with multiple arguments
    from time import sleep
    from random import random
    from concurrent.futures import ThreadPoolExecutor

    # custom task that will sleep for a variable amount of time
    def task(value1, value2):
        # sleep for less than a second
        sleep(random())
        return (value1, value2)

    # define our data
    data1 = ['1', '2', '3']
    data2 = ['a', 'b', 'c']
    # start the thread pool
    with ThreadPoolExecutor() as executor:
        # submit all tasks
        for result in executor.map(task, data1, data2):
            print(result)
Running the example first starts the thread pool, then submits three function calls to the worker threads, one for each pair of values in the two iterables passed to map.
Each task returns a tuple that is printed.
    ('1', 'a')
    ('2', 'b')
    ('3', 'c')
Further Reading
This section provides additional resources that you may find helpful.
Books
- ThreadPoolExecutor Jump-Start, Jason Brownlee, (my book!)
- Concurrent Futures API Interview Questions
- ThreadPoolExecutor Class API Cheat Sheet
I also recommend specific chapters from the following books:
- Effective Python, Brett Slatkin, 2019.
- See Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python ThreadPoolExecutor: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to use the map() function to execute tasks asynchronously with the ThreadPoolExecutor.
Do you have any questions about how to use the map() function?
Ask your questions in the comments below and I will do my best to answer.