diff --git a/src/DIRAC/Resources/Computing/BatchSystems/Condor.py b/src/DIRAC/Resources/Computing/BatchSystems/Condor.py index d9ef473c5eb..71526d7c486 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/Condor.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/Condor.py @@ -1,9 +1,3 @@ -######################################################################################### -# Condor.py -# 10.11.2014 -# Author: A.T. -######################################################################################### - """ Condor.py is a DIRAC independent class representing Condor batch system. Condor objects are used as backend batch system representation for LocalComputingElement and SSHComputingElement classes @@ -19,13 +13,85 @@ import os +# Cannot use the PilotStatus module here as Condor is meant to be executed on a remote machine +# DIRAC might not be available +STATES_MAP = { + 1: "Waiting", + 2: "Running", + 3: "Aborted", + 4: "Done", + 5: "Failed", +} + +HOLD_REASON_SUBCODE = 55 + +subTemplate = """ +# Environment +# ----------- +# There exist many universe: +# https://htcondor.readthedocs.io/en/latest/users-manual/choosing-an-htcondor-universe.html +universe = %(targetUniverse)s + +# Inputs/Outputs +# -------------- +# Inputs: executable to submit +executable = %(executable)s + +# Directory that will contain the outputs +initialdir = %(initialDir)s + +# Outputs: stdout, stderr, log +output = $(Cluster).$(Process).out +error = $(Cluster).$(Process).err +log = $(Cluster).$(Process).log + +# Transfer all output files, even if the job is failed +transfer_output_files = "" +should_transfer_files = YES +when_to_transfer_output = ON_EXIT_OR_EVICT + +# Environment variables to pass to the job +environment = "DIRAC_PILOT_STAMP=$(stamp) %(environment)s" + +# Credentials +# ----------- +%(useCredentials)s + +# Requirements +# ------------ +request_cpus = %(processors)s + +# Exit options +# ------------ +# Specify the signal sent to the job when HTCondor needs to vacate the worker node +kill_sig=SIGTERM +# By default, HTCondor marked jobs as completed regardless of its status +# This option allows to mark jobs as Held if they don't finish successfully +on_exit_hold = ExitCode != 0 +# A random subcode to identify who put the job on hold +on_exit_hold_subcode = %(holdReasonSubcode)s +# Jobs are then deleted from the system after N days +period_remove = (time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600) + +# Specific options +# ---------------- +# Local vs Remote schedd +%(scheddOptions)s +# CE-specific options +%(extraString)s + + +Queue stamp in %(pilotStampList)s +""" + + def parseCondorStatus(lines, jobID): """parse the condor_q or condor_history output for the job status - :param lines: list of lines from the output of the condor commands, each line is a pair of jobID and statusID + :param lines: list of lines from the output of the condor commands, each line is a pair of jobID, statusID, and holdReasonCode :type lines: python:list :param str jobID: jobID of condor job, e.g.: 123.53 - :returns: Status as known by DIRAC + :returns: Status as known by DIRAC, and a reason if the job is being held """ jobID = str(jobID) for line in lines: @@ -34,35 +100,35 @@ def parseCondorStatus(lines, jobID): status = int(l[1]) except (ValueError, IndexError): continue + holdReason = "" if l[0] == jobID: - return {1: "Waiting", 2: "Running", 3: "Aborted", 4: "Done", 5: "HELD"}.get(status, "Unknown") - return "Unknown" - - -def treatCondorHistory(condorHistCall, qList): - """concatenate clusterID and processID to get the same output as condor_q - until we can expect condor version 8.5.3 everywhere - - :param str condorHistCall: condor_history command to run - :param qList: list of jobID and status from condor_q output, will be modified in this function - :type qList: python:list - :returns: None - """ - sp = subprocess.Popen( - shlex.split(condorHistCall), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - ) - output, _ = sp.communicate() - status = sp.returncode - - # Join the ClusterId and the ProcId and add to existing list of statuses - if status == 0: - for line in output.split("\n"): - values = line.strip().split() - if len(values) == 3: - qList.append("%s.%s %s" % tuple(values)) + # A job can be held for many various reasons, we need to further investigate with the holdReasonCode & holdReasonSubCode + # Details in: + # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode + + # By default, a held (5) job is defined as Aborted, but there might be some exceptions + if status == 5: + try: + holdReasonCode = int(l[2]) + holdReasonSubcode = int(l[3]) + holdReason = l[4:] + except (ValueError, IndexError): + # This should not happen in theory + # Just set the status to unknown such as + status = -1 + holdReasonCode = -1 + holdReasonSubcode = -1 + + # If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true) + # And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed + if holdReasonCode == 3 and holdReasonSubcode == HOLD_REASON_SUBCODE: + status = 5 + # If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting + if holdReasonCode == 16: + status = 1 + + return (STATES_MAP.get(status, "Unknown"), holdReason) + return ("Unknown", holdReason) class Condor(object): @@ -96,24 +162,23 @@ def submitJob(self, **kwargs): return resultDict jdlFile = tempfile.NamedTemporaryFile(dir=outputDir, suffix=".jdl") + scheddOptions = 'requirements = OpSys == "LINUX"\n' + scheddOptions += "gentenv = False" jdlFile.write( - """ - Executable = %s - Universe = vanilla - Requirements = OpSys == "LINUX" - Initialdir = %s - Output = $(Cluster).$(Process).out - Error = $(Cluster).$(Process).err - Log = test.log - Environment = "CONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)" - Getenv = False - - request_cpus = %s - - Queue stamp in %s - - """ - % (executable, outputDir, numberOfProcessors, ",".join(stamps)) + subTemplate + % dict( + targetUniverse="vanilla", + executable=executable, + initialDir=outputDir, + environment="CONDOR_JOBID=$(Cluster).$(Process)", + useCredentials="", + processors=numberOfProcessors, + holdReasonSubcode=HOLD_REASON_SUBCODE, + daysToKeepRemoteLogs=1, + scheddOptions="", + extraString="", + pilotStampList=",".join(stamps), + ) ) jdlFile.flush() @@ -233,7 +298,7 @@ def getJobStatus(self, **kwargs): resultDict["Message"] = "No user name" return resultDict - cmd = "condor_q -submitter %s -af:j JobStatus" % user + cmd = "condor_q -submitter %s -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason" % user sp = subprocess.Popen( shlex.split(cmd), stdout=subprocess.PIPE, @@ -250,19 +315,26 @@ def getJobStatus(self, **kwargs): qList = output.strip().split("\n") - # FIXME: condor_history does only support j for autoformat from 8.5.3, - # format adds whitespace for each field This will return a list of 1245 75 3 - # needs to cocatenate the first two with a dot - condorHistCall = "condor_history -af ClusterId ProcId JobStatus -submitter %s" % user - treatCondorHistory(condorHistCall, qList) + condorHistCall = ( + "condor_history -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason -submitter %s" % user + ) + sp = subprocess.Popen( + shlex.split(condorHistCall), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + ) + output, _ = sp.communicate() + status = sp.returncode + if status == 0: + for line in output.split("\n"): + qList.append(line) statusDict = {} if len(qList): for job in jobIDList: job = str(job) - statusDict[job] = parseCondorStatus(qList, job) - if statusDict[job] == "HELD": - statusDict[job] = "Unknown" + statusDict[job], _ = parseCondorStatus(qList, job) # Final output status = 0 diff --git a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py index 4010adbd1c0..ce1838d3260 100644 --- a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py +++ b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py @@ -63,7 +63,7 @@ from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient from DIRAC.FrameworkSystem.private.authorization.utils.Tokens import writeToTokenFile from DIRAC.Core.Security.Locations import getCAsLocation -from DIRAC.Resources.Computing.BatchSystems.Condor import parseCondorStatus +from DIRAC.Resources.Computing.BatchSystems.Condor import HOLD_REASON_SUBCODE, subTemplate, parseCondorStatus MANDATORY_PARAMETERS = ["Queue"] DEFAULT_WORKINGDIRECTORY = "/opt/dirac/pro/runit/WorkloadManagement/SiteDirectorHT" @@ -152,11 +152,10 @@ def __init__(self, ceUniqueID): self.tokenFile = None ############################################################################# - def __writeSub(self, executable, nJobs, location, processors, pilotStamps, tokenFile=None): + def __writeSub(self, executable, location, processors, pilotStamps, tokenFile=None): """Create the Sub File for submission. :param str executable: name of the script to execute - :param int nJobs: number of desired jobs :param str location: directory that should contain the result of the jobs :param int processors: number of CPU cores to allocate :param list pilotStamps: list of pilot stamps (strings) @@ -166,7 +165,6 @@ def __writeSub(self, executable, nJobs, location, processors, pilotStamps, token mkDir(os.path.join(self.workingDirectory, location)) self.log.debug("InitialDir:", os.path.join(self.workingDirectory, location)) - self.log.debug(f"ExtraSubmitString:\n### \n {self.extraSubmitString} \n###") fd, name = tempfile.mkstemp(suffix=".sub", prefix="HTCondorCE_", dir=self.workingDirectory) @@ -175,6 +173,7 @@ def __writeSub(self, executable, nJobs, location, processors, pilotStamps, token executable = os.path.join(self.workingDirectory, executable) useCredentials = "use_x509userproxy = true" + # If tokenFile is present, then we transfer it to the worker node if tokenFile: useCredentials += textwrap.dedent( f""" @@ -183,55 +182,26 @@ def __writeSub(self, executable, nJobs, location, processors, pilotStamps, token """ ) + # Remote schedd options by default + targetUniverse = "vanilla" # This is used to remove outputs from the remote schedd - # Used in case a local schedd is not used - periodicRemove = "periodic_remove = " - periodicRemove += "(JobStatus == 4) && " - periodicRemove += f"(time() - EnteredCurrentStatus) > ({self.daysToKeepRemoteLogs} * 24 * 3600)" - - localScheddOptions = ( - """ -ShouldTransferFiles = YES -WhenToTransferOutput = ON_EXIT_OR_EVICT -""" - if self.useLocalSchedd - else periodicRemove - ) - - targetUniverse = "grid" if self.useLocalSchedd else "vanilla" - - sub = """ -executable = %(executable)s -universe = %(targetUniverse)s -%(useCredentials)s -output = $(Cluster).$(Process).out -error = $(Cluster).$(Process).err -log = $(Cluster).$(Process).log -environment = "HTCONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)" -initialdir = %(initialDir)s -grid_resource = condor %(ceName)s %(ceName)s:%(port)s -transfer_output_files = "" -request_cpus = %(processors)s -%(localScheddOptions)s - -kill_sig=SIGTERM - -%(extraString)s - -Queue stamp in %(pilotStampList)s + scheddOptions = "" + if self.useLocalSchedd: + targetUniverse = "grid" + scheddOptions = f"grid_resource = condor {self.ceName} {self.ceName}:9619" -""" % dict( + sub = subTemplate % dict( + targetUniverse=targetUniverse, executable=executable, - nJobs=nJobs, + initialDir=os.path.join(self.workingDirectory, location), + environment="HTCONDOR_JOBID=$(Cluster).$(Process)", + useCredentials=useCredentials, + holdReasonSubcode=HOLD_REASON_SUBCODE, processors=processors, - ceName=self.ceName, - port=self.port, + daysToKeepRemoteLogs=self.daysToKeepRemoteLogs, + scheddOptions=scheddOptions, extraString=self.extraSubmitString, - initialDir=os.path.join(self.workingDirectory, location), - localScheddOptions=localScheddOptions, - targetUniverse=targetUniverse, pilotStampList=",".join(pilotStamps), - useCredentials=useCredentials, ) subFile.write(sub) subFile.close() @@ -291,15 +261,28 @@ def _executeCondorCommand(self, cmd, keepTokenFile=False): if cas := getCAsLocation(): htcEnv["_CONDOR_AUTH_SSL_CLIENT_CADIR"] = cas + # Execute the command result = executeGridCommand( cmd, gridEnvScript=self.gridEnv, gridEnvDict=htcEnv, ) + if not result["OK"]: + self.tokenFile = None + self.log.error("Command", f"{cmd} failed with: {result['Message']}") + return result + + status, stdout, stderr = result["Value"] + if status: + self.tokenFile = None + # We have got a non-zero status code + errorString = stderr if stderr else stdout + return S_ERROR(f"Command", f"{cmd} failed with: {status} - {errorString.strip()}") + # Remove token file if we do not want to keep it self.tokenFile = self.tokenFile if keepTokenFile else None - return result + return S_OK(stdout.strip()) ############################################################################# def submitJob(self, executableFile, proxy, numberOfJobs=1): @@ -325,9 +308,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1): suffix=".token", prefix="HTCondorCE_", dir=self.workingDirectory ) writeToTokenFile(self.token["access_token"], self.tokenFile.name) - subName = self.__writeSub( - executableFile, numberOfJobs, location, nProcessors, jobStamps, tokenFile=self.tokenFile - ) + subName = self.__writeSub(executableFile, location, nProcessors, jobStamps, tokenFile=self.tokenFile) cmd = ["condor_submit", "-terse", subName] # the options for submit to remote are different than the other remoteScheddOptions @@ -337,21 +318,13 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1): cmd.insert(-1, op) result = self._executeCondorCommand(cmd, keepTokenFile=True) - self.log.verbose(result) os.remove(subName) - self.tokenFile = None if not result["OK"]: self.log.error("Failed to submit jobs to htcondor", result["Message"]) return result - status, stdout, stderr = result["Value"] - - if status: - # We have got a non-zero status code - errorString = stderr if stderr else stdout - return S_ERROR(f"Pilot submission failed with error: {errorString.strip()}") - - pilotJobReferences = self.__getPilotReferences(stdout.strip()) + stdout = result["Value"] + pilotJobReferences = self.__getPilotReferences(stdout) if not pilotJobReferences["OK"]: return pilotJobReferences pilotJobReferences = pilotJobReferences["Value"] @@ -377,7 +350,6 @@ def killJob(self, jobIDList): jobIDList = [jobIDList] self.log.verbose("KillJob jobIDList", jobIDList) - self.tokenFile = None for jobRef in jobIDList: @@ -388,16 +360,10 @@ def killJob(self, jobIDList): cmd.append(jobID) result = self._executeCondorCommand(cmd, keepTokenFile=True) if not result["OK"]: - self.tokenFile = None - return S_ERROR(f"condor_rm failed completely: {result['Message']}") - status, stdout, stderr = result["Value"] - if status != 0: - self.log.warn("Failed to kill pilot", f"{job}: {stdout}, {stderr}") - self.tokenFile = None - return S_ERROR(f"Failed to kill pilot {job}: {stderr}") + self.log.error("Failed to kill pilot", f"{job}: {result['Message']}") + return result self.tokenFile = None - return S_OK() ############################################################################# @@ -455,65 +421,43 @@ def getJobStatus(self, jobIDList): cmd = ["condor_q"] cmd.extend(self.remoteScheddOptions.strip().split(" ")) cmd.extend(_condorIDs) - cmd.extend(["-af:j", "JobStatus"]) + cmd.extend(["-af:j", "JobStatus", "HoldReasonCode", "HoldReasonSubCode", "HoldReason"]) result = self._executeCondorCommand(cmd, keepTokenFile=True) if not result["OK"]: - self.tokenFile = None - return S_ERROR(f"condor_q failed completely: {result['Message']}") - status, stdout, stderr = result["Value"] - if status != 0: - self.tokenFile = None - return S_ERROR(stdout + stderr) - _qList = stdout.strip().split("\n") + return result + + _qList = result["Value"].split("\n") qList.extend(_qList) - # FIXME: condor_history does only support j for autoformat from 8.5.3, - # format adds whitespace for each field This will return a list of 1245 75 3 - # needs to concatenate the first two with a dot condorHistCall = ["condor_history"] condorHistCall.extend(self.remoteScheddOptions.strip().split(" ")) condorHistCall.extend(_condorIDs) - condorHistCall.extend(["-af", "ClusterId", "ProcId", "JobStatus"]) + condorHistCall.extend(["-af:j", "JobStatus", "HoldReasonCode", "HoldReasonSubCode", "HoldReason"]) + result = self._executeCondorCommand(cmd, keepTokenFile=True) + if not result["OK"]: + return result - self._treatCondorHistory(condorHistCall, qList) + _qList = result["Value"].split("\n") + qList.extend(_qList) + jobsToCancel = [] for job, jobID in condorIDs.items(): - pilotStatus = parseCondorStatus(qList, jobID) - if pilotStatus == "HELD": - # make sure the pilot stays dead and gets taken out of the condor_q - cmd = f"condor_rm {self.remoteScheddOptions} {jobID}".split() - _result = self._executeCondorCommand(cmd, keepTokenFile=True) - pilotStatus = PilotStatus.ABORTED + pilotStatus, reason = parseCondorStatus(qList, jobID) + + if pilotStatus == PilotStatus.ABORTED: + self.log.verbose("Held job", f"{jobID} because: {reason}") + jobsToCancel.append(jobID) resultDict[job] = pilotStatus + # Make sure the pilot stays dead and gets taken out of the condor_q + self.killJob(jobsToCancel) + self.tokenFile = None self.log.verbose(f"Pilot Statuses: {resultDict} ") return S_OK(resultDict) - def _treatCondorHistory(self, condorHistCall, qList): - """concatenate clusterID and processID to get the same output as condor_q - until we can expect condor version 8.5.3 everywhere - - :param str condorHistCall: condor_history command to run - :param list qList: list of jobID and status from condor_q output, will be modified in this function - :returns: None - """ - - result = self._executeCondorCommand(condorHistCall) - if not result["OK"]: - return S_ERROR(f"condorHistCall failed completely: {result['Message']}") - - status_history, stdout_history, stderr_history = result["Value"] - - # Join the ClusterId and the ProcId and add to existing list of statuses - if status_history == 0: - for line in stdout_history.split("\n"): - values = line.strip().split() - if len(values) == 3: - qList.append("%s.%s %s" % tuple(values)) - def getJobLog(self, jobID): """Get pilot job logging info from HTCondor @@ -559,23 +503,12 @@ def __getJobOutput(self, jobID, outTypes): if not self.useLocalSchedd: cmd = ["condor_transfer_data", "-pool", f"{self.ceName}:{self.port}", "-name", self.ceName, condorID] result = self._executeCondorCommand(cmd) - self.log.verbose(result) # Getting 'logging' without 'error' and 'output' is possible but will generate command errors # We do not check the command errors if we only want 'logging' if "error" in outTypes or "output" in outTypes: - errorMessage = "Failed to get job output from htcondor" if not result["OK"]: - self.log.error(errorMessage, result["Message"]) return result - # Even if result is OK, the actual exit code of cmd can still be an error - status, stdout, stderr = result["Value"] - if status != 0: - outMessage = stdout.strip() - errMessage = stderr.strip() - varMessage = outMessage + " " + errMessage - self.log.error(errorMessage, varMessage) - return S_ERROR(f"{errorMessage}: {varMessage}") outputsSuffix = {"output": "out", "error": "err", "logging": "log"} outputs = {}