Last Updated on September 12, 2022
You can use a thread barrier in Python via the threading.Barrier class.
In this tutorial you will discover how to use thread barriers in Python.
Let’s get started.
Need for a Thread Barrier
A thread is an independent path of execution within a computer program.
Every Python program has at least one thread of execution called the main thread. Both processes and threads are created and managed by the underlying operating system.
Sometimes we may need to create additional threads in our program in order to execute code concurrently.
Python provides the ability to create and manage new threads via the threading module and the threading.Thread class.
You can learn more about Python threads in the guides listed under Further Reading at the end of this tutorial.
In some concurrent programs we may need to coordinate action between two or more threads.
One way to achieve coordination is to have threads reach and wait upon a barrier until all threads arrive, after which an action can be performed.
What is a barrier and how can we use it for coordination between threads in Python?
What is a Barrier
A barrier is a synchronization primitive.
It allows multiple threads to wait on the same barrier object instance (e.g. at the same point in code) until a predefined fixed number of threads arrive (e.g. the barrier is full), after which all threads are then notified and released to continue their execution.
Internally, a barrier maintains a count of the number of threads waiting on the barrier and a configured number of parties (threads) that are expected. Once the number of waiting threads reaches the configured number of parties, all waiting threads are notified.
This provides a useful mechanism to coordinate actions between multiple threads.
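To make the counting idea concrete, here is a minimal sketch of a single-use barrier built on threading.Condition. The SimpleBarrier class and the worker() function are hypothetical names used for illustration only; this is not how threading.Barrier is implemented, and it omits timeouts, actions, the broken state and reuse.

```python
import threading


class SimpleBarrier:
    """Hypothetical single-use barrier, for illustration only."""

    def __init__(self, parties):
        self.parties = parties          # number of threads expected
        self.count = 0                  # number of threads that have arrived
        self.cond = threading.Condition()

    def wait(self):
        with self.cond:
            self.count += 1
            if self.count == self.parties:
                # the last party to arrive wakes up everyone else
                self.cond.notify_all()
            else:
                # block until the count reaches the number of parties
                self.cond.wait_for(lambda: self.count == self.parties)


released = []                           # identifiers of threads that got past the barrier
simple_barrier = SimpleBarrier(3)


def worker(number):
    simple_barrier.wait()
    released.append(number)


threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(released))  # [0, 1, 2]
```

No thread gets past wait() until the third one arrives, which is the behavior the real barrier provides with many more safeguards.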
Now that we know what a barrier is, let’s look at how we might use it in Python.
How to Use a Barrier
Python provides a barrier via the threading.Barrier class.
A barrier instance must first be created and configured via the constructor specifying the number of parties (threads) that must arrive before the barrier will be lifted.
For example:
```python
...
# create a barrier
barrier = threading.Barrier(10)
```
We can also perform an action once all threads reach the barrier which can be specified via the “action” argument in the constructor.
This action must be a callable, such as a function or a lambda, that takes no arguments. It is executed by one thread once all threads reach the barrier, but before the threads are released.
```python
...
# configure a barrier with an action
barrier = threading.Barrier(10, action=my_function)
```
We can also set a default timeout used by all threads that reach the barrier and call the wait() function.
The default timeout can be set via the “timeout” argument in seconds in the constructor.
```python
...
# configure a barrier with a default timeout
barrier = threading.Barrier(10, timeout=5)
```
Once configured, the barrier instance can be shared between threads and used.
A thread can reach and wait on the barrier via the wait() function, for example:
```python
...
# wait on the barrier for all other threads to arrive
barrier.wait()
```
This is a blocking call and will return once all other threads (the pre-configured number of parties) have reached the barrier.
The wait() function returns an integer in the range 0 to parties - 1 that is different for each thread released from the barrier. In the current CPython implementation the value reflects arrival order (the first thread to arrive receives zero), but the documentation guarantees only that the values are unique. This is helpful if you want exactly one thread to perform an action after the barrier is released, an alternative to using the “action” argument in the constructor.

```python
...
# wait on the barrier
index = barrier.wait()
# after released, let exactly one thread perform the follow-up action
if index == 0:
    print('I was chosen...')
```
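A quick way to see what wait() returns is to collect the value in every thread. The sketch below assumes three parties and uses a hypothetical worker() function; the documented guarantee is an integer in the range 0 to parties - 1 that is different for each thread, so exactly one thread sees zero.

```python
import threading

parties = 3
barrier = threading.Barrier(parties)
indexes = []                      # value returned by wait() in each thread


def worker():
    # wait() returns an int in range(parties), different for each thread
    index = barrier.wait()
    indexes.append(index)
    if index == 0:
        # exactly one thread sees 0, so it can do the one-off housekeeping
        print('Housekeeping done by one thread')


threads = [threading.Thread(target=worker) for _ in range(parties)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(indexes))  # [0, 1, 2]
```

Testing for zero selects exactly one thread, whichever thread that turns out to be.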
A timeout can be set on the call to wait() in seconds via the “timeout” argument. If the timeout expires before all parties reach the barrier, the barrier is marked as broken and a BrokenBarrierError is raised in all threads waiting on the barrier.
If a timeout is used via the “timeout” argument or the default timeout in the constructor, then all calls to the wait() function may need to handle the BrokenBarrierError.
```python
...
# wait on the barrier for all other threads to arrive
try:
    barrier.wait()
except threading.BrokenBarrierError:
    # ... handle the broken barrier
    pass
```
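The effect of one expired timeout on the other waiting threads can be seen in a small sketch. Here the barrier expects three parties but only two ever arrive; patient_worker() and the outcomes list are hypothetical names for illustration. The worker sets no timeout of its own, yet it still receives the error when the main thread gives up.

```python
import threading

barrier = threading.Barrier(3)      # three parties expected, only two will arrive
outcomes = []


def patient_worker():
    try:
        barrier.wait()              # no timeout of its own
        outcomes.append('released')
    except threading.BrokenBarrierError:
        outcomes.append('broken')


thread = threading.Thread(target=patient_worker)
thread.start()

try:
    barrier.wait(timeout=0.5)       # gives up after half a second
    outcomes.append('released')
except threading.BrokenBarrierError:
    outcomes.append('broken')

thread.join()
print(outcomes, barrier.broken)  # ['broken', 'broken'] True
```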
We can also abort the barrier.
Aborting the barrier means that all threads waiting on the barrier via the wait() function will raise a BrokenBarrierError and the barrier will be put in the broken state.
This might be helpful if you wish to cancel the coordination effort.
```python
...
# abort the barrier
barrier.abort()
```
A broken barrier cannot be used. Calls to wait() will raise a BrokenBarrierError.
A barrier can be fixed and made ready for use again by calling the reset() function.
This might be helpful if you cancel a coordination effort but wish to retry it with the same barrier instance.
```python
...
# reset a broken barrier
barrier.reset()
```
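The abort and reset life cycle can be sketched end to end with a two-party barrier. The helper thread and results list are assumptions made for the demonstration; the calls themselves (abort(), reset(), wait() and the broken attribute) are the ones described above.

```python
import threading

barrier = threading.Barrier(2)

barrier.abort()                         # put the barrier into the broken state
print(barrier.broken)                   # True

try:
    barrier.wait()                      # raises immediately on a broken barrier
except threading.BrokenBarrierError:
    print('wait() failed: barrier is broken')

barrier.reset()                         # return the barrier to its empty state
print(barrier.broken)                   # False

# the barrier can now be used again by two parties
results = []
helper = threading.Thread(target=lambda: results.append(barrier.wait()))
helper.start()
results.append(barrier.wait())
helper.join()
print(sorted(results))                  # [0, 1]
```

Note that reset() is called here while no thread is waiting. Any threads that are still waiting when reset() is called receive a BrokenBarrierError.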
Finally, the status of the barrier can be checked via attributes.
- parties: reports the configured number of parties that must reach the barrier for it to be lifted.
- n_waiting: reports the current number of threads waiting on the barrier.
- broken: reports whether the barrier is currently in the broken state.
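As a rough illustration, the three attributes can be inspected before, during and after a wait. The polling loop and the waiting_snapshot variable are assumptions added only to make sure the helper thread is blocked on the barrier before we read n_waiting.

```python
import threading
from time import sleep

barrier = threading.Barrier(2)
print(barrier.parties, barrier.n_waiting, barrier.broken)   # 2 0 False

helper = threading.Thread(target=barrier.wait)
helper.start()

# poll until the helper thread is blocked on the barrier
while barrier.n_waiting < 1:
    sleep(0.01)
waiting_snapshot = barrier.n_waiting
print(barrier.parties, waiting_snapshot, barrier.broken)    # 2 1 False

barrier.wait()                  # second party arrives, releasing both threads
helper.join()
print(barrier.n_waiting)        # 0
```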
Now that we know how to use the barrier in Python, let’s look at some worked examples.
Example of Using a Thread Barrier
We can explore how to use a threading.Barrier with a worked example.
In this example we will create a suite of threads, each required to perform some blocking calculation. We will use a barrier to coordinate all threads after they have finished their work and perform some action in the main thread. This is a good proxy for the types of coordination we may need to perform with a barrier.
First, let’s define a target task function to be executed by each thread. The function will take the barrier as an argument as well as a unique identifier for the thread.
The thread will generate a random value between 0 and 10, block for that many seconds, report the result, then wait on the barrier for all other threads to perform their computation.
The complete target task function is listed below.
```python
# target function to prepare some work
def task(barrier, number):
    # generate a unique value
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Thread {number} done, got: {value}')
    # wait on all other threads to complete
    barrier.wait()
```
Next in the main thread we can create the barrier.
We need one party for each thread we intend to create, five in this case, as well as an additional party for the main thread, which will also wait for all threads to reach the barrier.
```python
...
# create a barrier
barrier = Barrier(5 + 1)
```
Next, we can create and start our five worker threads executing our target task() function.
```python
...
# create the worker threads
for i in range(5):
    # start a new thread to perform some work
    worker = Thread(target=task, args=(barrier, i))
    worker.start()
```
Finally, the main thread can wait on the barrier for all threads to perform their calculation.
```python
...
# wait for all threads to finish
print('Main thread waiting on all results...')
barrier.wait()
```
Once the threads are finished, the barrier will be lifted and the worker threads will exit and the main thread will report a message.
```python
...
# report once all threads are done
print('All threads have their result')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of using a barrier
from time import sleep
from random import random
from threading import Thread
from threading import Barrier

# target function to prepare some work
def task(barrier, number):
    # generate a unique value
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Thread {number} done, got: {value}')
    # wait on all other threads to complete
    barrier.wait()

# create a barrier
barrier = Barrier(5 + 1)
# create the worker threads
for i in range(5):
    # start a new thread to perform some work
    worker = Thread(target=task, args=(barrier, i))
    worker.start()
# wait for all threads to finish
print('Main thread waiting on all results...')
barrier.wait()
# report once all threads are done
print('All threads have their result')
```
Running the example first creates the barrier then creates and starts the worker threads.
Each worker thread performs its calculation and then waits on the barrier for all other threads to finish.
Finally, the threads finish and are all released, including the main thread, reporting a final message.
Note, your specific results will differ given the use of random numbers. Try running the example a few times.
```
Main thread waiting on all results...
Thread 0 done, got: 0.029404047464883787
Thread 2 done, got: 0.5464335927441033
Thread 1 done, got: 4.99178858357096
Thread 4 done, got: 6.821886004822457
Thread 3 done, got: 7.868471944670812
All threads have their result
```
Next, let’s explore how we might use the barrier with a timeout.
Example of a Barrier with a Timeout
We can use the barrier with a timeout.
There are two ways to set a timeout: via the constructor of the threading.Barrier instance, which sets the default used by all calls to wait(), or directly in each call to wait().
In this case, we will use a timeout argument in the wait() function.
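For comparison, both ways of setting a timeout can be sketched with a two-party barrier that only one thread ever reaches, so every wait is certain to time out. The specific durations and the default_elapsed and call_elapsed variables are assumptions for the demonstration. A timeout passed to wait() is used in preference to the one supplied to the constructor.

```python
import threading
from time import perf_counter

# default timeout of 0.2 seconds for every wait() that sets no timeout of its own
barrier = threading.Barrier(2, timeout=0.2)

start = perf_counter()
try:
    barrier.wait()                  # uses the default timeout from the constructor
except threading.BrokenBarrierError:
    default_elapsed = perf_counter() - start
    print(f'Default timeout expired after about {default_elapsed:.1f}s')

barrier.reset()                     # clear the broken state before reusing

start = perf_counter()
try:
    barrier.wait(timeout=0.5)       # per-call timeout overrides the default
except threading.BrokenBarrierError:
    call_elapsed = perf_counter() - start
    print(f'Per-call timeout expired after about {call_elapsed:.1f}s')
```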
The example from the previous section can be updated to make use of a timeout. Specifically, the main thread can wait a fixed number of seconds for all threads to finish. If all threads finish within the time, all is well, otherwise we can report that not all work could be finished on time.
When using a timeout, all calls to wait() should handle a potential BrokenBarrierError that could be raised.
First, we must update the task() function to handle the possible error.
```python
# target function to prepare some work
def task(barrier, number):
    # generate a unique value
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Thread {number} done, got: {value}')
    # wait on all other threads to complete
    try:
        barrier.wait()
    except BrokenBarrierError:
        pass
```
Next, we can update the main thread to set a timeout of 5 seconds on the wait for all worker threads.
Again, this must be protected with a try-except structure.
```python
...
try:
    barrier.wait(timeout=5)
    print('All threads have their result')
except BrokenBarrierError:
    print('Some threads did not finish on time...')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of using a barrier with a timeout
from time import sleep
from random import random
from threading import Thread
from threading import Barrier
from threading import BrokenBarrierError

# target function to prepare some work
def task(barrier, number):
    # generate a unique value
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Thread {number} done, got: {value}')
    # wait on all other threads to complete
    try:
        barrier.wait()
    except BrokenBarrierError:
        pass

# create a barrier
barrier = Barrier(5 + 1)
# create the worker threads
for i in range(5):
    # start a new thread to perform some work
    worker = Thread(target=task, args=(barrier, i))
    worker.start()
# wait for all threads to finish
print('Main thread waiting on all results...')
try:
    barrier.wait(timeout=5)
    print('All threads have their result')
except BrokenBarrierError:
    print('Some threads did not finish on time...')
```
Running the example creates the barrier and starts all worker threads as before.
In this case, the main thread is impatient and will only wait 5 seconds for all worker threads to complete. Some may take longer, and this will differ with each run of the code.
On this specific run, the timeout is reached and the barrier is broken. All waiting worker threads raise a BrokenBarrierError, which is ignored and the threads terminate. All worker threads not yet at the barrier will reach the barrier, raise a BrokenBarrierError and terminate.
The main thread raises a BrokenBarrierError in the call to wait() and reports that some worker threads did not finish within the timeout.
Note, your specific results will differ given the use of random numbers. Try running the example a few times.
```
Main thread waiting on all results...
Thread 4 done, got: 0.1367718061220924
Thread 0 done, got: 0.5802960386396927
Thread 3 done, got: 0.9069710246433671
Thread 2 done, got: 1.8471350353109695
Some threads did not finish on time...
Thread 1 done, got: 5.525649829682911
```
Next, let’s look at how we might use a barrier with an action.
Example of a Barrier with an Action
We can trigger an action once all parties reach the barrier.
This can be achieved by setting the “action” argument to a callable in the threading.Barrier constructor.
The callable could be a lambda or a function with no arguments.
In this case, we can update our first example to create a new function that reports that all worker threads have finished their calculation.
First, we can define our new action function.
```python
# action once all threads reach the barrier
def report():
    # report once all threads are done
    print('All threads have their result')
```
Next, we can configure the new barrier instance to call the report() function once all parties reach the barrier.
Because the main thread no longer needs to wait on the barrier, we can reduce the number of expected parties to reach the barrier to five, to match the five worker threads.
```python
...
# create a barrier
barrier = Barrier(5, action=report)
```
And that’s about it. The main thread now does little after starting the worker threads; the program simply stays alive until the worker threads complete.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of using a barrier with an action
from time import sleep
from random import random
from threading import Thread
from threading import Barrier

# action once all threads reach the barrier
def report():
    # report once all threads are done
    print('All threads have their result')

# target function to prepare some work
def task(barrier, number):
    # generate a unique value
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Thread {number} done, got: {value}')
    # wait on all other threads to complete
    barrier.wait()

# create a barrier
barrier = Barrier(5, action=report)
# create the worker threads
for i in range(5):
    # start a new thread to perform some work
    worker = Thread(target=task, args=(barrier, i))
    worker.start()
# wait for all threads to finish...
```
Running the example creates the barrier with the configured action.
The five worker threads are created and started, performing their calculation and reaching the barrier.
Once all threads reach the barrier, the barrier ensures that the action is triggered by one of the worker threads, calling our configured report() function once.
This is a cleaner solution (less code) for having an action performed once all threads have reached the barrier, compared to attempting the same thing in the main thread in the first example.
```
Thread 4 done, got: 0.13262406779643077
Thread 1 done, got: 2.4520892805134773
Thread 3 done, got: 6.359683121050146
Thread 0 done, got: 6.514003577189086
Thread 2 done, got: 7.654282923005564
All threads have their result
```
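To confirm the ordering described above, that the action runs exactly once and before any waiting thread is released, we can log events to a shared list. This is a small sketch with a lambda as the action; the events list and worker() function are hypothetical names for illustration.

```python
import threading

events = []                             # ordered log of what happened

# the action is a no-argument callable, here a lambda
barrier = threading.Barrier(3, action=lambda: events.append('action'))


def worker(number):
    barrier.wait()
    events.append(f'released {number}')


threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(events[0], events.count('action'))  # action 1
```

The order of the three "released" entries varies between runs, but the action entry always comes first.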
Next, let’s take a look at how we might abort a barrier.
Example of Aborting a Barrier
We might want to abort the coordination of threads on the barrier for some reason.
This might be because one of the threads is unable to perform its required task.
We can abort the barrier by calling the abort() function. This will cause all threads waiting on the barrier to raise a BrokenBarrierError and all new callers to wait() to raise the same error.
This means that all calls to wait() should be protected by a try-except structure.
We can update our first example of using the barrier to support aborting the barrier. We will add a check in the task function: if a value larger than 8 is generated, the thread aborts the coordination effort, otherwise it proceeds to wait on the barrier. This means sometimes all threads will coordinate and sometimes not, depending on the specific random numbers generated.
First, we can update the task() function to check the generated result and to abort the task if needed, otherwise wait on the barrier.
```python
...
# check if the result was "bad"
if value > 8:
    print(f'Thread {number} aborting...')
    barrier.abort()
else:
    # wait on all other threads to complete
    try:
        barrier.wait()
    except BrokenBarrierError:
        pass
```
The updated version of the task() function with this change is listed below.
```python
# target function to prepare some work
def task(barrier, number):
    # generate a unique value
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Thread {number} done, got: {value}')
    # check if the result was "bad"
    if value > 8:
        print(f'Thread {number} aborting...')
        barrier.abort()
    else:
        # wait on all other threads to complete
        try:
            barrier.wait()
        except BrokenBarrierError:
            pass
```
Next, we can update the main thread to protect the call to wait() and if a BrokenBarrierError is encountered, to report that at least one thread aborted the coordination.
```python
...
try:
    barrier.wait()
    print('All threads have their result')
except BrokenBarrierError:
    print('At least one thread aborted due to bad results.')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of aborting a barrier
from time import sleep
from random import random
from threading import Thread
from threading import Barrier
from threading import BrokenBarrierError

# target function to prepare some work
def task(barrier, number):
    # generate a unique value
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Thread {number} done, got: {value}')
    # check if the result was "bad"
    if value > 8:
        print(f'Thread {number} aborting...')
        barrier.abort()
    else:
        # wait on all other threads to complete
        try:
            barrier.wait()
        except BrokenBarrierError:
            pass

# create a barrier
barrier = Barrier(5 + 1)
# create the worker threads
for i in range(5):
    # start a new thread to perform some work
    worker = Thread(target=task, args=(barrier, i))
    worker.start()
# wait for all threads to finish
print('Main thread waiting on all results...')
try:
    barrier.wait()
    print('All threads have their result')
except BrokenBarrierError:
    print('At least one thread aborted due to bad results.')
```
Running the example creates the barrier, then creates and starts the worker threads.
Each thread performs its processing and conditionally tries to abort or wait on the barrier, depending on the specific random number that was generated.
In this case, four of the five numbers were good, but the fifth resulted in the barrier being aborted and a message in the main thread reporting this fact.
```
Main thread waiting on all results...
Thread 1 done, got: 1.3247704495942991
Thread 0 done, got: 2.0519554057841036
Thread 3 done, got: 6.191573105008992
Thread 2 done, got: 7.568670532887958
Thread 4 done, got: 9.483464923379831
Thread 4 aborting...
At least one thread aborted due to bad results.
```
Further Reading
This section provides additional resources that you may find helpful.
Python Threading Books
- Python Threading Jump-Start, Jason Brownlee (my book!)
- Threading API Interview Questions
- Threading Module API Cheat Sheet
I also recommend specific chapters in the following books:
- Python Cookbook, David Beazley and Brian Jones, 2013.
- See: Chapter 12: Concurrency
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Threading: The Complete Guide
- Python ThreadPoolExecutor: The Complete Guide
- Python ThreadPool: The Complete Guide
Takeaways
You now know how to use thread barriers in Python via the threading.Barrier class.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.