Skip to content

Commit

Permalink
feat: move submission policies to WMS Utilities
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Sep 19, 2023
1 parent 41f49db commit b48b759
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 79 deletions.
5 changes: 2 additions & 3 deletions src/DIRAC/Resources/Computing/AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,12 @@ def _reset(self):

#############################################################################

def setToken(self, token, valid):
def setToken(self, token):
"""Set the token and update the headers
:param token: OAuth2Token object or dictionary containing token structure
:param int valid: validity period in seconds
"""
super().setToken(token, valid)
super().setToken(token)
self.headers["Authorization"] = "Bearer " + self.token["access_token"]

def _arcToDiracID(self, arcJobID):
Expand Down
5 changes: 2 additions & 3 deletions src/DIRAC/Resources/Computing/SSHComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,14 @@ def __init__(self, ceUniqueID):
self.errorTemplate = ""

############################################################################
def setProxy(self, proxy, valid=0):
def setProxy(self, proxy):
"""
Set and prepare proxy to use
:param str proxy: proxy to use
:param int valid: proxy validity period
:return: S_OK/S_ERROR
"""
ComputingElement.setProxy(self, proxy, valid)
ComputingElement.setProxy(self, proxy)
if self.ceParameters.get("SSHType", "ssh") == "gsissh":
result = self._prepareProxy()
if not result["OK"]:
Expand Down
2 changes: 0 additions & 2 deletions src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,6 @@ def _buildQueueDict(self, siteNames, ces, ceTypes):
result = getQueuesResolved(
siteDict=result["Value"],
queueCECache=self.queueCECache,
gridEnv=getGridEnv(),
setup=gConfig.getValue("/DIRAC/Setup", "unknown"),
instantiateCEs=True,
)
if not result["OK"]:
Expand Down
72 changes: 3 additions & 69 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import datetime
import os
import socket
from abc import ABC, abstractmethod
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

Expand All @@ -33,7 +32,6 @@
from DIRAC.Resources.Computing.ComputingElement import ComputingElement
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
from DIRAC.WorkloadManagementSystem.Client.PilotScopes import PILOT_SCOPES
from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient
from DIRAC.WorkloadManagementSystem.Client.ServerUtils import getPilotAgentsDB
from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials
from DIRAC.WorkloadManagementSystem.Utilities.PilotWrapper import (
Expand All @@ -46,7 +44,7 @@
MAX_PILOTS_TO_SUBMIT = 100

# Submission policies
AGGRESSIVE_FILLING = "AgressingFilling"
AGGRESSIVE_FILLING = "AggressiveFilling"
WAITING_SUPPORTED_JOBS = "WaitingSupportedJobs"
SUBMISSION_POLICIES = [AGGRESSIVE_FILLING, WAITING_SUPPORTED_JOBS]

Expand Down Expand Up @@ -77,15 +75,14 @@ def __init__(self, *args, **kwargs):
self.sendSubmissionMonitoring = False
self.localhost = socket.getfqdn()

self.matcherClient = None
self.siteClient = None
self.rssClient = None
self.pilotAgentsDB = None
self.rssFlag = None

# self.failedQueueCycleFactor is the number of cycles a queue has to wait before getting pilots again
self.failedQueueCycleFactor = 10
self.submissionPolicyName = AGGRESSIVE_FILLING
self.submissionPolicyName = WAITING_SUPPORTED_JOBS
self.submissionPolicy = None

self.workingDirectory = None
Expand All @@ -107,7 +104,6 @@ def initialize(self):
# Get the clients
self.siteClient = SiteStatus()
self.rssClient = ResourceStatus()
self.matcherClient = MatcherClient()
self.pilotAgentsDB = getPilotAgentsDB()

return S_OK()
Expand Down Expand Up @@ -217,7 +213,6 @@ def _buildQueueDict(self, siteNames, ces, ceTypes, tags):
# Set up the queue dictionary
result = getQueuesResolved(
siteDict=result["Value"],
vo=self.vo,
queueCECache=self.queueCECache,
instantiateCEs=True,
)
Expand Down Expand Up @@ -695,7 +690,7 @@ def _writePilotScript(self, workingDirectory, pilotOptions, proxy=None, pilotExe

#####################################################################################

def monitorPilotStatus(self):
def monitorPilots(self):
"""Update status of pilots in transient and final states"""
self.log.verbose("Monitoring: Queues treated are", ",".join(self.queueDict))

Expand Down Expand Up @@ -1040,64 +1035,3 @@ def _sendPilotSubmissionMonitoring(self, siteName, ceName, queueName, numTotal,
return S_ERROR()
self.log.verbose("Done committing to monitoring")
return S_OK()


class SubmissionPolicy(ABC):
    """Base interface that every submission strategy has to implement."""

    @abstractmethod
    def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
        """Strategy entry point: concrete subclasses must override it.

        :param availableSlots: slots available for new pilots
        :param queueName: the name of the targeted queue
        :param queueInfo: a dictionary of attributes related to the queue
        :param vo: VO
        :return: an integer computed by the concrete policy
            (the visible subclasses return at most ``availableSlots``)
        """
        ...


class AgressiveFillingPolicy(SubmissionPolicy):
    """Submission strategy that uses every slot reported as available."""

    def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
        """Fill up all the available slots.

        Meant for sites that are always processing jobs.

        * Pros: would quickly fill up a queue
        * Cons: would consume a lot of CPU hours for nothing if pilots do not match jobs

        :param availableSlots: slots available for new pilots
        :param queueName: the name of the targeted queue (not used by this policy)
        :param queueInfo: a dictionary of attributes related to the queue (not used by this policy)
        :param vo: VO (not used by this policy)
        :return: ``availableSlots``, unchanged
        """
        # Nothing to compute: the policy is simply "take everything".
        return availableSlots


class WaitingSupportedJobsPolicy(SubmissionPolicy):
    """Submission strategy bounded by the number of waiting jobs that the CE could run."""

    def __init__(self) -> None:
        super().__init__()
        # Client used to ask which Task Queues match a given CE description
        self.matcherClient = MatcherClient()

    def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
        """Fill up available slots only if waiting supported jobs exist.

        Should be employed for sites that are rarely used (targeting specific Task Queues).

        * Pros: submit pilots only if necessary, and quickly fill up the queue if needed
        * Cons: would create some unused pilots in all the sites supervised by this policy
          and targeting a same task queue

        :param availableSlots: slots available for new pilots
        :param queueName: the name of the targeted queue (not used by this policy)
        :param queueInfo: queue attributes; the "CE" entry (an object exposing ``ceParameters``)
            and the "CEName" entry are read
        :param vo: VO; when non-empty it is passed to the matcher as "Community"
        :return: the smaller of ``availableSlots`` and the summed "Jobs" counts of the matching
            Task Queues, or 0 when the matcher call fails
        """
        # Prepare CE dictionary from the queue info
        # NOTE(review): ceParameters is updated in place (not copied), as in the original code
        ce = queueInfo["CE"]
        ceDict = ce.ceParameters
        ceDict["GridCE"] = queueInfo["CEName"]
        if vo:
            ceDict["Community"] = vo

        # Get Task Queues related to the CE
        result = self.matcherClient.getMatchingTaskQueues(ceDict)
        if not result["OK"]:
            # The policy classes define no ``log`` attribute, so the former
            # ``self.log.error(...)`` call would raise AttributeError here instead of
            # falling back to "no supported jobs". Return 0 directly.
            # NOTE(review): result["Message"] is no longer reported; consider surfacing it
            # through the caller's logger.
            return 0
        taskQueueDict = result["Value"]

        # Get the number of jobs that would match the capability of the CE
        waitingSupportedJobs = sum(tq["Jobs"] for tq in taskQueueDict.values())

        # Return the minimum value between the number of slots available and supported jobs
        return min(availableSlots, waitingSupportedJobs)
1 change: 0 additions & 1 deletion src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ CREATE TABLE `PilotAgents` (
`PilotID` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
`InitialJobID` INT(11) UNSIGNED NOT NULL DEFAULT 0,
`CurrentJobID` INT(11) UNSIGNED NOT NULL DEFAULT 0,
`TaskQueueID` INT(11) UNSIGNED NOT NULL DEFAULT 0,
`PilotJobReference` VARCHAR(255) NOT NULL DEFAULT 'Unknown',
`PilotStamp` VARCHAR(32) NOT NULL DEFAULT '',
`DestinationSite` VARCHAR(128) NOT NULL DEFAULT 'NotAssigned',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory


def getQueuesResolved(siteDict, vo, queueCECache, checkPlatform=False, instantiateCEs=False):
def getQueuesResolved(siteDict, queueCECache, checkPlatform=False, instantiateCEs=False):
"""Get the list of relevant CEs (what is in siteDict) and their descriptions.
The main goal of this method is to return a dictionary of queues
"""
Expand Down
63 changes: 63 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from abc import ABC, abstractmethod

from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient


class SubmissionPolicy(ABC):
    """Abstract base describing the contract of a submission strategy."""

    @abstractmethod
    def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
        """Hook to be redefined by each concrete policy.

        :param availableSlots: slots available for new pilots
        :param queueName: the name of the targeted queue
        :param queueInfo: a dictionary of attributes related to the queue
        :param vo: VO
        :return: an integer computed by the concrete policy
            (the visible subclasses return at most ``availableSlots``)
        """
        ...


class AgressiveFillingPolicy(SubmissionPolicy):
    """Policy that requests as many slots as are available."""

    def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
        """Use every available slot.

        Should be employed for sites that are always processing jobs.

        * Pros: would quickly fill up a queue
        * Cons: would consume a lot of CPU hours for nothing if pilots do not match jobs

        :param availableSlots: slots available for new pilots
        :param queueName: the name of the targeted queue (ignored here)
        :param queueInfo: a dictionary of attributes related to the queue (ignored here)
        :param vo: VO (ignored here)
        :return: the value of ``availableSlots`` as received
        """
        # The queue description and VO play no role in this policy.
        return availableSlots


class WaitingSupportedJobsPolicy(SubmissionPolicy):
    """Policy that caps the slots to fill at the number of waiting jobs matching the CE."""

    def __init__(self) -> None:
        super().__init__()
        # Client queried in apply() for the Task Queues matching a CE description
        self.matcherClient = MatcherClient()

    def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
        """Fill up available slots only if waiting supported jobs exist.

        Should be employed for sites that are used from time to time (targeting specific Task Queues).

        * Pros: submit pilots only if necessary, and quickly fill up the queue if needed
        * Cons: would create some unused pilots in all the sites supervised by this policy
          and targeting a same task queue

        :param availableSlots: slots available for new pilots
        :param queueName: the name of the targeted queue (not read by this policy)
        :param queueInfo: queue attributes; the "CE" entry (an object exposing ``ceParameters``)
            and the "CEName" entry are read
        :param vo: VO; forwarded to the matcher as "Community" when non-empty
        :return: ``min(availableSlots, total "Jobs" of the matching Task Queues)``,
            or 0 if the matcher call does not succeed
        """
        # Build the CE description handed to the matcher.
        # NOTE(review): this is the CE's own ceParameters dict, updated in place (no copy) --
        # confirm that leaving "GridCE"/"Community" in it is intended.
        computingElement = queueInfo["CE"]
        matchDescription = computingElement.ceParameters
        matchDescription["GridCE"] = queueInfo["CEName"]
        if vo:
            matchDescription["Community"] = vo

        # Ask the matcher which Task Queues this CE could serve
        reply = self.matcherClient.getMatchingTaskQueues(matchDescription)
        if not reply["OK"]:
            # A failed lookup is treated as "no supported job is waiting"
            return 0
        matchingTaskQueues = reply["Value"]

        # Total number of waiting jobs over all the matching Task Queues
        supportedJobs = sum(taskQueue["Jobs"] for taskQueue in matchingTaskQueues.values())

        # Never ask for more than the available slots, nor more than the supported jobs
        return min(availableSlots, supportedJobs)

0 comments on commit b48b759

Please sign in to comment.