Last Updated on September 12, 2022
You can use a thread-safe queue via the queue.Queue class.
In this tutorial, you will discover how to use a thread-safe queue in Python.
Let’s get started.
Need for a Queue
A thread is a thread of execution in a computer program.
Every Python program has at least one thread of execution called the main thread. Both processes and threads are created and managed by the underlying operating system.
Sometimes we may need to create additional threads in our program in order to execute code concurrently.
Python provides the ability to create and manage new threads via the threading module and the threading.Thread class.
In concurrent programming, we often need to share data between threads.
One approach to sharing data is to use a queue data structure.
Python provides a number of thread-safe queues in the queue module, such as the queue.Queue class.
What is the Queue and how can we use it in Python?
How to Use the Queue
Python provides a thread-safe queue in the queue.Queue class.
A queue is a data structure on which items can be added by a call to put() and from which items can be retrieved by a call to get().
The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics.
— queue — A synchronized queue class
Thread-safe means that it can be used by multiple threads to put and get items concurrently without a race condition.
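As a quick sketch of what thread-safe means in practice (the thread count and item counts here are arbitrary values chosen for illustration), we can have many threads put items concurrently without any extra locking and confirm that no items are lost:

```python
from threading import Thread
from queue import Queue

# shared queue written to by many threads at once
shared = Queue()

def worker(n):
    # each thread adds 100 items with no extra locking
    for i in range(100):
        shared.put((n, i))

# start 10 threads that all put concurrently
threads = [Thread(target=worker, args=(n,)) for n in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# every item arrived exactly once
print(shared.qsize())  # 1000
```

If a plain unsynchronized structure were used instead, concurrent writes could corrupt its internal state; the queue guarantees all 1,000 items arrive intact.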
The queue.Queue provides a first-in, first-out (FIFO) queue, which means that the items are retrieved from the queue in the order they were added. The first items added to the queue will be the first items retrieved. This is opposed to other queue types, such as last-in, first-out (LIFO) and priority queues.
Create a Queue
The queue.Queue can be used by first creating an instance of the class. This will create an unbounded queue by default, that is, a queue with no size limit.
For example:
...
# create an unbounded queue
queue = queue.Queue()
A queue can be created with a size limit by specifying the “maxsize” argument with a value larger than zero.
For example:
...
# create a size limited queue
queue = queue.Queue(maxsize=100)
Add Items to the Queue
Items can be added to the queue via a call to put(). Once a size limited queue is full, new items cannot be added and calls to put() will block until space becomes available.

For example:
...
# add an item to the queue
queue.put(item)
By default, the call to put() will block and will not use a timeout.
For example, the above is equivalent to the following:
...
# add an item to the queue
queue.put(item, block=True, timeout=None)
The call to put() will block if the queue is full. We can choose to not block when adding items by setting the “block” argument to False. If the queue is full, then a queue.Full exception will be raised which may be handled.
For example:
...
# add an item to a size limited queue without blocking
try:
    queue.put(item, block=False)
except queue.Full:
    ...  # handle the full queue here
This can also be achieved with the put_nowait() function that does the same thing.
For example:
...
# add an item to a size limited queue without blocking
try:
    queue.put_nowait(item)
except queue.Full:
    ...  # handle the full queue here
Alternatively we may wish to add an item to a size limited queue and block with a timeout. This can be achieved by setting the “timeout” argument to a positive value in seconds. If an item cannot be added before the timeout expires, then a queue.Full exception will be raised which may be handled.
For example:
...
# add an item to a size limited queue with a timeout
try:
    queue.put(item, timeout=5)
except queue.Full:
    ...  # handle the full queue here
Get Items From The Queue
Items can be retrieved from the queue by calls to get().
For example:
...
# get an item from the queue
item = queue.get()
By default, the call to get() will block until an item is available to retrieve from the queue and will not use a timeout. Therefore, the above call is equivalent to the following:
...
# get an item from the queue
item = queue.get(block=True, timeout=None)
We can retrieve items from the queue without blocking by setting the “block” argument to False. If an item is not available to retrieve, then a queue.Empty exception will be raised and may be handled.
...
# get an item from the queue without blocking
try:
    item = queue.get(block=False)
except queue.Empty:
    ...  # handle the empty queue here
This can also be achieved with the get_nowait() function that does the same thing.
For example:
...
# get an item from the queue without blocking
try:
    item = queue.get_nowait()
except queue.Empty:
    ...  # handle the empty queue here
Alternatively, we may wish to retrieve items from the queue and block with a time limit. This can be achieved by setting the “timeout” argument to a positive value in seconds. If the timeout expires before an item can be retrieved, then a queue.Empty exception will be raised and may be handled.
For example:
...
# get an item from the queue with a timeout
try:
    item = queue.get(timeout=10)
except queue.Empty:
    ...  # handle the empty queue here
Query Queue Size
The number of items in the queue can be checked by the qsize() function.
For example:
...
# check the size of the queue
size = queue.qsize()
We can check if the queue contains no values via the empty() function.
For example:
...
# check if the queue is empty
if queue.empty():
    ...  # handle the empty queue here
We may also check if the queue is full, if it was configured with a size limit.
For example:
...
# check if the queue is full
if queue.full():
    ...  # handle the full queue here
Note, the state of the queue may change immediately after any of these queries, so care must be taken to avoid a race condition.
For example, this would not be thread-safe:
...
# check if the queue has space
if not queue.full():
    # add an item to the queue
    queue.put_nowait(item)
Although not recommended, you could acquire the queue.Queue class's internal mutex to make this compound operation atomic. This would prevent any other thread from changing the state of the queue while performing your operation.

Note, the queue's own methods, such as full() and put_nowait(), acquire this same non-reentrant lock internally, so calling them while holding the mutex would deadlock. Instead, you must operate on the queue's undocumented internal state directly.

For example:

...
# acquire the lock on the queue
with queue.mutex:
    # check if the queue has space (cannot call full() here)
    if queue._qsize() < queue.maxsize:
        # add the item directly to the internal deque
        queue.queue.append(item)
        # perform the bookkeeping normally done by put()
        queue.unfinished_tasks += 1
        queue.not_empty.notify()
Queue Join and Task Done
An object added to a queue may represent a task or a unit of work.
When a consumer thread calls get() to retrieve the item from the queue, it may need to process it before the task is considered complete.
Once complete, the thread may then call the task_done() method on the queue to indicate that the item that was just retrieved has been completely processed.
For example:
...
# get a task
item = queue.get()
# process it
# ...
# mark the task as completed
queue.task_done()
This is helpful to other threads that may be interested to know once all tasks that have been added to the queue have been processed completely.
Other threads can wait for all tasks currently on the queue to be completed by calling the join() function.
For example:
...
# wait for all current tasks on the queue to be marked as done
queue.join()
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
— Queue Objects, queue — A synchronized queue class
If the task_done() function is called more times than there are items on the queue, then a ValueError will be raised to indicate the invalid state.
If the join() function is called after all tasks have been marked done, then the function will return immediately.
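Both of these behaviors can be sketched in a few lines (a contrived single-threaded example, purely to show the return behavior of join() and the ValueError from an extra task_done() call):

```python
from queue import Queue

queue = Queue()
# no unfinished tasks yet, so join() returns immediately
queue.join()

# add one task and mark it complete
queue.put('task')
queue.get()
queue.task_done()
# all tasks are done, so join() again returns immediately
queue.join()

# calling task_done() more times than items were added raises ValueError
try:
    queue.task_done()
except ValueError as e:
    print(f'Error: {e}')
```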
Now that we know how to use the queue.Queue, let’s look at some worked examples.
Example of Using a Queue
We can explore how to use the queue.Queue class with a worked example.
In this example, we will create a producer thread that will generate ten random numbers and put them on the queue. We will also create a consumer thread that will get numbers from the queue and report their values.
The queue.Queue provides a way to allow these producer and consumer threads to communicate data with each other.
First, we can define the function to be executed by the producer thread.
The task will iterate ten times in a loop. Each iteration, it will generate a new random value between 0 and 1 via the random.random() function. It will then block for that fraction of a second, then put the value on the queue.
Once the task is complete it will put the value None on the queue to signal to the consumer that there is no further work.
The producer() function below implements this by taking the queue instance as an argument.
# generate work
def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # add to the queue
        queue.put(value)
    # all done
    queue.put(None)
    print('Producer: Done')
Next, we can define the function to be executed by the consumer thread.
The task will loop forever. Each iteration, it will get an item from the queue and block if there is no item yet available.
If the item retrieved from the queue is the value None, then the task will break the loop and terminate the thread. Otherwise, the value is reported.
The consumer() function below implements this and takes the queue instance as an argument.
# consume work
def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = queue.get()
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')
Finally, in the main thread we can create the shared queue instance.
...
# create the shared queue
queue = Queue()
We can then configure and start the consumer thread, which will patiently wait for work to arrive on the queue.
...
# start the consumer
consumer = Thread(target=consumer, args=(queue,))
consumer.start()
Then we can configure and start the producer thread, which will generate work and add it to the queue for the consumer to retrieve.
...
# start the producer
producer = Thread(target=producer, args=(queue,))
producer.start()
The main thread will then block until both the producer and consumer threads terminate, then terminate itself.
...
# wait for all threads to finish
producer.join()
consumer.join()
Tying this together, the complete example of using the queue.Queue is listed below.
# SuperFastPython.com
# example of using the queue
from time import sleep
from random import random
from threading import Thread
from queue import Queue

# generate work
def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # add to the queue
        queue.put(value)
    # all done
    queue.put(None)
    print('Producer: Done')

# consume work
def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = queue.get()
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')

# create the shared queue
queue = Queue()
# start the consumer
consumer = Thread(target=consumer, args=(queue,))
consumer.start()
# start the producer
producer = Thread(target=producer, args=(queue,))
producer.start()
# wait for all threads to finish
producer.join()
consumer.join()
Running the example first creates the shared queue.Queue instance.
Next, the consumer thread is created and started, passed the queue instance. Then the producer thread is created and started, and the main thread blocks until both worker threads terminate.
The producer thread generates a new random value each iteration of the task, blocks and adds it to the queue. The consumer thread waits on the queue for items to arrive, then consumes them one at a time, reporting their value.
Finally, the producer task finishes, a None value is put on the queue and the thread terminates. The consumer thread gets the None value, breaks its loop and also terminates.
This highlights how the queue.Queue can be used to share data easily between producer and consumer threads.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
Consumer: Running
Producer: Running
>got 0.85752270025858
>got 0.059777468920459875
>got 0.06938782672451504
>got 0.3457155007416144
>got 0.21026741695523765
>got 0.007632273698975234
>got 0.6916144122030056
>got 0.4141278307054348
>got 0.017832304211696903
Producer: Done
>got 0.2948033525485403
Consumer: Done
Next, let’s look at how we might get values from the queue without blocking.
Using a Queue Without Blocking
We can get values from the queue.Queue without blocking.
This might be useful if we wish to use busy waiting in the consumer task to check other states or perform other tasks while waiting for data to arrive on the queue.
We can update the example from the previous section to get items from the queue without blocking.
This can be achieved by setting the “block” argument to False when calling get().
For example:
...
# get a value from the queue without blocking
item = queue.get(block=False)
The get() function will return immediately.
If there is a value in the queue to retrieve, then it is returned. Otherwise, if the queue is empty, then a queue.Empty exception is raised, which can be handled.
In this case, if there is no value to get from the queue, we report a message and sleep for a fraction of a second. We will then continue which will jump back to the start of the consumer busy waiting loop.
For example:
...
# get a unit of work
try:
    item = queue.get(block=False)
except Empty:
    print('Consumer: got nothing, waiting a while...')
    sleep(0.5)
    continue
The updated version of the consumer() function with this change is listed below.
# consume work
def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        try:
            item = queue.get(block=False)
        except Empty:
            print('Consumer: got nothing, waiting a while...')
            sleep(0.5)
            continue
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using the queue without blocking
from time import sleep
from random import random
from threading import Thread
from queue import Queue
from queue import Empty

# generate work
def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # add to the queue
        queue.put(value)
    # all done
    queue.put(None)
    print('Producer: Done')

# consume work
def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        try:
            item = queue.get(block=False)
        except Empty:
            print('Consumer: got nothing, waiting a while...')
            sleep(0.5)
            continue
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')

# create the shared queue
queue = Queue()
# start the consumer
consumer = Thread(target=consumer, args=(queue,))
consumer.start()
# start the producer
producer = Thread(target=producer, args=(queue,))
producer.start()
# wait for all threads to finish
producer.join()
consumer.join()
Running the example creates the shared queue.Queue, then starts the consumer and producer threads, as before.
The producer thread will generate, block and add items to the queue.
The consumer thread will attempt to get a value from the queue. If there is no value to retrieve, a queue.Empty exception is raised and handled by reporting a message, sleeping for a fraction of a second then starting the busy wait loop again.
Otherwise, if there is a value in the queue, the consumer will retrieve it and report it as per normal.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
We can see the messages from the consumer thread busy waiting for new data to arrive in the queue.
This highlights how to get items from the queue.Queue without blocking.
Consumer: Running
Consumer: got nothing, waiting a while...
Producer: Running
>got 0.14323615162716996
Consumer: got nothing, waiting a while...
>got 0.5767637686794239
Consumer: got nothing, waiting a while...
>got 0.5537675506277863
Consumer: got nothing, waiting a while...
Consumer: got nothing, waiting a while...
>got 0.726352636610821
Consumer: got nothing, waiting a while...
>got 0.7929787922412898
Consumer: got nothing, waiting a while...
>got 0.3767920195107032
Consumer: got nothing, waiting a while...
Consumer: got nothing, waiting a while...
>got 0.8565665596301556
Consumer: got nothing, waiting a while...
Producer: Done
>got 0.4825189093948836
>got 0.08511852863104696
>got 0.0350239896827691
Consumer: Done
Next, let’s look at how we might get values from the queue with a timeout.
Using a Queue With a Timeout
We can get values from the queue.Queue by blocking but limited by a timeout.
This allows a consumer thread to block while waiting for values to arrive on the queue, yet still wake periodically to perform other tasks. It may be more efficient than busy waiting without any blocking calls.
We can update the above example to use a timeout when getting items from the queue in the consumer thread.
This can be achieved by calling the get() function and specifying a timeout value in seconds.
For example:
...
# get a value and block for a timeout
item = queue.get(timeout=0.5)
If a value is available, the function will return immediately with the value.
Otherwise, if no value is available within the timeout, then a queue.Empty exception is raised which may be handled. In this case, we will handle the exception by reporting a message and starting the busy-wait loop again.
...
# get a unit of work
try:
    item = queue.get(timeout=0.5)
except Empty:
    print('Consumer: gave up waiting...')
    continue
Tying this together, the updated version of the consumer() function with this change is listed below.
# consume work
def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        try:
            item = queue.get(timeout=0.5)
        except Empty:
            print('Consumer: gave up waiting...')
            continue
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using the queue with a timeout
from time import sleep
from random import random
from threading import Thread
from queue import Queue
from queue import Empty

# generate work
def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # add to the queue
        queue.put(value)
    # all done
    queue.put(None)
    print('Producer: Done')

# consume work
def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        try:
            item = queue.get(timeout=0.5)
        except Empty:
            print('Consumer: gave up waiting...')
            continue
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')

# create the shared queue
queue = Queue()
# start the consumer
consumer = Thread(target=consumer, args=(queue,))
consumer.start()
# start the producer
producer = Thread(target=producer, args=(queue,))
producer.start()
# wait for all threads to finish
producer.join()
consumer.join()
Running the example creates the shared queue.Queue, then starts the consumer and producer threads, as before.
The producer thread will generate, block and add items to the queue.
The consumer thread will attempt to get a value from the queue. The thread will block for a timeout.
If no value is available before the timeout expires, then a queue.Empty exception is raised and handled by reporting a message then starting the busy wait loop again. Otherwise, if there is a value in the queue, the consumer will retrieve it and report it as per normal.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
We can see the messages from the consumer thread busy waiting for new data to arrive in the queue.
This highlights how to get items from the queue.Queue by blocking with a timeout.
Consumer: Running
Producer: Running
Consumer: gave up waiting...
>got 0.7138275468445925
>got 0.3358802728370527
>got 0.33205578336152375
>got 0.47029718362121775
>got 0.20958152874020686
>got 0.29239630051529786
>got 0.011483347112813846
>got 0.1215998738213061
Consumer: gave up waiting...
>got 0.8984151759427139
Producer: Done
>got 0.3474700844158405
Consumer: Done
Next, let’s look at how we might wait on the queue and mark tasks as completed in the queue.
Using a Queue Join and Task Done
In the previous examples we have sent a special message into the queue to indicate that all tasks are done.
An alternative approach is to have threads wait on the queue directly and to have the consumer thread mark tasks as done.
This can be achieved via the join() and task_done() functions on the queue.Queue.
The producer thread can be updated to no longer send a None value into the queue to indicate no further tasks.
The updated version of the producer() function with this change is listed below.
# generate work
def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # add to the queue
        queue.put(value)
    # all done
    print('Producer: Done')
The consumer thread can be updated to no longer check for None messages, and to mark each task as completed via a call to task_done().
The updated version of the consumer() function with these changes is listed below.
# consume work
def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = queue.get()
        # report
        print(f'>got {item}')
        # mark the task as done
        queue.task_done()
The producer thread will run until there are no longer any tasks to add to the queue and will then terminate. The consumer thread, however, will now run forever, which means the Python process would not exit after all tasks are completed.

To allow the process to exit, we can mark the consumer thread as a background daemon thread by setting the “daemon” argument to True when configuring it.
For example:
...
# start the consumer
consumer = Thread(target=consumer, args=(queue,), daemon=True)
consumer.start()
The Python process will exit once all non-daemon threads have terminated, including the producer thread and the main thread, but not the consumer thread.
Finally, the main thread no longer needs to wait on the producer and consumer threads to terminate before terminating itself.
Instead, it can wait on the queue itself and stop waiting once the consumer has processed all tasks.
...
# wait for all tasks to be processed
queue.join()
This may result in a race condition, as it is possible for no items to be added to the queue before the main thread reaches this line, in which case the main thread will terminate immediately.
Therefore, we can have the main thread first wait on the producer thread to complete, in which case we know all work has been added to the queue, and then wait on the queue itself for that work to be processed.
...
# wait for all tasks to be processed
producer.join()
queue.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using join and task_done on the queue
from time import sleep
from random import random
from threading import Thread
from queue import Queue

# generate work
def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # add to the queue
        queue.put(value)
    # all done
    print('Producer: Done')

# consume work
def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = queue.get()
        # report
        print(f'>got {item}')
        # mark the task as done
        queue.task_done()

# create the shared queue
queue = Queue()
# start the consumer
consumer = Thread(target=consumer, args=(queue,), daemon=True)
consumer.start()
# start the producer
producer = Thread(target=producer, args=(queue,))
producer.start()
# wait for all tasks to be processed
producer.join()
queue.join()
Running the example creates the shared queue.Queue, then starts the consumer and producer threads, as before.
The producer thread will generate, block and add items to the queue.
The consumer thread will attempt to get a value from the queue.
Once the producer thread has added all of its ten items, it will terminate. The consumer thread will run forever as a daemon thread in the background.
Once the producer thread has terminated, the main thread will then block on the queue. Once the consumer thread has processed all items in the queue, the main thread will then progress and terminate. The Python process will then terminate with the consumer thread still running, but no work to process.
A sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
This highlights how to mark tasks as completed in the queue and how to wait on the queue for all work to be completed.
Consumer: Running
Producer: Running
>got 0.5768459464031224
>got 0.5387029732694203
>got 0.05103270659146786
>got 0.21472900665702122
>got 0.80048102695732
>got 0.9177938211361101
>got 0.5310976298447865
>got 0.31694201291521085
>got 0.48620645142849794
Producer: Done
>got 0.05241988742861203
Next, let’s look at how we might work with a limited size queue.
Example of a Limited Sized Queue
We can limit the capacity of the queue.
This can be helpful if we have a large number of producers or slow consumers. It allows us to limit the number of tasks that may be in memory at any one time, limiting overall memory usage of the application.
This can be achieved by setting the “maxsize” argument in the constructor to the queue when configuring it.
For example:
...
# limit the queue to 20 items
queue = queue.Queue(maxsize=20)
When the queue is full, calls to put() will block until a position becomes available to place another item on the queue.
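This blocking behavior can be seen directly in a tiny standalone sketch (the one-item capacity and the 0.5 second delay are arbitrary values chosen for illustration):

```python
from time import sleep, time
from threading import Thread
from queue import Queue

# a queue that can only hold a single item
queue = Queue(maxsize=1)
queue.put('first')  # fills the queue

def slow_consumer():
    # free a slot after a short delay
    sleep(0.5)
    queue.get()

Thread(target=slow_consumer).start()

# this put() blocks until the consumer frees a slot
start = time()
queue.put('second')
elapsed = time() - start
print(f'put() blocked for about {elapsed:.1f} seconds')
```

The second call to put() cannot complete until the consumer thread removes the first item, so it blocks for roughly half a second.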
We can demonstrate this by updating the example from the previous section to limit the size of the queue to a small number, e.g. 2 items, and to have a modest number of producers, e.g. 5, attempt to populate the queue. This will keep the queue full most of the time, forcing the producers to block when adding items.
Firstly, we can specify the capacity of the queue when it is constructed.
...
# create the shared queue
queue = Queue(2)
Next, we can create five producer threads that will all attempt to add tasks to the queue at the same time.
This can be done in a list comprehension, after which we can iterate the list to start the threads, then iterate the same list to block on the threads until they terminate.
...
# start 5 producers
producers = [Thread(target=producer, args=(queue,)) for _ in range(5)]
for producer in producers:
    producer.start()
# wait for all producers to finish
for producer in producers:
    producer.join()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using a queue with a limited capacity
from time import sleep
from random import random
from threading import Thread
from queue import Queue

# generate work
def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # add to the queue
        queue.put(value)
    print('Producer: Done')

# consume work
def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = queue.get()
        # report
        print(f'>got {item}')
        # mark as completed
        queue.task_done()
    # all done
    print('Consumer: Done')

# create the shared queue
queue = Queue(2)
# start the consumer
consumer = Thread(target=consumer, args=(queue,), daemon=True)
consumer.start()
# start 5 producers
producers = [Thread(target=producer, args=(queue,)) for _ in range(5)]
for producer in producers:
    producer.start()
# wait for all producers to finish
for producer in producers:
    producer.join()
# wait for all work to be processed
queue.join()
Running the example first starts the daemon consumer thread, then starts all five non-daemon producer threads.
The main thread then blocks until the producer threads finish.
Each of the five producer threads then attempt to add ten items each to the queue as fast as they are able. The capacity of the queue is so limited that these threads will block most of the time on their calls to put() until a position becomes available to add a new item to the queue.
The consumer thread will consume items from the queue as fast as it can, report their values and mark them as done. The thread will run forever as a background daemon thread.
Once the producer threads finish, the main thread wakes up and then blocks on the queue, waiting for all remaining tasks to be consumed and marked done by the consumer thread.
A truncated sample output of the program is listed below. Note, your specific output will differ given the use of random numbers.
This highlights how to use a limited capacity queue.Queue and how to combine this with marking tasks as completed and joining the queue.
...
>got 0.9877054143810615
>got 0.03814093447616407
Producer: Done
>got 0.8169902676615277
>got 0.49280756305486806
>got 0.3265465525372153
>got 0.8425179777275013
>got 0.02703277989933861
>got 0.4488251391999387
>got 0.02153494221923058
>got 0.6886137371165443
>got 0.47059809894098237
>got 0.3053977952093049
>got 0.30392121610327416
Producer: Done
>got 0.020133360919975862
>got 0.78861829251425
>got 0.7655923815757242
>got 0.9222692404875706
Producer: Done
>got 0.5258917296823923
Producer: Done
>got 0.6725687445562611
>got 0.8983776149827499
Producer: Done
>got 0.42104528678479713
Common Questions
This section lists some common questions when using a queue.Queue in Python.
Do you have a question about the queue?
Let me know in the comments below and I may add it to this section with my answer.
What is a Queue?
A queue is a type of data structure.
Items in the queue are ordered and the queue supports efficient operations for adding items to the end and retrieving items from the front.
There are many queue types based on the order of the items maintained within the queue, such as:
- First in, First out order or FIFO.
- Last in, First out order or LIFO.
- Priority order.
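All three orderings are available directly in the queue module. As a quick single-threaded sketch for illustration:

```python
from queue import Queue, LifoQueue, PriorityQueue

# FIFO: items are retrieved in the order they were added
fifo = Queue()
for item in ['A', 'B', 'C']:
    fifo.put(item)
fifo_order = [fifo.get() for _ in range(3)]  # ['A', 'B', 'C']

# LIFO: the most recently added item is retrieved first
lifo = LifoQueue()
for item in ['A', 'B', 'C']:
    lifo.put(item)
lifo_order = [lifo.get() for _ in range(3)]  # ['C', 'B', 'A']

# Priority: the smallest item is retrieved first
prio = PriorityQueue()
for item in [(3, 'C'), (1, 'A'), (2, 'B')]:
    prio.put(item)
prio_order = [prio.get() for _ in range(3)]  # [(1, 'A'), (2, 'B'), (3, 'C')]
```

All three classes share the same interface, so a program can often switch ordering by changing only the class used to create the queue.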
What is FIFO?
FIFO is an acronym for the order of items in a queue.
It stands for: “First In, First Out”
This order means that the next item retrieved from the queue will be the item that was added to the queue first.
In a FIFO queue, the first tasks added are the first retrieved.
— queue — A synchronized queue class
If we think of each item added to the queue as having a timestamp, then the oldest item in the queue will be the next item removed and returned when requested.
For example, suppose the items ‘A’, ‘B’, and ‘C’ were added in that order:
...
queue.put('A')
queue.put('B')
queue.put('C')
Then the items will be retrieved in the order ‘A’, ‘B’, ‘C’, for example:
...
value = queue.get() # A
value = queue.get() # B
value = queue.get() # C
Is the Queue Thread-Safe?
Yes.
The queue.Queue class is thread-safe.
This means that multiple threads may call get() and/or put() concurrently and the internal state of the queue will be maintained correctly without corruption.
It also means that queries of the queue like qsize(), empty() and full() will report the correct state of the queue at the time of the call, although in a multithreaded program that state may change immediately after the call returns.
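For example, many threads can put items onto the same queue concurrently and no items will be lost or duplicated. A minimal sketch:

```python
from threading import Thread
from queue import Queue

# shared thread-safe queue
queue = Queue()

# each worker adds 1,000 items to the shared queue
def worker(queue):
    for i in range(1000):
        queue.put(i)

# start ten threads that all put items concurrently
threads = [Thread(target=worker, args=(queue,)) for _ in range(10)]
for thread in threads:
    thread.start()
# wait for all threads to finish
for thread in threads:
    thread.join()

# all 10,000 items arrived without corruption or loss
total = queue.qsize()  # 10000
```

If an unsynchronized data structure were used instead, concurrent additions could interleave and corrupt internal state; the queue's internal lock prevents this.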
Why Not Use a List?
A queue.Queue provides more functionality than a Python list.
Many operations on a Python list are atomic and therefore thread-safe. This includes appending items to the list and popping items from the list, the two actions needed to use the list like a queue.
There are three reasons to use a queue.Queue instead of a list:
- The queue.Queue protects its internal state with a mutex; a list does not.
- The queue.Queue allows threads to block while waiting to get an item via get(); a list does not.
- The queue.Queue allows threads to block while waiting to add an item via put(); a list does not.
The list does not protect its internal data structure with a mutual exclusion (mutex) lock. Its operations are only atomic as a side effect of the current CPython implementation; this may change if the Python interpreter, Python byte codes, or Python virtual machine changes in the future, or if an alternate Python interpreter is used. The queue.Queue is specifically designed to be thread-safe and uses a mutex lock internally to protect the state of the queue.
The list does not provide the ability for threads to block on the queue and be notified when a new item arrives. This requires the use of a threading.Condition object, which is implemented within the queue.Queue.
Additionally, the list does not provide the ability to limit the maximum size of the data structure and block threads from adding new values until a position becomes available. This too requires a threading.Condition, which is implemented within the queue.Queue.
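The capacity-limiting behavior can be seen in a small sketch: a bounded queue.Queue refuses a put() when full, while a list accepts appends without limit.

```python
from queue import Queue, Full

# a bounded queue with room for a single item
queue = Queue(maxsize=1)
queue.put('A')

# a second put() would block forever; with a timeout it raises Full
timed_out = False
try:
    queue.put('B', timeout=0.1)
except Full:
    timed_out = True  # the queue refused the item

# a plain list has no equivalent: append() always succeeds immediately
items = ['A']
items.append('B')
```

Implementing the same back-pressure with a list would require wiring up a threading.Condition by hand, which is exactly what queue.Queue does internally.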
Why Not Use a deque?
Python provides a queue data structure via the deque object.
The deque data structure does provide atomic operations for adding and getting values from the queue.
As such, these operations are thread-safe under the current Python interpreter.
Nevertheless, unlike the queue.Queue, it was not designed to be thread-safe and does not provide three key capabilities needed for multithreaded programming that are provided by the queue.Queue.
These are the same capabilities that list objects lack, as described above. Specifically:
- The queue.Queue protects its internal state with a mutex; a deque does not.
- The queue.Queue allows threads to block while waiting to get an item via get(); a deque does not.
- The queue.Queue allows threads to block while waiting to add an item via put(); a deque does not.
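For illustration, a deque can be used as a simple FIFO queue, but note the lack of blocking behavior:

```python
from collections import deque

# a deque used as a FIFO queue via append() and popleft(),
# both of which are atomic in CPython
queue = deque()
for item in ['A', 'B', 'C']:
    queue.append(item)

# items come out in FIFO order
first = queue.popleft()  # 'A'

# if the deque is empty, popleft() raises IndexError immediately;
# there is no way for a consumer thread to block and wait for an item
```

A consumer thread using a deque must poll in a loop and handle IndexError, wasting CPU, whereas queue.Queue's get() simply sleeps until an item arrives.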
Why Not Use queue.SimpleQueue?
A queue.SimpleQueue is a thread-safe queue.
It provides a limited interface compared to the queue.Queue class.
Specifically:
- It does not provide the ability to limit the capacity of the queue.
- It does not provide a way to check if the queue is full, because its size is unbounded.
- It does not provide join() and task_done() capabilities.
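These differences can be seen in a short sketch of the SimpleQueue interface:

```python
from queue import SimpleQueue

# an unbounded, thread-safe FIFO queue with a minimal interface
queue = SimpleQueue()
queue.put('A')
queue.put('B')

first = queue.get()  # 'A', FIFO order
size = queue.qsize() # 1 item remaining

# there is no maxsize, full(), join(), or task_done()
has_task_done = hasattr(queue, 'task_done')  # False
```

If you only need put() and get() between threads, SimpleQueue is a lighter-weight choice; if you need capacity limits or task-completion tracking, use queue.Queue.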
How to Tell Consumers That There Are No More Items?
A producer thread can tell consumer threads that no more items are expected by sending a special message.
This is called a sentinel object or sentinel value.
A common approach is for the producer to send the value None.
For example:
...
# indicate to consumers that there are no further messages
queue.put(None)
If a consumer gets this message, it knows that no further messages are expected and can shut down.
For example:
...
# get the next value
item = queue.get()
# check if no further values are expected
if item is None:
    # terminate the thread
    return
If there are multiple consumers that need to get the message, then a consumer may get the None item, and re-add it for other consumers to consume and respond to.
For example:
...
# get the next value
item = queue.get()
# check if no further values are expected
if item is None:
    # re-add the message for other consumers
    queue.put(item)
    # terminate the thread
    return
Does queue.Queue Support Peek?
No.
A peek operation allows a consumer to check what the next retrieved value will be without removing it.
The queue.Queue does not provide this capability.
A peek-like operation could be simulated with a get() and a put().
For example:
...
# simulate a peek
item = queue.get()
# do something
# ...
queue.put(item)
This approach is not recommended.
The item removed and re-added goes to the back of the queue, so it will not be the next item retrieved; technically, this is not a peek operation.
It may also trigger undesirable side-effects for other threads waiting on the queue with a join().
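If a peek is genuinely required, one workaround is to read the queue's internal deque while holding its internal lock. Note that the mutex and queue attributes used here are CPython implementation details rather than part of the documented public API, so this sketch may break on other interpreters or in future versions:

```python
from queue import Queue

queue = Queue()
queue.put('A')
queue.put('B')

# peek at the next item by reading the internal deque under the
# internal mutex (CPython implementation details, not public API)
with queue.mutex:
    next_item = queue.queue[0]  # 'A'

# the item is still in the queue and will be retrieved next
retrieved = queue.get()  # 'A'
```

A design that needs to peek frequently may be better served by a different data structure or by restructuring the consumer logic.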
Further Reading
This section provides additional resources that you may find helpful.
Python Threading Books
- Python Threading Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- Threading Module API Cheat Sheet
I also recommend specific chapters in the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python Threading: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python ThreadPool: The Complete Guide
APIs
References
Takeaways
You now know how to use the queue with threads in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by vikram sundaramoorthy on Unsplash