Last Updated on February 14, 2022
You can set ProcessPoolExecutor initializer functions via the “initializer” argument.
In this tutorial, you will discover how to set the initializer function for process pools in Python.
Let’s get started.
Need to Initialize Worker Processes
The ProcessPoolExecutor in Python provides a pool of reusable processes for executing ad hoc tasks.
You can submit tasks to the process pool by calling the submit() function and passing in the function you wish to execute in another process.
Calling the submit() function will return a Future object that allows you to check on the status of the task and get the result from the task once it completes.
You can also submit tasks by calling the map() function, specifying the function to execute and the iterable of items to which your function will be applied.
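As a minimal sketch of both calls (the `square()` task function is a hypothetical placeholder, not part of this tutorial's example), submit() returns a Future whose result() blocks until the task is done, while map() returns an iterator of results in input order:

```python
from concurrent.futures import ProcessPoolExecutor

def square(x):
    # a simple task executed in a worker process
    return x * x

def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # submit() schedules a single call and returns a Future immediately
        future = executor.submit(square, 7)
        # result() blocks until the task completes, then returns its value
        single = future.result()
        # map() applies the function to each item; results come back in input order
        many = list(executor.map(square, range(5)))
    return single, many

if __name__ == "__main__":
    print(main())
```

The `if __name__ == "__main__":` guard matters here: with the spawn start method (the default on Windows and macOS), worker processes import the main module, and the guard stops them from creating pools of their own.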
When using a process pool, we may need to initialize a variable, data, or resource to be used by each worker process across all tasks executed by that process.
For example, perhaps each process needs its own logging handle, or its own connection to a remote server that is held open and reused when executing tasks.
We need a way of calling a function to initialize each worker process in the process pool, prior to executing any tasks.
How can we initialize worker processes in the ProcessPoolExecutor?
How to Initialize Worker Processes
Worker processes can call a function before they start processing tasks.
This is called an initializer function and can be specified via the “initializer” argument when creating a process pool. If the initializer function takes arguments, they can be passed in via the “initargs” argument to the process pool, which is a tuple of arguments to pass to the initializer function.
For example:
```
...
# configure an initialization function for each worker process
executor = ProcessPoolExecutor(initializer=custom_initializer)
```
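The “initargs” argument can be sketched as follows. This assumes a hypothetical `init_worker()` that stores its argument in a module-level global, so that tasks executed later by the same worker process can read it. Because “initargs” is a tuple, a single argument needs a trailing comma:

```python
from concurrent.futures import ProcessPoolExecutor

# module-level variable that the initializer sets in each worker process
greeting = None

def init_worker(prefix):
    # runs once in each worker process; stores the value passed via initargs
    global greeting
    greeting = prefix

def task(name):
    # reads the per-process value set by the initializer
    return f"{greeting}, {name}"

def main():
    # initargs is a tuple, hence the trailing comma for a single argument
    with ProcessPoolExecutor(max_workers=2, initializer=init_worker,
                             initargs=("Hello",)) as executor:
        return list(executor.map(task, ["Alice", "Bob", "Carol"]))

if __name__ == "__main__":
    print(main())
```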
By default, there is no initializer function.
The ProcessPoolExecutor will create processes as needed to complete submitted tasks.
That is, the ProcessPoolExecutor does not create all of the worker processes up-front when the process pool is created; instead, it creates worker processes as tasks are submitted, until the fixed number of worker processes specified when configuring the process pool has been reached.
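One way to observe this is to count live child processes with multiprocessing.active_children() before and after the first task is submitted. This is a sketch assuming a trivial, hypothetical `task()` function. How many workers the first submission starts can vary with the Python version and start method, so the sketch only checks that no worker exists before any task is submitted and that at most `max_workers` exist afterward:

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def task(x):
    # trivial task; only used to trigger worker creation
    return x + 1

def main():
    with ProcessPoolExecutor(max_workers=4) as executor:
        # no task has been submitted yet, so no worker processes exist
        before = len(multiprocessing.active_children())
        # the first submission causes the pool to start worker processes
        executor.submit(task, 1).result()
        after = len(multiprocessing.active_children())
    return before, after

if __name__ == "__main__":
    before, after = main()
    print(f"worker processes before submit: {before}, after: {after}")
```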
If the initializer function is set, it is called for each worker process as the process is created.
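To check that the initializer runs once per worker process rather than once per task, the following sketch (with hypothetical `init_worker()` and `task()` functions) has the initializer increment a module-level counter in its own process, and each task reports its process ID along with that counter:

```python
import os
from concurrent.futures import ProcessPoolExecutor

# number of times the initializer has run in the current process
init_calls = 0

def init_worker():
    # called once in each worker process, before it runs any tasks
    global init_calls
    init_calls += 1

def task(i):
    # report which process ran the task and how often it was initialized
    return os.getpid(), init_calls

def main():
    with ProcessPoolExecutor(max_workers=2, initializer=init_worker) as executor:
        return list(executor.map(task, range(20)))

if __name__ == "__main__":
    results = main()
    pids = {pid for pid, _ in results}
    print(f"{len(results)} tasks ran in {len(pids)} worker process(es)")
    print("initialized once per process:", all(count == 1 for _, count in results))
```

Every task should report a counter of exactly 1, and the counter in the main process stays at 0 because the initializer never runs there.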
Now that we know how to initialize worker processes in the ProcessPoolExecutor, let’s look at a worked example.
Example of Initializing Worker Processes
Let’s develop an example of calling a custom function to initialize each process in the ProcessPoolExecutor.
First, let’s define a simple task that takes a unique identifier as an argument, blocks for a fraction of a second and returns the same identifier.
```
from time import sleep
from random import random

# a task that blocks for a random amount of time less than one second
def task(identifier):
    sleep(random())
    # return the unique identifier
    return identifier
```
Next, we can define an initialization function for each worker process.
In this case, the function does nothing other than print a message.
```
# function for initializing the worker processes
def initializer_worker():
    # report an initialization message
    print('Initializing worker process.', flush=True)
```
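The initializer above only prints a message. For the motivating case of a resource that is opened once and reused across tasks, a sketch might look like the following. It assumes an in-memory sqlite3 database as a stand-in for a remote connection, and the `init_worker()` and `task()` names are illustrative:

```python
import sqlite3
from concurrent.futures import ProcessPoolExecutor

# per-process database connection, created by the initializer
conn = None

def init_worker():
    # open one connection per worker process; every task in that process reuses it
    global conn
    conn = sqlite3.connect(":memory:")

def task(x):
    # use the process-wide connection instead of opening a new one per task
    return conn.execute("SELECT ? * ?", (x, x)).fetchone()[0]

def main():
    with ProcessPoolExecutor(max_workers=2, initializer=init_worker) as executor:
        return list(executor.map(task, range(5)))

if __name__ == "__main__":
    print(main())
```

The ProcessPoolExecutor has no matching finalizer argument, so this sketch simply leaves each connection open for the lifetime of its worker process.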
We can create a process pool using the context manager with two worker processes and specify our custom initialization function via the “initializer” argument.
```
...
# create a process pool
with ProcessPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
    # ...
```
Finally, we can call the map() function to submit ten tasks to the process pool for execution and report the results returned from the target task function.
```
...
# execute tasks
for result in executor.map(task, range(10)):
    print(result)
```
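Because map() yields results in the order the tasks were submitted, the results are reported in order even though each task sleeps for a random time. The sketch below contrasts this with submit() and as_completed(), which yield results as tasks finish. It shortens the sleep to at most 0.1 seconds (an assumption, to keep it quick) and omits the initializer for brevity:

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from random import random
from time import sleep

def task(identifier):
    # block for a short random time (up to 0.1 seconds) so completion order varies
    sleep(random() * 0.1)
    return identifier

def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # map(): results are yielded in the same order as the inputs
        ordered = list(executor.map(task, range(10)))
        # submit() + as_completed(): results are yielded as tasks finish
        futures = [executor.submit(task, i) for i in range(10)]
        completed = [f.result() for f in as_completed(futures)]
    return ordered, completed

if __name__ == "__main__":
    ordered, completed = main()
    print("map():         ", ordered)
    print("as_completed():", completed)
```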
Tying this all together, the complete example is listed below.
```
# SuperFastPython.com
# example of a custom worker process initialization function
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor

# function for initializing the worker processes
def initializer_worker():
    # report an initialization message
    print('Initializing worker process.', flush=True)

# a mock task that sleeps for a random amount of time less than one second
def task(identifier):
    sleep(random())
    # return the unique identifier
    return identifier

# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
        # execute tasks
        for result in executor.map(task, range(10)):
            print(result)

if __name__ == '__main__':
    main()
```
Running the example, we can see that each of the two worker processes is initialized before it runs any tasks, then all ten tasks complete successfully and their results are reported in order.
```
Initializing worker process.
Initializing worker process.
0
1
2
3
4
5
6
7
8
9
```
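If the initializer itself fails, the pool cannot be used. The Python documentation states that if the initializer raises an exception, all currently pending jobs, as well as any later attempt to submit jobs, raise BrokenProcessPool. A minimal sketch, using a hypothetical `bad_initializer()` that always raises:

```python
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

def bad_initializer():
    # simulate an initialization failure, e.g. a resource that cannot be opened
    raise RuntimeError("could not initialize worker")

def task(x):
    return x

def main():
    with ProcessPoolExecutor(max_workers=1, initializer=bad_initializer) as executor:
        future = executor.submit(task, 1)
        try:
            future.result()
        except BrokenProcessPool as exc:
            # the pending task fails because its worker never became usable
            return type(exc).__name__
    return "no error"

if __name__ == "__main__":
    print(main())
```

The worker's exception is typically logged to standard error, while the caller sees BrokenProcessPool from future.result().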
Takeaways
You now know how to configure the ProcessPoolExecutor initializer function.
Do you have any questions?
Ask your question in the comments below and I will do my best to answer.
Photo by Daniel Eledut on Unsplash