diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py
index 22b661ca13d..44708e9ab44 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py
@@ -12,12 +12,16 @@
 # # imports
 import os
 import time
+import tempfile
 from DIRAC import S_OK, S_ERROR, gConfig
 from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
 from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOs
 from DIRAC.Core.Base.AgentModule import AgentModule
-from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
+from DIRAC.Core.Security.ProxyInfo import getProxyInfo
+from DIRAC.Core.Utilities.Proxy import executeWithoutServerCertificate
+from DIRAC.Core.Utilities.Proxy import getProxy
 from DIRAC.DataManagementSystem.Client.DataManager import DataManager
+from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOMSAttributeForGroup, getDNForUsername
 from DIRAC.WorkloadManagementSystem.Client.TornadoPilotLoggingClient import TornadoPilotLoggingClient
 
 
@@ -31,20 +35,22 @@ class PilotLoggingAgent(AgentModule):
     def __init__(self, *args, **kwargs):
         """c'tor"""
         super().__init__(*args, **kwargs)
-        self.clearPilotsDelay = 30
+        self.clearPilotsDelay = 30  # in days
+        self.proxyTimeleftLimit = 600  # in seconds
 
     def initialize(self):
         """
         agent's initialisation. Use this agent's CS information to:
-        Determine what Defaults/Shifter shifter proxy to use.,
-        get the target SE name from the CS.
-        Obtain log file location from Tornado.
+        Determine VOs with remote logging enabled,
+        Determine what Defaults/Shifter shifter proxy to use and download the proxies.
 
         :param self: self reference
         """
         # pilot logs lifetime in days
         self.clearPilotsDelay = self.am_getOption("ClearPilotsDelay", self.clearPilotsDelay)
-        # configured VOs and setup
+        # proxy timeleft limit before we get a new one.
+        self.proxyTimeleftLimit = self.am_getOption("ProxyTimeleftLimit", self.proxyTimeleftLimit)
+        # configured VOs
         res = getVOs()
         if not res["OK"]:
             return res
@@ -52,25 +58,16 @@ def initialize(self):
 
         if isinstance(self.voList, str):
             self.voList = [self.voList]
+        # download shifter proxies for enabled VOs:
+        self.proxyDict = {}
 
-        return S_OK()
-
-    def execute(self):
-        """
-        Execute one agent cycle. Upload log files to the SE and register them in the DFC.
-        Use a shifter proxy dynamically loaded for every VO
-
-        :param self: self reference
-        """
-        voRes = {}
         for vo in self.voList:
-            self.opsHelper = Operations(vo=vo)
+            opsHelper = Operations(vo=vo)
             # is remote pilot logging enabled for the VO ?
-            pilotLogging = self.opsHelper.getValue("/Pilot/RemoteLogging", False)
+            pilotLogging = opsHelper.getValue("/Pilot/RemoteLogging", False)
             if pilotLogging:
-                res = self.opsHelper.getOptionsDict("Shifter/DataManager")
+                res = opsHelper.getOptionsDict("Shifter/DataManager")
                 if not res["OK"]:
-                    voRes[vo] = "No shifter defined - skipped"
                     self.log.error(f"No shifter defined for VO: {vo} - skipping ...")
                     continue
 
@@ -80,36 +77,77 @@ def execute(self):
                     self.log.error(
                         f"No proxy user or group defined for pilot: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}"
                     )
-                    voRes[vo] = "No proxy user or group defined - skipped"
                     continue
 
                 self.log.info(f"Proxy used for pilot logging: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}")
-                res = self.executeForVO(  # pylint: disable=unexpected-keyword-arg
-                    vo, proxyUserName=proxyUser, proxyUserGroup=proxyGroup
-                )
-                if not res["OK"]:
-                    voRes[vo] = res["Message"]
+                # download a proxy and save a file name, userDN and proxyGroup for future use:
+                result = getDNForUsername(proxyUser)
+                if not result["OK"]:
+                    self.log.error(f"Could not obtain a DN of user {proxyUser} for VO {vo}, skipped")
+                    continue
+                userDNs = result["Value"]  # the same user may have more than one DN
+
+                with tempfile.NamedTemporaryFile(prefix=vo + "__", delete=False) as ntf:
+                    result = self._downloadProxy(vo, userDNs, proxyGroup, ntf.name)
+
+                if not result["OK"]:
+                    # no proxy, we have no other option than to skip the VO
+                    continue
+                self.proxyDict[vo] = {"proxy": result["Value"], "DN": userDNs, "group": proxyGroup}
+
+        return S_OK()
+
+    def execute(self):
+        """
+        Execute one agent cycle. Upload log files to the SE and register them in the DFC.
+        Consider only VOs we have proxies for.
+
+        :param self: self reference
+        """
+        voRes = {}
+        self.log.verbose(f"VOs configured for remote logging: {list(self.proxyDict.keys())}")
+        originalUserProxy = os.environ.get("X509_USER_PROXY")
+        try:
+            for vo, elem in self.proxyDict.items():
+                if self._isProxyExpired(elem["proxy"], self.proxyTimeleftLimit):
+                    result = self._downloadProxy(vo, elem["DN"], elem["group"], elem["proxy"])
+                    if not result["OK"]:
+                        voRes[vo] = result["Message"]
+                        continue
+                os.environ["X509_USER_PROXY"] = elem["proxy"]
+                res = self.executeForVO(vo)
+                if not res["OK"]:
+                    voRes[vo] = res["Message"]
+        finally:
+            # restore the original proxy:
+            if originalUserProxy:
+                os.environ["X509_USER_PROXY"] = originalUserProxy
+            else:
+                os.environ.pop("X509_USER_PROXY", None)
+
         if voRes:
             for key, value in voRes.items():
                 self.log.error(f"Error for {key} vo; message: {value}")
             voRes.update(S_ERROR("Agent cycle for some VO finished with errors"))
             return voRes
+
         return S_OK()
 
-    @executeWithUserProxy
+    @executeWithoutServerCertificate
     def executeForVO(self, vo):
         """
         Execute one agent cycle for a VO. It obtains VO-specific configuration pilot options from the CS:
         UploadPath - the path where the VO wants to upload pilot logs. It has to start with a VO name (/vo/path).
         UploadSE - Storage element where the logs will be kept.
 
-        :param str vo: vo enabled for remote pilot logging
+        :param str vo: vo enabled for remote pilot logging (and a successfully downloaded proxy for the VO)
         :return: S_OK or S_ERROR
         :rtype: dict
         """
 
         self.log.info(f"Pilot files upload cycle started for VO: {vo}")
-        res = self.opsHelper.getOptionsDict("Pilot")
+        opsHelper = Operations(vo=vo)
+        res = opsHelper.getOptionsDict("Pilot")
         if not res["OK"]:
             return S_ERROR(f"No pilot section for {vo} vo")
         pilotOptions = res["Value"]
@@ -184,3 +222,41 @@ def clearOldPilotLogs(self, pilotLogPath):
                     os.remove(fullpath)
                 except Exception as excp:
                     self.log.exception(f"Cannot remove an old log file after {fullpath}", lException=excp)
+
+    def _downloadProxy(self, vo, userDNs, proxyGroup, filename):
+        """
+        Fetch a new proxy and store it in a file filename.
+
+        :param str vo: VO to get a proxy for
+        :param list userDNs: user DN list
+        :param str proxyGroup: user group
+        :param str filename: file name to store a proxy
+        :return: Dirac S_OK or S_ERROR object
+        :rtype: dict
+        """
+        vomsAttr = getVOMSAttributeForGroup(proxyGroup)
+        result = getProxy(userDNs, proxyGroup, vomsAttr=vomsAttr, proxyFilePath=filename)
+        if not result["OK"]:
+            self.log.error(f"Could not download a proxy for DN {userDNs}, group {proxyGroup} for VO {vo}, skipped")
+            return S_ERROR(f"Could not download a proxy, {vo} skipped")
+        return result
+
+    def _isProxyExpired(self, proxyfile, limit):
+        """
+        Check proxy timeleft. If less than a limit, return True.
+
+        :param str proxyfile: name of the file holding the proxy to check
+        :param int limit: timeleft threshold below which a proxy is considered expired.
+        :return: True or False
+        :rtype: bool
+        """
+        result = getProxyInfo(proxyfile)
+        if not result["OK"]:
+            self.log.error(f"Could not get proxy info {result['Message']}")
+            return True
+        timeleft = result["Value"]["secondsLeft"]
+        self.log.debug(f"Proxy {proxyfile} time left: {timeleft}")
+        if timeleft < limit:
+            self.log.info(f"proxy {proxyfile} expired/is about to expire. Will fetch a new one")
+            return True
+        return False
diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py
index 2e7939e48b6..8bf566199ac 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py
@@ -17,6 +17,9 @@
 # Mock Objects
 mockReply = MagicMock()
 mockReply1 = MagicMock()
+mockGetDNForUsername = MagicMock()
+mockGetVomsAttr = MagicMock()
+mockGetProxy = MagicMock()
 mockOperations = MagicMock()
 mockTornadoClient = MagicMock()
 mockDataManager = MagicMock()
@@ -48,19 +51,16 @@ def plaBase(mocker):
     mocker.patch(
         "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations.getOptionsDict", side_effect=mockReply1
     )
+    mocker.patch(
+        "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.getDNForUsername", side_effect=mockGetDNForUsername
+    )
+    mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.getProxy", side_effect=mockGetProxy)
     pla = PilotLoggingAgent()
     pla.log = gLogger
     pla._AgentModule__configDefaults = mockAM
     return pla
 
 
-@pytest.fixture
-def pla_initialised(mocker, plaBase):
-    mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.PilotLoggingAgent.executeForVO")
-    plaBase.initialize()
-    return plaBase
-
-
 @pytest.fixture
 def pla(mocker, plaBase):
     mocker.patch(
@@ -72,42 +72,70 @@ def pla(mocker, plaBase):
         "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.DataManager",
         side_effect=mockDataManager,
     )
-    plaBase.initialize()
     return plaBase
 
 
-def test_initialize(plaBase):
+@pytest.mark.parametrize(
+    "remoteLogging, options, getDN, getVOMS, getProxy, resDict, expectedRes",
+    [
+        (
+            [True, False],
+            upDict,
+            S_OK(["myDN"]),
+            S_OK(),
+            S_OK("proxyfilename"),
+            {"gridpp": {"DN": ["myDN"], "group": "proxyGroup", "proxy": "proxyfilename"}},
+            S_OK(),
+        ),
+        ([False, False], upDict, S_OK(["myDN"]), S_OK(), S_OK(), {}, S_OK()),
+        ([True, False], upDict, S_ERROR("Could not obtain a DN"), S_OK(), S_OK(), {}, S_OK()),
+        ([True, False], upDict, S_OK(["myDN"]), S_OK(), S_ERROR("Failure"), {}, S_OK()),
+    ],
+)
+def test_initialize(plaBase, remoteLogging, options, getDN, getVOMS, getProxy, resDict, expectedRes):
+    """
+    After a successful initialisation the proxyDict should contain proxy filenames keyed by a VO name.
+    test loops: gridpp enabled, lz disabled, proxy obtained, result proxyDict contains a proxy filename.
+    both VOs disabled => proxyDict empty
+    gridpp enabled, but getDNForUsername fails => proxyDict empty
+    gridpp enabled, lz disabled, getProxy fails => proxyDict empty
+
+    """
+    mockReply.side_effect = remoteLogging  # Operations.getValue("/Pilot/RemoteLogging", False)
+    mockReply1.return_value = options  # Operations.getOptionsDict("Shifter/DataManager")
+    mockGetDNForUsername.return_value = getDN
+    mockGetVomsAttr.return_value = getVOMS
+    mockGetProxy.return_value = getProxy
+
     res = plaBase.initialize()
+
     assert plaBase.voList == plaModule.getVOs()["Value"]
-    assert res == S_OK()
+    assert resDict == plaBase.proxyDict
+    assert res == expectedRes
 
 
 @pytest.mark.parametrize(
-    "mockReplyInput, expected, expectedExecOut, expected2",
+    "proxyDict, execVORes, expectedResult",
     [
-        ("/Pilot/RemoteLogging", [True, False], S_OK(), upDict),
-        ("/Pilot/RemoteLogging", [False, False], S_OK(), upDict),
-        ("/Pilot/RemoteLogging", [True, False], S_ERROR("Execute for VO failed"), upDict),
+        ({}, S_OK(), S_OK()),
+        ({"gridpp": {"DN": ["myDN"], "group": "proxyGroup", "proxy": "proxyfilename"}}, S_OK(), S_OK()),
+        (
+            {"gridpp": {"DN": ["myDN"], "group": "proxyGroup", "proxy": "proxyfilename"}},
+            S_ERROR("Execute for VO failed"),
+            S_ERROR("Agent cycle for some VO finished with errors"),
+        ),
     ],
 )
-def test_execute(pla_initialised, mockReplyInput, expected, expectedExecOut, expected2):
+def test_execute(plaBase, proxyDict, execVORes, expectedResult):
     """Testing a thin version of execute (executeForVO is mocked)"""
-    assert pla_initialised.voList == plaModule.getVOs()["Value"]
-    mockReply.side_effect = expected
-    mockReply1.return_value = expected2
-    # remote pilot logging on (gridpp only) and off.
-    pla_initialised.executeForVO.return_value = expectedExecOut
-    res = pla_initialised.execute()
-    if not any(expected):
-        pla_initialised.executeForVO.assert_not_called()
-    else:
-        assert pla_initialised.executeForVO.called
-        pla_initialised.executeForVO.assert_called_with(
-            "gridpp",
-            proxyUserName=upDict["Value"]["User"],
-            proxyUserGroup=upDict["Value"]["Group"],
-        )
-    assert res["OK"] == expectedExecOut["OK"]
+
+    plaBase.executeForVO = MagicMock()
+    plaBase._isProxyExpired = MagicMock()
+    plaBase._isProxyExpired.return_value = False
+    plaBase.proxyDict = proxyDict
+    plaBase.executeForVO.return_value = execVORes
+    res = plaBase.execute()
+    assert res["OK"] == expectedResult["OK"]
 
 
 @pytest.mark.parametrize(