Multiprocessing Manager Share Queue in Python
You can use a Manager to create a hosted Queue object and share it via proxy objects with multiple child processes.
In this tutorial you will discover how to share a queue using a manager in Python.
Let's get started.
Need Manager to Share a Queue
A manager in the multiprocessing module provides a way to create Python objects that can be shared easily between processes.
Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
-- multiprocessing — Process-based parallelism
A manager creates a server process that hosts a centralized version of objects that can be shared among multiple processes.
The objects are not shared directly. Instead, the manager creates a proxy object for each object that it manages and the proxy objects are shared among processes.
The proxy objects are used and operate just like the original objects, except that they serialize data, synchronize and coordinate with the centralized version of the object hosted in the manager server process.
A proxy is an object which refers to a shared object which lives (presumably) in a different process. [...] A proxy object has methods which invoke corresponding methods of its referent (although not every method of the referent will necessarily be available through the proxy).
-- multiprocessing — Process-based parallelism
This makes managers a process-safe and preferred way to share Python objects among processes.
When using multiprocessing, we may need to share a Queue with child processes.
This may be for many reasons, such as:
- Child processes need to put results to the queue.
- Child processes need to get results from the queue.
We may not be able to share the Queue directly with the child processes.
This may be because a multiprocessing.Queue object cannot be pickled, and objects must be pickled when they are sent to a child process, such as via a Pipe, via another Queue, or as an argument to a task issued to the Pool class.
How can we use a multiprocessing Manager to share a Queue with child processes?
Manager for Sharing a Queue
Python provides a process-safe queue in the multiprocessing.Queue class.
A queue is a data structure on which items can be added by a call to put() and from which items can be retrieved by a call to get().
The multiprocessing.Queue provides a first-in, first-out (FIFO) queue, which means that items are retrieved from the queue in the order they were added. The first items added to the queue will be the first items retrieved. This is unlike other queue types, such as last-in, first-out (LIFO) and priority queues.
Managers are provided via the multiprocessing.Manager class, which creates and returns a multiprocessing.managers.SyncManager instance.
The SyncManager allows a suite of Python objects to be created and managed by default, including a Queue.
Specifically, the SyncManager provides two registered queue classes:
- Queue
- JoinableQueue
As such, we can create a Manager instance and use it directly to create a Queue object.
This will create a hosted version of the Queue in the Manager's server process and return proxy objects for the Queue that can be pickled and shared with child processes.
For example:
# create the manager
with Manager() as manager:
# create a shared queue
queue = manager.Queue()
# ...
Reviewing the source code for the SyncManager class shows that a thread-safe queue.Queue is in fact created and hosted in the Manager's server process. Process safety is provided by the server process, which all other processes access through the returned proxy objects.
Now that we know how to use a manager to create a shared Queue, let's look at a worked example.
Example of Using a Manager for Sharing a Queue
We can use a manager to share a Queue safely with multiple processes.
In this example we will define a task that generates some data and puts it on the shared queue. We will then run this task in many child processes in parallel. The main process will get data from the queue until the expected number of results has been received.
Firstly, we can define a function to execute in child processes.
The function will take an integer number argument and the shared queue. It will then generate a random number between 0 and 1, block for a fraction of a second, then put the integer argument and generated number on the shared queue as a tuple.
The task() function below implements this.
# task executed in a new process
def task(number, shared_queue):
# generate a number between 0 and 1
value = random()
# block for a fraction of a second
sleep(value)
# add the value to the queue
shared_queue.put((number, value))
Next, in the main process, we will create and start a Manager instance using the context manager interface.
...
# create the manager
with Manager() as manager:
# ...
We can then use the manager to create a shared queue.
The Queue object will be hosted in the manager's server process, and the manager returns a proxy object that can be shared among child processes, pickled if needed, and used to interact with the Queue in a process-safe manner.
...
# create the shared queue
shared_queue = manager.Queue()
Next, we can create many multiprocessing.Process instances configured to run our task() function and pass an integer and the shared Queue object as arguments.
This can be achieved in a list comprehension that will create a list of configured Process instances.
...
# create many child processes
n_tasks = 50
processes = [Process(target=task, args=(i, shared_queue)) for i in range(n_tasks)]
The main process can then iterate through the processes and start each in turn.
...
# start all processes
for process in processes:
process.start()
The main process will then loop a fixed number of times to retrieve the expected number of task results. Each iteration, a result will be retrieved from the shared queue and reported.
If no result is ready, the main process will block, waiting for a task result to be made available by a child process.
...
# read data from the queue
for _ in range(n_tasks):
# get an item from the queue
item = shared_queue.get()
# report the item
print(f'> got {item}')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using the manager to create a shared queue
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Manager
# task executed in a new process
def task(number, shared_queue):
# generate a number between 0 and 1
value = random()
# block for a fraction of a second
sleep(value)
# add the value to the queue
shared_queue.put((number, value))
# protect the entry point
if __name__ == '__main__':
# create the manager
with Manager() as manager:
# create the shared queue
shared_queue = manager.Queue()
# create many child processes
n_tasks = 50
processes = [Process(target=task, args=(i, shared_queue)) for i in range(n_tasks)]
# start all processes
for process in processes:
process.start()
# read data from the queue
for _ in range(n_tasks):
# get an item from the queue
item = shared_queue.get()
# report the item
print(f'> got {item}')
Running the example first creates the manager using the context manager interface.
Next, the manager is used to create the shared Queue object.
This creates a centralized version of the Queue on the Manager's server process and returns proxy objects for interacting with the Queue.
Next, 50 child processes are created and configured to call our custom task() function with the shared Queue, then the processes are started. The main process then blocks waiting to retrieve results from the queue.
Each process generates a random number, blocks for a fraction of a second then puts its result on the shared queue.
As results are put on the queue, the main process gets the results one at a time and reports them. This continues until the expected number of results is retrieved.
Note: The output will vary each time the program is run given the use of random numbers.
> got (5, 0.0552169162342705)
> got (9, 0.07825637315262579)
> got (6, 0.10157886972748231)
> got (29, 0.04139268238075278)
> got (15, 0.07605744573406148)
> got (32, 0.01199711633027356)
> got (22, 0.3369483192790511)
> got (16, 0.18600321245074625)
> got (34, 0.09202394836028704)
> got (46, 0.147466011593676)
> got (36, 0.1530744506112982)
> got (2, 0.5681469000437313)
> got (33, 0.19404736754609286)
> got (38, 0.19805999708181476)
> got (41, 0.196408968294976)
> got (37, 0.34048036860699915)
> got (17, 0.5126272212510875)
> got (1, 0.6880499703640428)
> got (8, 0.643672408020424)
> got (14, 0.33535288895002646)
> got (42, 0.34655067813560403)
> got (7, 0.44115086547035043)
> got (10, 0.4631407835930157)
> got (25, 0.46771235872701966)
> got (0, 0.7437980873768921)
> got (26, 0.4759230001253324)
> got (11, 0.5665834898045142)
> got (45, 0.46254549270060463)
> got (28, 0.7379418161205183)
> got (13, 0.5514031371486676)
> got (21, 0.6988164598056216)
> got (31, 0.5547229541997934)
> got (30, 0.6104076189216526)
> got (49, 0.5769510396121789)
> got (35, 0.5840033206921457)
> got (40, 0.6007083881076727)
> got (3, 0.9221122110996065)
> got (48, 0.6091882244335914)
> got (23, 0.9072243486332091)
> got (44, 0.6766206739116987)
> got (20, 0.8721542709017209)
> got (24, 0.7844127332179919)
> got (19, 0.8072288535642465)
> got (4, 0.9667084556246381)
> got (43, 0.7920153782223315)
> got (27, 0.8396040191905791)
> got (12, 0.8785249373545645)
> got (18, 0.8797470817908314)
> got (39, 0.8449071205855895)
> got (47, 0.9074657897891643)
This example highlights how we might use a Manager to create a hosted Queue object that can be shared with many child processes.
Takeaways
You now know how to share a queue using a manager in Python.
If you enjoyed this tutorial, you will love my book: Python Multiprocessing Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.