diff --git a/docs/source/DeveloperGuide/AddingNewComponents/Utilities/gLogger/gLogger/Advanced/index.rst b/docs/source/DeveloperGuide/AddingNewComponents/Utilities/gLogger/gLogger/Advanced/index.rst index 538ec77cb68..c754041aff8 100644 --- a/docs/source/DeveloperGuide/AddingNewComponents/Utilities/gLogger/gLogger/Advanced/index.rst +++ b/docs/source/DeveloperGuide/AddingNewComponents/Utilities/gLogger/gLogger/Advanced/index.rst @@ -217,6 +217,68 @@ This option can not be modified in the children of *gLogger*, even by *gLogger* itself after the configuration, so the children receive the *gLogger* configuration. +Add variables to different *Logging* objects depending on the context +--------------------------------------------------------------------- + +In complex cases, it can be useful to have loggers that change depending on +the execution context, without having to pass logger instances explicitly +through multiple layers of function calls. + +Python's `contextvars` module provides context-local storage, which can be used +to store and retrieve context-specific data, such as logger instances. + +gLogger supports the use of context variables to manage loggers in a flexible way. + +Provide a Context Logger +~~~~~~~~~~~~~~~~~~~~~~~~ + +When you have a *Logging* instance that you want to use in a specific context, +you can set it in the context variable: + +:: + + # Create a logger instance + logger = gLogger.getSubLogger("MyContextLogger") + + # Set it in the context variable + contextLogger.set(logger) + +Then, the instances within the context block will use the shared *Logging* object +set in the context variable: + +:: + + with setContextLogger(contextualLogger): + # Any logging within this block will use contextualLogger + obj = MyClass() + obj.do_something() # This will use contextualLogger + +Consume a Context Logger +~~~~~~~~~~~~~~~~~~~~~~~~ + +In functions or classes that need to log messages, you can retrieve the logger +from the context variable: + +:: + + class MyClass: + def __init__(self): + # Get the default logger if no context logger is set + self._defaultLogger = gLogger.getSubLogger("MyClass") + + @property + def log(self): + # Return the context logger if set, otherwise the default logger + return contextLogger.get() or self._defaultLogger + + @log.setter + def log(self, value): + # Optionally, allow setting a new default logger + self._defaultLogger = value + + def do_something(self): + self.log.notice("Doing something") + Some examples and summaries --------------------------- diff --git a/src/DIRAC/FrameworkSystem/Client/Logger.py b/src/DIRAC/FrameworkSystem/Client/Logger.py index 864883ac866..7b48d9b49ee 100755 --- a/src/DIRAC/FrameworkSystem/Client/Logger.py +++ b/src/DIRAC/FrameworkSystem/Client/Logger.py @@ -1,3 +1,4 @@ +from DIRAC.FrameworkSystem.private.standardLogging.LoggingContext import contextLogger, setContextLogger from DIRAC.FrameworkSystem.private.standardLogging.LoggingRoot import LoggingRoot gLogger = LoggingRoot() @@ -5,3 +6,6 @@ def getLogger(): return gLogger + + +__all__ = ["contextLogger", "setContextLogger", "getLogger"] diff --git a/src/DIRAC/FrameworkSystem/private/standardLogging/LoggingContext.py b/src/DIRAC/FrameworkSystem/private/standardLogging/LoggingContext.py new file mode 100644 index 00000000000..1e16ffdb3df --- /dev/null +++ b/src/DIRAC/FrameworkSystem/private/standardLogging/LoggingContext.py @@ -0,0 +1,16 @@ +""" Logging context module""" + +# Context variable for the logger (adapted to the request of the pilot reference) +import contextvars +from contextlib import contextmanager + +contextLogger = contextvars.ContextVar("Logger", default=None) + + +@contextmanager +def setContextLogger(logger_name): + token = contextLogger.set(logger_name) + try: + yield + finally: + contextLogger.reset(token) diff --git a/src/DIRAC/FrameworkSystem/private/standardLogging/test/Test_Logging_ContextVars.py b/src/DIRAC/FrameworkSystem/private/standardLogging/test/Test_Logging_ContextVars.py new file mode 100644 index 00000000000..c6dd5811e66 --- /dev/null +++ b/src/DIRAC/FrameworkSystem/private/standardLogging/test/Test_Logging_ContextVars.py @@ -0,0 +1,71 @@ +""" Test the context variable logger """ + +from DIRAC import gLogger +from DIRAC.FrameworkSystem.private.standardLogging.Logging import Logging +from DIRAC.FrameworkSystem.private.standardLogging.test.TestLogUtilities import gLoggerReset +from DIRAC.FrameworkSystem.private.standardLogging.LoggingContext import contextLogger, setContextLogger + + +class A: + def __init__(self): + # Get the logger from the context variable + self._defaultLogger = gLogger.getSubLogger("A") + + # Use a property to get and set the logger, this is necessary to use the context variable + @property + def log(self): + return contextLogger.get() or self._defaultLogger + + @log.setter + def log(self, value: Logging): + self._defaultLogger = value + + def do_something(self): + self.log.notice("A is doing something") + + +class B: + def __init__(self, a: A, pilotRef: str = None): + self.a = A() + + # Get the logger from the context variable + if pilotRef: + self.log = gLogger.getLocalSubLogger(f"[{pilotRef}]B") + contextLogger.set(self.log) + else: + self.log = gLogger.getSubLogger("B") + + def do_something_else(self): + with setContextLogger(self.log): + self.a.do_something() + self.log.notice("B is doing something else") + + +def test_contextvar_logger(): + capturedBackend, log, sublog = gLoggerReset() + + # Create an instance of A + a = A() + + # Create an instance of B and call its method without setting the pilotRef + # Log signature coming from A and B should be different + b1 = B(a) + b1.do_something_else() + assert "Framework/B NOTICE: A is doing something" in capturedBackend.getvalue() + assert "Framework/B NOTICE: B is doing something else" in capturedBackend.getvalue() + + # Create an instance of B and call its method with setting the pilotRef + # Log signature coming from A and B should be similar because of the pilotRef + capturedBackend.truncate(0) + + b2 = B(a, "pilotRef") + b2.do_something_else() + assert "Framework/[pilotRef]B NOTICE: A is doing something" in capturedBackend.getvalue() + assert "Framework/[pilotRef]B NOTICE: B is doing something else" in capturedBackend.getvalue() + + # Now we check that the logger of b1 is not the same as the logger of b2 (b1 should still use its own logger) + capturedBackend.truncate(0) + + b1.do_something_else() + assert "Framework/B NOTICE: A is doing something" in capturedBackend.getvalue() + assert "Framework/B NOTICE: B is doing something else" in capturedBackend.getvalue() diff --git a/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py b/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py index 33fd28b0e95..0f2443f66e8 100644 --- a/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py +++ b/src/DIRAC/WorkloadManagementSystem/Client/Matcher.py @@ -4,20 +4,19 @@ """ import time -from DIRAC import gLogger, convertToPy3VersionNumber - -from DIRAC.Core.Utilities.PrettyPrint import printDict -from DIRAC.Core.Security import Properties +from DIRAC import convertToPy3VersionNumber, gLogger from DIRAC.ConfigurationSystem.Client.Helpers import Registry from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations -from DIRAC.WorkloadManagementSystem.Client import JobStatus +from DIRAC.Core.Security import Properties +from DIRAC.Core.Utilities.PrettyPrint import printDict +from DIRAC.FrameworkSystem.Client.Logger import setContextLogger +from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus +from DIRAC.WorkloadManagementSystem.Client import JobStatus, PilotStatus from DIRAC.WorkloadManagementSystem.Client.Limiter import Limiter -from DIRAC.WorkloadManagementSystem.Client import PilotStatus -from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, singleValueDefFields, multiValueMatchFields -from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB -from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus +from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB +from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB, multiValueMatchFields, singleValueDefFields class PilotVersionError(Exception): @@ -52,11 +51,7 @@ def __init__(self, pilotAgentsDB=None, jobDB=None, tqDB=None, jlDB=None, opsHelp self.opsHelper = Operations() if pilotRef: - self.log = gLogger.getSubLogger(f"[{pilotRef}]Matcher") - self.pilotAgentsDB.log = gLogger.getSubLogger(f"[{pilotRef}]Matcher") - self.jobDB.log = gLogger.getSubLogger(f"[{pilotRef}]Matcher") - self.tqDB.log = gLogger.getSubLogger(f"[{pilotRef}]Matcher") - self.jlDB.log = gLogger.getSubLogger(f"[{pilotRef}]Matcher") + self.log = gLogger.getLocalSubLogger(f"[{pilotRef}]Matcher") else: self.log = gLogger.getSubLogger("Matcher") @@ -66,86 +61,86 @@ def __init__(self, pilotAgentsDB=None, jobDB=None, tqDB=None, jlDB=None, opsHelp def selectJob(self, resourceDescription, credDict): """Main job selection function to find the highest priority job matching the resource capacity""" + with setContextLogger(self.log): + startTime = time.time() + + resourceDict = self._getResourceDict(resourceDescription, credDict) + + # Make a nice print of the resource matching parameters + toPrintDict = dict(resourceDict) + if "MaxRAM" in resourceDescription: + toPrintDict["MaxRAM"] = resourceDescription["MaxRAM"] + if "NumberOfProcessors" in resourceDescription: + toPrintDict["NumberOfProcessors"] = resourceDescription["NumberOfProcessors"] + toPrintDict["Tag"] = [] + if "Tag" in resourceDict: + for tag in resourceDict["Tag"]: + if not tag.endswith("GB") and not tag.endswith("Processors"): + toPrintDict["Tag"].append(tag) + if not toPrintDict["Tag"]: + toPrintDict.pop("Tag") + self.log.info("Resource description for matching", printDict(toPrintDict)) + + negativeCond = self.limiter.getNegativeCondForSite(resourceDict["Site"], resourceDict.get("GridCE")) + result = self.tqDB.matchAndGetJob(resourceDict, negativeCond=negativeCond) - startTime = time.time() - - resourceDict = self._getResourceDict(resourceDescription, credDict) - - # Make a nice print of the resource matching parameters - toPrintDict = dict(resourceDict) - if "MaxRAM" in resourceDescription: - toPrintDict["MaxRAM"] = resourceDescription["MaxRAM"] - if "NumberOfProcessors" in resourceDescription: - toPrintDict["NumberOfProcessors"] = resourceDescription["NumberOfProcessors"] - toPrintDict["Tag"] = [] - if "Tag" in resourceDict: - for tag in resourceDict["Tag"]: - if not tag.endswith("GB") and not tag.endswith("Processors"): - toPrintDict["Tag"].append(tag) - if not toPrintDict["Tag"]: - toPrintDict.pop("Tag") - self.log.info("Resource description for matching", printDict(toPrintDict)) - - negativeCond = self.limiter.getNegativeCondForSite(resourceDict["Site"], resourceDict.get("GridCE")) - result = self.tqDB.matchAndGetJob(resourceDict, negativeCond=negativeCond) - - if not result["OK"]: - raise RuntimeError(result["Message"]) - result = result["Value"] - if not result["matchFound"]: - self.log.info("No match found") - return {} - - jobID = result["jobId"] - resAtt = self.jobDB.getJobAttributes(jobID, ["OwnerDN", "OwnerGroup", "Status"]) - if not resAtt["OK"]: - raise RuntimeError("Could not retrieve job attributes") - if not resAtt["Value"]: - raise RuntimeError("No attributes returned for job") - if not resAtt["Value"]["Status"] == "Waiting": - self.log.error("Job matched by the TQ is not in Waiting state", str(jobID)) - result = self.tqDB.deleteJob(jobID) if not result["OK"]: raise RuntimeError(result["Message"]) - raise RuntimeError(f"Job {str(jobID)} is not in Waiting state") + result = result["Value"] + if not result["matchFound"]: + self.log.info("No match found") + return {} + + jobID = result["jobId"] + resAtt = self.jobDB.getJobAttributes(jobID, ["OwnerDN", "OwnerGroup", "Status"]) + if not resAtt["OK"]: + raise RuntimeError("Could not retrieve job attributes") + if not resAtt["Value"]: + raise RuntimeError("No attributes returned for job") + if not resAtt["Value"]["Status"] == "Waiting": + self.log.error("Job matched by the TQ is not in Waiting state", str(jobID)) + result = self.tqDB.deleteJob(jobID) + if not result["OK"]: + raise RuntimeError(result["Message"]) + raise RuntimeError(f"Job {str(jobID)} is not in Waiting state") - self._reportStatus(resourceDict, jobID) + self._reportStatus(resourceDict, jobID) - result = self.jobDB.getJobJDL(jobID) - if not result["OK"]: - raise RuntimeError("Failed to get the job JDL") - - resultDict = {} - resultDict["JDL"] = result["Value"] - resultDict["JobID"] = jobID - - matchTime = time.time() - startTime - self.log.verbose("Match time", f"[{str(matchTime)}]") - - # Get some extra stuff into the response returned - resOpt = self.jobDB.getJobOptParameters(jobID) - if resOpt["OK"]: - for key, value in resOpt["Value"].items(): - resultDict[key] = value - resAtt = self.jobDB.getJobAttributes(jobID, ["OwnerDN", "OwnerGroup"]) - if not resAtt["OK"]: - raise RuntimeError("Could not retrieve job attributes") - if not resAtt["Value"]: - raise RuntimeError("No attributes returned for job") - - if self.opsHelper.getValue("JobScheduling/CheckMatchingDelay", True): - self.limiter.updateDelayCounters(resourceDict["Site"], jobID) - - pilotInfoReportedFlag = resourceDict.get("PilotInfoReportedFlag", False) - if not pilotInfoReportedFlag: - self._updatePilotInfo(resourceDict) - self._updatePilotJobMapping(resourceDict, jobID) - - resultDict["DN"] = resAtt["Value"]["OwnerDN"] - resultDict["Group"] = resAtt["Value"]["OwnerGroup"] - resultDict["PilotInfoReportedFlag"] = True - - return resultDict + result = self.jobDB.getJobJDL(jobID) + if not result["OK"]: + raise RuntimeError("Failed to get the job JDL") + + resultDict = {} + resultDict["JDL"] = result["Value"] + resultDict["JobID"] = jobID + + matchTime = time.time() - startTime + self.log.verbose("Match time", f"[{str(matchTime)}]") + + # Get some extra stuff into the response returned + resOpt = self.jobDB.getJobOptParameters(jobID) + if resOpt["OK"]: + for key, value in resOpt["Value"].items(): + resultDict[key] = value + resAtt = self.jobDB.getJobAttributes(jobID, ["OwnerDN", "OwnerGroup"]) + if not resAtt["OK"]: + raise RuntimeError("Could not retrieve job attributes") + if not resAtt["Value"]: + raise RuntimeError("No attributes returned for job") + + if self.opsHelper.getValue("JobScheduling/CheckMatchingDelay", True): + self.limiter.updateDelayCounters(resourceDict["Site"], jobID) + + pilotInfoReportedFlag = resourceDict.get("PilotInfoReportedFlag", False) + if not pilotInfoReportedFlag: + self._updatePilotInfo(resourceDict) + self._updatePilotJobMapping(resourceDict, jobID) + + resultDict["DN"] = resAtt["Value"]["OwnerDN"] + resultDict["Group"] = resAtt["Value"]["OwnerGroup"] + resultDict["PilotInfoReportedFlag"] = True + + return resultDict def _getResourceDict(self, resourceDescription, credDict): """from resourceDescription to resourceDict (just various mods)""" diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index 532c5dace93..180c75a9412 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -21,6 +21,7 @@ from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK +from DIRAC.FrameworkSystem.Client.Logger import contextLogger from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient @@ -42,6 +43,8 @@ def __init__(self, parentLogger=None): DB.__init__(self, "JobDB", "WorkloadManagement/JobDB", parentLogger=parentLogger) + self._defaultLogger = self.log + # data member to check if __init__ went through without error self.__initialized = False self.maxRescheduling = self.getCSOption("MaxRescheduling", 3) @@ -64,6 +67,14 @@ def __init__(self, parentLogger=None): self.log.info("==================================================") self.__initialized = True + @property + def log(self): + return contextLogger.get() or self._defaultLogger + + @log.setter + def log(self, value): + self._defaultLogger = value + def isValid(self): """Check if correctly initialised""" return self.__initialized diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py index ee4b16df55e..b00e3ec3af2 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py @@ -11,6 +11,7 @@ from DIRAC import S_ERROR, S_OK from DIRAC.Core.Base.DB import DB from DIRAC.Core.Utilities import TimeUtilities +from DIRAC.FrameworkSystem.Client.Logger import contextLogger MAGIC_EPOC_NUMBER = 1270000000 @@ -24,6 +25,15 @@ def __init__(self, parentLogger=None): """Standard Constructor""" DB.__init__(self, "JobLoggingDB", "WorkloadManagement/JobLoggingDB", parentLogger=parentLogger) + self._defaultLogger = self.log + + @property + def log(self): + return contextLogger.get() or self._defaultLogger + + @log.setter + def log(self, value): + self._defaultLogger = value ############################################################################# def addLoggingRecord( diff --git a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py index 7d7e3cb873c..0942b4d1fae 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py @@ -30,6 +30,7 @@ from DIRAC.Core.Base.DB import DB from DIRAC.Core.Utilities import DErrno from DIRAC.Core.Utilities.MySQL import _quotedList +from DIRAC.FrameworkSystem.Client.Logger import contextLogger from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus from DIRAC.WorkloadManagementSystem.Client import PilotStatus @@ -37,8 +38,17 @@ class PilotAgentsDB(DB): def __init__(self, parentLogger=None): super().__init__("PilotAgentsDB", "WorkloadManagement/PilotAgentsDB", parentLogger=parentLogger) + self._defaultLogger = self.log self.lock = threading.Lock() + @property + def log(self): + return contextLogger.get() or self._defaultLogger + + @log.setter + def log(self, value): + self._defaultLogger = value + ########################################################################################## def addPilotReferences(self, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={}): """Add a new pilot job reference""" diff --git a/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py b/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py index e94fc617e78..c9f97259f06 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py @@ -11,6 +11,7 @@ from DIRAC.Core.Security import Properties from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.ConfigurationSystem.Client.Helpers import Registry +from DIRAC.FrameworkSystem.Client.Logger import contextLogger from DIRAC.WorkloadManagementSystem.private.SharesCorrector import SharesCorrector DEFAULT_GROUP_SHARE = 1000 @@ -37,6 +38,7 @@ class TaskQueueDB(DB): def __init__(self, parentLogger=None): DB.__init__(self, "TaskQueueDB", "WorkloadManagement/TaskQueueDB", parentLogger=parentLogger) + self._defaultLogger = self.log self.__maxJobsInTQ = 5000 self.__defaultCPUSegments = [ 6 * 60, @@ -64,6 +66,14 @@ def __init__(self, parentLogger=None): if not result["OK"]: raise Exception(f"Can't create tables: {result['Message']}") + @property + def log(self): + return contextLogger.get() or self._defaultLogger + + @log.setter + def log(self, value): + self._defaultLogger = value + def enableAllTaskQueues(self): """Enable all Task queues""" return self.updateFields("tq_TaskQueues", updateDict={"Enabled": "1"})