From b8833ae31c3e8178886395c9667366341f40410e Mon Sep 17 00:00:00 2001 From: Caleb Schilly Date: Wed, 28 Feb 2024 12:49:27 -0500 Subject: [PATCH] #501: add option to sort clusters --- config/work-stealing.yaml | 1 + .../Execution/lbsWorkStealingAlgorithm.py | 66 ++++++++++++------- src/lbaf/IO/lbsConfigurationValidator.py | 7 +- 3 files changed, 47 insertions(+), 27 deletions(-) diff --git a/config/work-stealing.yaml b/config/work-stealing.yaml index 3d4e09fac..d2161e26a 100644 --- a/config/work-stealing.yaml +++ b/config/work-stealing.yaml @@ -24,6 +24,7 @@ algorithm: steal_time: 0.2 num_experiments: 10 max_memory_usage: 8.0e+9 + sort_clusters: False # Specify output output_dir: ../output diff --git a/src/lbaf/Execution/lbsWorkStealingAlgorithm.py b/src/lbaf/Execution/lbsWorkStealingAlgorithm.py index bc48b0961..60773aaa4 100644 --- a/src/lbaf/Execution/lbsWorkStealingAlgorithm.py +++ b/src/lbaf/Execution/lbsWorkStealingAlgorithm.py @@ -62,6 +62,14 @@ def run(self): # Continue if the rank has work and memory left while self.__continue_condition(): + # Print current clusters memories + # cluster_size_list = [] + # for cluster in self.algorithm.rank_queues[self.rank_id]: + # if isinstance(cluster, list): + # cluster_size_list.append(self.algorithm.calculate_cluster_memory_footprint(cluster)) + # if len(cluster_size_list) > 0: + # print(f"Rank {self.rank_id}: {cluster_size_list}") + # Check if rank has a cluster lined up if self.current_cluster is not None: @@ -80,13 +88,19 @@ def run(self): # TODO: how long? yield self.env.timeout(self.algorithm.steal_time) - # Check if there is anything in queue + # Check for a StealRequest at the back of the queue + elif self.algorithm.has_steal_request_at_back_of_deque(self.rank): + steal_request = self.algorithm.rank_queues[self.rank_id].pop() + self.algorithm.respond_to_steal_request(steal_request) + yield self.env.timeout(self.algorithm.steal_time) + + # Check for work in queue elif self.algorithm.has_work_in_deque(self.rank, including_steals=True): # Pop next object in queue item = self.algorithm.rank_queues[self.rank_id].popleft() - # If item is a cluster, move to self.current_cluster (will be executed next) + # If item is a cluster, try to move to self.current_cluster (will be executed next) if isinstance(item, list): # Make sure there is memory on the rank to execute the cluster @@ -109,11 +123,6 @@ def run(self): yield self.env.timeout(0.01) - # If item is a StealRequest, look for clusters to give up - elif isinstance(item, StealRequest): - self.algorithm.respond_to_steal_request(item) - yield self.env.timeout(self.algorithm.steal_time) - # Catch any other datatypes else: self.__logger.error(f"Received some other datatype: {type(item)}") @@ -149,7 +158,7 @@ def run(self): steal_request = StealRequest(requesting_rank, target_rank) # Place steal request in target's queue idx = self.algorithm.get_index_of_first_non_steal_request(target_rank) - self.algorithm.rank_queues[target_rank_id].insert(idx, steal_request) + self.algorithm.rank_queues[target_rank_id].insert(idx + 1, steal_request) self.pending_steal_request = True yield self.env.timeout(self.algorithm.steal_time) # TODO: are we double counting steal time here? (see line 99) else: @@ -172,7 +181,7 @@ def __continue_condition(self): # Check if all ranks are done if not continue_condition: self.running = False - print(f"Rank {self.rank_id} is done running (memory {self.rank_memory}).") + # print(f"Rank {self.rank_id} is done running (memory {self.rank_memory}).") any_worker_still_running = any(worker.running for worker in self.algorithm.workers) # Throw error if all ranks are done but there is still work @@ -200,10 +209,10 @@ def __has_memory(self): return self.rank_memory < self.algorithm.max_memory_usage def __check_for_steal_requests(self): - """Checks next item in queue; if it's a steal request, responds accordingly.""" + """Checks item at the back of the queue; if it's a steal request, responds accordingly.""" rank_queue = self.algorithm.rank_queues[self.rank_id] - if len(rank_queue) > 0 and isinstance(rank_queue[0], StealRequest): - steal_request = rank_queue.popleft() + if len(rank_queue) > 0 and isinstance(rank_queue[-1], StealRequest): + steal_request = rank_queue.pop() self.algorithm.respond_to_steal_request(steal_request) yield self.env.timeout((self.algorithm.steal_time)) @@ -254,6 +263,7 @@ def __init__( self.steal_time = parameters.get("steal_time", 0.1) self.do_stealing = parameters.get("do_stealing", True) self.max_memory_usage = parameters.get("max_memory_usage", 8.0e+9) + self.sort_clusters = parameters.get("sort_clusters", True) # Initialize logger self.__logger = lgr @@ -285,14 +295,18 @@ def __build_rank_clusters(self, rank: Rank) -> dict: clusters.setdefault(sb.get_id(), []).append(o) self.__total_task_count += 1 - # Return dict of computed object clusters possibly randomized + # Return randomized dict of computed object clusters return {k: clusters[k] for k in random.sample(clusters.keys(), len(clusters))} def __initialize_rank_queues(self): """Populates every rank's deque with all initial clusters.""" for r in self.ranks.values(): rank_clusters = self.__build_rank_clusters(r) - self.rank_queues[r.get_id()] = deque(cluster for cluster in rank_clusters.values()) + if self.sort_clusters: + clusters = sorted(rank_clusters.values(), key=self.calculate_cluster_memory_footprint, reverse=True) + else: + clusters = [cluster for cluster in rank_clusters.values()] + self.rank_queues[r.get_id()] = deque(clusters) def __reset(self): """Resets the algorithm for repeated experiments.""" @@ -329,6 +343,10 @@ def has_work_in_deque(self, rank, including_steals=False): else: return any(isinstance(item, list) for item in self.rank_queues[rank.get_id()]) + def has_steal_request_at_back_of_deque(self, rank): + """Determines if a given rank's deque has a StealRequest at the back.""" + return self.has_work_in_deque(rank, including_steals=True) and isinstance(self.rank_queues[rank.get_id()][-1], StealRequest) + def get_task_count(self): """Returns number of tasks that have been simulated.""" return self.__task_count @@ -342,12 +360,10 @@ def get_total_task_count(self): return self.__total_task_count def get_index_of_first_non_steal_request(self, rank): - """Returns the index of the first non-StealRequest in a rank's deque.""" + """Returns the index of the first non-StealRequest in a rank's deque (iterating from the back).""" rank_queue = self.rank_queues[rank.get_id()] - for i in range(len(rank_queue)): - if isinstance(rank_queue[i], StealRequest): - continue - else: + for i in range(len(rank_queue) - 1, -1, -1): + if not isinstance(rank_queue[i], StealRequest): return i def iterate_attempted_steals(self): @@ -355,18 +371,18 @@ def iterate_attempted_steals(self): self.__attempted_steal_count += 1 def has_stealable_cluster(self, target_rank, requesting_rank): - """Asserts that a given target_rank has a stealable cluster at the back of its deque. + """Asserts that a given target_rank has a stealable cluster at the front of its deque. Also checks that the requesting rank has enough memory for the steal.""" rank_queue = self.rank_queues[target_rank.get_id()] # Make sure target_rank has a cluster at the back of its queue - has_cluster = self.has_work_in_deque(target_rank) and isinstance(rank_queue[-1], list) + has_cluster = self.has_work_in_deque(target_rank) and isinstance(rank_queue[0], list) # Make sure the requesting rank has enough memory to execute the cluster - return has_cluster and self.rank_has_memory_for_cluster(requesting_rank, rank_queue[-1]) + return has_cluster and self.rank_has_memory_for_cluster(requesting_rank, rank_queue[0]) def respond_to_steal_request(self, steal_request: StealRequest): - """Resolves steal requests; if there is a cluster at the back of the receiving rank's queue, it is relocated to the sending rank's queue.""" + """Resolves steal requests; if there is a cluster at the front of the receiving rank's queue, it is relocated to the sending rank's queue.""" # Get both ranks r_requesting = steal_request.get_requesting_rank() r_target = steal_request.get_target_rank() @@ -375,9 +391,9 @@ def respond_to_steal_request(self, steal_request: StealRequest): if self.has_stealable_cluster(r_target, r_requesting): # Perform steal - cluster = self.rank_queues[r_target.get_id()].pop() + cluster = self.rank_queues[r_target.get_id()].popleft() self.__logger.info(f" Performing steal of shared block {cluster[0].get_shared_block_id()} (from {r_target.get_id()} to {r_requesting.get_id()})") - self.rank_queues[r_requesting.get_id()].append(cluster) + self.rank_queues[r_requesting.get_id()].appendleft(cluster) self.__steal_count += 1 else: diff --git a/src/lbaf/IO/lbsConfigurationValidator.py b/src/lbaf/IO/lbsConfigurationValidator.py index e5d60482f..737713904 100644 --- a/src/lbaf/IO/lbsConfigurationValidator.py +++ b/src/lbaf/IO/lbsConfigurationValidator.py @@ -185,8 +185,8 @@ def __init__(self, config_to_validate: dict, logger: Logger): "parameters": { "discretion_interval": float, Optional("do_stealing"): And( - bool, - error="Should be of type 'bool'"), + bool, + error="Should be of type 'bool'"), Optional("steal_time"): And( float, lambda x: x>=0.0, @@ -195,6 +195,9 @@ def __init__(self, config_to_validate: dict, logger: Logger): float, lambda x: x>=0.0, error="Should be of type 'float' and >= 0.0"), + Optional("sort_clusters"): And( + bool, + error="Should be of type 'bool'"), Optional("num_experiments"): And( int, lambda x: x > 0,