#464: optimized transfer swapping logic and improved instrumentation and stats
ppebay committed Oct 13, 2023
1 parent 99e2b1b commit daed0c1
Showing 7 changed files with 115 additions and 104 deletions.
1 change: 1 addition & 0 deletions config/challenging-toy-fewer-tasks.yaml
@@ -4,6 +4,7 @@ from_data:
phase_ids:
- 0
check_schema: true
overwrite_validator: false

# Specify work model
work_model:
3 changes: 2 additions & 1 deletion config/challenging-toy-hundreds-tasks.yaml
@@ -3,7 +3,8 @@ from_data:
data_stem: ../data/challenging_toy_hundreds_tasks/toy
phase_ids:
- 0
check_schema: False
check_schema: true
overwrite_validator: false

# Specify work model
work_model:
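Both configuration files now pin these two flags explicitly. For orientation, a minimal sketch of how such flags are typically consumed when the YAML is loaded; the file name comes from the first diff above, and the defaults mirror those visible in LBAF_app.py below (the snippet itself is illustrative, not LBAF code):

```python
# Illustrative sketch only -- not LBAF code. Reads the two flags with the
# permissive defaults that LBAF_app.py falls back to when a key is absent.
import yaml

with open("config/challenging-toy-fewer-tasks.yaml", encoding="utf-8") as f:
    config = yaml.safe_load(f)

check_schema = config.get("check_schema", True)                # validate JSON data files
overwrite_validator = config.get("overwrite_validator", True)  # re-fetch the validator script
```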
100 changes: 52 additions & 48 deletions src/lbaf/Applications/LBAF_app.py
@@ -30,30 +30,30 @@ class InternalParameters:

__logger: Logger

# Input options
# General input options
check_schema: Optional[bool] = None
output_dir: Optional[str] = None
output_file_stem: Optional[str] = None
file_suffix: Optional[str] = None
# - from_data options

# From data input options
data_stem: Optional[str] = None
# - from_samplers options

# From samplers input options
n_ranks: Optional[int] = None
n_objects: Optional[int] = None
n_mapped_ranks: Optional[int] = None
communication_degree: Optional[int] = None
load_sampler: Optional[dict] = None
volume_sampler: Optional[dict] = None

# Viz options
# Visualization options
rank_qoi: Optional[str] = None
object_qoi: Optional[str] = None
grid_size: Optional[list] = None

# work_model options
# Load-balancing options
work_model: Optional[Dict[str, dict]] = None

# algorithm options
algorithm: Dict[str, Any]

def __init__(self, config: dict, base_dir: str, logger: Logger):
@@ -173,19 +173,18 @@ def __init__(self):

def __parse_args(self):
"""Parse arguments."""
parser = PromptArgumentParser(allow_abbrev=False,
description="Run a Load-Balancing Simulation with some configuration",
prompt_default=False)
parser.add_argument("-c", "--configuration",
help="Path to the config file. If path is relative it must be resolvable from either the "
"current working directory or the config directory",
default="conf.yaml",
)
parser.add_argument("-v", "--verbose",
help="Verbosity level. If 1, print all possible rank QOI. If 2, print all possible rank "
"and object QOI.",
default="0"
)
parser = PromptArgumentParser(
allow_abbrev=False,
description="Run a Load-Balancing Simulation with some configuration",
prompt_default=False)
parser.add_argument(
"-c", "--configuration",
help="Path to the config file. If path is relative it must be resolvable from either the current working directory or the config directory",
default="conf.yaml")
parser.add_argument(
"-v", "--verbose",
help="Verbosity level. If 1, print all possible rank QOI. If 2, print all possible rank and object QOI.",
default="0")
self.__args = parser.parse_args()

def __read_configuration_file(self, path: str):
@@ -196,8 +195,7 @@ def __read_configuration_file(self, path: str):
data = yaml.safe_load(file_io)
if not data.get("overwrite_validator", True):
self.__logger.info(
f"Option 'overwrite_validator' in configuration file: {path} is set to False"
)
f"Option 'overwrite_validator' in configuration file: {path} is set to False")
except yaml.MarkedYAMLError as err:
err_line = err.problem_mark.line if err.problem_mark is not None else -1
self.__logger.error(
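The reformatted logging call sits inside the loader's YAML error handling; a self-contained sketch of that safe-load-with-diagnostics pattern follows (the exact messages and logger wiring in LBAF differ):

```python
# Minimal sketch of the pattern used above; illustrative, not LBAF code.
import sys
import yaml

def read_configuration(path: str) -> dict:
    try:
        with open(path, "rt", encoding="utf-8") as file_io:
            data = yaml.safe_load(file_io)
    except yaml.MarkedYAMLError as err:
        # problem_mark is None when PyYAML cannot pinpoint the error location
        err_line = err.problem_mark.line if err.problem_mark is not None else -1
        sys.exit(f"Invalid YAML in {path} (parser line {err_line}): {err.problem}")
    if not data.get("overwrite_validator", True):
        print(f"Option 'overwrite_validator' in configuration file: {path} is set to False")
    return data
```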
@@ -249,15 +247,18 @@ def __configure(self, *config_path):
name="lbaf",
level=lvl,
log_to_console=config.get("log_to_console", None) is None,
log_to_file=None if log_to_file is None else abspath(config.get("log_to_file"), relative_to=config_dir)
log_to_file=None if log_to_file is None else abspath(
config.get("log_to_file"), relative_to=config_dir)
)
self.__logger.info(f"Logging level: {lvl.lower()}")
if log_to_file is not None:
log_to_file_path = abspath(config.get("log_to_file"), relative_to=config_dir)
log_to_file_path = abspath(
config.get("log_to_file"), relative_to=config_dir)
self.__logger.info(f"Logging to file: {log_to_file_path}")

# Instantiate the application internal parameters
self.__parameters = InternalParameters(config=config, base_dir=config_dir, logger=self.__logger)
self.__parameters = InternalParameters(
config=config, base_dir=config_dir, logger=self.__logger)

# Create VT writer except when explicitly turned off
self.__json_writer = VTDataWriter(
@@ -275,7 +276,7 @@ def __resolve_config_path(self, config_path) -> str:
:raises FileNotFoundError: if configuration file cannot be found
"""
# search config file in the current working directory if relative
# Search config file in the current working directory if relative
path = config_path
path_list = []
path_list.append(path)
@@ -284,19 +285,17 @@
not os.path.isfile(path) and
not os.path.isabs(config_path) and PROJECT_PATH is not None
):
# then search config file relative to the config folder
# Then search config file relative to the config folder
search_dir = abspath("config", relative_to=PROJECT_PATH)
path = search_dir + '/' + config_path
path_list.append(path)

# Verify path correctness
if not os.path.isfile(path):
error_message = "The configuration file cannot be found." \
" If you provide a relative path, please verify that the file exists in the " \
"current working directory or in the `<project_path>/config` directory"
raise FileNotFoundError(error_message)
raise FileNotFoundError(
"Configuration file not found. If a relative path was provided, the file may not exist in the current working directory or in the `<project_path>/config` directory")
else:
self.__logger.info(f"Found configuration file at path {path}")

return path
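
The lookup above tries the path as given (relative to the current working directory) and then falls back to the project's config folder; a condensed sketch of the same two-step resolution (the function name and project_path argument are illustrative):

```python
# Condensed sketch of the two-step lookup; not the exact LBAF implementation.
import os

def resolve_config_path(config_path: str, project_path: str) -> str:
    candidates = [config_path]
    if not os.path.isabs(config_path):
        # Fall back to the project's config/ folder for relative paths
        candidates.append(os.path.join(project_path, "config", config_path))
    for path in candidates:
        if os.path.isfile(path):
            return path
    raise FileNotFoundError(
        "Configuration file not found in the current working directory "
        "or in `<project_path>/config`")
```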

def __print_statistics(self, phase: Phase, phase_name: str):
@@ -345,21 +344,24 @@ def __print_QOI(self) -> int: # pylint:disable=C0103:invalid-name # not snake case

# Initialize file paths
current_script_path = os.path.abspath(__file__)
target_dir = os.path.join(os.path.dirname(os.path.dirname(current_script_path)), "Model")
target_dir = os.path.join(
os.path.dirname(os.path.dirname(current_script_path)), "Model")
rank_script_name = "lbsRank.py"
object_script_name = "lbsObject.py"

# Create list of all Rank QOI (Rank.get_*)
r_qoi_list = ["work"]
lbs_rank_file = open(os.path.join(target_dir, rank_script_name), 'r', encoding="utf-8")
lbs_rank_file = open(
os.path.join(target_dir, rank_script_name), 'r', encoding="utf-8")
lbs_rank_lines = lbs_rank_file.readlines()
for line in lbs_rank_lines:
if line[8:12] == "get_":
r_qoi_list.append(line[12:line.find("(")])

# Create list of all Object QOI (Object.get_*)
o_qoi_list = []
lbs_object_file = open(os.path.join(target_dir, object_script_name), 'r', encoding="utf-8")
lbs_object_file = open(
os.path.join(target_dir, object_script_name), 'r', encoding="utf-8")
lbs_object_lines = lbs_object_file.readlines()
for line in lbs_object_lines:
if line[8:12] == "get_":
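
The QOI lists are built by slicing each source line at a fixed offset (line[8:12] matches the "get_" prefix of a method defined at one indentation level). An equivalent, slice-free variant using a regular expression might look like this (illustrative only, not the code above):

```python
# Illustrative alternative to the fixed-offset slicing above; matches any
# "def get_*" method regardless of indentation width. Requires Python 3.8+.
import re

GETTER_RE = re.compile(r"^\s+def get_(\w+)\(")

def list_getter_qoi(path: str) -> list:
    with open(path, "r", encoding="utf-8") as f:
        return [m.group(1) for line in f if (m := GETTER_RE.match(line))]
```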
@@ -412,7 +414,7 @@ def run(self):
# Apply configuration
cfg = self.__configure(*config_file_list)

# Download JSON data files validator (JSON data files validator is required to continue)
# Download of JSON data files validator required to continue
loader = JSONDataFilesValidatorLoader()
if loader.run(cfg.get("overwrite_validator", True)) != 0:
raise RuntimeError("The JSON data files validator must be loaded to run the application")
@@ -436,9 +438,8 @@ def run(self):
# Check schema
check_schema = True if "check_schema" not in self.__parameters.__dict__ else self.__parameters.check_schema

reader = None # type: Optional(LoadReader)

n_ranks = None
# Initialize variables
reader, n_ranks = None, None

# Populate phase depending on chosen mechanism
if self.__parameters.data_stem:
@@ -452,7 +453,8 @@ def run(self):
file_suffix=file_suffix if file_suffix is not None else "json",
check_schema=check_schema,
expected_ranks=self.__parameters.expected_ranks)
# retrieve n_ranks

# Retrieve n_ranks
n_ranks = reader.n_ranks

# Iterate over phase IDs
@@ -488,8 +490,8 @@ def run(self):
self.__parameters.work_model.get("parameters", {}).get(k)
for k in ("alpha", "beta", "gamma")
]
_n_a, _w_min_max, a_min_max = lbstats.compute_min_max_arrangements_work(objects, alpha, beta, gamma,
n_ranks, logger=self.__logger)
_n_a, _w_min_max, a_min_max = lbstats.compute_min_max_arrangements_work(
objects, alpha, beta, gamma, n_ranks, logger=self.__logger)
else:
self.__logger.info("No brute force optimization performed")
a_min_max = []
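
The brute-force path enumerates object arrangements and scores them with the (alpha, beta, gamma) work-model parameters read above. One plausible affine form of such a per-rank work function, assumed here purely for illustration (the actual formula lives inside lbstats.compute_min_max_arrangements_work):

```python
# Assumed affine work model for illustration only; LBAF's real computation
# is performed inside lbstats.compute_min_max_arrangements_work.
def rank_work(load: float, volume: float, alpha: float, beta: float, gamma: float) -> float:
    # alpha weighs computational load, beta weighs communication volume,
    # gamma adds a constant per-rank offset
    return alpha * load + beta * volume + gamma
```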
@@ -519,7 +521,7 @@ def run(self):
self.__logger.warning(
"No rebalancing took place for offline load-balancing")
else:
# Determine if a phase with the same index was present
# Determine if a phase with same index was present
if _existing_phase := phases.get(p_id := rebalanced_phase.get_id()):
# Apply object timings to rebalanced phase
self.__logger.info(
Expand Down Expand Up @@ -551,11 +553,13 @@ def run(self):
raise SystemExit(1)

# Look for prescribed QOI bounds
qoi_request = [
self.__parameters.rank_qoi,
self.__parameters.work_model.get("parameters", {}).get("upper_bounds", {}).get(self.__parameters.rank_qoi),
self.__parameters.object_qoi
]
qoi_request = [self.__parameters.rank_qoi]
qoi_request.append(
self.__parameters.work_model.get("parameters", {}).get(
"upper_bounds", {}).get(self.__parameters.rank_qoi))
qoi_request.append(self.__parameters.object_qoi)

# Instantiate and execute visualizer
visualizer = Visualizer(
5 changes: 4 additions & 1 deletion src/lbaf/Execution/lbsAlgorithmBase.py
@@ -211,7 +211,8 @@ def _initialize(self, p_id, phases, distributions, statistics):
self._logger.error("Algorithm execution requires a dictionary of phases")
raise SystemExit(1)

# Create a rebalanced phase to preserve phase to be rebalanced
# Create a new phase to preserve phase to be rebalanced
self._logger.info(f"Creating new phase {p_id} for rebalancing")
self._rebalanced_phase = Phase(self._logger, p_id)

# Try to copy ranks from the phase to be rebalanced to the processed one
@@ -245,3 +246,5 @@ def execute(self, p_id, phases, distributions, statistics, a_min_max):
:param: statistics: dictionary of statistics
:param: a_min_max: possibly empty list of optimal arrangements.
"""

pass
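
The added pass makes the base-class stub explicit; the same contract could also be expressed with abc, shown here only as a sketch of the pattern (class name abbreviated, not LBAF's actual class):

```python
# Sketch of the equivalent abc-based contract; LBAF keeps a plain method
# whose body is the docstring plus `pass`, as the diff above shows.
from abc import ABC, abstractmethod

class AlgorithmBase(ABC):
    @abstractmethod
    def execute(self, p_id, phases, distributions, statistics, a_min_max):
        """Execute the algorithm on the phase with the given ID."""
```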
62 changes: 42 additions & 20 deletions src/lbaf/Execution/lbsClusteringTransferStrategy.py
@@ -27,7 +27,7 @@ def __init__(self, criterion, parameters: dict, lgr: Logger):
self._logger.info(
f"Relative tolerance for cluster swaps: {self._cluster_swap_rtol}")

def __cluster_objects(self, rank):
def __build_rank_clusters(self, rank):
"""Cluster migratiable objects by shared block ID when available."""
# Iterate over all migratable objects on rank
clusters = {None: []}
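
The renamed `__build_rank_clusters` buckets a rank's migratable objects by their shared block ID, keeping block-less objects under the None key. A standalone sketch of that grouping (the rank and object accessor names are assumed from context, not verified API):

```python
# Standalone sketch of the grouping; get_migratable_objects() and
# get_shared_block() are assumed accessor names, not verified API.
def build_rank_clusters(rank) -> dict:
    clusters = {None: []}
    for obj in rank.get_migratable_objects():
        block = obj.get_shared_block()
        key = None if block is None else block.get_id()
        clusters.setdefault(key, []).append(obj)
    return clusters
```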
@@ -93,56 +93,68 @@ def execute(self, known_peers, phase: Phase, ave_load: float):
"""Perform object transfer stage."""
# Initialize transfer stage
self._initialize_transfer_stage(ave_load)
n_swaps, n_swap_tries, n_sub_transfers, n_sub_tries = 0, 0, 0, 0

# Iterate over ranks
ranks = phase.get_ranks()
rank_targets = self._get_ranks_to_traverse(ranks, known_peers)
for r_src, targets in rank_targets.items():
# Cluster migratable objects on source rank
clusters_src = self.__cluster_objects(r_src)
self._logger.info(
f"Constructed {len(clusters_src)} migratable clusters on rank {r_src.get_id()} with load: {r_src.get_load()}")
clusters_src = self.__build_rank_clusters(r_src)
self._logger.debug(
f"Constructed {len(clusters_src)} migratable clusters on source rank {r_src.get_id()}")

# Identify and perform beneficial cluster swaps
n_swaps = 0
n_rank_swaps = 0
for r_try in targets if self._deterministic_transfer else random.sample(
targets, len(targets)):
# Escape targets loop if at least one swap already occurred
if n_swaps:
if n_rank_swaps:
break

# Cluster migratable objects on target rank
clusters_try = self.__build_rank_clusters(r_try)
self._logger.debug(
f"Constructed {len(clusters_try)} migratable clusters on target rank {r_try.get_id()}")

# Iterate over potential targets to try to swap clusters
for k_src, o_src in clusters_src.items():
# Iterate over target clusters
for k_try, o_try in self.__cluster_objects(r_try).items():
for k_try, o_try in clusters_try.items():
# Decide whether swap is beneficial
c_try = self._criterion.compute(r_src, o_src, r_try, o_try)
n_swap_tries += 1
if c_try > 0.0:
# Compute source cluster size only when necessary
sz_src = sum([o.get_load() for o in o_src])
if c_try > self._cluster_swap_rtol * sz_src:
# Perform swap
self._logger.info(
self._logger.debug(
f"Swapping cluster {k_src} of size {sz_src} with cluster {k_try} on {r_try.get_id()}")
self._n_transfers += phase.transfer_objects(
r_src, o_src, r_try, o_try)
n_swaps += 1
del clusters_try[k_try]
n_rank_swaps += 1
break
else:
# Reject swap
self._n_rejects += len(o_src) + len(o_try)

# Report on swaps when some occurred
if n_swaps:
self._logger.info(
f"New rank {r_src.get_id()} load: {r_src.get_load()} after {n_swaps} cluster swaps")
if n_rank_swaps:
n_swaps += n_rank_swaps
self._logger.debug(
f"New rank {r_src.get_id()} load: {r_src.get_load()} after {n_rank_swaps} cluster swaps")

# In non-deterministic case skip subclustering when swaps passed
if not self._deterministic_transfer:
continue

# Iterate over suitable subclusters only when no swaps were possible
for o_src in self.__find_suitable_subclusters(
self.__cluster_objects(r_src), r_src.get_load()):
self.__build_rank_clusters(r_src), r_src.get_load()):
# Initialize destination information
r_dst, c_dst = None, -math.inf

# Use deterministic or probabilistic transfer method
if self._deterministic_transfer:
# Initialize destination load information
Expand All @@ -164,21 +176,31 @@ def execute(self, known_peers, phase: Phase, ave_load: float):
# Pseudo-randomly select transfer destination
r_dst, c_dst = self._randomly_select_target(
r_src, o_src, targets)
if not r_dst:
self._n_rejects += 1
continue

# Transfer subcluster and break out if best criterion is positive
# Decide whether transfer is beneficial
n_sub_tries += 1
if c_dst > 0.0:
self._logger.info(
# Transfer subcluster and break out
self._logger.debug(
f"Transferring subcluster of size {sum([o.get_load() for o in o_src])} to rank {r_dst.get_id()}")
self._n_transfers += phase.transfer_objects(
r_src, o_src, r_dst)
n_sub_transfers += 1
break
else:
# Reject subcluster transfer
self._n_rejects += len(o_src)

# Report on new load and exit from rank
self._logger.info(
self._logger.debug(
f"Rank {r_src.get_id()} load: {r_src.get_load()} after {self._n_transfers} object transfers")

# Report on global transfer statistics
if n_swap_tries:
self._logger.info(
f"Swapped {n_swaps} cluster pairs amongst {n_swap_tries} tries ({100 * n_swaps / n_swap_tries:.2f}%)")
if n_sub_tries:
self._logger.info(
f"Transferred {n_sub_transfers} subcluster amongst {n_sub_tries} tries ({100 * n_sub_transfers / n_sub_tries:.2f}%)")

# Return object transfer counts
return len(ranks) - len(rank_targets), self._n_transfers, self._n_rejects
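
The new counters make the acceptance rules easy to state: a cluster swap is kept only when the criterion value exceeds the relative tolerance scaled by the source cluster's total load, while a subcluster transfer requires a strictly positive criterion. A compact restatement of those two tests (illustrative helpers, not methods of the strategy class):

```python
# Compact restatement of the two acceptance tests used in execute();
# these helpers are illustrative, not part of the strategy class.
def accept_cluster_swap(c_try: float, src_loads: list, rtol: float) -> bool:
    # Swap only if the gain clears rtol times the source cluster's total load
    return c_try > rtol * sum(src_loads)

def accept_subcluster_transfer(c_dst: float) -> bool:
    # Any strictly positive criterion justifies the transfer
    return c_dst > 0.0
```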
