From 43d8a8b3fcd8844232806285d0bdb9b41ab20149 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Thu, 17 Aug 2023 10:24:29 -0700 Subject: [PATCH 1/6] support if/elif/else --- sdk/python/kfp/compiler/compiler_test.py | 325 +++++++++++ sdk/python/kfp/compiler/compiler_utils.py | 33 +- .../kfp/compiler/pipeline_spec_builder.py | 72 ++- sdk/python/kfp/dsl/__init__.py | 6 + sdk/python/kfp/dsl/pipeline_channel.py | 19 +- sdk/python/kfp/dsl/pipeline_context.py | 8 +- sdk/python/kfp/dsl/tasks_group.py | 182 ++++++- .../test_data/pipelines/if_elif_else.py | 51 ++ .../test_data/pipelines/if_elif_else.yaml | 275 ++++++++++ .../pipelines/if_elif_else_complex.py | 60 ++ .../pipelines/if_elif_else_complex.yaml | 511 ++++++++++++++++++ sdk/python/test_data/pipelines/if_else.py | 42 ++ sdk/python/test_data/pipelines/if_else.yaml | 199 +++++++ sdk/python/test_data/test_data_config.yaml | 9 + 14 files changed, 1728 insertions(+), 64 deletions(-) create mode 100644 sdk/python/test_data/pipelines/if_elif_else.py create mode 100644 sdk/python/test_data/pipelines/if_elif_else.yaml create mode 100644 sdk/python/test_data/pipelines/if_elif_else_complex.py create mode 100644 sdk/python/test_data/pipelines/if_elif_else_complex.yaml create mode 100644 sdk/python/test_data/pipelines/if_else.py create mode 100644 sdk/python/test_data/pipelines/if_else.yaml diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 92b1f6a1b72..bf39f98b1c0 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -4161,5 +4161,330 @@ def my_pipeline( 'Component output artifact.') +@dsl.component +def flip_coin() -> str: + import random + return 'heads' if random.randint(0, 1) == 0 else 'tails' + + +@dsl.component +def print_and_return(text: str) -> str: + print(text) + return text + + +@dsl.component +def flip_three_sided_coin() -> str: + import random + val = random.randint(0, 2) + + if val == 0: + return 'heads' + elif val == 1: + return 'tails' + else: + return 'draw' + + +@dsl.component +def int_zero_through_three() -> int: + import random + return random.randint(0, 3) + + +class TestConditionLogic(unittest.TestCase): + + def test_if(self): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1'] + .trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'" + ) + + def test_if_else(self): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + with dsl.Else(): + print_and_return(text='Got tails!') + + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1'] + .trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'" + ) + + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-2'] + .trigger_policy.condition, + "!(inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads')" + ) + + def test_if_elif_else(self): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_three_sided_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + with dsl.Elif(flip_coin_task.output == 'tails'): + print_and_return(text='Got tails!') + with dsl.Else(): + print_and_return(text='Draw!') + + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1'] + .trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'heads'" + ) + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-2'] + .trigger_policy.condition, + "!(inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'heads') && inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'tails'" + ) + + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-3'] + .trigger_policy.condition, + "!(inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'heads') && !(inputs.parameter_values['pipelinechannel--flip-three-sided-coin-Output'] == 'tails')" + ) + + def test_if_multiple_elif_else(self): + + @dsl.pipeline + def int_to_string(): + int_task = int_zero_through_three() + with dsl.If(int_task.output == 0): + print_and_return(text='Got zero!') + with dsl.Elif(int_task.output == 1): + print_and_return(text='Got one!') + with dsl.Elif(int_task.output == 2): + print_and_return(text='Got two!') + with dsl.Else(): + print_and_return(text='Got three!') + + self.assertEqual( + int_to_string.pipeline_spec.root.dag.tasks['condition-1'] + .trigger_policy.condition, + "int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0" + ) + self.assertEqual( + int_to_string.pipeline_spec.root.dag.tasks['condition-2'] + .trigger_policy.condition, + "!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1" + ) + self.assertEqual( + int_to_string.pipeline_spec.root.dag.tasks['condition-3'] + .trigger_policy.condition, + "!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1) && int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 2" + ) + self.assertEqual( + int_to_string.pipeline_spec.root.dag.tasks['condition-4'] + .trigger_policy.condition, + "!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 2)" + ) + + def test_nested_if_elif_else_with_pipeline_param(self): + + @dsl.pipeline + def flip_coin_pipeline(confirm: bool): + int_task = int_zero_through_three() + heads_task = flip_coin() + + with dsl.If(heads_task.output == 'heads'): + with dsl.If(int_task.output == 0): + print_and_return(text='Got zero!') + + with dsl.Elif(int_task.output == 1): + task = print_and_return(text='Got one!') + with dsl.If(confirm == True): + print_and_return(text='Confirmed: definitely got one.') + + with dsl.Elif(int_task.output == 2): + print_and_return(text='Got two!') + + with dsl.Else(): + print_and_return(text='Got three!') + + # top level conditions + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1'] + .trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'" + ) + # second level nested conditions + self.assertEqual( + flip_coin_pipeline.pipeline_spec.components['comp-condition-1'].dag + .tasks['condition-2'].trigger_policy.condition, + "int(inputs.parameter_values[\'pipelinechannel--int-zero-through-three-Output\']) == 0" + ) + self.assertEqual( + flip_coin_pipeline.pipeline_spec.components['comp-condition-1'].dag + .tasks['condition-3'].trigger_policy.condition, + "!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1" + ) + self.assertEqual( + flip_coin_pipeline.pipeline_spec.components['comp-condition-1'].dag + .tasks['condition-5'].trigger_policy.condition, + "!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1) && int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 2" + ) + self.assertEqual( + flip_coin_pipeline.pipeline_spec.components['comp-condition-1'].dag + .tasks['condition-6'].trigger_policy.condition, + "!(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 0) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 1) && !(int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) == 2)" + ) + # third level nested conditions + self.assertEqual( + flip_coin_pipeline.pipeline_spec.components['comp-condition-3'].dag + .tasks['condition-4'].trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--confirm'] == true") + + def test_multiple_ifs_permitted(self): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + with dsl.If(flip_coin_task.output == 'tails'): + print_and_return(text='Got tails!') + + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1'] + .trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'" + ) + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-2'] + .trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'tails'" + ) + + def test_multiple_else_not_permitted(self): + with self.assertRaisesRegex( + tasks_group.InvalidControlFlowException, + r'Cannot use dsl\.Else following another dsl\.Else\. dsl\.Else can only be used following an upstream dsl\.If or dsl\.Elif\.' + ): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + with dsl.Else(): + print_and_return(text='Got tails!') + with dsl.Else(): + print_and_return(text='Got tails!') + + def test_else_no_if_not_supported(self): + with self.assertRaisesRegex( + tasks_group.InvalidControlFlowException, + r'dsl\.Else can only be used following an upstream dsl\.If or dsl\.Elif\.' + ): + + @dsl.pipeline + def flip_coin_pipeline(): + with dsl.Else(): + print_and_return(text='Got unknown') + + def test_elif_no_if_not_supported(self): + with self.assertRaisesRegex( + tasks_group.InvalidControlFlowException, + r'dsl\.Else can only be used following an upstream dsl\.If or dsl\.Elif\.' + ): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.Elif(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + + def test_boolean_condition_has_helpful_error(self): + with self.assertRaisesRegex( + ValueError, + r'Got constant boolean True as a condition\. This is likely because the provided condition evaluated immediately\. At least one of the operands must be an output from an upstream task or a pipeline parameter\.' + ): + + @dsl.pipeline + def my_pipeline(): + with dsl.Condition('foo' == 'foo'): + print_and_return(text='I will always run.') + + def test_boolean_elif_has_helpful_error(self): + with self.assertRaisesRegex( + ValueError, + r'Got constant boolean False as a condition\. This is likely because the provided condition evaluated immediately\. At least one of the operands must be an output from an upstream task or a pipeline parameter\.' + ): + + @dsl.pipeline + def my_pipeline(text: str): + with dsl.If(text == 'foo'): + print_and_return(text='I will always run.') + with dsl.Elif('foo' == 'bar'): + print_and_return(text='I will never run.') + + def test_tasks_instantiated_between_if_else_and_elif_permitted(self): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads on coin one!') + + flip_coin_task_2 = flip_coin() + + with dsl.Elif(flip_coin_task_2.output == 'tails'): + print_and_return(text='Got heads on coin two!') + + flip_coin_task_3 = flip_coin() + + with dsl.Else(): + print_and_return( + text=f'Coin three result: {flip_coin_task_3.output}') + + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-1'] + .trigger_policy.condition, + "inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads'" + ) + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-2'] + .trigger_policy.condition, + "!(inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads') && inputs.parameter_values['pipelinechannel--flip-coin-2-Output'] == 'tails'" + ) + self.assertEqual( + flip_coin_pipeline.pipeline_spec.root.dag.tasks['condition-3'] + .trigger_policy.condition, + "!(inputs.parameter_values['pipelinechannel--flip-coin-Output'] == 'heads') && !(inputs.parameter_values['pipelinechannel--flip-coin-2-Output'] == 'tails')" + ) + + def test_other_control_flow_instantiated_between_if_else_not_permitted( + self): + with self.assertRaisesRegex( + tasks_group.InvalidControlFlowException, + 'dsl\.Else can only be used following an upstream dsl\.If or dsl\.Elif\.' + ): + + @dsl.pipeline + def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + with dsl.ParallelFor(['foo', 'bar']) as item: + print_and_return(text=item) + with dsl.Else(): + print_and_return(text='Got tails!') + + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/compiler/compiler_utils.py b/sdk/python/kfp/compiler/compiler_utils.py index 79c4418bdff..b4ffb755921 100644 --- a/sdk/python/kfp/compiler/compiler_utils.py +++ b/sdk/python/kfp/compiler/compiler_utils.py @@ -119,7 +119,18 @@ def _get_parent_groups_helper( return (tasks_to_groups, groups_to_groups) -# TODO: do we really need this? +def get_channels_from_condition( + operations: List[pipeline_channel.BinaryOperation], + collected_channels: list, +) -> None: + """Append to collected_channels each pipeline channels used in each operand + of each operation in operations.""" + for operation in operations: + for operand in [operation.left_operand, operation.right_operand]: + if isinstance(operand, pipeline_channel.PipelineChannel): + collected_channels.append(operand) + + def get_condition_channels_for_tasks( root_group: tasks_group.TasksGroup, ) -> Mapping[str, Set[pipeline_channel.PipelineChannel]]: @@ -139,16 +150,13 @@ def _get_condition_channels_for_tasks_helper( current_conditions_channels, ): new_current_conditions_channels = current_conditions_channels - if isinstance(group, tasks_group.Condition): + if isinstance(group, tasks_group._ConditionBase): new_current_conditions_channels = list(current_conditions_channels) - if isinstance(group.condition.left_operand, - pipeline_channel.PipelineChannel): - new_current_conditions_channels.append( - group.condition.left_operand) - if isinstance(group.condition.right_operand, - pipeline_channel.PipelineChannel): - new_current_conditions_channels.append( - group.condition.right_operand) + get_channels_from_condition( + group.condition, + new_current_conditions_channels, + ) + for task in group.tasks: for channel in new_current_conditions_channels: conditions[task.name].add(channel) @@ -661,8 +669,9 @@ def get_dependencies( dependent_group = group_name_to_group.get( uncommon_upstream_groups[0], None) - if isinstance(dependent_group, - (tasks_group.Condition, tasks_group.ExitHandler)): + if isinstance( + dependent_group, + (tasks_group._ConditionBase, tasks_group.ExitHandler)): raise InvalidTopologyException( f'{ILLEGAL_CROSS_DAG_ERROR_PREFIX} A downstream task cannot depend on an upstream task within a dsl.{dependent_group.__class__.__name__} context unless the downstream is within that context too. Found task {task.name} which depends on upstream task {upstream_task.name} within an uncommon dsl.{dependent_group.__class__.__name__} context.' ) diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index b276f892c1d..98ab68ea9ba 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -709,22 +709,38 @@ def _update_task_spec_for_loop_group( input_name=pipeline_task_spec.parameter_iterator.item_input) -def _resolve_condition_operands( - left_operand: Union[str, pipeline_channel.PipelineChannel], - right_operand: Union[str, pipeline_channel.PipelineChannel], -) -> Tuple[str, str]: - """Resolves values and PipelineChannels for condition operands. +def _binary_operations_to_cel_conjunctive( + operations: List[pipeline_channel.BinaryOperation]) -> str: + """Converts a list of BinaryOperation to a CEL string with placeholders. + Each BinaryOperation will be joined the others via the conjunctive (&&). Args: - left_operand: The left operand of a condition expression. - right_operand: The right operand of a condition expression. + operations: The binary operations to convert to convert and join. Returns: - A tuple of the resolved operands values: - (left_operand_value, right_operand_value). + The binary operations as a CEL string. """ + operands = [ + _single_binary_operation_to_cel_condition(operation=bin_op) + for bin_op in operations + ] + return ' && '.join(operands) - # Pre-scan the operand to get the type of constant value if there's any. + +def _single_binary_operation_to_cel_condition( + operation: pipeline_channel.BinaryOperation) -> str: + """Converts a BinaryOperation to a CEL string with placeholders. + + Args: + operation: The binary operation to convert to a string. + + Returns: + The binary operation as a CEL string. + """ + left_operand = operation.left_operand + right_operand = operation.right_operand + + # cannot make comparisons involving particular types for value_or_reference in [left_operand, right_operand]: if isinstance(value_or_reference, pipeline_channel.PipelineChannel): parameter_type = type_utils.get_parameter_type( @@ -738,8 +754,10 @@ def _resolve_condition_operands( input_name = compiler_utils.additional_input_name_for_pipeline_channel( value_or_reference) raise ValueError( - f'Conditional requires scalar parameter values for comparison. Found input "{input_name}" of type {value_or_reference.channel_type} in pipeline definition instead.' + f'Conditional requires primitive parameter values for comparison. Found input "{input_name}" of type {value_or_reference.channel_type} in pipeline definition instead.' ) + + # ensure the types compared are the same or compatible parameter_types = set() for value_or_reference in [left_operand, right_operand]: if isinstance(value_or_reference, pipeline_channel.PipelineChannel): @@ -822,11 +840,16 @@ def _resolve_condition_operands( operand_values.append(operand_value) - return tuple(operand_values) + left_operand_value, right_operand_value = tuple(operand_values) + + condition_string = ( + f'{left_operand_value} {operation.operator} {right_operand_value}') + + return f'!({condition_string})' if operation.negate else condition_string def _update_task_spec_for_condition_group( - group: tasks_group.Condition, + group: tasks_group._ConditionBase, pipeline_task_spec: pipeline_spec_pb2.PipelineTaskSpec, ) -> None: """Updates PipelineTaskSpec for condition group. @@ -835,15 +858,9 @@ def _update_task_spec_for_condition_group( group: The condition group to update task spec for. pipeline_task_spec: The pipeline task spec to update in place. """ - left_operand_value, right_operand_value = _resolve_condition_operands( - group.condition.left_operand, group.condition.right_operand) - - condition_string = ( - f'{left_operand_value} {group.condition.operator} {right_operand_value}' - ) + condition = _binary_operations_to_cel_conjunctive(group.condition) pipeline_task_spec.trigger_policy.CopyFrom( - pipeline_spec_pb2.PipelineTaskSpec.TriggerPolicy( - condition=condition_string)) + pipeline_spec_pb2.PipelineTaskSpec.TriggerPolicy(condition=condition)) def build_task_spec_for_exit_task( @@ -954,7 +971,7 @@ def build_task_spec_for_group( group=group, pipeline_task_spec=pipeline_task_spec, ) - elif isinstance(group, tasks_group.Condition): + elif isinstance(group, tasks_group._ConditionBase): _update_task_spec_for_condition_group( group=group, pipeline_task_spec=pipeline_task_spec, @@ -1236,17 +1253,14 @@ def build_spec_by_group( _build_dag_outputs(subgroup_component_spec, subgroup_output_channels) - elif isinstance(subgroup, tasks_group.Condition): + elif isinstance(subgroup, tasks_group._ConditionBase): # "Punch the hole", adding inputs needed by its subgroups or # tasks. condition_subgroup_channels = list(subgroup_input_channels) - for operand in [ - subgroup.condition.left_operand, - subgroup.condition.right_operand, - ]: - if isinstance(operand, pipeline_channel.PipelineChannel): - condition_subgroup_channels.append(operand) + + compiler_utils.get_channels_from_condition( + subgroup.condition, condition_subgroup_channels) subgroup_component_spec = build_component_spec_for_group( input_pipeline_channels=condition_subgroup_channels, diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index a23b640fdb5..001226b02cf 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -237,7 +237,10 @@ def my_pipeline(): from kfp.dsl.placeholders import IfPresentPlaceholder from kfp.dsl.structures import ContainerSpec from kfp.dsl.tasks_group import Condition + from kfp.dsl.tasks_group import Elif + from kfp.dsl.tasks_group import Else from kfp.dsl.tasks_group import ExitHandler + from kfp.dsl.tasks_group import If from kfp.dsl.tasks_group import ParallelFor __all__.extend([ 'component', @@ -246,6 +249,9 @@ def my_pipeline(): 'importer', 'ContainerSpec', 'Condition', + 'If', + 'Elif', + 'Else', 'ExitHandler', 'ParallelFor', 'Collected', diff --git a/sdk/python/kfp/dsl/pipeline_channel.py b/sdk/python/kfp/dsl/pipeline_channel.py index 66616103fb6..91f6b836b7a 100644 --- a/sdk/python/kfp/dsl/pipeline_channel.py +++ b/sdk/python/kfp/dsl/pipeline_channel.py @@ -24,17 +24,20 @@ @dataclasses.dataclass -class ConditionOperator: - """Represents a condition expression to be used in dsl.Condition(). +class BinaryOperation: + """Represents a condition expression to be used in condition control flow + group. Attributes: operator: The operator of the condition. left_operand: The left operand. right_operand: The right operand. + negate: Whether to negate the result of the binary operation. """ operator: str left_operand: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES] right_operand: Union['PipelineParameterChannel', type_utils.PARAMETER_TYPES] + negate: bool = False # The string template used to generate the placeholder of a PipelineChannel. @@ -149,22 +152,22 @@ def __hash__(self) -> int: return hash(self.pattern) def __eq__(self, other): - return ConditionOperator('==', self, other) + return BinaryOperation('==', self, other) def __ne__(self, other): - return ConditionOperator('!=', self, other) + return BinaryOperation('!=', self, other) def __lt__(self, other): - return ConditionOperator('<', self, other) + return BinaryOperation('<', self, other) def __le__(self, other): - return ConditionOperator('<=', self, other) + return BinaryOperation('<=', self, other) def __gt__(self, other): - return ConditionOperator('>', self, other) + return BinaryOperation('>', self, other) def __ge__(self, other): - return ConditionOperator('>=', self, other) + return BinaryOperation('>=', self, other) class PipelineParameterChannel(PipelineChannel): diff --git a/sdk/python/kfp/dsl/pipeline_context.py b/sdk/python/kfp/dsl/pipeline_context.py index c1304c39bac..a04566af018 100644 --- a/sdk/python/kfp/dsl/pipeline_context.py +++ b/sdk/python/kfp/dsl/pipeline_context.py @@ -14,7 +14,7 @@ """Definition for Pipeline.""" import functools -from typing import Callable, Optional +from typing import Callable, Optional, Union from kfp.dsl import component_factory from kfp.dsl import pipeline_task @@ -189,6 +189,12 @@ def pop_tasks_group(self): """Removes the current TasksGroup from the stack.""" del self.groups[-1] + def get_last_tasks_group(self) -> Union['tasks_group.TasksGroup', None]: + """Gets the last TasksGroup added to the pipeline at the current level + of the pipeline definition.""" + groups = self.groups[-1].groups + return groups[-1] if groups else None + def remove_task_from_groups(self, task: pipeline_task.PipelineTask): """Removes a task from the pipeline. diff --git a/sdk/python/kfp/dsl/tasks_group.py b/sdk/python/kfp/dsl/tasks_group.py index 42d1446a9d6..689a502b6cd 100644 --- a/sdk/python/kfp/dsl/tasks_group.py +++ b/sdk/python/kfp/dsl/tasks_group.py @@ -13,8 +13,9 @@ # limitations under the License. """Definition for TasksGroup.""" +import copy import enum -from typing import Optional, Union +from typing import List, Optional, Union from kfp.dsl import for_loop from kfp.dsl import pipeline_channel @@ -52,7 +53,7 @@ def __init__( group_type: TasksGroupType, name: Optional[str] = None, is_root: bool = False, - ): + ) -> None: """Create a new instance of TasksGroup. Args: @@ -117,7 +118,7 @@ def __init__( self, exit_task: pipeline_task.PipelineTask, name: Optional[str] = None, - ): + ) -> None: """Initializes a Condition task group.""" super().__init__( group_type=TasksGroupType.EXIT_HANDLER, @@ -138,9 +139,31 @@ def __init__( self.exit_task = exit_task -class Condition(TasksGroup): - """A class for creating conditional control flow within a pipeline - definition. +class _ConditionBase(TasksGroup): + """Parent class for condition control flow context managers (Condition, If, + Elif, Else). + + Args: + condition: A list of binary operations to be combined via conjunction. + name: The name of the condition group. + """ + + def __init__( + self, + condition: List[pipeline_channel.BinaryOperation], + name: Optional[str] = None, + ) -> None: + super().__init__( + group_type=TasksGroupType.CONDITION, + name=name, + is_root=False, + ) + self.condition: List[pipeline_channel.BinaryOperation] = condition + + +class Condition(_ConditionBase): + """A class for creating a conditional control flow "if" block within a + pipeline. Args: condition: A comparative expression that evaluates to True or False. At least one of the operands must be an output from an upstream task or a pipeline parameter. @@ -150,22 +173,153 @@ class Condition(TasksGroup): :: task1 = my_component1(...) - with Condition(task1.output=='pizza', 'pizza-condition'): + with dsl.Condition(task1.output=='pizza', 'pizza-condition'): task2 = my_component2(...) """ def __init__( self, - condition: pipeline_channel.ConditionOperator, + condition, name: Optional[str] = None, - ): - """Initializes a conditional task group.""" + ) -> None: super().__init__( - group_type=TasksGroupType.CONDITION, + condition=[condition], + name=name, + ) + if isinstance(condition, bool): + raise ValueError( + f'Got constant boolean {condition} as a condition. This is likely because the provided condition evaluated immediately. At least one of the operands must be an output from an upstream task or a pipeline parameter.' + ) + copied_condition = copy.copy(condition) + copied_condition.negate = True + self._negated_upstream_conditions = [copied_condition] + + +class If(Condition): + """A class for creating a conditional control flow "if" block within a + pipeline. Identical to dsl.Condition. + + Args: + condition: A comparative expression that evaluates to True or False. At least one of the operands must be an output from an upstream task or a pipeline parameter. + name: The name of the condition group. + + Example: + :: + + task1 = my_component1(...) + with dsl.If(task1.output=='pizza', 'pizza-condition'): + task2 = my_component2(...) + """ + + +class Elif(_ConditionBase): + """A class for creating a conditional control flow "else if" block within a + pipeline. Can be used following an upstream dsl.If or dsl.Elif. + + Args: + condition: A comparative expression that evaluates to True or False. At least one of the operands must be an output from an upstream task or a pipeline parameter. + name: The name of the condition group. + + Example: + :: + + task1 = my_component1(...) + task2 = my_component2(...) + with dsl.If(task1.output=='pizza', 'pizza-condition'): + task3 = my_component3(...) + + with dsl.Elif(task2.output=='pasta', 'pasta-condition'): + task4 = my_component4(...) + """ + + def __init__( + self, + condition, + name: Optional[str] = None, + ) -> None: + prev_cond = pipeline_context.Pipeline.get_default_pipeline( + ).get_last_tasks_group() + if not isinstance(prev_cond, (Condition, If, Elif)): + # prefer pushing toward dsl.If rather than dsl.Condition for syntactic consistency with the if-elif-else keywords in Python + raise InvalidControlFlowException( + 'dsl.Else can only be used following an upstream dsl.If or dsl.Elif.' + ) + + if isinstance(condition, bool): + raise ValueError( + f'Got constant boolean {condition} as a condition. This is likely because the provided condition evaluated immediately. At least one of the operands must be an output from an upstream task or a pipeline parameter.' + ) + + copied_condition = copy.copy(condition) + copied_condition.negate = True + self._negated_upstream_conditions = _shallow_copy_list_of_binary_operations( + prev_cond._negated_upstream_conditions) + [copied_condition] + + conditions = _shallow_copy_list_of_binary_operations( + prev_cond._negated_upstream_conditions) + conditions.append(condition) + + super().__init__( + condition=conditions, name=name, - is_root=False, ) - self.condition = condition + + +class Else(_ConditionBase): + """A class for creating a conditional control flow "else" block within a + pipeline. Can be used following an upstream dsl.If or dsl.Elif. + + Args: + name: The name of the condition group. + + Example: + :: + + task1 = my_component1(...) + task2 = my_component2(...) + with dsl.If(task1.output=='pizza', 'pizza-condition'): + task3 = my_component3(...) + + with dsl.Elif(task2.output=='pasta', 'pasta-condition'): + task4 = my_component4(...) + + with dsl.Else(): + my_component5(...) + """ + + def __init__( + self, + name: Optional[str] = None, + ) -> None: + prev_cond = pipeline_context.Pipeline.get_default_pipeline( + ).get_last_tasks_group() + + if isinstance(prev_cond, Else): + # prefer pushing toward dsl.If rather than dsl.Condition for syntactic consistency with the if-elif-else keywords in Python + raise InvalidControlFlowException( + 'Cannot use dsl.Else following another dsl.Else. dsl.Else can only be used following an upstream dsl.If or dsl.Elif.' + ) + if not isinstance(prev_cond, (Condition, If, Elif)): + # prefer pushing toward dsl.If rather than dsl.Condition for syntactic consistency with the if-elif-else keywords in Python + raise InvalidControlFlowException( + 'dsl.Else can only be used following an upstream dsl.If or dsl.Elif.' + ) + + super().__init__( + condition=prev_cond._negated_upstream_conditions, + name=name, + ) + + +class InvalidControlFlowException(Exception): + pass + + +def _shallow_copy_list_of_binary_operations( + operations: List[pipeline_channel.BinaryOperation] +) -> List[pipeline_channel.BinaryOperation]: + # shallow copy is sufficient to allow us to invert the negate flag of a BinaryOperation without affecting copies. deep copy not needed and would result in many copies of the full pipeline since PipelineChannels hold references to the pipeline. + return [copy.copy(operation) for operation in operations] class ParallelFor(TasksGroup): @@ -198,7 +352,7 @@ def __init__( items: Union[for_loop.ItemList, pipeline_channel.PipelineChannel], name: Optional[str] = None, parallelism: Optional[int] = None, - ): + ) -> None: """Initializes a for loop task group.""" parallelism = parallelism or 0 if parallelism < 0: diff --git a/sdk/python/test_data/pipelines/if_elif_else.py b/sdk/python/test_data/pipelines/if_elif_else.py new file mode 100644 index 00000000000..4f0332a88ee --- /dev/null +++ b/sdk/python/test_data/pipelines/if_elif_else.py @@ -0,0 +1,51 @@ +# Copyright 2023 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from kfp import compiler +from kfp import dsl + + +@dsl.component +def flip_coin() -> str: + import random + val = random.randint(0, 2) + + if val == 0: + return 'heads' + elif val == 1: + return 'tails' + else: + return 'draw' + + +@dsl.component +def print_and_return(text: str) -> str: + print(text) + return text + + +@dsl.pipeline +def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + with dsl.Elif(flip_coin_task.output == 'tails'): + print_and_return(text='Got tails!') + with dsl.Else(): + print_and_return(text='Draw!') + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=flip_coin_pipeline, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/if_elif_else.yaml b/sdk/python/test_data/pipelines/if_elif_else.yaml new file mode 100644 index 00000000000..f227156a179 --- /dev/null +++ b/sdk/python/test_data/pipelines/if_elif_else.yaml @@ -0,0 +1,275 @@ +# PIPELINE DEFINITION +# Name: flip-coin-pipeline +components: + comp-condition-1: + dag: + tasks: + print-and-return: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return + inputs: + parameters: + text: + runtimeValue: + constant: Got heads! + taskInfo: + name: print-and-return + inputDefinitions: + parameters: + pipelinechannel--flip-coin-Output: + parameterType: STRING + comp-condition-2: + dag: + tasks: + print-and-return-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-2 + inputs: + parameters: + text: + runtimeValue: + constant: Got tails! + taskInfo: + name: print-and-return-2 + inputDefinitions: + parameters: + pipelinechannel--flip-coin-Output: + parameterType: STRING + comp-condition-3: + dag: + tasks: + print-and-return-3: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-3 + inputs: + parameters: + text: + runtimeValue: + constant: Draw! + taskInfo: + name: print-and-return-3 + inputDefinitions: + parameters: + pipelinechannel--flip-coin-Output: + parameterType: STRING + comp-flip-coin: + executorLabel: exec-flip-coin + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return: + executorLabel: exec-print-and-return + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-2: + executorLabel: exec-print-and-return-2 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-3: + executorLabel: exec-print-and-return-3 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-flip-coin: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - flip_coin + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef flip_coin() -> str:\n import random\n val = random.randint(0,\ + \ 2)\n\n if val == 0:\n return 'heads'\n elif val == 1:\n \ + \ return 'tails'\n else:\n return 'draw'\n\n" + image: python:3.7 + exec-print-and-return: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-2: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-3: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 +pipelineInfo: + name: flip-coin-pipeline +root: + dag: + tasks: + condition-1: + componentRef: + name: comp-condition-1 + dependentTasks: + - flip-coin + inputs: + parameters: + pipelinechannel--flip-coin-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: flip-coin + taskInfo: + name: condition-1 + triggerPolicy: + condition: inputs.parameter_values['pipelinechannel--flip-coin-Output'] + == 'heads' + condition-2: + componentRef: + name: comp-condition-2 + dependentTasks: + - flip-coin + inputs: + parameters: + pipelinechannel--flip-coin-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: flip-coin + taskInfo: + name: condition-2 + triggerPolicy: + condition: '!(inputs.parameter_values[''pipelinechannel--flip-coin-Output''] + == ''heads'') && inputs.parameter_values[''pipelinechannel--flip-coin-Output''] + == ''tails''' + condition-3: + componentRef: + name: comp-condition-3 + dependentTasks: + - flip-coin + inputs: + parameters: + pipelinechannel--flip-coin-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: flip-coin + taskInfo: + name: condition-3 + triggerPolicy: + condition: '!(inputs.parameter_values[''pipelinechannel--flip-coin-Output''] + == ''heads'') && !(inputs.parameter_values[''pipelinechannel--flip-coin-Output''] + == ''tails'')' + flip-coin: + cachingOptions: + enableCache: true + componentRef: + name: comp-flip-coin + taskInfo: + name: flip-coin +schemaVersion: 2.1.0 +sdkVersion: kfp-2.1.2 diff --git a/sdk/python/test_data/pipelines/if_elif_else_complex.py b/sdk/python/test_data/pipelines/if_elif_else_complex.py new file mode 100644 index 00000000000..9740636ed60 --- /dev/null +++ b/sdk/python/test_data/pipelines/if_elif_else_complex.py @@ -0,0 +1,60 @@ +# Copyright 2023 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from kfp import compiler +from kfp import dsl + + +@dsl.component +def int_zero_through_three() -> int: + import random + return random.randint(0, 3) + + +@dsl.component +def flip_coin() -> str: + import random + return 'heads' if random.randint(0, 1) == 0 else 'tails' + + +@dsl.component +def print_and_return(text: str) -> str: + print(text) + return text + + +@dsl.pipeline +def flip_coin_pipeline(confirm: bool): + int_task = int_zero_through_three() + flip_coin_task = flip_coin() + + with dsl.If(flip_coin_task.output == 'heads'): + with dsl.If(int_task.output == 0): + print_and_return(text='Got zero!') + + with dsl.Elif(int_task.output == 1): + task = print_and_return(text='Got one!') + with dsl.If(confirm == True): + print_and_return(text='Confirmed: definitely got one.') + + with dsl.Elif(int_task.output == 2): + print_and_return(text='Got two!') + + with dsl.Else(): + print_and_return(text='Got three!') + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=flip_coin_pipeline, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/if_elif_else_complex.yaml b/sdk/python/test_data/pipelines/if_elif_else_complex.yaml new file mode 100644 index 00000000000..acf26dd3d03 --- /dev/null +++ b/sdk/python/test_data/pipelines/if_elif_else_complex.yaml @@ -0,0 +1,511 @@ +# PIPELINE DEFINITION +# Name: flip-coin-pipeline +# Inputs: +# confirm: bool +components: + comp-condition-1: + dag: + tasks: + condition-2: + componentRef: + name: comp-condition-2 + inputs: + parameters: + pipelinechannel--flip-coin-Output: + componentInputParameter: pipelinechannel--flip-coin-Output + pipelinechannel--int-zero-through-three-Output: + componentInputParameter: pipelinechannel--int-zero-through-three-Output + taskInfo: + name: condition-2 + triggerPolicy: + condition: int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) + == 0 + condition-3: + componentRef: + name: comp-condition-3 + inputs: + parameters: + pipelinechannel--confirm: + componentInputParameter: pipelinechannel--confirm + pipelinechannel--flip-coin-Output: + componentInputParameter: pipelinechannel--flip-coin-Output + pipelinechannel--int-zero-through-three-Output: + componentInputParameter: pipelinechannel--int-zero-through-three-Output + taskInfo: + name: condition-3 + triggerPolicy: + condition: '!(int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) + == 0) && int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) + == 1' + condition-5: + componentRef: + name: comp-condition-5 + inputs: + parameters: + pipelinechannel--flip-coin-Output: + componentInputParameter: pipelinechannel--flip-coin-Output + pipelinechannel--int-zero-through-three-Output: + componentInputParameter: pipelinechannel--int-zero-through-three-Output + taskInfo: + name: condition-5 + triggerPolicy: + condition: '!(int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) + == 0) && !(int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) + == 1) && int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) + == 2' + condition-6: + componentRef: + name: comp-condition-6 + inputs: + parameters: + pipelinechannel--flip-coin-Output: + componentInputParameter: pipelinechannel--flip-coin-Output + pipelinechannel--int-zero-through-three-Output: + componentInputParameter: pipelinechannel--int-zero-through-three-Output + taskInfo: + name: condition-6 + triggerPolicy: + condition: '!(int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) + == 0) && !(int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) + == 1) && !(int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) + == 2)' + inputDefinitions: + parameters: + pipelinechannel--confirm: + parameterType: BOOLEAN + pipelinechannel--flip-coin-Output: + parameterType: STRING + pipelinechannel--int-zero-through-three-Output: + parameterType: NUMBER_INTEGER + comp-condition-2: + dag: + tasks: + print-and-return: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return + inputs: + parameters: + text: + runtimeValue: + constant: Got zero! + taskInfo: + name: print-and-return + inputDefinitions: + parameters: + pipelinechannel--flip-coin-Output: + parameterType: STRING + pipelinechannel--int-zero-through-three-Output: + parameterType: NUMBER_INTEGER + comp-condition-3: + dag: + tasks: + condition-4: + componentRef: + name: comp-condition-4 + inputs: + parameters: + pipelinechannel--confirm: + componentInputParameter: pipelinechannel--confirm + pipelinechannel--flip-coin-Output: + componentInputParameter: pipelinechannel--flip-coin-Output + pipelinechannel--int-zero-through-three-Output: + componentInputParameter: pipelinechannel--int-zero-through-three-Output + taskInfo: + name: condition-4 + triggerPolicy: + condition: inputs.parameter_values['pipelinechannel--confirm'] == true + print-and-return-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-2 + inputs: + parameters: + text: + runtimeValue: + constant: Got one! + taskInfo: + name: print-and-return-2 + inputDefinitions: + parameters: + pipelinechannel--confirm: + parameterType: BOOLEAN + pipelinechannel--flip-coin-Output: + parameterType: STRING + pipelinechannel--int-zero-through-three-Output: + parameterType: NUMBER_INTEGER + comp-condition-4: + dag: + tasks: + print-and-return-3: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-3 + inputs: + parameters: + text: + runtimeValue: + constant: 'Confirmed: definitely got one.' + taskInfo: + name: print-and-return-3 + inputDefinitions: + parameters: + pipelinechannel--confirm: + parameterType: BOOLEAN + pipelinechannel--flip-coin-Output: + parameterType: STRING + pipelinechannel--int-zero-through-three-Output: + parameterType: NUMBER_INTEGER + comp-condition-5: + dag: + tasks: + print-and-return-4: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-4 + inputs: + parameters: + text: + runtimeValue: + constant: Got two! + taskInfo: + name: print-and-return-4 + inputDefinitions: + parameters: + pipelinechannel--flip-coin-Output: + parameterType: STRING + pipelinechannel--int-zero-through-three-Output: + parameterType: NUMBER_INTEGER + comp-condition-6: + dag: + tasks: + print-and-return-5: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-5 + inputs: + parameters: + text: + runtimeValue: + constant: Got three! + taskInfo: + name: print-and-return-5 + inputDefinitions: + parameters: + pipelinechannel--flip-coin-Output: + parameterType: STRING + pipelinechannel--int-zero-through-three-Output: + parameterType: NUMBER_INTEGER + comp-flip-coin: + executorLabel: exec-flip-coin + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-int-zero-through-three: + executorLabel: exec-int-zero-through-three + outputDefinitions: + parameters: + Output: + parameterType: NUMBER_INTEGER + comp-print-and-return: + executorLabel: exec-print-and-return + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-2: + executorLabel: exec-print-and-return-2 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-3: + executorLabel: exec-print-and-return-3 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-4: + executorLabel: exec-print-and-return-4 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-5: + executorLabel: exec-print-and-return-5 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-flip-coin: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - flip_coin + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef flip_coin() -> str:\n import random\n return 'heads' if\ + \ random.randint(0, 1) == 0 else 'tails'\n\n" + image: python:3.7 + exec-int-zero-through-three: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - int_zero_through_three + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef int_zero_through_three() -> int:\n import random\n return\ + \ random.randint(0, 3)\n\n" + image: python:3.7 + exec-print-and-return: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-2: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-3: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-4: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-5: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 +pipelineInfo: + name: flip-coin-pipeline +root: + dag: + tasks: + condition-1: + componentRef: + name: comp-condition-1 + dependentTasks: + - flip-coin + - int-zero-through-three + inputs: + parameters: + pipelinechannel--confirm: + componentInputParameter: confirm + pipelinechannel--flip-coin-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: flip-coin + pipelinechannel--int-zero-through-three-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: int-zero-through-three + taskInfo: + name: condition-1 + triggerPolicy: + condition: inputs.parameter_values['pipelinechannel--flip-coin-Output'] + == 'heads' + flip-coin: + cachingOptions: + enableCache: true + componentRef: + name: comp-flip-coin + taskInfo: + name: flip-coin + int-zero-through-three: + cachingOptions: + enableCache: true + componentRef: + name: comp-int-zero-through-three + taskInfo: + name: int-zero-through-three + inputDefinitions: + parameters: + confirm: + parameterType: BOOLEAN +schemaVersion: 2.1.0 +sdkVersion: kfp-2.1.2 diff --git a/sdk/python/test_data/pipelines/if_else.py b/sdk/python/test_data/pipelines/if_else.py new file mode 100644 index 00000000000..1da8a074ac1 --- /dev/null +++ b/sdk/python/test_data/pipelines/if_else.py @@ -0,0 +1,42 @@ +# Copyright 2023 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from kfp import compiler +from kfp import dsl + + +@dsl.component +def flip_coin() -> str: + import random + return 'heads' if random.randint(0, 1) == 0 else 'tails' + + +@dsl.component +def print_and_return(text: str) -> str: + print(text) + return text + + +@dsl.pipeline +def flip_coin_pipeline(): + flip_coin_task = flip_coin() + with dsl.If(flip_coin_task.output == 'heads'): + print_and_return(text='Got heads!') + with dsl.Else(): + print_and_return(text='Got tails!') + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=flip_coin_pipeline, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/if_else.yaml b/sdk/python/test_data/pipelines/if_else.yaml new file mode 100644 index 00000000000..ab0bc37fe60 --- /dev/null +++ b/sdk/python/test_data/pipelines/if_else.yaml @@ -0,0 +1,199 @@ +# PIPELINE DEFINITION +# Name: flip-coin-pipeline +components: + comp-condition-1: + dag: + tasks: + print-and-return: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return + inputs: + parameters: + text: + runtimeValue: + constant: Got heads! + taskInfo: + name: print-and-return + inputDefinitions: + parameters: + pipelinechannel--flip-coin-Output: + parameterType: STRING + comp-condition-2: + dag: + tasks: + print-and-return-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-2 + inputs: + parameters: + text: + runtimeValue: + constant: Got tails! + taskInfo: + name: print-and-return-2 + inputDefinitions: + parameters: + pipelinechannel--flip-coin-Output: + parameterType: STRING + comp-flip-coin: + executorLabel: exec-flip-coin + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return: + executorLabel: exec-print-and-return + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-2: + executorLabel: exec-print-and-return-2 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-flip-coin: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - flip_coin + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef flip_coin() -> str:\n import random\n return 'heads' if\ + \ random.randint(0, 1) == 0 else 'tails'\n\n" + image: python:3.7 + exec-print-and-return: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-2: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 +pipelineInfo: + name: flip-coin-pipeline +root: + dag: + tasks: + condition-1: + componentRef: + name: comp-condition-1 + dependentTasks: + - flip-coin + inputs: + parameters: + pipelinechannel--flip-coin-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: flip-coin + taskInfo: + name: condition-1 + triggerPolicy: + condition: inputs.parameter_values['pipelinechannel--flip-coin-Output'] + == 'heads' + condition-2: + componentRef: + name: comp-condition-2 + dependentTasks: + - flip-coin + inputs: + parameters: + pipelinechannel--flip-coin-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: flip-coin + taskInfo: + name: condition-2 + triggerPolicy: + condition: '!(inputs.parameter_values[''pipelinechannel--flip-coin-Output''] + == ''heads'')' + flip-coin: + cachingOptions: + enableCache: true + componentRef: + name: comp-flip-coin + taskInfo: + name: flip-coin +schemaVersion: 2.1.0 +sdkVersion: kfp-2.1.2 diff --git a/sdk/python/test_data/test_data_config.yaml b/sdk/python/test_data/test_data_config.yaml index 02aae9d1da0..5b8436dd496 100644 --- a/sdk/python/test_data/test_data_config.yaml +++ b/sdk/python/test_data/test_data_config.yaml @@ -168,6 +168,15 @@ pipelines: - module: pipeline_with_metadata_fields name: dataset_concatenator execute: false + - module: if_else + name: flip_coin_pipeline + execute: false + - module: if_elif_else + name: flip_coin_pipeline + execute: false + - module: if_elif_else_complex + name: flip_coin_pipeline + execute: false components: test_data_dir: sdk/python/test_data/components read: true From bbf37194cb0181416a57e06ea016b85b9f382aeb Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Thu, 17 Aug 2023 10:35:08 -0700 Subject: [PATCH 2/6] deprecate dsl.Condition --- sdk/python/kfp/dsl/tasks_group.py | 29 +++++++++++++------------- sdk/python/kfp/dsl/tasks_group_test.py | 24 +++++++++++++++++++-- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/sdk/python/kfp/dsl/tasks_group.py b/sdk/python/kfp/dsl/tasks_group.py index 689a502b6cd..78f51cb4bf9 100644 --- a/sdk/python/kfp/dsl/tasks_group.py +++ b/sdk/python/kfp/dsl/tasks_group.py @@ -16,6 +16,7 @@ import copy import enum from typing import List, Optional, Union +import warnings from kfp.dsl import for_loop from kfp.dsl import pipeline_channel @@ -161,7 +162,7 @@ def __init__( self.condition: List[pipeline_channel.BinaryOperation] = condition -class Condition(_ConditionBase): +class If(_ConditionBase): """A class for creating a conditional control flow "if" block within a pipeline. @@ -173,7 +174,7 @@ class Condition(_ConditionBase): :: task1 = my_component1(...) - with dsl.Condition(task1.output=='pizza', 'pizza-condition'): + with dsl.If(task1.output=='pizza', 'pizza-condition'): task2 = my_component2(...) """ @@ -195,22 +196,20 @@ def __init__( self._negated_upstream_conditions = [copied_condition] -class If(Condition): - """A class for creating a conditional control flow "if" block within a - pipeline. Identical to dsl.Condition. - - Args: - condition: A comparative expression that evaluates to True or False. At least one of the operands must be an output from an upstream task or a pipeline parameter. - name: The name of the condition group. - - Example: - :: +class Condition(If): + """Deprecated. - task1 = my_component1(...) - with dsl.If(task1.output=='pizza', 'pizza-condition'): - task2 = my_component2(...) + Use dsl.If instead. """ + def __enter__(self): + super().__enter__() + warnings.warn( + 'dsl.Condition is deprecated. Please use dsl.If instead.', + category=DeprecationWarning, + stacklevel=2) + return self + class Elif(_ConditionBase): """A class for creating a conditional control flow "else if" block within a diff --git a/sdk/python/kfp/dsl/tasks_group_test.py b/sdk/python/kfp/dsl/tasks_group_test.py index 09ba5cdbc34..40c68ab3725 100644 --- a/sdk/python/kfp/dsl/tasks_group_test.py +++ b/sdk/python/kfp/dsl/tasks_group_test.py @@ -12,13 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from absl.testing import parameterized +import unittest + +from kfp import dsl from kfp.dsl import for_loop from kfp.dsl import pipeline_context from kfp.dsl import tasks_group -class ParallelForTest(parameterized.TestCase): +class ParallelForTest(unittest.TestCase): def test_basic(self): loop_items = ['pizza', 'hotdog', 'pasta'] @@ -58,3 +60,21 @@ def test_parallelfor_invalid_parallelism(self): 'ParallelFor parallelism must be >= 0.'): with pipeline_context.Pipeline('pipeline') as p: tasks_group.ParallelFor(items=loop_items, parallelism=-1) + + +class TestConditionDeprecated(unittest.TestCase): + + def test(self): + + @dsl.component + def foo() -> str: + return 'foo' + + @dsl.pipeline + def my_pipeline(string: str): + with self.assertWarnsRegex( + DeprecationWarning, + 'dsl\.Condition is deprecated\. Please use dsl\.If instead\.' + ): + with dsl.Condition(string == 'text'): + foo() From cf67c5ce3caab9227c0e6d5c42549551038672c6 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Thu, 31 Aug 2023 08:55:45 -0700 Subject: [PATCH 3/6] alter rebase --- sdk/python/kfp/compiler/compiler_test.py | 1 + .../test_data/pipelines/if_elif_else.yaml | 36 ++++++----- .../pipelines/if_elif_else_complex.yaml | 63 ++++++++++--------- sdk/python/test_data/pipelines/if_else.yaml | 27 ++++---- 4 files changed, 71 insertions(+), 56 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index bf39f98b1c0..1e773800741 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -40,6 +40,7 @@ from kfp.dsl import OutputPath from kfp.dsl import pipeline_task from kfp.dsl import PipelineTaskFinalStatus +from kfp.dsl import tasks_group from kfp.dsl import yaml_component from kfp.dsl.types import type_utils from kfp.pipeline_spec import pipeline_spec_pb2 diff --git a/sdk/python/test_data/pipelines/if_elif_else.yaml b/sdk/python/test_data/pipelines/if_elif_else.yaml index f227156a179..faf5fb73a14 100644 --- a/sdk/python/test_data/pipelines/if_elif_else.yaml +++ b/sdk/python/test_data/pipelines/if_elif_else.yaml @@ -108,16 +108,17 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ @@ -137,16 +138,17 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ @@ -165,16 +167,17 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ @@ -193,16 +196,17 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ diff --git a/sdk/python/test_data/pipelines/if_elif_else_complex.yaml b/sdk/python/test_data/pipelines/if_elif_else_complex.yaml index acf26dd3d03..d0a923c20b4 100644 --- a/sdk/python/test_data/pipelines/if_elif_else_complex.yaml +++ b/sdk/python/test_data/pipelines/if_elif_else_complex.yaml @@ -277,16 +277,17 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ @@ -305,16 +306,17 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ @@ -333,16 +335,17 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ @@ -361,16 +364,17 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ @@ -389,16 +393,17 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ @@ -417,16 +422,17 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ @@ -445,16 +451,17 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ diff --git a/sdk/python/test_data/pipelines/if_else.yaml b/sdk/python/test_data/pipelines/if_else.yaml index ab0bc37fe60..a64ff7b87d0 100644 --- a/sdk/python/test_data/pipelines/if_else.yaml +++ b/sdk/python/test_data/pipelines/if_else.yaml @@ -79,16 +79,17 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ @@ -107,16 +108,17 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ @@ -135,16 +137,17 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ _RUNTIME=true python3 -m pip install --quiet --no-warn-script-location\ - \ 'kfp==2.1.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ - 3.9\"' && \"$0\" \"$@\"\n" + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" - sh - -ec - 'program_path=$(mktemp -d) + printf "%s" "$0" > "$program_path/ephemeral_component.py" - python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ From cdd8e11c0d07afde7d762e4c6120e9feff690037 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Thu, 7 Sep 2023 13:20:26 -0500 Subject: [PATCH 4/6] update release notes --- sdk/RELEASE.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index 03954015443..bc2effc7057 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -2,6 +2,7 @@ ## Features +* Add support for `dsl.If`, `dsl.Elif`, and `dsl.Else` control flow context managers; deprecate `dsl.Condition` in favor of `dsl.If` [\#9894](https://github.com/kubeflow/pipelines/pull/9894) ## Breaking changes From 3eebc74c5a6fe6daaaa7d3d9afa905ab9ccc1703 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Fri, 8 Sep 2023 14:49:26 -0500 Subject: [PATCH 5/6] address review feedback --- sdk/python/kfp/compiler/compiler_test.py | 2 +- sdk/python/kfp/compiler/compiler_utils.py | 6 +- .../kfp/compiler/pipeline_spec_builder.py | 4 +- sdk/python/kfp/dsl/pipeline_context.py | 4 +- sdk/python/kfp/dsl/tasks_group.py | 12 +- .../test_data/pipelines/if_elif_else.py | 8 +- .../test_data/pipelines/if_elif_else.yaml | 69 +- .../pipelines/if_elif_else_complex.py | 64 +- .../pipelines/if_elif_else_complex.yaml | 674 ++++++++++++++---- sdk/python/test_data/test_data_config.yaml | 4 +- 10 files changed, 633 insertions(+), 214 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 1e773800741..56407cd7524 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -4401,7 +4401,7 @@ def flip_coin_pipeline(): def test_elif_no_if_not_supported(self): with self.assertRaisesRegex( tasks_group.InvalidControlFlowException, - r'dsl\.Else can only be used following an upstream dsl\.If or dsl\.Elif\.' + r'dsl\.Elif can only be used following an upstream dsl\.If or dsl\.Elif\.' ): @dsl.pipeline diff --git a/sdk/python/kfp/compiler/compiler_utils.py b/sdk/python/kfp/compiler/compiler_utils.py index b4ffb755921..498b9c19fd3 100644 --- a/sdk/python/kfp/compiler/compiler_utils.py +++ b/sdk/python/kfp/compiler/compiler_utils.py @@ -123,8 +123,8 @@ def get_channels_from_condition( operations: List[pipeline_channel.BinaryOperation], collected_channels: list, ) -> None: - """Append to collected_channels each pipeline channels used in each operand - of each operation in operations.""" + """Appends to collected_channels each pipeline channels used in each + operand of each operation in operations.""" for operation in operations: for operand in [operation.left_operand, operation.right_operand]: if isinstance(operand, pipeline_channel.PipelineChannel): @@ -153,7 +153,7 @@ def _get_condition_channels_for_tasks_helper( if isinstance(group, tasks_group._ConditionBase): new_current_conditions_channels = list(current_conditions_channels) get_channels_from_condition( - group.condition, + group.conditions, new_current_conditions_channels, ) diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index 98ab68ea9ba..caaf46cb975 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -858,7 +858,7 @@ def _update_task_spec_for_condition_group( group: The condition group to update task spec for. pipeline_task_spec: The pipeline task spec to update in place. """ - condition = _binary_operations_to_cel_conjunctive(group.condition) + condition = _binary_operations_to_cel_conjunctive(group.conditions) pipeline_task_spec.trigger_policy.CopyFrom( pipeline_spec_pb2.PipelineTaskSpec.TriggerPolicy(condition=condition)) @@ -1260,7 +1260,7 @@ def build_spec_by_group( condition_subgroup_channels = list(subgroup_input_channels) compiler_utils.get_channels_from_condition( - subgroup.condition, condition_subgroup_channels) + subgroup.conditions, condition_subgroup_channels) subgroup_component_spec = build_component_spec_for_group( input_pipeline_channels=condition_subgroup_channels, diff --git a/sdk/python/kfp/dsl/pipeline_context.py b/sdk/python/kfp/dsl/pipeline_context.py index a04566af018..72ada197ae5 100644 --- a/sdk/python/kfp/dsl/pipeline_context.py +++ b/sdk/python/kfp/dsl/pipeline_context.py @@ -14,7 +14,7 @@ """Definition for Pipeline.""" import functools -from typing import Callable, Optional, Union +from typing import Callable, Optional from kfp.dsl import component_factory from kfp.dsl import pipeline_task @@ -189,7 +189,7 @@ def pop_tasks_group(self): """Removes the current TasksGroup from the stack.""" del self.groups[-1] - def get_last_tasks_group(self) -> Union['tasks_group.TasksGroup', None]: + def get_last_tasks_group(self) -> Optional['tasks_group.TasksGroup']: """Gets the last TasksGroup added to the pipeline at the current level of the pipeline definition.""" groups = self.groups[-1].groups diff --git a/sdk/python/kfp/dsl/tasks_group.py b/sdk/python/kfp/dsl/tasks_group.py index 78f51cb4bf9..48424ce47e4 100644 --- a/sdk/python/kfp/dsl/tasks_group.py +++ b/sdk/python/kfp/dsl/tasks_group.py @@ -151,7 +151,7 @@ class _ConditionBase(TasksGroup): def __init__( self, - condition: List[pipeline_channel.BinaryOperation], + conditions: List[pipeline_channel.BinaryOperation], name: Optional[str] = None, ) -> None: super().__init__( @@ -159,7 +159,7 @@ def __init__( name=name, is_root=False, ) - self.condition: List[pipeline_channel.BinaryOperation] = condition + self.conditions: List[pipeline_channel.BinaryOperation] = conditions class If(_ConditionBase): @@ -184,7 +184,7 @@ def __init__( name: Optional[str] = None, ) -> None: super().__init__( - condition=[condition], + conditions=[condition], name=name, ) if isinstance(condition, bool): @@ -241,7 +241,7 @@ def __init__( if not isinstance(prev_cond, (Condition, If, Elif)): # prefer pushing toward dsl.If rather than dsl.Condition for syntactic consistency with the if-elif-else keywords in Python raise InvalidControlFlowException( - 'dsl.Else can only be used following an upstream dsl.If or dsl.Elif.' + 'dsl.Elif can only be used following an upstream dsl.If or dsl.Elif.' ) if isinstance(condition, bool): @@ -259,7 +259,7 @@ def __init__( conditions.append(condition) super().__init__( - condition=conditions, + conditions=conditions, name=name, ) @@ -305,7 +305,7 @@ def __init__( ) super().__init__( - condition=prev_cond._negated_upstream_conditions, + conditions=prev_cond._negated_upstream_conditions, name=name, ) diff --git a/sdk/python/test_data/pipelines/if_elif_else.py b/sdk/python/test_data/pipelines/if_elif_else.py index 4f0332a88ee..fdaa3428f64 100644 --- a/sdk/python/test_data/pipelines/if_elif_else.py +++ b/sdk/python/test_data/pipelines/if_elif_else.py @@ -16,7 +16,7 @@ @dsl.component -def flip_coin() -> str: +def flip_three_sided_die() -> str: import random val = random.randint(0, 2) @@ -35,8 +35,8 @@ def print_and_return(text: str) -> str: @dsl.pipeline -def flip_coin_pipeline(): - flip_coin_task = flip_coin() +def roll_die_pipeline(): + flip_coin_task = flip_three_sided_die() with dsl.If(flip_coin_task.output == 'heads'): print_and_return(text='Got heads!') with dsl.Elif(flip_coin_task.output == 'tails'): @@ -47,5 +47,5 @@ def flip_coin_pipeline(): if __name__ == '__main__': compiler.Compiler().compile( - pipeline_func=flip_coin_pipeline, + pipeline_func=roll_die_pipeline, package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/if_elif_else.yaml b/sdk/python/test_data/pipelines/if_elif_else.yaml index faf5fb73a14..3a353f79a96 100644 --- a/sdk/python/test_data/pipelines/if_elif_else.yaml +++ b/sdk/python/test_data/pipelines/if_elif_else.yaml @@ -1,5 +1,5 @@ # PIPELINE DEFINITION -# Name: flip-coin-pipeline +# Name: roll-die-pipeline components: comp-condition-1: dag: @@ -18,7 +18,7 @@ components: name: print-and-return inputDefinitions: parameters: - pipelinechannel--flip-coin-Output: + pipelinechannel--flip-three-sided-die-Output: parameterType: STRING comp-condition-2: dag: @@ -37,7 +37,7 @@ components: name: print-and-return-2 inputDefinitions: parameters: - pipelinechannel--flip-coin-Output: + pipelinechannel--flip-three-sided-die-Output: parameterType: STRING comp-condition-3: dag: @@ -56,10 +56,10 @@ components: name: print-and-return-3 inputDefinitions: parameters: - pipelinechannel--flip-coin-Output: + pipelinechannel--flip-three-sided-die-Output: parameterType: STRING - comp-flip-coin: - executorLabel: exec-flip-coin + comp-flip-three-sided-die: + executorLabel: exec-flip-three-sided-die outputDefinitions: parameters: Output: @@ -96,19 +96,19 @@ components: parameterType: STRING deploymentSpec: executors: - exec-flip-coin: + exec-flip-three-sided-die: container: args: - --executor_input - '{{$}}' - --function_to_execute - - flip_coin + - flip_three_sided_die command: - sh - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -122,9 +122,10 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef flip_coin() -> str:\n import random\n val = random.randint(0,\ - \ 2)\n\n if val == 0:\n return 'heads'\n elif val == 1:\n \ - \ return 'tails'\n else:\n return 'draw'\n\n" + \ *\n\ndef flip_three_sided_die() -> str:\n import random\n val =\ + \ random.randint(0, 2)\n\n if val == 0:\n return 'heads'\n \ + \ elif val == 1:\n return 'tails'\n else:\n return 'draw'\n\ + \n" image: python:3.7 exec-print-and-return: container: @@ -138,7 +139,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -167,7 +168,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -196,7 +197,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -214,7 +215,7 @@ deploymentSpec: \ text\n\n" image: python:3.7 pipelineInfo: - name: flip-coin-pipeline + name: roll-die-pipeline root: dag: tasks: @@ -222,58 +223,58 @@ root: componentRef: name: comp-condition-1 dependentTasks: - - flip-coin + - flip-three-sided-die inputs: parameters: - pipelinechannel--flip-coin-Output: + pipelinechannel--flip-three-sided-die-Output: taskOutputParameter: outputParameterKey: Output - producerTask: flip-coin + producerTask: flip-three-sided-die taskInfo: name: condition-1 triggerPolicy: - condition: inputs.parameter_values['pipelinechannel--flip-coin-Output'] + condition: inputs.parameter_values['pipelinechannel--flip-three-sided-die-Output'] == 'heads' condition-2: componentRef: name: comp-condition-2 dependentTasks: - - flip-coin + - flip-three-sided-die inputs: parameters: - pipelinechannel--flip-coin-Output: + pipelinechannel--flip-three-sided-die-Output: taskOutputParameter: outputParameterKey: Output - producerTask: flip-coin + producerTask: flip-three-sided-die taskInfo: name: condition-2 triggerPolicy: - condition: '!(inputs.parameter_values[''pipelinechannel--flip-coin-Output''] - == ''heads'') && inputs.parameter_values[''pipelinechannel--flip-coin-Output''] + condition: '!(inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output''] + == ''heads'') && inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output''] == ''tails''' condition-3: componentRef: name: comp-condition-3 dependentTasks: - - flip-coin + - flip-three-sided-die inputs: parameters: - pipelinechannel--flip-coin-Output: + pipelinechannel--flip-three-sided-die-Output: taskOutputParameter: outputParameterKey: Output - producerTask: flip-coin + producerTask: flip-three-sided-die taskInfo: name: condition-3 triggerPolicy: - condition: '!(inputs.parameter_values[''pipelinechannel--flip-coin-Output''] - == ''heads'') && !(inputs.parameter_values[''pipelinechannel--flip-coin-Output''] + condition: '!(inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output''] + == ''heads'') && !(inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output''] == ''tails'')' - flip-coin: + flip-three-sided-die: cachingOptions: enableCache: true componentRef: - name: comp-flip-coin + name: comp-flip-three-sided-die taskInfo: - name: flip-coin + name: flip-three-sided-die schemaVersion: 2.1.0 -sdkVersion: kfp-2.1.2 +sdkVersion: kfp-2.1.3 diff --git a/sdk/python/test_data/pipelines/if_elif_else_complex.py b/sdk/python/test_data/pipelines/if_elif_else_complex.py index 9740636ed60..42623cb5084 100644 --- a/sdk/python/test_data/pipelines/if_elif_else_complex.py +++ b/sdk/python/test_data/pipelines/if_elif_else_complex.py @@ -11,20 +11,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import List + from kfp import compiler from kfp import dsl @dsl.component -def int_zero_through_three() -> int: +def int_0_to_9999() -> int: import random - return random.randint(0, 3) + return random.randint(0, 9999) @dsl.component -def flip_coin() -> str: - import random - return 'heads' if random.randint(0, 1) == 0 else 'tails' +def is_even_or_odd(num: int) -> str: + return 'odd' if num % 2 else 'even' @dsl.component @@ -33,28 +34,53 @@ def print_and_return(text: str) -> str: return text +@dsl.component +def print_strings(strings: List[str]): + print(strings) + + @dsl.pipeline -def flip_coin_pipeline(confirm: bool): - int_task = int_zero_through_three() - flip_coin_task = flip_coin() +def lucky_number_pipeline(add_drumroll: bool = True, + repeat_if_lucky_number: bool = True, + trials: List[int] = [1, 2, 3]): + with dsl.ParallelFor(trials) as trial: + int_task = int_0_to_9999().set_caching_options(False) + with dsl.If(add_drumroll == True): + with dsl.If(trial == 3): + print_and_return(text='Adding drumroll on last trial!') - with dsl.If(flip_coin_task.output == 'heads'): - with dsl.If(int_task.output == 0): - print_and_return(text='Got zero!') + with dsl.If(int_task.output < 5000): - with dsl.Elif(int_task.output == 1): - task = print_and_return(text='Got one!') - with dsl.If(confirm == True): - print_and_return(text='Confirmed: definitely got one.') + even_or_odd_task = is_even_or_odd(num=int_task.output) - with dsl.Elif(int_task.output == 2): - print_and_return(text='Got two!') + with dsl.If(even_or_odd_task.output == 'even'): + print_and_return(text='Got a low even number!') + with dsl.Else(): + print_and_return(text='Got a low odd number!') + + with dsl.Elif(int_task.output > 5000): + + even_or_odd_task = is_even_or_odd(num=int_task.output) + + with dsl.If(even_or_odd_task.output == 'even'): + print_and_return(text='Got a high even number!') + with dsl.Else(): + print_and_return(text='Got a high odd number!') with dsl.Else(): - print_and_return(text='Got three!') + print_and_return( + text='Announcing: Got the lucky number 5000! A one in 10,000 chance.' + ) + with dsl.If(repeat_if_lucky_number == True): + with dsl.ParallelFor([1, 2]) as _: + print_and_return( + text='Announcing again: Got the lucky number 5000! A one in 10,000 chance.' + ) + + print_strings(strings=dsl.Collected(even_or_odd_task.output)) if __name__ == '__main__': compiler.Compiler().compile( - pipeline_func=flip_coin_pipeline, + pipeline_func=lucky_number_pipeline, package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/if_elif_else_complex.yaml b/sdk/python/test_data/pipelines/if_elif_else_complex.yaml index d0a923c20b4..7fe0116c46b 100644 --- a/sdk/python/test_data/pipelines/if_elif_else_complex.yaml +++ b/sdk/python/test_data/pipelines/if_elif_else_complex.yaml @@ -1,83 +1,94 @@ # PIPELINE DEFINITION -# Name: flip-coin-pipeline +# Name: lucky-number-pipeline # Inputs: -# confirm: bool +# add_drumroll: bool [Default: True] +# repeat_if_lucky_number: bool [Default: True] +# trials: list [Default: [1.0, 2.0, 3.0]] components: - comp-condition-1: + comp-condition-10: dag: tasks: - condition-2: + condition-11: componentRef: - name: comp-condition-2 + name: comp-condition-11 inputs: parameters: - pipelinechannel--flip-coin-Output: - componentInputParameter: pipelinechannel--flip-coin-Output - pipelinechannel--int-zero-through-three-Output: - componentInputParameter: pipelinechannel--int-zero-through-three-Output + pipelinechannel--int-0-to-9999-Output: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + pipelinechannel--repeat_if_lucky_number: + componentInputParameter: pipelinechannel--repeat_if_lucky_number taskInfo: - name: condition-2 + name: condition-11 triggerPolicy: - condition: int(inputs.parameter_values['pipelinechannel--int-zero-through-three-Output']) - == 0 - condition-3: + condition: inputs.parameter_values['pipelinechannel--repeat_if_lucky_number'] + == true + print-and-return-6: + cachingOptions: + enableCache: true componentRef: - name: comp-condition-3 + name: comp-print-and-return-6 inputs: parameters: - pipelinechannel--confirm: - componentInputParameter: pipelinechannel--confirm - pipelinechannel--flip-coin-Output: - componentInputParameter: pipelinechannel--flip-coin-Output - pipelinechannel--int-zero-through-three-Output: - componentInputParameter: pipelinechannel--int-zero-through-three-Output + text: + runtimeValue: + constant: 'Announcing: Got the lucky number 5000! A one in 10,000 + chance.' taskInfo: - name: condition-3 - triggerPolicy: - condition: '!(int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) - == 0) && int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) - == 1' - condition-5: + name: print-and-return-6 + inputDefinitions: + parameters: + pipelinechannel--int-0-to-9999-Output: + parameterType: NUMBER_INTEGER + pipelinechannel--repeat_if_lucky_number: + parameterType: BOOLEAN + comp-condition-11: + dag: + tasks: + for-loop-13: componentRef: - name: comp-condition-5 + name: comp-for-loop-13 inputs: parameters: - pipelinechannel--flip-coin-Output: - componentInputParameter: pipelinechannel--flip-coin-Output - pipelinechannel--int-zero-through-three-Output: - componentInputParameter: pipelinechannel--int-zero-through-three-Output + pipelinechannel--int-0-to-9999-Output: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + pipelinechannel--repeat_if_lucky_number: + componentInputParameter: pipelinechannel--repeat_if_lucky_number + parameterIterator: + itemInput: pipelinechannel--loop-item-param-12 + items: + raw: '[1, 2]' taskInfo: - name: condition-5 - triggerPolicy: - condition: '!(int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) - == 0) && !(int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) - == 1) && int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) - == 2' - condition-6: + name: for-loop-13 + inputDefinitions: + parameters: + pipelinechannel--int-0-to-9999-Output: + parameterType: NUMBER_INTEGER + pipelinechannel--repeat_if_lucky_number: + parameterType: BOOLEAN + comp-condition-2: + dag: + tasks: + condition-3: componentRef: - name: comp-condition-6 + name: comp-condition-3 inputs: parameters: - pipelinechannel--flip-coin-Output: - componentInputParameter: pipelinechannel--flip-coin-Output - pipelinechannel--int-zero-through-three-Output: - componentInputParameter: pipelinechannel--int-zero-through-three-Output + pipelinechannel--add_drumroll: + componentInputParameter: pipelinechannel--add_drumroll + pipelinechannel--trials-loop-item: + componentInputParameter: pipelinechannel--trials-loop-item taskInfo: - name: condition-6 + name: condition-3 triggerPolicy: - condition: '!(int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) - == 0) && !(int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) - == 1) && !(int(inputs.parameter_values[''pipelinechannel--int-zero-through-three-Output'']) - == 2)' + condition: int(inputs.parameter_values['pipelinechannel--trials-loop-item']) + == 3 inputDefinitions: parameters: - pipelinechannel--confirm: + pipelinechannel--add_drumroll: parameterType: BOOLEAN - pipelinechannel--flip-coin-Output: - parameterType: STRING - pipelinechannel--int-zero-through-three-Output: + pipelinechannel--trials-loop-item: parameterType: NUMBER_INTEGER - comp-condition-2: + comp-condition-3: dag: tasks: print-and-return: @@ -89,33 +100,72 @@ components: parameters: text: runtimeValue: - constant: Got zero! + constant: Adding drumroll on last trial! taskInfo: name: print-and-return inputDefinitions: parameters: - pipelinechannel--flip-coin-Output: - parameterType: STRING - pipelinechannel--int-zero-through-three-Output: + pipelinechannel--add_drumroll: + parameterType: BOOLEAN + pipelinechannel--trials-loop-item: parameterType: NUMBER_INTEGER - comp-condition-3: + comp-condition-4: dag: tasks: - condition-4: + condition-5: componentRef: - name: comp-condition-4 + name: comp-condition-5 + dependentTasks: + - is-even-or-odd inputs: parameters: - pipelinechannel--confirm: - componentInputParameter: pipelinechannel--confirm - pipelinechannel--flip-coin-Output: - componentInputParameter: pipelinechannel--flip-coin-Output - pipelinechannel--int-zero-through-three-Output: - componentInputParameter: pipelinechannel--int-zero-through-three-Output + pipelinechannel--int-0-to-9999-Output: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + pipelinechannel--is-even-or-odd-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: is-even-or-odd taskInfo: - name: condition-4 + name: condition-5 triggerPolicy: - condition: inputs.parameter_values['pipelinechannel--confirm'] == true + condition: inputs.parameter_values['pipelinechannel--is-even-or-odd-Output'] + == 'even' + condition-6: + componentRef: + name: comp-condition-6 + dependentTasks: + - is-even-or-odd + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + pipelinechannel--is-even-or-odd-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: is-even-or-odd + taskInfo: + name: condition-6 + triggerPolicy: + condition: '!(inputs.parameter_values[''pipelinechannel--is-even-or-odd-Output''] + == ''even'')' + is-even-or-odd: + cachingOptions: + enableCache: true + componentRef: + name: comp-is-even-or-odd + inputs: + parameters: + num: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + taskInfo: + name: is-even-or-odd + inputDefinitions: + parameters: + pipelinechannel--int-0-to-9999-Output: + parameterType: NUMBER_INTEGER + comp-condition-5: + dag: + tasks: print-and-return-2: cachingOptions: enableCache: true @@ -125,18 +175,16 @@ components: parameters: text: runtimeValue: - constant: Got one! + constant: Got a low even number! taskInfo: name: print-and-return-2 inputDefinitions: parameters: - pipelinechannel--confirm: - parameterType: BOOLEAN - pipelinechannel--flip-coin-Output: - parameterType: STRING - pipelinechannel--int-zero-through-three-Output: + pipelinechannel--int-0-to-9999-Output: parameterType: NUMBER_INTEGER - comp-condition-4: + pipelinechannel--is-even-or-odd-Output: + parameterType: STRING + comp-condition-6: dag: tasks: print-and-return-3: @@ -148,18 +196,80 @@ components: parameters: text: runtimeValue: - constant: 'Confirmed: definitely got one.' + constant: Got a low odd number! taskInfo: name: print-and-return-3 inputDefinitions: parameters: - pipelinechannel--confirm: - parameterType: BOOLEAN - pipelinechannel--flip-coin-Output: + pipelinechannel--int-0-to-9999-Output: + parameterType: NUMBER_INTEGER + pipelinechannel--is-even-or-odd-Output: parameterType: STRING - pipelinechannel--int-zero-through-three-Output: + comp-condition-7: + dag: + outputs: + parameters: + pipelinechannel--is-even-or-odd-2-Output: + valueFromParameter: + outputParameterKey: Output + producerSubtask: is-even-or-odd-2 + tasks: + condition-8: + componentRef: + name: comp-condition-8 + dependentTasks: + - is-even-or-odd-2 + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + pipelinechannel--is-even-or-odd-2-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: is-even-or-odd-2 + taskInfo: + name: condition-8 + triggerPolicy: + condition: inputs.parameter_values['pipelinechannel--is-even-or-odd-2-Output'] + == 'even' + condition-9: + componentRef: + name: comp-condition-9 + dependentTasks: + - is-even-or-odd-2 + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + pipelinechannel--is-even-or-odd-2-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: is-even-or-odd-2 + taskInfo: + name: condition-9 + triggerPolicy: + condition: '!(inputs.parameter_values[''pipelinechannel--is-even-or-odd-2-Output''] + == ''even'')' + is-even-or-odd-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-is-even-or-odd-2 + inputs: + parameters: + num: + componentInputParameter: pipelinechannel--int-0-to-9999-Output + taskInfo: + name: is-even-or-odd-2 + inputDefinitions: + parameters: + pipelinechannel--int-0-to-9999-Output: parameterType: NUMBER_INTEGER - comp-condition-5: + outputDefinitions: + parameters: + pipelinechannel--is-even-or-odd-2-Output: + parameterType: NUMBER_INTEGER + comp-condition-8: dag: tasks: print-and-return-4: @@ -171,16 +281,16 @@ components: parameters: text: runtimeValue: - constant: Got two! + constant: Got a high even number! taskInfo: name: print-and-return-4 inputDefinitions: parameters: - pipelinechannel--flip-coin-Output: - parameterType: STRING - pipelinechannel--int-zero-through-three-Output: + pipelinechannel--int-0-to-9999-Output: parameterType: NUMBER_INTEGER - comp-condition-6: + pipelinechannel--is-even-or-odd-2-Output: + parameterType: STRING + comp-condition-9: dag: tasks: print-and-return-5: @@ -192,27 +302,160 @@ components: parameters: text: runtimeValue: - constant: Got three! + constant: Got a high odd number! taskInfo: name: print-and-return-5 inputDefinitions: parameters: - pipelinechannel--flip-coin-Output: + pipelinechannel--int-0-to-9999-Output: + parameterType: NUMBER_INTEGER + pipelinechannel--is-even-or-odd-2-Output: parameterType: STRING - pipelinechannel--int-zero-through-three-Output: + comp-for-loop-1: + dag: + outputs: + parameters: + pipelinechannel--is-even-or-odd-2-Output: + valueFromParameter: + outputParameterKey: pipelinechannel--is-even-or-odd-2-Output + producerSubtask: condition-7 + tasks: + condition-10: + componentRef: + name: comp-condition-10 + dependentTasks: + - int-0-to-9999 + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: int-0-to-9999 + pipelinechannel--repeat_if_lucky_number: + componentInputParameter: pipelinechannel--repeat_if_lucky_number + taskInfo: + name: condition-10 + triggerPolicy: + condition: '!(int(inputs.parameter_values[''pipelinechannel--int-0-to-9999-Output'']) + < 5000) && !(int(inputs.parameter_values[''pipelinechannel--int-0-to-9999-Output'']) + > 5000)' + condition-2: + componentRef: + name: comp-condition-2 + inputs: + parameters: + pipelinechannel--add_drumroll: + componentInputParameter: pipelinechannel--add_drumroll + pipelinechannel--trials-loop-item: + componentInputParameter: pipelinechannel--trials-loop-item + taskInfo: + name: condition-2 + triggerPolicy: + condition: inputs.parameter_values['pipelinechannel--add_drumroll'] == + true + condition-4: + componentRef: + name: comp-condition-4 + dependentTasks: + - int-0-to-9999 + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: int-0-to-9999 + taskInfo: + name: condition-4 + triggerPolicy: + condition: int(inputs.parameter_values['pipelinechannel--int-0-to-9999-Output']) + < 5000 + condition-7: + componentRef: + name: comp-condition-7 + dependentTasks: + - int-0-to-9999 + inputs: + parameters: + pipelinechannel--int-0-to-9999-Output: + taskOutputParameter: + outputParameterKey: Output + producerTask: int-0-to-9999 + taskInfo: + name: condition-7 + triggerPolicy: + condition: '!(int(inputs.parameter_values[''pipelinechannel--int-0-to-9999-Output'']) + < 5000) && int(inputs.parameter_values[''pipelinechannel--int-0-to-9999-Output'']) + > 5000' + int-0-to-9999: + cachingOptions: {} + componentRef: + name: comp-int-0-to-9999 + taskInfo: + name: int-0-to-9999 + inputDefinitions: + parameters: + pipelinechannel--add_drumroll: + parameterType: BOOLEAN + pipelinechannel--repeat_if_lucky_number: + parameterType: BOOLEAN + pipelinechannel--trials: + parameterType: LIST + pipelinechannel--trials-loop-item: + parameterType: NUMBER_INTEGER + outputDefinitions: + parameters: + pipelinechannel--is-even-or-odd-2-Output: + parameterType: NUMBER_INTEGER + comp-for-loop-13: + dag: + tasks: + print-and-return-7: + cachingOptions: + enableCache: true + componentRef: + name: comp-print-and-return-7 + inputs: + parameters: + text: + runtimeValue: + constant: 'Announcing again: Got the lucky number 5000! A one in + 10,000 chance.' + taskInfo: + name: print-and-return-7 + inputDefinitions: + parameters: + pipelinechannel--int-0-to-9999-Output: parameterType: NUMBER_INTEGER - comp-flip-coin: - executorLabel: exec-flip-coin + pipelinechannel--loop-item-param-12: + parameterType: NUMBER_INTEGER + pipelinechannel--repeat_if_lucky_number: + parameterType: BOOLEAN + comp-int-0-to-9999: + executorLabel: exec-int-0-to-9999 outputDefinitions: parameters: Output: - parameterType: STRING - comp-int-zero-through-three: - executorLabel: exec-int-zero-through-three + parameterType: NUMBER_INTEGER + comp-is-even-or-odd: + executorLabel: exec-is-even-or-odd + inputDefinitions: + parameters: + num: + parameterType: NUMBER_INTEGER outputDefinitions: parameters: Output: + parameterType: STRING + comp-is-even-or-odd-2: + executorLabel: exec-is-even-or-odd-2 + inputDefinitions: + parameters: + num: parameterType: NUMBER_INTEGER + outputDefinitions: + parameters: + Output: + parameterType: STRING comp-print-and-return: executorLabel: exec-print-and-return inputDefinitions: @@ -263,21 +506,47 @@ components: parameters: Output: parameterType: STRING + comp-print-and-return-6: + executorLabel: exec-print-and-return-6 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-and-return-7: + executorLabel: exec-print-and-return-7 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-print-strings: + executorLabel: exec-print-strings + inputDefinitions: + parameters: + strings: + parameterType: LIST deploymentSpec: executors: - exec-flip-coin: + exec-int-0-to-9999: container: args: - --executor_input - '{{$}}' - --function_to_execute - - flip_coin + - int_0_to_9999 command: - sh - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -291,22 +560,22 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef flip_coin() -> str:\n import random\n return 'heads' if\ - \ random.randint(0, 1) == 0 else 'tails'\n\n" + \ *\n\ndef int_0_to_9999() -> int:\n import random\n return random.randint(0,\ + \ 9999)\n\n" image: python:3.7 - exec-int-zero-through-three: + exec-is-even-or-odd: container: args: - --executor_input - '{{$}}' - --function_to_execute - - int_zero_through_three + - is_even_or_odd command: - sh - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -320,8 +589,37 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef int_zero_through_three() -> int:\n import random\n return\ - \ random.randint(0, 3)\n\n" + \ *\n\ndef is_even_or_odd(num: int) -> str:\n return 'odd' if num % 2\ + \ else 'even'\n\n" + image: python:3.7 + exec-is-even-or-odd-2: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - is_even_or_odd + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef is_even_or_odd(num: int) -> str:\n return 'odd' if num % 2\ + \ else 'even'\n\n" image: python:3.7 exec-print-and-return: container: @@ -335,7 +633,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -364,7 +662,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -393,7 +691,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -422,7 +720,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -451,7 +749,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -468,51 +766,145 @@ deploymentSpec: \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ \ text\n\n" image: python:3.7 + exec-print-and-return-6: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-and-return-7: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_and_return + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_and_return(text: str) -> str:\n print(text)\n return\ + \ text\n\n" + image: python:3.7 + exec-print-strings: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - print_strings + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef print_strings(strings: List[str]):\n print(strings)\n\n" + image: python:3.7 pipelineInfo: - name: flip-coin-pipeline + name: lucky-number-pipeline root: dag: tasks: - condition-1: + for-loop-1: componentRef: - name: comp-condition-1 - dependentTasks: - - flip-coin - - int-zero-through-three + name: comp-for-loop-1 inputs: parameters: - pipelinechannel--confirm: - componentInputParameter: confirm - pipelinechannel--flip-coin-Output: - taskOutputParameter: - outputParameterKey: Output - producerTask: flip-coin - pipelinechannel--int-zero-through-three-Output: - taskOutputParameter: - outputParameterKey: Output - producerTask: int-zero-through-three + pipelinechannel--add_drumroll: + componentInputParameter: add_drumroll + pipelinechannel--repeat_if_lucky_number: + componentInputParameter: repeat_if_lucky_number + pipelinechannel--trials: + componentInputParameter: trials + parameterIterator: + itemInput: pipelinechannel--trials-loop-item + items: + inputParameter: pipelinechannel--trials taskInfo: - name: condition-1 - triggerPolicy: - condition: inputs.parameter_values['pipelinechannel--flip-coin-Output'] - == 'heads' - flip-coin: + name: for-loop-1 + print-strings: cachingOptions: enableCache: true componentRef: - name: comp-flip-coin - taskInfo: - name: flip-coin - int-zero-through-three: - cachingOptions: - enableCache: true - componentRef: - name: comp-int-zero-through-three + name: comp-print-strings + dependentTasks: + - for-loop-1 + inputs: + parameters: + strings: + taskOutputParameter: + outputParameterKey: pipelinechannel--is-even-or-odd-2-Output + producerTask: for-loop-1 taskInfo: - name: int-zero-through-three + name: print-strings inputDefinitions: parameters: - confirm: + add_drumroll: + defaultValue: true + isOptional: true + parameterType: BOOLEAN + repeat_if_lucky_number: + defaultValue: true + isOptional: true parameterType: BOOLEAN + trials: + defaultValue: + - 1.0 + - 2.0 + - 3.0 + isOptional: true + parameterType: LIST schemaVersion: 2.1.0 -sdkVersion: kfp-2.1.2 +sdkVersion: kfp-2.1.3 diff --git a/sdk/python/test_data/test_data_config.yaml b/sdk/python/test_data/test_data_config.yaml index 5b8436dd496..b40267f35c4 100644 --- a/sdk/python/test_data/test_data_config.yaml +++ b/sdk/python/test_data/test_data_config.yaml @@ -172,10 +172,10 @@ pipelines: name: flip_coin_pipeline execute: false - module: if_elif_else - name: flip_coin_pipeline + name: roll_die_pipeline execute: false - module: if_elif_else_complex - name: flip_coin_pipeline + name: lucky_number_pipeline execute: false components: test_data_dir: sdk/python/test_data/components From fdde04235b43391228dcfee97b4ba212b9dbd2ab Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Mon, 11 Sep 2023 10:06:43 -0700 Subject: [PATCH 6/6] change BinaryOperation to ConditionOperation --- sdk/python/kfp/compiler/compiler_utils.py | 2 +- sdk/python/kfp/compiler/pipeline_spec_builder.py | 10 +++++----- sdk/python/kfp/dsl/pipeline_channel.py | 14 +++++++------- sdk/python/kfp/dsl/tasks_group.py | 10 +++++----- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_utils.py b/sdk/python/kfp/compiler/compiler_utils.py index 498b9c19fd3..ccc6730b1e3 100644 --- a/sdk/python/kfp/compiler/compiler_utils.py +++ b/sdk/python/kfp/compiler/compiler_utils.py @@ -120,7 +120,7 @@ def _get_parent_groups_helper( def get_channels_from_condition( - operations: List[pipeline_channel.BinaryOperation], + operations: List[pipeline_channel.ConditionOperation], collected_channels: list, ) -> None: """Appends to collected_channels each pipeline channels used in each diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index caaf46cb975..75be4eb647f 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -710,9 +710,9 @@ def _update_task_spec_for_loop_group( def _binary_operations_to_cel_conjunctive( - operations: List[pipeline_channel.BinaryOperation]) -> str: - """Converts a list of BinaryOperation to a CEL string with placeholders. - Each BinaryOperation will be joined the others via the conjunctive (&&). + operations: List[pipeline_channel.ConditionOperation]) -> str: + """Converts a list of ConditionOperation to a CEL string with placeholders. + Each ConditionOperation will be joined the others via the conjunctive (&&). Args: operations: The binary operations to convert to convert and join. @@ -728,8 +728,8 @@ def _binary_operations_to_cel_conjunctive( def _single_binary_operation_to_cel_condition( - operation: pipeline_channel.BinaryOperation) -> str: - """Converts a BinaryOperation to a CEL string with placeholders. + operation: pipeline_channel.ConditionOperation) -> str: + """Converts a ConditionOperation to a CEL string with placeholders. Args: operation: The binary operation to convert to a string. diff --git a/sdk/python/kfp/dsl/pipeline_channel.py b/sdk/python/kfp/dsl/pipeline_channel.py index 91f6b836b7a..4841928bbf4 100644 --- a/sdk/python/kfp/dsl/pipeline_channel.py +++ b/sdk/python/kfp/dsl/pipeline_channel.py @@ -24,7 +24,7 @@ @dataclasses.dataclass -class BinaryOperation: +class ConditionOperation: """Represents a condition expression to be used in condition control flow group. @@ -152,22 +152,22 @@ def __hash__(self) -> int: return hash(self.pattern) def __eq__(self, other): - return BinaryOperation('==', self, other) + return ConditionOperation('==', self, other) def __ne__(self, other): - return BinaryOperation('!=', self, other) + return ConditionOperation('!=', self, other) def __lt__(self, other): - return BinaryOperation('<', self, other) + return ConditionOperation('<', self, other) def __le__(self, other): - return BinaryOperation('<=', self, other) + return ConditionOperation('<=', self, other) def __gt__(self, other): - return BinaryOperation('>', self, other) + return ConditionOperation('>', self, other) def __ge__(self, other): - return BinaryOperation('>=', self, other) + return ConditionOperation('>=', self, other) class PipelineParameterChannel(PipelineChannel): diff --git a/sdk/python/kfp/dsl/tasks_group.py b/sdk/python/kfp/dsl/tasks_group.py index 48424ce47e4..6bf6b63cc04 100644 --- a/sdk/python/kfp/dsl/tasks_group.py +++ b/sdk/python/kfp/dsl/tasks_group.py @@ -151,7 +151,7 @@ class _ConditionBase(TasksGroup): def __init__( self, - conditions: List[pipeline_channel.BinaryOperation], + conditions: List[pipeline_channel.ConditionOperation], name: Optional[str] = None, ) -> None: super().__init__( @@ -159,7 +159,7 @@ def __init__( name=name, is_root=False, ) - self.conditions: List[pipeline_channel.BinaryOperation] = conditions + self.conditions: List[pipeline_channel.ConditionOperation] = conditions class If(_ConditionBase): @@ -315,9 +315,9 @@ class InvalidControlFlowException(Exception): def _shallow_copy_list_of_binary_operations( - operations: List[pipeline_channel.BinaryOperation] -) -> List[pipeline_channel.BinaryOperation]: - # shallow copy is sufficient to allow us to invert the negate flag of a BinaryOperation without affecting copies. deep copy not needed and would result in many copies of the full pipeline since PipelineChannels hold references to the pipeline. + operations: List[pipeline_channel.ConditionOperation] +) -> List[pipeline_channel.ConditionOperation]: + # shallow copy is sufficient to allow us to invert the negate flag of a ConditionOperation without affecting copies. deep copy not needed and would result in many copies of the full pipeline since PipelineChannels hold references to the pipeline. return [copy.copy(operation) for operation in operations]