Skip to content

Commit

Permalink
Support GCPC types with function-based component
Browse files Browse the repository at this point in the history
  • Loading branch information
chensun committed Oct 7, 2021
1 parent 74c7773 commit 58de55d
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 1 deletion.
3 changes: 3 additions & 0 deletions sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ def test_v2_component_with_optional_inputs(self):
def test_experimental_v2_component(self):
self._test_compile_py_to_json('experimental_v2_component')

def test_pipeline_with_gcpc_types(self):
self._test_compile_py_to_json('pipeline_with_gcpc_types')


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
{
"pipelineSpec": {
"components": {
"comp-consumer-op": {
"executorLabel": "exec-consumer-op",
"inputDefinitions": {
"artifacts": {
"model": {
"artifactType": {
"schemaTitle": "google.VertexModel",
"schemaVersion": "0.0.1"
}
}
}
}
},
"comp-producer": {
"executorLabel": "exec-producer",
"outputDefinitions": {
"artifacts": {
"model": {
"artifactType": {
"schemaTitle": "google.VertexModel",
"schemaVersion": "0.0.1"
}
}
}
}
}
},
"deploymentSpec": {
"executors": {
"exec-consumer-op": {
"container": {
"args": [
"--executor_input",
"{{$}}",
"--function_to_execute",
"consumer_op"
],
"command": [
"sh",
"-c",
"(python3 -m ensurepip || python3 -m ensurepip --user) && (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.4' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.4' --user) && \"$0\" \"$@\"",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
"\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef consumer_op(model: Input[VertexModel]):\n pass\n\n"
],
"image": "python:3.7"
}
},
"exec-producer": {
"container": {
"args": [
"{{$.outputs.artifacts['model'].path}}"
],
"command": [
"cmd"
],
"image": "dummy"
}
}
}
},
"pipelineInfo": {
"name": "pipeline-with-gcpc-types"
},
"root": {
"dag": {
"tasks": {
"consumer-op": {
"cachingOptions": {
"enableCache": true
},
"componentRef": {
"name": "comp-consumer-op"
},
"dependentTasks": [
"producer"
],
"inputs": {
"artifacts": {
"model": {
"taskOutputArtifact": {
"outputArtifactKey": "model",
"producerTask": "producer"
}
}
}
},
"taskInfo": {
"name": "consumer-op"
}
},
"producer": {
"cachingOptions": {
"enableCache": true
},
"componentRef": {
"name": "comp-producer"
},
"taskInfo": {
"name": "producer"
}
}
}
}
},
"schemaVersion": "2.0.0",
"sdkVersion": "kfp-1.8.4"
},
"runtimeConfig": {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import components
from kfp.v2.dsl import component, Input, Output
from kfp.v2 import compiler
from kfp.v2 import dsl


class VertexModel(dsl.Artifact):
TYPE_NAME = 'google.VertexModel'


producer_op = components.load_component_from_text("""
name: producer
outputs:
- {name: model, type: google.VertexModel}
implementation:
container:
image: dummy
command:
- cmd
args:
- {outputPath: model}
""")


@component
def consumer_op(model: Input[VertexModel]):
pass


@dsl.pipeline(name='pipeline-with-gcpc-types')
def my_pipeline():
consumer_op(model=producer_op().outputs['model'])


if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
package_path=__file__.replace('.py', '.json'))
8 changes: 7 additions & 1 deletion sdk/python/kfp/v2/components/component_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ def _annotation_to_type_struct(annotation):
type_struct = _data_passing.get_canonical_type_name_for_type(annotation)
if type_struct:
return type_struct
type_name = str(annotation.__name__)
if issubclass(annotation, artifact_types.Artifact
) and not annotation.TYPE_NAME.startswith('system.'):
# For artifact classes not under the `system` namespace,
# use its TYPE_NAME as-is.
type_name = annotation.TYPE_NAME
else:
type_name = str(annotation.__name__)
elif hasattr(
annotation, '__forward_arg__'
): # Handling typing.ForwardRef('Type_name') (the name was _ForwardRef in python 3.5-3.6)
Expand Down

0 comments on commit 58de55d

Please sign in to comment.