From 5d206f80659cce0a9b9a855f6764987add828a17 Mon Sep 17 00:00:00 2001 From: Caleb Schilly Date: Thu, 22 Feb 2024 12:48:06 -0500 Subject: [PATCH] #501: add simpy and continue work on algorithm --- config/work-stealing.yaml | 30 +- .../Execution/lbsWorkStealingAlgorithm.py | 361 ++++++++++-------- src/lbaf/IO/lbsConfigurationValidator.py | 16 +- 3 files changed, 217 insertions(+), 190 deletions(-) diff --git a/config/work-stealing.yaml b/config/work-stealing.yaml index d71bc6ca4..ae67442f6 100644 --- a/config/work-stealing.yaml +++ b/config/work-stealing.yaml @@ -1,6 +1,6 @@ # Specify input from_data: - data_stem: ../data/synthetic_lb_data/data + data_stem: ../data/user-defined-memory-toy-problem/toy_mem phase_ids: - 0 check_schema: false @@ -9,33 +9,21 @@ check_schema: false work_model: name: AffineCombination parameters: - alpha: 0.0 - beta: 1.0 - gamma: 0.0 + alpha: 1. + beta: 0. + gamma: 0. + upper_bounds: + max_memory_usage: 8.0e+9 # Specify algorithm algorithm: name: WorkStealing parameters: discretion_interval: 0.010 - criterion: Tempered + steal_time: 0.2 + num_experiments: 20 # Specify output output_dir: ../output output_file_stem: output_file -write_JSON: - compressed: false - suffix: json - communications: true - offline_LB_compatible: false -visualization: - x_ranks: 2 - y_ranks: 2 - z_ranks: 1 - object_jitter: 0.5 - rank_qoi: work - object_qoi: load - save_meshes: true - force_continuous_object_qoi: true - output_visualization_dir: ../output - output_visualization_file_stem: output_file + diff --git a/src/lbaf/Execution/lbsWorkStealingAlgorithm.py b/src/lbaf/Execution/lbsWorkStealingAlgorithm.py index 5c21a5a43..8d4d254e8 100644 --- a/src/lbaf/Execution/lbsWorkStealingAlgorithm.py +++ b/src/lbaf/Execution/lbsWorkStealingAlgorithm.py @@ -1,19 +1,134 @@ +import simpy import random -import time from logging import Logger from collections import deque from .lbsAlgorithmBase import AlgorithmBase -from .lbsCriterionBase import CriterionBase -from 
.lbsTransferStrategyBase import TransferStrategyBase from ..Model.lbsRank import Rank -from ..Model.lbsMessage import Message -from ..IO.lbsStatistics import min_Hamming_distance, print_function_statistics +from ..Model.lbsObject import Object + + +################################################## +######## Helper classes ######## +################################################## + + +class StealRequest(): + def __init__(self, r_snd: Rank, r_rcv: Rank): + """Creates a steal request where r_snd steals a cluster from r_rcv""" + self.__r_snd = r_snd + self.__r_rcv = r_rcv + + def get_requesting_rank(self): + return self.__r_snd + + def get_target_rank(self): + return self.__r_rcv + + +class RankWorker(object): + def __init__(self, env, rank_id, algorithm, lgr: Logger): + """Class that handles all transfers, steals, and executions of tasks on a rank.""" + # Set up simpy environment + self.env = env + self.action = env.process(self.run()) + + # Initialize logger and algorithm + self.__logger = lgr + self.algorithm = algorithm + + # Initialize rank information + self.rank_id = rank_id + + # Output initial information + initial_work = self.__get_total_work() + self.__logger.info(f"id={self.rank_id}: total initial work={initial_work}, initial size={len(self.algorithm.rank_queues[self.rank_id])}") + + def run(self): + """Defines the process that the simpy environment will run.""" + while self.algorithm.any_ranks_have_work() if self.algorithm.do_stealing else self.__has_work(): + if self.__has_work(): + # Get next object in queue + item = self.algorithm.rank_queues[self.rank_id].popleft() + + # Execute task if it's an Object + if isinstance(item, Object): + + # If there is a shared block, find all other objects with that id and bring them to the front of the queue + sb_id = item.get_shared_block_id() + if sb_id is not None: + all_objs_in_cluster = [] + for o in self.algorithm.rank_queues[self.rank_id]: + if isinstance(o, Object) and o.get_shared_block_id() == sb_id: + 
all_objs_in_cluster.append(o) + for o in all_objs_in_cluster: + self.algorithm.rank_queues[self.rank_id].insert(0, o) # Move all objects to the front of the queue + + # Then execute the task + yield self.env.process(self.__simulate_task(item)) + + # If item is a cluster, break into objects and move to front of the queue + elif isinstance(item, list): + for o in item: + self.algorithm.rank_queues[self.rank_id].insert(0,o) + + # Then execute the current task + task = self.algorithm.rank_queues[self.rank_id].popleft() + yield self.env.process(self.__simulate_task(task)) + + # If item is a Steal Request, look for clusters to give up + elif isinstance(item, StealRequest): + self.algorithm.respond_to_steal_request(item) + yield self.env.timeout(self.algorithm.steal_time) # is self.algorithm.steal_time right here? + + # Catch any errors + else: + self.__logger.info(f"Received some other datatype: {type(item)}") + + # If no work available, request a steal from a random rank + else: + target_rank_id = random.randrange(0, self.algorithm.num_ranks) + requesting_rank = self.algorithm.ranks[self.rank_id] + target_rank = self.algorithm.ranks[target_rank_id] + if self.algorithm.has_work(target_rank): + steal_request = StealRequest(requesting_rank, target_rank) + # Place steal request in target's queue + self.algorithm.rank_queues[target_rank_id].append(steal_request) + yield self.env.timeout(self.algorithm.steal_time) + + # Update algorithm queue + self.algorithm.update_rank_queue(self.rank_id, self.algorithm.rank_queues[self.rank_id]) + + def __get_total_work(self): + """Returns the total work on the rank.""" + total_work = 0. 
+ for item in self.algorithm.rank_queues[self.rank_id]: + if isinstance(item, Object): + total_work += item.get_load() + elif isinstance(item, list): + for o in item: + total_work += o.get_load() + else: + pass + return total_work + + def __has_work(self): + """Returns True if the rank has an object, cluster, or StealRequest in its queue.""" + return len(self.algorithm.rank_queues[self.rank_id]) > 0 + + def __simulate_task(self, task: Object): + """Simulates the execution of a task""" + self.__logger.info(f"Rank {self.rank_id}: executing task {task.get_id()} (load {task.get_load()}) at time {self.env.now}") + yield self.env.timeout(task.get_load()) + + +################################################# +######## Algorithm class ######## +################################################# class WorkStealingAlgorithm(AlgorithmBase): - """A concrete class simulating execution.""" - + """A class for simulating work stealing for memory-constrained problems.""" def __init__( self, work_model, @@ -28,36 +143,27 @@ def __init__( :param rank_qoi: rank QOI to track :param object_qoi: object QOI to track. 
""" - # Call superclass init super(WorkStealingAlgorithm, self).__init__( work_model, parameters, lgr, rank_qoi, object_qoi) # Initialize the discretion interval self.__discretion_interval = parameters.get("discretion_interval") + self.steal_time = parameters.get("steal_time", 0.1) - # Initialize cluster swap relative threshold - self.__cluster_swap_rtol = parameters.get("cluster_swap_rtol", 0.05) - self.__logger.info( - f"Relative tolerance for cluster swaps: {self.__cluster_swap_rtol}") - - # Initialize queue per rank - self.__queue_per_rank = {} + # Initialize logger + self.__logger = lgr - # Initialize global cluster swapping counters - self.__n_swaps, self.__n_swap_tries = 0, 0 + # Initialize rank information + self.ranks = {} + self.num_ranks = 0 + self.rank_queues = {} - # Try to instantiate object transfer criterion - crit_name = parameters.get("criterion") - self.__transfer_criterion = CriterionBase.factory( - crit_name, - self._work_model, - logger=self.__logger) - if not self.__transfer_criterion: - self.__logger.error(f"Could not instantiate a transfer criterion of type {crit_name}") - raise SystemExit(1) + # Initialize the number of experiments and experiment times + self.__num_experiments = parameters.get("num_experiments", 10) + self.__experiment_times = [] - # Optional target imbalance for early termination of iterations - self.__target_imbalance = parameters.get("target_imbalance", 0.0) + # Initialize do_stealing + self.do_stealing = parameters.get("do_stealing", True) def __build_rank_clusters(self, rank: Rank, with_nullset) -> dict: """Cluster migratiable objects by shared block ID when available.""" @@ -65,144 +171,77 @@ def __build_rank_clusters(self, rank: Rank, with_nullset) -> dict: clusters = {None: []} if with_nullset else {} for o in rank.get_migratable_objects(): # Retrieve shared block ID and skip object without one - sb_id = o.get_shared_block_id() - if sb_id is None: + sb = o.get_shared_block() + if sb is None: continue # Add current 
object to its block ID cluster - clusters.setdefault(sb_id, []).append(o) + clusters.setdefault(sb.get_id(), []).append(o) # Return dict of computed object clusters possibly randomized - return clusters if self._deterministic_transfer else { - k: clusters[k] - for k in random.sample(clusters.keys(), len(clusters))} - - def __swap_clusters(self, phase: Phase, r_src: Rank, clusters_src:dict, targets: dict) -> int: - """Perform feasible cluster swaps from given rank to possible targets.""" - # Initialize return variable - n_rank_swaps = 0 - - # Iterate over targets to identify and perform beneficial cluster swaps - for r_try in targets if self._deterministic_transfer else random.sample(targets, len(targets)): - # Escape targets loop if at least one swap already occurred - if n_rank_swaps: - break - - # Cluster migratiable objects on target rank - clusters_try = self.__build_rank_clusters(r_try, True) - self.__logger.debug( - f"Constructed {len(clusters_try)} migratable clusters on target rank {r_try.get_id()}") - - # Iterate over source clusters - for k_src, o_src in clusters_src.items(): - # Iterate over target clusters - for k_try, o_try in clusters_try.items(): - # Decide whether swap is beneficial - c_try = self.__transfer_criterion.compute(r_src, o_src, r_try, o_try) - self.__n_swap_tries += 1 - if c_try > 0.0: - # Compute source cluster size only when necessary - sz_src = sum([o.get_load() for o in o_src]) - if c_try > self.__cluster_swap_rtol * sz_src: - # Perform swap - self.__logger.debug( - f"Swapping cluster {k_src} of size {sz_src} with cluster {k_try} on {r_try.get_id()}") - self._n_transfers += phase.transfer_objects( - r_src, o_src, r_try, o_try) - del clusters_try[k_try] - n_rank_swaps += 1 - break - else: - # Reject swap - self._n_rejects += len(o_src) + len(o_try) - - # Return number of swaps performed from rank - n_rank_swaps = 0 - -def __execute_stealing_stage(self): - """Execute stealing stage.""" - # Build set of all ranks in the phase - rank_set 
= set(self._rebalanced_phase.get_ranks()) - - # Initialize information messages and known peers - messages = {} - n_r = len(rank_set) - for r_snd in rank_set: - # Make rank aware of itself - self.__known_peers[r_snd] = {r_snd} - - # Create initial message spawned from rank - msg = Message(0, {r_snd}) - - # Broadcast message to random sample of ranks excluding self - for r_rcv in random.sample( - list(rank_set.difference({r_snd})), min(self.__fanout, n_r - 1)): - messages.setdefault(r_rcv, []).append(msg) - - # Sanity check prior to forwarding iterations - if (n_m := sum([len(m) for m in messages.values()])) != (n_c := n_r * self.__fanout): - self._logger.error( - f"Incorrect number of initial messages: {n_m} <> {n_c}") - self._logger.info( - f"Sent {n_m} initial information messages with fanout={self.__fanout}") - - # Process all received initial messages - for r_rcv, m_rcv in messages.items(): - for m in m_rcv: - # Process message by recipient - self.__process_message(r_rcv, m) - - # Perform sanity check on first round of information aggregation - n_k = 0 - for r in rank_set: - # Retrieve and tally peers known to rank - k_p = self.__known_peers.get(r, {}) - n_k += len(k_p) - self._logger.debug( - f"Peers known to rank {r.get_id()}: {[r_k.get_id() for r_k in k_p]}") - if n_k != (n_c := n_c + n_r): - self._logger.error( - f"Incorrect total number of aggregated initial known peers: {n_k} <> {n_c}") - - # Forward messages for as long as necessary and requested - for i in range(1, self.__n_rounds): - # Initiate next information round - self._logger.debug(f"Performing message forwarding round {i}") - messages.clear() - - # Iterate over all ranks - for r_snd in rank_set: - # Collect message when destination list is not empty - dst, msg = self.__forward_message( - i, r_snd, self.__fanout) - for r_rcv in dst: - messages.setdefault(r_rcv, []).append(msg) - - # Process all messages of first round - for r_rcv, msg_lst in messages.items(): - for m in msg_lst: - 
self.__process_message(r_rcv, m) - - # Report on known peers when requested - for rank in rank_set: - self._logger.debug( - f"Peers known to rank {rank.get_id()}: {[r_k.get_id() for r_k in k_p]}") - - # Report on final know information ratio - n_k = sum([len(k_p) for k_p in self.__known_peers.values() if k_p]) / n_r - self._logger.info( - f"Average number of peers known to ranks: {n_k} ({100 * n_k / n_r:.2f}% of {n_r})") + return {k: clusters[k] for k in random.sample(list(clusters.keys()), len(clusters))} + + def __initialize_rank_queues(self): + """Populates every rank's deque with all initial clusters.""" + for r in self.ranks.values(): + rank_clusters = self.__build_rank_clusters(r, False) + self.rank_queues[r.get_id()] = deque(cluster for cluster in rank_clusters.values()) + + def respond_to_steal_request(self, steal_request: StealRequest): + '''Resolves steal requests; if there is a cluster at the back of the receiving rank's queue, it is relocated to the sending rank's queue.''' + # Get both ranks + r_snd = steal_request.get_requesting_rank() + r_rcv = steal_request.get_target_rank() + + # Check that r_rcv still has work + if self.has_work(r_rcv): + + # If back of queue is a list (i.e. 
a cluster), allow steal + if isinstance(self.rank_queues[r_rcv.get_id()][-1], list): + cluster = self.rank_queues[r_rcv.get_id()].pop() + self.rank_queues[r_snd.get_id()].append(cluster) + + def has_work(self, rank): + """Determines if a given rank has an object, cluster, or StealRequest in its deque.""" + return len(self.rank_queues[rank.get_id()]) > 0 + + def any_ranks_have_work(self): + """Determines if any rank has an object, cluster, or StealRequest in its deque.""" + for r in self.ranks.values(): + if self.has_work(r): + return True + return False + + def update_rank_queue(self, rank_id, queue: deque): + """Syncs the algorithm's dict of each rank's queue with the RankWorker's queue.""" + self.rank_queues[rank_id] = queue def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict, a_min_max): - """ Simulate execution.""" - # Implement a discrete simulator - # - Once a rank completes its task, it "steals" a random cluster from a random rank - # - Output time at the end - - # Perform pre-execution checks and initializations + """Performs the simulation and reports the average time to complete all tasks.""" + # Initialize algorithm self._initialize(p_id, phases, distributions, statistics) - print_function_statistics( - self._rebalanced_phase.get_ranks(), - self._work_model.compute, - "initial rank work", - self._logger) + ranks_list = self._rebalanced_phase.get_ranks() + self.ranks = {rank.get_id(): rank for rank in ranks_list} + self.num_ranks = len(self.ranks) + self.__initialize_rank_queues() + + experiment_times = [] + + for exp in range(self.__num_experiments): + random.seed() + + workers = [] + + env = simpy.Environment() + + for i in range(self.num_ranks): + workers.append(RankWorker(env, i, self, self.__logger)) + + env.run() + end_time = env.now + self.__logger.info(f"simulation finished at time {end_time}") + + experiment_times.append(end_time) + + self.__logger.info(f"Average time: {sum(experiment_times)/len(experiment_times)}") + 
diff --git a/src/lbaf/IO/lbsConfigurationValidator.py b/src/lbaf/IO/lbsConfigurationValidator.py index 62d460b81..f998b09d6 100644 --- a/src/lbaf/IO/lbsConfigurationValidator.py +++ b/src/lbaf/IO/lbsConfigurationValidator.py @@ -183,15 +183,15 @@ def __init__(self, config_to_validate: dict, logger: Logger): "WorkStealing": Schema( {"name": "WorkStealing", "parameters": { - "criterion": And( - str, - lambda f: f in ALLOWED_CRITERIA, - error=f"{get_error_message(ALLOWED_CRITERIA)} must be chosen"), "discretion_interval": float, - Optional("cluster_swap_rtol"): And( - float, - lambda x: x > 0.0, - error="Should be of type 'float' and > 0.0")}})} + Optional("steal_time"): And( + float, + lambda x: x>=0.0, + error="Should be of type 'float' and >= 0.0"), + Optional("num_experiments"): And( + int, + lambda x: x > 0, + error="Should be of type 'int' and > 0")}})} self.__logger = logger @staticmethod