diff --git a/src/DIRAC/Resources/Computing/ComputingElement.py b/src/DIRAC/Resources/Computing/ComputingElement.py index ccbcff77f98..b8d07072abb 100755 --- a/src/DIRAC/Resources/Computing/ComputingElement.py +++ b/src/DIRAC/Resources/Computing/ComputingElement.py @@ -82,7 +82,6 @@ def __init__(self, ceName): self.minProxyTime = gConfig.getValue("/Registry/MinProxyLifeTime", 10800) # secs self.defaultProxyTime = gConfig.getValue("/Registry/DefaultProxyLifeTime", 43200) # secs self.proxyCheckPeriod = gConfig.getValue("/Registry/ProxyCheckingPeriod", 3600) # secs - self.valid = None self.batchSystem = None self.taskResults = {} @@ -97,14 +96,12 @@ def __init__(self, ceName): self.initializeParameters() self.log.debug("CE parameters", self.ceParameters) - def setProxy(self, proxy, valid=0): + def setProxy(self, proxy): """Set proxy for this instance""" self.proxy = proxy - self.valid = datetime.datetime.utcnow() + second * valid - def setToken(self, token, valid=0): + def setToken(self, token): self.token = token - self.valid = datetime.datetime.utcnow() + second * valid def _prepareProxy(self): """Set the environment variable X509_USER_PROXY""" @@ -117,21 +114,6 @@ def _prepareProxy(self): self.log.debug(f"Set proxy variable X509_USER_PROXY to {os.environ['X509_USER_PROXY']}") return S_OK() - def isProxyValid(self, valid=1000): - """Check if the stored proxy is valid""" - if not self.valid: - result = S_ERROR("Proxy is not valid for the requested length") - result["Value"] = 0 - return result - delta = self.valid - datetime.datetime.utcnow() - totalSeconds = delta.days * 86400 + delta.seconds - if totalSeconds > valid: - return S_OK(totalSeconds - valid) - - result = S_ERROR("Proxy is not valid for the requested length") - result["Value"] = totalSeconds - valid - return result - def initializeParameters(self): """Initialize the CE parameters after they are collected from various sources""" @@ -259,71 +241,49 @@ def setCPUTimeLeft(self, cpuTimeLeft=None): return S_ERROR("Wrong type for setCPUTimeLeft argument") ############################################################################# - def available(self, jobIDList=None): + def available(self): """This method returns the number of available slots in the target CE. The CE instance polls for waiting and running jobs and compares to the limits in the CE parameters. - - :param list jobIDList: list of already existing job IDs to be checked against """ + result = self.getCEStatus() + if not result["OK"]: + return result - # If there are no already registered jobs - if jobIDList is not None and not jobIDList: - result = S_OK() - result["RunningJobs"] = 0 - result["WaitingJobs"] = 0 - result["SubmittedJobs"] = 0 - else: - result = self.getCEStatus() - if not result["OK"]: - return result runningJobs = result["RunningJobs"] waitingJobs = result["WaitingJobs"] - submittedJobs = result["SubmittedJobs"] availableProcessors = result.get("AvailableProcessors") ceInfoDict = dict(result) maxTotalJobs = int(self.ceParameters.get("MaxTotalJobs", 0)) ceInfoDict["MaxTotalJobs"] = maxTotalJobs - waitingToRunningRatio = float(self.ceParameters.get("WaitingToRunningRatio", 0.0)) - # if there are no Running job we can submit to get at most 'MaxWaitingJobs' - # if there are Running jobs we can increase this to get a ratio W / R 'WaitingToRunningRatio' - maxWaitingJobs = int(max(int(self.ceParameters.get("MaxWaitingJobs", 0)), runningJobs * waitingToRunningRatio)) + maxWaitingJobs = int(self.ceParameters.get("MaxWaitingJobs", 0)) + ceInfoDict["MaxWaitingJobs"] = maxWaitingJobs self.log.verbose("Max Number of Jobs:", maxTotalJobs) - self.log.verbose("Max W/R Ratio:", waitingToRunningRatio) self.log.verbose("Max Waiting Jobs:", maxWaitingJobs) - # Determine how many more jobs can be submitted - message = f"{self.ceName} CE: SubmittedJobs={submittedJobs}" - message += f", WaitingJobs={waitingJobs}, RunningJobs={runningJobs}" + # If we reached the maximum number of total jobs, then the CE is not available totalJobs = runningJobs + waitingJobs - - message += f", MaxTotalJobs={maxTotalJobs}" - if totalJobs >= maxTotalJobs: - self.log.verbose("Max Number of Jobs reached:", maxTotalJobs) + self.log.verbose("Max Number of Jobs reached:", f"{totalJobs} >= {maxTotalJobs}") result["Value"] = 0 - message = "There are {} waiting jobs and total jobs {} >= {} max total jobs".format( - waitingJobs, - totalJobs, - maxTotalJobs, - ) - else: - additionalJobs = 0 - if waitingJobs < maxWaitingJobs: - additionalJobs = maxWaitingJobs - waitingJobs - if totalJobs + additionalJobs >= maxTotalJobs: - additionalJobs = maxTotalJobs - totalJobs - # For SSH CE case - if int(self.ceParameters.get("MaxWaitingJobs", 0)) == 0: - additionalJobs = maxTotalJobs - runningJobs - - if availableProcessors is not None: - additionalJobs = min(additionalJobs, availableProcessors) - result["Value"] = additionalJobs - - result["Message"] = message + return result + + # If we reached the maximum number of waiting jobs, then the CE is not available + if waitingJobs >= maxWaitingJobs: + self.log.verbose("Max Number of waiting jobs reached:", f"{waitingJobs} >= {maxWaitingJobs}") + result["Value"] = 0 + return result + + additionalJobs = maxWaitingJobs - waitingJobs + if totalJobs + additionalJobs >= maxTotalJobs: + additionalJobs = maxTotalJobs - totalJobs + + if availableProcessors is not None: + additionalJobs = min(additionalJobs, availableProcessors) + result["Value"] = additionalJobs + result["CEInfoDict"] = ceInfoDict return result diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index 4eb046554d5..517a30828a9 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -9,9 +9,7 @@ """ import datetime import os -import random import socket -import sys from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed @@ -22,7 +20,7 @@ from DIRAC.AccountingSystem.Client.Types.PilotSubmission import PilotSubmission as PilotSubmissionAccounting from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals, Registry from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations -from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping +from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping, getQueues from DIRAC.Core.Base.AgentModule import AgentModule from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.Core.Utilities.TimeUtilities import second, toEpochMilliSeconds @@ -31,13 +29,12 @@ from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter from DIRAC.ResourceStatusSystem.Client.ResourceStatus import ResourceStatus from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus +from DIRAC.Resources.Computing.ComputingElement import ComputingElement from DIRAC.WorkloadManagementSystem.Client import PilotStatus from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES - from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient from DIRAC.WorkloadManagementSystem.Client.ServerUtils import getPilotAgentsDB from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials -from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import getGridEnv from DIRAC.WorkloadManagementSystem.Utilities.PilotWrapper import ( _writePilotWrapperFile, getPilotFilesCompressedEncodedDict, @@ -58,72 +55,37 @@ def __init__(self, *args, **kwargs): """c'tor""" super().__init__(*args, **kwargs) - # on-the fly imports - ol = ObjectLoader() - res = ol.loadModule("ConfigurationSystem.Client.Helpers.Resources") - if not res["OK"]: - sys.exit(res["Message"]) - self.resourcesModule = res["Value"] - self.queueDict = {} # self.queueCECache aims at saving CEs information over the cycles to avoid to create the exact same CEs each cycle self.queueCECache = {} - self.queueSlots = {} self.failedQueues = defaultdict(int) - # failedPilotOutput stores the number of times the Site Director failed to get a given pilot output - self.failedPilotOutput = defaultdict(int) - self.firstPass = True self.maxPilotsToSubmit = MAX_PILOTS_TO_SUBMIT - self.gridEnv = "" self.vo = "" self.group = "" - # self.voGroups contain all the eligible user groups for pilots submitted by this SiteDirector - self.voGroups = [] self.pilotDN = "" self.pilotGroup = "" - self.platforms = [] - self.sites = [] - self.totalSubmittedPilots = 0 - - self.addPilotsToEmptySites = False - self.checkPlatform = False - self.updateStatus = True - self.getOutput = False + self.sendAccounting = True self.sendSubmissionAccounting = True self.sendSubmissionMonitoring = False + self.localhost = socket.getfqdn() + + self.matcherClient = None self.siteClient = None self.rssClient = None self.pilotAgentsDB = None self.rssFlag = None - self.globalParameters = {"NumberOfProcessors": 1, "MaxRAM": 2048} # self.failedQueueCycleFactor is the number of cycles a queue has to wait before getting pilots again self.failedQueueCycleFactor = 10 - # Every N cycles, the status of the pilots are updated by the SiteDirector - self.pilotStatusUpdateCycleFactor = 10 - # Every N cycles, the number of slots available in the queues is updated - self.availableSlotsUpdateCycleFactor = 10 - self.maxQueueLength = 86400 * 3 - # Maximum number of times the Site Director is going to try to get a pilot output before stopping - self.maxRetryGetPilotOutput = 3 - self.pilotWaitingFlag = True + self.maxQueueLength = 86400 * 3 self.pilotLogLevel = "INFO" - self.matcherClient = None - self.siteMaskList = [] - self.ceMaskList = [] - - self.localhost = socket.getfqdn() def initialize(self): """Initial settings""" - self.gridEnv = self.am_getOption("GridEnv", "") - if not self.gridEnv: - self.gridEnv = getGridEnv() - # The SiteDirector is for a particular user community self.vo = self.am_getOption("VO", "") if not self.vo: @@ -133,20 +95,6 @@ def initialize(self): # The SiteDirector is for a particular user group self.group = self.am_getOption("Group", "") - # Choose the group for which pilots will be submitted. This is a hack until - # we will be able to match pilots to VOs. - if not self.group: - if self.vo: - result = Registry.getGroupsForVO(self.vo) - if not result["OK"]: - return result - self.voGroups = [] - for group in result["Value"]: - if "NormalUser" in Registry.getPropertiesForGroup(group): - self.voGroups.append(group) - else: - self.voGroups = [self.group] - # Get the clients self.siteClient = SiteStatus() self.rssClient = ResourceStatus() @@ -155,6 +103,8 @@ def initialize(self): return S_OK() + ##################################################################################### + def beginExecution(self): """This is run at every cycle, as first thing. @@ -163,11 +113,8 @@ def beginExecution(self): 3. Get the site description dictionary 4. Get what to send in pilot wrapper """ - - self.rssFlag = self.rssClient.rssFlag - # Which credentials to use? - # are they specific to the SD? (if not, get the generic ones) + # Are they specific to the SD? (if not, get the generic ones) self.pilotDN = self.am_getOption("PilotDN", self.pilotDN) self.pilotGroup = self.am_getOption("PilotGroup", self.pilotGroup) result = findGenericPilotCredentials(vo=self.vo, pilotDN=self.pilotDN, pilotGroup=self.pilotGroup) @@ -180,22 +127,11 @@ def beginExecution(self): self.maxQueueLength = self.am_getOption("MaxQueueLength", self.maxQueueLength) self.pilotLogLevel = self.am_getOption("PilotLogLevel", self.pilotLogLevel) self.maxPilotsToSubmit = self.am_getOption("MaxPilotsToSubmit", self.maxPilotsToSubmit) - self.pilotWaitingFlag = self.am_getOption("PilotWaitingFlag", self.pilotWaitingFlag) self.failedQueueCycleFactor = self.am_getOption("FailedQueueCycleFactor", self.failedQueueCycleFactor) - self.pilotStatusUpdateCycleFactor = self.am_getOption( - "PilotStatusUpdateCycleFactor", self.pilotStatusUpdateCycleFactor - ) - self.availableSlotsUpdateCycleFactor = self.am_getOption( - "AvailableSlotsUpdateCycleFactor", self.availableSlotsUpdateCycleFactor - ) - self.maxRetryGetPilotOutput = self.am_getOption("MaxRetryGetPilotOutput", self.maxRetryGetPilotOutput) # Flags - self.addPilotsToEmptySites = self.am_getOption("AddPilotsToEmptySites", self.addPilotsToEmptySites) - self.checkPlatform = self.am_getOption("CheckPlatform", self.checkPlatform) - self.updateStatus = self.am_getOption("UpdatePilotStatus", self.updateStatus) - self.getOutput = self.am_getOption("GetPilotOutput", self.getOutput) self.sendAccounting = self.am_getOption("SendPilotAccounting", self.sendAccounting) + self.rssFlag = self.rssClient.rssFlag # Check whether to send to Monitoring or Accounting or both monitoringOption = Operations().getMonitoringBackends(monitoringType="PilotSubmissionMonitoring") @@ -203,82 +139,20 @@ def beginExecution(self): self.sendSubmissionMonitoring = True if "Accounting" in monitoringOption: self.sendSubmissionAccounting = True - # Get the site description dictionary - siteNames = None - siteNamesOption = self.am_getOption("Site", ["any"]) - if siteNamesOption and "any" not in [sn.lower() for sn in siteNamesOption]: - siteNames = siteNamesOption - - ceTypes = None - ceTypesOption = self.am_getOption("CETypes", ["any"]) - if ceTypesOption and "any" not in [ct.lower() for ct in ceTypesOption]: - ceTypes = ceTypesOption - - ces = None - cesOption = self.am_getOption("CEs", ["any"]) - if cesOption and "any" not in [ce.lower() for ce in cesOption]: - ces = cesOption + # Get the site description dictionary + siteNames = self.am_getOption("Site", []) + ceTypes = self.am_getOption("CETypes", []) + ces = self.am_getOption("CEs", []) tags = self.am_getOption("Tags", []) - if not tags: - tags = None + # Display options used self.log.always("VO:", self.vo) - if self.voGroups: - self.log.always("Group(s):", self.voGroups) self.log.always("Sites:", siteNames) self.log.always("CETypes:", ceTypes) self.log.always("CEs:", ces) self.log.always("PilotDN:", self.pilotDN) self.log.always("PilotGroup:", self.pilotGroup) - - result = self.resourcesModule.getQueues( - community=self.vo, siteList=siteNames, ceList=ces, ceTypeList=ceTypes, tags=tags - ) - if not result["OK"]: - return result - result = getQueuesResolved( - siteDict=result["Value"], - queueCECache=self.queueCECache, - gridEnv=self.gridEnv, - setup=gConfig.getValue("/DIRAC/Setup", "unknown"), - workingDir=self.workingDirectory, - checkPlatform=self.checkPlatform, - instantiateCEs=True, - ) - if not result["OK"]: - return result - - self.queueDict = result["Value"] - for __queueName, queueDict in self.queueDict.items(): - # Update self.sites - if queueDict["Site"] not in self.sites: - self.sites.append(queueDict["Site"]) - - # Update self.platforms, keeping entries unique and squashing lists - self.platforms = [] - if "Platform" in queueDict["ParametersDict"]: - platform = queueDict["ParametersDict"]["Platform"] - oldPlatforms = set(self.platforms) - if isinstance(platform, list): - oldPlatforms.update(set(platform)) - else: - oldPlatforms.add(platform) - self.platforms = list(oldPlatforms) - - # Update self.globalParameters - if "WholeNode" in queueDict["ParametersDict"]: - self.globalParameters["WholeNode"] = "True" - for parameter in ["MaxRAM", "NumberOfProcessors"]: - if parameter in queueDict["ParametersDict"]: - self.globalParameters[parameter] = max( - self.globalParameters[parameter], int(queueDict["ParametersDict"][parameter]) - ) - - if self.updateStatus: - self.log.always("Pilot status update requested") - if self.getOutput: - self.log.always("Pilot output retrieval requested") if self.sendAccounting: self.log.always("Pilot accounting sending requested") if self.sendSubmissionAccounting: @@ -288,33 +162,50 @@ def beginExecution(self): self.log.always("MaxPilotsToSubmit:", self.maxPilotsToSubmit) - if self.firstPass: - if self.queueDict: - self.log.always("Agent will serve queues:") - for queue in self.queueDict: - self.log.always( - f"Site: {self.queueDict[queue]['Site']}, CE: {self.queueDict[queue]['CEName']}, Queue: {queue}" - ) - self.firstPass = False + # Build the dictionary of queues that are going to be used: self.queueDict + result = self._buildQueueDict(siteNames, ceTypes, ces, tags) + if not result: + return result - return S_OK() + # Stop the execution if there is no usable queue + if not self.queueDict: + self.log.error("No usable queue, exiting the cycle") + return S_ERROR("No usable queue, exiting the cycle") - def execute(self): - """Main execution method (what is called at each agent cycle). + self.log.always("Agent will serve queues:") + for queue in self.queueDict: + self.log.always( + f"Site: {self.queueDict[queue]['Site']}, CE: {self.queueDict[queue]['CEName']}, Queue: {queue}" + ) - It basically just calls self.submitPilots() method - """ + return S_OK() - if not self.queueDict: - self.log.warn("No site defined, exiting the cycle") - return S_OK() + def _buildQueueDict(self, siteNames, ces, ceTypes, tags): + """Build the queueDict dictionary containing information about the queues that will be provisioned""" + # Get details about the resources + result = getQueues(community=self.vo, siteList=siteNames, ceList=ces, ceTypeList=ceTypes, tags=tags) + if not result["OK"]: + return result + + # Set up the queue dictionary + result = getQueuesResolved( + siteDict=result["Value"], + queueCECache=self.queueCECache, + workingDir=self.workingDirectory, + instantiateCEs=True, + ) + if not result["OK"]: + return result + self.queueDict = result["Value"] - # get list of usable sites within this cycle - result = self.siteClient.getUsableSites() + # Get list of usable sites within this cycle + result = self.siteClient.getUsableSites(siteNames) if not result["OK"]: return result - self.siteMaskList = result.get("Value", []) + siteMaskList = result.get("Value", []) + # Get list of usable CEs + ceMaskList = [] if self.rssFlag: ceNamesList = [queue["CEName"] for queue in self.queueDict.values()] result = self.rssClient.getElementStatus(ceNamesList, "ComputingElement", vO=self.vo) @@ -322,391 +213,182 @@ def execute(self): self.log.error("Can not get the status of computing elements: ", result["Message"]) return result # Try to get CEs which have been probed and those unprobed (vO='all'). - self.ceMaskList = [ + ceMaskList = [ ceName for ceName in result["Value"] if result["Value"][ceName]["all"] in ("Active", "Degraded") ] - self.log.debug("CE list with status Active or Degraded: ", self.ceMaskList) - - result = self.submitPilots() - if not result["OK"]: - self.log.error("Errors in the job submission: ", result["Message"]) - return result - - # Every N cycles we update the pilots status - cyclesDone = self.am_getModuleParam("cyclesDone") - if self.updateStatus and cyclesDone % self.pilotStatusUpdateCycleFactor == 0: - result = self.updatePilotStatus() - if not result["OK"]: - self.log.error("Errors in updating pilot status: ", result["Message"]) - return result - return S_OK() + # Filter the unusable queues + for queueName in list(self.queueDict.keys()): + site = self.queueDict[queueName]["Site"] + ce = self.queueDict[queueName]["CEName"] - def submitPilots(self): - """Go through defined computing elements and submit pilots if necessary and possible - - :return: S_OK/S_ERROR - """ - - # First, we check if we want to submit pilots at all, and also where - submit, anySite, jobSites, testSites = self._ifAndWhereToSubmit() - if not submit: - self.log.notice("Not submitting any pilots at this cycle") - return S_OK() - - # From here on we assume we are going to (try to) submit some pilots - self.log.debug("Going to try to submit some pilots") - - self.log.verbose("Queues treated", ",".join(self.queueDict)) - - self.totalSubmittedPilots = 0 - - queueDictItems = list(self.queueDict.items()) - random.shuffle(queueDictItems) - - for queueName, queueDictionary in queueDictItems: - # now submitting to the single queues - self.log.verbose("Evaluating queue", queueName) - - # are we going to submit pilots to this specific queue? - if not self._allowedToSubmit(queueName, anySite, jobSites, testSites): + # Check the status of the Site + if site in siteMaskList: continue - if "CPUTime" in queueDictionary["ParametersDict"]: - queueCPUTime = int(queueDictionary["ParametersDict"]["CPUTime"]) - else: - self.log.warn("CPU time limit is not specified, skipping", f"queue {queueName}") - continue - if queueCPUTime > self.maxQueueLength: - queueCPUTime = self.maxQueueLength - - ce, ceDict = self._getCE(queueName) - - # additionalInfo is normally taskQueueDict - pilotsWeMayWantToSubmit, additionalInfo = self._getPilotsWeMayWantToSubmit(ceDict) - self.log.debug(f"{pilotsWeMayWantToSubmit} pilotsWeMayWantToSubmit are eligible for {queueName} queue") - if not pilotsWeMayWantToSubmit: - self.log.debug(f"...so skipping {queueName}") - continue - - # Get the number of already waiting pilots for the queue - totalWaitingPilots = 0 - manyWaitingPilotsFlag = False - if self.pilotWaitingFlag: - tqIDList = list(additionalInfo) - result = self.pilotAgentsDB.countPilots( - {"TaskQueueID": tqIDList, "Status": PilotStatus.PILOT_WAITING_STATES}, None - ) - if not result["OK"]: - self.log.error("Failed to get Number of Waiting pilots", result["Message"]) - totalWaitingPilots = 0 - else: - totalWaitingPilots = result["Value"] - self.log.debug(f"Waiting Pilots: {totalWaitingPilots}") - if totalWaitingPilots >= pilotsWeMayWantToSubmit: - self.log.verbose("Possibly enough pilots already waiting", f"({totalWaitingPilots})") - manyWaitingPilotsFlag = True - if not self.addPilotsToEmptySites: - continue - - self.log.debug( - f"{totalWaitingPilots} waiting pilots for the total of {pilotsWeMayWantToSubmit} eligible pilots for {queueName}" - ) - - # Get the number of available slots on the target site/queue - totalSlots = self.getQueueSlots(queueName, manyWaitingPilotsFlag) - if totalSlots <= 0: - self.log.debug(f"{queueName}: No slots available") + # Check the status of the CE (only for RSS=Active) + if not self.rssFlag or (self.rssFlag and ce in ceMaskList): continue - if manyWaitingPilotsFlag: - # Throttle submission of extra pilots to empty sites - pilotsToSubmit = int(self.maxPilotsToSubmit / 10) + 1 - else: - pilotsToSubmit = max(0, min(totalSlots, pilotsWeMayWantToSubmit - totalWaitingPilots)) - self.log.info( - f"{queueName}: Slots={totalSlots}, TQ jobs(pilotsWeMayWantToSubmit)={pilotsWeMayWantToSubmit}, Pilots: waiting {totalWaitingPilots}, to submit={pilotsToSubmit}" - ) - - # Limit the number of pilots to submit to MAX_PILOTS_TO_SUBMIT - pilotsToSubmit = min(self.maxPilotsToSubmit, pilotsToSubmit) - - # Get the working proxy - cpuTime = queueCPUTime + 86400 - self.log.verbose("Getting pilot proxy", f"for {self.pilotDN}/{self.pilotGroup} {cpuTime} long") - result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, self.pilotGroup, cpuTime) - if not result["OK"]: - return result - proxy = result["Value"] - # Check returned proxy lifetime - result = proxy.getRemainingSecs() # pylint: disable=no-member - if not result["OK"]: - return result - lifetime_secs = result["Value"] - ce.setProxy(proxy, lifetime_secs) - - # Get valid token if needed - if "Token" in ce.ceParameters.get("Tag", []): - result = self.__getPilotToken(audience=ce.audienceName) - if not result["OK"]: - return result - ce.setToken(result["Value"], 3500) - - # now really submitting - res = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName) - if not res["OK"]: - self.log.info("Failed pilot submission", f"Queue: {queueName}") - else: - pilotList, stampDict = res["Value"] - - # updating the pilotAgentsDB... done by default but maybe not strictly necessary - self._addPilotTQReference(queueName, additionalInfo, pilotList, stampDict) - - # Summary after the cycle over queues - self.log.info("Total number of pilots submitted in this cycle", f"{self.totalSubmittedPilots}") + self.log.warn("Queue not considered because not usable:", queueName) + self.queueDict.pop(queueName) return S_OK() - def __getPilotToken(self, audience: str, scope: list[str] = None): - """Get the token corresponding to the pilot user identity - - :param audience: Token audience, targeting a single CE - :param scope: list of permissions needed to interact with a CE - :return: S_OK/S_ERROR, Token object as Value - """ - if not audience: - return S_ERROR("Audience is not defined") - - if not scope: - scope = PILOT_SCOPES - - return gTokenManager.getToken(userGroup=self.pilotGroup, requiredTimeLeft=600, scope=scope, audience=audience) - - def _ifAndWhereToSubmit(self): - """Return a tuple that says if and where to submit pilots: + ##################################################################################### - (submit, anySite, jobSites, testSites) - e.g. - (True, False, {'Site1', 'Site2'}, {'Test1', 'Test2'}) + def execute(self): + """Main execution method (what is called at each agent cycle). - VOs may want to replace this method with different strategies + It basically just submits pilots and gets their status """ - - tqDict = self._getTQDictForMatching() - if not tqDict: - return True, True, set(), set() - - # the tqDict used here is a very generic one, not specific to one CE/queue only - self.log.verbose("Checking overall TQ availability with requirements") - self.log.verbose(tqDict) - - # Check that there is some work at all - result = self.matcherClient.getMatchingTaskQueues(tqDict) + result = self.submitPilots() if not result["OK"]: - self.log.error("Matcher error: ", result["Message"]) - return False, True, set(), set() - matchingTQs = result["Value"] - if not matchingTQs: - self.log.notice("No Waiting jobs suitable for the director, so nothing to submit") - return False, True, set(), set() - - # If we are here there's some work to do, now let's see for where - jobSites = set() - testSites = set() - anySite = False - - for tqDescription in matchingTQs.values(): - siteList = tqDescription.get("Sites", []) - if siteList: - jobSites |= set(siteList) - else: - anySite = True - - if "JobTypes" in tqDescription: - if "Sites" in tqDescription: - for site in tqDescription["Sites"]: - if site.lower() != "any": - testSites.add(site) - - self.monitorJobsQueuesPilots(matchingTQs) - - return True, anySite, jobSites, testSites - - def monitorJobsQueuesPilots(self, matchingTQs): - """Just printout of jobs queues and pilots status in TQ""" - tqIDList = list(matchingTQs) - result = self.pilotAgentsDB.countPilots( - {"TaskQueueID": tqIDList, "Status": PilotStatus.PILOT_WAITING_STATES}, None - ) - - totalWaitingJobs = 0 - for tqDescription in matchingTQs.values(): - totalWaitingJobs += tqDescription["Jobs"] + self.log.error("Errors in the pilot submission:", result["Message"]) + return result + result = self.monitorPilots() if not result["OK"]: - self.log.error("Can't count pilots", result["Message"]) - else: - self.log.info( - "Total jobs : number of task queues : number of waiting pilots", - f"{totalWaitingJobs} : {len(tqIDList)} : {result['Value']}", - ) - - def _getTQDictForMatching(self): - """Just construct a dictionary (tqDict) - that will be used to check with Matcher if there's anything to submit. - - If extensions want, they can replace partly or fully this method. - If it returns just an empty dict, the assuption is that we'll submit pilots no matters what. - - :returns dict: tqDict of task queue descriptions - """ - tqDict = {"Setup": CSGlobals.getSetup(), "CPUTime": 9999999} - if self.vo: - tqDict["Community"] = self.vo - if self.voGroups: - tqDict["OwnerGroup"] = self.voGroups - - if self.checkPlatform: - platforms = self._getPlatforms() - if platforms: - tqDict["Platform"] = platforms + self.log.error("Errors in pilot monitoring:", result["Message"]) + return result - tqDict["Site"] = self.sites + return S_OK() - # Get a union of all tags - tags = [] - for queue in self.queueDict: - tags += self.queueDict[queue]["ParametersDict"].get("Tag", []) - tqDict["Tag"] = list(set(tags)) + ##################################################################################### - # Add overall max values for all queues - tqDict.update(self.globalParameters) + def submitPilots(self): + """Go through defined computing elements and submit pilots if necessary and possible""" + # Getting the status of pilots in a queue implies the use of remote CEs and may lead to network latency + # Threads aim at overcoming such issues and thus 1 thread per queue is created to submit pilots + self.log.verbose("Submission: Queues treated are", ",".join(self.queueDict)) - return tqDict + errors = [] + with ThreadPoolExecutor(max_workers=len(self.queueDict)) as executor: + futures = [] + for queue in self.queueDict: + futures.append(executor.submit(self._submitPilotsPerQueue, queue)) - def _getPlatforms(self): - """Get the platforms used for TQ match - Here for extension purpose. + for future in as_completed(futures): + result = future.result() + if not result["OK"]: + errors.append(result["Message"]) - :return: list of platforms - """ - result = self.resourcesModule.getCompatiblePlatforms(self.platforms) - if not result["OK"]: - self.log.error( - "Issue getting compatible platforms, will skip check of platforms", - self.platforms + " : " + result["Message"], - ) - return result["Value"] + if errors: + self.log.error("The following errors occurred during the pilot submission operation", "\n".join(errors)) + return S_ERROR("Pilot submission: errors occurred") - def _allowedToSubmit(self, queue, anySite, jobSites, testSites): - """Check if we are allowed to submit to a certain queue + return S_OK() - :param str queue: the queue name - :param bool anySite: submitting anywhere? - :param set jobSites: set of job site names (only considered if anySite is False) - :param set testSites: set of test site names + def _submitPilotsPerQueue(self, queueName): + """Submit pilots within a given computing elements - :return: True/False + :return: S_OK/S_ERROR """ + queueDictionary = self.queueDict[queueName] - # Check if the queue failed previously - failedCount = self.failedQueues[queue] % self.failedQueueCycleFactor + # Are we allowed to submit pilots to this specific queue? + failedCount = self.failedQueues[queueName] % self.failedQueueCycleFactor if failedCount != 0: - self.log.warn("queue failed recently ==> number of cycles skipped", f"{queue} ==> {10 - failedCount}") - self.failedQueues[queue] += 1 - return False - - # Check the status of the site - if self.queueDict[queue]["Site"] not in self.siteMaskList and self.queueDict[queue]["Site"] not in testSites: - self.log.verbose( - "Queue skipped (site not in mask)", - f"{self.queueDict[queue]['QueueName']} ({self.queueDict[queue]['Site']})", + self.log.warn( + "Queue failed recently ==> number of cycles skipped", + f"{queueName} ==> {self.failedQueueCycleFactor - failedCount}", ) - return False + self.failedQueues[queueName] += 1 + return S_OK() - # Check that there are task queues waiting for this site - if not anySite and self.queueDict[queue]["Site"] not in jobSites: - self.log.verbose( - "Queue skipped: no workload expected", - f"{self.queueDict[queue]['CEName']} at {self.queueDict[queue]['Site']}", - ) - return False + # Adjust queueCPUTime: needed to generate the proxy + if "CPUTime" not in queueDictionary["ParametersDict"]: + self.log.error("CPU time limit is not specified, skipping", f"queue {queueName}") + return S_ERROR(f"CPU time limit is not specified, skipping queue {queueName}") - # Check the status of the CE (only for RSS=Active) - if self.rssFlag: - if self.queueDict[queue]["CEName"] not in self.ceMaskList: - self.log.verbose( - "Skipping computing element: resource not usable", - f"{self.queueDict[queue]['CEName']} at {self.queueDict[queue]['Site']}", - ) - return False + queueCPUTime = int(queueDictionary["ParametersDict"]["CPUTime"]) + if queueCPUTime > self.maxQueueLength: + queueCPUTime = self.maxQueueLength - # if we are here, it means that we are allowed to submit to the queue - return True + # Get CE instance + ce = self.queueDict[queueName]["CE"] - def _getCE(self, queue): - """Prepare the queue description to look for eligible jobs + # Get the number of available slots on the target site/queue + totalSlots = self._getQueueSlots(queueName) + if totalSlots <= 0: + self.log.verbose(f"{queueName}: No slot available") + return S_ERROR(f"{queueName}: No slot available") + self.log.info(f"{queueName}: to submit={totalSlots}") - :param str queue: queue name + # Limit the number of pilots to submit to self.maxPilotsToSubmit + pilotsToSubmit = min(self.maxPilotsToSubmit, totalSlots) - :return: ce (ComputingElement object), ceDict (dict) - """ + # Set credentials + cpuTime = queueCPUTime + 86400 + result = self._setCredentials(ce, cpuTime) + if not result["OK"]: + self.log.error("Failed to set credentials:", result["Message"]) + return result - ce = self.queueDict[queue]["CE"] - ceDict = ce.ceParameters - ceDict["GridCE"] = self.queueDict[queue]["CEName"] - - if self.queueDict[queue]["Site"] not in self.siteMaskList: - ceDict["JobType"] = "Test" - if self.vo: - ceDict["Community"] = self.vo - if self.voGroups: - ceDict["OwnerGroup"] = self.voGroups - - if self.checkPlatform: - platform = self.queueDict[queue]["ParametersDict"].get("Platform") - if not platform: - self.log.error(f"No platform set for CE {ce}, returning 'ANY'") - ceDict["Platform"] = "ANY" - return ce, ceDict - result = self.resourcesModule.getCompatiblePlatforms(platform) - if result["OK"]: - ceDict["Platform"] = result["Value"] - else: - self.log.error( - "Issue getting compatible platforms, returning 'ANY'", f"{self.platforms}: {result['Message']}" - ) - ceDict["Platform"] = "ANY" + # Now really submitting + result = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName) + if not result["OK"]: + self.log.info("Failed pilot submission", f"Queue: {queueName}") + return result + pilotList, stampDict = result["Value"] - return ce, ceDict + # updating the pilotAgentsDB... done by default but maybe not strictly necessary + result = self._addPilotReference(queueName, pilotList, stampDict) + if not result["OK"]: + return result - def _getPilotsWeMayWantToSubmit(self, ceDict): - """Returns the number of pilots that we may want to submit to the ce described in ceDict + # Summary after the cycle over queues + self.log.info("Total number of pilots submitted in this cycle", f"{len(pilotList)} to {queueName}") + return S_OK() - This implementation is based on the number of eligible WMS taskQueues for the target site/queue. - VOs are free to override this method and to provide a different implementation. + def _getQueueSlots(self, queue): + """Get the number of available slots in the queue""" + ce = self.queueDict[queue]["CE"] + ceName = self.queueDict[queue]["CEName"] + queueName = self.queueDict[queue]["QueueName"] - :param ceDict: dictionary describing CE - :type ceDict: dict + # First, try to get available slots from the CE + result = ce.available() + if result["OK"]: + ceInfoDict = result["CEInfoDict"] + self.log.info( + "CE queue report", + f"({ceName}_{queueName}): Wait={ceInfoDict['WaitingJobs']}, Run={ceInfoDict['RunningJobs']}, Max={ceInfoDict['MaxTotalJobs']}", + ) + return result["Value"] - :return: pilotsWeMayWantToSubmit (int), taskQueueDict (dict) - :rType: tuple - """ + # If we cannot get available slots from the CE, then we get them from the pilotAgentsDB + maxWaitingJobs = int(self.queueDict[queue]["ParametersDict"].get("MaxWaitingJobs", 10)) + maxTotalJobs = int(self.queueDict[queue]["ParametersDict"].get("MaxTotalJobs", 10)) - pilotsWeMayWantToSubmit = 0 + # Get the number of transient pilots + result = self.pilotAgentsDB.countPilots( + {"DestinationSite": ceName, "Queue": queueName, "Status": PilotStatus.PILOT_TRANSIENT_STATES} + ) + if not result["OK"]: + self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}") + self.failedQueues[queue] += 1 + return 0 + totalJobs = result["Value"] - result = self.matcherClient.getMatchingTaskQueues(ceDict) + # Get the number of waiting pilots + result = self.pilotAgentsDB.countPilots( + {"DestinationSite": ceName, "Queue": queueName, "Status": PilotStatus.PILOT_WAITING_STATES} + ) if not result["OK"]: - self.log.error("Could not retrieve TaskQueues from TaskQueueDB", result["Message"]) - return 0, {} - taskQueueDict = result["Value"] - if not taskQueueDict: - self.log.verbose("No matching TQs found", f"for {ceDict}") + self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}") + self.failedQueues[queue] += 1 + return 0 + waitingJobs = result["Value"] - for tq in taskQueueDict.values(): - pilotsWeMayWantToSubmit += tq["Jobs"] + runningJobs = totalJobs - waitingJobs + self.log.info( + "PilotAgentsDB report", + f"({ceName}_{queueName}): Wait={waitingJobs}, Run={runningJobs}, Max={maxTotalJobs}", + ) - return pilotsWeMayWantToSubmit, taskQueueDict + totalSlots = min((maxTotalJobs - totalJobs), (maxWaitingJobs - waitingJobs)) + return totalSlots def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): """Method that really submits the pilots to the ComputingElements' queue @@ -726,30 +408,35 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): """ self.log.info("Going to submit pilots", f"(a maximum of {pilotsToSubmit} pilots to {queue} queue)") + # Get parameters to generate the pilot executable bundleProxy = self.queueDict[queue].get("BundleProxy", False) proxy = None if bundleProxy: proxy = ce.proxy - jobExecDir = self.queueDict[queue]["ParametersDict"].get("JobExecDir", "") envVariables = self.queueDict[queue]["ParametersDict"].get("EnvironmentVariables", None) - executable = self.getExecutable(queue, proxy=proxy, jobExecDir=jobExecDir, envVariables=envVariables) + # Generate the executable + executable = self._getExecutable(queue, proxy=proxy, jobExecDir=jobExecDir, envVariables=envVariables) + # Submit the job submitResult = ce.submitJob(executable, "", pilotsToSubmit) # In case the CE does not need the executable after the submission, we delete it # Else, we keep it, the CE will delete it after the end of the pilot execution if submitResult.get("ExecutableToKeep") != executable: os.unlink(executable) + siteName = self.queueDict[queue]["Site"] + ceName = self.queueDict[queue]["CEName"] + queueName = self.queueDict[queue]["QueueName"] if not submitResult["OK"]: self.log.error("Failed submission to queue", f"Queue {queue}:\n{submitResult['Message']}") if self.sendSubmissionAccounting: - result = self.sendPilotSubmissionAccounting( - self.queueDict[queue]["Site"], - self.queueDict[queue]["CEName"], - self.queueDict[queue]["QueueName"], + result = self._sendPilotSubmissionAccounting( + siteName, + ceName, + queueName, pilotsToSubmit, 0, "Failed", @@ -758,10 +445,10 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): self.log.error("Failure submitting Accounting report", result["Message"]) if self.sendSubmissionMonitoring: - result = self.sendPilotSubmissionMonitoring( - self.queueDict[queue]["Site"], - self.queueDict[queue]["CEName"], - self.queueDict[queue]["QueueName"], + result = self._sendPilotSubmissionMonitoring( + siteName, + ceName, + queueName, pilotsToSubmit, 0, "Failed", @@ -772,20 +459,18 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): self.failedQueues[queue] += 1 return submitResult - # Add pilots to the PilotAgentsDB: assign pilots to TaskQueue proportionally to the task queue priorities + # Add pilots to the PilotAgentsDB pilotList = submitResult["Value"] - self.queueSlots[queue]["AvailableSlots"] -= len(pilotList) - self.totalSubmittedPilots += len(pilotList) self.log.info( f"Submitted {len(pilotList)} pilots to {self.queueDict[queue]['QueueName']}@{self.queueDict[queue]['CEName']}" ) stampDict = submitResult.get("PilotStampDict", {}) if self.sendSubmissionAccounting: - result = self.sendPilotSubmissionAccounting( - self.queueDict[queue]["Site"], - self.queueDict[queue]["CEName"], - self.queueDict[queue]["QueueName"], + result = self._sendPilotSubmissionAccounting( + siteName, + ceName, + queueName, len(pilotList), len(pilotList), "Succeeded", @@ -794,10 +479,10 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): self.log.error("Failure submitting Accounting report", result["Message"]) if self.sendSubmissionMonitoring: - result = self.sendPilotSubmissionMonitoring( - self.queueDict[queue]["Site"], - self.queueDict[queue]["CEName"], - self.queueDict[queue]["QueueName"], + result = self._sendPilotSubmissionMonitoring( + siteName, + ceName, + queueName, len(pilotList), len(pilotList), "Succeeded", @@ -807,12 +492,10 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): return S_OK((pilotList, stampDict)) - def _addPilotTQReference(self, queue, taskQueueDict, pilotList, stampDict): - """Add to pilotAgentsDB the reference of for which TqID the pilots have been sent + def _addPilotReference(self, queue, pilotList, stampDict): + """Add pilotReference to pilotAgentsDB :param str queue: the queue name - :param taskQueueDict: dict of task queues - :type taskQueueDict: dict :param pilotList: list of pilots :type pilotList: list :param stampDict: dictionary of pilots timestamps @@ -820,132 +503,31 @@ def _addPilotTQReference(self, queue, taskQueueDict, pilotList, stampDict): :return: None """ + result = self.pilotAgentsDB.addPilotReference( + pilotList, + self.pilotGroup, + self.queueDict[queue]["CEType"], + stampDict, + ) + if not result["OK"]: + self.log.error("Failed add pilots to the PilotAgentsDB", result["Message"]) + return result - tqPriorityList = [] - sumPriority = 0.0 - for tq in taskQueueDict: - sumPriority += taskQueueDict[tq]["Priority"] - tqPriorityList.append((tq, sumPriority)) - tqDict = {} - for pilotID in pilotList: - rndm = random.random() * sumPriority - for tq, prio in tqPriorityList: - if rndm < prio: - tqID = tq - break - if tqID not in tqDict: - tqDict[tqID] = [] - tqDict[tqID].append(pilotID) - - for tqID, pilotsList in tqDict.items(): - result = self.pilotAgentsDB.addPilotTQReference( - pilotsList, - tqID, - self.pilotGroup, - self.queueDict[queue]["CEType"], - stampDict, + for pilot in pilotList: + result = self.pilotAgentsDB.setPilotStatus( + pilot, + PilotStatus.SUBMITTED, + self.queueDict[queue]["CEName"], + "Successfully submitted by the SiteDirector", + self.queueDict[queue]["Site"], + self.queueDict[queue]["QueueName"], ) if not result["OK"]: - self.log.error("Failed add pilots to the PilotAgentsDB", result["Message"]) - continue - for pilot in pilotsList: - result = self.pilotAgentsDB.setPilotStatus( - pilot, - PilotStatus.SUBMITTED, - self.queueDict[queue]["CEName"], - "Successfully submitted by the SiteDirector", - self.queueDict[queue]["Site"], - self.queueDict[queue]["QueueName"], - ) - if not result["OK"]: - self.log.error("Failed to set pilot status", result["Message"]) - continue - - def getQueueSlots(self, queue, manyWaitingPilotsFlag): - """Get the number of available slots in the queue""" - ce = self.queueDict[queue]["CE"] - ceName = self.queueDict[queue]["CEName"] - queueName = self.queueDict[queue]["QueueName"] - queryCEFlag = self.queueDict[queue]["QueryCEFlag"].lower() in ["1", "yes", "true"] - - self.queueSlots.setdefault(queue, {}) - totalSlots = self.queueSlots[queue].get("AvailableSlots", 0) - - # See if there are waiting pilots for this queue. If not, allow submission - if totalSlots and manyWaitingPilotsFlag: - result = self.pilotAgentsDB.selectPilots( - {"DestinationSite": ceName, "Queue": queueName, "Status": PilotStatus.PILOT_WAITING_STATES} - ) - if result["OK"]: - jobIDList = result["Value"] - if not jobIDList: - return totalSlots - return 0 - - availableSlotsCount = self.queueSlots[queue].setdefault("AvailableSlotsCount", 0) - waitingJobs = 1 - if totalSlots == 0: - if availableSlotsCount % self.availableSlotsUpdateCycleFactor == 0: - # Get the list of already existing pilots for this queue - jobIDList = None - result = self.pilotAgentsDB.selectPilots( - {"DestinationSite": ceName, "Queue": queueName, "Status": PilotStatus.PILOT_TRANSIENT_STATES} - ) - - if result["OK"]: - jobIDList = result["Value"] - - if queryCEFlag: - result = ce.available(jobIDList) - if not result["OK"]: - self.log.warn("Failed to check the availability of queue", f"{queue}: \n{result['Message']}") - self.failedQueues[queue] += 1 - else: - ceInfoDict = result["CEInfoDict"] - self.log.info( - "CE queue report", - f"({ceName}_{queueName}): Wait={ceInfoDict['WaitingJobs']}, Run={ceInfoDict['RunningJobs']}, Submitted={ceInfoDict['SubmittedJobs']}, Max={ceInfoDict['MaxTotalJobs']}", - ) - totalSlots = result["Value"] - self.queueSlots[queue]["AvailableSlots"] = totalSlots - waitingJobs = ceInfoDict["WaitingJobs"] - else: - maxWaitingJobs = int(self.queueDict[queue]["ParametersDict"].get("MaxWaitingJobs", 10)) - maxTotalJobs = int(self.queueDict[queue]["ParametersDict"].get("MaxTotalJobs", 10)) - waitingToRunningRatio = float( - self.queueDict[queue]["ParametersDict"].get("WaitingToRunningRatio", 0.0) - ) - waitingJobs = 0 - totalJobs = 0 - if jobIDList: - result = self.pilotAgentsDB.getPilotInfo(jobIDList) - if not result["OK"]: - self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}") - self.failedQueues[queue] += 1 - else: - for _pilotRef, pilotDict in result["Value"].items(): - if pilotDict["Status"] in PilotStatus.PILOT_TRANSIENT_STATES: - totalJobs += 1 - if pilotDict["Status"] in PilotStatus.PILOT_WAITING_STATES: - waitingJobs += 1 - runningJobs = totalJobs - waitingJobs - self.log.info( - "PilotAgentsDB report", - f"({ceName}_{queueName}): Wait={waitingJobs}, Run={runningJobs}, Max={maxTotalJobs}", - ) - maxWaitingJobs = int(max(maxWaitingJobs, runningJobs * waitingToRunningRatio)) - - totalSlots = min((maxTotalJobs - totalJobs), (maxWaitingJobs - waitingJobs)) - self.queueSlots[queue]["AvailableSlots"] = max(totalSlots, 0) - - self.queueSlots[queue]["AvailableSlotsCount"] += 1 - - if manyWaitingPilotsFlag and waitingJobs: - return 0 - return totalSlots + self.log.error("Failed to set pilot status", result["Message"]) + return result + return S_OK() - ##################################################################################### - def getExecutable(self, queue, proxy=None, jobExecDir="", envVariables=None, **kwargs): + def _getExecutable(self, queue, proxy=None, jobExecDir="", envVariables=None, **kwargs): """Prepare the full executable for queue :param str queue: queue name @@ -981,8 +563,6 @@ def getExecutable(self, queue, proxy=None, jobExecDir="", envVariables=None, **k ) return executable - ##################################################################################### - def _getPilotOptions(self, queue, **kwargs): """Prepare pilot options @@ -1055,8 +635,6 @@ def _getPilotOptions(self, queue, **kwargs): return pilotOptions - #################################################################################### - def _writePilotScript(self, workingDirectory, pilotOptions, proxy=None, pilotExecDir="", envVariables=None): """Bundle together and write out the pilot executable script, admix the proxy if given @@ -1085,113 +663,45 @@ def _writePilotScript(self, workingDirectory, pilotOptions, proxy=None, pilotExe return _writePilotWrapperFile(workingDirectory=workingDirectory, localPilot=localPilot) - def updatePilotStatus(self): - """Update status of pilots in transient and final states""" + ##################################################################################### - # Generate a proxy before feeding the threads to renew the ones of the CEs to perform actions - result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, self.pilotGroup, 23400) - if not result["OK"]: - return result - proxy = result["Value"] + def monitorPilotStatus(self): + """Update status of pilots in transient and final states""" + self.log.verbose("Monitoring: Queues treated are", ",".join(self.queueDict)) # Getting the status of pilots in a queue implies the use of remote CEs and may lead to network latency # Threads aim at overcoming such issues and thus 1 thread per queue is created to # update the status of pilots in transient states + errors = [] with ThreadPoolExecutor(max_workers=len(self.queueDict)) as executor: futures = [] for queue in self.queueDict: - futures.append(executor.submit(self._updatePilotStatusPerQueue, queue, proxy)) - for res in as_completed(futures): - err = res.exception() - if err: - self.log.exception("Update pilot status thread failed", lException=err) + futures.append(executor.submit(self._monitorPilotsPerQueue, queue)) - # The pilot can be in Done state set by the job agent check if the output is retrieved - for queue in self.queueDict: - ce = self.queueDict[queue]["CE"] - - if not ce.isProxyValid(120)["OK"]: - result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, self.pilotGroup, 1000) + for future in as_completed(futures): + result = future.result() if not result["OK"]: - return result - proxy = result["Value"] - ce.setProxy(proxy, 940) - - if callable(getattr(ce, "cleanupPilots", None)): - ce.cleanupPilots() + errors.append(result["Message"]) - ceName = self.queueDict[queue]["CEName"] - queueName = self.queueDict[queue]["QueueName"] - ceType = self.queueDict[queue]["CEType"] - siteName = self.queueDict[queue]["Site"] - result = self.pilotAgentsDB.selectPilots( - { - "DestinationSite": ceName, - "Queue": queueName, - "GridType": ceType, - "GridSite": siteName, - "OutputReady": "False", - "Status": PilotStatus.PILOT_FINAL_STATES, - } - ) - - if not result["OK"]: - self.log.error("Failed to select pilots", result["Message"]) - continue - pilotRefs = result["Value"] - if not pilotRefs: - continue - result = self.pilotAgentsDB.getPilotInfo(pilotRefs) - if not result["OK"]: - self.log.error("Failed to get pilots info from DB", result["Message"]) - continue - pilotDict = result["Value"] - if self.getOutput: - for pRef in pilotRefs: - self._getPilotOutput(pRef, pilotDict, ce, ceName) - - # Check if the accounting is to be sent - if self.sendAccounting: - result = self.pilotAgentsDB.selectPilots( - { - "DestinationSite": ceName, - "Queue": queueName, - "GridType": ceType, - "GridSite": siteName, - "AccountingSent": "False", - "Status": PilotStatus.PILOT_FINAL_STATES, - } - ) - - if not result["OK"]: - self.log.error("Failed to select pilots", result["Message"]) - continue - pilotRefs = result["Value"] - if not pilotRefs: - continue - result = self.pilotAgentsDB.getPilotInfo(pilotRefs) - if not result["OK"]: - self.log.error("Failed to get pilots info from DB", result["Message"]) - continue - pilotDict = result["Value"] - result = self.sendPilotAccounting(pilotDict) - if not result["OK"]: - self.log.error("Failed to send pilot agent accounting") + if errors: + self.log.error("The following errors occurred during the pilot monitoring operation", "\n".join(errors)) + return S_ERROR("Pilot monitoring: errors occurred") return S_OK() - def _updatePilotStatusPerQueue(self, queue, proxy): + def _monitorPilotsPerQueue(self, queueName): """Update status of pilots in transient state for a given queue :param queue: queue name :param proxy: proxy to check the pilot status and renewals """ - ce = self.queueDict[queue]["CE"] - ceName = self.queueDict[queue]["CEName"] - queueName = self.queueDict[queue]["QueueName"] - ceType = self.queueDict[queue]["CEType"] - siteName = self.queueDict[queue]["Site"] + ce = self.queueDict[queueName]["CE"] + ceName = self.queueDict[queueName]["CEName"] + queueName = self.queueDict[queueName]["QueueName"] + ceType = self.queueDict[queueName]["CEType"] + siteName = self.queueDict[queueName]["Site"] + # Select pilots in a transient states result = self.pilotAgentsDB.selectPilots( { "DestinationSite": ceName, @@ -1204,62 +714,82 @@ def _updatePilotStatusPerQueue(self, queue, proxy): ) if not result["OK"]: self.log.error("Failed to select pilots", f": {result['Message']}") - return + return result pilotRefs = result["Value"] if not pilotRefs: - return + return S_OK() + # Get their information result = self.pilotAgentsDB.getPilotInfo(pilotRefs) if not result["OK"]: self.log.error("Failed to get pilots info from DB", result["Message"]) - return + return result pilotDict = result["Value"] - stampedPilotRefs = [] - for pRef in pilotDict: - if pilotDict[pRef]["PilotStamp"]: - stampedPilotRefs.append(pRef + ":::" + pilotDict[pRef]["PilotStamp"]) - else: - stampedPilotRefs = list(pilotRefs) - break - - # This proxy is used for checking the pilot status and renewals + # The proxy is used for checking the pilot status and renewals # We really need at least a few hours otherwise the renewed # proxy may expire before we check again... - result = ce.isProxyValid(3 * 3600) + result = self._setCredentials(ce, 3 * 3600) if not result["OK"]: - ce.setProxy(proxy, 23300) - - # Get valid token if needed - if "Token" in ce.ceParameters.get("Tag", []): - result = self.__getPilotToken(audience=ce.audienceName) - if not result["OK"]: - self.log.error("Failed to get token", f"{ceName}: {result['Message']}") - return - ce.setToken(result["Value"], 3500) + self.log.error("Failed to set credentials:", result["Message"]) + return result - result = ce.getJobStatus(stampedPilotRefs) + # Get an update of the pilot by interrogating the CEs + result = ce.getJobStatus(pilotRefs) if not result["OK"]: self.log.error("Failed to get pilots status from CE", f"{ceName}: {result['Message']}") - return + return result pilotCEDict = result["Value"] - abortedPilots, getPilotOutput = self._updatePilotStatus(pilotRefs, pilotDict, pilotCEDict) - for pRef in getPilotOutput: - self._getPilotOutput(pRef, pilotDict, ce, ceName) - + # Update pilot status in DB + abortedPilots = self._updatePilotStatus(pilotRefs, pilotDict, pilotCEDict) # If something wrong in the queue, make a pause for the job submission if abortedPilots: - self.failedQueues[queue] += 1 + self.failedQueues[queueName] += 1 + + # FIXME: seems like it is only used by the CloudCE? Couldn't it be called from CloudCE.getJobStatus()? + if callable(getattr(ce, "cleanupPilots", None)): + ce.cleanupPilots() + + # Check if the accounting is to be sent + if self.sendAccounting: + result = self.pilotAgentsDB.selectPilots( + { + "DestinationSite": ceName, + "Queue": queueName, + "GridType": ceType, + "GridSite": siteName, + "AccountingSent": "False", + "Status": PilotStatus.PILOT_FINAL_STATES, + } + ) + if not result["OK"]: + self.log.error("Failed to select pilots", result["Message"]) + return result + + pilotRefs = result["Value"] + if not pilotRefs: + return S_OK() + + result = self.pilotAgentsDB.getPilotInfo(pilotRefs) + if not result["OK"]: + self.log.error("Failed to get pilots info from DB", result["Message"]) + return result + + pilotDict = result["Value"] + result = self._sendPilotAccounting(pilotDict) + if not result["OK"]: + self.log.error("Failed to send pilot agent accounting") + return result + + return S_OK() def _updatePilotStatus(self, pilotRefs, pilotDict, pilotCEDict): """Really updates the pilots status :return: number of aborted pilots, flag for getting the pilot output """ - abortedPilots = 0 - getPilotOutput = [] for pRef in pilotRefs: newStatus = "" @@ -1291,47 +821,73 @@ def _updatePilotStatus(self, pilotRefs, pilotDict, pilotCEDict): self.log.error(result["Message"]) if newStatus == "Aborted": abortedPilots += 1 - # Set the flag to retrieve the pilot output now or not - if newStatus in PilotStatus.PILOT_FINAL_STATES: - if pilotDict[pRef]["OutputReady"].lower() == "false" and self.getOutput: - getPilotOutput.append(pRef) - return abortedPilots, getPilotOutput + return abortedPilots - def _getPilotOutput(self, pRef, pilotDict, ce, ceName): - """Retrieves the pilot output for a pilot and stores it in the pilotAgentsDB""" - self.log.info(f"Retrieving output for pilot {pRef}") - output = None - error = None + ##################################################################################### - pilotStamp = pilotDict[pRef]["PilotStamp"] - pRefStamp = pRef - if pilotStamp: - pRefStamp = pRef + ":::" + pilotStamp + def __getPilotToken(self, audience: str, scope: list[str] = None): + """Get the token corresponding to the pilot user identity - result = ce.getJobOutput(pRefStamp) - if not result["OK"]: - self.failedPilotOutput[pRefStamp] += 1 - self.log.error("Failed to get pilot output", f"{ceName}: {result['Message']}") - self.log.verbose(f"Retries left: {max(0, self.maxRetryGetPilotOutput - self.failedPilotOutput[pRefStamp])}") - - if (self.maxRetryGetPilotOutput - self.failedPilotOutput[pRefStamp]) <= 0: - output = "Output is no longer available" - error = "Error is no longer available" - self.failedPilotOutput.pop(pRefStamp) - else: - return - else: - output, error = result["Value"] + :param audience: Token audience, targeting a single CE + :param scope: list of permissions needed to interact with a CE + :return: S_OK/S_ERROR, Token object as Value + """ + if not audience: + return S_ERROR("Audience is not defined") + + if not scope: + scope = PILOT_SCOPES + + return gTokenManager.getToken(userGroup=self.pilotGroup, requiredTimeLeft=600, scope=scope, audience=audience) - if output: - result = self.pilotAgentsDB.storePilotOutput(pRef, output, error) + def _setCredentials(self, ce: ComputingElement, proxyMinimumRequiredValidity: int): + """ + + :param ce: ComputingElement instance + :param proxyMinimumRequiredValidity: number of seconds needed to perform an operation with the proxy + :param tokenMinimumRequiredValidity: number of seconds needed to perform an operation with the token + """ + getNewProxy = False + + # If the CE does not already embed a proxy, we need one + if not ce.proxy: + getNewProxy = True + + # If the CE embeds a proxy that is too short to perform a given operation, we need a new one + if ce.proxy: + result = ce.proxy.getRemainingSecs() if not result["OK"]: - self.log.error("Failed to store pilot output", result["Message"]) - else: - self.log.warn("Empty pilot output not stored to PilotDB") + return result + + if result["Value"] < proxyMinimumRequiredValidity: + getNewProxy = True + + # Generate a new proxy if needed + if getNewProxy: + self.log.verbose( + "Getting pilot proxy", f"for {self.pilotDN}/{self.pilotGroup} {proxyMinimumRequiredValidity} long" + ) + result = gProxyManager.getPilotProxyFromDIRACGroup( + self.pilotDN, self.pilotGroup, proxyMinimumRequiredValidity + ) + if not result["OK"]: + return result + ce.setProxy(result["Value"]) - def sendPilotAccounting(self, pilotDict): + # Get valid token if needed + if "Token" in ce.ceParameters.get("Tag", []): + result = self.__getPilotToken(audience=ce.audienceName) + if not result["OK"]: + self.log.error("Failed to get token", f"{ce.ceName}: {result['Message']}") + return + ce.setToken(result["Value"]) + + return S_OK() + + ##################################################################################### + + def _sendPilotAccounting(self, pilotDict): """Send pilot accounting record""" for pRef in pilotDict: self.log.verbose("Preparing accounting record", f"for pilot {pRef}") @@ -1382,7 +938,7 @@ def sendPilotAccounting(self, pilotDict): return S_OK() - def sendPilotSubmissionAccounting(self, siteName, ceName, queueName, numTotal, numSucceeded, status): + def _sendPilotSubmissionAccounting(self, siteName, ceName, queueName, numTotal, numSucceeded, status): """Send pilot submission accounting record :param str siteName: Site name @@ -1399,11 +955,7 @@ def sendPilotSubmissionAccounting(self, siteName, ceName, queueName, numTotal, n pA.setStartTime(datetime.datetime.utcnow()) pA.setEndTime(datetime.datetime.utcnow()) pA.setValueByKey("HostName", DIRAC.siteName()) - if hasattr(self, "_AgentModule__moduleProperties"): - pA.setValueByKey("SiteDirector", self.am_getModuleParam("agentName")) - else: # In case it is not executed as agent - pA.setValueByKey("SiteDirector", "Client") - + pA.setValueByKey("SiteDirector", self.am_getModuleParam("agentName")) pA.setValueByKey("Site", siteName) pA.setValueByKey("CE", ceName) pA.setValueByKey("Queue", ceName + ":" + queueName) @@ -1423,7 +975,7 @@ def sendPilotSubmissionAccounting(self, siteName, ceName, queueName, numTotal, n return result return S_OK() - def sendPilotSubmissionMonitoring(self, siteName, ceName, queueName, numTotal, numSucceeded, status): + def _sendPilotSubmissionMonitoring(self, siteName, ceName, queueName, numTotal, numSucceeded, status): """Sends pilot submission records to monitoring :param str siteName: Site name @@ -1438,14 +990,9 @@ def sendPilotSubmissionMonitoring(self, siteName, ceName, queueName, numTotal, n pilotMonitoringReporter = MonitoringReporter(monitoringType="PilotSubmissionMonitoring") - if hasattr(self, "_AgentModule__moduleProperties"): - siteDirName = self.am_getModuleParam("agentName") - else: # In case it is not executed as agent - siteDirName = "Client" - pilotMonitoringData = { "HostName": DIRAC.siteName(), - "SiteDirector": siteDirName, + "SiteDirector": self.am_getModuleParam("agentName"), "Site": siteName, "CE": ceName, "Queue": ceName + ":" + queueName, diff --git a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py index 657114f6f19..c5d7860ebc8 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py @@ -5,7 +5,7 @@ Available methods are: - addPilotTQReference() + addPilotReference() setPilotStatus() deletePilot() clearPilots() @@ -38,11 +38,9 @@ def __init__(self, parentLogger=None): self.lock = threading.Lock() ########################################################################################## - def addPilotTQReference(self, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC", pilotStampDict={}): - """Add a new pilot job reference""" - - err = "PilotAgentsDB.addPilotTQReference: Failed to retrieve a new Id." + def addPilotReference(self, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={}): + """Add a new pilot job reference""" for ref in pilotRef: stamp = "" if ref in pilotStampDict: @@ -50,9 +48,9 @@ def addPilotTQReference(self, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC req = ( "INSERT INTO PilotAgents " - + "(PilotJobReference, TaskQueueID, OwnerGroup, GridType, SubmissionTime, LastUpdateTime, Status, PilotStamp) " + + "(PilotJobReference, OwnerGroup, GridType, SubmissionTime, LastUpdateTime, Status, PilotStamp) " + "VALUES ('%s',%d,'%s','%s',UTC_TIMESTAMP(),UTC_TIMESTAMP(),'Submitted','%s')" - % (ref, int(taskQueueID), ownerGroup, gridType, stamp) + % (ref, ownerGroup, gridType, stamp) ) result = self._update(req) @@ -60,7 +58,7 @@ def addPilotTQReference(self, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC return result if "lastRowId" not in result: - return S_ERROR(f"{err}") + return S_ERROR("PilotAgentsDB.addPilotReference: Failed to retrieve a new Id.") return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py index a8676d0fa6b..422d361c658 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py @@ -70,23 +70,14 @@ def export_getCurrentPilotCounters(cls, attrDict={}): return S_OK(resultDict) ########################################################################################## - types_addPilotTQReference = [list, int, str, str] + types_addPilotReference = [list, str] - @deprecated("Use addPilotTQRef") @classmethod - def export_addPilotTQReference( - cls, pilotRef, taskQueueID, ownerDN, ownerGroup, broker="Unknown", gridType="DIRAC", pilotStampDict={} + def export_addPilotReference( + cls, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={} ): """Add a new pilot job reference""" - - return cls.pilotAgentsDB.addPilotTQReference(pilotRef, taskQueueID, ownerGroup, gridType, pilotStampDict) - - types_addPilotTQRef = [list, int, str] - - @classmethod - def export_addPilotTQRef(cls, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC", pilotStampDict={}): - """Add a new pilot job reference""" - return cls.pilotAgentsDB.addPilotTQReference(pilotRef, taskQueueID, ownerGroup, gridType, pilotStampDict) + return cls.pilotAgentsDB.addPilotReference(pilotRef, ownerGroup, gridType, pilotStampDict) ############################################################################## types_getPilotOutput = [str] diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py b/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py index 7fa2f10c3d4..e508fd0aaca 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py @@ -11,9 +11,7 @@ from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory -def getQueuesResolved( - siteDict, queueCECache, gridEnv=None, setup=None, workingDir="", checkPlatform=False, instantiateCEs=False -): +def getQueuesResolved(siteDict, queueCECache, checkPlatform=False, instantiateCEs=False): """Get the list of relevant CEs (what is in siteDict) and their descriptions. The main goal of this method is to return a dictionary of queues """ @@ -24,7 +22,6 @@ def getQueuesResolved( for ce in siteDict[site]: ceDict = siteDict[site][ce] pilotRunDirectory = ceDict.get("PilotRunDirectory", "") - # ceMaxRAM = ceDict.get('MaxRAM', None) qDict = ceDict.pop("Queues") for queue in qDict: queueName = f"{ce}_{queue}" @@ -33,8 +30,6 @@ def getQueuesResolved( queueDict[queueName]["ParametersDict"]["Queue"] = queue queueDict[queueName]["ParametersDict"]["GridCE"] = ce queueDict[queueName]["ParametersDict"]["Site"] = site - queueDict[queueName]["ParametersDict"]["GridEnv"] = gridEnv - queueDict[queueName]["ParametersDict"]["Setup"] = setup # Evaluate the CPU limit of the queue according to the Glue convention computeQueueCPULimit(queueDict[queueName]["ParametersDict"]) @@ -80,7 +75,6 @@ def getQueuesResolved( queueDict[queueName]["CEType"] = ceDict["CEType"] queueDict[queueName]["Site"] = site queueDict[queueName]["QueueName"] = queue - queueDict[queueName]["QueryCEFlag"] = ceDict.get("QueryCEFlag", "false") if checkPlatform: setPlatform(ceDict, queueDict[queueName]["ParametersDict"]) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_QueueUtilities.py b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_QueueUtilities.py index 006db96af3b..ad7a89693c3 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_QueueUtilities.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_QueueUtilities.py @@ -22,7 +22,6 @@ "CE1_Queue1": { "CEName": "CE1", "CEType": "Type1", - "QueryCEFlag": False, "QueueName": "Queue1", "Site": "Site1", "ParametersDict": { @@ -39,7 +38,6 @@ "CE1_Queue2": { "CEName": "CE1", "CEType": "Type1", - "QueryCEFlag": False, "QueueName": "Queue2", "Site": "Site1", "ParametersDict": { @@ -56,7 +54,6 @@ "CE2_Queue1": { "CEName": "CE2", "CEType": "Type2", - "QueryCEFlag": False, "QueueName": "Queue1", "Site": "Site1", "ParametersDict": { @@ -73,7 +70,6 @@ "CE3_Queue1": { "CEName": "CE3", "CEType": "Type2", - "QueryCEFlag": False, "QueueName": "Queue1", "Site": "Site2", "ParametersDict": { diff --git a/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py b/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py index 91a21ac548a..a9f1c43874f 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py @@ -36,9 +36,8 @@ def preparePilots(stateCount, testSite, testCE, testGroup): for i in range(nPilots): pilotRef.append("pilotRef_" + str(i)) - res = paDB.addPilotTQReference( + res = paDB.addPilotReference( pilotRef, - 123, testGroup, ) assert res["OK"] is True, res["Message"] @@ -80,7 +79,7 @@ def cleanUpPilots(pilotRef): def test_basic(): """usual insert/verify""" - res = paDB.addPilotTQReference(["pilotRef"], 123, "ownerGroup") + res = paDB.addPilotReference(["pilotRef"], "ownerGroup") assert res["OK"] is True res = paDB.deletePilot("pilotRef")