Multiprocessing SimpleQueue in Python

May 28, 2022 Python Multiprocessing

You can communicate between processes with a queue via the multiprocessing.SimpleQueue class.

In this tutorial you will discover how to use the process SimpleQueue in Python.

Let's get started.

Need for a SimpleQueue

A process is a running instance of a computer program.

Every Python program is executed in a Process, which is a new instance of the Python interpreter. This process has the name MainProcess and has one thread used to execute the program instructions called the MainThread. Both processes and threads are created and managed by the underlying operating system.

Sometimes we may need to create new child processes in our program in order to execute code concurrently.

Python provides the ability to create and manage new processes via the multiprocessing.Process class.

In multiprocessing programming, we often need to share data between processes.

One approach to sharing data is to use a queue data structure.

Python provides a number of process-safe queues, such as the multiprocessing.SimpleQueue class.

What is the Queue and how can we use it in Python?

How to Use the SimpleQueue

Python provides a simple queue in the multiprocessing.SimpleQueue class.

The "Simple" in the name of the class refers to it providing a minimal interface compared to other queues classes.

This minimal interface includes:

Next, let's look at how the SimpleQueue is different from the Queue class.

Differences From multiprocessing.Queue

Unlike the multiprocessing.Queue class, the multiprocessing.SimpleQueue does not provide the ability to limit the capacity of the queue and as such does not allow processes to block while putting items on the queue.

As such, it does not have any facility for checking on the size, fullness of the queue. It also does not have any capability for setting a timeout for blocking operations, like a get() or a put().

We can summarize these differences as follows:

Next, let's take a closer look at how to use the class interface.

SimpleQueue Interface

We can create a new instance of the queue via the constructor.

...
# create a new queue
queue = multiprocessing.SimpleQueue()

Items are added to the end of the queue and retrieved from the front of the queue in a first-in, first-out or FIFO order.

We can add items to the queue by calling the put() function.

For example:

...
# add an item to the queue
queue.put(item)

Items can be retrieved from the queue by calling the get() function.

...
# get an item from the queue
item = queue.get()

The call to get() will block until an item is available to be returned.

We can check if the queue does not have any items via the empty() function.

For example:

...
# check if the queue is empty
if queue.empty():
	# ...

Finally, if we are finished with the queue, we can call the close() function to release resources, such as internal worker processes.

For example:

...
# close the queue
queue.close()

Now that we know how to use the multiprocessing.SimpleQueue class, let's look at some worked examples.

Example of Using the SimpleQueue

We can explore how to use the multiprocessing.SimpleQueue class with a worked example.

In this example, we will create a producer process that will generate ten random numbers and put them on the queue. We will also create a consumer process that will get numbers from the queue and report their values.

The multiprocessing.SimpleQueue provides a simple way to allow these simple producer and consumer processes to share data with each other.

First, we can define the function to be executed by the producer process.

The task will iterate ten times in a loop. Each iteration, it will generate a new random value between 0 and 1 via the random.random() function. It will then block for that fraction of a second, then put the value on the queue.

Once the task is complete it will put the value None on the queue to signal to the consumer that there is no further work.

The producer() function below implements this by taking the queue instance as an argument.

# generate work
def producer(queue):
    print('Producer: Running', flush=True)
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # add to the queue
        queue.put(value)
    # all done
    queue.put(None)
    print('Producer: Done', flush=True)

Next, we can define the function to be executed by the consumer processes.

The task will loop forever. Each iteration, it will get an item from the queue and block if there is no item yet available.

If the item retrieved from the queue is the value None, then the task will break the loop and terminate the process. Otherwise, the value is reported.

The consumer() function below implements this and takes the queue instance as an argument.

# consume work
def consumer(queue):
    print('Consumer: Running', flush=True)
    # consume work
    while True:
        # get a unit of work
        item = queue.get()
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}', flush=True)
    # all done
    print('Consumer: Done', flush=True)

Finally, in the main process we can create the shared queue instance.

...
# create the shared queue
queue = SimpleQueue()

We can then configure and start the consumer process, which will patiently wait for work to arrive on the queue.

...
# start the consumer
consumer_process = Process(target=consumer, args=(queue,))
consumer_process.start()

Then we can configure and start the producer process, which will generate work and add it to the queue for the consumer to retrieve.

...
# start the producer
producer_process = Process(target=producer, args=(queue,))
producer_process.start()

The main process will then block until both the producer and consumer processes terminate, then terminate itself.

...
# wait for all child processes to finish
producer_process.join()
consumer_process.join()

Tying this together, the complete example of using the SimpleQueue is listed below.

# SuperFastPython.com
# example of using the simple queue among processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import SimpleQueue

# generate work
def producer(queue):
    print('Producer: Running', flush=True)
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # add to the queue
        queue.put(value)
    # all done
    queue.put(None)
    print('Producer: Done', flush=True)

# consume work
def consumer(queue):
    print('Consumer: Running', flush=True)
    # consume work
    while True:
        # get a unit of work
        item = queue.get()
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}', flush=True)
    # all done
    print('Consumer: Done', flush=True)

# entry point
if __name__ == '__main__':
    # create the shared queue
    queue = SimpleQueue()
    # start the consumer
    consumer_process = Process(target=consumer, args=(queue,))
    consumer_process.start()
    # start the producer
    producer_process = Process(target=producer, args=(queue,))
    producer_process.start()
    # wait for all child processes to finish
    producer_process.join()
    consumer_process.join()

Running the example first creates the shared queue instance.

Next, the consumer process is started and passed the queue instance. Then the producer process is started and the main process blocks until the child processes terminate.

The producer process generates a new random value each iteration of the task, blocks and adds it to the queue. The consumer process waits on the queue for items to arrive, then consumes them one at a time, reporting their value.

Finally, the producer task finishes, a None value is put on the queue and the process terminates. The consumer process gets the None value, breaks its loop and also terminates.

This highlights how the multiprocessing.SimpleQueue can be used to share data easily between a producer and consumer processes.

A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.

Consumer: Running
Producer: Running
>got 0.8206242364576763
>got 0.4907919136677238
>got 0.21142315844448445
>got 0.15892072679219693
>got 0.038130416915268195
>got 0.7512743637475896
>got 0.03490291888924202
>got 0.17188372196447388
>got 0.6387632218666497
Producer: Done
>got 0.09321308658358085
Consumer: Done

Takeaways

You now know how to use the SimpleQueue for processes in Python.



If you enjoyed this tutorial, you will love my book: Python Multiprocessing Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.