From 69fbf32c87b61ec0276284ae59a98bab965c88d9 Mon Sep 17 00:00:00 2001 From: Mariusz Strzelecki Date: Fri, 8 Apr 2022 12:02:33 +0200 Subject: [PATCH 1/2] Squashed pipeline generator --- kedro_vertexai/client.py | 7 ++- kedro_vertexai/config.py | 15 ++++++ kedro_vertexai/generator.py | 103 ++++++++++++++++++++++++++++++++---- 3 files changed, 113 insertions(+), 12 deletions(-) diff --git a/kedro_vertexai/client.py b/kedro_vertexai/client.py index be2aade..c8f2bb7 100644 --- a/kedro_vertexai/client.py +++ b/kedro_vertexai/client.py @@ -20,7 +20,10 @@ from .config import PluginConfig from .data_models import PipelineResult, PipelineStatus -from .generator import PipelineGenerator +from .generator import ( # noqa + DefaultPipelineGenerator, + SquashedPipelineGenerator, +) class VertexAIPipelinesClient: @@ -41,7 +44,7 @@ def __init__(self, config: PluginConfig, project_name, context): ) self.run_config = config.run_config self.run_name = self._generate_run_name(config) - self.generator = PipelineGenerator( + self.generator = globals()[self.run_config.generator_class]( config, project_name, context, self.run_name ) diff --git a/kedro_vertexai/config.py b/kedro_vertexai/config.py index 5820a59..b893dc8 100644 --- a/kedro_vertexai/config.py +++ b/kedro_vertexai/config.py @@ -1,4 +1,5 @@ import os +from enum import Enum from typing import Dict, List, Optional from pydantic import BaseModel @@ -50,6 +51,14 @@ # pipeline status. Used to send notifications or raise the alerts # on_exit_pipeline: notify_via_slack + # Class used for pipeline generation. Possible options: + # DefaultPipelineGenerator -> creates one Vertex AI component for every + # Kedro node (default) + # SquashedPipelineGenerator -> creates one Vertex AI component for the entire + # Kedro pipeline, useful for libraries that support + # lazy evaluation (e.g. SparkML) + #generator_class: DefaultPipelineGenerator + # Optional section allowing adjustment of the resources, reservations and limits # for the nodes. 
When not provided they're set to 500m cpu and 1024Mi memory. # If you don't want to specify pipeline resources set both to None in __default__. @@ -87,6 +96,11 @@ class NetworkConfig(BaseModel): host_aliases: Optional[List[HostAliasConfig]] = [] +class GeneratorClassEnum(str, Enum): + DEFAULT = "DefaultPipelineGenerator" + SQUASHED = "SquashedPipelineGenerator" + + class RunConfig(BaseModel): image: str image_pull_policy: Optional[str] = "IfNotPresent" @@ -100,6 +114,7 @@ class RunConfig(BaseModel): resources: Optional[Dict[str, ResourcesConfig]] = dict( __default__=ResourcesConfig(cpu="500m", memory="1024Mi") ) + generator_class: GeneratorClassEnum = GeneratorClassEnum.DEFAULT def resources_for(self, node): if node in self.resources.keys(): diff --git a/kedro_vertexai/generator.py b/kedro_vertexai/generator.py index bb4df03..61a672c 100644 --- a/kedro_vertexai/generator.py +++ b/kedro_vertexai/generator.py @@ -26,6 +26,20 @@ class PipelineGenerator: + def generate_pipeline(self, pipeline, image, image_pull_policy, token): + """ + This method returns @dsl.pipeline annotated function that contains + dynamically generated pipelines. + :param pipeline: kedro pipeline + :param image: full docker image name + :param image_pull_policy: docker pull policy + :param token: mlflow authentication token + :return: kfp pipeline function + """ + raise NotImplementedError() + + +class DefaultPipelineGenerator(PipelineGenerator): """ Generator creates Vertex AI pipeline function that operatoes with Vertex AI specific opertator spec. @@ -48,16 +62,6 @@ def get_pipeline_name(self): return self.project_name.lower().replace(" ", "-") def generate_pipeline(self, pipeline, image, image_pull_policy, token): - """ - This method return @dsl.pipeline annotated function that contains - dynamically generated pipelines. 
- :param pipeline: kedro pipeline - :param image: full docker image name - :param image_pull_policy: docker pull policy - :param token: mlflow authentication token - :return: kfp pipeline function - """ - def set_dependencies(node, dependencies, kfp_ops): for dependency in dependencies: name = clean_name(node.name) @@ -220,3 +224,82 @@ def _configure_resources(self, name: str, operator): if "memory" in resources and resources["memory"]: operator.set_memory_limit(resources["memory"]) operator.set_memory_request(resources["memory"]) + + +class SquashedPipelineGenerator(DefaultPipelineGenerator): + def generate_pipeline(self, pipeline, image, image_pull_policy, token): + @dsl.pipeline( + name=self.get_pipeline_name(), + description=self.run_config.description, + ) + def convert_kedro_pipeline_to_kfp() -> None: + kfp_op = self._build_kfp_pipeline_op(image, pipeline, token) + kfp_op.container.set_image_pull_policy(image_pull_policy) + + return convert_kedro_pipeline_to_kfp + + def _build_kfp_pipeline_op( + self, + image, + pipeline, + tracking_token=None, + ) -> dsl.ContainerOp: + + inputs = [] + command = [] + env = {} + should_add_params = len(self.context.params) > 0 + name = clean_name(pipeline) + + def _decorate_command(cmd, env): + return ( + " ".join(f"{key}={value}" for key, value in env.items()) + + " " + + cmd + ) + + if self.run_config.network.host_aliases: + command.append(self._generate_hosts_file()) + + if should_add_params: + command.append(self._generate_params_command(should_add_params)) + + if is_mlflow_enabled(): + inputs.append(InputSpec("mlflow_tracking_token", "String")) + env[ + "MLFLOW_TRACKING_TOKEN" + ] = "{{$.inputs.parameters['mlflow_tracking_token']}}" + command.append( + _decorate_command( + f"kedro vertexai -e {self.context.env} mlflow-start --output /tmp/mlflow.run.id {self.run_name} &&", + env, + ) + ) + env["MLFLOW_RUN_ID"] = "`cat /tmp/mlflow.run.id`" + + if CONFIG_HOOK_DISABLED: + env["KEDRO_VERTEXAI_DISABLE_CONFIG_HOOK"] = "true" + + 
command.append( + _decorate_command( + f"kedro run -e {self.context.env} --pipeline {pipeline}" + + (" --config config.yaml" if should_add_params else ""), + env, + ) + ) + + spec = ComponentSpec( + name=name, + inputs=inputs, + outputs=[], + implementation=ContainerImplementation( + container=ContainerSpec( + image=image, + command=["/bin/bash", "-c"], + args=[" ".join(command)], + ) + ), + ) + return self._create_kedro_op( + name, spec, [tracking_token] if is_mlflow_enabled() else [] + ) From 64b05388fef31455ed438c43eb3a5b809a3302a4 Mon Sep 17 00:00:00 2001 From: Mariusz Strzelecki Date: Fri, 8 Apr 2022 12:30:31 +0200 Subject: [PATCH 2/2] Handle custom env --- kedro_vertexai/config.py | 5 +++++ kedro_vertexai/generator.py | 3 +-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/kedro_vertexai/config.py b/kedro_vertexai/config.py index b893dc8..9990861 100644 --- a/kedro_vertexai/config.py +++ b/kedro_vertexai/config.py @@ -59,6 +59,10 @@ # lazy evaluation (e.g. SparkML) #generator_class: DefaultPipelineGenerator + # Optional environment parameters passed to the kedro invocations + # environment: + # NAME: value + # Optional section allowing adjustment of the resources, reservations and limits # for the nodes. When not provided they're set to 500m cpu and 1024Mi memory. # If you don't want to specify pipeline resources set both to None in __default__. 
@@ -115,6 +119,7 @@ class RunConfig(BaseModel): __default__=ResourcesConfig(cpu="500m", memory="1024Mi") ) generator_class: GeneratorClassEnum = GeneratorClassEnum.DEFAULT + environment: Dict[str, str] = {} def resources_for(self, node): if node in self.resources.keys(): diff --git a/kedro_vertexai/generator.py b/kedro_vertexai/generator.py index 61a672c..701b517 100644 --- a/kedro_vertexai/generator.py +++ b/kedro_vertexai/generator.py @@ -244,10 +244,9 @@ def _build_kfp_pipeline_op( pipeline, tracking_token=None, ) -> dsl.ContainerOp: - inputs = [] command = [] - env = {} + env = self.run_config.environment should_add_params = len(self.context.params) > 0 name = clean_name(pipeline)