Make fallback mechanism for failing bjobs (bhist)
Whenever jobs for which we need running states are missing from the bjobs
output, it can mean that the information has fallen out of the bjobs cache
on the LSF server. One scenario where this can occur is if Ert for some
reason hangs for a long time without polling; it may also happen for other
reasons on the LSF side.

Any time bjobs misses information, a single call to `bhist` is made and its
output is stored on the driver object. The first time bhist is called, the
driver is not able to determine any states, but if the next bjobs call also
misses a certain job id, the subsequent bhist call can deduce the running
state by comparing the timing values from the first and the second bhist
call.

This method is not able to detect a job that has failed; such a job will be
marked as done.
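
A minimal sketch of the deduction described above, assuming two consecutive
snapshots shaped like the output of `parse_bhist` (the job id and timing
numbers are invented for illustration):

```python
# Two consecutive bhist snapshots for the same job id; numbers are made up.
first = {"1001": {"pending_seconds": 10, "running_seconds": 5}}
second = {"1001": {"pending_seconds": 10, "running_seconds": 9}}

for job_id, stats in second.items():
    previous = first.get(job_id)
    if previous is None:
        # No baseline yet; wait for the next poll before deciding anything.
        continue
    if (
        stats["pending_seconds"] == previous["pending_seconds"]
        and stats["running_seconds"] == previous["running_seconds"]
    ):
        state = "DONE"  # no time accumulated since last poll; could also be EXIT
    elif stats["running_seconds"] > previous["running_seconds"]:
        state = "RUN"
    elif stats["pending_seconds"] > previous["pending_seconds"]:
        state = "PEND"
    else:
        continue  # inconclusive
    print(job_id, state)  # -> 1001 RUN
```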
berland committed Apr 2, 2024
1 parent 3bc1f30 commit 1c1badd
Showing 8 changed files with 376 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/ert/scheduler/__init__.py
@@ -38,6 +38,7 @@ def create_driver(config: QueueConfig) -> Driver:
bsub_cmd=queue_config.get("BSUB_CMD"),
bkill_cmd=queue_config.get("BKILL_CMD"),
bjobs_cmd=queue_config.get("BJOBS_CMD"),
bhist_cmd=queue_config.get("BHIST_CMD"),
exclude_hosts=queue_config.get("EXCLUDE_HOST", "").split(","),
queue_name=queue_config.get("LSF_QUEUE"),
resource_requirement=queue_config.get("LSF_RESOURCE"),
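
The new `BHIST_CMD` queue option is wired through to the driver the same way
as the existing `BJOBS_CMD` and `BKILL_CMD` options. A minimal sketch of
exercising the new keyword directly (the path below is an assumption, not
part of this diff):

```python
from ert.scheduler.lsf_driver import LsfDriver

# Hypothetical explicit path; when bhist_cmd is not given, the driver
# falls back to shutil.which("bhist") or plain "bhist".
driver = LsfDriver(bhist_cmd="/opt/lsf/bin/bhist")
```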
117 changes: 110 additions & 7 deletions src/ert/scheduler/lsf_driver.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import itertools
import json
import logging
import re
@@ -9,9 +10,11 @@
import stat
import subprocess
import tempfile
import time
from pathlib import Path
from typing import (
Dict,
Iterable,
List,
Literal,
Mapping,
@@ -129,6 +132,36 @@ def parse_resource_requirement_string(
return " ".join(resource_requirements)


def parse_bhist(bhist_output: str) -> Dict[str, Dict[str, int]]:
data: Dict[str, Dict[str, int]] = {}
for line in bhist_output.splitlines():
if line.startswith("Summary of time"):
assert "in seconds" in line
if not line or not line[0].isdigit():
continue
tokens = line.split()
try:
# The bhist output has data in 10 columns in fixed positions,
# with spaces possible in field 3. Since `split()` is used
# to parse the output, we branch on the number of tokens found.
if len(tokens) > 10:
data[tokens[0]] = {
"pending_seconds": int(tokens[-7]),
"running_seconds": int(tokens[-5]),
}
elif len(tokens) >= 6 and tokens[0] and tokens[3] and tokens[5]:
data[tokens[0]] = {
"pending_seconds": int(tokens[3]),
"running_seconds": int(tokens[5]),
}
else:
logger.warning(f'bhist parser could not parse "{line}"')
except ValueError as err:
logger.warning(f'bhist parser could not parse "{line}", "{err}"')
continue
return data


class LsfDriver(Driver):
def __init__(
self,
@@ -138,6 +171,7 @@ def __init__(
bsub_cmd: Optional[str] = None,
bjobs_cmd: Optional[str] = None,
bkill_cmd: Optional[str] = None,
bhist_cmd: Optional[str] = None,
) -> None:
super().__init__()

@@ -158,6 +192,11 @@ def __init__(

self._poll_period = _POLL_PERIOD

self._bhist_cmd = Path(bhist_cmd or shutil.which("bhist") or "bhist")
self._bhist_cache: Optional[Dict[str, Dict[str, int]]] = None
self._bhist_required_cache_age: float = 4
self._bhist_cache_timestamp: float = time.time()

async def submit(
self,
iens: int,
@@ -289,15 +328,30 @@ async def poll(self) -> None:
logger.warning(
f"bjobs gave returncode {process.returncode} and error {stderr.decode()}"
)
stat = _Stat(**parse_bjobs(stdout.decode(errors="ignore")))
for job_id, job in stat.jobs.items():
bjobs_states = _Stat(**parse_bjobs(stdout.decode(errors="ignore")))

if missing_in_bjobs_output := set(self._jobs) - set(
bjobs_states.jobs.keys()
):
logger.debug(f"bhist is used for job ids: {missing_in_bjobs_output}")
bhist_states = await self._poll_once_by_bhist(missing_in_bjobs_output)
missing_in_bhist_and_bjobs = missing_in_bjobs_output - set(
bhist_states.jobs.keys()
)
else:
bhist_states = _Stat(**{"jobs": {}})
missing_in_bhist_and_bjobs = set()

for job_id, job in itertools.chain(
bjobs_states.jobs.items(), bhist_states.jobs.items()
):
await self._process_job_update(job_id, job)
missing_in_bjobs_output = set(self._jobs) - set(stat.jobs.keys())
if missing_in_bjobs_output:
logger.warning(
f"bjobs did not give status for job_ids {missing_in_bjobs_output}"

if missing_in_bhist_and_bjobs and self._bhist_cache is not None:
logger.debug(
f"bhist did not give status for job_ids {missing_in_bhist_and_bjobs}, giving up for now."
)
await asyncio.sleep(_POLL_PERIOD)
await asyncio.sleep(self._poll_period)

async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
if job_id not in self._jobs:
@@ -336,5 +390,54 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
del self._iens2jobid[iens]
await self.event_queue.put(event)

async def _poll_once_by_bhist(self, missing_job_ids: Iterable[str]) -> _Stat:
if time.time() - self._bhist_cache_timestamp < self._bhist_required_cache_age:
return _Stat(**{"jobs": {}})

process = await asyncio.create_subprocess_exec(
self._bhist_cmd,
*[str(job_id) for job_id in missing_job_ids],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode:
logger.error(
f"bhist gave returncode {process.returncode} and error {stderr.decode()}"
)
return _Stat(**{"jobs": {}})

data: Dict[str, Dict[str, int]] = parse_bhist(stdout.decode())

if not self._bhist_cache:
# Bootstrapping: we cannot report any state until the next bhist call
self._bhist_cache = data
return _Stat(**{"jobs": {}})

jobs = {}
for job_id, job_stat in data.items():
if job_id not in self._bhist_cache:
continue
if (
job_stat["pending_seconds"]
== self._bhist_cache[job_id]["pending_seconds"]
and job_stat["running_seconds"]
== self._bhist_cache[job_id]["running_seconds"]
):
jobs[job_id] = {"job_state": "DONE"} # or EXIT, we can't tell
elif (
job_stat["running_seconds"]
> self._bhist_cache[job_id]["running_seconds"]
):
jobs[job_id] = {"job_state": "RUN"}
elif (
job_stat["pending_seconds"]
> self._bhist_cache[job_id]["pending_seconds"]
):
jobs[job_id] = {"job_state": "PEND"}
self._bhist_cache = data
self._bhist_cache_timestamp = time.time()
return _Stat(**{"jobs": jobs})

async def finish(self) -> None:
pass
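
As a reading aid for the token-count branching in `parse_bhist` above, a
hedged example of input and expected output; the job id and numbers are
invented, and the columns follow the header used by the mocked `bhist` below:

```python
from ert.scheduler.lsf_driver import parse_bhist

# Illustrative only: a single-word job name, so the data line splits into
# exactly 10 tokens and the simple positional branch (tokens[3]/tokens[5])
# is used; job names containing spaces trigger the negative-index branch.
sample = (
    "Summary of time in seconds spent in various states:\n"
    "JOBID   USER    JOB_NAME  PEND    PSUSP   RUN     USUSP   SSUSP   UNKWN   TOTAL\n"
    "1001    dummyu  sleeper   12      0       34      0       0       0       46\n"
)
assert parse_bhist(sample) == {
    "1001": {"pending_seconds": 12, "running_seconds": 34}
}
```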
2 changes: 2 additions & 0 deletions tests/integration_tests/scheduler/bin/bhist
@@ -0,0 +1,2 @@
#!/usr/bin/env bash
exec "${PYTHON:-$(which python3)}" "$(dirname $0)/bhist.py" "$@"
96 changes: 96 additions & 0 deletions tests/integration_tests/scheduler/bin/bhist.py
@@ -0,0 +1,96 @@
import argparse
import os
import time
from pathlib import Path
from typing import List, Optional

from pydantic import BaseModel


class Job(BaseModel):
job_id: str
user: str = "username"
job_name: str = "d u m m y" # can be up to 4094 chars
pend: int = 0
psusp: int = 0
run: int = 0
ususp: int = 0
ssusp: int = 0
unkwn: int = 0
total: int = 0


class SQueueOutput(BaseModel):
jobs: List[Job]


def get_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Mocked LSF bhist command reading state from filesystem"
)
parser.add_argument("jobs", type=str, nargs="*")
return parser


def bhist_formatter(jobstats: List[Job]) -> str:
string = "Summary of time in seconds spent in various states:\n"
string += "JOBID USER JOB_NAME PEND PSUSP RUN USUSP SSUSP UNKWN TOTAL\n"
for job in jobstats:
string += (
f"{str(job.job_id):7.7s} {job.user:7.7s} "
f"{job.job_name:9.9s} {str(job.pend):7.7s} "
f"{str(job.psusp):7.7s} {str(job.run):7.7s} "
f"{str(job.ususp):7.7s} {str(job.ssusp):7.7s} "
f"{str(job.unkwn):7.7s} {str(job.total):7.7s}\n"
)
return string


def read(path: Path, default: Optional[str] = None) -> Optional[str]:
return path.read_text().strip() if path.exists() else default


def main() -> None:
args = get_parser().parse_args()

jobs_path = Path(os.getenv("PYTEST_TMP_PATH", ".")) / "mock_jobs"

jobs_output: List[Job] = []
for job in args.jobs:
job_name: str = read(jobs_path / f"{job}.name") or "_"
assert job_name is not None

submit_time_millis: int = int(
os.path.getctime(jobs_path / f"{job}.name") * 1000
)
pending_time_millis = int(read(jobs_path / "pendingtimemillis") or 0)
run_start_time_millis: int = submit_time_millis + pending_time_millis
end_time_millis: int = int(time.time() * 1000)
if (jobs_path / f"{job}.returncode").exists():
print("bhist says job is done")
end_time_millis = int(
os.path.getctime(jobs_path / f"{job}.returncode") * 1000
)
print(f"run: {end_time_millis - run_start_time_millis}")
pend: int = min(
run_start_time_millis - submit_time_millis,
int(time.time() * 1000) - submit_time_millis,
)

jobs_output.append(
Job(
**{
"job_id": job,
"user": "dummyuser",
"job_name": job_name,
"pend": pend,
"run": max(0, end_time_millis - run_start_time_millis),
"total": end_time_millis - submit_time_millis,
}
)
)
print(bhist_formatter(jobs_output))


if __name__ == "__main__":
main()
27 changes: 27 additions & 0 deletions tests/integration_tests/scheduler/test_lsf_driver.py
@@ -1,6 +1,8 @@
import asyncio
import json
import os
import stat
from pathlib import Path

import pytest

@@ -20,6 +22,22 @@ def mock_lsf(pytestconfig, monkeypatch, tmp_path):
mock_bin(monkeypatch, tmp_path)


@pytest.fixture
def not_found_bjobs(monkeypatch, tmp_path):
"""This creates a bjobs command that will always claim a job
does not exist, mimicking a job that has 'fallen out of the bjobs cache'."""
os.chdir(tmp_path)
bin_path = tmp_path / "bin"
bin_path.mkdir()
monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}")
bjobs_path = bin_path / "bjobs"
bjobs_path.write_text(
"#!/bin/sh\n" 'echo "Job <$1> is not found"',
encoding="utf-8",
)
bjobs_path.chmod(bjobs_path.stat().st_mode | stat.S_IEXEC)


@pytest.mark.parametrize("explicit_runpath", [(True), (False)])
async def test_lsf_info_file_in_runpath(explicit_runpath, tmp_path):
os.chdir(tmp_path)
@@ -109,3 +127,12 @@ async def test_submit_with_resource_requirement(tmp_path):
await poll(driver, {0})

assert (tmp_path / "test").read_text(encoding="utf-8") == "test\n"


async def test_polling_bhist_fallback(not_found_bjobs):
driver = LsfDriver()
Path("mock_jobs").mkdir()
Path("mock_jobs/pendingtimemillis").write_text("100")
driver._poll_period = 0.01
await driver.submit(0, "sh", "-c", "sleep 1")
await poll(driver, {0})