Skip to content

Commit

Permalink
Broken WIP: with a mocked bsub, driver.submit does not return before the…
Browse files Browse the repository at this point in the history
… entire job is done
  • Loading branch information
berland committed Jan 25, 2024
1 parent 3fa4638 commit efcd73c
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 14 deletions.
26 changes: 19 additions & 7 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ async def submit(
self, iens: int, executable: str, /, *args: str, cwd: str, job_name: str
) -> None:
await self.run_with_retries(
lambda: self._submit_once(iens, executable, cwd=cwd, job_name=job_name),
lambda: self._submit_once(
iens, executable, *args, cwd=cwd, job_name=job_name
),
error_msg="Maximum number of submit errors exceeded",
)

Expand All @@ -121,33 +123,43 @@ async def _submit_once(
"-J",
job_name,
executable,
*args,
]
print(f"setting up bsub process {bsub_with_args}")
process = await asyncio.create_subprocess_exec(
*bsub_with_args,
cwd=cwd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
job_id, stderr = await process.communicate()
print("communicate bsub process")
print(" ".join(bsub_with_args))
stdout, stderr = await process.communicate()
print("bsub done")
try:
job_id_ = (
job_id.decode("utf-8")
job_id = (
stdout.decode("utf-8")
.strip()
.replace("<", "")
.replace(">", "")
.split()[1]
)
except IndexError as err:
print(
f"Command \"{' '.join(bsub_with_args)}\" failed with error message: {stderr.decode()}"
)
logger.error(
f"Command \"{' '.join(bsub_with_args)}\" failed with error message: {stderr.decode()}"
)
raise RuntimeError from err

print(f"got {job_id=}")
(Path(cwd) / LSF_JSON).write_text(
json.dumps({"job_id": job_id_}), encoding="utf-8"
json.dumps({"job_id": job_id}), encoding="utf-8"
)
self._jobs[job_id_] = (iens, _JobState.PEND)
self._iens2jobid[iens] = job_id_
self._jobs[job_id] = (iens, _JobState.PEND)
self._iens2jobid[iens] = job_id
print("done in lsf.submit()")

async def kill(self, iens: int) -> None:
try:
Expand Down
3 changes: 2 additions & 1 deletion tests/integration_tests/scheduler/mock_lsf_bin/bkill.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ def main() -> None:
if not pidfile.exists():
print("fixme, job did not exist") # verify_me
sys.exit(1) # verify_me
pid = int(pidfile.read_text(encoding="utf-8"))
pid = int(pidfile.read_text(encoding="utf-8").strip())
print(f"{pidfile=} {pid=}")
os.kill(pid, killsignal)
pidfile.unlink()
print(f"killed a job={pid} with {killsignal=}") # verify with actual bkill
Expand Down
3 changes: 2 additions & 1 deletion tests/integration_tests/scheduler/mock_lsf_bin/bsub
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
#!/usr/bin/env bash
exec "${PYTHON:-$(which python3)}" "$(dirname $0)/bsub.py" "$@"
exec "${PYTHON:-$(which python3)}" "$(dirname $0)/bsub.py" "$@" &
disown -a
4 changes: 2 additions & 2 deletions tests/integration_tests/scheduler/mock_lsf_bin/bsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def get_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Submit jobs to Slurm via LSF syntax")

parser.add_argument("-J", "--job_name", type=str)
parser.add_argument("shellcode", type=str)
parser.add_argument("shellcode", type=str, nargs="+")
return parser


Expand All @@ -22,7 +22,7 @@ def main() -> None:
) # todo: ensure not in use to avoid flakiness
jobdir.mkdir(exist_ok=True, parents=True)

(jobdir / f"{jobid}.script").write_text(args.shellcode)
(jobdir / f"{jobid}.script").write_text(" ".join(args.shellcode))
(jobdir / f"{jobid}.name").write_text(args.job_name or "no_name")
print(f"Job <{jobid}> is submitted to default queue <normal>.")

Expand Down
7 changes: 5 additions & 2 deletions tests/integration_tests/scheduler/mock_lsf_bin/runner
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ function handle_sigterm {
# Torque uses (256 + SIGNAL) as the returncode
# TODO: change this to whatever return code LSF actually uses.
# SIGTERM=15
echo "Caught sigterm"
echo "271" > "${job}.returncode"
kill $child_pid
exit 0
}

trap handle_sigterm SIGTERM

echo "$$" > "${job}.pid"
sh "${job}.script" > "${job}.stdout" 2> "${job}.stderr"
sh "${job}.script" > "${job}.stdout" 2> "${job}.stderr" &
child_pid=$!
wait

echo "runner finished sh code"
echo "$?" > "${job}.returncode"
4 changes: 3 additions & 1 deletion tests/integration_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ def mock_lsf(pytestconfig, monkeypatch, tmp_path):
async def test_submit_poll_and_kill():
driver = LsfDriver()
iens: int = 0
await driver.submit(iens, "sleep 100", cwd=".", job_name="_")
print("sutmibbint")
await driver.submit(iens, "sleep", "100", cwd=".", job_name="_")
print("submit done awaited")
assert iens in driver._iens2jobid
jobid = driver._iens2jobid[iens]
assert driver._jobs[jobid] == (iens, "PEND")
Expand Down

0 comments on commit efcd73c

Please sign in to comment.