diff --git a/sdk/python/kfp/components/importer_node.py b/sdk/python/kfp/components/importer_node.py
index 3c1dba652cf..17322872a37 100644
--- a/sdk/python/kfp/components/importer_node.py
+++ b/sdk/python/kfp/components/importer_node.py
@@ -1,4 +1,4 @@
-# Copyright 2020 The Kubeflow Authors
+# Copyright 2020-2022 The Kubeflow Authors
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,18 +13,20 @@
 # limitations under the License.
 """Utility function for building Importer Node spec."""
 
-from typing import Any, Mapping, Optional, Type, Union
+from typing import Any, Dict, Mapping, Optional, Type, Union
 
 from kfp.components import importer_component
 from kfp.components import pipeline_channel
 from kfp.components import pipeline_task
 from kfp.components import placeholders
 from kfp.components import structures
+from kfp.components import utils
 from kfp.components.types import artifact_types
 from kfp.components.types import type_utils
 
-INPUT_KEY = 'uri'
+URI_KEY = 'uri'
 OUTPUT_KEY = 'artifact'
+METADATA_KEY = 'metadata'
 
 
 def importer(
@@ -56,18 +58,80 @@ def pipeline_with_importer():
 
             train(dataset=importer1.output)
     """
+    component_inputs: Dict[str, structures.InputSpec] = {}
+    call_inputs: Dict[str, Any] = {}
+
+    def traverse_dict_and_create_metadata_inputs(d: Any) -> Any:
+        if isinstance(d, pipeline_channel.PipelineParameterChannel):
+            reversed_call_inputs = {
+                pipeline_param_chan: name
+                for name, pipeline_param_chan in call_inputs.items()
+            }
+
+            # minimize the importer spec interface by reusing an existing
+            # InputSpec/parameter if the same input is used in multiple
+            # places in the metadata
+            unique_name = reversed_call_inputs.get(
+                d,
+                utils.make_name_unique_by_adding_index(
+                    METADATA_KEY,
+                    list(call_inputs),
+                    '-',
+                ),
+            )
+
+            call_inputs[unique_name] = d
+            component_inputs[unique_name] = structures.InputSpec(
+                type=d.channel_type)
+
+            return placeholders.InputValuePlaceholder(
+                input_name=unique_name)._to_placeholder_string()
+
+        elif isinstance(d, dict):
+            # use an explicit loop instead of a dict comprehension so the
+            # compiled spec is identical across Python versions
+            res = {}
+            for k, v in d.items():
+                new_k = traverse_dict_and_create_metadata_inputs(k)
+                new_v = traverse_dict_and_create_metadata_inputs(v)
+                res[new_k] = new_v
+            return res
+
+        elif isinstance(d, list):
+            return [traverse_dict_and_create_metadata_inputs(el) for el in d]
+
+        elif isinstance(d, str):
+            # extract pipeline channels from f-strings, if any
+            pipeline_channels = pipeline_channel.extract_pipeline_channels_from_any(
+                d)
+
+            # pass each channel back into the recursive function to create
+            # the placeholder, component inputs, and call inputs, then
+            # replace the channel with the placeholder
+            for channel in pipeline_channels:
+                input_placeholder = traverse_dict_and_create_metadata_inputs(
+                    channel)
+                d = d.replace(channel.pattern, input_placeholder)
+            return d
+
+        else:
+            return d
+
+    metadata_with_placeholders = traverse_dict_and_create_metadata_inputs(
+        metadata)
+
     component_spec = structures.ComponentSpec(
         name='importer',
         implementation=structures.Implementation(
             importer=structures.ImporterSpec(
                 artifact_uri=placeholders.InputValuePlaceholder(
-                    INPUT_KEY)._to_placeholder_string(),
+                    URI_KEY)._to_placeholder_string(),
                 schema_title=type_utils.create_bundled_artifact_type(
                     artifact_class.schema_title, artifact_class.schema_version),
                 schema_version=artifact_class.schema_version,
                 reimport=reimport,
-                metadata=metadata)),
-        inputs={INPUT_KEY: structures.InputSpec(type='String')},
+                metadata=metadata_with_placeholders)),
+        inputs={
+            URI_KEY: structures.InputSpec(type='String'),
+            **component_inputs
+        },
         outputs={
             OUTPUT_KEY:
                 structures.OutputSpec(
@@ -76,7 +140,6 @@ def pipeline_with_importer():
                         artifact_class.schema_version))
         },
     )
-
     importer = importer_component.ImporterComponent(
         component_spec=component_spec)
-    return importer(uri=artifact_uri)
+    return importer(uri=artifact_uri, **call_inputs)
diff --git a/sdk/python/kfp/components/importer_node_test.py b/sdk/python/kfp/components/importer_node_test.py
new file mode 100644
index 00000000000..d6066b5a4da
--- /dev/null
+++ b/sdk/python/kfp/components/importer_node_test.py
@@ -0,0 +1,186 @@
+# Copyright 2022 The Kubeflow Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+
+from kfp import dsl
+from kfp.components import importer_node
+from kfp.components.types.artifact_types import Dataset
+
+
+class TestImporterSupportsDynamicMetadata(unittest.TestCase):
+
+    def test_dynamic_dict_element_from_pipeline_param(self):
+
+        @dsl.pipeline()
+        def my_pipeline(meta_inp: str):
+
+            dataset = importer_node.importer(
+                'gs://my_bucket',
+                Dataset,
+                reimport=True,
+                metadata={
+                    'string': meta_inp,
+                    'string-2': meta_inp
+                })
+
+        pipeline_spec = my_pipeline.pipeline_spec
+        input_keys = list(pipeline_spec.components['comp-importer']
+                          .input_definitions.parameters.keys())
+        self.assertIn('metadata', input_keys)
+        deployment_spec = pipeline_spec.deployment_spec.fields[
+            'executors'].struct_value.fields['exec-importer']
+        metadata = deployment_spec.struct_value.fields[
+            'importer'].struct_value.fields['metadata']
+        self.assertEqual(metadata.struct_value.fields['string'].string_value,
+                         "{{$.inputs.parameters['metadata']}}")
+        self.assertEqual(metadata.struct_value.fields['string-2'].string_value,
+                         "{{$.inputs.parameters['metadata']}}")
+
+    def test_dynamic_list_element_from_pipeline_param(self):
+
+        @dsl.pipeline()
+        def my_pipeline(meta_inp1: str, meta_inp2: int):
+
+            dataset = importer_node.importer(
+                'gs://my_bucket',
+                Dataset,
+                reimport=True,
+                metadata={
+                    'outer_key': [meta_inp1, meta_inp2],
+                    meta_inp1: meta_inp1
+                })
+
+        pipeline_spec = my_pipeline.pipeline_spec
+        input_keys = list(pipeline_spec.components['comp-importer']
+                          .input_definitions.parameters.keys())
+        self.assertIn('metadata', input_keys)
+        deployment_spec = pipeline_spec.deployment_spec.fields[
+            'executors'].struct_value.fields['exec-importer']
+        metadata = deployment_spec.struct_value.fields[
+            'importer'].struct_value.fields['metadata']
+        self.assertEqual(
+            metadata.struct_value.fields['outer_key'].list_value.values[0]
+            .string_value, "{{$.inputs.parameters['metadata']}}")
+        self.assertEqual(
+            metadata.struct_value.fields['outer_key'].list_value.values[1]
+            .string_value, "{{$.inputs.parameters['metadata-2']}}")
+        self.assertEqual(
+            metadata.struct_value.fields["{{$.inputs.parameters['metadata']}}"]
+            .string_value, "{{$.inputs.parameters['metadata']}}")
+
+    def test_dynamic_dict_element_from_task_output(self):
+
+        @dsl.component
+        def string_task(string: str) -> str:
+            return 'string'
+
+        @dsl.pipeline()
+        def my_pipeline():
+            task1 = string_task(string='string1')
+            task2 = string_task(string='string2')
+            dataset = importer_node.importer(
+                'gs://my_bucket',
+                Dataset,
+                reimport=True,
+                metadata={
+                    'string-1': task1.output,
+                    'string-2': task2.output
+                })
+
+        pipeline_spec = my_pipeline.pipeline_spec
+        input_keys = list(pipeline_spec.components['comp-importer']
+                          .input_definitions.parameters.keys())
+        self.assertIn('metadata', input_keys)
+        self.assertIn('metadata-2', input_keys)
+
+        deployment_spec = pipeline_spec.deployment_spec.fields[
+            'executors'].struct_value.fields['exec-importer']
+        metadata = deployment_spec.struct_value.fields[
+            'importer'].struct_value.fields['metadata']
+        self.assertEqual(metadata.struct_value.fields['string-1'].string_value,
+                         "{{$.inputs.parameters['metadata']}}")
+        self.assertEqual(metadata.struct_value.fields['string-2'].string_value,
+                         "{{$.inputs.parameters['metadata-2']}}")
+
+    def test_dynamic_list_element_from_task_output(self):
+
+        @dsl.component
+        def string_task() -> str:
+            return 'string'
+
+        @dsl.pipeline()
+        def my_pipeline():
+            task = string_task()
+            dataset = importer_node.importer(
+                'gs://my_bucket',
+                Dataset,
+                reimport=True,
+                metadata={
+                    'outer_key': [task.output, task.output],
+                    task.output: task.output
+                })
+
+        pipeline_spec = my_pipeline.pipeline_spec
+        input_keys = list(pipeline_spec.components['comp-importer']
+                          .input_definitions.parameters.keys())
+        self.assertIn('metadata', input_keys)
+        self.assertNotIn('metadata-2', input_keys)
+        deployment_spec = pipeline_spec.deployment_spec.fields[
+            'executors'].struct_value.fields['exec-importer']
+        metadata = deployment_spec.struct_value.fields[
+            'importer'].struct_value.fields['metadata']
+        self.assertEqual(
+            metadata.struct_value.fields['outer_key'].list_value.values[0]
+            .string_value, "{{$.inputs.parameters['metadata']}}")
+        self.assertEqual(
+            metadata.struct_value.fields['outer_key'].list_value.values[1]
+            .string_value, "{{$.inputs.parameters['metadata']}}")
+        self.assertEqual(
+            metadata.struct_value.fields["{{$.inputs.parameters['metadata']}}"]
+            .string_value, "{{$.inputs.parameters['metadata']}}")
+
+    def test_dynamic_fstring_in_metadata(self):
+
+        @dsl.component
+        def string_task() -> str:
+            return 'string'
+
+        @dsl.pipeline()
+        def my_pipeline(integer: int = 1):
+            task = string_task()
+            dataset = importer_node.importer(
+                'gs://my_bucket',
+                Dataset,
+                reimport=True,
+                metadata={
+                    f'prefix1-{integer}': f'prefix2-{task.output}',
+                    'key': 'value'
+                })
+
+        pipeline_spec = my_pipeline.pipeline_spec
+        input_keys = list(pipeline_spec.components['comp-importer']
+                          .input_definitions.parameters.keys())
+        self.assertIn('metadata', input_keys)
+        self.assertIn('metadata-2', input_keys)
+        self.assertNotIn('metadata-3', input_keys)
+        deployment_spec = pipeline_spec.deployment_spec.fields[
+            'executors'].struct_value.fields['exec-importer']
+        metadata = deployment_spec.struct_value.fields[
+            'importer'].struct_value.fields['metadata']
+        self.assertEqual(
+            metadata.struct_value.fields[
+                "prefix1-{{$.inputs.parameters[\'metadata\']}}"].string_value,
+            "prefix2-{{$.inputs.parameters[\'metadata-2\']}}")
+        self.assertEqual(metadata.struct_value.fields['key'].string_value,
+                         'value')
diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py
new file mode 100644
index 00000000000..099bd2d8dc1
--- /dev/null
+++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py
@@ -0,0 +1,67 @@
+# Copyright 2022 The Kubeflow Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Pipeline using dsl.importer, with dynamic metadata."""
+
+from kfp import compiler
+from kfp import dsl
+from kfp.dsl import Dataset
+from kfp.dsl import importer
+
+DEFAULT_ARTIFACT_URI = 'gs://ml-pipeline-playground/shakespeare1.txt'
+DEFAULT_IMAGE_URI = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest'
+
+
+@dsl.component
+def make_name(name: str) -> str:
+    return name
+
+
+@dsl.pipeline(name='pipeline-with-importer', pipeline_root='dummy_root')
+def my_pipeline(name: str = 'default-name',
+                int_input: int = 1,
+                pipeline_input_artifact_uri: str = DEFAULT_ARTIFACT_URI,
+                pipeline_input_image_uri: str = DEFAULT_IMAGE_URI):
+
+    importer1 = importer(
+        artifact_uri=pipeline_input_artifact_uri,
+        artifact_class=Dataset,
+        reimport=False,
+        metadata={
+            'name': [name, 'alias-name'],
+            'containerSpec': {
+                'imageUri': pipeline_input_image_uri
+            }
+        })
+
+    make_name_op = make_name(name='a-different-name')
+
+    importer2 = importer(
+        artifact_uri=DEFAULT_ARTIFACT_URI,
+        artifact_class=Dataset,
+        reimport=False,
+        metadata={
+            'name': f'prefix-{make_name_op.output}',
+            'list-of-data': [make_name_op.output, name, int_input],
+            make_name_op.output: make_name_op.output,
+            name: DEFAULT_IMAGE_URI,
+            'containerSpec': {
+                'imageUri': DEFAULT_IMAGE_URI
+            }
+        })
+
+
+if __name__ == '__main__':
+    compiler.Compiler().compile(
+        pipeline_func=my_pipeline,
+        package_path=__file__.replace('.py', '.yaml'))
diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml
new file mode 100644
index 00000000000..b7a034e0a87
--- /dev/null
+++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml
@@ -0,0 +1,174 @@
+components:
+  comp-importer:
+    executorLabel: exec-importer
+    inputDefinitions:
+      parameters:
+        metadata:
+          parameterType: STRING
+        metadata-2:
+          parameterType: STRING
+        uri:
+          parameterType: STRING
+    outputDefinitions:
+      artifacts:
+        artifact:
+          artifactType:
+            schemaTitle: system.Dataset
+            schemaVersion: 0.0.1
+  comp-importer-2:
+    executorLabel: exec-importer-2
+    inputDefinitions:
+      parameters:
+        metadata:
+          parameterType: STRING
+        metadata-2:
+          parameterType: STRING
+        metadata-3:
+          parameterType: NUMBER_INTEGER
+        uri:
+          parameterType: STRING
+    outputDefinitions:
+      artifacts:
+        artifact:
+          artifactType:
+            schemaTitle: system.Dataset
+            schemaVersion: 0.0.1
+  comp-make-name:
+    executorLabel: exec-make-name
+    inputDefinitions:
+      parameters:
+        name:
+          parameterType: STRING
+    outputDefinitions:
+      parameters:
+        Output:
+          parameterType: STRING
+defaultPipelineRoot: dummy_root
+deploymentSpec:
+  executors:
+    exec-importer:
+      importer:
+        artifactUri:
+          runtimeParameter: uri
+        metadata:
+          containerSpec:
+            imageUri: '{{$.inputs.parameters[''metadata-2'']}}'
+          name:
+          - '{{$.inputs.parameters[''metadata'']}}'
+          - alias-name
+        typeSchema:
+          schemaTitle: system.Dataset
+          schemaVersion: 0.0.1
+    exec-importer-2:
+      importer:
+        artifactUri:
+          constant: gs://ml-pipeline-playground/shakespeare1.txt
+        metadata:
+          containerSpec:
+            imageUri: us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest
+          list-of-data:
+          - '{{$.inputs.parameters[''metadata'']}}'
+          - '{{$.inputs.parameters[''metadata-2'']}}'
+          - '{{$.inputs.parameters[''metadata-3'']}}'
+          name: prefix-{{$.inputs.parameters['metadata']}}
+          '{{$.inputs.parameters[''metadata'']}}': '{{$.inputs.parameters[''metadata'']}}'
+          '{{$.inputs.parameters[''metadata-2'']}}': us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest
+        typeSchema:
+          schemaTitle: system.Dataset
+          schemaVersion: 0.0.1
+    exec-make-name:
+      container:
+        args:
+        - --executor_input
+        - '{{$}}'
+        - --function_to_execute
+        - make_name
+        command:
+        - sh
+        - -c
+        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
+          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
+          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.4'\
+          \ && \"$0\" \"$@\"\n"
+        - sh
+        - -ec
+        - 'program_path=$(mktemp -d)
+
+          printf "%s" "$0" > "$program_path/ephemeral_component.py"
+
+          python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
+
+          '
+        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
+          \ *\n\ndef make_name(name: str) -> str:\n    return name\n\n"
+        image: python:3.7
+pipelineInfo:
+  name: pipeline-with-importer
+root:
+  dag:
+    tasks:
+      importer:
+        cachingOptions:
+          enableCache: true
+        componentRef:
+          name: comp-importer
+        inputs:
+          parameters:
+            metadata:
+              componentInputParameter: name
+            metadata-2:
+              componentInputParameter: pipeline_input_image_uri
+            uri:
+              componentInputParameter: pipeline_input_artifact_uri
+        taskInfo:
+          name: importer
+      importer-2:
+        cachingOptions:
+          enableCache: true
+        componentRef:
+          name: comp-importer-2
+        dependentTasks:
+        - make-name
+        inputs:
+          parameters:
+            metadata:
+              taskOutputParameter:
+                outputParameterKey: Output
+                producerTask: make-name
+            metadata-2:
+              componentInputParameter: name
+            metadata-3:
+              componentInputParameter: int_input
+            uri:
+              runtimeValue:
+                constant: gs://ml-pipeline-playground/shakespeare1.txt
+        taskInfo:
+          name: importer-2
+      make-name:
+        cachingOptions:
+          enableCache: true
+        componentRef:
+          name: comp-make-name
+        inputs:
+          parameters:
+            name:
+              runtimeValue:
+                constant: a-different-name
+        taskInfo:
+          name: make-name
+  inputDefinitions:
+    parameters:
+      int_input:
+        defaultValue: 1.0
+        parameterType: NUMBER_INTEGER
+      name:
+        defaultValue: default-name
+        parameterType: STRING
+      pipeline_input_artifact_uri:
+        defaultValue: gs://ml-pipeline-playground/shakespeare1.txt
+        parameterType: STRING
+      pipeline_input_image_uri:
+        defaultValue: us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest
+        parameterType: STRING
+schemaVersion: 2.1.0
+sdkVersion: kfp-2.0.0-beta.4
diff --git a/sdk/python/test_data/test_data_config.yaml b/sdk/python/test_data/test_data_config.yaml
index 7c6e1b40dd4..a173fdf4677 100644
--- a/sdk/python/test_data/test_data_config.yaml
+++ b/sdk/python/test_data/test_data_config.yaml
@@ -138,6 +138,9 @@ pipelines:
   - module: pipeline_with_google_artifact_type
     name: my_pipeline
     execute: false
+  - module: pipeline_with_dynamic_importer_metadata
+    name: my_pipeline
+    execute: false
 components:
   test_data_dir: sdk/python/test_data/components
   read: true
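
A minimal usage sketch of the dynamic importer metadata introduced by this change, mirroring pipeline_with_dynamic_importer_metadata.py above; the pipeline name and bucket URI below are illustrative only. Metadata values that reference pipeline parameters or task outputs are rewritten into importer input parameters named metadata, metadata-2, and so on, and show up in the importer spec as {{$.inputs.parameters['metadata']}} placeholders.

from kfp import dsl
from kfp.dsl import Dataset, importer


@dsl.pipeline()
def example_pipeline(image_uri: str):
    # 'image_uri' is a pipeline parameter, so it becomes the importer input
    # parameter 'metadata' and is referenced in the compiled importer spec as
    # {{$.inputs.parameters['metadata']}}.
    dataset_importer = importer(
        artifact_uri='gs://my_bucket/data.txt',  # illustrative URI
        artifact_class=Dataset,
        reimport=False,
        metadata={'containerSpec': {'imageUri': image_uri}})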