Skip to content

Commit

Permalink
feat: change the SiteDirector logic
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Nov 16, 2023
1 parent 2a94452 commit 4950a54
Show file tree
Hide file tree
Showing 7 changed files with 409 additions and 905 deletions.
92 changes: 26 additions & 66 deletions src/DIRAC/Resources/Computing/ComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ def __init__(self, ceName):
self.minProxyTime = gConfig.getValue("/Registry/MinProxyLifeTime", 10800) # secs
self.defaultProxyTime = gConfig.getValue("/Registry/DefaultProxyLifeTime", 43200) # secs
self.proxyCheckPeriod = gConfig.getValue("/Registry/ProxyCheckingPeriod", 3600) # secs
self.valid = None

self.batchSystem = None
self.taskResults = {}
Expand All @@ -97,14 +96,12 @@ def __init__(self, ceName):
self.initializeParameters()
self.log.debug("CE parameters", self.ceParameters)

def setProxy(self, proxy, valid=0):
def setProxy(self, proxy):
"""Set proxy for this instance"""
self.proxy = proxy
self.valid = datetime.datetime.utcnow() + second * valid

def setToken(self, token, valid=0):
def setToken(self, token):
self.token = token
self.valid = datetime.datetime.utcnow() + second * valid

def _prepareProxy(self):
"""Set the environment variable X509_USER_PROXY"""
Expand All @@ -117,21 +114,6 @@ def _prepareProxy(self):
self.log.debug(f"Set proxy variable X509_USER_PROXY to {os.environ['X509_USER_PROXY']}")
return S_OK()

def isProxyValid(self, valid=1000):
"""Check if the stored proxy is valid"""
if not self.valid:
result = S_ERROR("Proxy is not valid for the requested length")
result["Value"] = 0
return result
delta = self.valid - datetime.datetime.utcnow()
totalSeconds = delta.days * 86400 + delta.seconds
if totalSeconds > valid:
return S_OK(totalSeconds - valid)

result = S_ERROR("Proxy is not valid for the requested length")
result["Value"] = totalSeconds - valid
return result

def initializeParameters(self):
"""Initialize the CE parameters after they are collected from various sources"""

Expand Down Expand Up @@ -259,71 +241,49 @@ def setCPUTimeLeft(self, cpuTimeLeft=None):
return S_ERROR("Wrong type for setCPUTimeLeft argument")

#############################################################################
def available(self, jobIDList=None):
def available(self):
"""This method returns the number of available slots in the target CE. The CE
instance polls for waiting and running jobs and compares to the limits
in the CE parameters.
:param list jobIDList: list of already existing job IDs to be checked against
"""
result = self.getCEStatus()
if not result["OK"]:
return result

# If there are no already registered jobs
if jobIDList is not None and not jobIDList:
result = S_OK()
result["RunningJobs"] = 0
result["WaitingJobs"] = 0
result["SubmittedJobs"] = 0
else:
result = self.getCEStatus()
if not result["OK"]:
return result
runningJobs = result["RunningJobs"]
waitingJobs = result["WaitingJobs"]
submittedJobs = result["SubmittedJobs"]
availableProcessors = result.get("AvailableProcessors")
ceInfoDict = dict(result)

maxTotalJobs = int(self.ceParameters.get("MaxTotalJobs", 0))
ceInfoDict["MaxTotalJobs"] = maxTotalJobs
waitingToRunningRatio = float(self.ceParameters.get("WaitingToRunningRatio", 0.0))
# if there are no Running job we can submit to get at most 'MaxWaitingJobs'
# if there are Running jobs we can increase this to get a ratio W / R 'WaitingToRunningRatio'
maxWaitingJobs = int(max(int(self.ceParameters.get("MaxWaitingJobs", 0)), runningJobs * waitingToRunningRatio))
maxWaitingJobs = int(self.ceParameters.get("MaxWaitingJobs", 0))
ceInfoDict["MaxWaitingJobs"] = maxWaitingJobs

self.log.verbose("Max Number of Jobs:", maxTotalJobs)
self.log.verbose("Max W/R Ratio:", waitingToRunningRatio)
self.log.verbose("Max Waiting Jobs:", maxWaitingJobs)

# Determine how many more jobs can be submitted
message = f"{self.ceName} CE: SubmittedJobs={submittedJobs}"
message += f", WaitingJobs={waitingJobs}, RunningJobs={runningJobs}"
# If we reached the maximum number of total jobs, then the CE is not available
totalJobs = runningJobs + waitingJobs

message += f", MaxTotalJobs={maxTotalJobs}"

if totalJobs >= maxTotalJobs:
self.log.verbose("Max Number of Jobs reached:", maxTotalJobs)
self.log.verbose("Max Number of Jobs reached:", f"{totalJobs} >= {maxTotalJobs}")
result["Value"] = 0
message = "There are {} waiting jobs and total jobs {} >= {} max total jobs".format(
waitingJobs,
totalJobs,
maxTotalJobs,
)
else:
additionalJobs = 0
if waitingJobs < maxWaitingJobs:
additionalJobs = maxWaitingJobs - waitingJobs
if totalJobs + additionalJobs >= maxTotalJobs:
additionalJobs = maxTotalJobs - totalJobs
# For SSH CE case
if int(self.ceParameters.get("MaxWaitingJobs", 0)) == 0:
additionalJobs = maxTotalJobs - runningJobs

if availableProcessors is not None:
additionalJobs = min(additionalJobs, availableProcessors)
result["Value"] = additionalJobs

result["Message"] = message
return result

# If we reached the maximum number of waiting jobs, then the CE is not available
if waitingJobs >= maxWaitingJobs:
self.log.verbose("Max Number of waiting jobs reached:", f"{waitingJobs} >= {maxWaitingJobs}")
result["Value"] = 0
return result

additionalJobs = maxWaitingJobs - waitingJobs
if totalJobs + additionalJobs >= maxTotalJobs:
additionalJobs = maxTotalJobs - totalJobs

if availableProcessors is not None:
additionalJobs = min(additionalJobs, availableProcessors)
result["Value"] = additionalJobs

result["CEInfoDict"] = ceInfoDict
return result

Expand Down
Loading

0 comments on commit 4950a54

Please sign in to comment.