Multiprocessing SimpleQueue in Python
You can communicate between processes with a queue via the multiprocessing.SimpleQueue class.
In this tutorial you will discover how to use the process SimpleQueue in Python.
Let's get started.
Need for a SimpleQueue
A process is a running instance of a computer program.
Every Python program is executed in a Process, which is a new instance of the Python interpreter. This process has the name MainProcess and has one thread used to execute the program instructions called the MainThread. Both processes and threads are created and managed by the underlying operating system.
Sometimes we may need to create new child processes in our program in order to execute code concurrently.
Python provides the ability to create and manage new processes via the multiprocessing.Process class.
In multiprocessing programming, we often need to share data between processes.
One approach to sharing data is to use a queue data structure.
Python provides a number of process-safe queues, such as the multiprocessing.SimpleQueue class.
What is the SimpleQueue and how can we use it in Python?
How to Use the SimpleQueue
Python provides a simple queue in the multiprocessing.SimpleQueue class.
The "Simple" in the name of the class refers to it providing a minimal interface compared to other queue classes.
This minimal interface includes:
- Close the queue and release resources via close() (Python 3.9 and later).
- Check if empty via empty().
- Add an item to the queue via put().
- Get an item from the queue via get().
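The whole interface can be exercised within a single process before involving child processes. The sketch below is a minimal illustration of the four functions; note that close() requires Python 3.9 or later.

```python
from multiprocessing import SimpleQueue

# create a new queue
queue = SimpleQueue()
# the queue starts out empty
print(queue.empty())  # True
# add an item to the queue
queue.put('hello')
# retrieve the item again
print(queue.get())  # hello
# release internal resources (Python 3.9+)
queue.close()
```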
Next, let's look at how the SimpleQueue is different from the Queue class.
Differences From multiprocessing.Queue
Unlike the multiprocessing.Queue class, the multiprocessing.SimpleQueue does not provide the ability to limit the capacity of the queue, and as such does not allow processes to block while putting items on the queue.
It also has no facility for checking the size or fullness of the queue, and no capability for setting a timeout on blocking operations such as get() or put().
We can summarize these differences as follows:
- SimpleQueue is always unbounded, unlike Queue, which can be configured to be bounded or unbounded.
- SimpleQueue does not offer qsize() or full() functions.
- SimpleQueue does not offer "block" or "timeout" arguments on get() and put().
- SimpleQueue does not offer put_nowait() and get_nowait() functions.
- SimpleQueue does not offer join_thread() and cancel_join_thread() functions.
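We can confirm these differences directly: the functions that Queue offers simply do not exist on SimpleQueue, so calling them would raise an AttributeError. A quick sketch:

```python
from multiprocessing import Queue, SimpleQueue

queue = Queue()
simple = SimpleQueue()
# functions present on Queue but absent from SimpleQueue
for name in ['qsize', 'full', 'put_nowait', 'get_nowait', 'join_thread', 'cancel_join_thread']:
    print(f'{name}: Queue={hasattr(queue, name)}, SimpleQueue={hasattr(simple, name)}')
```

Each line reports True for Queue and False for SimpleQueue.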
Next, let's take a closer look at how to use the class interface.
SimpleQueue Interface
We can create a new instance of the queue via the constructor.
...
# create a new queue
queue = multiprocessing.SimpleQueue()
Items are added to the end of the queue and retrieved from the front of the queue in a first-in, first-out or FIFO order.
We can add items to the queue by calling the put() function.
For example:
...
# add an item to the queue
queue.put(item)
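Items put on the queue are serialized with pickle and sent over an internal pipe, so any picklable Python object can be transferred, such as numbers, strings, tuples, and dicts. A small sketch:

```python
from multiprocessing import SimpleQueue

queue = SimpleQueue()
# any picklable object can be transferred
queue.put({'task': 1, 'value': 0.5})
# the object is pickled on put and unpickled on get
item = queue.get()
print(item)  # {'task': 1, 'value': 0.5}
```

Unpicklable objects, such as lambdas or open file handles, cannot be sent this way.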
Items can be retrieved from the queue by calling the get() function.
...
# get an item from the queue
item = queue.get()
The call to get() will block until an item is available to be returned.
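Because get() offers no timeout argument, a blocked consumer has no built-in way to give up waiting. One workaround is to poll empty() against a deadline; the get_with_timeout() helper below is a hypothetical sketch of this pattern, not part of the SimpleQueue API. Note that the empty()-then-get() check is not atomic, so it is only safe when there is a single consumer.

```python
import time
from multiprocessing import SimpleQueue

# hypothetical helper: emulate a timed get by polling empty()
def get_with_timeout(queue, timeout=1.0, interval=0.01):
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if not queue.empty():
            # safe only if this is the sole consumer
            return queue.get()
        time.sleep(interval)
    raise TimeoutError('no item arrived within the timeout')

queue = SimpleQueue()
queue.put(42)
print(get_with_timeout(queue))  # 42
```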
We can check whether the queue has no items via the empty() function.
For example:
...
# check if the queue is empty
if queue.empty():
    # ...
Finally, if we are finished with the queue, we can call the close() function (available in Python 3.9 and later) to release internal resources, such as the pipe connections used by the queue.
For example:
...
# close the queue
queue.close()
Now that we know how to use the multiprocessing.SimpleQueue class, let's look at some worked examples.
Example of Using the SimpleQueue
We can explore how to use the multiprocessing.SimpleQueue class with a worked example.
In this example, we will create a producer process that will generate ten random numbers and put them on the queue. We will also create a consumer process that will get numbers from the queue and report their values.
The multiprocessing.SimpleQueue provides a simple way to allow these simple producer and consumer processes to share data with each other.
First, we can define the function to be executed by the producer process.
The task will iterate ten times in a loop. Each iteration, it will generate a new random value between 0 and 1 via the random.random() function. It will then block for that fraction of a second, then put the value on the queue.
Once the task is complete it will put the value None on the queue to signal to the consumer that there is no further work.
The producer() function below implements this by taking the queue instance as an argument.
# generate work
def producer(queue):
    print('Producer: Running', flush=True)
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # add to the queue
        queue.put(value)
    # all done
    queue.put(None)
    print('Producer: Done', flush=True)
Next, we can define the function to be executed by the consumer process.
The task will loop forever. Each iteration, it will get an item from the queue and block if there is no item yet available.
If the item retrieved from the queue is the value None, then the task will break the loop and terminate the process. Otherwise, the value is reported.
The consumer() function below implements this and takes the queue instance as an argument.
# consume work
def consumer(queue):
    print('Consumer: Running', flush=True)
    # consume work
    while True:
        # get a unit of work
        item = queue.get()
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}', flush=True)
    # all done
    print('Consumer: Done', flush=True)
Finally, in the main process we can create the shared queue instance.
...
# create the shared queue
queue = SimpleQueue()
We can then configure and start the consumer process, which will patiently wait for work to arrive on the queue.
...
# start the consumer
consumer_process = Process(target=consumer, args=(queue,))
consumer_process.start()
Then we can configure and start the producer process, which will generate work and add it to the queue for the consumer to retrieve.
...
# start the producer
producer_process = Process(target=producer, args=(queue,))
producer_process.start()
The main process will then block until both the producer and consumer processes terminate, then terminate itself.
...
# wait for all child processes to finish
producer_process.join()
consumer_process.join()
Tying this together, the complete example of using the SimpleQueue is listed below.
# SuperFastPython.com
# example of using the simple queue among processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import SimpleQueue

# generate work
def producer(queue):
    print('Producer: Running', flush=True)
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # add to the queue
        queue.put(value)
    # all done
    queue.put(None)
    print('Producer: Done', flush=True)

# consume work
def consumer(queue):
    print('Consumer: Running', flush=True)
    # consume work
    while True:
        # get a unit of work
        item = queue.get()
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}', flush=True)
    # all done
    print('Consumer: Done', flush=True)

# entry point
if __name__ == '__main__':
    # create the shared queue
    queue = SimpleQueue()
    # start the consumer
    consumer_process = Process(target=consumer, args=(queue,))
    consumer_process.start()
    # start the producer
    producer_process = Process(target=producer, args=(queue,))
    producer_process.start()
    # wait for all child processes to finish
    producer_process.join()
    consumer_process.join()
Running the example first creates the shared queue instance.
Next, the consumer process is started and passed the queue instance. Then the producer process is started and the main process blocks until the child processes terminate.
The producer process generates a new random value each iteration of the task, blocks and adds it to the queue. The consumer process waits on the queue for items to arrive, then consumes them one at a time, reporting their value.
Finally, the producer task finishes, a None value is put on the queue and the process terminates. The consumer process gets the None value, breaks its loop and also terminates.
This highlights how the multiprocessing.SimpleQueue can be used to share data easily between producer and consumer processes.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
Consumer: Running
Producer: Running
>got 0.8206242364576763
>got 0.4907919136677238
>got 0.21142315844448445
>got 0.15892072679219693
>got 0.038130416915268195
>got 0.7512743637475896
>got 0.03490291888924202
>got 0.17188372196447388
>got 0.6387632218666497
Producer: Done
>got 0.09321308658358085
Consumer: Done
Takeaways
You now know how to use the SimpleQueue for processes in Python.
If you enjoyed this tutorial, you will love my book: Python Multiprocessing Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.