Skip to content

Commit

Permalink
#323: progress update (refactored messages now processed and relayed)
Browse files Browse the repository at this point in the history
  • Loading branch information
ppebay committed Jun 10, 2023
1 parent 2cf2776 commit f896fb8
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 35 deletions.
37 changes: 17 additions & 20 deletions src/lbaf/Execution/lbsInformAndTransferAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,20 @@ def __init__(
sys.excepthook = exc_handler
raise SystemExit(1)

def __initialize_message(self, r: Rank, loads: set, f: int):
# Initialize empty dictionary of known peers
self.__known_peers = {}

def __initialize_message(self, r_snd: Rank, rank_set: set, f: int):
"""Initialize message to be sent to selected peers."""
# Make rank aware of own load
r.set_known_load(r, r.get_load())
# Make rank aware of itself
self.__known_peers[r_snd] = {r_snd}

# Create load message tagged at first round
msg = Message(1, {
"loads": r.get_known_loads()})
# Create initial message spawned from rank
msg = Message(0, self.__known_peers[r_snd])

# Broadcast message to pseudo-random sample of ranks excluding self
return random.sample(
list(loads.difference([r])), min(f, len(loads) - 1)), msg
list(rank_set.difference([r_snd])), min(f, len(rank_set) - 1)), msg

def __forward_message(self, i: int, r: Rank, loads: set, f:int):
"""Forward information message to sample of selected peers."""
Expand All @@ -102,36 +104,31 @@ def __forward_message(self, i: int, r: Rank, loads: set, f:int):
return random.sample(
list(complement), min(f, len(complement))), msg

def __process_message(self, r: Rank, msg: Message):
"""Update internals when message is received."""
# Update loads known by recipient
r.get_known_loads().update(msg.get_content()["loads"])

def __information_stage(self):
"""Execute information stage."""

# Build set of all ranks in the phase
rank_set = set(self._rebalanced_phase.get_ranks())

# Initialize information messages
# Initialize information messages and known peers
self._logger.info(
f"Initializing information messages with fanout={self.__fanout}")
messages = {}

# Iterate over all ranks
for r_snd in rank_set:
# Reset load information known by sender
r_snd.reset_all_load_information()

# Collect message when destination list is not empty
dst, msg = self.__initialize_message(r_snd, rank_set, self.__fanout)
for r_rcv in dst:
messages.setdefault(r_rcv, []).append(msg)

# Process all messages of first round
for r_rcv, msg_lst in messages.items():
for m in msg_lst:
self.__process_message(r_rcv, m)
print(self.__known_peers)
for r_rcv, m_rcv in messages.items():
print(r_rcv, ":")
for m in m_rcv:
# Process message by recipient
self.__known_peers[r_rcv].update(m.get_support())

# Report on gossiping status when requested
for p in rank_set:
Expand Down Expand Up @@ -165,7 +162,7 @@ def __information_stage(self):
f"{[p_u.get_id() for p_u in p.get_known_loads()]}")

def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict, a_min_max):
""" Execute 2-phase gossip+transfer algorithm on Phase with index p_id."""
""" Execute 2-phase information+transfer algorithm on Phase with index p_id."""
# Perform pre-execution checks and initializations
self._initialize(p_id, phases, distributions, statistics)

Expand Down
12 changes: 6 additions & 6 deletions src/lbaf/Model/lbsMessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ class Message:
"""A class representing information sent between ranks
"""

def __init__(self, r, c):
def __init__(self, r: int, s: set):
# Member variables passed by constructor
self.__round = r
self.__content = c
self.__support = s

def __repr__(self):
return f"Message round: {self.__round}, Content: {self.__content}"
return f"Message at round: {self.__round}, Content: {self.__content}"

def get_round(self):
"""Return message round index
"""
return self.__round

def get_content(self):
"""Return message content
def get_support(self):
"""Return message support
"""
return self.__content
return self.__support
10 changes: 1 addition & 9 deletions src/lbaf/Model/lbsRank.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,7 @@ def get_sentinel_object_ids(self) -> list:

def is_sentinel(self, o: Object) -> list:
"""Return whether given object is sentinel of rank."""
if o in self.__sentinel_objects:
return True
else:
return False
return (o in self.__sentinel_objects)

def get_known_loads(self) -> dict:
"""Return loads of peers know to self."""
Expand Down Expand Up @@ -275,11 +272,6 @@ def get_max_memory_usage(self) -> float:
"""Return maximum memory usage on rank."""
return self.__size + self.get_shared_memory() + self.get_max_object_level_memory()

def reset_all_load_information(self):
"""Reset all load information known to self."""
# Reset information about known peers
self.__known_loads = {}

def compute_transfer_cmf(self, transfer_criterion, objects: list, targets: set, strict=False):
"""Compute CMF for the sampling of transfer targets."""
# Initialize criterion values
Expand Down

0 comments on commit f896fb8

Please sign in to comment.