Skip to content

Commit

Permalink
#487: Introduce additional filters prior to subclustering (#517)
Browse files Browse the repository at this point in the history
* #487: add filters to subclustering based on load before and after

* #487: fix failing test, set default thresholds to 0.0

* #487: make subclustering filters configurable

* #487: fix pylint warnings

* #487: fix formatting of max_load
  • Loading branch information
cwschilly authored Sep 28, 2024
1 parent 6f285d5 commit c52fc42
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 19 deletions.
26 changes: 20 additions & 6 deletions src/lbaf/Execution/lbsClusteringTransferStrategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ def __init__(self, criterion, parameters: dict, lgr: Logger):
self._logger.info(
f"Enter subclustering immediately after cluster swapping: {self.__separate_subclustering}")

# Initialize percentage of maximum load required for subclustering
self.__subclustering_threshold = parameters.get("subclustering_threshold", 0.0)
self._logger.info(
f"Percentage of maximum load required for subclustering: {self.__subclustering_threshold}")

# Initialize fraction of local imbalance that must be resolved by subcluster
self.__subclustering_minimum_improvement = parameters.get("subclustering_minimum_improvement", 0.0)
self._logger.info(
"Fraction of local imbalance that must be resolved by subcluster: "
f"{self.__subclustering_minimum_improvement}")

# Initialize cluster swap relative threshold
self.__cluster_swap_rtol = parameters.get("cluster_swap_rtol", 0.05)
self._logger.info(
Expand Down Expand Up @@ -192,7 +203,7 @@ def __swap_clusters(self, phase: Phase, r_src: Rank, clusters_src:dict, targets:
# Return number of swaps performed from rank
n_rank_swaps = 0

def __transfer_subclusters(self, phase: Phase, r_src: Rank, targets: set, ave_load: float) -> None:
def __transfer_subclusters(self, phase: Phase, r_src: Rank, targets: set, ave_load: float, max_load: float) -> None:
"""Perform feasible subcluster transfers from given rank to possible targets."""
# Iterate over source subclusters
for o_src in self.__build_rank_subclusters(r_src):
Expand All @@ -209,8 +220,12 @@ def __transfer_subclusters(self, phase: Phase, r_src: Rank, targets: set, ave_lo
for r_try in targets:
c_try = self._criterion.compute(
r_src, o_src, r_try)
if c_try <= 0.0:

# Additional filters prior to subclustering
if c_try <= self.__subclustering_minimum_improvement * r_src.get_load() or \
r_src.get_load() < self.__subclustering_threshold * max_load:
continue

l_try = abs(r_try.get_load() + objects_load - ave_load)
if l_try < l_dst:
c_dst, r_dst, l_dst = c_try, r_try, l_try
Expand All @@ -236,8 +251,7 @@ def __transfer_subclusters(self, phase: Phase, r_src: Rank, targets: set, ave_lo
# Reject subcluster transfer
self._n_rejects += len(o_src)


def execute(self, known_peers, phase: Phase, ave_load: float):
def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float):
"""Perform object transfer stage."""
# Initialize transfer stage
self._initialize_transfer_stage(ave_load)
Expand Down Expand Up @@ -267,7 +281,7 @@ def execute(self, known_peers, phase: Phase, ave_load: float):
continue

# Perform feasible subcluster swaps from given rank to possible targets
self.__transfer_subclusters(phase, r_src, targets, ave_load)
self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load)

# Report on new load and exit from rank
self._logger.debug(
Expand All @@ -282,7 +296,7 @@ def execute(self, known_peers, phase: Phase, ave_load: float):
# Iterate over ranks
for r_src, targets in rank_targets.items():
# Perform feasible subcluster swaps from given rank to possible targets
self.__transfer_subclusters(phase, r_src, targets, ave_load)
self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load)

# Report on new load and exit from rank
self._logger.debug(
Expand Down
4 changes: 3 additions & 1 deletion src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,11 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict
# Start with information stage
self.__execute_information_stage()

print(f"statistics: {statistics}")

# Execute transfer stage
n_ignored, n_transfers, n_rejects = self.__transfer_strategy.execute(
self.__known_peers, self._rebalanced_phase, statistics["average load"])
self.__known_peers, self._rebalanced_phase, statistics["average load"], statistics["maximum load"][-1])
if (n_proposed := n_transfers + n_rejects):
self._logger.info(
f"Transferred {n_transfers} objects amongst {n_proposed} proposed "
Expand Down
2 changes: 1 addition & 1 deletion src/lbaf/Execution/lbsRecursiveTransferStrategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def __recursive_extended_search(self, pick_list, objects, c_fct, n_o, max_n_o):
# Succeed when criterion is satisfied
return True

def execute(self, known_peers, phase: Phase, ave_load: float):
def execute(self, known_peers, phase: Phase, ave_load: float, _):
"""Perform object transfer stage."""
# Initialize transfer stage
self._initialize_transfer_stage(ave_load)
Expand Down
5 changes: 3 additions & 2 deletions src/lbaf/Execution/lbsTransferStrategyBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,11 @@ def factory(
raise SystemExit(1) from error

@abc.abstractmethod
def execute(self, phase, known_peers: dict, ave_load: float):
def execute(self, known_peers: dict, phase, ave_load: float, max_load: float):
"""Execute transfer strategy on Phase instance
:param phase: a Phase instance
:param known_peers: a dictionary of sets of known rank peers
:param phase: a Phase instance
:param ave_load: average load in current phase.
:param max_load: maximum load across current phase.
"""
# Must be implemented by concrete subclass
8 changes: 8 additions & 0 deletions src/lbaf/IO/lbsConfigurationValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ def __init__(self, config_to_validate: dict, logger: Logger):
str,
lambda e: e in ALLOWED_TRANSFER_STRATEGIES,
error=f"{get_error_message(ALLOWED_TRANSFER_STRATEGIES)} must be chosen"),
Optional("subclustering_threshold"): And(
float,
lambda x: x >= 0.0,
error="Should be of type 'float' and >= 0.0"),
Optional("subclustering_minimum_improvement"): And(
float,
lambda x: x >= 0.0,
error="Should be of type 'float' and >= 0.0"),
Optional("cluster_swap_rtol"): And(
float,
lambda x: x > 0.0,
Expand Down
23 changes: 14 additions & 9 deletions tests/unit/Execution/test_lbs_clustering_transfer_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ def test_lbs_clustering_transfer_strategy_execute_cluster_swaps(self):
self.assertEqual(
self.clustering_transfer_strategy.execute(known_peers=self.known_peers,
phase=self.phase,
ave_load=ave_load),
ave_load=ave_load,
max_load=2.5),
(0,len(rank_list) - 1,0)
)

Expand Down Expand Up @@ -233,17 +234,21 @@ def test_lbs_clustering_transfer_strategy_iterate_subclusters(self):

# Test that non deterministic execute function runs
assert isinstance(
clustering_transfer_strategy_non_det.execute(known_peers=known_peers,
phase=phase,
ave_load=ave_load),
clustering_transfer_strategy_non_det.execute(
known_peers=known_peers,
phase=phase,
ave_load=ave_load,
max_load=101),
tuple)

# Test that deterministic execute function is as expected
self.assertEqual(
clustering_transfer_strategy.execute(known_peers=known_peers,
phase=phase,
ave_load=ave_load)[2],
len(rank_list))
self.assertLessEqual(
clustering_transfer_strategy.execute(
known_peers=known_peers,
phase=phase,
ave_load=ave_load,
max_load=101)[1],
1)


if __name__ == "__main__":
Expand Down
22 changes: 22 additions & 0 deletions tests/unit/IO/test_configuration_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,5 +305,27 @@ def test_config_validator_wrong_separate_subclustering(self):
ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main()
self.assertEqual(err.exception.args[0], "Key 'parameters' error:\nKey 'separate_subclustering' error:\n'incorrect' should be instance of 'bool'")

def test_config_validator_correct_subclustering_filters(self):
with open(os.path.join(self.config_dir, "conf_correct_subclustering_filters.yml"), "rt", encoding="utf-8") as config_file:
yaml_str = config_file.read()
configuration = yaml.safe_load(yaml_str)
ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main()

def test_config_validator_wrong_subclustering_minimum_improvement(self):
with open(os.path.join(self.config_dir, "conf_wrong_subclustering_minimum_improvement.yml"), "rt", encoding="utf-8") as config_file:
yaml_str = config_file.read()
configuration = yaml.safe_load(yaml_str)
with self.assertRaises(SchemaError) as err:
ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main()
self.assertEqual(err.exception.args[0], "Should be of type 'float' and >= 0.0")

def test_config_validator_wrong_subclustering_threshold(self):
with open(os.path.join(self.config_dir, "conf_wrong_subclustering_threshold.yml"), "rt", encoding="utf-8") as config_file:
yaml_str = config_file.read()
configuration = yaml.safe_load(yaml_str)
with self.assertRaises(SchemaError) as err:
ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main()
self.assertEqual(err.exception.args[0], "Should be of type 'float' and >= 0.0")

if __name__ == "__main__":
unittest.main()
35 changes: 35 additions & 0 deletions tests/unit/config/conf_correct_subclustering_filters.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Specify input
from_data:
data_stem: ../data/synthetic_lb_data/data
phase_ids:
- 0
check_schema: false

# Specify work model
work_model:
name: AffineCombination
parameters:
alpha: 1.0
beta: 0.0
gamma: 0.0

# Specify algorithm
algorithm:
name: InformAndTransfer
phase_id: 0
parameters:
n_iterations: 4
n_rounds: 2
fanout: 2
order_strategy: arbitrary
transfer_strategy: Clustering
max_subclusters: 10
subclustering_threshold: 0.2
subclustering_minimum_improvement: 0.3
criterion: Tempered
max_objects_per_transfer: 32
deterministic_transfer: true

# Specify output
output_dir: ../output
output_file_stem: output_file
34 changes: 34 additions & 0 deletions tests/unit/config/conf_wrong_subclustering_minimum_improvement.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Specify input
from_data:
data_stem: ../data/synthetic_lb_data/data
phase_ids:
- 0
check_schema: false

# Specify work model
work_model:
name: AffineCombination
parameters:
alpha: 1.0
beta: 0.0
gamma: 0.0

# Specify algorithm
algorithm:
name: InformAndTransfer
phase_id: 0
parameters:
n_iterations: 4
n_rounds: 2
fanout: 2
order_strategy: arbitrary
transfer_strategy: Clustering
max_subclusters: 10
subclustering_minimum_improvement: -1
criterion: Tempered
max_objects_per_transfer: 32
deterministic_transfer: true

# Specify output
output_dir: ../output
output_file_stem: output_file
34 changes: 34 additions & 0 deletions tests/unit/config/conf_wrong_subclustering_threshold.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Specify input
from_data:
data_stem: ../data/synthetic_lb_data/data
phase_ids:
- 0
check_schema: false

# Specify work model
work_model:
name: AffineCombination
parameters:
alpha: 1.0
beta: 0.0
gamma: 0.0

# Specify algorithm
algorithm:
name: InformAndTransfer
phase_id: 0
parameters:
n_iterations: 4
n_rounds: 2
fanout: 2
order_strategy: arbitrary
transfer_strategy: Clustering
max_subclusters: 10
subclustering_threshold: -1
criterion: Tempered
max_objects_per_transfer: 32
deterministic_transfer: true

# Specify output
output_dir: ../output
output_file_stem: output_file

0 comments on commit c52fc42

Please sign in to comment.