Last Updated on September 12, 2022
You can share a global variable with all child worker processes in the multiprocessing pool by defining it in the worker process initialization function.
In this tutorial you will discover how to share global variables with all workers in the Python process pool.
Let’s get started.
Need To Share Global Variable With All Workers in Process Pool
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A process pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
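For example, a minimal sketch of issuing tasks both ways might look as follows (the square() task function is purely illustrative):

# sketch of issuing tasks to the process pool (illustrative)
from multiprocessing.pool import Pool

# an illustrative task function
def square(x):
    return x * x

# protect the entry point
if __name__ == '__main__':
    # create a process pool with the default number of workers
    with Pool() as pool:
        # issue a one-off task and wait for the result
        result = pool.apply(square, args=(3,))
        print(result)
        # apply the same function to an iterable of items
        results = pool.map(square, range(10))
        print(results)
        # issue the same tasks asynchronously, then wait for the results
        async_result = pool.map_async(square, range(10))
        print(async_result.get())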
When using the process pool, we may need to share a global variable with all child worker processes in the process pool.
This would allow all tasks executed in the process pool to use the shared global variable.
We may need this capability for many reasons, such as:
- Allow all tasks to use a shared log.
- Allow all tasks to use a shared queue or pipe.
- Allow all tasks to use a shared synchronization primitive like a lock, semaphore, or event.
The process pool does not provide this capability.
How can we share a global variable with all child worker processes?
Or, put another way:
How can a shared global variable be accessed by all tasks executed by the process pool in Python?
How to Share a Global Variable With All Workers
We can share a global variable with all child worker processes in the process pool.
This can be achieved by configuring the process pool to initialize each worker process using a custom function.
For example:
...
# create a process pool with custom initialization
pool = Pool(initializer=init_worker, initargs=(data,))
The data required by each child worker process can be passed as an argument to the initialization function. It can then be stored in a global variable within that function, making it available to each child worker process.
Recall, declaring a variable “global” in a function will define a global variable for the process, rather than a local variable for the function.
You may also recall that the worker initialization function is executed by the main thread of each new worker process. Therefore, a global variable defined in the initialization function will be available to the process later.
For example:
# initialize worker processes
def init_worker(data):
    # declare scope of a new global variable
    global shared_data
    # store argument in the global variable for this process
    shared_data = data
Because each child worker process in the process pool will be initialized using the same function, the global variable (or variables) will be accessible by all child worker processes in the process pool.
This means that any tasks executed in the process pool can access the global variable, such as custom functions executed as tasks in the process pool.
For example:
# task executed in a worker process
def task():
    # access the global variable
    print(shared_data)
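The same pattern can be used for the other use cases listed above, such as sharing a lock. For example, a minimal sketch of sharing a multiprocessing.Lock with all workers might look as follows (the function and variable names here are illustrative):

# sketch of sharing a lock with all workers (illustrative)
from multiprocessing import Lock
from multiprocessing.pool import Pool

# initialize worker processes with a shared lock
def init_worker(shared_lock):
    # declare scope of a new global variable
    global lock
    # store argument in the global variable for this process
    lock = shared_lock

# task executed in a worker process
def task(identifier):
    # acquire the shared lock before reporting a message
    with lock:
        print(f'Task {identifier} running', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create the shared lock
    shared_lock = Lock()
    # create and configure the process pool
    with Pool(initializer=init_worker, initargs=(shared_lock,)) as pool:
        # issue tasks and wait for them to complete
        pool.map(task, range(5))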
You can learn more about configuring the child worker process initialization function in the tutorial:
Now that we know how to share global variables with all worker processes, let’s look at a worked example.
Example of Sharing a Global Variable With All Workers
We can explore how to share a global variable with all child worker processes.
In this example, we will define a shared multiprocessing queue. We will then share this queue with each child worker process via its initialization function. Each child worker will store a reference to the queue in a global variable so that all tasks executed by the worker can access it. We will then execute tasks in the process pool that put their results into the shared queue. The main process will then read results from the shared queue as they become available.
Firstly, we need to define the custom function used to initialize the child worker processes.
The initialization function must take the shared queue as an argument. It will then declare a new global variable for the child process and store a reference to the shared queue in the global variable.
The init_worker() function below implements this.
# initialize worker processes
def init_worker(shared_queue):
    # declare scope of a new global variable
    global queue
    # store argument in the global variable for this process
    queue = shared_queue
Next, we can define a custom task function to execute in the process pool.
The task function will take an integer identifier as an argument. It will then generate a random number between 0 and 5 and block for that many seconds to simulate a variable amount of computational effort. Finally, it will put the integer identifier and the generated number as a tuple into the shared queue.
The task() function below implements this.
Note that we explicitly define the scope of the queue global variable. This is technically not required, but I believe it helps make the code more readable.
# task executed in a worker process
def task(identifier):
    # generate a value
    value = random() * 5
    # block for a moment
    sleep(value)
    # declare scope of shared queue
    global queue
    # send result using shared queue
    queue.put((identifier, value))
Next, in the main process we can first create the shared multiprocessing queue. We will use a multiprocessing.SimpleQueue in this case.
...
# create a shared queue
shared_queue = SimpleQueue()
You can learn more about the multiprocessing.SimpleQueue in the tutorial:
Next, we can create and configure the process pool.
In this case, we will configure it so that each worker process is initialized using our init_worker() custom initialization function and pass the shared queue as an argument.
We will use the context manager interface so that the process pool is closed for us automatically once we are finished with it.
...
# create and configure the process pool
with Pool(initializer=init_worker, initargs=(shared_queue,)) as pool:
    # ...
You can learn more about the context manager interface in the tutorial:
Next, we will issue 10 calls to our custom task function asynchronously using the map_async() function.
...
# issue tasks into the process pool
_ = pool.map_async(task, range(10))
We will then consume the results of the tasks as they become available (e.g. simulating the imap_unordered() function). This can be achieved by iterating over the expected number of results and calling get() on the shared queue for each task result.
...
# read results from the queue as they become available
for i in range(10):
    result = shared_queue.get()
    print(f'Got {result}', flush=True)
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of sharing a global variable among all workers
from random import random
from time import sleep
from multiprocessing import SimpleQueue
from multiprocessing.pool import Pool

# initialize worker processes
def init_worker(shared_queue):
    # declare scope of a new global variable
    global queue
    # store argument in the global variable for this process
    queue = shared_queue

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random() * 5
    # block for a moment
    sleep(value)
    # declare scope of shared queue
    global queue
    # send result using shared queue
    queue.put((identifier, value))

# protect the entry point
if __name__ == '__main__':
    # create a shared queue
    shared_queue = SimpleQueue()
    # create and configure the process pool
    with Pool(initializer=init_worker, initargs=(shared_queue,)) as pool:
        # issue tasks into the process pool
        _ = pool.map_async(task, range(10))
        # read results from the queue as they become available
        for i in range(10):
            result = shared_queue.get()
            print(f'Got {result}', flush=True)
Running the example first creates the shared queue.
Next, the process pool is created and configured to use the custom initialization function.
Each worker process is created, started, and then initialized with the custom initialization function. Each worker creates a new global variable named “queue” and stores the passed-in shared queue in that global variable. This makes “queue” available to all tasks executed by the worker process, and all worker processes are initialized the same way.
Next, 10 tasks are issued into the process pool.
The main process then iterates the 10 results, calling get() on the queue which will block and not return until a result is available.
Each task first generates a random number between 0 and 5, then blocks for that many seconds to simulate computational effort. The “queue” global variable for the process is declared explicitly, then accessed. The result for the task is put on the queue and the task completes.
Results are reported in the main process as they become available.
After all 10 results are retrieved from the shared queue, the main process continues on, automatically closing the process pool and then closing the application.
Note, the specific results will differ each time the program is run due to the use of random numbers.
Got (2, 0.38947694846648895)
Got (3, 0.7665425799985037)
Got (4, 1.6182597482880667)
Got (6, 2.912364034686572)
Got (7, 3.0557058569816458)
Got (8, 2.6846243338785)
Got (0, 3.589396223885189)
Got (5, 4.1921930714219116)
Got (1, 4.282642869898409)
Got (9, 4.827317385371338)
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to share global variables with all workers in the Python process pool.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Diego H says
Don’t you have to worry about deadlocks?
Jason Brownlee says
In some cases, yes, such as using a mutex lock to protect shared state or having workers waiting on each other in a cycle.
You can learn more here:
https://superfastpython.com/multiprocessing-deadlock/
Wei Y. says
Does ProcessPoolExecutor support global variables like above?
Jason Brownlee says
Yes, I believe so. Try it and see.
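For example, a minimal sketch of the same pattern with the initializer and initargs arguments of ProcessPoolExecutor (available since Python 3.7) might look like this:

# sketch of sharing a global variable with ProcessPoolExecutor workers (illustrative)
from concurrent.futures import ProcessPoolExecutor

# initialize worker processes
def init_worker(data):
    # declare scope of a new global variable
    global shared_data
    # store argument in the global variable for this process
    shared_data = data

# task executed in a worker process
def task(identifier):
    # access the global variable
    return (identifier, shared_data)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool executor
    with ProcessPoolExecutor(initializer=init_worker, initargs=('hello world',)) as executor:
        # issue tasks and report results in order
        for result in executor.map(task, range(5)):
            print(result)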
Wei Y. says
ProcessPoolExecutor never worked for me when there were global variables. But, you are right – the way to make it work is to add global variables to the initializer. Thanks.