diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f2de78..5fdb0eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - The memory_allocation_type in the slurm engine now default to per_node instead of per_cpu for consistency with other engines +- Improved the scheduling approach of the LocalEngine ### Added - Support for async workflow definitions. If await tk.async_run(obj) is called sisyphus will wait until all Path objects inside of obj are available diff --git a/sisyphus/localengine.py b/sisyphus/localengine.py index 7de9a42..a754d47 100644 --- a/sisyphus/localengine.py +++ b/sisyphus/localengine.py @@ -99,7 +99,7 @@ def start_engine(self): if self.started: return # control input - self.input_queue = queue.Queue() + self.runnable_tasks = sync_object([]) self.waiting_tasks = sync_object({}) # control output / which tasks are currently running @@ -194,21 +194,20 @@ def release_resources(self, rqmt, selected_devices): @tools.default_handle_exception_interrupt_main_thread def run(self): - next_task = None try: while self.running.value: self.check_finished_tasks() wait = True # wait if no new job is started - # get next task - logging.debug("Check for new task (Free resources %s)" % self.free_resources) - with self.waiting_tasks as waiting_tasks: # get object for synchronisation - if next_task is None and not self.input_queue.empty(): - next_task = self.input_queue.get() - logging.debug("Found new task: %s" % str(next_task)) + # check runnable tasks + logging.debug("Check for new tasks (Free resources %s)" % self.free_resources) + # get object for synchronisation + with self.waiting_tasks as waiting_tasks, self.runnable_tasks as runnable_tasks: + runnable_task_idx = 0 # run next task if the capacities are available - if next_task is not None: + while runnable_task_idx < len(runnable_tasks): + next_task = runnable_tasks[runnable_task_idx] with self.running_tasks as running_tasks: # if enough free resources => run job if self.enough_free_resources(next_task.rqmt): @@ -225,8 +224,10 @@ def run(self): # Start job: process = self.start_task(next_task, selected_gpus) running_tasks[name] = (process, next_task, selected_gpus) - next_task = None + del runnable_tasks[runnable_task_idx] wait = False + else: + runnable_task_idx += 1 if wait: # check only once per second for new jobs @@ -255,8 +256,8 @@ def submit_call(self, call, logpath, rqmt, name, task_name, task_ids): call_with_id += ["--redirect_output"] task = TaskQueueInstance(call_with_id, logpath, rqmt, name, task_name, task_id) - with self.waiting_tasks as waiting_tasks: - self.input_queue.put(task) + with self.waiting_tasks as waiting_tasks, self.runnable_tasks as runnable_tasks: + runnable_tasks.append(task) waiting_tasks[(name, task_id)] = task return ENGINE_NAME, socket.gethostname()