Skip to content

Commit

Permalink
Merge pull request #7357 from aldbr/cherry-pick-2-460fa7ad1-integration
Browse files Browse the repository at this point in the history
[sweep:integration] fix: getting batch system info from local cfg
  • Loading branch information
fstagni authored Dec 8, 2023
2 parents 367c962 + dcbae7e commit 8eb09d9
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ class HTCondorResourceUsage(ResourceUsage):
allow us to get an estimation of the resources usage.
"""

def __init__(self):
def __init__(self, jobID, parameters):
"""Standard constructor"""
super().__init__("HTCondor", "_CONDOR_JOB_AD")
super().__init__("HTCondor", jobID, parameters)

def getResourceUsage(self):
"""Returns S_OK with a dictionary containing the entries WallClock, WallClockLimit, and Unit for current slot."""
Expand All @@ -29,8 +29,7 @@ def getResourceUsage(self):
# only present on some Sites
# - CurrentTime: current time
# - JobCurrentStartDate: start of the job execution
jobDescription = os.environ.get("_CONDOR_JOB_AD")
cmd = f"condor_status -ads {jobDescription} -af MaxRuntime CurrentTime-JobCurrentStartDate"
cmd = f"condor_status -ads {self.info_path} -af MaxRuntime CurrentTime-JobCurrentStartDate"
result = runCommand(cmd)
if not result["OK"]:
return S_ERROR("Current batch system is not supported")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,14 @@ class LSFResourceUsage(ResourceUsage):
This is the LSF plugin of the TimeLeft Utility
"""

def __init__(self):
def __init__(self, jobID, parameters):
"""Standard constructor"""
super().__init__("LSF", "LSB_JOBID")
super().__init__("LSF", jobID, parameters)

self.queue = os.environ.get("LSB_QUEUE")
self.bin = os.environ.get("LSF_BINDIR")
self.host = os.environ.get("LSB_HOSTS")
self.year = time.strftime("%Y", time.gmtime())
self.log.verbose(
"LSB_JOBID={}, LSB_QUEUE={}, LSF_BINDIR={}, LSB_HOSTS={}".format(
self.jobID, self.queue, self.bin, self.host
self.jobID, self.queue, self.binary_path, self.host
)
)

Expand All @@ -39,7 +36,7 @@ def __init__(self):
self.wallClockLimit = None
self.hostNorm = None

cmd = f"{self.bin}/bqueues -l {self.queue}"
cmd = f"{self.binary_path}/bqueues -l {self.queue}"
result = runCommand(cmd)
if not result["OK"]:
return
Expand Down Expand Up @@ -73,7 +70,7 @@ def __init__(self):
# Now try to get the CPU_FACTOR for this reference CPU,
# it must be either a Model, a Host or the largest Model

cmd = f"{self.bin}/lshosts -w {self.cpuRef}"
cmd = f"{self.binary_path}/lshosts -w {self.cpuRef}"
result = runCommand(cmd)
if result["OK"]:
# At CERN this command will return an error since there is no host defined
Expand All @@ -97,7 +94,7 @@ def __init__(self):

if not self.normRef:
# Try if there is a model define with the name of cpuRef
cmd = f"{self.bin}/lsinfo -m"
cmd = f"{self.binary_path}/lsinfo -m"
result = runCommand(cmd)
if result["OK"]:
lines = str(result["Value"]).split("\n")
Expand All @@ -120,7 +117,7 @@ def __init__(self):
if not self.normRef:
# Now parse LSF configuration files
if not os.path.isfile("./lsf.sh"):
os.symlink(os.path.join(os.environ["LSF_ENVDIR"], "lsf.conf"), "./lsf.sh")
os.symlink(os.path.join(self.info_path, "lsf.conf"), "./lsf.sh")
# As the variables are not exported, we must force it
ret = sourceEnv(10, ["./lsf", "&& export LSF_CONFDIR"])
if ret["OK"]:
Expand Down Expand Up @@ -170,7 +167,7 @@ def __init__(self):

# Now get the Normalization for the current Host
if self.host:
cmd = f"{self.bin}/lshosts -w {self.host}"
cmd = f"{self.binary_path}/lshosts -w {self.host}"
result = runCommand(cmd)
if result["OK"]:
lines = str(result["Value"]).split("\n")
Expand Down Expand Up @@ -201,15 +198,15 @@ def getResourceUsage(self):
"""Returns S_OK with a dictionary containing the entries CPU, CPULimit,
WallClock, WallClockLimit, and Unit for current slot.
"""
if not self.bin:
if not self.binary_path:
return S_ERROR("Could not determine bin directory for LSF")
if not self.hostNorm:
return S_ERROR("Could not determine host Norm factor")

cpu = None
wallClock = None

cmd = f"{self.bin}/bjobs -W {self.jobID}"
cmd = f"{self.binary_path}/bjobs -W {self.jobID}"
result = runCommand(cmd)
if not result["OK"]:
return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ class PBSResourceUsage(ResourceUsage):
This is the PBS plugin of the TimeLeft Utility
"""

def __init__(self):
def __init__(self, jobID, parameters):
"""Standard constructor"""
super().__init__("PBS", "PBS_JOBID")
super().__init__("PBS", jobID, parameters)

self.queue = os.environ.get("PBS_O_QUEUE")
pbsPath = os.environ.get("PBS_O_PATH")
if pbsPath:
os.environ["PATH"] += ":" + pbsPath
if self.binary_path and self.binary_path != "Unknown":
os.environ["PATH"] += ":" + self.binary_path

self.log.verbose(f"PBS_JOBID={self.jobID}, PBS_O_QUEUE={self.queue}")
self.startTime = time.time()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ class ResourceUsage:
(e.g. getting the time left in a Pilot)
"""

def __init__(self, batchSystemName, jobIdEnvVar):
def __init__(self, batchSystemName, jobID, parameters):
"""Standard constructor"""
self.log = gLogger.getSubLogger(f"{batchSystemName}ResourceUsage")
self.jobID = os.environ.get(jobIdEnvVar)
self.jobID = jobID

# Parameters
self.binary_path = parameters.get("BinaryPath")
self.info_path = parameters.get("InfoPath")
self.host = parameters.get("Host")
self.queue = parameters.get("Queue")

def getResourceUsage(self):
"""Returns S_OK with a dictionary that can contain entries:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ class SGEResourceUsage(ResourceUsage):
This is the SGE plugin of the TimeLeft Utility
"""

def __init__(self):
def __init__(self, jobID, parameters):
"""Standard constructor"""
super().__init__("SGE", "JOB_ID")
super().__init__("SGE", jobID, parameters)

self.queue = os.environ.get("QUEUE")
sgePath = os.environ.get("SGE_BINARY_PATH")
if sgePath:
os.environ["PATH"] += ":" + sgePath
if self.binary_path and self.binary_path != "Unknown":
os.environ["PATH"] += ":" + self.binary_path

self.log.verbose(f"JOB_ID={self.jobID}, QUEUE={self.queue}")
self.startTime = time.time()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ class SLURMResourceUsage(ResourceUsage):
This is the SLURM plugin of the TimeLeft Utility
"""

def __init__(self):
def __init__(self, jobID, parameters):
"""Standard constructor"""
super().__init__("SLURM", "SLURM_JOB_ID")
super().__init__("SLURM", jobID, parameters)

self.log.verbose(f"JOB_ID={self.jobID}")

Expand Down
38 changes: 16 additions & 22 deletions src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import DIRAC

from DIRAC import gLogger, gConfig, S_OK, S_ERROR
from DIRAC.Core.Utilities import DErrno
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.Core.Utilities.Subprocess import systemCall

Expand All @@ -30,7 +31,7 @@ def __init__(self):
if not self.cpuPower:
self.log.warn(f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}")

result = self.__getBatchSystemPlugin()
result = self._getBatchSystemPlugin()
if result["OK"]:
self.batchPlugin = result["Value"]
else:
Expand Down Expand Up @@ -66,7 +67,9 @@ def getTimeLeft(self, cpuConsumed=0.0, processors=1):
"""
# Quit if no norm factor available
if not self.cpuPower:
return S_ERROR(f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}")
return S_ERROR(
DErrno.ESECTION, f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}"
)

if not self.batchPlugin:
return S_ERROR(self.batchError)
Expand Down Expand Up @@ -127,33 +130,24 @@ def getTimeLeft(self, cpuConsumed=0.0, processors=1):
self.log.verbose(f"Remaining CPU in normalized units is: {cpuWorkLeft:.2f}")
return S_OK(cpuWorkLeft)

def __getBatchSystemPlugin(self):
def _getBatchSystemPlugin(self):
"""Using the name of the batch system plugin, will return an instance of the plugin class."""
batchSystems = {
"LSF": "LSB_JOBID",
"PBS": "PBS_JOBID",
"BQS": "QSUB_REQNAME",
"SGE": "SGE_TASK_ID",
"SLURM": "SLURM_JOB_ID",
"HTCondor": "_CONDOR_JOB_AD",
} # more to be added later
name = None
for batchSystem, envVar in batchSystems.items():
if envVar in os.environ:
name = batchSystem
break

if name is None:
batchSystemInfo = gConfig.getSections("/LocalSite/BatchSystem")
type = batchSystemInfo.get("Type")
jobID = batchSystemInfo.get("JobID")
parameters = batchSystemInfo.get("Parameters")

if not type or type == "Unknown":
self.log.warn(f"Batch system type for site {DIRAC.siteName()} is not currently supported")
return S_ERROR("Current batch system is not supported")
return S_ERROR(DErrno.ERESUNK, "Current batch system is not supported")

self.log.debug(f"Creating plugin for {name} batch system")
self.log.debug(f"Creating plugin for {type} batch system")

result = ObjectLoader().loadObject(f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{name}ResourceUsage")
result = ObjectLoader().loadObject(f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{type}ResourceUsage")
if not result["OK"]:
return result
batchClass = result["Value"]
batchInstance = batchClass()
batchInstance = batchClass(jobID, parameters)

return S_OK(batchInstance)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
""" Test class for SGEResourceUsage utility
"""

import pytest

from DIRAC import S_OK
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.HTCondorResourceUsage import HTCondorResourceUsage


HTCONDOR_OUT_0 = "86400 3600"
HTCONDOR_OUT_1 = "undefined 3600"
HTCONDOR_OUT_2 = ""


def test_getResourceUsage(mocker):
mocker.patch(
"DIRAC.Resources.Computing.BatchSystems.TimeLeft.HTCondorResourceUsage.runCommand",
side_effect=[S_OK(HTCONDOR_OUT_0), S_OK(HTCONDOR_OUT_1), S_OK(HTCONDOR_OUT_2)],
)

# First test: everything is fine
htcondorResourceUsage = HTCondorResourceUsage("1234", {"Queue": "Test", "InfoPath": "/path/to/condor_ad"})
res = htcondorResourceUsage.getResourceUsage()
assert res["OK"], res["Message"]
assert res["Value"]["WallClock"] == 3600
assert res["Value"]["WallClockLimit"] == 86400

# Second test: MaxRuntime is undefined
htcondorResourceUsage = HTCondorResourceUsage("1234", {"Queue": "Test", "InfoPath": "/path/to/condor_ad"})
res = htcondorResourceUsage.getResourceUsage()
assert not res["OK"]
assert res["Message"] == "Current batch system is not supported"

# Third test: empty output
htcondorResourceUsage = HTCondorResourceUsage("1234", {"Queue": "Test", "InfoPath": "/path/to/condor_ad"})
res = htcondorResourceUsage.getResourceUsage()
assert not res["OK"]
assert res["Message"] == "Current batch system is not supported"
Loading

0 comments on commit 8eb09d9

Please sign in to comment.