Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.0] fix: getting batch system info from local cfg #7289

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ class HTCondorResourceUsage(ResourceUsage):
allow us to get an estimation of the resources usage.
"""

def __init__(self):
def __init__(self, jobID, parameters):
"""Standard constructor"""
super().__init__("HTCondor", "_CONDOR_JOB_AD")
super().__init__("HTCondor", jobID, parameters)

def getResourceUsage(self):
"""Returns S_OK with a dictionary containing the entries WallClock, WallClockLimit, and Unit for current slot."""
Expand All @@ -29,8 +29,7 @@ def getResourceUsage(self):
# only present on some Sites
# - CurrentTime: current time
# - JobCurrentStartDate: start of the job execution
jobDescription = os.environ.get("_CONDOR_JOB_AD")
cmd = f"condor_status -ads {jobDescription} -af MaxRuntime CurrentTime-JobCurrentStartDate"
cmd = f"condor_status -ads {self.info_path} -af MaxRuntime CurrentTime-JobCurrentStartDate"
result = runCommand(cmd)
if not result["OK"]:
return S_ERROR("Current batch system is not supported")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,14 @@ class LSFResourceUsage(ResourceUsage):
This is the LSF plugin of the TimeLeft Utility
"""

def __init__(self):
def __init__(self, jobID, parameters):
"""Standard constructor"""
super().__init__("LSF", "LSB_JOBID")
super().__init__("LSF", jobID, parameters)

self.queue = os.environ.get("LSB_QUEUE")
self.bin = os.environ.get("LSF_BINDIR")
self.host = os.environ.get("LSB_HOSTS")
self.year = time.strftime("%Y", time.gmtime())
self.log.verbose(
"LSB_JOBID={}, LSB_QUEUE={}, LSF_BINDIR={}, LSB_HOSTS={}".format(
self.jobID, self.queue, self.bin, self.host
self.jobID, self.queue, self.binary_path, self.host
)
)

Expand All @@ -39,7 +36,7 @@ def __init__(self):
self.wallClockLimit = None
self.hostNorm = None

cmd = f"{self.bin}/bqueues -l {self.queue}"
cmd = f"{self.binary_path}/bqueues -l {self.queue}"
result = runCommand(cmd)
if not result["OK"]:
return
Expand Down Expand Up @@ -73,7 +70,7 @@ def __init__(self):
# Now try to get the CPU_FACTOR for this reference CPU,
# it must be either a Model, a Host or the largest Model

cmd = f"{self.bin}/lshosts -w {self.cpuRef}"
cmd = f"{self.binary_path}/lshosts -w {self.cpuRef}"
result = runCommand(cmd)
if result["OK"]:
# At CERN this command will return an error since there is no host defined
Expand All @@ -97,7 +94,7 @@ def __init__(self):

if not self.normRef:
# Try if there is a model define with the name of cpuRef
cmd = f"{self.bin}/lsinfo -m"
cmd = f"{self.binary_path}/lsinfo -m"
result = runCommand(cmd)
if result["OK"]:
lines = str(result["Value"]).split("\n")
Expand All @@ -120,7 +117,7 @@ def __init__(self):
if not self.normRef:
# Now parse LSF configuration files
if not os.path.isfile("./lsf.sh"):
os.symlink(os.path.join(os.environ["LSF_ENVDIR"], "lsf.conf"), "./lsf.sh")
os.symlink(os.path.join(self.info_path, "lsf.conf"), "./lsf.sh")
# As the variables are not exported, we must force it
ret = sourceEnv(10, ["./lsf", "&& export LSF_CONFDIR"])
if ret["OK"]:
Expand Down Expand Up @@ -170,7 +167,7 @@ def __init__(self):

# Now get the Normalization for the current Host
if self.host:
cmd = f"{self.bin}/lshosts -w {self.host}"
cmd = f"{self.binary_path}/lshosts -w {self.host}"
result = runCommand(cmd)
if result["OK"]:
lines = str(result["Value"]).split("\n")
Expand Down Expand Up @@ -201,15 +198,15 @@ def getResourceUsage(self):
"""Returns S_OK with a dictionary containing the entries CPU, CPULimit,
WallClock, WallClockLimit, and Unit for current slot.
"""
if not self.bin:
if not self.binary_path:
return S_ERROR("Could not determine bin directory for LSF")
if not self.hostNorm:
return S_ERROR("Could not determine host Norm factor")

cpu = None
wallClock = None

cmd = f"{self.bin}/bjobs -W {self.jobID}"
cmd = f"{self.binary_path}/bjobs -W {self.jobID}"
result = runCommand(cmd)
if not result["OK"]:
return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ class MJFResourceUsage(ResourceUsage):
"""

#############################################################################
def __init__(self):
def __init__(self, jobID, parameters):
"""Standard constructor"""
super().__init__("MJF", "JOB_ID")

self.queue = os.environ.get("QUEUE")
super().__init__("MJF", jobID, parameters)

self.log.verbose(f"jobID={self.jobID}, queue={self.queue}")
self.startTime = time.time()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ class PBSResourceUsage(ResourceUsage):
This is the PBS plugin of the TimeLeft Utility
"""

def __init__(self):
def __init__(self, jobID, parameters):
"""Standard constructor"""
super().__init__("PBS", "PBS_JOBID")
super().__init__("PBS", jobID, parameters)

self.queue = os.environ.get("PBS_O_QUEUE")
pbsPath = os.environ.get("PBS_O_PATH")
if pbsPath:
os.environ["PATH"] += ":" + pbsPath
if self.binary_path and self.binary_path != "Unknown":
os.environ["PATH"] += ":" + self.binary_path

self.log.verbose(f"PBS_JOBID={self.jobID}, PBS_O_QUEUE={self.queue}")
self.startTime = time.time()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ class ResourceUsage:
(e.g. getting the time left in a Pilot)
"""

def __init__(self, batchSystemName, jobID, parameters):
    """Standard constructor

    :param str batchSystemName: name of the batch system, used to name the sub-logger
    :param jobID: identifier of the job in the batch system, stored as-is
    :param dict parameters: optional batch system parameters; the keys read here are
                            ``BinaryPath``, ``InfoPath``, ``Host`` and ``Queue``.
                            ``None`` is accepted and treated as an empty mapping.
    """
    self.log = gLogger.getSubLogger(f"{batchSystemName}ResourceUsage")
    self.jobID = jobID

    # The caller may have no parameters at all to hand over: fall back to an
    # empty mapping so that every attribute below is simply set to None.
    parameters = parameters or {}

    # Parameters
    self.binary_path = parameters.get("BinaryPath")
    self.info_path = parameters.get("InfoPath")
    self.host = parameters.get("Host")
    self.queue = parameters.get("Queue")

def getResourceUsage(self):
"""Returns S_OK with a dictionary that can contain entries:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ class SGEResourceUsage(ResourceUsage):
This is the SGE plugin of the TimeLeft Utility
"""

def __init__(self):
def __init__(self, jobID, parameters):
"""Standard constructor"""
super().__init__("SGE", "JOB_ID")
super().__init__("SGE", jobID, parameters)

self.queue = os.environ.get("QUEUE")
sgePath = os.environ.get("SGE_BINARY_PATH")
if sgePath:
os.environ["PATH"] += ":" + sgePath
if self.binary_path and self.binary_path != "Unknown":
os.environ["PATH"] += ":" + self.binary_path

self.log.verbose(f"JOB_ID={self.jobID}, QUEUE={self.queue}")
self.startTime = time.time()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ class SLURMResourceUsage(ResourceUsage):
This is the SLURM plugin of the TimeLeft Utility
"""

def __init__(self):
def __init__(self, jobID, parameters):
"""Standard constructor"""
super().__init__("SLURM", "SLURM_JOB_ID")
super().__init__("SLURM", jobID, parameters)

self.log.verbose(f"JOB_ID={self.jobID}")

Expand Down
111 changes: 63 additions & 48 deletions src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import DIRAC

from DIRAC import gLogger, gConfig, S_OK, S_ERROR
from DIRAC.Core.Utilities import DErrno
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.Core.Utilities.Subprocess import systemCall


Expand All @@ -29,7 +31,7 @@ def __init__(self):
if not self.cpuPower:
self.log.warn(f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}")

result = self.__getBatchSystemPlugin()
result = self._getBatchSystemPlugin()
if result["OK"]:
self.batchPlugin = result["Value"]
else:
Expand Down Expand Up @@ -65,7 +67,9 @@ def getTimeLeft(self, cpuConsumed=0.0, processors=1):
"""
# Quit if no norm factor available
if not self.cpuPower:
return S_ERROR(f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}")
return S_ERROR(
DErrno.ESECTION, f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}"
)

if not self.batchPlugin:
return S_ERROR(self.batchError)
Expand Down Expand Up @@ -126,54 +130,65 @@ def getTimeLeft(self, cpuConsumed=0.0, processors=1):
self.log.verbose(f"Remaining CPU in normalized units is: {cpuWorkLeft:.02f}")
return S_OK(cpuWorkLeft)

def _getBatchSystemPlugin(self):
    """Build the resource usage plugin matching the current batch system.

    The batch system type, the job identifier and the plugin parameters are read
    from the ``/LocalSite/BatchSystem`` section of the local configuration. When no
    type is configured there, they are guessed from environment variables instead.

    :return: S_OK(plugin instance); S_ERROR if the batch system type is missing or
             "Unknown", or the error returned by the ObjectLoader if the plugin
             class cannot be loaded
    """
    # gConfig.getSections() returns a DIRAC result structure rather than the content
    # of the section, so calling .get("Type") on it can never find the configured
    # value: read each entry explicitly.
    configPath = "/LocalSite/BatchSystem"
    batchSystemType = gConfig.getValue(f"{configPath}/Type")
    jobID = gConfig.getValue(f"{configPath}/JobID")
    result = gConfig.getOptionsDict(f"{configPath}/Parameters")
    parameters = result["Value"] if result["OK"] else {}

    ###########################################################################################
    # TODO: remove the following block in v9.0
    # This is a temporary fix for the case where the batch system is not set in the configuration
    if not batchSystemType:
        self.log.warn(
            "Batch system info not set in local configuration: trying to guess from environment variables."
        )
        self.log.warn("Consider updating your pilot version before switching to v9.0.")
        # For each batch system: the environment variable revealing it (and holding
        # the job identifier), and the variables holding the plugin parameters.
        envVarsPerBatchSystem = {
            "LSF": {
                "JobID": "LSB_JOBID",
                "Parameters": {
                    # The LSF binary and configuration directories are exported as
                    # LSF_*, not LSB_*
                    "BinaryPath": "LSF_BINDIR",
                    "Host": "LSB_HOSTS",
                    "InfoPath": "LSF_ENVDIR",
                    "Queue": "LSB_QUEUE",
                },
            },
            "PBS": {"JobID": "PBS_JOBID", "Parameters": {"BinaryPath": "PBS_O_PATH", "Queue": "PBS_O_QUEUE"}},
            # NOTE(review): the SGE plugin logs this value as JOB_ID - confirm that
            # SGE_TASK_ID is the intended job identifier and not only a detection marker.
            "SGE": {"JobID": "SGE_TASK_ID", "Parameters": {"BinaryPath": "SGE_BINARY_PATH", "Queue": "QUEUE"}},
            "SLURM": {"JobID": "SLURM_JOB_ID", "Parameters": {}},
            "HTCondor": {"JobID": "HTCONDOR_JOBID", "Parameters": {"InfoPath": "_CONDOR_JOB_AD"}},
        }
        for batchSystem, attributes in envVarsPerBatchSystem.items():
            if attributes["JobID"] in os.environ:
                batchSystemType = batchSystem
                jobID = os.environ[attributes["JobID"]]
                parameters = {
                    parameterName: os.environ.get(parameterVariable)
                    for parameterName, parameterVariable in attributes["Parameters"].items()
                }
                break

        if not batchSystemType and "MACHINEFEATURES" in os.environ and "JOBFEATURES" in os.environ:
            # Only use MJF if legacy batch system information not available for now
            batchSystemType = "MJF"
            jobID = os.environ.get("JOB_ID")
            parameters = {"Queue": os.environ.get("QUEUE")}

    ###########################################################################################

    if not batchSystemType or batchSystemType == "Unknown":
        self.log.warn(f"Batch system type for site {DIRAC.siteName()} is not currently supported")
        return S_ERROR(DErrno.ERESUNK, "Current batch system is not supported")

    self.log.debug(f"Creating plugin for {batchSystemType} batch system")

    result = ObjectLoader().loadObject(
        f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{batchSystemType}ResourceUsage"
    )
    if not result["OK"]:
        return result
    batchClass = result["Value"]
    batchInstance = batchClass(jobID, parameters)

    return S_OK(batchInstance)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
""" Test class for HTCondorResourceUsage utility
"""

import pytest

from DIRAC import S_OK
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.HTCondorResourceUsage import HTCondorResourceUsage


HTCONDOR_OUT_0 = "86400 3600"
HTCONDOR_OUT_1 = "undefined 3600"
HTCONDOR_OUT_2 = ""


def test_getResourceUsage(mocker):
    """Check getResourceUsage() against three successive condor_status outputs.

    runCommand is patched to hand back, in this order: a complete output, an
    output whose MaxRuntime is "undefined", and an empty output. Only the first
    one is expected to yield a usable result.
    """
    commandOutputs = (HTCONDOR_OUT_0, HTCONDOR_OUT_1, HTCONDOR_OUT_2)
    mocker.patch(
        "DIRAC.Resources.Computing.BatchSystems.TimeLeft.HTCondorResourceUsage.runCommand",
        side_effect=[S_OK(output) for output in commandOutputs],
    )

    def queryUsage():
        # A fresh plugin per query, each one consuming the next mocked output
        plugin = HTCondorResourceUsage("1234", {"Queue": "Test", "InfoPath": "/path/to/condor_ad"})
        return plugin.getResourceUsage()

    # Complete output: both values are reported
    usage = queryUsage()
    assert usage["OK"], usage["Message"]
    assert usage["Value"]["WallClock"] == 3600
    assert usage["Value"]["WallClockLimit"] == 86400

    # Undefined MaxRuntime, then empty output: both are reported as unsupported
    for _ in range(2):
        usage = queryUsage()
        assert not usage["OK"]
        assert usage["Message"] == "Current batch system is not supported"
Loading
Loading