Last Updated on September 12, 2022
You can develop a countdown latch from scratch using a threading.Condition.
In this tutorial you will discover how to develop a latch for threads in Python.
Let’s get started.
Need for a Countdown Latch
A thread is a thread of execution in a computer program.
Every Python program has at least one thread of execution called the main thread. Both processes and threads are created and managed by the underlying operating system.
Sometimes we may need to create additional threads in our program in order to execute code concurrently.
Python provides the ability to create and manage new threads via the threading module and the threading.Thread class.
You can learn more about Python threads in the guide Python Threading: The Complete Guide, listed in the Further Reading section below.
In concurrent programming, we may need to coordinate one or more threads based on the work completed by many other threads.
This may be needed for many reasons, but generally involves one master or controlling thread waiting for a fixed number of subtasks to be completed by worker threads.
For example:
- Waiting for multiple parts of a solution to be complete before piecing them together.
- Waiting for all subtasks to complete before moving on to the next task.
Solving this type of coordination problem requires keeping track of the number of elements or subtasks that have completed in a thread-safe manner, and not moving on until all elements have completed.
A latch can be used to address this type of problem as it provides a thread-safe counter and notification mechanism once the counter reaches zero.
What is a latch and how can we use it in Python?
What is a Latch
A latch or countdown latch is a synchronization primitive used in concurrent programming.
It is created in the closed position and requires a count to be decremented until zero before opening.
The count is decremented by threads that pass through the latch, calling a count_down() function. This is a non-blocking call, allowing the caller to proceed immediately.
Other threads register interest in the latch by calling wait() to block on the latch until it is opened.
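For example, a sketch of the typical division of labor might look as follows (using the CountDownLatch class developed later in this tutorial), with worker threads counting down and a coordinator thread waiting:

...
# coordinator thread: create a latch expecting three arrivals
latch = CountDownLatch(3)

# each worker thread: signal arrival without blocking
latch.count_down()

# coordinator thread: block until all three arrivals have occurred
latch.wait()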
A latch is used to coordinate other threads around the full opening of the latch. The latch is designed to be opened incrementally, each time another thread passes through, e.g. counting down from a specified number to zero.
A latch is a one-use structure and is not reset after use. Once open, additional threads that may wait on the latch will not block but will instead return immediately.
Internally, the latch may count up or count down, but typically decrements with each arrival, hence the common name “countdown latch”. An implementation may or may not raise an exception if more than the specified number of arrivals (countdowns) are performed.
Java provides a countdown latch implementation via the java.util.concurrent.CountDownLatch class, which may provide an interesting reference.
Now that we know what a latch is, let’s compare it to a related synchronization primitive called a barrier.
Latch Vs Barrier
In concurrency, a latch and barrier are related, but perform different functions.
A latch and a barrier are very similar, for example:
- Both are synchronization primitives for use in concurrent programming.
- Both involve a count or number of threads arriving before the state of the structure changes.
- Both are data structures that start closed and open once all parties arrive.
- Both allow threads to wait or be blocked until the structure is open.
The main difference is:
- The “countdown” and “waiting” are performed by separate parties (threads) on the latch.
- The “countdown” and “waiting” are performed by the same parties (threads) on the barrier.
The threads that count down the latch are free to carry on after reaching it, whereas on the barrier the threads that count down must all block or wait until the barrier is open.
Put another way, the barrier coordinates the countdown threads themselves on the count, whereas the latch coordinates other threads waiting on the count alone.
- A latch can have any number of waiting threads and a fixed number of expected arrivals before opening.
- A barrier has a fixed number of waiting threads that are all waiting for each other to arrive before opening.
Latch
This means that with a latch, the threads counting down the latch are separate from the threads waiting for the latch to open.
For example, worker threads each perform their task and then count down the latch, while a coordinator thread waits for all worker threads to finish by waiting for the latch to open.
Barrier
With a barrier, the threads that count down are the same threads that wait for the barrier to open.
For example, worker threads each perform their task, then count down the barrier and wait for the other threads to finish, before all proceed together once the barrier is open.
Latch with a Barrier
We cannot implement a latch with a barrier.
A barrier prevents the threads that count down from proceeding, unlike a latch, where they are free to carry on after reaching the latch. Also, the only threads that can wait on a barrier are those that take part in counting down.
Nevertheless, we could simulate a latch with a barrier, by having the countdown threads as well as those other threads interested in waiting for the open state all wait together on the barrier until it is open.
Barrier with a Latch
We could implement a barrier with a latch.
This could be achieved by having each countdown thread that reaches the latch also wait on the latch for it to open.
Now that we know the difference between a latch and a barrier, let’s look at how we might implement it in Python.
How to Develop a Countdown Latch
Python does not provide a countdown latch, but we can develop one easily using a new class and a threading.Condition.
We can develop a simple countdown latch class named CountDownLatch. It must have three elements:
- A constructor that requires a count and initializes the internal condition.
- A count_down() method that decrements the counter in a thread-safe manner and notifies waiting threads if the count reaches zero.
- A wait() method that allows threads to block until the count reaches zero.
We will require a wait/notify structure within the latch. Python provides a threading.Condition that supports wait/notify directly.
If you are new to the condition, you can learn more here:
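As a quick, self-contained sketch of the wait/notify pattern on a condition (separate from the latch we are about to build), one thread can block on the condition until another thread notifies it:

# minimal wait/notify sketch using a threading.Condition (not the latch itself)
from time import sleep
from threading import Thread
from threading import Condition

condition = Condition()
ready = False

# waiting thread: block on the condition until the flag is set and we are notified
def waiter():
    with condition:
        condition.wait_for(lambda: ready)
    print('Waiter released.')

thread = Thread(target=waiter)
thread.start()
sleep(1)
# notifying thread: set the flag and wake all waiting threads
with condition:
    ready = True
    condition.notify_all()
thread.join()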
Let’s start developing our class.
# simple countdown latch, starts closed then opens once count is reached
class CountDownLatch():
    # ...
The constructor will take a count specified by the user, indicating the expected number of parties to arrive before the latch is opened.
We will store this count and then create a new threading.Condition used to manage changes to the count and to allow threads to wait and be notified.
# constructor
def __init__(self, count):
    # store the count
    self.count = count
    # control access to the count and notify when latch is open
    self.condition = Condition()
The count_down() function must first acquire the condition before doing anything to ensure any checking and changing of the internal count is thread safe.
This can be achieved by using the condition as a context manager via the with statement.
...
# acquire the lock on the condition
with self.condition:
    # ...
Next, we need to check if the latch is already open, and if so return immediately.
...
# check if the latch is already open
if self.count == 0:
    return
We can then decrement the counter.
...
# decrement the counter
self.count -= 1
Finally, we can check if the counter has reached zero and whether we should notify all waiting threads.
...
# check if the latch is now open
if self.count == 0:
    # notify all waiting threads that the latch is open
    self.condition.notify_all()
Tying this together, the complete count_down() method is listed below.
# count down the latch by one increment
def count_down(self):
    # acquire the lock on the condition
    with self.condition:
        # check if the latch is already open
        if self.count == 0:
            return
        # decrement the counter
        self.count -= 1
        # check if the latch is now open
        if self.count == 0:
            # notify all waiting threads that the latch is open
            self.condition.notify_all()
The count_down() function could be extended to support counting down by a specified amount. It may also raise an Exception if the latch is already open, if that truly is an error state (it feels like it is).
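For example, a sketch of such an extended count_down() might look as follows (the amount parameter and the choice of exception are assumptions, not part of the class developed above):

# possible extension of count_down(): count down by an amount, error if already open
def count_down(self, amount=1):
    # acquire the lock on the condition
    with self.condition:
        # treat counting down an already-open latch as an error
        if self.count == 0:
            raise RuntimeError('Latch is already open')
        # decrement the counter by the requested amount, never below zero
        self.count = max(0, self.count - amount)
        # check if the latch is now open
        if self.count == 0:
            # notify all waiting threads that the latch is open
            self.condition.notify_all()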
Finally, we need a wait() function.
Again, we must acquire the condition before we can check the internal state in a thread-safe manner.
...
# acquire the lock on the condition
with self.condition:
    # ...
We can then check if the latch is already open, in which case we can return immediately.
...
# check if the latch is already open
if self.count == 0:
    return
Otherwise, we can wait on the condition to be notified that the latch will be opened.
...
# wait to be notified when the latch is open
self.condition.wait()
Tying this together, the complete wait() function is listed below.
# wait for the latch to open
def wait(self):
    # acquire the lock on the condition
    with self.condition:
        # check if the latch is already open
        if self.count == 0:
            return
        # wait to be notified when the latch is open
        self.condition.wait()
The wait() function could be extended to support a timeout.
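For example, a sketch of a wait() with an optional timeout might look as follows (the return value convention is an assumption): it returns True if the latch opened and False if the timeout expired first.

# possible extension of wait(): block with an optional timeout in seconds
def wait(self, timeout=None):
    # acquire the lock on the condition
    with self.condition:
        # wait_for() re-checks the count and handles the timeout for us
        return self.condition.wait_for(lambda: self.count == 0, timeout=timeout)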
And that’s it.
Tying this together, the complete CountDownLatch class is listed below.
# simple countdown latch, starts closed then opens once count is reached
class CountDownLatch():
    # constructor
    def __init__(self, count):
        # store the count
        self.count = count
        # control access to the count and notify when latch is open
        self.condition = Condition()

    # count down the latch by one increment
    def count_down(self):
        # acquire the lock on the condition
        with self.condition:
            # check if the latch is already open
            if self.count == 0:
                return
            # decrement the counter
            self.count -= 1
            # check if the latch is now open
            if self.count == 0:
                # notify all waiting threads that the latch is open
                self.condition.notify_all()

    # wait for the latch to open
    def wait(self):
        # acquire the lock on the condition
        with self.condition:
            # check if the latch is already open
            if self.count == 0:
                return
            # wait to be notified when the latch is open
            self.condition.wait()
You might want to add extra methods to the latch as a fun extension.
For example, we could add a method to get the current value of the internal count. We also might want to add a method to reset the count, but perhaps only if the latch is already open, e.g. reset from open to closed state.
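For example, sketches of these two extensions might look as follows (the method names and the choice to raise an exception when resetting a still-closed latch are assumptions):

# possible extension: report the current count in a thread-safe manner
def get_count(self):
    with self.condition:
        return self.count

# possible extension: re-arm the latch with a new count, only if already open
def reset(self, count):
    with self.condition:
        if self.count != 0:
            raise RuntimeError('Cannot reset a latch that is still closed')
        self.count = count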
We can use this countdown latch class by first creating an instance of the class and specifying the number of expected parties to arrive as an argument to the constructor.
...
# create the countdown latch
latch = CountDownLatch(5)
As each party arrives at the latch, we can call the count_down() function.
For example:
...
# count down the latch
latch.count_down()
Any threads interested in when the latch is open can call the wait() function.
For example:
...
# block until the latch is open
latch.wait()
Now that we know how to develop a countdown latch class, let’s look at using it in a worked example.
Example of Using a Countdown Latch
We can develop an example to demonstrate the latch.
In this example, we will start a number of threads, each of which must perform some task then trigger the latch to signal they are done. The coordinating thread will wait for all threads to complete their work before carrying on.
This is a very common usage pattern for a countdown latch.
First, we can define a custom function to be executed by each worker thread. The function will take the latch instance as well as a unique identifying integer as arguments.
We will simulate work in the thread by blocking for a random fraction of ten seconds, then count down the latch to indicate the work is done, before finally reporting a message that the thread is finished.
The task() function below implements this.
# task that counts down the latch
def task(latch, i):
    # block for a moment
    sleep(random() * 10)
    # count down the latch
    latch.count_down()
    # report done
    print(f'Thread {i} done.')
Next, in the main thread we can first create an instance of the latch class, configured to expect five arrivals.
...
# create the countdown latch
latch = CountDownLatch(5)
We can then create and start five worker threads, each configured to call our task() function via the “target” argument of the threading.Thread constructor. We will pass the latch instance as an argument as well as a unique integer for each worker thread from 0 to 4.
...
# start 5 threads
for i in range(5):
    thread = Thread(target=task, args=(latch, i))
    thread.start()
The main thread will then wait, blocking until the latch is open.
This signals that all worker threads have completed their work successfully and have reached the latch.
...
# wait for the latch to open
print('Main waiting on latch...')
latch.wait()
Finally, the main thread will report a message and terminate.
...
# latch is open, move on
print('Main done')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using a countdown latch
from time import sleep
from random import random
from threading import Thread
from threading import Condition

# simple countdown latch, starts closed then opens once count is reached
class CountDownLatch():
    # constructor
    def __init__(self, count):
        # store the count
        self.count = count
        # control access to the count and notify when latch is open
        self.condition = Condition()

    # count down the latch by one increment
    def count_down(self):
        # acquire the lock on the condition
        with self.condition:
            # check if the latch is already open
            if self.count == 0:
                return
            # decrement the counter
            self.count -= 1
            # check if the latch is now open
            if self.count == 0:
                # notify all waiting threads that the latch is open
                self.condition.notify_all()

    # wait for the latch to open
    def wait(self):
        # acquire the lock on the condition
        with self.condition:
            # check if the latch is already open
            if self.count == 0:
                return
            # wait to be notified when the latch is open
            self.condition.wait()

# task that counts down the latch
def task(latch, i):
    # block for a moment
    sleep(random() * 10)
    # count down the latch
    latch.count_down()
    # report done
    print(f'Thread {i} done.')

# create the countdown latch
latch = CountDownLatch(5)
# start 5 threads
for i in range(5):
    thread = Thread(target=task, args=(latch, i))
    thread.start()
# wait for the latch to open
print('Main waiting on latch...')
latch.wait()
# latch is open, move on
print('Main done')
Running the example first creates the countdown latch.
Then five worker threads are created and configured and started immediately.
Each worker thread blocks for some fraction of ten seconds, triggers the latch, then reports a message that it is done before terminating.
Meanwhile, the main thread blocks on the latch, waiting for each thread to complete its work and signal the fact on the latch.
Once all threads reach the latch, the latch is opened and the main thread is notified and its wait() function returns.
The main thread then carries on and reports its final message.
Main waiting on latch...
Thread 4 done.
Thread 0 done.
Thread 1 done.
Thread 2 done.
Thread 3 done.
Main done
Now that we know how to use the countdown latch, let’s consider how we might simulate a similar result with a barrier.
Simulate a Latch with a Barrier
A barrier cannot be used to implement a latch.
The main reason is that a latch allows the threads that arrive to carry on and allows any number of other threads to wait, whereas a barrier requires that each thread that arrives must also wait for all other threads to arrive.
Nevertheless, if the application permits these limitations, we could simulate (something like) a countdown latch with a threading.Barrier class.
A barrier is first configured with the expected number of parties then requires that each thread that arrives calls the wait() function, and that’s all there is to it.
If you are new to the barrier, you can learn more here:
We can update the example from the previous section to use a threading.Barrier instead of our custom CountDownLatch class.
First, we must update the task() function to take a threading.Barrier instance as an argument and to call the wait() function on the barrier once its work is completed.
The updated version of the task() function with these changes is listed below.
# task that does some work and waits on the barrier
def task(barrier, i):
    # block for a moment
    sleep(random() * 10)
    # wait on the barrier
    barrier.wait()
    # report done
    print(f'Thread {i} done.')
Next, we must create an instance of the threading.Barrier class configured to expect six parties: one for each of the five worker threads, plus one for the main thread.
...
# create the barrier
barrier = Barrier(5 + 1)
Next, we can create and configure the worker threads to take the instance of the barrier as an argument.
...
# start 5 threads
for i in range(5):
    thread = Thread(target=task, args=(barrier, i))
    thread.start()
Finally, the main thread must wait on the barrier for all worker threads to arrive.
...
# wait for the barrier to open
print('Main waiting on barrier...')
barrier.wait()
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of simulating a latch with a barrier
from time import sleep
from random import random
from threading import Thread
from threading import Barrier

# task that does some work and waits on the barrier
def task(barrier, i):
    # block for a moment
    sleep(random() * 10)
    # wait on the barrier
    barrier.wait()
    # report done
    print(f'Thread {i} done.')

# create the barrier
barrier = Barrier(5 + 1)
# start 5 threads
for i in range(5):
    thread = Thread(target=task, args=(barrier, i))
    thread.start()
# wait for the barrier to open
print('Main waiting on barrier...')
barrier.wait()
# barrier is open, move on
print('Main done')
Running the example first creates the barrier and then creates and starts the five worker threads passing the barrier instance as an argument.
Each worker thread performs its work then reaches the barrier and waits, blocking until all other worker threads and the main thread arrive at the barrier.
This keeps the worker threads alive, along with their resource requirements in memory (e.g. each thread has its own stack space and is scheduled by the operating system).
The main thread reaches the barrier and waits for all of the worker threads to complete their work.
Finally, all threads reach the barrier and are free to continue.
The worker threads print their messages and terminate. The main thread carries on and reports its own final message.
We can see that functionally, we achieved the same result as the previous section. Namely, we were able to block the main thread until the worker threads finished, at the cost of keeping the worker threads alive unnecessarily.
Main waiting on barrier...
Thread 0 done.
Main done
Thread 4 done.
Thread 3 done.
Thread 2 done.
Thread 1 done.
Now that we know how to simulate a latch with a barrier (sort of), let’s explore the reverse of simulating a barrier with a latch.
Simulate a Barrier with a Latch
We can simulate a barrier using a countdown latch.
This is relatively straightforward and involves having the threads that arrive at the latch both count down and wait.
We can update the countdown latch example above to act like a barrier by having each worker thread wait on the latch, and by having the main thread also count down the latch before waiting.
First, we must update the latch to expect an additional arrival for the main thread before opening.
...
# create the countdown latch
latch = CountDownLatch(5 + 1)
Next, we can update the task() function so that each thread will wait for the latch to open after counting down.
# task that counts down the latch
def task(latch, i):
    # block for a moment
    sleep(random() * 10)
    # count down the latch
    latch.count_down()
    # wait on the latch itself
    latch.wait()
    # report done
    print(f'Thread {i} done.')
We can then update the main thread to first count down the latch, then wait, just like the worker threads.
...
# wait for the latch to open
print('Main waiting on latch...')
# count down the latch as well
latch.count_down()
# wait for all other threads
latch.wait()
The effect is that the latch will act like a barrier, mostly.
Unlike a barrier, there is a possible race condition between the last thread reaching the call to count_down() and other threads making it to their call to wait(), meaning that it is theoretically possible that not all threads are blocking on the call to wait() at the time the final thread comes through. But this is a reasonably minor difference.
This highlights that if you need a barrier, you should use a barrier, not simulate one with a latch.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of simulating a barrier with a latch
from time import sleep
from random import random
from threading import Thread
from threading import Condition

# simple countdown latch, starts closed then opens once count is reached
class CountDownLatch():
    # constructor
    def __init__(self, count):
        # store the count
        self.count = count
        # control access to the count and notify when latch is open
        self.condition = Condition()

    # count down the latch by one increment
    def count_down(self):
        # acquire the lock on the condition
        with self.condition:
            # check if the latch is already open
            if self.count == 0:
                return
            # decrement the counter
            self.count -= 1
            # check if the latch is now open
            if self.count == 0:
                # notify all waiting threads that the latch is open
                self.condition.notify_all()

    # wait for the latch to open
    def wait(self):
        # acquire the lock on the condition
        with self.condition:
            # check if the latch is already open
            if self.count == 0:
                return
            # wait to be notified when the latch is open
            self.condition.wait()

# task that counts down the latch
def task(latch, i):
    # block for a moment
    sleep(random() * 10)
    # count down the latch
    latch.count_down()
    # wait on the latch itself
    latch.wait()
    # report done
    print(f'Thread {i} done.')

# create the countdown latch
latch = CountDownLatch(5 + 1)
# start 5 threads
for i in range(5):
    thread = Thread(target=task, args=(latch, i))
    thread.start()
# wait for the latch to open
print('Main waiting on latch...')
# count down the latch as well
latch.count_down()
# wait for all other threads
latch.wait()
# latch is open, move on
print('Main done')
Running the example first creates the latch and then creates and starts the five worker threads.
Each worker thread performs its own work, then reaches the latch, calling count_down() and then waiting for all other parties.
The main thread continues on, first calling count_down(), then blocking, waiting for the latch to open.
Once all worker threads and the main thread have called count_down(), the latch opens.
Any threads blocking on the call to wait() are then released to carry on, including the main thread. Any threads that had called count_down() but had not yet made it to wait() will call wait() and not block.
Main waiting on latch...
Thread 0 done.
Main done
Thread 2 done.
Thread 1 done.
Thread 3 done.
Thread 4 done.
Further Reading
This section provides additional resources that you may find helpful.
Python Threading Books
- Python Threading Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- Threading Module API Cheat Sheet
I also recommend specific chapters in the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python Threading: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to use a countdown latch in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Vivish Photography on Unsplash
Ahmed Ouerfelli says
I think we could achieve this behaviour by simply joining the threads that we want to wait for. This specific example does not need a count down.
threads = [Thread(target=task) for _ in range(5)]
for thread in threads: thread.start()
for thread in threads: thread.join()
I would be interested to know an example where we are forced to use a count down latch. Perhaps we are waiting for each thread to complete some operation but they may continue working something else after that, so the join() method will continue waiting unnecessarily.
Perhaps we are not waiting for specific threads, rather a specific number of operations to be done which could be processed by any thread.
Jason Brownlee says
In the above case, we have one thread waiting for many other threads to complete. These other threads may be completed in any order.
This cannot be solved by the one thread joining one other thread, which thread would it be? Instead, it must be notified when all other threads are done.
A barrier could be used, but requires that the task threads also block, which is undesirable in this use case. Instead, we use a latch.
Does that help?
Ahmed Ouerfelli says
I mean, by joining all of them like in this code:
for thread in threads:
thread.join()
That loop will finish if and only if all the threads finish. Their order of completion does not affect this fact.
In each iteration of the loop, the join method either blocks until the given thread is completed, or returns immediately because the thread is already completed. So we cannot complete the iteration until the corresponding thread is dead. When the loop finishes, it means all threads are dead. So the main thread can proceed afterwards.
Jason Brownlee says
Yes, sorry for my confusion. You could solve it by having the main thread block on all task threads.
The downside of this approach is the tight coupling between the main thread and the task threads. The main thread must know how many tasks/threads there are and have direct access to them.
The latch allows these concerns to be decoupled. There can be an arbitrary number of tasks and they could be executed by any means, the main thread is only concerned with the latch.
I hope that helps.
Ahmed Ouerfelli says
Thank you for this valuable insight.
Jason Brownlee says
You’re very welcome!