Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#482: Improve and complete JSON writing #483

Merged
merged 14 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/lbaf/Applications/LBAF_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,11 +517,11 @@ def run(self):
self.__parameters.rank_qoi if self.__parameters.rank_qoi is not None else '',
self.__parameters.object_qoi if self.__parameters.object_qoi is not None else '')

# Execute runtime for specified phases, -1 for all phases
# Execute runtime for specified phases
offline_LB_compatible = self.__parameters.json_params.get( # pylint:disable=C0103:invalid-name;not lowercase
"offline_LB_compatible", False)
rebalanced_phase = runtime.execute(
self.__parameters.algorithm.get("phase_id", -1),
self.__parameters.algorithm.get("phase_id", 0),
offline_LB_compatible)

# Instantiate phase to VT file writer when requested
Expand Down
12 changes: 11 additions & 1 deletion src/lbaf/Execution/lbsAlgorithmBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
# Initially no phase is assigned for processing
self._rebalanced_phase = None

# Save the initial communications data
self._initial_communications = {}

# Map global statistical QOIs to their computation methods
self.__statistics = {
("ranks", lambda x: x.get_load()): {
Expand All @@ -76,6 +79,10 @@
"""Return phase assigned for processing by algorithm."""
return self._rebalanced_phase

def get_initial_communications(self):
    """Return the initial phase communications.

    :returns: the dictionary held in ``_initial_communications``; the object
        itself is returned, not a copy.
    """
    return self._initial_communications

@staticmethod
def factory(
algorithm_name:str,
Expand Down Expand Up @@ -211,11 +218,14 @@
self._logger.error("Algorithm execution requires a dictionary of phases")
raise SystemExit(1)

# Save initial communications of the given phase
self._initial_communications[p_id] = phases[p_id].get_communications()

# Create a new phase to preserve phase to be rebalanced
self._logger.info(f"Creating new phase {p_id} for rebalancing")
self._rebalanced_phase = Phase(self._logger, p_id)

# Try to copy ranks from phase to be rebalanced to processd one
# Try to copy ranks from phase to be rebalanced to processed one
try:
new_ranks = []
for r in phases[p_id].get_ranks():
Expand Down Expand Up @@ -247,4 +257,4 @@
:param: a_min_max: possibly empty list of optimal arrangements.
"""

pass

Check warning on line 260 in src/lbaf/Execution/lbsAlgorithmBase.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unnecessary pass statement (unnecessary-pass)
3 changes: 1 addition & 2 deletions src/lbaf/Execution/lbsPhaseStepperAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, work_model, parameters: dict, lgr: Logger, rank_qoi: str, obj
super(PhaseStepperAlgorithm, self).__init__(work_model, parameters, lgr, rank_qoi, object_qoi)

def execute(self, _, phases: list, distributions: dict, statistics: dict, __):
"""Execute brute force optimization algorithm on all phases."""
"""Steps through all phases."""

# Ensure that a dictionary of phases was provided
if not isinstance(phases, dict) or not all(
Expand All @@ -31,7 +31,6 @@ def execute(self, _, phases: list, distributions: dict, statistics: dict, __):
for p_id, self._rebalanced_phase in phases.items():
# Step through current phase
self._logger.info(f"Stepping through phase {p_id}")
self._logger.info(f"Stepping through phase {p_id}")

# Compute and report phase rank work statistics
print_function_statistics(
Expand Down
7 changes: 6 additions & 1 deletion src/lbaf/Execution/lbsRuntime.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def get_statistics(self):
return self.__statistics

def execute(self, p_id: int, phase_increment=0):
"""Execute runtime for single phase with given ID or all (-1)."""
"""Execute runtime for single phase with given ID or multiple phases in selected range."""
# Execute balancing algorithm
self.__logger.info(
f"Executing {type(self.__algorithm).__name__} for "
Expand All @@ -104,5 +104,10 @@ def execute(self, p_id: int, phase_increment=0):
# Retrieve possibly null rebalanced phase and return it
if (pp := self.__algorithm.get_rebalanced_phase()):
pp.set_id((pp_id := pp.get_id() + phase_increment))

# Share communications from original phase with new phase
initial_communications = self.__algorithm.get_initial_communications()
pp.set_communications(initial_communications[p_id])

self.__logger.info(f"Created rebalanced phase {pp_id}")
return pp
29 changes: 27 additions & 2 deletions src/lbaf/IO/lbsVTDataReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import re
from logging import Logger
from multiprocessing import get_context
from multiprocessing import get_context, Manager
from multiprocessing.pool import Pool

import brotli
Expand Down Expand Up @@ -47,6 +47,13 @@ def __init__(self, file_prefix: str, logger: Logger, file_suffix: str = "json",
# Assign schema checker
self.__check_schema = check_schema

# Save initial communications array from every rank
self.__communications_dict = {}

# Save metadata dict
manager = Manager()
self.__metadata = manager.dict()

# imported JSON_data_files_validator module (lazy import)
if LoadReader.SCHEMA_VALIDATOR_CLASS is None:
from ..imported.JSON_data_files_validator import \
Expand Down Expand Up @@ -129,6 +136,9 @@ def _load_vt_file(self, rank_id: int):
raise SystemExit(1)
self.__logger.debug(f"{file_name} has type {schema_type}")

# Save metadata
self.__metadata[rank_id] = metadata

# Checking Schema from configuration
if self.__check_schema:
# Validate schema
Expand Down Expand Up @@ -175,6 +185,10 @@ def _populate_rank(self, phase_id: int, rank_id: int) -> tuple:
rank_comm = {}
communications = phase.get("communications") # pylint:disable=W0631:undefined-loop-variable
if communications:
if phase_id in self.__communications_dict:
self.__communications_dict[phase_id][rank_id] = communications
else:
self.__communications_dict[phase_id] = {rank_id: communications}
for num, comm in enumerate(communications):
# Retrieve communication attributes
c_type = comm.get("type")
Expand Down Expand Up @@ -209,6 +223,7 @@ def _populate_rank(self, phase_id: int, rank_id: int) -> tuple:

# Instantiate rank for current phase
phase_rank = Rank(self.__logger, rank_id)
phase_rank.set_metadata(self.__metadata[rank_id])

# Initialize storage for shared blocks information
rank_blocks, task_user_defined = {}, {}
Expand All @@ -221,6 +236,8 @@ def _populate_rank(self, phase_id: int, rank_id: int) -> tuple:
task_load = task.get("time")
task_user_defined = task.get("user_defined", {})
subphases = task.get("subphases")
collection_id = task_entity.get("collection_id")
index = task_entity.get("index")

# Instantiate object with retrieved parameters
o = Object(
Expand All @@ -244,6 +261,14 @@ def _populate_rank(self, phase_id: int, rank_id: int) -> tuple:
else:
phase_rank.add_sentinel_object(o)

# Add dict of currently unused parameters
unused_params = {}
if collection_id:
unused_params["collection_id"] = collection_id
if index:
unused_params["index"] = index
o.set_unused_params(unused_params)

# Print debug information when requested
self.__logger.debug(
f"Added object {task_id}, load: {task_load} to phase {curr_phase_id}")
Expand Down Expand Up @@ -314,4 +339,4 @@ def populate_phase(self, phase_id: int) -> list:
i=obj_id, logger=self.__logger, r=received, s=sent))

# Return populated list of ranks
return ranks
return ranks, self.__communications_dict[phase_id]
137 changes: 126 additions & 11 deletions src/lbaf/IO/lbsVTDataWriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ def __init__(

# Useful fields
self.__rank_phases = None
self.__phases = None

# Set up mp manager
manager = mp.Manager()
self.__moved_comms = manager.list()
# self.__to_remove = manager.list()

# Assign internals
self.__file_stem = stem
Expand All @@ -51,6 +57,7 @@ def __create_tasks(self, rank_id, objects, migratable):
"""Create per-object entries to be outputted to JSON."""
tasks = []
for o in objects:
unused_params = o.get_unused_params()
task_data = {
"entity": {
"home": rank_id,
Expand All @@ -62,41 +69,147 @@ def __create_tasks(self, rank_id, objects, migratable):
"resource": "cpu",
"time": o.get_load()
}
if unused_params:
task_data["entity"].update(unused_params)

user_defined = o.get_user_defined()
if user_defined:
task_data["user_defined"] = user_defined

subphases = o.get_subphases()
if subphases:
task_data["subphases"] = subphases

tasks.append(task_data)

return tasks

def __get_communications(self, phase, rank):
    """Create communication entries to be outputted to JSON.

    :param phase: phase whose ``get_communications()`` returns a dictionary
        mapping rank IDs to their initial communication entries.
    :param rank: rank for which the communication entries are assembled.
    :returns: list of communication dictionaries to be written for this rank.
    """
    r_id = rank.get_id()

    # Get initial communications (if any) of current rank for current phase;
    # a rank with no initial communication gets an empty entry stored back
    # into the phase dictionary
    initial_on_rank_communications = phase.get_communications().setdefault(
        r_id, {})

    # Collect IDs of all objects on current rank once, for fast lookups
    rank_objects = set(rank.get_object_ids())

    # Initialize final communications
    communications = []

    # Ensure all objects are on the correct rank
    for comm_dict in initial_on_rank_communications:
        sender = comm_dict["from"]
        if not sender.get("migratable", True):
            # Object is sentinel
            communications.append(comm_dict)
        elif sender["id"] in rank_objects:
            communications.append(comm_dict)
        else:
            # Sending object is no longer on this rank: queue its entry so
            # that the rank now holding the object can pick it up
            self.__moved_comms.append(comm_dict)

    # Loop through any moved objects to find the correct rank
    # NOTE(review): entries are only ever appended to the moved list, so a
    # call only sees entries queued by earlier calls and an entry may be
    # matched again by a later call (other rank or phase) -- confirm intended
    for moved_dict in self.__moved_comms:
        if moved_dict["from"]["id"] in rank_objects:
            communications.append(moved_dict)

    return communications

def serialize(self, phases: dict) -> list:
    """Create list of serialized JSON strings, one per rank.

    :param phases: dictionary of Phase instances.
    :returns: list with one serialized JSON string for each rank ID found
        in the provided phases.
    :raises SystemExit: when ``phases`` is not a dictionary of Phase instances.
    """
    # Ensure that provided phases is a dictionary of Phase instances
    if not isinstance(phases, dict) or not all(
            isinstance(p, Phase) for p in phases.values()):
        self.__logger.error(
            "serialize must be passed a dictionary of phases")
        raise SystemExit(1)

    # Keep phases available to the per-rank serializer
    self.__phases = phases

    # Assemble mapping from rank IDs to their per-phase rank instances;
    # the outer key must be the rank ID, as this is what is looked up
    rank_phases = {}
    for p in phases.values():
        for r in p.get_ranks():
            rank_phases.setdefault(r.get_id(), {})[p.get_id()] = r

    # Serialize each rank's data
    return [
        self._json_serializer(rank_phases_double)
        for rank_phases_double in rank_phases.items()]

def _json_serializer(self, rank_phases_double) -> str:
    """Serialize all phases of one rank into a compact JSON string.

    :param rank_phases_double: pair made of a rank ID and a dictionary
        mapping phase IDs to the rank instance of that phase.
    :returns: JSON string with the rank metadata and one entry per phase.
    """
    # Unpack received double
    r_id, r_phases = rank_phases_double

    # Get current rank (last instance whose ID matches, if any)
    current_rank = None
    for rank in r_phases.values():
        if rank.get_id() == r_id:
            current_rank = rank

    # Get metadata, falling back to a minimal default when the rank has none
    metadata = None
    if current_rank is not None:
        metadata = current_rank.get_metadata()
    if not metadata:
        metadata = {
            "type": "LBDatafile",
            "rank": r_id
        }

    # Initialize output dict
    output = {
        "metadata": metadata,
        "phases": []}

    # Iterate over phases
    for p_id, rank in r_phases.items():
        # Get current phase
        current_phase = self.__phases.get(p_id)

        # Create data to be outputted for current phase
        self.__logger.debug(f"Writing phase {p_id} for rank {r_id}")
        phase_data = {
            "id": p_id,
            "tasks":
                self.__create_tasks(
                    r_id, rank.get_migratable_objects(), migratable=True) +
                self.__create_tasks(
                    r_id, rank.get_sentinel_objects(), migratable=False),
        }

        # Add communication data (if present)
        communications = self.__get_communications(current_phase, rank)
        if communications:
            phase_data["communications"] = communications

        output["phases"].append(phase_data)

    # Serialize JSON payload without extra whitespace
    return json.dumps(output, separators=(',', ':'))

def _json_writer(self, rank_phases_double) -> str:
"""Write one JSON per rank for list of phase instances."""
# Unpack received double
r_id, r_phases = rank_phases_double

# Create file name for current rank
file_name = f"{self.__file_stem}.{r_id}.{self.__extension}"
self.__logger.debug(f"Writing {file_name}")

# Serialize and possibly compress JSON payload
serial_json = self._json_serializer(rank_phases_double)

if self.__compress:
serial_json = brotli.compress(
string=serial_json.encode("utf-8"), mode=brotli.MODE_TEXT)
Expand All @@ -116,6 +229,8 @@ def write(self, phases: dict):
"JSON writer must be passed a dictionary of phases")
raise SystemExit(1)

self.__phases = phases

# Assemble mapping from ranks to their phases
self.__rank_phases = {}
for p in phases.values():
Expand Down
11 changes: 11 additions & 0 deletions src/lbaf/Model/lbsObject.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ def __init__(
self.__overhead = 0.0
self.__shared_block = None

# Initialize currently unused parameters (for writing back out)
self.__unused_params = {}

# Retrieve and set optionally defined fields
if isinstance(user_defined, dict) or user_defined is None:
self.__user_defined = user_defined
Expand Down Expand Up @@ -163,3 +166,11 @@ def set_communicator(self, c) -> None:
def get_subphases(self) -> list:
"""Return subphases of this object."""
return self.__subphases

def set_unused_params(self, unused_params: dict):
    """Assign any extraneous parameters.

    :param unused_params: dictionary of extraneous parameters; ``None`` is
        stored as an empty dictionary so that the getter always returns a dict.
    """
    self.__unused_params = {} if unused_params is None else unused_params

def get_unused_params(self) -> dict:
    """Return all current unused parameters."""
    # NOTE: per the review comment on this change, Object QOIs are obtained
    # dynamically, so this getter makes unused_params be listed as a QOI
    return self.__unused_params
Loading
Loading