Skip to content

Commit

Permalink
#575: qois: fix rank qois
Browse files Browse the repository at this point in the history
  • Loading branch information
lifflander committed Dec 24, 2024
1 parent 23fff49 commit bed2d89
Show file tree
Hide file tree
Showing 10 changed files with 332 additions and 16 deletions.
2 changes: 2 additions & 0 deletions config/challenging-toy-fewer-tasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ from_data:
data_stem: ../data/challenging_toy_fewer_tasks/toy
phase_ids:
- 0
ranks_per_node: 2
check_schema: true
overwrite_validator: false

Expand All @@ -26,6 +27,7 @@ algorithm:
fanout: 4
order_strategy: arbitrary
transfer_strategy: Clustering
subclustering_disabled: true
criterion: Tempered
max_objects_per_transfer: 100
deterministic_transfer: true
Expand Down
1 change: 1 addition & 0 deletions src/lbaf/Execution/lbsAlgorithmBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def factory(
return algorithm(work_model, parameters, logger)
except Exception as e:
# Otherwise, error out
logger.error(e)
logger.error(f"Could not create an algorithm with name {algorithm_name}")
raise SystemExit(1) from e

Expand Down
299 changes: 294 additions & 5 deletions src/lbaf/Execution/lbsClusteringTransferStrategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,25 @@
import time
from itertools import chain, combinations
from logging import Logger
from functools import cmp_to_key
from anytree import Node, RenderTree, NodeMixin, LevelOrderIter

import numpy.random as nr

from .lbsTransferStrategyBase import TransferStrategyBase
from ..Model.lbsRank import Rank
from ..Model.lbsPhase import Phase

class Subcluster(NodeMixin):
    """Tree node used to group objects by successive components of their index.

    Each node stores an index, an accumulated load, an object count and,
    optionally, the single object attached to it as a leaf.
    """

    def __init__(self, index: list, parent=None, children=None):
        """Create a node with zero load and no object attached.

        :param index: index list identifying this node.
        :param parent: optional parent node to attach this node to.
        :param children: optional children to attach; ignored when falsy.
        """
        # Plain bookkeeping attributes, filled in while the tree is built
        self.index = index
        self.load = 0.0
        self.num_objects = 0
        self.leaf_object = None

        # Tree wiring goes through the parent/children attributes of the mixin
        self.parent = parent
        if children:
            self.children = children

class ClusteringTransferStrategy(TransferStrategyBase):
"""A concrete class for the clustering-based transfer strategy."""
Expand All @@ -68,9 +80,25 @@ def __init__(self, criterion, parameters: dict, lgr: Logger):

# Determine whether subclustering is performed immediately after swapping
self.__separate_subclustering = parameters.get("separate_subclustering", False)

self._logger.info(
f"Enter subclustering immediately after cluster swapping: {self.__separate_subclustering}")

self.__disable_subclustering = parameters.get("subclustering_disabled", False)
self.__after_iteration_subclustering = parameters.get("subclustering_after_iteration", 0)

if self.__disable_subclustering:
self._logger.info("Subclustering is disabled")
else:
self._logger.info("Subclustering is enabled")

self._logger.info(
f"Enter subclustering immediately after cluster swapping: {self.__separate_subclustering}")

if self.__after_iteration_subclustering > 0:
self._logger.info(
f"Delay subclustering until after iteration: {self.__after_iteration_subclustering}")

# Initialize percentage of maximum load required for subclustering
self.__subclustering_threshold = parameters.get("subclustering_threshold", 0.0)
self._logger.info(
Expand Down Expand Up @@ -116,6 +144,256 @@ def __build_rank_clusters(self, rank: Rank, with_nullset) -> dict:
k: clusters[k]
for k in random.sample(list(clusters.keys()), len(clusters))}

def __build_rank_subclusters_heuristic(self, r_src: Rank, r_try: Rank, ave_load: float) -> list:
    """Build a limited set of subclusters using a heuristic to find possible
    transfers without a huge amount of computational complexity.

    The objects of the retained clusters are inserted into a tree keyed by
    the successive components of their 5-component index. Every tree node
    under which more than one object is found yields one candidate
    subcluster.

    :param r_src: rank whose clusters are broken down into subclusters.
    :param r_try: candidate destination rank; only its load is read here.
    :param ave_load: average load, used to select the clusters to break down.
    :returns: list of tuples of objects, sorted by increasing load of the tree
        node they were gathered from; empty when no cluster is migratable.
    """
    # Bail out early if no clusters are available
    if not (clusters := self.__build_rank_clusters(r_src, False)):
        self._logger.info(f"No migratable clusters on rank {r_src.get_id()}")
        return []

    # Load excess of the source and load deficit of the candidate destination
    amount_over_average = r_src.get_load() - ave_load
    amount_under_average = ave_load - r_try.get_load()

    # Only clusters at least as heavy as both amounts are worth splitting;
    # lighter clusters are left out of the tree altogether
    clusters_to_subcluster = []
    for cluster_objects in clusters.values():
        cluster_load = sum(o.get_load() for o in cluster_objects)
        if not (cluster_load < amount_over_average or cluster_load < amount_under_average):
            clusters_to_subcluster.append(cluster_objects)

    # Number of index components used as tree levels
    n_levels = 5
    root = Subcluster([-1] * n_levels)

    # NOTE(review): the nested helpers are intentionally left unannotated.
    # Parameter annotations are evaluated when a nested `def` executes, and
    # `Object` is not among the imports visible in this module -- confirm
    # before re-adding such annotations.

    def insert_into_tree(o, node, level_array, cur_level=0):
        """Add object load to every node on its index path, creating nodes as needed."""
        node.load += o.get_load()
        node.num_objects += 1

        # All index components consumed: this node is the leaf of the object
        if cur_level == len(level_array):
            node.leaf_object = o
            return None

        component = o.get_index()[cur_level]
        level_array[cur_level] = component

        # Descend into the child already holding this index component, if any
        for child in node.children:
            if child.index[cur_level] == component:
                return insert_into_tree(o, child, level_array, cur_level + 1)

        # Otherwise open a new branch for this index component
        new_node = Subcluster(level_array.copy(), parent=node)
        return insert_into_tree(o, new_node, level_array, cur_level + 1)

    def eliminate_single_branches(parent, node):
        """Detach inner nodes holding a single object, re-attaching their children to parent."""
        if node.num_objects == 1 and len(node.children) > 0:
            for n in node.children:
                n.parent = parent
            node.parent = None
        elif len(node.children) >= 1:
            for n in node.children:
                eliminate_single_branches(node, n)

    def make_binary_tree(parent, node):
        """Replace a node having more than two children by two nodes each adopting half of them."""
        if len(node.children) > 2:
            # Sort children lexicographically by index: ascending on the first
            # component, ties broken by the following components in order
            children = sorted(node.children, key=lambda s: tuple(s.index))
            half = len(children) // 2

            # The two new nodes reuse the index of the parent and are attached
            # to it; their object count is left at its initial value of 0
            b1_node = Subcluster(parent.index, parent=parent)
            b2_node = Subcluster(parent.index, parent=parent)
            for n in children[:half]:
                b1_node.load += n.load
                n.parent = b1_node
            for n in children[half:]:
                b2_node.load += n.load
                n.parent = b2_node
            node.parent = None

            # Recurse down the tree from the parent, which now holds the new nodes
            for n in parent.children:
                make_binary_tree(parent, n)
        else:
            for n in node.children:
                make_binary_tree(node, n)

    def gather_objects(node):
        """Return the objects attached to the given node and to all of its descendants."""
        gathered = []
        if node.leaf_object is not None:
            gathered.append(node.leaf_object)
        for child in node.children:
            gathered += gather_objects(child)
        return gathered

    # Populate the tree with all objects of the retained clusters
    for cluster_objects in clusters_to_subcluster:
        for o in cluster_objects:
            insert_into_tree(o, root, [-1] * n_levels)

    # Simplify the tree; each loop iterates over the children the root has
    # when that loop starts
    for c in root.children:
        eliminate_single_branches(root, c)
    for c in root.children:
        make_binary_tree(root, c)

    # Every node gathering more than one object defines a candidate subcluster
    subclusters = {}
    for node in LevelOrderIter(root):
        objects = gather_objects(node)
        if len(objects) > 1:
            subclusters[tuple(objects)] = node.load

    # Return candidates by increasing load of the node they were gathered from
    return sorted(subclusters.keys(), key=subclusters.get)

def __transfer_subclusters_heuristic(self, phase: Phase, r_src: Rank, targets: set, ave_load: float, max_load: float) -> None:
    """Perform feasible subcluster transfers from given rank to possible targets.

    For each target whose load is below the average load, candidate
    subclusters of the source rank are tried in the order returned by the
    heuristic builder; the first one with a positive criterion value is
    transferred to that target, after which the next target is considered.

    :param phase: phase through which object transfers are performed.
    :param r_src: source rank; nothing is done unless its load equals max_load.
    :param targets: iterable of candidate destination ranks.
    :param ave_load: average load.
    :param max_load: maximum load.
    """
    # Only the rank whose load is exactly the maximum load sheds subclusters
    if r_src.get_load() != max_load:
        return

    for r_try in targets:
        # Only consider transferring here if this rank is under the mean load
        if not r_try.get_load() < ave_load:
            continue

        # Candidates are rebuilt for each target as source load may have changed
        for o_src in self.__build_rank_subclusters_heuristic(r_src, r_try, ave_load):
            # There is a single candidate destination per subcluster, so its
            # criterion value is used directly to decide on the transfer
            c_try = self._criterion.compute(r_src, o_src, r_try)
            objects_load = sum(o.get_load() for o in o_src)

            # Additional filters prior to subclustering
            if c_try <= self.__subclustering_minimum_improvement * r_src.get_load() or \
                    r_src.get_load() < self.__subclustering_threshold * max_load:
                continue

            # Decide whether transfer is beneficial
            self.__n_sub_tries += 1
            if c_try > 0.0:
                # Transfer subcluster and break out
                self._logger.info(
                    f"Transferring subcluster from {r_src.get_id()} with {len(o_src)} objects to rank {r_try.get_id()} of load {objects_load}")
                self._n_transfers += phase.transfer_objects(
                    r_src, o_src, r_try)
                self.__n_sub_transfers += 1
                break

            # Reject subcluster transfer
            self._n_rejects += len(o_src)

def __build_rank_subclusters(self, r_src: Rank) -> set:
"""Build subclusters to bring rank closest and above average load."""

Expand Down Expand Up @@ -250,12 +528,19 @@ def __transfer_subclusters(self, phase: Phase, r_src: Rank, targets: set, ave_lo
# Reject subcluster transfer
self._n_rejects += len(o_src)

def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float):
def execute(self, known_peers, phase: Phase, iteration : int, ave_load: float, max_load: float):
"""Perform object transfer stage."""

# Initialize transfer stage
self._initialize_transfer_stage(ave_load)
self._initialize_transfer_stage(ave_load, max_load)
rank_targets = self._get_ranks_to_traverse(phase.get_ranks(), known_peers)

unsorted_loads = map(lambda x: x.get_load(), phase.get_ranks())
loads = list(unsorted_loads)
loads.sort()

print(f"loads: {','.join(str(round(l)) for l in loads)}")

# Iterate over ranks
for r_src, targets in rank_targets.items():
# Cluster migratable objects on source rank
Expand All @@ -280,22 +565,26 @@ def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float):
continue

# Perform feasible subcluster swaps from given rank to possible targets
self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load)
if not self.__disable_subclustering and iteration > self.__after_iteration_subclustering:
# Perform feasible subcluster swaps from given rank to possible targets
targets_prime = filter(lambda t : t.get_load() < ave_load, targets)
self.__transfer_subclusters_heuristic(phase, r_src, targets, ave_load, max_load)

# Report on new load and exit from rank
self._logger.debug(
f"Rank {r_src.get_id()} load: {r_src.get_load()} after {self._n_transfers} object transfers")

# Perform subclustering when it was not previously done
if self.__separate_subclustering:
if self.__separate_subclustering and not self.__disable_subclustering and iteration > self.__after_iteration_subclustering:
# In non-deterministic case skip subclustering when swaps passed
if self.__n_swaps and not self._deterministic_transfer:
self.__n_sub_skipped += len(rank_targets)
else:
# Iterate over ranks
for r_src, targets in rank_targets.items():
# Perform feasible subcluster swaps from given rank to possible targets
self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load)
targets_prime = filter(lambda t : t.get_load() < ave_load, targets)
self.__transfer_subclusters_heuristic(phase, r_src, targets_prime, ave_load, max_load)

# Report on new load and exit from rank
self._logger.debug(
Expand Down
1 change: 1 addition & 0 deletions src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ def execute(self, p_id: int, phases: list, statistics: dict, a_min_max):
n_ignored, n_transfers, n_rejects = self.__transfer_strategy.execute(
self.__known_peers, self._rebalanced_phase, statistics[
"average load"], statistics["maximum load"][-1])

if (n_proposed := n_transfers + n_rejects):
self._logger.info(
f"Transferred {n_transfers} objects amongst {n_proposed} proposed "
Expand Down
Loading

0 comments on commit bed2d89

Please sign in to comment.