Skip to content

Commit

Permalink
Implement PipelineTask __init__()
Browse files Browse the repository at this point in the history
  • Loading branch information
chensun committed Oct 11, 2021
1 parent 5892bf9 commit 410afb5
Show file tree
Hide file tree
Showing 12 changed files with 853 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

from kfp.v2.components.experimental import base_component
from kfp.v2.components.experimental import component_spec
from kfp.v2.components.experimental import structures
from kfp.v2 import dsl
from kfp.v2 import compiler

Expand All @@ -25,21 +25,21 @@ def execute(self, *args, **kwargs):


component_op = TestComponent(
component_spec=component_spec.ComponentSpec(
component_spec=structures.ComponentSpec(
name='component_1',
implementation=component_spec.Implementation(
container=component_spec.ContainerSpec(
image='alpine',
commands=[
'sh',
'-c',
'set -ex\necho "$0" > "$1"',
component_spec.InputValuePlaceholder(input_name='input1'),
component_spec.OutputPathPlaceholder(output_name='output1'),
],
)),
inputs={'input1': component_spec.InputSpec(type='String')},
outputs={'output1': component_spec.OutputSpec(type='String')},
implementation=structures.Implementation(
container=structures.ContainerSpec(
image='alpine',
commands=[
'sh',
'-c',
'set -ex\necho "$0" > "$1"',
structures.InputValuePlaceholder(input_name='input1'),
structures.OutputPathPlaceholder(output_name='output1'),
],
)),
inputs={'input1': structures.InputSpec(type='String')},
outputs={'output1': structures.OutputSpec(type='String')},
))


Expand Down
4 changes: 2 additions & 2 deletions sdk/python/kfp/v2/components/experimental/base_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import abc

from kfp.v2.components.experimental import component_spec as cspec
from kfp.v2.components.experimental import structures
from kfp.v2.components.experimental import pipeline_task


Expand All @@ -27,7 +27,7 @@ class BaseComponent(metaclass=abc.ABCMeta):
component_spec: The component definition.
"""

def __init__(self, component_spec: cspec.ComponentSpec):
def __init__(self, component_spec: structures.ComponentSpec):
"""Init function for BaseComponent.
Args:
Expand Down
22 changes: 11 additions & 11 deletions sdk/python/kfp/v2/components/experimental/base_component_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from unittest.mock import patch

from kfp.v2.components.experimental import base_component
from kfp.v2.components.experimental import component_spec
from kfp.v2.components.experimental import structures
from kfp.v2.components.experimental import pipeline_task


Expand All @@ -28,27 +28,27 @@ def execute(self, *args, **kwargs):


component_op = TestComponent(
component_spec=component_spec.ComponentSpec(
component_spec=structures.ComponentSpec(
name='component_1',
implementation=component_spec.ContainerSpec(
implementation=structures.ContainerSpec(
image='alpine',
commands=[
'sh',
'-c',
'set -ex\necho "$0" "$1" "$2" > "$3"',
component_spec.InputValuePlaceholder(input_name='input1'),
component_spec.InputValuePlaceholder(input_name='input2'),
component_spec.InputValuePlaceholder(input_name='input3'),
component_spec.OutputPathPlaceholder(output_name='output1'),
structures.InputValuePlaceholder(input_name='input1'),
structures.InputValuePlaceholder(input_name='input2'),
structures.InputValuePlaceholder(input_name='input3'),
structures.OutputPathPlaceholder(output_name='output1'),
],
),
inputs={
'input1': component_spec.InputSpec(type='String'),
'input2': component_spec.InputSpec(type='Integer'),
'input3': component_spec.InputSpec(type='Float', default=3.14),
'input1': structures.InputSpec(type='String'),
'input2': structures.InputSpec(type='Integer'),
'input3': structures.InputSpec(type='Float', default=3.14),
},
outputs={
'output1': component_spec.OutputSpec(name='output1', type='String'),
'output1': structures.OutputSpec(name='output1', type='String'),
},
))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import unittest

from absl.testing import parameterized
from kfp.v2.components.experimental import component_spec, pipeline_channel
from kfp.v2.components.experimental import pipeline_channel


class PipelineChannelTest(parameterized.TestCase):
Expand Down
248 changes: 239 additions & 9 deletions sdk/python/kfp/v2/components/experimental/pipeline_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@
# limitations under the License.
"""Pipeline task class and operations."""

from typing import Any, Mapping, Optional
import collections
import copy
from typing import Any, List, Mapping, Optional, Union

from kfp.dsl import _component_bridge
from kfp.v2.components.experimental import component_spec as cspec
from kfp.v2.components.experimental import pipeline_channel
from kfp.v2.components.experimental import placeholders
from kfp.v2.components.experimental import structures
from kfp.v2.components.types import type_utils


# TODO(chensun): return PipelineTask object instead of ContainerOp object.
def create_pipeline_task(
component_spec: cspec.ComponentSpec,
component_spec: structures.ComponentSpec,
arguments: Mapping[str, Any],
) -> "ContainerOp": # pytype: disable=name-error
return _component_bridge._create_container_op_from_component_and_arguments(
Expand All @@ -37,15 +42,240 @@ class PipelineTask:
`.after()`, `.set_memory_limit()`, `enable_caching()`, etc.
Attributes:
task_spec:
component_spec:
executor_spec:
task_spec: The task spec of the task.
component_spec: The component spec of the task.
container_spec: The resolved container spec of the task.
"""

def __init__(
self,
component_spec: cspec.ComponentSpec,
component_spec: structures.ComponentSpec,
arguments: Mapping[str, Any],
):
# TODO(chensun): move logic from _component_bridge over here.
raise NotImplementedError
"""Initilizes a PipelineTask instance.
Args:
component_spec: The component definition.
arguments: The dictionary of component arguments.
"""
for input_name, argument_value in arguments.items():

if input_name not in component_spec.inputs:
raise ValueError(
f'Component "{component_spec.name}" got an unexpected input:'
f' {input_name}.')

input_type = component_spec.inputs[input_name].type
argument_type = None

if isinstance(argument_value, pipeline_channel.PipelineChannel):
argument_type = argument_value.channel_type
elif isinstance(argument_value, str):
argument_type = 'String'
elif isinstance(argument_value, int):
argument_type = 'Integer'
elif isinstance(argument_value, float):
argument_type = 'Float'
elif isinstance(argument_value, bool):
argument_type = 'Boolean'
elif isinstance(argument_value, dict):
argument_type = 'Dict'
elif isinstance(argument_value, list):
argument_type = 'List'
else:
raise ValueError(
'Input argument supports only the following types: '
'str, int, float, bool, dict, and list. Got: '
f'"{argument_value}" of type "{type(argument_value)}".')

type_utils.verify_type_compatibility(
given_type=argument_type,
expected_type=input_type,
error_message_prefix=(
'Incompatible argument passed to the input '
f'"{input_name}" of component "{component_spec.name}": '),
)

self.component_spec = component_spec

self.task_spec = structures.TaskSpec(
# The name of the task is subject to change due to component reuse.
name=component_spec.name,
inputs={
input_name: value for input_name, value in arguments.items()
},
dependent_tasks=[],
component_ref=component_spec.name,
enable_caching=True,
)

self.container_spec = self._resolve_command_line_and_arguments(
component_spec=component_spec,
arguments=arguments,
)

def _resolve_command_line_and_arguments(
self,
component_spec: structures.ComponentSpec,
arguments: Mapping[str, str],
) -> structures.ContainerSpec:
"""Resolves the command line argument placeholders in a container spec.
Args:
component_spec: The component definition.
arguments: The dictionary of component arguments.
"""
argument_values = arguments

if not component_spec.implementation.container:
raise TypeError(
'Only container components have command line to resolve')

component_inputs = component_spec.inputs or {}
inputs_dict = {
input_name: input_spec
for input_name, input_spec in component_inputs.items()
}
component_outputs = component_spec.outputs or {}
outputs_dict = {
output_name: output_spec
for output_name, output_spec in component_outputs.items()
}

def expand_command_part(arg) -> Union[str, List[str], None]:
if arg is None:
return None

if isinstance(arg, (str, int, float, bool)):
return str(arg)

elif isinstance(arg, (dict, list)):
return json.dumps(arg)

elif isinstance(arg, structures.InputValuePlaceholder):
input_name = arg.input_name
if not type_utils.is_parameter_type(
inputs_dict[input_name].type):
raise TypeError(
f'Input "{input_name}" with type '
f'"{inputs_dict[input_name].type}" cannot be paired with '
'InputValuePlaceholder.')

if input_name in arguments:
return placeholders.input_parameter_placeholder(input_name)
else:
input_spec = inputs_dict[input_name]
if input_spec.default is not None:
return None
else:
raise ValueError(
f'No value provided for input: {input_name}.')

elif isinstance(arg, structures.InputUriPlaceholder):
input_name = arg.input_name
if type_utils.is_parameter_type(inputs_dict[input_name].type):
raise TypeError(
f'Input "{input_name}" with type '
f'"{inputs_dict[input_name].type}" cannot be paired with '
'InputUriPlaceholder.')

if input_name in arguments:
input_uri = placeholders.input_artifact_uri_placeholder(
input_name)
return input_uri
else:
input_spec = inputs_dict[input_name]
if input_spec.default is not None:
return None
else:
raise ValueError(
f'No value provided for input: {input_name}.')

elif isinstance(arg, structures.InputPathPlaceholder):
input_name = arg.input_name
if type_utils.is_parameter_type(inputs_dict[input_name].type):
raise TypeError(
f'Input "{input_name}" with type '
f'"{inputs_dict[input_name].type}" cannot be paired with '
'InputPathPlaceholder.')

if input_name in arguments:
input_path = placeholders.input_artifact_path_placeholder(
input_name)
return input_path
else:
input_spec = inputs_dict[input_name]
if input_spec.optional:
return None
else:
raise ValueError(
f'No value provided for input: {input_name}.')

elif isinstance(arg, structures.OutputUriPlaceholder):
output_name = arg.output_name
if type_utils.is_parameter_type(outputs_dict[output_name].type):
raise TypeError(
f'Onput "{output_name}" with type '
f'"{outputs_dict[output_name].type}" cannot be paired with '
'OutputUriPlaceholder.')

output_uri = placeholders.output_artifact_uri_placeholder(
output_name)
return output_uri

elif isinstance(arg, structures.OutputPathPlaceholder):
output_name = arg.output_name

if type_utils.is_parameter_type(outputs_dict[output_name].type):
output_path = placeholders.output_parameter_path_placeholder(
output_name)
else:
output_path = placeholders.output_artifact_path_placeholder(
output_name)
return output_path

elif isinstance(arg, structures.ConcatPlaceholder):
expanded_argument_strings = expand_argument_list(arg.items)
return ''.join(expanded_argument_strings)

elif isinstance(arg, structures.IfPresentPlaceholder):
if arg.if_structure.input_name in argument_values:
result_node = arg.if_structure.then
else:
result_node = arg.if_structure.otherwise

if result_node is None:
return []

if isinstance(result_node, list):
expanded_result = expand_argument_list(result_node)
else:
expanded_result = expand_command_part(result_node)
return expanded_result

else:
raise TypeError('Unrecognized argument type: {}'.format(arg))

def expand_argument_list(argument_list) -> Optional[List[str]]:
if argument_list is None:
return None

expanded_list = []
for part in argument_list:
expanded_part = expand_command_part(part)
if expanded_part is not None:
if isinstance(expanded_part, list):
expanded_list.extend(expanded_part)
else:
expanded_list.append(str(expanded_part))
return expanded_list

container_spec = component_spec.implementation.container
resolved_container_spec = copy.deepcopy(container_spec)

resolved_container_spec.commands = expand_argument_list(
container_spec.commands)
resolved_container_spec.arguments = expand_argument_list(
container_spec.arguments)

return resolved_container_spec
Loading

0 comments on commit 410afb5

Please sign in to comment.