[9.0] Simplify and speed up the SiteDirector #7110

Merged (5 commits) on Jan 12, 2024
@@ -162,6 +162,21 @@ you should carefully read the RFC 18, and what follows.

Pilot commands can be extended. A custom list of commands can be added by starting the pilot with the -X option.

Pilots started when controlled by the SiteDirector
==================================================

The :py:mod:`~DIRAC.WorkloadManagementSystem.Agent.SiteDirector` is a central component in DIRAC,
responsible for managing and optimizing the submission of pilot jobs to various computing resources. It features:

- *Parallel Submission*: Submits pilot jobs in parallel across different Computing Elements (CEs) to increase throughput.
- :py:mod:`~DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy`: Uses one of several submission policies to optimize pilot-job distribution:

  - *AggressiveFilling*: Fills available slots regardless of waiting jobs; suited to continuously busy sites.
  - *WaitingSupportedJobs* (default): Fills slots based on the number of waiting jobs; suited to intermittently busy sites.

- *Monitoring and Accounting*: Monitors and accounts for pilot jobs in parallel, for efficient tracking and management.
- *Pilot Wrapping*: Creates pilot wrappers that allow pilot scripts to run in diverse environments, including Grid, cloud, and virtualized resources.
- *Resource Status Handling*: Integrates with the Resource Status System so that pilots are submitted only to operational and enabled resources.

The SiteDirector is controlled through parameters set in the DIRAC configuration; see :py:mod:`~DIRAC.WorkloadManagementSystem.Agent.SiteDirector` for details.
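
To make the difference between the two submission policies concrete, here is a minimal sketch. It is not the DIRAC ``SubmissionPolicy`` API: the function names and signatures are hypothetical, and it assumes that "based on the number of waiting jobs" means capping the number of pilots at the number of waiting jobs.

```python
def aggressive_filling(available_slots: int, waiting_jobs: int) -> int:
    """Hypothetical policy: fill every available slot, ignoring waiting jobs."""
    return max(available_slots, 0)


def waiting_supported_jobs(available_slots: int, waiting_jobs: int) -> int:
    """Hypothetical policy: never submit more pilots than there are waiting jobs.

    Assumption: the cap is simply the smaller of free slots and waiting jobs.
    """
    return max(min(available_slots, waiting_jobs), 0)


if __name__ == "__main__":
    # 10 free slots but only 3 waiting jobs.
    print(aggressive_filling(10, 3))
    print(waiting_supported_jobs(10, 3))
```

Under these assumptions the aggressive policy keeps a site saturated with pilots, while the default policy avoids submitting pilots that would find no matching job.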

Pilots started when not controlled by the SiteDirector
======================================================
@@ -90,9 +90,6 @@ The following columns are provided:
**Benchmark**
Estimation of the power of the Worker Node CPU which is running the Pilot Job. If 0, the estimation was not possible.

**TaskQueueID**
Internal DIRAC WMS identifier of the Task Queue for which the Pilot Job is sent.

**PilotID**
Internal DIRAC WMS Pilot Job identifier

26 changes: 17 additions & 9 deletions src/DIRAC/ConfigurationSystem/Client/Helpers/Resources.py
@@ -252,10 +252,7 @@ def getQueues(siteList=None, ceList=None, ceTypeList=None, community=None, tags=
for site in sites:
if siteList and site not in siteList:
continue
if community:
comList = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/VO", [])
if comList and community.lower() not in [cl.lower() for cl in comList]:
continue

siteCEParameters = {}
result = gConfig.getOptionsDict(f"/Resources/Sites/{grid}/{site}/CEs")
if result["OK"]:
@@ -272,10 +269,7 @@ def getQueues(siteList=None, ceList=None, ceTypeList=None, community=None, tags=
continue
if ceList and ce not in ceList:
continue
if community:
comList = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/CEs/{ce}/VO", [])
if comList and community.lower() not in [cl.lower() for cl in comList]:
continue

ceOptionsDict = dict(siteCEParameters)
result = gConfig.getOptionsDict(f"/Resources/Sites/{grid}/{site}/CEs/{ce}")
if not result["OK"]:
@@ -287,9 +281,23 @@ def getQueues(siteList=None, ceList=None, ceTypeList=None, community=None, tags=
queues = result["Value"]
for queue in queues:
if community:
comList = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/CEs/{ce}/Queues/{queue}/VO", [])
# Community can be defined on site, CE or queue level
paths = [
f"/Resources/Sites/{grid}/{site}/CEs/{ce}/Queues/{queue}/VO",
f"/Resources/Sites/{grid}/{site}/CEs/{ce}/VO",
f"/Resources/Sites/{grid}/{site}/VO",
]

# Try each path in order, stopping when we find a non-empty list
for path in paths:
comList = gConfig.getValue(path, [])
if comList:
break

# If we found a list and the community is not in it, skip this iteration
if comList and community.lower() not in [cl.lower() for cl in comList]:
continue

if tags:
queueTags = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/CEs/{ce}/Queues/{queue}/Tag", [])
queueTags = set(ceTags + queueTags)
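
The hunk above replaces three independent VO checks (site, CE, queue) with a single lookup in which the most specific non-empty definition takes precedence. The following standalone sketch illustrates that lookup. A plain dictionary stands in for `gConfig`, and `resolve_communities` and `queue_allowed` are hypothetical helper names that do not exist in DIRAC.

```python
def resolve_communities(config: dict, grid: str, site: str, ce: str, queue: str) -> list:
    """Return the first non-empty VO list, searching queue, then CE, then site."""
    paths = [
        f"/Resources/Sites/{grid}/{site}/CEs/{ce}/Queues/{queue}/VO",
        f"/Resources/Sites/{grid}/{site}/CEs/{ce}/VO",
        f"/Resources/Sites/{grid}/{site}/VO",
    ]
    for path in paths:
        communities = config.get(path, [])  # stand-in for gConfig.getValue(path, [])
        if communities:
            return communities
    return []


def queue_allowed(community: str, communities: list) -> bool:
    """A queue is usable if no VO list is defined or the community is listed (case-insensitive)."""
    return not communities or community.lower() in [c.lower() for c in communities]


if __name__ == "__main__":
    config = {
        "/Resources/Sites/LCG/LCG.Example.org/VO": ["voA", "voB"],
        "/Resources/Sites/LCG/LCG.Example.org/CEs/ce1/Queues/long/VO": ["voB"],
    }
    # The queue-level definition overrides the site-level one for the "long" queue.
    vos = resolve_communities(config, "LCG", "LCG.Example.org", "ce1", "long")
    print(vos, queue_allowed("voA", vos))
```

One consequence worth noting: a queue-level VO list now overrides a site-level one, whereas the removed code skipped the whole site or CE as soon as its own list excluded the community.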
@@ -16,8 +16,7 @@
'PilotJobReference': 'https://marlb.in2p3.fr:9000/biq6KT45Q',
'PilotStamp': '',
'Status': 'Done',
'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52),
'TaskQueueID': 399L}}
'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52)}}
"""
from DIRAC.Core.Base.Script import Script

@@ -15,8 +15,7 @@
'PilotJobReference': 'https://marlb.in2p3.fr:9000/2KHFrQjkw',
'PilotStamp': '',
'Status': 'Done',
'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52),
'TaskQueueID': 399L}}
'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52)}}
"""
from DIRAC.Core.Base.Script import Script

3 changes: 1 addition & 2 deletions src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py
@@ -17,15 +17,14 @@ def __init__(self):

super().__init__()

self.keyFields = ["TaskQueueID", "GridSite", "GridType", "Status"]
self.keyFields = ["GridSite", "GridType", "Status"]

self.monitoringFields = ["NumOfPilots"]

self.index = "pilotshistory_index"

self.addMapping(
{
"TaskQueueID": {"type": "keyword"},
"GridSite": {"type": "keyword"},
"GridType": {"type": "keyword"},
"Status": {"type": "keyword"},
7 changes: 5 additions & 2 deletions src/DIRAC/Resources/Computing/AREXComputingElement.py
@@ -92,7 +92,6 @@ def setToken(self, token):
"""Set the token and update the headers

:param token: OAuth2Token object or dictionary containing token structure
:param int valid: validity period in seconds
"""
super().setToken(token)
self.headers["Authorization"] = "Bearer " + self.token["access_token"]
@@ -619,7 +618,11 @@ def getCEStatus(self):
self.log.error("Failed getting the status of the CE.", result["Message"])
return S_ERROR("Failed getting the status of the CE")
response = result["Value"]
ceData = response.json()
try:
ceData = response.json()
except requests.JSONDecodeError:
self.log.exception("Failed decoding the status of the CE")
return S_ERROR(f"Failed decoding the status of the CE")

# Look only in the relevant section out of the headache
queueInfo = ceData["Domains"]["AdminDomain"]["Services"]["ComputingService"]["ComputingShare"]
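
The guard added in this hunk keeps a non-JSON reply (for example an HTML error page) from raising inside `getCEStatus`. The same pattern can be sketched with only the standard library. Here `s_ok` and `s_error` are simplified stand-ins for DIRAC's `S_OK`/`S_ERROR` return structures, `decode_ce_status` is a hypothetical helper, and `json.JSONDecodeError` is used in place of `requests.JSONDecodeError` so the example runs without third-party packages.

```python
import json


def s_ok(value=None) -> dict:
    """Simplified stand-in for DIRAC's S_OK."""
    return {"OK": True, "Value": value}


def s_error(message: str) -> dict:
    """Simplified stand-in for DIRAC's S_ERROR."""
    return {"OK": False, "Message": message}


def decode_ce_status(body: str) -> dict:
    """Decode a CE status reply, returning an error structure instead of raising."""
    try:
        ce_data = json.loads(body)
    except json.JSONDecodeError:
        return s_error("Failed decoding the status of the CE")
    return s_ok(ce_data)


if __name__ == "__main__":
    print(decode_ce_status("<html>Service unavailable</html>"))
    print(decode_ce_status('{"Domains": {}}'))
```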
8 changes: 2 additions & 6 deletions src/DIRAC/Resources/Computing/CloudComputingElement.py
@@ -355,7 +355,7 @@ def _getMetadata(self, executableFile, pilotStamp=""):
template = yaml.safe_load(template_fd)
for filedef in template["write_files"]:
if filedef["content"] == "PROXY_STR":
filedef["content"] = self.proxy
filedef["content"] = self.proxy.dumpAllToString()["Value"]
elif filedef["content"] == "EXECUTABLE_STR":
filedef["content"] = exe_str
elif "STAMP_STR" in filedef["content"]:
@@ -398,11 +398,7 @@ def _renewCloudProxy(self):
if not res["OK"]:
self.log.error("Could not download proxy", res["Message"])
return False
resdump = res["Value"].dumpAllToString()
if not resdump["OK"]:
self.log.error("Failed to dump proxy to string", resdump["Message"])
return False
self.proxy = resdump["Value"]
self.proxy = res["Value"]
self.valid = datetime.datetime.utcnow() + proxyLifetime * datetime.timedelta(seconds=1)
return True

90 changes: 25 additions & 65 deletions src/DIRAC/Resources/Computing/ComputingElement.py
@@ -82,7 +82,6 @@ def __init__(self, ceName):
self.minProxyTime = gConfig.getValue("/Registry/MinProxyLifeTime", 10800) # secs
self.defaultProxyTime = gConfig.getValue("/Registry/DefaultProxyLifeTime", 43200) # secs
self.proxyCheckPeriod = gConfig.getValue("/Registry/ProxyCheckingPeriod", 3600) # secs
self.valid = None

self.batchSystem = None
self.taskResults = {}
@@ -97,10 +96,9 @@ def __init__(self, ceName):
self.initializeParameters()
self.log.debug("CE parameters", self.ceParameters)

def setProxy(self, proxy, valid=0):
def setProxy(self, proxy):
"""Set proxy for this instance"""
self.proxy = proxy
self.valid = datetime.datetime.utcnow() + second * valid

def setToken(self, token):
self.token = token
@@ -116,21 +114,6 @@ def _prepareProxy(self):
self.log.debug(f"Set proxy variable X509_USER_PROXY to {os.environ['X509_USER_PROXY']}")
return S_OK()

def isProxyValid(self, valid=1000):
"""Check if the stored proxy is valid"""
if not self.valid:
result = S_ERROR("Proxy is not valid for the requested length")
result["Value"] = 0
return result
delta = self.valid - datetime.datetime.utcnow()
totalSeconds = delta.days * 86400 + delta.seconds
if totalSeconds > valid:
return S_OK(totalSeconds - valid)

result = S_ERROR("Proxy is not valid for the requested length")
result["Value"] = totalSeconds - valid
return result

def initializeParameters(self):
"""Initialize the CE parameters after they are collected from various sources"""

@@ -258,72 +241,49 @@ def setCPUTimeLeft(self, cpuTimeLeft=None):
return S_ERROR("Wrong type for setCPUTimeLeft argument")

#############################################################################
def available(self, jobIDList=None):
def available(self):
"""This method returns the number of available slots in the target CE. The CE
instance polls for waiting and running jobs and compares to the limits
in the CE parameters.

:param list jobIDList: list of already existing job IDs to be checked against
"""
result = self.getCEStatus()
if not result["OK"]:
return result

# If there are no already registered jobs
if jobIDList is not None and not jobIDList:
result = S_OK()
result["RunningJobs"] = 0
result["WaitingJobs"] = 0
result["SubmittedJobs"] = 0
else:
result = self.getCEStatus()
if not result["OK"]:
return result
runningJobs = result["RunningJobs"]
waitingJobs = result["WaitingJobs"]
submittedJobs = result["SubmittedJobs"]
availableProcessors = result.get("AvailableProcessors")
ceInfoDict = dict(result)

maxTotalJobs = int(self.ceParameters.get("MaxTotalJobs", 0))
ceInfoDict["MaxTotalJobs"] = maxTotalJobs
waitingToRunningRatio = float(self.ceParameters.get("WaitingToRunningRatio", 0.0))
# if there are no Running job we can submit to get at most 'MaxWaitingJobs'
# if there are Running jobs we can increase this to get a ratio W / R 'WaitingToRunningRatio'
maxWaitingJobs = int(max(int(self.ceParameters.get("MaxWaitingJobs", 0)), runningJobs * waitingToRunningRatio))
maxWaitingJobs = int(self.ceParameters.get("MaxWaitingJobs", 0))
ceInfoDict["MaxWaitingJobs"] = maxWaitingJobs

self.log.verbose("Max Number of Jobs:", maxTotalJobs)
self.log.verbose("Max W/R Ratio:", waitingToRunningRatio)
self.log.verbose("Max Waiting Jobs:", maxWaitingJobs)

# Determine how many more jobs can be submitted
message = f"{self.ceName} CE: SubmittedJobs={submittedJobs}"
message += f", WaitingJobs={waitingJobs}, RunningJobs={runningJobs}"
result["CEInfoDict"] = ceInfoDict
# If we reached the maximum number of total jobs, then the CE is not available
totalJobs = runningJobs + waitingJobs

message += f", MaxTotalJobs={maxTotalJobs}"

if totalJobs >= maxTotalJobs:
self.log.verbose("Max Number of Jobs reached:", maxTotalJobs)
self.log.verbose("Max Number of Jobs reached:", f"{totalJobs} >= {maxTotalJobs}")
result["Value"] = 0
message = "There are {} waiting jobs and total jobs {} >= {} max total jobs".format(
waitingJobs,
totalJobs,
maxTotalJobs,
)
else:
additionalJobs = 0
if waitingJobs < maxWaitingJobs:
additionalJobs = maxWaitingJobs - waitingJobs
if totalJobs + additionalJobs >= maxTotalJobs:
additionalJobs = maxTotalJobs - totalJobs
# For SSH CE case
if int(self.ceParameters.get("MaxWaitingJobs", 0)) == 0:
additionalJobs = maxTotalJobs - runningJobs

if availableProcessors is not None:
additionalJobs = min(additionalJobs, availableProcessors)
result["Value"] = additionalJobs

result["Message"] = message
result["CEInfoDict"] = ceInfoDict
return result

# If we reached the maximum number of waiting jobs, then the CE is not available
if waitingJobs >= maxWaitingJobs:
self.log.verbose("Max Number of waiting jobs reached:", f"{waitingJobs} >= {maxWaitingJobs}")
result["Value"] = 0
return result

additionalJobs = maxWaitingJobs - waitingJobs
if totalJobs + additionalJobs >= maxTotalJobs:
additionalJobs = maxTotalJobs - totalJobs

if availableProcessors is not None:
additionalJobs = min(additionalJobs, availableProcessors)
result["Value"] = additionalJobs
return result

#############################################################################
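
Because the old and new lines of `available()` are interleaved in this hunk, the resulting slot computation is easier to follow as a pure function. The sketch below restates only the added lines; `additional_jobs` is a hypothetical name, and the real method also reads the counts from `getCEStatus()` and fills in `CEInfoDict`.

```python
from typing import Optional


def additional_jobs(
    running: int,
    waiting: int,
    max_total: int,
    max_waiting: int,
    available_processors: Optional[int] = None,
) -> int:
    """Number of extra jobs a CE can accept, following the simplified logic."""
    total = running + waiting

    # The CE is full once the total-job limit is reached.
    if total >= max_total:
        return 0

    # The CE is also unavailable once the waiting-job limit is reached.
    if waiting >= max_waiting:
        return 0

    # Top up the waiting queue without exceeding the total-job limit.
    additional = max_waiting - waiting
    if total + additional >= max_total:
        additional = max_total - total

    # Respect the number of free processors when the CE reports it.
    if available_processors is not None:
        additional = min(additional, available_processors)
    return additional


if __name__ == "__main__":
    print(additional_jobs(running=5, waiting=2, max_total=100, max_waiting=10))
    print(additional_jobs(running=95, waiting=2, max_total=100, max_waiting=10))
```

Compared with the removed code, this version no longer scales the waiting limit by `WaitingToRunningRatio` and has no SSH special case for `MaxWaitingJobs == 0`: with a waiting limit of 0 the function returns 0.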
5 changes: 2 additions & 3 deletions src/DIRAC/Resources/Computing/SSHComputingElement.py
@@ -307,15 +307,14 @@ def __init__(self, ceUniqueID):
self.errorTemplate = ""

############################################################################
def setProxy(self, proxy, valid=0):
def setProxy(self, proxy):
"""
Set and prepare proxy to use

:param str proxy: proxy to use
:param int valid: proxy validity period
:return: S_OK/S_ERROR
"""
ComputingElement.setProxy(self, proxy, valid)
ComputingElement.setProxy(self, proxy)
if self.ceParameters.get("SSHType", "ssh") == "gsissh":
result = self._prepareProxy()
if not result["OK"]:
2 changes: 0 additions & 2 deletions src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py
@@ -324,8 +324,6 @@ def _buildQueueDict(self, siteNames, ces, ceTypes):
result = getQueuesResolved(
siteDict=result["Value"],
queueCECache=self.queueCECache,
gridEnv=getGridEnv(),
setup=gConfig.getValue("/DIRAC/Setup", "unknown"),
instantiateCEs=True,
)
if not result["OK"]: