-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow hooking job failure for generic error handling #205
Changes from 1 commit
1f08daa
e496821
a86c4cc
15af67b
511d695
6de4ab0
01a7ca3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
import sys | ||
import threading | ||
import time | ||
from typing import TYPE_CHECKING, Dict, List | ||
NeoLegends marked this conversation as resolved.
Show resolved
Hide resolved
|
||
import warnings | ||
|
||
from multiprocessing.pool import ThreadPool | ||
|
@@ -15,6 +16,9 @@ | |
from sisyphus.tools import finished_results_cache | ||
import sisyphus.global_settings as gs | ||
|
||
if TYPE_CHECKING: | ||
from sisyphus.job import Job | ||
|
||
NeoLegends marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
class JobCleaner(threading.Thread): | ||
"""Thread to scan all jobs and clean if needed""" | ||
|
@@ -572,6 +576,12 @@ def maybe_clear_state(state, always_clear, action): | |
self.job_cleaner.start() | ||
return True | ||
|
||
def handle_job_failure(self, prev_jobs: Dict[str, List[Job]], cur_jobs: Dict[str, List[Job]]): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a short doc string here as well. |
||
prev_jobs = set(prev_jobs.get(gs.STATE_ERROR, [])) | ||
for job in cur_jobs.get(gs.STATE_ERROR, []): | ||
if job not in prev_jobs: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible that this line should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think overwriting that variable is just confusing and not what I intended. I've since changed the var names, so it should be clearer now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I missed that, but yes it's better to use a different name. |
||
gs.on_job_failure(job) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would you call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not needed, since this will loop around immediately when the processing is done. |
||
@tools.default_handle_exception_interrupt_main_thread | ||
def run(self): | ||
if not self.startup(): | ||
|
@@ -593,7 +603,9 @@ def run(self): | |
self.check_output(write_output=self.link_outputs) | ||
|
||
config_manager.continue_readers() | ||
self.update_jobs() | ||
|
||
NeoLegends marked this conversation as resolved.
Show resolved
Hide resolved
|
||
prev_jobs = self.jobs | ||
cur_jobs = self.update_jobs() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How will this interact with the block directly below. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is ok, the loop cycles around if jobs were cleared before any of the logic is run. |
||
|
||
if gs.CLEAR_ERROR or self.clear_errors_once: | ||
self.clear_errors_once = False | ||
|
@@ -619,6 +631,7 @@ def run(self): | |
self.setup_holded_jobs() | ||
self.resume_jobs() | ||
self.run_jobs() | ||
self.handle_job_failure(prev_jobs, cur_jobs) | ||
|
||
# Stop config reader | ||
config_manager.cancel_all_reader() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is called "error" everywhere else. cf. also "interrupted_resumable" and "interrupted_non_resumable" which I would also name failures but they are not handled here.
Maybe the function name should be renamed as well.