Skip to content

Commit

Permalink
Merge pull request #110 from getindata/release-0.6.0
Browse files Browse the repository at this point in the history
Release 0.6.0
  • Loading branch information
szczeles authored Feb 18, 2022
2 parents 75f2090 + fbfbe89 commit 7a8090a
Show file tree
Hide file tree
Showing 18 changed files with 276 additions and 221 deletions.
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## [Unreleased]

## [0.6.0] - 2022-02-18

- Kedro pipeline name is now added into Kubeflow pipeline name during upload
- Project hook that injected environment variable values into all the configuration files is dropped, with backward compatibility to keep supporting them in `kubeflow.yaml`
- Added missing on-exit-handler for `node_merge_strategy: full`
- Handle `KEDRO_ENV` environment variable

## [0.5.1] - 2022-01-28

- Possibility to run custom Kedro pipeline as on-exit-handler
Expand Down Expand Up @@ -112,7 +119,9 @@
- Method to schedule runs for most recent version of given pipeline `kedro kubeflow schedule`
- Shortcut to open UI for pipelines using `kedro kubeflow ui`

[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.5.1...HEAD
[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.6.0...HEAD

[0.6.0]: https://github.com/getindata/kedro-kubeflow/compare/0.5.1...0.6.0

[0.5.1]: https://github.com/getindata/kedro-kubeflow/compare/0.5.0...0.5.1

Expand Down
2 changes: 1 addition & 1 deletion kedro_kubeflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""kedro_kubeflow."""

version = "0.5.1"
version = "0.6.0"
9 changes: 7 additions & 2 deletions kedro_kubeflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ def commands():
name="kubeflow", context_settings=dict(help_option_names=["-h", "--help"])
)
@click.option(
"-e", "--env", "env", type=str, default="local", help="Environment to use."
"-e",
"--env",
"env",
type=str,
default=lambda: os.environ.get("KEDRO_ENV", "local"),
help="Environment to use.",
)
@click.pass_obj
@click.pass_context
Expand Down Expand Up @@ -165,7 +170,7 @@ def upload_pipeline(ctx, image, pipeline) -> None:
config = context_helper.config.run_config

context_helper.kfp_client.upload(
pipeline=pipeline,
pipeline_name=pipeline,
image=image if image else config.image,
image_pull_policy=config.image_pull_policy,
)
Expand Down
31 changes: 30 additions & 1 deletion kedro_kubeflow/context_helper.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,39 @@
import os
from functools import lru_cache
from pathlib import Path
from typing import Dict, Iterable

from kedro import __version__ as kedro_version
from kedro.config import TemplatedConfigLoader
from semver import VersionInfo

from .config import PluginConfig


class EnvTemplatedConfigLoader(TemplatedConfigLoader):
    """Config loader that can substitute $(commit_id) and $(branch_name)
    placeholders with information taken from env variables.

    Every environment variable named ``KEDRO_CONFIG_<NAME>`` is exposed to
    the templated configuration as the global ``<name>`` (lower-cased).
    """

    # Environment variables carrying template values must use this prefix.
    VAR_PREFIX = "KEDRO_CONFIG_"
    # defaults provided so default variables ${commit_id|dirty} work for some entries
    ENV_DEFAULTS = {"commit_id": None, "branch_name": None}

    def __init__(self, conf_paths: Iterable[str]):
        super().__init__(conf_paths, globals_dict=self.read_env())

    def read_env(self) -> Dict:
        """Collect template globals from prefixed environment variables.

        Returns ENV_DEFAULTS merged with every ``KEDRO_CONFIG_*`` variable,
        keyed by the lower-cased variable name with the prefix removed.
        """
        prefix_len = len(EnvTemplatedConfigLoader.VAR_PREFIX)
        # Slice off only the leading prefix: str.replace() would also drop
        # any occurrence of "KEDRO_CONFIG_" appearing later in the name
        # (e.g. KEDRO_CONFIG_MY_KEDRO_CONFIG_X would lose both).
        overrides = {
            key[prefix_len:].lower(): value
            for key, value in os.environ.copy().items()
            if key.startswith(EnvTemplatedConfigLoader.VAR_PREFIX)
        }
        config = EnvTemplatedConfigLoader.ENV_DEFAULTS.copy()
        config.update(**overrides)
        return config


class ContextHelper(object):

CONFIG_FILE_PATTERN = "kubeflow*"
Expand All @@ -33,7 +60,9 @@ def context(self):
@property
@lru_cache()
def config(self) -> PluginConfig:
raw = self.context.config_loader.get(self.CONFIG_FILE_PATTERN)
raw = EnvTemplatedConfigLoader(
self.context.config_loader.conf_paths
).get(self.CONFIG_FILE_PATTERN)
return PluginConfig(raw)

@property
Expand Down
10 changes: 9 additions & 1 deletion kedro_kubeflow/generators/one_pod_pipeline_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
create_arguments_from_parameters,
create_command_using_params_dumper,
create_container_environment,
create_pipeline_exit_handler,
maybe_add_params,
)

Expand All @@ -29,7 +30,14 @@ def convert_kedro_pipeline_to_kfp() -> None:
dsl.get_pipeline_conf().set_ttl_seconds_after_finished(
self.run_config.ttl
)
self._build_kfp_op(pipeline, image, image_pull_policy)
with create_pipeline_exit_handler(
pipeline,
image,
image_pull_policy,
self.run_config,
self.context,
):
self._build_kfp_op(pipeline, image, image_pull_policy)

return convert_kedro_pipeline_to_kfp

Expand Down
84 changes: 15 additions & 69 deletions kedro_kubeflow/generators/pod_per_node_pipeline_generator.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import contextlib
import logging
from typing import Dict, Set

import kubernetes.client as k8s
from kedro.pipeline.node import Node
from kfp import dsl
from kfp.compiler._k8s_helper import sanitize_k8s_name

from ..utils import clean_name, is_mlflow_enabled
from .utils import (
create_arguments_from_parameters,
create_command_using_params_dumper,
create_container_environment,
create_pipeline_exit_handler,
customize_op,
maybe_add_params,
)

Expand Down Expand Up @@ -47,8 +47,12 @@ def convert_kedro_pipeline_to_kfp() -> None:
node_dependencies = self.context.pipelines.get(
pipeline
).node_dependencies
with self._create_pipeline_exit_handler(
pipeline, image, image_pull_policy
with create_pipeline_exit_handler(
pipeline,
image,
image_pull_policy,
self.run_config,
self.context,
):
kfp_ops = self._build_kfp_ops(
pipeline, node_dependencies, image, image_pull_policy
Expand All @@ -61,59 +65,6 @@ def convert_kedro_pipeline_to_kfp() -> None:

return convert_kedro_pipeline_to_kfp

def _create_pipeline_exit_handler(
    self, pipeline, image, image_pull_policy
):
    """Return a context manager wrapping pipeline construction.

    When volume cleanup or an on-exit pipeline is configured, this is a
    ``dsl.ExitHandler`` running an "on-exit" container op after the
    workflow finishes; otherwise it is a no-op ``nullcontext``.
    """
    volume = self.run_config.volume
    should_clean_volume = volume is not None and not volume.keep
    on_exit_pipeline = self.run_config.on_exit_pipeline

    # Nothing to run on exit -> hand back a do-nothing context manager.
    if not should_clean_volume and not on_exit_pipeline:
        return contextlib.nullcontext()

    exit_commands = []
    if should_clean_volume:
        volume_suffix = sanitize_k8s_name(f"{pipeline}-data-volume")
        exit_commands.append(
            "kedro kubeflow delete-pipeline-volume "
            "{{workflow.name}}-" + volume_suffix
        )
    if on_exit_pipeline:
        exit_commands.append(
            "kedro run "
            "--config config.yaml "
            f"--env {self.context.env} "
            f"--pipeline {on_exit_pipeline}"
        )

    # Argo substitutes the workflow status/failures placeholders at runtime.
    status_arguments = [
        "status",
        "{{workflow.status}}",
        "failures",
        "{{workflow.failures}}",
    ]
    exit_op = dsl.ContainerOp(
        name="on-exit",
        image=image,
        command=create_command_using_params_dumper(";".join(exit_commands)),
        arguments=create_arguments_from_parameters(self.context.params.keys())
        + status_arguments,
        container_kwargs={"env": create_container_environment()},
    )

    staleness = self.run_config.max_cache_staleness
    if staleness not in [None, ""]:
        exit_op.execution_options.caching_strategy.max_cache_staleness = (
            staleness
        )

    return dsl.ExitHandler(self._customize_op(exit_op, image_pull_policy))

def _build_kfp_ops(
self,
pipeline,
Expand All @@ -135,7 +86,7 @@ def _build_kfp_ops(
nodes_env = create_container_environment()

if is_mlflow_enabled():
kfp_ops["mlflow-start-run"] = self._customize_op(
kfp_ops["mlflow-start-run"] = customize_op(
dsl.ContainerOp(
name="mlflow-start-run",
image=image,
Expand All @@ -147,10 +98,11 @@ def _build_kfp_ops(
"mlflow-start",
dsl.RUN_ID_PLACEHOLDER,
],
container_kwargs={"env": nodes_env},
container_kwargs={"env": nodes_env.copy()},
file_outputs={"mlflow_run_id": "/tmp/mlflow_run_id"},
),
image_pull_policy,
self.run_config,
)

nodes_env.append(
Expand All @@ -169,7 +121,7 @@ def _build_kfp_ops(
requests=self.run_config.resources.get_for(node.name),
)

kfp_ops[node.name] = self._customize_op(
kfp_ops[node.name] = customize_op(
dsl.ContainerOp(
name=name,
image=image,
Expand All @@ -196,18 +148,11 @@ def _build_kfp_ops(
},
),
image_pull_policy,
self.run_config,
)

return kfp_ops

def _customize_op(self, op, image_pull_policy):
    """Apply shared container settings (pull policy, security context) to *op*."""
    op.container.set_image_pull_policy(image_pull_policy)
    volume = self.run_config.volume
    if volume and volume.owner is not None:
        # Run as the configured owner so files written to the shared
        # volume remain accessible to subsequent steps.
        op.container.set_security_context(
            k8s.V1SecurityContext(run_as_user=volume.owner)
        )
    return op

def _setup_volumes(self, volume_name, image, image_pull_policy):
vop = dsl.VolumeOp(
name="data-volume-create",
Expand All @@ -226,7 +171,7 @@ def _setup_volumes(self, volume_name, image, image_pull_policy):
if self.run_config.volume.skip_init:
return {"/home/kedro/data": vop.volume}
else:
volume_init = self._customize_op(
volume_init = customize_op(
dsl.ContainerOp(
name="data-volume-init",
image=image,
Expand All @@ -245,5 +190,6 @@ def _setup_volumes(self, volume_name, image, image_pull_policy):
pvolumes={"/home/kedro/datavolume": vop.volume},
),
image_pull_policy,
self.run_config,
)
return {"/home/kedro/data": volume_init.pvolume}
61 changes: 61 additions & 0 deletions kedro_kubeflow/generators/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import contextlib
import itertools
import os
from functools import wraps
from inspect import Parameter, signature

import kubernetes.client as k8s
from kfp import dsl
from kfp.compiler._k8s_helper import sanitize_k8s_name

from ..auth import IAP_CLIENT_ID

Expand Down Expand Up @@ -58,3 +60,62 @@ def create_arguments_from_parameters(paramter_names):
*[[param, dsl.PipelineParam(param)] for param in paramter_names]
)
)


def create_pipeline_exit_handler(
    pipeline, image, image_pull_policy, run_config, context
):
    """Return a context manager wrapping pipeline construction.

    When volume cleanup or an on-exit pipeline is configured, this is a
    ``dsl.ExitHandler`` running an "on-exit" container op after the
    workflow finishes; otherwise it is a no-op ``nullcontext``.
    """
    volume = run_config.volume
    should_clean_volume = volume is not None and not volume.keep
    on_exit_pipeline = run_config.on_exit_pipeline

    # Nothing to run on exit -> hand back a do-nothing context manager.
    if not should_clean_volume and not on_exit_pipeline:
        return contextlib.nullcontext()

    exit_commands = []
    if should_clean_volume:
        volume_suffix = sanitize_k8s_name(f"{pipeline}-data-volume")
        exit_commands.append(
            "kedro kubeflow delete-pipeline-volume "
            "{{workflow.name}}-" + volume_suffix
        )
    if on_exit_pipeline:
        exit_commands.append(
            "kedro run "
            "--config config.yaml "
            f"--env {context.env} "
            f"--pipeline {on_exit_pipeline}"
        )

    # Argo substitutes the workflow status/failures placeholders at runtime.
    status_arguments = [
        "status",
        "{{workflow.status}}",
        "failures",
        "{{workflow.failures}}",
    ]
    exit_op = dsl.ContainerOp(
        name="on-exit",
        image=image,
        command=create_command_using_params_dumper(";".join(exit_commands)),
        arguments=create_arguments_from_parameters(context.params.keys())
        + status_arguments,
        container_kwargs={"env": create_container_environment()},
    )

    staleness = run_config.max_cache_staleness
    if staleness not in [None, ""]:
        exit_op.execution_options.caching_strategy.max_cache_staleness = (
            staleness
        )

    return dsl.ExitHandler(
        customize_op(exit_op, image_pull_policy, run_config)
    )


def customize_op(op, image_pull_policy, run_config):
    """Apply shared container settings (pull policy, security context) to *op*."""
    op.container.set_image_pull_policy(image_pull_policy)
    volume = run_config.volume
    if volume and volume.owner is not None:
        # Run as the configured owner so files written to the shared
        # volume remain accessible to subsequent steps.
        op.container.set_security_context(
            k8s.V1SecurityContext(run_as_user=volume.owner)
        )
    return op
Loading

0 comments on commit 7a8090a

Please sign in to comment.