Multiprocessing JoinableQueue in Python

May 29, 2022 Python Multiprocessing

You can communicate between processes with a queue via the multiprocessing.JoinableQueue class.

In this tutorial you will discover how to use the process JoinableQueue in Python.

Let's get started.

Need for a JoinableQueue

A process is a running instance of a computer program.

Every Python program is executed in a Process, which is a new instance of the Python interpreter. This process has the name MainProcess and has one thread used to execute the program instructions called the MainThread. Both processes and threads are created and managed by the underlying operating system.

Sometimes we may need to create new child processes in our program in order to execute code concurrently.

Python provides the ability to create and manage new processes via the multiprocessing.Process class.

In multiprocessing programming, we often need to share data between processes.

One approach to sharing data is to use a queue data structure.

A problem when using queues is knowing when all items in the queue have been processed by consumer processes.

How can we know when all items have been processed in a queue?

Why Care When All Tasks Are Done

There are many reasons why a process may want to know when all tasks in a queue have been processed. For example, a program may need to wait until all work is finished before shutting down, reporting a result, or starting a new batch of tasks.

There are two aspects to this, they are:

- Waiting for all tasks in the queue to be processed.
- Marking each retrieved task as processed.

Specifically, waiting means that the process is blocked until the condition is met.

The condition of all tasks being processed means not only that all items put on the queue have been retrieved, but that they have been fully processed by the process that retrieved them.

Next, let’s look at how we might do this in Python.

How to Use the JoinableQueue

Python provides a joinable queue via the multiprocessing.JoinableQueue class.

The multiprocessing.JoinableQueue class extends the multiprocessing.Queue class. This means it provides the same functionality, such as adding items to the queue via put() and getting items from the queue via get().
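For example, a minimal sketch of this shared interface, using the queue within a single process just to show put() and get():

```python
from multiprocessing import JoinableQueue

# minimal sketch: JoinableQueue supports the same put()/get()
# interface as multiprocessing.Queue
queue = JoinableQueue()
# add an item to the queue
queue.put('hello')
# retrieve the item, blocking until one is available
item = queue.get()
print(item)  # hello
```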

You can learn more about this interface on multiprocessing.Queue in the tutorial:

The multiprocessing.JoinableQueue class offers two additional methods for joining a queue and marking items on the queue as done.

Let's take a closer look at this added capability.

An object added to a queue may represent a task or a unit of work.

When a consumer process calls get() to retrieve an item from the queue, it may need to perform additional work on it before the task is considered complete.

Once complete, the process may then call the task_done() method on the queue to indicate that the item that was just retrieved has been completely processed.

For example:

...
# get a task
item = queue.get()
# process it
# ...
# mark the task as completed
queue.task_done()

This is helpful to other processes that may be interested to know when all tasks that have been added to the queue have been completed.

Other processes can wait for all tasks currently on the queue to be completed by calling the join() function.

For example:

...
# wait for all current tasks on the queue to be marked as done
queue.join()

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

-- multiprocessing — Process-based parallelism

If the task_done() function is called more times than there were items placed on the queue, then a ValueError will be raised to indicate the invalid state.
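For example, a small sketch that triggers this error by calling task_done() on a fresh queue with no items:

```python
from multiprocessing import JoinableQueue

queue = JoinableQueue()
# no items were put on the queue, so there is nothing to mark as done
try:
    queue.task_done()
    error = None
except ValueError as e:
    error = e
print(type(error).__name__)  # ValueError
```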

If the join() function is called after all tasks have been marked done, then the function will return immediately.
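We can see this counting behavior in a small single-process sketch: the unfinished-task count rises with each put(), falls with each task_done(), and join() returns immediately once it reaches zero:

```python
from multiprocessing import JoinableQueue

queue = JoinableQueue()
# add three tasks, raising the unfinished-task count to three
for i in range(3):
    queue.put(i)
# retrieve each task and mark it done, lowering the count
for _ in range(3):
    item = queue.get()
    queue.task_done()
# the count is now zero, so join() returns immediately
queue.join()
print('all tasks done')
```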

Note, this capability is also available on queue.Queue for threads instead of child processes. Learn more in the tutorial:

Now that we know how to use the multiprocessing.JoinableQueue, let's look at some worked examples.

Example of Using the JoinableQueue

We can explore an example of how to use join() and task_done() on the JoinableQueue.

In this example we will have a producer process that will add ten tasks to the queue and then signal that no further tasks are to be expected. The consumer process will get the tasks, process them and mark them as done. When the signal to exit is received, the consumer process will terminate. The main process will wait for the producer process to add all items to the queue, and will wait for all items on the queue to be processed before moving on.

Let's dive in.

Producer Process

First, we can define the function to be executed by the producer process.

The producer function will iterate ten times in a loop.

...
print('Producer starting', flush=True)
# add tasks to the queue
for i in range(10):
	# ...

Each iteration, it will generate a new random value between 0 and 1 via the random.random() function. It will then pair the loop counter, an integer from 0 to 9, with the generated value into a tuple and put the task on the queue.

...
    # generate a task
    task = (i, random())
    print(f'.producer added {task}', flush=True)
    # add it to the queue
    queue.put(task)

Finally, the producer will put the value None on the queue to signal to the consumer that there are no further tasks.

This is called a Sentinel Value and is a common way for processes to communicate via queues to signal an important event, like a shutdown.

...
# send a signal that no further tasks are coming
queue.put(None)
print('Producer finished', flush=True)

The producer() function below implements this by taking the queue instance as an argument.

# task for the producer process
def producer(queue):
    print('Producer starting', flush=True)
    # add tasks to the queue
    for i in range(10):
        # generate a task
        task = (i, random())
        print(f'.producer added {task}', flush=True)
        # add it to the queue
        queue.put(task)
    # send a signal that no further tasks are coming
    queue.put(None)
    print('Producer finished', flush=True)

Consumer Process

Next, we can define the function to be executed by the consumer process.

The consumer process will loop forever.

...
print('Consumer starting', flush=True)
# process items from the queue
while True:
    #...

Each iteration, it will get an item from the queue and block if there is no item yet available.

...
# get a task from the queue
task = queue.get()

If the item retrieved from the queue is the value None, then the consumer will break the loop and the process will terminate.

...
# check for signal that we are done
if task is None:
    break

Otherwise, the task's fractional value is used to block for a moment with a call to time.sleep(), and the task is then reported. The item is then marked as processed via a call to task_done().

...
# process the item
sleep(task[1])
print(f'.consumer got {task}', flush=True)
# mark the unit of work as processed
queue.task_done()

Finally, just prior to the process exiting, it will mark the signal to terminate as processed.

...
# mark the signal as processed
queue.task_done()
print('Consumer finished', flush=True)

The consumer() function below implements this and takes the queue instance as an argument.

# task for the consumer process
def consumer(queue):
    print('Consumer starting', flush=True)
    # process items from the queue
    while True:
        # get a task from the queue
        task = queue.get()
        # check for signal that we are done
        if task is None:
            break
        # process the item
        sleep(task[1])
        print(f'.consumer got {task}', flush=True)
        # mark the unit of work as processed
        queue.task_done()
    # mark the signal as processed
    queue.task_done()
    print('Consumer finished', flush=True)

Create Queue and Child Processes

In the main process we can create the shared joinable queue instance.

...
# create the shared queue
queue = JoinableQueue()

Then we can configure and start the producer process, which will generate tasks and add them to the queue for the consumer to retrieve.

...
# create and start the producer process
producer_process = Process(target=producer, args=(queue,))
producer_process.start()

We can then configure and start the consumer process, which will patiently wait for work to arrive on the queue.

...
# create and start the consumer process
consumer_process = Process(target=consumer, args=(queue,))
consumer_process.start()

The main process will then block until the producer process has added all work to the queue and the process has terminated.

...
# wait for the producer to finish
producer_process.join()
print('Main found that the producer has finished', flush=True)

The main process will then block on the queue with a call to join() until the consumer has retrieved all values from the queue and processed them appropriately. This includes the final signal that there are no further task items to process.

...
# wait for the queue to empty
queue.join()
print('Main found that all tasks are processed', flush=True)

It is important that the main process blocks on the producer process first, before blocking on the queue. This is to avoid a possible race condition.

For example, if the main process blocked on the queue directly, it is possible that the queue would be empty at that time, in which case the call would return immediately. Alternatively, it may join at a time when only a few tasks are on the queue; these are consumed by the consumer process and the join call returns.

The problem is that in both of these cases, we don't know whether the call to join returned because all tasks were marked done, or only because the subset of tasks that had been added to the queue at the time join was called were marked done.

By waiting for the producer process to terminate first, we know that all tasks that could be added to the queue have been added to the queue. By blocking on the queue after the producer has finished, we know that when the call to join returns, that all tasks were added to the queue, all tasks were retrieved from the queue and all retrieved tasks were marked as done.

Complete Example

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of join and task done for a joinable queue with processes
from time import sleep
from random import random
from multiprocessing import JoinableQueue
from multiprocessing import Process

# task for the producer process
def producer(queue):
    print('Producer starting', flush=True)
    # add tasks to the queue
    for i in range(10):
        # generate a task
        task = (i, random())
        print(f'.producer added {task}', flush=True)
        # add it to the queue
        queue.put(task)
    # send a signal that no further tasks are coming
    queue.put(None)
    print('Producer finished', flush=True)

# task for the consumer process
def consumer(queue):
    print('Consumer starting', flush=True)
    # process items from the queue
    while True:
        # get a task from the queue
        task = queue.get()
        # check for signal that we are done
        if task is None:
            break
        # process the item
        sleep(task[1])
        print(f'.consumer got {task}', flush=True)
        # mark the unit of work as processed
        queue.task_done()
    # mark the signal as processed
    queue.task_done()
    print('Consumer finished', flush=True)

# entry point
if __name__ == '__main__':
    # create the shared queue
    queue = JoinableQueue()
    # create and start the producer process
    producer_process = Process(target=producer, args=(queue,))
    producer_process.start()
    # create and start the consumer process
    consumer_process = Process(target=consumer, args=(queue,))
    consumer_process.start()
    # wait for the producer to finish
    producer_process.join()
    print('Main found that the producer has finished', flush=True)
    # wait for the queue to empty
    queue.join()
    print('Main found that all tasks are processed', flush=True)

Running the example first creates the queue to be shared between the producer and consumer processes.

Then the producer process is created and configured to execute our producer() function and passed in the shared queue instance. The producer process is then started.

Next, the consumer process is configured to execute our consumer() function and is then started.

The producer process runs as fast as it is able, each iteration generating a random number and adding it along with the task number to the queue. Once all ten tasks have been added to the queue, a None message is sent to the queue to indicate no further messages are to be expected and the producer process terminates.

The main process notices that the producer process has terminated, then blocks on the queue itself waiting for all tasks to be retrieved and marked as done.

The consumer process retrieves tasks one at a time from the queue, blocks for a moment, reports each value, then marks the task as done. This is repeated until the sentinel message is received and the loop is broken. Just before the consumer process terminates, the final sentinel message is marked as done.

The main process notices that all tasks were marked done and is then free to continue on with other tasks. In this case, the main process reports a message and terminates.

A sample of the output from the program is listed below. Note, your specific results will differ given the use of random numbers.

In this case, we can see that the producer ran first, adding all tasks to the queue. Then the consumer process ran, reading off each value. After all tasks are marked done, finally the main process continues on.

This highlights how to use task_done() and join() on a process queue.

Producer starting
.producer added (0, 0.17191334178951057)
Consumer starting
.producer added (1, 0.7281669000124695)
.producer added (2, 0.8835116735385814)
.producer added (3, 0.3613833018911995)
.producer added (4, 0.6356607291920195)
.producer added (5, 0.8971417071906157)
.producer added (6, 0.424420704274559)
.producer added (7, 0.017250244327121078)
.producer added (8, 0.35167161248303724)
.producer added (9, 0.5265211005357712)
Producer finished
Main found that the producer has finished
.consumer got (0, 0.17191334178951057)
.consumer got (1, 0.7281669000124695)
.consumer got (2, 0.8835116735385814)
.consumer got (3, 0.3613833018911995)
.consumer got (4, 0.6356607291920195)
.consumer got (5, 0.8971417071906157)
.consumer got (6, 0.424420704274559)
.consumer got (7, 0.017250244327121078)
.consumer got (8, 0.35167161248303724)
.consumer got (9, 0.5265211005357712)
Consumer finished
Main found that all tasks are processed

Takeaways

You now know how to share data among processes with the JoinableQueue in Python.



If you enjoyed this tutorial, you will love my book: Python Multiprocessing Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.