Skip to content

Commit

Permalink
#501: add option to sort clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
cwschilly committed Feb 28, 2024
1 parent 8c4ee14 commit b8833ae
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 27 deletions.
1 change: 1 addition & 0 deletions config/work-stealing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ algorithm:
steal_time: 0.2
num_experiments: 10
max_memory_usage: 8.0e+9
sort_clusters: False

# Specify output
output_dir: ../output
Expand Down
66 changes: 41 additions & 25 deletions src/lbaf/Execution/lbsWorkStealingAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ def run(self):
# Continue if the rank has work and memory left
while self.__continue_condition():

# Print current clusters memories
# cluster_size_list = []
# for cluster in self.algorithm.rank_queues[self.rank_id]:
# if isinstance(cluster, list):
# cluster_size_list.append(self.algorithm.calculate_cluster_memory_footprint(cluster))
# if len(cluster_size_list) > 0:
# print(f"Rank {self.rank_id}: {cluster_size_list}")

# Check if rank has a cluster lined up
if self.current_cluster is not None:

Expand All @@ -80,13 +88,19 @@ def run(self):
# TODO: how long?

Check warning on line 88 in src/lbaf/Execution/lbsWorkStealingAlgorithm.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

TODO: how long? (fixme)
yield self.env.timeout(self.algorithm.steal_time)

# Check if there is anything in queue
# Check for a StealRequest at the back of the queue
elif self.algorithm.has_steal_request_at_back_of_deque(self.rank):
steal_request = self.algorithm.rank_queues[self.rank_id].pop()
self.algorithm.respond_to_steal_request(steal_request)
yield self.env.timeout(self.algorithm.steal_time)

# Check for work in queue
elif self.algorithm.has_work_in_deque(self.rank, including_steals=True):

# Pop next object in queue
item = self.algorithm.rank_queues[self.rank_id].popleft()

# If item is a cluster, move to self.current_cluster (will be executed next)
# If item is a cluster, try to move to self.current_cluster (will be executed next)
if isinstance(item, list):

# Make sure there is memory on the rank to execute the cluster
Expand All @@ -109,11 +123,6 @@ def run(self):

yield self.env.timeout(0.01)

# If item is a StealRequest, look for clusters to give up
elif isinstance(item, StealRequest):
self.algorithm.respond_to_steal_request(item)
yield self.env.timeout(self.algorithm.steal_time)

# Catch any other datatypes
else:
self.__logger.error(f"Received some other datatype: {type(item)}")
Expand Down Expand Up @@ -149,7 +158,7 @@ def run(self):
steal_request = StealRequest(requesting_rank, target_rank)
# Place steal request in target's queue
idx = self.algorithm.get_index_of_first_non_steal_request(target_rank)
self.algorithm.rank_queues[target_rank_id].insert(idx, steal_request)
self.algorithm.rank_queues[target_rank_id].insert(idx + 1, steal_request)
self.pending_steal_request = True
yield self.env.timeout(self.algorithm.steal_time) # TODO: are we double counting steal time here? (see line 99)

Check warning on line 163 in src/lbaf/Execution/lbsWorkStealingAlgorithm.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

TODO: are we double counting steal time here? (see line 99) (fixme)
else:
Expand All @@ -172,7 +181,7 @@ def __continue_condition(self):
# Check if all ranks are done
if not continue_condition:
self.running = False
print(f"Rank {self.rank_id} is done running (memory {self.rank_memory}).")
# print(f"Rank {self.rank_id} is done running (memory {self.rank_memory}).")
any_worker_still_running = any(worker.running for worker in self.algorithm.workers)

# Throw error if all ranks are done but there is still work
Expand Down Expand Up @@ -200,10 +209,10 @@ def __has_memory(self):
return self.rank_memory < self.algorithm.max_memory_usage

def __check_for_steal_requests(self):
"""Checks next item in queue; if it's a steal request, responds accordingly."""
"""Checks item at the back of the queue; if it's a steal request, responds accordingly."""
rank_queue = self.algorithm.rank_queues[self.rank_id]
if len(rank_queue) > 0 and isinstance(rank_queue[0], StealRequest):
steal_request = rank_queue.popleft()
if len(rank_queue) > 0 and isinstance(rank_queue[-1], StealRequest):
steal_request = rank_queue.pop()
self.algorithm.respond_to_steal_request(steal_request)
yield self.env.timeout((self.algorithm.steal_time))

Expand Down Expand Up @@ -254,6 +263,7 @@ def __init__(
self.steal_time = parameters.get("steal_time", 0.1)
self.do_stealing = parameters.get("do_stealing", True)
self.max_memory_usage = parameters.get("max_memory_usage", 8.0e+9)
self.sort_clusters = parameters.get("sort_clusters", True)

# Initialize logger
self.__logger = lgr
Expand Down Expand Up @@ -285,14 +295,18 @@ def __build_rank_clusters(self, rank: Rank) -> dict:
clusters.setdefault(sb.get_id(), []).append(o)
self.__total_task_count += 1

# Return dict of computed object clusters possibly randomized
# Return randomized dict of computed object clusters
return {k: clusters[k] for k in random.sample(clusters.keys(), len(clusters))}

def __initialize_rank_queues(self):
"""Populates every rank's deque with all initial clusters."""
for r in self.ranks.values():
rank_clusters = self.__build_rank_clusters(r)
self.rank_queues[r.get_id()] = deque(cluster for cluster in rank_clusters.values())
if self.sort_clusters:
clusters = sorted(rank_clusters.values(), key=self.calculate_cluster_memory_footprint, reverse=True)
else:
clusters = [cluster for cluster in rank_clusters.values()]
self.rank_queues[r.get_id()] = deque(clusters)

def __reset(self):
"""Resets the algorithm for repeated experiments."""
Expand Down Expand Up @@ -329,6 +343,10 @@ def has_work_in_deque(self, rank, including_steals=False):
else:
return any(isinstance(item, list) for item in self.rank_queues[rank.get_id()])

def has_steal_request_at_back_of_deque(self, rank):
    """Determine whether a StealRequest sits at the back of the given rank's deque.

    Guards with has_work_in_deque (steals included) so the back-index
    access below is only performed on a non-empty deque.
    """
    if not self.has_work_in_deque(rank, including_steals=True):
        return False
    back_item = self.rank_queues[rank.get_id()][-1]
    return isinstance(back_item, StealRequest)

def get_task_count(self):
    """Return how many tasks have been simulated so far."""
    simulated_tasks = self.__task_count
    return simulated_tasks
Expand All @@ -342,31 +360,29 @@ def get_total_task_count(self):
return self.__total_task_count

def get_index_of_first_non_steal_request(self, rank):
"""Returns the index of the first non-StealRequest in a rank's deque."""
"""Returns the index of the first non-StealRequest in a rank's deque (iterating from the back)."""
rank_queue = self.rank_queues[rank.get_id()]
for i in range(len(rank_queue)):
if isinstance(rank_queue[i], StealRequest):
continue
else:
for i in range(len(rank_queue) - 1, -1, -1):
if not isinstance(rank_queue[i], StealRequest):
return i

def iterate_attempted_steals(self):
    """Increment the running count of attempted steals by one."""
    self.__attempted_steal_count = self.__attempted_steal_count + 1

def has_stealable_cluster(self, target_rank, requesting_rank):
"""Asserts that a given target_rank has a stealable cluster at the back of its deque.
"""Asserts that a given target_rank has a stealable cluster at the front of its deque.
Also checks that the requesting rank has enough memory for the steal."""
rank_queue = self.rank_queues[target_rank.get_id()]

# Make sure target_rank has a cluster at the back of its queue
has_cluster = self.has_work_in_deque(target_rank) and isinstance(rank_queue[-1], list)
has_cluster = self.has_work_in_deque(target_rank) and isinstance(rank_queue[0], list)

# Make sure the requesting rank has enough memory to execute the cluster
return has_cluster and self.rank_has_memory_for_cluster(requesting_rank, rank_queue[-1])
return has_cluster and self.rank_has_memory_for_cluster(requesting_rank, rank_queue[0])

def respond_to_steal_request(self, steal_request: StealRequest):
"""Resolves steal requests; if there is a cluster at the back of the receiving rank's queue, it is relocated to the sending rank's queue."""
"""Resolves steal requests; if there is a cluster at the front of the receiving rank's queue, it is relocated to the sending rank's queue."""
# Get both ranks
r_requesting = steal_request.get_requesting_rank()
r_target = steal_request.get_target_rank()
Expand All @@ -375,9 +391,9 @@ def respond_to_steal_request(self, steal_request: StealRequest):
if self.has_stealable_cluster(r_target, r_requesting):

# Perform steal
cluster = self.rank_queues[r_target.get_id()].pop()
cluster = self.rank_queues[r_target.get_id()].popleft()
self.__logger.info(f" Performing steal of shared block {cluster[0].get_shared_block_id()} (from {r_target.get_id()} to {r_requesting.get_id()})")
self.rank_queues[r_requesting.get_id()].append(cluster)
self.rank_queues[r_requesting.get_id()].appendleft(cluster)
self.__steal_count += 1

else:
Expand Down
7 changes: 5 additions & 2 deletions src/lbaf/IO/lbsConfigurationValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ def __init__(self, config_to_validate: dict, logger: Logger):
"parameters": {
"discretion_interval": float,
Optional("do_stealing"): And(
bool,
error="Should be of type 'bool'"),
bool,
error="Should be of type 'bool'"),
Optional("steal_time"): And(
float,
lambda x: x>=0.0,
Expand All @@ -195,6 +195,9 @@ def __init__(self, config_to_validate: dict, logger: Logger):
float,
lambda x: x>=0.0,
error="Should be of type 'float' and >= 0.0"),
Optional("sort_clusters"): And(
bool,
error="Should be of type 'bool'"),
Optional("num_experiments"): And(
int,
lambda x: x > 0,
Expand Down

0 comments on commit b8833ae

Please sign in to comment.