diff --git a/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py b/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py index dd5ea79be74..eab2a53d76f 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py @@ -1159,57 +1159,13 @@ def __setPrioritiesForEntity(self, user, userGroup, share, connObj=False, consol tqDict = dict(result["Value"]) if not tqDict: return S_OK() - # Calculate Sum of priorities - totalPrio = 0 - for k in tqDict: - if tqDict[k] > 0.1 or not allowBgTQs: - totalPrio += tqDict[k] - # Update prio for each TQ - for tqId in tqDict: - if tqDict[tqId] > 0.1 or not allowBgTQs: - prio = (share / totalPrio) * tqDict[tqId] - else: - prio = TQ_MIN_SHARE - prio = max(prio, TQ_MIN_SHARE) - tqDict[tqId] = prio - # Generate groups of TQs that will have the same prio=sum(prios) maomenos result = self.retrieveTaskQueues(list(tqDict)) if not result["OK"]: return result allTQsData = result["Value"] - tqGroups = {} - for tqid in allTQsData: - tqData = allTQsData[tqid] - for field in ("Jobs", "Priority") + priorityIgnoredFields: - if field in tqData: - tqData.pop(field) - tqHash = [] - for f in sorted(tqData): - tqHash.append(f"{f}:{tqData[f]}") - tqHash = "|".join(tqHash) - if tqHash not in tqGroups: - tqGroups[tqHash] = [] - tqGroups[tqHash].append(tqid) - tqGroups = [tqGroups[td] for td in tqGroups] - - # Do the grouping - for tqGroup in tqGroups: - totalPrio = 0 - if len(tqGroup) < 2: - continue - for tqid in tqGroup: - totalPrio += tqDict[tqid] - for tqid in tqGroup: - tqDict[tqid] = totalPrio - - # Group by priorities - prioDict = {} - for tqId in tqDict: - prio = tqDict[tqId] - if prio not in prioDict: - prioDict[prio] = [] - prioDict[prio].append(tqId) + + prioDict = calculate_priority(tqDict, allTQsData, share, allowBgTQs) # Execute updates for prio, tqs in prioDict.items(): @@ -1232,3 +1188,55 @@ def getGroupShares(): for group in groups: shares[group] = gConfig.getValue(f"/Registry/Groups/{group}/JobShare", DEFAULT_GROUP_SHARE) return shares + + +def calculate_priority(tqDict, allTQsData, share, allowBgTQs): + # Calculate Sum of priorities + totalPrio = 0 + for k in tqDict: + if tqDict[k] > 0.1 or not allowBgTQs: + totalPrio += tqDict[k] + # Update prio for each TQ + for tqId in tqDict: + if tqDict[tqId] > 0.1 or not allowBgTQs: + prio = (share / totalPrio) * tqDict[tqId] + else: + prio = TQ_MIN_SHARE + prio = max(prio, TQ_MIN_SHARE) + tqDict[tqId] = prio + + # Generate groups of TQs that will have the same prio=sum(prios) maomenos + tqGroups = {} + for tqid in allTQsData: + tqData = allTQsData[tqid] + for field in ("Jobs", "Priority") + priorityIgnoredFields: + if field in tqData: + tqData.pop(field) + tqHash = [] + for f in sorted(tqData): + tqHash.append(f"{f}:{tqData[f]}") + tqHash = "|".join(tqHash) + if tqHash not in tqGroups: + tqGroups[tqHash] = [] + tqGroups[tqHash].append(tqid) + tqGroups = [tqGroups[td] for td in tqGroups] + + # Do the grouping + for tqGroup in tqGroups: + totalPrio = 0 + if len(tqGroup) < 2: + continue + for tqid in tqGroup: + totalPrio += tqDict[tqid] + for tqid in tqGroup: + tqDict[tqid] = totalPrio + + # Group by priorities + prioDict = {} + for tqId in tqDict: + prio = tqDict[tqId] + if prio not in prioDict: + prioDict[prio] = [] + prioDict[prio].append(tqId) + + return prioDict