From 320bb17c094200a876a04769da4f9322b3e33c5e Mon Sep 17 00:00:00 2001 From: chongyouquan <48691403+chongyouquan@users.noreply.github.com> Date: Wed, 24 Aug 2022 15:15:07 -0500 Subject: [PATCH] fix(sdk): fix output type of importer (#8172) * change output type of importer to artifact_class.TYPE_NAME * use the same logic as component_factory._annotation_to_type_struct for determining type_name * format using yapf * fix type_name * format using yapf * add yaml file * changes with regards to PR comments * format pipeline_with_importer_and_gcpc_types.py * remove unused imports * change name of test component * add pipeline_with_importer_and_gcpc_types to config * update yaml file --- .../kfp/compiler/_read_write_test_config.py | 1 + .../pipeline_with_importer_and_gcpc_types.py | 56 ++++++++++++++ ...pipeline_with_importer_and_gcpc_types.yaml | 74 +++++++++++++++++++ sdk/python/kfp/components/importer_node.py | 12 ++- 4 files changed, 140 insertions(+), 3 deletions(-) create mode 100644 sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_importer_and_gcpc_types.py create mode 100644 sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_importer_and_gcpc_types.yaml diff --git a/sdk/python/kfp/compiler/_read_write_test_config.py b/sdk/python/kfp/compiler/_read_write_test_config.py index 7d4ed87d5df..45221f0fa07 100644 --- a/sdk/python/kfp/compiler/_read_write_test_config.py +++ b/sdk/python/kfp/compiler/_read_write_test_config.py @@ -16,6 +16,7 @@ 'pipelines': { 'test_cases': [ 'pipeline_with_importer', + 'pipeline_with_importer_and_gcpc_types', 'pipeline_with_ontology', 'pipeline_with_if_placeholder', 'pipeline_with_concat_placeholder', diff --git a/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_importer_and_gcpc_types.py b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_importer_and_gcpc_types.py new file mode 100644 index 00000000000..c3b1a18ccd4 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_importer_and_gcpc_types.py @@ -0,0 +1,56 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Pipeline using dsl.importer and GCPC types.""" + +from kfp import compiler +from kfp import components +from kfp import dsl +from kfp.dsl import importer + + +class VertexDataset(dsl.Artifact): + """An artifact representing a GCPC Vertex Dataset.""" + TYPE_NAME = 'google.VertexDataset' + + +consumer_op = components.load_component_from_text(""" +name: consumer_op +inputs: + - {name: dataset, type: google.VertexDataset} +implementation: + container: + image: dummy + command: + - cmd + args: + - {inputPath: dataset} +""") + + +@dsl.pipeline( + name='pipeline-with-importer-and-gcpc-type', pipeline_root='dummy_root') +def my_pipeline(): + + importer1 = importer( + artifact_uri='gs://ml-pipeline-playground/shakespeare1.txt', + artifact_class=VertexDataset, + reimport=False, + metadata={'key': 'value'}) + consume1 = consumer_op(dataset=importer1.output) + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=my_pipeline, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_importer_and_gcpc_types.yaml b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_importer_and_gcpc_types.yaml new file mode 100644 index 00000000000..12fed6e66c2 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/pipelines/pipeline_with_importer_and_gcpc_types.yaml @@ -0,0 +1,74 @@ +components: + comp-consumer-op: + executorLabel: exec-consumer-op + inputDefinitions: + artifacts: + dataset: + artifactType: + schemaTitle: google.VertexDataset + schemaVersion: 0.0.1 + comp-importer: + executorLabel: exec-importer + inputDefinitions: + parameters: + uri: + parameterType: STRING + outputDefinitions: + artifacts: + artifact: + artifactType: + schemaTitle: google.VertexDataset + schemaVersion: 0.0.1 +defaultPipelineRoot: dummy_root +deploymentSpec: + executors: + exec-consumer-op: + container: + args: + - '{{$.inputs.artifacts[''dataset''].path}}' + command: + - cmd + image: dummy + exec-importer: + importer: + artifactUri: + constant: gs://ml-pipeline-playground/shakespeare1.txt + metadata: + key: value + typeSchema: + schemaTitle: google.VertexDataset + schemaVersion: 0.0.1 +pipelineInfo: + name: pipeline-with-importer-and-gcpc-type +root: + dag: + tasks: + consumer-op: + cachingOptions: + enableCache: true + componentRef: + name: comp-consumer-op + dependentTasks: + - importer + inputs: + artifacts: + dataset: + taskOutputArtifact: + outputArtifactKey: artifact + producerTask: importer + taskInfo: + name: consumer-op + importer: + cachingOptions: + enableCache: true + componentRef: + name: comp-importer + inputs: + parameters: + uri: + runtimeValue: + constant: gs://ml-pipeline-playground/shakespeare1.txt + taskInfo: + name: importer +schemaVersion: 2.1.0 +sdkVersion: kfp-2.0.0-beta.3 diff --git a/sdk/python/kfp/components/importer_node.py b/sdk/python/kfp/components/importer_node.py index f4bb995df7a..93a119e7223 100644 --- a/sdk/python/kfp/components/importer_node.py +++ b/sdk/python/kfp/components/importer_node.py @@ -54,6 +54,14 @@ def pipeline_with_importer(): reimport=False) train(dataset=importer1.output) """ + if issubclass(artifact_class, artifact_types.Artifact + ) and not artifact_class.TYPE_NAME.startswith('system.'): + # For artifact classes not under the `system` namespace, + # use its TYPE_NAME as-is. + type_name = artifact_class.TYPE_NAME + else: + type_name = artifact_class.__name__ + component_spec = structures.ComponentSpec( name='importer', implementation=structures.Implementation( @@ -64,9 +72,7 @@ def pipeline_with_importer(): reimport=reimport, metadata=metadata)), inputs={INPUT_KEY: structures.InputSpec(type='String')}, - outputs={ - OUTPUT_KEY: structures.OutputSpec(type=artifact_class.__name__) - }, + outputs={OUTPUT_KEY: structures.OutputSpec(type=type_name)}, ) importer = importer_component.ImporterComponent(