Skip to content

Commit

Permalink
#501: small fixes and more debugging output
Browse files Browse the repository at this point in the history
  • Loading branch information
cwschilly committed Feb 23, 2024
1 parent a6a087d commit 045e1f3
Showing 1 changed file with 40 additions and 35 deletions.
75 changes: 40 additions & 35 deletions src/lbaf/Execution/lbsWorkStealingAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,42 +42,38 @@ def __init__(self, env, rank_id, algorithm, lgr: Logger):
self.rank = self.algorithm.ranks[self.rank_id]
self.pending_steal_request = False

# Initialize current cluster (if a rank is currently working through a cluster)
self.current_cluster = None

# Output initial information
initial_work = self.__get_total_work()
self.__logger.info(f" Rank {self.rank_id}: total initial work={initial_work}, initial size={len(self.algorithm.rank_queues[self.rank_id])}")
self.__logger.info(f" Rank {self.rank_id} Initial Info: work={self.__get_total_work()}, n_tasks={self.rank.get_number_of_objects()}, n_clusters={self.rank.get_number_of_shared_blocks()}")

def run(self):
"""Defines the process that will run within the simpy environment."""
while self.algorithm.any_ranks_have_work() if self.algorithm.do_stealing else self.__has_work():
if self.__has_work():
# Get next object in queue
item = self.algorithm.rank_queues[self.rank_id].popleft()

# Execute task if it's an Object
if isinstance(item, Object):
# Check if rank is currently executing a cluster
if self.current_cluster is not None:

# Execute all tasks on the cluster
for task in self.current_cluster:
self.env.process(self.__simulate_task(task))

# If there is a shared block, find all other objects with that id and bring them to the front of the queue
sb_id = item.get_shared_block_id()
if sb_id is not None:
all_tasks_in_cluster = []
for task in self.algorithm.rank_queues[self.rank_id]:
if isinstance(task, Object) and task.get_shared_block_id() == sb_id:
all_tasks_in_cluster.append(task)
for task in all_tasks_in_cluster:
self.algorithm.rank_queues[self.rank_id].remove(task)
self.algorithm.rank_queues[self.rank_id].appendleft(task) # Move all tasks to the front of the queue
# After each task, check if there is a steal request (to prevent hang ups)
self.__check_for_steal_requests()

# Then execute the task
yield self.env.process(self.__simulate_task(item))
# Once all tasks are executed, reset the current cluster
self.current_cluster = None

# If item is a cluster, break into objects and move to front of the queue
elif isinstance(item, list):
for o in item:
self.algorithm.rank_queues[self.rank_id].appendleft(o)
# Otherwise, check if there is any other work
elif self.__has_work():

# Then execute the current task
task = self.algorithm.rank_queues[self.rank_id].popleft()
yield self.env.process(self.__simulate_task(task))
# Pop next object in queue
item = self.algorithm.rank_queues[self.rank_id].popleft()

# If item is a cluster, move to self.current_cluster (will be executed next)
if isinstance(item, list):
self.current_cluster = item

# If item is a StealRequest, look for clusters to give up
elif isinstance(item, StealRequest):
Expand Down Expand Up @@ -122,13 +118,22 @@ def __has_work(self):
"""Returns True if the rank has an object or cluster in its queue."""
return self.algorithm.has_work(self.rank)

def __check_for_steal_requests(self):
"""Checks next item in queue; if it's a steal request, responds accordingly."""
rank_queue = self.algorithm.rank_queues[self.rank_id]
if len(rank_queue) > 0 and isinstance(rank_queue[0], StealRequest):
self.algorithm.respond_to_steal_request(rank_queue[0])
yield self.env.timeout((self.algorithm.steal_time))

def __simulate_task(self, task: Object):
"""Simulates the execution of a task"""
self.algorithm.increment_task_count()
num_steal_reqs = []
queue_sizes = []
for i in range(self.algorithm.num_ranks):
queue_sizes.append(len(self.algorithm.rank_queues[i]))
self.__logger.info(f" Rank {self.rank_id}: executing task {task.get_id()} (sb_id {task.get_shared_block_id()}, load {task.get_load()}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()}) queue sizes={queue_sizes}")
queue_sizes.append(sum(isinstance(elm, list) for elm in self.algorithm.rank_queues[i]))
num_steal_reqs.append(sum(isinstance(elm, StealRequest) for elm in self.algorithm.rank_queues[i]))
self.__logger.info(f" Rank {self.rank_id}: executing task {task.get_id()} (sb_id {task.get_shared_block_id()}, load {task.get_load()}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()}) queue sizes={queue_sizes} steal requests = {num_steal_reqs}")
yield self.env.timeout(task.get_load())


Expand Down Expand Up @@ -231,7 +236,7 @@ def __has_stealable_cluster(self, rank):
if isinstance(self.rank_queues[rank.get_id()][-1], list):

# Make sure that the rank will not then be empty (prevent stealing back and forth)
if len(self.rank_queues[rank.get_id()]) > 1:
if sum(isinstance(elm, list) for elm in self.rank_queues[rank.get_id()]) > 1:
stealable = True

return stealable
Expand All @@ -244,7 +249,7 @@ def respond_to_steal_request(self, steal_request: StealRequest):
r_target = steal_request.get_target_rank()

# set false as we are responding, either by putting work or doing nothing
self.workers[r_target.get_id()].pending_steal_request = False
self.workers[r_requesting.get_id()].pending_steal_request = False

# Check that r_target has a cluster to steal
if self.__has_stealable_cluster(r_target):
Expand All @@ -269,7 +274,7 @@ def get_total_task_count(self):

def has_work(self, rank):
"""Determines if a given rank has an object or cluster in its deque."""
return any(isinstance(item, (Object, list)) for item in self.rank_queues[rank.get_id()])
return any(isinstance(item, list) for item in self.rank_queues[rank.get_id()])

def any_ranks_have_work(self):
"""Determines if any rank has an object, cluster, or StealRequest in its deque."""
Expand All @@ -286,8 +291,11 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict
# Run over multiple experiments
for exp in range(self.__num_experiments):

# Reset algorithm (re-initialize counts and queues)
self.__reset()

# Print out current experiment
self.__logger.info(f"Experiment {exp}")
self.__logger.info(f"Experiment {exp} ({self.get_total_task_count()} tasks)")

# Set up problem
random.seed()
Expand All @@ -309,8 +317,5 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict
self.__logger.info(f" {self.__steal_count} steals ({self.__attempted_steal_count} attempted).")
experiment_times.append(end_time)

# Reset algorithm (re-initialize counts and queues)
self.__reset()

# Report average time for all experiments
self.__logger.info(f"Average time: {sum(experiment_times)/len(experiment_times)}")

0 comments on commit 045e1f3

Please sign in to comment.