From 39e1017a32f1e31091f964878d0d6c44d3fc8167 Mon Sep 17 00:00:00 2001 From: aldbr Date: Fri, 10 Nov 2023 15:53:37 +0100 Subject: [PATCH] feat: add batchSystem type in jobParameters --- .../BatchSystems/TimeLeft/LSFResourceUsage.py | 2 +- .../BatchSystems/TimeLeft/MJFResourceUsage.py | 4 +- .../BatchSystems/TimeLeft/ResourceUsage.py | 8 ++-- .../BatchSystems/TimeLeft/TimeLeft.py | 4 +- .../TimeLeft/TorqueResourceUsage.py | 4 +- .../TimeLeft/test/Test_LSFResourceUsage.py | 6 +-- .../TimeLeft/test/Test_PBSResourceUsage.py | 4 +- .../TimeLeft/test/Test_SGEResourceUsage.py | 2 +- .../TimeLeft/test/Test_TimeLeft.py | 39 +++++-------------- .../Agent/JobAgent.py | 4 ++ 10 files changed, 28 insertions(+), 49 deletions(-) diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/LSFResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/LSFResourceUsage.py index 4404b7e5955..0456726e449 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/LSFResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/LSFResourceUsage.py @@ -117,7 +117,7 @@ def __init__(self, jobID, parameters): if not self.normRef: # Now parse LSF configuration files if not os.path.isfile("./lsf.sh"): - os.symlink(os.path.join(os.environ["LSF_ENVDIR"], "lsf.conf"), "./lsf.sh") + os.symlink(os.path.join(self.info_path, "lsf.conf"), "./lsf.sh") # As the variables are not exported, we must force it ret = sourceEnv(10, ["./lsf", "&& export LSF_CONFDIR"]) if ret["OK"]: diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py index 95ee93c2faa..b419bea9b5f 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py @@ -15,9 +15,9 @@ class MJFResourceUsage(ResourceUsage): """ ############################################################################# - def __init__(self): + def __init__(self, jobID, parameters): """Standard constructor""" - super().__init__("MJF", "JOB_ID") + super().__init__("MJF", jobID, parameters) self.queue = os.environ.get("QUEUE") diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py index 7a509a7ea97..85b260a6e0e 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py @@ -18,10 +18,10 @@ def __init__(self, batchSystemName, jobID, parameters): self.jobID = jobID # Parameters - self.binary_path = parameters["BinaryPath"] - self.info_path = parameters["InfoPath"] - self.host = parameters["Host"] - self.queue = parameters["Queue"] + self.binary_path = parameters.get("BinaryPath") + self.info_path = parameters.get("InfoPath") + self.host = parameters.get("Host") + self.queue = parameters.get("Queue") def getResourceUsage(self): """Returns S_OK with a dictionary that can contain entries: diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py index a72da02bb0f..5708b236605 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py @@ -29,7 +29,7 @@ def __init__(self): if not self.cpuPower: self.log.warn(f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}") - result = self.__getBatchSystemPlugin() + result = self._getBatchSystemPlugin() if result["OK"]: self.batchPlugin = result["Value"] else: @@ -126,7 +126,7 @@ def getTimeLeft(self, cpuConsumed=0.0, processors=1): self.log.verbose(f"Remaining CPU in normalized units is: {cpuWorkLeft:.02f}") return S_OK(cpuWorkLeft) - def __getBatchSystemPlugin(self): + def _getBatchSystemPlugin(self): """Using the name of the batch system plugin, will return an instance of the plugin class.""" batchSystemInfo = gConfig.getSections("/LocalSite/BatchSystem") type = batchSystemInfo.get("Type") diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TorqueResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TorqueResourceUsage.py index c17d121d0cc..4064f3ab5eb 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TorqueResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TorqueResourceUsage.py @@ -15,9 +15,9 @@ class TorqueResourceUsage(ResourceUsage): This is the PBS plugin of the TimeLeft Utility """ - def __init__(self): + def __init__(self, jobID, parameters): """Standard constructor""" - super().__init__("Torque", "PBS_JOBID") + super().__init__("Torque", jobID, parameters) if self.binary_path and self.binary_path != "Unknown": os.environ["PATH"] += ":" + self.binary_path diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_LSFResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_LSFResourceUsage.py index 3b03a6ff3a6..51852a4057b 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_LSFResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_LSFResourceUsage.py @@ -45,9 +45,8 @@ def test_init(mocker, runCommandResult, lsbHosts, cpuLimitExpected, wallClockLim mocker.patch( "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.runCommand", side_effect=runCommandResult ) - mocker.patch.dict(os.environ, {"LSB_HOSTS": lsbHosts}) - lsfResourceUsage = LSFResourceUsage() + lsfResourceUsage = LSFResourceUsage("1234", {"Host": lsbHosts, "InfoPath": "Unknown"}) assert lsfResourceUsage.cpuLimit == cpuLimitExpected assert lsfResourceUsage.wallClockLimit == wallClockLimitExpected @@ -85,12 +84,11 @@ def test_init_cern( "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.runCommand", side_effect=runCommandResult ) mocker.patch("os.path.isfile", return_value=True) - mocker.patch.dict(os.environ, {"LSB_HOSTS": lsbHosts, "LSF_ENVDIR": lsfEnvdir}) mocker.patch( "DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.sourceEnv", return_value=sourceEnvResult ) - lsfResourceUsage = LSFResourceUsage() + lsfResourceUsage = LSFResourceUsage(1234, {"Host": lsbHosts, "InfoPath": lsfEnvdir}) assert lsfResourceUsage.cpuLimit == cpuLimitExpected assert lsfResourceUsage.wallClockLimit == wallClockLimitExpected diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py index 806849fbdaf..344da7c0e6c 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_PBSResourceUsage.py @@ -92,10 +92,8 @@ def test_getResourcUsage( side_effect=runCommandResult, ) mocker.patch("os.path.isfile", return_value=True) - mocker.patch.dict(os.environ, {"PBS_O_QUEUE": "lhcb", "PBS_O_QPATH": "/some/path"}) - pbsRU = TorqueResourceUsage() - pbsRU.jobID = "55755440.seer.t1.grid.kiae.ru" + pbsRU = TorqueResourceUsage("55755440.seer.t1.grid.kiae.ru", {"Queue": "lhcb", "BinaryPath": "/some/path"}) res = pbsRU.getResourceUsage() assert res["OK"], res["Message"] assert len(res["Value"]) == 4 diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py index 81ff9da5ab0..4a0d92cfe60 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_SGEResourceUsage.py @@ -50,6 +50,6 @@ def test_init(mocker, runCommandResult, cpuLimitExpected, wallClockLimitExpected "DIRAC.Resources.Computing.BatchSystems.TimeLeft.SGEResourceUsage.runCommand", side_effect=runCommandResult ) - sgeResourceUsage = SGEResourceUsage() + sgeResourceUsage = SGEResourceUsage("1234", {"Queue": "Test"}) res = sgeResourceUsage.getResourceUsage() assert not res["OK"], res["Message"] diff --git a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py index 836e6336327..5c3645ed809 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/test/Test_TimeLeft.py @@ -66,10 +66,10 @@ @pytest.mark.parametrize( - "batch, requiredVariables, returnValue, expected", + "batch, parameters, returnValue, expected", [ ("LSF", {}, LSF_OUT, 0.0), - ("LSF", {"bin": "/usr/bin", "hostNorm": 10.0}, LSF_OUT, 0.0), + ("LSF", {"BinaryPath": "/usr/bin"}, LSF_OUT, 0.0), ("MJF", {}, MJF_OUT, 0.0), ("SGE", {}, SGE_OUT, 300.0), ("SLURM", {}, SLURM_OUT_0, 432000.0), @@ -82,35 +82,24 @@ ("HTCondor", {}, HTCONDOR_OUT_2, 0.0), ], ) -def test_getScaledCPU(mocker, batch, requiredVariables, returnValue, expected): +def test_getScaledCPU(mocker, batch, parameters, returnValue, expected): """Test getScaledCPU()""" mocker.patch("DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft.runCommand", return_value=S_OK(returnValue)) + mocker.patch("DIRAC.gConfig.getSections", return_value={"Type": batch, "JobID": "12345", "Parameters": parameters}) tl = TimeLeft() res = tl.getScaledCPU() assert res == 0 tl.cpuPower = 5.0 - batchSystemName = f"{batch}ResourceUsage" - batchSystemPath = f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{batchSystemName}" - batchPlugin = __import__(batchSystemPath, globals(), locals(), [batchSystemName]) # pylint: disable=unused-variable - # Need to be reloaded to update the mock within the module, else, it will reuse the one when loaded the first time - reload(batchPlugin) - - batchStr = f"batchPlugin.{batchSystemName}()" - tl.batchPlugin = eval(batchStr) - - # Update attributes of the batch systems to get scaled CPU - tl.batchPlugin.__dict__.update(requiredVariables) - res = tl.getScaledCPU() assert res == expected @pytest.mark.parametrize( - "batch, requiredVariables, returnValue, expected_1, expected_2", + "batch, parameters, returnValue, expected_1, expected_2", [ - ("LSF", {"bin": "/usr/bin", "hostNorm": 10.0, "cpuLimit": 1000, "wallClockLimit": 1000}, LSF_OUT, True, 9400.0), + ("LSF", {"BinaryPath": "/usr/bin"}, LSF_OUT, True, 9400.0), ("SGE", {}, SGE_OUT, True, 9400.0), ("SLURM", {}, SLURM_OUT_0, True, 72000.0), ("SLURM", {}, SLURM_OUT_1, True, 3528000.0), @@ -122,24 +111,14 @@ def test_getScaledCPU(mocker, batch, requiredVariables, returnValue, expected): ("HTCondor", {}, HTCONDOR_OUT_2, False, 0.0), ], ) -def test_getTimeLeft(mocker, batch, requiredVariables, returnValue, expected_1, expected_2): +def test_getTimeLeft(mocker, batch, parameters, returnValue, expected_1, expected_2): """Test getTimeLeft()""" mocker.patch("DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft.runCommand", return_value=S_OK(returnValue)) - tl = TimeLeft() + mocker.patch("DIRAC.gConfig.getSections", return_value={"Type": batch, "JobID": "12345", "Parameters": parameters}) - batchSystemName = f"{batch}ResourceUsage" - batchSystemPath = f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{batchSystemName}" - batchPlugin = __import__(batchSystemPath, globals(), locals(), [batchSystemName]) - # Need to be reloaded to update the mock within the module, else, it will reuse the one when loaded the first time - reload(batchPlugin) - - batchStr = f"batchPlugin.{batchSystemName}()" - tl.batchPlugin = eval(batchStr) + tl = TimeLeft() tl.cpuPower = 10.0 - # Update attributes of the batch systems to get scaled CPU - tl.batchPlugin.__dict__.update(requiredVariables) - res = tl.getTimeLeft() assert res["OK"] is expected_1 if res["OK"]: diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py index a568bc957d3..dc40d69cc6a 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py @@ -285,6 +285,10 @@ def execute(self): if queue: self.jobReport.setJobParameter(par_name="CEQueue", par_value=queue, sendFlag=False) + batchSystem = gConfig.getValue("/LocalSite/BatchSystem/Type", "") + if batchSystem: + self.jobReport.setJobParameter(par_name="BatchSystem", par_value=batchSystem, sendFlag=False) + self.log.debug(f"Before self._submitJob() ({self.ceName}CE)") result = self._submitJob( jobID=jobID,