diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index e2c85829dbc..267bc07a09c 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -109,7 +109,9 @@ async def submit( self, iens: int, executable: str, /, *args: str, cwd: str, job_name: str ) -> None: await self.run_with_retries( - lambda: self._submit_once(iens, executable, cwd=cwd, job_name=job_name), + lambda: self._submit_once( + iens, executable, *args, cwd=cwd, job_name=job_name + ), error_msg="Maximum number of submit errors exceeded", ) @@ -121,33 +123,43 @@ async def _submit_once( "-J", job_name, executable, + *args, ] + print(f"setting up bsub process {bsub_with_args}") process = await asyncio.create_subprocess_exec( *bsub_with_args, cwd=cwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - job_id, stderr = await process.communicate() + print("communicate bsub process") + print(" ".join(bsub_with_args)) + stdout, stderr = await process.communicate() + print("bsub done") try: - job_id_ = ( - job_id.decode("utf-8") + job_id = ( + stdout.decode("utf-8") .strip() .replace("<", "") .replace(">", "") .split()[1] ) except IndexError as err: + print( + f"Command \"{' '.join(bsub_with_args)}\" failed with error message: {stderr.decode()}" + ) logger.error( f"Command \"{' '.join(bsub_with_args)}\" failed with error message: {stderr.decode()}" ) raise RuntimeError from err + print(f"got {job_id=}") (Path(cwd) / LSF_JSON).write_text( - json.dumps({"job_id": job_id_}), encoding="utf-8" + json.dumps({"job_id": job_id}), encoding="utf-8" ) - self._jobs[job_id_] = (iens, _JobState.PEND) - self._iens2jobid[iens] = job_id_ + self._jobs[job_id] = (iens, _JobState.PEND) + self._iens2jobid[iens] = job_id + print("done in lsf.submit()") async def kill(self, iens: int) -> None: try: diff --git a/tests/integration_tests/scheduler/mock_lsf_bin/bkill.py b/tests/integration_tests/scheduler/mock_lsf_bin/bkill.py index eb234e4928a..92f28667337 100644 --- a/tests/integration_tests/scheduler/mock_lsf_bin/bkill.py +++ b/tests/integration_tests/scheduler/mock_lsf_bin/bkill.py @@ -21,7 +21,8 @@ def main() -> None: if not pidfile.exists(): print("fixme, job did not exist") # verify_me sys.exit(1) # verify_me - pid = int(pidfile.read_text(encoding="utf-8")) + pid = int(pidfile.read_text(encoding="utf-8").strip()) + print(f"{pidfile=} {pid=}") os.kill(pid, killsignal) pidfile.unlink() print(f"killed a job={pid} with {killsignal=}") # verify with actual bkill diff --git a/tests/integration_tests/scheduler/mock_lsf_bin/bsub b/tests/integration_tests/scheduler/mock_lsf_bin/bsub index e3c952dc89f..ecddcc8d56d 100755 --- a/tests/integration_tests/scheduler/mock_lsf_bin/bsub +++ b/tests/integration_tests/scheduler/mock_lsf_bin/bsub @@ -1,2 +1,3 @@ #!/usr/bin/env bash -exec "${PYTHON:-$(which python3)}" "$(dirname $0)/bsub.py" "$@" +exec "${PYTHON:-$(which python3)}" "$(dirname $0)/bsub.py" "$@" & +disown -a diff --git a/tests/integration_tests/scheduler/mock_lsf_bin/bsub.py b/tests/integration_tests/scheduler/mock_lsf_bin/bsub.py index ed2533eb899..4dd32d30260 100644 --- a/tests/integration_tests/scheduler/mock_lsf_bin/bsub.py +++ b/tests/integration_tests/scheduler/mock_lsf_bin/bsub.py @@ -9,7 +9,7 @@ def get_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Submit jobs to Slurm via LSF syntax") parser.add_argument("-J", "--job_name", type=str) - parser.add_argument("shellcode", type=str) + parser.add_argument("shellcode", type=str, nargs="+") return parser @@ -22,7 +22,7 @@ def main() -> None: ) # todo: ensure not in use to avoid flakyness jobdir.mkdir(exist_ok=True, parents=True) - (jobdir / f"{jobid}.script").write_text(args.shellcode) + (jobdir / f"{jobid}.script").write_text(" ".join(args.shellcode)) (jobdir / f"{jobid}.name").write_text(args.job_name or "no_name") print(f"Job <{jobid}> is submitted to default queue .") diff --git a/tests/integration_tests/scheduler/mock_lsf_bin/runner b/tests/integration_tests/scheduler/mock_lsf_bin/runner index 1fa4926005b..85aaa164d60 100755 --- a/tests/integration_tests/scheduler/mock_lsf_bin/runner +++ b/tests/integration_tests/scheduler/mock_lsf_bin/runner @@ -5,14 +5,17 @@ function handle_sigterm { # Torque uses (256 + SIGNAL) as the returncode # Fix this to whatever LSF does.. # SIGTERM=15 - echo "Caught sigterm" echo "271" > "${job}.returncode" + kill $child_pid exit 0 } trap handle_sigterm SIGTERM echo "$$" > "${job}.pid" -sh "${job}.script" > "${job}.stdout" 2> "${job}.stderr" +sh "${job}.script" > "${job}.stdout" 2> "${job}.stderr" & +child_pid=$! +wait + echo "runner finished sh code" echo "$?" > "${job}.returncode" diff --git a/tests/integration_tests/scheduler/test_lsf_driver.py b/tests/integration_tests/scheduler/test_lsf_driver.py index 2353004379f..ff5604ff50e 100644 --- a/tests/integration_tests/scheduler/test_lsf_driver.py +++ b/tests/integration_tests/scheduler/test_lsf_driver.py @@ -26,7 +26,9 @@ def mock_lsf(pytestconfig, monkeypatch, tmp_path): async def test_submit_poll_and_kill(): driver = LsfDriver() iens: int = 0 - await driver.submit(iens, "sleep 100", cwd=".", job_name="_") + print("sutmibbint") + await driver.submit(iens, "sleep", "100", cwd=".", job_name="_") + print("submit done awaited") assert iens in driver._iens2jobid jobid = driver._iens2jobid[iens] assert driver._jobs[jobid] == (iens, "PEND")