Multiprocessing Pool Logging From Worker Processes
You can log from worker processes in the multiprocessing pool using a shared multiprocessing.Queue and a logging.handlers.QueueHandler.
In this tutorial you will discover how to log from worker processes in the multiprocessing pool in Python.
Let's get started.
Need to Log from Worker Processes
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A multiprocessing pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
-- multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
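As a quick refresher, the basic pool pattern might be sketched as follows, where square() is a stand-in task used only for illustration:

```python
# sketch of issuing tasks to a multiprocessing pool
# (square() is a hypothetical task used only for illustration)
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    # create the pool with the default number of workers
    with Pool() as pool:
        # issue a one-off task and block for the result
        print(pool.apply(square, args=(3,)))  # 9
        # apply the same function to an iterable of items
        print(pool.map(square, range(5)))  # [0, 1, 4, 9, 16]
        # issue a task asynchronously and retrieve the result later
        result = pool.apply_async(square, args=(4,))
        print(result.get())  # 16
```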
When using the multiprocessing pool, we may want to log directly from tasks.
This may be a problem as the tasks are executed by child worker processes, and logging to a central location from multiple processes is challenging.
How can we log safely from child worker processes in the multiprocessing pool?
How to Log from Worker Processes
Logging is a way of tracking events within a program.
There are many types of events that may be logged within a program, ranging in severity from debug and informational messages to warnings, errors, and critical events.
The logging module provides infrastructure for logging within Python programs.
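For example, a minimal sketch of configuring a named logger and reporting events at each level of severity:

```python
# sketch of logging events at different levels of severity
import logging

# report debug-level messages and up on stderr
logging.basicConfig(level=logging.DEBUG)
# get a named logger for the application
logger = logging.getLogger('app')
# report events of increasing severity
logger.debug('Detailed diagnostic information.')
logger.info('Confirmation that things are working.')
logger.warning('Something unexpected happened.')
logger.error('A serious problem occurred.')
logger.critical('The program may be unable to continue.')
```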
Effective logging from multiple processes requires custom code.
There are a few approaches we might take, including:
- Use a socket handler and send messages to a logging process.
- Use a custom handler that internally uses a shared mutex to ensure one process logs at a time.
- Use a queue handler that uses a shared queue to send messages to a logging process.
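For example, the mutex-based approach might be sketched as a custom handler that holds a shared multiprocessing.Lock around each write; the LockingStreamHandler class name here is hypothetical, and this is only a sketch of the idea, not a complete solution:

```python
# sketch of a mutex-based log handler: a shared lock ensures only
# one process writes to the stream at a time
# (LockingStreamHandler is a hypothetical name for illustration)
import logging
from multiprocessing import Lock

class LockingStreamHandler(logging.StreamHandler):
    # the same multiprocessing.Lock instance must be shared by all processes
    def __init__(self, lock, stream=None):
        super().__init__(stream)
        self.shared_lock = lock

    def emit(self, record):
        # hold the cross-process mutex around the actual write
        with self.shared_lock:
            super().emit(record)
```

Note that, like a queue, a shared lock cannot be passed to pool tasks as an argument; it would have to be inherited by the child worker processes, for example via a pool initializer function.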
All approaches are similar in that they require that log messages are serialized before being stored.
This can be achieved by serializing messages on a shared multiprocessing.Queue via a logging.handlers.QueueHandler and letting one process read messages one at a time and store them to stream or file.
A multiprocessing.Queue can be passed to a child process created directly, such as a new multiprocessing.Process instance, because the child inherits it from the parent.
However, a multiprocessing.Queue cannot be passed directly to child worker processes in the Pool as an argument to a task function.
Attempting to do so will result in an error, such as:
RuntimeError: Queue objects should only be shared between processes through inheritance
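The cause can be demonstrated without a pool at all: a multiprocessing.Queue refuses to be pickled outside of process spawning, and pickling is how the pool transmits task arguments to its workers. For example:

```python
# demonstrate why a multiprocessing.Queue cannot be a task argument:
# pickling it outside of process spawning raises a RuntimeError
import pickle
from multiprocessing import Queue

# create a regular multiprocessing queue
queue = Queue()
try:
    # the pool would pickle task arguments like this
    pickle.dumps(queue)
except RuntimeError as e:
    print(e)  # Queue objects should only be shared between processes through inheritance
```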
Instead, it must be created using a multiprocessing.Manager. This will create a centralized version of the Queue in a server process and return a proxy object to interact with the centralized queue that is safe to pass around.
For example:
...
# create the manager
with Manager() as manager:
    # create the shared queue and get the proxy object
    queue = manager.Queue()
    # ...
A QueueHandler log handler can then be created and added to the logging infrastructure by each process using the shared queue.
For example:
...
# create a logger
logger = logging.getLogger('app')
# add a handler that uses the shared queue
logger.addHandler(QueueHandler(queue))
This can be performed by the task functions executed by child worker processes in the multiprocessing pool.
Another process can then read from the shared queue and report messages either directly or by using the logging infrastructure.
For example:
...
# consume a log message, block until one arrives
message = queue.get()
# report the message
print(message)
Next, let's explore an example of how to log from child worker processes in the multiprocessing pool.
Example of Logging from Worker Processes
We can explore how to log from worker processes in the multiprocessing pool in a process-safe manner.
In this example we will create a number of child processes to execute a custom function. Each task will log messages and the messages will be sent to a separate logging process and logged in a serial (one-at-a-time) manner.
This can be achieved by configuring the logging infrastructure in each process to use a logging.handlers.QueueHandler that will send log messages to a shared multiprocessing.Queue.
A new child process will then consume messages from the queue one-at-a-time and store or report them, in this case on stderr using a logging.StreamHandler, but it could just as easily be a file.
Importantly, in this example, messages cannot be lost or corrupted as only a single process is responsible for handling the log messages, and log messages are sent to this one process safely and reliably using a process-aware queue data structure.
Define Centralized Logger Function
Firstly, we need to define a function to run in a child process responsible for logging.
This function will take a shared multiprocessing.Queue as an argument.
# executed in a process that performs logging
def logger_process(queue):
    # ...
We need to configure the logging infrastructure in this process to specify what level of messages to report and where to store them. In this case, we will get a named log for our application called "app", and send all messages debug level and up to stderr using a StreamHandler.
...
# create a logger
logger = logging.getLogger('app')
# configure a stream handler
logger.addHandler(logging.StreamHandler())
# log all messages, debug and up
logger.setLevel(logging.DEBUG)
# report that the logger process is running
logger.info(f'Logger process running.')
This child process will then run forever, reading messages from the queue and logging them. If there are no messages on the queue, it will block until a message arrives.
...
# run forever
while True:
    # consume a log message, block until one arrives
    message = queue.get()
If the message is the special value None, the loop is exited and the process terminates normally. This is called a sentinel message and must be sent by the main process before the program exits.
...
# check for shutdown
if message is None:
    logger.info(f'Logger process shutting down.')
    break
Otherwise, if it is a normal log message, we can let our logging infrastructure for this process handle it, e.g. log it to stderr.
...
# log the message
logger.handle(message)
Tying this together, the complete function for running a child process responsible for logging is listed below.
# executed in a process that performs logging
def logger_process(queue):
    # create a logger
    logger = logging.getLogger('app')
    # configure a stream handler
    logger.addHandler(logging.StreamHandler())
    # log all messages, debug and up
    logger.setLevel(logging.DEBUG)
    # report that the logger process is running
    logger.info(f'Logger process running.')
    # run forever
    while True:
        # consume a log message, block until one arrives
        message = queue.get()
        # check for shutdown
        if message is None:
            logger.info(f'Logger process shutting down.')
            break
        # log the message
        logger.handle(message)
Define Custom Task Function
Next, we can define a custom function to execute in the process pool.
The function takes the shared queue as an argument.
# task to be executed in child processes
def task(queue):
    # ...
We can then configure a new logger instance to use a QueueHandler that makes use of the shared queue.
...
# create a logger
logger = logging.getLogger('app')
# add a handler that uses the shared queue
logger.addHandler(QueueHandler(queue))
# log all messages, debug and up
logger.setLevel(logging.DEBUG)
We can then get the current process so that we can include the process name in the messages.
...
# get the current process
process = current_process()
The task can then log an initial message, loop a few times, reporting a message and blocking for a random fraction of a second each iteration, and finally log a message before exiting.
...
# report initial message
logger.info(f'Child {process.name} starting.')
# simulate doing work
for i in range(5):
    # report a message
    logger.debug(f'Child {process.name} step {i}.')
    # block
    sleep(random())
# report final message
logger.info(f'Child {process.name} done.')
Tying this together, the complete task() function is listed below.
# task to be executed in child processes
def task(queue):
    # create a logger
    logger = logging.getLogger('app')
    # add a handler that uses the shared queue
    logger.addHandler(QueueHandler(queue))
    # log all messages, debug and up
    logger.setLevel(logging.DEBUG)
    # get the current process
    process = current_process()
    # report initial message
    logger.info(f'Child {process.name} starting.')
    # simulate doing work
    for i in range(5):
        # report a message
        logger.debug(f'Child {process.name} step {i}.')
        # block
        sleep(random())
    # report final message
    logger.info(f'Child {process.name} done.')
Create Shared Queue
Next, in the main process, we can create the shared queue.
Importantly, we cannot just create a multiprocessing.Queue and share it with tasks in the multiprocessing pool.
This will result in a RuntimeError:
RuntimeError: Queue objects should only be shared between processes through inheritance
Instead, we must create a multiprocessing.Manager and use the manager to create a Queue.
The manager is a server process that will manage a centralized version of shared objects. It returns proxy objects that we can share among processes and use safely to interact with the centralized object.
We can create a manager using the context manager interface to ensure it is closed once we are finished with it.
...
# create the manager
with Manager() as manager:
    # ...
Next, we can use the manager to create the shared queue.
...
# create the shared queue and get the proxy object
queue = manager.Queue()
We want to log from the main process in order to report progress.
The main process can then configure its logging infrastructure to use the QueueHandler so that messages are sent to the logging child process.
...
# create a logger
logger = logging.getLogger('app')
# add a handler that uses the shared queue
logger.addHandler(QueueHandler(queue))
# log all messages, debug and up
logger.setLevel(logging.DEBUG)
Create Multiprocessing Pool
Finally, we can create the multiprocessing pool and execute our logging process task and custom task functions.
First, we can create an instance of the multiprocessing pool using the context manager interface to ensure that the pool is closed once we are finished with it.
...
# create the process pool with default configuration
with Pool() as pool:
    # ...
Next, we can issue the long running task that receives and reports log messages.
In this case, we will issue this task asynchronously using the apply_async() function and pass in the shared queue.
...
# issue a long running task to receive logging messages
_ = pool.apply_async(logger_process, args=(queue,))
# report initial message
logger.info('Main process started.')
Next, we can execute our tasks in the multiprocessing pool by calling our custom task() function multiple times, passing in the shared queue for logging.
We will use the apply_async() function and gather the AsyncResult objects returned from each call as we will need them in a moment.
This can be achieved in a list comprehension.
...
# issue tasks to the process pool
results = [pool.apply_async(task, args=(queue,)) for i in range(5)]
Next, we can wait for the five issued tasks to complete by waiting on the AsyncResult object for each task.
This can be achieved in a loop by calling the AsyncResult.wait() function.
...
# wait for all issued tasks to complete
logger.info('Main process waiting...')
for result in results:
    result.wait()
Once all tasks have completed, we can shut down the logging process.
We could just terminate the process pool directly, but instead, we will shut down the logging process in a controlled manner.
This is achieved by putting the special None sentinel value on the queue which will instruct the logging process to exit its loop and terminate.
...
# report final message
logger.info('Main process done.')
# shutdown the long running logger task
queue.put(None)
We can then close the multiprocessing pool, which will prevent it from receiving any further tasks, then wait for the logging process to complete, and for all worker processes to terminate.
...
# close the process pool
pool.close()
# wait for all tasks to complete (e.g. the logger to close)
pool.join()
Complete Example
Tying all of this together, the complete example of logging from workers in the multiprocessing pool is listed below.
# SuperFastPython.com
# example of logging from multiple workers in the multiprocessing pool
from random import random
from time import sleep
from multiprocessing import current_process
from multiprocessing import Pool
from multiprocessing import Manager
from logging.handlers import QueueHandler
import logging

# executed in a process that performs logging
def logger_process(queue):
    # create a logger
    logger = logging.getLogger('app')
    # configure a stream handler
    logger.addHandler(logging.StreamHandler())
    # log all messages, debug and up
    logger.setLevel(logging.DEBUG)
    # report that the logger process is running
    logger.info(f'Logger process running.')
    # run forever
    while True:
        # consume a log message, block until one arrives
        message = queue.get()
        # check for shutdown
        if message is None:
            logger.info(f'Logger process shutting down.')
            break
        # log the message
        logger.handle(message)

# task to be executed in child processes
def task(queue):
    # create a logger
    logger = logging.getLogger('app')
    # add a handler that uses the shared queue
    logger.addHandler(QueueHandler(queue))
    # log all messages, debug and up
    logger.setLevel(logging.DEBUG)
    # get the current process
    process = current_process()
    # report initial message
    logger.info(f'Child {process.name} starting.')
    # simulate doing work
    for i in range(5):
        # report a message
        logger.debug(f'Child {process.name} step {i}.')
        # block
        sleep(random())
    # report final message
    logger.info(f'Child {process.name} done.')

# protect the entry point
if __name__ == '__main__':
    # create the manager
    with Manager() as manager:
        # create the shared queue and get the proxy object
        queue = manager.Queue()
        # create a logger
        logger = logging.getLogger('app')
        # add a handler that uses the shared queue
        logger.addHandler(QueueHandler(queue))
        # log all messages, debug and up
        logger.setLevel(logging.DEBUG)
        # create the process pool with default configuration
        with Pool() as pool:
            # issue a long running task to receive logging messages
            _ = pool.apply_async(logger_process, args=(queue,))
            # report initial message
            logger.info('Main process started.')
            # issue tasks to the process pool
            results = [pool.apply_async(task, args=(queue,)) for i in range(5)]
            # wait for all issued tasks to complete
            logger.info('Main process waiting...')
            for result in results:
                result.wait()
            # report final message
            logger.info('Main process done.')
            # shutdown the long running logger task
            queue.put(None)
            # close the process pool
            pool.close()
            # wait for all tasks to complete (e.g. the logger to close)
            pool.join()
Running the example first creates the manager and the shared queue, then configures the logging infrastructure in the main process to use the QueueHandler with the shared queue.
This ensures that any logging in the main process is sent to the logging process.
Next, the logging task is configured and started in the multiprocessing pool. It is responsible for reading messages from the shared queue and logging them to a central location, in this case to standard error.
Five tasks are then issued to the multiprocessing pool. The main process then waits for each of the issued tasks to complete.
Each task runs and configures its logging infrastructure to use a QueueHandler with the shared queue. The tasks then perform their simulated work and log their messages which are all sent into the shared queue to the logging process.
The logging process reads messages one-at-a-time from the queue and lets its local logging infrastructure handle them, logging them to stderr.
The tasks finish and the main process carries on. It sends a None message into the queue which triggers the logging task to exit its loop and terminate.
The main process then waits for the pool to close completely before terminating the program.
Logger process running.
Main process started.
Main process waiting...
Child SpawnPoolWorker-3 starting.
Child SpawnPoolWorker-3 step 0.
Child SpawnPoolWorker-5 starting.
Child SpawnPoolWorker-5 step 0.
Child SpawnPoolWorker-4 starting.
Child SpawnPoolWorker-4 step 0.
Child SpawnPoolWorker-6 starting.
Child SpawnPoolWorker-6 step 0.
Child SpawnPoolWorker-7 starting.
Child SpawnPoolWorker-7 step 0.
Child SpawnPoolWorker-7 step 1.
Child SpawnPoolWorker-6 step 1.
Child SpawnPoolWorker-3 step 1.
Child SpawnPoolWorker-5 step 1.
Child SpawnPoolWorker-4 step 1.
Child SpawnPoolWorker-6 step 2.
Child SpawnPoolWorker-5 step 2.
Child SpawnPoolWorker-5 step 3.
Child SpawnPoolWorker-7 step 2.
Child SpawnPoolWorker-5 step 4.
Child SpawnPoolWorker-6 step 3.
Child SpawnPoolWorker-7 step 3.
Child SpawnPoolWorker-3 step 2.
Child SpawnPoolWorker-4 step 2.
Child SpawnPoolWorker-6 step 4.
Child SpawnPoolWorker-5 done.
Child SpawnPoolWorker-7 step 4.
Child SpawnPoolWorker-3 step 3.
Child SpawnPoolWorker-6 done.
Child SpawnPoolWorker-4 step 3.
Child SpawnPoolWorker-3 step 4.
Child SpawnPoolWorker-7 done.
Child SpawnPoolWorker-4 step 4.
Child SpawnPoolWorker-4 done.
Child SpawnPoolWorker-3 done.
Main process done.
Logger process shutting down.
Takeaways
You now know how to log from worker processes in the multiprocessing pool.
If you enjoyed this tutorial, you will love my book: Python Multiprocessing Pool Jump-Start. It covers everything you need to master the topic with hands-on examples and clear explanations.