From 8508d7d83a79c62e044922a58b72dcd5a5c69180 Mon Sep 17 00:00:00 2001 From: Simon Fayer Date: Fri, 29 Nov 2024 12:01:20 +0100 Subject: [PATCH] sweep: #7913 Add option to include proxy on AREX token submission --- .../Computing/AREXComputingElement.py | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/src/DIRAC/Resources/Computing/AREXComputingElement.py b/src/DIRAC/Resources/Computing/AREXComputingElement.py index d24b75de453..925cb084cef 100755 --- a/src/DIRAC/Resources/Computing/AREXComputingElement.py +++ b/src/DIRAC/Resources/Computing/AREXComputingElement.py @@ -36,24 +36,29 @@ The XRSLExtraString note about times also applies to this configuration option. +AlwaysIncludeProxy: + A boolean, set to true to include the proxy in job submission even + in cases where tokens are the primary authentication method. + (Recommended for ARC6 tokens, deprecated for ARC7) + **Code Documentation** """ -import os import json -import requests +import os import shutil import stat import uuid -from DIRAC import S_OK, S_ERROR +import requests + +from DIRAC import S_ERROR, S_OK from DIRAC.Core.Security import Locations from DIRAC.Core.Security.ProxyInfo import getVOfromProxyGroup from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error from DIRAC.Resources.Computing.ComputingElement import ComputingElement -from DIRAC.WorkloadManagementSystem.Client import PilotStatus from DIRAC.Resources.Computing.PilotBundle import writeScript - +from DIRAC.WorkloadManagementSystem.Client import PilotStatus MANDATORY_PARAMETERS = ["Queue"] @@ -115,6 +120,8 @@ def __init__(self, ceUniqueID): } # URL used to communicate with the REST interface self.base_url = "" + # A flag to always include a proxy, even if a token is the primary auth method + self.alwaysIncludeProxy = False ############################################################################# @@ -149,6 +156,10 @@ def _reset(self): service_url = os.path.join("https://", f"{self.ceName}:{self.port}") self.base_url = os.path.join(service_url, "arex", "rest", self.restVersion) + self.alwaysIncludeProxy = False + if self.ceParameters.get("AlwaysIncludeProxy", "false").lower() in ("true", "yes"): + self.alwaysIncludeProxy = True + # Set up the request framework self.session = requests.Session() self.session.verify = Locations.getCAsLocation() @@ -247,13 +258,16 @@ def _checkSession(self): if not (self.token or self.proxy): self.log.error("Proxy or token not set") return S_ERROR("Proxy or token not set") + if not self.proxy and self.alwaysIncludeProxy: + self.log.error("Proxy required but not set") + return S_ERROR("Proxy required but not set") # If a token is set, we use it if self.token: # Attach the token to the headers if present self.headers["Authorization"] = f"Bearer {self.token['access_token']}" self.log.verbose("A token is attached to the header of the request(s)") - else: + if not self.token or self.alwaysIncludeProxy: # Prepare the proxy in X509_USER_PROXY if not (result := self._prepareProxy())["OK"]: self.log.error("Failed to set up proxy", result["Message"]) @@ -573,7 +587,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs= # Delegation cannot be used with a token delegation = "" - if not self.token: + if not self.token or self.alwaysIncludeProxy: # Get existing delegations result = self._getDelegationIDs() if not result["OK"]: @@ -918,7 +932,7 @@ def getJobStatus(self, jobIDList): self.log.debug(f"Killing held job {jobReference}") # Renew delegations to renew the proxies of the jobs - if not self.token: + if not self.token or self.alwaysIncludeProxy: result = self._getDelegationIDs() if not result["OK"]: return result