We can add a timeout when using asyncio.gather() via the asyncio.timeout() context manager.
A timeout in seconds can be specified to the asyncio.timeout() context manager and the asyncio.gather() function can be called within the context manager block. If the timeout elapses before all tasks in the gather() are done, an asyncio.TimeoutError exception is raised. Using the asyncio.timeout() context manager requires Python version 3.11 or higher.
In this tutorial, you will discover how to use asyncio.gather() with a timeout.
Let’s get started.
Need Timeout With asyncio.gather()
The asyncio.gather() function allows us to run multiple coroutines or tasks concurrently.
Coroutines can be provided as positional arguments to asyncio.gather() and a list of return values is returned, for example:
```python
...
# execute coroutines concurrently
results = await asyncio.gather(coro1(), coro2())
```
Alternatively, we can create a list of coroutines or tasks and unpack them as positional arguments using the star operator (*).
For example:
```python
...
# creates a list of coroutine objects
coros = [coro(i) for i in range(100)]
# execute coroutines concurrently
results = await asyncio.gather(*coros)
```
A limitation of gather() is that it does not support a timeout.
The asyncio.gather() function will suspend until all provided awaitables are done. This means that one long-running coroutine provided to gather() may prevent a program from progressing.
How can we use a timeout with asyncio.gather()?
How to Add Timeout To asyncio.gather()
There are many ways that we can use asyncio.gather() with a timeout, or achieve a similar result with another function that adds a timeout.
The main approaches include:
- Use asyncio.timeout()
- Use asyncio.wait_for()
- Use asyncio.wait()
Let’s take a closer look at each approach in turn.
Add a Timeout With asyncio.timeout()
Perhaps the simplest approach to add a timeout when using asyncio.gather() is via the asyncio.timeout() context manager.
For example:
```python
...
# gather tasks with a timeout
async with asyncio.timeout(2.5):
    # run the tasks
    results = await asyncio.gather(*coros)
```
If the timeout elapses before the gather() returns, an asyncio.TimeoutError exception is raised, which can be handled.
For example:
```python
...
try:
    # gather tasks with a timeout
    async with asyncio.timeout(2.5):
        # run the tasks
        results = await asyncio.gather(*coros)
except asyncio.TimeoutError:
    ...
```
Note: Using the asyncio.timeout() context manager requires Python version 3.11 or higher.
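On Python versions before 3.11, a similar effect can be sketched by wrapping the single asyncio.gather() call itself in asyncio.wait_for(). The snippet below is a minimal sketch, not part of the original examples: the helper name gather_with_timeout() and the delay parameter on task_coro() are assumptions made for illustration, and the delays are kept short so it runs quickly.

```python
import asyncio

# hypothetical slow coroutine, similar in spirit to the ones used in this tutorial
async def task_coro(value, delay):
    await asyncio.sleep(delay)
    return value * 100

# hypothetical helper: run gather() under one overall time limit
async def gather_with_timeout(coros, timeout):
    try:
        # wait_for() cancels the gather() on timeout, which cancels its awaitables
        return await asyncio.wait_for(asyncio.gather(*coros), timeout=timeout)
    except asyncio.TimeoutError:
        # signal the timeout to the caller with None
        return None

async def main():
    # all coroutines finish well within the limit
    fast = await gather_with_timeout([task_coro(i, 0.1) for i in range(3)], timeout=1.0)
    print(fast)
    # the coroutines take longer than the limit
    slow = await gather_with_timeout([task_coro(i, 1.0) for i in range(3)], timeout=0.2)
    print(slow)

asyncio.run(main())
```

Returning None on timeout is only one possible design choice; re-raising the exception to the caller would work just as well.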
Add a Timeout With asyncio.wait_for()
Another approach is to wrap the coroutines passed to asyncio.gather() in calls to asyncio.wait_for().
Each wrapped coroutine is cancelled and an asyncio.TimeoutError exception is raised if it does not complete within its time limit.
The asyncio.wait_for() could be applied to all coroutines.
For example:
```python
...
# create many coroutines, each with its own timeout
coros = [asyncio.wait_for(task_coro(i), timeout=2.5) for i in range(10)]
```
We can then issue the asyncio.gather() and handle the exception.
```python
...
try:
    # run the tasks, each subject to its own timeout
    results = await asyncio.gather(*coros)
except asyncio.TimeoutError:
    ...
```
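As a small runnable sketch of per-coroutine time limits, the example below wraps every coroutine in asyncio.wait_for() and passes return_exceptions=True to asyncio.gather(), so that a timeout in one coroutine shows up in the results list instead of discarding the other results. The delay parameter and the specific delays are assumptions chosen to keep the run short.

```python
import asyncio

# hypothetical coroutine that sleeps for a given delay
async def task_coro(value, delay):
    await asyncio.sleep(delay)
    return value * 100

async def main():
    # the second coroutine is deliberately slower than the time limit
    delays = [0.1, 1.0, 0.1]
    # give every coroutine its own 0.5 second time limit
    coros = [asyncio.wait_for(task_coro(i, d), timeout=0.5) for i, d in enumerate(delays)]
    # exceptions are returned in the results list rather than raised
    results = await asyncio.gather(*coros, return_exceptions=True)
    for i, result in enumerate(results):
        if isinstance(result, asyncio.TimeoutError):
            print(f'task {i} timed out')
        else:
            print(f'task {i} returned {result}')
    return results

asyncio.run(main())
```

With return_exceptions=True the caller has to check each entry, but results from the coroutines that finished in time are kept.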
Alternatively, it could be applied to selected coroutines that may be expected to take a long time.
For example:
```python
...
# add a timeout to the first task only
coros[0] = asyncio.wait_for(coros[0], timeout=2.5)
```
Add a Timeout With asyncio.wait()
Finally, we could replace the asyncio.gather() with asyncio.wait().
It requires that all coroutines be scheduled as tasks first, such as via asyncio.create_task().
For example:
```python
...
# create many tasks
tasks = [asyncio.create_task(task_coro(i)) for i in range(10)]
```
The asyncio.wait() function accepts a timeout in seconds and will return once all issued tasks are done or the timeout elapses, whichever comes first.
For example:
```python
...
# wait for all tasks with a timeout
done, notdone = await asyncio.wait(tasks, timeout=5)
```
An asyncio.TimeoutError exception is not raised if the timeout elapses. Instead, a tuple of two sets is returned: the first contains all tasks that are done and the second contains the tasks that are still running. Tasks that are still running are not cancelled.
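A minimal sketch of this approach is shown below. It collects results from the done set and then cancels whatever is left, since asyncio.wait() leaves unfinished tasks running. The delay parameter and the short delays are assumptions made for illustration.

```python
import asyncio

# hypothetical coroutine that sleeps for a given delay
async def task_coro(value, delay):
    await asyncio.sleep(delay)
    return value * 100

async def main():
    # wait() needs tasks (or futures), not bare coroutines
    delays = [0.1, 0.1, 2.0]
    tasks = [asyncio.create_task(task_coro(i, d)) for i, d in enumerate(delays)]
    # wait up to 0.5 seconds for the tasks
    done, pending = await asyncio.wait(tasks, timeout=0.5)
    # sets are unordered, so sort the results for display
    results = sorted(task.result() for task in done)
    # unfinished tasks are not cancelled by wait(), so cancel them explicitly
    for task in pending:
        task.cancel()
    # let the cancelled tasks finish unwinding
    await asyncio.gather(*pending, return_exceptions=True)
    return results, len(pending)

results, n_pending = asyncio.run(main())
print(results, n_pending)
```

Unlike asyncio.gather(), the done set does not preserve the order in which the tasks were created, so results may need to be matched back to their tasks.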
Now that we know how to wait for coroutines to complete with a timeout, let’s look at some examples.
Example of asyncio.gather() With Slow Tasks
Before we look at adding a timeout to asyncio.gather(), let’s look at using asyncio.gather() with slow coroutines.
In this example, we will define a coroutine that takes an argument, suspends for 5 seconds, and then returns the provided argument multiplied by 100. This is our slow coroutine.
```python
# coroutine used for a task
async def task_coro(value):
    # sleep for a moment
    await asyncio.sleep(5)
    # return a result
    return value * 100
```
We can then create a list of 10 slow coroutines, and execute them concurrently using asyncio.gather(), then report the return values.
```python
# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
    # report a message
    print('Main done')
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of gather() with slow tasks
import asyncio

# coroutine used for a task
async def task_coro(value):
    # sleep for a moment
    await asyncio.sleep(5)
    # return a result
    return value * 100

# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
    # report a message
    print('Main done')

# start the asyncio program
asyncio.run(main())
```
Running the example first starts the asyncio event loop and runs the main() coroutine.
The main() coroutine runs and reports a message. It then creates a list of 10 task_coro() coroutine objects and issues them to asyncio.gather() for concurrent execution.
The coroutines run, each suspending for 5 seconds before returning an integer value.
The main() coroutine resumes and receives the return values, which are reported before a final message is reported.
This highlights an example of executing multiple slow coroutines concurrently, motivating the desire to add a timeout.
```
Main starting
[0, 100, 200, 300, 400, 500, 600, 700, 800, 900]
Main done
```
Next, let’s look at how we might add a timeout when using asyncio.gather().
Example of asyncio.gather() with Timeout on All Tasks
We can explore how to add a timeout to asyncio.gather() via asyncio.timeout().
In this case, we can update the above example to add an asyncio.timeout() context manager to the main() coroutine with a timeout of 2.5 seconds.
We can then issue the asyncio.gather() within the context manager and report results.
```python
...
# gather tasks with a timeout
async with asyncio.timeout(2.5):
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
```
The whole timeout() context manager can be wrapped in a try-except structure to handle the timeout.
```python
...
try:
    # gather tasks with a timeout
    async with asyncio.timeout(2.5):
        # run the tasks
        results = await asyncio.gather(*coros)
        # report results
        print(results)
except asyncio.TimeoutError:
    # report a message
    print('Timeout waiting for gather')
```
The updated main() coroutine with this change is listed below.
```python
# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    try:
        # gather tasks with a timeout
        async with asyncio.timeout(2.5):
            # run the tasks
            results = await asyncio.gather(*coros)
            # report results
            print(results)
    except asyncio.TimeoutError:
        # report a message
        print('Timeout waiting for gather')
    # report a message
    print('Main done')
```
Tying this together, the complete example is listed below.
Note: This example requires Python 3.11 or higher.
```python
# SuperFastPython.com
# example of gather() with timeout
import asyncio

# coroutine used for a task
async def task_coro(value):
    # sleep for a moment
    await asyncio.sleep(5)
    # return a result
    return value * 100

# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    try:
        # gather tasks with a timeout
        async with asyncio.timeout(2.5):
            # run the tasks
            results = await asyncio.gather(*coros)
            # report results
            print(results)
    except asyncio.TimeoutError:
        # report a message
        print('Timeout waiting for gather')
    # report a message
    print('Main done')

# start the asyncio program
asyncio.run(main())
```
Running the example first starts the asyncio event loop and runs the main() coroutine.
The main() coroutine runs and reports a message. It then creates a list of 10 task_coro() coroutine objects.
The timeout() context manager is opened with a time limit of 2.5 seconds, not long enough for the coroutines we are going to issue.
Within the block, the list of coroutines is issued for concurrent execution with asyncio.gather().
The coroutines run, each suspending for 5 seconds.
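The timeout elapses and the asyncio.timeout() context manager cancels the main() coroutine while it is waiting on asyncio.gather(), which in turn cancels all issued coroutines. The context manager then converts the cancellation into an asyncio.TimeoutError that is raised in main().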
The exception is handled and a message is reported.
This highlights how we can add a timeout when using asyncio.gather().
```
Main starting
Timeout waiting for gather
Main done
```
Next, let’s look at how we might add a timeout to a single coroutine when using asyncio.gather().
Example of asyncio.gather() with Timeout on One Task
We can explore an example of adding a timeout to a single coroutine in a collection when using asyncio.gather().
In this case, we will issue 10 coroutines using asyncio.gather() as before, except in this case, one of the coroutines in the list will be wrapped in asyncio.wait_for() with a timeout of 2.5 seconds.
```python
...
# create many coroutines
coros = [task_coro(i) for i in range(10)]
# add a timeout to the first task only
coros[0] = asyncio.wait_for(coros[0], timeout=2.5)
```
This will not be long enough to complete the task, which will raise an asyncio.TimeoutError exception.
Therefore we will add a try-except structure to the coroutine and handle the exception.
```python
...
try:
    # run the tasks
    results = await asyncio.gather(*coros)
    # report results
    print(results)
except asyncio.TimeoutError:
    print('Timeout waiting for results')
```
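We expect that the timeout will elapse and raise an asyncio.TimeoutError exception in the first coroutine. Because asyncio.gather() is called with its default of return_exceptions=False, the exception is propagated to the main() coroutine as soon as it is raised, without waiting for the other tasks to finish.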
The timeout will not cancel all other tasks, as it did above when using asyncio.timeout().
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of gather() with timeout on one task
import asyncio

# coroutine used for a task
async def task_coro(value):
    # sleep for a moment
    await asyncio.sleep(5)
    # return a result
    return value * 100

# coroutine used for the entry point
async def main():
    # report a message
    print('Main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    # add a timeout to the first task only
    coros[0] = asyncio.wait_for(coros[0], timeout=2.5)
    try:
        # run the tasks
        results = await asyncio.gather(*coros)
        # report results
        print(results)
    except asyncio.TimeoutError:
        print('Timeout waiting for results')
    # report a message
    print('Main done')

# start the asyncio program
asyncio.run(main())
```
Running the example first starts the asyncio event loop and runs the main() coroutine.
The main() coroutine runs and reports a message. It then creates a list of 10 task_coro() coroutine objects.
The first coroutine in the list is then replaced with a wrapped version that adds a 2.5-second timeout, not long enough for the task to complete.
The list of coroutines is issued for concurrent execution with asyncio.gather().
The coroutines run, each suspending for 5 seconds.
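The timeout on the first task elapses after 2.5 seconds and an asyncio.TimeoutError exception is raised, which asyncio.gather() propagates to the main() coroutine right away. The other tasks are not cancelled by gather() and keep running until main() returns, at which point asyncio.run() cancels them while shutting down the event loop.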
The exception is handled and a message is reported.
This highlights how we can add a timeout to a single coroutine when using asyncio.gather().
```
Main starting
Timeout waiting for results
Main done
```
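To see when the exception reaches the caller and what state the other tasks are in at that moment, the sketch below schedules the other coroutines as tasks and measures the elapsed time. The shorter delays and the use of time.monotonic() are assumptions added for illustration.

```python
import asyncio
import time

# slow coroutine, shortened compared to the example above
async def task_coro(value):
    await asyncio.sleep(1.0)
    return value * 100

async def main():
    # schedule the other coroutines as tasks so their state can be inspected
    others = [asyncio.create_task(task_coro(i)) for i in range(1, 4)]
    # only the first coroutine gets a time limit, and it is too short
    first = asyncio.wait_for(task_coro(0), timeout=0.2)
    start = time.monotonic()
    elapsed = None
    try:
        await asyncio.gather(first, *others)
    except asyncio.TimeoutError:
        elapsed = time.monotonic() - start
        print(f'Timeout after about {elapsed:.1f} seconds')
    # check which of the other tasks are still running at this point
    still_running = [not task.done() for task in others]
    print(still_running)
    # wait for the other tasks to finish and collect their results
    results = await asyncio.gather(*others)
    print(results)
    return elapsed, still_running, results

asyncio.run(main())
```

If the other results are still needed after a timeout, holding references to the tasks like this allows them to be awaited again rather than lost when the program exits.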
Takeaways
You now know how to use asyncio.gather() with a timeout.