Last Updated on September 12, 2022
You can communicate between processes with a queue via the multiprocessing.JoinableQueue class.
In this tutorial you will discover how to use the process JoinableQueue in Python.
Let’s get started.
Need for a JoinableQueue
A process is a running instance of a computer program.
Every Python program is executed in a Process, which is a new instance of the Python interpreter. This process has the name MainProcess and has one thread used to execute the program instructions called the MainThread. Both processes and threads are created and managed by the underlying operating system.
Sometimes we may need to create new child processes in our program in order to execute code concurrently.
Python provides the ability to create and manage new processes via the multiprocessing.Process class.
In multiprocessing programming, we often need to share data between processes.
One approach to sharing data is to use a queue data structure.
A problem when using queues is knowing when all items in the queue have been processed by consumer processes.
How can we know when all items have been processed in a queue?
Why Care When All Tasks Are Done
There are many reasons why a process may want to know when all tasks in a queue have been processed.
For example:
- A producer process may want to wait until all work is done before adding new work.
- A producer process may want to wait for all tasks to be done before sending a shutdown signal.
- A main process may want to wait for all tasks to be done before terminating the program.
There are two aspects to this:
- A process blocking until tasks are done.
- A task being done is more than being retrieved from the queue.
Specifically, waiting means that the process is blocked until the condition is met.
The condition of all tasks being processed is not just that all items put on the queue have been retrieved, but that they have also been processed by the process that retrieved them.
Next, let’s look at how we might do this in Python.
How to Use the JoinableQueue
Python provides a joinable queue in the multiprocessing.JoinableQueue class.
The multiprocessing.JoinableQueue class extends the multiprocessing.Queue class. This means it provides the same functionality, such as adding items to the queue via put() and getting items from the queue via get().
You can learn more about this interface on multiprocessing.Queue in the tutorial:
The multiprocessing.JoinableQueue class offers two additional methods for joining a queue and marking items on the queue as done.
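For example, a minimal sketch of the interface (the added methods are covered in detail below):

# minimal sketch of the JoinableQueue interface
from multiprocessing import JoinableQueue

if __name__ == '__main__':
    # create a joinable queue, just like a multiprocessing.Queue
    queue = JoinableQueue()
    # add an item, as with Queue
    queue.put('work')
    # retrieve an item, as with Queue
    item = queue.get()
    # additional method: mark the retrieved item as processed
    queue.task_done()
    # additional method: block until all items are marked as done
    queue.join()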
Let’s take a closer look at this added capability.
An object added to a queue may represent a task or a unit of work.
When a consumer process calls get() to retrieve the item from the queue, it may need to do additional work to it before the task is considered complete.
Once complete, the process may then call the task_done() method on the queue to indicate that the item that was just retrieved has been completely processed.
For example:
...
# get a task
item = queue.get()
# process it
# ...
# mark the task as completed
queue.task_done()
This is helpful to other processes that may be interested in knowing when all tasks that have been added to the queue have been completed.
Other processes can wait for all tasks currently on the queue to be completed by calling the join() function.
For example:
...
# wait for all current tasks on the queue to be marked as done
queue.join()
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
— multiprocessing — Process-based parallelism
If the task_done() function is called more times than there were items put on the queue, then a ValueError will be raised to indicate the invalid state.
If the join() function is called after all tasks have been marked done, then the function will return immediately.
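For example, a small sketch demonstrating both behaviors in a single process:

# demonstrate join() on a finished queue and an extra call to task_done()
from multiprocessing import JoinableQueue

if __name__ == '__main__':
    # create the queue and add a single task
    queue = JoinableQueue()
    queue.put('task')
    # retrieve and mark the only task as done
    item = queue.get()
    queue.task_done()
    # all tasks are done, so join() returns immediately
    queue.join()
    try:
        # one more call than there were items put on the queue
        queue.task_done()
    except ValueError as e:
        print(f'Raised: {e}')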
Note, this capability is also available on queue.Queue for threads instead of child processes. Learn more in the tutorial:
Now that we know how to use the multiprocessing.JoinableQueue, let’s look at some worked examples.
Example of Using the JoinableQueue
We can explore an example of how to use join() and task_done() on the JoinableQueue.
In this example we will have a producer process that will add ten tasks to the queue and then signal that no further tasks are to be expected. The consumer process will get the tasks, process them and mark them as done. When the signal to exit is received, the consumer process will terminate. The main process will wait for the producer process to add all items to the queue, and will wait for all items on the queue to be processed before moving on.
Let’s dive in.
Producer Process
First, we can define the function to be executed by the producer process.
The producer will iterate ten times in a loop.
...
print('Producer starting', flush=True)
# add tasks to the queue
for i in range(10):
    # ...
Each iteration, it will generate a new random value between 0 and 1 via the random.random() function. It will then pair the task number, an integer from 0 to 9, with the generated value in a tuple and put the tuple on the queue.
...
# generate a task
task = (i, random())
print(f'.producer added {task}', flush=True)
# add it to the queue
queue.put(task)
Finally, the producer will put the value None on the queue to signal to the consumer that there are no further tasks.
This is called a Sentinel Value and is a common way for processes to communicate via queues to signal an important event, like a shutdown.
...
# send a signal that no further tasks are coming
queue.put(None)
print('Producer finished', flush=True)
The producer() function below implements this by taking the queue instance as an argument.
# task for the producer process
def producer(queue):
    print('Producer starting', flush=True)
    # add tasks to the queue
    for i in range(10):
        # generate a task
        task = (i, random())
        print(f'.producer added {task}', flush=True)
        # add it to the queue
        queue.put(task)
    # send a signal that no further tasks are coming
    queue.put(None)
    print('Producer finished', flush=True)
Consumer Process
Next, we can define the function to be executed by the consumer process.
The consumer process will loop forever.
...
print('Consumer starting', flush=True)
# process items from the queue
while True:
    # ...
Each iteration, it will get an item from the queue and block if there is no item yet available.
...
# get a task from the queue
task = queue.get()
If the item retrieved from the queue is the value None, then the consumer will break the loop and the process will terminate.
...
# check for signal that we are done
if task is None:
    break
Otherwise, the fractional value in the task is used to block with a call to time.sleep(), and the task is then reported. The item is then marked as processed via a call to task_done().
...
# process the item
sleep(task[1])
print(f'.consumer got {task}', flush=True)
# mark the unit of work as processed
queue.task_done()
Finally, just prior to the process exiting, it will mark the signal to terminate as processed.
...
# mark the signal as processed
queue.task_done()
print('Consumer finished', flush=True)
The consumer() function below implements this and takes the queue instance as an argument.
# task for the consumer process
def consumer(queue):
    print('Consumer starting', flush=True)
    # process items from the queue
    while True:
        # get a task from the queue
        task = queue.get()
        # check for signal that we are done
        if task is None:
            break
        # process the item
        sleep(task[1])
        print(f'.consumer got {task}', flush=True)
        # mark the unit of work as processed
        queue.task_done()
    # mark the signal as processed
    queue.task_done()
    print('Consumer finished', flush=True)
Create Queue and Child Processes
In the main process we can create the shared joinable queue instance.
...
# create the shared queue
queue = JoinableQueue()
Then we can configure and start the producer process, which will generate tasks and add them to the queue for the consumer to retrieve.
...
# create and start the producer process
producer_process = Process(target=producer, args=(queue,))
producer_process.start()
We can then configure and start the consumer process, which will patiently wait for work to arrive on the queue.
...
# create and start the consumer process
consumer_process = Process(target=consumer, args=(queue,))
consumer_process.start()
The main process will then block until the producer process has added all work to the queue and the process has terminated.
...
# wait for the producer to finish
producer_process.join()
print('Main found that the producer has finished', flush=True)
The main process will then block on the queue with a call to join() until the consumer has retrieved all values from the queue and processed them appropriately. This includes the final signal that there are no further task items to process.
...
# wait for the queue to empty
queue.join()
print('Main found that all tasks are processed', flush=True)
It is important that the main process blocks on the producer process first, before blocking on the queue. This is to avoid a possible race condition.
For example, if the main process blocked on the queue directly, it is possible that the queue would be empty at that time, in which case the call would return immediately. Alternatively, it may join at a time when there are only a few tasks on the queue; these are consumed by the consumer process and the join call returns.
The problem is that in both of these cases, we don’t know whether the call to join() returned because all tasks were marked done, or because only the subset of tasks that had been added to the queue at the time join() was called had been marked done.
By waiting for the producer process to terminate first, we know that all tasks that could be added to the queue have been added to the queue. By blocking on the queue after the producer has finished, we know that when the call to join() returns, all tasks were added to the queue, all tasks were retrieved from the queue, and all retrieved tasks were marked as done.
Complete Example
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of join and task done for a joinable queue with processes
from time import sleep
from random import random
from multiprocessing import JoinableQueue
from multiprocessing import Process

# task for the producer process
def producer(queue):
    print('Producer starting', flush=True)
    # add tasks to the queue
    for i in range(10):
        # generate a task
        task = (i, random())
        print(f'.producer added {task}', flush=True)
        # add it to the queue
        queue.put(task)
    # send a signal that no further tasks are coming
    queue.put(None)
    print('Producer finished', flush=True)

# task for the consumer process
def consumer(queue):
    print('Consumer starting', flush=True)
    # process items from the queue
    while True:
        # get a task from the queue
        task = queue.get()
        # check for signal that we are done
        if task is None:
            break
        # process the item
        sleep(task[1])
        print(f'.consumer got {task}', flush=True)
        # mark the unit of work as processed
        queue.task_done()
    # mark the signal as processed
    queue.task_done()
    print('Consumer finished', flush=True)

# entry point
if __name__ == '__main__':
    # create the shared queue
    queue = JoinableQueue()
    # create and start the producer process
    producer_process = Process(target=producer, args=(queue,))
    producer_process.start()
    # create and start the consumer process
    consumer_process = Process(target=consumer, args=(queue,))
    consumer_process.start()
    # wait for the producer to finish
    producer_process.join()
    print('Main found that the producer has finished', flush=True)
    # wait for the queue to empty
    queue.join()
    print('Main found that all tasks are processed', flush=True)
Running the example first creates the queue to be shared between the producer and consumer processes.
Then the producer process is created and configured to execute our producer() function and passed in the shared queue instance. The producer process is then started.
Next, the consumer process is configured to execute our consumer() function and is then started.
The producer process runs as fast as it is able, each iteration generating a random number and adding it along with the task number to the queue. Once all ten tasks have been added to the queue, a None message is sent to the queue to indicate no further messages are to be expected and the producer process terminates.
The main process notices that the producer process has terminated, then blocks on the queue itself waiting for all tasks to be retrieved and marked as done.
The consumer process retrieves tasks one at a time from the queue, blocks for the fraction of a second specified in each task, reports the task, then marks it as done. This is repeated until the sentinel message is received and the loop is broken. Just before the consumer process terminates, the final sentinel message is marked as done.
The main process notices that all tasks were marked done and is then free to continue on with other tasks. In this case, the main process reports a message and terminates.
A sample of the output from the program is listed below. Note, your specific results will differ given the use of random numbers.
In this case, we can see that the producer ran first, adding all tasks to the queue. Then the consumer process ran, reading off each value. After all tasks are marked done, finally the main process continues on.
This highlights how to use task_done() and join() on a process queue.
Producer starting
.producer added (0, 0.17191334178951057)
Consumer starting
.producer added (1, 0.7281669000124695)
.producer added (2, 0.8835116735385814)
.producer added (3, 0.3613833018911995)
.producer added (4, 0.6356607291920195)
.producer added (5, 0.8971417071906157)
.producer added (6, 0.424420704274559)
.producer added (7, 0.017250244327121078)
.producer added (8, 0.35167161248303724)
.producer added (9, 0.5265211005357712)
Producer finished
Main found that the producer has finished
.consumer got (0, 0.17191334178951057)
.consumer got (1, 0.7281669000124695)
.consumer got (2, 0.8835116735385814)
.consumer got (3, 0.3613833018911995)
.consumer got (4, 0.6356607291920195)
.consumer got (5, 0.8971417071906157)
.consumer got (6, 0.424420704274559)
.consumer got (7, 0.017250244327121078)
.consumer got (8, 0.35167161248303724)
.consumer got (9, 0.5265211005357712)
Consumer finished
Main found that all tasks are processed
Further Reading
This section provides additional resources that you may find helpful.
Python Multiprocessing Books
- Python Multiprocessing Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Multiprocessing API Cheat Sheet
I would also recommend specific chapters in the books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to share data among processes with the JoinableQueue in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Arno Senoner on Unsplash
tsering says
How to handle if a task is failed to process?
Jason Brownlee says
One approach might be to re-submit the task if it fails.
For example, we can do this using a process pool:
https://superfastpython.com/processpoolexecutor-retry-tasks/
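For example, a sketch of the idea with the queue from this tutorial, where process_task() is a hypothetical function that may raise an exception on failure:

...
# get a task from the queue
task = queue.get()
try:
    # attempt to process the task (process_task() is hypothetical)
    process_task(task)
except Exception:
    # processing failed, so put the task back on the queue for another attempt
    # (a real program might limit the number of retries)
    queue.put(task)
finally:
    # the retrieved item itself is always marked as done
    queue.task_done()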
Charles "Chuck" Wegrzyn says
I figured you might be trying to have this work in Python 2, but for Python 3 I would do this
while task := queue.get():
instead of this code:
while True:
# get a task from the queue
task = queue.get()
# check for signal that we are done
if task is None:
break
Might as well use it if we have it….lol
Jason Brownlee says
Thanks. Not a fan of the walrus operator yet.
I’m trying to write code that almost all pydevs will read and get instantly.
jim says
Better example – feeding a Process pool via a queue. Using the task complete method, and the error_callback method. Using the “swarm” process for the Processes. Not sure what the advantage is to putting your producer (queue manager) code into a process. Would it be better just to leave this in the main process and just run the consumers in the Process pool. Methods: result_callback, error_callback, task. The examples I see on the internet commonly don’t include error_callback, which comes in handy for debugging.
Most of the Pool examples that I’ve seen are tinker toys and not applicable to the usual stuff that you see. Didn’t see anything where, when one process is done, another gets started in the pool.
The example that I’m looking for would have a function call that I would want to run a few thousand times with unique inputs. The pool is limited to the number of CPU processors minus 1.
Want to run my tasks in as parallel a form as the machine can handle. Want the result_callback for each task. Think I need “swarm” to keep from blowing memory.
When a queue item gets queue.task_done() – does gc occur to remove this item from the queue? Does it stay in the queue, just marked as completed?
Writing a multiprocessing program without some kind of queue structure to load the Processes, makes the code machine dependent?
Appreciate that someone has these samples out there.
Jim
Jason Brownlee says
Perhaps you can use a process pool directly without the external queue. E.g. a Pool or a ProcessPoolExecutor that have their own internal queues for issued tasks.
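For example, a sketch of issuing many tasks to a Pool with a result callback and an error callback (the task() function here is hypothetical):

# sketch of issuing many tasks to a process pool with callbacks
from os import cpu_count
from time import sleep
from random import random
from multiprocessing import Pool

# a hypothetical task that may fail
def task(identifier):
    value = random()
    sleep(value)
    if value > 0.9:
        raise Exception(f'Task {identifier} failed')
    return (identifier, value)

# called in the main process with the return value of a successful task
def on_result(result):
    print(f'Got result: {result}', flush=True)

# called in the main process with the exception raised by a failed task
def on_error(error):
    print(f'Got error: {error}', flush=True)

if __name__ == '__main__':
    # limit the pool to the number of CPUs minus one
    with Pool(processes=max(1, (cpu_count() or 2) - 1)) as pool:
        # issue the tasks; the pool's internal queue feeds the worker processes
        for i in range(100):
            pool.apply_async(task, args=(i,), callback=on_result, error_callback=on_error)
        # wait for all issued tasks to complete
        pool.close()
        pool.join()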
The call to task_done() does not remove an item from the queue; it only changes an internal counter. You can learn more here (in the context of threads, but it’s the same mechanism):
https://superfastpython.com/thread-queue-task-done-join/
Queues are not needed to “load processes”. Python implementations are machine-dependent (e.g. the C code under the covers), but the Python code (e.g. the API) is machine-independent.