Skip to content

Commit

Permalink
feat: add batchSystem type in jobParameters
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Nov 10, 2023
1 parent 5ef7638 commit 39e1017
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def __init__(self, jobID, parameters):
if not self.normRef:
# Now parse LSF configuration files
if not os.path.isfile("./lsf.sh"):
os.symlink(os.path.join(os.environ["LSF_ENVDIR"], "lsf.conf"), "./lsf.sh")
os.symlink(os.path.join(self.info_path, "lsf.conf"), "./lsf.sh")
# As the variables are not exported, we must force it
ret = sourceEnv(10, ["./lsf", "&& export LSF_CONFDIR"])
if ret["OK"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ class MJFResourceUsage(ResourceUsage):
"""

#############################################################################
def __init__(self):
def __init__(self, jobID, parameters):
"""Standard constructor"""
super().__init__("MJF", "JOB_ID")
super().__init__("MJF", jobID, parameters)

self.queue = os.environ.get("QUEUE")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ def __init__(self, batchSystemName, jobID, parameters):
self.jobID = jobID

# Parameters
self.binary_path = parameters["BinaryPath"]
self.info_path = parameters["InfoPath"]
self.host = parameters["Host"]
self.queue = parameters["Queue"]
self.binary_path = parameters.get("BinaryPath")
self.info_path = parameters.get("InfoPath")
self.host = parameters.get("Host")
self.queue = parameters.get("Queue")

def getResourceUsage(self):
"""Returns S_OK with a dictionary that can contain entries:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self):
if not self.cpuPower:
self.log.warn(f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}")

result = self.__getBatchSystemPlugin()
result = self._getBatchSystemPlugin()
if result["OK"]:
self.batchPlugin = result["Value"]
else:
Expand Down Expand Up @@ -126,7 +126,7 @@ def getTimeLeft(self, cpuConsumed=0.0, processors=1):
self.log.verbose(f"Remaining CPU in normalized units is: {cpuWorkLeft:.02f}")
return S_OK(cpuWorkLeft)

def __getBatchSystemPlugin(self):
def _getBatchSystemPlugin(self):
"""Using the name of the batch system plugin, will return an instance of the plugin class."""
batchSystemInfo = gConfig.getSections("/LocalSite/BatchSystem")
type = batchSystemInfo.get("Type")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ class TorqueResourceUsage(ResourceUsage):
This is the PBS plugin of the TimeLeft Utility
"""

def __init__(self):
def __init__(self, jobID, parameters):
"""Standard constructor"""
super().__init__("Torque", "PBS_JOBID")
super().__init__("Torque", jobID, parameters)

if self.binary_path and self.binary_path != "Unknown":
os.environ["PATH"] += ":" + self.binary_path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ def test_init(mocker, runCommandResult, lsbHosts, cpuLimitExpected, wallClockLim
mocker.patch(
"DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.runCommand", side_effect=runCommandResult
)
mocker.patch.dict(os.environ, {"LSB_HOSTS": lsbHosts})

lsfResourceUsage = LSFResourceUsage()
lsfResourceUsage = LSFResourceUsage("1234", {"Host": lsbHosts, "InfoPath": "Unknown"})
assert lsfResourceUsage.cpuLimit == cpuLimitExpected
assert lsfResourceUsage.wallClockLimit == wallClockLimitExpected

Expand Down Expand Up @@ -85,12 +84,11 @@ def test_init_cern(
"DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.runCommand", side_effect=runCommandResult
)
mocker.patch("os.path.isfile", return_value=True)
mocker.patch.dict(os.environ, {"LSB_HOSTS": lsbHosts, "LSF_ENVDIR": lsfEnvdir})
mocker.patch(
"DIRAC.Resources.Computing.BatchSystems.TimeLeft.LSFResourceUsage.sourceEnv", return_value=sourceEnvResult
)

lsfResourceUsage = LSFResourceUsage()
lsfResourceUsage = LSFResourceUsage(1234, {"Host": lsbHosts, "InfoPath": lsfEnvdir})

assert lsfResourceUsage.cpuLimit == cpuLimitExpected
assert lsfResourceUsage.wallClockLimit == wallClockLimitExpected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,8 @@ def test_getResourcUsage(
side_effect=runCommandResult,
)
mocker.patch("os.path.isfile", return_value=True)
mocker.patch.dict(os.environ, {"PBS_O_QUEUE": "lhcb", "PBS_O_QPATH": "/some/path"})

pbsRU = TorqueResourceUsage()
pbsRU.jobID = "55755440.seer.t1.grid.kiae.ru"
pbsRU = TorqueResourceUsage("55755440.seer.t1.grid.kiae.ru", {"Queue": "lhcb", "BinaryPath": "/some/path"})
res = pbsRU.getResourceUsage()
assert res["OK"], res["Message"]
assert len(res["Value"]) == 4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ def test_init(mocker, runCommandResult, cpuLimitExpected, wallClockLimitExpected
"DIRAC.Resources.Computing.BatchSystems.TimeLeft.SGEResourceUsage.runCommand", side_effect=runCommandResult
)

sgeResourceUsage = SGEResourceUsage()
sgeResourceUsage = SGEResourceUsage("1234", {"Queue": "Test"})
res = sgeResourceUsage.getResourceUsage()
assert not res["OK"], res["Message"]
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@


@pytest.mark.parametrize(
"batch, requiredVariables, returnValue, expected",
"batch, parameters, returnValue, expected",
[
("LSF", {}, LSF_OUT, 0.0),
("LSF", {"bin": "/usr/bin", "hostNorm": 10.0}, LSF_OUT, 0.0),
("LSF", {"BinaryPath": "/usr/bin"}, LSF_OUT, 0.0),
("MJF", {}, MJF_OUT, 0.0),
("SGE", {}, SGE_OUT, 300.0),
("SLURM", {}, SLURM_OUT_0, 432000.0),
Expand All @@ -82,35 +82,24 @@
("HTCondor", {}, HTCONDOR_OUT_2, 0.0),
],
)
def test_getScaledCPU(mocker, batch, requiredVariables, returnValue, expected):
def test_getScaledCPU(mocker, batch, parameters, returnValue, expected):
"""Test getScaledCPU()"""
mocker.patch("DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft.runCommand", return_value=S_OK(returnValue))
mocker.patch("DIRAC.gConfig.getSections", return_value={"Type": batch, "JobID": "12345", "Parameters": parameters})
tl = TimeLeft()
res = tl.getScaledCPU()
assert res == 0

tl.cpuPower = 5.0

batchSystemName = f"{batch}ResourceUsage"
batchSystemPath = f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{batchSystemName}"
batchPlugin = __import__(batchSystemPath, globals(), locals(), [batchSystemName]) # pylint: disable=unused-variable
# Need to be reloaded to update the mock within the module, else, it will reuse the one when loaded the first time
reload(batchPlugin)

batchStr = f"batchPlugin.{batchSystemName}()"
tl.batchPlugin = eval(batchStr)

# Update attributes of the batch systems to get scaled CPU
tl.batchPlugin.__dict__.update(requiredVariables)

res = tl.getScaledCPU()
assert res == expected


@pytest.mark.parametrize(
"batch, requiredVariables, returnValue, expected_1, expected_2",
"batch, parameters, returnValue, expected_1, expected_2",
[
("LSF", {"bin": "/usr/bin", "hostNorm": 10.0, "cpuLimit": 1000, "wallClockLimit": 1000}, LSF_OUT, True, 9400.0),
("LSF", {"BinaryPath": "/usr/bin"}, LSF_OUT, True, 9400.0),
("SGE", {}, SGE_OUT, True, 9400.0),
("SLURM", {}, SLURM_OUT_0, True, 72000.0),
("SLURM", {}, SLURM_OUT_1, True, 3528000.0),
Expand All @@ -122,24 +111,14 @@ def test_getScaledCPU(mocker, batch, requiredVariables, returnValue, expected):
("HTCondor", {}, HTCONDOR_OUT_2, False, 0.0),
],
)
def test_getTimeLeft(mocker, batch, requiredVariables, returnValue, expected_1, expected_2):
def test_getTimeLeft(mocker, batch, parameters, returnValue, expected_1, expected_2):
"""Test getTimeLeft()"""
mocker.patch("DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft.runCommand", return_value=S_OK(returnValue))
tl = TimeLeft()
mocker.patch("DIRAC.gConfig.getSections", return_value={"Type": batch, "JobID": "12345", "Parameters": parameters})

batchSystemName = f"{batch}ResourceUsage"
batchSystemPath = f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{batchSystemName}"
batchPlugin = __import__(batchSystemPath, globals(), locals(), [batchSystemName])
# Need to be reloaded to update the mock within the module, else, it will reuse the one when loaded the first time
reload(batchPlugin)

batchStr = f"batchPlugin.{batchSystemName}()"
tl.batchPlugin = eval(batchStr)
tl = TimeLeft()
tl.cpuPower = 10.0

# Update attributes of the batch systems to get scaled CPU
tl.batchPlugin.__dict__.update(requiredVariables)

res = tl.getTimeLeft()
assert res["OK"] is expected_1
if res["OK"]:
Expand Down
4 changes: 4 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ def execute(self):
if queue:
self.jobReport.setJobParameter(par_name="CEQueue", par_value=queue, sendFlag=False)

batchSystem = gConfig.getValue("/LocalSite/BatchSystem/Type", "")
if batchSystem:
self.jobReport.setJobParameter(par_name="BatchSystem", par_value=batchSystem, sendFlag=False)

self.log.debug(f"Before self._submitJob() ({self.ceName}CE)")
result = self._submitJob(
jobID=jobID,
Expand Down

0 comments on commit 39e1017

Please sign in to comment.