Skip to content

Commit

Permalink
fix(sdk): fix nested placeholders and block illegal IfPresent form in…
Browse files Browse the repository at this point in the history
… Concat (kubeflow#8414)

* support single element IfPresentPlaceholders that contain a primitive placeholder

* block list then and else_ inside concat

* support single element IfPresent in v1

* clean up typing

* skip loading batch_predict_job yaml
  • Loading branch information
connor-mccarthy authored and jlyaoyuli committed Jan 5, 2023
1 parent b72f01b commit f2e628c
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 54 deletions.
3 changes: 3 additions & 0 deletions components/test_load_all_components.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ SKIP_COMPONENT_FILES = [
"./contrib/XGBoost/Cross_validation_for_regression/from_CSV/component.yaml",
"./contrib/XGBoost/Train_regression_and_calculate_metrics/from_CSV/component.yaml",
"./contrib/XGBoost/Train_and_cross-validate_regression/from_CSV/component.yaml",
# TODO: This component uses invalid placeholders. Updated when migrating GCPC to v2.
"./google-cloud/google_cloud_pipeline_components/aiplatform/batch_predict_job/component.yaml",
"./google-cloud/google_cloud_pipeline_components/v1/batch_predict_job/component.yaml"
]
for component_file in sys.stdin:
Expand Down
19 changes: 17 additions & 2 deletions sdk/python/kfp/components/pipeline_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ def my_pipeline():
def check_primitive_placeholder_is_used_for_correct_io_type(
inputs_dict: Dict[str, structures.InputSpec],
outputs_dict: Dict[str, structures.OutputSpec],
arg: Union[placeholders.Placeholder, Any],
arg: Union[placeholders.CommandLineElement, Any],
):
"""Validates input/output placeholders refer to an input/output with an
appropriate type for the placeholder. This should only apply to components
Expand Down Expand Up @@ -514,7 +514,22 @@ def check_primitive_placeholder_is_used_for_correct_io_type(
f'"{outputs_dict[output_name].type}" cannot be paired with '
f'{arg.__class__.__name__}.')
elif isinstance(arg, placeholders.IfPresentPlaceholder):
for arg in itertools.chain(arg.then or [], arg.else_ or []):
all_normalized_args: List[placeholders.CommandLineElement] = []
if arg.then is None:
pass
elif isinstance(arg.then, list):
all_normalized_args.extend(arg.then)
else:
all_normalized_args.append(arg.then)

if arg.else_ is None:
pass
elif isinstance(arg.else_, list):
all_normalized_args.extend(arg.else_)
else:
all_normalized_args.append(arg.else_)

for arg in all_normalized_args:
check_primitive_placeholder_is_used_for_correct_io_type(
inputs_dict, outputs_dict, arg)
elif isinstance(arg, placeholders.ConcatPlaceholder):
Expand Down
70 changes: 56 additions & 14 deletions sdk/python/kfp/components/placeholders.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ def container_with_concat_placeholder(text1: str, text2: Output[Dataset],
"""

def __init__(self, items: List['CommandLineElement']) -> None:
for item in items:
if isinstance(item, IfPresentPlaceholder):
item._validate_then_and_else_are_only_single_element()
self.items = items

def _to_dict(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -197,6 +200,36 @@ def __init__(
self.then = then
self.else_ = else_

def _validate_then_and_else_are_only_single_element(self) -> None:
"""Rercursively validate that then and else contain only a single
element.
This method should only be called by a ConcatPlaceholder, which
cannot have an IfPresentPlaceholder with a list in either 'then'
or 'else_'.
"""

# the illegal state
if isinstance(self.then, list) or isinstance(self.else_, list):
raise ValueError(
f'Cannot use {IfPresentPlaceholder.__name__} within {ConcatPlaceholder.__name__} when `then` and `else_` arguments to {IfPresentPlaceholder.__name__} are lists. Please use a single element for `then` and `else_` only.'
)

# check that there is no illegal state found recursively
if isinstance(self.then, ConcatPlaceholder):
for item in self.then.items:
if isinstance(item, IfPresentPlaceholder):
item._validate_then_and_else_are_only_single_element()
elif isinstance(self.then, IfPresentPlaceholder):
self.then._validate_then_and_else_are_only_single_element()

if isinstance(self.else_, ConcatPlaceholder):
for item in self.else_.items:
if isinstance(item, IfPresentPlaceholder):
item._validate_then_and_else_are_only_single_element()
elif isinstance(self.else_, IfPresentPlaceholder):
self.else_._validate_then_and_else_are_only_single_element()

def _to_dict(self) -> Dict[str, Any]:
struct = {
'IfPresent': {
Expand Down Expand Up @@ -231,11 +264,7 @@ def _to_string(self) -> str:
OutputPathPlaceholder, OutputUriPlaceholder,
OutputMetadataPlaceholder)

CommandLineElement = Union[str, IfPresentPlaceholder, ConcatPlaceholder,
InputValuePlaceholder, InputPathPlaceholder,
InputUriPlaceholder, InputMetadataPlaceholder,
OutputParameterPlaceholder, OutputPathPlaceholder,
OutputUriPlaceholder, OutputMetadataPlaceholder]
CommandLineElement = Union[str, Placeholder]


def convert_command_line_element_to_string(
Expand Down Expand Up @@ -330,18 +359,31 @@ def maybe_convert_v1_yaml_placeholder_to_v2_placeholder(
elif 'if' in arg:
if_ = arg['if']
input_name = utils.sanitize_input_name(if_['cond']['isPresent'])
then_ = if_['then']
else_ = if_.get('else', [])
return IfPresentPlaceholder(
input_name=input_name,
then=[
then = if_['then']
else_ = if_.get('else')

if isinstance(then, list):
then = [
maybe_convert_v1_yaml_placeholder_to_v2_placeholder(
val, component_dict=component_dict) for val in then_
],
else_=[
val, component_dict=component_dict) for val in then
]
else:
then = maybe_convert_v1_yaml_placeholder_to_v2_placeholder(
then, component_dict=component_dict)

if else_ is None:
pass
elif isinstance(else_, list):
else_ = [
maybe_convert_v1_yaml_placeholder_to_v2_placeholder(
val, component_dict=component_dict) for val in else_
])
]
else:
maybe_convert_v1_yaml_placeholder_to_v2_placeholder(
else_, component_dict=component_dict)

return IfPresentPlaceholder(
input_name=input_name, then=then, else_=else_)

elif 'concat' in arg:

Expand Down
171 changes: 158 additions & 13 deletions sdk/python/kfp/components/placeholders_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains tests for kfp.components.placeholders."""
import os
import tempfile
from typing import Any

from absl.testing import parameterized
from kfp import compiler
from kfp import dsl
from kfp.components import placeholders


Expand Down Expand Up @@ -89,7 +93,7 @@ def test(self):
"{{$.outputs.artifacts['output1'].metadata}}")


class TestIfPresentPlaceholderStructure(parameterized.TestCase):
class TestIfPresentPlaceholder(parameterized.TestCase):

@parameterized.parameters([
(placeholders.IfPresentPlaceholder(
Expand Down Expand Up @@ -139,6 +143,56 @@ def test_with_primitive_placeholders(
placeholder: str):
self.assertEqual(placeholder_obj._to_string(), placeholder)

def test_if_present_with_single_element_simple_can_be_compiled(self):

@dsl.container_component
def container_component(a: str):
return dsl.ContainerSpec(
image='alpine',
command=[
placeholders.IfPresentPlaceholder(
input_name='a', then='b', else_='c')
])

with tempfile.TemporaryDirectory() as tempdir:
output_yaml = os.path.join(tempdir, 'component.yaml')
compiler.Compiler().compile(
pipeline_func=container_component, package_path=output_yaml)

def test_if_present_with_single_element_parameter_reference_can_be_compiled(
self):

@dsl.container_component
def container_component(a: str):
return dsl.ContainerSpec(
image='alpine',
command=[
placeholders.IfPresentPlaceholder(
input_name='a', then=a, else_='c')
])

with tempfile.TemporaryDirectory() as tempdir:
output_yaml = os.path.join(tempdir, 'component.yaml')
compiler.Compiler().compile(
pipeline_func=container_component, package_path=output_yaml)

def test_if_present_with_single_element_artifact_reference_can_be_compiled(
self):

@dsl.container_component
def container_component(a: dsl.Input[dsl.Artifact]):
return dsl.ContainerSpec(
image='alpine',
command=[
placeholders.IfPresentPlaceholder(
input_name='a', then=a.path, else_='c')
])

with tempfile.TemporaryDirectory() as tempdir:
output_yaml = os.path.join(tempdir, 'component.yaml')
compiler.Compiler().compile(
pipeline_func=container_component, package_path=output_yaml)


class TestConcatPlaceholder(parameterized.TestCase):

Expand Down Expand Up @@ -182,16 +236,6 @@ class TestContainerPlaceholdersTogether(parameterized.TestCase):
]),
"""{"Concat": ["a", {"Concat": ["b", {"Concat": ["c", "{{$.inputs.parameters['input2']}}"]}]}]}"""
),
(placeholders.ConcatPlaceholder([
'a',
placeholders.ConcatPlaceholder([
'b',
placeholders.IfPresentPlaceholder(
input_name='output1', then=['then'], else_=['something'])
])
]),
'{"Concat": ["a", {"Concat": ["b", {"IfPresent": {"InputName": "output1", "Then": ["then"], "Else": ["something"]}}]}]}'
),
(placeholders.ConcatPlaceholder([
'a',
placeholders.ConcatPlaceholder([
Expand Down Expand Up @@ -227,10 +271,111 @@ class TestContainerPlaceholdersTogether(parameterized.TestCase):
"""{"Concat": ["a", {"IfPresent": {"InputName": "output1", "Then": {"Concat": ["--", "flag"]}, "Else": "{{$.inputs.artifacts['input2'].path}}"}}, "c"]}"""
),
])
def test(self, placeholder_obj: placeholders.IfPresentPlaceholder,
placeholder: str):
def test_valid(self, placeholder_obj: placeholders.IfPresentPlaceholder,
placeholder: str):
self.assertEqual(placeholder_obj._to_string(), placeholder)

def test_only_single_element_ifpresent_inside_concat_outer(self):
with self.assertRaisesRegex(
ValueError,
f'Please use a single element for `then` and `else_` only\.'):
placeholders.ConcatPlaceholder([
'b',
placeholders.IfPresentPlaceholder(
input_name='output1', then=['then'], else_=['something'])
])

def test_only_single_element_ifpresent_inside_concat_recursive(self):
with self.assertRaisesRegex(
ValueError,
f'Please use a single element for `then` and `else_` only\.'):
placeholders.ConcatPlaceholder([
'a',
placeholders.ConcatPlaceholder([
'b',
placeholders.IfPresentPlaceholder(
input_name='output1',
then=['then'],
else_=['something'])
])
])

with self.assertRaisesRegex(
ValueError,
f'Please use a single element for `then` and `else_` only\.'):
placeholders.ConcatPlaceholder([
'a',
placeholders.ConcatPlaceholder([
'b',
placeholders.IfPresentPlaceholder(
input_name='output1',
then=placeholders.ConcatPlaceholder([
placeholders.IfPresentPlaceholder(
input_name='a', then=['b'])
]),
else_='something')
])
])

def test_only_single_element_in_nested_ifpresent_inside_concat(self):
with self.assertRaisesRegex(
ValueError,
f'Please use a single element for `then` and `else_` only\.'):
dsl.ConcatPlaceholder([
'my-prefix-',
dsl.IfPresentPlaceholder(
input_name='input1',
then=[
dsl.IfPresentPlaceholder(
input_name='input1',
then=dsl.ConcatPlaceholder(['infix-', 'value']))
])
])

def test_recursive_nested_placeholder_validation_does_not_exit_when_first_valid_then_is_found(
self):
with self.assertRaisesRegex(
ValueError,
f'Please use a single element for `then` and `else_` only\.'):
dsl.ConcatPlaceholder([
'my-prefix-',
dsl.IfPresentPlaceholder(
input_name='input1',
then=dsl.IfPresentPlaceholder(
input_name='input1',
then=[dsl.ConcatPlaceholder(['infix-', 'value'])]))
])

def test_only_single_element_in_nested_ifpresent_inside_concat_with_outer_ifpresent(
self):
with self.assertRaisesRegex(
ValueError,
f'Please use a single element for `then` and `else_` only\.'):
dsl.IfPresentPlaceholder(
input_name='input_1',
then=dsl.ConcatPlaceholder([
'my-prefix-',
dsl.IfPresentPlaceholder(
input_name='input1',
then=dsl.IfPresentPlaceholder(
input_name='input1',
then=[dsl.ConcatPlaceholder(['infix-', 'value'])]))
]))

def test_valid_then_but_invalid_else(self):
with self.assertRaisesRegex(
ValueError,
f'Please use a single element for `then` and `else_` only\.'):
dsl.ConcatPlaceholder([
'my-prefix-',
dsl.IfPresentPlaceholder(
input_name='input1',
then=dsl.IfPresentPlaceholder(
input_name='input1',
then='single-element',
else_=['one', 'two']))
])


class TestConvertCommandLineElementToStringOrStruct(parameterized.TestCase):

Expand Down
18 changes: 17 additions & 1 deletion sdk/python/kfp/components/structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,23 @@ def check_placeholder_references_valid_io_name(
raise ValueError(
f'Argument "{arg}" references nonexistant input: "{arg.input_name}".'
)
for arg in itertools.chain(arg.then or [], arg.else_ or []):

all_normalized_args: List[placeholders.CommandLineElement] = []
if arg.then is None:
pass
elif isinstance(arg.then, list):
all_normalized_args.extend(arg.then)
else:
all_normalized_args.append(arg.then)

if arg.else_ is None:
pass
elif isinstance(arg.else_, list):
all_normalized_args.extend(arg.else_)
else:
all_normalized_args.append(arg.else_)

for arg in all_normalized_args:
check_placeholder_references_valid_io_name(inputs_dict,
outputs_dict, arg)
elif isinstance(arg, placeholders.ConcatPlaceholder):
Expand Down
Loading

0 comments on commit f2e628c

Please sign in to comment.