Skip to content

Commit

Permalink
#501: get working WorkStealing algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
cwschilly committed Feb 22, 2024
1 parent a60b9ca commit 9ec69aa
Showing 1 changed file with 43 additions and 29 deletions.
72 changes: 43 additions & 29 deletions src/lbaf/Execution/lbsWorkStealingAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
##################################################


class StealRequest():
class StealRequest:
def __init__(self, r_snd: Rank, r_rcv: Rank):
"""Creates a steal request where r_snd steals a cluster from r_rcv"""
self.__r_snd = r_snd
Expand All @@ -26,7 +26,7 @@ def get_target_rank(self):
return self.__r_rcv


class RankWorker(object):
class RankWorker:
def __init__(self, env, rank_id, algorithm, lgr: Logger):
"""Class that handles all transfers, steals, and executions of tasks on a rank."""
# Set up simpy environment
Expand All @@ -39,13 +39,14 @@ def __init__(self, env, rank_id, algorithm, lgr: Logger):

# Initialize rank information
self.rank_id = rank_id
self.rank = self.algorithm.ranks[self.rank_id]

# Output initial information
initial_work = self.__get_total_work()
self.__logger.info(f"id={self.rank_id}: total initial work={initial_work}, initial size={len(self.algorithm.rank_queues[self.rank_id])}")
self.__logger.info(f"Rank {self.rank_id}: total initial work={initial_work}, initial size={len(self.algorithm.rank_queues[self.rank_id])}")

def run(self):
"""Defines the process that the simpy environment will run."""
"""Defines the process that will run within the simpy environment."""
while self.algorithm.any_ranks_have_work() if self.algorithm.do_stealing else self.__has_work():
if self.__has_work():
# Get next object in queue
Expand All @@ -57,20 +58,21 @@ def run(self):
# If there is a shared block, find all other objects with that id and bring them to the front of the queue
sb_id = item.get_shared_block_id()
if sb_id is not None:
all_objs_in_cluster = []
for o in self.algorithm.rank_queues[self.rank_id]:
if isinstance(o, Object) and o.get_shared_block_id() == sb_id:
all_objs_in_cluster.append(o)
for o in all_objs_in_cluster:
self.algorithm.rank_queues[self.rank_id].insert(0, o) # Move all objects to the front of the queue
all_tasks_in_cluster = []
for task in self.algorithm.rank_queues[self.rank_id]:
if isinstance(task, Object) and task.get_shared_block_id() == sb_id:
all_tasks_in_cluster.append(task)
for task in all_tasks_in_cluster:
self.algorithm.rank_queues[self.rank_id].remove(task)
self.algorithm.rank_queues[self.rank_id].appendleft(task) # Move all tasks to the front of the queue

# Then execute the task
yield self.env.process(self.__simulate_task(item))

# If item is a cluster, break into objects and move to front of the queue
elif isinstance(item, list):
for o in item:
self.algorithm.rank_queues[self.rank_id].insert(0,o)
self.algorithm.rank_queues[self.rank_id].appendleft(o)

# Then execute the current task
task = self.algorithm.rank_queues[self.rank_id].popleft()
Expand All @@ -79,7 +81,6 @@ def run(self):
# If item is a StealRequest, look for clusters to give up
elif isinstance(item, StealRequest):
self.algorithm.respond_to_steal_request(item)
print(f"Rank {self.rank_id} responded to steal request")
yield self.env.timeout(self.algorithm.steal_time) # is self.algorithm.steal_time right here?

# Catch any errors
Expand All @@ -89,7 +90,7 @@ def run(self):
# If no work available, request a steal from a random rank
else:
target_rank_id = random.randrange(0, self.algorithm.num_ranks)
requesting_rank = self.algorithm.ranks[self.rank_id]
requesting_rank = self.rank
target_rank = self.algorithm.ranks[target_rank_id]
if self.algorithm.has_work(target_rank):
steal_request = StealRequest(requesting_rank, target_rank)
Expand All @@ -111,13 +112,13 @@ def __get_total_work(self):
return total_work

def __has_work(self):
"""Returns True if the rank has an object, cluster, or StealRequest in its queue."""
return len(self.algorithm.rank_queues[self.rank_id]) > 0
"""Returns True if the rank has an object or cluster in its queue."""
return self.algorithm.has_work(self.rank)

def __simulate_task(self, task: Object):
"""Simulates the execution of a task"""
self.algorithm.increment_task_count()
self.__logger.info(f"Rank {self.rank_id}: executing task {task.get_id()} (load {task.get_load()}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()})")
self.__logger.info(f" Rank {self.rank_id}: executing task {task.get_id()} (load {task.get_load()}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()})")
yield self.env.timeout(task.get_load())


Expand Down Expand Up @@ -195,6 +196,16 @@ def __initialize_rank_queues(self):
self.rank_queues[r.get_id()] = deque(cluster for cluster in rank_clusters.values())
self.rank_queues[r.get_id()].extend(o for o in loose_tasks)

def __reset(self):
"""Resets the algorithm for repeated experiments."""
# Initialize algorithm
self.__task_count = 0
self.__total_task_count = 0
ranks_list = self._rebalanced_phase.get_ranks()
self.ranks = {rank.get_id(): rank for rank in ranks_list}
self.num_ranks = len(self.ranks)
self.__initialize_rank_queues()

def respond_to_steal_request(self, steal_request: StealRequest):
'''Resolves steal requests; if there is a cluster at the back of the receiving rank's queue, it is relocated to the sending rank's queue.'''
# Get both ranks
Expand Down Expand Up @@ -222,41 +233,44 @@ def get_total_task_count(self):
return self.__total_task_count

def has_work(self, rank):
"""Determines if a given rank has an object, cluster, or StealRequest in its deque."""
return len(self.rank_queues[rank.get_id()]) > 0
"""Determines if a given rank has an object or cluster in its deque."""
return any(isinstance(item, (Object, list)) for item in self.rank_queues[rank.get_id()])

def any_ranks_have_work(self):
"""Determines if any rank has an object, cluster, or StealRequest in its deque."""
for r in self.ranks.values():
if self.has_work(r):
return True
return False
return any(self.has_work(r) for r in self.ranks.values())

def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict, a_min_max):
"""Performs the simulation and returns the average time to complete all tasks."""
# Initialize algorithm
# Use initalize from AlgorithmBase
self._initialize(p_id, phases, distributions, statistics)
ranks_list = self._rebalanced_phase.get_ranks()
self.ranks = {rank.get_id(): rank for rank in ranks_list}
self.num_ranks = len(self.ranks)
self.__initialize_rank_queues()

# Save time for every experiment
experiment_times = []

# Run over multiple experiments
for exp in range(self.__num_experiments):

Check warning on line 252 in src/lbaf/Execution/lbsWorkStealingAlgorithm.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused variable 'exp' (unused-variable)
random.seed()

random.seed()
workers = []

# Create simpy environment
env = simpy.Environment()

# Instantiate RankWorkers
for i in range(self.num_ranks):
workers.append(RankWorker(env, i, self, self.__logger))

# Run the environment
env.run()

# Report elapsed time
end_time = env.now
self.__logger.info(f"simulation finished at time {end_time}")

experiment_times.append(end_time)

# Reset algorithm (re-initialize counts and queues)
self.__reset()

# Report average time for all experiments
self.__logger.info(f"Average time: {sum(experiment_times)/len(experiment_times)}")

0 comments on commit 9ec69aa

Please sign in to comment.