Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Squashed pipeline generator #29

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions kedro_vertexai/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

from .config import PluginConfig
from .data_models import PipelineResult, PipelineStatus
from .generator import PipelineGenerator
from .generator import ( # noqa
DefaultPipelineGenerator,
SquashedPipelineGenerator,
)


class VertexAIPipelinesClient:
Expand All @@ -41,7 +44,7 @@ def __init__(self, config: PluginConfig, project_name, context):
)
self.run_config = config.run_config
self.run_name = self._generate_run_name(config)
self.generator = PipelineGenerator(
self.generator = globals()[self.run_config.generator_class](
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't do that...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:smutnazaba:

config, project_name, context, self.run_name
)

Expand Down
20 changes: 20 additions & 0 deletions kedro_vertexai/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from enum import Enum
from typing import Dict, List, Optional

from pydantic import BaseModel
Expand Down Expand Up @@ -50,6 +51,18 @@
# pipeline status. Used to send notifications or raise the alerts
# on_exit_pipeline: notify_via_slack

# Class used for pipeline generation. Possible options:
# DefaultPipelineGenerator -> creates one Vertex AI component for every
# Kedro node (default)
# SquashedPipelineGenerator -> creates one Vertex AI component for the entire
#                              Kedro pipeline, useful for libraries that support
#                              lazy evaluation (e.g. SparkML)
# generator_class: DefaultPipelineGenerator

# Optional environment parameters passed to the kedro invocations
# environment:
# NAME: value

# Optional section allowing adjustment of the resources, reservations and limits
# for the nodes. When not provided they're set to 500m cpu and 1024Mi memory.
# If you don't want to specify pipeline resources set both to None in __default__.
Expand Down Expand Up @@ -87,6 +100,11 @@ class NetworkConfig(BaseModel):
host_aliases: Optional[List[HostAliasConfig]] = []


class GeneratorClassEnum(str, Enum):
    """Allowed values for ``run_config.generator_class``.

    Each value is the name of a generator class from
    ``kedro_vertexai.generator``; the client resolves the configured
    name to the class at run time.
    """

    DEFAULT = "DefaultPipelineGenerator"
    SQUASHED = "SquashedPipelineGenerator"


class RunConfig(BaseModel):
image: str
image_pull_policy: Optional[str] = "IfNotPresent"
Expand All @@ -100,6 +118,8 @@ class RunConfig(BaseModel):
resources: Optional[Dict[str, ResourcesConfig]] = dict(
__default__=ResourcesConfig(cpu="500m", memory="1024Mi")
)
generator_class: GeneratorClassEnum = GeneratorClassEnum.DEFAULT
environment: Dict[str, str] = {}

def resources_for(self, node):
if node in self.resources.keys():
Expand Down
102 changes: 92 additions & 10 deletions kedro_vertexai/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@


class PipelineGenerator:
    """Abstract base for Vertex AI pipeline generators.

    Subclasses decide how a Kedro pipeline is mapped onto Vertex AI
    components and must override :meth:`generate_pipeline`.
    """

    def generate_pipeline(self, pipeline, image, image_pull_policy, token):
        """Build a ``@dsl.pipeline``-annotated function for the given pipeline.

        :param pipeline: kedro pipeline
        :param image: full docker image name
        :param image_pull_policy: docker pull policy
        :param token: mlflow authentication token
        :return: kfp pipeline function
        """
        raise NotImplementedError


class DefaultPipelineGenerator(PipelineGenerator):
"""
Generator creates a Vertex AI pipeline function that operates with the Vertex AI specific
operator spec.
Expand All @@ -48,16 +62,6 @@ def get_pipeline_name(self):
return self.project_name.lower().replace(" ", "-")

def generate_pipeline(self, pipeline, image, image_pull_policy, token):
"""
This method return @dsl.pipeline annotated function that contains
dynamically generated pipelines.
:param pipeline: kedro pipeline
:param image: full docker image name
:param image_pull_policy: docker pull policy
:param token: mlflow authentication token
:return: kfp pipeline function
"""

def set_dependencies(node, dependencies, kfp_ops):
for dependency in dependencies:
name = clean_name(node.name)
Expand Down Expand Up @@ -220,3 +224,81 @@ def _configure_resources(self, name: str, operator):
if "memory" in resources and resources["memory"]:
operator.set_memory_limit(resources["memory"])
operator.set_memory_request(resources["memory"])


class SquashedPipelineGenerator(DefaultPipelineGenerator):
    """Generator that squashes the entire Kedro pipeline into a single
    Vertex AI component (one container running ``kedro run``).

    Useful for libraries that rely on lazy evaluation (e.g. SparkML),
    where splitting the pipeline into one component per node would break
    the deferred computation graph.
    """

    def generate_pipeline(self, pipeline, image, image_pull_policy, token):
        """Return a ``@dsl.pipeline`` function containing one squashed op.

        :param pipeline: kedro pipeline name
        :param image: full docker image name
        :param image_pull_policy: docker pull policy
        :param token: mlflow authentication token
        :return: kfp pipeline function
        """

        @dsl.pipeline(
            name=self.get_pipeline_name(),
            description=self.run_config.description,
        )
        def convert_kedro_pipeline_to_kfp() -> None:
            kfp_op = self._build_kfp_pipeline_op(image, pipeline, token)
            kfp_op.container.set_image_pull_policy(image_pull_policy)

        return convert_kedro_pipeline_to_kfp

    def _build_kfp_pipeline_op(
        self,
        image,
        pipeline,
        tracking_token=None,
    ) -> dsl.ContainerOp:
        """Build the single ContainerOp that runs the whole Kedro pipeline.

        :param image: full docker image name
        :param pipeline: kedro pipeline name
        :param tracking_token: optional mlflow authentication token, wired
            into the op's inputs when mlflow is enabled
        :return: configured ``dsl.ContainerOp``
        """
        inputs = []
        command = []
        # Copy the configured environment: the MLFLOW_* / config-hook
        # entries added below must not mutate the shared run_config dict
        # (the original code aliased it, leaking state between calls).
        env = dict(self.run_config.environment)
        should_add_params = len(self.context.params) > 0
        name = clean_name(pipeline)

        def _decorate_command(cmd, env):
            # Prefix the shell command with KEY=value assignments so the
            # variables are visible to the kedro invocation.
            return (
                " ".join(f"{key}={value}" for key, value in env.items())
                + " "
                + cmd
            )

        if self.run_config.network.host_aliases:
            command.append(self._generate_hosts_file())

        if should_add_params:
            command.append(self._generate_params_command(should_add_params))

        if is_mlflow_enabled():
            inputs.append(InputSpec("mlflow_tracking_token", "String"))
            env[
                "MLFLOW_TRACKING_TOKEN"
            ] = "{{$.inputs.parameters['mlflow_tracking_token']}}"
            # Start the mlflow run up-front even in single-node mode: it
            # tags the run with the Vertex AI run id, and the run id file
            # is consumed by the kedro run command below.
            command.append(
                _decorate_command(
                    f"kedro vertexai -e {self.context.env} mlflow-start --output /tmp/mlflow.run.id {self.run_name} &&",
                    env,
                )
            )
            env["MLFLOW_RUN_ID"] = "`cat /tmp/mlflow.run.id`"

        if CONFIG_HOOK_DISABLED:
            env["KEDRO_VERTEXAI_DISABLE_CONFIG_HOOK"] = "true"

        command.append(
            _decorate_command(
                f"kedro run -e {self.context.env} --pipeline {pipeline}"
                + (" --config config.yaml" if should_add_params else ""),
                env,
            )
        )

        spec = ComponentSpec(
            name=name,
            inputs=inputs,
            outputs=[],
            implementation=ContainerImplementation(
                container=ContainerSpec(
                    image=image,
                    command=["/bin/bash", "-c"],
                    args=[" ".join(command)],
                )
            ),
        )
        return self._create_kedro_op(
            name, spec, [tracking_token] if is_mlflow_enabled() else []
        )