Skip to content

Commit

Permalink
Merge pull request #7824 from aldbr/v8.0_FIX_AREXCE-token-integration
Browse files Browse the repository at this point in the history
fix(Resources): AREX can submit jobs without proxy (token only)
  • Loading branch information
chrisburr authored Oct 11, 2024
2 parents 7e453ce + 5d1ec59 commit b3cee1a
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 86 deletions.
137 changes: 66 additions & 71 deletions src/DIRAC/Resources/Computing/AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def _request(self, method, query, params=None, data=None, headers=None, timeout=
except requests.RequestException as e:
return S_ERROR(f"Request exception: {e}")

def _checkSession(self, mandatoryProxy: bool = False):
def _checkSession(self):
"""Check that the session exists and carries a valid proxy."""
if not self.session:
return S_ERROR("REST interface not initialised.")
Expand All @@ -188,13 +188,12 @@ def _checkSession(self, mandatoryProxy: bool = False):
self.log.error("Proxy or token not set")
return S_ERROR("Proxy or token not set")

# A proxy might be required: in this case, it should be present
if mandatoryProxy and not self.proxy:
self.log.error("Proxy is mandatory but not set")
return S_ERROR("Proxy is mandatory but not set")

# If a proxy is required or if no token is set
if mandatoryProxy or not self.token:
# If a token is set, we use it
if self.token:
# Attach the token to the headers if present
self.headers["Authorization"] = f"Bearer {self.token['access_token']}"
self.log.verbose("A token is attached to the header of the request(s)")
else:
# Prepare the proxy in X509_USER_PROXY
if not (result := self._prepareProxy())["OK"]:
self.log.error("Failed to set up proxy", result["Message"])
Expand All @@ -204,12 +203,6 @@ def _checkSession(self, mandatoryProxy: bool = False):
self.session.cert = os.environ.get("X509_USER_PROXY")
self.log.verbose("A proxy is attached to the session")

# If a token is set, we use it
if self.token:
# Attach the token to the headers if present
self.headers["Authorization"] = f"Bearer {self.token['access_token']}"
self.log.verbose("A token is attached to the header of the request(s)")

return S_OK()

#############################################################################
Expand Down Expand Up @@ -426,63 +419,66 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
Assume that the ARC queues are always of the format nordugrid-<batchSystem>-<queue>
And none of our supported batch systems have a "-" in their name
"""
result = self._checkSession(mandatoryProxy=True)
result = self._checkSession()
if not result["OK"]:
self.log.error("Cannot submit jobs", result["Message"])
return result

self.log.verbose(f"Executable file path: {executableFile}")

# Get existing delegations
result = self._getDelegationIDs()
if not result["OK"]:
self.log.error("Could not get delegation IDs.", result["Message"])
return S_ERROR("Could not get delegation IDs")
delegationIDs = result["Value"]

# Get the delegationID which corresponds to the DIRAC group of the proxy if it exists
currentDelegationID = None
proxyGroup = self.proxy.getDIRACGroup()
for delegationID in delegationIDs:
# Get the proxy attached to the delegationID
result = self._getProxyFromDelegationID(delegationID)

# Bug in AREX, sometimes delegationID does not exist anymore,
# but still appears in the list of delegationIDs.
# Issue submitted here: https://bugzilla.nordugrid.org/show_bug.cgi?id=4133
# In this case, we just try with the next one
if not result["OK"] and "404" in result["Message"]:
continue

# Else, it means there was an issue with the CE,
# we stop the execution
if not result["OK"]:
return result

proxy = result["Value"]

if proxy.getDIRACGroup() != proxyGroup:
continue

# If we are here, we have found the right delegationID to use
currentDelegationID = delegationID
break

if not currentDelegationID:
# No existing delegation, we need to prepare one
result = self._prepareDelegation()
if not result["OK"]:
self.log.warn("Could not get a new delegation", f"for CE {self.ceHost}")
return S_ERROR("Could not get a new delegation")
currentDelegationID = result["Value"]

delegation = f"\n(delegationid={currentDelegationID})"

if not inputs:
inputs = []
if not outputs:
outputs = []

# Delegation cannot be used with a token
delegation = ""
if not self.token:
# Get existing delegations
result = self._getDelegationIDs()
if not result["OK"]:
self.log.error("Could not get delegation IDs.", result["Message"])
return S_ERROR("Could not get delegation IDs")
delegationIDs = result["Value"]

# Get the delegationID which corresponds to the DIRAC group of the proxy if it exists
currentDelegationID = None
proxyGroup = self.proxy.getDIRACGroup()
for delegationID in delegationIDs:
# Get the proxy attached to the delegationID
result = self._getProxyFromDelegationID(delegationID)

# Bug in AREX, sometimes delegationID does not exist anymore,
# but still appears in the list of delegationIDs.
# Issue submitted here: https://bugzilla.nordugrid.org/show_bug.cgi?id=4133
# In this case, we just try with the next one
if not result["OK"] and "404" in result["Message"]:
continue

# Else, it means there was an issue with the CE,
# we stop the execution
if not result["OK"]:
return result

proxy = result["Value"]

if proxy.getDIRACGroup() != proxyGroup:
continue

# If we are here, we have found the right delegationID to use
currentDelegationID = delegationID
break

if not currentDelegationID:
# No existing delegation, we need to prepare one
result = self._prepareDelegation()
if not result["OK"]:
self.log.warn("Could not get a new delegation", f"for CE {self.ceHost}")
return S_ERROR("Could not get a new delegation")
currentDelegationID = result["Value"]

delegation = f"\n(delegationid={currentDelegationID})"

# If there is a preamble, then we bundle it in an executable file
if self.preamble:
inputs.append(executableFile)
Expand Down Expand Up @@ -723,14 +719,12 @@ def _renewDelegation(self, delegationID):

return S_OK()

#############################################################################

def getJobStatus(self, jobIDList):
"""Get the status information for the given list of jobs.
:param list jobIDList: list of job references, followed by the DIRAC stamp.
"""
result = self._checkSession(mandatoryProxy=True)
result = self._checkSession()
if not result["OK"]:
self.log.error("Cannot get status of the jobs", result["Message"])
return result
Expand Down Expand Up @@ -776,15 +770,16 @@ def getJobStatus(self, jobIDList):
self.log.debug(f"Killing held job {jobReference}")

# Renew delegations to renew the proxies of the jobs
result = self._getDelegationIDs()
if not result["OK"]:
return result
delegationIDs = result["Value"]
for delegationID in delegationIDs:
result = self._renewDelegation(delegationID)
if not self.token:
result = self._getDelegationIDs()
if not result["OK"]:
# Only log here as we still want to return statuses
self.log.warn("Failed to renew delegation", f"{delegationID}: {result['Message']}")
return result
delegationIDs = result["Value"]
for delegationID in delegationIDs:
result = self._renewDelegation(delegationID)
if not result["OK"]:
# Only log here as we still want to return statuses
self.log.warn("Failed to renew delegation", f"{delegationID}: {result['Message']}")

# Kill held jobs
if jobsToCancel:
Expand Down
16 changes: 1 addition & 15 deletions src/DIRAC/Resources/Computing/test/Test_AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,11 @@ def side_effect():
assert not arex.session.cert
assert "Authorization" in arex.headers, arex.headers

# 6. We make the proxy mandatory: the session should include both the proxy and the token
result = arex._checkSession(mandatoryProxy=True)
assert result["OK"], result
assert arex.session
assert arex.session.cert
assert "Authorization" in arex.headers, arex.headers

# 7. Now we just include the token, the proxy is not mandatory:
# 7. Now we just include the token:
# the session should only include the token
arex.proxy = None
result = arex._checkSession()
assert result["OK"], result
assert arex.session
assert not arex.session.cert
assert "Authorization" in arex.headers, arex.headers

# 8. Now we just include the token, but the proxy is mandatory: it should return an error
result = arex._checkSession(mandatoryProxy=True)
assert not result["OK"], result
assert arex.session
assert not arex.session.cert
assert "Authorization" not in arex.headers, arex.headers

0 comments on commit b3cee1a

Please sign in to comment.