Skip to content

Commit

Permalink
#501: get working algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
cwschilly committed Feb 26, 2024
1 parent 955d787 commit 1c211a6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 21 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ scikit-learn==1.0.2
colorama==0.4.6
matplotlib==3.5.3;python_version == '3.8'
matplotlib==3.6.2;python_version == '3.9'
simpy==4.1.1

# LBAF testing
tox==4.6.0
Expand Down
45 changes: 24 additions & 21 deletions src/lbaf/Execution/lbsWorkStealingAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@ def __init__(self, env, rank_id, algorithm, lgr: Logger):

def run(self):
"""Defines the process that will run within the simpy environment."""
# Continue if the rank has clusters left. If stealing is on, also continue if any other ranks have stealable clusters
# Continue if the rank has clusters left. If stealing is on, also continue if any other ranks have stealable clusters.
while self.__continue_condition():

# Check if rank is currently executing a cluster
if self.current_cluster is not None:

# Execute all tasks on the cluster
for task in self.current_cluster:
self.env.process(self.__simulate_task(task))
yield self.env.process(self.__simulate_task(task))

# After each task, check if there is a steal request (to prevent hang ups)
# After each task, check if there is a steal request (to prevent hang ups) -- TODO: is this necessary?
if self.algorithm.do_stealing:
self.__check_for_steal_requests()

Expand All @@ -89,27 +89,27 @@ def run(self):
self.__logger.error(f"Received some other datatype: {type(item)}")

# If no work is available, try to request a steal from a random rank
elif self.algorithm.do_stealing and self.algorithm.any_ranks_have_stealable_work() and not self.pending_steal_request:
elif self.algorithm.do_stealing and not self.pending_steal_request:
target_rank_id = random.randrange(0, self.algorithm.num_ranks)
requesting_rank = self.rank
target_rank = self.algorithm.ranks[target_rank_id]
if self.algorithm.has_stealable_cluster(target_rank):
self.algorithm.iterate_attempted_steals()
steal_request = StealRequest(requesting_rank, target_rank)
# Place steal request in target's queue
self.algorithm.rank_queues[target_rank_id].appendleft(steal_request)
self.pending_steal_request = True
self.__logger.info(f" Rank {self.rank_id} requests steal from Rank {target_rank_id}")
yield self.env.timeout(self.algorithm.steal_time) # double counting steal time here (line 81)
yield self.env.timeout(self.algorithm.steal_time) # TODO: are we double counting steal time here (see line 81)

Check warning on line 102 in src/lbaf/Execution/lbsWorkStealingAlgorithm.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

TODO: are we double counting steal time here (see line 81) (fixme)

else:
# this rank is awaiting the fulfillment of a steal request
# and can not proceed until it gets a response
pass
yield self.env.timeout(0.01) # need to yield here -- but for how long?

def __continue_condition(self):
"""Continue if the rank has clusters in its queue. If stealing is on, also continue if any other ranks have stealable clusters."""
if self.algorithm.do_stealing:
condition = self.__has_work() or self.algorithm.any_ranks_have_stealable_work()
condition = self.__has_work() or (self.algorithm.any_ranks_have_stealable_work())
else:
condition = self.__has_work()
return condition
Expand All @@ -131,7 +131,6 @@ def __check_for_steal_requests(self):
"""Checks next item in queue; if it's a steal request, responds accordingly."""
rank_queue = self.algorithm.rank_queues[self.rank_id]
if len(rank_queue) > 0 and isinstance(rank_queue[0], StealRequest):
print(f"Rank {self.rank_id} found a steal request while working through cluster")
steal_request = rank_queue.popleft()
self.algorithm.respond_to_steal_request(steal_request)
yield self.env.timeout((self.algorithm.steal_time))
Expand All @@ -140,12 +139,14 @@ def __simulate_task(self, task: Object):
"""Simulates the execution of a task."""
self.algorithm.increment_task_count()
num_steal_reqs = []

Check warning on line 141 in src/lbaf/Execution/lbsWorkStealingAlgorithm.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused variable 'num_steal_reqs' (unused-variable)
queue_sizes = []
for i in range(self.algorithm.num_ranks):
queue_sizes.append(sum(isinstance(elm, list) for elm in self.algorithm.rank_queues[i]))
num_steal_reqs.append(sum(isinstance(elm, StealRequest) for elm in self.algorithm.rank_queues[i]))
self.__logger.info(f" Rank {self.rank_id}: executing task {task.get_id()} (sb_id {task.get_shared_block_id()}, load {task.get_load()}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()}) queue sizes={queue_sizes} steal requests = {num_steal_reqs}")
yield self.env.timeout(task.get_load())
# queue_sizes = []
# for i in range(self.algorithm.num_ranks):
# queue_sizes.append(sum(isinstance(elm, list) for elm in self.algorithm.rank_queues[i]))
# num_steal_reqs.append(sum(isinstance(elm, StealRequest) for elm in self.algorithm.rank_queues[i]))
task_time = task.get_load()
# self.__logger.info(f" Rank {self.rank_id}: executing task {task.get_id()} (sb_id {task.get_shared_block_id()}, load {task_time}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()}) queue sizes={queue_sizes} steal requests = {num_steal_reqs}")
self.__logger.info(f" Rank {self.rank_id}: executing task {task.get_id()} (sb_id {task.get_shared_block_id()}, load {task_time}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()})")
yield self.env.timeout(task_time)


#################################################
Expand Down Expand Up @@ -232,21 +233,24 @@ def __reset(self):
self.num_ranks = len(self.ranks)
self.__initialize_rank_queues()

def iterate_attempted_steals(self):
"""Increases number of attempted steals by one."""
self.__attempted_steal_count += 1

def has_stealable_cluster(self, rank):
"""Asserts that a given rank has a stealable cluster."""
"""Returns True if a given rank has a stealable cluster at the back of its deque, False otherwise."""
stealable = False
rank_queue = self.rank_queues[rank.get_id()]

# Make sure rank has at least two clusters in its queue (this prevents passing the last cluster around forever)
if self.has_work_in_deque(rank) and isinstance(rank_queue[-1], list) and sum(isinstance(elm, list) for elm in rank_queue) > 1:
# Make sure the rank has work in its deque and that the item at the back of the deque is a cluster (list)
if self.has_work_in_deque(rank) and isinstance(rank_queue[-1], list):
stealable = True

return stealable

def respond_to_steal_request(self, steal_request: StealRequest):
'''Resolves steal requests; if there is a cluster at the back of the receiving rank's queue, it is relocated to the sending rank's queue.'''
# Get both ranks
self.__attempted_steal_count += 1
r_requesting = steal_request.get_requesting_rank()
r_target = steal_request.get_target_rank()

Expand Down Expand Up @@ -321,8 +325,7 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict

# Report elapsed time and steals
end_time = env.now
self.__logger.info(f" simulation finished at time {end_time}")
self.__logger.info(f" {self.__steal_count} steals ({self.__attempted_steal_count} attempted).")
self.__logger.info(f" simulation finished at time {end_time} ({self.__steal_count}/{self.__attempted_steal_count} steals completed)")
experiment_times.append(end_time)

# Report average time for all experiments
Expand Down

0 comments on commit 1c211a6

Please sign in to comment.