diff --git a/sisyphus/manager.py b/sisyphus/manager.py index d3539bc..9faa8f8 100644 --- a/sisyphus/manager.py +++ b/sisyphus/manager.py @@ -211,6 +211,7 @@ def __init__( self.ui = ui self.interactive = interative self.interactive_always_skip = set() + self.update_out_lock = threading.Lock() self.stop_if_done = True self._stop_loop = False @@ -436,12 +437,13 @@ def f(job): self.thread_pool.map(f, self.jobs.get(gs.STATE_RUNNABLE, [])) def check_output(self, write_output=False, update_all_outputs=False, force_update=False): - targets = self.sis_graph.targets if update_all_outputs else self.sis_graph.active_targets - for target in targets: - target.update_requirements(write_output=write_output, force=force_update) - if target.is_done(): - target.run_when_done(write_output=write_output) - self.sis_graph.remove_from_active_targets(target) + with self.update_out_lock: + targets = self.sis_graph.targets if update_all_outputs else self.sis_graph.active_targets + for target in targets: + target.update_requirements(write_output=write_output, force=force_update) + if target.is_done(): + target.run_when_done(write_output=write_output) + self.sis_graph.remove_from_active_targets(target) def continue_manager_loop(self): # Stop loop flag is set @@ -482,7 +484,6 @@ def startup(self): config_manager.continue_readers() self.job_engine.reset_cache() - self.check_output(write_output=False, update_all_outputs=True) self.update_jobs() # Ensure at least one async reader head the chance to continue until he added his jobs to the list @@ -548,8 +549,11 @@ def maybe_clear_state(state, always_clear, action): self.print_state_overview(verbose=True) elif answer.lower() == "y": self.link_outputs = True - create_aliases(self.sis_graph.jobs()) - self.check_output(write_output=self.link_outputs, update_all_outputs=True, force_update=True) + self.thread_pool.apply_async(create_aliases, self.sis_graph.jobs()) + self.thread_pool.apply_async( + self.check_output, + kwds={"write_output": self.link_outputs, "update_all_outputs": True, "force_update": True}, + ) break elif answer.lower() == "u": self.link_outputs = True @@ -590,7 +594,6 @@ def run(self): if self.mem_profile: self.mem_profile.snapshot() self.job_engine.reset_cache() - self.check_output(write_output=self.link_outputs) config_manager.continue_readers() self.update_jobs() @@ -623,6 +626,8 @@ def run(self): for job in self.jobs.get(gs.STATE_ERROR, []): gs.on_job_failure(job) + self.check_output(write_output=self.link_outputs) + # Stop config reader config_manager.cancel_all_reader()