From 1f08daaf06ca3acdd76071ea63fe6794813fefd7 Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Mon, 19 Aug 2024 11:09:21 -0400 Subject: [PATCH 1/7] Allow hooking job failure for generic error handling Closes #179 Closes #204 --- sisyphus/global_settings.py | 20 ++++++++++++++++++++ sisyphus/manager.py | 15 ++++++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/sisyphus/global_settings.py b/sisyphus/global_settings.py index 1bf2e25..7511c8a 100644 --- a/sisyphus/global_settings.py +++ b/sisyphus/global_settings.py @@ -60,6 +60,26 @@ def worker_wrapper(job, task_name, call): return call +def on_job_failure(job): + """ + Job failure hook. + + Can be used for generic job-independent error monitoring, handling or retry + logic. + + Sispyhus will call this function w/ the job instance if the job enters the + failure state. The callback itself is then responsible for any retry logic, + realized by e.g. analyzing the job log file and removing error files in the job + directory as needed. + + Do: + - use with caution + - ensure you don't build infinite retry loops + - limit to specific use cases (e.g. local disk full, GPU broken, etc.) + """ + pass + + def update_engine_rqmt(last_rqmt: Dict, last_usage: Dict): """Update requirements after a job got interrupted, double limits if needed diff --git a/sisyphus/manager.py b/sisyphus/manager.py index 749daf3..c8a2480 100644 --- a/sisyphus/manager.py +++ b/sisyphus/manager.py @@ -5,6 +5,7 @@ import sys import threading import time +from typing import TYPE_CHECKING, Dict, List import warnings from multiprocessing.pool import ThreadPool @@ -15,6 +16,9 @@ from sisyphus.tools import finished_results_cache import sisyphus.global_settings as gs +if TYPE_CHECKING: + from sisyphus.job import Job + class JobCleaner(threading.Thread): """Thread to scan all jobs and clean if needed""" @@ -572,6 +576,12 @@ def maybe_clear_state(state, always_clear, action): self.job_cleaner.start() return True + def handle_job_failure(self, prev_jobs: Dict[str, List[Job]], cur_jobs: Dict[str, List[Job]]): + prev_jobs = set(prev_jobs.get(gs.STATE_ERROR, [])) + for job in cur_jobs.get(gs.STATE_ERROR, []): + if job not in prev_jobs: + gs.on_job_failure(job) + @tools.default_handle_exception_interrupt_main_thread def run(self): if not self.startup(): @@ -593,7 +603,9 @@ def run(self): self.check_output(write_output=self.link_outputs) config_manager.continue_readers() - self.update_jobs() + + prev_jobs = self.jobs + cur_jobs = self.update_jobs() if gs.CLEAR_ERROR or self.clear_errors_once: self.clear_errors_once = False @@ -619,6 +631,7 @@ def run(self): self.setup_holded_jobs() self.resume_jobs() self.run_jobs() + self.handle_job_failure(prev_jobs, cur_jobs) # Stop config reader config_manager.cancel_all_reader() From e49682191b537055c1b9f9602b58d0168c394d33 Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Tue, 20 Aug 2024 10:16:23 -0400 Subject: [PATCH 2/7] fix confusion due to overwritten variable --- sisyphus/manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sisyphus/manager.py b/sisyphus/manager.py index c8a2480..688956e 100644 --- a/sisyphus/manager.py +++ b/sisyphus/manager.py @@ -577,9 +577,9 @@ def maybe_clear_state(state, always_clear, action): return True def handle_job_failure(self, prev_jobs: Dict[str, List[Job]], cur_jobs: Dict[str, List[Job]]): - prev_jobs = set(prev_jobs.get(gs.STATE_ERROR, [])) + prev_errored_jobs = set(prev_jobs.get(gs.STATE_ERROR, [])) for job in cur_jobs.get(gs.STATE_ERROR, []): - if job not in prev_jobs: + if job not in prev_errored_jobs: gs.on_job_failure(job) @tools.default_handle_exception_interrupt_main_thread From a86c4cc55c04a4fe9efc9f61b527d7c2f38da977 Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Mon, 23 Sep 2024 10:00:07 -0400 Subject: [PATCH 3/7] fix typings --- sisyphus/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sisyphus/manager.py b/sisyphus/manager.py index 688956e..fe10404 100644 --- a/sisyphus/manager.py +++ b/sisyphus/manager.py @@ -576,7 +576,7 @@ def maybe_clear_state(state, always_clear, action): self.job_cleaner.start() return True - def handle_job_failure(self, prev_jobs: Dict[str, List[Job]], cur_jobs: Dict[str, List[Job]]): + def handle_job_failure(self, prev_jobs: Dict[str, List["Job"]], cur_jobs: Dict[str, List["Job"]]): prev_errored_jobs = set(prev_jobs.get(gs.STATE_ERROR, [])) for job in cur_jobs.get(gs.STATE_ERROR, []): if job not in prev_errored_jobs: From 15af67bc2c90d5b7a35fc03487294349f20663de Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Mon, 23 Sep 2024 10:05:11 -0400 Subject: [PATCH 4/7] simplify implementation, make stateless --- sisyphus/manager.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/sisyphus/manager.py b/sisyphus/manager.py index fe10404..f270512 100644 --- a/sisyphus/manager.py +++ b/sisyphus/manager.py @@ -576,12 +576,6 @@ def maybe_clear_state(state, always_clear, action): self.job_cleaner.start() return True - def handle_job_failure(self, prev_jobs: Dict[str, List["Job"]], cur_jobs: Dict[str, List["Job"]]): - prev_errored_jobs = set(prev_jobs.get(gs.STATE_ERROR, [])) - for job in cur_jobs.get(gs.STATE_ERROR, []): - if job not in prev_errored_jobs: - gs.on_job_failure(job) - @tools.default_handle_exception_interrupt_main_thread def run(self): if not self.startup(): @@ -604,8 +598,7 @@ def run(self): config_manager.continue_readers() - prev_jobs = self.jobs - cur_jobs = self.update_jobs() + self.update_jobs() if gs.CLEAR_ERROR or self.clear_errors_once: self.clear_errors_once = False @@ -631,7 +624,9 @@ def run(self): self.setup_holded_jobs() self.resume_jobs() self.run_jobs() - self.handle_job_failure(prev_jobs, cur_jobs) + + for job in self.jobs.get(gs.STATE_ERROR, []): + gs.on_job_failure(job) # Stop config reader config_manager.cancel_all_reader() From 511d695259a420700881549651cec72e13992af5 Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Mon, 23 Sep 2024 10:12:03 -0400 Subject: [PATCH 5/7] better cso --- sisyphus/global_settings.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sisyphus/global_settings.py b/sisyphus/global_settings.py index 7511c8a..8d8173e 100644 --- a/sisyphus/global_settings.py +++ b/sisyphus/global_settings.py @@ -67,10 +67,15 @@ def on_job_failure(job): Can be used for generic job-independent error monitoring, handling or retry logic. - Sispyhus will call this function w/ the job instance if the job enters the - failure state. The callback itself is then responsible for any retry logic, - realized by e.g. analyzing the job log file and removing error files in the job - directory as needed. + Sispyhus will call this function w/ the job instance for any failed job. + + The callback itself is then responsible for any retry logic, realized by e.g. + analyzing the job log file and removing error files in the job directory as + needed. + + The callback needs to be stateless and indempotent, as it can be called multiple + times on the same job, especially if the job remains in the error state after the + callback has finished. Do: - use with caution From 6de4ab0e38a262c199d9ae89e367b98de4c378a4 Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Mon, 23 Sep 2024 10:22:17 -0400 Subject: [PATCH 6/7] remove superfluous imports --- sisyphus/manager.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sisyphus/manager.py b/sisyphus/manager.py index f270512..0857bd2 100644 --- a/sisyphus/manager.py +++ b/sisyphus/manager.py @@ -5,7 +5,6 @@ import sys import threading import time -from typing import TYPE_CHECKING, Dict, List import warnings from multiprocessing.pool import ThreadPool @@ -16,9 +15,6 @@ from sisyphus.tools import finished_results_cache import sisyphus.global_settings as gs -if TYPE_CHECKING: - from sisyphus.job import Job - class JobCleaner(threading.Thread): """Thread to scan all jobs and clean if needed""" From 01a7ca3a1984af87d8b25d3e97835da813d6f92f Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Mon, 23 Sep 2024 16:23:22 +0200 Subject: [PATCH 7/7] Update sisyphus/manager.py Co-authored-by: michelwi --- sisyphus/manager.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sisyphus/manager.py b/sisyphus/manager.py index 0857bd2..d3539bc 100644 --- a/sisyphus/manager.py +++ b/sisyphus/manager.py @@ -593,7 +593,6 @@ def run(self): self.check_output(write_output=self.link_outputs) config_manager.continue_readers() - self.update_jobs() if gs.CLEAR_ERROR or self.clear_errors_once: