bring loggers in sync and add multiproc capabilities #1885

Merged on Aug 14, 2023 (9 commits)

melange396
Contributor

this does a few things, including:

  • bringing the logger.py files from https://github.com/cmu-delphi/delphi-epidata/ and https://github.com/cmu-delphi/covidcast-indicators/ back in sync. (there will be equivalent PRs in both repositories.)

  • larger functional differences are now applied to both:

    • the version in covidcast-indicators didn't log PIDs
    • nor did it have the env var trick to get debug-level output
    • the version in delphi-epidata didn't always respect output filenames properly (though it seems we didn't actually trigger this in practice; look at the collapsible section below for how to see this for yourself).
  • differentiated the uncaught exception handlers a bit

    • and reduced the log level for messages from threads exiting cleanly. this is mostly because multiprocessing.Manager processes start and stop threads often.
  • added multiprocess-safe logging constructs, which should be helpful in #1881 ("Add multiprocessing to the Quidel indicator") and in the future. (see collapsible section below for sample usage)
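a rough sketch of the "threads exiting cleanly" idea from the list above, assuming plain stdlib logging and Python 3.8+ (where threading.excepthook exists). the logger name and the function names here are hypothetical and are not taken from either logger.py; the real handlers may differ.

```python
import logging
import threading

# Hypothetical logger name, used only for this sketch.
logger = logging.getLogger("uncaught_exception_sketch")


def classify_thread_exit(args):
    """Pick a log level for whatever ended a thread.

    `args` has the shape passed to threading.excepthook:
    exc_type, exc_value, exc_traceback, thread.
    """
    # sys.exit() / sys.exit(0) inside a thread surfaces as SystemExit with code None / 0.
    code = getattr(args.exc_value, "code", None)
    clean = args.exc_type is SystemExit and code in (0, None)
    return logging.DEBUG if clean else logging.ERROR


def thread_excepthook(args):
    """Candidate replacement for threading.excepthook."""
    level = classify_thread_exit(args)
    thread_name = args.thread.name if args.thread else "unknown"
    # Attach the traceback only for real failures.
    exc_info = None
    if level == logging.ERROR:
        exc_info = (args.exc_type, args.exc_value, args.exc_traceback)
    logger.log(level, "thread %s ended: %r", thread_name, args.exc_value, exc_info=exc_info)


def install():
    """Opt in explicitly; this replaces the process-wide hook."""
    threading.excepthook = thread_excepthook
```

a custom hook does see SystemExit (only the default hook silently ignores it), which is why a clean exit needs its own quieter branch.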


to reproduce bad outfile behavior:
  • first, in your shell:
curl https://raw.githubusercontent.com/cmu-delphi/covidcast-indicators/main/_delphi_utils_python/delphi_utils/logger.py -o ci_logger.py

curl https://raw.githubusercontent.com/cmu-delphi/delphi-epidata/dev/src/common/logger.py -o de_logger.py
  • then follow these instructions: toggle the commented-out-ness of the imports below, remove files foo.txt and bar.txt, paste this into your python3 interpreter, check those files, and repeat:
import de_logger as a_logger
#import ci_logger as a_logger

foo = a_logger.get_structured_logger('foo', filename='foo.txt')
bar = a_logger.get_structured_logger('bar', filename='bar.txt')
foo.info("hi")
bar.info("oh hello")
two example usages of multiprocessing-compatible logger:
  • this approach uses multiprocessing directly, requires explicit termination of the LoggerThread (after ensuring the pool jobs have completed), and requires the sublogger to be instantiated before the worker and bound in an accessible scope (it must be used as though it is local to the worker; it cannot be passed as an argument to pool methods)
import multiprocessing
from delphi.epidata.common.logger import get_structured_logger, LoggerThread

logger = get_structured_logger('sample_1')

lt = LoggerThread(logger)
sl = lt.get_sublogger()
sl.info("test")

def worker_1(message):
  # do something here
  sl.info(message)
  # do something else here

num_processes = 5
with multiprocessing.Pool(num_processes) as pool:
  pool.apply_async(worker_1, args=["abcd"])
  pool.apply_async(worker_1, args=["efgh"])
  pool.close()
  pool.join()

lt.stop()
  • this approach uses the context manager to provide a pool and logger, and the logger can be used in arguments to pool methods. this is a little bit less lightweight (the multiprocessing.Manager() it uses will spawn its own subprocess and a number of threads within it) but it is cleaner and more straightforward.
from delphi.epidata.common.logger import get_structured_logger, pool_and_threadedlogger

logger = get_structured_logger('sample_2')

def worker_2(message, sublogger):
  # do something here
  sublogger.info(message)
  # do something else here

num_processes = 5  # optional argument passed to Pool constructor
with pool_and_threadedlogger(logger, num_processes) as (pool, sublogger):
  pool.apply_async(worker_2, args=["asdf", sublogger])
  pool.apply_async(worker_2, args=["jkl;", sublogger])
  pool.apply_async(worker_2, args=["qwerty", sublogger])

@melange396
Contributor Author

paired PR: cmu-delphi/delphi-epidata#1254

Contributor

@rzats left a comment

Reviewed this PR rather than the epidata one since it has a few more commits. Looks good with a few small caveats!

_delphi_utils_python/delphi_utils/logger.py (4 review threads, resolved; 1 outdated)
Contributor

@dshemetov left a comment

Mostly have some questions and want to understand the multiprocessing stuff better

_delphi_utils_python/delphi_utils/logger.py (resolved)
if level == logging.INFO:
  logger.info(*args, **kwargs)
if level == logging.WARN:
  logger.warn(*args, **kwargs)
Contributor

@dshemetov Aug 8, 2023

Do we need debug or exception (or others) here too? Seems like a situation where sublogger needs to inherit from Logger, but instead of writing to a file it needs to send a message to that queue. Otherwise, we need to hard duplicate Logger's interface or make sure downstream users know that multiprocessing subloggers != regular loggers. Am I reading this right?

Contributor Author

we only need the methods we use, and right now, nobody is using debug or exception. i had a TODO here to make it more obvious to anyone who came looking that they could add their own, but the current pylint settings disallowed that...

this is sort of a proxy or facade for a Logger, and it doesn't need to duplicate the whole interface. someone using a SubLogger got there by providing an existing Logger object in the first place, so hopefully they have some idea that it's still wrapped up in there.

Contributor Author

...i caved and added some other levels
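a minimal sketch of the facade idea discussed in this thread, assuming plain stdlib logging and an in-process queue.Queue. the class name QueueSubLogger, the replay helper, and the (level, args, kwargs) record shape are hypothetical and are not the PR's actual SubLogger.

```python
import logging
import queue


class QueueSubLogger:
    """Hypothetical facade: a few Logger-like methods that enqueue instead of writing."""

    def __init__(self, msg_queue):
        self.msg_queue = msg_queue

    def _log(self, level, *args, **kwargs):
        # Record the call; whoever drains the queue does the real logging.
        self.msg_queue.put((level, args, kwargs))

    def debug(self, *args, **kwargs):
        self._log(logging.DEBUG, *args, **kwargs)

    def info(self, *args, **kwargs):
        self._log(logging.INFO, *args, **kwargs)

    def warning(self, *args, **kwargs):
        self._log(logging.WARNING, *args, **kwargs)

    def error(self, *args, **kwargs):
        self._log(logging.ERROR, *args, **kwargs)


def replay(logger, record):
    """Forward one dequeued record to a real stdlib logger.

    Logger.log(level, ...) covers every level with a single call, so the
    consumer side needs no per-level `if` chain. Note that stdlib loggers
    only accept a few keyword arguments (exc_info, extra, ...).
    """
    level, args, kwargs = record
    logger.log(level, *args, **kwargs)
```

only the methods defined on the facade are available to workers, so adding a level means adding one small method on the producer side; the consumer side stays unchanged.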

logger.debug('stopping thread')

self.thread = threading.Thread(target=logger_thread_worker,
name="LoggerThread__"+logger.name)
Contributor

@dshemetov Aug 8, 2023

I don't quite follow the purpose of this thread... it seems like it should interact with self.sublogger in some way, no? Could you explain a bit?

Contributor Author

the thread doesn't really "interact" with the sublogger; the sublogger acts indirectly upon the thread. the thread is spawned in the background with a handle to a queue and a handle to the provided Logger, and it just logs anything that shows up on the queue. the sublogger is used by things outside the thread (or even outside the process!) to enqueue things to be logged.

Contributor

I see! So verifying that I get it:

  • So msg_queue.get() will block until it receives a message (because no timeout)
  • SubLoggers send messages to that queue, which the thread will pick up
  • Eventually, that thread is terminated by the STOP message
  • We let the thread close fully with thread.join below

Contributor Author

yep!
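the lifecycle confirmed above can be sketched roughly as follows. this is an illustration under assumptions, not the PR's LoggerThread: it uses stdlib logging and an in-process queue.Queue (logging from other processes would need a multiprocessing-capable queue instead), and the names drain, run_demo, and the STOP value are hypothetical.

```python
import logging
import queue
import threading

STOP = "STOP"  # hypothetical sentinel value for this sketch


def drain(msg_queue, logger):
    """Log everything that shows up on the queue until the sentinel arrives."""
    while True:
        item = msg_queue.get()  # no timeout: blocks until something is enqueued
        if item == STOP:
            break
        level, message = item
        logger.log(level, message)


def run_demo():
    """Start the thread, enqueue two messages, stop it, and wait for it to finish."""
    records = []

    class ListHandler(logging.Handler):
        def emit(self, record):
            records.append(record.getMessage())

    handler = ListHandler()
    logger = logging.getLogger("queue_demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(handler)

    msg_queue = queue.Queue()
    thread = threading.Thread(target=drain, args=(msg_queue, logger),
                              name="LoggerThread__" + logger.name)
    thread.start()

    # Producers (the "subloggers") only ever touch the queue.
    msg_queue.put((logging.INFO, "first"))
    msg_queue.put((logging.INFO, "second"))

    msg_queue.put(STOP)  # ask the thread to finish
    thread.join()        # wait until it has fully exited

    logger.removeHandler(handler)
    return records, thread.is_alive()


if __name__ == "__main__":
    print(run_demo())
```

because the queue is first-in, first-out, everything enqueued before the sentinel is logged before the thread exits.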

Contributor

@dshemetov left a comment

One doc suggestion, but otherwise lgtm!

_delphi_utils_python/delphi_utils/logger.py (resolved)
@melange396 merged commit 70e0263 into main on Aug 14, 2023
13 checks passed
@melange396 deleted the sync_loggers branch on August 14, 2023 14:46