bring loggers in sync and add multiproc capabilities #1885

Merged · 9 commits · Aug 14, 2023
185 changes: 172 additions & 13 deletions _delphi_utils_python/delphi_utils/logger.py
@@ -1,21 +1,45 @@
-"""Structured logger utility for creating JSON logs in Delphi pipelines."""
+"""Structured logger utility for creating JSON logs."""
+
+# the Delphi group uses two ~identical versions of this file.
+# try to keep them in sync with edits, for sanity.
+# https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py  # pylint: disable=line-too-long
+# https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py
+
+import contextlib
 import logging
+import multiprocessing
+import os
 import sys
 import threading
+from traceback import format_exception

 import structlog


 def handle_exceptions(logger):
     """Handle exceptions using the provided logger."""
-    def exception_handler(etype, value, traceback):
+
+    def exception_handler(scope, etype, value, traceback):
         logger.exception("Top-level exception occurred",
-                         exc_info=(etype, value, traceback))
+                         scope=scope, exc_info=(etype, value, traceback))
+
+    def sys_exception_handler(etype, value, traceback):
+        exception_handler("sys", etype, value, traceback)

-    def multithread_exception_handler(args):
-        exception_handler(args.exc_type, args.exc_value, args.exc_traceback)
+    def threading_exception_handler(args):
+        if args.exc_type == SystemExit and args.exc_value.code == 0:
+            # `sys.exit(0)` is considered "successful termination":
+            #   https://docs.python.org/3/library/sys.html#sys.exit
+            logger.debug("normal thread exit", thread=args.thread,
+                         stack="".join(
+                             format_exception(
+                                 args.exc_type, args.exc_value, args.exc_traceback)))
+        else:
+            exception_handler(f"thread: {args.thread}",
+                              args.exc_type, args.exc_value, args.exc_traceback)

-    sys.excepthook = exception_handler
-    threading.excepthook = multithread_exception_handler
+    sys.excepthook = sys_exception_handler
+    threading.excepthook = threading_exception_handler


 def get_structured_logger(name=__name__,
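With these hooks installed, uncaught exceptions in the main thread and in worker threads are both routed through the structured logger and tagged with a scope. A minimal sketch of opting in (the pipeline name is hypothetical; log_exceptions is the parameter checked near the end of get_structured_logger below):

    from delphi_utils.logger import get_structured_logger

    # hypothetical pipeline name; log_exceptions=True installs the hooks above
    logger = get_structured_logger("example_indicator", log_exceptions=True)

    # an uncaught exception here is logged as JSON with scope="sys";
    # one raised inside a thread is logged with scope="thread: <thread>"
    raise ValueError("something went wrong")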
@@ -40,12 +64,21 @@ def get_structured_logger(name=__name__,
     is a good choice.
     filename: An (optional) file to write log output.
     """
-    # Configure the basic underlying logging configuration
+    # Set the underlying logging configuration
+    if "LOG_DEBUG" in os.environ:
+        log_level = logging.DEBUG
+    else:
+        log_level = logging.INFO
+
     logging.basicConfig(
         format="%(message)s",
-        level=logging.INFO,
-        handlers=[logging.StreamHandler()]
-    )
+        level=log_level,
+        handlers=[logging.StreamHandler()])
+
+    def add_pid(_logger, _method_name, event_dict):
+        """Add current PID to the event dict."""
+        event_dict["pid"] = os.getpid()
+        return event_dict

     # Configure structlog. This uses many of the standard suggestions from
     # the structlog documentation.
@@ -57,6 +90,8 @@ def get_structured_logger(name=__name__,
             structlog.stdlib.add_logger_name,
             # Include log level in output.
             structlog.stdlib.add_log_level,
+            # Include PID in output.
+            add_pid,
             # Allow formatting into arguments e.g., logger.info("Hello, %s",
             # name)
             structlog.stdlib.PositionalArgumentsFormatter(),
@@ -68,7 +103,7 @@ def get_structured_logger(name=__name__,
             # Decode unicode characters
             structlog.processors.UnicodeDecoder(),
             # Render as JSON
-            structlog.processors.JSONRenderer()
+            structlog.processors.JSONRenderer(),
         ],
         # Use a dict class for keeping track of data.
         context_class=dict,
@@ -84,10 +119,134 @@ def get_structured_logger(name=__name__,
     system_logger = logging.getLogger(name)
     if filename and not system_logger.handlers:
         system_logger.addHandler(logging.FileHandler(filename))
-    system_logger.setLevel(logging.INFO)
+    system_logger.setLevel(log_level)
     logger = structlog.wrap_logger(system_logger)

     if log_exceptions:
         handle_exceptions(logger)

     return logger
+
+
+class LoggerThread():
+    """
+    A construct to use a logger from multiprocessing workers/jobs.
+
+    the bare structlog loggers are thread-safe but not multiprocessing-safe.
+    a `LoggerThread` will spawn a thread that listens to a mp.Queue
+    and logs messages from it with the provided logger,
+    so other processes can send logging messages to it
+    via the logger-like `SubLogger` interface.
+    the SubLogger even logs the pid of the caller.
+
+    this is good to use with a set of jobs that are part of a mp.Pool,
+    but isn't recommended for general use
+    because of overhead from threading and multiprocessing,
+    and because it might introduce lag to log messages.
+
+    somewhat inspired by:
+    docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
+    """
+
+    class SubLogger():
+        """MP-safe logger-like interface to convey log messages to a listening LoggerThread."""
+
+        def __init__(self, queue):
+            """Create SubLogger with a bound queue."""
+            self.queue = queue
+
+        def _log(self, level, *args, **kwargs):
+            kwargs_plus = {'sub_pid': multiprocessing.current_process().pid}
+            kwargs_plus.update(kwargs)
+            self.queue.put([level, args, kwargs_plus])
+
+        def debug(self, *args, **kwargs):
+            """Log a DEBUG level message."""
+            self._log(logging.DEBUG, *args, **kwargs)
+
+        def info(self, *args, **kwargs):
+            """Log an INFO level message."""
+            self._log(logging.INFO, *args, **kwargs)
+
+        def warning(self, *args, **kwargs):
+            """Log a WARNING level message."""
+            self._log(logging.WARNING, *args, **kwargs)
+
+        def error(self, *args, **kwargs):
+            """Log an ERROR level message."""
+            self._log(logging.ERROR, *args, **kwargs)
+
+        def critical(self, *args, **kwargs):
+            """Log a CRITICAL level message."""
+            self._log(logging.CRITICAL, *args, **kwargs)
+
+
+    def get_sublogger(self):
+        """Retrieve SubLogger for this LoggerThread."""
+        return self.sublogger
+
+    def __init__(self, logger, q=None):
+        """Create and start LoggerThread with supplied logger, creating a queue if not provided."""
+        self.logger = logger
+        if q:
+            self.msg_queue = q
+        else:
+            self.msg_queue = multiprocessing.Queue()
+
+        def logger_thread_worker():
+            logger.info('thread started')
+            while True:
+                msg = self.msg_queue.get()
+                if msg == 'STOP':
+                    logger.debug('received stop signal')
+                    break
+                level, args, kwargs = msg
+                if level in [logging.DEBUG, logging.INFO, logging.WARNING,
+                             logging.ERROR, logging.CRITICAL]:
+                    logger.log(level, *args, **kwargs)
+                else:
+                    logger.error('received unknown logging level! exiting...',
+                                 level=level, args_kwargs=(args, kwargs))
+                    break
+            logger.debug('stopping thread')
+
+        self.thread = threading.Thread(target=logger_thread_worker,
+                                       name="LoggerThread__" + logger.name)
+        logger.debug('starting thread')
[Review thread, resolved]

dshemetov (Contributor, Aug 8, 2023):
    I don't quite follow the purpose of this thread... it seems like it should
    interact with self.sublogger in some way, no? Could you explain a bit?

melange396 (Contributor, Author):
    The thread doesn't really "interact" with the sublogger; the sublogger acts
    indirectly upon the thread. The thread is spawned in the background with a
    handle to a queue and a handle to the provided Logger, and it just logs
    anything that shows up on the queue. The sublogger is used by things outside
    the thread (or even outside the process!) to enqueue things to be logged.

dshemetov (Contributor):
    I see! So verifying that I get it:

      • msg_queue.get() will block until it receives a message (because no timeout)
      • SubLoggers send messages to that queue, which that thread will pick up
      • Eventually, that thread is terminated by the STOP message
      • We let the thread close fully with thread.join below

melange396 (Contributor, Author):
    yep!
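A minimal sketch of that lifecycle, using only the names defined in this class (the logger name is hypothetical):

    from delphi_utils.logger import get_structured_logger, LoggerThread

    logger = get_structured_logger("demo")   # hypothetical logger name
    lt = LoggerThread(logger)                # spawns the listener thread
    sub = lt.get_sublogger()                 # mp-safe, logger-like proxy
    sub.info("hello from a worker", step=1)  # enqueued; logged by the thread
    lt.stop()                                # sends 'STOP' and joins the thread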

+        self.thread.start()
+
+        self.sublogger = LoggerThread.SubLogger(self.msg_queue)
+        self.running = True
+
+    def stop(self):
+        """Terminate this LoggerThread."""
+        if not self.running:
+            self.logger.warning('thread already stopped')
+            return
+        self.logger.debug('sending stop signal')
+        self.msg_queue.put('STOP')
+        self.thread.join()
+        self.running = False
+        self.logger.info('thread stopped')
+
+
+@contextlib.contextmanager
+def pool_and_threadedlogger(logger, *poolargs):
+    """
+    Provide (to a context) a multiprocessing Pool and a proxy to the supplied logger.
+
+    Emulates the multiprocessing.Pool() context manager,
+    but also provides (via a LoggerThread) a SubLogger proxy to logger
+    that can be safely used by pool workers.
+    Also "cleans up" the pool by waiting for workers to complete
+    as it exits the context.
+    """
+    with multiprocessing.Manager() as manager:
+        logger_thread = LoggerThread(logger, manager.Queue())
+        try:
+            with multiprocessing.Pool(*poolargs) as pool:
+                yield pool, logger_thread.get_sublogger()
+                pool.close()
+                pool.join()
+        finally:
+            logger_thread.stop()
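For reference, a short usage sketch of the new context manager (the worker function, pool size, and file names are hypothetical; sub_logger is the SubLogger proxy yielded above):

    from delphi_utils.logger import get_structured_logger, pool_and_threadedlogger

    def process_file(filename, sub_logger):
        # runs in a pool worker process; the SubLogger proxy is safe to use here
        sub_logger.info("processing", file=filename)
        return len(filename)

    if __name__ == "__main__":
        logger = get_structured_logger("demo")  # hypothetical logger name
        with pool_and_threadedlogger(logger, 4) as (pool, sub_logger):
            results = [pool.apply_async(process_file, (f, sub_logger))
                       for f in ["a.csv", "b.csv"]]  # hypothetical inputs
            counts = [r.get() for r in results]
        logger.info("all done", counts=counts)

Because the queue comes from a multiprocessing.Manager(), the SubLogger holding its proxy can be pickled and passed to pool workers; a bare multiprocessing.Queue cannot be handed to workers that way.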