Last Updated on October 29, 2022
You can initialize worker threads in the ThreadPool by setting the “initializer” argument in the multiprocessing.pool.ThreadPool class constructor.
In this tutorial you will discover how to initialize worker threads in the ThreadPool in Python.
Let’s get started.
Need to Initialize Worker Threads
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
— multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module, it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading from or writing to sockets or files.
A ThreadPool is configured when it is created, at which point its worker threads are started.
We can issue one-off tasks to the ThreadPool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
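As a quick illustration of these four methods, here is a minimal sketch. The square() task and the demo() helper are hypothetical names used only for this example.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task used only for illustration
def square(x):
    return x * x

def demo():
    with ThreadPool(2) as pool:
        # one-off task, blocks until the result is ready
        single = pool.apply(square, args=(3,))
        # same function applied to each item, blocks until all are done
        mapped = pool.map(square, range(5))
        # asynchronous versions return an AsyncResult immediately
        single_async = pool.apply_async(square, args=(4,))
        mapped_async = pool.map_async(square, range(3))
        # get() blocks until the result is available
        return single, mapped, single_async.get(), mapped_async.get()

# protect the entry point
if __name__ == '__main__':
    print(demo())
```

The results are retrieved inside the context manager block, because leaving the block terminates the pool.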
When using a ThreadPool, we may need to initialize a variable, data, or resource to be used by each worker thread across all tasks executed by that worker.
For example, perhaps each thread is required to have its own handle for logging or connection to a remote server to be held open and reused when executing tasks.
We need a way of calling a function to initialize each worker thread in the ThreadPool, prior to executing any tasks.
How can we initialize worker threads in the ThreadPool?
How to Initialize Worker Threads
We can configure worker threads in the ThreadPool to execute an initialization function prior to executing tasks.
This can be achieved by setting the “initializer” argument when configuring a ThreadPool object via the class constructor.
By default, there is no initializer function and “initializer” is set to None.
The “initializer” argument can be set to the name of a function that will be called to initialize the worker threads.
If initializer is not None then each worker process will call initializer(*initargs) when it starts.
— multiprocessing — Process-based parallelism
For example:
```python
import multiprocessing.pool

# worker thread initialization function
def worker_init():
    # ...
    pass

...
# create a thread pool and initialize workers
pool = multiprocessing.pool.ThreadPool(initializer=worker_init)
```
If our worker thread initialization function takes arguments, they can be specified to the thread pool constructor via the “initargs” argument, which takes an ordered list or tuple of arguments for the custom initialization function.
For example:
```python
import multiprocessing.pool

# worker thread initialization function
def worker_init(arg1, arg2, arg3):
    # ...
    pass

...
# create a thread pool and initialize workers
# (arg1, arg2 and arg3 are placeholders for your own values)
pool = multiprocessing.pool.ThreadPool(initializer=worker_init, initargs=(arg1, arg2, arg3))
```
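To make the “initargs” argument concrete, here is a small runnable sketch with real values. The argument names (prefix, retries), the received list and the run() helper are assumptions made up for this illustration, not part of the ThreadPool API.

```python
from multiprocessing.pool import ThreadPool

# records the arguments that each worker's initializer received
received = []

# initializer taking two hypothetical arguments (names chosen for illustration)
def worker_init(prefix, retries):
    received.append((prefix, retries))

# trivial task so the pool has something to do
def task(x):
    return x + 1

def run():
    received.clear()
    # the same initargs tuple is passed to the initializer in every worker
    pool = ThreadPool(2, initializer=worker_init, initargs=('worker', 3))
    results = pool.map(task, [1, 2, 3])
    # close and join so that all workers have finished before we inspect the list
    pool.close()
    pool.join()
    return results, list(received)

# protect the entry point
if __name__ == '__main__':
    print(run())
```

Note that “initargs” is unpacked into positional arguments, so a single argument should be passed as a one-element tuple, e.g. initargs=(value,).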
Now that we know how to initialize worker threads in the ThreadPool, let’s look at a worked example.
Example of Initializing Worker Threads
We can develop an example of calling a custom function to initialize each thread in the ThreadPool.
In this example we will define a task to simulate work that will report a message and block for a moment. We will also define a simple worker thread initialization function that will report a message. We will then configure a ThreadPool to initialize the workers with our initialization function and execute a number of tasks.
The messages will show that the workers are initialized once, right after they are created in the thread pool, and that each thread then executes tasks until all work is complete.
Firstly, we can define a function to execute tasks in the thread pool. The function will report a message and block for a moment to simulate computational effort.
The task() function below implements this.
```python
# task executed in a worker thread
def task():
    # report a message
    print('Worker executing task...')
    # block for a moment
    sleep(1)
```
Next, we can define a function to initialize the worker threads.
The initialization function won’t do anything interesting in this case, just report a message so we have some idea that the initialization function was called and when it was called, e.g. once, before executing tasks.
The initialize_worker() function below implements this.
```python
# initialize a worker in the thread pool
def initialize_worker():
    # report a message
    print('Initializing worker...')
```
Next, we can create a ThreadPool and configure it to use our initialize_worker() function to initialize each worker thread.
We will create the ThreadPool using the context manager interface and configure it to create two worker threads.
```python
...
# create and configure the thread pool
with ThreadPool(2, initializer=initialize_worker) as pool:
    # ...
```
Next, we will issue four tasks to the ThreadPool to be executed asynchronously. There are only two workers, so only two tasks will be executed at a time.
```python
...
# issue tasks to the thread pool
for _ in range(4):
    _ = pool.apply_async(task)
```
Finally, we will close down the ThreadPool and wait for all tasks to complete.
```python
...
# close the thread pool
pool.close()
# wait for all tasks to complete
pool.join()
```
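To see what close() and join() do in practice, here is a minimal sketch. It assumes a hypothetical task that returns its argument, and it relies on the pool raising a ValueError when a task is issued after close(), which is the behavior in recent Python 3 versions.

```python
from time import sleep
from multiprocessing.pool import ThreadPool

# hypothetical task that blocks briefly and returns its argument
def task(i):
    sleep(0.1)
    return i

def run():
    pool = ThreadPool(2)
    # issue tasks asynchronously and keep the AsyncResult objects
    results = [pool.apply_async(task, args=(i,)) for i in range(4)]
    # close the pool: already issued tasks still run, new tasks are refused
    pool.close()
    try:
        pool.apply_async(task, args=(99,))
        rejected = False
    except ValueError:
        rejected = True
    # wait for all issued tasks to complete
    pool.join()
    # after join() every result is ready and get() returns immediately
    return rejected, [r.ready() for r in results], [r.get() for r in results]

# protect the entry point
if __name__ == '__main__':
    print(run())
```

Keeping the AsyncResult objects, rather than discarding them, is what lets us check on or collect the return values after the pool has been joined.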
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of initializing worker threads in the thread pool
from time import sleep
from multiprocessing.pool import ThreadPool

# task executed in a worker thread
def task():
    # report a message
    print('Worker executing task...')
    # block for a moment
    sleep(1)

# initialize a worker in the thread pool
def initialize_worker():
    # report a message
    print('Initializing worker...')

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool(2, initializer=initialize_worker) as pool:
        # issue tasks to the thread pool
        for _ in range(4):
            _ = pool.apply_async(task)
        # close the thread pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
```
Running the example first creates and configures the ThreadPool.
The worker threads are created and started as part of constructing the pool, and each one calls the initialization function as it starts.
Tasks are then issued to the pool and the initialized workers begin executing them.
Importantly, each worker thread is initialized once, and only before it begins consuming and completing tasks in the pool.
```
Initializing worker...
Initializing worker...
Worker executing task...
Worker executing task...
Worker executing task...
Worker executing task...
```
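We can also check the "once per worker" behavior programmatically rather than by reading the printed messages. The sketch below counts initializer calls; the counter, the lock and the count_initializations() helper are hypothetical names introduced for this illustration.

```python
import threading
from multiprocessing.pool import ThreadPool

# shared counter, protected by a lock because several workers update it
init_calls = 0
init_lock = threading.Lock()

# initialize a worker by incrementing the shared counter
def initialize_worker():
    global init_calls
    with init_lock:
        init_calls += 1

# task that does nothing, we only care how many times workers are initialized
def task():
    pass

def count_initializations(n_workers, n_tasks):
    global init_calls
    init_calls = 0
    pool = ThreadPool(n_workers, initializer=initialize_worker)
    for _ in range(n_tasks):
        pool.apply_async(task)
    # close and join so that every worker has finished before we read the counter
    pool.close()
    pool.join()
    return init_calls

# protect the entry point
if __name__ == '__main__':
    print(count_initializations(2, 10))
    print(count_initializations(3, 0))
```

The count should match the number of workers rather than the number of tasks, even when no tasks are issued at all.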
Names of Threads That Initialize Workers
It may be helpful to know which thread is used to initialize the worker threads.
For example, if the thread that runs the initialization function is the same worker thread that later executes tasks, then data prepared during initialization, such as thread-local data, is available to every task that worker runs. Global variables are shared by all threads in the process, so they can be used as well.
We can get the current thread in the initialization function and report its name. The same can be done in the task function executed by the worker thread, and we can see if the names match.
This can be achieved using the threading.current_thread() function.
We can update the example in the previous section to first get the current thread, then report its name.
Firstly, we can update the task function executed by worker threads to report the current thread name.
```python
# task executed in a worker thread
def task():
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Worker executing task, thread={thread.name}')
    # block for a moment
    sleep(1)
```
Next, we can update the worker thread initialization function to report the thread name.
```python
# initialize a worker in the thread pool
def initialize_worker():
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Initializing worker, thread={thread.name}')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of initializing workers in a thread pool and reporting thread names
from time import sleep
from multiprocessing.pool import ThreadPool
from threading import current_thread

# task executed in a worker thread
def task():
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Worker executing task, thread={thread.name}')
    # block for a moment
    sleep(1)

# initialize a worker in the thread pool
def initialize_worker():
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Initializing worker, thread={thread.name}')

# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool(2, initializer=initialize_worker) as pool:
        # issue tasks to the thread pool
        for _ in range(4):
            _ = pool.apply_async(task)
        # close the thread pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
```
Running the example creates the thread pool and configures it so that worker threads are initialized using our custom function, as before.
Four tasks are issued to the thread pool and the main thread waits for all tasks to complete.
The name of the worker thread within the ThreadPool is reported both when initializing the workers and in executing the task.
Interestingly, we can see that the same thread that executes the task is used to initialize the worker.
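For example, the first worker thread has the name “Thread-1” and executes both the initialization function and the task function. The exact names may differ by Python version; from Python 3.10 the default thread name also includes the target function, e.g. “Thread-1 (worker)”.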
This highlights that we can use mechanisms like global variables and thread-local storage in worker threads to share data across tasks executed by worker threads.
```
Initializing worker, thread=Thread-1
Initializing worker, thread=Thread-2
Worker executing task, thread=Thread-1
Worker executing task, thread=Thread-2
Worker executing task, thread=Thread-1
Worker executing task, thread=Thread-2
```
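Because the initializer and the tasks run in the same worker thread, thread-local storage is one way to give each worker its own resource. The sketch below is one possible approach, not the only one; the resource string stands in for something like a connection or log handle, and the names local, resource and run() are hypothetical.

```python
import threading
from multiprocessing.pool import ThreadPool

# thread-local storage: each thread sees its own attributes on this object
local = threading.local()

# initialize a worker by creating its own (hypothetical) resource
def initialize_worker():
    name = threading.current_thread().name
    local.resource = f'resource-for-{name}'

# task that reuses the resource created by this worker's initializer
def task(_):
    return threading.current_thread().name, local.resource

def run(n_tasks=6):
    pool = ThreadPool(2, initializer=initialize_worker)
    results = pool.map(task, range(n_tasks))
    pool.close()
    pool.join()
    return results

# protect the entry point
if __name__ == '__main__':
    for thread_name, resource in run():
        print(f'thread={thread_name}, resource={resource}')
```

Each task should see the resource created by the worker thread it runs on, and no more than two distinct resources should appear because the pool has two workers.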
Further Reading
This section provides additional resources that you may find helpful.
Books
- Python ThreadPool Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- ThreadPool PDF Cheat Sheet
I also recommend specific chapters from the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python ThreadPool: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python Threading: The Complete Guide
Takeaways
You now know how to initialize workers in the ThreadPool class in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.