Skip to content

Commit

Permalink
Allow hooking job failure for generic error handling
Browse files Browse the repository at this point in the history
Closes #179
Closes #204
NeoLegends committed Aug 19, 2024
1 parent 4ca86d2 commit 1f08daa
Showing 2 changed files with 34 additions and 1 deletion.
20 changes: 20 additions & 0 deletions sisyphus/global_settings.py
Original file line number Diff line number Diff line change
@@ -60,6 +60,26 @@ def worker_wrapper(job, task_name, call):
return call


def on_job_failure(job):
    """
    Job failure hook.

    Can be used for generic, job-independent error monitoring, handling or
    retry logic.

    Sisyphus will call this function with the job instance if the job enters
    the failure state. The callback itself is then responsible for any retry
    logic, realized by e.g. analyzing the job log file and removing error
    files in the job directory as needed.

    The default implementation does nothing; override it in the settings to
    install custom handling.

    Do:
    - use with caution
    - ensure you don't build infinite retry loops
    - limit to specific use cases (e.g. local disk full, GPU broken, etc.)

    :param job: the job instance that entered the failure state
    """
    pass


def update_engine_rqmt(last_rqmt: Dict, last_usage: Dict):
"""Update requirements after a job got interrupted, double limits if needed
15 changes: 14 additions & 1 deletion sisyphus/manager.py
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
import sys
import threading
import time
from typing import TYPE_CHECKING, Dict, List
import warnings

from multiprocessing.pool import ThreadPool
@@ -15,6 +16,9 @@
from sisyphus.tools import finished_results_cache
import sisyphus.global_settings as gs

if TYPE_CHECKING:
from sisyphus.job import Job


class JobCleaner(threading.Thread):
"""Thread to scan all jobs and clean if needed"""
@@ -572,6 +576,12 @@ def maybe_clear_state(state, always_clear, action):
self.job_cleaner.start()
return True

def handle_job_failure(self, prev_jobs: Dict[str, List["Job"]], cur_jobs: Dict[str, List["Job"]]):
    """
    Invoke the global job failure hook for every job that newly entered the error state.

    A job counts as newly failed when it is listed under ``gs.STATE_ERROR`` in
    ``cur_jobs`` but was not listed under that state in ``prev_jobs``. For each
    such job ``gs.on_job_failure`` is called once with the job instance.

    ``Job`` is written as a string forward reference because it is imported only
    under ``TYPE_CHECKING`` and is therefore not available when the annotations
    are evaluated at runtime.

    :param prev_jobs: mapping from job state to the jobs in that state before the latest update
    :param cur_jobs: mapping from job state to the jobs in that state after the latest update
    """
    # Set membership keeps the "was it already failed?" check cheap per job.
    already_failed = set(prev_jobs.get(gs.STATE_ERROR, []))
    for job in cur_jobs.get(gs.STATE_ERROR, []):
        if job not in already_failed:
            gs.on_job_failure(job)

@tools.default_handle_exception_interrupt_main_thread
def run(self):
if not self.startup():
@@ -593,7 +603,9 @@ def run(self):
self.check_output(write_output=self.link_outputs)

config_manager.continue_readers()
self.update_jobs()

prev_jobs = self.jobs
cur_jobs = self.update_jobs()

if gs.CLEAR_ERROR or self.clear_errors_once:
self.clear_errors_once = False
@@ -619,6 +631,7 @@ def run(self):
self.setup_holded_jobs()
self.resume_jobs()
self.run_jobs()
self.handle_job_failure(prev_jobs, cur_jobs)

# Stop config reader
config_manager.cancel_all_reader()

0 comments on commit 1f08daa

Please sign in to comment.