You can implement non-blocking logging in asyncio programs by using a shared Queue, with a QueueHandler to enqueue log messages and a QueueListener to consume and store them.
In this tutorial, you will discover how to log without blocking from asyncio programs in Python.
Let’s get started.
What is Logging
Logging is a way of tracking events within a program.
There are many types of events that may be logged within a program, ranging in severity from debug and informational messages to warnings, errors, and critical events.
The logging module provides infrastructure for logging within Python programs.
This module defines functions and classes which implement a flexible event logging system for applications and libraries.
— logging — Logging facility for Python
Logging is achieved by first configuring the log handler and then adding calls to the logging infrastructure at key points in the program.
Handler objects are responsible for dispatching the appropriate log messages (based on the log messages’ severity) to the handler’s specified destination.
— logging — Logging facility for Python
The default handler will report log messages on the command prompt (e.g. terminal or system output stream).
Alternate handlers can be configured to write log messages to a file, to a database, or to custom target locations. The handler can specify the severity of messages to report.
For example, we can log to a file by calling the logging.basicConfig() function, specifying the file name and path to log to (e.g. application.log) and the minimum severity level to capture to the file (e.g. logging.DEBUG).
```python
...
# log everything to file
logging.basicConfig(filename='application.log', level=logging.DEBUG)
```
Events are logged via function calls based on the type of event performed, e.g. logging.debug() for debugging messages.
Logging provides a set of convenience functions for simple logging usage. These are debug(), info(), warning(), error() and critical().
— Logging HOWTO
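As a minimal sketch, the five convenience functions map one-to-one onto the five severity levels (the messages below are invented for illustration):

```python
import logging

# configure the root logger to show all messages
logging.basicConfig(level=logging.DEBUG)

# one convenience function per severity level, least to most severe
logging.debug('Detailed diagnostic message')
logging.info('Something routine happened')
logging.warning('Something unexpected, but recoverable')
logging.error('An operation failed')
logging.critical('The program may be unable to continue')
```

Each level has an associated numeric value, which is what the filtering machinery compares against.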
For example, we may add information messages to the application code by calling logging.info() and passing in the string details of the event.
```python
...
# log that data was loaded successfully
logger.info('All data was loaded successfully')
```
We may also log failures, such as exceptions that are important but not critical to the program via the logger.error() or logger.exception() functions.
For example:
```python
...
# try a thing
try:
    # attempt the operation
    ...
except Exception as err:
    logger.exception(f'Unable to perform task because: {err}')
```
These function calls can then be added throughout an application to capture events of different types, and the log handler or runtime configuration can be used to filter the severity of the messages that are actually stored.
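To sketch how severity filtering works, the example below (the logger name and messages are invented for illustration) attaches a handler that only passes WARNING and above, while the logger itself accepts everything:

```python
import io
import logging

# capture log output in a string buffer so we can inspect it
stream = io.StringIO()
handler = logging.StreamHandler(stream)
# only WARNING and above pass the handler's filter
handler.setLevel(logging.WARNING)

log = logging.getLogger('filter-demo')
log.setLevel(logging.DEBUG)
log.addHandler(handler)

log.debug('hidden')    # filtered out by the handler
log.warning('shown')   # passes the handler's level
print(stream.getvalue())
```

Both the logger and each handler apply a level check, so the effective filter is the stricter of the two.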
Now that we know about logging, let’s consider logging in asyncio programs.
Logging From Asyncio is Blocking
Logging from an asyncio program is blocking.
This means that logging is a function call that can prevent the asyncio event loop from progressing while it completes.
Often this is not a problem, such as when logging a few messages to standard output.
It does become a problem when there are a large number of log messages and/or the target of the logging infrastructure is a blocking I/O call, such as storing messages to a file, database, or network connection.
… it should be noted that when logging from async code, network and even file handlers could lead to problems (blocking the event loop) because some logging is done from asyncio internals.
— Logging Cookbook
Therefore, it is recommended to perform logging from asyncio using a different thread.
Naively, this could be achieved by wrapping each call to the logging infrastructure in a call like asyncio.to_thread() so that it happens in a new Python thread.
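As a sketch of this naive approach (not a recommended pattern, purely for illustration), each logging call could be pushed onto a worker thread:

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)

async def do_work():
    # each log call runs in a worker thread so the handler's
    # I/O never blocks the event loop directly
    await asyncio.to_thread(logging.info, 'Task is starting')
    await asyncio.sleep(0.5)
    await asyncio.to_thread(logging.info, 'Task is done')

asyncio.run(do_work())
```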
This is cumbersome.
Instead, a better approach is to configure the logging infrastructure so that messages are issued using a data structure and a separate logging thread is responsible for storing log messages and performing any blocking calls to files or network resources.
Network logging can block the event loop. It is recommended to use a separate thread for handling logs or use non-blocking IO.
— Developing with asyncio
Now that we know that logging in asyncio is blocking, let’s explore how we might log in asyncio without blocking.
How to Log From Asyncio Without Blocking
We can log in asyncio programs without blocking using a QueueHandler and QueueListener.
A queue.Queue can be created and used to store log messages.
We can create a QueueHandler that is configured to use our Queue object and store messages in the queue.
This is a quick function call that returns almost immediately, assuming the Queue is unbounded (reasonable for most programs).
… attach only a QueueHandler to those loggers which are accessed from performance-critical threads. They simply write to their queue, which can be sized to a large enough capacity or initialized with no upper bound to their size. The write to the queue will typically be accepted quickly
— Logging Cookbook
For example:
```python
...
# get the root logger
log = logging.getLogger()
# create the shared queue
que = Queue()
# add a handler that uses the shared queue
log.addHandler(QueueHandler(que))
```
We can then configure a QueueListener to consume log messages from a shared queue and store them in some way.
The QueueListener takes the shared Queue object as an argument, as well as a logging handler object. This could be a FileHandler, some network-based handler for storing log messages remotely, or even a simple StreamHandler for reporting messages on standard output.
… QueueListener, which has been designed as the counterpart to QueueHandler. A QueueListener is very simple: it’s passed a queue and some handlers, and it fires up an internal thread which listens to its queue for LogRecords sent from QueueHandlers (or any other source of LogRecords, for that matter). The LogRecords are removed from the queue and passed to the handlers for processing.
— Logging Cookbook
For example:
```python
...
# create a listener for messages on the queue
listener = QueueListener(que, StreamHandler())
```
The QueueListener is not connected to the logging infrastructure directly, only the shared queue.
Internally, it has a daemon worker thread that consumes log messages. As such, the QueueListener must be started and stopped explicitly.
For example:
```python
...
# start the listener
listener.start()
...
# ensure the listener is closed
listener.stop()
```
In asyncio, we can develop a coroutine that can initialize the logging infrastructure and configure the shared Queue and QueueHandler. It can then configure the QueueListener and manage its life cycle, starting it initially, and stopping it once the task is canceled.
This helper coroutine can then be run as a background task, which will be canceled automatically when the event loop is terminated.
Below is an example of a helper coroutine for initializing non-blocking logging in an asyncio program.
```python
# helper coroutine to setup and manage the logger
async def init_logger():
    # get the root logger
    log = logging.getLogger()
    # create the shared queue
    que = Queue()
    # add a handler that uses the shared queue
    log.addHandler(QueueHandler(que))
    # log all messages, debug and up
    log.setLevel(logging.DEBUG)
    # create a listener for messages on the queue
    listener = QueueListener(que, StreamHandler())
    try:
        # start the listener
        listener.start()
        # report the logger is ready
        logging.debug('Logger has started')
        # wait forever
        while True:
            await asyncio.sleep(60)
    finally:
        # report the logger is done
        logging.debug('Logger is shutting down')
        # ensure the listener is closed
        listener.stop()
```
This coroutine can then be run as a background task as the first step in our asyncio program.
We must suspend the caller in order to allow the background task to run and start the worker thread within the QueueListener.
For example:
```python
...
# initialize the logger
logger = asyncio.create_task(init_logger())
# allow the logger to start
await asyncio.sleep(0)
```
This too could be wrapped in a helper coroutine that ensures we avoid the disappearing task bug and that we don’t forget to suspend in order to start the logging infrastructure.
```python
# reference to the logger task
LOGGER_TASK = None

# coroutine to safely start the logger
async def safely_start_logger():
    # declare the global so the task reference is retained
    global LOGGER_TASK
    # initialize the logger
    LOGGER_TASK = asyncio.create_task(init_logger())
    # allow the logger to start
    await asyncio.sleep(0)
```
This helper can then be called as the first line of our asyncio program.
For example:
```python
...
# initialize the logger
await safely_start_logger()
```
Now that we know how to configure non-blocking logging in our asyncio program, let’s look at some worked examples.
Example of Blocking Asyncio Logging
We can explore an example of logging from an asyncio program that is blocking.
That is, each call to the logging infrastructure blocks until the log message is handled.
In this case, we will use the default handler, the StreamHandler, where log messages are reported to standard output.
The init_logger() function below initializes the root logger to log all messages from a DEBUG level and up using the default log handler.
```python
# helper function to setup the logger
def init_logger():
    # get the root logger
    log = logging.getLogger()
    # log all messages, debug and up
    log.setLevel(logging.DEBUG)
```
We can then define a task that takes a unique id and blocks for a random fraction of 5 seconds, logging a message before and after it is blocked.
This is to ensure we see log messages reported randomly during the operation of our program.
The task() coroutine below implements this.
```python
# task that does work and logs
async def task(value):
    # log a message
    logging.info(f'Task {value} is starting')
    # simulate doing work
    await asyncio.sleep(random() * 5)
    # log a message
    logging.info(f'Task {value} is done')
```
We can then define the main() coroutine that first initializes the logging infrastructure, and then issues ten tasks using an asyncio.TaskGroup.
This will ensure we have many different tasks logging many messages randomly, all of which are blocking calls.
```python
# main coroutine
async def main():
    # initialize the logger
    init_logger()
    # log a message
    logging.info('Main is starting')
    # issue many tasks
    async with asyncio.TaskGroup() as group:
        for i in range(10):
            _ = group.create_task(task(i))
    # log a message
    logging.info('Main is done')
```
Finally, we can start the asyncio event loop.
```python
...
# start the event loop
asyncio.run(main())
```
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of asyncio logging that is blocking
import logging
from random import random
import asyncio

# helper function to setup the logger
def init_logger():
    # get the root logger
    log = logging.getLogger()
    # log all messages, debug and up
    log.setLevel(logging.DEBUG)

# task that does work and logs
async def task(value):
    # log a message
    logging.info(f'Task {value} is starting')
    # simulate doing work
    await asyncio.sleep(random() * 5)
    # log a message
    logging.info(f'Task {value} is done')

# main coroutine
async def main():
    # initialize the logger
    init_logger()
    # log a message
    logging.info('Main is starting')
    # issue many tasks
    async with asyncio.TaskGroup() as group:
        for i in range(10):
            _ = group.create_task(task(i))
    # log a message
    logging.info('Main is done')

# start the event loop
asyncio.run(main())
```
Running the example first starts the asyncio event loop and runs the main() coroutine.
The main() coroutine runs and initializes the logger, ensuring that all log messages at a DEBUG level and higher are reported.
The main() coroutine then logs an initial message which is reported on standard output.
A TaskGroup is created and 10 instances of the task() coroutine are created and issued as tasks. The main() coroutine then suspends until all tasks are done.
The task() coroutines run, each reporting its own unique start message, sleeping for a fraction of 5 seconds, then reporting a unique done message.
Once all tasks are done, the main() coroutine resumes and reports a final done message.
This highlights normal logging from an asyncio program, where each call to the logging infrastructure is blocking. Although the performance of the event loop is not affected in this case because we are reporting messages on standard output, performance could be dramatically affected if a file or network handler was used.
```
INFO:root:Main is starting
INFO:root:Task 0 is starting
INFO:root:Task 1 is starting
INFO:root:Task 2 is starting
INFO:root:Task 3 is starting
INFO:root:Task 4 is starting
INFO:root:Task 5 is starting
INFO:root:Task 6 is starting
INFO:root:Task 7 is starting
INFO:root:Task 8 is starting
INFO:root:Task 9 is starting
INFO:root:Task 8 is done
INFO:root:Task 5 is done
INFO:root:Task 3 is done
INFO:root:Task 4 is done
INFO:root:Task 2 is done
INFO:root:Task 7 is done
INFO:root:Task 9 is done
INFO:root:Task 0 is done
INFO:root:Task 1 is done
INFO:root:Task 6 is done
INFO:root:Main is done
```
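To make the cost of a slow handler concrete, consider this small demonstration (the SlowHandler class and its 50 millisecond delay are artificial, purely for illustration, standing in for a slow target such as a network socket). Each log call stalls the event loop for the handler's full delay:

```python
import asyncio
import logging
import time

# a handler whose emit() simulates a slow target (e.g. a network socket)
class SlowHandler(logging.Handler):
    def emit(self, record):
        time.sleep(0.05)  # artificial delay for demonstration

async def main():
    log = logging.getLogger('slow-demo')
    log.setLevel(logging.INFO)
    log.addHandler(SlowHandler())
    start = time.perf_counter()
    # each log call blocks the event loop for the handler's full delay
    for i in range(5):
        log.info('message %d', i)
    elapsed = time.perf_counter() - start
    print(f'5 log calls took {elapsed:.2f} seconds')
    return elapsed

elapsed = asyncio.run(main())
```

During those delays no other coroutine can run, which is exactly the problem the queue-based approach avoids.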
Next, let’s explore an example of logging without blocking from asyncio.
Example of Non-Blocking Asyncio Logging
We can explore an example of non-blocking logging from an asyncio program.
In this case, we can update the above example to initialize the logging infrastructure using our helper coroutine.
This helper first creates a shared queue and then configures a QueueHandler so that the act of logging is limited to putting a message in the queue. A QueueListener is then created and configured to consume messages from the shared queue in an internal worker thread, and then report those messages to a separate handler, in this case, a StreamHandler that reports log messages on standard output.
The init_logger() coroutine below implements this. It is designed to run as a background task for as long as the program is running, sleeping most of the time. It will be canceled automatically when the event loop is terminated, allowing the QueueListener to be closed correctly and purge any messages that may be on the queue.
```python
# helper coroutine to setup and manage the logger
async def init_logger():
    # get the root logger
    log = logging.getLogger()
    # create the shared queue
    que = Queue()
    # add a handler that uses the shared queue
    log.addHandler(QueueHandler(que))
    # log all messages, debug and up
    log.setLevel(logging.DEBUG)
    # create a listener for messages on the queue
    listener = QueueListener(que, StreamHandler())
    try:
        # start the listener
        listener.start()
        # report the logger is ready
        logging.debug('Logger has started')
        # wait forever
        while True:
            await asyncio.sleep(60)
    finally:
        # report the logger is done
        logging.debug('Logger is shutting down')
        # ensure the listener is closed
        listener.stop()
```
We can then update the main() coroutine to schedule the init_logger() as a background task and then suspend to allow the task to begin executing.
```python
# main coroutine
async def main():
    # initialize the logger
    logger = asyncio.create_task(init_logger())
    # allow the logger to start
    await asyncio.sleep(0)
    ...
```
And that’s it.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of asyncio logging with a queue handler and listener
import logging
from logging.handlers import QueueHandler
from logging.handlers import QueueListener
from logging import StreamHandler
from random import random
from queue import Queue
import asyncio

# helper coroutine to setup and manage the logger
async def init_logger():
    # get the root logger
    log = logging.getLogger()
    # create the shared queue
    que = Queue()
    # add a handler that uses the shared queue
    log.addHandler(QueueHandler(que))
    # log all messages, debug and up
    log.setLevel(logging.DEBUG)
    # create a listener for messages on the queue
    listener = QueueListener(que, StreamHandler())
    try:
        # start the listener
        listener.start()
        # report the logger is ready
        logging.debug('Logger has started')
        # wait forever
        while True:
            await asyncio.sleep(60)
    finally:
        # report the logger is done
        logging.debug('Logger is shutting down')
        # ensure the listener is closed
        listener.stop()

# task that does work and logs
async def task(value):
    # log a message
    logging.info(f'Task {value} is starting')
    # simulate doing work
    await asyncio.sleep(random() * 5)
    # log a message
    logging.info(f'Task {value} is done')

# main coroutine
async def main():
    # initialize the logger
    logger = asyncio.create_task(init_logger())
    # allow the logger to start
    await asyncio.sleep(0)
    # log a message
    logging.info('Main is starting')
    # issue many tasks
    async with asyncio.TaskGroup() as group:
        for i in range(10):
            _ = group.create_task(task(i))
    # log a message
    logging.info('Main is done')

# start the event loop
asyncio.run(main())
```
Running the example first starts the asyncio event loop and runs the main() coroutine.
The main() coroutine then schedules the init_logger() coroutine as a background task and suspends to allow it to run.
The init_logger() task runs and configures the root logger to log all messages at a DEBUG level and above. It creates the shared Queue, wraps it in a QueueHandler, and adds that handler to the root logger.
It then creates the QueueListener and starts its internal thread to consume log messages from the shared queue. The init_logger() task then logs an initial message and runs forever in a loop, suspending itself with calls to sleep.
The main() coroutine resumes and logs an initial message which is reported on standard output.
A TaskGroup is created and 10 instances of the task() coroutine are created and issued as tasks. The main() coroutine then suspends until all tasks are done.
The task() coroutines run, each reporting its own unique start message, sleeping for a fraction of 5 seconds, then reporting a unique done message.
The log messages are put into the shared queue via the QueueHandler and the calls return almost immediately. The internal thread of the QueueListener runs and consumes log messages from the queue and reports them to standard output.
Once all tasks are done, the main() coroutine resumes and reports a final done message.
This highlights how we can log from an asyncio program without blocking.
```
Logger has started
Main is starting
Task 0 is starting
Task 1 is starting
Task 2 is starting
Task 3 is starting
Task 4 is starting
Task 5 is starting
Task 6 is starting
Task 7 is starting
Task 8 is starting
Task 9 is starting
Task 6 is done
Task 8 is done
Task 9 is done
Task 5 is done
Task 3 is done
Task 7 is done
Task 4 is done
Task 2 is done
Task 1 is done
Task 0 is done
Main is done
Logger is shutting down
```
Next, let’s look at how we can make setting up the non-blocking logging infrastructure a little safer.
Example of Safer Non-Blocking Asyncio Logging
We can explore how to log without blocking from an asyncio program in a way that makes setting up the logging infrastructure more robust.
In this case, we will update the above example to use our safely_start_logger() coroutine developed above.
This will ensure that the program cannot suffer from the “disappearing task bug” by keeping a reference to the background logging task. It also ensures that the caller is suspended, which allows the logging infrastructure to begin, meaning the caller only needs to await the safely_start_logger() coroutine and nothing more.
The safely_start_logger() coroutine is listed below.
```python
# reference to the logger task
LOGGER_TASK = None

# coroutine to safely start the logger
async def safely_start_logger():
    # declare the global so the task reference is retained
    global LOGGER_TASK
    # initialize the logger
    LOGGER_TASK = asyncio.create_task(init_logger())
    # allow the logger to start
    await asyncio.sleep(0)
```
We can then simply await this coroutine as the first line of our program.
```python
# main coroutine
async def main():
    # initialize the logger
    await safely_start_logger()
    ...
```
And that’s it.
Tying this together, the complete example is listed below.
```python
# SuperFastPython.com
# example of asyncio logging with a queue handler and listener
import logging
from logging.handlers import QueueHandler
from logging.handlers import QueueListener
from logging import StreamHandler
from random import random
from queue import Queue
import asyncio

# helper coroutine to setup and manage the logger
async def init_logger():
    # get the root logger
    log = logging.getLogger()
    # create the shared queue
    que = Queue()
    # add a handler that uses the shared queue
    log.addHandler(QueueHandler(que))
    # log all messages, debug and up
    log.setLevel(logging.DEBUG)
    # create a listener for messages on the queue
    listener = QueueListener(que, StreamHandler())
    try:
        # start the listener
        listener.start()
        # report the logger is ready
        logging.debug('Logger has started')
        # wait forever
        while True:
            await asyncio.sleep(60)
    finally:
        # report the logger is done
        logging.debug('Logger is shutting down')
        # ensure the listener is closed
        listener.stop()

# reference to the logger task
LOGGER_TASK = None

# coroutine to safely start the logger
async def safely_start_logger():
    # declare the global so the task reference is retained
    global LOGGER_TASK
    # initialize the logger
    LOGGER_TASK = asyncio.create_task(init_logger())
    # allow the logger to start
    await asyncio.sleep(0)

# task that does work and logs
async def task(value):
    # log a message
    logging.info(f'Task {value} is starting')
    # simulate doing work
    await asyncio.sleep(random() * 5)
    # log a message
    logging.info(f'Task {value} is done')

# main coroutine
async def main():
    # initialize the logger
    await safely_start_logger()
    # log a message
    logging.info('Main is starting')
    # issue many tasks
    async with asyncio.TaskGroup() as group:
        for i in range(10):
            _ = group.create_task(task(i))
    # log a message
    logging.info('Main is done')

# start the event loop
asyncio.run(main())
```
Running the example first starts the asyncio event loop and runs the main() coroutine.
The main() coroutine awaits the safely_start_logger() coroutine. The safely_start_logger() coroutine then schedules the init_logger() coroutine as a background task, stores a reference to the task in a global variable, and suspends to allow it to run.
The init_logger() task runs and configures the root logger to log all messages at a DEBUG level and above. It creates the shared Queue, wraps it in a QueueHandler, and adds that handler to the root logger.
It then creates the QueueListener and starts its internal thread to consume log messages from the shared queue. The init_logger() task then logs an initial message and runs forever in a loop, suspending itself with calls to sleep.
The main() coroutine resumes and logs an initial message which is reported on standard output.
A TaskGroup is created and 10 instances of the task() coroutine are created and issued as tasks. The main() coroutine then suspends until all tasks are done.
The task() coroutines run, each reporting its own unique start message, sleeping for a fraction of 5 seconds, then reporting a unique done message.
The log messages are put into the shared queue via the QueueHandler and the calls return almost immediately. The internal thread of the QueueListener runs and consumes log messages from the queue and reports them to standard output.
Once all tasks are done, the main() coroutine resumes and reports a final done message.
This highlights how we can log from an asyncio program without blocking with a safe coroutine to initialize the logging infrastructure.
```
Logger has started
Main is starting
Task 0 is starting
Task 1 is starting
Task 2 is starting
Task 3 is starting
Task 4 is starting
Task 5 is starting
Task 6 is starting
Task 7 is starting
Task 8 is starting
Task 9 is starting
Task 1 is done
Task 3 is done
Task 9 is done
Task 8 is done
Task 4 is done
Task 2 is done
Task 6 is done
Task 5 is done
Task 0 is done
Task 7 is done
Main is done
Logger is shutting down
```
Further Reading
This section provides additional resources that you may find helpful.
Python Asyncio Books
- Python Asyncio Mastery, Jason Brownlee (my book!)
- Python Asyncio Jump-Start, Jason Brownlee.
- Python Asyncio Interview Questions, Jason Brownlee.
- Asyncio Module API Cheat Sheet
I also recommend the following books:
- Python Concurrency with asyncio, Matthew Fowler, 2022.
- Using Asyncio in Python, Caleb Hattingh, 2020.
- asyncio Recipes, Mohamed Mustapha Tahrioui, 2019.
APIs
- asyncio — Asynchronous I/O
- Asyncio Coroutines and Tasks
- Asyncio Streams
- Asyncio Subprocesses
- Asyncio Queues
- Asyncio Synchronization Primitives
Takeaways
You now know how to log without blocking from asyncio programs in Python.
Did I make a mistake? See a typo?
I’m a simple humble human. Correct me, please!
Do you have any additional tips?
I’d love to hear about them!
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.