diff --git a/config/challenging-toy-hundreds-tasks.yaml b/config/challenging-toy-hundreds-tasks.yaml index 15d2cbed5..2c65c9e3c 100644 --- a/config/challenging-toy-hundreds-tasks.yaml +++ b/config/challenging-toy-hundreds-tasks.yaml @@ -26,6 +26,8 @@ algorithm: fanout: 4 order_strategy: arbitrary transfer_strategy: Clustering + max_subclusters: 10 + cluster_swap_rtol: 0.05 criterion: Tempered max_objects_per_transfer: 500 deterministic_transfer: false diff --git a/src/lbaf/Execution/lbsAlgorithmBase.py b/src/lbaf/Execution/lbsAlgorithmBase.py index 70a4ee2d3..4caec2d4c 100644 --- a/src/lbaf/Execution/lbsAlgorithmBase.py +++ b/src/lbaf/Execution/lbsAlgorithmBase.py @@ -1,7 +1,6 @@ import abc import os -from ..import PROJECT_PATH from ..IO.lbsStatistics import compute_function_statistics from ..Model.lbsRank import Rank from ..Model.lbsPhase import Phase @@ -95,8 +94,6 @@ def factory( # pylint:enable=W0641:possibly-unused-variable,C0415:import-outside-toplevel # Ensure that algorithm name is valid - algorithm = locals()[algorithm_name + "Algorithm"] - return algorithm(work_model, parameters, logger, rank_qoi, object_qoi) try: # Instantiate and return object algorithm = locals()[algorithm_name + "Algorithm"] diff --git a/src/lbaf/Execution/lbsClusteringTransferStrategy.py b/src/lbaf/Execution/lbsClusteringTransferStrategy.py index 989205571..a9c7e18d3 100644 --- a/src/lbaf/Execution/lbsClusteringTransferStrategy.py +++ b/src/lbaf/Execution/lbsClusteringTransferStrategy.py @@ -23,6 +23,11 @@ def __init__(self, criterion, parameters: dict, lgr: Logger): # Call superclass init super(ClusteringTransferStrategy, self).__init__(criterion, parameters, lgr) + # Initialize maximum number of subclusters + self._max_subclusters = parameters.get("max_subclusters", math.inf) + self._logger.info( + f"Maximum number of visited subclusters: {self._max_subclusters}") + # Initialize cluster swap relative threshold self._cluster_swap_rtol = parameters.get("cluster_swap_rtol",0.05) self._logger.info( @@ -71,7 +76,7 @@ def __build_rank_subclusters(self, clusters, rank_load): combinations(v, p) for p in range(1, n_o + 1)) if self._deterministic_transfer else ( tuple(random.sample(v, p)) - for p in nr.binomial(n_o, 0.5, min(n_o, 10)))): + for p in nr.binomial(n_o, 0.5, min(n_o, self._max_subclusters)))): # Reject subclusters overshooting within relative tolerance reach_load = rank_load - sum([o.get_load() for o in c]) if reach_load < (1.0 - self._cluster_swap_rtol) * self._average_load: @@ -92,13 +97,14 @@ def execute(self, known_peers, phase: Phase, ave_load: float): """Perform object transfer stage.""" # Initialize transfer stage self._initialize_transfer_stage(ave_load) - n_swaps, n_swap_tries, n_sub_transfers, n_sub_tries = 0, 0, 0, 0 + n_swaps, n_swap_tries = 0, 0 + n_sub_skipped, n_sub_transfers, n_sub_tries = 0, 0, 0 # Iterate over ranks ranks = phase.get_ranks() rank_targets = self._get_ranks_to_traverse(ranks, known_peers) for r_src, targets in rank_targets.items(): - # Cluster migratiable objects on source rank + # Cluster migratable objects on source rank clusters_src = self.__build_rank_clusters(r_src, True) self._logger.debug( f"Constructed {len(clusters_src)} migratable clusters on source rank {r_src.get_id()}") @@ -116,7 +122,7 @@ def execute(self, known_peers, phase: Phase, ave_load: float): self._logger.debug( f"Constructed {len(clusters_try)} migratable clusters on target rank {r_try.get_id()}") - # Iterate over potential targets to try to swap clusters + # Iterate over source clusters for k_src, o_src in clusters_src.items(): # Iterate over target clusters for k_try, o_try in clusters_try.items(): @@ -147,6 +153,7 @@ def execute(self, known_peers, phase: Phase, ave_load: float): # In non-deterministic case skip subclustering when swaps passed if not self._deterministic_transfer: + n_sub_skipped += 1 continue # Iterate over subclusters only when no swaps were possible @@ -202,6 +209,9 @@ def execute(self, known_peers, phase: Phase, ave_load: float): if n_sub_tries: self._logger.info( f"Transferred {n_sub_transfers} subcluster amongst {n_sub_tries} tries ({100 * n_sub_transfers / n_sub_tries:.2f}%)") + if n_sub_skipped: + self._logger.info( + f"Skipped subclustering for {n_sub_skipped} ranks ({100 * n_sub_skipped / len(ranks):.2f}%)") # Return object transfer counts return len(ranks) - len(rank_targets), self._n_transfers, self._n_rejects diff --git a/src/lbaf/IO/lbsConfigurationValidator.py b/src/lbaf/IO/lbsConfigurationValidator.py index c512e35e9..f93d7032d 100644 --- a/src/lbaf/IO/lbsConfigurationValidator.py +++ b/src/lbaf/IO/lbsConfigurationValidator.py @@ -159,7 +159,11 @@ def __init__(self, config_to_validate: dict, logger: Logger): Optional("cluster_swap_rtol"): And( float, lambda x: x > 0.0, - error="Should be of type 'float' and magnitude > 0.0"), + error="Should be of type 'float' and > 0.0"), + Optional("max_subclusters"): And( + int, + lambda x: x > 0.0, + error="Should be of type 'int' and > 0"), "criterion": And( str, lambda f: f in ALLOWED_CRITERIA, diff --git a/tests/unit/config/conf_correct_clustering_set_tol.yml b/tests/unit/config/conf_correct_clustering_set_tol.yml index 27779afab..cda2975c8 100644 --- a/tests/unit/config/conf_correct_clustering_set_tol.yml +++ b/tests/unit/config/conf_correct_clustering_set_tol.yml @@ -23,6 +23,7 @@ algorithm: fanout: 2 order_strategy: arbitrary transfer_strategy: Clustering + max_subclusters: 10 cluster_swap_rtol: 0.07 criterion: Tempered max_objects_per_transfer: 32 diff --git a/tests/unit/config/conf_wrong_max_subclusters_mag.yml b/tests/unit/config/conf_wrong_max_subclusters_mag.yml new file mode 100644 index 000000000..c7fad42fc --- /dev/null +++ b/tests/unit/config/conf_wrong_max_subclusters_mag.yml @@ -0,0 +1,44 @@ +# Specify input +from_data: + data_stem: ../data/synthetic_lb_data/data + phase_ids: + - 0 +check_schema: false + +# Specify work model +work_model: + name: AffineCombination + parameters: + alpha: 1.0 + beta: 0.0 + gamma: 0.0 + +# Specify algorithm +algorithm: + name: InformAndTransfer + phase_id: 0 + parameters: + n_iterations: 4 + n_rounds: 2 + fanout: 2 + order_strategy: arbitrary + transfer_strategy: Clustering + max_subclusters: -2 + cluster_swap_rtol: 0.07 + criterion: Tempered + max_objects_per_transfer: 32 + deterministic_transfer: true + +# Specify output +output_dir: ../output +output_file_stem: output_file +visualization: + x_ranks: 2 + y_ranks: 2 + z_ranks: 1 + object_jitter: 0.5 + rank_qoi: load + object_qoi: load + force_continuous_object_qoi: true + output_visualization_dir: ../output + output_visualization_file_stem: output_file diff --git a/tests/unit/config/conf_wrong_max_subclusters_type.yml b/tests/unit/config/conf_wrong_max_subclusters_type.yml new file mode 100644 index 000000000..4b7d4b9d8 --- /dev/null +++ b/tests/unit/config/conf_wrong_max_subclusters_type.yml @@ -0,0 +1,44 @@ +# Specify input +from_data: + data_stem: ../data/synthetic_lb_data/data + phase_ids: + - 0 +check_schema: false + +# Specify work model +work_model: + name: AffineCombination + parameters: + alpha: 1.0 + beta: 0.0 + gamma: 0.0 + +# Specify algorithm +algorithm: + name: InformAndTransfer + phase_id: 0 + parameters: + n_iterations: 4 + n_rounds: 2 + fanout: 2 + order_strategy: arbitrary + transfer_strategy: Clustering + max_subclusters: 10.0 + cluster_swap_rtol: 0.07 + criterion: Tempered + max_objects_per_transfer: 32 + deterministic_transfer: true + +# Specify output +output_dir: ../output +output_file_stem: output_file +visualization: + x_ranks: 2 + y_ranks: 2 + z_ranks: 1 + object_jitter: 0.5 + rank_qoi: load + object_qoi: load + force_continuous_object_qoi: true + output_visualization_dir: ../output + output_visualization_file_stem: output_file diff --git a/tests/unit/test_configuration_validator.py b/tests/unit/test_configuration_validator.py index 0a051b5a6..d8d4f89e5 100644 --- a/tests/unit/test_configuration_validator.py +++ b/tests/unit/test_configuration_validator.py @@ -222,7 +222,7 @@ def test_config_validator_wrong_clustering_set_tol_type(self): configuration = yaml.safe_load(yaml_str) with self.assertRaises(SchemaError) as err: ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main() - self.assertEqual(err.exception.args[0], "Should be of type 'float' and magnitude > 0.0") + self.assertEqual(err.exception.args[0], "Should be of type 'float' and > 0.0") def test_config_validator_wrong_clustering_set_tol_mag(self): with open(os.path.join(self.config_dir, "conf_wrong_clustering_set_tol_mag.yml"), "rt", encoding="utf-8") as config_file: @@ -230,7 +230,29 @@ def test_config_validator_wrong_clustering_set_tol_mag(self): configuration = yaml.safe_load(yaml_str) with self.assertRaises(SchemaError) as err: ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main() - self.assertEqual(err.exception.args[0], "Should be of type 'float' and magnitude > 0.0") + self.assertEqual(err.exception.args[0], "Should be of type 'float' and > 0.0") + + def test_config_validator_correct_clustering_target_imb(self): + with open(os.path.join(self.config_dir, "conf_correct_clustering_target_imb.yml"), "rt", encoding="utf-8") as config_file: + yaml_str = config_file.read() + configuration = yaml.safe_load(yaml_str) + ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main() + + def test_config_validator_wrong_max_subclusters_type(self): + with open(os.path.join(self.config_dir, "conf_wrong_max_subclusters_type.yml"), "rt", encoding="utf-8") as config_file: + yaml_str = config_file.read() + configuration = yaml.safe_load(yaml_str) + with self.assertRaises(SchemaError) as err: + ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main() + self.assertEqual(err.exception.args[0], "Should be of type 'int' and > 0") + + def test_config_validator_wrong_max_subclusters_mag(self): + with open(os.path.join(self.config_dir, "conf_wrong_max_subclusters_mag.yml"), "rt", encoding="utf-8") as config_file: + yaml_str = config_file.read() + configuration = yaml.safe_load(yaml_str) + with self.assertRaises(SchemaError) as err: + ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main() + self.assertEqual(err.exception.args[0], "Should be of type 'int' and > 0") def test_config_validator_correct_clustering_target_imb(self): with open(os.path.join(self.config_dir, "conf_correct_clustering_target_imb.yml"), "rt", encoding="utf-8") as config_file: diff --git a/tests/test_lbs_inform_and_transfer_algorithm.py b/tests/unit/test_lbs_inform_and_transfer_algorithm.py similarity index 98% rename from tests/test_lbs_inform_and_transfer_algorithm.py rename to tests/unit/test_lbs_inform_and_transfer_algorithm.py index 1fac9ac92..156d41bf5 100644 --- a/tests/test_lbs_inform_and_transfer_algorithm.py +++ b/tests/unit/test_lbs_inform_and_transfer_algorithm.py @@ -24,6 +24,7 @@ def setUp(self): "order_strategy": "element_id", "transfer_strategy": "Recursive", "criterion": "Tempered", + "max_subclusters": 15, "max_objects_per_transfer": 8, "deterministic_transfer": True },