Asyncio gather() Timeout

May 5, 2024 Python Asyncio

We can add a timeout when using asyncio.gather() via the asyncio.timeout() context manager.

A timeout in seconds can be specified to the asyncio.timeout() context manager and the asyncio.gather() function can be called within the context manager block. If the timeout elapses before all tasks in the gather() are done, an asyncio.TimeoutError exception is raised. Using the asyncio.timeout() context manager requires Python version 3.11 or higher.

In this tutorial, you will discover how to use asyncio.gather() with a timeout.

Let's get started.

Need Timeout With asyncio.gather()

The asyncio.gather() function allows us to run multiple coroutines or tasks concurrently.

Coroutines can be provided as positional arguments to asyncio.gather() and a list of return values is returned, for example:

...
# execute coroutines concurrently
results = await asyncio.gather(coro1(), coro2())

Alternatively, we can create a list of coroutines or tasks and unpack them as positional arguments using the star operator (*).

For example:

...
# creates a list of coroutine objects
coros = [coro(i) for i in range(100)]
# execute coroutines concurrently
results = await asyncio.gather(*coros)

You can learn more about how to use asyncio.gather() in the tutorial:

A limitation of gather() is that it does not support a timeout.

The asyncio.gather() function will suspend until all provided awaitables are done. This means that one long-running coroutine provided to gather() may prevent a program from progressing.

How can we use a timeout with asyncio.gather()?

How to Add Timeout To asyncio.gather()

There are many ways that we can use asyncio.gather() with a timeout, or achieve a similar result with another function that adds a timeout.

The main approaches include:

  1. Use asyncio.timeout()
  2. Use asyncio.wait_for()
  3. Use asyncio.wait()

Let's take a closer look at each approach in turn.

Add a Timeout With asyncio.timeout()

Perhaps the simplest approach to add a timeout when using asyncio.gather() is via the asyncio.timeout() context manager.

For example:

...
# gather tasks with a timeout
async with asyncio.timeout(2.5):
    # run the tasks
    results = await asyncio.gather(*coros)

If the timeout elapses before the gather() returns, an asyncio.TimeoutError exception is raised, which can be handled.

For example:

...
try:
    # gather tasks with a timeout
    async with asyncio.timeout(2.5):
        # run the tasks
        results = await asyncio.gather(*coros)
except asyncio.TimeoutError:
	...

Note: Using the asyncio.timeout() context manager requires Python version 3.11 or higher.

You can learn more about how to use asyncio.timeout() in the tutorial:

Add a Timeout With asyncio.wait_for()

Another approach is to wrap the coroutines passed to asyncio.gather() in calls to asyncio.wait_for().

This will convert each coroutine to a task and raise an asyncio.TimeoutError exception if a task is not completed within the time limit.

The asyncio.wait_for() could be applied to all coroutines.

For example:

...
# create many tasks with timeouts
coros = [asyncio.wait_for(task_coro(i)) for i in range(10)]

We can then issue the asyncio.gather() and handle the exception.

...
try:
    # gather tasks with a timeout
    async with asyncio.timeout(2.5):
        # run the tasks
        results = await asyncio.gather(*coros)
except asyncio.TimeoutError:
	...

Alternatively, it could be applied to selected coroutines that may be expected to take a long time.

For example:

...
# add a timeout to the first task only
coros[0] = asyncio.wait_for(coros[0], timeout=2.5)

You can learn more about how to use asyncio.wait_for() in the tutorial:

Add a Timeout With asyncio.wait()

Finally, we could replace the asyncio.gather() with asyncio.wait().

It requires that all coroutines be scheduled as tasks first, such as asyncio.create_task().

For example:

...
# create many tasks
coros = [asyncio.create_task(task_coro(i)) for i in range(10)]

The asyncio.wait() function will then return once all issued tasks are done, allowing a timeout to be specified.

For example:

...
# wait for all tasks with a timeout
done, notdone = = await asyncio.wait(tasks, timeout=5)

An asyncio.TimeoutError exception is not raised, instead, a tuple of two sets is returned, the first containing all tasks that are done, and the second with those tasks that are still running.

You can learn more about how to use asyncio.wait() in the tutorial:

Now that we know how to wait for coroutines to complete with a timeout, let's look at some examples.

Example of asyncio.gather() With Slow Tasks

Before we look at adding a timeout to asyncio.gather(), let's look at using asyncio.gather() with slow coroutines.

In this example, we will define a coroutine that takes an argument, suspends for 5 seconds, and then returns the provided argument multiplied by 100. This is our slow coroutine.

# coroutine used for a task
async def task_coro(value):
    # sleep for a moment
    await asyncio.sleep(5)
    # return a result
    return value * 100

We can then create a list of 10 slow coroutines, and execute them concurrently using asyncio.gather(), then report the return values.

# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
    # report a message
    print('Main done')

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of gather() with slow tasks
import asyncio

# coroutine used for a task
async def task_coro(value):
    # sleep for a moment
    await asyncio.sleep(5)
    # return a result
    return value * 100

# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
    # report a message
    print('Main done')

# start the asyncio program
asyncio.run(main())

Running the example first starts the asyncio event loop and runs the main() coroutine.

The main() coroutine runs and reports a message. It then creates a list of 10 task_coro() coroutine objects and issues them to asyncio.gather() for concurrent execution.

The coroutines run, each suspending for 5 seconds before returning an integer value.

The main() coroutine resumes and receives the return values, which are reported before a final message is reported.

This highlights an example of executing multiple slow coroutines concurrently, motivating the desire to add a timeout.

Main starting
[0, 100, 200, 300, 400, 500, 600, 700, 800, 900]
Main done

Next, let's look at how we might add a timeout when using asyncio.gather().

Example of asyncio.gather() with Timeout on All Tasks

We can explore how to add a timeout to asyncio.gather() via asyncio.timeout().

In this case, we can update the above example to add an asyncio.timeout() context manager to the main() coroutine with a timeout of 2.5 seconds.

We can then issue the asyncio.gather() within the context manager and report results.

...
# gather tasks with a timeout
async with asyncio.timeout(2.5):
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)

The whole timeout() context manager can be wrapped in a try-except structure to handle the timeout.

...
try:
    # gather tasks with a timeout
    async with asyncio.timeout(2.5):
        # run the tasks
        results = await asyncio.gather(*coros)
        # report results
        print(results)
except asyncio.TimeoutError:
    # report a message
    print('Timeout waiting for gather')

The updated main() coroutine with this change is listed below.

# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    try:
        # gather tasks with a timeout
        async with asyncio.timeout(2.5):
            # run the tasks
            results = await asyncio.gather(*coros)
            # report results
            print(results)
    except asyncio.TimeoutError:
        # report a message
        print('Timeout waiting for gather')
    # report a message
    print('Main done')

Tying this together, the complete example is listed below.

Note: This example requires Python 3.11 or higher.

# SuperFastPython.com
# example of gather() with timeout
import asyncio

# coroutine used for a task
async def task_coro(value):
    # sleep for a moment
    await asyncio.sleep(5)
    # return a result
    return value * 100

# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    try:
        # gather tasks with a timeout
        async with asyncio.timeout(2.5):
            # run the tasks
            results = await asyncio.gather(*coros)
            # report results
            print(results)
    except asyncio.TimeoutError:
        # report a message
        print('Timeout waiting for gather')
    # report a message
    print('Main done')

# start the asyncio program
asyncio.run(main())

Running the example first starts the asyncio event loop and runs the main() coroutine.

The main() coroutine runs and reports a message. It then creates a list of 10 task_coro() coroutine objects.

The timeout() context manager is opened with a time limit of 2.5 seconds, not long enough for the coroutines we are going to issue.

Within the block, the list of coroutines is issued for concurrent execution with asyncio.gather().

The coroutines run, each suspending for 5 seconds.

The timeout elapses and the asyncio.timeout() cancels all issued coroutines and raises an asyncio.TimeoutError.

The exception is handled and a message is reported.

This highlights how we can add a timeout when using asyncio.gather().

Main starting
Timeout waiting for gather
Main done

Next, let's look at how we might add a timeout to a single coroutine when using asyncio.gather().

Example of asyncio.gather() with Timeout on One Task

We can explore an example of adding a timeout to a single coroutine in a collection when using asyncio.gather().

In this case, we will issue 10 coroutines using asyncio.gather() as before, except in this case, one of the coroutines in the list will be wrapped in asyncio.wait_for() with a timeout of 2.5 seconds.

...
# create many coroutines
coros = [task_coro(i) for i in range(10)]
# add a timeout to the first task only
coros[0] = asyncio.wait_for(coros[0], timeout=2.5)

This will not be long enough to complete the task, which will raise an asyncio.TimeoutError exception.

Therefore we will add a try-except structure to the coroutine and handle the exception.

...
try:
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
except asyncio.TimeoutError:
    print('Timeout waiting for results')

We expect that the timeout will elapse and raise an asyncio.TimeoutError exception which will bubble up to the asyncio.gather() after all other tasks are done.

The timeout will not cancel all other tasks, as it did above when using asyncio.timeout().

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of gather() with timeout on one task
import asyncio

# coroutine used for a task
async def task_coro(value):
    # sleep for a moment
    await asyncio.sleep(5)
    # return a result
    return value * 100

# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    # add a timeout to the first task only
    coros[0] = asyncio.wait_for(coros[0], timeout=2.5)
    try:
        # run the tasks
        results = await asyncio.gather(*coros)
        # report results
        print(results)
    except asyncio.TimeoutError:
        print('Timeout waiting for results')
    # report a message
    print('Main done')

# start the asyncio program
asyncio.run(main())

Running the example first starts the asyncio event loop and runs the main() coroutine.

The main() coroutine runs and reports a message. It then creates a list of 10 task_coro() coroutine objects.

The first coroutine in the list is then replaced with a wrapped version that adds a 2.5-second timeout, not long enough for the task to complete.

The list of coroutines is issued for concurrent execution with asyncio.gather().

The coroutines run, each suspending for 5 seconds.

The timeout on the first task elapses and an asyncio.TimeoutError exception is raised. Once all other tasks are done, the asyncio.TimeoutError exception is propagated to the main() coroutine.

The exception is handled and a message is reported.

This highlights how we can add a timeout to a single coroutine when using asyncio.gather().

Main starting
Timeout waiting for results
Main done

Takeaways

You now know how to use asyncio.gather() with a timeout.



If you enjoyed this tutorial, you will love my book: Python Asyncio Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.