Skip to content

Commit

Permalink
Merge pull request #7954 from aldbr/cherry-pick-2-d5aebd11d-integration
Browse files Browse the repository at this point in the history
[sweep:integration] fix(wms): correctly log the pilot job reference during the matching process
  • Loading branch information
fstagni authored Dec 17, 2024
2 parents 58472fd + 01b5238 commit e75f7dd
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,68 @@ This option can not be modified in the children of *gLogger*, even by
*gLogger* itself after the configuration, so the children receive
the *gLogger* configuration.

Add variables to different *Logging* objects depending on the context
---------------------------------------------------------------------

In complex cases, it can be useful to have loggers that change depending on
the execution context, without having to pass logger instances explicitly
through multiple layers of function calls.

Python's `contextvars` module provides context-local storage, which can be used
to store and retrieve context-specific data, such as logger instances.

gLogger supports the use of context variables to manage loggers in a flexible way.

Provide a Context Logger
~~~~~~~~~~~~~~~~~~~~~~~~

When you have a *Logging* instance that you want to use in a specific context,
you can set it in the context variable:

::

# Create a logger instance
logger = gLogger.getSubLogger("MyContextLogger")

# Set it in the context variable
contextLogger.set(logger)

Then, the instances within the context block will use the shared *Logging* object
set in the context variable:

::

with setContextLogger(contextualLogger):
# Any logging within this block will use contextualLogger
obj = MyClass()
obj.do_something() # This will use contextualLogger

Consume a Context Logger
~~~~~~~~~~~~~~~~~~~~~~~~

In functions or classes that need to log messages, you can retrieve the logger
from the context variable:

::

class MyClass:
def __init__(self):
# Get the default logger if no context logger is set
self._defaultLogger = gLogger.getSubLogger("MyClass")

@property
def log(self):
# Return the context logger if set, otherwise the default logger
return contextLogger.get() or self._defaultLogger

@log.setter
def log(self, value):
# Optionally, allow setting a new default logger
self._defaultLogger = value

def do_something(self):
self.log.notice("Doing something")

Some examples and summaries
---------------------------

Expand Down
4 changes: 4 additions & 0 deletions src/DIRAC/FrameworkSystem/Client/Logger.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from DIRAC.FrameworkSystem.private.standardLogging.LoggingContext import contextLogger, setContextLogger
from DIRAC.FrameworkSystem.private.standardLogging.LoggingRoot import LoggingRoot

gLogger = LoggingRoot()


def getLogger():
return gLogger


__all__ = ["contextLogger", "setContextLogger", "getLogger"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
""" Logging context module"""

# Context variable for the logger (adapted to the request of the pilot reference)
import contextvars
from contextlib import contextmanager

contextLogger = contextvars.ContextVar("Logger", default=None)


@contextmanager
def setContextLogger(logger_name):
token = contextLogger.set(logger_name)
try:
yield
finally:
contextLogger.reset(token)
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
""" Test the context variable logger """

from DIRAC import gLogger
from DIRAC.FrameworkSystem.private.standardLogging.Logging import Logging
from DIRAC.FrameworkSystem.private.standardLogging.test.TestLogUtilities import gLoggerReset
from DIRAC.FrameworkSystem.private.standardLogging.LoggingContext import contextLogger, setContextLogger


class A:
def __init__(self):
# Get the logger from the context variable
self._defaultLogger = gLogger.getSubLogger("A")

# Use a property to get and set the logger, this is necessary to use the context variable
@property
def log(self):
return contextLogger.get() or self._defaultLogger

@log.setter
def log(self, value: Logging):
self._defaultLogger = value

def do_something(self):
self.log.notice("A is doing something")


class B:
def __init__(self, a: A, pilotRef: str = None):
self.a = A()

# Get the logger from the context variable
if pilotRef:
self.log = gLogger.getLocalSubLogger(f"[{pilotRef}]B")
contextLogger.set(self.log)
else:
self.log = gLogger.getSubLogger("B")

def do_something_else(self):
with setContextLogger(self.log):
self.a.do_something()
self.log.notice("B is doing something else")


def test_contextvar_logger():
capturedBackend, log, sublog = gLoggerReset()

# Create an instance of A
a = A()

# Create an instance of B and call its method without setting the pilotRef
# Log signature coming from A and B should be different
b1 = B(a)
b1.do_something_else()
assert "Framework/B NOTICE: A is doing something" in capturedBackend.getvalue()
assert "Framework/B NOTICE: B is doing something else" in capturedBackend.getvalue()

# Create an instance of B and call its method with setting the pilotRef
# Log signature coming from A and B should be similar because of the pilotRef
capturedBackend.truncate(0)

b2 = B(a, "pilotRef")
b2.do_something_else()
assert "Framework/[pilotRef]B NOTICE: A is doing something" in capturedBackend.getvalue()
assert "Framework/[pilotRef]B NOTICE: B is doing something else" in capturedBackend.getvalue()

# Now we check that the logger of b1 is not the same as the logger of b2 (b1 should still use its own logger)
capturedBackend.truncate(0)

b1.do_something_else()
assert "Framework/B NOTICE: A is doing something" in capturedBackend.getvalue()
assert "Framework/B NOTICE: B is doing something else" in capturedBackend.getvalue()
157 changes: 77 additions & 80 deletions src/DIRAC/WorkloadManagementSystem/Client/Matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Security import Properties
from DIRAC.Core.Utilities.PrettyPrint import printDict
from DIRAC.FrameworkSystem.Client.Logger import setContextLogger
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
from DIRAC.WorkloadManagementSystem.Client import JobStatus, PilotStatus
from DIRAC.WorkloadManagementSystem.Client.Limiter import Limiter
Expand Down Expand Up @@ -50,11 +51,7 @@ def __init__(self, pilotAgentsDB=None, jobDB=None, tqDB=None, jlDB=None, opsHelp
self.opsHelper = Operations()

if pilotRef:
self.log = gLogger.getSubLogger(f"[{pilotRef}]Matcher")
self.pilotAgentsDB.log = gLogger.getSubLogger(f"[{pilotRef}]Matcher")
self.jobDB.log = gLogger.getSubLogger(f"[{pilotRef}]Matcher")
self.tqDB.log = gLogger.getSubLogger(f"[{pilotRef}]Matcher")
self.jlDB.log = gLogger.getSubLogger(f"[{pilotRef}]Matcher")
self.log = gLogger.getLocalSubLogger(f"[{pilotRef}]Matcher")
else:
self.log = gLogger.getSubLogger("Matcher")

Expand All @@ -64,86 +61,86 @@ def __init__(self, pilotAgentsDB=None, jobDB=None, tqDB=None, jlDB=None, opsHelp

def selectJob(self, resourceDescription, credDict):
"""Main job selection function to find the highest priority job matching the resource capacity"""
with setContextLogger(self.log):
startTime = time.time()

resourceDict = self._getResourceDict(resourceDescription, credDict)

# Make a nice print of the resource matching parameters
toPrintDict = dict(resourceDict)
if "MaxRAM" in resourceDescription:
toPrintDict["MaxRAM"] = resourceDescription["MaxRAM"]
if "NumberOfProcessors" in resourceDescription:
toPrintDict["NumberOfProcessors"] = resourceDescription["NumberOfProcessors"]
toPrintDict["Tag"] = []
if "Tag" in resourceDict:
for tag in resourceDict["Tag"]:
if not tag.endswith("GB") and not tag.endswith("Processors"):
toPrintDict["Tag"].append(tag)
if not toPrintDict["Tag"]:
toPrintDict.pop("Tag")
self.log.info("Resource description for matching", printDict(toPrintDict))

negativeCond = self.limiter.getNegativeCondForSite(resourceDict["Site"], resourceDict.get("GridCE"))
result = self.tqDB.matchAndGetJob(resourceDict, negativeCond=negativeCond)

startTime = time.time()

resourceDict = self._getResourceDict(resourceDescription, credDict)

# Make a nice print of the resource matching parameters
toPrintDict = dict(resourceDict)
if "MaxRAM" in resourceDescription:
toPrintDict["MaxRAM"] = resourceDescription["MaxRAM"]
if "NumberOfProcessors" in resourceDescription:
toPrintDict["NumberOfProcessors"] = resourceDescription["NumberOfProcessors"]
toPrintDict["Tag"] = []
if "Tag" in resourceDict:
for tag in resourceDict["Tag"]:
if not tag.endswith("GB") and not tag.endswith("Processors"):
toPrintDict["Tag"].append(tag)
if not toPrintDict["Tag"]:
toPrintDict.pop("Tag")
self.log.info("Resource description for matching", printDict(toPrintDict))

negativeCond = self.limiter.getNegativeCondForSite(resourceDict["Site"], resourceDict.get("GridCE"))
result = self.tqDB.matchAndGetJob(resourceDict, negativeCond=negativeCond)

if not result["OK"]:
raise RuntimeError(result["Message"])
result = result["Value"]
if not result["matchFound"]:
self.log.info("No match found")
return {}

jobID = result["jobId"]
resAtt = self.jobDB.getJobAttributes(jobID, ["Status"])
if not resAtt["OK"]:
raise RuntimeError("Could not retrieve job attributes")
if not resAtt["Value"]:
raise RuntimeError("No attributes returned for job")
if not resAtt["Value"]["Status"] == "Waiting":
self.log.error("Job matched by the TQ is not in Waiting state", str(jobID))
result = self.tqDB.deleteJob(jobID)
if not result["OK"]:
raise RuntimeError(result["Message"])
raise RuntimeError(f"Job {str(jobID)} is not in Waiting state")
result = result["Value"]
if not result["matchFound"]:
self.log.info("No match found")
return {}

jobID = result["jobId"]
resAtt = self.jobDB.getJobAttributes(jobID, ["Status"])
if not resAtt["OK"]:
raise RuntimeError("Could not retrieve job attributes")
if not resAtt["Value"]:
raise RuntimeError("No attributes returned for job")
if not resAtt["Value"]["Status"] == "Waiting":
self.log.error("Job matched by the TQ is not in Waiting state", str(jobID))
result = self.tqDB.deleteJob(jobID)
if not result["OK"]:
raise RuntimeError(result["Message"])
raise RuntimeError(f"Job {str(jobID)} is not in Waiting state")

self._reportStatus(resourceDict, jobID)
self._reportStatus(resourceDict, jobID)

result = self.jobDB.getJobJDL(jobID)
if not result["OK"]:
raise RuntimeError("Failed to get the job JDL")

resultDict = {}
resultDict["JDL"] = result["Value"]
resultDict["JobID"] = jobID

matchTime = time.time() - startTime
self.log.verbose("Match time", f"[{str(matchTime)}]")

# Get some extra stuff into the response returned
resOpt = self.jobDB.getJobOptParameters(jobID)
if resOpt["OK"]:
for key, value in resOpt["Value"].items():
resultDict[key] = value
resAtt = self.jobDB.getJobAttributes(jobID, ["Owner", "OwnerGroup"])
if not resAtt["OK"]:
raise RuntimeError("Could not retrieve job attributes")
if not resAtt["Value"]:
raise RuntimeError("No attributes returned for job")

if self.opsHelper.getValue("JobScheduling/CheckMatchingDelay", True):
self.limiter.updateDelayCounters(resourceDict["Site"], jobID)

pilotInfoReportedFlag = resourceDict.get("PilotInfoReportedFlag", False)
if not pilotInfoReportedFlag:
self._updatePilotInfo(resourceDict)
self._updatePilotJobMapping(resourceDict, jobID)

resultDict["Owner"] = resAtt["Value"]["Owner"]
resultDict["Group"] = resAtt["Value"]["OwnerGroup"]
resultDict["PilotInfoReportedFlag"] = True

return resultDict
result = self.jobDB.getJobJDL(jobID)
if not result["OK"]:
raise RuntimeError("Failed to get the job JDL")

resultDict = {}
resultDict["JDL"] = result["Value"]
resultDict["JobID"] = jobID

matchTime = time.time() - startTime
self.log.verbose("Match time", f"[{str(matchTime)}]")

# Get some extra stuff into the response returned
resOpt = self.jobDB.getJobOptParameters(jobID)
if resOpt["OK"]:
for key, value in resOpt["Value"].items():
resultDict[key] = value
resAtt = self.jobDB.getJobAttributes(jobID, ["Owner", "OwnerGroup"])
if not resAtt["OK"]:
raise RuntimeError("Could not retrieve job attributes")
if not resAtt["Value"]:
raise RuntimeError("No attributes returned for job")

if self.opsHelper.getValue("JobScheduling/CheckMatchingDelay", True):
self.limiter.updateDelayCounters(resourceDict["Site"], jobID)

pilotInfoReportedFlag = resourceDict.get("PilotInfoReportedFlag", False)
if not pilotInfoReportedFlag:
self._updatePilotInfo(resourceDict)
self._updatePilotJobMapping(resourceDict, jobID)

resultDict["Owner"] = resAtt["Value"]["Owner"]
resultDict["Group"] = resAtt["Value"]["OwnerGroup"]
resultDict["PilotInfoReportedFlag"] = True

return resultDict

def _getResourceDict(self, resourceDescription, credDict):
"""from resourceDescription to resourceDict (just various mods)"""
Expand Down
11 changes: 11 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from DIRAC.Core.Utilities.Decorators import deprecated
from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK
from DIRAC.FrameworkSystem.Client.Logger import contextLogger
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
Expand All @@ -42,6 +43,8 @@ def __init__(self, parentLogger=None):

DB.__init__(self, "JobDB", "WorkloadManagement/JobDB", parentLogger=parentLogger)

self._defaultLogger = self.log

# data member to check if __init__ went through without error
self.__initialized = False
self.maxRescheduling = self.getCSOption("MaxRescheduling", 3)
Expand All @@ -64,6 +67,14 @@ def __init__(self, parentLogger=None):
self.log.info("==================================================")
self.__initialized = True

@property
def log(self):
return contextLogger.get() or self._defaultLogger

@log.setter
def log(self, value):
self._defaultLogger = value

def isValid(self):
"""Check if correctly initialised"""
return self.__initialized
Expand Down
Loading

0 comments on commit e75f7dd

Please sign in to comment.