Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add valohai.distributed with config parsing #88

Merged
merged 9 commits into from
Jun 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ venv.bak/
.spyderproject
.spyproject

# JetBrains IDEs
.idea/

# Rope project settings
.ropeproject

Expand Down
87 changes: 62 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@
Python helper library for the Valohai machine learning platform.

# Install
```python
```bash
pip install valohai-utils
```

# Execution
Run locally
```python
```bash
python mycode.py
```
Run in the cloud
```python
```bash
vh yaml step mycode.py
vh exec run -a mystep
```
Expand All @@ -33,8 +33,8 @@ vh exec run -a mystep

```python
default_parameters = {
'iterations': 100,
'learning_rate': 0.001
'iterations': 100,
'learning_rate': 0.001
}
```

Expand Down Expand Up @@ -62,31 +62,31 @@ for i in range(valohai.parameters('iterations').value):

```python
default_inputs = {
'input_name': 'http://example.com/1.png'
'input_name': 'http://example.com/1.png'
}
```

An input can also be a list of URLs or a folder:

```python
default_inputs = {
'input_name': [
'http://example.com/1.png',
'http://example.com/2.png'
],
'input_folder': [
's3://mybucket/images/*',
'azure://mycontainer/images/*',
'gs://mybucket/images/*'
]
'input_name': [
'http://example.com/1.png',
'http://example.com/2.png'
],
'input_folder': [
's3://mybucket/images/*',
'azure://mycontainer/images/*',
'gs://mybucket/images/*'
]
}
```

Or it can be an archive full of files (uncompressed automagically on-demand):

```python
default_inputs = {
'images': 'http://example.com/myimages.zip'
'images': 'http://example.com/myimages.zip'
}
```

Expand All @@ -100,7 +100,7 @@ import csv
import valohai

default_inputs = {
'myinput': 'https://pokemon-images-example.s3-eu-west-1.amazonaws.com/pokemon.csv'
'myinput': 'https://pokemon-images-example.s3-eu-west-1.amazonaws.com/pokemon.csv'
}

valohai.prepare(step="test", default_inputs=default_inputs)
Expand Down Expand Up @@ -143,10 +143,10 @@ It is important for visualization that logs for single epoch are flushed out as
import valohai

for epoch in range(100):
with valohai.metadata.logger() as logger:
logger.log("epoch", epoch)
logger.log("accuracy", accuracy)
logger.log("loss", loss)
with valohai.metadata.logger() as logger:
logger.log("epoch", epoch)
logger.log("accuracy", accuracy)
logger.log("loss", loss)
```

## Example 2
Expand All @@ -155,10 +155,37 @@ import valohai

logger = valohai.logger()
for epoch in range(100):
logger.log("epoch", epoch)
logger.log("accuracy", accuracy)
logger.log("loss", loss)
logger.flush()
logger.log("epoch", epoch)
logger.log("accuracy", accuracy)
logger.log("loss", loss)
logger.flush()
```

# Distributed Workloads

`valohai.distributed` contains a toolset for running distributed tasks on Valohai.

```python
import valohai

if valohai.distributed.is_distributed_task():

# `master()` reports the same worker on all contexts
master = valohai.distributed.master()
master_url = f'tcp://{master.primary_local_ip}:1234'

# `members()` contains all workers in the distributed task
member_public_ips = ",".join([
m.primary_public_ip
for m
in valohai.distributed.members()
])

# `me()` has full details about the current worker context
details = valohai.distributed.me()

size = valohai.distributed.required_count
rank = valohai.distributed.rank # 0, 1, 2, etc. depending on run context
```

# Full example
Expand Down Expand Up @@ -242,3 +269,13 @@ Will produce this `valohai.yaml` config:
- https://homepages.cae.wisc.edu/~ece533/images/airplane.png
optional: false
```

# Development

If you wish to further develop `valohai-utils`, remember to install development dependencies and write tests for your additions.

```bash
pip install -e . -r requirements-dev.txt
make lint
pytest
```
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ profile = black

[coverage:run]
omit =
*site-packages*
setup.py
*site-packages*
setup.py
15 changes: 15 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,21 @@ def use_test_config_dir(vte, monkeypatch):
global_state.flush_global_state()


@pytest.fixture
def use_distributed_config(request, monkeypatch):
config_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"test_parsing_distributed",
request.param,
)
with monkeypatch.context() as m:
from valohai.distributed import Distributed

m.setattr(Distributed, "_get_config_path", lambda self: config_path)
yield config_path
global_state.flush_global_state()


@pytest.fixture
def outputs_path(tmpdir, monkeypatch):
outputs_path = os.path.join(str(tmpdir), "outputs")
Expand Down
134 changes: 134 additions & 0 deletions tests/test_distributed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import json

import pytest

import valohai
from valohai.internals.distributed_config import Member
from valohai.internals.distributed_config.utils import compute_member_id_ranks

# all _valid_ test distributed configurations for different use-cases
configs = {
"exposed_ports": "exposed-ports.json",
"is-master": "is-master.json",
"is-not-master": "is-not-master.json",
"network-host": "network-host.json",
"no-public-ips": "no-public-ips.json",
}


@pytest.mark.parametrize("use_distributed_config", configs.values(), indirect=True)
def test_parsing_basic_values(use_distributed_config):
assert valohai.distributed.is_distributed_task()
assert valohai.distributed.group_name.startswith("task-")
assert valohai.distributed.member_id in ["0", "1", "2"]
assert valohai.distributed.rank in [0, 1, 2]
assert isinstance(valohai.distributed.required_count, int)
assert isinstance(valohai.distributed.me(), Member)
assert len(valohai.distributed.members()) > 0
for member in valohai.distributed.members():
assert isinstance(member, Member)
assert isinstance(member.rank, int)
assert isinstance(member.exposed_ports, dict)
assert all(
isinstance(k, str) and isinstance(v, str)
for k, v in member.exposed_ports.items()
)
assert all(isinstance(li, str) for li in member.local_ips)
assert all(isinstance(pi, str) for pi in member.public_ips)


@pytest.mark.parametrize(
"use_distributed_config", ["I do not exist.json"], indirect=True
)
def test_using_missing_file(use_distributed_config):
assert not valohai.distributed.is_distributed_task()
with pytest.raises(FileNotFoundError):
assert valohai.distributed.group_name


@pytest.mark.parametrize("use_distributed_config", ["malformed.json"], indirect=True)
def test_using_malformed_file(use_distributed_config):
assert not valohai.distributed.is_distributed_task()
with pytest.raises(json.decoder.JSONDecodeError):
assert valohai.distributed.group_name


@pytest.mark.parametrize("use_distributed_config", configs.values(), indirect=True)
def test_getting_member_by_id(use_distributed_config):
expected_self = valohai.distributed.member(valohai.distributed.me().member_id)
assert expected_self.member_id == valohai.distributed.me().member_id
assert expected_self.identity == valohai.distributed.me().identity
assert expected_self.job_id == valohai.distributed.me().job_id


@pytest.mark.parametrize(
"use_distributed_config", [configs["is-master"]], indirect=True
)
def test_unable_to_find_member_by_id(use_distributed_config):
with pytest.raises(Exception) as e:
valohai.distributed.member("1234")
assert "no member" in str(e).lower()


@pytest.mark.parametrize(
"use_distributed_config", [configs["is-master"]], indirect=True
)
def test_checking_master_as_master(use_distributed_config):
master = valohai.distributed.master()
assert valohai.distributed.me().member_id == master.member_id
assert valohai.distributed.member_id == master.member_id
assert master.is_master
assert valohai.distributed.me().is_master


@pytest.mark.parametrize(
"use_distributed_config", [configs["is-not-master"]], indirect=True
)
def test_checking_master_as_non_master(use_distributed_config):
master = valohai.distributed.master()
assert valohai.distributed.me().member_id != master.member_id
assert valohai.distributed.member_id != master.member_id
assert master.is_master
assert not valohai.distributed.me().is_master


@pytest.mark.parametrize("use_distributed_config", configs.values(), indirect=True)
def test_getting_master_primary_local_ip(use_distributed_config):
assert valohai.distributed.master().primary_local_ip


@pytest.mark.parametrize(
"use_distributed_config", [configs["network-host"]], indirect=True
)
def test_getting_master_primary_public_ip(use_distributed_config):
assert valohai.distributed.master().primary_public_ip


@pytest.mark.parametrize(
"use_distributed_config", [configs["no-public-ips"]], indirect=True
)
def test_failing_to_get_master_primary_public_ip(use_distributed_config):
with pytest.raises(Exception) as e:
assert valohai.distributed.master().primary_public_ip
assert "no public ips" in str(e).lower()


@pytest.mark.parametrize(
"ids_and_ranking",
[
(["0", "1", "2"], {"0": 0, "1": 1, "2": 2}),
(["2", "0", "1"], {"0": 0, "1": 1, "2": 2}), # integers out-of-order is fine
(
["30", "100", "2000"],
{"30": 0, "100": 1, "2000": 2},
), # integers don't map 1:1 to ranking
(
["abc", "ghj", "def"],
{"abc": 0, "def": 1, "ghj": 2},
), # member ids are non-integer
(["10", "2", "x"], {"10": 0, "2": 1, "x": 2}), # mixed type will string sort
],
)
def test_ranking_member_ids(ids_and_ranking):
member_ids, ranking = ids_and_ranking
assert compute_member_id_ranks(member_ids) == ranking
Loading