Skip to content

Commit

Permalink
Add minimal Python LSF driver
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Feb 13, 2024
1 parent 55a5e12 commit 1d527d9
Show file tree
Hide file tree
Showing 15 changed files with 505 additions and 7 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ markers = [
"integration_test",
"quick_only",
"requires_eclipse",
"requires_lsf",
"requires_window_manager",
"scheduler",
"script",
Expand Down
3 changes: 2 additions & 1 deletion src/ert/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ def run_cli(args: Namespace, _: Any = None) -> None:
) and ert_config.queue_config.queue_system not in [
QueueSystem.LOCAL,
QueueSystem.TORQUE,
QueueSystem.LSF,
]:
raise ErtCliError(
"Scheduler only supports LOCAL and TORQUE queue at the moment!"
"Scheduler only supports LOCAL, TORQUE and LSF queues at the moment!"
)
local_storage_set_ert_config(ert_config)

Expand Down
12 changes: 11 additions & 1 deletion src/ert/scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from ert.config.parsing.queue_system import QueueSystem
from ert.scheduler.driver import Driver
from ert.scheduler.local_driver import LocalDriver
from ert.scheduler.lsf_driver import LsfDriver
from ert.scheduler.openpbs_driver import OpenPBSDriver
from ert.scheduler.scheduler import Scheduler

Expand All @@ -22,8 +23,17 @@ def create_driver(config: QueueConfig) -> Driver:
queue_name = val

return OpenPBSDriver(queue_name=queue_name)
elif config.queue_system == QueueSystem.LSF:
queue_config = {
key: value for key, value in config.queue_options.get(QueueSystem.LSF, [])
}
return LsfDriver(
bsub_cmd=queue_config.get("BSUB_CMD"),
bkill_cmd=queue_config.get("BJOBS_CMD"),
bjobs_cmd=queue_config.get("BJOBS_CMD"),
)
else:
raise NotImplementedError("Only LOCAL and TORQUE drivers are implemented")
raise NotImplementedError("Only LOCAL, TORQUE and LSF drivers are implemented")


__all__ = [
Expand Down
5 changes: 3 additions & 2 deletions src/ert/scheduler/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

import asyncio
from abc import ABC, abstractmethod
from typing import Optional
from typing import Dict, Optional

from ert.scheduler.event import Event


class Driver(ABC):
"""Adapter for the HPC cluster."""

def __init__(self) -> None:
def __init__(self, **kwargs: Dict[str, str]) -> None:
self._event_queue: Optional[asyncio.Queue[Event]] = None

@property
Expand All @@ -30,6 +30,7 @@ async def submit(
executable: Program to execute.
args: List of arguments to send to the program.
cwd: Working directory.
name: Name of job as submitted to compute cluster
"""

@abstractmethod
Expand Down
201 changes: 201 additions & 0 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
from __future__ import annotations

import asyncio
import json
import logging
import shlex
import shutil
from pathlib import Path
from typing import (
Dict,
List,
Literal,
Mapping,
MutableMapping,
Optional,
Tuple,
Union,
)

from pydantic import BaseModel, Field
from typing_extensions import Annotated

from ert.scheduler.driver import Driver
from ert.scheduler.event import Event, FinishedEvent, StartedEvent

_POLL_PERIOD = 2.0 # seconds

logger = logging.getLogger(__name__)

JobState = Literal[
"EXIT", "DONE", "PEND", "RUN", "ZOMBI", "PDONE", "SSUSP", "USUSP", "UNKWN"
]


class FinishedJob(BaseModel):
job_state: Literal["DONE", "EXIT"]


class QueuedJob(BaseModel):
job_state: Literal["PEND"]


class RunningJob(BaseModel):
job_state: Literal["RUN"]


AnyJob = Annotated[
Union[FinishedJob, QueuedJob, RunningJob], Field(discriminator="job_state")
]

LSF_INFO_JSON_FILENAME = "lsf_info.json"


class _Stat(BaseModel):
jobs: Mapping[str, AnyJob]


def parse_bjobs(bjobs_output_raw: bytes) -> Dict[str, Dict[str, Dict[str, str]]]:
data: Dict[str, Dict[str, str]] = {}
for line in bjobs_output_raw.decode().splitlines():
if not line or not line[0].isdigit():
continue
(jobid, _, stat, _) = line.split(maxsplit=3)
data[jobid] = {"job_state": stat}
return {"jobs": data}


class LsfDriver(Driver):
def __init__(
self,
queue_name: Optional[str] = None,
bsub_cmd: Optional[str] = None,
bjobs_cmd: Optional[str] = None,
bkill_cmd: Optional[str] = None,
) -> None:
super().__init__()

self._queue_name = queue_name

self._bsub_cmd = Path(bsub_cmd or shutil.which("bsub") or "bsub")
self._bjobs_cmd = Path(bjobs_cmd or shutil.which("bjobs") or "bjobs")
self._bkill_cmd = Path(bkill_cmd or shutil.which("bkill") or "bkill")

self._jobs: MutableMapping[str, Tuple[int, JobState]] = {}
self._iens2jobid: MutableMapping[int, str] = {}
self._max_attempt: int = 100
self._retry_sleep_period = 3

async def submit(
self, iens: int, executable: str, /, *args: str, cwd: str, name: str = "dummy"
) -> None:
script = executable
arg_queue_name = ["-q", self._queue_name] if self._queue_name else []

bsub_with_args: List[str] = (
[str(self._bsub_cmd)]
+ arg_queue_name
+ [
"-J",
name,
script,
# skipping args for now, it might not work?
cwd, # assuming job_dispatch.py will handle cwd
]
)
logger.debug(f"Submitting to LSF with command {shlex.join(bsub_with_args)}")
process = await asyncio.create_subprocess_exec(
*bsub_with_args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
try:
job_id = (
stdout.decode("utf-8")
.strip()
.replace("<", "")
.replace(">", "")
.split()[1]
)
except IndexError as err:
logger.error(
f"Command \"{' '.join(bsub_with_args)}\" failed with error message: {stderr.decode()}"
)
raise RuntimeError from err
logger.info(f"Realization {iens} accepted by LSF, got id {job_id}")

(Path(cwd) / LSF_INFO_JSON_FILENAME).write_text(
json.dumps({"job_id": job_id}), encoding="utf-8"
)
self._jobs[job_id] = (iens, "PEND")
self._iens2jobid[iens] = job_id

async def kill(self, iens: int) -> None:
try:
job_id = self._iens2jobid[iens]

logger.info(f"Killing realization {iens} with LSF-id {job_id}")
proc = await asyncio.create_subprocess_exec(self._bkill_cmd, job_id)
await proc.wait()
except KeyError:
return

async def poll(self) -> None:
while True:
if not self._jobs.keys():
await asyncio.sleep(_POLL_PERIOD)
continue
proc = await asyncio.create_subprocess_exec(
self._bjobs_cmd,
*self._jobs.keys(),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

stdout, _ = await proc.communicate()
stat = _Stat(**parse_bjobs(stdout))
for job_id, job in stat.jobs.items():
if job_id not in self._jobs:
continue

iens, old_state = self._jobs[job_id]
new_state = job.job_state
if old_state == new_state:
continue

self._jobs[job_id] = (iens, new_state)
event: Optional[Event] = None
if isinstance(job, RunningJob):
logger.debug(f"Realization {iens} is running.")
event = StartedEvent(iens=iens)
elif isinstance(job, FinishedJob):
aborted = job.job_state == "EXIT"
event = FinishedEvent(
iens=iens,
returncode=1 if job.job_state == "EXIT" else 0,
aborted=aborted,
)
if aborted:
logger.warning(
f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed."
)
else:
logger.info(
f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded"
)
del self._jobs[job_id]
del self._iens2jobid[iens]

if event:
await self.event_queue.put(event)

missing_in_bjobs_output = set(self._jobs) - set(stat.jobs.keys())
if missing_in_bjobs_output:
logger.warning(
f"bjobs did not give status for job_ids {missing_in_bjobs_output}"
)
await asyncio.sleep(_POLL_PERIOD)

async def finish(self) -> None:
pass
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ def pytest_addoption(parser):
default=False,
help="Run TORQUE tests against the real cluster",
)
parser.addoption(
"--lsf",
action="store_true",
default=False,
help="Run LSF tests against the real cluster.",
)
parser.addoption("--show-gui", action="store_true", default=False)


Expand Down
6 changes: 3 additions & 3 deletions tests/integration_tests/job_queue/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def make_failing_bsub(script_path, success_script):
"""
Approx 3/10 of the submits will fail due to the random generator in the
created mocked bsub script. By using the retry functionality towards
queue-errors in job_queue.cpp we should still manage to finalize all our runs
queue-errors in lsf_driver.py we should still manage to finalize all our runs
before exhausting the limits
"""
script_path.write_text(
Expand Down Expand Up @@ -154,7 +154,7 @@ def copy_lsf_poly_case(copy_poly_case, tmp_path):

config = [
"JOBNAME poly_%d\n",
"QUEUE_SYSTEM LSF\n",
"QUEUE_SYSTEM LSF\n",
"QUEUE_OPTION LSF MAX_RUNNING 10\n",
f"QUEUE_OPTION LSF BJOBS_CMD {tmp_path}/mock_bjobs\n",
f"QUEUE_OPTION LSF BSUB_CMD {tmp_path}/mock_bsub\n",
Expand All @@ -179,7 +179,7 @@ def copy_lsf_poly_case(copy_poly_case, tmp_path):
"try_queue_and_scheduler",
"monkeypatch",
)
@pytest.mark.scheduler(skip=True)
@pytest.mark.scheduler(skip=True) # Scheduler-LSF-driver does not support flaky bsub
@pytest.mark.integration_test
def test_run_mocked_lsf_queue():
run_cli(
Expand Down
2 changes: 2 additions & 0 deletions tests/integration_tests/scheduler/bin/bjobs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/usr/bin/env bash
exec "${PYTHON:-$(which python3)}" "$(dirname $0)/bjobs.py" "$@"
Loading

0 comments on commit 1d527d9

Please sign in to comment.