diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index d2f65114646..a137c65719c 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -3,7 +3,7 @@ ## Major Features and Improvements * Support parallelism setting in ParallelFor [\#8146](https://github.com/kubeflow/pipelines/pull/8146) * Support for Python v3.10 [\#8186](https://github.com/kubeflow/pipelines/pull/8186) -* Support pipeline as a component [\#8179](https://github.com/kubeflow/pipelines/pull/8179), [\#8204](https://github.com/kubeflow/pipelines/pull/8204) +* Support pipeline as a component [\#8179](https://github.com/kubeflow/pipelines/pull/8179), [\#8204](https://github.com/kubeflow/pipelines/pull/8204), [\#8209](https://github.com/kubeflow/pipelines/pull/8209) ## Breaking Changes diff --git a/sdk/python/kfp/compiler/_read_write_test_config.py b/sdk/python/kfp/compiler/_read_write_test_config.py index 45c2bd3d25d..61746b2c5b1 100644 --- a/sdk/python/kfp/compiler/_read_write_test_config.py +++ b/sdk/python/kfp/compiler/_read_write_test_config.py @@ -50,10 +50,11 @@ 'pipeline_in_pipeline', 'pipeline_in_pipeline_complex', 'pipeline_with_outputs', + 'pipeline_in_pipeline_loaded_from_yaml', ], 'test_data_dir': 'sdk/python/kfp/compiler/test_data/pipelines', 'config': { - 'read': False, + 'read': True, 'write': True } }, diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 6b49f88809a..30e69c1d10b 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -17,18 +17,13 @@ https://docs.google.com/document/d/1PUDuSQ8vmeKSBloli53mp7GIvzekaY7sggg6ywy35Dk/ """ -import inspect -from typing import Any, Callable, Dict, Mapping, Optional, Union -import uuid +from typing import Any, Callable, Dict, Optional, Union from kfp.compiler import pipeline_spec_builder as builder from kfp.components import base_component -from kfp.components import component_factory from kfp.components import graph_component -from kfp.components import pipeline_channel -from kfp.components import pipeline_context +from kfp.components import yaml_component from kfp.components.types import type_utils -from kfp.pipeline_spec import pipeline_spec_pb2 class Compiler: @@ -72,39 +67,17 @@ def compile( """ with type_utils.TypeCheckManager(enable=type_check): - if isinstance(pipeline_func, graph_component.GraphComponent): - # Retrieve the pre-comppiled pipeline spec. - pipeline_spec = pipeline_func.component_spec.implementation.graph - - # Verify that pipeline_parameters contains only input names - # that match the pipeline inputs definition. - for input_name, input_value in (pipeline_parameters or - {}).items(): - if input_name in pipeline_spec.root.input_definitions.parameters: - pipeline_spec.root.input_definitions.parameters[ - input_name].default_value.CopyFrom( - builder.to_protobuf_value(input_value)) - elif input_name in pipeline_spec.root.input_definitions.artifacts: - raise NotImplementedError( - 'Default value for artifact input is not supported yet.' 
- ) - else: - raise ValueError( - 'Pipeline parameter {} does not match any known ' - 'pipeline input.'.format(input_name)) - - elif isinstance(pipeline_func, base_component.BaseComponent): - component_spec = builder.modify_component_spec_for_compile( - component_spec=pipeline_func.component_spec, - pipeline_name=pipeline_name, - pipeline_parameters_override=pipeline_parameters, - ) - pipeline_spec = component_spec.to_pipeline_spec() - else: + if not isinstance(pipeline_func, base_component.BaseComponent): raise ValueError( 'Unsupported pipeline_func type. Expected ' 'subclass of `base_component.BaseComponent` or ' '`Callable` constructed with @dsl.pipeline ' f'decorator. Got: {type(pipeline_func)}') + + pipeline_spec = builder.modify_pipeline_spec_with_override( + pipeline_spec=pipeline_func.pipeline_spec, + pipeline_name=pipeline_name, + pipeline_parameters=pipeline_parameters, + ) builder.write_pipeline_spec_to_file( pipeline_spec=pipeline_spec, package_path=package_path) diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index 3ff8ecdf32f..ddb8ad51f91 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -1027,40 +1027,46 @@ def populate_metrics_in_dag_outputs( sub_task_output = unique_output_name -def modify_component_spec_for_compile( - component_spec: structures.ComponentSpec, +def modify_pipeline_spec_with_override( + pipeline_spec: pipeline_spec_pb2.PipelineSpec, pipeline_name: Optional[str], - pipeline_parameters_override: Optional[Mapping[str, Any]], -) -> structures.ComponentSpec: - """Modifies the ComponentSpec using arguments passed to the - Compiler.compile method. + pipeline_parameters: Optional[Mapping[str, Any]], +) -> pipeline_spec_pb2.PipelineSpec: + """Modifies the PipelineSpec using arguments passed to the Compiler.compile + method. Args: - component_spec (structures.ComponentSpec): ComponentSpec to modify. + pipeline_spec (pipeline_spec_pb2.PipelineSpec): PipelineSpec to modify. pipeline_name (Optional[str]): Name of the pipeline. Overrides component name. - pipeline_parameters_override (Optional[Mapping[str, Any]]): Pipeline parameters. Overrides component input default values. + pipeline_parameters (Optional[Mapping[str, Any]]): Pipeline parameters. Overrides component input default values. + Returns: + The modified PipelineSpec copy. Raises: ValueError: If a parameter is passed to the compiler that is not a component input. - - Returns: - structures.ComponentSpec: The modified ComponentSpec. """ - pipeline_name = pipeline_name or utils.sanitize_component_name( - component_spec.name).replace(utils._COMPONENT_NAME_PREFIX, '') - - component_spec.name = pipeline_name - if component_spec.inputs is not None: - pipeline_parameters_override = pipeline_parameters_override or {} - for input_name in pipeline_parameters_override: - if input_name not in component_spec.inputs: - raise ValueError( - f'Parameter {input_name} does not match any known component parameters.' - ) - component_spec.inputs[ - input_name].default = pipeline_parameters_override[input_name] + pipeline_spec_new = pipeline_spec_pb2.PipelineSpec() + pipeline_spec_new.CopyFrom(pipeline_spec) + pipeline_spec = pipeline_spec_new + + if pipeline_name is not None: + pipeline_spec.pipeline_info.name = pipeline_name + + # Verify that pipeline_parameters contains only input names + # that match the pipeline inputs definition. 
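+    # Overrides are applied by rewriting each matching input's default value
+    # on the copied spec; artifact inputs cannot carry default values, so
+    # overriding one raises below.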
+    for input_name, input_value in (pipeline_parameters or {}).items():
+        if input_name in pipeline_spec.root.input_definitions.parameters:
+            pipeline_spec.root.input_definitions.parameters[
+                input_name].default_value.CopyFrom(
+                    to_protobuf_value(input_value))
+        elif input_name in pipeline_spec.root.input_definitions.artifacts:
+            raise NotImplementedError(
+                'Default value for artifact input is not supported.')
+        else:
+            raise ValueError('Pipeline parameter {} does not match any known '
+                             'pipeline input.'.format(input_name))
 
-    return component_spec
+    return pipeline_spec
 
 
 def build_spec_by_group(
@@ -1125,6 +1131,9 @@ def build_spec_by_group(
         ]
         is_parent_component_root = (group_component_spec == pipeline_spec.root)
 
+        # Track if the component spec was added from merging a pipeline spec.
+        component_spec_added = False
+
         if isinstance(subgroup, pipeline_task.PipelineTask):
 
             subgroup_task_spec = build_task_spec_for_task(
@@ -1138,30 +1147,33 @@ def build_spec_by_group(
                 task=subgroup)
             task_name_to_component_spec[subgroup.name] = subgroup_component_spec
 
-            executor_label = subgroup_component_spec.executor_label
+            if subgroup_component_spec.executor_label:
+                executor_label = utils.make_name_unique_by_adding_index(
+                    name=subgroup_component_spec.executor_label,
+                    collection=list(deployment_config.executors.keys()),
+                    delimiter='-')
+                subgroup_component_spec.executor_label = executor_label
 
-            if executor_label not in deployment_config.executors:
-                if subgroup.container_spec is not None:
-                    subgroup_container_spec = build_container_spec_for_task(
-                        task=subgroup)
-                    deployment_config.executors[
-                        executor_label].container.CopyFrom(
-                            subgroup_container_spec)
-                elif subgroup.importer_spec is not None:
-                    subgroup_importer_spec = build_importer_spec_for_task(
-                        task=subgroup)
-                    deployment_config.executors[
-                        executor_label].importer.CopyFrom(
-                            subgroup_importer_spec)
-                elif subgroup.pipeline_spec is not None:
-                    merge_deployment_spec_and_component_spec(
-                        main_pipeline_spec=pipeline_spec,
-                        main_deployment_config=deployment_config,
-                        sub_pipeline_spec=subgroup.pipeline_spec,
-                        sub_pipeline_component_name=subgroup_component_name,
-                    )
-                else:
-                    raise RuntimeError
+            if subgroup.container_spec is not None:
+                subgroup_container_spec = build_container_spec_for_task(
+                    task=subgroup)
+                deployment_config.executors[executor_label].container.CopyFrom(
+                    subgroup_container_spec)
+            elif subgroup.importer_spec is not None:
+                subgroup_importer_spec = build_importer_spec_for_task(
+                    task=subgroup)
+                deployment_config.executors[executor_label].importer.CopyFrom(
+                    subgroup_importer_spec)
+            elif subgroup.pipeline_spec is not None:
+                merge_deployment_spec_and_component_spec(
+                    main_pipeline_spec=pipeline_spec,
+                    main_deployment_config=deployment_config,
+                    sub_pipeline_spec=subgroup.pipeline_spec,
+                    sub_pipeline_component_name=subgroup_component_name,
+                )
+                component_spec_added = True
+            else:
+                raise RuntimeError
 
         elif isinstance(subgroup, tasks_group.ParallelFor):
 
             # "Punch the hole", adding additional inputs (other than loop
@@ -1258,8 +1270,14 @@ def build_spec_by_group(
             subgroup_task_spec.dependent_tasks.extend(
                 [utils.sanitize_task_name(dep) for dep in group_dependencies])
 
-        # Add component spec if not exists
-        if subgroup_component_name not in pipeline_spec.components:
+        # Add component spec if not already added from merging pipeline spec.
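+        # Suffixing with make_name_unique_by_adding_index guards against
+        # collisions with component names already registered while merging a
+        # sub-pipeline spec above.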
+        if not component_spec_added:
+            subgroup_component_name = utils.make_name_unique_by_adding_index(
+                name=subgroup_component_name,
+                collection=list(pipeline_spec.components.keys()),
+                delimiter='-')
+
+            subgroup_task_spec.component_ref.name = subgroup_component_name
             pipeline_spec.components[subgroup_component_name].CopyFrom(
                 subgroup_component_spec)
 
@@ -1470,12 +1488,11 @@ def _rename_component_refs(
         sub_pipeline_spec.root)
 
 
-def create_pipeline_spec_and_deployment_config(
+def create_pipeline_spec(
     pipeline: pipeline_context.Pipeline,
     component_spec: structures.ComponentSpec,
     pipeline_outputs: Optional[Any] = None,
-) -> Tuple[pipeline_spec_pb2.PipelineSpec,
-           pipeline_spec_pb2.PipelineDeploymentConfig]:
+) -> pipeline_spec_pb2.PipelineSpec:
     """Creates a pipeline spec object.
 
     Args:
@@ -1484,8 +1501,7 @@ def create_pipeline_spec(
         pipeline_outputs: The pipeline outputs via return.
 
     Returns:
-        A tuple of PipelineSpec proto representing the compiled pipeline and its
-        PipelineDeploymentconfig proto object.
+        A PipelineSpec proto representing the compiled pipeline.
 
     Raises:
         ValueError if the argument is of unsupported types.
@@ -1553,7 +1569,7 @@ def create_pipeline_spec_and_deployment_config(
         deployment_config=deployment_config,
     )
 
-    return pipeline_spec, deployment_config
+    return pipeline_spec
 
 
 def write_pipeline_spec_to_file(pipeline_spec: pipeline_spec_pb2.PipelineSpec,
diff --git a/sdk/python/kfp/compiler/read_write_test.py b/sdk/python/kfp/compiler/read_write_test.py
index f5bca2a00aa..5054e0efd98 100644
--- a/sdk/python/kfp/compiler/read_write_test.py
+++ b/sdk/python/kfp/compiler/read_write_test.py
@@ -126,11 +126,15 @@ def load_compiled_file(filename: str) -> Dict[str, Any]:
     return ignore_kfp_version_helper(contents)
 
 
-def set_description_in_component_spec_to_none(
+def strip_some_component_spec_fields(
    component_spec: structures.ComponentSpec) -> structures.ComponentSpec:
-    """Sets the description field of a ComponentSpec to None."""
+    """Strips some component spec fields that should be ignored when comparing
+    with the golden result."""
     # Ignore description when comparing components specs read in from v1 component YAML and from IR YAML, because non lightweight Python components defined in v1 YAML can have a description field, but IR YAML does not preserve this field unless the component is a lightweight Python function-based component
     component_spec.description = None
+    # Ignore the SDK version so that golden snapshots don't need to be updated between SDK version bumps.
+    if component_spec.implementation.graph is not None:
+        component_spec.implementation.graph.sdk_version = ''
     return component_spec
 
 
@@ -158,10 +162,8 @@ def _test_serialization_deserialization_consistency(self, yaml_file: str):
         reloaded_component = self._compile_and_load_component(
             original_component)
         self.assertEqual(
-            set_description_in_component_spec_to_none(
-                original_component.component_spec),
-            set_description_in_component_spec_to_none(
-                reloaded_component.component_spec))
+            strip_some_component_spec_fields(original_component.component_spec),
+            strip_some_component_spec_fields(reloaded_component.component_spec))
 
 
 def _test_serialization_correctness(self,
                                     python_file: str,
diff --git a/sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_complex.py b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_complex.py
index 22dcfe14aff..19738bde3aa 100644
--- a/sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_complex.py
+++
b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_complex.py @@ -59,8 +59,8 @@ def pipeline_not_used(): @dsl.pipeline(name='pipeline-in-pipeline-complex') -def my_pipeline(): - print_op1(msg='Hello') +def my_pipeline(msg: str = 'Hello'): + print_op1(msg=msg) with dsl.ParallelFor(['Hello', 'world!']) as item: graph_component(msg=item) diff --git a/sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_complex.yaml b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_complex.yaml index e7de2d75f61..ecbeb6a5836 100644 --- a/sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_complex.yaml +++ b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_complex.yaml @@ -227,9 +227,13 @@ root: inputs: parameters: msg: - runtimeValue: - constant: Hello + componentInputParameter: msg taskInfo: name: print-op1 + inputDefinitions: + parameters: + msg: + defaultValue: Hello + parameterType: STRING schemaVersion: 2.1.0 sdkVersion: kfp-2.0.0-beta.3 diff --git a/sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_loaded_from_yaml.py b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_loaded_from_yaml.py new file mode 100644 index 00000000000..491375a946b --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_loaded_from_yaml.py @@ -0,0 +1,43 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
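+
+"""Test sample: a pipeline loaded from compiled IR YAML and reused as a
+component inside another pipeline."""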
+ +import pathlib + +from kfp import compiler +from kfp import components +from kfp import dsl +from kfp.dsl import Artifact +from kfp.dsl import Input + + +@dsl.component +def print_op1(data: Input[Artifact]): + with open(data.path, 'r') as f: + print(f.read()) + + +reuse_yaml_pipeline = components.load_component_from_file( + pathlib.Path(__file__).parent / 'pipeline_with_outputs.yaml') + + +@dsl.pipeline(name='pipeline-in-pipeline') +def my_pipeline(): + task = reuse_yaml_pipeline(msg='Hello') + print_op1(data=task.output) + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=my_pipeline, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_loaded_from_yaml.yaml b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_loaded_from_yaml.yaml new file mode 100644 index 00000000000..221a06936ae --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_loaded_from_yaml.yaml @@ -0,0 +1,264 @@ +components: + comp-inner-pipeline: + dag: + outputs: + artifacts: + data: + artifactSelectors: + - outputArtifactKey: data + producerSubtask: print-op2 + parameters: + msg: + valueFromParameter: + outputParameterKey: Output + producerSubtask: print-op1 + tasks: + print-op1: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-op1-2 + inputs: + parameters: + msg: + componentInputParameter: msg + taskInfo: + name: print-op1 + print-op2: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-op2 + dependentTasks: + - print-op1 + inputs: + parameters: + msg: + taskOutputParameter: + outputParameterKey: Output + producerTask: print-op1 + taskInfo: + name: print-op2 + inputDefinitions: + parameters: + msg: + parameterType: STRING + outputDefinitions: + artifacts: + data: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + parameters: + msg: + parameterType: STRING + comp-pipeline-in-pipeline: + dag: + outputs: + artifacts: + Output: + artifactSelectors: + - outputArtifactKey: data + producerSubtask: inner-pipeline + tasks: + inner-pipeline: + cachingOptions: + enableCache: true + componentRef: + name: comp-inner-pipeline + inputs: + parameters: + msg: + runtimeValue: + constant: world + taskInfo: + name: inner-pipeline + print-op1: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-op1 + inputs: + parameters: + msg: + componentInputParameter: msg + taskInfo: + name: print-op1 + inputDefinitions: + parameters: + msg: + defaultValue: Hello + parameterType: STRING + outputDefinitions: + artifacts: + Output: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + comp-print-op1: + executorLabel: exec-print-op1 + inputDefinitions: + parameters: + msg: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-op1-2: + executorLabel: exec-print-op1-2 + inputDefinitions: + parameters: + msg: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-op1-3: + executorLabel: exec-print-op1-3 + inputDefinitions: + artifacts: + data: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + comp-print-op2: + executorLabel: exec-print-op2 + inputDefinitions: + parameters: + msg: + parameterType: STRING + outputDefinitions: + artifacts: + data: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 +deploymentSpec: + executors: + exec-print-op1: + container: + args: + - 
--executor_input + - '{{$}}' + - --function_to_execute + - print_op1 + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.3'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_op1(msg: str) -> str:\n print(msg)\n return msg\n\ + \n" + image: python:3.7 + exec-print-op1-2: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_op1 + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.3'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_op1(msg: str) -> str:\n print(msg)\n return msg\n\ + \n" + image: python:3.7 + exec-print-op1-3: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_op1 + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.3'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_op1(data: Input[Artifact]):\n with open(data.path, 'r')\ + \ as f:\n print(f.read())\n\n" + image: python:3.7 + exec-print-op2: + container: + args: + - '{{$.inputs.parameters[''msg'']}}' + - '{{$.outputs.artifacts[''data''].path}}' + command: + - sh + - -c + - mkdir --parents $(dirname "$1") && echo "$0" > "$1" + image: alpine +pipelineInfo: + name: pipeline-in-pipeline +root: + dag: + tasks: + pipeline-in-pipeline: + cachingOptions: + enableCache: true + componentRef: + name: comp-pipeline-in-pipeline + inputs: + parameters: + msg: + runtimeValue: + constant: Hello + taskInfo: + name: pipeline-in-pipeline + print-op1: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-op1-3 + dependentTasks: + - pipeline-in-pipeline + inputs: + artifacts: + data: + taskOutputArtifact: + outputArtifactKey: Output + producerTask: pipeline-in-pipeline + taskInfo: + name: print-op1 +schemaVersion: 2.1.0 +sdkVersion: kfp-2.0.0-beta.3 diff --git a/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_outputs.py b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_outputs.py index 5590bc50cf3..ed9ee150d05 100644 --- 
a/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_outputs.py +++ b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_outputs.py @@ -53,8 +53,8 @@ def inner_pipeline( @dsl.pipeline(name='pipeline-in-pipeline') -def my_pipeline() -> Artifact: - task1 = print_op1(msg='Hello') +def my_pipeline(msg: str = 'Hello') -> Artifact: + task1 = print_op1(msg=msg) task2 = inner_pipeline(msg='world') return task2.outputs['data'] diff --git a/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_outputs.yaml b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_outputs.yaml index c524d5cdc2b..d49a2a85a0a 100644 --- a/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_outputs.yaml +++ b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_outputs.yaml @@ -181,10 +181,14 @@ root: inputs: parameters: msg: - runtimeValue: - constant: Hello + componentInputParameter: msg taskInfo: name: print-op1 + inputDefinitions: + parameters: + msg: + defaultValue: Hello + parameterType: STRING outputDefinitions: artifacts: Output: diff --git a/sdk/python/kfp/components/base_component.py b/sdk/python/kfp/components/base_component.py index cba5c5d96f1..3b7e158e5c6 100644 --- a/sdk/python/kfp/components/base_component.py +++ b/sdk/python/kfp/components/base_component.py @@ -18,6 +18,7 @@ from kfp.components import pipeline_task from kfp.components import structures from kfp.components.types import type_utils +from kfp.pipeline_spec import pipeline_spec_pb2 class BaseComponent(abc.ABC): @@ -89,6 +90,11 @@ def __call__(self, *args, **kwargs) -> pipeline_task.PipelineTask: args=task_inputs, ) + @property + def pipeline_spec(self) -> pipeline_spec_pb2.PipelineSpec: + """Returns the pipeline spec of the component.""" + return self.component_spec.to_pipeline_spec() + @abc.abstractmethod def execute(self, **kwargs): """Executes the component locally if implemented by the inheriting diff --git a/sdk/python/kfp/components/graph_component.py b/sdk/python/kfp/components/graph_component.py index 20a3d62e968..9dc8aed5ac5 100644 --- a/sdk/python/kfp/components/graph_component.py +++ b/sdk/python/kfp/components/graph_component.py @@ -22,6 +22,7 @@ from kfp.components import pipeline_channel from kfp.components import pipeline_context from kfp.components import structures +from kfp.pipeline_spec import pipeline_spec_pb2 class GraphComponent(base_component.BaseComponent): @@ -64,18 +65,22 @@ def __init__( pipeline_group = dsl_pipeline.groups[0] pipeline_group.name = uuid.uuid4().hex - self.pipeline_spec, self.deployment_config = ( - builder.create_pipeline_spec_and_deployment_config( - pipeline=dsl_pipeline, - component_spec=self.component_spec, - pipeline_outputs=pipeline_outputs, - )) + pipeline_spec = builder.create_pipeline_spec( + pipeline=dsl_pipeline, + component_spec=self.component_spec, + pipeline_outputs=pipeline_outputs, + ) pipeline_root = getattr(pipeline_func, 'pipeline_root', None) if pipeline_root is not None: - self.pipeline_spec.default_pipeline_root = pipeline_root + pipeline_spec.default_pipeline_root = pipeline_root - self.component_spec.implementation.graph = self.pipeline_spec + self.component_spec.implementation.graph = pipeline_spec + + @property + def pipeline_spec(self) -> pipeline_spec_pb2.PipelineSpec: + """Returns the pipeline spec of the component.""" + return self.component_spec.implementation.graph def execute(self, **kwargs): raise RuntimeError('Graph component has no local execution mode.') diff --git a/sdk/python/kfp/components/structures.py 
b/sdk/python/kfp/components/structures.py index 051f911ef9c..435d38692dc 100644 --- a/sdk/python/kfp/components/structures.py +++ b/sdk/python/kfp/components/structures.py @@ -67,26 +67,31 @@ def __init__(self, *args, **kwargs) -> None: self._optional = 'default' in kwargs @classmethod - def from_ir_parameter_dict( - cls, ir_parameter_dict: Dict[str, Any]) -> 'InputSpec': + def from_ir_component_inputs_dict( + cls, ir_component_inputs_dict: Dict[str, Any]) -> 'InputSpec': """Creates an InputSpec from a ComponentInputsSpec message in dict format (pipeline_spec.components..inputDefinitions.parameters.). Args: - ir_parameter_dict (Dict[str, Any]): The ComponentInputsSpec message in dict format. + ir_component_inputs_dict (Dict[str, Any]): The ComponentInputsSpec + message in dict format. Returns: InputSpec: The InputSpec object. """ - type_ = type_utils.IR_TYPE_TO_IN_MEMORY_SPEC_TYPE.get( - ir_parameter_dict['parameterType']) + if 'parameterType' in ir_component_inputs_dict: + type_string = ir_component_inputs_dict['parameterType'] + default_value = ir_component_inputs_dict.get('defaultValue') + else: + type_string = ir_component_inputs_dict['artifactType'][ + 'schemaTitle'] + default_value = None + type_ = type_utils.IR_TYPE_TO_IN_MEMORY_SPEC_TYPE.get(type_string) if type_ is None: - raise ValueError( - f'Unknown type {ir_parameter_dict["parameterType"]} found in IR.' - ) - default = ir_parameter_dict.get('defaultValue') - return InputSpec(type=type_, default=default) + raise ValueError(f'Unknown type {type_string} found in IR.') + + return InputSpec(type=type_, default=default_value) def __eq__(self, other: Any) -> bool: """Equality comparison for InputSpec. Robust to different type @@ -118,26 +123,27 @@ class OutputSpec(base_model.BaseModel): type: Union[str, dict] @classmethod - def from_ir_parameter_dict( - cls, ir_parameter_dict: Dict[str, Any]) -> 'OutputSpec': + def from_ir_component_outputs_dict( + cls, ir_component_outputs_dict: Dict[str, Any]) -> 'OutputSpec': """Creates an OutputSpec from a ComponentOutputsSpec message in dict format (pipeline_spec.components..outputDefinitions.parameters|artifacts.). Args: - ir_parameter_dict (Dict[str, Any]): The ComponentOutputsSpec in dict format. + ir_component_outputs_dict (Dict[str, Any]): The ComponentOutputsSpec + in dict format. Returns: OutputSpec: The OutputSpec object. """ - type_string = ir_parameter_dict[ - 'parameterType'] if 'parameterType' in ir_parameter_dict else ir_parameter_dict[ - 'artifactType']['schemaTitle'] + if 'parameterType' in ir_component_outputs_dict: + type_string = ir_component_outputs_dict['parameterType'] + else: + type_string = ir_component_outputs_dict['artifactType'][ + 'schemaTitle'] type_ = type_utils.IR_TYPE_TO_IN_MEMORY_SPEC_TYPE.get(type_string) if type_ is None: - raise ValueError( - f'Unknown type {ir_parameter_dict["parameterType"]} found in IR.' - ) + raise ValueError(f'Unknown type {type_string} found in IR.') return OutputSpec(type=type_) def __eq__(self, other: Any) -> bool: @@ -395,8 +401,8 @@ class Implementation(base_model.BaseModel): graph: Optional['pipeline_spec_pb2.PipelineSpec'] = None @classmethod - def from_deployment_spec_dict(cls, deployment_spec_dict: Dict[str, Any], - component_name: str) -> 'Implementation': + def from_pipeline_spec_dict(cls, pipeline_spec_dict: Dict[str, Any], + component_name: str) -> 'Implementation': """Creates an Implmentation object from a deployment spec message in dict format (pipeline_spec.deploymentSpec). 
@@ -407,11 +413,17 @@ def from_deployment_spec_dict(cls, deployment_spec_dict: Dict[str, Any], Returns: Implementation: An implementation object. """ - executor_key = utils._EXECUTOR_LABEL_PREFIX + component_name - container = deployment_spec_dict['executors'][executor_key]['container'] - container_spec = ContainerSpecImplementation.from_container_dict( - container) - return Implementation(container=container_spec) + executor_key = utils.sanitize_executor_label(component_name) + executor = pipeline_spec_dict['deploymentSpec']['executors'].get( + executor_key) + if executor is not None: + container_spec = ContainerSpecImplementation.from_container_dict( + executor['container']) if executor else None + return Implementation(container=container_spec) + else: + pipeline_spec = json_format.ParseDict( + pipeline_spec_dict, pipeline_spec_pb2.PipelineSpec()) + return Implementation(graph=pipeline_spec) def _check_valid_placeholder_reference( @@ -511,18 +523,16 @@ def transform_outputs(self) -> None: def validate_placeholders(self): """Validates that input/output placeholders refer to an existing input/output.""" - implementation = self.implementation - if getattr(implementation, 'container', None) is None: + if self.implementation.container is None: return - containerSpecImplementation: ContainerSpecImplementation = implementation.container - valid_inputs = [] if self.inputs is None else list(self.inputs.keys()) valid_outputs = [] if self.outputs is None else list( self.outputs.keys()) - for arg in itertools.chain((containerSpecImplementation.command or []), - (containerSpecImplementation.args or [])): + for arg in itertools.chain( + (self.implementation.container.command or []), + (self.implementation.container.args or [])): _check_valid_placeholder_reference(valid_inputs, valid_outputs, arg) @classmethod @@ -596,32 +606,28 @@ def from_pipeline_spec_dict( cls, pipeline_spec_dict: Dict[str, Any]) -> 'ComponentSpec': raw_name = pipeline_spec_dict['pipelineInfo']['name'] - implementation = Implementation.from_deployment_spec_dict( - pipeline_spec_dict['deploymentSpec'], raw_name) - - def inputs_dict_from_components_dict( - components_dict: Dict[str, Any], - component_name: str) -> Dict[str, InputSpec]: - component_key = utils._COMPONENT_NAME_PREFIX + component_name - parameters = components_dict[component_key].get( - 'inputDefinitions', {}).get('parameters', {}) + def inputs_dict_from_component_spec_dict( + component_spec_dict: Dict[str, Any]) -> Dict[str, InputSpec]: + parameters = component_spec_dict.get('inputDefinitions', + {}).get('parameters', {}) + artifacts = component_spec_dict.get('inputDefinitions', + {}).get('artifacts', {}) + all_inputs = {**parameters, **artifacts} return { - name: InputSpec.from_ir_parameter_dict(parameter_dict) - for name, parameter_dict in parameters.items() + name: InputSpec.from_ir_component_inputs_dict(input_dict) + for name, input_dict in all_inputs.items() } - def outputs_dict_from_components_dict( - components_dict: Dict[str, Any], - component_name: str) -> Dict[str, OutputSpec]: - component_key = utils._COMPONENT_NAME_PREFIX + component_name - parameters = components_dict[component_key].get( - 'outputDefinitions', {}).get('parameters', {}) - artifacts = components_dict[component_key].get( - 'outputDefinitions', {}).get('artifacts', {}) + def outputs_dict_from_component_spec_dict( + components_spec_dict: Dict[str, Any]) -> Dict[str, OutputSpec]: + parameters = component_spec_dict.get('outputDefinitions', + {}).get('parameters', {}) + artifacts = 
components_spec_dict.get('outputDefinitions', + {}).get('artifacts', {}) all_outputs = {**parameters, **artifacts} return { - name: OutputSpec.from_ir_parameter_dict(parameter_dict) - for name, parameter_dict in all_outputs.items() + name: OutputSpec.from_ir_component_outputs_dict(output_dict) + for name, output_dict in all_outputs.items() } def extract_description_from_command( @@ -637,13 +643,20 @@ def extract_description_from_command( return docstring return None - inputs = inputs_dict_from_components_dict( - pipeline_spec_dict['components'], raw_name) - outputs = outputs_dict_from_components_dict( - pipeline_spec_dict['components'], raw_name) + component_key = utils.sanitize_component_name(raw_name) + component_spec_dict = pipeline_spec_dict['components'].get( + component_key, pipeline_spec_dict['root']) + + inputs = inputs_dict_from_component_spec_dict(component_spec_dict) + outputs = outputs_dict_from_component_spec_dict(component_spec_dict) + + implementation = Implementation.from_pipeline_spec_dict( + pipeline_spec_dict, raw_name) description = extract_description_from_command( - implementation.container.command or []) + implementation.container.command or + []) if implementation.container else None + return ComponentSpec( name=raw_name, implementation=implementation, diff --git a/sdk/python/kfp/components/structures_test.py b/sdk/python/kfp/components/structures_test.py index 114e2ab08a7..100ef63e785 100644 --- a/sdk/python/kfp/components/structures_test.py +++ b/sdk/python/kfp/components/structures_test.py @@ -255,12 +255,17 @@ def test_simple_component_spec_save_to_component_yaml(self): inputs={'input1': structures.InputSpec(type='String')}, outputs={'output1': structures.OutputSpec(type='String')}, ) - from kfp.components import yaml_component - yaml_component = yaml_component.YamlComponent( - component_spec=original_component_spec) + from kfp.components import base_component + + class TestComponent(base_component.BaseComponent): + + def execute(self, **kwargs): + pass + + test_component = TestComponent(component_spec=original_component_spec) with tempfile.TemporaryDirectory() as tempdir: output_path = os.path.join(tempdir, 'component.yaml') - compiler.Compiler().compile(yaml_component, output_path) + compiler.Compiler().compile(test_component, output_path) # test that it can be read back correctly with open(output_path, 'r') as f: @@ -580,14 +585,16 @@ def test_optional(self): self.assertEqual(input_spec.default, None) self.assertEqual(input_spec._optional, False) - def test_from_ir_parameter_dict(self): + def test_from_ir_component_inputs_dict(self): parameter_dict = {'parameterType': 'STRING'} - input_spec = structures.InputSpec.from_ir_parameter_dict(parameter_dict) + input_spec = structures.InputSpec.from_ir_component_inputs_dict( + parameter_dict) self.assertEqual(input_spec.type, 'String') self.assertEqual(input_spec.default, None) parameter_dict = {'parameterType': 'NUMBER_INTEGER'} - input_spec = structures.InputSpec.from_ir_parameter_dict(parameter_dict) + input_spec = structures.InputSpec.from_ir_component_inputs_dict( + parameter_dict) self.assertEqual(input_spec.type, 'Integer') self.assertEqual(input_spec.default, None) @@ -595,20 +602,32 @@ def test_from_ir_parameter_dict(self): 'defaultValue': 'default value', 'parameterType': 'STRING' } - input_spec = structures.InputSpec.from_ir_parameter_dict(parameter_dict) + input_spec = structures.InputSpec.from_ir_component_inputs_dict( + parameter_dict) self.assertEqual(input_spec.type, 'String') 
        self.assertEqual(input_spec.default, 'default value')
 
-        input_spec = structures.InputSpec.from_ir_parameter_dict(parameter_dict)
+        input_spec = structures.InputSpec.from_ir_component_inputs_dict(
+            parameter_dict)
         self.assertEqual(input_spec.type, 'String')
         self.assertEqual(input_spec.default, 'default value')
 
+        artifact_dict = {
+            'artifactType': {
+                'schemaTitle': 'system.Artifact',
+                'schemaVersion': '0.0.1'
+            }
+        }
+        input_spec = structures.InputSpec.from_ir_component_inputs_dict(
+            artifact_dict)
+        self.assertEqual(input_spec.type, 'Artifact')
+
 
 class TestOutputSpec(parameterized.TestCase):
 
-    def test_from_ir_parameter_dict(self):
+    def test_from_ir_component_outputs_dict(self):
         parameter_dict = {'parameterType': 'STRING'}
-        output_spec = structures.OutputSpec.from_ir_parameter_dict(
+        output_spec = structures.OutputSpec.from_ir_component_outputs_dict(
             parameter_dict)
         self.assertEqual(output_spec.type, 'String')
 
@@ -618,7 +637,7 @@ def test_from_ir_parameter_dict(self):
                 'schemaVersion': '0.0.1'
             }
         }
-        output_spec = structures.OutputSpec.from_ir_parameter_dict(
+        output_spec = structures.OutputSpec.from_ir_component_outputs_dict(
             artifact_dict)
         self.assertEqual(output_spec.type, 'Artifact')
 
diff --git a/sdk/python/kfp/components/yaml_component.py b/sdk/python/kfp/components/yaml_component.py
index 8e97192829c..48aaf5029b3 100644
--- a/sdk/python/kfp/components/yaml_component.py
+++ b/sdk/python/kfp/components/yaml_component.py
@@ -15,9 +15,12 @@
 
 from typing import Optional, Tuple
 
+from google.protobuf import json_format
 from kfp import components
 from kfp.components import structures
+from kfp.pipeline_spec import pipeline_spec_pb2
 import requests
+import yaml
 
 
 class YamlComponent(components.BaseComponent):
@@ -25,10 +28,30 @@ class YamlComponent(components.BaseComponent):
     **Note:** ``YamlComponent`` is not intended to be used to construct
     components directly. Use ``kfp.components.load_component_from_*()``
     instead.
 
-    Args:
+    Attributes:
         component_spec: Component definition.
+        component_yaml: The YAML string from which this component was loaded.
     """
 
+    def __init__(
+        self,
+        component_spec: structures.ComponentSpec,
+        component_yaml: str,
+    ):
+        super().__init__(component_spec=component_spec)
+        self.component_yaml = component_yaml
+
+    @property
+    def pipeline_spec(self) -> pipeline_spec_pb2.PipelineSpec:
+        """Returns the pipeline spec of the component."""
+        component_dict = yaml.safe_load(self.component_yaml)
+        is_v1 = 'implementation' in set(component_dict.keys())
+        if is_v1:
+            return self.component_spec.to_pipeline_spec()
+        else:
+            return json_format.ParseDict(component_dict,
+                                         pipeline_spec_pb2.PipelineSpec())
+
     def execute(self, *args, **kwargs):
         """Not implemented."""
         raise NotImplementedError
@@ -44,7 +67,8 @@ def load_component_from_text(text: str) -> YamlComponent:
         Component loaded from YAML.
     """
     return YamlComponent(
-        structures.ComponentSpec.load_from_component_yaml(text))
+        component_spec=structures.ComponentSpec.load_from_component_yaml(text),
+        component_yaml=text)
 
 
 def load_component_from_file(file_path: str) -> YamlComponent:
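
Taken together, these changes route every compile through a uniform `pipeline_spec` property, so a pipeline compiled to IR YAML can be reloaded and composed like any other component. A minimal sketch of the resulting workflow, assuming an `inner_pipeline.yaml` whose root takes a string input `msg` (the file and pipeline names are illustrative, mirroring the pipeline_with_outputs test data above; they are not part of this diff):

from kfp import compiler
from kfp import components
from kfp import dsl

# Load a previously compiled pipeline (IR YAML) as a reusable component.
# 'inner_pipeline.yaml' is an assumed file, e.g. the output of compiling
# pipeline_with_outputs.py.
inner_pipeline = components.load_component_from_file('inner_pipeline.yaml')


@dsl.pipeline(name='outer-pipeline')
def outer_pipeline(msg: str = 'Hello'):
    # A pipeline loaded from YAML is invoked exactly like a component.
    inner_pipeline(msg=msg)


if __name__ == '__main__':
    # pipeline_name and pipeline_parameters are applied to a copy of the
    # pre-compiled spec by modify_pipeline_spec_with_override.
    compiler.Compiler().compile(
        pipeline_func=outer_pipeline,
        package_path='outer_pipeline.yaml',
        pipeline_parameters={'msg': 'world'})

Compiling with pipeline_parameters={'msg': 'world'} rewrites the default value of the root `msg` input in the emitted YAML; a name that matches no pipeline input raises ValueError, and overriding an artifact input raises NotImplementedError, per the checks above.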