Skip to content

Commit

Permalink
feat: replace DIRACJobID with jobReference in AREX/HTCondorCEs
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Oct 6, 2023
1 parent 557eb7b commit b5afceb
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 70 deletions.
91 changes: 45 additions & 46 deletions src/DIRAC/Resources/Computing/AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
Port added to the CE host name to interact with AREX services.
ProxyTimeLeftBeforeRenewal:
Time in seconds before the AREXCE renews proxy of submitted pilots.
Time in seconds before the AREXCE renews proxy of submitted payloads.
RESTVersion:
Version of the REST interface to use.
Expand Down Expand Up @@ -105,34 +105,33 @@ def setToken(self, token, valid):
super().setToken(token, valid)
self.headers["Authorization"] = "Bearer " + self.token["access_token"]

def _arcToDiracID(self, arcJobID):
"""Convert an ARC jobID into a DIRAC jobID.
def _arcIDToJobReference(self, arcJobID):
"""Convert an ARC jobID into a job reference.
Example: 1234 becomes https://<ce>:<port>/arex/1234
:param str: ARC jobID
:return: DIRAC jobID
:return: job reference, defined as an ARC jobID with additional details
"""
# Add CE and protocol information to arc Job ID
if "://" in arcJobID:
self.log.warn("Identifier already in ARC format", arcJobID)
return arcJobID

diracJobID = "https://" + self.ceHost + ":" + self.port + "/arex/" + arcJobID
return diracJobID
return f"https://{self.ceHost}:{self.port}/arex/{arcJobID}"

def _DiracToArcID(self, diracJobID):
"""Convert a DIRAC jobID into an ARC jobID.
def _jobReferenceToArcID(self, jobReference):
"""Convert a job reference into an ARC jobID.
Example: https://<ce>:<port>/arex/1234 becomes 1234
:param str: DIRAC jobID
:param str: job reference, defined as an ARC jobID with additional details
:return: ARC jobID
"""
# Remove CE and protocol information from arc Job ID
if "://" in diracJobID:
arcJobID = diracJobID.split("arex/")[-1]
if "://" in jobReference:
arcJobID = jobReference.split("arex/")[-1]
return arcJobID
self.log.warn("Identifier already in REST format?", diracJobID)
return diracJobID
self.log.warn("Identifier already in REST format?", jobReference)
return jobReference

#############################################################################

Expand Down Expand Up @@ -483,12 +482,12 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
if not result["OK"]:
break

jobID = self._arcToDiracID(arcJobID)
batchIDList.append(jobID)
stampDict[jobID] = diracStamp
jobReference = self._arcIDToJobReference(arcJobID)
batchIDList.append(jobReference)
stampDict[jobReference] = diracStamp
self.log.debug(
"Successfully submitted job",
f"{jobID} to CE {self.ceHost}",
f"{jobReference} to CE {self.ceHost}",
)

if batchIDList:
Expand All @@ -503,16 +502,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
def killJob(self, jobIDList):
"""Kill the specified jobs
:param list jobIDList: list of DIRAC Job IDs
:param list jobIDList: list of Job references
"""
if not isinstance(jobIDList, list):
jobIDList = [jobIDList]
self.log.debug("Killing jobs", ",".join(jobIDList))

# Convert DIRAC jobs to ARC jobs
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
return self._killJob(jList)
# Convert job references to ARC jobs
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
arcJobList = [self._jobReferenceToArcID(job.split(":::")[0]) for job in jobIDList]
return self._killJob(arcJobList)

def _killJob(self, arcJobList):
"""Kill the specified jobs
Expand Down Expand Up @@ -545,16 +544,16 @@ def _killJob(self, arcJobList):
def cleanJob(self, jobIDList):
"""Clean files related to the specified jobs
:param list jobIDList: list of DIRAC Job IDs
:param list jobIDList: list of job references
"""
if not isinstance(jobIDList, list):
jobIDList = [jobIDList]
self.log.debug("Cleaning jobs", ",".join(jobIDList))

# Convert DIRAC jobs to ARC jobs
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
return self._cleanJob(jList)
# Convert job references to ARC jobs
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
arcJobList = [self._jobReferenceToArcID(job.split(":::")[0]) for job in jobIDList]
return self._cleanJob(arcJobList)

def _cleanJob(self, arcJobList):
"""Clean files related to the specified jobs
Expand Down Expand Up @@ -710,7 +709,7 @@ def _renewDelegation(self, delegationID):
def getJobStatus(self, jobIDList):
"""Get the status information for the given list of jobs.
:param list jobIDList: list of DIRAC Job ID, followed by the DIRAC stamp.
:param list jobIDList: list of job references, followed by the DIRAC stamp.
"""
result = self._checkSession()
if not result["OK"]:
Expand All @@ -721,9 +720,9 @@ def getJobStatus(self, jobIDList):
jobIDList = [jobIDList]

self.log.debug("Getting status of jobs:", jobIDList)
# Convert DIRAC jobs to ARC jobs and encapsulate them in a dictionary for the REST query
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
arcJobsJson = {"job": [{"id": self._DiracToArcID(job.split(":::")[0])} for job in jobIDList]}
# Convert job references to ARC jobs and encapsulate them in a dictionary for the REST query
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
arcJobsJson = {"job": [{"id": self._jobReferenceToArcID(job.split(":::")[0])} for job in jobIDList]}

# Prepare the command
params = {"action": "status"}
Expand All @@ -746,16 +745,16 @@ def getJobStatus(self, jobIDList):
arcJobsInfo = [arcJobsInfo]

for arcJob in arcJobsInfo:
jobID = self._arcToDiracID(arcJob["id"])
jobReference = self._arcIDToJobReference(arcJob["id"])
# ARC REST interface returns hyperbole
arcState = arcJob["state"].capitalize()
self.log.debug("REST ARC status", f"for job {jobID} is {arcState}")
resultDict[jobID] = self.mapStates[arcState]
self.log.debug("REST ARC status", f"for job {jobReference} is {arcState}")
resultDict[jobReference] = self.mapStates[arcState]

# Cancel held jobs so they don't sit in the queue forever
if arcState == "Hold":
jobsToCancel.append(arcJob["id"])
self.log.debug(f"Killing held job {jobID}")
self.log.debug(f"Killing held job {jobReference}")

# Renew delegations to renew the proxies of the jobs
result = self._getDelegationIDs()
Expand All @@ -782,7 +781,7 @@ def getJobStatus(self, jobIDList):
def getJobLog(self, jobID):
"""Get job logging info
:param str jobID: DIRAC JobID followed by the DIRAC stamp.
:param str jobID: Job reference followed by the DIRAC stamp.
:return: string representing the logging info of a given jobID
"""
result = self._checkSession()
Expand All @@ -791,7 +790,7 @@ def getJobLog(self, jobID):
return result

# Prepare the command: Get output files
arcJob = self._DiracToArcID(jobID.split(":::")[0])
arcJob = self._jobReferenceToArcID(jobID.split(":::")[0])
query = self._urlJoin(os.path.join("jobs", arcJob, "diagnose", "errors"))

# Submit the GET request to retrieve outputs
Expand All @@ -810,7 +809,7 @@ def getJobLog(self, jobID):
def _getListOfAvailableOutputs(self, jobID, arcJobID):
"""Request a list of outputs available for a given jobID.
:param str jobID: DIRAC job ID without the DIRAC stamp
:param str jobID: job reference without the DIRAC stamp
:param str arcJobID: ARC job ID
:return list: names of the available outputs
"""
Expand All @@ -830,11 +829,11 @@ def _getListOfAvailableOutputs(self, jobID, arcJobID):
return S_OK(response.json()["file"])

def getJobOutput(self, jobID, workingDirectory=None):
"""Get the outputs of the given DIRAC job ID.
"""Get the outputs of the given job reference.
Outputs and stored in workingDirectory if present, else in a new directory named <ARC JobID>.
:param str jobID: DIRAC JobID followed by the DIRAC stamp.
:param str jobID: job reference followed by the DIRAC stamp.
:param str workingDirectory: name of the directory containing the retrieved outputs.
:return: content of stdout and stderr
"""
Expand All @@ -848,10 +847,10 @@ def getJobOutput(self, jobID, workingDirectory=None):
jobRef, stamp = jobID.split(":::")
else:
return S_ERROR(f"DIRAC stamp not defined for {jobID}")
job = self._DiracToArcID(jobRef)
arcJob = self._jobReferenceToArcID(jobRef)

# Get the list of available outputs
result = self._getListOfAvailableOutputs(jobRef, job)
result = self._getListOfAvailableOutputs(jobRef, arcJob)
if not result["OK"]:
return result
remoteOutputs = result["Value"]
Expand All @@ -860,21 +859,21 @@ def getJobOutput(self, jobID, workingDirectory=None):
if not workingDirectory:
if "WorkingDirectory" in self.ceParameters:
# We assume that workingDirectory exists
workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], job)
workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], arcJob)
else:
workingDirectory = job
workingDirectory = arcJob
os.mkdir(workingDirectory)

stdout = None
stderr = None
for remoteOutput in remoteOutputs:
# Prepare the command
query = self._urlJoin(os.path.join("jobs", job, "session", remoteOutput))
query = self._urlJoin(os.path.join("jobs", arcJob, "session", remoteOutput))

# Submit the GET request to retrieve outputs
result = self._request("get", query, stream=True)
if not result["OK"]:
self.log.error("Error downloading", f"{remoteOutput} for {job}: {result['Message']}")
self.log.error("Error downloading", f"{remoteOutput} for {arcJob}: {result['Message']}")
return S_ERROR(f"Error downloading {remoteOutput} for {jobID}")
response = result["Value"]

Expand Down
48 changes: 24 additions & 24 deletions src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,21 @@ def __init__(self, ceUniqueID):

#############################################################################

def _DiracToCondorID(self, diracJobID):
"""Convert a DIRAC jobID into a Condor jobID.
def _jobReferenceToCondorID(self, jobReference):
"""Convert a job reference into a Condor jobID.
Example: htcondorce://<ce>/1234.0 becomes 1234.0
:param str: DIRAC jobID
:param str: job reference, a condor jobID with additional details
:return: Condor jobID
"""
# Remove CE and protocol information from arc Job ID
if "://" in diracJobID:
condorJobID = diracJobID.split("/")[-1]
if "://" in jobReference:
condorJobID = jobReference.split("/")[-1]
return condorJobID
return diracJobID
return jobReference

def _condorToDiracID(self, condorJobIDs):
"""Get the references from the condor_submit output.
def _condorIDToJobReference(self, condorJobIDs):
"""Get the job references from the condor job IDs.
Cluster ids look like " 107.0 - 107.0 " or " 107.0 - 107.4 "
:param str condorJobIDs: the output of condor_submit
Expand Down Expand Up @@ -318,16 +318,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
return result

stdout = result["Value"]
pilotJobReferences = self._condorToDiracID(stdout)
if not pilotJobReferences["OK"]:
return pilotJobReferences
pilotJobReferences = pilotJobReferences["Value"]
jobReferences = self._condorIDToJobReference(stdout)
if not jobReferences["OK"]:
return jobReferences
jobReferences = jobReferences["Value"]

self.log.verbose("JobStamps:", jobStamps)
self.log.verbose("pilotRefs:", pilotJobReferences)
self.log.verbose("pilotRefs:", jobReferences)

result = S_OK(pilotJobReferences)
result["PilotStampDict"] = dict(zip(pilotJobReferences, jobStamps))
result = S_OK(jobReferences)
result["PilotStampDict"] = dict(zip(jobReferences, jobStamps))
if self.useLocalSchedd:
# Executable is transferred afterward
# Inform the caller that Condor cannot delete it before the end of the execution
Expand All @@ -346,15 +346,15 @@ def killJob(self, jobIDList):
self.log.verbose("KillJob jobIDList", jobIDList)
self.tokenFile = None

for diracJobID in jobIDList:
condorJobID = self._DiracToCondorID(diracJobID.split(":::")[0])
self.log.verbose("Killing pilot", diracJobID)
for jobReference in jobIDList:
condorJobID = self._jobReferenceToCondorID(jobReference.split(":::")[0])
self.log.verbose("Killing pilot", jobReference)
cmd = ["condor_rm"]
cmd.extend(self.remoteScheddOptions.strip().split(" "))
cmd.append(condorJobID)
result = self._executeCondorCommand(cmd, keepTokenFile=True)
if not result["OK"]:
self.log.error("Failed to kill pilot", f"{diracJobID}: {result['Message']}")
self.log.error("Failed to kill pilot", f"{jobReference}: {result['Message']}")
return result

self.tokenFile = None
Expand Down Expand Up @@ -403,9 +403,9 @@ def getJobStatus(self, jobIDList):
resultDict = {}
condorIDs = {}
# Get all condorIDs so we can just call condor_q and condor_history once
for diracJobID in jobIDList:
diracJobID = diracJobID.split(":::")[0]
condorIDs[diracJobID] = self._DiracToCondorID(diracJobID)
for jobReference in jobIDList:
jobReference = jobReference.split(":::")[0]
condorIDs[jobReference] = self._jobReferenceToCondorID(jobReference)

self.tokenFile = None

Expand Down Expand Up @@ -493,7 +493,7 @@ def __getJobOutput(self, jobID, outTypes):
"""
# Extract stamp from the Job ID
if ":::" in jobID:
diracJobID, stamp = jobID.split(":::")
jobReference, stamp = jobID.split(":::")
else:
return S_ERROR(f"DIRAC stamp not defined for {jobID}")

Expand All @@ -503,7 +503,7 @@ def __getJobOutput(self, jobID, outTypes):
return S_ERROR(f"Stamp is not long enough: {stamp}")
pathToResult = os.path.join(self.ceName, stamp[0], stamp[1:3])

condorJobID = self._DiracToCondorID(diracJobID)
condorJobID = self._jobReferenceToCondorID(jobReference)
iwd = os.path.join(self.workingDirectory, pathToResult)

try:
Expand Down

0 comments on commit b5afceb

Please sign in to comment.