Skip to content

Commit

Permalink
Allow hooking job failure for generic error handling
Browse files Browse the repository at this point in the history
Closes #179
Closes #204
  • Loading branch information
NeoLegends committed Aug 19, 2024
1 parent 4ca86d2 commit ebc03d7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
25 changes: 24 additions & 1 deletion sisyphus/global_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@

import logging
import sys
from typing import Dict
from typing import TYPE_CHECKING, Dict

import sisyphus.hash
from sisyphus.global_constants import *

if TYPE_CHECKING:
from sisyphus.job import Job


def engine():
"""
Expand Down Expand Up @@ -60,6 +63,26 @@ def worker_wrapper(job, task_name, call):
return call


def on_job_failure(job: "Job"):
    """
    Job failure hook.

    Can be used for generic job-independent error monitoring, handling or retry
    logic.

    Sisyphus will call this function w/ the job instance if the job enters the
    failure state. The callback itself is then responsible for any retry logic,
    realized by e.g. analyzing the job log file and removing error files in the
    job directory as needed.

    Do:
      - use with caution
      - ensure you don't build infinite retry loops
      - limit to specific use cases (e.g. local disk full, GPU broken, etc.)

    :param job: the job instance that just entered the failure state

    .. note::
        The annotation is quoted because ``Job`` is only imported under
        ``TYPE_CHECKING``; an unquoted annotation would raise ``NameError``
        at import time.
    """


def update_engine_rqmt(last_rqmt: Dict, last_usage: Dict):
"""Update requirements after a job got interrupted, double limits if needed
Expand Down
22 changes: 19 additions & 3 deletions sisyphus/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
import threading
import time
from typing import TYPE_CHECKING, Dict, List
import warnings

from multiprocessing.pool import ThreadPool
Expand All @@ -15,6 +16,9 @@
from sisyphus.tools import finished_results_cache
import sisyphus.global_settings as gs

if TYPE_CHECKING:
from sisyphus.job import Job


class JobCleaner(threading.Thread):
"""Thread to scan all jobs and clean if needed"""
Expand Down Expand Up @@ -221,6 +225,10 @@ def __init__(

if gs.SHOW_JOB_TARGETS:
self.sis_graph.set_job_targets(job_engine)

self.jobs = None
self.prev_jobs = None

self.update_jobs()

# Disable parallel mode for now, seems buggy
Expand All @@ -238,8 +246,8 @@ def stop(self):

def update_jobs(self, skip_finished=True):
"""Return all jobs needed to finish output"""
self.jobs = jobs = self.sis_graph.get_jobs_by_status(engine=self.job_engine, skip_finished=skip_finished)
return jobs
self.jobs = new_jobs = self.sis_graph.get_jobs_by_status(engine=self.job_engine, skip_finished=skip_finished)
return new_jobs

def clear_states(self, state=gs.STATE_ERROR):
# List errors/ interrupts
Expand Down Expand Up @@ -572,6 +580,11 @@ def maybe_clear_state(state, always_clear, action):
self.job_cleaner.start()
return True

def handle_job_failure(self, prev_jobs: Dict[str, List["Job"]], cur_jobs: Dict[str, List["Job"]]):
    """
    Invoke the ``gs.on_job_failure`` hook for every job that newly entered
    the error state since the previous scheduling round.

    Comparing against the previous round ensures the hook fires only once
    per error-state transition, not on every manager loop iteration.

    :param prev_jobs: jobs grouped by state from the previous ``update_jobs`` call
    :param cur_jobs: jobs grouped by state from the current ``update_jobs`` call

    .. note::
        ``Job`` is quoted in the annotations because it is only imported
        under ``TYPE_CHECKING``; an unquoted name would raise ``NameError``
        when the class body is evaluated.
    """
    # Hoist the previous-error lookup out of the loop; gs.STATE_ERROR keys
    # may be missing entirely, hence the .get() with an empty default.
    previously_failed = prev_jobs.get(gs.STATE_ERROR, [])
    for job in cur_jobs.get(gs.STATE_ERROR, []):
        if job not in previously_failed:
            gs.on_job_failure(job)

@tools.default_handle_exception_interrupt_main_thread
def run(self):
if not self.startup():
Expand All @@ -593,7 +606,9 @@ def run(self):
self.check_output(write_output=self.link_outputs)

config_manager.continue_readers()
self.update_jobs()

prev_jobs = self.jobs
cur_jobs = self.update_jobs()

if gs.CLEAR_ERROR or self.clear_errors_once:
self.clear_errors_once = False
Expand All @@ -619,6 +634,7 @@ def run(self):
self.setup_holded_jobs()
self.resume_jobs()
self.run_jobs()
self.handle_job_failure(prev_jobs, cur_jobs)

# Stop config reader
config_manager.cancel_all_reader()
Expand Down

0 comments on commit ebc03d7

Please sign in to comment.