Last Updated on October 29, 2022
You can create a thread pool using the multiprocessing.pool.ThreadPool class.
In this tutorial you will discover the ThreadPool wrapper for the multiprocessing.Pool class in Python.
Let’s get started.
What is a Thread
A thread refers to a thread of execution by a computer program.
Every Python program is a process with one thread called the main thread used to execute your program instructions. Each process is in fact one instance of the Python interpreter that executes Python instructions (Python bytecode), which is a slightly lower level than the code you type into your Python program.
Sometimes, we may need to create additional threads within our Python program to execute tasks concurrently.
Python provides real native (system-level) threads via the threading.Thread class.
A task can be run in a new thread by creating an instance of the threading.Thread class and specifying the function to run in the new thread via the target argument.
    ...
    # create and configure a new thread to run a function
    thread = Thread(target=task)
Once the thread is created, it must be started by calling the start() method.
    ...
    # start the task in a new thread
    thread.start()
We can then wait for the task to complete by joining the thread, for example:
    ...
    # wait for the task to complete
    thread.join()
We can demonstrate this with a complete example with a task that sleeps for a moment and prints a message.
The complete example of executing a target task function in a separate thread is listed below.
    # SuperFastPython.com
    # example of executing a function in a new thread
    from time import sleep
    from threading import Thread

    # task function executed in a new thread
    def task():
        # block for a moment
        sleep(1)
        # display a message
        print('This is coming from another thread')

    # create and configure a new thread to run a function
    thread = Thread(target=task)
    # start the task in a new thread
    thread.start()
    # display a message
    print('Waiting for the new thread to finish...')
    # wait for the task to complete
    thread.join()
Running the example creates the thread object to run the task() function.
The thread is started and the task() function is executed in another thread. The task sleeps for a moment; meanwhile, in the main thread, a message is printed that we are waiting around and the main thread joins the new thread waiting for it to terminate.
Finally, the new thread finishes sleeping, prints a message, and closes. The main thread then carries on and also closes as there are no more instructions to execute.
    Waiting for the new thread to finish...
    This is coming from another thread
This is useful for running one-off ad hoc tasks in a separate thread, although it becomes cumbersome when you have many tasks to run.
Each thread that is created requires the allocation of resources (e.g. memory for the thread’s stack space). The cost of setting up threads can add up if we are creating and destroying many threads over and over for ad hoc tasks.
Instead, we would prefer to keep worker threads around for reuse if we expect to run many ad hoc tasks throughout our program.
This can be achieved using a thread pool.
What is a Thread Pool
A thread pool is a programming pattern for automatically managing a pool of worker threads.
The pool is responsible for a fixed number of threads.
- It controls when they are created, such as when they are needed.
- It also controls what they should do when they are not being used, such as making them wait without consuming computational resources.
The pool can provide a generic interface for executing ad hoc tasks with a variable number of arguments, much like the target argument on the threading.Thread class, but does not require that we choose a thread to run the task, start the thread, or wait for the task to complete.
Python provides a thread pool via the concurrent.futures.ThreadPoolExecutor class.
Python also provides a thread pool via the multiprocessing.pool.ThreadPool class.
What is the ThreadPool Class
The multiprocessing.pool.ThreadPool class provides a thread pool in Python.
It allows tasks to be submitted as functions to the thread pool to be executed concurrently.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
— multiprocessing — Process-based parallelism
The ThreadPool class is in the multiprocessing module, rather than the threading module because it provides a thread-based wrapper for the multiprocessing.pool.Pool class.
A ThreadPool shares the same interface as Pool, which is designed around a pool of processes …
— multiprocessing — Process-based parallelism
Because ThreadPool is a wrapper for Pool, it does have some aspects that can be confusing initially, such as the number of workers being called “processes”.
To use the thread pool, we must first create and configure an instance of the class.
For example:
    ...
    # create a thread pool
    pool = multiprocessing.pool.ThreadPool(...)
Once configured, tasks can be submitted to the pool for execution using blocking and asynchronous versions of apply() and map().
For example:
    ...
    # issues tasks for execution
    results = pool.map(task, items)
Once we have finished with the thread pool, it can be closed and resources used by the pool can be released.
For example:
    ...
    # close the thread pool
    pool.close()
Next, let’s take a closer look at each of these aspects in turn.
How to Configure the ThreadPool
The thread pool is configured via the class constructor.
All arguments to the constructor are optional, therefore it is possible to create a thread pool with all default configuration by providing no arguments.
For example:
    ...
    # create a thread pool with a default configuration
    pool = multiprocessing.pool.ThreadPool()
The first argument is “processes”, which specifies the number of workers to create and manage within the pool. These will be worker threads, not worker processes; the name of the argument is a holdover from the Pool class.
By default this equals the number of logical CPUs in your system.
processes is the number of worker threads to use. If processes is None then the number returned by os.cpu_count() is used.
— multiprocessing — Process-based parallelism
For example, if you had 4 physical CPU cores with hyperthreading, this would mean you would have 8 logical CPU cores and this would be the default number of workers in the thread pool.
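As a rough check of the default, the sketch below prints the value reported by os.cpu_count() and then issues a batch of short blocking tasks to a pool created with no arguments, recording which worker threads ran them. The task function which_thread() and the number of tasks are illustrative choices, and the number of distinct threads observed can be lower than the pool size if a few workers happen to pick up all of the work.

```python
import os
from threading import current_thread
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task: block briefly, then report the name of the worker thread
def which_thread(_):
    sleep(0.05)
    return current_thread().name

# number of logical CPUs reported by the OS (can be None on some platforms)
logical_cpus = os.cpu_count() or 1

# create a pool with the default number of workers and issue some tasks
with ThreadPool() as pool:
    names = set(pool.map(which_thread, range(logical_cpus * 4)))

print(f'logical CPUs: {logical_cpus}')
print(f'distinct worker threads observed: {len(names)}')
```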
In practice, we may have many more worker threads than we have CPU cores in our system, such as hundreds or thousands of threads.
For example:
    ...
    # create a thread pool with a given number of workers
    pool = multiprocessing.pool.ThreadPool(processes=100)
Each worker thread may need to prepare resources that are then used by the tasks it executes.
A good example might be access to a file, socket, or logger.
This can be achieved by configuring an initializer function to be executed by each worker thread as it starts. The “initializer” argument specifies the function and the “initargs” argument specifies any arguments to the function.
For example:
    # initialization function for worker threads
    def init_worker(log_name):
        # ...
        pass

    ...
    # create a thread pool with initialized workers
    pool = multiprocessing.pool.ThreadPool(initializer=init_worker, initargs=(arg1,))
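To make this concrete, here is a minimal runnable sketch in which the initializer stores a per-thread label in thread-local storage and each task reads it back. The use of threading.local(), the names local and label, and the 'worker' prefix are assumptions made for illustration; a real program would more likely open a file, socket, or logger here.

```python
import threading
from multiprocessing.pool import ThreadPool

# thread-local storage: each worker thread sees its own attributes
local = threading.local()

# initialization function, called once in each worker thread as it starts
def init_worker(prefix):
    local.label = f'{prefix}-{threading.current_thread().name}'

# task that uses the resource prepared by the initializer
def task(item):
    return f'{local.label} handled {item}'

# create a thread pool with two initialized workers
with ThreadPool(processes=2, initializer=init_worker, initargs=('worker',)) as pool:
    results = pool.map(task, range(4))

for line in results:
    print(line)
```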
Unlike the multiprocessing.pool.Pool class, the multiprocessing.pool.ThreadPool does not support a “maxtasksperchild” argument to limit the number of tasks executed by a worker.
Now that we know how to configure the thread pool, let’s look at how we might submit tasks.
How to Submit Tasks to the ThreadPool
There are four ways to issue tasks to the thread pool.
They are: apply() for one-off tasks, and map(), starmap(), and imap()/imap_unordered() for multiple tasks.
The apply() method will submit a task to the thread pool that executes a function with the given arguments.
It is used for one-off function calls to be executed by a worker thread and specifies the function to execute and any arguments to the function. It will block until the task is completed.
For example:
    ...
    # execute a function call by the thread pool
    result = pool.apply(task)
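Arguments are passed to apply() via the args and kwds parameters. The short sketch below assumes a made-up task() function that takes two positional arguments and one keyword argument.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task that takes positional and keyword arguments
def task(x, y, scale=1):
    return (x + y) * scale

with ThreadPool() as pool:
    # blocks until the worker thread has finished the call
    result = pool.apply(task, args=(2, 3), kwds={'scale': 10})

print(result)  # 50
```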
The map() method is a parallel version of the built-in map() function.
Recall that the built-in map() function will call a specified function for each item on a provided iterable.
The map() method on the thread pool performs the same action, except that each function call on an item in an iterable is executed by a worker thread.
A common idiom for using map() is to iterate the results returned from the function call.
For example:
    ...
    # execute tasks in the thread pool and handle results
    for result in pool.map(task, items):
        # ...
Unlike the built-in map() function, the pool version of map() only supports one iterable instead of multiple iterables, and all items are iterated and submitted as tasks up front instead of being lazily evaluated.
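A small runnable sketch of map() follows, assuming a toy task() that sleeps briefly and squares its input. The results come back as a list in the same order as the input items, regardless of which worker finished first.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task: block for a moment, then return the square of the input
def task(value):
    sleep(0.1)
    return value * value

with ThreadPool(processes=4) as pool:
    # blocks until all tasks are done; results are in input order
    results = pool.map(task, range(8))

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
```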
The starmap() method can be used for a version of map() that supports a target function that handles multiple arguments.
Like map() except that the elements of the iterable are expected to be iterables that are unpacked as arguments. Hence an iterable of [(1,2), (3, 4)] results in [func(1,2), func(3,4)].
— multiprocessing — Process-based parallelism
For example:
    ...
    # prepare arguments
    args = [(1,2), (3,4), (5,6)]
    # execute tasks in the thread pool and handle results
    for result in pool.starmap(task, args):
        # ...
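The same idea as a complete program, assuming an illustrative two-argument task() that raises a base to an exponent:

```python
from multiprocessing.pool import ThreadPool

# hypothetical task that takes two arguments
def task(base, exponent):
    return base ** exponent

# each tuple is unpacked into the arguments of one call to task()
args = [(2, 3), (3, 2), (10, 0)]

with ThreadPool() as pool:
    results = pool.starmap(task, args)

print(results)  # [8, 9, 1]
```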
The imap() method provides a lazier version of the map() method on the thread pool. That is, it does not convert the whole iterable into a list up front; items are taken from the iterable one at a time (or one chunk at a time) as tasks are issued to the thread pool.
It also yields task return values in the same lazy manner, as they become available. The imap_unordered() method is the same, except it yields task return values in the order that tasks are completed rather than the order they were issued.
For example:
    ...
    # execute tasks in the thread pool and handle results
    for result in pool.imap(task, items):
        # ...
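The difference in result ordering between imap() and imap_unordered() can be sketched with tasks of different durations. The task() function and the delays below are invented for the demonstration, and the exact order yielded by imap_unordered() depends on timing.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task: sleep for the given delay, then return it
def task(delay):
    sleep(delay)
    return delay

delays = [0.3, 0.1, 0.2]

with ThreadPool(processes=3) as pool:
    # results are yielded in the order the tasks were issued
    ordered = list(pool.imap(task, delays))
    # results are yielded in the order the tasks complete
    unordered = list(pool.imap_unordered(task, delays))

print(ordered)    # [0.3, 0.1, 0.2]
print(unordered)  # typically shortest delay first, but this depends on timing
```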
Most methods for issuing tasks to the thread pool have both a blocking (synchronous) and a non-blocking (asynchronous) version.
Recall that a blocking function call will not return until the function has finished, whereas a non-blocking function call will return immediately and provide a handle for getting the result of the function call later.
The methods for issuing tasks to the thread pool listed above, apply(), map(), and starmap() are all blocking function calls.
The non-blocking or asynchronous versions of these methods have a _async suffix, for example:
- apply_async()
- map_async()
- starmap_async()
All async versions of the methods allow callback and error handling functions to be specified and called automatically once the issued task is complete.
The async methods return immediately with an instance of the AsyncResult class that provides a mechanism for getting the results of the submitted tasks once they are completed.
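As a sketch of the callback mechanism, the example below issues one task that succeeds and one that fails, using the callback and error_callback arguments of apply_async(). The task() function and the results and errors lists are illustrative names, not part of the API.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task that fails for negative inputs
def task(value):
    if value < 0:
        raise ValueError('negative input')
    return value * 2

results = []  # filled in by the result callback
errors = []   # filled in by the error callback

with ThreadPool() as pool:
    ok = pool.apply_async(task, args=(21,),
                          callback=results.append, error_callback=errors.append)
    bad = pool.apply_async(task, args=(-1,),
                           callback=results.append, error_callback=errors.append)
    # wait for both tasks (and their callbacks) to finish
    ok.wait()
    bad.wait()

print(results)  # [42]
print(errors)   # one ValueError instance
```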
Next, let’s take a closer look at the AsyncResult class.
What is the AsyncResult Class
The multiprocessing.pool.AsyncResult represents a result from a task issued asynchronously to the ThreadPool class (or the Pool class).
It provides a mechanism to check the status, wait for, and get the result for a task executed asynchronously in the thread pool.
An instance of the multiprocessing.pool.AsyncResult class is returned by each call to the apply_async(), map_async(), and starmap_async() methods.
For example, a call to map_async() for a function task() with an iterable of ten items will return a single multiprocessing.pool.AsyncResult instance, whose get() method returns a list of ten return values.
For example:
    ...
    # submit tasks to the pool in a non-blocking manner
    async_result = pool.map_async(task, items)
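A short sketch, assuming a trivial task() function, confirms that map_async() hands back one AsyncResult for the whole batch and that calling get() on it produces the list of return values.

```python
from multiprocessing.pool import ThreadPool, AsyncResult

# hypothetical task
def task(value):
    return value + 1

with ThreadPool() as pool:
    # one handle is returned for the whole batch of ten tasks
    async_result = pool.map_async(task, range(10))
    is_single_handle = isinstance(async_result, AsyncResult)
    # get() blocks until all tasks are done and returns their results in order
    values = async_result.get()

print(is_single_handle)  # True
print(values)            # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```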
For a single task represented via a multiprocessing.pool.AsyncResult, we can check if the task is completed via the ready() method which returns True if the task is completed (successfully or with an error) or False otherwise.
For example:
    ...
    # check if a task is done
    if async_result.ready():
        # ...
A task may complete successfully or may raise an Error or Exception. We can check if a task completed successfully via the successful() method. If the task is still running, calling successful() raises a ValueError.
For example:
    ...
    # check if a task was completed successfully
    if async_result.successful():
        # ...
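The status methods can be exercised together in a small sketch. It assumes a task() that sleeps for half a second, so the first checks run while the task is still in progress, and it assumes Python 3.7 or later, where successful() raises ValueError for an unfinished task.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task that takes a moment to complete
def task():
    sleep(0.5)
    return 'done'

with ThreadPool() as pool:
    async_result = pool.apply_async(task)
    # the task is still sleeping, so it is not ready yet
    ready_early = async_result.ready()
    try:
        async_result.successful()
        raised_early = False
    except ValueError:
        # successful() cannot answer until the task has finished
        raised_early = True
    # block until the task finishes, then check again
    async_result.wait()
    ready_late = async_result.ready()
    ok = async_result.successful()

print(ready_early, raised_early, ready_late, ok)  # False True True True
```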
We can wait for a task to complete via the wait() method.
If called with no argument, the call will block until the task finishes. A “timeout” can be provided so that the call will return after a fixed number of seconds if the task has not completed.
For example:
    ...
    # wait 10 seconds for the task to complete
    async_result.wait(timeout=10)
Finally, we can get the result from the task via the get() method.
If the task is finished, then get() will return immediately. If the task is still running, a call to get() will not return until the task finishes and returns the result.
For example:
    ...
    # get the result of a task
    result = async_result.get()
If an exception was raised while the task was being executed, it is re-raised by the get() method in the parent thread.
Finally, a “timeout” argument can be specified when getting the result. The call will return when the task is finished or, if the fixed number of seconds elapses first, a multiprocessing.TimeoutError is raised.
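The timeout behavior of get() can be sketched as follows, assuming a task() that takes longer than the first timeout. The first call gives up after 0.1 seconds, and the second call waits for the result.

```python
import multiprocessing
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task that takes longer than the first timeout below
def task():
    sleep(0.5)
    return 'done'

with ThreadPool() as pool:
    async_result = pool.apply_async(task)
    try:
        # give up after 0.1 seconds; the task keeps running in the pool
        async_result.get(timeout=0.1)
        timed_out = False
    except multiprocessing.TimeoutError:
        timed_out = True
    # a second call without a timeout blocks until the result is available
    value = async_result.get()

print(timed_out)  # True
print(value)      # done
```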
Next, let’s look at how we might close down the thread pool.
How to Close the ThreadPool
The thread pool can be closed once we have no further tasks to issue.
We can call the close() method to close down the thread pool once all the currently issued tasks have completed.
The close() method will return immediately and the pool will not take any further tasks.
For example:
    ...
    # close the thread pool
    pool.close()
The pool can also be closed by calling the terminate() method.
On the Pool class this will forcefully terminate the worker processes, even if they are executing tasks. This cannot be achieved with the thread workers in the ThreadPool, because Python threads cannot be stopped from the outside. As such, tasks that are already running are allowed to finish, although tasks that have been issued but not yet started may be discarded.
For example:
    ...
    # forcefully close all worker threads
    pool.terminate()
We may want to then wait for all tasks in the pool to finish.
This can be achieved by calling the join() method on the pool.
For example:
    ...
    # wait for all issued tasks to complete
    pool.join()
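Putting close() and join() together, a manual shutdown might look like the sketch below. The task() function and the shared completed list are made up for the demonstration.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

completed = []  # values recorded by finished tasks

# hypothetical task: block for a moment, then record that it ran
def task(value):
    sleep(0.1)
    completed.append(value)

pool = ThreadPool(processes=2)
# issue four tasks without waiting for them
for i in range(4):
    pool.apply_async(task, args=(i,))
# no further tasks will be accepted, but issued tasks still run
pool.close()
# wait for all issued tasks to complete and the workers to exit
pool.join()

print(sorted(completed))  # [0, 1, 2, 3]
```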
Alternatively, we may use the context manager interface with the ThreadPool class.
This will mean that all usage of the pool will be limited to the context manager block, and once this block is exited (normally or by an error), the pool will be shut down automatically. Note that exiting the block calls terminate() rather than close(), so any results should be collected before the block ends.
For example:
    ...
    # create the thread pool
    with multiprocessing.pool.ThreadPool() as pool:
        # ...
This is the preferred way to use the thread pool, if possible.
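A minimal sketch of the context manager pattern with asynchronous tasks is given below, assuming an illustrative task() that upper-cases a string. The results are retrieved inside the block because the pool is terminated when the block exits.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task
def task(value):
    return value.upper()

with ThreadPool(processes=2) as pool:
    async_result = pool.map_async(task, ['a', 'b', 'c'])
    # collect the results before leaving the block, as exiting terminates the pool
    results = async_result.get()

print(results)  # ['A', 'B', 'C']
```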
Next, let’s look at an example of using the thread pool.
Example of Using the ThreadPool
Now that we are familiar with how to use the ThreadPool class, let’s look at a worked example.
We can update the earlier example of executing a task function in a new thread and waiting for it to complete.
In this case, we can create the ThreadPool instance using the context manager interface.
For example:
    ...
    # create a thread pool
    with ThreadPool() as pool:
        # ...
We can then issue the task asynchronously to the pool using the apply_async() method which will return immediately with an AsyncResult object.
    ...
    # execute the task asynchronously
    async_result = pool.apply_async(task)
We can report a message, then wait for the task to complete by calling the wait() method on the AsyncResult object.
    ...
    # display a message
    print('Waiting for the new thread to finish...')
    # wait for the task to complete
    async_result.wait()
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of executing a function in a new thread using a threadpool
    from time import sleep
    from multiprocessing.pool import ThreadPool

    # task function executed in a new thread
    def task():
        # block for a moment
        sleep(1)
        # display a message
        print('This is coming from another thread')

    # create a thread pool
    with ThreadPool() as pool:
        # execute the task asynchronously
        async_result = pool.apply_async(task)
        # display a message
        print('Waiting for the new thread to finish...')
        # wait for the task to complete
        async_result.wait()
Running the example first starts the thread pool.
This will start the default number of worker threads in the pool, waiting and ready for work.
Next the task function is issued to the pool asynchronously. This call returns immediately with an AsyncResult object.
The main thread then reports a message and waits on the AsyncResult object for the task to complete.
A worker thread in the pool then executes the task, blocking for a moment and reporting a message.
The task completes and the main thread continues on.
The thread pool is closed automatically by the context manager interface and the program exits.
    Waiting for the new thread to finish...
    This is coming from another thread
Takeaways
You now know how to create and use a thread pool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.