Last Updated on October 29, 2022
You can execute multiple tasks in the ThreadPool using the map() method.
In this tutorial you will discover how to use the map() method with the ThreadPool in Python.
Let’s get started.
Need a Concurrent Version of map()
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
— multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module, it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading or writing from sockets or files.
A ThreadPool can be configured when it is created, which will prepare the new threads.
We can issue one-off tasks to the ThreadPool using methods such as apply(), or we can apply the same function to an iterable of items using methods such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
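As a rough sketch of these two styles, the snippet below issues a one-off task with apply() and a batch of tasks with map(); the square() function and pool size are illustrative, not part of this tutorial:

```python
from multiprocessing.pool import ThreadPool

# a trivial target function for demonstration
def square(x):
    return x * x

with ThreadPool(2) as pool:
    # apply() issues a single one-off call and blocks for its result
    one = pool.apply(square, args=(3,))
    # map() issues one call per item and blocks until all results are ready
    many = pool.map(square, [1, 2, 3])

print(one)   # 9
print(many)  # [1, 4, 9]
```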
The built-in map() function allows you to apply a function to each item in an iterable.
The ThreadPool provides a map() method for executing multiple tasks in the pool.
How can we use the ThreadPool map() method?
How to Use ThreadPool map()
The ThreadPool provides the map() method for executing multiple tasks.
Recall that the built-in map() function will apply a given function to each item in a given iterable.
Return an iterator that applies function to every item of iterable, yielding the results.
— Built-in Functions
It yields one result returned from the given target function called with one item from a given iterable. It is common to call map() and iterate the results in a for-loop.
For example:
```python
...
# iterates results from map
for result in map(task, items):
	# ...
```
The ThreadPool provides a version of the map() function where the target function is called for each item in the provided iterable with separate worker threads.
A parallel equivalent of the map() built-in function […]. It blocks until the result is ready.
— multiprocessing — Process-based parallelism
For example:
```python
...
# iterates results from map
for result in pool.map(task, items):
	# ...
```
Each item in the iterable is taken as a separate task in the ThreadPool.
Like the built-in map() function, the returned iterator of results will be in the order of the provided iterable. This means that results are returned in the order that tasks were issued, even if the tasks complete in a different order.
Unlike the built-in map() function, the ThreadPool map() method only takes one iterable as an argument. This means that the target function executed in the thread can only take a single argument.
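A common workaround for the single-argument limitation, sketched below, is to use the pool's starmap() method (which unpacks argument tuples) or functools.partial() to freeze extra arguments; the power() function here is hypothetical:

```python
from functools import partial
from multiprocessing.pool import ThreadPool

# a target function that takes two arguments
def power(base, exponent):
    return base ** exponent

with ThreadPool(4) as pool:
    # starmap() unpacks each tuple of arguments into the target function
    via_starmap = pool.starmap(power, [(2, 1), (2, 2), (2, 3)])
    # partial() freezes the first argument so map() can supply the second
    via_partial = pool.map(partial(power, 2), [1, 2, 3])

print(via_starmap)  # [2, 4, 8]
print(via_partial)  # [2, 4, 8]
```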
The iterable of items that is passed is iterated in full in order to issue all tasks to the ThreadPool. Therefore, if the iterable is very long, many tasks may be held in memory while waiting for a worker thread to become available.
It is possible to split up the items in the iterable evenly among the worker threads.
For example, if we had a ThreadPool with 4 worker threads and an iterable with 40 items, we can split up the items into 4 chunks of 10 items, with one chunk allocated to each worker thread.
The effect is less overhead in transmitting tasks to worker threads and collecting results.
This can be achieved via the “chunksize” argument to map().
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
— multiprocessing — Process-based parallelism
For example:
```python
...
# iterates results from map with chunksize
for result in pool.map(task, items, chunksize=10):
	# ...
```
Difference Between map() and map_async()
How does the map() method compare to the map_async() method for issuing tasks to the ThreadPool?
Both map() and map_async() may be used to issue tasks that call a function on all items in an iterable via the ThreadPool.
The following summarizes the key differences between these two methods:
- The map() method blocks, whereas the map_async() method does not block.
- The map() method returns an iterable of return values from the target function, whereas the map_async() method returns an AsyncResult.
- The map() method does not support callback functions, whereas the map_async() method can execute callback functions on return values and errors.
The map() method should be used for issuing target task functions to the ThreadPool where the caller can or must block until all function calls are complete.
The map_async() method should be used for issuing target task functions to the ThreadPool where the caller cannot or must not block while the task is executing.
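These differences can be sketched in code; the triple() function is hypothetical, and the callback simply collects the list of results:

```python
from multiprocessing.pool import ThreadPool

# a trivial target function for demonstration
def triple(x):
    return 3 * x

collected = []

with ThreadPool(2) as pool:
    # map() blocks until all results are ready
    blocking = pool.map(triple, [1, 2, 3])
    # map_async() returns immediately with an AsyncResult
    async_result = pool.map_async(triple, [1, 2, 3], callback=collected.append)
    # ... the caller is free to do other work here ...
    # block until the asynchronous results are ready
    deferred = async_result.get()

print(blocking)   # [3, 6, 9]
print(deferred)   # [3, 6, 9]
print(collected)  # [[3, 6, 9]] - the callback receives the whole list once
```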
Now that we know how to use the map() method to execute tasks in the ThreadPool, let’s look at some worked examples.
Example of ThreadPool map()
We can explore how to use the map() method on the ThreadPool.
In this example, we can define a target task function that takes an integer as an argument, generates a random number, reports the value then returns the value that was generated. We can then call this function for each integer between 0 and 9 using the ThreadPool map() method.
This will apply the function to each integer concurrently, using one worker thread per logical CPU in the system by default.
Firstly, we can define the target task function.
The function takes an argument, generates a random number between 0 and 1, reports the integer and generated number. It then blocks for a fraction of a second to simulate computational effort, then returns the number that was generated.
The task() function below implements this.
```python
# task executed in a worker thread
def task(identifier):
	# generate a value
	value = random()
	# report a message
	print(f'Task {identifier} executing with {value}')
	# block for a moment
	sleep(value)
	# return the generated value
	return value
```
We can then create and configure a ThreadPool.
We will use the context manager interface to ensure the pool is shutdown automatically once we are finished with it.
```python
...
# create and configure the thread pool
with ThreadPool() as pool:
	# ...
```
We can then call the map() method on the ThreadPool to apply our task() function to each value in the range 0 to 9.
This returns an iterator over the results returned from the task() function, in the order that the tasks were issued. We will iterate over the results and report each in turn.
This can all be achieved in a for-loop.
```python
...
# execute tasks in order
for result in pool.map(task, range(10)):
	print(f'Got result: {result}')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of parallel map() with the thread pool
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
	# generate a value
	value = random()
	# report a message
	print(f'Task {identifier} executing with {value}')
	# block for a moment
	sleep(value)
	# return the generated value
	return value

# protect the entry point
if __name__ == '__main__':
	# create and configure the thread pool
	with ThreadPool() as pool:
		# execute tasks in order
		for result in pool.map(task, range(10)):
			print(f'Got result: {result}')
	# thread pool is closed automatically
```
Running the example first creates the ThreadPool with a default configuration.
It will have one worker thread for each logical CPU in your system.
The map() function is then called for the range.
This issues ten calls to the task() function, one for each integer between 0 and 9. An iterator is returned with the result for each function call, in order.
Each call to the task function generates a random number between 0 and 1, reports a message, blocks, then returns a value.
The main thread iterates over the values returned from the calls to the task() function and reports the generated values, matching those generated in each worker thread.
Importantly, all task() function calls are issued and executed before the iterator of results is returned. The caller cannot iterate over results as they are completed.
```
Task 0 executing with 0.9939823945701354
Task 1 executing with 0.47729468910203754
Task 2 executing with 0.8758836044357526
Task 3 executing with 0.5022561566448386
Task 4 executing with 0.9057454582446577
Task 5 executing with 0.2211189207208677
Task 6 executing with 0.13548680901347598
Task 7 executing with 0.12991297482843456
Task 8 executing with 0.3186675013898512
Task 9 executing with 0.6515046375113036
Got result: 0.9939823945701354
Got result: 0.47729468910203754
Got result: 0.8758836044357526
Got result: 0.5022561566448386
Got result: 0.9057454582446577
Got result: 0.2211189207208677
Got result: 0.13548680901347598
Got result: 0.12991297482843456
Got result: 0.3186675013898512
Got result: 0.6515046375113036
```
Next, let’s look at an example where we might call map() with a function that has no return value.
Example of ThreadPool map() with No Return Value
We can explore using the map() function to call a function for each item in an iterable that does not have a return value.
This means that we are not interested in the iterable of results returned by the call to map() and instead are only interested that all issued tasks get executed.
This can be achieved by updating the previous example so that the task() function does not return a value.
The updated task() function with this change is listed below.
```python
# task executed in a worker thread
def task(identifier):
	# generate a value
	value = random()
	# report a message
	print(f'Task {identifier} executing with {value}')
	# block for a moment
	sleep(value)
```
Then, in the main thread, we can call map() with our task() function and the range, and not iterate the results.
```python
...
# execute tasks, block until all completed
pool.map(task, range(10))
```
Importantly, the call to map() on the ThreadPool will block the main thread until all issued tasks are completed.
Once completed, the call will return and the ThreadPool will be closed by the context manager.
This is a helpful pattern to issue many tasks to the ThreadPool with a single function call.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of parallel map() with the thread pool and a task that does not return a value
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
	# generate a value
	value = random()
	# report a message
	print(f'Task {identifier} executing with {value}')
	# block for a moment
	sleep(value)

# protect the entry point
if __name__ == '__main__':
	# create and configure the thread pool
	with ThreadPool() as pool:
		# execute tasks, block until all completed
		pool.map(task, range(10))
	# thread pool is closed automatically
```
Running the example first creates the ThreadPool with a default configuration.
The map() function is then called for the range. This issues ten calls to the task() function, one for each integer between 0 and 9. An iterator is returned with the result for each function call, but is ignored in this case.
Each call to the task function generates a random number between 0 and 1, reports a message, then blocks.
The main thread blocks until the map() function returns.
The tasks finish, map() returns, then the ThreadPool is closed.
This example again highlights that the call to map() blocks until all issued tasks are completed.
```
Task 0 executing with 0.43501726038642363
Task 1 executing with 0.06737546043581677
Task 2 executing with 0.7243486114586718
Task 3 executing with 0.8507370154883928
Task 4 executing with 0.419007224196702
Task 5 executing with 0.8363778406379463
Task 6 executing with 0.5561916282608673
Task 7 executing with 0.6785705226822631
Task 8 executing with 0.5750485172990072
Task 9 executing with 0.23184915901578418
```
Next, let’s explore the chunksize argument to the map() function.
Example of ThreadPool map() with chunksize
The map() function will apply a function to each item in an iterable.
If the iterable has a large number of items, it may be inefficient to issue function calls to the target function for each item.
A more efficient approach would be to divide the items in the iterable into chunks and issue chunks of items to each worker thread to which the target function can be applied.
This can be achieved with the “chunksize” argument to the map() function.
We can demonstrate this with an example, first without using the “chunksize” argument, then using the “chunksize” to speed up the division of work.
Example Without Chunks
Before we demonstrate the “chunksize” argument, we can devise an example that has a reasonably large iterable.
In this example, we can update the previous example to call the task() function that returns a value as before, but to have 4 worker threads and to call the task() function 40 times, for the integers 0 to 39.
This number of worker threads and calls to task() were chosen so that we can test the “chunksize” argument in the next section. If you have fewer than 4 logical CPU cores in your system, change the example accordingly, e.g. 2 worker threads and 20 tasks.
```python
...
# create and configure the thread pool
with ThreadPool(4) as pool:
	# execute tasks, block until all complete
	pool.map(task, range(40))
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of parallel map() with the thread pool with a larger iterable
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
	# generate a value
	value = random()
	# report a message
	print(f'Task {identifier} executing with {value}')
	# block for a moment
	sleep(1)
	# return the generated value
	return value

# protect the entry point
if __name__ == '__main__':
	# create and configure the thread pool
	with ThreadPool(4) as pool:
		# execute tasks, block until all complete
		pool.map(task, range(40))
	# thread pool is closed automatically
```
Running the example first creates the ThreadPool with 4 thread workers.
The map() function is then called for the range.
This issues 40 calls to the task() function, one for each integer between 0 and 39. A list of results is returned, although it is not used in this case.
Each call to the task function generates a random number between 0 and 1, reports a message, blocks, then returns a value.
The main thread blocks until all 40 calls to the task() function are complete.
On my system, the example took about 12.2 seconds to complete.
```
Task 0 executing with 0.05101144919690426
Task 3 executing with 0.41536115411459507
Task 6 executing with 0.18432973765312977
Task 9 executing with 0.4121165717166093
Task 1 executing with 0.6526851129540462
Task 4 executing with 0.3633527093870562
Task 7 executing with 0.03836331451131314
Task 10 executing with 0.5025071772499283
Task 2 executing with 0.5917107837535422
Task 5 executing with 0.24927356845428306
Task 11 executing with 0.029055109351983965
Task 8 executing with 0.7944756165805348
Task 12 executing with 0.953161547946998
Task 15 executing with 0.9633620949523187
Task 18 executing with 0.5630690381245529
Task 21 executing with 0.7917587608087215
Task 13 executing with 0.8886629299919615
Task 22 executing with 0.7678136381876737
Task 19 executing with 0.20375196689255326
Task 16 executing with 0.5825942559782655
Task 14 executing with 0.7629464546697364
Task 17 executing with 0.6080522573065245
Task 20 executing with 0.15562873549515743
Task 23 executing with 0.3991570170366793
Task 24 executing with 0.23485439519262563
Task 27 executing with 0.060887602001631014
Task 30 executing with 0.014004149328884608
Task 33 executing with 0.8863013572225961
Task 25 executing with 0.6254970384413928
Task 28 executing with 0.06705618072486286
Task 34 executing with 0.25579916690630056
Task 31 executing with 0.1140059202602185
Task 35 executing with 0.12945687883162815
Task 29 executing with 0.8168323186884446
Task 32 executing with 0.4232992703694398
Task 26 executing with 0.7958862148609922
Task 36 executing with 0.3210403570430992
Task 39 executing with 0.43617001646541165
Task 37 executing with 0.2931899057063859
Task 38 executing with 0.2332844283750951
```
Next, let’s look at the same example using a chunksize.
Example With Chunks
We can update the previous example to use a chunksize.
This can be achieved by adding a “chunksize” argument to the call to map().
Given that we are issuing 40 calls to the task() function to the ThreadPool and that we have 4 worker threads, we can divide the work evenly into 4 groups of ten function calls.
This even division is a good starting point, but it is always a good idea to test different values of the chunksize and optimize it for the performance of your specific application with some trial and error.
This means we can set the chunksize to 10.
```python
...
# execute tasks in chunks, block until all complete
pool.map(task, range(40), chunksize=10)
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of parallel map() with the thread pool with a larger iterable and chunksize
from random import random
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task(identifier):
	# generate a value
	value = random()
	# report a message
	print(f'Task {identifier} executing with {value}')
	# block for a moment
	sleep(1)
	# return the generated value
	return value

# protect the entry point
if __name__ == '__main__':
	# create and configure the thread pool
	with ThreadPool(4) as pool:
		# execute tasks in chunks, block until all complete
		pool.map(task, range(40), chunksize=10)
	# thread pool is closed automatically
```
Running the example first creates the ThreadPool with 4 thread workers.
The map() function is then called for the range with a chunksize of 10.
This issues 4 units of work to the ThreadPool, one for each worker thread and each composed of 10 calls to the task() function.
Each call to the task function generates a random number between 0 and 1, reports a message, blocks, then returns a value.
The main thread blocks until all 40 calls to the task() function are complete; the returned results are not used in this case.
On my system, the example took about 10.2 seconds to complete. This is about 1.20x faster.
```
Task 0 executing with 0.4292359227954552
Task 10 executing with 0.8035383645900721
Task 20 executing with 0.8055058403391574
Task 30 executing with 0.5716898589126458
Task 11 executing with 0.7301860867528934
Task 1 executing with 0.9427468480637557
Task 31 executing with 0.3120724424695628
Task 21 executing with 0.8669744851178738
Task 2 executing with 0.8822181175850636
Task 22 executing with 0.5356334224776518
Task 12 executing with 0.36804922820385
Task 32 executing with 0.37973678217433093
Task 23 executing with 0.9655524667616698
Task 3 executing with 0.1994058828293016
Task 33 executing with 0.49890604894446444
Task 13 executing with 0.9088708711374434
Task 4 executing with 0.9044398148839545
Task 34 executing with 0.7193553257167641
Task 24 executing with 0.02503920599590137
Task 14 executing with 0.42626836352563635
Task 35 executing with 0.6325508905206192
Task 5 executing with 0.564840401593125
Task 25 executing with 0.004884008120288219
Task 15 executing with 0.37919302159091073
Task 6 executing with 0.2471211041819601
Task 26 executing with 0.7081653903136206
Task 36 executing with 0.4169043153310472
Task 16 executing with 0.7107127096350152
Task 37 executing with 0.8624703449972991
Task 17 executing with 0.5654652526893504
Task 27 executing with 0.5867574922576663
Task 7 executing with 0.06293018780599402
Task 18 executing with 0.524178579044118
Task 38 executing with 0.4080694402854004
Task 8 executing with 0.5836109732516527
Task 28 executing with 0.5069538921285582
Task 9 executing with 0.8417207064383568
Task 39 executing with 0.9516992338401458
Task 29 executing with 0.8145430811197704
Task 19 executing with 0.6213181769812072
```
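The best chunksize for a given workload is found by trial and error. A minimal timing harness for comparing candidate values might look like this (the work() function and candidate chunk sizes are illustrative):

```python
import time
from multiprocessing.pool import ThreadPool

# a stand-in for a real IO-bound task
def work(x):
    time.sleep(0.001)
    return x

items = list(range(200))
with ThreadPool(4) as pool:
    # time the same workload with several candidate chunk sizes
    for chunksize in (1, 5, 10, 50):
        start = time.perf_counter()
        results = pool.map(work, items, chunksize=chunksize)
        duration = time.perf_counter() - start
        print(f'chunksize={chunksize}: {duration:.3f} seconds')
```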
Further Reading
This section provides additional resources that you may find helpful.
Books
- Python ThreadPool Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- ThreadPool PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python ThreadPool: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
Takeaways
You now know how to use the map() method to issue many tasks to the ThreadPool.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Kevin Schmid on Unsplash
Bhavani says
Can I combine asyncio with thread pool? Basically the task is an ‘async def’. Will i be able to use thread pool to execute this task? If so, do you have any reference to such usage? Thanks in advance.
Jason Brownlee says
The best way to use thread pools with asyncio is via the asyncio.to_thread() function, which runs a blocking function in asyncio’s default ThreadPoolExecutor.
https://docs.python.org/3/library/asyncio-task.html#asyncio.to_thread
You would not run coroutines in worker threads, instead you would call blocking functions from coroutines in worker threads.
Does that help?
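A minimal sketch of this pattern, assuming Python 3.9+ for asyncio.to_thread() and a hypothetical blocking_io() function:

```python
import asyncio
import time

# a blocking function, e.g. file or socket IO
def blocking_io(x):
    time.sleep(0.01)
    return x * 2

async def main():
    # run the blocking calls in worker threads, concurrently
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_io, i) for i in range(5))
    )

results = asyncio.run(main())
print(results)  # [0, 2, 4, 6, 8]
```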
Francisco says
Dear Jason
Your code and clear explanations saved my assignments
I was very frustrated searching to make the simple words count in parallel
I was thinking to quit multiprocessing, and then, you show me the light with this post
good work and big thank you for enlighten us all
Regards
Francisco
Jason Brownlee says
You’re welcome Francisco, I’m happy it helped.
Hang in there!