Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#431: Add a target imbalance threshold to yaml #455

Merged
merged 2 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from .lbsTransferStrategyBase import TransferStrategyBase
from ..Model.lbsRank import Rank
from ..Model.lbsMessage import Message
from ..IO.lbsStatistics import print_function_statistics, min_Hamming_distance


class InformAndTransferAlgorithm(AlgorithmBase):
Expand Down Expand Up @@ -71,6 +70,9 @@ def __init__(
# No information about peers is known initially
self.__known_peers = {}

# Optional parameter to conclude the iteration process iff imbalance is below the target threshold
self.__target_imbalance = parameters.get("target_imbalance", 0.0)
ppebay marked this conversation as resolved.
Show resolved Hide resolved

def get_known_peers(self):
"""Return all known peers."""
return self.__known_peers
Expand Down Expand Up @@ -207,11 +209,11 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict
self._logger.info(f"Iteration complete ({n_ignored} skipped ranks)")

# Compute and report iteration work statistics
print_function_statistics(
self._rebalanced_phase.get_ranks(),
lambda x: self._work_model.compute(x), # pylint:disable=W0108:unnecessary-lambda
f"iteration {i + 1} rank work",
self._logger)
stats = print_function_statistics(
self._rebalanced_phase.get_ranks(),
lambda x: self._work_model.compute(x), # pylint:disable=W0108:unnecessary-lambda
f"iteration {i + 1} rank work",
self._logger)

# Update run distributions and statistics
self._update_distributions_and_statistics(distributions, statistics)
Expand All @@ -231,5 +233,10 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict
f"Iteration {i + 1} minimum Hamming distance to optimal arrangements: {hd_min}")
statistics["minimum Hamming distance to optimum"].append(hd_min)

# Check if the current imbalance is within the target_imbalance range
if stats.statistics["imbalance"] <= self.__target_imbalance:
self._logger.info(f"Reached target imbalance of {self.__target_imbalance} after {i + 1} iterations.")
break

# Report final mapping in debug mode
self._report_final_mapping(self._logger)
1 change: 1 addition & 0 deletions src/lbaf/IO/lbsConfigurationValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def __init__(self, config_to_validate: dict, logger: Logger):
"phase_id": int,
"parameters": {
"n_iterations": int,
Optional("target_imbalance"): float,
"n_rounds": int,
"fanout": int,
"order_strategy": And(
Expand Down
41 changes: 41 additions & 0 deletions tests/unit/config/conf_correct_clustering_target_imb.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Specify input
from_data:
data_stem: ../data/synthetic_lb_data/data
phase_ids:
- 0
check_schema: False

# Specify work model
work_model:
name: AffineCombination
parameters:
alpha: 1.0
beta: 0.0
gamma: 0.0

# Specify algorithm
algorithm:
name: InformAndTransfer
phase_id: 0
parameters:
n_iterations: 4
target_imbalance: 0.05
n_rounds: 2
fanout: 2
order_strategy: arbitrary
transfer_strategy: Clustering
criterion: Tempered
max_objects_per_transfer: 32
deterministic_transfer: true

# Specify output
output_dir: ../output
output_file_stem: output_file
LBAF_Viz:
x_ranks: 2
y_ranks: 2
z_ranks: 1
object_jitter: 0.5
rank_qoi: load
object_qoi: load
force_continuous_object_qoi: True
5 changes: 5 additions & 0 deletions tests/unit/test_configuration_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ def test_config_validator_wrong_clustering_set_tol_mag(self):
ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main()
self.assertEqual(err.exception.args[0], "Should be of type 'float' and magnitude > 0.0")

def test_config_validator_correct_clustering_target_imb(self):
with open(os.path.join(self.config_dir, "conf_correct_clustering_target_imb.yml"), "rt", encoding="utf-8") as config_file:
yaml_str = config_file.read()
configuration = yaml.safe_load(yaml_str)
ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main()

if __name__ == "__main__":
unittest.main()