Thread Countdown Latch in Python

March 18, 2022 Python Threading

You can develop a countdown latch from scratch using a threading.Condition.

In this tutorial you will discover how to develop a latch for threads in Python.

Let's get started.

Need for a Countdown Latch

A thread is a thread of execution in a computer program.

Every Python program has at least one thread of execution called the main thread. Both processes and threads are created and managed by the underlying operating system.

Sometimes we may need to create additional threads in our program in order to execute code concurrently.

Python provides the ability to create and manage new threads via the threading module and the threading.Thread class.

In concurrent programming, we may need to coordinate one or more threads based on the work completed by many other threads.

This may be needed for many reasons, but generally involves one master or controlling thread waiting for a fixed number of subtasks to be completed by worker threads.

Solving this type of coordination problem requires keeping track of the number of elements or subtasks that have completed in a thread-safe manner, and not moving on until all elements have completed.
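To make the bookkeeping concrete, here is a minimal sketch of doing it by hand, without a latch. The names N_TASKS, completed, and subtask() are hypothetical and used only for illustration. The coordinating thread polls a lock-protected counter, which works but wastes cycles compared to being notified.

```python
# sketch: track completed subtasks with a lock-protected counter (no latch yet)
from threading import Thread, Lock
from time import sleep

N_TASKS = 3      # hypothetical number of subtasks
completed = 0    # shared counter of finished subtasks
lock = Lock()    # protects the counter

# hypothetical worker: simulate work, then record completion
def subtask():
    global completed
    sleep(0.1)
    with lock:
        completed += 1

threads = [Thread(target=subtask) for _ in range(N_TASKS)]
for t in threads:
    t.start()
# poll until every subtask has reported in
while True:
    with lock:
        if completed == N_TASKS:
            break
    sleep(0.01)
print(f'All {completed} subtasks completed')
```

A latch packages the same counter together with a notification, so the coordinating thread can block instead of polling.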

A latch can be used to address this type of problem as it provides a thread-safe counter and notification mechanism once the counter reaches zero.

What is a latch and how can we use it in Python?

What is a Latch

A latch or countdown latch is a synchronization primitive used in concurrent programming.

It is created in the closed position and requires a count to be decremented until zero before opening.

The count is decremented by threads that pass through the latch, calling a count_down() function. This is a non-blocking call, allowing the caller to proceed immediately.

Other threads register interest in the latch by calling wait() to block on the latch until it is opened.

A latch is used to coordinate one or more waiting threads with the full opening of the latch. The latch is designed to be opened incrementally as other threads pass through, e.g. counting down from a specified number to zero.

A latch is a one-use structure and is not reset after use. Once open, additional threads that may wait on the latch will not block but will instead return immediately.

Internally, the latch may count up or count down, but typically decrements with each arrival, hence the common name "countdown latch". Depending on the implementation, the latch may or may not raise an exception if more than the specified number of arrivals (countdowns) are performed.

Java provides a countdown latch implementation via the java.util.concurrent.CountDownLatch class, which may provide an interesting reference.

Now that we know what a latch is, let's compare it to a related synchronization primitive called a barrier.

Latch Vs Barrier

In concurrency, a latch and barrier are related, but perform different functions.

A latch and a barrier are very similar: both start closed and open only once an expected number of threads has arrived.

The main difference is that the threads that count down on the latch are free to carry on after reaching the latch, whereas on the barrier the threads that count down must all block or wait until the barrier is open.

Put another way, the barrier coordinates the countdown threads themselves on the count, whereas the latch coordinates other threads waiting on the count alone.

Latch

This means that on the latch, we have separate threads counting down the latch from those threads that are waiting for the latch to open.

For example, worker threads each perform their task and then count down the latch, while a coordinator thread waits for all worker threads to finish by waiting for the latch to open.

Barrier

This means that on a barrier, the threads that count down are the same threads that wait for the barrier to open.

For example, worker threads each perform their task, then count down the barrier and wait for the other threads to finish, before all proceeding together once the barrier is open.

Latch with a Barrier

We cannot implement a latch with a barrier.

A barrier will prevent the threads that count down from proceeding unlike a latch where they are free to carry on after reaching the latch. And the only threads that can wait on the barrier are those that are part of counting down.

Nevertheless, we could simulate a latch with a barrier, by having the countdown threads as well as those other threads interested in waiting for the open state all wait together on the barrier until it is open.

Barrier with a Latch

We could implement a barrier with a latch.

This could be achieved by having each thread that counts down the latch also wait on the latch for it to open.

Now that we know the difference between a latch and a barrier, let's look at how we might implement a latch in Python.

How to Develop a Countdown Latch

Python does not provide a countdown latch, but we can develop one easily using a new class and a threading.Condition.

We can develop a simple countdown latch class named CountDownLatch. It must have three elements: a constructor, a count_down() method, and a wait() method.

We will require a wait/notify structure within the latch. Python provides a threading.Condition that supports wait/notify directly.

Let's start developing our class.

# simple countdown latch, starts closed then opens once count is reached
class CountDownLatch():
	# ...

The constructor will take a count specified by the user, indicating the expected number of parties to arrive before the latch is opened.

We will store this count and then create a new threading.Condition used to manage changes to the count and to allow threads to wait and be notified.

# constructor
def __init__(self, count):
    # store the count
    self.count = count
    # control access to the count and notify when latch is open
    self.condition = Condition()

The count_down() function must first acquire the condition before doing anything to ensure any checking and changing of the internal count is thread safe.

This can be achieved using the context manager.

...
# acquire the lock on the condition
with self.condition:
	# ...

Next, we need to check if the latch is already open, and if so return immediately.

...
# check if the latch is already open
if self.count == 0:
    return

We can then decrement the counter.

...
# decrement the counter
self.count -= 1

Finally, we can check if the counter has reached zero and whether we should notify all waiting threads.

...
# check if the latch is now open
if self.count == 0:
    # notify all waiting threads that the latch is open
    self.condition.notify_all()

Tying this together, the complete count_down() method is listed below.

# count down the latch by one increment
def count_down(self):
    # acquire the lock on the condition
    with self.condition:
        # check if the latch is already open
        if self.count == 0:
            return
        # decrement the counter
        self.count -= 1
        # check if the latch is now open
        if self.count == 0:
            # notify all waiting threads that the latch is open
            self.condition.notify_all()

The count_down() function could be extended to support counting down by a specified amount. It may also raise an Exception if the latch is already open, if that truly is an error state (it feels like it is).
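As a rough sketch of both extensions, the variant below accepts a hypothetical parameter n for the amount to count down by and raises a RuntimeError when the latch is already open. Both choices are assumptions made for illustration; whether counting down an open latch is truly an error depends on the application.

```python
# sketch: count_down() that accepts an amount and rejects an already-open latch
from threading import Condition

class CountDownLatch:
    # constructor
    def __init__(self, count):
        self.count = count
        self.condition = Condition()

    # count down by n (hypothetical parameter), raising if already open
    def count_down(self, n=1):
        with self.condition:
            # treat counting down an open latch as an error (an assumption)
            if self.count == 0:
                raise RuntimeError('latch is already open')
            # clamp at zero so the count never goes negative
            self.count = max(0, self.count - n)
            # notify all waiting threads once the latch opens
            if self.count == 0:
                self.condition.notify_all()

latch = CountDownLatch(3)
latch.count_down(2)   # count is now 1
latch.count_down()    # count is now 0, latch opens
try:
    latch.count_down()
    raised = False
except RuntimeError:
    raised = True
print(latch.count, raised)
```

Clamping at zero is one design choice among several; an alternative would be to raise an error when n exceeds the remaining count.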

Finally, we need a wait() function.

Again, we must acquire the condition before we can check the internal state in a thread-safe manner.

...
# acquire the lock on the condition
with self.condition:
	# ...

We can then check if the latch is already open, in which case we can return immediately.

...
# check if the latch is already open
if self.count == 0:
    return

Otherwise, we can wait on the condition to be notified that the latch has been opened.

...
# wait to be notified when the latch is open
self.condition.wait()

Tying this together, the complete wait() function is listed below.

# wait for the latch to open
def wait(self):
    # acquire the lock on the condition
    with self.condition:
        # check if the latch is already open
        if self.count == 0:
            return
        # wait to be notified when the latch is open
        self.condition.wait()

The wait() function could be extended to support a timeout.
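One possible way to add a timeout is sketched below, using the wait_for() method of threading.Condition, which re-checks a predicate and returns False if the timeout expires first. The timeout parameter and the boolean return value are assumptions added for this sketch and are not part of the class developed above.

```python
# sketch: wait() with an optional timeout, built on Condition.wait_for()
from threading import Condition

class CountDownLatch:
    # constructor
    def __init__(self, count):
        self.count = count
        self.condition = Condition()

    # count down the latch by one increment
    def count_down(self):
        with self.condition:
            if self.count == 0:
                return
            self.count -= 1
            if self.count == 0:
                self.condition.notify_all()

    # return True if the latch is open, False if the timeout expired first
    def wait(self, timeout=None):
        with self.condition:
            return self.condition.wait_for(lambda: self.count == 0, timeout=timeout)

latch = CountDownLatch(1)
# nobody has counted down yet, so this wait gives up after 0.1 seconds
timed_out = not latch.wait(timeout=0.1)
latch.count_down()
# the latch is now open, so this wait returns right away
opened = latch.wait(timeout=0.1)
print(timed_out, opened)
```

Returning a boolean lets the caller distinguish an open latch from an expired timeout.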

And that's it.

Tying this together, the complete CountDownLatch class is listed below.

# simple countdown latch, starts closed then opens once count is reached
class CountDownLatch():
    # constructor
    def __init__(self, count):
        # store the count
        self.count = count
        # control access to the count and notify when latch is open
        self.condition = Condition()

    # count down the latch by one increment
    def count_down(self):
        # acquire the lock on the condition
        with self.condition:
            # check if the latch is already open
            if self.count == 0:
                return
            # decrement the counter
            self.count -= 1
            # check if the latch is now open
            if self.count == 0:
                # notify all waiting threads that the latch is open
                self.condition.notify_all()

    # wait for the latch to open
    def wait(self):
        # acquire the lock on the condition
        with self.condition:
            # check if the latch is already open
            if self.count == 0:
                return
            # wait to be notified when the latch is open
            self.condition.wait()

You might want to add extra methods to the latch as a fun extension.

For example, we could add a method to get the current value of the internal count. We also might want to add a method to reset the count, but perhaps only if the latch is already open, e.g. reset from open to closed state.
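A minimal sketch of these two extensions follows. The method names get_count() and reset() are hypothetical, as is the choice to raise a RuntimeError when reset() is called on a latch that is still closed.

```python
# sketch: latch with hypothetical get_count() and reset() methods
from threading import Condition

class CountDownLatch:
    # constructor
    def __init__(self, count):
        self.count = count
        self.condition = Condition()

    # count down the latch by one increment
    def count_down(self):
        with self.condition:
            if self.count == 0:
                return
            self.count -= 1
            if self.count == 0:
                self.condition.notify_all()

    # wait for the latch to open
    def wait(self):
        with self.condition:
            if self.count == 0:
                return
            self.condition.wait()

    # report the current count in a thread-safe manner
    def get_count(self):
        with self.condition:
            return self.count

    # move from open back to closed, only permitted when already open
    def reset(self, count):
        with self.condition:
            if self.count != 0:
                raise RuntimeError('latch is still closed')
            self.count = count

latch = CountDownLatch(2)
before = latch.get_count()
latch.count_down()
latch.count_down()
latch.reset(3)
after = latch.get_count()
print(before, after)
```

A resettable latch is harder to reason about than a one-use latch, so it may be simpler to create a new instance when the application allows it.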

We can use this countdown class by first creating an instance of the class and specifying the number of expected parties to arrive as an argument to the constructor.

...
# create the countdown latch
latch = CountDownLatch(5)

As each party arrives at the latch, we can call the count_down() function.

For example:

...
# count down the latch
latch.count_down()

Any threads interested in when the latch is open can call the wait() function.

For example:

...
# block until the latch is open
latch.wait()

Now that we know how to develop a countdown latch class, let's look at using it in a worked example.

Example of Using a Countdown Latch

We can develop an example to demonstrate the latch.

In this example, we will start a number of threads, each of which must perform some task then trigger the latch to signal they are done. The coordinating thread will wait for all threads to complete their work before carrying on.

This is a very common usage pattern for a countdown latch.

First, we can define a custom function to be executed by each worker thread. The function will take the latch instance as well as a unique identifying integer as arguments.

We will simulate work in the thread by blocking for a random interval of up to ten seconds, then counting down the latch to indicate the work is done before finally reporting a message that the thread is closing as all work is done.

The task() function below implements this.

# task that counts down the latch
def task(latch, i):
    # block for a moment
    sleep(random() * 10)
    # count down the latch
    latch.count_down()
    # report done
    print(f'Thread {i} done.')

Next, in the main thread we can first create an instance of the latch class, configured to expect five arrivals.

...
# create the countdown latch
latch = CountDownLatch(5)

We can then create and start five worker threads, each configured to call our task() function via the "target" argument of the threading.Thread constructor. We will pass the latch instance as an argument as well as a unique integer for each worker thread from 0 to 4.

...
# start 5 threads
for i in range(5):
    thread = Thread(target=task, args=(latch, i))
    thread.start()

The main thread will then wait, blocking until the latch is open.

This signals that all worker threads have completed their work successfully and have reached the latch.

...
# wait for the latch to open
print('Main waiting on latch...')
latch.wait()

Finally, the main thread will report a message and terminate.

...
# latch is open, move on
print('Main done')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of using a countdown latch
from time import sleep
from random import random
from threading import Thread
from threading import Condition

# simple countdown latch, starts closed then opens once count is reached
class CountDownLatch():
    # constructor
    def __init__(self, count):
        # store the count
        self.count = count
        # control access to the count and notify when latch is open
        self.condition = Condition()

    # count down the latch by one increment
    def count_down(self):
        # acquire the lock on the condition
        with self.condition:
            # check if the latch is already open
            if self.count == 0:
                return
            # decrement the counter
            self.count -= 1
            # check if the latch is now open
            if self.count == 0:
                # notify all waiting threads that the latch is open
                self.condition.notify_all()

    # wait for the latch to open
    def wait(self):
        # acquire the lock on the condition
        with self.condition:
            # check if the latch is already open
            if self.count == 0:
                return
            # wait to be notified when the latch is open
            self.condition.wait()

# task that counts down the latch
def task(latch, i):
    # block for a moment
    sleep(random() * 10)
    # count down the latch
    latch.count_down()
    # report done
    print(f'Thread {i} done.')

# create the countdown latch
latch = CountDownLatch(5)
# start 5 threads
for i in range(5):
    thread = Thread(target=task, args=(latch, i))
    thread.start()
# wait for the latch to open
print('Main waiting on latch...')
latch.wait()
# latch is open, move on
print('Main done')

Running the example first creates the countdown latch.

Then five worker threads are created, configured, and started immediately.

Each worker thread blocks for some fraction of ten seconds, triggers the latch, then reports a message that it is done before terminating.

Meanwhile, the main thread blocks on the latch, waiting for each thread to complete its work and signal the fact on the latch.

Once all threads reach the latch, the latch is opened and the main thread is notified and its wait() function returns.

The main thread then carries on and reports its final message.

Main waiting on latch...
Thread 4 done.
Thread 0 done.
Thread 1 done.
Thread 2 done.
Thread 3 done.
Main done
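Note that each worker reports its message after counting down, so the last worker message and the "Main done" message may appear in either order on a given run. The sketch below is one way to check the guarantee the latch does give: work recorded before a call to count_down() is visible once wait() returns. The events list, its lock, and the shortened sleep are assumptions made for illustration; the latch class is repeated so the sketch is self-contained.

```python
# sketch: record events to show main only resumes after every count_down()
from time import sleep
from random import random
from threading import Thread, Condition, Lock

class CountDownLatch:
    def __init__(self, count):
        self.count = count
        self.condition = Condition()

    def count_down(self):
        with self.condition:
            if self.count == 0:
                return
            self.count -= 1
            if self.count == 0:
                self.condition.notify_all()

    def wait(self):
        with self.condition:
            if self.count == 0:
                return
            self.condition.wait()

events = []            # hypothetical shared log, in order of occurrence
events_lock = Lock()   # protects the log

def task(latch, i):
    # block for a moment (shortened for the sketch)
    sleep(random() * 0.2)
    # record completion BEFORE counting down the latch
    with events_lock:
        events.append(f'worker {i}')
    latch.count_down()

latch = CountDownLatch(5)
for i in range(5):
    Thread(target=task, args=(latch, i)).start()
# block until all five workers have counted down
latch.wait()
with events_lock:
    events.append('main')
print(events)
```

The worker entries can appear in any order, but the main entry is always last.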

Now that we know how to use the countdown latch, let's consider how we might simulate a similar result with a barrier.

Simulate a Latch with a Barrier

A barrier cannot be used to implement a latch.

The main reason is that the latch allows threads that arrive to carry on and supports any number of other waiting threads, whereas the barrier requires that each thread that arrives must also wait for all other threads to arrive.

Nevertheless, if the application permits these limitations, we could simulate (something like) a countdown latch with a threading.Barrier class.

A barrier is first configured with the expected number of parties then requires that each thread that arrives calls the wait() function, and that's all there is to it.
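As a small illustration of that usage, the sketch below configures a threading.Barrier for three parties and has three threads call wait(). The names party(), indices, and lock are hypothetical. According to the threading documentation, wait() returns an integer from 0 to parties - 1 that is different for each thread, which the sketch collects.

```python
# sketch: three parties meeting at a threading.Barrier
from threading import Barrier, Thread, Lock

barrier = Barrier(3)   # expect three parties
indices = []           # arrival indices returned by wait()
lock = Lock()          # protects the list

# hypothetical party: block on the barrier until all parties arrive
def party():
    index = barrier.wait()
    with lock:
        indices.append(index)

threads = [Thread(target=party) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(indices))
```

No thread gets past wait() until all three have arrived, which is exactly the limitation discussed above.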

We can update the example from the previous section to use a threading.Barrier instead of our custom CountDownLatch class.

First, we must update the task() function to take a threading.Barrier instance as an argument and to call the wait() function on the barrier once its work is completed.

The updated version of the task() function with these changes is listed below.

# task that does some work and waits on the barrier
def task(barrier, i):
    # block for a moment
    sleep(random() * 10)
    # wait on the barrier
    barrier.wait()
    # report done
    print(f'Thread {i} done.')

Next, we must create an instance of the threading.Barrier class configured to expect six parties: one for each of the five worker threads, plus the main thread.

...
# create the barrier
barrier = Barrier(5 + 1)

Next, we can create and configure the worker threads to take the instance of the barrier as an argument.

...
# start 5 threads
for i in range(5):
    thread = Thread(target=task, args=(barrier, i))
    thread.start()

Finally, the main thread must wait on the barrier for all worker threads to arrive.

...
# wait for the barrier to open
print('Main waiting on barrier...')
barrier.wait()

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of simulating a latch with a barrier
from time import sleep
from random import random
from threading import Thread
from threading import Barrier

# task that does some work and waits on the barrier
def task(barrier, i):
    # block for a moment
    sleep(random() * 10)
    # wait on the barrier
    barrier.wait()
    # report done
    print(f'Thread {i} done.')

# create the barrier
barrier = Barrier(5 + 1)
# start 5 threads
for i in range(5):
    thread = Thread(target=task, args=(barrier, i))
    thread.start()
# wait for the barrier to open
print('Main waiting on barrier...')
barrier.wait()
# barrier is open, move on
print('Main done')

Running the example first creates the barrier and then creates and starts the five worker threads passing the barrier instance as an argument.

Each worker thread performs its work then reaches the barrier and waits, blocking until all other worker threads and the main thread arrive at the barrier.

This keeps the worker threads alive, along with their resource requirements in memory (e.g. each thread has its own stack space and is scheduled by the operating system).

The main thread reaches the barrier and waits for all of the worker threads to complete their work.

Finally, all threads reach the barrier and are free to continue.

The worker threads print their messages and terminate. The main thread carries on and reports its own final message.

We can see that functionally, we achieved the same result as the previous section. Namely, we were able to block the main thread until the worker threads finished, at the cost of keeping the worker threads alive unnecessarily.

Main waiting on barrier...
Thread 0 done.
Main done
Thread 4 done.
Thread 3 done.
Thread 2 done.
Thread 1 done.

Now that we know how to simulate a latch with a barrier (sort of), let's explore the reverse of simulating a barrier with a latch.

Simulate a Barrier with a Latch

We can simulate a barrier using a countdown latch.

This is relatively straightforward and involves having the threads that count down the latch also wait on it.

We can update the above countdown latch example to act like a barrier by having each worker thread wait on the latch, and by having the main thread count down the latch as well as wait on it.

First, we must update the latch to add an additional position for the main thread, before opening.

...
# create the countdown latch
latch = CountDownLatch(5 + 1)

Next, we can update the task() function so that each thread will wait for the latch to open after counting down.

# task that counts down the latch
def task(latch, i):
    # block for a moment
    sleep(random() * 10)
    # count down the latch
    latch.count_down()
    # wait on the latch itself
    latch.wait()
    # report done
    print(f'Thread {i} done.')

We can then update the main thread to first count down the latch, then wait, just like the worker threads.

...
# wait for the latch to open
print('Main waiting on latch...')
# count down the latch as well
latch.count_down()
# wait for all other threads
latch.wait()

The effect is that the latch will act like a barrier, mostly.

Unlike a barrier, there is a possible race condition between the last thread reaching the call to count_down() and other threads making it to their call to wait(), meaning that it is theoretically possible that not all threads are blocking on the call to wait() at the time the final thread comes through. But this is a reasonably minor difference.

This suggests that if you need a barrier, use a barrier; don't simulate one with a latch.
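The non-blocking case can be checked in isolation. The sketch below opens a latch fully and then calls wait(), measuring how long the call takes with time.perf_counter(). The elapsed variable is a hypothetical name used only for this check, and the latch class is repeated so the sketch is self-contained.

```python
# sketch: wait() on an already-open latch returns without blocking
from time import perf_counter
from threading import Condition

class CountDownLatch:
    def __init__(self, count):
        self.count = count
        self.condition = Condition()

    def count_down(self):
        with self.condition:
            if self.count == 0:
                return
            self.count -= 1
            if self.count == 0:
                self.condition.notify_all()

    def wait(self):
        with self.condition:
            if self.count == 0:
                return
            self.condition.wait()

latch = CountDownLatch(2)
# both parties arrive before anyone waits
latch.count_down()
latch.count_down()
# a late waiter finds the latch open and carries on
start = perf_counter()
latch.wait()
elapsed = perf_counter() - start
print(f'wait() returned after {elapsed:.4f} seconds')
```

This is why a thread that counts down just before the latch opens is not stranded when it later calls wait().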

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of simulating a barrier with a latch
from time import sleep
from random import random
from threading import Thread
from threading import Condition

# simple countdown latch, starts closed then opens once count is reached
class CountDownLatch():
    # constructor
    def __init__(self, count):
        # store the count
        self.count = count
        # control access to the count and notify when latch is open
        self.condition = Condition()

    # count down the latch by one increment
    def count_down(self):
        # acquire the lock on the condition
        with self.condition:
            # check if the latch is already open
            if self.count == 0:
                return
            # decrement the counter
            self.count -= 1
            # check if the latch is now open
            if self.count == 0:
                # notify all waiting threads that the latch is open
                self.condition.notify_all()

    # wait for the latch to open
    def wait(self):
        # acquire the lock on the condition
        with self.condition:
            # check if the latch is already open
            if self.count == 0:
                return
            # wait to be notified when the latch is open
            self.condition.wait()

# task that counts down the latch
def task(latch, i):
    # block for a moment
    sleep(random() * 10)
    # count down the latch
    latch.count_down()
    # wait on the latch itself
    latch.wait()
    # report done
    print(f'Thread {i} done.')

# create the countdown latch
latch = CountDownLatch(5 + 1)
# start 5 threads
for i in range(5):
    thread = Thread(target=task, args=(latch, i))
    thread.start()
# wait for the latch to open
print('Main waiting on latch...')
# count down the latch as well
latch.count_down()
# wait for all other threads
latch.wait()
# latch is open, move on
print('Main done')

Running the example first creates the latch and then creates and starts the five worker threads.

Each thread performs its own work, then hits the latch, calling count_down() and then waiting for all other parties.

The main thread continues on, first calling count_down() and then blocking while it waits for the latch to open.

All worker threads reach the latch and call count_down(), allowing the latch to open.

Any threads blocking on the call to wait() are then released to carry on, including the main thread. Any threads that had called count_down() but had not yet made it to wait() will call wait() and not block.

Main waiting on latch...
Thread 0 done.
Main done
Thread 2 done.
Thread 1 done.
Thread 3 done.
Thread 4 done.

Takeaways

You now know how to use a countdown latch in Python.