diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py
index e2c85829dbc..267bc07a09c 100644
--- a/src/ert/scheduler/lsf_driver.py
+++ b/src/ert/scheduler/lsf_driver.py
@@ -109,7 +109,9 @@ async def submit(
         self, iens: int, executable: str, /, *args: str, cwd: str, job_name: str
     ) -> None:
         await self.run_with_retries(
-            lambda: self._submit_once(iens, executable, cwd=cwd, job_name=job_name),
+            lambda: self._submit_once(
+                iens, executable, *args, cwd=cwd, job_name=job_name
+            ),
             error_msg="Maximum number of submit errors exceeded",
         )
 
@@ -121,33 +123,43 @@ async def _submit_once(
             "-J",
             job_name,
             executable,
+            *args,
         ]
+        print(f"setting up bsub process {bsub_with_args}")
         process = await asyncio.create_subprocess_exec(
             *bsub_with_args,
             cwd=cwd,
             stdout=asyncio.subprocess.PIPE,
             stderr=asyncio.subprocess.PIPE,
         )
-        job_id, stderr = await process.communicate()
+        print("communicate bsub process")
+        print(" ".join(bsub_with_args))
+        stdout, stderr = await process.communicate()
+        print("bsub done")
         try:
-            job_id_ = (
-                job_id.decode("utf-8")
+            job_id = (
+                stdout.decode("utf-8")
                 .strip()
                 .replace("<", "")
                 .replace(">", "")
                 .split()[1]
             )
         except IndexError as err:
+            print(
+                f"Command \"{' '.join(bsub_with_args)}\" failed with error message: {stderr.decode()}"
+            )
             logger.error(
                 f"Command \"{' '.join(bsub_with_args)}\" failed with error message: {stderr.decode()}"
             )
             raise RuntimeError from err
+        print(f"got {job_id=}")
         (Path(cwd) / LSF_JSON).write_text(
-            json.dumps({"job_id": job_id_}), encoding="utf-8"
+            json.dumps({"job_id": job_id}), encoding="utf-8"
         )
-        self._jobs[job_id_] = (iens, _JobState.PEND)
-        self._iens2jobid[iens] = job_id_
+        self._jobs[job_id] = (iens, _JobState.PEND)
+        self._iens2jobid[iens] = job_id
+        print("done in lsf.submit()")
 
     async def kill(self, iens: int) -> None:
         try:
diff --git a/tests/conftest.py b/tests/conftest.py
index 6d5cf06cc1a..720fde33a1a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -241,7 +241,7 @@ def pytest_addoption(parser):
         "--lsf",
         action="store_true",
         default=False,
-        help="Defaults to not running tests that require LSF.",
+        help="Run LSF tests against an actual cluster.",
     )
     parser.addoption("--show-gui", action="store_true", default=False)
 
@@ -314,10 +314,6 @@ def pytest_collection_modifyitems(config, items):
                 "--eclipse_simulator"
             ):
                 item.add_marker(pytest.mark.skip("Requires eclipse"))
-            if item.get_closest_marker("requires_lsf") and not config.getoption(
-                "--lsf"
-            ):
-                item.add_marker(pytest.mark.skip("Requires LSF"))
     else:
         skip_slow = pytest.mark.skip(reason="need --runslow option to run")
 
@@ -328,10 +324,6 @@ def pytest_collection_modifyitems(config, items):
                 "--eclipse-simulator"
             ):
                 item.add_marker(pytest.mark.skip("Requires eclipse"))
-            if item.get_closest_marker("requires_lsf") and not config.getoption(
-                "--lsf"
-            ):
-                item.add_marker(pytest.mark.skip("Requires LSF"))
 
 
 def _run_snake_oil(source_root):
diff --git a/tests/integration_tests/scheduler/mock_lsf_bin/bjobs b/tests/integration_tests/scheduler/mock_lsf_bin/bjobs
new file mode 100755
index 00000000000..09849ce8d66
--- /dev/null
+++ b/tests/integration_tests/scheduler/mock_lsf_bin/bjobs
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
+exec "${PYTHON:-$(which python3)}" "$(dirname $0)/bjobs.py" "$@"
diff --git a/tests/integration_tests/scheduler/mock_lsf_bin/bjobs.py b/tests/integration_tests/scheduler/mock_lsf_bin/bjobs.py
new file mode 100644
index 00000000000..fd04441f9a4
--- /dev/null
+++ b/tests/integration_tests/scheduler/mock_lsf_bin/bjobs.py
@@ -0,0 +1,85 @@
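+# Minimal mock of LSF's bjobs command for the integration tests. Job
+# state is reconstructed from the files that the mock bsub/runner
+# scripts leave in $PYTEST_TMP_PATH/mock_jobs: <jobid>.pid,
+# <jobid>.returncode and <jobid>.name. Output only approximates bjobs.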
+import argparse
+import os
+from datetime import datetime
+from enum import Enum
+from pathlib import Path
+from typing import List, Optional
+
+from pydantic import BaseModel
+
+
+class LsfState(str, Enum):
+    EXIT = "EXIT"
+    DONE = "DONE"
+    PEND = "PEND"
+    RUN = "RUN"
+
+
+class Job(BaseModel):
+    job_id: str
+    name: str
+    job_state: LsfState
+    user_name: str = "username"
+    queue: str = "normal"
+    from_host: str = "localhost"
+    exec_host: str = "localhost"
+    submit_time: int = 0
+
+
+def get_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(description="Mock LSF bjobs for tests")
+    parser.add_argument("jobids", type=str, nargs="*")
+    return parser
+
+
+def lsf_formatter(jobstats: List[Job]) -> str:
+    string = "JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME\n"
+    for job in jobstats:
+        submit_time = datetime.utcfromtimestamp(job.submit_time)
+        string += (
+            f"{str(job.job_id):<5s} {job.user_name:<8s} "
+            f"{job.job_state:<4s} {job.queue:<8} "
+            f"{job.from_host:<11s} {job.exec_host:<11s} {job.name:<8s} "
+            f"{submit_time}\n"
+        )
+    return string
+
+
+def read(path: Path, default: Optional[str] = None) -> Optional[str]:
+    return path.read_text().strip() if path.exists() else default
+
+
+def main() -> None:
+    args = get_parser().parse_args()
+
+    jobdir = Path(os.getenv("PYTEST_TMP_PATH", ".")) / "mock_jobs"
+
+    jobs: List[Job] = []
+    for jobid in args.jobids:
+        pid = read(jobdir / f"{jobid}.pid")
+        returncode = read(jobdir / f"{jobid}.returncode")
+        state = LsfState.PEND
+        # The runner writes the returncode file when the job script exits
+        if returncode is not None:
+            state = LsfState.DONE if returncode == "0" else LsfState.EXIT
+        elif pid is not None:
+            state = LsfState.RUN
+        jobs.append(
+            Job(
+                **{
+                    "job_id": jobid,
+                    "name": read(jobdir / f"{jobid}.name") or "_",
+                    "job_state": state,
+                }
+            )
+        )
+
+    print(lsf_formatter(jobs))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/integration_tests/scheduler/mock_lsf_bin/bkill b/tests/integration_tests/scheduler/mock_lsf_bin/bkill
new file mode 100755
index 00000000000..b8bc5ba8917
--- /dev/null
+++ b/tests/integration_tests/scheduler/mock_lsf_bin/bkill
@@ -0,0 +1,2 @@
+#!/usr/bin/env bash
+exec "${PYTHON:-$(which python3)}" "$(dirname $0)/bkill.py" "$@"
diff --git a/tests/integration_tests/scheduler/mock_lsf_bin/bkill.py b/tests/integration_tests/scheduler/mock_lsf_bin/bkill.py
new file mode 100644
index 00000000000..92f28667337
--- /dev/null
+++ b/tests/integration_tests/scheduler/mock_lsf_bin/bkill.py
@@ -0,0 +1,34 @@
+import argparse
+import os
+import signal
+import sys
+from pathlib import Path
+
+
+def get_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(description="Kill jobs")
+    parser.add_argument("jobids", type=str, nargs="+")
+    return parser
+
+
+def main() -> None:
+    args = get_parser().parse_args()
+
+    jobdir = Path(os.getenv("PYTEST_TMP_PATH", ".")) / "mock_jobs"
+    killsignal = signal.SIGTERM
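+    # Only SIGTERM is sent here; actual bkill is believed to escalate
+    # through stronger signals if the job survives.  # verify_me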
+    for jobid in args.jobids:
+        pidfile = jobdir / f"{jobid}.pid"
+        if not pidfile.exists():
+            print("fixme, job did not exist")  # verify_me
+            sys.exit(1)  # verify_me
+        pid = int(pidfile.read_text(encoding="utf-8").strip())
+        print(f"{pidfile=} {pid=}")
+        os.kill(pid, killsignal)
+        pidfile.unlink()
+        print(f"killed a job={pid} with {killsignal=}")  # verify with actual bkill
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/integration_tests/scheduler/mock_lsf_bin/bsub b/tests/integration_tests/scheduler/mock_lsf_bin/bsub
new file mode 100755
index 00000000000..ecddcc8d56d
--- /dev/null
+++ b/tests/integration_tests/scheduler/mock_lsf_bin/bsub
@@ -0,0 +1,3 @@
+#!/usr/bin/env bash
+exec "${PYTHON:-$(which python3)}" "$(dirname $0)/bsub.py" "$@" &
+disown -a
diff --git a/tests/integration_tests/scheduler/mock_lsf_bin/bsub.py b/tests/integration_tests/scheduler/mock_lsf_bin/bsub.py
new file mode 100644
index 00000000000..4dd32d30260
--- /dev/null
+++ b/tests/integration_tests/scheduler/mock_lsf_bin/bsub.py
@@ -0,0 +1,37 @@
+import argparse
+import os
+import random
+import subprocess
+from pathlib import Path
+
+
+def get_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(description="Mock LSF bsub for tests")
+
+    parser.add_argument("-J", "--job_name", type=str)
+    parser.add_argument("shellcode", type=str, nargs="+")
+    return parser
+
+
+def main() -> None:
+    args = get_parser().parse_args()
+
+    jobdir = Path(os.getenv("PYTEST_TMP_PATH", ".")) / "mock_jobs"
+    jobid = str(
+        random.randint(1, 2**15)
+    )  # todo: ensure not in use to avoid flakiness
+    jobdir.mkdir(exist_ok=True, parents=True)
+
+    (jobdir / f"{jobid}.script").write_text(" ".join(args.shellcode))
+    (jobdir / f"{jobid}.name").write_text(args.job_name or "no_name")
+    print(f"Job <{jobid}> is submitted to default queue <normal>.")
+
+    # Actually run the requested command/script on localhost
+    subprocess.Popen(
+        [Path(__file__).parent / "runner", str(jobdir / jobid)],
+    )
+    print("exiting main in bsub.py")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/integration_tests/scheduler/mock_lsf_bin/runner b/tests/integration_tests/scheduler/mock_lsf_bin/runner
new file mode 100755
index 00000000000..85aaa164d60
--- /dev/null
+++ b/tests/integration_tests/scheduler/mock_lsf_bin/runner
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+job=$1
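+
+# Emulate an LSF execution host for a single job: record this shell's
+# pid so the mock bjobs/bkill can find it, run the submitted script,
+# and leave an LSF-style returncode file behind on exit.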
+
+function handle_sigterm {
+    # Torque uses (256 + SIGNAL) as the returncode
+    # Fix this to whatever LSF does..
+    # SIGTERM=15
+    echo "271" > "${job}.returncode"
+    kill $child_pid
+    exit 0
+}
+
+trap handle_sigterm SIGTERM
+
+echo "$$" > "${job}.pid"
+sh "${job}.script" > "${job}.stdout" 2> "${job}.stderr" &
+child_pid=$!
+wait $child_pid  # plain "wait" with no arguments would always return 0
+echo "$?" > "${job}.returncode"
+
+echo "runner finished sh code"
diff --git a/tests/integration_tests/scheduler/test_lsf_driver.py b/tests/integration_tests/scheduler/test_lsf_driver.py
index 59707cb13fc..ff5604ff50e 100644
--- a/tests/integration_tests/scheduler/test_lsf_driver.py
+++ b/tests/integration_tests/scheduler/test_lsf_driver.py
@@ -1,17 +1,34 @@
 import asyncio
 import contextlib
+import os
+import sys
 
 import pytest
 
 from ert.scheduler import LsfDriver
 
 
+@pytest.fixture(autouse=True)
+def mock_lsf(pytestconfig, monkeypatch, tmp_path):
+    if pytestconfig.getoption("lsf"):
+        # User provided --lsf, which means we should use the actual LSF
+        # cluster without mocking anything.
+        return
+
+    bin_path = os.path.join(os.path.dirname(__file__), "mock_lsf_bin")
+
+    monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}")
+    monkeypatch.setenv("PYTEST_TMP_PATH", str(tmp_path))
+    monkeypatch.setenv("PYTHON", sys.executable)
+
+
 @pytest.mark.timeout(5)
-@pytest.mark.requires_lsf
 async def test_submit_poll_and_kill():
     driver = LsfDriver()
     iens: int = 0
-    await driver.submit(iens, "sleep 100", cwd=".", job_name="_")
+    print("submitting")
+    await driver.submit(iens, "sleep", "100", cwd=".", job_name="_")
+    print("submit done awaited")
     assert iens in driver._iens2jobid
     jobid = driver._iens2jobid[iens]
     assert driver._jobs[jobid] == (iens, "PEND")
@@ -33,8 +50,6 @@ async def test_submit_poll_and_kill():
         await poll_task
 
 
-@pytest.mark.timeout(5)
-@pytest.mark.requires_lsf
 async def test_job_name():
     driver = LsfDriver()
     iens: int = 0