From 95e2e331064d9ee63230ce9a1ffe358403a04908 Mon Sep 17 00:00:00 2001 From: martynia Date: Tue, 24 Oct 2023 13:02:37 +0200 Subject: [PATCH 1/3] fix: Refactor the PilotLoggingAgent - download proxies at initialisation (adress #7249) --- .../Agent/PilotLoggingAgent.py | 85 +++++++++++++------ 1 file changed, 58 insertions(+), 27 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py index 22b661ca13d..fbf2ae36d97 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py @@ -12,12 +12,15 @@ # # imports import os import time +import tempfile from DIRAC import S_OK, S_ERROR, gConfig from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOs from DIRAC.Core.Base.AgentModule import AgentModule -from DIRAC.Core.Utilities.Proxy import executeWithUserProxy +from DIRAC.Core.Utilities.Proxy import executeWithoutServerCertificate +from DIRAC.Core.Utilities.Proxy import getProxy from DIRAC.DataManagementSystem.Client.DataManager import DataManager +from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOMSAttributeForGroup, getDNForUsername from DIRAC.WorkloadManagementSystem.Client.TornadoPilotLoggingClient import TornadoPilotLoggingClient @@ -36,9 +39,8 @@ def __init__(self, *args, **kwargs): def initialize(self): """ agent's initialisation. Use this agent's CS information to: - Determine what Defaults/Shifter shifter proxy to use., - get the target SE name from the CS. - Obtain log file location from Tornado. + Determine VOs with remote logging enabled, + Determine what Defaults/Shifter shifter proxy to use., download the proxies. :param self: self reference """ @@ -52,25 +54,17 @@ def initialize(self): if isinstance(self.voList, str): self.voList = [self.voList] + # download shifter proxies for enabled VOs: + self.proxyDict = {} - return S_OK() - - def execute(self): - """ - Execute one agent cycle. Upload log files to the SE and register them in the DFC. - Use a shifter proxy dynamically loaded for every VO - - :param self: self reference - """ - voRes = {} for vo in self.voList: - self.opsHelper = Operations(vo=vo) + opsHelper = Operations(vo=vo) # is remote pilot logging enabled for the VO ? - pilotLogging = self.opsHelper.getValue("/Pilot/RemoteLogging", False) + pilotLogging = opsHelper.getValue("/Pilot/RemoteLogging", False) if pilotLogging: - res = self.opsHelper.getOptionsDict("Shifter/DataManager") + res = opsHelper.getOptionsDict("Shifter/DataManager") if not res["OK"]: - voRes[vo] = "No shifter defined - skipped" + # voRes[vo] = "No shifter defined - skipped" self.log.error(f"No shifter defined for VO: {vo} - skipping ...") continue @@ -80,36 +74,73 @@ def execute(self): self.log.error( f"No proxy user or group defined for pilot: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}" ) - voRes[vo] = "No proxy user or group defined - skipped" continue self.log.info(f"Proxy used for pilot logging: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}") - res = self.executeForVO( # pylint: disable=unexpected-keyword-arg - vo, proxyUserName=proxyUser, proxyUserGroup=proxyGroup - ) - if not res["OK"]: - voRes[vo] = res["Message"] + # download a proxy and save a filename for future use: + result = getDNForUsername(proxyUser) + if not result["OK"]: + self.log.error(f"Could not obtain a DN of user {proxyUser} for VO {vo}, skipped") + continue + userDNs = result["Value"] # a same user may have more than one DN + fd, filename = tempfile.mkstemp(prefix=vo + "__") + os.close(fd) + vomsAttr = getVOMSAttributeForGroup(proxyGroup) + result = getProxy(userDNs, proxyGroup, vomsAttr=vomsAttr, proxyFilePath=filename) + + if not result["OK"]: + self.log.error( + f"Could not download a proxy for DN {userDNs}, group {proxyGroup} for VO {vo}, skipped" + ) + return result + self.proxyDict[vo] = result["Value"] + + return S_OK() + + def execute(self): + """ + Execute one agent cycle. Upload log files to the SE and register them in the DFC. + Consider only VOs we have proxies for. + + :param self: self reference + """ + voRes = {} + self.log.verbose(f"VOs configured for remote logging: {list(self.proxyDict.keys())}") + originalUserProxy = os.environ.get("X509_USER_PROXY") + for vo, proxy in self.proxyDict.items(): + os.environ["X509_USER_PROXY"] = proxy + res = self.executeForVO(vo) + if not res["OK"]: + voRes[vo] = res["Message"] + # restore the original proxy: + if originalUserProxy: + os.environ["X509_USER_PROXY"] = originalUserProxy + else: + os.environ.pop("X509_USER_PROXY", None) + if voRes: for key, value in voRes.items(): self.log.error(f"Error for {key} vo; message: {value}") voRes.update(S_ERROR("Agent cycle for some VO finished with errors")) return voRes + return S_OK() - @executeWithUserProxy + @executeWithoutServerCertificate def executeForVO(self, vo): """ Execute one agent cycle for a VO. It obtains VO-specific configuration pilot options from the CS: UploadPath - the path where the VO wants to upload pilot logs. It has to start with a VO name (/vo/path). UploadSE - Storage element where the logs will be kept. - :param str vo: vo enabled for remote pilot logging + :param str vo: vo enabled for remote pilot logging (and a successfully downloaded proxy for the VO) :return: S_OK or S_ERROR :rtype: dict """ self.log.info(f"Pilot files upload cycle started for VO: {vo}") - res = self.opsHelper.getOptionsDict("Pilot") + opsHelper = Operations(vo=vo) + res = opsHelper.getOptionsDict("Pilot") if not res["OK"]: return S_ERROR(f"No pilot section for {vo} vo") pilotOptions = res["Value"] From 27fef210536d73dbe22b5d482d7e5190bea59dd8 Mon Sep 17 00:00:00 2001 From: martynia Date: Wed, 25 Oct 2023 13:33:57 +0200 Subject: [PATCH 2/3] fix: modify tests for PilotLoggingAgent --- .../Agent/PilotLoggingAgent.py | 6 +- .../test/Test_Agent_PilotLoggingAgent.py | 80 ++++++++++++------- 2 files changed, 52 insertions(+), 34 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py index fbf2ae36d97..e25f40c47d6 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py @@ -46,7 +46,7 @@ def initialize(self): """ # pilot logs lifetime in days self.clearPilotsDelay = self.am_getOption("ClearPilotsDelay", self.clearPilotsDelay) - # configured VOs and setup + # configured VOs res = getVOs() if not res["OK"]: return res @@ -64,7 +64,6 @@ def initialize(self): if pilotLogging: res = opsHelper.getOptionsDict("Shifter/DataManager") if not res["OK"]: - # voRes[vo] = "No shifter defined - skipped" self.log.error(f"No shifter defined for VO: {vo} - skipping ...") continue @@ -84,6 +83,7 @@ def initialize(self): continue userDNs = result["Value"] # a same user may have more than one DN fd, filename = tempfile.mkstemp(prefix=vo + "__") + print("filename", filename) os.close(fd) vomsAttr = getVOMSAttributeForGroup(proxyGroup) result = getProxy(userDNs, proxyGroup, vomsAttr=vomsAttr, proxyFilePath=filename) @@ -92,7 +92,7 @@ def initialize(self): self.log.error( f"Could not download a proxy for DN {userDNs}, group {proxyGroup} for VO {vo}, skipped" ) - return result + continue self.proxyDict[vo] = result["Value"] return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py index 2e7939e48b6..82760e70d7b 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py @@ -17,6 +17,9 @@ # Mock Objects mockReply = MagicMock() mockReply1 = MagicMock() +mockGetDNForUsername = MagicMock() +mockGetVomsAttr = MagicMock() +mockGetProxy = MagicMock() mockOperations = MagicMock() mockTornadoClient = MagicMock() mockDataManager = MagicMock() @@ -48,19 +51,16 @@ def plaBase(mocker): mocker.patch( "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations.getOptionsDict", side_effect=mockReply1 ) + mocker.patch( + "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.getDNForUsername", side_effect=mockGetDNForUsername + ) + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.getProxy", side_effect=mockGetProxy) pla = PilotLoggingAgent() pla.log = gLogger pla._AgentModule__configDefaults = mockAM return pla -@pytest.fixture -def pla_initialised(mocker, plaBase): - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.PilotLoggingAgent.executeForVO") - plaBase.initialize() - return plaBase - - @pytest.fixture def pla(mocker, plaBase): mocker.patch( @@ -72,42 +72,60 @@ def pla(mocker, plaBase): "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.DataManager", side_effect=mockDataManager, ) - plaBase.initialize() return plaBase -def test_initialize(plaBase): +@pytest.mark.parametrize( + "remoteLogging, options, getDN, getVOMS, getProxy, resDict, expectedRes", + [ + ([True, False], upDict, S_OK(["myDN"]), S_OK(), S_OK("proxyfilename"), {"gridpp": "proxyfilename"}, S_OK()), + ([False, False], upDict, S_OK(["myDN"]), S_OK(), S_OK(), {}, S_OK()), + ([True, False], upDict, S_ERROR("Could not obtain a DN"), S_OK(), S_OK(), {}, S_OK()), + ([True, False], upDict, S_ERROR("Could not download proxy"), S_OK(), S_ERROR("Failure"), {}, S_OK()), + ], +) +def test_initialize(plaBase, remoteLogging, options, getDN, getVOMS, getProxy, resDict, expectedRes): + """ + After a successful initialisation the proxyDict should contain proxy filenames key by a VO name. + test loops: gridpp enabled, lz disabled, proxy obtained, result proxyDict contains a proxy filename. + both VOs disabled => proxyDict empty + gridpp enabled, by getDNForUsername fails => proxyDict empty + gridpp enabled, lz disabled, getProxy fails => proxyDict empty + + """ + mockReply.side_effect = remoteLogging # Operations.getValue("/Pilot/RemoteLogging", False) + mockReply1.return_value = options # Operations.getOptionsDict("Shifter/DataManager") + mockGetDNForUsername.return_value = getDN + mockGetVomsAttr.return_value = getVOMS + mockGetProxy.return_value = getProxy + res = plaBase.initialize() + assert plaBase.voList == plaModule.getVOs()["Value"] - assert res == S_OK() + assert resDict == plaBase.proxyDict + assert res == expectedRes @pytest.mark.parametrize( - "mockReplyInput, expected, expectedExecOut, expected2", + "proxyDict, execVORes, expectedResult", [ - ("/Pilot/RemoteLogging", [True, False], S_OK(), upDict), - ("/Pilot/RemoteLogging", [False, False], S_OK(), upDict), - ("/Pilot/RemoteLogging", [True, False], S_ERROR("Execute for VO failed"), upDict), + ({}, S_OK(), S_OK()), + ({"gridpp": "gridpp_proxyfile"}, S_OK(), S_OK()), + ( + {"gridpp": "gridpp_proxyfile"}, + S_ERROR("Execute for VO failed"), + S_ERROR("Agent cycle for some VO finished with errors"), + ), ], ) -def test_execute(pla_initialised, mockReplyInput, expected, expectedExecOut, expected2): +def test_execute(plaBase, proxyDict, execVORes, expectedResult): """Testing a thin version of execute (executeForVO is mocked)""" - assert pla_initialised.voList == plaModule.getVOs()["Value"] - mockReply.side_effect = expected - mockReply1.return_value = expected2 - # remote pilot logging on (gridpp only) and off. - pla_initialised.executeForVO.return_value = expectedExecOut - res = pla_initialised.execute() - if not any(expected): - pla_initialised.executeForVO.assert_not_called() - else: - assert pla_initialised.executeForVO.called - pla_initialised.executeForVO.assert_called_with( - "gridpp", - proxyUserName=upDict["Value"]["User"], - proxyUserGroup=upDict["Value"]["Group"], - ) - assert res["OK"] == expectedExecOut["OK"] + + plaBase.executeForVO = MagicMock() + plaBase.proxyDict = proxyDict + plaBase.executeForVO.return_value = execVORes + res = plaBase.execute() + assert res["OK"] == expectedResult["OK"] @pytest.mark.parametrize( From 7e094f973dd5a8bea42fc6b0cc863d2e826d0725 Mon Sep 17 00:00:00 2001 From: martynia Date: Thu, 26 Oct 2023 09:37:47 +0200 Subject: [PATCH 3/3] fix: renew a proxy when it is about to expire --- .../Agent/PilotLoggingAgent.py | 71 +++++++++++++++---- .../test/Test_Agent_PilotLoggingAgent.py | 16 ++++- 2 files changed, 70 insertions(+), 17 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py index e25f40c47d6..44708e9ab44 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py @@ -17,6 +17,7 @@ from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOs from DIRAC.Core.Base.AgentModule import AgentModule +from DIRAC.Core.Security.ProxyInfo import getProxyInfo from DIRAC.Core.Utilities.Proxy import executeWithoutServerCertificate from DIRAC.Core.Utilities.Proxy import getProxy from DIRAC.DataManagementSystem.Client.DataManager import DataManager @@ -34,7 +35,8 @@ class PilotLoggingAgent(AgentModule): def __init__(self, *args, **kwargs): """c'tor""" super().__init__(*args, **kwargs) - self.clearPilotsDelay = 30 + self.clearPilotsDelay = 30 # in days + self.proxyTimeleftLimit = 600 # in seconds def initialize(self): """ @@ -46,6 +48,8 @@ def initialize(self): """ # pilot logs lifetime in days self.clearPilotsDelay = self.am_getOption("ClearPilotsDelay", self.clearPilotsDelay) + # proxy timeleft limit before we get a new one. + self.proxyTimeleftLimit = self.am_getOption("ProxyTimeleftLimit", self.proxyTimeleftLimit) # configured VOs res = getVOs() if not res["OK"]: @@ -76,24 +80,20 @@ def initialize(self): continue self.log.info(f"Proxy used for pilot logging: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}") - # download a proxy and save a filename for future use: + # download a proxy and save a file name, userDN and proxyGroup for future use: result = getDNForUsername(proxyUser) if not result["OK"]: self.log.error(f"Could not obtain a DN of user {proxyUser} for VO {vo}, skipped") continue - userDNs = result["Value"] # a same user may have more than one DN - fd, filename = tempfile.mkstemp(prefix=vo + "__") - print("filename", filename) - os.close(fd) - vomsAttr = getVOMSAttributeForGroup(proxyGroup) - result = getProxy(userDNs, proxyGroup, vomsAttr=vomsAttr, proxyFilePath=filename) + userDNs = result["Value"] # the same user may have more than one DN + + with tempfile.NamedTemporaryFile(prefix="gridpp" + "__", delete=False) as ntf: + result = self._downloadProxy(vo, userDNs, proxyGroup, ntf.name) if not result["OK"]: - self.log.error( - f"Could not download a proxy for DN {userDNs}, group {proxyGroup} for VO {vo}, skipped" - ) + # no proxy, we have no other option than to skip the VO continue - self.proxyDict[vo] = result["Value"] + self.proxyDict[vo] = {"proxy": result["Value"], "DN": userDNs, "group": proxyGroup} return S_OK() @@ -107,8 +107,13 @@ def execute(self): voRes = {} self.log.verbose(f"VOs configured for remote logging: {list(self.proxyDict.keys())}") originalUserProxy = os.environ.get("X509_USER_PROXY") - for vo, proxy in self.proxyDict.items(): - os.environ["X509_USER_PROXY"] = proxy + for vo, elem in self.proxyDict.items(): + if self._isProxyExpired(elem["proxy"], self.proxyTimeleftLimit): + result = self._downloadProxy(vo, elem["DN"], elem["group"], elem["proxy"]) + if not result["OK"]: + voRes[vo] = result["Message"] + continue + os.environ["X509_USER_PROXY"] = elem["proxy"] res = self.executeForVO(vo) if not res["OK"]: voRes[vo] = res["Message"] @@ -215,3 +220,41 @@ def clearOldPilotLogs(self, pilotLogPath): os.remove(fullpath) except Exception as excp: self.log.exception(f"Cannot remove an old log file after {fullpath}", lException=excp) + + def _downloadProxy(self, vo, userDNs, proxyGroup, filename): + """ + Fetch a new proxy and store it in a file filename. + + :param str vo: VO to get a proxy for + :param list userDNs: user DN list + :param str proxyGroup: user group + :param str filename: file name to store a proxy + :return: Dirac S_OK or S_ERROR object + :rtype: dict + """ + vomsAttr = getVOMSAttributeForGroup(proxyGroup) + result = getProxy(userDNs, proxyGroup, vomsAttr=vomsAttr, proxyFilePath=filename) + if not result["OK"]: + self.log.error(f"Could not download a proxy for DN {userDNs}, group {proxyGroup} for VO {vo}, skipped") + return S_ERROR(f"Could not download a proxy, {vo} skipped") + return result + + def _isProxyExpired(self, proxyfile, limit): + """ + Check proxy timeleft. If less than a limit, return True. + + :param str proxyfile: + :param int limit: timeleft threshold below which a proxy is considered expired. + :return: True or False + :rtype: bool + """ + result = getProxyInfo(proxyfile) + if not result["OK"]: + self.log.error(f"Could not get proxy info {result['Message']}") + return True + timeleft = result["Value"]["secondsLeft"] + self.log.debug(f"Proxy {proxyfile} time left: {timeleft}") + if timeleft < limit: + self.log.info(f"proxy {proxyfile} expired/is about to expire. Will fetch a new one") + return True + return False diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py index 82760e70d7b..8bf566199ac 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py @@ -78,7 +78,15 @@ def pla(mocker, plaBase): @pytest.mark.parametrize( "remoteLogging, options, getDN, getVOMS, getProxy, resDict, expectedRes", [ - ([True, False], upDict, S_OK(["myDN"]), S_OK(), S_OK("proxyfilename"), {"gridpp": "proxyfilename"}, S_OK()), + ( + [True, False], + upDict, + S_OK(["myDN"]), + S_OK(), + S_OK("proxyfilename"), + {"gridpp": {"DN": ["myDN"], "group": "proxyGroup", "proxy": "proxyfilename"}}, + S_OK(), + ), ([False, False], upDict, S_OK(["myDN"]), S_OK(), S_OK(), {}, S_OK()), ([True, False], upDict, S_ERROR("Could not obtain a DN"), S_OK(), S_OK(), {}, S_OK()), ([True, False], upDict, S_ERROR("Could not download proxy"), S_OK(), S_ERROR("Failure"), {}, S_OK()), @@ -110,9 +118,9 @@ def test_initialize(plaBase, remoteLogging, options, getDN, getVOMS, getProxy, r "proxyDict, execVORes, expectedResult", [ ({}, S_OK(), S_OK()), - ({"gridpp": "gridpp_proxyfile"}, S_OK(), S_OK()), + ({"gridpp": {"DN": ["myDN"], "group": "proxyGroup", "proxy": "proxyfilename"}}, S_OK(), S_OK()), ( - {"gridpp": "gridpp_proxyfile"}, + {"gridpp": {"DN": ["myDN"], "group": "proxyGroup", "proxy": "proxyfilename"}}, S_ERROR("Execute for VO failed"), S_ERROR("Agent cycle for some VO finished with errors"), ), @@ -122,6 +130,8 @@ def test_execute(plaBase, proxyDict, execVORes, expectedResult): """Testing a thin version of execute (executeForVO is mocked)""" plaBase.executeForVO = MagicMock() + plaBase._isProxyExpired = MagicMock() + plaBase._isProxyExpired.return_value = False plaBase.proxyDict = proxyDict plaBase.executeForVO.return_value = execVORes res = plaBase.execute()