diff --git a/src/lbaf/Execution/lbsClusteringTransferStrategy.py b/src/lbaf/Execution/lbsClusteringTransferStrategy.py index e4f631a3..be26d300 100644 --- a/src/lbaf/Execution/lbsClusteringTransferStrategy.py +++ b/src/lbaf/Execution/lbsClusteringTransferStrategy.py @@ -50,6 +50,7 @@ from .lbsTransferStrategyBase import TransferStrategyBase from ..Model.lbsRank import Rank +from ..Model.lbsObject import Object from ..Model.lbsPhase import Phase @@ -100,7 +101,7 @@ def __init__(self, criterion, parameters: dict, lgr: Logger): self.__n_sub_skipped, self.__n_sub_transfers, self.__n_sub_tries = 0, 0, 0 def __build_rank_clusters(self, rank: Rank, with_nullset) -> dict: - """Cluster migratiable objects by shared block ID when available.""" + """Cluster migratable objects by shared block ID when available.""" # Iterate over all migratable objects on rank clusters = {None: []} if with_nullset else {} for o in rank.get_migratable_objects(): @@ -117,6 +118,104 @@ def __build_rank_clusters(self, rank: Rank, with_nullset) -> dict: k: clusters[k] for k in random.sample(list(clusters.keys()), len(clusters))} + def __build_rank_subclusters_heuristic(self, r_src : Rank, r_try : Rank, ave_load : float) -> set: + """Build a limited set of subclusters using a heuristic to find possible + transfers without a huge amount of computational complexity.""" + + # Bail out early if no clusters are available + if not (clusters := self.__build_rank_clusters(r_src, False)): + self._logger.info(f"No migratable clusters on rank {r_src.get_id()}") + return [] + + src_load = r_src.get_load() + try_load = r_try.get_load() + + subclusters = {} + + amount_over_average = src_load - ave_load; + amount_under_average = ave_load - try_load; + + # print(f"amount_over_average={amount_over_average}, amount_under_average={amount_under_average}") + + clusters_to_subcluster = {} + + for k in clusters.keys(): + v = clusters[k] + cluster_load : float = 0.0 + for o in v: + cluster_load += o.get_load() + #print(f"k={k}: cluster_load={cluster_load}") + if cluster_load < amount_over_average or cluster_load < amount_under_average: + pass + else: + clusters_to_subcluster[k] = v + + def comparison(elm : Object): + return elm.get_load() + + # print(f"clusters_to_subcluster={clusters_to_subcluster}") + + for k in clusters_to_subcluster.keys(): + v = clusters_to_subcluster[k] + # print(f"k={k}") + + v.sort(key=comparison) + # print(f"sorted v={v}") + + load_sum : float = 0.0 + cluster_set : list = [] + for o in v: + load_sum += o.get_load() + cluster_set.append(o) + + if load_sum > amount_under_average: + break + + subclusters[tuple(cluster_set)] = load_sum + + return sorted(subclusters.keys(), key=subclusters.get) + + def __transfer_subclusters_heuristic(self, phase: Phase, r_src: Rank, targets: set, ave_load: float, max_load: float) -> None: + """Perform feasible subcluster transfers from given rank to possible targets.""" + + # Only do this if this rank is above the mean + if not r_src.get_load() > ave_load: + return + + for r_try in targets: + # Only consider transferring here if this rank is under the mean load + if r_try.get_load() < ave_load: + for o_src in self.__build_rank_subclusters_heuristic(r_src, r_try, ave_load): + c_try = self._criterion.compute(r_src, o_src, r_try) + #print(f"o_src={o_src}, c_try={c_try}") + + objects_load = sum(o.get_load() for o in o_src) + l_dst = math.inf + + # Additional filters prior to subclustering + if c_try <= self.__subclustering_minimum_improvement * r_src.get_load() or \ + r_src.get_load() < self.__subclustering_threshold * max_load: + continue + + l_try = abs(r_try.get_load() + objects_load - ave_load) + if l_try < l_dst: + c_dst, r_dst, l_dst = c_try, r_try, l_try + elif l_try == l_dst and c_try > c_dst: + c_dst, r_dst = c_try, r_try + + # Decide whether transfer is beneficial + self.__n_sub_tries += 1 + if c_dst > 0.0: + # Transfer subcluster and break out + self._logger.info( + f"Transferring subcluster with {len(o_src)} objects to rank {r_dst.get_id()}") + self._n_transfers += phase.transfer_objects( + r_src, o_src, r_dst) + self.__n_sub_transfers += 1 + break + # Reject subcluster transfer + self._n_rejects += len(o_src) + def __build_rank_subclusters(self, r_src: Rank) -> set: """Build subclusters to bring rank closest and above average load.""" @@ -146,11 +245,11 @@ def __build_rank_subclusters(self, r_src: Rank) -> set: for p in range(1, n_o + 1)) if self._deterministic_transfer else ( tuple(random.sample(v, p)) for p in nr.binomial(n_o, 0.5, min(n_o, self.__max_subclusters)))): + # Reject subclusters overshooting within relative tolerance reach_load = src_load - sum(o.get_load() for o in c) if reach_load < (1.0 - self.__cluster_swap_rtol) * self._average_load: continue - # Retain subclusters with their respective distance and cluster subclusters[c] = reach_load @@ -282,7 +381,7 @@ def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float): if not self.__disable_subclustering: # Perform feasible subcluster swaps from given rank to possible targets - self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load) + self.__transfer_subclusters_heuristic(phase, r_src, targets, ave_load, max_load) # Report on new load and exit from rank self._logger.debug( @@ -297,7 +396,7 @@ def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float): # Iterate over ranks for r_src, targets in rank_targets.items(): # Perform feasible subcluster swaps from given rank to possible targets - self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load) + self.__transfer_subclusters_heuristic(phase, r_src, targets, ave_load, max_load) # Report on new load and exit from rank self._logger.debug(