Skip to content

Commit

Permalink
add more debugging, don't allow more steals to happen until they are re…
Browse files Browse the repository at this point in the history
…sponded to
  • Loading branch information
lifflander committed Feb 23, 2024
1 parent 5df91da commit a6a087d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
1 change: 1 addition & 0 deletions config/user-defined-memory-toy-problem.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ from_data:
phase_ids:
- 0
check_schema: false
overwrite_validator: false

# Specify work model
work_model:
Expand Down
1 change: 1 addition & 0 deletions config/work-stealing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ from_data:
phase_ids:
- 0
check_schema: false
overwrite_validator: false

# Specify work model
work_model:
Expand Down
36 changes: 25 additions & 11 deletions src/lbaf/Execution/lbsWorkStealingAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(self, env, rank_id, algorithm, lgr: Logger):
# Initialize rank information
self.rank_id = rank_id
self.rank = self.algorithm.ranks[self.rank_id]
self.pending_steal_request = False

# Output initial information
initial_work = self.__get_total_work()
Expand Down Expand Up @@ -89,14 +90,20 @@ def run(self):

# If no work available, request a steal from a random rank
else:
target_rank_id = random.randrange(0, self.algorithm.num_ranks)
requesting_rank = self.rank
target_rank = self.algorithm.ranks[target_rank_id]
if self.algorithm.has_work(target_rank):
steal_request = StealRequest(requesting_rank, target_rank)
# Place steal request in target's queue
self.algorithm.rank_queues[target_rank_id].appendleft(steal_request)
yield self.env.timeout(self.algorithm.steal_time)

if not self.pending_steal_request:
target_rank_id = random.randrange(0, self.algorithm.num_ranks)
requesting_rank = self.rank
target_rank = self.algorithm.ranks[target_rank_id]
if self.algorithm.has_work(target_rank):
steal_request = StealRequest(requesting_rank, target_rank)
# Place steal request in target's queue
self.algorithm.rank_queues[target_rank_id].appendleft(steal_request)
yield self.env.timeout(self.algorithm.steal_time)
else:
# This rank is awaiting the fulfillment of a steal request
# and cannot proceed until it gets a response.
pass

def __get_total_work(self):
"""Returns the total work on the rank."""
Expand All @@ -118,7 +125,10 @@ def __has_work(self):
def __simulate_task(self, task: Object):
"""Simulates the execution of a task"""
self.algorithm.increment_task_count()
self.__logger.info(f" Rank {self.rank_id}: executing task {task.get_id()} (sb_id {task.get_shared_block_id()}, load {task.get_load()}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()})")
queue_sizes = []
for i in range(self.algorithm.num_ranks):
queue_sizes.append(len(self.algorithm.rank_queues[i]))
self.__logger.info(f" Rank {self.rank_id}: executing task {task.get_id()} (sb_id {task.get_shared_block_id()}, load {task.get_load()}) at time {self.env.now} ({self.algorithm.get_task_count()}/{self.algorithm.get_total_task_count()}) queue sizes={queue_sizes}")
yield self.env.timeout(task.get_load())


Expand Down Expand Up @@ -157,6 +167,7 @@ def __init__(
self.ranks = {}
self.num_ranks = 0
self.rank_queues = {}
self.workers = []

# Initialize task and steal counts
self.__task_count = 0
Expand Down Expand Up @@ -232,6 +243,9 @@ def respond_to_steal_request(self, steal_request: StealRequest):
r_requesting = steal_request.get_requesting_rank()
r_target = steal_request.get_target_rank()

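# Clear the pending flag because the request is being answered now, either by transferring work or by doing nothing
# NOTE(review): the flag is cleared on the target rank's worker, but the requesting rank is the one waiting — confirm which worker is intended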
self.workers[r_target.get_id()].pending_steal_request = False

# Check that r_target has a cluster to steal
if self.__has_stealable_cluster(r_target):

Expand Down Expand Up @@ -277,14 +291,14 @@ def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict

# Set up problem
random.seed()
workers = []
self.workers = []

# Create simpy environment
env = simpy.Environment()

# Instantiate RankWorkers
for i in range(self.num_ranks):
workers.append(RankWorker(env, i, self, self.__logger))
self.workers.append(RankWorker(env, i, self, self.__logger))

# Run the environment
env.run()
Expand Down

0 comments on commit a6a087d

Please sign in to comment.