Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): add pipeline_conf.timeout sdk support #1108

Merged
merged 2 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sdk/python/kfp_tekton/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1610,6 +1610,11 @@ def _create_workflow(self,
if pipeline_conf and pipeline_conf.data_passing_method is not None:
workflow = fix_big_data_passing_using_volume(workflow, pipeline_conf)

if pipeline_conf and pipeline_conf.timeout > 0:
workflow['spec'].setdefault('timeouts', {'pipeline': '0s', 'tasks': '0s'})
workflow['spec']['timeouts']['tasks'] = '%ds' % pipeline_conf.timeout
workflow['spec']['timeouts']['pipeline'] = '%ds' % (pipeline_conf.timeout + DEFAULT_FINALLY_SECONDS)

workflow.setdefault('metadata', {}).setdefault('annotations', {})['pipelines.kubeflow.org/pipeline_spec'] = \
json.dumps(pipeline_meta.to_dict(), sort_keys=True)

Expand Down
25 changes: 20 additions & 5 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,16 @@ def test_timeout_workflow(self):
from .testdata.timeout import timeout_sample_pipeline
self._test_pipeline_workflow(timeout_sample_pipeline, 'timeout.yaml', skip_noninlined=True)

def test_timeout_config_workflow(self):
"""
Test compiling a step level timeout config workflow.
"""
from .testdata.timeout_config import timeout_sample_pipeline
from kfp import dsl
pipeline_conf = dsl.PipelineConf()
pipeline_conf.set_timeout(100)
self._test_pipeline_workflow(timeout_sample_pipeline, 'timeout_config.yaml', pipeline_conf=pipeline_conf, skip_noninlined=True)

def test_display_name_workflow(self):
"""
Test compiling a step level timeout workflow.
Expand Down Expand Up @@ -847,7 +857,8 @@ def _test_pipeline_workflow_inlined_spec(self,
pipeline_function,
pipeline_yaml,
normalize_compiler_output_function=None,
tekton_pipeline_conf=TektonPipelineConf()):
tekton_pipeline_conf=TektonPipelineConf(),
pipeline_conf=None):
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml)
temp_dir = tempfile.mkdtemp()
Expand All @@ -856,7 +867,8 @@ def _test_pipeline_workflow_inlined_spec(self,
try:
compiler.TektonCompiler().compile(pipeline_function,
compiled_yaml_file,
tekton_pipeline_conf=tekton_pipeline_conf)
tekton_pipeline_conf=tekton_pipeline_conf,
pipeline_conf=pipeline_conf)
with open(compiled_yaml_file, 'r') as f:
f = normalize_compiler_output_function(
f.read()) if normalize_compiler_output_function else f
Expand All @@ -870,12 +882,14 @@ def _test_pipeline_workflow(self,
pipeline_yaml,
normalize_compiler_output_function=None,
tekton_pipeline_conf=TektonPipelineConf(),
skip_noninlined=False):
skip_noninlined=False,
pipeline_conf=None):
self._test_pipeline_workflow_inlined_spec(
pipeline_function=pipeline_function,
pipeline_yaml=pipeline_yaml,
normalize_compiler_output_function=normalize_compiler_output_function,
tekton_pipeline_conf=tekton_pipeline_conf)
tekton_pipeline_conf=tekton_pipeline_conf,
pipeline_conf=pipeline_conf)
if not skip_noninlined:
test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata')
golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml.replace(".yaml", "") + "_noninlined.yaml")
Expand All @@ -885,7 +899,8 @@ def _test_pipeline_workflow(self,
try:
compiler.TektonCompiler().compile(pipeline_function,
compiled_yaml_file,
tekton_pipeline_conf=tekton_pipeline_conf)
tekton_pipeline_conf=tekton_pipeline_conf,
pipeline_conf=pipeline_conf)
with open(compiled_yaml_file, 'r') as f:
f = normalize_compiler_output_function(
f.read()) if normalize_compiler_output_function else f
Expand Down
51 changes: 51 additions & 0 deletions sdk/python/tests/compiler/testdata/timeout_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from kfp import dsl, components


random_failure_1Op = components.load_component_from_text("""
name: random-failure
description: random failure
inputs:
- {name: exitcodes, type: String}
implementation:
container:
image: python:alpine3.6
command:
- python
- -c
args:
- |
import random; import sys; exit_code = random.choice([$0]); print(exit_code); \
import time; time.sleep(30); sys.exit(exit_code)
- {inputValue: exitcodes}
""")


@dsl.pipeline(
name='pipeline-includes-two-steps-which-fail-randomly',
description='shows how to use ContainerOp set_timeout().'
)
def timeout_sample_pipeline():
op1 = random_failure_1Op('0,1,2,3').set_timeout(20)
op2 = random_failure_1Op('0,1')


if __name__ == '__main__':
from kfp_tekton.compiler import TektonCompiler
pipeline_conf = dsl.PipelineConf()
pipeline_conf.set_timeout(100)
TektonCompiler().compile(timeout_sample_pipeline, __file__.replace('.py', '.yaml'), pipeline_conf=pipeline_conf)
76 changes: 76 additions & 0 deletions sdk/python/tests/compiler/testdata/timeout_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: pipeline-includes-two-steps-which-fail-randomly
annotations:
tekton.dev/output_artifacts: '{}'
tekton.dev/input_artifacts: '{}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"random-failure": [], "random-failure-2": []}'
sidecar.istio.io/inject: "false"
tekton.dev/template: ''
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
pipelines.kubeflow.org/pipeline_spec: '{"description": "shows how to use ContainerOp
set_timeout().", "name": "pipeline-includes-two-steps-which-fail-randomly"}'
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
spec:
pipelineSpec:
tasks:
- name: random-failure
taskSpec:
steps:
- name: main
args:
- |
import random; import sys; exit_code = random.choice([$0]); print(exit_code); import time; time.sleep(30); sys.exit(exit_code)
- 0,1,2,3
command:
- python
- -c
image: python:alpine3.6
metadata:
labels:
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec_digest: '{"name": "random-failure",
"outputs": [], "version": "random-failure@sha256=7a3950e9d0afce355b325b09e2cd5710feb99e6233a330af889d80515c4aaac2"}'
timeout: 20s
- name: random-failure-2
taskSpec:
steps:
- name: main
args:
- |
import random; import sys; exit_code = random.choice([$0]); print(exit_code); import time; time.sleep(30); sys.exit(exit_code)
- 0,1
command:
- python
- -c
image: python:alpine3.6
metadata:
labels:
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec_digest: '{"name": "random-failure",
"outputs": [], "version": "random-failure@sha256=7a3950e9d0afce355b325b09e2cd5710feb99e6233a330af889d80515c4aaac2"}'
timeouts:
pipeline: 400s
tasks: 100s