Skip to content

Commit

Permalink
#558: memory: add node-level memory constraint instead of rank-level
Browse files Browse the repository at this point in the history
  • Loading branch information
lifflander committed Oct 21, 2024
1 parent c7130e7 commit c824fa3
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 15 deletions.
9 changes: 8 additions & 1 deletion src/lbaf/Applications/LBAF_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class InternalParameters:

# From data input options
data_stem: Optional[str] = None
ranks_per_node : Optional[int] = 1

# From samplers input options
n_ranks: Optional[int] = None
Expand Down Expand Up @@ -132,10 +133,15 @@ def init_parameters(self, config: dict, base_dir: str):
if param_key in self.__allowed_config_keys:
self.__dict__[param_key] = param_val

# Assume 1 rank per node by default
self.ranks_per_node = 1

# Parse data parameters if present
from_data = config.get("from_data")
if from_data is not None:
self.data_stem = from_data.get("data_stem")
if from_data.get("ranks_per_node") is not None:
self.ranks_per_node = from_data.get("ranks_per_node")
# # get data directory (because data_stem includes file prefix)
data_dir = f"{os.sep}".join(self.data_stem.split(os.sep)[:-1])
file_prefix = self.data_stem.split(os.sep)[-1]
Expand Down Expand Up @@ -532,7 +538,8 @@ def run(self, cfg=None, cfg_dir=None):
logger=self.__logger,
file_suffix=file_suffix if file_suffix is not None else "json",
check_schema=check_schema,
expected_ranks=self.__parameters.expected_ranks)
expected_ranks=self.__parameters.expected_ranks,
ranks_per_node=self.__parameters.ranks_per_node)

# Retrieve n_ranks
n_ranks = reader.n_ranks
Expand Down
16 changes: 14 additions & 2 deletions src/lbaf/Execution/lbsAlgorithmBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

from ..IO.lbsStatistics import compute_function_statistics
from ..Model.lbsRank import Rank
from ..Model.lbsNode import Node
from ..Model.lbsPhase import Phase
from ..Model.lbsWorkModelBase import WorkModelBase
from ..Utils.lbsLogging import Logger
Expand Down Expand Up @@ -270,14 +271,25 @@ def _initialize(self, p_id, phases, distributions, statistics):

# Try to copy ranks from phase to be rebalanced to processed one
try:
ranks_per_node = 1
new_nodes: List[Node] = []

Check failure on line 275 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Undefined variable 'List' (undefined-variable)
new_ranks: Set[Rank] = set()

if len(phases[p_id].get_ranks()) > 0 and phases[p_id].get_ranks()[0].node is not None:
ranks_per_node = phases[p_id].get_ranks()[0].node.get_number_of_ranks()
if ranks_per_node > 1:
n_nodes = int(len(phases[p_id].get_ranks()) / ranks_per_node)
new_nodes = list(map(
lambda n_id: Node(self._logger, n_id),
list(range(0, n_nodes))))
for r in phases[p_id].get_ranks():
# Minimally instantiate rank and copy
new_r = Rank(self._logger)
new_r.copy(r)
new_r.copy(r, new_nodes, ranks_per_node)
new_ranks.add(new_r)
self._rebalanced_phase.set_ranks(new_ranks)
self._rebalanced_phase.set_ranks(new_ranks)
except Exception as err:
print(f"{err}")
self._logger.error(f"No phase with index {p_id} is available for processing")
raise SystemExit(1) from err
self._logger.info(
Expand Down
6 changes: 5 additions & 1 deletion src/lbaf/IO/lbsConfigurationValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ def __init__(self, config_to_validate: dict, logger: Logger):
Optional("expected_ranks"): And(
int,
lambda x: x > 0,
error="Should be of type 'int' and > 0")
error="Should be of type 'int' and > 0"),
Optional("ranks_per_node"): And(
int,
lambda x: x > 0,
error="Should be of type 'int' and > 0"),
})
self.__from_samplers = Schema({
"n_ranks": And(
Expand Down
21 changes: 17 additions & 4 deletions src/lbaf/IO/lbsVTDataReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from ..Model.lbsObject import Object
from ..Model.lbsObjectCommunicator import ObjectCommunicator
from ..Model.lbsRank import Rank
from ..Model.lbsNode import Node

class LoadReader:
"""A class to read VT Object Map files. These json files could be compressed with Brotli.
Expand All @@ -75,7 +76,7 @@ class LoadReader:
}

def __init__(
self, file_prefix: str, logger: Logger, file_suffix: str = "json", check_schema=True, expected_ranks=None):
self, file_prefix: str, logger: Logger, file_suffix: str = "json", check_schema=True, expected_ranks=None, ranks_per_node=1):
# The base directory and file name for the log files
self.__file_prefix = file_prefix

Expand Down Expand Up @@ -108,6 +109,8 @@ def __init__(

# determine the number of ranks
self.n_ranks = self._get_n_ranks()
self.ranks_per_node = ranks_per_node
self.__logger.info(f"Ranks per node: {ranks_per_node}")
self.__logger.info(f"Number of ranks: {self.n_ranks}")

# warn user if expected_ranks is set and is different from n_ranks
Expand Down Expand Up @@ -199,7 +202,7 @@ def _load_vt_file(self, rank_id: int):
# Return rank ID and data dictionary
return rank_id, decompressed_dict

def _populate_rank(self, phase_id: int, rank_id: int) -> Tuple[Rank,dict]:
def _populate_rank(self, phase_id: int, rank_id: int, nodes : List[Node]) -> Tuple[Rank,dict]:
""" Populate rank and its communicator in phase using the JSON content."""

# Seek phase with given ID
Expand Down Expand Up @@ -267,7 +270,10 @@ def _populate_rank(self, phase_id: int, rank_id: int) -> Tuple[Rank,dict]:
self.__communications_dict.setdefault(phase_id, {rank_id: {}})

# Instantiante rank for current phase
phase_rank = Rank(self.__logger, rank_id)
node = None
if self.ranks_per_node > 1:
node = nodes[int(rank_id / self.ranks_per_node)]
phase_rank = Rank(self.__logger, rank_id, node=node)
phase_rank.set_metadata(self.__metadata[rank_id])

# Initialize storage for shared blocks information
Expand Down Expand Up @@ -347,10 +353,17 @@ def populate_phase(self, phase_id: int) -> List[Rank]:
ranks: List[Rank] = [None] * self.n_ranks
communications = {}

nodes : List[Node] = []
if self.ranks_per_node > 1:
n_nodes = int(self.n_ranks / self.ranks_per_node)
nodes = list(map(
lambda n_id: Node(self.__logger, n_id),
list(range(0, n_nodes))))

# Iterate over all ranks
for rank_id in range(self.n_ranks):
# Read data for given phase and assign it to rank
ranks[rank_id], rank_comm = self._populate_rank(phase_id, rank_id)
ranks[rank_id], rank_comm = self._populate_rank(phase_id, rank_id, nodes)

# Merge rank communication with existing ones
for k, v in rank_comm.items():
Expand Down
13 changes: 9 additions & 4 deletions src/lbaf/Model/lbsAffineCombinationWorkModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,15 @@ def compute(self, rank: Rank):
alpha * load + beta * max(sent, received) + gamma,
under optional strict upper bounds.
"""
# Check whether strict bounds are satisfied
for k, v in self.__upper_bounds.items():
if getattr(rank, f"get_{k}")() > v:
return math.inf
if rank.node is None:
# Check whether strict bounds are satisfied
for k, v in self.__upper_bounds.items():
if getattr(rank, f"get_{k}")() > v:
return math.inf
else:
for k, v in self.__upper_bounds.items():
if getattr(rank.node, f"get_{k}")() > v * rank.node.get_number_of_ranks():
return math.inf

# Return combination of load and volumes
return self.affine_combination(
Expand Down
74 changes: 74 additions & 0 deletions src/lbaf/Model/lbsNode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#
#@HEADER
###############################################################################
#
# lbsNode.py
# DARMA/LB-analysis-framework => LB Analysis Framework
#
# Copyright 2019-2024 National Technology & Engineering Solutions of Sandia, LLC
# (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
# Government retains certain rights in this software.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Questions? Contact [email protected]
#
###############################################################################
#@HEADER
#

import copy

Check warning on line 44 in src/lbaf/Model/lbsNode.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused import copy (unused-import)
import math

Check warning on line 45 in src/lbaf/Model/lbsNode.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused import math (unused-import)
import functools
import operator
from logging import Logger
from typing import Optional

Check warning on line 49 in src/lbaf/Model/lbsNode.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused Optional imported from typing (unused-import)

class Node:
"""A class representing a node to which a set of ranks are assigned."""

def __init__(
self,
logger: Logger,
n_id: int = -1):

# Assign logger to instance variable
self.__logger = logger #pylint:disable=unused-private-member

# Member variables passed by constructor
self.__index = n_id

Check warning on line 63 in src/lbaf/Model/lbsNode.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused private member `Node.__index` (unused-private-member)
self.__ranks = set()

def get_max_memory_usage(self):

Check notice on line 66 in src/lbaf/Model/lbsNode.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Missing function or method docstring (missing-function-docstring)
# Combine all memory usages for each rank to get the node memory usage
return functools.reduce(operator.add, map(lambda a: a.get_max_memory_usage(), list(self.__ranks)))

def add_rank(self, rank):

Check notice on line 70 in src/lbaf/Model/lbsNode.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Missing function or method docstring (missing-function-docstring)
self.__ranks.add(rank)

def get_number_of_ranks(self) -> int:

Check notice on line 73 in src/lbaf/Model/lbsNode.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Missing function or method docstring (missing-function-docstring)
return len(self.__ranks)
16 changes: 13 additions & 3 deletions src/lbaf/Model/lbsRank.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@

from .lbsBlock import Block
from .lbsObject import Object

from .lbsNode import Node

class Rank:

Check notice on line 52 in src/lbaf/Model/lbsRank.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Too many instance attributes (8/7) (too-many-instance-attributes)

Check notice on line 52 in src/lbaf/Model/lbsRank.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Too many public methods (35/20) (too-many-public-methods)
"""A class representing a rank to which objects are assigned."""
Expand All @@ -57,7 +57,8 @@ def __init__(
logger: Logger,
r_id: int = -1,
mo: set = None,
so: set = None):
so: set = None,
node : Node = None):

# Assign logger to instance variable
self.__logger = logger #pylint:disable=unused-private-member
Expand All @@ -82,12 +83,21 @@ def __init__(
# Start with empty metadata
self.__metadata = {}

def copy(self, rank):
# Optionally, the rank is connected to a node
self.node = node
if node is not None:
node.add_rank(self)

def copy(self, rank, nodes=[], ranks_per_node=1):

Check warning on line 91 in src/lbaf/Model/lbsRank.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Dangerous default value [] as argument (dangerous-default-value)
"""Specialized copy method."""
# Copy all flat member variables
self.__index = rank.get_id()
self.__size = rank.get_size()

if len(nodes) > 0 and ranks_per_node > 1:
self.node = nodes[int(self.__index / ranks_per_node)]
self.node.add_rank(self)

# Shallow copy owned objects
self.__shared_blocks = copy.copy(rank.__shared_blocks)
self.__sentinel_objects = copy.copy(rank.__sentinel_objects)
Expand Down

0 comments on commit c824fa3

Please sign in to comment.