Asyncio gather() Limit Concurrency

May 9, 2024 Python Asyncio

We can limit concurrency when using asyncio.gather() via a semaphore.

In this tutorial, you will discover how to limit concurrency with asyncio.gather().

Let's get started.

Need to Limit Concurrency with asyncio.gather()

The asyncio.gather() function allows us to run multiple coroutines or tasks concurrently.

Coroutines can be provided as positional arguments to asyncio.gather() and a list of return values is returned, for example:

...
# execute coroutines concurrently
results = await asyncio.gather(coro1(), coro2())

Alternatively, we can create a list of coroutines or tasks and unpack them as positional arguments using the star operator (*), for example:

...
# creates a list of coroutine objects
coros = [coro(i) for i in range(100)]
# execute coroutines concurrently
results = await asyncio.gather(*coros)

The asyncio.gather() function will execute all provided coroutines concurrently.

This can be a problem if the coroutines access shared resources, like a remote website or database.

Therefore, in some cases, we may want to limit the concurrency of asyncio.gather().

How can we limit the number of coroutines that run concurrently?

How to Limit Concurrency with asyncio.gather()

We can limit the concurrency of asyncio.gather() using a semaphore.

A semaphore is a concurrency primitive that limits the number of coroutines that can hold it, and therefore execute a protected critical section, at the same time.

It is an extension of a mutual exclusion (mutex) lock that adds a count for the number of coroutines that can acquire the lock before additional coroutines will block. Once full, new coroutines can only acquire a position on the semaphore once an existing coroutine holding the semaphore releases a position.

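Internally, the semaphore maintains a counter that is decremented each time the semaphore is acquired and incremented each time it is released. When the counter reaches zero, further attempts to acquire the semaphore suspend until a position is released. Thread-based semaphores protect this counter with a mutex lock; asyncio coroutines run in a single thread, so the counter only changes between suspension points.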

We can use semaphores in asyncio programs via the asyncio.Semaphore class.

An asyncio.Semaphore instance is created and the maximum number of positions on the semaphore is specified as an argument.

For example:

...
# prepare semaphore for limiting concurrency
semaphore = asyncio.Semaphore(10)
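
To make the counter behaviour concrete, the sketch below is a minimal illustration rather than part of the worked examples. It uses a semaphore with only two positions and records the value of locked(), which reports whether the semaphore can no longer be acquired immediately, after each acquire and release. The variable name states is an assumption made for this illustration.

```python
# sketch: observe a semaphore filling up and emptying again
import asyncio

async def main():
    # semaphore with two positions
    semaphore = asyncio.Semaphore(2)
    states = []
    # no positions taken yet
    states.append(semaphore.locked())
    # take the first position, one remains
    await semaphore.acquire()
    states.append(semaphore.locked())
    # take the second position, the counter is now zero
    await semaphore.acquire()
    states.append(semaphore.locked())
    # give one position back, the semaphore can be acquired again
    semaphore.release()
    states.append(semaphore.locked())
    # give the last position back
    semaphore.release()
    return states

states = asyncio.run(main())
print(states)
```

The semaphore only reports that it is locked once both positions are held. A third call to acquire() at that point would suspend until a position is released.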

The semaphore can then be shared with all coroutines in which we need to limit concurrency.

The coroutines can acquire the semaphore using the asynchronous context manager interface via the "async with" statement, which releases the semaphore automatically when the block exits.

For example:

...
# acquire the semaphore
async with semaphore:
    ...

We can then create instances of the target coroutine or coroutines and provide the semaphore as an argument, then provide the coroutine objects to asyncio.gather() as per normal.

...
# create many coroutines
coros = [mycoro(semaphore) for _ in range(100)]
# run the tasks
results = await asyncio.gather(*coros)

And that's it.
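
Sometimes the target coroutine cannot be changed to accept a semaphore, for example when it comes from a third-party library. One possible approach, sketched below under that assumption, is to wrap each coroutine in a small helper that acquires the semaphore before awaiting it. The names gather_limited(), guarded() and work() are hypothetical and are not part of asyncio.

```python
# sketch: limit concurrency without changing the target coroutine
import asyncio

# hypothetical helper, not part of asyncio
async def gather_limited(limit, *coros):
    # semaphore shared by all wrapped coroutines
    semaphore = asyncio.Semaphore(limit)

    # wrapper that holds the semaphore while the coroutine runs
    async def guarded(coro):
        async with semaphore:
            return await coro

    # gather() returns results in the order the coroutines were provided
    return await asyncio.gather(*(guarded(c) for c in coros))

# example target coroutine that knows nothing about the semaphore
async def work(value):
    await asyncio.sleep(0.01)
    return value * 2

async def main():
    # at most 3 of the 10 coroutines run at the same time
    return await gather_limited(3, *(work(i) for i in range(10)))

results = asyncio.run(main())
print(results)
```

The trade-off is one extra wrapper coroutine per task, in exchange for keeping the semaphore out of the signature of the target coroutine.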

Now that we know how to limit concurrency with asyncio.gather(), let's look at some worked examples.

Example of asyncio.gather() with Unlimited Concurrency

Before we explore how to limit concurrency with asyncio.gather(), let's look at an example in which the concurrency of gather() is not limited.

In this case, we can define a coroutine that will take an argument, suspend for a moment to simulate work, report a message, and return a value.

# coroutine used for a task
async def task_coro(value):
    # sleep for a moment
    await asyncio.sleep(1)
    # report a message
    print(f'>{value} done')
    # return a result
    return value * 100

We can then create 100 instances of our coroutine and run them concurrently using asyncio.gather() and report all of the results.

# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(100)]
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
    # report a message
    print('Main done')

In this example, we expect all 100 coroutines will run concurrently.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of gather() with unlimited concurrency
import asyncio

# coroutine used for a task
async def task_coro(value):
    # sleep for a moment
    await asyncio.sleep(1)
    # report a message
    print(f'>{value} done')
    # return a result
    return value * 100

# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(100)]
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
    # report a message
    print('Main done')

# start the asyncio program
asyncio.run(main())

Running the example first starts the asyncio event loop and runs the main() coroutine.

The main() coroutine runs and reports a message. It then creates 100 coroutine objects and executes them concurrently with asyncio.gather(), suspending until all results are available.

All 100 coroutines run concurrently. Each suspends a moment, reports a message, and returns a value.

The main() coroutine resumes and reports all return values from the coroutines and then reports a final message.

This highlights how asyncio will execute all provided coroutines concurrently, regardless of how many are provided.

...
>95 done
>96 done
>97 done
>98 done
>99 done
[0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900, 2000, 2100, 2200, 2300, 2400, 2500, 2600, 2700, 2800, 2900, 3000, 3100, 3200, 3300, 3400, 3500, 3600, 3700, 3800, 3900, 4000, 4100, 4200, 4300, 4400, 4500, 4600, 4700, 4800, 4900, 5000, 5100, 5200, 5300, 5400, 5500, 5600, 5700, 5800, 5900, 6000, 6100, 6200, 6300, 6400, 6500, 6600, 6700, 6800, 6900, 7000, 7100, 7200, 7300, 7400, 7500, 7600, 7700, 7800, 7900, 8000, 8100, 8200, 8300, 8400, 8500, 8600, 8700, 8800, 8900, 9000, 9100, 9200, 9300, 9400, 9500, 9600, 9700, 9800, 9900]
Main done
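
One way to confirm that nothing is being limited is to time the run. The sketch below is an illustrative variation on the example above, not the original listing: it adds a delay parameter and uses a shorter sleep of 0.1 seconds so that it finishes quickly. Both are assumptions made for this illustration.

```python
# sketch: time gather() with unlimited concurrency
import asyncio
import time

# coroutine used for a task, with a configurable sleep (assumed parameter)
async def task_coro(value, delay):
    await asyncio.sleep(delay)
    return value * 100

# coroutine used for the entry point
async def main(n=100, delay=0.1):
    start = time.perf_counter()
    # run all coroutines concurrently
    results = await asyncio.gather(*(task_coro(i, delay) for i in range(n)))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(f'{len(results)} results in {elapsed:.2f} seconds')
```

If the coroutines ran one after another, the total would be close to 100 times the delay. Because they all sleep at the same time, the elapsed time should instead be close to a single delay.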

Next, let's look at how we might limit concurrency when using asyncio.gather().

Example of asyncio.gather() with Limited Concurrency

We can explore how to limit concurrency when using asyncio.gather().

In this case, we can update the above example to use a semaphore to limit the number of coroutines that can run concurrently via asyncio.gather().

Firstly, we can update our task_coro() to take a semaphore as an argument.

The body of the coroutine can then be updated so that the semaphore must be acquired first before the body is run.

The updated task_coro() coroutine with these changes is listed below.

# coroutine used for a task
async def task_coro(value, semaphore):
    # acquire the semaphore
    async with semaphore:
        # sleep for a moment
        await asyncio.sleep(1)
        # report a message
        print(f'>{value} done')
        # return a result
        return value * 100

Next, we can update the main() coroutine to first create the semaphore. We will set a limit of 10 coroutines that can run concurrently.

...
# prepare semaphore for limiting concurrency
semaphore = asyncio.Semaphore(10)

Next, we can pass this semaphore as an argument to each coroutine object.

...
# create many coroutines
coros = [task_coro(i, semaphore) for i in range(100)]

And that's it.

The updated main() coroutine with this change is listed below.

# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # prepare semaphore for limiting concurrency
    semaphore = asyncio.Semaphore(10)
    # create many coroutines
    coros = [task_coro(i, semaphore) for i in range(100)]
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
    # report a message
    print('Main done')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of gather() with limited concurrency
import asyncio

# coroutine used for a task
async def task_coro(value, semaphore):
    # acquire the semaphore
    async with semaphore:
        # sleep for a moment
        await asyncio.sleep(1)
        # report a message
        print(f'>{value} done')
        # return a result
        return value * 100

# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # prepare semaphore for limiting concurrency
    semaphore = asyncio.Semaphore(10)
    # create many coroutines
    coros = [task_coro(i, semaphore) for i in range(100)]
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
    # report a message
    print('Main done')

# start the asyncio program
asyncio.run(main())

Running the example first starts the asyncio event loop and runs the main() coroutine.

The main() coroutine runs and reports a message.

A total of 100 coroutine objects are created and provided with the semaphore.

The list of coroutines is unpacked and provided to asyncio.gather().

All 100 coroutines are scheduled and begin running concurrently.

Each task attempts to acquire the semaphore, but only 10 are able to hold it at a time and execute their body, suspending, reporting a message, and returning a value.

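The semaphore does not form fixed batches: a waiting coroutine can proceed as soon as any position is released. In this example every coroutine holds the semaphore for the same one second, so the coroutines complete in waves of 10 and the whole run takes about 10 seconds rather than about 1 second. Once they are all done, the list of all results is returned from gather().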

The main() coroutine resumes and reports all return values from the coroutines and then reports a final message.

This highlights how we can limit concurrency with a semaphore when using asyncio.gather().

...
>95 done
>96 done
>97 done
>98 done
>99 done
[0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900, 2000, 2100, 2200, 2300, 2400, 2500, 2600, 2700, 2800, 2900, 3000, 3100, 3200, 3300, 3400, 3500, 3600, 3700, 3800, 3900, 4000, 4100, 4200, 4300, 4400, 4500, 4600, 4700, 4800, 4900, 5000, 5100, 5200, 5300, 5400, 5500, 5600, 5700, 5800, 5900, 6000, 6100, 6200, 6300, 6400, 6500, 6600, 6700, 6800, 6900, 7000, 7100, 7200, 7300, 7400, 7500, 7600, 7700, 7800, 7900, 8000, 8100, 8200, 8300, 8400, 8500, 8600, 8700, 8800, 8900, 9000, 9100, 9200, 9300, 9400, 9500, 9600, 9700, 9800, 9900]
Main done
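
The output alone does not show that the limit was respected. A simple check, sketched below as an illustrative variation rather than the original listing, is to count how many coroutines are inside the semaphore at once and record the peak. The stats dictionary and the shorter sleep are assumptions made for this illustration.

```python
# sketch: measure peak concurrency under a semaphore
import asyncio

# counters for running coroutines (assumed helper for this illustration)
stats = {'active': 0, 'peak': 0}

# coroutine used for a task
async def task_coro(value, semaphore):
    # acquire the semaphore
    async with semaphore:
        # one more coroutine is inside the protected block
        stats['active'] += 1
        stats['peak'] = max(stats['peak'], stats['active'])
        # sleep for a moment
        await asyncio.sleep(0.01)
        # this coroutine is about to leave the protected block
        stats['active'] -= 1
        return value * 100

# coroutine used for the entry point
async def main():
    # prepare semaphore for limiting concurrency
    semaphore = asyncio.Semaphore(10)
    # create many coroutines
    coros = [task_coro(i, semaphore) for i in range(100)]
    # run the tasks
    return await asyncio.gather(*coros)

results = asyncio.run(main())
print(f"peak concurrency: {stats['peak']}")
```

The peak never exceeds the value passed to asyncio.Semaphore, while the order of the results from gather() is unchanged.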

Takeaways

You now know how to limit concurrency with asyncio.gather().


