Asyncio Condition Variable in Python

December 13, 2022 Python Asyncio

We often need to coordinate the behavior between multiple concurrent tasks.

For example, it is common for one or more tasks to need to wait for an activity to be completed by another task.

Rather than having the other tasks wait for the whole task to complete, we can use a concurrency primitive designed for this purpose called a monitor or a condition variable, provided in asyncio via the asyncio.Condition class.

The condition variable implements the common wait/notify and wait/notify-all patterns for concurrent programming. Dependent tasks can wait on the condition variable and the target task can complete its work and notify all interested parties that a condition has been met and they can resume.

In this tutorial, you will discover how to use the condition variable in asyncio.

After completing this tutorial, you will know:

Let's get started.

What is an Asyncio Condition Variable

In concurrency, a condition (also called a monitor) allows multiple threads to be notified about some result.

It combines both a mutual exclusion lock (mutex) and an event allowing exclusive access and notification.

In essence, a Condition object combines the functionality of an Event and a Lock.

-- Asyncio Synchronization Primitives

A mutex can be used to protect a critical section, but it cannot be used to alert other threads that a condition has changed or been met.

An event can be used to notify other threads, but it cannot be used to protect a critical section and enforce mutual exclusion.

A condition can be acquired by a thread (like a mutex), after which the thread can wait to be notified by another thread that something has changed (like an event). While waiting, the thread is blocked and releases the lock for other threads to acquire.

Another thread can then acquire the condition, make a change, and notify one, all, or a subset of threads waiting on the condition that something has changed. The waiting thread can then wake up (be scheduled by the operating system), re-acquire the condition (mutex), perform checks on any changed state, and perform required actions.

The condition variable is typically described in terms of threads. Python provides a condition variable for threads via the threading.Condition class.
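The acquire, wait, notify, and re-check sequence described above can be sketched with threads. This is a minimal illustration, not a definitive pattern: the `waiter()` function, the `state` dictionary, and the `results` list are hypothetical names chosen for the example.

```python
import threading

# hypothetical waiting thread: blocks until the shared state is marked ready
def waiter(condition, state, results):
    # acquire the condition (its internal mutex)
    with condition:
        # re-check the shared state each time we wake up
        while not state['ready']:
            # releases the mutex while blocked, re-acquires it on wake-up
            condition.wait()
        results.append('saw ready')

def main():
    condition = threading.Condition()
    state = {'ready': False}
    results = []
    thread = threading.Thread(target=waiter, args=(condition, state, results))
    thread.start()
    # acquire the condition, change the state, and notify the waiter
    with condition:
        state['ready'] = True
        condition.notify()
    thread.join()
    return results

print(main())
```

Checking the state in a loop means the waiter behaves correctly whether it starts waiting before or after the notification is sent.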

Nevertheless, a condition may also be used with other units of concurrency, such as processes via the multiprocessing.Condition class.

It may also be used with coroutines in asyncio via the asyncio.Condition class.

Now that we know what a condition is, let's look at how we might use it in an asyncio program.

How to Use an Asyncio Condition Variable

In this section, we will explore how to use the asyncio condition variable.

Create a Condition Variable

Python provides a condition for use with coroutines via the asyncio.Condition class.

To use a condition variable, we must create an instance of the class.

...
# create a new condition
condition = asyncio.Condition()

The condition object may then be shared and used among multiple asyncio coroutines.

Acquire and Release Condition Variable

In order for a coroutine to make use of the condition, it must acquire it and release it, like a mutex lock.

This can be achieved manually with the acquire() and release() methods.

The acquire() method returns a coroutine object, so the caller must await it.

For example, we can acquire the condition, do something, then release the condition as follows:

...
# acquire the condition
await condition.acquire()
# do something
# ...
# release the condition
condition.release()
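When acquiring and releasing manually, an exception raised in the critical section would leave the condition locked. One way to guard against this, sketched below under the assumption of a hypothetical `update()` coroutine and `shared` list, is to release the condition in a try/finally block.

```python
import asyncio

# hypothetical coroutine, for illustration only
async def update(condition, shared):
    # acquire the condition manually
    await condition.acquire()
    try:
        # critical section: change the shared state
        shared.append(1)
    finally:
        # always release, even if the critical section raises
        condition.release()
    # report whether the condition is still locked
    return condition.locked()

async def main():
    condition = asyncio.Condition()
    shared = []
    still_locked = await update(condition, shared)
    return shared, still_locked

print(asyncio.run(main()))
```

The async context manager shown next performs the same acquire and release steps for us.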

An alternative to calling the acquire() and release() methods directly is to use the async context manager, which will perform the acquire/release automatically for us, for example:

...
# acquire the condition
async with condition:
	# do something
	# ...

Wait to be Notified

Once the condition is acquired, we can wait on it.

This will suspend the calling coroutine until another coroutine notifies it via the condition with the notify() method (seen later).

This can be achieved via the wait() method that will return a coroutine and must be awaited.

For example:

...
# acquire the condition
async with condition:
	# wait to be notified
	await condition.wait()

Critically, the condition is relinquished while waiting. This allows other coroutines to acquire it in order to also wait or to notify waiting coroutines.
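We can observe this release directly. The sketch below assumes a hypothetical `waiter()` coroutine and a `log` list, and uses the locked() method of the condition to check whether the lock is held while the waiter is suspended inside its "async with" block.

```python
import asyncio

# hypothetical coroutine that acquires the condition and waits on it
async def waiter(condition, log):
    async with condition:
        log.append('waiter holds the lock')
        # the lock is released here until we are notified
        await condition.wait()
        log.append('waiter resumed')

async def main():
    condition = asyncio.Condition()
    log = []
    task = asyncio.create_task(waiter(condition, log))
    # let the waiter run until it suspends in wait()
    await asyncio.sleep(0.1)
    # the waiter is still inside its "async with" block, yet the lock is free
    log.append(f'locked while waiting: {condition.locked()}')
    # we can therefore acquire the condition and notify the waiter
    async with condition:
        condition.notify()
    await task
    return log

for line in asyncio.run(main()):
    print(line)
```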

Wait For Expression

The asyncio.Condition class also provides a wait_for() method that only resumes the waiting coroutine once a condition is met, such as a function call returning a true value.

The name of the function that returns a boolean value can be provided to the wait_for() method directly.

Like the wait() method, it returns a coroutine object that must be awaited.

For example:

...
# acquire the condition
async with condition:
	# wait to be notified and a function to return true
	await condition.wait_for(all_data_collected)

The callable is checked once when wait_for() is called, and again each time the coroutine is notified. If the condition is not met, the coroutine may be notified many times and will not resume until the condition is met.
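One way to picture wait_for() is as a loop around wait() that re-checks the state after every notification. The sketch below writes that loop out by hand; the `consumer()` and `producer()` coroutines and the `items` list are hypothetical names used only for illustration, and the loop is a rough equivalent rather than the library's exact code.

```python
import asyncio

# hypothetical consumer: waits until at least three items are available
async def consumer(condition, items):
    wakeups = 0
    async with condition:
        # rough equivalent of: await condition.wait_for(lambda: len(items) >= 3)
        while len(items) < 3:
            await condition.wait()
            wakeups += 1
    return wakeups

# hypothetical producer: adds one item at a time and notifies each time
async def producer(condition, items):
    for i in range(3):
        await asyncio.sleep(0.01)
        async with condition:
            items.append(i)
            condition.notify()

async def main():
    condition = asyncio.Condition()
    items = []
    consumer_task = asyncio.create_task(consumer(condition, items))
    await producer(condition, items)
    wakeups = await consumer_task
    return items, wakeups

items, wakeups = asyncio.run(main())
print(f'items={items}, wakeups={wakeups}')
```

The consumer may wake several times, but it only leaves the loop once the list holds three items.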

Notify Waiting Coroutines

We also must acquire the condition in a coroutine if we wish to notify waiting coroutines. This too can be achieved directly with the acquire/release method calls or via the context manager.

We can notify a single waiting coroutine via the notify() method.

For example:

...
# acquire the condition
async with condition:
	# notify a waiting coroutine
	condition.notify()

The notified coroutine will resume as soon as it can re-acquire the mutex within the condition. This is attempted automatically as part of its call to wait(); you do not need to do anything extra.

If more than one coroutine is waiting on the condition, we will not know which coroutine will be notified.

We can also notify a subset of waiting coroutines by setting the “n” argument to an integer number of coroutines to notify, for example:

...
# acquire the condition
async with condition:
	# notify 3 waiting coroutines
	condition.notify(n=3)
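The effect of the "n" argument can be checked by counting how many waiters resume. This is a small sketch with a hypothetical `waiter()` coroutine and `woken` list: five coroutines wait, three are notified first, and the remaining two are notified afterwards.

```python
import asyncio

# hypothetical coroutine that waits and records when it has been woken
async def waiter(condition, woken, name):
    async with condition:
        await condition.wait()
        woken.append(name)

async def main():
    condition = asyncio.Condition()
    woken = []
    tasks = [asyncio.create_task(waiter(condition, woken, i)) for i in range(5)]
    # let all five tasks reach wait()
    await asyncio.sleep(0.1)
    async with condition:
        # wake at most three of the waiting coroutines
        condition.notify(n=3)
    # give the notified coroutines time to resume
    await asyncio.sleep(0.1)
    count_after_first = len(woken)
    async with condition:
        # wake the two coroutines that are still waiting
        condition.notify(n=2)
    await asyncio.gather(*tasks)
    return count_after_first, len(woken)

print(asyncio.run(main()))
```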

Notify All Waiting Coroutines

Finally, we can notify all coroutines waiting on the condition via the notify_all() method.

...
# acquire the condition
async with condition:
	# notify all coroutines waiting on the condition
	condition.notify_all()

A final reminder: a coroutine must acquire the condition before waiting on it or notifying waiting coroutines.

A failure to acquire the condition (the lock within the condition) before performing these actions will result in a RuntimeError.
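The following sketch shows the error for both cases. It calls notify() and wait() without first acquiring the condition and records which calls raised a RuntimeError; the `errors` list is just a name used for this illustration.

```python
import asyncio

async def main():
    condition = asyncio.Condition()
    errors = []
    # notify without holding the condition
    try:
        condition.notify()
    except RuntimeError:
        errors.append('notify')
    # wait without holding the condition
    try:
        await condition.wait()
    except RuntimeError:
        errors.append('wait')
    return errors

print(asyncio.run(main()))
```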

Now that we know how to use the asyncio.Condition class, let's look at some worked examples.

Example of Wait and Notify With an Asyncio Condition

In this section, we will explore using an asyncio.Condition to notify a waiting coroutine that something has happened.

We will use a task to prepare some data and notify a waiting coroutine, and in the main coroutine, we will create and schedule the new task and use the condition to wait for the work to be completed.

The complete example is listed below.

# SuperFastPython.com
# example of wait/notify with an asyncio condition
import asyncio

# task coroutine
async def task(condition, work_list):
    # block for a moment
    await asyncio.sleep(1)
    # add data to the work list
    work_list.append(33)
    # notify a waiting coroutine that the work is done
    print('Task sending notification...')
    async with condition:
        condition.notify()

# main coroutine
async def main():
    # create a condition
    condition = asyncio.Condition()
    # prepare the work list
    work_list = list()
    # wait to be notified that the data is ready
    print('Main waiting for data...')
    async with condition:
        # create and start the task
        _ = asyncio.create_task(task(condition, work_list))
        # wait to be notified
        await condition.wait()
    # we know the data is ready
    print(f'Got data: {work_list}')

# run the asyncio program
asyncio.run(main())

Running the example first creates the main() coroutine which is used as the entry point into the asyncio program.

The main() coroutine runs and creates the shared condition and the work list.

The main() coroutine then acquires the condition. A new task is created and scheduled, provided the shared condition and work list.

The main() coroutine then waits to be notified, which suspends it, releases the condition, and allows the newly scheduled task to run.

The task() coroutine runs. It first blocks for a moment to simulate effort, then adds work to the shared list. It then acquires the condition, notifies the waiting coroutine, and releases the condition automatically on exiting the context manager. The task terminates.

The main() coroutine resumes and reports a final message, showing the updated content of the shared list.

This highlights how we can use a wait-notify pattern between coroutines using a condition variable.

Main waiting for data...
Task sending notification...
Got data: [33]

Next, let's look at how we might notify all waiting coroutines.

Example of wait() and notify_all() With an Asyncio Condition

We can explore how to notify all coroutines waiting on a condition.

In this example, we will start a suite of tasks that will wait on the condition to be notified before performing their processing and reporting a result.

The main coroutine will block for a moment and then notify all waiting coroutines that they can begin processing.

The complete example is listed below.

# SuperFastPython.com
# example of wait/notify all with an asyncio condition
from random import random
import asyncio

# task coroutine
async def task(condition, number):
    # report a message
    print(f'Task {number} waiting...')
    # acquire the condition
    async with condition:
        # wait to be notified
        await condition.wait()
    # generate a random number between 0 and 1
    value = random()
    # block for a moment
    await asyncio.sleep(value)
    # report a result
    print(f'Task {number} got {value}')

# main coroutine
async def main():
    # create a condition
    condition = asyncio.Condition()
    # create and start many tasks
    tasks = [asyncio.create_task(task(condition, i)) for i in range(5)]
    # allow the tasks to run
    await asyncio.sleep(1)
    # acquire the condition
    async with condition:
        # notify all waiting tasks
        condition.notify_all()
    # wait for all tasks to complete
    _ = await asyncio.wait(tasks)

# run the asyncio program
asyncio.run(main())

Running the example first creates the main() coroutine which is used as the entry point into the asyncio program.

The main() coroutine runs and creates the shared condition.

It then creates and schedules five tasks, each providing the shared condition and a unique integer from 0 to 4 as arguments.

The main() coroutine then suspends, allowing the tasks to run.

The tasks run one by one. The tasks first report their message, then acquire the condition. Once acquired they wait to be notified.

The main() coroutine resumes. It acquires the condition and then notifies all coroutines waiting on the condition. It then releases the condition and waits for the issued tasks to terminate.

The tasks resume, one at a time. Each task first generates a random number, and blocks for a fraction of a second. Once resumed it reports a message and terminates.

This highlights how we may have many coroutines waiting to be notified and have them all notified.

Task 0 waiting...
Task 1 waiting...
Task 2 waiting...
Task 3 waiting...
Task 4 waiting...
Task 4 got 0.11036200324308998
Task 3 got 0.25497519885869324
Task 1 got 0.36215401216779797
Task 0 got 0.4277021597975379
Task 2 got 0.7073867691766996

Next, let's look at how we might wait for a specific result on the condition.

Example of wait_for() With an Asyncio Condition

We can explore how to use the wait_for() method on the condition.

The wait_for() method takes a callable, such as a function with no arguments or a lambda expression. The coroutine calling the wait_for() method will block until notified and the callable passed in as an argument returns a True value.

This might mean that the coroutine is notified many times by different coroutines, but will only unblock and continue execution once the condition in the callable is met.

In this example, we will create a suite of tasks, each of which will calculate a value, add it to a shared list, and notify the waiting coroutine.

The main coroutine will wait on the condition and will pass a lambda expression to the wait_for() method so that it does not continue until the work list has been fully populated by the worker coroutines.

The complete example is listed below.

# SuperFastPython.com
# example of wait for with a condition
from random import random
import asyncio

# task coroutine
async def task(condition, work_list):
    # acquire the condition
    async with condition:
        # generate a random value between 0 and 1
        value = random()
        # block for a moment
        await asyncio.sleep(value)
        # add work to the list
        work_list.append(value)
        print(f'Task added {value}')
        # notify the waiting coroutine
        condition.notify()

# main coroutine
async def main():
    # create a condition
    condition = asyncio.Condition()
    # define work list
    work_list = list()
    # create and start many tasks
    _ = [asyncio.create_task(task(condition, work_list)) for _ in range(5)]
    # acquire the condition
    async with condition:
        # wait to be notified
        await condition.wait_for(lambda: len(work_list) == 5)
        # report final message
        print(f'Done, got: {work_list}')

# run the asyncio program
asyncio.run(main())

Running the example first creates the main() coroutine which is used as the entry point into the asyncio program.

The main() coroutine runs and first creates the shared condition variable and work list.

The main() coroutine then creates and schedules five tasks, passing the condition and work list.

The main() coroutine then acquires the condition and waits to be notified and for the lambda expression to be true, which in this case requires that the work list has a length of 5.

The tasks then execute one by one. Each task first acquires the condition, generates a random value, and blocks. Once it resumes it adds a value to the shared work list, reports a message, and notifies the waiting main() coroutine.

The main() coroutine is notified many times but does not resume until the condition is met, after which the final message is reported.

Importantly, only one coroutine can hold the condition at a time, although recall that the condition is released while a coroutine waits to be notified. In this case, the tasks never call wait(); they are suspended in asyncio.sleep() while still holding the lock. Because the lock is held for the whole body of the context manager, that body is coroutine-safe, i.e. mutually exclusive.
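This mutual exclusion can be checked with a small sketch. It assumes a hypothetical `worker()` coroutine and a `state` dictionary that counts how many coroutines are inside the block at once; because each worker sleeps while holding the condition, the count should never exceed one.

```python
import asyncio

# hypothetical worker that suspends while holding the condition
async def worker(condition, state):
    async with condition:
        # count the coroutines currently inside the block
        state['active'] += 1
        state['max_active'] = max(state['max_active'], state['active'])
        # suspend while still holding the lock
        await asyncio.sleep(0.01)
        state['active'] -= 1

async def main():
    condition = asyncio.Condition()
    state = {'active': 0, 'max_active': 0}
    await asyncio.gather(*(worker(condition, state) for _ in range(5)))
    return state['max_active']

print(asyncio.run(main()))
```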

This highlights how we may use the wait-for and notify pattern with coroutines.

Task added 0.45089927950158515
Task added 0.8931671797432676
Task added 0.06484378382773981
Task added 0.7649095074042099
Task added 0.9954569956651376
Done, got: [0.45089927950158515, 0.8931671797432676, 0.06484378382773981, 0.7649095074042099, 0.9954569956651376]

Takeaways

You now know how to use an asyncio condition variable in Python.
