We can limit concurrency when using asyncio.gather() via a semaphore.
In this tutorial, you will discover how to limit concurrency with asyncio.gather().
Let’s get started.
Need to Limit Concurrency with asyncio.gather()
The asyncio.gather() function allows us to run multiple coroutines or tasks concurrently.
Coroutines can be provided as positional arguments to asyncio.gather(), which returns a list of their return values in the same order as the arguments, for example:
```python
...
# execute coroutines concurrently
results = await asyncio.gather(coro1(), coro2())
```
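The list returned by asyncio.gather() follows the order in which the awaitables were passed in, not the order in which they finish. The minimal sketch below shows this with a hypothetical work() coroutine and made-up delays: the first coroutine finishes last, yet its result comes first.

```python
import asyncio

# hypothetical coroutine: sleep for the given delay, then return the label
async def work(label, delay):
    await asyncio.sleep(delay)
    return label

async def main():
    # 'slow' is passed first but finishes after 'fast'
    return await asyncio.gather(work('slow', 0.2), work('fast', 0.1))

results = asyncio.run(main())
print(results)
```

Running it should print ['slow', 'fast'].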
Alternatively, we can create a list of coroutines or tasks and unpack them as positional arguments using the star operator (*), for example:
```python
...
# creates a list of coroutine objects
coros = [coro(i) for i in range(100)]
# execute coroutines concurrently
results = await asyncio.gather(*coros)
```
The asyncio.gather() function will execute all provided coroutines concurrently.
This can be a problem if the coroutines access shared resources, like a remote website or database.
Therefore, in some cases, we may want to limit the concurrency of asyncio.gather().
How can we limit the number of coroutines that run concurrently?
How to Limit Concurrency with asyncio.gather()
We can limit the concurrency of asyncio.gather() using a semaphore.
A semaphore is a concurrency primitive that limits how many coroutines can hold it, and therefore execute the critical section it protects, at the same time.
It extends a mutual exclusion (mutex) lock with a count of the coroutines that can acquire it before additional coroutines block. Once all positions are taken, a new coroutine can only acquire a position after a coroutine holding the semaphore releases one.
Internally, the semaphore maintains a counter that is decremented each time the semaphore is acquired and incremented each time it is released. When the counter reaches zero, further attempts to acquire it suspend until a position is released. Because all coroutines in an asyncio program run in a single thread, the counter does not need a separate mutex to protect it.
We can use semaphores in asyncio programs via the asyncio.Semaphore class.
When creating an asyncio.Semaphore instance, we specify the maximum number of positions as an argument (the value parameter, which defaults to 1).
For example:
```python
...
# prepare semaphore for limiting concurrency
semaphore = asyncio.Semaphore(10)
```
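To see the counter at work, the following minimal sketch acquires and releases a two-position semaphore by hand and records Semaphore.locked() after each step; locked() reports whether the semaphore cannot be acquired immediately. The two-position limit is an arbitrary choice for illustration.

```python
import asyncio

async def main():
    # a semaphore with two positions (arbitrary limit for illustration)
    semaphore = asyncio.Semaphore(2)
    states = []
    # each acquire() takes one position: the counter goes 2 -> 1 -> 0
    await semaphore.acquire()
    states.append(semaphore.locked())  # one position is still free
    await semaphore.acquire()
    states.append(semaphore.locked())  # no positions are free
    # release() hands a position back: the counter goes 0 -> 1
    semaphore.release()
    states.append(semaphore.locked())  # a position is free again
    semaphore.release()
    return states

states = asyncio.run(main())
print(states)
```

Running it should print [False, True, False].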
The semaphore can then be shared with all coroutines in which we need to limit concurrency.
The coroutines can then acquire the semaphore through its asynchronous context manager interface, using the “async with” statement. A position is acquired on entering the block and released on leaving it.
For example:
```python
...
# acquire the semaphore
async with semaphore:
    ...
```
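The “async with” statement is shorthand for awaiting acquire() on entry and calling release() on exit, even if the body raises an exception. The sketch below writes this out by hand in a hypothetical guarded() coroutine, to show that the position is handed back on both the normal path and the error path.

```python
import asyncio

# hypothetical coroutine: the hand-written equivalent of "async with semaphore:"
async def guarded(semaphore, fail):
    await semaphore.acquire()
    try:
        # simulate work inside the critical section
        await asyncio.sleep(0)
        if fail:
            raise RuntimeError('body failed')
        return 'ok'
    finally:
        # always hand the position back, even when the body raises
        semaphore.release()

async def main():
    semaphore = asyncio.Semaphore(1)
    first = await guarded(semaphore, fail=False)
    try:
        await guarded(semaphore, fail=True)
    except RuntimeError:
        pass
    # the position was returned both times, so the semaphore is free
    return first, semaphore.locked()

first, locked = asyncio.run(main())
print(first, locked)
```

Running it should print ok False.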
We can then create instances of the target coroutine or coroutines, passing the semaphore as an argument, and provide the coroutine objects to asyncio.gather() as usual. For example:
```python
...
# create many coroutines
coros = [mycoro(semaphore) for _ in range(100)]
# run the tasks
results = await asyncio.gather(*coros)
```
And that’s it.
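As a variant that is not used in the rest of this tutorial, if the target coroutine cannot be changed to accept a semaphore, a small wrapper coroutine can hold the semaphore on its behalf. The limited() helper and the body of mycoro() below are hypothetical, chosen for illustration. This works because a coroutine object does not start executing until it is awaited, so each mycoro() body only runs once its wrapper holds a position.

```python
import asyncio

# hypothetical helper: await the given awaitable only while holding the semaphore
async def limited(semaphore, aw):
    async with semaphore:
        return await aw

# hypothetical target coroutine that knows nothing about the semaphore
async def mycoro(i):
    await asyncio.sleep(0.01)
    return i * 2

async def main():
    semaphore = asyncio.Semaphore(10)
    # mycoro(i) objects are created here, but their bodies run inside limited()
    coros = [limited(semaphore, mycoro(i)) for i in range(100)]
    return await asyncio.gather(*coros)

results = asyncio.run(main())
print(results[:5])
```

The trade-off is one extra coroutine per item; the benefit is that the target coroutine stays unaware of the limit.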
Now that we know how to limit concurrency with asyncio.gather(), let’s look at some worked examples.
Example of asyncio.gather() with Unlimited Concurrency
Before we explore how to limit concurrency with asyncio.gather(), let’s look at an example that uses gather() without any limit.
In this case, we can define a coroutine that takes an argument, suspends for a moment to simulate work, reports a message, and returns a value.
```python
# coroutine used for a task
async def task_coro(value):
    # sleep for a moment
    await asyncio.sleep(1)
    # report a message
    print(f'>{value} done')
    # return a result
    return value * 100
```
We can then create 100 instances of our coroutine and run them concurrently using asyncio.gather() and report all of the results.
```python
# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(100)]
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
    # report a message
    print('Main done')
```
In this example, we expect all 100 coroutines to run concurrently.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of gather() with unlimited concurrency
import asyncio

# coroutine used for a task
async def task_coro(value):
    # sleep for a moment
    await asyncio.sleep(1)
    # report a message
    print(f'>{value} done')
    # return a result
    return value * 100

# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(100)]
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
    # report a message
    print('Main done')

# start the asyncio program
asyncio.run(main())
```
Running the example first starts the asyncio event loop and runs the main() coroutine.
The main() coroutine runs and reports a message. It then creates 100 coroutine objects and executes them concurrently with asyncio.gather(), suspending until all results are available.
All 100 coroutines run concurrently. Each suspends a moment, reports a message, and returns a value.
The main() coroutine resumes and reports all return values from the coroutines and then reports a final message.
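This highlights how asyncio.gather() will execute all provided coroutines concurrently, regardless of how many are provided. Because every coroutine sleeps at the same time, the whole program takes about one second.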
```
...
>95 done
>96 done
>97 done
>98 done
>99 done
[0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900, 2000, 2100, 2200, 2300, 2400, 2500, 2600, 2700, 2800, 2900, 3000, 3100, 3200, 3300, 3400, 3500, 3600, 3700, 3800, 3900, 4000, 4100, 4200, 4300, 4400, 4500, 4600, 4700, 4800, 4900, 5000, 5100, 5200, 5300, 5400, 5500, 5600, 5700, 5800, 5900, 6000, 6100, 6200, 6300, 6400, 6500, 6600, 6700, 6800, 6900, 7000, 7100, 7200, 7300, 7400, 7500, 7600, 7700, 7800, 7900, 8000, 8100, 8200, 8300, 8400, 8500, 8600, 8700, 8800, 8900, 9000, 9100, 9200, 9300, 9400, 9500, 9600, 9700, 9800, 9900]
Main done
```
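One way to confirm that nothing limits concurrency here is to count how many coroutines are inside their body at the same moment. The sketch below adds hypothetical running and peak counters to a version of task_coro() and shortens the sleep to 0.1 seconds so it runs quickly. A plain dict is safe for this because the event loop runs all coroutines in a single thread and they only switch at await points.

```python
import asyncio

# hypothetical counters: coroutines currently in their body, and the maximum seen
stats = {'running': 0, 'peak': 0}

# version of task_coro() instrumented to track concurrency
async def task_coro(value):
    stats['running'] += 1
    stats['peak'] = max(stats['peak'], stats['running'])
    # shorter sleep than the tutorial so the sketch runs quickly
    await asyncio.sleep(0.1)
    stats['running'] -= 1
    return value * 100

async def main():
    coros = [task_coro(i) for i in range(100)]
    return await asyncio.gather(*coros)

results = asyncio.run(main())
print(stats['peak'])
```

Running it should print 100, since every coroutine enters its sleep before any of them finishes.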
Next, let’s look at how we might limit concurrency when using asyncio.gather().
Example of asyncio.gather() with Limited Concurrency
We can explore how to limit concurrency when using asyncio.gather().
In this case, we can update the above example to use a semaphore to limit the number of coroutines that can run concurrently via asyncio.gather().
Firstly, we can update our task_coro() to take a semaphore as an argument.
The body of the coroutine can then be updated so that the semaphore must be acquired first before the body is run.
The updated task_coro() coroutine with these changes is listed below.
```python
# coroutine used for a task
async def task_coro(value, semaphore):
    # acquire the semaphore
    async with semaphore:
        # sleep for a moment
        await asyncio.sleep(1)
        # report a message
        print(f'>{value} done')
        # return a result
        return value * 100
```
Next, we can update the main() coroutine to first create the semaphore. We will set a limit of 10 coroutines that can run concurrently.
```python
...
# prepare semaphore for limiting concurrency
semaphore = asyncio.Semaphore(10)
```
Next, we can pass this semaphore as an argument to each coroutine object.
```python
...
# create many coroutines
coros = [task_coro(i, semaphore) for i in range(100)]
```
And that’s it.
The updated main() coroutine with these changes is listed below.
```python
# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # prepare semaphore for limiting concurrency
    semaphore = asyncio.Semaphore(10)
    # create many coroutines
    coros = [task_coro(i, semaphore) for i in range(100)]
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
    # report a message
    print('Main done')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of gather() with limited concurrency
import asyncio

# coroutine used for a task
async def task_coro(value, semaphore):
    # acquire the semaphore
    async with semaphore:
        # sleep for a moment
        await asyncio.sleep(1)
        # report a message
        print(f'>{value} done')
        # return a result
        return value * 100

# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # prepare semaphore for limiting concurrency
    semaphore = asyncio.Semaphore(10)
    # create many coroutines
    coros = [task_coro(i, semaphore) for i in range(100)]
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
    # report a message
    print('Main done')

# start the asyncio program
asyncio.run(main())
```
Running the example first starts the asyncio event loop and runs the main() coroutine.
The main() coroutine runs and reports a message.
A total of 100 coroutine objects are created and provided with the semaphore.
The list of coroutines is unpacked and provided to asyncio.gather().
All 100 coroutines are scheduled and begin running concurrently.
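Each coroutine, wrapped in a task by gather(), attempts to acquire the semaphore, but only 10 can hold it at a time and execute their body: suspending, reporting a message, and returning a value.
Because each coroutine holds its position for about one second, they complete in waves of 10 until all are done, so the program takes roughly 10 seconds instead of roughly 1 second in the unlimited example. The list of all results is then returned from gather().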
The main() coroutine resumes and reports all return values from the coroutines and then reports a final message.
This highlights how we can limit concurrency with a semaphore when using asyncio.gather().
```
...
>95 done
>96 done
>97 done
>98 done
>99 done
[0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900, 2000, 2100, 2200, 2300, 2400, 2500, 2600, 2700, 2800, 2900, 3000, 3100, 3200, 3300, 3400, 3500, 3600, 3700, 3800, 3900, 4000, 4100, 4200, 4300, 4400, 4500, 4600, 4700, 4800, 4900, 5000, 5100, 5200, 5300, 5400, 5500, 5600, 5700, 5800, 5900, 6000, 6100, 6200, 6300, 6400, 6500, 6600, 6700, 6800, 6900, 7000, 7100, 7200, 7300, 7400, 7500, 7600, 7700, 7800, 7900, 8000, 8100, 8200, 8300, 8400, 8500, 8600, 8700, 8800, 8900, 9000, 9100, 9200, 9300, 9400, 9500, 9600, 9700, 9800, 9900]
Main done
```
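To check the limit directly, a minimal sketch can count how many coroutines are inside the semaphore at once, using hypothetical running and peak counters and a sleep shortened to 0.1 seconds. With 100 coroutines and 10 positions, we would expect a peak of 10 and a total run time of roughly one second (10 waves of 0.1 seconds), though the exact timing will vary by machine.

```python
import asyncio
import time

# hypothetical counters: coroutines currently holding the semaphore, and the maximum seen
stats = {'running': 0, 'peak': 0}

# version of task_coro() instrumented to track concurrency
async def task_coro(value, semaphore):
    async with semaphore:
        stats['running'] += 1
        stats['peak'] = max(stats['peak'], stats['running'])
        # shorter sleep than the tutorial so the sketch runs quickly
        await asyncio.sleep(0.1)
        stats['running'] -= 1
        return value * 100

async def main():
    semaphore = asyncio.Semaphore(10)
    coros = [task_coro(i, semaphore) for i in range(100)]
    return await asyncio.gather(*coros)

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(stats['peak'], f'{elapsed:.2f}s')
```

Changing the value passed to asyncio.Semaphore() should change both the peak and the total time accordingly.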
Takeaways
You now know how to limit concurrency with asyncio.gather().