An asyncio barrier is a synchronization primitive, like a sempahore and a mutex lock.
It is used to coordinate the behavior of concurrent tasks at one point, on the barrier itself. Once all expected parties arrive at the barrier, the barrier is lifted and all waiting tasks can resume their activity.
In this tutorial, you will discover how to use the barrier in asyncio programs.
After completing this tutorial, you will know:
- What is a barrier and what role does it play in concurrent programming.
- How to use a barrier to coordinate the behavior of tasks in asyncio programs.
- How to use barriers with a final action and a timeout.
Let’s get started.
Need to Coordinate Coroutines
In asyncio programs, sometimes we need to coordinate the behavior of multiple coroutines.
If there is a coordinating task or coroutine, then the controller can wait for all tasks to be done using asyncio.gather() or asyncio.wait().
Nevertheless, there are situations where we don’t want to wait for the coroutines to be done, but instead, reach a point of coordination.
Alternatively, we may need to coordinate the coroutines without a controller that is aware of all coroutines and therefore is not able to gather or wait for them all.
How can we coordinate the behavior of many coroutines?
Run loops using all CPUs, download your FREE book to learn how.
What is a Barrier
A barrier is a synchronization primitive.
It is typically defined in terms of threads, although the same mechanism can be used with processes and coroutines.
It allows multiple threads to wait on the same barrier object instance (e.g. at the same point in code) until a predefined fixed number of threads arrive (e.g. the barrier is full), after which all threads are then notified and released to continue their execution.
A barrier for a group of threads or processes in the source code means any thread/process must stop at this point and cannot proceed until all other threads/processes reach this barrier.
— Barrier (computer science), Wikipedia.
Internally, a barrier maintains a count of the number of threads waiting on the barrier and a configured maximum number of parties (threads) that are expected. Once the expected number of parties reaches the pre-defined maximum, all waiting threads are notified.
This provides a useful mechanism to coordinate actions between multiple threads.
You can learn more about a barrier for threads in the tutorial:
You can learn more about a barrier for processes in the tutorial:
Now that we know what a barrier is, let’s look at how we might use it in asyncio programs.
How To Use the Asyncio Barrier
Python provides an asyncio barrier via the asyncio.Barrier class.
The asyncio barrier was introduced in Python 3.11 in 2023.
Added the Barrier class to the synchronization primitives in the asyncio library, and the related BrokenBarrierError exception.
— What’s New In Python 3.11
A barrier instance must first be created and configured via the constructor specifying the number of parties (coroutines) that must arrive before the barrier will be lifted.
For example:
1 2 3 |
... # create a barrier barrier = asyncio.Barrier(10) |
Once configured, the barrier instance can be shared between coroutines and used.
A barrier is a simple synchronization primitive that allows to block until parties number of tasks are waiting on it. Tasks can wait on the wait() method and would be blocked until the specified number of tasks end up waiting on wait(). At that point all of the waiting tasks would unblock simultaneously.
— Synchronization Primitives
A coroutine can reach and wait on the barrier via the wait() method which must be awaited, for example:
1 2 3 |
... # wait on all other tasks to complete await barrier.wait() |
This will suspend the caller until all parties reach and wait on the barrier.
The wait function returns an integer indicating the number of parties remaining to arrive at the barrier.
If a coroutine was the last coroutine to arrive, then the return value will be zero. This is helpful if you want the last coroutine or one coroutine to perform an action after the barrier is released.
For example:
1 2 3 4 5 6 7 |
... # wait on all other tasks to complete position = await barrier.wait() # perform action if last to leave the barrier if position == 0: # perform some action # ... |
Callers can also wait on the asyncio.Barrier via the async context interface.
This is helpful if some action needs to be performed after all coroutine reaches the barrier.
For example:
1 2 3 4 5 |
... # wait on all other tasks to complete async with barrier: # perform some action # ... |
We can also abort the barrier.
Aborting the barrier means that all coroutines waiting on the barrier via the wait() method will raise an asyncio.BrokenBarrierError and the barrier will be put in the broken state.
This might be helpful if you wish to cancel the coordination effort.
1 2 3 |
... # abort the barrier await barrier.abort() |
A broken barrier cannot be used. Calls to wait() will raise a BrokenBarrierError.
A barrier can be fixed and made ready for use again by calling the reset() function.
This might be helpful if you cancel a coordination effort although you wish to retry it again with the same barrier instance.
1 2 3 |
... # reset a broken barrier await barrier.reset() |
Finally, the status of the barrier can be checked via attributes.
- parties: reports the configured number of parties that must reach the barrier for it to be lifted.
- n_waiting: reports the current number of coroutines waiting on the barrier.
- broken: attribute indicates whether the barrier is currently broken or not.
Now that we know how to use the barrier in Python, let’s look at some worked examples.
Free Python Asyncio Course
Download your FREE Asyncio PDF cheat sheet and get BONUS access to my free 7-day crash course on the Asyncio API.
Discover how to use the Python asyncio module including how to define, create, and run new coroutines and how to use non-blocking I/O.
Example of Using an Asyncio Barrier
We can explore how to use an asyncio.Barrier with a worked example.
In this example, we will create a suite of coroutines, each required to perform some blocking calculation. We will use a barrier to coordinate all coroutines after they have finished their work and perform some action in the main coroutine. This is a good proxy for the types of coordination we may need to perform with a barrier.
First, let’s define a target task function to execute by each coroutine. The coroutine will take the barrier as an argument as well as a unique identifier for the task.
The task will generate a random value between 0 and 10, sleep for that many seconds, report the result, then wait on the barrier for all other tasks to perform their computation.
The work() coroutine below implements this.
1 2 3 4 5 6 7 8 9 10 |
# coroutine that prepares work and waits on a barrier async def work(barrier, number): # generate a unique value value = random() * 10 # suspend for a moment to simulate work await asyncio.sleep(value) # report result print(f'Task {number} done, got: {value}, waiting...') # wait on all other tasks to complete await barrier.wait() |
Next, in the main coroutine, we can create the barrier.
We need one party for each task we intend to create, five in this place, as well as an additional party for the main coroutine that will also wait for all coroutines to reach the barrier.
1 2 3 4 |
... # create a barrier n_tasks = 5 barrier = asyncio.Barrier(n_tasks + 1) |
Next, we can create and schedule five work() tasks to run in the background, providing the barrier to each.
1 2 3 |
... # issue all of the tasks _ = [asyncio.create_task(work(barrier, i)) for i in range(n_tasks)] |
The main coroutine can wait on the barrier for all tasks to perform their calculation.
We will use the context manager interface in this case.
Once the coroutines are finished, the barrier will be lifted and the worker coroutines will terminate and the main coroutine will report a message.
1 2 3 4 5 6 |
... # wait for all tasks to finish print('Main is waiting on all results...') async with barrier: # report once all tasks are done print('All tasks have their result') |
Finally, we can start the event loop and execute our main() coroutine.
1 2 3 |
... # start the event loop asyncio.run(main()) |
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# SuperFastPython.com # example of an asyncio barrier from random import random import asyncio # coroutine that prepares work and waits on a barrier async def work(barrier, number): # generate a unique value value = random() * 10 # suspend for a moment to simulate work await asyncio.sleep(value) # report result print(f'Task {number} done, got: {value}, waiting...') # wait on all other tasks to complete await barrier.wait() # main coroutine async def main(): # create a barrier n_tasks = 5 barrier = asyncio.Barrier(n_tasks + 1) # issue all of the tasks _ = [asyncio.create_task(work(barrier, i)) for i in range(n_tasks)] # wait for all tasks to finish print('Main is waiting on all results...') async with barrier: # report once all tasks are done print('All tasks have their result') # start the event loop asyncio.run(main()) |
Running the example first starts the event loop and runs the main() coroutine.
The main() coroutine runs and creates the barrier with 6 parties. It then creates and issues 5 tasks and prints a message and waits on the barrier.
The work() tasks run. Each generates a random number fraction of 10 and sleeps for that many seconds.
The work() tasks resume and report a message that includes their random value, then wait on the barrier.
Once all work() tasks make it to the barrier, the barrier is released.
The work() tasks resume and terminate as they have no further work to complete.
The main() coroutine resumes and reports a final message.
This highlights how we can coordinate multiple coroutines using a barrier.
1 2 3 4 5 6 7 |
Main is waiting on all results... Task 4 done, got: 2.873148965038761, waiting... Task 0 done, got: 6.587379562960236, waiting... Task 2 done, got: 7.210118403603204, waiting... Task 1 done, got: 9.314866458885717, waiting... Task 3 done, got: 9.40438957381003, waiting... All tasks have their result |
Next, let’s explore using the barrier with a timeout.
Overwhelmed by the python concurrency APIs?
Find relief, download my FREE Python Concurrency Mind Maps
Example of Using an Asyncio Barrier With Timeout and Abort
We can explore an example of using the barrier with a timeout.
In this case, the main() coroutine will wait on the barrier with a timeout of 5 seconds. If all tasks don’t make it to the barrier within the timeout then the barrier is aborted. This causes any coroutines on the barrier to stop waiting and any future coroutines that reach the barrier to not wait.
Firstly, we can update the work() coroutine to handle the case if the barrier is aborted.
If aborted, the barrier will raise a BrokenBarrierError exception. We can handle this exception when waiting on the barrier and report a message.
1 2 3 4 5 6 7 8 9 |
... # report result print(f'Task {number} done, got: {value}, waiting...') try: # wait on all other tasks to complete await barrier.wait() except asyncio.BrokenBarrierError: # report the task is no longer waiting print(f'Task {number} aborted waiting on the barrier') |
The updated work() coroutine with this change is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# coroutine that prepares work and waits on a barrier async def work(barrier, number): # generate a unique value value = random() * 10 # suspend for a moment to simulate work await asyncio.sleep(value) # report result print(f'Task {number} done, got: {value}, waiting...') try: # wait on all other tasks to complete await barrier.wait() except asyncio.BrokenBarrierError: # report the task is no longer waiting print(f'Task {number} aborted waiting on the barrier') |
Next, the main() coroutine needs to be updated to use a timeout.
We will use the asyncio.timeout() context manager.
This context manager will allow the body block of code to execute for a given number of seconds. If the body does not complete before the timeout, a TimeoutError exception is raised.
If raised, we will then await the abort() method on the barrier.
1 2 3 4 5 6 7 8 9 10 11 12 |
... print('Main is waiting on all results...') try: # wait on all other tasks to complete with a timeout async with asyncio.timeout(5): # wait on the barrier await barrier.wait() # report once all tasks are done print('All tasks have their result') except asyncio.TimeoutError: # abort the barrier print('Main is aborting the barrier') |
You can learn more about the asyncio.timeout() context manager in the tutorial:
The updated main() coroutine with this change is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# main coroutine async def main(): # create a barrier n_tasks = 5 barrier = asyncio.Barrier(n_tasks + 1) # issue all of the tasks _ = [asyncio.create_task(work(barrier, i)) for i in range(n_tasks)] # wait for all tasks to finish print('Main is waiting on all results...') try: # wait on all other tasks to complete with a timeout async with asyncio.timeout(5): # wait on the barrier await barrier.wait() # report once all tasks are done print('All tasks have their result') except asyncio.TimeoutError: # abort the barrier print('Main is aborting the barrier') await barrier.abort() # wait around await asyncio.sleep(5) |
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# SuperFastPython.com # example of an asyncio barrier with a timeout and abort from random import random import asyncio # coroutine that prepares work and waits on a barrier async def work(barrier, number): # generate a unique value value = random() * 10 # suspend for a moment to simulate work await asyncio.sleep(value) # report result print(f'Task {number} done, got: {value}, waiting...') try: # wait on all other tasks to complete await barrier.wait() except asyncio.BrokenBarrierError: # report the task is no longer waiting print(f'Task {number} aborted waiting on the barrier') # main coroutine async def main(): # create a barrier n_tasks = 5 barrier = asyncio.Barrier(n_tasks + 1) # issue all of the tasks _ = [asyncio.create_task(work(barrier, i)) for i in range(n_tasks)] # wait for all tasks to finish print('Main is waiting on all results...') try: # wait on all other tasks to complete with a timeout async with asyncio.timeout(5): # wait on the barrier await barrier.wait() # report once all tasks are done print('All tasks have their result') except asyncio.TimeoutError: # abort the barrier print('Main is aborting the barrier') await barrier.abort() # wait around await asyncio.sleep(5) # start the event loop asyncio.run(main()) |
Running the example first starts the event loop and runs the main() coroutine.
The main() coroutine runs and creates the barrier with 6 parties. It then creates and issues 5 tasks and prints a message.
The main() coroutine then opens the asyncio.timeout() context manager with a timeout of 5 seconds. Within the timeout context manager, it then suspends and awaits the barrier.
The work() tasks run. Each generates a random number fraction of 10 and sleeps for that many seconds.
Some work() tasks resume and report a message that includes their random value, then wait on the barrier.
After 5 seconds, not all tasks have reached the barrier. The timeout elapses and a TimeoutError is raised.
The main() coroutine handles the TimeoutError and reports a message before aborting the barrier. It then sleeps for 5 seconds, so that the program does not terminate early and we can see what happens to the tasks.
The work() tasks waiting on the barrier raise a BrokenBarrierError, which is handled, and a message is reported.
The remaining work() tasks complete, report their message, then wait on the barrier, immediately raising a BrokenBarrierError and reporting that the barrier was aborted.
This highlights how we can coordinate multiple coroutines using a barrier and abort the barrier after a timeout.
1 2 3 4 5 6 7 8 9 10 11 12 |
Main is waiting on all results... Task 2 done, got: 1.0488582339038333, waiting... Task 0 done, got: 1.1906875368978564, waiting... Main is aborting the barrier Task 2 aborted waiting on the barrier Task 0 aborted waiting on the barrier Task 3 done, got: 5.568018087287427, waiting... Task 3 aborted waiting on the barrier Task 1 done, got: 6.0924256738274885, waiting... Task 1 aborted waiting on the barrier Task 4 done, got: 6.577359157841062, waiting... Task 4 aborted waiting on the barrier |
Next, let’s explore executing a final action after the parties leave the barrier.
Example of Using an Asyncio Barrier With Final Action
We can explore the case of running a final action by the last party to leave the barrier after it is raised.
In this case, the main() coroutine will no longer wait on the barrier. The work() coroutines will check if they are the last party to leave the barrier and if so perform a special action.
Firstly, we can update the work() coroutine to use the context manager interface when waiting on the barrier.
Once all parties have arrived, each work() coroutine instance will check their position. If they have the position of zero, they are the last party to leave the barrier and report a special message.
1 2 3 4 5 6 7 |
... # wait on all other tasks to complete async with barrier as position: # check if this is the last task to leave if position == 0: # perform a special action print('This is a special action!') |
Note that we are able to get the position directly by the context manager interface on the barrier instance itself.
The updated work() coroutine with this change is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# coroutine that prepares work and waits on a barrier async def work(barrier, number): # generate a unique value value = random() * 10 # suspend for a moment to simulate work await asyncio.sleep(value) # report result print(f'Task {number} done, got: {value}, waiting...') # wait on all other tasks to complete async with barrier as position: # check if this is the last task to leave if position == 0: # perform a special action print('This is a special action!') |
We can then update the main() coroutine to no longer wait on the barrier.
It will wait for all tasks to be completed. This can be achieved by creating and scheduling the tasks via an asyncio.TaskGroup with a context manager interface. This will suspend until all issued tasks are done.
You can learn more about the asyncio.TaskGroup in the tutorial:
The updated main() coroutine with these changes is listed below.
1 2 3 4 5 6 7 8 9 |
# main coroutine async def main(): # create a barrier n_tasks = 5 barrier = asyncio.Barrier(n_tasks) # issue all of the tasks async with asyncio.TaskGroup() as group: _ = [group.create_task(work(barrier, i)) for i in range(n_tasks)] # wait for tasks to be done... |
Tying this together, the complete example is listed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# SuperFastPython.com # example of an asyncio barrier with a final action from random import random import asyncio # coroutine that prepares work and waits on a barrier async def work(barrier, number): # generate a unique value value = random() * 10 # suspend for a moment to simulate work await asyncio.sleep(value) # report result print(f'Task {number} done, got: {value}, waiting...') # wait on all other tasks to complete async with barrier as position: # check if this is the last task to leave if position == 0: # perform a special action print('This is a special action!') # main coroutine async def main(): # create a barrier n_tasks = 5 barrier = asyncio.Barrier(n_tasks) # issue all of the tasks async with asyncio.TaskGroup() as group: _ = [group.create_task(work(barrier, i)) for i in range(n_tasks)] # wait for tasks to be done... # start the event loop asyncio.run(main()) |
Running the example first starts the event loop and runs the main() coroutine.
The main() coroutine runs and creates the barrier with 5 parties.
It then creates an asyncio.TaskGroup and uses the context manager interface and creates and issues 5 work() tasks. It then suspends and waits on the asyncio.TaskGroup for all tasks to be done.
The work() tasks run. Each generates a random number fraction of 10 and sleeps for that many seconds.
The work() tasks resume and report a message that includes their random value, then wait on the barrier.
Once all work() tasks make it to the barrier, the barrier is released.
The work() tasks resume and terminate as they have no further work to complete.
The last task to leave the barrier has a position of zero and reports a special message.
This highlights how we can coordinate multiple coroutines using a barrier and have a special action performed by the last party to leave the barrier.
1 2 3 4 5 6 |
Task 1 done, got: 0.767816304788077, waiting... Task 4 done, got: 1.824525567415607, waiting... Task 0 done, got: 3.1450961858118, waiting... Task 2 done, got: 4.138987095243019, waiting... Task 3 done, got: 7.077700100405522, waiting... This is a special action! |
Further Reading
This section provides additional resources that you may find helpful.
Python Asyncio Books
- Python Asyncio Mastery, Jason Brownlee (my book!)
- Python Asyncio Jump-Start, Jason Brownlee.
- Python Asyncio Interview Questions, Jason Brownlee.
- Asyncio Module API Cheat Sheet
I also recommend the following books:
- Python Concurrency with asyncio, Matthew Fowler, 2022.
- Using Asyncio in Python, Caleb Hattingh, 2020.
- asyncio Recipes, Mohamed Mustapha Tahrioui, 2019.
Guides
APIs
- asyncio — Asynchronous I/O
- Asyncio Coroutines and Tasks
- Asyncio Streams
- Asyncio Subprocesses
- Asyncio Queues
- Asyncio Synchronization Primitives
References
Takeaways
You now know how to use the barrier synchronization primitive in asyncio programs.
Did I make a mistake? See a typo?
I’m a simple humble human. Correct me, please!
Do you have any additional tips?
I’d love to hear about them!
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Photo by Kindred Hues Photography on Unsplash
Do you have any questions?