Skip to content

Commit

Permalink
#323: moved all messaging logic to inform and transfer algo
Browse files Browse the repository at this point in the history
  • Loading branch information
ppebay committed May 28, 2023
1 parent 6b3ab6d commit 12f7c6e
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 69 deletions.
4 changes: 2 additions & 2 deletions config/user-defined-memory-toy-problem.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ algorithm:
phase_id: 0
parameters:
n_iterations: 4
n_rounds: 4
fanout: 4
n_rounds: 2
fanout: 2
order_strategy: arbitrary
transfer_strategy: Clustering
criterion: Tempered
Expand Down
74 changes: 52 additions & 22 deletions src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import sys
import random

from logging import Logger
from .lbsAlgorithmBase import AlgorithmBase
from .lbsCriterionBase import CriterionBase
from .lbsTransferStrategyBase import TransferStrategyBase
from ..Model.lbsRank import Rank
from ..Model.lbsMessage import Message
from ..Model.lbsPhase import Phase
from ..IO.lbsStatistics import print_function_statistics, min_Hamming_distance
from ..Utils.exception_handler import exc_handler
Expand Down Expand Up @@ -73,6 +76,35 @@ def __init__(
sys.excepthook = exc_handler
raise SystemExit(1)

def __initialize_message(self, r: Rank, loads: set, f: int):
"""Initialize message to be sent to selected peers."""
# Make rank aware of own load
r.set_known_load(r, r.get_load())

# Create load message tagged at first round
msg = Message(1, r.get_known_loads())

# Broadcast message to pseudo-random sample of ranks excluding self
return random.sample(
list(loads.difference([r])), min(f, len(loads) - 1)), msg

def __forward_message(self, i: int, r: Rank, loads: set, f:int):
"""Forward information message to sample of selected peers."""
# Create load message tagged at given information round
msg = Message(i, r.get_known_loads())

# Compute complement of set of known peers
complement = set(r.get_known_loads()).difference([r])

# Forward message to pseudo-random sample of ranks
return random.sample(
list(complement), min(f, len(complement))), msg

def __process_message(self, r: Rank, msg: Message):
"""Update internals when message is received."""
# Update load information
r.get_known_loads().update(msg.get_content())

def __information_stage(self):
"""Execute information stage."""

Expand All @@ -82,49 +114,47 @@ def __information_stage(self):
# Initialize information messages
self._logger.info(
f"Initializing information messages with fanout={self.__fanout}")
information_round = 1
messages = {}

# Iterate over all ranks
for p_snd in rank_set:
for r_snd in rank_set:
# Reset load information known by sender
p_snd.reset_all_load_information()
r_snd.reset_all_load_information()

# Collect message when destination list is not empty
dst, msg = p_snd.initialize_message(rank_set, self.__fanout)
for p_rcv in dst:
messages.setdefault(p_rcv, []).append(msg)
dst, msg = self.__initialize_message(r_snd, rank_set, self.__fanout)
for r_rcv in dst:
messages.setdefault(r_rcv, []).append(msg)

# Process all messages of first round
for p_rcv, msg_lst in messages.items():
for r_rcv, msg_lst in messages.items():
for m in msg_lst:
p_rcv.process_message(m)
self.__process_message(r_rcv, m)

# Report on gossiping status when requested
for p in rank_set:
self._logger.debug(f"information known to rank {p.get_id()}: "
f"{[p_u.get_id() for p_u in p.get_known_loads()]}")
self._logger.debug(
f"information known to rank {p.get_id()}: "
f"{[p_u.get_id() for p_u in p.get_known_loads()]}")

# Forward messages for as long as necessary and requested
while information_round < self.__n_rounds:
for i in range(1, self.__n_rounds):
# Initiate next gossiping round
self._logger.debug(f"Performing message forwarding round {information_round}")
information_round += 1
self._logger.debug(f"Performing message forwarding round {i}")
messages.clear()

# Iterate over all ranks
for p_snd in rank_set:
# Check whether rank must relay previously received message
if p_snd.round_last_received + 1 == information_round:
# Collect message when destination list is not empty
dst, msg = p_snd.forward_message(information_round, rank_set, self.__fanout)
for p_rcv in dst:
messages.setdefault(p_rcv, []).append(msg)
for r_snd in rank_set:
# Collect message when destination list is not empty
dst, msg = self.__forward_message(
i, r_snd, rank_set, self.__fanout)
for r_rcv in dst:
messages.setdefault(r_rcv, []).append(msg)

# Process all messages of first round
for p_rcv, msg_lst in messages.items():
for r_rcv, msg_lst in messages.items():
for m in msg_lst:
p_rcv.process_message(m)
self.__process_message(r_rcv, m)

# Report on gossiping status when requested
for p in rank_set:
Expand Down
47 changes: 2 additions & 45 deletions src/lbaf/Model/lbsRank.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import sys
import copy
import math
import random as rnd
from logging import Logger

from .lbsBlock import Block
from .lbsMessage import Message
from .lbsObject import Object
from ..Utils.exception_handler import exc_handler

Expand Down Expand Up @@ -43,16 +41,12 @@ def __init__(
# No information about peers is known initially
self.__known_loads = {}

# No message was received initially
self.round_last_received = 0

def copy(self, rank):
""" Specialized copy method."""

# Copy all flat member variables
self.__index = rank.get_id()
self.__size = rank.get_size()
self.round_last_received = rank.round_last_received

# Shallow copy owned objects
self.__shared_blocks = copy.copy(rank.__shared_blocks)
Expand Down Expand Up @@ -194,8 +188,8 @@ def get_known_loads(self) -> dict:
"""Return loads of peers know to self."""
return self.__known_loads

def add_known_load(self, rank):
"""Make rank known to self if not already known."""
def set_known_load(self, rank: "Rank", l: float):
"""Set load of peer known to self."""
self.__known_loads.setdefault(rank, rank.get_load())

def get_targets(self) -> list:
Expand Down Expand Up @@ -288,43 +282,6 @@ def reset_all_load_information(self):
# Reset information about known peers
self.__known_loads = {}

def initialize_message(self, loads: set, f: int):
"""Initialize message to be sent to selected peers."""
# Retrieve current load on this rank
l = self.get_load()

# Make rank aware of own load
self.__known_loads[self] = l

# Create load message tagged at first round
msg = Message(1, self.__known_loads)

# Broadcast message to pseudo-random sample of ranks excluding self
return rnd.sample(list(loads.difference([self])), min(f, len(loads) - 1)), msg

def forward_message(self, r, s, f):
"""Forward information message to sample of selected peers."""
# Create load message tagged at current round
msg = Message(r, self.__known_loads)

# Compute complement of set of known peers
complement = set(self.__known_loads).difference([self])

# Forward message to pseudo-random sample of ranks
return rnd.sample(list(complement), min(f, len(complement))), msg

def process_message(self, msg):
"""Update internals when message is received."""
# Assert that message has the expected type
if not isinstance(msg, Message):
self.__logger.warning(f"Attempted to pass message of incorrect type {type(msg)}. Ignoring it.")

# Update load information
self.__known_loads.update(msg.get_content())

# Update last received message index
self.round_last_received = msg.get_round()

def compute_transfer_cmf(self, transfer_criterion, objects: list, targets: dict, strict=False):
"""Compute CMF for the sampling of transfer targets."""
# Initialize criterion values
Expand Down

0 comments on commit 12f7c6e

Please sign in to comment.