How to Use Asyncio as_completed() in Python

November 24, 2022 Python Asyncio

It is common to issue many tasks at once, then need to process the results from each task as the tasks are completed.

This can be more efficient than waiting for all tasks to complete before handling the results.

We can achieve this and iterate tasks in the order they are done using the as_completed() function.

In this tutorial, you will discover how to use the as_completed() function in asyncio.

After completing this tutorial, you will know:

Let's get started.

What is Asyncio as_completed()

The asyncio.as_completed() function will run a collection of tasks and coroutines concurrently.

More importantly, it returns an iterable that we can use to retrieve the awaitables in the order that they are completed.

Run awaitable objects in the aws iterable concurrently. Return an iterator of coroutines. Each coroutine returned can be awaited to get the earliest next result from the iterable of the remaining awaitables.

-- Coroutines and Tasks

It allows us to execute many coroutines or tasks concurrently and get the results from tasks in the order that they are completed, rather than the order we issued them.

Now that we know what as_completed() is, let's look at how we might use it.

How to Use Asyncio as_completed()

The asyncio.as_completed() function is called with a collection of awaitables.

This may be a list, dict, or set, and may contain asyncio.Task objects, coroutines, or other awaitables.

Any coroutines provided to as_completed() will be wrapped in a Task object for independent execution.

It returns an iterable that when traversed will yield awaitables (or wrapped awaitables) in the provided list. These can be awaited by the caller in order to get results in the order that tasks are completed, e.g. get the result from the next task to complete.

For example:

...
# iterate over awaitables
for task in asyncio.as_completed(tasks):
	# get the next result
	result = await task

The as_completed() also takes a "timeout" argument.

This specifies how long the caller is willing to wait for all awaitables to be done.

For example:

...
# iterate over awaitables with a timeout
for task in asyncio.as_completed(tasks, timeout=10):
	# get the next result
	result = await task

If the timeout elapses before all awaitables are done, a asyncio.TimeoutError is raised and may need to be handled.

For example, we could handle it within the loop:

...
# iterate over awaitables with a timeout
for task in asyncio.as_completed(tasks, timeout=10):
	# handle a timeout
	try:
		# get the next result
		result = await task
	except asyncio.TimeoutError:
		# ...

This is not desirable because once the timeout has elapsed, an asyncio.TimeoutError will be raised each time next() is called on the generator.

Therefore, it is better to wrap the entire loop in a try-except block.

For example:

...
# handle a timeout
try:
	# iterate over awaitables with a timeout
	for task in asyncio.as_completed(tasks, timeout=10):
		# get the next result
		result = await task
except asyncio.TimeoutError:
	# ...

Now that we know how to use the as_completed() function, let's take a moment to consider how it works.

How does Asyncio as_completed() Work

The works by providing a generator that yields coroutines, where each coroutine will return a result of a provided awaitable.

The asyncio.as_completed() function does not block (suspend the current coroutine) but instead returns a generator.

For example:

...
# get a generator that yields awaitables in completion order
generator = asyncio.as_completed(tasks)

Calling the next() built-in function on the generator does not block, but instead yields a coroutine.

The returned coroutine is not one of the provided awaitables, but rather an internal coroutine from the as_completed() function that manages and monitors which issued task will return a result next.

For example:

...
# get the next coroutine
coro = next(generator)

It is not until one of the returned coroutines is awaited that the caller will block.

For example:

...
# get a result from the next task to complete
result = await coro

Now that we have an idea of how to use the as_completed() function and how it works, let's look at some worked examples.

Example of as_completed() with Coroutines

We can explore how to execute coroutines concurrently and get coroutine results as tasks are completed with the asyncio.as_completed() function.

In this example, we will define a simple coroutine task that takes an integer argument, generates a random value, sleeps for a fraction of a second then returns the integer argument multiplied by the generated value.

A list of the task coroutine is created and passed to the as_completed() function. This returns a generator that is traversed using a for loop.

In each iteration of the loop a coroutine is yielded from the generator and is then awaited in the body of the loop. A result from the next coroutine to complete is returned and the result is reported.

The complete example is listed below.

# SuperFastPython.com
# example of getting coroutine results as completed
from random import random
import asyncio

# coroutine to execute in a new task
async def task_coro(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    await asyncio.sleep(value)
    # return the result
    return arg * value

# main coroutine
async def main():
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    # get results as coroutines are completed
    for coro in asyncio.as_completed(coros):
        # get the result from the next coroutine to complete
        result = await coro
        # report the result
        print(f'>got {result}')

# start the asyncio program
asyncio.run(main())

Running the example first creates the main() coroutine and then uses this as the entry point into the asyncio program.

The main() coroutine runs and creates a list of coroutines.

It then passes this list to the as_completed() function which returns a generator. The generator is traversed in a for loop and each iteration yields a coroutine.

The coroutine is awaited. This suspends the main() coroutine.

The tasks begin executing, generating a random value, and sleeping. A task finishes and returns a value.

The main() coroutine resumes, receives the return value, and reports it.

The loop repeats, another coroutine is yielded, the main() coroutine awaits it and suspends, and another result is returned.

This continues until all coroutines in the provided list are completed.

This example highlights that we can traverse a collection of coroutines and get and use results in the order that tasks are completed.

Results will differ each time the example is run given the use of random numbers.

>got 0.07236962530232949
>got 0.5171864910147306
>got 0.7153626682953872
>got 2.54812333824902
>got 0.5960648114598495
>got 5.051883987489034
>got 0.0
>got 2.842043340472799
>got 6.343694133393031
>got 4.903128525746293

Next, let's look at how we might use the as_completed() function with asyncio.Task objects.

Example of as_completed() with Tasks

We can explore how to execute asyncio.Task objects concurrently and gets coroutine results as tasks are completed with the asyncio.as_completed() function.

In this example, we will update the above example to create a list of asyncio.Task objects instead of coroutines.

Recall that a Task object wraps a coroutine. Once created, a task is scheduled for execution immediately and will be executed independently.

You can learn more about asyncio tasks in the tutorial:

The complete example is listed below.

# SuperFastPython.com
# example of getting task results as completed
from random import random
import asyncio

# coroutine to execute in a new task
async def task_coro(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    await asyncio.sleep(value)
    # return the result
    return arg * value

# main coroutine
async def main():
    # create many tasks
    coros = [asyncio.create_task(task_coro(i)) for i in range(10)]
    # get results as tasks are completed
    for coro in asyncio.as_completed(coros):
        # get the result from the next task to complete
        result = await coro
        # report the result
        print(f'>got {result}')

# start the asyncio program
asyncio.run(main())

Running the example first creates the main() coroutine and then uses this as the entry point into the asyncio program.

The main() coroutine runs and creates a list of Task objects, instead of coroutines.

Each task is scheduled for independent execution after it is created. This is different from the coroutines in the previous example that are not scheduled until the as_completed() function wraps them in Task objects.

The main() coroutine then passes this list to the as_completed() function which returns a generator. The generator is traversed in a for loop and each iteration yields a coroutine.

The coroutine is awaited. This suspends the main() coroutine.

The tasks begin executing, generating a random value, and sleeping. A task finishes and returns a value.

The main() coroutine resumes, receives the return value, and reports it.

This continues until all coroutines in the provided list are completed.

This example highlights that the as_completed() function can be used with Task objects as well as coroutines that we saw in the previous example.

Results will differ each time the example is run given the use of random numbers.

>got 0.3453975438487853
>got 0.5871779512599952
>got 0.0
>got 0.6767468346945187
>got 0.9905365580643459
>got 1.9274714575238143
>got 0.5127918804896211
>got 1.308261888746516
>got 4.12075681907987
>got 7.128076208913287

Example of as_completed() with a Timeout

We can explore how to execute coroutines concurrently with the as_completed() function and a timeout and handle cases where results take longer than the expected timeout.

In this example, we call the as_completed() function with a timeout that is large enough for all tasks to be completed.

This is intentional as we want to explore how to handle the case that a collection of tasks takes too long to be done.

The complete example is listed below.

# SuperFastPython.com
# example of getting coroutine results as completed with a timeout
from random import random
import asyncio

# coroutine to execute in a new task
async def task_coro(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    await asyncio.sleep(value)
    # return the result
    return arg * value

# main coroutine
async def main():
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    # handle a timeout
    try:
        # get results as coroutines are completed
        for coro in asyncio.as_completed(coros, timeout=0.5):
            # get the result from the next coroutine to complete
            result = await coro
            # report the result
            print(f'>got {result}')
    except asyncio.TimeoutError:
        print('Gave up after timeout')

# start the asyncio program
asyncio.run(main())

Running the example first creates the main() coroutine and then uses this as the entry point into the asyncio program.

The main() coroutine runs and creates a list of coroutines.

It then passes this list to the as_completed() function with a timeout.

A generator is returned and traversed in a for loop and each iteration yields a coroutine.

The coroutine is awaited. This suspends the main() coroutine.

The tasks begin executing, generating a random value, and sleeping. A task finishes and returns a value.

The main() coroutine resumes, receives the return value, and reports it.

This continues until the timeout elapses, after which, the next call to await a returned coroutine raises a TimeoutError exception.

This breaks the loop and is caught, reporting a message.

This example highlights how we can impose a limit on how long the caller is willing to wait for all tasks to be completed and handle the case where the timeout elapses before all results are retrieved.

Results will differ each time the example is run given the use of random numbers.

>got 1.1794285132416094
>got 0.2733126502808916
>got 0.9659219853827947
>got 1.8523793561409736
>got 0.0
>got 3.4477334386446854
Gave up after timeout

Example of as_completed() with an Exception

We can explore how to execute coroutines concurrently with as_completed() and one of the tasks fails with an exception.

In this example, we update the above example so that one of the task coroutines intentionally fails with an exception.

Tasks can fail and it is important to be aware of this and to consider handling a task failure when waiting for results.

You can learn more about exceptions in asyncio tasks in the tutorial:

The complete example is listed below.

# SuperFastPython.com
# example of getting coroutine results as completed with an exception
from random import random
import asyncio

# coroutine to execute in a new task
async def task_coro(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    await asyncio.sleep(value)
    # check if the task should fail
    if value > 0.5:
        raise Exception('Something bad happened')
    # return the result
    return arg * value

# main coroutine
async def main():
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    # get results as coroutines are completed
    for coro in asyncio.as_completed(coros):
        # get the result from the next coroutine to complete
        result = await coro
        # report the result
        print(f'>got {result}')

# start the asyncio program
asyncio.run(main())

Running the example first creates the main() coroutine and then uses this as the entry point into the asyncio program.

The main() coroutine runs and creates a list of coroutines.

It then passes this list to the as_completed() function which returns a generator. The generator is traversed in a for loop and each iteration yields a coroutine.

The coroutine is awaited. This suspends the main() coroutine.

The tasks begin executing, generating a random value, and sleeping. A task finishes and returns a value.

The main() coroutine resumes, receives the return value, and reports it.

This continues for a number of iterations.

Then, one of the tasks fails with an exception. When it is awaited, the exception is re-raised by the caller.

The exception is not expected and therefore it unwinds the asyncio program and terminates the main thread.

This example highlights what happens if a task in the collection fails with an exception. It reminds us to be careful when awaiting results from a coroutine and to potentially handle exceptions that could be raised.

Results will differ each time the example is run given the use of random numbers.

>got 0.05191041233869953
>got 0.3949931321512907
>got 0.1518973920395813
>got 2.125424835725659
>got 0.0
Traceback (most recent call last):
  ...
Exception: Something bad happened

Example of as_completed() with "async for"

We can explore a common error where the "async for" expression is used with the generator returned from the as_completed() function.

The "async for" expression is used for asynchronous for-loops, specifically with asynchronous iterators and asynchronous generators.

You can learn more about the "async for" expression in the tutorial:

The as_completed() function does not return an asynchronous generator, it returns a traditional generator.

Therefore, using "async for" with the as_completed() function is an error.

In this example, we intentionally use the "async for" expression with the as_completed() function to demonstrate this error.

The complete example is listed below.

# SuperFastPython.com
# example of getting coroutine results as completed and async for
from random import random
import asyncio

# coroutine to execute in a new task
async def task_coro(arg):
    # generate a random value between 0 and 1
    value = random()
    # block for a moment
    await asyncio.sleep(value)
    # return the result
    return arg * value

# main coroutine
async def main():
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    # get results as coroutines are completed
    async for coro in asyncio.as_completed(coros):
        # get the result from the next coroutine to complete
        result = await coro
        # report the result
        print(f'>got {result}')

# start the asyncio program
asyncio.run(main())

Running the example first creates the main() coroutine and then uses this as the entry point into the asyncio program.

The main() coroutine runs and creates a list of coroutines.

It then passes them to the as_completed() function. This returns a generator that is iterated using an "async for" expression.

This fails because the as_completed() function returns a traditional generator, not an asynchronous generator.

The fix involves changing the "async for" expression to a traditional "for" expression.

For example:

...
# get results as coroutines are completed
for coro in asyncio.as_completed(coros):
    # get the result from the next coroutine to complete
    result = await coro

This example highlights a common error when using the as_completd() function and how to overcome it.

Traceback (most recent call last):
  ...
TypeError: 'async for' requires an object with __aiter__ method, got generator
sys:1: RuntimeWarning: coroutine 'task_coro' was never awaited

Takeaways

You now know how to use the asyncio.as_completed() function in Python.



If you enjoyed this tutorial, you will love my book: Python Asyncio Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.