diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index 7c359845850..aa32ef87c7f 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -14,6 +14,7 @@ ## Bug Fixes and Other Changes +* Define PipelineParameterChannel and PipelineArtifactChannel in v2. [\#6470](https://github.com/kubeflow/pipelines/pull/6470) * Remove dead code on importer check in v1. [\#6508](https://github.com/kubeflow/pipelines/pull/6508) * Fix issue where dict, list, bool typed input parameters don't accept constant values or pipeline inputs. [\#6523](https://github.com/kubeflow/pipelines/pull/6523) * Fix passing in "" to a str parameter causes the parameter to receive it as None instead. [\#6533](https://github.com/kubeflow/pipelines/pull/6533) diff --git a/sdk/python/kfp/v2/components/experimental/pipeline_channel.py b/sdk/python/kfp/v2/components/experimental/pipeline_channel.py new file mode 100644 index 00000000000..19ad4b84dee --- /dev/null +++ b/sdk/python/kfp/v2/components/experimental/pipeline_channel.py @@ -0,0 +1,334 @@ +# Copyright 2021 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Definition of PipelineChannel.""" +import abc +import dataclasses +import json +import re +from typing import Dict, List, Optional, Union + +from kfp.v2.components import utils +from kfp.v2.components.types import type_utils + + +@dataclasses.dataclass +class ConditionOperator: + """Represents a condition expression to be used in dsl.Condition(). + + Attributes: + operator: The operator of the condition. + left_operand: The left operand. + right_operand: The right operand. + """ + operator: str + left_operand: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES] + right_operand: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES] + + +# The string template used to generate the placeholder of a PipelineChannel. +_PIPELINE_CHANNEL_PLACEHOLDER_TEMPLATE = ( + '{{channel:task=%s;name=%s;type=%s;}}' +) +# The regex for parsing PipelineChannel placeholders from a string. +_PIPELINE_CHANNEL_PLACEHOLDER_REGEX = ( + r'{{channel:task=([\w\s_-]*);name=([\w\s_-]+);type=([\w\s{}":_-]*);}}' +) + + +class PipelineChannel(abc.ABC): + """Represents a future value that is passed between pipeline components. + + A PipelineChannel object can be used as a pipeline function argument so that + it will be a pipeline artifact or parameter that shows up in ML Pipelines + system UI. It can also represent an intermediate value passed between + components. + + Attributes: + name: The name of the pipeline channel. + channel_type: The type of the pipeline channel. + task_name: The name of the task that produces the pipeline channel. + None means it is not produced by any task, so if None, either user + constructs it directly (for providing an immediate value), or it is a + pipeline function argument. + pattern: The serialized string regex pattern this pipeline channel created + from. + """ + + @abc.abstractmethod + def __init__( + self, + name: str, + channel_type: Union[str, Dict], + task_name: Optional[str] = None, + ): + """Initializes a PipelineChannel instance. + + Args: + name: The name of the pipeline channel. + channel_type: The type of the pipeline channel. + task_name: Optional; The name of the task that produces the pipeline + channel. + + Raises: + ValueError: If name or task_name contains invalid characters. + ValueError: If both task_name and value are set. + """ + valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$' + if not re.match(valid_name_regex, name): + raise ValueError( + 'Only letters, numbers, spaces, "_", and "-" are allowed in the ' + 'name. Must begin with a letter. Got name: {}'.format(name) + ) + + self.name = name + self.channel_type = channel_type + # ensure value is None even if empty string or empty list/dict + # so that serialization and unserialization remain consistent + # (i.e. None => '' => None) + self.task_name = task_name or None + + @property + def full_name(self) -> str: + """Unique name for the PipelineChannel.""" + return f'{self.task_name}-{self.name}' if self.task_name else self.name + + @property + def pattern(self) -> str: + """Unique pattern for the PipelineChannel.""" + return str(self) + + def __str__(self) -> str: + """String representation of the PipelineChannel. + + The string representation is a string identifier so we can mix + the PipelineChannel inline with other strings such as arguments. + For example, we can support: ['echo %s' % param] as the + container command and later a compiler can replace the + placeholder '{{pipeline_channel:task=%s;name=%s;type=%s}}' with + its own parameter identifier. + """ + task_name = self.task_name or '' + name = self.name + channel_type = self.channel_type or '' + if isinstance(channel_type, dict): + channel_type = json.dumps(channel_type) + return _PIPELINE_CHANNEL_PLACEHOLDER_TEMPLATE % ( + task_name, name, channel_type + ) + + def __repr__(self) -> str: + """Representation of the PipelineChannel. + + We make repr return the placeholder string so that if someone + uses str()-based serialization of complex objects containing + `PipelineChannel`, it works properly. (e.g. str([1, 2, 3, + kfp.v2.dsl.PipelineParameterChannel("aaa"), 4, 5, 6,])) + """ + return str(self) + + def __hash__(self) -> int: + """Returns the hash of a PipelineChannel.""" + return hash(self.pattern) + + def __eq__(self, other): + return ConditionOperator('==', self, other) + + def __ne__(self, other): + return ConditionOperator('!=', self, other) + + def __lt__(self, other): + return ConditionOperator('<', self, other) + + def __le__(self, other): + return ConditionOperator('<=', self, other) + + def __gt__(self, other): + return ConditionOperator('>', self, other) + + def __ge__(self, other): + return ConditionOperator('>=', self, other) + + +class PipelineParameterChannel(PipelineChannel): + """Represents a pipeline parameter channel. + + Attributes: + name: The name of the pipeline channel. + channel_type: The type of the pipeline channel. + task_name: The name of the task that produces the pipeline channel. + None means it is not produced by any task, so if None, either user + constructs it directly (for providing an immediate value), or it is a + pipeline function argument. + pattern: The serialized string regex pattern this pipeline channel created + from. + value: The actual value of the pipeline channel. If provided, the + pipeline channel is "resolved" immediately. + """ + + def __init__( + self, + name: str, + channel_type: Union[str, Dict], + task_name: Optional[str] = None, + value: Optional[type_utils.PARAMETER_TYPES] = None, + ): + """Initializes a PipelineArtifactChannel instance. + + Args: + name: The name of the pipeline channel. + channel_type: The type of the pipeline channel. + task_name: Optional; The name of the task that produces the pipeline + channel. + value: Optional; The actual value of the pipeline channel. + + Raises: + ValueError: If name or task_name contains invalid characters. + ValueError: If both task_name and value are set. + TypeError: If the channel type is not a parameter type. + """ + if task_name and value: + raise ValueError('task_name and value cannot be both set.') + + if not type_utils.is_parameter_type(channel_type): + raise TypeError(f'{channel_type} is not a parameter type.') + + self.value = value + + super(PipelineParameterChannel, self).__init__( + name=name, + channel_type=channel_type, + task_name=task_name, + ) + + +class PipelineArtifactChannel(PipelineChannel): + """Represents a pipeline artifact channel. + + Attributes: + name: The name of the pipeline channel. + channel_type: The type of the pipeline channel. + task_name: The name of the task that produces the pipeline channel. + A pipeline artifact channel is always produced by some task. + pattern: The serialized string regex pattern this pipeline channel created + from. + """ + + def __init__( + self, + name: str, + channel_type: Union[str, Dict], + task_name: str, + ): + """Initializes a PipelineArtifactChannel instance. + + Args: + name: The name of the pipeline channel. + channel_type: The type of the pipeline channel. + task_name: The name of the task that produces the pipeline channel. + + Raises: + ValueError: If name or task_name contains invalid characters. + TypeError: If the channel type is not an artifact type. + """ + if type_utils.is_parameter_type(channel_type): + raise TypeError(f'{channel_type} is not an artifact type.') + + super(PipelineArtifactChannel, self).__init__( + name=name, + channel_type=channel_type, + task_name=task_name, + ) + + +def extract_pipeline_channels_from_string( + payload: str +) -> List[PipelineChannel]: + """Extracts a list of PipelineChannel instances from the payload string. + + Note: this function removes all duplicate matches. + + Args: + payload: A string that may contain serialized PipelineChannels. + + Returns: + A list of PipelineChannels found from the payload. + """ + matches = re.findall(_PIPELINE_CHANNEL_PLACEHOLDER_REGEX, payload) + unique_channels = set() + for match in matches: + task_name, name, channel_type = match + + # channel_type could be either a string (e.g. "Integer") or a dictionary + # (e.g.: {"custom_type": {"custom_property": "some_value"}}). + # Try loading it into dictionary, if failed, it means channel_type is a + # string. + try: + channel_type = json.loads(channel_type) + except json.JSONDecodeError: + pass + + if type_utils.is_parameter_type(channel_type): + pipeline_channel = PipelineParameterChannel( + name=utils.maybe_rename_for_k8s(name), + channel_type=channel_type, + task_name=utils.maybe_rename_for_k8s(task_name), + ) + else: + pipeline_channel = PipelineArtifactChannel( + name=utils.maybe_rename_for_k8s(name), + channel_type=channel_type, + task_name=utils.maybe_rename_for_k8s(task_name), + ) + unique_channels.add(pipeline_channel) + + return list(unique_channels) + + +def extract_pipeline_channels_from_any( + payload: Union[PipelineChannel, str, list, tuple, dict] +) -> List[PipelineChannel]: + """Recursively extract PipelineChannels from any object or list of objects. + + Args: + payload: An object that contains serialized PipelineChannels or k8 + definition objects. + + Returns: + A list of PipelineChannels found from the payload. + """ + if not payload: + return [] + + if isinstance(payload, PipelineChannel): + return [payload] + + if isinstance(payload, str): + return list(set(extract_pipeline_channels_from_string(payload))) + + if isinstance(payload, list) or isinstance(payload, tuple): + pipeline_channels = [] + for item in payload: + pipeline_channels += extract_pipeline_channels_from_any(item) + return list(set(pipeline_channels)) + + if isinstance(payload, dict): + pipeline_channels = [] + for key, value in payload.items(): + pipeline_channels += extract_pipeline_channels_from_any(key) + pipeline_channels += extract_pipeline_channels_from_any(value) + return list(set(pipeline_channels)) + + # TODO(chensun): extract PipelineChannel from v2 container spec? + + return [] diff --git a/sdk/python/kfp/v2/components/experimental/pipeline_channel_test.py b/sdk/python/kfp/v2/components/experimental/pipeline_channel_test.py new file mode 100644 index 00000000000..ae4baccc050 --- /dev/null +++ b/sdk/python/kfp/v2/components/experimental/pipeline_channel_test.py @@ -0,0 +1,158 @@ +# Copyright 2021 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for kfp.v2.components.experimental.pipeline_channel.""" + +import unittest + +from absl.testing import parameterized +from kfp.v2.components.experimental import component_spec, pipeline_channel + + +class PipelineChannelTest(parameterized.TestCase): + + def test_instantiate_pipline_channel(self): + with self.assertRaisesRegex( + TypeError, "Can't instantiate abstract class PipelineChannel"): + p = pipeline_channel.PipelineChannel( + name='channel', + channel_type='String', + ) + + def test_invalid_name(self): + with self.assertRaisesRegex( + ValueError, + 'Only letters, numbers, spaces, "_", and "-" are allowed in the ' + 'name. Must begin with a letter. Got name: 123_abc'): + p = pipeline_channel.PipelineParameterChannel( + name='123_abc', + channel_type='String', + ) + + def test_task_name_and_value_both_set(self): + with self.assertRaisesRegex(ValueError, + 'task_name and value cannot be both set.'): + p = pipeline_channel.PipelineParameterChannel( + name='abc', + channel_type='Integer', + task_name='task1', + value=123, + ) + + def test_invalid_type(self): + with self.assertRaisesRegex(TypeError, + 'Artifact is not a parameter type.'): + p = pipeline_channel.PipelineParameterChannel( + name='channel1', + channel_type='Artifact', + ) + + with self.assertRaisesRegex(TypeError, + 'String is not an artifact type.'): + p = pipeline_channel.PipelineArtifactChannel( + name='channel1', + channel_type='String', + task_name='task1', + ) + + @parameterized.parameters( + { + 'pipeline_channel': + pipeline_channel.PipelineParameterChannel( + name='channel1', + task_name='task1', + channel_type='String', + ), + 'str_repr': + '{{channel:task=task1;name=channel1;type=String;}}', + }, + { + 'pipeline_channel': + pipeline_channel.PipelineParameterChannel( + name='channel2', + channel_type='Integer', + ), + 'str_repr': + '{{channel:task=;name=channel2;type=Integer;}}', + }, + { + 'pipeline_channel': + pipeline_channel.PipelineArtifactChannel( + name='channel3', + channel_type={'type_a': { + 'property_b': 'c' + }}, + task_name='task3', + ), + 'str_repr': + '{{channel:task=task3;name=channel3;type={"type_a": {"property_b": "c"}};}}', + }, + { + 'pipeline_channel': + pipeline_channel.PipelineParameterChannel( + name='channel4', + channel_type='Float', + value=1.23, + ), + 'str_repr': + '{{channel:task=;name=channel4;type=Float;}}', + }, + { + 'pipeline_channel': + pipeline_channel.PipelineArtifactChannel( + name='channel5', + channel_type='Artifact', + task_name='task5', + ), + 'str_repr': + '{{channel:task=task5;name=channel5;type=Artifact;}}', + }, + ) + def test_str_repr(self, pipeline_channel, str_repr): + self.assertEqual(str_repr, str(pipeline_channel)) + + def test_extract_pipeline_channels(self): + p1 = pipeline_channel.PipelineParameterChannel( + name='channel1', + channel_type='String', + value='abc', + ) + p2 = pipeline_channel.PipelineArtifactChannel( + name='channel2', + channel_type='customized_type_b', + task_name='task2', + ) + p3 = pipeline_channel.PipelineArtifactChannel( + name='channel3', + channel_type={'customized_type_c': { + 'property_c': 'value_c' + }}, + task_name='task3', + ) + stuff_chars = ' between ' + payload = str(p1) + stuff_chars + str(p2) + stuff_chars + str(p3) + params = pipeline_channel.extract_pipeline_channels_from_string(payload) + self.assertListEqual([p1, p2, p3], params) + + # Expecting the extract_pipelineparam_from_any to dedup pipeline channels + # among all the payloads. + payload = [ + str(p1) + stuff_chars + str(p2), + str(p2) + stuff_chars + str(p3) + ] + params = pipeline_channel.extract_pipeline_channels_from_any(payload) + self.assertListEqual([p1, p2, p3], params) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/python/kfp/v2/components/types/type_utils.py b/sdk/python/kfp/v2/components/types/type_utils.py index a55b3d1feb0..b1f6e4c2d65 100644 --- a/sdk/python/kfp/v2/components/types/type_utils.py +++ b/sdk/python/kfp/v2/components/types/type_utils.py @@ -19,6 +19,9 @@ from kfp.pipeline_spec import pipeline_spec_pb2 from kfp.v2.components.types import artifact_types + +PARAMETER_TYPES = Union[str, int, float, bool, dict, list] + # ComponentSpec I/O types to DSL ontology artifact classes mapping. _ARTIFACT_CLASSES_MAPPING = { 'model': artifact_types.Model, diff --git a/sdk/python/kfp/v2/components/utils.py b/sdk/python/kfp/v2/components/utils.py new file mode 100644 index 00000000000..86a7e35a244 --- /dev/null +++ b/sdk/python/kfp/v2/components/utils.py @@ -0,0 +1,29 @@ +# Copyright 2021 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Definitions of utils methods.""" + +import re + + +def maybe_rename_for_k8s(name: str) -> str: + """Cleans and converts a name to be k8s compatible. + + Args: + name: The original name. + + Returns: + A sanitized name. + """ + return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', + name.lower())).lstrip('-').rstrip('-') diff --git a/sdk/python/kfp/v2/components/utils_test.py b/sdk/python/kfp/v2/components/utils_test.py new file mode 100644 index 00000000000..1f52ac5b186 --- /dev/null +++ b/sdk/python/kfp/v2/components/utils_test.py @@ -0,0 +1,43 @@ +# Copyright 2020 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for kfp.v2.components.utils.""" + +import unittest + +from absl.testing import parameterized +from kfp.v2.components import utils + + +class UtilsTest(parameterized.TestCase): + + @parameterized.parameters( + { + 'original': 'name', + 'expected': 'name', + }, + { + 'original': ' Some name', + 'expected': 'some-name', + }, + { + 'original': 'name_123*', + 'expected': 'name-123', + }, + ) + def test_maybe_rename_for_k8s(self, original, expected): + self.assertEqual(utils.maybe_rename_for_k8s(original), expected) + + +if __name__ == '__main__': + unittest.main()