Skip to content

Commit

Permalink
fix: HTCondor tests + a bit of refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Jun 28, 2023
1 parent 330b47e commit 7415c90
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 108 deletions.
11 changes: 6 additions & 5 deletions src/DIRAC/Resources/Computing/BatchSystems/Condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@
on_exit_hold = ExitCode != 0
# A random subcode to identify who put the job on hold
on_exit_hold_subcode = %(holdReasonSubcode)s
# Jobs are then deleted from the system after N days
period_remove = (time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600)
# Jobs are then deleted from the system after N days if they are not running
period_remove = (JobStatus != 2) && (time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600)
# Specific options
# ----------------
Expand Down Expand Up @@ -105,9 +105,10 @@ def parseCondorStatus(lines, jobID):
# A job can be held for various reasons; we need to investigate further using the holdReasonCode & holdReasonSubCode
# Details in:
# https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode

# By default, a held (5) job is defined as Aborted, but there might be some exceptions
if status == 5:

# By default, a held (5) job is defined as Aborted, but there might be some exceptions
status = 3
try:
holdReasonCode = int(l[2])
holdReasonSubcode = int(l[3])
Expand All @@ -124,7 +125,7 @@ def parseCondorStatus(lines, jobID):
if holdReasonCode == 3 and holdReasonSubcode == HOLD_REASON_SUBCODE:
status = 5
# If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting
if holdReasonCode == 16:
elif holdReasonCode == 16:
status = 1

return (STATES_MAP.get(status, "Unknown"), holdReason)
Expand Down
161 changes: 80 additions & 81 deletions src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,50 +71,6 @@
DEFAULT_DAYSTOKEEPLOGS = 15


def logDir(ceName, stamp):
    """Build the relative directory holding the log and output files of a pilot.

    The directory has the form ``<ceName>/<stamp[0]>/<stamp[1:3]>``.

    :param str ceName: name of the CE
    :param str stamp: pilot stamp from/for jobRef; indexing an empty stamp raises IndexError
    :return: relative path to the directory of the pilot files
    """
    firstLevel = stamp[0]
    secondLevel = stamp[1:3]
    return os.path.join(ceName, firstLevel, secondLevel)


def condorIDAndPathToResultFromJobRef(jobRef):
    """Split a pilot job reference into its job URL, its result path and its condor ID.

    :param str jobRef: PilotJobReference of the following form: ``htcondorce://<ceName>/<condorID>:::<pilotStamp>``
    :return: tuple composed of the jobURL, the path to the job results and the condorID of the given jobRef.
             The path is an empty string when the stamp is missing or shorter than 3 characters.
    """
    parts = jobRef.split(":::")
    jobURL = parts[0]
    # The stamp is optional in the reference
    stamp = parts[1] if len(parts) > 1 else ""

    # The URL must hold exactly 4 "/"-separated fields: "<protocol>:", "", <ceName>, <condorID>
    fields = jobURL.split("/")
    _protocol, _empty, ceName, condorID = fields

    # Reconstruct the path leading to the result (log, output)
    # Construction of the path can be found in submitJob()
    if len(stamp) < 3:
        pathToResult = ""
    else:
        pathToResult = logDir(ceName, stamp)

    return jobURL, pathToResult, condorID


def findFile(workingDir, fileName, pathToResult):
    """Check that a file exists below a working directory.

    :param str workingDir: the name of the directory containing the given file to search for
    :param str fileName: the name of the file to find
    :param str pathToResult: the path to follow from workingDir to find the file
    :return: S_OK(path leading to the file) if it exists, S_ERROR(errno.ENOENT, message) otherwise
    """
    candidate = os.path.join(workingDir, pathToResult, fileName)
    if not os.path.exists(candidate):
        return S_ERROR(errno.ENOENT, f"Could not find {candidate}")
    return S_OK(candidate)


class HTCondorCEComputingElement(ComputingElement):
"""HTCondorCE computing element class
implementing the functions jobSubmit, getJobOutput
Expand Down Expand Up @@ -152,6 +108,45 @@ def __init__(self, ceUniqueID):
self.tokenFile = None

#############################################################################

def _DiracToCondorID(self, diracJobID):
"""Convert a DIRAC jobID into an Condor jobID.
Example: https://<ce>/1234/0 becomes 1234.0
:param str: DIRAC jobID
:return: Condor jobID
"""
# Remove CE and protocol information from arc Job ID
if "://" in diracJobID:
condorJobID = diracJobID.split("/")[-1]
return condorJobID
return diracJobID

def _condorToDiracID(self, condorJobIDs):
    """Get the references from the condor_submit output.

    Cluster ids look like " 107.0 - 107.0 " or " 107.0 - 107.4 "

    :param str condorJobIDs: the output of condor_submit
    :return: S_OK(job references such as htcondorce://<CE name>/<clusterID>.<i>),
             or S_ERROR if the output does not have the expected form
    """
    errorMessage = f"Something wrong with the condor_submit output: {condorJobIDs}"

    clusterIDs = condorJobIDs.split("-")
    if len(clusterIDs) != 2:
        return S_ERROR(errorMessage)
    clusterIDs = [clu.strip() for clu in clusterIDs]
    self.log.verbose("Cluster IDs parsed:", clusterIDs)
    try:
        clusterID = clusterIDs[0].split(".")[0]
        # The process ID of the last job: references are generated from 0 up to this value
        lastProcID = int(clusterIDs[1].split(".")[1])
    except (IndexError, ValueError):
        # IndexError: no "." in the second ID; ValueError: the process ID is not an integer
        return S_ERROR(errorMessage)

    cePrefix = f"htcondorce://{self.ceName}/"
    jobReferences = [f"{cePrefix}{clusterID}.{i}" for i in range(lastProcID + 1)]
    return S_OK(jobReferences)

#############################################################################

def __writeSub(self, executable, location, processors, pilotStamps, tokenFile=None):
"""Create the Sub File for submission.
Expand Down Expand Up @@ -188,7 +183,7 @@ def __writeSub(self, executable, location, processors, pilotStamps, tokenFile=No
scheddOptions = ""
if self.useLocalSchedd:
targetUniverse = "grid"
scheddOptions = f"grid_resource = condor {self.ceName} {self.ceName}:9619"
scheddOptions = f"grid_resource = condor {self.ceName} {self.ceName}:{self.port}"

sub = subTemplate % dict(
targetUniverse=targetUniverse,
Expand Down Expand Up @@ -301,7 +296,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
jobStamps.append(jobStamp)

# We randomize the location of the pilot output and log, because there are just too many of them
location = logDir(self.ceName, commonJobStampPart)
location = os.path.join(self.ceName, commonJobStampPart[0], commonJobStampPart[1:3])
nProcessors = self.ceParameters.get("NumberOfProcessors", 1)
if self.token:
self.tokenFile = tempfile.NamedTemporaryFile(
Expand All @@ -324,7 +319,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
return result

stdout = result["Value"]
pilotJobReferences = self.__getPilotReferences(stdout)
pilotJobReferences = self._condorToDiracID(stdout)
if not pilotJobReferences["OK"]:
return pilotJobReferences
pilotJobReferences = pilotJobReferences["Value"]
Expand Down Expand Up @@ -352,15 +347,15 @@ def killJob(self, jobIDList):
self.log.verbose("KillJob jobIDList", jobIDList)
self.tokenFile = None

for jobRef in jobIDList:
job, _, jobID = condorIDAndPathToResultFromJobRef(jobRef)
self.log.verbose("Killing pilot", job)
for diracJobID in jobIDList:
condorJobID = self._DiracToCondorID(diracJobID.split(":::")[0])
self.log.verbose("Killing pilot", diracJobID)
cmd = ["condor_rm"]
cmd.extend(self.remoteScheddOptions.strip().split(" "))
cmd.append(jobID)
cmd.append(condorJobID)
result = self._executeCondorCommand(cmd, keepTokenFile=True)
if not result["OK"]:
self.log.error("Failed to kill pilot", f"{job}: {result['Message']}")
self.log.error("Failed to kill pilot", f"{diracJobID}: {result['Message']}")
return result

self.tokenFile = None
Expand Down Expand Up @@ -409,9 +404,10 @@ def getJobStatus(self, jobIDList):
resultDict = {}
condorIDs = {}
# Get all condorIDs so we can just call condor_q and condor_history once
for jobRef in jobIDList:
job, _, jobID = condorIDAndPathToResultFromJobRef(jobRef)
condorIDs[job] = jobID
for diracJobID in jobIDList:
diracJobID = diracJobID.split(":::")[0]
condorJobID = self._DiracToCondorID(diracJobID)
condorIDs[diracJobID] = condorJobID

self.tokenFile = None

Expand Down Expand Up @@ -484,13 +480,39 @@ def getJobOutput(self, jobID):

return S_OK((result["Value"]["output"], result["Value"]["error"]))

def _findFile(self, workingDir, fileName, pathToResult):
    """Check that a file exists below a working directory.

    :param str workingDir: the name of the directory containing the given file to search for
    :param str fileName: the name of the file to find
    :param str pathToResult: the path to follow from workingDir to find the file
    :return: S_OK(path leading to the file) if it exists, S_ERROR(errno.ENOENT, message) otherwise
    """
    candidate = os.path.join(workingDir, pathToResult, fileName)
    if not os.path.exists(candidate):
        return S_ERROR(errno.ENOENT, f"Could not find {candidate}")
    return S_OK(candidate)

def __getJobOutput(self, jobID, outTypes):
"""Get job outputs: output, error and logging files from HTCondor
:param str jobID: job identifier
:param list outTypes: output types targeted (output, error and/or logging)
"""
_job, pathToResult, condorID = condorIDAndPathToResultFromJobRef(jobID)
# Extract stamp from the Job ID
if ":::" in jobID:
diracJobID, stamp = jobID.split(":::")
else:
return S_ERROR(f"DIRAC stamp not defined for {jobID}")

# Reconstruct the path leading to the result (log, output)
# Construction of the path can be found in submitJob()
if len(stamp) < 3:
return S_ERROR(f"Stamp is not long enough: {stamp}")
pathToResult = os.path.join(self.ceName, stamp[0], stamp[1:3])

condorJobID = self._DiracToCondorID(diracJobID)
iwd = os.path.join(self.workingDirectory, pathToResult)

try:
Expand All @@ -501,7 +523,7 @@ def __getJobOutput(self, jobID, outTypes):
return S_ERROR(e.errno, f"{errorMessage} ({iwd})")

if not self.useLocalSchedd:
cmd = ["condor_transfer_data", "-pool", f"{self.ceName}:{self.port}", "-name", self.ceName, condorID]
cmd = ["condor_transfer_data", "-pool", f"{self.ceName}:{self.port}", "-name", self.ceName, condorJobID]
result = self._executeCondorCommand(cmd)

# Getting 'logging' without 'error' and 'output' is possible but will generate command errors
Expand All @@ -513,7 +535,7 @@ def __getJobOutput(self, jobID, outTypes):
outputsSuffix = {"output": "out", "error": "err", "logging": "log"}
outputs = {}
for output, suffix in outputsSuffix.items():
resOut = findFile(self.workingDirectory, f"{condorID}.{suffix}", pathToResult)
resOut = self._findFile(self.workingDirectory, f"{condorJobID}.{suffix}", pathToResult)
if not resOut["OK"]:
# Return an error if the output type was targeted, else we continue
if output in outTypes:
Expand All @@ -536,29 +558,6 @@ def __getJobOutput(self, jobID, outTypes):

return S_OK(outputs)

def __getPilotReferences(self, jobString):
    """Get the references from the condor_submit output.

    Cluster ids look like " 107.0 - 107.0 " or " 107.0 - 107.4 "

    :param str jobString: the output of condor_submit
    :return: S_OK(job references such as htcondorce://<CE name>/<clusterID>.<i>),
             or S_ERROR if the output does not have the expected form
    """
    errorMessage = f"Something wrong with the condor_submit output: {jobString}"

    self.log.verbose("getPilotReferences:", jobString)
    clusterIDs = jobString.split("-")
    if len(clusterIDs) != 2:
        return S_ERROR(errorMessage)
    clusterIDs = [clu.strip() for clu in clusterIDs]
    self.log.verbose("Cluster IDs parsed:", clusterIDs)
    try:
        clusterID = clusterIDs[0].split(".")[0]
        # The process ID of the last job: references are generated from 0 up to this value
        lastProcID = int(clusterIDs[1].split(".")[1])
    except (IndexError, ValueError):
        # IndexError: no "." in the second ID; ValueError: the process ID is not an integer
        return S_ERROR(errorMessage)

    cePrefix = f"htcondorce://{self.ceName}/"
    jobReferences = [f"{cePrefix}{clusterID}.{i}" for i in range(lastProcID + 1)]
    return S_OK(jobReferences)

def __cleanup(self):
"""Clean the working directory of old jobs"""
if not HTCondorCEComputingElement._cleanupLock.acquire(False):
Expand Down
Loading

0 comments on commit 7415c90

Please sign in to comment.