Skip to content

Commit

Permalink
feat: add more SD tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aldbr committed Nov 16, 2023
1 parent 755d42b commit 78d2301
Show file tree
Hide file tree
Showing 22 changed files with 538 additions and 471 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ The following columns are provided:
**Benchmark**
Estimation of the power of the Worker Node CPU which is running the Pilot Job. If 0, the estimation was not possible.

**TaskQueueID**
Internal DIRAC WMS identifier of the Task Queue for which the Pilot Job is sent.

**PilotID**
Internal DIRAC WMS Pilot Job identifier

Expand Down
26 changes: 17 additions & 9 deletions src/DIRAC/ConfigurationSystem/Client/Helpers/Resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,7 @@ def getQueues(siteList=None, ceList=None, ceTypeList=None, community=None, tags=
for site in sites:
if siteList and site not in siteList:
continue
if community:
comList = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/VO", [])
if comList and community.lower() not in [cl.lower() for cl in comList]:
continue

siteCEParameters = {}
result = gConfig.getOptionsDict(f"/Resources/Sites/{grid}/{site}/CEs")
if result["OK"]:
Expand All @@ -275,10 +272,7 @@ def getQueues(siteList=None, ceList=None, ceTypeList=None, community=None, tags=
continue
if ceList and ce not in ceList:
continue
if community:
comList = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/CEs/{ce}/VO", [])
if comList and community.lower() not in [cl.lower() for cl in comList]:
continue

ceOptionsDict = dict(siteCEParameters)
result = gConfig.getOptionsDict(f"/Resources/Sites/{grid}/{site}/CEs/{ce}")
if not result["OK"]:
Expand All @@ -290,9 +284,23 @@ def getQueues(siteList=None, ceList=None, ceTypeList=None, community=None, tags=
queues = result["Value"]
for queue in queues:
if community:
comList = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/CEs/{ce}/Queues/{queue}/VO", [])
# Community can be defined on site, CE or queue level
paths = [
f"/Resources/Sites/{grid}/{site}/CEs/{ce}/Queues/{queue}/VO",
f"/Resources/Sites/{grid}/{site}/CEs/{ce}/VO",
f"/Resources/Sites/{grid}/{site}/VO",
]

# Try each path in order, stopping when we find a non-empty list
for path in paths:
comList = gConfig.getValue(path, [])
if comList:
break

# If we found a list and the community is not in it, skip this iteration
if comList and community.lower() not in [cl.lower() for cl in comList]:
continue

if tags:
queueTags = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/CEs/{ce}/Queues/{queue}/Tag", [])
queueTags = set(ceTags + queueTags)
Expand Down
3 changes: 1 addition & 2 deletions src/DIRAC/Interfaces/scripts/dirac_admin_get_job_pilots.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
'PilotJobReference': 'https://marlb.in2p3.fr:9000/biq6KT45Q',
'PilotStamp': '',
'Status': 'Done',
'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52),
'TaskQueueID': 399L}}
'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52)}}
"""
from DIRAC.Core.Base.Script import Script

Expand Down
3 changes: 1 addition & 2 deletions src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
'PilotJobReference': 'https://marlb.in2p3.fr:9000/2KHFrQjkw',
'PilotStamp': '',
'Status': 'Done',
'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52),
'TaskQueueID': 399L}}
'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52)}}
"""
from DIRAC.Core.Base.Script import Script

Expand Down
3 changes: 1 addition & 2 deletions src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@ def __init__(self):

super().__init__()

self.keyFields = ["TaskQueueID", "GridSite", "GridType", "Status"]
self.keyFields = ["GridSite", "GridType", "Status"]

self.monitoringFields = ["NumOfPilots"]

self.index = "pilotshistory_index"

self.addMapping(
{
"TaskQueueID": {"type": "keyword"},
"GridSite": {"type": "keyword"},
"GridType": {"type": "keyword"},
"Status": {"type": "keyword"},
Expand Down
6 changes: 5 additions & 1 deletion src/DIRAC/Resources/Computing/AREXComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,11 @@ def getCEStatus(self):
self.log.error("Failed getting the status of the CE.", result["Message"])
return S_ERROR("Failed getting the status of the CE")
response = result["Value"]
ceData = response.json()
try:
ceData = response.json()
except requests.JSONDecodeError:
self.log.exception("Failed decoding the status of the CE")
return S_ERROR(f"Failed decoding the status of the CE")

# Look only in the relevant section out of the headache
queueInfo = ceData["Domains"]["AdminDomain"]["Services"]["ComputingService"]["ComputingShare"]
Expand Down
8 changes: 2 additions & 6 deletions src/DIRAC/Resources/Computing/CloudComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ def _getMetadata(self, executableFile, pilotStamp=""):
template = yaml.safe_load(template_fd)
for filedef in template["write_files"]:
if filedef["content"] == "PROXY_STR":
filedef["content"] = self.proxy
filedef["content"] = self.proxy.dumpAllToString()["Value"]
elif filedef["content"] == "EXECUTABLE_STR":
filedef["content"] = exe_str
elif "STAMP_STR" in filedef["content"]:
Expand Down Expand Up @@ -398,11 +398,7 @@ def _renewCloudProxy(self):
if not res["OK"]:
self.log.error("Could not download proxy", res["Message"])
return False
resdump = res["Value"].dumpAllToString()
if not resdump["OK"]:
self.log.error("Failed to dump proxy to string", resdump["Message"])
return False
self.proxy = resdump["Value"]
self.proxy = res["Value"]
self.valid = datetime.datetime.utcnow() + proxyLifetime * datetime.timedelta(seconds=1)
return True

Expand Down
3 changes: 1 addition & 2 deletions src/DIRAC/Resources/Computing/ComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ def available(self):
self.log.verbose("Max Number of Jobs:", maxTotalJobs)
self.log.verbose("Max Waiting Jobs:", maxWaitingJobs)

result["CEInfoDict"] = ceInfoDict
# If we reached the maximum number of total jobs, then the CE is not available
totalJobs = runningJobs + waitingJobs
if totalJobs >= maxTotalJobs:
Expand All @@ -283,8 +284,6 @@ def available(self):
if availableProcessors is not None:
additionalJobs = min(additionalJobs, availableProcessors)
result["Value"] = additionalJobs

result["CEInfoDict"] = ceInfoDict
return result

#############################################################################
Expand Down
Loading

0 comments on commit 78d2301

Please sign in to comment.