Skip to content

Commit

Permalink
fix: HTCondor better reports the failures
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Jun 22, 2023
1 parent 28d685f commit 1ab913c
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 186 deletions.
196 changes: 134 additions & 62 deletions src/DIRAC/Resources/Computing/BatchSystems/Condor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
#########################################################################################
# Condor.py
# 10.11.2014
# Author: A.T.
#########################################################################################

""" Condor.py is a DIRAC independent class representing Condor batch system.
Condor objects are used as backend batch system representation for
LocalComputingElement and SSHComputingElement classes
Expand All @@ -19,13 +13,85 @@
import os


# Cannot use the PilotStatus module here as Condor is meant to be executed on a remote machine
# DIRAC might not be available
STATES_MAP = {
1: "Waiting",
2: "Running",
3: "Aborted",
4: "Done",
5: "Failed",
}

HOLD_REASON_SUBCODE = 55

subTemplate = """
# Environment
# -----------
# There exist many universe:
# https://htcondor.readthedocs.io/en/latest/users-manual/choosing-an-htcondor-universe.html
universe = %(targetUniverse)s
# Inputs/Outputs
# --------------
# Inputs: executable to submit
executable = %(executable)s
# Directory that will contain the outputs
initialdir = %(initialDir)s
# Outputs: stdout, stderr, log
output = $(Cluster).$(Process).out
error = $(Cluster).$(Process).err
log = $(Cluster).$(Process).log
# Transfer all output files, even if the job is failed
transfer_output_files = ""
should_transfer_files = YES
when_to_transfer_output = ON_EXIT_OR_EVICT
# Environment variables to pass to the job
environment = "DIRAC_PILOT_STAMP=$(stamp) %(environment)s"
# Credentials
# -----------
%(useCredentials)s
# Requirements
# ------------
request_cpus = %(processors)s
# Exit options
# ------------
# Specify the signal sent to the job when HTCondor needs to vacate the worker node
kill_sig=SIGTERM
# By default, HTCondor marked jobs as completed regardless of its status
# This option allows to mark jobs as Held if they don't finish successfully
on_exit_hold = ExitCode != 0
# A random subcode to identify who put the job on hold
on_exit_hold_subcode = %(holdReasonSubcode)s
# Jobs are then deleted from the system after N days
period_remove = (time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600)
# Specific options
# ----------------
# Local vs Remote schedd
%(scheddOptions)s
# CE-specific options
%(extraString)s
Queue stamp in %(pilotStampList)s
"""


def parseCondorStatus(lines, jobID):
"""parse the condor_q or condor_history output for the job status
:param lines: list of lines from the output of the condor commands, each line is a pair of jobID and statusID
:param lines: list of lines from the output of the condor commands, each line is a pair of jobID, statusID, and holdReasonCode
:type lines: python:list
:param str jobID: jobID of condor job, e.g.: 123.53
:returns: Status as known by DIRAC
:returns: Status as known by DIRAC, and a reason if the job is being held
"""
jobID = str(jobID)
for line in lines:
Expand All @@ -34,35 +100,35 @@ def parseCondorStatus(lines, jobID):
status = int(l[1])
except (ValueError, IndexError):
continue
holdReason = ""
if l[0] == jobID:
return {1: "Waiting", 2: "Running", 3: "Aborted", 4: "Done", 5: "HELD"}.get(status, "Unknown")
return "Unknown"


def treatCondorHistory(condorHistCall, qList):
"""concatenate clusterID and processID to get the same output as condor_q
until we can expect condor version 8.5.3 everywhere
:param str condorHistCall: condor_history command to run
:param qList: list of jobID and status from condor_q output, will be modified in this function
:type qList: python:list
:returns: None
"""
sp = subprocess.Popen(
shlex.split(condorHistCall),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
output, _ = sp.communicate()
status = sp.returncode

# Join the ClusterId and the ProcId and add to existing list of statuses
if status == 0:
for line in output.split("\n"):
values = line.strip().split()
if len(values) == 3:
qList.append("%s.%s %s" % tuple(values))
# A job can be held for many various reasons, we need to further investigate with the holdReasonCode & holdReasonSubCode
# Details in:
# https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode

# By default, a held (5) job is defined as Aborted, but there might be some exceptions
if status == 5:
try:
holdReasonCode = int(l[2])
holdReasonSubcode = int(l[3])
holdReason = l[4:]
except (ValueError, IndexError):
# This should not happen in theory
# Just set the status to unknown such as
status = -1
holdReasonCode = -1
holdReasonSubcode = -1

# If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true)
# And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed
if holdReasonCode == 3 and holdReasonSubcode == HOLD_REASON_SUBCODE:
status = 5
# If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
if holdReasonCode == 16:
status = 1

return (STATES_MAP.get(status, "Unknown"), holdReason)
return ("Unknown", holdReason)


class Condor(object):
Expand Down Expand Up @@ -96,24 +162,23 @@ def submitJob(self, **kwargs):
return resultDict

jdlFile = tempfile.NamedTemporaryFile(dir=outputDir, suffix=".jdl")
scheddOptions = 'requirements = OpSys == "LINUX"\n'
scheddOptions += "gentenv = False"
jdlFile.write(
"""
Executable = %s
Universe = vanilla
Requirements = OpSys == "LINUX"
Initialdir = %s
Output = $(Cluster).$(Process).out
Error = $(Cluster).$(Process).err
Log = test.log
Environment = "CONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)"
Getenv = False
request_cpus = %s
Queue stamp in %s
"""
% (executable, outputDir, numberOfProcessors, ",".join(stamps))
subTemplate
% dict(
targetUniverse="vanilla",
executable=executable,
initialDir=outputDir,
environment="CONDOR_JOBID=$(Cluster).$(Process)",
useCredentials="",
processors=numberOfProcessors,
holdReasonSubcode=HOLD_REASON_SUBCODE,
daysToKeepRemoteLogs=1,
scheddOptions="",
extraString="",
pilotStampList=",".join(stamps),
)
)

jdlFile.flush()
Expand Down Expand Up @@ -233,7 +298,7 @@ def getJobStatus(self, **kwargs):
resultDict["Message"] = "No user name"
return resultDict

cmd = "condor_q -submitter %s -af:j JobStatus" % user
cmd = "condor_q -submitter %s -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason" % user
sp = subprocess.Popen(
shlex.split(cmd),
stdout=subprocess.PIPE,
Expand All @@ -250,19 +315,26 @@ def getJobStatus(self, **kwargs):

qList = output.strip().split("\n")

# FIXME: condor_history does only support j for autoformat from 8.5.3,
# format adds whitespace for each field This will return a list of 1245 75 3
# needs to cocatenate the first two with a dot
condorHistCall = "condor_history -af ClusterId ProcId JobStatus -submitter %s" % user
treatCondorHistory(condorHistCall, qList)
condorHistCall = (
"condor_history -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason -submitter %s" % user
)
sp = subprocess.Popen(
shlex.split(condorHistCall),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
output, _ = sp.communicate()
status = sp.returncode
if status == 0:
for line in output.split("\n"):
qList.append(line)

statusDict = {}
if len(qList):
for job in jobIDList:
job = str(job)
statusDict[job] = parseCondorStatus(qList, job)
if statusDict[job] == "HELD":
statusDict[job] = "Unknown"
statusDict[job], _ = parseCondorStatus(qList, job)

# Final output
status = 0
Expand Down
Loading

0 comments on commit 1ab913c

Please sign in to comment.