Last Updated on September 12, 2022
You can use a barrier with processes via the multiprocessing.Barrier class.
In this tutorial you will discover how to coordinate processes with barriers in Python.
Let’s get started.
Need for a Process Barrier
A process is a running instance of a computer program.
Every Python program is executed in a process, which is a new instance of the Python interpreter. This process is named MainProcess and has one thread, named MainThread, that executes the program instructions. Both processes and threads are created and managed by the underlying operating system.
Sometimes we may need to create new child processes in our program in order to execute code concurrently.
Python provides the ability to create and manage new processes via the multiprocessing.Process class.
In some concurrent programs we may need to coordinate action between two or more processes.
One way to achieve coordination is to have processes reach and wait on a barrier until all processes arrive, after which an action can be performed.
What is a barrier and how can we use it for coordination between processes in Python?
What is a Barrier
A barrier is a synchronization primitive.
It allows multiple processes (or threads) to wait on the same barrier object instance (e.g. at the same point in code) until a predefined fixed number of processes arrive (e.g. the barrier is full), after which all processes are then notified and released to continue their execution.
Internally, a barrier maintains a count of the processes currently waiting on it and the configured number of parties (processes) that are expected. Once the count reaches the configured number of parties, all waiting processes are notified and released.
This provides a useful mechanism to coordinate actions between multiple processes.
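The counting idea described above can be sketched with a shared counter and a condition variable. The following is a minimal, hypothetical one-shot barrier built from multiprocessing.Value and multiprocessing.Condition; the names OneShotBarrier and worker() are illustrative assumptions, and this is not how multiprocessing.Barrier is actually implemented. It deliberately omits timeouts, actions, abort() and reuse.

```python
from multiprocessing import Condition, Process, Value


class OneShotBarrier:
    """Hypothetical single-use barrier: no timeout, action, abort or reset."""

    def __init__(self, parties):
        # configured number of parties that must arrive
        self.parties = parties
        # shared count of parties that have arrived so far
        self.arrived = Value('i', 0)
        # condition used to block early arrivals and wake them later
        self.condition = Condition()

    def wait(self):
        with self.condition:
            self.arrived.value += 1
            if self.arrived.value >= self.parties:
                # the last party to arrive notifies everyone waiting
                self.condition.notify_all()
            else:
                # block until the count reaches the configured number of parties
                self.condition.wait_for(
                    lambda: self.arrived.value >= self.parties)


# hypothetical worker used to exercise the sketch
def worker(barrier, number):
    print(f'Process {number} waiting', flush=True)
    barrier.wait()
    print(f'Process {number} released', flush=True)


if __name__ == '__main__':
    barrier = OneShotBarrier(3)
    processes = [Process(target=worker, args=(barrier, i)) for i in range(3)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
```

Every "released" message appears only after all three "waiting" messages, because no process can leave wait() until the shared count reaches the configured number of parties.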
Now that we know what a barrier is, let’s look at how we might use it in Python.
How to Use a Barrier
Python provides a barrier via the multiprocessing.Barrier class.
A barrier instance must first be created and configured via the constructor specifying the number of parties (processes) that must arrive before the barrier will be lifted.
For example:
```python
...
# create a barrier
barrier = multiprocessing.Barrier(10)
```
We can also perform an action once all processes reach the barrier which can be specified via the “action” argument in the constructor.
This action must be a callable such as a function or a lambda that does not take any arguments and will be executed by one process once all processes reach the barrier but before the processes are released.
```python
...
# configure a barrier with an action
barrier = multiprocessing.Barrier(10, action=my_function)
```
We can also set a default timeout used by all processes that reach the barrier and call the wait() function.
The default timeout can be set via the “timeout” argument in seconds in the constructor.
```python
...
# configure a barrier with a default timeout
barrier = multiprocessing.Barrier(10, timeout=5)
```
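A minimal, single-process sketch of the default timeout, assuming a barrier of two parties where only the main process ever arrives. The call to wait() gives up after the timeout configured in the constructor, raises a BrokenBarrierError (imported from the threading module), and leaves the barrier broken. The function name wait_alone() and the 0.5 second value are illustrative choices.

```python
from multiprocessing import Barrier
from threading import BrokenBarrierError
from time import monotonic


def wait_alone(timeout):
    # two parties are expected, but only this process will ever arrive
    barrier = Barrier(2, timeout=timeout)
    start = monotonic()
    try:
        # no timeout argument here, so the constructor's default is used
        barrier.wait()
    except BrokenBarrierError:
        print(f'Gave up after {monotonic() - start:.1f} seconds')
    return barrier


if __name__ == '__main__':
    barrier = wait_alone(0.5)
    print(barrier.broken)  # True
```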
Once configured, the barrier instance can be shared between processes and used.
A process can reach and wait on the barrier via the wait() function, for example:
```python
...
# wait on the barrier for all other processes to arrive
barrier.wait()
```
This is a blocking call and will return once all other processes (the pre-configured number of parties) have reached the barrier.
The wait() function returns an integer between 0 and parties - 1 that is different for each party. It does not count the parties still to arrive: in CPython the value reflects arrival order, so 0 goes to the first party to reach the barrier, not the last. Because exactly one party receives 0, the return value is a simple way to have a single process perform an action after the barrier is released, an alternative to using the “action” argument in the constructor.
```python
...
# wait on the barrier, keeping the returned index
index = barrier.wait()
# after release, exactly one party received index 0
if index == 0:
    print('I was selected...')
```
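To see what wait() actually returns, here is a small sketch, assuming four hypothetical worker processes that send the value returned by wait() back to the main process over a multiprocessing.Queue. The names task() and collect_indices() are illustrative. The collected values form the set 0 to parties - 1, which shows that the return value identifies each party uniquely rather than counting the parties still to arrive.

```python
from multiprocessing import Barrier, Process, Queue


# hypothetical worker: report the index returned by wait()
def task(barrier, queue):
    index = barrier.wait()
    queue.put(index)


def collect_indices(parties):
    barrier = Barrier(parties)
    queue = Queue()
    processes = [Process(target=task, args=(barrier, queue))
                 for _ in range(parties)]
    for process in processes:
        process.start()
    # read results before joining so no process blocks on a full queue
    indices = [queue.get() for _ in range(parties)]
    for process in processes:
        process.join()
    return sorted(indices)


if __name__ == '__main__':
    print(collect_indices(4))  # [0, 1, 2, 3]
```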
A timeout can be set on the call to wait() in seconds via the “timeout” argument. If the timeout expires before all parties reach the barrier, a BrokenBarrierError will be raised in all processes waiting on the barrier and the barrier will be marked as broken.
If a timeout is used via the “timeout” argument or the default timeout in the constructor, then all calls to the wait() function may need to handle the BrokenBarrierError.
```python
from threading import BrokenBarrierError
...
# wait on the barrier for all other processes to arrive
try:
    barrier.wait()
except BrokenBarrierError:
    pass  # handle the broken barrier here
```
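The claim that a timeout breaks the barrier for every waiting process can be checked with a short sketch. It assumes a barrier of three parties where only two ever arrive: a hypothetical patient() child process waits with no timeout, while the main process waits with a one second timeout. When the main process gives up, the child's wait() also raises a BrokenBarrierError.

```python
from multiprocessing import Barrier, Process
from threading import BrokenBarrierError


# hypothetical worker: waits with no timeout of its own
def patient(barrier):
    try:
        barrier.wait()
        print('Patient process released normally', flush=True)
        return True
    except BrokenBarrierError:
        print('Patient process saw a broken barrier', flush=True)
        return False


if __name__ == '__main__':
    # three parties are expected, but only two will ever arrive
    barrier = Barrier(3)
    process = Process(target=patient, args=(barrier,))
    process.start()
    try:
        # the main process gives up after one second
        barrier.wait(timeout=1)
    except BrokenBarrierError:
        print('Main process timed out', flush=True)
    process.join()
    print(f'Broken: {barrier.broken}')
```

If the child happens to arrive after the timeout has already fired, its wait() raises immediately because the barrier is already broken, so it reports a broken barrier either way.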
We can also abort the barrier.
Aborting the barrier means that all processes waiting on the barrier via the wait() function will raise a BrokenBarrierError and the barrier will be put in the broken state.
This might be helpful if you wish to cancel the coordination effort.
```python
...
# abort the barrier
barrier.abort()
```
A broken barrier cannot be used. Calls to wait() will raise a BrokenBarrierError.
A barrier can be fixed and made ready for use again by calling the reset() function. Any processes still waiting on the barrier when reset() is called will raise a BrokenBarrierError.
This might be helpful if you cancel a coordination effort but wish to retry it with the same barrier instance.
```python
...
# reset a broken barrier
barrier.reset()
```
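The abort and reset life cycle can be traced in a single process. This sketch assumes a barrier with one party, so that a successful wait() returns immediately; the helper try_wait() is a hypothetical name that uses a short timeout so the example never blocks for long.

```python
from multiprocessing import Barrier
from threading import BrokenBarrierError


def try_wait(barrier):
    # wait with a short timeout so the sketch never blocks for long
    try:
        barrier.wait(timeout=0.1)
        return 'released'
    except BrokenBarrierError:
        return 'broken'


if __name__ == '__main__':
    barrier = Barrier(1)       # a single party trips the barrier at once
    print(try_wait(barrier))   # released
    barrier.abort()
    print(barrier.broken)      # True
    print(try_wait(barrier))   # broken: wait() raises immediately
    barrier.reset()
    print(barrier.broken)      # False
    print(try_wait(barrier))   # released
```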
Finally, the status of the barrier can be checked via attributes.
- parties: reports the configured number of parties that must reach the barrier for it to be lifted.
- n_waiting: reports the current number of processes waiting on the barrier.
- broken: reports whether the barrier is currently broken.
A barrier can also be used to coordinate threads, via the threading.Barrier class.
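The three attributes above can be observed while a child process is blocked on the barrier. This is a sketch under stated assumptions: the worker task() and the polling helper wait_for_waiters() are hypothetical names, and polling n_waiting is used only for illustration, since the value is a momentary snapshot.

```python
from multiprocessing import Barrier, Process
from time import monotonic, sleep


# hypothetical worker that simply waits on the barrier
def task(barrier):
    barrier.wait()


def wait_for_waiters(barrier, expected, timeout=5.0):
    # poll n_waiting until the expected number of parties are blocked
    deadline = monotonic() + timeout
    while barrier.n_waiting < expected and monotonic() < deadline:
        sleep(0.01)
    return barrier.n_waiting


if __name__ == '__main__':
    barrier = Barrier(2)
    print(barrier.parties, barrier.n_waiting, barrier.broken)  # 2 0 False
    process = Process(target=task, args=(barrier,))
    process.start()
    # normally 1 once the child is blocked on the barrier
    print(wait_for_waiters(barrier, 1))
    # the main process is the second party, which lifts the barrier
    barrier.wait()
    process.join()
    print(barrier.n_waiting)  # 0
```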
Now that we know how to use the barrier in Python, let’s look at some worked examples.
Example of Using a Process Barrier
We can explore how to use a multiprocessing.Barrier with a worked example.
In this example we will create a suite of processes, each required to perform some blocking calculation. We will use a barrier to coordinate all processes after they have finished their work and perform some action in the main process. This is a good proxy for the types of coordination we may need to perform with a barrier.
First, let’s define a target task function to be executed by each process. The function will take the barrier as an argument, as well as a unique identifier for the process.
The process will generate a random value between 0 and 10, block for that many seconds, report the result, then wait on the barrier for all other processes to perform their computation.
The complete target task function is listed below.
```python
# target function to prepare some work
def task(barrier, number):
    # generate a random value between 0 and 10
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Process {number} done, got: {value}', flush=True)
    # wait on all other processes to complete
    barrier.wait()
```
Next in the main process we can create the barrier.
We need one party for each process we intend to create, five in this case, as well as an additional party for the main process, which will also wait for all processes to reach the barrier.
```python
...
# create a barrier
barrier = Barrier(5 + 1)
```
Next, we can create and start our five child processes executing our target task() function.
```python
...
# create the worker processes
for i in range(5):
    # start a new process to perform some work
    worker = Process(target=task, args=(barrier, i))
    worker.start()
```
Finally, the main process can wait on the barrier for all processes to perform their calculation.
```python
...
# wait for all processes to finish
print('Main process waiting on all results...')
barrier.wait()
```
Once all worker processes have reached the barrier, it is lifted: the worker processes exit and the main process reports a message.
```python
...
# report once all processes are done
print('All processes have their result')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of using a barrier with processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Barrier

# target function to prepare some work
def task(barrier, number):
    # generate a random value between 0 and 10
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Process {number} done, got: {value}', flush=True)
    # wait on all other processes to complete
    barrier.wait()

# entry point
if __name__ == '__main__':
    # create a barrier
    barrier = Barrier(5 + 1)
    # create the worker processes
    for i in range(5):
        # start a new process to perform some work
        worker = Process(target=task, args=(barrier, i))
        worker.start()
    # wait for all processes to finish
    print('Main process waiting on all results...')
    barrier.wait()
    # report once all processes are done
    print('All processes have their result')
```
Running the example first creates the barrier then creates and starts the worker processes.
Each worker process performs its calculation and then waits on the barrier for all other processes to finish.
Once all five workers have reached the barrier, every party is released, including the main process, which reports a final message.
Note, your specific results will differ given the use of random numbers. Try running the example a few times.
```
Main process waiting on all results...
Process 3 done, got: 0.19554467220314398
Process 4 done, got: 0.345718981628913
Process 2 done, got: 2.37158455232798
Process 1 done, got: 5.970308025592141
Process 0 done, got: 7.102904442921531
All processes have their result
```
Next, let’s explore how we might use the barrier with a timeout.
Example of a Barrier with a Timeout
We can use the barrier with a timeout.
There are two ways to set a timeout: via the constructor of the multiprocessing.Barrier instance, which is used by default in all calls to wait(), or directly in each call to wait().
In this case, we will use a timeout argument in the wait() function.
The example from the previous section can be updated to make use of a timeout. Specifically, the main process can wait a fixed number of seconds for all processes to finish. If all processes finish within the time, all is well, otherwise we can report that not all work could be finished on time.
When using a timeout, all calls to wait() should handle a potential BrokenBarrierError that could be raised. Note, the BrokenBarrierError is provided by the threading module, not the multiprocessing module.
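A quick check of where the exception comes from, assuming CPython: the barrier returned by multiprocessing.Barrier is built on threading.Barrier, and breaking it raises the very same threading.BrokenBarrierError class. The helper error_raised_by_broken_barrier() is a hypothetical name used for illustration.

```python
import threading
from multiprocessing import Barrier


def error_raised_by_broken_barrier():
    # break a fresh barrier so that wait() raises immediately
    barrier = Barrier(2)
    barrier.abort()
    try:
        barrier.wait()
    except Exception as error:
        return type(error)
    return None


if __name__ == '__main__':
    error_class = error_raised_by_broken_barrier()
    print(error_class is threading.BrokenBarrierError)  # True
    print(isinstance(Barrier(2), threading.Barrier))    # True in CPython
```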
First, we must update the task() function to handle the possible error.
```python
# target function to prepare some work
def task(barrier, number):
    # generate a random value between 0 and 10
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Process {number} done, got: {value}', flush=True)
    # wait on all other processes to complete
    try:
        barrier.wait()
    except BrokenBarrierError:
        pass
```
Next, we can update the main process to set a timeout of 5 seconds on the wait for all child processes.
Again, this must be protected with a try-except pattern.
```python
...
try:
    barrier.wait(timeout=5)
    print('All processes have their result')
except BrokenBarrierError:
    print('Some processes did not finish on time...')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of using a barrier for processes with a timeout
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Barrier
from threading import BrokenBarrierError

# target function to prepare some work
def task(barrier, number):
    # generate a random value between 0 and 10
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Process {number} done, got: {value}', flush=True)
    # wait on all other processes to complete
    try:
        barrier.wait()
    except BrokenBarrierError:
        pass

# entry point
if __name__ == '__main__':
    # create a barrier
    barrier = Barrier(5 + 1)
    # create the worker processes
    for i in range(5):
        # start a new process to perform some work
        worker = Process(target=task, args=(barrier, i))
        worker.start()
    # wait for all processes to finish
    print('Main process waiting on all results...')
    try:
        barrier.wait(timeout=5)
        print('All processes have their result')
    except BrokenBarrierError:
        print('Some processes did not finish on time...')
```
Running the example creates the barrier and starts all child processes as before.
In this case, the main process is impatient and will only wait 5 seconds for all child processes to complete. Some may take longer, and this will differ with each run of the code.
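How often the timeout fires can be estimated with a little arithmetic. Assuming each worker sleeps random() * 10 seconds, uniform on [0, 10), and ignoring process start-up time, each worker reaches the barrier within 5 seconds with probability 0.5, so all five make it only with probability 0.5 ** 5. This is a rough estimate, not a measurement.

```python
# assumption: each worker sleeps random() * 10 seconds, uniform on [0, 10),
# and process start-up time is ignored
workers = 5
timeout = 5.0
max_sleep = 10.0

# probability that one worker reaches the barrier within the timeout
p_one = timeout / max_sleep
# probability that all workers make it (the draws are independent)
p_all = p_one ** workers

print(f'P(all reach the barrier in time) = {p_all:.3f}')      # 0.031
print(f'P(the barrier is broken)         = {1 - p_all:.3f}')  # 0.969
```

In other words, roughly 31 runs out of 32 should be expected to break the barrier.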
On this specific run, the timeout is reached and the barrier is broken. All waiting child processes raise a BrokenBarrierError, which is ignored and the processes terminate. All child processes not yet at the barrier will reach the barrier, raise a BrokenBarrierError and terminate.
The main process raises a BrokenBarrierError in the call to wait() and reports that some child processes did not finish within the timeout.
Note, your specific results will differ given the use of random numbers. Try running the example a few times.
```
Main process waiting on all results...
Process 3 done, got: 0.9150896527633812
Process 0 done, got: 0.9839288078418307
Process 4 done, got: 1.0007833215409345
Some processes did not finish on time...
Process 1 done, got: 7.019920656936832
Process 2 done, got: 7.7056666005574765
```
Next, let’s look at how we might use a barrier with an action.
Example of a Barrier with an Action
We can trigger an action once all parties reach the barrier.
This can be achieved by setting the “action” argument to a callable in the multiprocessing.Barrier constructor.
The callable could be a lambda or a function with no arguments.
In this case, we can update our first example to create a new function that reports that all worker processes have finished their calculation.
First, we can define our new action function.
```python
# action once all processes reach the barrier
def report():
    # report once all processes are done
    print('All processes have their result', flush=True)
```
Next, we can configure the new barrier instance to call the report() function once all parties reach the barrier.
Because the main process no longer needs to wait on the barrier, we can reduce the number of expected parties to reach the barrier to five, to match the five child processes.
```python
...
# create a barrier
barrier = Barrier(5, action=report)
```
And that’s about it. The main process does little after starting the child processes other than wait for them to complete.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of using a barrier with an action for processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Barrier

# action once all processes reach the barrier
def report():
    # report once all processes are done
    print('All processes have their result', flush=True)

# target function to prepare some work
def task(barrier, number):
    # generate a random value between 0 and 10
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Process {number} done, got: {value}', flush=True)
    # wait on all other processes to complete
    barrier.wait()

# entry point
if __name__ == '__main__':
    # create a barrier
    barrier = Barrier(5, action=report)
    # create the worker processes
    processes = [Process(target=task, args=(barrier, i)) for i in range(5)]
    # start all processes
    for process in processes:
        process.start()
    # wait for all processes to finish
    for process in processes:
        process.join()
```
Running the example creates the barrier with the configured action.
The five child processes are created and started, performing their calculation and reaching the barrier.
Once all processes reach the barrier, the barrier ensures that the action is triggered by one of the child processes, calling our configured report() function once.
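To see which child process runs the action, the action can report multiprocessing.current_process().name. This sketch assumes three hypothetical workers that only wait on the barrier; the name printed will vary between runs. In CPython's implementation the action is called by the last party to arrive, just before all parties are released.

```python
from multiprocessing import Barrier, Process, current_process


# action: report which process is executing it
def report():
    print(f'Action executed in {current_process().name}', flush=True)


# hypothetical worker that only waits on the barrier
def task(barrier):
    barrier.wait()


if __name__ == '__main__':
    barrier = Barrier(3, action=report)
    processes = [Process(target=task, args=(barrier,)) for _ in range(3)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
```

Exactly one line is printed per barrier cycle, naming one of the child processes rather than the main process.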
This is a cleaner solution (less code) for having an action performed after the barrier is lifted, compared to attempting the same thing in the main process in the first example above.
```
Process 2 done, got: 3.344926296473795
Process 0 done, got: 3.8960887458034623
Process 4 done, got: 3.9533264013923763
Process 1 done, got: 7.645186958429663
Process 3 done, got: 9.477151114013973
All processes have their result
```
Next, let’s take a look at how we might abort a barrier.
Example of Aborting a Barrier
We might want to abort the coordination of processes on the barrier for some reason.
This might be because one of the processes is unable to perform its required task.
We can abort the barrier by calling the abort() function. This will cause all processes waiting on the barrier to raise a BrokenBarrierError, and all new callers of wait() to raise the same error.
This means that all calls to wait() should be protected by a try-except structure.
We can update our first example of using the barrier to support aborting the barrier. We will add a check in the task() function: if a value larger than 8 is generated, the process aborts the coordination effort; otherwise it proceeds to wait on the barrier. This means sometimes all processes will coordinate and sometimes not, depending on the specific random numbers generated.
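How often "sometimes" is can be worked out directly. Assuming each of the five workers draws random() * 10, uniform on [0, 10), a single draw is at most 8 with probability 0.8, and the barrier is aborted whenever at least one of the independent draws exceeds 8.

```python
# assumption: each worker draws random() * 10, uniform on [0, 10)
workers = 5
threshold = 8.0
max_value = 10.0

# probability that a single draw is not above the threshold
p_good = threshold / max_value
# the barrier is aborted if at least one independent draw exceeds 8
p_abort = 1 - p_good ** workers

print(f'P(barrier aborted) = {p_abort:.3f}')  # 0.672
```

So roughly two runs in three should be expected to abort.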
First, we can update the task() function to check the generated result and to abort the task if needed, otherwise wait on the barrier.
```python
...
# check if the result was "bad"
if value > 8:
    print(f'Process {number} aborting...', flush=True)
    barrier.abort()
else:
    # wait on all other processes to complete
    try:
        barrier.wait()
    except BrokenBarrierError:
        pass
```
The updated version of the task() function with this change is listed below.
```python
# target function to prepare some work
def task(barrier, number):
    # generate a random value between 0 and 10
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Process {number} done, got: {value}', flush=True)
    # check if the result was "bad"
    if value > 8:
        print(f'Process {number} aborting...', flush=True)
        barrier.abort()
    else:
        # wait on all other processes to complete
        try:
            barrier.wait()
        except BrokenBarrierError:
            pass
```
Next, we can update the main process to protect the call to wait() and if a BrokenBarrierError is encountered, to report that at least one process aborted the coordination.
```python
...
try:
    barrier.wait()
    print('All processes have their result')
except BrokenBarrierError:
    print('At least one process aborted due to bad results.')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of aborting a barrier for processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Barrier
from threading import BrokenBarrierError

# target function to prepare some work
def task(barrier, number):
    # generate a random value between 0 and 10
    value = random() * 10
    # block for a moment
    sleep(value)
    # report result
    print(f'Process {number} done, got: {value}', flush=True)
    # check if the result was "bad"
    if value > 8:
        print(f'Process {number} aborting...', flush=True)
        barrier.abort()
    else:
        # wait on all other processes to complete
        try:
            barrier.wait()
        except BrokenBarrierError:
            pass

# entry point
if __name__ == '__main__':
    # create a barrier
    barrier = Barrier(5 + 1)
    # create the worker processes
    for i in range(5):
        # start a new process to perform some work
        worker = Process(target=task, args=(barrier, i))
        worker.start()
    # wait for all processes to finish
    print('Main process waiting on all results...')
    try:
        barrier.wait()
        print('All processes have their result')
    except BrokenBarrierError:
        print('At least one process aborted due to bad results.')
```
Running the example creates the barrier, then creates and starts the child processes.
Each process performs its work and conditionally tries to abort or wait on the barrier, depending on the specific random number that was generated.
In this case, four of the five numbers were good, but the fifth resulted in the barrier being aborted and a message in the main process reporting this fact.
```
Main process waiting on all results...
Process 4 done, got: 2.062864720794101
Process 3 done, got: 2.4440527125499445
Process 0 done, got: 3.429395148713464
Process 2 done, got: 3.9549314961996718
Process 1 done, got: 8.702659226034747
Process 1 aborting...
At least one process aborted due to bad results.
```
Further Reading
This section provides additional resources that you may find helpful.
Python Multiprocessing Books
- Python Multiprocessing Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Multiprocessing API Cheat Sheet
I would also recommend specific chapters in the books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter 14: Threads and Processes
Guides
- Python Multiprocessing: The Complete Guide
- Python Multiprocessing Pool: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to use multiprocessing.Barrier to coordinate processes in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.