diff --git a/src/lbaf/Applications/LBAF_app.py b/src/lbaf/Applications/LBAF_app.py index cb900fa7..a1204f01 100644 --- a/src/lbaf/Applications/LBAF_app.py +++ b/src/lbaf/Applications/LBAF_app.py @@ -86,6 +86,7 @@ class InternalParameters: # From data input options data_stem: Optional[str] = None + ranks_per_node : Optional[int] = 1 # From samplers input options n_ranks: Optional[int] = None @@ -132,10 +133,15 @@ def init_parameters(self, config: dict, base_dir: str): if param_key in self.__allowed_config_keys: self.__dict__[param_key] = param_val + # Assume 1 rank per node by default + self.ranks_per_node = 1 + # Parse data parameters if present from_data = config.get("from_data") if from_data is not None: self.data_stem = from_data.get("data_stem") + if from_data.get("ranks_per_node") is not None: + self.ranks_per_node = from_data.get("ranks_per_node") # # get data directory (because data_stem includes file prefix) data_dir = f"{os.sep}".join(self.data_stem.split(os.sep)[:-1]) file_prefix = self.data_stem.split(os.sep)[-1] @@ -532,7 +538,8 @@ def run(self, cfg=None, cfg_dir=None): logger=self.__logger, file_suffix=file_suffix if file_suffix is not None else "json", check_schema=check_schema, - expected_ranks=self.__parameters.expected_ranks) + expected_ranks=self.__parameters.expected_ranks, + ranks_per_node=self.__parameters.ranks_per_node) # Retrieve n_ranks n_ranks = reader.n_ranks diff --git a/src/lbaf/Execution/lbsAlgorithmBase.py b/src/lbaf/Execution/lbsAlgorithmBase.py index 28ee5a16..6a30419f 100644 --- a/src/lbaf/Execution/lbsAlgorithmBase.py +++ b/src/lbaf/Execution/lbsAlgorithmBase.py @@ -46,6 +46,7 @@ from ..IO.lbsStatistics import compute_function_statistics from ..Model.lbsRank import Rank +from ..Model.lbsNode import Node from ..Model.lbsPhase import Phase from ..Model.lbsWorkModelBase import WorkModelBase from ..Utils.lbsLogging import Logger @@ -270,14 +271,25 @@ def _initialize(self, p_id, phases, distributions, statistics): # Try to copy ranks from phase to be rebalanced to processed one try: + ranks_per_node = 1 + new_nodes: List[Node] = [] new_ranks: Set[Rank] = set() + + if len(phases[p_id].get_ranks()) > 0 and phases[p_id].get_ranks()[0].node is not None: + ranks_per_node = phases[p_id].get_ranks()[0].node.get_number_of_ranks() + if ranks_per_node > 1: + n_nodes = int(len(phases[p_id].get_ranks()) / ranks_per_node) + new_nodes = list(map( + lambda n_id: Node(self._logger, n_id), + list(range(0, n_nodes)))) for r in phases[p_id].get_ranks(): # Minimally instantiate rank and copy new_r = Rank(self._logger) - new_r.copy(r) + new_r.copy(r, new_nodes, ranks_per_node) new_ranks.add(new_r) - self._rebalanced_phase.set_ranks(new_ranks) + self._rebalanced_phase.set_ranks(new_ranks) except Exception as err: + print(f"{err}") self._logger.error(f"No phase with index {p_id} is available for processing") raise SystemExit(1) from err self._logger.info( diff --git a/src/lbaf/IO/lbsConfigurationValidator.py b/src/lbaf/IO/lbsConfigurationValidator.py index f5bb555b..2e63f19e 100644 --- a/src/lbaf/IO/lbsConfigurationValidator.py +++ b/src/lbaf/IO/lbsConfigurationValidator.py @@ -147,7 +147,11 @@ def __init__(self, config_to_validate: dict, logger: Logger): Optional("expected_ranks"): And( int, lambda x: x > 0, - error="Should be of type 'int' and > 0") + error="Should be of type 'int' and > 0"), + Optional("ranks_per_node"): And( + int, + lambda x: x > 0, + error="Should be of type 'int' and > 0"), }) self.__from_samplers = Schema({ "n_ranks": And( diff --git a/src/lbaf/IO/lbsVTDataReader.py b/src/lbaf/IO/lbsVTDataReader.py index 8e6cb626..b461b626 100644 --- a/src/lbaf/IO/lbsVTDataReader.py +++ b/src/lbaf/IO/lbsVTDataReader.py @@ -54,6 +54,7 @@ from ..Model.lbsObject import Object from ..Model.lbsObjectCommunicator import ObjectCommunicator from ..Model.lbsRank import Rank +from ..Model.lbsNode import Node class LoadReader: """A class to read VT Object Map files. These json files could be compressed with Brotli. @@ -75,7 +76,7 @@ class LoadReader: } def __init__( - self, file_prefix: str, logger: Logger, file_suffix: str = "json", check_schema=True, expected_ranks=None): + self, file_prefix: str, logger: Logger, file_suffix: str = "json", check_schema=True, expected_ranks=None, ranks_per_node=1): # The base directory and file name for the log files self.__file_prefix = file_prefix @@ -108,6 +109,8 @@ def __init__( # determine the number of ranks self.n_ranks = self._get_n_ranks() + self.ranks_per_node = ranks_per_node + self.__logger.info(f"Ranks per node: {ranks_per_node}") self.__logger.info(f"Number of ranks: {self.n_ranks}") # warn user if expected_ranks is set and is different from n_ranks @@ -199,7 +202,7 @@ def _load_vt_file(self, rank_id: int): # Return rank ID and data dictionary return rank_id, decompressed_dict - def _populate_rank(self, phase_id: int, rank_id: int) -> Tuple[Rank,dict]: + def _populate_rank(self, phase_id: int, rank_id: int, nodes : List[Node]) -> Tuple[Rank,dict]: """ Populate rank and its communicator in phase using the JSON content.""" # Seek phase with given ID @@ -267,7 +270,10 @@ def _populate_rank(self, phase_id: int, rank_id: int) -> Tuple[Rank,dict]: self.__communications_dict.setdefault(phase_id, {rank_id: {}}) # Instantiante rank for current phase - phase_rank = Rank(self.__logger, rank_id) + node = None + if self.ranks_per_node > 1: + node = nodes[int(rank_id / self.ranks_per_node)] + phase_rank = Rank(self.__logger, rank_id, node=node) phase_rank.set_metadata(self.__metadata[rank_id]) # Initialize storage for shared blocks information @@ -347,10 +353,17 @@ def populate_phase(self, phase_id: int) -> List[Rank]: ranks: List[Rank] = [None] * self.n_ranks communications = {} + nodes : List[Node] = [] + if self.ranks_per_node > 1: + n_nodes = int(self.n_ranks / self.ranks_per_node) + nodes = list(map( + lambda n_id: Node(self.__logger, n_id), + list(range(0, n_nodes)))) + # Iterate over all ranks for rank_id in range(self.n_ranks): # Read data for given phase and assign it to rank - ranks[rank_id], rank_comm = self._populate_rank(phase_id, rank_id) + ranks[rank_id], rank_comm = self._populate_rank(phase_id, rank_id, nodes) # Merge rank communication with existing ones for k, v in rank_comm.items(): diff --git a/src/lbaf/Model/lbsAffineCombinationWorkModel.py b/src/lbaf/Model/lbsAffineCombinationWorkModel.py index 303a7d6d..a347f0ad 100644 --- a/src/lbaf/Model/lbsAffineCombinationWorkModel.py +++ b/src/lbaf/Model/lbsAffineCombinationWorkModel.py @@ -94,10 +94,15 @@ def compute(self, rank: Rank): alpha * load + beta * max(sent, received) + gamma, under optional strict upper bounds. """ - # Check whether strict bounds are satisfied - for k, v in self.__upper_bounds.items(): - if getattr(rank, f"get_{k}")() > v: - return math.inf + if rank.node is None: + # Check whether strict bounds are satisfied + for k, v in self.__upper_bounds.items(): + if getattr(rank, f"get_{k}")() > v: + return math.inf + else: + for k, v in self.__upper_bounds.items(): + if getattr(rank.node, f"get_{k}")() > v * rank.node.get_number_of_ranks(): + return math.inf # Return combination of load and volumes return self.affine_combination( diff --git a/src/lbaf/Model/lbsNode.py b/src/lbaf/Model/lbsNode.py new file mode 100644 index 00000000..116fb96a --- /dev/null +++ b/src/lbaf/Model/lbsNode.py @@ -0,0 +1,74 @@ +# +#@HEADER +############################################################################### +# +# lbsNode.py +# DARMA/LB-analysis-framework => LB Analysis Framework +# +# Copyright 2019-2024 National Technology & Engineering Solutions of Sandia, LLC +# (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +# Government retains certain rights in this software. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from this +# software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# +# Questions? Contact darma@sandia.gov +# +############################################################################### +#@HEADER +# + +import copy +import math +import functools +import operator +from logging import Logger +from typing import Optional + +class Node: + """A class representing a node to which a set of ranks are assigned.""" + + def __init__( + self, + logger: Logger, + n_id: int = -1): + + # Assign logger to instance variable + self.__logger = logger #pylint:disable=unused-private-member + + # Member variables passed by constructor + self.__index = n_id + self.__ranks = set() + + def get_max_memory_usage(self): + # Combine all memory usages for each rank to get the node memory usage + return functools.reduce(operator.add, map(lambda a: a.get_max_memory_usage(), list(self.__ranks))) + + def add_rank(self, rank): + self.__ranks.add(rank) + + def get_number_of_ranks(self) -> int: + return len(self.__ranks) diff --git a/src/lbaf/Model/lbsRank.py b/src/lbaf/Model/lbsRank.py index d9fd54fb..dc412822 100644 --- a/src/lbaf/Model/lbsRank.py +++ b/src/lbaf/Model/lbsRank.py @@ -47,7 +47,7 @@ from .lbsBlock import Block from .lbsObject import Object - +from .lbsNode import Node class Rank: """A class representing a rank to which objects are assigned.""" @@ -57,7 +57,8 @@ def __init__( logger: Logger, r_id: int = -1, mo: set = None, - so: set = None): + so: set = None, + node : Node = None): # Assign logger to instance variable self.__logger = logger #pylint:disable=unused-private-member @@ -82,12 +83,21 @@ def __init__( # Start with empty metadata self.__metadata = {} - def copy(self, rank): + # Optionally, the rank is connected to a node + self.node = node + if node is not None: + node.add_rank(self) + + def copy(self, rank, nodes=[], ranks_per_node=1): """Specialized copy method.""" # Copy all flat member variables self.__index = rank.get_id() self.__size = rank.get_size() + if len(nodes) > 0 and ranks_per_node > 1: + self.node = nodes[int(self.__index / ranks_per_node)] + self.node.add_rank(self) + # Shallow copy owned objects self.__shared_blocks = copy.copy(rank.__shared_blocks) self.__sentinel_objects = copy.copy(rank.__sentinel_objects)