diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index 6497b2337c4..4f301354753 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -2,6 +2,8 @@ ## Major Features and Improvements +* Add importer_spec metadata to v1 [\#7180](https://github.com/kubeflow/pipelines/pull/7180) + ## Breaking Changes ### For Pipeline Authors diff --git a/sdk/python/kfp/v2/components/importer_node.py b/sdk/python/kfp/v2/components/importer_node.py index 38ccbc8b41d..94b17d8a4b7 100644 --- a/sdk/python/kfp/v2/components/importer_node.py +++ b/sdk/python/kfp/v2/components/importer_node.py @@ -13,7 +13,8 @@ # limitations under the License. """Utility function for building Importer Node spec.""" -from typing import Union, Type +from typing import Any, Union, Optional, Type, Mapping +from google.protobuf import struct_pb2 from kfp.dsl import _container_op from kfp.dsl import _pipeline_param @@ -28,6 +29,7 @@ def _build_importer_spec( artifact_uri: Union[_pipeline_param.PipelineParam, str], artifact_type_schema: pipeline_spec_pb2.ArtifactTypeSchema, + metadata: Optional[Mapping[str, Any]] = None, ) -> pipeline_spec_pb2.PipelineDeploymentConfig.ImporterSpec: """Builds an importer executor spec. @@ -47,6 +49,11 @@ def _build_importer_spec( elif isinstance(artifact_uri, str): importer_spec.artifact_uri.constant_value.string_value = artifact_uri + if metadata: + metadata_protobuf_struct = struct_pb2.Struct() + metadata_protobuf_struct.update(metadata) + importer_spec.metadata.CopyFrom(metadata_protobuf_struct) + return importer_spec @@ -112,7 +119,8 @@ def _build_importer_component_spec( def importer(artifact_uri: Union[_pipeline_param.PipelineParam, str], artifact_class: Type[artifact_types.Artifact], - reimport: bool = False) -> _container_op.ContainerOp: + reimport: bool = False, + metadata: Optional[Mapping[str, Any]] = None) -> _container_op.ContainerOp: """dsl.importer for importing an existing artifact. Only for v2 pipeline. Args: @@ -154,7 +162,7 @@ def importer(artifact_uri: Union[_pipeline_param.PipelineParam, str], artifact_type_schema = type_utils.get_artifact_type_schema(artifact_class) task.importer_spec = _build_importer_spec( - artifact_uri=artifact_uri, artifact_type_schema=artifact_type_schema) + artifact_uri=artifact_uri, artifact_type_schema=artifact_type_schema, metadata=metadata) task.task_spec = _build_importer_task_spec( importer_base_name=task.name, artifact_uri=artifact_uri) task.component_spec = _build_importer_component_spec( diff --git a/sdk/python/kfp/v2/components/importer_node_test.py b/sdk/python/kfp/v2/components/importer_node_test.py index 041591b2742..76fabaa9d76 100644 --- a/sdk/python/kfp/v2/components/importer_node_test.py +++ b/sdk/python/kfp/v2/components/importer_node_test.py @@ -30,12 +30,16 @@ class ImporterNodeTest(parameterized.TestCase): 'gs://artifact', 'artifact_type_schema': pb.ArtifactTypeSchema(schema_title='system.Dataset'), + 'metadata': {'key':'value'}, 'expected_result': { 'artifactUri': { 'constantValue': { 'stringValue': 'gs://artifact' } }, + 'metadata': { + 'key': 'value' + }, 'typeSchema': { 'schemaTitle': 'system.Dataset' } @@ -47,6 +51,7 @@ class ImporterNodeTest(parameterized.TestCase): _pipeline_param.PipelineParam(name='uri_to_import'), 'artifact_type_schema': pb.ArtifactTypeSchema(schema_title='system.Model'), + 'metadata': {}, 'expected_result': { 'artifactUri': { 'runtimeParameter': 'uri' @@ -56,12 +61,11 @@ class ImporterNodeTest(parameterized.TestCase): } }, }) - def test_build_importer_spec(self, input_uri, artifact_type_schema, - expected_result): + def test_build_importer_spec(self, input_uri, artifact_type_schema, metadata, expected_result): expected_importer_spec = pb.PipelineDeploymentConfig.ImporterSpec() json_format.ParseDict(expected_result, expected_importer_spec) importer_spec = importer_node._build_importer_spec( - artifact_uri=input_uri, artifact_type_schema=artifact_type_schema) + artifact_uri=input_uri, artifact_type_schema=artifact_type_schema, metadata=metadata) self.maxDiff = None self.assertEqual(expected_importer_spec, importer_spec)