def build_resource_requirement_string(
    exclude_hosts: Optional[Sequence[str]], resource_requirement: Optional[str]
) -> str:
    """Combine an LSF resource requirement with host exclusions.

    Each non-empty host name becomes an ``hname!='<host>'`` term, and the
    terms are joined with ``&&``. The terms are merged into the existing
    ``select[...]`` section when there is one; otherwise a new ``select[...]``
    section is appended to the resource requirement.

    Args:
        exclude_hosts: Host names that must not run the job. ``None``, an
            empty sequence, and empty or whitespace-only names all mean
            "no exclusion".
        resource_requirement: The user supplied resource requirement string,
            or ``None``/empty when there is none.

    Returns:
        The string to hand to ``bsub -R``, or ``""`` when there is neither a
        resource requirement nor any host to exclude.

    Raises:
        ValueError: If the resource requirement contains ``select[`` but no
            closing ``]`` at all.
    """
    host_names = (host.strip() for host in exclude_hosts or ())
    exclusions = " && ".join(f"hname!='{host}'" for host in host_names if host)

    if not resource_requirement:
        return f"select[{exclusions}]" if exclusions else ""
    if not exclusions:
        return resource_requirement

    select_start = resource_requirement.find("select[")
    if select_start == -1:
        return f"{resource_requirement} select[{exclusions}]"

    # Locate the bracket that closes *this* select section. Taking the last
    # ']' of the whole string would put the exclusions into whatever section
    # happens to come last (e.g. "select[..] rusage[..]").
    open_index = select_start + len("select")
    close_index = -1
    depth = 0
    for index, char in enumerate(resource_requirement[open_index:], open_index):
        if char == "[":
            depth += 1
        elif char == "]":
            depth -= 1
            if depth == 0:
                close_index = index
                break
    if close_index == -1:
        # Unbalanced brackets: keep the historical behaviour of using the
        # last ']' (raises ValueError when there is none).
        close_index = resource_requirement.rindex("]")

    tail = resource_requirement[close_index:]
    if not resource_requirement[open_index + 1 : close_index].strip():
        # An empty select[] must not produce a dangling leading "&&".
        return f"{resource_requirement[: open_index + 1]}{exclusions}{tail}"
    return f"{resource_requirement[:close_index]} && {exclusions}{tail}"
def _build_resource_requirement_arg(self) -> List[str]:
    """Return the ``-R`` option for bsub as an argument list.

    The requirement string is built from ``self._exclude_hosts`` and
    ``self._resource_requirement``; an empty list is returned when that
    string is empty, so no ``-R`` option is emitted at all.
    """
    requirement = build_resource_requirement_string(
        self._exclude_hosts, self._resource_requirement
    )
    if not requirement:
        return []
    return ["-R", requirement]
"rusage[mem=50]", + [], + "rusage[mem=50]", + id="resource_requirement_without_select_and_no_excluded_hosts", + ), + pytest.param( + None, + ["linrgs12-foo", "linrgs13-bar"], + "select[hname!='linrgs12-foo' && hname!='linrgs13-bar']", + id="None_resource_string_with_excluded_hosts", + ), + pytest.param( + "rusage[mem=50]", + ["linrgs12-foo"], + "rusage[mem=50] select[hname!='linrgs12-foo']", + id="resource_requirement_and_excluded_hosts", + ), + pytest.param( + "select[location=='cloud']", + ["linrgs12-foo", "linrgs13-bar"], + "select[location=='cloud' && hname!='linrgs12-foo' && hname!='linrgs13-bar']", + id="multiple_selects", + ), + pytest.param( + None, + [""], + "", + id="None_resource_requirement_with_empty_string_in_excluded_hosts", + ), + pytest.param( + "rusage[mem=50]", + [""], + "rusage[mem=50]", + id="resource_requirement_and_empty_string_in_excluded_hosts", + ), + pytest.param( + "select[location=='cloud']", + [""], + "select[location=='cloud']", + id="select_in_resource_requirement_and_empty_string_in_excluded_hosts", + ), + ], +) +def test_build_resource_requirement_string( + resource_requirement: Optional[str], exclude_hosts: List[str], expected_string: str +): + assert ( + build_resource_requirement_string(exclude_hosts, resource_requirement) + == expected_string + ) + + @pytest.mark.parametrize( "bhist_output, expected", [ diff --git a/tests/unit_tests/scheduler/test_scheduler.py b/tests/unit_tests/scheduler/test_scheduler.py index bf2463570ca..eb75d15abd3 100644 --- a/tests/unit_tests/scheduler/test_scheduler.py +++ b/tests/unit_tests/scheduler/test_scheduler.py @@ -563,6 +563,7 @@ def test_scheduler_create_lsf_driver(): bjobs_cmd = "bar_bjobs_cmd" bhist_cmd = "com_bjobs_cmd" lsf_resource = "select[cs && x86_64Linux]" + exclude_host = "host1,host2" queue_config_dict = { "QUEUE_SYSTEM": "LSF", "QUEUE_OPTION": [ @@ -572,6 +573,7 @@ def test_scheduler_create_lsf_driver(): ("LSF", "BHIST_CMD", bhist_cmd), ("LSF", "LSF_QUEUE", queue_name), ("LSF", 
"LSF_RESOURCE", lsf_resource), + ("LSF", "EXCLUDE_HOST", exclude_host), ], } queue_config = QueueConfig.from_dict(queue_config_dict) @@ -582,6 +584,7 @@ def test_scheduler_create_lsf_driver(): assert str(driver._bhist_cmd) == bhist_cmd assert driver._queue_name == queue_name assert driver._resource_requirement == lsf_resource + assert driver._exclude_hosts == ["host1", "host2"] def test_scheduler_create_openpbs_driver():