Last Updated on September 12, 2022
You can use a thread-safe Priority Queue via the queue.PriorityQueue class.
In this tutorial you will discover how to use Priority Queues in Python.
Let’s get started.
Need for a Priority Queue
A thread is a thread of execution in a computer program.
Every Python program has at least one thread of execution called the main thread. Both processes and threads are created and managed by the underlying operating system.
Sometimes we may need to create additional threads in our program in order to execute code concurrently.
Python provides the ability to create and manage new threads via the threading module and the threading.Thread class.
In concurrent programming, we often need to share data between threads.
One approach to sharing data is to use a queue data structure.
Python provides a number of thread-safe queues in the queue module, such as the queue.PriorityQueue class.
What is a Priority Queue and how can we use it in Python?
Priority Ordering
A queue is a data structure for maintaining a linear sequence of items.
The difference between queue types is the order in which items are maintained; specifically, the order in which items are returned by calls to get() relative to the order in which they were added via calls to put().
A priority queue is a queue in which the order that items are retrieved from the queue is defined by an item priority.
With a priority queue, the entries are kept sorted (using the heapq module) and the lowest valued entry is retrieved first.
— queue — A synchronized queue class
The priority order is determined by comparing the items' values.
Internally, the priority queue makes use of the heapq module that provides utilities for maintaining ordered queues using a heap data structure (a tree).
This module provides an implementation of the heap queue algorithm, also known as the priority queue algorithm. Heaps are binary trees for which every parent node has a value less than or equal to any of its children.
— heapq — Heap queue algorithm
This means the priority queue works like a list that is kept sorted. For example, if the priority queue held integers, they would be kept in ascending integer values where lower values indicated higher priority.
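The heap ordering described above can be seen directly with the heapq module, independent of queue.PriorityQueue. A minimal sketch using the same values as the example below:

```python
from heapq import heappush, heappop

# push values onto a plain list-backed heap
heap = []
for value in [7, 9, 2]:
    heappush(heap, value)

# pop all items; the smallest value is always retrieved first
ordered = []
while heap:
    ordered.append(heappop(heap))
print(ordered)  # [2, 7, 9]
```

The heapq module maintains only a partial order internally (the heap invariant), but repeated pops always yield items in ascending order.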
Consider if we created a priority queue and added the following three items ‘7’, ‘9’, and ‘2’.
For example:
```python
...
# add 3 items to a priority queue
queue.put('7')
queue.put('9')
queue.put('2')
```
Internally, the items would be added to a heap data structure and kept in ascending order 2, 7, 9.
Removing items from this queue in priority order would result in the items returning ‘2’, ‘7’, ‘9’.
For example:
```python
...
# get 3 items from a priority queue
item = queue.get() # 2
item = queue.get() # 7
item = queue.get() # 9
```
Next, let’s look at how we might use a priority queue in Python.
How to Use the PriorityQueue
Python provides a thread-safe priority queue in the queue.PriorityQueue class.
A queue is a data structure on which items can be added by a call to put() and from which items can be retrieved by a call to get().
The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics.
— queue — A synchronized queue class
Thread-safe means that it can be used by multiple threads to put and get items concurrently without a race condition.
The PriorityQueue maintains items in the queue in priority order, which is ascending order based on the evaluated value of each item.
PriorityQueue Items
We may use a composite data structure, such as a tuple or an object, as an item in a priority queue, keeping the priority separate from the data.
For example, we may use a tuple where the first element contains the priority of the data and the second contains the data object itself.
For example:
```python
...
# example of a tuple representing an item in a priority queue
item = (priority, data)
```
Items in the priority queue would then be sorted by the first item in the tuple, the priority integer or floating point value, and the second item in the tuple would hold the data.
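Note, if two items have equal priority, the tuple comparison falls back to comparing the data elements, which fails for data types that are not comparable, such as dicts. A common remedy, suggested in the heapq module documentation, is to insert a monotonically increasing counter between the priority and the data so ties are broken by insertion order. A minimal sketch:

```python
from itertools import count
from queue import PriorityQueue

queue = PriorityQueue()
counter = count()  # monotonically increasing tie-breaker

# two items with equal priority; dicts are not comparable,
# so without the counter a tie would raise a TypeError
queue.put((1, next(counter), {'name': 'first'}))
queue.put((1, next(counter), {'name': 'second'}))

# ties are broken by the counter, so insertion order is preserved
first = queue.get()
second = queue.get()
print(first[2]['name'], second[2]['name'])  # first second
```

The counter also guarantees the data element itself is never compared, because the (priority, counter) prefix is always unique.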
Alternatively, a new class may be defined where the first attribute is the priority and other attributes represent the data for the item on the queue.
The queue module API documentation provides an example of such a data class.
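Such a class can be sketched with the dataclasses module, along the lines of the PrioritizedItem example in the queue module documentation, where field(compare=False) excludes the data from priority comparisons:

```python
from dataclasses import dataclass, field
from queue import PriorityQueue
from typing import Any

# item with an ordered priority and unordered payload,
# modeled on the queue module docs' PrioritizedItem example
@dataclass(order=True)
class PrioritizedItem:
    priority: int
    data: Any = field(compare=False)  # excluded from comparisons

queue = PriorityQueue()
queue.put(PrioritizedItem(3, 'low priority task'))
queue.put(PrioritizedItem(1, 'high priority task'))

# the lowest priority value is retrieved first
item = queue.get()
print(item.priority, item.data)  # 1 high priority task
```

Because the data attribute never participates in comparisons, this approach also avoids the tie-breaking problem with non-comparable data.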
Create a PriorityQueue
The queue.PriorityQueue can be used by first creating an instance of the class. This will create an unbounded queue by default, that is, a queue with no size limit.
For example:
```python
...
# create an unbounded queue
queue = queue.PriorityQueue()
```
A queue can be created with a size limit by specifying the “maxsize” argument to a value larger than zero.
For example:
```python
...
# create a size limited queue
queue = queue.PriorityQueue(maxsize=100)
```
Add Items to the PriorityQueue
Items can be added to the queue via a call to put(). Once a size limited queue is full, new items cannot be added and calls to put() will block until space becomes available.
For example:
```python
...
# add an item to the queue
queue.put(item)
```
By default, the call to put() will block and will not use a timeout.
For example, the above is equivalent to the following:
```python
...
# add an item to the queue
queue.put(item, block=True, timeout=None)
```
The call to put() will block if the queue is full. We can choose to not block when adding items by setting the “block” argument to False. If the queue is full, then a queue.Full exception will be raised which may be handled.
For example:
```python
...
# add an item to a size limited queue without blocking
try:
    queue.put(item, block=False)
except queue.Full:
    # handle the case where the queue is full
    ...
```
This can also be achieved with the put_nowait() function that does the same thing.
For example:
```python
...
# add an item to a size limited queue without blocking
try:
    queue.put_nowait(item)
except queue.Full:
    # handle the case where the queue is full
    ...
```
Alternatively we may wish to add an item to a size limited queue and block with a timeout. This can be achieved by setting the “timeout” argument to a positive value in seconds. If an item cannot be added before the timeout expires, then a queue.Full exception will be raised which may be handled.
For example:
```python
...
# add an item to a size limited queue with a timeout
try:
    queue.put(item, timeout=5)
except queue.Full:
    # handle the case where the queue is full
    ...
```
Get Items From The PriorityQueue
Items can be retrieved from the queue by calls to get().
For example:
```python
...
# get an item from the queue
item = queue.get()
```
By default, the call to get() will block until an item is available to retrieve from the queue and will not use a timeout. Therefore, the above call is equivalent to the following:
```python
...
# get an item from the queue
item = queue.get(block=True, timeout=None)
```
We can retrieve items from the queue without blocking by setting the “block” argument to False. If an item is not available to retrieve, then a queue.Empty exception will be raised and may be handled.
```python
...
# get an item from the queue without blocking
try:
    item = queue.get(block=False)
except queue.Empty:
    # handle the case where the queue is empty
    ...
```
This can also be achieved with the get_nowait() function that does the same thing.
For example:
```python
...
# get an item from the queue without blocking
try:
    item = queue.get_nowait()
except queue.Empty:
    # handle the case where the queue is empty
    ...
```
Alternatively, we may wish to retrieve items from the queue and block with a time limit. This can be achieved by setting the “timeout” argument to a positive value in seconds. If the timeout expires before an item can be retrieved, then a queue.Empty exception will be raised and may be handled.
For example:
```python
...
# get an item from the queue with a timeout
try:
    item = queue.get(timeout=10)
except queue.Empty:
    # handle the case where the queue is empty
    ...
```
Query PriorityQueue Size
The number of items in the queue can be checked by the qsize() function.
For example:
```python
...
# check the size of the queue
size = queue.qsize()
```
We can check if the queue contains no values via the empty() function.
For example:
```python
...
# check if the queue is empty
if queue.empty():
    # handle the empty case
    ...
```
We may also check if the queue is full via the full() function, which is only meaningful if the queue was configured with a size limit.
For example:
```python
...
# check if the queue is full
if queue.full():
    # handle the full case
    ...
```
Note, the state of the queue may change immediately after any of these queries, so care must be taken to avoid a race condition.
For example, this would not be thread-safe:
```python
...
# check if the queue has space
if not queue.full():
    # add an item to the queue
    queue.put_nowait(item)
```
You might consider acquiring the queue.PriorityQueue instance's internal mutex to make this check-then-act operation atomic. This is not recommended, and it would not work as written: public methods such as full() and put_nowait() acquire the same non-reentrant mutex internally, so calling them while holding queue.mutex would deadlock.
Instead, the thread-safe pattern is to attempt the operation and handle the failure case.
For example:
```python
...
# attempt to add an item, handling the case where the queue is full
try:
    queue.put_nowait(item)
except queue.Full:
    # the queue was full and the item was not added
    ...
```
PriorityQueue Join and Task Done
An object added to a queue may represent a task or a unit of work.
When a consumer thread calls get() to retrieve the item from the queue, it may need to process it before the task is considered complete.
Once complete, the thread may then call the task_done() method on the queue to indicate that the item that was just retrieved has been completely processed.
For example:
```python
...
# get a task
item = queue.get()
# process it
# ...
# mark the task as completed
queue.task_done()
```
This is helpful to other threads that may want to know when all tasks added to the queue have been processed completely.
Other threads can wait for all tasks currently on the queue to be completed by calling the join() function.
For example:
```python
...
# wait for all current tasks on the queue to be marked as done
queue.join()
```
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
— Queue Objects, queue — A synchronized queue class
If the task_done() function is called more times than there are unfinished tasks on the queue, then a ValueError will be raised to indicate the invalid state.
If the join() function is called after all tasks have been marked done, then the function will return immediately.
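These task_done() and join() semantics can be demonstrated in a single thread (a minimal sketch; in real programs, join() is typically called from a different thread than task_done()):

```python
from queue import PriorityQueue

queue = PriorityQueue()
queue.put((1, 'a'))
queue.put((2, 'b'))

# retrieve each item and mark it as done
while not queue.empty():
    item = queue.get()
    queue.task_done()

# all tasks are marked done, so join() returns immediately
queue.join()

# one task_done() call too many raises a ValueError
try:
    queue.task_done()
    extra_call_ok = True
except ValueError:
    extra_call_ok = False
print(extra_call_ok)  # False
```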
Now that we know how to use the queue.PriorityQueue, let’s look at a worked example.
Example of Using a PriorityQueue
We can explore how to use the queue.PriorityQueue class with a worked example.
In this example, we will create a producer thread that will generate ten random numbers as data and put them on the queue with a randomly determined priority. We will also create a consumer thread that will get numbers from the queue in priority order (ascending order, where lower values indicate higher priority) and report their values.
The producer thread will run fast and populate the queue as fast as it can. The consumer will retrieve the items from the queue, block for a random fraction of a second, then report the value.
This will demonstrate the priority ordering and multithreaded nature of the queue.PriorityQueue class.
Additionally, the consumer will mark each item as done. This will be helpful to the producer to know when all items have been processed so that a special shutdown signal can be sent to the consumer, called a sentinel value.
Producer Thread
First, we can define the function to be executed by the producer thread.
The task will iterate ten times in a loop.
```python
...
# generate work
for i in range(10):
    # generate and queue an item
    ...
```
Each iteration, it will generate a new random value between 0 and 1 via the random.random() function and a random integer between 0 and 10 as the priority via the random.randint() function.
```python
...
# generate a value
value = random()
# generate a priority
priority = randint(0, 10)
```
The priority and data will be paired into a tuple so that each item is ordered by the priority, separate from the data. The tuple is then added to the priority queue.
```python
...
# create an item
item = (priority, value)
# add to the queue
queue.put(item)
```
Once the task is complete it will block on the queue until all items have been processed and marked as done by the consumer.
This can be achieved by calling the join() function.
```python
...
# wait for all items to be processed
queue.join()
```
Finally, the producer will put the value None on the queue to signal to the consumer that there is no further work. This is called a Sentinel Value and is a common way for threads to communicate via queues to signal an important event, like a shutdown.
```python
...
# send sentinel value
queue.put(None)
```
It is important that we send this signal only after all items have been processed. The None value is not comparable with our tuples, so if it were added while tuple items remained on the queue, the internal heap comparison would raise a TypeError.
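We can demonstrate this failure directly: if None is added while tuple items remain on the queue, the internal heap tries to compare None with a tuple and raises a TypeError. A minimal sketch:

```python
from queue import PriorityQueue

queue = PriorityQueue()
# a normal (priority, data) item remains on the queue
queue.put((5, 0.123))

# adding None now forces a heap comparison of None against a tuple
try:
    queue.put(None)
    comparison_failed = False
except TypeError:
    comparison_failed = True
print(comparison_failed)  # True
```

This is why the producer in the example below calls join() first: once every tuple has been consumed, the queue is empty and None can be pushed without any comparison taking place.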
An alternative approach would be to add a tuple with the lowest possible priority (a large, out-of-band integer value) and a data value of None. This item would be processed last by the consumer thread and would not require the producer thread to join.
```python
...
# send sentinel value with lowest priority
queue.put((100, None))
```
The producer() function below implements this by taking the queue instance as an argument.
```python
# generate work
def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # generate a priority
        priority = randint(0, 10)
        # create an item
        item = (priority, value)
        # add to the queue
        queue.put(item)
    # wait for all items to be processed
    queue.join()
    # send sentinel value
    queue.put(None)
    print('Producer: Done')
```
Consumer Thread
Next, we can define the function to be executed by the consumer thread.
The task will loop forever. Each iteration, it will get an item from the queue and block if there is no item yet available.
If the item retrieved from the queue is the value None, then the task will break the loop and terminate the thread. Otherwise, the fractional value is used to block with a call to time.sleep() and is then reported. The item is then marked as processed via a call to task_done().
Reporting items as they are retrieved will demonstrate that they were processed in priority order.
The consumer() function below implements this and takes the queue instance as an argument.
```python
# consume work
def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = queue.get()
        # check for stop
        if item is None:
            break
        # block
        sleep(item[1])
        # report
        print(f'>got {item}')
        # mark it as processed
        queue.task_done()
    # all done
    print('Consumer: Done')
```
Create PriorityQueue and Threads
Finally, in the main thread we can create the shared queue instance.
```python
...
# create the shared queue
queue = PriorityQueue()
```
Then we can configure and start the producer thread, which will generate work and add it to the queue for the consumer to retrieve.
```python
...
# start the producer
producer = Thread(target=producer, args=(queue,))
producer.start()
```
We can then configure and start the consumer thread, which will patiently wait for work to arrive on the queue.
```python
...
# start the consumer
consumer = Thread(target=consumer, args=(queue,))
consumer.start()
```
The main thread will then block until both the producer and consumer threads terminate, then terminate itself.
```python
...
# wait for all threads to finish
producer.join()
consumer.join()
```
Complete Example
Tying this together, the complete example of using the queue.PriorityQueue is listed below.
```python
# SuperFastPython.com
# example of using the priority queue
from time import sleep
from random import random
from random import randint
from threading import Thread
from queue import PriorityQueue

# generate work
def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # generate a priority
        priority = randint(0, 10)
        # create an item
        item = (priority, value)
        # add to the queue
        queue.put(item)
    # wait for all items to be processed
    queue.join()
    # send sentinel value
    queue.put(None)
    print('Producer: Done')

# consume work
def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = queue.get()
        # check for stop
        if item is None:
            break
        # block
        sleep(item[1])
        # report
        print(f'>got {item}')
        # mark it as processed
        queue.task_done()
    # all done
    print('Consumer: Done')

# create the shared queue
queue = PriorityQueue()
# start the producer
producer = Thread(target=producer, args=(queue,))
producer.start()
# start the consumer
consumer = Thread(target=consumer, args=(queue,))
consumer.start()
# wait for threads to finish
producer.join()
consumer.join()
```
Running the example first creates the shared queue.PriorityQueue instance.
The producer thread and consumer threads are configured and started and the main thread blocks until the new threads terminate.
Next, the producer thread generates a new random value and random priority each iteration of the task and adds them as a tuple to the queue. The producer thread does not block so it adds all of its values to the queue before the consumer thread starts processing.
The producer thread finishes all of its items on the queue and then blocks on the queue until all work has been marked as done.
The consumer thread waits on the queue for items to arrive, then consumes them one at a time in priority order. For each item consumed, the consumer thread blocks for a moment, then reports the value. All work units are marked as done.
The producer is notified that all work units are done, then sends a None value to signal to the consumer that no further work units are expected, then terminates.
The consumer thread gets the None value, breaks its loop and also terminates.
This highlights how the queue.PriorityQueue can be used to share data easily between producer and consumer threads, and that items can be consumed from the queue in priority order.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
We can see that indeed the messages added to the queue were processed in priority order (ascending integer values).
```
Producer: Running
Consumer: Running
>got (0, 0.0828697796138792)
>got (0, 0.4530969199311018)
>got (1, 0.6194263138195527)
>got (1, 0.933420944414303)
>got (2, 0.5545498132122124)
>got (2, 0.6857092203915951)
>got (3, 0.3501221466243839)
>got (7, 0.19032109263026764)
>got (8, 0.5022069566894527)
>got (9, 0.8435921663610007)
Producer: Done
Consumer: Done
```
Further Reading
This section provides additional resources that you may find helpful.
Python Threading Books
- Python Threading Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- Threading Module API Cheat Sheet
I also recommend specific chapters in the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Threading: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to use a thread-safe PriorityQueue in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.