ThreadPool Initialize Worker Threads in Python
You can initialize worker threads in the ThreadPool by setting the "initializer" argument in the multiprocessing.pool.ThreadPool class constructor.
In this tutorial you will discover how to initialize worker threads in the ThreadPool in Python.
Let's get started.
Need to Initialize Worker Threads
The multiprocessing.pool.ThreadPool in Python provides a pool of reusable threads for executing ad hoc tasks.
A thread pool object which controls a pool of worker threads to which jobs can be submitted.
-- multiprocessing — Process-based parallelism
The ThreadPool class extends the Pool class. The Pool class provides a pool of worker processes for process-based concurrency.
Although the ThreadPool class is in the multiprocessing module it offers thread-based concurrency and is best suited to IO-bound tasks, such as reading or writing from sockets or files.
A ThreadPool can be configured when it is created, which will prepare the new threads.
We can issue one-off tasks to the ThreadPool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
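As a rough illustration of the difference between the blocking and asynchronous functions, here is a minimal sketch. The square() function and run_demo() helper are hypothetical names used only for this illustration.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task used only for illustration
def square(x):
    return x * x

# issue the same work using blocking and asynchronous functions
def run_demo():
    with ThreadPool(2) as pool:
        # one-off task, blocks until the result is ready
        single = pool.apply(square, (3,))
        # same function applied to each item, blocks until all results are ready
        mapped = pool.map(square, [1, 2, 3])
        # asynchronous versions return an AsyncResult immediately
        async_single = pool.apply_async(square, (4,))
        async_mapped = pool.map_async(square, [4, 5, 6])
        # get() blocks until the result is available
        return single, mapped, async_single.get(), async_mapped.get()

# protect the entry point
if __name__ == '__main__':
    print(run_demo())
```

The asynchronous calls return straight away, so the caller is free to do other work before calling get() to collect the results.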
When using a ThreadPool, we may need to initialize a variable, data, or resource to be used by each worker thread across all tasks executed by that worker.
For example, perhaps each thread is required to have its own handle for logging or connection to a remote server to be held open and reused when executing tasks.
We need a way of calling a function to initialize each worker thread in the ThreadPool, prior to executing any tasks.
How can we initialize worker threads in the ThreadPool?
How to Initialize Worker Threads
We can configure worker threads in the ThreadPool to execute an initialization function prior to executing tasks.
This can be achieved by setting the "initializer" argument when configuring a ThreadPool object via the class constructor.
By default, there is no initializer function and "initializer" is set to None.
The "initializer" argument can be set to the name of a function that will be called to initialize the worker threads.
If initializer is not None then each worker process will call initializer(*initargs) when it starts.
-- multiprocessing — Process-based parallelism
For example:
import multiprocessing.pool
# worker thread initialization function
def worker_init():
    # ...
    ...
# create a thread pool and initialize workers
pool = multiprocessing.pool.ThreadPool(initializer=worker_init)
If our worker thread initialization function takes arguments, they can be specified to the thread pool constructor via the "initargs" argument, which takes an ordered list or tuple of arguments for the custom initialization function.
For example:
import multiprocessing.pool
# worker thread initialization function
def worker_init(arg1, arg2, arg3):
    # ...
    ...
# create a thread pool and initialize workers
pool = multiprocessing.pool.ThreadPool(initializer=worker_init, initargs=(arg1, arg2, arg3))
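To make the "initargs" argument concrete, here is a small runnable sketch. The config dictionary, the prefix and retries arguments, and the run_demo() helper are hypothetical and chosen only for illustration. The initializer stores its arguments in a global variable that tasks later read.

```python
from multiprocessing.pool import ThreadPool

# shared configuration populated by the initializer (hypothetical name)
config = {}

# worker thread initialization function that takes arguments
def worker_init(prefix, retries):
    config['prefix'] = prefix
    config['retries'] = retries

# task that reads the values stored during initialization
def task(item):
    return f"{config['prefix']}-{item} (retries={config['retries']})"

# create the pool, passing the initializer arguments as a tuple
def run_demo():
    with ThreadPool(2, initializer=worker_init, initargs=('job', 3)) as pool:
        return pool.map(task, [1, 2])

# protect the entry point
if __name__ == '__main__':
    print(run_demo())
```

The values in the tuple are passed to the initialization function positionally, in order. A worker only starts consuming tasks after its own initializer has returned, so the task can rely on the values being present.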
Now that we know how to initialize worker threads in the ThreadPool, let's look at a worked example.
Example of Initializing Worker Threads
We can develop an example of calling a custom function to initialize each thread in the ThreadPool.
In this example we will define a task to simulate work that will report a message and block for a moment. We will also define a simple worker thread initialization function that will simply report a message. We will then configure a ThreadPool to initialize the workers with our initialization function and execute a number of tasks.
The messages will show that the workers are initialized once, right after they are created in the thread pool, then each thread executes tasks until all work is complete.
Firstly, we can define a function to execute tasks in the thread pool. The function will report a message and block for a moment to simulate computational effort.
The task() function below implements this.
# task executed in a worker thread
def task():
    # report a message
    print('Worker executing task...')
    # block for a moment
    sleep(1)
Next, we can define a function to initialize the worker threads.
The initialization function won't do anything interesting in this case, just report a message so we have some idea that the initialization function was called and when it was called, e.g. once, before executing tasks.
The initialize_worker() function below implements this.
# initialize a worker in the thread pool
def initialize_worker():
    # report a message
    print('Initializing worker...')
Next, we can create a ThreadPool and configure it to use our initialize_worker() function to initialize each worker thread.
We will create the ThreadPool using the context manager interface and configure it to create two worker threads.
...
# create and configure the thread pool
with ThreadPool(2, initializer=initialize_worker) as pool:
    # ...
Next, we will issue four tasks to the ThreadPool to be executed asynchronously. There are only two workers, so only two tasks will be executed at a time.
...
# issue tasks to the thread pool
for _ in range(4):
    _ = pool.apply_async(task)
Finally, we will close down the ThreadPool and wait for all tasks to complete.
...
# close the thread pool
pool.close()
# wait for all tasks to complete
pool.join()
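As an aside, the example above discards the AsyncResult objects returned by apply_async(), so any exception raised in a task would go unnoticed. A possible variation, sketched below under the assumption that we want to see results and failures, is to keep the AsyncResult objects and call get() on each. The failing task and the run_demo() helper are hypothetical and not part of the worked example.

```python
from multiprocessing.pool import ThreadPool

# hypothetical task that fails for one input
def task(i):
    if i == 2:
        raise ValueError('task 2 failed')
    return i * 10

# issue tasks asynchronously and collect each outcome
def run_demo():
    outcomes = []
    with ThreadPool(2) as pool:
        # keep the AsyncResult objects instead of discarding them
        results = [pool.apply_async(task, (i,)) for i in range(4)]
        for result in results:
            try:
                # get() blocks until the task is done and re-raises its exception
                outcomes.append(result.get())
            except ValueError as e:
                outcomes.append(str(e))
    return outcomes

# protect the entry point
if __name__ == '__main__':
    print(run_demo())
```

Calling get() on every result also waits for all tasks to finish, so no separate close() and join() is needed in this variation.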
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of initializing worker threads in the thread pool
from time import sleep
from multiprocessing.pool import ThreadPool
# task executed in a worker thread
def task():
    # report a message
    print('Worker executing task...')
    # block for a moment
    sleep(1)
# initialize a worker in the thread pool
def initialize_worker():
    # report a message
    print('Initializing worker...')
# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool(2, initializer=initialize_worker) as pool:
        # issue tasks to the thread pool
        for _ in range(4):
            _ = pool.apply_async(task)
        # close the thread pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first creates and configures the ThreadPool.
The pool starts its worker threads as soon as it is created, and tasks are then issued to the pool.
After the worker threads are created they are initialized, then start executing the issued tasks.
Importantly, each worker thread is initialized once and only before it begins consuming and completing tasks in the pool.
Initializing worker...
Initializing worker...
Worker executing task...
Worker executing task...
Worker executing task...
Worker executing task...
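If we want to check the "initialized once per worker" behavior rather than read it from the printed messages, one possible approach is to count initializer calls. The sketch below is illustrative only. The init_count counter, the barrier trick, and the run_demo() helper are assumptions of this sketch, not part of the example above. The barrier forces the first tasks to run at the same time, so every worker must take a task.

```python
import threading
from multiprocessing.pool import ThreadPool

# number of times the initializer has run, shared across threads
init_count = 0
count_lock = threading.Lock()

# initialize a worker in the thread pool
def initialize_worker():
    global init_count
    with count_lock:
        init_count += 1

# run more tasks than workers and report initializer calls and distinct threads
def run_demo(workers=2, tasks=6):
    global init_count
    init_count = 0
    # the first `workers` tasks wait for each other, so every worker takes one
    barrier = threading.Barrier(workers)

    def task(i):
        if i < workers:
            barrier.wait(timeout=10)
        return threading.current_thread().name

    with ThreadPool(workers, initializer=initialize_worker) as pool:
        names = pool.map(task, range(tasks), chunksize=1)
    return init_count, len(set(names))

# protect the entry point
if __name__ == '__main__':
    print(run_demo())
```

With two workers and six tasks this should report two initializer calls and two distinct worker threads, regardless of how many tasks each worker executed.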
Names of Threads That Initialize Workers
It may be helpful to know which thread is used to initialize the worker threads.
For example, if the thread that runs the initialization function is the same thread that later executes tasks, we can share data across tasks using mechanisms such as thread-local data and/or global variables.
We can get the current thread in the initialization function and report its name. The same can be done in the task function executed by the worker thread, and we can see if the names match.
This can be achieved using the threading.current_thread() function.
We can update the example in the previous section to first get the current thread, then report its name.
Firstly, we can update the task function executed by worker threads to report the current thread name.
# task executed in a worker thread
def task():
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Worker executing task, thread={thread.name}')
    # block for a moment
    sleep(1)
Next, we can update the worker thread initialization function to report the thread name.
# initialize a worker in the thread pool
def initialize_worker():
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Initializing worker, thread={thread.name}')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of initializing workers in a thread pool and reporting thread names
from time import sleep
from multiprocessing.pool import ThreadPool
from threading import current_thread
# task executed in a worker thread
def task():
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Worker executing task, thread={thread.name}')
    # block for a moment
    sleep(1)
# initialize a worker in the thread pool
def initialize_worker():
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Initializing worker, thread={thread.name}')
# protect the entry point
if __name__ == '__main__':
    # create and configure the thread pool
    with ThreadPool(2, initializer=initialize_worker) as pool:
        # issue tasks to the thread pool
        for _ in range(4):
            _ = pool.apply_async(task)
        # close the thread pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example creates the thread pool and configures it so that worker threads are initialized using our custom function, as before.
Four tasks are issued to the thread pool and the main thread waits for all tasks to complete.
The name of the worker thread within the ThreadPool is reported both when initializing the workers and in executing the task.
Interestingly, we can see that the same thread that executes the task is used to initialize the worker.
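For example, the first worker thread has the name "Thread-1" and executes both the initialization function and the task function. The exact names can differ between Python versions; newer versions may append the name of the thread's target function, such as "Thread-1 (worker)".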
This highlights that we can use mechanisms like global variables and thread-local storage in worker threads to share data across tasks executed by worker threads.
Initializing worker, thread=Thread-1
Initializing worker, thread=Thread-2
Worker executing task, thread=Thread-1
Worker executing task, thread=Thread-2
Worker executing task, thread=Thread-1
Worker executing task, thread=Thread-2
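Because the same thread runs both the initializer and its tasks, thread-local storage set during initialization is visible to every task that worker executes. The following is a minimal sketch of that idea. The local object, its owner and tasks_done attributes, and the run_demo() helper are hypothetical names standing in for a real per-thread resource such as a connection or log handle.

```python
import threading
from multiprocessing.pool import ThreadPool

# thread-local storage, each thread sees its own attributes
local = threading.local()

# initialize a worker in the thread pool
def initialize_worker():
    # stand-in for a per-thread resource such as a connection
    local.owner = threading.current_thread().name
    local.tasks_done = 0

# task executed in a worker thread
def task(i):
    # update state that belongs to this worker only
    local.tasks_done += 1
    # check the data was created by the thread now running the task
    return local.owner == threading.current_thread().name

# run several tasks and report whether each saw its own worker's data
def run_demo():
    with ThreadPool(2, initializer=initialize_worker) as pool:
        return pool.map(task, range(6))

# protect the entry point
if __name__ == '__main__':
    print(run_demo())
```

Each task should report True, because the attributes it reads were set by its own worker thread during initialization. No lock is needed for tasks_done, as each thread only ever touches its own copy.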
Takeaways
You now know how to initialize workers in the ThreadPool class in Python.