Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#501: Implement work stealing simulator for memory-constrained problems #502

Draft
wants to merge 22 commits into
base: develop
Choose a base branch
from

Conversation

cwschilly
Copy link
Contributor

Fixes #501

@cwschilly cwschilly linked an issue Feb 21, 2024 that may be closed by this pull request
3 tasks
@cwschilly cwschilly marked this pull request as draft February 21, 2024 19:06
@lifflander
Copy link
Contributor

This is the sketch I wrote using simpy to work off of:

import simpy
import random
from collections import deque

do_stealing = True
num_workers = 4
workers = []
steal_time = 0.1
num_experiments = 1000

workers_work = [[5.0, 9.5, 8.2],[3.0],[9.5, 3.5],[3.1, 1.1, 5.5]]

def anyWorkersHaveWork():
    any_worker_has_work = False
    for w in workers:
        any_worker_has_work = any_worker_has_work or w.has_work()
    return any_worker_has_work
    
class RankWorker(object):
    def __init__(self, env, worker_id):
        self.env = env
        self.action = env.process(self.run())

        self.worker_id = worker_id
        self.work_deque = deque()

        total_work = 0.0
        for elm in workers_work[self.worker_id]:
            #print(f"id={self.worker_id}: work={elm}")
            self.work_deque.append(elm)
            total_work += elm

        print(f"id={self.worker_id}: total initial work={total_work}, initial size={len(self.work_deque)}")

    def run(self):
        while anyWorkersHaveWork() if do_stealing else self.has_work():
            if self.has_work():
                task_time = self.work_deque.pop()
                print(f"id={self.worker_id}: executing task {task_time} at time {self.env.now}")
                yield self.env.process(self.execute_task(task_time))
            else:
                random_worker = random.randrange(0, num_workers)
                if workers[random_worker].has_work():
                    self.work_deque.append(workers[random_worker].steal_work())
                yield self.env.timeout(steal_time)

    def has_work(self):
        return len(self.work_deque) > 0

    def steal_work(self):
        return self.work_deque.popleft()
            
    def execute_task(self, duration):
        yield self.env.timeout(duration)

experiment_times = []

for exp in range(num_experiments):
    random.seed()

    workers = []
    env = simpy.Environment()
        
    for i in range(num_workers):
        workers.append(RankWorker(env, i))

    env.run()

    end_time = env.now
    print(f"simulation finished at time {end_time}")

    experiment_times.append(end_time)

print(f"Average time: {sum(experiment_times)/len(experiment_times)}")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement work stealing simulator for memory-constrained problems
2 participants