Last Updated on October 8, 2022
You can use a Manager to create a hosted Queue object and share it via proxy objects with multiple child processes.
In this tutorial you will discover how to share a queue using a manager in Python.
Let’s get started.
Need a Manager to Share a Queue
A manager in the multiprocessing module provides a way to create Python objects that can be shared easily between processes.
Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
— multiprocessing — Process-based parallelism
A manager creates a server process that hosts a centralized version of objects that can be shared among multiple processes.
The objects are not shared directly. Instead, the manager creates a proxy object for each object that it manages and the proxy objects are shared among processes.
Proxy objects are used and operate just like the original objects, except that each operation serializes data and synchronizes with the centralized version of the object hosted in the manager's server process.
A proxy is an object which refers to a shared object which lives (presumably) in a different process. […] A proxy object has methods which invoke corresponding methods of its referent (although not every method of the referent will necessarily be available through the proxy).
— multiprocessing — Process-based parallelism
This makes managers a process-safe and preferred way to share Python objects among processes.
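For example, we can see that the object returned by a manager is a proxy, not the hosted object itself. A minimal sketch:

```python
from multiprocessing import Manager

if __name__ == '__main__':
    # starting the manager launches the server process
    with Manager() as manager:
        # returns a proxy object, not the hosted queue itself
        queue = manager.Queue()
        # e.g. <class 'multiprocessing.managers.AutoProxy[Queue]'>
        print(type(queue))
        # method calls on the proxy are forwarded to the hosted object
        queue.put('hello')
        print(queue.get())
```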
When using multiprocessing, we may need to share a Queue with child processes.
This may be for many reasons, such as:
- Child processes need to put results to the queue.
- Child processes need to get results from the queue.
We may not be able to share the Queue directly with the child processes.
This is because a queue object cannot be pickled directly, and objects must be pickled when sent to a child process, such as via a Pipe, via another Queue, or as an argument to a task issued to the Pool class.
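We can demonstrate the problem directly: attempting to pickle a multiprocessing.Queue in the parent process fails with a RuntimeError. A minimal sketch:

```python
from multiprocessing import Queue
import pickle

if __name__ == '__main__':
    queue = Queue()
    try:
        # a multiprocessing.Queue cannot be pickled directly
        pickle.dumps(queue)
    except RuntimeError as e:
        # e.g. Queue objects should only be shared between processes through inheritance
        print(e)
```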
How can we use a multiprocessing Manager to share a Queue with child processes?
Manager for Sharing a Queue
Python provides a process-safe queue in the multiprocessing.Queue class.
A queue is a data structure on which items can be added by a call to put() and from which items can be retrieved by a call to get().
The multiprocessing.Queue provides a first-in, first-out (FIFO) queue, which means that items are retrieved in the order they were added. The first items added to the queue will be the first items retrieved. This is opposed to other queue types, such as last-in, first-out (LIFO) and priority queues.
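For example, a quick sketch showing that items are retrieved in the order they were added:

```python
from multiprocessing import Queue

if __name__ == '__main__':
    queue = Queue()
    # add items in order
    for i in range(3):
        queue.put(i)
    # items are retrieved in the same order: 0, 1, 2
    for _ in range(3):
        print(queue.get())
```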
Managers are provided via the multiprocessing.Manager class, which creates and returns a multiprocessing.managers.SyncManager instance.
The SyncManager allows a suite of Python objects to be created and managed by default, including a Queue.
Specifically, the SyncManager registers two queue types, both demonstrated in the sketch after this list:
- Queue
- JoinableQueue
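For example, a minimal sketch creating both queue types from a manager:

```python
from multiprocessing import Manager

if __name__ == '__main__':
    with Manager() as manager:
        # hosted FIFO queue
        queue = manager.Queue()
        # hosted queue that also supports task_done() and join()
        joinable = manager.JoinableQueue()
        queue.put('hello')
        print(queue.get())
```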
As such, we can create a Manager instance and use it directly to create a Queue object.
This will create a hosted version of the Queue in the Manager's server process and return a proxy object for the Queue that can be pickled and shared with child processes.
For example:
```python
# create the manager
with Manager() as manager:
    # create a shared queue
    queue = manager.Queue()
    # ...
```
From reviewing the source code for the SyncManager class, we can see that, in fact, a standard queue.Queue is created and hosted in the Manager's server process. This means that all process safety is handled via the proxy objects that are returned.
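We can confirm this by peeking at the registry on the SyncManager class; note that this is an implementation detail that may change between Python versions:

```python
from multiprocessing.managers import SyncManager

# the callable registered for the 'Queue' type id
# (an implementation detail; output may vary by Python version)
print(SyncManager._registry['Queue'][0])  # <class 'queue.Queue'>
```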
Now that we know how to use a manager to create a shared Queue, let’s look at a worked example.
Example of Using a Manager for Sharing a Queue
We can use a manager to share a Queue safely with multiple processes.
In this example we will define a task that generates some data and puts it on the shared queue. We will then execute the task from many child processes in parallel. The main process will get data from the queue until the expected number of results has been received.
Firstly, we can define a function to execute in child processes.
The function will take an integer number argument and the shared queue. It will then generate a random number between 0 and 1, block for a fraction of a second, then put the integer argument and generated number on the shared queue as a tuple.
The task() function below implements this.
```python
# task executed in a new process
def task(number, shared_queue):
    # generate a number between 0 and 1
    value = random()
    # block for a fraction of a second
    sleep(value)
    # add the value to the queue
    shared_queue.put((number, value))
```
Next, in the main process, we will create and start a Manager instance using the context manager interface.
```python
...
# create the manager
with Manager() as manager:
    # ...
```
We can then use the manager to create a shared queue.
The Queue object will be hosted in the manager's server process. The call returns a proxy object that can be shared among child processes, pickled if needed, and used to interact with the Queue in a process-safe manner.
```python
...
# create the shared queue
shared_queue = manager.Queue()
```
Next, we can create many multiprocessing.Process instances configured to run our task() function and pass an integer and the shared Queue object as arguments.
This can be achieved in a list comprehension that will create a list of configured Process instances.
```python
...
# create many child processes
n_tasks = 50
processes = [Process(target=task, args=(i, shared_queue)) for i in range(n_tasks)]
```
The main process can then iterate through the processes and start each in turn.
```python
...
# start all processes
for process in processes:
    process.start()
```
The main process will then loop a fixed number of times to retrieve the expected number of task results. Each iteration, a result will be retrieved from the shared queue and reported.
If no result is ready, the main process will block, waiting for a task result to be made available by a child process.
```python
...
# read data from the queue
for _ in range(n_tasks):
    # get an item from the queue
    item = shared_queue.get()
    # report the item
    print(f'> got {item}')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of using the manager to create a shared queue
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Manager

# task executed in a new process
def task(number, shared_queue):
    # generate a number between 0 and 1
    value = random()
    # block for a fraction of a second
    sleep(value)
    # add the value to the queue
    shared_queue.put((number, value))

# protect the entry point
if __name__ == '__main__':
    # create the manager
    with Manager() as manager:
        # create the shared queue
        shared_queue = manager.Queue()
        # create many child processes
        n_tasks = 50
        processes = [Process(target=task, args=(i, shared_queue)) for i in range(n_tasks)]
        # start all processes
        for process in processes:
            process.start()
        # read data from the queue
        for _ in range(n_tasks):
            # get an item from the queue
            item = shared_queue.get()
            # report the item
            print(f'> got {item}')
```
Running the example first creates the manager using the context manager interface.
Next, the manager is used to create the shared Queue object.
This creates a centralized version of the Queue on the Manager’s server process and returns proxy objects for interacting with the Queue.
Next, 50 child processes are created and configured to call our custom task() function with the shared Queue, then the processes are started. The main process then blocks waiting to retrieve results from the queue.
Each process generates a random number, blocks for a fraction of a second, then puts its result on the shared queue.
As results are put on the queue, the main process gets the results one at a time and reports them. This continues until the expected number of results is retrieved.
Note: The output will vary each time the program is run given the use of random numbers.
```
> got (5, 0.0552169162342705)
> got (9, 0.07825637315262579)
> got (6, 0.10157886972748231)
> got (29, 0.04139268238075278)
> got (15, 0.07605744573406148)
> got (32, 0.01199711633027356)
> got (22, 0.3369483192790511)
> got (16, 0.18600321245074625)
> got (34, 0.09202394836028704)
> got (46, 0.147466011593676)
> got (36, 0.1530744506112982)
> got (2, 0.5681469000437313)
> got (33, 0.19404736754609286)
> got (38, 0.19805999708181476)
> got (41, 0.196408968294976)
> got (37, 0.34048036860699915)
> got (17, 0.5126272212510875)
> got (1, 0.6880499703640428)
> got (8, 0.643672408020424)
> got (14, 0.33535288895002646)
> got (42, 0.34655067813560403)
> got (7, 0.44115086547035043)
> got (10, 0.4631407835930157)
> got (25, 0.46771235872701966)
> got (0, 0.7437980873768921)
> got (26, 0.4759230001253324)
> got (11, 0.5665834898045142)
> got (45, 0.46254549270060463)
> got (28, 0.7379418161205183)
> got (13, 0.5514031371486676)
> got (21, 0.6988164598056216)
> got (31, 0.5547229541997934)
> got (30, 0.6104076189216526)
> got (49, 0.5769510396121789)
> got (35, 0.5840033206921457)
> got (40, 0.6007083881076727)
> got (3, 0.9221122110996065)
> got (48, 0.6091882244335914)
> got (23, 0.9072243486332091)
> got (44, 0.6766206739116987)
> got (20, 0.8721542709017209)
> got (24, 0.7844127332179919)
> got (19, 0.8072288535642465)
> got (4, 0.9667084556246381)
> got (43, 0.7920153782223315)
> got (27, 0.8396040191905791)
> got (12, 0.8785249373545645)
> got (18, 0.8797470817908314)
> got (39, 0.8449071205855895)
> got (47, 0.9074657897891643)
```
This example highlights how we might use a Manager to create a hosted Queue object that can be shared with many child processes.
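The same approach also works with the Pool class, where a regular multiprocessing.Queue would fail to pickle. A minimal sketch, assuming a simplified, hypothetical task() function:

```python
# sketch: sharing a manager-hosted queue with pool workers
from multiprocessing import Manager
from multiprocessing import Pool

# simplified task executed by pool workers (hypothetical example)
def task(number, shared_queue):
    shared_queue.put(number * 10)

if __name__ == '__main__':
    with Manager() as manager:
        # the queue proxy can be pickled and sent as a task argument
        shared_queue = manager.Queue()
        with Pool(4) as pool:
            # starmap blocks until all tasks are complete
            pool.starmap(task, [(i, shared_queue) for i in range(8)])
        # all results are now on the queue
        for _ in range(8):
            print(shared_queue.get())
```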
Further Reading
This section provides additional resources that you may find helpful.
Python Multiprocessing Books
- Python Multiprocessing Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Multiprocessing API Cheat Sheet
I would also recommend specific chapters in the books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
APIs
- multiprocessing — Process-based parallelism
Takeaways
You now know how to share a queue using a manager in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Muhammad Gulfam says
This way one might be able to use the Manager Queue to retrieve results from child processes in the parent process, but will we be able to track which process returned what?
If I have two different child processes and they both use the same Queue to return results to the parent process, how will I track which one is returning the first result and which is returning the second?
Thanks
Jason Brownlee says
Good question.
You could have the child process include its PID in the result object. Or you could include some aspect of the task in the result, like a task id or the task data.
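For example, a quick sketch of tagging each result with the child process PID:

```python
# sketch: include the child PID in each result (hypothetical example)
from multiprocessing import Process
from multiprocessing import Manager
from os import getpid

# task executed in a child process
def task(number, shared_queue):
    # tag the result with this process's id
    shared_queue.put((getpid(), number))

if __name__ == '__main__':
    with Manager() as manager:
        shared_queue = manager.Queue()
        processes = [Process(target=task, args=(i, shared_queue)) for i in range(4)]
        for process in processes:
            process.start()
        for _ in range(4):
            # report (pid, task number) pairs
            print(shared_queue.get())
```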