Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.0] Getting more details about failed/aborted pilots from HTCondor #7069

Merged
merged 5 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 45 additions & 46 deletions src/DIRAC/Resources/Computing/AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
Port added to the CE host name to interact with AREX services.

ProxyTimeLeftBeforeRenewal:
Time in seconds before the AREXCE renews proxy of submitted pilots.
Time in seconds before the AREXCE renews proxy of submitted payloads.

RESTVersion:
Version of the REST interface to use.
Expand Down Expand Up @@ -105,34 +105,33 @@ def setToken(self, token, valid):
super().setToken(token, valid)
self.headers["Authorization"] = "Bearer " + self.token["access_token"]

def _arcToDiracID(self, arcJobID):
"""Convert an ARC jobID into a DIRAC jobID.
def _arcIDToJobReference(self, arcJobID):
"""Convert an ARC jobID into a job reference.
Example: 1234 becomes https://<ce>:<port>/arex/1234

:param str: ARC jobID
:return: DIRAC jobID
:return: job reference, defined as an ARC jobID with additional details
"""
# Add CE and protocol information to arc Job ID
if "://" in arcJobID:
self.log.warn("Identifier already in ARC format", arcJobID)
return arcJobID

diracJobID = "https://" + self.ceHost + ":" + self.port + "/arex/" + arcJobID
return diracJobID
return f"https://{self.ceHost}:{self.port}/arex/{arcJobID}"

def _DiracToArcID(self, diracJobID):
"""Convert a DIRAC jobID into an ARC jobID.
def _jobReferenceToArcID(self, jobReference):
"""Convert a job reference into an ARC jobID.
Example: https://<ce>:<port>/arex/1234 becomes 1234

:param str: DIRAC jobID
:param str: job reference, defined as an ARC jobID with additional details
:return: ARC jobID
"""
# Remove CE and protocol information from arc Job ID
if "://" in diracJobID:
arcJobID = diracJobID.split("arex/")[-1]
if "://" in jobReference:
arcJobID = jobReference.split("arex/")[-1]
return arcJobID
self.log.warn("Identifier already in REST format?", diracJobID)
return diracJobID
self.log.warn("Identifier already in REST format?", jobReference)
return jobReference

#############################################################################

Expand Down Expand Up @@ -483,12 +482,12 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
if not result["OK"]:
break

jobID = self._arcToDiracID(arcJobID)
batchIDList.append(jobID)
stampDict[jobID] = diracStamp
jobReference = self._arcIDToJobReference(arcJobID)
batchIDList.append(jobReference)
stampDict[jobReference] = diracStamp
self.log.debug(
"Successfully submitted job",
f"{jobID} to CE {self.ceHost}",
f"{jobReference} to CE {self.ceHost}",
)

if batchIDList:
Expand All @@ -503,16 +502,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
def killJob(self, jobIDList):
"""Kill the specified jobs

:param list jobIDList: list of DIRAC Job IDs
:param list jobIDList: list of Job references
"""
if not isinstance(jobIDList, list):
jobIDList = [jobIDList]
self.log.debug("Killing jobs", ",".join(jobIDList))

# Convert DIRAC jobs to ARC jobs
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
return self._killJob(jList)
# Convert job references to ARC jobs
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
arcJobList = [self._jobReferenceToArcID(job.split(":::")[0]) for job in jobIDList]
return self._killJob(arcJobList)

def _killJob(self, arcJobList):
"""Kill the specified jobs
Expand Down Expand Up @@ -545,16 +544,16 @@ def _killJob(self, arcJobList):
def cleanJob(self, jobIDList):
"""Clean files related to the specified jobs

:param list jobIDList: list of DIRAC Job IDs
:param list jobIDList: list of job references
"""
if not isinstance(jobIDList, list):
jobIDList = [jobIDList]
self.log.debug("Cleaning jobs", ",".join(jobIDList))

# Convert DIRAC jobs to ARC jobs
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
jList = [self._DiracToArcID(job.split(":::")[0]) for job in jobIDList]
return self._cleanJob(jList)
# Convert job references to ARC jobs
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
arcJobList = [self._jobReferenceToArcID(job.split(":::")[0]) for job in jobIDList]
return self._cleanJob(arcJobList)

def _cleanJob(self, arcJobList):
"""Clean files related to the specified jobs
Expand Down Expand Up @@ -710,7 +709,7 @@ def _renewDelegation(self, delegationID):
def getJobStatus(self, jobIDList):
"""Get the status information for the given list of jobs.

:param list jobIDList: list of DIRAC Job ID, followed by the DIRAC stamp.
:param list jobIDList: list of job references, followed by the DIRAC stamp.
"""
result = self._checkSession()
if not result["OK"]:
Expand All @@ -721,9 +720,9 @@ def getJobStatus(self, jobIDList):
jobIDList = [jobIDList]

self.log.debug("Getting status of jobs:", jobIDList)
# Convert DIRAC jobs to ARC jobs and encapsulate them in a dictionary for the REST query
# DIRAC Jobs might be stored with a DIRAC stamp (":::XXXXX") that should be removed
arcJobsJson = {"job": [{"id": self._DiracToArcID(job.split(":::")[0])} for job in jobIDList]}
# Convert job references to ARC jobs and encapsulate them in a dictionary for the REST query
# Job references might be stored with a DIRAC stamp (":::XXXXX") that should be removed
arcJobsJson = {"job": [{"id": self._jobReferenceToArcID(job.split(":::")[0])} for job in jobIDList]}

# Prepare the command
params = {"action": "status"}
Expand All @@ -746,16 +745,16 @@ def getJobStatus(self, jobIDList):
arcJobsInfo = [arcJobsInfo]

for arcJob in arcJobsInfo:
jobID = self._arcToDiracID(arcJob["id"])
jobReference = self._arcIDToJobReference(arcJob["id"])
# ARC REST interface returns hyperbole
arcState = arcJob["state"].capitalize()
self.log.debug("REST ARC status", f"for job {jobID} is {arcState}")
resultDict[jobID] = self.mapStates[arcState]
self.log.debug("REST ARC status", f"for job {jobReference} is {arcState}")
resultDict[jobReference] = self.mapStates[arcState]

# Cancel held jobs so they don't sit in the queue forever
if arcState == "Hold":
jobsToCancel.append(arcJob["id"])
self.log.debug(f"Killing held job {jobID}")
self.log.debug(f"Killing held job {jobReference}")

# Renew delegations to renew the proxies of the jobs
result = self._getDelegationIDs()
Expand All @@ -782,7 +781,7 @@ def getJobStatus(self, jobIDList):
def getJobLog(self, jobID):
"""Get job logging info

:param str jobID: DIRAC JobID followed by the DIRAC stamp.
:param str jobID: Job reference followed by the DIRAC stamp.
:return: string representing the logging info of a given jobID
"""
result = self._checkSession()
Expand All @@ -791,7 +790,7 @@ def getJobLog(self, jobID):
return result

# Prepare the command: Get output files
arcJob = self._DiracToArcID(jobID.split(":::")[0])
arcJob = self._jobReferenceToArcID(jobID.split(":::")[0])
query = self._urlJoin(os.path.join("jobs", arcJob, "diagnose", "errors"))

# Submit the GET request to retrieve outputs
Expand All @@ -810,7 +809,7 @@ def getJobLog(self, jobID):
def _getListOfAvailableOutputs(self, jobID, arcJobID):
"""Request a list of outputs available for a given jobID.

:param str jobID: DIRAC job ID without the DIRAC stamp
:param str jobID: job reference without the DIRAC stamp
:param str arcJobID: ARC job ID
:return list: names of the available outputs
"""
Expand All @@ -830,11 +829,11 @@ def _getListOfAvailableOutputs(self, jobID, arcJobID):
return S_OK(response.json()["file"])

def getJobOutput(self, jobID, workingDirectory=None):
"""Get the outputs of the given DIRAC job ID.
"""Get the outputs of the given job reference.

Outputs and stored in workingDirectory if present, else in a new directory named <ARC JobID>.

:param str jobID: DIRAC JobID followed by the DIRAC stamp.
:param str jobID: job reference followed by the DIRAC stamp.
:param str workingDirectory: name of the directory containing the retrieved outputs.
:return: content of stdout and stderr
"""
Expand All @@ -848,10 +847,10 @@ def getJobOutput(self, jobID, workingDirectory=None):
jobRef, stamp = jobID.split(":::")
else:
return S_ERROR(f"DIRAC stamp not defined for {jobID}")
job = self._DiracToArcID(jobRef)
arcJob = self._jobReferenceToArcID(jobRef)

# Get the list of available outputs
result = self._getListOfAvailableOutputs(jobRef, job)
result = self._getListOfAvailableOutputs(jobRef, arcJob)
if not result["OK"]:
return result
remoteOutputs = result["Value"]
Expand All @@ -860,21 +859,21 @@ def getJobOutput(self, jobID, workingDirectory=None):
if not workingDirectory:
if "WorkingDirectory" in self.ceParameters:
# We assume that workingDirectory exists
workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], job)
workingDirectory = os.path.join(self.ceParameters["WorkingDirectory"], arcJob)
else:
workingDirectory = job
workingDirectory = arcJob
os.mkdir(workingDirectory)

stdout = None
stderr = None
for remoteOutput in remoteOutputs:
# Prepare the command
query = self._urlJoin(os.path.join("jobs", job, "session", remoteOutput))
query = self._urlJoin(os.path.join("jobs", arcJob, "session", remoteOutput))

# Submit the GET request to retrieve outputs
result = self._request("get", query, stream=True)
if not result["OK"]:
self.log.error("Error downloading", f"{remoteOutput} for {job}: {result['Message']}")
self.log.error("Error downloading", f"{remoteOutput} for {arcJob}: {result['Message']}")
return S_ERROR(f"Error downloading {remoteOutput} for {jobID}")
response = result["Value"]

Expand Down
Loading
Loading