Skip to content

Commit

Permalink
wip: use bhist
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Feb 29, 2024
1 parent 6b0af89 commit c26e6b8
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 3 deletions.
92 changes: 90 additions & 2 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import shutil
from pathlib import Path
from typing import (
Any,
Dict,
List,
Literal,
Expand Down Expand Up @@ -74,12 +75,33 @@ def parse_bjobs(bjobs_output: str) -> Dict[str, Dict[str, Dict[str, str]]]:
return {"jobs": data}


def parse_bhist(bhist_output: str) -> Dict[int, Dict[str, int]]:
    """Extract per-job pending and running durations from ``bhist`` output.

    Only lines whose first character is a digit are treated as job rows;
    every other line (headers, blank lines) is skipped. A job row is split on
    whitespace and read positionally: column 0 is the job id, column 3 the
    pending time and column 5 the running time.

    Args:
        bhist_output: Text captured from the standard output of ``bhist``.

    Returns:
        A mapping from job id to a dict with the integer entries
        ``"pending_seconds"`` and ``"running_seconds"``. Rows with fewer than
        six columns are ignored; rows whose job id or durations are not
        integers are skipped after logging a warning.

    Raises:
        AssertionError: If a line starting with "Summary of time" does not
            contain "in seconds", since the numbers could then not be
            interpreted as seconds.
    """
    data: Dict[int, Dict[str, int]] = {}
    for line in bhist_output.splitlines():
        if line.startswith("Summary of time") and "in seconds" not in line:
            # Raised explicitly (rather than via an ``assert`` statement) so
            # that the unit check is not stripped when running with -O.
            raise AssertionError(
                f'bhist summary is not reported in seconds: "{line}"'
            )
        if not line or not line[0].isdigit():
            continue
        # str.split() without a separator never yields empty tokens, so a
        # length check is sufficient before indexing.
        tokens = line.split(maxsplit=7)
        if len(tokens) < 6:
            continue
        try:
            data[int(tokens[0])] = {
                "pending_seconds": int(tokens[3]),
                "running_seconds": int(tokens[5]),
            }
        except ValueError:
            logger.warning(f'bhist parser could not parse "{line}"')
    return data


class LsfDriver(Driver):
def __init__(
self,
queue_name: Optional[str] = None,
bsub_cmd: Optional[str] = None,
bjobs_cmd: Optional[str] = None,
bhist_cmd: Optional[str] = None,
bkill_cmd: Optional[str] = None,
) -> None:
super().__init__()
Expand All @@ -97,6 +119,11 @@ def __init__(

self._poll_period = _POLL_PERIOD

self._bhist_task: Optional[asyncio.Task[Any]] = None
self._bhist_cmd = Path(bhist_cmd or shutil.which("bhist") or "bhist")
self._jobids_needing_bhist: List[int] = []
self._previous_bhist_result: Dict[int, Dict[str, int]]

async def submit(
self,
iens: int,
Expand Down Expand Up @@ -185,6 +212,8 @@ async def kill(self, iens: int) -> None:
return

async def poll(self) -> None:
if self._bhist_task is None:
self._bhist_task = asyncio.create_task(self.poll_by_bhist())
while True:
if not self._jobs.keys():
await asyncio.sleep(self._poll_period)
Expand All @@ -203,7 +232,13 @@ async def poll(self) -> None:
)
continue
stat = _Stat(**parse_bjobs(stdout.decode()))
for job_id, job in stat.jobs.items():

missing_job_ids = set(self._iens2jobid.values()) - set(stat.job.keys())

Check failure on line 236 in src/ert/scheduler/lsf_driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

"_Stat" has no attribute "job"; maybe "jobs"?
bhist_stats = []
if missing_job_ids:
bhist_stats = await self.poll_by_bhist(missing_job_ids)

Check failure on line 239 in src/ert/scheduler/lsf_driver.py

View workflow job for this annotation

GitHub Actions / type-checking (3.12)

Too many arguments for "poll_by_bhist" of "LsfDriver"

for job_id, job in list(stat.jobs.items()) + bhist_stats:
if job_id not in self._jobs:
continue

Expand Down Expand Up @@ -245,5 +280,58 @@ async def poll(self) -> None:
)
await asyncio.sleep(_POLL_PERIOD)

async def poll_by_bhist(self) -> List[Tuple[int, AnyJob]]:
    """Periodically run ``bhist`` for the ids in ``_jobids_needing_bhist``.

    The loop runs until the surrounding task is cancelled. Every round waits
    three poll periods; a round with no job ids to look up only waits.
    Otherwise ``bhist`` is executed with the job ids as arguments, its output
    is parsed with ``parse_bhist`` and compared against the result stored
    from the previous successful round in ``_previous_bhist_result``.

    The branches that classify a job from the change in pending/running
    seconds are still placeholders and have no effect yet.

    NOTE(review): the declared return type is never produced, since the loop
    contains no ``return`` statement; confirm the intended contract.
    """
    while True:
        if not self._jobids_needing_bhist:
            await asyncio.sleep(self._poll_period * 3)
            continue
        logger.debug(
            "bhist is needed to obtain state for "
            f"job ids {self._jobids_needing_bhist}"
        )
        process = await asyncio.create_subprocess_exec(
            self._bhist_cmd,
            *[str(job_id) for job_id in self._jobids_needing_bhist],
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        if process.returncode:
            logger.error(
                f"bhist gave returncode {process.returncode} and error {stderr.decode()}"
            )
            # Back off before retrying; without this a failing bhist would be
            # respawned in a tight loop.
            await asyncio.sleep(self._poll_period * 3)
            continue
        data = parse_bhist(stdout.decode())

        # The attribute may not have been assigned yet on the first round,
        # so read it defensively instead of risking an AttributeError.
        previous = getattr(self, "_previous_bhist_result", None)
        if previous:
            for job_id, job_stat in data.items():
                if job_id not in previous:
                    continue
                earlier = previous[job_id]
                if (
                    job_stat["pending_seconds"] == earlier["pending_seconds"]
                    and job_stat["running_seconds"] == earlier["running_seconds"]
                ):
                    # Job is DONE or EXIT
                    pass
                if job_stat["running_seconds"] > earlier["running_seconds"]:
                    # Job is running
                    pass
                if job_stat["pending_seconds"] > earlier["pending_seconds"]:
                    # Job is pending
                    pass

        # Always let time pass between two samples (also after the very first
        # one); comparing samples taken back-to-back would show no progress.
        self._previous_bhist_result = data
        await asyncio.sleep(self._poll_period * 3)

async def finish(self) -> None:
    """Stop the background ``bhist`` polling task, if one is still running.

    Does nothing when no task was created or when it has already finished.
    Otherwise the task is cancelled and awaited so that it has fully stopped
    when this coroutine returns.

    Raises:
        BaseException: Any error other than ``asyncio.CancelledError`` that
            the task raised while being cancelled is re-raised.
    """
    task = self._bhist_task
    if task is None or task.done():
        return
    task.cancel()
    # Awaiting a cancelled task directly would re-raise CancelledError out of
    # finish(). gather(..., return_exceptions=True) hands the outcome back as
    # a value instead, while still propagating a cancellation that is aimed
    # at finish() itself.
    (outcome,) = await asyncio.gather(task, return_exceptions=True)
    if isinstance(outcome, BaseException) and not isinstance(
        outcome, asyncio.CancelledError
    ):
        raise outcome
32 changes: 31 additions & 1 deletion tests/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from ert.scheduler import Driver, LsfDriver
from ert.scheduler.event import FinishedEvent, StartedEvent
from ert.scheduler.lsf_driver import JobState, parse_bjobs
from ert.scheduler.lsf_driver import JobState, parse_bhist, parse_bjobs

valid_jobstates: Collection[str] = list(get_args(JobState))

Expand Down Expand Up @@ -354,3 +354,33 @@ async def test_faulty_bjobs(monkeypatch, tmp_path, bjobs_script, expectation):
with expectation:
await driver.submit(0, "sleep")
await asyncio.wait_for(poll(driver, {0}), timeout=0.2)


@pytest.mark.parametrize(
    "bhist_output, expected",
    [
        (
            "Summary of time in seconds spent in various states:\n"
            "JOBID USER JOB_NAME PEND PSUSP RUN USUSP SSUSP UNKWN TOTAL\n"
            "1962 user1 *1000000 410650 0 0 0 0 0 410650\n",
            {1962: {"pending_seconds": 410650, "running_seconds": 0}},
        ),
        (
            "Summary of time in seconds\n1 x x 3 x 5",
            {1: {"pending_seconds": 3, "running_seconds": 5}},
        ),
        (
            "1 x x 3 x 5",
            {1: {"pending_seconds": 3, "running_seconds": 5}},
        ),
        (
            "1 x x 3 x 5\n2 x x 4 x 6",
            {
                1: {"pending_seconds": 3, "running_seconds": 5},
                2: {"pending_seconds": 4, "running_seconds": 6},
            },
        ),
        # Edge cases: nothing to parse at all.
        ("", {}),
        (
            "JOBID USER JOB_NAME PEND PSUSP RUN USUSP SSUSP UNKWN TOTAL\n",
            {},
        ),
        # A row with too few columns is ignored.
        ("1 x x 3", {}),
        # A row with a non-numeric duration is skipped; later rows still count.
        (
            "1 x x a x 5\n2 x x 4 x 6",
            {2: {"pending_seconds": 4, "running_seconds": 6}},
        ),
    ],
)
def test_parse_bhist(bhist_output, expected):
    """parse_bhist maps job ids to pending/running seconds, skipping bad rows."""
    # parse_bhist is synchronous, so the test does not need to be a coroutine.
    assert parse_bhist(bhist_output) == expected

0 comments on commit c26e6b8

Please sign in to comment.