From ebc03d70e1fb58609e7a30886def570e32576c86 Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Mon, 19 Aug 2024 11:09:21 -0400 Subject: [PATCH] Allow hooking job failure for generic error handling Closes #179 Closes #204 --- sisyphus/global_settings.py | 25 ++++++++++++++++++++++++- sisyphus/manager.py | 22 +++++++++++++++++++--- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/sisyphus/global_settings.py b/sisyphus/global_settings.py index 1bf2e25..7f74b10 100644 --- a/sisyphus/global_settings.py +++ b/sisyphus/global_settings.py @@ -6,11 +6,14 @@ import logging import sys -from typing import Dict +from typing import TYPE_CHECKING, Dict import sisyphus.hash from sisyphus.global_constants import * +if TYPE_CHECKING: + from sisyphus.job import Job + def engine(): """ @@ -60,6 +63,26 @@ def worker_wrapper(job, task_name, call): return call +def on_job_failure(job: Job): + """ + Job failure hook. + + Can be used for generic job-independent error monitoring, handling or retry + logic. + + Sisyphus will call this function with the job instance if the job enters the + failure state. The callback itself is then responsible for any retry logic, + realized by e.g. analyzing the job log file and removing error files in the job + directory as needed. + + Do: + - use with caution + - ensure you don't build infinite retry loops + - limit to specific use cases (e.g. local disk full, GPU broken, etc.) 
+ """ + pass + + def update_engine_rqmt(last_rqmt: Dict, last_usage: Dict): """Update requirements after a job got interrupted, double limits if needed diff --git a/sisyphus/manager.py b/sisyphus/manager.py index 749daf3..36892b8 100644 --- a/sisyphus/manager.py +++ b/sisyphus/manager.py @@ -5,6 +5,7 @@ import sys import threading import time +from typing import TYPE_CHECKING, Dict, List import warnings from multiprocessing.pool import ThreadPool @@ -15,6 +16,9 @@ from sisyphus.tools import finished_results_cache import sisyphus.global_settings as gs +if TYPE_CHECKING: + from sisyphus.job import Job + class JobCleaner(threading.Thread): """Thread to scan all jobs and clean if needed""" @@ -221,6 +225,10 @@ def __init__( if gs.SHOW_JOB_TARGETS: self.sis_graph.set_job_targets(job_engine) + + self.jobs = None + self.prev_jobs = None + self.update_jobs() # Disable parallel mode for now, seems buggy @@ -238,8 +246,8 @@ def stop(self): def update_jobs(self, skip_finished=True): """Return all jobs needed to finish output""" - self.jobs = jobs = self.sis_graph.get_jobs_by_status(engine=self.job_engine, skip_finished=skip_finished) - return jobs + self.jobs = new_jobs = self.sis_graph.get_jobs_by_status(engine=self.job_engine, skip_finished=skip_finished) + return new_jobs def clear_states(self, state=gs.STATE_ERROR): # List errors/ interrupts @@ -572,6 +580,11 @@ def maybe_clear_state(state, always_clear, action): self.job_cleaner.start() return True + def handle_job_failure(self, prev_jobs: Dict[str, List[Job]], cur_jobs: Dict[str, List[Job]]): + for job in cur_jobs.get(gs.STATE_ERROR, []): + if not job in prev_jobs.get(gs.STATE_ERROR, []): + gs.on_job_failure(job) + @tools.default_handle_exception_interrupt_main_thread def run(self): if not self.startup(): @@ -593,7 +606,9 @@ def run(self): self.check_output(write_output=self.link_outputs) config_manager.continue_readers() - self.update_jobs() + + prev_jobs = self.jobs + cur_jobs = self.update_jobs() if gs.CLEAR_ERROR or 
self.clear_errors_once: self.clear_errors_once = False @@ -619,6 +634,7 @@ def run(self): self.setup_holded_jobs() self.resume_jobs() self.run_jobs() + self.handle_job_failure(prev_jobs, cur_jobs) # Stop config reader config_manager.cancel_all_reader()