Last Updated on January 25, 2022
You can set ThreadPoolExecutor initializer functions via the initializer argument.
In this tutorial, you will discover how to set the initializer function for thread pools in Python.
Let’s get started.
Need to Initialize Worker Threads in the ThreadPoolExecutor
The ThreadPoolExecutor in Python provides a pool of reusable threads for executing ad hoc tasks.
You can submit tasks to the thread pool by calling the submit() function and passing in the name of the function you wish to execute on another thread.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
You can also submit tasks by calling the map() function and specifying the name of the function to execute and the iterable of items to which your function will be applied.
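As a minimal sketch of both submission styles, the snippet below uses a hypothetical square() task (it is not part of the worked example later in this tutorial) and shows submit() returning a Future alongside map() returning results in input order.

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical task used only for illustration
def square(value):
    return value * value

with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() schedules one call and returns a Future immediately
    future = executor.submit(square, 3)
    # result() blocks until the task finishes, then returns its value
    single_result = future.result()
    # done() reports whether the task has finished
    finished = future.done()
    # map() applies the function to each item and yields results in input order
    mapped_results = list(executor.map(square, range(5)))

print(single_result)
print(finished)
print(mapped_results)
```

Note that map() yields results in the order of the input items, even if the tasks finish in a different order.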
When using a thread pool, we may need to initialize a variable, data, or resource to be used by each worker thread across all tasks executed by that thread.
For example, perhaps each thread is required to have its own handle for logging or connection to a remote server to be held open and reused when executing tasks.
We need a way of calling a function to initialize each worker thread in the thread pool prior to executing any tasks.
How can we initialize worker threads in the ThreadPoolExecutor?
How to Initialize Worker Threads
Worker threads can call a function before they start processing tasks.
This is called an initialization function and can be specified via the initializer argument when creating a thread pool.
If the initializer function takes arguments, they can be passed in via the initargs argument to the thread pool, which is a tuple of arguments to pass to the initializer function.
For example:
...
# configure an initialization function for each worker thread
executor = ThreadPoolExecutor(initializer=custom_initializer)
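The initargs argument can be sketched in the same way. In the snippet below, the custom_initializer() function, its label parameter, and the initialized list are hypothetical names chosen for illustration. Because initargs must be a tuple, a single argument needs a trailing comma.

```python
from threading import current_thread
from concurrent.futures import ThreadPoolExecutor

# records (thread name, label) pairs so we can inspect them afterwards
initialized = []

# hypothetical initializer that takes one argument
def custom_initializer(label):
    initialized.append((current_thread().name, label))

# trivial task so that the pool has work to do
def task(value):
    return value + 1

# initargs is a tuple, so a single argument needs a trailing comma
with ThreadPoolExecutor(max_workers=2, initializer=custom_initializer, initargs=('worker',)) as executor:
    results = list(executor.map(task, range(4)))

print(results)
print(initialized)
```

The number of entries in the list depends on how many worker threads the pool actually started, which may be one or two here because the tasks are very short.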
By default, there is no initializer function.
The ThreadPoolExecutor will create threads as needed to complete submitted work tasks.
That is, the ThreadPoolExecutor does not create all of the worker threads up-front when the thread pool is created; instead, it will create worker threads just-in-time until the fixed number of worker threads specified when configuring the thread pool are created.
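One way to observe this just-in-time behavior is to count initializer calls. The sketch below assumes Python 3.7 or later (where the initializer argument is available); the count_initializer() function and the counter names are hypothetical.

```python
from threading import Lock
from concurrent.futures import ThreadPoolExecutor

# shared counter of how many worker threads have been initialized
init_count = 0
init_lock = Lock()

# hypothetical initializer that counts each worker thread as it starts
def count_initializer():
    global init_count
    with init_lock:
        init_count += 1

# trivial task
def task(value):
    return value

with ThreadPoolExecutor(max_workers=4, initializer=count_initializer) as executor:
    # no tasks submitted yet, so no worker thread has been started
    count_before = init_count
    # a single task needs only a single worker thread
    executor.submit(task, 1).result()
    count_after_one = init_count

print(count_before, count_after_one)
```

Although the pool allows up to four workers, only one thread is started to handle the single submitted task.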
If the initializer function is set, it is called for each worker thread as the thread is created.
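Because the initializer runs inside each worker thread, it is a natural place to prepare per-thread state. The sketch below stores a value in thread-local storage and reads it back from tasks. The worker_data object, the init_worker() function, and the 'Worker' name prefix are assumptions made for illustration, and the stored thread name stands in for a real resource such as a connection or log handle.

```python
from threading import current_thread, local
from concurrent.futures import ThreadPoolExecutor

# thread-local storage: each thread sees its own attributes
worker_data = local()

# hypothetical initializer that prepares per-thread state
def init_worker():
    # stand-in for a per-thread resource such as a connection
    worker_data.name = current_thread().name

def task(identifier):
    # read the value prepared by this thread's initializer
    return identifier, worker_data.name

with ThreadPoolExecutor(max_workers=2, thread_name_prefix='Worker', initializer=init_worker) as executor:
    results = list(executor.map(task, range(6)))

for identifier, name in results:
    print(identifier, name)
```

Which worker handles which task is not fixed, so the names printed next to each identifier can vary between runs.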
Now that we know how to initialize worker threads in the ThreadPoolExecutor, let’s look at a worked example.
Example of Initializing Worker Threads
Let’s develop an example of calling a custom function to initialize each thread in the ThreadPoolExecutor.
First, let’s define a simple task that takes a unique identifier as an argument, sleeps for a fraction of a second, and returns the same identifier.
# a mock task that sleeps for a random amount of time less than one second
def task(identifier):
    sleep(random())
    # return the identifier unchanged
    return identifier
Next, we can define an initialization function for each worker thread.
Our function will retrieve the unique name of the worker thread. This is achieved by calling the current_thread() function from the threading module to get the current thread object, then accessing its name attribute.
...
# get the unique name for this thread
name = current_thread().name
Our custom initialization function for each worker thread will access the name of the thread and report it with a print statement.
# function for initializing the worker thread
def initializer_worker():
    # get the unique name for this thread
    name = current_thread().name
    # report the unique worker name
    print(f'Initializing worker thread {name}')
We can create a thread pool using the context manager with two worker threads and specify our custom initialization function via the initializer argument.
...
# create a thread pool
with ThreadPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
    # ...
Finally, we can call the map() function to submit ten tasks to the thread pool for execution and report the results returned from the target task function.
...
# execute tasks
for result in executor.map(task, range(10)):
    print(result)
Tying this all together, the complete example is listed below.
# SuperFastPython.com
# example of a custom worker thread initialization function
from time import sleep
from random import random
from threading import current_thread
from concurrent.futures import ThreadPoolExecutor

# function for initializing the worker thread
def initializer_worker():
    # get the unique name for this thread
    name = current_thread().name
    # report the unique worker name
    print(f'Initializing worker thread {name}')

# a mock task that sleeps for a random amount of time less than one second
def task(identifier):
    sleep(random())
    # return the identifier unchanged
    return identifier

# create a thread pool
with ThreadPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
    # execute tasks
    for result in executor.map(task, range(10)):
        print(result)
Running the example, we can see that both worker threads are initialized before any results are reported, then all ten tasks complete successfully.
Initializing worker thread ThreadPoolExecutor-0_0
Initializing worker thread ThreadPoolExecutor-0_1
0
1
2
3
4
5
6
7
8
9
Takeaways
You now know how to configure the ThreadPoolExecutor initializer function.
Do you have any questions about how to use ThreadPoolExecutor initializer functions?
Ask your question in the comments below and I will do my best to answer.