Skip to content

Commit

Permalink
Merge pull request #80 from getindata/release-0.4.5
Browse files Browse the repository at this point in the history
Release 0.4.5
  • Loading branch information
Mariusz Strzelecki authored Dec 22, 2021
2 parents f1694ad + a3879e9 commit cfbc9cf
Show file tree
Hide file tree
Showing 14 changed files with 379 additions and 66 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

## [0.4.5] - 2021-12-22

- Add `node_merge_strategy` alongside with `full` option to run a whole pipeline in one pod

## [0.4.4] - 2021-09-29

- Custom networking setup for Vertex AI pipelines run
Expand Down Expand Up @@ -88,7 +92,9 @@
- Method to schedule runs for most recent version of given pipeline `kedro kubeflow schedule`
- Shortcut to open UI for pipelines using `kedro kubeflow ui`

[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.4.4...HEAD
[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.4.5...HEAD

[0.4.5]: https://github.com/getindata/kedro-kubeflow/compare/0.4.4...0.4.5

[0.4.4]: https://github.com/getindata/kedro-kubeflow/compare/0.4.3...0.4.4

Expand Down
2 changes: 1 addition & 1 deletion kedro_kubeflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""kedro_kubeflow."""

version = "0.4.4"
version = "0.4.5"
24 changes: 24 additions & 0 deletions kedro_kubeflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@
# intermediate results in the MLMD
#store_kedro_outputs_as_kfp_artifacts: True
# Strategy used to generate Kubeflow pipeline nodes from Kedro nodes
# Available strategies:
# * none (default) - nodes in Kedro pipeline are mapped to separate nodes
# in Kubeflow pipelines. This strategy allows to inspect
# a whole processing graph in Kubeflow UI and override
# resources for each node (because they are run in separate pods)
# Although, performance may not be optimal due to potential
# sharing of intermediate datasets through disk.
# * full - nodes in Kedro pipeline are mapped to one node in Kubeflow pipelines.
# This strategy mitigate potential performance issues with `none` strategy
# but at the cost of degraded user experience within Kubeflow UI: a graph
# is collapsed to one node.
#node_merge_strategy: none
# Optional volume specification (only for non vertex-ai)
volume:
Expand Down Expand Up @@ -231,6 +245,16 @@ def vertex_ai_networking(self):
self._get_or_default("vertex_ai_networking", {})
)

@property
def node_merge_strategy(self):
strategy = str(self._get_or_default("node_merge_strategy", "none"))
if strategy not in ["none", "full"]:
raise ValueError(
f"Invalid {self._get_prefix()}node_merge_strategy: {strategy}"
)
else:
return strategy

def _get_prefix(self):
return "run_config."

Expand Down
21 changes: 4 additions & 17 deletions kedro_kubeflow/context_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ class ContextHelper(object):

CONFIG_FILE_PATTERN = "kubeflow*"

def __init__(
self,
metadata,
env,
):
def __init__(self, metadata, env):
self._metadata = metadata
self._env = env

Expand Down Expand Up @@ -59,21 +55,12 @@ def kfp_client(self):
)

@staticmethod
def init(
metadata,
env,
):
def init(metadata, env):
version = VersionInfo.parse(kedro_version)
if version.match(">=0.17.0"):
return ContextHelper(
metadata,
env,
)
return ContextHelper(metadata, env)
else:
return ContextHelper16(
metadata,
env,
)
return ContextHelper16(metadata, env)


class ContextHelper16(ContextHelper):
Expand Down
1 change: 1 addition & 0 deletions kedro_kubeflow/generators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""kedro_kubeflow.generators"""
78 changes: 78 additions & 0 deletions kedro_kubeflow/generators/one_pod_pipeline_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import logging
import os

import kubernetes.client as k8s
from kfp import dsl

from ..auth import IAP_CLIENT_ID
from ..utils import clean_name
from .utils import create_params, maybe_add_params


class OnePodPipelineGenerator(object):
log = logging.getLogger(__name__)

def __init__(self, config, project_name, context):
self.project_name = project_name
self.context = context
dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = True
self.run_config = config.run_config
self.catalog = context.config_loader.get("catalog*")

def generate_pipeline(self, pipeline, image, image_pull_policy):
@dsl.pipeline(self.project_name, self.run_config.description)
@maybe_add_params(self.context.params)
def convert_kedro_pipeline_to_kfp() -> None:
dsl.get_pipeline_conf().set_ttl_seconds_after_finished(
self.run_config.ttl
)
self._build_kfp_op(pipeline, image, image_pull_policy)

return convert_kedro_pipeline_to_kfp

def _build_kfp_op(
self,
pipeline,
image,
image_pull_policy,
) -> dsl.ContainerOp:
kwargs = {
"env": [
k8s.V1EnvVar(
name=IAP_CLIENT_ID, value=os.environ.get(IAP_CLIENT_ID, "")
)
],
"image_pull_policy": image_pull_policy,
}
default_resources = self.run_config.resources.get_for("__default__")
if default_resources:
kwargs["resources"] = k8s.V1ResourceRequirements(
limits=default_resources, requests=default_resources
)
container_op = dsl.ContainerOp(
name=clean_name(pipeline),
image=image,
command=["kedro"],
arguments=[
"run",
"--env",
self.context.env,
"--params",
create_params(self.context.params.keys()),
"--pipeline",
pipeline,
],
container_kwargs=kwargs,
file_outputs={
output: f"/home/kedro/{self.catalog[output]['filepath']}"
for output in self.catalog
if "filepath" in self.catalog[output]
and self.run_config.store_kedro_outputs_as_kfp_artifacts
},
)

container_op.execution_options.caching_strategy.max_cache_staleness = (
self.run_config.max_cache_staleness
)

return container_op
Original file line number Diff line number Diff line change
@@ -1,38 +1,19 @@
import contextlib
import logging
import os
from functools import wraps
from inspect import Parameter, signature
from typing import Dict, Set

import kubernetes.client as k8s
from kedro.pipeline.node import Node
from kfp import dsl
from kfp.compiler._k8s_helper import sanitize_k8s_name

from .auth import IAP_CLIENT_ID
from .utils import clean_name, is_mlflow_enabled
from ..auth import IAP_CLIENT_ID
from ..utils import clean_name, is_mlflow_enabled
from .utils import create_params, maybe_add_params


def maybe_add_params(kedro_parameters):
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
return f()

sig = signature(f)
new_params = (
Parameter(name, Parameter.KEYWORD_ONLY, default=default)
for name, default in kedro_parameters.items()
)
wrapper.__signature__ = sig.replace(parameters=new_params)
return wrapper

return decorator


class PipelineGenerator(object):

class PodPerNodePipelineGenerator(object):
log = logging.getLogger(__name__)

def __init__(self, config, project_name, context):
Expand Down Expand Up @@ -158,12 +139,6 @@ def _build_kfp_ops(

for node in node_dependencies:
name = clean_name(node.name)
params = ",".join(
[
f"{param}:{dsl.PipelineParam(param)}"
for param in self.context.params.keys()
]
)
kwargs = {"env": nodes_env}
if self.run_config.resources.is_set_for(node.name):
kwargs["resources"] = k8s.V1ResourceRequirements(
Expand All @@ -181,7 +156,7 @@ def _build_kfp_ops(
"--env",
self.context.env,
"--params",
params,
create_params(self.context.params.keys()),
"--pipeline",
pipeline,
"--node",
Expand Down
28 changes: 28 additions & 0 deletions kedro_kubeflow/generators/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from functools import wraps
from inspect import Parameter, signature
from typing import Iterable

from kfp import dsl


def maybe_add_params(kedro_parameters):
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
return f()

sig = signature(f)
new_params = (
Parameter(name, Parameter.KEYWORD_ONLY, default=default)
for name, default in kedro_parameters.items()
)
wrapper.__signature__ = sig.replace(parameters=new_params)
return wrapper

return decorator


def create_params(param_keys: Iterable[str]) -> str:
return ",".join(
[f"{param}:{dsl.PipelineParam(param)}" for param in param_keys]
)
21 changes: 19 additions & 2 deletions kedro_kubeflow/kfpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@
from kfp.compiler import Compiler
from tabulate import tabulate

from kedro_kubeflow.generators.one_pod_pipeline_generator import (
OnePodPipelineGenerator,
)
from kedro_kubeflow.generators.pod_per_node_pipeline_generator import (
PodPerNodePipelineGenerator,
)

from .auth import AuthHandler
from .generator import PipelineGenerator
from .utils import clean_name

WAIT_TIMEOUT = 24 * 60 * 60
Expand All @@ -35,7 +41,18 @@ def __init__(self, config, project_name, context):

self.project_name = project_name
self.pipeline_description = config.run_config.description
self.generator = PipelineGenerator(config, project_name, context)
if config.run_config.node_merge_strategy == "none":
self.generator = PodPerNodePipelineGenerator(
config, project_name, context
)
elif config.run_config.node_merge_strategy == "full":
self.generator = OnePodPipelineGenerator(
config, project_name, context
)
else:
raise Exception(
f"Invalid `node_merge_strategy`: {config.run_config.node_merge_strategy}"
)

def list_pipelines(self):
pipelines = self.client.list_pipelines(page_size=30).pipelines
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.4.4
current_version = 0.4.5

[bumpversion:file:setup.py]

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

setup(
name="kedro-kubeflow",
version="0.4.4",
version="0.4.5",
description="Kedro plugin with Kubeflow support",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
14 changes: 4 additions & 10 deletions tests/test_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,16 @@
from kedro.pipeline import Pipeline, node

from kedro_kubeflow.config import PluginConfig
from kedro_kubeflow.generator import PipelineGenerator
from kedro_kubeflow.generators.pod_per_node_pipeline_generator import (
PodPerNodePipelineGenerator,
)


def identity(input1: str):
return input1 # pragma: no cover


class TestGenerator(unittest.TestCase):
def create_pipeline(self):
return Pipeline(
[
node(identity, "A", "B", name="node1"),
node(identity, "B", "C", name="node2"),
]
)

def test_support_modification_of_pull_policy(self):
# given
self.create_generator()
Expand Down Expand Up @@ -352,7 +346,7 @@ def create_generator(self, config={}, params={}, catalog={}):
},
},
)
self.generator_under_test = PipelineGenerator(
self.generator_under_test = PodPerNodePipelineGenerator(
PluginConfig({"host": "http://unittest", "run_config": config}),
project_name,
context,
Expand Down
Loading

0 comments on commit cfbc9cf

Please sign in to comment.