Skip to content

Commit

Permalink
feat: refactor taskqueuedb for diracx
Browse files Browse the repository at this point in the history
  • Loading branch information
simon-mazenoux committed Sep 22, 2023
1 parent d011057 commit f33df48
Showing 1 changed file with 54 additions and 46 deletions.
100 changes: 54 additions & 46 deletions src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -1159,57 +1159,13 @@ def __setPrioritiesForEntity(self, user, userGroup, share, connObj=False, consol
tqDict = dict(result["Value"])
if not tqDict:
return S_OK()
# Calculate Sum of priorities
totalPrio = 0
for k in tqDict:
if tqDict[k] > 0.1 or not allowBgTQs:
totalPrio += tqDict[k]
# Update prio for each TQ
for tqId in tqDict:
if tqDict[tqId] > 0.1 or not allowBgTQs:
prio = (share / totalPrio) * tqDict[tqId]
else:
prio = TQ_MIN_SHARE
prio = max(prio, TQ_MIN_SHARE)
tqDict[tqId] = prio

# Generate groups of TQs that will have the same prio=sum(prios) maomenos
result = self.retrieveTaskQueues(list(tqDict))
if not result["OK"]:
return result
allTQsData = result["Value"]
tqGroups = {}
for tqid in allTQsData:
tqData = allTQsData[tqid]
for field in ("Jobs", "Priority") + priorityIgnoredFields:
if field in tqData:
tqData.pop(field)
tqHash = []
for f in sorted(tqData):
tqHash.append(f"{f}:{tqData[f]}")
tqHash = "|".join(tqHash)
if tqHash not in tqGroups:
tqGroups[tqHash] = []
tqGroups[tqHash].append(tqid)
tqGroups = [tqGroups[td] for td in tqGroups]

# Do the grouping
for tqGroup in tqGroups:
totalPrio = 0
if len(tqGroup) < 2:
continue
for tqid in tqGroup:
totalPrio += tqDict[tqid]
for tqid in tqGroup:
tqDict[tqid] = totalPrio

# Group by priorities
prioDict = {}
for tqId in tqDict:
prio = tqDict[tqId]
if prio not in prioDict:
prioDict[prio] = []
prioDict[prio].append(tqId)

prioDict = calculate_priority(tqDict, allTQsData, share, allowBgTQs)

# Execute updates
for prio, tqs in prioDict.items():
Expand All @@ -1232,3 +1188,55 @@ def getGroupShares():
for group in groups:
shares[group] = gConfig.getValue(f"/Registry/Groups/{group}/JobShare", DEFAULT_GROUP_SHARE)
return shares


def calculate_priority(tqDict, allTQsData, share, allowBgTQs):
# Calculate Sum of priorities
totalPrio = 0
for k in tqDict:
if tqDict[k] > 0.1 or not allowBgTQs:
totalPrio += tqDict[k]
# Update prio for each TQ
for tqId in tqDict:
if tqDict[tqId] > 0.1 or not allowBgTQs:
prio = (share / totalPrio) * tqDict[tqId]
else:
prio = TQ_MIN_SHARE
prio = max(prio, TQ_MIN_SHARE)
tqDict[tqId] = prio

# Generate groups of TQs that will have the same prio=sum(prios) maomenos
tqGroups = {}
for tqid in allTQsData:
tqData = allTQsData[tqid]
for field in ("Jobs", "Priority") + priorityIgnoredFields:
if field in tqData:
tqData.pop(field)
tqHash = []
for f in sorted(tqData):
tqHash.append(f"{f}:{tqData[f]}")
tqHash = "|".join(tqHash)
if tqHash not in tqGroups:
tqGroups[tqHash] = []
tqGroups[tqHash].append(tqid)
tqGroups = [tqGroups[td] for td in tqGroups]

# Do the grouping
for tqGroup in tqGroups:
totalPrio = 0
if len(tqGroup) < 2:
continue
for tqid in tqGroup:
totalPrio += tqDict[tqid]
for tqid in tqGroup:
tqDict[tqid] = totalPrio

# Group by priorities
prioDict = {}
for tqId in tqDict:
prio = tqDict[tqId]
if prio not in prioDict:
prioDict[prio] = []
prioDict[prio].append(tqId)

return prioDict

0 comments on commit f33df48

Please sign in to comment.