You can use an asyncio bounded semaphore via the asyncio.BoundedSemaphore class.
In this tutorial, you will discover how to use a bounded semaphore with coroutines in Python.
Let’s get started.
What is a Semaphore
A semaphore is a concurrency primitive that allows a limit on the number of threads that can acquire a lock protecting a critical section.
It is an extension of a mutual exclusion (mutex) lock that adds a count for the number of threads that can acquire the lock before additional threads will block. Once full, new threads can only acquire a position on the semaphore once an existing thread holding the semaphore releases a position.
Internally, the semaphore maintains a counter protected by a mutex lock that is decremented each time the semaphore is acquired and incremented each time it is released.
When a semaphore is created, the upper limit on the counter is set. If it is set to be 1, then the semaphore will operate like a mutex lock.
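As a minimal sketch of that last point, the snippet below creates an asyncio.Semaphore with a limit of 1 and records its locked status before, during, and after it is held. The helper name mutex_demo() is made up for illustration and is not part of asyncio.

```python
import asyncio

# hypothetical helper: show that a semaphore with a limit of 1 acts like a mutex
async def mutex_demo():
    semaphore = asyncio.Semaphore(1)
    states = []
    # counter is 1, so the semaphore is free
    states.append(semaphore.locked())
    async with semaphore:
        # counter is 0, so no other coroutine can acquire it
        states.append(semaphore.locked())
    # counter is back to 1
    states.append(semaphore.locked())
    return states

states = asyncio.run(mutex_demo())
print(states)  # [False, True, False]
```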
A semaphore provides a useful concurrency tool for limiting the number of threads that can access a resource concurrently. Some examples include:
- Limiting concurrent socket connections to a server.
- Limiting concurrent file operations on a hard drive.
- Limiting concurrent calculations.
Although the semaphore is described in terms of threads, it is a primitive that can be used with other units of concurrency.
For example, the threading.Semaphore class allows us to use a semaphore with threads.
The multiprocessing.Semaphore class allows us to use a semaphore with processes.
And the asyncio.Semaphore class allows us to use a semaphore with coroutines in asyncio programs.
What is a Bounded Semaphore?
A bounded semaphore is the same as a classical (unbounded) semaphore except that the upper limit of the semaphore cannot be increased.
Semaphores are configured with an initial count value when they are created. Each call to acquire() decrements the internal counter and each call to release() increments the counter.
With a classical semaphore, we may call the release() method more times than the acquire() method, which allows the internal counter to exceed the initial value.
For example:
```python
...
semaphore = asyncio.Semaphore(2) # counter = 2
semaphore.release() # counter = 3
semaphore.release() # counter = 4
semaphore.release() # counter = 5
```
The example above shows how the internal counter for a classical semaphore can be increased from the initial value of 2 to a new value of 5.
This is not supported on a bounded semaphore.
Instead, if a call to release() would push the internal counter above its initial value, a ValueError is raised.
Bounded Semaphore is a version of Semaphore that raises a ValueError in release() if it increases the internal counter above the initial value.
— Asyncio Synchronization Primitives
This is why it is called a “bounded” semaphore. The internal counter has an upper bound, unlike a more classical semaphore.
Semaphore vs Bounded Semaphore
- Semaphore: A semaphore can increase its internal counter value above the initial value.
- Bounded Semaphore: A bounded semaphore cannot increase its internal counter value above its initial value.
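The difference can be sketched with a small comparison, assuming a fresh semaphore of each type with an initial value of 2 that is released once without being acquired. The helper name over_release() is hypothetical.

```python
import asyncio

# hypothetical helper: release a fresh semaphore once without acquiring it
def over_release(semaphore_class):
    semaphore = semaphore_class(2)
    try:
        semaphore.release()
    except ValueError as e:
        return f'ValueError: {e}'
    return 'no error'

async def main():
    # create the semaphores while the event loop is running
    classical = over_release(asyncio.Semaphore)
    bounded = over_release(asyncio.BoundedSemaphore)
    return classical, bounded

results = asyncio.run(main())
print(f'Semaphore: {results[0]}')
print(f'BoundedSemaphore: {results[1]}')
```

The classical semaphore accepts the extra release silently, whereas the bounded semaphore reports it as an error.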
Why Use a Bounded Semaphore
We may prefer to use a bounded semaphore instead of a classical semaphore.
A key reason is that it ensures that calls to the acquire() and release() methods stay balanced. If there are more calls to release() than to acquire(), so that the counter would exceed its initial value, a ValueError is raised which, if not handled, terminates the task or program.
This makes bugs in the usage of the semaphore more apparent, much like an assertion.
An unbounded semaphore may allow access to a critical section or resource by more than an initial number of coroutines. We may choose to use a bounded semaphore to ensure that this cannot be the case, assuming the semaphore is used correctly.
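To make this concrete, here is a rough sketch that measures how many coroutines are inside the protected block at once. It assumes a classical asyncio.Semaphore with a limit of 2 and simulates a bug where release() is called without a matching acquire(). The names worker(), peak_concurrency() and the state dictionary are invented for this illustration.

```python
import asyncio

# hypothetical worker: track how many coroutines hold the semaphore at once
async def worker(semaphore, state):
    async with semaphore:
        state['active'] += 1
        state['peak'] = max(state['peak'], state['active'])
        await asyncio.sleep(0.01)
        state['active'] -= 1

# hypothetical helper: run 6 workers and report the peak concurrency
async def peak_concurrency(extra_releases):
    semaphore = asyncio.Semaphore(2)
    # simulate a bug: release() called without a matching acquire()
    for _ in range(extra_releases):
        semaphore.release()
    state = {'active': 0, 'peak': 0}
    await asyncio.gather(*(worker(semaphore, state) for _ in range(6)))
    return state['peak']

normal_peak = asyncio.run(peak_concurrency(0))
buggy_peak = asyncio.run(peak_concurrency(1))
print(normal_peak)  # 2
print(buggy_peak)   # 3
```

With a BoundedSemaphore in place of the Semaphore, the stray release() would raise a ValueError instead of quietly raising the limit to 3.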
In fact, in most cases where we choose to use a semaphore, we should probably be using a bounded semaphore. We rarely need to increase the count above the initial value.
How to Use an Asyncio BoundedSemaphore
Python provides a bounded semaphore via the asyncio.BoundedSemaphore class for use with coroutines.
The asyncio.BoundedSemaphore instance must be configured when it is created to set the limit on the internal counter. This limit will match the number of concurrent coroutines that can hold the semaphore at a time.
For example, we might want to set it to 100:
```python
...
# create a semaphore with a limit of 100
semaphore = asyncio.BoundedSemaphore(100)
```
A semaphore manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, it blocks, waiting until some task calls release().
— Asyncio Synchronization Primitives
The bounded semaphore can be acquired by calling the acquire() method.
This returns a coroutine that must be awaited.
Acquire a semaphore. If the internal counter is greater than zero, decrement it by one and return True immediately. If it is zero, wait until a release() is called and return True.
— Asyncio Synchronization Primitives
For example:
```python
...
# acquire the bounded semaphore
await semaphore.acquire()
```
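The waiting behavior can be seen in a small sketch. It assumes a bounded semaphore with a limit of 1 and uses asyncio.wait_for() to give up on the second acquire() after a short timeout. The timeout value is arbitrary and chosen only for the demonstration.

```python
import asyncio

async def main():
    semaphore = asyncio.BoundedSemaphore(1)
    # counter is 1: acquire() returns True straight away
    first = await semaphore.acquire()
    try:
        # counter is 0: acquire() waits, so give up after a short timeout
        await asyncio.wait_for(semaphore.acquire(), timeout=0.05)
        second = True
    except asyncio.TimeoutError:
        second = False
    # release the position taken by the first acquire()
    semaphore.release()
    return first, second

result = asyncio.run(main())
print(result)  # (True, False)
```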
Once acquired, the bounded semaphore can be released again by calling the release() method.
Release a semaphore, incrementing the internal counter by one. Can wake up a task waiting to acquire the semaphore.
— Asyncio Synchronization Primitives
For example:
```python
...
# release the bounded semaphore
semaphore.release()
```
Unlike the asyncio.Semaphore, the release() method on the BoundedSemaphore cannot be called more times than the acquire() method. If a call to release() would push the internal counter above the initial value, it raises a ValueError exception.
Bounded Semaphore is a version of Semaphore that raises a ValueError in release() if it increases the internal counter above the initial value.
— Asyncio Synchronization Primitives
For example:
```python
...
# acquire the bounded semaphore
await semaphore.acquire()
# do something
# ...
# release the bounded semaphore
semaphore.release()
# release the bounded semaphore again
semaphore.release() # ValueError
```
The asyncio.BoundedSemaphore class supports the asynchronous context manager interface (the "async with" expression), which will automatically acquire and release the bounded semaphore for you.
As such it is the preferred usage, if appropriate for your program.
For example:
```python
...
# acquire the bounded semaphore
async with semaphore:
    # ...
```
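The context manager is roughly equivalent to calling acquire(), running the body in a try block, and calling release() in a finally block. The sketch below compares the two forms when the body raises an exception. The guarded() coroutine and the RuntimeError are invented for the illustration.

```python
import asyncio

# hypothetical coroutine: the manual form of "async with semaphore:"
async def guarded(semaphore):
    await semaphore.acquire()
    try:
        raise RuntimeError('something went wrong')
    finally:
        # always give the position back, even on failure
        semaphore.release()

async def main():
    semaphore = asyncio.BoundedSemaphore(1)
    # manual acquire/release with try-finally
    try:
        await guarded(semaphore)
    except RuntimeError:
        pass
    manual = semaphore.locked()
    # the same thing using the context manager
    try:
        async with semaphore:
            raise RuntimeError('something went wrong')
    except RuntimeError:
        pass
    managed = semaphore.locked()
    return manual, managed

result = asyncio.run(main())
print(result)  # (False, False)
```

In both cases the semaphore is unlocked afterwards, so a failure in the body does not leak a position.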
Finally, a coroutine can check if the bounded semaphore is currently at capacity via the locked() method.
It will be True if the bounded semaphore cannot be acquired at the time of the call, or False otherwise.
For example:
```python
...
# check if there is space on the bounded semaphore
if not semaphore.locked():
    # ...
```
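One possible use of locked() is to take a position only when one is free right now, and to skip the work otherwise. The try_acquire() helper below is a hypothetical sketch, not part of the asyncio API. It relies on the assumption that nothing else runs between the check and the acquire, which holds here because acquire() does not suspend when a position is free.

```python
import asyncio

# hypothetical helper: take a position only if one is free right now
async def try_acquire(semaphore):
    if semaphore.locked():
        return False
    # not locked, so this acquire() completes without waiting
    await semaphore.acquire()
    return True

async def main():
    semaphore = asyncio.BoundedSemaphore(1)
    first = await try_acquire(semaphore)   # position available
    second = await try_acquire(semaphore)  # already at capacity
    semaphore.release()
    return first, second

result = asyncio.run(main())
print(result)  # (True, False)
```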
Now that we know how to use the asyncio.BoundedSemaphore in Python, let’s look at a worked example.
Example of Using an Asyncio BoundedSemaphore
We can explore how to use an asyncio.BoundedSemaphore with a worked example.
We will develop an example with a suite of coroutines but a limit on the number of coroutines that can perform an action simultaneously.
A bounded semaphore will be used to limit the number of concurrent coroutines to fewer than the total number of coroutines, so some coroutines must wait for a position, then be notified and acquire it.
The complete example is listed below.
```python
# SuperFastPython.com
# example of using an asyncio bounded semaphore
from random import random
import asyncio

# task coroutine
async def task(semaphore, number):
    # acquire the bounded semaphore
    async with semaphore:
        # generate a random value between 0 and 1
        value = random()
        # block for a moment
        await asyncio.sleep(value)
        # report a message
        print(f'Task {number} got {value}')

# main coroutine
async def main():
    # create the shared bounded semaphore
    semaphore = asyncio.BoundedSemaphore(2)
    # create and schedule tasks
    tasks = [asyncio.create_task(task(semaphore, i)) for i in range(10)]
    # wait for all tasks to complete
    _ = await asyncio.wait(tasks)

# start the asyncio program
asyncio.run(main())
```
Running the example first creates the main() coroutine that is used as the entry point into the asyncio program.
The main() coroutine runs and first creates the shared bounded semaphore with an initial counter value of 2, meaning that two coroutines can hold the bounded semaphore at once.
The main() coroutine then creates and schedules 10 tasks to execute our task() coroutine, passing the shared bounded semaphore and a unique number between 0 and 9.
The main() coroutine then suspends and waits for all tasks to complete.
The tasks begin executing one after another in the event loop.
Each task first attempts to acquire the bounded semaphore. If there is a position available it proceeds with the block, otherwise it waits for a position to become available.
Once acquired, a task generates a random value, blocks for a moment, and then reports the generated value. It then releases the bounded semaphore and terminates. The bounded semaphore is not released while the task is suspended in the call to sleep().
The body of the bounded semaphore context manager is limited to two coroutines at a time.
This highlights how we can limit the number of coroutines to execute a block of code concurrently. It highlights how we might use a bounded semaphore like a classical semaphore.
The output from the program will differ each time it is run given the use of random numbers.
```
Task 0 got 0.20369168197618748
Task 2 got 0.20640107131350838
Task 1 got 0.6855263719449817
Task 3 got 0.9396433586858612
Task 4 got 0.8039832235015294
Task 6 got 0.12266060196253203
Task 5 got 0.879466225105295
Task 7 got 0.6675244153844875
Task 8 got 0.11511060306129695
Task 9 got 0.9607702805925814
```
Example of Hitting the Boundary With the BoundedSemaphore
We can explore what happens when we exceed the initial value of the bounded semaphore.
In this example, we will create a bounded semaphore with an initial count of one.
We will then acquire it, then release it once, then a second time. We expect that the second call to the release() method will result in a ValueError.
The complete example is listed below.
```python
# SuperFastPython.com
# example of hitting the boundary with the asyncio bounded semaphore
import asyncio

# main coroutine
async def main():
    # create the shared bounded semaphore
    semaphore = asyncio.BoundedSemaphore(1)
    # acquire the semaphore
    await semaphore.acquire()
    # report a message
    print('bounded semaphore acquired')
    # release the semaphore
    semaphore.release()
    # report a message
    print('bounded semaphore released')
    # release again
    semaphore.release() # ValueError

# start the asyncio program
asyncio.run(main())
```
Running the example first creates the main() coroutine that is used as the entry point into the asyncio program.
The main() coroutine runs and first creates the bounded semaphore with an initial count of one.
The bounded semaphore is then acquired and a message is reported. This takes the semaphore’s count from one to zero.
The bounded semaphore is released and a message is reported. This takes the semaphore’s count from zero to one.
The bounded semaphore is released again. This attempts to take the semaphore’s count from 1 to 2.
A ValueError exception is raised as expected, terminating the program.
This highlights the benefit of using a bounded semaphore, where it enforces the initial value as a boundary on the semaphore’s internal counter.
```
bounded semaphore acquired
bounded semaphore released
Traceback (most recent call last):
...
ValueError: BoundedSemaphore released too many times
```
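If terminating the program is not desirable, the ValueError can be caught like any other exception. The sketch below is one way this might look. It also checks the locked status afterwards, on the assumption (true of CPython's implementation) that a rejected release() leaves the internal counter unchanged.

```python
import asyncio

async def main():
    semaphore = asyncio.BoundedSemaphore(1)
    await semaphore.acquire()
    semaphore.release()
    try:
        # one release too many
        semaphore.release()
    except ValueError as e:
        message = str(e)
        print(f'caught: {message}')
    # the counter is still 1, so the semaphore remains usable
    return message, semaphore.locked()

message, locked = asyncio.run(main())
```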
Example of Checking if the BoundedSemaphore is Locked
We can explore checking the status of the bounded semaphore.
In this example, we will check the locked status of the semaphore before it is acquired, after it is acquired, and after it is released.
We expect that when the bounded semaphore's internal count is above zero, it will be marked as unlocked (locked is false), whereas when the internal count is zero, it will be marked as locked.
The complete example is listed below.
```python
# SuperFastPython.com
# example of checking the status of the asyncio bounded semaphore
import asyncio

# main coroutine
async def main():
    # create the shared bounded semaphore
    semaphore = asyncio.BoundedSemaphore(1)
    # report the status of the bounded semaphore
    print(f'bounded semaphore locked: {semaphore.locked()}')
    # acquire the semaphore
    await semaphore.acquire()
    # report the status of the bounded semaphore
    print(f'bounded semaphore locked: {semaphore.locked()}')
    # release the semaphore
    semaphore.release()
    # report the status of the bounded semaphore
    print(f'bounded semaphore locked: {semaphore.locked()}')

# start the asyncio program
asyncio.run(main())
```
Running the example first creates the main() coroutine that is used as the entry point into the asyncio program.
The main() coroutine runs and first creates the bounded semaphore with an initial count of one.
The locked status of the bounded semaphore is reported, showing that initially it is not locked. The internal counter is set to 1 and it can be acquired.
The bounded semaphore is then acquired and the locked status is reported again. The semaphore’s internal counter is taken from 1 to 0. It is now in the locked state and cannot be acquired again immediately, i.e. callers must wait.
The bounded semaphore is released and the lock status is reported a final time. The semaphore’s internal counter is taken from 0 to 1, and can be acquired again. It is marked as not locked.
```
bounded semaphore locked: False
bounded semaphore locked: True
bounded semaphore locked: False
```
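With an initial count above one, the semaphore only reports as locked once every position is taken. A short variation on the example, assuming an initial count of 2, sketches this.

```python
import asyncio

async def main():
    semaphore = asyncio.BoundedSemaphore(2)
    # counter = 2
    states = [semaphore.locked()]
    await semaphore.acquire()
    # counter = 1, one position still free
    states.append(semaphore.locked())
    await semaphore.acquire()
    # counter = 0, at capacity
    states.append(semaphore.locked())
    # give both positions back
    semaphore.release()
    semaphore.release()
    return states

states = asyncio.run(main())
print(states)  # [False, False, True]
```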
Further Reading
This section provides additional resources that you may find helpful.
Python Asyncio Books
- Python Asyncio Mastery, Jason Brownlee (my book!)
- Python Asyncio Jump-Start, Jason Brownlee.
- Python Asyncio Interview Questions, Jason Brownlee.
- Asyncio Module API Cheat Sheet
I also recommend the following books:
- Python Concurrency with asyncio, Matthew Fowler, 2022.
- Using Asyncio in Python, Caleb Hattingh, 2020.
- asyncio Recipes, Mohamed Mustapha Tahrioui, 2019.
APIs
- asyncio — Asynchronous I/O
- Asyncio Coroutines and Tasks
- Asyncio Streams
- Asyncio Subprocesses
- Asyncio Queues
- Asyncio Synchronization Primitives
Takeaways
You now know how to use a bounded semaphore with coroutines in Python.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.