Skip to content

Commit

Permalink
chore(sdk.v2): Implement PipelineTask creation (#6713)
Browse files Browse the repository at this point in the history
* Implement PipelineTask __init__()

* add release note
  • Loading branch information
chensun authored Oct 11, 2021
1 parent bcb0d96 commit d311463
Show file tree
Hide file tree
Showing 13 changed files with 854 additions and 144 deletions.
1 change: 1 addition & 0 deletions sdk/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* Fix function-based components not preserving the namespace of GCPC artifact types. [\#6702](https://github.com/kubeflow/pipelines/pull/6702)
* Fix `dsl.` prefix in component I/O type annotation breaking component at runtime. [\#6714](https://github.com/kubeflow/pipelines/pull/6714)
* Update v2 yaml format [\#6661](https://github.com/kubeflow/pipelines/pull/6661)
* Implement v2 PipelineTask [\#6713](https://github.com/kubeflow/pipelines/pull/6713)
* Depends on `typing-extensions>=3.7.4,<4; python_version<"3.9"` [\#6683](https://github.com/kubeflow/pipelines/pull/6683)
* Depends on `click>=7.1.2,<9` [\#6691](https://github.com/kubeflow/pipelines/pull/6691)
* Depends on `cloudpickle>=2.0.0,<3` [\#6703](https://github.com/kubeflow/pipelines/pull/6703)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

from kfp.v2.components.experimental import base_component
from kfp.v2.components.experimental import component_spec
from kfp.v2.components.experimental import structures
from kfp.v2 import dsl
from kfp.v2 import compiler

Expand All @@ -25,21 +25,21 @@ def execute(self, *args, **kwargs):


component_op = TestComponent(
component_spec=component_spec.ComponentSpec(
component_spec=structures.ComponentSpec(
name='component_1',
implementation=component_spec.Implementation(
container=component_spec.ContainerSpec(
image='alpine',
commands=[
'sh',
'-c',
'set -ex\necho "$0" > "$1"',
component_spec.InputValuePlaceholder(input_name='input1'),
component_spec.OutputPathPlaceholder(output_name='output1'),
],
)),
inputs={'input1': component_spec.InputSpec(type='String')},
outputs={'output1': component_spec.OutputSpec(type='String')},
implementation=structures.Implementation(
container=structures.ContainerSpec(
image='alpine',
commands=[
'sh',
'-c',
'set -ex\necho "$0" > "$1"',
structures.InputValuePlaceholder(input_name='input1'),
structures.OutputPathPlaceholder(output_name='output1'),
],
)),
inputs={'input1': structures.InputSpec(type='String')},
outputs={'output1': structures.OutputSpec(type='String')},
))


Expand Down
4 changes: 2 additions & 2 deletions sdk/python/kfp/v2/components/experimental/base_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import abc

from kfp.v2.components.experimental import component_spec as cspec
from kfp.v2.components.experimental import structures
from kfp.v2.components.experimental import pipeline_task


Expand All @@ -27,7 +27,7 @@ class BaseComponent(metaclass=abc.ABCMeta):
component_spec: The component definition.
"""

def __init__(self, component_spec: cspec.ComponentSpec):
def __init__(self, component_spec: structures.ComponentSpec):
"""Init function for BaseComponent.
Args:
Expand Down
22 changes: 11 additions & 11 deletions sdk/python/kfp/v2/components/experimental/base_component_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from unittest.mock import patch

from kfp.v2.components.experimental import base_component
from kfp.v2.components.experimental import component_spec
from kfp.v2.components.experimental import structures
from kfp.v2.components.experimental import pipeline_task


Expand All @@ -28,27 +28,27 @@ def execute(self, *args, **kwargs):


component_op = TestComponent(
component_spec=component_spec.ComponentSpec(
component_spec=structures.ComponentSpec(
name='component_1',
implementation=component_spec.ContainerSpec(
implementation=structures.ContainerSpec(
image='alpine',
commands=[
'sh',
'-c',
'set -ex\necho "$0" "$1" "$2" > "$3"',
component_spec.InputValuePlaceholder(input_name='input1'),
component_spec.InputValuePlaceholder(input_name='input2'),
component_spec.InputValuePlaceholder(input_name='input3'),
component_spec.OutputPathPlaceholder(output_name='output1'),
structures.InputValuePlaceholder(input_name='input1'),
structures.InputValuePlaceholder(input_name='input2'),
structures.InputValuePlaceholder(input_name='input3'),
structures.OutputPathPlaceholder(output_name='output1'),
],
),
inputs={
'input1': component_spec.InputSpec(type='String'),
'input2': component_spec.InputSpec(type='Integer'),
'input3': component_spec.InputSpec(type='Float', default=3.14),
'input1': structures.InputSpec(type='String'),
'input2': structures.InputSpec(type='Integer'),
'input3': structures.InputSpec(type='Float', default=3.14),
},
outputs={
'output1': component_spec.OutputSpec(name='output1', type='String'),
'output1': structures.OutputSpec(name='output1', type='String'),
},
))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import unittest

from absl.testing import parameterized
from kfp.v2.components.experimental import component_spec, pipeline_channel
from kfp.v2.components.experimental import pipeline_channel


class PipelineChannelTest(parameterized.TestCase):
Expand Down
248 changes: 239 additions & 9 deletions sdk/python/kfp/v2/components/experimental/pipeline_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@
# limitations under the License.
"""Pipeline task class and operations."""

from typing import Any, Mapping, Optional
import collections
import copy
from typing import Any, List, Mapping, Optional, Union

from kfp.dsl import _component_bridge
from kfp.v2.components.experimental import component_spec as cspec
from kfp.v2.components.experimental import pipeline_channel
from kfp.v2.components.experimental import placeholders
from kfp.v2.components.experimental import structures
from kfp.v2.components.types import type_utils


# TODO(chensun): return PipelineTask object instead of ContainerOp object.
def create_pipeline_task(
component_spec: cspec.ComponentSpec,
component_spec: structures.ComponentSpec,
arguments: Mapping[str, Any],
) -> "ContainerOp": # pytype: disable=name-error
return _component_bridge._create_container_op_from_component_and_arguments(
Expand All @@ -37,15 +42,240 @@ class PipelineTask:
`.after()`, `.set_memory_limit()`, `enable_caching()`, etc.
Attributes:
task_spec:
component_spec:
executor_spec:
task_spec: The task spec of the task.
component_spec: The component spec of the task.
container_spec: The resolved container spec of the task.
"""

def __init__(
self,
component_spec: cspec.ComponentSpec,
component_spec: structures.ComponentSpec,
arguments: Mapping[str, Any],
):
# TODO(chensun): move logic from _component_bridge over here.
raise NotImplementedError
"""Initializes a PipelineTask instance.
Args:
component_spec: The component definition.
arguments: The dictionary of component arguments.
"""
# Validate every supplied argument against the component's declared inputs
# before any task state is built.
# `inputs` is treated as possibly unset, mirroring the `or {}` guard used in
# `_resolve_command_line_and_arguments`.
declared_inputs = component_spec.inputs or {}

for input_name, argument_value in arguments.items():

    if input_name not in declared_inputs:
        raise ValueError(
            f'Component "{component_spec.name}" got an unexpected input:'
            f' {input_name}.')

    input_type = declared_inputs[input_name].type

    # Infer the type name of the given argument. Pipeline channels carry
    # their own type; constants are mapped from their Python type.
    if isinstance(argument_value, pipeline_channel.PipelineChannel):
        argument_type = argument_value.channel_type
    elif isinstance(argument_value, str):
        argument_type = 'String'
    # `bool` must be tested before `int`: bool is a subclass of int, so
    # checking int first would classify True/False as 'Integer' and make
    # the 'Boolean' branch unreachable.
    elif isinstance(argument_value, bool):
        argument_type = 'Boolean'
    elif isinstance(argument_value, int):
        argument_type = 'Integer'
    elif isinstance(argument_value, float):
        argument_type = 'Float'
    elif isinstance(argument_value, dict):
        argument_type = 'Dict'
    elif isinstance(argument_value, list):
        argument_type = 'List'
    else:
        raise ValueError(
            'Input argument supports only the following types: '
            'str, int, float, bool, dict, and list. Got: '
            f'"{argument_value}" of type "{type(argument_value)}".')

    # Delegates the actual compatibility decision (and the error raised on
    # mismatch) to type_utils.
    type_utils.verify_type_compatibility(
        given_type=argument_type,
        expected_type=input_type,
        error_message_prefix=(
            'Incompatible argument passed to the input '
            f'"{input_name}" of component "{component_spec.name}": '),
    )

self.component_spec = component_spec

self.task_spec = structures.TaskSpec(
# The name of the task is subject to change due to component reuse.
name=component_spec.name,
inputs={
input_name: value for input_name, value in arguments.items()
},
dependent_tasks=[],
component_ref=component_spec.name,
enable_caching=True,
)

self.container_spec = self._resolve_command_line_and_arguments(
component_spec=component_spec,
arguments=arguments,
)

def _resolve_command_line_and_arguments(
    self,
    component_spec: structures.ComponentSpec,
    arguments: Mapping[str, Any],
) -> structures.ContainerSpec:
    """Resolves the command line argument placeholders in a container spec.

    Args:
        component_spec: The component definition.
        arguments: The dictionary of component arguments.

    Returns:
        A deep copy of the component's container spec whose `commands` and
        `arguments` have every placeholder expanded into strings.

    Raises:
        TypeError: If the component has no container implementation, if a
            placeholder is paired with an input/output of an incompatible
            kind (parameter vs. artifact), or if a command part is of an
            unrecognized type.
        ValueError: If a placeholder refers to an input that has no
            argument and cannot be omitted.
    """
    # Imported locally because `json` is not among the module-level imports
    # of this file; without it, dict/list command parts raise NameError.
    import json

    if not component_spec.implementation.container:
        raise TypeError(
            'Only container components have command line to resolve')

    inputs_dict = dict(component_spec.inputs or {})
    outputs_dict = dict(component_spec.outputs or {})

    def expand_command_part(arg) -> Union[str, List[str], None]:
        """Expands one command part; None means "drop this part"."""
        if arg is None:
            return None

        if isinstance(arg, (str, int, float, bool)):
            return str(arg)

        elif isinstance(arg, (dict, list)):
            return json.dumps(arg)

        elif isinstance(arg, structures.InputValuePlaceholder):
            input_name = arg.input_name
            if not type_utils.is_parameter_type(
                    inputs_dict[input_name].type):
                raise TypeError(
                    f'Input "{input_name}" with type '
                    f'"{inputs_dict[input_name].type}" cannot be paired with '
                    'InputValuePlaceholder.')

            if input_name in arguments:
                return placeholders.input_parameter_placeholder(input_name)
            else:
                input_spec = inputs_dict[input_name]
                # An input with a default may be left out of the command
                # line entirely.
                if input_spec.default is not None:
                    return None
                else:
                    raise ValueError(
                        f'No value provided for input: {input_name}.')

        elif isinstance(arg, structures.InputUriPlaceholder):
            input_name = arg.input_name
            if type_utils.is_parameter_type(inputs_dict[input_name].type):
                raise TypeError(
                    f'Input "{input_name}" with type '
                    f'"{inputs_dict[input_name].type}" cannot be paired with '
                    'InputUriPlaceholder.')

            if input_name in arguments:
                return placeholders.input_artifact_uri_placeholder(
                    input_name)
            else:
                input_spec = inputs_dict[input_name]
                # NOTE(review): this branch keys on `default` while the
                # InputPathPlaceholder branch keys on `optional` -- confirm
                # the difference is intentional.
                if input_spec.default is not None:
                    return None
                else:
                    raise ValueError(
                        f'No value provided for input: {input_name}.')

        elif isinstance(arg, structures.InputPathPlaceholder):
            input_name = arg.input_name
            if type_utils.is_parameter_type(inputs_dict[input_name].type):
                raise TypeError(
                    f'Input "{input_name}" with type '
                    f'"{inputs_dict[input_name].type}" cannot be paired with '
                    'InputPathPlaceholder.')

            if input_name in arguments:
                return placeholders.input_artifact_path_placeholder(
                    input_name)
            else:
                input_spec = inputs_dict[input_name]
                if input_spec.optional:
                    return None
                else:
                    raise ValueError(
                        f'No value provided for input: {input_name}.')

        elif isinstance(arg, structures.OutputUriPlaceholder):
            output_name = arg.output_name
            if type_utils.is_parameter_type(outputs_dict[output_name].type):
                raise TypeError(
                    f'Output "{output_name}" with type '
                    f'"{outputs_dict[output_name].type}" cannot be paired with '
                    'OutputUriPlaceholder.')

            return placeholders.output_artifact_uri_placeholder(output_name)

        elif isinstance(arg, structures.OutputPathPlaceholder):
            output_name = arg.output_name

            # Output paths exist for both parameters and artifacts; only
            # the placeholder flavor differs.
            if type_utils.is_parameter_type(outputs_dict[output_name].type):
                return placeholders.output_parameter_path_placeholder(
                    output_name)
            return placeholders.output_artifact_path_placeholder(
                output_name)

        elif isinstance(arg, structures.ConcatPlaceholder):
            expanded_argument_strings = expand_argument_list(arg.items)
            return ''.join(expanded_argument_strings)

        elif isinstance(arg, structures.IfPresentPlaceholder):
            # Pick the branch based on whether the guarded input was given.
            if arg.if_structure.input_name in arguments:
                result_node = arg.if_structure.then
            else:
                result_node = arg.if_structure.otherwise

            if result_node is None:
                return []

            if isinstance(result_node, list):
                return expand_argument_list(result_node)
            return expand_command_part(result_node)

        else:
            raise TypeError('Unrecognized argument type: {}'.format(arg))

    def expand_argument_list(argument_list) -> Optional[List[str]]:
        """Expands each part and flattens the results into one list."""
        if argument_list is None:
            return None

        expanded_list = []
        for part in argument_list:
            expanded_part = expand_command_part(part)
            if expanded_part is None:
                # The part was dropped (e.g. an omitted optional input).
                continue
            if isinstance(expanded_part, list):
                expanded_list.extend(expanded_part)
            else:
                expanded_list.append(str(expanded_part))
        return expanded_list

    container_spec = component_spec.implementation.container
    # Work on a copy so the shared component definition is not mutated.
    resolved_container_spec = copy.deepcopy(container_spec)

    resolved_container_spec.commands = expand_argument_list(
        container_spec.commands)
    resolved_container_spec.arguments = expand_argument_list(
        container_spec.arguments)

    return resolved_container_spec
Loading

0 comments on commit d311463

Please sign in to comment.