diff --git a/config/user-defined-memory-toy-problem.yaml b/config/user-defined-memory-toy-problem.yaml index 287184c5d..1cc93590d 100644 --- a/config/user-defined-memory-toy-problem.yaml +++ b/config/user-defined-memory-toy-problem.yaml @@ -4,6 +4,7 @@ from_data: phase_ids: - 0 check_schema: false +overwrite_validator: false # Specify work model work_model: diff --git a/config/work-stealing.yaml b/config/work-stealing.yaml index 4e957ee07..5f0c757f5 100644 --- a/config/work-stealing.yaml +++ b/config/work-stealing.yaml @@ -4,6 +4,7 @@ from_data: phase_ids: - 0 check_schema: false +overwrite_validator: false # Specify work model work_model: diff --git a/src/lbaf/Execution/lbsWorkStealingAlgorithm.py b/src/lbaf/Execution/lbsWorkStealingAlgorithm.py index 6cad6a2d9..e80212193 100644 --- a/src/lbaf/Execution/lbsWorkStealingAlgorithm.py +++ b/src/lbaf/Execution/lbsWorkStealingAlgorithm.py @@ -40,6 +40,7 @@ def __init__(self, env, rank_id, algorithm, lgr: Logger): # Initialize rank information self.rank_id = rank_id self.rank = self.algorithm.ranks[self.rank_id] + self.pending_steal_request = False # Output initial information initial_work = self.__get_total_work() @@ -89,14 +90,20 @@ def run(self): # If no work available, request a steal from a random rank else: - target_rank_id = random.randrange(0, self.algorithm.num_ranks) - requesting_rank = self.rank - target_rank = self.algorithm.ranks[target_rank_id] - if self.algorithm.has_work(target_rank): - steal_request = StealRequest(requesting_rank, target_rank) - # Place steal request in target's queue - self.algorithm.rank_queues[target_rank_id].appendleft(steal_request) - yield self.env.timeout(self.algorithm.steal_time) + + if not self.pending_steal_request: + target_rank_id = random.randrange(0, self.algorithm.num_ranks) + requesting_rank = self.rank + target_rank = self.algorithm.ranks[target_rank_id] + if self.algorithm.has_work(target_rank): + steal_request = StealRequest(requesting_rank, target_rank) + # Place steal request in target's queue + self.algorithm.rank_queues[target_rank_id].appendleft(steal_request) + yield self.env.timeout(self.algorithm.steal_time) + else: + # this rank is awaiting the fulfillment of a steal request + # and can not proceed until it gets a response + pass def __get_total_work(self): """Returns the total work on the rank.""" @@ -118,7 +125,10 @@ def __has_work(self): def __simulate_task(self, task: Object): """Simulates the execution of a task""" self.algorithm.increment_task_count() - self.__logger.info(f" Rank {self.rank_id}: executing task {task.get_id()} (sb_id {task.get_shared_block_id()}, load {task.get_load()}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()})") + queue_sizes = [] + for i in range(self.algorithm.num_ranks): + queue_sizes.append(len(self.algorithm.rank_queues[i])) + self.__logger.info(f" Rank {self.rank_id}: executing task {task.get_id()} (sb_id {task.get_shared_block_id()}, load {task.get_load()}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()}) queue sizes={queue_sizes}") yield self.env.timeout(task.get_load()) @@ -157,6 +167,7 @@ def __init__( self.ranks = {} self.num_ranks = 0 self.rank_queues = {} + self.workers = [] # Initialize task and steal counts self.__task_count = 0 @@ -232,6 +243,9 @@ def respond_to_steal_request(self, steal_request: StealRequest): r_requesting = steal_request.get_requesting_rank() r_target = steal_request.get_target_rank() + # set false as we are responding, either by putting work or doing nothing + self.workers[r_target.get_id()].pending_steal_request = False + # Check that r_target has a cluster to steal if self.__has_stealable_cluster(r_target): @@ -277,14 +291,14 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict # Set up problem random.seed() - workers = [] + self.workers = [] # Create simpy environment env = simpy.Environment() # Instantiate RankWorkers for i in range(self.num_ranks): - workers.append(RankWorker(env, i, self, self.__logger)) + self.workers.append(RankWorker(env, i, self, self.__logger)) # Run the environment env.run()