Skip to content

Commit

Permalink
Add _execute_with_retry to pbs driver
Browse files Browse the repository at this point in the history
 - Let qstat fail and continue, while providing a list of exit codes
 for qsub and qdel for which the driver will retry.
 - Instead of a timeout, just use a number of retries after which the
 driver will raise RuntimeError.
  • Loading branch information
xjules committed Feb 26, 2024
1 parent df902d1 commit c6a56b8
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 9 deletions.
87 changes: 78 additions & 9 deletions src/ert/scheduler/openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import asyncio
import logging
import shlex
from asyncio.subprocess import PIPE
from typing import List, Literal, Mapping, MutableMapping, Optional, Tuple, Union

from pydantic import BaseModel, Field
Expand All @@ -18,6 +17,20 @@
JobState = Literal["B", "E", "F", "H", "M", "Q", "R", "S", "T", "U", "W", "X"]
JOBSTATE_INITIAL: JobState = "Q"

# Named exit codes of the `qdel` command.
QDEL_JOB_HAS_FINISHED: int = 35
QDEL_REQUEST_INVALID: int = 168

# Named exit codes of the `qsub` command.
QSUB_CONNECTION_REFUSED: int = 162
QSUB_INVALID_CREDENTIAL: int = 171
QSUB_PREMATURE_END_OF_MESSAGE: int = 183

# Exit codes that the driver passes along as "retry the command" when
# running qsub and qdel respectively; any other non-zero exit code is
# treated as an immediate failure.
QSUB_EXIT_CODES = [
    QSUB_INVALID_CREDENTIAL,
    QSUB_PREMATURE_END_OF_MESSAGE,
    QSUB_CONNECTION_REFUSED,
]
QDEL_EXIT_CODES = [
    QDEL_REQUEST_INVALID,
    QDEL_JOB_HAS_FINISHED,
]


class FinishedJob(BaseModel):
job_state: Literal["F"]
Expand Down Expand Up @@ -56,6 +69,8 @@ def __init__(
self._queue_name = queue_name
self._memory_per_job = memory_per_job
self._job_prefix = job_prefix
self._num_retries = 10
self._retry_interval = 2

self._jobs: MutableMapping[str, Tuple[int, JobState]] = {}
self._iens2jobid: MutableMapping[int, str] = {}
Expand All @@ -66,6 +81,51 @@ def _resource_string(self) -> str:
resource_specifiers += ["mem=" + self._memory_per_job]
return ":".join(resource_specifiers)

async def _execute_with_retry(
    self,
    cmd_with_args: List[str],
    exit_codes_triggering_retries: List[int],
) -> Tuple[bool, str]:
    """Run a command as a subprocess, retrying on selected exit codes.

    The command is attempted at most ``self._num_retries`` times, with a
    pause of ``self._retry_interval`` seconds between consecutive attempts.

    Args:
        cmd_with_args: The executable followed by its arguments.
        exit_codes_triggering_retries: Non-zero exit codes after which the
            command is run again. Any other non-zero exit code makes the
            call fail immediately without further attempts.

    Returns:
        ``(True, stdout)`` with the decoded, stripped standard output when
        the command exits with code 0; otherwise ``(False, error_message)``.
        Failures, including exceptions raised while starting or running
        the command, are logged and reported through the return value.
    """
    error_message: Optional[str] = None

    for attempt in range(self._num_retries):
        if attempt > 0:
            # Wait only *between* attempts: sleeping after the final
            # failure would just delay reporting the error to the caller.
            await asyncio.sleep(self._retry_interval)

        try:
            process = await asyncio.create_subprocess_exec(
                *cmd_with_args,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, stderr = await process.communicate()

            if process.returncode == 0:
                return True, stdout.decode().strip()

            if process.returncode not in exit_codes_triggering_retries:
                # Exit code we do not know how to recover from: give up.
                error_message = (
                    f"Command \"{' '.join(cmd_with_args)}\" failed "
                    f"with exit code {process.returncode} and error message: {stderr.decode().strip()}"
                )
                logger.error(error_message)
                return False, error_message

            # Exit code listed as retryable: remember the reason and loop.
            error_message = stderr.decode().strip()

        except Exception as e:
            error_message = (
                "Unexpected exception occurred when running "
                f"\"{' '.join(cmd_with_args)}\" {str(e)}"
            )
            logger.error(error_message)
            return False, error_message

    # All attempts ended with a retryable exit code.
    error_message = (
        f"Command \"{' '.join(cmd_with_args)}\" failed after {self._num_retries} retries"
        f" with error {error_message}"
    )
    logger.error(error_message)
    return False, error_message

async def submit(
self,
iens: int,
Expand All @@ -75,7 +135,6 @@ async def submit(
name: str = "dummy",
runpath: Optional[str] = None,
) -> None:

arg_queue_name = ["-q", self._queue_name] if self._queue_name else []
resource_string = self._resource_string()
arg_resource_string = ["-l", resource_string] if resource_string else []
Expand All @@ -93,12 +152,14 @@ async def submit(
*args,
]
logger.debug(f"Submitting to PBS with command {shlex.join(qsub_with_args)}")
process = await asyncio.create_subprocess_exec(
*qsub_with_args,
stdout=PIPE,

job_success, job_message = await self._execute_with_retry(
qsub_with_args, exit_codes_triggering_retries=QSUB_EXIT_CODES
)
job_id, _ = await process.communicate()
job_id_ = job_id.decode("utf-8").strip()
if not job_success:
raise RuntimeError(job_message)

job_id_ = job_message
logger.debug(f"Realization {iens} accepted by PBS, got id {job_id_}")
self._jobs[job_id_] = (iens, JOBSTATE_INITIAL)
self._iens2jobid[iens] = job_id_
Expand All @@ -108,8 +169,12 @@ async def kill(self, iens: int) -> None:
job_id = self._iens2jobid[iens]

logger.debug(f"Killing realization {iens} with PBS-id {job_id}")
proc = await asyncio.create_subprocess_exec("qdel", job_id)
await proc.wait()

job_success, job_message = await self._execute_with_retry(
["qdel", job_id], exit_codes_triggering_retries=QDEL_EXIT_CODES
)
if not job_success:
raise RuntimeError(job_message)
except KeyError:
return

Expand All @@ -126,6 +191,10 @@ async def poll(self) -> None:
stdout=asyncio.subprocess.PIPE,
)
stdout, _ = await proc.communicate()
if proc.returncode != 0:
await asyncio.sleep(_POLL_PERIOD)
continue

stat = _Stat.model_validate_json(stdout)

for job_id, job in stat.jobs.items():
Expand Down
71 changes: 71 additions & 0 deletions tests/unit_tests/scheduler/test_openpbs_driver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import stat
from pathlib import Path
from textwrap import dedent
from typing import List

import pytest
Expand Down Expand Up @@ -106,3 +107,73 @@ async def test_job_name_with_prefix():
driver = OpenPBSDriver(job_prefix="pre_")
await driver.submit(0, "sleep", name="sleepy")
assert " -Npre_sleepy " in Path("captured_qsub_args").read_text(encoding="utf-8")


@pytest.mark.parametrize(
    "exit_code, error_msg",
    [
        (171, "Invalid credential"),
        (183, "Premature end of message"),
        (162, "Connection refused"),
        (199, "Not recognized"),
    ],
)
async def test_that_qsub_will_retry_and_fail(
    monkeypatch, tmp_path, exit_code, error_msg
):
    """A qsub that always fails makes submit() raise RuntimeError.

    A fake ``qsub`` that prints ``error_msg`` to stderr and exits with
    ``exit_code`` is placed first on PATH. Exit code 199 is expected to
    fail right away with the exit-code message, the other codes only after
    the configured number of retries.
    """
    os.chdir(tmp_path)
    fake_bin = tmp_path / "bin"
    fake_bin.mkdir()
    monkeypatch.setenv("PATH", f"{fake_bin}:{os.environ['PATH']}")

    fake_qsub = fake_bin / "qsub"
    fake_qsub.write_text(f"#!/bin/sh\necho {error_msg} >&2\nexit {exit_code}")
    fake_qsub.chmod(fake_qsub.stat().st_mode | stat.S_IEXEC)

    driver = OpenPBSDriver()
    # Keep the test fast: few attempts, short pause between them.
    driver._num_retries = 2
    driver._retry_interval = 0.2

    if exit_code == 199:
        expected_message = "failed with exit code 199 and error message: Not recognized"
    else:
        expected_message = f"failed after 2 retries with error {error_msg}"

    with pytest.raises(RuntimeError, match=expected_message):
        await driver.submit(0, "sleep 10")


@pytest.mark.parametrize(
    "exit_code, error_msg",
    [
        (171, "Invalid credential"),
        (183, "Premature end of message"),
        (162, "Connection refused"),
    ],
)
async def test_that_qsub_will_retry_and_succeed(
    monkeypatch, tmp_path, exit_code, error_msg
):
    """submit() succeeds when qsub fails once and then works.

    The fake ``qsub`` uses a marker file: on its first run the file does
    not exist, so the script creates it, prints ``error_msg`` to stderr and
    exits with ``exit_code``; on the next run it prints SUCCESS and exits 0.
    """
    os.chdir(tmp_path)
    bin_path = tmp_path / "bin"
    bin_path.mkdir()
    monkeypatch.setenv("PATH", f"{bin_path}:{os.environ['PATH']}")

    script_body = dedent(
        f"""
        TRY_FILE="{bin_path}/script_try"
        if [ -f "$TRY_FILE" ]; then
        echo "SUCCESS"
        exit 0
        else
        echo "TRIED" > $TRY_FILE
        echo "{error_msg}" >&2
        exit {exit_code}
        fi
        """
    )
    fake_qsub = bin_path / "qsub"
    fake_qsub.write_text("#!/bin/sh" + script_body)
    fake_qsub.chmod(fake_qsub.stat().st_mode | stat.S_IEXEC)

    driver = OpenPBSDriver()
    # Two attempts are enough: the first fails, the second succeeds.
    driver._num_retries = 2
    driver._retry_interval = 0.2
    await driver.submit(0, "sleep 10")

0 comments on commit c6a56b8

Please sign in to comment.