From f896fb8f383b57a56d5c5df7507e081085b28a5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philippe=20P=2E=20Pe=CC=81bay=CC=88?= Date: Sat, 10 Jun 2023 17:36:22 -0400 Subject: [PATCH] #323: progress update (refactored messages now processed and relayed) --- .../lbsInformAndTransferAlgorithm.py | 37 +++++++++---------- src/lbaf/Model/lbsMessage.py | 12 +++--- src/lbaf/Model/lbsRank.py | 10 +---- 3 files changed, 24 insertions(+), 35 deletions(-) diff --git a/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py b/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py index 3f9d90201..05f7c9179 100644 --- a/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py +++ b/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py @@ -76,18 +76,20 @@ def __init__( sys.excepthook = exc_handler raise SystemExit(1) - def __initialize_message(self, r: Rank, loads: set, f: int): + # Initialize empty dictionary of known peers + self.__known_peers = {} + + def __initialize_message(self, r_snd: Rank, rank_set: set, f: int): """Initialize message to be sent to selected peers.""" - # Make rank aware of own load - r.set_known_load(r, r.get_load()) + # Make rank aware of itself + self.__known_peers[r_snd] = {r_snd} - # Create load message tagged at first round - msg = Message(1, { - "loads": r.get_known_loads()}) + # Create initial message spawned from rank + msg = Message(0, self.__known_peers[r_snd]) # Broadcast message to pseudo-random sample of ranks excluding self return random.sample( - list(loads.difference([r])), min(f, len(loads) - 1)), msg + list(rank_set.difference([r_snd])), min(f, len(rank_set) - 1)), msg def __forward_message(self, i: int, r: Rank, loads: set, f:int): """Forward information message to sample of selected peers.""" @@ -102,36 +104,31 @@ def __forward_message(self, i: int, r: Rank, loads: set, f:int): return random.sample( list(complement), min(f, len(complement))), msg - def __process_message(self, r: Rank, msg: Message): - """Update internals when message is received.""" - # Update loads known by recipient - r.get_known_loads().update(msg.get_content()["loads"]) - def __information_stage(self): """Execute information stage.""" # Build set of all ranks in the phase rank_set = set(self._rebalanced_phase.get_ranks()) - # Initialize information messages + # Initialize information messages and known peers self._logger.info( f"Initializing information messages with fanout={self.__fanout}") messages = {} # Iterate over all ranks for r_snd in rank_set: - # Reset load information known by sender - r_snd.reset_all_load_information() - # Collect message when destination list is not empty dst, msg = self.__initialize_message(r_snd, rank_set, self.__fanout) for r_rcv in dst: messages.setdefault(r_rcv, []).append(msg) # Process all messages of first round - for r_rcv, msg_lst in messages.items(): - for m in msg_lst: - self.__process_message(r_rcv, m) + print(self.__known_peers) + for r_rcv, m_rcv in messages.items(): + print(r_rcv, ":") + for m in m_rcv: + # Process message by recipient + self.__known_peers[r_rcv].update(m.get_support()) # Report on gossiping status when requested for p in rank_set: @@ -165,7 +162,7 @@ def __information_stage(self): f"{[p_u.get_id() for p_u in p.get_known_loads()]}") def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict, a_min_max): - """ Execute 2-phase gossip+transfer algorithm on Phase with index p_id.""" + """ Execute 2-phase information+transfer algorithm on Phase with index p_id.""" # Perform pre-execution checks and initializations self._initialize(p_id, phases, distributions, statistics) diff --git a/src/lbaf/Model/lbsMessage.py b/src/lbaf/Model/lbsMessage.py index f27fcbd73..a4179a28d 100644 --- a/src/lbaf/Model/lbsMessage.py +++ b/src/lbaf/Model/lbsMessage.py @@ -2,20 +2,20 @@ class Message: """A class representing information sent between ranks """ - def __init__(self, r, c): + def __init__(self, r: int, s: set): # Member variables passed by constructor self.__round = r - self.__content = c + self.__support = s def __repr__(self): - return f"Message round: {self.__round}, Content: {self.__content}" + return f"Message at round: {self.__round}, Content: {self.__content}" def get_round(self): """Return message round index """ return self.__round - def get_content(self): - """Return message content + def get_support(self): + """Return message support """ - return self.__content + return self.__support diff --git a/src/lbaf/Model/lbsRank.py b/src/lbaf/Model/lbsRank.py index bc5bde65d..beab4bcca 100644 --- a/src/lbaf/Model/lbsRank.py +++ b/src/lbaf/Model/lbsRank.py @@ -179,10 +179,7 @@ def get_sentinel_object_ids(self) -> list: def is_sentinel(self, o: Object) -> list: """Return whether given object is sentinel of rank.""" - if o in self.__sentinel_objects: - return True - else: - return False + return (o in self.__sentinel_objects) def get_known_loads(self) -> dict: """Return loads of peers know to self.""" @@ -275,11 +272,6 @@ def get_max_memory_usage(self) -> float: """Return maximum memory usage on rank.""" return self.__size + self.get_shared_memory() + self.get_max_object_level_memory() - def reset_all_load_information(self): - """Reset all load information known to self.""" - # Reset information about known peers - self.__known_loads = {} - def compute_transfer_cmf(self, transfer_criterion, objects: list, targets: set, strict=False): """Compute CMF for the sampling of transfer targets.""" # Initialize criterion values