Last Updated on September 12, 2022
You can use a fast and simple thread-safe queue via the queue.SimpleQueue class.
In this tutorial you will discover how to use the thread-safe SimpleQueue class in Python.
Let’s get started.
Need for a Thread-Safe Simple Queue
A thread is a thread of execution in a computer program.
Every Python program has at least one thread of execution called the main thread. Both processes and threads are created and managed by the underlying operating system.
Sometimes we may need to create additional threads in our program in order to execute code concurrently.
Python provides the ability to create and manage new threads via the threading module and the threading.Thread class.
You can learn more about Python threads in the guide:
In concurrent programming, we often need to share data between threads.
One approach to sharing data is to use a queue.
Python provides a number of thread-safe queues in the queue module, such as the queue.SimpleQueue class.
What is the SimpleQueue and how can we use it in Python?
Run loops using all CPUs, download your FREE book to learn how.
How to Use the SimpleQueue
Python provides a simple thread-safe queue in the queue.SimpleQueue class.
Thread-safe means that it can be used by multiple threads to put and get items concurrently without a race condition.
The “Simple” in the name of the class refers to it providing a minimal interface compared to other queues in the “queue” module.
This minimal interface includes:
- Checking the size via qsize().
- Check if empty via empty().
- Add an item to the queue via put() and put_nowait().
- Get an item from the queue via get() and get_nowait().
Unlike the queue.Queue class, the queue.SimpleQueue does not provide the ability to limit the capacity of the queue and as such does not allow threads to block while putting items on the queue. It also does not provide the ability for consumer threads to mark tasks as done and have threads join the queue until all tasks are marked done.
Next, let’s take a closer look at this interface.
SimpleQueue Interface
We can create a new instance of the queue via the constructor which will be unbounded in size by default.
1 2 3 |
... # create a new queue queue = queue.SimpleQueue() |
Items are added to the end of the queue and retrieved from the front of the queue in a first-in, first-out or FIFO order.
We can add items to the queue by calling the put() function or the put_nowait() function.
For example:
1 2 3 |
... # add an item to the queue (without blocking) queue.put(item) |
Adding items to the queue does not block and does not use a timeout, although the function does take a “block” and “timeout” to match the queue.Queue interface, but these arguments have no effect.
Put item into the queue. The method never blocks and always succeeds (except for potential low-level errors such as failure to allocate memory). The optional args block and timeout are ignored and only provided for compatibility with Queue.put().
— SimpleQueue Objects
Therefore, put_nowait() is equivalent to put().
1 2 3 |
... # add an item to the queue without blocking queue.put_nowait(item) |
Items can be retrieved from the queue by calling the get() function.
1 2 3 |
... # get an item from the queue item = queue.get() |
By default, the call to get() will block until an item is available to be returned. Therefore, the above is equivalent to the following:
1 2 3 |
... # get an item from the queue item = queue.get(block=True, timeout=None) |
A timeout can be used when waiting for an item on the queue via the “timeout” argument. If the timeout expires before an item is available, then an queue.Empty exception is raised.
1 2 3 4 5 6 |
... # get an item from the queue with a timeout try: item = queue.get(timeout=1) except Empty: # no item... |
The “block” argument can be set false, which means the function will return immediately if there is an item to retrieve, otherwise it will raise an queue.Empty exception.
1 2 3 4 5 6 |
... # get an item from the queue without blocking try: item = queue.get(block=False) except Empty: # no item... |
This is equivalent to calling the get_nowait() function, for example:
1 2 3 4 5 6 |
... # get an item from the queue without blocking try: item = queue.get_nowait() except Empty: # no item... |
The number of items in the queue can be queried via the qsize() function.
For example:
1 2 3 |
... # check the size of the queue size = queue.qsize() |
We can check if the queue has no values via the empty() function.
For example
1 2 3 4 |
... # check if the queue is empty if queue.empty(): # ... |
Next, let’s take a look at the reentrant capability of the SimpleQueue class.
SimpleQueue is Reentrant
Unlike the queue.Queue class, the get() and put() functions on the queue supports reentry.
Reentrancy in concurrency refers to a capability from concurrent programming where a function call can be interrupted in the middle of execution and another function call before the first function call has finished.
For example, a reentrant lock provided by threading.RLock class can be acquired by a thread, then acquired again by the same thread.
The queue.SimpleQueue is reentrant.
This method has a C implementation which is reentrant. That is, a put() or get() call can be interrupted by another put() call in the same thread without deadlocking or corrupting internal state inside the queue. This makes it appropriate for use in destructors such as __del__ methods or weakref callbacks.
— SimpleQueue Objects
In the queue.SimpleQueue, reentrant refers to the ability to call get() while calling put(), or the reverse, calling put() while calling get().
This can happen in obscure situations.
For example, see:
Now that we know how to use the queue.SimpleQueue class, let’s look at some worked examples.
Example of Using the SimpleQueue
We can explore how to use the queue.SimpleQueue class with a worked example.
In this example, we will create a producer thread that will generate ten random numbers and put them on the queue. We will also create a consumer thread that will get numbers from the queue and report their values.
The queue.SimpleQueue provides a simple way to allow these simple producer and consumer threads to communicate data with each other.
First, we can define the function to be executed by the producer thread.
The task will iterate ten times in a loop. Each iteration, it will generate a new random value between 0 and 1 via the random.random() function. It will then block for that fraction of a second, then put the value on the queue.
Once the task is complete it will put the value None on the queue to signal to the consumer that there is no further work.
The producer() function below implements this by taking the queue instance as an argument.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# generate work def producer(queue): print('Producer: Running') # generate work for i in range(10): # generate a value value = random() # block sleep(value) # add to the queue queue.put(value) # all done queue.put(None) print('Producer: Done') |
Next, we can define the function to be executed by the consumer thread.
The task will loop forever. Each iteration, it will get an item from the queue and block if there is no item yet available.
If the item retrieved from the queue is the value None, then the task will break the loop and terminate the thread. Otherwise, the value is reported.
The consumer() function below implements this and takes the queue instance as an argument.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# consume work def consumer(queue): print('Consumer: Running') # consume work while True: # get a unit of work item = queue.get() # check for stop if item is None: break # report print(f'>got {item}') # all done print('Consumer: Done') |
Finally, in the main thread we can create the shared queue instance.
1 2 3 |
... # create the shared queue queue = SimpleQueue() |
We can then configure and start the consumer thread, which will patiently wait for work to arrive on the queue.
1 2 3 4 |
... # start the consumer consumer = Thread(target=consumer, args=(queue,)) consumer.start() |
Then we can configure and start the producer thread, which will generate work and add it to the queue for the consumer to retrieve.
1 2 3 4 |
... # start the producer producer = Thread(target=producer, args=(queue,)) producer.start() |
The main thread will then block until both the producer and consumer threads terminate, then terminate itself.
1 2 3 4 |
... # wait for all threads to finish producer.join() consumer.join() |
Tying this together, the complete example of using the queue.SimpleQueue is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# SuperFastPython.com # example of using the simple queue from time import sleep from random import random from threading import Thread from queue import SimpleQueue # generate work def producer(queue): print('Producer: Running') # generate work for i in range(10): # generate a value value = random() # block sleep(value) # add to the queue queue.put(value) # all done queue.put(None) print('Producer: Done') # consume work def consumer(queue): print('Consumer: Running') # consume work while True: # get a unit of work item = queue.get() # check for stop if item is None: break # report print(f'>got {item}') # all done print('Consumer: Done') # create the shared queue queue = SimpleQueue() # start the consumer consumer = Thread(target=consumer, args=(queue,)) consumer.start() # start the producer producer = Thread(target=producer, args=(queue,)) producer.start() # wait for all threads to finish producer.join() consumer.join() |
Running the example first creates the shared queue instance.
Next, the consumer thread is started and passed the queue instance. Then the producer thread is started and the main thread blocks until the worker threads terminate.
The producer thread generates a new random value each iteration of the task, blocks and adds it to the queue. The consumer thread waits on the queue for items to arrive, then consumes them one at a time, reporting their value.
Finally, the producer task finishes, a None value is put on the queue and the thread terminates. The consumer thread gets the None value, breaks its loop and also terminates.
This highlights how the queue.SimpleQueue can be used to share data easily between a producer and consumer threads.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Consumer: Running Producer: Running >got 0.49138927575578184 >got 0.9089938160636776 >got 0.9000414387422191 >got 0.69580688995332 >got 0.7816543465569435 >got 0.2853295879765214 >got 0.6840369305347521 >got 0.2025555428328133 >got 0.5096460791785361 Producer: Done >got 0.3847119182144745 Consumer: Done |
Next, let’s look at how we might get values from the queue without blocking.
Free Python Threading Course
Download your FREE threading PDF cheat sheet and get BONUS access to my free 7-day crash course on the threading API.
Discover how to use the Python threading module including how to create and start new threads and how to use a mutex locks and semaphores
Using the SimpleQueue Without Blocking
We can get values from the queue without blocking.
This might be useful if we wish to use busy waiting in the consumer task to check other state or perform other tasks while waiting for data to arrive on the queue.
You can learn more about busy waiting in the tutorial:
We can update the example from the previous section to get items from the queue without blocking.
This can be achieved by setting the “blocking” argument to False when calling get().
For example:
1 2 3 |
... # get a value from the queue without blocking item = queue.get(block=False) |
The get() function will return immediately.
If there is a value in the queue to retrieve, then it is returned. Otherwise, if the queue is empty, then a queue.Empty exception is raised, which can be handled.
In this case, if there is no value to get from the queue, we report a message and sleep for a fraction of a second. We will then continue which will jump back to the start of the consumer busy waiting loop.
For example:
1 2 3 4 5 6 7 8 |
... # get a unit of work try: item = queue.get(block=False) except Empty: print('Consumer: got nothing, waiting a while...') sleep(0.5) continue |
The updated version of the consumer() function with this change is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# consume work def consumer(queue): print('Consumer: Running') # consume work while True: # get a unit of work try: item = queue.get(block=False) except Empty: print('Consumer: got nothing, waiting a while...') sleep(0.5) continue # check for stop if item is None: break # report print(f'>got {item}') # all done print('Consumer: Done') |
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# SuperFastPython.com # example of using the simple queue without blocking from time import sleep from random import random from threading import Thread from queue import SimpleQueue from queue import Empty # generate work def producer(queue): print('Producer: Running') # generate work for i in range(10): # generate a value value = random() # block sleep(value) # add to the queue queue.put(value) # all done queue.put(None) print('Producer: Done') # consume work def consumer(queue): print('Consumer: Running') # consume work while True: # get a unit of work try: item = queue.get(block=False) except Empty: print('Consumer: got nothing, waiting a while...') sleep(0.5) continue # check for stop if item is None: break # report print(f'>got {item}') # all done print('Consumer: Done') # create the shared queue queue = SimpleQueue() # start the consumer consumer = Thread(target=consumer, args=(queue,)) consumer.start() # start the producer producer = Thread(target=producer, args=(queue,)) producer.start() # wait for all threads to finish producer.join() consumer.join() |
Running the example creates the shared queue, then starts the consumer and producer threads, as before.
The producer thread will generate, block and add items to the queue.
The consumer thread will attempt to get a value from the queue. If there is no value to retrieve, a queue.Empty exception is raised and handled by reporting a message, sleeping for a fraction of a second then starting the busy waiting loop again.
Otherwise, if there is a value in the queue, the consumer will retrieve it and report it as per normal.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
We can see the messages from the consumer thread busy waiting for new data to arrive in the queue.
This highlights how to get items from the queue.SimpleQueue without blocking.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
Consumer: Running Consumer: got nothing, waiting a while... Producer: Running Consumer: got nothing, waiting a while... >got 0.8315210847454647 Consumer: got nothing, waiting a while... Consumer: got nothing, waiting a while... >got 0.9040040774166886 Consumer: got nothing, waiting a while... Consumer: got nothing, waiting a while... >got 0.9654033549252191 Consumer: got nothing, waiting a while... >got 0.7305006936148903 >got 0.06463532037048014 Consumer: got nothing, waiting a while... Consumer: got nothing, waiting a while... >got 0.8309731213746061 Consumer: got nothing, waiting a while... Consumer: got nothing, waiting a while... >got 0.9414631757891674 Consumer: got nothing, waiting a while... Consumer: got nothing, waiting a while... Producer: Done >got 0.7431005827399254 >got 0.0005807148605703194 >got 0.1393792628777406 Consumer: Done |
Next, let’s look at how we might get values from the queue with a timeout.
Overwhelmed by the python concurrency APIs?
Find relief, download my FREE Python Concurrency Mind Maps
Using the SimpleQueue With a Timeout
We can get values from the queue.SimpleQueue by blocking but limited by a timeout.
This allows a consumer thread to both block while waiting for values to arrive in the queue, but also execute other tasks while busy waiting. It may be more efficient that busy waiting without any blocking calls.
We can update the above example to use a timeout when getting items from the queue in the consumer thread.
This can be achieved by calling the get() function and specifying a timeout value in seconds.
For example:
1 2 3 |
... # get a value and block for a timeout item = queue.get(timeout=0.5) |
If a value is available, the function will return immediately with the value.
Otherwise, if no value is available within the timeout, then a queue.Empty exception is raised which may be handled. In this case, we will handle the exception by reporting a message and starting the busy-wait loop again.
1 2 3 4 5 6 7 |
... # get a unit of work try: item = queue.get(timeout=0.5) except Empty: print('Consumer: gave up waiting...') continue |
Tying this together, the updated version of the consumer() function with this change is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# consume work def consumer(queue): print('Consumer: Running') # consume work while True: # get a unit of work try: item = queue.get(timeout=0.5) except Empty: print('Consumer: gave up waiting...') continue # check for stop if item is None: break # report print(f'>got {item}') # all done print('Consumer: Done') |
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# SuperFastPython.com # example of using the simple queue with a timeout from time import sleep from random import random from threading import Thread from queue import SimpleQueue from queue import Empty # generate work def producer(queue): print('Producer: Running') # generate work for i in range(10): # generate a value value = random() # block sleep(value) # add to the queue queue.put(value) # all done queue.put(None) print('Producer: Done') # consume work def consumer(queue): print('Consumer: Running') # consume work while True: # get a unit of work try: item = queue.get(timeout=0.5) except Empty: print('Consumer: gave up waiting...') continue # check for stop if item is None: break # report print(f'>got {item}') # all done print('Consumer: Done') # create the shared queue queue = SimpleQueue() # start the consumer consumer = Thread(target=consumer, args=(queue,)) consumer.start() # start the producer producer = Thread(target=producer, args=(queue,)) producer.start() # wait for all threads to finish producer.join() consumer.join() |
Running the example creates the shared queue, then starts the consumer and producer threads, as before.
The producer thread will generate, block and add items to the queue.
The consumer thread will attempt to get a value from the queue. The thread will block for a timeout.
If no value is available before the timeout expires, then a queue.Empty exception is raised and handled by reporting a message then starting the busy wait loop again. Otherwise, if there is a value in the queue, the consumer will retrieve it and report it as per normal.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
We can see the messages from the consumer thread busy waiting for new data to arrive in the queue.
This highlights how to get items from the queue.SimpleQueue by blocking with a timeout.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
Consumer: Running Producer: Running >got 0.38573432597027735 >got 0.4636838697288195 Consumer: gave up waiting... >got 0.7517639069916314 >got 0.4420844820641848 >got 0.08157221351989186 Consumer: gave up waiting... >got 0.7285944241750508 Consumer: gave up waiting... >got 0.6143809725593375 >got 0.3434913262962024 Consumer: gave up waiting... >got 0.6376752385413578 Consumer: gave up waiting... Producer: Done >got 0.568278446546111 Consumer: Done |
Further Reading
This section provides additional resources that you may find helpful.
Python Threading Books
- Python Threading Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- Threading Module API Cheat Sheet
I also recommend specific chapters in the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python Threading: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python ThreadPool: The Complete Guide
APIs
References
Takeaways
You now know how to use the queue.SimpleQueue in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Lital Levy on Unsplash
Do you have any questions?