diff --git a/tests/conftest.py b/tests/conftest.py index 7d05edf..7cc3a2c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -23,6 +23,21 @@ def use_test_config_dir(vte, monkeypatch): global_state.flush_global_state() +@pytest.fixture +def use_distributed_config(request, monkeypatch): + config_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "test_parsing_distributed", + request.param, + ) + with monkeypatch.context() as m: + from valohai.distributed import Distributed + + m.setattr(Distributed, "_get_config_path", lambda self: config_path) + yield config_path + global_state.flush_global_state() + + @pytest.fixture def outputs_path(tmpdir, monkeypatch): outputs_path = os.path.join(str(tmpdir), "outputs") diff --git a/tests/test_distributed.py b/tests/test_distributed.py new file mode 100644 index 0000000..e4ca6ae --- /dev/null +++ b/tests/test_distributed.py @@ -0,0 +1,134 @@ +import json + +import pytest + +import valohai +from valohai.internals.distributed_config import Member +from valohai.internals.distributed_config.utils import member_ids_to_rank_mapping + +# all _valid_ test distributed configurations for different use-cases +configs = { + "exposed_ports": "exposed-ports.json", + "is-master": "is-master.json", + "is-not-master": "is-not-master.json", + "network-host": "network-host.json", + "no-public-ips": "no-public-ips.json", +} + + +@pytest.mark.parametrize("use_distributed_config", configs.values(), indirect=True) +def test_parsing_basic_values(use_distributed_config): + assert valohai.distributed.is_distributed_task() + assert valohai.distributed.group_name.startswith("task-") + assert valohai.distributed.member_id in ["0", "1", "2"] + assert valohai.distributed.rank in [0, 1, 2] + assert isinstance(valohai.distributed.required_count, int) + assert isinstance(valohai.distributed.self(), Member) + assert len(valohai.distributed.members()) > 0 + for member in valohai.distributed.members(): + assert isinstance(member, Member) + assert isinstance(member.rank, int) + assert isinstance(member.exposed_ports, dict) + assert all( + isinstance(k, str) and isinstance(v, str) + for k, v in member.exposed_ports.items() + ) + assert all(isinstance(li, str) for li in member.local_ips) + assert all(isinstance(pi, str) for pi in member.public_ips) + + +@pytest.mark.parametrize( + "use_distributed_config", ["I do not exist.json"], indirect=True +) +def test_using_missing_file(use_distributed_config): + assert not valohai.distributed.is_distributed_task() + with pytest.raises(FileNotFoundError): + assert valohai.distributed.group_name + + +@pytest.mark.parametrize("use_distributed_config", ["malformed.json"], indirect=True) +def test_using_malformed_file(use_distributed_config): + assert not valohai.distributed.is_distributed_task() + with pytest.raises(json.decoder.JSONDecodeError): + assert valohai.distributed.group_name + + +@pytest.mark.parametrize("use_distributed_config", configs.values(), indirect=True) +def test_getting_member_by_id(use_distributed_config): + expected_self = valohai.distributed.member(valohai.distributed.self().member_id) + assert expected_self.member_id == valohai.distributed.self().member_id + assert expected_self.identity == valohai.distributed.self().identity + assert expected_self.job_id == valohai.distributed.self().job_id + + +@pytest.mark.parametrize( + "use_distributed_config", [configs["is-master"]], indirect=True +) +def test_unable_to_find_member_by_id(use_distributed_config): + with pytest.raises(Exception) as e: + valohai.distributed.member("1234") + assert "no member" in str(e).lower() + + +@pytest.mark.parametrize( + "use_distributed_config", [configs["is-master"]], indirect=True +) +def test_checking_master_as_master(use_distributed_config): + master = valohai.distributed.master() + assert valohai.distributed.self().member_id == master.member_id + assert valohai.distributed.member_id == master.member_id + assert master.is_master + assert valohai.distributed.self().is_master + + +@pytest.mark.parametrize( + "use_distributed_config", [configs["is-not-master"]], indirect=True +) +def test_checking_master_as_non_master(use_distributed_config): + master = valohai.distributed.master() + assert valohai.distributed.self().member_id != master.member_id + assert valohai.distributed.member_id != master.member_id + assert master.is_master + assert not valohai.distributed.self().is_master + + +@pytest.mark.parametrize("use_distributed_config", configs.values(), indirect=True) +def test_getting_master_primary_local_ip(use_distributed_config): + assert valohai.distributed.master().primary_local_ip + + +@pytest.mark.parametrize( + "use_distributed_config", [configs["network-host"]], indirect=True +) +def test_getting_master_primary_public_ip(use_distributed_config): + assert valohai.distributed.master().primary_public_ip + + +@pytest.mark.parametrize( + "use_distributed_config", [configs["no-public-ips"]], indirect=True +) +def test_failing_to_get_master_primary_public_ip(use_distributed_config): + with pytest.raises(Exception) as e: + assert valohai.distributed.master().primary_public_ip + assert "no public ips" in str(e).lower() + + +@pytest.mark.parametrize( + "ids_and_ranking", + [ + (["0", "1", "2"], {"0": 0, "1": 1, "2": 2}), + (["2", "0", "1"], {"0": 0, "1": 1, "2": 2}), # integers out-of-order is fine + ( + ["30", "100", "2000"], + {"30": 0, "100": 1, "2000": 2}, + ), # integers don't map 1:1 to ranking + ( + ["abc", "ghj", "def"], + {"abc": 0, "def": 1, "ghj": 2}, + ), # member ids are non-integer + (["10", "2", "x"], {"10": 0, "2": 1, "x": 2}), # mixed type will string sort + ], +) +def test_ranking_member_ids(ids_and_ranking): + member_ids, ranking = ids_and_ranking + assert member_ids_to_rank_mapping(member_ids) == ranking diff --git a/tests/test_parsing_distributed/exposed-ports.json b/tests/test_parsing_distributed/exposed-ports.json new file mode 100644 index 0000000..1c5eeca --- /dev/null +++ b/tests/test_parsing_distributed/exposed-ports.json @@ -0,0 +1,81 @@ +{ + "config": { + "group_name": "task-0180f5a9-9ffe-4e09-d5a7-9a0a507019d4", + "member_id": "0", + "required_count": 3 + }, + "members": [ + { + "announce_time": "2022-05-24T10:42:57", + "identity": "happy-yjaqaqlx", + "job_id": "exec-0180f5a9-a002-45a0-f0e6-8e98720eeaad", + "member_id": "0", + "network": { + "exposed_ports": { + "1234": "1234" + }, + "local_ips": [ + "10.0.16.61", + "fe80::4001:aff:fe00:103d" + ], + "public_ips": [ + "34.121.32.110" + ] + } + }, + { + "announce_time": "2022-05-24T10:42:58", + "identity": "happy-kwfncqxe", + "job_id": "exec-0180f5a9-a007-633b-8af3-e11593482653", + "member_id": "2", + "network": { + "exposed_ports": { + "1234": "1234" + }, + "local_ips": [ + "10.0.16.60", + "fe80::4001:aff:fe00:103c" + ], + "public_ips": [ + "34.134.18.149" + ] + } + }, + { + "announce_time": "2022-05-24T10:42:57", + "identity": "happy-tcuaezxm", + "job_id": "exec-0180f5a9-a005-f2ef-693a-3b4c4c115ed8", + "member_id": "1", + "network": { + "exposed_ports": { + "1234": "1234" + }, + "local_ips": [ + "10.0.16.59", + "fe80::4001:aff:fe00:103b" + ], + "public_ips": [ + "35.194.55.255" + ] + } + } + ], + "self": { + "announce_time": "2022-05-24T10:42:57.461646", + "identity": "happy-yjaqaqlx", + "job_id": "exec-0180f5a9-a002-45a0-f0e6-8e98720eeaad", + "member_id": "0", + "network": { + "exposed_ports": { + "1234": "1234" + }, + "local_ips": [ + "10.0.16.61", + "fe80::4001:aff:fe00:103d" + ], + "public_ips": [ + "34.121.32.110" + ] + } + } +} diff --git a/tests/test_parsing_distributed/is-master.json b/tests/test_parsing_distributed/is-master.json new file mode 100644 index 0000000..bf59dbb --- /dev/null +++ b/tests/test_parsing_distributed/is-master.json @@ -0,0 +1,51 @@ +{ + "config": { + "group_name": "task-20f5bfe1-ea93-47a4-b449-805649f0651d", + "member_id": "0", + "required_count": 2 + }, + "members": [ + { + "announce_time": "2022-05-24T10:36:01", + "identity": "happy-tcuaezxm", + "job_id": "exec-5c8c367d-00ce-4866-8b3f-69e66008e980", + "member_id": "0", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.59", + "fe80::4001:aff:fe00:103b" + ], + "public_ips": [] + } + }, + { + "announce_time": "2022-05-24T10:36:05", + "identity": "happy-kwfncqxe", + "job_id": "exec-0e4f7f9e-ee92-4b59-92fa-187fe026d3af", + "member_id": "1", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.60", + "fe80::4001:aff:fe00:103c" + ], + "public_ips": [] + } + } + ], + "self": { + "announce_time": "2022-05-24T10:36:01.696740", + "identity": "happy-tcuaezxm", + "job_id": "exec-5c8c367d-00ce-4866-8b3f-69e66008e980", + "member_id": "0", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.59", + "fe80::4001:aff:fe00:103b" + ], + "public_ips": [] + } + } +} diff --git a/tests/test_parsing_distributed/is-not-master.json b/tests/test_parsing_distributed/is-not-master.json new file mode 100644 index 0000000..6f06539 --- /dev/null +++ b/tests/test_parsing_distributed/is-not-master.json @@ -0,0 +1,73 @@ +{ + "config": { + "group_name": "task-0180f5b0-6825-e65f-86be-a0f92beb15f0", + "member_id": "2", + "required_count": 3 + }, + "members": [ + { + "announce_time": "2022-05-24T10:50:21", + "identity": "chubby-gcp-2-8192-kwfncqxe", + "job_id": "exec-0180f5b0-682b-4c97-c759-c16f117b2547", + "member_id": "0", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.60", + "fe80::4001:aff:fe00:103c" + ], + "public_ips": [ + "34.134.18.149" + ] + } + }, + { + "announce_time": "2022-05-24T10:50:22", + "identity": "chubby-gcp-2-8192-yjaqaqlx", + "job_id": "exec-0180f5b0-682f-496a-d32b-593fdf9d1b3e", + "member_id": "2", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.61", + "fe80::4001:aff:fe00:103d" + ], + "public_ips": [ + "34.121.32.110" + ] + } + }, + { + "announce_time": "2022-05-24T10:50:22", + "identity": "chubby-gcp-2-8192-tcuaezxm", + "job_id": "exec-0180f5b0-682d-2aae-08c5-9d78b4a1b9b3", + "member_id": "1", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.59", + "fe80::4001:aff:fe00:103b" + ], + "public_ips": [ + "35.194.55.255" + ] + } + } + ], + "self": { + "announce_time": "2022-05-24T10:50:22.696496", + "identity": "chubby-gcp-2-8192-yjaqaqlx", + "job_id": "exec-0180f5b0-682f-496a-d32b-593fdf9d1b3e", + "member_id": "2", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.61", + "fe80::4001:aff:fe00:103d" + ], + "public_ips": [ + "34.121.32.110" + ] + } + } +} diff --git a/tests/test_parsing_distributed/malformed.json b/tests/test_parsing_distributed/malformed.json new file mode 100644 index 0000000..d66b4cc --- /dev/null +++ b/tests/test_parsing_distributed/malformed.json @@ -0,0 +1,8 @@ +{ + "config": { + "group_name": "task-0180f5a0-f0c6-6398-1c0b-8e279cbf3d40", + "member_id": "0", + "required_count": 3 + }, + "members": [ + { \ No newline at end of file diff --git a/tests/test_parsing_distributed/network-host.json b/tests/test_parsing_distributed/network-host.json new file mode 100644 index 0000000..34211da --- /dev/null +++ b/tests/test_parsing_distributed/network-host.json @@ -0,0 +1,73 @@ +{ + "config": { + "group_name": "task-0180f5a0-f0c6-6398-1c0b-8e279cbf3d40", + "member_id": "0", + "required_count": 3 + }, + "members": [ + { + "announce_time": "2022-05-24T10:36:01", + "identity": "happy-tcuaezxm", + "job_id": "exec-0180f5a0-f0ca-f018-d82e-bcbcd9a62693", + "member_id": "0", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.59", + "fe80::4001:aff:fe00:103b" + ], + "public_ips": [ + "35.194.55.255" + ] + } + }, + { + "announce_time": "2022-05-24T10:36:06", + "identity": "happy-yjaqaqlx", + "job_id": "exec-0180f5a0-f0ce-06ce-c9c2-ceb5c7960317", + "member_id": "2", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.61", + "fe80::4001:aff:fe00:103d" + ], + "public_ips": [ + "34.121.32.110" + ] + } + }, + { + "announce_time": "2022-05-24T10:36:05", + "identity": "happy-kwfncqxe", + "job_id": "exec-0180f5a0-f0cc-132f-a3f0-04ee9b67d7a2", + "member_id": "1", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.60", + "fe80::4001:aff:fe00:103c" + ], + "public_ips": [ + "34.134.18.149" + ] + } + } + ], + "self": { + "announce_time": "2022-05-24T10:36:01.696740", + "identity": "happy-tcuaezxm", + "job_id": "exec-0180f5a0-f0ca-f018-d82e-bcbcd9a62693", + "member_id": "0", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.59", + "fe80::4001:aff:fe00:103b" + ], + "public_ips": [ + "35.194.55.255" + ] + } + } +} diff --git a/tests/test_parsing_distributed/no-public-ips.json b/tests/test_parsing_distributed/no-public-ips.json new file mode 100644 index 0000000..9d94da1 --- /dev/null +++ b/tests/test_parsing_distributed/no-public-ips.json @@ -0,0 +1,65 @@ +{ + "config": { + "group_name": "task-0b342f7a-d09a-4be8-aa7f-69429f824734", + "member_id": "0", + "required_count": 3 + }, + "members": [ + { + "announce_time": "2022-05-24T10:36:01", + "identity": "happy-tcuaezxm", + "job_id": "exec-0180f5a0-f0ca-f018-d82e-bcbcd9a62693", + "member_id": "0", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.59", + "fe80::4001:aff:fe00:103b" + ], + "public_ips": [] + } + }, + { + "announce_time": "2022-05-24T10:36:06", + "identity": "happy-yjaqaqlx", + "job_id": "exec-0180f5a0-f0ce-06ce-c9c2-ceb5c7960317", + "member_id": "2", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.61", + "fe80::4001:aff:fe00:103d" + ], + "public_ips": [] + } + }, + { + "announce_time": "2022-05-24T10:36:05", + "identity": "happy-kwfncqxe", + "job_id": "exec-0180f5a0-f0cc-132f-a3f0-04ee9b67d7a2", + "member_id": "1", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.60", + "fe80::4001:aff:fe00:103c" + ], + "public_ips": [] + } + } + ], + "self": { + "announce_time": "2022-05-24T10:36:01.696740", + "identity": "happy-tcuaezxm", + "job_id": "exec-0180f5a0-f0ca-f018-d82e-bcbcd9a62693", + "member_id": "0", + "network": { + "exposed_ports": {}, + "local_ips": [ + "10.0.16.59", + "fe80::4001:aff:fe00:103b" + ], + "public_ips": [] + } + } +} diff --git a/valohai/__init__.py b/valohai/__init__.py index a1accb6..750f668 100644 --- a/valohai/__init__.py +++ b/valohai/__init__.py @@ -3,6 +3,7 @@ import papi from valohai.inputs import inputs +from valohai.internals.global_state import distributed from valohai.metadata import logger from valohai.outputs import outputs from valohai.parameters import parameters @@ -10,4 +11,12 @@ Pipeline = papi.Papi -__all__ = ["inputs", "logger", "outputs", "parameters", "prepare", "Pipeline"] +__all__ = [ + "distributed", + "inputs", + "logger", + "outputs", + "parameters", + "prepare", + "Pipeline", +] diff --git a/valohai/distributed.py b/valohai/distributed.py new file mode 100644 index 0000000..48f8d8c --- /dev/null +++ b/valohai/distributed.py @@ -0,0 +1,68 @@ +import json +from typing import List, Optional + +from valohai import paths +from valohai.internals.distributed_config import DistributedConfig, Member +from valohai.internals.distributed_config.consts import MASTER_MEMBER_ID + + +class Distributed: + """ + Distributed toolkit accessed through `valohai.distributed`. + """ + + _config: Optional[DistributedConfig] = None + + def is_distributed_task(self) -> bool: + # not a property to mimic `is_running_in_valohai` + try: + return bool(self.config.group_name) + except FileNotFoundError: + return False + except json.decoder.JSONDecodeError: + return False + + @property + def group_name(self) -> str: + return self.config.group_name + + @property + def member_id(self) -> str: + return self.config.member_id + + @property + def rank(self) -> Optional[int]: + return self.self().rank + + @property + def required_count(self) -> int: + return self.config.required_count + + def members(self) -> List[Member]: + return self.config.members + + def member(self, member_id: str) -> Member: + for member in self.members(): + if member.member_id == member_id: + return member + raise RuntimeError(f"No member with id {member_id}") + + def self(self) -> Member: + return self.member(member_id=self.config.member_id) + + def master(self) -> Member: + return self.member(member_id=MASTER_MEMBER_ID) + + @property + def config(self) -> DistributedConfig: + if not self._config: + with open(self._get_config_path()) as json_file: + json_data = json.load(json_file) + self._config = DistributedConfig.from_json_data(json_data) + return self._config + + def _get_config_path(self) -> str: + return paths.get_distributed_config_path() + + def flush_state(self) -> None: + self._config = None diff --git a/valohai/internals/global_state.py b/valohai/internals/global_state.py index 1597195..ca195f5 100644 --- a/valohai/internals/global_state.py +++ b/valohai/internals/global_state.py @@ -1,5 +1,6 @@ from typing import Any, Dict, Optional +from valohai.distributed import Distributed from valohai.internals.input_info import InputInfo loaded: bool = False @@ -7,12 +8,14 @@ parameters: Dict[str, Any] = {} step_name: Optional[str] = None image_name: Optional[str] = None +distributed = Distributed() def flush_global_state() -> None: - global loaded, inputs, parameters, step_name, image_name + global loaded, inputs, parameters, step_name, image_name, distributed loaded = False inputs = {} parameters = {} step_name = None image_name = None + distributed.flush_state()