Have scheduler LSF driver not bhist jobs newer than POLL_PERIOD
This commit makes the LSF driver not run bhist for jobs submitted after the previous poll. This fixes an issue where we would run bhist for recently submitted jobs, because LSF sometimes takes a couple of milliseconds to process a submission before reporting it in bjobs.
jonathan-eq authored and eivindjahren committed Apr 23, 2024
1 parent 998751b commit 0217bfd
Showing 2 changed files with 65 additions and 10 deletions.
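The fix hinges on one helper, filter_job_ids_on_submission_time, which poll() uses to decide which job ids missing from the bjobs output are old enough to look up with bhist. Below is a minimal, self-contained sketch of that idea; the plain dataclass, the example job ids, and the poll period value are illustrative stand-ins for the driver's Pydantic JobData model and its configured _poll_period, while the filtering predicate mirrors the committed helper.

import time
from dataclasses import dataclass
from typing import Dict, Set


@dataclass
class JobData:
    # Simplified stand-in for the driver's Pydantic JobData model
    iens: int
    submitted_timestamp: float


def filter_job_ids_on_submission_time(
    jobs: Dict[str, JobData], submitted_before: float
) -> Set[str]:
    # Keep only job ids submitted strictly before the given timestamp
    return {
        job_id
        for job_id, job_data in jobs.items()
        if submitted_before > job_data.submitted_timestamp
    }


poll_period = 2.0  # illustrative value, not the driver's actual default
jobs = {
    "101": JobData(iens=0, submitted_timestamp=time.time() - 10.0),  # older job
    "102": JobData(iens=1, submitted_timestamp=time.time()),  # just submitted
}
reported_by_bjobs: Set[str] = set()  # pretend bjobs knows about neither job yet
eligible_for_bhist = filter_job_ids_on_submission_time(
    jobs, submitted_before=time.time() - poll_period
) - reported_by_bjobs
print(eligible_for_bhist)  # {'101'}: the just-submitted job "102" is left alone

Only jobs older than one poll period fall back to bhist; anything newer is assumed to simply not have reached bjobs yet.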
38 changes: 31 additions & 7 deletions src/ert/scheduler/lsf_driver.py
@@ -21,7 +21,6 @@
MutableMapping,
Optional,
Sequence,
Tuple,
Union,
get_args,
)
@@ -86,6 +85,12 @@ class _Stat(BaseModel):
jobs: Mapping[str, AnyJob]


class JobData(BaseModel):
iens: int
job_state: AnyJob
submitted_timestamp: float


def parse_bjobs(bjobs_output: str) -> Dict[str, Dict[str, Dict[str, str]]]:
data: Dict[str, Dict[str, str]] = {}
for line in bjobs_output.splitlines():
@@ -166,6 +171,16 @@ def parse_bhist(bhist_output: str) -> Dict[str, Dict[str, int]]:
return data


def filter_job_ids_on_submission_time(
jobs: MutableMapping[str, JobData], submitted_before: float
) -> set[str]:
return {
job_id
for job_id, job_data in jobs.items()
if submitted_before > job_data.submitted_timestamp
}


class LsfDriver(Driver):
def __init__(
self,
@@ -189,7 +204,7 @@ def __init__(
self._bjobs_cmd = Path(bjobs_cmd or shutil.which("bjobs") or "bjobs")
self._bkill_cmd = Path(bkill_cmd or shutil.which("bkill") or "bkill")

self._jobs: MutableMapping[str, Tuple[int, AnyJob]] = {}
self._jobs: MutableMapping[str, JobData] = {}
self._iens2jobid: MutableMapping[int, str] = {}
self._max_attempt: int = 100
self._sleep_time_between_bkills = 30
@@ -261,7 +276,11 @@ async def submit(
(Path(runpath) / LSF_INFO_JSON_FILENAME).write_text(
json.dumps({"job_id": job_id}), encoding="utf-8"
)
self._jobs[job_id] = (iens, QueuedJob(job_state="PEND"))
self._jobs[job_id] = JobData(
iens=iens,
job_state=QueuedJob(job_state="PEND"),
submitted_timestamp=time.time(),
)
self._iens2jobid[iens] = job_id

async def kill(self, iens: int) -> None:
@@ -319,8 +338,12 @@ async def poll(self) -> None:
)
bjobs_states = _Stat(**parse_bjobs(stdout.decode(errors="ignore")))

if missing_in_bjobs_output := set(current_jobids) - set(
bjobs_states.jobs.keys()
job_ids_found_in_bjobs_output = set(bjobs_states.jobs.keys())
if (
missing_in_bjobs_output := filter_job_ids_on_submission_time(
self._jobs, submitted_before=time.time() - self._poll_period
)
- job_ids_found_in_bjobs_output
):
logger.debug(f"bhist is used for job ids: {missing_in_bjobs_output}")
bhist_states = await self._poll_once_by_bhist(missing_in_bjobs_output)
@@ -346,7 +369,8 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
if job_id not in self._jobs:
return

iens, old_state = self._jobs[job_id]
old_state = self._jobs[job_id].job_state
iens = self._jobs[job_id].iens
if isinstance(new_state, IgnoredJobstates):
logger.debug(
f"Job ID '{job_id}' for {iens=} is of unknown job state '{new_state.job_state}'"
Expand All @@ -356,7 +380,7 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
if _STATE_ORDER[type(new_state)] <= _STATE_ORDER[type(old_state)]:
return

self._jobs[job_id] = (iens, new_state)
self._jobs[job_id].job_state = new_state
event: Optional[Event] = None
if isinstance(new_state, RunningJob):
logger.debug(f"Realization {iens} is running")
37 changes: 34 additions & 3 deletions tests/unit_tests/scheduler/test_lsf_driver.py
@@ -19,12 +19,14 @@
FinishedEvent,
FinishedJobFailure,
FinishedJobSuccess,
JobData,
JobState,
QueuedJob,
RunningJob,
StartedEvent,
_Stat,
build_resource_requirement_string,
filter_job_ids_on_submission_time,
parse_bhist,
parse_bjobs,
)
@@ -77,7 +79,11 @@ async def test_events_produced_from_jobstate_updates(jobstate_sequence: List[str]):

async def mocked_submit(self, iens, *_args, **_kwargs):
"""A mocked submit is speedier than going through a command on disk"""
self._jobs["1"] = (iens, QueuedJob(job_state="PEND"))
self._jobs["1"] = JobData(
iens=iens,
job_state=QueuedJob(job_state="PEND"),
submitted_timestamp=time.time(),
)
self._iens2jobid[iens] = "1"

driver.submit = mocked_submit.__get__(driver)
@@ -95,14 +101,14 @@ async def mocked_submit(self, iens, *_args, **_kwargs):
if not started and not finished_success and not finished_failure:
assert len(events) == 0

iens, state = driver._jobs["1"]
iens, state = driver._jobs["1"].iens, driver._jobs["1"].job_state
assert iens == 0
assert isinstance(state, QueuedJob)
elif started and not finished_success and not finished_failure:
assert len(events) == 1
assert events[0] == StartedEvent(iens=0)

iens, state = driver._jobs["1"]
iens, state = driver._jobs["1"].iens, driver._jobs["1"].job_state
assert iens == 0
assert isinstance(state, RunningJob)
elif started and finished_success and finished_failure:
@@ -727,3 +733,28 @@ async def mock_submit(*args, **kwargs):
assert "LSF kill failed" not in caplog.text
assert "LSF kill failed" not in capsys.readouterr().err
assert "LSF kill failed" not in capsys.readouterr().out


@pytest.mark.parametrize(
"time_submitted_modifier, expected_result",
[
pytest.param(
-1.0,
set(["1"]),
id="job_submitted_before_deadline",
),
pytest.param(0, set(), id="job_submitted_on_deadline"),
pytest.param(1.0, set(), id="job_submitted_after_deadline"),
],
)
def test_filter_job_ids_on_submission_time(time_submitted_modifier, expected_result):
submitted_before = time.time()
job_submitted_timestamp = submitted_before + time_submitted_modifier
jobs = {
"1": JobData(
iens=0,
job_state=QueuedJob(job_state="PEND"),
submitted_timestamp=job_submitted_timestamp,
)
}
assert filter_job_ids_on_submission_time(jobs, submitted_before) == expected_result
