Add Python LSF driver #6960

Merged: 1 commit, Feb 13, 2024
Changes from all commits
1 change: 1 addition & 0 deletions pyproject.toml
@@ -151,6 +151,7 @@ markers = [
"integration_test",
"quick_only",
"requires_eclipse",
"requires_lsf",
"requires_window_manager",
"scheduler",
"script",
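The new requires_lsf marker can then be attached to tests that need a real LSF cluster. A minimal usage sketch (the test function name is made up for illustration):

import pytest

@pytest.mark.requires_lsf
def test_submit_to_real_cluster():  # hypothetical test
    ...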
3 changes: 2 additions & 1 deletion src/ert/cli/main.py
@@ -46,9 +46,10 @@ def run_cli(args: Namespace, _: Any = None) -> None:
) and ert_config.queue_config.queue_system not in [
QueueSystem.LOCAL,
QueueSystem.TORQUE,
QueueSystem.LSF,
]:
raise ErtCliError(
"Scheduler only supports LOCAL and TORQUE queue at the moment!"
"Scheduler only supports LOCAL, TORQUE and LSF queues at the moment!"
)
local_storage_set_ert_config(ert_config)

12 changes: 11 additions & 1 deletion src/ert/scheduler/__init__.py
@@ -5,6 +5,7 @@
from ert.config.parsing.queue_system import QueueSystem
from ert.scheduler.driver import Driver
from ert.scheduler.local_driver import LocalDriver
from ert.scheduler.lsf_driver import LsfDriver
from ert.scheduler.openpbs_driver import OpenPBSDriver
from ert.scheduler.scheduler import Scheduler

@@ -22,8 +23,17 @@ def create_driver(config: QueueConfig) -> Driver:
queue_name = val

return OpenPBSDriver(queue_name=queue_name)
elif config.queue_system == QueueSystem.LSF:
queue_config = {
key: value for key, value in config.queue_options.get(QueueSystem.LSF, [])
}
return LsfDriver(
bsub_cmd=queue_config.get("BSUB_CMD"),
bkill_cmd=queue_config.get("BKILL_CMD"),
bjobs_cmd=queue_config.get("BJOBS_CMD"),
)
else:
raise NotImplementedError("Only LOCAL and TORQUE drivers are implemented")
raise NotImplementedError("Only LOCAL, TORQUE and LSF drivers are implemented")


__all__ = [
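For context, the QUEUE_OPTION pairs from the ert config become the keyword arguments above. A rough sketch of the equivalent explicit construction, assuming an ert installation; all command paths below are hypothetical:

from ert.scheduler.lsf_driver import LsfDriver

# Equivalent to QUEUE_OPTION LSF BSUB_CMD/BJOBS_CMD/BKILL_CMD entries in an
# ert config; the paths are made up for illustration.
driver = LsfDriver(
    bsub_cmd="/opt/lsf/bin/bsub",
    bjobs_cmd="/opt/lsf/bin/bjobs",
    bkill_cmd="/opt/lsf/bin/bkill",
)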
5 changes: 3 additions & 2 deletions src/ert/scheduler/driver.py
@@ -2,15 +2,15 @@

import asyncio
from abc import ABC, abstractmethod
- from typing import Optional
+ from typing import Dict, Optional

from ert.scheduler.event import Event


class Driver(ABC):
"""Adapter for the HPC cluster."""

- def __init__(self) -> None:
+ def __init__(self, **kwargs: Dict[str, str]) -> None:
self._event_queue: Optional[asyncio.Queue[Event]] = None

@property
@@ -30,6 +30,7 @@ async def submit(
executable: Program to execute.
args: List of arguments to send to the program.
cwd: Working directory.
name: Name of the job as submitted to the compute cluster.
"""

@abstractmethod
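To make the contract concrete, here is a minimal sketch of a Driver subclass. It assumes the abstract interface is submit/kill/poll plus finish, matching what the LSF driver below implements; EchoDriver itself is hypothetical:

import asyncio

from ert.scheduler.driver import Driver
from ert.scheduler.event import FinishedEvent


class EchoDriver(Driver):
    """Runs nothing; reports each submitted realization as finished."""

    async def submit(self, iens, executable, /, *args, cwd, name="dummy"):
        # Immediately pretend the job succeeded.
        await self.event_queue.put(
            FinishedEvent(iens=iens, returncode=0, aborted=False)
        )

    async def kill(self, iens):
        pass  # nothing to kill

    async def poll(self):
        await asyncio.sleep(3600)  # nothing to poll

    async def finish(self):
        pass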
201 changes: 201 additions & 0 deletions src/ert/scheduler/lsf_driver.py
@@ -0,0 +1,201 @@
from __future__ import annotations

import asyncio
import json
import logging
import shlex
import shutil
from pathlib import Path
from typing import (
Dict,
List,
Literal,
Mapping,
MutableMapping,
Optional,
Tuple,
Union,
)

from pydantic import BaseModel, Field
from typing_extensions import Annotated

from ert.scheduler.driver import Driver
from ert.scheduler.event import Event, FinishedEvent, StartedEvent

_POLL_PERIOD = 2.0 # seconds

logger = logging.getLogger(__name__)

JobState = Literal[
"EXIT", "DONE", "PEND", "RUN", "ZOMBI", "PDONE", "SSUSP", "USUSP", "UNKWN"
]


class FinishedJob(BaseModel):
job_state: Literal["DONE", "EXIT"]


class QueuedJob(BaseModel):
job_state: Literal["PEND"]


class RunningJob(BaseModel):
job_state: Literal["RUN"]


AnyJob = Annotated[
Union[FinishedJob, QueuedJob, RunningJob], Field(discriminator="job_state")
]

LSF_INFO_JSON_FILENAME = "lsf_info.json"


class _Stat(BaseModel):
jobs: Mapping[str, AnyJob]


def parse_bjobs(bjobs_output_raw: bytes) -> Dict[str, Dict[str, Dict[str, str]]]:
data: Dict[str, Dict[str, str]] = {}
for line in bjobs_output_raw.decode().splitlines():
if not line or not line[0].isdigit():
continue
(jobid, _, stat, _) = line.split(maxsplit=3)
data[jobid] = {"job_state": stat}
return {"jobs": data}


class LsfDriver(Driver):
def __init__(
self,
queue_name: Optional[str] = None,
bsub_cmd: Optional[str] = None,
bjobs_cmd: Optional[str] = None,
bkill_cmd: Optional[str] = None,
) -> None:
super().__init__()

self._queue_name = queue_name

self._bsub_cmd = Path(bsub_cmd or shutil.which("bsub") or "bsub")
self._bjobs_cmd = Path(bjobs_cmd or shutil.which("bjobs") or "bjobs")
self._bkill_cmd = Path(bkill_cmd or shutil.which("bkill") or "bkill")

self._jobs: MutableMapping[str, Tuple[int, JobState]] = {}
self._iens2jobid: MutableMapping[int, str] = {}
self._max_attempt: int = 100
self._retry_sleep_period = 3

async def submit(
self, iens: int, executable: str, /, *args: str, cwd: str, name: str = "dummy"
) -> None:
script = executable
arg_queue_name = ["-q", self._queue_name] if self._queue_name else []

bsub_with_args: List[str] = (
[str(self._bsub_cmd)]
+ arg_queue_name
+ [
"-J",
name,
script,
# args are intentionally not forwarded yet; support for them is untested
cwd,  # assuming job_dispatch.py will handle the working directory
]
)
logger.debug(f"Submitting to LSF with command {shlex.join(bsub_with_args)}")
process = await asyncio.create_subprocess_exec(
*bsub_with_args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
try:
job_id = (
stdout.decode("utf-8")
.strip()
.replace("<", "")
.replace(">", "")
.split()[1]
)
except IndexError as err:
logger.error(
f"Command \"{' '.join(bsub_with_args)}\" failed with error message: {stderr.decode()}"
)
raise RuntimeError from err
logger.info(f"Realization {iens} accepted by LSF, got id {job_id}")

(Path(cwd) / LSF_INFO_JSON_FILENAME).write_text(
json.dumps({"job_id": job_id}), encoding="utf-8"
)
self._jobs[job_id] = (iens, "PEND")
self._iens2jobid[iens] = job_id

async def kill(self, iens: int) -> None:
try:
job_id = self._iens2jobid[iens]

logger.info(f"Killing realization {iens} with LSF-id {job_id}")
proc = await asyncio.create_subprocess_exec(self._bkill_cmd, job_id)
await proc.wait()
except KeyError:
return

async def poll(self) -> None:
while True:
if not self._jobs:
await asyncio.sleep(_POLL_PERIOD)
continue
proc = await asyncio.create_subprocess_exec(
self._bjobs_cmd,
*self._jobs.keys(),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

stdout, _ = await proc.communicate()
stat = _Stat(**parse_bjobs(stdout))
for job_id, job in stat.jobs.items():
if job_id not in self._jobs:
continue

iens, old_state = self._jobs[job_id]
new_state = job.job_state
if old_state == new_state:
continue

self._jobs[job_id] = (iens, new_state)
event: Optional[Event] = None
if isinstance(job, RunningJob):
logger.debug(f"Realization {iens} is running.")
event = StartedEvent(iens=iens)
elif isinstance(job, FinishedJob):
aborted = job.job_state == "EXIT"
event = FinishedEvent(
iens=iens,
returncode=1 if aborted else 0,
aborted=aborted,
)
if aborted:
logger.warning(
f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed."
)
else:
logger.info(
f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded"
)
del self._jobs[job_id]
del self._iens2jobid[iens]

if event:
await self.event_queue.put(event)

missing_in_bjobs_output = set(self._jobs) - set(stat.jobs.keys())
if missing_in_bjobs_output:
logger.warning(
f"bjobs did not give status for job_ids {missing_in_bjobs_output}"
)
await asyncio.sleep(_POLL_PERIOD)

async def finish(self) -> None:
pass
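As a sanity check, parse_bjobs can be exercised on hand-written output. A sketch, assuming the default whitespace-separated bjobs columns (JOBID USER STAT ...); the job ids and names are made up:

from ert.scheduler.lsf_driver import _Stat, parse_bjobs

sample = (
    b"JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME\n"
    b"101 alice RUN normal host1 host2 poly_0 Feb 13 10:00\n"
    b"102 alice DONE normal host1 host2 poly_1 Feb 13 10:00\n"
)
parsed = parse_bjobs(sample)  # the header line is skipped: it has no leading digit
assert parsed == {"jobs": {"101": {"job_state": "RUN"}, "102": {"job_state": "DONE"}}}
stat = _Stat(**parsed)  # pydantic validates each state against the JobState literals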
6 changes: 6 additions & 0 deletions tests/conftest.py
@@ -253,6 +253,12 @@ def pytest_addoption(parser):
default=False,
help="Run TORQUE tests against the real cluster",
)
parser.addoption(
"--lsf",
action="store_true",
default=False,
help="Run LSF tests against the real cluster.",
)
parser.addoption("--show-gui", action="store_true", default=False)


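This diff only adds the command-line option; a companion collection hook, not shown in this excerpt and sketched here on the assumption that it mirrors the existing TORQUE handling, would skip requires_lsf tests unless --lsf is passed:

import pytest


def pytest_collection_modifyitems(config, items):
    # Hypothetical hook: skip requires_lsf tests unless --lsf was given.
    if config.getoption("--lsf"):
        return
    skip_lsf = pytest.mark.skip(reason="needs --lsf to run against a real cluster")
    for item in items:
        if "requires_lsf" in item.keywords:
            item.add_marker(skip_lsf)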
6 changes: 3 additions & 3 deletions tests/integration_tests/job_queue/test_lsf_driver.py
@@ -115,7 +115,7 @@ def make_failing_bsub(script_path, success_script):
"""
Approx 3/10 of the submits will fail due to the random generator in the
created mocked bsub script. By using the retry functionality towards
- queue-errors in job_queue.cpp we should still manage to finalize all our runs
+ queue-errors in lsf_driver.py we should still manage to finalize all our runs
before exhausting the limits
"""
script_path.write_text(
@@ -154,7 +154,7 @@ def copy_lsf_poly_case(copy_poly_case, tmp_path):

config = [
"JOBNAME poly_%d\n",
"QUEUE_SYSTEM LSF\n",
"QUEUE_SYSTEM LSF\n",
"QUEUE_OPTION LSF MAX_RUNNING 10\n",
f"QUEUE_OPTION LSF BJOBS_CMD {tmp_path}/mock_bjobs\n",
f"QUEUE_OPTION LSF BSUB_CMD {tmp_path}/mock_bsub\n",
@@ -179,7 +179,7 @@
"try_queue_and_scheduler",
"monkeypatch",
)
- @pytest.mark.scheduler(skip=True)
+ @pytest.mark.scheduler(skip=True)  # Scheduler-LSF-driver does not support flaky bsub
@pytest.mark.integration_test
def test_run_mocked_lsf_queue():
run_cli(
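For illustration, the generated flaky mock bsub behaves roughly like this hypothetical stand-in (the real script is written by make_failing_bsub above):

#!/usr/bin/env python3
# Hypothetical sketch: fail roughly 3/10 of submissions, otherwise answer
# like bsub would.
import random
import sys

if random.random() < 0.3:
    sys.stderr.write("Submission rejected (simulated queue error)\n")
    sys.exit(1)
# The job id line is what LsfDriver.submit() parses: strip <>, take token 1.
print("Job <1234> is submitted to default queue <normal>.")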
2 changes: 2 additions & 0 deletions tests/integration_tests/scheduler/bin/bjobs
@@ -0,0 +1,2 @@
#!/usr/bin/env bash
exec "${PYTHON:-$(which python3)}" "$(dirname $0)/bjobs.py" "$@"