From cd50f3d64412a7ebedb48478e57278032dc5776e Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Wed, 10 Aug 2022 10:57:07 -0600 Subject: [PATCH 01/13] support placeholder in metadata dict --- sdk/python/kfp/components/importer_node.py | 66 ++++++++++++++++++++-- 1 file changed, 60 insertions(+), 6 deletions(-) diff --git a/sdk/python/kfp/components/importer_node.py b/sdk/python/kfp/components/importer_node.py index 3c1dba652cf..b9a77f17900 100644 --- a/sdk/python/kfp/components/importer_node.py +++ b/sdk/python/kfp/components/importer_node.py @@ -1,4 +1,4 @@ -# Copyright 2020 The Kubeflow Authors +# Copyright 2020-2022 The Kubeflow Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,7 +13,8 @@ # limitations under the License. """Utility function for building Importer Node spec.""" -from typing import Any, Mapping, Optional, Type, Union +import sys +from typing import Any, Dict, List, Mapping, Optional, Type, Union from kfp.components import importer_component from kfp.components import pipeline_channel @@ -56,6 +57,42 @@ def pipeline_with_importer(): train(dataset=importer1.output) """ + metadata_pipeline_param_channels = set() + + def traverse_dict_and_create_metadata_inputs(d: Any) -> Any: + if isinstance(d, pipeline_channel.PipelineParameterChannel): + metadata_pipeline_param_channels.add(d) + result = placeholders.InputValuePlaceholder( + input_name=d.name).to_placeholder_string() + return result + elif isinstance(d, dict): + return { + traverse_dict_and_create_metadata_inputs(k): + traverse_dict_and_create_metadata_inputs(v) + for k, v in d.items() + } + + elif isinstance(d, list): + return [traverse_dict_and_create_metadata_inputs(el) for el in d] + else: + return d + + metadata_with_placeholders = traverse_dict_and_create_metadata_inputs( + metadata) + + component_inputs: Dict[str, structures.InputSpec] = {} + call_inputs: Dict[str, Any] = {} + + for pipeline_param_channel in metadata_pipeline_param_channels: + unique_name = make_placeholder_unique( + pipeline_param_channel.name, + list(component_inputs), + '-', + ) + component_inputs[unique_name] = structures.InputSpec( + type=pipeline_param_channel.channel_type) + call_inputs[unique_name] = pipeline_param_channel + component_spec = structures.ComponentSpec( name='importer', implementation=structures.Implementation( @@ -66,8 +103,11 @@ def pipeline_with_importer(): artifact_class.schema_title, artifact_class.schema_version), schema_version=artifact_class.schema_version, reimport=reimport, - metadata=metadata)), - inputs={INPUT_KEY: structures.InputSpec(type='String')}, + metadata=metadata_with_placeholders)), + inputs={ + INPUT_KEY: structures.InputSpec(type='String'), + **component_inputs + }, outputs={ OUTPUT_KEY: structures.OutputSpec( @@ -76,7 +116,21 @@ def pipeline_with_importer(): artifact_class.schema_version)) }, ) - importer = importer_component.ImporterComponent( component_spec=component_spec) - return importer(uri=artifact_uri) + return importer(uri=artifact_uri, **call_inputs) + + +def make_placeholder_unique( + name: str, + collection: List[str], + delimiter: str, +) -> str: + """Makes a unique name by adding index.""" + unique_name = name + if unique_name in collection: + for i in range(2, sys.maxsize**10): + unique_name = name + delimiter + str(i) + if unique_name not in collection: + break + return unique_name \ No newline at end of file From d9b57b9b4d791bd296735c21a733c11426544781 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Wed, 10 Aug 2022 10:57:26 -0600 Subject: [PATCH 02/13] add unit tests --- .../kfp/components/importer_node_test.py | 157 ++++++++++++++++++ 1 file changed, 157 insertions(+) create mode 100644 sdk/python/kfp/components/importer_node_test.py diff --git a/sdk/python/kfp/components/importer_node_test.py b/sdk/python/kfp/components/importer_node_test.py new file mode 100644 index 00000000000..54bfb87bd0b --- /dev/null +++ b/sdk/python/kfp/components/importer_node_test.py @@ -0,0 +1,157 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import unittest + +from kfp import compiler +from kfp import dsl +from kfp.components import importer_node +from kfp.components.types.artifact_types import Dataset + + +class TestImporterSupportsDynamicMetadata(unittest.TestCase): + + def test_dynamic_dict_element_from_pipeline_param(self): + + @dsl.pipeline() + def my_pipeline(meta_inp: str): + + dataset = importer_node.importer( + 'gs://my_bucket', + Dataset, + reimport=True, + metadata={ + 'string': meta_inp, + 'string-2': meta_inp + }) + + pipeline_spec = compiler.Compiler()._create_pipeline( + pipeline_func=my_pipeline) + input_keys = list(pipeline_spec.components['comp-importer'] + .input_definitions.parameters.keys()) + self.assertIn('meta_inp', input_keys) + deployment_spec = pipeline_spec.deployment_spec.fields[ + 'executors'].struct_value.fields['exec-importer'] + metadata = deployment_spec.struct_value.fields[ + 'importer'].struct_value.fields['metadata'] + self.assertEqual(metadata.struct_value.fields['string'].string_value, + "{{$.inputs.parameters['meta_inp']}}") + self.assertEqual(metadata.struct_value.fields['string-2'].string_value, + "{{$.inputs.parameters['meta_inp-2']}}") + + def test_dynamic_list_element_from_pipeline_param(self): + + @dsl.pipeline() + def my_pipeline(meta_inp: str): + + dataset = importer_node.importer( + 'gs://my_bucket', + Dataset, + reimport=True, + metadata={ + 'outer_key': [meta_inp, meta_inp], + meta_inp: meta_inp + }) + + pipeline_spec = compiler.Compiler()._create_pipeline( + pipeline_func=my_pipeline) + input_keys = list(pipeline_spec.components['comp-importer'] + .input_definitions.parameters.keys()) + self.assertIn('meta_inp', input_keys) + deployment_spec = pipeline_spec.deployment_spec.fields[ + 'executors'].struct_value.fields['exec-importer'] + metadata = deployment_spec.struct_value.fields[ + 'importer'].struct_value.fields['metadata'] + self.assertEqual( + metadata.struct_value.fields['outer_key'].list_value.values[0] + .string_value, "{{$.inputs.parameters['meta_inp']}}") + self.assertEqual( + metadata.struct_value.fields['outer_key'].list_value.values[1] + .string_value, "{{$.inputs.parameters['meta_inp-2']}}") + self.assertEqual( + metadata.struct_value + .fields["{{$.inputs.parameters['meta_inp-4']}}"].string_value, + "{{$.inputs.parameters['meta_inp-3']}}") + + def test_dynamic_dict_element_from_task_output(self): + + @dsl.component + def string_task(string: str) -> str: + return 'string' + + @dsl.pipeline() + def my_pipeline(): + task1 = string_task(string='string1') + task2 = string_task(string='string2') + dataset = importer_node.importer( + 'gs://my_bucket', + Dataset, + reimport=True, + metadata={ + 'string-1': task1.output, + 'string-2': task2.output + }) + + pipeline_spec = compiler.Compiler()._create_pipeline( + pipeline_func=my_pipeline) + input_keys = list(pipeline_spec.components['comp-importer'] + .input_definitions.parameters.keys()) + self.assertIn('Output', input_keys) + self.assertIn('Output-2', input_keys) + + deployment_spec = pipeline_spec.deployment_spec.fields[ + 'executors'].struct_value.fields['exec-importer'] + metadata = deployment_spec.struct_value.fields[ + 'importer'].struct_value.fields['metadata'] + self.assertEqual(metadata.struct_value.fields['string-1'].string_value, + "{{$.inputs.parameters['Output']}}") + self.assertEqual(metadata.struct_value.fields['string-2'].string_value, + "{{$.inputs.parameters['Output-2']}}") + + def test_dynamic_list_element_from_task_output(self): + + @dsl.component + def string_task() -> str: + return 'string' + + @dsl.pipeline() + def my_pipeline(): + task = string_task() + dataset = importer_node.importer( + 'gs://my_bucket', + Dataset, + reimport=True, + metadata={ + 'outer_key': [task.output, task.output], + task.output: task.output + }) + + pipeline_spec = compiler.Compiler()._create_pipeline( + pipeline_func=my_pipeline) + input_keys = list(pipeline_spec.components['comp-importer'] + .input_definitions.parameters.keys()) + self.assertIn('Output', input_keys) + self.assertIn('Output-2', input_keys) + deployment_spec = pipeline_spec.deployment_spec.fields[ + 'executors'].struct_value.fields['exec-importer'] + metadata = deployment_spec.struct_value.fields[ + 'importer'].struct_value.fields['metadata'] + self.assertEqual( + metadata.struct_value.fields['outer_key'].list_value.values[0] + .string_value, "{{$.inputs.parameters['Output']}}") + self.assertEqual( + metadata.struct_value.fields['outer_key'].list_value.values[1] + .string_value, "{{$.inputs.parameters['Output-2']}}") + self.assertEqual( + metadata.struct_value.fields["{{$.inputs.parameters['Output-4']}}"] + .string_value, "{{$.inputs.parameters['Output-3']}}") From 3c9c47935f60b615e2b711222fb2fe85ebc3651e Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Wed, 10 Aug 2022 11:24:53 -0600 Subject: [PATCH 03/13] add compilation test --- ...pipeline_with_dynamic_importer_metadata.py | 64 ++++++ ...peline_with_dynamic_importer_metadata.yaml | 182 ++++++++++++++++++ sdk/python/test_data/test_data_config.yaml | 3 + 3 files changed, 249 insertions(+) create mode 100644 sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py create mode 100644 sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py new file mode 100644 index 00000000000..d4171ac3861 --- /dev/null +++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py @@ -0,0 +1,64 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Pipeline using dsl.importer, with dynamic metadata.""" + +from kfp import compiler +from kfp import dsl +from kfp.dsl import Dataset +from kfp.dsl import importer + +DEFAULT_ARTIFACT_URI = 'gs://ml-pipeline-playground/shakespeare1.txt' +DEFAULT_IMAGE_URI = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest' + + +@dsl.component +def make_name(name: str) -> str: + return name + + +@dsl.pipeline(name='pipeline-with-importer', pipeline_root='dummy_root') +def my_pipeline(name: str, + pipeline_input_artifact_uri: str = DEFAULT_ARTIFACT_URI, + pipeline_input_image_uri: str = DEFAULT_IMAGE_URI): + + importer1 = importer( + artifact_uri=pipeline_input_artifact_uri, + artifact_class=Dataset, + reimport=False, + metadata={ + 'name': name, + 'containerSpec': { + 'imageUri': pipeline_input_image_uri + } + }) + + make_name_op = make_name(name='the_name') + + importer2 = importer( + artifact_uri=DEFAULT_ARTIFACT_URI, + artifact_class=Dataset, + reimport=False, + metadata={ + 'name': make_name_op.output, + 'list_field': [make_name_op.output, make_name_op.output], + make_name_op.output: make_name_op.output, + 'containerSpec': { + 'imageUri': DEFAULT_IMAGE_URI + } + }) + + +if __name__ == '__main__': + ir_file = __file__.replace('.py', '.yaml') + compiler.Compiler().compile(pipeline_func=my_pipeline, package_path=ir_file) diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml new file mode 100644 index 00000000000..314d4cdfb54 --- /dev/null +++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml @@ -0,0 +1,182 @@ +components: + comp-importer: + executorLabel: exec-importer + inputDefinitions: + parameters: + name: + parameterType: STRING + pipeline_input_image_uri: + parameterType: STRING + uri: + parameterType: STRING + outputDefinitions: + artifacts: + artifact: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-importer-2: + executorLabel: exec-importer-2 + inputDefinitions: + parameters: + Output: + parameterType: STRING + Output-2: + parameterType: STRING + Output-3: + parameterType: STRING + Output-4: + parameterType: STRING + Output-5: + parameterType: STRING + uri: + parameterType: STRING + outputDefinitions: + artifacts: + artifact: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-make-name: + executorLabel: exec-make-name + inputDefinitions: + parameters: + name: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING +defaultPipelineRoot: dummy_root +deploymentSpec: + executors: + exec-importer: + importer: + artifactUri: + runtimeParameter: uri + metadata: + containerSpec: + imageUri: '{{$.inputs.parameters[''pipeline_input_image_uri'']}}' + name: '{{$.inputs.parameters[''name'']}}' + typeSchema: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + exec-importer-2: + importer: + artifactUri: + constant: gs://ml-pipeline-playground/shakespeare1.txt + metadata: + containerSpec: + imageUri: us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest + list_field: + - '{{$.inputs.parameters[''Output-2'']}}' + - '{{$.inputs.parameters[''Output-3'']}}' + name: '{{$.inputs.parameters[''Output'']}}' + '{{$.inputs.parameters[''Output-5'']}}': '{{$.inputs.parameters[''Output-4'']}}' + typeSchema: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + exec-make-name: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - make_name + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.1'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef make_name(name: str) -> str:\n return name\n\n" + image: python:3.7 +pipelineInfo: + name: pipeline-with-importer +root: + dag: + tasks: + importer: + cachingOptions: + enableCache: true + componentRef: + name: comp-importer + inputs: + parameters: + name: + componentInputParameter: name + pipeline_input_image_uri: + componentInputParameter: pipeline_input_image_uri + uri: + componentInputParameter: pipeline_input_artifact_uri + taskInfo: + name: importer + importer-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-importer-2 + dependentTasks: + - make-name + inputs: + parameters: + Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: make-name + Output-2: + taskOutputParameter: + outputParameterKey: Output + producerTask: make-name + Output-3: + taskOutputParameter: + outputParameterKey: Output + producerTask: make-name + Output-4: + taskOutputParameter: + outputParameterKey: Output + producerTask: make-name + Output-5: + taskOutputParameter: + outputParameterKey: Output + producerTask: make-name + uri: + runtimeValue: + constant: gs://ml-pipeline-playground/shakespeare1.txt + taskInfo: + name: importer-2 + make-name: + cachingOptions: + enableCache: true + componentRef: + name: comp-make-name + inputs: + parameters: + name: + runtimeValue: + constant: the_name + taskInfo: + name: make-name + inputDefinitions: + parameters: + name: + parameterType: STRING + pipeline_input_artifact_uri: + defaultValue: gs://ml-pipeline-playground/shakespeare1.txt + parameterType: STRING + pipeline_input_image_uri: + defaultValue: us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.0.0-beta.1 diff --git a/sdk/python/test_data/test_data_config.yaml b/sdk/python/test_data/test_data_config.yaml index 7c6e1b40dd4..a173fdf4677 100644 --- a/sdk/python/test_data/test_data_config.yaml +++ b/sdk/python/test_data/test_data_config.yaml @@ -138,6 +138,9 @@ pipelines: - module: pipeline_with_google_artifact_type name: my_pipeline execute: false + - module: pipeline_with_dynamic_importer_metadata + name: my_pipeline + execute: false components: test_data_dir: sdk/python/test_data/components read: true From 57207868b442ec71e2e1e4623d7e4dbf57c4a451 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Wed, 31 Aug 2022 10:36:42 -0600 Subject: [PATCH 04/13] fix tests --- sdk/python/kfp/components/importer_node_test.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/sdk/python/kfp/components/importer_node_test.py b/sdk/python/kfp/components/importer_node_test.py index 54bfb87bd0b..cdc37410081 100644 --- a/sdk/python/kfp/components/importer_node_test.py +++ b/sdk/python/kfp/components/importer_node_test.py @@ -13,7 +13,6 @@ # limitations under the License. import unittest -from kfp import compiler from kfp import dsl from kfp.components import importer_node from kfp.components.types.artifact_types import Dataset @@ -35,8 +34,7 @@ def my_pipeline(meta_inp: str): 'string-2': meta_inp }) - pipeline_spec = compiler.Compiler()._create_pipeline( - pipeline_func=my_pipeline) + pipeline_spec = my_pipeline.pipeline_spec input_keys = list(pipeline_spec.components['comp-importer'] .input_definitions.parameters.keys()) self.assertIn('meta_inp', input_keys) @@ -63,8 +61,7 @@ def my_pipeline(meta_inp: str): meta_inp: meta_inp }) - pipeline_spec = compiler.Compiler()._create_pipeline( - pipeline_func=my_pipeline) + pipeline_spec = my_pipeline.pipeline_spec input_keys = list(pipeline_spec.components['comp-importer'] .input_definitions.parameters.keys()) self.assertIn('meta_inp', input_keys) @@ -102,8 +99,7 @@ def my_pipeline(): 'string-2': task2.output }) - pipeline_spec = compiler.Compiler()._create_pipeline( - pipeline_func=my_pipeline) + pipeline_spec = my_pipeline.pipeline_spec input_keys = list(pipeline_spec.components['comp-importer'] .input_definitions.parameters.keys()) self.assertIn('Output', input_keys) @@ -136,8 +132,7 @@ def my_pipeline(): task.output: task.output }) - pipeline_spec = compiler.Compiler()._create_pipeline( - pipeline_func=my_pipeline) + pipeline_spec = my_pipeline.pipeline_spec input_keys = list(pipeline_spec.components['comp-importer'] .input_definitions.parameters.keys()) self.assertIn('Output', input_keys) From c0f62fe4b113537de9c4be6727cc0ce95f201bd6 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Mon, 12 Sep 2022 17:14:06 -0600 Subject: [PATCH 05/13] update compiler test --- ...pipeline_with_dynamic_importer_metadata.py | 9 ++-- ...peline_with_dynamic_importer_metadata.yaml | 43 +++++-------------- 2 files changed, 16 insertions(+), 36 deletions(-) diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py index d4171ac3861..c2dba53097b 100644 --- a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py +++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py @@ -28,7 +28,7 @@ def make_name(name: str) -> str: @dsl.pipeline(name='pipeline-with-importer', pipeline_root='dummy_root') -def my_pipeline(name: str, +def my_pipeline(name: str = 'hi', pipeline_input_artifact_uri: str = DEFAULT_ARTIFACT_URI, pipeline_input_image_uri: str = DEFAULT_IMAGE_URI): @@ -37,7 +37,7 @@ def my_pipeline(name: str, artifact_class=Dataset, reimport=False, metadata={ - 'name': name, + 'name': [name, name], 'containerSpec': { 'imageUri': pipeline_input_image_uri } @@ -60,5 +60,6 @@ def my_pipeline(name: str, if __name__ == '__main__': - ir_file = __file__.replace('.py', '.yaml') - compiler.Compiler().compile(pipeline_func=my_pipeline, package_path=ir_file) + compiler.Compiler().compile( + pipeline_func=my_pipeline, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml index 314d4cdfb54..6851a5fedb9 100644 --- a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml +++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml @@ -21,14 +21,6 @@ components: parameters: Output: parameterType: STRING - Output-2: - parameterType: STRING - Output-3: - parameterType: STRING - Output-4: - parameterType: STRING - Output-5: - parameterType: STRING uri: parameterType: STRING outputDefinitions: @@ -57,9 +49,11 @@ deploymentSpec: metadata: containerSpec: imageUri: '{{$.inputs.parameters[''pipeline_input_image_uri'']}}' - name: '{{$.inputs.parameters[''name'']}}' + name: + - '{{$.inputs.parameters[''name'']}}' + - '{{$.inputs.parameters[''name'']}}' typeSchema: - schemaTitle: system.Artifact + schemaTitle: system.Dataset schemaVersion: 0.0.1 exec-importer-2: importer: @@ -69,12 +63,12 @@ deploymentSpec: containerSpec: imageUri: us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest list_field: - - '{{$.inputs.parameters[''Output-2'']}}' - - '{{$.inputs.parameters[''Output-3'']}}' + - '{{$.inputs.parameters[''Output'']}}' + - '{{$.inputs.parameters[''Output'']}}' name: '{{$.inputs.parameters[''Output'']}}' - '{{$.inputs.parameters[''Output-5'']}}': '{{$.inputs.parameters[''Output-4'']}}' + '{{$.inputs.parameters[''Output'']}}': '{{$.inputs.parameters[''Output'']}}' typeSchema: - schemaTitle: system.Artifact + schemaTitle: system.Dataset schemaVersion: 0.0.1 exec-make-name: container: @@ -88,7 +82,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.1'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.0.0-beta.4'\ \ && \"$0\" \"$@\"\n" - sh - -ec @@ -135,22 +129,6 @@ root: taskOutputParameter: outputParameterKey: Output producerTask: make-name - Output-2: - taskOutputParameter: - outputParameterKey: Output - producerTask: make-name - Output-3: - taskOutputParameter: - outputParameterKey: Output - producerTask: make-name - Output-4: - taskOutputParameter: - outputParameterKey: Output - producerTask: make-name - Output-5: - taskOutputParameter: - outputParameterKey: Output - producerTask: make-name uri: runtimeValue: constant: gs://ml-pipeline-playground/shakespeare1.txt @@ -171,6 +149,7 @@ root: inputDefinitions: parameters: name: + defaultValue: hi parameterType: STRING pipeline_input_artifact_uri: defaultValue: gs://ml-pipeline-playground/shakespeare1.txt @@ -179,4 +158,4 @@ root: defaultValue: us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest parameterType: STRING schemaVersion: 2.1.0 -sdkVersion: kfp-2.0.0-beta.1 +sdkVersion: kfp-2.0.0-beta.4 From 35ff77c77473309f80fa0276f9510e229a0371b1 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Wed, 14 Sep 2022 17:31:35 -0600 Subject: [PATCH 06/13] use 'metadata' as input key --- sdk/python/kfp/components/importer_node.py | 47 +++++++++++-------- .../kfp/components/importer_node_test.py | 43 +++++++++-------- 2 files changed, 49 insertions(+), 41 deletions(-) diff --git a/sdk/python/kfp/components/importer_node.py b/sdk/python/kfp/components/importer_node.py index b9a77f17900..f16cba458bf 100644 --- a/sdk/python/kfp/components/importer_node.py +++ b/sdk/python/kfp/components/importer_node.py @@ -26,6 +26,7 @@ INPUT_KEY = 'uri' OUTPUT_KEY = 'artifact' +METADATA_KEY = 'metadata' def importer( @@ -57,14 +58,35 @@ def pipeline_with_importer(): train(dataset=importer1.output) """ - metadata_pipeline_param_channels = set() + component_inputs: Dict[str, structures.InputSpec] = {} + call_inputs: Dict[str, Any] = {} def traverse_dict_and_create_metadata_inputs(d: Any) -> Any: if isinstance(d, pipeline_channel.PipelineParameterChannel): - metadata_pipeline_param_channels.add(d) - result = placeholders.InputValuePlaceholder( - input_name=d.name).to_placeholder_string() - return result + reversed_call_inputs = { + pipeline_param_chan: name + for name, pipeline_param_chan in call_inputs.items() + } + + # minimizes importer spec interface by not creating new + # inputspec/parameters if the same input is used multiple places + # in metadata + unique_name = reversed_call_inputs.get( + d, + make_placeholder_unique( + METADATA_KEY, + list(call_inputs), + '-', + ), + ) + + call_inputs[unique_name] = d + component_inputs[unique_name] = structures.InputSpec( + type=d.channel_type) + + return placeholders.InputValuePlaceholder( + input_name=unique_name).to_placeholder_string() + elif isinstance(d, dict): return { traverse_dict_and_create_metadata_inputs(k): @@ -80,19 +102,6 @@ def traverse_dict_and_create_metadata_inputs(d: Any) -> Any: metadata_with_placeholders = traverse_dict_and_create_metadata_inputs( metadata) - component_inputs: Dict[str, structures.InputSpec] = {} - call_inputs: Dict[str, Any] = {} - - for pipeline_param_channel in metadata_pipeline_param_channels: - unique_name = make_placeholder_unique( - pipeline_param_channel.name, - list(component_inputs), - '-', - ) - component_inputs[unique_name] = structures.InputSpec( - type=pipeline_param_channel.channel_type) - call_inputs[unique_name] = pipeline_param_channel - component_spec = structures.ComponentSpec( name='importer', implementation=structures.Implementation( @@ -133,4 +142,4 @@ def make_placeholder_unique( unique_name = name + delimiter + str(i) if unique_name not in collection: break - return unique_name \ No newline at end of file + return unique_name diff --git a/sdk/python/kfp/components/importer_node_test.py b/sdk/python/kfp/components/importer_node_test.py index cdc37410081..7b63ce2e790 100644 --- a/sdk/python/kfp/components/importer_node_test.py +++ b/sdk/python/kfp/components/importer_node_test.py @@ -37,48 +37,47 @@ def my_pipeline(meta_inp: str): pipeline_spec = my_pipeline.pipeline_spec input_keys = list(pipeline_spec.components['comp-importer'] .input_definitions.parameters.keys()) - self.assertIn('meta_inp', input_keys) + self.assertIn('metadata', input_keys) deployment_spec = pipeline_spec.deployment_spec.fields[ 'executors'].struct_value.fields['exec-importer'] metadata = deployment_spec.struct_value.fields[ 'importer'].struct_value.fields['metadata'] self.assertEqual(metadata.struct_value.fields['string'].string_value, - "{{$.inputs.parameters['meta_inp']}}") + "{{$.inputs.parameters['metadata']}}") self.assertEqual(metadata.struct_value.fields['string-2'].string_value, - "{{$.inputs.parameters['meta_inp-2']}}") + "{{$.inputs.parameters['metadata']}}") def test_dynamic_list_element_from_pipeline_param(self): @dsl.pipeline() - def my_pipeline(meta_inp: str): + def my_pipeline(meta_inp1: str, meta_inp2: int): dataset = importer_node.importer( 'gs://my_bucket', Dataset, reimport=True, metadata={ - 'outer_key': [meta_inp, meta_inp], - meta_inp: meta_inp + 'outer_key': [meta_inp1, meta_inp2], + meta_inp1: meta_inp1 }) pipeline_spec = my_pipeline.pipeline_spec input_keys = list(pipeline_spec.components['comp-importer'] .input_definitions.parameters.keys()) - self.assertIn('meta_inp', input_keys) + self.assertIn('metadata', input_keys) deployment_spec = pipeline_spec.deployment_spec.fields[ 'executors'].struct_value.fields['exec-importer'] metadata = deployment_spec.struct_value.fields[ 'importer'].struct_value.fields['metadata'] self.assertEqual( metadata.struct_value.fields['outer_key'].list_value.values[0] - .string_value, "{{$.inputs.parameters['meta_inp']}}") + .string_value, "{{$.inputs.parameters['metadata']}}") self.assertEqual( metadata.struct_value.fields['outer_key'].list_value.values[1] - .string_value, "{{$.inputs.parameters['meta_inp-2']}}") + .string_value, "{{$.inputs.parameters['metadata-2']}}") self.assertEqual( - metadata.struct_value - .fields["{{$.inputs.parameters['meta_inp-4']}}"].string_value, - "{{$.inputs.parameters['meta_inp-3']}}") + metadata.struct_value.fields["{{$.inputs.parameters['metadata']}}"] + .string_value, "{{$.inputs.parameters['metadata']}}") def test_dynamic_dict_element_from_task_output(self): @@ -102,17 +101,17 @@ def my_pipeline(): pipeline_spec = my_pipeline.pipeline_spec input_keys = list(pipeline_spec.components['comp-importer'] .input_definitions.parameters.keys()) - self.assertIn('Output', input_keys) - self.assertIn('Output-2', input_keys) + self.assertIn('metadata', input_keys) + self.assertIn('metadata-2', input_keys) deployment_spec = pipeline_spec.deployment_spec.fields[ 'executors'].struct_value.fields['exec-importer'] metadata = deployment_spec.struct_value.fields[ 'importer'].struct_value.fields['metadata'] self.assertEqual(metadata.struct_value.fields['string-1'].string_value, - "{{$.inputs.parameters['Output']}}") + "{{$.inputs.parameters['metadata']}}") self.assertEqual(metadata.struct_value.fields['string-2'].string_value, - "{{$.inputs.parameters['Output-2']}}") + "{{$.inputs.parameters['metadata-2']}}") def test_dynamic_list_element_from_task_output(self): @@ -135,18 +134,18 @@ def my_pipeline(): pipeline_spec = my_pipeline.pipeline_spec input_keys = list(pipeline_spec.components['comp-importer'] .input_definitions.parameters.keys()) - self.assertIn('Output', input_keys) - self.assertIn('Output-2', input_keys) + self.assertIn('metadata', input_keys) + self.assertNotIn('metadata-2', input_keys) deployment_spec = pipeline_spec.deployment_spec.fields[ 'executors'].struct_value.fields['exec-importer'] metadata = deployment_spec.struct_value.fields[ 'importer'].struct_value.fields['metadata'] self.assertEqual( metadata.struct_value.fields['outer_key'].list_value.values[0] - .string_value, "{{$.inputs.parameters['Output']}}") + .string_value, "{{$.inputs.parameters['metadata']}}") self.assertEqual( metadata.struct_value.fields['outer_key'].list_value.values[1] - .string_value, "{{$.inputs.parameters['Output-2']}}") + .string_value, "{{$.inputs.parameters['metadata']}}") self.assertEqual( - metadata.struct_value.fields["{{$.inputs.parameters['Output-4']}}"] - .string_value, "{{$.inputs.parameters['Output-3']}}") + metadata.struct_value.fields["{{$.inputs.parameters['metadata']}}"] + .string_value, "{{$.inputs.parameters['metadata']}}") From 69bfe6efd06f8f312d851804e81d7398bdb5c387 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Wed, 14 Sep 2022 17:31:53 -0600 Subject: [PATCH 07/13] update compiler test and golden snapshot --- ...pipeline_with_dynamic_importer_metadata.py | 9 +++-- ...peline_with_dynamic_importer_metadata.yaml | 37 +++++++++++-------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py index c2dba53097b..cbb2e3f89e5 100644 --- a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py +++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py @@ -28,7 +28,7 @@ def make_name(name: str) -> str: @dsl.pipeline(name='pipeline-with-importer', pipeline_root='dummy_root') -def my_pipeline(name: str = 'hi', +def my_pipeline(name: str = 'default-name', pipeline_input_artifact_uri: str = DEFAULT_ARTIFACT_URI, pipeline_input_image_uri: str = DEFAULT_IMAGE_URI): @@ -37,13 +37,13 @@ def my_pipeline(name: str = 'hi', artifact_class=Dataset, reimport=False, metadata={ - 'name': [name, name], + 'name': [name, 'alias-name'], 'containerSpec': { 'imageUri': pipeline_input_image_uri } }) - make_name_op = make_name(name='the_name') + make_name_op = make_name(name='a-different-name') importer2 = importer( artifact_uri=DEFAULT_ARTIFACT_URI, @@ -51,8 +51,9 @@ def my_pipeline(name: str = 'hi', reimport=False, metadata={ 'name': make_name_op.output, - 'list_field': [make_name_op.output, make_name_op.output], + 'list-of-strings': [make_name_op.output, name], make_name_op.output: make_name_op.output, + name: DEFAULT_IMAGE_URI, 'containerSpec': { 'imageUri': DEFAULT_IMAGE_URI } diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml index 6851a5fedb9..6fd366f72f7 100644 --- a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml +++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml @@ -3,9 +3,9 @@ components: executorLabel: exec-importer inputDefinitions: parameters: - name: + metadata: parameterType: STRING - pipeline_input_image_uri: + metadata-2: parameterType: STRING uri: parameterType: STRING @@ -19,7 +19,9 @@ components: executorLabel: exec-importer-2 inputDefinitions: parameters: - Output: + metadata: + parameterType: STRING + metadata-2: parameterType: STRING uri: parameterType: STRING @@ -48,10 +50,10 @@ deploymentSpec: runtimeParameter: uri metadata: containerSpec: - imageUri: '{{$.inputs.parameters[''pipeline_input_image_uri'']}}' + imageUri: '{{$.inputs.parameters[''metadata-2'']}}' name: - - '{{$.inputs.parameters[''name'']}}' - - '{{$.inputs.parameters[''name'']}}' + - '{{$.inputs.parameters[''metadata'']}}' + - alias-name typeSchema: schemaTitle: system.Dataset schemaVersion: 0.0.1 @@ -62,11 +64,12 @@ deploymentSpec: metadata: containerSpec: imageUri: us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest - list_field: - - '{{$.inputs.parameters[''Output'']}}' - - '{{$.inputs.parameters[''Output'']}}' - name: '{{$.inputs.parameters[''Output'']}}' - '{{$.inputs.parameters[''Output'']}}': '{{$.inputs.parameters[''Output'']}}' + list-of-strings: + - '{{$.inputs.parameters[''metadata'']}}' + - '{{$.inputs.parameters[''metadata-2'']}}' + name: '{{$.inputs.parameters[''metadata'']}}' + '{{$.inputs.parameters[''metadata'']}}': '{{$.inputs.parameters[''metadata'']}}' + '{{$.inputs.parameters[''metadata-2'']}}': us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest typeSchema: schemaTitle: system.Dataset schemaVersion: 0.0.1 @@ -108,9 +111,9 @@ root: name: comp-importer inputs: parameters: - name: + metadata: componentInputParameter: name - pipeline_input_image_uri: + metadata-2: componentInputParameter: pipeline_input_image_uri uri: componentInputParameter: pipeline_input_artifact_uri @@ -125,10 +128,12 @@ root: - make-name inputs: parameters: - Output: + metadata: taskOutputParameter: outputParameterKey: Output producerTask: make-name + metadata-2: + componentInputParameter: name uri: runtimeValue: constant: gs://ml-pipeline-playground/shakespeare1.txt @@ -143,13 +148,13 @@ root: parameters: name: runtimeValue: - constant: the_name + constant: a-different-name taskInfo: name: make-name inputDefinitions: parameters: name: - defaultValue: hi + defaultValue: default-name parameterType: STRING pipeline_input_artifact_uri: defaultValue: gs://ml-pipeline-playground/shakespeare1.txt From 4cb6e2f1be9cb99e0c7793f2c240c2da06edd3bb Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Wed, 14 Sep 2022 17:33:37 -0600 Subject: [PATCH 08/13] add int input to compiler test --- .../pipeline_with_dynamic_importer_metadata.py | 3 ++- .../pipeline_with_dynamic_importer_metadata.yaml | 10 +++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py index cbb2e3f89e5..76af9145e5d 100644 --- a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py +++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py @@ -29,6 +29,7 @@ def make_name(name: str) -> str: @dsl.pipeline(name='pipeline-with-importer', pipeline_root='dummy_root') def my_pipeline(name: str = 'default-name', + int_input: int = 1, pipeline_input_artifact_uri: str = DEFAULT_ARTIFACT_URI, pipeline_input_image_uri: str = DEFAULT_IMAGE_URI): @@ -51,7 +52,7 @@ def my_pipeline(name: str = 'default-name', reimport=False, metadata={ 'name': make_name_op.output, - 'list-of-strings': [make_name_op.output, name], + 'list-of-data': [make_name_op.output, name, int_input], make_name_op.output: make_name_op.output, name: DEFAULT_IMAGE_URI, 'containerSpec': { diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml index 6fd366f72f7..80f6dfb4a2f 100644 --- a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml +++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml @@ -23,6 +23,8 @@ components: parameterType: STRING metadata-2: parameterType: STRING + metadata-3: + parameterType: NUMBER_INTEGER uri: parameterType: STRING outputDefinitions: @@ -64,9 +66,10 @@ deploymentSpec: metadata: containerSpec: imageUri: us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest - list-of-strings: + list-of-data: - '{{$.inputs.parameters[''metadata'']}}' - '{{$.inputs.parameters[''metadata-2'']}}' + - '{{$.inputs.parameters[''metadata-3'']}}' name: '{{$.inputs.parameters[''metadata'']}}' '{{$.inputs.parameters[''metadata'']}}': '{{$.inputs.parameters[''metadata'']}}' '{{$.inputs.parameters[''metadata-2'']}}': us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest @@ -134,6 +137,8 @@ root: producerTask: make-name metadata-2: componentInputParameter: name + metadata-3: + componentInputParameter: int_input uri: runtimeValue: constant: gs://ml-pipeline-playground/shakespeare1.txt @@ -153,6 +158,9 @@ root: name: make-name inputDefinitions: parameters: + int_input: + defaultValue: 1.0 + parameterType: NUMBER_INTEGER name: defaultValue: default-name parameterType: STRING From 5bc0b35c4b98615628f4ad77595145217ff147ab Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Wed, 14 Sep 2022 17:59:58 -0600 Subject: [PATCH 09/13] change constant name --- sdk/python/kfp/components/importer_node.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/kfp/components/importer_node.py b/sdk/python/kfp/components/importer_node.py index f16cba458bf..76fc466670b 100644 --- a/sdk/python/kfp/components/importer_node.py +++ b/sdk/python/kfp/components/importer_node.py @@ -24,7 +24,7 @@ from kfp.components.types import artifact_types from kfp.components.types import type_utils -INPUT_KEY = 'uri' +URI_KEY = 'uri' OUTPUT_KEY = 'artifact' METADATA_KEY = 'metadata' @@ -107,14 +107,14 @@ def traverse_dict_and_create_metadata_inputs(d: Any) -> Any: implementation=structures.Implementation( importer=structures.ImporterSpec( artifact_uri=placeholders.InputValuePlaceholder( - INPUT_KEY)._to_placeholder_string(), + URI_KEY).to_placeholder_string(), schema_title=type_utils.create_bundled_artifact_type( artifact_class.schema_title, artifact_class.schema_version), schema_version=artifact_class.schema_version, reimport=reimport, metadata=metadata_with_placeholders)), inputs={ - INPUT_KEY: structures.InputSpec(type='String'), + URI_KEY: structures.InputSpec(type='String'), **component_inputs }, outputs={ From 4b670df15aff711c854d0ce5978160d29d5356b9 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Mon, 19 Sep 2022 13:15:52 -0600 Subject: [PATCH 10/13] use util function --- sdk/python/kfp/components/importer_node.py | 21 +++------------------ 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/sdk/python/kfp/components/importer_node.py b/sdk/python/kfp/components/importer_node.py index 76fc466670b..dd670d4e7d8 100644 --- a/sdk/python/kfp/components/importer_node.py +++ b/sdk/python/kfp/components/importer_node.py @@ -13,14 +13,14 @@ # limitations under the License. """Utility function for building Importer Node spec.""" -import sys -from typing import Any, Dict, List, Mapping, Optional, Type, Union +from typing import Any, Dict, Mapping, Optional, Type, Union from kfp.components import importer_component from kfp.components import pipeline_channel from kfp.components import pipeline_task from kfp.components import placeholders from kfp.components import structures +from kfp.components import utils from kfp.components.types import artifact_types from kfp.components.types import type_utils @@ -73,7 +73,7 @@ def traverse_dict_and_create_metadata_inputs(d: Any) -> Any: # in metadata unique_name = reversed_call_inputs.get( d, - make_placeholder_unique( + utils.make_name_unique_by_adding_index( METADATA_KEY, list(call_inputs), '-', @@ -128,18 +128,3 @@ def traverse_dict_and_create_metadata_inputs(d: Any) -> Any: importer = importer_component.ImporterComponent( component_spec=component_spec) return importer(uri=artifact_uri, **call_inputs) - - -def make_placeholder_unique( - name: str, - collection: List[str], - delimiter: str, -) -> str: - """Makes a unique name by adding index.""" - unique_name = name - if unique_name in collection: - for i in range(2, sys.maxsize**10): - unique_name = name + delimiter + str(i) - if unique_name not in collection: - break - return unique_name From 6cd08e8245ec08706bcbb001dd9144661263f2e6 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Mon, 19 Sep 2022 13:55:05 -0600 Subject: [PATCH 11/13] support f-strings in dynamic import metadata; add and update tests --- sdk/python/kfp/components/importer_node.py | 13 +++++++ .../kfp/components/importer_node_test.py | 35 +++++++++++++++++++ ...pipeline_with_dynamic_importer_metadata.py | 2 +- ...peline_with_dynamic_importer_metadata.yaml | 2 +- 4 files changed, 50 insertions(+), 2 deletions(-) diff --git a/sdk/python/kfp/components/importer_node.py b/sdk/python/kfp/components/importer_node.py index dd670d4e7d8..bf8ed6cbddc 100644 --- a/sdk/python/kfp/components/importer_node.py +++ b/sdk/python/kfp/components/importer_node.py @@ -96,6 +96,19 @@ def traverse_dict_and_create_metadata_inputs(d: Any) -> Any: elif isinstance(d, list): return [traverse_dict_and_create_metadata_inputs(el) for el in d] + + elif isinstance(d, str): + # extract pipeline channels from f-strings, if any + pipeline_channels = pipeline_channel.extract_pipeline_channels_from_any( + d) + + # pass the channel back into the recursive function to create the placeholder, component inputs, and call inputs, then replace the channel with the placeholder + for channel in pipeline_channels: + input_placeholder = traverse_dict_and_create_metadata_inputs( + channel) + d = d.replace(channel.pattern, input_placeholder) + return d + else: return d diff --git a/sdk/python/kfp/components/importer_node_test.py b/sdk/python/kfp/components/importer_node_test.py index 7b63ce2e790..b8a613ab12a 100644 --- a/sdk/python/kfp/components/importer_node_test.py +++ b/sdk/python/kfp/components/importer_node_test.py @@ -149,3 +149,38 @@ def my_pipeline(): self.assertEqual( metadata.struct_value.fields["{{$.inputs.parameters['metadata']}}"] .string_value, "{{$.inputs.parameters['metadata']}}") + + def test_dynamic_fstring_in_metadata(self): + + @dsl.component + def string_task() -> str: + return 'string' + + @dsl.pipeline() + def my_pipeline(integer: int = 1): + task = string_task() + dataset = importer_node.importer( + 'gs://my_bucket', + Dataset, + reimport=True, + metadata={ + f'prefix1-{integer}': f'prefix2-{task.output}', + 'key': 'value' + }) + + pipeline_spec = my_pipeline.pipeline_spec + input_keys = list(pipeline_spec.components['comp-importer'] + .input_definitions.parameters.keys()) + self.assertIn('metadata', input_keys) + self.assertIn('metadata-2', input_keys) + self.assertNotIn('metadata-3', input_keys) + deployment_spec = pipeline_spec.deployment_spec.fields[ + 'executors'].struct_value.fields['exec-importer'] + metadata = deployment_spec.struct_value.fields[ + 'importer'].struct_value.fields['metadata'] + self.assertEqual( + metadata.struct_value.fields[ + "prefix1-{{$.inputs.parameters[\'metadata-2\']}}"].string_value, + "prefix2-{{$.inputs.parameters[\'metadata\']}}") + self.assertEqual(metadata.struct_value.fields['key'].string_value, + 'value') diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py index 76af9145e5d..099bd2d8dc1 100644 --- a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py +++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.py @@ -51,7 +51,7 @@ def my_pipeline(name: str = 'default-name', artifact_class=Dataset, reimport=False, metadata={ - 'name': make_name_op.output, + 'name': f'prefix-{make_name_op.output}', 'list-of-data': [make_name_op.output, name, int_input], make_name_op.output: make_name_op.output, name: DEFAULT_IMAGE_URI, diff --git a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml index 80f6dfb4a2f..b7a034e0a87 100644 --- a/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml +++ b/sdk/python/test_data/pipelines/pipeline_with_dynamic_importer_metadata.yaml @@ -70,7 +70,7 @@ deploymentSpec: - '{{$.inputs.parameters[''metadata'']}}' - '{{$.inputs.parameters[''metadata-2'']}}' - '{{$.inputs.parameters[''metadata-3'']}}' - name: '{{$.inputs.parameters[''metadata'']}}' + name: prefix-{{$.inputs.parameters['metadata']}} '{{$.inputs.parameters[''metadata'']}}': '{{$.inputs.parameters[''metadata'']}}' '{{$.inputs.parameters[''metadata-2'']}}': us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-5:latest typeSchema: From 2b98dd36746dbf18acdcf341fda9a3913fc0eb00 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Mon, 19 Sep 2022 14:02:04 -0600 Subject: [PATCH 12/13] update method name after rebase --- sdk/python/kfp/components/importer_node.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/kfp/components/importer_node.py b/sdk/python/kfp/components/importer_node.py index bf8ed6cbddc..f474424100b 100644 --- a/sdk/python/kfp/components/importer_node.py +++ b/sdk/python/kfp/components/importer_node.py @@ -85,7 +85,7 @@ def traverse_dict_and_create_metadata_inputs(d: Any) -> Any: type=d.channel_type) return placeholders.InputValuePlaceholder( - input_name=unique_name).to_placeholder_string() + input_name=unique_name)._to_placeholder_string() elif isinstance(d, dict): return { @@ -120,7 +120,7 @@ def traverse_dict_and_create_metadata_inputs(d: Any) -> Any: implementation=structures.Implementation( importer=structures.ImporterSpec( artifact_uri=placeholders.InputValuePlaceholder( - URI_KEY).to_placeholder_string(), + URI_KEY)._to_placeholder_string(), schema_title=type_utils.create_bundled_artifact_type( artifact_class.schema_title, artifact_class.schema_version), schema_version=artifact_class.schema_version, From 0210fe474d4d9a9f86efc68f661fd73ccfb6c2d3 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Mon, 19 Sep 2022 14:16:42 -0600 Subject: [PATCH 13/13] make compile consistent across python versions --- sdk/python/kfp/components/importer_node.py | 12 +++++++----- sdk/python/kfp/components/importer_node_test.py | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/sdk/python/kfp/components/importer_node.py b/sdk/python/kfp/components/importer_node.py index f474424100b..17322872a37 100644 --- a/sdk/python/kfp/components/importer_node.py +++ b/sdk/python/kfp/components/importer_node.py @@ -88,11 +88,13 @@ def traverse_dict_and_create_metadata_inputs(d: Any) -> Any: input_name=unique_name)._to_placeholder_string() elif isinstance(d, dict): - return { - traverse_dict_and_create_metadata_inputs(k): - traverse_dict_and_create_metadata_inputs(v) - for k, v in d.items() - } + # use this instead of list comprehension to ensure compiles are identical across Python versions + res = {} + for k, v in d.items(): + new_k = traverse_dict_and_create_metadata_inputs(k) + new_v = traverse_dict_and_create_metadata_inputs(v) + res[new_k] = new_v + return res elif isinstance(d, list): return [traverse_dict_and_create_metadata_inputs(el) for el in d] diff --git a/sdk/python/kfp/components/importer_node_test.py b/sdk/python/kfp/components/importer_node_test.py index b8a613ab12a..d6066b5a4da 100644 --- a/sdk/python/kfp/components/importer_node_test.py +++ b/sdk/python/kfp/components/importer_node_test.py @@ -180,7 +180,7 @@ def my_pipeline(integer: int = 1): 'importer'].struct_value.fields['metadata'] self.assertEqual( metadata.struct_value.fields[ - "prefix1-{{$.inputs.parameters[\'metadata-2\']}}"].string_value, - "prefix2-{{$.inputs.parameters[\'metadata\']}}") + "prefix1-{{$.inputs.parameters[\'metadata\']}}"].string_value, + "prefix2-{{$.inputs.parameters[\'metadata-2\']}}") self.assertEqual(metadata.struct_value.fields['key'].string_value, 'value')