How to Initialize Worker Processes in the ProcessPoolExecutor
You can set ProcessPoolExecutor initializer functions via the "initializer" argument.
In this tutorial you will discover how to set the initializer function for process pools in Python.
Let's get started.
Need to Initialize Worker Processes
The ProcessPoolExecutor in Python provides a pool of reusable processes for executing ad hoc tasks.
You can submit tasks to the process pool by calling the submit() method, passing in the function you wish to execute in another process along with any arguments it needs.
Calling submit() returns a Future object that allows you to check on the status of the task and get its result once it completes.
You can also submit tasks by calling the map() method, specifying the function to execute and the iterable of items to which that function will be applied.
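The snippet below is a minimal sketch of both submission styles. The square() task and the run_examples() helper are hypothetical names used only for illustration. Note that the function object itself is passed, not its name as a string.

```python
# a minimal sketch of submit() and map() on a ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor

# hypothetical task used only for illustration: square a number
def square(value):
    return value * value

# hypothetical helper that runs both submission styles
def run_examples():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # submit() schedules a single call and returns a Future immediately
        future = executor.submit(square, 7)
        # result() blocks until the task completes, then returns its value
        single = future.result()
        # map() applies the function to every item and yields results in input order
        many = list(executor.map(square, range(5)))
    return single, many

if __name__ == '__main__':
    single, many = run_examples()
    print(single)  # 49
    print(many)    # [0, 1, 4, 9, 16]
```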
When using a process pool, we may need to initialize a variable, data, or resource to be used by each worker process across all tasks executed by that process.
For example, each process may need its own logging handle or its own connection to a remote server, opened once and then held open and reused for every task that process executes.
We need a way of calling a function to initialize each worker process in the process pool, prior to executing any tasks.
How can we initialize worker processes in the ProcessPoolExecutor?
How to Initialize Worker Processes
Worker processes can call a function before they start processing tasks.
This is called an initializer function and can be specified via the "initializer" argument when creating a process pool. If the initializer function takes arguments, they can be provided via the "initargs" argument, a tuple of arguments that is passed to the initializer function.
For example:

```python
...
# configure an initialization function for each worker process
executor = ProcessPoolExecutor(initializer=custom_initializer)
```
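The snippet above does not pass any arguments to the initializer. As a minimal sketch of "initargs", the following assumes a hypothetical initializer, init_worker(), that stores its argument in a module-level global; tasks executed later in the same worker process can then read that value. The names init_worker(), label(), and run_example() are illustrative only.

```python
# a minimal sketch of passing arguments to a worker initializer via initargs
from concurrent.futures import ProcessPoolExecutor

# per-process value, set once by the initializer in each worker process
worker_prefix = None

# hypothetical initializer that receives its arguments from initargs
def init_worker(prefix):
    global worker_prefix
    worker_prefix = prefix

# hypothetical task that reads the value stored by the initializer
def label(value):
    return f'{worker_prefix}-{value}'

def run_example():
    # initargs must be a tuple; note the trailing comma for a single argument
    with ProcessPoolExecutor(max_workers=2, initializer=init_worker,
                             initargs=('worker',)) as executor:
        return list(executor.map(label, range(3)))

if __name__ == '__main__':
    print(run_example())  # ['worker-0', 'worker-1', 'worker-2']
```

Storing the value in a global works because each worker is a separate process with its own copy of the module's globals, so the initializer's assignment is visible to every task that worker runs and to no other process.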
By default, there is no initializer function.
The ProcessPoolExecutor creates worker processes as needed to complete submitted tasks.
That is, the ProcessPoolExecutor does not create its worker processes when the pool is constructed; they are started once tasks are submitted. Depending on the Python version and the process start method, workers may be started one at a time as demand grows, or all at once when the first task is submitted, but never more than the max_workers limit configured for the pool.
If an initializer function is set, it is called once in each worker process when that process starts, before the process executes any tasks.
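One way to check the "once per worker" behavior is to have the initializer increment a per-process counter and have each task report that counter along with its process id. This is a minimal sketch; count_init(), report(), and run_example() are hypothetical names chosen for the illustration.

```python
# a minimal sketch showing the initializer runs once in each worker process
import os
from concurrent.futures import ProcessPoolExecutor

# number of times the initializer has run in this process (per-process state)
init_calls = 0

# hypothetical initializer: record that it ran in this process
def count_init():
    global init_calls
    init_calls += 1

# hypothetical task: report this worker's process id and its initializer count
def report(_):
    return os.getpid(), init_calls

def run_example(max_workers=2, n_tasks=10):
    with ProcessPoolExecutor(max_workers=max_workers, initializer=count_init) as executor:
        return list(executor.map(report, range(n_tasks)))

if __name__ == '__main__':
    results = run_example()
    pids = {pid for pid, _ in results}
    print(f'{len(results)} tasks ran in {len(pids)} worker process(es)')
    # every task sees init_calls == 1: the initializer ran exactly once in its worker
    print(all(calls == 1 for _, calls in results))  # True
```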
Now that we know how to initialize worker processes in the ProcessPoolExecutor, let's look at a worked example.
Example of Initializing Worker Processes
Let's develop an example of calling a custom function to initialize each process in the ProcessPoolExecutor.
First, let's define a simple task that takes a unique identifier as an argument, blocks for a fraction of a second and returns the same identifier.
```python
from time import sleep
from random import random

# a task that blocks for a random amount of time less than one second
def task(identifier):
    sleep(random())
    # return the unique identifier
    return identifier
```
Next, we can define an initialization function for the worker processes.
In this case, the function does nothing other than print a message.
```python
# function for initializing the worker processes
def initializer_worker():
    # report an initialization message
    print('Initializing worker process.', flush=True)
```
We can create a process pool with two worker processes using the context manager, and specify our custom initialization function via the "initializer" argument.
```python
...
# create a process pool
with ProcessPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
    # ...
```
Finally, we can call the map() function to submit ten tasks to the process pool for execution and report the results returned from the target task function.
```python
...
# execute tasks
for result in executor.map(task, range(10)):
    print(result)
```
Tying this all together, the complete example is listed below.
```python
# SuperFastPython.com
# example of a custom worker process initialization function
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# function for initializing the worker processes
def initializer_worker():
    # report an initialization message
    print('Initializing worker process.', flush=True)

# a mock task that sleeps for a random amount of time less than one second
def task(identifier):
    sleep(random())
    # return the unique identifier
    return identifier

# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
        # execute tasks
        for result in executor.map(task, range(10)):
            print(result)

if __name__ == '__main__':
    main()
```
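Running the example, we can see that both worker processes are initialized before any results are reported, then all ten tasks complete successfully. The results are printed in the order the tasks were submitted because map() yields results in input order, regardless of which task finishes first.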
```
Initializing worker process.
Initializing worker process.
0
1
2
3
4
5
6
7
8
9
```
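The complete example above only prints a message, but the motivating use case was a per-process resource that is opened once and reused across tasks. The sketch below assumes a plain dict as a stand-in for such a resource (a real program might hold a log handle or network connection instead); init_state(), task_with_state(), and run_example() are hypothetical names. Each task updates the dict created by its worker's initializer, so the per-worker count keeps growing across tasks.

```python
# a minimal sketch of per-worker state that persists across tasks
import os
from concurrent.futures import ProcessPoolExecutor

# per-process "resource"; a dict stands in for a real connection or log handle
worker_state = None

# hypothetical initializer: create the resource once per worker, not once per task
def init_state():
    global worker_state
    worker_state = {'pid': os.getpid(), 'tasks_done': 0}

# hypothetical task: reuse and update the resource created by the initializer
def task_with_state(identifier):
    worker_state['tasks_done'] += 1
    return identifier, worker_state['pid'], worker_state['tasks_done']

def run_example():
    with ProcessPoolExecutor(max_workers=2, initializer=init_state) as executor:
        return list(executor.map(task_with_state, range(10)))

if __name__ == '__main__':
    for identifier, pid, done in run_example():
        print(f'task {identifier} ran in process {pid} (task number {done} for that worker)')
```

Because each worker runs its tasks one at a time, the counts reported by a given process id form the sequence 1, 2, 3, and so on, showing that the same resource is reused rather than recreated for each task.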
Takeaways
You now know how to configure the ProcessPoolExecutor initializer function.