Last Updated on September 12, 2022
You can log from worker processes in the multiprocessing pool using a shared multiprocessing.Queue and a logging.handlers.QueueHandler.
In this tutorial you will discover how to log from worker processes in the multiprocessing pool in Python.
Let’s get started.
Need to Log from Worker Processes
The multiprocessing.pool.Pool in Python provides a pool of reusable processes for executing ad hoc tasks.
A multiprocessing pool can be configured when it is created, which will prepare the child workers.
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
— multiprocessing — Process-based parallelism
We can issue one-off tasks to the process pool using functions such as apply() or we can apply the same function to an iterable of items using functions such as map().
Results for issued tasks can then be retrieved synchronously, or we can retrieve the result of tasks later by using asynchronous versions of the functions such as apply_async() and map_async().
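For example, a minimal sketch of issuing tasks to the pool might look like the following (the square() task function here is purely illustrative):

# minimal sketch: issuing tasks to the multiprocessing pool
from multiprocessing import Pool

# hypothetical task used for illustration
def square(value):
    return value * value

# protect the entry point
if __name__ == '__main__':
    # create the process pool
    with Pool() as pool:
        # issue a one-off task and block until it returns
        result = pool.apply(square, args=(3,))
        # apply the function to each item in an iterable and block for all results
        results = pool.map(square, range(5))
        # issue the same work asynchronously and retrieve the results later
        async_result = pool.map_async(square, range(5))
        print(result, results, async_result.get())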
When using the multiprocessing pool, we may want to log directly from tasks.
This may be a problem as the tasks are executed by child worker processes, and logging to a central location from multiple processes is challenging.
How can we log safely from child worker processes in the multiprocessing pool?
How to Log from Worker Processes
Logging is a way of tracking events within a program.
There are many types of events that may be logged within a program, ranging in severity from debug and informational messages to warnings, errors, and critical events.
The logging module provides infrastructure for logging within Python programs.
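As a quick refresher, a minimal sketch of logging within a single process might look like the following (the logger name 'app' is just an illustration):

# minimal sketch: logging events from a single process
import logging

# get a named logger, report to stderr, and log all messages debug and up
logger = logging.getLogger('app')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

# log events at increasing levels of severity
logger.debug('This is a debug message')
logger.info('This is an informational message')
logger.warning('This is a warning message')
logger.error('This is an error message')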
Effective logging from multiple processes requires custom code.
There are a few common approaches, including:
- Use a socket handler and send messages to a logging process.
- Use a custom handler that internally uses a shared mutex to ensure one process logs at a time.
- Use a queue handler that uses a shared queue to send messages to a logging process.
All approaches are similar in that they require that log messages are serialized before being stored.
This can be achieved by serializing messages on a shared multiprocessing.Queue via a logging.handlers.QueueHandler and letting one process read messages one at a time and store them to stream or file.
A multiprocessing.Queue can be passed to a child process, such as a new multiprocessing.Process instance. You can see an example of this in the tutorial:
However, a multiprocessing.Queue cannot be passed directly to child worker processes in the Pool as an argument to a task function.
Attempting to do so will result in an error, such as:
RuntimeError: Queue objects should only be shared between processes through inheritance
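For example, a minimal sketch of the failing pattern might look like the following (the task() function is purely illustrative); the error is raised when the pool tries to pickle the queue argument and typically surfaces when the task result is retrieved:

# minimal sketch: passing a multiprocessing.Queue to a pool task fails
from multiprocessing import Pool
from multiprocessing import Queue

# hypothetical task that tries to use a queue passed as an argument
def task(queue):
    queue.put('hello')

# protect the entry point
if __name__ == '__main__':
    # create a queue directly (this cannot be pickled for pool tasks)
    queue = Queue()
    # create the process pool
    with Pool() as pool:
        # issue the task with the queue as an argument
        result = pool.apply_async(task, args=(queue,))
        # re-raises: RuntimeError: Queue objects should only be shared
        # between processes through inheritance
        result.get()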
Instead, it must be created using a multiprocessing.Manager. This will create a centralized version of the Queue in a server process and return a proxy object to interact with the centralized queue that is safe to pass around.
For example:
...
# create the manager
with Manager() as manager:
    # create the shared queue and get the proxy object
    queue = manager.Queue()
    # ...
A QueueHandler log handler can then be created and added to the logging infrastructure by each process using the shared queue.
For example:
...
# create a logger
logger = logging.getLogger('app')
# add a handler that uses the shared queue
logger.addHandler(QueueHandler(queue))
This can be performed by the task functions executed by child worker processes in the multiprocessing pool.
Another process can then read from the shared queue and report messages either directly or by using the logging infrastructure.
For example:
...
# consume a log message, block until one arrives
message = queue.get()
# report the message
print(message)
You can learn more about how to log safely from multiple processes in the tutorial:
Next, let’s explore an example of how to log from child worker processes in the multiprocessing pool.
Example of Logging from Worker Processes
We can explore how to log from worker processes in the multiprocessing pool in a process-safe manner.
In this example we will create a number of child processes to execute a custom function. Each task will log messages and the messages will be sent to a separate logging process and logged in a serial (one-at-a-time) manner.
This can be achieved by configuring the logging infrastructure in each process to use a logging.handlers.QueueHandler that will send log messages to a shared multiprocessing.Queue.
A new child process will then consume messages from the queue one-at-a-time and store or report them, in this case on stderr using a logging.StreamHandler, but it could just as easily be a file.
Importantly, in this example, messages cannot be lost or corrupted as only a single process is responsible for handling the log messages, and log messages are sent to this one process safely and reliably using a process-aware queue data structure.
Define Centralized Logger Function
Firstly, we need to define a function to run in a child process responsible for logging.
This function will take a shared multiprocessing.Queue as an argument.
# executed in a process that performs logging
def logger_process(queue):
    # ...
We need to configure the logging infrastructure in this process to specify what level of messages to report and where to store them. In this case, we will get a named logger for our application called 'app', and send all messages at debug level and up to stderr using a StreamHandler.
...
# create a logger
logger = logging.getLogger('app')
# configure a stream handler
logger.addHandler(logging.StreamHandler())
# log all messages, debug and up
logger.setLevel(logging.DEBUG)
# report that the logger process is running
logger.info(f'Logger process running.')
This child process will then run forever, reading messages from the queue and logging them. If there are no messages on the queue, it will block until a message arrives.
...
# run forever
while True:
    # consume a log message, block until one arrives
    message = queue.get()
If you are new to multiprocessing queues, you can learn more in the tutorial:
If the message is the special None value, then the loop is broken and the process will terminate normally. This is called a sentinel message and must be sent by the main process before the program exits.
...
# check for shutdown
if message is None:
    logger.info(f'Logger process shutting down.')
    break
Otherwise, if it is a normal log message, we can let our logging infrastructure for this process handle it, e.g. log it to stderr.
...
# log the message
logger.handle(message)
Tying this together, the complete function for running a child process responsible for logging is listed below.
# executed in a process that performs logging
def logger_process(queue):
    # create a logger
    logger = logging.getLogger('app')
    # configure a stream handler
    logger.addHandler(logging.StreamHandler())
    # log all messages, debug and up
    logger.setLevel(logging.DEBUG)
    # report that the logger process is running
    logger.info(f'Logger process running.')
    # run forever
    while True:
        # consume a log message, block until one arrives
        message = queue.get()
        # check for shutdown
        if message is None:
            logger.info(f'Logger process shutting down.')
            break
        # log the message
        logger.handle(message)
Define Custom Task Function
Next, we can define a custom function to execute in the process pool.
The function takes the shared queue as an argument.
# task to be executed in child processes
def task(queue):
    # ...
We can then configure a new logger instance to use a QueueHandler that makes use of the shared queue.
...
# create a logger
logger = logging.getLogger('app')
# add a handler that uses the shared queue
logger.addHandler(QueueHandler(queue))
# log all messages, debug and up
logger.setLevel(logging.DEBUG)
We can then get the current process so that we can include the process name in the messages.
...
# get the current process
process = current_process()
The child process can then log an initial message, loop a few times, reporting a message and blocking for a random fraction of a second on each iteration, and finally log a message before exiting.
...
# report initial message
logger.info(f'Child {process.name} starting.')
# simulate doing work
for i in range(5):
    # report a message
    logger.debug(f'Child {process.name} step {i}.')
    # block
    sleep(random())
# report final message
logger.info(f'Child {process.name} done.')
Tying this together, the complete task() function is listed below.
# task to be executed in child processes
def task(queue):
    # create a logger
    logger = logging.getLogger('app')
    # add a handler that uses the shared queue
    logger.addHandler(QueueHandler(queue))
    # log all messages, debug and up
    logger.setLevel(logging.DEBUG)
    # get the current process
    process = current_process()
    # report initial message
    logger.info(f'Child {process.name} starting.')
    # simulate doing work
    for i in range(5):
        # report a message
        logger.debug(f'Child {process.name} step {i}.')
        # block
        sleep(random())
    # report final message
    logger.info(f'Child {process.name} done.')
Create Shared Queue
Next, in the main process, we can create the shared queue.
Importantly, we cannot just create a multiprocessing.Queue and share it with tasks in the multiprocessing pool.
This will result in a RuntimeError:
RuntimeError: Queue objects should only be shared between processes through inheritance
Instead, we must create a multiprocessing.Manager and use the manager to create a Queue.
The manager is a server process that will manage a centralized version of shared objects. It returns proxy objects that we can share among processes and use safely to interact with the centralized object.
We can create a manager using the context manager interface to ensure it is closed once we are finished with it.
...
# create the manager
with Manager() as manager:
    # ...
Next, we can use the manager to create the shared queue.
...
# create the shared queue and get the proxy object
queue = manager.Queue()
We want to log from the main process in order to report progress.
The main process can then configure its logging infrastructure to use the QueueHandler so that messages are sent to the logging child process.
...
# create a logger
logger = logging.getLogger('app')
# add a handler that uses the shared queue
logger.addHandler(QueueHandler(queue))
# log all messages, debug and up
logger.setLevel(logging.DEBUG)
Create Multiprocessing Pool
Finally, we can create the multiprocessing pool and execute our logging process task and custom task functions.
First, we can create an instance of the multiprocessing pool using the context manager interface to ensure that the pool is closed once we are finished with it.
...
# create the process pool with default configuration
with Pool() as pool:
    # ...
You can learn more about the context manager interface for the pool in the tutorial:
Next, we can issue the long running task that receives and reports log messages.
In this case, we will issue this task asynchronously using the apply_async() function and pass in the shared queue.
...
# issue a long running task to receive logging messages
_ = pool.apply_async(logger_process, args=(queue,))
# report initial message
logger.info('Main process started.')
You can learn more about how to use the apply_async() function in the tutorial:
Next, we can execute our tasks in the multiprocessing pool by calling our custom task() function multiple times, passing in the shared queue for logging.
We will use the apply_async() function and gather the AsyncResult objects returned from each call as we will need them in a moment.
This can be achieved in a list comprehension.
...
# issue task to the process pool
results = [pool.apply_async(task, args=(queue,)) for i in range(5)]
Next, we can wait for the five issued tasks to complete by waiting on the AsyncResult object for each task.
This can be achieved in a loop by calling the AsyncResult.wait() function.
...
# wait for all issued tasks to complete
logger.info('Main process waiting...')
for result in results:
    result.wait()
Once all tasks have completed, we can shut down the logging process running in the pool.
We could just terminate the process pool directly, but instead, we will shut down the logging process in a controlled manner.
This is achieved by putting the special None sentinel value on the queue which will instruct the logging process to exit its loop and terminate.
...
# report final message
logger.info('Main process done.')
# shutdown the long running logger task
queue.put(None)
We can then close the multiprocessing pool, which will prevent it from receiving any further tasks, then wait for the logging process to complete, and for all worker processes to terminate.
...
# close the process pool
pool.close()
# wait for all tasks to complete (e.g. the logger to close)
pool.join()
Complete Example
Tying all of this together, the complete example of logging from workers in the multiprocessing pool is listed below.
# SuperFastPython.com
# example of logging from multiple workers in the multiprocessing pool
from random import random
from time import sleep
from multiprocessing import current_process
from multiprocessing import Pool
from multiprocessing import Queue
from multiprocessing import Manager
from logging.handlers import QueueHandler
import logging

# executed in a process that performs logging
def logger_process(queue):
    # create a logger
    logger = logging.getLogger('app')
    # configure a stream handler
    logger.addHandler(logging.StreamHandler())
    # log all messages, debug and up
    logger.setLevel(logging.DEBUG)
    # report that the logger process is running
    logger.info(f'Logger process running.')
    # run forever
    while True:
        # consume a log message, block until one arrives
        message = queue.get()
        # check for shutdown
        if message is None:
            logger.info(f'Logger process shutting down.')
            break
        # log the message
        logger.handle(message)

# task to be executed in child processes
def task(queue):
    # create a logger
    logger = logging.getLogger('app')
    # add a handler that uses the shared queue
    logger.addHandler(QueueHandler(queue))
    # log all messages, debug and up
    logger.setLevel(logging.DEBUG)
    # get the current process
    process = current_process()
    # report initial message
    logger.info(f'Child {process.name} starting.')
    # simulate doing work
    for i in range(5):
        # report a message
        logger.debug(f'Child {process.name} step {i}.')
        # block
        sleep(random())
    # report final message
    logger.info(f'Child {process.name} done.')

# protect the entry point
if __name__ == '__main__':
    # create the manager
    with Manager() as manager:
        # create the shared queue and get the proxy object
        queue = manager.Queue()
        # create a logger
        logger = logging.getLogger('app')
        # add a handler that uses the shared queue
        logger.addHandler(QueueHandler(queue))
        # log all messages, debug and up
        logger.setLevel(logging.DEBUG)
        # create the process pool with default configuration
        with Pool() as pool:
            # issue a long running task to receive logging messages
            _ = pool.apply_async(logger_process, args=(queue,))
            # report initial message
            logger.info('Main process started.')
            # issue task to the process pool
            results = [pool.apply_async(task, args=(queue,)) for i in range(5)]
            # wait for all issued tasks to complete
            logger.info('Main process waiting...')
            for result in results:
                result.wait()
            # report final message
            logger.info('Main process done.')
            # shutdown the long running logger task
            queue.put(None)
            # close the process pool
            pool.close()
            # wait for all tasks to complete (e.g. the logger to close)
            pool.join()
Running the example first creates the manager and the shared queue, then configures the logging infrastructure in the main process to use the QueueHandler with the shared queue.
This ensures that any logging in the main process is sent to the logging process.
Next, the logging task is configured and started in the multiprocessing pool. It is responsible for reading messages from the shared queue and logging them to a central location, in this case to standard error.
Five tasks are then issued to the multiprocessing pool. The main process then waits for each of the issued tasks to complete.
Each task runs and configures its logging infrastructure to use a QueueHandler with the shared queue. The tasks then perform their simulated work and log their messages which are all sent into the shared queue to the logging process.
The logging process reads messages one-at-a-time from the queue and lets its local logging infrastructure handle them, logging them to stderr.
The tasks finish and the main process carries on. It sends a None message into the queue which triggers the logging task to exit its loop and terminate.
The main process then waits for the pool to close completely before terminating the program.
Logger process running.
Main process started.
Main process waiting...
Child SpawnPoolWorker-3 starting.
Child SpawnPoolWorker-3 step 0.
Child SpawnPoolWorker-5 starting.
Child SpawnPoolWorker-5 step 0.
Child SpawnPoolWorker-4 starting.
Child SpawnPoolWorker-4 step 0.
Child SpawnPoolWorker-6 starting.
Child SpawnPoolWorker-6 step 0.
Child SpawnPoolWorker-7 starting.
Child SpawnPoolWorker-7 step 0.
Child SpawnPoolWorker-7 step 1.
Child SpawnPoolWorker-6 step 1.
Child SpawnPoolWorker-3 step 1.
Child SpawnPoolWorker-5 step 1.
Child SpawnPoolWorker-4 step 1.
Child SpawnPoolWorker-6 step 2.
Child SpawnPoolWorker-5 step 2.
Child SpawnPoolWorker-5 step 3.
Child SpawnPoolWorker-7 step 2.
Child SpawnPoolWorker-5 step 4.
Child SpawnPoolWorker-6 step 3.
Child SpawnPoolWorker-7 step 3.
Child SpawnPoolWorker-3 step 2.
Child SpawnPoolWorker-4 step 2.
Child SpawnPoolWorker-6 step 4.
Child SpawnPoolWorker-5 done.
Child SpawnPoolWorker-7 step 4.
Child SpawnPoolWorker-3 step 3.
Child SpawnPoolWorker-6 done.
Child SpawnPoolWorker-4 step 3.
Child SpawnPoolWorker-3 step 4.
Child SpawnPoolWorker-7 done.
Child SpawnPoolWorker-4 step 4.
Child SpawnPoolWorker-4 done.
Child SpawnPoolWorker-3 done.
Main process done.
Logger process shutting down.
Further Reading
This section provides additional resources that you may find helpful.
Books
- Multiprocessing Pool Jump-Start, Jason Brownlee (my book!)
- Multiprocessing API Interview Questions
- Pool Class API Cheat Sheet
I would also recommend specific chapters from these books:
- Effective Python, Brett Slatkin, 2019.
- See: Chapter 7: Concurrency and Parallelism
- High Performance Python, Ian Ozsvald and Micha Gorelick, 2020.
- See: Chapter 9: The multiprocessing Module
- Python in a Nutshell, Alex Martelli, et al., 2017.
- See: Chapter: 14: Threads and Processes
Guides
- Python Multiprocessing Pool: The Complete Guide
- Python ThreadPool: The Complete Guide
- Python Multiprocessing: The Complete Guide
- Python ProcessPoolExecutor: The Complete Guide
Takeaways
You now know how to log from worker processes in the multiprocessing pool.
Do you have any questions?
Ask your questions in the comments below and I will do my best to answer.
Denis says
Hi Jason,
I enjoy your writing style and topics you write about.
I spent some time understanding your code and I’m trying your final example with python 3.10.
The output looks quite strange, do you know how to fix it?
As you can see, the logger writes the same messages multiple times: Logger process running, Main process started, Main process started, Main process waiting… But they should appear just once.
Thanks,
Denis
Here is more output:
Logger process running.
Main process started.
Main process waiting…
Logger process running.
Child ForkPoolWorker-4 starting.
Main process started.
Child ForkPoolWorker-4 starting.
Main process waiting…
Child ForkPoolWorker-3 starting.
Child ForkPoolWorker-4 step 0.
Child ForkPoolWorker-3 starting.
Child ForkPoolWorker-4 step 0.
Logger process running.
Child ForkPoolWorker-3 step 0.
Child ForkPoolWorker-3 step 0.
Child ForkPoolWorker-4 starting.
Main process started.
Child ForkPoolWorker-4 starting.
Main process waiting…
Jason Brownlee says
Sorry to hear that.
I suspect it is your development environment. I can confirm it works as expected in Python v3.10 and 3.11 from the command line and in sublime text.
Perhaps you can try running the example from the command line or a different IDE?
Stef Rommes says
The difference between the two of you is Windows/Mac versus Linux. Windows/Mac spawns new processes while Linux forks instead.
See https://pythonspeed.com/articles/python-multiprocessing/ for more info.
Making it so Linux starts spawning instead of forking is a possible solution.
from multiprocessing import get_context

with get_context("spawn").Pool() as pool:
    # ... everything else is unchanged
Jason Brownlee says
Thanks for sharing.
There is more on this here:
https://superfastpython.com/multiprocessing-pool-context/
Denis Memmesheimer says
Hi Jason,
I tried the https://superfastpython.com/multiprocessing-pool-logging/#Complete_Example using different python3 versions in the console in order to exclude environment problems – same result.
I’m testing it in Linux: Ubuntu 22.04.1 LTS
For a comparison I tried online environment, result is the same: https://onecompiler.com/python/3yvnpcgwc
Here is the screenshot from the online environment: https://imgur.com/RpB3mjR
Justin Rice says
Hi Jason, I love your tutorials, the ML ones too!
I must be doing something wrong in my (slightly adapted) code, because I’m seeing odd duplicates in the output. I have two different ways of running: using pool.apply_async() on what I call my task_apply function, or using pool.starmap_async() on what I call my task_starmap function — either way, I see the same odd behavior.
For example, running either way 10 times — whether to a stream handler, a file handler, or a rotating file handler — I always get output that looks like this:
EXAMPLE 1 OUTPUT — TYPICAL/MOST FREQUENT
(Notice there are 14 lines that end with “starting…” when I am expecting only 10, yet when rows with duplicate timestamps are eliminated, the expected 10 “starting…” rows remain.)
EXAMPLE 2 OUTPUT
(Notice there are 15 lines that end with “starting…” when I am expecting only 10, yet when rows with duplicate timestamps are eliminated, only 8 “starting…” rows remain.)
Do you have any idea what’s going on? Is this a basic mistake on my part? Or is it something more, like perhaps a logging oddity? Or is it with multiprocessing?
For reference:
Python version 3.9.5
I tried on both a Linux-based HPC and my MacBook Pro, same result.
When running on HPC, I used set_start_method(“spawn”) because forking resulted in huge output files with tons of duplication
Here’s my code:
Jason Brownlee says
Sorry to hear that you’re having trouble.
I don’t see anything obvious, sorry. Perhaps try posting your code example to StackOverflow.
Off the cuff, when I have had problems of this kind in the past, it has been related to the config of the log infrastructure itself. Perhaps confirm object ids and such in log messages or run a debugger to see where dup log messages are coming from.
Justin Rice says
Thanks for the reply, Jason!
I found out the problem, and it turns out I should have read your documents more closely.
I was running in debug mode in VS code. Running normally fixes the problem.
Works like a charm! And THANK YOU for these blogs!
Justin
Jason Brownlee says
Well done, I’m happy to hear that!
Mirko says
Hi Jason and Justin,
thank you so much for your tutorials and contributions!
I encountered the same issue presented by Justin about duplicated LogRecords.
One way to reproduce it is to set the Pool size to 2 (i.e. one log-worker and exactly one task-worker).
In this way, at every task execution one more replicated LogRecord is shown.
The simplest solution I was able to figure out is to check whether the task logger already has handlers before adding a new one (line 38).
Jason's code, at line 38, is:
# add a handler that uses the shared queue
logger.addHandler(QueueHandler(queue))
My solution is:
# add a handler that uses the shared queue
if not logger.hasHandlers():
    logger.addHandler(QueueHandler(queue))
Do you foresee any more principled solution?
Thank you in advance.
Mirko
Jason Brownlee says
Thanks for sharing!
Charini Nanayakkara says
Thank you for your very useful tutorials on Python Multiprocessing!
Jason Brownlee says
You’re very welcome!
Robert Claar says
Thank you so much for your helpful tutorials! For anyone who is still struggling with the logger outputting repeated lines, I fixed this by using a different name for each logger created.
In the main block after we define a manager, we create a logger using
logger = logging.getLogger(f"app")
and we add the handler using the queue we created. Then we create a long running process so our logger can receive all the log messages from the individual processes
_ = pool.apply_async(logger_process, args=(queue,))
Now a logger named "app" already exists with a handler, and each logger created in our task is also named "app" and has the same handler, which causes each log message to be sent to the queue multiple times. I'm sorry I can't explain all the details, but using a different name other than "app" in each of the three places worked for me on Linux. Oddly enough, the code worked as-is without repeated log messages on Windows, probably due to OS differences in handling processes.
I hope this helps someone.
Jason Brownlee says
Fascinating, thank you for sharing Robert!