From c6b236d1a0a2385421e823512bd4c37041f1af26 Mon Sep 17 00:00:00 2001 From: Connor McCarthy Date: Mon, 11 Sep 2023 13:19:35 -0700 Subject: [PATCH] feat(sdk): support dsl.If, dsl.Elif, and dsl.Else (#9894) * support if/elif/else * deprecate dsl.Condition * alter rebase * update release notes * address review feedback * change BinaryOperation to ConditionOperation --- sdk/RELEASE.md | 1 + sdk/python/kfp/compiler/compiler_test.py | 326 +++++++ sdk/python/kfp/compiler/compiler_utils.py | 33 +- .../kfp/compiler/pipeline_spec_builder.py | 72 +- sdk/python/kfp/dsl/__init__.py | 6 + sdk/python/kfp/dsl/pipeline_channel.py | 19 +- sdk/python/kfp/dsl/pipeline_context.py | 6 + sdk/python/kfp/dsl/tasks_group.py | 181 +++- sdk/python/kfp/dsl/tasks_group_test.py | 24 +- .../test_data/pipelines/if_elif_else.py | 51 + .../test_data/pipelines/if_elif_else.yaml | 280 ++++++ .../pipelines/if_elif_else_complex.py | 86 ++ .../pipelines/if_elif_else_complex.yaml | 910 ++++++++++++++++++ sdk/python/test_data/pipelines/if_else.py | 42 + sdk/python/test_data/pipelines/if_else.yaml | 202 ++++ sdk/python/test_data/test_data_config.yaml | 9 + 16 files changed, 2183 insertions(+), 65 deletions(-) create mode 100644 sdk/python/test_data/pipelines/if_elif_else.py create mode 100644 sdk/python/test_data/pipelines/if_elif_else.yaml create mode 100644 sdk/python/test_data/pipelines/if_elif_else_complex.py create mode 100644 sdk/python/test_data/pipelines/if_elif_else_complex.yaml create mode 100644 sdk/python/test_data/pipelines/if_else.py create mode 100644 sdk/python/test_data/pipelines/if_else.yaml diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index 03954015443..bc2effc7057 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -2,6 +2,7 @@ ## Features +* Add support for `dsl.If`, `dsl.Elif`, and `dsl.Else` control flow context managers; deprecate `dsl.Condition` in favor of `dsl.If` [\#9894](https://github.com/kubeflow/pipelines/pull/9894) ## Breaking changes diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 92b1f6a1b72..56407cd7524 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -40,6 +40,7 @@ from kfp.dsl import OutputPath from kfp.dsl import pipeline_task from kfp.dsl import PipelineTaskFinalStatus +from kfp.dsl import tasks_group from kfp.dsl import yaml_component from kfp.dsl.types import type_utils from kfp.pipeline_spec import pipeline_spec_pb2 @@ -4161,5 +4162,330 @@ def my_pipeline( 'Component output artifact.') +@dsl.component +def flip_coin() -> str: + import random + return 'heads' if random.randint(0, 1) == 0 else 'tails' + + +@dsl.component +def print_and_return(text: str) -> str: + print(text) + return text + + +@dsl.component +def flip_three_sided_coin() -> str: + import random + val = random.randint(0, 2) + + if val == 0: + return 'heads' + elif val == 1: + return 'tails' + else: + return 'draw' + + +@dsl.component +def int_zero_through_three() -> int: + import random + return random.randint(0, 3) + + +class TestConditionLogic(unittest.TestCase): + + def test_if(self): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1'] + .trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'" + ) + + def test_if_else(self): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + with dsl.Else(): + print_and_return(text='Got tails!') + + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1'] + .trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'" + ) + + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-2'] + .trigger_policy.condition, + "!(inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads')" + ) + + def test_if_elif_else(self): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_three_sided_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + with dsl.Elif(flip_coin_task.output == 'tails'): + print_and_return(text='Got tails!') + with dsl.Else(): + print_and_return(text='Draw!') + + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1'] + .trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'heads'" + ) + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-2'] + .trigger_policy.condition, + "!(inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'heads') && inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'tails'" + ) + + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-3'] + .trigger_policy.condition, + "!(inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'heads') && !(inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'tails')" + ) + + def test_if_multiple_elif_else(self): + + @dsl.pipeline + def int_to_string(): + int_task = int_zero_through_three() + with dsl.If(int_task.output == 0): + print_and_return(text='Got zero!') + with dsl.Elif(int_task.output == 1): + print_and_return(text='Got one!') + with dsl.Elif(int_task.output == 2): + print_and_return(text='Got two!') + with dsl.Else(): + print_and_return(text='Got three!') + + self.assertEqual( + int_to_string.pipeline_spec.root.dag.tasks['condition-1'] + .trigger_policy.condition, + "int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0" + ) + self.assertEqual( + int_to_string.pipeline_spec.root.dag.tasks['condition-2'] + .trigger_policy.condition, + "!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1" + ) + self.assertEqual( + int_to_string.pipeline_spec.root.dag.tasks['condition-3'] + .trigger_policy.condition, + "!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1) && int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 2" + ) + self.assertEqual( + int_to_string.pipeline_spec.root.dag.tasks['condition-4'] + .trigger_policy.condition, + "!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 2)" + ) + + def test_nested_if_elif_else_with_pipeline_param(self): + + @dsl.pipeline + def flip_coin_pipeline(confirm: bool): + int_task = int_zero_through_three() + heads_task = flip_coin() + + with dsl.If(heads_task.output == 'heads'): + with dsl.If(int_task.output == 0): + print_and_return(text='Got zero!') + + with dsl.Elif(int_task.output == 1): + task = print_and_return(text='Got one!') + with dsl.If(confirm == True): + print_and_return(text='Confirmed: definitely got one.') + + with dsl.Elif(int_task.output == 2): + print_and_return(text='Got two!') + + with dsl.Else(): + print_and_return(text='Got three!') + + # top level conditions + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1'] + .trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'" + ) + # second level nested conditions + self.assertEqual( + flip_coin_pipeline.pipeline_spec.components['comp-condition-1'].dag + .tasks['condition-2'].trigger_policy.condition, + "int(inputs.parameter_values[\'pipelinechannel--int-zero-through-three-Output\']) == 0" + ) + self.assertEqual( + flip_coin_pipeline.pipeline_spec.components['comp-condition-1'].dag + .tasks['condition-3'].trigger_policy.condition, + "!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1" + ) + self.assertEqual( + flip_coin_pipeline.pipeline_spec.components['comp-condition-1'].dag + .tasks['condition-5'].trigger_policy.condition, + "!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1) && int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 2" + ) + self.assertEqual( + flip_coin_pipeline.pipeline_spec.components['comp-condition-1'].dag + .tasks['condition-6'].trigger_policy.condition, + "!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 2)" + ) + # third level nested conditions + self.assertEqual( + flip_coin_pipeline.pipeline_spec.components['comp-condition-3'].dag + .tasks['condition-4'].trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--confirm'] == true") + + def test_multiple_ifs_permitted(self): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + with dsl.If(flip_coin_task.output == 'tails'): + print_and_return(text='Got tails!') + + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1'] + .trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'" + ) + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-2'] + .trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'tails'" + ) + + def test_multiple_else_not_permitted(self): + with self.assertRaisesRegex( + tasks_group.InvalidControlFlowException, + r'Cannot use dsl\.Else following another dsl\.Else\. dsl\.Else can only be used following an upstream dsl\.If or dsl\.Elif\.' + ): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + with dsl.Else(): + print_and_return(text='Got tails!') + with dsl.Else(): + print_and_return(text='Got tails!') + + def test_else_no_if_not_supported(self): + with self.assertRaisesRegex( + tasks_group.InvalidControlFlowException, + r'dsl\.Else can only be used following an upstream dsl\.If or dsl\.Elif\.' + ): + + @dsl.pipeline + def flip_coin_pipeline(): + with dsl.Else(): + print_and_return(text='Got unknown') + + def test_elif_no_if_not_supported(self): + with self.assertRaisesRegex( + tasks_group.InvalidControlFlowException, + r'dsl\.Elif can only be used following an upstream dsl\.If or dsl\.Elif\.' + ): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.Elif(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + + def test_boolean_condition_has_helpful_error(self): + with self.assertRaisesRegex( + ValueError, + r'Got constant boolean True as a condition\. This is likely because the provided condition evaluated immediately\. At least one of the operands must be an output from an upstream task or a pipeline parameter\.' + ): + + @dsl.pipeline + def my_pipeline(): + with dsl.Condition('foo' == 'foo'): + print_and_return(text='I will always run.') + + def test_boolean_elif_has_helpful_error(self): + with self.assertRaisesRegex( + ValueError, + r'Got constant boolean False as a condition\. This is likely because the provided condition evaluated immediately\. At least one of the operands must be an output from an upstream task or a pipeline parameter\.' + ): + + @dsl.pipeline + def my_pipeline(text: str): + with dsl.If(text == 'foo'): + print_and_return(text='I will always run.') + with dsl.Elif('foo' == 'bar'): + print_and_return(text='I will never run.') + + def test_tasks_instantiated_between_if_else_and_elif_permitted(self): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads on coin one!') + + flip_coin_task_2 = flip_coin() + + with dsl.Elif(flip_coin_task_2.output == 'tails'): + print_and_return(text='Got heads on coin two!') + + flip_coin_task_3 = flip_coin() + + with dsl.Else(): + print_and_return( + text=f'Coin three result: {flip_coin_task_3.output}') + + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1'] + .trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'" + ) + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-2'] + .trigger_policy.condition, + "!(inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads') && inputs.parameter_values['pipelinechannel--flip-coin-2-Output'] == 'tails'" + ) + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-3'] + .trigger_policy.condition, + "!(inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads') && !(inputs.parameter_values['pipelinechannel--flip-coin-2-Output'] == 'tails')" + ) + + def test_other_control_flow_instantiated_between_if_else_not_permitted( + self): + with self.assertRaisesRegex( + tasks_group.InvalidControlFlowException, + 'dsl\.Else can only be used following an upstream dsl\.If or dsl\.Elif\.' + ): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + with dsl.ParallelFor(['foo', 'bar']) as item: + print_and_return(text=item) + with dsl.Else(): + print_and_return(text='Got tails!') + + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/compiler/compiler_utils.py b/sdk/python/kfp/compiler/compiler_utils.py index 79c4418bdff..ccc6730b1e3 100644 --- a/sdk/python/kfp/compiler/compiler_utils.py +++ b/sdk/python/kfp/compiler/compiler_utils.py @@ -119,7 +119,18 @@ def _get_parent_groups_helper( return (tasks_to_groups, groups_to_groups) -# TODO: do we really need this? +def get_channels_from_condition( + operations: List[pipeline_channel.ConditionOperation], + collected_channels: list, +) -> None: + """Appends to collected_channels each pipeline channels used in each + operand of each operation in operations.""" + for operation in operations: + for operand in [operation.left_operand, operation.right_operand]: + if isinstance(operand, pipeline_channel.PipelineChannel): + collected_channels.append(operand) + + def get_condition_channels_for_tasks( root_group: tasks_group.TasksGroup, ) -> Mapping[str, Set[pipeline_channel.PipelineChannel]]: @@ -139,16 +150,13 @@ def _get_condition_channels_for_tasks_helper( current_conditions_channels, ): new_current_conditions_channels = current_conditions_channels - if isinstance(group, tasks_group.Condition): + if isinstance(group, tasks_group._ConditionBase): new_current_conditions_channels = list(current_conditions_channels) - if isinstance(group.condition.left_operand, - pipeline_channel.PipelineChannel): - new_current_conditions_channels.append( - group.condition.left_operand) - if isinstance(group.condition.right_operand, - pipeline_channel.PipelineChannel): - new_current_conditions_channels.append( - group.condition.right_operand) + get_channels_from_condition( + group.conditions, + new_current_conditions_channels, + ) + for task in group.tasks: for channel in new_current_conditions_channels: conditions[task.name].add(channel) @@ -661,8 +669,9 @@ def get_dependencies( dependent_group = group_name_to_group.get( uncommon_upstream_groups[0], None) - if isinstance(dependent_group, - (tasks_group.Condition, tasks_group.ExitHandler)): + if isinstance( + dependent_group, + (tasks_group._ConditionBase, tasks_group.ExitHandler)): raise InvalidTopologyException( f'{ILLEGAL_CROSS_DAG_ERROR_PREFIX} A downstream task cannot depend on an upstream task within a dsl.{dependent_group.__class__.__name__} context unless the downstream is within that context too. Found task {task.name} which depends on upstream task {upstream_task.name} within an uncommon dsl.{dependent_group.__class__.__name__} context.' ) diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index b276f892c1d..75be4eb647f 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -709,22 +709,38 @@ def _update_task_spec_for_loop_group( input_name=pipeline_task_spec.parameter_iterator.item_input) -def _resolve_condition_operands( - left_operand: Union[str, pipeline_channel.PipelineChannel], - right_operand: Union[str, pipeline_channel.PipelineChannel], -) -> Tuple[str, str]: - """Resolves values and PipelineChannels for condition operands. +def _binary_operations_to_cel_conjunctive( + operations: List[pipeline_channel.ConditionOperation]) -> str: + """Converts a list of ConditionOperation to a CEL string with placeholders. + Each ConditionOperation will be joined the others via the conjunctive (&&). Args: - left_operand: The left operand of a condition expression. - right_operand: The right operand of a condition expression. + operations: The binary operations to convert to convert and join. Returns: - A tuple of the resolved operands values: - (left_operand_value, right_operand_value). + The binary operations as a CEL string. """ + operands = [ + _single_binary_operation_to_cel_condition(operation=bin_op) + for bin_op in operations + ] + return ' && '.join(operands) - # Pre-scan the operand to get the type of constant value if there's any. + +def _single_binary_operation_to_cel_condition( + operation: pipeline_channel.ConditionOperation) -> str: + """Converts a ConditionOperation to a CEL string with placeholders. + + Args: + operation: The binary operation to convert to a string. + + Returns: + The binary operation as a CEL string. + """ + left_operand = operation.left_operand + right_operand = operation.right_operand + + # cannot make comparisons involving particular types for value_or_reference in [left_operand, right_operand]: if isinstance(value_or_reference, pipeline_channel.PipelineChannel): parameter_type = type_utils.get_parameter_type( @@ -738,8 +754,10 @@ def _resolve_condition_operands( input_name = compiler_utils.additional_input_name_for_pipeline_channel( value_or_reference) raise ValueError( - f'Conditional requires scalar parameter values for comparison. Found input "{input_name}" of type {value_or_reference.channel_type} in pipeline definition instead.' + f'Conditional requires primitive parameter values for comparison. Found input "{input_name}" of type {value_or_reference.channel_type} in pipeline definition instead.' ) + + # ensure the types compared are the same or compatible parameter_types = set() for value_or_reference in [left_operand, right_operand]: if isinstance(value_or_reference, pipeline_channel.PipelineChannel): @@ -822,11 +840,16 @@ def _resolve_condition_operands( operand_values.append(operand_value) - return tuple(operand_values) + left_operand_value, right_operand_value = tuple(operand_values) + + condition_string = ( + f'{left_operand_value} {operation.operator} {right_operand_value}') + + return f'!({condition_string})' if operation.negate else condition_string def _update_task_spec_for_condition_group( - group: tasks_group.Condition, + group: tasks_group._ConditionBase, pipeline_task_spec: pipeline_spec_pb2.PipelineTaskSpec, ) -> None: """Updates PipelineTaskSpec for condition group. @@ -835,15 +858,9 @@ def _update_task_spec_for_condition_group( group: The condition group to update task spec for. pipeline_task_spec: The pipeline task spec to update in place. """ - left_operand_value, right_operand_value = _resolve_condition_operands( - group.condition.left_operand, group.condition.right_operand) - - condition_string = ( - f'{left_operand_value} {group.condition.operator} {right_operand_value}' - ) + condition = _binary_operations_to_cel_conjunctive(group.conditions) pipeline_task_spec.trigger_policy.CopyFrom( - pipeline_spec_pb2.PipelineTaskSpec.TriggerPolicy( - condition=condition_string)) + pipeline_spec_pb2.PipelineTaskSpec.TriggerPolicy(condition=condition)) def build_task_spec_for_exit_task( @@ -954,7 +971,7 @@ def build_task_spec_for_group( group=group, pipeline_task_spec=pipeline_task_spec, ) - elif isinstance(group, tasks_group.Condition): + elif isinstance(group, tasks_group._ConditionBase): _update_task_spec_for_condition_group( group=group, pipeline_task_spec=pipeline_task_spec, @@ -1236,17 +1253,14 @@ def build_spec_by_group( _build_dag_outputs(subgroup_component_spec, subgroup_output_channels) - elif isinstance(subgroup, tasks_group.Condition): + elif isinstance(subgroup, tasks_group._ConditionBase): # "Punch the hole", adding inputs needed by its subgroups or # tasks. condition_subgroup_channels = list(subgroup_input_channels) - for operand in [ - subgroup.condition.left_operand, - subgroup.condition.right_operand, - ]: - if isinstance(operand, pipeline_channel.PipelineChannel): - condition_subgroup_channels.append(operand) + + compiler_utils.get_channels_from_condition( + subgroup.conditions, condition_subgroup_channels) subgroup_component_spec = build_component_spec_for_group( input_pipeline_channels=condition_subgroup_channels, diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index a23b640fdb5..001226b02cf 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -237,7 +237,10 @@ def my_pipeline(): from kfp.dsl.placeholders import IfPresentPlaceholder from kfp.dsl.structures import ContainerSpec from kfp.dsl.tasks_group import Condition + from kfp.dsl.tasks_group import Elif + from kfp.dsl.tasks_group import Else from kfp.dsl.tasks_group import ExitHandler + from kfp.dsl.tasks_group import If from kfp.dsl.tasks_group import ParallelFor __all__.extend([ 'component', @@ -246,6 +249,9 @@ def my_pipeline(): 'importer', 'ContainerSpec', 'Condition', + 'If', + 'Elif', + 'Else', 'ExitHandler', 'ParallelFor', 'Collected', diff --git a/sdk/python/kfp/dsl/pipeline_channel.py b/sdk/python/kfp/dsl/pipeline_channel.py index 66616103fb6..4841928bbf4 100644 --- a/sdk/python/kfp/dsl/pipeline_channel.py +++ b/sdk/python/kfp/dsl/pipeline_channel.py @@ -24,17 +24,20 @@ @dataclasses.dataclass -class ConditionOperator: - """Represents a condition expression to be used in dsl.Condition(). +class ConditionOperation: + """Represents a condition expression to be used in condition control flow + group. Attributes: operator: The operator of the condition. left_operand: The left operand. right_operand: The right operand. + negate: Whether to negate the result of the binary operation. """ operator: str left_operand: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES] right_operand: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES] + negate: bool = False # The string template used to generate the placeholder of a PipelineChannel. @@ -149,22 +152,22 @@ def __hash__(self) -> int: return hash(self.pattern) def __eq__(self, other): - return ConditionOperator('==', self, other) + return ConditionOperation('==', self, other) def __ne__(self, other): - return ConditionOperator('!=', self, other) + return ConditionOperation('!=', self, other) def __lt__(self, other): - return ConditionOperator('<', self, other) + return ConditionOperation('<', self, other) def __le__(self, other): - return ConditionOperator('<=', self, other) + return ConditionOperation('<=', self, other) def __gt__(self, other): - return ConditionOperator('>', self, other) + return ConditionOperation('>', self, other) def __ge__(self, other): - return ConditionOperator('>=', self, other) + return ConditionOperation('>=', self, other) class PipelineParameterChannel(PipelineChannel): diff --git a/sdk/python/kfp/dsl/pipeline_context.py b/sdk/python/kfp/dsl/pipeline_context.py index c1304c39bac..72ada197ae5 100644 --- a/sdk/python/kfp/dsl/pipeline_context.py +++ b/sdk/python/kfp/dsl/pipeline_context.py @@ -189,6 +189,12 @@ def pop_tasks_group(self): """Removes the current TasksGroup from the stack.""" del self.groups[-1] + def get_last_tasks_group(self) -> Optional['tasks_group.TasksGroup']: + """Gets the last TasksGroup added to the pipeline at the current level + of the pipeline definition.""" + groups = self.groups[-1].groups + return groups[-1] if groups else None + def remove_task_from_groups(self, task: pipeline_task.PipelineTask): """Removes a task from the pipeline. diff --git a/sdk/python/kfp/dsl/tasks_group.py b/sdk/python/kfp/dsl/tasks_group.py index 42d1446a9d6..6bf6b63cc04 100644 --- a/sdk/python/kfp/dsl/tasks_group.py +++ b/sdk/python/kfp/dsl/tasks_group.py @@ -13,8 +13,10 @@ # limitations under the License. """Definition for TasksGroup.""" +import copy import enum -from typing import Optional, Union +from typing import List, Optional, Union +import warnings from kfp.dsl import for_loop from kfp.dsl import pipeline_channel @@ -52,7 +54,7 @@ def __init__( group_type: TasksGroupType, name: Optional[str] = None, is_root: bool = False, - ): + ) -> None: """Create a new instance of TasksGroup. Args: @@ -117,7 +119,7 @@ def __init__( self, exit_task: pipeline_task.PipelineTask, name: Optional[str] = None, - ): + ) -> None: """Initializes a Condition task group.""" super().__init__( group_type=TasksGroupType.EXIT_HANDLER, @@ -138,9 +140,31 @@ def __init__( self.exit_task = exit_task -class Condition(TasksGroup): - """A class for creating conditional control flow within a pipeline - definition. +class _ConditionBase(TasksGroup): + """Parent class for condition control flow context managers (Condition, If, + Elif, Else). + + Args: + condition: A list of binary operations to be combined via conjunction. + name: The name of the condition group. + """ + + def __init__( + self, + conditions: List[pipeline_channel.ConditionOperation], + name: Optional[str] = None, + ) -> None: + super().__init__( + group_type=TasksGroupType.CONDITION, + name=name, + is_root=False, + ) + self.conditions: List[pipeline_channel.ConditionOperation] = conditions + + +class If(_ConditionBase): + """A class for creating a conditional control flow "if" block within a + pipeline. Args: condition: A comparative expression that evaluates to True or False. At least one of the operands must be an output from an upstream task or a pipeline parameter. @@ -150,22 +174,151 @@ class Condition(TasksGroup): :: task1 = my_component1(...) - with Condition(task1.output=='pizza', 'pizza-condition'): + with dsl.If(task1.output=='pizza', 'pizza-condition'): task2 = my_component2(...) """ def __init__( self, - condition: pipeline_channel.ConditionOperator, + condition, name: Optional[str] = None, - ): - """Initializes a conditional task group.""" + ) -> None: super().__init__( - group_type=TasksGroupType.CONDITION, + conditions=[condition], name=name, - is_root=False, ) - self.condition = condition + if isinstance(condition, bool): + raise ValueError( + f'Got constant boolean {condition} as a condition. This is likely because the provided condition evaluated immediately. At least one of the operands must be an output from an upstream task or a pipeline parameter.' + ) + copied_condition = copy.copy(condition) + copied_condition.negate = True + self._negated_upstream_conditions = [copied_condition] + + +class Condition(If): + """Deprecated. + + Use dsl.If instead. + """ + + def __enter__(self): + super().__enter__() + warnings.warn( + 'dsl.Condition is deprecated. Please use dsl.If instead.', + category=DeprecationWarning, + stacklevel=2) + return self + + +class Elif(_ConditionBase): + """A class for creating a conditional control flow "else if" block within a + pipeline. Can be used following an upstream dsl.If or dsl.Elif. + + Args: + condition: A comparative expression that evaluates to True or False. At least one of the operands must be an output from an upstream task or a pipeline parameter. + name: The name of the condition group. + + Example: + :: + + task1 = my_component1(...) + task2 = my_component2(...) + with dsl.If(task1.output=='pizza', 'pizza-condition'): + task3 = my_component3(...) + + with dsl.Elif(task2.output=='pasta', 'pasta-condition'): + task4 = my_component4(...) + """ + + def __init__( + self, + condition, + name: Optional[str] = None, + ) -> None: + prev_cond = pipeline_context.Pipeline.get_default_pipeline( + ).get_last_tasks_group() + if not isinstance(prev_cond, (Condition, If, Elif)): + # prefer pushing toward dsl.If rather than dsl.Condition for syntactic consistency with the if-elif-else keywords in Python + raise InvalidControlFlowException( + 'dsl.Elif can only be used following an upstream dsl.If or dsl.Elif.' + ) + + if isinstance(condition, bool): + raise ValueError( + f'Got constant boolean {condition} as a condition. This is likely because the provided condition evaluated immediately. At least one of the operands must be an output from an upstream task or a pipeline parameter.' + ) + + copied_condition = copy.copy(condition) + copied_condition.negate = True + self._negated_upstream_conditions = _shallow_copy_list_of_binary_operations( + prev_cond._negated_upstream_conditions) + [copied_condition] + + conditions = _shallow_copy_list_of_binary_operations( + prev_cond._negated_upstream_conditions) + conditions.append(condition) + + super().__init__( + conditions=conditions, + name=name, + ) + + +class Else(_ConditionBase): + """A class for creating a conditional control flow "else" block within a + pipeline. Can be used following an upstream dsl.If or dsl.Elif. + + Args: + name: The name of the condition group. + + Example: + :: + + task1 = my_component1(...) + task2 = my_component2(...) + with dsl.If(task1.output=='pizza', 'pizza-condition'): + task3 = my_component3(...) + + with dsl.Elif(task2.output=='pasta', 'pasta-condition'): + task4 = my_component4(...) + + with dsl.Else(): + my_component5(...) + """ + + def __init__( + self, + name: Optional[str] = None, + ) -> None: + prev_cond = pipeline_context.Pipeline.get_default_pipeline( + ).get_last_tasks_group() + + if isinstance(prev_cond, Else): + # prefer pushing toward dsl.If rather than dsl.Condition for syntactic consistency with the if-elif-else keywords in Python + raise InvalidControlFlowException( + 'Cannot use dsl.Else following another dsl.Else. dsl.Else can only be used following an upstream dsl.If or dsl.Elif.' + ) + if not isinstance(prev_cond, (Condition, If, Elif)): + # prefer pushing toward dsl.If rather than dsl.Condition for syntactic consistency with the if-elif-else keywords in Python + raise InvalidControlFlowException( + 'dsl.Else can only be used following an upstream dsl.If or dsl.Elif.' + ) + + super().__init__( + conditions=prev_cond._negated_upstream_conditions, + name=name, + ) + + +class InvalidControlFlowException(Exception): + pass + + +def _shallow_copy_list_of_binary_operations( + operations: List[pipeline_channel.ConditionOperation] +) -> List[pipeline_channel.ConditionOperation]: + # shallow copy is sufficient to allow us to invert the negate flag of a ConditionOperation without affecting copies. deep copy not needed and would result in many copies of the full pipeline since PipelineChannels hold references to the pipeline. + return [copy.copy(operation) for operation in operations] class ParallelFor(TasksGroup): @@ -198,7 +351,7 @@ def __init__( items: Union[for_loop.ItemList, pipeline_channel.PipelineChannel], name: Optional[str] = None, parallelism: Optional[int] = None, - ): + ) -> None: """Initializes a for loop task group.""" parallelism = parallelism or 0 if parallelism < 0: diff --git a/sdk/python/kfp/dsl/tasks_group_test.py b/sdk/python/kfp/dsl/tasks_group_test.py index 09ba5cdbc34..40c68ab3725 100644 --- a/sdk/python/kfp/dsl/tasks_group_test.py +++ b/sdk/python/kfp/dsl/tasks_group_test.py @@ -12,13 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from absl.testing import parameterized +import unittest + +from kfp import dsl from kfp.dsl import for_loop from kfp.dsl import pipeline_context from kfp.dsl import tasks_group -class ParallelForTest(parameterized.TestCase): +class ParallelForTest(unittest.TestCase): def test_basic(self): loop_items = ['pizza', 'hotdog', 'pasta'] @@ -58,3 +60,21 @@ def test_parallelfor_invalid_parallelism(self): 'ParallelFor parallelism must be >= 0.'): with pipeline_context.Pipeline('pipeline') as p: tasks_group.ParallelFor(items=loop_items, parallelism=-1) + + +class TestConditionDeprecated(unittest.TestCase): + + def test(self): + + @dsl.component + def foo() -> str: + return 'foo' + + @dsl.pipeline + def my_pipeline(string: str): + with self.assertWarnsRegex( + DeprecationWarning, + 'dsl\.Condition is deprecated\. Please use dsl\.If instead\.' + ): + with dsl.Condition(string == 'text'): + foo() diff --git a/sdk/python/test_data/pipelines/if_elif_else.py b/sdk/python/test_data/pipelines/if_elif_else.py new file mode 100644 index 00000000000..fdaa3428f64 --- /dev/null +++ b/sdk/python/test_data/pipelines/if_elif_else.py @@ -0,0 +1,51 @@ +# Copyright 2023 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from kfp import compiler +from kfp import dsl + + +@dsl.component +def flip_three_sided_die() -> str: + import random + val = random.randint(0, 2) + + if val == 0: + return 'heads' + elif val == 1: + return 'tails' + else: + return 'draw' + + +@dsl.component +def print_and_return(text: str) -> str: + print(text) + return text + + +@dsl.pipeline +def roll_die_pipeline(): + flip_coin_task = flip_three_sided_die() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + with dsl.Elif(flip_coin_task.output == 'tails'): + print_and_return(text='Got tails!') + with dsl.Else(): + print_and_return(text='Draw!') + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=roll_die_pipeline, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/if_elif_else.yaml b/sdk/python/test_data/pipelines/if_elif_else.yaml new file mode 100644 index 00000000000..3a353f79a96 --- /dev/null +++ b/sdk/python/test_data/pipelines/if_elif_else.yaml @@ -0,0 +1,280 @@ +# PIPELINE DEFINITION +# Name: roll-die-pipeline +components: + comp-condition-1: + dag: + tasks: + print-and-return: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return + inputs: + parameters: + text: + runtimeValue: + constant: Got heads! + taskInfo: + name: print-and-return + inputDefinitions: + parameters: + pipelinechannel--flip-three-sided-die-Output: + parameterType: STRING + comp-condition-2: + dag: + tasks: + print-and-return-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-2 + inputs: + parameters: + text: + runtimeValue: + constant: Got tails! + taskInfo: + name: print-and-return-2 + inputDefinitions: + parameters: + pipelinechannel--flip-three-sided-die-Output: + parameterType: STRING + comp-condition-3: + dag: + tasks: + print-and-return-3: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-3 + inputs: + parameters: + text: + runtimeValue: + constant: Draw! + taskInfo: + name: print-and-return-3 + inputDefinitions: + parameters: + pipelinechannel--flip-three-sided-die-Output: + parameterType: STRING + comp-flip-three-sided-die: + executorLabel: exec-flip-three-sided-die + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return: + executorLabel: exec-print-and-return + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-2: + executorLabel: exec-print-and-return-2 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-3: + executorLabel: exec-print-and-return-3 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-flip-three-sided-die: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - flip_three_sided_die + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef flip_three_sided_die() -> str:\n import random\n val =\ + \ random.randint(0, 2)\n\n if val == 0:\n return 'heads'\n \ + \ elif val == 1:\n return 'tails'\n else:\n return 'draw'\n\ + \n" + image: python:3.7 + exec-print-and-return: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-2: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-3: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 +pipelineInfo: + name: roll-die-pipeline +root: + dag: + tasks: + condition-1: + componentRef: + name: comp-condition-1 + dependentTasks: + - flip-three-sided-die + inputs: + parameters: + pipelinechannel--flip-three-sided-die-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: flip-three-sided-die + taskInfo: + name: condition-1 + triggerPolicy: + condition: inputs.parameter_values['pipelinechannel--flip-three-sided-die-Output'] + == 'heads' + condition-2: + componentRef: + name: comp-condition-2 + dependentTasks: + - flip-three-sided-die + inputs: + parameters: + pipelinechannel--flip-three-sided-die-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: flip-three-sided-die + taskInfo: + name: condition-2 + triggerPolicy: + condition: '!(inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output''] + == ''heads'') && inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output''] + == ''tails''' + condition-3: + componentRef: + name: comp-condition-3 + dependentTasks: + - flip-three-sided-die + inputs: + parameters: + pipelinechannel--flip-three-sided-die-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: flip-three-sided-die + taskInfo: + name: condition-3 + triggerPolicy: + condition: '!(inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output''] + == ''heads'') && !(inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output''] + == ''tails'')' + flip-three-sided-die: + cachingOptions: + enableCache: true + componentRef: + name: comp-flip-three-sided-die + taskInfo: + name: flip-three-sided-die +schemaVersion: 2.1.0 +sdkVersion: kfp-2.1.3 diff --git a/sdk/python/test_data/pipelines/if_elif_else_complex.py b/sdk/python/test_data/pipelines/if_elif_else_complex.py new file mode 100644 index 00000000000..42623cb5084 --- /dev/null +++ b/sdk/python/test_data/pipelines/if_elif_else_complex.py @@ -0,0 +1,86 @@ +# Copyright 2023 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import List + +from kfp import compiler +from kfp import dsl + + +@dsl.component +def int_0_to_9999() -> int: + import random + return random.randint(0, 9999) + + +@dsl.component +def is_even_or_odd(num: int) -> str: + return 'odd' if num % 2 else 'even' + + +@dsl.component +def print_and_return(text: str) -> str: + print(text) + return text + + +@dsl.component +def print_strings(strings: List[str]): + print(strings) + + +@dsl.pipeline +def lucky_number_pipeline(add_drumroll: bool = True, + repeat_if_lucky_number: bool = True, + trials: List[int] = [1, 2, 3]): + with dsl.ParallelFor(trials) as trial: + int_task = int_0_to_9999().set_caching_options(False) + with dsl.If(add_drumroll == True): + with dsl.If(trial == 3): + print_and_return(text='Adding drumroll on last trial!') + + with dsl.If(int_task.output < 5000): + + even_or_odd_task = is_even_or_odd(num=int_task.output) + + with dsl.If(even_or_odd_task.output == 'even'): + print_and_return(text='Got a low even number!') + with dsl.Else(): + print_and_return(text='Got a low odd number!') + + with dsl.Elif(int_task.output > 5000): + + even_or_odd_task = is_even_or_odd(num=int_task.output) + + with dsl.If(even_or_odd_task.output == 'even'): + print_and_return(text='Got a high even number!') + with dsl.Else(): + print_and_return(text='Got a high odd number!') + + with dsl.Else(): + print_and_return( + text='Announcing: Got the lucky number 5000! A one in 10,000 chance.' + ) + with dsl.If(repeat_if_lucky_number == True): + with dsl.ParallelFor([1, 2]) as _: + print_and_return( + text='Announcing again: Got the lucky number 5000! A one in 10,000 chance.' + ) + + print_strings(strings=dsl.Collected(even_or_odd_task.output)) + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=lucky_number_pipeline, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/if_elif_else_complex.yaml b/sdk/python/test_data/pipelines/if_elif_else_complex.yaml new file mode 100644 index 00000000000..7fe0116c46b --- /dev/null +++ b/sdk/python/test_data/pipelines/if_elif_else_complex.yaml @@ -0,0 +1,910 @@ +# PIPELINE DEFINITION +# Name: lucky-number-pipeline +# Inputs: +# add_drumroll: bool [Default: True] +# repeat_if_lucky_number: bool [Default: True] +# trials: list [Default: [1.0, 2.0, 3.0]] +components: + comp-condition-10: + dag: + tasks: + condition-11: + componentRef: + name: comp-condition-11 + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + pipelinechannel--repeat_if_lucky_number: + componentInputParameter: pipelinechannel--repeat_if_lucky_number + taskInfo: + name: condition-11 + triggerPolicy: + condition: inputs.parameter_values['pipelinechannel--repeat_if_lucky_number'] + == true + print-and-return-6: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-6 + inputs: + parameters: + text: + runtimeValue: + constant: 'Announcing: Got the lucky number 5000! A one in 10,000 + chance.' + taskInfo: + name: print-and-return-6 + inputDefinitions: + parameters: + pipelinechannel--int-0-to-9999-Output: + parameterType: NUMBER_INTEGER + pipelinechannel--repeat_if_lucky_number: + parameterType: BOOLEAN + comp-condition-11: + dag: + tasks: + for-loop-13: + componentRef: + name: comp-for-loop-13 + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + pipelinechannel--repeat_if_lucky_number: + componentInputParameter: pipelinechannel--repeat_if_lucky_number + parameterIterator: + itemInput: pipelinechannel--loop-item-param-12 + items: + raw: '[1, 2]' + taskInfo: + name: for-loop-13 + inputDefinitions: + parameters: + pipelinechannel--int-0-to-9999-Output: + parameterType: NUMBER_INTEGER + pipelinechannel--repeat_if_lucky_number: + parameterType: BOOLEAN + comp-condition-2: + dag: + tasks: + condition-3: + componentRef: + name: comp-condition-3 + inputs: + parameters: + pipelinechannel--add_drumroll: + componentInputParameter: pipelinechannel--add_drumroll + pipelinechannel--trials-loop-item: + componentInputParameter: pipelinechannel--trials-loop-item + taskInfo: + name: condition-3 + triggerPolicy: + condition: int(inputs.parameter_values['pipelinechannel--trials-loop-item']) + == 3 + inputDefinitions: + parameters: + pipelinechannel--add_drumroll: + parameterType: BOOLEAN + pipelinechannel--trials-loop-item: + parameterType: NUMBER_INTEGER + comp-condition-3: + dag: + tasks: + print-and-return: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return + inputs: + parameters: + text: + runtimeValue: + constant: Adding drumroll on last trial! + taskInfo: + name: print-and-return + inputDefinitions: + parameters: + pipelinechannel--add_drumroll: + parameterType: BOOLEAN + pipelinechannel--trials-loop-item: + parameterType: NUMBER_INTEGER + comp-condition-4: + dag: + tasks: + condition-5: + componentRef: + name: comp-condition-5 + dependentTasks: + - is-even-or-odd + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + pipelinechannel--is-even-or-odd-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: is-even-or-odd + taskInfo: + name: condition-5 + triggerPolicy: + condition: inputs.parameter_values['pipelinechannel--is-even-or-odd-Output'] + == 'even' + condition-6: + componentRef: + name: comp-condition-6 + dependentTasks: + - is-even-or-odd + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + pipelinechannel--is-even-or-odd-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: is-even-or-odd + taskInfo: + name: condition-6 + triggerPolicy: + condition: '!(inputs.parameter_values[''pipelinechannel--is-even-or-odd-Output''] + == ''even'')' + is-even-or-odd: + cachingOptions: + enableCache: true + componentRef: + name: comp-is-even-or-odd + inputs: + parameters: + num: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + taskInfo: + name: is-even-or-odd + inputDefinitions: + parameters: + pipelinechannel--int-0-to-9999-Output: + parameterType: NUMBER_INTEGER + comp-condition-5: + dag: + tasks: + print-and-return-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-2 + inputs: + parameters: + text: + runtimeValue: + constant: Got a low even number! + taskInfo: + name: print-and-return-2 + inputDefinitions: + parameters: + pipelinechannel--int-0-to-9999-Output: + parameterType: NUMBER_INTEGER + pipelinechannel--is-even-or-odd-Output: + parameterType: STRING + comp-condition-6: + dag: + tasks: + print-and-return-3: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-3 + inputs: + parameters: + text: + runtimeValue: + constant: Got a low odd number! + taskInfo: + name: print-and-return-3 + inputDefinitions: + parameters: + pipelinechannel--int-0-to-9999-Output: + parameterType: NUMBER_INTEGER + pipelinechannel--is-even-or-odd-Output: + parameterType: STRING + comp-condition-7: + dag: + outputs: + parameters: + pipelinechannel--is-even-or-odd-2-Output: + valueFromParameter: + outputParameterKey: Output + producerSubtask: is-even-or-odd-2 + tasks: + condition-8: + componentRef: + name: comp-condition-8 + dependentTasks: + - is-even-or-odd-2 + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + pipelinechannel--is-even-or-odd-2-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: is-even-or-odd-2 + taskInfo: + name: condition-8 + triggerPolicy: + condition: inputs.parameter_values['pipelinechannel--is-even-or-odd-2-Output'] + == 'even' + condition-9: + componentRef: + name: comp-condition-9 + dependentTasks: + - is-even-or-odd-2 + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + pipelinechannel--is-even-or-odd-2-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: is-even-or-odd-2 + taskInfo: + name: condition-9 + triggerPolicy: + condition: '!(inputs.parameter_values[''pipelinechannel--is-even-or-odd-2-Output''] + == ''even'')' + is-even-or-odd-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-is-even-or-odd-2 + inputs: + parameters: + num: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + taskInfo: + name: is-even-or-odd-2 + inputDefinitions: + parameters: + pipelinechannel--int-0-to-9999-Output: + parameterType: NUMBER_INTEGER + outputDefinitions: + parameters: + pipelinechannel--is-even-or-odd-2-Output: + parameterType: NUMBER_INTEGER + comp-condition-8: + dag: + tasks: + print-and-return-4: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-4 + inputs: + parameters: + text: + runtimeValue: + constant: Got a high even number! + taskInfo: + name: print-and-return-4 + inputDefinitions: + parameters: + pipelinechannel--int-0-to-9999-Output: + parameterType: NUMBER_INTEGER + pipelinechannel--is-even-or-odd-2-Output: + parameterType: STRING + comp-condition-9: + dag: + tasks: + print-and-return-5: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-5 + inputs: + parameters: + text: + runtimeValue: + constant: Got a high odd number! + taskInfo: + name: print-and-return-5 + inputDefinitions: + parameters: + pipelinechannel--int-0-to-9999-Output: + parameterType: NUMBER_INTEGER + pipelinechannel--is-even-or-odd-2-Output: + parameterType: STRING + comp-for-loop-1: + dag: + outputs: + parameters: + pipelinechannel--is-even-or-odd-2-Output: + valueFromParameter: + outputParameterKey: pipelinechannel--is-even-or-odd-2-Output + producerSubtask: condition-7 + tasks: + condition-10: + componentRef: + name: comp-condition-10 + dependentTasks: + - int-0-to-9999 + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: int-0-to-9999 + pipelinechannel--repeat_if_lucky_number: + componentInputParameter: pipelinechannel--repeat_if_lucky_number + taskInfo: + name: condition-10 + triggerPolicy: + condition: '!(int(inputs.parameter_values[''pipelinechannel--int-0-to-9999-Output'']) + < 5000) && !(int(inputs.parameter_values[''pipelinechannel--int-0-to-9999-Output'']) + > 5000)' + condition-2: + componentRef: + name: comp-condition-2 + inputs: + parameters: + pipelinechannel--add_drumroll: + componentInputParameter: pipelinechannel--add_drumroll + pipelinechannel--trials-loop-item: + componentInputParameter: pipelinechannel--trials-loop-item + taskInfo: + name: condition-2 + triggerPolicy: + condition: inputs.parameter_values['pipelinechannel--add_drumroll'] == + true + condition-4: + componentRef: + name: comp-condition-4 + dependentTasks: + - int-0-to-9999 + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: int-0-to-9999 + taskInfo: + name: condition-4 + triggerPolicy: + condition: int(inputs.parameter_values['pipelinechannel--int-0-to-9999-Output']) + < 5000 + condition-7: + componentRef: + name: comp-condition-7 + dependentTasks: + - int-0-to-9999 + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: int-0-to-9999 + taskInfo: + name: condition-7 + triggerPolicy: + condition: '!(int(inputs.parameter_values[''pipelinechannel--int-0-to-9999-Output'']) + < 5000) && int(inputs.parameter_values[''pipelinechannel--int-0-to-9999-Output'']) + > 5000' + int-0-to-9999: + cachingOptions: {} + componentRef: + name: comp-int-0-to-9999 + taskInfo: + name: int-0-to-9999 + inputDefinitions: + parameters: + pipelinechannel--add_drumroll: + parameterType: BOOLEAN + pipelinechannel--repeat_if_lucky_number: + parameterType: BOOLEAN + pipelinechannel--trials: + parameterType: LIST + pipelinechannel--trials-loop-item: + parameterType: NUMBER_INTEGER + outputDefinitions: + parameters: + pipelinechannel--is-even-or-odd-2-Output: + parameterType: NUMBER_INTEGER + comp-for-loop-13: + dag: + tasks: + print-and-return-7: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-7 + inputs: + parameters: + text: + runtimeValue: + constant: 'Announcing again: Got the lucky number 5000! A one in + 10,000 chance.' + taskInfo: + name: print-and-return-7 + inputDefinitions: + parameters: + pipelinechannel--int-0-to-9999-Output: + parameterType: NUMBER_INTEGER + pipelinechannel--loop-item-param-12: + parameterType: NUMBER_INTEGER + pipelinechannel--repeat_if_lucky_number: + parameterType: BOOLEAN + comp-int-0-to-9999: + executorLabel: exec-int-0-to-9999 + outputDefinitions: + parameters: + Output: + parameterType: NUMBER_INTEGER + comp-is-even-or-odd: + executorLabel: exec-is-even-or-odd + inputDefinitions: + parameters: + num: + parameterType: NUMBER_INTEGER + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-is-even-or-odd-2: + executorLabel: exec-is-even-or-odd-2 + inputDefinitions: + parameters: + num: + parameterType: NUMBER_INTEGER + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return: + executorLabel: exec-print-and-return + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-2: + executorLabel: exec-print-and-return-2 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-3: + executorLabel: exec-print-and-return-3 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-4: + executorLabel: exec-print-and-return-4 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-5: + executorLabel: exec-print-and-return-5 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-6: + executorLabel: exec-print-and-return-6 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-7: + executorLabel: exec-print-and-return-7 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-strings: + executorLabel: exec-print-strings + inputDefinitions: + parameters: + strings: + parameterType: LIST +deploymentSpec: + executors: + exec-int-0-to-9999: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - int_0_to_9999 + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef int_0_to_9999() -> int:\n import random\n return random.randint(0,\ + \ 9999)\n\n" + image: python:3.7 + exec-is-even-or-odd: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - is_even_or_odd + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef is_even_or_odd(num: int) -> str:\n return 'odd' if num % 2\ + \ else 'even'\n\n" + image: python:3.7 + exec-is-even-or-odd-2: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - is_even_or_odd + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef is_even_or_odd(num: int) -> str:\n return 'odd' if num % 2\ + \ else 'even'\n\n" + image: python:3.7 + exec-print-and-return: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-2: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-3: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-4: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-5: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-6: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-7: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-strings: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_strings + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_strings(strings: List[str]):\n print(strings)\n\n" + image: python:3.7 +pipelineInfo: + name: lucky-number-pipeline +root: + dag: + tasks: + for-loop-1: + componentRef: + name: comp-for-loop-1 + inputs: + parameters: + pipelinechannel--add_drumroll: + componentInputParameter: add_drumroll + pipelinechannel--repeat_if_lucky_number: + componentInputParameter: repeat_if_lucky_number + pipelinechannel--trials: + componentInputParameter: trials + parameterIterator: + itemInput: pipelinechannel--trials-loop-item + items: + inputParameter: pipelinechannel--trials + taskInfo: + name: for-loop-1 + print-strings: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-strings + dependentTasks: + - for-loop-1 + inputs: + parameters: + strings: + taskOutputParameter: + outputParameterKey: pipelinechannel--is-even-or-odd-2-Output + producerTask: for-loop-1 + taskInfo: + name: print-strings + inputDefinitions: + parameters: + add_drumroll: + defaultValue: true + isOptional: true + parameterType: BOOLEAN + repeat_if_lucky_number: + defaultValue: true + isOptional: true + parameterType: BOOLEAN + trials: + defaultValue: + - 1.0 + - 2.0 + - 3.0 + isOptional: true + parameterType: LIST +schemaVersion: 2.1.0 +sdkVersion: kfp-2.1.3 diff --git a/sdk/python/test_data/pipelines/if_else.py b/sdk/python/test_data/pipelines/if_else.py new file mode 100644 index 00000000000..1da8a074ac1 --- /dev/null +++ b/sdk/python/test_data/pipelines/if_else.py @@ -0,0 +1,42 @@ +# Copyright 2023 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from kfp import compiler +from kfp import dsl + + +@dsl.component +def flip_coin() -> str: + import random + return 'heads' if random.randint(0, 1) == 0 else 'tails' + + +@dsl.component +def print_and_return(text: str) -> str: + print(text) + return text + + +@dsl.pipeline +def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + with dsl.Else(): + print_and_return(text='Got tails!') + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=flip_coin_pipeline, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/if_else.yaml b/sdk/python/test_data/pipelines/if_else.yaml new file mode 100644 index 00000000000..a64ff7b87d0 --- /dev/null +++ b/sdk/python/test_data/pipelines/if_else.yaml @@ -0,0 +1,202 @@ +# PIPELINE DEFINITION +# Name: flip-coin-pipeline +components: + comp-condition-1: + dag: + tasks: + print-and-return: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return + inputs: + parameters: + text: + runtimeValue: + constant: Got heads! + taskInfo: + name: print-and-return + inputDefinitions: + parameters: + pipelinechannel--flip-coin-Output: + parameterType: STRING + comp-condition-2: + dag: + tasks: + print-and-return-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-2 + inputs: + parameters: + text: + runtimeValue: + constant: Got tails! + taskInfo: + name: print-and-return-2 + inputDefinitions: + parameters: + pipelinechannel--flip-coin-Output: + parameterType: STRING + comp-flip-coin: + executorLabel: exec-flip-coin + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return: + executorLabel: exec-print-and-return + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-2: + executorLabel: exec-print-and-return-2 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-flip-coin: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - flip_coin + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef flip_coin() -> str:\n import random\n return 'heads' if\ + \ random.randint(0, 1) == 0 else 'tails'\n\n" + image: python:3.7 + exec-print-and-return: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-2: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 +pipelineInfo: + name: flip-coin-pipeline +root: + dag: + tasks: + condition-1: + componentRef: + name: comp-condition-1 + dependentTasks: + - flip-coin + inputs: + parameters: + pipelinechannel--flip-coin-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: flip-coin + taskInfo: + name: condition-1 + triggerPolicy: + condition: inputs.parameter_values['pipelinechannel--flip-coin-Output'] + == 'heads' + condition-2: + componentRef: + name: comp-condition-2 + dependentTasks: + - flip-coin + inputs: + parameters: + pipelinechannel--flip-coin-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: flip-coin + taskInfo: + name: condition-2 + triggerPolicy: + condition: '!(inputs.parameter_values[''pipelinechannel--flip-coin-Output''] + == ''heads'')' + flip-coin: + cachingOptions: + enableCache: true + componentRef: + name: comp-flip-coin + taskInfo: + name: flip-coin +schemaVersion: 2.1.0 +sdkVersion: kfp-2.1.2 diff --git a/sdk/python/test_data/test_data_config.yaml b/sdk/python/test_data/test_data_config.yaml index 02aae9d1da0..b40267f35c4 100644 --- a/sdk/python/test_data/test_data_config.yaml +++ b/sdk/python/test_data/test_data_config.yaml @@ -168,6 +168,15 @@ pipelines: - module: pipeline_with_metadata_fields name: dataset_concatenator execute: false + - module: if_else + name: flip_coin_pipeline + execute: false + - module: if_elif_else + name: roll_die_pipeline + execute: false + - module: if_elif_else_complex + name: lucky_number_pipeline + execute: false components: test_data_dir: sdk/python/test_data/components read: true