wip: mocked LSF-driver as local queue
Broken WIP in which the LSF commands are mocked to run jobs locally. driver.submit hangs when the mocked bsub does not return before the entire job is done; see the sketch below the file summary.
berland committed Jan 26, 2024
1 parent 578fb76 commit 8de764b
Showing 10 changed files with 221 additions and 20 deletions.
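Editor's note: the hang described in the commit message follows from how the driver awaits bsub. Below is a minimal sketch of the mechanism, assuming only standard asyncio semantics; the command line is illustrative and not taken from the commit.

import asyncio

async def submit_like(cmd: list[str]) -> bytes:
    # The driver awaits communicate(), which returns only once the bsub
    # process itself has exited.
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, _stderr = await process.communicate()
    return stdout

# If a mocked bsub runs the job in the foreground instead of backgrounding it,
# this await lasts as long as the job itself; hence the `&` and `disown` in
# the mock bsub wrapper below. Requires some `bsub` on PATH to actually run.
asyncio.run(submit_like(["bsub", "-J", "_", "sleep", "100"]))
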
26 changes: 19 additions & 7 deletions src/ert/scheduler/lsf_driver.py
@@ -109,7 +109,9 @@ async def submit(
         self, iens: int, executable: str, /, *args: str, cwd: str, job_name: str
     ) -> None:
         await self.run_with_retries(
-            lambda: self._submit_once(iens, executable, cwd=cwd, job_name=job_name),
+            lambda: self._submit_once(
+                iens, executable, *args, cwd=cwd, job_name=job_name
+            ),
             error_msg="Maximum number of submit errors exceeded",
         )

@@ -121,33 +123,43 @@ async def _submit_once(
"-J",
job_name,
executable,
*args,
]
print(f"setting up bsub process {bsub_with_args}")
process = await asyncio.create_subprocess_exec(
*bsub_with_args,
cwd=cwd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
job_id, stderr = await process.communicate()
print("communicate bsub process")
print(" ".join(bsub_with_args))
stdout, stderr = await process.communicate()
print("bsub done")
try:
job_id_ = (
job_id.decode("utf-8")
job_id = (
stdout.decode("utf-8")
.strip()
.replace("<", "")
.replace(">", "")
.split()[1]
)
except IndexError as err:
print(
f"Command \"{' '.join(bsub_with_args)}\" failed with error message: {stderr.decode()}"
)
logger.error(
f"Command \"{' '.join(bsub_with_args)}\" failed with error message: {stderr.decode()}"
)
raise RuntimeError from err

print(f"got {job_id=}")
(Path(cwd) / LSF_JSON).write_text(
json.dumps({"job_id": job_id_}), encoding="utf-8"
json.dumps({"job_id": job_id}), encoding="utf-8"
)
self._jobs[job_id_] = (iens, _JobState.PEND)
self._iens2jobid[iens] = job_id_
self._jobs[job_id] = (iens, _JobState.PEND)
self._iens2jobid[iens] = job_id
print("done in lsf.submit()")

async def kill(self, iens: int) -> None:
try:
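Editor's note: run_with_retries is called in submit() above but its definition is not part of this diff. A hedged sketch of its assumed semantics follows; the signature and retry count are guesses, not taken from the ert codebase.

from typing import Awaitable, Callable

async def run_with_retries(
    func: Callable[[], Awaitable[None]], error_msg: str, retries: int = 3
) -> None:
    # Assumed behavior: retry the async callable a few times, then give up
    # with the supplied error message.
    for _ in range(retries):
        try:
            await func()
            return
        except RuntimeError:
            continue
    raise RuntimeError(error_msg)
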
10 changes: 1 addition & 9 deletions tests/conftest.py
@@ -241,7 +241,7 @@ def pytest_addoption(parser):
"--lsf",
action="store_true",
default=False,
help="Defaults to not running tests that require LSF.",
help="Run LSF tests towards an actual cluster.",
)
parser.addoption("--show-gui", action="store_true", default=False)

@@ -314,10 +314,6 @@ def pytest_collection_modifyitems(config, items):
"--eclipse_simulator"
):
item.add_marker(pytest.mark.skip("Requires eclipse"))
if item.get_closest_marker("requires_lsf") and not config.getoption(
"--lsf"
):
item.add_marker(pytest.mark.skip("Requires LSF"))

else:
skip_slow = pytest.mark.skip(reason="need --runslow option to run")
@@ -328,10 +324,6 @@ def pytest_collection_modifyitems(config, items):
"--eclipse-simulator"
):
item.add_marker(pytest.mark.skip("Requires eclipse"))
if item.get_closest_marker("requires_lsf") and not config.getoption(
"--lsf"
):
item.add_marker(pytest.mark.skip("Requires LSF"))


def _run_snake_oil(source_root):
2 changes: 2 additions & 0 deletions tests/integration_tests/scheduler/mock_lsf_bin/bjobs
@@ -0,0 +1,2 @@
#!/usr/bin/env bash
exec "${PYTHON:-$(which python3)}" "$(dirname $0)/bjobs.py" "$@"
85 changes: 85 additions & 0 deletions tests/integration_tests/scheduler/mock_lsf_bin/bjobs.py
@@ -0,0 +1,85 @@
import argparse
import os
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import List, Optional

from pydantic import BaseModel


class LsfState(str, Enum):
    EXIT = "EXIT"
    DONE = "DONE"
    PEND = "PEND"
    RUN = "RUN"


class Job(BaseModel):
job_id: str
name: str
job_state: LsfState
user_name: str = "username"
queue: str = "normal"
from_host: str = "localhost"
exec_host: str = "localhost"
submit_time: int = 0


class SQueueOutput(BaseModel):
jobs: List[Job]


def get_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Get mocked jobs the LSF way")
    parser.add_argument("jobids", type=str, nargs="*")
    return parser


def lsf_formatter(jobstats: List[Job]) -> str:
string = "JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME\n"
for job in jobstats:
        submit_time = datetime.utcfromtimestamp(job.submit_time)
string += (
f"{str(job.job_id):<5s} {job.user_name:<8s} "
f"{job.job_state:<4s} {job.queue:<8} "
f"{job.from_host:<11s} {job.exec_host:<11s} {job.name:<8s} "
f"{submit_time}\n"
)
return string


def read(path: Path, default: Optional[str] = None) -> Optional[str]:
return path.read_text().strip() if path.exists() else default


def main() -> None:
args = get_parser().parse_args()

jobdir = Path(os.getenv("PYTEST_TMP_PATH", ".")) / "mock_jobs"

jobs: List[Job] = []
for jobid in args.jobids:
pid = read(jobdir / f"{jobid}.pid")
returncode = read(jobdir / f"{jobid}.returncode")
        state = LsfState.PEND
        if pid is not None:
            state = LsfState.RUN
        elif returncode is not None:
            # returncode is read from a file, so compare as a string
            state = LsfState.DONE if returncode == "0" else LsfState.EXIT
jobs.append(
Job(
**{
"job_id": jobid,
"name": read(jobdir / "f{jobid}.name") or "_",
"job_state": state,
}
)
)

print(lsf_formatter(jobs))


if __name__ == "__main__":
main()
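
Editor's note: a small usage sketch of the formatter above, showing roughly what the mocked bjobs prints for one running job (assumes the Job, LsfState, and lsf_formatter definitions from this file; output spacing approximate).

job = Job(job_id="1234", name="my_job", job_state=LsfState.RUN)
print(lsf_formatter([job]))
# JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME
# 1234  username RUN  normal   localhost   localhost   my_job   1970-01-01 00:00:00
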
2 changes: 2 additions & 0 deletions tests/integration_tests/scheduler/mock_lsf_bin/bkill
@@ -0,0 +1,2 @@
#!/usr/bin/env bash
exec "${PYTHON:-$(which python3)}" "$(dirname $0)/bkill.py" "$@"
32 changes: 32 additions & 0 deletions tests/integration_tests/scheduler/mock_lsf_bin/bkill.py
@@ -0,0 +1,32 @@
import argparse
import os
import signal
import sys
from pathlib import Path


def get_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Kill jobs")
parser.add_argument("jobids", type=str, nargs="+")
return parser


def main() -> None:
args = get_parser().parse_args()

jobdir = Path(os.getenv("PYTEST_TMP_PATH", ".")) / "mock_jobs"
killsignal = signal.SIGTERM
for jobid in args.jobids:
pidfile = jobdir / f"{jobid}.pid"
if not pidfile.exists():
print("fixme, job did not exist") # verify_me
sys.exit(1) # verify_me
pid = int(pidfile.read_text(encoding="utf-8").strip())
print(f"{pidfile=} {pid=}")
os.kill(pid, killsignal)
pidfile.unlink()
print(f"killed a job={pid} with {killsignal=}") # verify with actual bkill


if __name__ == "__main__":
main()
3 changes: 3 additions & 0 deletions tests/integration_tests/scheduler/mock_lsf_bin/bsub
@@ -0,0 +1,3 @@
#!/usr/bin/env bash
exec "${PYTHON:-$(which python3)}" "$(dirname $0)/bsub.py" "$@" &
disown -a
37 changes: 37 additions & 0 deletions tests/integration_tests/scheduler/mock_lsf_bin/bsub.py
@@ -0,0 +1,37 @@
import argparse
import os
import random
import subprocess
from pathlib import Path


def get_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Submit jobs locally, mimicking LSF bsub")

parser.add_argument("-J", "--job_name", type=str)
parser.add_argument("shellcode", type=str, nargs="+")
return parser


def main() -> None:
args = get_parser().parse_args()

jobdir = Path(os.getenv("PYTEST_TMP_PATH", ".")) / "mock_jobs"
    jobid = str(
        random.randint(1, 2**15)
    )  # todo: ensure the jobid is not already in use to avoid flakiness
jobdir.mkdir(exist_ok=True, parents=True)

(jobdir / f"{jobid}.script").write_text(" ".join(args.shellcode))
(jobdir / f"{jobid}.name").write_text(args.job_name or "no_name")
print(f"Job <{jobid}> is submitted to default queue <normal>.")

# Actually run the requested command/script on localhost
subprocess.Popen(
[Path(__file__).parent / "runner", str(jobdir / jobid)],
)
print("exiting main in bsub.py")


if __name__ == "__main__":
main()
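
Editor's note: taken together, the mock scripts communicate through files in mock_jobs/, as read from the code above: bsub writes <jobid>.script and <jobid>.name, the runner writes <jobid>.pid while the job runs and <jobid>.returncode when it ends, and bjobs maps file existence to an LSF-like state. A compact sketch of that mapping:

from pathlib import Path

def mock_state(jobdir: Path, jobid: str) -> str:
    # RUN while a pid file exists, DONE/EXIT once a returncode is written,
    # PEND before the runner has started.
    if (jobdir / f"{jobid}.pid").exists():
        return "RUN"
    returncode = jobdir / f"{jobid}.returncode"
    if returncode.exists():
        return "DONE" if returncode.read_text().strip() == "0" else "EXIT"
    return "PEND"
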
21 changes: 21 additions & 0 deletions tests/integration_tests/scheduler/mock_lsf_bin/runner
@@ -0,0 +1,21 @@
#!/usr/bin/env bash
job=$1

function handle_sigterm {
    # Torque uses (256 + SIGNAL) as the returncode; SIGTERM is 15,
    # so 256 + 15 = 271. Fix this to whatever LSF does.
    echo "271" > "${job}.returncode"
kill $child_pid
exit 0
}

trap handle_sigterm SIGTERM

echo "$$" > "${job}.pid"
sh "${job}.script" > "${job}.stdout" 2> "${job}.stderr" &
child_pid=$!
wait
rc=$?  # capture the child's exit status before any other command overwrites $?

echo "runner finished sh code"
rm -f "${job}.pid"  # the job is done; otherwise bjobs would keep reporting RUN
echo "$rc" > "${job}.returncode"
23 changes: 19 additions & 4 deletions tests/integration_tests/scheduler/test_lsf_driver.py
@@ -1,17 +1,34 @@
 import asyncio
 import contextlib
+import os
+import sys

 import pytest

 from ert.scheduler import LsfDriver


+@pytest.fixture(autouse=True)
+def mock_lsf(pytestconfig, monkeypatch, tmp_path):
+    if pytestconfig.getoption("lsf"):
+        # User provided --lsf, which means we should use the actual LSF
+        # cluster without mocking anything.
+        return
+
+    bin_path = os.path.join(os.path.dirname(__file__), "mock_lsf_bin")
+
+    monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}")
+    monkeypatch.setenv("PYTEST_TMP_PATH", str(tmp_path))
+    monkeypatch.setenv("PYTHON", sys.executable)
+
+
 @pytest.mark.timeout(5)
-@pytest.mark.requires_lsf
 async def test_submit_poll_and_kill():
     driver = LsfDriver()
     iens: int = 0
-    await driver.submit(iens, "sleep 100", cwd=".", job_name="_")
+    print("submitting")
+    await driver.submit(iens, "sleep", "100", cwd=".", job_name="_")
+    print("submit done awaited")
     assert iens in driver._iens2jobid
     jobid = driver._iens2jobid[iens]
     assert driver._jobs[jobid] == (iens, "PEND")
@@ -33,8 +50,6 @@ async def test_submit_poll_and_kill():
     await poll_task


-@pytest.mark.timeout(5)
-@pytest.mark.requires_lsf
 async def test_job_name():
     driver = LsfDriver()
     iens: int = 0
