Skip to content

Commit

Permalink
fix: HTCondor tests + a bit of refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Jun 23, 2023
1 parent 1ab913c commit 0ab51ff
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 107 deletions.
11 changes: 6 additions & 5 deletions src/DIRAC/Resources/Computing/BatchSystems/Condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@
on_exit_hold = ExitCode != 0
# A random subcode to identify who put the job on hold
on_exit_hold_subcode = %(holdReasonSubcode)s
# Jobs are then deleted from the system after N days
period_remove = (time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600)
# Jobs are then deleted from the system after N days if they are not running
period_remove = (JobStatus != 2) && (time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600)
# Specific options
# ----------------
Expand Down Expand Up @@ -105,9 +105,10 @@ def parseCondorStatus(lines, jobID):
# A job can be held for many various reasons, we need to further investigate with the holdReasonCode & holdReasonSubCode
# Details in:
# https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode

# By default, a held (5) job is defined as Aborted, but there might be some exceptions
if status == 5:

# By default, a held (5) job is defined as Aborted, but there might be some exceptions
status = 3
try:
holdReasonCode = int(l[2])
holdReasonSubcode = int(l[3])
Expand All @@ -124,7 +125,7 @@ def parseCondorStatus(lines, jobID):
if holdReasonCode == 3 and holdReasonSubcode == HOLD_REASON_SUBCODE:
status = 5
# If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
if holdReasonCode == 16:
elif holdReasonCode == 16:
status = 1

return (STATES_MAP.get(status, "Unknown"), holdReason)
Expand Down
159 changes: 79 additions & 80 deletions src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,50 +71,6 @@
DEFAULT_DAYSTOKEEPLOGS = 15


def logDir(ceName, stamp):
"""Return path to log and output files for pilot.
:param str ceName: Name of the CE
:param str stamp: pilot stamp from/for jobRef
"""
return os.path.join(ceName, stamp[0], stamp[1:3])


def condorIDAndPathToResultFromJobRef(jobRef):
"""Extract tuple of jobURL and jobID from the jobRef string.
The condorID as well as the path leading to the job results are also extracted from the jobID.
:param str jobRef: PilotJobReference of the following form: ``htcondorce://<ceName>/<condorID>:::<pilotStamp>``
:return: tuple composed of the jobURL, the path to the job results and the condorID of the given jobRef
"""
splits = jobRef.split(":::")
jobURL = splits[0]
stamp = splits[1] if len(splits) > 1 else ""
_, _, ceName, condorID = jobURL.split("/")

# Reconstruct the path leading to the result (log, output)
# Construction of the path can be found in submitJob()
pathToResult = logDir(ceName, stamp) if len(stamp) >= 3 else ""

return jobURL, pathToResult, condorID


def findFile(workingDir, fileName, pathToResult):
"""Find a file in a file system.
:param str workingDir: the name of the directory containing the given file to search for
:param str fileName: the name of the file to find
:param str pathToResult: the path to follow from workingDir to find the file
:return: path leading to the file
"""
path = os.path.join(workingDir, pathToResult, fileName)
if os.path.exists(path):
return S_OK(path)
return S_ERROR(errno.ENOENT, f"Could not find {path}")


class HTCondorCEComputingElement(ComputingElement):
"""HTCondorCE computing element class
implementing the functions jobSubmit, getJobOutput
Expand Down Expand Up @@ -150,6 +106,45 @@ def __init__(self, ceUniqueID):
self.tokenFile = None

#############################################################################

def _DiracToCondorID(self, diracJobID):
"""Convert a DIRAC jobID into an Condor jobID.
Example: https://<ce>/1234/0 becomes 1234.0
:param str: DIRAC jobID
:return: Condor jobID
"""
# Remove CE and protocol information from arc Job ID
if "://" in diracJobID:
condorJobID = diracJobID.split("/")[-1]
return condorJobID
return diracJobID

def _condorToDiracID(self, condorJobIDs):
"""Get the references from the condor_submit output.
Cluster ids look like " 107.0 - 107.0 " or " 107.0 - 107.4 "
:param str condorJobIDs: the output of condor_submit
:return: job references such as htcondorce://<CE name>/<clusterID>.<i>
"""
clusterIDs = condorJobIDs.split("-")
if len(clusterIDs) != 2:
return S_ERROR(f"Something wrong with the condor_submit output: {condorJobIDs}")
clusterIDs = [clu.strip() for clu in clusterIDs]
self.log.verbose("Cluster IDs parsed:", clusterIDs)
try:
clusterID = clusterIDs[0].split(".")[0]
numJobs = clusterIDs[1].split(".")[1]
except IndexError:
return S_ERROR(f"Something wrong with the condor_submit output: {condorJobIDs}")

cePrefix = f"htcondorce://{self.ceName}/"
jobReferences = [f"{cePrefix}{clusterID}.{i}" for i in range(int(numJobs) + 1)]
return S_OK(jobReferences)

#############################################################################

def __writeSub(self, executable, location, processors, pilotStamps, tokenFile=None):
"""Create the Sub File for submission.
Expand Down Expand Up @@ -288,7 +283,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
jobStamps.append(jobStamp)

# We randomize the location of the pilot output and log, because there are just too many of them
location = logDir(self.ceName, commonJobStampPart)
location = os.path.join(self.ceName, commonJobStampPart[0], commonJobStampPart[1:3])
nProcessors = self.ceParameters.get("NumberOfProcessors", 1)
if self.token:
self.tokenFile = tempfile.NamedTemporaryFile(
Expand All @@ -311,7 +306,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
return result

stdout = result["Value"]
pilotJobReferences = self.__getPilotReferences(stdout)
pilotJobReferences = self._condorToDiracID(stdout)
if not pilotJobReferences["OK"]:
return pilotJobReferences
pilotJobReferences = pilotJobReferences["Value"]
Expand Down Expand Up @@ -339,15 +334,15 @@ def killJob(self, jobIDList):
self.log.verbose("KillJob jobIDList", jobIDList)
self.tokenFile = None

for jobRef in jobIDList:
job, _, jobID = condorIDAndPathToResultFromJobRef(jobRef)
self.log.verbose("Killing pilot", job)
for diracJobID in jobIDList:
condorJobID = self._DiracToCondorID(diracJobID.split(":::")[0])
self.log.verbose("Killing pilot", diracJobID)
cmd = ["condor_rm"]
cmd.extend(self.remoteScheddOptions.strip().split(" "))
cmd.append(jobID)
cmd.append(condorJobID)
result = self._executeCondorCommand(cmd, keepTokenFile=True)
if not result["OK"]:
self.log.error("Failed to kill pilot", f"{job}: {result['Message']}")
self.log.error("Failed to kill pilot", f"{diracJobID}: {result['Message']}")
return result

self.tokenFile = None
Expand Down Expand Up @@ -396,9 +391,10 @@ def getJobStatus(self, jobIDList):
resultDict = {}
condorIDs = {}
# Get all condorIDs so we can just call condor_q and condor_history once
for jobRef in jobIDList:
job, _, jobID = condorIDAndPathToResultFromJobRef(jobRef)
condorIDs[job] = jobID
for diracJobID in jobIDList:
diracJobID = diracJobID.split(":::")[0]
condorJobID = self._DiracToCondorID(diracJobID)
condorIDs[diracJobID] = condorJobID

self.tokenFile = None

Expand Down Expand Up @@ -471,13 +467,39 @@ def getJobOutput(self, jobID):

return S_OK((result["Value"]["output"], result["Value"]["error"]))

def findFile(self, workingDir, fileName, pathToResult):
"""Find a file in a file system.
:param str workingDir: the name of the directory containing the given file to search for
:param str fileName: the name of the file to find
:param str pathToResult: the path to follow from workingDir to find the file
:return: path leading to the file
"""
path = os.path.join(workingDir, pathToResult, fileName)
if os.path.exists(path):
return S_OK(path)
return S_ERROR(errno.ENOENT, f"Could not find {path}")

def __getJobOutput(self, jobID, outTypes):
"""Get job outputs: output, error and logging files from HTCondor
:param str jobID: job identifier
:param list outTypes: output types targeted (output, error and/or logging)
"""
_job, pathToResult, condorID = condorIDAndPathToResultFromJobRef(jobID)
# Extract stamp from the Job ID
if ":::" in jobID:
diracJobID, stamp = jobID.split(":::")
else:
return S_ERROR(f"DIRAC stamp not defined for {jobID}")

# Reconstruct the path leading to the result (log, output)
# Construction of the path can be found in submitJob()
if len(stamp) < 3:
return S_ERROR(f"Stamp is not long enough: {stamp}")
pathToResult = os.path.join(self.ceName, stamp[0], stamp[1:3])

condorJobID = self._DiracToCondorID(diracJobID)
iwd = os.path.join(self.workingDirectory, pathToResult)

try:
Expand All @@ -488,7 +510,7 @@ def __getJobOutput(self, jobID, outTypes):
return S_ERROR(e.errno, f"{errorMessage} ({iwd})")

if not self.useLocalSchedd:
cmd = ["condor_transfer_data", "-pool", f"{self.ceName}:9619", "-name", self.ceName, condorID]
cmd = ["condor_transfer_data", "-pool", f"{self.ceName}:9619", "-name", self.ceName, condorJobID]
result = self._executeCondorCommand(cmd)

# Getting 'logging' without 'error' and 'output' is possible but will generate command errors
Expand All @@ -500,7 +522,7 @@ def __getJobOutput(self, jobID, outTypes):
outputsSuffix = {"output": "out", "error": "err", "logging": "log"}
outputs = {}
for output, suffix in outputsSuffix.items():
resOut = findFile(self.workingDirectory, f"{condorID}.{suffix}", pathToResult)
resOut = self._findFile(self.workingDirectory, f"{condorJobID}.{suffix}", pathToResult)
if not resOut["OK"]:
# Return an error if the output type was targeted, else we continue
if output in outTypes:
Expand All @@ -523,29 +545,6 @@ def __getJobOutput(self, jobID, outTypes):

return S_OK(outputs)

def __getPilotReferences(self, jobString):
"""Get the references from the condor_submit output.
Cluster ids look like " 107.0 - 107.0 " or " 107.0 - 107.4 "
:param str jobString: the output of condor_submit
:return: job references such as htcondorce://<CE name>/<path to result>-<clusterID>.<i>
"""
self.log.verbose("getPilotReferences:", jobString)
clusterIDs = jobString.split("-")
if len(clusterIDs) != 2:
return S_ERROR(f"Something wrong with the condor_submit output: {jobString}")
clusterIDs = [clu.strip() for clu in clusterIDs]
self.log.verbose("Cluster IDs parsed:", clusterIDs)
try:
clusterID = clusterIDs[0].split(".")[0]
numJobs = clusterIDs[1].split(".")[1]
except IndexError:
return S_ERROR(f"Something wrong with the condor_submit output: {jobString}")
cePrefix = f"htcondorce://{self.ceName}/"
jobReferences = [f"{cePrefix}{clusterID}.{i}" for i in range(int(numJobs) + 1)]
return S_OK(jobReferences)

def __cleanup(self):
"""Clean the working directory of old jobs"""
if not HTCondorCEComputingElement._cleanupLock.acquire(False):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
MODNAME = "DIRAC.Resources.Computing.HTCondorCEComputingElement"

STATUS_LINES = """
123.2 5
123.1 3
123.2 5 4 0 undefined
123.1 3 undefined undefined undefined
""".strip().split(
"\n"
)

HISTORY_LINES = """
123 0 4
123.0 4 undefined undefined undefined
""".strip().split(
"\n"
)
Expand All @@ -31,31 +31,43 @@ def setUp():


def test_parseCondorStatus():
statusLines = """
104097.9 2
104098.0 1
104098.1 4
statusLines = f"""
104098.1 1 undefined undefined undefined
104098.2 2 undefined undefined undefined
104098.3 3 undefined undefined undefined
104098.4 4 undefined undefined undefined
104098.5 5 16 57 Input data are being spooled
104098.6 5 3 {Condor.HOLD_REASON_SUBCODE} Policy
104098.7 5 1 0 undefined
foo bar
104098.2 3
104098.3 5
104098.4 7
104096.1 3 16 test test
104096.2 3 test
104096.3 5 undefined undefined undefined
104096.4 7
""".strip().split(
"\n"
)
# force there to be an empty line

expectedResults = {
"104097.9": "Running",
"104098.0": "Waiting",
"104098.1": "Done",
"104098.2": "Aborted",
"104098.3": "HELD",
"104098.4": "Unknown",
"104098.1": "Waiting",
"104098.2": "Running",
"104098.3": "Aborted",
"104098.4": "Done",
"104098.5": "Waiting",
"104098.6": "Failed",
"104098.7": "Aborted",
"foo": "Unknown",
"104096.1": "Aborted",
"104096.2": "Aborted",
"104096.3": "Unknown",
"104096.4": "Unknown",
}

for jobID, expected in expectedResults.items():
assert HTCE.parseCondorStatus(statusLines, jobID) == expected
print(jobID, expected)
assert HTCE.parseCondorStatus(statusLines, jobID)[0] == expected


def test_getJobStatus(mocker):
Expand All @@ -66,6 +78,7 @@ def test_getJobStatus(mocker):
S_OK((0, "\n".join(STATUS_LINES), "")),
S_OK((0, "\n".join(HISTORY_LINES), "")),
S_OK((0, "", "")),
S_OK((0, "", "")),
],
)
mocker.patch(MODNAME + ".HTCondorCEComputingElement._HTCondorCEComputingElement__cleanup")
Expand All @@ -86,7 +99,6 @@ def test_getJobStatus(mocker):
"htcondorce://condorce.foo.arg/123.2": "Aborted",
"htcondorce://condorce.foo.arg/333.3": "Unknown",
}

assert ret["OK"] is True
assert expectedResults == ret["Value"]

Expand All @@ -102,7 +114,7 @@ def test_getJobStatusBatchSystem(mocker):
expectedResults = {
"123.0": "Done",
"123.1": "Aborted",
"123.2": "Unknown", # HELD is treated as Unknown
"123.2": "Aborted",
"333.3": "Unknown",
}

Expand All @@ -113,8 +125,8 @@ def test_getJobStatusBatchSystem(mocker):
@pytest.mark.parametrize(
"localSchedd, optionsNotExpected, optionsExpected",
[
(False, ["ShouldTransferFiles = YES", "WhenToTransferOutput = ON_EXIT_OR_EVICT"], ["universe = vanilla"]),
(True, [], ["ShouldTransferFiles = YES", "WhenToTransferOutput = ON_EXIT_OR_EVICT", "universe = grid"]),
(False, ["grid_resources = "], ["universe = vanilla"]),
(True, [], ["universe = grid"]),
],
)
def test__writeSub(mocker, localSchedd, optionsNotExpected, optionsExpected):
Expand All @@ -132,7 +144,7 @@ def test__writeSub(mocker, localSchedd, optionsNotExpected, optionsExpected):
jobStamp = commonJobStampPart + uuid.uuid4().hex[:29]
jobStamps.append(jobStamp)

htce._HTCondorCEComputingElement__writeSub("dirac-install", 42, "", 1, jobStamps) # pylint: disable=E1101
htce._HTCondorCEComputingElement__writeSub("dirac-install", "", 1, jobStamps) # pylint: disable=E1101
for option in optionsNotExpected:
# the three [0] are: call_args_list[firstCall][ArgsArgumentsTuple][FirstArgsArgument]
assert option not in subFileMock.write.call_args_list[0][0][0]
Expand Down

0 comments on commit 0ab51ff

Please sign in to comment.