Skip to content

Commit

Permalink
Merge pull request #76 from getindata/release-0.4.4
Browse files Browse the repository at this point in the history
Release 0.4.4
  • Loading branch information
Mariusz Strzelecki authored Sep 29, 2021
2 parents 0fb943a + c3e66fe commit f1694ad
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 20 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

## [0.4.4] - 2021-09-29

- Custom networking setup for Vertex AI pipelines run

## [0.4.3] - 2021-09-27

- Kedro environment used by `kedro kubeflow` invocation is passed to the steps
Expand Down Expand Up @@ -84,7 +88,9 @@
- Method to schedule runs for most recent version of given pipeline `kedro kubeflow schedule`
- Shortcut to open UI for pipelines using `kedro kubeflow ui`

[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.4.3...HEAD
[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.4.4...HEAD

[0.4.4]: https://github.com/getindata/kedro-kubeflow/compare/0.4.3...0.4.4

[0.4.3]: https://github.com/getindata/kedro-kubeflow/compare/0.4.2...0.4.3

Expand Down
13 changes: 13 additions & 0 deletions docs/source/03_getting_started/02_gcp.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ run_config:
root: vertex-ai-pipelines-accessible-gcs-bucket/pipelines-specific-path
```
If the pipeline requires access to services that are not exposed to public internet, you need to configure [VPC peering between Vertex internal network and VPC that hosts the internal service](https://cloud.google.com/vertex-ai/docs/general/vpc-peering) and then set the VPC identifier in the configuration. Optionally, you can add custom host aliases:
```yaml
run_config:
vertex_ai_networking:
vpc: projects/12345/global/networks/name-of-vpc
host_aliases:
- ip: 10.10.10.10
hostnames: ['mlflow.internal']
- ip: 10.10.20.20
hostnames: ['featurestore.internal']
```
##### 2. Preparing environment variables
There're the following specific environment variables required for the pipeline to run correctly:
Expand Down
2 changes: 1 addition & 1 deletion kedro_kubeflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""kedro_kubeflow."""

version = "0.4.3"
version = "0.4.4"
17 changes: 17 additions & 0 deletions kedro_kubeflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ def __eq__(self, other):
return self._raw == other._raw


class VertexAiNetworkingConfig(Config):
@property
def vpc(self):
return self._get_or_default("vpc", None)

@property
def host_aliases(self):
aliases = self._get_or_default("host_aliases", [])
return {alias["ip"]: alias["hostnames"] for alias in aliases}


class VolumeConfig(Config):
@property
def storageclass(self):
Expand Down Expand Up @@ -214,6 +225,12 @@ def max_cache_staleness(self):
def ttl(self):
return int(self._get_or_default("ttl", 3600 * 24 * 7))

@property
def vertex_ai_networking(self):
return VertexAiNetworkingConfig(
self._get_or_default("vertex_ai_networking", {})
)

def _get_prefix(self):
return "run_config."

Expand Down
1 change: 1 addition & 0 deletions kedro_kubeflow/vertex_ai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def run_once(
pipeline_root=f"gs://{self.run_config.root}",
parameter_values={},
enable_caching=False,
network=self.run_config.vertex_ai_networking.vpc,
)
self.log.info("Run created %s", str(run))
return run
Expand Down
24 changes: 11 additions & 13 deletions kedro_kubeflow/vertex_ai/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ def convert_kedro_pipeline_to_kfp() -> None:

return convert_kedro_pipeline_to_kfp

def _generate_hosts_file(self):
host_aliases = self.run_config.vertex_ai_networking.host_aliases
return " ".join(
f"echo {ip}\t{' '.join(hostnames)} >> /etc/hosts;"
for ip, hostnames in host_aliases.items()
)

def _create_data_volume_init_op(
self, kfp_ops: Dict[str, dsl.ContainerOp], image: str
):
Expand All @@ -97,6 +104,7 @@ def _create_data_volume_init_op(
def _create_mlflow_op(self, image, tracking_token) -> dsl.ContainerOp:
mlflow_command = " ".join(
[
self._generate_hosts_file(),
"mkdir --parents "
"`dirname {{$.outputs.parameters['output'].output_file}}`",
"&&",
Expand Down Expand Up @@ -182,6 +190,7 @@ def _build_kfp_ops(
)
node_command = " ".join(
[
self._generate_hosts_file(),
"rm -r /home/kedro/data"
"&&"
f"ln -s /gcs/{self._get_data_path()} /home/kedro/data"
Expand Down Expand Up @@ -244,22 +253,11 @@ def _get_data_path(self):
f"{self.run_config.experiment_name}/{self.run_config.run_name}/data"
)

def _get_mlruns_path(self):
return (
f"{self.run_config.root}/"
f"{self.run_config.experiment_name}/{self.run_config.run_name}/mlruns"
)

def _setup_volume_op(self, image):
command = " ".join(
[
f"mkdir --parents /gcs/{self._get_data_path()}",
"&&",
"cp",
"--verbose",
"-r",
"/home/kedro/data/*",
f"/gcs/{self._get_data_path()}",
f"mkdir --parents /gcs/{self._get_data_path()} &&",
f"cp -r /home/kedro/data/* /gcs/{self._get_data_path()}",
]
)
spec = ComponentSpec(
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.4.3
current_version = 0.4.4

[bumpversion:file:setup.py]

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

setup(
name="kedro-kubeflow",
version="0.4.3",
version="0.4.4",
description="Kedro plugin with Kubeflow support",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
29 changes: 27 additions & 2 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,22 @@
keep: True
"""

VERTEX_YAML = """
host: vertex-ai-pipelines
project_id: some-project
region: some-region
run_config:
vertex_ai_networking:
vpc: projects/some-project-id/global/networks/some-vpc-name
host_aliases:
- ip: 10.10.10.10
hostnames: ['mlflow.internal']
"""


class TestPluginConfig(unittest.TestCase):
def test_plugin_config(self):

cfg = PluginConfig(yaml.safe_load(CONFIG_YAML))

assert cfg.host == "https://example.com"
assert cfg.run_config.image == "gcr.io/project-image/test"
assert cfg.run_config.image_pull_policy == "Always"
Expand Down Expand Up @@ -98,3 +108,18 @@ def test_resources_default_and_node_specific(self):
def test_do_not_keep_volume_by_default(self):
cfg = PluginConfig({"run_config": {"volume": {}}})
assert cfg.run_config.volume.keep is False

def test_parse_vertex_ai_networking_config(self):
cfg = PluginConfig(yaml.safe_load(VERTEX_YAML))
assert (
cfg.run_config.vertex_ai_networking.vpc
== "projects/some-project-id/global/networks/some-vpc-name"
)
assert cfg.run_config.vertex_ai_networking.host_aliases == {
"10.10.10.10": ["mlflow.internal"]
}

def test_accept_default_vertex_ai_networking_config(self):
cfg = PluginConfig({"run_config": {}})
assert cfg.run_config.vertex_ai_networking.vpc is None
assert cfg.run_config.vertex_ai_networking.host_aliases == {}
8 changes: 7 additions & 1 deletion tests/test_vertex_ai_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ def create_client(self, cloud_scheduler_client_mock):
{
"project_id": "PROJECT_ID",
"region": "REGION",
"run_config": {"image": "IMAGE", "root": "BUCKET/PREFIX"},
"run_config": {
"image": "IMAGE",
"root": "BUCKET/PREFIX",
"vertex_ai_networking": {"vpc": "my-vpc"},
},
}
)
return VertexAIPipelinesClient(config, MagicMock(), MagicMock())
Expand Down Expand Up @@ -65,6 +69,8 @@ def test_run_once(self):
)

assert run_mock == run
_, kwargs = ai_client.create_run_from_job_spec.call_args
assert kwargs["network"] == "my-vpc"

def test_should_list_pipelines(self):
with patch(
Expand Down
33 changes: 33 additions & 0 deletions tests/test_vertex_ai_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,39 @@ def test_should_add_env_and_pipeline_in_the_invocations(self):
in dsl_pipeline.ops["node1"].container.args[0]
)

def test_should_add_host_aliases_if_requested(self):
# given
self.create_generator(
config={
"vertex_ai_networking": {
"host_aliases": [
{
"ip": "10.10.10.10",
"hostnames": ["mlflow.internal", "mlflow.cloud"],
}
]
}
}
)
self.mock_mlflow(True)

# when
pipeline = self.generator_under_test.generate_pipeline(
"pipeline", "unittest-image", "Never", "MLFLOW_TRACKING_TOKEN"
)
with kfp.dsl.Pipeline(None) as dsl_pipeline:
pipeline()

# then
hosts_entry_cmd = (
"echo 10.10.10.10\tmlflow.internal mlflow.cloud >> /etc/hosts;"
)
assert (
hosts_entry_cmd
in dsl_pipeline.ops["mlflow-start-run"].container.args[0]
)
assert hosts_entry_cmd in dsl_pipeline.ops["node1"].container.args[0]

def mock_mlflow(self, enabled=False):
def fakeimport(name, *args, **kw):
if not enabled and name == "mlflow":
Expand Down

0 comments on commit f1694ad

Please sign in to comment.