Skip to content

Commit

Permalink
Fixup logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
DinoBektesevic committed Aug 30, 2024
1 parent 1a834a5 commit 9e373b6
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 100 deletions.
31 changes: 15 additions & 16 deletions src/kbmod_wf/single_chip_workflow.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import argparse
import os

import toml
import parsl
from parsl import python_app, File
import parsl.executors

from kbmod_wf.utilities.configuration_utilities import apply_runtime_updates, get_resource_config
from kbmod_wf.utilities.executor_utilities import get_executors
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.utilities import (
apply_runtime_updates,
get_resource_config,
get_executors,
get_configured_logger,
)

from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu, kbmod_search

Expand All @@ -21,26 +25,21 @@
ignore_for_cache=["logging_file"],
)
def reproject_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None):
import traceback
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.task_impls.reproject_single_chip_single_night_wu import reproject_wu
from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger

logger = configure_logger("task.reproject_wu", logging_file.filepath)
logger = get_configured_logger("task.reproject_wu", logging_file.filepath)

from kbmod_wf.task_impls.reproject_single_chip_single_night_wu import reproject_wu

logger.info("Starting reproject_ic")
try:
with ErrorLogger(logger):
reproject_wu(
original_wu_filepath=inputs[0].filepath,
reprojected_wu_filepath=outputs[0].filepath,
runtime_config=runtime_config,
logger=logger,
)
except Exception as e:
logger.error(f"Error running reproject_ic: {e}")
logger.error(traceback.format_exc())
raise e
logger.warning("Completed reproject_ic")

logger.info("Completed reproject_ic")
return outputs[0]


Expand All @@ -62,8 +61,8 @@ def workflow_runner(env=None, runtime_config={}):

dfk = parsl.load(resource_config)
if dfk:
logging_file = File(os.path.join(dfk.run_dir, "parsl.log"))
logger = configure_logger("workflow.workflow_runner", logging_file.filepath)
logging_file = File(os.path.join(dfk.run_dir, "kbmod.log"))
logger = get_configured_logger("workflow.workflow_runner", logging_file.filepath)

if runtime_config is not None:
logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}")
Expand Down
1 change: 0 additions & 1 deletion src/kbmod_wf/task_impls/ic_to_wu.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ def __init__(
self.wu_filepath = wu_filepath
self.runtime_config = runtime_config
self.logger = logger
kbmod._logging.basicConfig(level=self.logger.level)

self.overwrite = self.runtime_config.get("overwrite", False)
self.search_config_filepath = self.runtime_config.get("search_config_filepath", None)
Expand Down
1 change: 0 additions & 1 deletion src/kbmod_wf/task_impls/kbmod_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def __init__(
self.runtime_config = runtime_config
self.result_filepath = result_filepath
self.logger = logger
kbmod._logging.basicConfig(level=self.logger.level)

self.search_config_filepath = self.runtime_config.get("search_config_filepath", None)
self.results_directory = os.path.dirname(self.result_filepath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def __init__(
self.reprojected_wu_filepath = reprojected_wu_filepath
self.runtime_config = runtime_config
self.logger = logger
kbmod._logging.basicConfig(level=self.logger.level)

# Default to 8 workers if not in the config. Value must be 0<num workers<65.
self.n_workers = max(1, min(self.runtime_config.get("n_workers", 8), 64))
Expand All @@ -68,16 +67,14 @@ def reproject_workunit(self):
directory_containing_shards, wu_filename = os.path.split(self.original_wu_filepath)
wu = WorkUnit.from_sharded_fits(wu_filename, directory_containing_shards, lazy=True)
elapsed = round(time.time() - last_time, 1)
self.logger.debug(
f"Required {elapsed}[s] to lazy read original WorkUnit {self.original_wu_filepath}."
)
self.logger.info(f"Required {elapsed}[s] to lazy read original WorkUnit {self.original_wu_filepath}.")

directory_containing_reprojected_shards, reprojected_wu_filename = os.path.split(
self.reprojected_wu_filepath
)

# Reproject to a common WCS using the WCS for our patch
self.logger.debug(f"Reprojecting WorkUnit with {self.n_workers} workers...")
self.logger.info(f"Reprojecting WorkUnit with {self.n_workers} workers...")
last_time = time.time()

opt_wcs, shape = find_optimal_celestial_wcs(list(wu._per_image_wcs))
Expand All @@ -92,6 +89,6 @@ def reproject_workunit(self):
)

elapsed = round(time.time() - last_time, 1)
self.logger.debug(f"Required {elapsed}[s] to create the sharded reprojected WorkUnit.")
self.logger.info(f"Required {elapsed}[s] to create the sharded reprojected WorkUnit.")

return self.reprojected_wu_filepath
4 changes: 1 addition & 3 deletions src/kbmod_wf/utilities/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from .configuration_utilities import get_resource_config, apply_runtime_updates
from .executor_utilities import get_executors
from .logger_utilities import configure_logger
from .logger_utilities import *
from .memoization_utilities import id_for_memo_file

__all__ = ["apply_runtime_updates", "get_resource_config", "get_executors", "configure_logger"]
106 changes: 86 additions & 20 deletions src/kbmod_wf/utilities/logger_utilities.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,93 @@
DEFAULT_FORMAT = (
"%(created)f %(asctime)s %(processName)s-%(process)d "
"%(threadName)s-%(thread)d %(name)s:%(lineno)d %(funcName)s %(levelname)s: "
"%(message)s"
)
import traceback
import logging
from logging import config

__all__ = ["LOGGING_CONFIG", "get_configured_logger", "ErrorLogger"]

def configure_logger(name, file_path):

LOGGING_CONFIG = {
"version": 1.0,
"formatters": {
"standard": {
"format": (
"[%(processName)s-%(process)d %(threadName)s-%(thread)d "
"%(asctime)s %(levelname)s %(name)s] %(message)s"
),
},
},
"handlers": {
"stdout": {
"level": "INFO",
"formatter": "standard",
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
},
"stderr": {
"level": "INFO",
"formatter": "standard",
"class": "logging.StreamHandler",
"stream": "ext://sys.stderr",
},
"file": {
"level": "INFO",
"formatter": "standard",
"class": "logging.FileHandler",
"filename": "parsl.log",
},
},
"loggers": {
"task": {"level": "INFO", "handlers": ["file", "stdout"], "propagate": False},
"task.create_manifest": {},
"task.ic_to_wu": {},
"task.reproject_wu": {},
"task.kbmod_search": {},
"kbmod": {"level": "INFO", "handlers": ["file", "stdout"], "propagate": False},
},
}
"""Default logging configuration for Parsl."""


def get_configured_logger(logger_name, file_path=None):
"""Configure logging to output to the given file.
Parameters
----------
logger_name : `str`
Name of the created logger instance.
file_path : `str` or `None`, optional
Path to the log file, if any
"""
Simple function that will create a logger object and configure it to write
to a file at the specified path.
Note: We import logging within the function because we expect this to be
called within a parsl app."""
logconf = LOGGING_CONFIG.copy()
if file_path is not None:
logconf["handlers"]["file"]["filename"] = file_path
config.dictConfig(logconf)
logger = logging.getLogger()

import logging
return logging.getLogger(logger_name)


class ErrorLogger:
"""Logs received errors before re-raising them.
Parameters
----------
logger : `logging.Logger`
Logger instance that will be used to log the error.
silence_errors : `bool`, optional
Errors are not silenced by default but re-raised.
Set this to `True` to silence errors.
"""

if name in logging.Logger.manager.loggerDict:
return logging.Logger.manager.loggerDict[name]
def __init__(self, logger, silence_errors=False):
self.logger = logger
self.silence_errors = silence_errors

logger = logging.getLogger(name)
handler = logging.FileHandler(file_path)
formatter = logging.Formatter(DEFAULT_FORMAT, datefmt="%Y-%m-%d %H:%M:%S")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
def __enter__(self):
return self

return logger
def __exit__(self, exc, value, tb):
if exc is not None:
msg = traceback.format_exception(exc, value, tb)
msg = "".join(msg)
self.logger.error(msg)
return self.silence_errors
5 changes: 3 additions & 2 deletions src/kbmod_wf/workflow.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import argparse
import os
import toml

import parsl
from parsl import File
import parsl.executors

from kbmod_wf.utilities.configuration_utilities import apply_runtime_updates, get_resource_config
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.utilities.logger_utilities import get_configured_logger

from kbmod_wf.workflow_tasks import create_manifest, ic_to_wu, kbmod_search, reproject_wu, uri_to_ic

Expand All @@ -30,7 +31,7 @@ def workflow_runner(env=None, runtime_config={}):
dfk = parsl.load(resource_config)
if dfk:
logging_file = File(os.path.join(dfk.run_dir, "parsl.log"))
logger = configure_logger("workflow.workflow_runner", logging_file.filepath)
logger = get_configured_logger(logging_file.filepath)

if runtime_config is not None:
logger.info(f"Using runtime configuration definition:\n{toml.dumps(runtime_config)}")
Expand Down
2 changes: 0 additions & 2 deletions src/kbmod_wf/workflow_tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,3 @@
from .kbmod_search import kbmod_search
from .reproject_wu import reproject_wu
from .uri_to_ic import uri_to_ic

__all__ = ["create_manifest", "ic_to_wu", "kbmod_search", "reproject_wu", "uri_to_ic"]
6 changes: 3 additions & 3 deletions src/kbmod_wf/workflow_tasks/create_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ def create_manifest(inputs=(), outputs=(), runtime_config={}, logging_file=None)
ValueError
If the staging_directory is not provided in the runtime_config.
"""

import glob
import os
import shutil
from kbmod_wf.utilities.logger_utilities import configure_logger

logger = configure_logger("task.create_manifest", logging_file.filepath)
from kbmod_wf.utilities.logger_utilities import get_configured_logger

logger = get_configured_logger("task.create_manifest", logging_file.filepath)

directory_path = runtime_config.get("staging_directory")
output_path = runtime_config.get("output_directory")
Expand Down
22 changes: 6 additions & 16 deletions src/kbmod_wf/workflow_tasks/ic_to_wu.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,21 @@ def ic_to_wu(inputs=(), outputs=(), runtime_config={}, logging_file=None):
-------
parsl.File
The file object that points to the WorkUnit file that was created.
Raises
------
e
Reraises any exceptions that occur during the execution of the ic_to_wu
function.
"""
import traceback
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.task_impls.ic_to_wu import ic_to_wu
from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger

logger = configure_logger("task.ic_to_wu", logging_file.filepath)
logger = get_configured_logger("task.ic_to_wu", logging_file)

from kbmod_wf.task_impls.ic_to_wu import ic_to_wu

logger.info("Starting ic_to_wu")
try:
with ErrorLogger(logger):
ic_to_wu(
ic_filepath=inputs[0].filepath,
wu_filepath=outputs[0].filepath,
runtime_config=runtime_config,
logger=logger,
)
except Exception as e:
logger.error(f"Error running ic_to_wu: {e}")
logger.error(traceback.format_exc())
raise e
logger.warning("Completed ic_to_wu")
logger.info("Completed ic_to_wu")

return outputs[0]
30 changes: 10 additions & 20 deletions src/kbmod_wf/workflow_tasks/kbmod_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,36 @@ def kbmod_search(inputs=(), outputs=(), runtime_config={}, logging_file=None):
Parameters
----------
inputs : tuple, optional
inputs : `tuple`, optional
A tuple with a single parsl.File object that references the WorkUnit file
to be searched, by default ()
outputs : tuple, optional
outputs : `tuple`, optional
A tuple with a single parsl.File object that references the results output
file, by default ()
runtime_config : dict, optional
runtime_config : `dict`, optional
A dictionary of configuration setting specific to this task, by default {}
logging_file : parsl.File, optional
The parsl.File object the defines where the logs are written, by default None
Returns
-------
parsl.File
output : `parsl.File`
The file object that points to the search results file that was created.
Raises
------
e
Reraises any exceptions that occur during the execution of the kbmod_search
function.
"""
import traceback
from kbmod_wf.utilities.logger_utilities import configure_logger
from kbmod_wf.task_impls.kbmod_search import kbmod_search
from kbmod_wf.utilities.logger_utilities import get_configured_logger, ErrorLogger

logger = configure_logger("task.kbmod_search", logging_file.filepath)
logger = get_configured_logger("task.kbmod_search", logging_file)

from kbmod_wf.task_impls.kbmod_search import kbmod_search

logger.info("Starting kbmod_search")
try:
with ErrorLogger(logger):
kbmod_search(
wu_filepath=inputs[0].filepath,
result_filepath=outputs[0].filepath,
runtime_config=runtime_config,
logger=logger,
)
except Exception as e:
logger.error(f"Error running kbmod_search: {e}")
logger.error(traceback.format_exc())
raise e
logger.warning("Completed kbmod_search")
logger.info("Completed kbmod_search")

return outputs[0]
Loading

0 comments on commit 9e373b6

Please sign in to comment.