From b48b7599800c1dcbbe9e08710af2228aa9ebb23a Mon Sep 17 00:00:00 2001 From: aldbr Date: Mon, 31 Jul 2023 16:27:15 +0200 Subject: [PATCH] feat: move submission policies to WMS Utilities --- .../Computing/AREXComputingElement.py | 5 +- .../Computing/SSHComputingElement.py | 5 +- .../Agent/PushJobAgent.py | 2 - .../Agent/SiteDirector.py | 72 +------------------ .../DB/PilotAgentsDB.sql | 1 - .../Utilities/QueueUtilities.py | 2 +- .../Utilities/SubmissionPolicy.py | 63 ++++++++++++++++ 7 files changed, 71 insertions(+), 79 deletions(-) create mode 100644 src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py diff --git a/src/DIRAC/Resources/Computing/AREXComputingElement.py b/src/DIRAC/Resources/Computing/AREXComputingElement.py index 9d1365b309c..ccadb75a276 100755 --- a/src/DIRAC/Resources/Computing/AREXComputingElement.py +++ b/src/DIRAC/Resources/Computing/AREXComputingElement.py @@ -88,13 +88,12 @@ def _reset(self): ############################################################################# - def setToken(self, token, valid): + def setToken(self, token): """Set the token and update the headers :param token: OAuth2Token object or dictionary containing token structure - :param int valid: validity period in seconds """ - super().setToken(token, valid) + super().setToken(token) self.headers["Authorization"] = "Bearer " + self.token["access_token"] def _arcToDiracID(self, arcJobID): diff --git a/src/DIRAC/Resources/Computing/SSHComputingElement.py b/src/DIRAC/Resources/Computing/SSHComputingElement.py index 605851fbfdf..c7ab66d7bd6 100644 --- a/src/DIRAC/Resources/Computing/SSHComputingElement.py +++ b/src/DIRAC/Resources/Computing/SSHComputingElement.py @@ -307,15 +307,14 @@ def __init__(self, ceUniqueID): self.errorTemplate = "" ############################################################################ - def setProxy(self, proxy, valid=0): + def setProxy(self, proxy): """ Set and prepare proxy to use :param str proxy: proxy to use - :param int valid: proxy validity period :return: S_OK/S_ERROR """ - ComputingElement.setProxy(self, proxy, valid) + ComputingElement.setProxy(self, proxy) if self.ceParameters.get("SSHType", "ssh") == "gsissh": result = self._prepareProxy() if not result["OK"]: diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py index 89ce0a1dc9d..f6bba0c77d2 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py @@ -324,8 +324,6 @@ def _buildQueueDict(self, siteNames, ces, ceTypes): result = getQueuesResolved( siteDict=result["Value"], queueCECache=self.queueCECache, - gridEnv=getGridEnv(), - setup=gConfig.getValue("/DIRAC/Setup", "unknown"), instantiateCEs=True, ) if not result["OK"]: diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index 56ad5d7092b..ada1181aa78 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -10,7 +10,6 @@ import datetime import os import socket -from abc import ABC, abstractmethod from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed @@ -33,7 +32,6 @@ from DIRAC.Resources.Computing.ComputingElement import ComputingElement from DIRAC.WorkloadManagementSystem.Client import PilotStatus from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES -from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient from DIRAC.WorkloadManagementSystem.Client.ServerUtils import getPilotAgentsDB from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials from DIRAC.WorkloadManagementSystem.Utilities.PilotWrapper import ( @@ -46,7 +44,7 @@ MAX_PILOTS_TO_SUBMIT = 100 # Submission policies -AGGRESSIVE_FILLING = "AgressingFilling" +AGGRESSIVE_FILLING = "AggressiveFilling" WAITING_SUPPORTED_JOBS = "WaitingSupportedJobs" SUBMISSION_POLICIES = [AGGRESSIVE_FILLING, WAITING_SUPPORTED_JOBS] @@ -77,7 +75,6 @@ def __init__(self, *args, **kwargs): self.sendSubmissionMonitoring = False self.localhost = socket.getfqdn() - self.matcherClient = None self.siteClient = None self.rssClient = None self.pilotAgentsDB = None @@ -85,7 +82,7 @@ def __init__(self, *args, **kwargs): # self.failedQueueCycleFactor is the number of cycles a queue has to wait before getting pilots again self.failedQueueCycleFactor = 10 - self.submissionPolicyName = AGGRESSIVE_FILLING + self.submissionPolicyName = WAITING_SUPPORTED_JOBS self.submissionPolicy = None self.workingDirectory = None @@ -107,7 +104,6 @@ def initialize(self): # Get the clients self.siteClient = SiteStatus() self.rssClient = ResourceStatus() - self.matcherClient = MatcherClient() self.pilotAgentsDB = getPilotAgentsDB() return S_OK() @@ -217,7 +213,6 @@ def _buildQueueDict(self, siteNames, ces, ceTypes, tags): # Set up the queue dictionary result = getQueuesResolved( siteDict=result["Value"], - vo=self.vo, queueCECache=self.queueCECache, instantiateCEs=True, ) @@ -695,7 +690,7 @@ def _writePilotScript(self, workingDirectory, pilotOptions, proxy=None, pilotExe ##################################################################################### - def monitorPilotStatus(self): + def monitorPilots(self): """Update status of pilots in transient and final states""" self.log.verbose("Monitoring: Queues treated are", ",".join(self.queueDict)) @@ -1040,64 +1035,3 @@ def _sendPilotSubmissionMonitoring(self, siteName, ceName, queueName, numTotal, return S_ERROR() self.log.verbose("Done committing to monitoring") return S_OK() - - -class SubmissionPolicy(ABC): - """Abstract class to define a submission strategy.""" - - @abstractmethod - def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int: - """Method to redefine in the concrete subclasses - - :param availableSlots: slots available for new pilots - :param queueName: the name of the targeted queue - :param queueInfo: a dictionary of attributes related to the queue - :param vo: VO - """ - pass - - -class AgressiveFillingPolicy(SubmissionPolicy): - def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int: - """All the available slots should be filled up. - Should be employed for sites that are always processing jobs. - - * Pros: would quickly fill up a queue - * Cons: would consume a lot of CPU hours for nothing if pilots do not match jobs - """ - return availableSlots - - -class WaitingSupportedJobsPolicy(SubmissionPolicy): - def __init__(self) -> None: - super().__init__() - self.matcherClient = MatcherClient() - - def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int: - """Fill up available slots only if waiting supported jobs exist. - Should be employed for sites that are rarely used (targetting specific Task Queues). - - * Pros: submit pilots only if necessary, and quickly fill up the queue if needed - * Cons: would create some unused pilots in all the sites supervised by this policy and targeting a same task queue - """ - # Prepare CE dictionary from the queue info - ce = queueInfo["CE"] - ceDict = ce.ceParameters - ceDict["GridCE"] = queueInfo["CEName"] - if vo: - ceDict["Community"] = vo - - # Get Task Queues related to the CE - result = self.matcherClient.getMatchingTaskQueues(ceDict) - if not result["OK"]: - self.log.error("Could not retrieve TaskQueues from TaskQueueDB", result["Message"]) - return 0 - taskQueueDict = result["Value"] - - # Get the number of jobs that would match the capability of the CE - waitingSupportedJobs = 0 - for tq in taskQueueDict.values(): - waitingSupportedJobs += tq["Jobs"] - - # Return the minimum value between the number of slots available and supported jobs - return min(availableSlots, waitingSupportedJobs) diff --git a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql index 418ad4877d2..d3a244972af 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql +++ b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql @@ -29,7 +29,6 @@ CREATE TABLE `PilotAgents` ( `PilotID` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT, `InitialJobID` INT(11) UNSIGNED NOT NULL DEFAULT 0, `CurrentJobID` INT(11) UNSIGNED NOT NULL DEFAULT 0, - `TaskQueueID` INT(11) UNSIGNED NOT NULL DEFAULT 0, `PilotJobReference` VARCHAR(255) NOT NULL DEFAULT 'Unknown', `PilotStamp` VARCHAR(32) NOT NULL DEFAULT '', `DestinationSite` VARCHAR(128) NOT NULL DEFAULT 'NotAssigned', diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py b/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py index f08f63f2049..e508fd0aaca 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py @@ -11,7 +11,7 @@ from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory -def getQueuesResolved(siteDict, vo, queueCECache, checkPlatform=False, instantiateCEs=False): +def getQueuesResolved(siteDict, queueCECache, checkPlatform=False, instantiateCEs=False): """Get the list of relevant CEs (what is in siteDict) and their descriptions. The main goal of this method is to return a dictionary of queues """ diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py b/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py new file mode 100644 index 00000000000..29450ffa67a --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py @@ -0,0 +1,63 @@ +from abc import ABC, abstractmethod + +from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient + + +class SubmissionPolicy(ABC): + """Abstract class to define a submission strategy.""" + + @abstractmethod + def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int: + """Method to redefine in the concrete subclasses + + :param availableSlots: slots available for new pilots + :param queueName: the name of the targeted queue + :param queueInfo: a dictionary of attributes related to the queue + :param vo: VO + """ + pass + + +class AgressiveFillingPolicy(SubmissionPolicy): + def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int: + """All the available slots should be filled up. + Should be employed for sites that are always processing jobs. + + * Pros: would quickly fill up a queue + * Cons: would consume a lot of CPU hours for nothing if pilots do not match jobs + """ + return availableSlots + + +class WaitingSupportedJobsPolicy(SubmissionPolicy): + def __init__(self) -> None: + super().__init__() + self.matcherClient = MatcherClient() + + def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int: + """Fill up available slots only if waiting supported jobs exist. + Should be employed for sites that are used from time to time (targeting specific Task Queues). + + * Pros: submit pilots only if necessary, and quickly fill up the queue if needed + * Cons: would create some unused pilots in all the sites supervised by this policy and targeting a same task queue + """ + # Prepare CE dictionary from the queue info + ce = queueInfo["CE"] + ceDict = ce.ceParameters + ceDict["GridCE"] = queueInfo["CEName"] + if vo: + ceDict["Community"] = vo + + # Get Task Queues related to the CE + result = self.matcherClient.getMatchingTaskQueues(ceDict) + if not result["OK"]: + return 0 + taskQueueDict = result["Value"] + + # Get the number of jobs that would match the capability of the CE + waitingSupportedJobs = 0 + for tq in taskQueueDict.values(): + waitingSupportedJobs += tq["Jobs"] + + # Return the minimum value between the number of slots available and supported jobs + return min(availableSlots, waitingSupportedJobs)