Commit

#501: add task count to output
cwschilly committed Feb 22, 2024
1 parent 511d4d3 commit a60b9ca
Showing 1 changed file with 32 additions and 10 deletions.
src/lbaf/Execution/lbsWorkStealingAlgorithm.py (42 changes: 32 additions & 10 deletions)
@@ -76,9 +76,10 @@ def run(self):
                     task = self.algorithm.rank_queues[self.rank_id].popleft()
                     yield self.env.process(self.__simulate_task(task))

-                # If item is a Steal Request, look for clusters to give up
+                # If item is a StealRequest, look for clusters to give up
                 elif isinstance(item, StealRequest):
                     self.algorithm.respond_to_steal_request(item)
+                    print(f"Rank {self.rank_id} responded to steal request")
                     yield self.env.timeout(self.algorithm.steal_time) # is self.algorithm.steal_time right here?

             # Catch any errors
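For context on the hunk above: the run() loop pops items off the rank's deque and spends simulated time on each one, treating plain task objects and StealRequest markers differently. The stand-alone sketch below illustrates only that dispatch pattern; Task, worker, and steal_time are hypothetical names, not the repository's classes, and a SimPy environment is assumed (the self.env.process/timeout/now calls in the diff match SimPy's API).

# Minimal stand-alone sketch of the dispatch pattern in the hunk above.
# All names here (Task, worker, steal_time) are illustrative; StealRequest is a
# hypothetical stand-in for the project's class. SimPy is assumed to be installed.
from collections import deque
import simpy

class StealRequest:
    """Hypothetical stand-in for the project's StealRequest marker."""

class Task:
    def __init__(self, task_id: int, load: float):
        self.task_id, self.load = task_id, load

def worker(env: simpy.Environment, queue: deque, steal_time: float = 0.1):
    while queue:
        item = queue.popleft()
        if isinstance(item, StealRequest):
            # Responding to a steal request costs a fixed amount of simulated time
            yield env.timeout(steal_time)
        else:
            # Executing a task costs its load in simulated time
            print(f"t={env.now}: executing task {item.task_id} (load {item.load})")
            yield env.timeout(item.load)

env = simpy.Environment()
env.process(worker(env, deque([Task(0, 2.0), StealRequest(), Task(1, 1.5)])))
env.run()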
@@ -115,12 +116,13 @@ def __has_work(self):

     def __simulate_task(self, task: Object):
         """Simulates the execution of a task"""
-        self.__logger.info(f"Rank {self.rank_id}: executing task {task.get_id()} (load {task.get_load()}) at time {self.env.now}")
+        self.algorithm.increment_task_count()
+        self.__logger.info(f"Rank {self.rank_id}: executing task {task.get_id()} (load {task.get_load()}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()})")
         yield self.env.timeout(task.get_load())


 #################################################
-######## Algorithm class ########
+######### Algorithm class #########
 #################################################


@@ -155,6 +157,10 @@ def __init__(
         self.num_ranks = 0
         self.rank_queues = {}

+        # Initialize total task count
+        self.__task_count = 0
+        self.__total_task_count = 0
+
         # Initialize the number of experiments and experiment times
         self.__num_experiments = parameters.get("num_experiments", 10)
         self.__experiment_times = []

Check warning on line 166 in src/lbaf/Execution/lbsWorkStealingAlgorithm.py (GitHub Actions / code-quality, ubuntu-latest, 3.8): Unused private member `WorkStealingAlgorithm.__experiment_times` (unused-private-member)
@@ -163,26 +169,31 @@ def __init__(
         self.do_stealing = parameters.get("do_stealing", True)

     def __build_rank_clusters(self, rank: Rank, with_nullset) -> dict:
-        """Cluster migratiable objects by shared block ID when available."""
+        """Cluster migratable objects by shared block ID when available."""
         # Iterate over all migratable objects on rank
         clusters = {None: []} if with_nullset else {}
+        non_clustered_tasks = []
         for o in rank.get_migratable_objects():
             # Retrieve shared block ID and skip object without one
             sb = o.get_shared_block()
             if sb is None:
-                continue
+                non_clustered_tasks.append(o)
+                self.__total_task_count += 1

-            # Add current object to its block ID cluster
-            clusters.setdefault(sb.get_id(), []).append(o)
+            else:
+                # Add current object to its block ID cluster
+                clusters.setdefault(sb.get_id(), []).append(o)
+                self.__total_task_count += 1

         # Return dict of computed object clusters possibly randomized
-        return {k: clusters[k] for k in random.sample(clusters.keys(), len(clusters))}
+        return {k: clusters[k] for k in random.sample(clusters.keys(), len(clusters))}, non_clustered_tasks

     def __initialize_rank_queues(self):
         """Populates every rank's deque with all initial clusters."""
         for r in self.ranks.values():
-            rank_clusters = self.__build_rank_clusters(r, False)
+            rank_clusters, loose_tasks = self.__build_rank_clusters(r, False)
             self.rank_queues[r.get_id()] = deque(cluster for cluster in rank_clusters.values())
+            self.rank_queues[r.get_id()].extend(o for o in loose_tasks)

     def respond_to_steal_request(self, steal_request: StealRequest):
         '''Resolves steal requests; if there is a cluster at the back of the receiving rank's queue, it is relocated to the sending rank's queue.'''
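The __build_rank_clusters change above now returns two things: clusters of migratable objects keyed by shared block ID, plus the objects that carry no shared block, while the total task count is tallied. __initialize_rank_queues then enqueues each cluster as one unit and each loose object individually. The following stand-alone sketch shows that partitioning with hypothetical names; build_rank_clusters here takes plain (object_id, block_id) pairs rather than the repository's Rank and Object types.

# Stand-alone sketch (hypothetical names) of the clustering scheme in the hunk
# above: objects with a shared block ID are grouped into clusters, objects
# without one are returned separately as "loose" tasks, and both feed the deque.
import random
from collections import deque

def build_rank_clusters(objects):
    """objects: iterable of (object_id, shared_block_id_or_None) pairs."""
    clusters, loose = {}, []
    total_task_count = 0
    for obj_id, block_id in objects:
        if block_id is None:
            loose.append(obj_id)
        else:
            clusters.setdefault(block_id, []).append(obj_id)
        total_task_count += 1
    # Randomize cluster order, mirroring the random.sample call in the diff
    keys = random.sample(list(clusters.keys()), len(clusters))
    return {k: clusters[k] for k in keys}, loose, total_task_count

clusters, loose, total = build_rank_clusters([(0, "A"), (1, "A"), (2, None), (3, "B")])
queue = deque(clusters.values())  # each cluster enters the queue as one unit
queue.extend(loose)               # loose tasks are enqueued individually
print(total, queue)

The sketch wraps the keys in list() because random.sample stopped accepting set-like views such as dict.keys() in Python 3.11; the diff samples the keys view directly, which works on the Python 3.8 toolchain mentioned in the CI annotation above.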
@@ -198,6 +209,18 @@ def respond_to_steal_request(self, steal_request: StealRequest):
             cluster = self.rank_queues[r_rcv.get_id()].pop()
             self.rank_queues[r_snd.get_id()].append(cluster)

+    def get_task_count(self):
+        """Returns the number of tasks that have been simulated."""
+        return self.__task_count
+
+    def increment_task_count(self):
+        """Increments the number of tasks that have been simulated."""
+        self.__task_count += 1
+
+    def get_total_task_count(self):
+        """Returns the total number of tasks that need to be simulated."""
+        return self.__total_task_count
+
     def has_work(self, rank):
         """Determines if a given rank has an object, cluster, or StealRequest in its deque."""
         return len(self.rank_queues[rank.get_id()]) > 0
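Taken together with the __simulate_task hunk earlier, the new methods form a simple progress counter: the total is tallied while the rank queues are built, the per-task counter is bumped once per simulated task, and the log line reports them as done/total. A hypothetical stand-alone illustration of that pattern follows; TaskCounter and register_tasks are illustrative names, not part of the repository.

# Hypothetical, stand-alone illustration of the progress-reporting pattern this
# commit adds. TaskCounter is an illustrative stand-in, not a repository class.
class TaskCounter:
    def __init__(self):
        self.__task_count = 0
        self.__total_task_count = 0

    def register_tasks(self, n: int = 1):
        """Called while building rank queues, once per discovered task."""
        self.__total_task_count += n

    def increment_task_count(self):
        """Called once per simulated task."""
        self.__task_count += 1

    def get_task_count(self) -> int:
        return self.__task_count

    def get_total_task_count(self) -> int:
        return self.__total_task_count

counter = TaskCounter()
counter.register_tasks(3)
for _ in range(2):
    counter.increment_task_count()
print(f"executed {counter.get_task_count()}/{counter.get_total_task_count()} tasks")  # executed 2/3 tasks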
@@ -237,4 +260,3 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict
             experiment_times.append(end_time)

         self.__logger.info(f"Average time: {sum(experiment_times)/len(experiment_times)}")
-