diff --git a/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py b/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py index 3f437c8d241..45a3dcc6d0c 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/TaskQueueDB.py @@ -1,7 +1,9 @@ """ TaskQueueDB class is a front-end to the task queues db """ +from collections import defaultdict import random import string +from typing import Any from DIRAC import S_ERROR, S_OK, gConfig from DIRAC.ConfigurationSystem.Client.Helpers import Registry @@ -1162,57 +1164,13 @@ def __setPrioritiesForEntity(self, user, userGroup, share, connObj=False, consol tqDict = dict(result["Value"]) if not tqDict: return S_OK() - # Calculate Sum of priorities - totalPrio = 0 - for k in tqDict: - if tqDict[k] > 0.1 or not allowBgTQs: - totalPrio += tqDict[k] - # Update prio for each TQ - for tqId in tqDict: - if tqDict[tqId] > 0.1 or not allowBgTQs: - prio = (share / totalPrio) * tqDict[tqId] - else: - prio = TQ_MIN_SHARE - prio = max(prio, TQ_MIN_SHARE) - tqDict[tqId] = prio - # Generate groups of TQs that will have the same prio=sum(prios) maomenos result = self.retrieveTaskQueues(list(tqDict)) if not result["OK"]: return result allTQsData = result["Value"] - tqGroups = {} - for tqid in allTQsData: - tqData = allTQsData[tqid] - for field in ("Jobs", "Priority") + priorityIgnoredFields: - if field in tqData: - tqData.pop(field) - tqHash = [] - for f in sorted(tqData): - tqHash.append(f"{f}:{tqData[f]}") - tqHash = "|".join(tqHash) - if tqHash not in tqGroups: - tqGroups[tqHash] = [] - tqGroups[tqHash].append(tqid) - tqGroups = [tqGroups[td] for td in tqGroups] - - # Do the grouping - for tqGroup in tqGroups: - totalPrio = 0 - if len(tqGroup) < 2: - continue - for tqid in tqGroup: - totalPrio += tqDict[tqid] - for tqid in tqGroup: - tqDict[tqid] = totalPrio - - # Group by priorities - prioDict = {} - for tqId in tqDict: - prio = tqDict[tqId] - if prio not in prioDict: - prioDict[prio] = [] - prioDict[prio].append(tqId) + + prioDict = calculate_priority(tqDict, allTQsData, share, allowBgTQs) # Execute updates for prio, tqs in prioDict.items(): @@ -1235,3 +1193,61 @@ def getGroupShares(): for group in groups: shares[group] = gConfig.getValue(f"/Registry/Groups/{group}/JobShare", DEFAULT_GROUP_SHARE) return shares + + +def calculate_priority( + tq_dict: dict[int, float], all_tqs_data: dict[int, dict[str, Any]], share: float, allow_bg_tqs: bool +) -> dict[float, list[int]]: + """ + Calculate the priority for each TQ given a share + + :param tq_dict: dict of {tq_id: prio} + :param all_tqs_data: dict of {tq_id: {tq_data}}, where tq_data is a dict of {field: value} + :param share: share to be distributed among TQs + :param allow_bg_tqs: allow background TQs to be used + :return: dict of {priority: [tq_ids]} + """ + + def is_background(tq_priority: float, allow_bg_tqs: bool) -> bool: + """ + A TQ is background if its priority is below a threshold and background TQs are allowed + """ + return tq_priority <= 0.1 and allow_bg_tqs + + # Calculate Sum of priorities of non background TQs + total_prio = sum([prio for prio in tq_dict.values() if not is_background(prio, allow_bg_tqs)]) + + # Update prio for each TQ + for tq_id, tq_priority in tq_dict.items(): + if is_background(tq_priority, allow_bg_tqs): + prio = TQ_MIN_SHARE + else: + prio = max((share / total_prio) * tq_priority, TQ_MIN_SHARE) + tq_dict[tq_id] = prio + + # Generate groups of TQs that will have the same prio=sum(prios) maomenos + tq_groups: dict[str, list[int]] = defaultdict(list) + for tq_id, tq_data in all_tqs_data.items(): + for field in ("Jobs", "Priority") + priorityIgnoredFields: + if field in tq_data: + tq_data.pop(field) + tq_hash = [] + for f in sorted(tq_data): + tq_hash.append(f"{f}:{tq_data[f]}") + tq_hash = "|".join(tq_hash) + # if tq_hash not in tq_groups: + # tq_groups[tq_hash] = [] + tq_groups[tq_hash].append(tq_id) + + # Do the grouping + for tq_group in tq_groups.values(): + total_prio = sum(tq_dict[tq_id] for tq_id in tq_group) + for tq_id in tq_group: + tq_dict[tq_id] = total_prio + + # Group by priorities + result: dict[float, list[int]] = defaultdict(list) + for tq_id, tq_priority in tq_dict.items(): + result[tq_priority].append(tq_id) + + return result diff --git a/src/DIRAC/WorkloadManagementSystem/DB/tests/Test_TaskQueueDB.py b/src/DIRAC/WorkloadManagementSystem/DB/tests/Test_TaskQueueDB.py new file mode 100644 index 00000000000..c5cfa23f8d4 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/DB/tests/Test_TaskQueueDB.py @@ -0,0 +1,216 @@ +import math +from typing import Any + +import pytest +from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TQ_MIN_SHARE, calculate_priority + + +@pytest.mark.parametrize("allow_bg_tqs", [True, False]) +@pytest.mark.parametrize("share", [0.5, 1.0, 2.0]) +def test_calculate_priority_empty_entry(share: float, allow_bg_tqs: bool) -> None: + """test of the calculate_priority function""" + # Arrange + tq_dict: dict[int, float] = {} + all_tqs_data: dict[int, dict[str, Any]] = {} + + # Act + result = calculate_priority(tq_dict, all_tqs_data, share, allow_bg_tqs) + + # Assert + assert isinstance(result, dict) + assert len(result.keys()) == 0 + + +@pytest.mark.parametrize("allow_bg_tqs", [True, False]) +@pytest.mark.parametrize("share", [0.5, 1.0, 2.0]) +def test_calculate_priority_different_priority_same_number_of_jobs(share: float, allow_bg_tqs: bool) -> None: + """test of the calculate_priority function""" + # Arrange + tq_dict: dict[int, float] = { + 1: 3.0, + 2: 2.0, + 3: 0.3, + } + all_tqs_data: dict[int, dict[str, Any]] = { + 1: { + "Priority": 3.0, + "Jobs": 1, + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 50000, + }, + 2: { + "Priority": 2.0, + "Jobs": 1, + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 50000, + }, + 3: { + "Priority": 1.0, + "Jobs": 1, + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 50000, + }, + } + + # Act + result = calculate_priority(tq_dict, all_tqs_data, share, allow_bg_tqs) + + # Assert + assert isinstance(result, dict) + assert len(result.keys()) == 1 + key, value = result.popitem() + assert key == pytest.approx(share) + assert value == [1, 2, 3] + assert all(prio >= TQ_MIN_SHARE for prio in result.keys()) + + +@pytest.mark.parametrize("allow_bg_tqs", [True, False]) +@pytest.mark.parametrize("share", [0.5, 1.0, 2.0]) +def test_calculate_priority_same_cpu_time(share: float, allow_bg_tqs: bool) -> None: + """test of the calculate_priority function""" + # Arrange + + # NOTE: the priority value from the tq_dict is not used in the calculation + # because all task queues end up in the same "priority group" let's say + tq_dict: dict[int, float] = { + 1: 3.0, + 2: 2.0, + 3: 0.3, + } + all_tqs_data: dict[int, dict[str, Any]] = { + 1: { + "Priority": 1.0, + "Jobs": 100, + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 50000, + }, + 2: { + "Priority": 2.0, + "Jobs": 14, + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 50000, + }, + 3: { + "Priority": 1.0, + "Jobs": 154, + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 50000, + }, + } + + # Act + result = calculate_priority(tq_dict, all_tqs_data, share, allow_bg_tqs) + + # Assert + # All the tqs are supporsed to be regrouped in the same priority group + # even though they have different priority values (same cpu time) + assert isinstance(result, dict) + assert len(result.keys()) == 1 + priority = set(result.keys()).pop() + assert priority == pytest.approx(share) + assert result[priority] == [1, 2, 3] + assert all(prio >= TQ_MIN_SHARE for prio in result.keys()) + + +@pytest.mark.parametrize("allow_bg_tqs", [True, False]) +@pytest.mark.parametrize("share", [0.5, 1.0, 2.0]) +def test_calculate_priority_different_cpu_time(share: float, allow_bg_tqs: bool) -> None: + """test of the calculate_priority function""" + # Arrange + tq_dict: dict[int, float] = { + 1: 1.0, + 2: 1.0, + 3: 1.0, + } + all_tqs_data: dict[int, dict[str, Any]] = { + 1: { + "Priority": 1.0, + "Jobs": 1, + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 150000, + }, + 2: { + "Priority": 1.0, + "Jobs": 1, + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 100000, + }, + 3: { + "Priority": 1.0, + "Jobs": 1, + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 50000, + }, + } + + # Act + result = calculate_priority(tq_dict, all_tqs_data, share, allow_bg_tqs) + + # Assert + assert isinstance(result, dict) + assert len(result.keys()) == 1 + priority = set(result.keys()).pop() + assert priority == pytest.approx(share / 3) # different group category + assert result[priority] == [1, 2, 3] + + +@pytest.mark.parametrize("allow_bg_tqs", [True, False]) +@pytest.mark.parametrize("share", [0.5, 1.0, 2.0]) +def test_calculate_priority_different_priority_different_number_of_jobs_different_cpu_time( + share: float, allow_bg_tqs: bool +) -> None: + """test of the calculate_priority function""" + # Arrange + tq_dict: dict[int, float] = { + 1: 5.0, + 2: 3.0, + 3: 2.0, + } + all_tqs_data: dict[int, dict[str, Any]] = { + 1: { + "Priority": 1.0, + "Jobs": 1, + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 150000, + }, + 2: { + "Priority": 1.0, + "Jobs": 1, + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 100000, + }, + 3: { + "Priority": 1.0, + "Jobs": 1, + "Owner": "userName", + "OwnerGroup": "myGroup", + "CPUTime": 50000, + }, + } + + # Act + result = calculate_priority(tq_dict, all_tqs_data, share, allow_bg_tqs) + + # Assert + assert isinstance(result, dict) + assert sum(result.keys()) == pytest.approx(share) + assert len(result.keys()) == 3 + + for priority in result.keys(): + # assert that each key is in the following list at maximum epsilon distance + delta = math.inf + for expected_priority in [share * 0.5, share * 0.3, share * 0.2]: + delta = min(delta, abs(priority - expected_priority)) + assert delta < 1e-6 + assert len(result[priority]) == 1