#558: Add optional node-level memory constraint instead of rank-level constraint to loosen (#559)

Draft
wants to merge 6 commits into base: develop
Changes from 4 commits
9 changes: 8 additions & 1 deletion src/lbaf/Applications/LBAF_app.py
@@ -89,6 +89,7 @@ class InternalParameters:

# From data input options
data_stem: Optional[str] = None
ranks_per_node: Optional[int] = 1

# From samplers input options
n_ranks: Optional[int] = None
@@ -135,10 +136,15 @@ def init_parameters(self, config: dict, base_dir: str):
if param_key in self.__allowed_config_keys:
self.__dict__[param_key] = param_val

# Assume 1 rank per node by default
self.ranks_per_node = 1

# Parse data parameters if present
from_data = config.get("from_data")
if from_data is not None:
self.data_stem = from_data.get("data_stem")
if from_data.get("ranks_per_node") is not None:
self.ranks_per_node = from_data.get("ranks_per_node")
# Get data directory (because data_stem includes file prefix)
data_dir = f"{os.sep}".join(self.data_stem.split(os.sep)[:-1])
file_prefix = self.data_stem.split(os.sep)[-1]
@@ -519,7 +525,8 @@ def run(self, cfg=None, cfg_dir=None):
logger=self.__logger,
file_suffix=file_suffix if file_suffix is not None else "json",
check_schema=check_schema,
expected_ranks=self.__parameters.expected_ranks)
expected_ranks=self.__parameters.expected_ranks,
ranks_per_node=self.__parameters.ranks_per_node)

# Retrieve n_ranks
n_ranks = reader.n_ranks
18 changes: 15 additions & 3 deletions src/lbaf/Execution/lbsAlgorithmBase.py
@@ -42,11 +42,12 @@
#
import abc
import os
from typing import Set
from typing import Set, List

from ..IO.lbsStatistics import compute_function_statistics
from ..Model.lbsRank import Rank
from ..Model.lbsObject import Object
from ..Model.lbsNode import Node
from ..Model.lbsPhase import Phase
from ..Model.lbsWorkModelBase import WorkModelBase
from ..Utils.lbsLogging import Logger
@@ -92,7 +93,7 @@
self.__statistics = {
("ranks", lambda x: x.get_load()): {
"maximum load": "maximum"},
("ranks", lambda x: self._work_model.compute(x)): {

Check warning on line 96 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Lambda may not be necessary (unnecessary-lambda)
"total work": "sum"}}

def get_rebalanced_phase(self):
@@ -177,14 +178,25 @@

# Try to copy ranks from phase to be rebalanced to processed one
try:
ranks_per_node = 1
new_nodes: List[Node] = []
new_ranks: Set[Rank] = set()

if len(phases[p_id].get_ranks()) > 0 and phases[p_id].get_ranks()[0].node is not None:
ranks_per_node = phases[p_id].get_ranks()[0].node.get_number_of_ranks()
if ranks_per_node > 1:
n_nodes = len(phases[p_id].get_ranks()) // ranks_per_node
new_nodes = [Node(self._logger, n_id) for n_id in range(n_nodes)]
for r in phases[p_id].get_ranks():
# Minimally instantiate rank and copy
new_r = Rank(self._logger)
new_r.copy(r)
new_r.copy(r, new_nodes, ranks_per_node)
new_ranks.add(new_r)
self._rebalanced_phase.set_ranks(new_ranks)
except Exception as err:
print(f"{err}")
self._logger.error(f"No phase with index {p_id} is available for processing")
raise SystemExit(1) from err
self._logger.info(
113 changes: 107 additions & 6 deletions src/lbaf/Execution/lbsClusteringTransferStrategy.py
@@ -50,6 +50,7 @@

from .lbsTransferStrategyBase import TransferStrategyBase
from ..Model.lbsRank import Rank
from ..Model.lbsObject import Object
from ..Model.lbsPhase import Phase


@@ -68,6 +69,7 @@

# Determine whether subclustering is performed immediately after swapping
self.__separate_subclustering = parameters.get("separate_subclustering", False)
self.__disable_subclustering = parameters.get("subclustering_disabled", False)
self._logger.info(
f"Enter subclustering immediately after cluster swapping: {self.__separate_subclustering}")

@@ -99,7 +101,7 @@
self.__n_sub_skipped, self.__n_sub_transfers, self.__n_sub_tries = 0, 0, 0

def __build_rank_clusters(self, rank: Rank, with_nullset) -> dict:
"""Cluster migratiable objects by shared block ID when available."""
"""Cluster migratable objects by shared block ID when available."""
# Iterate over all migratable objects on rank
clusters = {None: []} if with_nullset else {}
for o in rank.get_migratable_objects():
@@ -116,6 +118,104 @@
k: clusters[k]
for k in random.sample(list(clusters.keys()), len(clusters))}
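(Aside, not part of the diff: in the return statement above, random.sample over all keys yields a random permutation, so the comprehension returns the same clusters in shuffled order. A minimal sketch with assumed contents:)

import random

clusters = {"a": [1.0], "b": [2.0], "c": [3.0]}  # assumed contents
shuffled = {k: clusters[k] for k in random.sample(list(clusters.keys()), len(clusters))}
# Same key/value pairs as `clusters`, in a random insertion order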

def __build_rank_subclusters_heuristic(self, r_src: Rank, r_try: Rank, ave_load: float) -> list:
    """Build a limited set of subclusters using a heuristic that finds possible
    transfers while avoiding the combinatorial cost of exhaustive subclustering."""

# Bail out early if no clusters are available
if not (clusters := self.__build_rank_clusters(r_src, False)):
self._logger.info(f"No migratable clusters on rank {r_src.get_id()}")
return []

src_load = r_src.get_load()
try_load = r_try.get_load()

subclusters = {}

amount_over_average = src_load - ave_load
amount_under_average = ave_load - try_load

# print(f"amount_over_average={amount_over_average}, amount_under_average={amount_under_average}")

clusters_to_subcluster = {}

for k in clusters.keys():
v = clusters[k]
cluster_load : float = 0.0
for o in v:
cluster_load += o.get_load()
#print(f"k={k}: cluster_load={cluster_load}")
if cluster_load < amount_over_average or cluster_load < amount_under_average:
pass
else:
clusters_to_subcluster[k] = v

# For each retained cluster, sort its objects by increasing load and accumulate
# them until their combined load first exceeds the target rank's deficit
for v in clusters_to_subcluster.values():
    v.sort(key=lambda o: o.get_load())

    load_sum: float = 0.0
    cluster_set: list = []
    for o in v:
        load_sum += o.get_load()
        cluster_set.append(o)
        if load_sum > amount_under_average:
            break

    subclusters[tuple(cluster_set)] = load_sum

return sorted(subclusters.keys(), key=subclusters.get)
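To illustrate the heuristic with made-up loads (not taken from the PR): each retained cluster contributes the smallest ascending-load prefix of its objects whose combined load first exceeds the target rank's deficit relative to the average.

# Illustrative numbers only: the target rank sits 2.0 below the average load
amount_under_average = 2.0
object_loads = [0.5, 0.75, 1.0, 1.5]  # one cluster's object loads, sorted ascending

prefix, total = [], 0.0
for load in object_loads:
    total += load
    prefix.append(load)
    if total > amount_under_average:
        break
# prefix == [0.5, 0.75, 1.0] with total == 2.25: the candidate subcluster from this cluster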

def __transfer_subclusters_heuristic(self, phase: Phase, r_src: Rank, targets: set, ave_load: float, max_load: float) -> None:
"""Perform feasible subcluster transfers from given rank to possible targets."""

# Only transfer from ranks whose load exceeds the mean
if r_src.get_load() <= ave_load:
    return

for r_try in targets:
# Only consider transferring here if this rank is under the mean load
if r_try.get_load() < ave_load:
for o_src in self.__build_rank_subclusters_heuristic(r_src, r_try, ave_load):
c_try = self._criterion.compute(r_src, o_src, r_try)
objects_load = sum(o.get_load() for o in o_src)
l_dst = math.inf

# Additional filters prior to subclustering
if c_try <= self.__subclustering_minimum_improvement * r_src.get_load() or \
r_src.get_load() < self.__subclustering_threshold * max_load:
continue

l_try = abs(r_try.get_load() + objects_load - ave_load)
if l_try < l_dst:
c_dst, r_dst, l_dst = c_try, r_try, l_try
elif l_try == l_dst and c_try > c_dst:
c_dst, r_dst = c_try, r_try
Comment on lines +200 to +204 (Contributor):
How should we handle the else condition of this block? (There is a pylint error because r_dst might not be defined)
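One possible resolution, sketched here as a suggestion rather than as part of this diff: initialize the candidate triple before the subcluster loop and guard the transfer on a candidate having been retained. This also stops resetting l_dst on every subcluster, so the best candidate is tracked across subclusters, which appears to be what the elif branch intends; all names are those of the enclosing method.

# Sketch only; relies on the names of the enclosing method above
c_dst, r_dst, l_dst = 0.0, None, math.inf
for o_src in self.__build_rank_subclusters_heuristic(r_src, r_try, ave_load):
    c_try = self._criterion.compute(r_src, o_src, r_try)
    objects_load = sum(o.get_load() for o in o_src)

    # Additional filters prior to subclustering
    if c_try <= self.__subclustering_minimum_improvement * r_src.get_load() or \
        r_src.get_load() < self.__subclustering_threshold * max_load:
        continue

    # Track the candidate bringing the target closest to the average load
    l_try = abs(r_try.get_load() + objects_load - ave_load)
    if l_try < l_dst or (l_try == l_dst and c_try > c_dst):
        c_dst, r_dst, l_dst = c_try, r_try, l_try

    # Decide whether transfer is beneficial; skip when no candidate was retained
    self.__n_sub_tries += 1
    if r_dst is not None and c_dst > 0.0:
        self._logger.info(
            f"Transferring subcluster with {len(o_src)} objects to rank {r_dst.get_id()}")
        self._n_transfers += phase.transfer_objects(r_src, o_src, r_dst)
        self.__n_sub_transfers += 1
        break
    # Reject subcluster transfer
    self._n_rejects += len(o_src)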


# Decide whether transfer is beneficial
self.__n_sub_tries += 1
if c_dst > 0.0:
# Transfer subcluster and break out
self._logger.info(
f"Transferring subcluster with {len(o_src)} objects to rank {r_dst.get_id()}")

[Check failure on line 211 in src/lbaf/Execution/lbsClusteringTransferStrategy.py, GitHub Actions / code-quality (ubuntu-latest, 3.8): Possibly using variable 'r_dst' before assignment (possibly-used-before-assignment)]
self._n_transfers += phase.transfer_objects(
r_src, o_src, r_dst)
self.__n_sub_transfers += 1
break
# Reject subcluster transfer
self._n_rejects += len(o_src)

def __build_rank_subclusters(self, r_src: Rank) -> set:
"""Build subclusters to bring rank closest and above average load."""

@@ -145,11 +245,11 @@
for p in range(1, n_o + 1)) if self._deterministic_transfer else (
tuple(random.sample(v, p))
for p in nr.binomial(n_o, 0.5, min(n_o, self.__max_subclusters)))):

# Reject subclusters overshooting within relative tolerance
reach_load = src_load - sum(o.get_load() for o in c)
if reach_load < (1.0 - self.__cluster_swap_rtol) * self._average_load:
continue

# Retain subclusters with their respective distance and cluster
subclusters[c] = reach_load

@@ -203,7 +303,7 @@
# Return number of swaps performed from rank
n_rank_swaps = 0

def __transfer_subclusters(self, phase: Phase, r_src: Rank, targets: set, ave_load: float, max_load: float) -> None:

[Check warning on line 306 in src/lbaf/Execution/lbsClusteringTransferStrategy.py, GitHub Actions / code-quality (ubuntu-latest, 3.8): Unused private member `ClusteringTransferStrategy.__transfer_subclusters(self, phase: Phase, r_src: Rank, targets: set, ave_load: float, max_load: float)` (unused-private-member)]
"""Perform feasible subcluster transfers from given rank to possible targets."""
# Iterate over source subclusters
for o_src in self.__build_rank_subclusters(r_src):
@@ -279,23 +379,24 @@
self.__n_sub_skipped += 1
continue

# Perform feasible subcluster swaps from given rank to possible targets
self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load)
if not self.__disable_subclustering:
# Perform feasible subcluster swaps from given rank to possible targets
self.__transfer_subclusters_heuristic(phase, r_src, targets, ave_load, max_load)

# Report on new load and exit from rank
self._logger.debug(
f"Rank {r_src.get_id()} load: {r_src.get_load()} after {self._n_transfers} object transfers")

# Perform subclustering when it was not previously done
if self.__separate_subclustering:
if self.__separate_subclustering and not self.__disable_subclustering:
# In non-deterministic case skip subclustering when swaps passed
if self.__n_swaps and not self._deterministic_transfer:
self.__n_sub_skipped += len(rank_targets)
else:
# Iterate over ranks
for r_src, targets in rank_targets.items():
# Perform feasible subcluster swaps from given rank to possible targets
self.__transfer_subclusters(phase, r_src, targets, ave_load, max_load)
self.__transfer_subclusters_heuristic(phase, r_src, targets, ave_load, max_load)

# Report on new load and exit from rank
self._logger.debug(
7 changes: 6 additions & 1 deletion src/lbaf/IO/lbsConfigurationValidator.py
@@ -148,7 +148,11 @@ def __init__(self, config_to_validate: dict, logger: Logger):
Optional("expected_ranks"): And(
int,
lambda x: x > 0,
error="Should be of type 'int' and > 0")
error="Should be of type 'int' and > 0"),
Optional("ranks_per_node"): And(
int,
lambda x: x > 0,
error="Should be of type 'int' and > 0"),
})
self.__from_samplers = Schema({
"n_ranks": And(
@@ -199,6 +203,7 @@ def __init__(self, config_to_validate: dict, logger: Logger):
str,
lambda e: e in ALLOWED_TRANSFER_STRATEGIES,
error=f"{get_error_message(ALLOWED_TRANSFER_STRATEGIES)} must be chosen"),
Optional("subclustering_disabled"): bool,
Optional("subclustering_threshold"): And(
float,
lambda x: x >= 0.0,
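For illustration, configuration fragments exercising the two new options might look as follows; the key names come from this diff, but the values and the exact transfer_strategy string are assumptions.

# Hypothetical configuration fragments, written as the dicts the validator checks;
# all values are illustrative only
from_data = {
    "data_stem": "data/synthetic_blocks/data",  # assumed stem
    "expected_ranks": 8,
    "ranks_per_node": 4,  # new optional key: number of ranks sharing one node
}
algorithm_parameters = {
    "transfer_strategy": "Clustering",  # assumed member of ALLOWED_TRANSFER_STRATEGIES
    "subclustering_disabled": True,  # new optional key: skip subcluster transfers entirely
    "subclustering_threshold": 0.0,
}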
21 changes: 17 additions & 4 deletions src/lbaf/IO/lbsVTDataReader.py
@@ -54,8 +54,9 @@
from ..Model.lbsObject import Object
from ..Model.lbsObjectCommunicator import ObjectCommunicator
from ..Model.lbsRank import Rank
from ..Model.lbsNode import Node

class LoadReader:

[Check notice on line 59 in src/lbaf/IO/lbsVTDataReader.py, GitHub Actions / code-quality (ubuntu-latest, 3.8): Too many instance attributes (10/7) (too-many-instance-attributes)]
"""A class to read VT Object Map files. These json files could be compressed with Brotli.

Each file is named as <base-name>.<node>.json, where <node> spans the number of MPI ranks that VT is utilizing.
Expand All @@ -75,7 +76,7 @@
}

def __init__(
self, file_prefix: str, logger: Logger, file_suffix: str = "json", check_schema=True, expected_ranks=None):
self, file_prefix: str, logger: Logger, file_suffix: str = "json", check_schema=True, expected_ranks=None, ranks_per_node=1):

[Check notice on line 79 in src/lbaf/IO/lbsVTDataReader.py, GitHub Actions / code-quality (ubuntu-latest, 3.8): Line too long (133/120) (line-too-long)]
# The base directory and file name for the log files
self.__file_prefix = file_prefix

@@ -108,6 +109,8 @@

# determine the number of ranks
self.n_ranks = self._get_n_ranks()
self.ranks_per_node = ranks_per_node
self.__logger.info(f"Ranks per node: {ranks_per_node}")
self.__logger.info(f"Number of ranks: {self.n_ranks}")

# warn user if expected_ranks is set and is different from n_ranks
@@ -199,7 +202,7 @@
# Return rank ID and data dictionary
return rank_id, decompressed_dict

def _populate_rank(self, phase_id: int, rank_id: int) -> Tuple[Rank,dict]:
def _populate_rank(self, phase_id: int, rank_id: int, nodes: List[Node]) -> Tuple[Rank,dict]:

[Check notices on line 205 in src/lbaf/IO/lbsVTDataReader.py, GitHub Actions / code-quality (ubuntu-latest, 3.8): Too many local variables (40/15) (too-many-locals); Too many branches (19/12) (too-many-branches); Too many statements (67/50) (too-many-statements)]
""" Populate rank and its communicator in phase using the JSON content."""

# Seek phase with given ID
@@ -267,7 +270,10 @@
self.__communications_dict.setdefault(phase_id, {rank_id: {}})

# Instantiate rank for current phase
phase_rank = Rank(self.__logger, rank_id)
node = None
if self.ranks_per_node > 1:
node = nodes[int(rank_id / self.ranks_per_node)]
phase_rank = Rank(self.__logger, rank_id, node=node)
phase_rank.set_metadata(self.__metadata[rank_id])

# Initialize storage for shared blocks information
@@ -339,17 +345,24 @@
# Returned rank and communicators per phase
return phase_rank, rank_comm

def populate_phase(self, phase_id: int) -> List[Rank]:

[Check notice on line 348 in src/lbaf/IO/lbsVTDataReader.py, GitHub Actions / code-quality (ubuntu-latest, 3.8): Too many local variables (20/15) (too-many-locals)]
""" Populate phase using the JSON content."""

# Create storage for ranks
ranks: List[Rank] = [None] * self.n_ranks
communications = {}

# Create node objects when more than one rank shares a node
nodes: List[Node] = []
if self.ranks_per_node > 1:
    n_nodes = self.n_ranks // self.ranks_per_node
    nodes = [Node(self.__logger, n_id) for n_id in range(n_nodes)]

# Iterate over all ranks
for rank_id in range(self.n_ranks):
# Read data for given phase and assign it to rank
ranks[rank_id], rank_comm = self._populate_rank(phase_id, rank_id)
ranks[rank_id], rank_comm = self._populate_rank(phase_id, rank_id, nodes)

# Merge rank communication with existing ones
for k, v in rank_comm.items():
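For reference, the rank-to-node assignment above is a plain block mapping: consecutive ranks share a node. A sketch with assumed counts:

# Assumed counts for illustration only: 8 ranks, 4 ranks per node
ranks_per_node = 4
n_ranks = 8
node_of_rank = {rank_id: rank_id // ranks_per_node for rank_id in range(n_ranks)}
# node_of_rank == {0: 0, 1: 0, 2: 0, 3: 0, 4: 1, 5: 1, 6: 1, 7: 1}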
13 changes: 9 additions & 4 deletions src/lbaf/Model/lbsAffineCombinationWorkModel.py
@@ -94,10 +94,15 @@ def compute(self, rank: Rank):
alpha * load + beta * max(sent, received) + gamma,
under optional strict upper bounds.
"""
# Check whether strict bounds are satisfied
for k, v in self.__upper_bounds.items():
if getattr(rank, f"get_{k}")() > v:
return math.inf
if rank.node is None:
    # Check rank-level bounds when no node information is available
    for k, v in self.__upper_bounds.items():
        if getattr(rank, f"get_{k}")() > v:
            return math.inf
else:
    # Check node-level bounds, scaled by the number of ranks sharing the node
    for k, v in self.__upper_bounds.items():
        if getattr(rank.node, f"get_{k}")() > v * rank.node.get_number_of_ranks():
            return math.inf

# Return combination of load and volumes
return self.affine_combination(
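A worked example of how the loosened constraint behaves, with all numbers assumed: given a per-rank upper bound of 8 (say, GB of maximum memory usage) and 4 ranks per node, a single rank at 10 fails the rank-level check, while the node-level check only fails once the whole node exceeds 4 * 8 = 32.

# All values assumed for illustration
per_rank_bound = 8.0  # configured upper bound, applied per rank
ranks_per_node = 4
rank_usage = 10.0  # one heavily loaded rank
node_usage = 26.0  # combined usage of the 4 ranks on that node

rank_level_ok = rank_usage <= per_rank_bound  # False: 10 > 8
node_level_ok = node_usage <= per_rank_bound * ranks_per_node  # True: 26 <= 32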