feat(sdk): enable dynamic importer metadata #7660

79 changes: 71 additions & 8 deletions sdk/python/kfp/components/importer_node.py
@@ -1,4 +1,4 @@
# Copyright 2020 The Kubeflow Authors
# Copyright 2020-2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,18 +13,20 @@
# limitations under the License.
"""Utility function for building Importer Node spec."""

from typing import Any, Mapping, Optional, Type, Union
from typing import Any, Dict, Mapping, Optional, Type, Union

from kfp.components import importer_component
from kfp.components import pipeline_channel
from kfp.components import pipeline_task
from kfp.components import placeholders
from kfp.components import structures
from kfp.components import utils
from kfp.components.types import artifact_types
from kfp.components.types import type_utils

INPUT_KEY = 'uri'
URI_KEY = 'uri'
OUTPUT_KEY = 'artifact'
METADATA_KEY = 'metadata'


def importer(
@@ -56,18 +58,80 @@ def pipeline_with_importer():
train(dataset=importer1.output)
"""

component_inputs: Dict[str, structures.InputSpec] = {}
call_inputs: Dict[str, Any] = {}

def traverse_dict_and_create_metadata_inputs(d: Any) -> Any:
if isinstance(d, pipeline_channel.PipelineParameterChannel):
reversed_call_inputs = {
pipeline_param_chan: name
for name, pipeline_param_chan in call_inputs.items()
}

# minimize the importer spec interface by not creating a new
# InputSpec/parameter if the same input is used in multiple
# places in the metadata
unique_name = reversed_call_inputs.get(
d,
utils.make_name_unique_by_adding_index(
METADATA_KEY,
list(call_inputs),
'-',
),
)

call_inputs[unique_name] = d
component_inputs[unique_name] = structures.InputSpec(
type=d.channel_type)

return placeholders.InputValuePlaceholder(
input_name=unique_name)._to_placeholder_string()

elif isinstance(d, dict):
# use an explicit loop instead of a dict comprehension so that compiled output is identical across Python versions
res = {}
for k, v in d.items():
new_k = traverse_dict_and_create_metadata_inputs(k)
new_v = traverse_dict_and_create_metadata_inputs(v)
res[new_k] = new_v
return res

elif isinstance(d, list):
return [traverse_dict_and_create_metadata_inputs(el) for el in d]

elif isinstance(d, str):
# extract pipeline channels from f-strings, if any
pipeline_channels = pipeline_channel.extract_pipeline_channels_from_any(
d)

# pass the channel back into the recursive function to create the placeholder, component inputs, and call inputs, then replace the channel with the placeholder
for channel in pipeline_channels:
input_placeholder = traverse_dict_and_create_metadata_inputs(
channel)
d = d.replace(channel.pattern, input_placeholder)
return d

else:
return d
Member
Not a blocker for this PR, as I still don't quite follow the practical usage of an importer node when a pipeline accepts artifact inputs. But I think there's a bug here in the isinstance(d, str) case: users may embed a PipelineChannel into a string, e.g. f'{task1.output}-{name}', and I think we are missing that parse-and-find logic here.

Member Author

Not a blocker for this PR, as I still don't quite follow the practical usage of an importer node when pipeline accepts artifact inputs.

Agreed -- let's discuss offline about where we can raise this consideration.

I think we are missing that parse-and-find logic here

Updated
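
A minimal, hypothetical sketch of that case (not part of this diff), assuming the parse-and-find logic added in the isinstance(d, str) branch above; the generated input names ('metadata', 'metadata-2') follow the naming convention exercised in the tests below:

from kfp import dsl
from kfp.dsl import Dataset
from kfp.dsl import importer


@dsl.component
def make_string() -> str:
    return 'some-string'


@dsl.pipeline()
def fstring_metadata_pipeline(name: str = 'default-name'):
    task1 = make_string()
    # Each channel pattern embedded in the f-string is replaced with an input
    # placeholder at compile time, yielding roughly
    # "{{$.inputs.parameters['metadata']}}-{{$.inputs.parameters['metadata-2']}}".
    importer(
        artifact_uri='gs://my_bucket/data.txt',
        artifact_class=Dataset,
        reimport=False,
        metadata={'combined': f'{task1.output}-{name}'})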


metadata_with_placeholders = traverse_dict_and_create_metadata_inputs(
metadata)

component_spec = structures.ComponentSpec(
name='importer',
implementation=structures.Implementation(
importer=structures.ImporterSpec(
artifact_uri=placeholders.InputValuePlaceholder(
INPUT_KEY)._to_placeholder_string(),
URI_KEY)._to_placeholder_string(),
schema_title=type_utils.create_bundled_artifact_type(
artifact_class.schema_title, artifact_class.schema_version),
schema_version=artifact_class.schema_version,
reimport=reimport,
metadata=metadata)),
inputs={INPUT_KEY: structures.InputSpec(type='String')},
metadata=metadata_with_placeholders)),
inputs={
URI_KEY: structures.InputSpec(type='String'),
**component_inputs
},
outputs={
OUTPUT_KEY:
structures.OutputSpec(
@@ -76,7 +140,6 @@ def pipeline_with_importer():
artifact_class.schema_version))
},
)

importer = importer_component.ImporterComponent(
component_spec=component_spec)
return importer(uri=artifact_uri)
return importer(uri=artifact_uri, **call_inputs)
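
For reference, a simplified standalone sketch (an assumption-laden illustration, not KFP code) of the dedup-and-rename pattern used by traverse_dict_and_create_metadata_inputs above, with plain strings standing in for pipeline channels and a stand-in for utils.make_name_unique_by_adding_index:

from typing import Any, Dict, List


def make_unique(name: str, used: List[str], delimiter: str) -> str:
    # Stand-in for kfp.components.utils.make_name_unique_by_adding_index
    # (assumed behaviour: append -2, -3, ... until the name is unused).
    unique, index = name, 1
    while unique in used:
        index += 1
        unique = f'{name}{delimiter}{index}'
    return unique


call_inputs: Dict[str, Any] = {}


def register_metadata_input(value: Any) -> str:
    # Reuse the existing input name when the same value appears again in the
    # metadata; otherwise mint 'metadata', 'metadata-2', ...
    reversed_call_inputs = {v: k for k, v in call_inputs.items()}
    name = reversed_call_inputs.get(
        value, make_unique('metadata', list(call_inputs), '-'))
    call_inputs[name] = value
    return name


print(register_metadata_input('channel-a'))  # metadata
print(register_metadata_input('channel-b'))  # metadata-2
print(register_metadata_input('channel-a'))  # metadata (input reused)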
186 changes: 186 additions & 0 deletions sdk/python/kfp/components/importer_node_test.py
@@ -0,0 +1,186 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest

from kfp import dsl
from kfp.components import importer_node
from kfp.components.types.artifact_types import Dataset


class TestImporterSupportsDynamicMetadata(unittest.TestCase):

def test_dynamic_dict_element_from_pipeline_param(self):

@dsl.pipeline()
def my_pipeline(meta_inp: str):

dataset = importer_node.importer(
'gs://my_bucket',
Dataset,
reimport=True,
metadata={
'string': meta_inp,
'string-2': meta_inp
})

pipeline_spec = my_pipeline.pipeline_spec
input_keys = list(pipeline_spec.components['comp-importer']
.input_definitions.parameters.keys())
self.assertIn('metadata', input_keys)
deployment_spec = pipeline_spec.deployment_spec.fields[
'executors'].struct_value.fields['exec-importer']
metadata = deployment_spec.struct_value.fields[
'importer'].struct_value.fields['metadata']
self.assertEqual(metadata.struct_value.fields['string'].string_value,
"{{$.inputs.parameters['metadata']}}")
self.assertEqual(metadata.struct_value.fields['string-2'].string_value,
"{{$.inputs.parameters['metadata']}}")

def test_dynamic_list_element_from_pipeline_param(self):

@dsl.pipeline()
def my_pipeline(meta_inp1: str, meta_inp2: int):

dataset = importer_node.importer(
'gs://my_bucket',
Dataset,
reimport=True,
metadata={
'outer_key': [meta_inp1, meta_inp2],
meta_inp1: meta_inp1
})

pipeline_spec = my_pipeline.pipeline_spec
input_keys = list(pipeline_spec.components['comp-importer']
.input_definitions.parameters.keys())
self.assertIn('metadata', input_keys)
deployment_spec = pipeline_spec.deployment_spec.fields[
'executors'].struct_value.fields['exec-importer']
metadata = deployment_spec.struct_value.fields[
'importer'].struct_value.fields['metadata']
self.assertEqual(
metadata.struct_value.fields['outer_key'].list_value.values[0]
.string_value, "{{$.inputs.parameters['metadata']}}")
self.assertEqual(
metadata.struct_value.fields['outer_key'].list_value.values[1]
.string_value, "{{$.inputs.parameters['metadata-2']}}")
self.assertEqual(
metadata.struct_value.fields["{{$.inputs.parameters['metadata']}}"]
.string_value, "{{$.inputs.parameters['metadata']}}")

def test_dynamic_dict_element_from_task_output(self):

@dsl.component
def string_task(string: str) -> str:
return 'string'

@dsl.pipeline()
def my_pipeline():
task1 = string_task(string='string1')
task2 = string_task(string='string2')
dataset = importer_node.importer(
'gs://my_bucket',
Dataset,
reimport=True,
metadata={
'string-1': task1.output,
'string-2': task2.output
})

pipeline_spec = my_pipeline.pipeline_spec
input_keys = list(pipeline_spec.components['comp-importer']
.input_definitions.parameters.keys())
self.assertIn('metadata', input_keys)
self.assertIn('metadata-2', input_keys)

deployment_spec = pipeline_spec.deployment_spec.fields[
'executors'].struct_value.fields['exec-importer']
metadata = deployment_spec.struct_value.fields[
'importer'].struct_value.fields['metadata']
self.assertEqual(metadata.struct_value.fields['string-1'].string_value,
"{{$.inputs.parameters['metadata']}}")
self.assertEqual(metadata.struct_value.fields['string-2'].string_value,
"{{$.inputs.parameters['metadata-2']}}")

def test_dynamic_list_element_from_task_output(self):

@dsl.component
def string_task() -> str:
return 'string'

@dsl.pipeline()
def my_pipeline():
task = string_task()
dataset = importer_node.importer(
'gs://my_bucket',
Dataset,
reimport=True,
metadata={
'outer_key': [task.output, task.output],
task.output: task.output
})

pipeline_spec = my_pipeline.pipeline_spec
input_keys = list(pipeline_spec.components['comp-importer']
.input_definitions.parameters.keys())
self.assertIn('metadata', input_keys)
self.assertNotIn('metadata-2', input_keys)
deployment_spec = pipeline_spec.deployment_spec.fields[
'executors'].struct_value.fields['exec-importer']
metadata = deployment_spec.struct_value.fields[
'importer'].struct_value.fields['metadata']
self.assertEqual(
metadata.struct_value.fields['outer_key'].list_value.values[0]
.string_value, "{{$.inputs.parameters['metadata']}}")
self.assertEqual(
metadata.struct_value.fields['outer_key'].list_value.values[1]
.string_value, "{{$.inputs.parameters['metadata']}}")
self.assertEqual(
metadata.struct_value.fields["{{$.inputs.parameters['metadata']}}"]
.string_value, "{{$.inputs.parameters['metadata']}}")

def test_dynamic_fstring_in_metadata(self):

@dsl.component
def string_task() -> str:
return 'string'

@dsl.pipeline()
def my_pipeline(integer: int = 1):
task = string_task()
dataset = importer_node.importer(
'gs://my_bucket',
Dataset,
reimport=True,
metadata={
f'prefix1-{integer}': f'prefix2-{task.output}',
'key': 'value'
})

pipeline_spec = my_pipeline.pipeline_spec
input_keys = list(pipeline_spec.components['comp-importer']
.input_definitions.parameters.keys())
self.assertIn('metadata', input_keys)
self.assertIn('metadata-2', input_keys)
self.assertNotIn('metadata-3', input_keys)
deployment_spec = pipeline_spec.deployment_spec.fields[
'executors'].struct_value.fields['exec-importer']
metadata = deployment_spec.struct_value.fields[
'importer'].struct_value.fields['metadata']
self.assertEqual(
metadata.struct_value.fields[
"prefix1-{{$.inputs.parameters[\'metadata\']}}"].string_value,
"prefix2-{{$.inputs.parameters[\'metadata-2\']}}")
self.assertEqual(metadata.struct_value.fields['key'].string_value,
'value')
@@ -0,0 +1,67 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline using dsl.importer, with dynamic metadata."""

from kfp import compiler
from kfp import dsl
from kfp.dsl import Dataset
from kfp.dsl import importer

DEFAULT_ARTIFACT_URI = 'gs://ml-pipeline-playground/shakespeare1.txt'
DEFAULT_IMAGE_URI = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest'


@dsl.component
def make_name(name: str) -> str:
return name


@dsl.pipeline(name='pipeline-with-importer', pipeline_root='dummy_root')
def my_pipeline(name: str = 'default-name',
int_input: int = 1,
pipeline_input_artifact_uri: str = DEFAULT_ARTIFACT_URI,
pipeline_input_image_uri: str = DEFAULT_IMAGE_URI):

importer1 = importer(
artifact_uri=pipeline_input_artifact_uri,
artifact_class=Dataset,
reimport=False,
metadata={
'name': [name, 'alias-name'],
'containerSpec': {
'imageUri': pipeline_input_image_uri
}
})

make_name_op = make_name(name='a-different-name')

importer2 = importer(
artifact_uri=DEFAULT_ARTIFACT_URI,
artifact_class=Dataset,
reimport=False,
metadata={
'name': f'prefix-{make_name_op.output}',
'list-of-data': [make_name_op.output, name, int_input],
make_name_op.output: make_name_op.output,
name: DEFAULT_IMAGE_URI,
'containerSpec': {
'imageUri': DEFAULT_IMAGE_URI
}
})


if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
package_path=__file__.replace('.py', '.yaml'))