You can share a queue with workers in the pool by using the fork start method and the inheritance of global variables, or by sharing a proxy object for a queue hosted in a manager process.
In this tutorial, you will discover how to share a multiprocessing queue with tasks executed by child process workers in the multiprocessing pool in Python.
Let’s get started.
Cannot Share a Queue with a Pool
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A multiprocessing pool can be configured when it is created, which will prepare the child workers.
We can issue one-off tasks to the process pool using functions such as Pool.apply() or we can apply the same function to an iterable of items using functions such as Pool.map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as Pool.apply_async() and Pool.map_async().
For more on the multiprocessing pool, see the tutorial:
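For example, the following is a minimal sketch of issuing tasks to a pool in these ways; the work() function here is a hypothetical placeholder, not part of the worked examples below:

# minimal sketch of issuing tasks to a pool
from multiprocessing import Pool

# hypothetical task function that squares its argument
def work(value):
    return value * value

# protect the entry point
if __name__ == '__main__':
    # create the pool of workers
    with Pool() as pool:
        # issue a one-off task and block until the result is ready
        result = pool.apply(work, args=(2,))
        # apply the same function to an iterable of items
        results = pool.map(work, range(10))
        # issue a task asynchronously and retrieve the result later
        async_result = pool.apply_async(work, args=(3,))
        print(async_result.get())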
A queue is a data structure where items can be added at one end and retrieved from the other in a first-in-first-out order, although other orderings are possible.
The multiprocessing.Queue class is process-safe meaning that multiple processes can add and remove items from the queue simultaneously without fear of race conditions, data loss, or data corruption.
You can learn more about the multiprocessing queue in the tutorial:
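As a quick illustration (a minimal sketch, separate from the worked examples below), a multiprocessing.Queue can be shared directly with a multiprocessing.Process via a constructor argument, and items are retrieved in the order they were added:

# minimal sketch of a process-safe queue shared with a child process
from multiprocessing import Process
from multiprocessing import Queue

# task that adds items to the shared queue
def producer(queue):
    for i in range(3):
        queue.put(i)

# protect the entry point
if __name__ == '__main__':
    # create the shared queue
    queue = Queue()
    # a queue can be passed to a Process directly, unlike a Pool task
    process = Process(target=producer, args=(queue,))
    process.start()
    process.join()
    # items come off first-in-first-out
    for _ in range(3):
        print(queue.get())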
When issuing tasks to the pool, we may want to pass a shared queue as an argument.
This will fail with an error that includes the following message:
- Queue objects should only be shared between processes through inheritance
Therefore, we cannot share a multiprocessing queue with tasks executed in the multiprocessing pool directly.
How can we share a queue with tasks in the pool?
How to Share a Queue with a Pool
There are two ways that we can share a multiprocessing queue with tasks executed in a multiprocessing pool.
They are:
- Use the ‘fork’ start method and share the queue as an inherited global variable.
- Host the queue in a manager process and share the queue proxy object via task arguments.
Let’s take a closer look at each approach.
Share a Queue Via an Inherited Global Variable
When we attempt to share a queue via an argument to a task function executed by the multiprocessing Pool class, the error message suggests using inheritance instead.
Inheritance refers to child processes that are forked from the parent process and get a copy of all or most of the memory of the parent process, including global variables.
A child process can be forked from a parent process using the ‘fork‘ start method, which can be configured at the start of a program via a call to the multiprocessing.set_start_method() function.
For example:
...
# set the fork start method
set_start_method('fork')
This will ensure that all child processes created in the program will be forked copies of the parent process, including workers in the multiprocessing pool.
We can then define a global variable that refers to the shared queue.
For example:
...
set_start_method('fork')
# declare the global variable
global queue
# define the queue global variable
queue = Queue()
The multiprocessing pool can then be created as per normal and all child worker processes in the pool will inherit the “queue” global variable and be able to access the shared queue.
For example, a task function may declare the “queue” global variable before making use of it.
# task completed in a worker
def task():
    # declare the queue global variable
    global queue
    # ...
And that’s it.
You can learn more about forked child processes inheriting global variables in the tutorial:
Share a Queue With a Manager
Another approach to sharing a queue with child worker processes in the Pool class is via a Manager.
A manager is a server process that can host Python objects and provide access to them for other processes via proxy objects.
The proxy objects handle all inter-process communication required for the other processes to interact with the hosted Python object.
A manager can be created and used via the context manager interface.
For example:
...
# create the manager
with Manager() as manager:
    # ...
You can learn more about multiprocessing managers in the tutorial:
Once created, the manager can be used to create a hosted Queue object, returning a proxy object for interacting with the hosted queue.
For example:
...
# create the shared queue
queue = manager.Queue()
The queue proxy object can then be safely shared with task functions executed in the multiprocessing pool as an argument.
You can learn more about creating and sharing a queue via a manager with child processes in the tutorial:
Now that we know how to share a queue with workers in the multiprocessing pool, let’s look at some worked examples.
Example of Error When Sharing a Queue with a Pool
Before we explore an example of sharing a queue with child worker processes, let’s look at the failure case.
In this example, we will attempt to share a multiprocessing queue directly with workers in the multiprocessing pool.
We expect that this will result in an error.
The task function takes the queue as an argument, generates a random number between 0 and 5, sleeps for that many seconds to simulate computational effort, then puts the generated number on the queue.
The main process creates the shared queue, then creates a Pool with 10 workers and issues 10 tasks to the pool, each providing access to the shared queue.
The main process then consumes the expected 10 numbers from the shared queue.
The complete example is listed below.
# SuperFastPython.com
# example of sharing a queue with a pool that results in an error
from random import random
from time import sleep
from multiprocessing import Pool
from multiprocessing import Queue

# task completed in a worker
def task(queue):
    # generate some work
    data = random() * 5
    # block to simulate work
    sleep(data)
    # put it on the queue
    queue.put(data)

# protect the entry point
if __name__ == '__main__':
    # define the shared queue
    queue = Queue()
    # create the pool of workers
    with Pool(10) as pool:
        # create a list of arguments, one for each call
        args = [queue for _ in range(10)]
        # execute the tasks in parallel
        pool.map(task, args)
        # consume all items
        for i in range(10):
            # get item from queue
            item = queue.get()
            # report item
            print(f'> got {item}')
Running the example first creates the shared queue.
The Pool is then created and all 10 tasks are issued to the pool.
These fail immediately with a RuntimeError.
The message of the error indicates that a multiprocessing Queue should only be shared with child processes using inheritance, specifically by using the fork start method and an inherited global variable.
Traceback (most recent call last):
...
RuntimeError: Queue objects should only be shared between processes through inheritance
Next, let's look at how we can share the queue via an inherited global variable.
Example of Sharing a Queue with a Pool As a Global Variable
We can explore how to share a queue with child processes using an inherited global variable.
The example in the previous section can be updated to use the fork start method.
It is good practice to set the fork start method as the first line of the program if we wish all child processes to be created using this method.
...
# set the fork start method
set_start_method('fork')
An alternative is to set a multiprocessing context for the pool, which sets the start method only for the workers in that pool.
You can learn more about setting the multiprocessing context for the pool in the tutorial:
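For example, the following is a minimal sketch of this alternative, assuming the ‘fork‘ start method is available on your platform (it is not available on Windows):

...
from multiprocessing import get_context
# get a context configured to use the fork start method
context = get_context('fork')
# create a pool whose workers are forked from the parent process
with context.Pool(10) as pool:
    # ...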
Next, we can declare a global variable for the queue and assign it as per normal; being explicit about the declaration makes the intent clear.
...
# declare the global variable
global queue
# define the queue global variable
queue = Queue()
The tasks can be issued as before, although without the queue argument, which would otherwise raise the error.
Instead, the task function is updated to declare the global variable and then use it directly.
# task completed in a worker
def task():
    # declare the queue global variable
    global queue
    # generate some work
    data = random() * 5
    # block to simulate work
    sleep(data)
    # put it on the queue
    queue.put(data)
And that’s it.
Tying this together, the complete example is listed below.
Note, the ‘fork‘ start method is not supported on all platforms; for example, it is not available on Windows, so this example will not run there.
# SuperFastPython.com
# example of sharing a queue via the fork start method and a global variable
from random import random
from time import sleep
from multiprocessing import Pool
from multiprocessing import Queue
from multiprocessing import set_start_method

# task completed in a worker
def task():
    # declare the queue global variable
    global queue
    # generate some work
    data = random() * 5
    # block to simulate work
    sleep(data)
    # put it on the queue
    queue.put(data)

# protect the entry point
if __name__ == '__main__':
    # set the fork start method
    set_start_method('fork')
    # declare the global variable
    global queue
    # define the queue global variable
    queue = Queue()
    # create the pool of workers
    with Pool(10) as pool:
        # issue tasks
        results = [pool.apply_async(task) for _ in range(10)]
        # wait for all the tasks to complete
        for result in results:
            result.wait()
        # consume all items
        for i in range(10):
            # get item from queue
            item = queue.get()
            # report item
            print(f'> got {item}')
Running the example first sets the fork start method.
Next, the queue global variable is declared and defined.
The pool is started and all tasks are issued. The main process then waits for the tasks to complete.
Each task runs, declares the queue global variable, generates its number, sleeps, and puts its generated number on the queue.
The main process resumes after all tasks are completed and consumes the expected 10 generated numbers and reports each in turn.
This highlights how we can share a multiprocessing queue with tasks executed by child process workers in the multiprocessing pool using the inheritance of a global variable.
> got 0.12432051503051156
> got 0.2095435490376535
> got 0.26406409859466407
> got 1.337933891251271
> got 1.4197110269904367
> got 2.1694064382747005
> got 2.659180184099527
> got 4.131037733434911
> got 4.25445903944172
> got 4.782490715214807
Next, we will explore how to share a queue with workers in the pool via a manager.
Example of Sharing a Queue with a Pool Via a Manager
We can explore how to share a queue with workers in the multiprocessing pool using a Manager.
In this example, we can update the first failing example to create and host the queue in a manager server process and share the queue proxy object with tasks executed in the multiprocessing pool.
First, we will set the ‘spawn‘ start method to ensure that inheriting global variables is not possible. This is not required, but it makes explicit that inheritance is not an option in this example.
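For example (this mirrors the first lines of the complete example below):

...
# set the spawn start method
set_start_method('spawn')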
We can then create a manager and use it to create the shared queue.
...
# create the manager
with Manager() as manager:
    # create the shared queue
    queue = manager.Queue()
And that’s it. The rest of the example unfolds as before.
The proxy object for the queue is shared with tasks executed in the multiprocessing pool as an argument and used directly.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of sharing a queue via a manager
from random import random
from time import sleep
from multiprocessing import Pool
from multiprocessing import Manager
from multiprocessing import set_start_method

# task completed in a worker
def task(queue):
    # generate some work
    data = random() * 5
    # block to simulate work
    sleep(data)
    # put it on the queue
    queue.put(data)

# protect the entry point
if __name__ == '__main__':
    # set the spawn start method
    set_start_method('spawn')
    # create the manager
    with Manager() as manager:
        # create the shared queue
        queue = manager.Queue()
        # create the pool of workers
        with Pool(10) as pool:
            # create a list of arguments, one for each call
            args = [queue for _ in range(10)]
            # execute the tasks in parallel
            pool.map(task, args)
        # consume all items
        for i in range(10):
            # get item from queue
            item = queue.get()
            # report item
            print(f'> got {item}')
Running the example first sets the process start method to ‘spawn‘, which means each child process is created anew and does not inherit anything from the parent process.
Next, the manager is created. This starts a server process for hosting Python objects.
A queue is created via the manager. It is hosted in the manager's server process and a proxy object is returned that allows parent and child processes alike to interact with the queue as if it were a normal Python object.
The pool is started and tasks are executed. The queue proxy object is provided to the tasks executed in the pool as per normal.
The tasks execute, sleep, and place their generated number on the queue.
Once all tasks are complete, the main process consumes all items from the queue and reports their values.
This highlights how we can share a multiprocessing queue with tasks executed in a multiprocessing pool using a manager and proxy objects.
> got 0.05443966396912636
> got 0.8187454080662726
> got 2.1131564921057504
> got 2.2639148461313874
> got 2.287475773829026
> got 2.4898694172287645
> got 3.3739494879733503
> got 3.5294822364629406
> got 4.359748607285485
> got 4.366403932616186
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to share a multiprocessing queue with tasks executed by child process workers in the multiprocessing pool in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.