You can use a coroutine-safe LIFO queue via the asyncio.LifoQueue class.
In this tutorial, you will discover how to use an asyncio last-in, first-out or LIFO queue in Python.
Let’s get started.
What is an Asyncio LifoQueue
The asyncio.LifoQueue provides a last-in, first-out (LIFO) queue for use with coroutines.
This is different from the asyncio.Queue that provides a first-in, first-out (FIFO) queue.
You can learn more about the asyncio.Queue in the tutorial:
Let’s take a closer look at the difference between LIFO and FIFO ordering in queues.
LIFO vs FIFO Ordering
In order to understand LIFO order, let’s contrast it with FIFO ordering.
A queue is a data structure for maintaining a linear sequence of items.
The difference between queues is the order in which items are maintained. Specifically, the order in which items are returned by calls to get() relative to the order in which they were added via calls to put().
Two common queue orderings are LIFO and FIFO.
LIFO
The asyncio.LifoQueue class in Python maintains items in a LIFO order.
LIFO is an acronym that stands for: Last In, First Out.
This means that the call to get() will return the last item added to the queue via put().
In a LIFO queue, the most recently added entry is the first retrieved (operating like a stack).
— QUEUE — A SYNCHRONIZED QUEUE CLASS
This means the asyncio.LifoQueue class works like a stack.
Consider if the LifoQueue added the following three items ‘A’, ‘B’, and ‘C’.
For example:
1 2 3 4 5 |
... # add 3 items to a lifo queue queue.put('A') queue.put('B') queue.put('C') |
Removing items from this queue in LIFO order would result in the items returning ‘C’, ‘B’, and ‘A’.
For example:
1 2 3 4 5 |
... # get 3 items from a lifo queue item = queue.get() # C item = queue.get() # B item = queue.get() # A |
FIFO
The asyncio.Queue class maintains items in a LIFO order.
LIFO ordering refers to: First-in, First-Out.
This means that the call to get() will return the first item added to the queue via put(), for example, the oldest item on the queue.
In a FIFO queue, the first tasks added are the first retrieved.
— QUEUE — A SYNCHRONIZED QUEUE CLASS
This means the asyncio.Queue class works like a list.
Consider if the Queue added the following three items ‘A’, ‘B’, and ‘C’.
For example:
1 2 3 4 5 |
... # add 3 items to a fifo queue queue.put('A') queue.put('B') queue.put('C') |
Removing items from this queue in FIFO order would result in the items returning ‘A’, ‘B’, ‘C’.
For example:
1 2 3 4 5 |
... # get 3 items from a fifo queue item = queue.get() # A item = queue.get() # B item = queue.get() # C |
Next, let’s look at some uses of LIFO ordering.
Run loops using all CPUs, download your FREE book to learn how.
When to use a LIFO Queue
To better understand LIFO order, let’s look at some examples of where it may be used.
A LIFO queue or Stack is useful in many programming situations.
Five common examples include:
- Undo: Maintaining changes to an object so that they can be undone or reversed in the order they were applied.
- Depth-First: Maintaining a list of nodes when navigating a tree or graph in a depth-first manner.
- Parsers: Maintain a list of expressions in the order they must be executed.
- Backtracking: Maintain a list of options in the order they were encountered or made available, in case the current option fails.
- Freshness: Maintain a list of connections or data in the reverse order they were used so the most recent can be acquired when needed.
Also, stack-based processing of data is a common building block in many algorithms used in computer science, such as depth-first search, branch and bound, and many more.
Next, let’s look at how we might use a LIFO queue in Python.
How to Use an Asyncio LifoQueue
In this section, we will explore how to use the asyncio.LifoQueue class, including how to create and configure an instance, how to add and remove items, query the properties of the queue and manage tasks.
A variant of Queue that retrieves most recently added entries first (last in, first out).
— LIFO Queue
Create an Asyncio LifoQueue
We can create an asyncio.LifoQueue by creating an instance of the class.
By default, the queue will not be limited in capacity.
For example:
1 2 3 |
... # create a queue with no size limit queue = asyncio.LifoQueue() |
The asyncio.LifoQueue takes one constructor argument which is “maxsize“, set to zero (no limit) by default.
For example:
1 2 3 |
... # create a queue with no size limit queue = asyncio.LifoQueue(maxsize=0) |
We can set a size limit on the queue.
The effect of a size limit means that when the queue is full and coroutines attempt to add an object, they will block until space becomes available, or fail if a non-blocking method is used.
For example:
1 2 3 |
... # create a queue with a size limit queue = asyncio.LifoQueue(maxsize=100) |
Because the “maxsize” is the first position argument, we don’t need to specify it by name.
For example:
1 2 3 |
... # create a queue with a size limit queue = asyncio.LifoQueue(100) |
Add Items to Asyncio LifoQueue
Python objects can be added to a queue via the put() method.
This is in fact a coroutine that must be awaited. The reason is that the calling coroutine may block if the queue is full.
For example:
1 2 3 |
... # add an object to the queue await queue.put(item) |
An item can also be added to the queue without blocking via the put_nowait() method.
This method is not a coroutine and will either add the item or return immediately or fail with an asyncio.QueueFull exception if the queue is full and the item cannot be added.
For example:
1 2 3 4 5 6 |
... try: # attempt to add an item queue.put_nowait(item) except asyncio.QueueFull: # ... |
Get Items from Asyncio LifoQueue
Items can be retrieved from the queue by calling the get() method.
This meth is in fact a coroutine that must be awaited. The reason is that the queue may not have any items to retrieve at the time, and the calling coroutine may need to block until an item becomes available.
For example:
1 2 3 |
... # retrieve an item from the queue item = await queue.get() |
This will return the last items added, not the first items added, e.g. LIFO ordering.
An item can be retrieved from the queue without blocking via the get_nowait() method.
This method is not a coroutine and will return an item immediately if available, otherwise will fail with an asyncio.QueueEmpty exception.
For example:
1 2 3 4 5 6 |
... try: # attempt retrieve an item item = queue.get_nowait() except asyncio.QueueEmpty: # ... |
Query Asyncio LifoQueue Size
We can retrieve the fixed size of the queue via the “maxsize” property.
For example:
1 2 3 |
... # report the size of the queue print(queue.maxsize) |
We can check if the queue is empty via the empty() which returns True if the queue contains no items or False otherwise.
For example:
1 2 3 4 |
... # check if the queue is empty if queue.empty(): # ... |
We can also check if the queue is full via the full() method that returns True if the queue is at capacity or False otherwise.
For example:
1 2 3 4 |
... # check if the queue is full if queue.full(): # ... |
Asyncio LifoQueue Join and Task Done
Items on the queue can be treated as tasks that can be marked as processes by consumer coroutines.
This can be achieved by consumer coroutines retrieving items from the queue via get() or get_nowait() and once processed marking them via the task_done() method.
For example:
1 2 3 4 5 6 7 |
... # retrieve an item from the queue item = await queue.get() # process the item # ... # mark the item as processes queue.task_done() |
Other coroutines may be interested to know when all items added to the queue have been retrieved and marked as done.
This can be achieved by the coroutine awaiting the join() coroutine on the queue.
The join() coroutine will not return until all items added to the queue prior to the call have been marked as done.
For example:
1 2 3 |
... # wait for all items on the queue to be marked as done await queue.join() |
If the queue is empty or all items have already been marked as done, the joined coroutine will return immediately.
Now that we know how to use the asyncio.LifoQueue, let’s look at a worked example.
Free Python Asyncio Course
Download your FREE Asyncio PDF cheat sheet and get BONUS access to my free 7-day crash course on the Asyncio API.
Discover how to use the Python asyncio module including how to define, create, and run new coroutines and how to use non-blocking I/O.
Example of Using an Asyncio LifoQueue
We can explore how to use the asyncio.LifoQueue class with a worked example.
In this example, we will create a producer coroutine that will generate ten random numbers and put them in the queue. We will also create a consumer coroutine that will get numbers from the queue and report their values.
The producer will timestamp each added item from 0 to 9 so we can confirm that items are removed in the correct order. The consumer will retrieve the items from the queue, block for a random fraction of a second, then report the value.
This will demonstrate the LIFO ordering of the asyncio.LifoQueue class.
Additionally, the consumer will mark each item as done. This will be helpful to the producer to know when all items have been processed so that a special shutdown signal can be sent to the consumer, called a sentinel value.
Producer Coroutine
First, we can define the function to be executed by the producer coroutine.
The task will iterate ten times in a loop.
1 2 3 4 |
... # generate work for i in range(10): # ... |
Each iteration, it will generate a new random value between 0 and 1 via the random.random() function. It will then pair the generated value with an integer timestamp from 0 to 9 into a tuple and put the value on the queue.
1 2 3 4 5 6 7 |
... # generate a value value = random() # create an item item = (i, value) # add to the queue await queue.put(item) |
Once the task is complete it will block on the queue until all items have been processed and marked as done by the consumer.
This can be achieved by calling the join() function.
1 2 3 |
... # wait for all items to be processed await queue.join() |
Finally, the producer will put the value None on the queue to signal to the consumer that there is no further work. This is called a Sentinel Value and is a common way for coroutines to communicate via queues to signal an important event, like a shutdown.
1 2 3 |
... # send sentinel value await queue.put(None) |
The producer() function below implements this by taking the queue instance as an argument.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
# generate work async def producer(queue): print('Producer: Running') # generate work for i in range(10): # generate a value value = random() # create an item item = (i, value) # add to the queue await queue.put(item) # wait for all items to be processed await queue.join() # send sentinel value await queue.put(None) print('Producer: Done') |
Consumer Coroutine
Next, we can define the function to be executed by the consumer coroutine.
The task will loop forever. Each iteration, it will get an item from the queue and block if there is no item yet available.
If the item retrieved from the queue is the value None, then the task will break the loop and terminate the coroutine. Otherwise, the fractional value is used to block with a call to time.sleep() and is then reported. The item is then marked as processed via a call to task_done().
The consumer() function below implements this and takes the queue instance as an argument.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# consume work async def consumer(queue): print('Consumer: Running') # consume work while True: # get a unit of work item = await queue.get() # check for stop if item is None: break # block await asyncio.sleep(item[1]) # report print(f'>got {item}') # mark it as processed queue.task_done() # all done print('Consumer: Done') |
Create LIFO Queue and Coroutines
Finally, in the main coroutine, we can create the shared queue instance.
1 2 3 |
... # create the shared queue queue = asyncio.LifoQueue() |
We can then create and run the producer and consumer coroutines and wait for them both to complete via the asyncio.gather() method.
1 2 3 |
... # run the producer and consumers await asyncio.gather(producer(queue), consumer(queue)) |
Tying this together, the complete main() coroutine is listed below.
1 2 3 4 5 6 |
# entry point coroutine async def main(): # create the shared queue queue = asyncio.LifoQueue() # run the producer and consumers await asyncio.gather(producer(queue), consumer(queue)) |
This can then be executed as the entry point for the asyncio program using the asyncio.run() function.
1 2 3 |
... # start the asyncio program asyncio.run(main()) |
Complete Example
Tying this together, the complete example of using the asyncio.LifoQueue is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# SuperFastPython.com # example of using the asyncio lifo queue from time import sleep from random import random import asyncio # generate work async def producer(queue): print('Producer: Running') # generate work for i in range(10): # generate a value value = random() # create an item item = (i, value) # add to the queue await queue.put(item) # wait for all items to be processed await queue.join() # send sentinel value await queue.put(None) print('Producer: Done') # consume work async def consumer(queue): print('Consumer: Running') # consume work while True: # get a unit of work item = await queue.get() # check for stop if item is None: break # block await asyncio.sleep(item[1]) # report print(f'>got {item}') # mark it as processed queue.task_done() # all done print('Consumer: Done') # entry point coroutine async def main(): # create the shared queue queue = asyncio.LifoQueue() # run the producer and consumers await asyncio.gather(producer(queue), consumer(queue)) # start the asyncio program asyncio.run(main()) |
Running the example first creates the shared asyncio.LifoQueue instance.
The producer coroutine and consumer coroutines are configured and started and the main coroutine blocks until the new coroutines terminate.
Next, the producer coroutine generates a new random value each iteration of the task and adds it to the queue with a timestamp. The producer coroutine does not block so it likely adds all of its values to the queue before the consumer coroutine starts processing.
The producer coroutine finishes all of its items on the queue and then blocks on the queue until all work has been marked as done.
The consumer coroutine waits on the queue for items to arrive, then consumes them one at a time. It blocks for a moment, then reports the value. The consumer coroutine processes all values in the queue in LIFO order, indicated by the timestamp values counting down from 9 back to zero. All work units are marked as done.
The producer is notified that all work units are done, then sends a None value to signal to the consumer that no further work units are expected, then terminates.
The consumer coroutine gets the None value, breaks its loop, and also terminates.
This highlights how the asyncio.LifoQueue can be used to share data easily between a producer and consumer coroutines.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
We can see that indeed the messages added to the queue were processed in LIFO order with the last message processed first and the first message processed last.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Producer: Running Consumer: Running >got (9, 0.7831579524499971) >got (8, 0.8240354614939776) >got (7, 0.7219086709369879) >got (6, 0.520199452967734) >got (5, 0.1092602643714582) >got (4, 0.9290940031420288) >got (3, 0.3850379796514597) >got (2, 0.19532994102354484) >got (1, 0.45846638692268327) >got (0, 0.5752086997693613) Producer: Done Consumer: Done |
Overwhelmed by the python concurrency APIs?
Find relief, download my FREE Python Concurrency Mind Maps
Further Reading
This section provides additional resources that you may find helpful.
Python Asyncio Books
- Python Asyncio Mastery, Jason Brownlee (my book!)
- Python Asyncio Jump-Start, Jason Brownlee.
- Python Asyncio Interview Questions, Jason Brownlee.
- Asyncio Module API Cheat Sheet
I also recommend the following books:
- Python Concurrency with asyncio, Matthew Fowler, 2022.
- Using Asyncio in Python, Caleb Hattingh, 2020.
- asyncio Recipes, Mohamed Mustapha Tahrioui, 2019.
Guides
APIs
- asyncio — Asynchronous I/O
- Asyncio Coroutines and Tasks
- Asyncio Streams
- Asyncio Subprocesses
- Asyncio Queues
- Asyncio Synchronization Primitives
References
Takeaways
You now know how to use an asyncio FIFO queue in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Mateus Maia on Unsplash
Do you have any questions?