Skip to content

Commit

Permalink
Fix slurm exit-code func
Browse files Browse the repository at this point in the history
  • Loading branch information
verveerpj committed Dec 3, 2024
1 parent 309a535 commit a0b0bb8
Showing 1 changed file with 43 additions and 2 deletions.
45 changes: 43 additions & 2 deletions src/ert/scheduler/slurm_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(
include_hosts: str = "",
squeue_cmd: str = "squeue",
scontrol_cmd: str = "scontrol",
sacct_cmd: str = "sacct",
scancel_cmd: str = "scancel",
sbatch_cmd: str = "sbatch",
user: Optional[str] = None,
Expand Down Expand Up @@ -119,6 +120,7 @@ def __init__(
self._squeue = squeue_cmd

self._scontrol = scontrol_cmd
self._sacct_cmd = sacct_cmd
self._scontrol_cache_timestamp = 0.0
self._scontrol_required_cache_age = 30
self._scontrol_cache: dict[str, ScontrolInfo] = {}
Expand Down Expand Up @@ -288,7 +290,7 @@ async def poll(self) -> None:
scontrol_states = {}
for job_id in missing_in_squeue_output:
if (
scontrol_info := await self._poll_once_by_scontrol(job_id)
scontrol_info := await self._poll_once_by_sacct(job_id)
) is not None:
scontrol_states[job_id] = scontrol_info
missing_in_squeue_and_scontrol = missing_in_squeue_output - set(
Expand Down Expand Up @@ -348,7 +350,7 @@ async def _get_exit_code(self, job_id: str) -> int:
retries = 0
while retries < 10 and self._jobs[job_id].exit_code is None:
retries += 1
if (scontrol_info := await self._poll_once_by_scontrol(job_id)) is not None:
if (scontrol_info := await self._poll_once_by_sacct(job_id)) is not None:
self._jobs[job_id].exit_code = scontrol_info.exit_code
else:
await asyncio.sleep(self._poll_period)
Expand Down Expand Up @@ -396,6 +398,45 @@ async def _poll_once_by_scontrol(
self._scontrol_cache_timestamp = time.time()
return info

async def _poll_once_by_sacct(self, missing_job_id: str) -> Optional[ScontrolInfo]:
if (
time.time() - self._scontrol_cache_timestamp
< self._scontrol_required_cache_age
) and missing_job_id in self._scontrol_cache:
return self._scontrol_cache[missing_job_id]

process = await asyncio.create_subprocess_exec(
self._sacct_cmd,
"-X",
"-n",
"-o",
"ExitCode",
"-j",
str(missing_job_id),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode:
logger.error(
f"sacct gave returncode {process.returncode} with "
f"output{stdout.decode(errors='ignore').strip()} "
f"and error {stderr.decode(errors='ignore').strip()}"
)
return None

info = None
try:
info = _parse_scontrol_output(stdout.decode(errors="ignore"))
except Exception as err:
logger.error(
f"Could no parse scontrol stdout {stdout.decode(errors='ignore')}: {err}"
)
return info
self._scontrol_cache[missing_job_id] = info
self._scontrol_cache_timestamp = time.time()
return info

async def finish(self) -> None:
pass

Expand Down

0 comments on commit a0b0bb8

Please sign in to comment.