Skip to content

Commit

Permalink
#501: small changes
Browse files Browse the repository at this point in the history
  • Loading branch information
cwschilly committed Feb 23, 2024
1 parent d4219df commit 955d787
Showing 1 changed file with 12 additions and 19 deletions.
31 changes: 12 additions & 19 deletions src/lbaf/Execution/lbsWorkStealingAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def run(self):
self.__logger.error(f"Received some other datatype: {type(item)}")

# If no work is available, try to request a steal from a random rank
elif self.algorithm.do_stealing and not self.pending_steal_request:
elif self.algorithm.do_stealing and self.algorithm.any_ranks_have_stealable_work() and not self.pending_steal_request:
target_rank_id = random.randrange(0, self.algorithm.num_ranks)
requesting_rank = self.rank
target_rank = self.algorithm.ranks[target_rank_id]
Expand All @@ -98,7 +98,7 @@ def run(self):
# Place steal request in target's queue
self.algorithm.rank_queues[target_rank_id].appendleft(steal_request)
self.pending_steal_request = True
self.__logger.info(f" Rank {self.rank_id} wants to steal from Rank {target_rank_id}")
self.__logger.info(f" Rank {self.rank_id} requests steal from Rank {target_rank_id}")
yield self.env.timeout(self.algorithm.steal_time) # double counting steal time here (line 81)

else:
Expand Down Expand Up @@ -200,32 +200,25 @@ def __init__(
self.__experiment_times = []

Check warning on line 200 in src/lbaf/Execution/lbsWorkStealingAlgorithm.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused private member `WorkStealingAlgorithm.__experiment_times` (unused-private-member)


def __build_rank_clusters(self, rank: Rank, with_nullset) -> dict:
def __build_rank_clusters(self, rank: Rank) -> dict:
"""Cluster migratable objects by shared block ID when available."""
# Iterate over all migratable objects on rank
clusters = {None: []} if with_nullset else {}
non_clustered_tasks = []
clusters = {}
for o in rank.get_migratable_objects():
# Retrieve shared block ID and skip object without one
sb = o.get_shared_block()
if sb is None:
non_clustered_tasks.append(o)
self.__total_task_count += 1

else:
# Add current object to its block ID cluster
clusters.setdefault(sb.get_id(), []).append(o)
self.__total_task_count += 1
# Add current object to its block ID cluster
clusters.setdefault(sb.get_id(), []).append(o)
self.__total_task_count += 1

# Return dict of computed object clusters possibly randomized
return {k: clusters[k] for k in random.sample(clusters.keys(), len(clusters))}, non_clustered_tasks
return {k: clusters[k] for k in random.sample(clusters.keys(), len(clusters))}

def __initialize_rank_queues(self):
"""Populates every rank's deque with all initial clusters."""
for r in self.ranks.values():
rank_clusters, loose_tasks = self.__build_rank_clusters(r, False)
rank_clusters = self.__build_rank_clusters(r)
self.rank_queues[r.get_id()] = deque(cluster for cluster in rank_clusters.values())
self.rank_queues[r.get_id()].extend(o for o in loose_tasks)

def __reset(self):
"""Resets the algorithm for repeated experiments."""
Expand Down Expand Up @@ -257,9 +250,6 @@ def respond_to_steal_request(self, steal_request: StealRequest):
r_requesting = steal_request.get_requesting_rank()
r_target = steal_request.get_target_rank()

# set false as we are responding, either by putting work or doing nothing
self.workers[r_requesting.get_id()].pending_steal_request = False

# Double check that r_target still has a cluster to steal
if self.has_stealable_cluster(r_target):

Expand All @@ -272,6 +262,9 @@ def respond_to_steal_request(self, steal_request: StealRequest):
else:
self.__logger.info(f" Ignoring steal request from {r_requesting.get_id()} ({r_target.get_id()} has no stealable clusters) ")

# set false as we are responding, either by putting work or doing nothing
self.workers[r_requesting.get_id()].pending_steal_request = False

def get_task_count(self):
"""Returns number of tasks that have been simulated."""
return self.__task_count
Expand Down

0 comments on commit 955d787

Please sign in to comment.