Skip to content

Commit

Permalink
Merge pull request #82 from getindata/release-0.4.6
Browse files Browse the repository at this point in the history
Release 0.4.6
  • Loading branch information
Mariusz Strzelecki authored Dec 23, 2021
2 parents cfbc9cf + f3c26a3 commit 35b7656
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 20 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

## [0.4.6] - 2021-12-23

- Passing all `KEDRO_CONFIG_` environment variables to the pipeline nodes

## [0.4.5] - 2021-12-22

- Add `node_merge_strategy` alongside with `full` option to run a whole pipeline in one pod
Expand Down Expand Up @@ -92,7 +96,9 @@
- Method to schedule runs for most recent version of given pipeline `kedro kubeflow schedule`
- Shortcut to open UI for pipelines using `kedro kubeflow ui`

[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.4.5...HEAD
[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.4.6...HEAD

[0.4.6]: https://github.com/getindata/kedro-kubeflow/compare/0.4.5...0.4.6

[0.4.5]: https://github.com/getindata/kedro-kubeflow/compare/0.4.4...0.4.5

Expand Down
2 changes: 1 addition & 1 deletion kedro_kubeflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""kedro_kubeflow."""

version = "0.4.5"
version = "0.4.6"
14 changes: 6 additions & 8 deletions kedro_kubeflow/generators/one_pod_pipeline_generator.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import logging
import os

import kubernetes.client as k8s
from kfp import dsl

from ..auth import IAP_CLIENT_ID
from ..utils import clean_name
from .utils import create_params, maybe_add_params
from .utils import (
create_container_environment,
create_params,
maybe_add_params,
)


class OnePodPipelineGenerator(object):
Expand Down Expand Up @@ -37,11 +39,7 @@ def _build_kfp_op(
image_pull_policy,
) -> dsl.ContainerOp:
kwargs = {
"env": [
k8s.V1EnvVar(
name=IAP_CLIENT_ID, value=os.environ.get(IAP_CLIENT_ID, "")
)
],
"env": create_container_environment(),
"image_pull_policy": image_pull_policy,
}
default_resources = self.run_config.resources.get_for("__default__")
Expand Down
15 changes: 7 additions & 8 deletions kedro_kubeflow/generators/pod_per_node_pipeline_generator.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import contextlib
import logging
import os
from typing import Dict, Set

import kubernetes.client as k8s
from kedro.pipeline.node import Node
from kfp import dsl
from kfp.compiler._k8s_helper import sanitize_k8s_name

from ..auth import IAP_CLIENT_ID
from ..utils import clean_name, is_mlflow_enabled
from .utils import create_params, maybe_add_params
from .utils import (
create_container_environment,
create_params,
maybe_add_params,
)


class PodPerNodePipelineGenerator(object):
Expand Down Expand Up @@ -106,10 +108,7 @@ def _build_kfp_ops(
else {}
)

iap_env_var = k8s.V1EnvVar(
name=IAP_CLIENT_ID, value=os.environ.get(IAP_CLIENT_ID, "")
)
nodes_env = [iap_env_var]
nodes_env = create_container_environment()

if is_mlflow_enabled():
kfp_ops["mlflow-start-run"] = self._customize_op(
Expand All @@ -124,7 +123,7 @@ def _build_kfp_ops(
"mlflow-start",
dsl.RUN_ID_PLACEHOLDER,
],
container_kwargs={"env": [iap_env_var]},
container_kwargs={"env": nodes_env},
file_outputs={"mlflow_run_id": "/tmp/mlflow_run_id"},
),
image_pull_policy,
Expand Down
17 changes: 17 additions & 0 deletions kedro_kubeflow/generators/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import os
from functools import wraps
from inspect import Parameter, signature
from typing import Iterable

import kubernetes.client as k8s
from kfp import dsl

from ..auth import IAP_CLIENT_ID


def maybe_add_params(kedro_parameters):
def decorator(f):
Expand All @@ -26,3 +30,16 @@ def create_params(param_keys: Iterable[str]) -> str:
return ",".join(
[f"{param}:{dsl.PipelineParam(param)}" for param in param_keys]
)


def create_container_environment():
env_vars = [
k8s.V1EnvVar(
name=IAP_CLIENT_ID, value=os.environ.get(IAP_CLIENT_ID, "")
)
]
for key in os.environ.keys():
if key.startswith("KEDRO_CONFIG_"):
env_vars.append(k8s.V1EnvVar(name=key, value=os.environ[key]))

return env_vars
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.4.5
current_version = 0.4.6

[bumpversion:file:setup.py]

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

setup(
name="kedro-kubeflow",
version="0.4.5",
version="0.4.6",
description="Kedro plugin with Kubeflow support",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
27 changes: 27 additions & 0 deletions tests/test_generator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Test generator"""

import os
import unittest
from inspect import signature
from unittest.mock import MagicMock
Expand Down Expand Up @@ -325,6 +326,32 @@ def test_should_skip_volume_removal_if_requested(self):
# then
assert "schedule-volume-termination" not in dsl_pipeline.ops

def test_should_pass_kedro_config_env_to_nodes(self):
# given
self.create_generator()
os.environ["KEDRO_CONFIG_MY_KEY"] = "42"
os.environ["SOME_VALUE"] = "100"

try:
# when
with kfp.dsl.Pipeline(None) as dsl_pipeline:
self.generator_under_test.generate_pipeline(
"pipeline", "unittest-image", "Always"
)()

# then
for node_name in ["node1", "node2"]:
env_values = {
e.name: e.value
for e in dsl_pipeline.ops[node_name].container.env
}
assert "KEDRO_CONFIG_MY_KEY" in env_values
assert env_values["KEDRO_CONFIG_MY_KEY"] == "42"
assert "SOME_VALUE" not in env_values
finally:
del os.environ["KEDRO_CONFIG_MY_KEY"]
del os.environ["SOME_VALUE"]

def create_generator(self, config={}, params={}, catalog={}):
project_name = "my-awesome-project"
config_loader = MagicMock()
Expand Down
26 changes: 26 additions & 0 deletions tests/test_one_pod_pipeline_generator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Test generator"""

import os
import unittest
from inspect import signature
from unittest.mock import MagicMock
Expand Down Expand Up @@ -152,6 +153,31 @@ def test_should_skip_artifact_registration_if_requested(self):
# then
assert dsl_pipeline.ops["pipeline"].file_outputs == {}

def test_should_pass_kedro_config_env_to_nodes(self):
# given
self.create_generator(params={"param1": 0.3, "param2": 42})
os.environ["KEDRO_CONFIG_MY_KEY"] = "42"
os.environ["SOME_VALUE"] = "100"

try:
# when
with kfp.dsl.Pipeline(None) as dsl_pipeline:
self.generator_under_test.generate_pipeline(
"pipeline", "unittest-image", "Always"
)()

# then
env_values = {
e.name: e.value
for e in dsl_pipeline.ops["pipeline"].container.env
}
assert "KEDRO_CONFIG_MY_KEY" in env_values
assert env_values["KEDRO_CONFIG_MY_KEY"] == "42"
assert "SOME_VALUE" not in env_values
finally:
del os.environ["KEDRO_CONFIG_MY_KEY"]
del os.environ["SOME_VALUE"]

def create_generator(self, config=None, params=None, catalog=None):
if config is None:
config = {}
Expand Down

0 comments on commit 35b7656

Please sign in to comment.