Skip to content

Commit

Permalink
address cr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
chensun committed Sep 10, 2021
1 parent 4cf8e69 commit 1297433
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 77 deletions.
129 changes: 71 additions & 58 deletions sdk/python/kfp/v2/components/experimental/pipeline_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Definition of PipelineChannel."""

import abc
import dataclasses
import json
import re
Expand All @@ -28,15 +28,25 @@ class ConditionOperator:
Attributes:
operator: The operator of the condition.
operand1: The left operand.
operand2: The right operand.
left_operand: The left operand.
right_operand: The right operand.
"""
operator: str
operand1: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES]
operand2: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES]
left_operand: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES]
right_operand: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES]


# The string template used to generate the placeholder of a PipelineChannel.
_PIPELINE_CHANNEL_PLACEHOLDER_TEMPLATE = (
'{{channel:task=%s;name=%s;type=%s;}}'
)
# The regex for parsing PipelineChannel placeholders from a string.
_PIPELINE_CHANNEL_PLACEHOLDER_REGEX = (
r'{{channel:task=([\w\s_-]*);name=([\w\s_-]+);type=([\w\s{}":_-]*);}}'
)


class PipelineChannel:
class PipelineChannel(abc.ABC):
"""Represents a future value that is passed between pipeline components.
A PipelineChannel object can be used as a pipeline function argument so that
Expand All @@ -47,31 +57,32 @@ class PipelineChannel:
Attributes:
name: The name of the pipeline channel.
channel_type: The type of the pipeline channel.
op_name: The name of the operation that produces the pipeline channel.
None means it is not produced by any operator, so if None, either user
task_name: The name of the task that produces the pipeline channel.
None means it is not produced by any task, so if None, either user
constructs it directly (for providing an immediate value), or it is a
pipeline function argument.
pattern: The serialized string regex pattern this pipeline channel created
from.
"""

@abc.abstractmethod
def __init__(
self,
name: str,
channel_type: Union[str, Dict],
op_name: Optional[str] = None,
task_name: Optional[str] = None,
):
"""Inits a PipelineChannel instance.
"""Initializes a PipelineChannel instance.
Args:
name: The name of the pipeline channel.
channel_type: The type of the pipeline channel.
op_name: Optional; The name of the operation that produces the
pipeline channel.
task_name: Optional; The name of the task that produces the pipeline
channel.
Raises:
ValueError: If name or op_name contains invalid characters.
ValueError: If both op_name and value are set.
ValueError: If name or task_name contains invalid characters.
ValueError: If both task_name and value are set.
"""
valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'
if not re.match(valid_name_regex, name):
Expand All @@ -85,12 +96,12 @@ def __init__(
# ensure value is None even if empty string or empty list/dict
# so that serialization and unserialization remain consistent
# (i.e. None => '' => None)
self.op_name = op_name or None
self.task_name = task_name or None

@property
def full_name(self) -> str:
"""Unique name for the PipelineChannel."""
return f'{self.op_name}-{self.name}' if self.op_name else self.name
return f'{self.task_name}-{self.name}' if self.task_name else self.name

@property
def pattern(self) -> str:
Expand All @@ -104,23 +115,25 @@ def __str__(self) -> str:
the PipelineChannel inline with other strings such as arguments.
For example, we can support: ['echo %s' % param] as the
container command and later a compiler can replace the
placeholder '{{pipeline_channel:op=%s;name=%s;type=%s}}' with
placeholder '{{pipeline_channel:task=%s;name=%s;type=%s}}' with
its own parameter identifier.
"""
op_name = self.op_name or ''
task_name = self.task_name or ''
name = self.name
channel_type = self.channel_type or ''
if isinstance(channel_type, dict):
channel_type = json.dumps(channel_type)
return f'{{{{channel:op={op_name};name={name};type={channel_type};}}}}'
return _PIPELINE_CHANNEL_PLACEHOLDER_TEMPLATE % (
task_name, name, channel_type
)

def __repr__(self) -> str:
"""Representation of the PipelineChannel.
We make repr return the placeholder string so that if someone
uses str()-based serialization of complex objects containing
`PipelineChannel`, it works properly. (e.g. str([1, 2, 3,
kfp.dsl.PipelineChannel("aaa"), 4, 5, 6,]))
kfp.v2.dsl.PipelineParameterChannel("aaa"), 4, 5, 6,]))
"""
return str(self)

Expand Down Expand Up @@ -153,8 +166,8 @@ class PipelineParameterChannel(PipelineChannel):
Attributes:
name: The name of the pipeline channel.
channel_type: The type of the pipeline channel.
op_name: The name of the operation that produces the pipeline channel.
None means it is not produced by any operator, so if None, either user
task_name: The name of the task that produces the pipeline channel.
None means it is not produced by any task, so if None, either user
constructs it directly (for providing an immediate value), or it is a
pipeline function argument.
pattern: The serialized string regex pattern this pipeline channel created
Expand All @@ -167,25 +180,25 @@ def __init__(
self,
name: str,
channel_type: Union[str, Dict],
op_name: Optional[str] = None,
task_name: Optional[str] = None,
value: Optional[type_utils.PARAMETER_TYPES] = None,
):
"""Inits a PipelineArtifactChannel instance.
"""Initializes a PipelineArtifactChannel instance.
Args:
name: The name of the pipeline channel.
channel_type: The type of the pipeline channel.
op_name: Optional; The name of the operation that produces the
pipeline channel.
task_name: Optional; The name of the task that produces the pipeline
channel.
value: Optional; The actual value of the pipeline channel.
Raises:
ValueError: If name or op_name contains invalid characters.
ValueError: If both op_name and value are set.
ValueError: If name or task_name contains invalid characters.
ValueError: If both task_name and value are set.
TypeError: If the channel type is not a parameter type.
"""
if op_name and value:
raise ValueError('op_name and value cannot be both set.')
if task_name and value:
raise ValueError('task_name and value cannot be both set.')

if not type_utils.is_parameter_type(channel_type):
raise TypeError(f'{channel_type} is not a parameter type.')
Expand All @@ -195,7 +208,7 @@ def __init__(
super(PipelineParameterChannel, self).__init__(
name=name,
channel_type=channel_type,
op_name=op_name,
task_name=task_name,
)


Expand All @@ -205,8 +218,8 @@ class PipelineArtifactChannel(PipelineChannel):
Attributes:
name: The name of the pipeline channel.
channel_type: The type of the pipeline channel.
op_name: The name of the operation that produces the pipeline channel.
A pipeline artifact channel is always produced by some op.
task_name: The name of the task that produces the pipeline channel.
A pipeline artifact channel is always produced by some task.
pattern: The serialized string regex pattern this pipeline channel created
from.
"""
Expand All @@ -215,17 +228,17 @@ def __init__(
self,
name: str,
channel_type: Union[str, Dict],
op_name: str,
task_name: str,
):
"""Inits a PipelineArtifactChannel instance.
"""Initializes a PipelineArtifactChannel instance.
Args:
name: The name of the pipeline channel.
channel_type: The type of the pipeline channel.
op_name: The name of the operation that produces the pipeline channel.
task_name: The name of the task that produces the pipeline channel.
Raises:
ValueError: If name or op_name contains invalid characters.
ValueError: If name or task_name contains invalid characters.
TypeError: If the channel type is not an artifact type.
"""
if type_utils.is_parameter_type(channel_type):
Expand All @@ -234,11 +247,13 @@ def __init__(
super(PipelineArtifactChannel, self).__init__(
name=name,
channel_type=channel_type,
op_name=op_name,
task_name=task_name,
)


def extract_pipeline_channels(payload: str) -> List['PipelineChannel']:
def extract_pipeline_channels_from_string(
payload: str
) -> List[PipelineChannel]:
"""Extracts a list of PipelineChannel instances from the payload string.
Note: this function removes all duplicate matches.
Expand All @@ -249,38 +264,40 @@ def extract_pipeline_channels(payload: str) -> List['PipelineChannel']:
Returns:
A list of PipelineChannels found from the payload.
"""
matches = re.findall(
r'{{channel:op=([\w\s_-]*);name=([\w\s_-]+);type=([\w\s{}":_-]*);}}',
payload
)
deduped_channels = set()
matches = re.findall(_PIPELINE_CHANNEL_PLACEHOLDER_REGEX, payload)
unique_channels = set()
for match in matches:
op_name, name, channel_type = match
task_name, name, channel_type = match

# channel_type could be either a string (e.g. "Integer") or a dictionary
# (e.g.: {"custom_type": {"custom_property": "some_value"}}).
# Try loading it into dictionary, if failed, it means channel_type is a
# string.
try:
channel_type = json.loads(channel_type)
except json.JSONDecodeError:
pass

if type_utils.is_parameter_type(channel_type):
pipeline_channel = PipelineParameterChannel(
name=utils.sanitize_k8s_name(name),
name=utils.maybe_rename_for_k8s(name),
channel_type=channel_type,
op_name=utils.sanitize_k8s_name(op_name),
task_name=utils.maybe_rename_for_k8s(task_name),
)
else:
pipeline_channel = PipelineArtifactChannel(
name=utils.sanitize_k8s_name(name),
name=utils.maybe_rename_for_k8s(name),
channel_type=channel_type,
op_name=utils.sanitize_k8s_name(op_name),
task_name=utils.maybe_rename_for_k8s(task_name),
)
deduped_channels.add(pipeline_channel)
unique_channels.add(pipeline_channel)

return list(deduped_channels)
return list(unique_channels)


def extract_pipeline_channels_from_any(
payload: Union['PipelineChannel', str, list, tuple, dict]
) -> List['PipelineChannel']:
payload: Union[PipelineChannel, str, list, tuple, dict]
) -> List[PipelineChannel]:
"""Recursively extract PipelineChannels from any object or list of objects.
Args:
Expand All @@ -293,22 +310,18 @@ def extract_pipeline_channels_from_any(
if not payload:
return []

# PipelineChannel
if isinstance(payload, PipelineChannel):
return [payload]

# str
if isinstance(payload, str):
return list(set(extract_pipeline_channels(payload)))
return list(set(extract_pipeline_channels_from_string(payload)))

# list or tuple
if isinstance(payload, list) or isinstance(payload, tuple):
pipeline_channels = []
for item in payload:
pipeline_channels += extract_pipeline_channels_from_any(item)
return list(set(pipeline_channels))

# dict
if isinstance(payload, dict):
pipeline_channels = []
for key, value in payload.items():
Expand Down
Loading

0 comments on commit 1297433

Please sign in to comment.