Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): add metadata field for importer #7112

Merged
merged 4 commits into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Merge v2 experimental change back to v2 namespace [\#6890](https://github.com/kubeflow/pipelines/pull/6890)
* Add ImporterSpec v2 [\#6917](https://github.com/kubeflow/pipelines/pull/6917)
* Add set_env_variable for Pipeline task [\#6919](https://github.com/kubeflow/pipelines/pull/6919)
* Add metadata field for importer [\#7112](https://github.com/kubeflow/pipelines/pull/7112)

## Breaking Changes

Expand Down Expand Up @@ -69,7 +70,7 @@
## Documentation Updates

* N/A
*

# 1.8.9

## Major Features and Improvements
Expand Down
6 changes: 5 additions & 1 deletion sdk/python/kfp/v2/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from kfp.v2.components import pipeline_channel
from kfp.v2.components import pipeline_task
from kfp.v2.components import placeholders
from kfp.v2.components import structures
from kfp.v2.components import tasks_group
from kfp.v2.components.types import artifact_types
from kfp.v2.components.types import type_utils
Expand Down Expand Up @@ -357,6 +356,11 @@ def build_importer_spec_for_task(
type_schema=type_schema,
reimport=task.importer_spec.reimport)

if task.importer_spec.metadata:
metadata_protobuf_struct = struct_pb2.Struct()
metadata_protobuf_struct.update(task.importer_spec.metadata)
importer_spec.metadata.CopyFrom(metadata_protobuf_struct)

if isinstance(task.importer_spec.artifact_uri, pipeline_channel.PipelineParameterChannel):
importer_spec.artifact_uri.runtime_parameter = 'uri'
elif isinstance(task.importer_spec.artifact_uri, str):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@
"artifactUri": {
"constant": "gs://ml-pipeline-playground/shakespeare1.txt"
},
"metadata": {
"key": "value"
},
"typeSchema": {
"schemaTitle": "system.Artifact",
"schemaVersion": "0.0.1"
Expand Down Expand Up @@ -192,7 +195,7 @@
"command": [
"sh",
"-c",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.9' && \"$0\" \"$@\"\n",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.10' && \"$0\" \"$@\"\n",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
Expand All @@ -212,7 +215,7 @@
"command": [
"sh",
"-c",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.9' && \"$0\" \"$@\"\n",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.10' && \"$0\" \"$@\"\n",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
Expand Down Expand Up @@ -312,5 +315,5 @@
}
},
"schemaVersion": "2.1.0",
"sdkVersion": "kfp-1.8.9"
"sdkVersion": "kfp-1.8.10"
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ def my_pipeline(dataset2: str = 'gs://ml-pipeline-playground/shakespeare2.txt'):
importer1 = importer(
artifact_uri='gs://ml-pipeline-playground/shakespeare1.txt',
artifact_class=Dataset,
reimport=False)
reimport=False,
metadata={'key': 'value'})
train1 = train(dataset=importer1.output)

with dsl.Condition(train1.outputs['scalar'] == '123'):
Expand Down
9 changes: 6 additions & 3 deletions sdk/python/kfp/v2/components/importer_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
"""Utility function for building Importer Node spec."""

from typing import Union, Type
from typing import Any, Union, Optional, Type, Mapping

from kfp.v2.components import pipeline_task
from kfp.v2.components import pipeline_channel
Expand All @@ -29,14 +29,16 @@
def importer(artifact_uri: Union[pipeline_channel.PipelineParameterChannel,
str],
artifact_class: Type[artifact_types.Artifact],
reimport: bool = False) -> pipeline_task.PipelineTask:
reimport: bool = False,
metadata: Optional[Mapping[str, Any]] = None) -> pipeline_task.PipelineTask:
"""dsl.importer for importing an existing artifact. Only for v2 pipeline.

Args:
artifact_uri: The artifact uri to import from.
artifact_class: The user specified artifact class of the
artifact to be imported.
reimport: Whether to reimport the artifact. Defaults to False.
metadata: Properties of the artifact.

Returns:
A PipelineTask instance.
Expand All @@ -52,7 +54,8 @@ def importer(artifact_uri: Union[pipeline_channel.PipelineParameterChannel,
artifact_uri=placeholders.input_parameter_placeholder(
INPUT_KEY),
type_schema=artifact_class.TYPE_NAME,
reimport=reimport)),
reimport=reimport,
metadata=metadata)),
inputs={INPUT_KEY: structures.InputSpec(type='String')},
outputs={OUTPUT_KEY: structures.OutputSpec(type='Artifact')},
)
Expand Down