Fix to deal with PBS jobs with no records #345

Merged: 4 commits, May 15, 2023
2 changes: 2 additions & 0 deletions balsam/platform/scheduler/__init__.py
@@ -3,6 +3,7 @@
 from .lsf_sched import LsfScheduler
 from .pbs_sched import PBSScheduler
 from .scheduler import (
+    DelayedSubmitFail,
     SchedulerDeleteError,
     SchedulerError,
     SchedulerInterface,
@@ -22,4 +23,5 @@
     "SchedulerSubmitError",
     "SchedulerDeleteError",
     "SchedulerNonZeroReturnCode",
+    "DelayedSubmitFail",
 ]
10 changes: 9 additions & 1 deletion balsam/platform/scheduler/pbs_sched.py
@@ -11,9 +11,11 @@
 from balsam.util import parse_to_utc

 from .scheduler import (
+    DelayedSubmitFail,
     SchedulerBackfillWindow,
     SchedulerJobLog,
     SchedulerJobStatus,
+    SchedulerNonZeroReturnCode,
     SubprocessSchedulerInterface,
     scheduler_subproc,
 )
@@ -320,7 +322,13 @@ def _parse_logs(scheduler_id: int, job_script_path: Optional[PathLike]) -> SchedulerJobLog:
         args += ["-x", "-f", "-F", "json"]
         args += [str(scheduler_id)]
         logger.info(f"_parse_logs issuing qstat: {str(args)}")
-        stdout = scheduler_subproc(args)
+        try:
+            stdout = scheduler_subproc(args)
+        except SchedulerNonZeroReturnCode as e:
+            if "Unknown Job Id" in str(e):
+                logger.warning(f"Batch Job {scheduler_id} not found in PBS")
+                raise DelayedSubmitFail
+            return SchedulerJobLog()
         json_output = json.loads(stdout)
         # logger.info(f"_parse_logs json_output: {json_output}")
         if len(json_output["Jobs"]) == 0:
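The error-translation pattern introduced in _parse_logs above can be illustrated in isolation. The following is a minimal standalone sketch rather than the PR's code: NonZeroReturnCode and DelayedSubmitFail are stand-ins for Balsam's SchedulerNonZeroReturnCode and the new DelayedSubmitFail, and fake_qstat replaces the real scheduler_subproc qstat call.

import logging

logger = logging.getLogger(__name__)


class NonZeroReturnCode(Exception):
    """Stand-in for SchedulerNonZeroReturnCode."""


class DelayedSubmitFail(Exception):
    """Stand-in for the new exception: PBS has no record of the job."""


def fake_qstat(job_id: int) -> str:
    # Stand-in for scheduler_subproc(["qstat", "-x", "-f", "-F", "json", str(job_id)]).
    raise NonZeroReturnCode(f"qstat: Unknown Job Id {job_id}")


def parse_logs(job_id: int) -> dict:
    try:
        stdout = fake_qstat(job_id)
    except NonZeroReturnCode as exc:
        if "Unknown Job Id" in str(exc):
            # No record of the job at all: surface this as a delayed submit failure.
            logger.warning("Batch Job %s not found in PBS", job_id)
            raise DelayedSubmitFail from exc
        # Any other qstat failure: return an empty log instead of crashing the caller.
        return {}
    return {"stdout": stdout}


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    try:
        parse_logs(12345)
    except DelayedSubmitFail:
        print("no PBS record; the service will mark this batch job submit_failed")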
4 changes: 4 additions & 0 deletions balsam/platform/scheduler/scheduler.py
@@ -17,6 +17,10 @@ class SchedulerNonZeroReturnCode(SchedulerError):
     pass


+class DelayedSubmitFail(SchedulerError):
+    pass
+
+
 class SchedulerSubmitError(SchedulerError):
     pass

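Because DelayedSubmitFail subclasses SchedulerError, existing handlers that catch SchedulerError still see it, while new code can catch it specifically. A tiny illustration with minimal stand-in classes (not Balsam's actual module):

class SchedulerError(Exception):
    pass


class DelayedSubmitFail(SchedulerError):
    pass


def handle() -> str:
    try:
        raise DelayedSubmitFail
    except DelayedSubmitFail:
        # New, specific handling: the caller can mark the job submit_failed.
        return "specific handler"
    except SchedulerError:
        # Pre-existing broad handlers continue to work for other scheduler errors.
        return "generic handler"


print(handle())  # specific handler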
21 changes: 14 additions & 7 deletions balsam/site/service/scheduler.py
Expand Up @@ -7,6 +7,7 @@
from typing import TYPE_CHECKING, Dict, List, Type

from balsam.platform.scheduler import (
DelayedSubmitFail,
SchedulerDeleteError,
SchedulerError,
SchedulerNonZeroReturnCode,
@@ -154,13 +155,19 @@ def run_cycle(self) -> None:
                 job.state = BatchJobState.finished
                 assert job.scheduler_id is not None
                 assert job.status_info is not None
-                job_log = self.scheduler.parse_logs(job.scheduler_id, job.status_info.get("submit_script", None))
-                start_time = job_log.start_time
-                end_time = job_log.end_time
-                if start_time:
-                    job.start_time = start_time
-                if end_time:
-                    job.end_time = end_time
+                try:
+                    job_log = self.scheduler.parse_logs(job.scheduler_id, job.status_info.get("submit_script", None))
+
+                    start_time = job_log.start_time
+                    end_time = job_log.end_time
+                    if start_time:
+                        job.start_time = start_time
+                    if end_time:
+                        job.end_time = end_time
+
+                except DelayedSubmitFail:
+                    job.state = BatchJobState.submit_failed
+
             elif job.state != scheduler_jobs[job.scheduler_id].state:
                 job.state = scheduler_jobs[job.scheduler_id].state
                 logger.info(f"Job {job.id} (sched_id {job.scheduler_id}) advanced to state {job.state}")
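The service-side handling in run_cycle can be sketched in the same standalone way. In the snippet below, BatchJob and parse_logs are simplified hypothetical stand-ins for Balsam's BatchJob model and Scheduler.parse_logs; only the new state transition on DelayedSubmitFail is modeled.

from dataclasses import dataclass
from typing import Optional


class DelayedSubmitFail(Exception):
    """Stand-in for balsam.platform.scheduler.DelayedSubmitFail."""


@dataclass
class BatchJob:
    scheduler_id: int
    state: str = "finished"
    start_time: Optional[str] = None
    end_time: Optional[str] = None


def parse_logs(scheduler_id: int) -> dict:
    # Stand-in for self.scheduler.parse_logs(); pretend PBS has lost this job's records.
    raise DelayedSubmitFail


def finalize(job: BatchJob) -> None:
    try:
        job_log = parse_logs(job.scheduler_id)
        # With a real log, the start and end times would be copied onto the job here.
        job.start_time = job_log.get("start_time")
        job.end_time = job_log.get("end_time")
    except DelayedSubmitFail:
        # No PBS record for the job: record a failed submission instead of finished.
        job.state = "submit_failed"


if __name__ == "__main__":
    job = BatchJob(scheduler_id=12345)
    finalize(job)
    print(job.state)  # submit_failed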