Last Updated on September 12, 2022
You can communicate between processes with queue via the multiprocessing.Queue class.
In this tutorial you will discover how to use the process queue in Python.
Let’s get started.
Need for a Queue
A process is a running instance of a computer program.
Every Python program is executed in a Process, which is a new instance of the Python interpreter. This process has the name MainProcess and has one thread used to execute the program instructions called the MainThread. Both processes and threads are created and managed by the underlying operating system.
Sometimes we may need to create new child processes in our program in order to execute code concurrently.
Python provides the ability to create and manage new processes via the multiprocessing.Process class.
In multiprocessing programming, we often need to share data between processes.
One approach to sharing data is to use a queue data structure.
Python provides a number of process-safe queues, such as the multiprocessing.Queue class.
What is the Queue and how can we use it in Python?
Run loops using all CPUs, download your FREE book to learn how.
How to Use the Queue
Python provides a process-safe queue in the multiprocessing.Queue class.
A queue is a data structure on which items can be added by a call to put() and from which items can be retrieved by a call to get().
The multiprocessing.Queue provides a first-in, first-out FIFO queue, which means that the items are retrieved from the queue in the order they were added. The first items added to the queue will be the first items retrieved. This is opposed to other queue types such as last-in, first-out and priority queues.
Let’s look at how we can use the multiprocessing.Queue class.
Create a Queue
The multiprocessing.Queue can be used by first creating an instance of the class. This will create an unbounded queue by default, that is, a queue with no size limit.
For example:
1 2 3 |
... # created an unbounded queue queue = multiprocessing.Queue() |
A create can be created with a size limit by specifying the “maxsize” argument to a value larger than zero.
For example:
1 2 3 |
... # created a size limited queue queue = multiprocessing.Queue(maxsize=100) |
Add Items to the Queue
Once a size limited queue is full, new items cannot be added and calls to put() will block until space becomes available.
Items can be added to the queue via a call to put(), for example:
1 2 3 |
... # add an item to the queue queue.put(item) |
By default, the call to put() will block and will not use a timeout.
For example, the above is equivalent to the following:
1 2 3 |
... # add an item to the queue queue.put(item, block=True, timeout=None) |
The call to put() will block if the queue is full. We can choose to not block when adding items by setting the “block” argument to False. If the queue is full, then a queue.Full exception will be raised which may be handled.
For example:
1 2 3 4 5 6 |
... # add an item to a size limited queue without blocking try: queue.put(item, block=False) except queue.Full: # ... |
This can also be achieved with the put_nowait() function that does the same thing.
For example:
1 2 3 4 5 6 |
... # add an item to a size limited queue without blocking try: queue.put_nowait(item) except queue.Full: # ... |
Alternatively we may wish to add an item to a size limited queue and block with a timeout. This can be achieved by setting the “timeout” argument to a positive value in seconds. If an item cannot be added before the timeout expires, then a queue.Full exception will be raised which may be handled.
For example:
1 2 3 4 5 6 |
... # add an item to a size limited queue with a timeout try: queue.put(item, timeout=5) except queue.Full: # ... |
Get Items From The Queue
Items can be retrieved from the queue by calls to get().
For example:
1 2 3 |
... # get an item from the queue item = queue.get() |
By default, the call to get() will block until an item is available to retrieve from the queue and will not use a timeout. Therefore, the above call is equivalent to the following:
1 2 3 |
... # get an item from the queue item = queue.get(block=True, timeout=0) |
We can retrieve items from the queue without blocking by setting the “block” argument to False. If an item is not available to retrieve, then a queue.Empty exception will be raised and may be handled.
1 2 3 4 5 6 |
... # get an item from the queue without blocking try: item = queue.get(block=False) except queue.Empty: # ... |
This can also be achieved with the get_nowait() function that does the same thing.
For example:
1 2 3 4 5 6 |
... # get an item from the queue without blocking try: item = queue.get_nowait() except queue.Empty: # ... |
Alternatively, we may wish to retrieve items from the queue and block with a time limit. This can be achieved by setting the “timeout” argument to a positive value in seconds. If the timeout expires before an item can be retrieved, then a queue.Empty exception will be raised and may be handled.
For example:
1 2 3 4 5 6 |
... # get an item from the queue with a timeout try: item = queue.get(timeout=10) except queue.Empty: # ... |
Query Queue Size
The number of items in the queue can be checked by the qsize() function.
For example:
1 2 3 |
... # check the size of the queue size = queue.qsize() |
We can check if the queue contains no values via the empty() function.
For example:
1 2 3 4 |
... # check if the queue is empty if queue.empty(): # ... |
We may also check if the queue is full, if it is size limited when configured.
For example:
1 2 3 4 |
... # check if the queue is full if queue.full(): # ... |
Note, the state of the queue may change immediately after any of these queries, so care must be taken to avoid a race condition.
For example, this would not be process-safe:
1 2 3 4 5 |
... # check if the queue has space if not queue.full(): # add an item to the queue queue.put_nowait(item) |
Now that we know how to use the multiprocessing.Queue class, let’s look at some worked examples.
Example of Using a Queue
We can explore how to use the multiprocessing.Queue class with a worked example.
In this example, we will create a producer child process that will generate ten random numbers and put them on the queue. We will also create a consumer child process that will get numbers from the queue and report their values.
The multiprocessing.Queue provides a way to allow these producer and consumer processes to communicate data with each other.
First, we can define the function to be executed by the producer process.
The task will iterate ten times in a loop. Each iteration, it will generate a new random value between 0 and 1 via the random.random() function. It will then block for that fraction of a second, then put the value on the queue.
Once the task is complete it will put the value None on the queue to signal to the consumer that there is no further work.
The producer() function below implements this by taking the queue instance as an argument.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# generate work def producer(queue): print('Producer: Running', flush=True) # generate work for i in range(10): # generate a value value = random() # block sleep(value) # add to the queue queue.put(value) # all done queue.put(None) print('Producer: Done', flush=True) |
Next, we can define the function to be executed by the consumer process.
The task will loop forever. Each iteration, it will get an item from the queue and block if there is no item yet available.
If the item retrieved from the queue is the value None, then the task will break the loop and terminate the process. Otherwise, the value is reported.
The consumer() function below implements this and takes the queue instance as an argument.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# consume work def consumer(queue): print('Consumer: Running', flush=True) # consume work while True: # get a unit of work item = queue.get() # check for stop if item is None: break # report print(f'>got {item}', flush=True) # all done print('Consumer: Done', flush=True) |
Finally, in the main process we can create the shared queue instance.
1 2 3 |
... # create the shared queue queue = Queue() |
We can then configure and start the consumer process, which will patiently wait for work to arrive on the queue.
1 2 3 4 |
... # start the consumer consumer_process = Process(target=consumer, args=(queue,)) consumer_process.start() |
Then we can configure and start the producer process, which will generate work and add it to the queue for the consumer to retrieve.
1 2 3 4 |
... # start the producer producer_process = Process(target=producer, args=(queue,)) producer_process.start() |
The main process will then block until both the producer and consumer processes terminate, then terminate itself.
1 2 3 4 |
... # wait for all processes to finish producer_process.join() consumer_process.join() |
Tying this together, the complete example of using the multiprocessing.Queue is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# SuperFastPython.com # example of using the queue with processes from time import sleep from random import random from multiprocessing import Process from multiprocessing import Queue # generate work def producer(queue): print('Producer: Running', flush=True) # generate work for i in range(10): # generate a value value = random() # block sleep(value) # add to the queue queue.put(value) # all done queue.put(None) print('Producer: Done', flush=True) # consume work def consumer(queue): print('Consumer: Running', flush=True) # consume work while True: # get a unit of work item = queue.get() # check for stop if item is None: break # report print(f'>got {item}', flush=True) # all done print('Consumer: Done', flush=True) # entry point if __name__ == '__main__': # create the shared queue queue = Queue() # start the consumer consumer_process = Process(target=consumer, args=(queue,)) consumer_process.start() # start the producer producer_process = Process(target=producer, args=(queue,)) producer_process.start() # wait for all processes to finish producer_process.join() consumer_process.join() |
Running the example first creates the shared multiprocessing.Queue instance.
Next, the consumer process is created and passed the queue instance. Then the producer process is started and the main process blocks until the worker processes terminate.
The producer process generates a new random value each iteration of the task, blocks and adds it to the queue. The consumer process waits on the queue for items to arrive, then consumes them one at a time, reporting their value.
Finally, the producer task finishes, a None value is put on the queue and the process terminates. The consumer process gets the None value, breaks its loop and also terminates.
This highlights how the multiprocessing.Queue can be used to share data easily between a producer and consumer processes.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Consumer: Running Producer: Running >got 0.32321446554917554 >got 0.3842895366752117 >got 0.8866995716571132 >got 0.005021326155600803 >got 0.7021642936563511 >got 0.1021891092939955 >got 0.7857069833021124 >got 0.513372258782853 >got 0.5931580317744637 Producer: Done >got 0.5689894972203111 Consumer: Done |
Next, let’s look at how we might get values from the queue without blocking.
Free Python Multiprocessing Course
Download your FREE multiprocessing PDF cheat sheet and get BONUS access to my free 7-day crash course on the multiprocessing API.
Discover how to use the Python multiprocessing module including how to create and start child processes and how to use a mutex locks and semaphores.
Using a Queue Without Blocking
We can get values from the multiprocessing.Queue without blocking.
This might be useful if we wish to use busy waiting in the consumer task to check other states or perform other tasks while waiting for data to arrive on the queue.
You can learn more about busy waiting in the tutorial:
We can update the example from the previous section to get items from the queue without blocking.
This can be achieved by setting the “blocking” argument to False when calling get().
For example:
1 2 3 |
... # get a value from the queue without blocking item = queue.get(block=False) |
The get() function will return immediately.
If there is a value in the queue to retrieve, then it is returned. Otherwise, if the queue is empty, then an queue.Empty exception is raised, which can be handled.
In this case, if there is no value to get from the queue, we report a message and sleep for a fraction of a second. We will then continue which will jump back to the start of the consumer busy waiting loop.
For example:
1 2 3 4 5 6 7 8 |
... # get a unit of work try: item = queue.get(block=False) except Empty: print('Consumer: got nothing, waiting a while...', flush=True) sleep(0.5) continue |
The updated version of the consumer() function with this change is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# consume work def consumer(queue): print('Consumer: Running', flush=True) # consume work while True: # get a unit of work try: item = queue.get(block=False) except Empty: print('Consumer: got nothing, waiting a while...', flush=True) sleep(0.5) continue # check for stop if item is None: break # report print(f'>got {item}', flush=True) # all done print('Consumer: Done', flush=True) |
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# SuperFastPython.com # example of using the queue between processes without blocking from time import sleep from random import random from multiprocessing import Process from multiprocessing import Queue from queue import Empty # generate work def producer(queue): print('Producer: Running', flush=True) # generate work for i in range(10): # generate a value value = random() # block sleep(value) # add to the queue queue.put(value) # all done queue.put(None) print('Producer: Done', flush=True) # consume work def consumer(queue): print('Consumer: Running', flush=True) # consume work while True: # get a unit of work try: item = queue.get(block=False) except Empty: print('Consumer: got nothing, waiting a while...', flush=True) sleep(0.5) continue # check for stop if item is None: break # report print(f'>got {item}', flush=True) # all done print('Consumer: Done', flush=True) # entry point if __name__ == '__main__': # create the shared queue queue = Queue() # start the consumer process consumer_process = Process(target=consumer, args=(queue,)) consumer_process.start() # start the producer process producer_process = Process(target=producer, args=(queue,)) producer_process.start() # wait for all processes to finish producer_process.join() consumer_process.join() |
Running the example creates the shared multiprocessing.Queue, then starts the consumer and producer processes, as before.
The producer process will generate, block and add items to the queue.
The consumer process will attempt to get a value from the queue. If there is no value to retrieve, an queue.Empty exception is raised and handled by reporting a message, sleeping for a fraction of a second then starting the busy wait loop again.
Otherwise, if there is a value in the queue, the consumer will retrieve it and report it as per normal.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
We can see the messages from the consumer process busy waiting for new data to arrive in the queue.
This highlights how to get items from the multiprocessing.Queue without blocking.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
Consumer: Running Consumer: got nothing, waiting a while... Producer: Running Consumer: got nothing, waiting a while... >got 0.5612374602030994 >got 0.14488044454921012 >got 0.030220788923018715 >got 0.16924209194407025 >got 0.0812798571435901 Consumer: got nothing, waiting a while... >got 0.4102769523644676 Consumer: got nothing, waiting a while... >got 0.271948071166351 Consumer: got nothing, waiting a while... >got 0.5846904456412122 Consumer: got nothing, waiting a while... >got 0.5872974022673578 Consumer: got nothing, waiting a while... Consumer: got nothing, waiting a while... Producer: Done >got 0.6900489760069827 Consumer: Done |
Next, let’s look at how we might get values from the queue with a timeout.
Overwhelmed by the python concurrency APIs?
Find relief, download my FREE Python Concurrency Mind Maps
Using a Queue With a Timeout
We can get values from the multiprocessing.Queue by blocking but limited by a timeout.
This allows a consumer process to both block while waiting for values to arrive in the queue, but also execute other tasks while busy waiting. It may be more efficient than being busy waiting without any blocking calls.
We can update the above example to use a timeout when getting items from the queue in the consumer process.
This can be achieved by calling the get() function and specifying a timeout value in seconds.
For example:
1 2 3 |
... # get a value and block for a timeout item = queue.get(timeout=0.5) |
If a value is available, the function will return immediately with the value.
Otherwise, if no value is available within the timeout, then an queue.Empty exception is raised which may be handled. In this case, we will handle the exception by reporting a message and starting the busy-wait loop again.
1 2 3 4 5 6 7 |
... # get a unit of work try: item = queue.get(timeout=0.5) except Empty: print('Consumer: gave up waiting...', flush=True) continue |
Tying this together, the updated version of the consumer() function with this change is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# consume work def consumer(queue): print('Consumer: Running', flush=True) # consume work while True: # get a unit of work try: item = queue.get(timeout=0.5) except Empty: print('Consumer: gave up waiting...', flush=True) continue # check for stop if item is None: break # report print(f'>got {item}', flush=True) # all done print('Consumer: Done', flush=True) |
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# SuperFastPython.com # example of using the queue between processes with a timeout from time import sleep from random import random from multiprocessing import Process from multiprocessing import Queue from queue import Empty # generate work def producer(queue): print('Producer: Running', flush=True) # generate work for i in range(10): # generate a value value = random() # block sleep(value) # add to the queue queue.put(value) # all done queue.put(None) print('Producer: Done', flush=True) # consume work def consumer(queue): print('Consumer: Running', flush=True) # consume work while True: # get a unit of work try: item = queue.get(timeout=0.5) except Empty: print('Consumer: gave up waiting...', flush=True) continue # check for stop if item is None: break # report print(f'>got {item}', flush=True) # all done print('Consumer: Done', flush=True) # entry point if __name__ == '__main__': # create the shared queue queue = Queue() # start the consumer consumer_process = Process(target=consumer, args=(queue,)) consumer_process.start() # start the producer producer_process = Process(target=producer, args=(queue,)) producer_process.start() # wait for all processes to finish producer_process.join() consumer_process.join() |
Running the example creates the shared multiprocessing.Queue, then starts the consumer and producer processes, as before.
The producer process will generate, block and add items to the queue.
The consumer process will attempt to get a value from the queue. The process will block for a timeout.
If no value is available before the timeout expires, then an queue.Empty exception is raised and handled by reporting a message then starting the busy wait loop again. Otherwise, if there is a value in the queue, the consumer will retrieve it and report it as per normal.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
We can see the messages from the consumer process busy waiting for new data to arrive in the queue.
This highlights how to get items from the multiprocessing.Queue by blocking with a timeout.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
Consumer: Running Producer: Running Consumer: gave up waiting... >got 0.6777681327580363 >got 0.33912016606505224 Consumer: gave up waiting... >got 0.551430813825231 >got 0.4166212071285842 >got 0.4195302940423783 Consumer: gave up waiting... >got 0.8868255895307277 >got 0.30331843332848807 Consumer: gave up waiting... >got 0.5310664635533945 Consumer: gave up waiting... >got 0.5251686669518133 Consumer: gave up waiting... Producer: Done >got 0.9179018231729207 Consumer: Done |
Common Questions
This section lists some common questions when using a multiprocessing.Queue in Python.
Do you have a question about the queue?
Let me know in the comments below and I may add it to this section with my answer.
What is a Queue?
A queue is a type of data structure.
Items in the queue are ordered and the queue supports efficient operations for adding items to the end and retrieving items from the front.
There are many queue types based on the order of the items maintained within the queue, such as:
- First in, First out order or FIFO.
- List in, First out order or LIFO.
- Priority order.
What is FIFO?
FIFO is an acronym for the order of items in a queue.
It stands for: “First In, First Out”
This order means that the next item retrieved from the list will be the first item in the list that was added.
In a FIFO queue, the first tasks added are the first retrieved.
— queue — A synchronized queue class
If we think of each item added to the list as having a timestamp, then the oldest item on the list will be the next item removed and returned when requested.
For example, if the following items were added ‘A’, ‘B’, ‘C’, for example:
1 2 3 4 |
... queue.put('A') queue.put('B') queue.put('C') |
Then the items will be retrieved in the order ‘A’, ‘B’, ‘C’, for example:
1 2 3 4 |
... value = item.get() # A value = item.get() # B value = item.get() # C |
Is the Queue Process-Safe?
Yes.
The multiprocessing.Queue class is designed to be shared and used among multiple processes. It is process-safe.
This means that multiple processes may call get() and/or put() concurrently and the internal state of the queue will be maintained correctly without corruption.
It also means that queries of the queue like qsize(), empty() and full() will report the correct state of the queue at the time of the function call.
Why Not Use a queue.Queue?
A queue.Queue is thread-safe, but it is not process-safe.
This means it cannot be used among multiple processes without a race condition.
If you need to use a queue among threads, use queue.Queue, otherwise, if you need to use a queue among processes, use multiprocessing.Queue.
You can learn more about how to use queue.Queue in the tutorial:
How to Tell Consumers That There Are No More Items?
Producer process can tell consumer processes that no more items are expected by sending a special message.
This is called a sentinel object or sentinel value.
A common approach is for consumers to send the value None.
For example:
1 2 3 |
... # indicate to consumers that there are no further messages queue.put(None) |
If a consumer gets this message, it knows that no further messages are expected and to shutdown.
For example:
1 2 3 4 5 6 7 |
... # get the next value item = queue.get() # check if no further values are expected if item is None: # terminate the process return |
If there are multiple consumers that need to get the message, then a consumer may get the None item, and re-add it for other consumers to consume and respond to.
For example:
1 2 3 4 5 6 7 8 9 |
... # get the next value item = queue.get() # check if no further values are expected if item is None: # re-add the message for other consumers queue.put(item) # terminate the process return |
Does multiprocessing.Queue Support Peek?
No.
A peek operation allows a consumer to check what the next retrieved value will be without removing it.
The multiprocessing.Queue does not provide this capability.
Further Reading
This section provides additional resources that you may find helpful.
Python Multiprocessing Books
- Python Multiprocessing Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Multiprocessing API Cheat Sheet
I would also recommend specific chapters in the books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python Multiprocessing: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
APIs
References
Takeaways
You now know how to use the queue with processes in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Sachin Dogra on Unsplash
Charles "Chuck" Wegrzyn says
All of your examples have both the producer and consumer in the same Python program. How about an example of independently written Python programs? In other words sharing resources – queues and pipes across two independently started processes.
Jason Brownlee says
Great suggestion!
Ramses Alexander Coraspe Valdez says
Thanks for this example!
Jason Brownlee says
You’re very welcome!