diff --git a/config/challenging-toy-fewer-tasks.yaml b/config/challenging-toy-fewer-tasks.yaml index 5a16d484..d4185287 100644 --- a/config/challenging-toy-fewer-tasks.yaml +++ b/config/challenging-toy-fewer-tasks.yaml @@ -3,6 +3,7 @@ from_data: data_stem: ../data/challenging_toy_fewer_tasks/toy phase_ids: - 0 + ranks_per_node: 2 check_schema: true overwrite_validator: false @@ -26,6 +27,7 @@ algorithm: fanout: 4 order_strategy: arbitrary transfer_strategy: Clustering + subclustering_disabled: true criterion: Tempered max_objects_per_transfer: 100 deterministic_transfer: true diff --git a/src/lbaf/Execution/lbsAlgorithmBase.py b/src/lbaf/Execution/lbsAlgorithmBase.py index 15d4d86f..08484f5a 100644 --- a/src/lbaf/Execution/lbsAlgorithmBase.py +++ b/src/lbaf/Execution/lbsAlgorithmBase.py @@ -136,6 +136,7 @@ def factory( return algorithm(work_model, parameters, logger) except Exception as e: # Otherwise, error out + logger.error(e) logger.error(f"Could not create an algorithm with name {algorithm_name}") raise SystemExit(1) from e diff --git a/src/lbaf/Execution/lbsClusteringTransferStrategy.py b/src/lbaf/Execution/lbsClusteringTransferStrategy.py index 854c6a68..5fb1c7e8 100644 --- a/src/lbaf/Execution/lbsClusteringTransferStrategy.py +++ b/src/lbaf/Execution/lbsClusteringTransferStrategy.py @@ -45,6 +45,8 @@ import time from itertools import chain, combinations from logging import Logger +from functools import cmp_to_key +from anytree import Node, RenderTree, NodeMixin, LevelOrderIter import numpy.random as nr @@ -52,6 +54,16 @@ from ..Model.lbsRank import Rank from ..Model.lbsPhase import Phase +class Subcluster(NodeMixin): + def __init__(self, index : list, parent=None, children=None): + #super(Subcluster, self).__init__() + self.parent = parent + if children: + self.children = children + self.load = 0.0 + self.index = index + self.leaf_object = None + self.num_objects = 0 class ClusteringTransferStrategy(TransferStrategyBase): """A concrete class for the 
clustering-based transfer strategy.""" @@ -68,9 +80,25 @@ def __init__(self, criterion, parameters: dict, lgr: Logger): # Determine whether subclustering is performed immediately after swapping self.__separate_subclustering = parameters.get("separate_subclustering", False) + self._logger.info( f"Enter subclustering immediately after cluster swapping: {self.__separate_subclustering}") + self.__disable_subclustering = parameters.get("subclustering_disabled", False) + self.__after_iteration_subclustering = parameters.get("subclustering_after_iteration", 0) + + if self.__disable_subclustering: + self._logger.info("Subclustering is disabled") + else: + self._logger.info("Subclustering is enabled") + + self._logger.info( + f"Enter subclustering immediately after cluster swapping: {self.__separate_subclustering}") + + if self.__after_iteration_subclustering > 0: + self._logger.info( + f"Delay subclustering until after iteration: {self.__after_iteration_subclustering}") + # Initialize percentage of maximum load required for subclustering self.__subclustering_threshold = parameters.get("subclustering_threshold", 0.0) self._logger.info( @@ -116,6 +144,256 @@ def __build_rank_clusters(self, rank: Rank, with_nullset) -> dict: k: clusters[k] for k in random.sample(list(clusters.keys()), len(clusters))} + def __build_rank_subclusters_heuristic(self, r_src : Rank, r_try : Rank, ave_load : float) -> set: + """Build a limited set of subclusters using a heuristic to find possible + transfers without a huge amount of computational complexity.""" + + # Bail out early if no clusters are available + if not (clusters := self.__build_rank_clusters(r_src, False)): + self._logger.info(f"No migratable clusters on rank {r_src.get_id()}") + return [] + + src_load = r_src.get_load() + try_load = r_try.get_load() + + subclusters = {} + + amount_over_average = src_load - ave_load; + amount_under_average = ave_load - try_load; + + # print(f"amount_over_average={amount_over_average}, 
amount_under_average={amount_under_average}") + + clusters_to_subcluster = {} + + for k in clusters.keys(): + v = clusters[k] + cluster_load : float = 0.0 + for o in v: + cluster_load += o.get_load() + #print(f"k={k}: cluster_load={cluster_load}") + if cluster_load < amount_over_average or cluster_load < amount_under_average: + pass + else: + clusters_to_subcluster[k] = v + + # bool operator() (const vt::Index5D &idx1, const vt::Index5D &idx2) const { + # // return true if idx1 should run before idx2 + # // always group tasks from the same subblock because they share memory + # // sort tasks within a subblock in the order of their .z(), .u(), and .v() in that order + + # // preferably run non-homed work first + # bool idx1_homed = idx1.x() == this_rank_; + # bool idx2_homed = idx2.x() == this_rank_; + + # if (idx1_homed xor idx2_homed) { + # if (idx1_homed) { + # // idx1 is homed but idx2 is not + # return false; + # } else { + # // idx2 is homed but idx1 is not + # return true; + # } + # } else if (idx1.x() == idx2.x()) { + # if (idx1.y() == idx2.y()) { + # if (idx1.z() == idx2.z()) { + # if (idx1.u() == idx2.u()) { + # // sort ascending by topo idx within a row idx + # return idx1.v() < idx2.v(); + # } else { + # // sort ascending by row idx within a col idx + # return idx1.u() < idx2.u(); + # } + # } else { + # // sort ascending by col idx within a subblock + # return idx1.z() < idx2.z(); + # } + # } else { + # // sort ascending by subblock within a rank + # return idx1.y() < idx2.y(); + # } + # } else { + # // sort ascending by rank + # return idx1.x() < idx2.x(); + # } + # } + + def object_ordering(o1 : Object, o2 : Object): + return index_ordering(o1.get_index(), o2.get_index()) + + def subcluster_ordering(s1 : Subcluster, s2 : Subcluster): + return index_ordering(s1.index, s2.index) + + def cmp_0(a, b): + return bool(a > b) - bool(a < b) + + def index_ordering(idx1 : list, idx2 : list): + if idx1[0] == idx2[0]: + if idx1[1] == idx2[1]: + if idx1[2] == 
idx2[2]: + if idx1[3] == idx2[3]: + return cmp_0(idx1[4], idx2[4]) + else: + return cmp_0(idx1[3], idx2[3]) + else: + return cmp_0(idx1[2], idx2[2]) + else: + return cmp_0(idx1[1], idx2[1]) + else: + return cmp_0(idx1[0], idx2[0]) + + def comparison(o : Object): + return o.get_load() + + root = Subcluster([-1,-1,-1,-1,-1]) + + def insertIntoTree(o : Object, node : Subcluster, level_array : list, cur_level : int = 0): + found_child = False + node.load += o.get_load() + node.num_objects += 1 + if cur_level == len(level_array): + node.leaf_object = o + return + for child in node.children: + if child.index[cur_level] == o.get_index()[cur_level]: + found_child = True + level_array[cur_level] = o.get_index()[cur_level] + return insertIntoTree(o, child, level_array, cur_level + 1) + if not found_child: + level_array[cur_level] = o.get_index()[cur_level] + #print(f"inserting node: {level_array.copy()}, level={cur_level}") + new_node = Subcluster(level_array.copy(), parent=node) + return insertIntoTree(o, new_node, level_array, cur_level + 1) + + def eliminateSingleBranches(parent : Subcluster, node : Subcluster): + if node.num_objects == 1 and len(node.children) > 0: + #print("DOING THIS\n") + for n in node.children: + n.parent = parent + node.parent = None + elif len(node.children) >= 1: + for n in node.children: + eliminateSingleBranches(node, n) + + def makeBinaryTree(parent : Subcluster, node : Subcluster): + if len(node.children) > 2: + unsorted_children = list(node.children) + children = sorted(unsorted_children, key=cmp_to_key(subcluster_ordering)) + len1 = int(len(children) / 2) + b1 = children[0:len1] + b2 = children[len1:len(children)] + b1_node = Subcluster(parent.index, parent=parent) + b2_node = Subcluster(parent.index, parent=parent) + for n in b1: + b1_node.load += n.load + n.parent = b1_node + for n in b2: + b2_node.load += n.load + n.parent = b2_node + node.parent = None + + # recurse down the tree + for n in parent.children: + makeBinaryTree(parent, n) + 
else: + for n in node.children: + makeBinaryTree(node, n) + + def gatherObjects(node : Subcluster) -> list: + list = [] + if node.leaf_object is not None: + list.append(node.leaf_object) + for child in node.children: + list += gatherObjects(child) + return list + + for k in clusters_to_subcluster.keys(): + v = clusters_to_subcluster[k] + for o in v: + insertIntoTree(o, root, [-1,-1,-1,-1,-1]) + + for c in root.children: + eliminateSingleBranches(root, c) + + # print(f"NEW tree: {r_src.get_id()}") + # print(RenderTree(root).by_attr(lambda n: str(n.index) + ": " + "size=" + str(n.num_objects) + " load=" + str(round(n.load)))) + + for c in root.children: + makeBinaryTree(root, c) + + # print(f"NEW tree AFTER: {r_src.get_id()}") + # print(RenderTree(root).by_attr(lambda n: str(n.index) + ": " + "size=" + str(n.num_objects) + " load=" + str(round(n.load)))) + # exit(1) + + for node in LevelOrderIter(root): + objects = gatherObjects(node) + if len(objects) > 1: + #print(f"load={node.load}, len={len(objects)}") + subclusters[tuple(objects)] = node.load + + #print(f"clusters_to_subcluster={clusters_to_subcluster}") + + # for k in clusters_to_subcluster.keys(): + # v = clusters_to_subcluster[k] + # # print(f"k={k}") + + # #sorted(v, key=cmp_to_key(object_ordering)) + # v.sort(key=comparison) + # # print(f"sorted v={v}") + + # load_sum : float = 0.0 + # cluster_set : list = [] + # for o in v: + # load_sum += o.get_load() + # cluster_set.append(o) + + # if load_sum > amount_under_average: + # break + + # subclusters[tuple(cluster_set)] = load_sum + + return sorted(subclusters.keys(), key=subclusters.get) + + def __transfer_subclusters_heuristic(self, phase: Phase, r_src: Rank, targets: set, ave_load: float, max_load: float) -> None: + """Perform feasible subcluster transfers from given rank to possible targets.""" + + # Only do this if this rank is close to the max load + if not r_src.get_load() == max_load: + return + + for r_try in targets: + # Only consider transferring 
here if this rank is under the mean load + if r_try.get_load() < ave_load: + for o_src in self.__build_rank_subclusters_heuristic(r_src, r_try, ave_load): + c_try = self._criterion.compute(r_src, o_src, r_try) + #print(f"o_src={o_src}, c_try={c_try}") + + objects_load = sum(o.get_load() for o in o_src) + l_dst = math.inf + + # Additional filters prior to subclustering + if c_try <= self.__subclustering_minimum_improvement * r_src.get_load() or \ + r_src.get_load() < self.__subclustering_threshold * max_load: + continue + + l_try = abs(r_try.get_load() + objects_load - ave_load) + if l_try < l_dst: + c_dst, r_dst, l_dst = c_try, r_try, l_try + elif l_try == l_dst and c_try > c_dst: + c_dst, r_dst = c_try, r_try + + # Decide whether transfer is beneficial + self.__n_sub_tries += 1 + if c_dst > 0.0: + # Transfer subcluster and break out + self._logger.info( + f"Transferring subcluster from {r_src.get_id()} with {len(o_src)} objects to rank {r_dst.get_id()} of load {objects_load}") + self._n_transfers += phase.transfer_objects( + r_src, o_src, r_dst) + self.__n_sub_transfers += 1 + break + # Reject subcluster transfer + self._n_rejects += len(o_src) + def __build_rank_subclusters(self, r_src: Rank) -> set: """Build subclusters to bring rank closest and above average load.""" @@ -250,12 +528,19 @@ def __transfer_subclusters(self, phase: Phase, r_src: Rank, targets: set, ave_lo # Reject subcluster transfer self._n_rejects += len(o_src) - def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float): + def execute(self, known_peers, phase: Phase, iteration : int, ave_load: float, max_load: float): """Perform object transfer stage.""" + # Initialize transfer stage - self._initialize_transfer_stage(ave_load) + self._initialize_transfer_stage(ave_load, max_load) rank_targets = self._get_ranks_to_traverse(phase.get_ranks(), known_peers) + unsorted_loads = map(lambda x: x.get_load(), phase.get_ranks()) + loads = list(unsorted_loads) + loads.sort() + + 
print(f"loads: {','.join(str(round(l)) for l in loads)}") + # Iterate over ranks for r_src, targets in rank_targets.items(): # Cluster migratable objects on source rank @@ -280,14 +565,17 @@ def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float): continue # Perform feasible subcluster swaps from given rank to possible targets - self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load) + if not self.__disable_subclustering and iteration > self.__after_iteration_subclustering: + # Perform feasible subcluster swaps from given rank to possible targets + targets_prime = filter(lambda t : t.get_load() < ave_load, targets) + self.__transfer_subclusters_heuristic(phase, r_src, targets, ave_load, max_load) # Report on new load and exit from rank self._logger.debug( f"Rank {r_src.get_id()} load: {r_src.get_load()} after {self._n_transfers} object transfers") # Perform subclustering when it was not previously done - if self.__separate_subclustering: + if self.__separate_subclustering and not self.__disable_subclustering and iteration > self.__after_iteration_subclustering: # In non-deterministic case skip subclustering when swaps passed if self.__n_swaps and not self._deterministic_transfer: self.__n_sub_skipped += len(rank_targets) @@ -295,7 +583,8 @@ def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float): # Iterate over ranks for r_src, targets in rank_targets.items(): # Perform feasible subcluster swaps from given rank to possible targets - self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load) + targets_prime = filter(lambda t : t.get_load() < ave_load, targets) + self.__transfer_subclusters_heuristic(phase, r_src, targets_prime, ave_load, max_load) # Report on new load and exit from rank self._logger.debug( diff --git a/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py b/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py index 23553727..96153177 100644 --- 
a/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py +++ b/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py @@ -245,6 +245,7 @@ def execute(self, p_id: int, phases: list, statistics: dict, a_min_max): n_ignored, n_transfers, n_rejects = self.__transfer_strategy.execute( self.__known_peers, self._rebalanced_phase, statistics[ "average load"], statistics["maximum load"][-1]) + if (n_proposed := n_transfers + n_rejects): self._logger.info( f"Transferred {n_transfers} objects amongst {n_proposed} proposed " diff --git a/src/lbaf/Execution/lbsRecursiveTransferStrategy.py b/src/lbaf/Execution/lbsRecursiveTransferStrategy.py index 86f97224..16cb65be 100644 --- a/src/lbaf/Execution/lbsRecursiveTransferStrategy.py +++ b/src/lbaf/Execution/lbsRecursiveTransferStrategy.py @@ -100,10 +100,10 @@ def __recursive_extended_search(self, pick_list, objects, c_fct, n_o, max_n_o): # Succeed when criterion is satisfied return True - def execute(self, known_peers, phase: Phase, ave_load: float, _): + def execute(self, known_peers, phase: Phase, iteration : int, ave_load: float, max_load: float): """Perform object transfer stage.""" # Initialize transfer stage - self._initialize_transfer_stage(ave_load) + self._initialize_transfer_stage(ave_load, max_load) max_obj_transfers = 0 # Map rank to targets and ordered migratable objects diff --git a/src/lbaf/Execution/lbsTransferStrategyBase.py b/src/lbaf/Execution/lbsTransferStrategyBase.py index dc3ef770..11e7378b 100644 --- a/src/lbaf/Execution/lbsTransferStrategyBase.py +++ b/src/lbaf/Execution/lbsTransferStrategyBase.py @@ -88,12 +88,12 @@ def __init__(self, criterion, parameters: dict, logger: Logger): self._n_transfers = 0 self._n_rejects = 0 - def _initialize_transfer_stage(self, ave_load: float): + def _initialize_transfer_stage(self, ave_load: float, max_load : float): """Initialize transfer stage consistently across strategies.""" # Keep track of average load self._average_load = ave_load - self._logger.info(f"Executing transfer phase with 
average load: {self._average_load}") + self._logger.info(f"Executing transfer phase with average load: {self._average_load}, and max_load: {max_load}") # Initialize numbers of transfers and rejects self._n_transfers = 0 @@ -196,7 +196,7 @@ def factory( raise SystemExit(1) from error @abc.abstractmethod - def execute(self, known_peers: dict, phase, ave_load: float, max_load: float): + def execute(self, known_peers: dict, phase, iteration : int, ave_load: float, max_load: float): """Execute transfer strategy on Phase instance :param known_peers: a dictionary of sets of known rank peers :param phase: a Phase instance diff --git a/src/lbaf/IO/lbsConfigurationValidator.py b/src/lbaf/IO/lbsConfigurationValidator.py index 7986bf87..a00097b6 100644 --- a/src/lbaf/IO/lbsConfigurationValidator.py +++ b/src/lbaf/IO/lbsConfigurationValidator.py @@ -199,6 +199,8 @@ def __init__(self, config_to_validate: dict, logger: Logger): str, lambda e: e in ALLOWED_TRANSFER_STRATEGIES, error=f"{get_error_message(ALLOWED_TRANSFER_STRATEGIES)} must be chosen"), + Optional("subclustering_disabled"): bool, + Optional("subclustering_after_iteration"): int, Optional("subclustering_threshold"): And( float, lambda x: x >= 0.0, diff --git a/src/lbaf/IO/lbsVTDataWriter.py b/src/lbaf/IO/lbsVTDataWriter.py index 25acbca9..741c7103 100644 --- a/src/lbaf/IO/lbsVTDataWriter.py +++ b/src/lbaf/IO/lbsVTDataWriter.py @@ -298,7 +298,7 @@ def _json_serializer(self, rank_phases_double) -> str: } # JSON can not handle nan so make this ratio -1 when it's not valid - homed_ratio = -1 + homed_ratio = -1.0 if not math.isnan(rank_info.get_homed_blocks_ratio()): homed_ratio = rank_info.get_homed_blocks_ratio() phase_data["user_defined"]["num_homed_ratio"] = homed_ratio @@ -325,9 +325,22 @@ def _json_serializer(self, rank_phases_double) -> str: if r_id != it_r.get_id(): continue + rank_qois = it_r.get_qois() + # Add task data for current rank in iteration iteration_data["tasks"] = self.__create_task_data(it_r) + 
iteration_data["user_defined"] = { + qoi_name: qoi_getter() for qoi_name, qoi_getter in rank_qois.items() + if qoi_name != "homed_blocks_ratio" # omit for now because it might be nan + } + + # JSON can not handle nan so make this ratio -1 when it's not valid + homed_ratio = -1.0 + if not math.isnan(it_r.get_homed_blocks_ratio()): + homed_ratio = it_r.get_homed_blocks_ratio() + iteration_data["user_defined"]["num_homed_ratio"] = homed_ratio + # Add communication data if present communications = self.__get_communications(it, it_r) if communications: diff --git a/src/lbaf/Model/lbsAffineCombinationWorkModel.py b/src/lbaf/Model/lbsAffineCombinationWorkModel.py index 303a7d6d..cc4bd978 100644 --- a/src/lbaf/Model/lbsAffineCombinationWorkModel.py +++ b/src/lbaf/Model/lbsAffineCombinationWorkModel.py @@ -94,10 +94,16 @@ def compute(self, rank: Rank): alpha * load + beta * max(sent, received) + gamma, under optional strict upper bounds. """ - # Check whether strict bounds are satisfied - for k, v in self.__upper_bounds.items(): - if getattr(rank, f"get_{k}")() > v: - return math.inf + if rank.node is None: + # Check whether strict bounds are satisfied + for k, v in self.__upper_bounds.items(): + if getattr(rank, f"get_{k}")() > v: + return math.inf + else: + #self.__logger.info(f"Apply node-level constraint: {rank.node.get_number_of_ranks()}") + for k, v in self.__upper_bounds.items(): + if getattr(rank.node, f"get_{k}")() > v * rank.node.get_number_of_ranks(): + return math.inf # Return combination of load and volumes return self.affine_combination( diff --git a/src/lbaf/Model/lbsRank.py b/src/lbaf/Model/lbsRank.py index ea8c5b83..0e561892 100644 --- a/src/lbaf/Model/lbsRank.py +++ b/src/lbaf/Model/lbsRank.py @@ -272,7 +272,9 @@ def remove_migratable_object(self, o: Object): @qoi def get_load(self) -> float: """Return total load on rank.""" - return sum(o.get_load() for o in self.__migratable_objects.union(self.__sentinel_objects)) + val : float = 0.0 + val += 
sum(o.get_load() for o in self.__migratable_objects.union(self.__sentinel_objects)) + return val @qoi def get_migratable_load(self) -> float: