Skip to content

Commit

Permalink
address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
connor-mccarthy committed Sep 8, 2023
1 parent cdd8e11 commit 3eebc74
Show file tree
Hide file tree
Showing 10 changed files with 633 additions and 214 deletions.
2 changes: 1 addition & 1 deletion sdk/python/kfp/compiler/compiler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4401,7 +4401,7 @@ def flip_coin_pipeline():
def test_elif_no_if_not_supported(self):
with self.assertRaisesRegex(
tasks_group.InvalidControlFlowException,
r'dsl\.Else can only be used following an upstream dsl\.If or dsl\.Elif\.'
r'dsl\.Elif can only be used following an upstream dsl\.If or dsl\.Elif\.'
):

@dsl.pipeline
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/kfp/compiler/compiler_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ def get_channels_from_condition(
operations: List[pipeline_channel.BinaryOperation],
collected_channels: list,
) -> None:
"""Append to collected_channels each pipeline channels used in each operand
of each operation in operations."""
"""Appends to collected_channels each pipeline channels used in each
operand of each operation in operations."""
for operation in operations:
for operand in [operation.left_operand, operation.right_operand]:
if isinstance(operand, pipeline_channel.PipelineChannel):
Expand Down Expand Up @@ -153,7 +153,7 @@ def _get_condition_channels_for_tasks_helper(
if isinstance(group, tasks_group._ConditionBase):
new_current_conditions_channels = list(current_conditions_channels)
get_channels_from_condition(
group.condition,
group.conditions,
new_current_conditions_channels,
)

Expand Down
4 changes: 2 additions & 2 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ def _update_task_spec_for_condition_group(
group: The condition group to update task spec for.
pipeline_task_spec: The pipeline task spec to update in place.
"""
condition = _binary_operations_to_cel_conjunctive(group.condition)
condition = _binary_operations_to_cel_conjunctive(group.conditions)
pipeline_task_spec.trigger_policy.CopyFrom(
pipeline_spec_pb2.PipelineTaskSpec.TriggerPolicy(condition=condition))

Expand Down Expand Up @@ -1260,7 +1260,7 @@ def build_spec_by_group(
condition_subgroup_channels = list(subgroup_input_channels)

compiler_utils.get_channels_from_condition(
subgroup.condition, condition_subgroup_channels)
subgroup.conditions, condition_subgroup_channels)

subgroup_component_spec = build_component_spec_for_group(
input_pipeline_channels=condition_subgroup_channels,
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/kfp/dsl/pipeline_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""Definition for Pipeline."""

import functools
from typing import Callable, Optional, Union
from typing import Callable, Optional

from kfp.dsl import component_factory
from kfp.dsl import pipeline_task
Expand Down Expand Up @@ -189,7 +189,7 @@ def pop_tasks_group(self):
"""Removes the current TasksGroup from the stack."""
del self.groups[-1]

def get_last_tasks_group(self) -> Union['tasks_group.TasksGroup', None]:
def get_last_tasks_group(self) -> Optional['tasks_group.TasksGroup']:
"""Gets the last TasksGroup added to the pipeline at the current level
of the pipeline definition."""
groups = self.groups[-1].groups
Expand Down
12 changes: 6 additions & 6 deletions sdk/python/kfp/dsl/tasks_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,15 @@ class _ConditionBase(TasksGroup):

def __init__(
self,
condition: List[pipeline_channel.BinaryOperation],
conditions: List[pipeline_channel.BinaryOperation],
name: Optional[str] = None,
) -> None:
super().__init__(
group_type=TasksGroupType.CONDITION,
name=name,
is_root=False,
)
self.condition: List[pipeline_channel.BinaryOperation] = condition
self.conditions: List[pipeline_channel.BinaryOperation] = conditions


class If(_ConditionBase):
Expand All @@ -184,7 +184,7 @@ def __init__(
name: Optional[str] = None,
) -> None:
super().__init__(
condition=[condition],
conditions=[condition],
name=name,
)
if isinstance(condition, bool):
Expand Down Expand Up @@ -241,7 +241,7 @@ def __init__(
if not isinstance(prev_cond, (Condition, If, Elif)):
# prefer pushing toward dsl.If rather than dsl.Condition for syntactic consistency with the if-elif-else keywords in Python
raise InvalidControlFlowException(
'dsl.Else can only be used following an upstream dsl.If or dsl.Elif.'
'dsl.Elif can only be used following an upstream dsl.If or dsl.Elif.'
)

if isinstance(condition, bool):
Expand All @@ -259,7 +259,7 @@ def __init__(
conditions.append(condition)

super().__init__(
condition=conditions,
conditions=conditions,
name=name,
)

Expand Down Expand Up @@ -305,7 +305,7 @@ def __init__(
)

super().__init__(
condition=prev_cond._negated_upstream_conditions,
conditions=prev_cond._negated_upstream_conditions,
name=name,
)

Expand Down
8 changes: 4 additions & 4 deletions sdk/python/test_data/pipelines/if_elif_else.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@


@dsl.component
def flip_coin() -> str:
def flip_three_sided_die() -> str:
import random
val = random.randint(0, 2)

Expand All @@ -35,8 +35,8 @@ def print_and_return(text: str) -> str:


@dsl.pipeline
def flip_coin_pipeline():
flip_coin_task = flip_coin()
def roll_die_pipeline():
flip_coin_task = flip_three_sided_die()
with dsl.If(flip_coin_task.output == 'heads'):
print_and_return(text='Got heads!')
with dsl.Elif(flip_coin_task.output == 'tails'):
Expand All @@ -47,5 +47,5 @@ def flip_coin_pipeline():

if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=flip_coin_pipeline,
pipeline_func=roll_die_pipeline,
package_path=__file__.replace('.py', '.yaml'))
69 changes: 35 additions & 34 deletions sdk/python/test_data/pipelines/if_elif_else.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# PIPELINE DEFINITION
# Name: flip-coin-pipeline
# Name: roll-die-pipeline
components:
comp-condition-1:
dag:
Expand All @@ -18,7 +18,7 @@ components:
name: print-and-return
inputDefinitions:
parameters:
pipelinechannel--flip-coin-Output:
pipelinechannel--flip-three-sided-die-Output:
parameterType: STRING
comp-condition-2:
dag:
Expand All @@ -37,7 +37,7 @@ components:
name: print-and-return-2
inputDefinitions:
parameters:
pipelinechannel--flip-coin-Output:
pipelinechannel--flip-three-sided-die-Output:
parameterType: STRING
comp-condition-3:
dag:
Expand All @@ -56,10 +56,10 @@ components:
name: print-and-return-3
inputDefinitions:
parameters:
pipelinechannel--flip-coin-Output:
pipelinechannel--flip-three-sided-die-Output:
parameterType: STRING
comp-flip-coin:
executorLabel: exec-flip-coin
comp-flip-three-sided-die:
executorLabel: exec-flip-three-sided-die
outputDefinitions:
parameters:
Output:
Expand Down Expand Up @@ -96,19 +96,19 @@ components:
parameterType: STRING
deploymentSpec:
executors:
exec-flip-coin:
exec-flip-three-sided-die:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- flip_coin
- flip_three_sided_die
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
Expand All @@ -122,9 +122,10 @@ deploymentSpec:
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef flip_coin() -> str:\n import random\n val = random.randint(0,\
\ 2)\n\n if val == 0:\n return 'heads'\n elif val == 1:\n \
\ return 'tails'\n else:\n return 'draw'\n\n"
\ *\n\ndef flip_three_sided_die() -> str:\n import random\n val =\
\ random.randint(0, 2)\n\n if val == 0:\n return 'heads'\n \
\ elif val == 1:\n return 'tails'\n else:\n return 'draw'\n\
\n"
image: python:3.7
exec-print-and-return:
container:
Expand All @@ -138,7 +139,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
Expand Down Expand Up @@ -167,7 +168,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
Expand Down Expand Up @@ -196,7 +197,7 @@ deploymentSpec:
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.2'\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.1.3'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
Expand All @@ -214,66 +215,66 @@ deploymentSpec:
\ text\n\n"
image: python:3.7
pipelineInfo:
name: flip-coin-pipeline
name: roll-die-pipeline
root:
dag:
tasks:
condition-1:
componentRef:
name: comp-condition-1
dependentTasks:
- flip-coin
- flip-three-sided-die
inputs:
parameters:
pipelinechannel--flip-coin-Output:
pipelinechannel--flip-three-sided-die-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: flip-coin
producerTask: flip-three-sided-die
taskInfo:
name: condition-1
triggerPolicy:
condition: inputs.parameter_values['pipelinechannel--flip-coin-Output']
condition: inputs.parameter_values['pipelinechannel--flip-three-sided-die-Output']
== 'heads'
condition-2:
componentRef:
name: comp-condition-2
dependentTasks:
- flip-coin
- flip-three-sided-die
inputs:
parameters:
pipelinechannel--flip-coin-Output:
pipelinechannel--flip-three-sided-die-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: flip-coin
producerTask: flip-three-sided-die
taskInfo:
name: condition-2
triggerPolicy:
condition: '!(inputs.parameter_values[''pipelinechannel--flip-coin-Output'']
== ''heads'') && inputs.parameter_values[''pipelinechannel--flip-coin-Output'']
condition: '!(inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output'']
== ''heads'') && inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output'']
== ''tails'''
condition-3:
componentRef:
name: comp-condition-3
dependentTasks:
- flip-coin
- flip-three-sided-die
inputs:
parameters:
pipelinechannel--flip-coin-Output:
pipelinechannel--flip-three-sided-die-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: flip-coin
producerTask: flip-three-sided-die
taskInfo:
name: condition-3
triggerPolicy:
condition: '!(inputs.parameter_values[''pipelinechannel--flip-coin-Output'']
== ''heads'') && !(inputs.parameter_values[''pipelinechannel--flip-coin-Output'']
condition: '!(inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output'']
== ''heads'') && !(inputs.parameter_values[''pipelinechannel--flip-three-sided-die-Output'']
== ''tails'')'
flip-coin:
flip-three-sided-die:
cachingOptions:
enableCache: true
componentRef:
name: comp-flip-coin
name: comp-flip-three-sided-die
taskInfo:
name: flip-coin
name: flip-three-sided-die
schemaVersion: 2.1.0
sdkVersion: kfp-2.1.2
sdkVersion: kfp-2.1.3
Loading

0 comments on commit 3eebc74

Please sign in to comment.