From 410afb52b75abb54bbd90c24d52179bf246bbe4f Mon Sep 17 00:00:00 2001 From: Chen Sun Date: Mon, 11 Oct 2021 08:43:36 +0000 Subject: [PATCH] Implement PipelineTask __init__() --- .../test_data/experimental_v2_component.py | 30 +-- .../components/experimental/base_component.py | 4 +- .../experimental/base_component_test.py | 22 +- .../experimental/pipeline_channel_test.py | 2 +- .../components/experimental/pipeline_task.py | 248 +++++++++++++++++- .../experimental/pipeline_task_test.py | 214 +++++++++++++++ .../components/experimental/placeholders.py | 42 +++ .../experimental/placeholders_test.py | 67 +++++ .../{component_spec.py => structures.py} | 61 ++--- ...ponent_spec_test.py => structures_test.py} | 145 +++++----- .../kfp/v2/components/types/type_utils.py | 90 +++++++ .../v2/components/types/type_utils_test.py | 72 ++++- 12 files changed, 853 insertions(+), 144 deletions(-) create mode 100644 sdk/python/kfp/v2/components/experimental/pipeline_task_test.py create mode 100644 sdk/python/kfp/v2/components/experimental/placeholders.py create mode 100644 sdk/python/kfp/v2/components/experimental/placeholders_test.py rename sdk/python/kfp/v2/components/experimental/{component_spec.py => structures.py} (91%) rename sdk/python/kfp/v2/components/experimental/{component_spec_test.py => structures_test.py} (70%) diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/experimental_v2_component.py b/sdk/python/kfp/v2/compiler_cli_tests/test_data/experimental_v2_component.py index ff15cfd3e9b2..d0993c725f0e 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/experimental_v2_component.py +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/experimental_v2_component.py @@ -13,7 +13,7 @@ # limitations under the License. from kfp.v2.components.experimental import base_component -from kfp.v2.components.experimental import component_spec +from kfp.v2.components.experimental import structures from kfp.v2 import dsl from kfp.v2 import compiler @@ -25,21 +25,21 @@ def execute(self, *args, **kwargs): component_op = TestComponent( - component_spec=component_spec.ComponentSpec( + component_spec=structures.ComponentSpec( name='component_1', - implementation=component_spec.Implementation( - container=component_spec.ContainerSpec( - image='alpine', - commands=[ - 'sh', - '-c', - 'set -ex\necho "$0" > "$1"', - component_spec.InputValuePlaceholder(input_name='input1'), - component_spec.OutputPathPlaceholder(output_name='output1'), - ], - )), - inputs={'input1': component_spec.InputSpec(type='String')}, - outputs={'output1': component_spec.OutputSpec(type='String')}, + implementation=structures.Implementation( + container=structures.ContainerSpec( + image='alpine', + commands=[ + 'sh', + '-c', + 'set -ex\necho "$0" > "$1"', + structures.InputValuePlaceholder(input_name='input1'), + structures.OutputPathPlaceholder(output_name='output1'), + ], + )), + inputs={'input1': structures.InputSpec(type='String')}, + outputs={'output1': structures.OutputSpec(type='String')}, )) diff --git a/sdk/python/kfp/v2/components/experimental/base_component.py b/sdk/python/kfp/v2/components/experimental/base_component.py index ee8e23f3771e..41bd79ec8248 100644 --- a/sdk/python/kfp/v2/components/experimental/base_component.py +++ b/sdk/python/kfp/v2/components/experimental/base_component.py @@ -15,7 +15,7 @@ import abc -from kfp.v2.components.experimental import component_spec as cspec +from kfp.v2.components.experimental import structures from kfp.v2.components.experimental import pipeline_task @@ -27,7 +27,7 @@ class BaseComponent(metaclass=abc.ABCMeta): component_spec: The component definition. """ - def __init__(self, component_spec: cspec.ComponentSpec): + def __init__(self, component_spec: structures.ComponentSpec): """Init function for BaseComponent. Args: diff --git a/sdk/python/kfp/v2/components/experimental/base_component_test.py b/sdk/python/kfp/v2/components/experimental/base_component_test.py index fc59da573ef4..2f9ac7a5f4b5 100644 --- a/sdk/python/kfp/v2/components/experimental/base_component_test.py +++ b/sdk/python/kfp/v2/components/experimental/base_component_test.py @@ -17,7 +17,7 @@ from unittest.mock import patch from kfp.v2.components.experimental import base_component -from kfp.v2.components.experimental import component_spec +from kfp.v2.components.experimental import structures from kfp.v2.components.experimental import pipeline_task @@ -28,27 +28,27 @@ def execute(self, *args, **kwargs): component_op = TestComponent( - component_spec=component_spec.ComponentSpec( + component_spec=structures.ComponentSpec( name='component_1', - implementation=component_spec.ContainerSpec( + implementation=structures.ContainerSpec( image='alpine', commands=[ 'sh', '-c', 'set -ex\necho "$0" "$1" "$2" > "$3"', - component_spec.InputValuePlaceholder(input_name='input1'), - component_spec.InputValuePlaceholder(input_name='input2'), - component_spec.InputValuePlaceholder(input_name='input3'), - component_spec.OutputPathPlaceholder(output_name='output1'), + structures.InputValuePlaceholder(input_name='input1'), + structures.InputValuePlaceholder(input_name='input2'), + structures.InputValuePlaceholder(input_name='input3'), + structures.OutputPathPlaceholder(output_name='output1'), ], ), inputs={ - 'input1': component_spec.InputSpec(type='String'), - 'input2': component_spec.InputSpec(type='Integer'), - 'input3': component_spec.InputSpec(type='Float', default=3.14), + 'input1': structures.InputSpec(type='String'), + 'input2': structures.InputSpec(type='Integer'), + 'input3': structures.InputSpec(type='Float', default=3.14), }, outputs={ - 'output1': component_spec.OutputSpec(name='output1', type='String'), + 'output1': structures.OutputSpec(name='output1', type='String'), }, )) diff --git a/sdk/python/kfp/v2/components/experimental/pipeline_channel_test.py b/sdk/python/kfp/v2/components/experimental/pipeline_channel_test.py index ae4baccc0503..3df2c3c63f5b 100644 --- a/sdk/python/kfp/v2/components/experimental/pipeline_channel_test.py +++ b/sdk/python/kfp/v2/components/experimental/pipeline_channel_test.py @@ -16,7 +16,7 @@ import unittest from absl.testing import parameterized -from kfp.v2.components.experimental import component_spec, pipeline_channel +from kfp.v2.components.experimental import pipeline_channel class PipelineChannelTest(parameterized.TestCase): diff --git a/sdk/python/kfp/v2/components/experimental/pipeline_task.py b/sdk/python/kfp/v2/components/experimental/pipeline_task.py index d7e22aaa00bf..5b1054de2f9a 100644 --- a/sdk/python/kfp/v2/components/experimental/pipeline_task.py +++ b/sdk/python/kfp/v2/components/experimental/pipeline_task.py @@ -13,15 +13,20 @@ # limitations under the License. """Pipeline task class and operations.""" -from typing import Any, Mapping, Optional +import collections +import copy +from typing import Any, List, Mapping, Optional, Union from kfp.dsl import _component_bridge -from kfp.v2.components.experimental import component_spec as cspec +from kfp.v2.components.experimental import pipeline_channel +from kfp.v2.components.experimental import placeholders +from kfp.v2.components.experimental import structures +from kfp.v2.components.types import type_utils # TODO(chensun): return PipelineTask object instead of ContainerOp object. def create_pipeline_task( - component_spec: cspec.ComponentSpec, + component_spec: structures.ComponentSpec, arguments: Mapping[str, Any], ) -> "ContainerOp": # pytype: disable=name-error return _component_bridge._create_container_op_from_component_and_arguments( @@ -37,15 +42,240 @@ class PipelineTask: `.after()`, `.set_memory_limit()`, `enable_caching()`, etc. Attributes: - task_spec: - component_spec: - executor_spec: + task_spec: The task spec of the task. + component_spec: The component spec of the task. + container_spec: The resolved container spec of the task. """ def __init__( self, - component_spec: cspec.ComponentSpec, + component_spec: structures.ComponentSpec, arguments: Mapping[str, Any], ): - # TODO(chensun): move logic from _component_bridge over here. - raise NotImplementedError + """Initilizes a PipelineTask instance. + + Args: + component_spec: The component definition. + arguments: The dictionary of component arguments. + """ + for input_name, argument_value in arguments.items(): + + if input_name not in component_spec.inputs: + raise ValueError( + f'Component "{component_spec.name}" got an unexpected input:' + f' {input_name}.') + + input_type = component_spec.inputs[input_name].type + argument_type = None + + if isinstance(argument_value, pipeline_channel.PipelineChannel): + argument_type = argument_value.channel_type + elif isinstance(argument_value, str): + argument_type = 'String' + elif isinstance(argument_value, int): + argument_type = 'Integer' + elif isinstance(argument_value, float): + argument_type = 'Float' + elif isinstance(argument_value, bool): + argument_type = 'Boolean' + elif isinstance(argument_value, dict): + argument_type = 'Dict' + elif isinstance(argument_value, list): + argument_type = 'List' + else: + raise ValueError( + 'Input argument supports only the following types: ' + 'str, int, float, bool, dict, and list. Got: ' + f'"{argument_value}" of type "{type(argument_value)}".') + + type_utils.verify_type_compatibility( + given_type=argument_type, + expected_type=input_type, + error_message_prefix=( + 'Incompatible argument passed to the input ' + f'"{input_name}" of component "{component_spec.name}": '), + ) + + self.component_spec = component_spec + + self.task_spec = structures.TaskSpec( + # The name of the task is subject to change due to component reuse. + name=component_spec.name, + inputs={ + input_name: value for input_name, value in arguments.items() + }, + dependent_tasks=[], + component_ref=component_spec.name, + enable_caching=True, + ) + + self.container_spec = self._resolve_command_line_and_arguments( + component_spec=component_spec, + arguments=arguments, + ) + + def _resolve_command_line_and_arguments( + self, + component_spec: structures.ComponentSpec, + arguments: Mapping[str, str], + ) -> structures.ContainerSpec: + """Resolves the command line argument placeholders in a container spec. + + Args: + component_spec: The component definition. + arguments: The dictionary of component arguments. + """ + argument_values = arguments + + if not component_spec.implementation.container: + raise TypeError( + 'Only container components have command line to resolve') + + component_inputs = component_spec.inputs or {} + inputs_dict = { + input_name: input_spec + for input_name, input_spec in component_inputs.items() + } + component_outputs = component_spec.outputs or {} + outputs_dict = { + output_name: output_spec + for output_name, output_spec in component_outputs.items() + } + + def expand_command_part(arg) -> Union[str, List[str], None]: + if arg is None: + return None + + if isinstance(arg, (str, int, float, bool)): + return str(arg) + + elif isinstance(arg, (dict, list)): + return json.dumps(arg) + + elif isinstance(arg, structures.InputValuePlaceholder): + input_name = arg.input_name + if not type_utils.is_parameter_type( + inputs_dict[input_name].type): + raise TypeError( + f'Input "{input_name}" with type ' + f'"{inputs_dict[input_name].type}" cannot be paired with ' + 'InputValuePlaceholder.') + + if input_name in arguments: + return placeholders.input_parameter_placeholder(input_name) + else: + input_spec = inputs_dict[input_name] + if input_spec.default is not None: + return None + else: + raise ValueError( + f'No value provided for input: {input_name}.') + + elif isinstance(arg, structures.InputUriPlaceholder): + input_name = arg.input_name + if type_utils.is_parameter_type(inputs_dict[input_name].type): + raise TypeError( + f'Input "{input_name}" with type ' + f'"{inputs_dict[input_name].type}" cannot be paired with ' + 'InputUriPlaceholder.') + + if input_name in arguments: + input_uri = placeholders.input_artifact_uri_placeholder( + input_name) + return input_uri + else: + input_spec = inputs_dict[input_name] + if input_spec.default is not None: + return None + else: + raise ValueError( + f'No value provided for input: {input_name}.') + + elif isinstance(arg, structures.InputPathPlaceholder): + input_name = arg.input_name + if type_utils.is_parameter_type(inputs_dict[input_name].type): + raise TypeError( + f'Input "{input_name}" with type ' + f'"{inputs_dict[input_name].type}" cannot be paired with ' + 'InputPathPlaceholder.') + + if input_name in arguments: + input_path = placeholders.input_artifact_path_placeholder( + input_name) + return input_path + else: + input_spec = inputs_dict[input_name] + if input_spec.optional: + return None + else: + raise ValueError( + f'No value provided for input: {input_name}.') + + elif isinstance(arg, structures.OutputUriPlaceholder): + output_name = arg.output_name + if type_utils.is_parameter_type(outputs_dict[output_name].type): + raise TypeError( + f'Onput "{output_name}" with type ' + f'"{outputs_dict[output_name].type}" cannot be paired with ' + 'OutputUriPlaceholder.') + + output_uri = placeholders.output_artifact_uri_placeholder( + output_name) + return output_uri + + elif isinstance(arg, structures.OutputPathPlaceholder): + output_name = arg.output_name + + if type_utils.is_parameter_type(outputs_dict[output_name].type): + output_path = placeholders.output_parameter_path_placeholder( + output_name) + else: + output_path = placeholders.output_artifact_path_placeholder( + output_name) + return output_path + + elif isinstance(arg, structures.ConcatPlaceholder): + expanded_argument_strings = expand_argument_list(arg.items) + return ''.join(expanded_argument_strings) + + elif isinstance(arg, structures.IfPresentPlaceholder): + if arg.if_structure.input_name in argument_values: + result_node = arg.if_structure.then + else: + result_node = arg.if_structure.otherwise + + if result_node is None: + return [] + + if isinstance(result_node, list): + expanded_result = expand_argument_list(result_node) + else: + expanded_result = expand_command_part(result_node) + return expanded_result + + else: + raise TypeError('Unrecognized argument type: {}'.format(arg)) + + def expand_argument_list(argument_list) -> Optional[List[str]]: + if argument_list is None: + return None + + expanded_list = [] + for part in argument_list: + expanded_part = expand_command_part(part) + if expanded_part is not None: + if isinstance(expanded_part, list): + expanded_list.extend(expanded_part) + else: + expanded_list.append(str(expanded_part)) + return expanded_list + + container_spec = component_spec.implementation.container + resolved_container_spec = copy.deepcopy(container_spec) + + resolved_container_spec.commands = expand_argument_list( + container_spec.commands) + resolved_container_spec.arguments = expand_argument_list( + container_spec.arguments) + + return resolved_container_spec diff --git a/sdk/python/kfp/v2/components/experimental/pipeline_task_test.py b/sdk/python/kfp/v2/components/experimental/pipeline_task_test.py new file mode 100644 index 000000000000..0141f60bf16a --- /dev/null +++ b/sdk/python/kfp/v2/components/experimental/pipeline_task_test.py @@ -0,0 +1,214 @@ +# Copyright 2021 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for kfp.v2.components.experimental.pipeline_task.""" + +import textwrap +import unittest + +from absl.testing import parameterized +from kfp.v2.components.experimental import pipeline_task +from kfp.v2.components.experimental import structures + +V2_YAML = textwrap.dedent("""\ + name: component1 + inputs: + input1: {type: String} + outputs: + output1: {type: Artifact} + implementation: + container: + image: alpine + commands: + - sh + - -c + - echo "$0" >> "$1" + arguments: + - {inputValue: input1} + - {outputPath: output1} +""") + +V2_YAML_IF_PLACEHOLDER = textwrap.dedent("""\ + name: component_if + inputs: + optional_input_1: {type: String} + implementation: + container: + image: alpine + commands: + - sh + - -c + - echo "$0" "$1" + arguments: + - ifPresent: + inputName: optional_input_1 + then: + - "input: " + - {inputValue: optional_input_1} + otherwise: + - "default: " + - "Hello world!" +""") + +V2_YAML_CONCAT_PLACEHOLDER = textwrap.dedent("""\ + name: component_concat + inputs: + input1: {type: String} + input2: {type: String} + implementation: + container: + image: alpine + commands: + - sh + - -c + - echo "$0" + - concat: [{inputValue: input1}, "+", {inputValue: input2}] + """) + + +class PipelineTaskTest(parameterized.TestCase): + + def test_create_pipeline_task_valid(self): + expected_component_spec = structures.ComponentSpec( + name='component1', + implementation=structures.Implementation( + container=structures.ContainerSpec( + image='alpine', + commands=['sh', '-c', 'echo "$0" >> "$1"'], + arguments=[ + structures.InputValuePlaceholder(input_name='input1'), + structures.OutputPathPlaceholder(output_name='output1'), + ], + )), + inputs={ + 'input1': structures.InputSpec(type='String'), + }, + outputs={ + 'output1': structures.OutputSpec(type='Artifact'), + }, + ) + expected_task_spec = structures.TaskSpec( + name='component1', + inputs={'input1': 'value'}, + dependent_tasks=[], + component_ref='component1', + ) + expected_container_spec = structures.ContainerSpec( + image='alpine', + commands=['sh', '-c', 'echo "$0" >> "$1"'], + arguments=[ + "{{$.inputs.parameters['input1']}}", + "{{$.outputs.artifacts['output1'].path}}", + ], + ) + + task = pipeline_task.PipelineTask( + component_spec=structures.ComponentSpec.load_from_component_yaml( + V2_YAML), + arguments={'input1': 'value'}, + ) + self.assertEqual(task.task_spec, expected_task_spec) + self.assertEqual(task.component_spec, expected_component_spec) + self.assertEqual(task.container_spec, expected_container_spec) + + def test_create_pipeline_task_invalid_missing_required_input(self): + with self.assertRaisesRegex(ValueError, + 'No value provided for input: input1.'): + task = pipeline_task.PipelineTask( + component_spec=structures.ComponentSpec + .load_from_component_yaml(V2_YAML), + arguments={}, + ) + + def test_create_pipeline_task_invalid_wrong_input(self): + with self.assertRaisesRegex( + ValueError, + 'Component "component1" got an unexpected input: input0.'): + task = pipeline_task.PipelineTask( + component_spec=structures.ComponentSpec + .load_from_component_yaml(V2_YAML), + arguments={ + 'input1': 'value', + 'input0': 'abc', + }, + ) + + @parameterized.parameters( + { + 'component_yaml': + V2_YAML_IF_PLACEHOLDER, + 'arguments': { + 'optional_input_1': 'value' + }, + 'expected_container_spec': + structures.ContainerSpec( + image='alpine', + commands=['sh', '-c', 'echo "$0" "$1"'], + arguments=[ + 'input: ', + "{{$.inputs.parameters['optional_input_1']}}", + ], + ) + }, + { + 'component_yaml': + V2_YAML_IF_PLACEHOLDER, + 'arguments': {}, + 'expected_container_spec': + structures.ContainerSpec( + image='alpine', + commands=['sh', '-c', 'echo "$0" "$1"'], + arguments=[ + 'default: ', + 'Hello world!', + ], + ) + }, + ) + def test_resolve_if_placeholder( + self, + component_yaml: str, + arguments: dict, + expected_container_spec: structures.ContainerSpec, + ): + task = pipeline_task.PipelineTask( + component_spec=structures.ComponentSpec.load_from_component_yaml( + component_yaml), + arguments=arguments, + ) + self.assertEqual(task.container_spec, expected_container_spec) + + def test_resolve_concat_placeholder(self): + expected_container_spec = structures.ContainerSpec( + image='alpine', + commands=[ + 'sh', + '-c', + 'echo "$0"', + "{{$.inputs.parameters['input1']}}+{{$.inputs.parameters['input2']}}", + ], + ) + + task = pipeline_task.PipelineTask( + component_spec=structures.ComponentSpec.load_from_component_yaml( + V2_YAML_CONCAT_PLACEHOLDER), + arguments={ + 'input1': '1', + 'input2': '2', + }, + ) + self.assertEqual(task.container_spec, expected_container_spec) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/python/kfp/v2/components/experimental/placeholders.py b/sdk/python/kfp/v2/components/experimental/placeholders.py new file mode 100644 index 000000000000..ad24a3adb2cb --- /dev/null +++ b/sdk/python/kfp/v2/components/experimental/placeholders.py @@ -0,0 +1,42 @@ +# Copyright 2021 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Placeholders for component inputs and outputs.""" + + +def input_artifact_uri_placeholder(input_key: str) -> str: + return "{{{{$.inputs.artifacts['{}'].uri}}}}".format(input_key) + + +def input_artifact_path_placeholder(input_key: str) -> str: + return "{{{{$.inputs.artifacts['{}'].path}}}}".format(input_key) + + +def input_parameter_placeholder(input_key: str) -> str: + return "{{{{$.inputs.parameters['{}']}}}}".format(input_key) + + +def output_artifact_uri_placeholder(output_key: str) -> str: + return "{{{{$.outputs.artifacts['{}'].uri}}}}".format(output_key) + + +def output_artifact_path_placeholder(output_key: str) -> str: + return "{{{{$.outputs.artifacts['{}'].path}}}}".format(output_key) + + +def output_parameter_path_placeholder(output_key: str) -> str: + return "{{{{$.outputs.parameters['{}'].output_file}}}}".format(output_key) + + +def executor_input_placeholder() -> str: + return "{{{{$}}}}" diff --git a/sdk/python/kfp/v2/components/experimental/placeholders_test.py b/sdk/python/kfp/v2/components/experimental/placeholders_test.py new file mode 100644 index 000000000000..46e252ef1650 --- /dev/null +++ b/sdk/python/kfp/v2/components/experimental/placeholders_test.py @@ -0,0 +1,67 @@ +# Copyright 2021 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for kfp.v2.components.experimental.placeholders.""" + +import unittest + +from kfp.v2.components.experimental import placeholders + + +class PlaceholdersTest(unittest.TestCase): + + def test_input_artifact_uri_placeholder(self): + self.assertEqual( + "{{$.inputs.artifacts['input1'].uri}}", + placeholders.input_artifact_uri_placeholder('input1'), + ) + + def test_output_artifact_uri_placeholder(self): + self.assertEqual( + "{{$.outputs.artifacts['output1'].uri}}", + placeholders.output_artifact_uri_placeholder('output1'), + ) + + def test_input_artifact_path_placeholder(self): + self.assertEqual( + "{{$.inputs.artifacts['input1'].path}}", + placeholders.input_artifact_path_placeholder('input1'), + ) + + def test_output_artifact_path_placeholder(self): + self.assertEqual( + "{{$.outputs.artifacts['output1'].path}}", + placeholders.output_artifact_path_placeholder('output1'), + ) + + def test_input_parameter_placeholder(self): + self.assertEqual( + "{{$.inputs.parameters['input1']}}", + placeholders.input_parameter_placeholder('input1'), + ) + + def test_output_parameter_path_placeholder(self): + self.assertEqual( + "{{$.outputs.parameters['output1'].output_file}}", + placeholders.output_parameter_path_placeholder('output1'), + ) + + def test_executor_input_placeholder(self): + self.assertEqual( + "{{{{$}}}}", + placeholders.executor_input_placeholder(), + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/python/kfp/v2/components/experimental/component_spec.py b/sdk/python/kfp/v2/components/experimental/structures.py similarity index 91% rename from sdk/python/kfp/v2/components/experimental/component_spec.py rename to sdk/python/kfp/v2/components/experimental/structures.py index 6caa8c8ff3b2..8d7c6f332b96 100644 --- a/sdk/python/kfp/v2/components/experimental/component_spec.py +++ b/sdk/python/kfp/v2/components/experimental/structures.py @@ -19,7 +19,7 @@ from typing import Any, Dict, Mapping, Optional, Sequence, Union from kfp.components import _components -from kfp.components import structures +from kfp.components import structures as v1_structures import pydantic import yaml @@ -53,8 +53,8 @@ class OutputSpec(BaseModel): class BasePlaceholder(BaseModel): - """Base class for placeholders that could appear in container cmd and args. - """ + """Base class for placeholders that could appear in container cmd and + args.""" pass @@ -113,9 +113,9 @@ class ConcatPlaceholder(BasePlaceholder): """Class that extends basePlaceholders for concatenation. Attributes: - concat: string or ValidCommandArgs for concatenation. + items: string or ValidCommandArgs for concatenation. """ - concat: Sequence[ValidCommandArgs] + items: Sequence[ValidCommandArgs] = pydantic.Field(alias='concat') class IfPresentPlaceholderStructure(BaseModel): @@ -147,7 +147,7 @@ class IfPresentPlaceholder(BasePlaceholder): Attributes: if_present (ifPresent): holds structure for conditional cases. """ - if_present: IfPresentPlaceholderStructure = pydantic.Field( + if_structure: IfPresentPlaceholderStructure = pydantic.Field( alias='ifPresent') @@ -209,27 +209,28 @@ class TaskSpec(BaseModel): name: The name of the task. inputs: The sources of task inputs. Constant values or PipelineParams. dependent_tasks: The list of upstream tasks. - enable_caching: Whether or not to enable caching for the task. component_ref: The name of a component spec this task is based on. trigger_condition: Optional; an expression which will be evaluated into - a boolean value. True to trigger the task to run. + a boolean value. True to trigger the task to run. trigger_strategy: Optional; when the task will be ready to be triggered. Valid values include: "TRIGGER_STRATEGY_UNSPECIFIED", "ALL_UPSTREAM_TASKS_SUCCEEDED", and "ALL_UPSTREAM_TASKS_COMPLETED". iterator_items: Optional; the items to iterate on. A constant value or - a PipelineParam. + a PipelineParam. iterator_item_input: Optional; the name of the input which has the item - from the [items][] collection. + from the [items][] collection. + enable_caching: Optional; whether or not to enable caching for the task. + Default is True. """ name: str inputs: Mapping[str, Any] dependent_tasks: Sequence[str] - enable_caching: bool component_ref: str trigger_condition: Optional[str] = None trigger_strategy: Optional[str] = None iterator_items: Optional[Any] = None iterator_item_input: Optional[str] = None + enable_caching: bool = True class DagSpec(BaseModel): @@ -350,16 +351,16 @@ def _check_valid_placeholder_reference(cls, valid_inputs: Sequence[str], raise ValueError( f'Argument "{arg}" references non-existing output.') elif isinstance(arg, IfPresentPlaceholder): - if arg.if_present.input_name not in valid_inputs: + if arg.if_structure.input_name not in valid_inputs: raise ValueError( f'Argument "{arg}" references non-existing input.') - for placeholder in itertools.chain(arg.if_present.then, - arg.if_present.otherwise): + for placeholder in itertools.chain(arg.if_structure.then, + arg.if_structure.otherwise): cls._check_valid_placeholder_reference(valid_inputs, valid_outputs, placeholder) elif isinstance(arg, ConcatPlaceholder): - for placeholder in arg.concat: + for placeholder in arg.items: cls._check_valid_placeholder_reference(valid_inputs, valid_outputs, placeholder) @@ -369,7 +370,7 @@ def _check_valid_placeholder_reference(cls, valid_inputs: Sequence[str], @classmethod def from_v1_component_spec( cls, - v1_component_spec: structures.ComponentSpec) -> 'ComponentSpec': + v1_component_spec: v1_structures.ComponentSpec) -> 'ComponentSpec': """Converts V1 ComponentSpec to V2 ComponentSpec. Args: @@ -412,7 +413,7 @@ def _transform_arg(arg: Union[str, Dict[str, str]]) -> ValidCommandArgs: IfPresentPlaceholderStructure.update_forward_refs() return IfPresentPlaceholder( - if_present=IfPresentPlaceholderStructure( + if_structure=IfPresentPlaceholderStructure( input_name=if_placeholder_values['cond']['isPresent'], then=list( _transform_arg(val) @@ -459,7 +460,7 @@ def _transform_arg(arg: Union[str, Dict[str, str]]) -> ValidCommandArgs: for spec in component_dict.get('outputs', []) }) - def to_v1_component_spec(self) -> structures.ComponentSpec: + def to_v1_component_spec(self) -> v1_structures.ComponentSpec: """Converts to v1 ComponentSpec. Returns: @@ -472,40 +473,40 @@ def _transform_arg(arg: ValidCommandArgs) -> Any: if isinstance(arg, str): return arg if isinstance(arg, InputValuePlaceholder): - return structures.InputValuePlaceholder(arg.input_name) + return v1_structures.InputValuePlaceholder(arg.input_name) if isinstance(arg, InputPathPlaceholder): - return structures.InputPathPlaceholder(arg.input_name) + return v1_structures.InputPathPlaceholder(arg.input_name) if isinstance(arg, InputUriPlaceholder): - return structures.InputUriPlaceholder(arg.input_name) + return v1_structures.InputUriPlaceholder(arg.input_name) if isinstance(arg, OutputPathPlaceholder): - return structures.OutputPathPlaceholder(arg.output_name) + return v1_structures.OutputPathPlaceholder(arg.output_name) if isinstance(arg, OutputUriPlaceholder): - return structures.OutputUriPlaceholder(arg.output_name) + return v1_structures.OutputUriPlaceholder(arg.output_name) if isinstance(arg, IfPresentPlaceholder): - return structures.IfPlaceholder(arg.if_present) + return v1_structures.IfPlaceholder(arg.if_structure) if isinstance(arg, ConcatPlaceholder): - return structures.ConcatPlaceholder(arg.concat) + return v1_structures.ConcatPlaceholder(arg.concat) raise ValueError( f'Unexpected command/argument type: "{arg}" of type "{type(arg)}".' ) - return structures.ComponentSpec( + return v1_structures.ComponentSpec( name=self.name, inputs=[ - structures.InputSpec( + v1_structures.InputSpec( name=name, type=input_spec.type, default=input_spec.default, ) for name, input_spec in self.inputs.items() ], outputs=[ - structures.OutputSpec( + v1_structures.OutputSpec( name=name, type=output_spec.type, ) for name, output_spec in self.outputs.items() ], - implementation=structures.ContainerImplementation( - container=structures.ContainerSpec( + implementation=v1_structures.ContainerImplementation( + container=v1_structures.ContainerSpec( image=self.implementation.container.image, command=[ _transform_arg(cmd) diff --git a/sdk/python/kfp/v2/components/experimental/component_spec_test.py b/sdk/python/kfp/v2/components/experimental/structures_test.py similarity index 70% rename from sdk/python/kfp/v2/components/experimental/component_spec_test.py rename to sdk/python/kfp/v2/components/experimental/structures_test.py index 373033110af8..56f5f6bff228 100644 --- a/sdk/python/kfp/v2/components/experimental/component_spec_test.py +++ b/sdk/python/kfp/v2/components/experimental/structures_test.py @@ -11,16 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Tests for kfp.v2.components.experimental.component_spec.""" - -from absl.testing import parameterized +"""Tests for kfp.v2.components.experimental.structures.""" import textwrap import unittest from unittest import mock -import pydantic -from kfp.v2.components.experimental import component_spec +import pydantic +from absl.testing import parameterized +from kfp.v2.components.experimental import structures V1_YAML_IF_PLACEHOLDER = textwrap.dedent("""\ name: component_if @@ -57,18 +56,18 @@ otherwise: [--arg2, default] """) -V2_COMPONENT_SPEC_IF_PLACEHOLDER = component_spec.ComponentSpec( +V2_COMPONENT_SPEC_IF_PLACEHOLDER = structures.ComponentSpec( name='component_if', - implementation=component_spec.Implementation( - container=component_spec.ContainerSpec( + implementation=structures.Implementation( + container=structures.ContainerSpec( image='alpine', arguments=[ - component_spec.IfPresentPlaceholder( - if_present=component_spec.IfPresentPlaceholderStructure( + structures.IfPresentPlaceholder( + if_structure=structures.IfPresentPlaceholderStructure( input_name='optional_input_1', then=[ '--arg1', - component_spec.InputValuePlaceholder( + structures.InputValuePlaceholder( input_name='optional_input_1'), ], otherwise=[ @@ -76,7 +75,7 @@ 'default', ])) ])), - inputs={'optional_input_1': component_spec.InputSpec(type='String')}, + inputs={'optional_input_1': structures.InputSpec(type='String')}, ) V1_YAML_CONCAT_PLACEHOLDER = textwrap.dedent("""\ @@ -103,19 +102,18 @@ - {inputValue: input_prefix} """) -V2_COMPONENT_SPEC_CONCAT_PLACEHOLDER = component_spec.ComponentSpec( +V2_COMPONENT_SPEC_CONCAT_PLACEHOLDER = structures.ComponentSpec( name='component_concat', - implementation=component_spec.Implementation( - container=component_spec.ContainerSpec( + implementation=structures.Implementation( + container=structures.ContainerSpec( image='alpine', arguments=[ - component_spec.ConcatPlaceholder(concat=[ + structures.ConcatPlaceholder(concat=[ '--arg1', - component_spec.InputValuePlaceholder( - input_name='input_prefix'), + structures.InputValuePlaceholder(input_name='input_prefix'), ]) ])), - inputs={'input_prefix': component_spec.InputSpec(type='String')}, + inputs={'input_prefix': structures.InputSpec(type='String')}, ) V2_YAML_NESTED_PLACEHOLDER = textwrap.dedent("""\ @@ -141,84 +139,84 @@ - {inputValue: input_prefix} """) -V2_COMPONENT_SPEC_NESTED_PLACEHOLDER = component_spec.ComponentSpec( +V2_COMPONENT_SPEC_NESTED_PLACEHOLDER = structures.ComponentSpec( name='component_nested', - implementation=component_spec.Implementation( - container=component_spec.ContainerSpec( + implementation=structures.Implementation( + container=structures.ContainerSpec( image='alpine', arguments=[ - component_spec.ConcatPlaceholder(concat=[ + structures.ConcatPlaceholder(concat=[ '--arg1', - component_spec.IfPresentPlaceholder( - if_present=component_spec.IfPresentPlaceholderStructure( + structures.IfPresentPlaceholder( + if_structure=structures.IfPresentPlaceholderStructure( input_name='input_prefix', then=[ '--arg1', - component_spec.InputValuePlaceholder( + structures.InputValuePlaceholder( input_name='input_prefix'), ], otherwise=[ '--arg2', 'default', - component_spec.ConcatPlaceholder(concat=[ + structures.ConcatPlaceholder(concat=[ '--arg1', - component_spec.InputValuePlaceholder( + structures.InputValuePlaceholder( input_name='input_prefix'), ]), ])), ]) ])), - inputs={'input_prefix': component_spec.InputSpec(type='String')}, + inputs={'input_prefix': structures.InputSpec(type='String')}, ) -class ComponentSpecTest(parameterized.TestCase): +class StructuresTest(parameterized.TestCase): def test_component_spec_with_placeholder_referencing_nonexisting_input_output( self): with self.assertRaisesRegex( pydantic.ValidationError, 'Argument "input_name=\'input000\'" ' 'references non-existing input.'): - component_spec.ComponentSpec( + structures.ComponentSpec( name='component_1', - implementation=component_spec.Implementation( - container=component_spec.ContainerSpec( + implementation=structures.Implementation( + container=structures.ContainerSpec( image='alpine', commands=[ 'sh', '-c', 'set -ex\necho "$0" > "$1"', - component_spec.InputValuePlaceholder( + structures.InputValuePlaceholder( input_name='input000'), - component_spec.OutputPathPlaceholder( + structures.OutputPathPlaceholder( output_name='output1'), ], )), - inputs={'input1': component_spec.InputSpec(type='String')}, - outputs={'output1': component_spec.OutputSpec(type='String')}, + inputs={'input1': structures.InputSpec(type='String')}, + outputs={'output1': structures.OutputSpec(type='String')}, ) with self.assertRaisesRegex( pydantic.ValidationError, 'Argument "output_name=\'output000\'" ' 'references non-existing output.'): - component_spec.ComponentSpec( + structures.ComponentSpec( name='component_1', - implementation=component_spec.Implementation( - container=component_spec.ContainerSpec( + implementation=structures.Implementation( + container=structures.ContainerSpec( image='alpine', commands=[ 'sh', '-c', 'set -ex\necho "$0" > "$1"', - component_spec.InputValuePlaceholder( + structures.InputValuePlaceholder( input_name='input1'), - component_spec.OutputPathPlaceholder( + structures.OutputPathPlaceholder( output_name='output000'), ], )), - inputs={'input1': component_spec.InputSpec(type='String')}, - outputs={'output1': component_spec.OutputSpec(type='String')}, + inputs={'input1': structures.InputSpec(type='String')}, + outputs={'output1': structures.OutputSpec(type='String')}, ) def test_simple_component_spec_save_to_component_yaml(self): @@ -243,26 +241,26 @@ def test_simple_component_spec_save_to_component_yaml(self): """) with mock.patch("builtins.open", open_mock, create=True): - component_spec.ComponentSpec( + structures.ComponentSpec( name='component_1', - implementation=component_spec.Implementation( - container=component_spec.ContainerSpec( + implementation=structures.Implementation( + container=structures.ContainerSpec( image='alpine', commands=[ 'sh', '-c', 'set -ex\necho "$0" > "$1"', - component_spec.InputValuePlaceholder( + structures.InputValuePlaceholder( input_name='input1'), - component_spec.OutputPathPlaceholder( + structures.OutputPathPlaceholder( output_name='output1'), ], )), inputs={ - 'input1': component_spec.InputSpec(type='String') + 'input1': structures.InputSpec(type='String') }, outputs={ - 'output1': component_spec.OutputSpec(type='String') + 'output1': structures.OutputSpec(type='String') }, ).save_to_component_yaml('test_save_file.txt') @@ -315,26 +313,24 @@ def test_simple_component_spec_load_from_v2_component_yaml(self): - outputPath: output1 """) - generated_spec = component_spec.ComponentSpec.load_from_component_yaml( + generated_spec = structures.ComponentSpec.load_from_component_yaml( component_yaml_v2) - expected_spec = component_spec.ComponentSpec( + expected_spec = structures.ComponentSpec( name='component_1', - implementation=component_spec.Implementation( - container=component_spec.ContainerSpec( + implementation=structures.Implementation( + container=structures.ContainerSpec( image='alpine', commands=[ 'sh', '-c', 'set -ex\necho "$0" > "$1"', - component_spec.InputValuePlaceholder( - input_name='input1'), - component_spec.OutputPathPlaceholder( - output_name='output1'), + structures.InputValuePlaceholder(input_name='input1'), + structures.OutputPathPlaceholder(output_name='output1'), ], )), - inputs={'input1': component_spec.InputSpec(type='String')}, - outputs={'output1': component_spec.OutputSpec(type='String')}) + inputs={'input1': structures.InputSpec(type='String')}, + outputs={'output1': structures.OutputSpec(type='String')}) self.assertEqual(generated_spec, expected_spec) @parameterized.parameters( @@ -361,8 +357,7 @@ def test_simple_component_spec_load_from_v2_component_yaml(self): ) def test_component_spec_placeholder_load_from_v2_component_yaml( self, yaml, expected_component): - generated_spec = component_spec.ComponentSpec.load_from_component_yaml( - yaml) + generated_spec = structures.ComponentSpec.load_from_component_yaml(yaml) self.assertEqual(generated_spec, expected_component) def test_component_spec_load_from_v1_component_yaml(self): @@ -391,13 +386,13 @@ def test_component_spec_load_from_v1_component_yaml(self): - {outputPath: Output 2} """) - generated_spec = component_spec.ComponentSpec.load_from_component_yaml( + generated_spec = structures.ComponentSpec.load_from_component_yaml( component_yaml_v1) - expected_spec = component_spec.ComponentSpec( + expected_spec = structures.ComponentSpec( name='Component with 2 inputs and 2 outputs', - implementation=component_spec.Implementation( - container=component_spec.ContainerSpec( + implementation=structures.Implementation( + container=structures.ContainerSpec( image='busybox', commands=[ 'sh', @@ -406,24 +401,24 @@ def test_component_spec_load_from_v1_component_yaml(self): 'echo "$0" > "$2" cp "$1" "$3" '), ], arguments=[ - component_spec.InputValuePlaceholder( + structures.InputValuePlaceholder( input_name='Input parameter'), - component_spec.InputPathPlaceholder( + structures.InputPathPlaceholder( input_name='Input artifact'), - component_spec.OutputPathPlaceholder( + structures.OutputPathPlaceholder( output_name='Output 1'), - component_spec.OutputPathPlaceholder( + structures.OutputPathPlaceholder( output_name='Output 2'), ], env={}, )), inputs={ - 'Input parameter': component_spec.InputSpec(type='Artifact'), - 'Input artifact': component_spec.InputSpec(type='Artifact') + 'Input parameter': structures.InputSpec(type='Artifact'), + 'Input artifact': structures.InputSpec(type='Artifact') }, outputs={ - 'Output 1': component_spec.OutputSpec(type='Artifact'), - 'Output 2': component_spec.OutputSpec(type='Artifact'), + 'Output 1': structures.OutputSpec(type='Artifact'), + 'Output 2': structures.OutputSpec(type='Artifact'), }) self.assertEqual(generated_spec, expected_spec) diff --git a/sdk/python/kfp/v2/components/types/type_utils.py b/sdk/python/kfp/v2/components/types/type_utils.py index 22b1a7ca1647..911e00369a99 100644 --- a/sdk/python/kfp/v2/components/types/type_utils.py +++ b/sdk/python/kfp/v2/components/types/type_utils.py @@ -14,6 +14,7 @@ """Utilities for component I/O type mapping.""" import inspect import re +import warnings from typing import Dict, List, Optional, Type, Union from kfp.components import structures, type_annotation_utils @@ -171,3 +172,92 @@ def get_input_artifact_type_schema( component_input.type), 'Input is not an artifact type.' return get_artifact_type_schema(component_input.type) assert False, 'Input not found.' + + +class InconsistentTypeException(Exception): + """InconsistencyTypeException is raised when two types are not + consistent.""" + pass + + +class InconsistentTypeWarning(Warning): + """InconsistentTypeWarning is issued when two types are not consistent.""" + pass + + +def verify_type_compatibility( + given_type: Union[str, dict], + expected_type: Union[str, dict], + error_message_prefix: str, +) -> bool: + """Verifies the given argument type is compatible with the expected type. + + Args: + given_type: The type of the argument passed to the input. + expected_type: The declared type of the input. + error_message_prefix: The prefix for the error message. + + Returns: + True if types are compatible, and False if otherwise. + + Raises: + InconsistentTypeException if types are incompatible and TYPE_CHECK==True. + """ + + # Generic "Artifact" type is compatible with any specific artifact types. + if not is_parameter_type( + str(given_type)) and (str(given_type).lower() == "artifact" or + str(expected_type).lower() == "artifact"): + return True + + types_are_compatible = _check_types(given_type, expected_type) + + if not types_are_compatible: + error_text = error_message_prefix + ( + 'Argument type "{}" is incompatible with the input type "{}"' + ).format(str(given_type), str(expected_type)) + import kfp + if kfp.TYPE_CHECK: + raise InconsistentTypeException(error_text) + else: + warnings.warn(InconsistentTypeWarning(error_text)) + return types_are_compatible + + +def _check_types( + given_type: Union[str, dict], + expected_type: Union[str, dict], +): + if isinstance(given_type, str): + given_type = {given_type: {}} + if isinstance(expected_type, str): + expected_type = {expected_type: {}} + return _check_dict_types(given_type, expected_type) + + +def _check_dict_types( + given_type: dict, + expected_type: dict, +): + given_type_name, _ = list(given_type.items())[0] + expected_type_name, _ = list(expected_type.items())[0] + if given_type_name == "" or expected_type_name == "": + # If the type name is empty, it matches any types + return True + if given_type_name != expected_type_name: + print("type name " + str(given_type_name) + + " is different from expected: " + str(expected_type_name)) + return False + type_name = given_type_name + for type_property in given_type[type_name]: + if type_property not in expected_type[type_name]: + print(type_name + " has a property " + str(type_property) + + " that the latter does not.") + return False + if given_type[type_name][type_property] != expected_type[type_name][ + type_property]: + print(type_name + " has a property " + str(type_property) + + " with value: " + str(given_type[type_name][type_property]) + + " and " + str(expected_type[type_name][type_property])) + return False + return True diff --git a/sdk/python/kfp/v2/components/types/type_utils_test.py b/sdk/python/kfp/v2/components/types/type_utils_test.py index c94543c38be9..2fa1fbc7b6bd 100644 --- a/sdk/python/kfp/v2/components/types/type_utils_test.py +++ b/sdk/python/kfp/v2/components/types/type_utils_test.py @@ -14,12 +14,13 @@ import sys import unittest -from typing import Any, Dict, List +from typing import Any, Dict, List, Union from absl.testing import parameterized from kfp.components import structures from kfp.pipeline_spec import pipeline_spec_pb2 as pb from kfp.v2.components.types import artifact_types, type_utils +from kfp.v2.components.types.type_utils import InconsistentTypeException _PARAMETER_TYPES = [ 'String', @@ -321,6 +322,75 @@ def test_get_parameter_type_field_name(self): self.assertEqual('double_value', type_utils.get_parameter_type_field_name('Float')) + @parameterized.parameters( + { + 'given_type': 'String', + 'expected_type': 'String', + 'is_compatible': True, + }, + { + 'given_type': 'String', + 'expected_type': 'Integer', + 'is_compatible': False, + }, + { + 'given_type': { + 'type_a': { + 'property': 'property_b', + } + }, + 'expected_type': { + 'type_a': { + 'property': 'property_b', + } + }, + 'is_compatible': True, + }, + { + 'given_type': { + 'type_a': { + 'property': 'property_b', + } + }, + 'expected_type': { + 'type_a': { + 'property': 'property_c', + } + }, + 'is_compatible': False, + }, + { + 'given_type': 'Artifact', + 'expected_type': 'Model', + 'is_compatible': True, + }, + { + 'given_type': 'Metrics', + 'expected_type': 'Artifact', + 'is_compatible': True, + }, + ) + def test_verify_type_compatibility( + self, + given_type: Union[str, dict], + expected_type: Union[str, dict], + is_compatible: bool, + ): + if is_compatible: + self.assertTrue( + type_utils.verify_type_compatibility( + given_type=given_type, + expected_type=expected_type, + error_message_prefix='', + )) + else: + with self.assertRaises(InconsistentTypeException): + type_utils.verify_type_compatibility( + given_type=given_type, + expected_type=expected_type, + error_message_prefix='', + ) + if __name__ == '__main__': unittest.main()