Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tekton loop dsl extension skeleton #799

Merged
merged 1 commit into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion sdk/python/kfp_tekton/tekton.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Iterable, Union
from typing import List, Iterable, Union, Optional, TypeVar
from kfp.dsl import _pipeline_param, _for_loop
from kfp import dsl
from kfp import components
from kfp.dsl._pipeline_param import ConditionOperator
from kfp_tekton.compiler._k8s_helper import sanitize_k8s_name
from kfp_tekton.compiler._op_to_template import TEKTON_BASH_STEP_IMAGE


CEL_EVAL_IMAGE = "aipipeline/cel-eval:latest"
Expand All @@ -25,6 +27,7 @@
TEKTON_CUSTOM_TASK_IMAGES = [CEL_EVAL_IMAGE]
LOOP_PIPELINE_NAME_LENGTH = 40
LOOP_GROUP_NAME_LENGTH = 16
# Type variable constrained to int or float; used to annotate the numeric
# start/end/step arguments of the Loop DSL extension.
_Num = TypeVar('_Num', int, float)


def AnySequencer(any: Iterable[Union[dsl.ContainerOp, ConditionOperator]],
Expand Down Expand Up @@ -177,3 +180,68 @@ def CEL_ConditionOp(condition_statement):
ConditionOp = ConditionOp_template(condition_statement)
ConditionOp.add_pod_annotation("valid_container", "false")
return ConditionOp


def Break():
    '''Create the ``pipelineloop-break-operation`` task.

    The component is built from an inline YAML spec whose container runs
    ``sh -c 'echo "$0"'`` with the single argument ``"break loop"`` on the
    ``TEKTON_BASH_STEP_IMAGE`` image. The task itself only echoes that
    argument; nothing in this function stops a loop.
    NOTE(review): how a surrounding PipelineLoop reacts to this task is not
    visible here - confirm against the loop controller.

    Returns:
        The op produced by instantiating the loaded component (no arguments).
    '''
    # The nesting below (implementation -> container -> image/command/args)
    # is significant: it is what the component loader parses.
    break_op_yaml = '''\
    name: 'pipelineloop-break-operation'
    description: 'Break Operation using PipelineLoop'
    implementation:
        container:
            image: %s
            command:
            - sh
            - -c
            - |
              echo "$0"
            args:
            - "break loop"
    ''' % (TEKTON_BASH_STEP_IMAGE)
    break_op_factory = components.load_component_from_text(break_op_yaml)
    return break_op_factory()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest I did not follow how the break happens here. Seems it's just a normal task?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pugangxa I think it's related to issue #800.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, right now the pipeline loop break only works when the last loop task is skipped. However, the proposed Break() DSL is meant to break when the condition is met, so I opened #800 to add a new break feature that breaks the loop when the Break() task is executed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I just overlooked that issue. Then it makes sense. Thanks for all your explanation.


class Loop(dsl.ParallelFor):
    """Tekton loop DSL extension built on top of ``dsl.ParallelFor``.

    Only the plain ``loop_args`` form is implemented: it delegates to
    ``dsl.ParallelFor``. The separator-based and start/end/step forms are
    accepted but are still TODO placeholders, so an instance created through
    them is NOT initialised by the base class.
    """

    @classmethod
    def sequential(cls,
                   loop_args: _for_loop.ItemList):
        """Build a loop over ``loop_args`` with ``parallelism=1``."""
        return cls(loop_args=loop_args, parallelism=1)

    @classmethod
    def from_string(cls,
                    loop_args: Union[str, _pipeline_param.PipelineParam],
                    separator: Optional[str] = None,
                    parallelism: Optional[int] = None):
        """Build a loop from a string (or pipeline parameter) and a separator.

        With ``separator`` left as ``None`` this behaves like the plain
        ``loop_args`` form; with a separator it reaches the not-yet-implemented
        branch of ``__init__``.
        """
        return cls(loop_args=loop_args, separator=separator, parallelism=parallelism)

    @classmethod
    def range(cls,
              a: _Num,
              b: _Num,
              c: Optional[_Num] = None,
              parallelism: Optional[int] = None):
        """Build a numeric loop, mirroring ``range(start, end[, step])``.

        Args:
            a: Start of the range.
            b: End of the range.
            c: Optional step.
            parallelism: Optional limit forwarded to ``__init__``.
        """
        # ``b`` is the required bound and ``c`` the optional one, so they map
        # to end and step respectively (the reverse made every two-argument
        # call fail for lack of an ``end``).
        return cls(start=a, end=b, step=c, parallelism=parallelism)

    def __init__(self,
                 loop_args: Union[_for_loop.ItemList,
                                  _pipeline_param.PipelineParam] = None,
                 start: Optional[_Num] = None,
                 end: Optional[_Num] = None,
                 step: Optional[_Num] = None,
                 separator: Optional[str] = None,
                 parallelism: Optional[int] = None):
        """Dispatch to the matching loop flavour.

        Raises:
            ValueError: if neither ``loop_args`` nor both ``start`` and
                ``end`` are supplied.
        """
        tekton_params = (start, end, step, separator)
        if loop_args and all(param is None for param in tekton_params):
            # No Tekton-specific option given: plain ParallelFor behaviour.
            super().__init__(loop_args=loop_args, parallelism=parallelism)
        elif loop_args and separator:
            # TODO: implement loop separator DSL extension
            pass
        elif start is not None and end is not None:
            # Compare against None so that 0 is a valid start or end.
            # TODO: implement loop start, end, step DSL extension
            pass
        else:
            # A bare string cannot be raised; use a real exception type.
            raise ValueError("loop_args or start/end parameters are missing for 'Loop' class")
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ def test_recur_cond_workflow(self):
from .testdata.recur_cond import recur_and_condition
self._test_pipeline_workflow(recur_and_condition, 'recur_cond.yaml')

def test_tekton_loop_dsl_workflow(self):
    """
    Test compiling a loop workflow using tekton loop dsl extension.
    """
    # This test needs its own name: reusing test_recur_cond_workflow would
    # rebind that attribute on the class and silently drop the recur_cond test.
    from .testdata.tekton_loop_dsl import pipeline
    self._test_pipeline_workflow(pipeline, 'tekton_loop_dsl.yaml')

def test_cond_recur_workflow(self):
"""
Test compiling a conditional recursive workflow.
Expand Down
57 changes: 57 additions & 0 deletions sdk/python/tests/compiler/testdata/tekton_loop_dsl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp.dsl as dsl
from kfp import components
from kfp_tekton import tekton

# Component spec for the loop body task: takes the current loop item and the
# pipeline parameter and echoes both from a small shell script.
# The indentation inside the string is significant YAML structure.
op1_yaml = '''\
name: 'my-in-coop1'
inputs:
- {name: item, type: Integer}
- {name: my_pipe_param, type: Integer}
implementation:
    container:
        image: library/bash:4.4.23
        command: ['sh', '-c']
        args:
        - |
          set -e
          echo op1 "$0" "$1"
        - {inputValue: item}
        - {inputValue: my_pipe_param}
'''


# Sample pipeline for the Tekton loop DSL extension.
# tekton.Loop.sequential(items) is meant to compile to the same result as
# dsl.ParallelFor(items, parallelism=1) wrapped around the same loop body.
@dsl.pipeline(name='my-pipeline')
def pipeline(my_pipe_param: int = 10):
    items = [1, 2]
    make_op1 = components.load_component_from_text(op1_yaml)
    with tekton.Loop.sequential(items) as item:
        # Loop body: run op1 for the current item, then evaluate the CEL
        # condition and add the Break task guarded by its outcome.
        make_op1(item, my_pipe_param)
        break_requested = tekton.CEL_ConditionOp(f"{item} == 1").output
        with dsl.Condition(break_requested == 'true'):
            tekton.Break()


if __name__ == '__main__':
    from kfp_tekton.compiler import TektonCompiler

    # Compile to a path derived from this script's path, with '.py'
    # replaced by '.yaml'.
    output_path = __file__.replace('.py', '.yaml')
    TektonCompiler().compile(pipeline, output_path)
135 changes: 135 additions & 0 deletions sdk/python/tests/compiler/testdata/tekton_loop_dsl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: my-pipeline
annotations:
tekton.dev/output_artifacts: '{}'
tekton.dev/input_artifacts: '{}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"my-in-coop1": [], "pipelineloop-break-operation":
[]}'
sidecar.istio.io/inject: "false"
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "10", "name": "my_pipe_param",
"optional": true, "type": "Integer"}], "name": "my-pipeline"}'
spec:
params:
- name: my_pipe_param
value: '10'
pipelineSpec:
params:
- name: my_pipe_param
default: '10'
tasks:
- name: my-pipeline-for-loop-2
params:
- name: loop-item-param-1
value: '[1, 2]'
- name: my_pipe_param
value: $(params.my_pipe_param)
taskSpec:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
spec:
pipelineSpec:
params:
- name: loop-item-param-1
type: string
- name: my_pipe_param
type: string
tasks:
- name: my-in-coop1
params:
- name: loop-item-param-1
value: $(params.loop-item-param-1)
- name: my_pipe_param
value: $(params.my_pipe_param)
taskSpec:
steps:
- name: main
args:
- |
set -e
echo op1 "$0" "$1"
- $(inputs.params.loop-item-param-1)
- $(inputs.params.my_pipe_param)
command:
- sh
- -c
image: library/bash:4.4.23
params:
- name: loop-item-param-1
type: string
- name: my_pipe_param
type: string
metadata:
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": ["set -e\necho op1 \"$0\" \"$1\"\n", {"inputValue":
"item"}, {"inputValue": "my_pipe_param"}], "command": ["sh",
"-c"], "image": "library/bash:4.4.23"}}, "inputs": [{"name":
"item", "type": "Integer"}, {"name": "my_pipe_param", "type":
"Integer"}], "name": "my-in-coop1"}'
tekton.dev/template: ''
timeout: 525600m
- name: condition-cel
params:
- name: outcome
value: $(params.loop-item-param-1) == 1
taskRef:
name: cel_condition
apiVersion: cel.tekton.dev/v1alpha1
kind: CEL
timeout: 525600m
- name: pipelineloop-break-operation
taskSpec:
steps:
- name: main
args:
- break loop
command:
- sh
- -c
- |
echo "$0"
image: busybox
metadata:
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec: '{"description": "Break
Operation using PipelineLoop", "implementation": {"container":
{"args": ["break loop"], "command": ["sh", "-c", "echo \"$0\"\n"],
"image": "busybox"}}, "name": "pipelineloop-break-operation"}'
tekton.dev/template: ''
when:
- input: $(tasks.condition-cel.results.outcome)
operator: in
values:
- "true"
timeout: 525600m
parallelism: 1
iterateParam: loop-item-param-1
timeout: 525600m
79 changes: 79 additions & 0 deletions sdk/python/tests/compiler/testdata/tekton_loop_dsl_noninlined.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: my-pipeline
annotations:
tekton.dev/output_artifacts: '{}'
tekton.dev/input_artifacts: '{}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"my-in-coop1": [], "pipelineloop-break-operation":
[]}'
sidecar.istio.io/inject: "false"
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "10", "name": "my_pipe_param",
"optional": true, "type": "Integer"}], "name": "my-pipeline"}'
tekton.dev/resource_templates: '[{"apiVersion": "custom.tekton.dev/v1alpha1",
"kind": "PipelineLoop", "metadata": {"name": "my-pipeline-for-loop-2"}, "spec":
{"iterateParam": "loop-item-param-1", "parallelism": 1, "pipelineSpec": {"params":
[{"name": "loop-item-param-1", "type": "string"}, {"name": "my_pipe_param",
"type": "string"}], "tasks": [{"name": "my-in-coop1", "params": [{"name": "loop-item-param-1",
"value": "$(params.loop-item-param-1)"}, {"name": "my_pipe_param", "value":
"$(params.my_pipe_param)"}], "taskSpec": {"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec":
"{\"implementation\": {\"container\": {\"args\": [\"set -e\\necho op1 \\\"$0\\\"
\\\"$1\\\"\\n\", {\"inputValue\": \"item\"}, {\"inputValue\": \"my_pipe_param\"}],
\"command\": [\"sh\", \"-c\"], \"image\": \"library/bash:4.4.23\"}}, \"inputs\":
[{\"name\": \"item\", \"type\": \"Integer\"}, {\"name\": \"my_pipe_param\",
\"type\": \"Integer\"}], \"name\": \"my-in-coop1\"}", "tekton.dev/template":
""}, "labels": {"pipelines.kubeflow.org/cache_enabled": "true", "pipelines.kubeflow.org/generation":
"", "pipelines.kubeflow.org/pipelinename": ""}}, "params": [{"name": "loop-item-param-1",
"type": "string"}, {"name": "my_pipe_param", "type": "string"}], "steps": [{"args":
["set -e\necho op1 \"$0\" \"$1\"\n", "$(inputs.params.loop-item-param-1)", "$(inputs.params.my_pipe_param)"],
"command": ["sh", "-c"], "image": "library/bash:4.4.23", "name": "main"}]},
"timeout": "525600m"}, {"name": "condition-cel", "params": [{"name": "outcome",
"value": "$(params.loop-item-param-1) == 1"}], "taskRef": {"apiVersion": "cel.tekton.dev/v1alpha1",
"kind": "CEL", "name": "cel_condition"}, "timeout": "525600m"}, {"name": "pipelineloop-break-operation",
"taskSpec": {"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec":
"{\"description\": \"Break Operation using PipelineLoop\", \"implementation\":
{\"container\": {\"args\": [\"break loop\"], \"command\": [\"sh\", \"-c\", \"echo
\\\"$0\\\"\\n\"], \"image\": \"busybox\"}}, \"name\": \"pipelineloop-break-operation\"}",
"tekton.dev/template": ""}, "labels": {"pipelines.kubeflow.org/cache_enabled":
"true", "pipelines.kubeflow.org/generation": "", "pipelines.kubeflow.org/pipelinename":
""}}, "steps": [{"args": ["break loop"], "command": ["sh", "-c", "echo \"$0\"\n"],
"image": "busybox", "name": "main"}]}, "timeout": "525600m", "when": [{"input":
"$(tasks.condition-cel.results.outcome)", "operator": "in", "values": ["true"]}]}]}}}]'
spec:
params:
- name: my_pipe_param
value: '10'
pipelineSpec:
params:
- name: my_pipe_param
default: '10'
tasks:
- name: my-pipeline-for-loop-2
taskRef:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
name: my-pipeline-for-loop-2
params:
- name: loop-item-param-1
value: '[1, 2]'
- name: my_pipe_param
value: $(params.my_pipe_param)
timeout: 525600m