Last Updated on September 12, 2022
You can mark queue tasks done via task_done() and be notified when all tasks are done via join().
In this tutorial you will discover how to use task_done() and join() on a thread-safe queue in Python.
Let’s get started.
Need To Know When All Tasks are Done
A thread is a thread of execution in a computer program.
Every Python program has at least one thread of execution called the main thread. Both processes and threads are created and managed by the underlying operating system.
Sometimes we may need to create additional threads in our program in order to execute code concurrently.
Python provides the ability to create and manage new threads via the threading module and the threading.Thread class.
Threads can share data with each other using thread-safe queues, such as the queue.Queue class.
A problem when using queues is knowing when all items in the queue have been processed by consumer threads.
How can we know when all items have been processed in a queue?
Why Care When All Tasks Are Done
There are many reasons why a thread may want to know when all tasks in a queue have been processed.
For example:
- A producer thread may want to wait until all work is done before adding new work.
- A producer thread may want to wait for all tasks to be done before sending a shutdown signal.
- A main thread may want to wait for all tasks to be done before terminating the program.
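The first two reasons can be sketched together. This is a minimal illustration, not part of the worked examples in this tutorial: the batch sizes, the shared log list and the thread names are assumptions made for the sketch. The producer waits for each batch to be fully processed before adding the next, and only sends the shutdown signal once all work is done.

```python
from queue import Queue
from threading import Thread

# shared event log (hypothetical, for illustration only)
log = []

# consumer: process items until the shutdown signal (None) arrives
def consumer(q):
    while True:
        item = q.get()
        try:
            if item is None:
                break
            log.append(f'processed {item}')
        finally:
            # mark every retrieved item as done, including the signal
            q.task_done()

# producer: add work in batches, waiting for each batch to finish
def producer(q):
    for batch in range(2):
        for n in range(3):
            q.put((batch, n))
        # block until every task in this batch is marked done
        q.join()
        log.append(f'batch {batch} done')
    # send the shutdown signal only after all work is done
    q.put(None)

q = Queue()
worker = Thread(target=consumer, args=(q,))
feeder = Thread(target=producer, args=(q,))
worker.start()
feeder.start()
feeder.join()
worker.join()
print(log)
```

Because the producer only logs a batch as done after join() returns, every "processed" entry for a batch appears in the log before its "done" entry.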
Waiting for all tasks to be done has two aspects:
- A thread blocking until tasks are done.
- A task being done is more than being retrieved from the queue.
Specifically, waiting means that the thread is blocked until the condition is met.
All tasks being processed means more than every item having been retrieved from the queue: each item must also have been fully processed by the thread that retrieved it.
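The difference between retrieved and done can be seen in a small sketch. It uses the Queue.join() and Queue.task_done() functions that are introduced in the next section; the helper thread and the timeout values are assumptions chosen for the illustration.

```python
from queue import Queue
from threading import Thread

q = Queue()
q.put('job')

# retrieve the item: the queue now holds no items
item = q.get()

# a helper thread that waits for all tasks to be done
waiter = Thread(target=q.join, daemon=True)
waiter.start()

# give the waiter a moment; it cannot finish because the task is not done
waiter.join(timeout=0.2)
blocked_before_done = waiter.is_alive()

# mark the task as done, which releases the waiter
q.task_done()
waiter.join(timeout=2.0)
released_after_done = not waiter.is_alive()

print(blocked_before_done, released_after_done)
```

The waiting thread stays blocked even though the queue is empty, and is only released once the retrieved item is marked done.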
Next, let’s look at how we might do this in Python.
How to Know When All Tasks Are Done
Python provides thread-safe queue data structures in the queue module, such as the queue.Queue, queue.LifoQueue and queue.PriorityQueue classes.
Objects can be added to the queue via calls to Queue.put() and removed from the queue via calls to Queue.get().
Thread-safe means that multiple threads may put and get items from the queue concurrently without fear of a race condition or corruption of the internal data structure.
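As a rough sketch of what thread-safe means in practice, several threads can call put() on the same queue at once and no items are lost. The helper function name, the number of threads and the item counts below are assumptions for the illustration.

```python
from queue import Queue
from threading import Thread

# add a fixed number of items to the shared queue
def add_items(q, worker_id, count):
    for n in range(count):
        q.put((worker_id, n))

q = Queue()
# four threads put items on the same queue concurrently
threads = [Thread(target=add_items, args=(q, i, 1000)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# all producers have finished, so the size is now stable
total = q.qsize()

# drain the queue and check that every item is distinct
items = set()
while not q.empty():
    items.add(q.get())

print(total, len(items))
```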
A thread can block and be notified when all current items or tasks on the queue are done by calling the Queue.join() function.
For example:
```python
...
# block until all tasks are done
queue.join()
```
Block means that the calling thread will wait until the condition is met, specifically that all tasks are done. The Queue.join() function will not return until then.
Notified means that the blocked thread will be woken up and allowed to proceed when all tasks are done. This means that the join() function will return at this time allowing the thread to continue on with the next instructions.
Multiple different threads may join the queue and await the state that all tasks are marked done.
If there are no unfinished tasks, e.g. nothing was ever added to the queue or every item added has already been marked done, then the join() function will return immediately.
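The following sketch shows several threads joining the same queue. The waiter function and thread names are assumptions for the illustration, and the task is marked done with Queue.task_done(), which is described below.

```python
from queue import Queue
from threading import Thread

# names of the waiters that have been released
released = []

# block on the queue, then record that we were released
def wait_for_queue(q, name):
    q.join()
    released.append(name)

q = Queue()
q.put('job')

# three threads all wait for the same queue
waiters = [Thread(target=wait_for_queue, args=(q, f'waiter-{i}')) for i in range(3)]
for t in waiters:
    t.start()

# retrieve the single task and mark it done, releasing every waiter
q.get()
q.task_done()

for t in waiters:
    t.join()
print(sorted(released))
```

A waiter that happens to call join() after the task was marked done simply returns immediately, so all three are released either way.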
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
— queue — A synchronized queue class
The Queue.join() function only works if threads retrieving items or tasks from the queue via Queue.get() also call the Queue.task_done() function.
For example:
```python
...
# get an item from the queue
item = queue.get()
# process the item
# ...
# mark the task as done or processed
queue.task_done()
```
The Queue.task_done() function is called by the consumer thread (e.g. the thread that calls Queue.get()) only after the thread has finished processing the task retrieved from the queue.
This will be application specific, but is more than simply retrieving the task from the queue.
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
— queue — A synchronized queue class
If there are multiple consumer threads, then for join() to function correctly, each consumer thread must mark tasks as done.
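A minimal sketch of the multiple-consumer case is shown below. The use of daemon threads, the lock-protected results list and the squaring "work" are assumptions made to keep the illustration short; they are not taken from the worked examples in this tutorial.

```python
from queue import Queue
from threading import Lock, Thread

results = []
results_lock = Lock()

# each consumer marks every task it retrieves as done
def consumer(q):
    while True:
        task = q.get()
        try:
            # stand-in for real work: square the number
            with results_lock:
                results.append(task * task)
        finally:
            q.task_done()

q = Queue()
# daemon threads so the program can exit while they wait for more work
workers = [Thread(target=consumer, args=(q,), daemon=True) for _ in range(3)]
for w in workers:
    w.start()

for n in range(20):
    q.put(n)

# returns only once all 20 tasks were marked done, by whichever consumer got them
q.join()
print(sorted(results))
```

If any one of the consumers skipped its call to task_done(), the count of unfinished tasks would never reach zero and join() would not return.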
If processing the task may fail with an unexpected Error or Exception, it is a good practice to wrap the processing of the task in a try-finally pattern.
For example:
```python
...
# get an item from the queue
item = queue.get()
try:
    # process the item
    # ...
finally:
    # mark the task as done or processed
    queue.task_done()
```
This ensures that the task is marked done in the queue, even if processing the task fails.
In turn, it allows any threads blocked by calling Queue.join() to be appropriately notified once all tasks have been marked done, avoiding a possible deadlock concurrency failure mode.
Now that we are familiar with the Queue.task_done() and Queue.join() functions, let’s look at some worked examples.
Example of Queue Join and Task Done
We can explore an example of how to use join() and task_done().
In this example we will have a producer thread that will add ten tasks to the queue and then signal that no further tasks are to be expected. The consumer thread will get the tasks, process them and mark them as done. When the signal to exit is received, the consumer thread will terminate. The main thread will wait for the producer thread to add all items to the queue, and will wait for all items on the queue to be processed before moving on.
Let’s dive in.
Producer Thread
First, we can define the function to be executed by the producer thread.
The task will iterate ten times in a loop.
```python
...
print('Producer starting')
# add tasks to the queue
for i in range(10):
    # ...
```
Each iteration, it will generate a new random value between 0 and 1 via the random.random() function. It will then pair the loop index, an integer task number from 0 to 9, with the generated value in a tuple and put the tuple on the queue.
```python
...
# generate a task
task = (i, random())
print(f'.producer added {task}')
# add it to the queue
queue.put(task)
```
Finally, the producer will put the value None on the queue to signal to the consumer that there are no further tasks.
This is called a Sentinel Value and is a common way for threads to communicate via queues to signal an important event, like a shutdown.
```python
...
# send a signal that no further tasks are coming
queue.put(None)
print('Producer finished')
```
The producer() function below implements this by taking the queue instance as an argument.
```python
# task for the producer thread
def producer(queue):
    print('Producer starting')
    # add tasks to the queue
    for i in range(10):
        # generate a task
        task = (i, random())
        print(f'.producer added {task}')
        # add it to the queue
        queue.put(task)
    # send a signal that no further tasks are coming
    queue.put(None)
    print('Producer finished')
```
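The producer above uses None as the sentinel value. If None could ever be a legitimate task, one common variation is a dedicated marker object compared by identity. The sketch below is a hedged illustration of that variation only; the STOP name and the task values are assumptions and are not used elsewhere in this tutorial.

```python
from queue import Queue
from threading import Thread

# hypothetical sentinel: a unique object that cannot collide with real tasks
STOP = object()

received = []

def producer(q):
    # None is a real task in this sketch
    for task in [1, None, 3]:
        q.put(task)
    # signal that no further tasks are coming
    q.put(STOP)

def consumer(q):
    while True:
        task = q.get()
        try:
            # identity check, so only the marker itself stops the loop
            if task is STOP:
                break
            received.append(task)
        finally:
            q.task_done()

q = Queue()
threads = [Thread(target=producer, args=(q,)), Thread(target=consumer, args=(q,))]
for t in threads:
    t.start()
for t in threads:
    t.join()
q.join()
print(received)
```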
Consumer Thread
Next, we can define the function to be executed by the consumer thread.
The consumer thread will loop forever.
```python
...
print('Consumer starting')
# process items from the queue
while True:
    # ...
```
Each iteration, it will get an item from the queue and block if there is no item yet available.
```python
...
# get a task from the queue
task = queue.get()
```
If the item retrieved from the queue is the value None, then the task will break the loop and terminate the thread.
```python
...
# check for signal that we are done
if task is None:
    break
```
Otherwise, the fractional value is used to block with a call to time.sleep() and is then reported. The item is then marked as processed via a call to task_done().
```python
...
# process the item
sleep(task[1])
print(f'.consumer got {task}')
# mark the unit of work as processed
queue.task_done()
```
Finally, just prior to the thread exiting, it will mark the signal to terminate as processed.
```python
...
# mark the signal as processed
queue.task_done()
print('Consumer finished')
```
The consumer() function below implements this and takes the queue instance as an argument.
```python
# task for the consumer thread
def consumer(queue):
    print('Consumer starting')
    # process items from the queue
    while True:
        # get a task from the queue
        task = queue.get()
        # check for signal that we are done
        if task is None:
            break
        # process the item
        sleep(task[1])
        print(f'.consumer got {task}')
        # mark the unit of work as processed
        queue.task_done()
    # mark the signal as processed
    queue.task_done()
    print('Consumer finished')
```
Create Queue and Threads
In the main thread we can create the shared queue instance.
```python
...
# create the shared queue
queue = Queue()
```
Then we can configure and start the producer thread, which will generate tasks and add them to the queue for the consumer to retrieve.
```python
...
# create and start the producer thread
producer = Thread(target=producer, args=(queue,))
producer.start()
```
We can then configure and start the consumer thread, which will patiently wait for work to arrive on the queue.
```python
...
# create and start the consumer thread
consumer = Thread(target=consumer, args=(queue,))
consumer.start()
```
The main thread will then block until the producer thread has added all work to the queue and the thread has terminated.
```python
...
# wait for the producer to finish
producer.join()
print('Main found that the producer has finished')
```
The main thread will then block on the queue with a call to join() until the consumer has retrieved all values from the queue and processed them appropriately. This includes the final signal that there are no further task items to process.
```python
...
# wait for the queue to empty
queue.join()
print('Main found that all tasks are processed')
```
It is important that the main thread blocks on the producer thread first, before blocking on the queue. This is to avoid a possible race condition.
For example, if the main thread blocked on the queue directly, the queue may be empty at that time, in which case the call would return immediately. Alternatively, it may join at a time when there are only a few tasks on the queue; once the consumer thread has processed them, the join call returns.
The problem is that in both of these cases, we don’t know if the call to join returned because all tasks were marked done or just a subset of tasks that had been added to the queue at the time join was called.
By waiting for the producer thread to terminate first, we know that all tasks that could be added to the queue have been added to the queue. By blocking on the queue after the producer has finished, we know that when the call to join returns, all tasks were added to the queue, all tasks were retrieved from the queue and all retrieved tasks were marked as done.
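The race can be made visible with a small sketch. To make the timing repeatable, a threading.Event holds the producer back until the main thread has already joined the queue once; this event, the counters and the task values are assumptions added for the illustration and are not part of the worked example.

```python
from queue import Queue
from threading import Event, Thread

# hypothetical gate that simulates a producer that is slow to start
start_producing = Event()
processed = []

def producer(q):
    start_producing.wait()
    for n in range(3):
        q.put(n)
    q.put(None)

def consumer(q):
    while True:
        task = q.get()
        try:
            if task is None:
                break
            processed.append(task)
        finally:
            q.task_done()

q = Queue()
p = Thread(target=producer, args=(q,))
c = Thread(target=consumer, args=(q,))
p.start()
c.start()

# joining the queue first returns at once: nothing has been added yet
q.join()
done_after_early_join = len(processed)

# the ordering from the example: wait for the producer, then the queue
start_producing.set()
p.join()
q.join()
done_after_ordered_join = len(processed)
c.join()

print(done_after_early_join, done_after_ordered_join)
```

The early join() returns with no work done at all, whereas joining the producer thread first guarantees that the second join() covers every task.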
Complete Example
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of join and task done for a thread queue
from time import sleep
from random import random
from queue import Queue
from threading import Thread

# task for the producer thread
def producer(queue):
    print('Producer starting')
    # add tasks to the queue
    for i in range(10):
        # generate a task
        task = (i, random())
        print(f'.producer added {task}')
        # add it to the queue
        queue.put(task)
    # send a signal that no further tasks are coming
    queue.put(None)
    print('Producer finished')

# task for the consumer thread
def consumer(queue):
    print('Consumer starting')
    # process items from the queue
    while True:
        # get a task from the queue
        task = queue.get()
        # check for signal that we are done
        if task is None:
            break
        # process the item
        sleep(task[1])
        print(f'.consumer got {task}')
        # mark the unit of work as processed
        queue.task_done()
    # mark the signal as processed
    queue.task_done()
    print('Consumer finished')

# create the shared queue
queue = Queue()
# create and start the producer thread
producer = Thread(target=producer, args=(queue,))
producer.start()
# create and start the consumer thread
consumer = Thread(target=consumer, args=(queue,))
consumer.start()
# wait for the producer to finish
producer.join()
print('Main found that the producer has finished')
# wait for the queue to empty
queue.join()
print('Main found that all tasks are processed')
```
Running the example first creates the queue to be shared between the producer and consumer threads.
Then the producer thread is created and configured to execute our producer() function and passed in the shared queue instance. The producer thread is then started.
Next, the consumer thread is configured to execute our consumer() function and is then started.
The producer thread runs as fast as it is able, each iteration generating a random number and adding it along with the task number to the queue. Once all ten tasks have been added to the queue, a None message is sent to the queue to indicate no further messages are to be expected and the producer thread terminates.
The main thread notices that the producer thread has terminated, then blocks on the queue itself waiting for all tasks to be retrieved and marked as done.
The consumer thread retrieves tasks one at a time from the queue, blocks and reports their value, then marks the task as done. This is repeated until the sentinel message is received and the loop is broken. Just before the consumer thread is terminated, the final sentinel message is marked as done.
The main thread notices that all tasks were marked done and is then free to continue on with other tasks. In this case, the main thread reports a message and terminates.
A sample of the output from the program is listed below. Note, your specific results will differ given the use of random numbers.
In this case, we can see that the producer ran first, adding all tasks to the queue. Then the consumer thread ran, reading off each value. After all tasks are marked done, finally the main thread continues on.
```
Producer starting
.producer added (0, 0.28872499722981937)
.producer added (1, 0.3550399584093443)
.producer added (2, 0.26390707899153476)
.producer added (3, 0.824020601129119)
.producer added (4, 0.6694888538902768)
.producer added (5, 0.849513498553388)
.producer added (6, 0.2664456911379752)
.producer added (7, 0.08127443075397023)
.producer added (8, 0.33470326350900137)
.producer added (9, 0.005206803840811625)
Producer finished
Consumer starting
Main found that the producer has finished
.consumer got (0, 0.28872499722981937)
.consumer got (1, 0.3550399584093443)
.consumer got (2, 0.26390707899153476)
.consumer got (3, 0.824020601129119)
.consumer got (4, 0.6694888538902768)
.consumer got (5, 0.849513498553388)
.consumer got (6, 0.2664456911379752)
.consumer got (7, 0.08127443075397023)
.consumer got (8, 0.33470326350900137)
.consumer got (9, 0.005206803840811625)
Consumer finished
Main found that all tasks are processed
```
This highlights how to use task_done() and join() on a thread queue.
Next, let’s look at what happens if all tasks are already marked as done on the queue.
Example of Join When All Tasks Are Already Done
If all tasks on the thread queue have been marked as done, then any call to join() will return immediately.
This can happen if the queue is empty, for example no tasks were added to the queue.
For example:
```python
...
# create a queue
queue = Queue()
# block until all tasks are done
queue.join() # returns immediately
```
This can also happen if tasks were added to the queue, then retrieved and marked done and no further tasks were added.
We can update the example from the previous section to demonstrate this case.
Specifically, after the main thread is notified that all tasks were marked as done by the consumer thread, it can then attempt to join the queue again.
For example:
```python
...
# try joining again
queue.join()
print('Tried joining again')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of join on a thread queue after all tasks are done
from time import sleep
from random import random
from queue import Queue
from threading import Thread

# task for the producer thread
def producer(queue):
    print('Producer starting')
    # add tasks to the queue
    for i in range(10):
        # generate a task
        task = (i, random())
        print(f'.producer added {task}')
        # add it to the queue
        queue.put(task)
    # send a signal that no further tasks are coming
    queue.put(None)
    print('Producer finished')

# task for the consumer thread
def consumer(queue):
    print('Consumer starting')
    # process items from the queue
    while True:
        # get a task from the queue
        task = queue.get()
        # check for signal that we are done
        if task is None:
            break
        # process the item
        sleep(task[1])
        print(f'.consumer got {task}')
        # mark the unit of work as processed
        queue.task_done()
    # mark the signal as processed
    queue.task_done()
    print('Consumer finished')

# create the shared queue
queue = Queue()
# create and start the producer thread
producer = Thread(target=producer, args=(queue,))
producer.start()
# create and start the consumer thread
consumer = Thread(target=consumer, args=(queue,))
consumer.start()
# wait for the producer to finish
producer.join()
print('Main found that the producer has finished')
# wait for the queue to empty
queue.join()
print('Main found that all tasks are processed')
# try joining again
queue.join()
print('Tried joining again')
```
Running the example creates the shared queue, then creates and starts the producer and consumer threads.
The producer thread puts ten tasks to the queue, then the sentinel message. The consumer thread gets the ten tasks and marks each as done, then gets the sentinel message and marks it as done.
The main thread is then notified that all tasks on the queue are done.
The main thread then attempts to block on the queue again until all tasks are done. There are no tasks on the queue, therefore the call to join() returns immediately.
A sample of the output from the program is listed below. Note, your specific results will differ given the use of random numbers.
```
Producer starting
.producer added (0, 0.8035836748847393)
.producer added (1, 0.33666098167059233)
.producer added (2, 0.4520998845104912)
.producer added (3, 0.18490109918532738)
.producer added (4, 0.4515820360797962)
.producer added (5, 0.35465194721393)
.producer added (6, 0.29102171031802837)
.producer added (7, 0.12298364974886544)
.producer added (8, 0.9406798868626429)
.producer added (9, 0.843344957321916)
Producer finished
Consumer starting
Main found that the producer has finished
.consumer got (0, 0.8035836748847393)
.consumer got (1, 0.33666098167059233)
.consumer got (2, 0.4520998845104912)
.consumer got (3, 0.18490109918532738)
.consumer got (4, 0.4515820360797962)
.consumer got (5, 0.35465194721393)
.consumer got (6, 0.29102171031802837)
.consumer got (7, 0.12298364974886544)
.consumer got (8, 0.9406798868626429)
.consumer got (9, 0.843344957321916)
Consumer finished
Main found that all tasks are processed
Tried joining again
```
This example highlights that a call to join() will only block if there is at least one task that has not been marked as done.
Next, let’s look at what happens if we call the task_done() function too many times.
Example of Too Many Calls to Task Done
The task_done() function may be called more times than there were items put on the queue.
This will result in a ValueError being raised.
This can happen if task_done() is called when no items have been put on the queue.
For example:
```python
...
# create a queue
queue = Queue()
# mark the task as done
queue.task_done()
```
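The error can be observed directly by catching it. The sketch below is a small self-contained illustration; the variable names are assumptions, and the message text matches the one shown in the sample output later in this section.

```python
from queue import Queue

q = Queue()
q.put('job')
q.get()
# this call matches the single put() and is fine
q.task_done()

try:
    # one call too many: there are no unfinished tasks left
    q.task_done()
except ValueError as error:
    message = str(error)

print(message)
```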
It can also happen if all tasks in the queue have been retrieved and marked as done.
We can update the above example to demonstrate this case.
Consider if the consumer thread marked the sentinel message as done just prior to breaking the loop, and again after the loop.
```python
...
# check for signal that we are done
if task is None:
    # mark the signal as processed
    queue.task_done()
    break
...
# mark the signal as processed
queue.task_done()
```
This would be one too many calls to task_done() compared to tasks on the queue and will result in a ValueError.
The updated version of the consumer() function with this change is listed below.
```python
# task for the consumer thread
def consumer(queue):
    print('Consumer starting')
    # process items from the queue
    while True:
        # get a task from the queue
        task = queue.get()
        # check for signal that we are done
        if task is None:
            # mark the signal as processed
            queue.task_done()
            break
        # process the item
        sleep(task[1])
        print(f'.consumer got {task}')
        # mark the unit of work as processed
        queue.task_done()
    # mark the signal as processed
    queue.task_done()
    print('Consumer finished')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of too many calls to task_done for a thread queue
from time import sleep
from random import random
from queue import Queue
from threading import Thread

# task for the producer thread
def producer(queue):
    print('Producer starting')
    # add tasks to the queue
    for i in range(10):
        # generate a task
        task = (i, random())
        print(f'.producer added {task}')
        # add it to the queue
        queue.put(task)
    # send a signal that no further tasks are coming
    queue.put(None)
    print('Producer finished')

# task for the consumer thread
def consumer(queue):
    print('Consumer starting')
    # process items from the queue
    while True:
        # get a task from the queue
        task = queue.get()
        # check for signal that we are done
        if task is None:
            # mark the signal as processed
            queue.task_done()
            break
        # process the item
        sleep(task[1])
        print(f'.consumer got {task}')
        # mark the unit of work as processed
        queue.task_done()
    # mark the signal as processed
    queue.task_done()
    print('Consumer finished')

# create the shared queue
queue = Queue()
# create and start the producer thread
producer = Thread(target=producer, args=(queue,))
producer.start()
# create and start the consumer thread
consumer = Thread(target=consumer, args=(queue,))
consumer.start()
# wait for the producer to finish
producer.join()
print('Main found that the producer has finished')
# wait for the queue to empty
queue.join()
print('Main found that all tasks are processed')
```
Running the example creates the shared queue, then creates and starts the producer and consumer threads.
The producer thread puts ten tasks to the queue, then the sentinel message.
The consumer thread gets the ten tasks and marks each as done, then gets the sentinel message and marks it as done.
The main thread is then notified that all tasks on the queue are done as per normal.
The consumer thread breaks the loop, then marks the sentinel message as done again. This causes a ValueError to be raised, which unravels the stack and ultimately terminates the consumer thread.
A sample of the output from the program is listed below. Note, your specific results will differ given the use of random numbers.
The program achieves the desired outcome, but the consumer thread does not terminate normally.
```
Producer starting
.producer added (0, 0.32924547770163737)
.producer added (1, 0.24588715644316705)
.producer added (2, 0.20271066975539764)
.producer added (3, 0.4343269311040271)
.producer added (4, 0.59954639408808)
.producer added (5, 0.37542677283488946)
.producer added (6, 0.3799982006865581)
.producer added (7, 0.5233369534357395)
.producer added (8, 0.2602459165427916)
.producer added (9, 0.4480714813483033)
Producer finished
Consumer starting
Main found that the producer has finished
.consumer got (0, 0.32924547770163737)
.consumer got (1, 0.24588715644316705)
.consumer got (2, 0.20271066975539764)
.consumer got (3, 0.4343269311040271)
.consumer got (4, 0.59954639408808)
.consumer got (5, 0.37542677283488946)
.consumer got (6, 0.3799982006865581)
.consumer got (7, 0.5233369534357395)
.consumer got (8, 0.2602459165427916)
.consumer got (9, 0.4480714813483033)
Exception in thread Thread-2:
Traceback (most recent call last):
...
Main found that all tasks are processed
...
ValueError: task_done() called too many times
```
This example highlights what happens if too many calls are made to the task_done() function.
Next, let’s look at an example of a deadlock caused by too few calls to the task_done() function.
Example of Join Deadlock Due to Missed Task Done
A deadlock may result if there is at least one thread waiting on the queue via a call to join() and too few calls are made to the task_done() function.
This can happen if an Error or Exception is raised by a consumer thread while processing a task. For example, between a call to get() and the call to task_done() on the queue.
We can update the above example to demonstrate this case.
Consider the loop in the consumer() function. We can contrive a failure by raising an exception whenever a task has the identifier five.
```python
...
# check for a failure case
if task[0] == 5:
    raise Exception('Something bad happened')
```
An exception raised at this point will cause this item on the queue to not have the task_done() function called.
Additionally, any items not retrieved from the queue will remain on the queue and will also not be marked as task_done().
The updated version of the consumer() function with this change is listed below.
```python
# task for the consumer thread
def consumer(queue):
    print('Consumer starting')
    # process items from the queue
    while True:
        # get a task from the queue
        task = queue.get()
        # check for signal that we are done
        if task is None:
            break
        # process the item
        sleep(task[1])
        print(f'.consumer got {task}')
        # check for a failure case
        if task[0] == 5:
            raise Exception('Something bad happened')
        # mark the unit of work as processed
        queue.task_done()
    # mark the signal as processed
    queue.task_done()
    print('Consumer finished')
```
The exception will cause the consumer thread to unravel the stack and to terminate. The result will be that the main thread will block and wait forever on the queue as the remaining tasks will never be marked done.
This is a deadlock and the program must be terminated forcefully, such as via Control-C.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of deadlock with join on a thread queue
from time import sleep
from random import random
from queue import Queue
from threading import Thread

# task for the producer thread
def producer(queue):
    print('Producer starting')
    # add tasks to the queue
    for i in range(10):
        # generate a task
        task = (i, random())
        print(f'.producer added {task}')
        # add it to the queue
        queue.put(task)
    # send a signal that no further tasks are coming
    queue.put(None)
    print('Producer finished')

# task for the consumer thread
def consumer(queue):
    print('Consumer starting')
    # process items from the queue
    while True:
        # get a task from the queue
        task = queue.get()
        # check for signal that we are done
        if task is None:
            break
        # process the item
        sleep(task[1])
        print(f'.consumer got {task}')
        # check for a failure case
        if task[0] == 5:
            raise Exception('Something bad happened')
        # mark the unit of work as processed
        queue.task_done()
    # mark the signal as processed
    queue.task_done()
    print('Consumer finished')

# create the shared queue
queue = Queue()
# create and start the producer thread
producer = Thread(target=producer, args=(queue,))
producer.start()
# create and start the consumer thread
consumer = Thread(target=consumer, args=(queue,))
consumer.start()
# wait for the producer to finish
producer.join()
print('Main found that the producer has finished')
# wait for the queue to empty
queue.join()
print('Main found that all tasks are processed')
```
Running the example creates the shared queue, then creates and starts the producer and consumer threads.
The producer thread puts ten tasks to the queue, then the sentinel message.
The consumer thread gets the first five tasks (identifiers 0 to 4) and marks each as done. When the task with the identifier 5 is retrieved, it is processed and reported, then the Exception is raised before task_done() is called and the thread is terminated.
The main thread continues to block, waiting for the consumer thread to mark all tasks as done.
This will never happen and the main thread is stuck, blocking forever for a condition that will never be met. A deadlock.
The program must then be forcefully terminated, such as via Control-C or via the kill command on POSIX operating systems like Linux or macOS.
A sample of the output from the program is listed below. Note, your specific results will differ given the use of random numbers.
```
Producer starting
Consumer starting
.producer added (0, 0.857837894040976)
.producer added (1, 0.9848037154429664)
.producer added (2, 0.12700351329522575)
.producer added (3, 0.6515070937934918)
.producer added (4, 0.545121803729179)
.producer added (5, 0.4270992095754642)
.producer added (6, 0.9544698294630786)
.producer added (7, 0.881736189399228)
.producer added (8, 0.5339142632137198)
.producer added (9, 0.8849690544003713)
Producer finished
Main found that the producer has finished
.consumer got (0, 0.857837894040976)
.consumer got (1, 0.9848037154429664)
.consumer got (2, 0.12700351329522575)
.consumer got (3, 0.6515070937934918)
.consumer got (4, 0.545121803729179)
.consumer got (5, 0.4270992095754642)
Exception in thread Thread-2:
Traceback (most recent call last):
...
Exception: Something bad happened
```
This highlights that a deadlock is possible in those cases where processing a task could result in an Error or Exception.
A possible fix is to wrap the processing of the task in a try-except-finally pattern.
Specifically:
- The call to Queue.get() occurs before the try block.
- The processing of the task occurs within the try block.
- The expected Error or Exception is handled in the except block.
- The call to task_done() occurs within the finally block.
For example:
```python
...
# get a task from the queue
task = queue.get()
# ...
try:
    # process the item
    sleep(task[1])
    print(f'.consumer got {task}')
    # check for a failure case
    if task[0] == 5:
        raise Exception('Something bad happened')
except Exception:
    # carry on...
    pass
finally:
    # mark the unit of work as processed
    queue.task_done()
```
This approach ensures that even if the processing of a task fails, the task is still marked as done and any threads waiting on the queue will be appropriately notified.
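As a runnable sketch of the fix, the version below applies the pattern to a cut-down consumer. Several details are assumptions made to keep the illustration short and repeatable: the main thread adds the tasks itself, the sleep is omitted, and the processed and failed lists are hypothetical bookkeeping. The sentinel is also checked inside the try block so that the finally block marks it done as well.

```python
from queue import Queue
from threading import Thread

# hypothetical bookkeeping for the illustration
processed = []
failed = []

def consumer(q):
    while True:
        # get a task from the queue (before the try block)
        task = q.get()
        try:
            # check for signal that we are done
            if task is None:
                break
            # contrived failure case for task 5
            if task[0] == 5:
                raise Exception('Something bad happened')
            processed.append(task[0])
        except Exception:
            # record the failure and carry on with the next task
            failed.append(task[0])
        finally:
            # always mark the retrieved item as done, including the signal
            q.task_done()

q = Queue()
worker = Thread(target=consumer, args=(q,))
worker.start()
for i in range(10):
    q.put((i, i / 100))
q.put(None)

# returns because every item was marked done, even the one that failed
q.join()
worker.join()
print(processed, failed)
```

Here the failure of task 5 no longer strands the remaining tasks: the consumer keeps running and the call to join() returns.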
Next, let’s look at some common questions about queue join() and task_done().
Common Questions
This section lists some common questions about thread queue task_done() and join().
Do you have any questions?
Let me know in the comments below and I may add your question to this section.
Why Not Just Use Queue.qsize() or Queue.empty()?
The Queue.qsize() function will report the number of items in the queue and will return a value of zero when there are no tasks in the queue at the time of calling.
For example:
...
# check for no items in the queue
if not queue.qsize():
    # ...
The Queue.empty() function will return True when there are no tasks in the queue at the time of calling.
For example:
...
# check for an empty queue
if queue.empty():
    # ...
Both functions can be used to report if there are no items on the queue, but neither has any way of determining if the tasks retrieved from the queue have been processed and marked done, and neither has any way of notifying waiting threads.
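We can see the difference with a small experiment. The sketch below is an illustration only: the started and release events and the results list are assumptions used to pause the consumer in the middle of processing a task.

```python
from queue import Queue
from threading import Thread, Event

queue = Queue()
started = Event()   # set once the consumer has retrieved the task
release = Event()   # set when the consumer may finish processing
results = []

def consumer():
    task = queue.get()
    started.set()
    # simulate processing that is still in progress
    release.wait()
    results.append(task)
    queue.task_done()

worker = Thread(target=consumer)
worker.start()
queue.put('task')
# wait until the task has been retrieved but not yet processed
started.wait()
empty_during_processing = queue.empty()
processed_during_processing = len(results) > 0
# allow the consumer to finish, then wait for the task to be marked done
release.set()
queue.join()
processed_after_join = len(results) > 0
worker.join()
print(empty_during_processing, processed_during_processing, processed_after_join)
```

The queue reports that it is empty while the task is still being processed, whereas join() only returns after the task has been marked done.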
Nevertheless, these functions can be used instead of join() in some limited cases.
If the waiting thread knows that no further tasks will be added to the queue, and it only needs to know that all items have been retrieved (not that they have been processed), then qsize() or empty() can be used instead of join() within a busy wait loop.
A busy wait loop or busy waiting is a way for a thread to block by repeatedly checking a condition. Once the condition is met, the thread may then proceed.
For example, below is a busy wait loop using the qsize() function:
...
# wait for all items to be retrieved from the queue
while queue.qsize():
    pass
We can also use the empty() function in a busy wait loop.
For example:
...
# wait for all items to be retrieved from the queue
while not queue.empty():
    pass
The problem with the busy wait loop is that the thread will remain active and consume computational resources while waiting for the condition to be met.
The join() function on the other hand will block until the condition is met (e.g. until it is notified), meaning that the thread will not consume any computational resources while waiting.
Is Queue.join() Related to Thread.join()?
No.
Calling join() on a queue will block until the number of calls to task_done() matches the number of calls to put(), that is, until every item added to the queue has been marked as done.
Calling join() on a thread will block until the target thread terminates.
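The two calls can be contrasted in a few lines. This is a minimal sketch under assumptions: the None shutdown signal and the variable names are chosen for illustration.

```python
from queue import Queue
from threading import Thread

def consumer(queue):
    while True:
        item = queue.get()
        queue.task_done()
        # None is a hypothetical shutdown signal for this sketch
        if item is None:
            break

queue = Queue()
worker = Thread(target=consumer, args=(queue,))
worker.start()
for i in range(3):
    queue.put(i)
# Queue.join() returns once all tasks are marked done
queue.join()
# the consumer thread is still running, waiting for more work
alive_after_queue_join = worker.is_alive()
# Thread.join() returns once the thread itself has terminated
queue.put(None)
worker.join()
alive_after_thread_join = worker.is_alive()
print(alive_after_queue_join, alive_after_thread_join)
```

Waiting on the queue tells us about the tasks, whereas waiting on the thread tells us about the thread.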
Do All Consumer Threads Need To Call task_done()?
Yes.
If a thread is blocked by calling join() on the queue, then all threads that are retrieving items from the queue via get() must call task_done() in order for the blocked thread to be notified correctly.
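For example, the sketch below starts three consumer threads that share one queue, each of which calls task_done() for every item it retrieves. The number of consumers, the None shutdown signal and the seen list are assumptions made for illustration.

```python
from queue import Queue
from threading import Thread

def consumer(queue, seen):
    while True:
        item = queue.get()
        try:
            # None is a hypothetical shutdown signal for this sketch
            if item is None:
                break
            seen.append(item)
        finally:
            # every consumer marks every item it retrieves as done
            queue.task_done()

queue = Queue()
seen = []
workers = [Thread(target=consumer, args=(queue, seen)) for _ in range(3)]
for worker in workers:
    worker.start()
for i in range(20):
    queue.put(i)
# returns only once all 20 tasks are marked done across all consumers
queue.join()
all_processed = sorted(seen) == list(range(20))
# send one shutdown signal per consumer
for _ in workers:
    queue.put(None)
for worker in workers:
    worker.join()
print(all_processed)
```

If any one of the consumers skipped its call to task_done(), the call to join() in the main thread would never return.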
What if the Queue is Empty?
A thread may call join() on an empty queue.
If a thread calls join() on a queue that is empty and has no retrieved tasks that are yet to be marked as done, then it will not block and the call will return immediately.
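This is easy to confirm in a single thread. The sketch below is an illustration only and the variable names are assumptions.

```python
from queue import Queue

queue = Queue()
# no tasks have been added, so this returns immediately
queue.join()
returned_on_new_queue = True

# add and retrieve one task without marking it as done
queue.put('task')
task = queue.get()
# the queue is empty again, but one task is still unfinished,
# so a call to join() at this point would block
empty_with_unfinished_task = queue.empty()
# mark the task as done, after which join() returns immediately
queue.task_done()
queue.join()
print(returned_on_new_queue, empty_with_unfinished_task)
```

Note that an empty queue is not enough on its own: join() looks at the count of unfinished tasks, not at the number of items on the queue.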
What if Tasks Are Put After a Call to Join?
Tasks may be put on the queue after a thread has called join().
Each call to put() will increment an internal counter in the queue. Each call to task_done() decrements the counter. A call to join() will return once the counter has reached zero.
This means that any calls to put() while a thread is blocked on a call to join() will require a subsequent call to task_done() by a consumer thread in order for the blocked thread to be notified.
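We can observe this with a helper thread that blocks on join() and sets an event when it is released. This is a sketch under assumptions: the waiter function, the joined event and the short sleep that gives the waiter time to block are all added for illustration.

```python
from queue import Queue
from threading import Thread, Event
from time import sleep

queue = Queue()
joined = Event()

# block on the queue and report when released
def waiter():
    queue.join()
    joined.set()

queue.put('first')
waiter_thread = Thread(target=waiter)
waiter_thread.start()
# give the waiter a moment to block on join()
sleep(0.1)
# add another task while the waiter is blocked
queue.put('second')
# retrieve and mark the first task as done
queue.get()
queue.task_done()
# the waiter is not released, one task is still unfinished
released_after_first = joined.wait(timeout=0.2)
# retrieve and mark the second task as done
queue.get()
queue.task_done()
released_after_second = joined.wait(timeout=5)
waiter_thread.join()
print(released_after_first, released_after_second)
```

The waiting thread is only released after the task added during the wait has also been marked as done.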
Can task_done() Be Called Without Calls to get()?
Yes.
A thread blocked with a call to join() will be notified after a task_done() call has been made for each call to put().
Any calls to get() are not considered by the blocked thread.
This means that a thread blocked by a call to join() may be notified by the correct number of calls to task_done(), even though all of the tasks may still remain on the queue given the absence of calls to get().
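The sketch below shows this in a single thread. It also shows that calling task_done() more times than put() raises a ValueError. The variable names are assumptions made for illustration.

```python
from queue import Queue

queue = Queue()
queue.put('a')
queue.put('b')
# mark both tasks as done without retrieving them
queue.task_done()
queue.task_done()
# returns immediately, the counter of unfinished tasks is zero
queue.join()
# both items are still on the queue
items_remaining = queue.qsize()
# one call too many is rejected
try:
    queue.task_done()
    extra_call_rejected = False
except ValueError:
    extra_call_rejected = True
print(items_remaining, extra_call_rejected)
```

This is rarely what we want in practice, which is why task_done() is normally paired with a preceding call to get().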
When Should I Call join()?
The join() function should be called by a thread on a queue when the calling thread wants to be notified when all tasks on the queue have been marked as done.
Ideally, the join() function would be called after all tasks that could be added to the queue have been added to the queue. This is to avoid a race condition where the blocked thread is notified because all tasks have been retrieved from the queue and marked as done, yet the producer thread has more tasks to add to the queue.
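One way to achieve this is to wait for the producer thread to finish before waiting on the queue. The following is a minimal sketch, assuming a single producer and a single daemon consumer, where the function names and the results list are chosen for illustration.

```python
from queue import Queue
from threading import Thread

def producer(queue):
    for i in range(5):
        queue.put(i)

def consumer(queue, results):
    while True:
        item = queue.get()
        results.append(item * 2)
        queue.task_done()

queue = Queue()
results = []
# daemon consumer so it does not prevent the program from exiting
consumer_thread = Thread(target=consumer, args=(queue, results), daemon=True)
consumer_thread.start()
producer_thread = Thread(target=producer, args=(queue,))
producer_thread.start()
# first wait until no more tasks will be added
producer_thread.join()
# then wait for all added tasks to be marked done
queue.join()
print(results)
```

Joining the producer first means that the queue cannot appear finished while the producer still has tasks left to add.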
When Should I Call task_done()?
The task_done() function should be called on the queue after a thread has called get() to retrieve the item and after that retrieved item has been processed to the satisfaction of the application.
What if Processing a Task May Raise an Error or Exception?
Processing tasks retrieved from a queue may fail with an Error or Exception.
If this is possible, a try-except-finally or try-finally pattern may be used.
The call to get() on the queue would be made prior to the try block and the call to task_done() would be made in the finally block.
For example:
...
# get a task
task = queue.get()
try:
    # process the task
    ...
finally:
    # mark the task as done
    queue.task_done()
This ensures that regardless of the success or failure of processing the task, the task is marked as done and any thread blocked until all tasks are done will be correctly notified.
Otherwise, if a task_done() call is missed because of an error while processing, any threads waiting on the queue with a call to join() will block forever, resulting in a deadlock.
Further Reading
This section provides additional resources that you may find helpful.
Python Threading Books
- Python Threading Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- Threading Module API Cheat Sheet
I also recommend specific chapters in the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Threading: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to wait for all tasks to be done on a thread queue.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Alvis Taurēns on Unsplash