Skip to content

Commit

Permalink
Implement EXCLUDE_HOST for scheduler lsf driver
Browse files Browse the repository at this point in the history
This commit implements the EXCLUDE_HOST keyword for the scheduler LSF
driver, the same way it was implemented in C.
  • Loading branch information
jonathan-eq committed Apr 9, 2024
1 parent 414bea7 commit 8082875
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/ert/scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def create_driver(config: QueueConfig) -> Driver:
bkill_cmd=queue_config.get("BKILL_CMD"),
bjobs_cmd=queue_config.get("BJOBS_CMD"),
bhist_cmd=queue_config.get("BHIST_CMD"),
exclude_hosts=queue_config.get("EXCLUDE_HOST"),
queue_name=queue_config.get("LSF_QUEUE"),
resource_requirement=queue_config.get("LSF_RESOURCE"),
)
Expand Down
53 changes: 48 additions & 5 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Mapping,
MutableMapping,
Optional,
Sequence,
Tuple,
Union,
get_args,
Expand Down Expand Up @@ -102,6 +103,39 @@ def parse_bjobs(bjobs_output: str) -> Dict[str, Dict[str, Dict[str, str]]]:
return {"jobs": data}


def build_resource_requirement_string(
    exclude_hosts: Sequence[str], resource_requirement: Optional[str]
) -> str:
    """Merge host exclusions into an LSF resource requirement string.

    Each non-empty name in ``exclude_hosts`` becomes a ``hname!='<name>'``
    condition; the conditions are joined with ``&&``.

    Args:
        exclude_hosts: Host names to exclude. ``None``, an empty sequence and
            blank entries all contribute no conditions.
        resource_requirement: An existing resource requirement string such as
            ``"rusage[mem=50]"`` or ``"select[cs] span[hosts=1]"``, or
            ``None``/empty when there is none.

    Returns:
        * ``""`` when there is neither a requirement nor any exclusion.
        * ``resource_requirement`` unchanged when there are no exclusions.
        * ``"select[<exclusions>]"`` when there is no requirement.
        * The requirement with the exclusions and-ed into its first
          ``select[...]`` section when it has one.
        * The requirement followed by a new ``select[<exclusions>]`` section
          otherwise.

    Raises:
        ValueError: If the requirement contains ``select[`` without a
            matching closing ``]``.
    """
    # Blank host names (e.g. from an empty configuration value) are dropped.
    exclusions = " && ".join(
        f"hname!='{host_name}'" for host_name in (exclude_hosts or ()) if host_name
    )

    if not resource_requirement:
        return f"select[{exclusions}]" if exclusions else ""
    if not exclusions:
        return resource_requirement

    select_marker = "select["
    select_start = resource_requirement.find(select_marker)
    if select_start == -1:
        # No select section to extend: add one after the existing sections.
        return f"{resource_requirement} select[{exclusions}]"

    # Locate the bracket that closes *this* select section. Searching for the
    # last "]" in the whole string would hit a later section such as
    # "rusage[...]" and put the exclusions in the wrong place.
    body_start = select_start + len(select_marker)
    depth = 1
    close_index = -1
    for index, char in enumerate(
        resource_requirement[body_start:], start=body_start
    ):
        if char == "[":
            depth += 1
        elif char == "]":
            depth -= 1
            if depth == 0:
                close_index = index
                break
    if close_index == -1:
        raise ValueError(
            f"Unterminated select section in resource requirement: "
            f"{resource_requirement!r}"
        )

    # Only and-combine when the select section already holds a condition, so
    # that "select[]" does not end up with a dangling "&&".
    existing_conditions = resource_requirement[body_start:close_index].strip()
    separator = " && " if existing_conditions else ""
    return (
        resource_requirement[:close_index]
        + separator
        + exclusions
        + resource_requirement[close_index:]
    )


def parse_bhist(bhist_output: str) -> Dict[str, Dict[str, int]]:
data: Dict[str, Dict[str, int]] = {}
for line in bhist_output.splitlines():
Expand Down Expand Up @@ -137,6 +171,7 @@ def __init__(
self,
queue_name: Optional[str] = None,
resource_requirement: Optional[str] = None,
exclude_hosts: Optional[str] = None,
bsub_cmd: Optional[str] = None,
bjobs_cmd: Optional[str] = None,
bkill_cmd: Optional[str] = None,
Expand All @@ -145,7 +180,10 @@ def __init__(
super().__init__()

self._queue_name = queue_name
self._resource_requirement = resource_requirement or ""
self._resource_requirement = resource_requirement
self._exclude_hosts = [
host.strip() for host in (exclude_hosts.split(",") if exclude_hosts else [])
]

self._bsub_cmd = Path(bsub_cmd or shutil.which("bsub") or "bsub")
self._bjobs_cmd = Path(bjobs_cmd or shutil.which("bjobs") or "bjobs")
Expand Down Expand Up @@ -197,14 +235,11 @@ async def submit(
script_path = Path(script_handle.name)
assert script_path is not None
script_path.chmod(script_path.stat().st_mode | stat.S_IEXEC)
arg_resource_requirement = (
["-R", self._resource_requirement] if self._resource_requirement else []
)

bsub_with_args: List[str] = (
[str(self._bsub_cmd)]
+ arg_queue_name
+ arg_resource_requirement
+ self._build_resource_requirement_arg()
+ ["-J", name, str(script_path), str(runpath)]
)
logger.debug(f"Submitting to LSF with command {shlex.join(bsub_with_args)}")
Expand Down Expand Up @@ -400,5 +435,13 @@ async def _poll_once_by_bhist(self, missing_job_ids: Iterable[str]) -> _Stat:
self._bhist_cache_timestamp = time.time()
return _Stat(**{"jobs": jobs})

def _build_resource_requirement_arg(self) -> List[str]:
    """Return the ``-R`` argument pair for the submit command.

    The string is built by ``build_resource_requirement_string`` from this
    driver's excluded hosts and resource requirement. An empty list is
    returned when that string is empty, so no ``-R`` option is emitted.
    """
    requirement = build_resource_requirement_string(
        self._exclude_hosts, self._resource_requirement
    )
    if not requirement:
        return []
    return ["-R", requirement]

async def finish(self) -> None:
    """No-op: performs no work and returns ``None``."""
    return None
61 changes: 60 additions & 1 deletion tests/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from contextlib import ExitStack as does_not_raise
from pathlib import Path
from textwrap import dedent
from typing import Collection, List, get_args
from typing import Collection, List, Optional, get_args

import pytest
from hypothesis import given
Expand All @@ -24,6 +24,7 @@
RunningJob,
StartedEvent,
_Stat,
build_resource_requirement_string,
parse_bhist,
parse_bjobs,
)
Expand Down Expand Up @@ -139,6 +140,7 @@ async def test_submit_with_resource_requirement():
assert "-R select[cs && x86_64Linux]" in Path("captured_bsub_args").read_text(
encoding="utf-8"
)
assert "hname" not in Path("captured_bsub_args").read_text(encoding="utf-8")


@pytest.mark.parametrize(
Expand Down Expand Up @@ -518,6 +520,63 @@ async def test_that_bsub_will_retry_and_succeed(
await driver.submit(0, "sleep 10")


@pytest.mark.parametrize(
    "resource_requirement, exclude_hosts, expected_string",
    [
        pytest.param(None, None, "", id="None input"),
        pytest.param(
            "rusage[mem=50]",
            [],
            "rusage[mem=50]",
            id="resource_requirement_without_select_and_no_excluded_hosts",
        ),
        pytest.param(
            None,
            ["linrgs12-foo", "linrgs13-bar"],
            "select[hname!='linrgs12-foo' && hname!='linrgs13-bar']",
            id="None_resource_string_with_excluded_hosts",
        ),
        pytest.param(
            "rusage[mem=50]",
            ["linrgs12-foo"],
            "rusage[mem=50] select[hname!='linrgs12-foo']",
            id="resource_requirement_and_excluded_hosts",
        ),
        pytest.param(
            "select[location=='cloud']",
            ["linrgs12-foo", "linrgs13-bar"],
            "select[location=='cloud' && hname!='linrgs12-foo' && hname!='linrgs13-bar']",
            id="multiple_selects",
        ),
        pytest.param(
            None,
            [""],
            "",
            id="None_resource_requirement_with_empty_string_in_excluded_hosts",
        ),
        pytest.param(
            "rusage[mem=50]",
            [""],
            "rusage[mem=50]",
            id="resource_requirement_and_empty_string_in_excluded_hosts",
        ),
        pytest.param(
            "select[location=='cloud']",
            [""],
            "select[location=='cloud']",
            id="select_in_resource_requirement_and_empty_string_in_excluded_hosts",
        ),
    ],
)
def test_build_resource_requirement_string(
    resource_requirement: Optional[str], exclude_hosts: List[str], expected_string: str
):
    """Check the combined requirement string for each parametrized case."""
    # Note the argument order: excluded hosts first, then the requirement.
    actual_string = build_resource_requirement_string(
        exclude_hosts, resource_requirement
    )
    assert actual_string == expected_string


@pytest.mark.parametrize(
"bhist_output, expected",
[
Expand Down
3 changes: 3 additions & 0 deletions tests/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ def test_scheduler_create_lsf_driver():
bjobs_cmd = "bar_bjobs_cmd"
bhist_cmd = "com_bjobs_cmd"
lsf_resource = "select[cs && x86_64Linux]"
exclude_host = "host1,host2"
queue_config_dict = {
"QUEUE_SYSTEM": "LSF",
"QUEUE_OPTION": [
Expand All @@ -572,6 +573,7 @@ def test_scheduler_create_lsf_driver():
("LSF", "BHIST_CMD", bhist_cmd),
("LSF", "LSF_QUEUE", queue_name),
("LSF", "LSF_RESOURCE", lsf_resource),
("LSF", "EXCLUDE_HOST", exclude_host),
],
}
queue_config = QueueConfig.from_dict(queue_config_dict)
Expand All @@ -582,6 +584,7 @@ def test_scheduler_create_lsf_driver():
assert str(driver._bhist_cmd) == bhist_cmd
assert driver._queue_name == queue_name
assert driver._resource_requirement == lsf_resource
assert driver._exclude_hosts == ["host1", "host2"]


def test_scheduler_create_openpbs_driver():
Expand Down

0 comments on commit 8082875

Please sign in to comment.