Last Updated on September 12, 2022
You can use a thread-safe LIFO queue via the queue.LifoQueue class.
In this tutorial you will discover how to use the LIFO queue in Python.
Let’s get started.
Need for a LIFO Queue
A thread is a thread of execution in a computer program.
Every Python program has at least one thread of execution called the main thread. Both processes and threads are created and managed by the underlying operating system.
Sometimes we may need to create additional threads in our program in order to execute code concurrently.
Python provides the ability to create and manage new threads via the threading module and the threading.Thread class.
You can learn more about Python threads in the guide: Python Threading: The Complete Guide.
In concurrent programming, we often need to share data between threads.
One approach to sharing data is to use a queue data structure.
Python provides a number of thread-safe queues in the queue module, such as the queue.LifoQueue class.
What is LIFO and what is the LifoQueue and how can we use it in Python?
LIFO vs FIFO Ordering
In order to understand LIFO ordering, let’s contrast it with FIFO ordering.
A queue is a data structure for maintaining a linear sequence of items.
The difference between queues is the order in which items are maintained. Specifically, the order in which items are returned by calls to get() relative to the order in which they were added via calls to put().
Two common queue orderings are LIFO and FIFO.
LIFO
The queue.LifoQueue class in Python maintains items in a LIFO ordering.
LIFO is an acronym that stands for: Last In, First Out.
This means that the call to get() will return the last item added to the queue via put().
In a LIFO queue, the most recently added entry is the first retrieved (operating like a stack).
— queue — A synchronized queue class
This means the queue.LifoQueue class works like a stack.
Consider a LifoQueue to which we add the following three items: 'A', 'B', and 'C'.
For example:
```python
...
# add 3 items to a lifo queue
queue.put('A')
queue.put('B')
queue.put('C')
```
Removing items from this queue in LIFO order would return the items 'C', 'B', 'A'.
For example:
```python
...
# get 3 items from a lifo queue
item = queue.get() # C
item = queue.get() # B
item = queue.get() # A
```
FIFO
The queue.Queue class maintains items in a FIFO ordering.
FIFO is an acronym that stands for: First In, First Out.
This means that the call to get() will return the first item added to the queue via put(), that is, the oldest item on the queue.
In a FIFO queue, the first tasks added are the first retrieved.
— queue — A synchronized queue class
This means the queue.Queue class works like a pipe: items come out in the same order in which they went in.
Consider a Queue to which we add the following three items: 'A', 'B', and 'C'.
For example:
```python
...
# add 3 items to a fifo queue
queue.put('A')
queue.put('B')
queue.put('C')
```
Removing items from this queue in FIFO order would return the items 'A', 'B', 'C'.
For example:
```python
...
# get 3 items from a fifo queue
item = queue.get() # A
item = queue.get() # B
item = queue.get() # C
```
Next, let’s look at some uses of LIFO ordering.
When to use a LIFO Queue
To better understand LIFO ordering, let’s look at some examples of where it may be used.
A LIFO queue or Stack is useful in many programming situations.
Five common examples include:
- Undo: Maintaining changes to an object so that they can be undone or reversed in the order they were applied.
- Depth-First: Maintaining a list of nodes when navigating a tree or graph in a depth-first manner.
- Parsers: Maintain a list of expressions in the order they must be executed.
- Backtracking: Maintain a list of options in the order they were encountered or made available, in case the current option fails.
- Freshness: Maintain a list of connections or data in the reverse order they were used so the most recent can be acquired when needed.
Also, stack-based processing of data is a common building block in many algorithms used in computer science, such as depth-first search, branch and bound, and many more.
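As a small sketch of the depth-first case, the example below implements an iterative depth-first traversal using a queue.LifoQueue as the stack of nodes to visit. The graph, its node names, and the depth_first() helper are all hypothetical, made up for illustration:

```python
from queue import LifoQueue

# hypothetical adjacency-list graph for illustration
graph = {'A': ['B', 'C'], 'B': ['D'], 'C': ['E'], 'D': [], 'E': []}

def depth_first(graph, start):
    # stack of nodes to visit, most recently discovered first
    stack = LifoQueue()
    stack.put(start)
    visited = []
    while not stack.empty():
        node = stack.get()
        if node in visited:
            continue
        visited.append(node)
        # push neighbors; the last one pushed is explored first
        for neighbor in graph[node]:
            stack.put(neighbor)
    return visited

# explore the graph depth-first from node 'A'
print(depth_first(graph, 'A'))
```

Because 'C' is pushed after 'B', the traversal dives down the 'C' branch first, which is exactly the LIFO behavior described above.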
Next, let’s look at how we might use a LIFO queue in Python.
How to Use the LifoQueue
Python provides a thread-safe LIFO queue in the queue.LifoQueue class.
A queue is a data structure on which items can be added by a call to put() and from which items can be retrieved by a call to get().
The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics.
— queue — A synchronized queue class
Thread-safe means that it can be used by multiple threads to put and get items concurrently without a race condition.
The LifoQueue maintains items in the queue in Last In, First Out or LIFO order, as its name suggests.
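As a small illustration of this thread-safety, the sketch below (the worker() helper and the thread and item counts are made up for illustration) has four threads put items onto one shared LifoQueue concurrently; every put is accounted for, with no lost updates:

```python
from queue import LifoQueue
from threading import Thread

# shared queue used by all threads
queue = LifoQueue()

# each worker adds 1,000 items to the shared queue
def worker(queue, n):
    for i in range(n):
        queue.put(i)

# start four threads putting items concurrently
threads = [Thread(target=worker, args=(queue, 1000)) for _ in range(4)]
for t in threads:
    t.start()
# wait for all threads to finish
for t in threads:
    t.join()

# all 4,000 puts are accounted for
print(queue.qsize())
```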
Create a LifoQueue
The queue.LifoQueue can be used by first creating an instance of the class. This will create an unbounded queue by default, that is, a queue with no size limit.
For example:
```python
...
# create an unbounded queue
queue = queue.LifoQueue()
```
A queue can be created with a size limit by specifying the “maxsize” argument to a value larger than zero.
For example:
```python
...
# create a size limited queue
queue = queue.LifoQueue(maxsize=100)
```
Add Items to the LifoQueue
Items can be added to the queue via a call to put(). Once a size limited queue is full, new items cannot be added and calls to put() will block until space becomes available.
For example:
```python
...
# add an item to the queue
queue.put(item)
```
By default, the call to put() will block and will not use a timeout.
For example, the above is equivalent to the following:
```python
...
# add an item to the queue
queue.put(item, block=True, timeout=None)
```
The call to put() will block if the queue is full. We can choose to not block when adding items by setting the “block” argument to False. If the queue is full, then a queue.Full exception will be raised which may be handled.
For example:
```python
...
# add an item to a size limited queue without blocking
try:
    queue.put(item, block=False)
except queue.Full:
    # ...
```
This can also be achieved with the put_nowait() function that does the same thing.
For example:
```python
...
# add an item to a size limited queue without blocking
try:
    queue.put_nowait(item)
except queue.Full:
    # ...
```
Alternatively we may wish to add an item to a size limited queue and block with a timeout. This can be achieved by setting the “timeout” argument to a positive value in seconds. If an item cannot be added before the timeout expires, then a queue.Full exception will be raised which may be handled.
For example:
```python
...
# add an item to a size limited queue with a timeout
try:
    queue.put(item, timeout=5)
except queue.Full:
    # ...
```
Get Items From The LifoQueue
Items can be retrieved from the queue by calls to get().
For example:
```python
...
# get an item from the queue
item = queue.get()
```
By default, the call to get() will block until an item is available to retrieve from the queue and will not use a timeout. Therefore, the above call is equivalent to the following:
```python
...
# get an item from the queue
item = queue.get(block=True, timeout=None)
```
We can retrieve items from the queue without blocking by setting the “block” argument to False. If an item is not available to retrieve, then a queue.Empty exception will be raised and may be handled.
```python
...
# get an item from the queue without blocking
try:
    item = queue.get(block=False)
except queue.Empty:
    # ...
```
This can also be achieved with the get_nowait() function that does the same thing.
For example:
```python
...
# get an item from the queue without blocking
try:
    item = queue.get_nowait()
except queue.Empty:
    # ...
```
Alternatively, we may wish to retrieve items from the queue and block with a time limit. This can be achieved by setting the “timeout” argument to a positive value in seconds. If the timeout expires before an item can be retrieved, then a queue.Empty exception will be raised and may be handled.
For example:
```python
...
# get an item from the queue with a timeout
try:
    item = queue.get(timeout=10)
except queue.Empty:
    # ...
```
Query LifoQueue Size
The number of items in the queue can be checked by the qsize() function.
For example:
```python
...
# check the size of the queue
size = queue.qsize()
```
We can check if the queue contains no values via the empty() function.
For example:
```python
...
# check if the queue is empty
if queue.empty():
    # ...
```
We can also check if the queue is full via the full() function, if the queue was configured with a size limit.
For example:
```python
...
# check if the queue is full
if queue.full():
    # ...
```
Note, the state of the queue may change immediately after any of these queries, so care must be taken to avoid a race condition.
For example, this would not be thread-safe:
```python
...
# check if the queue has space
if not queue.full():
    # add an item to the queue
    queue.put_nowait(item)
```
Although not recommended, you could acquire the queue.LifoQueue class's internal mutex to make the compound operation atomic. This would prevent any other thread from changing the state of the queue while you perform your operation.
Note that the queue's own methods, such as put_nowait(), acquire this same non-reentrant lock, so calling them while holding the mutex would deadlock. Instead, you would have to operate on the queue's internal state directly, which relies on undocumented implementation details and should generally be avoided.
For example:
```python
...
# acquire the lock on the queue
with queue.mutex:
    # check if the queue has space (inspecting internal state)
    if queue.maxsize <= 0 or len(queue.queue) < queue.maxsize:
        # add an item directly to the internal container
        queue.queue.append(item)
        queue.unfinished_tasks += 1
        # wake one thread blocked in get()
        queue.not_empty.notify()
```
LifoQueue Join and Task Done
An object added to a queue may represent a task or a unit of work.
When a consumer thread calls get() to retrieve the item from the queue, it may need to process it before the task is considered complete.
Once complete, the thread may then call the task_done() method on the queue to indicate that the item that was just retrieved has been completely processed.
For example:
```python
...
# get a task
item = queue.get()
# process it
# ...
# mark the task as completed
queue.task_done()
```
This is helpful to other threads that may be interested to know once all tasks that have been added to the queue have been processed completely.
Other threads can wait for all tasks currently on the queue to be completed by calling the join() function.
For example:
```python
...
# wait for all current tasks on the queue to be marked as done
queue.join()
```
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
— Queue Objects, queue — A synchronized queue class
If the task_done() function is called more times than there are items on the queue, then a ValueError will be raised to indicate the invalid state.
If the join() function is called after all tasks have been marked done, then the function will return immediately.
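Both behaviors can be confirmed with a short sketch; the single 'task' item placed on the queue is made up for illustration:

```python
from queue import LifoQueue

# a queue with a single hypothetical task on it
queue = LifoQueue()
queue.put('task')
# retrieve the task and mark it as done
item = queue.get()
queue.task_done()
# all tasks are done, so join() returns immediately
queue.join()
# calling task_done() more times than there were items raises a ValueError
try:
    queue.task_done()
except ValueError:
    print('ValueError: task_done() called too many times')
```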
Now that we know how to use the queue.LifoQueue, let’s look at a worked example.
Example of Using a LifoQueue
We can explore how to use the queue.LifoQueue class with a worked example.
In this example, we will create a producer thread that will generate ten random numbers and put them on the queue. We will also create a consumer thread that will get numbers from the queue and report their values.
The producer will tag each added item with an integer from 0 to 9 so we can confirm that items are removed in the correct order. The consumer will retrieve the items from the queue, block for a random fraction of a second, then report the value.
This will demonstrate the LIFO ordering and multithreaded nature of the queue.LifoQueue class.
Additionally, the consumer will mark each item as done. This will be helpful to the producer to know when all items have been processed so that a special shutdown signal can be sent to the consumer, called a sentinel value.
Producer Thread
First, we can define the function to be executed by the producer thread.
The task will iterate ten times in a loop.
```python
...
# generate work
for i in range(10):
    # ...
```
Each iteration, it will generate a new random value between 0 and 1 via the random.random() function. It will then pair the generated value with an integer timestamp from 0 to 9 into a tuple and put the value on the queue.
```python
...
# generate a value
value = random()
# create an item
item = (i, value)
# add to the queue
queue.put(item)
```
Once the task is complete it will block on the queue until all items have been processed and marked as done by the consumer.
This can be achieved by calling the join() function.
```python
...
# wait for all items to be processed
queue.join()
```
Finally, the producer will put the value None on the queue to signal to the consumer that there is no further work. This is called a Sentinel Value and is a common way for threads to communicate via queues to signal an important event, like a shutdown.
The producer() function below implements this by taking the queue instance as an argument.
```python
# generate work
def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # create an item
        item = (i, value)
        # add to the queue
        queue.put(item)
    # wait for all items to be processed
    queue.join()
    # send sentinel value
    queue.put(None)
    print('Producer: Done')
```
Consumer Thread
Next, we can define the function to be executed by the consumer thread.
The task will loop forever. Each iteration, it will get an item from the queue and block if there is no item yet available.
If the item retrieved from the queue is the value None, then the task will break the loop and terminate the thread. Otherwise, the fractional value is used to block with a call to time.sleep() and is then reported. The item is then marked as processed via a call to task_done().
The consumer() function below implements this and takes the queue instance as an argument.
```python
# consume work
def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = queue.get()
        # check for stop
        if item is None:
            break
        # block
        sleep(item[1])
        # report
        print(f'>got {item}')
        # mark it as processed
        queue.task_done()
    # all done
    print('Consumer: Done')
```
Create LIFO Queue and Threads
Finally, in the main thread we can create the shared queue instance.
```python
...
# create the shared queue
queue = LifoQueue()
```
Then we can configure and start the producer thread, which will generate work and add it to the queue for the consumer to retrieve.
```python
...
# start the producer
producer = Thread(target=producer, args=(queue,))
producer.start()
```
We can then configure and start the consumer thread, which will patiently wait for work to arrive on the queue.
```python
...
# start the consumer
consumer = Thread(target=consumer, args=(queue,))
consumer.start()
```
The main thread will then block until both the producer and consumer threads terminate, then terminate itself.
```python
...
# wait for all threads to finish
producer.join()
consumer.join()
```
Complete Example
Tying this together, the complete example of using the queue.LifoQueue is listed below.
```python
# SuperFastPython.com
# example of using the lifo queue
from time import sleep
from random import random
from threading import Thread
from queue import LifoQueue

# generate work
def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # create an item
        item = (i, value)
        # add to the queue
        queue.put(item)
    # wait for all items to be processed
    queue.join()
    # send sentinel value
    queue.put(None)
    print('Producer: Done')

# consume work
def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = queue.get()
        # check for stop
        if item is None:
            break
        # block
        sleep(item[1])
        # report
        print(f'>got {item}')
        # mark it as processed
        queue.task_done()
    # all done
    print('Consumer: Done')

# create the shared queue
queue = LifoQueue()
# start the producer
producer = Thread(target=producer, args=(queue,))
producer.start()
# start the consumer
consumer = Thread(target=consumer, args=(queue,))
consumer.start()
# wait for threads to finish
producer.join()
consumer.join()
```
Running the example first creates the shared queue.LifoQueue instance.
The producer thread and consumer threads are configured and started and the main thread blocks until the new threads terminate.
Next, the producer thread generates a new random value each iteration of the task and adds it to the queue with a timestamp. The producer thread does not block so it likely adds all of its values to the queue before the consumer thread starts processing.
The producer thread finishes all of its items on the queue and then blocks on the queue until all work has been marked as done.
The consumer thread waits on the queue for items to arrive, then consumes them one at a time. It blocks for a moment, then reports the value. The consumer thread processes all values in the queue in LIFO order, indicated by the timestamp values counting down from 9 back to zero. All work units are marked as done.
The producer is notified that all work units are done, then sends a None value to signal to the consumer that no further work units are expected, then terminates.
The consumer thread gets the None value, breaks its loop and also terminates.
This highlights how the queue.LifoQueue can be used to easily share data between producer and consumer threads.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
We can see that indeed the messages added to the queue were processed in LIFO order with the last message processed first and the first message processed last.
```
Producer: Running
Consumer: Running
>got (9, 0.6701514254373837)
>got (8, 0.2169047154467909)
>got (7, 0.9680620238543799)
>got (6, 0.4839906369277093)
>got (5, 0.4487681650444133)
>got (4, 0.8767684241503664)
>got (3, 0.07537458393406571)
>got (2, 0.6564890107273412)
>got (1, 0.6574401836116125)
>got (0, 0.4198189286479037)
Producer: Done
Consumer: Done
```
Further Reading
This section provides additional resources that you may find helpful.
Python Threading Books
- Python Threading Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- Threading Module API Cheat Sheet
I also recommend specific chapters in the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python Threading: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to use a thread-safe LIFO queue in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Glenn Hansen on Unsplash