Threading Barrier in Python
You can use a thread barrier in Python via the threading.Barrier class.
In this tutorial you will discover how to use thread barriers in Python.
Let's get started.
Need for a Thread Barrier
A thread is a thread of execution in a computer program.
Every Python program has at least one thread of execution called the main thread. Both processes and threads are created and managed by the underlying operating system.
Sometimes we may need to create additional threads in our program in order to execute code concurrently.
Python provides the ability to create and manage new threads via the threading module and the threading.Thread class.
You can learn more about Python threads in the guide:
In some concurrent programs we may need to coordinate action between two or more threads.
One way to achieve coordination is to have threads reach and wait upon a barrier until all threads arrive, after which an action can be performed.
What is a barrier and how can we use it for coordination between threads in Python?
What is a Barrier
A barrier is a synchronization primitive.
It allows multiple threads to wait on the same barrier object instance (e.g. at the same point in code) until a predefined fixed number of threads arrive (e.g. the barrier is full), after which all threads are then notified and released to continue their execution.
Internally, a barrier maintains a count of the number of threads currently waiting on the barrier and a configured number of parties (threads) that are expected. Once the number of waiting threads reaches the configured number of parties, all waiting threads are notified.
This provides a useful mechanism to coordinate actions between multiple threads.
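The counting idea described above can be sketched with a condition variable. This is only an illustration of the concept, not how threading.Barrier is actually implemented: the SimpleBarrier class and the demo() helper are hypothetical names, and the sketch is one-shot with no timeout, abort, or reset support.

```python
import threading

class SimpleBarrier:
    """Hypothetical one-shot barrier, for illustration only."""

    def __init__(self, parties):
        self.parties = parties   # number of threads expected
        self.count = 0           # number of threads that have arrived
        self.condition = threading.Condition()

    def wait(self):
        with self.condition:
            self.count += 1
            if self.count == self.parties:
                # last arrival: wake every waiting thread
                self.condition.notify_all()
            else:
                # block until the count reaches the number of parties
                self.condition.wait_for(lambda: self.count >= self.parties)

def demo(parties=3):
    barrier = SimpleBarrier(parties)
    passed = []

    def worker(number):
        barrier.wait()
        passed.append(number)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(parties)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return sorted(passed)

print(demo())  # [0, 1, 2]
```

In practice, prefer threading.Barrier, which also handles reuse, timeouts, and the broken state.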
Now that we know what a barrier is, let's look at how we might use it in Python.
How to Use a Barrier
Python provides a barrier via the threading.Barrier class.
A barrier instance must first be created and configured via the constructor specifying the number of parties (threads) that must arrive before the barrier will be lifted.
For example:
...
# create a barrier
barrier = threading.Barrier(10)
We can also perform an action once all threads reach the barrier which can be specified via the "action" argument in the constructor.
This action must be a callable such as a function or a lambda that does not take any arguments and will be executed by one thread once all threads reach the barrier but before the threads are released.
...
# configure a barrier with an action
barrier = threading.Barrier(10, action=my_function)
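As a small sketch of the ordering described above, the following records events in a shared list to show that the action runs once, before any thread is released. The names on_full, worker, and events are made up for this illustration.

```python
import threading

# shared log of events; appending to a list from several threads is safe in CPython
events = []

def on_full():
    # called by exactly one thread once all parties have arrived
    events.append('action')

def worker(barrier):
    barrier.wait()
    # only reached after the barrier has been lifted
    events.append('released')

barrier = threading.Barrier(3, action=on_full)
threads = [threading.Thread(target=worker, args=(barrier,)) for _ in range(3)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(events)  # ['action', 'released', 'released', 'released']
```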
We can also set a default timeout used by all threads that reach the barrier and call the wait() function.
The default timeout can be set via the "timeout" argument in seconds in the constructor.
...
# configure a barrier with a default timeout
barrier = threading.Barrier(10, timeout=5)
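A minimal sketch of the default timeout in action, assuming a barrier that expects two parties but is only ever reached by the main thread. The 0.2 second timeout and the outcome variable are chosen purely for illustration.

```python
import threading

# two parties are expected, but only the main thread will ever arrive
barrier = threading.Barrier(2, timeout=0.2)

try:
    # no timeout given here, so the default of 0.2 seconds applies
    barrier.wait()
    outcome = 'released'
except threading.BrokenBarrierError:
    outcome = 'timed out'

print(outcome, barrier.broken)  # timed out True
```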
Once configured, the barrier instance can be shared between threads and used.
A thread can reach and wait on the barrier via the wait() function, for example:
...
# wait on the barrier for all other threads to arrive
barrier.wait()
This is a blocking call and will return once all other threads (the pre-configured number of parties) have reached the barrier.
The wait() function returns an integer in the range 0 to parties - 1 that is different for each thread. This is helpful if you want exactly one thread to perform an action after the barrier is released, an alternative to using the "action" argument in the constructor. For example, the thread that receives zero can be chosen.
...
# wait on the barrier
index = barrier.wait()
# after released, check if this thread was selected
if index == 0:
    print('I was selected...')
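To make the return value concrete, here is a small sketch that collects the value returned by wait() in each thread. The Python documentation describes it as an integer in the range 0 to parties - 1, different for each thread; the worker function and the indices list are names made up for this example.

```python
import threading

parties = 4
barrier = threading.Barrier(parties)
indices = []

def worker():
    # each thread records the value it received from wait()
    indices.append(barrier.wait())

threads = [threading.Thread(target=worker) for _ in range(parties)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(sorted(indices))  # [0, 1, 2, 3]
```

Because every thread gets a different value, a check such as "index == 0" selects exactly one thread, whichever one that happens to be.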
A timeout can be set on the call to wait() in seconds via the "timeout" argument. If the timeout expires before all parties reach the barrier, a BrokenBarrierError will be raised in all threads waiting on the barrier and the barrier will be marked as broken.
If a timeout is used via the "timeout" argument or the default timeout in the constructor, then all calls to the wait() function may need to handle the BrokenBarrierError.
...
# wait on the barrier for all other threads to arrive
try:
    barrier.wait()
except threading.BrokenBarrierError:
    # ...
    pass
We can also abort the barrier.
Aborting the barrier means that all threads waiting on the barrier via the wait() function will raise a BrokenBarrierError and the barrier will be put in the broken state.
This might be helpful if you wish to cancel the coordination effort.
...
# abort the barrier
barrier.abort()
A broken barrier cannot be used. Calls to wait() will raise a BrokenBarrierError.
A barrier can be fixed and made ready for use again by calling the reset() function.
This might be helpful if you cancel a coordination effort but wish to retry it with the same barrier instance.
...
# reset a broken barrier
barrier.reset()
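The abort and reset cycle can be sketched in a single thread plus one helper thread. This is an illustrative example only; the variable names are assumptions.

```python
import threading

barrier = threading.Barrier(2)

# abort the barrier: it is now in the broken state
barrier.abort()
broken_after_abort = barrier.broken
print(broken_after_abort)  # True

# any call to wait() on a broken barrier fails immediately
try:
    barrier.wait()
except threading.BrokenBarrierError:
    print('wait() failed: barrier is broken')

# reset the barrier so it can be used again
barrier.reset()
broken_after_reset = barrier.broken
print(broken_after_reset)  # False

# the barrier works as normal after the reset
worker = threading.Thread(target=barrier.wait)
worker.start()
barrier.wait()
worker.join()
print('barrier reused after reset')
```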
Finally, the status of the barrier can be checked via attributes.
- parties: reports the configured number of parties that must reach the barrier for it to be lifted.
- n_waiting: reports the current number of threads waiting on the barrier.
- broken: reports whether the barrier is currently broken or not.
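A short sketch of reading these attributes while threads are waiting. The polling loop is only there to make the example deterministic; it is an assumption of this illustration, not a recommended pattern.

```python
import threading
import time

barrier = threading.Barrier(3)

# start two of the three expected parties
workers = [threading.Thread(target=barrier.wait) for _ in range(2)]
for worker in workers:
    worker.start()

# poll until both workers are blocked on the barrier
while barrier.n_waiting < 2:
    time.sleep(0.01)

snapshot = (barrier.parties, barrier.n_waiting, barrier.broken)
print(snapshot)  # (3, 2, False)

# the main thread is the third party, which lifts the barrier
barrier.wait()
for worker in workers:
    worker.join()

print(barrier.n_waiting)  # 0
```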
Now that we know how to use the barrier in Python, let's look at some worked examples.
Example of Using a Thread Barrier
We can explore how to use a threading.Barrier with a worked example.
In this example we will create a suite of threads, each required to perform some blocking calculation. We will use a barrier to coordinate all threads after they have finished their work and perform some action in the main thread. This is a good proxy for the types of coordination we may need to perform with a barrier.
First, let's define a target task function to execute by each thread. The function will take the barrier as an argument as well as a unique identifier for the thread.
The thread will generate a random value between 0 and 10, block for that many seconds, report the result, then wait on the barrier for all other threads to perform their computation.
The complete target task function is listed below.
# target function to prepare some work
def task(barrier, number):
    # generate a random value between 0 and 10
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Thread {number} done, got: {value}')
    # wait on all other threads to complete
    barrier.wait()
Next in the main thread we can create the barrier.
We need one party for each thread we intend to create, five in this case, as well as an additional party for the main thread that will also wait for all threads to reach the barrier.
...
# create a barrier
barrier = Barrier(5 + 1)
Next, we can create and start our five worker threads executing our target task() function.
...
# create the worker threads
for i in range(5):
    # start a new thread to perform some work
    worker = Thread(target=task, args=(barrier, i))
    worker.start()
Finally, the main thread can wait on the barrier for all threads to perform their calculation.
...
# wait for all threads to finish
print('Main thread waiting on all results...')
barrier.wait()
Once the threads are finished, the barrier will be lifted and the worker threads will exit and the main thread will report a message.
...
# report once all threads are done
print('All threads have their result')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using a barrier
from time import sleep
from random import random
from threading import Thread
from threading import Barrier
# target function to prepare some work
def task(barrier, number):
    # generate a random value between 0 and 10
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Thread {number} done, got: {value}')
    # wait on all other threads to complete
    barrier.wait()
# create a barrier
barrier = Barrier(5 + 1)
# create the worker threads
for i in range(5):
    # start a new thread to perform some work
    worker = Thread(target=task, args=(barrier, i))
    worker.start()
# wait for all threads to finish
print('Main thread waiting on all results...')
barrier.wait()
# report once all threads are done
print('All threads have their result')
Running the example first creates the barrier then creates and starts the worker threads.
Each worker thread performs its calculation and then waits on the barrier for all other threads to finish.
Finally, the threads finish and are all released, including the main thread, reporting a final message.
Note, your specific results will differ given the use of random numbers. Try running the example a few times.
Main thread waiting on all results...
Thread 0 done, got: 0.029404047464883787
Thread 2 done, got: 0.5464335927441033
Thread 1 done, got: 4.99178858357096
Thread 4 done, got: 6.821886004822457
Thread 3 done, got: 7.868471944670812
All threads have their result
Next, let's explore how we might use the barrier with a timeout.
Example of a Barrier with a Timeout
We can use the barrier with a timeout.
There are two ways to set a timeout: via the "timeout" argument to the threading.Barrier constructor, which is then used by default in all calls to wait(), or via the "timeout" argument in each call to wait() directly.
In this case, we will use a timeout argument in the wait() function.
The example from the previous section can be updated to make use of a timeout. Specifically, the main thread can wait a fixed number of seconds for all threads to finish. If all threads finish within the time, all is well, otherwise we can report that not all work could be finished on time.
When using a timeout, all calls to wait() should handle a potential BrokenBarrierError that could be raised.
First, we must update the task() function to handle the possible error.
# target function to prepare some work
def task(barrier, number):
    # generate a random value between 0 and 10
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Thread {number} done, got: {value}')
    # wait on all other threads to complete
    try:
        barrier.wait()
    except BrokenBarrierError:
        pass
Next, we can update the main thread to set a timeout of 5 seconds on the wait for all worker threads.
Again, this must be protected with a try-except structure.
...
try:
    barrier.wait(timeout=5)
    print('All threads have their result')
except BrokenBarrierError:
    print('Some threads did not finish on time...')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using a barrier with a timeout
from time import sleep
from random import random
from threading import Thread
from threading import Barrier
from threading import BrokenBarrierError
# target function to prepare some work
def task(barrier, number):
    # generate a random value between 0 and 10
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Thread {number} done, got: {value}')
    # wait on all other threads to complete
    try:
        barrier.wait()
    except BrokenBarrierError:
        pass
# create a barrier
barrier = Barrier(5 + 1)
# create the worker threads
for i in range(5):
    # start a new thread to perform some work
    worker = Thread(target=task, args=(barrier, i))
    worker.start()
# wait for all threads to finish
print('Main thread waiting on all results...')
try:
    barrier.wait(timeout=5)
    print('All threads have their result')
except BrokenBarrierError:
    print('Some threads did not finish on time...')
Running the example creates the barrier and starts all worker threads as before.
In this case, the main thread is impatient and will only wait 5 seconds for all worker threads to complete. Some may take longer, and this will differ with each run of the code.
On this specific run, the timeout is reached and the barrier is broken. All waiting worker threads raise a BrokenBarrierError, which is ignored and the threads terminate. All worker threads not yet at the barrier will reach the barrier, raise a BrokenBarrierError and terminate.
The main thread raises a BrokenBarrierError in the call to wait() and reports that some worker threads did not finish within the timeout.
Note, your specific results will differ given the use of random numbers. Try running the example a few times.
Main thread waiting on all results...
Thread 4 done, got: 0.1367718061220924
Thread 0 done, got: 0.5802960386396927
Thread 3 done, got: 0.9069710246433671
Thread 2 done, got: 1.8471350353109695
Some threads did not finish on time...
Thread 1 done, got: 5.525649829682911
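The behavior of late arrivals can be isolated in a smaller, deterministic sketch. Here the main thread times out first and a worker only reaches the barrier afterwards; the late_worker function, the late_result list, and the 0.1 second timeout are assumptions made for this illustration.

```python
import threading

barrier = threading.Barrier(2)

# the main thread gives up after a short timeout, which breaks the barrier
try:
    barrier.wait(timeout=0.1)
except threading.BrokenBarrierError:
    print('main: timed out')

late_result = []

def late_worker():
    # arrives after the barrier was broken, so wait() fails immediately
    try:
        barrier.wait()
        late_result.append('released')
    except threading.BrokenBarrierError:
        late_result.append('broken')

thread = threading.Thread(target=late_worker)
thread.start()
thread.join()

print(late_result)  # ['broken']
```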
Next, let's look at how we might use a barrier with an action.
Example of a Barrier with an Action
We can trigger an action once all parties reach the barrier.
This can be achieved by setting the "action" argument to a callable in the threading.Barrier constructor.
The callable could be a lambda or a function with no arguments.
In this case, we can update our first example to create a new function that reports that all worker threads have finished their calculation.
First, we can define our new action function.
# action once all threads reach the barrier
def report():
    # report once all threads are done
    print('All threads have their result')
Next, we can configure the new barrier instance to call the report() function once all parties reach the barrier.
Because the main thread no longer needs to wait on the barrier, we can reduce the number of expected parties to reach the barrier to five, to match the five worker threads.
...
# create a barrier
barrier = Barrier(5, action=report)
And that's about it. The main thread now does little after starting the worker threads other than wait around for the threads to complete.
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of using a barrier with an action
from time import sleep
from random import random
from threading import Thread
from threading import Barrier
# action once all threads reach the barrier
def report():
    # report once all threads are done
    print('All threads have their result')
# target function to prepare some work
def task(barrier, number):
    # generate a random value between 0 and 10
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Thread {number} done, got: {value}')
    # wait on all other threads to complete
    barrier.wait()
# create a barrier
barrier = Barrier(5, action=report)
# create the worker threads
for i in range(5):
    # start a new thread to perform some work
    worker = Thread(target=task, args=(barrier, i))
    worker.start()
# wait for all threads to finish...
Running the example creates the barrier with the configured action.
The five worker threads are created and started, performing their calculation and reaching the barrier.
Once all threads reach the barrier, the barrier ensures that the action is triggered by one of the worker threads, calling our configured report() function once.
This is a cleaner solution (less code) for having an action performed after the barrier is lifted, compared to attempting the same thing in the main thread in the first example.
Thread 4 done, got: 0.13262406779643077
Thread 1 done, got: 2.4520892805134773
Thread 3 done, got: 6.359683121050146
Thread 0 done, got: 6.514003577189086
Thread 2 done, got: 7.654282923005564
All threads have their result
Next, let's take a look at how we might abort a barrier.
Example of Aborting a Barrier
We might want to abort the coordination of threads on the barrier for some reason.
This might be because one of the threads is unable to perform its required task.
We can abort the barrier by calling the abort() function. This will cause all threads waiting on the barrier to raise a BrokenBarrierError, and all subsequent calls to wait() to raise the same error.
This means that all calls to wait() should be protected by a try-except structure.
We can update our first example of using the barrier to support aborting the barrier. We will add a check in the task function so that if a value larger than 8 is encountered the thread aborts the coordination effort, otherwise it proceeds. This means sometimes all threads will coordinate and sometimes not, depending on the specific random numbers generated.
First, we can update the task() function to check the generated result and to abort the task if needed, otherwise wait on the barrier.
...
# check if the result was "bad"
if value > 8:
    print(f'Thread {number} aborting...')
    barrier.abort()
else:
    # wait on all other threads to complete
    try:
        barrier.wait()
    except BrokenBarrierError:
        pass
The updated version of the task() function with this change is listed below.
# target function to prepare some work
def task(barrier, number):
    # generate a random value between 0 and 10
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Thread {number} done, got: {value}')
    # check if the result was "bad"
    if value > 8:
        print(f'Thread {number} aborting...')
        barrier.abort()
    else:
        # wait on all other threads to complete
        try:
            barrier.wait()
        except BrokenBarrierError:
            pass
Next, we can update the main thread to protect the call to wait() and if a BrokenBarrierError is encountered, to report that at least one thread aborted the coordination.
...
try:
    barrier.wait()
    print('All threads have their result')
except BrokenBarrierError:
    print('At least one thread aborted due to bad results.')
Tying this together, the complete example is listed below.
# SuperFastPython.com
# example of aborting a barrier
from time import sleep
from random import random
from threading import Thread
from threading import Barrier
from threading import BrokenBarrierError
# target function to prepare some work
def task(barrier, number):
    # generate a random value between 0 and 10
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Thread {number} done, got: {value}')
    # check if the result was "bad"
    if value > 8:
        print(f'Thread {number} aborting...')
        barrier.abort()
    else:
        # wait on all other threads to complete
        try:
            barrier.wait()
        except BrokenBarrierError:
            pass
# create a barrier
barrier = Barrier(5 + 1)
# create the worker threads
for i in range(5):
    # start a new thread to perform some work
    worker = Thread(target=task, args=(barrier, i))
    worker.start()
# wait for all threads to finish
print('Main thread waiting on all results...')
try:
    barrier.wait()
    print('All threads have their result')
except BrokenBarrierError:
    print('At least one thread aborted due to bad results.')
Running the example creates the barrier, then creates and starts the worker threads.
Each thread performs its processing and conditionally tries to abort or wait on the barrier, depending on the specific random number that was generated.
In this case, four of the five numbers were good, but the fifth resulted in the barrier being aborted and a message in the main thread reporting this fact.
Main thread waiting on all results...
Thread 1 done, got: 1.3247704495942991
Thread 0 done, got: 2.0519554057841036
Thread 3 done, got: 6.191573105008992
Thread 2 done, got: 7.568670532887958
Thread 4 done, got: 9.483464923379831
Thread 4 aborting...
At least one thread aborted due to bad results.
Takeaways
You now know how to use threading.Barrier thread barriers in Python.