diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/HTCondorResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/HTCondorResourceUsage.py
index 4cbed4906e3..fc83c1f4b8f 100644
--- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/HTCondorResourceUsage.py
+++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/HTCondorResourceUsage.py
@@ -17,9 +17,9 @@ class HTCondorResourceUsage(ResourceUsage):
     allow us to get an estimation of the resources usage.
     """

-    def __init__(self):
+    def __init__(self, jobID, parameters):
         """Standard constructor"""
-        super().__init__("HTCondor", "_CONDOR_JOB_AD")
+        super().__init__("HTCondor", jobID, parameters)

     def getResourceUsage(self):
         """Returns S_OK with a dictionary containing the entries WallClock, WallClockLimit, and Unit for current slot."""
@@ -29,8 +29,7 @@ def getResourceUsage(self):
         # only present on some Sites
         # - CurrentTime: current time
         # - JobCurrentStartDate: start of the job execution
-        jobDescription = os.environ.get("_CONDOR_JOB_AD")
-        cmd = f"condor_status -ads {jobDescription} -af MaxRuntime CurrentTime-JobCurrentStartDate"
+        cmd = f"condor_status -ads {self.info_path} -af MaxRuntime CurrentTime-JobCurrentStartDate"
         result = runCommand(cmd)
         if not result["OK"]:
             return S_ERROR("Current batch system is not supported")
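
The plugin now takes the ClassAd location from `self.info_path` (filled from the configuration) instead of the `_CONDOR_JOB_AD` environment variable. For context, here is a minimal sketch of how the two space-separated fields printed by the `condor_status` query could be turned into the returned dictionary; `parseCondorStatusOutput` is a hypothetical helper for illustration, not code from this patch:

```python
# Minimal sketch, assuming the command prints "MaxRuntime elapsed", e.g. "86400 3600".
from DIRAC import S_ERROR, S_OK


def parseCondorStatusOutput(output):
    """Hypothetical parser for the condor_status output mocked in the tests below."""
    fields = output.split()
    if len(fields) != 2 or not all(field.isdigit() for field in fields):
        # An empty reply and "undefined 3600" both end up here
        return S_ERROR("Current batch system is not supported")
    wallClockLimit, wallClock = int(fields[0]), int(fields[1])
    return S_OK({"WallClock": wallClock, "WallClockLimit": wallClockLimit, "Unit": "WallClock"})
```
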
["./lsf", "&& export LSF_CONFDIR"]) if ret["OK"]: @@ -170,7 +167,7 @@ def __init__(self): # Now get the Normalization for the current Host if self.host: - cmd = f"{self.bin}/lshosts -w {self.host}" + cmd = f"{self.binary_path}/lshosts -w {self.host}" result = runCommand(cmd) if result["OK"]: lines = str(result["Value"]).split("\n") @@ -201,7 +198,7 @@ def getResourceUsage(self): """Returns S_OK with a dictionary containing the entries CPU, CPULimit, WallClock, WallClockLimit, and Unit for current slot. """ - if not self.bin: + if not self.binary_path: return S_ERROR("Could not determine bin directory for LSF") if not self.hostNorm: return S_ERROR("Could not determine host Norm factor") @@ -209,7 +206,7 @@ def getResourceUsage(self): cpu = None wallClock = None - cmd = f"{self.bin}/bjobs -W {self.jobID}" + cmd = f"{self.binary_path}/bjobs -W {self.jobID}" result = runCommand(cmd) if not result["OK"]: return result diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/PBSResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/PBSResourceUsage.py index 5eff109361f..fa28ef00937 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/PBSResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/PBSResourceUsage.py @@ -15,14 +15,12 @@ class PBSResourceUsage(ResourceUsage): This is the PBS plugin of the TimeLeft Utility """ - def __init__(self): + def __init__(self, jobID, parameters): """Standard constructor""" - super().__init__("PBS", "PBS_JOBID") + super().__init__("PBS", jobID, parameters) - self.queue = os.environ.get("PBS_O_QUEUE") - pbsPath = os.environ.get("PBS_O_PATH") - if pbsPath: - os.environ["PATH"] += ":" + pbsPath + if self.binary_path and self.binary_path != "Unknown": + os.environ["PATH"] += ":" + self.binary_path self.log.verbose(f"PBS_JOBID={self.jobID}, PBS_O_QUEUE={self.queue}") self.startTime = time.time() diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py index 0fa9483e188..85b260a6e0e 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py @@ -12,10 +12,16 @@ class ResourceUsage: (e.g. 
diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py
index 0fa9483e188..85b260a6e0e 100644
--- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py
+++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py
@@ -12,10 +12,16 @@ class ResourceUsage:
     (e.g. getting the time left in a Pilot)
     """

-    def __init__(self, batchSystemName, jobIdEnvVar):
+    def __init__(self, batchSystemName, jobID, parameters):
         """Standard constructor"""
         self.log = gLogger.getSubLogger(f"{batchSystemName}ResourceUsage")
-        self.jobID = os.environ.get(jobIdEnvVar)
+        self.jobID = jobID
+
+        # Parameters
+        self.binary_path = parameters.get("BinaryPath")
+        self.info_path = parameters.get("InfoPath")
+        self.host = parameters.get("Host")
+        self.queue = parameters.get("Queue")

     def getResourceUsage(self):
         """Returns S_OK with a dictionary that can contain entries:
diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SGEResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SGEResourceUsage.py
index ea6bc090e02..95e21fcf5ba 100644
--- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SGEResourceUsage.py
+++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SGEResourceUsage.py
@@ -16,14 +16,12 @@ class SGEResourceUsage(ResourceUsage):
     This is the SGE plugin of the TimeLeft Utility
     """

-    def __init__(self):
+    def __init__(self, jobID, parameters):
         """Standard constructor"""
-        super().__init__("SGE", "JOB_ID")
+        super().__init__("SGE", jobID, parameters)

-        self.queue = os.environ.get("QUEUE")
-        sgePath = os.environ.get("SGE_BINARY_PATH")
-        if sgePath:
-            os.environ["PATH"] += ":" + sgePath
+        if self.binary_path and self.binary_path != "Unknown":
+            os.environ["PATH"] += ":" + self.binary_path

         self.log.verbose(f"JOB_ID={self.jobID}, QUEUE={self.queue}")
         self.startTime = time.time()
diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SLURMResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SLURMResourceUsage.py
index 05e96a3ac14..09a6d26bef5 100644
--- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SLURMResourceUsage.py
+++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SLURMResourceUsage.py
@@ -11,9 +11,9 @@ class SLURMResourceUsage(ResourceUsage):
     This is the SLURM plugin of the TimeLeft Utility
     """

-    def __init__(self):
+    def __init__(self, jobID, parameters):
         """Standard constructor"""
-        super().__init__("SLURM", "SLURM_JOB_ID")
+        super().__init__("SLURM", jobID, parameters)

         self.log.verbose(f"JOB_ID={self.jobID}")
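
All plugins now share the same constructor signature, and the base class reads four generic keys from the `parameters` dictionary, which mirrors what is published under `/LocalSite/BatchSystem/Parameters`. A sketch of the assumed shape, with made-up values (SLURM is used here because its constructor only logs and runs no commands):

```python
# Assumed shape of the parameters dictionary; the four keys are the ones read
# by ResourceUsage.__init__, and all values below are illustrative only.
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.SLURMResourceUsage import SLURMResourceUsage

parameters = {
    "BinaryPath": "/usr/bin",
    "InfoPath": "Unknown",
    "Host": "worker01",
    "Queue": "grid",
}

slurmResourceUsage = SLURMResourceUsage("12345", parameters)
assert slurmResourceUsage.queue == "grid"
```
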
diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py
index 30013df9e3d..ecb5d8ddf83 100644
--- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py
+++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py
@@ -15,6 +15,7 @@

 import DIRAC
 from DIRAC import gLogger, gConfig, S_OK, S_ERROR
+from DIRAC.Core.Utilities import DErrno
 from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
 from DIRAC.Core.Utilities.Subprocess import systemCall

@@ -30,7 +31,7 @@ def __init__(self):
         if not self.cpuPower:
             self.log.warn(f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}")

-        result = self.__getBatchSystemPlugin()
+        result = self._getBatchSystemPlugin()
         if result["OK"]:
             self.batchPlugin = result["Value"]
         else:
@@ -66,7 +67,9 @@ def getTimeLeft(self, cpuConsumed=0.0, processors=1):
         """
         # Quit if no norm factor available
         if not self.cpuPower:
-            return S_ERROR(f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}")
+            return S_ERROR(
+                DErrno.ESECTION, f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}"
+            )

         if not self.batchPlugin:
             return S_ERROR(self.batchError)
@@ -127,33 +130,24 @@ def getTimeLeft(self, cpuConsumed=0.0, processors=1):
         self.log.verbose(f"Remaining CPU in normalized units is: {cpuWorkLeft:.2f}")
         return S_OK(cpuWorkLeft)

-    def __getBatchSystemPlugin(self):
+    def _getBatchSystemPlugin(self):
         """Using the name of the batch system plugin, will return an instance of the plugin class."""
-        batchSystems = {
-            "LSF": "LSB_JOBID",
-            "PBS": "PBS_JOBID",
-            "BQS": "QSUB_REQNAME",
-            "SGE": "SGE_TASK_ID",
-            "SLURM": "SLURM_JOB_ID",
-            "HTCondor": "_CONDOR_JOB_AD",
-        }  # more to be added later
-        name = None
-        for batchSystem, envVar in batchSystems.items():
-            if envVar in os.environ:
-                name = batchSystem
-                break
-
-        if name is None:
+        batchSystemInfo = gConfig.getSections("/LocalSite/BatchSystem")
+        type = batchSystemInfo.get("Type")
+        jobID = batchSystemInfo.get("JobID")
+        parameters = batchSystemInfo.get("Parameters")
+
+        if not type or type == "Unknown":
             self.log.warn(f"Batch system type for site {DIRAC.siteName()} is not currently supported")
-            return S_ERROR("Current batch system is not supported")
+            return S_ERROR(DErrno.ERESUNK, "Current batch system is not supported")

-        self.log.debug(f"Creating plugin for {name} batch system")
+        self.log.debug(f"Creating plugin for {type} batch system")

-        result = ObjectLoader().loadObject(f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{name}ResourceUsage")
+        result = ObjectLoader().loadObject(f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{type}ResourceUsage")
         if not result["OK"]:
             return result

         batchClass = result["Value"]
-        batchInstance = batchClass()
+        batchInstance = batchClass(jobID, parameters)

         return S_OK(batchInstance)
diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_HTCondorResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_HTCondorResourceUsage.py
new file mode 100644
index 00000000000..f481b487857
--- /dev/null
+++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_HTCondorResourceUsage.py
@@ -0,0 +1,38 @@
+""" Test class for HTCondorResourceUsage utility
+"""
+
+import pytest
+
+from DIRAC import S_OK
+from DIRAC.Resources.Computing.BatchSystems.TimeLeft.HTCondorResourceUsage import HTCondorResourceUsage
+
+
+HTCONDOR_OUT_0 = "86400 3600"
+HTCONDOR_OUT_1 = "undefined 3600"
+HTCONDOR_OUT_2 = ""
+
+
+def test_getResourceUsage(mocker):
+    mocker.patch(
+        "DIRAC.Resources.Computing.BatchSystems.TimeLeft.HTCondorResourceUsage.runCommand",
+        side_effect=[S_OK(HTCONDOR_OUT_0), S_OK(HTCONDOR_OUT_1), S_OK(HTCONDOR_OUT_2)],
+    )
+
+    # First test: everything is fine
+    htcondorResourceUsage = HTCondorResourceUsage("1234", {"Queue": "Test", "InfoPath": "/path/to/condor_ad"})
+    res = htcondorResourceUsage.getResourceUsage()
+    assert res["OK"], res["Message"]
+    assert res["Value"]["WallClock"] == 3600
+    assert res["Value"]["WallClockLimit"] == 86400
+
+    # Second test: MaxRuntime is undefined
+    htcondorResourceUsage = HTCondorResourceUsage("1234", {"Queue": "Test", "InfoPath": "/path/to/condor_ad"})
+    res = htcondorResourceUsage.getResourceUsage()
+    assert not res["OK"]
+    assert res["Message"] == "Current batch system is not supported"
+
+    # Third test: empty output
+    htcondorResourceUsage = HTCondorResourceUsage("1234", {"Queue": "Test", "InfoPath": "/path/to/condor_ad"})
+    res = htcondorResourceUsage.getResourceUsage()
+    assert not res["OK"]
+    assert res["Message"] == "Current batch system is not supported"
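
The test above relies on the `mocker` fixture from the pytest-mock plugin: `side_effect` hands out one canned output per `runCommand` call, which is why a fresh `HTCondorResourceUsage` instance is built before each scenario. The pattern in isolation (the command strings are placeholders):

```python
# The mocking pattern shared by all the tests in this patch: each call to the
# patched function consumes the next entry of side_effect, in order.
from DIRAC import S_OK


def test_sideEffectOrder(mocker):
    mocked = mocker.patch(
        "DIRAC.Resources.Computing.BatchSystems.TimeLeft.HTCondorResourceUsage.runCommand",
        side_effect=[S_OK("86400 3600"), S_OK("")],
    )
    assert mocked("first call")["Value"] == "86400 3600"  # first canned reply
    assert mocked("second call")["Value"] == ""           # second canned reply
    assert mocked.call_count == 2
```
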
diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_LSFResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_LSFResourceUsage.py
index 3b03a6ff3a6..653265f1962 100644
--- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_LSFResourceUsage.py
+++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_LSFResourceUsage.py
@@ -1,13 +1,13 @@
 """ Test class for LSFResourceUsage utility
 """

-import os
 import pytest

 from DIRAC import S_OK, S_ERROR
 from DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage import LSFResourceUsage

+# Sample outputs for LSF batch system commands
 LSF_KEK_BQUEUES = """
  CPULIMIT
   720.0 min
@@ -19,6 +19,23 @@
 b66 SLC6_64 i6_16 2.5 16 29M 19M Yes (intel share aishare cvmfs wan exe lcg wigner slot15)
 """

+
+def test_getResourceUsageBasic(mocker):
+    mocker.patch(
+        "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.runCommand",
+        side_effect=[S_OK(LSF_KEK_BQUEUES), S_OK(LSF_LSHOSTS)],
+    )
+
+    lsfResourceUsage = LSFResourceUsage("1234", {"Host": "b66", "InfoPath": "Unknown"})
+    cpuLimitExpected = 720 * 60 / 2.5
+    wallClockLimitExpected = 1440 * 60 / 2.5
+
+    # Verify that the calculated limits match the expected values
+    assert lsfResourceUsage.cpuLimit == cpuLimitExpected
+    assert lsfResourceUsage.wallClockLimit == wallClockLimitExpected
+
+
+# Additional test data for a more specific setup (e.g., CERN)
 LSF_CERN_BQUEUES = """
  CPULIMIT
   10080.0 min of KSI2K
@@ -26,72 +43,33 @@
   30240.0 min of KSI2K
 """

-# returns with S_ERROR
 LSF_CERN_LSHOSTS_1 = """KSI2K: unknown host name.
 """

-# shortened
 LSF_CERN_LSINFO = """MODEL_NAME CPU_FACTOR ARCHITECTURE
 i6_12_62d7h20_266 3.06
 ai_intel_8 2.44
 """

-@pytest.mark.parametrize(
-    "runCommandResult, lsbHosts, cpuLimitExpected, wallClockLimitExpected",
-    [((S_OK(LSF_KEK_BQUEUES), S_OK(LSF_LSHOSTS)), "b66", (720 * 60 / 2.5), (1440 * 60 / 2.5))],
-)
-def test_init(mocker, runCommandResult, lsbHosts, cpuLimitExpected, wallClockLimitExpected):
-    mocker.patch(
-        "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.runCommand", side_effect=runCommandResult
-    )
-    mocker.patch.dict(os.environ, {"LSB_HOSTS": lsbHosts})
-
-    lsfResourceUsage = LSFResourceUsage()
-    assert lsfResourceUsage.cpuLimit == cpuLimitExpected
-    assert lsfResourceUsage.wallClockLimit == wallClockLimitExpected
-
-
-@pytest.mark.parametrize(
-    "runCommandResult, sourceEnvResult, lsbHosts, lsfEnvdir, cpuLimitExpected, \
-    wallClockLimitExpected, normrefExpected, hostnormExpected, cpuRef",
-    [
-        (
-            (S_OK(LSF_CERN_BQUEUES), S_ERROR(LSF_CERN_LSHOSTS_1), S_OK(LSF_CERN_LSINFO), S_OK(LSF_LSHOSTS)),
-            S_ERROR("no lsf.sh"),
-            "b66",
-            "/dev/null",
-            241920,
-            725760,
-            1.0,
-            2.5,
-            "KSI2K",
-        )
-    ],
-)
-def test_init_cern(
-    mocker,
-    runCommandResult,
-    sourceEnvResult,
-    lsbHosts,
-    lsfEnvdir,
-    cpuLimitExpected,
-    wallClockLimitExpected,
-    normrefExpected,
-    hostnormExpected,
-    cpuRef,
-):
+def test_getResourceUsageCern(mocker):
     mocker.patch(
-        "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.runCommand", side_effect=runCommandResult
+        "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.runCommand",
+        side_effect=[S_OK(LSF_CERN_BQUEUES), S_ERROR(LSF_CERN_LSHOSTS_1), S_OK(LSF_CERN_LSINFO), S_OK(LSF_LSHOSTS)],
     )
     mocker.patch("os.path.isfile", return_value=True)
-    mocker.patch.dict(os.environ, {"LSB_HOSTS": lsbHosts, "LSF_ENVDIR": lsfEnvdir})
     mocker.patch(
-        "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.sourceEnv", return_value=sourceEnvResult
+        "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.sourceEnv", return_value=S_ERROR("no lsf.sh")
     )

-    lsfResourceUsage = LSFResourceUsage()
+    lsfResourceUsage = LSFResourceUsage("1234", {"Host": "b66", "InfoPath": "/dev/null"})
+    cpuLimitExpected = 241920
+    wallClockLimitExpected = 725760
+    normrefExpected = 1.0
+    hostnormExpected = 2.5
+    cpuRef = "KSI2K"

+    # Verify that the calculated values and references match the expected values
     assert lsfResourceUsage.cpuLimit == cpuLimitExpected
     assert lsfResourceUsage.wallClockLimit == wallClockLimitExpected
     assert lsfResourceUsage.cpuRef == cpuRef
diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py
index 17557162d70..30b2097f9bb 100644
--- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py
+++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py
@@ -70,34 +70,16 @@
 """

-@pytest.mark.parametrize(
-    "runCommandResult, \
-    cpuLimitExpected, wallClockLimitExpected",
-    [
-        (
-            [S_OK(RRCKI_OUT)],
-            154967.0,
-            156150.0,
-        )
-    ],
-)
-def test_getResourcUsage(
-    mocker,
-    runCommandResult,
-    cpuLimitExpected,
-    wallClockLimitExpected,
-):
+def test_getResourceUsage(mocker):
     mocker.patch(
         "DIRAC.Resources.Computing.BatchSystems.TimeLeft.PBSResourceUsage.runCommand",
-        side_effect=runCommandResult,
+        side_effect=[S_OK(RRCKI_OUT)],
     )
     mocker.patch("os.path.isfile", return_value=True)
-    mocker.patch.dict(os.environ, {"PBS_O_QUEUE": "lhcb", "PBS_O_QPATH": "/some/path"})

-    pbsRU = PBSResourceUsage()
-    pbsRU.jobID = "55755440.seer.t1.grid.kiae.ru"
+    pbsRU = PBSResourceUsage("55755440.seer.t1.grid.kiae.ru", {"Queue": "lhcb", "BinaryPath": "/some/path"})
     res = pbsRU.getResourceUsage()
     assert res["OK"], res["Message"]
     assert len(res["Value"]) == 4
-    assert res["Value"]["CPU"] == cpuLimitExpected  # pylint: disable=invalid-sequence-index
-    assert res["Value"]["WallClock"] == wallClockLimitExpected  # pylint: disable=invalid-sequence-index
+    assert res["Value"]["CPU"] == 154967.0  # pylint: disable=invalid-sequence-index
+    assert res["Value"]["WallClock"] == 156150.0  # pylint: disable=invalid-sequence-index
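
For the record, the KEK expectations in `test_getResourceUsageBasic` follow directly from the mocked command outputs: `bqueues` reports limits in minutes, `lshosts` reports a CPU factor of 2.5 for host b66, and the full `LSF_KEK_BQUEUES` fixture (truncated in the diff context above) evidently also carries a 1440-minute RUNLIMIT:

```python
# Worked arithmetic behind the expected values in test_getResourceUsageBasic.
cpuLimitMinutes = 720.0    # CPULIMIT from the mocked bqueues output
runLimitMinutes = 1440.0   # RUNLIMIT, assumed from the expected value
cpuFactor = 2.5            # factor of host b66 in the mocked lshosts output

assert cpuLimitMinutes * 60 / cpuFactor == 17280.0  # cpuLimit, in normalized seconds
assert runLimitMinutes * 60 / cpuFactor == 34560.0  # wallClockLimit, in normalized seconds
```
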
diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py
index 81ff9da5ab0..70461e091c0 100644
--- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py
+++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py
@@ -4,52 +4,54 @@
 import pytest

 from DIRAC import S_OK
-
-# sut
 from DIRAC.Resources.Computing.BatchSystems.TimeLeft.SGEResourceUsage import SGEResourceUsage

-RESULT_FROM_SGE = """
-job_number: 7448711
-exec_file: job_scripts/7448711
-submission_time: Fri Feb 26 19:56:58 2021
-owner: pltlhcb001
-uid: 110476
-group: pltlhcb
-gid: 110013
-sge_o_path: /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin
-sge_o_host: grendel
-account: sge
+
+RESULT_FROM_SGE = """==============================================================
+job_number: 12345
+exec_file: job_scripts/12345
+submission_time: Wed Apr 11 09:36:41 2012
+owner: lhcb049
+uid: 18416
+group: lhcb
+gid: 155
+sge_o_home: /home/lhcb049
+sge_o_log_name: lhcb049
+sge_o_path: /opt/sge/bin/lx24-amd64:/usr/bin:/bin
+sge_o_shell: /bin/sh
+sge_o_workdir: /var/glite/tmp
+sge_o_host: cccreamceli05
+account: GRID=EGI SITE=IN2P3-CC TIER=tier1 VO=lhcb ROLEVOMS=&2Flhcb&2FRole=pilot&2FCapability=NULL
 merge: y
-hard resource_list: decom=FALSE,s_vmem=3994M,h_vmem=3994M
-mail_list: pltlhcb001@grendel.private.dns.zone
+hard resource_list: os=sl5,s_cpu=1000,s_vmem=5120M,s_fsize=51200M,cvmfs=1,dcache=1
+mail_list: lhcb049@cccreamceli05.in2p3.fr
 notify: FALSE
-job_name: arc60158374
-priority: -512
+job_name: cccreamceli05_crm05_749996134
+stdout_path_list: NONE:NONE:/dev/null
 jobshare: 0
-hard_queue_list: grid7
+hard_queue_list: huge
 restart: n
-shell_list: NONE:/bin/sh
-env_list: TERM=NONE
-script_file: STDIN
-parallel environment: smp range: 8
-project: grid
-binding: NONE
-job_type: NONE
-usage 1: cpu=00:00:23, mem=0.76117 GB s, io=0.08399 GB, vmem=384.875M, maxvmem=384.875M
-binding 1: NONE
+shell_list: NONE:/bin/bash
+env_list: SITE_NAME=IN2P3-CC,MANPATH=/opt/sge/man:/usr/share/man:/usr/local/man:/usr/local/share/man
+script_file: /tmp/crm05_749996134
+project: P_lhcb_pilot
+usage 1: cpu=00:01:00, mem=0.03044 GBs, io=0.19846, vmem=288.609M, maxvmem=288.609M
 scheduling info: (Collecting of scheduler job information is turned off)
 """

-@pytest.mark.parametrize(
-    "runCommandResult, cpuLimitExpected, wallClockLimitExpected",
-    [((S_OK(RESULT_FROM_SGE), S_OK("bof")), (720 * 60 / 2.5), (1440 * 60 / 2.5))],
-)
-def test_init(mocker, runCommandResult, cpuLimitExpected, wallClockLimitExpected):
+def test_getResourceUsage(mocker):
     mocker.patch(
-        "DIRAC.Resources.Computing.BatchSystems.TimeLeft.SGEResourceUsage.runCommand", side_effect=runCommandResult
+        "DIRAC.Resources.Computing.BatchSystems.TimeLeft.SGEResourceUsage.runCommand",
+        return_value=S_OK(RESULT_FROM_SGE),
     )

-    sgeResourceUsage = SGEResourceUsage()
+    sgeResourceUsage = SGEResourceUsage("1234", {"Queue": "Test"})
     res = sgeResourceUsage.getResourceUsage()
-    assert not res["OK"], res["Message"]
+
+    assert res["OK"]
+    assert res["Value"]["CPU"] == 60
+    assert res["Value"]["CPULimit"] == 1000
+    # WallClock varies from run to run for reasons that are not understood, so it is not asserted here
+    # assert res["Value"]["WallClock"] == 0.01
+    assert res["Value"]["WallClockLimit"] == 1250
diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SLURMResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SLURMResourceUsage.py
new file mode 100644
index 00000000000..575aa1363f1
--- /dev/null
+++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SLURMResourceUsage.py
@@ -0,0 +1,56 @@
+""" Test class for SLURMResourceUsage utility
+"""
+import pytest
+
+from DIRAC import S_OK
+from DIRAC.Resources.Computing.BatchSystems.TimeLeft.SLURMResourceUsage import SLURMResourceUsage
+
+
+SLURM_OUT_SUCCESS_0 = "12345,86400,24,3600,03:00:00"
+SLURM_OUT_SUCCESS_1 = "56789,86400,24,3600,4-03:00:00"
+SLURM_OUT_SUCCESS_2 = "19283,9000,10,900,30:00"
+SLURM_OUT_ERROR = ""
+
+
+def test_getResourceUsageSuccess(mocker):
+    """Here we want to make sure that the wall clock limit is correctly interpreted"""
+    mocker.patch(
+        "DIRAC.Resources.Computing.BatchSystems.TimeLeft.SLURMResourceUsage.runCommand",
+        side_effect=[
+            S_OK(SLURM_OUT_SUCCESS_0),
+            S_OK(SLURM_OUT_SUCCESS_1),
+            S_OK(SLURM_OUT_SUCCESS_2),
+            S_OK(SLURM_OUT_ERROR),
+        ],
+    )
+    # Get resource usage of job 12345: number of processors (24), wall clock limit expressed in hours
+    slurmResourceUsage = SLURMResourceUsage("12345", {"Queue": "Test"})
+    res = slurmResourceUsage.getResourceUsage()
+    assert res["OK"], res["Message"]
+    assert res["Value"]["CPU"] == 86400
+    assert res["Value"]["CPULimit"] == 259200
+    assert res["Value"]["WallClock"] == 3600
+    assert res["Value"]["WallClockLimit"] == 10800
+
+    # Get resource usage of job 56789: same number of processors (24), wall clock limit expressed in days
+    slurmResourceUsage = SLURMResourceUsage("56789", {"Queue": "Test"})
+    res = slurmResourceUsage.getResourceUsage()
+    assert res["OK"], res["Message"]
+    assert res["Value"]["CPU"] == 86400
+    assert res["Value"]["CPULimit"] == 8553600
+    assert res["Value"]["WallClock"] == 3600
+    assert res["Value"]["WallClockLimit"] == 356400
+
+    # Get resource usage of job 19283: different number of processors (10), wall clock limit expressed in minutes
+    slurmResourceUsage = SLURMResourceUsage("19283", {"Queue": "Test"})
+    res = slurmResourceUsage.getResourceUsage()
+    assert res["OK"], res["Message"]
+    assert res["Value"]["CPU"] == 9000
+    assert res["Value"]["CPULimit"] == 18000
+    assert res["Value"]["WallClock"] == 900
+    assert res["Value"]["WallClockLimit"] == 1800
+
+    # Get resource usage of job 0000: the job does not exist
+    slurmResourceUsage = SLURMResourceUsage("0000", {"Queue": "Test"})
+    res = slurmResourceUsage.getResourceUsage()
+    assert not res["OK"], res["Value"]
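
The three success cases deliberately exercise the three time formats SLURM can emit for the limit field. A sketch of a converter that satisfies all of them follows; this is hedged, since the plugin's actual parsing code is not shown in this diff, but any implementation must accept the same three shapes:

```python
# Hedged sketch: convert a SLURM time-limit string to seconds, covering the
# "HH:MM:SS", "D-HH:MM:SS" and "MM:SS" shapes exercised by the test above.
def slurmLimitToSeconds(limit):
    days = 0
    if "-" in limit:
        daysStr, limit = limit.split("-")
        days = int(daysStr)
    parts = [int(part) for part in limit.split(":")]
    while len(parts) < 3:  # "30:00" means 30 minutes
        parts.insert(0, 0)
    hours, minutes, seconds = parts
    return (days * 24 + hours) * 3600 + minutes * 60 + seconds


assert slurmLimitToSeconds("03:00:00") == 10800    # job 12345
assert slurmLimitToSeconds("4-03:00:00") == 356400  # job 56789
assert slurmLimitToSeconds("30:00") == 1800        # job 19283
```

The `CPULimit` assertions are then just this wall clock limit multiplied by the number of processors (e.g. 10800 × 24 = 259200).
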
-PBS_OUT = "bla" - -LSF_OUT = ( - "JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME PROJ_NAME CPU_USED MEM" - " SWAP PIDS START_TIME FINISH_TIME\n" - "12345 user RUN q1 host1 p01 job1 12/31-20:51:42 default" - " 00:00:60.00 6267 40713 25469,14249 12/31-20:52:00 -" -) - - -SLURM_OUT_0 = "12345,86400,24,3600,03:00:00" -SLURM_OUT_1 = "12345,86400,24,3600,4-03:00:00" -SLURM_OUT_2 = "12345,21600,24,900,30:00" -SLURM_OUT_3 = "12345,43200,24,1800,30:00" -SLURM_OUT_4 = "" - -HTCONDOR_OUT_0 = "86400 3600" -HTCONDOR_OUT_1 = "undefined 3600" -HTCONDOR_OUT_2 = "" - - -@pytest.mark.parametrize( - "batch, requiredVariables, returnValue, expected", - [ - ("LSF", {}, LSF_OUT, 0.0), - ("LSF", {"bin": "/usr/bin", "hostNorm": 10.0}, LSF_OUT, 0.0), - ("SGE", {}, SGE_OUT, 300.0), - ("SLURM", {}, SLURM_OUT_0, 432000.0), - ("SLURM", {}, SLURM_OUT_1, 432000.0), - ("SLURM", {}, SLURM_OUT_2, 108000.0), - ("SLURM", {}, SLURM_OUT_3, 216000.0), - ("SLURM", {}, SLURM_OUT_4, 0.0), - ("HTCondor", {}, HTCONDOR_OUT_0, 18000.0), - ("HTCondor", {}, HTCONDOR_OUT_1, 0.0), - ("HTCondor", {}, HTCONDOR_OUT_2, 0.0), - ], -) -def test_getScaledCPU(mocker, batch, requiredVariables, returnValue, expected): +def test_getScaledCPU(mocker): """Test getScaledCPU()""" - mocker.patch("DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft.runCommand", return_value=S_OK(returnValue)) + mocker.patch( + "DIRAC.Resources.Computing.BatchSystems.TimeLeft.SLURMResourceUsage.runCommand", + return_value=S_OK("19283,9000,10,900,30:00"), + ) + mocker.patch("DIRAC.gConfig.getSections", return_value={"Type": "SLURM", "JobID": "12345", "Parameters": {}}) + tl = TimeLeft() + + # Test 1: no normalization res = tl.getScaledCPU() assert res == 0 + # Test 2: normalization tl.cpuPower = 5.0 + res = tl.getScaledCPU() + assert res == 45000 - batchSystemName = f"{batch}ResourceUsage" - batchSystemPath = f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{batchSystemName}" - batchPlugin = import_module(batchSystemPath) - # Need to be reloaded to update the mock within the module, else, it will reuse the one when loaded the first time - reload(batchPlugin) - tl.batchPlugin = getattr(batchPlugin, batchSystemName)() - - # Update attributes of the batch systems to get scaled CPU - tl.batchPlugin.__dict__.update(requiredVariables) - res = tl.getScaledCPU() - assert res == expected - - -@pytest.mark.parametrize( - "batch, requiredVariables, returnValue, expected_1, expected_2", - [ - ("LSF", {"bin": "/usr/bin", "hostNorm": 10.0, "cpuLimit": 1000, "wallClockLimit": 1000}, LSF_OUT, True, 9400.0), - ("SGE", {}, SGE_OUT, True, 9400.0), - ("SLURM", {}, SLURM_OUT_0, True, 72000.0), - ("SLURM", {}, SLURM_OUT_1, True, 3528000.0), - ("SLURM", {}, SLURM_OUT_2, True, 9000.0), - ("SLURM", {}, SLURM_OUT_3, True, 0.0), - ("SLURM", {}, SLURM_OUT_4, False, 0.0), - ("HTCondor", {}, HTCONDOR_OUT_0, True, 828000), - ("HTCondor", {}, HTCONDOR_OUT_1, False, 0.0), - ("HTCondor", {}, HTCONDOR_OUT_2, False, 0.0), - ], -) -def test_getTimeLeft(mocker, batch, requiredVariables, returnValue, expected_1, expected_2): +def test_getTimeLeft(mocker): """Test getTimeLeft()""" - mocker.patch("DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft.runCommand", return_value=S_OK(returnValue)) - tl = TimeLeft() + mocker.patch( + "DIRAC.Resources.Computing.BatchSystems.TimeLeft.SLURMResourceUsage.runCommand", + return_value=S_OK("19283,9000,10,900,30:00"), + ) + mocker.patch("DIRAC.gConfig.getSections", return_value={"Type": "SLURM", "JobID": "12345", "Parameters": {}}) - batchSystemName 
= f"{batch}ResourceUsage" - batchSystemPath = f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{batchSystemName}" - batchPlugin = import_module(batchSystemPath) - # Need to be reloaded to update the mock within the module, else, it will reuse the one when loaded the first time - reload(batchPlugin) - tl.batchPlugin = getattr(batchPlugin, batchSystemName)() + tl = TimeLeft() + # Test 1: CPU power = 10 tl.cpuPower = 10.0 + res = tl.getTimeLeft() + assert res["OK"] + assert res["Value"] == 9000 - # Update attributes of the batch systems to get scaled CPU - tl.batchPlugin.__dict__.update(requiredVariables) - + # Test 2: CPU power = 15 + tl.cpuPower = 15.0 res = tl.getTimeLeft() - assert res["OK"] is expected_1 - if res["OK"]: - assert res["Value"] == expected_2 + assert res["OK"] + assert res["Value"] == 13500 diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py index 528970778f4..ee55a84ee3f 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py @@ -283,6 +283,9 @@ def execute(self): if queue: self.jobReport.setJobParameter(par_name="CEQueue", par_value=queue, sendFlag=False) + if batchSystem := gConfig.getValue("/LocalSite/BatchSystem/Type", ""): + self.jobReport.setJobParameter(par_name="BatchSystem", par_value=batchSystem, sendFlag=False) + self.log.debug(f"Before self._submitJob() ({self.ceName}CE)") result = self._submitJob( jobID=jobID,