Skip to content

Commit

Permalink
#482: add new test data and work on supporting multiple phases
Browse files Browse the repository at this point in the history
  • Loading branch information
cwschilly committed Dec 21, 2023
1 parent 6dc470a commit 7e1a17f
Show file tree
Hide file tree
Showing 15 changed files with 4,650 additions and 126 deletions.
9 changes: 1 addition & 8 deletions src/lbaf/Execution/lbsAlgorithmBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ def __init__(self, work_model: WorkModelBase, parameters: dict, logger: Logger,
# Save the initial communications data
self._initial_communications = {}

# Save initial phase metadata
self._metadata = {}

# Map global statistical QOIs to their computation methods
self.__statistics = {
("ranks", lambda x: x.get_load()): {
Expand All @@ -86,10 +83,6 @@ def get_initial_communications(self):
"""Return the initial phase communications."""
return self._initial_communications

def get_metadata(self):
"""Return the metadata from the original JSON."""
return self._metadata

@staticmethod
def factory(
algorithm_name:str,
Expand Down Expand Up @@ -225,8 +218,8 @@ def _initialize(self, p_id, phases, distributions, statistics):
self._logger.error("Algorithm execution requires a dictionary of phases")
raise SystemExit(1)

# Set initial communications for given rank
self._initial_communications[p_id] = phases[p_id].get_communications()
self._metadata[p_id] = phases[p_id].get_metadata()

# Create a new phase to preserve phase to be rebalanced
self._logger.info(f"Creating new phase {p_id} for rebalancing")
Expand Down
1 change: 0 additions & 1 deletion src/lbaf/Execution/lbsPhaseStepperAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def execute(self, _, phases: list, distributions: dict, statistics: dict, __):
for p_id, self._rebalanced_phase in phases.items():
# Step through current phase
self._logger.info(f"Stepping through phase {p_id}")
self._logger.info(f"Stepping through phase {p_id}")

# Compute and report phase rank work statistics
print_function_statistics(
Expand Down
4 changes: 0 additions & 4 deletions src/lbaf/Execution/lbsRuntime.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,5 @@ def execute(self, p_id: int, phase_increment=0):
initial_communications = self.__algorithm.get_initial_communications()
pp.set_communications(initial_communications[p_id])

# Share original metadata with new phase
metadata = self.__algorithm.get_metadata()
pp.set_metadata(metadata[p_id])

self.__logger.info(f"Created rebalanced phase {pp_id}")
return pp
8 changes: 6 additions & 2 deletions src/lbaf/IO/lbsVTDataReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ def _populate_rank(self, phase_id: int, rank_id: int) -> tuple:
rank_comm = {}
communications = phase.get("communications") # pylint:disable=W0631:undefined-loop-variable
if communications:
self.__communications_dict[rank_id] = communications
if phase_id in self.__communications_dict:
self.__communications_dict[phase_id][rank_id] = communications
else:
self.__communications_dict[phase_id] = {rank_id: communications}
for num, comm in enumerate(communications):
# Retrieve communication attributes
c_type = comm.get("type")
Expand Down Expand Up @@ -220,6 +223,7 @@ def _populate_rank(self, phase_id: int, rank_id: int) -> tuple:

# Instantiante rank for current phase
phase_rank = Rank(self.__logger, rank_id)
phase_rank.set_metadata(self.__metadata[rank_id])

# Initialize storage for shared blocks information
rank_blocks, task_user_defined = {}, {}
Expand Down Expand Up @@ -335,4 +339,4 @@ def populate_phase(self, phase_id: int) -> list:
i=obj_id, logger=self.__logger, r=received, s=sent))

# Return populated list of ranks
return ranks, self.__communications_dict, self.__metadata
return ranks, self.__communications_dict[phase_id]
49 changes: 37 additions & 12 deletions src/lbaf/IO/lbsVTDataWriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ def __init__(
# Useful fields
self.__rank_phases = None
self.__phases = None
self.__moved_objs_dicts = []

# Set up mp manager
manager = mp.Manager()
self.__moved_objs_dicts = manager.list()
self.__moved_comms = manager.list()
self.__to_remove = manager.list()

# Assign internals
self.__file_stem = stem
Expand Down Expand Up @@ -91,7 +91,7 @@ def __get_communications(self, phase, rank):
phase_communications_dict = phase.get_communications()

# Add empty entries for ranks with no initial communication
if rank.get_id() not in phase_communications_dict.keys():
if rank.get_id() not in phase_communications_dict:
phase_communications_dict[rank.get_id()] = {}

# Get original communications on current rank
Expand All @@ -103,21 +103,42 @@ def __get_communications(self, phase, rank):
# Initialize final communications
communications = []

print_condition = True if phase.get_id() == 0 else False

# Ensure all objects are on the correct rank
if initial_on_rank_communications:
if print_condition:
print(f"WRITER (pre ): rank {rank.get_id()} phase {phase.get_id()}: {len(initial_on_rank_communications)}")
for comm_dict in initial_on_rank_communications:
if comm_dict["from"]["id"] in rank_objects:
communications.append(comm_dict)
else:
self.__moved_objs_dicts.append(comm_dict)
self.__moved_comms.append(comm_dict)
if print_condition:
print(f"Rank {rank.get_id()} moved a communication.")

if print_condition:
print(f"len moved_comms: {len(self.__moved_comms)}")
print(f"moved_comms: {self.__moved_comms}")

# Loop through any moved objects to find the correct rank
if self.__moved_objs_dicts:
to_remove = []
for moved_dict in self.__moved_objs_dicts:
if self.__moved_comms:
for moved_dict in self.__moved_comms:
# if print_condition:
# print(f"\ncurrent obj id: {moved_dict['from']['id']}")
# print(f"rank {rank.get_id()} objects: {rank_objects}\n")
if moved_dict["from"]["id"] in rank_objects:
if print_condition:
print(f"Rank {rank.get_id()} received a communication.")
communications.append(moved_dict)
to_remove.append(moved_dict)
self.__to_remove.append(moved_dict)

if self.__to_remove:
for removable_dict in self.__to_remove:
self.__moved_comms.remove(removable_dict)

if print_condition:
print(f"WRITER (post): rank {rank.get_id()} phase {phase.get_id()}: {len(communications)}\n")

return communications

Expand Down Expand Up @@ -153,13 +174,14 @@ def _json_serializer(self, rank_phases_double) -> str:
# Unpack received double
r_id, r_phases = rank_phases_double

# Get current phase
# Get current rank
for p_id, rank in r_phases.items():
current_phase = self.__phases.get(p_id)
if rank.get_id() == r_id:
current_rank = rank

# Get metadata
if current_phase.get_metadata()[r_id]:
metadata = current_phase.get_metadata()[r_id]
if current_rank.get_metadata():
metadata = current_rank.get_metadata()
else:
metadata = {
"type": "LBDatafile",
Expand All @@ -173,6 +195,9 @@ def _json_serializer(self, rank_phases_double) -> str:

# Iterate over phases
for p_id, rank in r_phases.items():
# Get current phase
current_phase = self.__phases.get(p_id)

# Create data to be outputted for current phase
self.__logger.debug(f"Writing phase {p_id} for rank {r_id}")
phase_data= {"id": p_id,
Expand Down
10 changes: 1 addition & 9 deletions src/lbaf/Model/lbsPhase.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,6 @@ def get_communications(self):
"""Return the phase communications dict."""
return self.__communications

def get_metadata(self):
"""Return the metadata dict."""
return self.__metadata

def set_metadata(self, metadata: dict):
"""Assign the metadata dict."""
self.__metadata = metadata

def __update_or_create_directed_edge(self, from_id: int, to_id: int, v: float):
"""Convenience method to update or create directed edge with given volume."""
# Create undidrected edge index and try to retrieve edge
Expand Down Expand Up @@ -401,7 +393,7 @@ def populate_from_samplers(self, n_ranks, n_objects, t_sampler, v_sampler, c_deg
def populate_from_log(self, phase_id):
"""Populate this phase by reading in a load profile from log files."""
# Populate phase with JSON reader output
self.__ranks, self.__communications, self.__metadata = self.__reader.populate_phase(phase_id)
self.__ranks, self.__communications = self.__reader.populate_phase(phase_id)
objects = set()
for p in self.__ranks:
objects = objects.union(p.get_objects())
Expand Down
11 changes: 11 additions & 0 deletions src/lbaf/Model/lbsRank.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ def __init__(
# Start with empty shared block information
self.__shared_blocks = set()

# Start with empty metadata
self.__metadata = {}

def copy(self, rank):
"""Specialized copy method."""
# Copy all flat member variables
Expand Down Expand Up @@ -69,6 +72,14 @@ def set_size(self, size: float):
f"size: incorrect type {type(size)} or value: {size}")
self.__size = size

def get_metadata(self) -> dict:
"""Return original metadata."""
return self.__metadata

def set_metadata(self, metadata: dict):
"""Set rank's metadata."""
self.__metadata = metadata

def get_shared_block_ids(self) -> set:
"""Return IDs of shared blocks."""
return {b.get_id() for b in self.__shared_blocks}
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/IO/test_lbs_vt_data_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def test_lbs_vt_data_reader_read_wrong_schema(self):
self.assertIn(err.exception.args[0], list_of_err_msg)

def test_lbs_vt_data_reader_populate_phase(self):
rank_list, comm_dict, metadata = self.lr.populate_phase(0)
rank_list, comm_dict = self.lr.populate_phase(0)
for rank_real, rank_mock in zip(rank_list, self.rank_list):
generated_list = sorted(list(rank_real.get_migratable_objects()), key=lambda x: x.get_id())
prepared_list = sorted(list(rank_mock.get_migratable_objects()), key=lambda x: x.get_id())
Expand Down
Loading

0 comments on commit 7e1a17f

Please sign in to comment.