From c52fc427b8c5a378fd1828c473b62b854a40befd Mon Sep 17 00:00:00 2001 From: Caleb Schilly <132086024+cwschilly@users.noreply.github.com> Date: Sat, 28 Sep 2024 10:49:48 -0400 Subject: [PATCH] #487: Introduce additional filters prior to subclustering (#517) * #487: add filters to subclustering based on load before and after * #487: fix failing test, set default thresholds to 0.0 * #487: make subclustering filters configurable * #487: fix pylint warnings * #487: fix formatting of max_load --- .../lbsClusteringTransferStrategy.py | 26 ++++++++++---- .../lbsInformAndTransferAlgorithm.py | 4 ++- .../Execution/lbsRecursiveTransferStrategy.py | 2 +- src/lbaf/Execution/lbsTransferStrategyBase.py | 5 +-- src/lbaf/IO/lbsConfigurationValidator.py | 8 +++++ .../test_lbs_clustering_transfer_strategy.py | 23 +++++++----- tests/unit/IO/test_configuration_validator.py | 22 ++++++++++++ .../conf_correct_subclustering_filters.yml | 35 +++++++++++++++++++ ...rong_subclustering_minimum_improvement.yml | 34 ++++++++++++++++++ .../conf_wrong_subclustering_threshold.yml | 34 ++++++++++++++++++ 10 files changed, 174 insertions(+), 19 deletions(-) create mode 100644 tests/unit/config/conf_correct_subclustering_filters.yml create mode 100644 tests/unit/config/conf_wrong_subclustering_minimum_improvement.yml create mode 100644 tests/unit/config/conf_wrong_subclustering_threshold.yml diff --git a/src/lbaf/Execution/lbsClusteringTransferStrategy.py b/src/lbaf/Execution/lbsClusteringTransferStrategy.py index 9195beca0..46bc2e78f 100644 --- a/src/lbaf/Execution/lbsClusteringTransferStrategy.py +++ b/src/lbaf/Execution/lbsClusteringTransferStrategy.py @@ -71,6 +71,17 @@ def __init__(self, criterion, parameters: dict, lgr: Logger): self._logger.info( f"Enter subclustering immediately after cluster swapping: {self.__separate_subclustering}") + # Initialize percentage of maximum load required for subclustering + self.__subclustering_threshold = parameters.get("subclustering_threshold", 0.0) + 
self._logger.info( + f"Percentage of maximum load required for subclustering: {self.__subclustering_threshold}") + + # Initialize fraction of local imbalance that must be resolved by subcluster + self.__subclustering_minimum_improvement = parameters.get("subclustering_minimum_improvement", 0.0) + self._logger.info( + "Fraction of local imbalance that must be resolved by subcluster: " + f"{self.__subclustering_minimum_improvement}") + # Initialize cluster swap relative threshold self.__cluster_swap_rtol = parameters.get("cluster_swap_rtol", 0.05) self._logger.info( @@ -192,7 +203,7 @@ def __swap_clusters(self, phase: Phase, r_src: Rank, clusters_src:dict, targets: # Return number of swaps performed from rank n_rank_swaps = 0 - def __transfer_subclusters(self, phase: Phase, r_src: Rank, targets: set, ave_load: float) -> None: + def __transfer_subclusters(self, phase: Phase, r_src: Rank, targets: set, ave_load: float, max_load: float) -> None: """Perform feasible subcluster transfers from given rank to possible targets.""" # Iterate over source subclusters for o_src in self.__build_rank_subclusters(r_src): @@ -209,8 +220,12 @@ def __transfer_subclusters(self, phase: Phase, r_src: Rank, targets: set, ave_lo for r_try in targets: c_try = self._criterion.compute( r_src, o_src, r_try) - if c_try <= 0.0: + + # Additional filters prior to subclustering + if c_try <= self.__subclustering_minimum_improvement * r_src.get_load() or \ + r_src.get_load() < self.__subclustering_threshold * max_load: continue + l_try = abs(r_try.get_load() + objects_load - ave_load) if l_try < l_dst: c_dst, r_dst, l_dst = c_try, r_try, l_try @@ -236,8 +251,7 @@ def __transfer_subclusters(self, phase: Phase, r_src: Rank, targets: set, ave_lo # Reject subcluster transfer self._n_rejects += len(o_src) - - def execute(self, known_peers, phase: Phase, ave_load: float): + def execute(self, known_peers, phase: Phase, ave_load: float, max_load: float): """Perform object transfer stage.""" # Initialize 
transfer stage self._initialize_transfer_stage(ave_load) @@ -267,7 +281,7 @@ def execute(self, known_peers, phase: Phase, ave_load: float): continue # Perform feasible subcluster swaps from given rank to possible targets - self.__transfer_subclusters(phase, r_src, targets, ave_load) + self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load) # Report on new load and exit from rank self._logger.debug( @@ -282,7 +296,7 @@ def execute(self, known_peers, phase: Phase, ave_load: float): # Iterate over ranks for r_src, targets in rank_targets.items(): # Perform feasible subcluster swaps from given rank to possible targets - self.__transfer_subclusters(phase, r_src, targets, ave_load) + self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load) # Report on new load and exit from rank self._logger.debug( diff --git a/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py b/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py index ea2a5494d..c8c207d8b 100644 --- a/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py +++ b/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py @@ -245,9 +245,11 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict # Start with information stage self.__execute_information_stage() + self._logger.debug(f"statistics: {statistics}") + # Execute transfer stage n_ignored, n_transfers, n_rejects = self.__transfer_strategy.execute( - self.__known_peers, self._rebalanced_phase, statistics["average load"]) + self.__known_peers, self._rebalanced_phase, statistics["average load"], statistics["maximum load"][-1]) if (n_proposed := n_transfers + n_rejects): self._logger.info( f"Transferred {n_transfers} objects amongst {n_proposed} proposed " diff --git a/src/lbaf/Execution/lbsRecursiveTransferStrategy.py b/src/lbaf/Execution/lbsRecursiveTransferStrategy.py index a01141571..9738cb98f 100644 --- a/src/lbaf/Execution/lbsRecursiveTransferStrategy.py +++ b/src/lbaf/Execution/lbsRecursiveTransferStrategy.py @@ -101,7 +101,7 @@ 
def __recursive_extended_search(self, pick_list, objects, c_fct, n_o, max_n_o): # Succeed when criterion is satisfied return True - def execute(self, known_peers, phase: Phase, ave_load: float): + def execute(self, known_peers, phase: Phase, ave_load: float, _): """Perform object transfer stage.""" # Initialize transfer stage self._initialize_transfer_stage(ave_load) diff --git a/src/lbaf/Execution/lbsTransferStrategyBase.py b/src/lbaf/Execution/lbsTransferStrategyBase.py index 7908ae4f0..d8978295c 100644 --- a/src/lbaf/Execution/lbsTransferStrategyBase.py +++ b/src/lbaf/Execution/lbsTransferStrategyBase.py @@ -197,10 +197,11 @@ def factory( raise SystemExit(1) from error @abc.abstractmethod - def execute(self, phase, known_peers: dict, ave_load: float): + def execute(self, known_peers: dict, phase, ave_load: float, max_load: float): """Execute transfer strategy on Phase instance - :param phase: a Phase instance :param known_peers: a dictionary of sets of known rank peers + :param phase: a Phase instance :param ave_load: average load in current phase. + :param max_load: maximum load across current phase. 
""" # Must be implemented by concrete subclass diff --git a/src/lbaf/IO/lbsConfigurationValidator.py b/src/lbaf/IO/lbsConfigurationValidator.py index 18fe4d3ab..34a388ded 100644 --- a/src/lbaf/IO/lbsConfigurationValidator.py +++ b/src/lbaf/IO/lbsConfigurationValidator.py @@ -198,6 +198,14 @@ def __init__(self, config_to_validate: dict, logger: Logger): str, lambda e: e in ALLOWED_TRANSFER_STRATEGIES, error=f"{get_error_message(ALLOWED_TRANSFER_STRATEGIES)} must be chosen"), + Optional("subclustering_threshold"): And( + float, + lambda x: x >= 0.0, + error="Should be of type 'float' and >= 0.0"), + Optional("subclustering_minimum_improvement"): And( + float, + lambda x: x >= 0.0, + error="Should be of type 'float' and >= 0.0"), Optional("cluster_swap_rtol"): And( float, lambda x: x > 0.0, diff --git a/tests/unit/Execution/test_lbs_clustering_transfer_strategy.py b/tests/unit/Execution/test_lbs_clustering_transfer_strategy.py index 08ce95c9f..b1398d7cd 100644 --- a/tests/unit/Execution/test_lbs_clustering_transfer_strategy.py +++ b/tests/unit/Execution/test_lbs_clustering_transfer_strategy.py @@ -139,7 +139,8 @@ def test_lbs_clustering_transfer_strategy_execute_cluster_swaps(self): self.assertEqual( self.clustering_transfer_strategy.execute(known_peers=self.known_peers, phase=self.phase, - ave_load=ave_load), + ave_load=ave_load, + max_load=2.5), (0,len(rank_list) - 1,0) ) @@ -233,17 +234,21 @@ def test_lbs_clustering_transfer_strategy_iterate_subclusters(self): # Test that non deterministic execute function runs assert isinstance( - clustering_transfer_strategy_non_det.execute(known_peers=known_peers, - phase=phase, - ave_load=ave_load), + clustering_transfer_strategy_non_det.execute( + known_peers=known_peers, + phase=phase, + ave_load=ave_load, + max_load=101), tuple) # Test that deterministic execute function is as expected - self.assertEqual( - clustering_transfer_strategy.execute(known_peers=known_peers, - phase=phase, - ave_load=ave_load)[2], - len(rank_list)) 
+ self.assertLessEqual( + clustering_transfer_strategy.execute( + known_peers=known_peers, + phase=phase, + ave_load=ave_load, + max_load=101)[1], + 1) if __name__ == "__main__": diff --git a/tests/unit/IO/test_configuration_validator.py b/tests/unit/IO/test_configuration_validator.py index f76b0a117..d32dad16b 100644 --- a/tests/unit/IO/test_configuration_validator.py +++ b/tests/unit/IO/test_configuration_validator.py @@ -305,5 +305,27 @@ def test_config_validator_wrong_separate_subclustering(self): ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main() self.assertEqual(err.exception.args[0], "Key 'parameters' error:\nKey 'separate_subclustering' error:\n'incorrect' should be instance of 'bool'") + def test_config_validator_correct_subclustering_filters(self): + with open(os.path.join(self.config_dir, "conf_correct_subclustering_filters.yml"), "rt", encoding="utf-8") as config_file: + yaml_str = config_file.read() + configuration = yaml.safe_load(yaml_str) + ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main() + + def test_config_validator_wrong_subclustering_minimum_improvement(self): + with open(os.path.join(self.config_dir, "conf_wrong_subclustering_minimum_improvement.yml"), "rt", encoding="utf-8") as config_file: + yaml_str = config_file.read() + configuration = yaml.safe_load(yaml_str) + with self.assertRaises(SchemaError) as err: + ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main() + self.assertEqual(err.exception.args[0], "Should be of type 'float' and >= 0.0") + + def test_config_validator_wrong_subclustering_threshold(self): + with open(os.path.join(self.config_dir, "conf_wrong_subclustering_threshold.yml"), "rt", encoding="utf-8") as config_file: + yaml_str = config_file.read() + configuration = yaml.safe_load(yaml_str) + with self.assertRaises(SchemaError) as err: + ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main() + 
self.assertEqual(err.exception.args[0], "Should be of type 'float' and >= 0.0") + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/config/conf_correct_subclustering_filters.yml b/tests/unit/config/conf_correct_subclustering_filters.yml new file mode 100644 index 000000000..6321e0262 --- /dev/null +++ b/tests/unit/config/conf_correct_subclustering_filters.yml @@ -0,0 +1,35 @@ +# Specify input +from_data: + data_stem: ../data/synthetic_lb_data/data + phase_ids: + - 0 +check_schema: false + +# Specify work model +work_model: + name: AffineCombination + parameters: + alpha: 1.0 + beta: 0.0 + gamma: 0.0 + +# Specify algorithm +algorithm: + name: InformAndTransfer + phase_id: 0 + parameters: + n_iterations: 4 + n_rounds: 2 + fanout: 2 + order_strategy: arbitrary + transfer_strategy: Clustering + max_subclusters: 10 + subclustering_threshold: 0.2 + subclustering_minimum_improvement: 0.3 + criterion: Tempered + max_objects_per_transfer: 32 + deterministic_transfer: true + +# Specify output +output_dir: ../output +output_file_stem: output_file diff --git a/tests/unit/config/conf_wrong_subclustering_minimum_improvement.yml b/tests/unit/config/conf_wrong_subclustering_minimum_improvement.yml new file mode 100644 index 000000000..cb1b6aac6 --- /dev/null +++ b/tests/unit/config/conf_wrong_subclustering_minimum_improvement.yml @@ -0,0 +1,34 @@ +# Specify input +from_data: + data_stem: ../data/synthetic_lb_data/data + phase_ids: + - 0 +check_schema: false + +# Specify work model +work_model: + name: AffineCombination + parameters: + alpha: 1.0 + beta: 0.0 + gamma: 0.0 + +# Specify algorithm +algorithm: + name: InformAndTransfer + phase_id: 0 + parameters: + n_iterations: 4 + n_rounds: 2 + fanout: 2 + order_strategy: arbitrary + transfer_strategy: Clustering + max_subclusters: 10 + subclustering_minimum_improvement: -1 + criterion: Tempered + max_objects_per_transfer: 32 + deterministic_transfer: true + +# Specify output +output_dir: ../output +output_file_stem: 
output_file diff --git a/tests/unit/config/conf_wrong_subclustering_threshold.yml b/tests/unit/config/conf_wrong_subclustering_threshold.yml new file mode 100644 index 000000000..9b78cd9ea --- /dev/null +++ b/tests/unit/config/conf_wrong_subclustering_threshold.yml @@ -0,0 +1,34 @@ +# Specify input +from_data: + data_stem: ../data/synthetic_lb_data/data + phase_ids: + - 0 +check_schema: false + +# Specify work model +work_model: + name: AffineCombination + parameters: + alpha: 1.0 + beta: 0.0 + gamma: 0.0 + +# Specify algorithm +algorithm: + name: InformAndTransfer + phase_id: 0 + parameters: + n_iterations: 4 + n_rounds: 2 + fanout: 2 + order_strategy: arbitrary + transfer_strategy: Clustering + max_subclusters: 10 + subclustering_threshold: -1 + criterion: Tempered + max_objects_per_transfer: 32 + deterministic_transfer: true + +# Specify output +output_dir: ../output +output_file_stem: output_file