Skip to content

Commit

Permalink
feat: introduce submissionPolicy
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Nov 16, 2023
1 parent 4950a54 commit abc7ebc
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 2 deletions.
93 changes: 92 additions & 1 deletion src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import datetime
import os
import socket
from abc import ABC, abstractmethod
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

Expand Down Expand Up @@ -45,6 +46,11 @@

MAX_PILOTS_TO_SUBMIT = 100

# Submission policies
AGGRESSIVE_FILLING = "AgressingFilling"
WAITING_SUPPORTED_JOBS = "WaitingSupportedJobs"
SUBMISSION_POLICIES = [AGGRESSIVE_FILLING, WAITING_SUPPORTED_JOBS]


class SiteDirector(AgentModule):
"""SiteDirector class provides an implementation of a DIRAC agent.
Expand Down Expand Up @@ -78,7 +84,10 @@ def __init__(self, *args, **kwargs):

# self.failedQueueCycleFactor is the number of cycles a queue has to wait before getting pilots again
self.failedQueueCycleFactor = 10
self.submissionPolicyName = AGGRESSIVE_FILLING
self.submissionPolicy = None

self.workingDirectory = None
self.maxQueueLength = 86400 * 3
self.pilotLogLevel = "INFO"

Expand Down Expand Up @@ -125,6 +134,13 @@ def beginExecution(self):
self.maxPilotsToSubmit = self.am_getOption("MaxPilotsToSubmit", self.maxPilotsToSubmit)
self.failedQueueCycleFactor = self.am_getOption("FailedQueueCycleFactor", self.failedQueueCycleFactor)

# Load submission policy
self.submissionPolicyName = self.am_getOption("SubmissionPolicy", self.submissionPolicyName)
result = self._loadSubmissionPolicy()
if not result:
return result
self.submissionPolicy = result["Value"]()

# Flags
self.sendAccounting = self.am_getOption("SendPilotAccounting", self.sendAccounting)
self.rssFlag = self.rssClient.rssFlag
Expand Down Expand Up @@ -176,6 +192,17 @@ def beginExecution(self):

return S_OK()

def _loadSubmissionPolicy(self):
"""Load a submission policy"""
objectLoader = ObjectLoader()
result = objectLoader.loadObject(
f"WorkloadManagementSystem.Agent.SiteDirector.{self.submissionPolicyName}", self.submissionPolicyName
)
if not result["OK"]:
self.log.error(f"Failed to load submission policy: {result['Message']}")
return result
return S_OK(result["Value"])

def _buildQueueDict(self, siteNames, ces, ceTypes, tags):
"""Build the queueDict dictionary containing information about the queues that will be provisioned"""
# Get details about the resources
Expand All @@ -186,8 +213,8 @@ def _buildQueueDict(self, siteNames, ces, ceTypes, tags):
# Set up the queue dictionary
result = getQueuesResolved(
siteDict=result["Value"],
vo=self.vo,
queueCECache=self.queueCECache,
workingDir=self.workingDirectory,
instantiateCEs=True,
)
if not result["OK"]:
Expand Down Expand Up @@ -311,6 +338,9 @@ def _submitPilotsPerQueue(self, queueName):
return S_ERROR(f"{queueName}: No slot available")
self.log.info(f"{queueName}: to submit={totalSlots}")

# Apply the submission policy
totalSlots = self.submissionPolicy.apply(totalSlots)

# Limit the number of pilots to submit to self.maxPilotsToSubmit
pilotsToSubmit = min(self.maxPilotsToSubmit, totalSlots)

Expand Down Expand Up @@ -1021,3 +1051,64 @@ def _sendPilotSubmissionMonitoring(self, siteName, ceName, queueName, numTotal,
return S_ERROR()
self.log.verbose("Done committing to monitoring")
return S_OK()


class SubmissionPolicy(ABC):
"""Abstract class to define a submission strategy."""

@abstractmethod
def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
"""Method to redefine in the concrete subclasses
:param availableSlots: slots available for new pilots
:param queueName: the name of the targeted queue
:param queueInfo: a dictionary of attributes related to the queue
:param vo: VO
"""
pass


class AgressiveFillingPolicy(SubmissionPolicy):
def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
"""All the available slots should be filled up.
Should be employed for sites that are always processing jobs.
* Pros: would quickly fill up a queue
* Cons: would consume a lot of CPU hours for nothing if pilots do not match jobs
"""
return availableSlots


class WaitingSupportedJobsPolicy(SubmissionPolicy):
def __init__(self) -> None:
super().__init__()
self.matcherClient = MatcherClient()

def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int:
"""Fill up available slots only if waiting supported jobs exist.
Should be employed for sites that are rarely used (targetting specific Task Queues).
* Pros: submit pilots only if necessary, and quickly fill up the queue if needed
* Cons: would create some unused pilots in all the sites supervised by this policy and targeting a same task queue
"""
# Prepare CE dictionary from the queue info
ce = queueInfo["CE"]
ceDict = ce.ceParameters
ceDict["GridCE"] = queueInfo["CEName"]
if vo:
ceDict["Community"] = vo

# Get Task Queues related to the CE
result = self.matcherClient.getMatchingTaskQueues(ceDict)
if not result["OK"]:
self.log.error("Could not retrieve TaskQueues from TaskQueueDB", result["Message"])
return 0
taskQueueDict = result["Value"]

# Get the number of jobs that would match the capability of the CE
waitingSupportedJobs = 0
for tq in taskQueueDict.values():
waitingSupportedJobs += tq["Jobs"]

# Return the minimum value between the number of slots available and supported jobs
return min(availableSlots, waitingSupportedJobs)
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory


def getQueuesResolved(siteDict, queueCECache, checkPlatform=False, instantiateCEs=False):
def getQueuesResolved(siteDict, vo, queueCECache, checkPlatform=False, instantiateCEs=False):
"""Get the list of relevant CEs (what is in siteDict) and their descriptions.
The main goal of this method is to return a dictionary of queues
"""
Expand Down

0 comments on commit abc7ebc

Please sign in to comment.