Last Updated on August 8, 2022
You can initialize workers in the process pool by setting the “initializer” argument in the multiprocessing.pool.Pool class constructor.
In this tutorial, you will discover how to initialize worker processes in the process pool in Python.
Let’s get started.
Need to Initialize Worker Processes
The multiprocessing.pool.Pool class in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool is configured when it is created, and that configuration determines how its child worker processes are prepared.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map(). Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
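As a rough illustration of the four ways of issuing tasks mentioned above, the sketch below uses a hypothetical square() function that is not part of this tutorial's worked example. The blocking calls return results directly, while the asynchronous calls return an AsyncResult whose get() method retrieves the result later.

```python
from multiprocessing.pool import Pool

# hypothetical task, used only for this illustration
def square(x):
    return x * x

# protect the entry point
if __name__ == '__main__':
    with Pool(2) as pool:
        # one-off task: blocks until the result is ready
        print(pool.apply(square, (3,)))
        # same function applied to each item: blocks until all are done
        print(pool.map(square, range(5)))
        # asynchronous versions return an AsyncResult immediately
        single = pool.apply_async(square, (4,))
        many = pool.map_async(square, range(5))
        # retrieve the results later, with a timeout in seconds
        print(single.get(timeout=10))
        print(many.get(timeout=10))
```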
When using a process pool, we may need to initialize a variable, data, or resource to be used by each worker process across all tasks executed by that process.
For example, each process may need its own logging handle, or its own connection to a remote server that is held open and reused across tasks.
We need a way of calling a function to initialize each worker process in the process pool, prior to executing any tasks.
How can we initialize worker processes in the process pool?
How to Initialize Worker Processes
We can configure worker processes in the process pool to execute an initialization function prior to executing tasks.
This can be achieved by setting the “initializer” argument when configuring the process pool via the class constructor.
By default, there is no initializer function and “initializer” is set to None.
The “initializer” argument can be set to the name of a function that will be called to initialize the worker processes.
If initializer is not None then each worker process will call initializer(*initargs) when it starts.
— multiprocessing — Process-based parallelism
For example:
    import multiprocessing.pool

    # worker process initialization function
    def worker_init():
        # ...
        ...

    # create a process pool and initialize workers
    pool = multiprocessing.pool.Pool(initializer=worker_init)
If our worker process initialization function takes arguments, they can be specified to the process pool constructor via the “initargs” argument, which takes an ordered list or tuple of arguments for the custom initialization function.
For example:
    import multiprocessing.pool

    # worker process initialization function
    def worker_init(arg1, arg2, arg3):
        # ...
        ...

    # create a process pool and initialize workers
    pool = multiprocessing.pool.Pool(initializer=worker_init, initargs=(arg1, arg2, arg3))
Now that we know how to initialize worker processes in the process pool, let’s look at a worked example.
Example of Initializing Worker Processes
We can develop an example of calling a custom function to initialize each process in the process pool.
In this example we will define a task to simulate work that will report a message and block for a moment. We will also define a simple worker process initialization function that will simply report a message. We will then configure a process pool to initialize the workers with our initialization function and execute a number of tasks.
The messages will show that the workers are initialized once right after they are created in the process pool, then process tasks until all work is complete.
Firstly, we can define a function to execute tasks in the process pool. The function will report a message and block for a moment to simulate computational effort.
The task() function below implements this.
    # task executed in a worker process
    def task():
        # report a message
        print('Worker executing task...', flush=True)
        # block for a moment
        sleep(1)
Next, we can define a function to initialize the worker processes.
The initialization function won’t do anything interesting, just report a message so we have some idea that the initialization function was called and when it was called, e.g. once, before executing tasks.
The initialize_worker() function below implements this.
    # initialize a worker in the process pool
    def initialize_worker():
        # report a message
        print('Initializing worker...', flush=True)
Next, we can create a process pool and configure it to use our initialize_worker() function to initialize each worker process.
We will create the process pool using the context manager interface and configure it to create two worker processes.
    ...
    # create and configure the process pool
    with Pool(2, initializer=initialize_worker) as pool:
        # ...
Next, we will issue four tasks to the process pool to be executed asynchronously. There are only two workers, so only two tasks will be executed at a time.
    ...
    # issue tasks to the process pool
    for _ in range(4):
        pool.apply_async(task)
Finally, we will close down the process pool and wait for all tasks to complete.
    ...
    # close the process pool
    pool.close()
    # wait for all tasks to complete
    pool.join()
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of initializing worker processes in the process pool
    from time import sleep
    from multiprocessing.pool import Pool

    # task executed in a worker process
    def task():
        # report a message
        print('Worker executing task...', flush=True)
        # block for a moment
        sleep(1)

    # initialize a worker in the process pool
    def initialize_worker():
        # report a message
        print('Initializing worker...', flush=True)

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the process pool
        with Pool(2, initializer=initialize_worker) as pool:
            # issue tasks to the process pool
            for _ in range(4):
                pool.apply_async(task)
            # close the process pool
            pool.close()
            # wait for all tasks to complete
            pool.join()
Running the example first creates and configures the process pool.
The worker processes are started as part of creating the pool, and each one calls the initialization function as it starts.
Tasks are then issued to the pool and the initialized workers begin executing them.
Importantly, each worker process is initialized exactly once, before it begins consuming tasks from the pool. The order in which messages from the two workers interleave may vary from run to run.
    Initializing worker...
    Worker executing task...
    Initializing worker...
    Worker executing task...
    Worker executing task...
    Worker executing task...
Names of Threads and Processes That Initialize Worker Processes
It may be helpful to know which process and which thread in that process is used to initialize the worker processes.
For example, if the same thread is used to initialize the worker process as is used to execute tasks in the worker process, we can share data across tasks using mechanisms such as thread-local data and/or global variables.
We can get the current process and current thread in the initialization function and report their names. The same can be done in the task function executed by the worker process, and we can then see whether the names match.
This can be achieved using the multiprocessing.current_process() and threading.current_thread() functions.
We can update the example in the previous section to first get the current process and thread, then report their names.
Firstly, we can update the task function executed by worker processes to report the current process and thread names.
    # task executed in a worker process
    def task():
        # get the current process
        process = current_process()
        # get the current thread
        thread = current_thread()
        # report a message
        print(f'Worker executing task, process={process.name}, thread={thread.name}', flush=True)
        # block for a moment
        sleep(1)
Next, we can update the worker process initialization function to report the process and thread names.
    # initialize a worker in the process pool
    def initialize_worker():
        # get the current process
        process = current_process()
        # get the current thread
        thread = current_thread()
        # report a message
        print(f'Initializing worker, process={process.name}, thread={thread.name}', flush=True)
Tying this together, the complete example is listed below.
    # SuperFastPython.com
    # example of initializing workers in a process pool and reporting threads and processes
    from time import sleep
    from multiprocessing.pool import Pool
    from multiprocessing import current_process
    from threading import current_thread

    # task executed in a worker process
    def task():
        # get the current process
        process = current_process()
        # get the current thread
        thread = current_thread()
        # report a message
        print(f'Worker executing task, process={process.name}, thread={thread.name}', flush=True)
        # block for a moment
        sleep(1)

    # initialize a worker in the process pool
    def initialize_worker():
        # get the current process
        process = current_process()
        # get the current thread
        thread = current_thread()
        # report a message
        print(f'Initializing worker, process={process.name}, thread={thread.name}', flush=True)

    # protect the entry point
    if __name__ == '__main__':
        # create and configure the process pool
        with Pool(2, initializer=initialize_worker) as pool:
            # issue tasks to the process pool
            for _ in range(4):
                pool.apply_async(task)
            # close the process pool
            pool.close()
            # wait for all tasks to complete
            pool.join()
Running the example creates the process pool and configures it so that worker processes are initialized using our custom function, as before.
Four tasks are issued to the process pool and the main process waits for all tasks to complete.
The names of the worker process and of the thread within that process are reported both when initializing the workers and when executing each task.
Interestingly, we can see that the same process that executes the task is used to initialize the worker and the same thread is used in each case.
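For example, the first worker process has the name “SpawnPoolWorker-1” and executes both the initialization function and the task function in the main thread of the process, named “MainThread”. The “Spawn” prefix reflects the spawn start method; with the fork start method the workers are instead named “ForkPoolWorker-1” and so on.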
This highlights that we can use mechanisms like global variables and thread-local storage in worker processes to share data across tasks executed by a worker process.
    Initializing worker, process=SpawnPoolWorker-1, thread=MainThread
    Worker executing task, process=SpawnPoolWorker-1, thread=MainThread
    Initializing worker, process=SpawnPoolWorker-2, thread=MainThread
    Worker executing task, process=SpawnPoolWorker-2, thread=MainThread
    Worker executing task, process=SpawnPoolWorker-1, thread=MainThread
    Worker executing task, process=SpawnPoolWorker-2, thread=MainThread
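Because the initializer and the tasks run in the same thread of each worker, thread-local storage set up during initialization should be visible to every task that worker executes. The sketch below is one possible illustration of this; the attribute names pid and tasks_done are assumptions chosen for the example.

```python
import os
import threading
from multiprocessing.pool import Pool

# thread-local storage; each worker process has its own independent copy
local = threading.local()

# initialize a worker: create per-worker state in the worker's main thread
def initialize_worker():
    local.pid = os.getpid()
    local.tasks_done = 0

# task: runs in the same thread as the initializer, so the state is visible
def task(_):
    local.tasks_done += 1
    # report whether the state belongs to this process, and the running count
    return local.pid == os.getpid(), local.tasks_done

# protect the entry point
if __name__ == '__main__':
    with Pool(2, initializer=initialize_worker) as pool:
        for same_process, count in pool.map(task, range(6)):
            print(f'state set by this worker: {same_process}, tasks so far: {count}')
```

Each worker keeps its own running count, so the counts reported depend on how the six tasks happen to be divided between the two workers.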
Takeaways
You now know how to initialize workers in the multiprocessing process pool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.