From 53f617c15dea2c60bd32e1ecebbc8f62c9b315af Mon Sep 17 00:00:00 2001 From: aldbr Date: Fri, 10 Nov 2023 15:52:04 +0100 Subject: [PATCH 1/3] feat: getting batch system info from pilot --- .../TimeLeft/HTCondorResourceUsage.py | 7 +- .../BatchSystems/TimeLeft/LSFResourceUsage.py | 23 +-- .../BatchSystems/TimeLeft/MJFResourceUsage.py | 4 +- .../BatchSystems/TimeLeft/PBSResourceUsage.py | 10 +- .../BatchSystems/TimeLeft/ResourceUsage.py | 10 +- .../BatchSystems/TimeLeft/SGEResourceUsage.py | 10 +- .../TimeLeft/SLURMResourceUsage.py | 4 +- .../BatchSystems/TimeLeft/TimeLeft.py | 71 +++---- .../test/Test_HTCondorResourceUsage.py | 38 ++++ .../TimeLeft/test/Test_LSFResourceUsage.py | 80 +++----- .../TimeLeft/test/Test_PBSResourceUsage.py | 27 +-- .../TimeLeft/test/Test_SGEResourceUsage.py | 70 +++---- .../TimeLeft/test/Test_SLURMResourceUsage.py | 56 ++++++ .../TimeLeft/test/Test_TimeLeft.py | 174 +++++------------- 14 files changed, 268 insertions(+), 316 deletions(-) create mode 100644 src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_HTCondorResourceUsage.py create mode 100644 src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SLURMResourceUsage.py diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/HTCondorResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/HTCondorResourceUsage.py index 4cbed4906e3..fc83c1f4b8f 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/HTCondorResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/HTCondorResourceUsage.py @@ -17,9 +17,9 @@ class HTCondorResourceUsage(ResourceUsage): allow us to get an estimation of the resources usage. """ - def __init__(self): + def __init__(self, jobID, parameters): """Standard constructor""" - super().__init__("HTCondor", "_CONDOR_JOB_AD") + super().__init__("HTCondor", jobID, parameters) def getResourceUsage(self): """Returns S_OK with a dictionary containing the entries WallClock, WallClockLimit, and Unit for current slot.""" @@ -29,8 +29,7 @@ def getResourceUsage(self): # only present on some Sites # - CurrentTime: current time # - JobCurrentStartDate: start of the job execution - jobDescription = os.environ.get("_CONDOR_JOB_AD") - cmd = f"condor_status -ads {jobDescription} -af MaxRuntime CurrentTime-JobCurrentStartDate" + cmd = f"condor_status -ads {self.info_path} -af MaxRuntime CurrentTime-JobCurrentStartDate" result = runCommand(cmd) if not result["OK"]: return S_ERROR("Current batch system is not supported") diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/LSFResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/LSFResourceUsage.py index 47e2c092a30..0456726e449 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/LSFResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/LSFResourceUsage.py @@ -19,17 +19,14 @@ class LSFResourceUsage(ResourceUsage): This is the LSF plugin of the TimeLeft Utility """ - def __init__(self): + def __init__(self, jobID, parameters): """Standard constructor""" - super().__init__("LSF", "LSB_JOBID") + super().__init__("LSF", jobID, parameters) - self.queue = os.environ.get("LSB_QUEUE") - self.bin = os.environ.get("LSF_BINDIR") - self.host = os.environ.get("LSB_HOSTS") self.year = time.strftime("%Y", time.gmtime()) self.log.verbose( "LSB_JOBID={}, LSB_QUEUE={}, LSF_BINDIR={}, LSB_HOSTS={}".format( - self.jobID, self.queue, self.bin, self.host + self.jobID, self.queue, self.binary_path, self.host ) ) @@ -39,7 +36,7 @@ def __init__(self): 
self.wallClockLimit = None self.hostNorm = None - cmd = f"{self.bin}/bqueues -l {self.queue}" + cmd = f"{self.binary_path}/bqueues -l {self.queue}" result = runCommand(cmd) if not result["OK"]: return @@ -73,7 +70,7 @@ def __init__(self): # Now try to get the CPU_FACTOR for this reference CPU, # it must be either a Model, a Host or the largest Model - cmd = f"{self.bin}/lshosts -w {self.cpuRef}" + cmd = f"{self.binary_path}/lshosts -w {self.cpuRef}" result = runCommand(cmd) if result["OK"]: # At CERN this command will return an error since there is no host defined @@ -97,7 +94,7 @@ def __init__(self): if not self.normRef: # Try if there is a model define with the name of cpuRef - cmd = f"{self.bin}/lsinfo -m" + cmd = f"{self.binary_path}/lsinfo -m" result = runCommand(cmd) if result["OK"]: lines = str(result["Value"]).split("\n") @@ -120,7 +117,7 @@ def __init__(self): if not self.normRef: # Now parse LSF configuration files if not os.path.isfile("./lsf.sh"): - os.symlink(os.path.join(os.environ["LSF_ENVDIR"], "lsf.conf"), "./lsf.sh") + os.symlink(os.path.join(self.info_path, "lsf.conf"), "./lsf.sh") # As the variables are not exported, we must force it ret = sourceEnv(10, ["./lsf", "&& export LSF_CONFDIR"]) if ret["OK"]: @@ -170,7 +167,7 @@ def __init__(self): # Now get the Normalization for the current Host if self.host: - cmd = f"{self.bin}/lshosts -w {self.host}" + cmd = f"{self.binary_path}/lshosts -w {self.host}" result = runCommand(cmd) if result["OK"]: lines = str(result["Value"]).split("\n") @@ -201,7 +198,7 @@ def getResourceUsage(self): """Returns S_OK with a dictionary containing the entries CPU, CPULimit, WallClock, WallClockLimit, and Unit for current slot. """ - if not self.bin: + if not self.binary_path: return S_ERROR("Could not determine bin directory for LSF") if not self.hostNorm: return S_ERROR("Could not determine host Norm factor") @@ -209,7 +206,7 @@ def getResourceUsage(self): cpu = None wallClock = None - cmd = f"{self.bin}/bjobs -W {self.jobID}" + cmd = f"{self.binary_path}/bjobs -W {self.jobID}" result = runCommand(cmd) if not result["OK"]: return result diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py index 95ee93c2faa..b419bea9b5f 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py @@ -15,9 +15,9 @@ class MJFResourceUsage(ResourceUsage): """ ############################################################################# - def __init__(self): + def __init__(self, jobID, parameters): """Standard constructor""" - super().__init__("MJF", "JOB_ID") + super().__init__("MJF", jobID, parameters) self.queue = os.environ.get("QUEUE") diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/PBSResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/PBSResourceUsage.py index 5eff109361f..fa28ef00937 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/PBSResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/PBSResourceUsage.py @@ -15,14 +15,12 @@ class PBSResourceUsage(ResourceUsage): This is the PBS plugin of the TimeLeft Utility """ - def __init__(self): + def __init__(self, jobID, parameters): """Standard constructor""" - super().__init__("PBS", "PBS_JOBID") + super().__init__("PBS", jobID, parameters) - self.queue = os.environ.get("PBS_O_QUEUE") - pbsPath = os.environ.get("PBS_O_PATH") - if 
pbsPath: - os.environ["PATH"] += ":" + pbsPath + if self.binary_path and self.binary_path != "Unknown": + os.environ["PATH"] += ":" + self.binary_path self.log.verbose(f"PBS_JOBID={self.jobID}, PBS_O_QUEUE={self.queue}") self.startTime = time.time() diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py index 0fa9483e188..85b260a6e0e 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py @@ -12,10 +12,16 @@ class ResourceUsage: (e.g. getting the time left in a Pilot) """ - def __init__(self, batchSystemName, jobIdEnvVar): + def __init__(self, batchSystemName, jobID, parameters): """Standard constructor""" self.log = gLogger.getSubLogger(f"{batchSystemName}ResourceUsage") - self.jobID = os.environ.get(jobIdEnvVar) + self.jobID = jobID + + # Parameters + self.binary_path = parameters.get("BinaryPath") + self.info_path = parameters.get("InfoPath") + self.host = parameters.get("Host") + self.queue = parameters.get("Queue") def getResourceUsage(self): """Returns S_OK with a dictionary that can contain entries: diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SGEResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SGEResourceUsage.py index ea6bc090e02..95e21fcf5ba 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SGEResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SGEResourceUsage.py @@ -16,14 +16,12 @@ class SGEResourceUsage(ResourceUsage): This is the SGE plugin of the TimeLeft Utility """ - def __init__(self): + def __init__(self, jobID, parameters): """Standard constructor""" - super().__init__("SGE", "JOB_ID") + super().__init__("SGE", jobID, parameters) - self.queue = os.environ.get("QUEUE") - sgePath = os.environ.get("SGE_BINARY_PATH") - if sgePath: - os.environ["PATH"] += ":" + sgePath + if self.binary_path and self.binary_path != "Unknown": + os.environ["PATH"] += ":" + self.binary_path self.log.verbose(f"JOB_ID={self.jobID}, QUEUE={self.queue}") self.startTime = time.time() diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SLURMResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SLURMResourceUsage.py index 05e96a3ac14..09a6d26bef5 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SLURMResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SLURMResourceUsage.py @@ -11,9 +11,9 @@ class SLURMResourceUsage(ResourceUsage): This is the SLURM plugin of the TimeLeft Utility """ - def __init__(self): + def __init__(self, jobID, parameters): """Standard constructor""" - super().__init__("SLURM", "SLURM_JOB_ID") + super().__init__("SLURM", jobID, parameters) self.log.verbose(f"JOB_ID={self.jobID}") diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py index e2c8a506b64..91f4573c208 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py @@ -9,12 +9,13 @@ With this information the utility can calculate in normalized units the CPU time remaining for a given slot. 
""" -import os import shlex import DIRAC from DIRAC import gLogger, gConfig, S_OK, S_ERROR +from DIRAC.Core.Utilities import DErrno +from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.Core.Utilities.Subprocess import systemCall @@ -29,7 +30,7 @@ def __init__(self): if not self.cpuPower: self.log.warn(f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}") - result = self.__getBatchSystemPlugin() + result = self._getBatchSystemPlugin() if result["OK"]: self.batchPlugin = result["Value"] else: @@ -65,7 +66,9 @@ def getTimeLeft(self, cpuConsumed=0.0, processors=1): """ # Quit if no norm factor available if not self.cpuPower: - return S_ERROR(f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}") + return S_ERROR( + DErrno.ESECTION, f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}" + ) if not self.batchPlugin: return S_ERROR(self.batchError) @@ -126,54 +129,24 @@ def getTimeLeft(self, cpuConsumed=0.0, processors=1): self.log.verbose(f"Remaining CPU in normalized units is: {cpuWorkLeft:.02f}") return S_OK(cpuWorkLeft) - def __getBatchSystemPlugin(self): + def _getBatchSystemPlugin(self): """Using the name of the batch system plugin, will return an instance of the plugin class.""" - batchSystems = { - "LSF": "LSB_JOBID", - "PBS": "PBS_JOBID", - "BQS": "QSUB_REQNAME", - "SGE": "SGE_TASK_ID", - "SLURM": "SLURM_JOB_ID", - "HTCondor": "_CONDOR_JOB_AD", - } # more to be added later - name = None - for batchSystem, envVar in batchSystems.items(): - if envVar in os.environ: - name = batchSystem - break - - if name is None and "MACHINEFEATURES" in os.environ and "JOBFEATURES" in os.environ: - # Only use MJF if legacy batch system information not available for now - name = "MJF" - - if name is None: + batchSystemInfo = gConfig.getSections("/LocalSite/BatchSystem") + type = batchSystemInfo.get("Type") + jobID = batchSystemInfo.get("JobID") + parameters = batchSystemInfo.get("Parameters") + + if not type or type == "Unknown": self.log.warn(f"Batch system type for site {DIRAC.siteName()} is not currently supported") - return S_ERROR("Current batch system is not supported") - - self.log.debug(f"Creating plugin for {name} batch system") - try: - batchSystemName = f"{name}ResourceUsage" - batchPlugin = __import__( - "DIRAC.Resources.Computing.BatchSystems.TimeLeft.%s" - % batchSystemName, # pylint: disable=unused-variable - globals(), - locals(), - [batchSystemName], - ) - except ImportError as x: - msg = f"Could not import DIRAC.Resources.Computing.BatchSystems.TimeLeft.{batchSystemName}" - self.log.warn(x) - self.log.warn(msg) - return S_ERROR(msg) - - try: - batchStr = f"batchPlugin.{batchSystemName}()" - batchInstance = eval(batchStr) - except Exception as x: # pylint: disable=broad-except - msg = f"Could not instantiate {batchSystemName}()" - self.log.warn(x) - self.log.warn(msg) - return S_ERROR(msg) + return S_ERROR(DErrno.ERESUNK, "Current batch system is not supported") + + self.log.debug(f"Creating plugin for {type} batch system") + + result = ObjectLoader().loadObject(f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{type}ResourceUsage") + if not result["OK"]: + return result + batchClass = result["Value"] + batchInstance = batchClass(jobID, parameters) return S_OK(batchInstance) diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_HTCondorResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_HTCondorResourceUsage.py new file mode 100644 index 
00000000000..f481b487857 --- /dev/null +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_HTCondorResourceUsage.py @@ -0,0 +1,38 @@ +""" Test class for SGEResourceUsage utility +""" + +import pytest + +from DIRAC import S_OK +from DIRAC.Resources.Computing.BatchSystems.TimeLeft.HTCondorResourceUsage import HTCondorResourceUsage + + +HTCONDOR_OUT_0 = "86400 3600" +HTCONDOR_OUT_1 = "undefined 3600" +HTCONDOR_OUT_2 = "" + + +def test_getResourceUsage(mocker): + mocker.patch( + "DIRAC.Resources.Computing.BatchSystems.TimeLeft.HTCondorResourceUsage.runCommand", + side_effect=[S_OK(HTCONDOR_OUT_0), S_OK(HTCONDOR_OUT_1), S_OK(HTCONDOR_OUT_2)], + ) + + # First test: everything is fine + htcondorResourceUsage = HTCondorResourceUsage("1234", {"Queue": "Test", "InfoPath": "/path/to/condor_ad"}) + res = htcondorResourceUsage.getResourceUsage() + assert res["OK"], res["Message"] + assert res["Value"]["WallClock"] == 3600 + assert res["Value"]["WallClockLimit"] == 86400 + + # Second test: MaxRuntime is undefined + htcondorResourceUsage = HTCondorResourceUsage("1234", {"Queue": "Test", "InfoPath": "/path/to/condor_ad"}) + res = htcondorResourceUsage.getResourceUsage() + assert not res["OK"] + assert res["Message"] == "Current batch system is not supported" + + # Third test: empty output + htcondorResourceUsage = HTCondorResourceUsage("1234", {"Queue": "Test", "InfoPath": "/path/to/condor_ad"}) + res = htcondorResourceUsage.getResourceUsage() + assert not res["OK"] + assert res["Message"] == "Current batch system is not supported" diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_LSFResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_LSFResourceUsage.py index 3b03a6ff3a6..653265f1962 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_LSFResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_LSFResourceUsage.py @@ -1,13 +1,13 @@ """ Test class for LSFResourceUsage utility """ -import os import pytest from DIRAC import S_OK, S_ERROR from DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage import LSFResourceUsage +# Sample outputs for LSF batch system commands LSF_KEK_BQUEUES = """ CPULIMIT 720.0 min @@ -19,6 +19,23 @@ b66 SLC6_64 i6_16 2.5 16 29M 19M Yes (intel share aishare cvmfs wan exe lcg wigner slot15) """ + +def test_getResourceUsageBasic(mocker): + mocker.patch( + "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.runCommand", + side_effect=[S_OK(LSF_KEK_BQUEUES), S_OK(LSF_LSHOSTS)], + ) + + lsfResourceUsage = LSFResourceUsage("1234", {"Host": "b66", "InfoPath": "Unknown"}) + cpuLimitExpected = 720 * 60 / 2.5 + wallClockLimitExpected = 1440 * 60 / 2.5 + + # Verify that the calculated limits match the expected values + assert lsfResourceUsage.cpuLimit == cpuLimitExpected + assert lsfResourceUsage.wallClockLimit == wallClockLimitExpected + + +# Additional test data for a more specific setup (e.g., CERN) LSF_CERN_BQUEUES = """ CPULIMIT 10080.0 min of KSI2K @@ -26,72 +43,33 @@ 30240.0 min of KSI2K """ -# returns with S_ERROR LSF_CERN_LSHOSTS_1 = """KSI2K: unknown host name. 
""" -# shortened LSF_CERN_LSINFO = """MODEL_NAME CPU_FACTOR ARCHITECTURE i6_12_62d7h20_266 3.06 ai_intel_8 2.44 """ -@pytest.mark.parametrize( - "runCommandResult, lsbHosts, cpuLimitExpected, wallClockLimitExpected", - [((S_OK(LSF_KEK_BQUEUES), S_OK(LSF_LSHOSTS)), "b66", (720 * 60 / 2.5), (1440 * 60 / 2.5))], -) -def test_init(mocker, runCommandResult, lsbHosts, cpuLimitExpected, wallClockLimitExpected): - mocker.patch( - "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.runCommand", side_effect=runCommandResult - ) - mocker.patch.dict(os.environ, {"LSB_HOSTS": lsbHosts}) - - lsfResourceUsage = LSFResourceUsage() - assert lsfResourceUsage.cpuLimit == cpuLimitExpected - assert lsfResourceUsage.wallClockLimit == wallClockLimitExpected - - -@pytest.mark.parametrize( - "runCommandResult, sourceEnvResult, lsbHosts, lsfEnvdir, cpuLimitExpected, \ - wallClockLimitExpected, normrefExpected, hostnormExpected, cpuRef", - [ - ( - (S_OK(LSF_CERN_BQUEUES), S_ERROR(LSF_CERN_LSHOSTS_1), S_OK(LSF_CERN_LSINFO), S_OK(LSF_LSHOSTS)), - S_ERROR("no lsf.sh"), - "b66", - "/dev/null", - 241920, - 725760, - 1.0, - 2.5, - "KSI2K", - ) - ], -) -def test_init_cern( - mocker, - runCommandResult, - sourceEnvResult, - lsbHosts, - lsfEnvdir, - cpuLimitExpected, - wallClockLimitExpected, - normrefExpected, - hostnormExpected, - cpuRef, -): +def test_getResourceUsageCern(mocker): mocker.patch( - "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.runCommand", side_effect=runCommandResult + "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.runCommand", + side_effect=[S_OK(LSF_CERN_BQUEUES), S_ERROR(LSF_CERN_LSHOSTS_1), S_OK(LSF_CERN_LSINFO), S_OK(LSF_LSHOSTS)], ) mocker.patch("os.path.isfile", return_value=True) - mocker.patch.dict(os.environ, {"LSB_HOSTS": lsbHosts, "LSF_ENVDIR": lsfEnvdir}) mocker.patch( - "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.sourceEnv", return_value=sourceEnvResult + "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.sourceEnv", return_value=S_ERROR("no lsf.sh") ) - lsfResourceUsage = LSFResourceUsage() + lsfResourceUsage = LSFResourceUsage(1234, {"Host": "b66", "InfoPath": "/dev/null"}) + cpuLimitExpected = 241920 + wallClockLimitExpected = 725760 + normrefExpected = 1.0 + hostnormExpected = 2.5 + cpuRef = "KSI2K" + # Verify that the calculated values and references match the expected values assert lsfResourceUsage.cpuLimit == cpuLimitExpected assert lsfResourceUsage.wallClockLimit == wallClockLimitExpected assert lsfResourceUsage.cpuRef == cpuRef diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py index 17557162d70..0f08189a65d 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py @@ -70,34 +70,17 @@ """ -@pytest.mark.parametrize( - "runCommandResult, \ - cpuLimitExpected, wallClockLimitExpected", - [ - ( - [S_OK(RRCKI_OUT)], - 154967.0, - 156150.0, - ) - ], -) -def test_getResourcUsage( - mocker, - runCommandResult, - cpuLimitExpected, - wallClockLimitExpected, -): +def test_getResourcUsage(mocker): mocker.patch( "DIRAC.Resources.Computing.BatchSystems.TimeLeft.PBSResourceUsage.runCommand", - side_effect=runCommandResult, + side_effect=[S_OK(RRCKI_OUT)], ) mocker.patch("os.path.isfile", return_value=True) mocker.patch.dict(os.environ, {"PBS_O_QUEUE": "lhcb", 
"PBS_O_QPATH": "/some/path"}) - pbsRU = PBSResourceUsage() - pbsRU.jobID = "55755440.seer.t1.grid.kiae.ru" + pbsRU = PBSResourceUsage("55755440.seer.t1.grid.kiae.ru", {"Queue": "lhcb", "BinaryPath": "/some/path"}) res = pbsRU.getResourceUsage() assert res["OK"], res["Message"] assert len(res["Value"]) == 4 - assert res["Value"]["CPU"] == cpuLimitExpected # pylint: disable=invalid-sequence-index - assert res["Value"]["WallClock"] == wallClockLimitExpected # pylint: disable=invalid-sequence-index + assert res["Value"]["CPU"] == 154967.0 # pylint: disable=invalid-sequence-index + assert res["Value"]["WallClock"] == 156150.0 # pylint: disable=invalid-sequence-index diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py index 81ff9da5ab0..3aac6ddb5dc 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py @@ -4,52 +4,54 @@ import pytest from DIRAC import S_OK - -# sut from DIRAC.Resources.Computing.BatchSystems.TimeLeft.SGEResourceUsage import SGEResourceUsage -RESULT_FROM_SGE = """ -job_number: 7448711 -exec_file: job_scripts/7448711 -submission_time: Fri Feb 26 19:56:58 2021 -owner: pltlhcb001 -uid: 110476 -group: pltlhcb -gid: 110013 -sge_o_path: /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin -sge_o_host: grendel -account: sge + +RESULT_FROM_SGE = """============================================================== +job_number: 12345 +exec_file: job_scripts/12345 +submission_time: Wed Apr 11 09:36:41 2012 +owner: lhcb049 +uid: 18416 +group: lhcb +gid: 155 +sge_o_home: /home/lhcb049 +sge_o_log_name: lhcb049 +sge_o_path: /opt/sge/bin/lx24-amd64:/usr/bin:/bin +sge_o_shell: /bin/sh +sge_o_workdir: /var/glite/tmp +sge_o_host: cccreamceli05 +account: GRID=EGI SITE=IN2P3-CC TIER=tier1 VO=lhcb ROLEVOMS=&2Flhcb&2FRole=pilot&2FCapability=NULL merge: y -hard resource_list: decom=FALSE,s_vmem=3994M,h_vmem=3994M -mail_list: pltlhcb001@grendel.private.dns.zone +hard resource_list: os=sl5,s_cpu=1000,s_vmem=5120M,s_fsize=51200M,cvmfs=1,dcache=1 +mail_list: lhcb049@cccreamceli05.in2p3.fr notify: FALSE -job_name: arc60158374 -priority: -512 +job_name: cccreamceli05_crm05_749996134 +stdout_path_list: NONE:NONE:/dev/null jobshare: 0 -hard_queue_list: grid7 +hard_queue_list: huge restart: n -shell_list: NONE:/bin/sh -env_list: TERM=NONE -script_file: STDIN -parallel environment: smp range: 8 -project: grid -binding: NONE -job_type: NONE -usage 1: cpu=00:00:23, mem=0.76117 GB s, io=0.08399 GB, vmem=384.875M, maxvmem=384.875M -binding 1: NONE +shell_list: NONE:/bin/bash +env_list: SITE_NAME=IN2P3-CC,MANPATH=/opt/sge/man:/usr/share/man:/usr/local/man:/usr/local/share/man +script_file: /tmp/crm05_749996134 +project: P_lhcb_pilot +usage 1: cpu=00:01:00, mem=0.03044 GBs, io=0.19846, vmem=288.609M, maxvmem=288.609M scheduling info: (Collecting of scheduler job information is turned off) """ -@pytest.mark.parametrize( - "runCommandResult, cpuLimitExpected, wallClockLimitExpected", - [((S_OK(RESULT_FROM_SGE), S_OK("bof")), (720 * 60 / 2.5), (1440 * 60 / 2.5))], -) -def test_init(mocker, runCommandResult, cpuLimitExpected, wallClockLimitExpected): +def test_getResourceUsage(mocker): mocker.patch( - "DIRAC.Resources.Computing.BatchSystems.TimeLeft.SGEResourceUsage.runCommand", side_effect=runCommandResult + 
"DIRAC.Resources.Computing.BatchSystems.TimeLeft.SGEResourceUsage.runCommand", + return_value=S_OK(RESULT_FROM_SGE), ) sgeResourceUsage = SGEResourceUsage() res = sgeResourceUsage.getResourceUsage() - assert not res["OK"], res["Message"] + + assert res["OK"] + assert res["Value"]["CPU"] == 60 + assert res["Value"]["CPULimit"] == 1000 + # WallClock is random and don't know why, so not testing it + # assert res["Value"]["WallClock"] == 0.01 + assert res["Value"]["WallClockLimit"] == 1250 diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SLURMResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SLURMResourceUsage.py new file mode 100644 index 00000000000..575aa1363f1 --- /dev/null +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SLURMResourceUsage.py @@ -0,0 +1,56 @@ +""" Test class for SLURMResourceUsage utility +""" +import pytest + +from DIRAC import S_OK +from DIRAC.Resources.Computing.BatchSystems.TimeLeft.SLURMResourceUsage import SLURMResourceUsage + + +SLURM_OUT_SUCCESS_0 = "12345,86400,24,3600,03:00:00" +SLURM_OUT_SUCCESS_1 = "56789,86400,24,3600,4-03:00:00" +SLURM_OUT_SUCCESS_2 = "19283,9000,10,900,30:00" +SLURM_OUT_ERROR = "" + + +def test_getResourceUsageSuccess(mocker): + """Here we want to make sure that wallclock limit is correctly interpreted""" + mocker.patch( + "DIRAC.Resources.Computing.BatchSystems.TimeLeft.SLURMResourceUsage.runCommand", + side_effect=[ + S_OK(SLURM_OUT_SUCCESS_0), + S_OK(SLURM_OUT_SUCCESS_1), + S_OK(SLURM_OUT_SUCCESS_2), + S_OK(SLURM_OUT_ERROR), + ], + ) + # Get resource usage of job 12345: number of processors (24), wallclocktime limit expressed in hours + slurmResourceUsage = SLURMResourceUsage("12345", {"Queue": "Test"}) + res = slurmResourceUsage.getResourceUsage() + assert res["OK"], res["Message"] + assert res["Value"]["CPU"] == 86400 + assert res["Value"]["CPULimit"] == 259200 + assert res["Value"]["WallClock"] == 3600 + assert res["Value"]["WallClockLimit"] == 10800 + + # Get resource usage of job 56789: same number of processors (24), wallclocktime limit expressed in days + slurmResourceUsage = SLURMResourceUsage("56789", {"Queue": "Test"}) + res = slurmResourceUsage.getResourceUsage() + assert res["OK"], res["Message"] + assert res["Value"]["CPU"] == 86400 + assert res["Value"]["CPULimit"] == 8553600 + assert res["Value"]["WallClock"] == 3600 + assert res["Value"]["WallClockLimit"] == 356400 + + # Get resource usage of job 19283: different number of processors (10), wallclocktime limit expressed in minutes + slurmResourceUsage = SLURMResourceUsage("19283", {"Queue": "Test"}) + res = slurmResourceUsage.getResourceUsage() + assert res["OK"], res["Message"] + assert res["Value"]["CPU"] == 9000 + assert res["Value"]["CPULimit"] == 18000 + assert res["Value"]["WallClock"] == 900 + assert res["Value"]["WallClockLimit"] == 1800 + + # Get resource usage of job 00000: job does not exist + slurmResourceUsage = SLURMResourceUsage("0000", {"Queue": "Test"}) + res = slurmResourceUsage.getResourceUsage() + assert not res["OK"], res["Value"] diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py index 836e6336327..2a6f5e7fcac 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py @@ -1,146 +1,70 @@ """ Test TimeLeft utility - - (Partially) tested here are SGE and LSF, PBS 
is TO-DO """ - import pytest -from importlib import reload -from DIRAC import S_OK, gLogger +from DIRAC import S_OK from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft -gLogger.setLevel("DEBUG") - -SGE_OUT = """============================================================== -job_number: 12345 -exec_file: job_scripts/12345 -submission_time: Wed Apr 11 09:36:41 2012 -owner: lhcb049 -uid: 18416 -group: lhcb -gid: 155 -sge_o_home: /home/lhcb049 -sge_o_log_name: lhcb049 -sge_o_path: /opt/sge/bin/lx24-amd64:/usr/bin:/bin -sge_o_shell: /bin/sh -sge_o_workdir: /var/glite/tmp -sge_o_host: cccreamceli05 -account: GRID=EGI SITE=IN2P3-CC TIER=tier1 VO=lhcb ROLEVOMS=&2Flhcb&2FRole=pilot&2FCapability=NULL -merge: y -hard resource_list: os=sl5,s_cpu=1000,s_vmem=5120M,s_fsize=51200M,cvmfs=1,dcache=1 -mail_list: lhcb049@cccreamceli05.in2p3.fr -notify: FALSE -job_name: cccreamceli05_crm05_749996134 -stdout_path_list: NONE:NONE:/dev/null -jobshare: 0 -hard_queue_list: huge -restart: n -shell_list: NONE:/bin/bash -env_list: SITE_NAME=IN2P3-CC,MANPATH=/opt/sge/man:/usr/share/man:/usr/local/man:/usr/local/share/man -script_file: /tmp/crm05_749996134 -project: P_lhcb_pilot -usage 1: cpu=00:01:00, mem=0.03044 GBs, io=0.19846, vmem=288.609M, maxvmem=288.609M -scheduling info: (Collecting of scheduler job information is turned off)""" - -PBS_OUT = "bla" - -LSF_OUT = ( - "JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME PROJ_NAME CPU_USED MEM" - " SWAP PIDS START_TIME FINISH_TIME\n" - "12345 user RUN q1 host1 p01 job1 12/31-20:51:42 default" - " 00:00:60.00 6267 40713 25469,14249 12/31-20:52:00 -" -) - -MJF_OUT = "0" - -SLURM_OUT_0 = "12345,86400,24,3600,03:00:00" -SLURM_OUT_1 = "12345,86400,24,3600,4-03:00:00" -SLURM_OUT_2 = "12345,21600,24,900,30:00" -SLURM_OUT_3 = "12345,43200,24,1800,30:00" -SLURM_OUT_4 = "" - -HTCONDOR_OUT_0 = "86400 3600" -HTCONDOR_OUT_1 = "undefined 3600" -HTCONDOR_OUT_2 = "" - - -@pytest.mark.parametrize( - "batch, requiredVariables, returnValue, expected", - [ - ("LSF", {}, LSF_OUT, 0.0), - ("LSF", {"bin": "/usr/bin", "hostNorm": 10.0}, LSF_OUT, 0.0), - ("MJF", {}, MJF_OUT, 0.0), - ("SGE", {}, SGE_OUT, 300.0), - ("SLURM", {}, SLURM_OUT_0, 432000.0), - ("SLURM", {}, SLURM_OUT_1, 432000.0), - ("SLURM", {}, SLURM_OUT_2, 108000.0), - ("SLURM", {}, SLURM_OUT_3, 216000.0), - ("SLURM", {}, SLURM_OUT_4, 0.0), - ("HTCondor", {}, HTCONDOR_OUT_0, 18000.0), - ("HTCondor", {}, HTCONDOR_OUT_1, 0.0), - ("HTCondor", {}, HTCONDOR_OUT_2, 0.0), - ], -) -def test_getScaledCPU(mocker, batch, requiredVariables, returnValue, expected): +def test_cpuPowerNotDefined(mocker): + """Test cpuPower not defined""" + mocker.patch("DIRAC.gConfig.getSections", return_value={"Type": "SLURM", "JobID": "12345", "Parameters": {}}) + + tl = TimeLeft() + res = tl.getTimeLeft() + assert not res["OK"] + assert "/LocalSite/CPUNormalizationFactor not defined" in res["Message"] + + +def test_batchSystemNotDefined(mocker): + """Test batch system not defined""" + mocker.patch("DIRAC.gConfig.getSections", return_value={}) + + tl = TimeLeft() + tl.cpuPower = 10 + res = tl.getTimeLeft() + assert not res["OK"] + assert res["Message"] == "Current batch system is not supported" + + +def test_getScaledCPU(mocker): """Test getScaledCPU()""" - mocker.patch("DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft.runCommand", return_value=S_OK(returnValue)) + mocker.patch( + "DIRAC.Resources.Computing.BatchSystems.TimeLeft.SLURMResourceUsage.runCommand", + return_value=S_OK("19283,9000,10,900,30:00"), + ) + 
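+    # runCommand is mocked with a SLURM accounting record: JobID, consumed CPU time (s), allocated processors, elapsed wall clock (s), wall clock limit (cf. Test_SLURMResourceUsage)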
mocker.patch("DIRAC.gConfig.getSections", return_value={"Type": "SLURM", "JobID": "12345", "Parameters": {}}) + tl = TimeLeft() + + # Test 1: no normalization res = tl.getScaledCPU() assert res == 0 + # Test 2: normalization tl.cpuPower = 5.0 + res = tl.getScaledCPU() + assert res == 45000 - batchSystemName = f"{batch}ResourceUsage" - batchSystemPath = f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{batchSystemName}" - batchPlugin = __import__(batchSystemPath, globals(), locals(), [batchSystemName]) # pylint: disable=unused-variable - # Need to be reloaded to update the mock within the module, else, it will reuse the one when loaded the first time - reload(batchPlugin) - - batchStr = f"batchPlugin.{batchSystemName}()" - tl.batchPlugin = eval(batchStr) - - # Update attributes of the batch systems to get scaled CPU - tl.batchPlugin.__dict__.update(requiredVariables) - res = tl.getScaledCPU() - assert res == expected - - -@pytest.mark.parametrize( - "batch, requiredVariables, returnValue, expected_1, expected_2", - [ - ("LSF", {"bin": "/usr/bin", "hostNorm": 10.0, "cpuLimit": 1000, "wallClockLimit": 1000}, LSF_OUT, True, 9400.0), - ("SGE", {}, SGE_OUT, True, 9400.0), - ("SLURM", {}, SLURM_OUT_0, True, 72000.0), - ("SLURM", {}, SLURM_OUT_1, True, 3528000.0), - ("SLURM", {}, SLURM_OUT_2, True, 9000.0), - ("SLURM", {}, SLURM_OUT_3, True, 0.0), - ("SLURM", {}, SLURM_OUT_4, False, 0.0), - ("HTCondor", {}, HTCONDOR_OUT_0, True, 828000), - ("HTCondor", {}, HTCONDOR_OUT_1, False, 0.0), - ("HTCondor", {}, HTCONDOR_OUT_2, False, 0.0), - ], -) -def test_getTimeLeft(mocker, batch, requiredVariables, returnValue, expected_1, expected_2): +def test_getTimeLeft(mocker): """Test getTimeLeft()""" - mocker.patch("DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft.runCommand", return_value=S_OK(returnValue)) - tl = TimeLeft() + mocker.patch( + "DIRAC.Resources.Computing.BatchSystems.TimeLeft.SLURMResourceUsage.runCommand", + return_value=S_OK("19283,9000,10,900,30:00"), + ) + mocker.patch("DIRAC.gConfig.getSections", return_value={"Type": "SLURM", "JobID": "12345", "Parameters": {}}) - batchSystemName = f"{batch}ResourceUsage" - batchSystemPath = f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{batchSystemName}" - batchPlugin = __import__(batchSystemPath, globals(), locals(), [batchSystemName]) - # Need to be reloaded to update the mock within the module, else, it will reuse the one when loaded the first time - reload(batchPlugin) + tl = TimeLeft() - batchStr = f"batchPlugin.{batchSystemName}()" - tl.batchPlugin = eval(batchStr) + # Test 1: CPU power = 10 tl.cpuPower = 10.0 + res = tl.getTimeLeft() + assert res["OK"] + assert res["Value"] == 9000 - # Update attributes of the batch systems to get scaled CPU - tl.batchPlugin.__dict__.update(requiredVariables) - + # Test 2: CPU power = 15 + tl.cpuPower = 15.0 res = tl.getTimeLeft() - assert res["OK"] is expected_1 - if res["OK"]: - assert res["Value"] == expected_2 + assert res["OK"] + assert res["Value"] == 13500 From 281473a3e8f09ad921deaa6ec7ca708927c14d49 Mon Sep 17 00:00:00 2001 From: aldbr Date: Fri, 10 Nov 2023 15:53:37 +0100 Subject: [PATCH 2/3] feat: add batchSystem type in jobParameters --- .../BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py | 1 - .../BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py | 2 +- .../Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py | 2 +- src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py | 3 +++ 4 files changed, 5 insertions(+), 3 deletions(-) diff --git 
a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py index 0f08189a65d..30b2097f9bb 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py @@ -76,7 +76,6 @@ def test_getResourcUsage(mocker): side_effect=[S_OK(RRCKI_OUT)], ) mocker.patch("os.path.isfile", return_value=True) - mocker.patch.dict(os.environ, {"PBS_O_QUEUE": "lhcb", "PBS_O_QPATH": "/some/path"}) pbsRU = PBSResourceUsage("55755440.seer.t1.grid.kiae.ru", {"Queue": "lhcb", "BinaryPath": "/some/path"}) res = pbsRU.getResourceUsage() diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py index 3aac6ddb5dc..70461e091c0 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py @@ -46,7 +46,7 @@ def test_getResourceUsage(mocker): return_value=S_OK(RESULT_FROM_SGE), ) - sgeResourceUsage = SGEResourceUsage() + sgeResourceUsage = SGEResourceUsage("1234", {"Queue": "Test"}) res = sgeResourceUsage.getResourceUsage() assert res["OK"] diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py index 2a6f5e7fcac..b9c56e559a7 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py @@ -24,7 +24,7 @@ def test_batchSystemNotDefined(mocker): tl.cpuPower = 10 res = tl.getTimeLeft() assert not res["OK"] - assert res["Message"] == "Current batch system is not supported" + assert "Current batch system is not supported" in res["Message"] def test_getScaledCPU(mocker): diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py index be86ab2001d..0022c36fc25 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py @@ -286,6 +286,9 @@ def execute(self): if queue: self.jobReport.setJobParameter(par_name="CEQueue", par_value=queue, sendFlag=False) + if batchSystem := gConfig.getValue("/LocalSite/BatchSystem/Type", ""): + self.jobReport.setJobParameter(par_name="BatchSystem", par_value=batchSystem, sendFlag=False) + self.log.debug(f"Before self._submitJob() ({self.ceName}CE)") result = self._submitJob( jobID=jobID, From db93b46be366b5aabe31a5ccde9276bc7760f7fe Mon Sep 17 00:00:00 2001 From: aldbr Date: Thu, 7 Dec 2023 11:16:37 +0100 Subject: [PATCH 3/3] fix: make TimeLeft backward compatible --- .../BatchSystems/TimeLeft/MJFResourceUsage.py | 2 - .../BatchSystems/TimeLeft/TimeLeft.py | 42 +++++++++++++++++++ .../TimeLeft/test/Test_TimeLeft.py | 19 ++++++++- 3 files changed, 59 insertions(+), 4 deletions(-) diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py index b419bea9b5f..5027813e180 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py @@ -19,8 +19,6 @@ def __init__(self, jobID, parameters): """Standard constructor""" 
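        # jobID, queue and host are set by the ResourceUsage base class from the constructor arguments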
super().__init__("MJF", jobID, parameters) - self.queue = os.environ.get("QUEUE") - self.log.verbose(f"jobID={self.jobID}, queue={self.queue}") self.startTime = time.time() diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py index 91f4573c208..2499c55efc4 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py @@ -9,6 +9,7 @@ With this information the utility can calculate in normalized units the CPU time remaining for a given slot. """ +import os import shlex import DIRAC @@ -136,6 +137,47 @@ def _getBatchSystemPlugin(self): jobID = batchSystemInfo.get("JobID") parameters = batchSystemInfo.get("Parameters") + ########################################################################################### + # TODO: remove the following block in v9.0 + # This is a temporary fix for the case where the batch system is not set in the configuration + if not type: + self.log.warn( + "Batch system info not set in local configuration: trying to guess from environment variables." + ) + self.log.warn("Consider updating your pilot version before switching to v9.0.") + batchSystemInfo = { + "LSF": { + "JobID": "LSB_JOBID", + "Parameters": { + "BinaryPath": "LSB_BINDIR", + "Host": "LSB_HOSTS", + "InfoPath": "LSB_ENVDIR", + "Queue": "LSB_QUEUE", + }, + }, + "PBS": {"JobID": "PBS_JOBID", "Parameters": {"BinaryPath": "PBS_O_PATH", "Queue": "PBS_O_QUEUE"}}, + "SGE": {"JobID": "SGE_TASK_ID", "Parameters": {"BinaryPath": "SGE_BINARY_PATH", "Queue": "QUEUE"}}, + "SLURM": {"JobID": "SLURM_JOB_ID", "Parameters": {}}, + "HTCondor": {"JobID": "HTCONDOR_JOBID", "Parameters": {"InfoPath": "_CONDOR_JOB_AD"}}, + } + type = None + for batchSystem, attributes in batchSystemInfo.items(): + if attributes["JobID"] in os.environ: + type = batchSystem + jobID = os.environ[attributes["JobID"]] + parameters = {} + for parameterName, parameterVariable in attributes["Parameters"].items(): + parameters[parameterName] = os.environ.get(parameterVariable) + break + + if not type and "MACHINEFEATURES" in os.environ and "JOBFEATURES" in os.environ: + # Only use MJF if legacy batch system information not available for now + type = "MJF" + jobID = os.environ.get("JOB_ID") + parameters = {"Queue": os.environ.get("QUEUE")} + + ########################################################################################### + if not type or type == "Unknown": self.log.warn(f"Batch system type for site {DIRAC.siteName()} is not currently supported") return S_ERROR(DErrno.ERESUNK, "Current batch system is not supported") diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py index b9c56e559a7..b2093caa938 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py @@ -1,7 +1,5 @@ """ Test TimeLeft utility """ -import pytest - from DIRAC import S_OK from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft @@ -27,6 +25,23 @@ def test_batchSystemNotDefined(mocker): assert "Current batch system is not supported" in res["Message"] +def test_batchSystemNotDefinedInConfigButInEnvironmentVariables(mocker, monkeypatch): + """Test batch system not defined but present in environment variables (should fail from v9.0)""" + mocker.patch( + 
"DIRAC.Resources.Computing.BatchSystems.TimeLeft.HTCondorResourceUsage.runCommand", + return_value=S_OK("9000 800"), + ) + mocker.patch("DIRAC.gConfig.getSections", return_value={}) + monkeypatch.setenv("HTCONDOR_JOBID", "12345.0") + monkeypatch.setenv("_CONDOR_JOB_AD", "/path/to/config") + + tl = TimeLeft() + tl.cpuPower = 10 + res = tl.getTimeLeft() + assert res["OK"] + assert res["Value"] == 82000 + + def test_getScaledCPU(mocker): """Test getScaledCPU()""" mocker.patch(