diff --git a/sdk/python/kfp/v2/components/experimental/pipeline_channel.py b/sdk/python/kfp/v2/components/experimental/pipeline_channel.py index 53cd709946f..19ad4b84dee 100644 --- a/sdk/python/kfp/v2/components/experimental/pipeline_channel.py +++ b/sdk/python/kfp/v2/components/experimental/pipeline_channel.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """Definition of PipelineChannel.""" - +import abc import dataclasses import json import re @@ -28,15 +28,25 @@ class ConditionOperator: Attributes: operator: The operator of the condition. - operand1: The left operand. - operand2: The right operand. + left_operand: The left operand. + right_operand: The right operand. """ operator: str - operand1: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES] - operand2: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES] + left_operand: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES] + right_operand: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES] + + +# The string template used to generate the placeholder of a PipelineChannel. +_PIPELINE_CHANNEL_PLACEHOLDER_TEMPLATE = ( + '{{channel:task=%s;name=%s;type=%s;}}' +) +# The regex for parsing PipelineChannel placeholders from a string. +_PIPELINE_CHANNEL_PLACEHOLDER_REGEX = ( + r'{{channel:task=([\w\s_-]*);name=([\w\s_-]+);type=([\w\s{}":_-]*);}}' +) -class PipelineChannel: +class PipelineChannel(abc.ABC): """Represents a future value that is passed between pipeline components. A PipelineChannel object can be used as a pipeline function argument so that @@ -47,31 +57,32 @@ class PipelineChannel: Attributes: name: The name of the pipeline channel. channel_type: The type of the pipeline channel. - op_name: The name of the operation that produces the pipeline channel. - None means it is not produced by any operator, so if None, either user + task_name: The name of the task that produces the pipeline channel. + None means it is not produced by any task, so if None, either user constructs it directly (for providing an immediate value), or it is a pipeline function argument. pattern: The serialized string regex pattern this pipeline channel created from. """ + @abc.abstractmethod def __init__( self, name: str, channel_type: Union[str, Dict], - op_name: Optional[str] = None, + task_name: Optional[str] = None, ): - """Inits a PipelineChannel instance. + """Initializes a PipelineChannel instance. Args: name: The name of the pipeline channel. channel_type: The type of the pipeline channel. - op_name: Optional; The name of the operation that produces the - pipeline channel. + task_name: Optional; The name of the task that produces the pipeline + channel. Raises: - ValueError: If name or op_name contains invalid characters. - ValueError: If both op_name and value are set. + ValueError: If name or task_name contains invalid characters. + ValueError: If both task_name and value are set. """ valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$' if not re.match(valid_name_regex, name): @@ -85,12 +96,12 @@ def __init__( # ensure value is None even if empty string or empty list/dict # so that serialization and unserialization remain consistent # (i.e. None => '' => None) - self.op_name = op_name or None + self.task_name = task_name or None @property def full_name(self) -> str: """Unique name for the PipelineChannel.""" - return f'{self.op_name}-{self.name}' if self.op_name else self.name + return f'{self.task_name}-{self.name}' if self.task_name else self.name @property def pattern(self) -> str: @@ -104,15 +115,17 @@ def __str__(self) -> str: the PipelineChannel inline with other strings such as arguments. For example, we can support: ['echo %s' % param] as the container command and later a compiler can replace the - placeholder '{{pipeline_channel:op=%s;name=%s;type=%s}}' with + placeholder '{{pipeline_channel:task=%s;name=%s;type=%s}}' with its own parameter identifier. """ - op_name = self.op_name or '' + task_name = self.task_name or '' name = self.name channel_type = self.channel_type or '' if isinstance(channel_type, dict): channel_type = json.dumps(channel_type) - return f'{{{{channel:op={op_name};name={name};type={channel_type};}}}}' + return _PIPELINE_CHANNEL_PLACEHOLDER_TEMPLATE % ( + task_name, name, channel_type + ) def __repr__(self) -> str: """Representation of the PipelineChannel. @@ -120,7 +133,7 @@ def __repr__(self) -> str: We make repr return the placeholder string so that if someone uses str()-based serialization of complex objects containing `PipelineChannel`, it works properly. (e.g. str([1, 2, 3, - kfp.dsl.PipelineChannel("aaa"), 4, 5, 6,])) + kfp.v2.dsl.PipelineParameterChannel("aaa"), 4, 5, 6,])) """ return str(self) @@ -153,8 +166,8 @@ class PipelineParameterChannel(PipelineChannel): Attributes: name: The name of the pipeline channel. channel_type: The type of the pipeline channel. - op_name: The name of the operation that produces the pipeline channel. - None means it is not produced by any operator, so if None, either user + task_name: The name of the task that produces the pipeline channel. + None means it is not produced by any task, so if None, either user constructs it directly (for providing an immediate value), or it is a pipeline function argument. pattern: The serialized string regex pattern this pipeline channel created @@ -167,25 +180,25 @@ def __init__( self, name: str, channel_type: Union[str, Dict], - op_name: Optional[str] = None, + task_name: Optional[str] = None, value: Optional[type_utils.PARAMETER_TYPES] = None, ): - """Inits a PipelineArtifactChannel instance. + """Initializes a PipelineArtifactChannel instance. Args: name: The name of the pipeline channel. channel_type: The type of the pipeline channel. - op_name: Optional; The name of the operation that produces the - pipeline channel. + task_name: Optional; The name of the task that produces the pipeline + channel. value: Optional; The actual value of the pipeline channel. Raises: - ValueError: If name or op_name contains invalid characters. - ValueError: If both op_name and value are set. + ValueError: If name or task_name contains invalid characters. + ValueError: If both task_name and value are set. TypeError: If the channel type is not a parameter type. """ - if op_name and value: - raise ValueError('op_name and value cannot be both set.') + if task_name and value: + raise ValueError('task_name and value cannot be both set.') if not type_utils.is_parameter_type(channel_type): raise TypeError(f'{channel_type} is not a parameter type.') @@ -195,7 +208,7 @@ def __init__( super(PipelineParameterChannel, self).__init__( name=name, channel_type=channel_type, - op_name=op_name, + task_name=task_name, ) @@ -205,8 +218,8 @@ class PipelineArtifactChannel(PipelineChannel): Attributes: name: The name of the pipeline channel. channel_type: The type of the pipeline channel. - op_name: The name of the operation that produces the pipeline channel. - A pipeline artifact channel is always produced by some op. + task_name: The name of the task that produces the pipeline channel. + A pipeline artifact channel is always produced by some task. pattern: The serialized string regex pattern this pipeline channel created from. """ @@ -215,17 +228,17 @@ def __init__( self, name: str, channel_type: Union[str, Dict], - op_name: str, + task_name: str, ): - """Inits a PipelineArtifactChannel instance. + """Initializes a PipelineArtifactChannel instance. Args: name: The name of the pipeline channel. channel_type: The type of the pipeline channel. - op_name: The name of the operation that produces the pipeline channel. + task_name: The name of the task that produces the pipeline channel. Raises: - ValueError: If name or op_name contains invalid characters. + ValueError: If name or task_name contains invalid characters. TypeError: If the channel type is not an artifact type. """ if type_utils.is_parameter_type(channel_type): @@ -234,11 +247,13 @@ def __init__( super(PipelineArtifactChannel, self).__init__( name=name, channel_type=channel_type, - op_name=op_name, + task_name=task_name, ) -def extract_pipeline_channels(payload: str) -> List['PipelineChannel']: +def extract_pipeline_channels_from_string( + payload: str +) -> List[PipelineChannel]: """Extracts a list of PipelineChannel instances from the payload string. Note: this function removes all duplicate matches. @@ -249,13 +264,15 @@ def extract_pipeline_channels(payload: str) -> List['PipelineChannel']: Returns: A list of PipelineChannels found from the payload. """ - matches = re.findall( - r'{{channel:op=([\w\s_-]*);name=([\w\s_-]+);type=([\w\s{}":_-]*);}}', - payload - ) - deduped_channels = set() + matches = re.findall(_PIPELINE_CHANNEL_PLACEHOLDER_REGEX, payload) + unique_channels = set() for match in matches: - op_name, name, channel_type = match + task_name, name, channel_type = match + + # channel_type could be either a string (e.g. "Integer") or a dictionary + # (e.g.: {"custom_type": {"custom_property": "some_value"}}). + # Try loading it into dictionary, if failed, it means channel_type is a + # string. try: channel_type = json.loads(channel_type) except json.JSONDecodeError: @@ -263,24 +280,24 @@ def extract_pipeline_channels(payload: str) -> List['PipelineChannel']: if type_utils.is_parameter_type(channel_type): pipeline_channel = PipelineParameterChannel( - name=utils.sanitize_k8s_name(name), + name=utils.maybe_rename_for_k8s(name), channel_type=channel_type, - op_name=utils.sanitize_k8s_name(op_name), + task_name=utils.maybe_rename_for_k8s(task_name), ) else: pipeline_channel = PipelineArtifactChannel( - name=utils.sanitize_k8s_name(name), + name=utils.maybe_rename_for_k8s(name), channel_type=channel_type, - op_name=utils.sanitize_k8s_name(op_name), + task_name=utils.maybe_rename_for_k8s(task_name), ) - deduped_channels.add(pipeline_channel) + unique_channels.add(pipeline_channel) - return list(deduped_channels) + return list(unique_channels) def extract_pipeline_channels_from_any( - payload: Union['PipelineChannel', str, list, tuple, dict] -) -> List['PipelineChannel']: + payload: Union[PipelineChannel, str, list, tuple, dict] +) -> List[PipelineChannel]: """Recursively extract PipelineChannels from any object or list of objects. Args: @@ -293,22 +310,18 @@ def extract_pipeline_channels_from_any( if not payload: return [] - # PipelineChannel if isinstance(payload, PipelineChannel): return [payload] - # str if isinstance(payload, str): - return list(set(extract_pipeline_channels(payload))) + return list(set(extract_pipeline_channels_from_string(payload))) - # list or tuple if isinstance(payload, list) or isinstance(payload, tuple): pipeline_channels = [] for item in payload: pipeline_channels += extract_pipeline_channels_from_any(item) return list(set(pipeline_channels)) - # dict if isinstance(payload, dict): pipeline_channels = [] for key, value in payload.items(): diff --git a/sdk/python/kfp/v2/components/experimental/pipeline_channel_test.py b/sdk/python/kfp/v2/components/experimental/pipeline_channel_test.py index aab66e38c9f..ae4baccc050 100644 --- a/sdk/python/kfp/v2/components/experimental/pipeline_channel_test.py +++ b/sdk/python/kfp/v2/components/experimental/pipeline_channel_test.py @@ -21,6 +21,14 @@ class PipelineChannelTest(parameterized.TestCase): + def test_instantiate_pipline_channel(self): + with self.assertRaisesRegex( + TypeError, "Can't instantiate abstract class PipelineChannel"): + p = pipeline_channel.PipelineChannel( + name='channel', + channel_type='String', + ) + def test_invalid_name(self): with self.assertRaisesRegex( ValueError, @@ -31,13 +39,13 @@ def test_invalid_name(self): channel_type='String', ) - def test_op_name_and_value_both_set(self): + def test_task_name_and_value_both_set(self): with self.assertRaisesRegex(ValueError, - 'op_name and value cannot be both set.'): + 'task_name and value cannot be both set.'): p = pipeline_channel.PipelineParameterChannel( name='abc', channel_type='Integer', - op_name='op1', + task_name='task1', value=123, ) @@ -54,7 +62,7 @@ def test_invalid_type(self): p = pipeline_channel.PipelineArtifactChannel( name='channel1', channel_type='String', - op_name='op1', + task_name='task1', ) @parameterized.parameters( @@ -62,11 +70,11 @@ def test_invalid_type(self): 'pipeline_channel': pipeline_channel.PipelineParameterChannel( name='channel1', - op_name='op1', + task_name='task1', channel_type='String', ), 'str_repr': - '{{channel:op=op1;name=channel1;type=String;}}', + '{{channel:task=task1;name=channel1;type=String;}}', }, { 'pipeline_channel': @@ -75,7 +83,7 @@ def test_invalid_type(self): channel_type='Integer', ), 'str_repr': - '{{channel:op=;name=channel2;type=Integer;}}', + '{{channel:task=;name=channel2;type=Integer;}}', }, { 'pipeline_channel': @@ -84,10 +92,10 @@ def test_invalid_type(self): channel_type={'type_a': { 'property_b': 'c' }}, - op_name='op3', + task_name='task3', ), 'str_repr': - '{{channel:op=op3;name=channel3;type={"type_a": {"property_b": "c"}};}}', + '{{channel:task=task3;name=channel3;type={"type_a": {"property_b": "c"}};}}', }, { 'pipeline_channel': @@ -97,22 +105,23 @@ def test_invalid_type(self): value=1.23, ), 'str_repr': - '{{channel:op=;name=channel4;type=Float;}}', + '{{channel:task=;name=channel4;type=Float;}}', }, { 'pipeline_channel': - pipeline_channel.PipelineChannel( + pipeline_channel.PipelineArtifactChannel( name='channel5', channel_type='Artifact', + task_name='task5', ), 'str_repr': - '{{channel:op=;name=channel5;type=Artifact;}}', + '{{channel:task=task5;name=channel5;type=Artifact;}}', }, ) def test_str_repr(self, pipeline_channel, str_repr): self.assertEqual(str_repr, str(pipeline_channel)) - def test_extract_pipelineparam(self): + def test_extract_pipeline_channels(self): p1 = pipeline_channel.PipelineParameterChannel( name='channel1', channel_type='String', @@ -121,18 +130,18 @@ def test_extract_pipelineparam(self): p2 = pipeline_channel.PipelineArtifactChannel( name='channel2', channel_type='customized_type_b', - op_name='op2', + task_name='task2', ) p3 = pipeline_channel.PipelineArtifactChannel( name='channel3', channel_type={'customized_type_c': { 'property_c': 'value_c' }}, - op_name='op3', + task_name='task3', ) stuff_chars = ' between ' payload = str(p1) + stuff_chars + str(p2) + stuff_chars + str(p3) - params = pipeline_channel.extract_pipeline_channels(payload) + params = pipeline_channel.extract_pipeline_channels_from_string(payload) self.assertListEqual([p1, p2, p3], params) # Expecting the extract_pipelineparam_from_any to dedup pipeline channels diff --git a/sdk/python/kfp/v2/components/utils.py b/sdk/python/kfp/v2/components/utils.py index 36750c88354..86a7e35a244 100644 --- a/sdk/python/kfp/v2/components/utils.py +++ b/sdk/python/kfp/v2/components/utils.py @@ -16,11 +16,11 @@ import re -def sanitize_k8s_name(name: str) -> str: - """Cleans and converts a name to be k8s compatible +def maybe_rename_for_k8s(name: str) -> str: + """Cleans and converts a name to be k8s compatible. Args: - name: original name, + name: The original name. Returns: A sanitized name. diff --git a/sdk/python/kfp/v2/components/utils_test.py b/sdk/python/kfp/v2/components/utils_test.py new file mode 100644 index 00000000000..1f52ac5b186 --- /dev/null +++ b/sdk/python/kfp/v2/components/utils_test.py @@ -0,0 +1,43 @@ +# Copyright 2020 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for kfp.v2.components.utils.""" + +import unittest + +from absl.testing import parameterized +from kfp.v2.components import utils + + +class UtilsTest(parameterized.TestCase): + + @parameterized.parameters( + { + 'original': 'name', + 'expected': 'name', + }, + { + 'original': ' Some name', + 'expected': 'some-name', + }, + { + 'original': 'name_123*', + 'expected': 'name-123', + }, + ) + def test_maybe_rename_for_k8s(self, original, expected): + self.assertEqual(utils.maybe_rename_for_k8s(original), expected) + + +if __name__ == '__main__': + unittest.main()