From 12f7c6e6fff1cddaad204ebbdd685a7fe065777e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philippe=20P=2E=20Pe=CC=81bay=CC=88?= Date: Sat, 27 May 2023 23:28:52 -0400 Subject: [PATCH] #323: moved all messaging logic to inform and transfer algo --- config/user-defined-memory-toy-problem.yaml | 4 +- .../lbsInformAndTransferAlgorithm.py | 74 +++++++++++++------ src/lbaf/Model/lbsRank.py | 47 +----------- 3 files changed, 56 insertions(+), 69 deletions(-) diff --git a/config/user-defined-memory-toy-problem.yaml b/config/user-defined-memory-toy-problem.yaml index 18017a7a4..2e5824118 100644 --- a/config/user-defined-memory-toy-problem.yaml +++ b/config/user-defined-memory-toy-problem.yaml @@ -21,8 +21,8 @@ algorithm: phase_id: 0 parameters: n_iterations: 4 - n_rounds: 4 - fanout: 4 + n_rounds: 2 + fanout: 2 order_strategy: arbitrary transfer_strategy: Clustering criterion: Tempered diff --git a/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py b/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py index 5476bb975..e8cbfea2d 100644 --- a/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py +++ b/src/lbaf/Execution/lbsInformAndTransferAlgorithm.py @@ -1,9 +1,12 @@ import sys +import random from logging import Logger from .lbsAlgorithmBase import AlgorithmBase from .lbsCriterionBase import CriterionBase from .lbsTransferStrategyBase import TransferStrategyBase +from ..Model.lbsRank import Rank +from ..Model.lbsMessage import Message from ..Model.lbsPhase import Phase from ..IO.lbsStatistics import print_function_statistics, min_Hamming_distance from ..Utils.exception_handler import exc_handler @@ -73,6 +76,35 @@ def __init__( sys.excepthook = exc_handler raise SystemExit(1) + def __initialize_message(self, r: Rank, loads: set, f: int): + """Initialize message to be sent to selected peers.""" + # Make rank aware of own load + r.set_known_load(r, r.get_load()) + + # Create load message tagged at first round + msg = Message(1, r.get_known_loads()) + + # Broadcast message to pseudo-random sample of ranks excluding self + return random.sample( + list(loads.difference([r])), min(f, len(loads) - 1)), msg + + def __forward_message(self, i: int, r: Rank, loads: set, f:int): + """Forward information message to sample of selected peers.""" + # Create load message tagged at given information round + msg = Message(i, r.get_known_loads()) + + # Compute complement of set of known peers + complement = set(r.get_known_loads()).difference([r]) + + # Forward message to pseudo-random sample of ranks + return random.sample( + list(complement), min(f, len(complement))), msg + + def __process_message(self, r: Rank, msg: Message): + """Update internals when message is received.""" + # Update load information + r.get_known_loads().update(msg.get_content()) + def __information_stage(self): """Execute information stage.""" @@ -82,49 +114,47 @@ def __information_stage(self): # Initialize information messages self._logger.info( f"Initializing information messages with fanout={self.__fanout}") - information_round = 1 messages = {} # Iterate over all ranks - for p_snd in rank_set: + for r_snd in rank_set: # Reset load information known by sender - p_snd.reset_all_load_information() + r_snd.reset_all_load_information() # Collect message when destination list is not empty - dst, msg = p_snd.initialize_message(rank_set, self.__fanout) - for p_rcv in dst: - messages.setdefault(p_rcv, []).append(msg) + dst, msg = self.__initialize_message(r_snd, rank_set, self.__fanout) + for r_rcv in dst: + messages.setdefault(r_rcv, []).append(msg) # Process all messages of first round - for p_rcv, msg_lst in messages.items(): + for r_rcv, msg_lst in messages.items(): for m in msg_lst: - p_rcv.process_message(m) + self.__process_message(r_rcv, m) # Report on gossiping status when requested for p in rank_set: - self._logger.debug(f"information known to rank {p.get_id()}: " - f"{[p_u.get_id() for p_u in p.get_known_loads()]}") + self._logger.debug( + f"information known to rank {p.get_id()}: " + f"{[p_u.get_id() for p_u in p.get_known_loads()]}") # Forward messages for as long as necessary and requested - while information_round < self.__n_rounds: + for i in range(1, self.__n_rounds): # Initiate next gossiping round - self._logger.debug(f"Performing message forwarding round {information_round}") - information_round += 1 + self._logger.debug(f"Performing message forwarding round {i}") messages.clear() # Iterate over all ranks - for p_snd in rank_set: - # Check whether rank must relay previously received message - if p_snd.round_last_received + 1 == information_round: - # Collect message when destination list is not empty - dst, msg = p_snd.forward_message(information_round, rank_set, self.__fanout) - for p_rcv in dst: - messages.setdefault(p_rcv, []).append(msg) + for r_snd in rank_set: + # Collect message when destination list is not empty + dst, msg = self.__forward_message( + i, r_snd, rank_set, self.__fanout) + for r_rcv in dst: + messages.setdefault(r_rcv, []).append(msg) # Process all messages of first round - for p_rcv, msg_lst in messages.items(): + for r_rcv, msg_lst in messages.items(): for m in msg_lst: - p_rcv.process_message(m) + self.__process_message(r_rcv, m) # Report on gossiping status when requested for p in rank_set: diff --git a/src/lbaf/Model/lbsRank.py b/src/lbaf/Model/lbsRank.py index 458e1c730..3e21c6455 100644 --- a/src/lbaf/Model/lbsRank.py +++ b/src/lbaf/Model/lbsRank.py @@ -1,11 +1,9 @@ import sys import copy import math -import random as rnd from logging import Logger from .lbsBlock import Block -from .lbsMessage import Message from .lbsObject import Object from ..Utils.exception_handler import exc_handler @@ -43,16 +41,12 @@ def __init__( # No information about peers is known initially self.__known_loads = {} - # No message was received initially - self.round_last_received = 0 - def copy(self, rank): """ Specialized copy method.""" # Copy all flat member variables self.__index = rank.get_id() self.__size = rank.get_size() - self.round_last_received = rank.round_last_received # Shallow copy owned objects self.__shared_blocks = copy.copy(rank.__shared_blocks) @@ -194,8 +188,8 @@ def get_known_loads(self) -> dict: """Return loads of peers know to self.""" return self.__known_loads - def add_known_load(self, rank): - """Make rank known to self if not already known.""" + def set_known_load(self, rank: "Rank", l: float): + """Set load of peer known to self.""" self.__known_loads.setdefault(rank, rank.get_load()) def get_targets(self) -> list: @@ -288,43 +282,6 @@ def reset_all_load_information(self): # Reset information about known peers self.__known_loads = {} - def initialize_message(self, loads: set, f: int): - """Initialize message to be sent to selected peers.""" - # Retrieve current load on this rank - l = self.get_load() - - # Make rank aware of own load - self.__known_loads[self] = l - - # Create load message tagged at first round - msg = Message(1, self.__known_loads) - - # Broadcast message to pseudo-random sample of ranks excluding self - return rnd.sample(list(loads.difference([self])), min(f, len(loads) - 1)), msg - - def forward_message(self, r, s, f): - """Forward information message to sample of selected peers.""" - # Create load message tagged at current round - msg = Message(r, self.__known_loads) - - # Compute complement of set of known peers - complement = set(self.__known_loads).difference([self]) - - # Forward message to pseudo-random sample of ranks - return rnd.sample(list(complement), min(f, len(complement))), msg - - def process_message(self, msg): - """Update internals when message is received.""" - # Assert that message has the expected type - if not isinstance(msg, Message): - self.__logger.warning(f"Attempted to pass message of incorrect type {type(msg)}. Ignoring it.") - - # Update load information - self.__known_loads.update(msg.get_content()) - - # Update last received message index - self.round_last_received = msg.get_round() - def compute_transfer_cmf(self, transfer_criterion, objects: list, targets: dict, strict=False): """Compute CMF for the sampling of transfer targets.""" # Initialize criterion values