From 5a43eff3544cc1ce678da577f872acc6a6dcd4ff Mon Sep 17 00:00:00 2001 From: tomcli Date: Tue, 14 Dec 2021 17:38:53 -0800 Subject: [PATCH] add tekton loop dsl extension skeleton --- sdk/python/kfp_tekton/tekton.py | 70 ++++++++- sdk/python/tests/compiler/compiler_tests.py | 7 + .../compiler/testdata/tekton_loop_dsl.py | 57 ++++++++ .../compiler/testdata/tekton_loop_dsl.yaml | 135 ++++++++++++++++++ .../testdata/tekton_loop_dsl_noninlined.yaml | 79 ++++++++++ 5 files changed, 347 insertions(+), 1 deletion(-) create mode 100644 sdk/python/tests/compiler/testdata/tekton_loop_dsl.py create mode 100644 sdk/python/tests/compiler/testdata/tekton_loop_dsl.yaml create mode 100644 sdk/python/tests/compiler/testdata/tekton_loop_dsl_noninlined.yaml diff --git a/sdk/python/kfp_tekton/tekton.py b/sdk/python/kfp_tekton/tekton.py index 239b92c7be..299b4093fe 100644 --- a/sdk/python/kfp_tekton/tekton.py +++ b/sdk/python/kfp_tekton/tekton.py @@ -12,11 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List, Iterable, Union +from typing import List, Iterable, Union, Optional, TypeVar +from kfp.dsl import _pipeline_param, _for_loop from kfp import dsl from kfp import components from kfp.dsl._pipeline_param import ConditionOperator from kfp_tekton.compiler._k8s_helper import sanitize_k8s_name +from kfp_tekton.compiler._op_to_template import TEKTON_BASH_STEP_IMAGE CEL_EVAL_IMAGE = "aipipeline/cel-eval:latest" @@ -25,6 +27,7 @@ TEKTON_CUSTOM_TASK_IMAGES = [CEL_EVAL_IMAGE] LOOP_PIPELINE_NAME_LENGTH = 40 LOOP_GROUP_NAME_LENGTH = 16 +_Num = TypeVar('_Num', int, float) def AnySequencer(any: Iterable[Union[dsl.ContainerOp, ConditionOperator]], @@ -177,3 +180,68 @@ def CEL_ConditionOp(condition_statement): ConditionOp = ConditionOp_template(condition_statement) ConditionOp.add_pod_annotation("valid_container", "false") return ConditionOp + + +def Break(): + '''A BreakOp template for Break Operation using PipelineLoop + ''' + BreakOp_yaml = '''\ + name: 'pipelineloop-break-operation' + description: 'Break Operation using PipelineLoop' + implementation: + container: + image: %s + command: + - sh + - -c + - | + echo "$0" + args: + - "break loop" + ''' % (TEKTON_BASH_STEP_IMAGE) + BreakOp_template = components.load_component_from_text(BreakOp_yaml) + BreakOp = BreakOp_template() + return BreakOp + + +class Loop(dsl.ParallelFor): + + @classmethod + def sequential(self, + loop_args: _for_loop.ItemList): + return Loop(loop_args=loop_args, parallelism=1) + + @classmethod + def from_string(self, + loop_args: Union[str, _pipeline_param.PipelineParam], + separator: Optional[str] = None, + parallelism: Optional[int] = None): + return Loop(loop_args=loop_args, separator=separator, parallelism=parallelism) + + @classmethod + def range(self, + a: _Num, + b: _Num, + c: Optional[_Num] = None, + parallelism: Optional[int] = None): + return Loop(start=a, step=b, end=c, parallelism=parallelism) + + def __init__(self, + loop_args: Union[_for_loop.ItemList, + _pipeline_param.PipelineParam] = None, + start: _Num = None, + end: _Num = None, + step: _Num = None, + separator: Optional[str] = None, + parallelism: Optional[int] = None): + tekton_params = (start, end, step, separator) + if loop_args and not [x for x in tekton_params if x is not None]: + super().__init__(loop_args=loop_args, parallelism=parallelism) + elif loop_args and separator: + # TODO: implement loop separator DSL extension + pass + elif start and end: + # TODO: implement loop start, end, step DSL extension + pass + else: + raise("loop_args or start/end parameters are missing for 'Loop' class") diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 3d9f1efae8..e6f4a097b8 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -112,6 +112,13 @@ def test_recur_cond_workflow(self): from .testdata.recur_cond import recur_and_condition self._test_pipeline_workflow(recur_and_condition, 'recur_cond.yaml') + def test_recur_cond_workflow(self): + """ + Test compiling a loop workflow using tekton loop dsl extension. + """ + from .testdata.tekton_loop_dsl import pipeline + self._test_pipeline_workflow(pipeline, 'tekton_loop_dsl.yaml') + def test_cond_recur_workflow(self): """ Test compiling a conditional recursive workflow. diff --git a/sdk/python/tests/compiler/testdata/tekton_loop_dsl.py b/sdk/python/tests/compiler/testdata/tekton_loop_dsl.py new file mode 100644 index 0000000000..e7118f625a --- /dev/null +++ b/sdk/python/tests/compiler/testdata/tekton_loop_dsl.py @@ -0,0 +1,57 @@ +# Copyright 2021 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kfp.dsl as dsl +from kfp import components +from kfp_tekton import tekton + +op1_yaml = '''\ +name: 'my-in-coop1' +inputs: +- {name: item, type: Integer} +- {name: my_pipe_param, type: Integer} +implementation: + container: + image: library/bash:4.4.23 + command: ['sh', '-c'] + args: + - | + set -e + echo op1 "$0" "$1" + - {inputValue: item} + - {inputValue: my_pipe_param} +''' + + +@dsl.pipeline(name='my-pipeline') +def pipeline(my_pipe_param: int = 10): + loop_args = [1, 2] + # The DSL above should produce the same result and the DSL in the bottom + # with dsl.ParallelFor(loop_args, parallelism=1) as item: + # op1_template = components.load_component_from_text(op1_yaml) + # op1 = op1_template(item, my_pipe_param) + # condi_1 = tekton.CEL_ConditionOp(f"{item} == 0").output + # with dsl.Condition(condi_1 == 'true'): + # tekton.Break() + with tekton.Loop.sequential(loop_args) as item: + op1_template = components.load_component_from_text(op1_yaml) + op1 = op1_template(item, my_pipe_param) + condi_1 = tekton.CEL_ConditionOp(f"{item} == 1").output + with dsl.Condition(condi_1 == 'true'): + tekton.Break() + + +if __name__ == '__main__': + from kfp_tekton.compiler import TektonCompiler + TektonCompiler().compile(pipeline, __file__.replace('.py', '.yaml')) diff --git a/sdk/python/tests/compiler/testdata/tekton_loop_dsl.yaml b/sdk/python/tests/compiler/testdata/tekton_loop_dsl.yaml new file mode 100644 index 0000000000..cb5b1d52a8 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/tekton_loop_dsl.yaml @@ -0,0 +1,135 @@ +# Copyright 2021 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: PipelineRun +metadata: + name: my-pipeline + annotations: + tekton.dev/output_artifacts: '{}' + tekton.dev/input_artifacts: '{}' + tekton.dev/artifact_bucket: mlpipeline + tekton.dev/artifact_endpoint: minio-service.kubeflow:9000 + tekton.dev/artifact_endpoint_scheme: http:// + tekton.dev/artifact_items: '{"my-in-coop1": [], "pipelineloop-break-operation": + []}' + sidecar.istio.io/inject: "false" + pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME + pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "10", "name": "my_pipe_param", + "optional": true, "type": "Integer"}], "name": "my-pipeline"}' +spec: + params: + - name: my_pipe_param + value: '10' + pipelineSpec: + params: + - name: my_pipe_param + default: '10' + tasks: + - name: my-pipeline-for-loop-2 + params: + - name: loop-item-param-1 + value: '[1, 2]' + - name: my_pipe_param + value: $(params.my_pipe_param) + taskSpec: + apiVersion: custom.tekton.dev/v1alpha1 + kind: PipelineLoop + spec: + pipelineSpec: + params: + - name: loop-item-param-1 + type: string + - name: my_pipe_param + type: string + tasks: + - name: my-in-coop1 + params: + - name: loop-item-param-1 + value: $(params.loop-item-param-1) + - name: my_pipe_param + value: $(params.my_pipe_param) + taskSpec: + steps: + - name: main + args: + - | + set -e + echo op1 "$0" "$1" + - $(inputs.params.loop-item-param-1) + - $(inputs.params.my_pipe_param) + command: + - sh + - -c + image: library/bash:4.4.23 + params: + - name: loop-item-param-1 + type: string + - name: my_pipe_param + type: string + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"implementation": {"container": + {"args": ["set -e\necho op1 \"$0\" \"$1\"\n", {"inputValue": + "item"}, {"inputValue": "my_pipe_param"}], "command": ["sh", + "-c"], "image": "library/bash:4.4.23"}}, "inputs": [{"name": + "item", "type": "Integer"}, {"name": "my_pipe_param", "type": + "Integer"}], "name": "my-in-coop1"}' + tekton.dev/template: '' + timeout: 525600m + - name: condition-cel + params: + - name: outcome + value: $(params.loop-item-param-1) == 1 + taskRef: + name: cel_condition + apiVersion: cel.tekton.dev/v1alpha1 + kind: CEL + timeout: 525600m + - name: pipelineloop-break-operation + taskSpec: + steps: + - name: main + args: + - break loop + command: + - sh + - -c + - | + echo "$0" + image: busybox + metadata: + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec: '{"description": "Break + Operation using PipelineLoop", "implementation": {"container": + {"args": ["break loop"], "command": ["sh", "-c", "echo \"$0\"\n"], + "image": "busybox"}}, "name": "pipelineloop-break-operation"}' + tekton.dev/template: '' + when: + - input: $(tasks.condition-cel.results.outcome) + operator: in + values: + - "true" + timeout: 525600m + parallelism: 1 + iterateParam: loop-item-param-1 + timeout: 525600m diff --git a/sdk/python/tests/compiler/testdata/tekton_loop_dsl_noninlined.yaml b/sdk/python/tests/compiler/testdata/tekton_loop_dsl_noninlined.yaml new file mode 100644 index 0000000000..512a415971 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/tekton_loop_dsl_noninlined.yaml @@ -0,0 +1,79 @@ +# Copyright 2021 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: PipelineRun +metadata: + name: my-pipeline + annotations: + tekton.dev/output_artifacts: '{}' + tekton.dev/input_artifacts: '{}' + tekton.dev/artifact_bucket: mlpipeline + tekton.dev/artifact_endpoint: minio-service.kubeflow:9000 + tekton.dev/artifact_endpoint_scheme: http:// + tekton.dev/artifact_items: '{"my-in-coop1": [], "pipelineloop-break-operation": + []}' + sidecar.istio.io/inject: "false" + pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME + pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "10", "name": "my_pipe_param", + "optional": true, "type": "Integer"}], "name": "my-pipeline"}' + tekton.dev/resource_templates: '[{"apiVersion": "custom.tekton.dev/v1alpha1", + "kind": "PipelineLoop", "metadata": {"name": "my-pipeline-for-loop-2"}, "spec": + {"iterateParam": "loop-item-param-1", "parallelism": 1, "pipelineSpec": {"params": + [{"name": "loop-item-param-1", "type": "string"}, {"name": "my_pipe_param", + "type": "string"}], "tasks": [{"name": "my-in-coop1", "params": [{"name": "loop-item-param-1", + "value": "$(params.loop-item-param-1)"}, {"name": "my_pipe_param", "value": + "$(params.my_pipe_param)"}], "taskSpec": {"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec": + "{\"implementation\": {\"container\": {\"args\": [\"set -e\\necho op1 \\\"$0\\\" + \\\"$1\\\"\\n\", {\"inputValue\": \"item\"}, {\"inputValue\": \"my_pipe_param\"}], + \"command\": [\"sh\", \"-c\"], \"image\": \"library/bash:4.4.23\"}}, \"inputs\": + [{\"name\": \"item\", \"type\": \"Integer\"}, {\"name\": \"my_pipe_param\", + \"type\": \"Integer\"}], \"name\": \"my-in-coop1\"}", "tekton.dev/template": + ""}, "labels": {"pipelines.kubeflow.org/cache_enabled": "true", "pipelines.kubeflow.org/generation": + "", "pipelines.kubeflow.org/pipelinename": ""}}, "params": [{"name": "loop-item-param-1", + "type": "string"}, {"name": "my_pipe_param", "type": "string"}], "steps": [{"args": + ["set -e\necho op1 \"$0\" \"$1\"\n", "$(inputs.params.loop-item-param-1)", "$(inputs.params.my_pipe_param)"], + "command": ["sh", "-c"], "image": "library/bash:4.4.23", "name": "main"}]}, + "timeout": "525600m"}, {"name": "condition-cel", "params": [{"name": "outcome", + "value": "$(params.loop-item-param-1) == 1"}], "taskRef": {"apiVersion": "cel.tekton.dev/v1alpha1", + "kind": "CEL", "name": "cel_condition"}, "timeout": "525600m"}, {"name": "pipelineloop-break-operation", + "taskSpec": {"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec": + "{\"description\": \"Break Operation using PipelineLoop\", \"implementation\": + {\"container\": {\"args\": [\"break loop\"], \"command\": [\"sh\", \"-c\", \"echo + \\\"$0\\\"\\n\"], \"image\": \"busybox\"}}, \"name\": \"pipelineloop-break-operation\"}", + "tekton.dev/template": ""}, "labels": {"pipelines.kubeflow.org/cache_enabled": + "true", "pipelines.kubeflow.org/generation": "", "pipelines.kubeflow.org/pipelinename": + ""}}, "steps": [{"args": ["break loop"], "command": ["sh", "-c", "echo \"$0\"\n"], + "image": "busybox", "name": "main"}]}, "timeout": "525600m", "when": [{"input": + "$(tasks.condition-cel.results.outcome)", "operator": "in", "values": ["true"]}]}]}}}]' +spec: + params: + - name: my_pipe_param + value: '10' + pipelineSpec: + params: + - name: my_pipe_param + default: '10' + tasks: + - name: my-pipeline-for-loop-2 + taskRef: + apiVersion: custom.tekton.dev/v1alpha1 + kind: PipelineLoop + name: my-pipeline-for-loop-2 + params: + - name: loop-item-param-1 + value: '[1, 2]' + - name: my_pipe_param + value: $(params.my_pipe_param) + timeout: 525600m