From 1a1415b72a0f3e005ae169534144ce09e7e90631 Mon Sep 17 00:00:00 2001 From: aldbr Date: Tue, 1 Aug 2023 08:47:19 +0200 Subject: [PATCH] fix: minor changes in HTCondor --- .../Computing/BatchSystems/Condor.py | 78 +++++++++++-------- .../Computing/HTCondorCEComputingElement.py | 37 ++++----- .../test/Test_HTCondorCEComputingElement.py | 2 - 3 files changed, 58 insertions(+), 59 deletions(-) diff --git a/src/DIRAC/Resources/Computing/BatchSystems/Condor.py b/src/DIRAC/Resources/Computing/BatchSystems/Condor.py index a545728c25e..c3c02313670 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/Condor.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/Condor.py @@ -45,8 +45,10 @@ error = $(Cluster).$(Process).err log = $(Cluster).$(Process).log -# Transfer all output files, even if the job is failed +# No other files are to be transferred transfer_output_files = "" + +# Transfer outputs, even if the job is failed should_transfer_files = YES when_to_transfer_output = ON_EXIT_OR_EVICT @@ -70,8 +72,8 @@ on_exit_hold = ExitCode != 0 # A random subcode to identify who put the job on hold on_exit_hold_subcode = %(holdReasonSubcode)s -# Jobs are then deleted from the system after N days if they are not running -period_remove = (JobStatus != 1) && (JobStatus != 2) && ((time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600)) +# Jobs are then deleted from the system after N days if they are not idle or running +periodic_remove = (JobStatus != 1) && (JobStatus != 2) && ((time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600)) # Specific options # ---------------- @@ -88,7 +90,7 @@ def parseCondorStatus(lines, jobID): """parse the condor_q or condor_history output for the job status - :param lines: list of lines from the output of the condor commands, each line is a pair of jobID, statusID, and holdReasonCode + :param lines: list of lines from the output of the condor commands, each line is a tuple of jobID, statusID, and holdReasonCode :type lines: python:list :param str jobID: jobID of condor job, e.g.: 123.53 :returns: Status as known by DIRAC, and a reason if the job is being held @@ -96,44 +98,52 @@ def parseCondorStatus(lines, jobID): jobID = str(jobID) holdReason = "" + status = None for line in lines: l = line.strip().split() + # Make sure the job ID exists + if len(l) < 1 or l[0] != jobID: + continue + # Make sure the status is present and is an integer try: status = int(l[1]) except (ValueError, IndexError): - continue + break - if l[0] == jobID: - # A job can be held for many various reasons, we need to further investigate with the holdReasonCode & holdReasonSubCode - # Details in: - # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode - if status == 5: - - # By default, a held (5) job is defined as Aborted, but there might be some exceptions - status = 3 - try: - holdReasonCode = l[2] - holdReasonSubcode = l[3] - holdReason = " ".join(l[4:]) - except IndexError: - # This should not happen in theory - # Just set the status to unknown such as - status = -1 - holdReasonCode = "undefined" - holdReasonSubcode = "undefined" - - # If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true) - # And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed - if holdReasonCode == "3" and holdReasonSubcode == HOLD_REASON_SUBCODE: - status = 5 - # If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting - elif holdReasonCode == "16": - status = 1 - - return (STATES_MAP.get(status, "Unknown"), holdReason) - return ("Unknown", holdReason) + # Stop here if the status is not held (5): result should be found in STATES_MAP + if status != 5: + break + + # A job can be held for various reasons, + # we need to further investigate with the holdReasonCode & holdReasonSubCode + # Details in: + # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode + + # By default, a held (5) job is defined as Aborted in STATES_MAP, but there might be some exceptions + status = 3 + try: + holdReasonCode = l[2] + holdReasonSubcode = l[3] + holdReason = " ".join(l[4:]) + except IndexError: + # This should not happen in theory + # Just set the status to unknown such as + status = None + holdReasonCode = "undefined" + holdReasonSubcode = "undefined" + break + + # If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true) + # And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed + if holdReasonCode == "3" and holdReasonSubcode == HOLD_REASON_SUBCODE: + status = 5 + # If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting + elif holdReasonCode == "16": + status = 1 + + return (STATES_MAP.get(status, "Unknown"), holdReason) class Condor(object): diff --git a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py index 187e9d0baf4..21b7a95aa76 100644 --- a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py +++ b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py @@ -110,8 +110,8 @@ def __init__(self, ceUniqueID): ############################################################################# def _DiracToCondorID(self, diracJobID): - """Convert a DIRAC jobID into an Condor jobID. - Example: https:///1234/0 becomes 1234.0 + """Convert a DIRAC jobID into a Condor jobID. + Example: htcondorce:///1234.0 becomes 1234.0 :param str: DIRAC jobID :return: Condor jobID @@ -128,7 +128,7 @@ def _condorToDiracID(self, condorJobIDs): :param str condorJobIDs: the output of condor_submit - :return: job references such as htcondorce:///. + :return: job references such as htcondorce:///. """ clusterIDs = condorJobIDs.split("-") if len(clusterIDs) != 2: @@ -179,7 +179,6 @@ def __writeSub(self, executable, location, processors, pilotStamps, tokenFile=No # Remote schedd options by default targetUniverse = "vanilla" - # This is used to remove outputs from the remote schedd scheddOptions = "" if self.useLocalSchedd: targetUniverse = "grid" @@ -227,7 +226,7 @@ def _executeCondorCommand(self, cmd, keepTokenFile=False): :param list cmd: list of the condor command elements :param bool keepTokenFile: flag to reuse or not the previously created token file - :return: S_OK/S_ERROR - the result of the executeGridCommand() call + :return: S_OK/S_ERROR - the stdout parameter of the executeGridCommand() call """ if not self.token and not self.proxy: return S_ERROR(f"Cannot execute the command, token and proxy not found: {cmd}") @@ -406,8 +405,7 @@ def getJobStatus(self, jobIDList): # Get all condorIDs so we can just call condor_q and condor_history once for diracJobID in jobIDList: diracJobID = diracJobID.split(":::")[0] - condorJobID = self._DiracToCondorID(diracJobID) - condorIDs[diracJobID] = condorJobID + condorIDs[diracJobID] = self._DiracToCondorID(diracJobID) self.tokenFile = None @@ -422,8 +420,7 @@ def getJobStatus(self, jobIDList): if not result["OK"]: return result - _qList = result["Value"].split("\n") - qList.extend(_qList) + qList.extend(result["Value"].split("\n")) condorHistCall = ["condor_history"] condorHistCall.extend(self.remoteScheddOptions.strip().split(" ")) @@ -433,21 +430,15 @@ def getJobStatus(self, jobIDList): if not result["OK"]: return result - _qList = result["Value"].split("\n") - qList.extend(_qList) + qList.extend(result["Value"].split("\n")) - jobsToCancel = [] for job, jobID in condorIDs.items(): - pilotStatus, reason = parseCondorStatus(qList, jobID) + jobStatus, reason = parseCondorStatus(qList, jobID) - if pilotStatus == PilotStatus.ABORTED: - self.log.verbose("Held job", f"{jobID} because: {reason}") - jobsToCancel.append(jobID) + if jobStatus == PilotStatus.ABORTED: + self.log.verbose("Job", f"{jobID} held: {reason}") - resultDict[job] = pilotStatus - - # Make sure the pilot stays dead and gets taken out of the condor_q - self.killJob(jobsToCancel) + resultDict[job] = jobStatus self.tokenFile = None @@ -480,7 +471,7 @@ def getJobOutput(self, jobID): return S_OK((result["Value"]["output"], result["Value"]["error"])) - def _findFile(self, workingDir, fileName, pathToResult): + def _findFile(self, fileName, pathToResult): """Find a file in a file system. :param str workingDir: the name of the directory containing the given file to search for @@ -489,7 +480,7 @@ def _findFile(self, workingDir, fileName, pathToResult): :return: path leading to the file """ - path = os.path.join(workingDir, pathToResult, fileName) + path = os.path.join(self.workingDirectory, pathToResult, fileName) if os.path.exists(path): return S_OK(path) return S_ERROR(errno.ENOENT, f"Could not find {path}") @@ -535,7 +526,7 @@ def __getJobOutput(self, jobID, outTypes): outputsSuffix = {"output": "out", "error": "err", "logging": "log"} outputs = {} for output, suffix in outputsSuffix.items(): - resOut = self._findFile(self.workingDirectory, f"{condorJobID}.{suffix}", pathToResult) + resOut = self._findFile(f"{condorJobID}.{suffix}", pathToResult) if not resOut["OK"]: # Return an error if the output type was targeted, else we continue if output in outTypes: diff --git a/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py index 7c8c7b2d9e9..a84347b7d4f 100644 --- a/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py +++ b/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py @@ -66,7 +66,6 @@ def test_parseCondorStatus(): } for jobID, expected in expectedResults.items(): - print(jobID, expected) assert HTCE.parseCondorStatus(statusLines, jobID)[0] == expected @@ -103,7 +102,6 @@ def test_getJobStatus(mocker): "htcondorce://condorce.foo.arg/123.2": "Aborted", "htcondorce://condorce.foo.arg/333.3": "Unknown", } - print(ret) assert ret["OK"] is True assert expectedResults == ret["Value"]