Skip to content

Commit

Permalink
feat: move submission policies to WMS Utilities
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Nov 16, 2023
1 parent abc7ebc commit 755d42b
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 84 deletions.
5 changes: 2 additions & 3 deletions src/DIRAC/Resources/Computing/AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,12 @@ def _reset(self):

#############################################################################

def setToken(self, token, valid):
def setToken(self, token):
"""Set the token and update the headers
:param token: OAuth2Token object or dictionary containing token structure
:param int valid: validity period in seconds
"""
super().setToken(token, valid)
super().setToken(token)
self.headers["Authorization"] = "Bearer " + self.token["access_token"]

def _arcIDToJobReference(self, arcJobID):
Expand Down
5 changes: 2 additions & 3 deletions src/DIRAC/Resources/Computing/SSHComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,14 @@ def __init__(self, ceUniqueID):
self.errorTemplate = ""

############################################################################
def setProxy(self, proxy, valid=0):
def setProxy(self, proxy):
"""
Set and prepare proxy to use
:param str proxy: proxy to use
:param int valid: proxy validity period
:return: S_OK/S_ERROR
"""
ComputingElement.setProxy(self, proxy, valid)
ComputingElement.setProxy(self, proxy)
if self.ceParameters.get("SSHType", "ssh") == "gsissh":
result = self._prepareProxy()
if not result["OK"]:
Expand Down
2 changes: 0 additions & 2 deletions src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,6 @@ def _buildQueueDict(self, siteNames, ces, ceTypes):
result = getQueuesResolved(
siteDict=result["Value"],
queueCECache=self.queueCECache,
gridEnv=getGridEnv(),
setup=gConfig.getValue("/DIRAC/Setup", "unknown"),
instantiateCEs=True,
)
if not result["OK"]:
Expand Down
74 changes: 4 additions & 70 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import datetime
import os
import socket
from abc import ABC, abstractmethod
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

Expand Down Expand Up @@ -47,7 +46,7 @@
MAX_PILOTS_TO_SUBMIT = 100

# Submission policies
AGGRESSIVE_FILLING = "AgressingFilling"
AGGRESSIVE_FILLING = "AggressiveFilling"
WAITING_SUPPORTED_JOBS = "WaitingSupportedJobs"
SUBMISSION_POLICIES = [AGGRESSIVE_FILLING, WAITING_SUPPORTED_JOBS]

Expand Down Expand Up @@ -75,7 +74,6 @@ def __init__(self, *args, **kwargs):
self.sendAccounting = True
self.sendSubmissionAccounting = True
self.sendSubmissionMonitoring = False
self.localhost = socket.getfqdn()

self.siteClient = None
self.rssClient = None
Expand All @@ -84,12 +82,11 @@ def __init__(self, *args, **kwargs):

# self.failedQueueCycleFactor is the number of cycles a queue has to wait before getting pilots again
self.failedQueueCycleFactor = 10
self.submissionPolicyName = AGGRESSIVE_FILLING
self.submissionPolicyName = WAITING_SUPPORTED_JOBS
self.submissionPolicy = None

self.workingDirectory = None
self.maxQueueLength = 86400 * 3
self.pilotLogLevel = "INFO"

def initialize(self):
"""Initial settings"""
Expand Down Expand Up @@ -128,9 +125,8 @@ def beginExecution(self):
self.pilotDN, self.pilotGroup = result["Value"]

# Parameters
self.workingDirectory = self.am_getOption("WorkDirectory")
self.workingDirectory = self.am_getOption("WorkDirectory", self.workingDirectory)
self.maxQueueLength = self.am_getOption("MaxQueueLength", self.maxQueueLength)
self.pilotLogLevel = self.am_getOption("PilotLogLevel", self.pilotLogLevel)
self.maxPilotsToSubmit = self.am_getOption("MaxPilotsToSubmit", self.maxPilotsToSubmit)
self.failedQueueCycleFactor = self.am_getOption("FailedQueueCycleFactor", self.failedQueueCycleFactor)

Expand Down Expand Up @@ -213,7 +209,6 @@ def _buildQueueDict(self, siteNames, ces, ceTypes, tags):
# Set up the queue dictionary
result = getQueuesResolved(
siteDict=result["Value"],
vo=self.vo,
queueCECache=self.queueCECache,
instantiateCEs=True,
)
Expand Down Expand Up @@ -698,7 +693,7 @@ def _writePilotScript(self, workingDirectory, pilotOptions, proxy=None, pilotExe

#####################################################################################

def monitorPilotStatus(self):
def monitorPilots(self):
"""Update status of pilots in transient and final states"""
self.log.verbose("Monitoring: Queues treated are", ",".join(self.queueDict))

Expand Down Expand Up @@ -1051,64 +1046,3 @@ def _sendPilotSubmissionMonitoring(self, siteName, ceName, queueName, numTotal,
return S_ERROR()
self.log.verbose("Done committing to monitoring")
return S_OK()


class SubmissionPolicy(ABC):
"""Abstract class to define a submission strategy."""

@abstractmethod
def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
"""Method to redefine in the concrete subclasses
:param availableSlots: slots available for new pilots
:param queueName: the name of the targeted queue
:param queueInfo: a dictionary of attributes related to the queue
:param vo: VO
"""
pass


class AgressiveFillingPolicy(SubmissionPolicy):
def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
"""All the available slots should be filled up.
Should be employed for sites that are always processing jobs.
* Pros: would quickly fill up a queue
* Cons: would consume a lot of CPU hours for nothing if pilots do not match jobs
"""
return availableSlots


class WaitingSupportedJobsPolicy(SubmissionPolicy):
def __init__(self) -> None:
super().__init__()
self.matcherClient = MatcherClient()

def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
"""Fill up available slots only if waiting supported jobs exist.
Should be employed for sites that are rarely used (targetting specific Task Queues).
* Pros: submit pilots only if necessary, and quickly fill up the queue if needed
* Cons: would create some unused pilots in all the sites supervised by this policy and targeting a same task queue
"""
# Prepare CE dictionary from the queue info
ce = queueInfo["CE"]
ceDict = ce.ceParameters
ceDict["GridCE"] = queueInfo["CEName"]
if vo:
ceDict["Community"] = vo

# Get Task Queues related to the CE
result = self.matcherClient.getMatchingTaskQueues(ceDict)
if not result["OK"]:
self.log.error("Could not retrieve TaskQueues from TaskQueueDB", result["Message"])
return 0
taskQueueDict = result["Value"]

# Get the number of jobs that would match the capability of the CE
waitingSupportedJobs = 0
for tq in taskQueueDict.values():
waitingSupportedJobs += tq["Jobs"]

# Return the minimum value between the number of slots available and supported jobs
return min(availableSlots, waitingSupportedJobs)
2 changes: 1 addition & 1 deletion src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def addPilotReference(self, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDi
req = (
"INSERT INTO PilotAgents "
+ "(PilotJobReference, OwnerGroup, GridType, SubmissionTime, LastUpdateTime, Status, PilotStamp) "
+ "VALUES ('%s',%d,'%s','%s',UTC_TIMESTAMP(),UTC_TIMESTAMP(),'Submitted','%s')"
+ "VALUES ('%s','%s','%s',UTC_TIMESTAMP(),UTC_TIMESTAMP(),'Submitted','%s')"
% (ref, ownerGroup, gridType, stamp)
)

Expand Down
1 change: 0 additions & 1 deletion src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ CREATE TABLE `PilotAgents` (
`PilotID` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
`InitialJobID` INT(11) UNSIGNED NOT NULL DEFAULT 0,
`CurrentJobID` INT(11) UNSIGNED NOT NULL DEFAULT 0,
`TaskQueueID` INT(11) UNSIGNED NOT NULL DEFAULT 0,
`PilotJobReference` VARCHAR(255) NOT NULL DEFAULT 'Unknown',
`PilotStamp` VARCHAR(32) NOT NULL DEFAULT '',
`DestinationSite` VARCHAR(128) NOT NULL DEFAULT 'NotAssigned',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ def export_getCurrentPilotCounters(cls, attrDict={}):
types_addPilotReference = [list, str]

@classmethod
def export_addPilotReference(
cls, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={}
):
def export_addPilotReference(cls, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={}):
"""Add a new pilot job reference"""
return cls.pilotAgentsDB.addPilotReference(pilotRef, ownerGroup, gridType, pilotStampDict)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory


def getQueuesResolved(siteDict, vo, queueCECache, checkPlatform=False, instantiateCEs=False):
def getQueuesResolved(siteDict, queueCECache, checkPlatform=False, instantiateCEs=False):
"""Get the list of relevant CEs (what is in siteDict) and their descriptions.
The main goal of this method is to return a dictionary of queues
"""
Expand Down
63 changes: 63 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from abc import ABC, abstractmethod

from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient


class SubmissionPolicy(ABC):
"""Abstract class to define a submission strategy."""

@abstractmethod
def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
"""Method to redefine in the concrete subclasses
:param availableSlots: slots available for new pilots
:param queueName: the name of the targeted queue
:param queueInfo: a dictionary of attributes related to the queue
:param vo: VO
"""
pass


class AgressiveFillingPolicy(SubmissionPolicy):
def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
"""All the available slots should be filled up.
Should be employed for sites that are always processing jobs.
* Pros: would quickly fill up a queue
* Cons: would consume a lot of CPU hours for nothing if pilots do not match jobs
"""
return availableSlots


class WaitingSupportedJobsPolicy(SubmissionPolicy):
def __init__(self) -> None:
super().__init__()
self.matcherClient = MatcherClient()

def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
"""Fill up available slots only if waiting supported jobs exist.
Should be employed for sites that are used from time to time (targeting specific Task Queues).
* Pros: submit pilots only if necessary, and quickly fill up the queue if needed
* Cons: would create some unused pilots in all the sites supervised by this policy and targeting a same task queue
"""
# Prepare CE dictionary from the queue info
ce = queueInfo["CE"]
ceDict = ce.ceParameters
ceDict["GridCE"] = queueInfo["CEName"]
if vo:
ceDict["Community"] = vo

# Get Task Queues related to the CE
result = self.matcherClient.getMatchingTaskQueues(ceDict)
if not result["OK"]:
return 0
taskQueueDict = result["Value"]

# Get the number of jobs that would match the capability of the CE
waitingSupportedJobs = 0
for tq in taskQueueDict.values():
waitingSupportedJobs += tq["Jobs"]

# Return the minimum value between the number of slots available and supported jobs
return min(availableSlots, waitingSupportedJobs)

0 comments on commit 755d42b

Please sign in to comment.