Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 0.4.4 #76

Merged
merged 3 commits into from
Sep 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

## [0.4.4] - 2021-09-29

- Custom networking setup for Vertex AI pipelines run

## [0.4.3] - 2021-09-27

- Kedro environment used by `kedro kubeflow` invocation is passed to the steps
Expand Down Expand Up @@ -84,7 +88,9 @@
- Method to schedule runs for most recent version of given pipeline `kedro kubeflow schedule`
- Shortcut to open UI for pipelines using `kedro kubeflow ui`

[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.4.3...HEAD
[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.4.4...HEAD

[0.4.4]: https://github.com/getindata/kedro-kubeflow/compare/0.4.3...0.4.4

[0.4.3]: https://github.com/getindata/kedro-kubeflow/compare/0.4.2...0.4.3

Expand Down
13 changes: 13 additions & 0 deletions docs/source/03_getting_started/02_gcp.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ run_config:
root: vertex-ai-pipelines-accessible-gcs-bucket/pipelines-specific-path
```

If the pipeline requires access to services that are not exposed to public internet, you need to configure [VPC peering between Vertex internal network and VPC that hosts the internal service](https://cloud.google.com/vertex-ai/docs/general/vpc-peering) and then set the VPC identifier in the configuration. Optionally, you can add custom host aliases:

```yaml
run_config:
vertex_ai_networking:
vpc: projects/12345/global/networks/name-of-vpc
host_aliases:
- ip: 10.10.10.10
hostnames: ['mlflow.internal']
- ip: 10.10.20.20
hostnames: ['featurestore.internal']
```

##### 2. Preparing environment variables

The following environment variables are required for the pipeline to run correctly:
Expand Down
2 changes: 1 addition & 1 deletion kedro_kubeflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""kedro_kubeflow."""

version = "0.4.3"
version = "0.4.4"
17 changes: 17 additions & 0 deletions kedro_kubeflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ def __eq__(self, other):
return self._raw == other._raw


class VertexAiNetworkingConfig(Config):
    """Networking options for Vertex AI pipeline runs.

    Wraps the ``vertex_ai_networking`` section of ``run_config`` and
    exposes the VPC identifier and host-alias mapping used to reach
    services that are not publicly routable.
    """

    @property
    def vpc(self):
        # Fully-qualified VPC network name, or None when peering is not configured.
        return self._get_or_default("vpc", None)

    @property
    def host_aliases(self):
        # Re-key the raw list of {ip, hostnames} entries as ip -> hostnames.
        # Defaults to an empty mapping when the section is absent.
        mapping = {}
        for entry in self._get_or_default("host_aliases", []):
            mapping[entry["ip"]] = entry["hostnames"]
        return mapping


class VolumeConfig(Config):
@property
def storageclass(self):
Expand Down Expand Up @@ -214,6 +225,12 @@ def max_cache_staleness(self):
def ttl(self):
return int(self._get_or_default("ttl", 3600 * 24 * 7))

@property
def vertex_ai_networking(self):
    """Vertex AI networking settings; an empty config when the section is absent."""
    raw = self._get_or_default("vertex_ai_networking", {})
    return VertexAiNetworkingConfig(raw)

def _get_prefix(self):
return "run_config."

Expand Down
1 change: 1 addition & 0 deletions kedro_kubeflow/vertex_ai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def run_once(
pipeline_root=f"gs://{self.run_config.root}",
parameter_values={},
enable_caching=False,
network=self.run_config.vertex_ai_networking.vpc,
)
self.log.info("Run created %s", str(run))
return run
Expand Down
24 changes: 11 additions & 13 deletions kedro_kubeflow/vertex_ai/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ def convert_kedro_pipeline_to_kfp() -> None:

return convert_kedro_pipeline_to_kfp

def _generate_hosts_file(self):
host_aliases = self.run_config.vertex_ai_networking.host_aliases
return " ".join(
f"echo {ip}\t{' '.join(hostnames)} >> /etc/hosts;"
for ip, hostnames in host_aliases.items()
)

def _create_data_volume_init_op(
self, kfp_ops: Dict[str, dsl.ContainerOp], image: str
):
Expand All @@ -97,6 +104,7 @@ def _create_data_volume_init_op(
def _create_mlflow_op(self, image, tracking_token) -> dsl.ContainerOp:
mlflow_command = " ".join(
[
self._generate_hosts_file(),
"mkdir --parents "
"`dirname {{$.outputs.parameters['output'].output_file}}`",
"&&",
Expand Down Expand Up @@ -182,6 +190,7 @@ def _build_kfp_ops(
)
node_command = " ".join(
[
self._generate_hosts_file(),
"rm -r /home/kedro/data"
"&&"
f"ln -s /gcs/{self._get_data_path()} /home/kedro/data"
Expand Down Expand Up @@ -244,22 +253,11 @@ def _get_data_path(self):
f"{self.run_config.experiment_name}/{self.run_config.run_name}/data"
)

def _get_mlruns_path(self):
return (
f"{self.run_config.root}/"
f"{self.run_config.experiment_name}/{self.run_config.run_name}/mlruns"
)

def _setup_volume_op(self, image):
command = " ".join(
[
f"mkdir --parents /gcs/{self._get_data_path()}",
"&&",
"cp",
"--verbose",
"-r",
"/home/kedro/data/*",
f"/gcs/{self._get_data_path()}",
f"mkdir --parents /gcs/{self._get_data_path()} &&",
f"cp -r /home/kedro/data/* /gcs/{self._get_data_path()}",
]
)
spec = ComponentSpec(
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.4.3
current_version = 0.4.4

[bumpversion:file:setup.py]

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

setup(
name="kedro-kubeflow",
version="0.4.3",
version="0.4.4",
description="Kedro plugin with Kubeflow support",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
29 changes: 27 additions & 2 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,22 @@
keep: True
"""

VERTEX_YAML = """
host: vertex-ai-pipelines
project_id: some-project
region: some-region
run_config:
vertex_ai_networking:
vpc: projects/some-project-id/global/networks/some-vpc-name
host_aliases:
- ip: 10.10.10.10
hostnames: ['mlflow.internal']
"""


class TestPluginConfig(unittest.TestCase):
def test_plugin_config(self):

cfg = PluginConfig(yaml.safe_load(CONFIG_YAML))

assert cfg.host == "https://example.com"
assert cfg.run_config.image == "gcr.io/project-image/test"
assert cfg.run_config.image_pull_policy == "Always"
Expand Down Expand Up @@ -98,3 +108,18 @@ def test_resources_default_and_node_specific(self):
def test_do_not_keep_volume_by_default(self):
cfg = PluginConfig({"run_config": {"volume": {}}})
assert cfg.run_config.volume.keep is False

def test_parse_vertex_ai_networking_config(self):
    # An explicit vertex_ai_networking section should be parsed verbatim:
    # the vpc identifier passes through unchanged and host_aliases is
    # re-keyed from a list of {ip, hostnames} entries into ip -> hostnames.
    cfg = PluginConfig(yaml.safe_load(VERTEX_YAML))
    assert (
        cfg.run_config.vertex_ai_networking.vpc
        == "projects/some-project-id/global/networks/some-vpc-name"
    )
    assert cfg.run_config.vertex_ai_networking.host_aliases == {
        "10.10.10.10": ["mlflow.internal"]
    }

def test_accept_default_vertex_ai_networking_config(self):
    # When run_config has no vertex_ai_networking section at all, the
    # accessors should fall back to safe defaults: no VPC, no aliases.
    cfg = PluginConfig({"run_config": {}})
    assert cfg.run_config.vertex_ai_networking.vpc is None
    assert cfg.run_config.vertex_ai_networking.host_aliases == {}
8 changes: 7 additions & 1 deletion tests/test_vertex_ai_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ def create_client(self, cloud_scheduler_client_mock):
{
"project_id": "PROJECT_ID",
"region": "REGION",
"run_config": {"image": "IMAGE", "root": "BUCKET/PREFIX"},
"run_config": {
"image": "IMAGE",
"root": "BUCKET/PREFIX",
"vertex_ai_networking": {"vpc": "my-vpc"},
},
}
)
return VertexAIPipelinesClient(config, MagicMock(), MagicMock())
Expand Down Expand Up @@ -65,6 +69,8 @@ def test_run_once(self):
)

assert run_mock == run
_, kwargs = ai_client.create_run_from_job_spec.call_args
assert kwargs["network"] == "my-vpc"

def test_should_list_pipelines(self):
with patch(
Expand Down
33 changes: 33 additions & 0 deletions tests/test_vertex_ai_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,39 @@ def test_should_add_env_and_pipeline_in_the_invocations(self):
in dsl_pipeline.ops["node1"].container.args[0]
)

def test_should_add_host_aliases_if_requested(self):
    # given: a generator configured with one host alias mapping an IP to
    # two hostnames, and mlflow support enabled so the mlflow op is emitted
    self.create_generator(
        config={
            "vertex_ai_networking": {
                "host_aliases": [
                    {
                        "ip": "10.10.10.10",
                        "hostnames": ["mlflow.internal", "mlflow.cloud"],
                    }
                ]
            }
        }
    )
    self.mock_mlflow(True)

    # when: the pipeline is generated and instantiated inside a KFP context
    pipeline = self.generator_under_test.generate_pipeline(
        "pipeline", "unittest-image", "Never", "MLFLOW_TRACKING_TOKEN"
    )
    with kfp.dsl.Pipeline(None) as dsl_pipeline:
        pipeline()

    # then: both the mlflow-start-run op and the node op should prepend the
    # /etc/hosts entry (note the literal tab between the IP and hostnames)
    hosts_entry_cmd = (
        "echo 10.10.10.10\tmlflow.internal mlflow.cloud >> /etc/hosts;"
    )
    assert (
        hosts_entry_cmd
        in dsl_pipeline.ops["mlflow-start-run"].container.args[0]
    )
    assert hosts_entry_cmd in dsl_pipeline.ops["node1"].container.args[0]

def mock_mlflow(self, enabled=False):
def fakeimport(name, *args, **kw):
if not enabled and name == "mlflow":
Expand Down