From 69723141119cf65ffc6b5813a28c42100bdbcea5 Mon Sep 17 00:00:00 2001 From: Tommy Li Date: Tue, 13 Dec 2022 12:48:20 -0800 Subject: [PATCH 1/2] add pipeline_conf.timeout sdk support --- sdk/python/kfp_tekton/compiler/compiler.py | 5 +++++ sdk/python/tests/compiler/testdata/timeout.py | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sdk/python/kfp_tekton/compiler/compiler.py b/sdk/python/kfp_tekton/compiler/compiler.py index 9604178b80..7c4a1edbeb 100644 --- a/sdk/python/kfp_tekton/compiler/compiler.py +++ b/sdk/python/kfp_tekton/compiler/compiler.py @@ -1610,6 +1610,11 @@ def _create_workflow(self, if pipeline_conf and pipeline_conf.data_passing_method is not None: workflow = fix_big_data_passing_using_volume(workflow, pipeline_conf) + if pipeline_conf and pipeline_conf.timeout > 0: + workflow['spec'].setdefault('timeouts', {'pipeline': '0s', 'tasks': '0s'}) + workflow['spec']['timeouts']['pipeline'] = '%ds' % pipeline_conf.timeout + workflow['spec']['timeouts']['pipeline'] = '%ds' % (pipeline_conf.timeout + DEFAULT_FINALLY_SECONDS) + workflow.setdefault('metadata', {}).setdefault('annotations', {})['pipelines.kubeflow.org/pipeline_spec'] = \ json.dumps(pipeline_meta.to_dict(), sort_keys=True) diff --git a/sdk/python/tests/compiler/testdata/timeout.py b/sdk/python/tests/compiler/testdata/timeout.py index d5cf8c3285..b12b89ff49 100755 --- a/sdk/python/tests/compiler/testdata/timeout.py +++ b/sdk/python/tests/compiler/testdata/timeout.py @@ -47,4 +47,6 @@ def timeout_sample_pipeline(): if __name__ == '__main__': from kfp_tekton.compiler import TektonCompiler - TektonCompiler().compile(timeout_sample_pipeline, __file__.replace('.py', '.yaml')) + pipeline_conf = dsl.PipelineConf() + pipeline_conf.set_timeout(100) + TektonCompiler().compile(timeout_sample_pipeline, __file__.replace('.py', '.yaml'), pipeline_conf=pipeline_conf) From 1ee82aea885e88cf0350449936cac017012218ac Mon Sep 17 00:00:00 2001 From: Tommy Li Date: Tue, 13 Dec 2022 15:48:20 -0800 Subject: [PATCH 2/2] update unit test script to take kfp config object --- sdk/python/kfp_tekton/compiler/compiler.py | 2 +- sdk/python/tests/compiler/compiler_tests.py | 25 ++++-- sdk/python/tests/compiler/testdata/timeout.py | 4 +- .../tests/compiler/testdata/timeout_config.py | 51 +++++++++++++ .../compiler/testdata/timeout_config.yaml | 76 +++++++++++++++++++ 5 files changed, 149 insertions(+), 9 deletions(-) create mode 100755 sdk/python/tests/compiler/testdata/timeout_config.py create mode 100644 sdk/python/tests/compiler/testdata/timeout_config.yaml diff --git a/sdk/python/kfp_tekton/compiler/compiler.py b/sdk/python/kfp_tekton/compiler/compiler.py index 7c4a1edbeb..f4370776a1 100644 --- a/sdk/python/kfp_tekton/compiler/compiler.py +++ b/sdk/python/kfp_tekton/compiler/compiler.py @@ -1612,7 +1612,7 @@ def _create_workflow(self, if pipeline_conf and pipeline_conf.timeout > 0: workflow['spec'].setdefault('timeouts', {'pipeline': '0s', 'tasks': '0s'}) - workflow['spec']['timeouts']['pipeline'] = '%ds' % pipeline_conf.timeout + workflow['spec']['timeouts']['tasks'] = '%ds' % pipeline_conf.timeout workflow['spec']['timeouts']['pipeline'] = '%ds' % (pipeline_conf.timeout + DEFAULT_FINALLY_SECONDS) workflow.setdefault('metadata', {}).setdefault('annotations', {})['pipelines.kubeflow.org/pipeline_spec'] = \ diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 90f690b1ef..36aaffa509 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -634,6 +634,16 @@ def test_timeout_workflow(self): from .testdata.timeout import timeout_sample_pipeline self._test_pipeline_workflow(timeout_sample_pipeline, 'timeout.yaml', skip_noninlined=True) + def test_timeout_config_workflow(self): + """ + Test compiling a step level timeout config workflow. + """ + from .testdata.timeout_config import timeout_sample_pipeline + from kfp import dsl + pipeline_conf = dsl.PipelineConf() + pipeline_conf.set_timeout(100) + self._test_pipeline_workflow(timeout_sample_pipeline, 'timeout_config.yaml', pipeline_conf=pipeline_conf, skip_noninlined=True) + def test_display_name_workflow(self): """ Test compiling a step level timeout workflow. @@ -847,7 +857,8 @@ def _test_pipeline_workflow_inlined_spec(self, pipeline_function, pipeline_yaml, normalize_compiler_output_function=None, - tekton_pipeline_conf=TektonPipelineConf()): + tekton_pipeline_conf=TektonPipelineConf(), + pipeline_conf=None): test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata') golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml) temp_dir = tempfile.mkdtemp() @@ -856,7 +867,8 @@ def _test_pipeline_workflow_inlined_spec(self, try: compiler.TektonCompiler().compile(pipeline_function, compiled_yaml_file, - tekton_pipeline_conf=tekton_pipeline_conf) + tekton_pipeline_conf=tekton_pipeline_conf, + pipeline_conf=pipeline_conf) with open(compiled_yaml_file, 'r') as f: f = normalize_compiler_output_function( f.read()) if normalize_compiler_output_function else f @@ -870,12 +882,14 @@ def _test_pipeline_workflow(self, pipeline_yaml, normalize_compiler_output_function=None, tekton_pipeline_conf=TektonPipelineConf(), - skip_noninlined=False): + skip_noninlined=False, + pipeline_conf=None): self._test_pipeline_workflow_inlined_spec( pipeline_function=pipeline_function, pipeline_yaml=pipeline_yaml, normalize_compiler_output_function=normalize_compiler_output_function, - tekton_pipeline_conf=tekton_pipeline_conf) + tekton_pipeline_conf=tekton_pipeline_conf, + pipeline_conf=pipeline_conf) if not skip_noninlined: test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata') golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml.replace(".yaml", "") + "_noninlined.yaml") @@ -885,7 +899,8 @@ def _test_pipeline_workflow(self, try: compiler.TektonCompiler().compile(pipeline_function, compiled_yaml_file, - tekton_pipeline_conf=tekton_pipeline_conf) + tekton_pipeline_conf=tekton_pipeline_conf, + pipeline_conf=pipeline_conf) with open(compiled_yaml_file, 'r') as f: f = normalize_compiler_output_function( f.read()) if normalize_compiler_output_function else f diff --git a/sdk/python/tests/compiler/testdata/timeout.py b/sdk/python/tests/compiler/testdata/timeout.py index b12b89ff49..d5cf8c3285 100755 --- a/sdk/python/tests/compiler/testdata/timeout.py +++ b/sdk/python/tests/compiler/testdata/timeout.py @@ -47,6 +47,4 @@ def timeout_sample_pipeline(): if __name__ == '__main__': from kfp_tekton.compiler import TektonCompiler - pipeline_conf = dsl.PipelineConf() - pipeline_conf.set_timeout(100) - TektonCompiler().compile(timeout_sample_pipeline, __file__.replace('.py', '.yaml'), pipeline_conf=pipeline_conf) + TektonCompiler().compile(timeout_sample_pipeline, __file__.replace('.py', '.yaml')) diff --git a/sdk/python/tests/compiler/testdata/timeout_config.py b/sdk/python/tests/compiler/testdata/timeout_config.py new file mode 100755 index 0000000000..77b600aadc --- /dev/null +++ b/sdk/python/tests/compiler/testdata/timeout_config.py @@ -0,0 +1,51 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from kfp import dsl, components + + +random_failure_1Op = components.load_component_from_text(""" +name: random-failure +description: random failure +inputs: + - {name: exitcodes, type: String} +implementation: + container: + image: python:alpine3.6 + command: + - python + - -c + args: + - | + import random; import sys; exit_code = random.choice([$0]); print(exit_code); \ + import time; time.sleep(30); sys.exit(exit_code) + - {inputValue: exitcodes} +""") + + +@dsl.pipeline( + name='pipeline-includes-two-steps-which-fail-randomly', + description='shows how to use ContainerOp set_timeout().' +) +def timeout_sample_pipeline(): + op1 = random_failure_1Op('0,1,2,3').set_timeout(20) + op2 = random_failure_1Op('0,1') + + +if __name__ == '__main__': + from kfp_tekton.compiler import TektonCompiler + pipeline_conf = dsl.PipelineConf() + pipeline_conf.set_timeout(100) + TektonCompiler().compile(timeout_sample_pipeline, __file__.replace('.py', '.yaml'), pipeline_conf=pipeline_conf) diff --git a/sdk/python/tests/compiler/testdata/timeout_config.yaml b/sdk/python/tests/compiler/testdata/timeout_config.yaml new file mode 100644 index 0000000000..85253805d5 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/timeout_config.yaml @@ -0,0 +1,76 @@ +# Copyright 2021 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: PipelineRun +metadata: + name: pipeline-includes-two-steps-which-fail-randomly + annotations: + tekton.dev/output_artifacts: '{}' + tekton.dev/input_artifacts: '{}' + tekton.dev/artifact_bucket: mlpipeline + tekton.dev/artifact_endpoint: minio-service.kubeflow:9000 + tekton.dev/artifact_endpoint_scheme: http:// + tekton.dev/artifact_items: '{"random-failure": [], "random-failure-2": []}' + sidecar.istio.io/inject: "false" + tekton.dev/template: '' + pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME + pipelines.kubeflow.org/pipeline_spec: '{"description": "shows how to use ContainerOp + set_timeout().", "name": "pipeline-includes-two-steps-which-fail-randomly"}' + labels: + pipelines.kubeflow.org/pipelinename: '' + pipelines.kubeflow.org/generation: '' +spec: + pipelineSpec: + tasks: + - name: random-failure + taskSpec: + steps: + - name: main + args: + - | + import random; import sys; exit_code = random.choice([$0]); print(exit_code); import time; time.sleep(30); sys.exit(exit_code) + - 0,1,2,3 + command: + - python + - -c + image: python:alpine3.6 + metadata: + labels: + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec_digest: '{"name": "random-failure", + "outputs": [], "version": "random-failure@sha256=7a3950e9d0afce355b325b09e2cd5710feb99e6233a330af889d80515c4aaac2"}' + timeout: 20s + - name: random-failure-2 + taskSpec: + steps: + - name: main + args: + - | + import random; import sys; exit_code = random.choice([$0]); print(exit_code); import time; time.sleep(30); sys.exit(exit_code) + - 0,1 + command: + - python + - -c + image: python:alpine3.6 + metadata: + labels: + pipelines.kubeflow.org/cache_enabled: "true" + annotations: + pipelines.kubeflow.org/component_spec_digest: '{"name": "random-failure", + "outputs": [], "version": "random-failure@sha256=7a3950e9d0afce355b325b09e2cd5710feb99e6233a330af889d80515c4aaac2"}' + timeouts: + pipeline: 400s + tasks: 100s