From 23ed01480d869baa70526f9ac5fd025410cf3715 Mon Sep 17 00:00:00 2001 From: Yaqi Ji Date: Wed, 5 Jan 2022 16:41:35 -0800 Subject: [PATCH] feat(sdk): add metadata field for importer (#7112) * feat(sdk): add metadata * metadata * release notes * address comments --- sdk/RELEASE.md | 3 ++- sdk/python/kfp/v2/compiler/pipeline_spec_builder.py | 6 +++++- .../test_data/pipeline_with_importer.json | 9 ++++++--- .../test_data/pipeline_with_importer.py | 3 ++- sdk/python/kfp/v2/components/importer_node.py | 9 ++++++--- 5 files changed, 21 insertions(+), 9 deletions(-) diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index 80017d1960c3..b5d666a51d3f 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -8,6 +8,7 @@ * Merge v2 experimental change back to v2 namespace [\#6890](https://github.com/kubeflow/pipelines/pull/6890) * Add ImporterSpec v2 [\#6917](https://github.com/kubeflow/pipelines/pull/6917) * Add add set_env_variable for Pipeline task [\#6919](https://github.com/kubeflow/pipelines/pull/6919) +* Add metadata field for importer [\#7112](https://github.com/kubeflow/pipelines/pull/7112) ## Breaking Changes @@ -69,7 +70,7 @@ ## Documentation Updates * N/A -* + # 1.8.9 ## Major Features and Improvements diff --git a/sdk/python/kfp/v2/compiler/pipeline_spec_builder.py b/sdk/python/kfp/v2/compiler/pipeline_spec_builder.py index 7cfdcd791373..152d6511712d 100644 --- a/sdk/python/kfp/v2/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/v2/compiler/pipeline_spec_builder.py @@ -23,7 +23,6 @@ from kfp.v2.components import pipeline_channel from kfp.v2.components import pipeline_task from kfp.v2.components import placeholders -from kfp.v2.components import structures from kfp.v2.components import tasks_group from kfp.v2.components.types import artifact_types from kfp.v2.components.types import type_utils @@ -357,6 +356,11 @@ def build_importer_spec_for_task( type_schema=type_schema, reimport=task.importer_spec.reimport) + if task.importer_spec.metadata: + metadata_protobuf_struct = struct_pb2.Struct() + metadata_protobuf_struct.update(task.importer_spec.metadata) + importer_spec.metadata.CopyFrom(metadata_protobuf_struct) + if isinstance(task.importer_spec.artifact_uri, pipeline_channel.PipelineParameterChannel): importer_spec.artifact_uri.runtime_parameter = 'uri' elif isinstance(task.importer_spec.artifact_uri, str): diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.json index d29ee5f8bd97..476b0db9aa8d 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.json +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.json @@ -163,6 +163,9 @@ "artifactUri": { "constant": "gs://ml-pipeline-playground/shakespeare1.txt" }, + "metadata": { + "key": "value" + }, "typeSchema": { "schemaTitle": "system.Artifact", "schemaVersion": "0.0.1" @@ -192,7 +195,7 @@ "command": [ "sh", "-c", - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.9' && \"$0\" \"$@\"\n", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.10' && \"$0\" \"$@\"\n", "sh", "-ec", "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", @@ -212,7 +215,7 @@ "command": [ "sh", "-c", - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.9' && \"$0\" \"$@\"\n", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.10' && \"$0\" \"$@\"\n", "sh", "-ec", "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", @@ -312,5 +315,5 @@ } }, "schemaVersion": "2.1.0", - "sdkVersion": "kfp-1.8.9" + "sdkVersion": "kfp-1.8.10" } \ No newline at end of file diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.py b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.py index 3564ea2fe103..46d5a12ef6f4 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.py +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.py @@ -50,7 +50,8 @@ def my_pipeline(dataset2: str = 'gs://ml-pipeline-playground/shakespeare2.txt'): importer1 = importer( artifact_uri='gs://ml-pipeline-playground/shakespeare1.txt', artifact_class=Dataset, - reimport=False) + reimport=False, + metadata={'key': 'value'}) train1 = train(dataset=importer1.output) with dsl.Condition(train1.outputs['scalar'] == '123'): diff --git a/sdk/python/kfp/v2/components/importer_node.py b/sdk/python/kfp/v2/components/importer_node.py index 1bd377d6b053..0cfefa9aa623 100644 --- a/sdk/python/kfp/v2/components/importer_node.py +++ b/sdk/python/kfp/v2/components/importer_node.py @@ -13,7 +13,7 @@ # limitations under the License. """Utility function for building Importer Node spec.""" -from typing import Union, Type +from typing import Any, Union, Optional, Type, Mapping from kfp.v2.components import pipeline_task from kfp.v2.components import pipeline_channel @@ -29,7 +29,8 @@ def importer(artifact_uri: Union[pipeline_channel.PipelineParameterChannel, str], artifact_class: Type[artifact_types.Artifact], - reimport: bool = False) -> pipeline_task.PipelineTask: + reimport: bool = False, + metadata: Optional[Mapping[str, Any]] = None) -> pipeline_task.PipelineTask: """dsl.importer for importing an existing artifact. Only for v2 pipeline. Args: @@ -37,6 +38,7 @@ def importer(artifact_uri: Union[pipeline_channel.PipelineParameterChannel, artifact_type_schema: The user specified artifact type schema of the artifact to be imported. reimport: Whether to reimport the artifact. Defaults to False. + metadata: Properties of the artifact. Returns: A PipelineTask instance. @@ -52,7 +54,8 @@ def importer(artifact_uri: Union[pipeline_channel.PipelineParameterChannel, artifact_uri=placeholders.input_parameter_placeholder( INPUT_KEY), type_schema=artifact_class.TYPE_NAME, - reimport=reimport)), + reimport=reimport, + metadata=metadata)), inputs={INPUT_KEY: structures.InputSpec(type='String')}, outputs={OUTPUT_KEY: structures.OutputSpec(type='Artifact')}, )