Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#484: Keep tempered algorithm in sync with implementation in vt #485

Merged
2 changes: 2 additions & 0 deletions config/challenging-toy-hundreds-tasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ algorithm:
fanout: 4
order_strategy: arbitrary
transfer_strategy: Clustering
max_subclusters: 10
cluster_swap_rtol: 0.05
criterion: Tempered
max_objects_per_transfer: 500
deterministic_transfer: false
Expand Down
3 changes: 0 additions & 3 deletions src/lbaf/Execution/lbsAlgorithmBase.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import abc
import os

from ..import PROJECT_PATH
from ..IO.lbsStatistics import compute_function_statistics
from ..Model.lbsRank import Rank
from ..Model.lbsPhase import Phase
Expand Down Expand Up @@ -95,8 +94,6 @@
# pylint:enable=W0641:possibly-unused-variable,C0415:import-outside-toplevel

# Ensure that algorithm name is valid
algorithm = locals()[algorithm_name + "Algorithm"]
return algorithm(work_model, parameters, logger, rank_qoi, object_qoi)
try:
# Instantiate and return object
algorithm = locals()[algorithm_name + "Algorithm"]
Expand Down Expand Up @@ -250,4 +247,4 @@
:param: a_min_max: possibly empty list of optimal arrangements.
"""

pass

Check warning on line 250 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unnecessary pass statement (unnecessary-pass)
18 changes: 14 additions & 4 deletions src/lbaf/Execution/lbsClusteringTransferStrategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
# Call superclass init
super(ClusteringTransferStrategy, self).__init__(criterion, parameters, lgr)

# Initialize maximum number of subclusters
self._max_subclusters = parameters.get("max_subclusters", math.inf)
self._logger.info(
f"Maximum number of visited subclusters: {self._max_subclusters}")

# Initialize cluster swap relative threshold
self._cluster_swap_rtol = parameters.get("cluster_swap_rtol",0.05)
self._logger.info(
Expand Down Expand Up @@ -59,7 +64,7 @@

# Build dict of clusters with their load
n_inspect, subclusters = 0, {}
for i, v in enumerate(clusters):

Check warning on line 67 in src/lbaf/Execution/lbsClusteringTransferStrategy.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused variable 'i' (unused-variable)
# Determine maximum subcluster size
n_o = min(self._max_objects_per_transfer, (n_o_sub := len(v)))
self._logger.debug(
Expand All @@ -71,7 +76,7 @@
combinations(v, p)
for p in range(1, n_o + 1)) if self._deterministic_transfer else (
tuple(random.sample(v, p))
for p in nr.binomial(n_o, 0.5, min(n_o, 10)))):
for p in nr.binomial(n_o, 0.5, min(n_o, self._max_subclusters)))):
# Reject subclusters overshooting within relative tolerance
reach_load = rank_load - sum([o.get_load() for o in c])
if reach_load < (1.0 - self._cluster_swap_rtol) * self._average_load:
Expand All @@ -92,13 +97,14 @@
"""Perform object transfer stage."""
# Initialize transfer stage
self._initialize_transfer_stage(ave_load)
n_swaps, n_swap_tries, n_sub_transfers, n_sub_tries = 0, 0, 0, 0
n_swaps, n_swap_tries = 0, 0
n_sub_skipped, n_sub_transfers, n_sub_tries = 0, 0, 0

# Iterate over ranks
ranks = phase.get_ranks()
rank_targets = self._get_ranks_to_traverse(ranks, known_peers)
for r_src, targets in rank_targets.items():
# Cluster migratiable objects on source rank
# Cluster migratable objects on source rank
clusters_src = self.__build_rank_clusters(r_src, True)
self._logger.debug(
f"Constructed {len(clusters_src)} migratable clusters on source rank {r_src.get_id()}")
Expand All @@ -116,7 +122,7 @@
self._logger.debug(
f"Constructed {len(clusters_try)} migratable clusters on target rank {r_try.get_id()}")

# Iterate over potential targets to try to swap clusters
# Iterate over source clusters
for k_src, o_src in clusters_src.items():
# Iterate over target clusters
for k_try, o_try in clusters_try.items():
Expand Down Expand Up @@ -147,6 +153,7 @@

# In non-deterministic case skip subclustering when swaps passed
if not self._deterministic_transfer:
n_sub_skipped += 1
continue

# Iterate over subclusters only when no swaps were possible
Expand Down Expand Up @@ -202,6 +209,9 @@
if n_sub_tries:
self._logger.info(
f"Transferred {n_sub_transfers} subcluster amongst {n_sub_tries} tries ({100 * n_sub_transfers / n_sub_tries:.2f}%)")
if n_sub_skipped:
self._logger.info(
f"Skipped subclustering for {n_sub_skipped} ranks ({100 * n_sub_skipped / len(ranks):.2f}%)")

# Return object transfer counts
return len(ranks) - len(rank_targets), self._n_transfers, self._n_rejects
6 changes: 5 additions & 1 deletion src/lbaf/IO/lbsConfigurationValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ def __init__(self, config_to_validate: dict, logger: Logger):
Optional("cluster_swap_rtol"): And(
float,
lambda x: x > 0.0,
error="Should be of type 'float' and magnitude > 0.0"),
error="Should be of type 'float' and > 0.0"),
Optional("max_subclusters"): And(
int,
lambda x: x > 0.0,
error="Should be of type 'int' and > 0"),
cwschilly marked this conversation as resolved.
Show resolved Hide resolved
"criterion": And(
str,
lambda f: f in ALLOWED_CRITERIA,
Expand Down
1 change: 1 addition & 0 deletions tests/unit/config/conf_correct_clustering_set_tol.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ algorithm:
fanout: 2
order_strategy: arbitrary
transfer_strategy: Clustering
max_subclusters: 10
cluster_swap_rtol: 0.07
criterion: Tempered
max_objects_per_transfer: 32
Expand Down
44 changes: 44 additions & 0 deletions tests/unit/config/conf_wrong_max_subclusters_mag.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Specify input
from_data:
data_stem: ../data/synthetic_lb_data/data
phase_ids:
- 0
check_schema: false

# Specify work model
work_model:
name: AffineCombination
parameters:
alpha: 1.0
beta: 0.0
gamma: 0.0

# Specify algorithm
algorithm:
name: InformAndTransfer
phase_id: 0
parameters:
n_iterations: 4
n_rounds: 2
fanout: 2
order_strategy: arbitrary
transfer_strategy: Clustering
max_subclusters: -2
cluster_swap_rtol: 0.07
criterion: Tempered
max_objects_per_transfer: 32
deterministic_transfer: true

# Specify output
output_dir: ../output
output_file_stem: output_file
visualization:
x_ranks: 2
y_ranks: 2
z_ranks: 1
object_jitter: 0.5
rank_qoi: load
object_qoi: load
force_continuous_object_qoi: true
output_visualization_dir: ../output
output_visualization_file_stem: output_file
44 changes: 44 additions & 0 deletions tests/unit/config/conf_wrong_max_subclusters_type.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Specify input
from_data:
data_stem: ../data/synthetic_lb_data/data
phase_ids:
- 0
check_schema: false

# Specify work model
work_model:
name: AffineCombination
parameters:
alpha: 1.0
beta: 0.0
gamma: 0.0

# Specify algorithm
algorithm:
name: InformAndTransfer
phase_id: 0
parameters:
n_iterations: 4
n_rounds: 2
fanout: 2
order_strategy: arbitrary
transfer_strategy: Clustering
max_subclusters: 10.0
cluster_swap_rtol: 0.07
criterion: Tempered
max_objects_per_transfer: 32
deterministic_transfer: true

# Specify output
output_dir: ../output
output_file_stem: output_file
visualization:
x_ranks: 2
y_ranks: 2
z_ranks: 1
object_jitter: 0.5
rank_qoi: load
object_qoi: load
force_continuous_object_qoi: true
output_visualization_dir: ../output
output_visualization_file_stem: output_file
26 changes: 24 additions & 2 deletions tests/unit/test_configuration_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,37 @@ def test_config_validator_wrong_clustering_set_tol_type(self):
configuration = yaml.safe_load(yaml_str)
with self.assertRaises(SchemaError) as err:
ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main()
self.assertEqual(err.exception.args[0], "Should be of type 'float' and magnitude > 0.0")
self.assertEqual(err.exception.args[0], "Should be of type 'float' and > 0.0")

def test_config_validator_wrong_clustering_set_tol_mag(self):
with open(os.path.join(self.config_dir, "conf_wrong_clustering_set_tol_mag.yml"), "rt", encoding="utf-8") as config_file:
yaml_str = config_file.read()
configuration = yaml.safe_load(yaml_str)
with self.assertRaises(SchemaError) as err:
ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main()
self.assertEqual(err.exception.args[0], "Should be of type 'float' and magnitude > 0.0")
self.assertEqual(err.exception.args[0], "Should be of type 'float' and > 0.0")

def test_config_validator_correct_clustering_target_imb(self):
with open(os.path.join(self.config_dir, "conf_correct_clustering_target_imb.yml"), "rt", encoding="utf-8") as config_file:
yaml_str = config_file.read()
configuration = yaml.safe_load(yaml_str)
ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main()

def test_config_validator_wrong_max_subclusters_type(self):
with open(os.path.join(self.config_dir, "conf_wrong_max_subclusters_type.yml"), "rt", encoding="utf-8") as config_file:
yaml_str = config_file.read()
configuration = yaml.safe_load(yaml_str)
with self.assertRaises(SchemaError) as err:
ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main()
self.assertEqual(err.exception.args[0], "Should be of type 'int' and > 0")

def test_config_validator_wrong_max_subclusters_mag(self):
with open(os.path.join(self.config_dir, "conf_wrong_max_subclusters_mag.yml"), "rt", encoding="utf-8") as config_file:
yaml_str = config_file.read()
configuration = yaml.safe_load(yaml_str)
with self.assertRaises(SchemaError) as err:
ConfigurationValidator(config_to_validate=configuration, logger=get_logger()).main()
self.assertEqual(err.exception.args[0], "Should be of type 'int' and > 0")

def test_config_validator_correct_clustering_target_imb(self):
with open(os.path.join(self.config_dir, "conf_correct_clustering_target_imb.yml"), "rt", encoding="utf-8") as config_file:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def setUp(self):
"order_strategy": "element_id",
"transfer_strategy": "Recursive",
"criterion": "Tempered",
"max_subclusters": 15,
"max_objects_per_transfer": 8,
"deterministic_transfer": True
},
Expand Down
Loading