Skip to content

Commit

Permalink
Add test_job_name for LSF
Browse files Browse the repository at this point in the history
  • Loading branch information
berland committed Jan 22, 2024
1 parent d7c0632 commit 6cd1002
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 9 deletions.
33 changes: 24 additions & 9 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Awaitable,
Callable,
Dict,
List,
Mapping,
MutableMapping,
Optional,
Expand Down Expand Up @@ -100,8 +101,7 @@ async def run_with_retries(
try:
await func()
return
except (asyncio.CancelledError, IndexError) as e:
logger.error(e)
except (asyncio.CancelledError, RuntimeError):
await asyncio.sleep(self._retry_sleep_period)
raise RuntimeError(error_msg)

Expand All @@ -116,18 +116,33 @@ async def submit(
async def _submit_once(
self, iens: int, executable: str, /, *args: str, cwd: str, job_name: str
) -> None:
process = await asyncio.create_subprocess_exec(
self._bsub_cmd,
bsub_with_args: List[str] = [
str(self._bsub_cmd),
"-J",
job_name,
executable,
cwd,
]
process = await asyncio.create_subprocess_exec(
*bsub_with_args,
cwd=cwd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
job_id, _ = await process.communicate()
job_id_ = (
job_id.decode("utf-8").strip().replace("<", "").replace(">", "").split()[1]
)
job_id, stderr = await process.communicate()
try:
job_id_ = (
job_id.decode("utf-8")
.strip()
.replace("<", "")
.replace(">", "")
.split()[1]
)
except IndexError as err:
logger.error(
f"Command \"{' '.join(bsub_with_args)}\" failed with error message: {stderr.decode()}"
)
raise RuntimeError from err

(Path(cwd) / LSF_JSON).write_text(
json.dumps({"job_id": job_id_}), encoding="utf-8"
)
Expand Down
16 changes: 16 additions & 0 deletions tests/integration_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,19 @@ async def test_submit_poll_and_kill():

with contextlib.suppress(asyncio.CancelledError):
await poll_task


@pytest.mark.timeout(5)
@pytest.mark.requires_lsf
async def test_job_name():
driver = LsfDriver()
iens: int = 0
await driver.submit(iens, "sleep 99", cwd=".", job_name="my_job_name")
jobid = driver._iens2jobid[iens]
bjobs_process = await asyncio.create_subprocess_exec(
"bjobs",
jobid,
stdout=asyncio.subprocess.PIPE,
)
stdout, _ = await bjobs_process.communicate()
assert "my_job_name" in stdout.decode()

0 comments on commit 6cd1002

Please sign in to comment.