You can use a coroutine-safe priority queue via the asyncio.PriorityQueue class.
In this tutorial, you will discover how to use an asyncio priority queue in Python.
Let’s get started.
What is Priority Ordering
A queue is a data structure for maintaining a linear sequence of items.
Queues differ in the order in which items are maintained, specifically the order in which items are returned by calls to get() relative to the order in which they were added via calls to put().
A priority queue is a queue in which the order that items are retrieved from the queue is defined by an item priority.
With a priority queue, the entries are kept sorted (using the heapq module) and the lowest valued entry is retrieved first.
— QUEUE — A SYNCHRONIZED QUEUE CLASS
The priority order is determined by the value of each item.
Internally, the priority queue makes use of the heapq module that provides utilities for maintaining ordered queues using a heap data structure (a tree).
This module provides an implementation of the heap queue algorithm, also known as the priority queue algorithm. Heaps are binary trees for which every parent node has a value less than or equal to any of its children.
— HEAPQ — HEAP QUEUE ALGORITHM
This means the priority queue works like a list that is kept sorted. For example, if the priority queue held integers, they would be kept in ascending integer values where lower values indicated higher priority.
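To make the heap behavior concrete, here is a minimal sketch that uses the heapq module directly on a plain list. It is only an illustration of the ordering described above, not the internals of any queue class: values are pushed in arbitrary order and popped smallest first.

```python
import heapq

# a plain list used as the underlying heap
heap = []
for value in [7, 9, 2]:
    # push each value; the smallest value is always kept at index 0
    heapq.heappush(heap, value)

# pop values one at a time; each pop returns the smallest remaining value
ordered = [heapq.heappop(heap) for _ in range(3)]
print(ordered)  # [2, 7, 9]
```

Note that the list itself is not fully sorted at every step. The heap only guarantees that the smallest item is the next one out.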
Consider if we created a priority queue and added the following three items ‘7’, ‘9’, and ‘2’.
For example:
```python
...
# add 3 items to a priority queue
await queue.put('7')
await queue.put('9')
await queue.put('2')
```
Internally, the items would be added to a heap data structure that always makes the smallest item available first, giving the retrieval order 2, 7, 9.
Removing items from this queue in priority order would result in the items returning ‘2’, ‘7’, ‘9’.
For example:
```python
...
# get 3 items from a priority queue
item = await queue.get() # 2
item = await queue.get() # 7
item = await queue.get() # 9
```
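The two snippets above can be combined into a small runnable sketch. It assumes the asyncio.PriorityQueue class introduced in the next section, and the main() coroutine and variable names are chosen only for illustration.

```python
import asyncio

async def main():
    # create an unbounded priority queue
    queue = asyncio.PriorityQueue()
    # add three items in arbitrary order
    for item in ['7', '9', '2']:
        await queue.put(item)
    # retrieve items until the queue is empty
    retrieved = []
    while not queue.empty():
        retrieved.append(await queue.get())
    return retrieved

result = asyncio.run(main())
print(result)  # ['2', '7', '9']
```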
Next, let’s look at how we might use a priority queue in Python.
How to Use the PriorityQueue
Python provides a coroutine-safe priority queue in the asyncio.PriorityQueue class.
A variant of Queue; retrieves entries in priority order (lowest first).
— Asyncio Queues
A queue is a data structure on which items can be added by a call to put() and from which items can be retrieved by a call to get().
The PriorityQueue maintains items in the queue in priority order, which is ascending order based on the evaluated value of each item.
PriorityQueue Items
We may use a composite data structure or an object as an item to store data in a priority queue with separate priority and data.
For example, we may use a tuple where the first element contains the priority of the data and the second contains the data object itself.
For example:
```python
...
# example of a tuple representing an item in a priority queue
item = (priority, data)
```
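Items in the priority queue would then be ordered by the first element of the tuple, the priority, which may be an integer or floating point value. If two items have the same priority, the second element of the tuple, the data, is compared to break the tie, so the data must also support comparison.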
Alternatively, a new class may be defined where the first attribute is the priority and other attributes represent the data for the item on the queue.
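One way to sketch the class-based approach is with a dataclass that is ordered by priority alone. The PrioritizedItem name and its fields are assumptions for illustration. Excluding the data field from comparison means items with equal priority never compare their data, which matters when the data (such as a dict) does not support ordering.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any

# hypothetical item class: ordered by priority only
@dataclass(order=True)
class PrioritizedItem:
    priority: int
    # the data is ignored when items are compared
    data: Any = field(compare=False)

async def main():
    queue = asyncio.PriorityQueue()
    # dicts cannot be ordered, but they are never compared here
    await queue.put(PrioritizedItem(2, {'name': 'low'}))
    await queue.put(PrioritizedItem(1, {'name': 'high'}))
    await queue.put(PrioritizedItem(1, {'name': 'also high'}))
    # retrieve all three items in priority order
    return [await queue.get() for _ in range(3)]

items = asyncio.run(main())
priorities = [item.priority for item in items]
print(priorities)  # [1, 1, 2]
```

The relative order of the two items that share priority 1 is not guaranteed, because the heap does not preserve insertion order for ties.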
Create a PriorityQueue
The asyncio.PriorityQueue can be used by first creating an instance of the class. This will create an unbounded queue by default, that is, a queue with no size limit.
For example:
```python
...
# create a priority queue
queue = asyncio.PriorityQueue()
```
A queue can be created with a size limit by setting the “maxsize” argument to a value larger than zero.
For example:
```python
...
# create a size limited queue
queue = asyncio.PriorityQueue(maxsize=100)
```
Add Items to the PriorityQueue
Python objects can be added to the priority queue via the put() method.
This is in fact a coroutine that must be awaited. The reason is that the calling coroutine may block if the queue is full.
For example:
```python
...
# add an object to the queue
await queue.put(item)
```
An item can also be added to the queue without blocking via the put_nowait() method.
This method is not a coroutine. It will either add the item and return immediately, or fail with an asyncio.QueueFull exception if the queue is full and the item cannot be added.
For example:
```python
...
try:
    # attempt to add an item
    queue.put_nowait(item)
except asyncio.QueueFull:
    # ...
```
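As a small runnable sketch of this behavior, the example below fills a queue limited to two items and then attempts a third non-blocking put. The tuple items and variable names are illustrative only.

```python
import asyncio

async def main():
    # a queue that can hold at most two items
    queue = asyncio.PriorityQueue(maxsize=2)
    queue.put_nowait((1, 'a'))
    queue.put_nowait((2, 'b'))
    try:
        # the queue is at capacity, so this fails instead of waiting
        queue.put_nowait((3, 'c'))
    except asyncio.QueueFull:
        rejected = True
    else:
        rejected = False
    return rejected, queue.qsize()

rejected, size = asyncio.run(main())
print(rejected, size)  # True 2
```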
Get Items From The PriorityQueue
Items can be retrieved from the priority queue by calling the get() method.
This is in fact a coroutine that must be awaited. The reason is that the queue may not have any items to retrieve at the time, and the calling coroutine may need to block until an item becomes available.
For example:
```python
...
# retrieve an item from the queue
item = await queue.get()
```
The item retrieved will be the item with the lowest value, that is, the highest priority, rather than the oldest item added.
An item can be retrieved from the queue without blocking via the get_nowait() method.
This method is not a coroutine and will return an item immediately if available, otherwise will fail with an asyncio.QueueEmpty exception.
For example:
```python
...
try:
    # attempt to retrieve an item
    item = queue.get_nowait()
except asyncio.QueueEmpty:
    # ...
```
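One use of get_nowait() is to drain whatever is currently in the queue without waiting for more. The drain() helper below is a hypothetical function written for this sketch and is not part of asyncio.

```python
import asyncio

def drain(queue):
    """Remove and return every item currently in the queue, lowest value first."""
    items = []
    while True:
        try:
            items.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            # nothing left to retrieve
            break
    return items

async def main():
    queue = asyncio.PriorityQueue()
    # add items in arbitrary order without blocking
    for item in [(3, 'c'), (1, 'a'), (2, 'b')]:
        queue.put_nowait(item)
    return drain(queue)

drained = asyncio.run(main())
print(drained)  # [(1, 'a'), (2, 'b'), (3, 'c')]
```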
Query PriorityQueue Size
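We can retrieve the capacity of the priority queue via the “maxsize” property, which is zero if the queue is unbounded. The number of items currently in the queue is available via the qsize() method.
For example:
```python
...
# report the capacity of the queue
print(queue.maxsize)
# report the number of items currently in the queue
print(queue.qsize())
```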
We can check if the queue is empty via the empty() method, which returns True if the queue contains no items or False otherwise.
For example:
```python
...
# check if the queue is empty
if queue.empty():
    # ...
```
We can also check if the queue is full via the full() method that returns True if the queue is at capacity or False otherwise.
For example:
```python
...
# check if the queue is full
if queue.full():
    # ...
```
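The size queries can be seen together in a short sketch. It assumes a queue limited to two items and records what each call reports as items are added. The report dictionary and its keys are illustrative only.

```python
import asyncio

async def main():
    queue = asyncio.PriorityQueue(maxsize=2)
    # record the capacity and state of the new queue
    report = {'maxsize': queue.maxsize, 'empty_at_start': queue.empty()}
    await queue.put((1, 'a'))
    # one item held, one free slot remaining
    report['qsize_after_one'] = queue.qsize()
    report['full_after_one'] = queue.full()
    await queue.put((2, 'b'))
    # the queue is now at capacity
    report['full_after_two'] = queue.full()
    return report

report = asyncio.run(main())
print(report)
```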
PriorityQueue Join and Task Done
An object added to a priority queue may represent a task or a unit of work.
When a consumer coroutine calls get() to retrieve the item from the queue, it may need to process it before the task is considered complete.
Once complete, the coroutine may then call the task_done() method on the queue to indicate that the item that was just retrieved has been completely processed.
For example:
```python
...
# get a task
item = await queue.get()
# process it
# ...
# mark the task as completed
queue.task_done()
```
This is helpful to other coroutines that may be interested to know once all tasks that have been added to the queue have been processed completely.
Other coroutines can wait for all tasks currently on the queue to be completed by calling the join() method, which is a coroutine that must be awaited.
For example:
```python
...
# wait for all current tasks on the queue to be marked as done
await queue.join()
```
If the task_done() method is called more times than there were items placed on the queue, then a ValueError will be raised to indicate the invalid state.
If the join() method is called after all tasks have been marked done, then it will return immediately.
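Both edge cases can be checked with a minimal sketch. The timeout passed to asyncio.wait_for() is only a safety net for the illustration. The join() call is expected to return straight away because the single item has already been marked as done.

```python
import asyncio

async def main():
    queue = asyncio.PriorityQueue()
    await queue.put((1, 'task'))
    # retrieve the only item and mark it as done
    await queue.get()
    queue.task_done()
    # all items are done, so join() returns without waiting
    await asyncio.wait_for(queue.join(), timeout=1)
    try:
        # one call too many: no unfinished items remain
        queue.task_done()
    except ValueError:
        return True
    return False

raised = asyncio.run(main())
print(raised)  # True
```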
Now that we know how to use the asyncio.PriorityQueue, let’s look at a worked example.
Example of Using an Asyncio PriorityQueue
We can explore how to use the asyncio.PriorityQueue class with a worked example.
In this example, we will create a producer coroutine that will generate ten random numbers as data and put them on the queue with a randomly determined priority. We will also create a consumer coroutine that will get numbers from the queue in priority order (ascending order or lower values are higher priority) and report their values.
The producer coroutine will run fast and populate the queue as fast as it can. The consumer will retrieve the items from the queue, block for a random fraction of a second, then report the value.
This will demonstrate the priority ordering of the asyncio.PriorityQueue class.
Additionally, the consumer will mark each item as done. This will be helpful to the producer to know when all items have been processed so that a special shutdown signal can be sent to the consumer, called a sentinel value.
Producer Coroutine
First, we can define the function to be executed by the producer coroutine.
The task will iterate ten times in a loop.
```python
...
# generate work
for i in range(10):
    # ...
```
Each iteration, it will generate a new random value between 0 and 1 via the random.random() function and a random integer between 0 and 10 as the priority via the random.randint() function.
```python
...
# generate a value
value = random()
# generate a priority
priority = randint(0, 10)
```
The priority and data will be paired into a tuple so that each item is ordered by the priority, separate from the data. The tuple is then added to the priority queue.
```python
...
# create an item
item = (priority, value)
# add to the queue
await queue.put(item)
```
Once all items have been added, the producer will block on the queue until all items have been processed and marked as done by the consumer.
This can be achieved by awaiting the join() method.
```python
...
# wait for all items to be processed
await queue.join()
```
Finally, the producer will put the value None on the queue to signal to the consumer that there is no further work. This is called a Sentinel Value and is a common way for coroutines to communicate via queues to signal an important event, like a shutdown.
```python
...
# send sentinel value
await queue.put(None)
```
It is important that we send this signal after all items have been processed. The value None cannot be compared with our tuples, so adding it while tuples remain on the queue would raise a TypeError.
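The comparison problem can be demonstrated with a small sketch that puts None on a queue that still holds a tuple. The names here are illustrative, and the example only reports the type of exception raised.

```python
import asyncio

async def main():
    queue = asyncio.PriorityQueue()
    # a regular (priority, data) item is still on the queue
    await queue.put((5, 0.25))
    try:
        # None must be ordered against the tuple, which is not supported
        await queue.put(None)
    except TypeError as e:
        return type(e).__name__
    return None

error_name = asyncio.run(main())
print(error_name)  # TypeError
```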
An alternative approach would be to add a tuple with a very low priority (large or out-of-band integer value) and a data value of None. This item would be processed last by the consumer coroutine and not require the producer coroutine to join.
```python
...
# send sentinel value with lowest priority
await queue.put((100, None))
```
The producer() function below implements this by taking the queue instance as an argument.
```python
# generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # generate a priority
        priority = randint(0, 10)
        # create an item
        item = (priority, value)
        # add to the queue
        await queue.put(item)
    # wait for all items to be processed
    await queue.join()
    # send sentinel value
    await queue.put(None)
    print('Producer: Done')
```
Consumer Coroutine
Next, we can define the function to be executed by the consumer coroutine.
The task will loop forever. Each iteration, it will get an item from the queue and block if there is no item yet available.
If the item retrieved from the queue is the value None, then the task will break the loop and terminate the coroutine. Otherwise, the fractional value is used to suspend the coroutine with a call to asyncio.sleep() and is then reported. The item is then marked as processed via a call to task_done().
Reporting items as they are retrieved will demonstrate that they were processed in priority order.
The consumer() function below implements this and takes the queue instance as an argument.
```python
# consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = await queue.get()
        # check for stop
        if item is None:
            break
        # block
        await asyncio.sleep(item[1])
        # report
        print(f'>got {item}')
        # mark it as processed
        queue.task_done()
    # all done
    print('Consumer: Done')
```
Create PriorityQueue and Coroutines
Finally, in the main coroutine, we can create the shared priority queue instance.
We can then create and run the producer and consumer coroutines and wait for them both to complete via the asyncio.gather() function.
```python
# entry point coroutine
async def main():
    # create the shared queue
    queue = asyncio.PriorityQueue()
    # run the producer and consumers
    await asyncio.gather(producer(queue), consumer(queue))
```
Complete Example
Tying this together, the complete example of using the asyncio.PriorityQueue is listed below.
```python
# SuperFastPython.com
# example of using the asyncio priority queue
from random import random
from random import randint
import asyncio

# generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # generate a priority
        priority = randint(0, 10)
        # create an item
        item = (priority, value)
        # add to the queue
        await queue.put(item)
    # wait for all items to be processed
    await queue.join()
    # send sentinel value
    await queue.put(None)
    print('Producer: Done')

# consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = await queue.get()
        # check for stop
        if item is None:
            break
        # block
        await asyncio.sleep(item[1])
        # report
        print(f'>got {item}')
        # mark it as processed
        queue.task_done()
    # all done
    print('Consumer: Done')

# entry point coroutine
async def main():
    # create the shared queue
    queue = asyncio.PriorityQueue()
    # run the producer and consumers
    await asyncio.gather(producer(queue), consumer(queue))

# start the asyncio program
asyncio.run(main())
```
Running the example first creates the shared asyncio.PriorityQueue instance.
The producer coroutine and consumer coroutines are configured and started and the main coroutine blocks until the new coroutines terminate.
Next, the producer coroutine generates a new random value and random priority for each iteration of the task and adds them as a tuple to the queue. The producer coroutine does not block so it adds all of its values to the queue before the consumer coroutine starts processing.
The producer coroutine finishes adding all of its items to the queue and then blocks on the queue until all work has been marked as done.
The consumer coroutine waits on the queue for items to arrive, then consumes them one at a time in priority order. For each item consumed, the consumer coroutine blocks for a moment then reports the value. All work units are marked as done.
The producer is notified that all work units are done, then sends a None value to signal to the consumer that no further work units are expected, then terminates.
The consumer coroutine gets the None value, breaks its loop, and also terminates.
This highlights how the asyncio.PriorityQueue can be used to share data easily between a producer and consumer coroutines and that items can be consumed from the queue in priority order.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
We can see that indeed the messages added to the queue were processed in priority order (ascending integer values).
```
Producer: Running
Consumer: Running
>got (3, 0.4365639343825306)
>got (3, 0.9432548077989523)
>got (4, 0.3900794957833612)
>got (4, 0.6686971487660403)
>got (5, 0.5427515210968707)
>got (6, 0.5964368836047645)
>got (7, 0.7587592727122667)
>got (7, 0.894722600162155)
>got (10, 0.4072262186357074)
>got (10, 0.8525338760651432)
Producer: Done
Consumer: Done
```
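The alternative sentinel approach mentioned earlier, a tuple with a very low priority instead of None, might be sketched as follows. The SENTINEL_PRIORITY constant is an assumption for this sketch and must be larger than any real priority. The sleep is shortened and the consumer returns its processed items so the ordering can be checked. Because the sentinel sorts after every real item, the producer does not need to call join().

```python
import asyncio
from random import random, randint

# assumed to be larger than any real priority (real priorities are 0 to 10)
SENTINEL_PRIORITY = 100

async def producer(queue):
    # add ten items with random priorities
    for _ in range(10):
        await queue.put((randint(0, 10), random()))
    # the sentinel sorts after every real item, so no join() is needed
    await queue.put((SENTINEL_PRIORITY, None))

async def consumer(queue):
    processed = []
    while True:
        priority, value = await queue.get()
        # a data value of None marks the sentinel
        if value is None:
            break
        # simulate a short amount of work
        await asyncio.sleep(value / 10)
        processed.append((priority, value))
    return processed

async def main():
    queue = asyncio.PriorityQueue()
    # gather returns the results in the order the coroutines were passed
    _, processed = await asyncio.gather(producer(queue), consumer(queue))
    return processed

processed = asyncio.run(main())
print(len(processed))  # 10
```

Items with the same priority are ordered by their data value, so the processed list comes back fully sorted.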
Further Reading
This section provides additional resources that you may find helpful.
Python Asyncio Books
- Python Asyncio Mastery, Jason Brownlee (my book!)
- Python Asyncio Jump-Start, Jason Brownlee.
- Python Asyncio Interview Questions, Jason Brownlee.
- Asyncio Module API Cheat Sheet
I also recommend the following books:
- Python Concurrency with asyncio, Matthew Fowler, 2022.
- Using Asyncio in Python, Caleb Hattingh, 2020.
- asyncio Recipes, Mohamed Mustapha Tahrioui, 2019.
APIs
- asyncio — Asynchronous I/O
- Asyncio Coroutines and Tasks
- Asyncio Streams
- Asyncio Subprocesses
- Asyncio Queues
- Asyncio Synchronization Primitives
Takeaways
You now know how to use an asyncio priority queue in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.