Last Updated on September 12, 2022
You can initialize workers in the process pool by setting the “initializer” argument in the multiprocessing.pool.Pool class constructor.
In this tutorial you will discover how to initialize worker processes in the process pool in Python.
Let’s get started.
Need to Initialize Worker Processes
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map(). Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
When using a process pool, we may need to initialize a variable, data, or resource to be used by each worker process across all tasks executed by that process.
For example, perhaps each process is required to have its own handle for logging or connection to a remote server to be held open and reused when executing tasks.
We need a way of calling a function to initialize each worker process in the process pool, prior to executing any tasks.
How can we initialize worker processes in the process pool?
How to Initialize Worker Processes
We can configure worker processes in the process pool to execute an initialization function prior to executing tasks.
This can be achieved by setting the “initializer” argument when configuring the process pool via the class constructor.
By default, there is no initializer function and “initializer” is set to None.
The “initializer” argument can be set to a function that will be called to initialize each worker process.
If initializer is not None then each worker process will call initializer(*initargs) when it starts.
— multiprocessing — Process-based parallelism
For example:
# worker process initialization function
def worker_init():
    # ...
    ...

# create a process pool and initialize workers
pool = multiprocessing.pool.Pool(initializer=worker_init)
If our worker process initialization function takes arguments, they can be specified to the process pool constructor via the “initargs” argument, which takes a tuple of arguments that will be unpacked and passed to the custom initialization function.
For example:
# worker process initialization function
def worker_init(arg1, arg2, arg3):
    # ...
    ...

# create a process pool and initialize workers
pool = multiprocessing.pool.Pool(initializer=worker_init, initargs=(arg1, arg2, arg3))
Now that we know how to initialize worker processes in the process pool, let’s look at a worked example.
Example of Initializing Worker Processes
We can develop an example of calling a custom function to initialize each process in the process pool.
In this example we will define a task to simulate work that will report a message and block for a moment. We will also define a simple worker process initialization function that will simply report a message. We will then configure a process pool to initialize the workers with our initialization function and execute a number of tasks.
The messages will show that the workers are initialized once right after they are created in the process pool, then process tasks until all work is complete.
Firstly, we can define a function to execute tasks in the process pool. The function will report a message and block for a moment to simulate computational effort.
The task() function below implements this.
# task executed in a worker process
def task():
    # report a message
    print('Worker executing task...', flush=True)
    # block for a moment
    sleep(1)
Next, we can define a function to initialize the worker processes.
The initialization function won’t do anything interesting; it will simply report a message so that we can see that the initialization function was called and when it was called, i.e. once per worker, before any tasks are executed.
The initialize_worker() function below implements this.
# initialize a worker in the process pool
def initialize_worker():
    # report a message
    print('Initializing worker...', flush=True)
Next, we can create a process pool and configure it to use our initialize_worker() function to initialize each worker process.
We will create the process pool using the context manager interface and configure it to create two worker processes.
...
# create and configure the process pool
with Pool(2, initializer=initialize_worker) as pool:
    # ...
Next, we will issue four tasks to the process pool to be executed asynchronously. There are only two workers, so only two tasks will be executed at a time.
...
# issue tasks to the process pool
for _ in range(4):
    pool.apply_async(task)
Finally, we will close down the process pool and wait for all tasks to complete.
...
# close the process pool
pool.close()
# wait for all tasks to complete
pool.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of initializing worker processes in the process pool
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # report a message
    print('Worker executing task...', flush=True)
    # block for a moment
    sleep(1)

# initialize a worker in the process pool
def initialize_worker():
    # report a message
    print('Initializing worker...', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool(2, initializer=initialize_worker) as pool:
        # issue tasks to the process pool
        for _ in range(4):
            pool.apply_async(task)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first creates and configures the process pool.
Tasks are issued to the pool and are executed by the worker processes that were created when the pool was configured.
After the worker processes are created they are initialized, then start executing the issued tasks.
Importantly, each worker process is initialized once, before it begins consuming and completing tasks from the pool.
Initializing worker...
Worker executing task...
Initializing worker...
Worker executing task...
Worker executing task...
Worker executing task...
Example of Process and Thread Names During Initialization
It may be helpful to know which process and which thread in that process is used to initialize the worker processes.
For example, if the same thread is used to initialize the worker process as is used to execute tasks in the worker process, we can share data across tasks using mechanisms such as thread-local data and/or global variables.
We can get the current process and current thread in the initialization function and report their names. The same can be done in the task function executed by the worker process, and we can check whether the names match.
This can be achieved using the multiprocessing.current_process() and threading.current_thread() functions.
We can update the example in the previous section to first get the current process and thread, then report their names.
Firstly, we can update the task function executed by worker processes to report the current process and thread names.
# task executed in a worker process
def task():
    # get the current process
    process = current_process()
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Worker executing task, process={process.name}, thread={thread.name}', flush=True)
    # block for a moment
    sleep(1)
Next, we can update the worker process initialization function to report the process and thread names.
# initialize a worker in the process pool
def initialize_worker():
    # get the current process
    process = current_process()
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Initializing worker, process={process.name}, thread={thread.name}', flush=True)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of initializing workers in a process pool and reporting threads and processes
from time import sleep
from multiprocessing.pool import Pool
from multiprocessing import current_process
from threading import current_thread

# task executed in a worker process
def task():
    # get the current process
    process = current_process()
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Worker executing task, process={process.name}, thread={thread.name}', flush=True)
    # block for a moment
    sleep(1)

# initialize a worker in the process pool
def initialize_worker():
    # get the current process
    process = current_process()
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Initializing worker, process={process.name}, thread={thread.name}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool(2, initializer=initialize_worker) as pool:
        # issue tasks to the process pool
        for _ in range(4):
            pool.apply_async(task)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example creates the process pool and configures it so that worker processes are initialized using our custom function, as before.
Four tasks are issued to the process pool and the main process waits for all tasks to complete.
The name of the worker process and of the thread within that process is reported both when initializing the worker and when executing each task.
Interestingly, we can see that the same process that executes the task is used to initialize the worker and the same thread is used in each case.
For example, the first worker process has the name “SpawnPoolWorker-1” and executes both the initialization function and the task function in the process’s main thread, named “MainThread”.
This highlights that we can use mechanisms like global variables and thread-local storage in worker processes to share data across tasks executed by a worker process.
Initializing worker, process=SpawnPoolWorker-1, thread=MainThread
Worker executing task, process=SpawnPoolWorker-1, thread=MainThread
Initializing worker, process=SpawnPoolWorker-2, thread=MainThread
Worker executing task, process=SpawnPoolWorker-2, thread=MainThread
Worker executing task, process=SpawnPoolWorker-1, thread=MainThread
Worker executing task, process=SpawnPoolWorker-2, thread=MainThread
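Because the same worker process and the same thread run both the initialization function and every task assigned to that worker, per-worker state prepared in the initializer persists across tasks. As a minimal sketch of this idea (the per-worker counter and the two-worker pool below are illustrative, not part of the example above), a global variable defined in the initializer can accumulate state across the tasks that a given worker executes.

# SuperFastPython.com
# sketch: count tasks per worker via per-worker global state set in the initializer
from time import sleep
from multiprocessing.pool import Pool
from multiprocessing import current_process

# initialize a worker in the process pool
def initialize_worker():
    # define a per-worker global counter
    global task_count
    # each worker process starts its own count at zero
    task_count = 0

# task executed in a worker process
def task():
    # declare the per-worker global counter
    global task_count
    # increment the count for this worker only
    task_count += 1
    # report the worker name and its running count
    print(f'{current_process().name} has executed {task_count} task(s)', flush=True)
    # block for a moment
    sleep(1)

# protect the entry point
if __name__ == '__main__':
    # create a pool of two workers that are initialized with the counter
    with Pool(2, initializer=initialize_worker) as pool:
        # issue tasks to the process pool
        for _ in range(4):
            pool.apply_async(task)
        # close the process pool and wait for all tasks to complete
        pool.close()
        pool.join()

Each worker reports its own running count, independent of the other worker, because the global variable lives in that worker’s process.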
Example of Accessing an Initialized Variable in a Worker
Perhaps one of the most common uses of the Pool initializer function is to initialize a variable that is made available to the worker processes.
This variable may be passed into the initializer or may be a resource that is prepared in the initializer.
We can prepare a variable in the initializer function and share it with the worker process by making it a global variable.
Recall that each worker process is initialized with the initializer function. As we have seen in previous examples, the initialization occurs within the worker process.
Therefore defining a global variable in the initializer function will make it available to the worker process.
This can be achieved by first defining a variable in the custom initializer function as global, then assigning it a value.
For example:
def custom_init():
    # define a global variable in this process
    global data
    # assign data to the global variable
    data = 100
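The global variable does not have to hold a plain value passed in from the main process; it can also hold a resource that is prepared inside the initializer itself, such as a log handler or connection that the worker holds open and reuses across tasks. The sketch below is illustrative only; the logger configuration is an assumption, not part of the example above.

import logging
from multiprocessing import current_process

# prepare a per-worker resource in the initializer
def worker_init():
    # define a global variable in this process
    global logger
    # create and configure a logger held open by this worker
    logger = logging.getLogger(current_process().name)
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)

# task executed in a worker process
def task():
    # reuse the per-worker resource prepared in the initializer
    logger.info('Worker executing task...')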
Now that we know how to share an initialized variable with a worker, let’s look at a worked example.
In this example, we will pass an object to the initializer function and store it in a global variable in each worker process. This will make the data available within every worker in the pool.
This is the same approach used to share a multiprocessing.Event object with all workers so that they can be stopped gracefully, as covered in a separate tutorial; a minimal sketch of that pattern is included at the end of this section.
The complete example is listed below.
# SuperFastPython.com
# example of sharing data with all workers via the initializer
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # declare global variable
    global custom_data
    # report a message with global variable
    print(f'Worker executing with: {custom_data}', flush=True)
    # block for a moment
    sleep(1)

# initialize a worker in the process pool
def worker_init(custom):
    # declare global variable
    global custom_data
    # assign the global variable
    custom_data = custom
    # report a message
    print(f'Initializing worker with: {custom_data}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # define data to share with all workers
    data = 'Global State'
    # create and configure the process pool
    with Pool(2, initializer=worker_init, initargs=(data,)) as pool:
        # issue tasks to the process pool
        for _ in range(4):
            pool.apply_async(task)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
Running the example first defines the global state.
Next, the Pool is created and configured with two workers, the custom worker initializer function, and the shared state as an argument to the initializer.
Each of the two workers is initialized, declaring a global variable, storing the data in it, then reporting the value.
Four tasks are then issued to the pool for execution.
Each worker executes the task, reporting the value of its own copy of the shared data.
The example highlights how a variable can be defined in the initializer and used in the worker, and how we can share data from the main process with each worker without passing it as an argument to the task function.
Initializing worker with: Global State
Worker executing with: Global State
Initializing worker with: Global State
Worker executing with: Global State
Worker executing with: Global State
Worker executing with: Global State
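As mentioned above, the same pattern can be used to share a multiprocessing.Event with all workers, for example so that long-running tasks can check the event and stop gracefully. The sketch below assumes a manager-hosted Event so that the proxy can be passed safely via “initargs”; the task body and timings are illustrative only.

# SuperFastPython.com
# sketch: share an event with all workers via the initializer so tasks can stop gracefully
from time import sleep
from multiprocessing.pool import Pool
from multiprocessing import Manager

# initialize a worker in the process pool
def worker_init(stop_event):
    # store the shared event in a global variable for this worker
    global stop
    stop = stop_event

# task executed in a worker process
def task(identifier):
    # loop over units of simulated work
    for _ in range(5):
        # check whether a stop has been requested
        if stop.is_set():
            print(f'Task {identifier} stopping early', flush=True)
            return
        # simulate a unit of work
        sleep(0.5)
    # report normal completion
    print(f'Task {identifier} done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create a manager to host an event that can be shared with pool workers
    with Manager() as manager:
        # create the shared event
        event = manager.Event()
        # create a pool whose workers store the shared event via the initializer
        with Pool(2, initializer=worker_init, initargs=(event,)) as pool:
            # issue tasks to the process pool
            for i in range(4):
                pool.apply_async(task, args=(i,))
            # let the tasks run briefly, then request a graceful stop
            sleep(1)
            event.set()
            # close the process pool and wait for all tasks to complete
            pool.close()
            pool.join()

Setting the event in the main process causes every task, in every worker, to notice the request on its next check and return early.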
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to initialize workers in the multiprocessing process pool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by GEORGE DESIPRIS on Unsplash
Nicolas says
This article misses the main roadblock Python developers have when trying to use the initializer: how to access a variable initialized in the initializer?
Jason Brownlee says
Great point!
You can define a global variable in the initializer function which is accessible to all threads in the worker process.
You can see an example of this here:
https://superfastpython.com/multiprocessing-pool-stop-all-tasks-on-failure/
I’ll update the above tutorial to demonstrate this. Thanks again!
UPDATE: I have added the example, I hope it helps. Please let me know if you have any questions about it.