diff --git a/PILOTVERSION b/PILOTVERSION index 66566d18..2df33a99 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.7.6.8 \ No newline at end of file +3.7.7.3 \ No newline at end of file diff --git a/pilot.py b/pilot.py index 20a3c47d..3bb549ac 100755 --- a/pilot.py +++ b/pilot.py @@ -54,6 +54,7 @@ PILOT_MULTIJOB_START_TIME, ) from pilot.util.cvmfs import ( + cvmfs_diagnostics, is_cvmfs_available, get_last_update ) @@ -123,6 +124,7 @@ def main() -> int: # check cvmfs if available ec = check_cvmfs(logger) if ec: + cvmfs_diagnostics() return ec if not args.rucio_host: diff --git a/pilot/eventservice/esprocess/esmanager.py b/pilot/eventservice/esprocess/esmanager.py index 0c3b868c..53cf4a28 100644 --- a/pilot/eventservice/esprocess/esmanager.py +++ b/pilot/eventservice/esprocess/esmanager.py @@ -18,7 +18,7 @@ # # Authors: # - Wen Guan, wen.guan@cern.ch, 2017 -# - Paul Nilsson, paul.nilsson@cern.ch, 2023 +# - Paul Nilsson, paul.nilsson@cern.ch, 2023-24 """Event Service manager to set up and run ESProcess.""" diff --git a/pilot/user/atlas/cvmfs.py b/pilot/user/atlas/cvmfs.py index 03568b11..9e6e79d2 100644 --- a/pilot/user/atlas/cvmfs.py +++ b/pilot/user/atlas/cvmfs.py @@ -51,3 +51,15 @@ def get_last_update_file() -> str: :return: last update file (str). """ return f'{get_cvmfs_base_path()}/sft.cern.ch/lcg/lastUpdate' + + +def get_cvmfs_diagnostics_commands() -> list: + """ + Return a list of commands to be used for CVMFS diagnostics. + + :return: list of commands (list). 
+ """ + return [ + 'cvmfs_config stat atlas.cern.ch', + f'attr -g revision {get_cvmfs_base_path()}/atlas.cern.ch' + ] diff --git a/pilot/user/sphenix/setup.py b/pilot/user/sphenix/setup.py index 9099f600..1a5ebb9c 100644 --- a/pilot/user/sphenix/setup.py +++ b/pilot/user/sphenix/setup.py @@ -194,7 +194,7 @@ def get_valid_base_urls(order: str = "") -> list: """ valid_base_urls = [] _valid_base_urls = ["https://storage.googleapis.com/drp-us-central1-containers", - "http://pandaserver-doma.cern.ch:25080/trf/user"] + "https://pandaserver-doma.cern.ch/trf/user"] if order: valid_base_urls.append(order) diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 528081f7..f73f723d 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -27,8 +27,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '7' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '6' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '8' # build number should be reset to '1' for every new development cycle +REVISION = '7' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '3' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/cvmfs.py b/pilot/util/cvmfs.py index b2075d42..9358a710 100644 --- a/pilot/util/cvmfs.py +++ b/pilot/util/cvmfs.py @@ -27,6 +27,8 @@ import time import types +from pilot.util.container import execute + logger = logging.getLogger(__name__) @@ -149,3 +151,26 @@ def extract_timestamp(filename: str) -> int: signal.alarm(0) # Disable the alarm return timestamp + + +def cvmfs_diagnostics(): + """Run cvmfs_diagnostics.""" + pilot_user = os.environ.get('PILOT_USER', 'generic').lower() + user = __import__(f'pilot.user.{pilot_user}.cvmfs', globals(), locals(), [pilot_user], 0) + try: + cmds = 
user.get_cvmfs_diagnostics_commands() + except AttributeError: + logger.warning('get_cvmfs_diagnostics_commands not defined in user cvmfs module') + return + + if cmds: + for cmd in cmds: + timeout = 60 + logger.info(f'running cvmfs diagnostics command using timeout={timeout}s') + exit_code, stdout, stderr = execute(cmd, timeout=timeout) + if exit_code == 0: + logger.info(f'cvmfs diagnostics completed successfully:\n{stdout}') + else: + logger.warning(f'cvmfs diagnostics failed: {stderr}') + else: + logger.warning('cvmfs diagnostics commands not defined in user cvmfs module') diff --git a/pilot/util/realtimelogger.py b/pilot/util/realtimelogger.py index 05835d95..d7514545 100644 --- a/pilot/util/realtimelogger.py +++ b/pilot/util/realtimelogger.py @@ -18,17 +18,20 @@ # # Authors: # - Shuwei Ye, yesw@bnl.gov, 2021 -# - Paul Nilsson, paul.nilsson@cern.ch, 2021-23 +# - Paul Nilsson, paul.nilsson@cern.ch, 2021-24 +# - Wen Guan, wen.guan@cern.ch, 2024 + +"""Real-time logger.""" -import os -import time import json -from pilot.util.config import config -from pilot.util.https import cacert -# from pilot.util.proxy import create_cert_files -from pilot.util.transport import HttpTransport -from logging import Logger, INFO import logging +import os +import time + +try: + from fluent import handler as fluent_handler +except ImportError: + pass try: import google.cloud.logging @@ -36,31 +39,46 @@ except ImportError: pass +try: + from logstash_async.handler import AsynchronousLogstashHandler +except ImportError: + pass + +try: + from loki_logger_handler.loki_logger_handler import LokiLoggerHandler +except ImportError: + pass + +from typing import Any + +from pilot.util.config import config +from pilot.util.https import cacert +# from pilot.util.proxy import create_cert_files +from pilot.util.transport import HttpTransport + logger = logging.getLogger(__name__) -def get_realtime_logger(args=None, info_dic=None, workdir=None, secrets=""): +def get_realtime_logger(args: Any = None, 
info_dic: dict = None, workdir: str = None, secrets: str = ""): """ Helper function for real-time logger. The info_dic dictionary has the format: {'logging_type': .., 'protocol': .., 'url': .., 'port': .., 'logname': ..} - :param args: pilot arguments object. - :param info_dic: info dictionary. - :param workdir: job working directory (string). + :param args: pilot arguments object (Any) + :param info_dic: info dictionary (dict) + :param workdir: job working directory (str) + :param secrets: secrets (str) :return: RealTimeLogger instance (self). """ - if RealTimeLogger.glogger is None: RealTimeLogger(args, info_dic, workdir, secrets) + return RealTimeLogger.glogger def cleanup(): - """ - Clean-up function for external use. - """ - + """Clean-up function for external use.""" logger.debug('attempting real-time logger cleanup') if RealTimeLogger.glogger: RealTimeLogger.glogger.cleanup() @@ -68,7 +86,7 @@ def cleanup(): # RealTimeLogger is called if args.realtimelogger is on -class RealTimeLogger(Logger): +class RealTimeLogger(logging.Logger): """ RealTimeLogger class definition. """ @@ -81,7 +99,7 @@ class RealTimeLogger(Logger): _cacert = "" current_handler = None # needed for removing logger object from outside function - def __init__(self, args, info_dic, workdir, secrets, level=INFO): + def __init__(self, args: Any, info_dic: dict, workdir: str, secrets: str, level: Any = logging.INFO): """ Default init function. @@ -92,16 +110,16 @@ def __init__(self, args, info_dic, workdir, secrets, level=INFO): 'logname': .., 'logfiles': [..]} - :param args: pilot arguments object. - :param info_dic: info dictionary. - :param workdir: job working directory (string). - :param level: logging level (constant). - :return: + :param args: pilot arguments object (Any) + :param info_dic: info dictionary (dict) + :param workdir: job working directory (str) + :param level: logging level (Any). 
""" - - super(RealTimeLogger, self).__init__(name="realTimeLogger", level=level) + super().__init__(name="realTimeLogger", level=level) RealTimeLogger.glogger = self + if workdir: # bypass pylint warning - keep workdir for possible future development + pass if not info_dic: logger.warning('info dictionary not set - add \'logging=type:protocol://host:port\' to PQ.catchall)') RealTimeLogger.glogger = None @@ -125,66 +143,61 @@ def __init__(self, args, info_dic, workdir, secrets, level=INFO): _handler = None - try: - if logtype == "google-cloud-logging": - client = google.cloud.logging.Client() - _handler = CloudLoggingHandler(client, name=name) - api_logger = logging.getLogger('google.cloud.logging_v2') - api_logger.setLevel(INFO) - elif logtype == "fluent": - from fluent import handler - _handler = handler.FluentHandler(name, host=server, port=port) - elif logtype == "logstash": - # from logstash_async.transport import HttpTransport - from logstash_async.handler import AsynchronousLogstashHandler - # from logstash_async.handler import LogstashFormatter - - # certificate method (still in development): - - #certdir = os.environ.get('SSL_CERT_DIR', '') - #path = os.path.join(certdir, "CERN-GridCA.pem") - #crt, key = create_cert_files(workdir) - #if not crt or not key: - # logger.warning('failed to create crt/key') - # _handler = None - # return - #transport = HttpTransport( - # server, - # port, - # timeout=5.0, - # ssl_enable=True, - # ssl_verify=path, - # cert=(crt, key) - #) - - # login+password method: - if isinstance(secrets, str): - secrets = json.loads(secrets) - - ssl_enable, ssl_verify = self.get_rtlogging_ssl() - transport = HttpTransport( - server, - port, - ssl_enable=ssl_enable, - ssl_verify=ssl_verify, - timeout=5.0, - username=secrets.get('logstash_login', 'unknown_login'), - password=secrets.get('logstash_password', 'unknown_password') - ) - - # create the handler - _handler = AsynchronousLogstashHandler( - server, - port, - transport=transport, - 
database_path='logstash_test.db' - ) - - else: - logger.warning(f'unknown logtype: {logtype}') - _handler = None - except (ModuleNotFoundError, ImportError) as exc: - logger.warning(f'exception caught while setting up log handlers: {exc}') + if logtype == "google-cloud-logging": + client = google.cloud.logging.Client() + _handler = CloudLoggingHandler(client, name=name) + api_logger = logging.getLogger('google.cloud.logging_v2') + api_logger.setLevel(logging.INFO) + elif logtype == "fluent": + _handler = fluent_handler.FluentHandler(name, host=server, port=port) + elif logtype == "logstash": + # from logstash_async.transport import HttpTransport + # from logstash_async.handler import LogstashFormatter + + # certificate method (still in development): + + #certdir = os.environ.get('SSL_CERT_DIR', '') + #path = os.path.join(certdir, "CERN-GridCA.pem") + #crt, key = create_cert_files(workdir) + #if not crt or not key: + # logger.warning('failed to create crt/key') + # _handler = None + # return + #transport = HttpTransport( + # server, + # port, + # timeout=5.0, + # ssl_enable=True, + # ssl_verify=path, + # cert=(crt, key) + #) + + # login+password method: + if isinstance(secrets, str): + secrets = json.loads(secrets) + + ssl_enable, ssl_verify = self.get_rtlogging_ssl() + transport = HttpTransport( + server, + port, + ssl_enable=ssl_enable, + ssl_verify=ssl_verify, + timeout=5.0, + username=secrets.get('logstash_login', 'unknown_login'), + password=secrets.get('logstash_password', 'unknown_password') + ) + + # create the handler + _handler = AsynchronousLogstashHandler( + server, + port, + transport=transport, + database_path='logstash_test.db' + ) + elif logtype == 'loki': + _handler = self.setup_loki_handler() + else: + logger.warning(f'unknown logtype: {logtype}') _handler = None if _handler is not None: @@ -194,11 +207,42 @@ def __init__(self, args, info_dic, workdir, secrets, level=INFO): RealTimeLogger.glogger = None del self - def cleanup(self): - """ - 
Clean-up. - """ + def setup_loki_handler(self): + """Setup the Loki logger handler.""" + loki_labels = {'application': 'PanDA_Pilot', 'environment': 'Production'} + try: + labels = os.environ.get('LOKI_LABELS', {}) + if labels: + labels = json.loads(labels) + loki_labels = labels + except Exception as ex: + logger.warning(f'failed to load LOKI_LABELS from environment: {ex}') + + loki_labelkeys = ['application', 'environment'] + try: + labelkeys = os.environ.get('LOKI_LABELKEYS', {}) + if labelkeys: + labelkeys = json.loads(labelkeys) + loki_labelkeys = labelkeys + except Exception as ex: + logger.warning(f'failed to load LOKI_LABELKEYS from environment: {ex}') + + try: + loki_period = int(os.environ.get('LOKI_PERIOD', 30)) + except Exception as ex: + logger.warning(f'failed to load LOKI_PERIOD from environment: {ex}') + loki_period = 30 + + _handler = LokiLoggerHandler( + url=os.environ["LOKI_URL"], + labels=loki_labels, + labelKeys=loki_labelkeys, + timeout=loki_period + ) + return _handler + def cleanup(self): + """Clean-up.""" # close open files, if anything is still open self.close_files() @@ -211,7 +255,12 @@ def cleanup(self): RealTimeLogger.glogger = None del self - def set_jobinfo(self, job): + def set_jobinfo(self, job: Any): + """ + Set job info. + + :param job: job object (Any). + """ self.jobinfo = {"TaskID": job.taskid, "PandaJobID": job.jobid} if 'HARVESTER_WORKER_ID' in os.environ: self.jobinfo["Harvester_WorkerID"] = os.environ.get('HARVESTER_WORKER_ID') @@ -234,7 +283,13 @@ def send_with_jobinfo(self, msg): self.info(logobj) - def add_logfiles(self, job_or_filenames, reset=True): + def add_logfiles(self, job_or_filenames: Any or list, reset: bool = True): + """ + Add log files. + + :param job_or_filenames: job object or list of log file names (Any or list) + :param reset: reset the log files (bool). 
+ """ self.close_files() if reset: self.logfiles = [] @@ -256,6 +311,7 @@ def add_logfiles(self, job_or_filenames, reset=True): logger.info(f'added log files: {self.logfiles}') def close_files(self): + """Close files.""" for openfile in self.openfiles.values(): if openfile is not None: openfile.close() @@ -263,13 +319,20 @@ def close_files(self): self.logfiles = [] def send_loginfiles(self): + """Send log files.""" for openfile in self.openfiles.values(): if openfile is not None: lines = openfile.readlines() for line in lines: self.send_with_jobinfo(line.strip()) - def sending_logs(self, args, job): + def sending_logs(self, args: Any, job: Any): + """ + Send logs. + + :param args: pilot arguments object (Any) + :param job: job object (Any). + """ logger.info('starting RealTimeLogger.sending_logs') self.set_jobinfo(job) self.add_logfiles(job) @@ -281,21 +344,21 @@ def sending_logs(self, args, job): if i % 10 == 1: logger.debug(f'RealTimeLogger iteration #{i} (job state={job.state}, logfiles={self.logfiles})') # there might be special cases when RT logs should be sent, e.g. 
for pilot logs - if job.state == '' or job.state == 'starting' or job.state == 'running': + if job.state in {'', 'starting', 'running'}: if len(self.logfiles) > len(self.openfiles): for logfile in self.logfiles: if logfile not in self.openfiles: if os.path.exists(logfile): - openfile = open(logfile) - openfile.seek(0) - self.openfiles[logfile] = openfile - logger.debug(f'opened logfile: {logfile}') + openfile = open(logfile, encoding='utf-8') + if openfile: + openfile.seek(0) + self.openfiles[logfile] = openfile + logger.debug(f'opened logfile: {logfile}') # logger.debug(f'real-time logging: sending logs for state={job.state} [1]') self.send_loginfiles() - elif job.state == 'stagein' or job.state == 'stageout': + elif job.state in {'stagein', 'stageout'}: logger.debug('no real-time logging during stage-in/out') - pass else: # run longer for pilotlog # wait for job.completed=True, for a maximum of N minutes @@ -325,7 +388,7 @@ def get_rtlogging_ssl(self): pilot_user = os.environ.get('PILOT_USER', 'generic').lower() try: - user = __import__('pilot.user.%s.common' % pilot_user, globals(), locals(), [pilot_user], 0) + user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0) ssl_enable, ssl_verify = user.get_rtlogging_ssl() except Exception: ssl_enable = config.Pilot.ssl_enable