Skip to content

Commit

Permalink
feat(sdk.v1): add importer_spec metadata to v1 (#7180)
Browse files Browse the repository at this point in the history
* feat(sdk.v1): add importer_spec metadata to v1

* relaese nots
  • Loading branch information
ji-yaqi authored Jan 21, 2022
1 parent 49941a1 commit d3d3c9f
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
2 changes: 2 additions & 0 deletions sdk/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Major Features and Improvements

* Add importer_spec metadata to v1 [\#7180](https://github.com/kubeflow/pipelines/pull/7180)

## Breaking Changes

### For Pipeline Authors
Expand Down
14 changes: 11 additions & 3 deletions sdk/python/kfp/v2/components/importer_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
# limitations under the License.
"""Utility function for building Importer Node spec."""

from typing import Union, Type
from typing import Any, Union, Optional, Type, Mapping
from google.protobuf import struct_pb2

from kfp.dsl import _container_op
from kfp.dsl import _pipeline_param
Expand All @@ -28,6 +29,7 @@
def _build_importer_spec(
artifact_uri: Union[_pipeline_param.PipelineParam, str],
artifact_type_schema: pipeline_spec_pb2.ArtifactTypeSchema,
metadata: Optional[Mapping[str, Any]] = None,
) -> pipeline_spec_pb2.PipelineDeploymentConfig.ImporterSpec:
"""Builds an importer executor spec.
Expand All @@ -47,6 +49,11 @@ def _build_importer_spec(
elif isinstance(artifact_uri, str):
importer_spec.artifact_uri.constant_value.string_value = artifact_uri

if metadata:
metadata_protobuf_struct = struct_pb2.Struct()
metadata_protobuf_struct.update(metadata)
importer_spec.metadata.CopyFrom(metadata_protobuf_struct)

return importer_spec


Expand Down Expand Up @@ -112,7 +119,8 @@ def _build_importer_component_spec(

def importer(artifact_uri: Union[_pipeline_param.PipelineParam, str],
artifact_class: Type[artifact_types.Artifact],
reimport: bool = False) -> _container_op.ContainerOp:
reimport: bool = False,
metadata: Optional[Mapping[str, Any]] = None) -> _container_op.ContainerOp:
"""dsl.importer for importing an existing artifact. Only for v2 pipeline.
Args:
Expand Down Expand Up @@ -154,7 +162,7 @@ def importer(artifact_uri: Union[_pipeline_param.PipelineParam, str],

artifact_type_schema = type_utils.get_artifact_type_schema(artifact_class)
task.importer_spec = _build_importer_spec(
artifact_uri=artifact_uri, artifact_type_schema=artifact_type_schema)
artifact_uri=artifact_uri, artifact_type_schema=artifact_type_schema, metadata=metadata)
task.task_spec = _build_importer_task_spec(
importer_base_name=task.name, artifact_uri=artifact_uri)
task.component_spec = _build_importer_component_spec(
Expand Down
10 changes: 7 additions & 3 deletions sdk/python/kfp/v2/components/importer_node_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ class ImporterNodeTest(parameterized.TestCase):
'gs://artifact',
'artifact_type_schema':
pb.ArtifactTypeSchema(schema_title='system.Dataset'),
'metadata': {'key':'value'},
'expected_result': {
'artifactUri': {
'constantValue': {
'stringValue': 'gs://artifact'
}
},
'metadata': {
'key': 'value'
},
'typeSchema': {
'schemaTitle': 'system.Dataset'
}
Expand All @@ -47,6 +51,7 @@ class ImporterNodeTest(parameterized.TestCase):
_pipeline_param.PipelineParam(name='uri_to_import'),
'artifact_type_schema':
pb.ArtifactTypeSchema(schema_title='system.Model'),
'metadata': {},
'expected_result': {
'artifactUri': {
'runtimeParameter': 'uri'
Expand All @@ -56,12 +61,11 @@ class ImporterNodeTest(parameterized.TestCase):
}
},
})
def test_build_importer_spec(self, input_uri, artifact_type_schema,
expected_result):
def test_build_importer_spec(self, input_uri, artifact_type_schema, metadata, expected_result):
expected_importer_spec = pb.PipelineDeploymentConfig.ImporterSpec()
json_format.ParseDict(expected_result, expected_importer_spec)
importer_spec = importer_node._build_importer_spec(
artifact_uri=input_uri, artifact_type_schema=artifact_type_schema)
artifact_uri=input_uri, artifact_type_schema=artifact_type_schema, metadata=metadata)

self.maxDiff = None
self.assertEqual(expected_importer_spec, importer_spec)
Expand Down

0 comments on commit d3d3c9f

Please sign in to comment.