Skip to content

Commit

Permalink
#566: complete lb iterations support (#570)
Browse files Browse the repository at this point in the history
* #566: use lower case in function makes as per PEP

* #566: fixed imports from PR569 in cases when lbaf package not installed

* #566: fixed incorrect comment

* #566: Python style: do not use LB but lb

* #566: fixed instances of LB in files

* #566: LB iterations persist as sub-indexed phases

* #566: old style typing needed for 3.8 to pass

* #566: fixed tests for new syntax

* #566: fixed more test errors

* #566: restored defaut conf.yaml

* #566: progress checkpoint to exercise CI -- iterations passed to writer

* #566: whitespace cleanup

* #566: fixed typo

* #566: completed implementation

* #566: fixed bug that prevented supporting online compatibility

* #566: fixed whitespace cleanup

* #566: Ensure proper retention in correct phase of lb iterations in offline mode

* #566: resolve pylint errors

* #566: proper initialization

* #566: factor out identical rank copying and fixed duplicate code

* #566: no typing inside implementations

* #566: reorganized code and fixed lb iterations comm

* #566: whitespace cleanup

* #566: fix communications variable name

* #566: fixed missing communications

* #566: removed debug statements

* #566: style

* #566: removed forgotten test routine bypass

* #566: fixed bug

* #566: whitespace cleanup and other improvements

---------

Co-authored-by: Caleb Schilly <[email protected]>
Co-authored-by: Pierre Pebay <[email protected]>
  • Loading branch information
3 people authored Dec 18, 2024
1 parent 1417345 commit f59b8d4
Show file tree
Hide file tree
Showing 30 changed files with 276 additions and 166 deletions.
2 changes: 1 addition & 1 deletion config/ccm-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ write_JSON:
compressed: true
suffix: json
communications: true
offline_LB_compatible: true
offline_lb_compatible: true
visualization:
x_ranks: 2
y_ranks: 1
Expand Down
4 changes: 2 additions & 2 deletions config/conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ algorithm:
n_iterations: 8
n_rounds: 2
fanout: 2
order_strategy: arbitrary
order_strategy: element_id
transfer_strategy: Recursive
criterion: Tempered
max_objects_per_transfer: 8
Expand All @@ -35,5 +35,5 @@ write_JSON:
compressed: false
suffix: json
communications: true
offline_LB_compatible: false
offline_lb_compatible: true
lb_iterations: true
2 changes: 1 addition & 1 deletion config/permute.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ write_JSON:
compressed: false
suffix: json
communications: true
offline_LB_compatible: false
offline_lb_compatible: false
visualization:
x_ranks: 2
y_ranks: 2
Expand Down
3 changes: 2 additions & 1 deletion config/synthetic-blocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@ write_JSON:
compressed: false
suffix: json
communications: true
offline_LB_compatible: true
offline_lb_compatible: true
lb_iterations: true
2 changes: 1 addition & 1 deletion config/test-vt-tv.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ write_JSON:
compressed: False
suffix: json
communications: True
offline_LB_compatible: True
offline_lb_compatible: True
2 changes: 1 addition & 1 deletion config/user-defined-memory-toy-problem.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ write_JSON:
compressed: true
suffix: json
communications: true
offline_LB_compatible: true
offline_lb_compatible: true
45 changes: 21 additions & 24 deletions src/lbaf/Applications/LBAF_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@
import importlib
import yaml

from ..Model.lbsRank import Rank
from ..Model.lbsObject import Object

try:
import vttv
USING_VTTV = True
Expand All @@ -67,6 +64,8 @@
from lbaf.IO.lbsConfigurationValidator import ConfigurationValidator
from lbaf.IO.lbsVTDataReader import LoadReader
from lbaf.IO.lbsVTDataWriter import VTDataWriter
from lbaf.Model.lbsRank import Rank
from lbaf.Model.lbsObject import Object
from lbaf.Model.lbsPhase import Phase
from lbaf.Model.lbsWorkModelBase import WorkModelBase
from lbaf.Utils.lbsArgumentParser import PromptArgumentParser
Expand All @@ -78,7 +77,7 @@

class InternalParameters:
"""Represent the parameters used internally by a LBAF Application"""

# Private logger
__logger: Logger

# General input options
Expand Down Expand Up @@ -139,7 +138,7 @@ def init_parameters(self, config: dict, base_dir: str):
from_data = config.get("from_data")
if from_data is not None:
self.data_stem = from_data.get("data_stem")
# # get data directory (because data_stem includes file prefix)
# Get data directory because data_stem includes file prefix
data_dir = f"{os.sep}".join(self.data_stem.split(os.sep)[:-1])
file_prefix = self.data_stem.split(os.sep)[-1]
data_dir = abspath(data_dir, relative_to=base_dir)
Expand Down Expand Up @@ -204,7 +203,7 @@ def init_parameters(self, config: dict, base_dir: str):
for k_out, k_wrt, v_def in [
("json_output_suffix", "suffix", "json"),
("communications", "communications", False),
("offline_LB_compatible", "offline_LB_compatible", False),
("offline_lb_compatible", "offline_lb_compatible", False),
("lb_iterations", "lb_iterations", False)]:
self.json_params[k_out] = wrt_json.get(k_wrt, v_def)

Expand Down Expand Up @@ -422,9 +421,9 @@ def remove_get(name : str) -> str:
return name[4:] if name.startswith("get_") else name

r = Rank(self.__logger)
rank_qois = r.get_QOIs()
rank_qois = r.get_qois()
o = Object(seq_id=0)
object_qois = o.get_QOIs()
object_qois = o.get_qois()

# Print QOI based on verbosity level
if verbosity > 0:
Expand Down Expand Up @@ -570,17 +569,16 @@ def run(self, cfg=None, cfg_dir=None):
self.__logger)

# Execute runtime for specified phases
offline_LB_compatible = self.__parameters.json_params.get(
"offline_LB_compatible", False)
lb_iterations = self.__parameters.json_params.get(
"lb_iterations", False)
offline_lb_compatible = self.__parameters.json_params.get(
"offline_lb_compatible", False)
rebalanced_phase = runtime.execute(
self.__parameters.algorithm.get("phase_id", 0),
1 if offline_LB_compatible else 0)
1 if offline_lb_compatible else 0,
self.__parameters.json_params.get("lb_iterations", False))

# Instantiate phase to VT file writer when requested
if self.__json_writer:
if offline_LB_compatible:
if offline_lb_compatible:
# Add rebalanced phase when present
if not rebalanced_phase:
self.__logger.warning(
Expand All @@ -600,15 +598,14 @@ def run(self, cfg=None, cfg_dir=None):
# Insert rebalanced phase into dictionary of phases
phases[p_id] = rebalanced_phase

# Write all phases
# Write all phasesOA
self.__logger.info(
f"Writing all ({len(phases)}) phases for offline load-balancing")
self.__json_writer.write(phases)
else:
# Add new phase when load balancing when offline mode not selected
self.__logger.info(f"Creating rebalanced phase {phase_id}")
self.__json_writer.write(
{phase_id: rebalanced_phase})
self.__json_writer.write({phase_id: rebalanced_phase})

# Generate meshes and multimedia when requested
if self.__parameters.grid_size:
Expand All @@ -618,7 +615,7 @@ def run(self, cfg=None, cfg_dir=None):
"Grid size: {self.__parameters.grid_size} < {n_ranks}")
raise SystemExit(1)

# Call vttv visualization
# Call vt-tv visualization when requested
if USING_VTTV:
self.__logger.info("Calling vt-tv")

Expand All @@ -628,11 +625,11 @@ def run(self, cfg=None, cfg_dir=None):
for r in p.get_ranks():
rank_phases.setdefault(r.get_id(), {})
rank_phases[r.get_id()][p.get_id()] = r

ranks_json_str = []
for i in range(len(rank_phases.items())):
ranks_json_str.append(self.__json_writer._json_serializer((i, rank_phases[i])))

# Retrieve vt-tv parameters
vttv_params = {
"x_ranks": self.__parameters.grid_size[0],
"y_ranks": self.__parameters.grid_size[1],
Expand All @@ -643,20 +640,20 @@ def run(self, cfg=None, cfg_dir=None):
"save_meshes": self.__parameters.save_meshes,
"force_continuous_object_qoi": self.__parameters.continuous_object_qoi,
"output_visualization_dir": self.__parameters.output_dir,
"output_visualization_file_stem": self.__parameters.output_file_stem
}
"output_visualization_file_stem": self.__parameters.output_file_stem}

# Retrieve grid topology
num_ranks = (
self.__parameters.grid_size[0] *
self.__parameters.grid_size[1] *
self.__parameters.grid_size[2]
)
self.__parameters.grid_size[2] )

vttv.tvFromJson(ranks_json_str, str(vttv_params), num_ranks)

# Report on rebalanced phase when available
if rebalanced_phase:
l_stats, w_stats = self.__print_statistics(rebalanced_phase, "rebalanced", runtime.get_work_model())
l_stats, w_stats = self.__print_statistics(
rebalanced_phase, "rebalanced", runtime.get_work_model())
with open(
"imbalance.txt" if self.__parameters.output_dir is None
else os.path.join(
Expand Down
53 changes: 29 additions & 24 deletions src/lbaf/Execution/lbsAlgorithmBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
#
import abc
import os

Check warning on line 44 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused import os (unused-import)

Check warning on line 44 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused import os (unused-import)
from typing import Set
from typing import Set, List

Check warning on line 45 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused Set imported from typing (unused-import)

Check warning on line 45 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused Set imported from typing (unused-import)

from ..IO.lbsStatistics import compute_function_statistics
from ..Model.lbsRank import Rank

Check warning on line 48 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused Rank imported from Model.lbsRank (unused-import)

Check warning on line 48 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused Rank imported from Model.lbsRank (unused-import)
Expand All @@ -53,12 +53,17 @@

class AlgorithmBase:
"""An abstract base class of load/work balancing algorithms."""

__metaclass__ = abc.ABCMeta

_work_model: WorkModelBase
# Protected logger
_logger: Logger

# Concrete algorithms need a work model
_work_model: WorkModelBase=None

# Iterative algorithms are allowed to store load balancing iterations
_lb_iterations: List[Phase]

def __init__(self, work_model: WorkModelBase, parameters: dict, logger: Logger):
"""Class constructor.
Expand All @@ -82,10 +87,14 @@ def __init__(self, work_model: WorkModelBase, parameters: dict, logger: Logger):
self._logger.error("Could not create an algorithm without a dictionary of parameters")
raise SystemExit(1)

# Initially no phase is assigned for processing
# By default algorithms are not assumed to be iterative
self._lb_iterations = []

# Initially no phases are assigned for rebalancing
self._initial_phase = None
self._rebalanced_phase = None

# Save the initial communications data
# Keep track of phase communications
self._initial_communications = {}

# Map rank statistics to their respective computation methods
Expand All @@ -95,14 +104,18 @@ def __init__(self, work_model: WorkModelBase, parameters: dict, logger: Logger):
("ranks", lambda x: self._work_model.compute(x)): {

Check warning on line 104 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Lambda may not be necessary (unnecessary-lambda)

Check warning on line 104 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Lambda may not be necessary (unnecessary-lambda)
"total work": "sum"}}

def get_rebalanced_phase(self):
"""Return phased assigned for processing by algoritm."""
return self._rebalanced_phase

def get_initial_communications(self):
"""Return the initial phase communications."""
return self._initial_communications

def get_initial_phase(self):
"""Return initial phase."""
return self._initial_phase

def get_rebalanced_phase(self):
"""Return rebalanced phased."""
return self._rebalanced_phase

@staticmethod
def factory(
algorithm_name:str,
Expand Down Expand Up @@ -168,25 +181,17 @@ def _initialize(self, p_id, phases, statistics):
self._logger.error("Algorithm execution requires a dictionary of phases")
raise SystemExit(1)

# Set initial communications for given rank
self._initial_communications[p_id] = phases[p_id].get_communications()

# Create a new phase to preserve phase to be rebalanced
self._logger.info(f"Creating new phase {p_id} for rebalancing")
self._rebalanced_phase = Phase(self._logger, p_id)

# Try to copy ranks from phase to be rebalanced to processed one
# Try to retrieve and keep track of phase to be processed
try:
new_ranks: Set[Rank] = set()
for r in phases[p_id].get_ranks():
# Minimally instantiate rank and copy
new_r = Rank(self._logger)
new_r.copy(r)
new_ranks.add(new_r)
self._rebalanced_phase.set_ranks(new_ranks)
self._initial_phase = phases[p_id]
except Exception as err:
self._logger.error(f"No phase with index {p_id} is available for processing")
raise SystemExit(1) from err
self._initial_communications[p_id] = self._initial_phase.get_communications()

# Create storage for rebalanced phase
self._rebalanced_phase = Phase(self._logger, p_id)
self._rebalanced_phase.copy_ranks(self._initial_phase)
self._logger.info(
f"Processing phase {p_id} "
f"with {self._rebalanced_phase.get_number_of_objects()} objects "
Expand Down
2 changes: 1 addition & 1 deletion src/lbaf/Execution/lbsCriterionBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@

class CriterionBase:
"""An abstract base class of optimization criteria for LBAF execution."""

__metaclass__ = abc.ABCMeta

# Protected logger
_logger: Logger

def __init__(self, work_model: WorkModelBase, logger: Logger):
Expand Down
24 changes: 17 additions & 7 deletions src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@
#
import random
import time
from typing import Set
from logging import Logger

from .lbsAlgorithmBase import AlgorithmBase
from .lbsCriterionBase import CriterionBase
from .lbsTransferStrategyBase import TransferStrategyBase
from ..Model.lbsRank import Rank
from ..Model.lbsMessage import Message
from ..Model.lbsPhase import Phase
from ..IO.lbsStatistics import min_Hamming_distance, print_function_statistics


Expand Down Expand Up @@ -242,7 +244,8 @@ def execute(self, p_id: int, phases: list, statistics: dict, a_min_max):

# Execute transfer stage
n_ignored, n_transfers, n_rejects = self.__transfer_strategy.execute(
self.__known_peers, self._rebalanced_phase, statistics["average load"], statistics["maximum load"][-1])
self.__known_peers, self._rebalanced_phase, statistics[
"average load"], statistics["maximum load"][-1])
if (n_proposed := n_transfers + n_rejects):
self._logger.info(
f"Transferred {n_transfers} objects amongst {n_proposed} proposed "
Expand All @@ -264,15 +267,22 @@ def execute(self, p_id: int, phases: list, statistics: dict, a_min_max):
# Update run statistics
self._update_statistics(statistics)

# Compute current arrangement
arrangement = dict(sorted(
{o.get_id(): p.get_id()
for p in self._rebalanced_phase.get_ranks()
for o in p.get_objects()}.items())).values()
self._logger.debug(f"Iteration {i + 1} arrangement: {tuple(arrangement)}")
# Retain load balancing iteration as a phase with sub-index
lb_iteration = Phase(self._logger, p_id, None, i + 1)
lb_iteration.copy_ranks(self._rebalanced_phase)
lb_iteration.set_communications(self._initial_communications[p_id])
self._initial_phase.get_lb_iterations().append(lb_iteration)

# Report minimum Hamming distance when minimax optimum is available
if a_min_max:
# Compute current arrangement
arrangement = dict(sorted(
{o.get_id(): p.get_id()
for p in self._rebalanced_phase.get_ranks()
for o in p.get_objects()}.items())).values()
self._logger.debug(f"Iteration {i + 1} arrangement: {tuple(arrangement)}")

# Compute minimum distance from arrangement to optimum
hd_min = min_Hamming_distance(arrangement, a_min_max)
self._logger.info(
f"Iteration {i + 1} minimum Hamming distance to optimal arrangements: {hd_min}")
Expand Down
7 changes: 6 additions & 1 deletion src/lbaf/Execution/lbsRuntime.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def get_work_model(self):
"""Return runtime work model."""
return self.__work_model

def execute(self, p_id: int, phase_increment: int=0):
def execute(self, p_id: int, phase_increment: int=0, lb_iterations=False):

Check warning on line 124 in src/lbaf/Execution/lbsRuntime.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused argument 'lb_iterations' (unused-argument)

Check warning on line 124 in src/lbaf/Execution/lbsRuntime.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused argument 'lb_iterations' (unused-argument)
"""Execute runtime for single phase with given ID or multiple phases in selected range."""
# Execute load balancing algorithm
self.__logger.info(
Expand All @@ -135,6 +135,11 @@ def execute(self, p_id: int, phase_increment: int=0):

# Retrieve possibly null rebalanced phase and return it
if (lbp := self.__algorithm.get_rebalanced_phase()):
# Retain lb iterations with initial phase when it is replaced
if not phase_increment:
self.__algorithm.get_rebalanced_phase().set_lb_iterations(
self.__algorithm.get_initial_phase().get_lb_iterations())

# Increment rebalanced phase ID as requested
lbp.set_id((lbp_id := lbp.get_id() + phase_increment))

Expand Down
2 changes: 1 addition & 1 deletion src/lbaf/IO/lbsConfigurationValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def __init__(self, config_to_validate: dict, logger: Logger):
"compressed": bool,
Optional("suffix"): str,
Optional("communications"): bool,
Optional("offline_LB_compatible"): bool,
Optional("offline_lb_compatible"): bool,
Optional("lb_iterations"): bool},
})
self.__from_data = Schema({
Expand Down
Loading

0 comments on commit f59b8d4

Please sign in to comment.