Asyncio Logging Without Blocking

January 4, 2024 Python Asyncio

You can implement non-blocking logging in asyncio programs by using a shared Queue with a QueueHandler to log messages and a QueueListener to store log messages.

In this tutorial, you will discover how to log without blocking from asyncio programs in Python.

Let's get started.

What is Logging

Logging is a way of tracking events within a program.

There are many types of events that may be logged within a program, ranging in different levels of severity, such as debugging and information to warnings, errors, and critical events.

The logging module provides infrastructure for logging within Python programs.

This module defines functions and classes which implement a flexible event logging system for applications and libraries.

-- logging — Logging facility for Python

Logging is achieved by first configuring the log handler and then adding calls to the logging infrastructure at key points in the program.

Handler objects are responsible for dispatching the appropriate log messages (based on the log messages' severity) to the handler's specified destination.

-- logging — Logging facility for Python

The default handler will report log messages on the command prompt (e.g. terminal or system output stream).

Alternate handlers can be configured to write log messages to a file, to a database, or to custom target locations. The handler can specify the severity of messages to report.

For example, we can log to a file by calling the logging.basicConfig() function and specifying the file name and path to log to (e.g. application.log), the level of logging to capture to the file (e.g. from logging.DEBUG to logging.CRITICAL).

...
# log everything to file
logging.basicConfig(filename='application.log', level=logging.DEBUG)

Events are logged via function calls based on the type of event performed, e.g. logging.debug() for debugging messages.

Logging provides a set of convenience functions for simple logging usage. These are debug(), info(), warning(), error() and critical().

-- Logging HOWTO

For example, we may add information messages to the application code by calling logging.info() and passing in the string details of the event.

...
# log that data was loaded successfully
logger.info('All data was loaded successfully')

We may also log failures, such as exceptions that are important but not critical to the program via the logger.error() or logger.exception() functions.

For example:

...
# try a thing
try:
    # ...
except Exception as err:
    logger.exception(f'Unable to perform task because: {err.message} ')

These function calls can then be added throughout an application to capture events of different types, and the log handler or runtime configuration can be used to filter the severity of the messages that are actually stored.

Now that we know about logging, let's consider logging in asyncio programs.

Logging From Asyncio is Blocking

Logging from an asyncio program is blocking.

This means that logging a is a function call that will prevent the asyncio event loop from progressing.

Often this is not a problem, such as when logging a few messages to standard output.

It does become a problem when there are a large number of log messages and/or the target of the logging infrastructure is a blocking I/O call, such as storing messages to a file, database, or network connection.

... it should be noted that when logging from async code, network and even file handlers could lead to problems (blocking the event loop) because some logging is done from asyncio internals.

-- Logging Cookbook

Therefore, it is recommended to perform logging from asyncio using a different thread.

Naively, this could be achieved by wrapping each call to the logging infrastructure in a call like asyncio.to_thread() so that it happens in a new Python thread.

This is cumbersome.

Instead, a better approach is to configure the logging infrastructure so that messages are issued using a data structure and a separate logging thread is responsible for storing log messages and performing any blocking calls to files or network resources.

Network logging can block the event loop. It is recommended to use a separate thread for handling logs or use non-blocking IO.

-- Developing with asyncio

Now that we know that logging in asyncio is blocking, let's explore how we might log in asyncio without blocking.

How to Log From Asyncio Without Blocking

We can log in asyncio programs without blocking using a QueueHandler and QueueListener.

A queue.Queue can be created and used to store log messages.

We can create a QueueHandler that is configured to use our Queue object and store messages in the queue.

This is a quick function call that will return nearly immediately, assuming the size of the Queue is unbounded in memory (reasonable for most programs).

... attach only a QueueHandler to those loggers which are accessed from performance-critical threads. They simply write to their queue, which can be sized to a large enough capacity or initialized with no upper bound to their size. The write to the queue will typically be accepted quickly

-- Logging Cookbook

For example:

...
# get the root logger
log = logging.getLogger()
# create the shared queue
que = Queue()
# add a handler that uses the shared queue
log.addHandler(QueueHandler(que))

We can then configure a QueueListener to consume log messages from a shared queue and store them in some way.

The QueueListener takes the shared Queue object as an argument, as well as a logging handler object. This could be a FileHandler, some network-based handler for storing log messages remotely, or even a simple StreamHandler for reporting messages on standard output.

... QueueListener, which has been designed as the counterpart to QueueHandler. A QueueListener is very simple: it's passed a queue and some handlers, and it fires up an internal thread which listens to its queue for LogRecords sent from QueueHandlers (or any other source of LogRecords, for that matter). The LogRecords are removed from the queue and passed to the handlers for processing.

-- Logging Cookbook

For example:

...
# create a listener for messages on the queue
listener = QueueListener(que, StreamHandler())

The QueueListener is not connected to the logging infrastructure directly, only the shared queue.

Internally, it has a daemon worker thread that will consume log messages. As such the QueueListener must be started and stopped explicitly.

For example:

...
# start the listener
listener.start()
...
# ensure the listener is closed
listener.stop()

In asyncio, we can develop a coroutine that can initialize the logging infrastructure and configure the shared Queue and QueueHandler. It can then configure the QueueListener and manage its life cycle, starting it initially, and stopping it once the task is canceled.

This helper coroutine can then be run as a background task, which will be canceled automatically when the event loop is terminated.

You can learn more about when asyncio tasks are automatically canceled in the tutorial:

Below is an example of a helper coroutine for initializing non-blocking logging in an asyncio program

# helper coroutine to setup and manage the logger
async def init_logger():
    # get the root logger
    log = logging.getLogger()
    # create the shared queue
    que = Queue()
    # add a handler that uses the shared queue
    log.addHandler(QueueHandler(que))
    # log all messages, debug and up
    log.setLevel(logging.DEBUG)
    # create a listener for messages on the queue
    listener = QueueListener(que, StreamHandler())
    try:
        # start the listener
        listener.start()
        # report the logger is ready
        logging.debug(f'Logger has started')
        # wait forever
        while True:
            await asyncio.sleep(60)
    finally:
        # report the logger is done
        logging.debug(f'Logger is shutting down')
        # ensure the listener is closed
        listener.stop()

This coroutine can then be run as a background task as the first step in our asyncio program.

We must suspend the caller in order to allow the background task to run and start the worker thread within the QueueListener.

For example:

...
# initialize the logger
logger = asyncio.create_task(init_logger())
# allow the logger to start
await asyncio.sleep(0)

If you are new to suspending the current coroutine with a call to asyncio.sleep(0), you can learn more in the tutorial:

This too could be wrapped in a helper coroutine that ensures we avoid the disappearing task bug and that we don't forget to suspend in order to start the logging infrastructure.

# reference to the logger task
LOGGER_TASK = None
# coroutine to safely start the logger
async def safely_start_logger():
    # initialize the logger
    LOGGER_TASK = asyncio.create_task(init_logger())
    # allow the logger to start
    await asyncio.sleep(0)

You can learn more about the disappearing task bug in the tutorial:

This helper can then be called as the first line of our asyncio program.

For example:

...
# initialize the logger
await safely_start_logger()

Now that we know how to configure non-blocking logging in our asyncio program, let's look at some worked examples.

Example of Blocking Asyncio Logging

We can explore an example of logging from an asyncio program that is blocking.

That is, each call to the logging infrastructure blocks until the log message is handleded.

In this case, we will use the default handler, the StreamHandler, where log messages are reported to standard output.

The init_logger() function below initializes the root logger to log all messages from a DEBUG level and up using the default log handler.

# helper function to setup the logger
def init_logger():
    # get the root logger
    log = logging.getLogger()
    # log all messages, debug and up
    log.setLevel(logging.DEBUG)

We can then define a task that takes a unique id and blocks for a random fraction of 5 seconds, logging a message before and after it is blocked.

This is to ensure we see log messages reported randomly during the operation of our program.

The task() coroutine below implements this.

# task that does work and logs
async def task(value):
    # log a message
    logging.info(f'Task {value} is starting')
    # simulate doing work
    await asyncio.sleep(random() * 5)
    # log a message
    logging.info(f'Task {value} is done')

We can then define the main() coroutine that first initializes the logging infrastructure, and then issues ten tasks using an asyncio.TaskGroup.

This will ensure we have many different tasks logging many messages randomly, all of which are blocking calls.

# main coroutine
async def main():
    # initialize the logger
    init_logger()
    # log a message
    logging.info(f'Main is starting')
    # issue many tasks
    async with asyncio.TaskGroup() as group:
        for i in range(10):
            _ = group.create_task(task(i))
    # log a message
    logging.info(f'Main is done')

If you are new to the asyncio.TaskGroup, you can learn how to use it in the tutorial:

Finally, we can start the asyncio event loop.

...
# start the event loop
asyncio.run(main())

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of asyncio logging is blocking
import logging
from random import random
import asyncio

# helper function to setup the logger
def init_logger():
    # get the root logger
    log = logging.getLogger()
    # log all messages, debug and up
    log.setLevel(logging.DEBUG)

# task that does work and logs
async def task(value):
    # log a message
    logging.info(f'Task {value} is starting')
    # simulate doing work
    await asyncio.sleep(random() * 5)
    # log a message
    logging.info(f'Task {value} is done')

# main coroutine
async def main():
    # initialize the logger
    init_logger()
    # log a message
    logging.info(f'Main is starting')
    # issue many tasks
    async with asyncio.TaskGroup() as group:
        for i in range(10):
            _ = group.create_task(task(i))
    # log a message
    logging.info(f'Main is done')

# start the event loop
asyncio.run(main())

Running the example first starts the asyncio event loop and runs the main() coroutine.

The main() coroutine runs and initializes the logger, ensuring that all log messages at a DEBUG level and higher are reported.

The main() coroutine then logs an initial message which is reported on standard output.

A TaskGroup is created and 10 instances of the task() coroutine are created and issued as tasks. The main() coroutine then blocks until all tasks are done.

The task() coroutines run, each reporting its own unique start message, sleeping for a fraction of 5 seconds, then reporting a unique done message.

Once all tasks are done, the main() coroutine resumes and reports a final done message.

This highlights normal logging from an asyncio program, where each call to the logging infrastructure is blocking. Although the performance of the event loop is not affected in this case because we are reporting messages on standard output, performance could be dramatically affected if a file or network handler was used.

INFO:root:Main is starting
INFO:root:Task 0 is starting
INFO:root:Task 1 is starting
INFO:root:Task 2 is starting
INFO:root:Task 3 is starting
INFO:root:Task 4 is starting
INFO:root:Task 5 is starting
INFO:root:Task 6 is starting
INFO:root:Task 7 is starting
INFO:root:Task 8 is starting
INFO:root:Task 9 is starting
INFO:root:Task 8 is done
INFO:root:Task 5 is done
INFO:root:Task 3 is done
INFO:root:Task 4 is done
INFO:root:Task 2 is done
INFO:root:Task 7 is done
INFO:root:Task 9 is done
INFO:root:Task 0 is done
INFO:root:Task 1 is done
INFO:root:Task 6 is done
INFO:root:Main is done

Next, let's explore an example of logging without blocking from asyncio.

Example of Non-Blocking Asyncio Logging

We can explore an example of non-blocking logging from an asyncio program.

In this case, we can update the above example to initialize the logging infrastructure using our helper coroutine.

This helper first creates a shared queue, and then configured a QueueHandler so that the act of logging is limited to putting a message in the queue. A QueueListener is then created and configured to consume messages from the shared queue in an internal worker thread, and then report those messages to a separate handler, in this case, a StreamHandler that reports log messages on standard output.

The init_logger() coroutine below implements this. It is designed to run as a background task for as long as the program is running, sleeping most of the time. It will be canceled automatically when the event loop is terminated, allowing the QueueListener to be closed correctly and purge any messages that may be on the queue.

# helper coroutine to setup and manage the logger
async def init_logger():
    # get the root logger
    log = logging.getLogger()
    # create the shared queue
    que = Queue()
    # add a handler that uses the shared queue
    log.addHandler(QueueHandler(que))
    # log all messages, debug and up
    log.setLevel(logging.DEBUG)
    # create a listener for messages on the queue
    listener = QueueListener(que, StreamHandler())
    try:
        # start the listener
        listener.start()
        # report the logger is ready
        logging.debug(f'Logger has started')
        # wait forever
        while True:
            await asyncio.sleep(60)
    finally:
        # report the logger is done
        logging.debug(f'Logger is shutting down')
        # ensure the listener is closed
        listener.stop()

We can then update the main() coroutine to schedule the init_logger() as a background task and then suspend to allow the task to begin executing.

# main coroutine
async def main():
    # initialize the logger
    logger = asyncio.create_task(init_logger())
    # allow the logger to start
    await asyncio.sleep(0)
    ...

And that's it.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of asyncio logging with a queue handler and listener
import logging
from logging.handlers import QueueHandler
from logging.handlers import QueueListener
from logging import StreamHandler
from random import random
from queue import Queue
import asyncio

# helper coroutine to setup and manage the logger
async def init_logger():
    # get the root logger
    log = logging.getLogger()
    # create the shared queue
    que = Queue()
    # add a handler that uses the shared queue
    log.addHandler(QueueHandler(que))
    # log all messages, debug and up
    log.setLevel(logging.DEBUG)
    # create a listener for messages on the queue
    listener = QueueListener(que, StreamHandler())
    try:
        # start the listener
        listener.start()
        # report the logger is ready
        logging.debug(f'Logger has started')
        # wait forever
        while True:
            await asyncio.sleep(60)
    finally:
        # report the logger is done
        logging.debug(f'Logger is shutting down')
        # ensure the listener is closed
        listener.stop()

# task that does work and logs
async def task(value):
    # log a message
    logging.info(f'Task {value} is starting')
    # simulate doing work
    await asyncio.sleep(random() * 5)
    # log a message
    logging.info(f'Task {value} is done')

# main coroutine
async def main():
    # initialize the logger
    logger = asyncio.create_task(init_logger())
    # allow the logger to start
    await asyncio.sleep(0)
    # log a message
    logging.info(f'Main is starting')
    # issue many tasks
    async with asyncio.TaskGroup() as group:
        for i in range(10):
            _ = group.create_task(task(i))
    # log a message
    logging.info(f'Main is done')

# start the event loop
asyncio.run(main())

Running the example first starts the asyncio event loop and runs the main() coroutine.

The main() coroutine then schedules the init_logger() coroutine as a background task and suspends to allow it to run.

The init_logger() task runs and configures the root logger to log all messages at a DEBUG level and above and creates and configures the shared Queue and the QueueHandler that makes use of it and configures the logging infrastructure to use the QueueHandler.

It then creates the QueueListener and starts its internal thread to consume log messages from the shared queue. The init_logger() task then logs an initial message then runs forever in a loop, suspending itself with calls to sleep.

The main() coroutine resumes and logs an initial message which is reported on standard output.

A TaskGroup is created and 10 instances of the task() coroutine are created and issued as tasks. The main() coroutine then blocks until all tasks are done.

The task() coroutines run, each reporting its own unique start message, sleeping for a fraction of 5 seconds, then reporting a unique done message.

The log messages are put into the shared queue via the QueueHandler and the calls return almost immediately. The internal thread of the QueueListener runs and consumes log messages from the queue and reports them to standard output.

Once all tasks are done, the main() coroutine resumes and reports a final done message.

This highlights how we can log from an asyncio program without blocking.

Logger has started
Main is starting
Task 0 is starting
Task 1 is starting
Task 2 is starting
Task 3 is starting
Task 4 is starting
Task 5 is starting
Task 6 is starting
Task 7 is starting
Task 8 is starting
Task 9 is starting
Task 6 is done
Task 8 is done
Task 9 is done
Task 5 is done
Task 3 is done
Task 7 is done
Task 4 is done
Task 2 is done
Task 1 is done
Task 0 is done
Main is done
Logger is shutting down

Next, let's look at how we can make setting up the non-blocking logging infrastructure a little safer.

Example of Safer Non-Blocking Asyncio Logging

We can explore how to log without blocking from an asyncio program in a way that makes setting up the logging infrastructure more robust.

In this case, we will update the above example to use our safely_start_logger() coroutine developed above.

This will ensure that the program cannot suffer from the "disappearing task bug" by keeping a reference to the background logging task. It also ensures that the caller is suspended, which allows the logging infrastructure to begin, meaning the caller only needs to await the safely_start_logger() coroutine and nothing more.

The safely_start_logger() coroutine is listed below.

# reference to the logger task
LOGGER_TASK = None
# coroutine to safely start the logger
async def safely_start_logger():
    # initialize the logger
    LOGGER_TASK = asyncio.create_task(init_logger())
    # allow the logger to start
    await asyncio.sleep(0)

We can then simply await this coroutine as the first line of our program.

# main coroutine
async def main():
    # initialize the logger
    await safely_start_logger()
    ...

And that's it.

Tying this together, the complete example is listed below.

# SuperFastPython.com
# example of asyncio logging with a queue handler and listener
import logging
from logging.handlers import QueueHandler
from logging.handlers import QueueListener
from logging import StreamHandler
from random import random
from queue import Queue
import asyncio

# helper coroutine to setup and manage the logger
async def init_logger():
    # get the root logger
    log = logging.getLogger()
    # create the shared queue
    que = Queue()
    # add a handler that uses the shared queue
    log.addHandler(QueueHandler(que))
    # log all messages, debug and up
    log.setLevel(logging.DEBUG)
    # create a listener for messages on the queue
    listener = QueueListener(que, StreamHandler())
    try:
        # start the listener
        listener.start()
        # report the logger is ready
        logging.debug(f'Logger has started')
        # wait forever
        while True:
            await asyncio.sleep(60)
    finally:
        # report the logger is done
        logging.debug(f'Logger is shutting down')
        # ensure the listener is closed
        listener.stop()

# reference to the logger task
LOGGER_TASK = None
# coroutine to safely start the logger
async def safely_start_logger():
    # initialize the logger
    LOGGER_TASK = asyncio.create_task(init_logger())
    # allow the logger to start
    await asyncio.sleep(0)

# task that does work and logs
async def task(value):
    # log a message
    logging.info(f'Task {value} is starting')
    # simulate doing work
    await asyncio.sleep(random() * 5)
    # log a message
    logging.info(f'Task {value} is done')

# main coroutine
async def main():
    # initialize the logger
    await safely_start_logger()
    # log a message
    logging.info(f'Main is starting')
    # issue many tasks
    async with asyncio.TaskGroup() as group:
        for i in range(10):
            _ = group.create_task(task(i))
    # log a message
    logging.info(f'Main is done')

# start the event loop
asyncio.run(main())

Running the example first starts the asyncio event loop and runs the main() coroutine.

The main() coroutine awaits the safely_start_logger() coroutine. The safely_start_logger() coroutine then schedules the init_logger() coroutine as a background task, stores a reference to the task in a global variable, and suspends to allow it to run.

The init_logger() task runs and configures the root logger to log all messages at a DEBUG level and above and creates and configures the shared Queue and the QueueHandler that makes use of it and configures the logging infrastructure to use the QueueHandler.

It then creates the QueueListener and starts its internal thread to consume log messages from the shared queue. The init_logger() task then logs an initial message then runs forever in a loop, suspending itself with calls to sleep.

The main() coroutine resumes and logs an initial message which is reported on standard output.

A TaskGroup is created and 10 instances of the task() coroutine are created and issued as tasks. The main() coroutine then blocks until all tasks are done.

The task() coroutines run, each reporting its own unique start message, sleeping for a fraction of 5 seconds, then reporting a unique done message.

The log messages are put into the shared queue via the QueueHandler and the calls return almost immediately. The internal thread of the QueueListener runs and consumes log messages from the queue and reports them to standard output.

Once all tasks are done, the main() coroutine resumes and reports a final done message.

This highlights how we can log from an asyncio program without blocking with a safe coroutine to initialize the logging infrastructure.

Logger has started
Main is starting
Task 0 is starting
Task 1 is starting
Task 2 is starting
Task 3 is starting
Task 4 is starting
Task 5 is starting
Task 6 is starting
Task 7 is starting
Task 8 is starting
Task 9 is starting
Task 1 is done
Task 3 is done
Task 9 is done
Task 8 is done
Task 4 is done
Task 2 is done
Task 6 is done
Task 5 is done
Task 0 is done
Task 7 is done
Main is done
Logger is shutting down

Takeaways

You now know how to log without blocking from asyncio programs in Python.